
Apache Hadoop word count error

I have a lot of records stored in my MongoDB. They look like this:

{ 
"_id": xxxx, 
    "url":"www.myurl.com/xxxxx", 
    "summary":"THIS IS DOCUMENT IN THE $url" 
} 

The result I want looks like this:

{ 
    "_id": word_in_summary, 
    "results":[ 
     { 
      "url": "the corresponding url that word appears in summary", 
      "count": "the total count of the word in the summary" 
     }, 
     { .... } 
    ] 
} 

For example:

{ 
    "_id" : ObjectId("574ec02d8bee4f03d9174c11"), 
    "url" : "www.example_1.com/xxxx", 
    "summary" : "hello good, good" 
} 

{ 
    "_id" : ObjectId("574ec02d8bee4f03d9174c12"), 
    "url" : "www.example_2.com/xxxx", 
    "summary" : "good" 
} 

So the results would be:

{ 
    "_id": "hello", 
    "results":[ 
     { 
      "url": "www.example_1.com/xxxx", 
      "count": 1 
     } 
    ] 
} 

{ 
    "_id": "good", 
    "results":[ 
     { 
      "url": "www.example_1.com/xxxx", 
      "count": 2 
     }, 
     { 
      "url": "www.example_2.com/xxxx", 
      "count": 1 
     } 
    ] 
} 

I am new to Java and Hadoop. My code to process the data is:

import java.util.*; 
import java.io.*; 

import org.bson.*; 

import com.mongodb.hadoop.MongoInputFormat; 
import com.mongodb.hadoop.MongoOutputFormat; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.io.*; 
import org.apache.hadoop.mapreduce.*; 


public class WordCount { 

    public static class TokenizerMapper extends Mapper<Object, BSONObject, Text, IntWritable> { 
     private final static IntWritable one = new IntWritable(1); 
     private Text word = new Text(); 

     public void map(Object key, BSONObject value, Context context) 
       throws IOException, InterruptedException { 
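      // Pull out the url, then emit a composite "word url" key with a count of 1 per token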
      String url = value.get("url").toString(); 
      StringTokenizer itr = new StringTokenizer(value.get("summary").toString().replaceAll(",", " ")); 
      while (itr.hasMoreTokens()) { 
       word.set(itr.nextToken() + " " + url); 
       context.write(word, one); 
      } 
     } 
    } 

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, MapWritable> { 
     private MapWritable result = new MapWritable(); 
     public void reduce(Text key, Iterable<IntWritable> values, Context context) 
      throws IOException, InterruptedException { 
      int sum = 0; 
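      // Split the composite key back into the word and its url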
      String str[] = key.toString().split(" "); 
      Text mykey= new Text(str[0]); 
      Text myurl = new Text(str[1]); 
      for (IntWritable val : values) { 
       sum += val.get(); 
      } 
      System.out.println("sum : " + sum); 
      result.put(myurl, new IntWritable(sum)); 
      context.write(mykey, result); 
     } 
    } 

    public static void main(String[] args) throws Exception { 
     Configuration conf = new Configuration(); 
     conf.set("mongo.input.uri" , "mongodb://localhost/testmr.stackin"); 
     conf.set("mongo.output.uri" , "mongodb://localhost/testmr.stackout"); 
     @SuppressWarnings("deprecation") 
     Job job = new Job(conf, "word count"); 
     job.setJarByClass(WordCount.class); 
     job.setMapperClass(TokenizerMapper.class); 
     //job.setCombinerClass(IntSumReducer.class); 
     job.setReducerClass(IntSumReducer.class); 
     job.setMapOutputKeyClass(Text.class); 
     job.setMapOutputValueClass(IntWritable.class); 
     job.setOutputKeyClass(Text.class); 
     job.setOutputValueClass(MapWritable.class); 
     job.setInputFormatClass(MongoInputFormat.class); 
     job.setOutputFormatClass(MongoOutputFormat.class); 
     System.exit(job.waitForCompletion(true) ? 0 : 1); 
    } 
} 

The error:

16/06/14 12:55:51 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id 
16/06/14 12:55:51 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId= 
16/06/14 12:55:52 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this. 
16/06/14 12:55:53 INFO driver.cluster: Cluster created with settings {hosts=[localhost:27017], mode=SINGLE, requiredClusterType=UNKNOWN, serverSelectionTimeout='30000 ms', maxWaitQueueSize=500} 
16/06/14 12:55:53 INFO driver.cluster: No server chosen by ReadPreferenceServerSelector{readPreference=primary} from cluster description ClusterDescription{type=UNKNOWN, connectionMode=SINGLE, all=[ServerDescription{address=localhost:27017, type=UNKNOWN, state=CONNECTING}]}. Waiting for 30000 ms before timing out 
16/06/14 12:55:53 INFO driver.connection: Opened connection [connectionId{localValue:1, serverValue:1}] to localhost:27017 
16/06/14 12:55:53 INFO driver.cluster: Monitor thread successfully connected to server with description ServerDescription{address=localhost:27017, type=STANDALONE, state=CONNECTED, ok=true, version=ServerVersion{versionList=[2, 4, 9]}, minWireVersion=0, maxWireVersion=0, electionId=null, maxDocumentSize=16777216, roundTripTimeNanos=685985} 
16/06/14 12:55:53 INFO driver.connection: Opened connection [connectionId{localValue:2, serverValue:2}] to localhost:27017 
16/06/14 12:55:53 INFO driver.connection: Closed connection [connectionId{localValue:2, serverValue:2}] to localhost:27017 because the pool has been closed. 
16/06/14 12:55:53 INFO driver.cluster: Cluster created with settings {hosts=[localhost:27017], mode=SINGLE, requiredClusterType=UNKNOWN, serverSelectionTimeout='30000 ms', maxWaitQueueSize=500} 
16/06/14 12:55:53 INFO driver.cluster: No server chosen by ReadPreferenceServerSelector{readPreference=primary} from cluster description ClusterDescription{type=UNKNOWN, connectionMode=SINGLE, all=[ServerDescription{address=localhost:27017, type=UNKNOWN, state=CONNECTING}]}. Waiting for 30000 ms before timing out 
16/06/14 12:55:53 INFO driver.connection: Opened connection [connectionId{localValue:3, serverValue:3}] to localhost:27017 
16/06/14 12:55:53 INFO driver.cluster: Monitor thread successfully connected to server with description ServerDescription{address=localhost:27017, type=STANDALONE, state=CONNECTED, ok=true, version=ServerVersion{versionList=[2, 4, 9]}, minWireVersion=0, maxWireVersion=0, electionId=null, maxDocumentSize=16777216, roundTripTimeNanos=599568} 
16/06/14 12:55:53 INFO driver.connection: Opened connection [connectionId{localValue:4, serverValue:4}] to localhost:27017 
16/06/14 12:55:53 WARN splitter.StandaloneMongoSplitter: WARNING: No Input Splits were calculated by the split code. Proceeding with a *single* split. Data may be too small, try lowering 'mongo.input.split_size' if this is undesirable. 
16/06/14 12:55:53 INFO splitter.MongoCollectionSplitter: Created split: min=null, max= null 
16/06/14 12:55:53 INFO driver.connection: Closed connection [connectionId{localValue:4, serverValue:4}] to localhost:27017 because the pool has been closed. 
16/06/14 12:55:53 INFO mapreduce.JobSubmitter: number of splits:1 
16/06/14 12:55:54 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local35432891_0001 
16/06/14 12:55:54 INFO mapreduce.Job: The url to track the job: http://localhost:8080/ 
16/06/14 12:55:54 INFO mapreduce.Job: Running job: job_local35432891_0001 
16/06/14 12:55:54 INFO mapred.LocalJobRunner: OutputCommitter set in config null 
16/06/14 12:55:54 INFO mapred.LocalJobRunner: OutputCommitter is com.mongodb.hadoop.output.MongoOutputCommitter 
16/06/14 12:55:54 INFO output.MongoOutputCommitter: Setting up job. 
16/06/14 12:55:54 INFO mapred.LocalJobRunner: Waiting for map tasks 
16/06/14 12:55:54 INFO mapred.LocalJobRunner: Starting task: attempt_local35432891_0001_m_000000_0 
16/06/14 12:55:54 INFO output.MongoOutputCommitter: Setting up task. 
16/06/14 12:55:54 INFO mapred.Task: Using ResourceCalculatorProcessTree : [ ] 
16/06/14 12:55:54 INFO mapred.MapTask: Processing split: MongoInputSplit{inputURI hosts=[localhost], inputURI namespace=testmr.stackin, min={ }, max={ }, query={ }, sort={ }, fields={ }, limit=0, notimeout=false} 
16/06/14 12:55:55 INFO driver.cluster: Cluster created with settings {hosts=[localhost:27017], mode=SINGLE, requiredClusterType=UNKNOWN, serverSelectionTimeout='30000 ms', maxWaitQueueSize=500} 
16/06/14 12:55:55 INFO driver.connection: Opened connection [connectionId{localValue:5, serverValue:5}] to localhost:27017 
16/06/14 12:55:55 INFO driver.cluster: Monitor thread successfully connected to server with description ServerDescription{address=localhost:27017, type=STANDALONE, state=CONNECTED, ok=true, version=ServerVersion{versionList=[2, 4, 9]}, minWireVersion=0, maxWireVersion=0, electionId=null, maxDocumentSize=16777216, roundTripTimeNanos=1312293} 
16/06/14 12:55:55 INFO mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584) 
16/06/14 12:55:55 INFO mapred.MapTask: mapreduce.task.io.sort.mb: 100 
16/06/14 12:55:55 INFO mapred.MapTask: soft limit at 83886080 
16/06/14 12:55:55 INFO mapred.MapTask: bufstart = 0; bufvoid = 104857600 
16/06/14 12:55:55 INFO mapred.MapTask: kvstart = 26214396; length = 6553600 
16/06/14 12:55:55 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
16/06/14 12:55:55 INFO driver.connection: Opened connection [connectionId{localValue:6, serverValue:6}] to localhost:27017 
16/06/14 12:55:55 INFO input.MongoRecordReader: Read 4.0 documents from: 
16/06/14 12:55:55 INFO input.MongoRecordReader: MongoInputSplit{inputURI hosts=[localhost], inputURI namespace=testmr.stackin, min={ }, max={ }, query={ }, sort={ }, fields={ }, limit=0, notimeout=false} 
16/06/14 12:55:55 INFO mapred.LocalJobRunner: 
16/06/14 12:55:55 INFO driver.connection: Closed connection [connectionId{localValue:6, serverValue:6}] to localhost:27017 because the pool has been closed. 
16/06/14 12:55:55 INFO mapred.MapTask: Starting flush of map output 
16/06/14 12:55:55 INFO mapred.MapTask: Spilling map output 
16/06/14 12:55:55 INFO mapred.MapTask: bufstart = 0; bufend = 1252; bufvoid = 104857600 
16/06/14 12:55:55 INFO mapred.MapTask: kvstart = 26214396(104857584); kvend = 26214340(104857360); length = 57/6553600 
16/06/14 12:55:55 INFO mapred.MapTask: Finished spill 0 
16/06/14 12:55:55 INFO mapred.Task: Task:attempt_local35432891_0001_m_000000_0 is done. And is in the process of committing 
16/06/14 12:55:55 INFO Configuration.deprecation: mapred.child.tmp is deprecated. Instead, use mapreduce.task.tmp.dir 
16/06/14 12:55:55 INFO mapred.LocalJobRunner: map 
16/06/14 12:55:55 INFO mapred.Task: Task 'attempt_local35432891_0001_m_000000_0' done. 
16/06/14 12:55:55 INFO mapred.LocalJobRunner: Finishing task: attempt_local35432891_0001_m_000000_0 
16/06/14 12:55:55 INFO mapred.LocalJobRunner: map task executor complete. 
16/06/14 12:55:55 INFO mapred.LocalJobRunner: Waiting for reduce tasks 
16/06/14 12:55:55 INFO mapred.LocalJobRunner: Starting task: attempt_local35432891_0001_r_000000_0 
16/06/14 12:55:55 INFO output.MongoOutputCommitter: Setting up task. 
16/06/14 12:55:55 INFO mapred.Task: Using ResourceCalculatorProcessTree : [ ] 
16/06/14 12:55:55 INFO mapred.ReduceTask: Using ShuffleConsumerPlugin: org.apache.hadoop.mapreduce.task.reduce.Shuffle@... 
16/06/14 12:55:55 INFO reduce.MergeManagerImpl: MergerManager: memoryLimit=334338464, maxSingleShuffleLimit=83584616, mergeThreshold=220663392, ioSortFactor=10, memToMemMergeOutputsThreshold=10 
16/06/14 12:55:55 INFO reduce.EventFetcher: attempt_local35432891_0001_r_000000_0 Thread started: EventFetcher for fetching Map Completion Events 
16/06/14 12:55:55 INFO reduce.LocalFetcher: localfetcher#1 about to shuffle output of map attempt_local35432891_0001_m_000000_0 decomp: 1284 len: 1288 to MEMORY 
16/06/14 12:55:55 INFO reduce.InMemoryMapOutput: Read 1284 bytes from map-output for attempt_local35432891_0001_m_000000_0 
16/06/14 12:55:55 INFO reduce.MergeManagerImpl: closeInMemoryFile -> map-output of size: 1284, inMemoryMapOutputs.size() -> 1, commitMemory -> 0, usedMemory ->1284 
16/06/14 12:55:55 INFO reduce.EventFetcher: EventFetcher is interrupted.. Returning 
16/06/14 12:55:55 INFO mapred.LocalJobRunner: 1/1 copied. 
16/06/14 12:55:55 INFO reduce.MergeManagerImpl: finalMerge called with 1 in-memory map-outputs and 0 on-disk map-outputs 
16/06/14 12:55:55 INFO mapred.Merger: Merging 1 sorted segments 
16/06/14 12:55:55 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 1197 bytes 
16/06/14 12:55:55 INFO reduce.MergeManagerImpl: Merged 1 segments, 1284 bytes to disk to satisfy reduce memory limit 
16/06/14 12:55:55 INFO reduce.MergeManagerImpl: Merging 1 files, 1288 bytes from disk 
16/06/14 12:55:55 INFO reduce.MergeManagerImpl: Merging 0 segments, 0 bytes from memory into reduce 
16/06/14 12:55:55 INFO mapred.Merger: Merging 1 sorted segments 
16/06/14 12:55:55 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 1197 bytes 
16/06/14 12:55:55 INFO mapred.LocalJobRunner: 1/1 copied. 
16/06/14 12:55:55 INFO driver.cluster: Cluster created with settings {hosts=[localhost:27017], mode=SINGLE, requiredClusterType=UNKNOWN, serverSelectionTimeout='30000 ms', maxWaitQueueSize=500} 
16/06/14 12:55:55 INFO driver.connection: Opened connection [connectionId{localValue:7, serverValue:7}] to localhost:27017 
16/06/14 12:55:55 INFO driver.cluster: Monitor thread successfully connected to server with description ServerDescription{address=localhost:27017, type=STANDALONE, state=CONNECTED, ok=true, version=ServerVersion{versionList=[2, 4, 9]}, minWireVersion=0, maxWireVersion=0, electionId=null, maxDocumentSize=16777216, roundTripTimeNanos=414057} 
16/06/14 12:55:55 INFO output.MongoRecordWriter: Writing to temporary file: tmp/attempt_local35432891_0001_r_000000_0/_MONGO_OUT_TEMP/_out 
16/06/14 12:55:55 INFO Configuration.deprecation: mapred.skip.on is deprecated. Instead, use mapreduce.job.skiprecords 
sum : 1 
16/06/14 12:55:55 INFO mapreduce.Job: Job job_local35432891_0001 running in uber mode : false 
16/06/14 12:55:55 INFO mapreduce.Job: map 100% reduce 0% 
16/06/14 12:55:56 INFO mapred.LocalJobRunner: reduce task executor complete. 
16/06/14 12:55:56 WARN mapred.LocalJobRunner: job_local35432891_0001 
java.lang.Exception: java.lang.IllegalArgumentException: Can't serialize class org.apache.hadoop.io.IntWritable 
    at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462) 
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:529) 
Caused by: java.lang.IllegalArgumentException: Can't serialize class org.apache.hadoop.io.IntWritable 
    at org.bson.BasicBSONEncoder._putObjectField(BasicBSONEncoder.java:202) 
    at org.bson.BasicBSONEncoder.putObject(BasicBSONEncoder.java:111) 
    at com.mongodb.hadoop.io.BSONWritable.write(BSONWritable.java:93) 
    at com.mongodb.hadoop.output.MongoRecordWriter.write(MongoRecordWriter.java:125) 
    at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:558) 
    at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89) 
    at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.write(WrappedReducer.java:105) 
    at WordCount$IntSumReducer.reduce(WordCount.java:46) 
    at WordCount$IntSumReducer.reduce(WordCount.java:33) 
    at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171) 
    at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:627) 
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:389) 
    at org.apache.hadoop.mapred.LocalJobRunner$Job$ReduceTaskRunnable.run(LocalJobRunner.java:319) 
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
16/06/14 12:55:56 INFO mapreduce.Job: Job job_local35432891_0001 failed with state FAILED due to: NA 
16/06/14 12:55:56 INFO mapreduce.Job: Counters: 38 
    File System Counters 
     FILE: Number of bytes read=3893 
     FILE: Number of bytes written=260587 
     FILE: Number of read operations=0 
     FILE: Number of large read operations=0 
     FILE: Number of write operations=0 
     HDFS: Number of bytes read=0 
     HDFS: Number of bytes written=0 
     HDFS: Number of read operations=1 
     HDFS: Number of large read operations=0 
     HDFS: Number of write operations=0 
    Map-Reduce Framework 
     Map input records=4 
     Map output records=15 
     Map output bytes=1252 
     Map output materialized bytes=1288 
     Input split bytes=201 
     Combine input records=0 
     Combine output records=0 
     Reduce input groups=0 
     Reduce shuffle bytes=1288 
     Reduce input records=0 
     Reduce output records=0 
     Spilled Records=15 
     Shuffled Maps =1 
     Failed Shuffles=0 
     Merged Map outputs=1 
     GC time elapsed (ms)=4 
     CPU time spent (ms)=0 
     Physical memory (bytes) snapshot=0 
     Virtual memory (bytes) snapshot=0 
     Total committed heap usage (bytes)=240648192 
    Shuffle Errors 
     BAD_ID=0 
     CONNECTION=0 
     IO_ERROR=0 
     WRONG_LENGTH=0 
     WRONG_MAP=0 
     WRONG_REDUCE=0 
    File Input Format Counters 
     Bytes Read=0 
    File Output Format Counters 
     Bytes Written=0 

How should I fix this problem? Thanks!

Answer


In your reducer, your output value is result, which is of type MapWritable. If that is intended, you need to replace this line:

job.setOutputValueClass(IntWritable.class); 

with:

job.setOutputValueClass(MapWritable.class); 

Edit:

Since the mapper's output types differ from the reducer's (final) output types, you should also set:

job.setMapOutputKeyClass(Text.class); 
job.setMapOutputValueClass(IntWritable.class); 
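
For clarity, here is a sketch of how those declarations line up with the generic signatures already present in the question's code (the comments name the generic parameters; the code lines themselves are unchanged):

// Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>: this mapper emits (Text, IntWritable) 
public static class TokenizerMapper extends Mapper<Object, BSONObject, Text, IntWritable> { ... } 

// Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>: this reducer emits (Text, MapWritable) 
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, MapWritable> { ... } 

// The map output classes must match the mapper's KEYOUT/VALUEOUT... 
job.setMapOutputKeyClass(Text.class); 
job.setMapOutputValueClass(IntWritable.class); 
// ...and the final output classes must match the reducer's KEYOUT/VALUEOUT. 
job.setOutputKeyClass(Text.class); 
job.setOutputValueClass(MapWritable.class); 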

This will fix the exception, but I'm afraid the MapReduce output is not what you are looking for. Please ask another question about how to write a proper map-reduce application for your job. – waltersu


Yes, indeed another exception came up: 'java.io.IOException: Type mismatch in value from map: expected org.apache.hadoop.io.MapWritable, received org.apache.hadoop.io.IntWritable'. –


I've added two lines of code in the edit above to fix your exception. Please try them and see if that works for you. – waltersu
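
Even with the output classes aligned, the stack trace above still ends in BasicBSONEncoder: MongoOutputFormat encodes the reduce output as BSON, and the BSON encoder cannot serialize Hadoop types such as the IntWritable values nested inside the MapWritable. A minimal sketch of one way around this, assuming mongo-hadoop's BSONWritable wrapper and keeping the question's composite "word url" keys (the class name BSONSumReducer is hypothetical), would be to emit a plain BSON document from the reducer:

// Additional import on top of the question's existing ones: 
// import com.mongodb.hadoop.io.BSONWritable; 

public static class BSONSumReducer extends Reducer<Text, IntWritable, Text, BSONWritable> { 
    public void reduce(Text key, Iterable<IntWritable> values, Context context) 
      throws IOException, InterruptedException { 
     // Sum the counts for this (word, url) composite key 
     int sum = 0; 
     for (IntWritable val : values) { 
      sum += val.get(); 
     } 
     // Split the composite "word url" key emitted by the mapper 
     String[] parts = key.toString().split(" "); 
     // Build a plain BSON document; the encoder handles String and int natively 
     BasicBSONObject doc = new BasicBSONObject(); 
     doc.put("url", parts[1]); 
     doc.put("count", sum); 
     // Wrap it in a BSONWritable so MongoRecordWriter can serialize it 
     context.write(new Text(parts[0]), new BSONWritable(doc)); 
    } 
} 

The driver would then declare job.setOutputValueClass(BSONWritable.class). Note that this still writes one document per (word, url) pair rather than grouping all URLs under a single word, so the schema asked for in the question would still need the redesign suggested in the first comment.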