2015-03-11 24 views
5

chiedendo perché l'esempio StatefulNetworkWordCount.scala chiama l'updateStateByKey infame funzione di(), che dovrebbe prendere una funzione solo come parametro con invece:Spark in streaming viene chiamato updateStateByKey con parametri aggiuntivi

val stateDstream = wordDstream.updateStateByKey[Int](newUpdateFunc, 
    new HashPartitioner (ssc.sparkContext.defaultParallelism), true, initialRDD) 

Perché la necessità (e come viene elaborato - questo non è nella firma di updateStateByKey()?) per passare un partizionatore, un booleano e un RDD?

grazie, Matt

risposta

4

è perché:

  1. Si vede il diverso ramo di release Spark: https://github.com/apache/spark/blob/branch-1.3/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala. In Spark 1.2 questo codice era solo con updateStateByKey ricevendo una singola funzione come parametro, mentre in 1.3 l'hanno ottimizzato
  2. Esistono versioni diverse di updateStateByKey in 1.2 e 1.3. Ma in 1.2 non esiste una versione con 4 parametri, è stato introdotto solo nel 1.3: https://github.com/apache/spark/blob/branch-1.3/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala

Ecco il codice:

/** 
* Return a new "state" DStream where the state for each key is updated by applying 
* the given function on the previous state of the key and the new values of each key. 
* org.apache.spark.Partitioner is used to control the partitioning of each RDD. 
* @param updateFunc State update function. Note, that this function may generate a different 
* tuple with a different key than the input key. Therefore keys may be removed 
* or added in this way. It is up to the developer to decide whether to 
* remember the partitioner despite the key being changed. 
* @param partitioner Partitioner for controlling the partitioning of each RDD in the new 
* DStream 
* @param rememberPartitioner Whether to remember the paritioner object in the generated RDDs. 
* @param initialRDD initial state value of each key. 
* @tparam S State type 
*/ 
def updateStateByKey[S: ClassTag](
    updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)], 
    partitioner: Partitioner, 
    rememberPartitioner: Boolean, 
    initialRDD: RDD[(K, S)] 
): DStream[(K, S)] = { 
    new StateDStream(self, ssc.sc.clean(updateFunc), partitioner, 
    rememberPartitioner, Some(initialRDD)) 
} 
Problemi correlati