reliable, scalable, distributed server framework


License
MIT
Install
pip install maple==4.1.6

Documentation

Maple-分布式服务器框架

一. 概述

I. 解决什么问题

笔者在做游戏服务器的设计时,经历了纯c++到纯python, 再到c++&python的演变流程。最终的设计方案即现在maple。
最重要是要找到几个问题的平衡点:

  1. 服务器性能
  2. 研发速度

maple很好的兼顾了上面几条。
maple于2014年12月在我们自己的业务上线,期间经过多次迭代升级,至今一直在线上稳定运行。
maple与2016年10月底升级到v2.x,支持集群(cluster),并部署在外网运行。

II. 模块简介

maple基本框架为 gateway + worker + trigger + forwarder(仅cluster模式需要)。

  • gateway
    使用c++编写,内部使用epoll驱动。
    负责客户端连接管理,以及对worker的任务派发。
    支持连接数限制,包大小限制,频率限制、最长不活跃时间。
    内置tool_stat统计工具,支持统计连接数、worker数、处理时长等。
  • worker
    使用python编写。
    负责向gateway索要任务,以及消息与处理函数的映射等。
    与gateway使用主动索要任务的方式通信,有效实现负载均衡。
    支持安全停止和安全重启。
  • trigger
    使用python编写。
    类似触发器的设计,通过向gateway发送消息,目标可以是worker,也可以直接是client。
    发送消息时,支持如果发生失败或连接失败,会循环尝试至成功为止。
  • forwarder
    使用c++编写,内部使用epoll驱动,存储使用redis。
    cluster模式时,作为trigger与多gateway之间的转发server。

二. 部署运行

