Ho due RDDs
in PySpark:Come aggiungere colonne di 2 RDDs a da un unico RDD e poi fare l'aggregazione di righe in base a dati aggiornati in PySpark
RDD1:
[(u'2013-01-31 00:00:00', u'a', u'Pab', u'abc', u'd'),(u'2013-01-31 00:00:00', u'a', u'ab', u'abc', u'g'),.....]
RDD2:
[(u'41',u'42.0'),(u'24',u'98.0'),....]
Sia RDDs
hanno lo stesso numero o righe. Ora quello che voglio fare è prendere tutte le colonne in ogni riga da RDD1 (convertito da unicode
a normale string
) e la seconda colonna da ogni riga in RDD2 (convertito da unicode string
a float
) e formare un nuovo RDD con quello. Così il nuovo RDD sarà simile a questa:
RDD3:
[('2013-01-31 00:00:00', 'a', 'Pab', 'abc', 'd',42.0),('2013-01-31 00:00:00', 'a', 'ab', u'abc', 'g',98.0),.....]
Una volta fatto ciò, allora voglio fare aggregation
dell'ultimo valore in ogni riga (il valore float) in questo nuovo RDD3
da il valore date
nella prima colonna. Questo include tutte le righe in cui date
è 2013-01-31 00:00:00
, devono essere aggiunti i loro ultimi valori numerici.
Come posso farlo in PySpark?
Non è una chiave di unirsi a loro, quindi penso che si dovrebbe zip ... –
@AlbertoBonsanto può mostrare come io posso fare questo? –
@AlbertoBonsanto restituirà 'rdd3 = izip (rdd1.toLocalIterator(), rdd2.toLocalIterator()) 'essere sufficiente? –