2015-06-30 43 views
7

我有多個大文件(> 5M行數據),它們按唯一時間戳排序。除少數幾行隨機丟失的行外(< 1000),所有文件幾乎包含所有相同的時間戳。我想將所有文件中的數據高效地加入到每個時間戳一行的單個數據集中,最好使用一個生成器。Python 3加入排序的大文件中的數據

除缺少的行,我可以只使用ZIP:

def get_data(list_of_iterables): 
    for data in zip(*list_of_iterables): 
     yield data 

然而,由於有一些遺漏行,我需要加入時間戳的數據,而不是簡單地荏苒。我可以簡單地忽略每個文件中沒有匹配時間戳的行。

是否有一種pythonic的方式來實現這幾個功能?

我的方法是依次推進每個迭代,直到它的時間戳不再小於該組迭代的最大時間戳。每當所有時間戳匹配時,產生一排並推進所有迭代。但是,當我試圖實施這種方法時,邏輯似乎很混亂。

編輯:性能。

實現需要開始返回行,而不是先讀取所有數據到內存中。讀取所有數據需要一段時間,並且很多時候只需要檢查第一行數據。

+1

請添加一些示例數據 – synner

+0

我建議你看看熊貓(http://pandas.pydata.org/)。它有工具可以完成您提到的確切類型的合併。 – Ivan

+0

@Ivan當我搜索這個問題時,我注意到了熊貓圖書館。該庫對於我正在執行的許多操作看起來非常有用。我目前嚴重依賴於Numpy,但當我花一些時間時,我會去調查熊貓。 – RandomBits

回答

0

我的第一個猜測是使用帶時間戳作爲鍵的字典和行中其餘數據作爲值,然後對於每個文件中的每一行,只有在具有相同時間戳的項目時纔將其添加到字典(關鍵)尚不存在。但是,如果你確實在處理巨大的數據集(在這種情況下它看起來像你),那麼你在原始問題中提到的方法將是你的最佳選擇。

+0

我可以看到這將如何工作,但似乎我對性能的評論,因爲這種方法似乎需要首先將所有數據讀入內存。 – RandomBits

+0

@RandomBits好的,謝謝你的額外信息。在那種情況下,我想不出一個比你原來的問題更好的解決問題的方法。 –

0

好吧,我對這個問題感興趣(最近有一個類似的問題),並對它做了一些工作。你可以嘗試這樣的事情:

import io 
import datetime 
from csv import DictReader 

file0 = io.StringIO('''timestamp,data 
2015-06-01 10:00, data00 
2015-06-01 11:00, data01 
2015-06-01 12:00, data02 
2015-06-01 12:30, data03 
2015-06-01 13:00, data04 
''') 

file1 = io.StringIO('''timestamp,data 
2015-06-01 09:00, data10 
2015-06-01 10:30, data11 
2015-06-01 11:00, data12 
2015-06-01 12:30, data13 
''') 

class Data(object): 

    def __init__(self): 
     self.timestamp = None 
     self.data = None 

    @staticmethod 
    def new_from_dict(dct=None): 
     if dct is None: 
      return None 
     ret = Data() 
     ret.data = dct['data'].strip() 
     ret.timestamp = datetime.datetime.strptime(dct['timestamp'], 
                '%Y-%m-%d %H:%M') 
     return ret 

    def __lt__(self, other): 
     if other is None: 
      return False 
     return self.timestamp < other.timestamp 

    def __gt__(self, other): 
     if other is None: 
      return False 
     return self.timestamp > other.timestamp 

    def __str__(self): 
     ret = '{0.__class__.__name__}'.format(self) +\ 
       '(timestamp={0.timestamp}, data={0.data})'.format(self) 
     return ret 


def next_or_none(reader): 
    try: 
     return Data.new_from_dict(next(reader)) 
    except StopIteration: 
     return None 


def yield_in_order(reader0, reader1): 

    data0 = next_or_none(reader0) 
    data1 = next_or_none(reader1) 

    while not data0 == data1 == None: 

     if data0 is None: 
      yield None, data1 
      data1 = next_or_none(reader1) 
      continue 
     if data1 is None: 
      yield data0, None 
      data0 = next_or_none(reader0) 
      continue 

     while data0 < data1: 
      yield data0, None 
      data0 = next_or_none(reader0) 

     while data0 > data1: 
      yield None, data1 
      data1 = next_or_none(reader1) 

     if data0 is not None and data1 is not None: 
      if data0.timestamp == data1.timestamp: 
       yield data0, data1 
       data0 = next_or_none(reader0) 
       data1 = next_or_none(reader1) 

csv0 = DictReader(file0) 
csv1 = DictReader(file1) 

FMT = '{!s:50s} | {!s:50s}' 
print(FMT.format('file0', 'file1')) 
print(101*'-') 
for dta0, dta1 in yield_in_order(csv0, csv1): 
    print(FMT.format(dta0, dta1)) 

這僅用於2個文件。

1

我最後寫了下面的代碼,以解決我的問題,這原來是打火機比我預期:

def advance_values(iters): 
    for it in iters: 
     yield next(it) 

def align_values(iters, values, key): 
    for it, value in zip(iters, values): 
     while (value[0],value[1]) < key: 
      value = next(it) 
     yield value 

def merge_join(*iters): 
    values = list(advance_values(iters)) 
    while True: 
     if len(values) != len(iters): 
      return 
     tms = [(v[0],v[1]) for v in values] 
     max_tm = max(tms) 
     if all((v[0],v[1]) == max_tm for v in values): 
      yield values 
      values = list(advance_values(iters)) 
     else: 
      values = list(align_values(iters, values, max_tm)) 
+0

對不起。更新了我的答案,現在只看到你自己發佈了一個解決方案... –

1

如果每個迭代中list_of_iterablestimestamp進行排序,那麼你可以使用heapq.merge()合併它們考慮到數據可能存在的差距和itertools.groupby()以相同的時間戳組記錄:

from heapq import merge 
from itertools import groupby 
from operator import attrgetter 

for timestamp, group in groupby(merge(*list_of_iterables), 
           key=attrgetter('timestamp')): 
    print(timestamp, list(group)) # same timestamp 

的實施產生羣體沒有讀取所有的數據到我第一次。