2017-04-20 46 views
0

我在hadoop歸檔文件中有大量數據.har格式。因爲,har不包含任何壓縮,我試圖進一步gzip它並存儲在HDFS中。我唯一可以無誤地工作的是:使用Spark在HDFS上對Gzipping Har文件進行Gzipping

harFile.coalesce(1, "true") 
.saveAsTextFile("hdfs://namenode/archive/GzipOutput", classOf[org.apache.hadoop.io.compress.GzipCodec]) 
//`coalesce` because Gzip isn't splittable. 

但是,這並沒有給我正確的結果。生成一個Gzipped文件,但輸出無效(單行說rdd類型等)

任何幫助將不勝感激。我也接受任何其他方法。

謝謝。

+0

HAR檔案中有什麼樣的內容 - CSV,JSON,非結構化文本(例如日誌),二進制文件?你有沒有考慮將每個HAR分別存檔,將每個文件內部壓縮並重新歸檔?如果不是二進制文件,您是否考慮將每個HAR(或多個HAR)的內容合併爲一個帶有MR或Spark作業的單個GZip(或BZipped)文件?如果結構化,您是否考慮將每個HAR(或多個HAR)的內容合併爲GZip壓縮等柱狀格式,例如Parquet或ORC? –

+0

@SamsonScharfrichter har將包含平面文本文件或鑲木地板文件。沒有像xmls,但我不希望數據分裂。 Gzip每個文件是一個問題,因爲har可能包含350多個目錄,每個目錄內都會有一個文件。我不知道該怎麼做。我嘗試使用PIG使用GZip壓縮來壓縮該單個har文件。它壓縮成功但創建了部分文件,因爲GZip不可拆分,所以這又會是不可取的。最後,不能合併多個HAR,因爲每個har都需要單獨進行gzip壓縮。 – philantrovert

回答

1

用於創建現有HDFS文件的壓縮版本的Java代碼片段。

在一個文本編輯器中,我匆匆建立了一段時間以前寫的一個Java應用程序,因此沒有經過測試;預計會出現一些錯別字和差距。

// HDFS API 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.security.UserGroupInformation; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.fs.FileStatus; 
// native Hadoop compression libraries 
import org.apache.hadoop.io.compress.CompressionCodecFactory; 
import org.apache.hadoop.io.compress.CompressionCodec; 
import org.apache.hadoop.io.compress.Compressor; 
import org.apache.hadoop.io.compress.GzipCodec; 
import org.apache.hadoop.io.compress.BZip2Codec; 
import org.apache.hadoop.io.compress.SnappyCodec; 
import org.apache.hadoop.io.compress.Lz4Codec; 

.............. 

    // Hadoop "Configuration" (and its derivatives for HDFS, HBase etc.) constructors try to auto-magically 
    // find their config files by searching CLASSPATH for directories, and searching each dir for hard-coded 
    // name "core-site.xml", plus "hdfs-site.xml" and/or "hbase-site.xml" etc. 
    // WARNING - if these config files are not found, the "Configuration" reverts to hard-coded defaults without 
    // any warning, resulting in bizarre error messages later > let's run some explicit controls here 
    Configuration cnfHadoop = new Configuration() ; 
    String propDefaultFs =cnfHadoop.get("fs.defaultFS") ; 
    if (propDefaultFs ==null || ! propDefaultFs.startsWith("hdfs://")) 
    { throw new IllegalArgumentException(
       "HDFS configuration is missing - no proper \"core-site.xml\" found, please add\n" 
       +"directory /etc/hadoop/conf/ (or custom dir with custom XML conf files) in CLASSPATH" 
       ) ; 
    } 
/* 
    // for a Kerberised cluster, either you already have a valid TGT in the default 
    // ticket cache (via "kinit"), or you have to authenticate by code 
    UserGroupInformation.setConfiguration(cnfHadoop) ; 
    UserGroupInformation.loginUserFromKeytab("[email protected]", "/some/path/to/user.keytab") ; 
*/ 
    FileSystem fsCluster =FileSystem.get(cnfHadoop) ; 
    Path source = new Path("/some/hdfs/path/to/XXX.har") ; 
    Path target = new Path("/some/hdfs/path/to/XXX.har.gz") ; 

    // alternative: "BZip2Codec" for better compression (but higher CPU cost) 
    // alternative: "SnappyCodec" or "Lz4Codec" for lower compression (but much lower CPU cost) 
    CompressionCodecFactory codecBootstrap = new CompressionCodecFactory(cnfHadoop) ; 
    CompressionCodec codecHadoop =codecBootstrap.getCodecByClassName(GzipCodec.class.getName()) ; 
    Compressor compressorHadoop =codecHadoop.createCompressor() ; 

    byte[] buffer = new byte[16*1024*1024] ; 
    int bufUsedCapacity ; 
    InputStream sourceStream =fsCluster.open(source) ; 
    OutputStream targetStream =codecHadoop.createOutputStream(fsCluster.create(target, true), compressorHadoop) ; 
    while ((bufUsedCapacity =sourceStream.read(buffer)) >0) 
    { targetStream.write(buffer, 0, bufUsedCapacity) ; } 
    targetStream.close() ; 
    sourceStream.close() ; 

.............. 
+0

謝謝答案參孫。將嘗試並更新。 – philantrovert

+0

所以我試了一下,但沒有成功,顯然'har'文件是一個目錄,你不能壓縮目錄。你會建議在HDFS上創建一個'tar'而不是'har'(使用'org.apache.commons.compress')然後對其進行壓縮? – philantrovert

+0

Duh ...看起來HAR確實是一個奇怪的野獸。但是,如果HDFS聲明它是一個目錄,並允許你訪問單個文件,那麼你應該能夠從HAR(使用標準的'java.util.zip.ZipOutputStream'和'。putNextEntry()')構建一個ZIP文件。等等) - _disclaimer:我不是古老的TAR格式_的忠實粉絲。 –

相關問題