2017-10-19

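confluent-python kafka producer send callback message.offset() returns 0

The delivery callback passed to Producer.produce() receives a message object. message.offset() usually returns 0, which looks like a bug.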

This is using: confluent-kafka Python library version 0.11.0 and librdkafka: stable 0.11.0 (bottled), HEAD, installed via Homebrew on Mac OS.

Given the following simple test program:

import confluent_kafka 
import timeit 


def delivery_callback(error, message): 
    print("delivery_callback. error={}. message={}".format(error, message)) 
    print("message.topic={}".format(message.topic())) 
    print("message.timestamp={}".format(message.timestamp())) 
    print("message.key={}".format(message.key())) 
    print("message.value={}".format(message.value())) 
    print("message.partition={}".format(message.partition())) 
    print("message.offset={}".format(message.offset())) 


def produce_string_messages(kafka_producer, topic_name, num_messages): 
    start_time = timeit.default_timer() 

    for i in range(num_messages): 
        kafka_producer.produce(topic_name, value="cf-k test. v{}".format(i), on_delivery=delivery_callback)

    elapsed = timeit.default_timer() - start_time 
    print("completed producing messages. They are queued for delivery. elapsed={}. elapsed/msg={}".format(elapsed, elapsed/num_messages)) 


if __name__ == "__main__": 
    print("starting") 

    conf = { 
        'bootstrap.servers': "kafka-broker-1:9092"
    } 

    kafka_producer = confluent_kafka.Producer(conf) 

    print("opened KafkaProducer") 
    produce_string_messages(kafka_producer, "my-string-topic", 3) 

    print("flushing...") 
    kafka_producer.flush() 

    print("exiting") 

This produces:

starting 
opened KafkaProducer 
completed producing messages. They are queued for delivery. elapsed=0.000994920730591. elapsed/msg=0.00033164024353 
flushing... 
delivery_callback. error=None. message=<cimpl.Message object at 0x10f986ec0> 
message.topic=my-string-topic 
message.timestamp=(1, 1508451238822L) 
message.key=None 
message.value=cf-k test. v0 
message.partition=0 
message.offset=0 
delivery_callback. error=None. message=<cimpl.Message object at 0x10f986ec0> 
message.topic=my-string-topic 
message.timestamp=(1, 1508451238822L) 
message.key=None 
message.value=cf-k test. v1 
message.partition=0 
message.offset=0 
delivery_callback. error=None. message=<cimpl.Message object at 0x10f986ec0> 
message.topic=my-string-topic 
message.timestamp=(1, 1508451238822L) 
message.key=None 
message.value=cf-k test. v2 
message.partition=0 
message.offset=24 
exiting 

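Note that message.offset() is zero for the first two messages and non-zero for the third. If I run this test program again, sending three messages, the third message.offset increments by 3. This looks like message.offset() is often incorrectly returning 0.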

Answer

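For performance reasons [1], delivery reports only provide a valid offset for the last message in a produce batch. This can be changed to provide proper offsets for all messages in the batch by setting the produce.offset.report topic-level configuration property to true, like so: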

p = confluent_kafka.Producer({'bootstrap.servers': ...,
                              'default.topic.config': {'produce.offset.report': True}})

We will change the default to True in a future release of the Python client.

[1]: It avoids a linear scan of the batched messages, but the performance impact on Python is negligible.
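
To check the effect of the setting, it may help to collect offsets in the delivery callback rather than print them. The following is only a sketch: build_producer_config, OffsetCollector and make_producer are hypothetical names that do not come from the question or the library. It assumes the same broker address and topic configuration shown above, and it checks the error argument before trusting the offset.

```python
def build_producer_config(bootstrap_servers, report_all_offsets=True):
    """Build a config dict for confluent_kafka.Producer (hypothetical helper).

    Sets the topic-level produce.offset.report property described in the
    answer, so that every message in a batch carries its own offset.
    """
    return {
        'bootstrap.servers': bootstrap_servers,
        'default.topic.config': {'produce.offset.report': report_all_offsets},
    }


class OffsetCollector(object):
    """Delivery callback that records results instead of printing them.

    Hypothetical helper: pass an instance as on_delivery to produce().
    """

    def __init__(self):
        self.delivered = []  # (partition, offset, value) per delivered message
        self.failed = []     # (value, error) per failed message

    def __call__(self, error, message):
        # A non-None error means the offset is not meaningful.
        if error is not None:
            self.failed.append((message.value(), error))
            return
        self.delivered.append(
            (message.partition(), message.offset(), message.value()))


def make_producer(bootstrap_servers):
    """Create a real producer; imported lazily so the helpers above can be
    exercised without a broker or the library installed."""
    import confluent_kafka
    return confluent_kafka.Producer(build_producer_config(bootstrap_servers))
```

A possible use, assuming a reachable broker: create collector = OffsetCollector(), pass on_delivery=collector to each produce() call, call flush(), and then inspect collector.delivered. With the property enabled, each entry is expected to carry its own offset rather than 0. Later librdkafka releases may report offsets for every message by default, in which case the property is unnecessary; check the configuration reference for the installed version.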

Perfect. Thanks! As the main author, could you respond to this question: https://stackoverflow.com/questions/44732214/apt-get-install-librdkafka1-fails-on-debian-9-x-due-to-libssl-dependency – clay