We want to guarantee that the consumer process reads the data created by the producer only after the producer has finished writing the HDFS file. Below is the approach currently used in the application, which we are trying to improve. What guarantees does HDFS give about the data of a file while it is being read/written?
Producer:
private void produce(String file, int sleepSeconds) throws Exception {
    Configuration conf = new Configuration();
    conf.addResource(new Path(
            "C:\\dev\\software\\hadoop-0.22.0-src\\conf\\core-site.xml"));
    conf.set("fs.defaultFS", "hdfs://XXX:9000");
    FileSystem fileSystem = FileSystem.get(conf);
    Path path = new Path(file);
    if (fileSystem.exists(path)) {
        fileSystem.delete(path, false);
    }
    System.out.println("Creating file");
    FSDataOutputStream out = fileSystem.create(path);
    System.out.println("Writing data");
    out.writeUTF("--data--");
    System.out.println("Sleeping");
    Thread.sleep(sleepSeconds * 1000L);
    System.out.println("Writing data");
    out.writeUTF("--data--");
    System.out.println("Flushing");
    out.flush();
    out.close();
    fileSystem.close();
    System.out.println("Releasing lock on file");
}
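(For reference, a minimal sketch of how the producer could make written bytes visible to readers before the stream is closed. It assumes a Hadoop release whose FSDataOutputStream exposes Syncable's hflush(); older releases used sync() instead. Same org.apache.hadoop.fs.* classes as above.)

// Sketch only: make data visible to readers without closing the stream.
private void produceWithFlush(FileSystem fileSystem, Path path) throws Exception {
    FSDataOutputStream out = fileSystem.create(path);
    out.writeUTF("--data--");
    // hflush() pushes the buffered bytes out to the datanodes, so a reader
    // that opens the file now can already see them.
    out.hflush();
    // ... database calls / business logic would run here ...
    out.writeUTF("--data--");
    out.close(); // close() completes the file and releases the lease
}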
Consumer:
private void consume(String file) throws Exception {
    Configuration conf = new Configuration();
    conf.addResource(new Path(
            "C:\\dev\\software\\hadoop-0.22.0-src\\conf\\core-site.xml"));
    conf.set("fs.defaultFS", "hdfs://XXX:9000");
    FileSystem fileSystem = FileSystem.get(conf);
    Path path = new Path(file);
    if (fileSystem.exists(path)) {
        System.out.println("File exists");
    } else {
        System.out.println("File doesn't exist");
        return;
    }
    // append() fails while the producer still holds the lease on the file,
    // so it is used here as a crude lock.
    FSDataOutputStream fsOut = null;
    while (fsOut == null) {
        try {
            fsOut = fileSystem.append(path);
        } catch (IOException e) {
            Thread.sleep(1000);
        }
    }
    FSDataInputStream in = fileSystem.open(path);
    OutputStream out = new BufferedOutputStream(System.out);
    byte[] b = new byte[1024];
    int numBytes = 0;
    while ((numBytes = in.read(b)) > 0) {
        out.write(b, 0, numBytes);
    }
    in.close();
    out.flush(); // flush instead of close so System.out remains usable below
    if (fsOut != null)
        fsOut.close();
    fileSystem.close();
    System.out.println("Releasing lock on file");
}
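(Likewise for reference, a rough sketch of a consumer that waits for a separate completion marker instead of using append() as a lock. The ".done" suffix is only an example and assumes the producer creates that marker after it has closed the data file.)

// Sketch only: wait for a marker file the producer writes after finishing.
private void consumeWhenMarkerExists(FileSystem fileSystem, Path dataPath) throws Exception {
    Path marker = dataPath.suffix(".done");
    while (!fileSystem.exists(marker)) {
        Thread.sleep(1000); // poll until the producer signals completion
    }
    FSDataInputStream in = fileSystem.open(dataPath);
    try {
        byte[] b = new byte[1024];
        int numBytes;
        while ((numBytes = in.read(b)) > 0) {
            System.out.write(b, 0, numBytes);
        }
        System.out.flush();
    } finally {
        in.close();
    }
}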
This is how the processes are required to run:
The producer process (not a thread) starts first. The Thread.sleep simulates a batch of database calls and business logic.
The consumer process (not a thread) starts on a different machine and blocks until the producer releases its lock. While the consumer is reading, no other process should modify the data file.
Any suggestions on how we could improve this code/design, using the HDFS Java API, while at the same time guaranteeing that the reader does not lose data?
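For example, would the usual write-to-a-temporary-path-then-rename handoff be a reasonable direction here? A single HDFS rename is atomic on the NameNode, so the consumer would either see no file at the final path or a complete one. A rough sketch of what we mean (the "._tmp" suffix is only an example):

// Sketch only: write everything to a temporary path, then rename it into place.
private void produceThenRename(FileSystem fileSystem, Path finalPath) throws Exception {
    Path tmp = finalPath.suffix("._tmp");
    FSDataOutputStream out = fileSystem.create(tmp, true); // overwrite any stale temp file
    out.writeUTF("--data--");
    // ... database calls / business logic would run here ...
    out.writeUTF("--data--");
    out.close();
    if (fileSystem.exists(finalPath)) {
        fileSystem.delete(finalPath, false); // rename fails if the destination already exists
    }
    fileSystem.rename(tmp, finalPath);
}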
How many files does the producer generate? –
The producer generates a single file. –