2011-10-16 110 views
3

我在 Scala 的 collection.mutable.PriorityQueue 上看到一些奇怪的行爲。我正在實現外部排序,並使用 100 萬條記錄進行測試。每次運行測試並驗證結果時,都有 10 到 20 條記錄沒有被正確排序。我把 Scala 的 PriorityQueue 實現換成 java.util.PriorityQueue 之後,結果 100% 正確。有任何想法嗎?Scala 的優先隊列排序不正確嗎?

下面是代碼(對不起,有點長……)。我使用 http://sortbenchmark.org/ 提供的工具 gensort(gensort -a 1000000)和 valsort 進行測試。

/** Sorts the lines of a text file using temporary files (external merge sort).
  *
  * The input is read line by line and cut into blocks. Each block is sorted
  * with `ord` and written to its own temporary file; the sorted temporary
  * files are then combined into `outFileName` with a k-way merge.
  *
  * @param inFileName    path of the file whose lines are to be sorted
  * @param outFileName   path of the file that receives the sorted lines
  * @param maxBlockBytes upper bound on the estimated size (two bytes per
  *                      character) buffered before a block is flushed; the
  *                      default leaves the block size to be derived from the
  *                      free heap reported by the runtime
  * @param ord           ordering the output lines are sorted by (ascending)
  * @return the number of lines written to `outFileName`
  * @throws IllegalArgumentException if `maxBlockBytes` is not positive
  */
def externalSort(inFileName: String, outFileName: String, maxBlockBytes: Long = Long.MaxValue)(
    implicit ord: Ordering[String]): Int = {

  require(maxBlockBytes > 0, "maxBlockBytes must be positive")

  val MaxTempFiles = 1024
  val TempBufferSize = 4096

  val inFile = new java.io.File(inFileName)

  /** Splits the input into blocks and writes each block, sorted, to a temp file. */
  def partitionAndSort(): List[java.io.File] = {

    /** Estimated number of bytes to buffer before a block is flushed. */
    def getBlockSize: Long = {
      val freeMem = Runtime.getRuntime().freeMemory()
      val wanted = math.max(inFile.length() / MaxTempFiles, freeMem / 2)
      if (wanted >= freeMem)
        System.err.println("Not enough free memory to use external sort.")
      math.min(wanted, maxBlockBytes)
    }

    /** Sorts `buf` with `ord` and writes it, one line per element, to a new temp file. */
    def writeSorted(buf: List[String]): java.io.File = {
      val tmp = java.io.File.createTempFile("external", "sort")
      tmp.deleteOnExit()

      val out = new java.io.PrintWriter(tmp)
      try {
        buf.sorted(ord).foreach(l => out.println(l))
      } finally {
        out.close()
      }

      tmp
    }

    val blockSize = getBlockSize
    var tmpFiles = List[java.io.File]()
    var buf = List[String]()
    // Long, because blockSize is a Long that can exceed Int.MaxValue.
    var currentSize = 0L

    val source = scala.io.Source.fromFile(inFile)
    try {
      for (line <- source.getLines()) {
        if (currentSize > blockSize) {
          tmpFiles ::= writeSorted(buf)
          buf = List[String]()
          currentSize = 0L
        }
        buf ::= line
        currentSize += line.length() * 2L // 2 bytes per char
      }
    } finally {
      source.close()
    }
    // Test the buffer itself: a block made only of empty lines has size 0
    // but still holds lines that must reach the output.
    if (buf.nonEmpty) tmpFiles ::= writeSorted(buf)

    tmpFiles
  }

  /** Merges the sorted temp files into the output file; returns the line count. */
  def mergeSortedFiles(fs: List[java.io.File]): Int = {

    /** Cursor over one sorted temp file, exposing the line it is positioned on. */
    final class TempFileBuffer(val file: java.io.File) {

      private val in = new java.io.BufferedReader(
        new java.io.FileReader(file), TempBufferSize)
      // readLine returns null at end of input; null marks an exhausted cursor.
      private var curLine: String = in.readLine()
      if (curLine == null) in.close()

      def currentLine: String = curLine

      def isEmpty: Boolean = curLine == null

      /** Advances to the next line; closes the reader once the file is exhausted. */
      def readNextLine(): Unit = {
        if (curLine != null) {
          curLine = in.readLine()
          if (curLine == null) in.close()
        }
      }

      /** Releases the underlying reader. */
      def close(): Unit = in.close()
    }

    // scala.collection.mutable.PriorityQueue.dequeue returns the GREATEST
    // element under its ordering. The merge needs the smallest head line
    // first, so the queue is ordered by the reverse of `ord`.
    val smallestHeadFirst: Ordering[TempFileBuffer] =
      Ordering.by[TempFileBuffer, String](_.currentLine)(ord).reverse

    val pq = new collection.mutable.PriorityQueue[TempFileBuffer]()(smallestHeadFirst)

    var opened = List[TempFileBuffer]()
    var count = 0

    val out = new java.io.PrintWriter(new java.io.File(outFileName))
    try {
      // Seed the queue with the first line of every non-empty temp file.
      for (tmp <- fs) {
        val buf = new TempFileBuffer(tmp)
        opened ::= buf
        if (!buf.isEmpty) pq.enqueue(buf)
      }

      while (pq.nonEmpty) {
        // The cursor is only advanced while it is outside the queue, so the
        // heap never contains an element whose key has changed.
        val buf = pq.dequeue()
        out.println(buf.currentLine)
        count += 1
        buf.readNextLine()
        if (buf.isEmpty) {
          buf.file.delete() // don't need anymore
        } else {
          // re-add so the next line of this file takes part in the merge
          pq.enqueue(buf)
        }
      }
    } finally {
      out.close()
      // Release readers still open after a failure part-way through.
      opened.foreach(_.close())
    }

    count
  }

  mergeSortedFiles(partitionAndSort())
}
+0

