EBEasyBuild Docs
文档/后端/WebSocket

easyfk-websocket WebSocket

WebSocket 通信 — 实时双向消息推送阅读时间 ~15 min

1. 项目概述

websocket-server 是 EasyFK 框架中的 WebSocket 服务组件,基于 Netty 构建高性能 WebSocket 服务器,提供实时消息推送能力。支持多策略消息推送、频道订阅管理、JWT 认证、HMAC-SHA256 接口鉴权,以及基于 Disruptor 的高性能异步消息发送。

1.1 技术栈

NettyWebSocket 服务器核心,处理连接、编解码、消息收发
Caffeine高性能本地缓存,管理用户会话数据与活跃时间
JWT用户身份认证与令牌验证
HMAC-SHA256 / MD5接口签名鉴权
Spring Boot AutoConfiguration自动配置与 Bean 管理
Fastjson2JSON 序列化/反序列化

1.2 模块依赖

gradle
dependencies {
    api project(':component-cache:cache-caffeine')
    api project(':component-websocket:websocket-api')
    api('com.mcst:queue-disruptor')
    api('com.mcst:easyfk-authority')
    api('com.mcst:web-base')
    api('io.netty:netty-all')
}

父项目 component-websocket 统一依赖了 com.mcst:easyfk-core

2. 项目结构

plaintext
websocket-server/
├── build.gradle
└── src/main/
    ├── java/com/mcst/easyfk/websocket/
    │   ├── config/
    │   │   └── ServerConfig.java              # Spring Boot 自动配置类
    │   ├── enums/
    │   │   └── EventType.java                 # 事件类型枚举
    │   ├── handler/
    │   │   ├── UserEventHandler.java          # 用户事件处理器接口(业务方实现)
    │   │   └── WebSocketHandler.java          # WebSocket 核心处理器
    │   ├── interceptor/
    │   │   ├── DefaultAuthInterceptor.java    # 默认认证拦截器
    │   │   ├── HandshakeResult.java           # 握手结果对象
    │   │   └── WebSocketInterceptor.java      # 拦截器接口
    │   ├── manager/
    │   │   ├── impl/
    │   │   │   └── SocketManagerImpl.java     # Socket 管理器实现
    │   │   ├── ISocketManager.java            # Socket 管理器接口
    │   │   ├── LocalSubManager.java           # 本地订阅管理器
    │   │   └── WsUserDataManager.java         # 用户会话数据管理器
    │   ├── properties/
    │   │   └── WebSocketProperties.java       # 配置属性类
    │   ├── sender/
    │   │   ├── SessionSendDispatcher.java     # 消息发送分发器(Disruptor 分片)
    │   │   ├── SessionSendEvent.java          # 发送事件对象
    │   │   ├── SessionSendWorker.java         # 实际发送工作器
    │   │   ├── WebSocketMessageSender.java    # 消息发送器(核心)
    │   │   └── WsSendProcessor.java           # Disruptor 消费处理器
    │   ├── server/
    │   │   ├── PushMessageServer.java         # 消息推送服务入口
    │   │   └── WebSocketServer.java           # Netty WebSocket 服务器
    │   ├── strategy/
    │   │   ├── impl/
    │   │   │   ├── ChannelCategoryPushStrategy.java  # 频道+类别推送策略
    │   │   │   ├── ChannelPushStrategy.java           # 频道推送策略
    │   │   │   ├── NotificationPushStrategy.java      # 通知推送策略
    │   │   │   └── UserPushStrategy.java              # 用户推送策略
    │   │   ├── MessagePushStrategy.java       # 推送策略接口
    │   │   └── MessagePushStrategySelector.java  # 策略选择器
    │   ├── util/
    │   │   └── SecuritySignUtil.java          # 安全签名工具类
    │   └── vo/
    │       ├── PushMessage.java               # 推送消息对象
    │       └── ResponseMessage.java           # 响应消息对象
    └── resources/META-INF/spring/
        └── org.springframework.boot.autoconfigure.AutoConfiguration.imports

3. 核心组件详解

3.1 WebSocketServer — Netty 服务器

文件: server/WebSocketServer.java

Netty WebSocket 服务器的启动与关闭管理。使用 @PostConstruct 在 Spring 容器启动后自动以守护线程启动 Netty 服务。

Pipeline 组成:

