kafka-python客戶端支持Kafka 0.9,但顯然不包含新的身份驗證和加密功能,所以我的猜測是它只適用於開放服務器(如以前的版本)。無論如何,即使Java客戶端需要一個特殊的消息中心登錄模塊來連接(或者從示例中看來),這表明除非有類似的Python模塊可用,否則什麼都不會起作用。我可以從Python調用Bluemix消息中心服務嗎?
我的具體情況是,我想使用也在Bluemix(Apache Spark服務)中託管的Jupyter筆記本中的消息中心服務。
kafka-python客戶端支持Kafka 0.9,但顯然不包含新的身份驗證和加密功能,所以我的猜測是它只適用於開放服務器(如以前的版本)。無論如何,即使Java客戶端需要一個特殊的消息中心登錄模塊來連接(或者從示例中看來),這表明除非有類似的Python模塊可用,否則什麼都不會起作用。我可以從Python調用Bluemix消息中心服務嗎?
我的具體情況是,我想使用也在Bluemix(Apache Spark服務)中託管的Jupyter筆記本中的消息中心服務。
在Bluemix Apache Spark服務本機支持此功能之前,您可以按照與Realtime Sentiment Analysis project相同的方法進行操作。幫助代碼可以在cds labs spark samples github repo上找到。
在卡夫卡Python客戶機的SASL支持已經請求:https://github.com/dpkp/kafka-python/issues/533但直到通過信息中心所使用的用戶名/密碼登錄方法的支持,它將無法工作
我們添加了一些文本文檔以外對非Java語言支持 - 見「連接和認證非Java應用程序」部分: https://www.ng.bluemix.net/docs/services/MessageHub/index.html
我們目前的驗證方法是非標準的,而不是由Apache項目的支持,但治標不治本。 Message Hub團隊正在與Apache Kafka社區合作開發KIP-43。一旦完成,我們將更改Message Hub認證實現以匹配,並且可以按照任何語言實現該規範的客戶端。
我能夠用卡夫卡的Python庫連接:
$ pip install --user kafka-python
則...
from kafka import KafkaProducer
from kafka.errors import KafkaError
import ssl
############################################
# Service credentials from Bluemix UI:
############################################
bootstrap_servers = # kafka_brokers_sasl
sasl_plain_username = # user
sasl_plain_password = # password
############################################
sasl_mechanism = 'PLAIN'
security_protocol = 'SASL_SSL'
# Create a new context using system defaults, disable all but TLS1.2
context = ssl.create_default_context()
context.options &= ssl.OP_NO_TLSv1
context.options &= ssl.OP_NO_TLSv1_1
producer = KafkaProducer(bootstrap_servers = bootstrap_servers,
sasl_plain_username = sasl_plain_username,
sasl_plain_password = sasl_plain_password,
security_protocol = security_protocol,
ssl_context = context,
sasl_mechanism = sasl_mechanism,
api_version=(0,10))
# Asynchronous by default
future = producer.send('my-topic', b'raw_bytes')
# Block for 'synchronous' sends
try:
record_metadata = future.get(timeout=10)
except KafkaError:
# Decide what to do if produce request failed...
log.exception()
pass
# Successful result returns assigned partition and offset
print (record_metadata.topic)
print (record_metadata.partition)
print (record_metadata.offset)
這爲我工作從Bluemix火花作爲服務從jupyter筆記本,但是,請注意,這種方法不使用spark。代碼剛剛在驅動主機上運行。
該解決方案對我來說非常適合! –