2016-09-12 96 views
1

我想將數據存儲在火花中,以便5秒或更短時間差的時間戳與相應的數據一起落入一個5秒存儲桶中。同樣,下一組5秒鐘與其餘日誌一起。 (這樣我就可以在桶中彙總數據)。我的日誌:在apache火花中創建存儲桶

1472120400.107 HTTP GEO er.aujf.csdh.jkhydf.eyrgt 
1472120399.999 HTTP GEO er.asdhff.cdn.qyirg.sdgsg 
1472120397.633 HTTP GEO er.abff.kagsf.weyfh.ajfg 
1472120397.261 HTTP GEO er.laffg.ayhrff.agyfr.yawr 
1472120394.328 HTTP GEO er.qfryf.aqwruf.oiuqwr.agsf 
1472120393.737 HTTP GEO er.aysf.aouf.ujaf.casf 
. 
. 
. 

我仍然無法弄清楚如何在火花中做到這一點。

帶時間戳記的日誌1472120400.107,1472120399.999,1472120397.633,1472120397.261等分爲一個分區,下一個分區設置爲下一個分區等等。

輸出:

所有時間戳1472120400.107,1472120399.999,1472120397.633,1472120397.261的loglines將被保存在存儲器(一個桶),以便進一步的處理將被完成的那些比如尋找計爲整個桶。同樣,下一個桶。

+0

您的預期輸出是什麼? – mtoto

+0

你是什麼意思的「桶」? – mtoto

+0

這只不過是分區日誌。分區的日誌組形成了存儲桶。 – kaks

回答

0

只需按照您要創建的粒度劃分時間戳即可。在BINDDD中輸入bin號作爲鍵,其中數據是輸入,然後是reduceByKey。

我將在Scala中編寫代碼示例,基本上將它轉換爲python是微不足道的,我想說明一下。

val l5 = List("1472120400.107 HTTP GEO er.aujf.csdh.jkhydf.eyrgt", "1472120399.999 HTTP GEO er.asdhff.cdn.qyirg.sdgsg") 
val l5RDD = sc.parallelize(l5) //input as RDD 
val l5tmp = l5RDD.map(item => item.split(" ")) //Split the sentence 
val l5tmp2 = l5tmp.map(item => ((item(0).toDouble/3600000).toInt, List(item))) //Map the data to a bin (in the key) according to the wanted granularity 
val collected = l5tmp2.reduceByKey(_ ++ _) //Collect the lists to create the bins of data 
collected.collect().foreach(println) //Prints (408,List([Ljava.lang.String;@2c6aed22, [Ljava.lang.String;@e322ec9)) - means that both entries collected to a bin named 408 
+0

我不明白這部分代碼:'val collected = l5tmp2.reduceByKey(_ ++ _)'。什麼是++? – kaks

+0

@kaks您需要定義如何將項目收集在一起,在這種情況下,我將項目放在列表中,因此爲了將列表添加到一起,我使用++。每個列表將包含所有應該落入分檔的項目 –

+0

您能告訴我們如何在python中編寫該項目嗎?我對此感到困難。 – kaks