2017-04-17 58 views
0

我有下面的代碼,其中我使用可變列表緩衝區來存儲從kafka使用者接收的文件,然後當列表大小達到15時,我將它們插入到cassandra中。 但他們是否有任何方式使用不可變列表來做同樣的事情。編寫用於存儲數據的不可變代碼在listBuffer in scala

val filesList = ListBuffer[SystemTextFile]() 
    storeservSparkService.configFilesTopicInBatch.subscribe.atLeastOnce(Flow[SystemTextFile].mapAsync(4) { file: SystemTextFile => 
    filesList += file 
    if (filesList.size == 15) { 
     storeServSystemRepository.config.insertFileInBatch(filesList.toList) 
     filesList.clear() 
    } 
    Future(Done) 
    }) 
+0

'storeServSystemRepository.config.insertFileInBatch'做了什麼?這是同步操作還是異步?什麼是方法簽名? –

+0

這將批量插入數據15,所以我們在這裏使用Lagom cassandra會話和批處理語句來插入數據。是的,它返回未來(完成)。 –

+0

如果'insertFileInBatch'返回'Future [Done]',那麼您應該從傳遞給'mapAsync'的塊返回未來,而不是創建一個新的獨立的未來。 –

回答

0

您是否試過使用Vector?

 val filesList = Vector[SystemTextFile]() 
     storeservSparkService.configFilesTopicInBatch.subscribe. 
      atLeastOnce(Flow[SystemTextFile].mapAsync(4) { file: SystemTextFile => 
     filesList = filesList :+ file 
     if (filesList.length == 15) { 
      storeServSystemRepository.config.insertFileInBatch(filesList.toList) 
     } 
     Future(Done) 
    }) 
3

東西沿着這些線?

Flow[SystemTextFile].grouped(15).mapAsync(4){ files => 
    storeServSystemRepository.config.insertFileInBatch(files) 
} 
+0

當我添加分組關鍵字時,它不會消耗這些信息。 –

+0

此外,如果我嘗試與groupedWithin(15,持續時間(20,「秒」)),則它只消耗20秒中的1個文件,而沒有組關鍵字它消耗所有文件(70)以毫秒爲單位 –

+0

根據@ Piyush_Rana上面的註釋中,'insertFileInBatch'返回'Future [Done]',所以應該直接返回,而不是在它周圍包裝另一個'Future'。你想更新你的答案,還是應該寫一個新的答案? –

相關問題