2016-11-10 50 views
1

以下程序將連接到Oracle 11g並獲取記錄。如何在pipeline.apply()中給我編碼器的Nu​​llPointerException。JDBC從Oracle獲取Beam

我已將ojdbc14.jar添加到項目依賴項中。

public static void main(String[] args) { 

     Pipeline p = Pipeline.create(PipelineOptionsFactory.create());  
     p.apply(JdbcIO.<KV<Integer, String>>read() 
        .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
          "oracle.jdbc.driver.OracleDriver", "jdbc:oracle:thin:@hostdnsname:port/servicename") 
        .withUsername("uname") 
        .withPassword("pwd")) 
        .withQuery("select EMPID,NAME from EMPLOYEE1") 
        .withRowMapper(new JdbcIO.RowMapper<KV<Integer, String>>() { 
        public KV<Integer, String> mapRow(ResultSet resultSet) throws Exception { 
         return KV.of(resultSet.getInt(1), resultSet.getString(2)); 
        } 
        })); 
     p.run(); 

    } 

給出下面的錯誤。任何線索?

Exception in thread "main" java.lang.NullPointerException: coder 
    at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:228) 
    at org.apache.beam.sdk.io.jdbc.JdbcIO$Read.validate(JdbcIO.java:283) 
    at org.apache.beam.sdk.io.jdbc.JdbcIO$Read.validate(JdbcIO.java:216) 
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:399) 
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:307) 
    at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:47) 
    at org.apache.beam.sdk.Pipeline.apply(Pipeline.java:158) 
    at org.apache.beam.examples.v030.JdbcUtil.main(JdbcUtil.java:21) 

回答

1

您好!

對不起,錯誤消息不是很有幫助,但實際上它是一個驗證步驟。我已經提交BEAM-959來改善這一點。

。你需要提供一個編碼器,如通過

.withCoder(KvCoder.of(VarIntCoder.of(), StringUtf8Coder.of())` 

我已經申請BEAM-960改善這種編碼器的自動化,就像我們在梁大部等地。

+0

我上面編輯了你的帖子,做了以上更改。它給出了以下錯誤。線程「main」中的異常org.apache.beam.runners.direct.repackaged.com.google.common.util.concurrent.UncheckedExecutionException:org.apache.beam.sdk.util.UserCodeException:java.lang.AbstractMethodError:oracle。 jdbc.driver.T4CConnection.isValid(I)Z – naga

+0

有什麼我失蹤? – naga

+0

@ Kenn,要解決問題,請告訴我您是否需要任何其他信息。 – naga

0

試試這個。

pipeline.apply((JdbcIO.<KV<Integer, String>>read().withCoder(KvCoder.of(VarIntCoder.of(),StringUtf8Coder.of())) 
       .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
         "com.mysql.jdbc.Driver", "jdbc:mysql://localhost:3306/deepakgoyal") 
        .withUsername("root") 
        .withPassword("root")) 
       .withQuery("select empid, name from employee") 

       .withRowMapper(new JdbcIO.RowMapper<KV<Integer, String>>() { 
       public KV<Integer, String> mapRow(ResultSet resultSet) throws Exception { 
        return KV.of(resultSet.getInt(1), resultSet.getString(2)); 
       } 
       }) 
      )) 

並且不要忘記在您的項目中添加mySql Connector jar。提前致謝。

+0

嘗試向提供的代碼添加一些上下文。 – Alexei

+0

道歉,但我不明白你到底想要什麼。 –