我正在使用Scala從列專賣店Cassandra中讀取列。每列包含多個條目n,其中n可以在10和20之間。我們讀取一批條目,即每次1000條,並且必須從條目創建列;每個條目都附有一個我們可以用來分組的附加ID。Scala:在可變大小的批處理器上迭代器的功能替代
目前我們使用迭代器遍歷批次中的條目,並通過比較當前ID和以前的ID來確定我們是否在新列上,並且在完成之前我們會讀取很多批次。我們需要在每次批次迭代結束時存儲一個部分列,因爲列的其餘部分將在下一批中。我在下面放了一些僞代碼來演示我們目前使用的基本算法。
如何以功能的方式做到這一點?(如果n是恆定的,這將是一個簡單的問題,我們可以設置適當的批量大小。)
僞代碼:
val resultBuffer // collects all columns
val columnBuffer // collects entries for current column
var currentId // id of current column
while(batchIterator.hasNext){
val batch = batchIterator.getNext
val entryIterator = batch.entries.iterator
while(entryIterator.hasNext){
val entry = entryIterator.next
if(entry.id != currentId) {
currentId = entry.id
resultBuffer += columnBuilder(columnBuffer)
columnBuffer.removeAll
columnBuffer += entry
} else {
columnBuffer += entry
}
}
}
在你的代碼中'入口'是否對應於curr或next?我在滑動(2)時遇到的問題是我們在batch.entries上的迭代次數減少了1次,所以在第一次或最後一次迭代中,我們做了一些特殊的事情來確保處理所有條目,這很好避免。 – ChucK
最可能的情況是,必要的解決方案是最快的。現在你說基準,你是否可以用較慢的代碼替換快速代碼(儘管它更好)。其實,如果重要性很重要,我會簡單地使用現有的強制性解決方案。 – ziggystar
@ChucK查看我的編輯。 –