本文由 简悦 SimpRead 转码, 原文地址 blog.csdn.net
效率工具
- 推荐一个程序员的常用工具网站,效率加倍嘎嘎好用:程序员常用工具
云服务器
- 云服务器限时免费领:轻量服务器 2 核 4G
- 腾讯云:2 核 2G4M 云服务器新老同享 99 元 / 年,续费同价
- 阿里云:2 核 2G3M 的 ECS 服务器只需 99 元 / 年,续费同价
在分布式系统中,延时队列是一种常见的需求,它允许我们将任务延迟一段时间后再执行。常见的应用场景包括订单超时处理、短信发送延迟、缓存失效处理等。本文将介绍如何在 Spring Boot 项目中,结合 Redis 和 Lua 脚本实现一个高效的延时队列。
一、项目准备
1.1 引入依赖
在 Spring Boot 项目中,我们需要引入以下依赖:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
</dependency>
</dependencies>
1.2 配置 Redis
在application.yml中配置 Redis 连接信息:
spring:
redis:
host: localhost
port: 6379
password:
lettuce:
pool:
max-active: 8
max-idle: 8
min-idle: 0
1.3 创建 Redis 配置类
创建一个配置类来配置 RedisTemplate:
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;
@Configuration
public class RedisConfig {
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(factory);
template.setKeySerializer(new StringRedisSerializer());
template.setValueSerializer(new StringRedisSerializer());
return template;
}
}
二、实现延时队列
延时队列的核心思想是使用 Redis 的有序集合(Sorted Set)来存储任务,每个任务关联一个延时时间。当时间到达时,通过 Lua 脚本将任务从有序集合中移到处理队列中。
2.1 创建任务发布接口
首先,我们创建一个接口来发布延时任务:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import java.time.Instant;
@Service
public class DelayQueueService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
private static final String DELAY_QUEUE_KEY = "delay_queue";
public void addTask(String taskId, long delayInSeconds) {
long score = Instant.now().getEpochSecond() + delayInSeconds;
redisTemplate.opsForZSet().add(DELAY_QUEUE_KEY, taskId, score);
}
}
2.2 Lua 脚本处理任务
Lua 脚本用于从有序集合中取出到期的任务,并将其移到处理队列中:
local delayQueueKey = KEYS[1]
local readyQueueKey = KEYS[2]
local currentTime = tonumber(ARGV[1])
local tasks = redis.call('ZRANGEBYSCORE', delayQueueKey, 0, currentTime)
if next(tasks) ~= nil then
for _, task in ipairs(tasks) do
redis.call('ZREM', delayQueueKey, task)
redis.call('LPUSH', readyQueueKey, task)
end
end
return tasks
2.3 创建任务处理接口
我们创建一个接口来处理到期的任务:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import java.time.Instant;
import java.util.List;
@Service
public class TaskProcessor {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private JedisPool jedisPool;
private static final String DELAY_QUEUE_KEY = "delay_queue";
private static final String READY_QUEUE_KEY = "ready_queue";
private static final String LUA_SCRIPT = "local delayQueueKey = KEYS[1] " +
"local readyQueueKey = KEYS[2] " +
"local currentTime = tonumber(ARGV[1]) " +
"local tasks = redis.call('ZRANGEBYSCORE', delayQueueKey, 0, currentTime) " +
"if next(tasks) ~= nil then " +
" for _, task in ipairs(tasks) do " +
" redis.call('ZREM', delayQueueKey, task) " +
" redis.call('LPUSH', readyQueueKey, task) " +
" end " +
"end " +
"return tasks ";
@Scheduled(fixedRate = 1000)
public void processTasks() {
try (Jedis jedis = jedisPool.getResource()) {
List<String> tasks = (List<String>) jedis.eval(LUA_SCRIPT, 2, DELAY_QUEUE_KEY, READY_QUEUE_KEY, String.valueOf(Instant.now().getEpochSecond()));
for (String task : tasks) {
// 处理任务
System.out.println("Processing task: " + task);
}
}
}
}
在这个实现中,processTasks 方法每秒执行一次,通过 Lua 脚本检查并处理到期的任务。
三、测试与验证
3.1 添加测试任务
在控制器中添加接口来发布延时任务:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class DelayQueueController {
@Autowired
private DelayQueueService delayQueueService;
@PostMapping("/addTask")
public String addTask(@RequestParam String taskId, @RequestParam long delayInSeconds) {
delayQueueService.addTask(taskId, delayInSeconds);
return "Task added";
}
}
3.2 验证任务处理
启动 Spring Boot 应用程序,通过 HTTP 请求添加任务:
curl -X POST "http://localhost:8080/addTask?taskId=task1&delayInSeconds=10"
curl -X POST "http://localhost:8080/addTask?taskId=task2&delayInSeconds=20"
检查控制台输出,确认任务在指定的延迟时间后被正确处理:
Processing task: task1
Processing task: task2
四、总结
通过本文,我们学习了如何在 Spring Boot 项目中使用 Redis 和 Lua 脚本实现延时队列。通过 Redis 的有序集合存储任务和 Lua 脚本处理到期任务,可以实现高效的延时任务处理机制。结合 Spring Boot 的定时任务调度,能够方便地实现任务的定期检查和处理。这种方法不仅简单高效,还能很好地扩展和维护,是实现延时队列的一个优秀方案。