2010-05-13 137 views
1

我正在從事一個基本上是聊天室的java程序。這是班級的任務,所以沒有代碼請,我只是有一些問題,確定處理我需要做的最可行的方法。我已經爲使用線程獲取數據輸入流的單個客戶端設置了服務器程序,並且有一個線程用於處理數據輸出流上的發送。我現在需要做的是爲每個傳入請求創建一個新線程。使用線程來處理套接字

我的想法是創建一個鏈表來包含客戶端套接字或可能的線程。我磕磕絆絆的地方是搞清楚如何處理將消息發送給所有客戶端。如果我爲每個傳入消息都有一個線程,那麼我該如何轉向並將其發送到每個客戶端套接字。

我在想,如果我有一個clientsockets的鏈表,我可以遍歷這個鏈表並將它發送給每個鏈表,但是我必須每次都創建一個dataoutputstream。我可以創建一個數據輸出流的鏈表嗎​​?對不起,如果聽起來像我漫不經心,但我不想開始編碼這個,它可能會得到混亂,沒有一個好的計劃。謝謝!

編輯 我決定發佈我到目前爲止的代碼。我還沒有機會測試它,所以任何評論都會很棒。謝謝!

import java.io.BufferedReader; 
import java.io.DataOutputStream; 
import java.io.IOException; 
import java.io.InputStreamReader; 
import java.net.Socket; 
import java.net.ServerSocket; 
import java.util.LinkedList; 
import java.util.concurrent.BlockingQueue; 
import java.util.concurrent.LinkedBlockingQueue; 

public class prog4_server { 

    // A Queue of Strings used to hold out bound Messages 
    // It blocks till on is available 
    static BlockingQueue<String> outboundMessages = new LinkedBlockingQueue<String>(); 

    // A linked list of data output streams 
    // to all the clients 
    static LinkedList<DataOutputStream> outputstreams; 

    // public variables to track the number of clients 
    // and the state of the server 
    static Boolean serverstate = true; 
    static int clients = 0; 

    public static void main(String[] args) throws IOException{ 

     //create a server socket and a clientSocket 
     ServerSocket serverSocket = null; 
     try { 
      serverSocket = new ServerSocket(6789); 
     } catch (IOException e) { 
      System.out.println("Could not listen on port: 6789"); 
      System.exit(-1); 
     }// try{...}catch(IOException e){...} 

     Socket clientSocket; 

     // start the output thread which waits for elements 
     // in the message queue 
     OutputThread out = new OutputThread(); 
     out.start(); 

     while(serverstate){ 

      try { 

       // wait and accept a new client 
       // pass the socket to a new Input Thread 
       clientSocket = serverSocket.accept(); 
       DataOutputStream ServerOut = new DataOutputStream(clientSocket.getOutputStream()); 
       InputThread in = new InputThread(clientSocket, clients); 
       in.start(); 
       outputstreams.add(ServerOut); 

      } catch (IOException e) { 

       System.out.println("Accept failed: 6789"); 
       System.exit(-1); 
      }// try{...}catch{..} 

      // increment the number of clients and report 
      clients = clients++; 

      System.out.println("Client #" + clients + "Accepted"); 

     }//while(serverstate){... 

    }//public static void main 

    public static class OutputThread extends Thread { 

     //OutputThread Class Constructor 
     OutputThread() { 
     }//OutputThread(...){... 

     public void run() { 

      //string variable to contain the message 
      String msg = null; 

      while(!this.interrupted()) { 

       try { 

        msg = outboundMessages.take(); 

        for(int i=0;i<outputstreams.size();i++){ 

         outputstreams.get(i).writeBytes(msg + '\n'); 

        }// for(...){... 

       } catch (IOException e) { 

        System.out.println(e); 

       } catch (InterruptedException e){ 

        System.out.println(e); 

       }//try{...}catch{...} 

      }//while(...){ 

     }//public void run(){... 

    }// public OutputThread(){... 

    public static class InputThread extends Thread { 

     Boolean threadstate = true; 
     BufferedReader ServerIn; 
     String user; 
     int threadID; 
     //SocketThread Class Constructor 
     InputThread(Socket clientSocket, int ID) { 

      threadID = ID; 

      try{ 
       ServerIn = new BufferedReader(
        new InputStreamReader(clientSocket.getInputStream())); 
        user = ServerIn.readLine(); 
      } 
      catch(IOException e){ 
       System.out.println(e); 
      } 

     }// InputThread(...){... 

     public void run() { 

      String msg = null; 

     while (threadstate) { 

       try { 

        msg = ServerIn.readLine(); 

        if(msg.equals("EXITEXIT")){ 

         // if the client is exiting close the thread 
         // close the output stream with the same ID 
         // and decrement the number of clients 
      threadstate = false; 
         outputstreams.get(threadID).close(); 
         outputstreams.remove(threadID); 
         clients = clients--; 
         if(clients == 0){ 
          // if the number of clients has dropped to zero 
          // close the server 
          serverstate = false; 
          ServerIn.close(); 
         }// if(clients == 0){... 
        }else{ 

         // add a message to the message queue 
         outboundMessages.add(user + ": " + msg); 

        }//if..else... 

       } catch (IOException e) { 

        System.out.println(e); 

       }// try { ... } catch { ...} 

     }// while 

     }// public void run() { ... 
    } 

