I have a problem with the data transfer rate for files larger than 15 GB. I have 3 servers and one client. When a file is sent from the client to the servers, I split it into blocks (typically 256 MB each), and every block is replicated on 2 servers. Replication happens in a pipelined fashion. When a block is sent, it is split into smaller packets (typically 128 KB each); the packets are sent to the server and merged on the server side before being stored on the hard drive. Everything works fine here: I tested the system with files from 5 GB to 50 GB in 5 GB increments, and the average write rate is about 600 MB/s for all file sizes. See the figure below, where I compare against HDFS. In short, the data transfer rate drops after 15 GB as the transferred file gets larger.
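For concreteness, this is how the default sizes map a file onto blocks and packets. It is only a toy calculation; ChunkMath and its constants are illustrative, not code from the project:

// Toy calculation, not project code: number of 256 MB blocks and 128 KB
// packets produced for a given file size with the defaults described above.
public class ChunkMath {
    static final long BLOCK_SIZE  = 256L * 1024 * 1024; // 256 MB block
    static final int  PACKET_SIZE = 128 * 1024;         // 128 KB packet

    public static void main(String[] args) {
        long fileSize = 15L * 1024 * 1024 * 1024;                       // e.g. 15 GB
        long blocks = (fileSize + BLOCK_SIZE - 1) / BLOCK_SIZE;         // ceiling division
        long packetsPerBlock = (BLOCK_SIZE + PACKET_SIZE - 1) / PACKET_SIZE;
        System.out.println(blocks + " blocks, " + packetsPerBlock + " packets per full block");
        // prints: 60 blocks, 2048 packets per full block
    }
}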
The problem occurs when reading the same files back from the servers. A file is distributed across multiple servers, so I can, for example, read block1 from server1, block2 from server2, and so on. Intuitively, reading should be faster than writing because the client reads from 3 servers in parallel. For files up to 15 GB {5 GB, 10 GB, 15 GB}, read performance is about 1.1 GB/s. The problem shows up with files of 20 GB and above {20 GB, 25 GB, ..., 50 GB}: performance drops as the file size grows.
The figure above shows the benchmark for reading a 50 GB file; each black dot is the read time of a single block. As you can see, performance starts to degrade after blocks 60-70. Interestingly, this happens for every file larger than 15 GB, and the slowdown starts at the same place (around block 65). As the file size grows, the slow part dominates and performance gets worse and worse. It feels as if there is some barrier around 16 GB. The only hint I can see that might help: up to roughly block 65, the 3 servers send blocks in parallel, in random order, so the block transfers overlap. After that, one server sends at a time, in round-robin order; I can see this in the log output. There is still some overlap there, but far less than before block 65.
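Writing out the arithmetic behind that impression (assuming the 256 MB block size above):

64 blocks × 256 MB/block = 16,384 MB = 16 GB

so block ~65 is right where the total amount of data read crosses the 16 GB mark.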
I use Java 1.8 for this project and Netty 4.1.8 as the TCP server. The OS is CentOS 7. Each server has two CPUs (Intel(R) Xeon(R) CPU E5-2650 v3 @ 2.30GHz) = 40 cores, 64 GB RAM, and 10 Gbit Ethernet.
I have spent a lot of time on this and cannot find the root cause. The problem could be coming from the Java VM, Netty, the OS, the OS TCP defaults, or something else.
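Since the OS TCP defaults are on the suspect list, for reference this is roughly where the relevant knobs live in Netty 4.1. It is a generic sketch with placeholder buffer sizes, an empty pipeline, and a placeholder port, not my actual bootstrap:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

// Generic Netty 4.1 sketch: setting socket buffer sizes and write-buffer
// water marks explicitly instead of relying on the OS / Netty defaults.
public class TcpServerSketch {
    public static void main(String[] args) throws InterruptedException {
        ServerBootstrap b = new ServerBootstrap()
                .group(new NioEventLoopGroup(1), new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)
                .childOption(ChannelOption.SO_SNDBUF, 4 * 1024 * 1024)   // placeholder size
                .childOption(ChannelOption.SO_RCVBUF, 4 * 1024 * 1024)   // placeholder size
                .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
                        new WriteBufferWaterMark(512 * 1024, 2 * 1024 * 1024)) // low / high
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        // real pipeline (codecs, block/packet handlers) omitted in this sketch
                    }
                });
        b.bind(9000).sync(); // placeholder port
    }
}

If these options are left unset, the socket buffers come from the kernel's tcp_rmem/tcp_wmem auto-tuning and the write-buffer water marks fall back to Netty's defaults (32 KB low / 64 KB high).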
BlockSenderManager on the server side:
@Override
public void run() {
    while (nodeManager.isRunning()) {
        try {
            // take the next block request and hand it to a BlockSender
            BlockRequest br = blockSenders.take();
            if (br != null) {
                executor.execute(new BlockSender(br, this));
            }
            // block until the BlockSender signals that the block has been
            // delivered, so blocks are pushed out one at a time
            if (wait.take())
                System.out.println(br.getBlockId() + " Delivered");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
BlockSender on the server side:
@Override
public void run()
{
    FileInputStream fis = null;
    try
    {
        java.io.File file = new java.io.File(path + "/" + blockRequest.getBlockId());
        fis = new FileInputStream(file);
        fSize = file.length();
        long rem = fSize;
        sendBlockInfo();
        // packet buffer: packetSize (128 KB) or the whole block if it is smaller
        int bufSize;
        if (fSize < (long) packetSize)
            bufSize = (int) fSize;
        else
            bufSize = packetSize;
        int read = 0, packetOrder = 1;
        byte[] data;
        if (bufSize <= rem)
            data = new byte[bufSize];
        else
            data = new byte[(int) rem];
        while ((read = fis.read(data)) > 0)
        {
            BlockPacket bp = new BlockPacket();
            bp.setRequestId(blockRequest.getRequestId());
            bp.setBlockId(blockRequest.getBlockId());
            bp.setData(data);
            bp.setPacketSeqNo(packetOrder);
            if (read < bufSize)
            {
                bp.setIsLastPacket(true);
            }
            // hand the packet to the executor; a Sender pushes it to the client
            executor.execute(new Sender(bp));
            packetOrder++;
            if (rem > bufSize)
                rem = rem - bufSize;
            // allocate a fresh buffer for the next packet (the previous one is
            // still referenced by the BlockPacket queued above)
            if (bufSize <= rem)
                data = new byte[bufSize];
            else
            {
                data = new byte[(int) rem];
            }
        }
        fis.close();
        executor.shutdown();
    }
    catch (FileNotFoundException e)
    {
        e.printStackTrace();
    }
    catch (IOException e)
    {
        e.printStackTrace();
    }
}
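One detail I am not completely sure about in the loop above: FileInputStream.read(byte[]) is only guaranteed to return at least one byte, not a full buffer, so in principle read < bufSize can also happen mid-block, which would mark a packet as the last one too early and produce short packets. A minimal fill-the-buffer variant would look like this (sketch only, readFully is not part of the project):

// Sketch: read exactly buf.length bytes unless end of file is reached,
// so a short read from FileInputStream cannot be mistaken for the last packet.
static int readFully(java.io.InputStream in, byte[] buf) throws java.io.IOException {
    int off = 0;
    while (off < buf.length) {
        int n = in.read(buf, off, buf.length - off);
        if (n < 0) break;           // end of file
        off += n;
    }
    return off;                      // number of bytes actually read
}

The Sender that the executor runs for each packet looks like this: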
public class Sender implements Runnable
{
    private final BlockPacket bp;
    private final FileBlock fb;
    private DataClient dc;

    public Sender(BlockPacket bp)
    {
        this.bp = bp;
        this.fb = null;
        dc = getDataClient(requestClient);
    }

    public Sender(FileBlock fb)
    {
        this.bp = null;
        this.fb = fb;
        dc = getDataClient(requestClient);
    }

    @Override
    public void run()
    {
        // push either a single packet or a whole block to the requesting client
        if (dc != null)
        {
            if (bp != null)
            {
                dc.send(bp);
            }
            else if (fb != null)
            {
                dc.send(fb);
            }
        }
    }
}
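Each Sender simply hands the packet to DataClient.send() with no feedback from the socket, so packets can pile up in Netty's outbound buffer if the link or the client falls behind. A minimal sketch of how a send path could respect the channel's writability; getChannel()/BackpressureSend are hypothetical, my DataClient does not necessarily expose the channel this way:

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;

// Sketch only: block the calling Sender thread until the write has been
// flushed whenever the channel's outbound buffer is above its high water mark.
public final class BackpressureSend {
    public static void send(Channel ch, Object packet) {
        ChannelFuture f = ch.writeAndFlush(packet);
        if (!ch.isWritable()) {
            f.awaitUninterruptibly(); // wait for this write to drain before queueing more
        }
    }
}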
ReceivedPacketProcessor on the client side:
public void processBlockPacket(BlockPacket bp)
{
    // append the packet payload to the file at the channel's current position
    ByteBuffer buf = ByteBuffer.wrap(bp.getData());
    try
    {
        inChannel.write(buf);
    }
    catch (IOException e)
    {
        e.printStackTrace();
    }
}
@Override
public void run()
{
    try
    {
        aFile = new RandomAccessFile(path + "/" + fileName, "rw");
        inChannel = aFile.getChannel();
        //java.io.File f = new java.io.File(path+"/"+fileName);
        //fop = new FileOutputStream(f);
        String reqId = file.getFileID();
        currentBlockId = reqId + "_" + currentBlockSeq;
        while (true)
        {
            BlockPacket bp = null;
            if (numberOfBlocks > 0)
            {
                try
                {
                    // next packet delivered by the network layer
                    bp = this.blockingQueue.take();
                }
                catch (InterruptedException e)
                {
                    e.printStackTrace();
                }
                if (bp.getBlockId().equals(currentBlockId))
                {
                    if (currentPacket == bp.getPacketSeqNo())
                    {
                        // the packet we are waiting for: write it straight to disk
                        if (fileBlocks.containsKey(currentBlockId))
                        {
                            processBlockPacket(bp);
                            if (currentPacket < fileBlocks.get(currentBlockId).getNoOfPackets())
                                currentPacket++;
                            else
                            {
                                if (fileBlocks.get(currentBlockId).getPackets().size() < 1)
                                {
                                    // block complete: advance to the next block
                                    removeFileBlock(currentBlockId);
                                    currentBlockSeq++;
                                    currentBlockId = reqId + "_" + currentBlockSeq;
                                    currentPacket = 1;
                                    numberOfBlocks--;
                                }
                            }
                        }
                        else
                        {
                            // block metadata not registered yet: park the packet
                            tempList.add(bp);
                        }
                        // drain any buffered packets that are now in order
                        for (int k = numberOfBlocks; k > 0; k--)
                        {
                            if (fileBlocks.containsKey(currentBlockId))
                            {
                                int pCount = fileBlocks.get(currentBlockId).getNoOfPackets();
                                int i;
                                for (i = currentPacket; i <= pCount; i++)
                                {
                                    if (fileBlocks.get(currentBlockId).getPackets().containsKey(i))
                                    {
                                        processBlockPacket(fileBlocks.get(currentBlockId).getPackets().remove(i));
                                        currentPacket++;
                                    }
                                    else
                                    {
                                        break;
                                    }
                                }
                                if (i <= pCount)
                                {
                                    // the next expected packet has not arrived yet
                                    break;
                                }
                                else
                                {
                                    if (fileBlocks.get(currentBlockId).getPackets().size() < 1)
                                    {
                                        // buffered block fully written: advance to the next one
                                        removeFileBlock(currentBlockId);
                                        currentBlockSeq++;
                                        currentBlockId = reqId + "_" + currentBlockSeq;
                                        currentPacket = 1;
                                        numberOfBlocks--;
                                    }
                                }
                            }
                        }
                    }
                }
                else
                {
                    // packet belongs to a later block: buffer it for now
                    if (fileBlocks.containsKey(bp.getBlockId())) {
                        fileBlocks.get(bp.getBlockId()).getPackets().put(bp.getPacketSeqNo(), bp);
                    } else {
                        tempList.add(bp);
                    }
                }
            }
            else
            {
                // all blocks written
                break;
            }
            // move parked packets whose block metadata has arrived into fileBlocks
            for (int i = 0; i < tempList.size(); i++) {
                if (fileBlocks.containsKey(tempList.get(i).getBlockId())) {
                    BlockPacket temp = tempList.remove(i);
                    fileBlocks.get(temp.getBlockId()).getPackets().put(temp.getPacketSeqNo(), temp);
                }
            }
        }
        System.out.println("CLOSING FILE....");
        this.isCompleted.put(true);
        inChannel.force(true);
        inChannel.close();
    }
    catch (FileNotFoundException e)
    {
        e.printStackTrace();
    }
    catch (IOException e)
    {
        e.printStackTrace();
    }
    catch (InterruptedException e)
    {
        e.printStackTrace();
    }
}
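Because packets are written with inChannel.write(buf) at the channel's current position, they have to reach the channel strictly in order, which is what all of the currentPacket / fileBlocks / tempList bookkeeping above enforces. For comparison, a positional write would let a packet be placed as soon as it arrives; this is only a sketch under the assumption of fixed-size blocks and packets (PositionalWrite and its constants are illustrative):

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

// Sketch only: write a packet at its absolute offset in the file, assuming
// fixed-size blocks and packets, so arrival order no longer matters.
public final class PositionalWrite {
    static final long BLOCK_SIZE  = 256L * 1024 * 1024; // 256 MB (illustrative)
    static final long PACKET_SIZE = 128L * 1024;        // 128 KB (illustrative)

    public static void write(FileChannel ch, byte[] data,
                             long blockSeq, long packetSeq) throws IOException {
        long offset = (blockSeq - 1) * BLOCK_SIZE + (packetSeq - 1) * PACKET_SIZE;
        ByteBuffer buf = ByteBuffer.wrap(data);
        while (buf.hasRemaining()) {
            offset += ch.write(buf, offset); // positional write, channel position unchanged
        }
    }
}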
With -XX:+PrintGCDetails turned on, here is a sample log.
Any comments/help are appreciated.
I would check the JVM GC, network bandwidth utilization, read I/O on the servers, and file system cache utilization on the servers. –
@JigarJoshi Thanks for the comment. I have not touched the JVM GC at all; it runs with the default settings. If network bandwidth were the problem, I would expect it to also be a problem for the 15 GB files. Reading the blocks from local I/O is not the bottleneck in this case; I have already checked that. – celik
@celik I think what Jigar is trying to say is: turn on GC logging and see whether that could be the problem. It is not very clear how you read the files... maybe show some code, or how you implemented it? 'FileChannels'? – Eugene