Reading the asynq Source Code
October 6, 2023
Introduction to the asynq library #
asynq is an open-source task queue library written in Go. Its main features:
- At-least-once execution for every task
- Task scheduling
- Retries for failed tasks
- Crash recovery
- Priority queues
- Weighted priority queues
- Strict priority queues
- Built on Redis, so enqueueing a task is low latency
- Task deduplication
- Per-task timeouts

...and more; see the GitHub repo: https://github.com/hibiken/asynq.
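As a quick taste, here is a hedged sketch of a few of these options in use (client and task are created as in the Client example below; Queue, MaxRetry, Timeout, and Unique are options I believe asynq exposes):

info, err := client.Enqueue(task,
	asynq.Queue("critical"),      // route to a specific queue
	asynq.MaxRetry(5),            // retry up to 5 times on failure
	asynq.Timeout(2*time.Minute), // give up if a single run exceeds 2 minutes
	asynq.Unique(time.Hour),      // deduplicate identical tasks for one hour
)
if err != nil {
	log.Fatalf("could not enqueue task: %v", err)
}
log.Printf("enqueued task: id=%s", info.ID)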
Reading the asynq source #
Task storage, starting from the Client #
Let's start from the official Client example to see how asynq implements task storage. Client:
package main

import (
	"log"
	"time"

	"github.com/hibiken/asynq"
	"your/app/package/tasks"
)

const redisAddr = "127.0.0.1:6379"

func main() {
	client := asynq.NewClient(asynq.RedisClientOpt{Addr: redisAddr})
	defer client.Close()

	// ------------------------------------------------------
	// Example 1: Enqueue task to be processed immediately.
	//            Use (*Client).Enqueue method.
	// ------------------------------------------------------

	task, err := tasks.NewEmailDeliveryTask(42, "some:template:id")
	if err != nil {
		log.Fatalf("could not create task: %v", err)
	}
	info, err := client.Enqueue(task)
	if err != nil {
		log.Fatalf("could not enqueue task: %v", err)
	}
	log.Printf("enqueued task: id=%s queue=%s", info.ID, info.Queue)

	// ------------------------------------------------------------
	// Example 2: Schedule task to be processed in the future.
	//            Use ProcessIn or ProcessAt option.
	// ------------------------------------------------------------

	info, err = client.Enqueue(task, asynq.ProcessIn(24*time.Hour))
	if err != nil {
		log.Fatalf("could not schedule task: %v", err)
	}
	log.Printf("enqueued task: id=%s queue=%s", info.ID, info.Queue)

	// ----------------------------------------------------------------------------
	// Example 3: Set other options to tune task processing behavior.
	//            Options include MaxRetry, Queue, Timeout, Deadline, Unique etc.
	// ----------------------------------------------------------------------------

	task, err = tasks.NewImageResizeTask("https://example.com/myassets/image.jpg")
	if err != nil {
		log.Fatalf("could not create task: %v", err)
	}
	info, err = client.Enqueue(task, asynq.MaxRetry(10), asynq.Timeout(3*time.Minute))
	if err != nil {
		log.Fatalf("could not enqueue task: %v", err)
	}
	log.Printf("enqueued task: id=%s queue=%s", info.ID, info.Queue)
}
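For reference, the tasks package imported above is application code, not part of asynq. Based on the official wiki example, the constructor and handler used above look roughly like this (the payload shape is illustrative):

package tasks

import (
	"context"
	"encoding/json"
	"fmt"

	"github.com/hibiken/asynq"
)

const TypeEmailDelivery = "email:deliver"

type EmailDeliveryPayload struct {
	UserID     int
	TemplateID string
}

func NewEmailDeliveryTask(userID int, tmplID string) (*asynq.Task, error) {
	payload, err := json.Marshal(EmailDeliveryPayload{UserID: userID, TemplateID: tmplID})
	if err != nil {
		return nil, err
	}
	return asynq.NewTask(TypeEmailDelivery, payload), nil
}

// HandleEmailDeliveryTask is the handler registered on the server side later.
func HandleEmailDeliveryTask(ctx context.Context, t *asynq.Task) error {
	var p EmailDeliveryPayload
	if err := json.Unmarshal(t.Payload(), &p); err != nil {
		return fmt.Errorf("json.Unmarshal failed: %v", err)
	}
	// ...actually send the email here...
	fmt.Printf("sending email to user %d with template %s\n", p.UserID, p.TemplateID)
	return nil
}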
Enqueue is a thin wrapper that calls EnqueueContext with context.Background(), so we go straight to EnqueueContext:
// EnqueueContext enqueues the given task to a queue.
//
// EnqueueContext returns TaskInfo and nil error if the task is enqueued successfully, otherwise returns a non-nil error.
//
// The argument opts specifies the behavior of task processing.
// If there are conflicting Option values the last one overrides others.
// Any options provided to NewTask can be overridden by options passed to Enqueue.
// By default, max retry is set to 25 and timeout is set to 30 minutes.
//
// If no ProcessAt or ProcessIn options are provided, the task will be pending immediately.
//
// The first argument context applies to the enqueue operation. To specify task timeout and deadline, use Timeout and Deadline option instead.
func (c *Client) EnqueueContext(ctx context.Context, task *Task, opts ...Option) (*TaskInfo, error) {
	if task == nil {
		return nil, fmt.Errorf("task cannot be nil")
	}
	if strings.TrimSpace(task.Type()) == "" {
		return nil, fmt.Errorf("task typename cannot be empty")
	}
	// merge task options with the options provided at enqueue time.
	opts = append(task.opts, opts...)
	opt, err := composeOptions(opts...)
	if err != nil {
		return nil, err
	}
	deadline := noDeadline
	if !opt.deadline.IsZero() {
		deadline = opt.deadline
	}
	timeout := noTimeout
	if opt.timeout != 0 {
		timeout = opt.timeout
	}
	if deadline.Equal(noDeadline) && timeout == noTimeout {
		// If neither deadline nor timeout are set, use default timeout.
		timeout = defaultTimeout
	}
	var uniqueKey string
	if opt.uniqueTTL > 0 {
		uniqueKey = base.UniqueKey(opt.queue, task.Type(), task.Payload())
	}
	msg := &base.TaskMessage{
		ID:        opt.taskID,
		Type:      task.Type(),
		Payload:   task.Payload(),
		Queue:     opt.queue,
		Retry:     opt.retry,
		Deadline:  deadline.Unix(),
		Timeout:   int64(timeout.Seconds()),
		UniqueKey: uniqueKey,
		GroupKey:  opt.group,
		Retention: int64(opt.retention.Seconds()),
	}
	now := time.Now()
	var state base.TaskState
	if opt.processAt.After(now) {
		err = c.schedule(ctx, msg, opt.processAt, opt.uniqueTTL)
		state = base.TaskStateScheduled
	} else if opt.group != "" {
		// Use zero value for processAt since we don't know when the task will be aggregated and processed.
		opt.processAt = time.Time{}
		err = c.addToGroup(ctx, msg, opt.group, opt.uniqueTTL)
		state = base.TaskStateAggregating
	} else { // a regular task
		opt.processAt = now
		err = c.enqueue(ctx, msg, opt.uniqueTTL)
		state = base.TaskStatePending
	}
	switch {
	case errors.Is(err, errors.ErrDuplicateTask):
		return nil, fmt.Errorf("%w", ErrDuplicateTask)
	case errors.Is(err, errors.ErrTaskIdConflict):
		return nil, fmt.Errorf("%w", ErrTaskIDConflict)
	case err != nil:
		return nil, err
	}
	return newTaskInfo(msg, state, opt.processAt, nil), nil
}
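Which of the three branches at the end of this function a task takes is determined entirely by the options passed at enqueue time; a hedged sketch (asynq.Group is assumed from the options API):

client.Enqueue(task)                             // pending: the regular path
client.Enqueue(task, asynq.ProcessIn(time.Hour)) // scheduled: processAt is in the future
client.Enqueue(task, asynq.Group("emails"))      // aggregating: grouped for batching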
As you can see, asynq splits tasks into three kinds depending on the options:
- tasks scheduled for later execution
- grouped tasks
- regular tasks

Here we only follow the enqueue path for regular tasks. Set a breakpoint and step into the enqueue method; it eventually lands in Enqueue in rdb.go:
// Enqueue adds the given task to the pending list of the queue.
func (r *RDB) Enqueue(ctx context.Context, msg *base.TaskMessage) error {
	var op errors.Op = "rdb.Enqueue"
	// encode the task message
	encoded, err := base.EncodeMessage(msg)
	if err != nil {
		return errors.E(op, errors.Unknown, fmt.Sprintf("cannot encode message: %v", err))
	}
	// register the queue in the global set of all queues (a Redis set)
	if err := r.client.SAdd(ctx, base.AllQueues, msg.Queue).Err(); err != nil {
		return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err})
	}
	keys := []string{
		base.TaskKey(msg.Queue, msg.ID),
		base.PendingKey(msg.Queue),
	}
	argv := []interface{}{
		encoded,
		msg.ID,
		r.clock.Now().UnixNano(),
	}
	// run the Lua script that adds the task to the queue
	n, err := r.runScriptWithErrorCode(ctx, op, enqueueCmd, keys, argv...)
	if err != nil {
		return err
	}
	if n == 0 {
		return errors.E(op, errors.AlreadyExists, errors.ErrTaskIdConflict)
	}
	return nil
}
So asynq ultimately calls runScriptWithErrorCode to run a Lua script that adds the task to the queue. Here is the script (enqueueCmd):
// enqueueCmd enqueues a given task message.
//
// Input:
// KEYS[1] -> asynq:{<qname>}:t:<task_id>
// KEYS[2] -> asynq:{<qname>}:pending
// --
// ARGV[1] -> task message data
// ARGV[2] -> task ID
// ARGV[3] -> current unix time in nsec
//
// Output:
// Returns 1 if successfully enqueued
// Returns 0 if task ID already exists
var enqueueCmd = redis.NewScript(`
if redis.call("EXISTS", KEYS[1]) == 1 then
	return 0
end
redis.call("HSET", KEYS[1],
           "msg", ARGV[1],
           "state", "pending",
           "pending_since", ARGV[3])
redis.call("LPUSH", KEYS[2], ARGV[2])
return 1
`)
The logic of this Lua script:
- Check whether the task already exists; if so, return 0. Otherwise continue.
- Store the task's information in a Redis hash: the encoded message, the task state, and the time it entered the pending state.
- Push the task ID onto the head of the queue's pending list.

The task ID here is a UUID; see composeOptions in client.go (only part of the code is shown):
func composeOptions(opts ...Option) (option, error) {
	res := option{
		retry:     defaultMaxRetry,
		queue:     base.DefaultQueueName,
		taskID:    uuid.NewString(),
		timeout:   0, // do not set to defaultTimeout here
		deadline:  time.Time{},
		processAt: time.Now(),
	}
	// ...
}
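The default is a fresh UUID, but the caller can also supply an ID; a hedged sketch using the TaskID option (which, as I understand the API, exists for idempotent enqueueing):

// Enqueueing twice with the same explicit ID trips the EXISTS check in
// enqueueCmd above; the second call should fail with ErrTaskIDConflict.
info, err := client.Enqueue(task, asynq.TaskID("order-1234"))
if errors.Is(err, asynq.ErrTaskIDConflict) {
	log.Println("a task with this ID is already queued")
}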
Server #
Again we start from the official example. Server:
package main

import (
	"log"

	"github.com/hibiken/asynq"
	"your/app/package/tasks"
)

const redisAddr = "127.0.0.1:6379"

func main() {
	srv := asynq.NewServer(
		asynq.RedisClientOpt{Addr: redisAddr},
		asynq.Config{
			// Specify how many concurrent workers to use
			Concurrency: 10,
			// Optionally specify multiple queues with different priority.
			Queues: map[string]int{
				"critical": 6,
				"default":  3,
				"low":      1,
			},
			// See the godoc for other configuration options
		},
	)

	// mux maps a type to a handler
	mux := asynq.NewServeMux()
	mux.HandleFunc(tasks.TypeEmailDelivery, tasks.HandleEmailDeliveryTask)
	mux.Handle(tasks.TypeImageResize, tasks.NewImageProcessor())
	// ...register other handlers...

	if err := srv.Run(mux); err != nil {
		log.Fatalf("could not run server: %v", err)
	}
}
Run starts the server, blocks waiting for OS signals, and shuts down gracefully when one arrives; the interesting work happens in Start:
// Start starts the worker server. Once the server has started,
// it pulls tasks off queues and starts a worker goroutine for each task
// and then call Handler to process it.
// Tasks are processed concurrently by the workers up to the number of
// concurrency specified in Config.Concurrency.
//
// Start returns any error encountered at server startup time.
// If the server has already been shutdown, ErrServerClosed is returned.
func (srv *Server) Start(handler Handler) error {
	if handler == nil {
		return fmt.Errorf("asynq: server cannot run with nil handler")
	}
	srv.processor.handler = handler

	if err := srv.start(); err != nil {
		return err
	}
	srv.logger.Info("Starting processing")

	srv.heartbeater.start(&srv.wg)   // heartbeater: periodically report server/worker state
	srv.healthchecker.start(&srv.wg) // health checks
	srv.subscriber.start(&srv.wg)    // subscriber for cancelation messages
	srv.syncer.start(&srv.wg)
	srv.recoverer.start(&srv.wg)
	srv.forwarder.start(&srv.wg)
	srv.processor.start(&srv.wg)     // the task processor itself
	srv.janitor.start(&srv.wg)
	srv.aggregator.start(&srv.wg)
	return nil
}
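Note that Start does not block. If you drive it directly instead of using Run, you are responsible for waiting and shutting down yourself; a minimal sketch (Shutdown is the graceful-stop method, as I understand the API):

if err := srv.Start(mux); err != nil {
	log.Fatalf("could not start server: %v", err)
}

// ...block here until the process receives a termination signal...

// Shutdown waits for in-flight tasks to finish (or to be pushed back
// to their queues) before returning.
srv.Shutdown()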
The Server struct definition:
// Server is responsible for task processing and task lifecycle management.
//
// Server pulls tasks off queues and processes them.
// If the processing of a task is unsuccessful, server will schedule it for a retry.
//
// A task will be retried until either the task gets processed successfully
// or until it reaches its max retry count.
//
// If a task exhausts its retries, it will be moved to the archive and
// will be kept in the archive set.
// Note that the archive size is finite and once it reaches its max size,
// oldest tasks in the archive will be deleted.
type Server struct {
	logger *log.Logger

	broker base.Broker

	state *serverState

	// wait group to wait for all goroutines to finish.
	wg            sync.WaitGroup
	forwarder     *forwarder
	processor     *processor
	syncer        *syncer
	heartbeater   *heartbeater
	subscriber    *subscriber
	recoverer     *recoverer
	healthchecker *healthchecker
	janitor       *janitor
	aggregator    *aggregator
}
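The retry behavior described in the doc comment is configurable. A hedged sketch, assuming Config.RetryDelayFunc has the signature I remember (the default, if I recall correctly, is an exponential backoff):

srv := asynq.NewServer(
	asynq.RedisClientOpt{Addr: redisAddr},
	asynq.Config{
		Concurrency: 10,
		// Called to compute the delay before the n-th retry of a failed task.
		RetryDelayFunc: func(n int, err error, t *asynq.Task) time.Duration {
			return time.Duration(n) * time.Minute // linear backoff, for illustration
		},
	},
)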
The task processor #
The task processor lives in processor.go; the actual work happens in start() -> exec():
// exec pulls a task out of the queue and starts a worker goroutine to
// process the task.
func (p *processor) exec() {
	select {
	case <-p.quit:
		return
	case p.sema <- struct{}{}: // acquire token
		qnames := p.queues()
		// pull one task off the queues
		msg, leaseExpirationTime, err := p.broker.Dequeue(qnames...)
		switch {
		case errors.Is(err, errors.ErrNoProcessableTask):
			p.logger.Debug("All queues are empty")
			// Queues are empty, this is a normal behavior.
			// Sleep to avoid slamming redis and let scheduler move tasks into queues.
			// Note: We are not using blocking pop operation and polling queues instead.
			// This adds significant load to redis.
			time.Sleep(time.Second)
			<-p.sema // release token
			return
		case err != nil:
			if p.errLogLimiter.Allow() {
				p.logger.Errorf("Dequeue error: %v", err)
			}
			<-p.sema // release token
			return
		}

		lease := base.NewLease(leaseExpirationTime)
		deadline := p.computeDeadline(msg)
		p.starting <- &workerInfo{msg, time.Now(), deadline, lease}
		go func() {
			defer func() {
				p.finished <- msg
				<-p.sema // release token
			}()

			ctx, cancel := asynqcontext.New(p.baseCtxFn(), msg, deadline)
			p.cancelations.Add(msg.ID, cancel)
			defer func() {
				cancel()
				p.cancelations.Delete(msg.ID)
			}()

			// check context before starting a worker goroutine.
			select {
			case <-ctx.Done():
				// already canceled (e.g. deadline exceeded).
				p.handleFailedMessage(ctx, lease, msg, ctx.Err())
				return
			default:
			}

			resCh := make(chan error, 1)
			// run the handler in its own goroutine
			go func() {
				task := newTask(
					msg.Type,
					msg.Payload,
					&ResultWriter{
						id:     msg.ID,
						qname:  msg.Queue,
						broker: p.broker,
						ctx:    ctx,
					},
				)
				// process the task and report the result
				resCh <- p.perform(ctx, task)
			}()

			// wait for the result, or for one of the failure conditions
			select {
			case <-p.abort:
				// time is up, push the message back to queue and quit this worker goroutine.
				p.logger.Warnf("Quitting worker. task id=%s", msg.ID)
				p.requeue(lease, msg)
				return
			case <-lease.Done():
				cancel()
				p.handleFailedMessage(ctx, lease, msg, ErrLeaseExpired)
				return
			case <-ctx.Done():
				p.handleFailedMessage(ctx, lease, msg, ctx.Err())
				return
			case resErr := <-resCh:
				if resErr != nil {
					p.handleFailedMessage(ctx, lease, msg, resErr)
					return
				}
				p.handleSucceededMessage(lease, msg)
			}
		}()
	}
}
Two steps here are worth a closer look:
- fetching a task
- executing the task
Fetching a task #
Starting from Dequeue, we eventually reach the Lua script that pulls a task from Redis:
// Input:
// KEYS[1] -> asynq:{<qname>}:pending
// KEYS[2] -> asynq:{<qname>}:paused
// KEYS[3] -> asynq:{<qname>}:active
// KEYS[4] -> asynq:{<qname>}:lease
// --
// ARGV[1] -> initial lease expiration Unix time
// ARGV[2] -> task key prefix
//
// Output:
// Returns nil if no processable task is found in the given queue.
// Returns an encoded TaskMessage.
//
// Note: dequeueCmd checks whether a queue is paused first, before
// calling RPOPLPUSH to pop a task from the queue.
var dequeueCmd = redis.NewScript(`
if redis.call("EXISTS", KEYS[2]) == 0 then
	local id = redis.call("RPOPLPUSH", KEYS[1], KEYS[3])
	if id then
		local key = ARGV[2] .. id
		redis.call("HSET", key, "state", "active")
		redis.call("HDEL", key, "pending_since")
		redis.call("ZADD", KEYS[4], ARGV[1], id)
		return redis.call("HGET", key, "msg")
	end
end
return nil`)
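The EXISTS check on KEYS[2] is what makes pausing work: while the paused key is present, the script returns nil and nothing is handed to workers. That key is managed through the Inspector API, as far as I know; a sketch:

// Pausing sets asynq:{<qname>}:paused, which the dequeue script checks.
inspector := asynq.NewInspector(asynq.RedisClientOpt{Addr: redisAddr})
if err := inspector.PauseQueue("default"); err != nil {
	log.Fatalf("could not pause queue: %v", err)
}
// ...later...
if err := inspector.UnpauseQueue("default"); err != nil {
	log.Fatalf("could not unpause queue: %v", err)
}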
Executing the task #
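From exec, execution goes through p.perform, which, as far as I can tell, wraps the handler call and recovers panics, turning them into ordinary errors delivered on resCh; a rough sketch of the pattern (not the exact source):

func (p *processor) perform(ctx context.Context, task *Task) (err error) {
	defer func() {
		if x := recover(); x != nil {
			// a panicking handler fails the task instead of killing the server
			err = fmt.Errorf("panic: %v", x)
		}
	}()
	return p.handler.ProcessTask(ctx, task)
}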
The dispatch then ends up in ProcessTask in servemux.go:
// ProcessTask dispatches the task to the handler whose
// pattern most closely matches the task type.
func (mux *ServeMux) ProcessTask(ctx context.Context, task *Task) error {
	// look up the handler that was registered when the server was started
	h, _ := mux.Handler(task)
	return h.ProcessTask(ctx, task)
}
This again breaks down into two steps:
- Look up the handler that was registered at server start, based on the task's type.
- Invoke that handler.
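As a side note, ServeMux also supports middleware in the same spirit as net/http, via Use and HandlerFunc (a hedged sketch; names assumed from the public API):

// logging middleware: wraps any handler and times each task
func loggingMiddleware(h asynq.Handler) asynq.Handler {
	return asynq.HandlerFunc(func(ctx context.Context, t *asynq.Task) error {
		start := time.Now()
		err := h.ProcessTask(ctx, t)
		log.Printf("task type=%s took %v err=%v", t.Type(), time.Since(start), err)
		return err
	})
}

// registration:
mux.Use(loggingMiddleware)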
Summary #
This library implements the core features of a task queue, and with 7.1k GitHub stars at the time of writing it is reasonably popular. My personal take: it works well as a lightweight task queue or message queue, but think carefully before putting it on a critical business path.