
Hadoop in-memory shuffle merge on the reduce side

I'm running into some performance problems in the reduce merge phase and I'm wondering if anyone can take a look. I have a 6 GB dataset (text) distributed evenly across the cluster. The dataset has two keys, and a GroupBy splits it across two reducers (I'm using Cascading), so each reducer gets 3 GB of data. I've given each reducer 12 GB of memory, yet I'm still seeing a 20-minute merge phase.

Two questions. First, shouldn't this merge happen entirely in memory, given a 12 GB heap? Second, even without an in-memory merge, 20 minutes seems far too long to merge 3 GB, especially with 12 disks (JBOD) and 12 cores per node. I'm wondering whether the partially merged data is being written to the wrong place (MapRFS vs. local disk?).
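For reference, these are the shuffle/merge-related settings from the job configuration (full dump below):

mapred.job.shuffle.input.buffer.percent 0.80
mapred.job.shuffle.merge.percent 0.70
mapred.inmem.merge.threshold 0
mapred.job.reduce.input.buffer.percent 0.0
io.sort.factor 50
mapred.reduce.child.java.opts -Xmx12000m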

MAPRFS_BYTES_READ and MAPRFS_BYTES_WRITTEN are interesting. The initial dataset is 6 GB (shown in the Map column). Somehow the sort grows that to 17 GB, which seems odd. Then in the reduce phase it reads 23 GB from MapRFS and writes 17 GB. Should the reduce-phase merge data be written to MapRFS, or to the local FS? And why has the size grown so much over the initial dataset? (No compression is in use; this is plain text.)


Counter  Map  Reduce Total 
Job Counters Aggregate execution time of mappers(ms)  0 0 29,887,359 
Launched reduce tasks 0 0 2 
Rack-local map tasks 0 0 4 
Launched map tasks 0 0 353 
Data-local map tasks 0 0 311 
cascading.flow.SliceCounters Read_Duration 329,399  366,004  695,403 
Tuples_Read  252,000,000  67,896,295 319,896,295 
Tuples_Written 252,000,000  0 252,000,000 
Process_End_Time 476,294,761,317,139  0 476,294,761,317,139 
Write_Duration 2,713,840 0 2,713,840 
Process_Begin_Time 476,294,753,764,176  2,698,557,228,678 478,993,310,992,854 
FileSystemCounters MAPRFS_BYTES_READ 6,651,978,400 21,721,014,791 28,372,993,191 
MAPRFS_BYTES_WRITTEN 17,044,716,578 17,044,701,398 34,089,417,976 
FILE_BYTES_WRITTEN 19,046,005 107,748  19,153,753 
Map-Reduce Framework Map input records 252,000,000  0 252,000,000 
Reduce shuffle bytes 0 16,980,659,887 16,980,659,887 
Spilled Records  252,000,000  0 252,000,000 
Map output bytes 16,540,701,046 0 16,540,701,046 
CPU_MILLISECONDS 18,861,020 7,640,360 26,501,380 
Map input bytes  6,644,947,675 0 6,644,947,675 
Combine input records 0 0 0 
SPLIT_RAW_BYTES  97,428 0 97,428 
Reduce input records 0 67,896,295 67,896,295 
Reduce input groups  0 2 2 
Combine output records 0 0 0 
PHYSICAL_MEMORY_BYTES 324,852,019,200  15,041,486,848 339,893,506,048 
Reduce output records 0 0 0 
VIRTUAL_MEMORY_BYTES 626,863,038,464  26,729,230,336 653,592,268,800 
Map output records 252,000,000  0 252,000,000 
GC time elapsed (ms) 1,568,523 76,636 1,645,159 
cascading.flow.StepCounters  Tuples_Read  252,000,000  0 252,000,000 

name value 
fs.s3n.impl org.apache.hadoop.fs.s3native.NativeS3FileSystem 
mapreduce.heartbeat.100 1000 
mapred.task.cache.levels 2 
hadoop.tmp.dir /tmp/hadoop-${user.name} 
hadoop.native.lib true 
map.sort.class org.apache.hadoop.util.QuickSort 
mapreduce.jobtracker.recovery.dir /var/mapr/cluster/mapred/jobTracker/recovery 
mapreduce.heartbeat.1000 10000 
ipc.client.idlethreshold 4000 
mapred.system.dir /var/mapr/cluster/mapred/jobTracker/system 
mapreduce.cluster.reduce.userlog.retain-size 10485760 
mapred.job.tracker.persist.jobstatus.hours 0 
io.skip.checksum.errors false 
fs.default.name maprfs:/// 
mapred.cluster.reduce.memory.mb -1 
mapred.child.tmp ./tmp 
fs.har.impl.disable.cache true 
mapred.jobtracker.jobhistory.lru.cache.size 5 
mapred.skip.reduce.max.skip.groups 0 
cascading.flow.step.num 1 
mapred.jobtracker.instrumentation org.apache.hadoop.mapred.JobTrackerMetricsInst 
mapr.localvolumes.path /var/mapr/local 
mapred.tasktracker.dns.nameserver default 
io.sort.factor 50 
mapred.output.value.groupfn.class cascading.tuple.hadoop.util.GroupingComparator 
mapreduce.use.maprfs true 
mapred.task.timeout 600000 
mapred.max.tracker.failures 4 
hadoop.rpc.socket.factory.class.default org.apache.hadoop.net.StandardSocketFactory 
mapred.mapoutput.key.class cascading.tuple.io.TuplePair 
fs.hdfs.impl org.apache.hadoop.hdfs.DistributedFileSystem 
mapred.queue.default.acl-administer-jobs  
mapred.output.key.class org.apache.hadoop.io.Text 
mapred.skip.map.auto.incr.proc.count true 
mapred.map.runner.class cascading.flow.hadoop.FlowMapper 
mapreduce.job.complete.cancel.delegation.tokens true 
mapreduce.tasktracker.heapbased.memory.management false 
io.mapfile.bloom.size 1048576 
tasktracker.http.threads 2 
mapred.job.shuffle.merge.percent 0.70 
cascading.flow.id 853276BF02049D394C31880B08C9E6CC 
mapred.child.renice 10 
fs.ftp.impl org.apache.hadoop.fs.ftp.FTPFileSystem 
user.name jdavis 
mapred.fairscheduler.smalljob.max.inputsize 10737418240 
mapred.output.compress false 
io.bytes.per.checksum 512 
mapred.healthChecker.script.timeout 600000 
topology.node.switch.mapping.impl org.apache.hadoop.net.ScriptBasedMapping 
mapred.reduce.slowstart.completed.maps 0.95 
mapred.reduce.max.attempts 4 
fs.ramfs.impl org.apache.hadoop.fs.InMemoryFileSystem 
mapr.localoutput.dir output 
mapred.skip.map.max.skip.records 0 
mapred.jobtracker.port 9001 
mapred.cluster.map.memory.mb -1 
mapreduce.tasktracker.prefetch.maptasks 1.0 
hadoop.security.group.mapping org.apache.hadoop.security.ShellBasedUnixGroupsMapping 
mapreduce.tasktracker.task.slowlaunch false 
mapred.job.tracker.persist.jobstatus.dir /var/mapr/cluster/mapred/jobTracker/jobsInfo 
mapred.jar /var/mapr/cluster/mapred/jobTracker/staging/jdavis/.staging/job_201210022148_0086/job.jar 
fs.s3.buffer.dir ${hadoop.tmp.dir}/s3 
job.end.retry.attempts 0 
fs.file.impl org.apache.hadoop.fs.LocalFileSystem 
cascading.app.name omeg 
mapred.local.dir.minspacestart 0 
mapred.output.compression.type RECORD 
fs.mapr.working.dir /user/$USERNAME/ 
fs.maprfs.impl com.mapr.fs.MapRFileSystem 
fs.https.impl cascading.tap.hadoop.io.HttpFileSystem 
topology.script.number.args 100 
io.mapfile.bloom.error.rate 0.005 
mapred.cluster.max.reduce.memory.mb -1 
mapred.max.tracker.blacklists 4 
mapred.task.profile.maps 0-2 
mapred.userlog.retain.hours 24 
mapred.job.tracker.persist.jobstatus.active false 
hadoop.security.authorization false 
local.cache.size 10737418240 
mapred.min.split.size 0 
mapred.map.tasks 353 
mapred.tasktracker.task-controller.config.overwrite true 
cascading.app.appjar.path /home/jdavis/tmp/omeg.jar 
mapred.output.value.class org.apache.hadoop.io.Text 
mapred.partitioner.class cascading.tuple.hadoop.util.GroupingPartitioner 
mapreduce.maprfs.use.compression true 
mapred.job.queue.name default 
mapreduce.tasktracker.reserved.physicalmemory.mb.low 0.90 
cascading.group.comparator.size 3 
ipc.server.listen.queue.size 128 
group.name common 
mapred.inmem.merge.threshold 0 
job.end.retry.interval 30000 
mapred.fairscheduler.smalljob.max.maps 10 
mapred.skip.attempts.to.start.skipping 2 
fs.checkpoint.dir ${hadoop.tmp.dir}/dfs/namesecondary 
mapred.reduce.tasks 2 
mapred.merge.recordsBeforeProgress 10000 
mapred.userlog.limit.kb 0 
mapred.job.reduce.memory.mb -1 
webinterface.private.actions true 
io.sort.spill.percent 0.99 
mapred.job.shuffle.input.buffer.percent 0.80 
mapred.job.name [853276BF02049D394C31880B08C9E6CC/DCB7B555F1FC65C767B8E2CD716607AA] copyr/(1/1) /user/jdavis/ctest/end 
mapred.map.tasks.speculative.execution false 
hadoop.util.hash.type murmur 
mapred.map.max.attempts 4 
mapreduce.job.acl-view-job 

mapred.job.tracker.handler.count 10 
mapred.input.format.class cascading.tap.hadoop.io.MultiInputFormat 
mapred.tasktracker.expiry.interval 600000 
mapred.jobtracker.maxtasks.per.job -1 
mapred.jobtracker.job.history.block.size 3145728 
keep.failed.task.files false 
mapred.output.format.class org.apache.hadoop.mapred.TextOutputFormat 
ipc.client.tcpnodelay false 
mapred.task.profile.reduces 0-2 
mapred.output.compression.codec org.apache.hadoop.io.compress.DefaultCodec 
io.map.index.skip 0 
mapred.working.dir /user/jdavis 
ipc.server.tcpnodelay false 
hadoop.proxyuser.root.hosts 
mapred.reducer.class cascading.flow.hadoop.FlowReducer 
cascading.app.id A593B4669179BB6F06771249E7ADFA48 
mapred.used.genericoptionsparser true 
jobclient.progress.monitor.poll.interval 1000 
mapreduce.tasktracker.jvm.idle.time 10000 
mapred.job.map.memory.mb -1 
hadoop.logfile.size 10000000 
mapred.reduce.tasks.speculative.execution false 
mapreduce.job.dir maprfs:/var/mapr/cluster/mapred/jobTracker/staging/jdavis/.staging/job_201210022148_0086 
mapreduce.tasktracker.outofband.heartbeat true 
mapreduce.reduce.input.limit -1 
mapred.tasktracker.ephemeral.tasks.ulimit 4294967296 
fs.s3n.block.size 67108864 
fs.inmemory.size.mb 200 
mapred.fairscheduler.smalljob.max.reducers 10 
hadoop.security.authentication simple 
fs.checkpoint.period 3600 
cascading.flow.step.id DCB7B555F1FC65C767B8E2CD716607AA 
mapred.job.reuse.jvm.num.tasks -1 
mapred.jobtracker.completeuserjobs.maximum 5 
mapreduce.cluster.map.userlog.retain-size 10485760 
mapred.task.tracker.task-controller org.apache.hadoop.mapred.LinuxTaskController 
mapred.output.key.comparator.class cascading.tuple.hadoop.util.GroupingSortingComparator 
fs.s3.maxRetries 4 
mapred.cluster.max.map.memory.mb -1 
mapred.mapoutput.value.class cascading.tuple.Tuple 
mapred.map.child.java.opts -XX:ErrorFile=/opt/cores/mapreduce_java_error%p.log 
mapred.job.tracker.history.completed.location /var/mapr/cluster/mapred/jobTracker/history/done 
mapred.local.dir /tmp/mapr-hadoop/mapred/local 
fs.hftp.impl org.apache.hadoop.hdfs.HftpFileSystem 
fs.trash.interval 0 
fs.s3.sleepTimeSeconds 10 
mapred.submit.replication 10 
fs.har.impl org.apache.hadoop.fs.HarFileSystem 
mapreduce.heartbeat.10 300 
cascading.version Concurrent, Inc - Cascading 2.0.5 
mapred.map.output.compression.codec org.apache.hadoop.io.compress.DefaultCodec 
mapred.tasktracker.dns.interface default 
hadoop.proxyuser.root.groups root 
mapred.job.tracker maprfs:/// 
mapreduce.job.submithost c10-m001.wowrack.upstream.priv 
mapreduce.tasktracker.cache.local.numberdirectories 10000 
io.seqfile.sorter.recordlimit 1000000 
mapreduce.heartbeat.10000 100000 
mapred.line.input.format.linespermap 1 
mapred.jobtracker.taskScheduler org.apache.hadoop.mapred.FairScheduler 
mapred.tasktracker.instrumentation org.apache.hadoop.mapred.TaskTrackerMetricsInst 
mapred.tasktracker.taskmemorymanager.killtask.maxRSS false 
mapred.child.taskset true 
jobclient.completion.poll.interval 5000 
mapred.fairscheduler.smalljob.max.reducer.inputsize 1073741824 
mapred.local.dir.minspacekill 0 
io.sort.record.percent 0.28 
mapr.localspill.dir spill 
io.compression.codec.lzo.class com.hadoop.compression.lzo.LzoCodec 
fs.kfs.impl org.apache.hadoop.fs.kfs.KosmosFileSystem 
mapred.tasktracker.reduce.tasks.maximum (CPUS > 2) ? (CPUS * 0.70): 1 
mapred.temp.dir ${hadoop.tmp.dir}/mapred/temp 
mapred.tasktracker.ephemeral.tasks.maximum 1 
fs.checkpoint.edits.dir ${fs.checkpoint.dir} 
mapred.tasktracker.tasks.sleeptime-before-sigkill 5000 
mapred.job.reduce.input.buffer.percent 0.0 
mapred.tasktracker.indexcache.mb 10 
mapreduce.task.classpath.user.precedence false 
mapreduce.job.split.metainfo.maxsize -1 
hadoop.logfile.count 10 
fs.automatic.close true 
mapred.skip.reduce.auto.incr.proc.count true 
mapreduce.job.submithostaddress 10.100.0.99 
mapred.child.oom_adj 10 
io.seqfile.compress.blocksize 1000000 
fs.s3.block.size 67108864 
mapred.tasktracker.taskmemorymanager.monitoring-interval 3000 
mapreduce.tasktracker.volume.healthcheck.interval 60000 
mapred.cluster.ephemeral.tasks.memory.limit.mb 200 
mapreduce.jobtracker.staging.root.dir /var/mapr/cluster/mapred/jobTracker/staging 
mapred.acls.enabled false 
mapred.queue.default.state RUNNING 
mapred.fairscheduler.smalljob.schedule.enable false 
mapred.queue.names default 
fs.hsftp.impl org.apache.hadoop.hdfs.HsftpFileSystem 
mapred.fairscheduler.eventlog.enabled false 
mapreduce.jobtracker.recovery.maxtime 480 
mapred.task.tracker.http.address 0.0.0.0:50060 
mapreduce.jobtracker.inline.setup.cleanup false 
mapred.reduce.parallel.copies 40 
io.seqfile.lazydecompress true 
mapred.tasktracker.ephemeral.tasks.timeout 10000 
mapred.output.dir maprfs:/user/jdavis/ctest/end 
mapreduce.tasktracker.group root 
hadoop.workaround.non.threadsafe.getpwuid false 
io.sort.mb 512 
mapred.reduce.child.java.opts -Xmx12000m 
ipc.client.connection.maxidletime 10000 
mapred.compress.map.output false 
hadoop.security.uid.cache.secs 14400 
mapred.task.tracker.report.address 127.0.0.1:0 
mapred.healthChecker.interval 60000 
ipc.client.kill.max 10 
ipc.client.connect.max.retries 10 
fs.http.impl cascading.tap.hadoop.io.HttpFileSystem 
fs.s3.impl org.apache.hadoop.fs.s3.S3FileSystem 
mapred.fairscheduler.assignmultiple true 
mapred.user.jobconf.limit 5242880 
mapred.input.dir maprfs:/user/jdavis/ctest/mid 
mapred.job.tracker.http.address 0.0.0.0:50030 
io.file.buffer.size 131072 
mapred.jobtracker.restart.recover true 
io.serializations cascading.tuple.hadoop.TupleSerialization,org.apache.hadoop.io.serializer.WritableSerialization 
mapreduce.use.fastreduce false 
mapred.reduce.copy.backoff 300 
mapred.task.profile false 
mapred.jobtracker.retiredjobs.cache.size 300 
jobclient.output.filter FAILED 
mapred.tasktracker.map.tasks.maximum (CPUS > 2) ? (CPUS * 0.80) : 1 
io.compression.codecs org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.BZip2Codec,com.hadoop.compression.lzo.LzoCodec,com.hadoop.compression.lzo.LzopCodec 
fs.checkpoint.size 67108864 
cascading.sort.comparator.size 3 

2012-10-02 19:30:50,676 INFO org.apache.hadoop.metrics.jvm.JvmMetrics: Initializing JVM Metrics with processName=SHUFFLE, sessionId= 
2012-10-02 19:30:50,737 INFO org.apache.hadoop.mapreduce.util.ProcessTree: setsid exited with exit code 0 
2012-10-02 19:30:50,742 WARN org.apache.hadoop.mapreduce.util.ProcfsBasedProcessTree: /proc/<pid>/status does not have information about swap space used(VmSwap). Can not track swap usage of a task. 
2012-10-02 19:30:50,742 INFO org.apache.hadoop.mapred.Task: Using ResourceCalculatorPlugin : [email protected]7b62aab 
2012-10-02 19:30:50,903 WARN org.apache.hadoop.mapreduce.util.ProcfsBasedProcessTree: The process 9115 may have finished in the interim. 
2012-10-02 19:31:01,663 INFO org.apache.hadoop.mapred.Merger: Merging 37 sorted segments 
2012-10-02 19:31:01,672 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 36 segments left of total size: 1204882102 bytes 
2012-10-02 19:31:03,079 WARN org.apache.hadoop.mapreduce.util.ProcfsBasedProcessTree: The process 7596 may have finished in the interim. 
2012-10-02 19:31:15,487 WARN org.apache.hadoop.mapreduce.util.ProcfsBasedProcessTree: The process 4803 may have finished in the interim. 
2012-10-02 19:31:15,489 WARN org.apache.hadoop.mapreduce.util.ProcfsBasedProcessTree: The process 11069 may have finished in the interim. 
2012-10-02 19:33:37,821 WARN org.apache.hadoop.mapreduce.util.ProcfsBasedProcessTree: The process 20846 may have finished in the interim. 
2012-10-02 19:33:59,274 INFO org.apache.hadoop.mapred.Merger: Merging 35 sorted segments 
2012-10-02 19:33:59,275 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 35 segments left of total size: 1176895576 bytes 
2012-10-02 19:34:02,131 WARN org.apache.hadoop.mapreduce.util.ProcfsBasedProcessTree: The process 21791 may have finished in the interim. 
2012-10-02 19:34:29,927 WARN org.apache.hadoop.mapreduce.util.ProcfsBasedProcessTree: The process 22847 may have finished in the interim. 
2012-10-02 19:36:32,181 WARN org.apache.hadoop.mapreduce.util.ProcfsBasedProcessTree: The process 30438 may have finished in the interim. 
2012-10-02 19:37:18,243 WARN org.apache.hadoop.mapreduce.util.ProcfsBasedProcessTree: The process 3852 may have finished in the interim. 
2012-10-02 19:37:26,292 INFO org.apache.hadoop.mapred.Merger: Merging 37 sorted segments 
2012-10-02 19:37:26,293 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 37 segments left of total size: 1233203028 bytes 
2012-10-02 19:39:07,695 WARN org.apache.hadoop.mapreduce.util.ProcfsBasedProcessTree: The process 9813 may have finished in the interim. 
2012-10-02 19:39:10,764 WARN org.apache.hadoop.mapreduce.util.ProcfsBasedProcessTree: The process 10045 may have finished in the interim. 
2012-10-02 19:39:56,829 WARN org.apache.hadoop.mapreduce.util.ProcfsBasedProcessTree: The process 17383 may have finished in the interim. 
2012-10-02 19:40:18,295 WARN org.apache.hadoop.mapreduce.util.ProcfsBasedProcessTree: The process 19584 may have finished in the interim. 
2012-10-02 19:40:32,307 INFO org.apache.hadoop.mapred.Merger: Merging 58 sorted segments 
2012-10-02 19:40:32,308 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 44 segments left of total size: 1206978885 bytes 
2012-10-02 19:41:35,154 WARN org.apache.hadoop.mapreduce.util.ProcfsBasedProcessTree: The process 26361 may have finished in the interim. 
2012-10-02 19:43:53,644 INFO org.apache.hadoop.mapred.Merger: Merging 56 sorted segments 
2012-10-02 19:43:53,645 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 56 segments left of total size: 1217287352 bytes 
2012-10-02 19:46:55,246 INFO org.apache.hadoop.mapred.Merger: Merging 44 sorted segments 
2012-10-02 19:46:55,246 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 44 segments left of total size: 1221163604 bytes 
2012-10-02 19:49:57,894 INFO org.apache.hadoop.mapred.Merger: Merging 85 sorted segments 
2012-10-02 19:49:57,895 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 62 segments left of total size: 1229975233 bytes 
2012-10-02 19:52:09,914 WARN org.apache.hadoop.mapreduce.util.ProcfsBasedProcessTree: The process 25247 may have finished in the interim. 
2012-10-02 19:52:52,620 INFO org.apache.hadoop.mapred.Merger: Merging 1 sorted segments 
2012-10-02 19:52:52,620 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 32065409 bytes 
2012-10-02 19:52:53,327 INFO org.apache.hadoop.mapred.Merger: Merging 8 sorted segments 
2012-10-02 19:52:53,345 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 8 segments left of total size: 8522450575 bytes 
2012-10-02 19:52:53,366 INFO cascading.flow.hadoop.FlowReducer: cascading version: Concurrent, Inc - Cascading 2.0.5 

Answers


On the first part of your question: it isn't enough to just give Hadoop more memory and expect it to use it automatically and make things faster (nice as that would be!). You need to tune the configuration properties to take advantage of that memory; io.sort.mb, for instance, is one setting that can help speed up the merge/shuffle phase.

http://hadoop.apache.org/docs/r0.20.2/mapred-default.html lists most of the configuration properties. http://www.slideshare.net/cloudera/mr-perf gives some concrete advice on speeding up the merge (slide 15).
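With the old mapred API that kind of tuning looks something like this (a minimal sketch; the property names are the standard 0.20.x ones, but the values are purely illustrative and not tuned for your job):

import org.apache.hadoop.mapred.JobConf;

public class MergeTuning {
    public static JobConf tune(JobConf conf) {
        // Bigger map-side sort buffer means fewer spill files per mapper
        // (your job ran with io.sort.mb=512). Value is illustrative.
        conf.setInt("io.sort.mb", 1024);
        // Merge more streams per pass so fewer merge passes are needed
        // (your job ran with io.sort.factor=50). Value is illustrative.
        conf.setInt("io.sort.factor", 100);
        return conf;
    }
}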

Enabling intermediate output compression (mapred.compress.map.output) usually speeds things up as well; see the sketch below.
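Again only a sketch; LzoCodec is just one choice, and I'm assuming the LZO libraries already registered in your io.compression.codecs are actually installed on the nodes:

import org.apache.hadoop.mapred.JobConf;

public class MapOutputCompression {
    public static JobConf enable(JobConf conf) {
        // Compress map output before it is shuffled to the reducers.
        conf.setBoolean("mapred.compress.map.output", true);
        // Codec for the intermediate data; LZO trades a little CPU
        // for a lot less shuffle I/O.
        conf.set("mapred.map.output.compression.codec",
                 "com.hadoop.compression.lzo.LzoCodec");
        return conf;
    }
}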

HTH, Johannes


This isn't really an answer to all of your questions, but I can explain why so much data is being transferred.

CoGroup tags each key with the ordinal of the input it came from. So if your data contains only two small keys, it's easy to see how its size can roughly double (a small key plus a tag of similar size). That gets you to the 17 GB of data.
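You can sanity-check that against the counters above (my arithmetic): map input is 6,644,947,675 bytes / 252,000,000 records ≈ 26 bytes per record, while map output is 16,540,701,046 bytes / 252,000,000 records ≈ 66 bytes per record — about 40 extra bytes per record, presumably tuple serialization and tagging overhead, which is what turns 6 GB into ~17 GB.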

Next, your 353 mappers process about 17 MB each (very small — do you have lots of small input files?). By default each mapper should receive a block's worth of data (MapR doesn't expose its block size in job.xml, so I can't tell), but with a 64 MB block size you'd be processing this data with far fewer mappers (~100).
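If the input is a few large files, raising the minimum split size is one way to get fewer, larger map tasks (a sketch; mapred.min.split.size appears in your config above with its default of 0, and 256 MB is just an illustrative value):

import org.apache.hadoop.mapred.JobConf;

public class FewerMappers {
    public static JobConf widenSplits(JobConf conf) {
        // Ask FileInputFormat for splits of at least 256 MB. Note this only
        // enlarges splits within a single file; if the input is many small
        // files, something like CombineFileInputFormat is needed instead.
        conf.setLong("mapred.min.split.size", 256L * 1024 * 1024);
        return conf;
    }
}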

I don't actually know how MapR Direct Shuffle(tm) works (still looking into it), but the mapper output appears to go to maprfs, and the shuffle/sort phase of the reducer then pulls those segments straight from maprfs. That accounts for the 17 GB (you can sum all the segment sizes in the reducer log); where the extra 6 GB of reads on top of that comes from, I don't know. That might be a question for MapR support.