什麼是Scala版本? –

+0

Scala版本是2.9.0.1 – Mike

回答

3

我的測試中沒有表現出任何的PriorityQueue錯誤。

import org.scalacheck._ 
import Prop._ 

/** ScalaCheck properties checking the dequeue order of `PriorityQueue[String]`. */
object PriorityQueueProperties extends Properties("PriorityQueue") {

  /** Builds a queue by adding the elements of `l` one at a time, in list order. */
  def listToPQ(l: List[String]): PriorityQueue[String] = {
    val queue = new PriorityQueue[String]
    for (elem <- l) queue += elem
    queue
  }

  /** Empties `pq` and returns its elements in the order they were dequeued. */
  def pqToList(pq: PriorityQueue[String]): List[String] = {
    val drained = List.newBuilder[String]
    while (!pq.isEmpty) drained += pq.dequeue()
    drained.result()
  }

  // Dequeue order is the reverse of ascending sort order.
  property("Enqueued elements are dequeued in reverse order") =
    forAll { (l: List[String]) =>
      val dequeued = pqToList(listToPQ(l))
      l.sorted == dequeued.reverse
    }

  // Removing the head and adding another element keeps the dequeue order sorted.
  property("Adding/removing elements doesn't break sorting") =
    forAll { (l: List[String], s: String) =>
      (l.size > 0) ==> {
        // Expected contents: the input without its greatest element, plus `s`.
        val expected = (s :: l.sorted.init).sorted
        val pq = listToPQ(l)
        pq.dequeue()
        pq += s
        expected == pqToList(pq).reverse
      }
    }
}

scala> PriorityQueueProperties.check 
+ PriorityQueue.Enqueued elements are dequeued in reverse order: OK, passed 
    100 tests. 
+ PriorityQueue.Adding/removing elements doesn't break sorting: OK, passed 
    100 tests. 

如果你可以以某種方式減少輸入足夠做一個測試用例,這將有所幫助。

+0

這就是問題所在:輸入較小時不會出現這個問題。第一條亂序記錄最晚可能要到測試的第 60 萬條記錄左右才出現。 – Mike

1

我用 500 萬條輸入跑了幾次,輸出始終與預期一致。從你的代碼來看,我猜問題出在你的 Ordering 上(即它給出了不一致的比較結果)。