流式输出指南
流式输出是 DeepSeek API 的重要功能,允许您实时接收模型生成的内容,而不需要等待完整响应。这对于提升用户体验和构建实时应用程序非常有用。
概述
流式输出的优势:
- 实时响应: 用户可以立即看到生成的内容
- 更好的用户体验: 减少等待时间,提供流畅的交互
- 长文本处理: 适合生成长篇内容的场景
- 资源优化: 可以提前处理部分结果
基本用法
Python 示例
python
import os
from openai import OpenAI

client = OpenAI(
    api_key=os.getenv("DEEPSEEK_API_KEY"),
    base_url="https://api.deepseek.com"
)

# Enable streaming output
response = client.chat.completions.create(
    model="deepseek-chat",
    messages=[
        {"role": "user", "content": "写一篇关于人工智能的短文"}
    ],
    stream=True,  # enable streaming
    max_tokens=500
)

# Process the streamed response chunk by chunk
for chunk in response:
    # delta.content is None for role-only / terminal chunks, so guard against it
    if chunk.choices[0].delta.content is not None:
        print(chunk.choices[0].delta.content, end="", flush=True)
Node.js 示例
javascript
import OpenAI from 'openai';

const client = new OpenAI({
  apiKey: process.env.DEEPSEEK_API_KEY,
  baseURL: 'https://api.deepseek.com'
});

// Stream a chat completion and print each chunk as it arrives.
async function streamCompletion() {
  const stream = await client.chat.completions.create({
    model: 'deepseek-chat',
    messages: [
      { role: 'user', content: '写一篇关于人工智能的短文' }
    ],
    stream: true,
    max_tokens: 500
  });

  for await (const chunk of stream) {
    // Optional chaining guards against chunks without content (e.g. role deltas)
    if (chunk.choices[0]?.delta?.content) {
      process.stdout.write(chunk.choices[0].delta.content);
    }
  }
}

streamCompletion();
cURL 示例
bash
# Request a streaming chat completion ("stream": true) from the DeepSeek API.
# --no-buffer makes curl print each SSE chunk as it arrives instead of buffering.
curl -X POST "https://api.deepseek.com/chat/completions" \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $DEEPSEEK_API_KEY" \
-d '{
"model": "deepseek-chat",
"messages": [
{"role": "user", "content": "写一篇关于人工智能的短文"}
],
"stream": true,
"max_tokens": 500
}' \
--no-buffer
高级用法
1. 带错误处理的流式处理
python
import json
from openai import OpenAI
def stream_with_error_handling(prompt):
    """Stream a chat completion, echoing chunks to stdout as they arrive.

    Returns the accumulated full text on success, or None on any error.
    """
    try:
        response = client.chat.completions.create(
            model="deepseek-chat",
            messages=[{"role": "user", "content": prompt}],
            stream=True,
            max_tokens=800
        )
        full_content = ""
        for chunk in response:
            if chunk.choices[0].delta.content is not None:
                content = chunk.choices[0].delta.content
                full_content += content
                print(content, end="", flush=True)
        return full_content
    except Exception as e:
        # Broad catch is deliberate in this example: any API/network failure
        # is reported to the user and signalled by returning None.
        print(f"\n错误: {e}")
        return None

# Usage example
result = stream_with_error_handling("解释量子计算的基本原理")
2. 实时内容过滤
python
import re
def stream_with_filter(prompt, filter_words=None):
    """Stream a completion while suppressing words containing filtered terms.

    NOTE(review): splitting on whitespace only segments space-delimited text;
    Chinese output is typically unsegmented, so this filter is illustrative.
    """
    if filter_words is None:
        # None default avoids the mutable-default-argument pitfall
        filter_words = ["敏感词1", "敏感词2"]
    response = client.chat.completions.create(
        model="deepseek-chat",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
        max_tokens=600
    )
    buffer = ""
    for chunk in response:
        if chunk.choices[0].delta.content is not None:
            buffer += chunk.choices[0].delta.content
            # Check whether the buffer now holds at least one complete word
            words = buffer.split()
            if len(words) > 1:
                # Emit every word except the last (possibly incomplete) one
                for word in words[:-1]:
                    if not any(filter_word in word for filter_word in filter_words):
                        print(word + " ", end="", flush=True)
                # Keep the trailing, possibly incomplete word buffered
                buffer = words[-1]
    # Flush whatever remains once the stream ends
    if buffer and not any(filter_word in buffer for filter_word in filter_words):
        print(buffer, end="", flush=True)
