2014-03-31 40 views
0

Unable to persist 'writes' to a specific datanode in HDFS

I am trying to create an indexing scheme that requires the index blocks to sit on the same datanodes as the data blocks, in order to reduce latency during data retrieval. I managed to write the code that reads the data blocks associated with a particular file. For writing, I open a socket connection to a specific datanode, write my data, and then close the socket. Unfortunately, I am not sure 'where' or 'how' the data gets written with this approach, because when I query HDFS with hadoop fs -ls I cannot see my data written anywhere (in some stray file, perhaps?!), yet my program executes without any errors.

Here is my code:

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.BlockLocation; 
import org.apache.hadoop.fs.FSDataInputStream; 
import org.apache.hadoop.fs.FSDataOutputStream; 
import org.apache.hadoop.fs.FileStatus; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.hdfs.DFSClient; 
import org.apache.hadoop.hdfs.DistributedFileSystem; 
import org.apache.hadoop.hdfs.protocol.LocatedBlocks; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.net.NetUtils; 
import org.apache.hadoop.security.UserGroupInformation; 
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; 
import org.apache.hadoop.hdfs.server.datanode.DataNode; 
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; 

import java.io.BufferedOutputStream; 
import java.io.DataOutputStream; 
import java.io.OutputStream; 
import java.io.RandomAccessFile; 
import java.net.InetSocketAddress; 
import java.net.Socket; 
import java.nio.channels.FileChannel; 
import java.nio.file.OpenOption; 
import java.nio.file.StandardOpenOption; 
import java.util.Random; 
import java.nio.ByteBuffer; 

import javax.net.SocketFactory; 

public class CopyOfChunkedIndexes { 

    public static void main(String[] args) throws Exception { 
     if (args.length != 1) { 
      System.err.println("Usage: ChunkedIndexes <input path>"); 
      System.exit(-1); 
     } 


     Configuration conf = new Configuration(); 
     conf.set("fs.default.name", "hdfs://localhost:9000");    //for defaulting to HDFS rather than local filesystem 
     conf.set("hadoop.security.authentication", "simple");    //disable authentication 
     conf.set("hadoop.security.authorization", "false");     //disable authorization 

     Job job = Job.getInstance(conf, "Chunked Indexes"); 
     job.setJarByClass(CopyOfChunkedIndexes.class); 


     Path inputPath = new Path("/user/hadoop-user/sample.txt"); 
     FileInputFormat.setInputPaths(job, inputPath); 

     try{ 
      FileSystem fs = FileSystem.get(conf); 
      DistributedFileSystem dfs = (DistributedFileSystem) fs; 
      DFSClient dfsclient = dfs.getClient(); 



      System.out.println("Proceeding for file: " + inputPath.toString()); 

      FileStatus fileStatus = fs.getFileStatus(inputPath); 
      BlockLocation[] bLocations = fs.getFileBlockLocations(inputPath, 0, fileStatus.getLen()); 


      for(int i = 0; i < bLocations.length; i++) 
      { 
       System.out.println("Block[" + + i + "]::"); 
       System.out.println("\nHost(s): "); 

       String[] temp = bLocations[i].getHosts(); 
       for(int j = 0; j < temp.length; j++) 
       { 
        System.out.println(temp[j] + "\t"); 
       } 

       System.out.println("\nBlock length: " + bLocations[i].getLength() + 
            "\n\nDataNode(s) hosting this block: "); 

       temp = bLocations[i].getNames(); 
       for(int j = 0; j < temp.length; j++) 
       { 
        System.out.println(temp[j] + "\t"); 
       } 

       System.out.println("\nOffset: " + bLocations[i].getOffset()); 

       //READING A BLOCK 
       FSDataInputStream in = fs.open(inputPath); 
       in.seek(bLocations[i].getOffset()); 

       byte[] buf = new byte[(int)bLocations[i].getLength()]; 
       in.readFully(buf, 0, (int)bLocations[i].getLength());   //offset into buf is 0; the file position was already set by seek() above 
       in.close(); 

       System.out.println(new String(buf, "UTF-8")); 
       System.out.println("--------------------------------------------------------------------------------------------"); 
      } 


      //WRITE A FILE TO A SPECIFIC DATANODE 
      for(int i = 0; i < bLocations.length; i++) 
      { 
       System.out.println("Block[" + + i + "]::"); 
       String[] temp; 

       System.out.println("\n\nDataNode(s) hosting this block: ");      //Name(s) = datanode addresses 

       temp = bLocations[i].getNames(); 
       for(int j = 0; j < temp.length; j++) 
       { 
        System.out.println(temp[j].split(":")[0] + "\t" + temp[j].split(":")[1]);  //host vs. port 
       } 

       Socket sock = SocketFactory.getDefault().createSocket(); 
       InetSocketAddress targetAddr = new InetSocketAddress(temp[0].split(":")[0], Integer.parseInt(temp[0].split(":")[1])); 
       NetUtils.connect(sock, targetAddr, 10000); 
       sock.setSoTimeout(10000); 

       OutputStream baseStream = NetUtils.getOutputStream(sock, 10000); 
       DataOutputStream oStream = new DataOutputStream(new BufferedOutputStream(baseStream, 10000)); 
       oStream.writeBytes("-----------------------------------------Sample text-----------------------------------------------"); 


       sock.close(); 
       System.out.println("Data written, socket closed!"); 
      } 

     }catch(Exception ex){ 
      ex.printStackTrace(); 
     } 

    } 
} 

Any help as to where I am going wrong would be greatly appreciated! Thanks!

[PS: I am using Hadoop 2.2.0 on a Linux VM. I disabled authorization/authentication in the code above because I want to access a datanode directly (without the authentication overhead), as this is for testing purposes.]

Answers

0

There is a class called BlockPlacementPolicy that can, in theory, be extended to customize how HDFS chooses datanodes. Hacky as it is, this approach may work for anyone who wants to do something similar in the future and runs into this question. A rough sketch of such a subclass follows.
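For anyone going down that road, here is a minimal sketch (my own, not part of the original answer): a subclass of BlockPlacementPolicyDefault that could be grown into a co-locating policy. The class name is a placeholder, and the methods worth overriding (chooseTarget, verifyBlockPlacement, chooseReplicaToDelete) have signatures that differ between Hadoop minor versions, so check your exact release before filling them in. The policy is normally wired in on the namenode via the dfs.block.replicator.classname property in hdfs-site.xml (an undocumented key in 2.x).

import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault; 

//Skeleton only: with no overrides it behaves exactly like the default policy. 
//Override chooseTarget(...) to steer index blocks onto the datanodes that 
//already hold the corresponding data blocks. 
public class CollocatedBlockPlacementPolicy extends BlockPlacementPolicyDefault { 
} 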

0

All of your edits are discarded by the cluster because you did not go through the namenode. Any modification made that way is treated as file corruption.
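To make the write actually show up under hadoop fs -ls, the data has to go through the regular client API so that the namenode records the blocks. A minimal sketch (mine, not the answerer's; the class name and output path are placeholders):

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.FSDataOutputStream; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 

public class WriteThroughNamenode { 
    public static void main(String[] args) throws Exception { 
        Configuration conf = new Configuration(); 
        conf.set("fs.defaultFS", "hdfs://localhost:9000");     //2.x name for the fs.default.name key used in the question 

        FileSystem fs = FileSystem.get(conf); 
        Path out = new Path("/user/hadoop-user/sample-index.txt");    //placeholder output path 

        //create() asks the namenode to allocate blocks and set up the datanode write pipeline 
        FSDataOutputStream os = fs.create(out, true); 
        os.writeBytes("-----------------------------------------Sample text-----------------------------------------------"); 
        os.close();            //closing completes the file, which then appears in hadoop fs -ls 
    } 
} 

The namenode then chooses the datanodes (subject to whatever BlockPlacementPolicy is configured), which is also where the approach from the other answer would hook in.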

Hadoop already does this work for you: when you run a distributed task on a Hadoop cluster, the closest copy of the data is loaded for each task. For example, if you have an elasticsearch cluster and a Hadoop cluster sharing the same hardware, you only have to create a mapreduce job that talks to the local elasticsearch node, and that's it: no network dance for your data, each task loads its part of the dataset and pushes it into the local elasticsearch instance.

Enjoy!