批量处理指南
批量处理 API 允许您一次性处理多个请求,提高效率并降低成本。
概述
批量处理适用于以下场景:
- 大量文本的批量翻译
- 批量内容生成
- 数据集的批量分析
- 离线处理任务
批量聊天完成
基本用法
bash
# Send several chat requests in one synchronous batch call.
# Each entry in "requests" mirrors the payload of a single
# /chat/completions request (here: three independent user questions).
curl -X POST "https://api.deepseek.com/v1/batch/chat/completions" \
-H "Authorization: Bearer YOUR_API_KEY" \
-H "Content-Type: application/json" \
-d '{
"model": "deepseek-chat",
"requests": [
{
"messages": [{"role": "user", "content": "什么是人工智能?"}]
},
{
"messages": [{"role": "user", "content": "解释机器学习的概念"}]
},
{
"messages": [{"role": "user", "content": "深度学习的应用领域"}]
}
]
}'
响应格式
json
{
"id": "batch_123456789",
"object": "batch.completion",
"created": 1677652288,
"model": "deepseek-chat",
"responses": [
{
"index": 0,
"response": {
"choices": [
{
"message": {
"role": "assistant",
"content": "人工智能是计算机科学的一个分支..."
},
"finish_reason": "stop"
}
],
"usage": {
"prompt_tokens": 12,
"completion_tokens": 85,
"total_tokens": 97
}
}
}
],
"total_usage": {
"prompt_tokens": 36,
"completion_tokens": 255,
"total_tokens": 291
}
}
异步批量处理
提交批量任务
python
import requests
import json
def submit_batch_job(requests_data):
    """Submit an asynchronous batch job and return the parsed JSON reply.

    requests_data: list of per-request payloads, each carrying a "messages"
    list in the /chat/completions format. The server will notify the
    configured callback_url when the job finishes.
    """
    endpoint = "https://api.deepseek.com/v1/batch/jobs"
    auth_headers = {
        "Authorization": "Bearer YOUR_API_KEY",
        "Content-Type": "application/json",
    }
    body = {
        "model": "deepseek-chat",
        "requests": requests_data,
        "callback_url": "https://your-domain.com/batch-callback",
    }
    reply = requests.post(endpoint, headers=auth_headers, json=body)
    return reply.json()
# Submit the job: 100 text-analysis requests in a single batch.
batch_requests = [
{"messages": [{"role": "user", "content": f"分析文本 {i}"}]}
for i in range(100)
]
job = submit_batch_job(batch_requests)
print(f"批量任务 ID: {job['job_id']}")
查询任务状态
python
def check_batch_status(job_id):
    """Fetch the current status record for a previously submitted batch job."""
    endpoint = f"https://api.deepseek.com/v1/batch/jobs/{job_id}"
    reply = requests.get(endpoint, headers={"Authorization": "Bearer YOUR_API_KEY"})
    return reply.json()
# Query the job's current status and progress counters.
status = check_batch_status(job['job_id'])
print(f"任务状态: {status['status']}")
print(f"完成进度: {status['completed']}/{status['total']}")
文件批量处理
上传文件
python
def upload_batch_file(file_path):
    """Upload a JSONL request file for batch processing; returns file metadata.

    The file is sent as multipart form data with purpose set to 'batch'.
    """
    target = "https://api.deepseek.com/v1/files"
    auth = {"Authorization": "Bearer YOUR_API_KEY"}
    with open(file_path, 'rb') as handle:
        multipart = {'file': handle}
        form = {'purpose': 'batch'}
        result = requests.post(target, headers=auth, files=multipart, data=form)
    return result.json()
# Upload the JSONL request file and keep its server-assigned ID.
file_info = upload_batch_file("batch_requests.jsonl")
print(f"文件 ID: {file_info['id']}")
JSONL 文件格式
jsonl
{"custom_id": "request-1", "method": "POST", "url": "/v1/chat/completions", "body": {"model": "deepseek-chat", "messages": [{"role": "user", "content": "Hello"}]}}
{"custom_id": "request-2", "method": "POST", "url": "/v1/chat/completions", "body": {"model": "deepseek-chat", "messages": [{"role": "user", "content": "How are you?"}]}}
创建批量任务
python
def create_batch_from_file(input_file_id):
    """Create a batch job that runs every request in an uploaded JSONL file.

    input_file_id: ID returned by the file-upload endpoint. Results are due
    within the 24h completion window.
    """
    api_url = "https://api.deepseek.com/v1/batches"
    http_headers = {
        "Authorization": "Bearer YOUR_API_KEY",
        "Content-Type": "application/json",
    }
    body = {
        "input_file_id": input_file_id,
        "endpoint": "/v1/chat/completions",
        "completion_window": "24h",
    }
    reply = requests.post(api_url, headers=http_headers, json=body)
    return reply.json()
# Create the batch job from the uploaded file's ID.
batch = create_batch_from_file(file_info['id'])
print(f"批量任务 ID: {batch['id']}")
性能优化
批量大小优化
python
# 推荐的批量大小
OPTIMAL_BATCH_SIZES = {
"deepseek-chat": 50,
"deepseek-coder": 30,
"deepseek-math": 20
}
def split_into_batches(requests, model):
batch_size = OPTIMAL_BATCH_SIZES.get(model, 50)
return [requests[i:i + batch_size]
for i in range(0, len(requests), batch_size)]
并发处理
python
import asyncio
import aiohttp
async def process_batch_async(session, batch_data):
    """POST one batch payload through *session* and return the decoded JSON body."""
    endpoint = "https://api.deepseek.com/v1/batch/chat/completions"
    request_headers = {"Authorization": "Bearer YOUR_API_KEY"}
    async with session.post(endpoint, headers=request_headers, json=batch_data) as resp:
        return await resp.json()
