
I am creating a consumer-record Source with Akka Streams Kafka as shown below. How do I make the Kafka source reconnect when Kafka is restarted?

val settings = ConsumerSettings(system, keyDeserializer, valueDeserializer)
  .withBootstrapServers(bootstrapServers)
  .withGroupId(groupName)
  // what offset to begin with if there's no offset for this group
  .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
  // do we want to automatically commit offsets?
  .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
  // auto-commit offsets every 1 second, in the background
  .withProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000")
  // reconnect every 1 second, when disconnected
  .withProperty(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, "1000")
  // every session lasts 30 seconds
  .withProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")
  // send heartbeat every 10 seconds, i.e. 1/3 * session.timeout.ms
  .withProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000")
  // how many records to fetch in each poll()
  .withProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100")

Consumer.atMostOnceSource(settings, Subscriptions.topics(topic)).map(_.value)

I have a single Kafka instance running on my local machine. I push values into the topic through the console producer, and they get printed out. Then I kill Kafka and restart it to see whether the source reconnects.

This is how my logs proceed:

* Connection with /192.168.0.1 disconnected 
    java.net.ConnectException: Connection refused 
* Give up sending metadata request since no node is available 
* Consumer interrupted with WakeupException after timeout. Message: null. Current value of akka.kafka.consumer.wakeup-timeout is 3000 milliseconds 
* Resuming partition test-events-0 
* Error while fetching metadata with correlation id 139 : {test-events=INVALID_REPLICATION_FACTOR} 
* Sending metadata request (type=MetadataRequest, topics=test-events) to node 0 
* Sending GroupCoordinator request for group mytestgroup to broker 192.168.0.1:9092 (id: 0 rack: null) 
* Consumer interrupted with WakeupException after timeout. Message: null. Current value of akka.kafka.consumer.wakeup-timeout is 3000 milliseconds 
* Received GroupCoordinator response ClientResponse(receivedTimeMs=1491797713078, latencyMs=70, disconnected=false, requestHeader={api_key=10,api_version=0,correlation_id=166,client_id=consumer-1}, responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}}) for group mytestgroup 
* Error while fetching metadata with correlation id 169 : {test-events=INVALID_REPLICATION_FACTOR} 
* Received GroupCoordinator response ClientResponse(receivedTimeMs=1491797716169, latencyMs=72, disconnected=false, requestHeader={api_key=10,api_version=0,correlation_id=196,client_id=consumer-1}, responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}}) for group mytestgroup 
09:45:16.169 [testsystem-akka.kafka.default-dispatcher-16] DEBUG o.a.k.c.c.i.AbstractCoordinator - Group coordinator lookup for group mytestgroup failed: The group coordinator is not available. 
09:45:16.169 [testsystem-akka.kafka.default-dispatcher-16] DEBUG o.a.k.c.c.i.AbstractCoordinator - Coordinator discovery failed for group mytestgroup, refreshing metadata 
* Initiating API versions fetch from node 2147483647 
* Offset commit for group mytestgroup failed: This is not the correct coordinator for this group. 
* Marking the coordinator 192.168.43.25:9092 (id: 2147483647 rack: null) dead for group mytestgroup 
* The Kafka consumer has closed. 

How do I make sure this source reconnects and continues processing records?

Answer


I think you need at least 2 brokers. If one fails, the other can take over the work while you restart the failed one.
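
Beyond adding brokers, the stream itself can be made to restart: the log above ends with "The Kafka consumer has closed", i.e. the stage fails and nothing recreates it. A minimal sketch, assuming an Akka version that provides RestartSource.withBackoff and reusing the `settings`, `topic`, and `system` values from the question (the backoff values are illustrative, and an implicit materializer is assumed to be in scope):

```scala
import scala.concurrent.duration._
import akka.stream.scaladsl.{RestartSource, Sink}
import akka.kafka.Subscriptions
import akka.kafka.scaladsl.Consumer

// Recreate the whole consumer source with exponential backoff
// whenever it fails or completes, e.g. after the broker dies
// and the underlying Kafka consumer closes.
val restartingSource = RestartSource.withBackoff(
  minBackoff = 3.seconds,  // delay before the first restart
  maxBackoff = 30.seconds, // cap on the exponential backoff
  randomFactor = 0.2       // jitter, to avoid reconnect storms
) { () =>
  Consumer.atMostOnceSource(settings, Subscriptions.topics(topic)).map(_.value)
}

restartingSource.runWith(Sink.foreach(println))
```

With this wrapper, killing and restarting the broker makes the source rebuild itself after the backoff instead of staying closed; without a broker restart, the restarts simply keep retrying at the capped interval.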