1
得到錯誤的Pyspark通信
我剛使用從遠程PC通過RPC.For測試目的pyspark通信的RabbitMQ我已經開發了是給我的錯誤 通過RPC中的RabbitMQ
我跟了一個測試代碼超過pyspark
這裏實現RPC RabbitMQ的文檔教程是我的火花RPC服務器代碼
import pika
from tkinter import*
from pyspark.sql import SparkSession
from pyspark import SparkConf,SparkContext
import json
import re
connectionparam=pika.ConnectionParameters(host="localhost")
connection=pika.BlockingConnection(connectionparam)
channel=connection.channel()
channel.queue_declare(queue='rpc_queue')
spark=SparkSession.builder.config("spark.sql.warehouse.dir", "C:\spark\spark-warehouse")\
\
.appName("TestApp").\
enableHiveSupport().getOrCreate()
print("success")
#establishhing chraracter
#sqlstring="SELECT lflow1.LeaseType as LeaseType, lflow1.Status as Status, lflow1.Property as property, lflow1.City as City, lesflow2.DealType as DealType, lesflow2.Area as Area, lflow1.Did as DID, lesflow2.MID as MID from lflow1, lesflow2 WHERE lflow1.Did = lesflow2.MID"
def queryBuilder(sqlval):
print("printing",sqlval)
df=spark.sql(sqlval)
print("printing data frame table")
df.show()
resultlist = df.toJSON().collect()
dumpdata = re.sub(r"\'", "", str(resultlist))
jsondata = json.dumps(dumpdata)
#print(jsondata)
return jsondata
def on_request(ch,method,props, body):
n=body
print("printing request body ",n)
response=queryBuilder(n)
ch.basic_publish(exchange='',
routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id=props.correlation_id),
body=response
)
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request,queue='rpc_queue')
print("[x] Awaiting RPC Request")
channel.start_consuming()
master=Tk()
entryval=Entry(master)
entryval.grid(row=0,column=1)
Button(master,text='Quit',command=master.quit).grid(row=3,column=1,sticky=W,pady=50)
mainloop()
和遠程pyspark applicati我下面的RPC客戶端代碼上是
import pika
import uuid
class SparkRpcClient(object):
def __init__(self):
self.connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
self.channel = self.connection.channel()
result = self.channel.queue_declare(exclusive=True)
self.callback_queue = result.method.queue
self.channel.basic_consume(self.on_response, no_ack=True,
queue=self.callback_queue)
def on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id:
self.response = body
def call(self, querymsg):
self.response = None
self.corr_id = str(uuid.uuid4())
self.channel.basic_publish(exchange='',
routing_key='rpc_queue',
properties=pika.BasicProperties(
reply_to = self.callback_queue,
correlation_id = self.corr_id,
),
body=querymsg)
while self.response is None:
self.connection.process_data_events()
return int(self.response)
sparkrpc = SparkRpcClient()
sqlstring="SELECT lflow1.LeaseType as LeaseType, lflow1.Status as Status, lflow1.Property as property, lflow1.City as City, lesflow2.DealType as DealType, lesflow2.Area as Area, lflow1.Did as DID, lesflow2.MID as MID from lflow1, lesflow2 WHERE lflow1.Did = lesflow2.MID"
print(" [x] Requesting query")
response = sparkrpc.call(sqlstring)
print(" [.] Got %s" % response)
我的服務器已經從客戶端接收到的請求字符串,並打印出來,但它不能在我的作品querybuild()函數處理的SqlString並返回JSON數據。更多的我已經請求了多次,它似乎是個人請求排隊在rpc隊列,但沒有清除。因爲如果我只運行服務器腳本我得到相同的錯誤。可能是我在這裏失去了一些東西,任何人都可以幫我弄清楚。我只是想提前 格利揚
謝謝你,我剛剛解碼字節流 – Kalyan