Skip to content

轻量级的、基于文本的实时通信机制Server-Sent Events (SSE)

1、Server-Sent Events (SSE) 简介

Server-Sent Events (SSE) 是一种基于 HTTP 协议的标准,它允许服务器通过单向连接向浏览器发送实时更新。与 WebSocket 不同,SSE 仅允许服务器向客户端推送数据,而客户端不能通过同一连接发送数据给服务器。

SSE 是一种轻量级的、基于文本的实时通信机制,适用于需要向客户端发送实时信息的场景,比如新闻推送、股票实时数据、天气预报等。

2、SSE 特点

  • 单向通信:只有服务器向客户端发送数据。
  • 基于标准的 HTTP 协议:与 HTTP 完全兼容,易于集成和使用。
  • 自动重连:当连接中断时,浏览器会自动重新连接。
  • 传输格式:SSE 使用 text/event-stream 格式发送数据。
  • 长连接:与 WebSocket 不同,SSE 通过 HTTP 长连接进行实时通信,适用于数据流传输而无需双向通信的场景。

3、如何在 Spring Boot 中实现 SSE

java
package com.xx.controller;

import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.io.IOException;
import java.time.LocalTime;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@RestController
public class SseController {

    // 线程池用于异步推送消息
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    @GetMapping(value = "/sse/subscribe", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter subscribe() {
        SseEmitter emitter = new SseEmitter(0L); // 设置为无限超时

        executor.execute(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    emitter.send("当前时间:" + LocalTime.now());
                    Thread.sleep(1000); // 每秒推送一次
                }
                emitter.complete(); // 任务完成后关闭连接
            } catch (IOException | InterruptedException e) {
                emitter.completeWithError(e);
            }
        });

        return emitter;
    }
}
html
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>SSE 测试</title>
</head>
<body>
<h1>SSE 实时推送:</h1>
<div id="msg"></div>

<script>
    const eventSource = new EventSource("http://localhost:8080/sse/subscribe");

    eventSource.onmessage = function(event) {
        const msgDiv = document.getElementById("msg");
        msgDiv.innerHTML += "<p>" + event.data + "</p>";
    };

    eventSource.onerror = function () {
        console.error("连接出错或已关闭。");
        eventSource.close();
    };
</script>
</body>
</html>

使用 ApiPost 也可以发送SSE接口

4、推送多个事件

java
package com.xx.controller;

import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@RestController
public class SseController {

