2012-11-09 79 views
0

作爲計算機網絡軟件開發類的作業分配,教授讓我們爲端口1-1024構建一個端口掃描器,以針對本地主機運行。練習的要點是使用演員來演示任務級別的並行性。 prof提供的代碼按順序掃描每個端口。我們將創建一個並行執行此操作的版本,併爲系統提供每個處理器或超線程的參與者。我們的目標是花時間完成對所有端口1-1024的全面掃描,並將並行掃描的結果與串行掃描的結果進行比較。這裏是我的並行掃描代碼:scala參與者無法在並行範圍內工作

import java.net.Socket 
import scala.actors._ 
import Actor._ 
import scala.collection.mutable.ArrayBuffer 

object LowPortScanner { 

    var lastPort = 0 
    var openPorts = ArrayBuffer[Int]() 
    var longestRunTime = 00.00 
    var results = List[Tuple3[Int, Range, Double]]() 

    val host = "localhost" 
    val numProcs = 1 to Runtime.getRuntime().availableProcessors() 
    val portsPerProc = 1024/numProcs.size 
    val caller = self 

    def main(args: Array[String]): Unit = { 

    //spawn an actor for each processor that scans a given port range 
    numProcs.foreach { proc => 
     actor { 
     val portRange: Range = (lastPort + 1) to (lastPort + portsPerProc) 
     lastPort = lastPort + portsPerProc 
     caller ! scan(proc, portRange) 
     } 
    } 

    //catch results from the processor actors above 
    def act { 
     loop { 
     reactWithin(100) { 
      //update the list of results returned from scan 
      case scanResult: Tuple3[Int, Range, Double] => 
      results = results ::: List(scanResult) 

      //check if all results have been returned for each actor 
      case TIMEOUT => 
      if (results.size == numProcs.size) wrapUp 

      case _ => 
      println("got back something weird from one of the port scan actors!") 
      wrapUp 
     } 
     } 
    } 

    //Attempt to open a socket on each port in the given range 
    //returns a Tuple3[procID: Int, ports: Range, time: Double 
    def scan(proc: Int, ports: Range) { 
     val startTime = System.nanoTime() 
     ports.foreach { n => 
     try { 
      println("Processor " + proc + "is checking port " + n) 

      val socket = new Socket(host, n) 

      //println("Found open port: " + n) 
      openPorts += n 

      socket.close 
     } catch { 
      case e: Exception => 
      //println("While scanning port " + n + " caught Exception: " + e) 
     } 
     } 
     (proc, ports, startTime - System.nanoTime()) 
    } 

    //output results and kill the main actor 
    def wrapUp { 
     println("These are the open ports in the range 1-1024:") 
     openPorts.foreach { port => println(port) } 
     results.foreach { result => if (result._3 > longestRunTime) { longestRunTime = result._3} } 
     println("Time to scan ports 1 through 1024 is: %3.3f".format(longestRunTime/1000)) 
     caller ! exit 
    } 
    } 
} 

我有一個四核i7處理器,所以我numProcs = 8.在此硬件平臺上,每個演員PROC應掃描128個端口(八分之一千零二十四= 128)。我的意圖是proc1參與者掃描0 - 128,proc2應該掃描129-256,等等......但是,這不是發生了什麼事情。一些演員最終與其他演員合作的範圍相同。下面的輸出樣本示出了問題:

處理器2被檢查端口1
處理器7被檢查端口385
處理器1檢查端口1
處理器5被檢查端口1
處理器4被檢查端口1
處理器8是檢查口129
處理器3正在檢查端口1
處理器6被檢查端口257
處理器1檢查端口2
處理器5是檢查口2
處理器1檢查端口3
處理器3正在檢查端口2
處理器5被檢查端口3
處理器1檢查口4

EDIT

最後的 「工作」 代碼:

import java.net.Socket 
import scala.actors._ 
import Actor._ 
import scala.collection.mutable.ArrayBuffer 

