GET test_data/_search
{
  "query": {
    "bool": {
      "must": [
        { "match": { "company": "foo" } }
      ]
    }
  },
  "size": 0,
  "aggs": {
    "filenames": {
      "terms": {
        "field": "filename.keyword"
      },
      "aggs": {
        "maxDate": { "max": { "field": "timestamp" } },
        "minDate": { "min": { "field": "timestamp" } }
      }
    }
  }
}

Sample output (retrieving aggregations/buckets with Spark):

{
  "took": 1052,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "failed": 0
  },
  "hits": {
    "total": 52120825,
    "max_score": 0,
    "hits": []
  },
  "aggregations": {
    "filenames": {
      "doc_count_error_upper_bound": 97326,
      "sum_other_doc_count": 51389890,
      "buckets": [
        {
          "key": "Messages_20170711_080003.mes",
          "doc_count": 187131,
          "minDate": {
            "value": 1499724098000,
            "value_as_string": "2017-07-10T22:01:38.000Z"
          },
          "maxDate": {
            "value": 1499760002000,
            "value_as_string": "2017-07-11T08:00:02.000Z"
          }
        },
        {
          "key": "Messages_20170213_043108.mes",
          "doc_count": 115243,
          "minDate": {
            "value": 1486735453000,
            "value_as_string": "2017-02-10T14:04:13.000Z"
          },
          "maxDate": {
            "value": 1486960265000,
            "value_as_string": "2017-02-13T04:31:05.000Z"
          }
        },

This query returns the desired results when I run it in the Kibana Dev Tools console.

When I try the same thing with elasticsearch-spark:

val df = spark.sqlContext.esDF(esInputIndexName, query = queryString) 
df.show(10, false) 

The DataFrame that comes back contains all of the hits rather than the aggregation buckets. How can I get the results produced by the aggregations/buckets into a DataFrame?

Answer


One workaround is to perform the aggregation through Spark itself.

import org.apache.spark.sql.functions.{min, max}

val df = spark.sqlContext.esDF(esInputIndexName) 

val aggDF = df.groupBy("filenames").agg(min("timestamp").as("minDate"), max("timestamp").as("maxDate")) 

aggDF.show(10) 
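
Note that this aggregates over the whole index, while the original query also restricted the hits to company:foo. Assuming the documents expose a company column in the DataFrame (the column name is taken from the match clause above, not verified against the mapping), that part of the query can be reproduced on the Spark side before grouping; a sketch only:

import org.apache.spark.sql.functions.{col, min, max}

// Reproduce the "must match company:foo" part of the original query in Spark,
// then compute the same min/max timestamps per filename.
val filteredAggDF = df
  .filter(col("company") === "foo")
  .groupBy("filename")
  .agg(min("timestamp").as("minDate"), max("timestamp").as("maxDate"))

filteredAggDF.show(10)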
Thanks for your answer, this is pretty much what I have done so far. The runtime, though (20+ minutes), is a pain in the ass. It would run much faster if the data filtering happened in the query. – user2811630
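
As the comment suggests, the filtering part can be pushed to Elasticsearch by handing esDF a query, the same way the question's own snippet does, so that only the company:foo documents are transferred to Spark while the min/max aggregation still runs there. A hedged sketch, assuming the connector accepts a Query DSL string for the query argument (the companyQuery value below is illustrative):

import org.apache.spark.sql.functions.{min, max}

// Filter on the Elasticsearch side: only documents matching company:foo are
// returned; the per-filename min/max aggregation is then done in Spark.
val companyQuery = """{"query": {"bool": {"must": [{"match": {"company": "foo"}}]}}}"""

val filteredDf = spark.sqlContext.esDF(esInputIndexName, query = companyQuery)

val aggregated = filteredDf
  .groupBy("filename")
  .agg(min("timestamp").as("minDate"), max("timestamp").as("maxDate"))

aggregated.show(10)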