2012-12-13 37 views
0

我想在Hadoop過程中將MySQL設置爲輸入。如何在Hadoop中使用DBInputFormat類 - 版本1.0.3中的MySQL連接?通過來自hadoop-1.0.3/docs/api /的JobConf配置作業不起作用。Hadoop - Mysql新的API連接

// Create a new JobConf 
JobConf job = new JobConf(new Configuration(), MyJob.class); 

// Specify various job-specific parameters  
job.setJobName("myjob"); 

FileInputFormat.setInputPaths(job, new Path("in")); 
FileOutputFormat.setOutputPath(job, new Path("out")); 

job.setMapperClass(MyJob.MyMapper.class); 
job.setCombinerClass(MyJob.MyReducer.class); 
job.setReducerClass(MyJob.MyReducer.class); 

job.setInputFormat(SequenceFileInputFormat.class); 
job.setOutputFormat(SequenceFileOutputFormat.class); 
+0

很可能它不起作用,因爲你的代碼與'DBInputFormat'沒有任何關係。 –

回答

0

你需要做類似下面的(例如假設典型的職員表):

JobConf conf = new JobConf(getConf(), MyDriver.class); 
    conf.setInputFormat(DBInputFormat.class); 
    DBConfiguration.configureDB(conf, 「com.mysql.jdbc.Driver」, 「jdbc:mysql://localhost/mydatabase」); String [] fields = { 「employee_id」, "name" }; 
    DBInputFormat.setInput(conf, MyRecord.class, 「employees」, null /* conditions */, 「employee_id」, fields); 
    ... 
    // other necessary configuration 
    JobClient.runJob(conf); 

configureDB()setInput()調用配置DBInputFormat。第一個調用指定要使用的JDBC驅動程序實現以及要連接的數據庫。第二個調用指定要從數據庫加載的數據。 MyRecord類是將數據讀入到Java中的類,「employees」是要讀取的表的名稱。 「employee_id」參數指定表的主鍵,用於排序結果。下面的「InputFormat的限制」部分解釋了爲什麼這是必要的。最後,fields數組列出要讀取的表的列。 setInput()的重載定義允許您指定任意SQL查詢來讀取,而不是。

調用configureDB()setInput()後,你應該配置你的工作照常休息,設置映射和減速機類,指定任何其他數據源來自(例如,在HDFS數據集)等具體工作參數讀取。

您需要創建自己的實現的Writable - 類似如下(考慮ID,名稱和表中的字段):

class MyRecord implements Writable, DBWritable { 
    long id; 
    String name; 

    public void readFields(DataInput in) throws IOException { 
     this.id = in.readLong(); 
     this.name = Text.readString(in); 
     } 

    public void readFields(ResultSet resultSet) throws SQLException { 
     this.id = resultSet.getLong(1); 
     this.name = resultSet.getString(2); } 

    public void write(DataOutput out) throws IOException { 
     out.writeLong(this.id); 
     Text.writeString(out, this.name); } 

    public void write(PreparedStatement stmt) throws SQLException { 
     stmt.setLong(1, this.id); 
     stmt.setString(2, this.name); } 
    } 

映射器然後接收您的DBWritable實現作爲它的輸入值的一個實例。輸入密鑰是數據庫提供的行ID,你很可能會放棄這個價值。

public class MyMapper extends MapReduceBase implements Mapper<LongWritable, MyRecord, LongWritable, Text> { 
public void map(LongWritable key, MyRecord val, OutputCollector<LongWritable, Text> output, Reporter reporter) throws IOException { 
// Use val.id, val.name here 
output.collect(new LongWritable(val.id), new Text(val.name)); 
} 
} 

更多:閱讀以下鏈接(我的回答的實際來源):http://blog.cloudera.com/blog/2009/03/database-access-with-hadoop/

+0

org.apache.hadoop.mapreduce.lib.db.DBInputFormat org.apache.hadoop.mapred.lib.db.DBInputFormat 根據您使用的API嘗試使用其中之一。 – Amar

+0

Thnak你。錯誤是我已經將這兩個API一起使用了...... –

+0

它是否適合你? – Amar

0

看一看this崗位。它顯示瞭如何將數據從Map Reduce匯入MySQL數據庫。

+0

在新的API中我找不到正確的配置主... –

+0

我發現了錯誤,我將它作爲下面的評論回覆...謝謝 –