我在Scala的collection.mutable.PriorityQueue
上看到一些奇怪的行爲。我正在實現外部排序,並使用1M(一百萬)條記錄進行測試。每次運行測試並驗證結果時,都有大約10到20條記錄沒有正確排序。我用java.util.PriorityQueue
替換了Scala的PriorityQueue
實現之後,結果每次都完全正確。有任何想法嗎?是Scala的優先隊列排序不正確嗎?
下面是代碼(抱歉有點長...)。我使用工具gensort -a 1000000
和valsort
進行測試,這兩個工具來自http://sortbenchmark.org/
/** Sorts the lines of a text file using an external merge sort.
  *
  * The input is read line by line and split into blocks. Each block is sorted
  * in memory and spilled to a temporary file; the temporary files are then
  * k-way merged into the output file.
  *
  * @param inFileName  path of the file whose lines are to be sorted
  * @param outFileName path of the file that receives the sorted lines
  * @param ord         ordering used to compare lines
  * @return the number of lines written to the output file
  */
def externalSort(inFileName: String, outFileName: String)
                (implicit ord: Ordering[String]): Int = {
  val MaxTempFiles = 1024
  val TempBufferSize = 4096
  val inFile = new java.io.File(inFileName)

  /** Partitions the input file and sorts each partition into its own temp file.
    * Uses the enclosing `ord` to sort each block.
    */
  def partitionAndSort(): List[java.io.File] = {

    /** Picks the block size: at least half of the currently free memory,
      * otherwise the input length divided by `MaxTempFiles`.
      */
    def getBlockSize: Long = {
      val freeMem = Runtime.getRuntime.freeMemory()
      val perFile = inFile.length / MaxTempFiles
      if (perFile < freeMem / 2) freeMem / 2
      else {
        if (perFile >= freeMem)
          System.err.println("Not enough free memory to use external sort.")
        perFile
      }
    }

    /** Sorts the buffered lines and writes them to a fresh temp file. */
    def writeSorted(buf: List[String]): java.io.File = {
      val tmp = java.io.File.createTempFile("external", "sort")
      tmp.deleteOnExit()
      val out = new java.io.PrintWriter(tmp)
      try {
        for (l <- buf.sorted) {
          out.println(l)
        }
      } finally {
        out.close()
      }
      tmp
    }

    val blockSize = getBlockSize
    var tmpFiles = List[java.io.File]()
    var buf = List[String]()
    // Long, because blockSize is a Long and may exceed Int.MaxValue.
    var currentSize = 0L

    // Read input and divide into blocks; always release the input handle.
    val source = scala.io.Source.fromFile(inFile)
    try {
      for (line <- source.getLines()) {
        if (currentSize > blockSize) {
          tmpFiles ::= writeSorted(buf)
          buf = List[String]()
          currentSize = 0L
        }
        buf ::= line
        currentSize += line.length * 2L // 2 bytes per char
      }
    } finally {
      source.close()
    }
    // Test the buffer itself rather than its byte size: a trailing block made
    // only of empty lines has size 0 but must still be written out.
    if (buf.nonEmpty) tmpFiles ::= writeSorted(buf)
    tmpFiles
  }

  /** Merges the sorted partitions into the output file.
    *
    * @return the number of lines written
    */
  def mergeSortedFiles(fs: List[java.io.File]): Int = {

    /** Line-at-a-time reader over one temp file, exposing the current line. */
    class TempFileBuffer(val file: java.io.File) {
      private val in = new java.io.BufferedReader(
        new java.io.FileReader(file), TempBufferSize)
      private var curLine: String = ""
      readNextLine() // prep first value

      def currentLine: String = curLine

      /** True once the underlying file has been read to the end. */
      def isEmpty: Boolean = curLine == null

      /** Advances to the next line; closes the reader at end of file. */
      def readNextLine(): Unit = {
        if (curLine != null) {
          // BufferedReader.readLine signals end of input by returning null.
          curLine = in.readLine()
          if (curLine == null) in.close()
        }
      }

      /** Releases the reader; closing an already closed reader has no effect. */
      def close(): Unit = in.close()
    }

    // scala.collection.mutable.PriorityQueue.dequeue returns the element with
    // the HIGHEST priority (unlike java.util.PriorityQueue, which returns the
    // least). The comparison is therefore reversed so that the buffer holding
    // the smallest current line is dequeued first.
    val wrappedOrd = new Ordering[TempFileBuffer] {
      def compare(o1: TempFileBuffer, o2: TempFileBuffer): Int =
        ord.compare(o2.currentLine, o1.currentLine)
    }
    val pq = new collection.mutable.PriorityQueue[TempFileBuffer]()(wrappedOrd)

    var opened = List[TempFileBuffer]()
    try {
      // Init queue with item from each file
      for (tmp <- fs) {
        val buf = new TempFileBuffer(tmp)
        opened ::= buf
        if (!buf.isEmpty) pq += buf
      }
      var count = 0
      val out = new java.io.PrintWriter(new java.io.File(outFileName))
      try {
        // Repeatedly emit the smallest current line among all partitions.
        while (pq.nonEmpty) {
          val buf = pq.dequeue()
          out.println(buf.currentLine)
          count += 1
          buf.readNextLine()
          if (buf.isEmpty) {
            buf.file.delete() // don't need anymore
          } else {
            // re-add to priority queue so we can process next line
            pq += buf
          }
        }
      } finally {
        out.close()
      }
      count
    } finally {
      // Make sure no reader stays open if the merge fails part-way through.
      opened.foreach(_.close())
    }
  }

  mergeSortedFiles(partitionAndSort())
}
你用的是哪個Scala版本? –
Scala版本是2.9.0.1 – Mike