2016-09-02 82 views
0

我有一個每15分鐘由S3事件觸發的Java Lambda函數。我注意到,在大約每3個小時的時間段內,每個Lambda調用都會包含上傳的最新文件以及在3小時時間範圍內上傳的所有文件。在AWS中多次處理S3文件Lambda

因此,如果迭代遍歷整個列表時,它會重複在早期的Lambda調用中已經處理的文件。

如何獲取它只處理最新上傳的文件?在node.js中,有一個context.suceed(),我認爲它將該事件標記爲已成功處理。 Java似乎沒有那個。

以下是Cloudwatch日誌。

08:35:16 START RequestId: 56c0dc17-6f77-11e6-a102-7517541f4ac3 Version: $LATEST 
08:35:26 TIME - AUTHENTICATE: 8101ms 
08:35:26 TIME - MESSAGE PARSE: 1ms 
08:35:26 data :: event/events/2016/ 08/31/2016 0831123000.export.csv 
08:35:35 Processed 147 events 
08:35:35 TIME - FILE PARSE: 9698 
08:35:35 Found 1 event files 
08:35:35 Total function took: 17800ms 
08:35:35 END RequestId: 56c0dc17-6f77-11e6-a102-7517541f4ac3 
08:35:35 REPORT RequestId: 56c0dc17-6f77-11e6-a102-7517541f4ac3 Duration: 19403.67 ms Billed Duration: 19500 ms Memory Size: 192 MB Max Memory Used: 116 MB 
08:45:03 START RequestId: bcb8e064-6f78-11e6-baea-a312004d2418 Version: $LATEST 
08:45:03 TIME - AUTHENTICATE: 119ms 
08:45:03 TIME - MESSAGE PARSE: 0ms 
08:45:03 data :: event/events/2016/ 08/31/2016 0831123000.export.csv 
08:45:05 Processed 147 events 
08:45:05 data :: event/events/2016/ 08/31/2016 0831124500.export.csv 
08:45:06 Processed 211 events 
08:45:06 TIME - FILE PARSE: 2499 
08:45:06 Found 2 event files 
08:45:06 Total function took: 2618ms 
08:45:06 END RequestId: bcb8e064-6f78-11e6-baea-a312004d2418 
08:45:06 REPORT RequestId: bcb8e064-6f78-11e6-baea-a312004d2418 Duration: 2796.25 ms Billed Duration: 2800 ms Memory Size: 192 MB Max Memory Used: 116 MB 
09:05:02 START RequestId: 8747aa 08-6f7b-11e6-80fd-f30a15cf07fc Version: $LATEST 
09:05:02 TIME - AUTHENTICATE: 98ms 
09:05:02 TIME - MESSAGE PARSE: 0ms 
09:05:02 data :: event/events/2016/ 08/31/2016 0831123000.export.csv 
09:05:03 Processed 147 events 
09:05:03 data :: event/events/2016/ 08/31/2016 0831124500.export.csv 
09:05:04 Processed 211 events 
09:05:04 data :: event/events/2016/ 08/31/2016 0831130000.export.csv 
09:05:04 Processed 204 events 
09:05:04 TIME - FILE PARSE: 2242 
09:05:04 Found 3 event files 
09:05:04 Total function took: 2340ms 
09:05:04 END RequestId: 8747aa 08-6f7b-11e6-80fd-f30a15cf07fc 

編輯1我相信這個問題已經被邁克爾回答,但下面是一些對別人的代碼。我確實使用全球列表來保存記錄。

