2013-08-21

I need to store the output of a map-reduce program in a database; is there a way to do that? Specifically, I want to store Apache Hadoop output in a MySQL database.

If so, is it also possible to store the output in multiple columns and tables as required?

Please suggest some solutions.

Thanks.

Check this out: http://blog.cloudera.com/blog/2009/03/database-access-with-hadoop/, specifically the "Writing results back to the database" section. – Amar

You can use ['Sqoop'](http://sqoop.apache.org/docs/1.4.2/SqoopUserGuide.html) for transferring data back and forth as well. –

Thank you Amar. – HarshaKP

Answer


There is a great example shown on this blog. I tried it, and it works really well. I quote the most important parts of the code below.

First, you have to create a class that represents the data you want to store. This class must implement the DBWritable interface:

public class DBOutputWritable implements Writable, DBWritable {

    private String name;
    private int count;

    public DBOutputWritable(String name, int count) {
        this.name = name;
        this.count = count;
    }

    // Hadoop binary serialization - not needed when writing to a database
    public void readFields(DataInput in) throws IOException { }

    // Read one row from a ResultSet (used when a table is the job's input)
    public void readFields(ResultSet rs) throws SQLException {
        name = rs.getString(1);
        count = rs.getInt(2);
    }

    // Hadoop binary serialization - not needed when writing to a database
    public void write(DataOutput out) throws IOException { }

    // Bind this record's fields to the INSERT statement's placeholders
    public void write(PreparedStatement ps) throws SQLException {
        ps.setString(1, name);
        ps.setInt(2, count);
    }
}
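You don't need Hadoop or a live MySQL connection to see what the write(PreparedStatement) method actually does. The following standalone sketch is illustrative only (DemoWritable, the dynamic proxy, and the recorded call format are my own assumptions, not Hadoop or JDBC API): it records the parameter bindings such a record would issue against the statement.

```java
import java.lang.reflect.Proxy;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

public class WriteDemo {
    // Mirrors the write(PreparedStatement) half of DBOutputWritable above.
    static class DemoWritable {
        private final String name;
        private final int count;

        DemoWritable(String name, int count) {
            this.name = name;
            this.count = count;
        }

        void write(PreparedStatement ps) throws SQLException {
            ps.setString(1, name);
            ps.setInt(2, count);
        }
    }

    // Returns the JDBC setter calls the record makes, e.g. "setString 1=hadoop".
    static List<String> capturedBindings() {
        List<String> calls = new ArrayList<>();
        // Dynamic proxy that records each setter call instead of
        // talking to a real database connection.
        PreparedStatement ps = (PreparedStatement) Proxy.newProxyInstance(
            WriteDemo.class.getClassLoader(),
            new Class<?>[] { PreparedStatement.class },
            (proxy, method, args) -> {
                calls.add(method.getName() + " " + args[0] + "=" + args[1]);
                return null;
            });
        try {
            new DemoWritable("hadoop", 3).write(ps);
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
        return calls;
    }

    public static void main(String[] args) {
        // Prints the bindings that would fill the INSERT placeholders
        System.out.println(capturedBindings());
    }
}
```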

In the reducer, create an object of the previously defined class:

public class Reduce extends Reducer<Text, IntWritable, DBOutputWritable, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context ctx) {
        int sum = 0;

        for (IntWritable value : values) {
            sum += value.get();
        }

        try {
            ctx.write(new DBOutputWritable(key.toString(), sum), NullWritable.get());
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Finally, you have to configure the database connection (don't forget to add your database connector JAR to the classpath) and register the input/output data types of your mapper and reducer:

public class Main {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        DBConfiguration.configureDB(conf,
            "com.mysql.jdbc.Driver",              // driver class
            "jdbc:mysql://localhost:3306/testDb", // db url
            "user",                               // username
            "password");                          // password

        Job job = new Job(conf);
        job.setJarByClass(Main.class);
        job.setMapperClass(Map.class);                 // your mapper - not shown in this example
        job.setReducerClass(Reduce.class);
        job.setMapOutputKeyClass(Text.class);          // mapper's KEYOUT
        job.setMapOutputValueClass(IntWritable.class); // mapper's VALUEOUT
        job.setOutputKeyClass(DBOutputWritable.class); // reducer's KEYOUT
        job.setOutputValueClass(NullWritable.class);   // reducer's VALUEOUT
        job.setInputFormatClass(...);
        job.setOutputFormatClass(DBOutputFormat.class);

        DBInputFormat.setInput(...);

        DBOutputFormat.setOutput(
            job,
            "output",                        // output table name
            new String[] { "name", "count" } // table columns
        );

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
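Under the hood, DBOutputFormat turns the table name and column list passed to setOutput into a parameterized INSERT statement, and each record emitted by the reducer fills its placeholders through write(PreparedStatement). A minimal standalone sketch of that query construction (illustrative only; constructQuery here is my own helper, not a call into Hadoop):

```java
public class InsertQuerySketch {
    // Builds "INSERT INTO <table> (c1, c2, ...) VALUES (?, ?, ...)"
    // from the same arguments passed to DBOutputFormat.setOutput(...).
    static String constructQuery(String table, String[] fieldNames) {
        StringBuilder query = new StringBuilder("INSERT INTO ").append(table).append(" (");
        for (int i = 0; i < fieldNames.length; i++) {
            query.append(fieldNames[i]);
            if (i != fieldNames.length - 1) {
                query.append(", ");
            }
        }
        query.append(") VALUES (");
        for (int i = 0; i < fieldNames.length; i++) {
            query.append("?");
            if (i != fieldNames.length - 1) {
                query.append(", ");
            }
        }
        query.append(")");
        return query.toString();
    }

    public static void main(String[] args) {
        // The statement the job above would prepare for its "output" table
        System.out.println(constructQuery("output", new String[] { "name", "count" }));
    }
}
```

Each reduce output pair then becomes one row: DBOutputWritable.write(PreparedStatement) binds name to the first placeholder and count to the second before the batch is executed against MySQL.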