easyswoole - 基于swoole扩展实现的一款高性能php框架

FastCacheQueue

EasySwoole FastCache組件在>= 1.2.1的時候新增類似· beanstalkd消息隊列 ·特性。

  • 可以創建多個queue
  • 支持延遲投遞
  • 任務超時恢復執行
  • 任務重發執行
  • 任務最大重發次數
  • 支持putJob、delayJob、releaseJob、reserveJob、buryJob、kickJob等命令

基本使用

FastCacheQueue依托于FastCache,具體安裝請查看FastCache

服務注冊

更新后,EasySwoole\FastCache\CacheProcessConfig類多出以下方法

/** 設置進程最大內存 默認512M */
public function setMaxMem(string $maxMem): void
/** 設置消息隊列保留時間 默認60s (取出任務后沒有及時確認會重新放回隊列) */
public function setQueueReserveTime(int $queueReserveTime): void
/** 設置消息隊列最大重發次數 默認10 達到次數后重發將會被丟棄 */
public function setQueueMaxReleaseTimes(int $queueMaxReleaseTimes): void

開始使用

下文示例代碼的Job和Cache都使用以下命名空間

use EasySwoole\FastCache\Cache;
use EasySwoole\FastCache\Job;

投遞任務

投遞成功之后 將會返回該任務的jobId。

沒有失敗情況,除非fastCache注冊注冊失敗。

$job = new Job();
$job->setData("siam"); // 任意類型數據
$job->setQueue("siam_queue");
$jobId = Cache::getInstance()->putJob($job);
var_dump($jobId);

取出任務

可以開啟自定義進程當消費者,循環監聽隊列,執行任務處理。

注意:任務執行完成一定要有一個結果。要么刪除該任務,要么重發。否則當任務取出一定時間后(默認60s)會自動放回隊列中。

$job = Cache::getInstance()->getJob('siam_queue');// Job對象或者null
if ($job === null){
    echo "沒有任務\n";
}else{
    // 執行業務邏輯
    var_dump($job);
    // 執行完了要刪除或者重發,否則超時會自動重發
    Cache::getInstance()->deleteJob($job);
}

清空ready任務隊列


 var_dump(Cache::getInstance()->flushReadyJobQueue('siam_queue'));

 var_dump(Cache::getInstance()->jobQueueSize('siam_queue'));

延遲執行任務

$job = new Job();
$job->setData("siam");
$job->setQueue("siam_queue_delay");
$job->setDelay(5);// 延時5s
$jobId = Cache::getInstance()->putJob($job);
var_dump($jobId);
// 馬上取會失敗 隔5s取才成功
$job = Cache::getInstance()->getJob('siam_queue_delay');
var_dump($job);

刪除任務

可以是由getJob取出的對象,也可以自己聲明Job對象,傳入JobId來刪除。

$job = new Job();
$job->setJobId(1);
$job->setQueue('siam_queue_delay');
Cache::getInstance()->deleteJob($job);

任務重發

任務執行失敗,或者某些場景需要重新執行,則可以重發。

重發時,可以指定是否延遲執行。

// get出來的任務執行失敗可以重發
$job = new Job();
$job->setData("siam");
$job->setQueue("siam_queue");
$jobId = Cache::getInstance()->putJob($job);

$job = Cache::getInstance()->getJob('siam_queue');

if ($job === null){
    echo "沒有任務\n";
}else{
    // 執行業務邏輯
    $doRes = false;
    if (!$doRes){
        // 業務邏輯失敗,需要重發  
        // 如果延遲隊列需要馬上重發,在這里需要清空delay屬性
        // $job->setDelay(0);
        // 如果普通隊列需要延遲重發,則設置delay屬性
        // $job->setDelay(5);
        $res = Cache::getInstance()->releaseJob($job);
        var_dump($res);
    }else{
        // 執行完了要刪除或者重發,否則超時會自動重發
        Cache::getInstance()->deleteJob($job);
    }
}

返回現在有什么隊列

$queues = Cache::getInstance()->jobQueues();
var_dump($queues);

返回某個隊列的長度

$queueSize = Cache::getInstance()->jobQueueSize("siam_queue");
$queueSize2 = Cache::getInstance()->jobQueueSize("siam_queue_delay");
var_dump($queueSize);
var_dump($queueSize2);

清空隊列 可指定名稱

// 清空全部
$res = Cache::getInstance()->flushJobQueue();
var_dump($res);

// 清空siam_queue隊列
$res = Cache::getInstance()->flushJobQueue('siam_queue');
var_dump($res);

將任務改為延遲狀態

//添加任務
$job = new Job();
$job->setData("LuffyQAQ");
$job->setQueue("LuffyQAQ_queue_delay");
$jobId = Cache::getInstance()->putJob($job);

