libprocess

libprocess是mesos中非常重要的一个基础库,提供一些很方便的helper函数以及并发编程所需要的基本原语。libprocess基本是参考erlang的模式实现的,其中的actor都叫做process,每个process有一个独立的ID。

在Mesos的master节点中,每个Framework以及Slave都是一个远程的process。而slave节点上,每个executor是一个process,只不过内置的executor是在同一个进程中的,而其他自定义的executor是独立的进程,executor和slave之间通过进程间通信方式(网络端口)交互。

Mesos通过actor模型,简化了分布式系统的调用以及并发编程的复杂度,process之间都通过消息异步通信,只需要知道对方的ID即可,无需了解对方和自己是否在同一个节点上。libprocess封装的manager知道接收方是本地的process还是远程的process,如果是远程的则通过请求接口转发消息。libprocess也封装了网络层,传输层使用的是http协议,使用方对不同的消息注册不同的handler即可,也支持http的长轮询模式订阅事件。mesos为了提高消息传递解析效率,消息传递支持json和protobuf两种格式。

Libprocess+protocol buffer组合是mesos最底层最重要的消息传递基础库(没有采用RPC机制),由于该库采用了基于Protocal Buffer消息传递的通信机制),因而非常高效。Mesos常用的两个头文件是libprocess/include/process下的 process.hpp和protobuf.hpp,这两个提供了用于消息传递的API,其中process.hpp是最核心的文件,提供了原始的接口, 而protobuf.hpp是在process.hpp基础上,加入了ProtocalBuffer对象参数,使ProtocalBuffer使用起来更加容易。

在mesos中,主要有四个角色,分别是:mesos-master,mesos-slave,framework(Hadoop/Spark/MPI等)scheduler,executor(在mesos-slave上执行framework task的组件),每种角色均是一个Process,在实现时会继承libprocess中的ProtobufProcess类(它又继承了Process类),这样,它们均会编成一个后台运行且不断监听protocal buffer消息的socket server,如下图所示,方框中表示是mesos master:

LibProcess采用了基于事件驱动的编程模型,每一个服务(进程)内部实际上运行了一个socket server,而不同服务之间通过消息(事件)进行通信。在一个服务内部,注册了很多消息以及每个消息对应的处理器,一旦它收到某种类型的消息,则会调用相应的处理器进行处理,在处理过程中,可能会产生另外一种消息发送给另一个服务。

主要概念

process及PID

process可以这样用:

using namespace    process;

class QueueProcess: public Process {}

int main(int argc, char **argv) {
  QueueProcess process;
  spawn(process);
  terminate(process);
  wait(process);
  return 0;
}

PID实际是一个HTTP URL:

http://ip:port/name    

http://ip:port/name/endpoint    

https://ip:port/name/endpoint

它用来命名一个进程而不需要直接引用该进程。

class QueueProcess : public Process {
public:          
  QueueProcess() : ProcessBase(“queue”) {}    //set name
};    
int main(int argc, char **argv) {
  QueueProcess process;
  PID    pid;

  pid = spawn(process); //or pid = process.self();
  teminate(pid)
  wait(pid)
  return 0;
}

本地消息

dispatch是异步函数调用,在include/process/dispatch.hpp中实现。

  • 每个进程有一个队列存放收到的消息
  • 进程有执行上下文,一个进程中同时只能有一个线程执行
    • 无同步
    • 无明确的接收消息循环
    • 绝对禁止在进程中阻塞
    • 创建/启动进程开销很低(不会分配栈、不创建线程)
  • 无远程的dispatch,这个只为本地调用

promise和future

  • future对象是从生产者对象或函数中得到值,如果是在另外的线程执行该动作,可以做同步
  • promise是存储一个类型T的值以可以被一个future对象获取(可以在另一个线程中获取该值),提供了一个同步点

  • 调用Future::get(),如果值还没有就绪,get()函数会一直阻塞,直到能拿到一个Socket值,然后返回

  • 通过Future::isReady()等API定期判断,一旦isReady()返回true,即可调用Future::get(),get()函数立即返回
  • 注册回调函数,告诉生产者,当值就绪之后,触发我注册的回调。Future提供多种回调方式

借助Future/Promise,生产者和消费者之间可以做到完全异步。

3rdparty/libprocess/include/process/future.hpp 84-474行有Future的定义:

基本定义是一个模板:

template <typename T>
class Future

future变量有4种状态(428行),分别是:

  • PENDING表示值还没有ready,这个时候生产者还没调用promise.set(),而消费者调用future.get()的话会处于阻塞状态
  • READY表示值已经就绪,future.get()会立即返回结果
  • FAILED表示值未就绪,已被破坏。通常生产者在处理过程出现异常时会设置这个状态
  • DISCARDED表示值被丢失。生产者缺少某些条件无法计算这个值

future.hpp第436-459行定义struct Data{}中的Result result用来保存数据,生产者通过promise.set()将值保存到这里,然后触发相应的回调函数,消费者通过future.get()从这里读取

callbacks

future支持非常多类型的callbacks,针对每种不同的状态都可以设置相应的callback,如struct Data{}里可以看到如下字段:

std::vector<DiscardCallback> onDiscardCallbacks;
std::vector<ReadyCallback> onReadyCallbacks;
std::vector<FailedCallback> onFailedCallbacks;
std::vector<DiscardedCallback> onDiscardedCallbacks;
std::vector<AnyCallback> onAnyCallbacks;

其中:

  • 当生产者调用promise<T>.discard()时触发onDiscardCallbacks
  • 当生产者调用promise<T>.set()时触发onReadyCallbacks
  • 当生产者调用promise<T>.fail()时触发onFailedCallbacks
  • 不管生产者触发何种动作,onAnyCallbacks都会触发

