easyswoole - 基于swoole扩展实现的一款高性能php框架

Queue 介紹

原理

EasySwoole 封裝實現了一個輕量級的隊列,默認使用 Redis 作為隊列驅動器。

用戶可以自己實現一個隊列驅動器來實現隊列,用 kafka 作為隊列驅動器或者 其他驅動器方式 作為隊列驅動器,來進行存儲。

從上可知,Queue 并不是一個單獨使用的組件,它更像一個對不同驅動的隊列進行統一封裝的門面組件。

Queue 組件當前最新穩定版本為 3.x。

舊版本 (2.1.x) 的 Queue 組件的使用,請看 Queue 2.1.x

組件要求

  • ext-swoole: >=4.4.0
  • easyswoole/component: ^2.0
  • easyswoole/redis-pool: ~2.2.0

安裝方法

composer require easyswoole/queue 3.x

倉庫地址

easyswoole/queue 3.x

基本使用

默認自帶的隊列驅動為 Redis 隊列。這里簡單列舉 2 種用戶可使用的方式:

  • 在框架的任意位置進行生產和消費隊列任務。
  • 在框架的任意位置進行生產隊列任務, 然后在自定義進程中進行消費任務。

在框架中進行生產和消費任務

創建隊列

use EasySwoole\Queue\Driver\RedisQueue;
use EasySwoole\Queue\Job;
use EasySwoole\Queue\Queue;
use EasySwoole\Redis\Config\RedisConfig;

// 配置 Redis 隊列驅動器
$redisConfig = new RedisConfig([
    'host' => '127.0.0.1', // 服務端地址 默認為 '127.0.0.1'
    'port' => 6379, // 端口 默認為 6379
    'auth' => '', // 密碼 默認為 不設置
    'db'   => 0, // 默認為 0 號庫
]);

// 創建隊列
$queue = new Queue(new RedisQueue($redisConfig));

普通生產任務

$queue 為上述創建隊列中得到的隊列對象。

// 創建任務
$job = new Job();

// 設置任務數據
$job->setJobData("this is my job data time time ".date('Ymd h:i:s'));

// 生產普通任務
$queue->producer()->push($job);

普通消費任務

$queue 為上述創建隊列中得到的隊列對象。

// 消費任務
$job = $queue->consumer()->pop();

// 或者是自定義進程中消費任務(具體使用請看下文自定義進程消費任務完整使用示例)
$queue->consumer()->listen(function (Job $job){
    var_dump($job);
});

生產延遲任務

$queue 為上述創建隊列中得到的隊列對象。

// 創建任務
$job = new Job();

// 設置任務數據
$job->setJobData("this is my job data time time ".date('Ymd h:i:s'));

// 設置任務延后執行時間
$job->setDelayTime(5);

// 生產延遲任務
$queue->producer()->push($job);

生產可信任務

// 創建任務
$job = new Job();

// 設置任務數據
$job->setJobData("this is my job data time time ".date('Ymd h:i:s'));

// 設置任務重試次數為 3 次。任務如果沒有確認,則會執行三次
$job->setRetryTimes(3);

// 如果5秒內沒確認任務,會重新回到隊列。默認為3秒
$job->setWaitConfirmTime(5);

// 投遞任務
$queue->producer()->push($job);

// 確認一個任務
$queue->consumer()->confirm($job);

在框架中生產任務和自定義進程中消費任務

  • 注冊隊列驅動器
  • 設置消費進程
  • 生產者投遞任務

定義一個隊列

<?php

namespace App\Utility;

use EasySwoole\Component\Singleton;
use EasySwoole\Queue\Queue;

class MyQueue extends Queue
{
    use Singleton;
}

定義消費進程

<?php

namespace App\Utility;

use EasySwoole\Component\Process\AbstractProcess;
use EasySwoole\Queue\Job;

class QueueProcess extends AbstractProcess
{
    protected function run($arg)
    {
        go(function (){
            MyQueue::getInstance()->consumer()->listen(function (Job $job){
                var_dump($job->getJobData());
            });
        });
    }
}

