2014-07-13 13 views
16

Come si eliminano le righe da un RDD in PySpark? In particolare la prima riga, dal momento che tende a contenere i nomi delle colonne nei miei dataset. Dalla lettura dell'API, non riesco a trovare un modo semplice per farlo. Ovviamente potrei farlo tramite Bash/HDFS, ma voglio solo sapere se è possibile farlo all'interno di PySpark.PySpark Drop Righe

+1

utilizzare 'filtro' per filtrare le righe errate – aaronman

+0

Cosa succede se si desidera eliminare solo la prima riga? E diciamo per amor di discussione, non possiamo usare alcuna informazione nel vettore di riga x, cioè non possiamo fare 'lambda x: (alcune condizioni usando x)'. – Jack

+0

Checkout la mia risposta potrebbe essere un po 'più vicina a quello che stavi cercando – aaronman

risposta

15

AFAIK non c'è un modo "facile" per farlo.

Questo dovrebbe fare il trucco, però:

val header = data.first 
val rows = data.filter(line => line != header) 
+0

Questo è ragionevole. Grazie! – Jack

+0

Non dovrebbe essere data.first? data.take (1) restituirà una matrice [T] con lunghezza 1. – Bar

+0

@Bar sì - hai ragione. Aggiornerò la risposta Grazie. – maasg

1

Personalmente penso solo utilizzando un filtro per sbarazzarsi di questa roba è il modo più semplice. Ma per il tuo commento ho un altro approccio. Glom il RDD in modo che ogni partizione sia un array (presumo che tu abbia 1 file per partizione, e ogni file abbia la riga offendente in cima) e poi salti il ​​primo elemento (questo è con scala api).

data.glom().map(x => for (elem <- x.drop(1){/*do stuff*/}) //x is an array so just skip the 0th index

Tenete a mente una delle grandi caratteristiche di RDD di è che sono immutabili, eliminando così naturalmente di fila è una cosa difficile da fare

UPDATE: migliore soluzione.
rdd.mapPartions(x => for (elem <- x.drop(1){/*do stuff*/})
Come il glom, ma non ha il sovraccarico di mettere tutto in un array, dal momento che x è un iteratore in questo caso

13

Specifico PySpark:

Come da @maasg, si potrebbe A questo scopo:

header = rdd.first() 
rdd.filter(lambda line: line != header) 

ma non è tecnicamente corretto, in quanto è possibile escludere righe contenenti dati e l'intestazione. Tuttavia, questo sembra funzionare per me:

def remove_header(itr_index, itr): 
    return iter(list(itr)[1:]) if itr_index == 0 else itr 
rdd.mapPartitionsWithIndex(remove_header) 

Allo stesso modo:

rdd.zipWithIndex().filter(lambda tup: tup[1] > 0).map(lambda tup: tup[0]) 

Sono nuovo di Spark, quindi non posso commentare in modo intelligente su che sarà più veloce.

+0

Puoi spiegare cosa sta succedendo lì (io sono un dev JS)? 'return iter (list (itr) [1:] se itr_index == 0 else itr)'? 1) - 'iter' prende un' (oggetto [, sentinel]) '- quindi suppongo che' iter' prende una lista di itr' iterables (righe), quindi usa l'operatore 'range' di Python, iniziando dal 2 ° indice (Basato su 0), quindi itera fino a "itr_index == 0', altrimenti continua a restituire righe' itr'? Sto chiedendo perché sto usando la stessa cosa ma la riga iniziale dei campi non appare, piuttosto la prima riga di dati diventa i campi. – Growler

+1

Il 'iter' sta forse confondendo il problema. Se 'rdd.mapParitionsWithIndex' restituisce l'indice della partizione, più i dati della partizione come elenco, sarebbe semplicemente' itr [1:] se itr_index == 0 altrimenti itr'- cioè se è la prima partizione (cioè ' itr_index == 0') quindi escludi la prima riga (cioè l'intestazione), e non è la prima partizione (cioè nessuna intestazione), basta restituire l'intera partizione. L''iter' e' list' sono perché in realtà utilizza iterabili invece di liste. Per inciso, sono abbastanza sicuro che ci sia un percorso più efficiente di 'iter (list (itr) [1:])'. – ianhoolihan

4

Un modo semplice per raggiungere questo obiettivo in PySpark (Python API):

noHeaderRDD = rawRDD.zipWithIndex().filter(lambda (row,index): index > 0).keys() 
0

Ho testato con spark2.1. Diciamo che vuoi rimuovere le prime 14 righe senza sapere il numero di colonne che il file ha.

sc = spark.sparkContext 
lines = sc.textFile("s3://location_of_csv") 
parts = lines.map(lambda l: l.split(",")) 
parts.zipWithIndex().filter(lambda tup: tup[1] > 14).map(lambda x:x[0]) 

withColumn è una funzione df. Quindi di seguito non funzionerà in stile RDD come usato nel caso precedente.

parts.withColumn("index",monotonically_increasing_id()).filter(index > 14)