2017-04-02 63 views
1

我正在嘗試使用Kotlin協同來處理非阻塞I/O。情況如下:Kotlin:使用非阻塞I/O阻塞協程

  1. 從螺紋2上運行的線程1
  2. 等待這個數據然後消費它異步回調接收數據。

我當前的代碼看起來像這樣(簡化爲簡潔起見):

private var latch = CountDownLatch(1) 
private var data: Any? = null 

// Async callback from non-blocking I/O 
fun onReceive(data: Any) { 
    currentData = data 
    latch.countDown() 
} 

// Wait and consume data 
fun getData(): Any? { 
    latch.await() 
    latch = CountDownLatch(1) 
    return currentData 
} 

fun processData() { 
    launch(CommonPool) { 
     while (true) { 
      val data = getData() 
      // Consume data     
     } 
    } 
} 

據我瞭解,科特林協同程序應該能夠幫助我擺脫CountDownLatch的。閱讀後,所有我能想出是這樣的:

// Wait and consume data 
fun getData() = async(CommonPool) { 
    latch.await() 
    latch = CountDownLatch(1) 
    currentData 
} 

fun processData() { 
    launch(CommonPool) { 
     while (true) { 
      runBlocking { 
       val data = getData().await() 
       // Consume data     
      } 
     } 
    } 
} 

我也試圖與Pipelines,具有相似的結果。我顯然不理解如何使用這些功能。

+0

從問題中的代碼很難理解你的目標是什麼。請說明什麼是外部函數。 – voddan

+0

在這種情況下,需要知道哪些API調用返回承諾以及哪些類型。請將此信息添加到問題 – voddan

+0

@voddan我的目標是等待來自非阻塞源的數據準備就緒並對其進行處理(實際源是用戶輸入,但我認爲這不相關,因爲它可能是不同的非阻塞源)。當數據準備好處理時,非阻塞源調用'onReceive()'。數據類型是不相關的,你可以認爲它最適合你的任何類型(在我的實際代碼中數據類型是一個自定義類)。 – m0skit0

回答

1

你沒有說過,如果在onReceive()收到的數據可以並行處理。這是主要問題。如果是的話,你可以在onReceive()。如果不允許,請讓onReceive()的每個電話在CommonPool上啓動一個任務,而不使用任何協程。如果他們應該按順序處理,那麼最簡單的方法是啓動一個線程裏面循環:

fun onReceive(data: Any) { 
    queue.put(data); 
} 

.... 

// loop in a thread 
while(true) { 
    data = queue.take(); 
    processData(data); 
} 

同樣,不需要協同程序。

通常,協程是句法糖,用來表示異步程序,就好像它是同步的。我不認爲你的程序是使用協程的情況。