asynq源码阅读

asynq源码阅读

October 6, 2023
后端开发, 消息队列
源码阅读

asynq库介绍 #

asynq是一款使用Go语言开发的开源任务队列库,它的特点是:

  • 支持每个任务至少执行一次
  • 支持任务调度
  • 失败任务重试
  • 支持崩溃恢复
  • 支持优先级队列
  • 支持加权优先级队列
  • 支持严格优先级队列
  • 基于redis,添加任务低延迟。
  • 支持任务去重。
  • 允许任务设置超时时间。 …… 更多特性,可见查看其github地址:https://github.com/hibiken/asynq。

asynq源码阅读 #

从client看任务存储 #

我们先从官方提供的Client示例的入口来看一下asynq的任务存储是怎么实现的。

Client
package main

import (
    "log"
    "time"

    "github.com/hibiken/asynq"
    "your/app/package/tasks"
)

const redisAddr = "127.0.0.1:6379"

func main() {
    client := asynq.NewClient(asynq.RedisClientOpt{Addr: redisAddr})
    defer client.Close()

    // ------------------------------------------------------
    // Example 1: Enqueue task to be processed immediately.
    //            Use (*Client).Enqueue method.
    // ------------------------------------------------------

    task, err := tasks.NewEmailDeliveryTask(42, "some:template:id")
    if err != nil {
        log.Fatalf("could not create task: %v", err)
    }
    info, err := client.Enqueue(task)
    if err != nil {
        log.Fatalf("could not enqueue task: %v", err)
    }
    log.Printf("enqueued task: id=%s queue=%s", info.ID, info.Queue)


    // ------------------------------------------------------------
    // Example 2: Schedule task to be processed in the future.
    //            Use ProcessIn or ProcessAt option.
    // ------------------------------------------------------------

    info, err = client.Enqueue(task, asynq.ProcessIn(24*time.Hour))
    if err != nil {
        log.Fatalf("could not schedule task: %v", err)
    }
    log.Printf("enqueued task: id=%s queue=%s", info.ID, info.Queue)


    // ----------------------------------------------------------------------------
    // Example 3: Set other options to tune task processing behavior.
    //            Options include MaxRetry, Queue, Timeout, Deadline, Unique etc.
    // ----------------------------------------------------------------------------

    task, err = tasks.NewImageResizeTask("https://example.com/myassets/image.jpg")
    if err != nil {
        log.Fatalf("could not create task: %v", err)
    }
    info, err = client.Enqueue(task, asynq.MaxRetry(10), asynq.Timeout(3 * time.Minute))
    if err != nil {
        log.Fatalf("could not enqueue task: %v", err)
    }
    log.Printf("enqueued task: id=%s queue=%s", info.ID, info.Queue)
}
我们从Enqueue方法开始看,最终进入EnqueueContext方法:
EnqueueContext
// EnqueueContext enqueues the given task to a queue.
// EnqueueContent向队列中投递任务
//
// EnqueueContext returns TaskInfo and nil error if the task is enqueued successfully, otherwise returns a non-nil error.
// 如果任务成功投递,返回TaskInfo和nil,否则返回非nil的error
//
// The argument opts specifies the behavior of task processing.
// 参数opts指定了任务处理的行为
// If there are conflicting Option values the last one overrides others.
// 如果有冲突的Option值,最后一个会覆盖前面的
// Any options provided to NewTask can be overridden by options passed to Enqueue.
// 任何传递给NewTask的选项都可以被传递给Enqueue的选项覆盖
// By default, max retry is set to 25 and timeout is set to 30 minutes.
// 默认情况下,最大重试次数为25,超时时间为30分钟
//
// If no ProcessAt or ProcessIn options are provided, the task will be pending immediately.
// 如果没有提供ProcessAt或ProcessIn选项,任务将立即处于pending状态
//
// The first argument context applies to the enqueue operation. To specify task timeout and deadline, use Timeout and Deadline option instead.
// 第一个参数context应用于enqueue操作。要指定任务的超时时间和截止时间,请使用Timeout和Deadline选项
func (c *Client) EnqueueContext(ctx context.Context, task *Task, opts ...Option) (*TaskInfo, error) {
	if task == nil {
		return nil, fmt.Errorf("task cannot be nil")
	}
	if strings.TrimSpace(task.Type()) == "" {
		return nil, fmt.Errorf("task typename cannot be empty")
	}
	// merge task options with the options provided at enqueue time.
    // 将任务选项与enqueue时提供的选项合并
	opts = append(task.opts, opts...)
	opt, err := composeOptions(opts...)
	if err != nil {
		return nil, err
	}
	deadline := noDeadline
	if !opt.deadline.IsZero() {
		deadline = opt.deadline
	}
	timeout := noTimeout
	if opt.timeout != 0 {
		timeout = opt.timeout
	}
	if deadline.Equal(noDeadline) && timeout == noTimeout {
		// If neither deadline nor timeout are set, use default timeout.
		timeout = defaultTimeout
	}
	var uniqueKey string
	if opt.uniqueTTL > 0 {
		uniqueKey = base.UniqueKey(opt.queue, task.Type(), task.Payload())
	}
	msg := &base.TaskMessage{
		ID:        opt.taskID,
		Type:      task.Type(),
		Payload:   task.Payload(),
		Queue:     opt.queue,
		Retry:     opt.retry,
		Deadline:  deadline.Unix(),
		Timeout:   int64(timeout.Seconds()),
		UniqueKey: uniqueKey,
		GroupKey:  opt.group,
		Retention: int64(opt.retention.Seconds()),
	}
	now := time.Now()
	var state base.TaskState
	if opt.processAt.After(now) {
		err = c.schedule(ctx, msg, opt.processAt, opt.uniqueTTL)
		state = base.TaskStateScheduled
	} else if opt.group != "" {
		// Use zero value for processAt since we don't know when the task will be aggregated and processed.
        // 对于processAt使用零值,因为我们不知道任务何时被聚合和处理
		opt.processAt = time.Time{}
		err = c.addToGroup(ctx, msg, opt.group, opt.uniqueTTL)
		state = base.TaskStateAggregating
	} else {// 普通任务
		opt.processAt = now
		err = c.enqueue(ctx, msg, opt.uniqueTTL)
		state = base.TaskStatePending
	}
	switch {
	case errors.Is(err, errors.ErrDuplicateTask):
		return nil, fmt.Errorf("%w", ErrDuplicateTask)
	case errors.Is(err, errors.ErrTaskIdConflict):
		return nil, fmt.Errorf("%w", ErrTaskIDConflict)
	case err != nil:
		return nil, err
	}
	return newTaskInfo(msg, state, opt.processAt, nil), nil
}
我们不一行一行的介绍,直接看投递任务的逻辑:

	if opt.processAt.After(now) {
		err = c.schedule(ctx, msg, opt.processAt, opt.uniqueTTL)
		state = base.TaskStateScheduled
	} else if opt.group != "" {
		// Use zero value for processAt since we don't know when the task will be aggregated and processed.
        // 对于processAt使用零值,因为我们不知道任务何时被聚合和处理
		opt.processAt = time.Time{}
		err = c.addToGroup(ctx, msg, opt.group, opt.uniqueTTL)
		state = base.TaskStateAggregating
	} else {// 普通任务
		opt.processAt = now
		err = c.enqueue(ctx, msg, opt.uniqueTTL)
		state = base.TaskStatePending
	}

