2013-03-10 11 views
5

Sto provando a scrivere un sink personalizzato per flume-ng. Ho esaminato i sink e la documentazione esistenti e l'ho codificato. Tuttavia, il metodo 'process()' che dovrebbe ricevere gli eventi finisce sempre con null. Sto facendo Event event = channel.take(); ma l'evento è nullo. Vedo nei registri che questo metodo viene chiamato ripetutamente poiché l'evento è ancora nel canale.Sink personalizzato per evento null Flume-ng

Qualcuno può indicarmi la giusta direzione?

risposta

5

Questo è lo scheletro di una funzione di processo ... se non si riesce ottenere un evento che rollback, cambiare lo stato di backoff. Altrimenti, è possibile eseguire il commit e impostare lo stato su READY. Non importa cosa, si chiude sempre la transazione.

Status status = null; 
    Channel channel = getChannel(); 
    Transaction transaction = channel.getTransaction(); 
    transaction.begin(); 
    try { 
     Event event = channel.take(); 

     if (event != null && validEvent(event.getBody()) >= 0) { 
      # make some printing 
     } 
     transaction.commit(); 
     status = Status.READY; 
    } catch (Throwable ex) { 
     transaction.rollback(); 
     status = Status.BACKOFF; 
     logger.error("Failed to deliver event. Exception follows.", ex); 
     throw new EventDeliveryException("Failed to deliver event: " + ex); 
    } finally { 
     transaction.close(); 
    } 
    return status; 

Sono sicuro che funzionerà :).

+0

Impressionante, grazie, mi sta ancora aiutando nel 2016 .. – logicalgeek

+0

hey ho un problema simile qui: https://stackoverflow.com/questions/46479157/streaming-kafka- messages-to-mysql-database hai qualche idea al riguardo? –

4

Questo è di progettazione. Il sink runner eseguirà il polling del sink con gli eventi null in modo che sia sicuro che il sink è vivo e pronto ad accettare eventi futuri. Quando ricevi un evento null, assicurati di restituire Status.BACKOFF e il processore di sink attenderà un po 'prima di riprovare.

+0

Strano che [documentazione] (http://flume.apache.org/FlumeDeveloperGuide.html#sink) non dica nulla al riguardo. – Dmitry

+0

Sono d'accordo. La documentazione di Flume è molto minimale e dovrebbe essere un po 'più dettagliata. – logicalgeek

+0

Qual è la durata del backoff? E come è controllato? La classe AbstractSink non implementa metodi come le Sorgenti, ad es. public long getBackOffSleepIncrement() public long getMaxBackOffSleepInterval ( – bearrito

Problemi correlati