2016-05-30 70 views
5

我有一個Apache Spark羣集和一個RabbitMQ代理,我想使用pyspark.streaming模塊來使用消息並計算一些指標。如何使用Pyspark Streaming模塊實現RabbitMQ消費者?

的問題是我只找到this package,但在的Java斯卡拉實現。除此之外,我還沒有在Python中找到任何示例或橋接實現。

我有一個消費者實施使用Pika,但我不知道如何將有效載荷傳遞到我的StreamingContext

+0

嗯,我只是發現[Pyspark](http://spark.apache。org/docs/latest/api/python/pyspark.streaming.html#module-pyspark.streaming.mqtt)和[RabbitMQ](https://www.rabbitmq.com/mqtt.html)都說** MQTT協議* *。這可能是一個解決方案,但它們是一些折衷和限制 – jocerfranquiz

+2

在RabbitMQ集羣上使用MQTT協議意味着更改隊列配置。對我來說這不是一個解決方案。我找到了解決方法。一旦我完成我的測試,我會發佈一個解決方案 – jocerfranquiz

+0

嘿,有什麼進展?我現在面臨同樣的問題。就我而言,我甚至無法設置MQTT概念驗證。 – gwaramadze

回答

2

該解決方案使用pika asynchronous consumer examplesocketTextStream方法從星火流

  1. 下載的例子,並保存爲一個文件.py
  2. 修改使用自己的RabbitMQ的憑據和連接參數的文件。在我來說,我不得不修改Consumer
  3. if __name__ == '__main__':我們需要打開一個套接字對應於你的TCP連接HOSTPORT星火流。我們必須從插座保存方法sendall到一個變量它傳遞給Consumer

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: 
        s.bind((HOST, PORT)) 
        s.listen(1) 
        conn, addr = s.accept() 
        dispatcher = conn.sendall #assigning sendall to dispatcher variable 
    consumer = Consumer(dispatcher) 
    try: 
        consumer.run() 
    except Exception as e: 
        consumer.stop() 
        s.close() 
    
  4. 修改消費者的__init__方法傳遞dispatcher

    def __init__(self,dispatcher): 
        self._connection = None 
        self._channel = None 
        self._closing = False 
        self._consumer_tag = None 
        self._url = amqp_url 
        #new code 
        self._dispatcher = dispatcher 
    
  5. 在該方法中on_message消費者中我們呼叫self._dispatcher發送AMQP消息的body

    def on_message(self, unused_channel, basic_deliver, properties, body): 
        self._channel.basic_ack(basic_deliver.delivery_tag) 
        try: 
        # we need an '\n' at the each row Spark socketTextStream 
        self._dispatcher(bytes(body.decode("utf-8")+'\n',"utf-8")) 
        except Exception as e: 
        raise 
    
  6. 在Spark中,將ssc.socketTextStream(HOST, int(PORT))HOSTPORT對應我們的TCP套接字。星火將管理連接

  7. 運行第一消費者,然後星火應用

結束語:

  • 嘗試在不同的機器上運行你的消費者,而不是你的星火機器
  • 超過10000的任何端口應該沒問題。不要讓內核打開一些隨機端口
  • 平臺:Linux Debian的7和8,和Ubuntu 14.04和16.04
  • 鼠兔版本0.10.0
  • Python版本3.5.2
  • 星火1.6版。 1,1.6.2和2.0.0