大数据数量庞大,格式多样化。大量数据由家庭、制造工厂和办公场所的各种设备、互联网事务交易、社交网络的活动、自动化传感器、移动设备以及科研仪器等生成。它的爆炸式增长已超出了传统IT基础架构的处理能力,给企业和社会带来严峻的数据管理问题。因此必须开发新的数据架构,围绕“数据收集、数据管理、数据分析、知识形成、智慧行动”的全过程,开发使用这些数据,释放出更多数据的隐藏价值。
一、大数据建设思路
1)数据的获得
大数据产生的根本原因在于感知式系统的广泛使用。随着技术的发展,人们已经有能力制造极其微小的带有处理功能的传感器,并开始将这些设备广泛的布置于社会的各个角落,通过这些设备来对整个社会的运转进行监控。这些设备会源源不断的产生新数据,这种数据的产生方式是自动的。因此在数据收集方面,要对来自网络包括物联网、社交网络和机构信息系统的数据附上时空标志,去伪存真,尽可能收集异源甚至是异构的数据,必要时还可与历史数据对照,多角度验证数据的全面性和可信性。
2)数据的汇集和存储
互联网是个神奇的大网,大数据开发和软件定制也是一种模式,这里提供最详细的报价,如果你真的想做,可以来这里,这个手机的开始数字是一八七中间的是三儿零最后的是一四二五零,按照顺序组合起来就可以找到,我想说的是,除非你想做或者了解这方面的内容,如果只是凑热闹的话,就不要来了
数据只有不断流动和充分共享,才有生命力。应在各专用数据库建设的基础上,通过数据集成,实现各级各类信息系统的数据交换和数据共享。 数据存储要达到低成本、低能耗、高可靠性目标,通常要用到冗余配置、分布化和云计算技术,在存储时要按照一定规则对数据进行分类,通过过滤和去重,减少存储量,同时加入便于日后检索的标签。
3)数据的管理
大数据管理的技术也层出不穷。在众多技术中,有6种数据管理技术普遍被关注,即分布式存储与计算、内存数据库技术、列式数据库技术、云数据库、非关系型的数据库、移动数据库技术。其中分布式存储与计算受关注度最高。上图是一个图书数据管理系统。
4)数据的分析
数据分析处理:有些行业的数据涉及上百个参数,其复杂性不仅体现在数据样本本身,更体现在多源异构、多实体和多空间之间的交互动态性,难以用传统的方法描述与度量,处理的复杂度很大,需要将高维图像等多媒体数据降维后度量与处理,利用上下文关联进行语义分析,从大量动态而且可能是模棱两可的数据中综合信息,并导出可理解的内容。大数据的处理类型很多,主要的处理模式可以分为流处理和批处理两种。批处理是先存储后处理,而流处理则是直接处理数据。挖掘的任务主要是关联分析、聚类分析、分类、预测、时序模式和偏差分析等。
5)大数据的价值:决策支持系统
大数据的神奇之处就是通过对过去和现在的数据进行分析,它能够精确预测未来;通过对组织内部的和外部的数据整合,它能够洞察事物之间的相关关系;通过对海量数据的挖掘,它能够代替人脑,承担起企业和社会管理的职责。
6)数据的使用
大数据有三层内涵:一是数据量巨大、来源多样和类型多样的数据集;二是新型的数据处理和分析技术;三是运用数据分析形成价值。大数据对科学研究、经济建设、社会发展和文化生活等各个领域正在产生革命性的影响。大数据应用的关键,也是其必要条件,就在于"IT"与"经营"的融合,当然,这里的经营的内涵可以非常广泛,小至一个零售门店的经营,大至一个城市的经营。
二、大数据基本架构
基于上述大数据的特征,通过传统IT技术存储和处理大数据成本高昂。一个企业要大力发展大数据应用首先需要解决两个问题:一是低成本、快速地对海量、多类别的数据进行抽取和存储;二是使用新的技术对数据进行分析和挖掘,为企业创造价值。因此,大数据的存储和处理与云计算技术密不可分,在当前的技术条件下,基于廉价硬件的分布式系统(如Hadoop等)被认为是最适合处理大数据的技术平台。
Hadoop是一个分布式的基础架构,能够让用户方便高效地利用运算资源和处理海量数据,目前已在很多大型互联网企业得到了广泛应用,如亚马逊、Facebook和Yahoo等。其是一个开放式的架构,架构成员也在不断扩充完善中,通常架构如图2所示:
Hadoop体系架构
(1)Hadoop最底层是一个HDFS(Hadoop Distributed File System,分布式文件系统),存储在HDFS中的文件先被分成块,然后再将这些块复制到多个主机中(DataNode,数据节点)。
(2)Hadoop的核心是MapReduce(映射和化简编程模型)引擎,Map意为将单个任务分解为多个,而Reduce则意为将分解后的多任务结果汇总,该引擎由JobTrackers(工作追踪,对应命名节点)和TaskTrackers(任务追踪,对应数据节点)组成。当处理大数据查询时,MapReduce会将任务分解在多个节点处理,从而提高了数据处理的效率,避免了单机性能瓶颈限制。
(3)Hive是Hadoop架构中的数据仓库,主要用于静态的结构以及需要经常分析的工作。Hbase主要作为面向列的数据库运行在HDFS上,可存储PB级的数据。Hbase利用MapReduce来处理内部的海量数据,并能在海量数据中定位所需的数据且访问它。
(4)Sqoop是为数据的互操作性而设计,可以从关系数据库导入数据到Hadoop,并能直接导入到HDFS或Hive。
(5)Zookeeper在Hadoop架构中负责应用程序的协调工作,以保持Hadoop集群内的同步工作。
(6)Thrift是一个软件框架,用来进行可扩展且跨语言的服务的开发,最初由Facebook开发,是构建在各种编程语言间无缝结合的、高效的服务。
Hadoop核心设计
Hbase——分布式数据存储系统
Client:使用HBase RPC机制与HMaster和HRegionServer进行通信
Zookeeper:协同服务管理,HMaster通过Zookeepe可以随时感知各个HRegionServer的健康状况
HMaster: 管理用户对表的增删改查操作
HRegionServer:HBase中最核心的模块,主要负责响应用户I/O请求,向HDFS文件系统中读写数据
HRegion:Hbase中分布式存储的最小单元,可以理解成一个Table
HStore:HBase存储的核心。由MemStore和StoreFile组成。
HLog:每次用户操作写入Memstore的同时,也会写一份数据到HLog文件
结合上述Hadoop架构功能,大数据平台系统功能建议如图所示:
应用系统:对于大多数企业而言,运营领域的应用是大数据最核心的应用,之前企业主要使用来自生产经营中的各种报表数据,但随着大数据时代的到来,来自于互联网、物联网、各种传感器的海量数据扑面而至。于是,一些企业开始挖掘和利用这些数据,来推动运营效率的提升。
数据平台:借助大数据平台,未来的互联网络将可以让商家更了解消费者的使用**惯,从而改进使用体验。基于大数据基础上的相应分析,能够更有针对性的改进用户体验,同时挖掘新的商业机会。
数据源:数据源是指数据库应用程序所使用的数据库或者数据库服务器。丰富的数据源是大数据产业发展的前提。数据源在不断拓展,越来越多样化。如:智能汽车可以把动态行驶过程变成数据,嵌入到生产设备里的物联网可以把生产过程和设备动态状况变成数据。对数据源的不断拓展不仅能带来采集设备的发展,而且可以通过控制新的数据源更好地控制数据的价值。然而我国数字化的数据资源总量远远低于美欧,就已有有限的数据资源来说,还存在标准化、准确性、完整性低,利用价值不高的情况,这**降低了数据的价值。
三、大数据的目标效果
通过大数据的引入和部署,可以达到如下效果:
1)数据整合
·统一数据模型:承载企业数据模型,促进企业各域数据逻辑模型的统一;
·统一数据标准:统一建立标准的数据编码目录,实现企业数据的标准化与统一存储;
·统一数据视图:实现统一数据视图,使企业在客户、产品和资源等视角获取到一致的信息。
2)数据质量管控
·数据质量校验:根据规则对所存储的数据进行一致性、完整性和准确性的校验,保证数据的一致性、完整性和准确性;
·数据质量管控:通过建立企业数据的质量标准、数据管控的组织、数据管控的流程,对数据质量进行统一管控,以达到数据质量逐步完善。
3)数据共享
·消除网状接口,建立大数据共享中心,为各业务系统提供共享数据,降低接口复杂度,提高系统间接口效率与质量;
·以实时或准实时的方式将整合或计算好的数据向外系统提供。
4)数据应用
·查询应用:平台实现条件不固定、不可预见、格式灵活的按需查询功能;
·固定报表应用:视统计维度和指标固定的分析结果的展示,可根据业务系统的需求,分析产生各种业务报表数据等;
·动态分析应用:按关心的维度和指标对数据进行主题性的分析,动态分析应用中维度和指标不固定。
四、总结
基于分布式技术构建的大数据平台能够有效降低数据存储成本,提升数据分析处理效率,并具备海量数据、高并发场景的支撑能力,可大幅缩短数据查询响应时间,满足企业各上层应用的数据需求。
Hadoop在可伸缩性、健壮性、计算性能和成本上具有无可替代的优势,事实上已成为当前互联网企业主流的大数据分析平台。本文主要介绍一种基于Hadoop平台的多维分析和数据挖掘平台架构。作为一家互联网数据分析公司,我们在海量数据的分析领域那真是被“逼上梁山”。多年来在严苛的业务需求和数据压力下,我们几乎尝试了所有可能的大数据分析方法,最终落地于Hadoop平台之上。
1. 大数据分析大分类
Hadoop平台对业务的针对性较强,为了让你明确它是否符合你的业务,现粗略地从几个角度将大数据分析的业务需求分类,针对不同的具体需求,应采用不同的数据分析架构。
按照数据分析的实时性,分为实时数据分析和离线数据分析两种。
实时数据分析一般用于金融、移动和互联网B2C等产品,往往要求在数秒内返回上亿行数据的分析,从而达到不影响用户体验的目的。要满足这样的需求,可以采用精心设计的传统关系型数据库组成并行处理集群,或者采用一些内存计算平台,或者采用HDD的架构,这些无疑都需要比较高的软硬件成本。目前比较新的海量数据实时分析工具有EMC的Greenplum、SAP的HANA等。
对于大多数反馈时间要求不是那么严苛的应用,比如离线统计分析、机器学习、搜索引擎的反向索引计算、推荐引擎的计算等,应采用离线分析的方式,通过数据采集工具将日志数据导入专用的分析平台。但面对海量数据,传统的ETL工具往往彻底失效,主要原因是数据格式转换的开销太大,在性能上无法满足海量数据的采集需求。互联网企业的海量数据采集工具,有Facebook开源的Scribe、LinkedIn开源的Kafka、淘宝开源的Timetunnel、Hadoop的Chukwa等,均可以满足每秒数百MB的日志数据采集和传输需求,并将这些数据上载到Hadoop中央系统上。
按照大数据的数据量,分为内存级别、BI级别、海量级别三种。
这里的内存级别指的是数据量不超过集群的内存最大值。不要小看今天内存的容量,Facebook缓存在内存的Memcached中的数据高达320TB,而目前的PC服务器,内存也可以超过百GB。因此可以采用一些内存数据库,将热点数据常驻内存之中,从而取得非常快速的分析能力,非常适合实时分析业务。图1是一种实际可行的MongoDB分析架构。
图1 用于实时分析的MongoDB架构
MongoDB大集群目前存在一些稳定性问题,会发生周期性的写堵塞和主从同步失效,但仍不失为一种潜力十足的可以用于高速数据分析的NoSQL。
此外,目前大多数服务厂商都已经推出了带4GB以上SSD的解决方案,利用内存+SSD,也可以轻易达到内存分析的性能。随着SSD的发展,内存数据分析必然能得到更加广泛的应用。
BI级别指的是那些对于内存来说太大的数据量,但一般可以将其放入传统的BI产品和专门设计的BI数据库之中进行分析。目前主流的BI产品都有支持TB级以上的数据分析方案。种类繁多,就不具体列举了。
海量级别指的是对于数据库和BI产品已经完全失效或者成本过高的数据量。海量数据级别的优秀企业级产品也有很多,但基于软硬件的成本原因,目前大多数互联网企业采用Hadoop的HDFS分布式文件系统来存储数据,并使用MapReduce进行分析。本文稍后将主要介绍Hadoop上基于MapReduce的一个多维数据分析平台。
数据分析的算法复杂度
根据不同的业务需求,数据分析的算法也差异巨大,而数据分析的算法复杂度和架构是紧密关联的。举个例子,Redis是一个性能非常高的内存Key-Value NoSQL,它支持List和Set、SortedSet等简单集合,如果你的数据分析需求简单地通过排序,链表就可以解决,同时总的数据量不大于内存(准确地说是内存加上虚拟内存再除以2),那么无疑使用Redis会达到非常惊人的分析性能。
还有很多易并行问题(Embarrassingly Parallel),计算可以分解成完全独立的部分,或者很简单地就能改造出分布式算法,比如大规模脸部识别、图形渲染等,这样的问题自然是使用并行处理集群比较适合。
而大多数统计分析,机器学习问题可以用MapReduce算法改写。MapReduce目前最擅长的计算领域有流量统计、推荐引擎、趋势分析、用户行为分析、数据挖掘分类器、分布式索引等。
2. 面对大数据OLAP大一些问题
OLAP分析需要进行大量的数据分组和表间关联,而这些显然不是NoSQL和传统数据库的强项,往往必须使用特定的针对BI优化的数据库。比如绝大多数针对BI优化的数据库采用了列存储或混合存储、压缩、延迟加载、对存储数据块的预统计、分片索引等技术。
Hadoop平台上的OLAP分析,同样存在这个问题,Facebook针对Hive开发的RCFile数据格式,就是采用了上述的一些优化技术,从而达到了较好的数据分析性能。如图2所示。
然而,对于Hadoop平台来说,单单通过使用Hive模仿出SQL,对于数据分析来说远远不够,首先Hive虽然将HiveQL翻译MapReduce的时候进行了优化,但依然效率低下。多维分析时依然要做事实表和维度表的关联,维度一多性能必然大幅下降。其次,RCFile的行列混合存储模式,事实上限制死了数据格式,也就是说数据格式是针对特定分析预先设计好的,一旦分析的业务模型有所改动,海量数据转换格式的代价是极其巨大的。最后,HiveQL对OLAP业务分析人员依然是非常不友善的,维度和度量才是直接针对业务人员的分析语言。
而且目前OLAP存在的最大问题是:业务灵活多变,必然导致业务模型随之经常发生变化,而业务维度和度量一旦发生变化,技术人员需要把整个Cube(多维立方体)重新定义并重新生成,业务人员只能在此Cube上进行多维分析,这样就限制了业务人员快速改变问题分析的角度,从而使所谓的BI系统成为死板的日常报表系统。
使用Hadoop进行多维分析,首先能解决上述维度难以改变的问题,利用Hadoop中数据非结构化的特征,采集来的数据本身就是包含大量冗余信息的。同时也可以将大量冗余的维度信息整合到事实表中,这样可以在冗余维度下灵活地改变问题分析的角度。其次利用Hadoop MapReduce强大的并行化处理能力,无论OLAP分析中的维度增加多少,开销并不显著增长。换言之,Hadoop可以支持一个巨大无比的Cube,包含了无数你想到或者想不到的维度,而且每次多维分析,都可以支持成千上百个维度,并不会显著影响分析的性能。
而且目前OLAP存在的最大问题是:业务灵活多变,必然导致业务模型随之经常发生变化,而业务维度和度量一旦发生变化,技术人员需要把整个Cube(多维立方体)重新定义并重新生成,业务人员只能在此Cube上进行多维分析,这样就限制了业务人员快速改变问题分析的角度,从而使所谓的BI系统成为死板的日常报表系统。
3. 一种Hadoop多维分析平台的架构
整个架构由四大部分组成:数据采集模块、数据冗余模块、维度定义模块、并行分 析模块。
数据采集模块采用了Cloudera的Flume,将海量的小日志文件进行高速传输和合并,并能够确保数据的传输安全性。单个collector宕机之后,数据也不会丢失,并能将agent数据自动转移到其他的colllecter处理,不会影响整个采集系统的运行。如图5所示。
数据冗余模块不是必须的,但如果日志数据中没有足够的维度信息,或者需要比较频繁地增加维度,则需要定义数据冗余模块。通过冗余维度定义器定义需要冗余的维度信息和来源(数据库、文件、内存等),并指定扩展方式,将信息写入数据日志中。在海量数据下,数据冗余模块往往成为整个系统的瓶颈,建议使用一些比较快的内存NoSQL来冗余原始数据,并采用尽可能多的节点进行并行冗余;或者也完全可以在Hadoop中执行批量Map,进行数据格式的转化。
维度定义模块是面向业务用户的前端模块,用户通过可视化的定义器从数据日志中定义维度和度量,并能自动生成一种多维分析语言,同时可以使用可视化的分析器通过GUI执行刚刚定义好的多维分析命令。
并行分析模块接受用户提交的多维分析命令,并将通过核心模块将该命令解析为Map-Reduce,提交给Hadoop集群之后,生成报表供报表中心展示。
核心模块是将多维分析语言转化为MapReduce的解析器,读取用户定义的维度和度量,将用户的多维分析命令翻译成MapReduce程序。核心模块的具体逻辑如图6所示。
图6中根据JobConf参数进行Map和Reduce类的拼装并不复杂,难点是很多实际问题很难通过一个MapReduce Job解决,必须通过多个MapReduce Job组成工作流(WorkFlow),这里是最需要根据业务进行定制的部分。图7是一个简单的MapReduce工作流的例子。
MapReduce的输出一般是统计分析的结果,数据量相较于输入的海量数据会小很多,这样就可以导入传统的数据报表产品中进行展现。
可以只用一行代码来运行MapReduce作业:JobClient.runJon(conf),Job作业运行时参与的四个实体:
1.JobClient 写代码,配置作业,提交作业。
2.JobTracker:初始化作业,分配作业,协调作业运行。这是一个java程序,主类是JobTracker。
3.TaskTracker:运行作业划分后的任务,即分配数据分配上执行Map或Reduce任务。
4.HDFS:保存作业数据、配置信息等,保存作业结果。
Map/Reduce 作业总体执行流程:
代码编写 ----> 作业配置 ----> 作业提交 ----> Map任务分配和执行 ----> 处理中间结果 ----> Reduce任务分配与执行 ----> 输出结果
而对于每个作业的执行,又包含:
输入准备 ----> 任务执行 ----> 输出结果
作业提交JobClient:
JobClient的runJob方法产生一个Jobclient实例并调用其submitJob方法,然后runJob开始循环吗,并在循环中调用getTaskCompetionEvents方法,获得TaskCompletionEvent实例,每秒轮询作业进度(后面有介绍进度和状态更新),把进度写到控制台,作业完成后显示作业计数器,若失败,则把错误记录到控制台。
submitJob方法作业提交的过程:
1.向JobTracker请求一个新的JobId。
2.检查作业相关路径,如果路径不正确就会返回错误。
3.计算作业输入分片及其划分信息。
4.将作业运行需要的资源(jar文件、配置文件等)复制到Shared HDFS,并
复制多个副本(参数控制,默认值为10)供tasktracker访问,也会将计算的分片复制到HDFS。
5.调用JobTracker对象的submitJob()方法来真正提交作业,告诉JobTracker作业准备执行。
作业的初始化JobTracker:
JobTracker收到submitJob方法调用后,会把调用放入到一个内部队列,由作业调度器(Job scheduler)进行调度并对其初始化。Job初始化即创建一个作业对象。
当作业被调度后,JobTracker会创建一个代表这个作业的JobInProgress对象,并将任务和记录信息封装在这个对象中,以便跟踪任务状态和进程。
初始化过程就是JobInProgress对象的initTasks方法进行初始化的。
初始化步骤:
1.从HDFS中读取作业对应的job.split信息,为后面的初始化做好准备。
2.创建并初始化map和reduce任务。根据数据分片信息中的个数确定map task的个数,然后为每个map task生成一个TaskInProgress对象来处理数据分片,先将其放入nonRunningMapCache,以便JobTracker分配任务的时候使用。接下来根据JobConf中的mapred.reduce.tasks属性利用setNumReduceTasks()方法设置reduce task的数量,然后同map task创建方式。
3.最后就是创建两个初始化task,进行map和reduce的初始化。
任务的分配JobTracker:
消息传递HeartBeat: tasktracker运行一个简单循环定期发送心跳(heartbeat)给JobTracker。由心跳告知JobTracker自己是否存活,同时作为消息通道传递其它信息(请求新task)。作为心跳的一部分,tasktracker会指明自己是否已准备好运行新的任务,如果是,jobtracker会分配它一个任务。
分配任务所属于的作业:在Jobtracker分配任务前需先确定任务所在的作业。后面会介绍到各种作业调度算法,默认是一个FIFO的作业调度。
分配Map和Reduce任务:tasktracker有固定数量的任务槽,一个tasktracker可以同时运行多个Map和Reduce任务,但其准确的数量由tasktracker的核的数量和内存大小决定。默认调度器会先填满Map任务槽,再填Reduce任务槽。jobtracker会选择距离离分片文件最近的tasktracker,最理想情况下,任务是数据本地化(data-local)的,当然也可以是机架本地化(rack-local),如果不是本地化的,那么他们就需要从其他机架上检索数据。Reduce任务分配很简单,jobtracker会简单的从待运行的reduce任务列表中选取下一个来执行,不用考虑数据本地化。
任务的执行TaskTracker:
TaskTracker收到新任务后,就要在本地运行任务了,运行任务的第一步就是通过localizedJob将任务本地化所需要的注入配置、数据、程序等信息进行本地化。
1.本地化数据:从共享文件系统将job.split 、job.jar (在分布式缓存中)复制本地,将job配置信息写入job.xml。
2.新建本地工作目录:tasktracker会加压job.jar文件到本工作目录。
3.调用launchTaskForJob方法发布任务(其中会新建TaskRunner实例运行任务),如果是Map任务就启用MapTaskRunner,对于Reduce就是ReduceTaskRunner。
在这之后,TaskRunner会启用一个新的JVM来运行每个Map/Reduce任务,防止程序原因而导致tasktracker崩溃,但不同任务间重用JVM还是可以的,后续会讲到任务JVM重用。
对于单个Map,任务执行的简单流程是:
1.分配任务执行参数
2.在Child临时文件中添加map任务信息(Child是运行Map和Reduce任务的主进程)
3.配置log文件夹,配置map任务的通信和输出参数
4.读取input split,生成RecordReader读取数据
5.为Map生成MapRunnable,依次从RecordReader中接收数据,并调用Map函数进行处理。
6.最后将map函数的输出调用collect收集到MapOutputBuffer(参数控制其大小)中。
Streaming和Pipes:
Streaming和Pipes都运行特殊的Map和Reduce任务,目的是运行用户提供的可执行程序并与之通信。
Streaming:使用标准输入输出Streaming与进程进行通信。
Pipes:用来监听套接字,会发送一个端口号给C++程序,两者便可建立链接。
进度和状态更新:
一个作业和它的任务都有状态(status),其中包括:运行成功失败状态、Map/Reduce进度、作业计数器值、状态消息。
状态消息与客户端的通信:
1.对于Map任务Progress的追踪:progress是已经处理完的输入所占的比例。
2.对于Reduce:稍复杂,reduce任务分三个阶段(每个阶段占1/3),复制、排序和Reduce处理,若reduce已执行一半的输入的话,那么任务进度便是1/3+1/3+1/6=5/6。
3.任务计数器:任务有一组计数器,负责对任务运行各个事件进行计数。
4.任务进度报告:如果任务报告了进度,便会设置一个标记以表明状态将被发送到tasktracker。有一个独立线程每隔三秒检查一次此标记,如果已设置,则告知tasktracker当前状态。
5.tasktracker进度报告:tasktracker会每隔5秒(这个心跳是由集群大小决定,集群越大时间会越长)发送heartbeat到jobtracker,并且tasktracker运行的所有状态都会在调用中被发送到jobtracker。
6.jobtracker合并各任务报告:产生一个表明所有运行作业机器所含任务状态的全局视图。
前面提到的JobClient就是通过每秒查询JobTracker来接收最新状态,而且客户端JobClient的getJob方法可以得到一个RunningJob的实例,其包含了作业的所以状态信息。
作业的完成:
当jobtracker收到作业最后一个任务已完成的通知后,便把作业状态设置成成功。JobClient查询状态时,便知道任务已成功完成,于是JobClient打印一条消息告知用户,然后从runJob方法返回。
如果jobtracker有相应设置,也会发送一个Http作业通知给客户端,希望收到回调指令的客户端可以通过job.end.notification.url属性来进行设置。
jobtracker情况作业的工作状态,指示tasktracker也清空作业的工作状态,如删除中间输出。
失败
实际情况下,用户的代码存在软件错误进程会崩溃,机器也会产生故障,但Hadoop能很好的应对这些故障并完成作业。
1.任务失败
子任务异常:如Map/Reduce任务中的用户代码抛出异常,子任务JVM进程会在退出前向父进程tasktracker发送错误报告,错误被记录用户日志。tasktracker会将此次task attempt标记为tailed,并释放这个任务槽运行另外一个任务。
子进程JVM突然退出:可能由于JVM bug导致用户代码造成的某些特殊原因导致JVM退出,这种情况下,tasktracker会注意到进程已经退出,并将此次尝试标记为failed。
任务挂起:一旦tasktracker注意一段时间没有收到进度更新,便会将任务标记为failed,JVM子进程将被自动杀死。任务失败间隔时间通常为10分钟,可以以作业或者集群为基础设置过期时间,参数为mapred.task.timeout。注意:如果参数值设置为0,则挂起的任务永远不会释放掉它的任务槽,随着时间的推移会降低整个集群的效率。
任务失败尝试次数:jobtracker得知一个tasktracker失败后,它会重新调度该任务执行,当然,jobtracker会尝试避免重新调度失败过的tasktracker任务。如果一个任务尝试次数超过4次,它将不再被重试。这个值是可以设置的,对于Map任务,参数是mapred.map.max.attempts,对于reduce任务,则由mapred.reduce.max.attempts属性控制。如果次数超过限制,整个作业都会失败。当然,有时我们不希望少数几个任务失败就终止运行的整个作业,因为即使有些任务失败,作业的一些结果可能还是有用的,这种情况下,可以为作业设置在不触发作业失败情况下的允许任务失败的最大百分比,Map任务和Reduce任务可以独立控制,参数为mapred.max.map.failures.percent 和mapred.max.reduce.failures.percent。
任务尝试中止(kill):任务终止和任务失败不同,task attempt可以中止是因为他是一个推测副本或因为它所处的tasktracker失败,导致jobtracker将它上面的所有task attempt标记为killed。被终止的task attempt不会被计入任务运行尝试次数,因为尝试中止并不是任务的错。
2.tasktracker失败
tasktracker由于崩溃或者运行过慢而失败,他将停止向jobtracker发送心跳(或很少发送心跳)。jobtracker注意已停止发送心跳的tasktracker(过期时间由参数mapred.tasktracker.expiry.interval设置,单位毫秒),并将它从等待调度的tasktracker池中移除。如果是未完成的作业,jobtracker会安排次tasktracker上已经运行成功的Map任务重新运行,因为此时reduce任务已无法访问(中间输出存放在失败的tasktracker的本地文件系统上)。
即使tasktracker没有失败,也有可能被jobtracker列入黑名单。如果tasktracker上面的失败任务数量远远高于集群的平均失败任务次数,他就会被列入黑名单,被列入黑名单的tasktracker可以通过重启从jobtracker黑名单中移除。
3.jobtracker失败
老版本的JobTracker失败属于单点故障,这种情况下作业注定失败。
作业调度:
早期作业调度FIFO:按作业提交顺序先进先出。可以设置优先级,通过设置mapred.job.priority属性或者JobClient的setJobPriority()方法制定优先级(优先级别:VERY_HIGH,HIGH,NORMAL,LOW,VERY_LOW)。注意FIFO调度算法不支持抢占(preemption),所以高优先级作业仍然会被那些已经开始的长时间运行的低优先级作业所阻塞。
Fair Scheduler:目标是让每个用户公平地共享集群能力。当集群存在很多作业时,空闲的任务槽会以”让每个用户共享集群“的方式进行分配。默认每个用户都有自己的作业池。FairScheduler支持抢占,所以,如果一个池在特定的一段时间未得到公平地资源共享,它会终止池中得到过多的资源任务,以便把任务槽让给资源不足的池。FairScheduler是一个后续模块,使用它需要将其jar文件放在Hadoop的类路径下。可以通过参数map.red.jobtracker.taskScheduler属性配置(值为org.apache.hadoop.mapred.FairScheduler)
Capacity Scheduler:
集群由很多队列组成,每个队列都有一个分配能力,这一点与FairScheduler类似,只不过在每个队列内部,作业根据FIFO方式进行调度。本质上说,Capacity Scheduler允许用户或组织为每个用户模拟一个独立使用FIFO的集群。
shuffle和排序:
MapReduce确保每个Reducer的输入都是按键排序的。系统执行排序的过程-将map输出作为输入传给reducer的过程称为shuffle。shuffle属于不断被优化和改进的代码库的一部分,从许多方面来看,shuffle是MapReduce的心脏。
整个shuffle的流程应该是这样:
map结果划分partition 排序sort 分割spill 合并同一划分 合并同一划分 合并结果排序 reduce处理 输出
Map端:
写入缓冲区:Map函数的输出,是由collector处理的,它并不是简单的将结果写到磁盘。它利用缓冲的方式写到内存,并处于效率的考虑进行预排序。每个map都有一个环形的内存缓冲区,用于任务输出,默认缓冲区大小为100MB(由参数io.sort.mb调整),一旦缓冲区内容达到阈值(默认0.8),后台进程边开始把内容写到磁盘(spill),在写磁盘过程中,map输出继续被写到缓冲区,但如果缓冲区被填满,map会阻塞知道写磁盘过程完成。写磁盘将按照轮询方式写到mapred.local.dir属性制定的作业特定子目录中。
写出缓冲区:collect将缓冲区的内容写出时,会调用sortAndSpill函数,这个函数作用主要是创建spill文件,按照key值对数据进行排序,按照划分将数据写入文件,如果配置了combiner类,会先调用combineAndSpill函数再写文件。sortAndSpill每被调用一次,就会写一个spill文件。
合并所有Map的spill文件:TaskTracker会在每个map任务结束后对所有map产生的spill文件进行merge,merge规则是根据分区将各个spill文件中数据同一分区中的数据合并在一起,并写入到一个已分区且排序的map输出文件中。待唯一的已分区且已排序的map输出文件写入最后一条记录后,map端的shuffle阶段就结束了。
在写磁盘前,线程首先根据数据最终要传递到的reducer把数据划分成响应的分区(partition),在每个分区中,后台线程按键进行内排序,如果有一个combiner,它会在排序后的输出上运行。
内存达到溢出写的阈值时,就会新建一个溢出写文件,因为map任务完成其最后一个输出记录之后,会有几个溢出写文件。在任务完成前,溢出写文件会被合并成一个已分区且已排序的输出文件。配置属性io.sort.facor控制一次最多能合并多少流,默认值是10。
如果已经指定combiner,并且写次数至少为3(通过min.mum.spills.for.combine设置)时,则combiner就会在输出文件写到磁盘之前运行。运行combiner的意义在于使map输出更紧凑,舍得写到本地磁盘和传给reducer的数据更少。
写磁盘时压缩:写磁盘时压缩会让写的速度更快,节约磁盘空间,并且减少传给reducer的数据量。默认情况下,输出是不压缩的,但可以通过设置mapred.compress.map.output值为true,就可以启用压缩。使用的压缩库是由mapred.map.output.compression.codec制定。
reducer获得文件分区的工作线程:reducer通过http方式得到输出文件的分区,用于文件分区的工作线程数量由tracker.http.threads属性指定,此设置针对的是每个tasktracker,而不是每个map任务槽。默认值为40,在大型集群上此值可以根据需要而增加。
Reduce端:
复制阶段:reduce会定期向JobTracker获取map的输出位置,一旦拿到输出位置,reduce就会从对应的TaskTracker上复制map输出到本地(如果map输出很小,则会被复制到TaskTracker节点的内存中,否则会被让如磁盘),而不会等到所有map任务结束(当然这个也有参数控制)。
合并阶段:从各个TaskTracker上复制的map输出文件(无论在磁盘还是内存)进行整合,并维持数据原来的顺序。
Reduce阶段:从合并的文件中顺序拿出一条数据进行reduce函数处理,然后将结果输出到本地HDFS。
Map的输出文件位于运行map任务的tasktracker的本地磁盘,现在,tasktracker要为分区文件运行reduce任务。每个任务完成时间可能不同,但是只要有一个任务完成,reduce任务就开始复制其输出,这就是reduce任务的复制阶段(copy phase)。reduce任务有少量复制线程,因此能够并行取得map输出。默认值是5个线程,可以通过mapred.reduce.parallel.copies属性设置。
Reducer如何得知从哪个tasktracker获得map输出:map任务完成后会通知其父tasktracker状态已更新,tasktracker进而通知(通过heart beat)jobtracker。因此,JobTracker就知道map输出和tasktracker之间的映射关系,reducer中的一个线程定期询问jobtracker以便获知map输出位置。由于reducer有可能失败,因此tasktracker并没有在第一个reducer检索到map输出时就立即从磁盘上删除它们,相反他会等待jobtracker告示它可以删除map输出时才删除,这是作业完成后最后执行的。
如果map输出很小,则会被直接复制到reduce tasktracker的内存缓冲区(大小由mapred.job.shuffle.input.buffer.percent控制,占堆空间的百分比),否则,map输出被复制到磁盘。一旦内存缓冲区达到阈值大小(由mapred.iob.shuffle.merge.percent)
或达到map输出阈值大小(mapred.inmem.threadhold),则合并后溢出写到磁盘中。
随着磁盘上副本增多,后台线程会将他们合并为更大的、排好序的文件。注意:为了合并,压缩的map输出必须在内存中被解压缩。
排序阶段:复制阶段完成后,reduce任务会进入排序阶段,更确切的说是合并阶段,这个阶段将合并map输出,维持其顺序排列。合并是循环进行的,由合并因子决定每次合并的输出文件数量。但让有可能会产生中间文件。
reduce阶段:在最后reduce阶段,会直接把排序好的文件输入reduce函数,不会对中间文件进行再合并,最后的合并即可来自内存,也可来自磁盘。此阶段的输出会直接写到文件系统,一般为hdfs。
细节:这里合并是并非平均合并,比如有40个文件,合并因子为10,我们并不是每趟合并10个,合并四趟。而是第一趟合并4个,后三趟合并10,在最后一趟中4个已合并的文件和余下6个未合并会直接并入reduce。