公共類LambdaHandler {

private final List<GDELTEventFile> eventFiles = new ArrayList<>(); 
private AmazonS3Client s3Client; 
private final CSVFormat CSV_FORMAT = CSVFormat.TDF.withIgnoreEmptyLines().withTrim(); 

public void gdeltHandler(S3Event event, Context context) { 
    StopWatch sw = new StopWatch(); 
    long time = 0L; 

    sw.start(); 
    s3Client = new AmazonS3Client(new EnvironmentVariableCredentialsProvider()); 
    sw.split(); 
    System.out.println("TIME - AUTHENTICATE: " + sw.getSplitTime() + "ms"); 
    time += sw.getSplitTime(); 
    sw.reset(); 

    sw.start(); 
    processEvent(event); 
    sw.split(); 
    System.out.println("TIME - MESSAGE PARSE: " + sw.getSplitTime() + "ms"); 
    time += sw.getSplitTime(); 
    sw.reset(); 

    sw.start(); 
    processFiles(); 
    sw.split(); 
    System.out.println("TIME - FILE PARSE: " + sw.getSplitTime()); 
    time += sw.getSplitTime(); 

    System.out.println("Found " + eventFiles.size() + " event files"); 
    System.out.println("Total function took: " + time + "ms"); 
} 

private void processEvent(S3Event event) { 
    List<S3EventNotification.S3EventNotificationRecord> records = event.getRecords(); 
    for (S3EventNotification.S3EventNotificationRecord record : records) { 
     long filesize = record.getS3().getObject().getSizeAsLong(); 
     eventFiles.add(new GDELTEventFile(record.getS3().getBucket().getName(), record.getS3().getObject().getKey(), filesize)); 
    } 
} 

private void processFiles() { 
    for (GDELTEventFile event : eventFiles) { 
     try { 
      System.out.println(event.getBucket() + " :: " + event.getFilename()); 
      GetObjectRequest request = new GetObjectRequest(event.getBucket(), event.getFilename()); 
      S3Object file = s3Client.getObject(request); 
      try (BufferedReader reader = new BufferedReader(new InputStreamReader(file.getObjectContent()))) { 
       CSVParser parser = new CSVParser(reader, CSV_FORMAT); 
       int count = 0; 
       for (CSVRecord record : parser) { 
         count++; 
        } 
       } 
       System.out.println("Processed " + count + " events"); 
      } 
     } catch (IOException ioe) { 
      System.out.println("IOException :: " + ioe); 
     } 
    } 
} 
+0

'data ::'行中的信息來自哪裏?我從來沒有見過S3每個事件發送多個記錄......實際上,我的代碼被設計爲停止並拋出異常,如果它發生的話,因爲這是意外的。我懷疑你可能沒有考慮到可能的容器重用。如果Lambda事件重用先前調用中的相同容器,那麼您可能擁有一個全局數據結構來保存舊事件嗎? –

+0

嗨邁克爾,如果你問在哪裏打印實際的線,那麼從函數內部打印這些線。它只是一個System.out.println。該文件本身由gdelt服務每15分鐘生成一次,然後將其下載到ec2實例,然後通過cli將其上載。我如何解釋lambda中的容器重用?從來沒有聽說過... – Brooks

+0

當我說「帳戶」的容器重用,我的意思是[請注意,這是一件事](https://aws.amazon.com/blogs/compute/container-reuse-in- lambda /)並相應地編碼。每次函數觸發時,它都會啓動一個運行Lambda函數的空閒進程......但它可能是新的,或者可能是上次運行該函數的*相同*進程,或者在此之前的某個時間被重用(只要因爲您尚未上傳新代碼)。如果你將記錄從S3推到一個全局數組上,而不是一個作用域到你的處理程序(例如),那麼當你迭代該數組時,它有時會*包含舊事件。 –

回答

3

這是代碼的情況下,俯瞰LAMBDA的container reuse的一個重要方面 - 在LAMBDA容器重用包括過程中重複使用。當一個函數在一個重用的容器中執行時,它也必然運行在之前使用過的同一個進程中。

S3的事件通知數據結構是這樣的,它可以包含每個事件的多個對象,但是我的實踐中,這從來沒有發生......但將事件數據推入全局結構意味着如果容器被重用,則稍後的函數調用將會看到舊的數據。

雖然這可以作爲緩存非常有用,但它對於代碼的設計必須具有重要意義 - 總是期待但不會假設您的流程可能從一次調用到未來繼續存在,隨後的調用和相應的代碼。

請注意,容器重用也意味着您需要清理任何臨時文件,如果有可能許多重複使用容器會導致空間耗盡。

請注意,重新部署您的功能代碼始終意味着舊的容器將被廢棄,不會在將來調用最新版本時重複使用。