I. gateway部署运行

  1. 部署环境

    目前gateway仅在centos6.4/6.5 X64上编译测试过,主要公司服务器都是用这个版本,也没有过多精力去测试其他版本。

  2. 安装步骤

    • 拉取代码

      git clone https://github.com/dantezhu/maple
      
    • 拉取依赖代码

      cd maple 
      git submodule update --init
      
    • 编译

      cd cpp; make require; make all
      
    • 等待编译完成,在gateway/bin目录下将会有如下文件列表
      • gateway 程序主文件
      • tool_stat 统计工具
  3. 配置文件

    etc 目录下的 config.ini 是配置文件模板,内容如下:

    [outer]
    ; 绑定IP,支持批量,逗号分隔
    host=0.0.0.0
    port=29000
    ; listen函数的backlog参数,默认0。
    backlog=512
    ; 空闲超时(float),长期没收到消息被认为连接超时,默认-1。-1代表永不超时
    timeout=-1
    
    ; 初始读取buf大小,默认0。<=0代表从1开始,且延时分配;>0则立即分配
    recv_buf_init_size=1024
    ; 最大读取buf大小,默认-1。-1代表无限
    recv_buf_max_size=-1
    
    ; 最大发送buf大小,默认-1。-1代表无限
    send_buf_max_size=-1
    
    ; 最大连接数,默认-1。-1代表无限
    conns_max_size=500000
    
    ; 最大连接后备数,默认0。-1代表无限
    conns_reserve_max_size=0
    
    ; 可以防御针对协议的CC攻击
    ; 不需要配置IP的请求量限制,因为如果要攻击的话,要不就是每个conn都大量发送消息,要不就是每个conn发几个消息就关闭连接,然后再重新建立连接。
    ; 格式为: 请求数/秒数,请求数=-1为不限制
    ; 中间以英文逗号分隔,不同配置之间是 或 的关系
    ; 不配置为不限制
    ; 未登录连接请求最大频率
    unauthed_conn_req_max_rate=-1/10,-1/60
    ; 已登录连接请求频率限制
    authed_conn_req_max_rate=-1/10
    
    ; 可以防御CC攻击或者连接数消耗攻击
    ; 格式为: 单个IP建立连接的数量/秒数,连接数=-1为不限制
    ; 中间以英文逗号分隔,不同配置之间是 或 的关系
    ; 不配置为不限制
    ; IP新建连接最大频率
    ip_conn_create_max_rate=-1/10
    ; 启动后延迟多少秒开始进行检查IP。 不配置默认为0
    ; 主要为了解决重启gateway后,用户大量重连的问题,比如一个移动基站下会有很多用户IP都会瞬间重连进来
    ip_rate_check_delay=0
    
    ; 白名单IP列表,对conn和ip频率限制都生效。格式: 127.0.0.1,192.168.1.1
    rate_unlimited_ip_list=
    
    [inner]
    ; 绑定IP,支持批量,逗号分隔
    host=127.0.0.1
    port=28000
    backlog=512
    timeout=-1
    
    recv_buf_init_size=1024
    recv_buf_max_size=-1
    
    send_buf_max_size=-1
    
    conns_max_size=-1
    
    conns_reserve_max_size=0
    
    tasks_max_size=-1
    
    ; 分派任务最大频率
    task_alloc_max_rate=-1/10
    
    [node]
    ; cluster
    ; 节点ID(0~32767)。 >0 代表开启cluster部署。务必确保每个节点ID不重复
    id=1
    
    [store]
    ; cluster
    host=127.0.0.1
    port=6379
    ; 连接、接收超时(float),1为永不超时。如果超时会断掉连接,重新连接
    timeout=10
    ; 密码
    password=
    ; 存储database
    database=0
    ; 存储的key统一前缀
    key_prefix=maple:
    ; 等待处理的任务的最大值。-1代表无限
    tasks_max_size=-1
    ; 存储用户最长时间(秒)。-1代表永远不过期。这里不用配置特别短,主要是为了解决gateway重启,数据不会失效的问题
    user_max_age=604800
    ; user续期间隔因子(float),默认为0.5。-1代表不主动续期。即 续期间隔=user_max_age * factor
    user_renew_timeout_factor=0.5
    
    [log]
    ; debug: 调试阶段使用,用来排查问题。
    ; info: 一些异常的数据,或者需要告知一下的信息。
    ; warn: 触发了某些限制,或者放任不管可能导致更严重的问题。
    ; error: 是程序的问题,并且严重等级较高,需要研发进行处理;或者是想记录在日志中的重要信息。
    log_level=error
    ; 日志类型,默认0
    ; 0: 固定文件最大数量
    ; 1: 按日期分日志,每天也会固定最大文件数量
    ; 2: 按小时,其他同上
    log_type=0
    ; log_dir 是相对于执行目录
    log_dir=logs
    file_prefix=gw
    ; 文件大小, 单位是M
    file_max_size=1024
    file_max_num=20
    
    [stat]
    file_name=stat_file
    

    配置都有注释,就不过多解释了

  4. 运行

    • 启动gateway(在主目录下):

       ./bin/gateway -c etc/config.ini
      
    • 查看统计(具体命令可以通过-h选项查看)

      ./tool_stat -f stat_file
      
    • 统计示例为:

      CLIENTS                                  : 849
      AUTHED_CLIENTS                           : 834
      TRIGGERS                                 : 1
      WORKERS                                  : 16
      IDLE_WORKERS                             : 16
      STORES                                   : 1
      CONNECTED_STORES                         : 1
      CLIENT_REQ                               : 3628425
      CLIENT_CREATED                           : 243636
      CLIENT_CLOSED                            : 242787
      CLIENT_TIMEOUT                           : 2059
      CLIENT_CONN_OVERFLOW                     : 0
      CLIENT_SEND_BUF_OVERFLOW                 : 0
      CLIENT_RECV_BUF_OVERFLOW                 : 0
      CLIENT_PENDING_SEND_BUF                  : 9683
      CLIENT_CREATE_RATE_LIMITED               : 0
      CLIENT_UNAUTHED_REQ_RATE_LIMITED         : 0
      CLIENT_AUTHED_REQ_RATE_LIMITED           : 0
      INNER_REQ                                : 16713759
      INNER_CREATED                            : 33
      INNER_CLOSED                             : 16
      INNER_TIMEOUT                            : 0
      INNER_CONN_OVERFLOW                      : 0
      INNER_TASK_OVERFLOW                      : 0
      INNER_SEND_BUF_OVERFLOW                  : 0
      INNER_RECV_BUF_OVERFLOW                  : 0
      INNER_PENDING_TASKS                      : 0
      INNER_PENDING_SEND_BUF                   : 0
      INNER_TASK_ALLOC_RATE_LIMITED            : 0
      STORE_REQ                                : 40955
      STORE_RSP                                : 40955
      STORE_RSP_FAIL                           : 0
      STORE_CONNECT                            : 1
      STORE_CONNECTED                          : 1
      STORE_CLOSED                             : 0
      STORE_TIMEOUT                            : 0
      STORE_TASK_OVERFLOW                      : 0
      STORE_PENDING_TASKS                      : 0
      CMD_WORKER_ASK_FOR_TASK                  : 4285136
      CMD_WRITE_TO_CLIENT                      : 3628504
      CMD_WRITE_TO_USERS                       : 8604539
      CMD_CLOSE_CLIENT                         : 12
      CMD_CLOSE_USERS                          : 8
      CMD_LOGIN_CLIENT                         : 23777
      CMD_LOGOUT_CLIENT                        : 1499
      CMD_WRITE_TO_WORKER                      : 170272
      CMD_CLEAR_CLIENT_TASKS                   : 12
      TASK_TIME_1_MS                           : 4134495
      TASK_TIME_3_MS                           : 118357
      TASK_TIME_5_MS                           : 13474
      TASK_TIME_10_MS                          : 12537
      TASK_TIME_30_MS                          : 5628
      TASK_TIME_50_MS                          : 515
      TASK_TIME_100_MS                         : 53
      TASK_TIME_300_MS                         : 45
      TASK_TIME_500_MS                         : 0
      TASK_TIME_1_S                            : 0
      TASK_TIME_3_S                            : 0
      TASK_TIME_5_S                            : 0
      TASK_TIME_10_S                           : 0
      TASK_TIME_MORE                           : 0
      STORE_TIME_1_MS                          : 39227
      STORE_TIME_3_MS                          : 963
      STORE_TIME_5_MS                          : 308
      STORE_TIME_10_MS                         : 295
      STORE_TIME_30_MS                         : 146
      STORE_TIME_50_MS                         : 11
      STORE_TIME_100_MS                        : 1
      STORE_TIME_300_MS                        : 3
      STORE_TIME_500_MS                        : 0
      STORE_TIME_1_S                           : 0
      STORE_TIME_3_S                           : 0
      STORE_TIME_5_S                           : 0
      STORE_TIME_10_S                          : 0
      STORE_TIME_MORE                          : 0
      
    • 统计项说明:

        CLIENTS                            // client数量
        AUTHED_CLIENTS                     // 已登录client数量
      
        TRIGGERS                           // trigger数量
      
        WORKERS                            // worker数量
        IDLE_WORKERS                       // 空闲worker数量
      
        STORES                             // store数量
        CONNECTED_STORES                   // store连接数量
      
        CLIENT_REQ                         // client请求次数
        CLIENT_CREATED                     // client连接建立次数
        CLIENT_CLOSED                      // client连接断开次数
        CLIENT_TIMEOUT                     // client连接超时次数
        CLIENT_CONN_OVERFLOW               // 超过client数量次数
        CLIENT_SEND_BUF_OVERFLOW           // 超过client等待发送的buf大小而被抛弃的次数
        CLIENT_RECV_BUF_OVERFLOW           // 超过client接收的buf大小而被抛弃的次数
        CLIENT_PENDING_SEND_BUF            // 所有client等待发送的buf大小
        CLIENT_CREATE_RATE_LIMITED         // 触发建立连接频率限制的次数
        CLIENT_UNAUTHED_REQ_RATE_LIMITED   // 触发未登录client请求频率限制的次数
        CLIENT_AUTHED_REQ_RATE_LIMITED     // 触发已经登录client请求频率限制的次数
      
        INNER_REQ                          // inner请求次数,包含所有命令字
        INNER_CREATED                      // inner连接建立次数
        INNER_CLOSED                       // inner连接断开次数
        INNER_TIMEOUT                      // inner连接超时次数
        INNER_CONN_OVERFLOW                // 超过inner数量次数
        INNER_TASK_OVERFLOW                // 超过转发给inner任务队列大小而被丢弃的数量
        INNER_SEND_BUF_OVERFLOW            // 超过inner等待发送的buf大小而被抛弃的次数
        INNER_RECV_BUF_OVERFLOW            // 超过inner接收的buf大小而被抛弃的次数
        INNER_PENDING_TASKS                // 等待转发给inner的任务数
        INNER_PENDING_SEND_BUF             // 所有inner等待发送的buf大小
        INNER_TASK_ALLOC_RATE_LIMITED      // 触发inner任务分配频率限制的次数
      
        STORE_REQ                          // store发出的请求量,必须是真的发送(包括select database)
        STORE_RSP                          // store收到的请求量,包括所有的req对应的回应
        STORE_RSP_FAIL                     // store收到的失败请求量
        STORE_CONNECT                      // store连接建立尝试
        STORE_CONNECTED                    // store连接建立成功
        STORE_CLOSED                       // store连接被关闭,包括未连接成功的状态下close,所以值可能大于STORE_CONNECTED
        STORE_TIMEOUT                      // store连接超时(包括connect和recv)
        STORE_TASK_OVERFLOW                // 超过store任务队列大小而被丢弃的数量
        STORE_PENDING_TASKS                // 等待发送到store的任务
      
        CMD_WORKER_ASK_FOR_TASK            // CMD 开头都是命令字
      
        TASK_TIME_1_MS                     // 任务处理时长统计
      
        STORE_TIME_1_MS                    // store处理时长统计
      
  5. supervisor配置

    [program:gateway]
    directory=/data/release/gateway/
    command=/data/release/gateway/bin/gateway -c etc/config.ini
    user=root
    autorestart=true
    redirect_stderr=true
    

