博客

Web即时通讯的几种方式

October 21, 2023
Web
Web, 即时通讯

前言 #

在web开发中,我们经常会遇到这样的场景:服务端某个操作完成,或者发生了某种变化时,需要实时通知到客户端/浏览器,我们就称其为即时通讯。例如,client提交了一个导出任务,服务端执行异步任务进行处理,在导出完成时,告知客户端,客户端再下载导出结果。又比如,常见webim通讯场景,客户端发送消息,服务端接收到消息后,通知到接收方。常用的即时通讯方式有以下几种:

...

asynq源码阅读

October 6, 2023
后端开发, 消息队列
源码阅读

asynq库介绍 #

asynq是一款使用Go语言开发的开源任务队列库,它的特点是:

  • 支持每个任务至少执行一次
  • 支持任务调度
  • 失败任务重试
  • 支持崩溃恢复
  • 支持优先级队列
  • 支持加权优先级队列
  • 支持严格优先级队列
  • 基于redis,添加任务低延迟。
  • 支持任务去重。
  • 允许任务设置超时时间。 …… 更多特性,可见查看其github地址:https://github.com/hibiken/asynq。

asynq源码阅读 #

从client看任务存储 #

我们先从官方提供的Client示例的入口来看一下asynq的任务存储是怎么实现的。

Client
package main

import (
    "log"
    "time"

    "github.com/hibiken/asynq"
    "your/app/package/tasks"
)

const redisAddr = "127.0.0.1:6379"

func main() {
    client := asynq.NewClient(asynq.RedisClientOpt{Addr: redisAddr})
    defer client.Close()

    // ------------------------------------------------------
    // Example 1: Enqueue task to be processed immediately.
    //            Use (*Client).Enqueue method.
    // ------------------------------------------------------

    task, err := tasks.NewEmailDeliveryTask(42, "some:template:id")
    if err != nil {
        log.Fatalf("could not create task: %v", err)
    }
    info, err := client.Enqueue(task)
    if err != nil {
        log.Fatalf("could not enqueue task: %v", err)
    }
    log.Printf("enqueued task: id=%s queue=%s", info.ID, info.Queue)


    // ------------------------------------------------------------
    // Example 2: Schedule task to be processed in the future.
    //            Use ProcessIn or ProcessAt option.
    // ------------------------------------------------------------

    info, err = client.Enqueue(task, asynq.ProcessIn(24*time.Hour))
    if err != nil {
        log.Fatalf("could not schedule task: %v", err)
    }
    log.Printf("enqueued task: id=%s queue=%s", info.ID, info.Queue)


    // ----------------------------------------------------------------------------
    // Example 3: Set other options to tune task processing behavior.
    //            Options include MaxRetry, Queue, Timeout, Deadline, Unique etc.
    // ----------------------------------------------------------------------------

    task, err = tasks.NewImageResizeTask("https://example.com/myassets/image.jpg")
    if err != nil {
        log.Fatalf("could not create task: %v", err)
    }
    info, err = client.Enqueue(task, asynq.MaxRetry(10), asynq.Timeout(3 * time.Minute))
    if err != nil {
        log.Fatalf("could not enqueue task: %v", err)
    }
    log.Printf("enqueued task: id=%s queue=%s", info.ID, info.Queue)
}
我们从Enqueue方法开始看,最终进入EnqueueContext方法:
EnqueueContext
// EnqueueContext enqueues the given task to a queue.
// EnqueueContent向队列中投递任务
//
// EnqueueContext returns TaskInfo and nil error if the task is enqueued successfully, otherwise returns a non-nil error.
// 如果任务成功投递,返回TaskInfo和nil,否则返回非nil的error
//
// The argument opts specifies the behavior of task processing.
// 参数opts指定了任务处理的行为
// If there are conflicting Option values the last one overrides others.
// 如果有冲突的Option值,最后一个会覆盖前面的
// Any options provided to NewTask can be overridden by options passed to Enqueue.
// 任何传递给NewTask的选项都可以被传递给Enqueue的选项覆盖
// By default, max retry is set to 25 and timeout is set to 30 minutes.
// 默认情况下,最大重试次数为25,超时时间为30分钟
//
// If no ProcessAt or ProcessIn options are provided, the task will be pending immediately.
// 如果没有提供ProcessAt或ProcessIn选项,任务将立即处于pending状态
//
// The first argument context applies to the enqueue operation. To specify task timeout and deadline, use Timeout and Deadline option instead.
// 第一个参数context应用于enqueue操作。要指定任务的超时时间和截止时间,请使用Timeout和Deadline选项
func (c *Client) EnqueueContext(ctx context.Context, task *Task, opts ...Option) (*TaskInfo, error) {
	if task == nil {
		return nil, fmt.Errorf("task cannot be nil")
	}
	if strings.TrimSpace(task.Type()) == "" {
		return nil, fmt.Errorf("task typename cannot be empty")
	}
	// merge task options with the options provided at enqueue time.
    // 将任务选项与enqueue时提供的选项合并
	opts = append(task.opts, opts...)
	opt, err := composeOptions(opts...)
	if err != nil {
		return nil, err
	}
	deadline := noDeadline
	if !opt.deadline.IsZero() {
		deadline = opt.deadline
	}
	timeout := noTimeout
	if opt.timeout != 0 {
		timeout = opt.timeout
	}
	if deadline.Equal(noDeadline) && timeout == noTimeout {
		// If neither deadline nor timeout are set, use default timeout.
		timeout = defaultTimeout
	}
	var uniqueKey string
	if opt.uniqueTTL > 0 {
		uniqueKey = base.UniqueKey(opt.queue, task.Type(), task.Payload())
	}
	msg := &base.TaskMessage{
		ID:        opt.taskID,
		Type:      task.Type(),
		Payload:   task.Payload(),
		Queue:     opt.queue,
		Retry:     opt.retry,
		Deadline:  deadline.Unix(),
		Timeout:   int64(timeout.Seconds()),
		UniqueKey: uniqueKey,
		GroupKey:  opt.group,
		Retention: int64(opt.retention.Seconds()),
	}
	now := time.Now()
	var state base.TaskState
	if opt.processAt.After(now) {
		err = c.schedule(ctx, msg, opt.processAt, opt.uniqueTTL)
		state = base.TaskStateScheduled
	} else if opt.group != "" {
		// Use zero value for processAt since we don't know when the task will be aggregated and processed.
        // 对于processAt使用零值,因为我们不知道任务何时被聚合和处理
		opt.processAt = time.Time{}
		err = c.addToGroup(ctx, msg, opt.group, opt.uniqueTTL)
		state = base.TaskStateAggregating
	} else {// 普通任务
		opt.processAt = now
		err = c.enqueue(ctx, msg, opt.uniqueTTL)
		state = base.TaskStatePending
	}
	switch {
	case errors.Is(err, errors.ErrDuplicateTask):
		return nil, fmt.Errorf("%w", ErrDuplicateTask)
	case errors.Is(err, errors.ErrTaskIdConflict):
		return nil, fmt.Errorf("%w", ErrTaskIDConflict)
	case err != nil:
		return nil, err
	}
	return newTaskInfo(msg, state, opt.processAt, nil), nil
}
我们不一行一行的介绍,直接看投递任务的逻辑:

...

怎样开发一个grpc拦截器

October 2, 2023
后端开发
Grpc

在开发grpc服务时,我们经常会遇到一些通用的需求,比如:日志、链路追踪、鉴权等。这些需求可以通过grpc拦截器来实现。本文使用go语言来实现一个 grpc一元模式(Unary)拦截器,上报链路追踪信息。

...

实现一个自定义的protoc插件

October 1, 2023
微服务
Go, Protobuf, Grpc

我们使用protobuf+grpc技术栈来开发微服务时,会要使用相关protoc插件来生成相关代码。有时可能会需要自定义一些插件,本文就来实现一个自定义的protoc插件。

新旧接口的说明 #

以前开发protoc插件时,需要实现generator接口(github.com/golang/protobuf/protoc-gen-go/generator),现在网上有不少稍老一些资料也是这样介绍的。但是实际上,这个接口已经被废弃了,现在要开发插件,应该使用的是"google.golang.org/protobuf/compiler/protogen"包。我们开发插件时,必须要开发一个如下签名的函数:

...

书籍推荐-20230903

September 3, 2023
其他
读书

今天想给大家推荐的一本书是《愿有人陪你颠沛流离》。算不上是大作,但是我觉得这本书还是值得一读的。至少,对于我来说,能 从中找到一些自己的影子,自己的青春。

后端面经系列-MVCC协议

August 21, 2023
后端开发, 数据库
Mysql, 数据库

什么是MVCC?为什么需要MVCC? #

MVCC即多版本并发控制,同一条记录可能存在多个版本,不同的事务可能读取到的是不同版本。 之所以使用MVCC,是为了提高并发读写性能。如果没有MVCC,就需要使用锁来保证并发安全,有一个会话在进行写操作,其它读操作都需要阻塞。而我们现实中的业务,大部分 都是读多写少的,所以使用锁的话会大大降低并发性能。

...

后端面经系列-事务提交了,你的数据就一定不会丢吗?

August 21, 2023
后端开发, 数据库
Mysql, 数据库, 事务

什么是undo log?为什么需要undo log? #

undo log,又称回滚日志。undo log有以下作用:

  • 事务回滚。为了保证事务的原子性,事务中的操作要么全部完成,要么什么也不做。如果事务执行到一半出现了错误,那么就需要回滚到事务开始之前的状态,我们就必须要记录事务开始执行时的状态, 准确的说是相关记录在那个时刻的数据版本。
  • mvcc。同一条记录的不同版本的undo log,通过roll pointer串联成了一个版本链。mvcc,即多版本并发控制,就是通过这个版本链来实现的。

什么是redo log?为什么需要redo log? #

又称 重做日志。 redo log主要用于崩溃恢复,保证事务的持久性。

...

使用RediSearch进行搜索

August 20, 2023
Redis
RediSearch, Redis

RediSearch是一个支持搜索功能的redis模块。本文对此模块进行了介绍,并用go语言实现了一个简单示例。

RediSearch提供了查询、二级索引、全文索引功能。你需要先在相关Redis条目上建立索引,然后才能使用RediSearch进行查询。

...