支持多進程、多協程消費

注冊隊列驅動器、消費進程及設置生產者投遞任務

<?php

namespace EasySwoole\EasySwoole;

use App\Utility\MyQueue;
use App\Utility\QueueProcess;
use EasySwoole\Component\Timer;
use EasySwoole\EasySwoole\AbstractInterface\Event;
use EasySwoole\EasySwoole\Swoole\EventRegister;
use EasySwoole\Queue\Job;

class EasySwooleEvent implements Event
{
    public static function initialize()
    {
        date_default_timezone_set('Asia/Shanghai');
    }

    public static function mainServerCreate(EventRegister $register)
    {
        // redis pool 使用請看 redis 章節文檔
        $redisConfig = new \EasySwoole\Redis\Config\RedisConfig(
            [
                'host' => '127.0.0.1', // 服務端地址 默認為 '127.0.0.1'
                'port' => 6379, // 端口 默認為 6379
                'auth' => '', // 密碼 默認為 不設置
                'db'   => 0, // 默認為 0 號庫
            ]
        );
        // 配置 隊列驅動器
        $driver = new \EasySwoole\Queue\Driver\RedisQueue($redisConfig, 'easyswoole_queue');
        MyQueue::getInstance($driver);
        // 注冊一個消費進程
        $processConfig = new \EasySwoole\Component\Process\Config([
            'processName' => 'QueueProcess', // 設置 自定義進程名稱
            'processGroup' => 'Queue', // 設置 自定義進程組名稱
            'enableCoroutine' => true, // 設置 自定義進程自動開啟協程
        ]);
        \EasySwoole\Component\Process\Manager::getInstance()->addProcess(new QueueProcess($processConfig));
        // 模擬生產者,可以在任意位置投遞
        $register->add($register::onWorkerStart, function ($server, $id) {
            if ($id == 0) {
                Timer::getInstance()->loop(3000, function () {
                    $job = new Job();
                    $job->setJobData(['time' => \time()]);
                    MyQueue::getInstance()->producer()->push($job);
                });
            }
        });
    }
}

進程安全退出問題請看 自定義進程 章節

控制器使用

以在 http 服務中為例,使用示例代碼如下:

<?php

namespace App\HttpController;

use App\Utility\MyQueue;
use EasySwoole\Http\AbstractInterface\Controller;
use EasySwoole\Http\Message\Status;
use EasySwoole\Queue\Driver\RedisQueue;
use EasySwoole\Queue\Job;
use EasySwoole\Queue\Queue;
use EasySwoole\Redis\Config\RedisConfig;

class Index extends Controller
{
    // 生產普通任務
    public function producer1()
    {
        // 獲取隊列
        $queue = MyQueue::getInstance();

        // 創建任務
        $job = new Job();

        // 設置任務數據
        $job->setJobData("this is my job data time time " . date('Ymd h:i:s'));

        var_dump('producer1 => ');
        var_dump($job->getJobData());

        // 生產普通任務
        $produceRes = $queue->producer()->push($job);
        if (!$produceRes) {
            $this->writeJson(Status::CODE_OK, [], '隊列生產普通任務失敗!');
        } else {
            $this->writeJson(Status::CODE_OK, [], '隊列生產普通任務成功!');
        }
    }

    // 生產延遲任務
    public function producer2()
    {
        // 獲取隊列
        $queue = MyQueue::getInstance();

        // 創建任務
        $job = new Job();

        // 設置任務數據
        $job->setJobData("this is my job data time time " . date('Ymd h:i:s'));

        // 設置任務延后執行時間
        $job->setDelayTime(5);

        var_dump('producer2 => ');
        var_dump($job->getJobData());

        // 生產延遲任務
        $produceRes = $queue->producer()->push($job);
        if (!$produceRes) {
            $this->writeJson(Status::CODE_OK, [], '隊列生產延遲任務失敗!');
        } else {
            $this->writeJson(Status::CODE_OK, [], '隊列生產延遲任務成功!');
        }
    }