libprocess里的future实现支持then()方法,允许你通过future的callbacks来实现一些高级功能。

例如,如下代码:

Future<int> second(const bool& b)
{
  return b ? 1 : 0;
}

Future<string> third(const int& s)
{
  return s > 0 ? "good" : "bad";
}

TEST(Process, chain)
{
  Future<string> S = readyFuture()
    .then(lambda::bind(&second, lambda::_1))
    .then(lambda::bind(&third, lambda::_1));

  string s = S.get();
}

表示readyFuture()得到一个future变量A,当A就绪时,执行second(A)并得到一个future变量B,当B就绪时,执行third(B)得到一个future值S。上面的代码等同于:

TEST(Process, chain)
{
  Future<bool> A = oneFuture();
  Future<int> B = second(A.get());
  Future<string> S = third(B.get());

  string s = S.get();
}

注意,get()是同步的,会一直等到相应的future变量处于ready状态才返回。

例如:

template 
class QueueProcess {
public:
  void enqueue(T t) {promise.set(t);}
  Future dequeue() {return promise.future();}
private:
  Promise promise;
}

template 
class Queue {
public:
  Queue() {spawn(process);}
  ~Queue() {terminate(process); wait(process);}
  void enqueue(T t) {dispatch(process, &QueueProcess::enqueue, t);}
  Future dequeue() {return dispatch(process, &QueueProcess::dequeue, t);}
private:
  QueueProcess process;
}

int main (int argc, char **argv) {
  Queue queue;
  queue.enqueue(42);
  queue.dequeue()
    .then([] (int i) {
    //use i
    });
  return 0;  
}

远程消息

  • route设置HTTP URL的回调函数
  • install定义指定消息的回调函数
  • 消息采用protobuf格式

libprocess的函数

install

安装一个处理ProtocalBuffer消息的handler,其中,消息类型是M,该消息对应的处理函数是method

例如,src/master/master.cpp 732-734:

  install<RegisterFrameworkMessage>(
      &Master::registerFramework,
      &RegisterFrameworkMessage::framework);

安装一个处理RegisterFrameworkMessage(ProtocalBuffer对象)的handler,mesos master一旦接收到该消息,便会调用Master::registerFramework函数处理, 且该函数的输入参数是RegisterFrameworkMessage的framework属性。

send

void send(const process::UPID& to, const google::protobuf::Message& message)

向某个UPID上发送消息,其中UPID代表一个socket,里面含有ip和port信息,而消息message是ProtocalBuffer定义的对象。

dispatch

void dispatch(const UPID& pid, 
  const std::tr1::shared_ptr >& f)

执行进程pid中的函数f,为了提高效率,该函数并不会等到函数f执行完成,而是采用了异步的方法:将函数f放入一个函数队列,由另外一个进程(或者多个)不断从队列中获取函数,依次执行。

delay

Timer delay(double secs,const PID& pid,void (T::*method)())

延迟secs秒调度进程pid中的方法method,并返回一个计数器,通过这个计时器,可取消该调度。

subprocess

参数:

  • path
  • argv
  • in
  • out
  • err
  • set_sid
  • flags
  • environment
  • _clone
  • parent_hooks
  • working_directory
  • watchdog

功能:exec一个进程。

Owned

Owned是为了访问数据时的原子性,定义在3rdparty/libprocess/include/process/owned.hpp

Owned实现主要是用到了std::shared_ptr。

智能指针

C++11实现了智能指针,是用来模拟传统指针、提供一些附加功能的特殊类型。常见功能是能够通过自动垃圾回收来管理存储。

C++能够使用new和delete来动态分配、释放对象,如果分配之后,无人释放,也无人引用这块空间,就产生了内存泄漏。例如,这样的程序:

void MemoryAlloc()
{
  int* a = new int(0);
}

但有的程序很难确定在哪里释放分配的空间。例如,这样的程序:

class DataGenerator
{
public:
  int* GetData(){new int(1)};
};

空间是在对象内部函数GetData分配的,对象外部可能使用,此时很难确定应该是在对象内部还是外部释放。

C++为了解决这类问题,引入了智能指针概念,有三种不同的智能指针:

  • unique_ptr:确保空间只会被一个unique_ptr对象管理,当该对象消息,就会自动释放资源
  • shared_ptr:可以有多个shared_ptr共享空间,内部会记录被引用的次数,只要还有shared_ptr对象存在,资源不会被释放;当所有对象都消失,资源会自动释放
  • weak_ptr:搭配shared_ptr使用,和shared_ptr不同点在于weak_ptr不会影响资源被使用的次数,也就是说weak_ptr存在与否不代表资源会不会被释放。

shared_ptr由于资源是可以共用的,也就可以透過 operator= 等方法,來分享 shared_ptr 所使用的資源。例如:

{
  shared_ptr<int> a;  // a is empty
  {
    shared_ptr<int> b( new int( 10 ) );  // allocate resource
    a = b;  // reference counter: 2
    {
      shared_ptr<int> c = a;  // reference counter: 3
      *c = 100;
    }  // c dead, reference counter: 2
  } // b dead, reference counter: 1
  cout << *a << endl;
}  // release resource

如果要强制释放智能指针资源,可以调用其reset函数。

vector<int>* pVec = new vector<int>();
pVec->push_back( 1 );
cout << pVec->size() << endl;
delete pVec;

可以改成使用unique_ptr:

unique_ptr< vector<int> > pVec( new vector<int>() );
pVec->push_back( 1 );
cout << pVec->size() << endl;

前面的例子可以改成shared_ptr实现

shared_ptr<int> MemoryAlloc()
{
  shared_ptr<int> a( new int(0) );
  return a;
}

results matching ""

    No results matching ""