2015-10-16 13 views
7

per esempio risultato di questaCome salvare una scintilla DataFrame come csv sul disco?

df.filter("project = 'en'").select("title","count").groupBy("title").sum() 

che restituisce un array.

Come salvare una scintilla DataFrame come csv su disco?

+1

btw questo non restituisce un array, ma un DataFrame! [riferimento qui] (http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.GroupedData) – eliasah

+0

Se la risposta fornita risolve la tua domanda, ti preghiamo di accettarla e up-vote quindi possiamo classificare questa domanda come risolta! – eliasah

risposta

13

Apache Spark non supporta l'output CSV nativo su disco.

Sono disponibili quattro soluzioni disponibili però:

  1. È possibile convertire i dataframe in un RDD:

    def convertToReadableString(r : Row) = ??? 
    df.rdd.map{ convertToReadableString }.saveAsTextFile(filepath) 
    

    questo creerà un percorso file della cartella. Sotto il percorso del file, troverete i file partizioni (ad esempio, part-000 *)

    Cosa faccio di solito se voglio aggiungere tutte le partizioni in un grande CSV è

    cat filePath/part* > mycsvfile.csv 
    

    Alcuni utilizzerà coalesce(1,false) per creare una partizione dall'RDD. Di solito è una cattiva pratica , poiché potrebbe sovraccaricare il driver estraendo tutti i dati che si stanno raccogliendo.

    Si noti che df.rdd restituirà un RDD[Row].

  2. È possibile utilizzare Databricks scintilla csv library:

    • Spark 1.4+:

      df.write.format("com.databricks.spark.csv").save(filepath) 
      
    • Spark 1.3:

      df.save(filepath,"com.databricks.spark.csv") 
      
  3. Wit h Spark 2.x il pacchetto spark-csv non è necessario poiché è incluso in Spark.

    df.write.format("csv").save(filepath) 
    
  4. È possibile convertire in frame di dati Panda locale e utilizzare il metodo to_csv (PySpark solo).

Nota: Solutions 1, 2 e 3 si tradurrà in file in formato CSV (part-*) generati dal sottostante API Hadoop quella scintilla chiamate quando si richiama save. Avrai un file part- per partizione.

+1

Penso che 'spark-csv' sia la soluzione preferita. Non è facile creare una linea csv corretta da zero. Tutti i dialetti e la corretta escaping possono essere abbastanza complicati. – zero323

+0

Sono totalmente d'accordo – eliasah

+1

In PySpark è anche possibile convertire la tabella piccola in Panda e salvare localmente. ma probabilmente una domanda di Scala. – zero323

0

Ho avuto un problema simile. Avevo bisogno di scrivere il file csv sul driver mentre ero connesso al cluster in modalità client.

Ho voluto riutilizzare lo stesso codice di analisi CSV di Apache Spark per evitare potenziali errori.

Ho controllato il codice spark-csv e il codice trovato responsabile della conversione del dataframe in csv grezzo RDD[String] in com.databricks.spark.csv.CsvSchemaRDD.

Tristemente codificato con sc.textFile e la fine del metodo pertinente.

Ho copiato il codice e rimosso le ultime righe con sc.textFile e ho restituito direttamente RDD.

Il mio codice:

/* 
    This is copypasta from com.databricks.spark.csv.CsvSchemaRDD 
    Spark's code has perfect method converting Dataframe -> raw csv RDD[String] 
    But in last lines of that method it's hardcoded against writing as text file - 
    for our case we need RDD. 
*/ 
object DataframeToRawCsvRDD { 

    val defaultCsvFormat = com.databricks.spark.csv.defaultCsvFormat 

    def apply(dataFrame: DataFrame, parameters: Map[String, String] = Map()) 
      (implicit ctx: ExecutionContext): RDD[String] = { 
    val delimiter = parameters.getOrElse("delimiter", ",") 
    val delimiterChar = if (delimiter.length == 1) { 
     delimiter.charAt(0) 
    } else { 
     throw new Exception("Delimiter cannot be more than one character.") 
    } 

    val escape = parameters.getOrElse("escape", null) 
    val escapeChar: Character = if (escape == null) { 
     null 
    } else if (escape.length == 1) { 
     escape.charAt(0) 
    } else { 
     throw new Exception("Escape character cannot be more than one character.") 
    } 

    val quote = parameters.getOrElse("quote", "\"") 
    val quoteChar: Character = if (quote == null) { 
     null 
    } else if (quote.length == 1) { 
     quote.charAt(0) 
    } else { 
     throw new Exception("Quotation cannot be more than one character.") 
    } 

    val quoteModeString = parameters.getOrElse("quoteMode", "MINIMAL") 
    val quoteMode: QuoteMode = if (quoteModeString == null) { 
     null 
    } else { 
     QuoteMode.valueOf(quoteModeString.toUpperCase) 
    } 

    val nullValue = parameters.getOrElse("nullValue", "null") 

    val csvFormat = defaultCsvFormat 
     .withDelimiter(delimiterChar) 
     .withQuote(quoteChar) 
     .withEscape(escapeChar) 
     .withQuoteMode(quoteMode) 
     .withSkipHeaderRecord(false) 
     .withNullString(nullValue) 

    val generateHeader = parameters.getOrElse("header", "false").toBoolean 
    val headerRdd = if (generateHeader) { 
     ctx.sparkContext.parallelize(Seq(
     csvFormat.format(dataFrame.columns.map(_.asInstanceOf[AnyRef]): _*) 
    )) 
    } else { 
     ctx.sparkContext.emptyRDD[String] 
    } 

    val rowsRdd = dataFrame.rdd.map(row => { 
     csvFormat.format(row.toSeq.map(_.asInstanceOf[AnyRef]): _*) 
    }) 

    headerRdd union rowsRdd 
    } 

} 
0

ho avuto problema simile in cui ho dovuto salvare il contenuto del dataframe in un file CSV del nome che ho definito. df.write("csv").save("<my-path>") stava creando directory piuttosto che file. Quindi devono venire con le seguenti soluzioni. La maggior parte del codice è presa dal seguente dataframe-to-csv con piccole modifiche alla logica.

def saveDfToCsv(df: DataFrame, tsvOutput: String, sep: String = ",", header: Boolean = false): Unit = { 
    val tmpParquetDir = "Posts.tmp.parquet" 

    df.repartition(1).write. 
     format("com.databricks.spark.csv"). 
     option("header", header.toString). 
     option("delimiter", sep). 
     save(tmpParquetDir) 

    val dir = new File(tmpParquetDir) 
    val newFileRgex = tmpParquetDir + File.separatorChar + ".part-00000.*.csv" 
    val tmpTsfFile = dir.listFiles.filter(_.toPath.toString.matches(newFileRgex))(0).toString 
    (new File(tmpTsvFile)).renameTo(new File(tsvOutput)) 

    dir.listFiles.foreach(f => f.delete) 
    dir.delete 
    } 
Problemi correlati