2015-01-02 18 views
5

Supponiamo di voler essenzialmente Stream.from(0) come InputDStream. Come andrei su questo? L'unico modo che vedo è utilizzare StreamingContext#queueStream, ma dovrei accodare elementi da un'altra discussione o sottoclasse Queue per creare una coda che si comporta come un flusso infinito, che si sentono entrambi come un hack.Come utilizzare infiniti flussi Scala come origine in Spark Streaming?

Qual è il modo corretto per farlo?

risposta

2

Non penso che sia disponibile in Spark di default ma è facile da implementare con ReceiverInputDStream.

import org.apache.spark.storage.StorageLevel 
import org.apache.spark.streaming.StreamingContext 
import org.apache.spark.streaming.dstream.ReceiverInputDStream 
import org.apache.spark.streaming.receiver.Receiver 

class InfiniteStreamInputDStream[T](
     @transient ssc_ : StreamingContext, 
     stream: Stream[T], 
     storageLevel: StorageLevel 
    ) extends ReceiverInputDStream[T](ssc_) { 

    override def getReceiver(): Receiver[T] = { 
    new InfiniteStreamReceiver(stream, storageLevel) 
    } 
} 

class InfiniteStreamReceiver[T](stream: Stream[T], storageLevel: StorageLevel) extends Receiver[T](storageLevel) { 

    // Stateful iterator 
    private val streamIterator = stream.iterator 

    private class ReadAndStore extends Runnable { 
    def run(): Unit = { 
     while (streamIterator.hasNext) { 
     val next = streamIterator.next() 
     store(next) 
     } 
    } 
    } 

    override def onStart(): Unit = { 
    new Thread(new ReadAndStore).run()  
    } 

    override def onStop(): Unit = { } 
} 
0

codice Leggermente modificato tat funziona con Spark 2.0:

import org.apache.spark.storage.StorageLevel 
import org.apache.spark.streaming.StreamingContext 
import org.apache.spark.streaming.dstream.ReceiverInputDStream 
import org.apache.spark.streaming.receiver.Receiver 
import scala.reflect.ClassTag 

class InfiniteDStream[T: ClassTag](
            @transient ssc_ : StreamingContext, 
            stream: Stream[T], 
            storageLevel: StorageLevel 
           ) extends ReceiverInputDStream[T](ssc_) { 

    override def getReceiver(): Receiver[T] = { 
    new InfiniteStreamReceiver(stream, storageLevel) 
    } 
} 

class InfiniteStreamReceiver[T](stream: Stream[T], storageLevel: StorageLevel) extends Receiver[T](storageLevel) { 

    private class ReadAndStore extends Runnable { 
    def run(): Unit = { 
     stream.foreach(store) 
    } 
    } 

    override def onStart(): Unit = { 
    val t = new Thread(new ReadAndStore) 
    t.setDaemon(true) 
    t.run() 
    } 

    override def onStop(): Unit = {} 
} 
Problemi correlati