2013-08-22 30 views
1

我的程序需要分析每天從每個應用服務器每小時生成一堆日誌文件。並行日誌聚合的最佳方法

因此,如果我有2個應用服務器,我將處理48個文件(24個文件* 2個應用服務器)。

文件大小範圍100-300 mb。在每個文件中的每一行是一個日誌條目,其是格式的

[標識符] - [片數] - [片] - [日誌的一部分]

例如

xxx-3-1-ABC 
xxx-3-2-ABC 
xxx-3-3-ABC 

這些可以在我提到的,我需要合併這些日誌像這樣

xxx-PAIR-ABCABCABC 

的48個文件分發我的實現使用threa d池通過在平行文件中讀取信息,然後使用的ConcurrentHashMap

我定義一個類它們聚集LogEvent.scala

class LogEvent (val id: String, val total: Int, var piece: Int, val json: String) { 

    var additions: Long = 0 
    val pieces = new Array[String](total) 
    addPiece(json) 


    private def addPiece (json: String): Unit = { 
    pieces(piece) = json 
    additions += 1 
    } 

    def isDone: Boolean = { 
    additions == total 
    } 


    def add (slot: Int, json: String): Unit = { 
    piece = slot 
    addPiece(json) 
    } 

主處理髮生在多個線程和代碼上的

東西線
//For each file 
val logEventMap = new ConcurrentHashMap[String, LogEvent]().asScala 
Future { 
      Source.fromInputStream(gis(file)).getLines().foreach { 
      line => 

        //Extract the id part of the line 
        val idPart: String = IDPartExtractor(line) 
        //Split line on '-' 
        val split: Array[String] = idPart.split("-") 



        val id: String = split(0) + "-" + split(1) 
        val logpart: String = JsonPartExtractor(line) 
        val total = split(2) toInt 
        val piece = split(3) toInt 

        def slot: Int = { 
         piece match { 
         case x if x - 1 < 0 => 0 
         case _ => piece - 1 
         } 
        } 

        def writeLogEvent (logEvent: LogEvent): Unit = { 
         if (logEvent.isDone) { 
         //write to buffer 
         val toWrite = id + "-PAIR-" + logEvent.pieces.mkString("") 
         logEventMap.remove(logEvent.id) 
         writer.writeLine(toWrite) 
         } 
        } 

        //The LOCK 
        appendLock { 
         if (!logEventMap.contains(id)) { 
         val logEvent = new LogEvent(id, total, slot, jsonPart) 
         logEventMap.put(id, logEvent) 
         //writeLogEventToFile() 
         } 
         else { 
         val logEvent = logEventMap.get(id).get 
         logEvent.add(slot, jsonPart) 
         writeLogEvent(logEvent) 

         } 
        } 
       } 
      } 

主線程塊,直到所有的期貨完整

使用這種方法我已經能夠削減處理恬從一小時+到大約7-8分鐘。

我的問題如下 -

  1. 可以這樣以更好的方式來完成,我使用不同的線程讀取多個文件,我需要在那裏聚集發生在塊鎖定,是否有更好的方法這樣做?
  2. 該地圖在內存中增長非常快,任何關於這種用例的堆外存儲的建議
  3. 任何其他反饋。

感謝

+1

也許一個好主意是使用像Cassandra,Riak,MongoDb等分佈式數據庫?然後在每個節點上創建一個預定的bean,它將負責獲取日誌數據,然後放入db中。從集羣外部,你可以使用分佈式數據庫的另一個客戶端,它將負責在你的機器上覆制這些日誌。這是分佈式數據庫的常見用例。 – wonsky

+0

我對使用分佈式DB /存儲器持懷疑態度,因爲不需要任何持久性,我只需要存儲數據的時候就是合併的時候。也許像Redis這樣的純內存商店會做?任何想法 – winash

回答

0

對於這樣的事情,如果可以的話,使用Splunk,如果沒有,複製它的作用是指數上的需求聚集的日誌文件在稍後的點。

對於堆外存儲,請查看分佈式緩存 - Hazelcast或Coherence。這兩個支持都提供了存儲在多個JVM上的java.util.Map實現。

+0

Splunk是一款付費工具,我認爲這對於這件事是一種矯枉過正。 – winash

+0

這正是Splunk的意義所在 - 它們的收費模式基於被索引的數據量,因此您將支付所使用的費用。 –

1

執行此操作的常見方法是對每個文件進行排序,然後合併排序後的文件。結果是一個單個文件按照您希望的順序包含各個項目。然後,您的程序只需要對文件進行一次傳遞,將相鄰的匹配項組合起來。

這有一些非常有吸引力的好處:

  1. 排序/合併由您不必寫
  2. 你的聚合程序是非常簡單的標準工具來完成。或者,甚至可能有一個標準的工具可以做到這一點。
  3. 內存需求減少。排序/合併程序知道如何管理內存,並且您的聚合程序的內存要求很小。

當然存在一些缺陷。您將使用更多的磁盤空間,並且由於I/O成本的原因,該過程會稍慢一些。

當我面對這樣的事情時,我幾乎總是使用標準工具和簡單的聚合器程序。我從一個自定義程序中獲得的性能提高並不足以說明開發這件事的時間。

+0

這種方法肯定會很慢,我在某些情況下生成高達14 GB的合併文件, 也可以指向我提及的某些工具 – winash

+0

@winash:標準工具,如GNU工具排序,將進行排序和合並。 Windows帶有一個排序程序,但我不會推薦它作爲玩具。無論如何它都不會合並。 –

相關問題