2016-06-28 24 views
5

Sto provando il seguente codice che aggiunge un numero a ogni riga in un RDD e restituisce un elenco di RDD utilizzando PySpark.Valutazione PySpark

from pyspark.context import SparkContext 
file = "file:///home/sree/code/scrap/sample.txt" 
sc = SparkContext('local', 'TestApp') 
data = sc.textFile(file) 
splits = [data.map(lambda p : int(p) + i) for i in range(4)] 
print splits[0].collect() 
print splits[1].collect() 
print splits[2].collect() 

Il contenuto del file di input (sample.txt) è:

1 
2 
3 

mi aspettavo un output del tipo (aggiungendo i numeri nella RDD con 0, 1, 2 rispettivamente):

[1,2,3] 
[2,3,4] 
[3,4,5] 

mentre l'uscita effettiva era:

[4, 5, 6] 
[4, 5, 6] 
[4, 5, 6] 

che significa che la comprensione ha utilizzato solo il valore 3 per la variabile i, indipendentemente dall'intervallo (4).

Perché questo comportamento si verifica?

risposta

3

Succede a causa del binding in ritardo di Python e non è (Py) Spark specifico. i verrà cercato quando viene utilizzato lambda p : int(p) + i, non quando è definito. In genere significa quando viene chiamato, ma in questo particolare contesto è quando viene serializzato per essere inviato ai lavoratori.

Si può fare, per esempio qualcosa di simile:

def f(i): 
    def _f(x): 
     try: 
      return int(x) + i 
     except: 
      pass 
    return _f 

data = sc.parallelize(["1", "2", "3"]) 
splits = [data.map(f(i)) for i in range(4)] 
[rdd.collect() for rdd in splits] 
## [[1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5, 6]] 
+0

avevo provato passando 'p' ad una semplice funzione esterna, e ad una funzione interna (come quella nella risposta) chiamata attraverso una lambda, a scopo di prova ed errore. notato il comportamento corretto, quando ho fatto questo: http://pastebin.com/z7E7wGKx Grazie per la risposta con il motivo per cui questo accade. – srjit

+0

vale la pena notare che questo accade praticamente in qualsiasi lingua con chiusure/lambda, anche C# –

2

Ciò è dovuto al fatto che lambda si riferiscono alla via i di riferimento! Non ha niente a che fare con la scintilla. See this

Si può provare questo:

a =[(lambda y: (lambda x: y + int(x)))(i) for i in range(4)] 
splits = [data.map(a[x]) for x in range(4)] 

o in una linea

splits = [ 
    data.map([(lambda y: (lambda x: y + int(x)))(i) for i in range(4)][x]) 
    for x in range(4) 
] 
+1

Se si desidera utilizzare 'lambdas' c'è un semplice trucco che evita il nesting:' [lambda x, i = i: i + int (x) per i in range (4)] '. – zero323