性能优化指南
本指南将帮助您优化 DeepSeek API 的性能,提高响应速度,降低延迟,并提升整体用户体验。
性能基准
响应时间基准
| 模型 | 平均延迟 | P95 延迟 | P99 延迟 |
| --- | --- | --- | --- |
| deepseek-chat | 800ms | 1.2s | 2.0s |
| deepseek-coder | 600ms | 1.0s | 1.8s |
| deepseek-math | 1.0s | 1.5s | 2.5s |
吞吐量基准
| 模型 | QPS | 并发连接 | Token/秒 |
| --- | --- | --- | --- |
| deepseek-chat | 1000 | 500 | 50,000 |
| deepseek-coder | 800 | 400 | 40,000 |
| deepseek-math | 600 | 300 | 30,000 |
请求优化
1. 参数优化
python
# 优化的请求参数
# Request parameters tuned for latency and cost.
# NOTE(review): `messages` must be defined by the caller before this snippet runs.
optimized_request = {
    "model": "deepseek-chat",
    "messages": messages,
    "max_tokens": 1000,   # cap generation length to what you actually need
    "temperature": 0.7,   # adjust creativity for the task
    "top_p": 0.9,         # nucleus sampling — tune this OR temperature, not both at once
    "stream": True,       # stream tokens for better perceived latency
    "stop": ["\n\n"]      # fix: was "\\n\\n" (a literal backslash-n), which never matches model output
}
2. 消息优化
python
def optimize_messages(messages):
    """Merge runs of consecutive 'user' messages into one message.

    Only consecutive messages whose role is 'user' are joined (with a
    single space); all other messages pass through unchanged and in order.
    Returns a new list; the input is not modified.
    """
    merged = []
    run_role = None
    run_parts = []
    for message in messages:
        role, content = message['role'], message['content']
        if role == run_role and role == 'user':
            # Extend the current run of user messages.
            run_parts.append(content)
            continue
        # Role changed (or a non-user repeat): flush the previous run.
        if run_parts:
            merged.append({'role': run_role, 'content': ' '.join(run_parts)})
        run_role = role
        run_parts = [content]
    # Flush the final run, if any.
    if run_parts:
        merged.append({'role': run_role, 'content': ' '.join(run_parts)})
    return merged
3. Token 优化
python
import tiktoken
def optimize_token_usage(text, max_tokens=4000):
    """Trim *text* so it encodes to at most max_tokens cl100k_base tokens.

    Text already within the budget is returned untouched; otherwise the
    first max_tokens tokens are decoded back (this may cut mid-word).
    """
    encoder = tiktoken.get_encoding("cl100k_base")
    token_ids = encoder.encode(text)
    if len(token_ids) <= max_tokens:
        return text
    return encoder.decode(token_ids[:max_tokens])
# Example: cap a long prompt at 2000 tokens before sending.
# NOTE(review): `long_text` must be defined by the caller.
optimized_content = optimize_token_usage(long_text, 2000)
连接优化
1. 连接池
python
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
class DeepSeekClient:
    """HTTP client for the DeepSeek chat-completions API.

    Uses a pooled requests.Session with automatic retry (with backoff) on
    transient 5xx responses, and sends the Authorization header on every call.
    """

    def __init__(self, api_key):
        self.api_key = api_key
        self.session = requests.Session()
        # Connection pooling: reuse TCP connections across requests;
        # retry transient server errors with exponential backoff.
        adapter = HTTPAdapter(
            pool_connections=20,
            pool_maxsize=20,
            max_retries=Retry(
                total=3,
                backoff_factor=0.3,
                status_forcelist=[500, 502, 503, 504]
            )
        )
        self.session.mount('https://', adapter)
        self.session.headers.update({
            'Authorization': f'Bearer {api_key}',
            'Content-Type': 'application/json'
        })

    def chat_completion(self, messages, **kwargs):
        """Send a chat-completion request and return the decoded JSON body.

        fix: accepts **kwargs (temperature, max_tokens, ...) and forwards
        them in the payload — CachedDeepSeekClient.chat_completion_cached
        already calls this method with keyword arguments, which previously
        raised TypeError.
        """
        payload = {'model': 'deepseek-chat', 'messages': messages}
        payload.update(kwargs)
        response = self.session.post(
            'https://api.deepseek.com/v1/chat/completions',
            json=payload,
            timeout=30
        )
        return response.json()
