thinkphp-queue消息队列服务

thinkphp-queue消息队列服务 是 官⽅提供的⼀个消息队列服务,它⽀持消息队列的⼀些基本特性:
消息的发布,获取,执⾏,删除,重发,失败处理,延迟执⾏,超时控制等
队列的多队列, 内存限制 ,启动,停⽌,守护等
消息队列可降级为同步执⾏
搭建的存储环境
可使⽤ [推荐]
可使⽤数据库
推荐使⽤redis
⾸先安装redis
这次使⽤的是宝塔,安装流程:
⼀、
宝塔⾯板安装Redis不会特别⿇烦,只要⼏步就可以实现:
1.安装redis服务
2.配置redis设置【可不设置】
3.安装PHP扩展
1.⾸先,我们来安装redis服务,进⼊宝塔管理⾯板--软件管理--运⾏环境--redis-点击安装,等待完成
2.完成之后开始第⼆步,配置redis设置。这⼀步根据⾃⼰需要进⾏配置。注意安全问题哦
我这⾥就改⼀下设置密码验证
到/www/server/redis/下⾯的⽂件f⾥⾯的requirepass foobared这⾏,将注释去掉
foobared这个改为⾃⼰想设置的密码
3.然后安装php扩展
点击PHP对应的设置--安装扩展--redis--安装
等待完成。
完成后重启即可。
注意:1.需要开启相应的端⼝,宝塔⾯板-安全,还有服务器提供商那边也开放此端⼝
2.若宝塔上安装了多个版本的php,可先查看⽬前正在使⽤的php版本,【php  -v】
3.查看项⽬正在使⽤的php版本
⼆、
【官⽅】 安装 thinkphp-
composer install topthink/think-queue
【我使⽤的命令】
composer require topthink/think-queue:2.*
(⾸先确保已安装composer)
注意: # Thinkphp5.1            composer require topthink/think-queue:2.*            # Thinkphp6            composer require topthink/think-queue:3.*
尼龙纤维植绒拭子三、
先通过⼀段代码,了解⼀下 thinkphp-queue 的基本使⽤流程。
⽬标:
在业务控制器中推送⼀个新消息到⼀个名为 ‘helloJobQueue’ 的中,该消息中包含我们⾃定义的业务数据,然后,编写⼀个名为 Hello 的消费者类,并通过命令⾏去调⽤该消费者类获取这个消息,拿到定义的数据。
1.1 安装 thinkphp-queue
1.2 搭建消息队列的存储环境    【使⽤ Redis 】 安装并启动 Redis 服务
1.3 配置消息队列的驱动
安装完thinkphp-queue后会⾃动⽣成\application\config\queue.php
根据选择的存储⽅式,在 \application\config\queue.php 这个配置⽂件中,添加消息队列对应的驱动配置
1  return [
2      'connector'  => 'Redis',  // Redis 驱动
3      'expire'    => 60,  // 任务的过期时间,默认为60秒; 若要禁⽤,则设置为 null
4      'default'    => 'default',  // 默认的队列名称
5      'host'      => '127.0.0.1', // redis 主机ip
6      'port'      => 6379,  // redis 端⼝
7      'password'  => '',  // redis 密码
8      'select'    => 0,  // 使⽤哪⼀个 db,默认为 db0
9      'timeout'    => 0,  // redis连接的超时时间水泵自动抽水
10      'persistent' => false,  // 是否是长连接
11
12  //    'connector' => 'Database',  // 数据库驱动
13  //    'expire'    => 60,          // 任务的过期时间,默认为60秒; 若要禁⽤,则设置为 null
14  //    'default'  => 'default',    // 默认的队列名称
15  //    'table'    => 'jobs',      // 存储消息的表名,不带前缀
16  //    'dsn'      => [],
17
18  //    'connector'  => 'Topthink', // ThinkPHP内部的队列通知服务平台,本⽂不作介绍
19  //    'token'      => '',
20  //    'project_id'  => '',
21  //    'protocol'    => 'https',
22  //    'host'        => 'pthink',
23  //    'port'        => 443,
24  //    'api_version' => 1,
25  //    'max_retries' => 3,
26  //    'default'    => 'default',
27
28  //    'connector'  => 'Sync',  // Sync 驱动,该驱动的实际作⽤是取消消息队列,还原为同步执⾏
29  ];
1.3.1 配置⽂件中的 expire 参数说明
无菌检测系统
expire 参数指的是任务的过期时间, 单位为秒。 过期的任务,其准确的定义是
1. 任务的状态为执⾏中
2. 任务的开始执⾏的时刻 + expire > 当前时刻
expire 不为null 时 ,thinkphp-queue 会在每次获取下⼀个任务之前检查并重发过期(执⾏超时)的任务。
expire 为null 时,thinkphp-queue 不会检查过期的任务,性能相对较⾼⼀点。但是需要注意:
这些执⾏超时的任务会⼀直留在消息队列中,需要开发者另⾏处理(删除或者重发)!
对expire 参数理解或者使⽤不当时,很容易产⽣⼀些bug,后⾯会举例提到。
1.4 消息的创建与推送
我们在业务控制器中创建⼀个新的消息,并推送到 helloJobQueue 队列
新增 \application\index\controller\JobTest.php 控制器,在该控制器中添加 actionWithHelloJob ⽅法
<?php
/**
* ⽂件路径: \application\index\controller\JobTest.php
* 该控制器的业务代码中借助了thinkphp-queue 库,将⼀个消息推送到消息队列
*/
namespace app\index\controller;
use think\Exception;
use think\Queue;
class JobTest {
/**
* ⼀个使⽤了队列的 action
*/
public function actionWithHelloJob(){
// 1.当前任务将由哪个类来负责处理。
//  当轮到该任务时,系统将⽣成⼀个该类的实例,并调⽤其 fire ⽅法窑链
$jobHandlerClassName  = 'app\index\job\Hello';
// 2.当前任务归属的队列名称,如果为新队列,会⾃动创建
$jobQueueName    = "helloJobQueue";
/
/ 3.当前任务所需的业务数据 . 不能为 resource 类型,其他类型最终将转化为json形式的字符串
//  ( jobData 为对象时,存储其public属性的键值对 )
$jobData          = [ 'ts' => time(), 'bizId' => uniqid() , 'a' => 1 ] ;
// 4.将该任务推送到消息队列,等待对应的消费者去执⾏
$isPushed = Queue::push( $jobHandlerClassName , $jobData , $jobQueueName );
// database 驱动时,返回值为 1|false  ;  redis 驱动时,返回值为随机字符串|false
if( $isPushed !== false ){
echo date('Y-m-d H:i:s') . " a new Hello Job is Pushed to the MQ"."<br>";
}else{
echo 'Oops, something went wrong.';
}
}
}
注意: 在这个例⼦当中,我们是⼿动指定的 $jobHandlerClassName ,更合理的做法是先定义好消息名称与消费者类名的映射关系,然后由某个可以获取该映射关系的类来推送这个消息。这样,⽣产者只需要知道消息的名称,⽽⽆需指定哪个消费者类来处理。
除了 Queue::push( $jobHandlerClassName , $jobData , $jobQueueName ); 这种⽅式之外,还可以直接传⼊ Queue::push( $jobHandlerObject ,null , $jobQueueName ); 这时,需要在 $jobHandlerObject 中定义⼀个 handle() ⽅法,消息队列在执⾏到该任务时会⾃动反序列化该对象,并调⽤其 handle()⽅法。 该⽅式中, 数据需要提前挂载在 $jobHandlerObject 对象上。
1.5 消息的消费与删除
编写 Hello 消费者类,⽤于处理 helloJobQueue 队列中的任务
新增 \application\index\job\Hello.php 消费者类,并编写其 fire() ⽅法
<?php
/**
* ⽂件路径: \application\index\job\Hello.php
* 这是⼀个消费者类,⽤于处理 helloJobQueue 队列中的任务
*/
namespace app\index\job;
use think\queue\Job;
class Hello {
/**
* fire⽅法是消息队列默认调⽤的⽅法
* @param Job            $job      当前的任务对象
* @param array|mixed    $data    发布任务时⾃定义的数据
*/
public function fire(Job $job,$data)
{
// 有些消息在到达消费者时,可能已经不再需要执⾏了
$isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($data);
if(!$isJobStillNeedToBeDone){
$job->delete();
return;
}
$isJobDone = $this->doHelloJob($data);
if ($isJobDone) {
/
/ 如果任务执⾏成功,记得删除任务
$job->delete();
print("<info>Hello Job has been done and deleted"."</info>\n");
}else{
if ($job->attempts() > 3) {
//通过这个⽅法可以检查这个任务已经重试了⼏次了
srcpan
print("<warn>Hello Job has been retried more than 3 times!"."</warn>\n");
$job->delete();
// 也可以重新发布这个任务
氢氧焊接机
//print("<info>Hello Job will be availabe again after 2s."."</info>\n");
//$job->release(2); //$delay为延迟时间,表⽰该任务延迟2秒后再执⾏
}
}
}
/**
* 有些消息在到达消费者时,可能已经不再需要执⾏了
* @param array|mixed    $data    发布任务时⾃定义的数据
* @return boolean                任务执⾏的结果
*/
private function checkDatabaseToSeeIfJobNeedToBeDone($data){
return true;
}

本文发布于:2024-09-22 12:50:31,感谢您对本站的认可!

本文链接:https://www.17tex.com/tex/4/183105.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:消息   队列   任务
留言与评论(共有 0 条评论)
   
验证码:
Copyright ©2019-2024 Comsenz Inc.Powered by © 易纺专利技术学习网 豫ICP备2022007602号 豫公网安备41160202000603 站长QQ:729038198 关于我们 投诉建议