2016-06-28 183 views
1

我正在使用influxDB來存儲我的時間序列數據。如何寫入使用golang客戶端連續寫入influxdb

我寫了一個簡單的golang應用程序來讀取文件time.log中的行。

文檔在https://github.com/influxdata/influxdb/blob/master/client/README.md#inserting-data說:

插入數據

時間序列數據又名點被寫入使用批量插入數據庫。該機制是創建一個或多個點,然後創建一個批次aka批處理點並將其寫入給定的數據庫和系列。一個系列是測量(時間/值)和一組標籤的組合。

在本示例中,我們將創建一個1,000點的批次。每個點都有一個時間和一個值,以及2個表示形狀和顏色的標籤。我們使用名爲形狀的度量將這些點寫入名爲square_holes的數據庫。

注意:您可以指定一個RetentionPolicy作爲批點的一部分。如果未提供,InfluxDB將使用數據庫默認保留策略。

func writePoints(clnt client.Client) { 
    sampleSize := 1000 
    rand.Seed(42) 

    bp, _ := client.NewBatchPoints(client.BatchPointsConfig{ 
     Database: "systemstats", 
     Precision: "us", 
    }) 

    for i := 0; i < sampleSize; i++ { 
     regions := []string{"us-west1", "us-west2", "us-west3", "us-east1"} 
     tags := map[string]string{ 
      "cpu": "cpu-total", 
      "host": fmt.Sprintf("host%d", rand.Intn(1000)), 
      "region": regions[rand.Intn(len(regions))], 
     } 

     idle := rand.Float64() * 100.0 
     fields := map[string]interface{}{ 
      "idle": idle, 
      "busy": 100.0 - idle, 
     } 

     bp.AddPoint(client.NewPoint(
      "cpu_usage", 
      tags, 
      fields, 
      time.Now(), 
     )) 
    } 

    err := clnt.Write(bp) 
    if err != nil { 
     log.Fatal(err) 
    } 
} 

但因爲我不斷地從日誌中讀取數據。我從來沒有讀過日誌。那麼,我寫入入流服務器的最佳方式是什麼?

這是我的當前代碼:

cmdBP := client.NewBatchPoints(...) 
for line := range logFile.Lines { 
    pt := parseLine(line.Text) 
    cmdBP.AddPoint(pt) 
} 

influxClient.Write(cmdBP) 

基本上範圍logFile.Lines永遠不會終止,因爲它是基於一個頻道上。

+0

那麼每處理N行日誌時只調用'client.Write(cmdBP)'怎麼辦? – sberry

+0

所以我需要使用範圍循環內的計數器? –

回答

1

使用的批次分時間進行組合(運行作爲一個夠程):

func (h *InfluxDBHook) loop() { 
    var coll []*client.Point 
    tick := time.NewTicker(h._batchInterval) 

    for { 
     timeout := false 

     select { 
     case pt := <-h._points: 
      coll = append(coll, pt) 
     case <-tick.C: 
      timeout = true 
     } 

     if (timeout || len(coll) >= h._batchSize) && len(coll) > 0 { 
      bp, err := client.NewBatchPoints(h._batchPointsConfig) 
      if err != nil { 
       //TODO: 
      } 
      bp.AddPoints(coll) 
      err = h._client.Write(bp) 
      if err != nil { 
       //TODO: 
      } else { 
       coll = nil 
      } 
     } 
    } 
} 

順便說一句,你可以用一個鉤子與logrus日誌記錄包,以日誌發送到InfluxDB(示例代碼從logrusInfluxDB hook)。