Ho avuto un problema simile. Avevo bisogno di scrivere il file csv sul driver mentre ero connesso al cluster in modalità client.
Ho voluto riutilizzare lo stesso codice di analisi CSV di Apache Spark per evitare potenziali errori.
Ho controllato il codice spark-csv e il codice trovato responsabile della conversione del dataframe in csv grezzo RDD[String]
in com.databricks.spark.csv.CsvSchemaRDD
.
Tristemente codificato con sc.textFile
e la fine del metodo pertinente.
Ho copiato il codice e rimosso le ultime righe con sc.textFile
e ho restituito direttamente RDD.
Il mio codice:
/*
This is copypasta from com.databricks.spark.csv.CsvSchemaRDD
Spark's code has perfect method converting Dataframe -> raw csv RDD[String]
But in last lines of that method it's hardcoded against writing as text file -
for our case we need RDD.
*/
object DataframeToRawCsvRDD {
val defaultCsvFormat = com.databricks.spark.csv.defaultCsvFormat
def apply(dataFrame: DataFrame, parameters: Map[String, String] = Map())
(implicit ctx: ExecutionContext): RDD[String] = {
val delimiter = parameters.getOrElse("delimiter", ",")
val delimiterChar = if (delimiter.length == 1) {
delimiter.charAt(0)
} else {
throw new Exception("Delimiter cannot be more than one character.")
}
val escape = parameters.getOrElse("escape", null)
val escapeChar: Character = if (escape == null) {
null
} else if (escape.length == 1) {
escape.charAt(0)
} else {
throw new Exception("Escape character cannot be more than one character.")
}
val quote = parameters.getOrElse("quote", "\"")
val quoteChar: Character = if (quote == null) {
null
} else if (quote.length == 1) {
quote.charAt(0)
} else {
throw new Exception("Quotation cannot be more than one character.")
}
val quoteModeString = parameters.getOrElse("quoteMode", "MINIMAL")
val quoteMode: QuoteMode = if (quoteModeString == null) {
null
} else {
QuoteMode.valueOf(quoteModeString.toUpperCase)
}
val nullValue = parameters.getOrElse("nullValue", "null")
val csvFormat = defaultCsvFormat
.withDelimiter(delimiterChar)
.withQuote(quoteChar)
.withEscape(escapeChar)
.withQuoteMode(quoteMode)
.withSkipHeaderRecord(false)
.withNullString(nullValue)
val generateHeader = parameters.getOrElse("header", "false").toBoolean
val headerRdd = if (generateHeader) {
ctx.sparkContext.parallelize(Seq(
csvFormat.format(dataFrame.columns.map(_.asInstanceOf[AnyRef]): _*)
))
} else {
ctx.sparkContext.emptyRDD[String]
}
val rowsRdd = dataFrame.rdd.map(row => {
csvFormat.format(row.toSeq.map(_.asInstanceOf[AnyRef]): _*)
})
headerRdd union rowsRdd
}
}
btw questo non restituisce un array, ma un DataFrame! [riferimento qui] (http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.GroupedData) – eliasah
Se la risposta fornita risolve la tua domanda, ti preghiamo di accettarla e up-vote quindi possiamo classificare questa domanda come risolta! – eliasah