Vorrei convertire un DStream in un array, elenco, ecc. Quindi posso quindi tradurlo in json e servirlo su un endpoint. Sto usando la scintilla di apache, iniettando dati di Twitter. Come faccio a preformare questa operazione su Dstream statuses
? Non riesco a trovare niente per funzionare diverso da print()
.Per ciascun RDD in un DStream, come posso convertirlo in un array o qualche altro tipico tipo di dati Java?
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming._
import org.apache.spark.streaming.twitter._
import org.apache.spark.streaming.StreamingContext._
import TutorialHelper._
object Tutorial {
def main(args: Array[String]) {
// Location of the Spark directory
val sparkHome = "/opt/spark"
// URL of the Spark cluster
val sparkUrl = "local[8]"
// Location of the required JAR files
val jarFile = "target/scala-2.10/tutorial_2.10-0.1-SNAPSHOT.jar"
// HDFS directory for checkpointing
val checkpointDir = "/tmp"
// Configure Twitter credentials using twitter.txt
TutorialHelper.configureTwitterCredentials()
val ssc = new StreamingContext(sparkUrl, "Tutorial", Seconds(1), sparkHome, Seq(jarFile))
val filters = Array("#americasgottalent", "iamawesome")
val tweets = TwitterUtils.createStream(ssc, None, filters)
val statuses = tweets.map(status => status.getText())
val arry = Array("firstval")
statuses.foreachRDD {
arr :+ _.collect()
}
ssc.checkpoint(checkpointDir)
ssc.start()
ssc.awaitTermination()
}
}
Se rimuovo statuses.print() e aggiungi statuses.foreachRDD {val = arr _.collect()} ottengo un errore: manca il tipo di parametro per la funzione espansa ((x $ 1) => x 1 $. collect()) – CodingIsAwesome
@CodingIsAwesome sistema di scala di tipo è un po 'strano a volte, i tuoi RDD hanno stringhe giusto? – aaronman
@CodingIsAwesome Penso che l'aggiornamento dovrebbe funzionare se non, basta aggiungere le informazioni sul tipo come in una normale scala lambda 'foreachRDD ((x: RDD [String]) => arr ++ x.collect())' – aaronman