浅谈 Server-Sent Events
前几天看到 Twitter 点赞区域蹦哒的很可爱, 于是乎想研究研究怎么实现的. 一开始下意识以为是 WebSocket, 但是在控制台死活没找到相关的信息, 去 stackoverflow 问了下原来是 Server-Sent Events, 本文对此做个介绍.
什么是 Server-Sent Events
Server-Sent Events 是一种服务器推送技术, 使客户端可以通过 HTTP 连接从服务器自动接收更新. 每个通知以文本流(文本应该为 utf-8)的形式发送, 并以一对换行符结尾. 与 WebSocket 相比:
- 它不是全双工的, 只能服务器向浏览器发送, 因为流信息本质上就是下载, 一旦连接后不能再次发送请求(否则就变成了一次新的连接)
- WebSocket 使用的 ws 协议, 而 SSE 使用的仍然是 HTTP 协议.
SSE 的特点
这里直接抄袭阮一峰聚聚的:
- SSE 使用 HTTP 协议, 现有的服务器软件都支持. WebSocket 是一个独立协议.
- SSE 属于轻量级, 使用简单;WebSocket 协议相对复杂.
- SSE 默认支持断线重连, WebSocket 需要自己实现.
- SSE 一般只用来传送文本, 二进制数据需要编码后传送, WebSocket 默认支持传送二进制数据.
- SSE 支持自定义发送的消息类型.
来个例子
无码言屌, 下面我们通过一个例子来演示怎么使用 SSE. 首先看客户端的代码, 通过 new 一个 EventSource 来创建 SSE 实例, 第一个参数为后端接口, 第二个 option 参数只有一个 `withCredentials`
, 如果为 true, 在跨域的情况下, 可被允许发送 cookie.
const evtSource = new EventSource("http://localhost:3002/sse", { withCredentials: true, });
EventSource 可以监听三种事件, 分别是 `onopen`
, `onmessage`
, `onerror`
, 第一个和第三个不用多说, 分别是建立连接成功和建立连接失败(CORS 或者请求超时等等). 而 `onmessage`
最重要, 它用来监听每次推送的信息流.
因为 SSE 接收的只能是 utf-8 的纯文本, 因此最通用的做法是后端传递一个 JSON 字符串. 下面的代码中, 每次接收到新的推送, 就会打印出 like_count, 直到 like_count > 10, 客户端会主动要求服务端停止推送.
interface Data { payload: { like_count: number }; } evtSource.addEventListener("message", (e: MessageEvent) => { const { payload: { like_count }, }: Data = JSON.parse(e.data); console.log(like_count); if (like_count > 10) { evtSource.close(); } });
上面基本就是客户端要做的事情了, 很简单, 下面看下服务端的. 因为 nestjs 封装了对 SSE 的支持, 这里就用这个框架搞下.
事件流仅仅是一个简单的文本数据流, 文本应该使用 UTF-8 格式的编码. 每条消息后面都由一个空行作为分隔符. 每条消息是由多个字段组成的, 每个字段由字段名, 一个冒号, 以及字段值组成.
规范中支持四个字段, 分别是:
-
event: 该字段为 onmessage 的子集, 也就是说你可以通过
`evtSource.addEventListener("customEvt", () => {}`
来细粒度监听指定 event 的推送. -
data: 也就是传递的实体, 如果该条消息包含多个 data 字段, 则客户端会用换行符把它们连接成一个字符串来作为字段值.
-
id 可以给每次推送增加一个 id 标识, 比如是 tweetId, 这样就可以把实体中的 likeCount 跟 tweetId 一一映射.
-
retry: 指定浏览器重新发起连接的时间间隔, 它是一个整数值, 指定了重新连接的时间(单位为毫秒), 如果不是正整数将被忽略
此外, 以冒号开头的行为注释行, 会被忽略, 注释行可以用来防止连接超时, 服务器可以定期发送一条消息注释行, 以保持连接不断.
: just a comment\n\n id: 12345\n event: addLikeCount\n retry: 10000\n data: {\n data: "likeCount": 1,\n data: }\n\n
下面直接看后端代码实现, 由于 nestjs 实现 SSE 必须用 rxjs, 所以得有点儿相关基础. 代码中每两秒吐出一次数据, 由于 nestjs 会将 data 转化为 SSE 想要的结构, data 直接写成对象即可.
import { Injectable } from "@nestjs/common"; import { interval } from "rxjs"; import { map } from "rxjs/operators"; import { randomSeries } from "yancey-js-util"; @Injectable() export class SSEService { public sse() { let count = 1; return interval(2000).pipe( map((_) => ({ id: randomSeries(6), type: "addLikeCount", data: { payload: { tweetId: randomSeries(6), likeCount: count++ } }, retry: 10000, })) ); } }
如果没什么意外, 前端就可以看到 stream 了.
分析下响应头, 很重要的两个标志是禁止了缓存, 并且 `Content-Type: text/event-stream`
.
GraphQL 是否支持?
Emmmm, 似乎不支持, 不过 GraphQL 本身已经有了强大的 Subscriptions 系统, 也没必要支持这些玩意儿了(毕竟去年 GraphQL 支持个上传还不利索, 逃).
Can I use SSE?
Emmmm, 除了某浏览器, 都可以用.
其他
突然想到, 我在后端配置了 rateLimit. 那么前端如此频繁的获取数据, 会触发 rateLimit 吗? 答案是不会的, 因为 SSE 始终只是一条 HTTP 请求, 而 rateLimit 限制的只是重复请求.
app.use( rateLimit({ windowMs: 15 * 60 * 1000, max: 100, }) );
奉上全部代码
// 前端部分 // SSE.tsx import { FC, useState, useEffect } from "react"; interface CustomEvent extends Event { data: string; } interface Data { payload: { likeCount: number; }; } const SSE: FC = () => { const [like, setLike] = useState(0); const initialSSE = () => { const evtSource = new EventSource("http://localhost:3002/sse", { withCredentials: true, }); evtSource.addEventListener("open", () => { console.log("已开启"); }); // 这里使用的便是自定义事件 evtSource.addEventListener("addLikeCount", ((e: CustomEvent) => { const { payload: { likeCount }, }: Data = JSON.parse(e.data); setLike(likeCount); if (likeCount > 10) { evtSource.close(); } }) as EventListener); evtSource.addEventListener("message", (e: MessageEvent) => {}); evtSource.addEventListener("error", (err: Event) => { console.log(err); }); }; useEffect(() => { initialSSE(); }, []); return <div>{like}</div>; }; export default SSE; // 后端部分 // sse.module.ts import { Module } from "@nestjs/common"; import { SSEController } from "./sse.controller"; import { SSEService } from "./sse.service"; @Module({ controllers: [SSEController], providers: [SSEService], }) export class SSEModule {} // sse.controller.ts import { Controller, MessageEvent, Sse } from "@nestjs/common"; import { Observable } from "rxjs"; import { SSEService } from "./sse.service"; @Controller() export class SSEController { constructor(private readonly sseService: SSEService) { this.sseService = sseService; } @Sse("sse") public sse(): Observable<MessageEvent> { return this.sseService.sse(); } } // sse.service.ts import { Injectable } from "@nestjs/common"; import { interval } from "rxjs"; import { map } from "rxjs/operators"; import { randomSeries } from "yancey-js-util"; @Injectable() export class SSEService { public sse() { let count = 1; return interval(2000).pipe( map((_) => ({ id: randomSeries(6), type: "addLikeCount", data: { payload: { tweetId: randomSeries(6), likeCount: count++ } }, retry: 10000, })) ); } }
参考
PREVIOUS POST
手摸手教你 360 度无死角 WebP 能力检测
NEXT POST
从 React 源码谈 v8 引擎对数组的内部处理