2012-05-21 30 views
2

我想要在Play 2中使用Iteratees來處理流式處理彗星的結果。我已經得到了處理從一個回調和枚舉從地圖創建一個枚舉。 我的問題是與Enumeratee.map,這需要一個函數,它接受一個純輸入並返回一個純輸出(例如doc中的字符串到INT轉換)。我想要做的是接受一個純粹的投入,並返回一個結果的承諾。 畢竟,一個枚舉器向枚舉器提供承諾,一個枚舉器將一個枚舉器轉換爲另一個枚舉器,所以應該有一種方法來創建映射到承諾的枚舉器。play2讓一個Enumeratee在另一個Promise中轉換一個Promise

現在讓我舉一個例子來說明這一點。假設我有一個http請求加入了一個要在我的數據庫中查詢的ID列表。假設這些id表示數據庫表中的行,並且請求在這些行上執行一組(長)計算,然後返回一組表示計算的json對象。 正如我一直堵的東西做,這將是冷靜,在一次流一個ID,所以我想有一個enumeratee管道,做:

  1. 查詢行中的數據庫(返回該行承諾)
  2. 使該行很長的計算(需要一個行並返回一個計算的承諾)
  3. 轉換長時間的計算,以JSON
  4. &>這一點通過播放所提供的彗星enumeratee 2

1很容易,我可以使用fromCallback構造一個枚舉器,它將返回查詢結果的承諾。 3也很容易,因爲它是一個簡單的Enumeratee.map

但我不能包裹我的頭圍繞如何實現第2步枚舉的applyOn。我可以理解,我有博建立一個新的迭代器從「內部」迭代器獲得承諾,flatMap計算並返回新的承諾。我沒有得到的是如何使這給予奇申applyOn簽名:def applyOn[A](it: Iteratee[To, A]): Iteratee[From, Iteratee[To, A]]

有人可以幫我嗎?

感謝

+0

只是一個說明,在這個簡單的例子,我可以清楚地合併圖1和2以簡單的枚舉器,但真正的應用程序實際上有點複雜;) – Mortimer

回答

1

applyOn簽名更有意義,當你覺得enumeratee與像enumerator |>> (enumeratee &> iteratee)權的iteratee結合。迭代器的類型爲Iteratee[E, A],枚舉器期望Iteratee[Promise[E], Iteratee[E, A],以便可以提取內部迭代器。所以applyOn將不得不Iteratee[E, A]並返回Iteratee[Promise[E], Iteratee[E, A]

這裏是一個實現的概要。它定義了一個採用輸入並返回預期結果的步驟函數。然後遞歸地遍歷承諾元素。

import play.api.libs.concurrent._ 
import play.api.libs.iteratee._ 

def unpromise[E]: Enumeratee[Promise[E], E] = new Enumeratee[Promise[E], E] { 
    def applyOn[A](inner: Iteratee[E, A]): Iteratee[Promise[E], Iteratee[E, A]] = { 
    def step(input: Input[Promise[E]], i: Iteratee[E, A]): Iteratee[Promise[E], Iteratee[E, A]] = { 
     input match { 
     case Input.EOF => Done(i, Input.EOF) 
     case Input.Empty => Cont(step(_, i)) 
     case Input.El(pe) => 
      val pe2 = pe.map(e => i.feed(Input.El(e))).flatMap(identity) 
      val i2 = Iteratee.flatten(pe2) 
      i2.pureFlatFold(
      (a, e2) => Done(i2, Input.Empty), 
      k => Cont(step(_, i2)), 
      (msg, e2) => Done(i2, Input.Empty)) 
     } 
    } 
    // should check that inner is not done or error - skipped for clarity 
    Cont(step(_, inner)) 
    } 
} 

我丟棄e2,所以有可能是更多的代碼,以確保某些輸入不會丟失。

+0

好吧,它似乎是我的問題的解決方案:有一個地圖枚舉器產生的承諾,然後轉換爲unpromise並繼續像這樣。我將不得不研究這一點,因爲我對e2和i2仍然很迷惑。爲了檢查錯誤/完成,我想我可能能夠使用Enumeratee.CheckDone。謝謝。 – Mortimer

+1

@Mortimer,'pe'是'Promise [E]'。 'pe.map(e => i.feed(Input.El(e)))'應該是'Promise [Promise [Iteratee [E,A]]]''。 'pe2'是'Promise [Iteratee [E,A]]'。 'i2'是'Iteratee [E,A]',是正在進行的計算的下一步。如果正在進行的計算已完成或者是錯誤,則'e2'是剩餘輸入。大多數情況下它會是'Input.Empty',但是如果輸入沒有被完全消耗,它可能是一個'E'類型的值(例如'peek')。 – huynhjl

+0

非常感謝,非常感謝@huynhjl – Mortimer

相關問題