async def process_multiple_batches(batches):
    """Run all batch POSTs concurrently over one shared HTTP session."""
    async with aiohttp.ClientSession() as http:
        pending = [process_batch_async(http, chunk) for chunk in batches]
        return await asyncio.gather(*pending)
# Process multiple batches concurrently.
# NOTE(review): all_requests is assumed to be defined earlier — confirm.
batches = split_into_batches(all_requests, "deepseek-chat")
results = asyncio.run(process_multiple_batches(batches))
错误处理
重试机制
python
import time
from typing import List, Dict
def process_batch_with_retry(batch_data: Dict, max_retries: int = 3) -> Dict:
    """POST *batch_data* to the batch endpoint, retrying transient failures.

    Makes up to *max_retries* attempts with exponential backoff (1s, 2s, 4s, ...)
    between attempts.

    Raises:
        ValueError: if max_retries < 1 (the old code silently returned None).
        requests.exceptions.RequestException: the last error, if every
            attempt fails.
    """
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")
    for attempt in range(max_retries):
        try:
            response = requests.post(
                "https://api.deepseek.com/v1/batch/chat/completions",
                headers={"Authorization": "Bearer YOUR_API_KEY"},
                json=batch_data,
                timeout=60
            )
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException:
            if attempt == max_retries - 1:
                raise  # bare raise preserves the original traceback
            # Exponential backoff before the next attempt.
            wait_time = 2 ** attempt
            print(f"请求失败,{wait_time}秒后重试...")
            time.sleep(wait_time)
部分失败处理
python
def handle_partial_failures(batch_response):
    """Split a batch response into successful entries and failure records.

    Returns a (successes, failures) pair; each failure is a dict of the form
    {'index': position_in_responses, 'error': error_payload}.
    """
    successes = []
    failures = []
    for position, entry in enumerate(batch_response.get('responses', [])):
        if 'error' not in entry:
            successes.append(entry)
        else:
            failures.append({'index': position, 'error': entry['error']})
    return successes, failures
# Split the batch result into successes and failures.
# NOTE(review): batch_result is assumed to be defined earlier — confirm.
success, failures = handle_partial_failures(batch_result)
print(f"成功: {len(success)}, 失败: {len(failures)}")
成本优化
Token 使用优化
python
def optimize_batch_requests(requests):
    """Optimize batch requests to reduce token usage.

    For each request that begins with a system message, later exact
    duplicates of that system message are removed while every other
    message (user AND assistant turns) is preserved in order. Requests
    without a leading system message pass through unchanged.

    Fixes vs. the previous version: it silently discarded all assistant
    messages after the system prompt (despite claiming only to deduplicate
    system messages) and dropped extra request fields such as "custom_id".
    """
    optimized = []
    for req in requests:
        messages = req['messages']
        if len(messages) > 1 and messages[0]['role'] == 'system':
            system_msg = messages[0]['content']
            # Keep every later message except exact repeats of the system prompt.
            rest = [
                msg for msg in messages[1:]
                if not (msg['role'] == 'system' and msg['content'] == system_msg)
            ]
            # Preserve any other request fields (e.g. custom_id) untouched.
            optimized.append({
                **req,
                'messages': [{'role': 'system', 'content': system_msg}] + rest
            })
        else:
            optimized.append(req)
    return optimized
批量定价
| 处理方式 | 价格优势 | 适用场景 |
|---|---|---|
| 实时批量 | 10% 折扣 | 中等规模处理 |
| 异步批量 | 20% 折扣 | 大规模离线处理 |
| 文件批量 | 30% 折扣 | 超大规模数据处理 |
监控和调试
批量任务监控
python
def monitor_batch_job(job_id):
    """Poll a batch job every 30 seconds until it reaches a terminal state.

    Prints status, progress, and success rate on each poll; returns once the
    job is completed, failed, or cancelled.
    """
    terminal_states = ['completed', 'failed', 'cancelled']
    while True:
        snapshot = check_batch_status(job_id)
        print(f"状态: {snapshot['status']}")
        print(f"进度: {snapshot['completed']}/{snapshot['total']}")
        print(f"成功率: {snapshot['success_rate']:.2%}")
        if snapshot['status'] in terminal_states:
            break
        # Wait 30 seconds between polls.
        time.sleep(30)
性能指标
python
def analyze_batch_performance(batch_result):
    """Derive throughput and cost metrics from a finished batch result.

    Expects batch_result to carry 'responses', 'total_usage', and
    'processing_time' fields.
    NOTE(review): relies on calculate_batch_savings, which is not defined in
    this document — confirm where it comes from.
    """
    n_requests = len(batch_result['responses'])
    n_tokens = batch_result['total_usage']['total_tokens']
    elapsed = batch_result['processing_time']
    return {
        'throughput': n_requests / elapsed,
        'tokens_per_second': n_tokens / elapsed,
        'average_tokens_per_request': n_tokens / n_requests,
        'cost_efficiency': calculate_batch_savings(batch_result),
    }
最佳实践
- 合理分批: 根据模型和内容复杂度选择合适的批量大小
- 异步处理: 对于大量数据使用异步批量处理
- 错误处理: 实现完善的重试和错误恢复机制
- 监控跟踪: 监控批量任务的执行状态和性能
- 成本控制: 利用批量折扣优化处理成本