2015-12-07 7 views
5

Ho due RDDs in PySpark:Come aggiungere colonne di 2 RDDs a da un unico RDD e poi fare l'aggregazione di righe in base a dati aggiornati in PySpark

RDD1:

[(u'2013-01-31 00:00:00', u'a', u'Pab', u'abc', u'd'),(u'2013-01-31 00:00:00', u'a', u'ab', u'abc', u'g'),.....] 

RDD2:

[(u'41',u'42.0'),(u'24',u'98.0'),....] 

Sia RDDs hanno lo stesso numero o righe. Ora quello che voglio fare è prendere tutte le colonne in ogni riga da RDD1 (convertito da unicode a normale string) e la seconda colonna da ogni riga in RDD2 (convertito da unicode string a float) e formare un nuovo RDD con quello. Così il nuovo RDD sarà simile a questa:

RDD3:

[('2013-01-31 00:00:00', 'a', 'Pab', 'abc', 'd',42.0),('2013-01-31 00:00:00', 'a', 'ab', u'abc', 'g',98.0),.....] 

Una volta fatto ciò, allora voglio fare aggregation dell'ultimo valore in ogni riga (il valore float) in questo nuovo RDD3 da il valore date nella prima colonna. Questo include tutte le righe in cui date è 2013-01-31 00:00:00, devono essere aggiunti i loro ultimi valori numerici.

Come posso farlo in PySpark?

+1

Non è una chiave di unirsi a loro, quindi penso che si dovrebbe zip ... –

+0

@AlbertoBonsanto può mostrare come io posso fare questo? –

+0

@AlbertoBonsanto restituirà 'rdd3 = izip (rdd1.toLocalIterator(), rdd2.toLocalIterator()) 'essere sufficiente? –

risposta

0

Per la prima parte della sua domanda, che sta combinando i due RDDs in uno in cui ogni riga è una tupla di 7, si può fare questo:

rdd3 = rdd1.zip(rdd2).map(lambda ((a,b,c,d,e), (f,g)): (a,b,c,d,e,f,g)) 

io non sono sicuro di quello che alla fine serve , è solo la data e la somma del secondo valore? Se è così, non è necessario tutti i valori:

rdd3 = rdd1.zip(rdd2).map(lambda ((a,b,c,d,e), (f,g)): (a,g)) 
rdd4 = rdd3.reduceByKey(lambda x, y: x+y) 
+0

sì in rdd4 (quello dopo l'aggregazione) Ho bisogno della data e dell'ultimo valore dopo l'aggregazione –

+0

quindi, questa risposta funziona per voi, o avete bisogno più aiuto con questo? –

2

È necessario zipWithIndex vostra RDDs, questo metodo crea una tupla con i tuoi dati e con un altro valore che rappresenta l'indice di quella voce, quindi è possibile unire entrambi RDDs di index.

Il tuo approccio dovrebbe essere simile a (scommetto che ci sono modi più efficienti):

rdd1 = sc.parallelize([u"A", u"B", u"C", u"A", u"Z"]) 
rdd2 = sc.parallelize(xrange(5)) 

zdd1 = rdd1.zipWithIndex().map(lambda (v, k): (k, v)) 
zdd2 = rdd2.zipWithIndex().map(lambda (v, k): (k, v)) 

print zdd1.join(zdd2).collect() 

L'output sarà: [(0, (u'A', 0)), (4, (u'Z', 4)), (1, (u'B', 1)), (2, (u'C', 2)), (3, (u'A', 3))], dopo questo solo map è necessario per ricomporre i dati. Per esempio. qui di seguito:

combinedRDD = zdd1.join(zdd2).map(lambda (k, v): v) 
print combinedRDD.collect() 

# You can use the .zip method combinedRDD = rdd1.zip(rdd2) 

L'output sarà: [(u'A', 0), (u'Z', 4), (u'B', 1), (u'C', 2), (u'A', 3)]

Circa la conversione del tipo di dati, ho avuto quel problema prima e per risolvere questo io uso this snippet.

import unicodedata 

convert = lambda (v1, v2): (unicodedata.normalize('NFKD', v1) 
             .encode('ascii','ignore'), v2) 

combinedRDD = combinedRDD.map(convert) 
print combinedRDD.collect() 

uscita sarà: [('A', 0), ('Z', 4), ('B', 1), ('C', 2), ('A', 3)]

Problemi correlati