可以看到,根据任务的类型,asynq会将任务分为三类:

  • 需要延迟执行的任务
  • 分组任务
  • 普通任务 我们这里只看普通任务的投递逻辑。打断点,进入enqueue方法,最终会进入rdb.go的Enqueue:
// Enqueue adds the given task to the pending list of the queue.
// Enqueue将给定的任务添加到队列的pending列表中
func (r *RDB) Enqueue(ctx context.Context, msg *base.TaskMessage) error {
	var op errors.Op = "rdb.Enqueue"
    // 对消息进行编码
	encoded, err := base.EncodeMessage(msg)
	if err != nil {
		return errors.E(op, errors.Unknown, fmt.Sprintf("cannot encode message: %v", err))
	}
    // 将队列添加到全局字典中。此处使用的是redis的set。
	if err := r.client.SAdd(ctx, base.AllQueues, msg.Queue).Err(); err != nil {
		return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err})
	}
	keys := []string{
		base.TaskKey(msg.Queue, msg.ID),
		base.PendingKey(msg.Queue),
	}
	argv := []interface{}{
		encoded,
		msg.ID,
		r.clock.Now().UnixNano(),
	}

    // 执行lua脚本,将任务添加到队列中
	n, err := r.runScriptWithErrorCode(ctx, op, enqueueCmd, keys, argv...)
	if err != nil {
		return err
	}
	if n == 0 {
		return errors.E(op, errors.AlreadyExists, errors.ErrTaskIdConflict)
	}
	return nil
}