object LowPortScanner { 

    var lastPort = 0 
    var openPorts = ArrayBuffer[Int]() 
    var longestRunTime = 00.00 
    var results = List[Tuple3[Int, Range, Double]]() 

    val host = "localhost" 
    val numProcs = 1 to Runtime.getRuntime().availableProcessors() 
    val portsPerProc = 1024/numProcs.size 
    val caller = self 
    val procPortRanges = numProcs.foldLeft(List[Tuple2[Int, Range]]()) { (portRanges, proc) => 
    val tuple2 = (proc.toInt, (lastPort + 1) to (lastPort + portsPerProc)) 
    lastPort += portsPerProc 
    tuple2 :: portRanges 
    } 

    def main(args: Array[String]): Unit = { 

    //spawn an actor for each processor that scans a given port range 
    procPortRanges.foreach { proc => 
     actor { 
     caller ! scan(proc._1, proc._2) 
     } 
    } 

//catch results from the processor actors above 
def act { 
    loop { 
    reactWithin(100) { 
     //update the list of results returned from scan 
     case scanResult: Tuple3[Int, Range, Double] => 
     results = results ::: List(scanResult) 

     //check if results have been returned for each actor 
     case TIMEOUT => 
     if (results.size == numProcs.size) wrapUp 

     case _ => 
     println("got back something weird from one of the port scan actors!") 
     wrapUp 
    } 
    } 
} 

    //Attempt to open a socket on each port in the given range 
    //returns a Tuple3[procID: Int, ports: Range, time: Double 
    def scan(proc: Int, ports: Range) { 
     val startTime = System.nanoTime() 
     ports.foreach { n => 
     try { 
      println("Processor " + proc + "is checking port " + n) 

      val socket = new Socket(host, n) 

      //println("Found open port: " + n) 
      openPorts += n 

      socket.close 
     } catch { 
      case e: Exception => 
      //println("While scanning port " + n + " caught Exception: " + e) 
     } 
     } 
     (proc, ports, startTime - System.nanoTime()) 
    } 

    //output results and kill the main actor 
    def wrapUp { 
     println("These are the open ports in the range 1-1024:") 
     openPorts.foreach { port => println(port) } 
     results.foreach { result => if (result._3 > longestRunTime) { longestRunTime = result._3} } 
     println("Time to scan ports 1 through 1024 is: %3.3f".format(longestRunTime/1000)) 
     caller ! exit 
    } 
    } 
} 

回答

3

在這個硬件平臺上,每個proc actor應該掃描128個端口(1024/8 = 128)。

除非你有

val portsPerProc = numProcs.size/1024 

和一千零二十四分之八是0。注意,你也有一個差一錯誤導致每個演員進行掃描比portsPerProc 1多端口它應該掃描lastPort to (lastPort + portsPerProc) - 1(lastPort + 1) to (lastPort + portsPerProc)

對於未來,如果你有一個不同的問題,您應該單獨問吧:)但在這裏你有一個非常明顯的競爭條件:所有的演員都試圖執行

val portRange: Range = (lastPort + 1) to (lastPort + portsPerProc) 
    lastPort = lastPort + portsPerProc 

兼任。想想當(例如)actor1和2在任何actor到達第二個之前執行第一行時會發生什麼。

+0

D'oh!多麼明顯的一個可怕的錯誤......我修正了portsPerProc問題和錯誤的錯誤。但是,它仍然沒有達到我想要的效果。現在編輯與新問題的問題。 – NickAbbey

+0

好吧,我有一種感覺,那就是我正在做的。我編輯了原信息以包含我的修復。我通過構建一個包含Tuple2的列表來保存每個proc的範圍,並修改了掃描調用,以便它使用該元組來調用具有特定proc ID和唯一範圍的掃描來解決該問題。不知道爲什麼我的wrapUp方法從來沒有被調用過......我會根據您以前的建議發佈一個單獨的問題。謝謝! – NickAbbey