Ho un programma Spark molto semplice (utilizzando Flambo in Clojure, ma dovrebbe essere facile da seguire). Questi sono tutti oggetti sulla JVM. Sto testando su un'istanza local
(anche se suppongo che Spark continui a serializzare e deserializzare).Utilizzo di JodaTime nel gruppo SparkByKey e countByKey
(let [dt (t/date-time 2014)
input (f/parallelize sc [{:the-date dt :x "A"}
{:the-date dt :x "B"}
{:the-date dt :x "C"}
{:the-date dt :x "D"}])
by-date (f/map input (f/fn [{the-date :the-date x :x}] [the-date x])))
ingresso è un RDD di quattro tuple, ciascuna con lo stesso oggetto data. La prima mappa produce un RDD valore-chiave di data => x.
Il contenuto di input
è, come previsto:
=> (f/foreach input prn)
[#<DateTime 2014-01-01T00:00:00.000Z> "A"]
[#<DateTime 2014-01-01T00:00:00.000Z> "B"]
[#<DateTime 2014-01-01T00:00:00.000Z> "C"]
[#<DateTime 2014-01-01T00:00:00.000Z> "D"]
Giusto per essere chiari, l'uguaglianza e la .hashCode
lavoro sul oggetto data:
=> (= dt dt)
true
=> (.hashCode dt)
1260848926
=> (.hashCode dt)
1260848926
Sono esempi di JodaTime di DateTime, che implement equals as expected .
Quando provo countByKey
, ottengo l'atteso:
=> (f/count-by-key by-date)
{#<DateTime 2014-01-01T00:00:00.000Z> 4}
Ma quando ho groupByKey
, non sembra funzionare.
=> (f/foreach (f/group-by-key by-date) prn)
[#<DateTime 2014-01-01T00:00:00.000Z> ["A"]]
[#<DateTime 2014-01-01T00:00:00.000Z> ["B"]]
[#<DateTime 2014-01-01T00:00:00.000Z> ["C"]]
[#<DateTime 2014-01-01T00:00:00.000Z> ["D"]]
I tasti sono tutti uguali quindi mi aspetterei che il risultato sia una sola voce con la data come la chiave e ["A", "B", "C", "D"]
come valore. Sta succedendo qualcosa perché i valori sono tutti elenchi.
In qualche modo groupByKey
non equivale correttamente ai tasti. Ma è countByKey
. Qual è la differenza tra i due? Come posso far sì che si comportino allo stesso modo?
Qualche idea?
forse potresti usare il tempo storico in millis invece di oggetti data/ora. Sembra un'alternativa più sicura. Abbiamo avuto problemi con il keying dei dati con altri hash basati su memory-location, come le enumerazioni Java. Non funzionano su un ambiente distribuito. – maasg
Grazie, questo è quello che io (penso di me) suggerito con Instant. Buono a sapersi, non sono l'unico con questo problema! – Joe
Si sta utilizzando un mix eterogeneo di cronologie e fusi orari all'interno di un RDD? In caso contrario, manterrò tali informazioni al livello RDD e conserverò l'impronta di memoria di ciascun record (come si fa con 'Instant') – climbage