
Spark job fails when calling first() in PySpark

I just built Spark on a Windows 7 machine (using sbt) and I'm working through the quick start. The Spark job fails when first() is called.

I'm new to Java and don't have a clear idea of what the error stack trace is telling me, although it seems related to java.net.SocketException given the message. Note that I'm not using a Hadoop installation. Also note that when I run this example in Scala, there are no errors.

Environment:

Windows 7
Spark 1.2.1
Anaconda Python 2.7.8
Scala 2.10.4
sbt 0.13.7
JDK 1.7.0_75

In [2]: path = u'C:\\Users\\striji\\Documents\\Personal\\python\\pyspark-flights\\2001.csv.bz2' 

In [3]: textFile = sc.textFile(path) 

In [4]: textFile 
Out[4]: C:\Users\striji\Documents\Personal\python\pyspark-flights\2001.csv.bz2 MappedRDD[1] at textFile at NativeMethodAccessorImpl.java:-2 

In [5]: textFile.count() 
     ... 
Out[5]: 5967781 

In [6]: textFile.first() 
15/02/19 08:52:01 INFO SparkContext: Starting job: runJob at PythonRDD.scala:344 
15/02/19 08:52:01 INFO DAGScheduler: Got job 1 (runJob at PythonRDD.scala:344) with 1 output partitions (allowLocal=true) 
15/02/19 08:52:01 INFO DAGScheduler: Final stage: Stage 1(runJob at PythonRDD.scala:344) 
15/02/19 08:52:01 INFO DAGScheduler: Parents of final stage: List() 
15/02/19 08:52:01 INFO DAGScheduler: Missing parents: List() 
15/02/19 08:52:01 INFO DAGScheduler: Submitting Stage 1 (PythonRDD[3] at RDD at PythonRDD.scala:43), which has no missing parents 
15/02/19 08:52:01 INFO MemoryStore: ensureFreeSpace(4560) called with curMem=46832, maxMem=278302556 
15/02/19 08:52:01 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 4.5 KB, free 265.4 MB) 
15/02/19 08:52:01 INFO MemoryStore: ensureFreeSpace(3417) called with curMem=51392, maxMem=278302556 
15/02/19 08:52:01 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 3.3 KB, free 265.4 MB) 
15/02/19 08:52:01 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:51106 (size: 3.3 KB, free: 265.4 MB) 
15/02/19 08:52:01 INFO BlockManagerMaster: Updated info of block broadcast_2_piece0 
15/02/19 08:52:01 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:838 
15/02/19 08:52:01 INFO DAGScheduler: Submitting 1 missing tasks from Stage 1 (PythonRDD[3] at RDD at PythonRDD.scala:43) 
15/02/19 08:52:01 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks 
15/02/19 08:52:01 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, PROCESS_LOCAL, 1341 bytes) 
15/02/19 08:52:01 INFO Executor: Running task 0.0 in stage 1.0 (TID 1) 
15/02/19 08:52:04 INFO HadoopRDD: Input split: file:/C:/Users/striji/Documents/Personal/python/pyspark-flights/2001.csv.bz2:0+83478700 
15/02/19 08:52:04 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1) 
java.net.SocketException: Connection reset 
     at java.net.SocketInputStream.read(SocketInputStream.java:196) 
     at java.net.SocketInputStream.read(SocketInputStream.java:122) 
     at java.io.BufferedInputStream.fill(BufferedInputStream.java:235) 
     at java.io.BufferedInputStream.read(BufferedInputStream.java:254) 
     at java.io.DataInputStream.readInt(DataInputStream.java:387) 
     at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:110) 
     at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174) 
     at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96) 
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) 
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) 
     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) 
     at org.apache.spark.scheduler.Task.run(Task.scala:56) 
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
     at java.lang.Thread.run(Thread.java:745) 
15/02/19 08:52:05 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, localhost): java.net.SocketException: Connection reset 
     at java.net.SocketInputStream.read(SocketInputStream.java:196) 
     at java.net.SocketInputStream.read(SocketInputStream.java:122) 
     at java.io.BufferedInputStream.fill(BufferedInputStream.java:235) 
     at java.io.BufferedInputStream.read(BufferedInputStream.java:254) 
     at java.io.DataInputStream.readInt(DataInputStream.java:387) 
     at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:110) 
     at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174) 
     at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96) 
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) 
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) 
     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) 
     at org.apache.spark.scheduler.Task.run(Task.scala:56) 
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
     at java.lang.Thread.run(Thread.java:745) 

15/02/19 08:52:05 ERROR TaskSetManager: Task 0 in stage 1.0 failed 1 times; aborting job 
15/02/19 08:52:05 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
15/02/19 08:52:05 INFO TaskSchedulerImpl: Cancelling stage 1 
15/02/19 08:52:05 INFO DAGScheduler: Job 1 failed: runJob at PythonRDD.scala:344, took 3.796728 s 
--------------------------------------------------------------------------- 
Py4JJavaError        Traceback (most recent call last) 
<ipython-input-6-674a86098a8f> in <module>() 
----> 1 textFile.first() 

c:\spark-1.2.1\python\pyspark\rdd.pyc in first(self) 
    1137   ValueError: RDD is empty 
    1138   """ 
-> 1139   rs = self.take(1) 
    1140   if rs: 
    1141    return rs[0] 

c:\spark-1.2.1\python\pyspark\rdd.pyc in take(self, num) 
    1119 
    1120    p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts)) 
-> 1121    res = self.context.runJob(self, takeUpToNumLeft, p, True) 
    1122 
    1123    items += res 

c:\spark-1.2.1\python\pyspark\context.pyc in runJob(self, rdd, partitionFunc, partitions, allowLocal) 
    825   # SparkContext#runJob. 
    826   mappedRDD = rdd.mapPartitions(partitionFunc) 
--> 827   it = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal) 
    828   return list(mappedRDD._collect_iterator_through_file(it)) 
    829 

c:\spark-1.2.1\python\lib\py4j-0.8.2.1-src.zip\py4j\java_gateway.py in __call__(self, *args) 
    536   answer = self.gateway_client.send_command(command) 
    537   return_value = get_return_value(answer, self.gateway_client, 
--> 538     self.target_id, self.name) 
    539 
    540   for temp_arg in temp_args: 

c:\spark-1.2.1\python\lib\py4j-0.8.2.1-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name) 
    298     raise Py4JJavaError(
    299      'An error occurred while calling {0}{1}{2}.\n'. 
--> 300      format(target_id, '.', name), value) 
    301    else: 
    302     raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost): java.net.SocketException: Connection reset 
     at java.net.SocketInputStream.read(SocketInputStream.java:196) 
     at java.net.SocketInputStream.read(SocketInputStream.java:122) 
     at java.io.BufferedInputStream.fill(BufferedInputStream.java:235) 
     at java.io.BufferedInputStream.read(BufferedInputStream.java:254) 
     at java.io.DataInputStream.readInt(DataInputStream.java:387) 
     at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:110) 
     at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174) 
     at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96) 
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) 
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) 
     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) 
     at org.apache.spark.scheduler.Task.run(Task.scala:56) 
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
     at java.lang.Thread.run(Thread.java:745) 

Driver stacktrace: 
     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202) 
     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) 
     at scala.Option.foreach(Option.scala:236) 
     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696) 
     at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420) 
     at akka.actor.Actor$class.aroundReceive(Actor.scala:465) 
     at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375) 
     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) 
     at akka.actor.ActorCell.invoke(ActorCell.scala:487) 
     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) 
     at akka.dispatch.Mailbox.run(Mailbox.scala:220) 
     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) 
     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 

Answer


According to the error, it's because your RDD is empty.

You're calling first() on something that doesn't exist.

Try this PySpark example:

People = ["1,Maj,123", "2,Pvt,333", "3,Col,999"]
rrd1 = sc.parallelize(People)
rrd1.first()

It should output:

'1,Maj,123'
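
For reference, calling first() on a genuinely empty RDD fails differently. Here is a minimal sketch, based on the first() docstring fragment visible in the traceback above; in a working PySpark shell it raises a ValueError rather than a SocketException:

empty = sc.parallelize([])   # an RDD with no elements
empty.first()                # raises ValueError: RDD is empty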


I still get the same error using your code. – Pete