II. worker、trigger部署运行

  1. 安装

    pip install maple
    

    maple 依赖 netkit

  2. 模块引入

    from maple import Worker, Blueprint, Trigger
    
  3. 代码示例

    app.py(worker):

    from maple import Worker
    from netkit.box import Box
    import user
    
    app = Worker(Box)
    app.register_blueprint(user.bp)
    
    @app.route(2)
    def login(request):
        uid = request.box.get_json()["uid"]
        request.login_client(uid)
        # request.logout_client()
        request.write_to_client(dict(
            ret=0,
            body="login %s" % uid
        ))
    
    app.run("192.168.1.67", 28000, workers=2, debug=True)
    

    user.py(blueprint):

    from maple import Blueprint
    
    bp = Blueprint('user')
    
    @bp.route(100)
    def test(request):
        request.write_to_client(dict(
            ret=0
        ))
        return
    

    trigger.py(trigger):

    from maple import Trigger
    from netkit.box import Box
    
    def main():
        trigger = Trigger(Box, '192.168.1.67', 28000)
    
        print trigger.write_to_worker(dict(
            cmd=3,
            ret=100,
            body='from trigger: %s' % it
        ))
        # print trigger.close_users([constants.CONNS_AUTHED], 0)
        # print trigger.write_to_users([
        #     ([constants.CONNS_ALL], dict(cmd=1, body='direct event from trigger'), 0, [1, 2])
        # ])
    
    
    main()
    

    更多功能的代码可参见 examples

  4. 安全停止和安全重启

    • 安全停止

      kill -TERM $pid
      
    • 安全重启

      kill -HUP $pid
      
    • 强制停止(不建议)

      kill -INT $pid
      kill -QUIT $pid
      
  5. supervisor配置

    • 使用maple自身作为父进程进行管理:

      [program:maple_worker]
      environment=PYTHON_EGG_CACHE=/tmp/.python-eggs/
      directory=/maple/normal_test/
      command=/usr/local/bin/python worker.py
      user=dantezhu
      autorestart=true
      redirect_stderr=true
      stopsignal=TERM
      stopwaitsecs=20
      

      优点:

      • supervisor 控制台只看到一个父进程,管理比较简单

      缺点:

      • 没有什么明显缺点。只要记得配置worker.stop_timeout,就可以正常KILL掉没有及时停止的子进程
    • 使用supervisor作为父进程进行管理:

      [program:maple_worker]
      environment=PYTHON_EGG_CACHE=/tmp/.python-eggs/,MAPLE_WORKER=true
      directory=/maple/normal_test/
      command=/usr/local/bin/python worker.py
      numprocs=2
      process_name=%(program_name)s_%(process_num)02d
      user=dantezhu
      autorestart=true
      redirect_stderr=true
      stopsignal=TERM
      stopwaitsecs=10
      

      优点:

      • 可以直接看到所有进程的状态
      • 一旦worker stop超时,supervisor会通过 kill -9 将worker强制杀死,不会留下僵尸进程

      缺点:

      • 如果进程数太多,在supervisor控制台会看到一堆进程