可以看到,asynq最终调用runScriptWithErrorCode来执行lua脚本,将任务添加到队列中。我们来看一下这个脚本(enqueueCmd)的定义:

// enqueueCmd enqueues a given task message.
// enqueueCmd将给定的任务消息入队
//
// Input:
// KEYS[1] -> asynq:{<qname>}:t:<task_id>
// KEYS[2] -> asynq:{<qname>}:pending
// --
// ARGV[1] -> task message data  任务消息数据
// ARGV[2] -> task ID  任务id
// ARGV[3] -> current unix time in nsec  当前unix时间戳
//
// Output:
// Returns 1 if successfully enqueued
// Returns 0 if task ID already exists
var enqueueCmd = redis.NewScript(`
if redis.call("EXISTS", KEYS[1]) == 1 then
	return 0
end
redis.call("HSET", KEYS[1],
           "msg", ARGV[1],
           "state", "pending",
           "pending_since", ARGV[3])
redis.call("LPUSH", KEYS[2], ARGV[2])
return 1
`)

这段lua脚本的逻辑是:

  1. 判断任务是否已经存在,如果存在,返回0。否则,继续执行下一步。
  2. 将任务的信息存储到hash字典中中,包括任务的消息数据,任务状态,任务的入队时间。
  3. 将任务id添加到队列的头部。 这里的任务id,使用的是uuid,见client.go-composeOptions方法(这里仅展示部分代码):
func composeOptions(opts ...Option) (option, error) {
	res := option{
		retry:     defaultMaxRetry,
		queue:     base.DefaultQueueName,
		taskID:    uuid.NewString(),
		timeout:   0, // do not set to defaultTimeout here
		deadline:  time.Time{},
		processAt: time.Now(),
	}

    // ...
}

server #

我们还是从官方示例入口开始看起:

Server
package main

import (
    "log"

    "github.com/hibiken/asynq"
    "your/app/package/tasks"
)

const redisAddr = "127.0.0.1:6379"

