2015-06-23 20 views
6

Sto esplorando Spark per l'elaborazione in batch. Sto facendo scoccare la scintilla sulla mia macchina locale usando la modalità standalone.Scrivere RDD come file di testo utilizzando Apache Spark

Sto cercando di convertire Spark RDD come singolo file [output finale] utilizzando il metodo saveTextFile(), ma non funziona.

Ad esempio, se si dispone di più di una partizione, è possibile ottenere un singolo file come output finale.

Aggiornamento:

Ho provato gli approcci di seguito, ma io sono sempre un'eccezione di puntatore nullo.

person.coalesce(1).toJavaRDD().saveAsTextFile("C://Java_All//output"); 
person.repartition(1).toJavaRDD().saveAsTextFile("C://Java_All//output"); 

L'eccezione è:

15/06/23 18:25:27 INFO Executor: Running task 0.0 in stage 1.0 (TID 1) 
15/06/23 18:25:27 INFO deprecation: mapred.output.dir is deprecated. Instead, use mapreduce.output.fileoutputformat.outputdir 
15/06/23 18:25:27 INFO deprecation: mapred.output.key.class is deprecated. Instead, use mapreduce.job.output.key.class 
15/06/23 18:25:27 INFO deprecation: mapred.output.value.class is deprecated. Instead, use mapreduce.job.output.value.class 
15/06/23 18:25:27 INFO deprecation: mapred.working.dir is deprecated. Instead, use mapreduce.job.working.dir 
15/06/23 18:25:27 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1) 
java.lang.NullPointerException 
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1012) 
    at org.apache.hadoop.util.Shell.runCommand(Shell.java:404) 
    at org.apache.hadoop.util.Shell.run(Shell.java:379) 
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:589) 
    at org.apache.hadoop.util.Shell.execCommand(Shell.java:678) 
    at org.apache.hadoop.util.Shell.execCommand(Shell.java:661) 
    at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:639) 
    at org.apache.hadoop.fs.FilterFileSystem.setPermission(FilterFileSystem.java:468) 
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:456) 
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:424) 
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:905) 
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:798) 
    at org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:123) 
    at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:90) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1104) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1095) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63) 
    at org.apache.spark.scheduler.Task.run(Task.scala:70) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
15/06/23 18:25:27 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, localhost): java.lang.NullPointerException 
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1012) 
    at org.apache.hadoop.util.Shell.runCommand(Shell.java:404) 
    at org.apache.hadoop.util.Shell.run(Shell.java:379) 
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:589) 
    at org.apache.hadoop.util.Shell.execCommand(Shell.java:678) 
    at org.apache.hadoop.util.Shell.execCommand(Shell.java:661) 
    at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:639) 
    at org.apache.hadoop.fs.FilterFileSystem.setPermission(FilterFileSystem.java:468) 
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:456) 
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:424) 
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:905) 
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:798) 
    at org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:123) 
    at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:90) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1104) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1095) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63) 
    at org.apache.spark.scheduler.Task.run(Task.scala:70) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

15/06/23 18:25:27 ERROR TaskSetManager: Task 0 in stage 1.0 failed 1 times; aborting job 
15/06/23 18:25:27 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
15/06/23 18:25:27 INFO TaskSchedulerImpl: Cancelling stage 1 
15/06/23 18:25:27 INFO DAGScheduler: ResultStage 1 (saveAsTextFile at TestSpark.java:40) failed in 0.249 s 
15/06/23 18:25:28 INFO DAGScheduler: Job 0 failed: saveAsTextFile at TestSpark.java:40, took 0.952286 s 
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost): java.lang.NullPointerException 
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1012) 
    at org.apache.hadoop.util.Shell.runCommand(Shell.java:404) 
    at org.apache.hadoop.util.Shell.run(Shell.java:379) 
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:589) 
    at org.apache.hadoop.util.Shell.execCommand(Shell.java:678) 
    at org.apache.hadoop.util.Shell.execCommand(Shell.java:661) 
    at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:639) 
    at org.apache.hadoop.fs.FilterFileSystem.setPermission(FilterFileSystem.java:468) 
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:456) 
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:424) 
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:905) 
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:798) 
    at org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:123) 
    at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:90) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1104) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1095) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63) 
    at org.apache.spark.scheduler.Task.run(Task.scala:70) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730) 
    at scala.Option.foreach(Option.scala:236) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