III. forwarder部署运行

  1. 部署环境

    目前forwarder仅在centos6.4/6.5 X64上编译测试过。

  2. 安装步骤

    之前编译gateway的时候,其实forwarder已经一起编译出来了,在 forwarder/bin 下有如下文件列表:

    • forwarder 程序主文件
    • tool_stat 统计工具
  3. 配置文件

    etc 目录下的 config.ini 是配置文件模板,内容如下:

    [outer]
    ; 绑定IP,支持批量,逗号分隔
    host=0.0.0.0
    port=25000
    ; listen函数的backlog参数,默认0。
    backlog=512
    ; 空闲超时(float),长期没收到消息被认为连接超时,默认-1。-1代表永不超时
    timeout=-1
    
    ; 初始读取buf大小,默认0。<=0代表从1开始,且延时分配;>0则立即分配
    recv_buf_init_size=1024
    ; 最大读取buf大小,默认-1。-1代表无限
    recv_buf_max_size=-1
    
    ; 最大连接数,默认-1。-1代表无限
    conns_max_size=-1
    
    ; 最大连接后备数,默认0。-1代表无限
    conns_reserve_max_size=0
    
    [node]
    ; 节点列表。格式: node_id:node_inner_host:node_inner_port,node_id:node_inner_host:node_inner_port
    ; 支持 kill -hup 热加载
    addr_list=1:127.0.0.1:28000
    
    ; 连接超时(float),-1代表永不超时。如果超时会断掉连接,重新连接
    timeout=10
    
    ; 等待发送buf的最大值。-1代表无限
    send_buf_max_size=-1
    
    [store]
    host=127.0.0.1
    port=6379
    ; 连接、接收超时(float),1为永不超时。如果超时会断掉连接,重新连接
    timeout=10
    ; 密码
    password=
    ; 存储database
    database=0
    ; 存储的key统一前缀
    key_prefix=maple:
    ; 等待处理的任务的最大值。-1代表无限
    tasks_max_size=-1
    
    [log]
    ; debug: 调试阶段使用,用来排查问题。
    ; info: 一些异常的数据,或者需要告知一下的信息。
    ; warn: 触发了某些限制,或者放任不管可能导致更严重的问题。
    ; error: 是程序的问题,并且严重等级较高,需要研发进行处理;或者是想记录在日志中的重要信息。
    log_level=error
    ; 日志类型,默认0
    ; 0: 固定文件最大数量
    ; 1: 按日期分日志,每天也会固定最大文件数量
    ; 2: 按小时,其他同上
    log_type=0
    ; log_dir 是相对于执行目录
    log_dir=logs
    file_prefix=fw
    ; 文件大小, 单位是M
    file_max_size=1024
    file_max_num=20
    
    [stat]
    file_name=stat_file
    

    配置都有注释,就不过多解释了

  4. 运行

    • 启动forwarder(在主目录下):

      ./bin/forwarder -c etc/config.ini
      
    • 重新加载 node_list

      kill -hup $pid
      
    • 查看统计(具体命令可以通过-h选项查看)

      ./tool_stat -f stat_file
      
    • 统计示例为:

      TRIGGERS                                 : 115
      STORES                                   : 115
      CONNECTED_STORES                         : 115
      NODES                                    : 2
      CONNECTED_NODES                          : 2
      TRIGGER_REQ                              : 13854829
      TRIGGER_CREATED                          : 293
      TRIGGER_CLOSED                           : 178
      TRIGGER_TIMEOUT                          : 0
      TRIGGER_CONN_OVERFLOW                    : 0
      STORE_REQ                                : 9541107
      STORE_RSP                                : 9541107
      STORE_RSP_FAIL                           : 0
      STORE_CONNECT                            : 293
      STORE_CONNECTED                          : 293
      STORE_CLOSED                             : 178
      STORE_TIMEOUT                            : 0
      STORE_TASK_OVERFLOW                      : 0
      STORE_PENDING_TASKS                      : 0
      NODE_REQ                                 : 13734384
      NODE_CONNECT                             : 2670
      NODE_CONNECTED                           : 4
      NODE_CLOSED                              : 2668
      NODE_TIMEOUT                             : 0
      NODE_SEND_BUF_OVERFLOW                   : 0
      NODE_PENDING_SEND_BUF                    : 0
      CMD_WRITE_TO_CLIENT                      : 3973486
      CMD_WRITE_TO_USERS                       : 9579355
      CMD_CLOSE_CLIENT                         : 12
      CMD_CLOSE_USERS                          : 19
      CMD_LOGIN_CLIENT                         : 26374
      CMD_LOGOUT_CLIENT                        : 1756
      CMD_WRITE_TO_WORKER                      : 273815
      CMD_CLEAR_CLIENT_TASKS                   : 12
      STORE_TIME_1_MS                          : 9409169
      STORE_TIME_3_MS                          : 53399
      STORE_TIME_5_MS                          : 27652
      STORE_TIME_10_MS                         : 32027
      STORE_TIME_30_MS                         : 17472
      STORE_TIME_50_MS                         : 667
      STORE_TIME_100_MS                        : 50
      STORE_TIME_300_MS                        : 378
      STORE_TIME_500_MS                        : 0
      STORE_TIME_1_S                           : 0
      STORE_TIME_3_S                           : 0
      STORE_TIME_5_S                           : 0
      STORE_TIME_10_S                          : 0
      STORE_TIME_MORE                          : 0
      
    • 统计项说明:

        TRIGGERS                   // trigger数量
      
        STORES                     // store数量
        CONNECTED_STORES           // store连接数量
      
        NODES                      // node数量
        CONNECTED_NODES            // node连接数量
      
        TRIGGER_REQ                // 来自trigger的请求数
        TRIGGER_CREATED            // trigger连接建立
        TRIGGER_CLOSED             // trigger连接断开
        TRIGGER_TIMEOUT            // trigger连接超时
        TRIGGER_CONN_OVERFLOW      // 超过trigger数量次数
      
        STORE_REQ                  // store发出的请求量,必须是真的发送(包括select database)
        STORE_RSP                  // store收到的请求量,包括所有的req对应的回应
        STORE_RSP_FAIL             // store收到的失败请求量
        STORE_CONNECT              // store连接建立尝试
        STORE_CONNECTED            // store连接建立成功,之所以不单独统计connect fail,是因为不是每次都能正常触发
        STORE_CLOSED               // store连接被关闭,包括未连接成功的状态下close,所以值可能大于STORE_CONNECTED
        STORE_TIMEOUT              // store连接超时(包括connect和recv)
        STORE_TASK_OVERFLOW        // 超过store任务队列大小而被丢弃的数量
        STORE_PENDING_TASKS        // 等待发送到store的任务
      
        NODE_REQ                   // node发出的请求量
        NODE_CONNECT               // node连接建立尝试
        NODE_CONNECTED             // node连接建立成功
        NODE_CLOSED                // node连接关闭,有可能是在尚未连接成功的状态下close
        NODE_TIMEOUT               // node连接超时(包括connect)
        NODE_SEND_BUF_OVERFLOW     // 超过发送buf大小而被抛弃的次数
        NODE_PENDING_SEND_BUF      // 等待发送到node的buf大小
      
        CMD_WRITE_TO_CLIENT        // CMD开头都是命令字
      
        STORE_TIME_1_MS            // store处理时长统计
      
  5. supervisor配置

    [program:forwarder]
    directory=/data/release/forwarder/
    command=/data/release/forwarder/bin/forwarder -c etc/config.ini
    user=root
    autorestart=true
    redirect_stderr=true
    

