2016-11-28 82 views
0

我正在嘗試駱駝卡夫卡集成。
我有兩個隊列:
queue1queue2駱駝卡夫卡集成問題

有三種途徑:

  1. ROUTE1放入queue1兩個消息的列表(它應該這樣做只能一次)。
  2. Route2到讀取queue1列表,將其分解,並提出個人信息在queue2
  3. 路徑3讀取queue2消息,只是打印了。

的代碼如下:

import java.util.ArrayList; 
import java.util.List; 

import org.apache.camel.CamelContext; 
import org.apache.camel.Exchange; 
import org.apache.camel.Processor; 
import org.apache.camel.builder.RouteBuilder; 
import org.apache.camel.impl.DefaultCamelContext; 

public class CamelListTest { 
    public static void main(String[] args) throws Exception { 
     CamelContext context = new DefaultCamelContext(); 
     context.addRoutes(new CamelListRoute()); 
     context.start(); 
     Thread.sleep(30000); 
     context.stop(); 
    } 
} 

class CamelListRoute extends RouteBuilder { 
    @Override 
    public void configure() throws Exception { 


     //Route1, expected to run once 
     from("timer://timerName?repeatCount=1").process(new Processor() { 
      @Override 
      public void process(Exchange exchange) throws Exception { 
       List<String> inOrderList = new ArrayList<String>(); 
       inOrderList.add("1"); 
       inOrderList.add("2"); 
       exchange.getIn().setBody(inOrderList, ArrayList.class); 
      } 
     }) 
     .to("kafka:<ip>:9092?topic=queue1"); 


     //Route2 
     from("kafka:<ip>:9092?topic=queue1&groupId=testing&autoOffsetReset=latest&consumersCount=1") 
     .split() 
     .body().process(new Processor() { 
      @Override 
      public void process(Exchange exchange) throws Exception { 
       System.out.println("2nd Route : " + (exchange.getIn().getBody().toString())); 
      } 
     }) 
     .to("kafka:<ip>:9092?topic=queue2"); 


     //Route3 
     from("kafka:<ip>:9092?topic=queue2&groupId=testing&autoOffsetReset=latest&consumersCount=1") 
     .process(new Processor() { 
      @Override 
      public void process(Exchange exchange) throws Exception { 
       System.out.println("3rd Route : " + (exchange.getIn().getBody().toString())); 
      } 
     }); 
    } 
} 

它不按預期工作,並有觀察到幾個問題:

  1. 第一條路線,預計將只運行一次(repeatCount = 1),連續運行,一次又一次地將相同的消息放入queue1
  2. 第二條路線讀取queue1消息,將其分解,但不把它放在queue2
  3. 由於第二路由沒有放任何東西queue2,這條路線沒有得到任何消息。

誰能幫我弄清楚這裏有什麼問題嗎?

回答

0

我看到幾件事情:

  1. 我希望你給卡夫卡網址是這樣的: 「卡夫卡://本地主機:9092主題=隊列1」

注:卡夫卡: //

  1. 爲消費者提供zookeeper網址例如:kafka://?話題=隊列1 & zookeeperConnect = & consumerStreams = 1 &的groupId =測試& autoOffsetReset = 最大

  2. 注意在以前的點autoOffsetReset值將最大最小代替最新

+0

是的,我正確地使用了網址。使用exchange.getOut()。setBody()將數據傳遞到下一個路由後,問題得到解決。 – rvd

+0

Ohk,我想你可以發表答案。這對其他人會有幫助。謝謝。 –

0

我想你應該交換消息。

在處理器

做類似:

exchng.getOut()的setHeader( 「類型」, 「隊列」);。 exchng.getOut()。setBody(exchng.getIn()。getBody());

然後可以在第二條路線上添加一個選項,不需要第三條路線。