2016-07-28 66 views
0

我正在查詢使用Flink DataSet API的Oracle數據庫。爲此,我定製了Flink JDBCInputFormat以返回java.sql.Resultset。因爲我需要使用Flink運算符對結果集進行進一步的操作。Apache Flink JDBC InputFormat引發java.net.SocketException:套接字關閉

public static void main(String[] args) throws Exception { 

    ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment(); 
    environment.setParallelism(1); 
    @SuppressWarnings("unchecked") 
    DataSource<ResultSet> source 
      = environment.createInput(JDBCInputFormat.buildJDBCInputFormat() 
        .setUsername("username") 
        .setPassword("password") 
        .setDrivername("driver_name") 
        .setDBUrl("jdbcUrl") 
        .setQuery("query") 
        .finish(),  
        new GenericTypeInfo<ResultSet>(ResultSet.class) 
      ); 
    source.print(); 

    environment.execute(); 

} 

以下是定製JDBCInputFormat

public class JDBCInputFormat extends RichInputFormat<ResultSet, InputSplit> implements ResultTypeQueryable { 

@Override 
public void open(InputSplit inputSplit) throws IOException { 
       Class.forName(drivername); 
        dbConn = DriverManager.getConnection(dbURL, username, password); 
       statement = dbConn.prepareStatement(queryTemplate, resultSetType, resultSetConcurrency); 
       resultSet = statement.executeQuery(); 
} 

@Override 
public void close() throws IOException { 
      if(statement != null) { 
        statement.close(); 
       } 
       if(resultSet != null) 
        resultSet.close(); 
       if(dbConn != null) { 
        dbConn.close(); 
       } 
} 

@Override 
public boolean reachedEnd() throws IOException { 
     isLastRecord = resultSet.isLast(); 
    return isLastRecord; 
} 

@Override 
public ResultSet nextRecord(ResultSet row) throws IOException{ 
     if(!isLastRecord){    
      resultSet.next(); 
     } 
     return resultSet; 
} 

}

這適用於下面有限制查詢該行中讀取的: 選擇A,B,從XYZÇ其中rownum < = 10; 但是當我嘗試有大約百萬數據的獲取所有行,我得到下面的異常取隨機數行後:

java.sql.SQLRecoverableException: Io exception: Socket closed 
at oracle.jdbc.driver.SQLStateMapping.newSQLException(SQLStateMapping.java:101) 
at oracle.jdbc.driver.DatabaseError.newSQLException(DatabaseError.java:133) 
at oracle.jdbc.driver.DatabaseError.throwSqlException(DatabaseError.java:199) 
at oracle.jdbc.driver.DatabaseError.throwSqlException(DatabaseError.java:263) 
at oracle.jdbc.driver.DatabaseError.throwSqlException(DatabaseError.java:521) 
at oracle.jdbc.driver.T4CPreparedStatement.fetch(T4CPreparedStatement.java:1024) 
at oracle.jdbc.driver.OracleResultSetImpl.close_or_fetch_from_next(OracleResultSetImpl.java:314) 
at oracle.jdbc.driver.OracleResultSetImpl.next(OracleResultSetImpl.java:228) 
at oracle.jdbc.driver.ScrollableResultSet.cacheRowAt(ScrollableResultSet.java:1839) 
at oracle.jdbc.driver.ScrollableResultSet.isValidRow(ScrollableResultSet.java:1823) 
at oracle.jdbc.driver.ScrollableResultSet.isLast(ScrollableResultSet.java:349) 
at JDBCInputFormat.reachedEnd(JDBCInputFormat.java:98) 
at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:173) 
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) 
at java.lang.Thread.run(Thread.java:745) 

產生的原因:java.net.SocketException異常:套接字關閉 在java.net.SocketOutputStream.socketWrite0(本地方法)

所以對於我的情況,我該如何解決這個問題?

回答

1

我不認爲有可能像定期記錄一樣運送ResultSet。這是一個內部維護與數據庫服務器的連接的有狀態對象。使用ResultSet作爲在Flink操作符之間傳輸的記錄意味着它可以被序列化,通過網絡發送到另一臺機器,反序列化,並交給另一個JVM進程中的另一個線程。這是行不通的。

根據連接的不同,ResultSet可能會保持在同一臺機器上的同一個線程中,這可能適用於您。如果您想從運營商內部查詢數據庫,則可以將該功能實現爲RichMapPartitionFunction。否則,我會讀取數據源中的ResultSet並轉發結果行。