2017-08-29 26 views
1

我使用的是駱駝卡夫卡組件,我不清楚引擎蓋下發生的錯誤。如下所示,我正在彙總記錄,我認爲對於我的用例來說,只有在記錄保存到SFTP後提交偏移纔有意義。如何用camel-kafka手動控制偏移提交?

是否可以手動控制何時可以執行提交?

private static class MyRouteBuilder extends RouteBuilder { 

    @Override 
    public void configure() throws Exception { 

     from("kafka:{{mh.topic}}?" + getKafkaConfigString()) 
     .unmarshal().string() 
     .aggregate(constant(true), new MyAggregationStrategy()) 
      .completionSize(1000) 
      .completionTimeout(1000) 
     .setHeader("CamelFileName").constant("transactions-" + (new Date()).getTime()) 
     .to("sftp://" + getSftpConfigString()) 

     // how to commit offset only after saving messages to SFTP? 

     ; 
    } 

    private final class MyAggregationStrategy implements AggregationStrategy { 
     @Override 
     public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { 
      if (oldExchange == null) { 
       return newExchange; 
      } 
      String oldBody = oldExchange.getIn().getBody(String.class); 
      String newBody = newExchange.getIn().getBody(String.class); 
      String body = oldBody + newBody; 
      oldExchange.getIn().setBody(body); 
      return oldExchange; 
     } 
    } 
} 

private static String getSftpConfigString() { 
     return "{{sftp.hostname}}/{{sftp.dir}}?" 
       + "username={{sftp.username}}" 
       + "&password={{sftp.password}}" 
       + "&tempPrefix=.temp." 
       + "&fileExist=Append" 
       ; 
} 

private static String getKafkaConfigString() { 
     return "brokers={{mh.brokers}}" 
      + "&saslMechanism={{mh.saslMechanism}}" 
      + "&securityProtocol={{mh.securityProtocol}}" 
      + "&sslProtocol={{mh.sslProtocol}}" 
      + "&sslEnabledProtocols={{mh.sslEnabledProtocols}}" 
      + "&sslEndpointAlgorithm={{mh.sslEndpointAlgorithm}}" 
      + "&saslJaasConfig={{mh.saslJaasConfig}}" 
      + "&groupId={{mh.groupId}}" 
      ; 
} 

回答

2

你就是不行。 Kafka每隔X秒在後臺執行一次自動提交(您可以配置這個)。

camel-kafka沒有手動提交支持。由於聚合者與卡夫卡消費者及其執行提交的消費者分離,所以這也是不可能的。