`IdleStateHandler`空闲超时检测,超时时长由 `sessionTimeoutSeconds` 配置
`ChunkedWriteHandler`分块写入处理器
`HttpObjectAggregator`HTTP 消息聚合器,最大内容长度由 `maxContentLength` 配置
`WebSocketServerCompressionHandler`WebSocket 消息压缩
`WebSocketHandler`业务处理器(Sharable,全局单例)

关键配置项:

`SO_BACKLOG`1024连接等待队列大小
`SO_KEEPALIVE`trueTCP 保活
`TCP_NODELAY`true禁用 Nagle 算法,减少延迟
`WRITE_BUFFER_WATER_MARK`8KB / 16KB写缓冲区水位线

3.2 WebSocketHandler — 核心处理器

文件: handler/WebSocketHandler.java

@ChannelHandler.Sharable 标注,全局共享单例。负责处理 WebSocket 连接的完整生命周期。

3.2.1 连接建立流程

plaintext
客户端发起 HTTP 升级请求
    ↓
handleHttpRequest()
    ↓
执行拦截器链 beforeHandshake()(按 order 排序,正序执行)
    ↓ 全部通过
从拦截器属性获取 userSessionId
    ↓
socketManager.connectUser() 建立连接
    ↓
更新会话活跃时间
    ↓
WebSocket 协议握手
    ↓
发送欢迎消息
    ↓
执行拦截器链 afterHandshake()(逆序执行)

3.2.2 消息处理流程

客户端发送的 TextWebSocketFrame 被解析为 WsUserOptParam,根据 eventType 分类处理:

`pong`更新会话活跃时间(心跳响应)
`login`验证 JWT Token,绑定用户信息到会话
其他如开启鉴权则验证签名,然后交给 `UserEventHandler` 处理

消息处理通过独立的线程池 messageHandleExecutor 异步执行,不阻塞 Netty 的 I/O 线程。

3.2.3 心跳机制

  • **定时心跳任务**: 由 `heartbeatExecutor` 以 `heartbeatIntervalSeconds` 为间隔定期执行
  • **心跳发送线程池**: `heartbeatSendExecutor` 并行发送 Ping 消息
  • **超时检测**: 检查每个会话的最后活跃时间,超过 `sessionTimeoutSeconds` 则关闭连接
  • **Ping 消息格式**: 使用 `PushMessage` 封装,channel 为 `heartbeat`

3.2.4 连接关闭

  • `channelInactive`: 连接关闭时清理 `socketManager`、`sessionLastActiveTime`、`WsUserDataManager`
  • `userEventTriggered`: Netty `IdleStateEvent` 触发超时关闭
  • `exceptionCaught`: 区分客户端主动断开与服务端异常,避免无意义的错误日志

3.3 拦截器体系

WebSocketInterceptor 接口

java
public interface WebSocketInterceptor {
    HandshakeResult beforeHandshake(ChannelHandlerContext ctx, FullHttpRequest request, 
                                     Map<String, Object> attributes);
    default void afterHandshake(...) {}
    default int getOrder() { return 0; }
}
  • `beforeHandshake`: 握手前拦截,可用于 Token 验证、IP 白名单、限流等
  • `afterHandshake`: 握手后处理,可用于日志记录、事件触发
  • `getOrder`: 数值越小越先执行

DefaultAuthInterceptor — 默认认证拦截器

执行优先级: order = -100(最先执行)

处理流程:

1. 获取或生成 clientId(优先请求头 Client-ID > 查询参数 > 基于 User-Agent + IP 自动生成)

