Sto usando KafkaSpout. Si prega di trovare il programma di test qui sotto.Kafka Storm Integration using Kafka Spout
Sto usando Storm 0.8.1. La classe Multischeme è presente in Storm 0.8.2. Lo userò. Voglio solo sapere in che modo le versioni precedenti funzionavano solo istanziando la classe StringScheme()? Dove posso scaricare le versioni precedenti di Kafka Spout? Ma dubito che sarebbe un'alternativa corretta rispetto a lavorare su Storm 0.8.2. ??? (Confuso)
Quando eseguo il codice (indicato di seguito) nel cluster Storm (vale a dire quando spingo la topologia) ottengo il seguente errore (Questo accade quando la parte Scheme è commentata altrimenti ovviamente otterrò l'errore del compilatore come la classe non è lì in 0.8.1):
java.lang.NoClassDefFoundError: backtype/storm/spout/MultiScheme
at storm.kafka.TestTopology.main(TestTopology.java:37)
Caused by: java.lang.ClassNotFoundException: backtype.storm.spout.MultiScheme
nel codice può trovare la spoutConfig.scheme = new StringScheme() di seguito riportate voi; parte commentata. Stavo ricevendo l'errore del compilatore se non commento quella linea che è naturale dato che non ci sono costruttori. Anche quando istanzio MultiScheme ottengo un errore perché non ho quella classe in 0.8.1.
public class TestTopology {
public static class PrinterBolt extends BaseBasicBolt {
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
public void execute(Tuple tuple, BasicOutputCollector collector) {
System.out.println(tuple.toString());
}
}
public static void main(String [] args) throws Exception {
List<HostPort> hosts = new ArrayList<HostPort>();
hosts.add(new HostPort("127.0.0.1",9092));
LocalCluster cluster = new LocalCluster();
TopologyBuilder builder = new TopologyBuilder();
SpoutConfig spoutConfig = new SpoutConfig(new KafkaConfig.StaticHosts(hosts, 1), "test", "/zkRootStorm", "STORM-ID");
spoutConfig.zkServers=ImmutableList.of("localhost");
spoutConfig.zkPort=2181;
//spoutConfig.scheme=new StringScheme();
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
builder.setSpout("spout",new KafkaSpout(spoutConfig));
builder.setBolt("printer", new PrinterBolt())
.shuffleGrouping("spout");
Config config = new Config();
cluster.submitTopology("kafka-test", config, builder.createTopology());
Thread.sleep(600000);
}
Immagino Non capisco il problema: funziona solo se vai a 0.8.2? In tal caso, perché provare a eseguire anche 0.8.1: 0.8.2 lo sostituisce con alcune correzioni di bug e altri miglioramenti. –