ho un po 'codice come questo:Spark Scala ottenere i dati da rdd.foreachPartition
println("\nBEGIN Last Revs Class: "+ distinctFileGidsRDD.getClass)
val lastRevs = distinctFileGidsRDD.
foreachPartition(iter => {
SetupJDBC(jdbcDriver, jdbcUrl, jdbcUser, jdbcPassword)
while(iter.hasNext) {
val item = iter.next()
//println(item(0))
println("String: "+item(0).toString())
val jsonStr = DB.readOnly { implicit session =>
sql"SELECT jsonStr FROM lasttail WHERE fileGId = ${item(0)}::varchar".
map { resultSet => resultSet.string(1) }.single.apply()
}
println("\nJSON: "+jsonStr)
}
})
println("\nEND Last Revs Class: "+ lastRevs.getClass)
Le uscite di codice (con modifiche pesanti) qualcosa come:
BEGIN Last Revs Class: class org.apache.spark.rdd.MapPartitionsRDD
String: 1fqhSXPE3GwrJ6SZzC65gJnBaB5_b7j3pWNSfqzU5FoM
JSON: Some({"Struct":{"fileGid":"1fqhSXPE3GwrJ6SZzC65gJnBaB5_b7j3pWNSfqzU5FoM",...)
String: 1eY2wxoVq17KGMUBzCZZ34J9gSNzF038grf5RP38DUxw
JSON: Some({"Struct":{"fileGid":"1fqhSXPE3GwrJ6SZzC65gJnBaB5_b7j3pWNSfqzU5FoM",...)
...
JSON: None()
END Last Revs Class: void
DOMANDA 1: Come può Ottengo il valore lastRevs in un formato utile come la stringa JSON/null o un'opzione come Some/None?
DOMANDA 2: La mia preferenza: esiste un altro modo per ottenere nei dati delle partizioni un formato simile a RDD (piuttosto che il formato iteratore)?
dstream.foreachRDD { (rdd, time) =>
rdd.foreachPartition { partitionIterator =>
val partitionId = TaskContext.get.partitionId()
val uniqueId = generateUniqueId(time.milliseconds, partitionId)
// use this uniqueId to transactionally commit the data in partitionIterator
}
}
da http://spark.apache.org/docs/latest/streaming-programming-guide.html#performance-tuning
DOMANDA 3: è il metodo di ottenere i dati che sto usando un metodo sano di mente (dato sto seguendo il link qui sopra)? (Metti da parte il fatto che questo è un sistema scalikejdbc JDBC in questo momento. Questo sarà un deposito chiave di valore diverso da questo prototipo.)
Non capisco la domanda. 'lastRevs' dovrebbe essere' Unit' perché '.forEachPartition' è usato solo per il suo effetto collaterale (la funzione è T => Unit). Penso che tu voglia trasformare i dati, come invece usare 'mapPartitions'. Mi piacerebbe capire qual è l'obiettivo generale qui, perché le singole domande non hanno molto senso (per me) – maasg
@maasg: Sì. Questa è la risposta che sto cercando - mapPartitions. Ho trovato un altro esempio su http://stackoverflow.com/questions/21698443/spark-best-practice-for-retrieving-big-data-from-rdd-to-local-machine. – codeaperature