2. HTTP/2 支持
python
import httpx
async def async_chat_completion(messages):
    """POST a chat completion over an HTTP/2-capable client and return the JSON body."""
    async with httpx.AsyncClient(http2=True) as client:
        resp = await client.post(
            'https://api.deepseek.com/v1/chat/completions',
            headers={'Authorization': 'Bearer YOUR_API_KEY'},
            json={'model': 'deepseek-chat', 'messages': messages},
            timeout=30.0,
        )
        return resp.json()
缓存策略
1. 响应缓存
python
import hashlib
import json
from functools import lru_cache
class CachedDeepSeekClient:
    """DeepSeekClient wrapper with an in-memory response cache.

    NOTE(review): the cache dict is unbounded — a long-running process
    may want an eviction policy or TTL.
    """

    def __init__(self, api_key):
        self.client = DeepSeekClient(api_key)
        self.cache = {}

    def _generate_cache_key(self, messages, **kwargs):
        """Deterministic key: MD5 hex digest of the canonical (sorted-key) JSON
        of the messages plus all extra request parameters."""
        canonical = json.dumps(
            {'messages': messages, 'params': kwargs},
            sort_keys=True,
        )
        return hashlib.md5(canonical.encode()).hexdigest()

    def chat_completion_cached(self, messages, **kwargs):
        """Return the cached response for identical inputs; otherwise call
        through to the API and remember the result."""
        key = self._generate_cache_key(messages, **kwargs)
        if key in self.cache:
            return self.cache[key]
        fresh = self.client.chat_completion(messages, **kwargs)
        self.cache[key] = fresh
        return fresh
2. Redis 缓存
python
import redis
import json
class RedisCache:
    """JSON response cache backed by Redis, with a per-entry TTL."""

    def __init__(self, host='localhost', port=6379, db=0):
        self.redis_client = redis.Redis(host=host, port=port, db=db)

    def get_cached_response(self, cache_key):
        """Return the deserialized cached value, or None on a miss."""
        raw = self.redis_client.get(cache_key)
        return json.loads(raw) if raw else None

    def cache_response(self, cache_key, response, ttl=3600):
        """Store *response* as JSON under cache_key, expiring after ttl seconds."""
        self.redis_client.setex(cache_key, ttl, json.dumps(response))
流式处理优化
1. 流式响应处理
python
import json
def stream_chat_completion(messages):
    """Yield parsed SSE chunks from a streaming chat-completion request.

    Each yielded value is the JSON object decoded from one "data: ..."
    server-sent-events line; the terminating "data: [DONE]" marker is
    consumed silently.
    """
    response = requests.post(
        'https://api.deepseek.com/v1/chat/completions',
        headers={'Authorization': 'Bearer YOUR_API_KEY'},
        json={'model': 'deepseek-chat', 'messages': messages, 'stream': True},
        stream=True,
    )
    for raw in response.iter_lines():
        if not raw:
            continue
        decoded = raw.decode('utf-8')
        # SSE frames look like "data: {...}".
        if decoded.startswith('data: '):
            payload = decoded[6:]
            if payload != '[DONE]':
                yield json.loads(payload)
# Consume the stream and print each content delta as it arrives.
for chunk in stream_chat_completion(messages):
    if 'choices' in chunk:
        delta = chunk['choices'][0]['delta']
        if 'content' in delta:
            print(delta['content'], end='', flush=True)
