2016-06-30 59 views
0

我試圖如此解決類似於this post的問題。我的原始數據是一個包含多個傳感器值(觀測值)的文本文件。每個觀察結果都有一個時間戳,但傳感器名稱只給出一次,而不是每行。但是在一個文件中有幾個傳感器。PySpark:使用newAPIHadoopFile從多行記錄文本文件讀取,映射和縮小

Time MHist::852-YF-007 
2016-05-10 00:00:00 0 
2016-05-09 23:59:00 0 
2016-05-09 23:58:00 0 
2016-05-09 23:57:00 0 
2016-05-09 23:56:00 0 
2016-05-09 23:55:00 0 
2016-05-09 23:54:00 0 
2016-05-09 23:53:00 0 
2016-05-09 23:52:00 0 
2016-05-09 23:51:00 0 
2016-05-09 23:50:00 0 
2016-05-09 23:49:00 0 
2016-05-09 23:48:00 0 
2016-05-09 23:47:00 0 
2016-05-09 23:46:00 0 
2016-05-09 23:45:00 0 
2016-05-09 23:44:00 0 
2016-05-09 23:43:00 0 
2016-05-09 23:42:00 0 
Time MHist::852-YF-008 
2016-05-10 00:00:00 0 
2016-05-09 23:59:00 0 
2016-05-09 23:58:00 0 
2016-05-09 23:57:00 0 
2016-05-09 23:56:00 0 
2016-05-09 23:55:00 0 
2016-05-09 23:54:00 0 
2016-05-09 23:53:00 0 
2016-05-09 23:52:00 0 
2016-05-09 23:51:00 0 
2016-05-09 23:50:00 0 
2016-05-09 23:49:00 0 
2016-05-09 23:48:00 0 
2016-05-09 23:47:00 0 
2016-05-09 23:46:00 0 
2016-05-09 23:45:00 0 
2016-05-09 23:44:00 0 
2016-05-09 23:43:00 0 
2016-05-09 23:42:00 0 

因此,我想配置Hadoop在給出傳感器信息的那些行分割文件。然後從這些行讀取傳感器名稱(例如852-YF-007和852-YF-008),並使用MapReduce相應地讀取每個傳感器的值。

我在Python這樣做(Jupyter筆記本):

sheet = sc.newAPIHadoopFile(
    '/user/me/sample.txt', 
    'org.apache.hadoop.mapreduce.lib.input.TextInputFormat', 
    'org.apache.hadoop.io.LongWritable', 
    'org.apache.hadoop.io.Text', 
    conf={'textinputformat.record.delimiter': 'Time\tMHist'} 
) 

sf = sheet.filter(lambda (k, v): v) 
sf.map(lambda (k, v): v).splitlines()) 

sf.take(50) 

的輸出是這樣的:

[[u'::852-YF-007\t', 
    u'2016-05-10 00:00:00\t0', 
    u'2016-05-09 23:59:00\t0', 
    u'2016-05-09 23:58:00\t0', 
    u'2016-05-09 23:57:00\t0', 
    u'2016-05-09 23:56:00\t0', 
    u'2016-05-09 23:55:00\t0', 
    u'2016-05-09 23:54:00\t0', 
    u'2016-05-09 23:53:00\t0', 
    u'2016-05-09 23:52:00\t0', 
    u'2016-05-09 23:51:00\t0', 
    u'2016-05-09 23:50:00\t0', 
    u'2016-05-09 23:49:00\t0', 
    u'2016-05-09 23:48:00\t0', 
    u'2016-05-09 23:47:00\t0', 
    u'2016-05-09 23:46:00\t0', 
    u'2016-05-09 23:45:00\t0', 
    u'2016-05-09 23:44:00\t0', 
    u'2016-05-09 23:43:00\t0', 
    u'2016-05-09 23:42:00\t0'], 
[u'::852-YF-008\t', 
    u'2016-05-10 00:00:00\t0', 
    u'2016-05-09 23:59:00\t0', 
    u'2016-05-09 23:58:00\t0', 
    u'2016-05-09 23:57:00\t0', 
    u'2016-05-09 23:56:00\t0', 
    u'2016-05-09 23:55:00\t0', 
    u'2016-05-09 23:54:00\t0', 
    u'2016-05-09 23:53:00\t0', 
    u'2016-05-09 23:52:00\t0', 
    u'2016-05-09 23:51:00\t0', 
    u'2016-05-09 23:50:00\t0', 
    u'2016-05-09 23:49:00\t0', 
    u'2016-05-09 23:48:00\t0', 
    u'2016-05-09 23:47:00\t0', 
    u'2016-05-09 23:46:00\t0', 
    u'2016-05-09 23:45:00\t0', 
    u'2016-05-09 23:44:00\t0', 
    u'2016-05-09 23:43:00\t0', 
    u'2016-05-09 23:42:00\t0']] 

我的問題是,如何進一步處理此來提取傳感器名稱,並且具有該傳感器的價值線。有點喜歡

852-YF-007 --> array of sensor_lines 
852-YF-008 --> array of sensor_lines 

線條本身將被分割爲時間戳和值。但我更感興趣的是從行中分割傳感器名稱。

回答

1

我個人:

  • ::

    sheet = sc.newAPIHadoopFile(
        path, 
        'org.apache.hadoop.mapreduce.lib.input.TextInputFormat', 
        'org.apache.hadoop.io.LongWritable', 
        'org.apache.hadoop.io.Text', 
        conf={'textinputformat.record.delimiter': 'Time\tMHist::'} 
    ) 
    
  • 下降鍵延長分隔符:

    values = sheet.values() 
    
  • 過濾掉空項

    non_empty = values.filter(lambda x: x) 
    
  • 分裂:

    grouped_lines = non_empty.map(str.splitlines) 
    
  • 獨立的鍵和值:

    from operator import itemgetter 
    
    pairs = grouped_lines.map(itemgetter(0, slice(1, None))) 
    
  • 終於拆分值:

    pairs.flatMapValues(lambda xs: [x.split("\t") for x in xs]) 
    

所有這一切都可以做一個單一的功能當然是:

import dateutil.parser 

def process(pair): 
    _, content = pair 
    clean = [x.strip() for x in content.strip().splitlines()] 
    if not clean: 
     return [] 
    k, vs = clean[0], clean[1:] 
    for v in vs: 
     try: 
      ds, x = v.split("\t") 
      yield k, (dateutil.parser.parse(ds), float(x)) # or int(x) 
     except ValueError: 
      pass 

sheet.flatMap(process) 
+0

有趣的是,你可以嘗試使它更加明確。 – zero323