Beanstalk使用

Beanstalk 介绍

beanstalk 的官网介绍,beanstalk 是一个简单、快速的工作队列,其设计之初是异步执行耗时的操作,来减少大型 web 应用的延迟。它是一个内存队列,也支持把任务写入 binlog,完成持久化。

核心概念

  • job : 一个需要异步处理的任务,是 Beanstalkd 中的基本单元,需要放在一个 tube 中。

  • tube :一个有名的任务队列,用来存储统一类型的 job,是 producer 和 consumer 操作的对象。

  • producer :Job 的生产者,通过 put 命令来将一个 job 放到一个 tube 中。

  • consumer :Job 的消费者,通过 reserve/release/bury/delete 命令来获取 job 或改变 job 的状态

  • 任务优先级:任务(job)可以有 0~2^32 个优先级, 0 代表最高优先级。beanstalkd 采用最大最小堆(Min-max heap)处理任务优先级排序, 任何时刻调用 reserve 命令的消费者总是能拿到当前优先级最高的任务,时间复杂度为 O(logn).

  • 延时任务(delay):有两种方式可以延时执行任务(job): 生产者发布任务时指定延时;或者当任务处理完毕后,消费者再次将任务放入队列延时执行(RELEASE with )。

  • 任务超时重发(time-to-run): Beanstalkd 把任务返回给消费者以后:消费者必须在预设的 TTR (time-to-run) 时间内发送 delete /release/ bury 改变任务状态;否则 Beanstalkd 会认为消息处理失败,然后把任务交给另外的消费者节点执行。如果消费者预计在 TTR (time-to-run) 时间内无法完成任务, 也可以发送 touch 命令,它的作用是让 Beanstalkd 从系统时间重新计算 TTR (time-to-run).

  • 任务预留(buried):如果任务因为某些原因无法执行, 消费者可以把任务置为 buried 状态让 Beanstalkd 保留这些任务。管理员可以通过 peek buried 命令查询被保留的任务,并且进行人工干预。简单的, kick 能够一次性把 n 条被保留的任务放回准备队列。

实际使用

pda/pheanstalk 为例

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
require __DIR__ . '/vendor/autoload.php';

use Pheanstalk\Pheanstalk;
# 创建客户端
$pheanstalk = Pheanstalk::create('127.0.0.1', 11300, 10);

# 往指定管道生产一个字符串 job
$pheanstalk
->useTube('testtube')
->put("job payload goes here\n");

# 往指定管道生产一个 encode 后的数组
$pheanstalk
->useTube('testtube')
->put(
json_encode(['test' => 'data']),
Pheanstalk::DEFAULT_PRIORITY, // job 的优先级
30, // 延迟多长时间处理
60 // beanstalk 重试时间
);

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
require __DIR__ . '/vendor/autoload.php';
use Pheanstalk\Pheanstalk;

$pheanstalk = Pheanstalk::create('127.0.0.1', 11300, 10);

// 从 testtube 管道获取任务
$pheanstalk->watch('testtube');

// 挂起直到有一个任务返回
$job = $pheanstalk->reserve();
// 等待 3 秒直到有一个任务返回,超过 3 秒则退出
$job = $pheanstalk->reserveWithTimeout(3);

try {
// 获取 job 的数据
$jobPayload = $job->getData();
// 处理业务
// 如果业务的处理时间超过 beanstalk 对该任务的重试时间,可以通过 touch 方法,重置重试时间
$pheanstalk->touch($job);

// 如果当前不处理此任务,将任务加入 bury 队列,等待调用 kick 方法再次推入 ready 队列
// $pheanstalk->bury($job);

// 处理完任务之后进行删除,不删除 beanstalk 会一直重试
$pheanstalk->delete($job);
}
catch(\Exception $e) {
// 如果有异常可以重新将 job 推入任务,等待重试
$pheanstalk->release($job);
// 延迟 5 秒处理任务
$pheanstalk->release($job, Pheanstalk::DEFAULT_PRIORITY, 5);
}

参考链接:https://www.fzb.me/beanstalkd/