2012-06-26 34 views
9

我需要返回SQL SELECT查詢的結果爲InputStream參數通過網絡將結果發送另一個系統的Java函數。Java的SQL結果的InputStream

但是,InputStream必須是帶有自定義分隔符(即通常但不總是CSV)的String

雖然我可以很容易地創建一個函數來檢索結果,創建一個分隔String,最後轉換是StringInputStream,SQL結果往往會過於大的內存來處理。此外,在返回結果之前處理整個結果集將導致不必要的等待時間。

我怎樣才能返回InputStream遍歷SQL結果,並因爲它是從數據庫返回發送處理(分隔)的數據?

+0

你看過使用jdbc緩存行集嗎?這可能對你想要做的事情有幫助。 http://docs.oracle.com/javase/1.5.0/docs/api/javax/sql/rowset/CachedRowSet.html – ChadNC

+0

不,但那會怎樣幫助我?問題不在於斷開連接,而是在內存中導致結果。 –

+0

這就是緩存的行集。提供了一種更簡單的方法,通過網絡將查詢結果發送到其他設備,應用程序等。 – ChadNC

回答

8

過帳(未測試)的代碼段,這應該給你基本思路:

/** 
* Implementors of this interface should only convert current row to byte array and return it. 
* 
* @author yura 
*/ 
public interface RowToByteArrayConverter { 
    byte[] rowToByteArray(ResultSet resultSet); 
} 

public class ResultSetAsInputStream extends InputStream { 

    private final RowToByteArrayConverter converter; 
    private final PreparedStatement statement; 
    private final ResultSet resultSet; 

    private byte[] buffer; 
    private int position; 

    public ResultSetAsInputStream(final RowToByteArrayConverter converter, final Connection connection, final String sql, final Object... parameters) throws SQLException { 
     this.converter = converter; 
     statement = createStatement(connection, sql, parameters); 
     resultSet = statement.executeQuery(); 
    } 

    private static PreparedStatement createStatement(final Connection connection, final String sql, final Object[] parameters) { 
     // PreparedStatement should be created here from passed connection, sql and parameters 
     return null; 
    } 

    @Override 
    public int read() throws IOException { 
     try { 
      if(buffer == null) { 
       // first call of read method 
       if(!resultSet.next()) { 
        return -1; // no rows - empty input stream 
       } else { 
        buffer = converter.rowToByteArray(resultSet); 
        position = 0; 
        return buffer[position++] & (0xff); 
       } 
      } else { 
       // not first call of read method 
       if(position < buffer.length) { 
        // buffer already has some data in, which hasn't been read yet - returning it 
        return buffer[position++] & (0xff); 
       } else { 
        // all data from buffer was read - checking whether there is next row and re-filling buffer 
        if(!resultSet.next()) { 
         return -1; // the buffer was read to the end and there is no rows - end of input stream 
        } else { 
         // there is next row - converting it to byte array and re-filling buffer 
         buffer = converter.rowToByteArray(resultSet); 
         position = 0; 
         return buffer[position++] & (0xff); 
        } 
       } 
      } 
     } catch(final SQLException ex) { 
      throw new IOException(ex); 
     } 
    } 



    @Override 
    public void close() throws IOException { 
     try { 
      statement.close(); 
     } catch(final SQLException ex) { 
      throw new IOException(ex); 
     } 
    } 
} 

這是非常直觀的實現,它可以在以下幾個方面進行改進:

  • 代碼如果和else在讀取方法之間的重複可被移除 - 它被張貼只是爲了澄清
  • 而不是重新創建用於每行的字節數組緩衝液(new byte[]是昂貴的歌劇更復雜的邏輯可以實現使用字節數組緩衝區,該緩衝區僅被初始化一次,然後被重新填充。然後每個人都應該改變RowToByteArrayConverter.rowToByteArray方法的簽名int fillByteArrayFromRow(ResultSet rs, byte[] array)應返回填充的字節數,並填寫傳遞字節數組。

因爲字節數組包含符號字節它可以包含-1(實際上255作爲無符號字節是),並因此表明流的不正確結束,所以& (0xff)用於符號字節轉換成無符號字節整數值。有關詳細信息,請參閱How does Java convert int into byte?

也請注意,如果網絡傳輸速度較慢,這可能會保持打開的結果集 很長一段時間,因而存在數據庫問題。

希望這有助於...

2

我會改善@Yura建議,通過引入以下答案:與一個ByteArrayOutputStream,以方便數據寫入的字節數組初始化
使用DataOutputStream聯合,在RowToByteArrayConverter的實現中。
事實上,我建議有轉換器的層次結構,所有的人都延長了同一抽象類(這是我的想法的代碼片段 - 可能不會從第一次編譯)

public abstract class RowToByteArrayConverter { 
    public byte[] rowToByteArray(ResultSet resultSet) { 
     parseResultSet(dataOutputStream, resultSet); 
     return byteArrayOutputSteam.toByteArray(); 
    } 

    public RowToByteArrayConverter() { 
    dataOutputStream = new DataOutputStream(byteArrayOutputStream); 
    } 

    protected DataOutputStream dataOutputStream; 
    protected ByteArrayOutputStream byteArrayOutputStream; 

    protected abstract void parseResultSet(DataOutputStream dataOutputStresm, ResultSet rs); 
} 

現在,你寫代碼,獲取作爲字符串從創紀錄的一列「名」的名稱 - 可以通過簡單地重寫parseResultSet方法,
例如重寫此類。並在DataOputputStream上執行writeUTF8。

0

上述答案提供給一個有限大小的StringBuilder的問題的一個有用的解決方案被超過。它們也具有記憶效率。但是,我的測試表明,它們比只是將數據寫入一個StringBuilder,並呼籲

新ByteArrayInputStream的(data.getBytes( 「UTF-8」))

獲得一個InputStream慢。

我發現要遠遠更好的性能是通過使用一個分區函數,然後使用多線程每個切片傳入數據:

  1. 查詢的源數據庫中的數據的一個子集
  2. 將數據寫入目標

這也避免了總數據可能超過字符串緩衝區的最大大小的問題。

比如我有一個在SQL Server表名爲「RecordDate」列6米記錄。 Recorddate中的值在2013年和2016年之間有所不同。因此,我將每個線程分別配置爲分別請求2013,14,15,16的數據。然後,每個線程將轉碼後的數據寫入到StringBuilder中,並通過使用getBytes()轉換爲InputStream將每個批量加載到目標。

這導致了2x加速。

爲什麼?因爲源數據庫和目標數據庫可以處理多個併發請求,所以總體工作負載分散在三個進程中的多個線程:源數據庫,轉碼器,目標數據庫。