2011-02-11 46 views
2

我所知道的事實,下面的代碼看起來粗俗,但我是新來的這些東西,爲了得到它的工作只是想盡一切..一個線程停止太早不管的CyclicBarrier

問題:儘管我正在使用CyclicBarrier(可能是錯誤的方式),但它似乎永遠都是一樣的 - 線程停止得太快並打印出他的向量,在這些「傳入連接」消息中缺少11個消息。在我的循環的最後一次迭代中可能有一些可怕的錯誤,但我似乎無法找到究竟是什麼......現在程序只是循環等待處理最後一次連接。

public class VectorClockClient implements Runnable { 
/* 
* Attributes 
*/ 

/* 
* The client number is store to provide fast 
* array access when, for example, a thread's own 
* clock simply needs to be incremented. 
*/ 
private int clientNumber; 
private File configFile, inputFile; 
int[] vectorClock; 

/* 
* Constructor 
* @param 
* - File config 
* - int line 
* - File input 
* - int clients 
*/ 
public VectorClockClient(File config, int line, File input, int clients) { 
    /* 
    * Make sure that File handles aren't null and that 
    * the line number is valid. 
    */ 
    if (config != null && line >= 0 && input != null) { 
     configFile = config; 
     inputFile = input; 
     clientNumber = line; 
     /* 
     * Set the array size to the number of lines found in the 
     * config file and initialize with zero values. 
     */ 
     vectorClock = new int[clients]; 
     for (int i = 0; i < vectorClock.length; i++) { 
      vectorClock[i] = 0; 
     } 
    } 
} 

private int parsePort() { 
    int returnable = 0; 
    try { 
     FileInputStream fstream = new FileInputStream(configFile.getName()); 
     DataInputStream in = new DataInputStream(fstream); 
     BufferedReader br = new BufferedReader(new InputStreamReader(in)); 
     String strLine = ""; 
     for (int i = 0; i < clientNumber + 1; i++) { 
      strLine = br.readLine(); 
     } 
     String[] tokens = strLine.split(" "); 
     returnable = Integer.parseInt(tokens[1]); 
    } 
    catch (Exception e) { 
     e.printStackTrace(); 
    } 
    System.out.println("[" + clientNumber + "] returned with " + returnable + "."); 
    return returnable; 
} 

private int parsePort(int client) { 
    int returnable = 0; 
    try { 
     FileInputStream fstream = new FileInputStream(configFile.getName()); 
     DataInputStream in = new DataInputStream(fstream); 
     BufferedReader br = new BufferedReader(new InputStreamReader(in)); 
     String strLine = ""; 
     for (int i = 0; i < client; i++) { 
      strLine = br.readLine(); 
     } 
     String[] tokens = strLine.split(" "); 
     returnable = Integer.parseInt(tokens[1]); 
    } 
    catch (Exception e) { 
     e.printStackTrace(); 
    } 
    return returnable; 
} 

private int parseAction(String s) { 
    int returnable = -1; 
    try { 
     FileInputStream fstream = new FileInputStream(configFile.getName()); 
     DataInputStream in = new DataInputStream(fstream); 
     BufferedReader br = new BufferedReader(new InputStreamReader(in)); 
     String[] tokens = s.split(" "); 
     if (!(Integer.parseInt(tokens[0]) == this.clientNumber + 1)) { 
      return -1; 
     } 
     else { 
      if (tokens[1].equals("L")) { 
       vectorClock[clientNumber] += Integer.parseInt(tokens[2]); 
      } 
      else { 
       returnable = Integer.parseInt(tokens[2]); 
      } 
     } 
    } 
    catch (Exception e) { 
     e.printStackTrace(); 
    } 
    return returnable; 
} 

/* 
* Do the actual work. 
*/ 
public void run() { 
    try { 
     InitClients.barrier.await(); 
    } 
    catch (Exception e) { 
     System.out.println(e); 
    } 
    int port = parsePort(); 
    String hostname = "localhost"; 
    String strLine; 
    ServerSocketChannel ssc; 
    SocketChannel sc; 
    FileInputStream fstream; 
    DataInputStream in; 
    BufferedReader br; 
    boolean eof = false; 
    try { 
     ssc = ServerSocketChannel.open(); 
     ssc.socket().bind(new InetSocketAddress(hostname, port)); 
     ssc.configureBlocking(false); 
     fstream = new FileInputStream("input_vector.txt"); 
     in = new DataInputStream(fstream); 
     br = new BufferedReader(new InputStreamReader(in)); 

     try { 
      InitClients.barrier.await(); 
     } 
     catch (Exception e) { 
      System.out.println(e); 
     } 

     while (true && (eof == false)) { 
      sc = ssc.accept(); 

      if (sc == null) { 
       if ((strLine = br.readLine()) != null) { 
        int result = parseAction(strLine); 
        if (result >= 0) { 
         //System.out.println("[" + (clientNumber + 1) 
         //+ "] Send a message to " + result + "."); 
         try { 
          SocketChannel client = SocketChannel.open(); 
          client.configureBlocking(true); 
          client.connect(
            new InetSocketAddress("localhost", 
            parsePort(result))); 
          //ByteBuffer buf = ByteBuffer.allocateDirect(32); 
          //buf.put((byte)0xFF); 
          //buf.flip(); 
          //vectorClock[clientNumber] += 1; 
          //int numBytesWritten = client.write(buf); 
          String obj = Integer.toString(clientNumber+1); 
          ObjectOutputStream oos = new 
            ObjectOutputStream(
            client.socket().getOutputStream()); 
          oos.writeObject(obj); 
          oos.close(); 
         } 
         catch (Exception e) { 
          e.printStackTrace(); 
         } 
        } 
       } 
       else { 
        eof = true; 
       } 
      } 
      else { 
       ObjectInputStream ois = new 
         ObjectInputStream(sc.socket().getInputStream()); 
       String clientNumberString = (String)ois.readObject(); 
       System.out.println("At {Client[" + (clientNumber + 1) 
         + "]}Incoming connection from: " 
         + sc.socket().getRemoteSocketAddress() 
         + " from {Client[" + clientNumberString + "]}"); 
       sc.close(); 
      } 
      try { 
       InitClients.barrier.await(); 
      } 
      catch (Exception e) { 
       e.printStackTrace(); 
      } 
     } 
    } 
    catch (Exception e) { 
     e.printStackTrace(); 
    } 
    printVector(); 
} 

private void printVector() { 
    System.out.print("{Client[" + (clientNumber + 1) + "]}{"); 
    for (int i = 0; i < vectorClock.length; i++) { 
     System.out.print(vectorClock[i] + "\t"); 
    } 
    System.out.println("}"); 
} 

}
澄清,這裏使用的文件格式。 Config包含客戶端使用的線程和輸入文件行的主機名和端口,意思是「該客戶端向該客戶端發送消息」或「該客戶端通過某個常量值遞增其邏輯時鐘」。

