Web即时通讯的几种方式
October 21, 2023
前言 #
在web开发中,我们经常会遇到这样的场景:服务端某个操作完成,或者发生了某种变化时,需要实时通知到客户端/浏览器,我们就称其为即时通讯。例如,client提交了一个导出任务,服务端执行异步任务进行处理,在导出完成时,告知客户端,客户端再下载导出结果。又比如,常见webim通讯场景,客户端发送消息,服务端接收到消息后,通知到接收方。常用的即时通讯方式有以下几种:
...在web开发中,我们经常会遇到这样的场景:服务端某个操作完成,或者发生了某种变化时,需要实时通知到客户端/浏览器,我们就称其为即时通讯。例如,client提交了一个导出任务,服务端执行异步任务进行处理,在导出完成时,告知客户端,客户端再下载导出结果。又比如,常见webim通讯场景,客户端发送消息,服务端接收到消息后,通知到接收方。常用的即时通讯方式有以下几种:
...asynq是一款使用Go语言开发的开源任务队列库,它的特点是:
我们先从官方提供的Client示例的入口来看一下asynq的任务存储是怎么实现的。Client
package main
import (
"log"
"time"
"github.com/hibiken/asynq"
"your/app/package/tasks"
)
const redisAddr = "127.0.0.1:6379"
func main() {
client := asynq.NewClient(asynq.RedisClientOpt{Addr: redisAddr})
defer client.Close()
// ------------------------------------------------------
// Example 1: Enqueue task to be processed immediately.
// Use (*Client).Enqueue method.
// ------------------------------------------------------
task, err := tasks.NewEmailDeliveryTask(42, "some:template:id")
if err != nil {
log.Fatalf("could not create task: %v", err)
}
info, err := client.Enqueue(task)
if err != nil {
log.Fatalf("could not enqueue task: %v", err)
}
log.Printf("enqueued task: id=%s queue=%s", info.ID, info.Queue)
// ------------------------------------------------------------
// Example 2: Schedule task to be processed in the future.
// Use ProcessIn or ProcessAt option.
// ------------------------------------------------------------
info, err = client.Enqueue(task, asynq.ProcessIn(24*time.Hour))
if err != nil {
log.Fatalf("could not schedule task: %v", err)
}
log.Printf("enqueued task: id=%s queue=%s", info.ID, info.Queue)
// ----------------------------------------------------------------------------
// Example 3: Set other options to tune task processing behavior.
// Options include MaxRetry, Queue, Timeout, Deadline, Unique etc.
// ----------------------------------------------------------------------------
task, err = tasks.NewImageResizeTask("https://example.com/myassets/image.jpg")
if err != nil {
log.Fatalf("could not create task: %v", err)
}
info, err = client.Enqueue(task, asynq.MaxRetry(10), asynq.Timeout(3 * time.Minute))
if err != nil {
log.Fatalf("could not enqueue task: %v", err)
}
log.Printf("enqueued task: id=%s queue=%s", info.ID, info.Queue)
}
EnqueueContext
// EnqueueContext enqueues the given task to a queue.
// EnqueueContent向队列中投递任务
//
// EnqueueContext returns TaskInfo and nil error if the task is enqueued successfully, otherwise returns a non-nil error.
// 如果任务成功投递,返回TaskInfo和nil,否则返回非nil的error
//
// The argument opts specifies the behavior of task processing.
// 参数opts指定了任务处理的行为
// If there are conflicting Option values the last one overrides others.
// 如果有冲突的Option值,最后一个会覆盖前面的
// Any options provided to NewTask can be overridden by options passed to Enqueue.
// 任何传递给NewTask的选项都可以被传递给Enqueue的选项覆盖
// By default, max retry is set to 25 and timeout is set to 30 minutes.
// 默认情况下,最大重试次数为25,超时时间为30分钟
//
// If no ProcessAt or ProcessIn options are provided, the task will be pending immediately.
// 如果没有提供ProcessAt或ProcessIn选项,任务将立即处于pending状态
//
// The first argument context applies to the enqueue operation. To specify task timeout and deadline, use Timeout and Deadline option instead.
// 第一个参数context应用于enqueue操作。要指定任务的超时时间和截止时间,请使用Timeout和Deadline选项
func (c *Client) EnqueueContext(ctx context.Context, task *Task, opts ...Option) (*TaskInfo, error) {
if task == nil {
return nil, fmt.Errorf("task cannot be nil")
}
if strings.TrimSpace(task.Type()) == "" {
return nil, fmt.Errorf("task typename cannot be empty")
}
// merge task options with the options provided at enqueue time.
// 将任务选项与enqueue时提供的选项合并
opts = append(task.opts, opts...)
opt, err := composeOptions(opts...)
if err != nil {
return nil, err
}
deadline := noDeadline
if !opt.deadline.IsZero() {
deadline = opt.deadline
}
timeout := noTimeout
if opt.timeout != 0 {
timeout = opt.timeout
}
if deadline.Equal(noDeadline) && timeout == noTimeout {
// If neither deadline nor timeout are set, use default timeout.
timeout = defaultTimeout
}
var uniqueKey string
if opt.uniqueTTL > 0 {
uniqueKey = base.UniqueKey(opt.queue, task.Type(), task.Payload())
}
msg := &base.TaskMessage{
ID: opt.taskID,
Type: task.Type(),
Payload: task.Payload(),
Queue: opt.queue,
Retry: opt.retry,
Deadline: deadline.Unix(),
Timeout: int64(timeout.Seconds()),
UniqueKey: uniqueKey,
GroupKey: opt.group,
Retention: int64(opt.retention.Seconds()),
}
now := time.Now()
var state base.TaskState
if opt.processAt.After(now) {
err = c.schedule(ctx, msg, opt.processAt, opt.uniqueTTL)
state = base.TaskStateScheduled
} else if opt.group != "" {
// Use zero value for processAt since we don't know when the task will be aggregated and processed.
// 对于processAt使用零值,因为我们不知道任务何时被聚合和处理
opt.processAt = time.Time{}
err = c.addToGroup(ctx, msg, opt.group, opt.uniqueTTL)
state = base.TaskStateAggregating
} else {// 普通任务
opt.processAt = now
err = c.enqueue(ctx, msg, opt.uniqueTTL)
state = base.TaskStatePending
}
switch {
case errors.Is(err, errors.ErrDuplicateTask):
return nil, fmt.Errorf("%w", ErrDuplicateTask)
case errors.Is(err, errors.ErrTaskIdConflict):
return nil, fmt.Errorf("%w", ErrTaskIDConflict)
case err != nil:
return nil, err
}
return newTaskInfo(msg, state, opt.processAt, nil), nil
}
在开发grpc服务时,我们经常会遇到一些通用的需求,比如:日志、链路追踪、鉴权等。这些需求可以通过grpc拦截器来实现。本文使用go语言来实现一个 grpc一元模式(Unary)拦截器,上报链路追踪信息。
...我们使用protobuf+grpc技术栈来开发微服务时,会要使用相关protoc插件来生成相关代码。有时可能会需要自定义一些插件,本文就来实现一个自定义的protoc插件。
以前开发protoc插件时,需要实现generator接口(github.com/golang/protobuf/protoc-gen-go/generator),现在网上有不少稍老一些资料也是这样介绍的。但是实际上,这个接口已经被废弃了,现在要开发插件,应该使用的是"google.golang.org/protobuf/compiler/protogen"包。我们开发插件时,必须要开发一个如下签名的函数:
...我司业务是toB的,所以我们的分表主要有两种形式:按企业id分表、按时间分表。我们的做法是:让查询在单表中进行,避免跨表分页。
如果是按企业id分表的,我们不会支持跨企业查询,也没有这个需求。
...今天想给大家推荐的一本书是《愿有人陪你颠沛流离》。算不上是大作,但是我觉得这本书还是值得一读的。至少,对于我来说,能 从中找到一些自己的影子,自己的青春。
MVCC即多版本并发控制,同一条记录可能存在多个版本,不同的事务可能读取到的是不同版本。 之所以使用MVCC,是为了提高并发读写性能。如果没有MVCC,就需要使用锁来保证并发安全,有一个会话在进行写操作,其它读操作都需要阻塞。而我们现实中的业务,大部分 都是读多写少的,所以使用锁的话会大大降低并发性能。
...undo log,又称回滚日志。undo log有以下作用:
又称 重做日志。 redo log主要用于崩溃恢复,保证事务的持久性。
...如无特殊说明,本文默认mysql使用的存储引擎是InnoDB。
行锁,是指锁的粒度是行级别的,其锁定的记录是某个记录或者间隙(也可能是多个)。表锁是指锁的粒度是表级别的。
...RediSearch是一个支持搜索功能的redis模块。本文对此模块进行了介绍,并用go语言实现了一个简单示例。
RediSearch提供了查询、二级索引、全文索引功能。你需要先在相关Redis条目上建立索引,然后才能使用RediSearch进行查询。
...