2. 异步流式处理
python
import asyncio
import aiohttp
import json
async def async_stream_chat(messages):
    """Asynchronously yield parsed SSE chunks from a streaming chat request.

    Mirrors stream_chat_completion but over aiohttp: each "data: ..." line
    is decoded to a JSON object and yielded; "data: [DONE]" is swallowed.
    """
    async with aiohttp.ClientSession() as session:
        async with session.post(
            'https://api.deepseek.com/v1/chat/completions',
            headers={'Authorization': 'Bearer YOUR_API_KEY'},
            json={'model': 'deepseek-chat', 'messages': messages, 'stream': True},
        ) as response:
            async for raw in response.content:
                if not raw:
                    continue
                decoded = raw.decode('utf-8').strip()
                if decoded.startswith('data: '):
                    payload = decoded[6:]
                    if payload != '[DONE]':
                        yield json.loads(payload)
并发优化
1. 异步并发
python
import asyncio
import aiohttp
async def process_multiple_requests(request_list):
    """Fan out all request payloads concurrently over one shared session.

    Returns results in input order; because return_exceptions=True,
    failures come back as exception objects instead of raising.
    """
    async with aiohttp.ClientSession() as session:
        tasks = [
            asyncio.create_task(make_async_request(session, payload))
            for payload in request_list
        ]
        return await asyncio.gather(*tasks, return_exceptions=True)
async def make_async_request(session, request_data):
    """POST one request payload and return the decoded JSON response."""
    timeout = aiohttp.ClientTimeout(total=30)
    async with session.post(
        'https://api.deepseek.com/v1/chat/completions',
        headers={'Authorization': 'Bearer YOUR_API_KEY'},
        json=request_data,
        timeout=timeout,
    ) as response:
        return await response.json()
2. 线程池并发
python
from concurrent.futures import ThreadPoolExecutor
import threading
class ThreadSafeClient:
    """Thread-pooled DeepSeek client; each worker thread owns its own Session.

    requests.Session is not guaranteed thread-safe, so sessions live in
    thread-local storage and are created lazily per thread.
    """

    def __init__(self, api_key, max_workers=10):
        self.api_key = api_key
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        # Thread-local storage so sessions are never shared across threads.
        self.local = threading.local()

    def get_session(self):
        """Return the calling thread's session, creating and authorizing it on first use."""
        if not hasattr(self.local, 'session'):
            self.local.session = requests.Session()
            self.local.session.headers.update({
                'Authorization': f'Bearer {self.api_key}'
            })
        return self.local.session

    def chat_completion(self, messages):
        """Synchronous chat completion using the calling thread's session."""
        session = self.get_session()
        response = session.post(
            'https://api.deepseek.com/v1/chat/completions',
            json={'model': 'deepseek-chat', 'messages': messages},
            # fix: the request previously had no timeout, so a stalled server
            # could block a pool worker forever; 30s matches the other clients.
            timeout=30
        )
        return response.json()

    def process_batch(self, message_list):
        """Run a chat completion for every message list concurrently.

        Results keep input order; any failure is converted into an
        {'error': str(e)} entry rather than aborting the whole batch.
        """
        futures = [
            self.executor.submit(self.chat_completion, messages)
            for messages in message_list
        ]
        results = []
        for future in futures:
            try:
                results.append(future.result(timeout=30))
            except Exception as e:
                results.append({'error': str(e)})
        return results
地域优化
1. 端点选择
python
# Regional API base URLs, keyed by region identifier.
REGIONAL_ENDPOINTS = {
    'us-east': 'https://us-east.api.deepseek.com',
    'us-west': 'https://us-west.api.deepseek.com',
    'eu-west': 'https://eu-west.api.deepseek.com',
    'ap-southeast': 'https://ap-southeast.api.deepseek.com',
    'ap-northeast': 'https://ap-northeast.api.deepseek.com'
}


def get_optimal_endpoint(user_location):
    """Map a two-letter country code to the nearest regional endpoint URL.

    Unknown or unmapped locations fall back to the 'us-east' endpoint.
    """
    country_to_region = {
        'US': 'us-east',
        'CA': 'us-west',
        'GB': 'eu-west',
        'DE': 'eu-west',
        'SG': 'ap-southeast',
        'JP': 'ap-northeast',
        'CN': 'ap-northeast',
    }
    region = country_to_region.get(user_location, 'us-east')
    return REGIONAL_ENDPOINTS[region]