    // 线程池用于异步推送消息
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    @GetMapping(value = "/sse/subscribe", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter subscribe() {
        SseEmitter emitter = new SseEmitter(0L); // 设置为无限超时

        executor.execute(() -> {
            try {
                for (int i = 0; i < 10; i++) {
//                    emitter.send("当前时间:" + LocalTime.now());
                    emitter.send("事件 1");
                    Thread.sleep(1000);
                    emitter.send("事件 2");
                    Thread.sleep(1000);
                    emitter.send("事件 3");
                    Thread.sleep(1000); // 每秒推送一次
                }
                emitter.complete(); // 任务完成后关闭连接
            } catch (IOException | InterruptedException e) {
                emitter.completeWithError(e);
            }
        });
        return emitter;
    }
}

5、自定义 HTTP 头避免缓存

java
@RequestMapping(value = "/sse/subscribe", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter subscribe() {
    SseEmitter emitter = new SseEmitter();
    emitter.setCacheControl("no-cache");
    return emitter;
}

设置 Cache-Control: no-cache 是为了告诉浏览器不要缓存 SSE 响应数据,以确保客户端总是接收到服务器“最新、实时”的推送内容。

6、封装SSE Service

1、ISysSSEService

java
package com.xx.service;

import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

/**
 * @Author: xueqimiao
 * @Date: 2025/4/1 14:11
 */
public interface ISysSSEService {

    /**
     * SSE 连接
     */
    SseEmitter connect();

    /**
     * 指定用户ID SSE 推送消息
     *
     * @param userId    用户ID
     * @param eventName 事件名称
     * @param message   消息内容
     */
    boolean send(Long userId, String eventName, String message);

    /**
     * 指定用户ID SSE 推送提醒消息
     *
     * @param userId  用户ID
     * @param title   消息标题
     * @param content 消息内容
     */
    boolean sendRemind(Long userId, String title, String content);
}

2、SysSSEServiceImpl

java
package com.xx.service;

import cn.hutool.json.JSONUtil;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import static org.springframework.http.MediaType.APPLICATION_JSON;

/**
 * @Author: xueqimiao
 * @Date: 2025/4/1 14:11
 */
@Service
public class SysSSEServiceImpl implements ISysSSEService {
    private final static Map<Long, ConcurrentHashMap<String, SseEmitter>> USER_TOKEN_EMITTERS =
            new ConcurrentHashMap<>();

    @Override
    public SseEmitter connect() {
        Long userId = 1L;
        Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.computeIfAbsent(userId, k -> new ConcurrentHashMap<>());
        // 创建一个新的 SseEmitter 实例,默认30秒超时,设置为0L则永不超时
        SseEmitter emitter = new SseEmitter(0L);
        // 用户会话ID
        String sid = "1";
        if (null != sid) {
            try {
                // 会话关联推送
                emitters.put(sid, emitter);
                // 当 emitter 完成、超时或发生错误时,从映射表中移除对应的会话信息
                emitter.onCompletion(() -> emitters.remove(sid));
                emitter.onTimeout(() -> emitters.remove(sid));
                emitter.onError((e) -> emitters.remove(sid));
                // 向客户端发送一条连接成功的事件
                emitter.send(SseEmitter.event().comment("connected"));
            } catch (IOException e) {
                // 如果发送消息失败,则从映射表中移除 emitter
                emitters.remove(sid);
            }
        }
        return emitter;
    }

    @Override
    public boolean send(Long userId, String eventName, String message) {
        boolean result = false;
        Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.get(userId);
        if (emitters != null) {
            for (Map.Entry<String, SseEmitter> entry : emitters.entrySet()) {
                SseEmitter sseEmitter = entry.getValue();
                try {
                    sseEmitter.send(SseEmitter.event().name(eventName).data(message, APPLICATION_JSON));
                    if (!result) {
                        // 只要推送一次成功认为成功
                        result = true;
                    }
                } catch (Exception e) {
                    emitters.remove(entry.getKey());
                    sseEmitter.completeWithError(e);
                }
            }
        }
        return result;
    }

    @Override
    public boolean sendRemind(Long userId, String title, String content) {
        return this.send(userId, "remind", this.toMessage(title, content));
    }

    /**
     * 组装消息
     *
     * @param title   消息标题
     * @param content 消息内容
     * @return 实际发送消息内容
     */
    private String toMessage(String title, String content) {
        Map<String, Object> map = new HashMap<>();
        map.put("title", title);
        map.put("content", content);
        return JSONUtil.formatJsonStr(map.toString());
    }

}

3、SseController

java
package com.xx.controller;

import com.xx.service.ISysSSEService;
import jakarta.annotation.Resource;
import jakarta.servlet.http.HttpServletResponse;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@RestController
public class SseController {

    @Resource
    private ISysSSEService sseService;

    @GetMapping(value = "/connect", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter connect(HttpServletResponse response) {
        // 设置响应的字符编码
        response.setCharacterEncoding("UTF-8");
        return sseService.connect();
    }

    @GetMapping("/test-send")
    public String sendTest(@RequestParam(required = false,name = "message") String message) {
        sseService.send(1L, "message", message);
        return "SUCCESS";
    }
}

7、总结

  • SSE 是一种非常适合实现单向实时通信的技术。它在 Spring Boot 中的实现相对简单且高效,尤其适合需要实时推送更新但不要求双向通信的场景。

写在最后

在技术的漫漫长路上,我们都是行者,怀揣着梦想与热情,踏过迷茫与困惑,一步步向着更高的境界迈进。每一次的摸索与尝试,每一回的挫折与成长,都如同繁星点缀着我们的旅程,使其熠熠生辉。