2010-10-24 57 views
2

我有一個迭代器(實際上是一個Source.getLines),它從URL中讀取無限數據流。當有連接問題時,偶爾迭代器會拋出java.io.IOException。在這種情況下,我需要重新連接並重新啓動迭代器。我希望這是無縫的,以便迭代器看起來像一個正常的消費者迭代器,但在下面根據需要重新啓動。重新啓動迭代器在Scala中的異常

例如,我想看到以下行爲:

scala> val iter = restartingIterator(() => new Iterator[Int]{ 
    var i = -1 
    def hasNext = { 
    if (this.i < 3) { 
     true 
    } else { 
     throw new IOException 
    } 
    } 
    def next = { 
    this.i += 1 
    i 
    } 
}) 
res0: ... 

scala> iter.take(6).toList 
res1: List[Int] = List(0, 1, 2, 3, 0, 1) 

我有一個部分解決了這個問題,但它會在某個角落的情況下失敗(例如IOException異常後的第一項重新啓動),它是相當難看:

def restartingIterator[T](getIter:() => Iterator[T]) = new Iterator[T] { 
    var iter = getIter() 
    def hasNext = { 
    try { 
     iter.hasNext 
    } catch { 
     case e: IOException => { 
     this.iter = getIter() 
     iter.hasNext 
     } 
    } 
    } 
    def next = { 
    try { 
     iter.next 
    } catch { 
     case e: IOException => { 
     this.iter = getIter() 
     iter.next 
     } 
    } 
    } 
} 

我一直感覺像有一個更好的解決了這個,也許一些Iterator.continuallyutil.control.Exception或類似的東西的組合,但我想不出一個出來。有任何想法嗎?

+0

我添加了一個解決方案'持續'和'util.control.Exception'到我原來的答案。 – huynhjl 2010-11-20 12:07:58

回答

4

這是相當接近的版本,並使用scala.util.control.Exception

def restartingIterator[T](getIter:() => Iterator[T]) = new Iterator[T] { 
    import util.control.Exception.allCatch 
    private[this] var i = getIter() 
    private[this] def replace() = i = getIter() 
    def hasNext: Boolean = allCatch.opt(i.hasNext).getOrElse{replace(); hasNext} 
    def next(): T = allCatch.opt(i.next).getOrElse{replace(); next} 
} 

出於某種原因,這並不是尾遞歸,但它可以通過使用一個稍微詳細的版本是固定的:

def restartingIterator2[T](getIter:() => Iterator[T]) = new Iterator[T] { 
    import util.control.Exception.allCatch 
    private[this] var i = getIter() 
    private[this] def replace() = i = getIter() 
    @annotation.tailrec def hasNext: Boolean = { 
    val v = allCatch.opt(i.hasNext) 
    if (v.isDefined) v.get else {replace(); hasNext} 
    } 
    @annotation.tailrec def next(): T = { 
    val v = allCatch.opt(i.next) 
    if (v.isDefined) v.get else {replace(); next} 
    } 
} 

編輯:有一個解決方案,與util.control.ExceptionIterator.continually

def restartingIterator[T](getIter:() => Iterator[T]) = { 
    import util.control.Exception.allCatch 
    var iter = getIter() 
    def f: T = allCatch.opt(iter.next).getOrElse{iter = getIter(); f} 
    Iterator.continually { f } 
} 
+0

是的,使它遞歸解決了我有點擔心的角落案例。我想我可以通過將我的解決方案中的第二個「iter.hasNext」和「iter.next」更改爲「this.hasNext」和「this.next」並添加talrec註釋,獲得幾乎相同的行爲。 我很希望有一個更簡單的解決方案的基礎上,但不知何故。 – Steve 2010-10-25 07:19:32

+0

非常酷。這正是我期待的那種,謝謝! – Steve 2010-11-21 15:04:38

+0

@ huynhji-我有點困惑的片段if(v.isDefined)v.get else {replace(); next}和if(v.isDefined)v.get else {replace(); hasNext}。如果出現異常,這兩行不要將迭代器重置爲開始。我試圖理解它將如何跳過拋出異常的部分,並轉移到它正在迭代的源的下一個元素? – 2013-06-09 21:02:23

2

有一個更好的解決方案中,Iteratee:

http://apocalisp.wordpress.com/2010/10/17/scalaz-tutorial-enumeration-based-io-with-iteratees/

這裏是例如其上遇到的異常重新啓動的枚舉器。

def enumReader[A](r: => BufferedReader, it: IterV[String, A]): IO[IterV[String, A]] = { 
    val tmpReader = r 
    def loop: IterV[String, A] => IO[IterV[String, A]] = { 
    case [email protected](_, _) => IO { i } 
    case Cont(k) => for { 
     s <- IO { try { val x = tmpReader.readLine; IO(x) } 
       catch { case e => enumReader(r, it) }}.join 
     a <- if (s == null) k(EOF) else loop(k(El(s))) 
    } yield a 
    } 
    loop(it) 
} 

內循環推進了Iteratee,但外函數仍保留原來的。由於Iteratee是一個持久的數據結構,要重新啓動,您只需再次調用該函數即可。

我在這裏通過讀卡器的名稱,以便r本質上是一個功能,爲您提供一個全新的(重新啓動)讀卡器。在實踐中,您會希望更有效地將其括起來(關閉現有讀者的例外情況)。

+0

有趣的文章,但它並沒有真正談論處理異常。你能詳細說明你將如何使用scalaz迭代器來處理我的問題嗎? – Steve 2010-10-25 07:11:56

+0

我盯着這個15分鐘,但我仍然無法把頭圍住它。我認爲這對我來說可能不好,即使/當我知道這些代碼時...... – Steve 2010-10-25 13:53:13

+1

這篇文章解釋了它。代碼基本上是這樣說的:要將Reader從Reader中提供給Iteratee,請檢查它是否接受了輸入。如果是,請將其退回。如果需要更多的輸入,它將有一個函數'k'來接受輸入。從閱讀器讀取一行並將其分配給's'。如果我們得到異常,請重新啓動整個枚舉。如果我們得到一個空行,向Iteratee發信號表示我們已經到達EOF。否則,將's'輸入'k'並循環。 – Apocalisp 2010-10-25 17:56:55

1

這裏是不起作用的答案,但感覺像它應該:

def restartingIterator[T](getIter:() => Iterator[T]): Iterator[T] = { 
    new Traversable[T] { 
    def foreach[U](f: T => U): Unit = { 
     try { 
     for (item <- getIter()) { 
      f(item) 
     } 
     } catch { 
     case e: IOException => this.foreach(f) 
     } 
    } 
    }.toIterator 
} 

我覺得這很清楚地描述了控制流,這是偉大的。

該代碼將引發Scala中的一個StackOverflowError 2.8.0因爲bug in Traversable.toStream的,但即使是修復該錯誤後,該代碼仍然不會給我使用的情況下工作,因爲toIterator電話toStream,這意味着它會將所有項目存儲在內存中。

我希望能夠通過編寫foreach方法來定義Iterator,但似乎沒有任何簡單的方法可以做到這一點。