2017-07-09 120 views
0

我想用弗林克-JDBC從MySQL中獲取數據。 我看到的阿帕奇弗林克網站爲例弗林克JDBCInputFormat找不到方法「setRowTypeInfo」

// Read data from a relational database using the JDBC input format 
DataSet<Tuple2<String, Integer> dbData = 
    env.createInput(
     JDBCInputFormat.buildJDBCInputFormat() 
        .setDrivername("org.apache.derby.jdbc.EmbeddedDriver") 
        .setDBUrl("jdbc:derby:memory:persons") 
        .setQuery("select name, age from persons") 
        .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO)) 
        .finish() 
    ); 

但是當我嘗試寫一個demo,我找不到'setRowTypeInfo'方法。 正是這樣

import org.apache.flink.api.common.typeinfo.BasicTypeInfo 
import org.apache.flink.api.java.ExecutionEnvironment 
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat 
import org.apache.flink.api.scala._ 

/** 
    * Created by lulijun on 17/7/7. 
    */ 
object FlinkJDBC { 


    def main(args:Array[String]): Unit = { 

    val env = ExecutionEnvironment.createLocalEnvironment() 

    val dbData = env.createInput(
     JDBCInputFormat.buildJDBCInputFormat 
     .setDrivername("com.mysql.jdbc.Driver") 
     .setDBUrl("XXX") 
     .setUsername("xxx") 
     .setPassword("XXX") 
     .setQuery("select name, age from persons") 
     .setRowTypeInfo(new Nothing(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO)) 
     .finish) 

    dbData.print() 

    env.execute() 
    } 

} 

的「setRowTypeInfo」方法始終是紅色的,這種想法提示 「不能解析符號setRowTypeInfo」

弗林克-JDBC的罐子版本我使用的是1.0.0。

<dependencies> 
    <!-- Use this dependency if you are using the DataSet API --> 
    <dependency> 
     <groupId>org.apache.flink</groupId> 
     <artifactId>flink-scala_2.10</artifactId> 
     <version>1.3.0</version> 
    </dependency> 
    <dependency> 
     <groupId>org.apache.flink</groupId> 
     <artifactId>flink-clients_2.10</artifactId> 
     <version>1.3.0</version> 
    </dependency> 
    <dependency> 
     <groupId>org.apache.flink</groupId> 
     <artifactId>flink-jdbc</artifactId> 
     <version>1.0.0</version> 
    </dependency> 
    <dependency> 
     <groupId>mysql</groupId> 
     <artifactId>mysql-connector-java</artifactId> 
     <version>5.1.36</version> 
    </dependency> 
</dependencies> 

我搜查了很多,大多數人使用的方法完全像官方文件,但在一個提到這個問題。 我懷疑我是否使用了錯誤版本的flink-jdbc,但我無法獲得有關使用flink-jdbc的正確方法的任何信息。 如果你知道這個問題,請教我。謝謝。

+0

我試着將flink-jdbc版本更改爲flink-jdbc_2.11/0.10.2,並將所有其他flink jar更改爲scala 2.11。但仍然找不到'setRowTypeInfo'方法。每個人如何正確訪問這個方法? – lulijun

回答

0

我將flink-jdbc版本從1.0.0更改爲1.3.0,並解決了問題。 但是,當我在maven websit https://mvnrepository.com/search?q=flink-jdbc上搜索flink-jdbc時,我無法在前幾頁得到正確的信息,這讓我認爲flink-jdbc的版本不需要與其他flink jar相匹配。 但事實是flink-jdbc/1.1.3使用了包api.table的類RowTypeInfo,而flink-jdbc/1.3.0使用了包api.java的類RowTypeInfo,它們之間有着密切的聯繫。 我們必須確保版本匹配。