2015-08-20 72 views
0

我有一個Spark Streaming應用程序正在處理一系列網站點擊事件。每個事件都有一個包含GUID的屬性,用於標識事件所屬的用戶會話。Spark Streaming的countByValueAndWindow如何工作?

我的應用程序向上計數發生爲每個會話的事件數,使用窗口:

def countEvents(kafkaStream: DStream[(String, Event)]): DStream[(String, Session)] = { 

    // Get a list of the session GUIDs from the events 
    val sessionGuids = kafkaStream 
    .map(_._2) 
    .map(_.getSessionGuid) 

    // Count up the GUIDs over our sliding window 
    val sessionGuidCountsInWindow = sessionGuids.countByValueAndWindow(Seconds(60), Seconds(1)) 

    // Create new session objects with the event count 
    sessionGuidCountsInWindow 
    .map({ 
     case (guidS, eventCount) => 
     guidS -> new Session().setGuid(guidS).setEventCount(eventCount) 
    }) 
} 

我的理解是,countByValueAndWindow功能只對在其被調用的函數在DSTREAM值。換句話說,在上面的代碼中,對countByValueAndWindow的調用應僅爲我們調用該函數的sessionGuids DStream中的會話GUID返回事件計數。

但是我正在觀察一些不同的東西;對countByValueAndWindow的調用將返回不在sessionGUID中的會話GUID的計數。它似乎是返回前幾批中處理的會話GUID的計數。我只是誤解了這個功能是如何工作的?我一直無法在網上找到有用的文檔。

回答

1

我的一位熟悉Spark的方法的同事比我幫助我做的更好。顯然,我誤解了countByValueAndWindow函數的工作方式。我認爲它只會返回您調用函數的DStream中的值的計數。但是,實際上,它會返回整個窗口中所有值的計數。爲了解決我的問題,我只需在輸入DStream和由countByValueAndWindow操作產生的DStream之間執行連接。因此,我只能得到輸入DStream中值的結果。