2013-06-27 21 views
5

Sto usando KafkaSpout. Si prega di trovare il programma di test qui sotto.Kafka Storm Integration using Kafka Spout

Sto usando Storm 0.8.1. La classe Multischeme è presente in Storm 0.8.2. Lo userò. Voglio solo sapere in che modo le versioni precedenti funzionavano solo istanziando la classe StringScheme()? Dove posso scaricare le versioni precedenti di Kafka Spout? Ma dubito che sarebbe un'alternativa corretta rispetto a lavorare su Storm 0.8.2. ??? (Confuso)

Quando eseguo il codice (indicato di seguito) nel cluster Storm (vale a dire quando spingo la topologia) ottengo il seguente errore (Questo accade quando la parte Scheme è commentata altrimenti ovviamente otterrò l'errore del compilatore come la classe non è lì in 0.8.1):

java.lang.NoClassDefFoundError: backtype/storm/spout/MultiScheme 
     at storm.kafka.TestTopology.main(TestTopology.java:37) 
Caused by: java.lang.ClassNotFoundException: backtype.storm.spout.MultiScheme 

nel codice può trovare la spoutConfig.scheme = new StringScheme() di seguito riportate voi; parte commentata. Stavo ricevendo l'errore del compilatore se non commento quella linea che è naturale dato che non ci sono costruttori. Anche quando istanzio MultiScheme ottengo un errore perché non ho quella classe in 0.8.1.

public class TestTopology { 
    public static class PrinterBolt extends BaseBasicBolt { 
     public void declareOutputFields(OutputFieldsDeclarer declarer) { 
     } 

     public void execute(Tuple tuple, BasicOutputCollector collector) { 
      System.out.println(tuple.toString()); 
     } 
    } 

    public static void main(String [] args) throws Exception { 
     List<HostPort> hosts = new ArrayList<HostPort>(); 
     hosts.add(new HostPort("127.0.0.1",9092)); 
     LocalCluster cluster = new LocalCluster(); 
     TopologyBuilder builder = new TopologyBuilder(); 
     SpoutConfig spoutConfig = new SpoutConfig(new KafkaConfig.StaticHosts(hosts, 1), "test", "/zkRootStorm", "STORM-ID"); 
     spoutConfig.zkServers=ImmutableList.of("localhost"); 
     spoutConfig.zkPort=2181; 
     //spoutConfig.scheme=new StringScheme(); 
     spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); 
     builder.setSpout("spout",new KafkaSpout(spoutConfig)); 
     builder.setBolt("printer", new PrinterBolt()) 
       .shuffleGrouping("spout"); 
     Config config = new Config(); 

     cluster.submitTopology("kafka-test", config, builder.createTopology()); 

     Thread.sleep(600000); 
    } 
+0

Immagino Non capisco il problema: funziona solo se vai a 0.8.2? In tal caso, perché provare a eseguire anche 0.8.1: 0.8.2 lo sostituisce con alcune correzioni di bug e altri miglioramenti. –

risposta

8

Ho avuto lo stesso problema. Alla fine l'ho risolto, e ho messo l'esempio completo in esecuzione su github.

Si sono invitati a verificarlo qui> https://github.com/buildlackey/cep

(clic sulla directory tempesta + Kafka per un programma di esempio che dovrebbe farti installato e funzionante).

+8

Considera di aggiungere una o due frasi alla tua risposta per descrivere cosa hai fatto in modo che la tua risposta sia pertinente senza fare affidamento su quel repository Git attivo. – neontapir

+0

Sicuro: il progetto contiene test unitari e programmi di esempio che illustrano come sviluppare applicazioni di elaborazione eventi complesse (CEP) su Storm, Kafka ed Esper. –

+0

mi suona bene –

5

Abbiamo avuto un problema simile.

La nostra soluzione:

  1. Aprire pom.xml

  2. Cambia ambito da fornita per <scope>compile</scope>

Se vuoi sapere di più sugli ambiti di dipendenza controllare il docu Maven: Maven docu - dependency scopes