jober/jober

消息队列处理框架,支持客户端Beanstalk、RabbitMQ等


Keywords
rabbitmq, beanstalk, worker, swoole, Jober
License
Apache-2.0

Documentation

Jober 工作界面:

image

产生背景:

项目中经常会有后台运行任务的需求,比如发送邮件时,因为要连接邮件服务器,往往需要5-10秒甚至更长时间,如果能先给用户一个成功的提示信息,然后在后台慢慢处理发送邮件的操作,显然会有更好的用户体验。 当然实际情况远不止这一点,我们可以通过Jober的配合完成 “订单超时关闭、自动评论、QQ邮箱定时发送功能等等”。

Jober 是什么

通过Swoole 官方提供Swoole/Process API才得以完成,Jober 相当于每个子进程。如果你开启8个Jober,那么就是8个进程同时消费。

核心特点

  • 命令行:快速实现消息中间件消费、支持守护进程、常驻内存;
  • 自动加载:基于 PSR-4 ,完全使用 Composer 构建;
  • 模块化:支持 Composer ,可以很方便的使用第三方库;
  • 客户端:支持Beanstalk, 理论上来说还支持RabbitMQ并且可以多进程爬虫等等;
  • Jober 管理器特性:
    • Jober 管理器可单独分配,每个队列分配不同Worker数量完成任务;具体:配置文件Queue
    • 每个Worker进程均有一个定时器,可通过配置文件Crontab(毫秒级) 属性完成;
    • 每个Worker进程完程5000个任务,平滑退出重新添加进程负责这个队列;[ 防止内存泄漏 ]
    • Worker进程异常退出,Master进程自动新建W接管当前队列;
    • Jober 管理器接收到用户STOP信号,逐个平滑退出;
    • Jober 服务已运行状态中,添加新队列并运行;举个栗子:jober new mailerbox:2
    • jober new mailerbox:2,给队列添加进程提高运行效率;

环境要求

  • PHP >= 7.2 语言版本号
  • Swoole >= 4.0.0 通信扩展
  • Seaslog >= 2.0.2 日志扩展

快速开始

推荐使用 composer 安装。

  composer create-project jober/jober

启动Jober:

  $> cd Jober/app
  $> php jober start

如果你足够幸运的话,那么将会看到如下界面:

     _         _                
  _ | |  ___  | |__   ___   _ _ 
 | || | / _ \ | '_ \ / -_) | '_|
  \__/  \___/ |_.__/ \___| |_|  
 
 Jober: 1.0.0, php: 7.2.18, swoole: 4.3.4 
 2019-06-03 11:31:33 Jober[INFO]#> Jober has been started. (master PID: 20103) 
 2019-06-03 11:31:33 Jober[INFO]#> Worker(20104) started ¦ tube(mailerbox) watched ¦ crontab(2/s). 
 2019-06-03 11:31:33 Jober[INFO]#> Worker(20105) started ¦ tube(mailerbox) watched ¦ crontab(2/s). 
 2019-06-03 11:31:33 Jober[INFO]#> Worker(20106) started ¦ tube(mailerbox) watched ¦ crontab(2/s). 
 2019-06-03 11:31:33 Jober[INFO]#> Worker(20107) started ¦ tube(mailerbox) watched ¦ crontab(2/s). 


Jober Command:

Usage: php jober [options]
    Options:
    start/Start debug mode.
    new <queue:worker_nums>/Add multiple processes to the queue.
    start -d/Daemon mode runs services.
    stop/Smooth stop services.
    restart/Smooth stop and started service

Jober 队列配置参数:

tube jobClass worker_num crontab
MailerBox app\\Jobs\\MailboxJob 4 2
Order app\\Jobs\\OrderJob 8 3
队列名称 自定义Job类 数量 扫描频率

Jober DEBUG 参数说明:

worker status tube max_tasks crontab
pid(15438) exited/reboot tube(mailerbox) max_tasks(2) 2
进程号 重启/退出 负责管道/队列(mailerbox) 完成2个任务退出重启 每2秒读取一次

解读上面语句含义:

worker(15438): 负责队列(mailerbox), max(2)完成两个任务后自动退出并重启,自动创建进程接管这个队列.
worker(15438): 负责队列(mailerbox), 2秒查看一次队列,是否有数据需要处理.

邮箱异步发送案例:

<?php
namespace app\Jobs;
use app\service\Mailer as MailerService;
use app\service\MailStats;
use app\utils\MailBox;
use Jober\Colors\ColorText;
use Jober\Core\Console;
use Jober\Core\Log;
use Jober\Iface\IJob;
use xobotyi\beansclient\BeansClient;
use xobotyi\beansclient\Connection;
class MailboxJob implements IJob
{
    /**
     * @return bool
     * @return 标识这个任务处理成功, true - false
     * 每个Worker 处理成功5000个任务,自动退出重新拉取新的Worker (防止内存泄漏)
     * @throws \xobotyi\beansclient\Exception\Connection
     * @throws \xobotyi\beansclient\Exception\Client
     */
    public function doJob(): bool
    {
        $connection = new Connection('127.0.0.1', 11300, 2, true);
        $beansClient = new BeansClient($connection);
        try {
            $job = $beansClient->watchTube('MailerBox')
                ->reserve(3);
            $data = json_decode($job->payload, true);
            if (!empty($data)) {
                if (MailBox::send($data)) {
                    //todo:: 邮件发送逻辑
//                    (new MailerService())->log($data, MailStats::SENT_SUCCESS); # 发送成功记录一条日志
                    Console::println(date('Y-m-d ')." 异步邮件已发送 -> 内容:".$data['body']
                        ,ColorText::FG_CYAN);
                    // 删除队列中的已发送邮件
                    $job->delete();
                    Console::println(date('Y-m-d ')." 异步队列中删除 -> 内容:".$data['to']
                        ,ColorText::FG_RED);
                    return true;
                }
//                (new MailerService())->log($data, MailStats::SENT_ERROR); # 发送失败记录日志
            }
        }
        catch (\Exception $e) {
            /**
             * 1. 异常必须捕捉,报错会不断重启Jober
             * 2. 记录到日志文件
             */
//            Log::error("异常原因:".$e->getMessage());
        }
        return false;
    }
}

License

Apache License Version 2.0, http://www.apache.org/licenses/