Jober 工作界面:
产生背景:
项目中经常会有后台运行任务的需求,比如发送邮件时,因为要连接邮件服务器,往往需要5-10秒甚至更长时间,如果能先给用户一个成功的提示信息,然后在后台慢慢处理发送邮件的操作,显然会有更好的用户体验。 当然实际情况远不止这一点,我们可以通过Jober的配合完成 “订单超时关闭、自动评论、QQ邮箱定时发送功能等等”。
Jober 是什么
通过Swoole 官方提供Swoole/Process API才得以完成,Jober 相当于每个子进程。如果你开启8个Jober,那么就是8个进程同时消费。
核心特点
- 命令行:快速实现消息中间件消费、支持守护进程、常驻内存;
- 自动加载:基于 PSR-4 ,完全使用 Composer 构建;
- 模块化:支持 Composer ,可以很方便的使用第三方库;
- 客户端:支持Beanstalk, 理论上来说还支持RabbitMQ并且可以多进程爬虫等等;
- Jober 管理器特性:
- Jober 管理器可单独分配,每个队列分配不同Worker数量完成任务;具体:配置文件Queue
- 每个Worker进程均有一个定时器,可通过配置文件Crontab(毫秒级) 属性完成;
- 每个Worker进程完程5000个任务,平滑退出重新添加进程负责这个队列;[ 防止内存泄漏 ]
- Worker进程异常退出,Master进程自动新建W接管当前队列;
- Jober 管理器接收到用户STOP信号,逐个平滑退出;
- Jober 服务已运行状态中,添加新队列并运行;举个栗子:jober new mailerbox:2
- jober new mailerbox:2,给队列添加进程提高运行效率;
环境要求
- PHP >= 7.2 语言版本号
- Swoole >= 4.0.0 通信扩展
- Seaslog >= 2.0.2 日志扩展
快速开始
推荐使用 composer 安装。
composer create-project jober/jober
启动Jober:
$> cd Jober/app
$> php jober start
如果你足够幸运的话,那么将会看到如下界面:
_ _
_ | | ___ | |__ ___ _ _
| || | / _ \ | '_ \ / -_) | '_|
\__/ \___/ |_.__/ \___| |_|
Jober: 1.0.0, php: 7.2.18, swoole: 4.3.4
2019-06-03 11:31:33 Jober[INFO]#> Jober has been started. (master PID: 20103)
2019-06-03 11:31:33 Jober[INFO]#> Worker(20104) started ¦ tube(mailerbox) watched ¦ crontab(2/s).
2019-06-03 11:31:33 Jober[INFO]#> Worker(20105) started ¦ tube(mailerbox) watched ¦ crontab(2/s).
2019-06-03 11:31:33 Jober[INFO]#> Worker(20106) started ¦ tube(mailerbox) watched ¦ crontab(2/s).
2019-06-03 11:31:33 Jober[INFO]#> Worker(20107) started ¦ tube(mailerbox) watched ¦ crontab(2/s).
Jober Command:
Usage: php jober [options]
Options:
start/Start debug mode.
new <queue:worker_nums>/Add multiple processes to the queue.
start -d/Daemon mode runs services.
stop/Smooth stop services.
restart/Smooth stop and started service
Jober 队列配置参数:
tube | jobClass | worker_num | crontab |
---|---|---|---|
MailerBox | app\\Jobs\\MailboxJob | 4 | 2 |
Order | app\\Jobs\\OrderJob | 8 | 3 |
队列名称 | 自定义Job类 | 数量 | 扫描频率 |
Jober DEBUG 参数说明:
worker | status | tube | max_tasks | crontab |
---|---|---|---|---|
pid(15438) | exited/reboot | tube(mailerbox) | max_tasks(2) | 2 |
进程号 | 重启/退出 | 负责管道/队列(mailerbox) | 完成2个任务退出重启 | 每2秒读取一次 |
解读上面语句含义:
worker(15438): 负责队列(mailerbox), max(2)完成两个任务后自动退出并重启,自动创建进程接管这个队列.
worker(15438): 负责队列(mailerbox), 2秒查看一次队列,是否有数据需要处理.
邮箱异步发送案例:
<?php
namespace app\Jobs;
use app\service\Mailer as MailerService;
use app\service\MailStats;
use app\utils\MailBox;
use Jober\Colors\ColorText;
use Jober\Core\Console;
use Jober\Core\Log;
use Jober\Iface\IJob;
use xobotyi\beansclient\BeansClient;
use xobotyi\beansclient\Connection;
class MailboxJob implements IJob
{
/**
* @return bool
* @return 标识这个任务处理成功, true - false
* 每个Worker 处理成功5000个任务,自动退出重新拉取新的Worker (防止内存泄漏)
* @throws \xobotyi\beansclient\Exception\Connection
* @throws \xobotyi\beansclient\Exception\Client
*/
public function doJob(): bool
{
$connection = new Connection('127.0.0.1', 11300, 2, true);
$beansClient = new BeansClient($connection);
try {
$job = $beansClient->watchTube('MailerBox')
->reserve(3);
$data = json_decode($job->payload, true);
if (!empty($data)) {
if (MailBox::send($data)) {
//todo:: 邮件发送逻辑
// (new MailerService())->log($data, MailStats::SENT_SUCCESS); # 发送成功记录一条日志
Console::println(date('Y-m-d ')." 异步邮件已发送 -> 内容:".$data['body']
,ColorText::FG_CYAN);
// 删除队列中的已发送邮件
$job->delete();
Console::println(date('Y-m-d ')." 异步队列中删除 -> 内容:".$data['to']
,ColorText::FG_RED);
return true;
}
// (new MailerService())->log($data, MailStats::SENT_ERROR); # 发送失败记录日志
}
}
catch (\Exception $e) {
/**
* 1. 异常必须捕捉,报错会不断重启Jober
* 2. 记录到日志文件
*/
// Log::error("异常原因:".$e->getMessage());
}
return false;
}
}
License
Apache License Version 2.0, http://www.apache.org/licenses/