queue-disruptor 是 EasyFK 框架的高性能内存队列模块,基于 LMAX Disruptor 4.0 构建,提供了低延迟、高吞吐量的事件处理能力。该模块通过 Spring Boot 自动配置机制,支持声明式的队列定义和 Processor 自动绑定,开发者只需实现 Processor 接口即可完成队列的消费逻辑。
build.gradle:
dependencies {
api('com.lmax:disruptor:4.0.0')
}com.mcst.easyfk.queue.disruptor
├── api/ # 接口定义
│ ├── IIngestQueue<T> - 摄入队列接口
│ ├── Processor<T> - 数据处理器接口
│ └── IQueueLifecycle - 队列生命周期接口
├── config/ # 自动配置
│ ├── DisruptorQueueAutoConfiguration - 自动配置类
│ └── ShutdownManager - 优雅关闭管理器
├── core/ # 核心实现
│ ├── DisruptorIngestQueue<T> - Disruptor 队列实现(含 Builder)
│ ├── IngestQueueManager - 队列管理器(门面类)
│ ├── IngestQueueRegistry - 队列注册表
│ ├── Event<T> - 事件对象
│ ├── EventFactoryImpl<T> - 事件工厂
│ └── GenericWorkHandler<T> - 通用工作处理器(分片)
├── enums/ # 枚举
│ └── WaitStrategyType - 等待策略类型
└── properties/ # 配置属性
└── DisruptorQueuesProperties - 队列配置属性@Component("orderProcessor")
public class OrderProcessor implements Processor<OrderData> {
@Override
public void process(OrderData item) throws Exception {
// 处理订单数据
orderService.processOrder(item);
}
}@Service
public class OrderService {
@Resource
private IngestQueueManager queueManager;
public void submitOrder(OrderData order) {
// 阻塞模式发送
queueManager.send("orderProcessor", order);
// 非阻塞模式发送(推荐)
boolean success = queueManager.trySend("orderProcessor", order, 100, TimeUnit.MILLISECONDS);
}
}说明: 模块会自动发现 Processor Bean 并创建同名队列,无需任何配置即可使用。
easyfk:
config:
queue:
disruptor:
buffer-size: 65536 # 全局 RingBuffer 大小(2的幂)
workers: 4 # 全局工作线程数
producer-type: SINGLE # 全局生产者类型
wait-strategy: BLOCKING # 全局等待策略
queues:
orderProcessor: # 队列名称(对应 Processor Bean 名称)
buffer-size: 131072 # 覆盖全局配置
workers: 8
producer-type: MULTI
wait-strategy: YIELDING
processor-bean: orderProcessor # 可选,显式绑定
ingest:
max-spin-nanos: 500000
thread:
name-prefix: order-worker-
daemon: true
priority: 5
shutdown:
await-ms: 60000public interface Processor<T> {
void process(T item) throws Exception;
}实现该接口定义队列的消费逻辑。每个 Processor Bean 会自动关联一个同名队列。
| `accept(T item)` | 阻塞式放入队列,队列满时自旋等待 |
|---|
| `start()` | 启动队列 |
|---|
提供统一的队列操作入口,推荐在业务代码中使用此类。
| `getQueue(name)` | `IIngestQueue<T>` | 获取队列实例 |
|---|---|---|
| `trySend(name, item, timeout, unit)` | `boolean` | 非阻塞超时发送(推荐) |
| `sendBatch(name, items, timeout, unit)` | `int` | 批量发送,返回成功数量 |
| `sendWithRetry(name, item, maxRetries, timeout, unit)` | `boolean` | 带重试的发送(递增退避) |
| `hasQueue(name)` | `boolean` | 检查队列是否存在 |
| `getAllQueueNames()` | `Set<String>` | 获取所有队列名称 |
| `getAllQueues()` | `Map<String, IIngestQueue<?>>` | 获取所有队列实例 |
在 Processor 中访问其他队列:
@Component("stepOneProcessor")
public class StepOneProcessor implements Processor<RawData> {
@Resource
private IngestQueueManager queueManager;
@Override
public void process(RawData item) throws Exception {
ProcessedData result = doProcess(item);
// 发送到下一个队列(管道模式)
queueManager.trySend("stepTwoProcessor", result, 100, TimeUnit.MILLISECONDS);
}
}基于 Disruptor RingBuffer 实现的高性能队列。
accept() 流程:
accept(item)
│
├─ null 检查 → null 直接返回
│
├─ running 检查 → 已关闭抛 RejectedExecutionException
│
└─ 自旋循环
├─ 中断检查 → 已中断抛 RejectedExecutionException
├─ running 检查 → 已关闭抛 RejectedExecutionException
├─ 容量检查 → 有空间 → tryPublishEvent → 成功返回
└─ 无空间 → LockSupport.parkNanos(maxSpinNanos) → 继续循环tryPublishEvent() 流程:
tryPublishEvent(item)
│
├─ ringBuffer.next() → 获取序列号
├─ ringBuffer.get(seq) → 获取 Event 对象
├─ event.setPayload(item) → 设置负载
├─ ringBuffer.publish(seq) → 发布事件
│
└─ 异常处理
└─ 发布空事件(payload=null)→ 保持序列号一致性使用 Java record 实现,基于序列号取模的分片策略:
public void onEvent(Event<T> event, long sequence, boolean endOfBatch) {
if ((sequence % shardTotal) != shardIndex) return; // 不属于当前分片,跳过
processor.process(event.getPayload());
event.setPayload(null); // 清空负载,允许 GC
}分片示例(4个 Worker):
| 0 | 处理 | 跳过 | 跳过 | 跳过 |
|---|---|---|---|---|
| 2 | 跳过 | 跳过 | 处理 | 跳过 |
| 3 | 跳过 | 跳过 | 跳过 | 处理 |
实现 DisposableBean,在 Spring 容器销毁时自动关闭所有队列:
Spring 容器关闭
│
└─ ShutdownManager.destroy()
│
└─ 遍历所有注册的队列
├─ 获取该队列的 awaitMs 配置(默认 60000ms)
└─ 调用 IQueueLifecycle.shutdown(awaitMs)
├─ CAS 设置 running=false(幂等)
├─ 等待进行中的 accept 完成
└─ disruptor.shutdown(awaitMs, MILLISECONDS)| 阻塞等待 | `BLOCKING` | 低 | 较高 | 默认策略,适合大多数场景 |
|---|---|---|---|---|
| 忙等待 | `BUSY_SPIN` | 极高(100%) | 最低 | 对延迟极其敏感的场景 |
| `buffer-size` | Integer | 65536 | RingBuffer 大小(自动对齐到 2 的幂) |
|---|---|---|---|
| `producer-type` | String | `SINGLE` | 生产者类型:`SINGLE` / `MULTI` |
| `wait-strategy` | String | `BLOCKING` | 等待策略:`BLOCKING` / `YIELDING` / `BUSY_SPIN` |
| `buffer-size` | Integer | 全局值 | RingBuffer 大小 |
|---|---|---|---|
| `producer-type` | String | 全局值 | 生产者类型 |
| `wait-strategy` | String | 全局值 | 等待策略 |
| `processor-bean` | String | 队列名称 | 显式绑定 Processor Bean |
| `ingest.max-spin-nanos` | long | 500000 | 生产者自旋等待纳秒数 |
| `thread.name-prefix` | String | Processor Bean 名称 | 工作线程名称前缀 |
| `thread.daemon` | boolean | true | 是否为守护线程 |
| `thread.priority` | int | 5 | 线程优先级(1-10) |
| `shutdown.await-ms` | long | 60000 | 关闭等待时间(毫秒) |
配置优先级: 队列特定配置 > 全局配置 > 代码默认值
模块启动时自动扫描所有 Processor 类型的 Bean,并为每个 Processor 创建对应的队列:
启动流程
│
├─ 1. 扫描配置文件中显式定义的队列(优先级最高)
│ └─ 查找对应 Processor Bean → 创建并注册队列
│
└─ 2. 扫描未被绑定的 Processor Bean
└─ 使用全局默认配置 → 自动创建并注册队列1. 如果配置了 processor-bean,直接按名称查找
2. 如果未配置,尝试用队列名称匹配 Processor Bean 名称
3. 如果容器中只有一个 Processor,直接使用
4. 以上都不匹配时抛出异常
@Component("logProcessor")
public class LogProcessor implements Processor<LogEntry> {
@Override
public void process(LogEntry item) {
// 写入日志存储
}
}@Component("parseProcessor")
public class ParseProcessor implements Processor<RawData> {
@Resource
private IngestQueueManager queueManager;
@Override
public void process(RawData item) {
ParsedData parsed = parse(item);
queueManager.trySend("enrichProcessor", parsed, 100, TimeUnit.MILLISECONDS);
}
}
@Component("enrichProcessor")
public class EnrichProcessor implements Processor<ParsedData> {
@Override
public void process(ParsedData item) {
// 数据增强处理
}
}boolean success = queueManager.sendWithRetry("targetQueue", data, 3, 50, TimeUnit.MILLISECONDS);List<OrderData> orders = getOrders();
int sent = queueManager.sendBatch("orderProcessor", orders, 100, TimeUnit.MILLISECONDS);easyfk-disruptor — 高性能无锁队列,实现极致并发性能。