2016-04-10 43 views
1

與我們有從csv,tsv等導入行的默認示例不同,我們有一個,所以我們可以從數據庫導入記錄並插入到德魯伊?有什麼想法嗎?德魯伊:Firehose從數據庫導入記錄

這是我在想什麼 -

"firehose": { 
    "type" : "database", 
     "datasource" : { 
       "connectURI" : "jdbc:mysql://localhost:3306/test", 
       "user" : "druid", 
       "password" : "xyz123" 
     }, 
     "query" : "select * from table" 
     "frequency" : "P1M" 
} 

我們都不可能擴展它獲得通過JNDI數據源和其他幾個連接。這種實施有什麼問題嗎?

+0

可能會比較簡單,從數據庫導出爲CSV文件,然後用通常的方法是什麼? – Nikem

+0

當然,但數據的大小是問題。 – jagamot

回答

0
 
How about this idea? It's custom firehose for jdbc ingestion. 
In this case, only supports one time query ingestion. 
https://github.com/sirpkt/druid/tree/jdbc_firehose/extensions-contrib/jdbc-firehose 
This is code snippet. Using DBI library try to get result set from existing database server. 
public Firehose connect(final MapInputRowParser parser) throws IOException, ParseException, IllegalArgumentException 
    { 
    if (columns != null) { 
     verifyParserSpec(parser.getParseSpec(), columns); 
    } 

    final Handle handle = new DBI(
     connectorConfig.getConnectURI(), 
     connectorConfig.getUser(), 
     connectorConfig.getPassword() 
    ).open(); 

    final String query = makeQuery(columns); 

    final ResultIterator<InputRow> rowIterator = handle 
     .createQuery(query) 
     .map(
      new ResultSetMapper<InputRow>() 
      { 
       List<String> queryColumns = (columns == null) ? Lists.<String>newArrayList(): columns; 

       @Override 
       public InputRow map(
        final int index, 
        final ResultSet r, 
        final StatementContext ctx 
      ) throws SQLException 
       { 
       try { 
        if (queryColumns.size() == 0) 
        { 
        ResultSetMetaData metadata = r.getMetaData(); 
        for (int idx = 1; idx <= metadata.getColumnCount(); idx++) 
        { 
         queryColumns.add(metadata.getColumnName(idx)); 
        } 
        Preconditions.checkArgument(queryColumns.size() > 0, 
         String.format("No column in table [%s]", table)); 
        verifyParserSpec(parser.getParseSpec(), queryColumns); 
        } 
        ImmutableMap.Builder<String, Object> builder = new ImmutableMap.Builder(); 
        for (String column: queryColumns) { 
        builder.put(column, r.getObject(column)); 
        } 
        return parser.parse(builder.build()); 

       } catch(IllegalArgumentException e) { 
        throw new SQLException(e); 
       } 
       } 
      } 
     ).iterator(); 
相關問題