2

此問題是this one的後續行爲。 我想使用Apache梁從谷歌扳手錶中讀取數據(然後做一些數據處理)。在apache光束中使用SpannerIO時出錯

package com.google.cloud.dataflow.examples; 
import java.io.IOException; 
import org.apache.beam.sdk.Pipeline; 
import org.apache.beam.sdk.PipelineResult; 
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO; 
import org.apache.beam.sdk.options.PipelineOptions; 
import org.apache.beam.sdk.options.PipelineOptionsFactory; 
import org.apache.beam.sdk.values.PCollection; 
import com.google.cloud.spanner.Struct; 

public class backup { 

    public static void main(String[] args) throws IOException { 
    PipelineOptions options = PipelineOptionsFactory.create(); 

    Pipeline p = Pipeline.create(options); 
    PCollection<Struct> rows = p.apply(
      SpannerIO.read() 
       .withInstanceId("my_instance") 
       .withDatabaseId("my_db") 
       .withQuery("SELECT t.table_name FROM information_schema.tables AS t") 
       ); 

    PipelineResult result = p.run(); 
    try { 
     result.waitUntilFinish(); 
    } catch (Exception exc) { 
     result.cancel(); 
    } 
    } 
} 

當我嘗試使用DirectRunner執行代碼時,我得到了 以下錯誤消息:

org.apache.beam.runners.direct我使用了Java SDK寫了下面的最低例子.repackaged.com.google.common.util.concurrent.UncheckedExecutionException:

org.apache.beam.sdk.util.UserCodeException: java.lang.NoClassDefFoundError: Could not initialize class com.google.cloud.spanner.spi.v1.SpannerErrorInterceptor

[...] Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.NoClassDefFoundError: Could not initialize class com.google.cloud.spanner.spi.v1.SpannerErrorInterceptor

[...] Caused by: java.lang.NoClassDefFoundError: Could not initialize class com.google.cloud.spanner.spi.v1.SpannerErrorInterceptor

或者使用DataflowRunner:

org.apache.beam.runners.direct.repackaged.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.beam.sdk.util.UserCodeException: java.lang.NoSuchFieldError: internal_static_google_rpc_LocalizedMessage_fieldAccessorTable

[...] Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.NoSuchFieldError: internal_static_google_rpc_LocalizedMessage_fieldAccessorTable

[...] Caused by: java.lang.NoSuchFieldError: internal_static_google_rpc_LocalizedMessage_fieldAccessorTable

在這兩種情況下,錯誤信息都很隱祕,而且我無法找到任何關於Google搜索錯誤的原因。我也無法使用SpannerIO模塊找到任何示例腳本。

此錯誤是由於我的代碼中的明顯錯誤,還是由於谷歌雲工具的安裝不當造成的?

+2

Argh,你很可能觸及依賴衝突https://issues.apache.org/jira/browse/BEAM-2837。它是固定的,但我們需要等待新版本的光束。您可以從源代碼自己構建光束二進制文件,或者在您的pom.xml中使用這個技巧https://gist.github.com/mairbek/0c770ff7b591e3db58936b0b9294215a –

+1

哦。謝謝 !我想我會嘗試修復。 –

回答

1

你需要指定專案編號:

SpannerIO.read() 
      .withProjectId("my_project") 
      .withInstanceId("my_instance") 
      .withDatabaseId("my_db") 

而且你需要設置憑據的扳手項目。由於SpannerIO的API不允許您設置任何自定義憑據,因此您必須使用環境變量GOOGLE_APPLICATION_CREDENTIALS設置全局應用程序憑據。

您還可以使用JDBC讀取(並寫入)Cloud Spanner。讀取是這樣完成的:

 PCollection<KV<String, Long>> words = p2.apply(JdbcIO.<KV<String, Long>> read() 
      .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create("nl.topicus.jdbc.CloudSpannerDriver", 
        "jdbc:cloudspanner://localhost;Project=my-project-id;Instance=instance-id;Database=database;PvtKeyPath=C:\\Users\\MyUserName\\Documents\\CloudSpannerKeys\\cloudspanner-key.json")) 
      .withQuery("SELECT t.table_name FROM information_schema.tables AS t").withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianLongCoder.of())) 
      .withRowMapper(new JdbcIO.RowMapper<KV<String, Long>>() 
      { 
       private static final long serialVersionUID = 1L; 

       @Override 
       public KV<String, Long> mapRow(ResultSet resultSet) throws Exception 
       { 
        return KV.of(resultSet.getString(1), resultSet.getLong(2)); 
       } 
      })); 

此方法還允許您通過設置PvtKeyPath來使用自定義憑證。您也可以使用JDBC寫入Google Cloud Spanner。看看這裏的例子:http://www.googlecloudspanner.com/2017/10/google-cloud-spanner-with-apache-beam.html

+0

我確實已經忘記了「projectID」這一行,但添加它並沒有解決這個錯誤。 事實上,我使用的是Eclipse谷歌雲工具插件,並且已經登錄到我的谷歌賬戶。所以這應該照顧證書? 我可能不得不嘗試JDBC版本。 –

1

這個問題最有可能是由這裏描述的依賴性兼容性問題引起的:BEAM-2837。下面是在JIRA問題的意見,一個描述一個快速的解決方法:

<dependency> 
    <groupId>com.google.api.grpc</groupId> 
    <artifactId>grpc-google-common-protos</artifactId> 
    <version>0.1.9</version> 
</dependency> 

<dependency> 
    <groupId>org.apache.beam</groupId> 
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId> 
    <version>${beam.version}</version> 
    <exclusions> 
     <exclusion> 
      <groupId>com.google.api.grpc</groupId> 
      <artifactId>grpc-google-common-protos</artifactId> 
     </exclusion> 
    </exclusions> 
</dependency> 

明確定義所需的com.google.api.grpc依賴,從org.apache.beam排除版本。

+0

謝謝!老實說,我最終轉向使用python SDK,併爲讀寫器寫一個自定義的ParDo。 –

相關問題