Skip to content

批量处理指南

批量处理 API 允许您一次性处理多个请求,提高效率并降低成本。

概述

批量处理适用于以下场景:

  • 大量文本的批量翻译
  • 批量内容生成
  • 数据集的批量分析
  • 离线处理任务

批量聊天完成

基本用法

bash
curl -X POST "https://api.deepseek.com/v1/batch/chat/completions" \
  -H "Authorization: Bearer YOUR_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "model": "deepseek-chat",
    "requests": [
      {
        "messages": [{"role": "user", "content": "什么是人工智能?"}]
      },
      {
        "messages": [{"role": "user", "content": "解释机器学习的概念"}]
      },
      {
        "messages": [{"role": "user", "content": "深度学习的应用领域"}]
      }
    ]
  }'

响应格式

json
{
  "id": "batch_123456789",
  "object": "batch.completion",
  "created": 1677652288,
  "model": "deepseek-chat",
  "responses": [
    {
      "index": 0,
      "response": {
        "choices": [
          {
            "message": {
              "role": "assistant",
              "content": "人工智能是计算机科学的一个分支..."
            },
            "finish_reason": "stop"
          }
        ],
        "usage": {
          "prompt_tokens": 12,
          "completion_tokens": 85,
          "total_tokens": 97
        }
      }
    }
  ],
  "total_usage": {
    "prompt_tokens": 36,
    "completion_tokens": 255,
    "total_tokens": 291
  }
}

异步批量处理

提交批量任务

python
import requests
import json

def submit_batch_job(requests_data):
    url = "https://api.deepseek.com/v1/batch/jobs"
    headers = {
        "Authorization": "Bearer YOUR_API_KEY",
        "Content-Type": "application/json"
    }
    
    payload = {
        "model": "deepseek-chat",
        "requests": requests_data,
        "callback_url": "https://your-domain.com/batch-callback"
    }
    
    response = requests.post(url, headers=headers, json=payload)
    return response.json()

# 提交任务
batch_requests = [
    {"messages": [{"role": "user", "content": f"分析文本 {i}"}]}
    for i in range(100)
]

job = submit_batch_job(batch_requests)
print(f"批量任务 ID: {job['job_id']}")

查询任务状态

python
def check_batch_status(job_id):
    url = f"https://api.deepseek.com/v1/batch/jobs/{job_id}"
    headers = {"Authorization": "Bearer YOUR_API_KEY"}
    
    response = requests.get(url, headers=headers)
    return response.json()

# 查询状态
status = check_batch_status(job['job_id'])
print(f"任务状态: {status['status']}")
print(f"完成进度: {status['completed']}/{status['total']}")

文件批量处理

上传文件

python
def upload_batch_file(file_path):
    url = "https://api.deepseek.com/v1/files"
    headers = {"Authorization": "Bearer YOUR_API_KEY"}
    
    with open(file_path, 'rb') as f:
        files = {'file': f}
        data = {'purpose': 'batch'}
        response = requests.post(url, headers=headers, files=files, data=data)
    
    return response.json()

# 上传 JSONL 文件
file_info = upload_batch_file("batch_requests.jsonl")
print(f"文件 ID: {file_info['id']}")

JSONL 文件格式

jsonl
{"custom_id": "request-1", "method": "POST", "url": "/v1/chat/completions", "body": {"model": "deepseek-chat", "messages": [{"role": "user", "content": "Hello"}]}}
{"custom_id": "request-2", "method": "POST", "url": "/v1/chat/completions", "body": {"model": "deepseek-chat", "messages": [{"role": "user", "content": "How are you?"}]}}

创建批量任务

python
def create_batch_from_file(input_file_id):
    url = "https://api.deepseek.com/v1/batches"
    headers = {
        "Authorization": "Bearer YOUR_API_KEY",
        "Content-Type": "application/json"
    }
    
    payload = {
        "input_file_id": input_file_id,
        "endpoint": "/v1/chat/completions",
        "completion_window": "24h"
    }
    
    response = requests.post(url, headers=headers, json=payload)
    return response.json()

# 创建批量任务
batch = create_batch_from_file(file_info['id'])
print(f"批量任务 ID: {batch['id']}")

性能优化

批量大小优化

