Iniziamo con una semplice funzione che restituisce sempre un intero casuale:numeri casuali in PySpark
import numpy as np
def f(x):
return np.random.randint(1000)
e RDD riempito con zeri e mappati usando f
:
rdd = sc.parallelize([0] * 10).map(f)
Poiché sopra RDD è non persistono Mi aspetto che otterrò un output diverso ogni volta che raccolgo:
> rdd.collect()
[255, 512, 512, 512, 255, 512, 255, 512, 512, 255]
Se ignoriamo il fatto che la distribuzione dei valori non sembra davvero casuale, è più o meno ciò che accade. Problema inizia quando ci si richiedere solo un primo elemento:
assert len(set(rdd.first() for _ in xrange(100))) == 1
o
assert len(set(tuple(rdd.take(1)) for _ in xrange(100))) == 1
Sembra restituire lo stesso numero ogni volta. Sono stato in grado di riprodurre questo comportamento su due macchine diverse con Spark 1.2, 1.3 e 1.4. Qui sto usando np.random.randint
ma si comporta allo stesso modo con random.randint
.
Questo problema, stesso che i risultati non esattamente-casuali con collect
, sembra essere Python specifici e non ho potuto riprodurlo utilizzando Scala:
def f(x: Int) = scala.util.Random.nextInt(1000)
val rdd = sc.parallelize(List.fill(10)(0)).map(f)
(1 to 100).map(x => rdd.first).toSet.size
rdd.collect()
mi sono perso qualcosa di ovvio qui?
Edit:
Attiva la fonte del problema è l'attuazione di Python RNG. Per quotare official documentation:
Le funzioni fornite da questo modulo sono in realtà metodi associati di un'istanza nascosta della classe random.Random. Puoi istanziare le tue istanze di Random per ottenere generatori che non condividono lo stato.
suppongo NumPy funziona allo stesso modo e riscrittura f
usando RandomState
esempio come segue
import os
import binascii
def f(x, seed=None):
seed = (
seed if seed is not None
else int(binascii.hexlify(os.urandom(4)), 16))
rs = np.random.RandomState(seed)
return rs.randint(1000)
rende più lento ma risolve il problema.
Mentre in precedenza non si spiegano risultati casuali da raccolta, non riesco ancora a capire come influisce su first
/take(1)
tra più azioni.
Giusto per chiarire: se sto usando la funzione casuale di numpy in Spark, sceglie sempre gli stessi risultati in ogni partizione? Come posso usare np.random.choice in modo che sia casuale? – member555
_Si sceglie sempre gli stessi risultati in ogni partizione_ - non esattamente, ma i valori calcolati su un singolo lavoratore non saranno indipendenti. Come posso usare np.random.choice in modo casuale? _ - Ho già descritto la soluzione in una modifica. Dovresti usare uno stato separato. Poiché è piuttosto costoso, potresti voler inizializzarlo una volta per partizione. – zero323
Puoi spiegarmi più dettagliatamente qual è il problema? perché lo stato condiviso di Python è un problema? – member555