1 M 2(M表示發送消息)
2 M 3
-3 M 4
2升7(L意味着遞增時鐘)
2 M 1
...
127.0.0.1 9000
127.0.0.1 9001
127.0.0.1 9002
127.0.0.1 9003
。 ..

回答

0

我會看你在期待一個傳入套接字連接的相關邏輯。從你的問題看來,你期望有一定數量的傳入套接字連接(可能是每個傳出消息之後的傳入連接?)。由於您在傳入套接字上使用非阻塞I/O,因此在建立傳入套接字之前,while語句始終可能會循環。結果,一個線程將能夠繼續並從文件中讀取下一行,而無需接收連接。由於您的結束狀態一旦達到文件結尾,就可能會丟失傳入的套接字連接。

我會添加一些簡單的打印輸出,當您從文件中讀取時,當您發送消息和收到傳入連接時顯示。這應該很快告訴你一個特定的線程是否缺少預期的傳入連接。如果事實證明問題是由於非阻塞I/O造成的,那麼當您希望傳入套接字或實現一個控制器時,您可能需要禁用非阻塞I/O,以跟蹤您期望有多少傳入套接字並繼續直到達到目標。

希望這會有所幫助。

+0

某些線程可能不會收到任何連接請求。輸入文件用於確定誰應該做什麼。 目前,如果我啓用非阻塞客戶端,程序將生成關於尚未連接的套接字的錯誤。另外,如果我禁用非阻塞服務器,程序將會掛起,因爲每個線程都在等待連接,並且沒有人正在讀取輸入文件。我也嘗試實現一個選擇器,但問題是那裏的循環阻塞在select()調用。 – treiman 2011-02-11 14:00:22