2017-09-19 56 views
2

我們使用U-SQL從一組.csv文件中提取傳感器數據。每個記錄包含一傳感器ID,測量和值的時間,以及接收到該記錄時的時間:並行識別最新記錄

+----------+---------------------+------------------+---------------------+ 
| SensorID | MeasurementTime | MeasurementValue | ReceivedTime  | 
+----------+---------------------+------------------+---------------------+ 
| xxx  | 2017-09-10 11:00:00 |   12.342 | 2017-09-19 14:25:17 | 
| xxx  | 2017-09-10 12:00:00 |   14.654 | 2017-09-19 14:25:17 | 
| yyy  | 2017-09-10 11:00:00 |   1.054 | 2017-09-19 14:25:17 | 
| yyy  | 2017-09-10 12:00:00 |   1.354 | 2017-09-19 14:25:17 | 
    ... 
| xxx  | 2017-09-10 11:00:00 |   10.261 | 2017-09-19 15:25:17 | 
+----------+---------------------+------------------+---------------------+ 

的文件存儲在ADLS基於所述測量的日期部分的路徑所以上面的數據可以在/Data/2017/09/10/measurements.csv中找到,其中前四行是在9月19日14:25:17寫的,最後一行是在一小時後的15:25:17追加。

如上例所示,可以在以後收到同一個SensorID和MeasurementTime的新值。每個分區擁有幾百萬行,每天有幾千行被添加到少量分區。我們希望每24小時運行一次批處理作業,對於任何給定的SensorID和MeasurementTime,它只會輸出最新的值。爲此,我們將U-SQL腳本,它類似於此:

@newestMeasurements_addRN = 
    SELECT *, 
      ROW_NUMBER() OVER (PARTITION BY PDate, 
              SensorId, 
              MeasurementTime 
           ORDER BY ReceivedTime DESC) AS MeasurementRN; 

@newestMeasurements = 
    SELECT SensorId, 
      MeasurementTime, 
      MeasurementValue 
    FROM @newestMeasurements_addRN 
    WHERE MeasurementRN == 1; 

這裏,PDate是從CSV文件的路徑爲yyyy/MM/DD推斷出一個虛擬列(等於日期 - MeasurementTime的一部分)。現在

,因爲我們在窗口函數的PARTITION BY部分使用PDate,我預計這種操作可以並行化,因爲試圖找到任何最新的記錄時,我們不必考慮不同天(分區)給定SensorID和MeasurementTime。不幸的是,這似乎並沒有這樣的情況,看着工作圖:

enter image description here

在這裏,我們提取4種不同天的數據。 Extract頂點輸出完整的記錄數,只將最新記錄的任務留給底部的合併頂點,表明ROW_NUMBER和後續過濾不是並行發生的。

  • 這是一個執行ROW_NUMBER的bug嗎?
  • 我們可以使用不同的U-SQL技術來確保並行性嗎?

回答

1

我設法找到一個可用的解決方案,其中,我封裝在U-SQL,其檢測一U SQL存儲的過程,這需要對應於pdate作爲輸入參數的值內的最新測量結果。

然後,我只是執行這個存儲過程幾次,日期的列表,我想並行處理:

DetectLatestMeasurements(20170910); 
DetectLatestMeasurements(20170911); 
DetectLatestMeasurements(20170912); 
DetectLatestMeasurements(20170913); 

的存儲過程處理的數據價值1天提取,轉換和輸出,所以這就完成了這項工作,並且按照我的預期進行並行化。