2012-05-30 118 views
0

我們希望保證消費者進程在生產者完成寫入HDFS文件後讀取由生產者創建的數據。以下是應用程序中使用的一種方法,我們正在嘗試改進。HDFS保證讀/寫文件的數據

監製:

private void produce(String file, int sleepSeconds) throws Exception { 
     Configuration conf = new Configuration(); 
     conf.addResource(new Path(
       "C:\\dev\\software\\hadoop-0.22.0-src\\conf\\core-site.xml")); 
     conf.set("fs.defaultFS", "hdfs://XXX:9000"); 
     FileSystem fileSystem = FileSystem.get(conf); 

     Path path = new Path(file); 
     if (fileSystem.exists(path)) { 
      fileSystem.delete(path, false); 
     } 
     System.out.println("Creating file"); 
     FSDataOutputStream out = fileSystem.create(path); 
     System.out.println("Writing data"); 
     out.writeUTF("--data--"); 
     System.out.println("Sleeping"); 
     Thread.sleep(sleepSeconds * 1000L); 
     System.out.println("Writing data"); 
     out.writeUTF("--data--"); 
     System.out.println("Flushing"); 
     out.flush(); 
     out.close(); 
     fileSystem.close(); 
     System.out.println("Releasing lock on file"); 
    } 

消費者:

private void consume(String file) throws Exception { 
     Configuration conf = new Configuration(); 
     conf.addResource(new Path(
       "C:\\dev\\software\\hadoop-0.22.0-src\\conf\\core-site.xml")); 
     conf.set("fs.defaultFS", "hdfs://XXX:9000"); 
     FileSystem fileSystem = FileSystem.get(conf); 

     Path path = new Path(file); 
     if (fileSystem.exists(path)) { 
      System.out.println("File exists"); 
     } else { 
      System.out.println("File doesn't exist"); 
      return; 
     } 
     FSDataOutputStream fsOut = null; 
     while (fsOut == null) { 
      try { 
       fsOut = fileSystem.append(path); 
      } catch (IOException e) { 
       Thread.sleep(1000); 
      } 
     } 
     FSDataInputStream in = fileSystem.open(path); 
     OutputStream out = new BufferedOutputStream(System.out); 
     byte[] b = new byte[1024]; 
     int numBytes = 0; 
     while ((numBytes = in.read(b)) > 0) { 
      out.write(b, 0, numBytes); 
     } 
     in.close(); 
     out.close(); 
     if (fsOut != null) 
      fsOut.close(); 
     fileSystem.close(); 
     System.out.println("Releasing lock on file"); 
    } 

如何是應該運行的過程如下要求:

  1. 生產者進程(不是線程)啓動。 thread.sleep模擬一堆數據庫調用和業務邏輯

  2. 消費者進程(不是線程)在不同的機器中啓動,該機器會阻止生產者釋放其鎖。雖然消費者讀取,沒有其他進程應該修改數據文件

對我們如何去改善這個代碼/設計在同一時間低保讀者不丟失數據,使用HDFS的Java API有何建議?

+0

生產者生成多少個文件? –

+0

1個生產商產生的文件 –

回答

1

一種解決方案是將寫入用臨時後綴/前綴的文件,和重命名文件一次寫入完成:

例如輸出到文件FILE1.TXT:

  • 寫到一個名爲.file1.txtfile1.txt.tmp
  • 文件關閉了該文件一旦完成
  • 重命名.file1.txt或file1.txt.tmpfile1.txt
  • 同時,消費者正在等待file1.txt變得可用
+0

謝謝,這絕對是板上的一個選項 –