2017-09-22 60 views
0

我有一個很煩人的文件集結構,像這樣:星火/蜂巢 - 組數據爲「樞軸表」格式

userId string, 
eventType string, 
source string, 
errorCode string, 
startDate timestamp, 
endDate timestamp 

每個文件可能包含的每EVENTID記錄的任意數,用不同eventTypes和來源,以及不同的代碼和每個的開始/結束日期。

是否有蜂巢的方式或火花組所有這些一起userId的,有點像一個key-value,其中值是與用戶id相關聯的所有字段列表?具體來說,我希望它通過eventType和源鍵入。基本上我想交易寬度的表格長度,有點像數據透視表。我的目標是最終將存儲爲Apache Parquet或Avro文件格式,以便將來進行更快速的分析。

下面是一個例子:

源數據:

userId, eventType, source, errorCode, startDate, endDate 
552113, 'ACK', 'PROVIDER', 0, '2017-09-01 12:01:45.432', '2017-09-01 12:01:45.452' 
284723, 'ACK', 'PROVIDER', 0, '2017-09-01 12:01:45.675', '2017-09-01 12:01:45.775' 
552113, 'TRADE', 'MERCH', 0, '2017-09-01 12:01:47.221', '2017-09-01 12:01:46.229' 
552113, 'CHARGE', 'MERCH', 0, '2017-09-01 12:01:48.123', '2017-09-01 12:01:48.976' 
284723, 'REFUND', 'MERCH', 1, '2017-09-01 12:01:48.275', '2017-09-01 12:01:48.947' 
552113, 'CLOSE', 'PROVIDER', 0, '2017-09-01 12:01:49.908', '2017-09-01 12:01:50.623' 
284723, 'CLOSE', 'PROVIDER', 0, '2017-09-01 12:01:50.112', '2017-09-01 12:01:50.777' 

目標:

userId, eventTypeAckProvider, sourceAckProvider, errorCodeAckProvider, startDateAckProvider, endDateAckProvider, eventTypeTradeMerch, sourceTradeMerch, errorCodeTradeMerch, startDateTradeMerch, endDateTradeMerch, eventTypeChargeMerch, sourceChargeMerch, errorCodeChargeMerch, startDateChargeMerch, endDateChargeMerch, eventTypeCloseProvider, sourceCloseProvider, errorCodeCloseProvider, startDateCloseProvider, endDateCloseProvider, eventTypeRefundMerch, sourceRefundMerch, errorCodeRefundMerch, startDateRefundMerch, endDateRefundMerch 
552113, 'ACK', 'PROVIDER', 0, '2017-09-01 12:01:45.432', '2017-09-01 12:01:45.452', 'TRADE', 'MERCH', 0, '2017-09-01 12:01:47.221', '2017-09-01 12:01:46.229', 'CHARGE', 'MERCH', 0, '2017-09-01 12:01:48.123', '2017-09-01 12:01:48.976', 'CLOSE', 'PROVIDER', 0, '2017-09-01 12:01:49.908', '2017-09-01 12:01:50.623', NULL, NULL, NULL, NULL, NULL 
284723, 'ACK', 'PROVIDER', 0, '2017-09-01 12:01:45.675', '2017-09-01 12:01:45.775', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, 'CLOSE', 'PROVIDER', 0, '2017-09-01 12:01:50.112', '2017-09-01 12:01:50.777', 'REFUND', 'MERCH', 1, '2017-09-01 12:01:48.275', '2017-09-01 12:01:48.947' 

字段名稱或順序並不重要,只要我能區分它們。

我已經試過兩種方法已經得到這個工作:

  1. 手動從表中選擇的每個組合,並加入到主數據集。這很好,並行性好,但不允許關鍵字段的任意數量的值,並且需要預定義模式。
  2. 使用Spark創建一個key:value記錄的字典,其中每個值都是一個字典。基本上遍歷數據集,如果字典不存在,則向字典添加一個新的鍵;如果該字段不存在,則爲該條目添加一個新的字段到值字典中。這種方法非常精美,但速度非常慢,如果完全平行的話也不會很好地並行化。另外我不確定這是否是Avro/Parquet兼容格式。

是否有任何替代這兩個方法?甚至比我的目標更好的結構?

回答

1

你想有這樣的事情?

from pyspark.sql.functions import struct, col, create_map, collect_list 

