使用 Node.js 实现一个简单的 SSE 服务
网上有很多 Demo 介绍了如何使用 SSE。但是真正向客户端发送请求是一个持续的过程,因此需要有一个很好的解决方案来管理这些长链接。目前网上的一些 Demo 和博客基本上都是在请求的 Controller 中直接向客户端发送。所以它们也只能是 Demo。
什么是 SSE?
互联网可以轻松搜索到关于 SSE 的标准,因此不赘述。本文主要介绍 Node.js 的实现
见 MDN 文档
客户端示例
const sse = new EventSource('/sse');
sse.onerror = (err) => {
console.error('An error occurred!!', err);
};
sse.onmessage = (e) => {
console.log('Received message: ', e.data);
};
服务端实现
准备
Framework
- 本文使用 Express.js 为服务端框架,其他框架原理相似
安装 ssestream
需安装 NPM 包 ssestream
。该包封装了对 HTTP Header 的一些简单的处理。以下是其代码核心片段。
|
|
可以看出,SSE 需要在在 HTTP 响应的 Header 为
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
需要注意这里使用了 flushHeaders()
来设置 Header。如果不调用这个函数,在第一条响应返回客户端之前或者调用 response.end()
之前是不会写入真正的响应 Header 的,而是把这些 Header 缓存起来。因为我们不需要立即向客户端发送消息,所以我们先把 Response Header 返回给客户端。
使用
|
|
实现 SseSever
类
下面就看一下这个 SseServer
是如何实现的
const SseStream = require('ssestream');
class SseServer {
constructor (options) {
// 用来缓存当前所有的链接用来之后发送消息
this.sseConnections = new Set()
// 设置最大链接数
this.maxConnections = options.maxConnections || Infinity;
this.middleWare = this.middleWare.bind(this);
this.announce = this.announce.bind(this);
}
middleWare () {
return (req, res) => {
const sseConnections = this.sseConnections;
// 超过最大链接数的时候需要拒绝客户端请求
if (sseConnections.size >= this.maxConnections) {
return res.status(429).send()
}
const sse = new SseStream(req);
// 详见 ssestream 的 api
sse.pipe(res);
const metaData = [sse, req, res];
// 写入链接缓存
sseConnections.add(metaData)
// 与客户端链接断开时需要清除链接缓存
req.on('close', function () {
console.log('CONNECTION CLOSED!!!')
sseConnections.delete(metaData)
})
}
}
// 向客户端广播消息
announce (data) {
this.sseConnections.forEach((meta) => {
const [sse, req, res] = meta
const message = {
data,
};
meta[0].write(message);
})
}
}
本文仅展示核心原理。如需提高代码健壮性,需要有更多的错误处理以及缓存长度检查机制以防止内存泄漏。
Authored by @yuqingc 转载请注明出处