2012-06-08 24 views
6

大家都知道 Pig 支持 DBStorage,但它只支持像下面這樣把 Pig 的結果寫入 MySQL,而不支持從 MySQL 表讀取數據到 Pig:

STORE data INTO 'dummy' USING DBStorage('com.mysql.jdbc.Driver', 'jdbc:mysql://host/db', 'INSERT ...'); 

請問有沒有辦法像下面這樣,在 Pig 中直接從 MySQL 表讀取數據:

data = LOAD 'my_table' USING DBStorage('com.mysql.jdbc.Driver', 'jdbc:mysql://host/db', 'SELECT * FROM my_table'); 

以下是我為了從 MySQL 表讀取數據而寫的代碼:

public class DBLoader extends LoadFunc { 
    private final Log log = LogFactory.getLog(getClass()); 
    private ArrayList mProtoTuple = null; 
    private Connection con; 
    private String jdbcURL; 
    private String user; 
    private String pass; 
    private int batchSize; 
    private int count = 0; 
    private String query; 
    ResultSet result; 
    protected TupleFactory mTupleFactory = TupleFactory.getInstance(); 

    public DBLoader() { 
    } 

    public DBLoader(String driver, String jdbcURL, String user, String pass, 
      String query) { 

     try { 
      Class.forName(driver); 
     } catch (ClassNotFoundException e) { 
      log.error("can't load DB driver:" + driver, e); 
      throw new RuntimeException("Can't load DB Driver", e); 
     } 
     this.jdbcURL = jdbcURL; 
     this.user = user; 
     this.pass = pass; 
     this.query = query; 

    } 

    @Override 
    public InputFormat getInputFormat() throws IOException { 
     // TODO Auto-generated method stub 
     return new TextInputFormat(); 
    } 

    @Override 
    public Tuple getNext() throws IOException { 
     // TODO Auto-generated method stub 
     boolean next = false; 

     try { 
      next = result.next(); 
     } catch (SQLException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 

     if (!next) 
      return null; 
     int numColumns = 0; 
     // Get result set meta data 
     ResultSetMetaData rsmd; 
     try { 
      rsmd = result.getMetaData(); 
      numColumns = rsmd.getColumnCount(); 
     } catch (SQLException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 

     for (int i = 0; i < numColumns; i++) { 

      try { 
       Object field = result.getObject(i); 

       switch (DataType.findType(field)) { 
       case DataType.NULL: 

        mProtoTuple.add(null); 

        break; 

       case DataType.BOOLEAN: 
        mProtoTuple.add((Boolean) field); 

        break; 

       case DataType.INTEGER: 
        mProtoTuple.add((Integer) field); 

        break; 

       case DataType.LONG: 
        mProtoTuple.add((Long) field); 

        break; 

       case DataType.FLOAT: 
        mProtoTuple.add((Float) field); 

        break; 

       case DataType.DOUBLE: 
        mProtoTuple.add((Double) field); 

        break; 

       case DataType.BYTEARRAY: 
        byte[] b = ((DataByteArray) field).get(); 
        mProtoTuple.add(b); 

        break; 
       case DataType.CHARARRAY: 
        mProtoTuple.add((String) field); 

        break; 
       case DataType.BYTE: 
        mProtoTuple.add((Byte) field); 

        break; 

       case DataType.MAP: 
       case DataType.TUPLE: 
       case DataType.BAG: 
        throw new RuntimeException("Cannot store a non-flat tuple " 
          + "using DbStorage"); 

       default: 
        throw new RuntimeException("Unknown datatype " 
          + DataType.findType(field)); 

       } 

      } catch (Exception ee) { 
       throw new RuntimeException(ee); 
      } 
     } 

     Tuple t = mTupleFactory.newTuple(mProtoTuple); 
     mProtoTuple.clear(); 
     return t; 

    } 

    @Override 
    public void prepareToRead(RecordReader arg0, PigSplit arg1) 
      throws IOException { 

     con = null; 
     if (query == null) { 
      throw new IOException("SQL Insert command not specified"); 
     } 
     try { 
      if (user == null || pass == null) { 
       con = DriverManager.getConnection(jdbcURL); 
      } else { 
       con = DriverManager.getConnection(jdbcURL, user, pass); 
      } 
      con.setAutoCommit(false); 
      result = con.createStatement().executeQuery(query); 
     } catch (SQLException e) { 
      log.error("Unable to connect to JDBC @" + jdbcURL); 
      throw new IOException("JDBC Error", e); 
     } 
     count = 0; 
    } 

    @Override 
    public void setLocation(String location, Job job) throws IOException { 
     // TODO Auto-generated method stub 

     //TextInputFormat.setInputPaths(job, location); 

    } 

    class MyDBInputFormat extends InputFormat<NullWritable, NullWritable>{ 

     @Override 
     public RecordReader<NullWritable, NullWritable> createRecordReader(
       InputSplit arg0, TaskAttemptContext arg1) throws IOException, 
       InterruptedException { 
      // TODO Auto-generated method stub 
      return null; 
     } 

     @Override 
     public List<InputSplit> getSplits(JobContext arg0) throws IOException, 
       InterruptedException { 
      // TODO Auto-generated method stub 
      return null; 
     } 

    } 

} 

我嘗試多次寫UDF但不成功.....

回答

2

像你說的,DBStorage只支持將結果保存到數據庫。

要從MySQL加載數據,您可以查看名爲sqoop(將數據從數據庫複製到HDFS)的項目,或者執行mysql轉儲然後將文件複製到HDFS。這兩種方式都需要額外的手動步驟,不能直接在Pig內部完成。

第三個選項是研究編寫Pig LoadFunc(你說你試圖編寫一個UDF)。它不應該太困難,您需要傳遞與DBStorage相同的選項(驅動程序,連接憑證和要執行的SQL查詢),您也可以使用一些結果集元數據檢查來自動生成模式。

+0

嗨,謝謝。但正如我之前提到的,我只想直接在Pig內部完成。按照你的建議,我已經貼出了我寫的LoadFunc代碼。當我從Pig運行這段代碼時,它總是拋出異常。 – phuongdo

+0

然後請添加您看到的異常 –

+0

@phuongdo您是否成功編寫Pig LoadFunc以從MySQL加載數據? – Shri