三. API介绍

I. Worker

  • Worker(box_class)
    构造函数

    • box_class 用来做网络包收发,可用

      from netkit.box import Box
      
  • register_blueprint(self, blueprint)
    注册blueprint

    • blueprint blueprint
  • run(host=None, port=None, debug=None, workers=None)
    启动worker

    • host gateway的inner host
    • port gateway的inner port
    • debug 是否debug,默认False
    • workers workers数量,默认为1
  • route(cmd)
    装饰器。注册cmd的处理函数。被注册函数格式为:

    def f(request):pass
    
    • cmd 命令字
  • create_worker(f)
    装饰器。当worker创建时触发。被注册函数格式为:

    def f(): pass
    
  • before_first_request
    装饰器。当第一个请求到来时触发。被注册函数格式为:

    def f(request): pass
    
  • before_first_request
    装饰器。当第一个请求到来时触发。被注册函数格式为:

    def f(request): pass
    
  • before_first_request
    装饰器。当第一个请求到来时触发。被注册函数格式为:

    def f(request): pass
    
  • before_request
    装饰器。当请求到来时触发。被注册函数格式为:

    def f(request): pass
    
  • after_request
    装饰器。当请求处理结束时触发。被注册函数格式为:

    def f(request): pass
    
  • before_response
    装饰器。当发送响应前触发。被注册函数格式为:

    def f(conn, response): pass
    

    其中 response 为最终的字节流格式(str)

  • after_response
    装饰器。当第一个请求到来时触发。被注册函数格式为:

    def f(conn, response, result): pass
    

    其中 result 为 True/False,分别代表发送成功/失败

  • close_conn
    装饰器。当worker与gateway的连接被关闭时触发。被注册函数格式为:

    def f(conn): pass
    
  • create_client
    装饰器。当客户端连接建立时触发。被注册函数格式为:

    def f(request): pass
    
  • close_client
    装饰器。当客户端连接被关闭时触发。被注册函数格式为:

    def f(request): pass
    
  • work_timeout
    任务处理最长时间,如果超过该值,子进程会自杀。
    默认为None,即不超时

  • stop_timeout
    进程停止超时。
    当使用 TERM 命令进行停止时,如果超过stop_timeout,那么就会给子进程强制发送KILL命令

  • rsp_once
    write_to_client 可以调用几次。如果为True代表只能调用一次,多次调用会抛出异常。
    默认为True

