2015-08-09 14 views
7

Iniziamo con una semplice funzione che restituisce sempre un intero casuale:numeri casuali in PySpark

import numpy as np 

def f(x): 
    return np.random.randint(1000) 

e RDD riempito con zeri e mappati usando f:

rdd = sc.parallelize([0] * 10).map(f) 

Poiché sopra RDD è non persistono Mi aspetto che otterrò un output diverso ogni volta che raccolgo:

> rdd.collect() 
[255, 512, 512, 512, 255, 512, 255, 512, 512, 255] 

Se ignoriamo il fatto che la distribuzione dei valori non sembra davvero casuale, è più o meno ciò che accade. Problema inizia quando ci si richiedere solo un primo elemento:

assert len(set(rdd.first() for _ in xrange(100))) == 1 

o

assert len(set(tuple(rdd.take(1)) for _ in xrange(100))) == 1 

Sembra restituire lo stesso numero ogni volta. Sono stato in grado di riprodurre questo comportamento su due macchine diverse con Spark 1.2, 1.3 e 1.4. Qui sto usando np.random.randint ma si comporta allo stesso modo con random.randint.

Questo problema, stesso che i risultati non esattamente-casuali con collect, sembra essere Python specifici e non ho potuto riprodurlo utilizzando Scala:

def f(x: Int) = scala.util.Random.nextInt(1000) 

val rdd = sc.parallelize(List.fill(10)(0)).map(f) 
(1 to 100).map(x => rdd.first).toSet.size 

rdd.collect() 

mi sono perso qualcosa di ovvio qui?

Edit:

Attiva la fonte del problema è l'attuazione di Python RNG. Per quotare official documentation:

Le funzioni fornite da questo modulo sono in realtà metodi associati di un'istanza nascosta della classe random.Random. Puoi istanziare le tue istanze di Random per ottenere generatori che non condividono lo stato.

suppongo NumPy funziona allo stesso modo e riscrittura f usando RandomState esempio come segue

import os 
import binascii 

def f(x, seed=None): 
    seed = (
     seed if seed is not None 
     else int(binascii.hexlify(os.urandom(4)), 16)) 
    rs = np.random.RandomState(seed) 
    return rs.randint(1000) 

rende più lento ma risolve il problema.

Mentre in precedenza non si spiegano risultati casuali da raccolta, non riesco ancora a capire come influisce su first/take(1) tra più azioni.

+0

Giusto per chiarire: se sto usando la funzione casuale di numpy in Spark, sceglie sempre gli stessi risultati in ogni partizione? Come posso usare np.random.choice in modo che sia casuale? – member555

+0

_Si sceglie sempre gli stessi risultati in ogni partizione_ - non esattamente, ma i valori calcolati su un singolo lavoratore non saranno indipendenti. Come posso usare np.random.choice in modo casuale? _ - Ho già descritto la soluzione in una modifica. Dovresti usare uno stato separato. Poiché è piuttosto costoso, potresti voler inizializzarlo una volta per partizione. – zero323

+0

Puoi spiegarmi più dettagliatamente qual è il problema? perché lo stato condiviso di Python è un problema? – member555

risposta

2

Quindi il problema attuale qui è relativamente semplice.Ogni sottoprocesso in Python eredita il suo stato da suo genitore:

len(set(sc.parallelize(range(4), 4).map(lambda _: random.getstate()).collect())) 
# 1 

Dal Stato genitore non ha alcun motivo di cambiare in questo scenario particolare e lavoratori hanno una durata limitata, stato di ogni bambino sarà esattamente lo stesso su ogni esecuzione.

1

Questo sembra essere un bug (o funzionalità) di randint. Vedo lo stesso comportamento, ma non appena cambio il f, i valori effettivamente cambiano. Quindi, non sono sicuro della casualità effettiva di questo metodo ... Non riesco a trovare alcuna documentazione, ma sembra che stia usando un algoritmo matematico deterministico invece di usare più funzioni variabili della macchina in esecuzione. Anche se vado avanti e indietro, i numeri sembrano essere gli stessi al ritorno al valore originale ...

+0

È un generatore pseudocasuale che implementa Mersenne Twister ma non dovrebbe essere un problema. Il problema è sicuramente correlato alla classe 'Random' condivisa (ho modificato la domanda per riflettere ciò), ma il modo in cui influenza l'output' first' mi lascia ancora dei dubbi. – zero323