2015-01-27 14 views
5

Ho un programma Spark molto semplice (utilizzando Flambo in Clojure, ma dovrebbe essere facile da seguire). Questi sono tutti oggetti sulla JVM. Sto testando su un'istanza local (anche se suppongo che Spark continui a serializzare e deserializzare).Utilizzo di JodaTime nel gruppo SparkByKey e countByKey

(let [dt (t/date-time 2014) 
     input (f/parallelize sc [{:the-date dt :x "A"} 
           {:the-date dt :x "B"} 
           {:the-date dt :x "C"} 
           {:the-date dt :x "D"}]) 
     by-date (f/map input (f/fn [{the-date :the-date x :x}] [the-date x]))) 

ingresso è un RDD di quattro tuple, ciascuna con lo stesso oggetto data. La prima mappa produce un RDD valore-chiave di data => x.

Il contenuto di input è, come previsto:

=> (f/foreach input prn) 
[#<DateTime 2014-01-01T00:00:00.000Z> "A"] 
[#<DateTime 2014-01-01T00:00:00.000Z> "B"] 
[#<DateTime 2014-01-01T00:00:00.000Z> "C"] 
[#<DateTime 2014-01-01T00:00:00.000Z> "D"] 

Giusto per essere chiari, l'uguaglianza e la .hashCode lavoro sul oggetto data:

=> (= dt dt) 
true 
=> (.hashCode dt) 
1260848926 
=> (.hashCode dt) 
1260848926 

Sono esempi di JodaTime di DateTime, che implement equals as expected .

Quando provo countByKey, ottengo l'atteso:

=> (f/count-by-key by-date) 
{#<DateTime 2014-01-01T00:00:00.000Z> 4} 

Ma quando ho groupByKey, non sembra funzionare.

=> (f/foreach (f/group-by-key by-date) prn) 
[#<DateTime 2014-01-01T00:00:00.000Z> ["A"]] 
[#<DateTime 2014-01-01T00:00:00.000Z> ["B"]] 
[#<DateTime 2014-01-01T00:00:00.000Z> ["C"]] 
[#<DateTime 2014-01-01T00:00:00.000Z> ["D"]] 

I tasti sono tutti uguali quindi mi aspetterei che il risultato sia una sola voce con la data come la chiave e ["A", "B", "C", "D"] come valore. Sta succedendo qualcosa perché i valori sono tutti elenchi.

In qualche modo groupByKey non equivale correttamente ai tasti. Ma è countByKey. Qual è la differenza tra i due? Come posso far sì che si comportino allo stesso modo?

Qualche idea?

risposta

3

Mi sto avvicinando a una risposta. Penso che questo appartenga alla sezione di risposta piuttosto che alla sezione di domanda.

Questo gruppo per chiave, si trasforma in una raccolta locale, estrae il primo elemento (data).

=> (def result-dates (map first (f/collect (f/group-by-key by-date)))) 
=> result-dates 
(#<DateTime 2014-01-01T00:00:00.000Z> 
#<DateTime 2014-01-01T00:00:00.000Z> 
#<DateTime 2014-01-01T00:00:00.000Z> 
#<DateTime 2014-01-01T00:00:00.000Z>) 

I codici hash sono tutti uguali

=> (map #(.hashCode %) result-dates) 
(1260848926 
1260848926 
1260848926 
1260848926) 

Le millisecondi sono tutti uguali:

=> (map #(.getMillis %) result-dates) 
(1388534400000 
1388534400000 
1388534400000 
1388534400000) 

equals fallisce, ma isEquals riesce

=> (.isEqual (first result-dates) (second result-dates)) 
true 

=> (.equals (first result-dates) (second result-dates)) 
false 

documentation for .equals says:

confronta questo oggetto con l'oggetto specificato per l'uguaglianza base al millisecondo immediata e la cronologia

loro millisecondi sono tutte uguali e loro Chronologies sembrano essere:

=> (map #(.getChronology %) result-dates) 
(#<ISOChronology ISOChronology[UTC]> 
#<ISOChronology ISOChronology[UTC]> 
#<ISOChronology ISOChronology[UTC]> 
#<ISOChronology ISOChronology[UTC]>) 

Tuttavia, le cronologie non corrispondono a.

=> (def a (first result-dates)) 
=> (def b (second result-dates)) 

=> (= (.getChronology a) (.getChronology b)) 
false 

Sebbene i codici hash fanno

=> (= (.hashCode (.getChronology a)) (.hashCode (.getChronology b))) 
true 

Ma joda.time.Chronology non fornisce its own equals method e eredita da oggetto, che usa solo la parità riferimento.

La mia teoria è che queste date vengono tutte deserializzate con i propri oggetti di cronologia individuali, diversi e costruiti, ma JodaTime ha its own serializer che probabilmente si occupa di questo. Forse un serializzatore personalizzato Kryo sarebbe di aiuto in questo senso.

Per ora, la mia soluzione a utilizzare JodaTime in Spark è quello di utilizzare org.joda.time .Instant chiamando toInstant, o un java.util.Date piuttosto che un org.joda.time.DateTime.

Entrambi comportano l'eliminazione di informazioni sul fuso orario, che non è l'ideale, quindi se qualcuno ha più informazioni sarebbe molto gradito!

+0

forse potresti usare il tempo storico in millis invece di oggetti data/ora. Sembra un'alternativa più sicura. Abbiamo avuto problemi con il keying dei dati con altri hash basati su memory-location, come le enumerazioni Java. Non funzionano su un ambiente distribuito. – maasg

+0

Grazie, questo è quello che io (penso di me) suggerito con Instant. Buono a sapersi, non sono l'unico con questo problema! – Joe

+0

Si sta utilizzando un mix eterogeneo di cronologie e fusi orari all'interno di un RDD? In caso contrario, manterrò tali informazioni al livello RDD e conserverò l'impronta di memoria di ciascun record (come si fa con 'Instant') – climbage

Problemi correlati