func main() {
    srv := asynq.NewServer(
        asynq.RedisClientOpt{Addr: redisAddr},
        asynq.Config{
            // Specify how many concurrent workers to use
            Concurrency: 10,
            // Optionally specify multiple queues with different priority.
            Queues: map[string]int{
                "critical": 6,
                "default":  3,
                "low":      1,
            },
            // See the godoc for other configuration options
        },
    )

    // mux maps a type to a handler
    mux := asynq.NewServeMux()
    mux.HandleFunc(tasks.TypeEmailDelivery, tasks.HandleEmailDeliveryTask)
    mux.Handle(tasks.TypeImageResize, tasks.NewImageProcessor())
    // ...register other handlers...

    if err := srv.Run(mux); err != nil {
        log.Fatalf("could not run server: %v", err)
    }
}
从Run入口进入,一直到Server的Start方法:
Start
// Start starts the worker server. Once the server has started,
// Start 启动worker server。一旦server启动,
// it pulls tasks off queues and starts a worker goroutine for each task
// and then call Handler to process it.
// 它从队列中拉取任务,并为每个任务启动一个worker goroutine,然后调用Handler来处理它。
// Tasks are processed concurrently by the workers up to the number of
// concurrency specified in Config.Concurrency.
// 任务由worker并发处理,最多可以并发处理数量由Config.Concurrency指定。
//
// Start returns any error encountered at server startup time.
// Start在启动那个时遇到任何错误都会返回。
// If the server has already been shutdown, ErrServerClosed is returned.
// 如果server已经关闭,会返回ErrServerClosed。
func (srv *Server) Start(handler Handler) error {
	if handler == nil {
		return fmt.Errorf("asynq: server cannot run with nil handler")
	}
	srv.processor.handler = handler

	if err := srv.start(); err != nil {
		return err
	}
	srv.logger.Info("Starting processing")

    // 初始化
	srv.heartbeater.start(&srv.wg)
    // 健康检查
	srv.healthchecker.start(&srv.wg)
    // 启动订阅者
	srv.subscriber.start(&srv.wg)
	srv.syncer.start(&srv.wg)
	srv.recoverer.start(&srv.wg)
	srv.forwarder.start(&srv.wg)
    // 启动订阅者
	srv.processor.start(&srv.wg)
	srv.janitor.start(&srv.wg)
	srv.aggregator.start(&srv.wg)
	return nil
}
其中Server结构体的定义是:
Server结构体定义
// Server is responsible for task processing and task lifecycle management.
// Server负责任务处理和任务生命周期管理。
//
// Server pulls tasks off queues and processes them.
// Server从队列中拉取任务并处理它们。
// If the processing of a task is unsuccessful, server will schedule it for a retry.
// 如果任务处理不成功,server会为其安排重试。
//
// A task will be retried until either the task gets processed successfully
// or until it reaches its max retry count.
// 任务将重试,直到任务被成功处理或达到其最大重试次数为止。
//
// If a task exhausts its retries, it will be moved to the archive and
// will be kept in the archive set.
// 如果任务达到了最大重试次数,它将被移动到archive中,并将保留在archive集合中。
// Note that the archive size is finite and once it reaches its max size,
// oldest tasks in the archive will be deleted.
// 请注意,archive的大小是有限的,一旦达到最大大小,archive中最旧的任务将被删除。
type Server struct {
	logger *log.Logger

	broker base.Broker

	state *serverState

	// wait group to wait for all goroutines to finish.
	wg            sync.WaitGroup
	forwarder     *forwarder
	processor     *processor
	syncer        *syncer
	heartbeater   *heartbeater
	subscriber    *subscriber
	recoverer     *recoverer
	healthchecker *healthchecker
	janitor       *janitor
	aggregator    *aggregator
}
我们可以看到,asynq的server启动了很多goroutine,包括健康检查,任务处理等。我们这里着重看一下任务处理器。

任务处理器 #

任务处理器所在文件process.go,具体执行方法start() -> exec():