2. 延迟测试
python
import time
import statistics
def test_endpoint_latency(endpoint, iterations=5):
    """Probe *endpoint*'s /v1/models route and summarize round-trip latency.

    Returns a dict with avg/min/max latency in milliseconds over the
    successful (HTTP 200) probes, or None when no probe succeeded.
    """
    latencies = []
    for _ in range(iterations):
        # perf_counter is monotonic, so a system-clock adjustment mid-probe
        # cannot produce a negative or skewed measurement (time.time could).
        start = time.perf_counter()
        try:
            response = requests.get(f"{endpoint}/v1/models", timeout=5)
        except Exception:
            # fix: was a bare `except:`, which also swallowed
            # KeyboardInterrupt/SystemExit; failed probes are simply skipped.
            continue
        if response.status_code == 200:
            latencies.append((time.perf_counter() - start) * 1000)
    if not latencies:
        return None
    return {
        'avg_latency': statistics.mean(latencies),
        'min_latency': min(latencies),
        'max_latency': max(latencies)
    }
# Probe every regional endpoint and print its average latency.
for region, endpoint in REGIONAL_ENDPOINTS.items():
    latency_info = test_endpoint_latency(endpoint)
    if latency_info:
        print(f"{region}: {latency_info['avg_latency']:.2f}ms")
监控和调试
1. 性能监控
python
import time
from functools import wraps
def monitor_performance(func):
    """Decorator that records the duration and success of every call.

    The metric dict (function name, duration in seconds, success flag,
    wall-clock start timestamp) is handed to log_performance_metric,
    which must be defined elsewhere.

    NOTE(review): exceptions are deliberately swallowed and converted to
    an {'error': str(e)} return value so metric logging always runs —
    callers never see the original exception.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        wall_start = time.time()        # wall-clock timestamp for the metric
        start = time.perf_counter()     # fix: monotonic clock for duration (time.time can jump)
        try:
            result = func(*args, **kwargs)
            success = True
        except Exception as e:
            result = {'error': str(e)}
            success = False
        duration = time.perf_counter() - start
        log_performance_metric({
            'function': func.__name__,
            'duration': duration,
            'success': success,
            'timestamp': wall_start
        })
        return result
    return wrapper
# Example: wrap the client call so every invocation is timed and logged.
@monitor_performance
def monitored_chat_completion(messages):
    # NOTE(review): `client` must be a configured DeepSeekClient defined elsewhere.
    return client.chat_completion(messages)
2. 性能分析
python
class PerformanceAnalyzer:
    """Accumulates per-request metrics and computes summary statistics."""

    def __init__(self):
        self.metrics = []

    def add_metric(self, metric):
        """Record one metric dict (expects 'duration' and 'success' keys)."""
        self.metrics.append(metric)

    def analyze_performance(self):
        """Summarize the recorded metrics.

        Returns {} when nothing has been recorded. Durations are taken
        from successful requests only; P95/P99 report 0 unless there are
        enough samples (more than 20 / more than 100 respectively).
        """
        if not self.metrics:
            return {}
        successes = [m for m in self.metrics if m['success']]
        durations = [m['duration'] for m in successes]
        return {
            'total_requests': len(self.metrics),
            'success_rate': len(successes) / len(self.metrics),
            'avg_duration': statistics.mean(durations) if durations else 0,
            'p95_duration': statistics.quantiles(durations, n=20)[18] if len(durations) > 20 else 0,
            'p99_duration': statistics.quantiles(durations, n=100)[98] if len(durations) > 100 else 0,
        }
最佳实践总结
请求优化
- 使用合适的参数设置
- 优化消息结构和 token 使用
- 启用流式输出提升用户体验
连接优化
- 使用连接池复用连接
- 启用 HTTP/2 支持
- 实现合理的重试机制
缓存策略
- 缓存重复请求的响应
- 使用 Redis 等外部缓存
- 设置合理的缓存过期时间
并发处理
- 使用异步编程提高并发性
- 合理设置并发数量
- 实现负载均衡
监控调试
- 监控关键性能指标
- 分析性能瓶颈
- 持续优化和改进