你需要做類似下面的(例如假設典型的職員表):
JobConf conf = new JobConf(getConf(), MyDriver.class);
conf.setInputFormat(DBInputFormat.class);
DBConfiguration.configureDB(conf, 「com.mysql.jdbc.Driver」, 「jdbc:mysql://localhost/mydatabase」); String [] fields = { 「employee_id」, "name" };
DBInputFormat.setInput(conf, MyRecord.class, 「employees」, null /* conditions */, 「employee_id」, fields);
...
// other necessary configuration
JobClient.runJob(conf);
的configureDB()
和setInput()
調用配置DBInputFormat
。第一個調用指定要使用的JDBC驅動程序實現以及要連接的數據庫。第二個調用指定要從數據庫加載的數據。 MyRecord類是將數據讀入到Java中的類,「employees」是要讀取的表的名稱。 「employee_id」參數指定表的主鍵,用於排序結果。下面的「InputFormat的限制」部分解釋了爲什麼這是必要的。最後,fields數組列出要讀取的表的列。 setInput()
的重載定義允許您指定任意SQL查詢來讀取,而不是。
調用configureDB()
和setInput()
後,你應該配置你的工作照常休息,設置映射和減速機類,指定任何其他數據源來自(例如,在HDFS數據集)等具體工作參數讀取。
您需要創建自己的實現的Writable
- 類似如下(考慮ID,名稱和表中的字段):
class MyRecord implements Writable, DBWritable {
long id;
String name;
public void readFields(DataInput in) throws IOException {
this.id = in.readLong();
this.name = Text.readString(in);
}
public void readFields(ResultSet resultSet) throws SQLException {
this.id = resultSet.getLong(1);
this.name = resultSet.getString(2); }
public void write(DataOutput out) throws IOException {
out.writeLong(this.id);
Text.writeString(out, this.name); }
public void write(PreparedStatement stmt) throws SQLException {
stmt.setLong(1, this.id);
stmt.setString(2, this.name); }
}
映射器然後接收您的DBWritable實現作爲它的輸入值的一個實例。輸入密鑰是數據庫提供的行ID,你很可能會放棄這個價值。
public class MyMapper extends MapReduceBase implements Mapper<LongWritable, MyRecord, LongWritable, Text> {
public void map(LongWritable key, MyRecord val, OutputCollector<LongWritable, Text> output, Reporter reporter) throws IOException {
// Use val.id, val.name here
output.collect(new LongWritable(val.id), new Text(val.name));
}
}
更多:閱讀以下鏈接(我的回答的實際來源):http://blog.cloudera.com/blog/2009/03/database-access-with-hadoop/
很可能它不起作用,因爲你的代碼與'DBInputFormat'沒有任何關係。 –