【VB开源代码栏目提醒】:网学会员为广大网友收集整理了,面向MapReduce的数据处理流程开发方法 - 综合课件,希望对大家有所帮助!
The Major National Science and Technology Special Projects High-Nuclear-Based Project of China under Grant No. 2009ZX01043-003-002 国家科技重大专项核高基项目 the National Science and Technology Support Program of China under Grant No. 2009BAG18B01 2009BADA9B02 国家科技支撑计划. Received 2010-08 Accepted 2010-10. ISSN 1673-9418 CODEN JKYTA8 E-mail: fcstvip.163.com Journal of Frontiers of Computer Science and Technology http://www.ceaj.org 1673-9418/2011/0502-0161-09 Tel: 86-10-51616056 DOI: 10.3778/j.issn.1673-9418.2011.02.006 面向MapReduce的数据处理流程开发方法 易小华12 刘 杰3 叶 丹1 1. 中国科学院 软件研究所 软件工程技术中心 北京100190 2. 中国科学院 研究生院 北京100190 3. 中国科学技术大学 计算机科学与技术系 合肥 230026 Development Method of MapReduce Oriented Data Flow Processing YI Xiaohua12 LIU Jie3 YE Dan1 1. Technology Center of Software Engineering Institute of Software Chinese Academy of Sciences Beijing 100190 China 2. Graduate University Chinese A
cademy of Sciences Beijing 100190 China 3. Dept. of Computer Science and Technology University of Science and Technology of China Hefei 230026 China Corresponding author: E-mail: yixiaohua08otcaix.iscas.ac.cn YI Xiaohua LIU Jie YE Dan. Development method of MapReduce oriented data flow processing. Journal of Frontiers of Computer Science and Technology 2011 52: 161-169. Abstract: In the age of information explosion DataFlow processing widely existed and has shown new features and styles including massive and parallel meanwhile more and more people choose to use MapReduce to process their data because of its simplicity and higher capability with lower cost but MapReduce does not directly support com-plex N-step N-branch and multiple data sets data flow processing. This paper proposes a model-driven development method for DataFlow processing based on MapReduce. It first defines the logical and physical models of the data-flow as well as the component model then designs model transfer and code generation algorithms finally uses the algorithms to generate the MapReduce program code which implements the function defined by the logical model and can run on Hadoop platform. Based on this method a development tool CloudDataFlow is implemented. As the experiment shows compared with similar system it has higher performance extendibility and usability. 162 Journal of Frontiers of Computer Science and Technology 计算机科学与探索 2011 52 Key
words: MapReduce data flow processing model-driven development Hadoop platform 摘 要数据处理流程在信息爆炸的今天被广泛应用并呈现出海量和并行的特点 MapReduce编程模型的简单性和高性价比使得其适用于海量数据的并行处理 但是MapReduce不支持多数据源的数据处理 不能直接应用于具有多个处理操作、多个数据流分支的数据处理流程。
提出一种模型驱动的面向MapReduce计算模型的数据处理流程快速开发方法 定义数据处理流程的逻辑模型、物理模型和组件模型 使用模型转换算法和
代码生成算法将逻辑模型转化为物理模型 再转换为能直接在Hadoop平台上运行的MapReduce程序 基于该方法实现了一个开发工具CloudDataFlow。
实验表明该方法可以有效提高数据流程的处理效率。
关键词MapReduce 数据处理流程 模型驱动 Hadoop平台 文献标识码A 中图分类号TP301 1 引言 数据处理通常可以刻画为对一个或多个数据集进行多步数据处理操作的流程化处理过程 这些数据处理操作既包含一些通用的关系形式的操作 如过滤、合并、分组、连接、计数等 也包含一些领域相关的操作 如语义标注、人脸检测等 本文将这个流程化的数据处理过程称为数据处理流程。
常见的数据处理流程应用包括数据仓库应用中的ETL过程 商业智能应用中的数据分析挖掘过程 科学计算领域的科学工作流
搜索引擎领域的大量分析处理过程等 这些典型的应用场景广泛出现在大型企业和科学研究领域。
此外 在信息爆炸的今天 随着数据量的不断增大 数据处理流程越来越呈现出海量和并行的特点 数据格式也以非结构化和结构化的形式出现 而且处理数据的底层系统一般使用集群来搭建 这些对传统的并行数据处理流程实现方法 如DBMS、网格计算等 提出了新的挑战。
现在流行的由Google提出的MapReduce数据并行计算模型极大地简化了在集群上的海量数据并行处理过程1
开源云计算平台Hadoop2实现了这一计算模型 很好地满足了大多数用户海量数据处理的需求 并在实际场景中得到了广泛的应用。
但是MapReduce模型的简单性也导致了几个问题 1 不直接支持复杂的n步n分支数据处理流程操作 而这在实际数据处理中是非常常见的。
2 缺少同时处理多个数据集的严格支持能力必须通过用户自己编程实现这是一项非常艰难的工作。
3 一些常用的基本数据操作 如过滤、连接、分组等操作 在每次使用时必须重复地手工编码 实现。
由于这几个问题的出现 用户在使用MapReduce的过程中总是手工编写各种复杂的数据处理流程 纠缠于多数据源的数据分析 重复地在一些黑盒流程中实现基本的操作 严重限制了MapReduce编程模型的使用 减慢数据分析进度 而且使数据处理程序的可读性大大降低 同时也不可能进行一些自动的流程优化。
本文针对这些问题提出一种基于模型驱动方式实现的面向MapReduce的数据处理流程开发方法。
第2章给出相关工作介绍 第3章给出相关
问题定义 第4章讲述关键的实现算法 第5章具体分析系统的实现框架并通过实验对系统进行评估 第6章对全文进行总结。
2 相关工作 MapReduce编程模型首先由Google的工程师提出1 从用户的角度讲 它将计算过程分为两个最基本的阶段Map和Reduce 每个阶段的输入都 易小华 等面向MapReduce的数据处理流程开发方法 163 是一系列的键值对key/value 每个阶段的输出也是一系列的键值对 如下所示 Map: k1v1→listk2v2 接收键值对k1v1 经过用户编写的Map
代码处理后输出中间键值对k2v2 MapReduce系统将自动根据键值对所有中间值进行分组归并 输出键值对k2listv2 并将其传到Reduce方法中 Reduce:k2listv2→listk3v3 接收Map阶段输出的键值对k2listv2 经过用户编写的Reduce
代码处理后 将这些值进行合并等操作形成一个更小的值的集合一般是每个Reduce调用产生0或者1个输出值1。
随着数据处理流程和MapReduce并行计算模 型应用越来越广泛 很多科研机构或企业都对简化数据处理流程的MapReduce实现上做了大量的研 究 如Apache的Hadoop、FaceBook的Hive、Yahoo的Pig、Google的Sawzall和FlumeJava等。
Hadoop实现了可以在普通的商用机器集群上并行处理海量数据的MapReduce
软件框架2 Hive提供了基于SQL的简单查询语言Hive QL来简化数据处理流程的操作 同时支持用户自定义的Map和Reduce操作3 Pig也基于Hadoop平台提供了一种类SQL的声明式编程语言pigLatin4 Sawzall提供了一种类C语言的编程方式实现MapReduce5 FlumeJava则提供了一种通用的接口。
但是这些系统也存在以下几个问题1 没有统一的数据操作组件模型来支持用户自定义数据处理操作 2 对于普通用户来说使用系统提供的简化MapReduce的编程语言仍然有困难 3 使用通用的MapReduce流程引擎 由于不同流程的差异性 会导致不必要的判断而带来性能上的损失 4 系统在MapReduce运行参数设 置上对用户完全透明 这对于不同的流程执行效率会带来不稳定的影响。
此外 模型驱动开发方法将一个软件系统使用一个抽象模型来描述 然后根据这个模型使用
代码模板转换为实现
代码的开发方法。
基于模型驱动的方法来开发数据处理流程以及工作流的流程化应用越来越受到关注6 如基于Eclipse的EMF、GEF和GMF技术 UML技术 它们提供的模型图形化定义技术、
代码生成技术等能够很好地解决现有的基于MapReduce数据处理流程问题 而且在易用性和可扩展性等方面都非常好。
3 问题定义 本文研究如何采用模型驱动的开发方法将用户定义的符合逻辑模型的数据处理流程 自动转换为相应的MapReduce实现
程序。
首先根据逻辑模型定义特定的数据处理流程 其次将研究如何自动将这个数据处理流程转换为符合系统定义的物理模型和操作组件模型的流程实例 最后将物理流程实例转换为相应的MapReduce实现
代码 如图1所示。
以下将分别定义数据处理流程的逻辑模型、物理模型和数据操作组件模型。
3.1 数据处理流程逻辑模型和物理模型 逻辑模型从用户角度来定义 与实现平台无关物理模型从计算机角度来定义 与具体实现平台、流程调度策略和计算模型相关。
逻辑模型只有在转变为物理模型后才能执行。
数据处理流程的逻辑模型和物理模型中的数据 Fig.1 Development method summary 图1 开发方法概述 164 Journal of Frontiers of Computer Science and Technology
计算机科学与探索 2011 52 统一以关系型记录模式表示 这种模式的逻辑结构就是一张二维表 由行和列构成 每一行是一条数据记录 每条记录由多个数据列构成 每列都包含了列名、数据类型、长度、精度、数据模式等属性 可表示为RS 其中R为表名 S为列 可以表示为一个多元组SName Type Pattern Precision …两个模型都是一个有向无环图的结构GVE 一个简单示例如图2所示 Fig.2 Data flow processing example 图2 数据处理流程示例 从图2可以看出逻辑模型和物理模型中V ∪∪SLT EPr 其中 1 S为有限的数据源节点起始节点集合 用来提供关系型记录集 只有输出。
2 L为有限的数据处理结果装载节点结束节点集合 用来存储数据处理结果集 只有输入。
3 T为有限的数据转换节点中间节点集合至少有一个输入和一个输出。
4 Pr为数据流连接 表示了各个操作之间的依赖关系和数据的流向。
PrRS
VBVE
VB为Pr起始节点 VE为Pr结束节点 逻辑模型和物理模型的Pr表示方式一样。
可以看出数据流连接将各个操作之间的依赖关系分为三种 ① 线性关系表示按线性先后顺序执行的操作序列 如图3a所示。
② 多分支聚合关系某个操作接收多个操作的处理结果进行处理 如图3b所示。
③ 多分支并发关系某个操作的处理结果被分为多个分支并发处理 如图3c所示。
逻辑模型中 每个节点vv∈V可以表示为四元组vIDIOL。
其中 ID用来标识节点唯一性 I Fig.3 Node dependence relations 图3 节点的依赖关系 表示该节点的一个或者多个输入记录模式集 O表示一个或者多个有限输出记录模式集 L表示该操 作的逻辑语义 它包括该节点需要用户提供的一些逻辑操作参数 如节点实现的组件ID、过滤条件等。
逻辑模型可以使用EMF建模技术来定义 EMF提供模型的UML、Java对象和XML三种表示之间的转换 结合GEF技术还可以实现模型的图形化 定义。
物理模型中 每个节点vv∈V可以表示为五元组vID I O S C 其中 ID、I、O的含义和逻辑模型一样 S是物理操作语义 定义MapReduce任务在Hadoop上运行需要的参数和从逻辑节点继承过来的一些操作参数 C是该节点对应的数据操作组件模型实例 数据操作组件模型在下面介绍。
此外 物理模型GVE还包括了流程实现的一些Jet
代码模板P 如数据流连接的
代码模板、流程头部模板、尾部模版等 Jet技术在下一节介绍。
3.2 数据操作组件模型 数据操作组件是对过滤、连接、合并、值替换等
常用的或者领域相关的数据处理操作的抽象描述和实现 这个模型是为了能够以一种统一方式表示节点 从而可以方便地在流程中增加和减少节点 扩展新的节点。
模型充分利用了Jet技术 Jet是一个使用
Java编写的可以根据一个
代码数据模型生成各种
代码或其他文本的生成器 它自动地根据用户定义的数据模型和相应的
代码模板生成模板解析类 并根据模型实例生成实现
代码 用户可以很方便地通过修改
代码模板来改变
代码的实现7 它 易小华 等面向MapReduce的数据处理流程开发方法 165 的
工作方式如图4所示。
组件模型及其工作方式如图5所示。
Fig.4 Jet work method 图4 Jet工作方式 Fig.5 Component model and its work method 图5 组件模型及其工作方式 组件模型包含了组件ID、参数定义文件和
代码模板ID用来标识组件的唯一性 组件的操作参数和配置参数定义文件 用来定义组件实现该操作需要由用户提供的参数信息和在Hadoop平台上运行时的一些参数信息
代码模板指定了实现该组件需要的符合MapReduce计算模型的Map操作
代码和Reduce操作
代码及其输入输出键类型模板 可以只有Map操作模板而没有Reduce操作模板 组件还需要在Hadoop平台上运行所需要的运行参数配置
代码模板 这些
代码模板都是使用Jet模板规范来定义的。
在逻辑模型中 一个节点可以包含多个操作组件 但是它只在L中指定其所使用的组件ID图6 Fig.6 Relation between component and node 图6 组件与节点关系 而不是实际的包含该组件模型的实例。
但是在物理模型中 一个节点只能包含一个组件 而且这个组件是一个使用了节点信息实例化了的符合组数据操作组件模型的组件实例C图6。
根据组件实现是否需要Reduce操作可以把物理节点分为两类Map操作节点和MapReduce操作节点。
组件的工作方式为接收包含它的物理节点提供的输入输出模式信息和参数信息等 并根据这些信息和
代码模板 使用Jet技术生成相应的数据操作处理
代码 并将处理后的输出按照输出模式存放。
4 实现算法 本章将介绍逻辑模型转换为物理模型的算法和物理模型生成MapReduce
代码的算法。
难点在于怎样利用MapReduce计算模型的特性来构造MapReduce任务 尽可能提高流程的并发度和整体执行效率以及如何利用物理模型构造流程的
代码模型和
代码模板 最后生成符合要求的
代码。
4.1 逻辑模型转换算法 用户设计的逻辑模型需要转换为物理模型以后才可以在计算机上运行 接下来以一个简单的示例来介绍逻辑模型转换算法 图2表示的逻辑模型转换为物理模型如图7所示。
图2和图7都省略了一些模型信息 在说明算法前 先引入一个概念“本地节点组” 它用来捕获物理模型中以线性方式执行而不存在分支或者聚合的一系列操作节点 在图7的例子中 可以将流程图分为三个本地组 分别为按节点的标号来 表示节点组1:234 组2:15 组3:6789。
166 Journal of Frontiers of Computer Science and Technology 计算机科学与探索 2011 52 Fig.7 Physical model example 图7 物理模型示例 MapReduce计算模型允许通过合并Map操作节点的方式来生成以mapper/reducer/mapper形式组织的MapReduce任务 从而减少流程MapReduce任务数 降低磁盘和数据传递消耗 提高效率。
逻辑模型转换算法如算法1所示。
算法1 逻辑模型转换算法 输入逻辑模型LGV1E1。
输出物理模型PGV2E2和MapReduce任务集。
1 构造一个空的物理模型PGV2E2 填充一些公共
代码模板信息和从逻辑模型继承过来的流程名等模型公共信息。
2 对LGV1E1进行拓扑排序。
3 按拓扑排序顺序依次遍历每个节点 读取每个节点的组件ID信息和操作参数等信息v1 ID I O S。
4 根据组件ID和其他信息按照数据操作组件模型和物理模型节点信息实例化一个物理节点v2 ID I O S C。
5 按照遍历的先后顺序连接物理节点构造物理节点的Pr。
6 提取物理模型流程中的本地组信息。
7 针对每个本地组提取包含Reduce操作的节点。
以Reduce操作节点为划分点 以mapper/reducer/ mapper对每个组构造MapReduce任务集。
8 按照本地组之间的连接信息和本地组内的连接信息生成各个MapReduce任务之间的依赖关系。
图2按算法1先转换为图7 然后按照mapper /reducer/mapper形式 组1可以转换为map map map一个MapReduce任务mr1 组2转换为map map一个MapReduce任务mr2 组3转变为map map reducemr3和map reduce mapmr4两个MapReduce任务 其中mr3依赖于mr1和mr2的输出数据 mr4依赖于mr3的输出数据。
与将每个节点当成一个MapReduce任务相比 减少了Map Reduce任务数 减少了数据传递消耗 加大了执行并发度 对执行效率有非常大的改进。
4.2 物理模型
代码生成算法 在本文的
代码生成算法中 使用Java语言和Hadoop平台提供的MapReduce接口做样例 每个流程都是一个Java类 数据连接、输入输出类型、 每个节点的Map操作和Reduce操作都作为该类的内部类来实现 然后在runJob方法中按照物理模型构造的MapReduce任务集组织各个节点的Map和Reduce操作 Hadoop平台提供的ChainMapper和ChainReducer接口直接对Map/Reduce/Map形式的MapReduce任务提供支持 接着配置好任务的运行参数信息和任务依赖关系 并使用Hadoop提供的JobControl接口提交任务 最后在Main方法中执行runJob方法 如图8所示
代码生成算法如算法2所示。
算法2 物理模型
代码生成算法 输入物理模型PGV2E2和MapReduce任务集 以及
代码模板。
输出PGV2E2对应MapReduce实现
代码。
1 根据流程头部信息和头部模板解析类生成流程类的头部。
2 根据连接信息和连接模板解析类生成流程连接内部类。
3 按拓扑排序顺序依次遍历PG每个节点v2 根据节点信息和节点包含的组件信息生成输入输出键类型内部类 Map和Reduce操作内部类。
4 声明runJob方法 并在方法体中遍历MapReduce任务集的每个任务 根据节点对应组件的Hadoop、Job配置模板和配置信息生成每个节点 易小华 等面向MapReduce的数据处理流程开发方法 167 Fig.8 Code generation algorithm 图8
代码生成算法 的配置模板。
5 在runJob方法中 使用ChainMapper和Chain-Reduce接口连接各个节点的Map和Reduce操作。
6 遍历每个本地组。
7 针对每个本地组提取包含Reduce操作的节点。
以Reduce操作节点为划分点 按mapper/reducer/ mapper对每个组构造MapReduce任务集。
8 按照本地组直接的连接信息和本地组内的连接信息生成各个MapReduce任务之间的依赖关系 使用JobControl控制任务。
9 在main方法中执行流程。
5
系统实现和评估 本章简单介绍系统的整体实现框架 然后使 用实现的系统执行参考文献8中的测试查询 并 与现有的基于MapReduce的数据流程处理系统Pig做比较 来测试系统的实现性能 得到初步的实验结果。
5.1 系统实现架构 系统CloudDataFlow的实现基于Eclispe插件技术和GMF建模技术以及Hadoop平台提供的MapReduce编程接口 并使用Jet
代码生成技术来生成流程的执行
代码 在Hadoop平台上运行。
系统架构如图9所示。
系统的
设计尽量使用了分层和可扩展机制。
在数据源上 数据源适配器为HDFS、XML、关系数据库、普通文件等各种数据源或者用户自定义的数据源的访问定义了一个统一的接口。
在转换组件和流程的设计上使用GMF技术 GMF提供了快速开发基于图形化编辑器GEF和EMF建模技术的开发框架。
使用Eclipse插件技术实现数据操作组件和数据源的可扩展性。
然后分别经过逻辑模型和物理模型转换以及
代码生成器生成流程的MapReduce实现程序。
最后利用Hadoop平台提供的各种机制优化、调度和运行MapReduce任务。
Fig.9 CloudDataFlow system frame 图9 CloudDataFlow系统框架图 5.2 实验设置 实验目的主要是验证基于
代码生成算法实现的MapReduce数据处理流程 与Pig等使用通用引擎实现的数据处理流程在处理效率、执行时间上的对比。
所有的实验都运行在一个有5个节点的集群上。
实验数据和查询基于SIGMOD 2009年的
论文8 Pig选取版本为主分支中的786346 使用的Hadoop版本为0.18.X 每个节点的数据都相应地扩展或
压缩使得执行时间不会超过30分钟。
测试了
论文中的前 168 Journal of Frontiers of Computer Science and Technology 计算机科学与探索 2011 52 4个
查询 这些查询可以在http://database.cs.brown. edu/projects/mapreduce-vs-dbms/查看到 包括2个选择查询操作 1个聚集操作和1个连接操作 每.