2017-09-25 193 views
4

我有2周卡夫卡的話題 - recommendationsclicks。第一個主題具有由唯一ID(稱爲recommendationsId)鍵入的推薦對象。每個產品都有一個用戶可以點擊的URL。卡夫卡流加入

clicks主題得到由上向用戶推薦產品的那些的URL的點擊生成的消息。它已經這樣設置,這些點擊消息也被recommendationId鍵入。

注意,建議和點擊之間

  1. 關係是一對多。建議可能會導致多次點擊,但點擊總是與單個建議相關聯。

  2. 每個點擊對象都會有相應的推薦對象。

  3. 點擊對象有一個時間戳晚於建議反對。

  4. 的建議和相應的點擊(S)之間的差距可能是幾秒鐘至幾天(比如說,7天最多)。

我的目標是使用卡夫卡流加入來加入這兩個主題。我不清楚的是我應該使用KStream x KStream加入還是KStream x KTable加入。

我實施了KStream x KTable加入clicks流通過recommendations表。但是,如果建議在生成之前之前生成了連接器,並且在連接器啓動之後點擊到達,我無法看到任何加入的點擊 - 建議對。

我使用正確的連接嗎?我應該使用KStream x KStream加入嗎?如果是這樣,爲了能夠在過去至多7天內加入推薦的點擊,我應該將窗口大小設置爲7天嗎?在這種情況下,我是否還需要設置「保留期」?

我的代碼來執行KStream x KTable加入如下。請注意,我已經定義了類RecommendationsClick及其相應的serde。點擊消息只是簡單的String(url)。此URL字符串與Recommendations對象合併,以創建發送到jointTopicClick對象。

public static void main(String[] args){ 
    if(args.length!=4){ 
     throw new RuntimeException("Expected 3 params: bootstraplist clickTopic recsTopic jointTopic"); 
    } 

    final String booststrapList = args[0]; 
    final String clicksTopic = args[1]; 
    final String recsTopic = args[2]; 
    final String jointTopic = args[3]; 

    Properties config = new Properties(); 
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "my_joiner_id"); 
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, booststrapList); 
    config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); 
    config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, JoinSerdes.CLICK_SERDE.getClass().getName()); 

    KStreamBuilder builder = new KStreamBuilder(); 

    // load clicks as KStream 
    KStream<String, String> clicksStream = builder.stream(Serdes.String(), Serdes.String(), clicksTopic); 

    // load recommendations as KTable 
    KTable<String, Recommendations> recsTable = builder.table(Serdes.String(), JoinSerdes.RECS_SERDE, recsTopic); 

    // join the two 
    KStream<String, Click> join = clicksStream.leftJoin(recsTable, (click, recs) -> new Click(click, recs)); 

    // emit the join to the jointTopic 
    join.to(Serdes.String(), JoinSerdes.CLICK_SERDE, jointTopic); 

    // let the action begin 
    KafkaStreams streams = new KafkaStreams(builder, config); 
    streams.start(); 
    } 

這工作得很好,只要雙方的建議和點擊的夾板(以上程序)後,已產生運行。但是,如果點擊到達,建議生成之前運行了連接器,我看不到任何連接發生。我該如何解決?

如果解決方案是使用KStream x KSTream加入,那麼請幫助我瞭解什麼窗口大小,我應該選擇和選擇什麼樣的保留期限。

+2

這可能有幫助:https://www.confluent.io/blog/crossing-streams-joins-apache-kafka/ –

回答

5

您的整體觀察結果是正確的。從概念上講,您可以通過兩種方式獲得正確的結果。如果您使用的流表時,你有

  • 您已經提到的兩個缺點(這可能會重新審視和卡夫卡的未來版本雖然改進),如果點擊獲取的處理相應的建議前,(inner- )加入將失敗。然而,正如你知道會有推薦,你可以使用左連接而不是內連接,檢查連接結果,並且如果推薦是null(例如,你得到了一個重試邏輯) - 或者當然,單次推薦的連續點擊可能會出現亂碼,您可能需要在應用程序代碼中對此進行解釋。
  • KTable的第二個缺點是,它會隨着時間的推移而永遠長大,無限制,因爲您將爲其添加越來越多的獨特建議。因此,您需要通過向建議主題發送<recommendationsId, null>表格的墓碑記錄來實施某些「到期邏輯」,以刪除您不再關心的舊建議。
  • 這種方法的優點是,與流式流連接相比,您將需要更少的內存/磁盤空間,因爲您只需要緩存應用程序中的所有建議(但無需點擊)。

如果您使用流式流連接,並且在推薦後7天內可能發生點擊,則窗口大小必須爲7天 - 否則,點擊將不會與建議一起使用。

  • 這種方法的缺點是,您將需要更多的內存/磁盤,因爲您將在應用程序中緩存所有點擊和最近7天的所有建議。
  • 優點是,訂單或處理(即推薦與點擊)無關緊要(即,您不需要執行上述的重試策略)
  • 此外,舊建議將自動更新因此你不需要實現特殊的「過期邏輯」。

對於流式流連接,保留時間答案有點不同。由於窗口大小爲7天,它必須至少7天。否則,你會刪除你的「運行窗口」的記錄。您還可以設置更長的保留期限,以便能夠處理「晚期數據」。假設用戶在窗口時間範圍的結尾處點擊(建議的7天時間跨度前5分鐘結束),但點擊僅在1小時後報告給您的應用程序。如果您的保留期限爲7天作爲您的窗口大小,則此遲到記錄將無法再處理(因爲建議已被刪除)。如果您設置了較長的保留期,例如8天,您仍然可以處理延遲記錄。它取決於你的應用程序/語義需求你想使用什麼保留時間。

摘要: 從實現的角度來看,使用stream-stream join比使用stream-table join更簡單。不過,預計內存/磁盤節省量可能會很大,具體取決於您的點擊流數據速率。

+0

感謝您的解釋(和偉大的博客BTW!)。我有一個後續問題。假設我實現了'KStream x KStream' inner-join,運行這個木匠的機器是否會在過去7天內下載並保存所有*建議和點擊消息(對於相應的分區,假設機器數量=分區)?這聽起來像很多物理內存。 有沒有辦法擴展它(比如有兩倍的機器數量作爲分區數量)? – Nik

+2

它需要保存最近7天的所有數據,但不包含在內存中。我們在內部使用RocksDB,可能會泄露到磁盤。所以你可以保存比主存更大的狀態。 - 關於縮放:您不能擁有比分區更多的實例。如果你需要更高的並行性來處理,你需要有更多的分區 - 一種方法是創建一個包含所需分區數量的主題,並在執行之前調用'through()'來重新分配輸入數據加入。由於這個新主題僅用於縮放,如果可以有較短的保留時間(如1小時?)。 –