0

如果標題過於模糊,我很抱歉,但我無法正確說出它。在Apache Spark中加入流式數據

所以基本上我想弄清楚Apache Spark和Apache Kafka是否能夠將數據從我的關係數據庫同步到Elasticsearch。

我的計劃是使用Kafka連接器之一從RDBMS中讀取數據並將其推入到Kafka主題中。那將是模型和DDL的ERD。很基本的,ReportProduct表有許多到許多存在於ReportProduct表關係: ERD

CREATE TABLE dbo.Report (
    ReportID INT NOT NULL PRIMARY KEY, 
    Title NVARCHAR(500) NOT NULL, 
    PublishedOn DATETIME2 NOT NULL); 

CREATE TABLE dbo.Product (
    ProductID INT NOT NULL PRIMARY KEY, 
    ProductName NVARCHAR(100) NOT NULL); 

CREATE TABLE dbo.ReportProduct (
    ReportID INT NOT NULL, 
    ProductID INT NOT NULL, 
    PRIMARY KEY (ReportID, ProductID), 
    FOREIGN KEY (ReportID) REFERENCES dbo.Report (ReportID), 
    FOREIGN KEY (ProductID) REFERENCES dbo.Product (ProductID)); 

INSERT INTO dbo.Report (ReportID, Title, PublishedOn) 
VALUES (1, N'Yet Another Apache Spark StackOverflow question', '2017-09-12T19:15:28'); 

INSERT INTO dbo.Product (ProductID, ProductName) 
VALUES (1, N'Apache'), (2, N'Spark'), (3, N'StackOverflow'), (4, N'Random product'); 

INSERT INTO dbo.ReportProduct (ReportID, ProductID) 
VALUES (1, 1), (1, 2), (1, 3), (1, 4); 

SELECT * 
FROM dbo.Report AS R 
INNER JOIN dbo.ReportProduct AS RP 
    ON RP.ReportID = R.ReportID 
INNER JOIN dbo.Product AS P 
    ON P.ProductID = RP.ProductID; 

我的目標是用以下結構轉變成文檔這樣的:

{ 
    "ReportID":1, 
    "Title":"Yet Another Apache Spark StackOverflow question", 
    "PublishedOn":"2017-09-12T19:15:28+00:00", 
    "Product":[ 
    { 
     "ProductID":1, 
     "ProductName":"Apache" 
    }, 
    { 
     "ProductID":2, 
     "ProductName":"Spark" 
    }, 
    { 
     "ProductID":3, 
     "ProductName":"StackOverflow" 
    }, 
    { 
     "ProductID":4, 
     "ProductName":"Random product" 
    } 
    ] 
} 

report.join(
    report_product.join(product, "product_id") 
    .groupBy("report_id") 
    .agg(
     collect_list(struct("product_id", "product_name")).alias("product") 
    ), "report_id").show 
:我使用的靜態數據,我已在本地嘲笑了能夠形成這樣一種結構

但是我意識到這太基本了,流會變得更加複雜。

數據被不規則地改變(主要在每週),報告和他們的產品正在不斷地變化,產品在一段時間換一次。

我想複製到這些表中發生的Elasticsearch的任何類型的變化。

回答

1
  1. 卡夫卡連接到從源數據庫提取數據 - 可以使用JDBC Source它可爲Confluent Platform(或separately)部分,也可以調查kafka-connect-cdc-mssql

  2. 一旦你」我們已經獲得了卡夫卡的數據,使用Kafka Streams API來根據需要操縱數據,或者查看最新發布的KSQL。你的選擇會之類的東西你偏愛在類似SQL的環境(KSQL)在Java編碼(卡夫卡流)或操縱數據來驅動。無論如何,這兩個的輸出將成爲另一個卡夫卡話題。

  3. 最後,使用Elasticsearch卡夫卡連接插件(可here,或爲Confluent Platform的一部分)

+0

這聽起來真不錯流卡夫卡話題從上方插入Elasticsearch。從我之前所做的研究,卡夫卡不會讓你在這可能是我的情況不分區鍵加入。 KSQL解決了這個問題嗎? –

+0

您可以使用KSQL輕鬆重新分區,我認爲這將解決此問題。我還沒有嘗試過。 –