python
# 推荐的批量大小
OPTIMAL_BATCH_SIZES = {
    "deepseek-chat": 50,
    "deepseek-coder": 30,
    "deepseek-math": 20
}

def split_into_batches(requests, model):
    batch_size = OPTIMAL_BATCH_SIZES.get(model, 50)
    return [requests[i:i + batch_size] 
            for i in range(0, len(requests), batch_size)]

并发处理

python
import asyncio
import aiohttp

async def process_batch_async(session, batch_data):
    url = "https://api.deepseek.com/v1/batch/chat/completions"
    headers = {"Authorization": "Bearer YOUR_API_KEY"}
    
    async with session.post(url, headers=headers, json=batch_data) as response:
        return await response.json()

async def process_multiple_batches(batches):
    async with aiohttp.ClientSession() as session:
        tasks = [process_batch_async(session, batch) for batch in batches]
        results = await asyncio.gather(*tasks)
    return results

# 并发处理多个批次
batches = split_into_batches(all_requests, "deepseek-chat")
results = asyncio.run(process_multiple_batches(batches))

错误处理

重试机制

python
import time
from typing import List, Dict

def process_batch_with_retry(batch_data: Dict, max_retries: int = 3) -> Dict:
    for attempt in range(max_retries):
        try:
            response = requests.post(
                "https://api.deepseek.com/v1/batch/chat/completions",
                headers={"Authorization": "Bearer YOUR_API_KEY"},
                json=batch_data,
                timeout=60
            )
            response.raise_for_status()
            return response.json()
            
        except requests.exceptions.RequestException as e:
            if attempt == max_retries - 1:
                raise e
            
            # 指数退避
            wait_time = 2 ** attempt
            print(f"请求失败,{wait_time}秒后重试...")
            time.sleep(wait_time)

部分失败处理

python
def handle_partial_failures(batch_response):
    successful_responses = []
    failed_requests = []
    
    for i, response in enumerate(batch_response.get('responses', [])):
        if 'error' in response:
            failed_requests.append({
                'index': i,
                'error': response['error']
            })
        else:
            successful_responses.append(response)
    
    return successful_responses, failed_requests

# 处理部分失败
success, failures = handle_partial_failures(batch_result)
print(f"成功: {len(success)}, 失败: {len(failures)}")

成本优化

Token 使用优化

python
def optimize_batch_requests(requests):
    """优化批量请求以减少 token 使用"""
    optimized = []
    
    for req in requests:
        # 移除重复的系统消息
        messages = req['messages']
        if len(messages) > 1 and messages[0]['role'] == 'system':
            # 合并相同的系统消息
            system_msg = messages[0]['content']
            user_msgs = [msg for msg in messages[1:] if msg['role'] == 'user']
            optimized.append({
                'messages': [{'role': 'system', 'content': system_msg}] + user_msgs
            })
        else:
            optimized.append(req)
    
    return optimized

批量定价

处理方式价格优势适用场景
实时批量10% 折扣中等规模处理
异步批量20% 折扣大规模离线处理
文件批量30% 折扣超大规模数据处理

监控和调试

批量任务监控

python
def monitor_batch_job(job_id):
    while True:
        status = check_batch_status(job_id)
        
        print(f"状态: {status['status']}")
        print(f"进度: {status['completed']}/{status['total']}")
        print(f"成功率: {status['success_rate']:.2%}")
        
        if status['status'] in ['completed', 'failed', 'cancelled']:
            break
            
        time.sleep(30)  # 每30秒检查一次

性能指标

python
def analyze_batch_performance(batch_result):
    total_requests = len(batch_result['responses'])
    total_tokens = batch_result['total_usage']['total_tokens']
    processing_time = batch_result['processing_time']
    
    metrics = {
        'throughput': total_requests / processing_time,
        'tokens_per_second': total_tokens / processing_time,
        'average_tokens_per_request': total_tokens / total_requests,
        'cost_efficiency': calculate_batch_savings(batch_result)
    }
    
    return metrics

最佳实践

  1. 合理分批: 根据模型和内容复杂度选择合适的批量大小
  2. 异步处理: 对于大量数据使用异步批量处理
  3. 错误处理: 实现完善的重试和错误恢复机制
  4. 监控跟踪: 监控批量任务的执行状态和性能
  5. 成本控制: 利用批量折扣优化处理成本

相关链接

基于 DeepSeek AI 大模型技术