2015-12-08 8 views
9

Sto riscontrando alcuni problemi a comprendere la semantica attorno alla finestra temporale degli eventi. Il seguente programma genera alcune tuple con i timestamp che vengono utilizzati come tempo dell'evento e fa una semplice aggregazione di finestre. Mi aspetto che l'output sia nello stesso ordine dell'input, ma l'output è ordinato in modo diverso. Perché l'uscita è fuori servizio rispetto al tempo dell'evento?Ordinamento finestre per lo streaming degli eventi Flink streaming

import java.util.concurrent.TimeUnit 
import org.apache.flink.streaming.api.TimeCharacteristic 
import org.apache.flink.streaming.api.windowing.time.Time 
import org.apache.flink.streaming.api.scala._ 

object WindowExample extends App { 
    val env = StreamExecutionEnvironment.getExecutionEnvironment 
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 
    env.getConfig.enableTimestamps() 
    env.setParallelism(1) 

    val start = 1449597577379L 
    val tuples = (1 to 10).map(t => (start + t * 1000, t)) 

    env.fromCollection(tuples) 
     .assignAscendingTimestamps(_._1) 
     .timeWindowAll(Time.of(1, TimeUnit.SECONDS)) 
     .sum(1) 
     .print() 

    env.execute() 
} 

L'input:

(1449597578379,1) 
(1449597579379,2) 
(1449597580379,3) 
(1449597581379,4) 
(1449597582379,5) 
(1449597583379,6) 
(1449597584379,7) 
(1449597585379,8) 
(1449597586379,9) 
(1449597587379,10) 

risultati:

[info] (1449597579379,2) 
[info] (1449597581379,4) 
[info] (1449597583379,6) 
[info] (1449597585379,8) 
[info] (1449597587379,10) 
[info] (1449597578379,1) 
[info] (1449597580379,3) 
[info] (1449597582379,5) 
[info] (1449597584379,7) 
[info] (1449597586379,9) 

risposta

10

La ragione di questo comportamento è che nel Flink l'ordinamento di elementi (rispetto al timestamp) non si tiene account. Solo la correttezza delle filigrane e la loro relazione con il timestamp degli elementi è importante per le operazioni che considerano il tempo poiché le filigrane normalmente innescano il calcolo in operazioni basate sul tempo.

Nell'esempio, l'operatore della finestra memorizza tutti gli elementi dall'origine nei buffer di finestra interni. Quindi, la sorgente emette una filigrana che dice che nessun elemento con un timestamp più piccolo arriverà in futuro. Questo, a sua volta, dice all'operatore della finestra di elaborare tutte le finestre con timestamp di fine che sono sotto le filigrane (che è vero per tutte le finestre). Pertanto, emette tutte le finestre (con ordinamento arbitrario) e successivamente emette una filigrana. Le operazioni a valle di questo stesso riceveranno gli elementi e potranno eseguire l'elaborazione una volta che avranno ricevuto filigrane.

Per impostazione predefinita, l'intervallo di emissione delle filigrane dalle origini è 200 ms. Con la piccola quantità di elementi che la tua fonte emette tutti vengono emessi prima che venga emessa la prima filigrana. In un caso d'uso del mondo reale, dove gli intervalli di emissione della filigrana sono molto più piccoli della dimensione della finestra, si otterrebbe il comportamento previsto delle finestre emesse nell'ordine del loro timestamp. Ad esempio, se hai 1 ora finestre e filigrane ogni 500 ms.

+1

Potrebbe fornire o indicare un esempio di un'operazione downstream che potrebbe riordinare gli elementi in base all'ora dell'evento una volta ricevuta una filigrana? Grazie! –

+1

@MaximKolchin tale riordino avviene ad es. nella libreria CEP. Puoi dare un'occhiata qui: https://github.com/apache/flink/blob/master/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java –