2017-06-22 51 views
2

我很努力地使用JdbcIO和Apache Beam 2.0(Java)在同一個項目中連接到Dataflow的Cloud SQL實例。從Dataflow作業連接到雲端SQL作業

,我發現了以下錯誤:

java.sql.SQLException: Cannot create PoolableConnectionFactory (Communications link failure 

The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.) 
  • 根據文檔數據流服務帳戶*@dataflow-service-producer-prod.iam.gserviceaccount.com應該有訪問所有如果他擁有「編輯者」權限,則可以在同一個項目中找到資源。

  • 當我使用DirectRunner運行相同的Dataflow作業時,一切正常。

這是我使用的代碼:

private static String JDBC_URL = "jdbc:mysql://myip:3306/mydb?verifyServerCertificate=false&useSSL=true"; 

PCollection < KV < String, Double >> exchangeRates = p.apply(JdbcIO. < KV < String, Double >> read() 
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create("com.mysql.jdbc.Driver", JDBC_URL) 
    .withUsername(JDBC_USER).withPassword(JDBC_PW)) 
.withQuery(
    "SELECT CurrencyCode, ExchangeRate FROM mydb.mytable") 
.withCoder(KvCoder.of(StringUtf8Coder.of(), DoubleCoder.of())) 
.withRowMapper(new JdbcIO.RowMapper < KV < String, Double >>() { 
    public KV < String, Double > mapRow(ResultSet resultSet) throws Exception { 
    return KV.of(resultSet.getString(1), resultSet.getDouble(2)); 
    } 
})); 

編輯:

使用其他數據流任務中梁的外部設置方法似乎很好地工作DataflowRunner告訴我認爲數據庫可能不是問題。

java.sql.Connection connection = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PW); 

回答

1

我覺得這個方法可以更好地工作,請嘗試com.mysql.jdbc.GoogleDriver,並使用這裏列出的Maven依賴。

https://cloud.google.com/appengine/docs/standard/java/cloud-sql/#Java_Connect_to_your_database

相關問題: Where i find and download this jar file com.mysql.jdbc.GoogleDriver?

+0

嘿@亞歷克斯·阿馬託,不幸的是,似乎沒有與gcp數據流一起工作,因爲我得到一個「java.sql.SQLException:無法加載JDBC驅動程序類'com.mysql.jdbc.GoogleDriver'」錯誤,即使這兩個maven依賴關係都已添加。 – Jimmy

0

喜它的方式ü沒有it.Additionaly我刪除從數據庫配置方法withusername和密碼的方法和我的流水線配置工作對我來說看起來像下面

PCollection < KV < Double, Double >> exchangeRates = p.apply(JdbcIO. < KV < Double, Double >> read() 
    .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create("com.mysql.jdbc.Driver", "jdbc:mysql://ip:3306/dbname?user=root&password=root&useUnicode=true&characterEncoding=UTF-8") 
      ) 
    .withQuery(
     "SELECT PERIOD_YEAR, PERIOD_YEAR FROM SALE") 
    .withCoder(KvCoder.of(DoubleCoder.of(), DoubleCoder.of())) 
    .withRowMapper(new JdbcIO.RowMapper < KV < Double, Double >>() { 
     @Override 
     public KV<Double, Double> mapRow(java.sql.ResultSet resultSet) throws Exception { 
     LOG.info(resultSet.getDouble(1)+ "Came"); 
      return KV.of(resultSet.getDouble(1), resultSet.getDouble(2)); 
     } 
    })); 

希望這將有助於