侧边栏壁纸
  • 累计撰写 49 篇文章
  • 累计创建 23 个标签
  • 累计收到 0 条评论

目 录CONTENT

文章目录

SpringBoot 实现 SSE

阿砖
2024-11-19 / 0 评论 / 0 点赞 / 260 阅读 / 9869 字

SpringBoot 实现 SSE

SSE(Server-Sent Events)的全称是服务器推送事件,它是一种基于 HTTP 协议的实时通信技术,用于在客户端和服务器之间建立持久、单向的链接,允许服务器向客户端发送异步消息。在 SrpingBootWeb 中已经很好的封装了 SSE 模块,实现起来非常方便。

我们先使用 https://start.aliyun.com/ 创建一个只有 web 模块的 springboot 服务。增加以下代码

  • SseEmitterService.java
package com.runbrick.sse.service;

import lombok.extern.slf4j.Slf4j;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Slf4j
public class SseEmitterService {

    /**
     * 根据用户ID存储 Sse 信息
     */
    private final static Map<Long, SseEmitter> USER_EMITTERS = new ConcurrentHashMap<>();

    /**
     * 创建用户连接并返回 SseEmitter
     *
     * @param userId 用户ID
     * @return SseEmitter
     */
    public static SseEmitter connect(Long userId) {
        // 设置超时时间,0表示不过期。默认30秒,超过时间未完成会抛出异常:AsyncRequestTimeoutException
        SseEmitter sseEmitter = new SseEmitter(0L);
        USER_EMITTERS.put(userId, sseEmitter);

        // 注册回调
        sseEmitter.onCompletion(() -> USER_EMITTERS.remove(userId));
        sseEmitter.onTimeout(() -> USER_EMITTERS.remove(userId));
        sseEmitter.onError((e) -> USER_EMITTERS.remove(userId));
        log.info("创建新的sse连接,当前用户:{}", userId);

        try {
            // 向客户端发送一条连接成功的事件
            sseEmitter.send("欢迎链接SSE服务");
        } catch (IOException e) {
            // 如果发送消息失败,则从映射表中移除 emitter
            USER_EMITTERS.remove(userId);
            log.info("链接失败,当前用户:{}", userId);
        }
        return sseEmitter;
    }

    /**
     * 给指定用户发送信息
     */
    public static void sendMessage(Long userId, String message) {
        if (USER_EMITTERS.containsKey(userId)) {
            try {
                USER_EMITTERS.get(userId).send(message);
            } catch (IOException e) {
                log.error("用户[{}]推送异常:{}", userId, e.getMessage());
                USER_EMITTERS.remove(userId);
            }
        }
    }

    /**
     * 群发消息(群发)
     */
    public static void sendMessage(String message, List<Long> ids) {
        ids.forEach(userId -> sendMessage(userId, message));
    }

    /**
     * 群发消息(超发)
     */
    public static void sendMessage(String message) {
        List<Long> ids = USER_EMITTERS.keySet().stream().toList();
        ids.forEach(userId -> sendMessage(userId, message));
    }

    /**
     * 断开指定用户的 SSE 连接
     */
    public void disconnect(Long userId) {
        SseEmitter emitter = USER_EMITTERS.get(userId);
        if (emitter != null) {
            try {
                emitter.send(SseEmitter.event().comment("disconnected"));
            } catch (Exception ignore) {
            }
            USER_EMITTERS.remove(userId);
        }
    }

    /**
     * 移除用户连接
     */
    public static void removeUser(Long userId) {
        USER_EMITTERS.remove(userId);
        log.info("移除用户:{}", userId);
    }
}
  • SseEmitterMessage.java
package com.runbrick.sse.entity;

import lombok.Data;

@Data
public class SseEmitterMessage {

    private Long userId;

    private String message;
}

  • SseEmitterController.java
package com.runbrick.sse.controller;

import com.runbrick.sse.entity.SseEmitterMessage;
import com.runbrick.sse.service.SseEmitterService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

@Slf4j
@RequestMapping("/sse")
@RestController
public class SseEmitterController {

    /**
     * 用于创建连接
     */
    @GetMapping("/connect/{userId}")
    public SseEmitter connect(@PathVariable Long userId) {
        return SseEmitterService.connect(userId);
    }

    /**
     * 发送消息
     */
    @PostMapping("/sendUser")
    public ResponseEntity<String> sendUser(@RequestBody SseEmitterMessage param) {
        if (param.getUserId() != null) {
            SseEmitterService.sendMessage(param.getUserId(), param.getMessage()); // 发送给指定得人
        } else {
            SseEmitterService.sendMessage(param.getMessage()); // 发送给所有人
        }
        return ResponseEntity.ok("SSE推送消息给" + param.getUserId());
    }


    /**
     * 关闭连接
     */
    @GetMapping("/close/{userId}")
    public ResponseEntity<String> close(@PathVariable("userId") Long userId) {
        SseEmitterService.removeUser(userId);
        return ResponseEntity.ok("连接关闭");
    }
}

添加完成以下代码后使用 apifox 测试下。

image

image

上面创建了两个通讯连接,在开一个窗口测试下是否可以推送消息

image

image

也可以使用 js 的 EventSource 来实现,不过我为了省事就没有做。到目前为止就可以实现简单的推送消息和接受消息的功能。这样就不用再去使用 websocket 来实现一些简单的消息推送通知了。

0

评论区