3

我正在使用DataflowPipelineRunner創建數據流作業。我嘗試了以下場景。從Google App Engine應用程序運行Google Dataflow管道?

  1. 沒有與N1-HIGHMEM-2

在上述所有的情況下指定任何machineType

  • 以G1小機器
  • ,輸入是從GCS一個文件,該文件是非常小的文件( KB大小),並輸出Big Query表。

    我得到了內存不足錯誤在所有的情況下

    我的編譯後的代碼大小爲94MB。我只嘗試字數統計的例子,它沒有讀取任何輸入(它在作業開始之前失敗)。請幫我理解爲什麼我得到這個錯誤。

    注:我正在使用appengine來開始工作。

    注:相同的代碼與測試工作versoin 0.4.150414

    編輯1

    按在回答嘗試以下的建議,

    1. 交換從自動縮放基本縮放
    2. 二手機器類型B2提供256MB存儲器

    這些配置完成後,Java堆內存問題就解決了。但它試圖將一個罐子上傳到超過10Mb的錯誤位置,因此失敗。

    它記錄了以下異常

    com.google.api.client.http.HttpRequest execute: exception thrown while executing request 
    com.google.appengine.api.urlfetch.RequestPayloadTooLargeException: The request to https://www.googleapis.com/upload/storage/v1/b/pwccloudedw-stagging-bucket/o?name=appengine-api-L4wtoWwoElWmstI1Ia93cg.jar&uploadType=resumable&upload_id=AEnB2Uo6HCfw6Usa3aXlcOzg0g3RawrvuAxWuOUtQxwQdxoyA0cf22LKqno0Gu-hjKGLqXIo8MF2FHR63zTxrSmQ9Yk9HdCdZQ exceeded the 10 MiB limit. 
    at com.google.appengine.api.urlfetch.URLFetchServiceImpl.convertApplicationException(URLFetchServiceImpl.java:157) 
    at com.google.appengine.api.urlfetch.URLFetchServiceImpl.fetch(URLFetchServiceImpl.java:45) 
    at com.google.apphosting.utils.security.urlfetch.URLFetchServiceStreamHandler$Connection.fetchResponse(URLFetchServiceStreamHandler.java:543) 
    at com.google.apphosting.utils.security.urlfetch.URLFetchServiceStreamHandler$Connection.getInputStream(URLFetchServiceStreamHandler.java:422) 
    at com.google.apphosting.utils.security.urlfetch.URLFetchServiceStreamHandler$Connection.getResponseCode(URLFetchServiceStreamHandler.java:275) 
    at com.google.api.client.http.javanet.NetHttpResponse.<init>(NetHttpResponse.java:36) 
    at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:94) 
    at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:965) 
    at com.google.api.client.googleapis.media.MediaHttpUploader.executeCurrentRequestWithoutGZip(MediaHttpUploader.java:545) 
    at com.google.api.client.googleapis.media.MediaHttpUploader.executeCurrentRequest(MediaHttpUploader.java:562) 
    at com.google.api.client.googleapis.media.MediaHttpUploader.resumableUpload(MediaHttpUploader.java:419) 
    at com.google.api.client.googleapis.media.MediaHttpUploader.upload(MediaHttpUploader.java:336) 
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:427) 
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352) 
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469) 
    at com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel$UploadOperation.call(AbstractGoogleAsyncWriteChannel.java:357) 
    at java.util.concurrent.FutureTask.run(FutureTask.java:260) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1168) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:605) 
    at com.google.apphosting.runtime.ApiProxyImpl$CurrentRequestThreadFactory$1$1.run(ApiProxyImpl.java:1152) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at com.google.apphosting.runtime.ApiProxyImpl$CurrentRequestThreadFactory$1.run(ApiProxyImpl.java:1146) 
    at java.lang.Thread.run(Thread.java:745) 
    at com.google.apphosting.runtime.ApiProxyImpl$CurrentRequestThreadFactory$2$1.run(ApiProxyImpl.java:1195) 
    

    我試着直接上傳的jar文件 - 的AppEngine-API-1.0-SDK-1.9.20.jar,它仍然試圖上傳這個罐子appengine- api-L4wtoWwoElWmstI1Ia93cg.jar。 我不知道它是什麼罐子。任何想法這個罐子是讚賞。

    請幫我解決這個問題。

  • +0

    的幾個問題 - 什麼SDK您使用的版本,你能提供一個作業ID? –

    +1

    你能分享你得到的例外嗎?您是否在本地(在appengine上)看到OOM,或者在管道啓動後是否看到OOM?如果OOM發生在appengine上,那麼你可能會碰到http://stackoverflow.com/questions/33647161/pipeline-submission-from-app-engine –

    +0

    我將appengine機器類型更改爲F2,我沒有得到OOM錯誤現在。但它試圖展示超過10mm的文件,並且它可以恢復上傳。上傳花費了大量的時間,並且我得到了超出執行時間限制的異常。對此有任何想法。 –

    回答

    4

    簡而言之,如果您在Managed VM上使用AppEngine,則不會遇到AppEngine沙箱限制(使用F1 or B1 instance class時的OOM,執行時間限制問題,列入白名單的JRE類)。如果您真的想在App Engine沙箱中運行,那麼您對Dataflow SDK的使用最符合AppEngine沙箱的限制。下面我解釋一些常見問題以及人們爲符合AppEngine沙箱限制所做的工作。

    Dataflow SDK需要一個AppEngine實例類,它具有足夠的內存來執行用戶應用程序來構建管道,暫存任何資源並將作業描述發送到Dataflow服務。通常我們看到用戶需要使用內存超過128mb的instance class才能看到OOM錯誤。

    如果您的應用程序所需的資源已經暫存,通常構建管道並將其提交給Dataflow服務通常需要不到幾秒鐘的時間。將JAR和任何其他資源上傳到GCS可能需要60秒以上的時間。這可以通過預先將您的JAR預先升級到GCS來手動解決(數據流SDK將在檢測到它們已經存在時再跳過它們)或使用task queue來獲得10分鐘的限制(請注意,對於大型應用程序,10分鐘可能不足以將您的所有資源分配)。

    最後,AppEngine上沙箱環境中,你和你的依賴僅限於使用JRE中唯一whitelisted類,否則你會得到這樣一個例外:

    java.lang.SecurityException: 
        java.lang.IllegalAccessException: YYY is not allowed on ZZZ 
        ... 
    

    編輯1

    我們在類路徑上執行jar包內容的哈希,並使用修改後的文件名將它們上傳到GCS。 AppEngine使用自己的JAR運行沙盒環境,appengine-api-L4wtoWwoElWmstI1Ia93cg.jar參考appengine-api.jar這是一個沙盒環境添加的jar。您可以從我們的PackageUtil#getUniqueContentName(...)中看到,我們只需在之前追加- $ HASH .jar

    我們正在努力解決,爲什麼你看到的RequestPayloadToLarge excepton,它是目前建議您設置filesToStage選項,並篩選出執行你的數據流不需要罐子得到解決,你所面臨的問題。您可以看到我們如何使用DataflowPipelineRunner#detectClassPathResourcesToStage(...)構建文件。

    +0

    謝謝你,我試過你的建議,Java堆大小錯誤已解決。但面臨着文件上傳錯誤的問題。我在編輯中解釋過它。請看看它。 –

    +0

    謝謝你Lukasz。我濾除了appengine-api罐子,它工作。如果RequestPayloadToLarge異常已修復,那將會很好。否則,如果任何需要的jar超過10 mb,用戶將無法使用appengine啓動管道。 –

    +0

    有沒有一種簡單的方法來過濾個別.jars,而無需像上面提到的那樣列出所有資源? –

    1

    我有10MB的限制同樣的問題。我所做的是過濾掉大於該限制的JAR文件(而不是特定文件),然後將DataflowPipelineOptions中的重命名文件設置爲setFilesToStage

    所以我剛剛從Dataflow SDK複製的方法detectClassPathResourcesToStage,改變它悅目:

    private static final long FILE_BYTES_THRESHOLD = 10 * 1024 * 1024; // 10 MB 
    
    protected static List<String> detectClassPathResourcesToStage(ClassLoader classLoader) { 
        if (!(classLoader instanceof URLClassLoader)) { 
         String message = String.format("Unable to use ClassLoader to detect classpath elements. " 
           + "Current ClassLoader is %s, only URLClassLoaders are supported.", classLoader); 
         throw new IllegalArgumentException(message); 
        } 
    
        List<String> files = new ArrayList<>(); 
        for (URL url : ((URLClassLoader) classLoader).getURLs()) { 
         try { 
          File file = new File(url.toURI()); 
          if (file.length() < FILE_BYTES_THRESHOLD) { 
           files.add(file.getAbsolutePath()); 
          } 
         } catch (IllegalArgumentException | URISyntaxException e) { 
          String message = String.format("Unable to convert url (%s) to file.", url); 
          throw new IllegalArgumentException(message, e); 
         } 
        } 
        return files; 
    } 
    

    然後當我創建了DataflowPipelineOptions

    DataflowPipelineOptions dataflowOptions = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class); 
    ... 
    dataflowOptions.setFilesToStage(detectClassPathResourcesToStage(DataflowPipelineRunner.class.getClassLoader())); 
    
    +0

    這有助於突出顯示對我來說至關重要的一點:10MiB限制是單個的.jar文件,而不是所有.jar的總大小。 – dbort

    1

    這裏是Helder版本的10MB篩選solution,它將適應DataflowPipelineOptions的默認文件分段行爲,即使它c會在未來的SDK版本中出現問題。

    而不是複製邏輯,它會將DataflowPipelineOptions的一次性副本傳遞給DataflowPipelineRunner以查看它將要執行的文件,然後刪除任何太大的文件。

    請注意,此代碼假定您已定義一個名爲MyOptions的定製PipelineOptions類以及一個名爲loggerjava.util.Logger字段。

    // The largest file size that can be staged to the dataflow service. 
    private static final long MAX_STAGED_FILE_SIZE_BYTES = 10 * 1024 * 1024; 
    
    /** 
    * Returns the list of .jar/etc files to stage based on the 
    * Options, filtering out any files that are too large for 
    * DataflowPipelineRunner. 
    * 
    * <p>If this accidentally filters out a necessary file, it should 
    * be obvious when the pipeline fails with a runtime link error. 
    */ 
    private static ImmutableList<String> getFilesToStage(MyOptions options) { 
        // Construct a throw-away runner with a copy of the Options to see 
        // which files it would have wanted to stage. This could be an 
        // explicitly-specified list of files from the MyOptions param, or 
        // the default list of files determined by DataflowPipelineRunner. 
        List<String> baseFiles; 
        { 
        DataflowPipelineOptions tmpOptions = 
         options.cloneAs(DataflowPipelineOptions.class); 
        // Ignore the result; we only care about how fromOptions() 
        // modifies its parameter. 
        DataflowPipelineRunner.fromOptions(tmpOptions); 
        baseFiles = tmpOptions.getFilesToStage(); 
        // Some value should have been set. 
        Preconditions.checkNotNull(baseFiles); 
        } 
        // Filter out any files that are too large to stage. 
        ImmutableList.Builder<String> filteredFiles = ImmutableList.builder(); 
        for (String file : baseFiles) { 
        long size = new File(file).length(); 
        if (size < MAX_STAGED_FILE_SIZE_BYTES) { 
         filteredFiles.add(file); 
        } else { 
         logger.info("Not staging large file " + file + ": length " + size 
          + " >= max length " + MAX_STAGED_FILE_SIZE_BYTES); 
        } 
        } 
        return filteredFiles.build(); 
    } 
    
    /** Runs the processing pipeline with given options. */ 
    public void runPipeline(MyOptions options) 
        throws IOException, InterruptedException { 
        // DataflowPipelineRunner can't stage large files; 
        // remove any from the list. 
        DataflowPipelineOptions dpOpts = 
         options.as(DataflowPipelineOptions.class); 
        dpOpts.setFilesToStage(getFilesToStage(options)); 
    
        // Run the pipeline as usual using "options". 
        // ... 
    } 
    
    相關問題