1.Hadoop MapReduce框架
hadoop1.x和hadoop2.x使用的的MapReduce模型是不同的,hadoop1.x使用的JobTrack和TaskTrack来分配任务和执行任务,而hadoop2.x是使用(yarn框架) 资源管理器ResourceManager、应用主体ApplicationMaster和节点管理器NodeManager来分配任务和执行任务。
hadoop2.x yarn框架结构图
一个应用程序所需的Container分为两大类,如下:
1. 运行ApplicationMaster的Container:这是由ResourceManager(向内部的资源调度器)申请和启动的,用户提交应用程序时,可指定唯一的ApplicationMaster所需的资源;
2.运行各类任务的Container:这是由ApplicationMaster向ResourceManager申请的,并由ApplicationMaster与NodeManager通信以启动之。以上两类Container可能在任意节点上,它们的位置通常而言是随机的,即ApplicationMaster可能与它管理的任务运行在一个节点上。
2.MapReduce工作流程
map完成后的中间过程
map过程包括: 1). 从磁盘读入数据 2). 运行map任务 3). 写结果到磁盘 reduce过程包括: 1). shuffle&sort 2). 运行reduce任务 3). 写结果到磁盘 分析 在map的第三个阶段,map任务的输出会被Partitioner类以指定的方式区分地写出到输出文件里,如果提供了Combiner,在Mapper输出键值时,键值对不会被马上写到输出里,他们会被缓冲在内存中,当达到一定的数据量时,这部分数据会在Combiner中进行合并,然后再输出到Partitioner中。这个阶段通过将数据写入磁盘提高了系统的可靠性,但降低了性能。
排序过程:先根据数据所属的partition进行排序,然后每个partition中的数据再按key来排序。partition的目是将记录划分到不同的Reducer上去,以期望能够达到负载均衡,以后的Reducer就会根据partition来读取自己对应的数据。 在reduce的第一个阶段,Hadoop框架会根据Map结果中的key,将相关的结果传输到某一个Reducer上,这个步骤中的远程传输使用了HTTP协议。
map->combiner->partitioner->comparator(根据partition排序,每个partition中的数据再根据map输出的key进行排序)->划分到不同Reducer中。
官网解释的shuffe流程
hadoop shuffe阶段调优参数
mapreduce的优化通常围绕shuffle的过程展开,包括如何增加并发、多用内存少用磁盘、减少shuffle的文件大小等。shuffe阶段我们可以通过配置一些 Hadoop Job 的参数调整 Hadoop shuffle 性能。Hadoop中途的sort及merge,使用多路归并排序方法。
详细配置参数可看
3.Hadoop知识点小结
(1)map数量控制【显示控制HDFS文件分片大小】
map数量和InputSpilt的个数有关系,一般是一个InputSpilt一个map,为了调节map个数,往往可以通过设置InputSpilt分片大小调节map个数。
实现方法,对mapred.min.split.size和mapred.max.split.size的设置重新赋值,我们在程序只需重新赋值给这两个值就可以控制InputSplit分片的大小了。
假如我们想要设置的分片大小为10MB,我们可以在MapReduce程序的驱动部分添加如下代码
TextInputFormat.setMinInputSplitSize(job,1024L);//设置最小分片大小TextInputFormat.setMaxInputSplitSize(job,1024×1024×10L);//设置最大分片大小
(2)多文件输入
对于多个路径的多种文件格式输入可以采用MultipleInputs来处理,不同格式的文件可以采用不同的map操作。
MultipleInputs.addInputPath(job, firstPath, FirstInputFormat.class, FirstMap.class);MultipleInputs.addInputPath(job, sencondPath, SecondInputFormat.class, SecondMap.class);
(3)多文件输出
抽象类,主要参考org.apache.hadoop.mapred.lib.MultipleOutputFormat。子类唯一需要实现的方法是:String generateFileNameForKeyValue(K key, V value, Configuration conf),即通过key和value及conf配置信息决定文件名(含扩展名)。代码如下。
public static class AlphabetOutputFormat extends MultipleOutputFormat{ @Override protected String generateFileNameForKeyValue(Text key, IntWritable value, Configuration conf) { char c = key.toString().toLowerCase().charAt(0); if (c >= 'a' && c <= 'z') { return c + ".txt"; } return "other.txt"; } } job.setOutputFormatClass(AlphabetOutputFormat.class);//设置输出格式
(4) Map任务经验
确定map任务数时依次优先参考如下几个原则
- 每个map任务使用的内存不超过800M,尽量在500M以下。
- 每个map任务运行时间控制在大约20分钟,最好1-3分钟。
- 每个map任务处理的最大数据量为一个HDFS块大小(目前为256MB,默认64M,需要调整),一个map任务。
- map任务总数不超过平台可用的任务槽位
- map个数为split的份数
- 压缩文件不可切分
- 非压缩文件和sequence文件可以切分
- dfs.block.size决定block大小,默认64m 可根据数据量设置为128M或者512M
输入分片,InputSpilt;
Map任务执行时间 mapred.task.timeout 默认600秒。计算密集型任务需要扩大时间,或者定时发送心跳信息。Shuffe:组合器,分区器。
(5) yarn container 内存和CPU调优
yarn.scheduler.minimum-allocation-mb 256 yarn.nodemanager.resource.memory-mb 4096
一个节点上运行的任务数目主要由两个因素决定,一个是NodeManager可使用的资源总量,一个是单个任务的资源需求量,比如一个NodeManager上可用资源为8 GB内存,8 CPU。单个任务资源需求量为1 GB内存,1CPU,则该节点最多运行8个任务。
yarn-site.xml
NodeManager上可用资源是由管理员在配置文件yarn-site.xml中配置的,相关参数如下:
yarn.nodemanager.resource.memory-mb:节点总的可用物理内存量,默认是8192 yarn.nodemanager.resource.cpu-vcores:节点总的可用CPU数目,默认是8对于任务的相关参数如下:
yarn.scheduler.minimum-allocation-mb:container最小可申请内存量,默认是1024 yarn.scheduler.minimum-allocation-vcores:container最小可申请CPU数,默认是1 yarn.scheduler.maximum-allocation-mb:container最大可申请内存量,默认是8096 yarn.scheduler.maximum-allocation-vcores:container最大可申请CPU数,默认是4yarn.nodemanager.vmem-pmem-ratio 物理内存 与 虚拟内存的比率,每用1M物理内存,默认使用2.1M虚拟内存,如果任务使用虚拟内存超过2.1 ,则会杀掉Container任务,(建议调大);
yarn.nodemanager.vmem-check-enabled 虚拟内存的检查false掉,默认是truemapred-site.xml
map/reduce的内存配置
mapreduce.map.memory.mb = (1~2倍) * yarn.scheduler.minimum-allocation-mb map内存
mapreduce.reduce.memory.mb = (1~4倍) * yarn.scheduler.minimum-allocation-mb reduce内存(6) MR 任务推测执行
推测执行(Speculative Execution)是指在集群环境下运行MapReduce,可能是程序Bug,负载不均或者其他的一些问题,导致在一个JOB下的多个TASK速度不一致,比如有的任务已经完成,但是有些任务可能只跑了10%,根据木桶原理,这些任务将成为整个JOB的短板,如果集群启动了推测执行,这时为了最大限度的提高短板,Hadoop会为该task启动备份任务,让speculative task与原始task同时处理一份数据,哪个先运行完,则将谁的结果作为最终结果,并且在运行完成后Kill掉另外一个任务。
推测执行(Speculative Execution)是通过利用更多的资源来换取时间的一种优化策略,但是在资源很紧张的情况下,推测执行也不一定能带来时间上的优化,推测执行配置mapred-site.xml
mapreduce.map.speculative:如果为true则Map Task可以推测执行,即一个Map Task可以启动Speculative Task运行并行执行,该Speculative Task与原始Task同时处理同一份数据,谁先处理完,则将谁的结果作为最终结果。默认为true。
mapreduce.reduce.speculative:同上,默认值为true。
mapreduce.job.speculative.speculative-cap-running-tasks:能够推测重跑正在运行任务(单个JOB)的百分之几,默认是:0.1。
mapreduce.job.speculative.speculative-cap-total-tasks:能够推测重跑全部任务(单个JOB)的百分之几,默认是:0.01。
mapreduce.job.speculative.minimum-allowed-tasks:可以推测重新执行允许的最小任务数。默认是:10。
首先,mapreduce.job.speculative.minimum-allowed-tasks和mapreduce.job.speculative.speculative-cap-total-tasks * 总任务数,取最大值。
然后,拿到上一步的值和mapreduce.job.speculative.speculative-cap-running-tasks * 正在运行的任务数,取最大值,该值就是猜测执行的运行的任务数
mapreduce.job.speculative.retry-after-no-speculate:等待时间(毫秒)做下一轮的猜测,如果没有任务,推测在这一轮。默认:1000(ms)
mapreduce.job.speculative.retry-after-speculate:等待时间(毫秒)做下一轮的猜测,如果有任务推测在这一轮。默认:15000(ms)
mapreduce.job.speculative.slowtaskthreshold:标准差,任务的平均进展率必须低于所有正在运行任务的平均值才会被认为是太慢的任务,默认值:1.0。
4.总结
(1) InputFormat
InputSpilt和RecordReader是InputFormat的关键属性,有两个关键的方法,getSpilt方法将数据文件切分成spilt,spilt默认大小为hdfs的块大小(64M),getRecordReader将spilt解析成records,再将record解析成<Key,Value>。常见的InputFormat有如下:
TextFileInputFormat,NLineInputFormat,KeyValueTextFileInputFormat,SequeceFileInputFormat。其中TextFileInputFormat的key为文件偏移量,value为每一行的数据。
(2) hadoop partitioner
hadoop默认的分区算法,[key.hashcode]%[reduce数量]
(3) Mapreduce优化
问题:
1、数据倾斜。2、map和reduce个数设置不合理,reduce等待过多。 3、spill次数过多,merge次数过多。
解决办法:
1.数据倾斜,partioner设置不合理,需要把key数量比较多数据均分到不同reduce上去。
2.设置map.reduce共存,map运行到一定程度后,reduce也开始运行,下面就是map运行多少百分比后启动redcue。
默认 mapreduce.job.reduce.slowstart.completedmaps=0.05.
建议设置为0.8,map完成80%后才开始reduce copy
3.合并小文件,减少map数量。减少spill次数,增大 map输出内存缓冲区(io.sort.mb)及spill内存上限(sort.spill.percent),减少文件溢出次数,从而减少磁盘 IO。
4.减少merge次数。通过调整Reduce Task中合并小文件时,一次合并的文件的个数(mapreduce.task.io.sort.factor )。来减少merge次数。
(4)次排序
(5)join
map side jon (将小表加载到每个map内存中,输出大表中存在的key)。
reduce side join (将相同key发送到reduce,在reduce端对两个表的数据进行join)
semi jon(map端过滤掉不需要join的数据)。
(6)调度器
容量调度器(Capacity Scheduler) 配置每个队列的容量百分比,同一个队列的任务排队等待,不同队列的job同时运行在yarn
公平调度器 (Fair Scheduler) 每个队列的job按照缺额公平调度job,缺额:理想计算资源和实际计算资源的差距,缺额越大,优先级越高,越先执行。不同队列的job公平调度。
附录
CentOS安装ClouderManager和hadoop组件
自定义安装Haoop和Spark