2017-09-18

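Azure Stream Analytics equivalent of array_agg?

Is there a way to do the equivalent of Postgres array_agg or string_agg in Stream Analytics? I receive data every few seconds and want to aggregate the values over a period of time.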

Data:

{time:12:01:01,name:A,location:X,value:10} 
{time:12:01:01,name:B,location:X,value:9} 
{time:12:01:02,name:C,location:Y,value:5} 
{time:12:01:02,name:B,location:Y,value:4} 
{time:12:01:03,name:B,location:Z,value:2} 
{time:12:01:03,name:A,location:Z,value:3} 
{time:12:01:06,name:B,location:Z,value:4} 
{time:12:01:06,name:C,location:Z,value:7} 
{time:12:01:08,name:B,location:Y,value:1} 
{time:12:01:13,name:B,location:X,value:8} 

With a 2-second sliding window, I want to group the data and see the following:

12:01:01, 2 events, 9.5 avg, 2 distinct names, 1 distinct location, nameA:1, nameB:1, locationX:1 
12:01:02, 4 events, 7 avg, 3 distinct names, 2 distinct location, nameA:1, nameB:2,nameC:1,locationX:1,locationY:1 
12:01:03... 
12:01:06... 
... 
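To pin down what these rows mean, here is a plain JavaScript sketch (not Stream Analytics code) that computes them locally from the sample data. It assumes the window ending at time t covers (t - 2s, t] and is evaluated only at each distinct event time, which reproduces the 12:01:02 row (4 events, avg 7). Stream Analytics also emits sliding-window results when events leave the window, so its real output can contain extra rows. The helper names toSeconds, countBy and summarize are hypothetical, and the per-location values are event counts.

```javascript
// Hypothetical local reference for the desired output (not Stream Analytics code).
// Assumption: the window ending at time t covers (t - windowSeconds, t].
function toSeconds(hms) {
  const [h, m, s] = hms.split(":").map(Number);
  return h * 3600 + m * 60 + s;
}

// Count how many events carry each distinct value of `key`.
function countBy(events, key) {
  const counts = {};
  for (const e of events) counts[e[key]] = (counts[e[key]] || 0) + 1;
  return counts;
}

function summarize(events, windowSeconds) {
  // Evaluate the window once per distinct event time (a simplification).
  const times = [...new Set(events.map((e) => e.time))];
  return times.map((t) => {
    const end = toSeconds(t);
    const inWindow = events.filter((e) => {
      const ts = toSeconds(e.time);
      return ts > end - windowSeconds && ts <= end;
    });
    const names = countBy(inWindow, "name");
    const locations = countBy(inWindow, "location");
    const sum = inWindow.reduce((acc, e) => acc + e.value, 0);
    return {
      time: t,
      events: inWindow.length,
      avg: sum / inWindow.length,
      distinctNames: Object.keys(names).length,
      distinctLocations: Object.keys(locations).length,
      names,
      locations,
    };
  });
}

const events = [
  { time: "12:01:01", name: "A", location: "X", value: 10 },
  { time: "12:01:01", name: "B", location: "X", value: 9 },
  { time: "12:01:02", name: "C", location: "Y", value: 5 },
  { time: "12:01:02", name: "B", location: "Y", value: 4 },
  { time: "12:01:03", name: "B", location: "Z", value: 2 },
  { time: "12:01:03", name: "A", location: "Z", value: 3 },
  { time: "12:01:06", name: "B", location: "Z", value: 4 },
  { time: "12:01:06", name: "C", location: "Z", value: 7 },
  { time: "12:01:08", name: "B", location: "Y", value: 1 },
  { time: "12:01:13", name: "B", location: "X", value: 8 },
];

for (const row of summarize(events, 2)) {
  console.log(JSON.stringify(row));
}
```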

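Getting the number of events, the average, and the distinct counts is no problem. I use a window plus a WITH statement that joins on the timestamp to get the aggregate counts for that timestamp. What I can't figure out is how to get the counts by name and by location, mainly because I don't know how to aggregate strings in Azure.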

with agg1 as (
select system.timestamp as start, 
avg(value) as avg, 
count(1) as events, 
count(distinct name) as distinctNames, 
count(distinct location) as distinctLocations 
from input timestamp by created 
group by slidingwindow(second,2) 
), 
agg2 as (
select agg2_inner.start, 
array_agg(name,'|',ct_name) as countbyname -- (????)
from (
    select system.timestamp as start, 
    name, count(1) as ct_name 
    from input timestamp by created 
    group by slidingwindow(second,2), name 
) as agg2_inner 
group by agg2_inner.start, slidingwindow(second,2) 
) 

select * from agg1 join agg2 on (datediff(second,agg1,agg2) between 0 and 2 
and agg1.start = agg2.start) 

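There is no fixed list of names or locations, so the query needs to be somewhat dynamic. If the counts end up in an object within a single query, I can parse it later to get the individual counts.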

Answer

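As far as I know, Azure Stream Analytics does not provide an array_agg function. It does, however, provide the COLLECT function, which returns all record values from the window. I suggest first using COLLECT to return an array grouped by time and window.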
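Since the question asks specifically about string_agg, a minimal sketch of that emulation is a JavaScript UDF that joins one field of the COLLECT() array into a delimited string. It assumes each collected record has a name field; registered under a hypothetical alias such as stringAgg, it would be called as udf.stringAgg(COLLECT()) in a query grouped by the window.

```javascript
// Hypothetical UDF emulating Postgres string_agg(name, '|').
// Stream Analytics passes the array returned by COLLECT() as `records`.
function main(records) {
  return records
    .map(function (r) {
      return r.name; // assumes each record has a "name" field
    })
    .join("|");
}

// Local usage with records shaped like the question's events:
console.log(main([{ name: "A" }, { name: "B" }, { name: "B" }])); // A|B|B
```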

Then you can use Azure Stream Analytics JavaScript user-defined functions (UDFs) to write your own logic that converts the array into the result you want.
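For the per-name and per-location counts the question asks for, a sketch of such a UDF can build the count objects dynamically, so no fixed list of names or locations is needed. It assumes the collected records carry name and location fields; the output property names (events, distinctNames, byName, byLocation and so on) are hypothetical. The returned JavaScript object should surface as a record in the query output, which can then be parsed downstream.

```javascript
// Hypothetical UDF: per-window counts keyed by whatever names/locations occur.
// Stream Analytics passes the array returned by COLLECT() as `records`.
function main(records) {
  var byName = {};
  var byLocation = {};
  for (var i = 0; i < records.length; i++) {
    var r = records[i];
    byName[r.name] = (byName[r.name] || 0) + 1;
    byLocation[r.location] = (byLocation[r.location] || 0) + 1;
  }
  return {
    events: records.length,
    distinctNames: Object.keys(byName).length,
    distinctLocations: Object.keys(byLocation).length,
    byName: byName,
    byLocation: byLocation
  };
}

// Local usage with the four events a 2-second window ending at 12:01:02 contains:
console.log(JSON.stringify(main([
  { name: "A", location: "X", value: 10 },
  { name: "B", location: "X", value: 9 },
  { name: "C", location: "Y", value: 5 },
  { name: "B", location: "Y", value: 4 }
])));
// {"events":4,"distinctNames":3,"distinctLocations":2,"byName":{"A":1,"B":2,"C":1},"byLocation":{"X":2,"Y":2}}
```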

For more details, see the example below:

The query looks like this:

SELECT 
    time, udf.yourFunctionName(COLLECT()) AS Result 
INTO 
    [YourOutputAlias] 
FROM 
    [YourInputAlias] 
GROUP BY time, TumblingWindow(minute, 10) 

The UDF looks like this; it just returns the number of events and the average:

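function main(InputJSON) { 
    // InputJSON is the array of records produced by COLLECT() for one group
    var sum = 0; 
    for (var i = 0; i < InputJSON.length; i++) { 
        sum += InputJSON[i].value; 
    } 
    // Return the event count and the average value as a record
    var result = { events: InputJSON.length, avg: sum / InputJSON.length }; 
    return result; 
} 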

Data:

{"name": "A", "time":"12:01:01","value":10} 
{"name": "B", "time":"12:01:01","value":9} 
{"name": "C", "time":"12:01:02","value":10} 

Result:

[image: query result]
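To check the UDF above outside Azure, a small local harness can group the sample records by time, roughly what GROUP BY time does inside a single tumbling window, and pass each group to main. The helper groupByTime is hypothetical, and the harness ignores window boundaries, assuming all three records fall in the same 10-minute window.

```javascript
// Copy of the answer's UDF (loop variable declared) so it runs locally.
function main(InputJSON) {
  var sum = 0;
  for (var i = 0; i < InputJSON.length; i++) {
    sum += InputJSON[i].value;
  }
  return { events: InputJSON.length, avg: sum / InputJSON.length };
}

// Hypothetical stand-in for "GROUP BY time": bucket events by their time field,
// the way COLLECT() would hand each group's records to the UDF.
function groupByTime(events) {
  var groups = {};
  events.forEach(function (e) {
    (groups[e.time] = groups[e.time] || []).push(e);
  });
  return groups;
}

var sample = [
  { name: "A", time: "12:01:01", value: 10 },
  { name: "B", time: "12:01:01", value: 9 },
  { name: "C", time: "12:01:02", value: 10 }
];

var groups = groupByTime(sample);
Object.keys(groups).forEach(function (t) {
  console.log(t, JSON.stringify(main(groups[t])));
});
// 12:01:01 {"events":2,"avg":9.5}
// 12:01:02 {"events":1,"avg":10}
```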