2015-04-30 24 views
10

Dopo la ripartizione di un dataframe in Spark 1.3.0 ottengo un'eccezione .parquet da salvare in S3 di Amazon.errore parquet durante il salvataggio da Spark

logsForDate 
    .repartition(10) 
    .saveAsParquetFile(destination) // <-- Exception here 

L'eccezione che ricevo è:

java.io.IOException: The file being written is in an invalid state. Probably caused by an error thrown previously. Current state: COLUMN 
at parquet.hadoop.ParquetFileWriter$STATE.error(ParquetFileWriter.java:137) 
at parquet.hadoop.ParquetFileWriter$STATE.startBlock(ParquetFileWriter.java:129) 
at parquet.hadoop.ParquetFileWriter.startBlock(ParquetFileWriter.java:173) 
at parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:152) 
at parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:112) 
at parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:73) 
at org.apache.spark.sql.parquet.ParquetRelation2.org$apache$spark$sql$parquet$ParquetRelation2$$writeShard$1(newParquet.scala:635) 
at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:649) 
at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:649) 
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) 
at org.apache.spark.scheduler.Task.run(Task.scala:64) 
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
at java.lang.Thread.run(Thread.java:745) 

Vorrei sapere qual è il problema e come risolverlo.

+0

Hai ricevuto l'errore ogni volta o solo qualche volta? Lo prendi anche per file più piccoli? Lo prendi solo su S3 o su altri file system? Hai provato Apache Spark 1.3.1? [Le sue note di rilascio] (http://spark.apache.org/releases/spark-release-1-3-1.html) menzionano alcune correzioni relative al Parquet. –

+0

Ricevo sempre l'errore quando lavoro al di sopra di una certa dimensione del file. Ho provato solo S3. Ho provato 1.3.0.d. – Interfector

+0

Sono in grado di riprodurre questo errore con Spark 1.3.1 su EMR, scrivendo su S3. L'utilizzo del vecchio Parquet API (sqlContext.setConf ("spark.sql.parquet.useDataSourceApi", "false")) non aiuta. Scrivere su HDFS funziona correttamente. –

risposta

4

Posso effettivamente riprodurre questo problema con Spark 1.3.1 su EMR, quando si salva su S3.

Tuttavia, il salvataggio su HDFS funziona correttamente. Puoi innanzitutto salvare su HDFS e quindi utilizzare, ad es. s3distcp per spostare i file su S3.

+0

Avrei preferito una soluzione in cui salvavo direttamente su S3, però. – Interfector

1

Ho riscontrato questo errore quando saveAsParquetFile in HDFS. Era perché datanode socket write timeout, quindi lo cambio ad uno più lungo in Impostazioni Hadoop:

<property> 
    <name>dfs.datanode.socket.write.timeout</name> 
    <value>3000000</value> 
</property> 
<property> 
    <name>dfs.socket.timeout</name> 
    <value>3000000</value> 
</property> 

Spero che questo aiuti, se è possibile impostare S3 come questo.

+0

Non sembra esserci alcuna impostazione equivalente per * s3n * – Interfector

+1

@Interfector, controlla il log del lavoratore e trova se ci sono messaggi utili lì. Vorrei eliminare la risposta più tardi –

+0

Lo stacktrace fornito è l'unica informazione che sono riuscito a trovare. – Interfector

1

Sei sicuro che questo non è dovuto a SPARK-6351 ("FS errato" al salvataggio del parquet in S3)? Se lo è, quindi non ha nulla a che fare con il ripartizionamento, ed è stato risolto in spark-1.3.1. Se però, come me, sei bloccato con la scintilla 1.3.0 perché stai usando CDH-5.4.0, ho appena scoperto ieri sera un modo per aggirarlo direttamente dal codice (nessuna modifica del file di configurazione):

spark.hadoopConfiguration.set("fs.defaultFS", "s3n://mybucket") 

Successivamente, è possibile salvare file di parquet su S3 senza problemi.

Si noti che ci sono diversi inconvenienti a questo, tuttavia. Penso (non ci ho provato) che non riuscirà a scrivere su un altro FS che su S3 e forse anche su un altro bucket. Potrebbe anche costringere Spark a scrivere file temporanei su S3 piuttosto che localmente, ma non l'ho nemmeno verificato.

+0

È possibile eseguire Spark 1.3.1 o 1.4.0 su CDH 5.4 bene. Basta eseguirlo come qualsiasi altra APPARECCHIO YARN –

+0

Sembra fantastico. Come si fa? Ho provato a cambiare la versione spark nel nostro build.sbt, ma si rompe. Uno dei motivi per cui utilizziamo CDH è quello di evitare mal di testa con versioni incompatibili delle dipendenze. –

+0

Non sono sicuro che non sia per quello, ma ho provato con Spark 1.3.1 e non ha funzionato. – Interfector

Problemi correlati