2015-05-20 71 views
-1
I have written a program that downloads files in parallel using java.nio, creating one thread per file download.

    package com.java.tftp.nio; 

import java.io.File; 
import java.io.FileNotFoundException; 
import java.io.FileOutputStream; 
import java.io.IOException; 
import java.io.OutputStream; 
import java.net.InetSocketAddress; 
import java.net.SocketAddress; 
import java.nio.ByteBuffer; 
import java.nio.channels.DatagramChannel; 
import java.nio.channels.SelectionKey; 
import java.nio.channels.Selector; 
import java.util.Iterator; 
import java.util.List; 
import java.util.Set; 

/**
 * Downloads a configurable number of files concurrently from a TFTP server.
 *
 * <p>One {@link SelectorHandler} thread is started per file. Each handler owns
 * its own {@link Selector} and {@link DatagramChannel}, sends the read request
 * and then writes every received data block to a file in the destination
 * folder, acknowledging each block.
 *
 * <p>Note that the downloads are started from the constructor.
 *
 * @author SHRIRAM
 */
public class TFTP_NIO_Client {

    /** Folder the downloaded files are written into. */
    private final String destinationFolder;

    /** Names of the files to download. */
    private final List<String> fileNames;

    /** Number of files to download concurrently. */
    private final int noOfFilesToDownload;

    /**
     * Stores the configuration and immediately starts the downloads.
     *
     * @param fileNames
     *            names of the files to request from the server
     * @param destinationFolder
     *            folder the downloaded files are written into
     * @param noOfFilesToDownload
     *            how many entries of {@code fileNames} to download; values
     *            larger than the list size are capped to the list size
     */
    public TFTP_NIO_Client(List<String> fileNames, String destinationFolder,
            int noOfFilesToDownload) {
        this.destinationFolder = destinationFolder;
        this.fileNames = fileNames;
        this.noOfFilesToDownload = noOfFilesToDownload;
        initializeHandlers();
    }

    /**
     * Creates and starts one handler thread per file to download.
     *
     * <p>A file whose handler cannot be set up is reported and skipped; the
     * remaining files are still started.
     */
    private void initializeHandlers() {
        if (noOfFilesToDownload <= 0) {
            return;
        }
        // Never index past the end of the list when the caller asks for more
        // files than it supplied.
        int theCount = Math.min(noOfFilesToDownload, fileNames.size());
        for (int i = 0; i < theCount; i++) {
            Selector aSelector = null;
            try {
                aSelector = Selector.open();
                SelectorHandler theSelectionHandler = new SelectorHandler(
                        aSelector, fileNames.get(i));
                theSelectionHandler.start();
            } catch (IOException e) {
                e.printStackTrace();
                // The handler never started, so nobody else will close this.
                closeAndReport(aSelector);
            }
        }
    }

    /**
     * Builds a read request packet for the given file and sends it.
     *
     * Packet : | Opcode | FileName | 0 | mode | 0 |
     *
     * @param aOpcode
     *            currently unused; a read request packet is always built
     * @param aMode
     *            transfer mode passed to the packet builder
     * @param aFileName
     *            name of the file to request
     * @param aChannel
     *            channel the request is sent on
     * @param aAddress
     *            address the request is sent to
     * @throws IOException
     *             if sending the request fails
     */
    private void sendRequest(int aOpcode, int aMode, String aFileName,
            DatagramChannel aChannel, InetSocketAddress aAddress)
            throws IOException {
        TFTPReadRequestPacket theRequestPacket = new TFTPReadRequestPacket();
        aChannel.send(
                theRequestPacket.constructReadRequestPacket(aFileName, aMode),
                aAddress);
    }

    /**
     * Builds (but does not send) the ACK packet for a block.
     *
     * Packet : | opcode | Block# |
     *
     * @param aBlockNumber
     *            number of the block being acknowledged
     * @return buffer holding the ACK packet
     */
    private ByteBuffer sendAckPacket(int aBlockNumber) {
        TFTPAckPacket theAckPacket = new TFTPAckPacket();
        return theAckPacket.getTFTPAckPacket(aBlockNumber);
    }

