SpringBoot 实现 SSE
SSE(Server-Sent Events)的全称是服务器推送事件,它是一种基于 HTTP 协议的实时通信技术,用于在客户端和服务器之间建立持久、单向的链接,允许服务器向客户端发送异步消息。在 SrpingBootWeb 中已经很好的封装了 SSE 模块,实现起来非常方便。
我们先使用 https://start.aliyun.com/ 创建一个只有 web 模块的 springboot 服务。增加以下代码
- SseEmitterService.java
package com.runbrick.sse.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
public class SseEmitterService {
/**
* 根据用户ID存储 Sse 信息
*/
private final static Map<Long, SseEmitter> USER_EMITTERS = new ConcurrentHashMap<>();
/**
* 创建用户连接并返回 SseEmitter
*
* @param userId 用户ID
* @return SseEmitter
*/
public static SseEmitter connect(Long userId) {
// 设置超时时间,0表示不过期。默认30秒,超过时间未完成会抛出异常:AsyncRequestTimeoutException
SseEmitter sseEmitter = new SseEmitter(0L);
USER_EMITTERS.put(userId, sseEmitter);
// 注册回调
sseEmitter.onCompletion(() -> USER_EMITTERS.remove(userId));
sseEmitter.onTimeout(() -> USER_EMITTERS.remove(userId));
sseEmitter.onError((e) -> USER_EMITTERS.remove(userId));
log.info("创建新的sse连接,当前用户:{}", userId);
try {
// 向客户端发送一条连接成功的事件
sseEmitter.send("欢迎链接SSE服务");
} catch (IOException e) {
// 如果发送消息失败,则从映射表中移除 emitter
USER_EMITTERS.remove(userId);
log.info("链接失败,当前用户:{}", userId);
}
return sseEmitter;
}
/**
* 给指定用户发送信息
*/
public static void sendMessage(Long userId, String message) {
if (USER_EMITTERS.containsKey(userId)) {
try {
USER_EMITTERS.get(userId).send(message);
} catch (IOException e) {
log.error("用户[{}]推送异常:{}", userId, e.getMessage());
USER_EMITTERS.remove(userId);
}
}
}
/**
* 群发消息(群发)
*/
public static void sendMessage(String message, List<Long> ids) {
ids.forEach(userId -> sendMessage(userId, message));
}
/**
* 群发消息(超发)
*/
public static void sendMessage(String message) {
List<Long> ids = USER_EMITTERS.keySet().stream().toList();
ids.forEach(userId -> sendMessage(userId, message));
}
/**
* 断开指定用户的 SSE 连接
*/
public void disconnect(Long userId) {
SseEmitter emitter = USER_EMITTERS.get(userId);
if (emitter != null) {
try {
emitter.send(SseEmitter.event().comment("disconnected"));
} catch (Exception ignore) {
}
USER_EMITTERS.remove(userId);
}
}
/**
* 移除用户连接
*/
public static void removeUser(Long userId) {
USER_EMITTERS.remove(userId);
log.info("移除用户:{}", userId);
}
}
- SseEmitterMessage.java
package com.runbrick.sse.entity;
import lombok.Data;
@Data
public class SseEmitterMessage {
private Long userId;
private String message;
}
- SseEmitterController.java
package com.runbrick.sse.controller;
import com.runbrick.sse.entity.SseEmitterMessage;
import com.runbrick.sse.service.SseEmitterService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
@Slf4j
@RequestMapping("/sse")
@RestController
public class SseEmitterController {
/**
* 用于创建连接
*/
@GetMapping("/connect/{userId}")
public SseEmitter connect(@PathVariable Long userId) {
return SseEmitterService.connect(userId);
}
/**
* 发送消息
*/
@PostMapping("/sendUser")
public ResponseEntity<String> sendUser(@RequestBody SseEmitterMessage param) {
if (param.getUserId() != null) {
SseEmitterService.sendMessage(param.getUserId(), param.getMessage()); // 发送给指定得人
} else {
SseEmitterService.sendMessage(param.getMessage()); // 发送给所有人
}
return ResponseEntity.ok("SSE推送消息给" + param.getUserId());
}
/**
* 关闭连接
*/
@GetMapping("/close/{userId}")
public ResponseEntity<String> close(@PathVariable("userId") Long userId) {
SseEmitterService.removeUser(userId);
return ResponseEntity.ok("连接关闭");
}
}
添加完成以下代码后使用 apifox 测试下。
上面创建了两个通讯连接,在开一个窗口测试下是否可以推送消息
也可以使用 js 的 EventSource 来实现,不过我为了省事就没有做。到目前为止就可以实现简单的推送消息和接受消息的功能。这样就不用再去使用 websocket 来实现一些简单的消息推送通知了。
评论区