Spark DStream ha l'API mapPartition
, mentre l'API Flink DataStream
non lo fa. C'è qualcuno che potrebbe aiutare a spiegare il motivo. Quello che voglio fare è implementare un'API simile a Spark reduceByKey
su Flink.API Apache Flink DataStream non ha una trasformazione mapPartition
risposta
Il modello di elaborazione del flusso di Flink è molto diverso da Spark Streaming che è centrato su mini lotti. In Spark Streaming ogni mini batch viene eseguito come un normale programma batch su un insieme finito di dati, mentre i programmi Flink DataStream elaborano continuamente i record.
In DataSet API di Flink, uno MapPartitionFunction
ha due parametri. Un iteratore per l'input e un collector per il risultato della funzione. Un MapPartitionFunction
in un programma Flink DataStream non ritornerà mai dalla prima chiamata di funzione, perché l'iteratore dovrebbe scorrere su un flusso infinito di record. Tuttavia, il modello di elaborazione del flusso interno di Flink richiede che le funzioni utente vengano riportate allo stato della funzione checkpoint. Pertanto, l'API DataStream non offre una trasformazione mapPartition
.
Per implementare funzionalità simili a Spark Streaming reduceByKey
, è necessario definire una finestra con chiave sullo stream. Windows discretizza i flussi che è in qualche modo simile ai mini batch ma Windows offre molta più flessibilità. Poiché una finestra è di dimensioni finite, è possibile chiamare la finestra reduce
.
Questo potrebbe apparire come:
yourStream.keyBy("myKey") // organize stream by key "myKey"
.timeWindow(Time.seconds(5)) // build 5 sec tumbling windows
.reduce(new YourReduceFunction); // apply a reduce function on each window
Il DataStream documentation mostra come definire i vari tipi di finestra e spiega tutte le funzioni disponibili.
Nota: L'API DataStream è stata rielaborata di recente. Nell'esempio si presuppone l'ultima versione (0.10-SNAPSHOT) che sarà rilasciata come 0.10.0 nei prossimi giorni.
Assumendo che il flusso di ingresso è di dati singola partizione (ad esempio String)
val new_number_of_partitions = 4
//below line partitions your data, you can broadcast data to all partitions
val step1stream = yourStream.rescale.setParallelism(new_number_of_partitions)
//flexibility for mapping
val step2stream = step1stream.map(new RichMapFunction[String, (String, Int)]{
// var local_val_to_different_part : Type = null
var myTaskId : Int = null
//below function is executed once for each mapper function (one mapper per partition)
override def open(config: Configuration): Unit = {
myTaskId = getRuntimeContext.getIndexOfThisSubtask
//do whatever initialization you want to do. read from data sources..
}
def map(value: String): (String, Int) = {
(value, myTasKId)
}
})
val step3stream = step2stream.keyBy(0).countWindow(new_number_of_partitions).sum(1).print
//Instead of sum(1), you can use .reduce((x,y)=>(x._1,x._2+y._2))
//.countWindow will first wait for a certain number of records for perticular key
// and then apply the function
Flink streaming è puro in streaming (non dosato uno). Dai un'occhiata all'API di Iterate.
- 1. Kafka -> Flink DataStream -> MongoDB
- 2. Grado di parallelismo in Apache Flink
- 3. zipWithIndex su Apache Flink
- 4. Flink Scala API "non abbastanza argomenti"
- 5. Apache Flink vs Twitter Airone?
- 6. Ordinamento globale in Apache Flink
- 7. Impossibile usare apache flink in amazon emr
- 8. Ingresso compresso BZip2 per Apache Flink
- 9. Qual è la differenza tra Apache Spark e Apache Flink?
- 10. In che modo Apache Flink implementa le iterazioni?
- 11. Ottenere elementi JSON da un web con Apache Flink
- 12. Apache Flink vs Apache Spark come piattaforme per l'apprendimento automatico su larga scala?
- 13. Come eseguire il flatMap di una funzione su GroupedDataSet in Apache Flink
- 14. Dov'è la differenza tra una "trasformazione" e una "trasformazione affine"?
- 15. Crashlytics ha trovato una chiave API non valida - build AndroidStudio
- 16. api = twitter.Api() AttributeError: l'oggetto 'module' non ha attributo 'Api
- 17. Trasformazione CSS3 non funzionante
- 18. Che aspetto ha una "stringa di connessione" di Apache Curator?
- 19. JavaScript, oggetto di trasformazione in una matrice
- 20. Spark vs Flink memoria disponibile
- 21. Scoping una trasformazione in cetriolo
- 22. comportamento Iterator in Flink reduceGroup
- 23. In che modo Apache Flink è paragonabile a Mapreduce su Hadoop?
- 24. Trasformazioni XDT - Trasformazione della trasformazione
- 25. Implementazione trasformazione operativa (non javascript)
- 26. Conversione di una trasformazione affine Eigen in una trasformazione isometrica Eigen
- 27. scrivibile hadoop NotSerializableException con Apache Spark API
- 28. java: API Preferenze vs Configurazione Apache Commons
- 29. Apache Shiro vs API native Java EE
- 30. XSLT: trasformazione in contenuto non-xml?
Sembra che la soluzione di "reduceByKey" fornita sia simile a "GroupByKey" nella scintilla diversa da "reduceByKey". https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html –
No, il comando 'reduce()' di Flink è simile a 'reduceByKey' di Spark, una funzione di riduzione della coppia su un gruppo. La definizione del gruppo è leggermente diversa, perché Flink usa le coppie chiave-valore di Windows e Spark in mini gruppi. Non c'è un equivalente diretto di Spark's 'groupByKey' in Flink, perché ciò implica che il gruppo completo deve essere materializzato in memoria, il che può portare a OutOfMemoryErrors e uccidere la JVM. Flink offre 'groupReduce()' per consumare un iteratore in streaming. –
Vedo che il metodo Riduci() si applica combinabile. E 'una ragione simile che Flink DataStream non ha riduci API Group come mapPartition? –