    // 生產可信任務
    public function producer3()
    {
        // 獲取隊列
        $queue = MyQueue::getInstance();

        // 創建任務
        $job = new Job();

        // 設置任務數據
        $job->setJobData("this is my job data time time " . date('Ymd h:i:s'));

        var_dump('producer3 => ');
        var_dump($job->getJobData());

        // 設置任務重試次數為 3 次。任務如果沒有確認,則會執行三次
        $job->setRetryTimes(3);

        // 如果5秒內沒確認任務,會重新回到隊列。默認為3秒
        $job->setWaitConfirmTime(5);

        // 投遞任務
        $queue->producer()->push($job);

        // 確認一個任務
        $queue->consumer()->confirm($job);
    }

    // 消費任務
    public function consumer()
    {
        // 獲取隊列
        $queue = MyQueue::getInstance();

        ### 消費任務
        // 獲取到需要消費的任務
        $job = $queue->consumer()->pop();

        if (!$job) {
            $this->writeJson(Status::CODE_OK, [], '沒有隊列任務需要消費了!');
            return false;
        }

        // 獲取需要消費的任務的數據
        $jobData = $job->getJobData();
        var_dump($jobData);
    }
}

進階使用

我們可以自定義驅動,實現 RabbitMQKafka 等消費隊列軟件的封裝。

用戶需要定義類,并實現 \EasySwoole\Queue\QueueDriverInterface 接口的幾個方法即可。該接口的詳細實現請看下文。

QueueDriverInterface 接口類實現

<?php

namespace EasySwoole\Queue;

interface QueueDriverInterface
{
    public function push(Job $job,float $timeout = 3.0): bool;

    public function pop(float $timeout = 3.0, array $params = []): ?Job;

    public function info(): ?array;

    public function confirm(Job $job,float $timeout = 3.0): bool;
}

相關倉庫

EasySwoole 中利用 Redis 實現消息隊列

如何利用 EasySwoole 多進程多協程 Redis 隊列實現爬蟲

