I am trying to build an indexing strategy that requires the index blocks to live on the same datanode as the data blocks they refer to, in order to reduce latency during data retrieval. I have managed to write the code that reads the data blocks belonging to a particular file. For writing, I open a socket connection to a specific datanode, write my data, and then close the socket. Unfortunately, I am not sure where or how the data ends up being written with this approach, because when I query HDFS with hadoop fs -ls I cannot see my data anywhere (perhaps in some stray file?!), yet my program runs without any errors. In short, I am unable to persist a 'write' to a specific datanode in HDFS.
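(I have also come across what looks like a supported way to do this: HDFS-2576 apparently added a create() overload to DistributedFileSystem in Hadoop 2.x that takes an array of "favored" datanode addresses, so the write goes through the normal client path instead of a raw socket. A minimal sketch of what I think that would look like, with a hypothetical datanode address and output path, and assuming the overload is available in my 2.2.0 build:)

    import java.net.InetSocketAddress;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.permission.FsPermission;
    import org.apache.hadoop.hdfs.DistributedFileSystem;

    public class FavoredNodesWrite {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://localhost:9000");
            DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(conf);

            //hypothetical datanode address; in practice this would come from
            //BlockLocation#getNames() for the data block to co-locate with
            InetSocketAddress[] favoredNodes = new InetSocketAddress[] {
                    new InetSocketAddress("datanode-1", 50010)
            };

            //the favored-nodes overload asks the namenode to *prefer* these
            //datanodes when allocating blocks (a hint, not a guarantee)
            FSDataOutputStream out = dfs.create(
                    new Path("/user/hadoop-user/index-chunk-0"),   //hypothetical path
                    FsPermission.getDefault(),
                    true,                        //overwrite
                    4096,                        //buffer size
                    (short) 1,                   //replication
                    dfs.getDefaultBlockSize(),   //block size
                    null,                        //progressable
                    favoredNodes);
            out.writeBytes("index payload goes here");
            out.close();
        }
    }

(Even if this works, the favored-nodes list is reportedly only a hint to the namenode, so placement would still need to be verified afterwards.)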
Here is my code:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.channels.FileChannel;
import java.nio.file.OpenOption;
import java.nio.file.StandardOpenOption;
import java.util.Random;
import java.nio.ByteBuffer;
import javax.net.SocketFactory;
public class CopyOfChunkedIndexes {
    public static void main(String[] args) throws Exception {
        if (args.length != 1) {
            System.err.println("Usage: ChunkedIndexes <input path>");
            System.exit(-1);
        }

        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");      //default to HDFS rather than the local filesystem ("fs.default.name" is the deprecated name of this key)
        conf.set("hadoop.security.authentication", "simple");   //disable authentication
        conf.set("hadoop.security.authorization", "false");     //disable authorization

        Job job = Job.getInstance(conf, "Chunked Indexes");
        job.setJarByClass(CopyOfChunkedIndexes.class);

        Path inputPath = new Path("/user/hadoop-user/sample.txt");
        FileInputFormat.setInputPaths(job, inputPath);
        try {
            FileSystem fs = FileSystem.get(conf);
            DistributedFileSystem dfs = (DistributedFileSystem) fs;
            DFSClient dfsclient = dfs.getClient();
            System.out.println("Proceeding for file: " + inputPath.toString());

            FileStatus fileStatus = fs.getFileStatus(inputPath);
            BlockLocation[] bLocations = fs.getFileBlockLocations(inputPath, 0, fileStatus.getLen());
            for (int i = 0; i < bLocations.length; i++) {
                System.out.println("Block[" + i + "]::");
                System.out.println("\nHost(s): ");

                String[] temp = bLocations[i].getHosts();
                for (int j = 0; j < temp.length; j++) {
                    System.out.println(temp[j] + "\t");
                }

                System.out.println("\nBlock length: " + bLocations[i].getLength() +
                        "\n\nDataNode(s) hosting this block: ");
                temp = bLocations[i].getNames();
                for (int j = 0; j < temp.length; j++) {
                    System.out.println(temp[j] + "\t");
                }

                System.out.println("\nOffset: " + bLocations[i].getOffset());

                //READING A BLOCK
                FSDataInputStream in = fs.open(inputPath);
                in.seek(bLocations[i].getOffset());
                byte[] buf = new byte[(int) bLocations[i].getLength()];
                in.readFully(buf, 0, buf.length);   //offset argument is the offset into buf, not into the file (seek() already handled that); readFully avoids short reads
                in.close();
                System.out.println(new String(buf, "UTF-8"));
                System.out.println("--------------------------------------------------------------------------------------------");
            }
            //WRITE A FILE TO A SPECIFIC DATANODE
            for (int i = 0; i < bLocations.length; i++) {
                System.out.println("Block[" + i + "]::");
                String[] temp;

                System.out.println("\n\nDataNode(s) hosting this block: ");   //Name(s) = datanode addresses
                temp = bLocations[i].getNames();
                for (int j = 0; j < temp.length; j++) {
                    System.out.println(temp[j].split(":")[0] + "\t" + temp[j].split(":")[1]);   //host vs. port
                }

                Socket sock = SocketFactory.getDefault().createSocket();
                InetSocketAddress targetAddr = new InetSocketAddress(temp[0].split(":")[0], Integer.parseInt(temp[0].split(":")[1]));
                NetUtils.connect(sock, targetAddr, 10000);
                sock.setSoTimeout(10000);

                OutputStream baseStream = NetUtils.getOutputStream(sock, 10000);
                DataOutputStream oStream = new DataOutputStream(new BufferedOutputStream(baseStream, 10000));
                oStream.writeBytes("-----------------------------------------Sample text-----------------------------------------------");
                oStream.flush();   //without this, the bytes sit in the 10000-byte buffer and never reach the socket
                sock.close();
                System.out.println("Data written, socket closed!");
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}
Any help as to where I am going wrong would be deeply appreciated! Thanks!
[PS: I am using Hadoop 2.2.0 on a Linux VM. I disabled authorization/authentication in the code above because I want to access a datanode directly (without the overhead of authentication), as this is for testing purposes.]
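For completeness, this is how I intend to verify where the blocks of a file actually landed once a write goes through the FileSystem API; it is the same getFileBlockLocations call used in the code above, pointed at a hypothetical output path:

    import java.util.Arrays;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.BlockLocation;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class VerifyPlacement {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://localhost:9000");
            FileSystem fs = FileSystem.get(conf);

            //hypothetical path; would point at whatever file was just written
            Path p = new Path("/user/hadoop-user/index-chunk-0");
            FileStatus st = fs.getFileStatus(p);

            //getNames() returns the host:port of every datanode holding each
            //block, which can be checked against the requested favored nodes
            for (BlockLocation loc : fs.getFileBlockLocations(st, 0, st.getLen())) {
                System.out.println("offset " + loc.getOffset() + " -> " + Arrays.toString(loc.getNames()));
            }
        }
    }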