2014-07-17 12 views
6

Ho un file in hdfs che è distribuito attraverso i nodi nel cluster.Campionamento di un grande set di dati distribuiti utilizzando pyspark/spark

Sto cercando di ottenere un campione casuale di 10 righe da questo file.

nel guscio pyspark, ho letto il file in un RDD utilizzando:

>>> textFile = sc.textFile("/user/data/myfiles/*") 

e poi voglio prendere semplicemente un campione ... la cosa bella di Spark è che ci sono comandi come takeSample, Purtroppo penso che sto facendo qualcosa di sbagliato, perché il seguente prende un sacco di tempo:

>>> textFile.takeSample(False, 10, 12345) 

così ho cercato di creare una partizione su ciascun nodo, e poi istruendo ogni nodo di assaggiare quella partizione usando il seguente comando:

>>> textFile.partitionBy(4).mapPartitions(lambda blockOfLines: blockOfLines.takeSample(False, 10, 1234)).first() 

ma questo dà un errore ValueError: too many values to unpack:

org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
    File "/opt/cloudera/parcels/CDH-5.0.2-1.cdh5.0.2.p0.13/lib/spark/python/pyspark/worker.py", line 77, in main 
    serializer.dump_stream(func(split_index, iterator), outfile) 
    File "/opt/cloudera/parcels/CDH-5.0.2-1.cdh5.0.2.p0.13/lib/spark/python/pyspark/serializers.py", line 117, in dump_stream 
    for obj in iterator: 
    File "/opt/cloudera/parcels/CDH-5.0.2-1.cdh5.0.2.p0.13/lib/spark/python/pyspark/rdd.py", line 821, in add_shuffle_key 
    for (k, v) in iterator: 
ValueError: too many values to unpack 

Come posso assaggiare 10 linee di dati di grandi dimensioni distribuiti impostato utilizzando scintilla o pyspark?

+0

non credo che questo è un problema con la scintilla, vedere http://stackoverflow.com/questions/7053551/python-valueerror-too-many-values-to-unpack – aaronman

+0

@aaronman siete sulla strada giusta nel percepire che l'errore "troppi valori" è sicuramente un errore python. Aggiungerò ulteriori dettagli sul messaggio di errore. La mia impressione è che ci sia qualcosa di sbagliato nel mio codice pyspark: sei in grado di eseguire questo codice con successo sul tuo setup di accensione? – mgoldwasser

+1

Uso solo la scala scintilla API, penso che lo stile funzionale di scala si adatti molto bene a Mapreduce in generale – aaronman

risposta

11

Utilizzando campione invece di takeSample sembra rendere le cose abbastanza veloce:

textFile.sample(False, .0001, 12345) 

il problema con questo è che è difficile sapere la frazione diritto di scegliere se non si ha una vaga idea del numero di righe il tuo set di dati.

21

Provare a utilizzare textFile.sample(false,fraction,seed) invece. takeSample sarà in genere molto lento perché è calls count() on the RDD. Ha bisogno di farlo perché altrimenti non richiederebbe un numero uniforme di partizioni, in pratica utilizza il conteggio insieme alla dimensione del campione richiesta per calcolare la frazione e chiama internamente il numero sample. sample è veloce perché utilizza solo un generatore booleano casuale che restituisce il vero fraction percento delle volte e quindi non ha bisogno di chiamare count.

Inoltre, non penso che questo stia accadendo ma se la dimensione del campione restituita non è abbastanza grande chiama di nuovo sample che ovviamente può rallentarlo. Dato che dovresti avere un'idea della dimensione dei tuoi dati, ti consiglio di chiamare sample e poi di ridimensionare il campione per ridimensionare te stesso, dal momento che sai di più sui tuoi dati rispetto a spark.

+0

Questo è un po 'strano. Il conteggio non è un'operazione lenta - È ~ 2 ordini di grandezza più veloce di takeSample, suggerendo che questo non è il problema principale. – nbubis

Problemi correlati