Getting an error in Pyspark communication over RPC

I am using RabbitMQ to communicate with pyspark from a remote PC over RPC. For testing purposes I followed the RabbitMQ documentation tutorial on implementing RPC and wrote a test setup over pyspark, but it is giving me an error (error screenshot omitted).

Here is my spark RPC server code:

import pika 
from tkinter import * 
from pyspark.sql import SparkSession 
from pyspark import SparkConf,SparkContext 
import json 
import re 



connectionparam=pika.ConnectionParameters(host="localhost") 
connection=pika.BlockingConnection(connectionparam) 

channel=connection.channel() 

channel.queue_declare(queue='rpc_queue') 







spark = SparkSession.builder.config("spark.sql.warehouse.dir", r"C:\spark\spark-warehouse") \
    .appName("TestApp") \
    .enableHiveSupport() \
    .getOrCreate() 

print("success") 
# establishing character 
#sqlstring="SELECT lflow1.LeaseType as LeaseType, lflow1.Status as Status, lflow1.Property as property, lflow1.City as City, lesflow2.DealType as DealType, lesflow2.Area as Area, lflow1.Did as DID, lesflow2.MID as MID from lflow1, lesflow2 WHERE lflow1.Did = lesflow2.MID" 



def queryBuilder(sqlval): 
    print("printing",sqlval) 
    df=spark.sql(sqlval) 
    print("printing data frame table") 
    df.show() 

    resultlist = df.toJSON().collect() 
    dumpdata = re.sub(r"\'", "", str(resultlist)) 
    jsondata = json.dumps(dumpdata) 
    #print(jsondata) 
    return jsondata 


def on_request(ch,method,props, body): 
    n=body 
    print("printing request body ",n) 
    response=queryBuilder(n) 
    ch.basic_publish(exchange='', 
        routing_key=props.reply_to, 
        properties=pika.BasicProperties(correlation_id=props.correlation_id), 
        body=response 
        ) 
    ch.basic_ack(delivery_tag=method.delivery_tag) 


channel.basic_qos(prefetch_count=1) 
channel.basic_consume(on_request,queue='rpc_queue') 
print("[x] Awaiting RPC Request") 

channel.start_consuming() 

master=Tk() 
entryval=Entry(master) 
entryval.grid(row=0,column=1) 
Button(master,text='Quit',command=master.quit).grid(row=3,column=1,sticky=W,pady=50) 
mainloop() 

And on the remote pyspark application my RPC client code is:

import pika 
import uuid 

class SparkRpcClient(object): 
    def __init__(self): 
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost')) 

        self.channel = self.connection.channel() 

        result = self.channel.queue_declare(exclusive=True) 
        self.callback_queue = result.method.queue 

        self.channel.basic_consume(self.on_response, no_ack=True, 
                                   queue=self.callback_queue) 

    def on_response(self, ch, method, props, body): 
        if self.corr_id == props.correlation_id: 
            self.response = body 

    def call(self, querymsg): 
        self.response = None 
        self.corr_id = str(uuid.uuid4()) 
        self.channel.basic_publish(exchange='', 
                                   routing_key='rpc_queue', 
                                   properties=pika.BasicProperties( 
                                       reply_to=self.callback_queue, 
                                       correlation_id=self.corr_id, 
                                   ), 
                                   body=querymsg) 
        while self.response is None: 
            self.connection.process_data_events() 
        return int(self.response) 

sparkrpc = SparkRpcClient() 
sqlstring="SELECT lflow1.LeaseType as LeaseType, lflow1.Status as Status, lflow1.Property as property, lflow1.City as City, lesflow2.DealType as DealType, lesflow2.Area as Area, lflow1.Did as DID, lesflow2.MID as MID from lflow1, lesflow2 WHERE lflow1.Did = lesflow2.MID" 


print(" [x] Requesting query") 
response = sparkrpc.call(sqlstring) 
print(" [.] Got %s" % response) 

My server receives the request string from the client and prints it, but it cannot process the sqlstring in my queryBuilder() function and return the JSON data. Moreover, I have sent the request several times, and the individual requests seem to pile up in the rpc queue without being cleared, because if I then run only the server script I get the same error again. Maybe I am missing something here; can anyone help me figure it out? Thanks in advance. Kalyan

Answer


You are passing an incompatible type (it looks like bytes or a bytearray) where str is expected when the JSON data is returned to the client.

You should first decode the content to a string.

def queryBuilder(sqlval, enc): 
    ... 
    df = spark.sql(sqlval.decode(enc)) 
    df.show() 
    ... 
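
If you adopt that signature, the consumer callback would then pass the encoding along. Here is a minimal sketch, assuming the client sends UTF-8 text (the 'utf-8' literal is an assumption, not part of the original answer):

def on_request(ch, method, props, body): 
    # pika delivers the message body as bytes, so hand an encoding to queryBuilder 
    response = queryBuilder(body, 'utf-8')  # assumed encoding 
    ch.basic_publish(exchange='', 
                     routing_key=props.reply_to, 
                     properties=pika.BasicProperties(correlation_id=props.correlation_id), 
                     body=response) 
    ch.basic_ack(delivery_tag=method.delivery_tag) 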

Thank you, I just decoded the byte stream – Kalyan