2014-11-04 13 views
11

Quindi sto cercando di imparare Spark usando Python (Pyspark). Voglio sapere come funziona la funzione mapPartitions. Questo è ciò che Input prende e quale Output dà. Non sono riuscito a trovare alcun esempio corretto da internet. Diciamo, ho un oggetto RDD contenente elenchi, come di seguito.Come funziona la funzione mapPartitions di pyspark?

[ [1, 2, 3], [3, 2, 4], [5, 2, 7] ] 

E voglio rimuovere l'elemento 2 da tutte le liste, come faccio a ottenere che l'utilizzo di mapPartitions.

risposta

17

mapPartition deve essere considerato un'operazione di mapping su partizioni e non sugli elementi della partizione. È l'input è l'insieme delle partizioni correnti, il suo output sarà un altro insieme di partizioni.

La funzione si passa mappa deve prendere un singolo elemento della vostra RDD

La funzione si passa mapPartition deve prendere un iterabile del vostro tipo di RDD e tornare e iterabile di qualche altro o lo stesso tipo.

Nel tuo caso, probabilmente si vuole solo fare qualcosa di simile

def filterOut2(line): 
    return [x for x in line if x != 2] 

filtered_lists = data.map(filterOut2) 

se si voleva utilizzare mapPartition sarebbe

def filterOut2FromPartion(list_of_lists): 
    final_iterator = [] 
    for sub_list in list_of_lists: 
    final_iterator.append([x for x in sub_list if x != 2]) 
    return iter(final_iterator) 

filtered_lists = data.mapPartition(filterOut2FromPartion) 
+0

Perché non si restituisce nulla in filterOut2FromPartition f unzione. In secondo luogo, è finale qualche parola chiave in python? Penso che intendevi dire final.iterator = [] invece di final_iterator. – MetallicPriest

+0

Risolti i problemi – bearrito

+0

Ho provato a implementarlo ma ottengo l'errore "elenco oggetti non è un iteratore". Inoltre, penso che quando hai scritto [x per x in linea se x! = 2], penso che intendessi [x per x nella lista se x! = 2]. Ho usato la lista lì. – MetallicPriest

18

E 'più facile da usare mapPartitions con una funzione di generatore utilizzando il yield sintassi:

def filter_out_2(partition): 
    for element in partition: 
     if element != 2: 
      yield element 

filtered_lists = data.mapPartition(filter_out_2) 
+0

È più veloce di una semplice lista? – cgreen

+1

@cgreen la partizione contiene tutti i tuoi dati. Non sono sicuro di voler caricare tutti i tuoi dati in un elenco. I generatori sono preferiti sugli elenchi quando si itera su dati. – Narek

+0

@cgreen I generatori utilizzano meno memoria, poiché generano ciascun elemento come necessario, invece di dover generare inizialmente un intero elenco di oggetti. Quindi utilizza decisamente meno memoria e quindi è probabilmente più veloce. [Ecco una buona spiegazione dei generatori in Python] (https://medium.freecodecamp.org/python-list-comprehensions-vs-generator-expressions-cef70ccb49db). –

Problemi correlati