// exec pulls a task out of the queue and starts a worker goroutine to
// process the task.
// exec从队列中拉取一个任务并启动一个工作协程去处理这个任务。
func (p *processor) exec() {
	select {
	case <-p.quit:
		return
	case p.sema <- struct{}{}: // acquire token
		qnames := p.queues()
        // 从队列中获取一个任务
		msg, leaseExpirationTime, err := p.broker.Dequeue(qnames...)
		switch {
		case errors.Is(err, errors.ErrNoProcessableTask):
			p.logger.Debug("All queues are empty")
			// Queues are empty, this is a normal behavior.
            // 队列为空,这是一个常规操作。
			// Sleep to avoid slamming redis and let scheduler move tasks into queues.
            // 为了防止对redis进行过多请求并让调度器移动任务到队列中,这里进行休眠。
			// Note: We are not using blocking pop operation and polling queues instead.
            // 注意:我们不使用阻塞式的弹出操作,而是使用轮询队列的方式。
			// This adds significant load to redis.
            // 这会给 Redis 增加很大的负载。
			time.Sleep(time.Second)
			<-p.sema // release token
			return
		case err != nil:
			if p.errLogLimiter.Allow() {
				p.logger.Errorf("Dequeue error: %v", err)
			}
			<-p.sema // release token
			return
		}

		lease := base.NewLease(leaseExpirationTime)
		deadline := p.computeDeadline(msg)
		p.starting <- &workerInfo{msg, time.Now(), deadline, lease}
		go func() {
			defer func() {
				p.finished <- msg
				<-p.sema // release token
			}()

			ctx, cancel := asynqcontext.New(p.baseCtxFn(), msg, deadline)
			p.cancelations.Add(msg.ID, cancel)
			defer func() {
				cancel()
				p.cancelations.Delete(msg.ID)
			}()

			// check context before starting a worker goroutine.
			select {
			case <-ctx.Done():
				// already canceled (e.g. deadline exceeded).
				p.handleFailedMessage(ctx, lease, msg, ctx.Err())
				return
			default:
			}

			resCh := make(chan error, 1)
            // 启动一个协程来处理任务
			go func() {
				task := newTask(
					msg.Type,
					msg.Payload,
					&ResultWriter{
						id:     msg.ID,
						qname:  msg.Queue,
						broker: p.broker,
						ctx:    ctx,
					},
				)
                // 任务处理
				resCh <- p.perform(ctx, task)
			}()

            // 处理各种异常
			select {
			case <-p.abort:
				// time is up, push the message back to queue and quit this worker goroutine.
				p.logger.Warnf("Quitting worker. task id=%s", msg.ID)
				p.requeue(lease, msg)
				return
			case <-lease.Done():
				cancel()
				p.handleFailedMessage(ctx, lease, msg, ErrLeaseExpired)
				return
			case <-ctx.Done():
				p.handleFailedMessage(ctx, lease, msg, ctx.Err())
				return
			case resErr := <-resCh:
				if resErr != nil {
					p.handleFailedMessage(ctx, lease, msg, resErr)
					return
				}
				p.handleSucceededMessage(lease, msg)
			}
		}()
	}
}

我们看亮点:

  • 获取任务
  • 执行任务

获取任务 #

从Dequeue开始,最终可以看到从redis中获取任务的lua脚本:

// Input:
// KEYS[1] -> asynq:{<qname>}:pending
// KEYS[2] -> asynq:{<qname>}:paused
// KEYS[3] -> asynq:{<qname>}:active
// KEYS[4] -> asynq:{<qname>}:lease
// --
// ARGV[1] -> initial lease expiration Unix time
// ARGV[2] -> task key prefix
//
// Output:
// 输出:
// Returns nil if no processable task is found in the given queue.
// Returns an encoded TaskMessage.
// 如果在队列中没有发现可处理的任务,返回nil
// 返回一个编码后的 TaskMessage。
//
// Note: dequeueCmd checks whether a queue is paused first, before
// calling RPOPLPUSH to pop a task from the queue.
var dequeueCmd = redis.NewScript(`
if redis.call("EXISTS", KEYS[2]) == 0 then
	local id = redis.call("RPOPLPUSH", KEYS[1], KEYS[3])
	if id then
		local key = ARGV[2] .. id
		redis.call("HSET", key, "state", "active")
		redis.call("HDEL", key, "pending_since")
		redis.call("ZADD", KEYS[4], ARGV[1], id)
		return redis.call("HGET", key, "msg")
	end
end
return nil`)

任务处理 #

处理任务的最终代码在servermux.go->ProcessTask:

// ProcessTask dispatches the task to the handler whose
// pattern most closely matches the task type.
func (mux *ServeMux) ProcessTask(ctx context.Context, task *Task) error {
    // 获取在启动server时传递的处理方法
	h, _ := mux.Handler(task)
	return h.ProcessTask(ctx, task)
}

这里也可以分为两步:

  1. 根据任务信息获取启动server时传入的处理方法。
  2. 执行处理方法。

总结 #

这个库实现了任务队列的基本功能,目前star数有7.1k,还不错。个人看法:可以当成一种轻量级的任务队列或者消息队列使用,但是是否在关键业务中使用,慎重。