
What causes the exception "Randomness of hash of string should be disabled via PYTHONHASHSEED" in PySpark?

I am trying to build a dictionary from a list in PySpark. I have the following list of lists:

rawPositions 

[[1009794, 'LPF6 Comdty', 'BC22', 'Enterprise', 3.0, 3904.125, 390412.5], 
[1009794, 'LPF6 Comdty', 'BC22', 'Enterprise', 3.0, 3900.75, 390075.0], 
[1009794, 'LPF6 Comdty', 'BC22', 'Enterprise', 3.0, 3882.5625, 388256.25], 
[1009794, 'LPF6 Comdty', 'BC22', 'Enterprise', 3.0, 3926.25, 392625.0], 
[2766232, 
    'CDX IG CDSI S25 V1 5Y CBBT CORP', 
    'BC85', 
    'Enterprise', 
    30000000.0, 
    -16323.2439825, 
    30000000.0], 
[2766232, 
    'CDX IG CDSI S25 V1 5Y CBBT CORP', 
    'BC85', 
    'Enterprise', 
    30000000.0, 
    -16928.620101900004, 
    30000000.0], 
[1009804, 'LPM6 Comdty', 'BC29', 'Jet', 105.0, 129596.25, 12959625.0], 
[1009804, 'LPM6 Comdty', 'BC29', 'Jet', 128.0, 162112.0, 16211200.0], 
[1009804, 'LPM6 Comdty', 'BC29', 'Jet', 135.0, 167146.875, 16714687.5], 
[1009804, 'LPM6 Comdty', 'BC29', 'Jet', 109.0, 132884.625, 13288462.5]] 

Then, using my SparkContext variable sc, I parallelize the list:

i = sc.parallelize(rawPositions) 
#i.collect() 

Then I try to turn it into a dictionary by using a groupBy on the element at index 3 of each list entry:

j = i.groupBy(lambda x: x[3]) 
j.collect() 

--------------------------------------------------------------------------- 
Py4JJavaError        Traceback (most recent call last) 
<ipython-input-143-6113a75f0a9e> in <module>() 
     2 #i.collect() 
     3 j = i.groupBy(lambda x: x[3]) 
----> 4 j.collect() 

/net/nas/uxhome/condor_ldrt-s/spark-1.6.1-bin-hadoop2.6/python/pyspark/rdd.py in collect(self) 
    769   """ 
    770   with SCCallSiteSync(self.context) as css: 
--> 771    port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd()) 
    772   return list(_load_from_socket(port, self._jrdd_deserializer)) 
    773 

/net/nas/uxhome/condor_ldrt-s/spark-1.6.1-bin-hadoop2.6/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py in __call__(self, *args) 
    811   answer = self.gateway_client.send_command(command) 
    812   return_value = get_return_value(
--> 813    answer, self.gateway_client, self.target_id, self.name) 
    814 
    815   for temp_arg in temp_args: 

/net/nas/uxhome/condor_ldrt-s/spark-1.6.1-bin-hadoop2.6/python/pyspark/sql/utils.py in deco(*a, **kw) 
    43  def deco(*a, **kw): 
    44   try: 
---> 45    return f(*a, **kw) 
    46   except py4j.protocol.Py4JJavaError as e: 
    47    s = e.java_exception.toString() 

/net/nas/uxhome/condor_ldrt-s/spark-1.6.1-bin-hadoop2.6/python/lib/py4j-0.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name) 
    306     raise Py4JJavaError(
    307      "An error occurred while calling {0}{1}{2}.\n". 
--> 308      format(target_id, ".", name), value) 
    309    else: 
    310     raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 14 in stage 50.0 failed 4 times, most recent failure: Lost task 14.3 in stage 50.0 (TID 7583, brllxhtce01.bluecrest.local): org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
    File "/net/nas/uxhome/condor_ldrt-s/spark-1.6.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main 
    process() 
    File "/net/nas/uxhome/condor_ldrt-s/spark-1.6.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process 
    serializer.dump_stream(func(split_index, iterator), outfile) 
    File "/net/nas/uxhome/condor_ldrt-s/spark-1.6.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 133, in dump_stream 
    for obj in iterator: 
    File "/net/nas/uxhome/condor_ldrt-s/spark-1.6.1-bin-hadoop2.6/python/pyspark/rdd.py", line 1703, in add_shuffle_key 
    buckets[partitionFunc(k) % numPartitions].append((k, v)) 
    File "/net/nas/uxhome/condor_ldrt-s/spark-1.6.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/rdd.py", line 74, in portable_hash 
    raise Exception("Randomness of hash of string should be disabled via PYTHONHASHSEED") 
Exception: Randomness of hash of string should be disabled via PYTHONHASHSEED 

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166) 
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207) 
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125) 
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
    at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:342) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
    at org.apache.spark.scheduler.Task.run(Task.scala:89) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 

Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) 
    at scala.Option.foreach(Option.scala:236) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929) 
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) 
    at org.apache.spark.rdd.RDD.collect(RDD.scala:926) 
    at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:405) 
    at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala) 
    at sun.reflect.GeneratedMethodAccessor31.invoke(Unknown Source) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:606) 
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) 
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381) 
    at py4j.Gateway.invoke(Gateway.java:259) 
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) 
    at py4j.commands.CallCommand.execute(CallCommand.java:79) 
    at py4j.GatewayConnection.run(GatewayConnection.java:209) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
    File "/net/nas/uxhome/condor_ldrt-s/spark-1.6.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main 
    process() 
    File "/net/nas/uxhome/condor_ldrt-s/spark-1.6.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process 
    serializer.dump_stream(func(split_index, iterator), outfile) 
    File "/net/nas/uxhome/condor_ldrt-s/spark-1.6.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 133, in dump_stream 
    for obj in iterator: 
    File "/net/nas/uxhome/condor_ldrt-s/spark-1.6.1-bin-hadoop2.6/python/pyspark/rdd.py", line 1703, in add_shuffle_key 
    buckets[partitionFunc(k) % numPartitions].append((k, v)) 
    File "/net/nas/uxhome/condor_ldrt-s/spark-1.6.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/rdd.py", line 74, in portable_hash 
    raise Exception("Randomness of hash of string should be disabled via PYTHONHASHSEED") 
Exception: Randomness of hash of string should be disabled via PYTHONHASHSEED 

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166) 
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207) 
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125) 
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
    at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:342) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
    at org.apache.spark.scheduler.Task.run(Task.scala:89) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    ... 1 more 

I have no idea what this error refers to... any help would be great! (A rough sketch of the dictionary I am ultimately after is shown below.)
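For reference, here is a minimal sketch of the final step I have in mind (my own sketch, not code from the failing run, and assuming the groupBy itself succeeds); mapValues and collectAsMap are standard PySpark RDD methods:

# Group rows by the value at index 3 ('Enterprise' / 'Jet') and bring them
# back to the driver as a plain Python dict of key -> list of rows.
grouped = i.groupBy(lambda x: x[3])               # RDD of (key, iterable of rows)
positionsByBook = grouped.mapValues(list).collectAsMap()
# positionsByBook['Jet'] would then hold the four 'Jet' rows.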

Answer


Since Python 3.2.3+, the hashes of str, bytes and datetime objects are salted with a random value to prevent certain kinds of denial-of-service attacks. This means that hash values are consistent within a single interpreter session but differ from session to session. PYTHONHASHSEED sets the RNG seed so that hashes are consistent across sessions.

You can easily check this in your shell. If PYTHONHASHSEED is not set, you will get random values:

unset PYTHONHASHSEED
for i in `seq 1 3`; do
    python3 -c "print(hash('foo'))"
done

## -7298483006336914254 
## -6081529125171670673 
## -3642265530762908581 

but when it is set, you get the same value on every run:

export PYTHONHASHSEED=323
for i in `seq 1 3`; do
    python3 -c "print(hash('foo'))"
done

## 8902216175227028661 
## 8902216175227028661 
## 8902216175227028661 

Since groupBy and other operations that depend on the default partitioner use hashing, set the same PYTHONHASHSEED value on every machine in the cluster to get consistent results. One way to do this is sketched below.
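A minimal sketch of one possible approach (my assumption about a convenient setup, not something from the original post): the documented spark.executorEnv.* configuration keys propagate environment variables to the executor processes, and the seed value 323 here is arbitrary, it just has to be identical everywhere.

from pyspark import SparkConf, SparkContext

# Propagate a fixed PYTHONHASHSEED to every executor. The driver's own Python
# should also be started with the same PYTHONHASHSEED exported in its environment.
conf = (SparkConf()
        .setAppName("rawPositions")
        .set("spark.executorEnv.PYTHONHASHSEED", "323"))
sc = SparkContext(conf=conf)

Alternatively, exporting PYTHONHASHSEED in conf/spark-env.sh on every node achieves the same effect.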



Thanks. This solved it! – ThatDataGuy


Wow, this should really be in the "getting started with Spark" tutorial. I guess it isn't there because Spark runs on Python 2.7 by default. Thanks, you saved me. – sudo
