消息处理

Master收到的各种消息的回调函数对应关系

消息 回调函数 参数
SubmitSchedulerRequest Master::submitScheduler name
RegisterFrameworkMessage Master::registerFramework framework
ReregisterFrameworkMessage Master::reregisterFramework framework,
failover
UnregisterFrameworkMessage Master::unregisterFramework framework_id
DeactivateFrameworkMessage Master::deactivateFramework framework_id
ResourceRequestMessage Master::resourceRequest framework_id
requests
LaunchTasksMessage Master::launchTasks framework_id
tasks
filters
offer_ids
ReviveOffersMessage Master::reviveOffers framework_id
KillTaskMessage Master::killTask framework_id
task_id
StatusUpdateAcknowledgementMessage Master::statusUpdateAcknowledgement slave_id
framework_id
task_id
uuid
FrameworkToExecutorMessage Master::schedulerMessage slave_id
framework_id
executor_id
data
RegisterSlaveMessage Master::registerSlave slave
checkpointed_resources
version
ReregisterSlaveMessage Master::reregisterSlave slave
checkpointed_resources
executor_infos
tasks
frameworks
completed_frameworks
version
UnregisterSlaveMessage Master::unregisterSlave slave_id
StatusUpdateMessage Master::statusUpdate update
pid
ExecutorToFrameworkMessage Master::executorMessage slave_id
framework_id
executor_id
data
ReconcileTasksMessage Master::reconcileTasks framework_id
statuses
ExitedExecutorMessage Master::exitedExecutor slave_id
framework_id
executor_id
status
UpdateSlaveMessage Master::updateSlave slave_id
oversubscribed_resources
AuthenticateMessage Master::authenticate pid

Master::registerFramework

收到RegisterFrameworkMessage时调用

  • 2085-2096:
  • 2098-2101:从参数frameworkInfo拷贝信息到call,subscribe(from, call),参见Master::subscribe

Master::unregisterFramework

收到UnregisterFrameworkMessage时调用

若参数正确(framework->pid == from),调用teardown(framework)

Master::deactivateFramework

收到DeactivateFrameworkMessage时调用

  • 2701-2715:判断参数有无错误,有错则输出警告信息并返回
  • 2717:deactivate(framework)

Master::disconnect

Master::deactivate

Master::resourceRequest

收到ResourceRequestMessage时调用。什么情况下发出这种消息????????????

  • 2839-2851:判断参数合法性
  • 2853-2858:调用Master::request

Master::request

  • 2868:输出信息
  • 2870:更新统计信息
  • 2872-2874:调用allocator->requestResources,具体见allocator

Master::launchTasks

  • 2907-2925:判断参数合法性
  • 2929:如果tasks不为空
    • 2930-2934:设置操作为LAUNCH
    • 2944:调用accept
  • 2945-2953:否则调用decline

Master::accept

参数:

  • framework
  • accept

  • 3249-3259:更新统计信息

  • 3267-3268:offer内容为空,报错
  • 3271:检查offer有效性?????
  • 3275:对每个offer
    • 3276:获得offer
    • 3278:得到slaveId
    • 3279:计算资源的总和
    • 3281-3287:若有错误,恢复资源。。。。。。
    • 3288:removeOffer
  • 3304-3333:若有错误,。。。。。。。。。
  • 3338-3340:确保有slave ID,并得到Slave对象
  • 3345:声明一个futures向量
  • 3346:针对accept.operations中的每一个operation
    • 3347:根据operation类型
      • 3348-3366:LAUNCH,将授权过的task/framework加入futures,将task加入framework的pendingTasks队列
      • 3375-3385:RESERVE,将授权的操作加入futures
      • 3388-3397:UNRESERVE,将授权的操作加入futures
      • 3401-3410:CREATE,将授权的操作加入futures
      • 3414-3424:DESTROY,将授权的操作加入futures
  • 3429-3426:根据futures处理,调用_accept.......

Master::_accept

参数:

  • frameworkId
  • slaveId
  • offeredResources
  • accept
  • _authorizations

  • 3447:得到framework

  • 3451-3464:若有错误,恢复资源。。。。。。
  • 3466:根据ID获得Slave对象
  • 3468-3507:若slave不存在或是未连接,发送一个UPDATE消息。。。并恢复资源。。。。
  • 3516-3517:检查对操作的授权是否已经完成
  • 3519:对每一个操作:
    • 3520:根据操作类型
      • 3522-3575:RESERVE。。。。。。。
      • 3578-3627:UNRESERVE。。。。。。
      • 3629-3681:CREATE。。。。。
      • 3683-3731:DESTROY。。。。。。。
      • 3733-3734:LAUNCH,对launch操作中的每个task
        • 3735-3736:从队列最前面???取认证信息
        • 3744:task是否在pendingTasks队列?记录在pending中
        • 3747:将task从pendingTasks队列移走
        • 3749-3781:检查认证信息是否依然有效,若已经无效,则发一个认证失败的状态更新信息,并跳到下一次循环,处理下一个操作
        • 3790-3823:调用validation::task::validate检查任务是否有效??若已经无效,则发一个TASK_ERROR的状态更新信息,并跳到下一次循环,处理下一个操作
        • 3826-3855:若pending,则在framework中增加任务(调用addTask),并将offer的资源减去该任务消耗的资源;给slave发RunTaskMessage消息?调用HookManager???????(这里是和label有关???)
  • 3866-3873:完成所有动作后,最后检查,若offer提供的资源还未消耗完,则调用allocator->recoverResources恢复这些资源。

