2015-09-08 14 views
7

Ho un problema quando utilizzo lo sparkming per leggere da Cassandra.Lettura da Cassandra con Spark Streaming

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/8_streaming.md#reading-from-cassandra-from-the-streamingcontext

Come il link qui sopra, io uso

val rdd = ssc.cassandraTable("streaming_test", "key_value").select("key", "value").where("fu = ?", 3) 

per selezionare i dati da Cassandra, ma sembra che lo streaming scintilla ha un solo query di una volta, ma voglio che continua a interrogare utilizzando un intervallo di 10 sencondi.

Il mio codice è come segue, desidero la vostra risposta.

Grazie!

import org.apache.spark._ 
import org.apache.spark.streaming._ 
import com.datastax.spark.connector.streaming._ 
import org.apache.spark.rdd._ 
import scala.collection.mutable.Queue 


object SimpleApp { 
def main(args: Array[String]){ 
    val conf = new SparkConf().setAppName("scala_streaming_test").set("spark.cassandra.connection.host", "127.0.0.1") 

    val ssc = new StreamingContext(conf, Seconds(10)) 

    val rdd = ssc.cassandraTable("mykeyspace", "users").select("fname", "lname").where("lname = ?", "yu") 

    //rdd.collect().foreach(println) 

    val rddQueue = new Queue[RDD[com.datastax.spark.connector.CassandraRow]]() 


    val dstream = ssc.queueStream(rddQueue) 

    dstream.print() 

    ssc.start() 
    rdd.collect().foreach(println) 
    rddQueue += rdd 
    ssc.awaitTermination() 
} 

}

+0

Potresti descrivere che cosa si vuole raggiungere? Leggi la tabella completa su ogni intervallo? Da dove provengono i dati di streaming? – maasg

+0

@maasg Voglio leggere la tabella su ogni intervallo (come 10s) per interrogare alcuni record che sono correlati al tempo. Significa che voglio lasciare che la Cassandra sia la fonte di Spark Streaming. In una parola, sono bloccato alla creazione di DStream. Vuoi dare alcuni suggerimenti ed esempi? Grazie mille! –

risposta

6

È possibile creare un ConstantInputDStream con la CassandraRDD come input. ConstantInputDStream fornirà lo stesso RDD su ogni intervallo di streaming e, eseguendo un'azione su tale RDD, si attiverà una materializzazione del lignaggio RDD, che porterà all'esecuzione della query su Cassandra ogni volta.

Assicurarsi che i dati in corso di ricerca non si estendano illimitati per evitare l'aumento dei tempi di interrogazione e il risultato di un processo di streaming instabile.

Qualcosa del genere dovrebbe fare il trucco (utilizzando il codice come punto di partenza):

import org.apache.spark.streaming.dstream.ConstantInputDStream 

val ssc = new StreamingContext(conf, Seconds(10)) 

val cassandraRDD = ssc.cassandraTable("mykeyspace", "users").select("fname", "lname").where("lname = ?", "yu") 

val dstream = new ConstantInputDStream(ssc, cassandraRDD) 

dstream.foreachRDD{ rdd => 
    // any action will trigger the underlying cassandra query, using collect to have a simple output 
    println(rdd.collect.mkString("\n")) 
} 
ssc.start() 
ssc.awaitTermination() 
+3

Cosa succede se desidero solo leggere ** nuovi dati ** salvati nella tabella dall'ultima elaborazione RDD? È possibile? –

+2

esiste un modo per impedire il recupero dei vecchi dati? mantiene in un ciclo infinito. –

+0

@yurishkuro AFAIK che al momento non è possibile. – maasg

0

ho avuto lo stesso problema e trovato una soluzione per la creazione di una sottoclasse di InputDStream. È necessario definire i metodi start() e compute().

start() può essere utilizzato per la preparazione. La logica principale risiede in compute(). Restituisce Option[RDD[T]]. Per rendere flessibile la classe, è definito il tratto InputStreamQuery.

trait InputStreamQuery[T] { 
    // where clause condition for partition key 
    def partitionCond : (String, Any) 
    // function to return next partition key 
    def nextValue(v:Any) : Option[Any] 
    // where clause condition for clustering key 
    def whereCond : (String, (T) => Any) 
    // batch size 
    def batchSize : Int 
} 

