2013-04-02 11 views
5

Sto cercando di integrare Storm (see here) nel mio progetto. Io strappo i concetti di topologie, beccucci e bulloni. Ma ora, sto cercando di capire l'effettiva implementazione di alcune cose.Storm> Howto Integrazione della callback Java in uno becco

A) Ho un ambiente poliglotta con Java e Clojure. Il mio codice Java è una classe di callback con metodi che attivano lo streaming di dati. I dati dell'evento spinti a questi metodi sono ciò che voglio usare come erogatore.

Quindi la prima domanda è come collegare i dati che entrano in questi metodi, a un beccuccio? Sto cercando di i) passare un backtype.storm.topology.IRichSpout, quindi ii) passare un backtype.storm.spout.SpoutOutputCollector (see here) per aperta funzione che di beccuccio (see here). Ma non riesco a vedere un modo per passare effettivamente alcun tipo di mappa o elenco.

B) Il resto del mio progetto è tutto Clojure. Ci saranno molti dati che arrivano attraverso questi metodi. Ogni evento avrà un ID compreso tra 1 e 100. In Clojure, voglio dividere i dati provenienti dallo spout, in diversi thread di esecuzione. Quelli, penso, saranno i bulloni.

Come posso impostare un bullone Clojure per prelevare i dati degli eventi dallo spout, quindi interrompere un thread in base all'ID dell'evento in arrivo?

Grazie in anticipo Tim

[EDIT 1]

realtà ho ottenuto oltre questo problema. Ho finito per 1) implementando il mio IRichSpout. Ho quindi 2) collegato la tupla interna di quel beccuccio ai dati del flusso in ingresso nella mia classe di callback java. Non sono sicuro che sia idiomatico. Ma si compila e funziona senza errori. Tuttavia, 3) Non vedo i dati del flusso in entrata (sicuramente lì), che provengono dallo stampo bullone.

Per garantire che i dati dell'evento vengano propagati, c'è qualcosa di specifico che devo fare nell'implementazione spout o bullone o nella topologia? Grazie.

 

     ;; tie Java callbacks to a Spout that I created 
     (.setSpout java-callback ibspout) 

     (storm/defbolt printstuff ["word"] [tuple collector] 
     (println (str "printstuff --> tuple["tuple"] > collector["collector"]")) 
    ) 
     (storm/topology 
     { "1" (storm/spout-spec ibspout) 
     } 
     { "3" (storm/bolt-spec { "1" :shuffle } 
           printstuff 
      ) 
     }) 

[EDIT 2]

Su consiglio di SO membro Ankur, sto e cambiare canale, il mio topologia. Dopo aver creato la mia callback Java, ho passato la sua tupla al seguente IBSpout, usando (.setTuple ibspout (.getTuple java-callback)). Non passo l'intero oggetto callback Java, perché ottengo un errore NotSerializable. Tutto si compila e funziona senza errori. Ma ancora una volta, non ci sono dati che arrivano al mio printstuff bullone. Hmmm.

 

    public class IBSpout implements IRichSpout { 

     /** 
     * Storm spout stuff 
     */ 
     private SpoutOutputCollector _collector; 

     private List _tuple = new ArrayList(); 
     public void setTuple(List tuple) { _tuple = tuple; } 
     public List getTuple() { return _tuple; } 

     /** 
     * Storm ISpout interface functions 
     */ 
     public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { 
     _collector = collector; 
     } 
     public void close() {} 
     public void activate() {} 
     public void deactivate() {} 
     public void nextTuple() { 
     _collector.emit(_tuple); 
     } 
     public void ack(Object msgId) {} 
     public void fail(Object msgId) {} 


     public void declareOutputFields(OutputFieldsDeclarer declarer) {} 
     public java.util.Map getComponentConfiguration() { return new HashMap(); } 

    } 

risposta

0

risposta alla parte B:

La risposta diretta Sembra a me come siete alla ricerca di un raggruppamento di campo in modo da poter controllare ciò che funziona viene raggruppato insieme durante l'esecuzione per ID.

Detto questo, non sono sicuro che questa sia davvero una risposta completa perché non so perché stai cercando di farlo in questo modo. Se vuoi solo un carico di lavoro bilanciato, un raggruppamento shuffle è una scelta migliore.

+0

Ehi, grazie per aver guardato questo. In realtà ho specificato un **: shuffle ** per bilanciare il carico di lavoro. Il problema che sto avendo ora, è che non vedo i miei dati dell'evento propagarsi al mio bullone (vedi su modifica). Qualsiasi intuizione lì, è apprezzata. – Nutritioustim

+0

@Nutritioustim hai effettivamente capito qual era il problema? – Vor

+0

@Vor, No. Storm sembra un po 'troppo impraticabile per quello che sto cercando di fare. Per ora *** [Lamina] (https://github.com/ztellman/lamina) *** soddisfa i miei bisogni. HTH. – Nutritioustim

3

Sembra che si passi lo spout alla classe di callback che sembra un po 'strana. Quando una topologia viene eseguita, la tempesta richiama periodicamente il metodo spout nextTuple, quindi ciò che devi fare è passare il callback java all'implementazione dello spout personalizzato in modo che quando la tempesta chiama il beccuccio, lo spout chiama il callback java per ottenere il set successivo di tuple da inserire nella topologia.

Il concetto chiave da capire è che Beccucci estrae i dati quando richiesto dalla tempesta, si non spingere i dati a becchi. La richiamata non può chiamare lo spout per inviare dati ad esso, piuttosto che il beccuccio deve estrarre i dati (da qualche metodo java o qualsiasi buffer di memoria) quando viene chiamato il metodo nextTuple dello spout.

+0

Oh bello. Grazie per l'intuizione. Ma non vedo ancora i dati passare attraverso il beccuccio, al mio fulmine. Ho dato una descrizione migliore. Forse dovrei fare qualcosa di specifico con il mio beccuccio? C'è un modo particolare per cui la struttura dati deve essere passata allo Spout? Grazie. – Nutritioustim

+0

@Nutritioustim Hai ricevuto una risposta? – hawkeye

+0

Heyya. Vedi. Non potevo avere la tempesta per fare ciò che volevo. E *** [Lamina] (https://github.com/ztellman/lamina) *** è stato uno strumento molto più leggero che ha risolto il mio problema. HTH. – Nutritioustim