2014-07-16 10 views
6

Vorrei convertire un DStream in un array, elenco, ecc. Quindi posso quindi tradurlo in json e servirlo su un endpoint. Sto usando la scintilla di apache, iniettando dati di Twitter. Come faccio a preformare questa operazione su Dstream statuses? Non riesco a trovare niente per funzionare diverso da print().Per ciascun RDD in un DStream, come posso convertirlo in un array o qualche altro tipico tipo di dati Java?

import org.apache.spark._ 
import org.apache.spark.SparkContext._ 
import org.apache.spark.streaming._ 
import org.apache.spark.streaming.twitter._ 
import org.apache.spark.streaming.StreamingContext._ 
import TutorialHelper._ 
object Tutorial { 
    def main(args: Array[String]) { 

    // Location of the Spark directory 
    val sparkHome = "/opt/spark" 

    // URL of the Spark cluster 
    val sparkUrl = "local[8]" 

    // Location of the required JAR files 
    val jarFile = "target/scala-2.10/tutorial_2.10-0.1-SNAPSHOT.jar" 

    // HDFS directory for checkpointing 
    val checkpointDir = "/tmp" 

    // Configure Twitter credentials using twitter.txt 
    TutorialHelper.configureTwitterCredentials() 

    val ssc = new StreamingContext(sparkUrl, "Tutorial", Seconds(1), sparkHome, Seq(jarFile)) 

    val filters = Array("#americasgottalent", "iamawesome") 
    val tweets = TwitterUtils.createStream(ssc, None, filters) 

    val statuses = tweets.map(status => status.getText()) 

    val arry = Array("firstval") 
    statuses.foreachRDD { 
     arr :+ _.collect() 
    } 

    ssc.checkpoint(checkpointDir) 

    ssc.start() 
    ssc.awaitTermination() 
    } 
} 

risposta

10

Se il tuo RDD è statuses puoi farlo.

val arr = new ArrayBuffer[String](); 
statuses.foreachRDD { 
    arr ++= _.collect() //you can now put it in an array or d w/e you want with it 
    ... 
} 

Tenete a mente questo potrebbe finire per essere modo più dati di quanto si vuole nel driver dal momento che un DSTREAM può essere enorme.

+0

Se rimuovo statuses.print() e aggiungi statuses.foreachRDD {val = arr _.collect()} ottengo un errore: manca il tipo di parametro per la funzione espansa ((x $ 1) => x 1 $. collect()) – CodingIsAwesome

+0

@CodingIsAwesome sistema di scala di tipo è un po 'strano a volte, i tuoi RDD hanno stringhe giusto? – aaronman

+0

@CodingIsAwesome Penso che l'aggiornamento dovrebbe funzionare se non, basta aggiungere le informazioni sul tipo come in una normale scala lambda 'foreachRDD ((x: RDD [String]) => arr ++ x.collect())' – aaronman

4

Trasforma il nostro livello, ma quello che ho cercato è.

statuses.foreachRDD(rdd => { 
    for(item <- rdd.collect().toArray) { 
     println(item); 
    } 
}) 
+0

non per scherzare, ma la mia risposta è quasi identica e porta sicuramente i contenuti del RDD all'autista, forse tu stavo usando il buffer di array sbagliato/stampando l'array sbagliato, tieni a mente 'println (arr)' non fa quello che ti aspetti – aaronman

Problemi correlati