在实际应用中,延迟队列可以用于处理各种需要延迟处理的任务,例如发送邮件提醒、订单自动取消、对超时任务的处理等。由于任务的执行是在未来的某个时间点,因此这些任务不会立即执行,而是存储在队列中,直到它的预定执行时间才会被执行。
二.Simple
在 Go 语言中,我们可以使用 time 包提供的计时器功能,通过使用 Go 中的 slice 存储延迟处理的任务,实现一个简单的延迟队列的功能。type Task struct { ExecuteTime time.Time Job func() }首先,我们定义一个结构体 Task,它包含一个可以执行任务的函数 Job,和一个执行时间 ExecuteTime,这是期望执行该函数的时间。
type DelayQueue struct { TaskQueue []Task }接下来,我们定义一个 DelayQueue 结构体,它拥有一个 TaskQueue,这是一个 Task 类型的切片,用于保存待执行任务的列表。
// 堆代码 duidaima.com // 添加任务 func (d *DelayQueue) AddTask(t Task) { d.TaskQueue = append(d.TaskQueue, t) } // 移除任务 func (d *DelayQueue) RemoveTask() { d.TaskQueue = d.TaskQueue[1:] } // 执行任务 func (d *DelayQueue) ExecuteTasks() { for len(d.TaskQueue) > 0 { // 获取队列最顶部的任务 currentTask := d.TaskQueue[0] // 如果执行时间还没到,等待 if time.Now().Before(currentTask.ExecuteTime) { time.Sleep(currentTask.ExecuteTime.Sub(time.Now())) } // 执行任务 currentTask.Job() // 移除已执行的任务 d.RemoveTask() } }DelayQueue 包含三个方法:
func main() { fmt.Println("Start DelayQueue") queue := DelayQueue{} firstTask := Task{ ExecuteTime: time.Now().Add(4 * time.Second), Job: func() { fmt.Println("Executed task 1 after delay") }, } queue.AddTask(firstTask) secondTask := Task{ ExecuteTime: time.Now().Add(10 * time.Second), Job: func() { fmt.Println("Executed task 2 after delay") }, } queue.AddTask(secondTask) queue.ExecuteTasks() fmt.Println("Done!") }输出结果:
Start DelayQueue Executed task 1 after delay Executed task 2 after delay Done!在示例代码中,我们创建了一个延时队列,将任务添加到队列中,并在指定的延时后执行它们。通过使用这些结构体和方法,我们可以在 Go 中实现简单的延迟执行任务的功能。但是,当 Go 程序重启时,存储在 slice 中的延迟处理的任务将全部丢失。
// 定义一个全局的redisdb变量 var redisdb *redis.Client // 初始化连接 func initClient() (err error) { redisdb = redis.NewClient(&redis.Options{ Addr: "localhost:6379", Password: "", // no password set DB: 0, // use default DB }) _, err = redisdb.Ping().Result() if err != nil { return err } return nil }全局变量 redisdb 是 redis.Client 类型的指针,用来保存到 Redis 客户端的引用。initClient 函数初始化连接到 Redis 服务器,该服务器在本地主机的 6379 端口运行。它将一个新的 Redis 客户端分配给 redisdb 变量。如果连接成功,它就会 ping Redis 服务器以测试连接。
// 向队列中添加任务 func addTaskToQueue(task string, executeTime int64) { err := redisdb.ZAdd("delay-queue", redis.Z{ Score: float64(executeTime), Member: task, }).Err() if err != nil { panic(err) } }addTaskToQueue 函数将具有执行时间的任务添加到 Redis 等待排序的集合 "delay-queue"。执行时间是一个 UNIX 时间戳,作为排序集合中的项目的 score,允许 Redis 按照他们应该执行的时间来排序项目。
// 从队列中获取并处理任务 func getAndExecuteTasks() { for { // 使用 ZRANGEBYSCORE 命令获取分数(时间戳)<= 当前时间的任务 tasks, err := redisdb.ZRangeByScore("delay-queue", redis.ZRangeBy{ Min: "-inf", Max: fmt.Sprintf("%d", time.Now().Unix()), }).Result() if err != nil { time.Sleep(1 * time.Second) continue } // 处理任务 for _, task := range tasks { fmt.Println("Executing task: ", task) // 执行完任务后,用 ZREM 移除该任务 redisdb.ZRem("delay-queue", task) } // 暂停一秒 time.Sleep(1 * time.Second) } }getAndExecuteTasks 函数不断检查 "delay-queue"。它提取队列中 score 小于或等于当前时间戳的任务,意味着这些任务现在应该执行或者他们应该在过去就已经执行。获取任务后,它打印任务(模拟执行)并从队列中删除任务。
func main() { err := initClient() if err != nil { fmt.Println("redis connect error:", err) return } // 添加一些测试任务 addTaskToQueue("task1", time.Now().Add(10*time.Second).Unix()) addTaskToQueue("task2", time.Now().Add(20*time.Second).Unix()) // 执行延迟队列中的任务 getAndExecuteTasks() }输出结果:
Executing task: task1 Executing task: task2main 函数调用这些函数。首先,它初始化 Redis 客户端。如果初始化和连接成功,它将一些测试任务添加到队列中,并启动任务执行循环。
网络延迟:如果 Go 程序和 Redis 服务器不在同一台机器上,网络延迟可能会影响延迟的准确性。