Hadoop 2.7 Job Submission in Detail (Part 2)
This post continues from the previous one (Hadoop 2.7 Job Submission in Detail, Part 1), which traced the submission path as far as YARNRunner.submitJob():
[WordCount.main() -> Job.waitForCompletion() -> Job.submit() -> Job.connect() -> Cluster.Cluster() -> Cluster.initialize() -> YarnClientProtocolProvider.create() -> JobSubmitter.submitJobInternal() -> YARNRunner.submitJob()]

So let's pick up from YARNRunner.submitJob(). First, a quick look at the YARNRunner class (an excerpt):

```java
package org.apache.hadoop.mapred;

public class YARNRunner implements ClientProtocol {
  // The RM's "envoy" stationed out on the local (client) side
  private ResourceMgrDelegate resMgrDelegate;
  private ClientCache clientCache;
  private Configuration conf;
  private final FileContext defaultFileContext;

  // Constructor: creates the envoy, then calls the next constructor
  public YARNRunner(Configuration conf) {
    this(conf, new ResourceMgrDelegate(new YarnConfiguration(conf)));
  }

  // Creates the ClientCache, then calls the final constructor
  public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate) {
    this(conf, resMgrDelegate, new ClientCache(conf, resMgrDelegate));
  }

  // The final constructor
  public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate,
      ClientCache clientCache) {
    this.conf = conf;
    try {
      this.resMgrDelegate = resMgrDelegate;
      this.clientCache = clientCache;
      this.defaultFileContext = FileContext.getFileContext(this.conf);
    } catch (UnsupportedFileSystemException ufe) {
      throw new RuntimeException("Error in instantiating YarnClient", ufe);
    }
  }

  @Override
  public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
      throws IOException, InterruptedException {
    // Used by the history service; related to JobHistory
    addHistoryToken(ts);

    // Construct necessary information to start the MR AM:
    // create an ApplicationSubmissionContext and copy the relevant settings from conf into it
    ApplicationSubmissionContext appContext =
        createApplicationSubmissionContext(conf, jobSubmitDir, ts);

    // Submit to ResourceManager
    try {
      // Once the RM accepts the job, it forwards the ContainerLaunchContext to some NM node.
      // That node runs the shell command line, starting a separate JVM that executes
      // MRAppMaster.class. So appContext really does "represent all the information the
      // ResourceManager needs to launch this application's ApplicationMaster".
      ApplicationId applicationId =
          resMgrDelegate.submitApplication(appContext);

      ApplicationReport appMaster = resMgrDelegate
          .getApplicationReport(applicationId);
      String diagnostics =
          (appMaster == null ?
              "application report is null" : appMaster.getDiagnostics());
      if (appMaster == null
          || appMaster.getYarnApplicationState() == YarnApplicationState.FAILED
          || appMaster.getYarnApplicationState() == YarnApplicationState.KILLED) {
        throw new IOException("Failed to run job : " + diagnostics);
      }
      return clientCache.getClient(jobId).getJobStatus(jobId);
    } catch (YarnException e) {
      throw new IOException(e);
    }
  }

  // ... (other members omitted)
}
```

As the comments note, createApplicationSubmissionContext() creates the ApplicationSubmissionContext and copies the relevant settings from conf into it. Those settings include the ContainerLaunchContext whose command line will start MRAppMaster.

Next comes the call to ResourceMgrDelegate.submitApplication(), so let's first look at the ResourceMgrDelegate class:

```java
public class ResourceMgrDelegate extends YarnClient {
  private YarnConfiguration conf;
  private ApplicationSubmissionContext application;
  private ApplicationId applicationId;
  // Actually a YarnClientImpl object, which also extends YarnClient
  protected YarnClient client;
  private Text rmDTService;

  // Constructor of ResourceMgrDelegate
  public ResourceMgrDelegate(YarnConfiguration conf) {
    super(ResourceMgrDelegate.class.getName());
    this.conf = conf;
    // Create the YarnClient object; YarnClient.createYarnClient() returns a YarnClientImpl
    this.client = YarnClient.createYarnClient();
    init(conf);  // provided by AbstractService, which YarnClient extends
    start();     // also provided by AbstractService
  }

  @Override
  public ApplicationId submitApplication(ApplicationSubmissionContext appContext)
      throws YarnException, IOException {
    // Delegates to YarnClientImpl.submitApplication()
    return client.submitApplication(appContext);
  }

  // ... (other members omitted)
}
```

The code so far shows how these objects come into being:
- The ResourceMgrDelegate object is created in YARNRunner's constructor.
- YARNRunner itself is created in Cluster.initialize(), seen earlier.
- Tracing further back, the Cluster object is created the first time connect() is called.

So any node that has ever called connect(), that is, has ever connected to the cluster, holds a Cluster object. It therefore also holds a YARNRunner object and a ResourceMgrDelegate object. Because ResourceMgrDelegate's constructor calls YarnClient.createYarnClient(), it holds a YarnClientImpl object as well.

So far, the job submission path is:
[WordCount.main() -> Job.waitForCompletion() -> Job.submit()
-> Job.connect() -> Cluster.Cluster() -> Cluster.initialize() -> YarnClientProtocolProvider.create() -> JobSubmitter.submitJobInternal() -> YARNRunner.submitJob() -> ResourceMgrDelegate.submitApplication() -> YarnClientImpl.submitApplication()]

Next, let's look at YarnClientImpl.submitApplication():

```java
@Override
public ApplicationId submitApplication(ApplicationSubmissionContext appContext)
    throws YarnException, IOException {
  ApplicationId applicationId = appContext.getApplicationId();
  if (applicationId == null) {
    throw new ApplicationIdNotProvidedException(
        "ApplicationId is not provided in ApplicationSubmissionContext");
  }
  // Create a request record (a SubmitApplicationRequestPBImpl)
  SubmitApplicationRequest request =
      Records.newRecord(SubmitApplicationRequest.class);
  // Put the context into the record
  request.setApplicationSubmissionContext(appContext);

  // Automatically add the timeline DT into the CLC
  // Only when the security and the timeline service are both enabled
  if (isSecurityEnabled() && timelineServiceEnabled) {
    addTimelineDelegationToken(appContext.getAMContainerSpec());
  }

  //TODO: YARN-1763:Handle RM failovers during the submitApplication call.
  rmClient.submitApplication(request);  // the actual cross-node submission

  int pollCount = 0;
  long startTime = System.currentTimeMillis();
  EnumSet<YarnApplicationState> waitingStates =
      EnumSet.of(YarnApplicationState.NEW,
          YarnApplicationState.NEW_SAVING,
          YarnApplicationState.SUBMITTED);
  EnumSet<YarnApplicationState> failToSubmitStates =
      EnumSet.of(YarnApplicationState.FAILED,
          YarnApplicationState.KILLED);
  while (true) {
    try {
      // Fetch the application report from the RM and read this application's current state
      ApplicationReport appReport = getApplicationReport(applicationId);
      YarnApplicationState state = appReport.getYarnApplicationState();
      if (!waitingStates.contains(state)) {
        if (failToSubmitStates.contains(state)) {
          throw new YarnException("Failed to submit " + applicationId +
              " to YARN : " + appReport.getDiagnostics());
        }
        LOG.info("Submitted application " + applicationId);
        break;  // no longer waiting (e.g. ACCEPTED or RUNNING): leave the loop
      }

      long elapsedMillis = System.currentTimeMillis() - startTime;
      if (enforceAsyncAPITimeout() &&
          elapsedMillis >= asyncApiPollTimeoutMillis) {
        throw new YarnException("Timed out while waiting for application " +
            applicationId + " to be submitted successfully");
      }

      // Notify the client through the log every 10 poll, in case the client
      // is blocked here too long.
      if (++pollCount % 10 == 0) {
        LOG.info("Application submission is not finished, " +
            "submitted application " + applicationId +
            " is still in " + state);
      }
      try {
        Thread.sleep(submitPollIntervalMillis);
      } catch (InterruptedException ie) {
        String msg = "Interrupted while waiting for application " +
            applicationId + " to be successfully submitted.";
        LOG.error(msg);
        throw new YarnException(msg, ie);
      }
    } catch (ApplicationNotFoundException ex) {
      // FailOver or RM restart happens before RMStateStore saves
      // ApplicationState
      LOG.info("Re-submit application " + applicationId + "with the " +
          "same ApplicationSubmissionContext");
      rmClient.submitApplication(request);  // resubmit after the failure
    }
  }

  return applicationId;
}
```

So the real work is done by rmClient.submitApplication(request). What is rmClient? Here is a brief look at the definition of the YarnClientImpl class:

```java
public class YarnClientImpl extends YarnClient {

  private static final Log LOG = LogFactory.getLog(YarnClientImpl.class);

  protected ApplicationClientProtocol rmClient;
  protected long submitPollIntervalMillis;
  private long asyncApiPollIntervalMillis;
  private long asyncApiPollTimeoutMillis;
  protected AHSClient historyClient;
  private boolean historyServiceEnabled;
  protected TimelineClient timelineClient;
  @VisibleForTesting
  Text timelineService;
  @VisibleForTesting
  String timelineDTRenewer;
  protected boolean timelineServiceEnabled;
  protected boolean timelineServiceBestEffort;

  private static final String ROOT = "root";

  public YarnClientImpl() {
    super(YarnClientImpl.class.getName());
  }

  // ... (other members omitted)
}
```

So rmClient is an ApplicationClientProtocol. That is an interface, and the concrete implementation here is ApplicationClientProtocolPBClientImpl. Let's look at that class:

```java
public class ApplicationClientProtocolPBClientImpl implements ApplicationClientProtocol,
    Closeable {

  private ApplicationClientProtocolPB proxy;

  public ApplicationClientProtocolPBClientImpl(long clientVersion,
      InetSocketAddress addr, Configuration conf) throws IOException {
    // Set the config key "rpc.engine.ApplicationClientProtocolPB" to ProtobufRpcEngine
    RPC.setProtocolEngine(conf, ApplicationClientProtocolPB.class,
        ProtobufRpcEngine.class);
    // Create the proxy. It lives in the JVM the user started in order to submit the
    // application: that JVM belongs to neither the ResourceManager nor a NodeManager,
    // and it can run on any machine in the cluster.
    proxy = RPC.getProxy(ApplicationClientProtocolPB.class, clientVersion, addr, conf);
  }

  @Override
  public SubmitApplicationResponse submitApplication(
      SubmitApplicationRequest request) throws YarnException, IOException {
    // Extract the protocol message part of the request
    SubmitApplicationRequestProto requestProto =
        ((SubmitApplicationRequestPBImpl) request).getProto();
    try {
      // Let the proxy send the message and wait for the server's reply,
      // then wrap the reply in a SubmitApplicationResponsePBImpl
      return new SubmitApplicationResponsePBImpl(proxy.submitApplication(null,
          requestProto));
    } catch (ServiceException e) {
      RPCUtil.unwrapAndThrowException(e);
      return null;
    }
  }

  // ... (other methods omitted)
}
```

ApplicationClientProtocolPBClientImpl.submitApplication() simply calls proxy.submitApplication(), and proxy is created in the constructor.

The SubmitApplicationRequest sent through proxy is addressed to the RM node. It travels through the network transport layer provided by the operating system, as TCP traffic, to the peer layer on the machine where the RM runs. This is the hop from the client side to the server side.

On the server side, the call arrives over TCP/IP at ApplicationClientProtocolPBServiceImpl.submitApplication():

```java
public class ApplicationClientProtocolPBServiceImpl implements ApplicationClientProtocolPB {

  private ApplicationClientProtocol real;

  public ApplicationClientProtocolPBServiceImpl(ApplicationClientProtocol impl) {
    this.real = impl;
  }

  @Override
  public SubmitApplicationResponseProto submitApplication(RpcController arg0,
      SubmitApplicationRequestProto proto) throws ServiceException {
    // Wrap the incoming message in a request object
    SubmitApplicationRequestPBImpl request = new SubmitApplicationRequestPBImpl(proto);
    try {
      // real is a ClientRMService object, created by createClientRMService()
      // when the RM is initialized
      SubmitApplicationResponse response = real.submitApplication(request);
      return ((SubmitApplicationResponsePBImpl) response).getProto();
    } catch (YarnException e) {
      throw new ServiceException(e);
    } catch (IOException e) {
      throw new ServiceException(e);
    }
  }

  // ... (other methods omitted)
}
```

This in turn calls ClientRMService.submitApplication(request):

```java
@Override
public SubmitApplicationResponse submitApplication(
    SubmitApplicationRequest request) throws YarnException {
  ApplicationSubmissionContext submissionContext = request
      .getApplicationSubmissionContext();
  ApplicationId applicationId = submissionContext.getApplicationId();

  // ApplicationSubmissionContext needs to be validated for safety - only
  // those fields that are independent of the RM's configuration will be
  // checked here, those that are dependent on RM configuration are validated
  // in RMAppManager.

  String user = null;
  try {
    // Safety: get the submitting user
    user = UserGroupInformation.getCurrentUser().getShortUserName();
  } catch (IOException ie) {
    LOG.warn("Unable to get the current user.", ie);
    throw RPCUtil.getRemoteException(ie);
  }

  // Check whether app has already been put into rmContext,
  // If it is, simply return the response
  if (rmContext.getRMApps().get(applicationId) != null) {
    LOG.info("This is an earlier submitted application: " + applicationId);
    return SubmitApplicationResponse.newInstance();
  }

  // No queue set: use the default queue
  if (submissionContext.getQueue() == null) {
    submissionContext.setQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
  }
  // No application name set: use the default name
  if (submissionContext.getApplicationName() == null) {
    submissionContext.setApplicationName(
        YarnConfiguration.DEFAULT_APPLICATION_NAME);
  }
  // No application type set: default to YARN; an over-long type is truncated
  if (submissionContext.getApplicationType() == null) {
    submissionContext
        .setApplicationType(YarnConfiguration.DEFAULT_APPLICATION_TYPE);
  } else if (submissionContext.getApplicationType().length()
      > YarnConfiguration.APPLICATION_TYPE_LENGTH) {
    submissionContext.setApplicationType(submissionContext
        .getApplicationType().substring(0,
            YarnConfiguration.APPLICATION_TYPE_LENGTH));
  }

  try {
    // call RMAppManager to submit application directly
    rmAppManager.submitApplication(submissionContext,
        System.currentTimeMillis(), user);

    LOG.info("Application with id " + applicationId.getId() +
        " submitted by user " + user);
    RMAuditLogger.logSuccess(user, AuditConstants.SUBMIT_APP_REQUEST,
        "ClientRMService", applicationId);
  } catch (YarnException e) {
    LOG.info("Exception in submitting application with id " +
        applicationId.getId(), e);
    RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
        e.getMessage(), "ClientRMService",
        "Exception in submitting application", applicationId);
    throw e;
  }

  SubmitApplicationResponse response = recordFactory
      .newRecordInstance(SubmitApplicationResponse.class);
  return response;
}
```

From the job-submission point of view, the submission is complete once control reaches RMAppManager.submitApplication() on the RM node. What happens after that is the RM's business. The complete job submission path is therefore:
[WordCount.main() -> Job.waitForCompletion() -> Job.submit()
-> Job.connect() -> Cluster.Cluster() -> Cluster.initialize() -> YarnClientProtocolProvider.create() -> JobSubmitter.submitJobInternal() -> YARNRunner.submitJob() -> ResourceMgrDelegate.submitApplication() -> YarnClientImpl.submitApplication() -> ApplicationClientProtocolPBClientImpl.submitApplication() -> ApplicationClientProtocolPBServiceImpl.submitApplication() -> ClientRMService.submitApplication() -> RMAppManager.submitApplication()]

(Editor: 北几岛)
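Returning to the polling loop in YarnClientImpl.submitApplication(), here is a minimal, self-contained Java sketch of its logic with no Hadoop dependency. It keeps the loop's four behaviours from the code above: the waiting states, the failure states, logging every 10th poll, and resubmitting when the application is not found.

Everything else is an assumption of this sketch:
- AppState is a simplified stand-in for YarnApplicationState.
- AppNotFound stands in for ApplicationNotFoundException.
- The Supplier and Runnable parameters replace getApplicationReport() and rmClient.submitApplication(request).
- A non-positive timeout disables the timeout check.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.EnumSet;
import java.util.function.Supplier;

public class SubmissionPoller {
  // Simplified stand-in for YarnApplicationState (assumption of this sketch)
  enum AppState { NEW, NEW_SAVING, SUBMITTED, ACCEPTED, RUNNING, FINISHED, FAILED, KILLED }

  // Stand-in for ApplicationNotFoundException: the RM lost the app (failover/restart)
  static class AppNotFound extends RuntimeException {}

  static final EnumSet<AppState> WAITING =
      EnumSet.of(AppState.NEW, AppState.NEW_SAVING, AppState.SUBMITTED);
  static final EnumSet<AppState> FAIL_TO_SUBMIT =
      EnumSet.of(AppState.FAILED, AppState.KILLED);

  /** Polls until the app leaves the waiting states and returns the state it reached. */
  static AppState waitForSubmission(Supplier<AppState> getReport, Runnable resubmit,
      long pollIntervalMillis, long timeoutMillis) throws InterruptedException {
    int pollCount = 0;
    long start = System.currentTimeMillis();
    while (true) {
      try {
        AppState state = getReport.get();
        if (!WAITING.contains(state)) {
          if (FAIL_TO_SUBMIT.contains(state)) {
            throw new IllegalStateException("Failed to submit, state " + state);
          }
          return state; // e.g. ACCEPTED or RUNNING: submission is done
        }
        long elapsed = System.currentTimeMillis() - start;
        if (timeoutMillis > 0 && elapsed >= timeoutMillis) {
          throw new IllegalStateException("Timed out, still in " + state);
        }
        if (++pollCount % 10 == 0) { // log every 10 polls so a blocked client is visible
          System.out.println("Submission not finished, still in " + state);
        }
        Thread.sleep(pollIntervalMillis);
      } catch (AppNotFound e) {
        resubmit.run(); // submit the same request again
      }
    }
  }

  // Test helper: replays a fixed script of states/failures; the last step repeats forever
  static Supplier<AppState> scripted(Object... steps) {
    Deque<Object> q = new ArrayDeque<>(Arrays.asList(steps));
    return () -> {
      Object next = q.size() > 1 ? q.poll() : q.peek();
      if (next instanceof AppNotFound) {
        throw (AppNotFound) next;
      }
      return (AppState) next;
    };
  }

  public static void main(String[] args) throws InterruptedException {
    int[] resubmits = {0};
    AppState s = waitForSubmission(
        scripted(AppState.NEW, new AppNotFound(), AppState.SUBMITTED, AppState.ACCEPTED),
        () -> resubmits[0]++, 1, 5_000);
    System.out.println(s + " after " + resubmits[0] + " resubmit(s)");
  }
}
```

Running main prints "ACCEPTED after 1 resubmit(s)". The scripted "RM" reports NEW, then loses the application once (triggering one resubmission), then reports SUBMITTED and finally ACCEPTED, which ends the loop.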
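ApplicationClientProtocolPBClientImpl and ApplicationClientProtocolPBServiceImpl form a pair of translators:
- The client side unwraps a request record into its protobuf message and hands it to the RPC proxy.
- The server side wraps the message back into a record and delegates to the real service (ClientRMService).

The sketch below models that round trip in plain Java. Every type in it is a hypothetical stand-in: messages are Map<String, String> instead of protobuf, and the "proxy" is a lambda that copies the message instead of sending it over TCP. It only illustrates the shape of the layering, not Hadoop's RPC code.

```java
import java.util.HashMap;
import java.util.Map;

public class TranslatorSketch {
  // Hypothetical stand-in for the SubmitApplicationRequest record interface
  interface SubmitRequest {
    String appId();
    String queue();
  }

  // Hypothetical stand-in for ApplicationClientProtocol: the API both sides share
  interface AppProtocol {
    String submitApplication(SubmitRequest request);
  }

  // Hypothetical stand-in for ApplicationClientProtocolPB: works on wire messages
  interface AppProtocolPB {
    Map<String, String> submitApplication(Map<String, String> proto);
  }

  // Record backed by a wire message, in the spirit of SubmitApplicationRequestPBImpl
  static class SubmitRequestPBImpl implements SubmitRequest {
    private final Map<String, String> proto;
    SubmitRequestPBImpl(Map<String, String> proto) { this.proto = proto; }
    @Override public String appId() { return proto.get("appId"); }
    @Override public String queue() { return proto.get("queue"); }
    Map<String, String> getProto() { return proto; }
  }

  // Client-side translator (cf. ApplicationClientProtocolPBClientImpl)
  static class ClientTranslator implements AppProtocol {
    private final AppProtocolPB proxy;
    ClientTranslator(AppProtocolPB proxy) { this.proxy = proxy; }
    @Override
    public String submitApplication(SubmitRequest request) {
      // Unwrap the record into its message and hand it to the proxy
      Map<String, String> reply =
          proxy.submitApplication(((SubmitRequestPBImpl) request).getProto());
      return reply.get("status");
    }
  }

  // Server-side translator (cf. ApplicationClientProtocolPBServiceImpl)
  static class ServerTranslator implements AppProtocolPB {
    private final AppProtocol real;
    ServerTranslator(AppProtocol real) { this.real = real; }
    @Override
    public Map<String, String> submitApplication(Map<String, String> proto) {
      // Wrap the message back into a record and delegate to the real service
      String status = real.submitApplication(new SubmitRequestPBImpl(proto));
      Map<String, String> reply = new HashMap<>();
      reply.put("status", status);
      return reply;
    }
  }

  public static void main(String[] args) {
    // Stand-in for ClientRMService
    AppProtocol real = req -> "accepted " + req.appId() + " into " + req.queue();
    ServerTranslator server = new ServerTranslator(real);
    // Stand-in for RPC.getProxy(...): copies messages instead of sending them over TCP
    AppProtocolPB proxy = proto -> new HashMap<>(server.submitApplication(new HashMap<>(proto)));
    AppProtocol client = new ClientTranslator(proxy);

    Map<String, String> msg = new HashMap<>();
    msg.put("appId", "application_1_0001");
    msg.put("queue", "default");
    System.out.println(client.submitApplication(new SubmitRequestPBImpl(msg)));
  }
}
```

With the wiring in main, the call prints "accepted application_1_0001 into default". The point of the two translators is that neither the caller (YarnClientImpl in the real chain) nor the real service (ClientRMService) ever handles the wire format; only the translators do.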
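ClientRMService.submitApplication() normalizes the request before handing it to RMAppManager. It ignores a duplicate submission, then fills in a default queue, name and type, truncating an over-long type. Those steps can be sketched without Hadoop as follows.

The stand-ins are hypothetical:
- Context is a mutable stand-in for ApplicationSubmissionContext.
- The Set<String> plays the role of rmContext.getRMApps().
- The constant values ("default", "N/A", "YARN", and a 20-character limit) are assumed to match YarnConfiguration in Hadoop 2.7. Check them against your Hadoop version.

Unlike the real code, where it is RMAppManager that registers the application, this sketch records the ID itself.

```java
import java.util.HashSet;
import java.util.Set;

public class SubmissionDefaults {
  // Assumed to mirror YarnConfiguration constants in Hadoop 2.7
  static final String DEFAULT_QUEUE_NAME = "default";
  static final String DEFAULT_APPLICATION_NAME = "N/A";
  static final String DEFAULT_APPLICATION_TYPE = "YARN";
  static final int APPLICATION_TYPE_LENGTH = 20;

  // Hypothetical stand-in for ApplicationSubmissionContext
  static class Context {
    String queue;
    String applicationName;
    String applicationType;

    Context(String queue, String applicationName, String applicationType) {
      this.queue = queue;
      this.applicationName = applicationName;
      this.applicationType = applicationType;
    }
  }

  /** Returns false for an earlier-submitted id; otherwise fills in defaults and returns true. */
  static boolean submit(Set<String> submittedIds, String appId, Context ctx) {
    // Set.add returns false if the id is already present: a duplicate submission
    if (!submittedIds.add(appId)) {
      return false;
    }
    if (ctx.queue == null) {
      ctx.queue = DEFAULT_QUEUE_NAME;
    }
    if (ctx.applicationName == null) {
      ctx.applicationName = DEFAULT_APPLICATION_NAME;
    }
    if (ctx.applicationType == null) {
      ctx.applicationType = DEFAULT_APPLICATION_TYPE;
    } else if (ctx.applicationType.length() > APPLICATION_TYPE_LENGTH) {
      // Keep only the first APPLICATION_TYPE_LENGTH characters
      ctx.applicationType = ctx.applicationType.substring(0, APPLICATION_TYPE_LENGTH);
    }
    return true;
  }

  public static void main(String[] args) {
    Set<String> submitted = new HashSet<>();
    Context ctx = new Context(null, null, "A_VERY_LONG_APPLICATION_TYPE_NAME");
    System.out.println(submit(submitted, "app_1", ctx)); // true
    System.out.println(ctx.queue + " / " + ctx.applicationName + " / " + ctx.applicationType);
    System.out.println(submit(submitted, "app_1", ctx)); // false: already submitted
  }
}
```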