主站蜘蛛池模板: 萍乡市信源电瓷制造有限公司--官网|萍乡市信源电瓷|萍乡电瓷厂|萍乡绝缘子-萍乡市信源电瓷制造有限公司 | 铅板-济南鑫玉防辐射材料有限公司 | 企业微信服务商-企业微信crm营销|认证-[朝阳企客通]一款企微管家服务型产品 | 江苏减速机_常州减速机_摆线减速机_摆线针轮减速机 | 移动厕所_真空环保厕所_环保厕所_景区生态厕所_雨施捷移动厕所生产厂家 | 王者荣耀/和平精英扫码上号登录器_微信安卓苹果扫码上号登录软件 - 上号宝扫码登录器 | 济南时代,济南时代试验机,试金老品牌-济南时代试验机技术有限公司 | 上海栋彤物流有限公司-可信赖的物流服务提供商 | 上海画册设计-上海宣传册设计-产品手册设计-企业画册设计公司 | 免费的PPT幻灯片演示制作软件,动画视频及课件制作软件 - Focusky万彩演示大师官网 | 有用的旅游攻略_格林旅行网 | 深圳市佳顺优印印刷有限公司,佳顺优印,画册印刷,海报印刷,封套印刷,手提袋印刷,包装盒印刷,彩盒印刷,无碳纸印刷,不干胶印刷,信封印刷,便笺印刷,笔记本印刷,台历印刷,挂历印刷,国际会展中心附近印刷厂,宝安印刷厂,宝安教材印刷厂 | 湖南视频会议设备厂家|长沙视频会议设备安装型号齐全找湖南日恒智能工程有限公司 | 湿电电源,静电除尘电源,电捕焦电源-山东仕瑞电气科技有限公司 | 永磁电机,防爆电机,调速电机,永磁同步电机_河南华信电机股份有限公司 | 云德律师事务所_全国律师服务咨询服务企业 | 望崖阁书法培训班-杭州书法高考培训班2023届招生简章-优清画院 | 泰安兴润建材有限公司,泰安井盖定做,泰安警示桩定做,泰安雨水篦子定做,泰安操场篦子定做,泰安标志牌定做 | 首页-青特集团官方网站| 泊头压滤机-隔膜-洗沙-厢式-板框压滤机-河北巨盛压滤机制造有限公司 | 山东万利精密机械制造有限公司-高速金属圆锯机,数控高速圆锯机,高速圆锯机生产厂家 | 山东腾达源金属材料官网-耐磨板nm400,nm400耐磨钢板,nm500耐磨钢板切割,耐候板批发,高强板现货【价格】 | 网站客服系统_在线客服系统【莺语客服】 | 消防车厂家_东风水罐泡沫消防车价格图片吨位-湖北新东日专用汽车有限公司 | 上海层傲传动设备有限公司 - 工业皮带,输送带,传动带 | 中国国际精细化工展览会—官网 | 河北瑞峰医疗-河北护理床-河北医用病床-河北养老院护理床-河北护理床厂家-河北病床厂家-河北瑞峰医疗 | 康拓威技术(深圳)有限公司|Theia镜头代理商|安讯士AXIS摄像机|安讯士监控系统|博世BOSCH监控|博世会议系统|索尼SONY监控|松下PANASONIC监控|三星韩华SAMSUNG监控|霍尼韦尔Honeywell|海康|大华|华为监控|Theia无畸变镜头|AXIS监控|安讯视摄像机 | 眉山净源居环保科技有限公司,眉山除甲醛公司,眉山甲醛治理,眉山保洁服务,眉山家政保洁,眉山家电维修 - 眉山净源居环保科技有限公司,眉山除甲醛公司,眉山甲醛治理,眉山保洁服务,眉山家政保洁,眉山家电维修 | 消字号牙膏代加工|面膜代加工|凝胶贴牌|漱口水贴牌-南京三盾药业有限公司-消字号牙膏代加工|面膜代加工|凝胶贴牌|漱口水贴牌-南京三盾药业有限公司 | 泊头市鸿海泵业有限公司--导热油泵,高温油泵,沥青保温泵,圆弧泵,齿轮油泵,高粘度泵,自吸离心油泵,罗茨油泵为主的专业生产厂家 | 呼吸家官网|肺功能检测仪生产厂家|国产肺功能仪知名品牌|肺功能检测仪|肺功能测试仪|婴幼儿肺功能仪|弥散残气肺功能仪|肺功能测试系统|广州红象医疗科技有限公司|便携式肺功能仪|大肺功能仪|呼吸康复一体机|儿童肺功能仪|肺活量计|医用简易肺功能仪|呼吸康复系统|肺功能仪|弥散肺功能仪(大肺)|便携式肺功能检测仪|肺康复|呼吸肌力测定肺功能仪|肺功能测定仪|呼吸神经肌肉刺激仪|便携式肺功能 | 郑州阳光房|封阳台|钢结构【河南郑州如意阳光房门窗有限公司】 | 集装箱零配件_不锈钢丝绳厂家_镀锌铁链条_合页非标件定制_上海英鑫多实业有限公司 | 耐磨焊丝厂-堆焊焊材研发-修复工程-天津舜荣焊材官网 | 气体灭火安装,气体灭火系统安装,七氟丙烷灭火系统安装,消防气体灭火控制系统 | 山东致合必拓环保科技股份有限公司 | 长沙物流公司|湖南货物运输公司|长沙第三方物流公司-国联物流 湖南第三方物流专家 | 铝型材定制_东莞铝型材_散热器铝型材_工业铝合金型材挤压加工生产厂家价格-中亚铝业 | 心理咨询室设备_音乐放松椅_心理测评系统_情绪宣泄设备厂家 | 商标注册查询_商标注册代理公司_专利申请_版权登记-源智知识产权 |