Come si eliminano le righe da un RDD in PySpark? In particolare la prima riga, dal momento che tende a contenere i nomi delle colonne nei miei dataset. Dalla lettura dell'API, non riesco a trovare un modo semplice per farlo. Ovviamente potrei farlo tramite Bash/HDFS, ma voglio solo sapere se è possibile farlo all'interno di PySpark.PySpark Drop Righe
risposta
AFAIK non c'è un modo "facile" per farlo.
Questo dovrebbe fare il trucco, però:
val header = data.first
val rows = data.filter(line => line != header)
Personalmente penso solo utilizzando un filtro per sbarazzarsi di questa roba è il modo più semplice. Ma per il tuo commento ho un altro approccio. Glom il RDD in modo che ogni partizione sia un array (presumo che tu abbia 1 file per partizione, e ogni file abbia la riga offendente in cima) e poi salti il primo elemento (questo è con scala api).
data.glom().map(x => for (elem <- x.drop(1){/*do stuff*/}) //x is an array so just skip the 0th index
Tenete a mente una delle grandi caratteristiche di RDD di è che sono immutabili, eliminando così naturalmente di fila è una cosa difficile da fare
UPDATE: migliore soluzione.
rdd.mapPartions(x => for (elem <- x.drop(1){/*do stuff*/})
Come il glom, ma non ha il sovraccarico di mettere tutto in un array, dal momento che x è un iteratore in questo caso
Specifico PySpark:
Come da @maasg, si potrebbe A questo scopo:
header = rdd.first()
rdd.filter(lambda line: line != header)
ma non è tecnicamente corretto, in quanto è possibile escludere righe contenenti dati e l'intestazione. Tuttavia, questo sembra funzionare per me:
def remove_header(itr_index, itr):
return iter(list(itr)[1:]) if itr_index == 0 else itr
rdd.mapPartitionsWithIndex(remove_header)
Allo stesso modo:
rdd.zipWithIndex().filter(lambda tup: tup[1] > 0).map(lambda tup: tup[0])
Sono nuovo di Spark, quindi non posso commentare in modo intelligente su che sarà più veloce.
Puoi spiegare cosa sta succedendo lì (io sono un dev JS)? 'return iter (list (itr) [1:] se itr_index == 0 else itr)'? 1) - 'iter' prende un' (oggetto [, sentinel]) '- quindi suppongo che' iter' prende una lista di itr' iterables (righe), quindi usa l'operatore 'range' di Python, iniziando dal 2 ° indice (Basato su 0), quindi itera fino a "itr_index == 0', altrimenti continua a restituire righe' itr'? Sto chiedendo perché sto usando la stessa cosa ma la riga iniziale dei campi non appare, piuttosto la prima riga di dati diventa i campi. – Growler
Il 'iter' sta forse confondendo il problema. Se 'rdd.mapParitionsWithIndex' restituisce l'indice della partizione, più i dati della partizione come elenco, sarebbe semplicemente' itr [1:] se itr_index == 0 altrimenti itr'- cioè se è la prima partizione (cioè ' itr_index == 0') quindi escludi la prima riga (cioè l'intestazione), e non è la prima partizione (cioè nessuna intestazione), basta restituire l'intera partizione. L''iter' e' list' sono perché in realtà utilizza iterabili invece di liste. Per inciso, sono abbastanza sicuro che ci sia un percorso più efficiente di 'iter (list (itr) [1:])'. – ianhoolihan
Un modo semplice per raggiungere questo obiettivo in PySpark (Python API):
noHeaderRDD = rawRDD.zipWithIndex().filter(lambda (row,index): index > 0).keys()
Ho testato con spark2.1. Diciamo che vuoi rimuovere le prime 14 righe senza sapere il numero di colonne che il file ha.
sc = spark.sparkContext
lines = sc.textFile("s3://location_of_csv")
parts = lines.map(lambda l: l.split(","))
parts.zipWithIndex().filter(lambda tup: tup[1] > 14).map(lambda x:x[0])
withColumn è una funzione df. Quindi di seguito non funzionerà in stile RDD come usato nel caso precedente.
parts.withColumn("index",monotonically_increasing_id()).filter(index > 14)
- 1. Drag and drop righe all'interno QTableWidget
- 2. Javascript Dynamic Drop Drop
- 3. Controllo drop drop doppio elenco
- 4. Valutazione PySpark
- 5. TABELLA TEMPORANEA LENTO DROP
- 6. raccogliere RDD con buffer in pyspark
- 7. Esplodi in PySpark
- 8. Pyspark: ripartizione vs partizioneBy
- 9. takeOrdered discendente Pyspark
- 10. PySpark 1.5 & MSSQL jdbc
- 11. Correlazione calcolo Pyspark
- 12. numeri casuali in PySpark
- 13. Pyspark: shuffle RDD
- 14. Problemi nell'installazione di Pyspark
- 15. Aggiungi Jar standalone pyspark
- 16. Registrazione di PySpark?
- 17. Come ottenere righe da DF che contengono il valore Nessuno in pyspark (scintilla)
- 18. pySpark DataFrames Funzioni di aggregazione con SciPy
- 19. Differenza tra drop e drop Purge in Oracle
- 20. T-SQL: DROP Vincoli a cascata della tabella DROP equivalenti?
- 21. Pyspark StructType non è definito
- 22. In esecuzione nosetests per pyspark
- 23. tabella Query alveare in pyspark
- 24. Crea profilo PySpark per IPython
- 25. sovrascrivendo un'uscita scintilla utilizzando pyspark
- 26. Come impostare spark.sql.parquet.output.committer.class in pyspark
- 27. matrice pyspark con variabili dummy
- 28. Come importare pyspark in anaconda
- 29. PySpark row-wise funzione composizione
- 30. Split RDD per convalida K-fold: pyspark
utilizzare 'filtro' per filtrare le righe errate – aaronman
Cosa succede se si desidera eliminare solo la prima riga? E diciamo per amor di discussione, non possiamo usare alcuna informazione nel vettore di riga x, cioè non possiamo fare 'lambda x: (alcune condizioni usando x)'. – Jack
Checkout la mia risposta potrebbe essere un po 'più vicina a quello che stavi cercando – aaronman