3. 流式输出到文件
python
def stream_to_file(prompt, filename):
    """Stream a completion simultaneously to a UTF-8 file and to stdout."""
    with open(filename, 'w', encoding='utf-8') as f:
        response = client.chat.completions.create(
            model="deepseek-chat",
            messages=[{"role": "user", "content": prompt}],
            stream=True,
            max_tokens=1000
        )
        for chunk in response:
            if chunk.choices[0].delta.content is not None:
                content = chunk.choices[0].delta.content
                f.write(content)
                f.flush()  # push to disk immediately so progress is visible
                print(content, end="", flush=True)

# Usage example
stream_to_file("写一篇关于机器学习的详细介绍", "ml_article.txt")
Web 应用集成
1. Flask 流式响应
python
from flask import Flask, Response, request, render_template_string
import json
app = Flask(__name__)

def generate_stream(prompt):
    """Generator yielding SSE-formatted 'data: ...' lines for each chunk."""
    response = client.chat.completions.create(
        model="deepseek-chat",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
        max_tokens=800
    )
    for chunk in response:
        if chunk.choices[0].delta.content is not None:
            yield f"data: {json.dumps({'content': chunk.choices[0].delta.content})}\n\n"
    # Sentinel event so the client knows the stream is complete
    yield f"data: {json.dumps({'done': True})}\n\n"
@app.route('/stream')
def stream():
    """SSE endpoint consumed by the browser's EventSource on the demo page."""
    prompt = request.args.get('prompt', '你好')
    return Response(
        generate_stream(prompt),
        # EventSource only accepts the text/event-stream content type;
        # with text/plain the browser rejects the stream.
        mimetype='text/event-stream',
        headers={'Cache-Control': 'no-cache'}
    )
@app.route('/')
def index():
    """Serve a minimal demo page that consumes the /stream SSE endpoint."""
    return render_template_string('''
<!DOCTYPE html>
<html>
<head>
    <title>DeepSeek 流式输出演示</title>
</head>
<body>
    <h1>DeepSeek 流式输出演示</h1>
    <input type="text" id="prompt" placeholder="输入您的问题" style="width: 400px;">
    <button onclick="startStream()">开始生成</button>
    <div id="output" style="margin-top: 20px; padding: 10px; border: 1px solid #ccc;"></div>
    <script>
        function startStream() {
            const prompt = document.getElementById('prompt').value;
            const output = document.getElementById('output');
            output.innerHTML = '';
            const eventSource = new EventSource('/stream?prompt=' + encodeURIComponent(prompt));
            eventSource.onmessage = function(event) {
                const data = JSON.parse(event.data);
                if (data.content) {
                    output.innerHTML += data.content;
                } else if (data.done) {
                    eventSource.close();
                }
            };
        }
    </script>
</body>
</html>
''')

if __name__ == '__main__':
    app.run(debug=True)
2. FastAPI 流式响应
python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import json
app = FastAPI()

async def generate_stream(prompt: str):
    """Async generator yielding SSE-formatted 'data: ...' lines per chunk."""
    # NOTE(review): this uses the synchronous OpenAI client, so iteration
    # blocks the event loop between chunks; the async SDK client would avoid that.
    response = client.chat.completions.create(
        model="deepseek-chat",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
        max_tokens=800
    )
    for chunk in response:
        if chunk.choices[0].delta.content is not None:
            yield f"data: {json.dumps({'content': chunk.choices[0].delta.content})}\n\n"
    # Sentinel event so clients know the stream finished
    yield f"data: {json.dumps({'done': True})}\n\n"
