消息处理
Master收到的各种消息的回调函数对应关系
消息 | 回调函数 | 参数 |
---|---|---|
SubmitSchedulerRequest | Master::submitScheduler | name |
RegisterFrameworkMessage | Master::registerFramework | framework |
ReregisterFrameworkMessage | Master::reregisterFramework | framework, failover |
UnregisterFrameworkMessage | Master::unregisterFramework | framework_id |
DeactivateFrameworkMessage | Master::deactivateFramework | framework_id |
ResourceRequestMessage | Master::resourceRequest | framework_id requests |
LaunchTasksMessage | Master::launchTasks | framework_id tasks filters offer_ids |
ReviveOffersMessage | Master::reviveOffers | framework_id |
KillTaskMessage | Master::killTask | framework_id task_id |
StatusUpdateAcknowledgementMessage | Master::statusUpdateAcknowledgement | slave_id framework_id task_id uuid |
FrameworkToExecutorMessage | Master::schedulerMessage | slave_id framework_id executor_id data |
RegisterSlaveMessage | Master::registerSlave | slave checkpointed_resources version |
ReregisterSlaveMessage | Master::reregisterSlave | slave checkpointed_resources executor_infos tasks frameworks completed_frameworks version |
UnregisterSlaveMessage | Master::unregisterSlave | slave_id |
StatusUpdateMessage | Master::statusUpdate | update pid |
ExecutorToFrameworkMessage | Master::executorMessage | slave_id framework_id executor_id data |
ReconcileTasksMessage | Master::reconcileTasks | framework_id statuses |
ExitedExecutorMessage | Master::exitedExecutor | slave_id framework_id executor_id status |
UpdateSlaveMessage | Master::updateSlave | slave_id oversubscribed_resources |
AuthenticateMessage | Master::authenticate | pid |
Master::registerFramework
收到RegisterFrameworkMessage时调用
- 2085-2096:
- 2098-2101:从参数frameworkInfo拷贝信息到call,subscribe(from, call),参见Master::subscribe
Master::unregisterFramework
收到UnregisterFrameworkMessage时调用
若参数正确(framework->pid == from),调用teardown(framework)
Master::deactivateFramework
收到DeactivateFrameworkMessage时调用
- 2701-2715:判断参数有无错误,有错则输出警告信息并返回
- 2717:deactivate(framework)
Master::disconnect
Master::deactivate
Master::resourceRequest
收到ResourceRequestMessage时调用。什么情况下发出这种消息????????????
- 2839-2851:判断参数合法性
- 2853-2858:调用Master::request
Master::request
- 2868:输出信息
- 2870:更新统计信息
- 2872-2874:调用allocator->requestResources,具体见allocator
Master::launchTasks
- 2907-2925:判断参数合法性
- 2929:如果tasks不为空
- 2930-2934:设置操作为LAUNCH
- 2944:调用accept
- 2945-2953:否则调用decline
Master::accept
参数:
- framework
accept
3249-3259:更新统计信息
- 3267-3268:offer内容为空,报错
- 3271:检查offer有效性?????
- 3275:对每个offer
- 3276:获得offer
- 3278:得到slaveId
- 3279:计算资源的总和
- 3281-3287:若有错误,恢复资源。。。。。。
- 3288:removeOffer
- 3304-3333:若有错误,。。。。。。。。。
- 3338-3340:确保有slave ID,并得到Slave对象
- 3345:声明一个futures向量
- 3346:针对accept.operations中的每一个operation
- 3347:根据operation类型
- 3348-3366:LAUNCH,将授权过的task/framework加入futures,将task加入framework的pendingTasks队列
- 3375-3385:RESERVE,将授权的操作加入futures
- 3388-3397:UNRESERVE,将授权的操作加入futures
- 3401-3410:CREATE,将授权的操作加入futures
- 3414-3424:DESTROY,将授权的操作加入futures
- 3347:根据operation类型
- 3429-3426:根据futures处理,调用
_accept
.......
Master::_accept
参数:
- frameworkId
- slaveId
- offeredResources
- accept
_authorizations
3447:得到framework
- 3451-3464:若有错误,恢复资源。。。。。。
- 3466:根据ID获得Slave对象
- 3468-3507:若slave不存在或是未连接,发送一个UPDATE消息。。。并恢复资源。。。。
- 3516-3517:检查对操作的授权是否已经完成
- 3519:对每一个操作:
- 3520:根据操作类型
- 3522-3575:RESERVE。。。。。。。
- 3578-3627:UNRESERVE。。。。。。
- 3629-3681:CREATE。。。。。
- 3683-3731:DESTROY。。。。。。。
- 3733-3734:LAUNCH,对launch操作中的每个task
- 3735-3736:从队列最前面???取认证信息
- 3744:task是否在pendingTasks队列?记录在pending中
- 3747:将task从pendingTasks队列移走
- 3749-3781:检查认证信息是否依然有效,若已经无效,则发一个认证失败的状态更新信息,并跳到下一次循环,处理下一个操作
- 3790-3823:调用validation::task::validate检查任务是否有效??若已经无效,则发一个TASK_ERROR的状态更新信息,并跳到下一次循环,处理下一个操作
- 3826-3855:若pending,则在framework中增加任务(调用addTask),并将offer的资源减去该任务消耗的资源;给slave发RunTaskMessage消息?调用HookManager???????(这里是和label有关???)
- 3520:根据操作类型
- 3866-3873:完成所有动作后,最后检查,若offer提供的资源还未消耗完,则调用allocator->recoverResources恢复这些资源。
RunTaskMessage在src/messages/messages.proto中定义。
Master 调度相关消息
subscribe见include/mesos/v1/scheduler/scheduler.proto
包括以下事件类型消息(Event):
- Subscribed(当提交scheduler时master收到的第一个消息?????????)
- Offers(当有新的资源offer给scheduler时)
- InverseOffers(当scheduler将资源要回去时??每个inverse offer定义了agent及可选的指定资源,如果该agent上的所有资源都要求被释放,则不指定资源)
- Rescind:
- RescindInverseOffer
- Update:当executor/agent/master产生了状态的更新时,应该由executor使用以与它们管理的task可靠地通知状态。当任务终结时,executor要立刻发送一个更新(v1/mesos.proto)是很关键的,以使得mesos能够及时释放资源。scheduler也要负责应答这种消息。
- Message:非预设消息,mesos不解释这种消息,只是转发给scheduler且不保证可靠性,如果消息丢失executor必须重传
- Failure:agent移出mesos集群时
- Error:当scheduler发生不可恢复的错误时
- HEARTBEAT
调用类型消息(Call):
- Subscribe
- Accept
- Decline
- AcceptInverseOffers
- DeclineInverseOffers
- Kill
- Shutdown
- Acknowledge
- Reconcile
- Message
- Request
- TEARDOWN
- SUPPRESS
Master::subscribe的处理
在master.cpp中有两个subscribe,处理不同的情景,一种是处理HTTP来的请求,另一种是?????
HTTP方式
Master::subscribe参数:
- http
subscribe
2137:得到frameworkInfo
- 2140-2144:若frameworkInfo没有id,则属于注册framework,否则是再注册framework,分别将相关计数加1
- 2149-2195:判断有效性。。。。。。
- 2198-2210:调用
_subscribe
Master::_subscribe
参数:
- http
- frameworkInfo
- force
authorized
2220-2242:判断授权信息
- 2249-2268:如果是第一次subscribe该framework,则新建framework,发送应答消息,调用heartbeat,最后返回
- 2273-2289:frameworks中有该frameworks的ID,表明已经注册过,此次是重新注册,更新相关信息(allocator也要更新相关信息),记录下再注册的时间,最后调用failoverFramework
- 2289:frameworks中找不到该frameworks的ID,原因只能是有了新的master leader
- 2294:创建新的Framework对象
- 2297-2305:搜索所有slave,将该framework对应的task和executor再加入到新的framework中
- 2310:将新创建的framework加入master
- 2312-2319:发送FrameworkReregisteredMessage应答消息,调用heartbeat
- 2328-2337:给每个slave发UpdateFrameworkMessage消息
另一种方式
谁调用的????
Master::subscribe
收到Subscribe消息时被调用???
参数:
- const UPID& from
const scheduler::Call::Subscribe& subscribe
2345:从subscribe得到frameworkInfo
- 2348-2352:若frameworkInfo没有id,则属于注册framework,否则是再注册framework,分别将相关计数加1
- 2354-2371:若from属于正在进行认证,则等待认证结束后返回?????
- 2375-2422:判断认证是否有效,若失败,则给from发送失败消息并返回????
- 2441-2453:调用
_subscribe
Master::_subscribe
- 2467-2485:如果认证失败,则给from发送失败消息并返回????
- 2490-2498:
- 2505:如果是第一次subscribe该framework
- 2506-2517:检查该framework是否重复subscribe,若是,则发送消息并返回
- 2521-2524:产生新的Framework对象,和新的Framework ID
- 2526:调用addFramework增加该新的Framework
- 2528-2533:发送消息。。。。。。并返回
- 2537:确认已经赋予了新的Framework ID,以下的代码都属于处理reregister
- 2539:若包含本id?????
- 2546-2547:得到要再注册的framework
- 2550-2559:若framework->pid是from,且force标志为0,这种情况是不允许的???发送失败消息并返回
- 2564-2569:更新framework相关信息,包括时间、allocator存放的framework ID
- 2571-2578:若设置了force标志,调用failoverFramework
- 2587-2594:否则移除framework所有的offer
- 2597-2608:还有移除所有反向offer????
- 2616-2625:重新激活该framework,并发送消息(给谁发???)
- 返回
- 2627:找不到对应此ID的framework,可能的原因是有一个新选举出来的master
- 2632:新建一个Framework对象
- 2635-2643:搜索所有的slave节点,将每个节点上该ID对应的所有task和executor加入该framework(注意这里的ID指的是再注册用的ID,而framework已经是新建的)
- 2648:调用addFramework增加该新的Framework
- 2654-2657:发送消息
- 2667-2672:将新的Framework ID广播到所有的slaves