各种消息处理

每种消息对应的回调函数

消息 回调函数 参数
SlaveRegisteredMessage Slave::registered slave_id
connection
SlaveReregisteredMessage Slave::reregistered slave_id
reconciliations
connection
RunTaskMessage Slave::runTask framework
framework_id
pid
task
KillTaskMessage Slave::killTask
ShutdownExecutorMessage Slave::shutdownExecutor framework_id
executor_id
ShutdownFrameworkMessage Slave::shutdownFramework framework_id
FrameworkToExecutorMessage Slave::schedulerMessage slave_id
framework_id
executor_id
data
UpdateFrameworkMessage Slave::updateFramework framework_id
pid
CheckpointResourcesMessage Slave::checkpointResources resources
StatusUpdateAcknowledgementMessage Slave::statusUpdateAcknowledgement slave_id
framework_id
task_id
uuid
RegisterExecutorMessage Slave::registerExecutor framework_id
executor_id
ReregisterExecutorMessage Slave::reregisterExecutor framework_id
executor_id
tasks
updates
StatusUpdateMessage Slave::statusUpdate update
pid
ExecutorToFrameworkMessage Slave::executorMessage slave_id
framework_id
executor_id
data
ShutdownMessage Slave::shutdown message
PingSlaveMessage Slave::ping connected

Slave::runTask

参数:

  • from
  • frameworkInfo
  • frameworkId_
  • pid
  • task

  • 1479-1484:忽略不是来自master的消息

  • 1486-1490:忽略没有ID的framework的消息
  • 1493:得到frameworkId
  • 1498-1503:如果task记录的slave ID不符合,报错返回?????
  • 1505-1507:检查目前slave的状态必须是以下状态之一:RECOVERING/DISCONNECTED/RUNNING/TERMINATING
  • 1510-1517:如果是RECOVERING/TERMINATING状态,忽略该消息
  • 1523:得到framework
  • 1524-1563:如果framework为空(表示是首次创建??)。。。。。。
    • 1526-1537:得到framework的工作目录和元数据目录,目录名称格式见agent概述,并调用unschedule,见GC。。。。。。
    • 1539-1543:新建framework UPID,设置这个UPID为空,因为我们将通过HTTP通信
    • 1545-1548:根据frameworkInfo新建Framework对象,登记该对象在本Slave对象的frameworks变量中,若需要,则调用检查点
    • 1555-1563:查找completedFrameworks队列,若其中有与本framework ID相等的,将该项的所有completedExecutors移到本framework中,并从completedFrameworks队列移除它。。。。。
  • 1566-1567:得到任务对应的executor信息
  • 1569-1573:如果有相关钩子,调用HookManager::slaveRunTaskLabelDecorator????
  • 1579:将task加入framework的pending中
  • 1583:得到framework的executor对象
  • 1584-1599:如果executor为空(第一次调用时为空),获得executor的工作目录,调用unschedule(最终调用gc->unschedule,见1466行)
  • 1602-1603:unschedule完成后运行_runTask

Slave::_runTask

参数:

  • future
  • frameworkInfo
  • task

  • 1612:得到frameworkId

  • 1617-1623:得到framework对象,若对象不存在则报错返回
  • 1625-1626:得到任务对应的executor信息
  • 1628-1629:如果framework->pending含有executor及task
    • 将task从framework->pending中移除
    • 或framework->pending中executorId对应的task列表已空,则将executorId也移除(一个executor可以有多个task)
  • 1644-1649:否则,报错返回
  • 1653-1662:若framework的状态是TERMINATING,则报错,并且若framework中已经没有executor,也没有任何pending的内容,则移除framework
  • 1667-1695:若future还不可用,发送一个错误。。。。。
  • 1704-1769:检查checkpoint情况。。。。。。。。
  • 1773-1774:检查目前slave的状态必须是以下状态之一:DISCONNECTED/RUNNING/TERMINATING
  • 1776-1790:若状态是TERMINATING,报错并返回
  • 1792:检查framework状态必须是RUNNING
  • 1796-1800:得到executor
  • 1804:根据executor的状态:
    • 1805-1831:TERMINATING/TERMINATED,发送TASK_LOST的状态更新消息
    • 1833-1844:REGISTERING,先执行检查点,再将该task在executor排队
    • 1845-1878:RUNNING
      • 1847-1849:先执行检查点
      • 1856:将该task在executor排队
      • 1862-1868:更新资源总和???

Slave::shutdownFramework

参数:

  • from
  • frameworkId

流程:

  • 2210-2216:from必须等于master
  • 2225-2229:当前状态必须是RECOVERING/DISCONNECTED/RUNNING/TERMINATING中的一个
  • 2225-2229:若状态是RECOVERING/DISCONNECTED,则忽略该消息
  • 2231-2235:确保能够通过frameworkId获得framework对象
  • 2238-2241:若framework状态是TERMINATING,则忽略该消息
  • 2245:将framework状态设置为TERMINATING
  • 2250-2251:对framework中的每一个executor进行以下处理:
    • 2252-2256:检查executor状态必须是REGISTERING/RUNNING/TERMINATING/TERMINATED中的一个
    • 2258-2260:若状态是REGISTERING/RUNNING则调用_shutdownExecutor(framework, executor)
    • 2261-2265:若状态是TERMINATING,表明已经在处理终止,只是在等待应答,调用removeExecutor(framework, executor)从framework中移除该executor
    • 2272-2274:若framework没有executor,也没有pending的任务了,移除本framework

Slave::removeExecutor

  • 4190-4207:各种检查
  • 4211-4218:若有检查点,则。。。。。。。。。。。。
  • 4225-4230:获得Executor工作路径
  • 4232:更新该工作路径的时间
  • 4233-4234:调用垃圾收集
  • 4238-4267:。。。。。。。
  • 4269-4271:调用钩子函数HookManager::slaveRemoveExecutorHook
  • 4273:最后调用framework->destroyExecutor

Slave::_shutdownExecutor

参数:

  • framework
  • executor

流程:

  • 4404-4415:各种检查
  • 4417:将executor状态改为TERMINATING
  • 4421:发送一个ShutdownExecutorMessage消息(见src/messages/messages.proto,消息内容是executor和framework的ID),当slave收到该消息后会调用shutdownExecutor
  • 4425-4437:延迟一定的时间调用Slave::shutdownExecutorTimeout

Slave::shutdownExecutor

参数:

  • from
  • frameworkId
  • executorId

流程:

  • 4330-4396:各种检查
  • 4398:调用_shutdownExecutor

有很多地方会发送ShutdownExecutorMessage消息,若是Slave::_shutdownExecutor调用发送的这个消息,由于Slave::_shutdownExecutor之前已经将状态改为TERMINATING,该消息会被忽略,见4384-4389

Slave::shutdownExecutorTimeout

参数:

  • frameworkId
  • executorId
  • containerId

流程:

  • 4446-4473:各种检查
  • 4475-4487:只有当executor状态为TERMINATING,调用containerizer->destroy(executor->containerId)。containerizer是slave对象的私有变量,类型为Containerizer*。

results matching ""

    No results matching ""