我正在嘗試駱駝卡夫卡集成。
我有兩個隊列:
queue1
和queue2
。駱駝卡夫卡集成問題
有三種途徑:
- ROUTE1放入
queue1
兩個消息的列表(它應該這樣做只能一次)。 - Route2到讀取
queue1
列表,將其分解,並提出個人信息在queue2
- 路徑3讀取
queue2
消息,只是打印了。
的代碼如下:
import java.util.ArrayList;
import java.util.List;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
public class CamelListTest {
public static void main(String[] args) throws Exception {
CamelContext context = new DefaultCamelContext();
context.addRoutes(new CamelListRoute());
context.start();
Thread.sleep(30000);
context.stop();
}
}
class CamelListRoute extends RouteBuilder {
@Override
public void configure() throws Exception {
//Route1, expected to run once
from("timer://timerName?repeatCount=1").process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
List<String> inOrderList = new ArrayList<String>();
inOrderList.add("1");
inOrderList.add("2");
exchange.getIn().setBody(inOrderList, ArrayList.class);
}
})
.to("kafka:<ip>:9092?topic=queue1");
//Route2
from("kafka:<ip>:9092?topic=queue1&groupId=testing&autoOffsetReset=latest&consumersCount=1")
.split()
.body().process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
System.out.println("2nd Route : " + (exchange.getIn().getBody().toString()));
}
})
.to("kafka:<ip>:9092?topic=queue2");
//Route3
from("kafka:<ip>:9092?topic=queue2&groupId=testing&autoOffsetReset=latest&consumersCount=1")
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
System.out.println("3rd Route : " + (exchange.getIn().getBody().toString()));
}
});
}
}
它不按預期工作,並有觀察到幾個問題:
- 第一條路線,預計將只運行一次(repeatCount = 1),連續運行,一次又一次地將相同的消息放入
queue1
。 - 第二條路線讀取
queue1
消息,將其分解,但不把它放在queue2
- 由於第二路由沒有放任何東西
queue2
,這條路線沒有得到任何消息。
誰能幫我弄清楚這裏有什麼問題嗎?
是的,我正確地使用了網址。使用exchange.getOut()。setBody()將數據傳遞到下一個路由後,問題得到解決。 – rvd
Ohk,我想你可以發表答案。這對其他人會有幫助。謝謝。 –