我有下面的代碼,其中我使用可變列表緩衝區來存儲從kafka使用者接收的文件,然後當列表大小達到15時,我將它們插入到cassandra中。 但他們是否有任何方式使用不可變列表來做同樣的事情。編寫用於存儲數據的不可變代碼在listBuffer in scala
val filesList = ListBuffer[SystemTextFile]()
storeservSparkService.configFilesTopicInBatch.subscribe.atLeastOnce(Flow[SystemTextFile].mapAsync(4) { file: SystemTextFile =>
filesList += file
if (filesList.size == 15) {
storeServSystemRepository.config.insertFileInBatch(filesList.toList)
filesList.clear()
}
Future(Done)
})
'storeServSystemRepository.config.insertFileInBatch'做了什麼?這是同步操作還是異步?什麼是方法簽名? –
這將批量插入數據15,所以我們在這裏使用Lagom cassandra會話和批處理語句來插入數據。是的,它返回未來(完成)。 –
如果'insertFileInBatch'返回'Future [Done]',那麼您應該從傳遞給'mapAsync'的塊返回未來,而不是創建一個新的獨立的未來。 –