EBEasyBuild Docs
文档/后端/Disruptor

easyfk-disruptor Disruptor

Disruptor 高性能队列 — 无锁并发编程阅读时间 ~15 min

1. 模块概述

queue-disruptor 是 EasyFK 框架的高性能内存队列模块,基于 LMAX Disruptor 4.0 构建,提供了低延迟、高吞吐量的事件处理能力。该模块通过 Spring Boot 自动配置机制,支持声明式的队列定义和 Processor 自动绑定,开发者只需实现 Processor 接口即可完成队列的消费逻辑。

2. 模块依赖

build.gradle:

groovy
dependencies {
    api('com.lmax:disruptor:4.0.0')
}

3. 包结构

plaintext
com.mcst.easyfk.queue.disruptor
├── api/                          # 接口定义
│   ├── IIngestQueue<T>             - 摄入队列接口
│   ├── Processor<T>                - 数据处理器接口
│   └── IQueueLifecycle             - 队列生命周期接口
├── config/                       # 自动配置
│   ├── DisruptorQueueAutoConfiguration - 自动配置类
│   └── ShutdownManager             - 优雅关闭管理器
├── core/                         # 核心实现
│   ├── DisruptorIngestQueue<T>     - Disruptor 队列实现(含 Builder)
│   ├── IngestQueueManager          - 队列管理器(门面类)
│   ├── IngestQueueRegistry         - 队列注册表
│   ├── Event<T>                    - 事件对象
│   ├── EventFactoryImpl<T>         - 事件工厂
│   └── GenericWorkHandler<T>       - 通用工作处理器(分片)
├── enums/                        # 枚举
│   └── WaitStrategyType            - 等待策略类型
└── properties/                   # 配置属性
    └── DisruptorQueuesProperties   - 队列配置属性

4. 快速开始

4.1 实现 Processor

java
@Component("orderProcessor")
public class OrderProcessor implements Processor<OrderData> {

    @Override
    public void process(OrderData item) throws Exception {
        // 处理订单数据
        orderService.processOrder(item);
    }
}

4.2 发送数据到队列

java
@Service
public class OrderService {

    @Resource
    private IngestQueueManager queueManager;

    public void submitOrder(OrderData order) {
        // 阻塞模式发送
        queueManager.send("orderProcessor", order);

        // 非阻塞模式发送(推荐)
        boolean success = queueManager.trySend("orderProcessor", order, 100, TimeUnit.MILLISECONDS);
    }
}

说明: 模块会自动发现 Processor Bean 并创建同名队列,无需任何配置即可使用。

4.3 可选配置

yaml
easyfk:
  config:
    queue:
      disruptor:
        buffer-size: 65536          # 全局 RingBuffer 大小(2的幂)
        workers: 4                  # 全局工作线程数
        producer-type: SINGLE       # 全局生产者类型
        wait-strategy: BLOCKING     # 全局等待策略
        queues:
          orderProcessor:           # 队列名称(对应 Processor Bean 名称)
            buffer-size: 131072     # 覆盖全局配置
            workers: 8
            producer-type: MULTI
            wait-strategy: YIELDING
            processor-bean: orderProcessor  # 可选,显式绑定
            ingest:
              max-spin-nanos: 500000
            thread:
              name-prefix: order-worker-
              daemon: true
              priority: 5
            shutdown:
              await-ms: 60000

5. 核心接口

5.1 Processor\<T\> —— 数据处理器

java
public interface Processor<T> {
    void process(T item) throws Exception;
}

实现该接口定义队列的消费逻辑。每个 Processor Bean 会自动关联一个同名队列。

5.2 IIngestQueue\<T\> —— 摄入队列接口

`accept(T item)`阻塞式放入队列,队列满时自旋等待

5.3 IQueueLifecycle —— 生命周期接口

`start()`启动队列

6. 核心组件详解

6.1 IngestQueueManager —— 队列管理器(门面类)

提供统一的队列操作入口,推荐在业务代码中使用此类

`getQueue(name)``IIngestQueue&lt;T&gt;`获取队列实例
`trySend(name, item, timeout, unit)``boolean`非阻塞超时发送(推荐)
`sendBatch(name, items, timeout, unit)``int`批量发送,返回成功数量
`sendWithRetry(name, item, maxRetries, timeout, unit)``boolean`带重试的发送(递增退避)
`hasQueue(name)``boolean`检查队列是否存在
`getAllQueueNames()``Set&lt;String&gt;`获取所有队列名称
`getAllQueues()``Map&lt;String, IIngestQueue&lt;?&gt;&gt;`获取所有队列实例

在 Processor 中访问其他队列:

java
@Component("stepOneProcessor")
public class StepOneProcessor implements Processor<RawData> {

    @Resource
    private IngestQueueManager queueManager;

    @Override
    public void process(RawData item) throws Exception {
        ProcessedData result = doProcess(item);
        // 发送到下一个队列(管道模式)
        queueManager.trySend("stepTwoProcessor", result, 100, TimeUnit.MILLISECONDS);
    }
}

6.2 DisruptorIngestQueue —— 核心队列实现

基于 Disruptor RingBuffer 实现的高性能队列。

accept() 流程:

