2016-10-15 81 views
7

我想扇出/鏈/複製一個輸入AWS Kinesis流到N個新Kinesis流,這樣寫入輸入Kinesis的每條記錄都會出現在N個流的每一箇中。如何扇出AWS kinesis流?

是否有AWS服務或開源解決方案

如果有現成的解決方案,我寧願不寫代碼來做這件事。 AWS Kinesis firehose是無法解決的,因爲它不能輸出到kinesis。如果AWS Lambda解決方案的運行成本不高,那麼可能是AWS Lambda解決方案?

+2

好奇你爲什麼覺得你需要做一個粉絲? kinesis流可以支持多個消費者從流的不同部分讀取。作爲@ E.J.Brennan提到的 –

+0

,你爲什麼需要扇出? –

+1

@ E.J.Brennan確實,kinesis支持多個消費者,但它的全侷限制是每秒5次讀取。雖然每次讀取都可以提取大量記錄,但一旦有超過20位消費者,您的延遲時間會變爲4秒。這是一個不適合我的應用程序。查看更多:https://brandur.org/kinesis-in-production#five-reads –

回答

11

有兩種方法,你可以完成扇出亞馬遜的Kinesis流的:

  • 使用亞馬遜室壁運動分析將記錄複製到其他數據流
  • 觸發AWS LAMBDA功能將記錄複製到另一個流

選項1:使用亞馬遜Kinesis分析扇出

您可以使用Amazon Kinesis Analytics從現有流中生成新的流。

Amazon Kinesis Analytics documentation:實時

亞馬遜室壁運動分析應用持續讀取和處理數據流。您使用SQL編寫應用程序代碼來處理傳入流數據並生成輸出。然後,Amazon Kinesis Analytics 將輸出寫入配置的目標

Amazon Kinesis Analytics flow diagram

扇出在Application Code部分中提到:

您也可以編寫運行相互獨立的SQL查詢。例如,您可以編寫兩條查詢相同應用程序內部流的SQL語句,但會將輸出發送到不同的應用程序內部流

我設法實現這個如下:

  • 創建了三個流:輸入,輸出1,輸出2
  • 創建了兩個亞馬遜室壁運動分析應用:COPY1,COPY2

亞馬遜Kinesis Analytics SQL應用程序如下所示:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" 
(log VARCHAR(16)); 

CREATE OR REPLACE PUMP "COPY_PUMP1" AS 
    INSERT INTO "DESTINATION_SQL_STREAM" 
    SELECT STREAM "log" FROM "SOURCE_SQL_STREAM_001"; 

此代碼創建一個(將其視爲連續選擇語句),從input流中選擇並輸出到output1流。我創建了另一個相同的應用程序,輸出到output2流。

爲了測試,我送數據到input流:

#!/usr/bin/env python 

import json, time 
from boto import kinesis 

kinesis = kinesis.connect_to_region("us-west-2") 
i = 0 

while True: 
    data={} 
    data['log'] = 'Record ' + str(i) 
    i += 1 
    print data 
    kinesis.put_record("input", json.dumps(data), "key") 
    time.sleep(2) 

我讓它運行一段時間,然後使用此代碼顯示在輸出:

from boto import kinesis 

kinesis = kinesis.connect_to_region("us-west-2") 
iterator = kinesis.get_shard_iterator('output1', 'shardId-000000000000', 'TRIM_HORIZON')['ShardIterator'] 
records = kinesis.get_records(iterator, 5) 
print [r['Data'] for r in records['Records']] 

輸出爲:

[u'{"LOG":"Record 0"}', u'{"LOG":"Record 1"}', u'{"LOG":"Record 2"}', u'{"LOG":"Record 3"}', u'{"LOG":"Record 4"}'] 

我跑了它再次爲output2和相同的輸出顯示。

選項2:使用AWS LAMBDA

如果你是扇形散開許多流,更有效的方法可能是創建一個AWS lambda表達式:

  • 亞馬遜觸發 Kinesis流記錄
  • 將記錄寫入多個Amazon Kinesis'輸出'流

甚至可以讓Lambda函數根據命名約定自動發現輸出流(例如,任何名爲app-output-*的流)。

+0

你也可以使用相同的Kinesis Analytics應用程序,並添加兩個輸出流:) – alexcasalboni

相關問題