如果標題過於模糊,我很抱歉,但我無法正確說出它。在Apache Spark中加入流式數據
所以基本上我想弄清楚Apache Spark和Apache Kafka是否能夠將數據從我的關係數據庫同步到Elasticsearch。
我的計劃是使用Kafka連接器之一從RDBMS中讀取數據並將其推入到Kafka主題中。那將是模型和DDL的ERD。很基本的,Report
和Product
表有許多到許多存在於ReportProduct
表關係:
CREATE TABLE dbo.Report (
ReportID INT NOT NULL PRIMARY KEY,
Title NVARCHAR(500) NOT NULL,
PublishedOn DATETIME2 NOT NULL);
CREATE TABLE dbo.Product (
ProductID INT NOT NULL PRIMARY KEY,
ProductName NVARCHAR(100) NOT NULL);
CREATE TABLE dbo.ReportProduct (
ReportID INT NOT NULL,
ProductID INT NOT NULL,
PRIMARY KEY (ReportID, ProductID),
FOREIGN KEY (ReportID) REFERENCES dbo.Report (ReportID),
FOREIGN KEY (ProductID) REFERENCES dbo.Product (ProductID));
INSERT INTO dbo.Report (ReportID, Title, PublishedOn)
VALUES (1, N'Yet Another Apache Spark StackOverflow question', '2017-09-12T19:15:28');
INSERT INTO dbo.Product (ProductID, ProductName)
VALUES (1, N'Apache'), (2, N'Spark'), (3, N'StackOverflow'), (4, N'Random product');
INSERT INTO dbo.ReportProduct (ReportID, ProductID)
VALUES (1, 1), (1, 2), (1, 3), (1, 4);
SELECT *
FROM dbo.Report AS R
INNER JOIN dbo.ReportProduct AS RP
ON RP.ReportID = R.ReportID
INNER JOIN dbo.Product AS P
ON P.ProductID = RP.ProductID;
我的目標是用以下結構轉變成文檔這樣的:
{
"ReportID":1,
"Title":"Yet Another Apache Spark StackOverflow question",
"PublishedOn":"2017-09-12T19:15:28+00:00",
"Product":[
{
"ProductID":1,
"ProductName":"Apache"
},
{
"ProductID":2,
"ProductName":"Spark"
},
{
"ProductID":3,
"ProductName":"StackOverflow"
},
{
"ProductID":4,
"ProductName":"Random product"
}
]
}
report.join(
report_product.join(product, "product_id")
.groupBy("report_id")
.agg(
collect_list(struct("product_id", "product_name")).alias("product")
), "report_id").show
:我使用的靜態數據,我已在本地嘲笑了能夠形成這樣一種結構
但是我意識到這太基本了,流會變得更加複雜。
數據被不規則地改變(主要在每週),報告和他們的產品正在不斷地變化,產品在一段時間換一次。
我想複製到這些表中發生的Elasticsearch的任何類型的變化。
這聽起來真不錯流卡夫卡話題從上方插入Elasticsearch。從我之前所做的研究,卡夫卡不會讓你在這可能是我的情況不分區鍵加入。 KSQL解決了這個問題嗎? –
您可以使用KSQL輕鬆重新分區,我認爲這將解決此問題。我還沒有嘗試過。 –