OpenClaw 流式响应:让 Agent 像"打字员"一样输出

世界上有两种 AI 体验:一种是等了 10 秒突然蹦出一大段文字,吓得你差点把咖啡洒在键盘上;另一种是像打字员一样一个字一个字地蹦出来,让你觉得它在认真思考。后者,就叫流式响应(Streaming)。今天,我们聊聊如何在 OpenClaw 中实现丝滑的实时输出。

流式响应的三种模式

模式 协议 适用场景 双向通信
Server-Sent Events HTTP + SSE 单向推送(AI 回复)
WebSocket WS/WSS 双向实时通信
HTTP Chunked HTTP/1.1 简单的流式传输

方式一:Server-Sent Events (SSE)

SSE 是最简单的流式方案,适合 AI 回复的单向推送。

后端配置

# .openclaw/config.yaml

streaming:
  enabled: true
  default_mode: "sse"  # sse | websocket | chunked
  
  sse:
    endpoint: "/api/stream"
    heartbeat_interval: 15  # 心跳间隔(秒)
    retry_timeout: 3000     # 重连间隔(毫秒)
    
  # 流式选项
  options:
    chunk_size: 10          # 每次输出的 Token 数
    include_tool_calls: true # 流式输出工具调用过程
    include_thinking: true   # 流式输出思考过程

前端接入

// 使用 EventSource 接收 SSE
const eventSource = new EventSource('/api/stream?session=abc123');

// 接收文本片段
eventSource.addEventListener('text', (event) => {
    const chunk = event.data;
    document.getElementById('output').textContent += chunk;
});

// 接收工具调用状态
eventSource.addEventListener('tool_call', (event) => {
    const tool = JSON.parse(event.data);
    showToolStatus(tool.name, tool.status);
});

// 接收完成信号
eventSource.addEventListener('done', (event) => {
    const result = JSON.parse(event.data);
    console.log('完成', result);
    eventSource.close();
});

// 错误处理
eventSource.addEventListener('error', (event) => {
    if (event.readyState === EventSource.CLOSED) {
        console.log('连接已关闭');
    } else {
        console.log('连接错误,自动重连中...');
    }
});

方式二:WebSocket

WebSocket 支持双向通信,适合需要用户实时交互的场景。

配置 WebSocket

# .openclaw/config.yaml

streaming:
  websocket:
    enabled: true
    endpoint: "/ws"
    
    # 消息格式
    message_format:
      # 发送格式
      send:
        type: "json"
        fields: ["session_id", "message", "context"]
      # 接收格式
      receive:
        type: "json"
        fields: ["type", "content", "meta"]
        
    # 心跳
    ping_interval: 30
    pong_timeout: 10
    
    # 最大消息大小
    max_message_size: 1MB

WebSocket 客户端

class StreamingClient {
    constructor(url) {
        this.ws = new WebSocket(url);
        this.callbacks = {};
        
        this.ws.onmessage = (event) => {
            const msg = JSON.parse(event.data);
            
            switch (msg.type) {
                case 'text_chunk':
                    this.onTextChunk(msg.content);
                    break;
                case 'tool_start':
                    this.onToolStart(msg.tool, msg.args);
                    break;
                case 'tool_result':
                    this.onToolResult(msg.tool, msg.result);
                    break;
                case 'thinking':
                    this.onThinking(msg.content);
                    break;
                case 'done':
                    this.onDone(msg);
                    break;
                case 'error':
                    this.onError(msg);
                    break;
            }
        };
    }
    
    // 发送用户消息
    send(message, context = {}) {
        this.ws.send(JSON.stringify({
            session_id: this.sessionId,
            message,
            context
        }));
    }
    
    // 实时显示文本
    onTextChunk(chunk) {
        this.outputEl.textContent += chunk;
    }
    
    // 中断生成
    abort() {
        this.ws.send(JSON.stringify({ type: 'abort' }));
    }
}

// 使用
const client = new StreamingClient('wss://api.miaoquai.com/ws');
client.send('帮我分析这份数据');

方式三:HTTP Chunked Transfer

最简单的流式方案,不需要额外的协议支持。

Node.js 实现

const express = require('express');
const openclaw = require('@openclaw/sdk');

const app = express();

app.post('/api/chat', async (req, res) => {
    // 设置流式响应头
    res.setHeader('Content-Type', 'text/event-stream');
    res.setHeader('Cache-Control', 'no-cache');
    res.setHeader('Connection', 'keep-alive');
    
    // 创建流式 Agent
    const agent = await openclaw.createAgent({
        stream: true,
        model: 'claude-3-sonnet'
    });
    
    // 流式输出
    const stream = await agent.chat(req.body.message);
    
    stream.on('chunk', (chunk) => {
        res.write(`data: ${JSON.stringify(chunk)}\n\n`);
    });
    
    stream.on('tool_call', (tool) => {
        res.write(`event: tool\ndata: ${JSON.stringify(tool)}\n\n`);
    });
    
    stream.on('end', () => {
        res.write('event: done\ndata: {}\n\n');
        res.end();
    });
    
    // 客户端断开时取消生成
    req.on('close', () => {
        stream.abort();
    });
});

高级功能

1. 工具调用过程可视化

# 启用工具调用流式输出
streaming:
  tool_calls:
    stream_progress: true   # 显示执行进度
    show_args: false        # 是否显示参数(隐私考虑)
    show_result: true       # 显示结果摘要
    
  # 示例输出效果:
  # 🔧 调用数据库查询... ✓ (0.3s)
  # 🔧 生成图表... ✓ (1.2s)
  # 🔧 发送邮件... ✓ (0.8s)

2. 打字机效果优化

// 前端打字机效果
class TypewriterEffect {
    constructor(element, options = {}) {
        this.el = element;
        this.speed = options.speed || 20;  // 毫秒/字
        this.queue = [];
        this.isTyping = false;
    }
    
    addChunk(text) {
        this.queue.push(...text.split(''));
        if (!this.isTyping) this.type();
    }
    
    async type() {
        this.isTyping = true;
        while (this.queue.length > 0) {
            const char = this.queue.shift();
            this.el.textContent += char;
            await new Promise(r => setTimeout(r, this.speed));
            
            // 自动滚动
            this.el.parentElement.scrollTop = 
                this.el.parentElement.scrollHeight;
        }
        this.isTyping = false;
    }
}

3. 中断与恢复

// 中断当前生成
await agent.abort();

// 从中断点恢复
const resumed = await agent.resume({
    continue_from: last_token_id,
    prefix: "...(已省略)"
});

性能优化

1. 减少延迟

2. 背压控制

# 当客户端处理不过来时
streaming:
  backpressure:
    enabled: true
    max_buffer_size: 100   # 最大缓冲区
    strategy: "drop_oldest" # drop_oldest | pause | error

3. 重连机制

相关资源