2017-06-26 33 views
0

我正在運行到Scala,Apache Spark世界,我想了解如何創建一個將根據我收到的事件生成DataFrame的「管道」。更新scala基於事件的DF

例如,這個想法是,當我收到一個特定的日誌/事件時,我不得不在DF中插入/更新一行。

讓我們來舉一個真實的例子。 我想創建一個DataFrame,它將代表我的數據庫中存在的用戶的狀態(postgres,mongo)。 當我說的狀態,我的意思是用戶的當前狀態(ACTIVE,INCOMPLETE,BLOCKED等)。這表示根據用戶活動進行更改,因此我將接收帶有「狀態」關鍵字(「ACTIVE」)的日誌(JSON)等。

因此,例如,我收到來自卡夫卡主題的日誌..在某些時候,我收到一個我感興趣的日誌,因爲它定義了關於用戶的有用信息(狀態等)。 我拿這個日誌,然後我創建一個包含此日誌的DF。 然後我收到第二個日誌,但這個是由同一個用戶執行的,所以這個行需要更新(如果狀態發生了變化!),所以沒有新的行,但更新現有的行。第三個日誌,新用戶,新信息在現有DF中作爲新行存儲..等等。 在這個過程/流水線結束時,我應該有一個DF,其中包含所有用戶的信息以及他們的"status",所以我可以說:「哦,看看那個,有43個用戶是blocked和13個是active!太棒了!「

這是想法..過程必須實時。

到目前爲止,我已經嘗試過使用不與kafka主題連接的文件。 舉例來說,我已經紅文件如下:

val DF = mysession.read.json("/FileStore/tables/bm2ube021498209258980/exampleLog_dp_api-fac53.json","/FileStore/tables/zed9y2s11498229410434/exampleLog_dp_api-fac53.json") 

這generats一個DF 2行與裏面的一切。

+--------------------+-----------------+------+--------------------+-----+ 
|     _id|   _index|_score|    _source|_type| 
+--------------------+-----------------+------+--------------------+-----+ 
|AVzO9dqvoaL5S78GvkQU|dp_api-2017.06.22|  1|[2017-06-22T08:40...|DPAPI| 
| AVzO9dq5S78GvkQU|dp_api-2017.06.22|  1|[null,null,[Wrapp...|DPAPI| 
+--------------------+-----------------+------+--------------------+-----+ 
_source

有所有嵌套的事情(我提到的status就在這裏!)。

然後我又選擇了像

DF.select("_id", "_source.request.user_ip","_source.request.aw", "_type").show(false) 

+--------------------+------------+------------------------------------+-----+ 
|_id     |user_ip  |aw         |_type| 
+--------------------+------------+------------------------------------+-----+ 
|AVzO9dqvoaL5S78GvkQU|111.11.11.12|285d5034-dfd6-44ad-9fb7-ba06a516cdbf|DPAPI| 
|AVzO9dq5S78GvkQU |111.11.11.82|null        |DPAPI| 
+--------------------+------------+------------------------------------+-----+ 

一些有用的信息,這個想法是創建該DF與卡夫卡的話題到達日誌和更新插入在日誌中這樣DF。 希望我解釋清楚,我不想要一個「代碼」解決方案我更喜歡暗示或示例如何實現此結果。 謝謝。

回答

0

當您在尋找資源時,我會建議您閱讀以下內容: 查看Spark Streaming編程指南(https://spark.apache.org/docs/latest/streaming-programming-guide.html)和Spark Streaming + Kafka集成指南(https://spark.apache.org/docs/2.1.0/streaming-kafka-0-10-integration.html)。

使用Spark Streaming + Kafka集成指南瞭解如何使用Kafka內容打開Stream。 然後查看Spark Streaming編程指南中「Sparks Streaming中的轉換」一章中可能對Spark Streaming執行的轉換 一旦您已經對流進行轉換,您就可以對其執行最終操作查看Spark Streaming Programming Guide中的「DStreams輸出操作」。我認爲.forEachRDD可能是你正在尋找的東西 - 因爲你可以爲流的每個元素執行一個操作(比如檢查某個關鍵字是否在你的字符串中,並且基於這個做一個數據庫調用)。