2016-02-29 45 views
1

解析多行日誌文件條目的正確spark | scala技術是什麼? SQL跟蹤文本文件:Spark/Scala:解析多行記錄

# createStatement call (thread 132053, con-id 422996) at 2015-07-24 12:39:47.076339 
# con info [con-id 422996, tx-id 47, cl-pid 50593, cl-ip 10.32.50.24, user: SAPABA, schema: SAPABA] 
cursor_140481797152768_c22996 = con_c22996.cursor() 

# begin PreparedStatement_execute (thread 132053, con-id 422996) at 2015-07-24 12:39:47.076422 
# con info [con-id 422996, tx-id 47, cl-pid 50593, cl-ip 10.32.50.24, user: SAPABA, schema: SAPABA] 
cursor_140481797152768_c22996.execute("SELECT DISTINCT blah blah blah") 
# end PreparedStatement_execute (thread 132053, con-id 422996) at 2015-07-24 12:39:47.077706 

每個記錄由三行組成;每種記錄類型的屬性(例如createStatementPreparedStatement)都不相同。 我想逐行讀取文件中的行,決定記錄類型,然後創建一個數據幀行的每個記錄:

insert into prepared_statements values (132053,422996, '2015-07-24 12:39:47.076422','SELECT DISTINCT blah blah blah') 

要做到這一點,我需要檢查每行以確定它是哪種記錄類型,然後讀取下兩行以獲取該記錄類型的屬性。另外,根據記錄不同,行格式也不同,所以我需要有條件地檢查每行三行的開始,以確定記錄類型。是否有火花技術來解析多行記錄?

+0

你能否假定在兩個「邏輯記錄」之間總是會有這個空行,或者只是爲了清晰的例子而添加了這個? –

+0

嗨Tzach,是每個邏輯記錄之間有一條空行。 – MarkTeehan

+1

只需使用'\ n \ n'作爲記錄分隔符。 – zero323

回答

3

下面是一個工作解決方案,其基礎是將每行與下一個空行的索引進行匹配,然後按這些索引進行分組,以將每個「邏輯記錄」的行組合在一起。 假設輸入是在rdd

val indexedRows: RDD[(String, Long)] = rdd.zipWithIndex().cache() 
val emptyRowIndices = indexedRows.filter(_._1.isEmpty).values.collect().sorted 

val withIndexOfNextGap: RDD[(String, Long)] = indexedRows 
    .filter(!_._1.isEmpty) 
    .mapValues(i => emptyRowIndices.find(_ > i).getOrElse(0)) // finds lowest index of empty line greater than current line index 

val logicalRecords: RDD[Iterable[String]] = withIndexOfNextGap.map(_.swap).groupByKey().values 

logicalRecords.map(f) // f maps each Iterable[String] into whatever you need 

請注意,此解決方案具有幾個注意事項:

  • 它假定的「邏輯記錄」(多行的條目)的數量不太偉大的收集他們指數到駕駛員記憶
  • 這不是超級有效,因爲我們將掃描每行這些指標