Per la tabella Cassandra keyspace.test, creare test_by_date che riorganizza la tabella per il partizionamento chiave date.

CREATE TABLE IF NOT exists keyspace.test 
(id timeuuid, date text, value text, primary key (id)) 

CREATE MATERIALIZED VIEW IF NOT exists keyspace.test_by_date AS 
SELECT * 
FROM keyspace.test 
WHERE id IS NOT NULL 
PRIMARY KEY (date, id) 
WITH CLUSTERING ORDER BY (id ASC); 

Una possibile implementazione per test tabella è

class class Test(id:UUID, date:String, value:String) 

trait InputStreamQueryTest extends InputStreamQuery[Test] { 
    val dateFormat = "uuuu-MM-dd" 

    // set batch size as 10 records 
    override def batchSize: Int = 10 

    // partitioning key conditions, query string and initial value 
    override def partitionCond: (String, Any) = ("date = ?", "2017-10-01") 
    // clustering key condition, query string and function to get clustering key from the instance 
    override def whereCond: (String, Test => Any) = (" id > ?", m => m.id) 
    // return next value of clustering key. ex) '2017-10-02' for input value '2017-10-01' 
    override def nextValue(v: Any): Option[Any] = { 

    import java.time.format.DateTimeFormatter 

    val formatter = DateTimeFormatter.ofPattern(dateFormat) 
    val nextDate = LocalDate.parse(v.asInstanceOf[String], formatter).plusDays(1) 
    if (nextDate.isAfter(LocalDate.now())) None 
    else Some(nextDate.format(formatter)) 
    } 
} 

che può essere usata nella classe CassandraInputStream come segue.

class CassandraInputStream[T: ClassTag] 
(_ssc: StreamingContext, keyspace:String, table:String) 
(implicit rrf: RowReaderFactory[T], ev: ValidRDDType[T]) 
extends InputDStream[T](_ssc) with InputStreamQuery[T] { 

var lastElm:Option[T] = None 
var partitionKey : Any = _ 

override def start(): Unit = { 

    // find a partition key which stores some records 
    def findStartValue(cql : String, value:Any): Any = { 
    val rdd = _ssc.sparkContext.cassandraTable[T](keyspace, table).where(cql, value).limit(1) 

    if (rdd.cassandraCount() > 0) value 
    else { 
     nextValue(value).map(findStartValue(cql, _)).getOrElse(value) 
    } 
    } 
    // get query string and initial value from partitionCond method 
    val (cql, value) = partitionCond 
    partitionKey = findStartValue(cql, value) 
} 

override def stop(): Unit = {} 

override def compute(validTime: Time): Option[RDD[T]] = { 
    val (cql, _) = partitionCond 
    val (wh, whKey) = whereCond 

    def fetchNext(patKey: Any) : Option[CassandraTableScanRDD[T]] = { 
    // query with partitioning condition 
    val query = _ssc.sparkContext.cassandraTable[T](keyspace, table).where(cql, patKey) 

    val rdd = lastElm.map{ x => 
     query.where(wh, whKey(x)).withAscOrder.limit(batchSize) 
    }.getOrElse(query.withAscOrder.limit(batchSize)) 

    if (rdd.cassandraCount() > 0) { 
     // store the last element of this RDD 
     lastElm = Some(rdd.collect.last) 
     Some(rdd) 
    } 
    else { 
     // find the next partition key which stores data 
     nextValue(patKey).flatMap{ k => 
     partitionKey = k 
     fetchNext(k)} 
    } 
    } 

    fetchNext(partitionKey) 
} 
} 

Combinando tutte le classi,

val conf = new SparkConf().setAppName(appName).setMaster(master) 
val ssc = new StreamingContext(conf, Seconds(10)) 

val dstream = new CassandraInputStream[Test](ssc, "keyspace", "test_by_date") with InputStreamQueryTest 

dstream.map(println).saveToCassandra(...) 

ssc.start() 
ssc.awaitTermination() 
Problemi correlati