2014-12-10 14 views
7

Ho un programma spark ban. Ho ridotto l'input a un file con una riga al suo interno. Quindi sono fiducioso che questa non è la tradizionale pressione della memoria.KryoException: buffer overflow con input molto piccolo

Exception in thread "main" com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 32749568, required: 34359296 
    at com.esotericsoftware.kryo.io.Output.require(Output.java:138) 
    at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:220) 
    at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:206) 
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:29) 
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:18) 
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) 
    at carbonite.serializer$write_map.invoke(serializer.clj:69) 

posso impostare spark.kryoserializer.buffer.mb, ma penso che sto solo rimandare il problema. Mi piacerebbe capirlo.

Non penso che ci sia qualcosa di non standard riguardo al programma. Se rimuovo una singola riga (apparentemente a caso) l'errore scompare.

Sembra che io stia colpendo una sorta di limite fisso. Ma il fatto che il mio file di input sia molto piccolo e le sole operazioni che sto facendo sono prevedibili maps e reduceByKey s sospetto che ci sia qualcos'altro.

Sto usando la libreria Flambo Clojure 0.4.0 (ma non penso che lo stia causando) e Spark Core 2.10.

Ecco l'esempio di funzionamento minimo. Scusa, è un po 'criptico ma ho rimosso tutto estraneo.

(ns mytest.core 
    (:require [flambo.conf :as conf]) 
    (:require [flambo.api :as f])) 

(def sc (f/spark-context (-> (conf/spark-conf) 
      (conf/master "local") 
      (conf/app-name "test") 
      (conf/set "spark.driver.memory" "1g") 
      (conf/set "spark.executor.memory" "1g")))) 

(defn -main 

    [& args] 
    (let [logfile (f/text-file sc "file://tmp/one-line-file") 
     a (f/map logfile (f/fn [u] nil)) 
     b (f/map logfile (f/fn [u] nil)) 
     c (f/map logfile (f/fn [u] nil)) 
     d (f/map logfile (f/fn [u] nil)) 
     e (f/map logfile (f/fn [u] nil)) 
     g (f/map logfile (f/fn [u] nil)) 
     h (f/map logfile (f/fn [u] nil)) 
     i (f/map logfile (f/fn [u] nil)) 
     j (f/map logfile (f/fn [u] nil)) 
     k (f/map logfile (f/fn [u] nil)) 
     l (f/map logfile (f/fn [u] nil)) 
     m (f/map logfile (f/fn [u] nil)) 
     n (f/map logfile (f/fn [u] nil)) 
     o (f/map logfile (f/fn [u] nil)) 
     p (f/map logfile (f/fn [u] nil)) 
     q (f/map logfile (f/fn [u] nil)) 
     r (f/map logfile (f/fn [u] nil)) 
     s (f/map logfile (f/fn [u] nil)) 
     t (f/map logfile (f/fn [u] nil)) 
])) 

EDIT

Se ho diviso questo in due pezzi e ricreare il flusso di file pigro, funziona:

(defn get-inputs [] 
    (f/text-file sc "file://tmp/one-line-file")) 

(defn -main 

    [& args] 
    (let [logfile (get-inputs) 
     a (f/map logfile (f/fn [u] nil)) 
     b (f/map logfile (f/fn [u] nil)) 
     c (f/map logfile (f/fn [u] nil)) 
     d (f/map logfile (f/fn [u] nil)) 
     e (f/map logfile (f/fn [u] nil)) 
     g (f/map logfile (f/fn [u] nil)) 
     h (f/map logfile (f/fn [u] nil)) 
     i (f/map logfile (f/fn [u] nil))]) 

    (let [logfile (get-inputs) 
     j (f/map logfile (f/fn [u] nil)) 
     k (f/map logfile (f/fn [u] nil)) 
     l (f/map logfile (f/fn [u] nil)) 
     m (f/map logfile (f/fn [u] nil)) 
     n (f/map logfile (f/fn [u] nil)) 
     o (f/map logfile (f/fn [u] nil)) 
     p (f/map logfile (f/fn [u] nil)) 
     q (f/map logfile (f/fn [u] nil)) 
     r (f/map logfile (f/fn [u] nil)) 
     s (f/map logfile (f/fn [u] nil)) 
     t (f/map logfile (f/fn [u] nil))])) 

In Java questo sarebbe l'equivalente di creare due ambiti locali (ad esempio due metodi separati). E get-inputs è solo un metodo che restituisce un oggetto file di testo di nuova costruzione.

Ho pensato che il metodo textFile creerebbe un flusso pigro che può essere (ri) letto più volte, quindi i due esempi non dovrebbero essere molto diversi.

risposta

1

aggiungere questo alla tua scintilla contesto conf:

conf.set("spark.kryoserializer.buffer.mb","128") 
+1

Grazie ho fatto, ma che non risponde alla mia domanda. Perché il buffer dovrebbe esaurire lo spazio con un singolo file? – Joe

+0

Forse perché il file è troppo grande. Un altro motivo potrebbe essere un ricordo/scritto su disco join. –

Problemi correlati