2013-03-22 96 views
0

我正在嘗試用以下代碼，通過 MapReduce 作業連接到 MySQL 數據庫，但遇到了下面貼出的錯誤。我在代碼中加了檢查點，結果顯示：在作業真正開始運行之前一切正常，之後作業就失敗了……

標題：使用 MapReduce 作業連接到 MySQL 數據庫時出錯

import java.io.DataInput; 
import java.io.DataOutput; 
import java.io.IOException; 
import java.sql.PreparedStatement; 
import java.sql.ResultSet; 
import java.sql.SQLException; 
import java.util.Iterator; 
import java.util.List; 
import java.util.StringTokenizer; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.filecache.DistributedCache; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.io.Writable; 
import org.apache.hadoop.io.WritableComparable; 
import org.apache.hadoop.mapred.FileInputFormat; 
import org.apache.hadoop.mapred.FileOutputFormat; 
import org.apache.hadoop.mapred.JobClient; 
import org.apache.hadoop.mapred.JobConf; 
import org.apache.hadoop.mapred.MapReduceBase; 
import org.apache.hadoop.mapred.Mapper; 
import org.apache.hadoop.mapred.OutputCollector; 
import org.apache.hadoop.mapred.Reducer; 
import org.apache.hadoop.mapred.Reporter; 
import org.apache.hadoop.mapred.SequenceFileOutputFormat; 
import org.apache.hadoop.mapred.TextOutputFormat; 
import org.apache.hadoop.mapred.lib.db.DBConfiguration; 
//import org.apache.hadoop.mapred.lib.db.DBInputFormat; 
import org.apache.hadoop.mapred.lib.db.DBInputFormat; 
import org.apache.hadoop.mapred.lib.db.DBOutputFormat; 
import org.apache.hadoop.mapred.lib.db.DBWritable; 


/**
 * Word-count job over tweets read from a MySQL table, written against the old
 * {@code org.apache.hadoop.mapred} API.
 *
 * <p>Rows are read with {@link DBInputFormat} (column {@code Tweet} of table
 * {@code NewGamil}). Each tweet is split into whitespace-separated tokens, and the
 * per-token counts are written as text to the {@code twokens} output directory.
 */
public class TweetWordCount {

    /**
     * Emits {@code (token, 1)} for every whitespace-separated token of a tweet.
     */
    public static class TweetWordCountMapper extends MapReduceBase implements
            Mapper<LongWritable, GetTweets, Text, IntWritable> {
        private final static IntWritable intTwordsCount = new IntWritable(1);
        // Reused for every emitted key. This follows the stock Hadoop WordCount
        // pattern, which relies on output.collect() consuming the key before the
        // next set().
        private Text strTwoken = new Text();

        public void map(LongWritable key, GetTweets value,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            System.out.println("checkpoint4");
            // A NULL column value is stored as a null String by
            // readFields(ResultSet). Skip it rather than failing the task.
            if (value.strTweet == null) {
                return;
            }
            StringTokenizer tokenizer = new StringTokenizer(value.strTweet);
            while (tokenizer.hasMoreTokens()) {
                strTwoken.set(tokenizer.nextToken());
                output.collect(strTwoken, intTwordsCount);
            }
            System.out.println("checkpoint5");
        }
    }