II. Blueprint

  • Blueprint(name)
    构造函数

    • name blueprint的名字。在request.endpoint中有用
  • 其他装饰器等

    与Worker中基本相同,并多了before_app_request这种app层级的回调。 顺序基本是:

    app.before_request
    blueprint.before_app_request
    blueprint.before_request
    f()
    blueprint.after_request
    blueprint.after_app_request
    app.after_request
    

III. Request

  • login_client(uid, userdata=None)
    标记用户为已登录

    • uid 用户ID
    • userdata 用户数据,主要存储标记位
  • logout_client()
    标记当前连接为未登录

  • write_to_client(data)
    写回响应,返回的cmd和sn均与传入的一致。
    data的格式可以为 Box,也可以为 dict。如果是dict,将会传入至Box的map函数中返回一个新box。

  • write_to_users(data_list)
    批量向用户组发送消息

    • data_list
      [(uids, box), (uids, box, userdata), (uids, box, userdata, exclude) ...] uid支持特殊值:

      CONNS_AUTHED : 所有已登录连接 CONNS_ALL : 所有连接 CONNS_UNAUTHED : 所有未登录连接

    userdata可不传,默认为0,conn.userdata & userdata == userdata exclude 代表排除的uid列表

  • write_to_worker(data)
    发送消息到worker进行处理

    • data 可以为 box 或者 dict
  • close_client()
    关闭当前连接

  • close_users(uids, userdata=None, exclude=None)
    批量关闭用户连接,uid同样支持特殊负值

  • clear_client_tasks()
    清空该连接产生的所有task。
    一般在client有大量异常消息时使用。

  • task
    gateway包装过的任务box

    • uid 登录用户ID,>0 为已登陆
    • userdata login_client 时传入的userdata
    • inner False代表从client端来的消息,True代表从trigger来的消息
  • cmd
    消息命令字,即box.cmd

  • endpoint
    如果是app级别,就是func_name;如果blueprint级别,就是blueprint_name.func_name

  • box
    消息box

  • interrupt
    是否中断处理,即不执行对应的处理函数直接返回,默认为False. 比如在before_request检验权限不符合,即可设置为False.

