2017-04-10 117 views
2

發送大於15GB的文件時,數據傳輸速率有問題。我有3臺服務器和一臺客戶機。當從客戶端發送文件到服務器時,我將文件分成塊(每個塊通常爲256MB),每個塊在2個服務器上覆制。複製發生在管道方法中。發送數據塊時,每個數據塊被分割成更小的數據包(每個數據包通常爲128 KB),發送到服務器,並在服務器端合併以存儲在硬盤驅動器中。這裏一切都很好。我測試了5GB到50GB文件的系統,增量爲5GB。所有文件的平均寫入大約爲600MB /秒。見下圖。我在這裏與HDFS進行比較。15GB後數據傳輸速率變慢,文件傳輸變大

enter image description here

問題從服務器讀取相同的文件時發生。文件分佈在多個服務器上。例如,我可以從server1讀取block1,從server2讀取block2,依此類推。直觀地說,讀取必須比寫入更快,因爲客戶端並行地從3個服務器讀取。當讀取小於15GB {5GB,10GB,15GG}的文件時,性能約爲1.1GB /秒。讀取大於20GB的文件時出現問題{20GB,25GB,...,50GB}。隨着文件大小的增加,性能會下降。

Performance graph for 50GB file when reading

上圖爲用於讀取50GB文件的基準測試。每個黑點顯示一個單獨的塊讀取時間。正如你所看到的那樣,表現在60-70區塊之後開始下降。有趣的是,這發生在所有大於15GB的文件上,在相同的地方(大約第65塊)放慢。隨着文件大小的增加,緩慢的部分佔主導地位,並且性能越來越差。我覺得在16GB左右有一些障礙。我看到的唯一提示可能會有所幫助,那就是3臺服務器並行地隨機發送數據塊,直到第65位。所以塊的轉移是重疊的。之後,一個服務器按循環順序一次發送。我可以從日誌輸出中看到這一點。這裏還有一些重疊,但沒有之前的第65塊。

我對這個項目使用java 1.8,而netty 4.1.8。作爲tcp服務器。 OS是CentOS 7. 每臺服務器有兩個CPU(Intel(R)Xeon®CPU E5-2650 v3 @ 2.30GHz)= 40個核心 64GB RAM 10 GBit以太網。

我花了很多時間,找不到問題的根本原因。 Java VM,Netty,OS,OS TCP默認值或其他原因可能導致此問題。

服務器端BlockSenderManager

