数据流式获取主要适用于实时性要求高的场景,如聊天、实时推送、大文件下载/上传、日志流等。
-
WebSocket
双向全双工通信,适合高实时性场景。
const ws = new WebSocket('wss://api.example.com/stream');
ws.onmessage = (event) => {
const data = JSON.parse(event.data);
console.log('收到流式数据:', data);
};
ws.onopen = () => {
ws.send(JSON.stringify({ type: 'subscribe' }));
};
-
Server-Sent Events (SSE)
单向服务器推送,基于 HTTP 协议,自动重连。
const eventSource = new EventSource('/api/stream');
eventSource.onmessage = (event) => {
console.log('收到数据:', JSON.parse(event.data));
};
eventSource.onerror = (err) => {
console.error('SSE 错误:', err);
};
服务器响应头需包含:
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
-
Fetch API + ReadableStream
处理分块响应,适用于大文件下载、流式 API。
fetch('/api/stream-data')
.then(response => {
const reader = response.body.getReader();
const decoder = new TextDecoder();
function readChunk() {
return reader.read().then(({ done, value }) => {
if (done) return;
const chunk = decoder.decode(value);
console.log('收到块:', chunk);
// 继续读取下一块
return readChunk();
});
}
return readChunk();
});
在微信小程序中
// 示例:请求一个流式接口
const requestTask = wx.request({
url: 'https://your-backend.com/stream/chat',
method: 'POST',
header: {
'Content-Type': 'application/json',
'Connection': 'close' // 必须设置此项
},
data: {
prompt: '你好,请介绍一下你自己。',
stream: true // 通常后端也需要此参数来开启流式
},
responseType: 'text', // 响应类型通常为文本
enableChunked: true, // 关键:启用分块传输模式
timeout: 0, // 根据实际情况设置,流式请求通常较长,可设置为0或不设置
success(res) {
// 注意:在 enableChunked 模式下,success 回调中通常拿不到完整的响应体
// 完整的响应数据会在 onChunkReceived 中处理
console.log('请求连接成功建立', res.statusCode);
},
fail(err) {
console.error('请求失败', err);
}
});
// 必须监听 onChunkReceived 事件来接收数据块
requestTask.onChunkReceived((res) => {
// res 是一个对象,包含接收到的原始数据
const arrayBuffer = res.data;
// 1. 将 ArrayBuffer 转换为字符串
const decoder = new TextDecoder('utf-8');
const chunkString = decoder.decode(arrayBuffer, { stream: true }); // 注意 { stream: true }
// 2. 处理流式数据 (以 SSE 格式为例)
// SSE 数据格式通常为 "data: {some json}\n\n"
const lines = chunkString.split('\n');
for (const line of lines) {
if (line.startsWith('data: ')) {
const dataContent = line.replace('data: ', '');
if (dataContent === '[DONE]') {
console.log('流式传输结束');
return;
}
try {
const parsedData = JSON.parse(dataContent);
// 假设后端返回 { "choices": [{ "delta": { "content": "Hello" } }] }
const text = parsedData.choices?.[0]?.delta?.content || '';
if (text) {
// 在这里更新 UI,例如拼接字符串
// this.setData({ replyText: this.data.replyText + text });
console.log('收到片段:', text);
}
} catch (e) {
console.log('解析非JSON数据或心跳:', dataContent);
}
}
}
});
由于是长连接,需要提供用户取消的选项
// 在 Page 或 Component 的 data 中定义
data: {
requestTask: null
},
// 开始请求
startStream() {
const task = wx.request({ /* 上述配置 */ });
this.setData({ requestTask: task });
},
// 停止/中断请求
stopStream() {
if (this.data.requestTask) {
this.data.requestTask.abort(); // 调用 abort 方法中断请求
console.log('请求已中止');
this.setData({ requestTask: null });
}
}
-
WebRTC DataChannel
点对点(P2P)低延迟传输,适合视频流、游戏等场景。
底层默认使用 SCTP(流控制传输协议) ,运行在 UDP 之上,结合了 TCP 的可靠性和 UDP 的实时性优点。
const configuration = {
iceServers: [ // 这是一个数组,可以配置多个服务器,按顺序尝试
// STUN 服务器配置(用于获取本机公网地址)
{
urls: "stun:stun.example.com:19302"
},
// 或
{
urls: "stun:stun.l.google.com:19302"
},
// TURN 服务器配置(在中继时需认证)
{
urls: "turn:turn.example.com:3478",
username: "your_username",
credential: "your_password"
}
],
// 其他可选参数...
};
// 创建对等连接
const pc = new RTCPeerConnection(configuration);
// 创建DataChannel
const dataChannel = pc.createDataChannel('myChannel');
// 监听消息
dataChannel.onmessage = event => console.log('收到消息:', event.data);
// 发送消息
dataChannel.send('Hello DataChannel!');
- 轮询与长轮询
短轮询:定时请求,简单但实时性差。
长轮询:服务器持有请求直到有数据,比短轮询实时性好。
它们的区别
| 特性 | 短轮询 | 长轮询 | Server-Sent Events (SSE) | Fetch + ReadableStream | WebSocket |
|---|---|---|---|---|---|
| 核心模型 | 客户端定时“拉取” | 客户端挂起式“拉取” | 服务器单向“推送” | 单向“响应流” | 双向“对话” |
| 通信方向 | 准双向 (由请求发起) | 准双向 (由请求发起) | 单向 (服务器 → 客户端) | 单向 (服务器响应流 → 客户端) | 全双工双向 (随时互发) |
| 协议基础 | HTTP/HTTPS (普通请求) | HTTP/HTTPS (长连接) | HTTP/HTTPS (长连接) | HTTP/HTTPS (单次请求) | 独立的 ws/wss 协议 |
| 连接方式 | 短连接 (请求完即关) | 长连接 (服务器挂起) | 持久连接 (服务器可主动发) | 单次连接 (流结束即关) | 持久连接 (升级后专用) |
| 实时性 | 差 (延迟=轮询间隔) | 较好 (数据就绪即返回) | 好 (数据就绪即推送) | 好 (数据块就绪即传输) | 极好 (毫秒级双向) |
| 服务器压力 | 很高 (大量无效请求) | 中等 (挂起减少请求数) | 低 (一个连接持续推送) | 低 (单次流式响应) | 极低 (无头开销,长连接) |
| 数据格式 | 任意 (JSON 等) | 任意 (JSON 等) | 文本 (event-stream 格式) | 任意 (文本/二进制) | 任意 (文本/二进制) |
| 背压控制 | 无 | 无 | 无 | 原生支持 (可暂停流) | 无 (网络层控制) |
| 典型场景 | 兼容性要求极高的旧系统 | 需较好实时性但无法用 SSE/WS | 实时通知、行情、日志 (如 ChatGPT 流) | 大文件下载、AI 文本流、视频 | 聊天、游戏、协同编辑、实时交易 |
| 实现复杂度 | 非常简单 | 较复杂 (需处理超时重连) | 简单 | 中等 (需处理流读取) | 中等 (需处理协议状态) |
核心区别详解
-
主动性:谁发起数据流动?
短/长轮询 (Pull - 拉) :客户端驱动。像不断打电话问“有消息吗?”。
SSE/WebSocket (Push - 推) :服务器驱动。像订阅了频道,消息来了就直接送到。
Fetch Stream (Stream - 流) :客户端发起,服务器流式响应。像打开水龙头,水持续流出来。 -
连接与开销的本质区别
短轮询是“勤问但笨”:无论有无新消息,都频繁敲门问,浪费体力(网络开销)。
长轮询是“等信再问”:站在门口等,一有信就收,收完继续等。比短轮询聪明,但仍需“站”这个动作(保持 HTTP 请求)。
SSE 是“装了个信箱”:信箱(HTTP 连接)装好后,邮差(服务器)有信就直接投进去。
Fetch Stream 是“接了一根水管”:打开阀门(发起请求),水(数据)就持续流过来,流完管子就拆。
WebSocket 是“接了条热线电话”:电话接通后,双方可以随时说话,没有拨号开销。 -
单向与双向的鸿沟
SSE 和 Fetch Stream 是单向高速公路:数据只能从服务器流向客户端。
WebSocket 是双向立交桥:数据可以随时双向自由通行。
轮询是双向乡间小道:每次只能单向走一辆车(请求或响应),需要来回掉头。 -
“流”的特有优势:背压控制
这是 Fetch + ReadableStream 的独有优势。客户端可以主动控制数据流的接收速度,比如:
const reader = response.body.getReader();
// 客户端可以根据处理能力暂停读取
if (bufferIsFull) {
await delay(100); // 暂停一下,防止内存溢出
}
await reader.read(); // 继续读取下一块
这在下载超大文件或处理慢速消费的流时至关重要,而其他方案不具备这种原生应用层控制。
如何选择?
-
需要客户端随时主动向服务器发消息吗?
是 → 只能选 WebSocket。
否 → 进入第 2 题。
-
主要是处理一个大文件的下载,或一个很长的文本生成,需要控制接收速度吗?
是 → 选 Fetch + ReadableStream。
否 → 进入第 3 题。
-
主要是服务器有事件就实时推给客户端,且客户端只需偶尔发送指令(用另一个普通 HTTP 请求)?
是 → 选 SSE。
否 → 进入第 4 题。
-
对实时性要求一般,但服务器或浏览器太老,不支持 SSE/WS?
对实时性有要求 → 用 长轮询。
对实时性几乎没要求(比如每分钟同步一次)→ 用 短轮询。
评论区