本手册面向使用 EasyFK 框架的开发人员,详细介绍 chronicle-queue 组件的接入方式、配置说明、API 用法、最佳实践及常见问题。
Gradle 方式(在业务模块的 build.gradle 中添加):
dependencies {
implementation("com.mcst:chronicle-queue")
}Maven 方式(在业务模块的 pom.xml 中添加):
<dependency>
<groupId>com.mcst</groupId>
<artifactId>chronicle-queue</artifactId>
</dependency>com.mcst:easyfk-dependencies)统一管理,无需手动指定版本号。如果项目未引入 BOM,需在 Maven 中先声明:<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.mcst</groupId>
<artifactId>easyfk-dependencies</artifactId>
<version>${easyfk.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>零配置即可使用。组件提供了合理的默认值:
| 配置项 | 默认值 | 说明 |
|---|---|---|
| 存储路径 | ./chronicle-queues | 队列文件存储根目录 |
| 滚动周期 | FAST_DAILY | 每天滚动生成新文件 |
| 块大小 | 0(使用 Chronicle 默认) | 内存映射文件块大小 |
如果默认值满足需求,不需要任何 YAML 配置,直接注入 ChronicleQueueTemplate 即可开始使用。
import com.easyfk.chronicle.queue.template.ChronicleQueueTemplate;
import jakarta.annotation.Resource;
import org.springframework.stereotype.Service;
@Service
public class MyService {
@Resource
private ChronicleQueueTemplate chronicleQueueTemplate;
public void demo() {
long index = chronicleQueueTemplate.writeText("my-queue", "Hello Chronicle Queue!");
String message = chronicleQueueTemplate.readText("my-queue", "my-consumer");
System.out.println("写入索引: " + index);
System.out.println("读取到: " + message);
}
}my-queue 不需要预先创建,首次使用时会自动创建并初始化。easyfk:
config:
storage:
queue:
default-path: ./chronicle-queues
default-roll-cycle: FAST_DAILY
default-block-size: 0
create-directories-on-startup: trueeasyfk:
config:
storage:
queue:
default-path: /data/chronicle-queues
default-roll-cycle: FAST_DAILY
queues:
- queue-name: order-events
path: /data/chronicle-queues/orders
roll-cycle: FAST_HOURLY
block-size: 67108864
- queue-name: log-buffer
roll-cycle: FIVE_MINUTELY
- queue-name: metrics
roll-cycle: TEN_MINUTELY字段说明:
| 字段 | 必填 | 说明 |
|---|---|---|
| queue-name | 是 | 队列唯一标识,用于后续 API 调用 |
| path | 否 | 自定义存储路径,不配置则为 defaultPath/queueName |
| roll-cycle | 否 | 覆盖全局默认滚动周期 |
| block-size | 否 | 覆盖全局默认块大小,0 表示使用全局默认 |
队列定义中的 path / rollCycle / blockSize
↓ 未配置则使用
全局默认 defaultPath / defaultRollCycle / defaultBlockSize
↓ 未配置则使用
内置默认值 (./chronicle-queues / FAST_DAILY / Chronicle 内部默认)| 滚动周期 | 切割频率 | 适用场景 |
|---|---|---|
| FAST_DAILY | 每天 | 默认推荐,适合大多数场景 |
| FAST_HOURLY | 每小时 | 高吞吐场景,便于按小时归档清理 |
| FIVE_MINUTELY | 每 5 分钟 | 超高吞吐,需要更细粒度文件切割 |
| TEN_MINUTELY | 每 10 分钟 | 高吞吐场景 |
| TWENTY_MINUTELY | 每 20 分钟 | 中高吞吐场景 |
| HALF_HOURLY | 每 30 分钟 | 中等吞吐场景 |
| WEEKLY | 每周 | 低吞吐场景,减少文件数量 |
FAST_DAILY 即可。组件提供两个核心类,推荐优先使用 ChronicleQueueTemplate:
@Resource
private ChronicleQueueTemplate chronicleQueueTemplate;| 方法 | 参数 | 返回值 | 说明 |
|---|---|---|---|
| writeText(queueName, text) | 队列名, 文本 | long(索引) | 写入纯文本消息 |
| writeDocument(queueName, writer) | 队列名, Consumer<WireOut> | long(索引) | 写入结构化数据 |
| writeKeyValue(queueName, key, value) | 队列名, 键, 值 | long(索引) | 写入键值对 |
| 方法 | 参数 | 返回值 | 说明 |
|---|---|---|---|
| readText(queueName, tailerId) | 队列名, 消费者ID | String 或 null | 读取下一条文本(RESUME 策略) |
| readText(queueName, tailerId, strategy) | 队列名, 消费者ID, 策略 | String 或 null | 指定策略读取文本 |
| readDocument(queueName, tailerId, reader) | 队列名, 消费者ID, 读取函数 | T 或 null | 读取结构化数据(RESUME 策略) |
| readDocument(queueName, tailerId, strategy, reader) | 队列名, 消费者ID, 策略, 读取函数 | T 或 null | 指定策略读取结构化数据 |
| readTextBatch(queueName, tailerId, maxCount) | 队列名, 消费者ID, 最大条数 | List<String> | 批量读取文本(RESUME 策略) |
| readTextBatch(queueName, tailerId, maxCount, strategy) | 队列名, 消费者ID, 最大条数, 策略 | List<String> | 指定策略批量读取 |
| readDocumentBatch(queueName, tailerId, maxCount, reader) | 队列名, 消费者ID, 最大条数, 读取函数 | List<T> | 批量读取结构化数据 |
| readDocumentBatch(queueName, tailerId, maxCount, strategy, reader) | 全部参数 | List<T> | 指定策略批量读取结构化数据 |
| 方法 | 参数 | 返回值 | 说明 |
|---|---|---|---|
| peekLastText(queueName) | 队列名 | String 或 null | 查看最后一条消息 |
| peekFirstText(queueName) | 队列名 | String 或 null | 查看第一条消息 |
| peekTextAtIndex(queueName, index) | 队列名, 索引 | String 或 null | 查看指定索引消息 |
| 方法 | 参数 | 返回值 | 说明 |
|---|---|---|---|
| toEnd(queueName, tailerId) | 队列名, 消费者ID | void | 跳到队列末尾 |
| toStart(queueName, tailerId) | 队列名, 消费者ID | void | 跳到队列开头 |
| moveToIndex(queueName, tailerId, index) | 队列名, 消费者ID, 索引 | boolean | 移动到指定索引 |
| getTailerIndex(queueName, tailerId) | 队列名, 消费者ID | long | 获取当前位置索引 |
| 方法 | 参数 | 返回值 | 说明 |
|---|---|---|---|
| queueExists(queueName) | 队列名 | boolean | 检查队列是否存在 |
| getAllQueueNames() | 无 | Set<String> | 获取所有队列名称 |
| 方法 | 参数 | 返回值 | 说明 |
|---|---|---|---|
| getAppender(queueName) | 队列名 | ExcerptAppender | 获取原生写入器 |
| createTailer(queueName) | 队列名 | ExcerptTailer | 创建匿名读取器 |
| createTailer(queueName, tailerId) | 队列名, 消费者ID | ExcerptTailer | 创建命名读取器 |
| getQueue(queueName) | 队列名 | ChronicleQueue | 获取底层队列实例 |
@Resource
private ChronicleQueueManager chronicleQueueManager;| 方法 | 说明 |
|---|---|
| getOrCreateQueue(queueName) | 获取或创建队列(默认配置) |
| getOrCreateQueue(queueName, path, rollCycle, blockSize) | 获取或创建队列(自定义配置) |
| registerQueue(queueName, queue) | 注册外部创建的队列实例 |
| getQueue(queueName) | 获取已有队列,不存在返回 null |
| removeQueue(queueName) | 移除并关闭队列 |
| containsQueue(queueName) | 检查队列是否存在 |
| getQueueNames() | 获取所有队列名称 |
| getAppender(queueName) | 获取写入器 |
| createTailer(queueName) | 创建匿名读取器 |
| createTailer(queueName, tailerId) | 创建命名读取器 |
| closeAll() | 关闭所有队列 |
TailerStartStrategy 枚举控制命名 Tailer 的起始读取位置,概念类似于 Kafka 的 auto.offset.reset:
String msg = template.readText("my-queue", "replay-consumer", TailerStartStrategy.START);使用场景:数据重放、全量统计、测试验证。
String msg = template.readText("my-queue", "realtime-consumer", TailerStartStrategy.END);使用场景:实时监控面板、只关注增量数据。
String msg = template.readText("my-queue", "normal-consumer");
// 等价于
String msg = template.readText("my-queue", "normal-consumer", TailerStartStrategy.RESUME);使用场景:常规业务消费,确保不丢消息,类似 Kafka auto.offset.reset=earliest。
String msg = template.readText("my-queue", "new-consumer", TailerStartStrategy.RESUME_LATEST);使用场景:新上线的监控服务,不需要处理历史积压,只关心新消息。类似 Kafka auto.offset.reset=latest。
是否需要回溯历史数据?
├── 是 → 每次都需要全量重放吗?
│ ├── 是 → START
│ └── 否 → RESUME(首次从头,之后续读)
└── 否 → 是新消费者还是已有消费者?
├── 新消费者,不关心历史 → RESUME_LATEST
└── 只要实时数据,不需要续读 → END// 消费者 A:处理订单
String orderMsg = template.readText("events", "order-processor");
// 消费者 B:记录审计日志(独立进度)
String auditMsg = template.readText("events", "audit-logger");
// 消费者 C:实时统计(只关注新数据)
String statMsg = template.readText("events", "stat-collector", TailerStartStrategy.RESUME_LATEST);tailerId 是消费位置的唯一标识。相同的 tailerId 共享同一个读取位置,不同 tailerId 完全独立。long index = template.writeText("order-events", "ORDER_CREATED:12345");
log.info("消息已写入,索引: {}", index);long index = template.writeDocument("user-events", (WireOut wire) -> {
wire.write("eventType").text("USER_LOGIN");
wire.write("userId").int64(10001L);
wire.write("username").text("zhangsan");
wire.write("loginTime").int64(System.currentTimeMillis());
wire.write("ip").text("192.168.1.100");
});template.writeKeyValue("config-changes", "db.maxPoolSize", "50");
template.writeKeyValue("config-changes", "cache.ttl", "3600");String msg = template.readText("order-events", "order-handler");
if (msg != null) {
processOrder(msg);
}List<String> messages = template.readTextBatch("order-events", "batch-handler", 100);
if (!messages.isEmpty()) {
log.info("本批次读取 {} 条消息", messages.size());
messages.forEach(this::processOrder);
}结合定时任务实现持续消费:
@Scheduled(fixedDelay = 100)
public void consumeOrders() {
List<String> batch = template.readTextBatch("order-events", "scheduled-consumer", 200);
batch.forEach(this::processOrder);
}UserLoginEvent event = template.readDocument("user-events", "event-handler", wire -> {
String eventType = wire.read("eventType").text();
long userId = wire.read("userId").int64();
String username = wire.read("username").text();
long loginTime = wire.read("loginTime").int64();
String ip = wire.read("ip").text();
return new UserLoginEvent(eventType, userId, username, loginTime, ip);
});
List<UserLoginEvent> events = template.readDocumentBatch(
"user-events", "batch-event-handler", 50,
wire -> {
return new UserLoginEvent(
wire.read("eventType").text(),
wire.read("userId").int64(),
wire.read("username").text(),
wire.read("loginTime").int64(),
wire.read("ip").text()
);
}
);String latest = template.peekLastText("order-events");
String earliest = template.peekFirstText("order-events");
String specific = template.peekTextAtIndex("order-events", 86400000000001L);template.toEnd("order-events", "my-consumer");
template.toStart("order-events", "my-consumer");
boolean success = template.moveToIndex("order-events", "my-consumer", targetIndex);
long currentIndex = template.getTailerIndex("order-events", "my-consumer");ExcerptAppender appender = template.getAppender("high-freq-queue");
for (int i = 0; i < 1_000_000; i++) {
appender.writeText("message-" + i);
}
ExcerptTailer tailer = template.createTailer("high-freq-queue", "raw-consumer");
while (true) {
String text = tailer.readText();
if (text == null) break;
}
ChronicleQueue queue = template.getQueue("my-queue");
long lastIndex = queue.lastIndex();queueName)是队列的唯一标识,映射到磁盘上的目录tailerId 决定了消费位置的持久化键,不同业务逻辑必须使用不同的 tailerId{服务名}-{业务功能},例如 order-service-handler、audit-logger返回 null 表示当前没有更多消息可读,并非错误:
String msg = template.readText("my-queue", "consumer");
if (msg == null) {
return;
}
// 错误:不检查 null 直接使用
processMessage(template.readText("my-queue", "consumer")); // 可能 NPE!使用 Wire 协议读写时,读取字段的顺序必须与写入顺序一致:
// 写入
template.writeDocument("queue", wire -> {
wire.write("name").text("张三");
wire.write("age").int32(25);
wire.write("email").text("a@b.com");
});
// 读取 — 字段顺序必须一致
template.readDocument("queue", "consumer", wire -> {
String name = wire.read("name").text();
int age = wire.read("age").int32();
String email = wire.read("email").text();
return new User(name, age, email);
});queue.close() 或 appender.close()/tmp)ChronicleQueueManager 的 Bean 使用 @Lazy 注解,只有在首次被使用时才会初始化queues 预定义列表的队列会在 Manager 初始化时创建// 低效
for (int i = 0; i < 100; i++) {
String msg = template.readText("queue", "consumer");
if (msg != null) process(msg);
}
// 高效
List<String> batch = template.readTextBatch("queue", "consumer", 100);
batch.forEach(this::process);ExcerptAppender appender = template.getAppender("high-freq-queue");
for (String data : largeDataSet) {
appender.writeText(data);
}| 写入频率 | 推荐滚动周期 | 原因 |
|---|---|---|
| < 1000 条/天 | FAST_DAILY 或 WEEKLY | 减少文件数量 |
| 1K ~ 100K 条/天 | FAST_DAILY | 默认推荐 |
| 100K ~ 1M 条/天 | FAST_HOURLY | 单文件不会过大 |
| > 1M 条/天 | FIVE_MINUTELY / TEN_MINUTELY | 控制单文件大小 |
blockSize// 方案 A:定时任务拉取
@Scheduled(fixedDelay = 50)
public void consume() {
List<String> batch = template.readTextBatch("queue", "consumer", 500);
batch.forEach(this::process);
}
// 方案 B:独立线程轮询
@PostConstruct
public void startConsumer() {
Thread consumer = new Thread(() -> {
while (!Thread.currentThread().isInterrupted()) {
String msg = template.readText("queue", "consumer");
if (msg != null) {
process(msg);
} else {
LockSupport.parkNanos(1_000_000);
}
}
}, "chronicle-consumer");
consumer.setDaemon(true);
consumer.start();
}Chronicle Queue 底层大量使用 sun.misc.Unsafe、堆外内存(mmap)和 JDK 内部 API。在 Java 21 的强封装模块系统下,必须正确配置 JVM 参数。
--add-exports=java.base/jdk.internal.ref=ALL-UNNAMED
--add-exports=java.base/sun.nio.ch=ALL-UNNAMED
--add-exports=java.base/jdk.internal.misc=ALL-UNNAMED
--add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED
--add-exports=jdk.unsupported/sun.misc=ALL-UNNAMED
--add-opens=java.base/java.lang=ALL-UNNAMED
--add-opens=java.base/java.lang.reflect=ALL-UNNAMED
--add-opens=java.base/java.io=ALL-UNNAMED
--add-opens=java.base/sun.nio.ch=ALL-UNNAMED
--add-opens=java.base/java.util=ALL-UNNAMED在 Gradle 中配置:
tasks.withType(JavaExec).configureEach {
jvmArgs(
'--add-exports', 'java.base/jdk.internal.ref=ALL-UNNAMED',
'--add-exports', 'java.base/sun.nio.ch=ALL-UNNAMED',
'--add-exports', 'java.base/jdk.internal.misc=ALL-UNNAMED',
'--add-exports', 'jdk.unsupported/sun.misc=ALL-UNNAMED',
'--add-opens', 'java.base/java.lang=ALL-UNNAMED',
'--add-opens', 'java.base/java.lang.reflect=ALL-UNNAMED',
'--add-opens', 'java.base/java.io=ALL-UNNAMED',
'--add-opens', 'java.base/sun.nio.ch=ALL-UNNAMED',
'--add-opens', 'java.base/java.util=ALL-UNNAMED'
)
}
tasks.withType(Test).configureEach {
jvmArgs(
'--add-exports', 'java.base/jdk.internal.ref=ALL-UNNAMED',
'--add-exports', 'java.base/sun.nio.ch=ALL-UNNAMED',
'--add-opens', 'java.base/java.lang=ALL-UNNAMED',
'--add-opens', 'java.base/java.lang.reflect=ALL-UNNAMED',
'--add-opens', 'java.base/java.io=ALL-UNNAMED',
'--add-opens', 'java.base/sun.nio.ch=ALL-UNNAMED'
)
}在 Spring Boot 打包后运行(生产环境):
java \
--add-exports=java.base/jdk.internal.ref=ALL-UNNAMED \
--add-exports=java.base/sun.nio.ch=ALL-UNNAMED \
--add-exports=java.base/jdk.internal.misc=ALL-UNNAMED \
--add-exports=jdk.unsupported/sun.misc=ALL-UNNAMED \
--add-opens=java.base/java.lang=ALL-UNNAMED \
--add-opens=java.base/java.lang.reflect=ALL-UNNAMED \
--add-opens=java.base/java.io=ALL-UNNAMED \
--add-opens=java.base/sun.nio.ch=ALL-UNNAMED \
--add-opens=java.base/java.util=ALL-UNNAMED \
-jar your-application.jar推荐做法:将参数写入启动脚本或 Dockerfile:
# start.sh
export JAVA_OPTS="--add-exports=java.base/jdk.internal.ref=ALL-UNNAMED \
--add-exports=java.base/sun.nio.ch=ALL-UNNAMED \
--add-exports=java.base/jdk.internal.misc=ALL-UNNAMED \
--add-exports=jdk.unsupported/sun.misc=ALL-UNNAMED \
--add-opens=java.base/java.lang=ALL-UNNAMED \
--add-opens=java.base/java.lang.reflect=ALL-UNNAMED \
--add-opens=java.base/java.io=ALL-UNNAMED \
--add-opens=java.base/sun.nio.ch=ALL-UNNAMED \
--add-opens=java.base/java.util=ALL-UNNAMED"
java $JAVA_OPTS -jar your-application.jar# Dockerfile
ENV JAVA_OPTS="--add-exports=java.base/jdk.internal.ref=ALL-UNNAMED \
--add-exports=java.base/sun.nio.ch=ALL-UNNAMED \
--add-exports=java.base/jdk.internal.misc=ALL-UNNAMED \
--add-exports=jdk.unsupported/sun.misc=ALL-UNNAMED \
--add-opens=java.base/java.lang=ALL-UNNAMED \
--add-opens=java.base/java.lang.reflect=ALL-UNNAMED \
--add-opens=java.base/java.io=ALL-UNNAMED \
--add-opens=java.base/sun.nio.ch=ALL-UNNAMED \
--add-opens=java.base/java.util=ALL-UNNAMED"
ENTRYPOINT ["sh", "-c", "java $JAVA_OPTS -jar /app.jar"]-Xms512m -Xmx2g
-XX:MaxDirectMemorySize=4g关于 mmap 内存的误区:
物理内存 > Xmx + MaxDirectMemorySize + 活跃队列总数据量 + 操作系统开销-XX:+UseZGC
# 或
-XX:+UseG1GC
-XX:MaxGCPauseMillis=50
# 或对延迟极度敏感
-XX:+UseShenandoahGCulimit -n 65536
sysctl -w vm.max_map_count=262144
sysctl -w vm.dirty_ratio=10
sysctl -w vm.dirty_background_ratio=5
swapoff -aecho 2048 > /proc/sys/vm/nr_hugepagesJVM 参数:
-XX:+UseLargePages
-XX:LargePageSizeInBytes=2mjava \
--add-exports=java.base/jdk.internal.ref=ALL-UNNAMED \
--add-exports=java.base/sun.nio.ch=ALL-UNNAMED \
--add-exports=java.base/jdk.internal.misc=ALL-UNNAMED \
--add-exports=jdk.unsupported/sun.misc=ALL-UNNAMED \
--add-opens=java.base/java.lang=ALL-UNNAMED \
--add-opens=java.base/java.lang.reflect=ALL-UNNAMED \
--add-opens=java.base/java.io=ALL-UNNAMED \
--add-opens=java.base/sun.nio.ch=ALL-UNNAMED \
--add-opens=java.base/java.util=ALL-UNNAMED \
-Xms1g -Xmx2g \
-XX:MaxDirectMemorySize=4g \
-XX:+UseZGC \
-jar your-application.jar| 报错信息 | 原因 | 解决方案 |
|---|---|---|
| InaccessibleObjectException | 缺少 --add-opens 参数 | 添加 7.7.1 节的模块开放参数 |
| IllegalAccessError: module java.base does not opens/exports | 缺少 --add-exports 参数 | 添加 7.7.1 节的模块开放参数 |
| java.io.IOException: Map failed | vm.max_map_count 不够或内存不足 | sysctl -w vm.max_map_count=262144 |
| Cannot allocate memory | 物理内存不足 | 增加内存或减少队列数量/块大小 |
| Too many open files | 文件描述符不够 | ulimit -n 65536 |
| OutOfMemoryError: Direct buffer memory | MaxDirectMemorySize 过小 | 增大 -XX:MaxDirectMemorySize |
| java.lang.UnsatisfiedLinkError | JNI 库缺失或架构不匹配 | 确保 OS 架构与 JDK 一致 |
path 目录下.cq4 文件.cq4 文件来释放空间Set<String> names = template.getAllQueueNames();
log.info("当前活跃队列: {}", names);
long consumerIndex = template.getTailerIndex("order-events", "my-consumer");
log.info("消费者当前位置: {}", consumerIndex);
String latestMsg = template.peekLastText("order-events");组件内置了双重关闭机制:
destroy() 关闭所有队列@Scheduled(cron = "0 0 3 * * ?")
public void cleanOldData() {
File queueDir = new File("/data/chronicle-queues/order-events");
File[] files = queueDir.listFiles((dir, name) -> name.endsWith(".cq4"));
if (files == null) return;
long cutoffTime = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(7);
for (File file : files) {
if (file.lastModified() < cutoffTime) {
if (file.delete()) {
log.info("已清理过期队列文件: {}", file.getName());
}
}
}
}Chronicle Queue 支持单写多读的跨进程访问模式。写入端建议只有一个进程。如果需要分布式消息队列,请使用 Kafka 等专业方案。
不会。命名 Tailer 的读取位置由 Chronicle Queue 自动持久化到队列目录中。匿名 Tailer 的位置不会持久化。
会。磁盘空间不足、路径权限不足或 mmap 分配失败时会抛出异常。建议在关键业务路径中添加异常捕获。
单条消息受限于 blockSize,不建议超过块大小的 1/4。一般建议控制在 1MB 以内。
Chronicle Queue 天然保证写入顺序 = 读取顺序(FIFO)。无需额外处理。
不可以。Chronicle Queue 是追加写入(Append-Only)的数据结构。如需逻辑删除,写入“删除标记”消息。
每日磁盘用量 ≈ 每日消息条数 × 平均消息大小 × 1.1(元数据开销约 10%)可以。首次调用 writeText("new-queue", "msg") 时自动创建。也可通过 ChronicleQueueManager.getOrCreateQueue() 手动创建。
| 维度 | chronicle-queue | chronicle-map |
|---|---|---|
| 数据模型 | 有序消息队列(FIFO) | 键值存储(Key-Value) |
| 使用场景 | 消息传递、事件溯源 | 缓存、共享状态 |
| 读取方式 | 顺序消费(Tailer) | 随机访问(Key 查找) |
| 数据保留 | 追加写入,不可删除 | 可更新、可删除 |
easyfk:
config:
storage:
queue:
default-path: /data/app/chronicle-queues
default-roll-cycle: FAST_DAILY
default-block-size: 0
create-directories-on-startup: true
queues:
- queue-name: order-events
path: /data/app/chronicle-queues/orders
roll-cycle: FAST_HOURLY
block-size: 134217728
- queue-name: log-buffer
roll-cycle: FIVE_MINUTELY
- queue-name: metrics
roll-cycle: TEN_MINUTELY
- queue-name: audit-events