我們正在嘗試使用TPL Dataflow
框架的數據處理管道。阻止數據流的塊設計
管道的基本要點是:
- 迭代通過CSV文件在文件系統(10,000)
- 驗證我們尚未導入的內容,如果我們通過一個單一的內容忽略
- 迭代CSV文件(20,000-120,000行)並創建符合我們需要的數據結構。
- 批量處理這些新的dataStructured項目並將其推送到數據庫中
- 將CSV文件標記爲導入。
現在我們有一個現有的Python文件,它以非常緩慢的方式執行上述所有操作 - 代碼很混亂。我的想法如下TPL Dataflow
。
BufferBlock<string>
將所有文件發佈到TransformBlock<string, SensorDataDto>
謂詞來檢測是否導入該文件TransformBlock<string, SensorDataDto>
讀通過CSV文件,並創建SensorDataDto
結構BatchBlock<SensorDataDto>
用於內的TransformBlock
委託來分批處理100個請求。4.5。
ActionBlock<SensorDataDto>
將100條記錄推送到數據庫中。ActionBlock
將CSV標記爲導入。
我創建了最初的幾個操作和他們正在(BufferBlock
- >TransformBlock
+ Predicate
& &過程中,如果還沒有),但我不知道該如何繼續流動,這樣我可以張貼100至內的BatchBlock
並連線以下操作。
這看起來是否正確 - 基本要點,以及如何解決TPL數據流暢方式中的BufferBlock
位?
bufferBlock.LinkTo(readCsvFile, ShouldImportFile)
bufferBlock.LinkTo(DataflowBlock.NullTarget<string>())
readCsvFile.LinkTo(normaliseData)
normaliseData.LinkTo(updateCsvImport)
updateCsvImport.LinkTo(completionBlock)
batchBlock.LinkTo(insertSensorDataBlock)
bufferBlock.Completion.ContinueWith(t => readCsvFile.Complete());
readCsvFile.Completion.ContinueWith(t => normaliseData.Complete());
normaliseData.Completion.ContinueWith(t => updateCsvImport.Complete());
updateCsvImport.Completion.ContinueWith(t => completionBlock.Complete());
batchBlock.Completion.ContinueWith(t => insertSensorDataBlock.Complete());
裏面normaliseData
方法我打電話BatchBlock.Post<..>(...)
,是一個很好的模式,還是應該有不同的結構?我的問題是,我只能在所有記錄被推入後纔將文件標記爲正在導入。
Task.WhenAll(bufferBlock.Completion, batchBlock.Completion).Wait();
如果我們有一批100
,如果80
被推進,有沒有辦法排最後80
?
我不確定我是否應該在主管道中連接BatchBlock
,我會等到兩個都完成。