RunTaskMessage在src/messages/messages.proto中定义。

Master 调度相关消息

subscribe见include/mesos/v1/scheduler/scheduler.proto

包括以下事件类型消息(Event):

  • Subscribed(当提交scheduler时master收到的第一个消息?????????)
  • Offers(当有新的资源offer给scheduler时)
  • InverseOffers(当scheduler将资源要回去时??每个inverse offer定义了agent及可选的指定资源,如果该agent上的所有资源都要求被释放,则不指定资源)
  • Rescind:
  • RescindInverseOffer
  • Update:当executor/agent/master产生了状态的更新时,应该由executor使用以与它们管理的task可靠地通知状态。当任务终结时,executor要立刻发送一个更新(v1/mesos.proto)是很关键的,以使得mesos能够及时释放资源。scheduler也要负责应答这种消息。
  • Message:非预设消息,mesos不解释这种消息,只是转发给scheduler且不保证可靠性,如果消息丢失executor必须重传
  • Failure:agent移出mesos集群时
  • Error:当scheduler发生不可恢复的错误时
  • HEARTBEAT

调用类型消息(Call):

  • Subscribe
  • Accept
  • Decline
  • AcceptInverseOffers
  • DeclineInverseOffers
  • Kill
  • Shutdown
  • Acknowledge
  • Reconcile
  • Message
  • Request
  • TEARDOWN
  • SUPPRESS

Master::subscribe的处理

在master.cpp中有两个subscribe,处理不同的情景,一种是处理HTTP来的请求,另一种是?????

HTTP方式

Master::subscribe参数:

  • http
  • subscribe

  • 2137:得到frameworkInfo

  • 2140-2144:若frameworkInfo没有id,则属于注册framework,否则是再注册framework,分别将相关计数加1
  • 2149-2195:判断有效性。。。。。。
  • 2198-2210:调用_subscribe

Master::_subscribe

参数:

  • http
  • frameworkInfo
  • force
  • authorized

  • 2220-2242:判断授权信息

  • 2249-2268:如果是第一次subscribe该framework,则新建framework,发送应答消息,调用heartbeat,最后返回
  • 2273-2289:frameworks中有该frameworks的ID,表明已经注册过,此次是重新注册,更新相关信息(allocator也要更新相关信息),记录下再注册的时间,最后调用failoverFramework
  • 2289:frameworks中找不到该frameworks的ID,原因只能是有了新的master leader
    • 2294:创建新的Framework对象
    • 2297-2305:搜索所有slave,将该framework对应的task和executor再加入到新的framework中
    • 2310:将新创建的framework加入master
    • 2312-2319:发送FrameworkReregisteredMessage应答消息,调用heartbeat
  • 2328-2337:给每个slave发UpdateFrameworkMessage消息

另一种方式

谁调用的????

Master::subscribe

收到Subscribe消息时被调用???

参数:

  • const UPID& from
  • const scheduler::Call::Subscribe& subscribe

  • 2345:从subscribe得到frameworkInfo

  • 2348-2352:若frameworkInfo没有id,则属于注册framework,否则是再注册framework,分别将相关计数加1
  • 2354-2371:若from属于正在进行认证,则等待认证结束后返回?????
  • 2375-2422:判断认证是否有效,若失败,则给from发送失败消息并返回????
  • 2441-2453:调用_subscribe

Master::_subscribe

  • 2467-2485:如果认证失败,则给from发送失败消息并返回????
  • 2490-2498:
  • 2505:如果是第一次subscribe该framework
    • 2506-2517:检查该framework是否重复subscribe,若是,则发送消息并返回
    • 2521-2524:产生新的Framework对象,和新的Framework ID
    • 2526:调用addFramework增加该新的Framework
    • 2528-2533:发送消息。。。。。。并返回
  • 2537:确认已经赋予了新的Framework ID,以下的代码都属于处理reregister
  • 2539:若包含本id?????
    • 2546-2547:得到要再注册的framework
    • 2550-2559:若framework->pid是from,且force标志为0,这种情况是不允许的???发送失败消息并返回
    • 2564-2569:更新framework相关信息,包括时间、allocator存放的framework ID
    • 2571-2578:若设置了force标志,调用failoverFramework
    • 2587-2594:否则移除framework所有的offer
    • 2597-2608:还有移除所有反向offer????
    • 2616-2625:重新激活该framework,并发送消息(给谁发???)
    • 返回
  • 2627:找不到对应此ID的framework,可能的原因是有一个新选举出来的master
    • 2632:新建一个Framework对象
    • 2635-2643:搜索所有的slave节点,将每个节点上该ID对应的所有task和executor加入该framework(注意这里的ID指的是再注册用的ID,而framework已经是新建的)
    • 2648:调用addFramework增加该新的Framework
    • 2654-2657:发送消息
  • 2667-2672:将新的Framework ID广播到所有的slaves

http://masterip:masterport的HTTP请求的处理函数

results matching ""

    No results matching ""