15/06/23 18:25:28 INFO SparkContext: Invoking stop() from shutdown hook 
15/06/23 18:25:28 INFO SparkUI: Stopped Spark web UI at http://10.37.145.179:4040 
15/06/23 18:25:28 INFO DAGScheduler: Stopping DAGScheduler 
15/06/23 18:25:28 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 
15/06/23 18:25:28 INFO Utils: path = C:\Users\crh537\AppData\Local\Temp\spark-a52371d8-ae6a-4567-b759-0a6c66c1908c\blockmgr-4d17a5b4-c8f8-4408-af07-0e88239794e8, already present as root for deletion. 
15/06/23 18:25:28 INFO MemoryStore: MemoryStore cleared 
15/06/23 18:25:28 INFO BlockManager: BlockManager stopped 
15/06/23 18:25:28 INFO BlockManagerMaster: BlockManagerMaster stopped 
15/06/23 18:25:28 INFO SparkContext: Successfully stopped SparkContext 
15/06/23 18:25:28 INFO Utils: Shutdown hook called 

saluti, Shankar

+0

beh, il tuo rdd si sta svuotando da qualche parte. non possiamo aiutarti a trovare l'errore con la porzione di codice che ci hai fornito .. ti consiglio di provare almeno a contare il tuo controllo rdd se è vuoto e fallo uno per uno! – eliasah

+0

È possibile verificare le autorizzazioni FileSystem o HDFS per quella particolare cartella. Inoltre è possibile aggiungere il protocollo prima del percorso del filesystem. Inoltre, come menzionato in precedenza, potresti aver bisogno di impostare WinUtils nel tuo percorso di sistema. Se vuoi eseguire le cose relative a hadoop sul tuo Local. –

risposta

5

È possibile utilizzare coalesce metodo per salvare in un unico file. In questo modo il codice sarà simile a questa:

val myFile = sc.textFile("file.txt") 
val finalRdd = doStuff(myFile) 
finalRdd.coalesce(1).saveAsTextFile("newfile") 

C'è anche un altro metodo repartition a fare la stessa cosa, ma causerà un riordino che è può essere molto costoso, mentre si fondono cercherà di evitare uno shuffle.

+0

sto usando Java per implementare Spark, ma sto ricevendo l'eccezione, ho aggiornato la domanda con i dettagli delle eccezioni. – Shankar

+2

Sembra che stia cercando di scrivere un file e fallisce. Puoi controllare se hai i permessi per scrivere nella directory? Inoltre, poiché Spark è pigro, può darsi che il problema sia di persona rdd. Puoi eseguire 'person.coalesce (1) .toJavaRDD(). Count()' per assicurarti che produca un numero di righe e non lancia l'eccezione? – Maksud

+0

quando uso saveAsTextFile ("") dove salverà il file, intendo quale nodo (worker o driver). Possiamo anche dare un nome specifico al file come file di output? – Shankar

-1

È possibile utilizzare il metodo di ripartizione in RDD. In realtà crea tante partizioni quante ne hai passato integer. Nel tuo caso sarà:

rdd.repartition(1).saveAsTextFile("path to save rdd") 
+0

sto usando Java per implementare Spark, ma sto ricevendo l'eccezione, ho aggiornato la domanda con i dettagli delle eccezioni. – Shankar

-1
  1. Scarica winutils.exe
  2. Luogo winutils.exe sotto la cartella bin di qualsiasi unità (D:/Winutils/bin /)
  3. Impostare il percorso nel codice come riportato sotto

    System.setProperty ("hadoop.home.dir", "D: \\ Winutils \\");

Ora eseguire il codice, deve funzionare.

Problemi correlati