我有一個Apache Spark羣集和一個RabbitMQ代理,我想使用pyspark.streaming
模塊來使用消息並計算一些指標。如何使用Pyspark Streaming模塊實現RabbitMQ消費者?
的問題是我只找到this package,但在的Java和斯卡拉實現。除此之外,我還沒有在Python中找到任何示例或橋接實現。
我有一個消費者實施使用Pika,但我不知道如何將有效載荷傳遞到我的StreamingContext
。
我有一個Apache Spark羣集和一個RabbitMQ代理,我想使用pyspark.streaming
模塊來使用消息並計算一些指標。如何使用Pyspark Streaming模塊實現RabbitMQ消費者?
的問題是我只找到this package,但在的Java和斯卡拉實現。除此之外,我還沒有在Python中找到任何示例或橋接實現。
我有一個消費者實施使用Pika,但我不知道如何將有效載荷傳遞到我的StreamingContext
。
該解決方案使用pika asynchronous consumer example和socketTextStream
方法從星火流
.py
Consumer
類在if __name__ == '__main__':
我們需要打開一個套接字對應於你的TCP連接HOST
和PORT
星火流。我們必須從插座保存方法sendall
到一個變量它傳遞給Consumer
類
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind((HOST, PORT))
s.listen(1)
conn, addr = s.accept()
dispatcher = conn.sendall #assigning sendall to dispatcher variable
consumer = Consumer(dispatcher)
try:
consumer.run()
except Exception as e:
consumer.stop()
s.close()
修改消費者的__init__
方法傳遞dispatcher
def __init__(self,dispatcher):
self._connection = None
self._channel = None
self._closing = False
self._consumer_tag = None
self._url = amqp_url
#new code
self._dispatcher = dispatcher
在該方法中on_message
消費者中我們呼叫self._dispatcher
發送AMQP消息的body
def on_message(self, unused_channel, basic_deliver, properties, body):
self._channel.basic_ack(basic_deliver.delivery_tag)
try:
# we need an '\n' at the each row Spark socketTextStream
self._dispatcher(bytes(body.decode("utf-8")+'\n',"utf-8"))
except Exception as e:
raise
在Spark中,將ssc.socketTextStream(HOST, int(PORT))
與HOST
和PORT
對應我們的TCP套接字。星火將管理連接
運行第一消費者,然後星火應用
結束語:
嗯,我只是發現[Pyspark](http://spark.apache。org/docs/latest/api/python/pyspark.streaming.html#module-pyspark.streaming.mqtt)和[RabbitMQ](https://www.rabbitmq.com/mqtt.html)都說** MQTT協議* *。這可能是一個解決方案,但它們是一些折衷和限制 – jocerfranquiz
在RabbitMQ集羣上使用MQTT協議意味着更改隊列配置。對我來說這不是一個解決方案。我找到了解決方法。一旦我完成我的測試,我會發佈一個解決方案 – jocerfranquiz
嘿,有什麼進展?我現在面臨同樣的問題。就我而言,我甚至無法設置MQTT概念驗證。 – gwaramadze