@Override 
    public void run(){ 

     while(nodeManager.isRunning()){ 
      try 
      { 
       BlockRequest br = blockSenders.take(); 
       if(br != null){ 
        executor.execute(new BlockSender(br, this)); 
       } 

       if(wait.take()) 
        System.out.println(br.getBlockId()+" Delivered"); 
      } 
      catch (InterruptedException e) 
      { 
       e.printStackTrace(); 
      } 
     } 

BlockSender在服務器端:

@Override 
     public void run() 
     { 
      FileInputStream fis = null; 

      try 
      { 
       java.io.File file = new java.io.File(path+"/" + blockRequest.getBlockId()); 

       fis = new FileInputStream(file); 
       fSize = file.length(); 
       long rem = fSize; 

       sendBlockInfo(); 
       int bufSize; 
       if (fSize < (long) packetSize) 
        bufSize = (int) fSize; 
       else 
        bufSize = packetSize; 
       int read = 0, packetOrder = 1; 

       byte[] data; 
       if(bufSize <= rem) 
        data = new byte[bufSize]; 
       else 
        data = new byte[(int)rem]; 
       while ((read = (fis.read(data))) > 0) 
       { 
        if (read < 1) 
         break; 

        BlockPacket bp = new BlockPacket(); 

        bp.setRequestId(blockRequest.getRequestId()); 
        bp.setBlockId(blockRequest.getBlockId()); 
        bp.setData(data); 
        bp.setPacketSeqNo(packetOrder); 
        if(read < bufSize) 
        { 
         bp.setIsLastPacket(true); 
        } 

        executor.execute(new Sender(bp)); 

        packetOrder++; 
        if(rem > bufSize) 
         rem = rem - bufSize; 

        if(bufSize <= rem) 
         data = new byte[bufSize]; 
        else 
        { 
         data = new byte[(int)rem]; 
        } 
       } 

       fis.close(); 
       executor.shutdown(); 
      } 
      catch (FileNotFoundException e) 
      { 
       // TODO Auto-generated catch block 
       e.printStackTrace(); 
      } 
      catch (IOException e) 
      { 
       // TODO Auto-generated catch block 
       e.printStackTrace(); 
      } 
     } 

public class Sender implements Runnable 
    { 
     private final BlockPacket bp; 
     private final FileBlock fb; 
     private DataClient dc; 

     public Sender(BlockPacket bp) 
     { 
      this.bp = bp; 
      this.fb = null; 
      dc = getDataClient(requestClient); 
     } 

     public Sender(FileBlock fb) 
     { 
      this.bp = null; 
      this.fb = fb; 
      dc = getDataClient(requestClient); 
     } 

     @Override 
     public void run() 
     { 

      if (dc != null) 
      { 
       if (bp != null) 
       { 
        dc.send(bp); 
       } 
       else if (fb != null) 
       { 
        dc.send(fb); 
       } 
      } 

     } 
    } 

ReceivedPacketProcessor在客戶端

public void processBlockPacket(BlockPacket bp) 
    { 
     ByteBuffer buf = ByteBuffer.wrap(bp.getData()); 
     try 
     { 
      inChannel.write(buf); 
     } 
     catch (IOException e) 
     { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 
    } 

    @Override 
     public void run() 
     { 
      try 
      { 
       aFile = new RandomAccessFile(path+"/"+fileName, "rw"); 
       inChannel = aFile.getChannel(); 
       //java.io.File f = new java.io.File(path+"/"+fileName); 
       //fop = new FileOutputStream(f); 
       String reqId = file.getFileID(); 
       currentBlockId = reqId + "_" + currentBlockSeq; 
       while (true) 
       { 
        BlockPacket bp = null; 
        if (numberOfBlocks > 0) 
        { 
         try 
         { 
          bp = this.blockingQueue.take(); 
         } 
         catch (InterruptedException e) 
         { 
          e.printStackTrace(); 
         } 
         if (bp.getBlockId().equals(currentBlockId)) 
         { 
          if (currentPacket == bp.getPacketSeqNo()) 
          { 

           if(fileBlocks.containsKey(currentBlockId)) 
           { 
            processBlockPacket(bp); 
            if(currentPacket < fileBlocks.get(currentBlockId).getNoOfPackets()) 
             currentPacket++; 
            else 
            { 
             if (fileBlocks.get(currentBlockId).getPackets().size() < 1) 
             { 
              removeFileBlock(currentBlockId); 
              currentBlockSeq++; 
              currentBlockId = reqId + "_" + currentBlockSeq; 
              currentPacket = 1; 
              numberOfBlocks--; 
             } 
            } 
           } 
           else 
           { 
            tempList.add(bp); 
           } 

           for(int k =numberOfBlocks; k>0; k--) 
           { 
            if(fileBlocks.containsKey(currentBlockId)) 
            { 
             int pCount = fileBlocks.get(currentBlockId).getNoOfPackets(); 
             int i; 
             for (i = currentPacket; i <= pCount; i++) 
             { 
              if (fileBlocks.get(currentBlockId).getPackets().containsKey(i)) 
              { 
               processBlockPacket(fileBlocks.get(currentBlockId).getPackets().remove(i)); 
               currentPacket++; 
              } 
              else 
              { 
               break; 
              } 
             } 
             if(i <= pCount) 
             { 
              break; 
             } 
             else 
             { 
              if (fileBlocks.get(currentBlockId).getPackets().size() < 1) 
              { 
               removeFileBlock(currentBlockId); 
               currentBlockSeq++; 
               currentBlockId = reqId + "_" + currentBlockSeq; 
               currentPacket = 1; 
               numberOfBlocks--; 
              } 
             } 
            } 
           } 
          } 
         } 
         else 
         { 
          if(fileBlocks.containsKey(bp.getBlockId())){ 
           fileBlocks.get(bp.getBlockId()).getPackets().put(bp.getPacketSeqNo(), bp); 
          }else{ 
           tempList.add(bp); 
          } 

         } 
        } 
        else{ 
         break; 
        } 
        for(int i=0; i<tempList.size(); i++){ 
         if(fileBlocks.containsKey(tempList.get(i).getBlockId())){ 
          BlockPacket temp = tempList.remove(i); 
          fileBlocks.get(temp.getBlockId()).getPackets().put(temp.getPacketSeqNo(), temp); 
         } 
        } 
       } 
       System.out.println("CLOSING FILE...."); 
       this.isCompleted.put(true); 
       inChannel.force(true); 
       inChannel.close(); 
      } 
      catch (FileNotFoundException e) 
      { 
       // TODO Auto-generated catch block 
       e.printStackTrace(); 
      } 
      catch (IOException e) 
      { 
       // TODO Auto-generated catch block 
       e.printStackTrace(); 
      } 
      catch (InterruptedException e) 
      { 
       // TODO Auto-generated catch block 
       e.printStackTrace(); 
      } 
     } 

隨着-XX:+ PrintGCDetails開啓,here is a sample log

任何評論/幫助表示讚賞。

+1

我會檢查JVM GC,網絡帶寬利用率,在服務器上讀IO。服務器上的文件系統緩存利用率 –

+0

@JigarJoshi感謝您的意見。我根本沒有碰到JVM GC。它以默認值運行。如果網絡帶寬是問題,我認爲,對於15GB的文件也是一個問題。從本地IO讀取塊不是這種情況的瓶頸。我已經檢查過了。 – celik

+0

@celik我認爲Jigar試圖說的是打開GC日誌,看看這可能是一個問題。你如何閱讀這些文件並不是很清楚......可能是一些代碼或你實現它的方式? 'FileChannels'? – Eugene

回答

0

這是因爲內存中的髒頁面配給。由於輸入數據速率高於本地IO刷新吞吐量,因此數據會在內存中累積。一旦達到允許的最大髒頁比率,接收器不接受更多數據。因此,系統與本地IO有關,而不是本例中的網絡。所以回報的減少發生在15GB左右。您可以在

更改一些設置的/etc/sysctl.conf

如:

vm.dirty_background_ratio = 2 
vm.dirty_ratio = 80 
vm.dirty_expire_centisecs = 3000 
vm.dirty_writeback_centisecs = 500 

This可能是一個有用的閱讀。

系統性能仍受本地IO和最大允許髒頁比限制。您可以增加增加骯髒的頁面比率,以僅延遲返回時間的遞減。如果文件/數據較大,它將再次達到這一點。新結果: enter image description here