libprocess
libprocess是mesos中非常重要的一个基础库,提供一些很方便的helper函数以及并发编程所需要的基本原语。libprocess基本是参考erlang的模式实现的,其中的actor都叫做process,每个process有一个独立的ID。
在Mesos的master节点中,每个Framework以及Slave都是一个远程的process。而slave节点上,每个executor是一个process,只不过内置的executor是在同一个进程中的,而其他自定义的executor是独立的进程,executor和slave之间通过进程间通信方式(网络端口)交互。
Mesos通过actor模型,简化了分布式系统的调用以及并发编程的复杂度,process之间都通过消息异步通信,只需要知道对方的ID即可,无需了解对方和自己是否在同一个节点上。libprocess封装的manager知道接收方是本地的process还是远程的process,如果是远程的则通过请求接口转发消息。libprocess也封装了网络层,传输层使用的是http协议,使用方对不同的消息注册不同的handler即可,也支持http的长轮询模式订阅事件。mesos为了提高消息传递解析效率,消息传递支持json和protobuf两种格式。
Libprocess+protocol buffer组合是mesos最底层最重要的消息传递基础库(没有采用RPC机制),由于该库采用了基于Protocal Buffer消息传递的通信机制),因而非常高效。Mesos常用的两个头文件是libprocess/include/process下的 process.hpp和protobuf.hpp,这两个提供了用于消息传递的API,其中process.hpp是最核心的文件,提供了原始的接口, 而protobuf.hpp是在process.hpp基础上,加入了ProtocalBuffer对象参数,使ProtocalBuffer使用起来更加容易。
在mesos中,主要有四个角色,分别是:mesos-master,mesos-slave,framework(Hadoop/Spark/MPI等)scheduler,executor(在mesos-slave上执行framework task的组件),每种角色均是一个Process,在实现时会继承libprocess中的ProtobufProcess类(它又继承了Process类),这样,它们均会编成一个后台运行且不断监听protocal buffer消息的socket server,如下图所示,方框中表示是mesos master:
LibProcess采用了基于事件驱动的编程模型,每一个服务(进程)内部实际上运行了一个socket server,而不同服务之间通过消息(事件)进行通信。在一个服务内部,注册了很多消息以及每个消息对应的处理器,一旦它收到某种类型的消息,则会调用相应的处理器进行处理,在处理过程中,可能会产生另外一种消息发送给另一个服务。
主要概念
process及PID
process可以这样用:
using namespace process;
class QueueProcess: public Process {}
int main(int argc, char **argv) {
QueueProcess process;
spawn(process);
terminate(process);
wait(process);
return 0;
}
PID实际是一个HTTP URL:
http://ip:port/name
http://ip:port/name/endpoint
https://ip:port/name/endpoint
它用来命名一个进程而不需要直接引用该进程。
class QueueProcess : public Process {
public:
QueueProcess() : ProcessBase(“queue”) {} //set name
};
int main(int argc, char **argv) {
QueueProcess process;
PID pid;
pid = spawn(process); //or pid = process.self();
teminate(pid)
wait(pid)
return 0;
}
本地消息
dispatch是异步函数调用,在include/process/dispatch.hpp中实现。
- 每个进程有一个队列存放收到的消息
- 进程有执行上下文,一个进程中同时只能有一个线程执行
- 无同步
- 无明确的接收消息循环
- 绝对禁止在进程中阻塞
- 创建/启动进程开销很低(不会分配栈、不创建线程)
- 无远程的dispatch,这个只为本地调用
promise和future
- future对象是从生产者对象或函数中得到值,如果是在另外的线程执行该动作,可以做同步
promise是存储一个类型T的值以可以被一个future对象获取(可以在另一个线程中获取该值),提供了一个同步点
调用Future
::get(),如果值还没有就绪,get()函数会一直阻塞,直到能拿到一个Socket值,然后返回 - 通过Future
::isReady()等API定期判断,一旦isReady()返回true,即可调用Future ::get(),get()函数立即返回 - 注册回调函数,告诉生产者,当值就绪之后,触发我注册的回调。Future提供多种回调方式
借助Future/Promise,生产者和消费者之间可以做到完全异步。
3rdparty/libprocess/include/process/future.hpp 84-474行有Future的定义:
基本定义是一个模板:
template <typename T>
class Future
future变量有4种状态(428行),分别是:
- PENDING表示值还没有ready,这个时候生产者还没调用promise
.set(),而消费者调用future .get()的话会处于阻塞状态 - READY表示值已经就绪,future
.get()会立即返回结果 - FAILED表示值未就绪,已被破坏。通常生产者在处理过程出现异常时会设置这个状态
- DISCARDED表示值被丢失。生产者缺少某些条件无法计算这个值
future.hpp第436-459行定义struct Data{}中的Result
callbacks
future支持非常多类型的callbacks,针对每种不同的状态都可以设置相应的callback,如struct Data{}里可以看到如下字段:
std::vector<DiscardCallback> onDiscardCallbacks;
std::vector<ReadyCallback> onReadyCallbacks;
std::vector<FailedCallback> onFailedCallbacks;
std::vector<DiscardedCallback> onDiscardedCallbacks;
std::vector<AnyCallback> onAnyCallbacks;
其中:
- 当生产者调用
promise<T>.discard()
时触发onDiscardCallbacks - 当生产者调用
promise<T>.set()
时触发onReadyCallbacks - 当生产者调用
promise<T>.fail()
时触发onFailedCallbacks - 不管生产者触发何种动作,onAnyCallbacks都会触发
libprocess里的future实现支持then()方法,允许你通过future的callbacks来实现一些高级功能。
例如,如下代码:
Future<int> second(const bool& b)
{
return b ? 1 : 0;
}
Future<string> third(const int& s)
{
return s > 0 ? "good" : "bad";
}
TEST(Process, chain)
{
Future<string> S = readyFuture()
.then(lambda::bind(&second, lambda::_1))
.then(lambda::bind(&third, lambda::_1));
string s = S.get();
}
表示readyFuture()得到一个future变量A,当A就绪时,执行second(A)并得到一个future变量B,当B就绪时,执行third(B)得到一个future值S。上面的代码等同于:
TEST(Process, chain)
{
Future<bool> A = oneFuture();
Future<int> B = second(A.get());
Future<string> S = third(B.get());
string s = S.get();
}
注意,get()
是同步的,会一直等到相应的future变量处于ready状态才返回。
例如:
template
class QueueProcess {
public:
void enqueue(T t) {promise.set(t);}
Future dequeue() {return promise.future();}
private:
Promise promise;
}
template
class Queue {
public:
Queue() {spawn(process);}
~Queue() {terminate(process); wait(process);}
void enqueue(T t) {dispatch(process, &QueueProcess::enqueue, t);}
Future dequeue() {return dispatch(process, &QueueProcess::dequeue, t);}
private:
QueueProcess process;
}
int main (int argc, char **argv) {
Queue queue;
queue.enqueue(42);
queue.dequeue()
.then([] (int i) {
//use i
});
return 0;
}
远程消息
- route设置HTTP URL的回调函数
- install定义指定消息的回调函数
- 消息采用protobuf格式
libprocess的函数
install
安装一个处理ProtocalBuffer消息的handler,其中,消息类型是M,该消息对应的处理函数是method
例如,src/master/master.cpp 732-734:
install<RegisterFrameworkMessage>(
&Master::registerFramework,
&RegisterFrameworkMessage::framework);
安装一个处理RegisterFrameworkMessage(ProtocalBuffer对象)的handler,mesos master一旦接收到该消息,便会调用Master::registerFramework函数处理, 且该函数的输入参数是RegisterFrameworkMessage的framework属性。
send
void send(const process::UPID& to, const google::protobuf::Message& message)
向某个UPID上发送消息,其中UPID代表一个socket,里面含有ip和port信息,而消息message是ProtocalBuffer定义的对象。
dispatch
void dispatch(const UPID& pid,
const std::tr1::shared_ptr >& f)
执行进程pid中的函数f,为了提高效率,该函数并不会等到函数f执行完成,而是采用了异步的方法:将函数f放入一个函数队列,由另外一个进程(或者多个)不断从队列中获取函数,依次执行。
delay
Timer delay(double secs,const PID& pid,void (T::*method)())
延迟secs秒调度进程pid中的方法method,并返回一个计数器,通过这个计时器,可取消该调度。
subprocess
参数:
- path
- argv
- in
- out
- err
- set_sid
- flags
- environment
- _clone
- parent_hooks
- working_directory
- watchdog
功能:exec一个进程。
Owned
Owned是为了访问数据时的原子性,定义在3rdparty/libprocess/include/process/owned.hpp
Owned实现主要是用到了std::shared_ptr。
智能指针
C++11实现了智能指针,是用来模拟传统指针、提供一些附加功能的特殊类型。常见功能是能够通过自动垃圾回收来管理存储。
C++能够使用new和delete来动态分配、释放对象,如果分配之后,无人释放,也无人引用这块空间,就产生了内存泄漏。例如,这样的程序:
void MemoryAlloc()
{
int* a = new int(0);
}
但有的程序很难确定在哪里释放分配的空间。例如,这样的程序:
class DataGenerator
{
public:
int* GetData(){new int(1)};
};
空间是在对象内部函数GetData分配的,对象外部可能使用,此时很难确定应该是在对象内部还是外部释放。
C++为了解决这类问题,引入了智能指针概念,有三种不同的智能指针:
- unique_ptr:确保空间只会被一个unique_ptr对象管理,当该对象消息,就会自动释放资源
- shared_ptr:可以有多个shared_ptr共享空间,内部会记录被引用的次数,只要还有shared_ptr对象存在,资源不会被释放;当所有对象都消失,资源会自动释放
- weak_ptr:搭配shared_ptr使用,和shared_ptr不同点在于weak_ptr不会影响资源被使用的次数,也就是说weak_ptr存在与否不代表资源会不会被释放。
shared_ptr由于资源是可以共用的,也就可以透過 operator= 等方法,來分享 shared_ptr 所使用的資源。例如:
{
shared_ptr<int> a; // a is empty
{
shared_ptr<int> b( new int( 10 ) ); // allocate resource
a = b; // reference counter: 2
{
shared_ptr<int> c = a; // reference counter: 3
*c = 100;
} // c dead, reference counter: 2
} // b dead, reference counter: 1
cout << *a << endl;
} // release resource
如果要强制释放智能指针资源,可以调用其reset函数。
vector<int>* pVec = new vector<int>();
pVec->push_back( 1 );
cout << pVec->size() << endl;
delete pVec;
可以改成使用unique_ptr:
unique_ptr< vector<int> > pVec( new vector<int>() );
pVec->push_back( 1 );
cout << pVec->size() << endl;
前面的例子可以改成shared_ptr实现
shared_ptr<int> MemoryAlloc()
{
shared_ptr<int> a( new int(0) );
return a;
}