2017-03-01 45 views
0

我正在使用帶有Kafka活頁夾的Spring雲流。它工作得很好,但客戶端收到重複的消息。已經嘗試過所有卡夫卡消費者物業,但沒有結果。使用Spring雲流與Kafka重複處理郵件

在我的應用程序示例 - AggregateApplication和EventFilterApplication中檢查2個類。在我運行EventFilterApplication的情況下 - 只有1條消息,以防AggregateApplication - 2條相同的消息。


這裏是下面我的代碼:

1)聚合

import com.example.EventFilterApplication; 
import org.springframework.boot.autoconfigure.SpringBootApplication; 
import org.springframework.cloud.stream.aggregate.AggregateApplicationBuilder; 

@SpringBootApplication 
public class AggregateApplication { 
    public static void main(String[] args) { 
     new AggregateApplicationBuilder(new Object[]{EventFilterApplication.class}, args) 
      .from(EventFilterApplication.class) 
      .run(args); 
    } 
} 

2)EventFilterApplication

@SpringBootApplication 
@EnableBinding(EventFilterApplication.LiveProcessor.class) 
public class EventFilterApplication { 

    @Autowired 
    LiveProcessor source; 

    @StreamListener(LiveProcessor.INPUT) 
    public void handle(byte[] event) { 
     try { 

      System.out.println(new Date().getTime() + ": event was processed:" + Arrays.toString(event)); 

     } catch (Exception e) { 
      System.out.println(String.format("Error={%s} on processing message=%s", e.getMessage(), Arrays.toString(event))); 
     } 
    } 
    public static void main(String[] args) { 
     SpringApplication.run(EventFilterApplication.class, args); 
    } 

    interface LiveProcessor extends Source { 

     String INPUT = "liveSource"; 

     @Input(INPUT) 
     SubscribableChannel input(); 
    } 
} 

3)application.yml

spring: 
cloud: 
    stream: 
     kafka: 
      binder: 
       brokers: kafka-broker.example.com:9092 
       defaultBrokerPort: 9092 
       defaultZkPort: 2181 
       zkNodes: kafka-zookeeper.example.com 
     type: kafka 
     bindings: 
      liveSource: 
       binder: kafka 
       consumer: 
        headerMode: raw 
        autoCommitOffset: true 
       destination: topic_example_name 

4)的build.gradle

buildscript { 
    ext { springBootVersion = '1.4.2.RELEASE' } 
    repositories { 
     jcenter() 
     maven { url 'http://repo.spring.io/plugins-release' } 
    } 
    dependencies { 
     classpath("org.springframework.build.gradle:propdeps-plugin:0.0.7") 
     classpath("org.springframework.boot:spring-boot-gradle-plugin:$springBootVersion") 
     classpath("io.spring.gradle:dependency-management-plugin:0.5.2.RELEASE") 
    } 
} 

ext['logstashLogbackEncoderV'] = '4.8' 
ext['springCloudV'] = 'Camden.SR1' 
ext['springCloudStreamV'] = 'Brooklyn.SR2' 
ext['springIntegrationKafkaV'] = '1.3.1.RELEASE' 

subprojects { 
    apply plugin: 'java' 
    apply plugin: 'propdeps' 
    apply plugin: 'propdeps-idea' 
    apply plugin: "io.spring.dependency-management" 

    sourceCompatibility = 1.8 

    dependencyManagement { 
     imports { 
      mavenBom "org.springframework.cloud:spring-cloud-dependencies:Camden.SR1" 
      mavenBom "org.springframework.cloud:spring-cloud-stream-dependencies:Brooklyn.SR2" 
      mavenBom "org.springframework.cloud.stream.app:spring-cloud-stream-app-dependencies:1.0.4.RELEASE" 
     } 
    } 

    dependencies { 
     compile("org.springframework.boot:spring-boot-starter-web:$springBootVersion") { 
      exclude module: "spring-boot-starter-tomcat" 
      exclude group: 'log4j' 
     } 

     compile("org.springframework.cloud:spring-cloud-starter-stream-kafka") 

     compile("org.springframework.integration:spring-integration-kafka:$springIntegrationKafkaV") { 
      exclude group: "org.slf4j" 
     } 

     compile("org.springframework.cloud:spring-cloud-stream:") 

     compile("org.springframework.cloud:spring-cloud-starter-sleuth") 

     compile("net.logstash.logback:logstash-logback-encoder:${logstashLogbackEncoderV}") 

     testCompile("org.springframework.boot:spring-boot-starter-test:$springBootVersion") { 
      exclude group: "org.slf4j" 
     } 
    } 
} 

回答

0

該複製是由EventFilterApplication引起作爲母體根:

public class AggregateApplication { 
    public static void main(String[] args) { 
     new AggregateApplicationBuilder(new Object[]{EventFilterApplication.class}, args) 
      .from(EventFilterApplication.class) 
      .run(args); 
    } 
} 

這很可能會創建兩個訂閱。而不是增加EventFilterApplication爲根,你可以簡單地做:

public class AggregateApplication { 
    public static void main(String[] args) { 
     new AggregateApplicationBuilder(args) 
      .from(EventFilterApplication.class) 
      // rest of the pipeline 
      .run(args); 
    } 
} 

如果您不需要創建聚合,這已經足夠了:

public static void main(String[] args) { 
     SpringApplication.run(EventFilterApplication.class, args); 
} 

編輯:添加一個額外的例子和澄清回答。

+0

其實我需要聚合使用我的應用程序的幾個組件(與EventFilterApplication一起)。 該示例非常簡單,還有EventCheckerApplication和其他許多在AggregateApplication中進行了組裝的東西。 –

+0

這很好,但只是不要'EventFilterApplication'作爲參數添加到'AggregateApplicationBuilder' –

+0

非常感謝您的幫助,它的工作原理。它仍然理解爲什麼如此) –

相關問題