df = sc.parallelize([ 
    ['552113', 'ACK', 'PROVIDER', 0, '2017-09-01 12:01:45.432', '2017-09-01 12:01:45.452'], 
    ['284723', 'ACK', 'PROVIDER', 0, '2017-09-01 12:01:45.675', '2017-09-01 12:01:45.775'], 
    ['552113', 'TRADE', 'MERCH', 0, '2017-09-01 12:01:47.221', '2017-09-01 12:01:46.229'], 
    ['552113', 'CHARGE', 'MERCH', 0, '2017-09-01 12:01:48.123', '2017-09-01 12:01:48.976'], 
    ['284723', 'REFUND', 'MERCH', 1, '2017-09-01 12:01:48.275', '2017-09-01 12:01:48.947'], 
    ['552113', 'CLOSE', 'PROVIDER', 0, '2017-09-01 12:01:49.908', '2017-09-01 12:01:50.623'], 
    ['284723', 'CLOSE', 'PROVIDER', 0, '2017-09-01 12:01:50.112', '2017-09-01 12:01:50.777'] 
]).toDF(('userId', 'eventType', 'source', 'errorCode', 'startDate', 'endDate')) 
df.show() 

new_df = df.withColumn("eventType_source", struct([col('eventType'), col('source')])).\ 
    withColumn("errorCode_startEndDate", struct([col('errorCode'), col('startDate'), col('endDate')])) 

new_df = new_df.groupBy('userId').agg(collect_list(create_map(col('eventType_source'), col('errorCode_startEndDate'))).alias('event_detail')) 
new_df.show() 
+0

謝謝!這似乎可以工作!我在我的實時數據集上嘗試過它,就它的分組方式而言,它幾乎返回了我想要的結果。儘管如此,我並不熟悉「地圖列表」數據結構,並且在操作的任何地方都找不到任何記錄。我想後續的問題是,我如何與這個數據結構交互?作爲一個例子,我如何獲得特定用戶的CHARGE/MERCH屬性? –

+1

很高興它幫助!我認爲這可能有助於開始跟進問題:'從itertools導入鏈; new_df.printSchema(); 。 RDD1集= new_df.where(COL( '用戶id')== '552113')選擇( 'event_detail')rdd.flatMap(拉姆達X:鏈(*(X)))。 keys = rdd1.map(lambda x:x.keys())。collect(); values = rdd1.map(lambda x:x.values())。collect();''keys'和'values'是需要研究的。 – Prem

0

你可以試試這個,給你的意見,

>>> from pyspark.sql import SparkSession 
>>> from pyspark.sql import functions as F 
>>> from pyspark.sql.types import * 

>>> spark = SparkSession.builder.getOrCreate() 

>>> l=[(552113, 'ACK', 'PROVIDER', 0, '2017-09-01 12:01:45.432', '2017-09-01 12:01:45.452'),(284723, 'ACK', 'PROVIDER', 0, '2017-09-01 12:01:45.675', '2017-09-01 12:01:45.775'),(552113, 'TRADE', 'MERCH', 0, '2017-09-01 12:01:47.221', '2017-09-01 12:01:46.229'),(552113, 'CHARGE', 'MERCH', 0, '2017-09-01 12:01:48.123', '2017-09-01 12:01:48.976'),(284723, 'REFUND', 'MERCH', 1, '2017-09-01 12:01:48.275', '2017-09-01 12:01:48.947'),(552113, 'CLOSE', 'PROVIDER', 0, '2017-09-01 12:01:49.908', '2017-09-01 12:01:50.623'),(284723, 'CLOSE', 'PROVIDER', 0, '2017-09-01 12:01:50.112', '2017-09-01 12:01:50.777')] 

>>> df = spark.createDataFrame(l,['userId', 'eventType', 'source', 'errorCode', 'startDate','endDate']) 
>>> df.show(10,False) 
+------+---------+--------+---------+-----------------------+-----------------------+ 
|userId|eventType|source |errorCode|startDate    |endDate    | 
+------+---------+--------+---------+-----------------------+-----------------------+ 
|552113|ACK  |PROVIDER|0  |2017-09-01 12:01:45.432|2017-09-01 12:01:45.452| 
|284723|ACK  |PROVIDER|0  |2017-09-01 12:01:45.675|2017-09-01 12:01:45.775| 
|552113|TRADE |MERCH |0  |2017-09-01 12:01:47.221|2017-09-01 12:01:46.229| 
|552113|CHARGE |MERCH |0  |2017-09-01 12:01:48.123|2017-09-01 12:01:48.976| 
|284723|REFUND |MERCH |1  |2017-09-01 12:01:48.275|2017-09-01 12:01:48.947| 
|552113|CLOSE |PROVIDER|0  |2017-09-01 12:01:49.908|2017-09-01 12:01:50.623| 
|284723|CLOSE |PROVIDER|0  |2017-09-01 12:01:50.112|2017-09-01 12:01:50.777| 
+------+---------+--------+---------+-----------------------+-----------------------+ 

