OpenClaw 流式响应:让 Agent 像"打字员"一样输出
世界上有两种 AI 体验:一种是等了 10 秒突然蹦出一大段文字,吓得你差点把咖啡洒在键盘上;另一种是像打字员一样一个字一个字地蹦出来,让你觉得它在认真思考。后者,就叫流式响应(Streaming)。今天,我们聊聊如何在 OpenClaw 中实现丝滑的实时输出。
流式响应的三种模式
| 模式 | 协议 | 适用场景 | 双向通信 |
|---|---|---|---|
| Server-Sent Events | HTTP + SSE | 单向推送(AI 回复) | ❌ |
| WebSocket | WS/WSS | 双向实时通信 | ✅ |
| HTTP Chunked | HTTP/1.1 | 简单的流式传输 | ❌ |
方式一:Server-Sent Events (SSE)
SSE 是最简单的流式方案,适合 AI 回复的单向推送。
后端配置
# .openclaw/config.yaml
streaming:
enabled: true
default_mode: "sse" # sse | websocket | chunked
sse:
endpoint: "/api/stream"
heartbeat_interval: 15 # 心跳间隔(秒)
retry_timeout: 3000 # 重连间隔(毫秒)
# 流式选项
options:
chunk_size: 10 # 每次输出的 Token 数
include_tool_calls: true # 流式输出工具调用过程
include_thinking: true # 流式输出思考过程
前端接入
// 使用 EventSource 接收 SSE
const eventSource = new EventSource('/api/stream?session=abc123');
// 接收文本片段
eventSource.addEventListener('text', (event) => {
const chunk = event.data;
document.getElementById('output').textContent += chunk;
});
// 接收工具调用状态
eventSource.addEventListener('tool_call', (event) => {
const tool = JSON.parse(event.data);
showToolStatus(tool.name, tool.status);
});
// 接收完成信号
eventSource.addEventListener('done', (event) => {
const result = JSON.parse(event.data);
console.log('完成', result);
eventSource.close();
});
// 错误处理
eventSource.addEventListener('error', (event) => {
if (event.readyState === EventSource.CLOSED) {
console.log('连接已关闭');
} else {
console.log('连接错误,自动重连中...');
}
});
方式二:WebSocket
WebSocket 支持双向通信,适合需要用户实时交互的场景。
配置 WebSocket
# .openclaw/config.yaml
streaming:
websocket:
enabled: true
endpoint: "/ws"
# 消息格式
message_format:
# 发送格式
send:
type: "json"
fields: ["session_id", "message", "context"]
# 接收格式
receive:
type: "json"
fields: ["type", "content", "meta"]
# 心跳
ping_interval: 30
pong_timeout: 10
# 最大消息大小
max_message_size: 1MB
WebSocket 客户端
class StreamingClient {
constructor(url) {
this.ws = new WebSocket(url);
this.callbacks = {};
this.ws.onmessage = (event) => {
const msg = JSON.parse(event.data);
switch (msg.type) {
case 'text_chunk':
this.onTextChunk(msg.content);
break;
case 'tool_start':
this.onToolStart(msg.tool, msg.args);
break;
case 'tool_result':
this.onToolResult(msg.tool, msg.result);
break;
case 'thinking':
this.onThinking(msg.content);
break;
case 'done':
this.onDone(msg);
break;
case 'error':
this.onError(msg);
break;
}
};
}
// 发送用户消息
send(message, context = {}) {
this.ws.send(JSON.stringify({
session_id: this.sessionId,
message,
context
}));
}
// 实时显示文本
onTextChunk(chunk) {
this.outputEl.textContent += chunk;
}
// 中断生成
abort() {
this.ws.send(JSON.stringify({ type: 'abort' }));
}
}
// 使用
const client = new StreamingClient('wss://api.miaoquai.com/ws');
client.send('帮我分析这份数据');
方式三:HTTP Chunked Transfer
最简单的流式方案,不需要额外的协议支持。
Node.js 实现
const express = require('express');
const openclaw = require('@openclaw/sdk');
const app = express();
app.post('/api/chat', async (req, res) => {
// 设置流式响应头
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
// 创建流式 Agent
const agent = await openclaw.createAgent({
stream: true,
model: 'claude-3-sonnet'
});
// 流式输出
const stream = await agent.chat(req.body.message);
stream.on('chunk', (chunk) => {
res.write(`data: ${JSON.stringify(chunk)}\n\n`);
});
stream.on('tool_call', (tool) => {
res.write(`event: tool\ndata: ${JSON.stringify(tool)}\n\n`);
});
stream.on('end', () => {
res.write('event: done\ndata: {}\n\n');
res.end();
});
// 客户端断开时取消生成
req.on('close', () => {
stream.abort();
});
});
高级功能
1. 工具调用过程可视化
# 启用工具调用流式输出
streaming:
tool_calls:
stream_progress: true # 显示执行进度
show_args: false # 是否显示参数(隐私考虑)
show_result: true # 显示结果摘要
# 示例输出效果:
# 🔧 调用数据库查询... ✓ (0.3s)
# 🔧 生成图表... ✓ (1.2s)
# 🔧 发送邮件... ✓ (0.8s)
2. 打字机效果优化
// 前端打字机效果
class TypewriterEffect {
constructor(element, options = {}) {
this.el = element;
this.speed = options.speed || 20; // 毫秒/字
this.queue = [];
this.isTyping = false;
}
addChunk(text) {
this.queue.push(...text.split(''));
if (!this.isTyping) this.type();
}
async type() {
this.isTyping = true;
while (this.queue.length > 0) {
const char = this.queue.shift();
this.el.textContent += char;
await new Promise(r => setTimeout(r, this.speed));
// 自动滚动
this.el.parentElement.scrollTop =
this.el.parentElement.scrollHeight;
}
this.isTyping = false;
}
}
3. 中断与恢复
// 中断当前生成
await agent.abort();
// 从中断点恢复
const resumed = await agent.resume({
continue_from: last_token_id,
prefix: "...(已省略)"
});
性能优化
1. 减少延迟
- 启用 HTTP/2 或 HTTP/3 减少连接开销
- 使用 CDN 加速 SSE 端点
- 设置合理的 chunk_size,避免过于频繁的小包
2. 背压控制
# 当客户端处理不过来时
streaming:
backpressure:
enabled: true
max_buffer_size: 100 # 最大缓冲区
strategy: "drop_oldest" # drop_oldest | pause | error
3. 重连机制
- SSE: 浏览器自动重连,设置 retry 字段
- WebSocket: 手动实现指数退避重连
- 恢复上下文: 记录最后接收的 token_id