@app.get("/stream/{prompt}")
async def stream_endpoint(prompt: str):
return StreamingResponse(
generate_stream(prompt),
media_type="text/plain"
)
实际应用场景
1. 聊天机器人
python
class StreamingChatBot:
    """Multi-turn chat bot that streams assistant replies to stdout."""

    def __init__(self):
        # Full message history, resent with every request to keep context
        self.conversation_history = []

    def chat(self, user_input):
        """Send user_input, stream the reply, and return the full reply text."""
        self.conversation_history.append({"role": "user", "content": user_input})
        response = client.chat.completions.create(
            model="deepseek-chat",
            messages=self.conversation_history,
            stream=True,
            max_tokens=500
        )
        assistant_response = ""
        print("AI: ", end="", flush=True)
        for chunk in response:
            if chunk.choices[0].delta.content is not None:
                content = chunk.choices[0].delta.content
                assistant_response += content
                print(content, end="", flush=True)
        print()  # newline after the streamed reply
        # Record the reply so later turns retain the conversation context
        self.conversation_history.append({"role": "assistant", "content": assistant_response})
        return assistant_response
# Usage example: interactive loop until the user asks to quit
bot = StreamingChatBot()
while True:
    user_input = input("您: ")
    if user_input.lower() in ['退出', 'quit', 'exit']:
        break
    bot.chat(user_input)
2. 代码生成器
python
def stream_code_generation(description, language="python"):
    """Stream generated code for `description` in the given language to stdout."""
    prompt = f"用{language}语言实现以下功能:{description}\n\n请提供完整的代码实现:"
    response = client.chat.completions.create(
        model="deepseek-coder",  # code-specialised model
        messages=[{"role": "user", "content": prompt}],
        stream=True,
        max_tokens=1000
    )
    print(f"正在生成{language}代码...")
    print("=" * 50)
    for chunk in response:
        if chunk.choices[0].delta.content is not None:
            print(chunk.choices[0].delta.content, end="", flush=True)
    print("\n" + "=" * 50)

# Usage example
stream_code_generation("一个简单的计算器类", "python")
3. 文档生成器
python
def stream_documentation(code_snippet):
    """Stream generated documentation for the given code snippet to stdout."""
    # The prompt lines stay unindented: triple-quoted strings preserve
    # leading whitespace verbatim, and the model should see clean text.
    prompt = f"""
为以下代码生成详细的文档说明:
```
{code_snippet}
```
请包含:
1. 功能描述
2. 参数说明
3. 返回值说明
4. 使用示例
"""
    response = client.chat.completions.create(
        model="deepseek-chat",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
        max_tokens=800
    )
    print("正在生成文档...")
    print("-" * 40)
    for chunk in response:
        if chunk.choices[0].delta.content is not None:
            print(chunk.choices[0].delta.content, end="", flush=True)
性能优化
1. 连接池管理
python
from openai import OpenAI
import threading
class StreamingManager:
    """Caps the number of concurrent streaming requests with a semaphore."""

    def __init__(self, max_connections=5):
        # NOTE(review): this snippet also needs `import os` (shown in the
        # basic example above) for os.getenv to resolve.
        self.client = OpenAI(
            api_key=os.getenv("DEEPSEEK_API_KEY"),
            base_url="https://api.deepseek.com"
        )
        # Bounds concurrent streams; a slot is held for the whole stream
        self.semaphore = threading.Semaphore(max_connections)

    def stream_completion(self, prompt):
        """Yield content chunks; holds a semaphore slot until exhausted."""
        with self.semaphore:
            response = self.client.chat.completions.create(
                model="deepseek-chat",
                messages=[{"role": "user", "content": prompt}],
                stream=True,
                max_tokens=600
            )
            for chunk in response:
                if chunk.choices[0].delta.content is not None:
                    yield chunk.choices[0].delta.content