    public static class ServerThread extends Thread { 

     //public variable declaration 
     BufferedReader UserIn = 
       new BufferedReader(new InputStreamReader(System.in)); 

     //OutputThread Class Constructor 
     ServerThread() { 

     }//OutputThread(...){... 

     public void run() { 

      //string variable to contain the message 
      String msg = null; 

      try { 

       //while loop will continue until 
       //exit command is received 
       //then send the exit command to all clients 

       msg = UserIn.readLine(); 

       while (!msg.equals("EXITEXIT")) { 

        System.out.println("Enter Message: "); 
        msg = UserIn.readLine(); 

       }//while(...){ 

       outboundMessages.add(msg); 
       serverstate = false; 
       UserIn.close(); 

      } catch (IOException e) { 
       System.out.println(e); 

      }//try{...}catch{...} 


     }//public void run(){... 
    }// public serverThread(){... 

}// public class prog4_server 
+1

「每個請求的線程數」或「每個線程的線程數」不會擴展 - 請考慮5K客戶端連接到服務器時會發生什麼情況。 – 2010-05-13 16:29:15

+0

我不確定你的意思,你是說我需要限制線程數? – Levi 2010-05-13 16:46:03

+0

尼古拉,如果你不提供更好的主意,你的'抱怨'有什麼意義?我也會有興趣知道如何正確地'縮放':) – 2010-05-13 16:46:25

回答

3

我已經通過定義每個客戶端連接的「MessageHandler」級,負責入站/出站郵件流量在過去的解決了這個問題。處理程序在內部使用一個BlockingQueue實現,出站消息放置在該實現上(通過內部工作線程)。 I/O發送者線程不斷嘗試從隊列中讀取(如果需要,阻塞),並將檢索到的每個消息發送給客戶端。

這裏的一些骨架示例代碼(未測試):

/** 
* Our Message definition. A message is capable of writing itself to 
* a DataOutputStream. 
*/ 
public interface Message { 
    void writeTo(DataOutputStream daos) throws IOException; 
} 

/** 
* Handler definition. The handler contains two threads: One for sending 
* and one for receiving messages. It is initialised with an open socket. 
*/  
public class MessageHandler { 
    private final DataOutputStream daos; 
    private final DataInputStream dais; 
    private final Thread sender; 
    private final Thread receiver; 
    private final BlockingQueue<Message> outboundMessages = new LinkedBlockingQueue<Message>(); 

    public MessageHandler(Socket skt) throws IOException { 
    this.daos = new DataOutputStream(skt.getOutputStream()); 
    this.dais = new DataInputStream(skt.getInputStream()); 

    // Create sender and receiver threads responsible for performing the I/O. 
    this.sender = new Thread(new Runnable() { 
     public void run() { 
     while (!Thread.interrupted()) { 
      Message msg = outboundMessages.take(); // Will block until a message is available. 

      try { 
      msg.writeTo(daos); 
      } catch(IOException ex) { 
      // TODO: Handle exception 
      } 
     } 
     } 
    }, String.format("SenderThread-%s", skt.getRemoteSocketAddress())); 

    this.receiver = new Thread(new Runnable() { 
     public void run() { 
     // TODO: Read from DataInputStream and create inbound message. 
     } 
    }, String.format("ReceiverThread-%s", skt.getRemoteSocketAddress())); 

    sender.start(); 
    receiver.start(); 
    } 

    /** 
    * Submits a message to the outbound queue, ready for sending. 
    */ 
    public void sendOutboundMessage(Message msg) { 
    outboundMessages.add(msg); 
    } 

    public void destroy() { 
    // TODO: Interrupt and join with threads. Close streams and socket. 
    } 
} 

注意尼古拉是在I/O使用1(或2)每連接線程不是一個可擴展的解決方案,並且通常應用可能是阻擋正確使用Java NIO編寫來解決這個問題。但是,實際上,除非你正在編寫一個有數千個客戶端同時連接的企業服務器,否則這不是一個真正的問題。使用Java NIO編寫無錯誤的可伸縮應用程序是困難,當然不是我推薦的東西。

+0

謝謝。我之前從未使用過阻塞隊列,但我可以看到它如何適合我的原始設計。我知道這不是可擴展的,但這不屬於本作業的範圍。 我看到的是,我最初使用兩個線程可以使用全局消息變量並在輸出線程中阻塞隊列,以便在所有活動客戶端端口上發送消息。 我打算看看這是否可行。我也需要找到一種方法來關閉與一個套接字相關的線程。 – Levi 2010-05-13 17:16:47

+0

是的 - 您可以創建一個消息,然後將其傳遞給每個處理程序。要關閉你的線程,你需要調用thread.interrupt(),然後再調用thread.join()。然而,爲了這個工作,重要的是兩個線程通過Thread.interrupted()定期檢查它們的中斷狀態。 – Adamski 2010-05-13 17:34:44

+0

我之前的版本基於用戶輸入而退出。這對於輸入線程來說很簡單,但我很難看到如何保持該輸入線程與其相應的輸出線程之間的關聯。 再次感謝您的幫助,我喜歡學習新的更好的方式來做事情。 – Levi 2010-05-13 18:04:29