2017-09-26 45 views
0

我正在設計一個以讀取的平面文件開始的數據管道。文件中的每一行都是單個記錄。使用模糊匹配重複數據刪除流處理的最佳做法

加載之後,每個記錄都將被解析,轉換和豐富。這發生與其他記錄無關。

作爲最後一步,我想基於幾個記錄的字段的模糊匹配來重複記錄記錄。要做到這一點,我想獲得2記錄的所有組合。

當前我使用sql表作爲緩衝區。我的表中包含的所有記錄,我加入了表本身,on即鍵不同的條件,並在名稱模糊匹配與sounds like

CREATE TABLE temp_tblSoundsLikeName AS 
SELECT DISTINCT clients1.client_name client_name1, 
       clients1.client_id client_id1, 
       clients2.client_name client_name2, 
       clients2.client_id client_id2, 
FROM tblClients clients1 
    JOIN tblClients clients2 
    ON clients1.client_name != clients2.client_name 
     AND clients1.ban_id < clients2.ban_id 
     AND SUBSTRING_INDEX(clients2.client_name,' ',1) SOUNDS LIKE SUBSTRING_INDEX(clients1.client_name,' ',1) 

在temp_tblSoundsLikeName的記錄代表的重複,我會在tblClients合併。

我正在考慮使用卡夫卡流,我以前沒有用過。當消息M(代表記錄R)到達重複數據刪除主題時,我希望我的應用程序使用它並因此生成包含來自R和另一個消息R'的信息的消息,其中R'是到達的任何消息在過去5個小時內的重複數據刪除階段。這些包含2條消息組合的消息應發送到另一個主題,在這些主題中可以通過匹配和模糊匹配條件進行過濾,最後階段是合併重複記錄並將合併記錄推送到具有kafka連接JDBC的RDBMS。

但我不確定如何爲所有這樣的RR'組合創建消息。 這可能嗎? 這是Kafka Streams的一個很好的用例嗎?

回答

2

使用Kafka的Streams API進行重複數據刪除的起點爲EventDeduplicationLambdaIntegrationTest.java,地址爲https://github.com/confluentinc/kafka-streams-examples(Confluent Platform 3.3.0/Apache Kafka 0.11.0:EventDeduplicationLambdaIntegrationTest.java的直接鏈接)。

isDuplicate控制新的事件是否被認爲是重複的方法:

private boolean isDuplicate(final E eventId) { 
    long eventTime = context.timestamp(); 
    WindowStoreIterator<Long> timeIterator = eventIdStore.fetch(
     eventId, 
     eventTime - leftDurationMs, 
     eventTime + rightDurationMs); 
    boolean isDuplicate = timeIterator.hasNext(); 
    timeIterator.close(); 
    return isDuplicate; 

eventIdStore是一個所謂的「狀態存儲」,它可以讓你要記住,從過去的事件信息這樣你就可以做出「重複是/否?」決定。

當一個消息M(代表記錄R)到達的重複數據刪除主題,我想我的應用程序使用,並作爲結果,生成含有從R和從另一個消息R」,其中,所述信息的消息R'是過去5個小時內到達重複數據刪除階段的任何消息。這些包含2條消息組合的消息應發送到另一個主題,在這些主題中可以通過匹配和模糊匹配條件進行過濾,最後階段是合併重複記錄並將合併記錄推送到具有kafka連接JDBC的RDBMS。你有

一種選擇是做「賦予了新的R,讓我們找到所有R'郵件,然後重複數據刪除」一步到位,即在一個處理步驟(類似於上面的例子,使用所謂的Transformer)執行此操作,而不是創建一堆新的下游消息,這會導致寫入放大(1 * R => N * "(R/R')"下游消息)。國家商店可用於追蹤所有之前的消息,包括您在R抵達時感興趣的各種R'

+0

@ michael-g-noll謝謝你的回答。我想重複匹配模糊匹配(例如字符串編輯距離)。如果我想要完全匹配,我可以看到州商店如何有用,但是如何使用州商店獲得各種用於模糊匹配的「R」?例如,如果'R = Michael'我想要去掉'R'1 = michael,R'2 = Michale,R'3 = Michele' – polo

+0

您可以實現一個自定義狀態存儲,允許您傳入「Michael 「,並返回模糊匹配」邁克爾「(如邁克爾,邁克爾,米歇爾)的條目列表。見例如'ProbabilisticCountingScalaIntegrationTest'在https://github.com/confluentinc/kafka-streams-examples上演示如何實現自己的自定義狀態存儲(這個特定的例子與你的用例不同,但問題是你可以插入在你自己的國家商店,就像一個經過優化的模糊匹配)。 –