我正在運行到Scala,Apache Spark世界,我想了解如何創建一個將根據我收到的事件生成DataFrame的「管道」。更新scala基於事件的DF
例如,這個想法是,當我收到一個特定的日誌/事件時,我不得不在DF中插入/更新一行。
讓我們來舉一個真實的例子。 我想創建一個DataFrame,它將代表我的數據庫中存在的用戶的狀態(postgres,mongo)。 當我說的狀態,我的意思是用戶的當前狀態(ACTIVE,INCOMPLETE,BLOCKED等)。這表示根據用戶活動進行更改,因此我將接收帶有「狀態」關鍵字(「ACTIVE」)的日誌(JSON)等。
因此,例如,我收到來自卡夫卡主題的日誌..在某些時候,我收到一個我感興趣的日誌,因爲它定義了關於用戶的有用信息(狀態等)。 我拿這個日誌,然後我創建一個包含此日誌的DF。 然後我收到第二個日誌,但這個是由同一個用戶執行的,所以這個行需要更新(如果狀態發生了變化!),所以沒有新的行,但更新現有的行。第三個日誌,新用戶,新信息在現有DF中作爲新行存儲..等等。 在這個過程/流水線結束時,我應該有一個DF,其中包含所有用戶的信息以及他們的"status"
,所以我可以說:「哦,看看那個,有43個用戶是blocked
和13個是active
!太棒了!「
這是想法..過程必須實時。
到目前爲止,我已經嘗試過使用不與kafka主題連接的文件。 舉例來說,我已經紅文件如下:
val DF = mysession.read.json("/FileStore/tables/bm2ube021498209258980/exampleLog_dp_api-fac53.json","/FileStore/tables/zed9y2s11498229410434/exampleLog_dp_api-fac53.json")
這generats一個DF 2行與裏面的一切。
+--------------------+-----------------+------+--------------------+-----+
| _id| _index|_score| _source|_type|
+--------------------+-----------------+------+--------------------+-----+
|AVzO9dqvoaL5S78GvkQU|dp_api-2017.06.22| 1|[2017-06-22T08:40...|DPAPI|
| AVzO9dq5S78GvkQU|dp_api-2017.06.22| 1|[null,null,[Wrapp...|DPAPI|
+--------------------+-----------------+------+--------------------+-----+
在
_source
有所有嵌套的事情(我提到的status
就在這裏!)。
然後我又選擇了像
DF.select("_id", "_source.request.user_ip","_source.request.aw", "_type").show(false)
+--------------------+------------+------------------------------------+-----+
|_id |user_ip |aw |_type|
+--------------------+------------+------------------------------------+-----+
|AVzO9dqvoaL5S78GvkQU|111.11.11.12|285d5034-dfd6-44ad-9fb7-ba06a516cdbf|DPAPI|
|AVzO9dq5S78GvkQU |111.11.11.82|null |DPAPI|
+--------------------+------------+------------------------------------+-----+
一些有用的信息,這個想法是創建該DF與卡夫卡的話題到達日誌和更新插入在日誌中這樣DF。 希望我解釋清楚,我不想要一個「代碼」解決方案我更喜歡暗示或示例如何實現此結果。 謝謝。