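When I run my pipeline from my local machine, I can update a table that resides in a Cloud SQL instance. But when I move it to run with DataflowRunner, the same pipeline fails with the exception below. How can I run a Beam class on Dataflow that accesses a Google Cloud SQL instance?
Connecting from Eclipse, I created the data source configuration as .create("com.mysql.jdbc.Driver", "jdbc:mysql://<ip of sql instance>:3306/mydb").
For running with the Dataflow runner, I changed it to .create("com.mysql.jdbc.GoogleDriver", "jdbc:google:mysql://<project-id>:<instance-name>/my-db").
- Should I prefix the instance name with its region?
The exception I get when I run this is as follows: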
Jun 22, 2017 6:53:58 PM org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process
INFO: 2017-06-22T13:23:51.583Z: (840be37ab35d3d0d): Starting 2 workers in us-central1-f...
Jun 22, 2017 6:53:58 PM org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process
INFO: 2017-06-22T13:23:51.634Z: (dabfae1dc9365d10): Executing operation JdbcIO.Read/Create.Values/Read(CreateSource)+JdbcIO.Read/ParDo(Read)+JdbcIO.Read/ParDo(Anonymous)+JdbcIO.Read/GroupByKey/Reify+JdbcIO.Read/GroupByKey/Write
Jun 22, 2017 6:54:49 PM org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process
INFO: 2017-06-22T13:24:44.762Z: (21395b94f8bf7f61): Workers have started successfully.
SEVERE: 2017-06-22T13:25:30.214Z: (3b988386f963503e): java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.sql.SQLException: Cannot load JDBC driver class 'com.mysql.jdbc.GoogleDriver'
at com.google.cloud.dataflow.worker.runners.worker.MapTaskExecutorFactory$3.typedApply(MapTaskExecutorFactory.java:289)
at com.google.cloud.dataflow.worker.runners.worker.MapTaskExecutorFactory$3.typedApply(MapTaskExecutorFactory.java:261)
at com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:55)
at com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:43)
at com.google.cloud.dataflow.worker.graph.Networks.replaceDirectedNetworkNodes(Networks.java:78)
at com.google.cloud.dataflow.worker.runners.worker.MapTaskExecutorFactory.create(MapTaskExecutorFactory.java:152)
at com.google.cloud.dataflow.worker.runners.worker.DataflowWorker.doWork(DataflowWorker.java:272)
at com.google.cloud.dataflow.worker.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:244)
at com.google.cloud.dataflow.worker.runners.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:125)
at com.google.cloud.dataflow.worker.runners.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:105)
at com.google.cloud.dataflow.worker.runners.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:92)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.beam.sdk.util.UserCodeException: java.sql.SQLException: Cannot load JDBC driver class 'com.mysql.jdbc.GoogleDriver'
at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
at org.apache.beam.sdk.io.jdbc.JdbcIO$Read$ReadFn$auxiliary$M7MKjX9p.invokeSetup(Unknown Source)
at com.google.cloud.dataflow.worker.runners.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.deserializeCopy(DoFnInstanceManagers.java:65)
at com.google.cloud.dataflow.worker.runners.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.peek(DoFnInstanceManagers.java:47)
at com.google.cloud.dataflow.worker.runners.worker.UserParDoFnFactory.create(UserParDoFnFactory.java:100)
at com.google.cloud.dataflow.worker.runners.worker.DefaultParDoFnFactory.create(DefaultParDoFnFactory.java:70)
at com.google.cloud.dataflow.worker.runners.worker.MapTaskExecutorFactory.createParDoOperation(MapTaskExecutorFactory.java:365)
at com.google.cloud.dataflow.worker.runners.worker.MapTaskExecutorFactory$3.typedApply(MapTaskExecutorFactory.java:278)
... 14 more
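The SEVERE entry above reports that the named driver class could not be loaded where the JdbcIO read was set up. As a minimal sketch of how to check whether a class name is visible on a given classpath (`DriverCheck` and `isLoadable` are hypothetical names, not part of Beam or Dataflow), something like the following could be used. Note that running it locally only reflects the local classpath, not the JARs staged to the Dataflow workers.

```java
public class DriverCheck {

    // Returns true if the named class can be loaded by the current class loader.
    // Hypothetical helper for illustration only.
    public static boolean isLoadable(String className) {
        try {
            Class.forName(className);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // The class (or one of its dependencies) is not available here.
            return false;
        }
    }

    public static void main(String[] args) {
        // The two driver class names mentioned in the question.
        System.out.println("com.mysql.jdbc.Driver loadable: "
                + isLoadable("com.mysql.jdbc.Driver"));
        System.out.println("com.mysql.jdbc.GoogleDriver loadable: "
                + isLoadable("com.mysql.jdbc.GoogleDriver"));
    }
}
```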
Any help resolving this is really appreciated. This is my first attempt at running a Beam pipeline as a Dataflow job.
// Declare the options with the Dataflow type so no casts are needed.
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setNumWorkers(2);
options.setProject("xxxxx");
options.setStagingLocation("gs://xxxx/staging");
options.setRunner(DataflowRunner.class);
options.setStreaming(false);
options.setTempLocation("gs://xxxx/tempbucket");
options.setJobName("sqlpipeline");
// The pipeline object used below is created from these options.
Pipeline dataflowPipeline = Pipeline.create(options);
PCollection<Account> collection = dataflowPipeline.apply(JdbcIO.<Account>read()
    .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration
        .create("com.mysql.jdbc.GoogleDriver", "jdbc:google:mysql://project-id:testdb/db")
        .withUsername("root").withPassword("root"))
    .withQuery(
        "select account_id,account_parent,account_description,account_type,account_rollup,Custom_Members from account")
    .withCoder(AvroCoder.of(Account.class))
    .withStatementPreparator(new JdbcIO.StatementPreparator() {
        public void setParameters(PreparedStatement preparedStatement) throws Exception {
            preparedStatement.setFetchSize(1);
            preparedStatement.setFetchDirection(ResultSet.FETCH_FORWARD);
        }
    })
    .withRowMapper(new JdbcIO.RowMapper<Account>() {
        // Maps one row of the result set to an Account object.
        public Account mapRow(ResultSet resultSet) throws Exception {
            Account account = new Account();
            account.setAccount_id(resultSet.getInt("account_id"));
            account.setAccount_parent(resultSet.getInt("account_parent"));
            account.setAccount_description(resultSet.getString("account_description"));
            account.setAccount_type(resultSet.getString("account_type"));
            // Read the column values rather than storing the column names as literals.
            account.setAccount_rollup(resultSet.getString("account_rollup"));
            account.setCustom_Members(resultSet.getString("Custom_Members"));
            return account;
        }
    }));
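On the question of the region prefix: one alternative to the `GoogleDriver` URL is to keep `com.mysql.jdbc.Driver` and connect through the Cloud SQL MySQL socket factory, whose JDBC URL takes an instance connection name of the form `project:region:instance`. The following is only a sketch of how such a URL is assembled. It assumes the socket factory library is added as a dependency so that it is staged with the job, and `CloudSqlJdbcUrl`, `build`, and the placeholder values are hypothetical names used for illustration.

```java
public class CloudSqlJdbcUrl {

    // Socket factory class from the Cloud SQL MySQL socket factory library
    // (assumed to be on the classpath at runtime; not verified for this job).
    static final String SOCKET_FACTORY = "com.google.cloud.sql.mysql.SocketFactory";

    // Builds a JDBC URL for use with com.mysql.jdbc.Driver.
    // instanceConnectionName is expected in the form "project:region:instance".
    public static String build(String instanceConnectionName, String database) {
        return "jdbc:mysql://google/" + database
                + "?cloudSqlInstance=" + instanceConnectionName
                + "&socketFactory=" + SOCKET_FACTORY;
    }

    public static void main(String[] args) {
        // Placeholder values; substitute the real project, region, and instance.
        System.out.println(build("project-id:<region>:testdb", "db"));
    }
}
```

The resulting string would be passed as the second argument to `JdbcIO.DataSourceConfiguration.create(...)`, with `com.mysql.jdbc.Driver` as the first.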
Thanks, Pablo, for reformatting. (Y) – Balu