# Usage example: consume the generator and echo chunks to stdout
manager = StreamingManager()
for content in manager.stream_completion("解释深度学习"):
    print(content, end="", flush=True)
2. 缓冲优化
python
def buffered_stream(prompt, buffer_size=10):
    """Stream a completion, printing in batches of at least buffer_size chars.

    Batching lowers terminal redraw frequency compared with printing
    every (often single-character) chunk.
    """
    response = client.chat.completions.create(
        model="deepseek-chat",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
        max_tokens=800
    )
    buffer = ""
    for chunk in response:
        if chunk.choices[0].delta.content is not None:
            buffer += chunk.choices[0].delta.content
            if len(buffer) >= buffer_size:
                print(buffer, end="", flush=True)
                buffer = ""
    # Flush whatever is left after the stream ends
    if buffer:
        print(buffer, end="", flush=True)
错误处理和重试
1. 自动重试机制
python
import time
import random
def stream_with_retry(prompt, max_retries=3):
    """Stream a completion, retrying with exponential backoff plus jitter.

    Re-raises the last exception if all attempts fail.
    """
    for attempt in range(max_retries):
        try:
            response = client.chat.completions.create(
                model="deepseek-chat",
                messages=[{"role": "user", "content": prompt}],
                stream=True,
                max_tokens=600
            )
            for chunk in response:
                if chunk.choices[0].delta.content is not None:
                    print(chunk.choices[0].delta.content, end="", flush=True)
            return  # completed successfully
        except Exception as e:
            print(f"\n尝试 {attempt + 1} 失败: {e}")
            if attempt < max_retries - 1:
                # Exponential backoff with random jitter to spread retries
                wait_time = (2 ** attempt) + random.uniform(0, 1)
                print(f"等待 {wait_time:.2f} 秒后重试...")
                time.sleep(wait_time)
            else:
                print("所有重试都失败了")
                raise
2. 连接超时处理
python
import signal
# NOTE(review): this shadows the builtin TimeoutError (Python 3.3+); the
# builtin could be raised directly instead of defining a custom class.
class TimeoutError(Exception):
    pass

def timeout_handler(signum, frame):
    # SIGALRM handler: abort the in-progress stream
    raise TimeoutError("流式响应超时")

def stream_with_timeout(prompt, timeout_seconds=30):
    """Stream a completion, aborting if no chunk arrives within timeout_seconds.

    Uses SIGALRM, so this works only on Unix and only in the main thread.
    """
    signal.signal(signal.SIGALRM, timeout_handler)
    signal.alarm(timeout_seconds)
    try:
        response = client.chat.completions.create(
            model="deepseek-chat",
            messages=[{"role": "user", "content": prompt}],
            stream=True,
            max_tokens=600
        )
        for chunk in response:
            if chunk.choices[0].delta.content is not None:
                print(chunk.choices[0].delta.content, end="", flush=True)
                signal.alarm(timeout_seconds)  # reset the watchdog on each chunk
    except TimeoutError:
        print("\n流式响应超时")
    finally:
        signal.alarm(0)  # cancel any pending alarm
最佳实践
1. 用户体验优化
- 显示加载指示器
- 提供停止生成的选项
- 实现内容缓存
- 优化显示刷新频率
2. 资源管理
- 限制并发连接数
- 实现连接池
- 监控内存使用
- 及时释放资源
3. 错误处理
- 实现重试机制
- 提供降级方案
- 记录错误日志
- 用户友好的错误提示
常见问题
Q: 流式输出是否会影响响应质量?
A: 不会。流式输出只是改变了内容的传输方式,不影响生成质量。
Q: 如何停止正在进行的流式生成?
A: 可以通过中断连接或使用信号处理来停止流式生成。
Q: 流式输出是否支持所有模型?
A: 是的,DeepSeek 的所有聊天模型都支持流式输出。
Q: 流式输出的延迟如何?
A: 流式输出可以显著减少首字节时间,提供更好的实时体验。
相关资源
最后更新: 2025年1月27日