
Streaming Output Guide

Streaming output is an important feature of the DeepSeek API that lets you receive model-generated content in real time instead of waiting for the complete response. It is especially useful for improving user experience and building real-time applications.

Overview

Advantages of streaming output:

  • Real-time responses: users see generated content immediately
  • Better user experience: less waiting and smoother interaction
  • Long-form content: well suited to generating long text
  • Resource efficiency: partial results can be processed as they arrive

Basic Usage

Python Example

python
import os
from openai import OpenAI

client = OpenAI(
    api_key=os.getenv("DEEPSEEK_API_KEY"),
    base_url="https://api.deepseek.com"
)

# Enable streaming output
response = client.chat.completions.create(
    model="deepseek-chat",
    messages=[
        {"role": "user", "content": "Write a short essay about artificial intelligence"}
    ],
    stream=True,  # enable streaming
    max_tokens=500
)

# Process the streamed response
for chunk in response:
    if chunk.choices[0].delta.content is not None:
        print(chunk.choices[0].delta.content, end="", flush=True)
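
Besides the content delta, each streamed chunk's choice also carries a finish_reason field, which stays null until the final chunk. A minimal variant of the loop above that records it:

python
# Variant of the loop above that also records why generation stopped
finish_reason = None
for chunk in response:
    choice = chunk.choices[0]
    if choice.delta.content is not None:
        print(choice.delta.content, end="", flush=True)
    if choice.finish_reason is not None:  # set only on the final chunk
        finish_reason = choice.finish_reason
print(f"\n[finish_reason: {finish_reason}]")  # e.g. "stop" or "length"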

Node.js Example

javascript
import OpenAI from 'openai';

const client = new OpenAI({
    apiKey: process.env.DEEPSEEK_API_KEY,
    baseURL: 'https://api.deepseek.com'
});

async function streamCompletion() {
    const stream = await client.chat.completions.create({
        model: 'deepseek-chat',
        messages: [
            { role: 'user', content: 'Write a short essay about artificial intelligence' }
        ],
        stream: true,
        max_tokens: 500
    });

    for await (const chunk of stream) {
        if (chunk.choices[0]?.delta?.content) {
            process.stdout.write(chunk.choices[0].delta.content);
        }
    }
}

streamCompletion();

cURL Example

bash
curl -X POST "https://api.deepseek.com/chat/completions" \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer $DEEPSEEK_API_KEY" \
  -d '{
    "model": "deepseek-chat",
    "messages": [
      {"role": "user", "content": "写一篇关于人工智能的短文"}
    ],
    "stream": true,
    "max_tokens": 500
  }' \
  --no-buffer
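
With "stream": true the server responds with Server-Sent Events: each event is a data: line carrying a chat.completion.chunk object, and the stream is terminated by data: [DONE]. An abridged, illustrative excerpt of the raw output (ids and several fields omitted):

text
data: {"object":"chat.completion.chunk","model":"deepseek-chat","choices":[{"index":0,"delta":{"content":"Artificial"},"finish_reason":null}]}

data: {"object":"chat.completion.chunk","model":"deepseek-chat","choices":[{"index":0,"delta":{"content":" intelligence"},"finish_reason":null}]}

data: [DONE]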

Advanced Usage

1. Streaming with Error Handling

python
# `client` is the OpenAI client created in the Basic Usage example above

def stream_with_error_handling(prompt):
    try:
        response = client.chat.completions.create(
            model="deepseek-chat",
            messages=[{"role": "user", "content": prompt}],
            stream=True,
            max_tokens=800
        )
        
        full_content = ""
        for chunk in response:
            if chunk.choices[0].delta.content is not None:
                content = chunk.choices[0].delta.content
                full_content += content
                print(content, end="", flush=True)
                
        return full_content
        
    except Exception as e:
        print(f"\n错误: {e}")
        return None

# Example usage
result = stream_with_error_handling("Explain the basic principles of quantum computing")

2. Real-Time Content Filtering

python
# Splits on whitespace, so this helper assumes whitespace-delimited text
def stream_with_filter(prompt, filter_words=None):
    if filter_words is None:
        filter_words = ["blocked_word_1", "blocked_word_2"]
    
    response = client.chat.completions.create(
        model="deepseek-chat",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
        max_tokens=600
    )
    
    buffer = ""
    for chunk in response:
        if chunk.choices[0].delta.content is not None:
            buffer += chunk.choices[0].delta.content
            
            # Check whether the buffer contains complete words
            words = buffer.split()
            if len(words) > 1:
                # Emit every word except the last one
                for word in words[:-1]:
                    if not any(filter_word in word for filter_word in filter_words):
                        print(word + " ", end="", flush=True)
                
                # Keep the last, possibly incomplete, word in the buffer
                buffer = words[-1]
    
    # Flush any remaining content
    if buffer and not any(filter_word in buffer for filter_word in filter_words):
        print(buffer, end="", flush=True)
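
A hypothetical call, with placeholder filter words:

python
# Example usage (placeholder filter words)
stream_with_filter("Introduce the history of the internet", filter_words=["foo", "bar"])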

3. Streaming Output to a File

python
def stream_to_file(prompt, filename):
    with open(filename, 'w', encoding='utf-8') as f:
        response = client.chat.completions.create(
            model="deepseek-chat",
            messages=[{"role": "user", "content": prompt}],
            stream=True,
            max_tokens=1000
        )
        
        for chunk in response:
            if chunk.choices[0].delta.content is not None:
                content = chunk.choices[0].delta.content
                f.write(content)
                f.flush()  # write to the file immediately
                print(content, end="", flush=True)

# Example usage
stream_to_file("Write a detailed introduction to machine learning", "ml_article.txt")

Web Application Integration

1. Flask Streaming Response

python
from flask import Flask, Response, request, render_template_string
import json

app = Flask(__name__)

# `client` is the instance created in the Basic Usage example
def generate_stream(prompt):
    response = client.chat.completions.create(
        model="deepseek-chat",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
        max_tokens=800
    )
    
    for chunk in response:
        if chunk.choices[0].delta.content is not None:
            yield f"data: {json.dumps({'content': chunk.choices[0].delta.content})}\n\n"
    
    yield f"data: {json.dumps({'done': True})}\n\n"

@app.route('/stream')
def stream():
    prompt = request.args.get('prompt', 'Hello')
    return Response(
        generate_stream(prompt),
        mimetype='text/event-stream',  # required by EventSource clients
        headers={'Cache-Control': 'no-cache'}
    )

@app.route('/')
def index():
    return render_template_string('''
    <!DOCTYPE html>
    <html>
    <head>
        <title>DeepSeek Streaming Demo</title>
    </head>
    <body>
        <h1>DeepSeek Streaming Demo</h1>
        <input type="text" id="prompt" placeholder="Enter your question" style="width: 400px;">
        <button onclick="startStream()">Start Generating</button>
        <div id="output" style="margin-top: 20px; padding: 10px; border: 1px solid #ccc;"></div>
        
        <script>
        function startStream() {
            const prompt = document.getElementById('prompt').value;
            const output = document.getElementById('output');
            output.innerHTML = '';
            
            const eventSource = new EventSource('/stream?prompt=' + encodeURIComponent(prompt));
            
            eventSource.onmessage = function(event) {
                const data = JSON.parse(event.data);
                if (data.content) {
                    output.innerHTML += data.content;
                } else if (data.done) {
                    eventSource.close();
                }
            };
        }
        </script>
    </body>
    </html>
    ''')

if __name__ == '__main__':
    app.run(debug=True)

2. FastAPI Streaming Response

python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import AsyncOpenAI
import json
import os

app = FastAPI()

# Use the async client so the event loop is not blocked while streaming
async_client = AsyncOpenAI(
    api_key=os.getenv("DEEPSEEK_API_KEY"),
    base_url="https://api.deepseek.com"
)

async def generate_stream(prompt: str):
    response = await async_client.chat.completions.create(
        model="deepseek-chat",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
        max_tokens=800
    )

    async for chunk in response:
        if chunk.choices[0].delta.content is not None:
            yield f"data: {json.dumps({'content': chunk.choices[0].delta.content})}\n\n"

    yield f"data: {json.dumps({'done': True})}\n\n"

@app.get("/stream/{prompt}")
async def stream_endpoint(prompt: str):
    return StreamingResponse(
        generate_stream(prompt),
        media_type="text/event-stream"
    )
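
To consume either endpoint from Python, here is a minimal client sketch using httpx; the URL and port are assumptions, so adjust them to wherever the app is actually served:

python
import httpx

# Stream the SSE response from the FastAPI endpoint above
# (assumes the app is served at http://localhost:8000)
with httpx.stream("GET", "http://localhost:8000/stream/hello") as r:
    for line in r.iter_lines():
        if line.startswith("data: "):
            print(line[len("data: "):])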

Practical Use Cases

1. Chatbot

python
class StreamingChatBot:
    def __init__(self):
        self.conversation_history = []
    
    def chat(self, user_input):
        self.conversation_history.append({"role": "user", "content": user_input})
        
        response = client.chat.completions.create(
            model="deepseek-chat",
            messages=self.conversation_history,
            stream=True,
            max_tokens=500
        )
        
        assistant_response = ""
        print("AI: ", end="", flush=True)
        
        for chunk in response:
            if chunk.choices[0].delta.content is not None:
                content = chunk.choices[0].delta.content
                assistant_response += content
                print(content, end="", flush=True)
        
        print()  # newline
        self.conversation_history.append({"role": "assistant", "content": assistant_response})
        return assistant_response

# Example usage
bot = StreamingChatBot()
while True:
    user_input = input("您: ")
    if user_input.lower() in ['退出', 'quit', 'exit']:
        break
    bot.chat(user_input)

2. Code Generator

python
def stream_code_generation(description, language="python"):
    prompt = f"用{language}语言实现以下功能:{description}\n\n请提供完整的代码实现:"
    
    response = client.chat.completions.create(
        model="deepseek-coder",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
        max_tokens=1000
    )
    
    print(f"正在生成{language}代码...")
    print("=" * 50)
    
    for chunk in response:
        if chunk.choices[0].delta.content is not None:
            print(chunk.choices[0].delta.content, end="", flush=True)
    
    print("\n" + "=" * 50)

# Example usage
stream_code_generation("a simple calculator class", "python")

3. Documentation Generator

python
def stream_documentation(code_snippet):
    prompt = f"""
    Generate detailed documentation for the following code:
    
    ```
    {code_snippet}
    ```
    
    Please include:
    1. A functional description
    2. Parameter descriptions
    3. Return value description
    4. A usage example
    """
    
    response = client.chat.completions.create(
        model="deepseek-chat",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
        max_tokens=800
    )
    
    print("正在生成文档...")
    print("-" * 40)
    
    for chunk in response:
        if chunk.choices[0].delta.content is not None:
            print(chunk.choices[0].delta.content, end="", flush=True)
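
For example, with a trivial snippet to document:

python
# Example usage (hypothetical input)
stream_documentation("def add(a, b):\n    return a + b")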

Performance Optimization

1. Connection Pool Management

python
from openai import OpenAI
import os
import threading

class StreamingManager:
    def __init__(self, max_connections=5):
        self.client = OpenAI(
            api_key=os.getenv("DEEPSEEK_API_KEY"),
            base_url="https://api.deepseek.com"
        )
        self.semaphore = threading.Semaphore(max_connections)
    
    def stream_completion(self, prompt):
        with self.semaphore:
            response = self.client.chat.completions.create(
                model="deepseek-chat",
                messages=[{"role": "user", "content": prompt}],
                stream=True,
                max_tokens=600
            )
            
            for chunk in response:
                if chunk.choices[0].delta.content is not None:
                    yield chunk.choices[0].delta.content

# Example usage
manager = StreamingManager()
for content in manager.stream_completion("Explain deep learning"):
    print(content, end="", flush=True)

2. Buffering Optimization

python
def buffered_stream(prompt, buffer_size=10):
    response = client.chat.completions.create(
        model="deepseek-chat",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
        max_tokens=800
    )
    
    buffer = ""
    for chunk in response:
        if chunk.choices[0].delta.content is not None:
            buffer += chunk.choices[0].delta.content
            
            if len(buffer) >= buffer_size:
                print(buffer, end="", flush=True)
                buffer = ""
    
    # Flush any remaining content
    if buffer:
        print(buffer, end="", flush=True)
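
A hypothetical call, flushing roughly every 20 characters:

python
# Example usage
buffered_stream("Explain the TCP three-way handshake", buffer_size=20)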

Error Handling and Retries

1. Automatic Retry Mechanism

python
import time
import random

def stream_with_retry(prompt, max_retries=3):
    for attempt in range(max_retries):
        try:
            response = client.chat.completions.create(
                model="deepseek-chat",
                messages=[{"role": "user", "content": prompt}],
                stream=True,
                max_tokens=600
            )
            
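            # Note: a retry restarts generation from scratch, so partial
            # output printed before a failure is not rolled back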
            for chunk in response:
                if chunk.choices[0].delta.content is not None:
                    print(chunk.choices[0].delta.content, end="", flush=True)
            
            return  # completed successfully
            
        except Exception as e:
            print(f"\n尝试 {attempt + 1} 失败: {e}")
            if attempt < max_retries - 1:
                wait_time = (2 ** attempt) + random.uniform(0, 1)
                print(f"等待 {wait_time:.2f} 秒后重试...")
                time.sleep(wait_time)
            else:
                print("所有重试都失败了")
                raise

2. Connection Timeout Handling

python
import signal

# Note: SIGALRM is available only on Unix, and only in the main thread.
# Named StreamTimeoutError to avoid shadowing Python's built-in TimeoutError.
class StreamTimeoutError(Exception):
    pass

def timeout_handler(signum, frame):
    raise StreamTimeoutError("Streaming response timed out")

def stream_with_timeout(prompt, timeout_seconds=30):
    signal.signal(signal.SIGALRM, timeout_handler)
    signal.alarm(timeout_seconds)
    
    try:
        response = client.chat.completions.create(
            model="deepseek-chat",
            messages=[{"role": "user", "content": prompt}],
            stream=True,
            max_tokens=600
        )
        
        for chunk in response:
            if chunk.choices[0].delta.content is not None:
                print(chunk.choices[0].delta.content, end="", flush=True)
                signal.alarm(timeout_seconds)  # reset the timeout
        
    except StreamTimeoutError:
        print("\nStreaming response timed out")
    finally:
        signal.alarm(0)  # cancel the alarm
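
Note that signal.SIGALRM is Unix-only and works only in the main thread. A portable alternative is the per-request timeout that the openai v1 SDK accepts, which bounds the underlying HTTP reads rather than total generation time. A minimal sketch:

python
import openai

def stream_with_request_timeout(prompt, timeout_seconds=30.0):
    try:
        response = client.chat.completions.create(
            model="deepseek-chat",
            messages=[{"role": "user", "content": prompt}],
            stream=True,
            max_tokens=600,
            timeout=timeout_seconds,  # per-request timeout, enforced by the SDK/httpx
        )
        for chunk in response:
            if chunk.choices[0].delta.content is not None:
                print(chunk.choices[0].delta.content, end="", flush=True)
    except openai.APITimeoutError:
        print("\nStreaming request timed out")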

Best Practices

1. User Experience

  • Show a loading indicator
  • Offer a way to stop generation (see the sketch after this list)
  • Cache generated content
  • Tune the display refresh rate
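
For the stop option, one approach is a shared flag checked inside the read loop. A minimal sketch, assuming the openai v1 SDK's Stream.close() is available (otherwise, simply breaking out of the loop abandons the stream):

python
import threading

stop_event = threading.Event()  # set from elsewhere, e.g. a "Stop" button handler

def stream_until_stopped(prompt):
    response = client.chat.completions.create(
        model="deepseek-chat",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
        max_tokens=600
    )
    for chunk in response:
        if stop_event.is_set():
            response.close()  # close the underlying HTTP connection
            break
        if chunk.choices[0].delta.content is not None:
            print(chunk.choices[0].delta.content, end="", flush=True)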

2. Resource Management

  • Limit the number of concurrent connections
  • Use a connection pool
  • Monitor memory usage
  • Release resources promptly

3. Error Handling

  • Implement a retry mechanism
  • Provide a fallback path
  • Log errors
  • Show user-friendly error messages

FAQ

Q: Does streaming affect response quality?

A: No. Streaming only changes how the content is delivered; it does not affect generation quality.

Q: How do I stop a stream that is in progress?

A: You can stop it by interrupting the connection or via signal handling; see the stop sketch under Best Practices above.

Q: Do all models support streaming output?

A: Yes, all DeepSeek chat models support streaming output.

Q: How does streaming affect latency?

A: Streaming significantly reduces time to first byte, giving a much more responsive real-time experience.
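
A quick way to see the difference is to measure time to first token, as in this minimal sketch:

python
import time

def measure_ttft(prompt):
    start = time.perf_counter()
    response = client.chat.completions.create(
        model="deepseek-chat",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
        max_tokens=100
    )
    for chunk in response:
        if chunk.choices[0].delta.content:
            print(f"Time to first token: {time.perf_counter() - start:.2f}s")
            break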



Last updated: January 27, 2025
