4

我使用AWS Lambda(node.js)作爲AWS Kinesis使用者。我可以看到您可以設置最大批量,但是我想知道是否可以設置最小批量。這樣我就可以確保每個lambda都能處理至少50個(或任何數量)的記錄。Kinesis Lambda消費者最小批處理大小

我想有一個最小批量大小,因爲拉姆達消費者將建立一個RDS MySQL實例連接,我試圖保持併發連接小的數目。

如果沒有可以設置最小值的配置功能,那麼可以使用任何解決方法。

謝謝。

回答

0

我的第一個問題是您在流中打開了多少個碎片?您只能獲得1個併發執行的每個shard的lambda實例。所以如果你只有1個碎片,那麼你一次只能有1個Lambda碰到你的RDS實例。

你有數據,表示這是一個問題嗎?

下面是一個破解可能會或可能無法可靠地工作。也許不應該用在產品環境中。

最少批量大小,你可以從你的node.js lambda函數,如果批量大小小於所需的記錄數返回error

E.g.

handler(event, context, callback) { 
    const records = event.Records; 
    if (records.length() < minBatchSize) { 
    callback('insufficient batch size'); 
    } else { 
    processRecords(records, callback); 
    } 
} 

兩個問題浮現在腦海中:

1)你不能沒有,因爲是上配置的最大事件時間限制運行數據丟失的風險,這樣做下去你的流。在這段時間之後,記錄從流中消失。請注意,您爲此功能支付額外費用(請參閱extended data retention)。

您可以從拉姆達/室壁運動碎片迭代年齡指標看http://docs.aws.amazon.com/streams/latest/dev/monitoring-with-cloudwatch.html推斷分批時間。

我不知道如何可靠這一點,特別是如果你有超過1塊碎片,但如

handler(event, context, callback) { 
    const records = event.Records; 
    if (records.length() < minBatchSize) { 
    if (calculateLambdaAge() > tooLongDelayThreshold) { 
     processRecords(records, callback); 
    } else { 
     callback(new Error('insufficient batch size')); 
    } 
    } else { 
    processRecords(records, callback); 
    } 
} 

calculateLambdaAge() { 
    // interrogate cloudwatch 
} 

如果CloudWatch的不會告訴你,你可能需要跟蹤它自己的地方,是至少具有可擴展性的RDS(Redis的/發電機)。

2)而不是把精力投入到製作#1可靠,可以額外的努力剛剛進入擴大您的RDS實例使您當前使用更有效率?


我的代碼樣本放在一起時稱thisthis