我們使用U-SQL從一組.csv文件中提取傳感器數據。每個記錄包含一傳感器ID,測量和值的時間,以及接收到該記錄時的時間:並行識別最新記錄
+----------+---------------------+------------------+---------------------+
| SensorID | MeasurementTime | MeasurementValue | ReceivedTime |
+----------+---------------------+------------------+---------------------+
| xxx | 2017-09-10 11:00:00 | 12.342 | 2017-09-19 14:25:17 |
| xxx | 2017-09-10 12:00:00 | 14.654 | 2017-09-19 14:25:17 |
| yyy | 2017-09-10 11:00:00 | 1.054 | 2017-09-19 14:25:17 |
| yyy | 2017-09-10 12:00:00 | 1.354 | 2017-09-19 14:25:17 |
...
| xxx | 2017-09-10 11:00:00 | 10.261 | 2017-09-19 15:25:17 |
+----------+---------------------+------------------+---------------------+
的文件存儲在ADLS基於所述測量的日期部分的路徑所以上面的數據可以在/Data/2017/09/10/measurements.csv
中找到,其中前四行是在9月19日14:25:17寫的,最後一行是在一小時後的15:25:17追加。
如上例所示,可以在以後收到同一個SensorID和MeasurementTime的新值。每個分區擁有幾百萬行,每天有幾千行被添加到少量分區。我們希望每24小時運行一次批處理作業,對於任何給定的SensorID和MeasurementTime,它只會輸出最新的值。爲此,我們將U-SQL腳本,它類似於此:
@newestMeasurements_addRN =
SELECT *,
ROW_NUMBER() OVER (PARTITION BY PDate,
SensorId,
MeasurementTime
ORDER BY ReceivedTime DESC) AS MeasurementRN;
@newestMeasurements =
SELECT SensorId,
MeasurementTime,
MeasurementValue
FROM @newestMeasurements_addRN
WHERE MeasurementRN == 1;
這裏,PDate
是從CSV文件的路徑爲yyyy/MM/DD推斷出一個虛擬列(等於日期 - MeasurementTime的一部分)。現在
,因爲我們在窗口函數的PARTITION BY
部分使用PDate
,我預計這種操作可以並行化,因爲試圖找到任何最新的記錄時,我們不必考慮不同天(分區)給定SensorID和MeasurementTime。不幸的是,這似乎並沒有這樣的情況,看着工作圖:
在這裏,我們提取4種不同天的數據。 Extract頂點輸出完整的記錄數,只將最新記錄的任務留給底部的合併頂點,表明ROW_NUMBER
和後續過濾不是並行發生的。
- 這是一個執行
ROW_NUMBER
的bug嗎? - 我們可以使用不同的U-SQL技術來確保並行性嗎?