Dopo la ripartizione di un dataframe in Spark 1.3.0 ottengo un'eccezione .parquet da salvare in S3 di Amazon.errore parquet durante il salvataggio da Spark
logsForDate
.repartition(10)
.saveAsParquetFile(destination) // <-- Exception here
L'eccezione che ricevo è:
java.io.IOException: The file being written is in an invalid state. Probably caused by an error thrown previously. Current state: COLUMN
at parquet.hadoop.ParquetFileWriter$STATE.error(ParquetFileWriter.java:137)
at parquet.hadoop.ParquetFileWriter$STATE.startBlock(ParquetFileWriter.java:129)
at parquet.hadoop.ParquetFileWriter.startBlock(ParquetFileWriter.java:173)
at parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:152)
at parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:112)
at parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:73)
at org.apache.spark.sql.parquet.ParquetRelation2.org$apache$spark$sql$parquet$ParquetRelation2$$writeShard$1(newParquet.scala:635)
at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:649)
at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:649)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Vorrei sapere qual è il problema e come risolverlo.
Hai ricevuto l'errore ogni volta o solo qualche volta? Lo prendi anche per file più piccoli? Lo prendi solo su S3 o su altri file system? Hai provato Apache Spark 1.3.1? [Le sue note di rilascio] (http://spark.apache.org/releases/spark-release-1-3-1.html) menzionano alcune correzioni relative al Parquet. –
Ricevo sempre l'errore quando lavoro al di sopra di una certa dimensione del file. Ho provato solo S3. Ho provato 1.3.0.d. – Interfector
Sono in grado di riprodurre questo errore con Spark 1.3.1 su EMR, scrivendo su S3. L'utilizzo del vecchio Parquet API (sqlContext.setConf ("spark.sql.parquet.useDataSourceApi", "false")) non aiuta. Scrivere su HDFS funziona correttamente. –