作爲計算機網絡軟件開發類的作業分配,教授讓我們爲端口1-1024構建一個端口掃描器,以針對本地主機運行。練習的要點是使用演員來演示任務級別的並行性。 prof提供的代碼按順序掃描每個端口。我們將創建一個並行執行此操作的版本,併爲系統提供每個處理器或超線程的參與者。我們的目標是花時間完成對所有端口1-1024的全面掃描,並將並行掃描的結果與串行掃描的結果進行比較。這裏是我的並行掃描代碼:scala參與者無法在並行範圍內工作
import java.net.Socket
import scala.actors._
import Actor._
import scala.collection.mutable.ArrayBuffer
object LowPortScanner {
var lastPort = 0
var openPorts = ArrayBuffer[Int]()
var longestRunTime = 00.00
var results = List[Tuple3[Int, Range, Double]]()
val host = "localhost"
val numProcs = 1 to Runtime.getRuntime().availableProcessors()
val portsPerProc = 1024/numProcs.size
val caller = self
def main(args: Array[String]): Unit = {
//spawn an actor for each processor that scans a given port range
numProcs.foreach { proc =>
actor {
val portRange: Range = (lastPort + 1) to (lastPort + portsPerProc)
lastPort = lastPort + portsPerProc
caller ! scan(proc, portRange)
}
}
//catch results from the processor actors above
def act {
loop {
reactWithin(100) {
//update the list of results returned from scan
case scanResult: Tuple3[Int, Range, Double] =>
results = results ::: List(scanResult)
//check if all results have been returned for each actor
case TIMEOUT =>
if (results.size == numProcs.size) wrapUp
case _ =>
println("got back something weird from one of the port scan actors!")
wrapUp
}
}
}
//Attempt to open a socket on each port in the given range
//returns a Tuple3[procID: Int, ports: Range, time: Double
def scan(proc: Int, ports: Range) {
val startTime = System.nanoTime()
ports.foreach { n =>
try {
println("Processor " + proc + "is checking port " + n)
val socket = new Socket(host, n)
//println("Found open port: " + n)
openPorts += n
socket.close
} catch {
case e: Exception =>
//println("While scanning port " + n + " caught Exception: " + e)
}
}
(proc, ports, startTime - System.nanoTime())
}
//output results and kill the main actor
def wrapUp {
println("These are the open ports in the range 1-1024:")
openPorts.foreach { port => println(port) }
results.foreach { result => if (result._3 > longestRunTime) { longestRunTime = result._3} }
println("Time to scan ports 1 through 1024 is: %3.3f".format(longestRunTime/1000))
caller ! exit
}
}
}
我有一個四核i7處理器,所以我numProcs = 8.在此硬件平臺上,每個演員PROC應掃描128個端口(八分之一千零二十四= 128)。我的意圖是proc1參與者掃描0 - 128,proc2應該掃描129-256,等等......但是,這不是發生了什麼事情。一些演員最終與其他演員合作的範圍相同。下面的輸出樣本示出了問題:
處理器2被檢查端口1
處理器7被檢查端口385
處理器1檢查端口1
處理器5被檢查端口1
處理器4被檢查端口1
處理器8是檢查口129
處理器3正在檢查端口1
處理器6被檢查端口257
處理器1檢查端口2
處理器5是檢查口2
處理器1檢查端口3
處理器3正在檢查端口2
處理器5被檢查端口3
處理器1檢查口4
EDIT
最後的 「工作」 代碼:
import java.net.Socket
import scala.actors._
import Actor._
import scala.collection.mutable.ArrayBuffer
object LowPortScanner {
var lastPort = 0
var openPorts = ArrayBuffer[Int]()
var longestRunTime = 00.00
var results = List[Tuple3[Int, Range, Double]]()
val host = "localhost"
val numProcs = 1 to Runtime.getRuntime().availableProcessors()
val portsPerProc = 1024/numProcs.size
val caller = self
val procPortRanges = numProcs.foldLeft(List[Tuple2[Int, Range]]()) { (portRanges, proc) =>
val tuple2 = (proc.toInt, (lastPort + 1) to (lastPort + portsPerProc))
lastPort += portsPerProc
tuple2 :: portRanges
}
def main(args: Array[String]): Unit = {
//spawn an actor for each processor that scans a given port range
procPortRanges.foreach { proc =>
actor {
caller ! scan(proc._1, proc._2)
}
}
//catch results from the processor actors above
def act {
loop {
reactWithin(100) {
//update the list of results returned from scan
case scanResult: Tuple3[Int, Range, Double] =>
results = results ::: List(scanResult)
//check if results have been returned for each actor
case TIMEOUT =>
if (results.size == numProcs.size) wrapUp
case _ =>
println("got back something weird from one of the port scan actors!")
wrapUp
}
}
}
//Attempt to open a socket on each port in the given range
//returns a Tuple3[procID: Int, ports: Range, time: Double
def scan(proc: Int, ports: Range) {
val startTime = System.nanoTime()
ports.foreach { n =>
try {
println("Processor " + proc + "is checking port " + n)
val socket = new Socket(host, n)
//println("Found open port: " + n)
openPorts += n
socket.close
} catch {
case e: Exception =>
//println("While scanning port " + n + " caught Exception: " + e)
}
}
(proc, ports, startTime - System.nanoTime())
}
//output results and kill the main actor
def wrapUp {
println("These are the open ports in the range 1-1024:")
openPorts.foreach { port => println(port) }
results.foreach { result => if (result._3 > longestRunTime) { longestRunTime = result._3} }
println("Time to scan ports 1 through 1024 is: %3.3f".format(longestRunTime/1000))
caller ! exit
}
}
}
D'oh!多麼明顯的一個可怕的錯誤......我修正了portsPerProc問題和錯誤的錯誤。但是,它仍然沒有達到我想要的效果。現在編輯與新問題的問題。 – NickAbbey
好吧,我有一種感覺,那就是我正在做的。我編輯了原信息以包含我的修復。我通過構建一個包含Tuple2的列表來保存每個proc的範圍,並修改了掃描調用,以便它使用該元組來調用具有特定proc ID和唯一範圍的掃描來解決該問題。不知道爲什麼我的wrapUp方法從來沒有被調用過......我會根據您以前的建議發佈一個單獨的問題。謝謝! – NickAbbey