2016-01-13 10 views
13

sto solo ottenendo il blocco di Spark, e non ho la funzione che deve essere mappato a un rdd, ma utilizza un dizionario globale:Broadcast un dizionario per RDD in PySpark

from pyspark import SparkContext 

sc = SparkContext('local[*]', 'pyspark') 

my_dict = {"a": 1, "b": 2, "c": 3, "d": 4} # at no point will be modified 
my_list = ["a", "d", "c", "b"] 

def my_func(letter): 
    return my_dict[letter] 

my_list_rdd = sc.parallelize(my_list) 

result = my_list_rdd.map(lambda x: my_func(x)).collect() 

print result 
Quanto sopra

dà l'atteso risultato; tuttavia, non sono sicuro del mio utilizzo della variabile globale my_dict. Sembra che una copia del dizionario sia fatta con ogni partizione. E non mi sembra giusto ..

Sembrava che broadcast è quello che sto cercando. Tuttavia, quando cerco di usarlo:

my_dict_bc = sc.broadcast(my_dict) 

def my_func(letter): 
    return my_dict_bc[letter] 

ottengo il seguente errore:

TypeError: 'Broadcast' object has no attribute '__getitem__ 

Questo sembra implicare che non riesco a trasmettere un dizionario.

La mia domanda: se ho una funzione che utilizza un dizionario globale, che deve essere mappato su rdd, qual è il modo corretto per farlo?

Il mio esempio è molto semplice, ma in realtà my_dict e my_list sono molto più grandi e my_func è più complicato.

risposta

15

Hai dimenticato qualcosa di importante sugli oggetti Broadcast, hanno una proprietà chiamata value dove sono memorizzati i dati.

Pertanto è necessario modificare my_func a qualcosa di simile:

my_dict_bc = sc.broadcast(my_dict) 

def my_func(letter): 
    return my_dict_bc.value[letter] 
+0

vedo !, Così, quando aggiungo 'my_dict_bc.value' funziona correttamente. E 'broadcasting' è un approccio standard per lavorare con oggetti che vengono condivisi, giusto? – Akavall

+0

Sì, è una buona pratica, tuttavia se il dizionario non è troppo grande, è possibile utilizzare un oggetto globale senza alcun problema. –

+0

Ha senso. Grazie. – Akavall