2014-07-01 10 views
9

Sto cercando di risolvere il seguente problema utilizzando pyspark. Ho un file su hdfs nel formato che è un dump della tabella di ricerca.Creazione di un dizionario di grandi dimensioni in pyspark

key1, value1 
key2, value2 
... 

Voglio caricare questo nel dizionario python in pyspark e usarlo per qualche altro scopo. Così ho provato a fare:

table = {} 
def populateDict(line): 
    (k,v) = line.split(",", 1) 
    table[k] = v 

kvfile = sc.textFile("pathtofile") 
kvfile.foreach(populateDict) 

Ho trovato che la variabile di tabella non viene modificata. Quindi, c'è un modo per creare un grande hash inmemory nella scintilla?

risposta

4

foreach è un calcolo distribuito in modo che non ci si possa aspettare che modifichi una struttura dati visibile solo nel driver. Quello che vuoi è.

kv.map(line => { line.split(" ") match { 
    case Array(k,v) => (k,v) 
    case _ => ("","") 
}.collectAsMap() 

Questo è in scala, ma si ottiene l'idea, la funzione importante è collectAsMap() che restituisce una mappa per il conducente.

Se i dati sono molto grandi, è possibile utilizzare un PairRDD come mappa. Prima mappa per coppie

kv.map(line => { line.split(" ") match { 
     case Array(k,v) => (k,v) 
     case _ => ("","") 
    } 

allora si può accedere con rdd.lookup("key") che restituisce una sequenza di valori associati con la chiave, anche se questo sicuramente non sarà così efficace come altri negozi KV distribuiti, come scintilla non è realmente costruito per quella.

+0

Cool grazie. Ciò significa che la mappa deve adattarsi alla memoria del conducente? O è ancora distribuito? – Kamal

+0

@Kamal si deve adattarsi a mem. U potrebbe usare pair rdd come tabella di ricerca. Anche pensato a una soluzione con cumulabile, posterò presto – aaronman

+0

Ok. Stavo cercando una mappa distribuita in scintilla. Sembra che non sia possibile! – Kamal

1

Per l'efficienza, vedi: sortByKey() and lookup()

lookup (chiave):

restituire l'elenco di valori nella RDD per la chiave chiave. Questa operazione viene eseguita in modo efficiente se l'RDD ha un partizionatore noto eseguendo solo ricerche nella partizione a cui la chiave esegue la mappatura.

La RDD sarà ri-partizionato da sortByKey() (see: OrderedRDD) ed efficiente cercato durante lookup() chiamate. In codice, qualcosa del tipo,

kvfile = sc.textFile("pathtofile") 
sorted_kv = kvfile.flatMap(lambda x: x.split("," , 1)).sortByKey() 

sorted_kv.lookup('key1').take(10) 

farà il trucco sia come RDD che in modo efficiente.

Problemi correlati