2. 提取 clientType(请求头 Client-Type,默认 APP

3. 如开启接口鉴权,使用 HMAC-SHA256 验证连接签名

4. 提取 JWT Token 并验证(如 mustLogin=true 则 Token 必填)

5. 生成会话标识:MD5(clientType + "-" + clientId)

6. 将用户数据写入 WsUserDataManager

支持的请求参数提取方式(同时支持请求头与查询参数):

Token`Access-Token``Access-Token`
客户端类型`Client-Type``Client-Type`
随机数`Request-Nonce``Request-Nonce`
签名`Reset-Sign``Reset-Sign`

3.4 会话管理

ISocketManager 接口 & SocketManagerImpl 实现

管理在线用户的 Channel 映射关系:

plaintext
userSessionId  ←→  Channel(双向映射)

核心方法:

  • `connectUser(channel, userSessionId)`: 建立连接,同步更新 `LocalSubManager` 的 Channel 缓存
  • `disconnectUser(userSessionId)`: 断开连接,清理 Channel 缓存和订阅关系
  • `getUserSession(userSessionId)`: 获取用户 Channel(自动检测 Channel 活跃状态)
  • `getAllUserSessions()`: 获取所有在线 Channel
  • `getUserSessionId(channel)`: 反向查找用户 ID

WsUserDataManager — 用户数据管理

使用 Caffeine 缓存管理用户会话数据,支持以下两种索引:

`WS_USER_DATA_CACHE`userSessionIdWsUserData10万访问后12小时

支持一个用户多个会话(多端登录/多标签页场景)。

LocalSubManager — 订阅管理

管理用户的频道+类别订阅关系,使用 ConcurrentHashMap + CopyOnWriteArraySet 实现线程安全:

`subscriptions`channel:categorySet&lt;userSessionId&gt;正向索引
`channelCache`userSessionIdChannelChannel 缓存(跳过 SocketManager 二次查找)

支持功能:

  • 批量类别订阅/取消订阅(逗号分隔)
  • 直接获取订阅者 Channel 列表(性能优化)
  • 缓存命中/未命中统计
  • 订阅统计信息

3.5 消息发送体系

消息发送架构

plaintext
PushMessageServer
    ↓ pushMessage(PushWsMessageParam)
MessagePushStrategySelector
    ↓ select() 选择策略
MessagePushStrategy(4种实现)
    ↓ push()
WebSocketMessageSender
    ├─ sendMessageToUser()       → SessionSendDispatcher → Disruptor → SessionSendWorker
    ├─ sendMessageToUserByUserId() → SessionSendDispatcher → Disruptor → SessionSendWorker
    ├─ sendToSubscribers()       → batchBroadcast()(直接批量写入,跳过 Disruptor)
    ├─ sendToChannelSubscribers() → batchBroadcast()
    └─ broadcastToAll()          → batchBroadcast()

消息推送策略(策略模式)

`NotificationPushStrategy`0(最高)channel = "notification"全员广播
`UserPushStrategy`2有 userSessionId 或 userId指定用户推送
`ChannelPushStrategy`4有 channel,无 userSessionId/userId/category频道全订阅者推送

SessionSendDispatcher — Disruptor 分片分发

用于单用户/少量用户的消息发送,保证消息可靠性:

  • **分片机制**: 根据 `userSessionId` 的 hash 值选择分片,同一用户的消息进入同一分片队列
  • **队列类型**: 基于 LMAX Disruptor 的 RingBuffer
  • **监控统计**: 每 10 秒输出 Disruptor 入队/丢弃/发送/不可写统计

WebSocketMessageSender — 消息发送器

三种性能优化方案:

1. 零拷贝: 使用 retainedDuplicate() 共享底层 ByteBuf,避免数据复制

2. Channel 直接获取: 从 LocalSubManager 直接获取 Channel 列表,跳过二次查找

3. 批量写入: batchBroadcast() 先批量 write(),再统一 flush(),减少系统调用

背压控制: 发送前检查 channel.isWritable(),不可写时跳过,防止消息积压。

3.6 安全签名

文件: util/SecuritySignUtil.java

支持两种签名方式:

MD5 签名

plaintext
签名数据 = ClientType=xxx&ClientId=xxx&Nonce=xxx&Param=xxx&Secret=xxx
签名结果 = MD5(签名数据)

HMAC-SHA256 签名(推荐,默认使用)

plaintext
签名数据 = ClientType=xxx&ClientId=xxx&Nonce=xxx&Param=xxx
签名结果 = HMAC-SHA256(签名数据, secret)

Param 排序规则: 如果 Param 是 JSON 格式,则按照 key 的 ASCII 码排序后再参与签名。

4. 配置参数

配置前缀:easyfk.config.websocket

4.1 基础配置

`path``/ws`WebSocket 访问路径
`mustLogin``false`是否要求登录才能连接
`securitySecret``easyfk@ws#2026!`接口鉴权密钥(生产环境务必修改)
`enableSecurity``false`是否开启接口鉴权
`allowCrossOrigin``true`是否支持跨域
`allowedOrigins``["*"]`允许的跨域来源
`sessionTimeoutSeconds``1800`会话超时时间(秒,默认30分钟)
`heartbeatIntervalSeconds``30`心跳检测间隔(秒)
`maxSubscriptionsPerUser``100`单用户最大订阅数
`maxFrameSize``65536`WebSocket 帧最大大小(字节,64KB)
`maxContentLength``65536`HTTP 最大内容长度(字节,64KB)

4.2 Netty 线程配置

`bossThreads``1`Boss 线程数(接受连接)

4.3 SSL 配置

`sslEnabled``false`是否启用 SSL
`sslKeyPath`-SSL 私钥路径

4.4 CORS 配置 (`easyfk.config.websocket.cors`)

`allowedMethods``["GET","POST","PUT","DELETE","OPTIONS"]`允许的 HTTP 方法
`exposedHeaders``[]`暴露的响应头
`allowCredentials``true`是否允许携带凭证
`maxAge``3600`预检请求缓存时间(秒)

4.5 性能配置 (`easyfk.config.websocket.performance`)

`sendShardCount``32`Disruptor 发送队列分片数
`sendQueueBufferSize``131072`每个分片的 RingBuffer 大小(必须是2的幂)
`sendQueueTryTimeoutMs``50`入队最大等待时间(毫秒)
`sendQueueThreadPrefix``ws-send-`发送线程名前缀
`sendQueueWaitStrategy``YIELDING`Disruptor 等待策略(BLOCKING/YIELDING/BUSY_SPIN)
`broadcastParallelThreshold``100`广播并行阈值
`heartbeatPoolCoreSize``max(16, CPU×2)`心跳线程池核心线程数
`heartbeatPoolMaxSize``max(32, CPU×4)`心跳线程池最大线程数
`heartbeatPoolQueueSize``50000`心跳线程池队列容量
`messagePoolCoreSize``max(32, CPU×4)`消息处理线程池核心线程数
`messagePoolMaxSize``max(64, CPU×8)`消息处理线程池最大线程数
`messagePoolQueueSize``20000`消息处理线程池队列容量

4.6 配置示例

yaml
easyfk:
  config:
    websocket:
      path: /ws
      port: 8081
      mustLogin: false
      enableSecurity: true
      securitySecret: "your-production-secret-key"
      sessionTimeoutSeconds: 1800
      heartbeatIntervalSeconds: 30
      performance:
        sendShardCount: 64
        sendQueueWorkersPerShard: 4
        sendQueueBufferSize: 262144
        sendQueueWaitStrategy: YIELDING

5. 扩展开发指南

5.1 实现自定义用户事件处理器

业务方通过实现 UserEventHandler 接口来处理客户端发送的业务事件:

java
@Component
public class MyUserEventHandler implements UserEventHandler {

    @Override
    public BaseResult<?> handleUserEvent(WsUserOptParam param) {
        String eventType = param.getEventType();
        Map<String, Object> eventData = param.getEventData();
        
        switch (eventType) {
            case "subscribe":
                // 处理订阅逻辑
                return BRBuilder.successResult();
            case "unsubscribe":
                // 处理取消订阅逻辑
                return BRBuilder.successResult();
            default:
                // 处理其他自定义事件
                return BRBuilder.successResult();
        }
    }

    @Override
    public void clearUserSubscriptions(String clientId) {
        // 用户断开连接时清理业务层订阅数据
    }
}

5.2 实现自定义拦截器

业务方可以实现 WebSocketInterceptor 添加自定义拦截逻辑:

java
@Component
public class IpWhitelistInterceptor implements WebSocketInterceptor {

    @Override
    public HandshakeResult beforeHandshake(ChannelHandlerContext ctx, 
                                           FullHttpRequest request, 
                                           Map<String, Object> attributes) {
        String ip = getClientIp(ctx);
        if (!isAllowed(ip)) {
            return HandshakeResult.forbidden("IP_BLOCKED", "IP not in whitelist");
        }
        return HandshakeResult.success();
    }

    @Override
    public int getOrder() {
        return -200; // 在默认认证拦截器之前执行
    }
}

5.3 服务端主动推送消息

通过 PushMessageServer 推送消息:

java
@Resource
private PushMessageServer pushMessageServer;

// 推送给指定用户
PushWsMessageParam param = new PushWsMessageParam()
    .setUserSessionId("user-session-id")
    .setChannel("order")
    .setCategory("fill")
    .setData(Map.of("orderId", "12345", "status", "filled"));
pushMessageServer.pushMessage(param);

// 推送给频道+类别订阅者
PushWsMessageParam param2 = new PushWsMessageParam()
    .setChannel("quote")
    .setCategory("BTCUSDT")
    .setData(Map.of("price", "50000.00", "volume", "123.45"));
pushMessageServer.pushMessage(param2);

// 发送系统通知
PushWsMessageParam param3 = new PushWsMessageParam()
    .setChannel("notification")
    .setData(Map.of("title", "系统维护通知", "content", "系统将于凌晨2点维护", "level", "IMPORTANT"));
pushMessageServer.pushMessage(param3);

5.4 通过 API 接口远程推送

websocket-api 模块定义了 IPushWsMsgApi 接口:

java
public interface IPushWsMsgApi {
    BaseResult<?> pushMessage(PushWsMessageParam param);
}

业务方可通过 RPC 或 HTTP 调用此接口实现跨服务推送。

6. 自动配置

ServerConfig 通过 Spring Boot @AutoConfiguration 自动注册以下 Bean:

`PushMessageServer`服务消息推送服务入口
`WebSocketHandler`处理器WebSocket 核心处理器
`DefaultAuthInterceptor`拦截器默认认证拦截器
`SocketManagerImpl`管理器在线用户管理
`LocalSubManager`管理器订阅管理
`SessionSendWorker`发送器消息发送工作器
`SessionSendDispatcher`分发器Disruptor 分片分发器
`WebSocketMessageSender`发送器消息发送核心
`NotificationPushStrategy`策略通知推送策略
`UserPushStrategy`策略用户推送策略
`ChannelCategoryPushStrategy`策略频道+类别推送策略
`ChannelPushStrategy`策略频道推送策略
`MessagePushStrategySelector`选择器策略选择器

所有 Bean 均使用 @ConditionalOnMissingBean,业务方可通过自定义同类型 Bean 进行覆盖。

7. 线程模型

plaintext
┌─────────────────────────────────────────────────────────────────┐
│                    Netty Thread Model                           │
│  BossGroup (1 thread)     →  接受连接                           │
│  WorkerGroup (N threads)  →  I/O 读写、编解码                    │
├─────────────────────────────────────────────────────────────────┤
│                    业务线程池                                     │
│  messageHandleExecutor    →  消息业务处理(WebSocketHandler)     │
│  heartbeatSendExecutor    →  心跳 Ping 发送                     │
│  heartbeatExecutor        →  心跳定时检测调度(单线程)            │
├─────────────────────────────────────────────────────────────────┤
│                    Disruptor 发送队列                             │
│  shardQueues[0..N-1]      →  单用户消息异步发送                   │
│  每个分片有独立的 Worker 线程                                     │
├─────────────────────────────────────────────────────────────────┤
│                    直接广播                                       │
│  WebSocketMessageSender   →  订阅推送/全员广播(跳过 Disruptor)  │
│  批量 write + 统一 flush                                        │
├─────────────────────────────────────────────────────────────────┤
│                    监控线程                                       │
│  ws-send-monitor          →  每10秒输出发送统计                   │
└─────────────────────────────────────────────────────────────────┘

8. API 模块(websocket-api)

8.1 数据对象

PushWsMessageParam — 推送消息参数

`channel`String频道(如 kline, quote, order, notification, chat)
`groupId`String群组ID
`userSessionId`String用户会话ID(多个逗号分隔)
`userId`String用户ID(多个逗号分隔)
`data`Map&lt;String, Object&gt;推送数据

WsUserOptParam — 用户操作参数

`eventType`String事件类型
`token`StringJWT Token
`nonce`String随机数(签名验证)
`sign`String签名(接口鉴权)

8.2 枚举定义

ChannelTypeEnum — 频道类型

`NOTIFICATION`notification通知频道
`HEARTBEAT`heartbeat心跳频道
`BIZ`biz业务频道

NotificationLevel — 通知级别

`NORMAL`normal普通通知
`URGENT`urgent紧急通知

EventType — 事件类型

`LOGIN`login登录
`UNSUBSCRIBE`unsubscribe取消订阅
`PING`ping心跳检测(服务端发送)
`PONG`pong心跳响应(客户端发送)
`ONLINE`online检查在线状态

easyfk-websocket — 实时双向通信,构建高效消息推送能力。

— END —