plaintext
accept(item)
    │
    ├─ null 检查 → null 直接返回
    │
    ├─ running 检查 → 已关闭抛 RejectedExecutionException
    │
    └─ 自旋循环
        ├─ 中断检查 → 已中断抛 RejectedExecutionException
        ├─ running 检查 → 已关闭抛 RejectedExecutionException
        ├─ 容量检查 → 有空间 → tryPublishEvent → 成功返回
        └─ 无空间 → LockSupport.parkNanos(maxSpinNanos) → 继续循环

tryPublishEvent() 流程:

plaintext
tryPublishEvent(item)
    │
    ├─ ringBuffer.next() → 获取序列号
    ├─ ringBuffer.get(seq) → 获取 Event 对象
    ├─ event.setPayload(item) → 设置负载
    ├─ ringBuffer.publish(seq) → 发布事件
    │
    └─ 异常处理
        └─ 发布空事件(payload=null)→ 保持序列号一致性

6.3 GenericWorkHandler —— 分片工作处理器

使用 Java record 实现,基于序列号取模的分片策略:

java
public void onEvent(Event<T> event, long sequence, boolean endOfBatch) {
    if ((sequence % shardTotal) != shardIndex) return;  // 不属于当前分片,跳过
    processor.process(event.getPayload());
    event.setPayload(null);  // 清空负载,允许 GC
}

分片示例(4个 Worker):

0处理跳过跳过跳过
2跳过跳过处理跳过
3跳过跳过跳过处理

6.4 ShutdownManager —— 优雅关闭

实现 DisposableBean,在 Spring 容器销毁时自动关闭所有队列:

plaintext
Spring 容器关闭
    │
    └─ ShutdownManager.destroy()
        │
        └─ 遍历所有注册的队列
            ├─ 获取该队列的 awaitMs 配置(默认 60000ms)
            └─ 调用 IQueueLifecycle.shutdown(awaitMs)
                ├─ CAS 设置 running=false(幂等)
                ├─ 等待进行中的 accept 完成
                └─ disruptor.shutdown(awaitMs, MILLISECONDS)

7. 等待策略

阻塞等待`BLOCKING`较高默认策略,适合大多数场景
忙等待`BUSY_SPIN`极高(100%)最低对延迟极其敏感的场景

8. 配置属性详解

8.1 全局配置

`buffer-size`Integer65536RingBuffer 大小(自动对齐到 2 的幂)
`producer-type`String`SINGLE`生产者类型:`SINGLE` / `MULTI`
`wait-strategy`String`BLOCKING`等待策略:`BLOCKING` / `YIELDING` / `BUSY_SPIN`

8.2 队列级配置(覆盖全局)

`buffer-size`Integer全局值RingBuffer 大小
`producer-type`String全局值生产者类型
`wait-strategy`String全局值等待策略
`processor-bean`String队列名称显式绑定 Processor Bean
`ingest.max-spin-nanos`long500000生产者自旋等待纳秒数
`thread.name-prefix`StringProcessor Bean 名称工作线程名称前缀
`thread.daemon`booleantrue是否为守护线程
`thread.priority`int5线程优先级(1-10)
`shutdown.await-ms`long60000关闭等待时间(毫秒)

配置优先级: 队列特定配置 > 全局配置 > 代码默认值

9. 自动配置机制

9.1 Processor 自动发现

模块启动时自动扫描所有 Processor 类型的 Bean,并为每个 Processor 创建对应的队列:

plaintext
启动流程
    │
    ├─ 1. 扫描配置文件中显式定义的队列(优先级最高)
    │     └─ 查找对应 Processor Bean → 创建并注册队列
    │
    └─ 2. 扫描未被绑定的 Processor Bean
          └─ 使用全局默认配置 → 自动创建并注册队列

9.2 Processor 查找规则

1. 如果配置了 processor-bean,直接按名称查找

2. 如果未配置,尝试用队列名称匹配 Processor Bean 名称

3. 如果容器中只有一个 Processor,直接使用

4. 以上都不匹配时抛出异常

10. 使用模式

10.1 单队列模式

java
@Component("logProcessor")
public class LogProcessor implements Processor<LogEntry> {
    @Override
    public void process(LogEntry item) {
        // 写入日志存储
    }
}

10.2 管道模式(多队列串联)

java
@Component("parseProcessor")
public class ParseProcessor implements Processor<RawData> {
    @Resource
    private IngestQueueManager queueManager;

    @Override
    public void process(RawData item) {
        ParsedData parsed = parse(item);
        queueManager.trySend("enrichProcessor", parsed, 100, TimeUnit.MILLISECONDS);
    }
}

@Component("enrichProcessor")
public class EnrichProcessor implements Processor<ParsedData> {
    @Override
    public void process(ParsedData item) {
        // 数据增强处理
    }
}

10.3 带重试的发送

java
boolean success = queueManager.sendWithRetry("targetQueue", data, 3, 50, TimeUnit.MILLISECONDS);

10.4 批量发送

java
List<OrderData> orders = getOrders();
int sent = queueManager.sendBatch("orderProcessor", orders, 100, TimeUnit.MILLISECONDS);

easyfk-disruptor — 高性能无锁队列,实现极致并发性能。

— END —