2017-07-28 73 views
1

我試圖分析數百萬條遭受不幸缺陷折磨的日誌文件。與單個事件相關的數據可以在日誌條目之間進行拆分,但不存在將數據跨多行重新排列爲單行的直接鏈接;相反,我必須推斷這種關係。根據上一次某些條件爲真,將數據列中的數據與另一行對齊

背景簡介:

  1. 有我在乎的是將多次修改
  2. 有8個線程,將收集的隨機的事情之一,並開始處理的線程池4個對象。此事件由thing_n,A,BC標識,全部具有非空值,我也可以從此記錄的事件中獲取線程號。
  3. 在日誌後面的某個地方,會有一個日誌條目,說明線程進行了多少次迭代。此事件將不包含任何其他信息(即它不會報告thing_n它動手術)
  4. thread_num/thing_n配對會改變不斷
  5. 任意數量的線程可以登錄任何數量的點2和3之間的事件,所以你不能簡單地將.shift()Iterations列重新排列成單行。

不知怎的,我需要重新調整與以前的(只有前一個)排在thing_I_care_aboutABC不爲空,且thread_num匹配的迭代列。有時間戳(不在我的MCVE中),如果有幫助,所有事件按升序排序。

例輸入:

thing_I_care_about thread_num A B C  Iterations 
0 thing_1    2    X X X  NaN 
1 NaN     2    X X NaN NaN 
2 thing_2    3    NaN X X  NaN 
3 NaN     2    NaN NaN NaN 110.0 
4 thing_3    7    X X X  NaN 
5 thing_4    5    X X NaN NaN 
6 NaN     7    NaN NaN NaN 150.0 

輸出示例:

thing_I_care_about thread_num A B C  Realigned Iterations 
0 thing_1    2    X X X  110.0 
1 NaN     2    X X NaN NaN 
2 thing_2    3    NaN X X  NaN 
3 NaN     2    NaN NaN NaN NaN 
4 thing_3    7    X X X  150.0 
5 thing_4    5    X X NaN NaN 
6 NaN     7    NaN NaN NaN NaN 

我可以管理一個純Python的方法(下圖),但是這種分析將反覆按需進行,必須處理數百個數百萬次這樣的事件。從概念上講,我能想到的大熊貓這樣做的唯一方法是:

  1. groupby()thread_num,並通過他們的時間戳各組排序
  2. 嘗試以某種方式得到了DF爲每個線程交替notnull([thing_n, A, B, C, thread_num])notnull([thread_num, Iterations])行,以便我可以shift(-1)他們重新調整數據
  3. 不知怎的,扎此回原來的數據幀

不過,我似乎無法使這種方法的進步。有沒有這樣做的聰明方式,還是我堅持在Python中處理這部分?

純Python的方法:

import numpy as np 
import pandas as pd 

raw_data = [['thing_I_care_about', 'thread_num', 'A', 'B', 'C', 'Iterations'], ['thing_1', 2, 'X', 'X', 'X', np.nan], [np.nan, 2, 'X', 'X', np.nan, np.nan], ['thing_2', 3, np.nan, 'X', 'X', np.nan], [np.nan, 2, np.nan, np.nan, np.nan, 110], ['thing_3', 7, 'X', 'X', 'X', np.nan], ['thing_4', 5, 'X', 'X', np.nan, np.nan], [np.nan, 7, np.nan, np.nan, np.nan, 150]] 

data = pd.DataFrame(raw_data[1:], columns=raw_data[0]) 
print "Input format" 
print data 

header_dict = {item: x for x, item in enumerate(data.columns)} 

# Take data out of DF to become nested list 
data_list = data.as_matrix() 

# Track the row in which a thread starts its process 
active_threads = {} 

# Create a list to become to re-aligned column in the DF at the end for num iterations 
realigned_data = [np.nan for x in xrange(len(data_list))] 

for x, entry in enumerate(data_list): 
    thread_num = int(entry[header_dict['thread_num']]) 

    if all([pd.notnull(entry[header_dict['thing_I_care_about']]), 
      pd.notnull(entry[header_dict['A']]), 
      pd.notnull(entry[header_dict['B']]), 
      pd.notnull(entry[header_dict['C']])]): 
     active_threads[thread_num] = x 

    elif pd.notnull(entry[header_dict['Iterations']]) and entry[header_dict['thread_num']] in active_threads: 
     realigned_data[active_threads[thread_num]] = entry[header_dict['Iterations']] 

data['realigned_iterations'] = realigned_data 
print "Output format" 
print data 

回答

1

IIUC,我認爲你可以這樣來做。創建兩個掩碼,代表當前迭代值所在的行。並且,第二個掩碼在您希望迭代值移動的第一個記錄上放置True。然後將第一個面具與cumsum分組,然後將所有記錄的當前值放在一起,然後在第二個面具中使用第二個面具。

mask=(df['thing_I_care_about'].isnull() & 
     df['A'].isnull() & 
     df['B'].isnull() & 
     df['C'].isnull()) 

fmask = (df['thing_I_care_about'].notnull() & 
     df['A'].notnull() & 
     df['B'].notnull() & 
     df['C'].notnull()) 

df.assign(Iterations=df.groupby(mask[::-1].cumsum())['Iterations'].transform(lambda x: x.iloc[-1]).where(fmask)) 

輸出:

thing_I_care_about thread_num A B C Iterations 
0   thing_1   2 X X X  110.0 
1    NaN   2 X X NaN   NaN 
2   thing_2   3 NaN X X   NaN 
3    NaN   2 NaN NaN NaN   NaN 
4   thing_3   7 X X X  150.0 
5   thing_4   5 X X NaN   NaN 
6    NaN   7 NaN NaN NaN   NaN 
+0

這看起來很酷嚴重。我需要一段時間才能在我的完整數據集中驗證這一點,並讓我的頭腦在最後一行。但當然,它看起來像我所需要的;我希望我的示例數據足以對此進行測試 - 它是否會第二次出現「thread_num == 2」?編輯:線程2(例如)必須同時報告'thing_I_care_about'的「拾取」和它在它上面執行的迭代次數,然後纔可以再次進行。這是唯一的模式。 – roganjosh

+0

@roganjosh所有這一切真正關心的是四列全部爲空,並且所有四列都不爲空。這些行之間的任何內容都無關緊要。讓我知道並解釋最後的陳述。 –

+0

我編輯了我的最後評論,使其更清晰順便。這可能是明天之前,我可以測試這個,因爲我努力運行我的查詢從家裏獲取數據。我會盡快報告。 – roganjosh

相關問題