IV. Trigger

事件触发,其实在gateway端,trigger和worker是同一个连接类型。

注意: trigger是线程安全的,内部实现了加锁

其实trigger就是一个触发器,触发worker,也可以触发client。
但是常用的方法是,触发worker,让worker去做对应的事情,这样可以保证逻辑的独立

  • Trigger(box_class, host, port, ensure=True)
    构造函数

    • box_class 上面已解释
    • host gateway的inner host
    • port gateway的inner port
    • ensure 确保发送成功,只要连接失败或者发送失败就会一直重试。默认为True
  • write_to_users(data_list)
    与 request.write_to_users一致

  • write_to_worker(data, node_id=None)
    发送消息到worker进行处理

    • data 可以为 box 或者 dict
    • node_id 如果连接的是forwarder的话,node_id可以指定希望转发到的node
  • close_users(uids, userdata=None, exclude=None)
    与 request.close_users 一致

四. 技术实现

I. gateway

gateway使用epoll模型,单进程单线程。

绑定两个端口,分别接入外网连接和内网连接。

外网连接即客户端连接。

内网连接里面分worker和trigger,worker是可以接受任务和返回消息的,而trigger只能返回消息。

trigger可以升级为worker,在跟gateway 索要 task 后。

所有的数据buf,都经过一层封装,再转发出去。
所以,创建一个新的box类型: Task

多了几个字段: node_id, tag, conn_id, uid, userdata, client_ip_num 等

  • node_id 指定client所属的node,如果非cluster模式,应该为0
  • tag 为了防止gw重启导致conn_id重新从0计数,从而出现混淆
  • conn_id 连接ID,自增的long long类型,确保是唯一的
  • uid 登录的用户id
  • userdata 登录用户的属性
  • client_ip_num 客户端ip的int型表示(使用网络序),worker端可以通过 request.task.client_ip 来取到对应的字符串类型

在发送数据时,判断连接是否match的规则是:

  1. 判断 node_id 是否相等,确保task与gateway属于同一node。
  2. 判断 tag 是否相等,确保gateway没有重启过
  3. 判断 conn_id 对应的conn是否存储并合法

II. worker

worker使用单进程单线程的模型,并采取主动向gateway索要任务的方式。这样可以有效的保证负载均衡。

worker中比较细节的是对信号的处理,具体可以参看代码,这里不再赘述.

III. forwarder

forwarder使用epoll模型,单进程单线程。

为了能够异步使用hiredis库,在hiredis的异步接口上做了一层epoll接口的封装。

forwarder的设计上,需要确保的几个关键点在于:

  1. 同一个trigger发过来的消息,必须保证是同样的顺序发往gateway
  2. 不同trigger之间,不能互相影响

对于第一点,就要求单个trigger的消息处理必须有序,不能并发。

对于第二点,则要求多个trigger之间不能共享消息处理类。
比如trigger有时候需要访问store来获取转发的node列表(write_to_users/close_users),此时就不能使用store连接池。

所以最终的实现是,当某个trigger建立连接后,就会自动为它创建一条与store的连接,并仅供该trigger使用。

而当trigger连接被关闭时,如果store的任务也已经全部处理完,则store的连接也一起关闭;如果此时store还有任务要处理,则不跟随trigger一起关闭,而是等任务处理完后,自行关闭。

所以,我们使用trigger应该尽量使用长连接的方式,并且当前python代码中的trigger也是这么实现的。

