2012-11-23 62 views
0

我正在使用Scala從列專賣店Cassandra中讀取列。每列包含多個條目n,其中n可以在10和20之間。我們讀取一批條目,即每次1000條,並且必須從條目創建列;每個條目都附有一個我們可以用來分組的附加ID。Scala:在可變大小的批處理器上迭代器的功能替代

目前我們使用迭代器遍歷批次中的條目,並通過比較當前ID和以前的ID來確定我們是否在新列上,並且在完成之前我們會讀取很多批次。我們需要在每次批次迭代結束時存儲一個部分列,因爲列的其餘部分將在下一批中。我在下面放了一些僞代碼來演示我們目前使用的基本算法。

如何以功能的方式做到這一點?(如果n是恆定的,這將是一個簡單的問題,我們可以設置適當的批量大小。)

僞代碼:

val resultBuffer // collects all columns 
val columnBuffer // collects entries for current column 
var currentId // id of current column 

while(batchIterator.hasNext){ 
    val batch = batchIterator.getNext 
    val entryIterator = batch.entries.iterator 

    while(entryIterator.hasNext){ 
      val entry = entryIterator.next 
      if(entry.id != currentId) { 
       currentId = entry.id 
       resultBuffer += columnBuilder(columnBuffer) 
       columnBuffer.removeAll 
       columnBuffer += entry 
      } else { 
       columnBuffer += entry 
      } 
    } 
} 

回答

1

下面是一個使用滑動組更多的功能實現的草圖條目在入門迭代器:

val resultBuffer // collects all columns 

batchIterator.foreach(batch => { 
    val buffer = 
    batch.entries.sliding(2).foldLeft(new ColumnBuffer){(buffer, (curr, next)) => 
     if (curr.id != next.id) { 
     resultBuffer += columnBuilder(buffer :+ entry /* Append entry to buffer */) 
     new ColumnBuffer 
     } else 
     buffer += entry /* Return buffer with entry added */ 
    } 

    if (buffer.nonEmpty) resultBuffer += columnBuilder(buffer) 
} 

這裏,那就是「全球性」,因而必須是可變的唯一對象是resultBuffer。我們甚至可以通過將其作爲內部foldLeft中的另一個累加器並通過用另一個foldLeft代替外部foreach來擺脫該問題。

如果運行時效率對您的代碼至關重要,那麼您應該對各種可能的實現進行基準測試,以便在功能和性能之間找到一個良好的折衷。


EDIT 1:固定在草圖的誤差,即,存儲在buffer條目的最後一個序列中沒有添加到resultBuffer。該錯誤在OP的代碼中已經存在。


編輯2:(地址查克的評論)

curr將採取值entries(0)entries(entries.size() - 2),也就是最後一個元素將不會被處理。解決這個問題的一種方法是在迭代器中附加一個虛擬元素,例如,

(batch.entries ++ List(dummy)).sliding(2).foldLeft ... 

這是不是很好,更重要的是,它將無法​​正常工作batch.entries是空的,因爲sliding(2)然後產生單一窗口List(dummy)。另一種解決方案是將next包含在內部foldLeft的累加器中,並在foldLeft終止後對其進行處理。我沒有解決它,但看起來好像這會使解決方案更不吸引人。

+1

在你的代碼中'入口'是否對應於curr或next?我在滑動(2)時遇到的問題是我們在batch.entries上的迭代次數減少了1次,所以在第一次或最後一次迭代中,我們做了一些特殊的事情來確保處理所有條目,這很好避免。 – ChucK

+0

最可能的情況是,必要的解決方案是最快的。現在你說基準,你是否可以用較慢的代碼替換快速代碼(儘管它更好)。其實,如果重要性很重要,我會簡單地使用現有的強制性解決方案。 – ziggystar

+0

@ChucK查看我的編輯。 –