2016-03-11 18 views
0

我有兩個流,我希望能夠根據每運行x秒的計算來使用一個流。scalaz-stream基於計算值消耗流量

我想我基本上需要創建第三tick流 - 類似every(3.seconds) - ,做計算,然後拿出某種其他兩個之間的切換。

我有點卡在這裏(我只是剛剛開始與scalaz-stream混在一起)。

謝謝!

回答

0

有幾種方法可以解決這個問題。一種方法是使用awakeEvery。有關具體示例,請參見here

要簡要描述示例,請考慮我們希望每5秒鐘查詢Twitter並獲取推文並執行情感分析。我們可以按如下撰寫這條管道:

val source = 
    awakeEvery(5 seconds) |> buildTwitterQuery(query) through queryChannel flatMap { 
     Process emitAll _ 
    } 

注意,queryChannel可以如下說明。

def statusTask(query: Query): Task[List[Status]] = Task { 
     twitterClient.search(query).getTweets.toList 
} 

val queryChannel: Channel[Task, Query, List[Status]] = channel lift statusTask 

讓我知道你是否有任何問題。如前所述,完整示例請參閱this

我希望它有幫助!