2016-05-24 115 views
2

我想將一些代碼從熊貓移植到(py)Spark。不幸的是,我已經失敗了輸入部分,我想讀取二進制數據並將其放入Spark Dataframe。如何從hdfs將二進制文件讀入Spark數據框?

到目前爲止,我使用fromfile從numpy的:

dt = np.dtype([('val1', '<i4'),('val2','<i4'),('val3','<i4'),('val4','f8')]) 
data = np.fromfile('binary_file.bin', dtype=dt) 
data=data[1:]           #throw away header 
df_bin = pd.DataFrame(data, columns=data.dtype.names) 

但對於星火我找不到如何做到這一點。到目前爲止,我的解決方法是使用csv-Files而不是二進制文件,但這不是理想的解決方案。我知道我不應該用火花來使用numpy的fromfile。 如何讀取已加載到hdfs中的二進制文件?

我想是這樣

fileRDD=sc.parallelize(['hdfs:///user/bin_file1.bin','hdfs:///user/bin_file2.bin]) 
fileRDD.map(lambda x: ???) 

但它給我一個No such file or directory錯誤。

我看到這個問題: spark in python: creating an rdd by loading binary data with numpy.fromfile 但只有當我有文件存儲在驅動程序節點的家中才有效。

回答

2

因此,對於任何人,以星火開始爲我和絆uopn二進制文件,這裏是我如何解決它:

dt=np.dtype([('idx_metric','>i4'),('idx_resource','>i4'),('date','>i4'), 
      ('value','>f8'),('pollID','>i2')]) 
schema=StructType([StructField('idx_metric',IntegerType(),False), 
        StructField('idx_resource',IntegerType(),False), 
        StructField('date',IntegerType),False), 
        StructField('value',DoubleType(),False), 
        StructField('pollID',IntegerType(),False)]) 

filenameRdd=sc.binaryFiles('hdfs://nameservice1:8020/user/*.binary') 

def read_array(rdd): 
    #output=zlib.decompress((bytes(rdd[1])),15+32) # in case also zipped 
    array=np.frombuffer(bytes(rdd[1])[20:],dtype=dt) # remove Header (20 bytes) 
    array=array.newbyteorder().byteswap() # big Endian 
    return array.tolist() 

unzipped=filenameRdd.flatMap(read_array) 
bin_df=sqlContext.createDataFrame(unzipped,schema) 

現在你可以做你與你的數據幀星火想要的任何花哨的東西。

2

編輯: 請仔細閱讀使用sc.binaryFiles的這裏提到: https://stackoverflow.com/a/28753276/5088142


嘗試使用:

hdfs://machine_host_name:8020/user/bin_file1.bin 

你的主機名fs.defaultFScore-site.xml

+0

fs.defaultFS說nameservice1,而且還與'HDFS :// nameservice1:8020/user/bin_file1.bin'我仍然收到文件未找到錯誤。 它可以與我放在地圖中的功能鏈接? 'DEF read_bin: 張開( 「MYFILE」, 「RB」)爲f: 字節= f.read(1) 而字節= 「」: 字節= f.read(1)' – WilliamEllisWebb

+0

在哪個行你有沒有找到「文件未找到錯誤」?你打算如何使用「read_bin」函數?打開的方法似乎不適用於HDFS .... – Yaron

+0

錯誤位於read_bin的第2行。你是對的,開放的方法不喜歡HDFS。我正在尋找類似於'sc.textfile(filename).map(lambda line:line.split(','))。map(lambda x:(int(x [0],int(x [1] ....)' – WilliamEllisWebb

0

我最近做了這樣的事情:

from struct import unpack_from 

# creates an RDD of binaryrecords for determinted record length 
binary_rdd = sc.binaryRecords("hdfs://" + file_name, record_length) 

# map()s each binary record to unpack() it 
unpacked_rdd = binary_rdd.map(lambda record: unpack_from(unpack_format, record)) 

# registers a data frame with this schema; registerTempTable() it as table_name 
raw_df = sqlc.createDataFrame(unpacked_rdd, sparkSchema) 
raw_df.registerTempTable(table_name) 

其中unpack_format和sparkSchema必須是 「同步」。

我有一個腳本,可以動態生成unpack_format和sparkSchema變量;兩者在同一時間。(這是一個更大的代碼庫的一部分,所以不會在這裏發帖的readbility)

unpack_format和sparkSchema可以定義爲以下,例如,

from pyspark.sql.types import * 

unpack_format = '<' # '<' means little-endian: https://docs.python.org/2/library/struct.html#byte-order-size-and-alignment 
sparkSchema = StructType() 
record_length = 0 

unpack_format += '35s' # 35 bytes that represent a character string 
sparkSchema.add("FirstName", 'string', True) # True = nullable 
record_length += 35 

unpack_format += 'H' # 'H' = unsigned 2-byte integer 
sparkSchema.add("ZipCode", 'integer', True) 
record_length += 2 

# and so on for each field.. 
相關問題