acl_cpp 非阻塞模块的IPC通信机制
在 acl_cpp 的非阻塞框架的设计中,充分利用了操作系统平台的高并发机制,同时简化了异步编程的过程。但是,并不是所有的操作都是非阻塞的,现实的程序应用中存在着大量的阻塞式行为,acl_cpp 的非阻塞框架中设计了一种通过 ipc 模式使阻塞式函数与 acl_cpp 的非阻塞过程相结合的机制。即是说,在 acl_cpp 的主线程是非阻塞的,而把阻塞过程放在单独的一个线程中运行,当阻塞线程运算完毕,会以 ipc 方式通知主线程运行结果,这样就达到了阻塞与非阻塞相结合的模式。下图展示了 ipc 类的继承关系:
以上图中有两个类与 ipc 相关:ipc_server 及 ipc_client,其中 ipc_server 其实用于与监听异步流相关,而ipc_client 与客户端连接流相关。在 ipc_server 类中有一个 open 函数用来打开本地监听地址,另有三个虚函数便于子类进行相关操作:
1)on_accept:当监听流获得一个客户端连接时回调此函数,将获得的客户端流传递给子类对象;
2)on_open:当用户调用 ipc_server::open 函数,且监听某一服务地址成功时通过该函数将实际的监听地址传递给子类对象(因为在调用 open(addr) 时,addr 一般仅指定 IP 地址,同时将 端口赋 0 以便于操作系统自动分配本地端口号,所以通过 on_open 便可以将实际的 端口传递给子类对象);
3)on_close:当监听流关闭时调用此函数通知子类对象。
相对于 ipc_server 类,则 ipc_client 的接口就显得比较多,主要的函数如下:
1)open:共有四个 open 重载函数用连接监听流服务地址,其中两个是同步流方式,两个是异步流方式,一般同步建立的流用在阻塞式线程中,异步建立的流用在非阻塞线程中;
2)send_message:一般用来向非阻塞主线程发送消息;
3)on_message:异步接收到阻塞线程发来的消息的回调函数;
4)append_message:添加异步流想要接收的消息号。
ipc_client 类比较特殊,其充当着双重身份:1)作为客户端连接流连接监听流的服务地址,一般用在阻塞式线程中;2)作为监听流接收到来自于客户端流的连接请求而创建的与之对应的服务端异步流,一般用在非阻塞线程中。
下面以一个具体的实例来说明如果使用 ipc_server 及 ipc_client 两个类:
#include "lib_acl.h"
#include <iostream>
#include "aio_handle.hpp"
#include "ipc_server.hpp"
#include "ipc_client.hpp"
using namespace acl;
#define MSG_REQ 1
#define MSG_RES 2
#define MSG_STOP 3
// 消息客户端连接类定义
class test_client1 : public ipc_client
{
public:
test_client1()
{
}
~test_client1()
{
}
// 连接消息服务端地址成功后的回调函数
virtual void on_open()
{
// 添加消息回调对象,接收消息服务器的
// 此类消息
this->append_message(MSG_RES);
// 向消息服务器发送请求消息
this->send_message(MSG_REQ, NULL, 0);
// 异步等待来自于消息服务器的消息
wait();
}
// 流关闭时的回调函数
virtual void on_close()
{
delete this;
}
// 接收到消息服务器的消息时的回调函数,其中的消息号由
// append_message 进行注册
virtual void on_message(int nMsg, void*, int)
{
std::cout << "test_client1 on message:" << nMsg << std::endl;
// 向消息服务器发送消息,通知消息服务器停止
this->send_message(MSG_STOP, NULL, 0);
// 删除在 on_open 中注册的消息号
this->delete_message(MSG_RES);
// 本异步消息过程停止运行
this->get_handle().stop();
// 关闭本异步流对象
this->close();
}
protected:
private:
};
// 消息客户端处理过程
static bool client_main(aio_handle* handle, const char* addr)
{
// 创建消息连接
ipc_client* ipc = new test_client1();
// 连接消息服务器
if (ipc->open(handle, addr, 0) == false)
{
std::cout << "open " << addr << " error!" << std::endl;
delete ipc;
return (false);
}
return (true);
}
// 子线程的入口函数
static void* thread_callback(void *ctx)
{
const char* addr = (const char*) ctx;
aio_handle handle;
if (client_main(&handle, addr) == false)
{
handle.check();
return (NULL);
}
// 子线程的异步消息循环
while (true)
{
if (handle.check() == false)
break;
}
// 最后清理一些可能未关闭的流连接
handle.check();
return (NULL);
}
// 消息服务器接收到的客户端连接流类定义
class test_client2 : public ipc_client
{
public:
test_client2()
{
}
~test_client2()
{
}
virtual void on_close()
{
delete this;
}
// 接收到消息客户端发来消息的回调函数
virtual void on_message(int nMsg, void*, int)
{
std::cout << "test_client2 on message:" << nMsg << std::endl;
// 如果收到消息客户端要求退出的消息,则主线程了退出
if (nMsg == MSG_STOP)
{
this->close();
// 通知主线程的非阻塞引擎关闭
this->get_handle().stop();
}
else
// 回应客户端消息
this->send_message(MSG_RES, NULL, 0);
}
protected:
private:
};
// 主线程的消息服务器 ipc 服务监听类定义
class test_server : public ipc_server
{
public:
test_server()
{
}
~test_server()
{
}
// 消息服务器接收到消息客户端连接时的回调函数
void on_accept(aio_socket_stream* client)
{
// 创建 ipc 连接对象
ipc_client* ipc = new test_client2();
// 打开异步IPC过程
ipc->open(client);
// 添加消息回调对象
ipc->append_message(MSG_REQ);
ipc->append_message(MSG_STOP);
ipc->wait();
}
protected:
private:
};
static void usage(const char* procname)
{
printf("usage: %s -h[help] -t[use thread]\n", procname);
}
int main(int argc, char* argv[])
{
int ch;
bool use_thread = false;
while ((ch = getopt(argc, argv, "ht")) > 0)
{
switch (ch)
{
case 'h':
usage(argv[0]);
return (0);
case 't':
use_thread = true;
break;
default:
break;
}
}
acl_init();
aio_handle handle;
ipc_server* server = new test_server();
// 使消息服务器监听 127.0.0.1 的地址
if (server->open(&handle, "127.0.0.1:0") == false)
{
delete server;
std::cout << "open server error!" << std::endl;
getchar();
return (1);
}
char addr[256];
#ifdef WIN32
_snprintf(addr, sizeof(addr), "%s", server->get_addr());
#else
snprintf(addr, sizeof(addr), "%s", server->get_addr());
#endif
if (use_thread)
{
// 使消息客户端在子线程中单独运行
acl_pthread_t tid;
acl_pthread_create(&tid, NULL, thread_callback, addr);
}
// 因为消息客户端也是非阻塞过程,所以也可以与消息服务器
// 在同一线程中运行
else
client_main(&handle, addr);
// 主线程的消息循环过程
while (true)
{
if (handle.check() == false)
{
std::cout << "stop now!" << std::endl;
break;
}
}
delete server;
handle.check(); // 清理一些可能未关闭的异步流对象
std::cout << "server stopped!" << std::endl;
getchar();
return (0);
}
以上例子相对简单,其展示的消息服务器与消息客户端均是非阻塞过程,其实将上面的异步消息客户端稍微一改便可以改成同步消息客户端了,修改部分如下:
class test_client3 : public ipc_client
{
public:
test_client3()
{
}
~test_client3()
{
}
virtual void on_open()
{
// 添加消息回调对象
this->append_message(MSG_RES);
// 向消息服务器发送请求消息
this->send_message(MSG_REQ, NULL, 0);
// 同步等待消息
wait();
}
virtual void on_close()
{
delete this;
}
virtual void on_message(int nMsg, void*, int)
{
std::cout << "test_client3 on message:" << nMsg << std::endl;
this->send_message(MSG_STOP, NULL, 0);
this->delete_message(MSG_RES);
this->close();
}
protected:
private:
};
// 子线程处理过程
static bool client_main(const char* addr)
{
// 创建消息客户端对象
ipc_client* ipc = new test_client3();
// 同步方式连接消息服务器
if (ipc->open(addr, 0) == false)
{
std::cout << "open " << addr << " error!" << std::endl;
delete ipc; // 当消息客户端未成功创建时需要在此处删除对象
return (false);
}
return (true);
}
static void* thread_callback(void *ctx)
{
const char* addr = (const char*) ctx;
if (client_main(addr) == false)
return (NULL);
return (NULL);
}
对比 test_client1 与 test_client_3 两个消息客户端,可以发现二者区别并不太大,关键在于调用 open 时是采用了异步还是同步连接消息服务器,其决定了消息客户端是异步的还是同步的。
示例代码:samples/aio_ipc
acl_cpp 下载:http://sourceforge.net/projects/acl/
原文地址:http://zsxxsz.iteye.com/blog/1495832
更多文章:http://zsxxsz.iteye.com/
QQ 群:242722074