2013-11-20 117 views
0

我試圖實現一個停止並使用在同一臺機器上進行通信的發送方和接收方等待ARQ。我的問題是使兩個線程同時運行,然後在兩個線程之間進行通信(可能使用Thread.notify())。目前,當我在兩個類中運行兩個獨立的主方法時,我的代碼已經工作,沒有實現流控制協議。但是,當我嘗試從單獨的主方法運行代碼時,我只能首先運行Receiver線程或發送方線程,其中任何一個都會導致代碼無限期地等待。我一般新的線程,所以任何幫助,非常感謝!一次運行兩個線程,然後在它們之間進行通信

發件人類:

import java.net.DatagramSocket; 
import java.net.DatagramPacket; 
import java.net.InetSocketAddress; 

import java.io.File; 
import java.io.FileInputStream; 
import tcdIO.*; 

/** 
* 
* Sending side of a communication channel. 
* 
* The start method splits an image into a number of packets and sends them to a given receiver. 
* The main method acts as test for the class by filling the destination host and port number and the source port number. 
* 
*/ 
public class Sender implements Runnable{ 
    static final int DEFAULT_SRC_PORT = 50000; 
    static final int DEFAULT_DST_PORT = 50001; 
    static final String DEFAULT_DST_HOST = "localhost"; 

    static final String FILENAME = "input.jpg"; 

    static final int MTU = 1500; 

    static Terminal terminal; 

    DatagramSocket socket; 
    InetSocketAddress dstAddress; 

    /** 
    * Constructor 
    * 
    */ 
    Sender() { 
     this(DEFAULT_DST_HOST, DEFAULT_DST_PORT, DEFAULT_SRC_PORT); 
    } 


    /** 
    * Constructor 
    * 
    * Attempts to create socket at given port and create an InetSocketAddress for the destinations 
    */ 
    Sender(String dstHost, int dstPort, int srcPort) { 
     try { 
      dstAddress= new InetSocketAddress(dstHost, dstPort); 
      socket= new DatagramSocket(srcPort); 
     } 
     catch(java.lang.Exception e) { 
      e.printStackTrace(); 
     } 
    } 

    synchronized void sleep() { 
     try {this.wait(100);}catch(Exception e){e.printStackTrace();} 
    } 


    /** 
    * Sender Method 
    * 
    * Transmits a given image as a collection of packets; the first packet contains the size of the image as string. 
    */ 
    public void run() { 
     byte[] data= null; 
     DatagramPacket packet= null; 

     File file= null; 
     FileInputStream fin= null; 
     byte[] buffer= null; 
     int size; 
     int counter; 

     try { 
      file= new File(FILENAME);    // Reserve buffer for length of file and read file 
      buffer= new byte[(int) file.length()]; 
      fin= new FileInputStream(file); 
      size= fin.read(buffer); 
      if (size==-1) throw new Exception("Problem with File Access"); 
      terminal.println("File size: " + buffer.length + ", read: " + size); 

      data= (Integer.toString(size)).getBytes(); // 1st packet contains the length only 
      packet= new DatagramPacket(data, data.length, dstAddress); 
      terminal.println("Please press any key"); 
      terminal.readChar(); 
      socket.send(packet);    

      counter= 0; 
      do { 
       data= new byte[(counter+MTU<size) ? MTU : size-counter]; // The length of the packet is either MTU or a remainder 
       java.lang.System.arraycopy(buffer, counter, data, 0, data.length); 
       terminal.println("Counter: " + counter + " - Payload size: " + data.length); 

       packet= new DatagramPacket(data, data.length, dstAddress); 
       socket.send(packet); 
       this.sleep(); 
       counter+= data.length; 
      } while (counter<size); 

     terminal.println("Send complete"); 
    } 
    catch(java.lang.Exception e) { 
     e.printStackTrace(); 
    }  
} 



public static void main(String[] args) { 
    Sender s; 
    try {   
     String dstHost; 
     int dstPort; 
     int srcPort; 

     //dstHost= args[0]; 
     //dstPort= Integer.parseInt(args[1]); 
     //srcPort= Integer.parseInt(args[2]); 
     dstHost= DEFAULT_DST_HOST; 
     dstPort= DEFAULT_DST_PORT; 
     srcPort= DEFAULT_SRC_PORT; 

     terminal= new Terminal("Sender"); 

     s= new Sender(dstHost, dstPort, srcPort); 
     s.run(); 

     terminal.println("Program completed"); 
    } catch(java.lang.Exception e) { 
     e.printStackTrace(); 
    } 
} 


} 

接收機類:

import java.io.File; 
import java.io.FileOutputStream; 
import java.net.DatagramSocket; 
import java.net.DatagramPacket; 

import tcdIO.*; 

/** 
* Receiving side of a communication channel. 
* 
* The class provides the basic functionality to receive a datagram from a sender. 
* The main method acts as test for the class by filling the port number at which to receive the datagram. 
*/ 
public class Receiver implements Runnable{ 
    static final String FILENAME = "output.jpg"; 
    static final int DEFAULT_PORT = 50001; 
    static final int MTU = 1500; 
    static Terminal terminal; 

    DatagramSocket socket; 

    /** 
    * Constructor 
    * 
    */ 
    Receiver() { 
     this(DEFAULT_PORT); 
    } 


    /** 
    * Constructor 
    * 
    * Attempts to create socket at given port 
    */ 
    Receiver(int port) { 
     try { 
      socket= new DatagramSocket(port); 
     } 
     catch(java.lang.Exception e) { 
      e.printStackTrace(); 
     } 
    } 


    /** 
    * Receiver Method 
    * 
    * Attempts to receive a number of packets that contain an image; the first packet contains the size of the image 
    */ 
    public void run() { 
     byte[] data; 
     byte[] buffer; 
     DatagramPacket packet; 
     int counter; 
     int size; 

     File file; 
     FileOutputStream fout; 

     try { 
      data= new byte[MTU]; // receive first packet with size of image as payload 
      packet= new DatagramPacket(data, data.length); 
      terminal.println("Waiting for incoming packets"); 
      socket.receive(packet);   

      data= packet.getData(); // reserve buffer to receive image 
      size= (Integer.valueOf(new String(data, 0, packet.getLength()))).intValue(); 
      terminal.println("Filesize:" + size); 
      buffer= new byte[size]; 

      counter= 0;   
      while(counter<size) { // receive packet and store payload in array 
       data= new byte[MTU]; 
       packet= new DatagramPacket(data, data.length); 
       socket.receive(packet); 
       terminal.println("Received packet - Port: " + packet.getPort() + " - Counter: " + counter + " - Payload: "+packet.getLength()); 

       System.arraycopy(data, 0, buffer, counter, packet.getLength()); 
       counter+= packet.getLength(); 
      } 

      file= new File(FILENAME);    // Create file and write buffer into file 
      fout= new FileOutputStream(file); 
      fout.write(buffer, 0, buffer.length); 
      fout.flush(); 
      fout.close(); 
     } 
     catch(java.lang.Exception e) { 
      e.printStackTrace(); 
     }  
    } 


    /** 
    * Test method 
    * 
    * Creates an instance of the class Receiver 
    * 
    * @param args arg[0] Port number to receive information on 
    */
    public static void main(String[] args) { 
     Receiver r; 

     try { 
      terminal= new Terminal("Receiver"); 
      int port; 

      //port= Integer.parseInt(args[0]); 
      port= DEFAULT_PORT; 
      r= new Receiver(port); 
      r.run(); 

      terminal.println("Program completed"); 
     } catch(java.lang.Exception e) { 
      e.printStackTrace(); 
     } 
    } 
    */ 
} 

和主,這只是實例都和運行它們:

import tcdIO.Terminal; 


public class FlowControlMain { 

    /** 
    * 
    * 
    */ 
    public static void main(String[] args) { 

     Sender s; 
     Receiver r; 
     try{ 
      String dstHost= "localhost"; 
      int dstPort= 50001; 
      int srcPort= 50000; 

      Sender.terminal= new Terminal("Sender"); 
      Receiver.terminal = new Terminal("Receiver"); 

      s= new Sender(dstHost, dstPort, srcPort); 
      r = new Receiver(dstPort); 
      s.run(); 
      r.run(); 

     }catch(Exception e){ 
      e.printStackTrace(); 

     } 

    } 


} 

道歉代碼的巨量,只是想給出一個完整的圖片

回答

2

你沒有使用線程,你在主線程中執行run()方法。

在自己Thread開始Runnable正確的方法是要麼

Thread t = new Thread(myRunnable); 
t.start(); 

或使用一個ExecutorService,這是一個高一點的水平,並允許的東西,如線程池。

+0

非常感謝!他們之間的溝通方面會通知()工作嗎? –

+0

取決於你的意思是溝通。等待/通知機制允許您控制線程,使一個(或多個線程)在繼續之前等待另一個完成某個任務。 – Kayaman

+0

這正是我需要發生的事情,我需要發件人線程等待接收方在發送下一個之前確認收到數據包 –

相關問題