1
我試圖分析數百萬條遭受不幸缺陷折磨的日誌文件。與單個事件相關的數據可以在日誌條目之間進行拆分,但不存在將數據跨多行重新排列爲單行的直接鏈接;相反,我必須推斷這種關係。根據上一次某些條件爲真,將數據列中的數據與另一行對齊
背景簡介:
- 有我在乎的是將多次修改
- 有8個線程,將收集的隨機的事情之一,並開始處理的線程池4個對象。此事件由
thing_n
,A
,B
和C
標識,全部具有非空值,我也可以從此記錄的事件中獲取線程號。 - 在日誌後面的某個地方,會有一個日誌條目,說明線程進行了多少次迭代。此事件將不包含任何其他信息(即它不會報告
thing_n
它動手術) thread_num
/thing_n
配對會改變不斷- 任意數量的線程可以登錄任何數量的點2和3之間的事件,所以你不能簡單地將
.shift()
的Iterations
列重新排列成單行。
不知怎的,我需要重新調整與以前的(只有前一個)排在thing_I_care_about
,A
,B
,C
不爲空,且thread_num
匹配的迭代列。有時間戳(不在我的MCVE中),如果有幫助,所有事件按升序排序。
例輸入:
thing_I_care_about thread_num A B C Iterations
0 thing_1 2 X X X NaN
1 NaN 2 X X NaN NaN
2 thing_2 3 NaN X X NaN
3 NaN 2 NaN NaN NaN 110.0
4 thing_3 7 X X X NaN
5 thing_4 5 X X NaN NaN
6 NaN 7 NaN NaN NaN 150.0
輸出示例:
thing_I_care_about thread_num A B C Realigned Iterations
0 thing_1 2 X X X 110.0
1 NaN 2 X X NaN NaN
2 thing_2 3 NaN X X NaN
3 NaN 2 NaN NaN NaN NaN
4 thing_3 7 X X X 150.0
5 thing_4 5 X X NaN NaN
6 NaN 7 NaN NaN NaN NaN
我可以管理一個純Python的方法(下圖),但是這種分析將反覆按需進行,必須處理數百個數百萬次這樣的事件。從概念上講,我能想到的大熊貓這樣做的唯一方法是:
groupby()
thread_num
,並通過他們的時間戳各組排序- 嘗試以某種方式得到了DF爲每個線程交替
notnull([thing_n, A, B, C, thread_num])
和notnull([thread_num, Iterations])
行,以便我可以shift(-1)
他們重新調整數據 - 不知怎的,扎此回原來的數據幀
不過,我似乎無法使這種方法的進步。有沒有這樣做的聰明方式,還是我堅持在Python中處理這部分?
純Python的方法:
import numpy as np
import pandas as pd
raw_data = [['thing_I_care_about', 'thread_num', 'A', 'B', 'C', 'Iterations'], ['thing_1', 2, 'X', 'X', 'X', np.nan], [np.nan, 2, 'X', 'X', np.nan, np.nan], ['thing_2', 3, np.nan, 'X', 'X', np.nan], [np.nan, 2, np.nan, np.nan, np.nan, 110], ['thing_3', 7, 'X', 'X', 'X', np.nan], ['thing_4', 5, 'X', 'X', np.nan, np.nan], [np.nan, 7, np.nan, np.nan, np.nan, 150]]
data = pd.DataFrame(raw_data[1:], columns=raw_data[0])
print "Input format"
print data
header_dict = {item: x for x, item in enumerate(data.columns)}
# Take data out of DF to become nested list
data_list = data.as_matrix()
# Track the row in which a thread starts its process
active_threads = {}
# Create a list to become to re-aligned column in the DF at the end for num iterations
realigned_data = [np.nan for x in xrange(len(data_list))]
for x, entry in enumerate(data_list):
thread_num = int(entry[header_dict['thread_num']])
if all([pd.notnull(entry[header_dict['thing_I_care_about']]),
pd.notnull(entry[header_dict['A']]),
pd.notnull(entry[header_dict['B']]),
pd.notnull(entry[header_dict['C']])]):
active_threads[thread_num] = x
elif pd.notnull(entry[header_dict['Iterations']]) and entry[header_dict['thread_num']] in active_threads:
realigned_data[active_threads[thread_num]] = entry[header_dict['Iterations']]
data['realigned_iterations'] = realigned_data
print "Output format"
print data
這看起來很酷嚴重。我需要一段時間才能在我的完整數據集中驗證這一點,並讓我的頭腦在最後一行。但當然,它看起來像我所需要的;我希望我的示例數據足以對此進行測試 - 它是否會第二次出現「thread_num == 2」?編輯:線程2(例如)必須同時報告'thing_I_care_about'的「拾取」和它在它上面執行的迭代次數,然後纔可以再次進行。這是唯一的模式。 – roganjosh
@roganjosh所有這一切真正關心的是四列全部爲空,並且所有四列都不爲空。這些行之間的任何內容都無關緊要。讓我知道並解釋最後的陳述。 –
我編輯了我的最後評論,使其更清晰順便。這可能是明天之前,我可以測試這個,因爲我努力運行我的查詢從家裏獲取數據。我會盡快報告。 – roganjosh