我想扇出/鏈/複製一個輸入AWS Kinesis流到N個新Kinesis流,這樣寫入輸入Kinesis的每條記錄都會出現在N個流的每一箇中。如何扇出AWS kinesis流?
是否有AWS服務或開源解決方案?
如果有現成的解決方案,我寧願不寫代碼來做這件事。 AWS Kinesis firehose是無法解決的,因爲它不能輸出到kinesis。如果AWS Lambda解決方案的運行成本不高,那麼可能是AWS Lambda解決方案?
我想扇出/鏈/複製一個輸入AWS Kinesis流到N個新Kinesis流,這樣寫入輸入Kinesis的每條記錄都會出現在N個流的每一箇中。如何扇出AWS kinesis流?
是否有AWS服務或開源解決方案?
如果有現成的解決方案,我寧願不寫代碼來做這件事。 AWS Kinesis firehose是無法解決的,因爲它不能輸出到kinesis。如果AWS Lambda解決方案的運行成本不高,那麼可能是AWS Lambda解決方案?
有兩種方法,你可以完成扇出亞馬遜的Kinesis流的:
選項1:使用亞馬遜Kinesis分析扇出
您可以使用Amazon Kinesis Analytics從現有流中生成新的流。
從Amazon Kinesis Analytics documentation:實時
亞馬遜室壁運動分析應用持續讀取和處理數據流。您使用SQL編寫應用程序代碼來處理傳入流數據並生成輸出。然後,Amazon Kinesis Analytics 將輸出寫入配置的目標。
扇出在Application Code部分中提到:
您也可以編寫運行相互獨立的SQL查詢。例如,您可以編寫兩條查詢相同應用程序內部流的SQL語句,但會將輸出發送到不同的應用程序內部流。
我設法實現這個如下:
亞馬遜Kinesis Analytics SQL應用程序如下所示:
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM"
(log VARCHAR(16));
CREATE OR REPLACE PUMP "COPY_PUMP1" AS
INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "log" FROM "SOURCE_SQL_STREAM_001";
此代碼創建一個泵(將其視爲連續選擇語句),從input
流中選擇並輸出到output1
流。我創建了另一個相同的應用程序,輸出到output2
流。
爲了測試,我送數據到input
流:
#!/usr/bin/env python
import json, time
from boto import kinesis
kinesis = kinesis.connect_to_region("us-west-2")
i = 0
while True:
data={}
data['log'] = 'Record ' + str(i)
i += 1
print data
kinesis.put_record("input", json.dumps(data), "key")
time.sleep(2)
我讓它運行一段時間,然後使用此代碼顯示在輸出:
from boto import kinesis
kinesis = kinesis.connect_to_region("us-west-2")
iterator = kinesis.get_shard_iterator('output1', 'shardId-000000000000', 'TRIM_HORIZON')['ShardIterator']
records = kinesis.get_records(iterator, 5)
print [r['Data'] for r in records['Records']]
輸出爲:
[u'{"LOG":"Record 0"}', u'{"LOG":"Record 1"}', u'{"LOG":"Record 2"}', u'{"LOG":"Record 3"}', u'{"LOG":"Record 4"}']
我跑了它再次爲output2
和相同的輸出顯示。
選項2:使用AWS LAMBDA
如果你是扇形散開許多流,更有效的方法可能是創建一個AWS lambda表達式:
甚至可以讓Lambda函數根據命名約定自動發現輸出流(例如,任何名爲app-output-*
的流)。
你也可以使用相同的Kinesis Analytics應用程序,並添加兩個輸出流:) – alexcasalboni
有一個從亞馬遜實驗室提供粉絲使用lambda的github回購。 https://github.com/awslabs/aws-lambda-fanout。另請閱讀https://medium.com/retailmenot-engineering/building-a-high-throughput-data-pipeline-with-kinesis-lambda-and-dynamodb-7d78e992a02d上的「將同步Lambda調用轉換爲異步調用」,這對於構建真正的異步處理非常重要。
好奇你爲什麼覺得你需要做一個粉絲? kinesis流可以支持多個消費者從流的不同部分讀取。作爲@ E.J.Brennan提到的 –
,你爲什麼需要扇出? –
@ E.J.Brennan確實,kinesis支持多個消費者,但它的全侷限制是每秒5次讀取。雖然每次讀取都可以提取大量記錄,但一旦有超過20位消費者,您的延遲時間會變爲4秒。這是一個不適合我的應用程序。查看更多:https://brandur.org/kinesis-in-production#five-reads –