//方法一 直接傳入jobId
$job->setJobId($jobId);
$job->setDelay(30);
var_dump(Cache::getInstance()->delayJob($job));

//方法二 取出任務
$job = Cache::getInstance()->getJob('LuffyQAQ_queue_delay');
$job->setDelay(30);
var_dump(Cache::getInstance()->delayJob($job));

//使用jobQueueSize查看隊列長度
$queueSize = Cache::getInstance()->jobQueueSize("LuffyQAQ_queue_delay");
var_dump($queueSize);

從延遲執行隊列中拿取

//傳入隊列名
var_dump(Cache::getInstance()->getDelayJob('LuffyQAQ_queue_delay'));

清空delay任務隊列


 var_dump(Cache::getInstance()->flushDelayJobQueue('LuffyQAQ_queue_delay'));

 var_dump(Cache::getInstance()->jobQueueSize('LuffyQAQ_queue_delay'));

將任務改為保留狀態

//添加任務
$job = new Job();
$job->setData("LuffyQAQ");
$job->setQueue("LuffyQAQ_queue_reserve");
$jobId = Cache::getInstance()->putJob($job);

//方法一 直接傳入jobId
$job->setJobId($jobId);
var_dump(Cache::getInstance()->reserveJob($job));

//方法二 取出任務
$job = Cache::getInstance()->getJob('LuffyQAQ_queue_reserve');
var_dump(Cache::getInstance()->reserveJob($job));

//使用jobQueueSize查看隊列長度
$queueSize = Cache::getInstance()->jobQueueSize("LuffyQAQ_queue_reserve");
var_dump($queueSize);

從保留隊列中拿取

//傳入隊列名
var_dump(Cache::getInstance()->getReserveJob('LuffyQAQ_queue_reserve'));

清空reserve任務隊列


 var_dump(Cache::getInstance()->flushReserveJobQueue('LuffyQAQ_queue_reserve'));

 var_dump(Cache::getInstance()->jobQueueSize('LuffyQAQ_queue_reserve'));

將任務改為埋藏狀態

$job = new Job();
$job->setQueue('LuffyQAQ_queue_bury');
$job->setData('LuffyQAQ');
$jobId = Cache::getInstance()->putJob($job);
$job->setJobId($jobId);

var_dump(Cache::getInstance()->buryJob($job));

//使用jobQueueSize查看隊列長度
$queueSize = Cache::getInstance()->jobQueueSize("LuffyQAQ_queue_bury");
var_dump($queueSize);

從埋藏隊列中拿取

//傳入隊列名
var_dump(Cache::getInstance()->getBuryJob('LuffyQAQ_queue_bury'));

將埋藏隊列任務恢復到ready中


var_dump(Cache::getInstance()->kickJob($job));

清空bury任務隊列


 var_dump(Cache::getInstance()->flushBuryJobQueue('LuffyQAQ_queue_bury'));

 var_dump(Cache::getInstance()->jobQueueSize('LuffyQAQ_queue_bury'));
