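Duplicate message processing with Spring Cloud Stream and Kafka
I am using Spring Cloud Stream with the Kafka binder. It works well, but the client receives duplicate messages. I have already tried all the Kafka consumer properties, with no result.
Please check the 2 classes in my example application: AggregateApplication and EventFilterApplication. When I run EventFilterApplication there is only 1 message; when I run AggregateApplication there are 2 identical messages.
Here is my code:
1) Aggregate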
import com.example.EventFilterApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.aggregate.AggregateApplicationBuilder;

@SpringBootApplication
public class AggregateApplication {

    public static void main(String[] args) {
        new AggregateApplicationBuilder(new Object[]{EventFilterApplication.class}, args)
                .from(EventFilterApplication.class)
                .run(args);
    }
}
2) EventFilterApplication
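package com.example;

import java.util.Arrays;
import java.util.Date;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.SubscribableChannel;

@SpringBootApplication
@EnableBinding(EventFilterApplication.LiveProcessor.class)
public class EventFilterApplication {

    @Autowired
    LiveProcessor source;

    // Invoked once for every message arriving on the "liveSource" input channel.
    @StreamListener(LiveProcessor.INPUT)
    public void handle(byte[] event) {
        try {
            System.out.println(new Date().getTime() + ": event was processed:" + Arrays.toString(event));
        } catch (Exception e) {
            System.out.println(String.format("Error={%s} on processing message=%s", e.getMessage(), Arrays.toString(event)));
        }
    }

    public static void main(String[] args) {
        SpringApplication.run(EventFilterApplication.class, args);
    }

    // Binding interface: declares the input channel bound to the Kafka topic.
    interface LiveProcessor extends Source {
        String INPUT = "liveSource";

        @Input(INPUT)
        SubscribableChannel input();
    }
}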
3) application.yml
spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: kafka-broker.example.com:9092
          defaultBrokerPort: 9092
          defaultZkPort: 2181
          zkNodes: kafka-zookeeper.example.com
      type: kafka
      bindings:
        liveSource:
          binder: kafka
          consumer:
            headerMode: raw
            autoCommitOffset: true
          destination: topic_example_name
4) build.gradle
buildscript {
    ext { springBootVersion = '1.4.2.RELEASE' }
    repositories {
        jcenter()
        maven { url 'http://repo.spring.io/plugins-release' }
    }
    dependencies {
        classpath("org.springframework.build.gradle:propdeps-plugin:0.0.7")
        classpath("org.springframework.boot:spring-boot-gradle-plugin:$springBootVersion")
        classpath("io.spring.gradle:dependency-management-plugin:0.5.2.RELEASE")
    }
}

ext['logstashLogbackEncoderV'] = '4.8'
ext['springCloudV'] = 'Camden.SR1'
ext['springCloudStreamV'] = 'Brooklyn.SR2'
ext['springIntegrationKafkaV'] = '1.3.1.RELEASE'

subprojects {
    apply plugin: 'java'
    apply plugin: 'propdeps'
    apply plugin: 'propdeps-idea'
    apply plugin: "io.spring.dependency-management"

    sourceCompatibility = 1.8

    dependencyManagement {
        imports {
            mavenBom "org.springframework.cloud:spring-cloud-dependencies:Camden.SR1"
            mavenBom "org.springframework.cloud:spring-cloud-stream-dependencies:Brooklyn.SR2"
            mavenBom "org.springframework.cloud.stream.app:spring-cloud-stream-app-dependencies:1.0.4.RELEASE"
        }
    }

    dependencies {
        compile("org.springframework.boot:spring-boot-starter-web:$springBootVersion") {
            exclude module: "spring-boot-starter-tomcat"
            exclude group: 'log4j'
        }
        compile("org.springframework.cloud:spring-cloud-starter-stream-kafka")
        compile("org.springframework.integration:spring-integration-kafka:$springIntegrationKafkaV") {
            exclude group: "org.slf4j"
        }
        compile("org.springframework.cloud:spring-cloud-stream:")
        compile("org.springframework.cloud:spring-cloud-starter-sleuth")
        compile("net.logstash.logback:logstash-logback-encoder:${logstashLogbackEncoderV}")
        testCompile("org.springframework.boot:spring-boot-starter-test:$springBootVersion") {
            exclude group: "org.slf4j"
        }
    }
}
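Actually, I need the aggregate to use several components of my application (together with EventFilterApplication). The example is very simplified; there are also EventCheckerApplication and many others that are assembled in AggregateApplication.
That's fine, but just don't add 'EventFilterApplication' as a parameter to 'AggregateApplicationBuilder'.
Thank you very much for your help, it works. I still need to understand why, though :)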
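On the "why": one likely explanation (an assumption, not confirmed in this thread) is that the sources passed to the AggregateApplicationBuilder constructor are loaded into the parent context, while from(...) creates a separate child context for the same class. If so, EventFilterApplication is bound to the topic twice, and since no consumer group is configured, each binding receives every message. The sketch below is a toy model in plain Java, not Spring's internals; Topic, deliveriesForOneMessage and the string names are hypothetical stand-ins.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Toy model only: none of these types are Spring classes.
final class DuplicateDeliveryModel {

    // Hypothetical topic: every subscriber that does not share a
    // consumer group with another one receives every message.
    static final class Topic {
        private final List<Runnable> subscribers = new ArrayList<>();

        void subscribe(Runnable handler) {
            subscribers.add(handler);
        }

        void publish() {
            subscribers.forEach(Runnable::run);
        }
    }

    // Publishes one message and returns how many times the handler ran.
    // parentSources models the Object[] given to the builder constructor;
    // childApps models the classes given to from(...).
    static int deliveriesForOneMessage(List<String> parentSources, List<String> childApps) {
        Topic topic = new Topic();
        AtomicInteger handled = new AtomicInteger();

        List<String> contexts = new ArrayList<>(parentSources);
        contexts.addAll(childApps);
        for (String app : contexts) {
            // Assumption: each context containing the listener class
            // creates its own binding, i.e. its own subscription.
            if (app.equals("EventFilterApplication")) {
                topic.subscribe(handled::incrementAndGet);
            }
        }

        topic.publish();
        return handled.get();
    }

    public static void main(String[] args) {
        List<String> none = Collections.emptyList();
        List<String> filter = Arrays.asList("EventFilterApplication");

        System.out.println("in constructor and from(): " + deliveriesForOneMessage(filter, filter)); // 2
        System.out.println("in from() only: " + deliveriesForOneMessage(none, filter));             // 1
    }
}
```

In the real application the matching change is the one suggested above: build the aggregate without EventFilterApplication in the constructor's sources (for example with the no-argument AggregateApplicationBuilder constructor) and keep only the from(EventFilterApplication.class) call.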