    /**
     * Sums the counts for each token. Also registered as the combiner, which is
     * valid because the operation is a plain integer sum.
     */
    public static class TweetWordCountReducer extends MapReduceBase implements
            Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterator<IntWritable> values,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            System.out.println("checkpoint6");
            int intTwokenCount = 0;
            while (values.hasNext()) {
                intTwokenCount += values.next().get();
            }
            output.collect(key, new IntWritable(intTwokenCount));
            // Distinct marker so entry and exit of reduce can be told apart in logs.
            System.out.println("checkpoint7");
        }
    }

    /**
     * Configures the job and submits it, blocking until it completes.
     *
     * <p>NOTE: the JDBC driver class is loaded in this client JVM during job
     * submission: JobClient.runJob -> JobConf.getInputFormat ->
     * DBInputFormat.configure -> DBConfiguration.getConnection -> Class.forName.
     * The MySQL connector jar must therefore be visible to the {@code hadoop jar}
     * launcher itself, presumably via {@code HADOOP_CLASSPATH} (verify for your
     * Hadoop version). Having it only on the task side is not enough. Otherwise
     * submission fails with {@code ClassNotFoundException: com.mysql.jdbc.Driver}.
     *
     * @param args unused
     * @throws Exception if the job cannot be configured, submitted, or fails
     */
    public static void main(String[] args) throws Exception {
        System.out.println("checkpoint1");
        JobConf twokenJobConf = new JobConf(new Configuration(), TweetWordCount.class);
        twokenJobConf.setJobName("twoken_count");

        twokenJobConf.setInputFormat(DBInputFormat.class);
        twokenJobConf.setOutputFormat(TextOutputFormat.class);

        Path out = new Path("twokens");

        twokenJobConf.setMapperClass(TweetWordCountMapper.class);
        twokenJobConf.setCombinerClass(TweetWordCountReducer.class);
        twokenJobConf.setReducerClass(TweetWordCountReducer.class);

        twokenJobConf.setOutputKeyClass(Text.class);
        twokenJobConf.setOutputValueClass(IntWritable.class);

        // NOTE(review): credentials are hard-coded; consider reading them from
        // configuration or arguments instead.
        DBConfiguration.configureDB(twokenJobConf, "com.mysql.jdbc.Driver",
                "jdbc:mysql://localhost/test", "root", "root");

        // Read only the Tweet column of table NewGamil, no WHERE conditions,
        // ordered by Tweet.
        String[] fields = {"Tweet"};
        DBInputFormat.setInput(twokenJobConf, GetTweets.class, "NewGamil",
                null /* conditions */, "Tweet", fields);

        FileOutputFormat.setOutputPath(twokenJobConf, out);
        System.out.println("checkpoint2");
        JobClient.runJob(twokenJobConf);
        System.out.println("checkpoint3");
    }

    /**
     * One input row: the {@code Tweet} column as a string (may be null).
     * Implements {@link Writable} for Hadoop serialization and {@link DBWritable}
     * for mapping to and from JDBC rows and statements.
     */
    public static class GetTweets implements Writable, DBWritable {
        String strTweet;

        public GetTweets() {
        }

        /**
         * Reads the format produced by {@link #write(DataOutput)}: a presence flag,
         * followed by the string when the flag is true.
         */
        public void readFields(DataInput in) throws IOException {
            System.out.println("checkpoint 2a");
            boolean present = in.readBoolean();
            this.strTweet = present ? Text.readString(in) : null;
        }

        /** Reads the first column of the current result-set row. */
        public void readFields(ResultSet resultSet) throws SQLException {
            System.out.println("checkpoint 3a");
            this.strTweet = resultSet.getString(1);
        }

        /**
         * Serializes the tweet. This must mirror {@link #readFields(DataInput)}.
         * A null tweet is encoded by the presence flag alone.
         */
        public void write(DataOutput out) throws IOException {
            out.writeBoolean(strTweet != null);
            if (strTweet != null) {
                Text.writeString(out, strTweet);
            }
        }

        /**
         * Binds the tweet as the first statement parameter, mirroring
         * {@link #readFields(ResultSet)} (e.g. when written through DBOutputFormat).
         */
        public void write(PreparedStatement stmt) throws SQLException {
            stmt.setString(1, strTweet);
        }
    }
}


rv@<host>:~$ hadoop jar Twit.jar 
Warning: $HADOOP_HOME is deprecated. 

checkpoint1 
checkpoint2 
13/03/22 17:16:12 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same. 
13/03/22 17:16:12 INFO mapred.JobClient: Cleaning up the staging area hdfs://localhost:54310/home/rv/hadoopfiles/mapred/staging/rv/.staging/job_201303221600_0008 
Exception in thread "main" java.lang.RuntimeException: Error in configuring object 
    at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:93) 
    at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:64) 
    at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:117) 
    at org.apache.hadoop.mapred.JobConf.getInputFormat(JobConf.java:575) 
    at org.apache.hadoop.mapred.JobClient.writeOldSplits(JobClient.java:981) 
    at org.apache.hadoop.mapred.JobClient.writeSplits(JobClient.java:973) 
    at org.apache.hadoop.mapred.JobClient.access$600(JobClient.java:172) 
    at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:889) 
    at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:842) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at javax.security.auth.Subject.doAs(Subject.java:416) 
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059) 
    at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:842) 
    at org.apache.hadoop.mapred.JobClient.submitJob(JobClient.java:816) 
    at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:1253) 
    at TweetWordCount.main(TweetWordCount.java:107) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:616) 
    at org.apache.hadoop.util.RunJar.main(RunJar.java:156) 
Caused by: java.lang.reflect.InvocationTargetException 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:616) 
    at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:88) 
    ... 20 more 
Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: com.mysql.jdbc.Driver 
    at org.apache.hadoop.mapred.lib.db.DBInputFormat.configure(DBInputFormat.java:271) 
    ... 25 more 
Caused by: java.lang.ClassNotFoundException: com.mysql.jdbc.Driver 
    at java.net.URLClassLoader$1.run(URLClassLoader.java:217) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at java.net.URLClassLoader.findClass(URLClassLoader.java:205) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:321) 
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:266) 
    at java.lang.Class.forName0(Native Method) 
    at java.lang.Class.forName(Class.java:188) 
    at org.apache.hadoop.mapred.lib.db.DBConfiguration.getConnection(DBConfiguration.java:123) 
    at org.apache.hadoop.mapred.lib.db.DBInputFormat.configure(DBInputFormat.java:266) 
    ... 25 more 
+0

你的 `$HADOOP_HOME/lib` 目錄中有 mysql-connector-java-xxxx-bin.jar 嗎？ – 2013-03-22 13:36:02

+0

它找不到驅動類（`Caused by: java.lang.ClassNotFoundException: com.mysql.jdbc.Driver`）。啓動作業時可以加上 `-libjars` 選項，這樣或許可行。 – mohit6up 2013-03-24 22:11:33

回答

1

不清楚你的應用場景是什麼（學習、開發等），但我建議使用 Sqoop 與 MySQL 等關係數據庫進行交互。

+0

是的，我的 `$HADOOP_HOME/lib` 中有 MySQL 連接器。 – 2013-03-22 14:34:21