>>> myudf = F.udf(lambda *cols : cols,ArrayType(StringType())) #composition to create rowwise list 
>>> df1 = df.select('userId',myudf('eventType', 'source', 'errorCode','startDate', 'endDate').alias('val_list')) 

>>> df2 = df1.groupby('userId').agg(F.collect_list('val_list')) # grouped on userId 

>>> eventtypes = ['ACK','TRADE','CHARGE','CLOSE','REFUND'] # eventtypes and the order required in output 

>>> def f(Vals): 
     aggVals = [typ for x in eventtypes for typ in Vals if typ[0] == x] # to order the grouped data based on eventtypes above 
     if len(aggVals) == 5: 
      return aggVals 
     else: 
      missngval = [(idx,val) for idx,val in enumerate(eventtypes)if val not in zip(*aggVals)[0]] # get missing eventtypes with their index to create null 
      for idx,val in missngval: 
       aggVals.insert(idx,[None]*5) 
      return aggVals 

>>> myudf2 = F.udf(f,ArrayType(ArrayType(StringType()))) 
>>> df3 = df2.select('userId',myudf2('agg_list').alias('values')) 

>>> df4 = df3.select(['userId']+[df3['values'][i][x] for i in range(5) for x in range(5)]) # to select from Array[Array] 

>>> oldnames = df4.columns 
>>> destnames = ['userId', 'eventTypeAckProvider', 'sourceAckProvider', 'errorCodeAckProvider', 'startDateAckProvider', 'endDateAckProvider', 'eventTypeTradeMerch', 'sourceTradeMerch', 'errorCodeTradeMerch', 'startDateTradeMerch', 'endDateTradeMerch', 'eventTypeChargeMerch', 'sourceChargeMerch', 'errorCodeChargeMerch', 'startDateChargeMerch', 'endDateChargeMerch', 'eventTypeCloseProvider', 'sourceCloseProvider', 'errorCodeCloseProvider', 'startDateCloseProvider', 'endDateCloseProvider', 'eventTypeRefundMerch', 'sourceRefundMerch', 'errorCodeRefundMerch', 'startDateRefundMerch', 'endDateRefundMerch'] 

>>> finalDF = reduce(lambda d,idx : d.withColumnRenamed(oldnames[idx],destnames[idx]),range(len(oldnames)),df4) # Renaming the columns 
>>> finalDF.show()  
+------+--------------------+-----------------+--------------------+-----------------------+-----------------------+-------------------+----------------+-------------------+-----------------------+-----------------------+--------------------+-----------------+--------------------+-----------------------+-----------------------+----------------------+-------------------+----------------------+-----------------------+-----------------------+--------------------+-----------------+--------------------+-----------------------+-----------------------+ 
|userId|eventTypeAckProvider|sourceAckProvider|errorCodeAckProvider|startDateAckProvider |endDateAckProvider  |eventTypeTradeMerch|sourceTradeMerch|errorCodeTradeMerch|startDateTradeMerch |endDateTradeMerch  |eventTypeChargeMerch|sourceChargeMerch|errorCodeChargeMerch|startDateChargeMerch |endDateChargeMerch  |eventTypeCloseProvider|sourceCloseProvider|errorCodeCloseProvider|startDateCloseProvider |endDateCloseProvider |eventTypeRefundMerch|sourceRefundMerch|errorCodeRefundMerch|startDateRefundMerch |endDateRefundMerch  | 

|284723|ACK     |PROVIDER   |0     |2017-09-01 12:01:45.675|2017-09-01 12:01:45.775|null    |null   |null    |null     |null     |null    |null    |null    |null     |null     |CLOSE     |PROVIDER   |0      |2017-09-01 12:01:50.112|2017-09-01 12:01:50.777|REFUND    |MERCH   |1     |2017-09-01 12:01:48.275|2017-09-01 12:01:48.947| 
|552113|ACK     |PROVIDER   |0     |2017-09-01 12:01:45.432|2017-09-01 12:01:45.452|TRADE    |MERCH   |0     |2017-09-01 12:01:47.221|2017-09-01 12:01:46.229|CHARGE    |MERCH   |0     |2017-09-01 12:01:48.123|2017-09-01 12:01:48.976|CLOSE     |PROVIDER   |0      |2017-09-01 12:01:49.908|2017-09-01 12:01:50.623|null    |null    |null    |null     |null     | 
+------+--------------------+-----------------+--------------------+-----------------------+-----------------------+-------------------+----------------+-------------------+-----------------------+-----------------------+--------------------+-----------------+--------------------+-----------------------+-----------------------+----------------------+-------------------+----------------------+-----------------------+-----------------------+--------------------+-----------------+--------------------+-----------------------+-----------------------+