Lambda processing S3 files multiple times in AWS

I have a Java Lambda function that is triggered by an S3 event every 15 minutes. I've noticed that over a window of roughly 3 hours, each Lambda invocation receives not only the most recently uploaded file but also every file uploaded during that 3-hour window.
So when it iterates over the full list, it re-processes files that were already handled in earlier Lambda invocations.
How do I get it to process only the newest uploaded file? In Node.js there is context.succeed(), which I believe marks the event as successfully processed, but Java doesn't seem to have an equivalent.
Below are the CloudWatch logs.
08:35:16 START RequestId: 56c0dc17-6f77-11e6-a102-7517541f4ac3 Version: $LATEST
08:35:26 TIME - AUTHENTICATE: 8101ms
08:35:26 TIME - MESSAGE PARSE: 1ms
08:35:26 data :: event/events/2016/08/31/20160831123000.export.csv
08:35:35 Processed 147 events
08:35:35 TIME - FILE PARSE: 9698
08:35:35 Found 1 event files
08:35:35 Total function took: 17800ms
08:35:35 END RequestId: 56c0dc17-6f77-11e6-a102-7517541f4ac3
08:35:35 REPORT RequestId: 56c0dc17-6f77-11e6-a102-7517541f4ac3 Duration: 19403.67 ms Billed Duration: 19500 ms Memory Size: 192 MB Max Memory Used: 116 MB
08:45:03 START RequestId: bcb8e064-6f78-11e6-baea-a312004d2418 Version: $LATEST
08:45:03 TIME - AUTHENTICATE: 119ms
08:45:03 TIME - MESSAGE PARSE: 0ms
08:45:03 data :: event/events/2016/08/31/20160831123000.export.csv
08:45:05 Processed 147 events
08:45:05 data :: event/events/2016/08/31/20160831124500.export.csv
08:45:06 Processed 211 events
08:45:06 TIME - FILE PARSE: 2499
08:45:06 Found 2 event files
08:45:06 Total function took: 2618ms
08:45:06 END RequestId: bcb8e064-6f78-11e6-baea-a312004d2418
08:45:06 REPORT RequestId: bcb8e064-6f78-11e6-baea-a312004d2418 Duration: 2796.25 ms Billed Duration: 2800 ms Memory Size: 192 MB Max Memory Used: 116 MB
09:05:02 START RequestId: 8747aa08-6f7b-11e6-80fd-f30a15cf07fc Version: $LATEST
09:05:02 TIME - AUTHENTICATE: 98ms
09:05:02 TIME - MESSAGE PARSE: 0ms
09:05:02 data :: event/events/2016/08/31/20160831123000.export.csv
09:05:03 Processed 147 events
09:05:03 data :: event/events/2016/08/31/20160831124500.export.csv
09:05:04 Processed 211 events
09:05:04 data :: event/events/2016/08/31/20160831130000.export.csv
09:05:04 Processed 204 events
09:05:04 TIME - FILE PARSE: 2242
09:05:04 Found 3 event files
09:05:04 Total function took: 2340ms
09:05:04 END RequestId: 8747aa08-6f7b-11e6-80fd-f30a15cf07fc
EDIT 1: I believe Michael has answered this in the comments, but here is my code for anyone else who runs into this. I was indeed using a global list to hold the records.
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
import org.apache.commons.lang3.time.StopWatch;

import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.events.S3Event;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.event.S3EventNotification;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.S3Object;

public class LambdaHandler {

    // Instance-level state: this list survives container reuse across
    // invocations, which is what caused old files to be re-processed.
    private final List<GDELTEventFile> eventFiles = new ArrayList<>();
    private AmazonS3Client s3Client;
    private final CSVFormat CSV_FORMAT = CSVFormat.TDF.withIgnoreEmptyLines().withTrim();

    public void gdeltHandler(S3Event event, Context context) {
        StopWatch sw = new StopWatch();
        long time = 0L;

        sw.start();
        s3Client = new AmazonS3Client(new EnvironmentVariableCredentialsProvider());
        sw.split();
        System.out.println("TIME - AUTHENTICATE: " + sw.getSplitTime() + "ms");
        time += sw.getSplitTime();

        sw.reset();
        sw.start();
        processEvent(event);
        sw.split();
        System.out.println("TIME - MESSAGE PARSE: " + sw.getSplitTime() + "ms");
        time += sw.getSplitTime();

        sw.reset();
        sw.start();
        processFiles();
        sw.split();
        System.out.println("TIME - FILE PARSE: " + sw.getSplitTime());
        time += sw.getSplitTime();

        System.out.println("Found " + eventFiles.size() + " event files");
        System.out.println("Total function took: " + time + "ms");
    }

    // Appends each record from the S3 event notification onto the shared list.
    private void processEvent(S3Event event) {
        List<S3EventNotification.S3EventNotificationRecord> records = event.getRecords();
        for (S3EventNotification.S3EventNotificationRecord record : records) {
            long filesize = record.getS3().getObject().getSizeAsLong();
            eventFiles.add(new GDELTEventFile(record.getS3().getBucket().getName(),
                    record.getS3().getObject().getKey(), filesize));
        }
    }

    // Downloads each file from S3 and counts its CSV records.
    private void processFiles() {
        for (GDELTEventFile event : eventFiles) {
            try {
                System.out.println(event.getBucket() + " :: " + event.getFilename());
                GetObjectRequest request = new GetObjectRequest(event.getBucket(), event.getFilename());
                S3Object file = s3Client.getObject(request);
                try (BufferedReader reader = new BufferedReader(new InputStreamReader(file.getObjectContent()))) {
                    CSVParser parser = new CSVParser(reader, CSV_FORMAT);
                    int count = 0;
                    for (CSVRecord record : parser) {
                        count++;
                    }
                    System.out.println("Processed " + count + " events");
                }
            } catch (IOException ioe) {
                System.out.println("IOException :: " + ioe);
            }
        }
    }
}
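And here is a minimal sketch of the fix, per Michael's comments below: scope the list to the invocation rather than the instance, so a reused container starts every run with an empty list. This assumes processFiles is refactored to take the list as a parameter; alternatively, calling eventFiles.clear() at the top of the handler achieves the same thing.

    public void gdeltHandler(S3Event event, Context context) {
        // Local to the invocation: a reused container can no longer leak old records in.
        List<GDELTEventFile> eventFiles = new ArrayList<>();

        s3Client = new AmazonS3Client(new EnvironmentVariableCredentialsProvider());

        for (S3EventNotification.S3EventNotificationRecord record : event.getRecords()) {
            long filesize = record.getS3().getObject().getSizeAsLong();
            eventFiles.add(new GDELTEventFile(record.getS3().getBucket().getName(),
                    record.getS3().getObject().getKey(), filesize));
        }

        processFiles(eventFiles); // assumed to be refactored to accept the list
        System.out.println("Found " + eventFiles.size() + " event files");
    }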
Where does the information in the 'data ::' lines come from? I've never seen S3 send more than one record per event... and in fact my code is written to stop and throw an exception if that ever happens, because it would be unexpected. I suspect you may not be accounting for possible container reuse: if Lambda reuses the same container from a previous invocation, could you have a global data structure that is holding on to old events? –
Hi Michael, if you're asking where the actual lines are printed, they're printed from inside the function; it's just a System.out.println. The file itself is generated by the gdelt service every 15 minutes, then downloaded to an EC2 instance and uploaded via the CLI. How do I account for container reuse in Lambda? Never heard of it... – Brooks
When I say "account for" container reuse, I mean [be aware that it's a thing](https://aws.amazon.com/blogs/compute/container-reuse-in-lambda/) and code accordingly. Each time your function fires, it runs in an idle process running your Lambda function... but that process may be brand new, or it may be the *same* process that ran the function last time (or some time before that) being reused, as long as you haven't uploaded new code. If you push the records from S3 onto a global array rather than one scoped to your handler (for example), then when you iterate over that array it will sometimes contain old events. –
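To make the container-reuse point concrete, here is a tiny hypothetical handler (not from the thread) showing class-level state surviving across invocations: the counter resets only on a cold start, so a warm (reused) container keeps counting.

    import com.amazonaws.services.lambda.runtime.Context;
    import com.amazonaws.services.lambda.runtime.RequestHandler;

    // Hypothetical demo class: 'invocations' lives at class level, so it is
    // re-initialized only when Lambda starts a fresh container.
    public class ReuseDemo implements RequestHandler<Object, String> {

        private static int invocations = 0;

        @Override
        public String handleRequest(Object input, Context context) {
            invocations++;
            // Logs 1 on a cold start; 2, 3, ... when the container is reused.
            return "invocation #" + invocations + " in this container";
        }
    }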