各种消息处理
每种消息对应的回调函数
消息 | 回调函数 | 参数 |
---|---|---|
SlaveRegisteredMessage | Slave::registered | slave_id connection |
SlaveReregisteredMessage | Slave::reregistered | slave_id reconciliations connection |
RunTaskMessage | Slave::runTask | framework framework_id pid task |
KillTaskMessage | Slave::killTask | |
ShutdownExecutorMessage | Slave::shutdownExecutor | framework_id executor_id |
ShutdownFrameworkMessage | Slave::shutdownFramework | framework_id |
FrameworkToExecutorMessage | Slave::schedulerMessage | slave_id framework_id executor_id data |
UpdateFrameworkMessage | Slave::updateFramework | framework_id pid |
CheckpointResourcesMessage | Slave::checkpointResources | resources |
StatusUpdateAcknowledgementMessage | Slave::statusUpdateAcknowledgement | slave_id framework_id task_id uuid |
RegisterExecutorMessage | Slave::registerExecutor | framework_id executor_id |
ReregisterExecutorMessage | Slave::reregisterExecutor | framework_id executor_id tasks updates |
StatusUpdateMessage | Slave::statusUpdate | update pid |
ExecutorToFrameworkMessage | Slave::executorMessage | slave_id framework_id executor_id data |
ShutdownMessage | Slave::shutdown | message |
PingSlaveMessage | Slave::ping | connected |
Slave::runTask
参数:
- from
- frameworkInfo
- frameworkId_
- pid
task
1479-1484:忽略不是来自master的消息
- 1486-1490:忽略没有ID的framework的消息
- 1493:得到frameworkId
- 1498-1503:如果task记录的slave ID不符合,报错返回?????
- 1505-1507:检查目前slave的状态必须是以下状态之一:RECOVERING/DISCONNECTED/RUNNING/TERMINATING
- 1510-1517:如果是RECOVERING/TERMINATING状态,忽略该消息
- 1523:得到framework
- 1524-1563:如果framework为空(表示是首次创建??)。。。。。。
- 1526-1537:得到framework的工作目录和元数据目录,目录名称格式见agent概述,并调用unschedule,见GC。。。。。。
- 1539-1543:新建framework UPID,设置这个UPID为空,因为我们将通过HTTP通信
- 1545-1548:根据frameworkInfo新建Framework对象,登记该对象在本Slave对象的frameworks变量中,若需要,则调用检查点
- 1555-1563:查找completedFrameworks队列,若其中有与本framework ID相等的,将该项的所有completedExecutors移到本framework中,并从completedFrameworks队列移除它。。。。。
- 1566-1567:得到任务对应的executor信息
- 1569-1573:如果有相关钩子,调用HookManager::slaveRunTaskLabelDecorator????
- 1579:将task加入framework的pending中
- 1583:得到framework的executor对象
- 1584-1599:如果executor为空(第一次调用时为空),获得executor的工作目录,调用unschedule(最终调用gc->unschedule,见1466行)
- 1602-1603:unschedule完成后运行_runTask
Slave::_runTask
参数:
- future
- frameworkInfo
task
1612:得到frameworkId
- 1617-1623:得到framework对象,若对象不存在则报错返回
- 1625-1626:得到任务对应的executor信息
- 1628-1629:如果framework->pending含有executor及task
- 将task从framework->pending中移除
- 或framework->pending中executorId对应的task列表已空,则将executorId也移除(一个executor可以有多个task)
- 1644-1649:否则,报错返回
- 1653-1662:若framework的状态是TERMINATING,则报错,并且若framework中已经没有executor,也没有任何pending的内容,则移除framework
- 1667-1695:若future还不可用,发送一个错误。。。。。
- 1704-1769:检查checkpoint情况。。。。。。。。
- 1773-1774:检查目前slave的状态必须是以下状态之一:DISCONNECTED/RUNNING/TERMINATING
- 1776-1790:若状态是TERMINATING,报错并返回
- 1792:检查framework状态必须是RUNNING
- 1796-1800:得到executor
- 1804:根据executor的状态:
- 1805-1831:TERMINATING/TERMINATED,发送TASK_LOST的状态更新消息
- 1833-1844:REGISTERING,先执行检查点,再将该task在executor排队
- 1845-1878:RUNNING
- 1847-1849:先执行检查点
- 1856:将该task在executor排队
- 1862-1868:更新资源总和???
Slave::shutdownFramework
参数:
- from
- frameworkId
流程:
- 2210-2216:from必须等于master
- 2225-2229:当前状态必须是RECOVERING/DISCONNECTED/RUNNING/TERMINATING中的一个
- 2225-2229:若状态是RECOVERING/DISCONNECTED,则忽略该消息
- 2231-2235:确保能够通过frameworkId获得framework对象
- 2238-2241:若framework状态是TERMINATING,则忽略该消息
- 2245:将framework状态设置为TERMINATING
- 2250-2251:对framework中的每一个executor进行以下处理:
- 2252-2256:检查executor状态必须是REGISTERING/RUNNING/TERMINATING/TERMINATED中的一个
- 2258-2260:若状态是REGISTERING/RUNNING则调用_shutdownExecutor(framework, executor)
- 2261-2265:若状态是TERMINATING,表明已经在处理终止,只是在等待应答,调用removeExecutor(framework, executor)从framework中移除该executor
- 2272-2274:若framework没有executor,也没有pending的任务了,移除本framework
Slave::removeExecutor
- 4190-4207:各种检查
- 4211-4218:若有检查点,则。。。。。。。。。。。。
- 4225-4230:获得Executor工作路径
- 4232:更新该工作路径的时间
- 4233-4234:调用垃圾收集
- 4238-4267:。。。。。。。
- 4269-4271:调用钩子函数HookManager::slaveRemoveExecutorHook
- 4273:最后调用framework->destroyExecutor
Slave::_shutdownExecutor
参数:
- framework
- executor
流程:
- 4404-4415:各种检查
- 4417:将executor状态改为TERMINATING
- 4421:发送一个ShutdownExecutorMessage消息(见src/messages/messages.proto,消息内容是executor和framework的ID),当slave收到该消息后会调用shutdownExecutor
- 4425-4437:延迟一定的时间调用Slave::shutdownExecutorTimeout
Slave::shutdownExecutor
参数:
- from
- frameworkId
- executorId
流程:
- 4330-4396:各种检查
- 4398:调用_shutdownExecutor
有很多地方会发送ShutdownExecutorMessage消息,若是Slave::_shutdownExecutor调用发送的这个消息,由于Slave::_shutdownExecutor之前已经将状态改为TERMINATING,该消息会被忽略,见4384-4389
Slave::shutdownExecutorTimeout
参数:
- frameworkId
- executorId
- containerId
流程:
- 4446-4473:各种检查
- 4475-4487:只有当executor状态为TERMINATING,调用containerizer->destroy(executor->containerId)。containerizer是slave对象的私有变量,类型为Containerizer*。