2013-07-06 3 views
6

Quindi il mio dati di input ha due campi/colonne: id1 & ID2, e il mio codice è il seguente:Scottatura: come conservare l'altro campo, dopo un groupBy ('campo) {. Dimensione}?

TextLine(args("input")) 
.read 
.mapTo('line->('id1,'id2)) {line: String => 
    val fields = line.split("\t") 
     (fields(0),fields(1)) 
} 
.groupBy('id2){.size} 
.write(Tsv(args("output"))) 

I risultati di uscita in (ciò che io presumo) due campi: id2 * dimensioni. Sono un po 'bloccato a scoprire se è possibile mantenere il valore id1 che è stato anche raggruppato con id2 e aggiungerlo come un altro campo?

risposta

8

Non puoi farlo in un modo carino, ho paura. Pensa a come funziona sotto il cofano: divide i dati da contare in blocchi e li invia a processi diversi, ogni processo conta il suo chunk, quindi un singolo riduttore li aggiunge tutti alla fine. Mentre ogni processo sta contando, non conosce l'intera dimensione, quindi non può aggiungere il campo. L'unico modo è tornare indietro e aggiungerlo ai dati una volta che l'intera dimensione è nota (cioè un join).

Se ogni gruppo si inserisce in memoria (ed è possibile configurare la memoria), è possibile:

Tsv(args("input"), ('id1, 'id2)) 
.groupBy('id2)(_.size.toList[(String, String)](('id1, 'id2) -> 'list)) 
.flatMapTo[(Iterable[(String, String)], Int), (String, String, Int)](('list, 'size) -> ('id1, 'id2, 'size)) { 
    case (list, size) => list.map(record => (record._1, record._2, size)) 
} 
.write(Tsv(args("output"))) 

Ma se il vostro sistema non dispone di memoria sufficiente, si dovrà utilizzare un costoso aderire.

Nota: È possibile utilizzare Tsv anziché TextLine seguito da mapTo e splitting.

+0

Si prega di vedere se ha senso, sento lo stesso dolore. http://stackoverflow.com/questions/25994879/scalding-flatten-fields-after-groupby – Sergey

Problemi correlati