Maple-分布式服务器框架
一. 概述
I. 解决什么问题
笔者在做游戏服务器的设计时,经历了纯c++到纯python, 再到c++&python的演变流程。最终的设计方案即现在maple。
最重要是要找到几个问题的平衡点:
- 服务器性能
- 研发速度
maple很好的兼顾了上面几条。
maple于2014年12月在我们自己的业务上线,期间经过多次迭代升级,至今一直在线上稳定运行。
maple与2016年10月底升级到v2.x,支持集群(cluster),并部署在外网运行。
II. 模块简介
maple基本框架为 gateway + worker + trigger + forwarder(仅cluster模式需要)。
- gateway
使用c++编写,内部使用epoll驱动。
负责客户端连接管理,以及对worker的任务派发。
支持连接数限制,包大小限制,频率限制、最长不活跃时间。
内置tool_stat统计工具,支持统计连接数、worker数、处理时长等。
- worker
使用python编写。
负责向gateway索要任务,以及消息与处理函数的映射等。
与gateway使用主动索要任务的方式通信,有效实现负载均衡。
支持安全停止和安全重启。
- trigger
使用python编写。
类似触发器的设计,通过向gateway发送消息,目标可以是worker,也可以直接是client。
发送消息时,支持如果发生失败或连接失败,会循环尝试至成功为止。
- forwarder
使用c++编写,内部使用epoll驱动,存储使用redis。
cluster模式时,作为trigger与多gateway之间的转发server。
二. 部署运行
I. gateway部署运行
-
部署环境
目前gateway仅在centos6.4/6.5 X64上编译测试过,主要公司服务器都是用这个版本,也没有过多精力去测试其他版本。
-
安装步骤
-
拉取代码
git clone https://github.com/dantezhu/maple
-
拉取依赖代码
cd maple git submodule update --init
-
编译
cd cpp; make require; make all
- 等待编译完成,在gateway/bin目录下将会有如下文件列表
- gateway 程序主文件
- tool_stat 统计工具
-
-
配置文件
etc 目录下的 config.ini 是配置文件模板,内容如下:
[outer] ; 绑定IP,支持批量,逗号分隔 host=0.0.0.0 port=29000 ; listen函数的backlog参数,默认0。 backlog=512 ; 空闲超时(float),长期没收到消息被认为连接超时,默认-1。-1代表永不超时 timeout=-1 ; 初始读取buf大小,默认0。<=0代表从1开始,且延时分配;>0则立即分配 recv_buf_init_size=1024 ; 最大读取buf大小,默认-1。-1代表无限 recv_buf_max_size=-1 ; 最大发送buf大小,默认-1。-1代表无限 send_buf_max_size=-1 ; 最大连接数,默认-1。-1代表无限 conns_max_size=500000 ; 最大连接后备数,默认0。-1代表无限 conns_reserve_max_size=0 ; 可以防御针对协议的CC攻击 ; 不需要配置IP的请求量限制,因为如果要攻击的话,要不就是每个conn都大量发送消息,要不就是每个conn发几个消息就关闭连接,然后再重新建立连接。 ; 格式为: 请求数/秒数,请求数=-1为不限制 ; 中间以英文逗号分隔,不同配置之间是 或 的关系 ; 不配置为不限制 ; 未登录连接请求最大频率 unauthed_conn_req_max_rate=-1/10,-1/60 ; 已登录连接请求频率限制 authed_conn_req_max_rate=-1/10 ; 可以防御CC攻击或者连接数消耗攻击 ; 格式为: 单个IP建立连接的数量/秒数,连接数=-1为不限制 ; 中间以英文逗号分隔,不同配置之间是 或 的关系 ; 不配置为不限制 ; IP新建连接最大频率 ip_conn_create_max_rate=-1/10 ; 启动后延迟多少秒开始进行检查IP。 不配置默认为0 ; 主要为了解决重启gateway后,用户大量重连的问题,比如一个移动基站下会有很多用户IP都会瞬间重连进来 ip_rate_check_delay=0 ; 白名单IP列表,对conn和ip频率限制都生效。格式: 127.0.0.1,192.168.1.1 rate_unlimited_ip_list= [inner] ; 绑定IP,支持批量,逗号分隔 host=127.0.0.1 port=28000 backlog=512 timeout=-1 recv_buf_init_size=1024 recv_buf_max_size=-1 send_buf_max_size=-1 conns_max_size=-1 conns_reserve_max_size=0 tasks_max_size=-1 ; 分派任务最大频率 task_alloc_max_rate=-1/10 [node] ; cluster ; 节点ID(0~32767)。 >0 代表开启cluster部署。务必确保每个节点ID不重复 id=1 [store] ; cluster host=127.0.0.1 port=6379 ; 连接、接收超时(float),1为永不超时。如果超时会断掉连接,重新连接 timeout=10 ; 密码 password= ; 存储database database=0 ; 存储的key统一前缀 key_prefix=maple: ; 等待处理的任务的最大值。-1代表无限 tasks_max_size=-1 ; 存储用户最长时间(秒)。-1代表永远不过期。这里不用配置特别短,主要是为了解决gateway重启,数据不会失效的问题 user_max_age=604800 ; user续期间隔因子(float),默认为0.5。-1代表不主动续期。即 续期间隔=user_max_age * factor user_renew_timeout_factor=0.5 [log] ; debug: 调试阶段使用,用来排查问题。 ; info: 一些异常的数据,或者需要告知一下的信息。 ; warn: 触发了某些限制,或者放任不管可能导致更严重的问题。 ; error: 是程序的问题,并且严重等级较高,需要研发进行处理;或者是想记录在日志中的重要信息。 log_level=error ; 日志类型,默认0 ; 0: 固定文件最大数量 ; 1: 按日期分日志,每天也会固定最大文件数量 ; 2: 按小时,其他同上 log_type=0 ; log_dir 是相对于执行目录 log_dir=logs file_prefix=gw ; 文件大小, 单位是M file_max_size=1024 file_max_num=20 [stat] file_name=stat_file
配置都有注释,就不过多解释了
-
运行
-
启动gateway(在主目录下):
./bin/gateway -c etc/config.ini
-
查看统计(具体命令可以通过-h选项查看)
./tool_stat -f stat_file
-
统计示例为:
CLIENTS : 849 AUTHED_CLIENTS : 834 TRIGGERS : 1 WORKERS : 16 IDLE_WORKERS : 16 STORES : 1 CONNECTED_STORES : 1 CLIENT_REQ : 3628425 CLIENT_CREATED : 243636 CLIENT_CLOSED : 242787 CLIENT_TIMEOUT : 2059 CLIENT_CONN_OVERFLOW : 0 CLIENT_SEND_BUF_OVERFLOW : 0 CLIENT_RECV_BUF_OVERFLOW : 0 CLIENT_PENDING_SEND_BUF : 9683 CLIENT_CREATE_RATE_LIMITED : 0 CLIENT_UNAUTHED_REQ_RATE_LIMITED : 0 CLIENT_AUTHED_REQ_RATE_LIMITED : 0 INNER_REQ : 16713759 INNER_CREATED : 33 INNER_CLOSED : 16 INNER_TIMEOUT : 0 INNER_CONN_OVERFLOW : 0 INNER_TASK_OVERFLOW : 0 INNER_SEND_BUF_OVERFLOW : 0 INNER_RECV_BUF_OVERFLOW : 0 INNER_PENDING_TASKS : 0 INNER_PENDING_SEND_BUF : 0 INNER_TASK_ALLOC_RATE_LIMITED : 0 STORE_REQ : 40955 STORE_RSP : 40955 STORE_RSP_FAIL : 0 STORE_CONNECT : 1 STORE_CONNECTED : 1 STORE_CLOSED : 0 STORE_TIMEOUT : 0 STORE_TASK_OVERFLOW : 0 STORE_PENDING_TASKS : 0 CMD_WORKER_ASK_FOR_TASK : 4285136 CMD_WRITE_TO_CLIENT : 3628504 CMD_WRITE_TO_USERS : 8604539 CMD_CLOSE_CLIENT : 12 CMD_CLOSE_USERS : 8 CMD_LOGIN_CLIENT : 23777 CMD_LOGOUT_CLIENT : 1499 CMD_WRITE_TO_WORKER : 170272 CMD_CLEAR_CLIENT_TASKS : 12 TASK_TIME_1_MS : 4134495 TASK_TIME_3_MS : 118357 TASK_TIME_5_MS : 13474 TASK_TIME_10_MS : 12537 TASK_TIME_30_MS : 5628 TASK_TIME_50_MS : 515 TASK_TIME_100_MS : 53 TASK_TIME_300_MS : 45 TASK_TIME_500_MS : 0 TASK_TIME_1_S : 0 TASK_TIME_3_S : 0 TASK_TIME_5_S : 0 TASK_TIME_10_S : 0 TASK_TIME_MORE : 0 STORE_TIME_1_MS : 39227 STORE_TIME_3_MS : 963 STORE_TIME_5_MS : 308 STORE_TIME_10_MS : 295 STORE_TIME_30_MS : 146 STORE_TIME_50_MS : 11 STORE_TIME_100_MS : 1 STORE_TIME_300_MS : 3 STORE_TIME_500_MS : 0 STORE_TIME_1_S : 0 STORE_TIME_3_S : 0 STORE_TIME_5_S : 0 STORE_TIME_10_S : 0 STORE_TIME_MORE : 0
-
统计项说明:
CLIENTS // client数量 AUTHED_CLIENTS // 已登录client数量 TRIGGERS // trigger数量 WORKERS // worker数量 IDLE_WORKERS // 空闲worker数量 STORES // store数量 CONNECTED_STORES // store连接数量 CLIENT_REQ // client请求次数 CLIENT_CREATED // client连接建立次数 CLIENT_CLOSED // client连接断开次数 CLIENT_TIMEOUT // client连接超时次数 CLIENT_CONN_OVERFLOW // 超过client数量次数 CLIENT_SEND_BUF_OVERFLOW // 超过client等待发送的buf大小而被抛弃的次数 CLIENT_RECV_BUF_OVERFLOW // 超过client接收的buf大小而被抛弃的次数 CLIENT_PENDING_SEND_BUF // 所有client等待发送的buf大小 CLIENT_CREATE_RATE_LIMITED // 触发建立连接频率限制的次数 CLIENT_UNAUTHED_REQ_RATE_LIMITED // 触发未登录client请求频率限制的次数 CLIENT_AUTHED_REQ_RATE_LIMITED // 触发已经登录client请求频率限制的次数 INNER_REQ // inner请求次数,包含所有命令字 INNER_CREATED // inner连接建立次数 INNER_CLOSED // inner连接断开次数 INNER_TIMEOUT // inner连接超时次数 INNER_CONN_OVERFLOW // 超过inner数量次数 INNER_TASK_OVERFLOW // 超过转发给inner任务队列大小而被丢弃的数量 INNER_SEND_BUF_OVERFLOW // 超过inner等待发送的buf大小而被抛弃的次数 INNER_RECV_BUF_OVERFLOW // 超过inner接收的buf大小而被抛弃的次数 INNER_PENDING_TASKS // 等待转发给inner的任务数 INNER_PENDING_SEND_BUF // 所有inner等待发送的buf大小 INNER_TASK_ALLOC_RATE_LIMITED // 触发inner任务分配频率限制的次数 STORE_REQ // store发出的请求量,必须是真的发送(包括select database) STORE_RSP // store收到的请求量,包括所有的req对应的回应 STORE_RSP_FAIL // store收到的失败请求量 STORE_CONNECT // store连接建立尝试 STORE_CONNECTED // store连接建立成功 STORE_CLOSED // store连接被关闭,包括未连接成功的状态下close,所以值可能大于STORE_CONNECTED STORE_TIMEOUT // store连接超时(包括connect和recv) STORE_TASK_OVERFLOW // 超过store任务队列大小而被丢弃的数量 STORE_PENDING_TASKS // 等待发送到store的任务 CMD_WORKER_ASK_FOR_TASK // CMD 开头都是命令字 TASK_TIME_1_MS // 任务处理时长统计 STORE_TIME_1_MS // store处理时长统计
-
-
supervisor配置
[program:gateway] directory=/data/release/gateway/ command=/data/release/gateway/bin/gateway -c etc/config.ini user=root autorestart=true redirect_stderr=true
II. worker、trigger部署运行
-
安装
pip install maple
maple 依赖 netkit
-
模块引入
from maple import Worker, Blueprint, Trigger
-
代码示例
app.py(worker):
from maple import Worker from netkit.box import Box import user app = Worker(Box) app.register_blueprint(user.bp) @app.route(2) def login(request): uid = request.box.get_json()["uid"] request.login_client(uid) # request.logout_client() request.write_to_client(dict( ret=0, body="login %s" % uid )) app.run("192.168.1.67", 28000, workers=2, debug=True)
user.py(blueprint):
from maple import Blueprint bp = Blueprint('user') @bp.route(100) def test(request): request.write_to_client(dict( ret=0 )) return
trigger.py(trigger):
from maple import Trigger from netkit.box import Box def main(): trigger = Trigger(Box, '192.168.1.67', 28000) print trigger.write_to_worker(dict( cmd=3, ret=100, body='from trigger: %s' % it )) # print trigger.close_users([constants.CONNS_AUTHED], 0) # print trigger.write_to_users([ # ([constants.CONNS_ALL], dict(cmd=1, body='direct event from trigger'), 0, [1, 2]) # ]) main()
更多功能的代码可参见 examples
-
安全停止和安全重启
-
安全停止
kill -TERM $pid
-
安全重启
kill -HUP $pid
-
强制停止(不建议)
kill -INT $pid kill -QUIT $pid
-
-
supervisor配置
-
使用maple自身作为父进程进行管理:
[program:maple_worker] environment=PYTHON_EGG_CACHE=/tmp/.python-eggs/ directory=/maple/normal_test/ command=/usr/local/bin/python worker.py user=dantezhu autorestart=true redirect_stderr=true stopsignal=TERM stopwaitsecs=20
优点:
- supervisor 控制台只看到一个父进程,管理比较简单
缺点:
- 没有什么明显缺点。只要记得配置worker.stop_timeout,就可以正常KILL掉没有及时停止的子进程
-
使用supervisor作为父进程进行管理:
[program:maple_worker] environment=PYTHON_EGG_CACHE=/tmp/.python-eggs/,MAPLE_WORKER=true directory=/maple/normal_test/ command=/usr/local/bin/python worker.py numprocs=2 process_name=%(program_name)s_%(process_num)02d user=dantezhu autorestart=true redirect_stderr=true stopsignal=TERM stopwaitsecs=10
优点:
- 可以直接看到所有进程的状态
- 一旦worker stop超时,supervisor会通过 kill -9 将worker强制杀死,不会留下僵尸进程
缺点:
- 如果进程数太多,在supervisor控制台会看到一堆进程
- 可以直接看到所有进程的状态
-
III. forwarder部署运行
-
部署环境
目前forwarder仅在centos6.4/6.5 X64上编译测试过。
-
安装步骤
之前编译gateway的时候,其实forwarder已经一起编译出来了,在 forwarder/bin 下有如下文件列表:
- forwarder 程序主文件
- tool_stat 统计工具
-
配置文件
etc 目录下的 config.ini 是配置文件模板,内容如下:
[outer] ; 绑定IP,支持批量,逗号分隔 host=0.0.0.0 port=25000 ; listen函数的backlog参数,默认0。 backlog=512 ; 空闲超时(float),长期没收到消息被认为连接超时,默认-1。-1代表永不超时 timeout=-1 ; 初始读取buf大小,默认0。<=0代表从1开始,且延时分配;>0则立即分配 recv_buf_init_size=1024 ; 最大读取buf大小,默认-1。-1代表无限 recv_buf_max_size=-1 ; 最大连接数,默认-1。-1代表无限 conns_max_size=-1 ; 最大连接后备数,默认0。-1代表无限 conns_reserve_max_size=0 [node] ; 节点列表。格式: node_id:node_inner_host:node_inner_port,node_id:node_inner_host:node_inner_port ; 支持 kill -hup 热加载 addr_list=1:127.0.0.1:28000 ; 连接超时(float),-1代表永不超时。如果超时会断掉连接,重新连接 timeout=10 ; 等待发送buf的最大值。-1代表无限 send_buf_max_size=-1 [store] host=127.0.0.1 port=6379 ; 连接、接收超时(float),1为永不超时。如果超时会断掉连接,重新连接 timeout=10 ; 密码 password= ; 存储database database=0 ; 存储的key统一前缀 key_prefix=maple: ; 等待处理的任务的最大值。-1代表无限 tasks_max_size=-1 [log] ; debug: 调试阶段使用,用来排查问题。 ; info: 一些异常的数据,或者需要告知一下的信息。 ; warn: 触发了某些限制,或者放任不管可能导致更严重的问题。 ; error: 是程序的问题,并且严重等级较高,需要研发进行处理;或者是想记录在日志中的重要信息。 log_level=error ; 日志类型,默认0 ; 0: 固定文件最大数量 ; 1: 按日期分日志,每天也会固定最大文件数量 ; 2: 按小时,其他同上 log_type=0 ; log_dir 是相对于执行目录 log_dir=logs file_prefix=fw ; 文件大小, 单位是M file_max_size=1024 file_max_num=20 [stat] file_name=stat_file
配置都有注释,就不过多解释了
-
运行
-
启动forwarder(在主目录下):
./bin/forwarder -c etc/config.ini
-
重新加载 node_list
kill -hup $pid
-
查看统计(具体命令可以通过-h选项查看)
./tool_stat -f stat_file
-
统计示例为:
TRIGGERS : 115 STORES : 115 CONNECTED_STORES : 115 NODES : 2 CONNECTED_NODES : 2 TRIGGER_REQ : 13854829 TRIGGER_CREATED : 293 TRIGGER_CLOSED : 178 TRIGGER_TIMEOUT : 0 TRIGGER_CONN_OVERFLOW : 0 STORE_REQ : 9541107 STORE_RSP : 9541107 STORE_RSP_FAIL : 0 STORE_CONNECT : 293 STORE_CONNECTED : 293 STORE_CLOSED : 178 STORE_TIMEOUT : 0 STORE_TASK_OVERFLOW : 0 STORE_PENDING_TASKS : 0 NODE_REQ : 13734384 NODE_CONNECT : 2670 NODE_CONNECTED : 4 NODE_CLOSED : 2668 NODE_TIMEOUT : 0 NODE_SEND_BUF_OVERFLOW : 0 NODE_PENDING_SEND_BUF : 0 CMD_WRITE_TO_CLIENT : 3973486 CMD_WRITE_TO_USERS : 9579355 CMD_CLOSE_CLIENT : 12 CMD_CLOSE_USERS : 19 CMD_LOGIN_CLIENT : 26374 CMD_LOGOUT_CLIENT : 1756 CMD_WRITE_TO_WORKER : 273815 CMD_CLEAR_CLIENT_TASKS : 12 STORE_TIME_1_MS : 9409169 STORE_TIME_3_MS : 53399 STORE_TIME_5_MS : 27652 STORE_TIME_10_MS : 32027 STORE_TIME_30_MS : 17472 STORE_TIME_50_MS : 667 STORE_TIME_100_MS : 50 STORE_TIME_300_MS : 378 STORE_TIME_500_MS : 0 STORE_TIME_1_S : 0 STORE_TIME_3_S : 0 STORE_TIME_5_S : 0 STORE_TIME_10_S : 0 STORE_TIME_MORE : 0
-
统计项说明:
TRIGGERS // trigger数量 STORES // store数量 CONNECTED_STORES // store连接数量 NODES // node数量 CONNECTED_NODES // node连接数量 TRIGGER_REQ // 来自trigger的请求数 TRIGGER_CREATED // trigger连接建立 TRIGGER_CLOSED // trigger连接断开 TRIGGER_TIMEOUT // trigger连接超时 TRIGGER_CONN_OVERFLOW // 超过trigger数量次数 STORE_REQ // store发出的请求量,必须是真的发送(包括select database) STORE_RSP // store收到的请求量,包括所有的req对应的回应 STORE_RSP_FAIL // store收到的失败请求量 STORE_CONNECT // store连接建立尝试 STORE_CONNECTED // store连接建立成功,之所以不单独统计connect fail,是因为不是每次都能正常触发 STORE_CLOSED // store连接被关闭,包括未连接成功的状态下close,所以值可能大于STORE_CONNECTED STORE_TIMEOUT // store连接超时(包括connect和recv) STORE_TASK_OVERFLOW // 超过store任务队列大小而被丢弃的数量 STORE_PENDING_TASKS // 等待发送到store的任务 NODE_REQ // node发出的请求量 NODE_CONNECT // node连接建立尝试 NODE_CONNECTED // node连接建立成功 NODE_CLOSED // node连接关闭,有可能是在尚未连接成功的状态下close NODE_TIMEOUT // node连接超时(包括connect) NODE_SEND_BUF_OVERFLOW // 超过发送buf大小而被抛弃的次数 NODE_PENDING_SEND_BUF // 等待发送到node的buf大小 CMD_WRITE_TO_CLIENT // CMD开头都是命令字 STORE_TIME_1_MS // store处理时长统计
-
-
supervisor配置
[program:forwarder] directory=/data/release/forwarder/ command=/data/release/forwarder/bin/forwarder -c etc/config.ini user=root autorestart=true redirect_stderr=true
三. API介绍
I. Worker
-
Worker(box_class)
构造函数-
box_class 用来做网络包收发,可用
from netkit.box import Box
-
-
register_blueprint(self, blueprint)
注册blueprint- blueprint blueprint
-
run(host=None, port=None, debug=None, workers=None)
启动worker- host gateway的inner host
- port gateway的inner port
- debug 是否debug,默认False
- workers workers数量,默认为1
-
route(cmd)
装饰器。注册cmd的处理函数。被注册函数格式为:def f(request):pass
- cmd 命令字
-
create_worker(f)
装饰器。当worker创建时触发。被注册函数格式为:def f(): pass
-
before_first_request
装饰器。当第一个请求到来时触发。被注册函数格式为:def f(request): pass
-
before_first_request
装饰器。当第一个请求到来时触发。被注册函数格式为:def f(request): pass
-
before_first_request
装饰器。当第一个请求到来时触发。被注册函数格式为:def f(request): pass
-
before_request
装饰器。当请求到来时触发。被注册函数格式为:def f(request): pass
-
after_request
装饰器。当请求处理结束时触发。被注册函数格式为:def f(request): pass
-
before_response
装饰器。当发送响应前触发。被注册函数格式为:def f(conn, response): pass
其中 response 为最终的字节流格式(str)
-
after_response
装饰器。当第一个请求到来时触发。被注册函数格式为:def f(conn, response, result): pass
其中 result 为 True/False,分别代表发送成功/失败
-
close_conn
装饰器。当worker与gateway的连接被关闭时触发。被注册函数格式为:def f(conn): pass
-
create_client
装饰器。当客户端连接建立时触发。被注册函数格式为:def f(request): pass
-
close_client
装饰器。当客户端连接被关闭时触发。被注册函数格式为:def f(request): pass
work_timeout
任务处理最长时间,如果超过该值,子进程会自杀。
默认为None,即不超时stop_timeout
进程停止超时。
当使用 TERM 命令进行停止时,如果超过stop_timeout,那么就会给子进程强制发送KILL命令rsp_once
write_to_client 可以调用几次。如果为True代表只能调用一次,多次调用会抛出异常。
默认为True
II. Blueprint
-
Blueprint(name)
构造函数- name blueprint的名字。在request.endpoint中有用
-
其他装饰器等
与Worker中基本相同,并多了before_app_request这种app层级的回调。 顺序基本是:
app.before_request blueprint.before_app_request blueprint.before_request f() blueprint.after_request blueprint.after_app_request app.after_request
III. Request
-
login_client(uid, userdata=None)
标记用户为已登录- uid 用户ID
- userdata 用户数据,主要存储标记位
logout_client()
标记当前连接为未登录write_to_client(data)
写回响应,返回的cmd和sn均与传入的一致。
data的格式可以为 Box,也可以为 dict。如果是dict,将会传入至Box的map函数中返回一个新box。-
write_to_users(data_list)
批量向用户组发送消息-
data_list
[(uids, box), (uids, box, userdata), (uids, box, userdata, exclude) ...] uid支持特殊值:CONNS_AUTHED : 所有已登录连接 CONNS_ALL : 所有连接 CONNS_UNAUTHED : 所有未登录连接
userdata可不传,默认为0,conn.userdata & userdata == userdata exclude 代表排除的uid列表
-
-
write_to_worker(data)
发送消息到worker进行处理- data 可以为 box 或者 dict
close_client()
关闭当前连接close_users(uids, userdata=None, exclude=None)
批量关闭用户连接,uid同样支持特殊负值clear_client_tasks()
清空该连接产生的所有task。
一般在client有大量异常消息时使用。-
task
gateway包装过的任务box- uid 登录用户ID,>0 为已登陆
- userdata login_client 时传入的userdata
- inner False代表从client端来的消息,True代表从trigger来的消息
cmd
消息命令字,即box.cmdendpoint
如果是app级别,就是func_name;如果blueprint级别,就是blueprint_name.func_namebox
消息boxinterrupt
是否中断处理,即不执行对应的处理函数直接返回,默认为False. 比如在before_request检验权限不符合,即可设置为False.
IV. Trigger
事件触发,其实在gateway端,trigger和worker是同一个连接类型。
注意: trigger是线程安全的,内部实现了加锁
其实trigger就是一个触发器,触发worker,也可以触发client。
但是常用的方法是,触发worker,让worker去做对应的事情,这样可以保证逻辑的独立
-
Trigger(box_class, host, port, ensure=True)
构造函数- box_class 上面已解释
- host gateway的inner host
- port gateway的inner port
- ensure 确保发送成功,只要连接失败或者发送失败就会一直重试。默认为True
write_to_users(data_list)
与 request.write_to_users一致-
write_to_worker(data, node_id=None)
发送消息到worker进行处理- data 可以为 box 或者 dict
- node_id 如果连接的是forwarder的话,node_id可以指定希望转发到的node
close_users(uids, userdata=None, exclude=None)
与 request.close_users 一致
四. 技术实现
I. gateway
gateway使用epoll模型,单进程单线程。
绑定两个端口,分别接入外网连接和内网连接。
外网连接即客户端连接。
内网连接里面分worker和trigger,worker是可以接受任务和返回消息的,而trigger只能返回消息。
trigger可以升级为worker,在跟gateway 索要 task 后。
所有的数据buf,都经过一层封装,再转发出去。
所以,创建一个新的box类型: Task
多了几个字段: node_id, tag, conn_id, uid, userdata, client_ip_num 等
- node_id 指定client所属的node,如果非cluster模式,应该为0
- tag 为了防止gw重启导致conn_id重新从0计数,从而出现混淆
- conn_id 连接ID,自增的long long类型,确保是唯一的
- uid 登录的用户id
- userdata 登录用户的属性
- client_ip_num 客户端ip的int型表示(使用网络序),worker端可以通过 request.task.client_ip 来取到对应的字符串类型
在发送数据时,判断连接是否match的规则是:
- 判断 node_id 是否相等,确保task与gateway属于同一node。
- 判断 tag 是否相等,确保gateway没有重启过
- 判断 conn_id 对应的conn是否存储并合法
II. worker
worker使用单进程单线程的模型,并采取主动向gateway索要任务的方式。这样可以有效的保证负载均衡。
worker中比较细节的是对信号的处理,具体可以参看代码,这里不再赘述.
III. forwarder
forwarder使用epoll模型,单进程单线程。
为了能够异步使用hiredis库,在hiredis的异步接口上做了一层epoll接口的封装。
forwarder的设计上,需要确保的几个关键点在于:
- 同一个trigger发过来的消息,必须保证是同样的顺序发往gateway
- 不同trigger之间,不能互相影响
对于第一点,就要求单个trigger的消息处理必须有序,不能并发。
对于第二点,则要求多个trigger之间不能共享消息处理类。
比如trigger有时候需要访问store来获取转发的node列表(write_to_users/close_users),此时就不能使用store连接池。
所以最终的实现是,当某个trigger建立连接后,就会自动为它创建一条与store的连接,并仅供该trigger使用。
而当trigger连接被关闭时,如果store的任务也已经全部处理完,则store的连接也一起关闭;如果此时store还有任务要处理,则不跟随trigger一起关闭,而是等任务处理完后,自行关闭。
所以,我们使用trigger应该尽量使用长连接的方式,并且当前python代码中的trigger也是这么实现的。
五. 最佳实践
-
集群(cluster)部署
cluster模式下,我们按照node来划分区域,每个node内部包含 一个gateway + 一组worker。
所以,gateway的配置中有一个 node.id,就是用来指定自己所属于的node。
当然,这个node更多的是逻辑上的区分,在客户端看来,所有用户是共服的。forwarder会作为转发server,负责将发送给client的消息,按照node_id转发到对应的node内部的gateway上。
而由于forwarder本身是无状态的,所以本身如果需要扩容,也可以直接平行扩容。
当然,如果要使用forwarder的话,那么类似 request.write_to_users/request.close_users 这种函数调用就都不能使用了,取而代之的是 trigger.write_to_users/trigger.close_users,或者使用 virtual_request 来模拟request。
具体代码参看 cluster_demo -
防御攻击
gateway内部配有简单的针对单个连接请求频率的限制,以及单个IP建立连接的频率限制。
但是达到频率限制之后,并不会对连接或者IP有任何惩罚处理。
所以我编写了 maple_guard 来进行异步的防御,即检测到恶意连接或者IP,就会进行下线并封IP的处理。
但是因为该工具是异步处理,所以gateway本身的频率控制还是要配置,只是可以稍微宽松一点。
六. 相关工具
- maple_timer 打印每个命令字的处理时长比例
- xstat 上报worker统计数据至statsd
- maple_cluster_statsd 上报gateway/forwarder统计数据至statsd
- maple_guard 为gateway加强防御攻击的能力
七. 特殊说明
由于部署gateway的机器要直连公网,所以务必不要将tcp_tw_recycle和tcp_timestamps同时开启,否则会导致connect失败的问题
目前gateway编译使用的protobuf为2.5.0,同理maple依赖的protobuf需要>=2.5.0。而貌似2.6.0以上版本执行setup.py会访问墙外的地址,所以国内执行 pip install protobuf==2.5.0 即可。
-
为什么gateway上大量client连接建立再断开,内存上涨之后不下降呢?
本来以为是conns_reserve_max_size的问题,但即使设置 conns_reserve_max_size=0,依然如此。
做了个测试,再次建立同样多的连接,会发现内存并没有上涨。
最后确定了原因是因为 deque 在pop_front之后并没有把deque内部分配的内存释放。 也就是说如果 m_pendingTasks size 达到过 1w,那么即使pop_front完已经empty,占用的内存依然和 1w 个元素一样多。 测试代码如下:
int main(int argc, char **argv) { deque<int> d; printf("before alloc\n"); for(int i=0; i<=10000000; i++) { d.push_back(i); } printf("after alloc\n"); printf("sleep begin\n"); sleep(10); // 40m printf("sleep end\n"); printf("before free\n"); while(1) { if (d.front()) { d.pop_front(); } else { break; } } printf("after free\n"); printf("sleep begin\n"); sleep(10); // 40m printf("sleep end\n"); return 0; }
网上有很多方法来解决这个问题,比如 swap, shrink_to_fit 等等,但我们的config中可以指定 tasks_max_size,所以就不对这里的内存做回收了,免得降低性能。
-
为什么BaseConnection内部永远监听读事件,而不是将读事件与写事件互斥监听?
举个例子就明白了.
假设client向server send了一个很大的buf,而当server recv到一半的时候,需要向client send一个很大的buf。
那么如果server此时取消监听读事件,只监听写事件的话,那么如果server send的buf超过了缓冲区大小;而client send的buf由于没有被server继续recv,导致也超过了缓冲区大小。
两边就永久等待对方了。
八. 使用maple的项目
- kpush 开源android push解决方案
九. 开发计划
-
worker支持group放弃支持group。
对于有分组需求的场景,可以将worker进化为broker,在server层进行分组。详见 python/maple/examples/broker_server_demo。如果要支持的话,涉及几个点:
- gateway需要配置cmd与group_id的映射关系,没有指定的cmd对应group_id为0。
- gateway支持重新加载配置的信号,否则修改group_id还要重启gateway。
- pendingMsgs需要按照group_id分组。
- pengingMsgs要针对每个group_id限制最大数量,先进先出。
- gateway对pendingMsgs的统计也要重新处理。 可以按照分为多个stat文件。
- worker在每次请求数据的时候,将对应的group_id传过来。
gateway支持cluster将guard从gateway移到业务层支持cluster之后,将gateway的双epoll改为单epoll,因为gateway已经不再是瓶颈outer和worker支持频率限制,outer的直接从guard迁移过来即可,去掉封号逻辑,而worker则要修改一下,hit成功后要能获取到还剩多少时间经过该周期。speedlimit和timer内部使用毫秒计时