2017-02-23 198 views

I am trying to do stream processing and CEP on a Kafka message stream. For this I chose Apache Ignite to implement a prototype first, but I cannot connect to the queue: Apache Ignite Kafka connection problem.

Using kafka_2.11-0.10.1.0 and apache-ignite-fabric-1.8.0-bin:

    bin/zookeeper-server-start.sh config/zookeeper.properties
    bin/kafka-server-start.sh config/server.properties
    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

Kafka itself works correctly; I tested it with a console consumer. Then I started Ignite and ran the following Spring Boot command line application.

KafkaStreamer<String, String, String> kafkaStreamer = new KafkaStreamer<>(); 

    Ignition.setClientMode(true); 

    Ignite ignite = Ignition.start(); 

    Properties settings = new Properties(); 
    // Set a few key parameters 
    settings.put("bootstrap.servers", "localhost:9092"); 
    settings.put("group.id", "test"); 
    settings.put("zookeeper.connect", "localhost:2181"); 
    settings.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
    settings.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
    settings.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
    settings.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 

    // Create an instance of ConsumerConfig from the Properties instance 
    kafka.consumer.ConsumerConfig config = new ConsumerConfig(settings); 

    IgniteCache<String, String> cache = ignite.getOrCreateCache("myCache"); 

    try (IgniteDataStreamer<String, String> stmr = ignite.dataStreamer("myCache")) { 
     // allow overwriting cache data 
     stmr.allowOverwrite(true); 

     kafkaStreamer.setIgnite(ignite); 
     kafkaStreamer.setStreamer(stmr); 

     // set the topic 
     kafkaStreamer.setTopic("test"); 

     // set the number of threads to process Kafka streams 
     kafkaStreamer.setThreads(1); 

     // set Kafka consumer configurations 
     kafkaStreamer.setConsumerConfig(config); 

     // set decoders 
     StringDecoder keyDecoder = new StringDecoder(null); 
     StringDecoder valueDecoder = new StringDecoder(null); 

     kafkaStreamer.setKeyDecoder(keyDecoder); 
     kafkaStreamer.setValueDecoder(valueDecoder); 

     kafkaStreamer.start(); 
    } finally { 
     kafkaStreamer.stop(); 
    } 

When the application starts, I get

    2017-02-23 10:25:23.409 WARN 1388 --- [main] kafka.utils.VerifiableProperties : Property bootstrap.servers is not valid
    2017-02-23 10:25:23.410 INFO 1388 --- [main] kafka.utils.VerifiableProperties : Property group.id is overridden to test
    2017-02-23 10:25:23.410 WARN 1388 --- [main] kafka.utils.VerifiableProperties : Property key.deserializer is not valid
    2017-02-23 10:25:23.411 WARN 1388 --- [main] kafka.utils.VerifiableProperties : Property key.serializer is not valid
    2017-02-23 10:25:23.411 WARN 1388 --- [main] kafka.utils.VerifiableProperties : Property value.deserializer is not valid
    2017-02-23 10:25:23.411 WARN 1388 --- [main] kafka.utils.VerifiableProperties : Property value.serializer is not valid
    2017-02-23 10:25:23.411 INFO 1388 --- [main] kafka.utils.VerifiableProperties : Property zookeeper.connect is overridden to localhost:2181

and then

    2017-02-23 10:25:24.057 WARN 1388 --- [r-finder-thread] kafka.client.ClientUtils$ : Fetching topic metadata with correlation id 0 for topics [Set(test)] from broker [BrokerEndPoint(0,user.local,9092)] failed

    java.nio.channels.ClosedChannelException: null
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:110) ~[kafka_2.11-0.10.0.1.jar:na]
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80) ~[kafka_2.11-0.10.0.1.jar:na]
        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79) ~[kafka_2.11-0.10.0.1.jar:na]
        at kafka.producer.SyncProducer.send(SyncProducer.scala:124) ~[kafka_2.11-0.10.0.1.jar:na]
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59) [kafka_2.11-0.10.0.1.jar:na]
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94) [kafka_2.11-0.10.0.1.jar:na]
        at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66) [kafka_2.11-0.10.0.1.jar:na]
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63) [kafka_2.11-0.10.0.1.jar:na]

Reading from the queue does not work. Does anyone have an idea how to fix this?

EDIT: If I comment out the contents of the finally block, the following error appears instead

    2017-02-27 16:42:27.780 29946 --- [pool-3-thread-1] : Message is ignored due to an error [msg=MessageAndMetadata(test,0,Message(magic = 1, attributes = 0, CreateTime = -1, crc = 2558126716, key = java.nio.HeapByteBuffer[pos=0 lim=1 cap=79], payload = java.nio.HeapByteBuffer[pos=0 lim=74 cap=74]),15941704,kafka.serializer.StringDecoder@74a96647,kafka.serializer.StringDecoder@42849d34,-1,CreateTime)]

    java.lang.IllegalStateException: Data streamer has been closed.
        at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.enterBusy(DataStreamerImpl.java:401) ~[ignite-core-1.8.0.jar:1.8.0]
        at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addDataInternal(DataStreamerImpl.java:613) ~[ignite-core-1.8.0.jar:1.8.0]
        at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addData(DataStreamerImpl.java:667) ~[ignite-core-1.8.0.jar:1.8.0]
        at org.apache.ignite.stream.kafka.KafkaStreamer$1.run(KafkaStreamer.java:180) ~[ignite-kafka-1.8.0.jar:1.8.0]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_111]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_111]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_111]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_111]
        at java.lang.Thread.run(Thread.java:745) [na:1.8.0_111]

Thanks!

Answer


I think this is because the KafkaStreamer is getting closed right after it is started (the kafkaStreamer.stop() call in the finally block). kafkaStreamer.start() is not synchronous; it just spins out threads that consume from Kafka and then returns.
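The race described above can be reproduced without Ignite at all. Below is a minimal, self-contained sketch (all class and method names here are hypothetical stand-ins, not Ignite APIs): a "start()" that only spins out a worker thread returns immediately, so a close that follows it directly runs before the worker ever touches the resource, producing the same kind of IllegalStateException as in the question.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

public class LifecycleDemo {
    // Hypothetical stand-in for a data streamer: using it after close() throws.
    static class Streamer implements AutoCloseable {
        private final AtomicBoolean closed = new AtomicBoolean(false);
        void addData(String s) {
            if (closed.get()) throw new IllegalStateException("Data streamer has been closed.");
        }
        @Override public void close() { closed.set(true); }
    }

    public static void main(String[] args) throws Exception {
        Streamer stmr = new Streamer();
        ExecutorService pool = Executors.newSingleThreadExecutor();

        // "start()": just submits a consumer task and returns immediately.
        Future<?> consumer = pool.submit(() -> {
            try { Thread.sleep(100); } catch (InterruptedException ignored) { }
            stmr.addData("message"); // runs after the caller has already moved on
        });

        stmr.close(); // caller closes right away, like the finally block in the question

        try {
            consumer.get();
        } catch (ExecutionException e) {
            System.out.println(e.getCause().getMessage());
        }
        pool.shutdown();
    }
}
```

Running this prints the worker's failure message, mirroring the "Data streamer has been closed" error from the Ignite log.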


Thanks for the answer. If I comment out the contents of the 'finally' block, I get the error posted above (in the edit). – razvan


That's because you also close the 'IgniteDataStreamer'. Get rid of the try-with-resources block and it will work. –
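For reference, the change this comment describes could look roughly like the sketch below. It reuses the names from the question's snippet (ignite, kafkaStreamer, config, cache "myCache", topic "test"), has not been run here, and assumes the application stays alive after start(); the shutdown hook is one possible place to do the cleanup, not something the answer prescribes.

```java
// Open the streamer WITHOUT try-with-resources, so it is not closed
// as soon as start() returns.
IgniteDataStreamer<String, String> stmr = ignite.dataStreamer("myCache");
stmr.allowOverwrite(true);

kafkaStreamer.setIgnite(ignite);
kafkaStreamer.setStreamer(stmr);
kafkaStreamer.setTopic("test");
kafkaStreamer.setThreads(1);
kafkaStreamer.setConsumerConfig(config);
kafkaStreamer.setKeyDecoder(new StringDecoder(null));
kafkaStreamer.setValueDecoder(new StringDecoder(null));

// start() returns immediately; its consumer threads keep running.
kafkaStreamer.start();

// Close everything only on application shutdown, not right after start().
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    kafkaStreamer.stop();
    stmr.close();
}));
```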


Hi, I don't have the application fully running yet (because I don't know how to read from the cache), but at least I am not getting the errors anymore. So I will mark this as the answer and open a new question for the rest. Thanks again, and maybe you can also take a look at https://stackoverflow.com/questions/42562766/how-to-properly-read-from-ignite-cache – razvan
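On the follow-up about reading from the cache, a minimal sketch, assuming the same ignite instance and the "myCache" cache from the question (the key "someKey" is a hypothetical example). This is only the simplest access pattern; the linked follow-up question covers it in more detail.

```java
IgniteCache<String, String> cache = ignite.cache("myCache");

// Point lookup by key (returns null if the key is absent).
String value = cache.get("someKey");

// Or iterate over the entries currently in the cache
// (IgniteCache is an Iterable of javax.cache.Cache.Entry).
for (javax.cache.Cache.Entry<String, String> e : cache) {
    System.out.println(e.getKey() + " -> " + e.getValue());
}
```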