主站蜘蛛池模板: 专业色素炭黑生产厂家,提供各种用途色素炭黑价格-枣庄鑫源化工 | 衢州装饰公司_衢州装修公司_衢州创美装饰工程有限公司 - Powered by www.qzcmzs.com | 连云港机械手厂家_全自动焊接机械手_刀轴焊接机_智能轴类焊接机_连云港建博自动化设备有限公司 | 深圳货柜租赁_集装箱出售/租赁_集装箱改造_鹏泰集装箱 | 气调包装机厂家-真空包装机价格-气调包装机-锁鲜包装机-江苏大江智能装备有限公司 | 卫生间隔断-东莞卫生间隔断-东莞卫生间隔断厂家-公共卫生间隔断-东莞市康丰家居建材有限公司 | 太原门禁系统_太原车牌识别_山西人脸识别系统-山西元一智能科技有限公司 | 软化水设备_锅炉软水设备_全自动软化设备【安装维护、更换树脂】-青岛水处理设备厂家 | 阻抗分析仪 阻抗测试仪 介电常数测试仪 充电枪测试仪-苏州腾斯凯电子科技有限公司 | 纠偏系统厂家-迈欣机械| 中国家居资讯网-家居建材-知名十大品牌-著名品牌资讯网 | 阻垢剂|缓蚀剂|杀菌剂|分散剂|水处理剂|印染助剂|水处理药剂|造纸助剂|膜阻垢剂|缓蚀剂|HEDP|ATMP|螯合剂-山东凯瑞化学有限公司 水处理药剂生产厂家 | 转盘萃取塔,DMF回收塔生产厂家-无锡弘鼎华化工设备有限公司 | 陕西筱润智能科技有限公司 干部人事智能档案柜 智能密集架 智能档案柜 部队选层文件智能柜 智能枪弹柜 财务智能档案柜 边防武警智能密集架 医院智能档案柜 部队选层文件智能柜智能枪弹柜 学校医院文件柜 企事业单位公检法智能文件柜 生产厂家-筱润智能科技有限公司 RFID射频智能密集架 全自动智能选层档案柜 智能密保柜 枪柜部队营房营具床桌椅办公家具 办公用品档案盒设备货架 全自动智能选层柜生产厂家-筱润智能科技有限公司 | 亿企商贸-亿万企业的商务贸易平台-B2B企业产品发布供求信息平台,一带一路中国企业及产品展示平台,免费企业智能自助建站网络营销推广平台,打造B2B企业黄页产品信息发布推广专业综合电子商务平台! | 云梯车|云梯搬家车|工程高空上料车|云梯登高车价格|视频|图片-专汽之家 | 深圳PCB电路板厂|PCB线路板厂|FPC柔性电路板厂|FPC软性线路板生产厂家|恒成和电路板:18681495413 | 双螺杆挤压膨化设备_挤压熟化设备_烘干设备_油炸设备及喷涂调味设备-山东铭本机械科技公司 | 双合金螺杆|注塑机螺杆|挤出机螺杆|双合金料管—东莞市精耐螺杆机械有限公司 | 铝型材定制_东莞铝型材_散热器铝型材_工业铝合金型材挤压加工生产厂家价格-中亚铝业 | 萍乡市信源电瓷制造有限公司--官网|萍乡市信源电瓷|萍乡电瓷厂|萍乡绝缘子-萍乡市信源电瓷制造有限公司 | 学校洗碗机-郑州洗碗机厂家-商用洗碗机-郑州旭申环保科技有限公司 | 涂塑复合钢管,大口径涂塑钢管,内外涂塑钢管厂家-沧州友诚管业有限公司 | 青岛除甲醛公司|青岛甲醛治理|青岛除甲醛|甲醛检测|光触媒除甲醛|装修除味除甲醛|新房除甲醛|青岛室内环境污染检测治理|青岛闪洁环保科技有限公司官网 | 深圳彩盒印刷-纸盒包装-不干胶标签印刷-深圳印刷厂家-深圳贝的印刷 | 铸铁平台,三维柔性焊接平台,划线平台,大理石平台,检验平板,花岗石平台_泊头市恒量机械设备有限公司 | 烟气在线监测系统_VOCs在线监测_分析仪「杭州世驰科技」 | 银泰洁净--净化工程总承包,20年精耕细作,专为净化而来_银泰洁净--净化工程总承包,20年精耕细作,专为净化而来 | 伸缩接头,限位伸缩接头,传力接头,可拆卸接头,橡胶接头,衬四氟橡胶接头,橡胶软连接,橡胶补偿器,防水套管- 巩义市隆盛管道设备有限公司 | 首页|广东蒙泰高新纤维股份有限公司|丙纶细旦丝|丙纶异形丝|丙纶FDY网络丝 - 广东蒙泰高新纤维股份有限公司 | 吉林市发布(雾凇融媒)官网 | 江苏减速机_常州减速机_摆线减速机_摆线针轮减速机 | 一站式应用与数据集成平台(iPaaS) - 数环通 | 上饶环亚电脑会计培训学校--电脑学校|上饶电脑学校|上饶电脑培训|会计培训|上饶会计培训|上饶县会计培训|广丰会计培训|玉山会计培训|横峰会计培训|上饶网店培训 上进电缆(嘉兴)股份有限公司官网 - 光伏电缆|防火电缆|电力电缆|铝合金电缆专业生产厂家 | 铝合金热处理设备_天然气铝棒加热炉_QPQ热处理设备-浙江长兴天源炉业科技有限公司 | 实验室冷水机-冷却循环水系统-深圳市达沃西制冷设备厂 | 真空上料机_加料机_天津自动上料机_投料站_包装机加料_吸料机_粉体称重-天津市飞云粉体设备有限公司 | 内蒙古天奇生物科技有限公司——OEM代工|ODM定制|原料供应|骨肽|片剂|胶囊剂|口服液 | 上海机械加工-机械加工-精密机械加工-上海欧野精工机械有限公司 上海慧泰仪器制造有限公司_一体型马弗炉-可控真空干燥箱-强光稳定性试验箱 | 济南牛皮癣专科研究院_「济南市银屑病医院」_济南治牛皮癣医保医院_济南正规的银屑病医院 | 永光机械-小型建筑机械领航者,公司专注于小型建筑、工程机械的研发与制造 |