    /**
     * Returns the part of a file name after the last {@code '/'}, or the whole
     * name when it contains no {@code '/'}.
     */
    private static String baseName(String aFileName) {
        int theSlash = aFileName.lastIndexOf('/');
        return theSlash < 0 ? aFileName : aFileName.substring(theSlash + 1);
    }

    /**
     * Closes a resource, reporting (rather than propagating) a failure.
     *
     * @param aResource
     *            resource to close; {@code null} is ignored
     */
    private static void closeAndReport(java.io.Closeable aResource) {
        if (aResource == null) {
            return;
        }
        try {
            aResource.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Thread that downloads a single file. The channel is registered and the
     * read request is sent in the constructor; the received packets are
     * processed once the thread is started.
     */
    public class SelectorHandler extends Thread {
        private final Selector selector;
        private final String fileName;

        /** Channel used for this download; closed when the thread ends. */
        private DatagramChannel channel;

        /** Set once the last block of the file has been written. */
        private boolean isFileReadFinished = false;

        /**
         * @param aSelector
         *            open selector used only by this handler; it is closed
         *            when the handler thread ends
         * @param aFileName
         *            name of the file to download
         * @throws IOException
         *             if the channel cannot be opened or registered, or the
         *             read request cannot be sent
         */
        public SelectorHandler(Selector aSelector, String aFileName)
                throws IOException {
            this.selector = aSelector;
            this.fileName = aFileName;
            registerChannel();
        }

        /**
         * Opens a non-blocking datagram channel, registers it for reads and
         * sends the read request. The channel is closed again if any of these
         * steps fails.
         */
        private void registerChannel() throws IOException {
            DatagramChannel theChannel = DatagramChannel.open();
            boolean isRegistered = false;
            try {
                theChannel.configureBlocking(false);
                // No selector.wakeup() here: nothing is selecting yet, and a
                // pending wakeup would only make the first select() return 0.
                theChannel.register(selector, SelectionKey.OP_READ);
                sendRequest(Constants.OP_READ, Constants.ASCII_MODE, fileName,
                        theChannel, new InetSocketAddress(Constants.HOST,
                                Constants.TFTP_PORT));
                isRegistered = true;
            } finally {
                if (!isRegistered) {
                    closeAndReport(theChannel);
                }
            }
            channel = theChannel;
        }

        @Override
        public void run() {
            process();
        }

        /**
         * Receives packets until the file is complete, the transfer fails or
         * the thread is interrupted, then releases the file, channel and
         * selector.
         */
        private void process() {
            System.out.println("Download started for " + fileName + " ");
            File theFile = new File(destinationFolder, baseName(fileName));
            FileOutputStream theFout = null;
            try {
                theFout = new FileOutputStream(theFile);
                while (!isFileReadFinished) {
                    if (Thread.currentThread().isInterrupted()) {
                        System.out.println("Download interrupted for "
                                + fileName);
                        return;
                    }
                    // The selector and its key set are used by this thread
                    // only, so no sleeping or locking is needed around them.
                    if (selector.select() == 0) {
                        continue;
                    }
                    Iterator<SelectionKey> theSelectedKeys = selector
                            .selectedKeys().iterator();
                    while (theSelectedKeys.hasNext() && !isFileReadFinished) {
                        SelectionKey theKey = theSelectedKeys.next();
                        theSelectedKeys.remove();
                        if (theKey.isValid() && theKey.isReadable()) {
                            isFileReadFinished = read(theKey, theFout,
                                    fileName);
                        }
                    }
                }
                System.out.println("Download finished for " + fileName);
            } catch (IOException ie) {
                // Covers an unwritable destination file, an I/O failure and an
                // error packet from the server: give up on this file instead
                // of waiting for packets that will never arrive.
                System.out.println("Download failed for " + fileName);
                ie.printStackTrace();
            } finally {
                closeAndReport(theFout);
                closeAndReport(channel);
                closeAndReport(selector);
            }
        }
    }

    /**
     * Receives one packet from the key's channel and handles it: a data block
     * is written to the stream and acknowledged, an error packet aborts the
     * transfer.
     *
     * @param aKey
     *            registered key for the selector
     * @param aOutStream
     *            file output stream to write the file contents to
     * @param aFileName
     *            name of the file being downloaded, used in messages
     * @return {@code true} once the last block has been written,
     *         {@code false} if more packets are expected
     * @throws IOException
     *             if receiving, writing or acknowledging fails, or the server
     *             answered with an error packet
     */
    private boolean read(SelectionKey aKey, OutputStream aOutStream,
            String aFileName) throws IOException {
        DatagramChannel theChannel = (DatagramChannel) aKey.channel();

        TFTPDataPacket theDataPacket = new TFTPDataPacket();
        ByteBuffer theReceivedBuffer = theDataPacket.constructTFTPDataPacket();
        SocketAddress theSocketAddress = theChannel.receive(theReceivedBuffer);
        if (theSocketAddress == null) {
            // Non-blocking receive: no datagram was available after all.
            return false;
        }
        theReceivedBuffer.flip();
        byte[] theBuffer = theReceivedBuffer.array();
        byte[] theDataBuffer = theDataPacket.getDataBlock();

        if (theDataPacket.getOpCode() == Constants.OP_DATA) {
            int theLimit = theDataPacket.getLimit();
            // A packet shorter than the maximum size signals that it is the
            // last packet in the transmission of this file.
            boolean isLastBlock = theLimit < Constants.MAX_BUFFER_SIZE;
            if (isLastBlock) {
                // NOTE(review): the last block is taken from offset 0 of the
                // raw buffer while full blocks use getDataBlock(); confirm
                // against TFTPDataPacket that both refer to the same payload.
                aOutStream.write(theBuffer, 0, theLimit);
            } else {
                aOutStream.write(theDataBuffer);
            }

            // Acknowledge using the block number carried in bytes 2 and 3.
            int theBlockNumber = ((theBuffer[2] & 0xff) << 8)
                    | (theBuffer[3] & 0xff);
            theChannel.send(sendAckPacket(theBlockNumber), theSocketAddress);

            if (isLastBlock && theChannel.isOpen()) {
                theChannel.close();
            }
            return isLastBlock;
        } else if (theBuffer[1] == Constants.OP_ERROR) {
            System.out.println("File : " + aFileName + " not found ");
            handleError(theReceivedBuffer);
            // No further packets are handled for this file, so fail the
            // transfer rather than report "more to come".
            throw new IOException("TFTP server returned an error packet for "
                    + aFileName);
        }
        return false;
    }

    /**
     * Handles an error packet received from the server by handing the buffer
     * to {@code TFTPErrorPacket}.
     *
     * @param aBuffer
     *            buffer holding the received error packet
     */
    private void handleError(ByteBuffer aBuffer) {
        new TFTPErrorPacket(aBuffer);
    }
}

    Is it possible to download multiple files in parallel using java.nio without creating a thread per file download? If so, can anybody suggest how to proceed?
+0

是的。它是。但請展示你的工作。 – rmalchow

+0

@rmalchow我附上了檔案.. – Shriram

回答

0

我將提供一種方法來實現你的目標爲:

  1. 令 L 爲要下載的文件列表。
  2. 創建一個 Map M，用來保存要下載的文件名與相應的 Selector 實例之間的映射。

  3. 對於 L 中的每個文件 F：從 M 中獲取與 F 對應的選擇器 SK；通過檢查是否有任何事件已經就緒來處理該選擇器的狀態。如果處理完成，則將 M 中對應於 F 的選擇器設置爲空（null），這將有助於識別處理已完成的文件。或者，您也可以把 F 從 L 中刪除，以便下一次循環時只處理尚未完全下載的文件。

話雖如此，我很好奇，爲什麼你會想要嘗試這樣的壯舉？如果這個要求背後的想法是將線程數量減少到1，那麼這個想法是不正確的。請記住，這樣最終只有單個線程在運行，吞吐量不一定是最佳的，因爲這個單線程要同時處理網絡和磁盤I/O。另外，考慮在將多個文件中的一個文件寫入磁盤時遇到異常的情況 - 您最終會中止所有文件的傳輸；我相信這不是你想要的。

更好和更具可擴展性的方法是在單個線程上輪詢選擇器,但將任何I/O活動交給工作線程。更好的方法仍然是閱讀Doug Lea's paper中介紹的技術並實施它們。事實上,Netty library已經實現了這種模式,並被廣泛用於生產。