
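When I run my pipeline from my local machine, I can update a table that resides in a Cloud SQL instance. However, when I switch to running it with the DataflowRunner, the job fails with the exception below. How can I run a Beam pipeline on Dataflow that accesses a Google Cloud SQL instance?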

When connecting from Eclipse, I created the data source configuration as .create("com.mysql.jdbc.Driver", "jdbc:mysql://<ip of sql instance > :3306/mydb")

For running with the Dataflow runner, I changed it to .create("com.mysql.jdbc.GoogleDriver", "jdbc:google:mysql://<project-id>:<instance-name>/my-db")

  1. Should I also include the instance's region in the instance name?
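For reference, second-generation Cloud SQL instances are identified by an instance connection name of the form project:region:instance, so the region is part of that name. The sketch below only assembles strings: it shows the App Engine style URL used with com.mysql.jdbc.GoogleDriver, and the URL form that the mysql-socket-factory library documents for use with the plain com.mysql.jdbc.Driver. The class name CloudSqlUrls and the placeholder project, region, instance, and database values are hypothetical; check the Cloud SQL documentation for your instance type before relying on either format.

```java
/**
 * Hypothetical helper (not part of Beam or any Cloud SQL library) that builds
 * the two JDBC URL shapes discussed in this question from an instance
 * connection name.
 */
public final class CloudSqlUrls {

    private CloudSqlUrls() {}

    /** Second-generation instance connection name: project:region:instance. */
    public static String connectionName(String project, String region, String instance) {
        return project + ":" + region + ":" + instance;
    }

    /** URL form used with com.mysql.jdbc.GoogleDriver (App Engine deployments). */
    public static String appEngineUrl(String connectionName, String database) {
        return "jdbc:google:mysql://" + connectionName + "/" + database;
    }

    /** URL form used with com.mysql.jdbc.Driver plus the mysql-socket-factory jar. */
    public static String socketFactoryUrl(String connectionName, String database) {
        return "jdbc:mysql://google/" + database
                + "?cloudSqlInstance=" + connectionName
                + "&socketFactory=com.google.cloud.sql.mysql.SocketFactory";
    }

    public static void main(String[] args) {
        // Placeholder values: substitute your own project, region, instance and database.
        String cn = connectionName("my-project", "us-central1", "testdb");
        System.out.println(appEngineUrl(cn, "db"));
        System.out.println(socketFactoryUrl(cn, "db"));
    }
}
```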

The exception I get when I run it is as follows:

Jun 22, 2017 6:53:58 PM org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process 
INFO: 2017-06-22T13:23:51.583Z: (840be37ab35d3d0d): Starting 2 workers in us-central1-f... 
Jun 22, 2017 6:53:58 PM org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process 
INFO: 2017-06-22T13:23:51.634Z: (dabfae1dc9365d10): Executing operation JdbcIO.Read/Create.Values/Read(CreateSource)+JdbcIO.Read/ParDo(Read)+JdbcIO.Read/ParDo(Anonymous)+JdbcIO.Read/GroupByKey/Reify+JdbcIO.Read/GroupByKey/Write 
Jun 22, 2017 6:54:49 PM org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process 
INFO: 2017-06-22T13:24:44.762Z: (21395b94f8bf7f61): Workers have started successfully. 

SEVERE: 2017-06-22T13:25:30.214Z: (3b988386f963503e): java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.sql.SQLException: Cannot load JDBC driver class 'com.mysql.jdbc.GoogleDriver' 
    at com.google.cloud.dataflow.worker.runners.worker.MapTaskExecutorFactory$3.typedApply(MapTaskExecutorFactory.java:289) 
    at com.google.cloud.dataflow.worker.runners.worker.MapTaskExecutorFactory$3.typedApply(MapTaskExecutorFactory.java:261) 
    at com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:55) 
    at com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:43) 
    at com.google.cloud.dataflow.worker.graph.Networks.replaceDirectedNetworkNodes(Networks.java:78) 
    at com.google.cloud.dataflow.worker.runners.worker.MapTaskExecutorFactory.create(MapTaskExecutorFactory.java:152) 
    at com.google.cloud.dataflow.worker.runners.worker.DataflowWorker.doWork(DataflowWorker.java:272) 
    at com.google.cloud.dataflow.worker.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:244) 
    at com.google.cloud.dataflow.worker.runners.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:125) 
    at com.google.cloud.dataflow.worker.runners.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:105) 
    at com.google.cloud.dataflow.worker.runners.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:92) 
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: org.apache.beam.sdk.util.UserCodeException: java.sql.SQLException: Cannot load JDBC driver class 'com.mysql.jdbc.GoogleDriver' 
    at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36) 
    at org.apache.beam.sdk.io.jdbc.JdbcIO$Read$ReadFn$auxiliary$M7MKjX9p.invokeSetup(Unknown Source) 
    at com.google.cloud.dataflow.worker.runners.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.deserializeCopy(DoFnInstanceManagers.java:65) 
    at com.google.cloud.dataflow.worker.runners.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.peek(DoFnInstanceManagers.java:47) 
    at com.google.cloud.dataflow.worker.runners.worker.UserParDoFnFactory.create(UserParDoFnFactory.java:100) 
    at com.google.cloud.dataflow.worker.runners.worker.DefaultParDoFnFactory.create(DefaultParDoFnFactory.java:70) 
    at com.google.cloud.dataflow.worker.runners.worker.MapTaskExecutorFactory.createParDoOperation(MapTaskExecutorFactory.java:365) 
    at com.google.cloud.dataflow.worker.runners.worker.MapTaskExecutorFactory$3.typedApply(MapTaskExecutorFactory.java:278) 
    ... 14 more 

Any help resolving this is really appreciated. This is my first attempt at running a Beam pipeline as a Dataflow job.

import java.sql.PreparedStatement;
import java.sql.ResultSet;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;

    // PipelineOptionsFactory.as() already returns DataflowPipelineOptions, so no casts are needed.
    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
    options.setNumWorkers(2);
    options.setProject("xxxxx");
    options.setStagingLocation("gs://xxxx/staging");
    options.setRunner(DataflowRunner.class);
    options.setStreaming(false);
    options.setTempLocation("gs://xxxx/tempbucket");
    options.setJobName("sqlpipeline");

    Pipeline dataflowPipeline = Pipeline.create(options);

    PCollection<Account> collection = dataflowPipeline.apply(JdbcIO.<Account>read()
        .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration
            .create("com.mysql.jdbc.GoogleDriver", "jdbc:google:mysql://project-id:testdb/db")
            .withUsername("root")
            .withPassword("root"))
        .withQuery("select account_id, account_parent, account_description, account_type, account_rollup, Custom_Members from account")
        .withCoder(AvroCoder.of(Account.class))
        .withStatementPreparator(new JdbcIO.StatementPreparator() {
            @Override
            public void setParameters(PreparedStatement preparedStatement) throws Exception {
                preparedStatement.setFetchSize(1);
                preparedStatement.setFetchDirection(ResultSet.FETCH_FORWARD);
            }
        })
        .withRowMapper(new JdbcIO.RowMapper<Account>() {
            @Override
            public Account mapRow(ResultSet resultSet) throws Exception {
                Account account = new Account();
                account.setAccount_id(resultSet.getInt("account_id"));
                account.setAccount_parent(resultSet.getInt("account_parent"));
                account.setAccount_description(resultSet.getString("account_description"));
                account.setAccount_type(resultSet.getString("account_type"));
                // Read the column values rather than storing the literal column names.
                account.setAccount_rollup(resultSet.getString("account_rollup"));
                account.setCustom_Members(resultSet.getString("Custom_Members"));
                return account;
            }
        }));

Thanks, Pablo, for reformatting. (Y) – Balu

Answers


Are you correctly pulling in the com.google.cloud.sql/mysql-socket-factory Maven dependency? It looks like the driver class is not being loaded.

https://cloud.google.com/appengine/docs/standard/java/cloud-sql/#Java_Connect_to_your_database
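One quick way to test the "class is not being loaded" theory is to ask the class loader for each driver class directly, since the error in the stack trace reports that the class named in the data source configuration could not be loaded. The sketch below is a hypothetical diagnostic (the class name DriverCheck is not from any library); run it with the same classpath you package for the Dataflow job.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical diagnostic: reports whether JDBC driver classes are on the classpath. */
public final class DriverCheck {

    private DriverCheck() {}

    /** Returns true if the class can be found; initialize=false avoids running static initializers. */
    public static boolean isLoadable(String className) {
        try {
            Class.forName(className, false, Thread.currentThread().getContextClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    /** Checks each name and keeps the results in the order given. */
    public static Map<String, Boolean> check(String... classNames) {
        Map<String, Boolean> result = new LinkedHashMap<>();
        for (String name : classNames) {
            result.put(name, isLoadable(name));
        }
        return result;
    }

    public static void main(String[] args) {
        check("com.mysql.jdbc.Driver", "com.mysql.jdbc.GoogleDriver")
                .forEach((name, ok) -> System.out.println(name + " -> " + (ok ? "found" : "MISSING")));
    }
}
```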


I'm using mysql-socket-factory version 1.0.2. My code had an older version, but even after updating to 1.0.2 I get the same error: "Cannot load JDBC driver class 'com.mysql.jdbc.GoogleDriver'" – Balu


Hi Balu, can you check your build output to see whether you have a jar that contains the com.mysql.jdbc.GoogleDriver class? You can also print the Maven dependency tree to see whether there are any conflicts: https://maven.apache.org/plugins/maven-dependency-plugin/examples/resolving-conflicts-using-the-dependency-tree.html http://maven.apache.org/plugins/maven-dependency-plugin/tree-mojo.html –
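Besides inspecting the dependency tree, you can ask the class loader where a given class file would come from. This minimal sketch (the class name WhichJar is hypothetical) looks up the .class resource and prints its URL, which points at the jar, directory, or runtime image that supplies it, or reports that it is not on the classpath at all.

```java
import java.net.URL;

/** Hypothetical diagnostic: shows which jar or directory would supply a class. */
public final class WhichJar {

    private WhichJar() {}

    /** Returns the URL of the class file resource, or null if it is not on the classpath. */
    public static URL locate(String className) {
        String resource = className.replace('.', '/') + ".class";
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        if (loader == null) {
            loader = ClassLoader.getSystemClassLoader();
        }
        return loader.getResource(resource);
    }

    public static void main(String[] args) {
        String[] names = args.length > 0 ? args : new String[] {"com.mysql.jdbc.GoogleDriver"};
        for (String name : names) {
            URL where = locate(name);
            System.out.println(name + " -> " + (where == null ? "not found on classpath" : where));
        }
    }
}
```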


Hi, I think it's better to go ahead with "com.mysql.jdbc.Driver", because the Google driver is supported for App Engine deployments.

For reference, this is what my pipeline configuration looks like, and it works perfectly for me:

PCollection<KV<Double, Double>> exchangeRates = p.apply(JdbcIO.<KV<Double, Double>>read()
    .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
        "com.mysql.jdbc.Driver",
        "jdbc:mysql://ip:3306/dbname?user=root&password=root&useUnicode=true&characterEncoding=UTF-8"))
    .withQuery("SELECT PERIOD_YEAR, PERIOD_YEAR FROM SALE")
    .withCoder(KvCoder.of(DoubleCoder.of(), DoubleCoder.of()))
    .withRowMapper(new JdbcIO.RowMapper<KV<Double, Double>>() {
        @Override
        public KV<Double, Double> mapRow(java.sql.ResultSet resultSet) throws Exception {
            LOG.info(resultSet.getDouble(1) + "Came");
            return KV.of(resultSet.getDouble(1), resultSet.getDouble(2));
        }
    }));

Hope this helps.
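The configuration above puts the credentials and connection options directly into the JDBC URL as query parameters. If you assemble such a URL from separate values, a minimal sketch like the one below keeps the parameter order and percent-encodes each key and value, so a password containing characters such as & does not break the query string. The class name JdbcUrlBuilder is hypothetical; it uses URLEncoder.encode(String, Charset), which needs Java 10 or later, and it assumes your MySQL driver version decodes percent-encoded property values, which is worth verifying for your Connector/J release.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper that appends encoded query parameters to a JDBC base URL. */
public final class JdbcUrlBuilder {

    private JdbcUrlBuilder() {}

    /** Appends "?k1=v1&k2=v2..." in the map's iteration order, encoding keys and values. */
    public static String withParams(String baseUrl, Map<String, String> params) {
        if (params.isEmpty()) {
            return baseUrl;
        }
        StringJoiner query = new StringJoiner("&", baseUrl + "?", "");
        for (Map.Entry<String, String> e : params.entrySet()) {
            query.add(URLEncoder.encode(e.getKey(), StandardCharsets.UTF_8) + "="
                    + URLEncoder.encode(e.getValue(), StandardCharsets.UTF_8));
        }
        return query.toString();
    }

    public static void main(String[] args) {
        // LinkedHashMap preserves insertion order, matching the URL in the answer above.
        Map<String, String> params = new LinkedHashMap<>();
        params.put("user", "root");
        params.put("password", "root");
        params.put("useUnicode", "true");
        params.put("characterEncoding", "UTF-8");
        System.out.println(withParams("jdbc:mysql://ip:3306/dbname", params));
    }
}
```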
