2016-04-30 14 views
6

ho un po 'codice come questo:Spark Scala ottenere i dati da rdd.foreachPartition

 println("\nBEGIN Last Revs Class: "+ distinctFileGidsRDD.getClass) 
     val lastRevs = distinctFileGidsRDD. 
     foreachPartition(iter => { 
      SetupJDBC(jdbcDriver, jdbcUrl, jdbcUser, jdbcPassword) 
      while(iter.hasNext) { 
      val item = iter.next() 
      //println(item(0)) 
      println("String: "+item(0).toString()) 
      val jsonStr = DB.readOnly { implicit session => 
       sql"SELECT jsonStr FROM lasttail WHERE fileGId = ${item(0)}::varchar". 
       map { resultSet => resultSet.string(1) }.single.apply() 
      } 
      println("\nJSON: "+jsonStr) 
      } 
     }) 
     println("\nEND Last Revs Class: "+ lastRevs.getClass) 

Le uscite di codice (con modifiche pesanti) qualcosa come:

BEGIN Last Revs Class: class org.apache.spark.rdd.MapPartitionsRDD 
String: 1fqhSXPE3GwrJ6SZzC65gJnBaB5_b7j3pWNSfqzU5FoM 
JSON: Some({"Struct":{"fileGid":"1fqhSXPE3GwrJ6SZzC65gJnBaB5_b7j3pWNSfqzU5FoM",...) 
String: 1eY2wxoVq17KGMUBzCZZ34J9gSNzF038grf5RP38DUxw 
JSON: Some({"Struct":{"fileGid":"1fqhSXPE3GwrJ6SZzC65gJnBaB5_b7j3pWNSfqzU5FoM",...) 
... 
JSON: None() 
END Last Revs Class: void 

DOMANDA 1: Come può Ottengo il valore lastRevs in un formato utile come la stringa JSON/null o un'opzione come Some/None?

DOMANDA 2: La mia preferenza: esiste un altro modo per ottenere nei dati delle partizioni un formato simile a RDD (piuttosto che il formato iteratore)?

dstream.foreachRDD { (rdd, time) => 
    rdd.foreachPartition { partitionIterator => 
    val partitionId = TaskContext.get.partitionId() 
    val uniqueId = generateUniqueId(time.milliseconds, partitionId) 
    // use this uniqueId to transactionally commit the data in partitionIterator 
    } 
} 

da http://spark.apache.org/docs/latest/streaming-programming-guide.html#performance-tuning

DOMANDA 3: è il metodo di ottenere i dati che sto usando un metodo sano di mente (dato sto seguendo il link qui sopra)? (Metti da parte il fatto che questo è un sistema scalikejdbc JDBC in questo momento. Questo sarà un deposito chiave di valore diverso da questo prototipo.)

+0

Non capisco la domanda. 'lastRevs' dovrebbe essere' Unit' perché '.forEachPartition' è usato solo per il suo effetto collaterale (la funzione è T => Unit). Penso che tu voglia trasformare i dati, come invece usare 'mapPartitions'. Mi piacerebbe capire qual è l'obiettivo generale qui, perché le singole domande non hanno molto senso (per me) – maasg

+0

@maasg: Sì. Questa è la risposta che sto cercando - mapPartitions. Ho trovato un altro esempio su http://stackoverflow.com/questions/21698443/spark-best-practice-for-retrieving-big-data-from-rdd-to-local-machine. – codeaperature

risposta

4

Per creare una trasformazione che utilizza risorse locali per l'esecutore (come un DB o una connessione di rete), è necessario utilizzare rdd.mapPartitions. Permette di inizializzare alcuni codici localmente sull'esecutore e utilizzare quelle risorse locali per elaborare i dati nella partizione.

Il codice dovrebbe essere simile:

val lastRevs = distinctFileGidsRDD. 
     mapPartitions{iter => 
      SetupJDBC(jdbcDriver, jdbcUrl, jdbcUser, jdbcPassword) 
      iter.map{ element => 
      DB.readOnly { implicit session => 
       sql"SELECT jsonStr FROM lasttail WHERE fileGId = ${element(0)}::varchar" 
       .map { resultSet => resultSet.string(1) }.single.apply() 
      } 
      } 
     } 
+0

vuoi dire che differisce da 'foreachPartition' in quanto utilizza le risorse dell'Executor invece delle risorse del Driver? Vale a dire. codice 'codice foreachPartition' viene eseguito su Driver considerando' mapPartitions' su Executor ... giusto? – lisak

+2

@lisak No, sia 'foreachPartition' che' mapPartitions' ti permetteranno di eseguire codice sugli esecutori. La differenza è che 'foreachPartition' fa solo effetti collaterali (come scrivere in un db), mentre' mapPartitions' restituisce un valore. La chiave di questa domanda è "come recuperare i dati", quindi "mapPartitions" è la strada da seguire. – maasg

Problemi correlati