2
我在流上下文中使用Pyspark Dataframe API,我已經在我的火花流應用程序(我使用的是kafka接收器)中將RDD轉換爲DF foreach DStream,這是我已經在我的過程RDD函數來完成:從數據庫獲取每行數據DataFrame Pyspark
rowRdd = data_lined_parameters.map(
lambda x: Row(SYS=x[0], METRIC='temp', SEN=x[1], OCCURENCE=x[2], THRESHOLD_HIGH=x[3], OSH=x[4], OSM=x[5], OEH=x[6], OEM=x[7],OSD=x[8],OED=x[9],REMOVE_HOLIDAYS=x[10],TS=x[11],VALUE=x[12],DAY=x[13],WEEKDAY=x[14],HOLIDAY=x[15]))
rawDataDF = sqlContext.createDataFrame(rowRdd)
rawDataRequirementsCheckedDF = rawDataDF.filter("WEEKDAY <= OED AND WEEKDAY >=OSD AND HOLIDAY = false VALUE > THRESHOLD_HIGH ")
我的下一個步驟是從HBase的表豐富了我的rawDataRequirementsCheckedDF每一行與新的欄目,我的問題是,什麼是最有效的方式來獲得HBase的數據(鳳凰)並將其加入到我的原始數據幀中:
--------------------+-------+------+---------+---+---+---+---+---+---+---------------+---+----------------+--------------+--------------------+-------+-------+
| DAY|HOLIDAY|METRIC|OCCURENCE|OED|OEH|OEM|OSD|OSH|OSM|REMOVE_HOLIDAYS|SEN| SYS|THRESHOLD_HIGH| TS| VALUE|WEEKDAY|
+--------------------+-------+------+---------+---+---+---+---+---+---+---------------+---+----------------+--------------+--------------------+-------+-------+
|2017-08-03 00:00:...| false| temp| 3| 4| 19| 59| 0| 8| 0| TRUE| 1|0201| 26|2017-08-03 16:22:...|28.4375| 3|
|2017-08-03 00:00:...| false| temp| 3| 4| 19| 59| 0| 8| 0| TRUE| 1|0201| 26|2017-08-03 16:22:...|29.4375| 3|
+--------------------+-------+------+---------+---+---+---+---+---+---+---------------+---+----------------+--------------+--------------------+-------+-------+
hbase表主鍵是DAY,SYS,SEN,因此它將導致具有相同格式的數據幀。
編輯:
這是我到目前爲止已經試過:
sysList = rawDataRequirementsCheckedDF.map(lambda x : "'"+x['SYS']+"'").collect()
df_sensor = sqlContext.read.format("jdbc").option("dbtable","(select DATE,SYSTEMUID,SENSORUID,OCCURENCE from ANOMALY where SYSTEMUID in ("+','.join(sysList)+"))").option("url", "jdbc:phoenix:clustdev1:2181:/hbase-unsecure").option("driver", "org.apache.phoenix.jdbc.PhoenixDriver").load()
df_anomaly = rawDataRequirementsCheckedDF.join(df_sensor, col("SYS") == col("SYSTEMUID"), 'outer')
謝謝您的回答,我可能不會一直在清楚我的問題,但我的問題是,我的sql請求中使用的參數是從我的rawDataRequirementsCheckedDF發出的,我需要從我的hbase表中獲得foreach SYS數據 – azelix
我試圖做類似於: sysList = rawDataRequirementsCheckedDF.map(lambda x:「'」+ x ['SYS' ] +「'」)。collect() df_sensor = sqlContext.read.format(「jdbc」)。選項(「dbtable」,「(從select中選擇OCCURENCE where SYSTEMUID in(」+','。join(sysList )+「))」)。option(「url」,「jdbc:phoenix:dev1:2181:/ hbase-unsecure」)。option(「driver」,「org.apache.phoenix.jdbc.PhoenixDriver」)。load () 但我不確定這是否是最好的方法 – azelix