五. 最佳实践

  1. 集群(cluster)部署

    cluster模式下,我们按照node来划分区域,每个node内部包含 一个gateway + 一组worker。
    所以,gateway的配置中有一个 node.id,就是用来指定自己所属于的node。
    当然,这个node更多的是逻辑上的区分,在客户端看来,所有用户是共服的。

    forwarder会作为转发server,负责将发送给client的消息,按照node_id转发到对应的node内部的gateway上。

    而由于forwarder本身是无状态的,所以本身如果需要扩容,也可以直接平行扩容。

    当然,如果要使用forwarder的话,那么类似 request.write_to_users/request.close_users 这种函数调用就都不能使用了,取而代之的是 trigger.write_to_users/trigger.close_users,或者使用 virtual_request 来模拟request。
    具体代码参看 cluster_demo

  2. 防御攻击

    gateway内部配有简单的针对单个连接请求频率的限制,以及单个IP建立连接的频率限制。

    但是达到频率限制之后,并不会对连接或者IP有任何惩罚处理。

    所以我编写了 maple_guard 来进行异步的防御,即检测到恶意连接或者IP,就会进行下线并封IP的处理。

    但是因为该工具是异步处理,所以gateway本身的频率控制还是要配置,只是可以稍微宽松一点。

六. 相关工具

  1. maple_timer 打印每个命令字的处理时长比例
  2. xstat 上报worker统计数据至statsd
  3. maple_cluster_statsd 上报gateway/forwarder统计数据至statsd
  4. maple_guard 为gateway加强防御攻击的能力

七. 特殊说明

  1. 由于部署gateway的机器要直连公网,所以务必不要将tcp_tw_recycle和tcp_timestamps同时开启,否则会导致connect失败的问题

  2. 目前gateway编译使用的protobuf为2.5.0,同理maple依赖的protobuf需要>=2.5.0。而貌似2.6.0以上版本执行setup.py会访问墙外的地址,所以国内执行 pip install protobuf==2.5.0 即可。

  3. 为什么gateway上大量client连接建立再断开,内存上涨之后不下降呢?

    本来以为是conns_reserve_max_size的问题,但即使设置 conns_reserve_max_size=0,依然如此。

    做了个测试,再次建立同样多的连接,会发现内存并没有上涨。

    最后确定了原因是因为 deque 在pop_front之后并没有把deque内部分配的内存释放。 也就是说如果 m_pendingTasks size 达到过 1w,那么即使pop_front完已经empty,占用的内存依然和 1w 个元素一样多。 测试代码如下:

    int main(int argc, char **argv) {
        deque<int> d;
        printf("before alloc\n");
        for(int i=0; i<=10000000; i++) {
            d.push_back(i);
        }
        printf("after alloc\n");
    
        printf("sleep begin\n");
        sleep(10);  // 40m
        printf("sleep end\n");
    
        printf("before free\n");
        while(1) {
            if (d.front()) {
                d.pop_front();
            }
            else {
                break;
            }
        }
        printf("after free\n");
    
        printf("sleep begin\n");
        sleep(10); // 40m
        printf("sleep end\n");
    
        return 0;
    }
    

    网上有很多方法来解决这个问题,比如 swap, shrink_to_fit 等等,但我们的config中可以指定 tasks_max_size,所以就不对这里的内存做回收了,免得降低性能。

  4. 为什么BaseConnection内部永远监听读事件,而不是将读事件与写事件互斥监听?

    举个例子就明白了.

    假设client向server send了一个很大的buf,而当server recv到一半的时候,需要向client send一个很大的buf。

    那么如果server此时取消监听读事件,只监听写事件的话,那么如果server send的buf超过了缓冲区大小;而client send的buf由于没有被server继续recv,导致也超过了缓冲区大小。

    两边就永久等待对方了。

八. 使用maple的项目

  1. kpush 开源android push解决方案

九. 开发计划

  1. worker支持group

    放弃支持group。
    对于有分组需求的场景,可以将worker进化为broker,在server层进行分组。详见 python/maple/examples/broker_server_demo。

    如果要支持的话,涉及几个点:

    1. gateway需要配置cmd与group_id的映射关系,没有指定的cmd对应group_id为0。
    2. gateway支持重新加载配置的信号,否则修改group_id还要重启gateway。
    3. pendingMsgs需要按照group_id分组。
    4. pengingMsgs要针对每个group_id限制最大数量,先进先出。
    5. gateway对pendingMsgs的统计也要重新处理。 可以按照分为多个stat文件。
    6. worker在每次请求数据的时候,将对应的group_id传过来。
  2. gateway支持cluster
  3. 将guard从gateway移到业务层
  4. 支持cluster之后,将gateway的双epoll改为单epoll,因为gateway已经不再是瓶颈
  5. outer和worker支持频率限制,outer的直接从guard迁移过来即可,去掉封号逻辑,而worker则要修改一下,hit成功后要能获取到还剩多少时间经过该周期。
  6. speedlimit和timer内部使用毫秒计时