2
我需要將map-reduce程序的輸出存儲到數據庫中,那麼有什麼辦法嗎?將Apache Hadoop數據輸出存儲到Mysql數據庫
如果是這樣,是否有可能根據需要將輸出存儲到多列&表中?
請給我建議一些解決方案。
謝謝。
我需要將map-reduce程序的輸出存儲到數據庫中,那麼有什麼辦法嗎?將Apache Hadoop數據輸出存儲到Mysql數據庫
如果是這樣,是否有可能根據需要將輸出存儲到多列&表中?
請給我建議一些解決方案。
謝謝。
偉大的例子顯示on this blog,我嘗試過了,它會真的很好。我引用了代碼中最重要的部分。
首先,您必須創建一個表示您想要存儲的數據的類。這個類必須實現DBWritable接口:
public class DBOutputWritable implements Writable, DBWritable
{
private String name;
private int count;
public DBOutputWritable(String name, int count) {
this.name = name;
this.count = count;
}
public void readFields(DataInput in) throws IOException { }
public void readFields(ResultSet rs) throws SQLException {
name = rs.getString(1);
count = rs.getInt(2);
}
public void write(DataOutput out) throws IOException { }
public void write(PreparedStatement ps) throws SQLException {
ps.setString(1, name);
ps.setInt(2, count);
}
}
在減速創建先前定義的類的對象:
public class Reduce extends Reducer<Text, IntWritable, DBOutputWritable, NullWritable> {
protected void reduce(Text key, Iterable<IntWritable> values, Context ctx) {
int sum = 0;
for(IntWritable value : values) {
sum += value.get();
}
try {
ctx.write(new DBOutputWritable(key.toString(), sum), NullWritable.get());
} catch(IOException e) {
e.printStackTrace();
} catch(InterruptedException e) {
e.printStackTrace();
}
}
}
最後,你必須配置你的數據庫連接(不要忘記添加你的數據庫連接器類路徑)並註冊您的映射器和reducer的輸入/輸出數據類型。
public class Main
{
public static void main(String[] args) throws Exception
{
Configuration conf = new Configuration();
DBConfiguration.configureDB(conf,
"com.mysql.jdbc.Driver", // driver class
"jdbc:mysql://localhost:3306/testDb", // db url
"user", // username
"password"); //password
Job job = new Job(conf);
job.setJarByClass(Main.class);
job.setMapperClass(Map.class); // your mapper - not shown in this example
job.setReducerClass(Reduce.class);
job.setMapOutputKeyClass(Text.class); // your mapper - not shown in this example
job.setMapOutputValueClass(IntWritable.class); // your mapper - not shown in this example
job.setOutputKeyClass(DBOutputWritable.class); // reducer's KEYOUT
job.setOutputValueClass(NullWritable.class); // reducer's VALUEOUT
job.setInputFormatClass(...);
job.setOutputFormatClass(DBOutputFormat.class);
DBInputFormat.setInput(...);
DBOutputFormat.setOutput(
job,
"output", // output table name
new String[] { "name", "count" } //table columns
);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
檢查了這一點:http://blog.cloudera.com/blog/2009/03/database-access-with-hadoop/具體的寫作結果返回到數據庫「一節 – Amar
可以使用['Sqoop'](http://sqoop.apache.org/docs/1.4.2/SqoopUserGuide.html)與Thak you Amar進行數據傳輸以及來回 –
。 – HarshaKP