【Java精品源码栏目提醒】:网学会员为广大网友收集整理了,MapReduce源码流程分析 - 讲义教程,希望对大家有所帮助!
1 / 49 MapReduce
源码流程分析 MapReduce
源码流程分析 ............................................................................................................... 1 1.角色 ............................................................................................................................................... 2 1.1JobClient ............................................................................................................................... 2 1.2JobTracker ............................................................................................................................ 2 1.3TaskTracker ........................................................................................................................... 2 2.数据结构........................................................................................................................................ 2 2.1JobInProgress ....................................................................................................................... 2 2.2TaskInProgress ..................................................................................................................... 2 2.3TaskLauncher ....................................................................................................................... 3 2.4TaskTrackerTaskInProgress ................................................................................................. 3 2.5Task....................................................................................................................................... 3 2.6TaskRunner ........................................................................................................................... 3 2.7JvmManager ......................................................................................................................... 3 3.流程 ............................................................................................................................................... 3 3.1 Client端 ............................................................................................................................... 3 3.1.1初始化....................................................................................................................... 3 3.1.2与JobTracker端的交互.............................................................................................. 4 3.2 JobTracker端 ....................................................................................................................... 7 3.2.1启动初始化 ............................................................................................................... 7 3.2.2与Client端交互 ....................................................................................................... 10 3.2.3与TaskTracker端交互 ............................................................................................. 16 3.3 TaskTracker端 ................................................................................................................... 17 3.3.1启动初始化 ............................................................................................................. 17 3.3.2与JobTracker端交互 ............................................................................................... 27 3.3.3运行任务 ................................................................................................................. 27 2 / 49 1.角色 1.1JobClient 每一个job都会在用户端通过JobClient类将应用程序以及配置参数打包成jar文件存储在HDFS并把路径提交到JobTracker然后由JobTracker创建每一个Task即MapTask和ReduceTask并将它们分发到各个TaskTracker服务中去执行。
1.2JobTracker JobTracker是一个master中心节点服务 JobTracker负责生成和调度job的每一个子任务task运行于TaskTracker上并监控它们如果发现有失败的task就重新运行它。
一般情况应该把JobTracker部署在单独的机器上。
1.3TaskTracker TaskTracker是运行于多个节点上的slaver服务。
TaskTracker则负责直接执行每一个task。
TaskTracker都需要运行在HDFS的DataNode上。
2.数据结构 2.1JobInProgress JobClient提交job后JobTracker会创建一个JobInProgress来跟踪和调度这个job并把它添加到job队列里。
JobInProgress会根据提交的job jar中定义的输入数据集已分解成FileSplit创建对应的一批TaskInProgress用于监控和调度MapTask同时在创建指定数目的TaskInProgress用于监控和调度ReduceTask缺省为1个ReduceTask。
2.2TaskInProgress JobTracker启动任务时通过每一个TaskInProgress来launchTask这时会把Task对象即MapTask和ReduceTask序列化写入相应的TaskTracker服务中TaskTracker收到后会创建对应的TaskInProgress此TaskInProgress实现非JobTracker中使用的TaskInProgress作用类似 3 / 49 用于监控和调度该Task。
启动具体的Task进程是通过TaskInProgress管理的TaskRunner对象来运行的。
TaskRunner会自动装载job jar并设置好环境变量后启动一个独立的
java child进程来执行Task即MapTask或者ReduceTask但它们不一定运行在同一个TaskTracker中。
2.3TaskLauncher 2.4TaskTrackerTaskInProgress 2.5Task 2.6TaskRunner 2.7JvmManager 3.流程 启动。
3.1 Client端 3.1.1初始化 客户端初始化主要是创建RPC远程服务器在创建JobClient的时候进行创建的。
是在客户端提交作业代码执行时进行的提交之后立刻客户端做必要的初始化之后再提交作业。
JobClient jc new JobClientjob RunningJob rj jc.submitJobjob 4 / 49 3.1.2与JobTracker端的交互 客户端主要负责像JobTracker提交作业在旧版的api中通过使用JonConf类对作业进行配置JobConf的功能已被新的类Configuration和Job替换。
Configuration类描述了资源这些资源大多都是从XML配置文件中读取的属性和值组成。
比如来自core-default.xml和core-site.xml。
Job描述了用户角度的视图它允许用户配置、提交、控制它的执行和查询状态。
如图所示是一个MapReduce的程序客户端提交作业的代码通过生成一个JobConf对作业进行配置也可以用新的api生成一个Job注释中最后通过JobClient.runJobconf对作业进行提交也可用新的api job.waitForCompletiontrue来对作业进行提交。
5 / 49 图3.1客户端提交作业代码 此方法会返回一个RunningJob对象它用来跟踪作业的状态。
作业提交完毕后JobClient会根据此对象开始轮询作业的进度直到作业完成。
在JobClient.runJob方法中会创建一个JobClient对象之后调用JobClient的submitJob方法此方法返回RunningJob对象在submitJob方法中调用submitJobInternal方法。
submitJobInternal此方法是提交作业的主要方法。
它主要实现了Hadoop权威指南P167所描述的作业提交的过程。
主要有 JobID jobId jobSubmitClient.getNewJobId 向JobTracker请求一个新的作业ID通过调用JobTracker的getNewJobId来实现 Path submitJobDir new PathgetSystemDir jobId.toString Path submitJarFile new PathsubmitJobDir quotjob.jarquot Path submitSplitFile new PathsubmitJobDir quotjob.splitquot configureCommandLineOptionsjob submitJobDir submitJarFile Path submitJobFile new PathsubmitJobDir quotjob.xmlquot 将运行所需要的资源——包括作业job.jar文件配置文件job.xml即所得的输入划分文件job.split复制到一个以作业ID号命名的目录中。
configureCommandLineOptions函数的作用是在fs中创建虚拟路径…… 其中job.xmljob.jarjob.split作用如下 job.xml: 作业配置例如Mapper Combiner Reducer的类型输入输出格式的类型等。
job.jar: jar包里面包含了执行此任务需要的各种类比如 MapperReducer等实现。
job.split: 文件分块的相关信息比如有数据分多少个块块的大小默认64m等。
// Check the output specification if reduces 0 job.getUseNewMapper : job.getUseNewReducer org.apache.hadoop.mapreduce.OutputFormatltgt output 6 / 49 ReflectionUtils.newInstancecontext.getOutputFormatClass job output.checkOutputSpecscontext else job.getOutputFormat.checkOutputSpecsfs job 检查输出经是否已经存在。
//计算作业的输入划分 // Create the splits for the job LOG.debugquotCreating splits at quot fs.makeQualifiedsubmitSplitFile int maps if job.getUseNewMapper maps writeNewSplitscontext submitSplitFile else maps writeOldSplitsjob submitSplitFile job.setquotmapred.job.split.filequot submitSplitFile.toString job.setNumMapTasksmaps // Write job file to JobTrackers fs FSDataOutputStream out FileSystem.createfs submitJobFilenew FsPermissionJOB_FILE_PERMISSION try job.writeXmlout finally out.close 将job.xml写入到JobTracker的文件系统里具体就是对job.xml把Configuration的配置信息写在job.xml里。
// Now actually submit the job using the submit name JobStatus status jobSubmitClient.submitJobjobId if status null return new NetworkedJobstatus else throw new IOExceptionquotCould not launch jobquot 提交作业。
这里通知JobTracker发送jobId过去由JobTracker根据jobId去执行。
JobClient里面使用RPC机制来构造一个实现 JobSubmissionProtocol接口的JobTracker的代理然后利用远程发放直接执行JobTracker里的submitJob与我们的利用Socket通信略有不同。
NetworkedJob通过实现JobSubmissionProtocol接口的代理来与JobTracker进行通信。
7 / 49 3.2 JobTracker端 JobTracker的地位相当于我们的Master它负责调度job的每一个子任务task运行于slave上并监控它们如果发现有失败的task就重新运行它。
JobTracker一直在等待JobClient通过RPC提交作业而TaskTracker一直通过RPC向 JobTracker发送心跳heartbeat询问有没有任务可做如果有让其派发任务给它执行。
如果JobTracker的作业队列不为空 则TaskTracker通过发送心跳来会获得JobTracker给它派发的任务。
这是一道pull过程: slave主动向master拉生意。
slave节点的TaskTracker接到任务后在其本地发起Task然后执行任务。
3.2.1启动初始化 JobTracker启动时首先运行main方法 public static void mainString argv throws IOException InterruptedException StringUtils.startupShutdownMessageJobTracker.class argv LOG if argv.length 0 System.out.printlnquotusage: JobTrackerquot System.exit-1 try JobTracker tracker startTrackernew JobConf tracker.offerService catch Throwable e LOG.fatalStringUtils.stringifyExceptione System.exit-1 在main方法中调用startTracker方法和tracker.offerService方法。
startTracker方法主要调用两个方法如下图所示代码。
8 / 49 图3.1 startTracker方法 如图所示一个方法是JobTracker的构造方法一个是jobTracker.taskSchedule.setTaskTrackerManager方法。
在JobTracker的构造方法中首先会初始化常量数据之后创建队列管理器queueManager和作业调度器。
queueManager new QueueManagerthis.conf // Create the scheduler Classlt extends TaskSchedulergt schedulerClass conf.getClassquotmapred.jobtracker.taskSchedulerquot JobQueueTaskScheduler.class TaskScheduler.class taskScheduler TaskScheduler ReflectionUtils.newInstanceschedulerClass conf 这里调度器的创建使用了反射机制实例化了一个JobQueueTaskScheduler对象。
调度策略默认为FIFO。
在JobQueueTaskScheduler构造方法中会生成一个JobQueueJobInProgressListener即JobQueueJobInProgress监听器。
JobQueueJobInProgressListener构造方法会创建一个MapltJobSchedulingInfo JobInProgressgt通过传递一个比较器FIFO_JOB_QUEUE_COMPARATOR它是 9 / 49 JobQueueJobInProgressListener类的静态函数。
同时通过反射机制调用了JobQueueTaskScheduler的setConf方法具体参见相关源代码这里就不列举了。
在setConf方法中创建了EagerTaskInitializationListener监听器。
// Set ports start RPC servers setup security policy etc. InetSocketAddress addr getAddressconf this.localMachine addr.getHostName this.port addr.getPort // Set service-level authorization security policy if conf.getBoolean ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG false PolicyProvider policyProvider PolicyProviderReflectionUtils.newInstance conf.getClassPolicyProvider.POLICY_PROVIDER_CONFIG MapReducePolicyProvider.class PolicyProvider.class conf SecurityUtil.setPolicynew ConfiguredPolicyconf policyProvider int handlerCount conf.getIntquotmapred.job.tracker.handler.countquot 10 this.interTrackerServer RPC.getServerthis addr.getHostName addr.getPort handlerCount false conf 启动RPC服务器。
String infoAddr NetUtils.getServerAddressconf quotmapred.job.tracker.info.bindAddressquot quotmapred.job.tracker.info.portquot quotmapred.job.tracker.http.addressquot InetSocketAddress infoSocAddr NetUtils.createSocketAddrinfoAddr String infoBindAddress infoSocAddr.getHostName int tmpInfoPort infoSocAddr.getPort this.startTime System.currentTimeMillis infoServer new HttpServerquotjobquot infoBindAddress tmpInfoPort tmpInfoPort 0 conf infoServer.setAttributequotjob.trackerquot this // initialize history parameters. boolean historyInitialized JobHistory.initconf this.localMachine this.startTime String historyLogDir null FileSystem historyFS null if historyInitialized historyLogDir conf.getquothadoop.job.history.locationquot 10 / 49 infoServer.setAttributequothistoryLogDirquot historyLogDir historyFS new PathhistoryLogDir.getFileSystemconf infoServer.setAttributequotfileSysquot historyFS infoServer.addServletquotreducegraphquot quot/taskgraphquot TaskGraphServlet.class infoServer.start 启动TrackInfoServer。
构造函数还有其他的内容这里就不细说了其实是其他的也没仔细看。
现在回到方法startTracker方法在创建JobTracker下面还有一句代码如下 result new JobTrackerconf identifier result.taskScheduler.setTaskTrackerManagerresult JobTracker将自己的调度器的TaskTrackerManager设置成自己。
这么做使得后来再初始化Task时调用的是自己的InitJob方法。
现在回到main方法在startTracker方法后还有一个tracker.offerService方法要执行。
在tracker.offerService中会启动几个服务其中关键的有调度器的启用和interTrackerServer的启用。
taskScheduler.start this.interTrackerServer.start taskScheduler实质上是JobQueueTaskScheduler当它启动时会将事先生成的监听器加入到taskTrackerManager的监听器队列中taskTrackerManager是一个抽象类被JobTracker实现。
所以调用的是JobTracker的addJobInProgressListener方法。
Override public synchronized void start throws IOException super.start taskTrackerManager.addJobInProgressListenerjobQueueJobInProgressListener eagerTaskInitializationListener.setTaskTrackerManagertaskTrackerManager eagerTaskInitializationListener.start taskTrackerManager.addJobInProgressListener eagerTaskInitializationListener 3.2.2与Client端交互 客户端使用RPC机制来构造一个实现 JobSubmissionProtocol接口的JobTracker的代理然后利用远程发放直接执行JobTracker里的submitJob方法进而提交作业。
下面来看一下JobTracker.submitJob方法。
11 / 49 图3-2 JobTracker提交作业 在这个函数中会创建一个JobInProgress对象并把它加入到jobs队列中jobs是一个TreeMap:MapltJobID JobInProgressgt jobs new TreeMapltJobID JobInProgressgt。
是在addJob方法中加入到jobs并且将它加入到监听队列中。
代码如下 private synchronized JobStatus addJobJobID jobId JobInProgress job totalSubmissions synchronized jobs synchronized taskScheduler jobs.putjob.getProfile.getJobID job for JobInProgressListener listener : jobInProgressListeners try listener.jobAddedjob catch IOException ioe LOG.warnquotFailed to add and so skipping the job : quot job.getJobID quot. Exception : quot ioe 12 / 49 myInstrumentation.submitJobjob.getJobConf jobId return job.getStatus 这些监听器是在JobTracker启动初始化的时候构造的有JobQueueJobInProgressListener和EagerTaskInitializationListener。
其中eagerTaskInitializationListener负责任务Task的初始化。
其具体实现是这样的: 这个listener在初始化时会开启一个JobInitThread线程当作业通过jobAddedjob加入到初始化队列 jobInitQueue中根据作业的优先级排序resortInitQueue方法后 这个线程就会调用JobInProgress.initTasks立即初始化作业的所有任务。
调用关系如下在JobTracker启动初始化的时候会启动taskSchedule即调用taskSchedule.start即调用JobQueueTaskScheduler.start在JobQueueTaskScheduler.start方法中会调用 super.start taskTrackerManager.addJobInProgressListenerjobQueueJobInProgressListener eagerTaskInitializationListener.setTaskTrackerManagertaskTrackerManager eagerTaskInitializationListener.start taskTrackerManager.addJobInProgressListenereagerTaskInitializationListener.
上一篇:
程序员面试题精选100题
下一篇:
bc80e7a0-d1f2-4595-b21d-01a76798e87a