2011-09-12 64 views
3

假設我有Iterator[A]大小是無限的),並且我想從中得到Iterator[B]其中類型A的某些後續值被聚合。如何在Scala中編寫聚合模式?

例子: 我有一個字符串列表:

Iterator(
    "START", 
    "DATA1", 
    "DATA2", 
    "DATA3", 
    "START", 
    "DATA1", 
    "DATA2", 
    //.. 10^10 more records 
) 

我想從開始加入串下一個啓動排除。即編寫解析器。

Iterator(
"START DATA1 DATA2 DATA3", 
"START DATA1 DATA2", 
    //.. 10^10/5 more records 
) 

我知道如何做到這一點勢在必行,但我想用scala高階函數來完成它。有任何想法嗎?

PS EIP Aggregate http://camel.apache.org/aggregator2.html

回答

5

如果你想有一個實用的解決方案,你應該使用流而不是迭代器(流是不可變的)。這裏是一個可能的辦法:

def aggregate(strs: Stream[String]) = { 
    aggregateRec(strs) 
} 

def aggregateRec(strs: Stream[String]): Stream[String] = { 
    val tail = strs.drop(1) 
    if(tail.nonEmpty) { 
    val (str, rest) = accumulate(tail) 
    Stream.cons(str, aggregateRec(rest)) 
    } 
    else Stream.empty 
} 

def accumulate(strs: Stream[String]): (String, Stream[String]) = { 
    val first = "START " + strs.takeWhile(_ != "START").mkString(" ") 
    val rest = strs.dropWhile(_ != "START") 
    (first, rest) 
} 

它按預期工作:

val strs = Stream("START", "1", "2", "3", "START", "A", "B") 
val strs2 = aggregate(strs) 
strs2 foreach println 
+1

我有點困惑。如果迭代器的底層實現是Stream,那麼爲什麼它不夠?即爲什麼當我們只使用一個Iterator時,我們會明確地使用一個Stream,如果這個Iterator恰好在一個Stream上迭代,那麼好嗎? –

+1

因爲迭代器是可變的。調用next將改變迭代器的狀態。我只是想提供一個功能齊全的解決方案。 – paradigmatic

+0

啊,夠公平的。謝謝。我只是確保沒有更多的險惡我失蹤。 –

1

你可以用摺疊試試吧:

val ls = List(
    "START", 
    "DATA1", 
    "DATA2", 
    "DATA3", 
    "START", 
    "DATA1", 
    "DATA2" 
) 

(List[List[String]]() /: ls) { (acc, elem) => 
    if (elem == "START") 
    List(elem) :: acc // new head list 
    else 
    (elem :: acc.head) :: acc.tail // prepend to current head list 
} map (_.reverse mkString " ") reverse; 
+0

很酷,但我已經添加了限制,即LS是無限流/迭代器。所以摺疊不會在這裏工作 – yura

+0

在問題提出後,不要再添加更多限制... –

+0

對不起,我添加了它們,因爲它實際上是我的問題。我想爲非常大的日誌文件編寫解析器。我只是忘記了,我很清楚onlyю – yura

5

嗯,無限流變化的東西,而大幅提升。假設我理解你的處境的休息,這應該工作:

def aggregate(it: Iterator[String]) = new Iterator[String] { 
    if (it.hasNext) it.next 
    def hasNext = it.hasNext 
    def next = "START " + (it.takeWhile(_ != "START")).mkString(" ") 
} 

,讓您可以:

val i = aggregate(yourStream.iterator) 
i.take(20).foreach(println) // or whatever 
0

隨着流:

object Iter { 
    def main(args: Array[String]) { 
    val es = List("START", "DATA1", "DATA2", "START", "DATA1", "START") 
    val bit = batched(es.iterator, "START") 
    println(bit.head.toList) 
    println(bit.tail.head.toList) 
    } 

    def batched[T](it: Iterator[T], start: T) = { 
    def nextBatch(): Stream[List[T]] = { 
     (it takeWhile { _ != start }).toList match { 
     case Nil => nextBatch() 
     case es => Stream.cons(start :: es, nextBatch()) 
     } 
    } 
    nextBatch() 
    } 

}