Performance Guide

Optimize your DeepSeek API usage for maximum performance, efficiency, and cost-effectiveness.

Overview

This guide covers:

  • Request optimization: Minimize latency and maximize throughput
  • Token efficiency: Reduce costs and improve response times
  • Caching strategies: Implement smart caching for repeated requests
  • Batch processing: Handle multiple requests efficiently
  • Rate limit management: Optimize within API constraints
  • Monitoring and metrics: Track and improve performance

Request Optimization

Connection Management

python
import httpx
from openai import OpenAI
import time

class OptimizedDeepSeekClient:
    """Optimized client with connection pooling and reuse"""
    
    def __init__(self, api_key: str):
        # Configure HTTP client with connection pooling
        http_client = httpx.Client(
            limits=httpx.Limits(
                max_keepalive_connections=20,
                max_connections=100,
                keepalive_expiry=30
            ),
            timeout=httpx.Timeout(
                connect=10.0,
                read=60.0,
                write=10.0,
                pool=5.0
            )
        )
        
        self.client = OpenAI(
            api_key=api_key,
            base_url="https://api.deepseek.com/v1",
            http_client=http_client
        )
    
    def create_completion(self, **kwargs):
        """Create completion with optimized settings"""
        return self.client.chat.completions.create(**kwargs)
    
    def close(self):
        """Close HTTP connections"""
        self.client.close()

# Usage
client = OptimizedDeepSeekClient("sk-your-api-key")

try:
    response = client.create_completion(
        model="deepseek-chat",
        messages=[{"role": "user", "content": "Hello!"}]
    )
finally:
    client.close()

Async Processing

python
import asyncio
import aiohttp
import time
from typing import List, Dict, Any

class AsyncDeepSeekClient:
    """Async client for concurrent requests"""
    
    def __init__(self, api_key: str, max_concurrent: int = 10):
        self.api_key = api_key
        self.base_url = "https://api.deepseek.com/v1"
        self.semaphore = asyncio.Semaphore(max_concurrent)
        
    async def create_completion(self, session: aiohttp.ClientSession, **kwargs) -> Dict[str, Any]:
        """Create async completion"""
        
        async with self.semaphore:
            headers = {
                "Authorization": f"Bearer {self.api_key}",
                "Content-Type": "application/json"
            }
            
            async with session.post(
                f"{self.base_url}/chat/completions",
                json=kwargs,
                headers=headers
            ) as response:
                return await response.json()
    
    async def batch_completions(self, requests: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Process multiple requests concurrently"""
        
        connector = aiohttp.TCPConnector(
            limit=100,
            limit_per_host=20,
            keepalive_timeout=30
        )
        
        timeout = aiohttp.ClientTimeout(total=60)
        
        async with aiohttp.ClientSession(
            connector=connector,
            timeout=timeout
        ) as session:
            tasks = [
                self.create_completion(session, **request)
                for request in requests
            ]
            
            return await asyncio.gather(*tasks, return_exceptions=True)

# Usage
async def main():
    client = AsyncDeepSeekClient("sk-your-api-key", max_concurrent=5)
    
    requests = [
        {
            "model": "deepseek-chat",
            "messages": [{"role": "user", "content": f"Question {i}"}],
            "max_tokens": 100
        }
        for i in range(10)
    ]
    
    start_time = time.time()
    results = await client.batch_completions(requests)
    end_time = time.time()
    
    print(f"Processed {len(requests)} requests in {end_time - start_time:.2f} seconds")
    
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Request {i} failed: {result}")
        else:
            print(f"Request {i} succeeded")

# Run async example
# asyncio.run(main())

Request Batching

python
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
import time
from collections import defaultdict
from openai import OpenAI

@dataclass
class BatchRequest:
    """Individual request in a batch"""
    id: str
    model: str
    messages: List[Dict[str, str]]
    max_tokens: Optional[int] = None
    temperature: Optional[float] = None
    
class RequestBatcher:
    """Batch requests for efficient processing"""
    
    def __init__(self, client: OpenAI, batch_size: int = 10, max_wait_time: float = 1.0):
        self.client = client
        self.batch_size = batch_size
        self.max_wait_time = max_wait_time
        self.pending_requests = []
        self.results = {}
        
    def add_request(self, request: BatchRequest) -> str:
        """Add request to batch"""
        self.pending_requests.append(request)
        
        # Process batch if full or timeout
        if len(self.pending_requests) >= self.batch_size:
            self._process_batch()
        
        return request.id
    
    def _process_batch(self):
        """Process current batch of requests"""
        if not self.pending_requests:
            return
        
        batch = self.pending_requests.copy()
        self.pending_requests.clear()
        
        # Group by model for efficiency
        model_groups = defaultdict(list)
        for request in batch:
            model_groups[request.model].append(request)
        
        # Process each model group
        for model, requests in model_groups.items():
            self._process_model_group(model, requests)
    
    def _process_model_group(self, model: str, requests: List[BatchRequest]):
        """Process requests for a specific model"""
        
        for request in requests:
            try:
                response = self.client.chat.completions.create(
                    model=request.model,
                    messages=request.messages,
                    max_tokens=request.max_tokens,
                    temperature=request.temperature
                )
                
                self.results[request.id] = {
                    "success": True,
                    "response": response.choices[0].message.content
                }
                
            except Exception as e:
                self.results[request.id] = {
                    "success": False,
                    "error": str(e)
                }
    
    def get_result(self, request_id: str, timeout: float = 10.0) -> Dict[str, Any]:
        """Get result for a specific request"""
        
        start_time = time.time()
        
        while request_id not in self.results:
            if time.time() - start_time > timeout:
                return {"success": False, "error": "Timeout waiting for result"}
            
            # Process pending batch if timeout reached
            if (time.time() - start_time > self.max_wait_time and 
                self.pending_requests):
                self._process_batch()
            
            time.sleep(0.1)
        
        return self.results.pop(request_id)
    
    def flush(self):
        """Process all pending requests"""
        self._process_batch()

# Usage
client = OpenAI(api_key="sk-your-api-key", base_url="https://api.deepseek.com/v1")
batcher = RequestBatcher(client, batch_size=5, max_wait_time=2.0)

# Add requests
request_ids = []
for i in range(10):
    request = BatchRequest(
        id=f"req_{i}",
        model="deepseek-chat",
        messages=[{"role": "user", "content": f"Question {i}"}],
        max_tokens=50
    )
    request_ids.append(batcher.add_request(request))

# Get results
for req_id in request_ids:
    result = batcher.get_result(req_id)
    if result["success"]:
        print(f"{req_id}: {result['response']}")
    else:
        print(f"{req_id}: Error - {result['error']}")

Token Optimization

Token Counting and Estimation

python
import tiktoken
from typing import List, Dict

class TokenOptimizer:
    """Optimize token usage for cost and performance"""
    
    def __init__(self, model: str = "deepseek-chat"):
        self.model = model
        # Use cl100k_base encoding as approximation
        self.encoding = tiktoken.get_encoding("cl100k_base")
    
    def count_tokens(self, text: str) -> int:
        """Count tokens in text"""
        return len(self.encoding.encode(text))
    
    def count_message_tokens(self, messages: List[Dict[str, str]]) -> int:
        """Count tokens in message list"""
        total_tokens = 0
        
        for message in messages:
            # Add tokens for role and content
            total_tokens += self.count_tokens(message.get("role", ""))
            total_tokens += self.count_tokens(message.get("content", ""))
            # Add overhead tokens per message
            total_tokens += 4
        
        # Add overhead for the conversation
        total_tokens += 2
        
        return total_tokens
    
    def estimate_cost(self, input_tokens: int, output_tokens: int) -> float:
        """Estimate cost based on token usage"""
        # Example pricing (adjust based on actual rates)
        input_cost_per_1k = 0.0014  # $0.0014 per 1K input tokens
        output_cost_per_1k = 0.0028  # $0.0028 per 1K output tokens
        
        input_cost = (input_tokens / 1000) * input_cost_per_1k
        output_cost = (output_tokens / 1000) * output_cost_per_1k
        
        return input_cost + output_cost
    
    def optimize_messages(self, messages: List[Dict[str, str]], max_tokens: int) -> List[Dict[str, str]]:
        """Optimize messages to fit within token limit"""
        
        optimized = []
        current_tokens = 0
        
        # Always include the last message (usually the current question)
        if messages:
            last_message = messages[-1]
            last_tokens = self.count_message_tokens([last_message])
            
            if last_tokens <= max_tokens:
                optimized.append(last_message)
                current_tokens = last_tokens
                
                # Add previous messages in reverse order
                for message in reversed(messages[:-1]):
                    message_tokens = self.count_message_tokens([message])
                    
                    if current_tokens + message_tokens <= max_tokens:
                        optimized.insert(0, message)
                        current_tokens += message_tokens
                    else:
                        break
        
        return optimized
    
    def truncate_content(self, content: str, max_tokens: int) -> str:
        """Truncate content to fit within token limit"""
        
        tokens = self.encoding.encode(content)
        
        if len(tokens) <= max_tokens:
            return content
        
        # Truncate and decode
        truncated_tokens = tokens[:max_tokens]
        return self.encoding.decode(truncated_tokens)
    
    def smart_summarize_context(self, messages: List[Dict[str, str]], target_tokens: int) -> List[Dict[str, str]]:
        """Summarize older messages to reduce token count"""
        
        current_tokens = self.count_message_tokens(messages)
        
        if current_tokens <= target_tokens:
            return messages
        
        # Keep recent messages, summarize older ones
        keep_recent = 3  # Keep last 3 messages
        recent_messages = messages[-keep_recent:]
        older_messages = messages[:-keep_recent]
        
        if older_messages:
            # Create summary of older messages
            summary_content = "Previous conversation summary: "
            for msg in older_messages:
                summary_content += f"{msg['role']}: {msg['content'][:100]}... "
            
            summary_message = {
                "role": "system",
                "content": self.truncate_content(summary_content, target_tokens // 4)
            }
            
            return [summary_message] + recent_messages
        
        return recent_messages

# Usage
optimizer = TokenOptimizer()

messages = [
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "What is machine learning?"},
    {"role": "assistant", "content": "Machine learning is..."},
    {"role": "user", "content": "Can you give me examples?"}
]

# Count tokens
total_tokens = optimizer.count_message_tokens(messages)
print(f"Total tokens: {total_tokens}")

# Optimize for token limit
optimized_messages = optimizer.optimize_messages(messages, max_tokens=1000)
print(f"Optimized to {len(optimized_messages)} messages")

# Estimate cost
estimated_cost = optimizer.estimate_cost(input_tokens=total_tokens, output_tokens=150)
print(f"Estimated cost: ${estimated_cost:.4f}")

Prompt Optimization

python
class PromptOptimizer:
    """Optimize prompts for efficiency and effectiveness"""
    
    def __init__(self):
        self.token_optimizer = TokenOptimizer()
    
    def compress_prompt(self, prompt: str) -> str:
        """Compress prompt while maintaining meaning"""
        
        # Remove unnecessary whitespace
        compressed = " ".join(prompt.split())
        
        # Replace verbose phrases with concise alternatives
        replacements = {
            "Please provide a detailed explanation of": "Explain",
            "I would like you to": "Please",
            "Can you help me understand": "Explain",
            "It would be great if you could": "Please",
            "I need assistance with": "Help with",
            "Could you please tell me": "What is",
            "I am interested in learning about": "Explain",
        }
        
        for verbose, concise in replacements.items():
            compressed = compressed.replace(verbose, concise)
        
        return compressed
    
    def create_efficient_system_prompt(self, role: str, constraints: List[str] = None) -> str:
        """Create efficient system prompts"""
        
        base_prompts = {
            "assistant": "You are a helpful AI assistant.",
            "coder": "You are an expert programmer.",
            "analyst": "You are a data analyst.",
            "writer": "You are a professional writer.",
            "teacher": "You are an educational tutor."
        }
        
        prompt = base_prompts.get(role, "You are a helpful AI assistant.")
        
        if constraints:
            # Add constraints efficiently
            constraint_text = " ".join(constraints)
            prompt += f" {constraint_text}"
        
        return self.compress_prompt(prompt)
    
    def optimize_few_shot_examples(self, examples: List[Dict[str, str]], max_examples: int = 3) -> List[Dict[str, str]]:
        """Optimize few-shot examples for token efficiency"""
        
        if len(examples) <= max_examples:
            return examples
        
        # Score examples by length and diversity
        scored_examples = []
        
        for i, example in enumerate(examples):
            input_tokens = self.token_optimizer.count_tokens(example.get("input", ""))
            output_tokens = self.token_optimizer.count_tokens(example.get("output", ""))
            total_tokens = input_tokens + output_tokens
            
            # Prefer shorter examples
            score = 1.0 / (total_tokens + 1)
            
            scored_examples.append((score, i, example))
        
        # Sort by score and take top examples
        scored_examples.sort(reverse=True)
        
        return [example for _, _, example in scored_examples[:max_examples]]
    
    def create_template_prompt(self, task: str, input_format: str, output_format: str, examples: List[Dict[str, str]] = None) -> str:
        """Create optimized template prompt"""
        
        prompt_parts = [
            f"Task: {task}",
            f"Input: {input_format}",
            f"Output: {output_format}"
        ]
        
        if examples:
            optimized_examples = self.optimize_few_shot_examples(examples)
            
            prompt_parts.append("Examples:")
            for i, example in enumerate(optimized_examples, 1):
                prompt_parts.append(f"{i}. Input: {example['input']}")
                prompt_parts.append(f"   Output: {example['output']}")
        
        return "\n".join(prompt_parts)

# Usage
prompt_optimizer = PromptOptimizer()

# Compress verbose prompt
verbose_prompt = "Please provide a detailed explanation of how machine learning algorithms work and I would like you to include examples."
compressed = prompt_optimizer.compress_prompt(verbose_prompt)
print(f"Original: {len(verbose_prompt)} chars")
print(f"Compressed: {len(compressed)} chars")

# Create efficient system prompt
system_prompt = prompt_optimizer.create_efficient_system_prompt(
    "coder", 
    ["Be concise.", "Include examples.", "Focus on Python."]
)
print(f"System prompt: {system_prompt}")

# Optimize few-shot examples
examples = [
    {"input": "Sort list [3,1,4,1,5]", "output": "[1,1,3,4,5]"},
    {"input": "Reverse string 'hello'", "output": "'olleh'"},
    {"input": "Find max in [2,8,1,9,3]", "output": "9"}
]

template = prompt_optimizer.create_template_prompt(
    "Python operations",
    "Description of operation",
    "Result or code",
    examples
)
print(f"Template:\n{template}")

Caching Strategies

Response Caching

python
import hashlib
import json
import time
from pathlib import Path
from typing import Any, Dict, List, Optional
from openai import OpenAI

class ResponseCache:
    """Cache API responses for improved performance"""
    
    def __init__(self, cache_dir: str = "./cache", ttl: int = 3600):
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(exist_ok=True)
        self.ttl = ttl  # Time to live in seconds
    
    def _get_cache_key(self, model: str, messages: List[Dict[str, str]], **kwargs) -> str:
        """Generate cache key for request"""
        
        # Create deterministic hash of request parameters
        cache_data = {
            "model": model,
            "messages": messages,
            **kwargs
        }
        
        # Sort to ensure consistent hashing
        cache_string = json.dumps(cache_data, sort_keys=True)
        return hashlib.md5(cache_string.encode()).hexdigest()
    
    def get(self, model: str, messages: List[Dict[str, str]], **kwargs) -> Optional[Dict[str, Any]]:
        """Get cached response if available and valid"""
        
        cache_key = self._get_cache_key(model, messages, **kwargs)
        cache_file = self.cache_dir / f"{cache_key}.json"
        
        if not cache_file.exists():
            return None
        
        try:
            with open(cache_file, 'r') as f:
                cached_data = json.load(f)
            
            # Check if cache is still valid
            if time.time() - cached_data['timestamp'] > self.ttl:
                cache_file.unlink()  # Remove expired cache
                return None
            
            return cached_data['response']
        
        except (json.JSONDecodeError, KeyError, FileNotFoundError):
            return None
    
    def set(self, model: str, messages: List[Dict[str, str]], response: Dict[str, Any], **kwargs):
        """Cache response"""
        
        cache_key = self._get_cache_key(model, messages, **kwargs)
        cache_file = self.cache_dir / f"{cache_key}.json"
        
        cache_data = {
            "timestamp": time.time(),
            "response": response,
            "model": model,
            "messages": messages,
            "kwargs": kwargs
        }
        
        with open(cache_file, 'w') as f:
            json.dump(cache_data, f, indent=2)
    
    def clear_expired(self):
        """Clear expired cache entries"""
        
        current_time = time.time()
        
        for cache_file in self.cache_dir.glob("*.json"):
            try:
                with open(cache_file, 'r') as f:
                    cached_data = json.load(f)
                
                if current_time - cached_data['timestamp'] > self.ttl:
                    cache_file.unlink()
            
            except (json.JSONDecodeError, KeyError, FileNotFoundError):
                cache_file.unlink()  # Remove corrupted cache
    
    def get_stats(self) -> Dict[str, Any]:
        """Get cache statistics"""
        
        cache_files = list(self.cache_dir.glob("*.json"))
        total_size = sum(f.stat().st_size for f in cache_files)
        
        return {
            "total_entries": len(cache_files),
            "total_size_mb": total_size / (1024 * 1024),
            "cache_dir": str(self.cache_dir)
        }

class CachedDeepSeekClient:
    """DeepSeek client with caching"""
    
    def __init__(self, api_key: str, cache_ttl: int = 3600):
        self.client = OpenAI(
            api_key=api_key,
            base_url="https://api.deepseek.com/v1"
        )
        self.cache = ResponseCache(ttl=cache_ttl)
        self.cache_hits = 0
        self.cache_misses = 0
    
    def chat_completions_create(self, **kwargs) -> Dict[str, Any]:
        """Create chat completion with caching"""
        
        # Try to get from cache first
        cached_response = self.cache.get(**kwargs)
        
        if cached_response:
            self.cache_hits += 1
            print(f"✅ Cache hit! (Hits: {self.cache_hits}, Misses: {self.cache_misses})")
            return cached_response
        
        # Make API call
        self.cache_misses += 1
        print(f"🔄 Cache miss, making API call (Hits: {self.cache_hits}, Misses: {self.cache_misses})")
        
        response = self.client.chat.completions.create(**kwargs)
        
        # Convert response to dict for caching
        response_dict = {
            "id": response.id,
            "object": response.object,
            "created": response.created,
            "model": response.model,
            "choices": [
                {
                    "index": choice.index,
                    "message": {
                        "role": choice.message.role,
                        "content": choice.message.content
                    },
                    "finish_reason": choice.finish_reason
                }
                for choice in response.choices
            ],
            "usage": {
                "prompt_tokens": response.usage.prompt_tokens,
                "completion_tokens": response.usage.completion_tokens,
                "total_tokens": response.usage.total_tokens
            }
        }
        
        # Cache the response
        self.cache.set(response=response_dict, **kwargs)
        
        return response_dict
    
    def get_cache_stats(self) -> Dict[str, Any]:
        """Get caching statistics"""
        
        cache_stats = self.cache.get_stats()
        
        total_requests = self.cache_hits + self.cache_misses
        hit_rate = (self.cache_hits / total_requests * 100) if total_requests > 0 else 0
        
        return {
            **cache_stats,
            "cache_hits": self.cache_hits,
            "cache_misses": self.cache_misses,
            "hit_rate_percent": hit_rate
        }

# Usage
cached_client = CachedDeepSeekClient("sk-your-api-key", cache_ttl=1800)

# First call - cache miss
response1 = cached_client.chat_completions_create(
    model="deepseek-chat",
    messages=[{"role": "user", "content": "What is Python?"}]
)

# Second call with same parameters - cache hit
response2 = cached_client.chat_completions_create(
    model="deepseek-chat",
    messages=[{"role": "user", "content": "What is Python?"}]
)

# Get cache statistics
stats = cached_client.get_cache_stats()
print(f"Cache statistics: {stats}")

Semantic Caching

python
import time
from typing import Any, Dict, List, Optional

import numpy as np
from openai import OpenAI
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity

class SemanticCache:
    """Cache based on semantic similarity of requests"""
    
    def __init__(self, similarity_threshold: float = 0.85, max_entries: int = 1000):
        self.similarity_threshold = similarity_threshold
        self.max_entries = max_entries
        self.model = SentenceTransformer('all-MiniLM-L6-v2')
        self.cache_entries = []
        self.embeddings = []
    
    def _get_query_text(self, messages: List[Dict[str, str]]) -> str:
        """Extract query text from messages"""
        
        # Combine all user messages
        user_messages = [msg['content'] for msg in messages if msg['role'] == 'user']
        return " ".join(user_messages)
    
    def _find_similar_entry(self, query_embedding: np.ndarray) -> Optional[int]:
        """Find semantically similar cache entry"""
        
        if not self.embeddings:
            return None
        
        # Calculate similarities
        similarities = cosine_similarity([query_embedding], self.embeddings)[0]
        
        # Find best match above threshold
        best_idx = np.argmax(similarities)
        best_similarity = similarities[best_idx]
        
        if best_similarity >= self.similarity_threshold:
            return best_idx
        
        return None
    
    def get(self, messages: List[Dict[str, str]], **kwargs) -> Optional[Dict[str, Any]]:
        """Get semantically similar cached response"""
        
        query_text = self._get_query_text(messages)
        query_embedding = self.model.encode([query_text])[0]
        
        similar_idx = self._find_similar_entry(query_embedding)
        
        if similar_idx is not None:
            entry = self.cache_entries[similar_idx]
            
            # Check if other parameters match
            if entry['kwargs'] == kwargs:
                return entry['response']
        
        return None
    
    def set(self, messages: List[Dict[str, str]], response: Dict[str, Any], **kwargs):
        """Cache response with semantic indexing"""
        
        query_text = self._get_query_text(messages)
        query_embedding = self.model.encode([query_text])[0]
        
        # Add to cache
        cache_entry = {
            "messages": messages,
            "response": response,
            "kwargs": kwargs,
            "query_text": query_text,
            "timestamp": time.time()
        }
        
        self.cache_entries.append(cache_entry)
        self.embeddings.append(query_embedding)
        
        # Maintain cache size limit
        if len(self.cache_entries) > self.max_entries:
            # Remove oldest entry
            self.cache_entries.pop(0)
            self.embeddings.pop(0)
    
    def get_stats(self) -> Dict[str, Any]:
        """Get semantic cache statistics"""
        
        return {
            "total_entries": len(self.cache_entries),
            "similarity_threshold": self.similarity_threshold,
            "max_entries": self.max_entries
        }

class SemanticCachedClient:
    """Client with semantic caching"""
    
    def __init__(self, api_key: str, similarity_threshold: float = 0.85):
        self.client = OpenAI(
            api_key=api_key,
            base_url="https://api.deepseek.com/v1"
        )
        self.semantic_cache = SemanticCache(similarity_threshold)
        self.semantic_hits = 0
        self.semantic_misses = 0
    
    def chat_completions_create(self, **kwargs) -> Dict[str, Any]:
        """Create completion with semantic caching"""
        
        # Try semantic cache
        cached_response = self.semantic_cache.get(**kwargs)
        
        if cached_response:
            self.semantic_hits += 1
            print(f"🎯 Semantic cache hit! (Hits: {self.semantic_hits}, Misses: {self.semantic_misses})")
            return cached_response
        
        # Make API call
        self.semantic_misses += 1
        print(f"🔄 Semantic cache miss (Hits: {self.semantic_hits}, Misses: {self.semantic_misses})")
        
        response = self.client.chat.completions.create(**kwargs)
        
        # Convert and cache response
        response_dict = {
            "choices": [
                {
                    "message": {
                        "content": choice.message.content
                    }
                }
                for choice in response.choices
            ]
        }
        
        self.semantic_cache.set(response=response_dict, **kwargs)
        
        return response_dict

# Usage
semantic_client = SemanticCachedClient("sk-your-api-key", similarity_threshold=0.8)

# These queries are semantically similar and should hit cache
queries = [
    "What is machine learning?",
    "Can you explain machine learning?",
    "Tell me about ML",
    "What does machine learning mean?"
]

for query in queries:
    response = semantic_client.chat_completions_create(
        model="deepseek-chat",
        messages=[{"role": "user", "content": query}]
    )
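
After the loop, the counters on the client show how many of these similar queries were served from the semantic cache instead of the API; this only reads attributes defined on the classes above:

python
stats = semantic_client.semantic_cache.get_stats()
print(f"Semantic cache entries: {stats['total_entries']}")
print(f"Hits: {semantic_client.semantic_hits}, Misses: {semantic_client.semantic_misses}")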

Rate Limit Management

Rate Limiter

python
import time
from collections import deque
from threading import Lock
from typing import Any, Dict

from openai import OpenAI

class RateLimiter:
    """Manage API rate limits"""
    
    def __init__(self, requests_per_minute: int = 60, tokens_per_minute: int = 60000):
        self.requests_per_minute = requests_per_minute
        self.tokens_per_minute = tokens_per_minute
        
        self.request_times = deque()
        self.token_usage = deque()
        
        self.lock = Lock()
    
    def _clean_old_entries(self, current_time: float):
        """Remove entries older than 1 minute"""
        
        cutoff_time = current_time - 60
        
        # Clean request times
        while self.request_times and self.request_times[0] < cutoff_time:
            self.request_times.popleft()
        
        # Clean token usage
        while self.token_usage and self.token_usage[0][0] < cutoff_time:
            self.token_usage.popleft()
    
    def can_make_request(self, estimated_tokens: int = 0) -> tuple[bool, float]:
        """Check if request can be made, return (can_make, wait_time)"""
        
        with self.lock:
            current_time = time.time()
            self._clean_old_entries(current_time)
            
            # Check request rate limit
            if len(self.request_times) >= self.requests_per_minute:
                wait_time = 60 - (current_time - self.request_times[0])
                return False, max(0, wait_time)
            
            # Check token rate limit
            current_tokens = sum(tokens for _, tokens in self.token_usage)
            if current_tokens + estimated_tokens > self.tokens_per_minute:
                # Calculate wait time based on oldest token usage
                if self.token_usage:
                    wait_time = 60 - (current_time - self.token_usage[0][0])
                    return False, max(0, wait_time)
            
            return True, 0
    
    def record_request(self, tokens_used: int = 0):
        """Record a successful request"""
        
        with self.lock:
            current_time = time.time()
            self.request_times.append(current_time)
            
            if tokens_used > 0:
                self.token_usage.append((current_time, tokens_used))
    
    def get_current_usage(self) -> Dict[str, Any]:
        """Get current rate limit usage"""
        
        with self.lock:
            current_time = time.time()
            self._clean_old_entries(current_time)
            
            current_requests = len(self.request_times)
            current_tokens = sum(tokens for _, tokens in self.token_usage)
            
            return {
                "requests_used": current_requests,
                "requests_limit": self.requests_per_minute,
                "requests_remaining": self.requests_per_minute - current_requests,
                "tokens_used": current_tokens,
                "tokens_limit": self.tokens_per_minute,
                "tokens_remaining": self.tokens_per_minute - current_tokens
            }

class RateLimitedClient:
    """Client with automatic rate limiting"""
    
    def __init__(self, api_key: str, requests_per_minute: int = 60, tokens_per_minute: int = 60000):
        self.client = OpenAI(
            api_key=api_key,
            base_url="https://api.deepseek.com/v1"
        )
        self.rate_limiter = RateLimiter(requests_per_minute, tokens_per_minute)
        self.token_optimizer = TokenOptimizer()
    
    def chat_completions_create(self, **kwargs) -> Dict[str, Any]:
        """Create completion with rate limiting"""
        
        # Estimate tokens for rate limiting
        messages = kwargs.get('messages', [])
        estimated_tokens = self.token_optimizer.count_message_tokens(messages)
        estimated_tokens += kwargs.get('max_tokens', 150)  # Add estimated output tokens
        
        # Check rate limits
        can_make, wait_time = self.rate_limiter.can_make_request(estimated_tokens)
        
        if not can_make:
            print(f"⏳ Rate limit reached, waiting {wait_time:.2f} seconds...")
            time.sleep(wait_time)
        
        # Make request
        try:
            response = self.client.chat.completions.create(**kwargs)
            
            # Record successful request
            actual_tokens = response.usage.total_tokens
            self.rate_limiter.record_request(actual_tokens)
            
            return response
        
        except Exception as e:
            # Handle rate limit errors
            if "rate limit" in str(e).lower():
                print("🚫 Rate limit error from API, waiting 60 seconds...")
                time.sleep(60)
                return self.chat_completions_create(**kwargs)
            else:
                raise e
    
    def get_rate_limit_status(self) -> Dict[str, Any]:
        """Get current rate limit status"""
        return self.rate_limiter.get_current_usage()

# Usage
rate_limited_client = RateLimitedClient(
    "sk-your-api-key",
    requests_per_minute=50,  # Conservative limit
    tokens_per_minute=50000
)

# Make multiple requests
for i in range(10):
    print(f"Making request {i+1}...")
    
    response = rate_limited_client.chat_completions_create(
        model="deepseek-chat",
        messages=[{"role": "user", "content": f"Question {i+1}"}],
        max_tokens=100
    )
    
    # Check rate limit status
    status = rate_limited_client.get_rate_limit_status()
    print(f"Rate limit status: {status['requests_remaining']} requests, {status['tokens_remaining']} tokens remaining")

Performance Monitoring

Metrics Collection

python
import time
import statistics
import json
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, List, Optional

from openai import OpenAI

@dataclass
class RequestMetrics:
    """Metrics for a single request"""
    timestamp: float
    model: str
    input_tokens: int
    output_tokens: int
    total_tokens: int
    latency: float
    success: bool
    error: Optional[str] = None
    cost: float = 0.0

class PerformanceMonitor:
    """Monitor and analyze API performance"""
    
    def __init__(self):
        self.metrics: List[RequestMetrics] = []
        self.start_time = time.time()
    
    def record_request(self, metrics: RequestMetrics):
        """Record request metrics"""
        self.metrics.append(metrics)
    
    def get_summary_stats(self, time_window: int = 3600) -> Dict[str, Any]:
        """Get summary statistics for the specified time window (seconds)"""
        
        current_time = time.time()
        cutoff_time = current_time - time_window
        
        # Filter metrics within time window
        recent_metrics = [m for m in self.metrics if m.timestamp >= cutoff_time]
        
        if not recent_metrics:
            return {"error": "No metrics in time window"}
        
        # Calculate statistics
        latencies = [m.latency for m in recent_metrics]
        input_tokens = [m.input_tokens for m in recent_metrics]
        output_tokens = [m.output_tokens for m in recent_metrics]
        total_tokens = [m.total_tokens for m in recent_metrics]
        costs = [m.cost for m in recent_metrics]
        
        successful_requests = [m for m in recent_metrics if m.success]
        failed_requests = [m for m in recent_metrics if not m.success]
        
        return {
            "time_window_hours": time_window / 3600,
            "total_requests": len(recent_metrics),
            "successful_requests": len(successful_requests),
            "failed_requests": len(failed_requests),
            "success_rate": len(successful_requests) / len(recent_metrics) * 100,
            
            "latency": {
                "mean": statistics.mean(latencies),
                "median": statistics.median(latencies),
                "p95": self._percentile(latencies, 95),
                "p99": self._percentile(latencies, 99),
                "min": min(latencies),
                "max": max(latencies)
            },
            
            "tokens": {
                "total_input": sum(input_tokens),
                "total_output": sum(output_tokens),
                "total_combined": sum(total_tokens),
                "avg_input": statistics.mean(input_tokens),
                "avg_output": statistics.mean(output_tokens),
                "avg_total": statistics.mean(total_tokens)
            },
            
            "cost": {
                "total": sum(costs),
                "average_per_request": statistics.mean(costs),
                "cost_per_1k_tokens": sum(costs) / (sum(total_tokens) / 1000) if sum(total_tokens) > 0 else 0
            },
            
            "throughput": {
                "requests_per_minute": len(recent_metrics) / (time_window / 60),
                "tokens_per_minute": sum(total_tokens) / (time_window / 60)
            }
        }
    
    def _percentile(self, data: List[float], percentile: int) -> float:
        """Calculate percentile"""
        sorted_data = sorted(data)
        index = int(len(sorted_data) * percentile / 100)
        return sorted_data[min(index, len(sorted_data) - 1)]
    
    def get_error_analysis(self) -> Dict[str, Any]:
        """Analyze errors and failures"""
        
        failed_metrics = [m for m in self.metrics if not m.success]
        
        if not failed_metrics:
            return {"total_errors": 0}
        
        # Group errors by type
        error_counts = {}
        for metric in failed_metrics:
            error_type = metric.error or "Unknown"
            error_counts[error_type] = error_counts.get(error_type, 0) + 1
        
        return {
            "total_errors": len(failed_metrics),
            "error_rate": len(failed_metrics) / len(self.metrics) * 100,
            "error_types": error_counts,
            "most_common_error": max(error_counts.items(), key=lambda x: x[1]) if error_counts else None
        }
    
    def export_metrics(self, filename: str):
        """Export metrics to JSON file"""
        
        export_data = {
            "export_timestamp": datetime.now().isoformat(),
            "total_metrics": len(self.metrics),
            "metrics": [
                {
                    "timestamp": m.timestamp,
                    "model": m.model,
                    "input_tokens": m.input_tokens,
                    "output_tokens": m.output_tokens,
                    "total_tokens": m.total_tokens,
                    "latency": m.latency,
                    "success": m.success,
                    "error": m.error,
                    "cost": m.cost
                }
                for m in self.metrics
            ]
        }
        
        with open(filename, 'w') as f:
            json.dump(export_data, f, indent=2)

class MonitoredDeepSeekClient:
    """DeepSeek client with performance monitoring"""
    
    def __init__(self, api_key: str):
        self.client = OpenAI(
            api_key=api_key,
            base_url="https://api.deepseek.com/v1"
        )
        self.monitor = PerformanceMonitor()
        self.token_optimizer = TokenOptimizer()
    
    def chat_completions_create(self, **kwargs) -> Dict[str, Any]:
        """Create completion with monitoring"""
        
        start_time = time.time()
        
        # Estimate input tokens
        messages = kwargs.get('messages', [])
        input_tokens = self.token_optimizer.count_message_tokens(messages)
        
        try:
            response = self.client.chat.completions.create(**kwargs)
            
            # Calculate metrics
            end_time = time.time()
            latency = end_time - start_time
            
            output_tokens = response.usage.completion_tokens
            total_tokens = response.usage.total_tokens
            
            # Estimate cost
            cost = self.token_optimizer.estimate_cost(input_tokens, output_tokens)
            
            # Record metrics
            metrics = RequestMetrics(
                timestamp=start_time,
                model=kwargs.get('model', 'unknown'),
                input_tokens=input_tokens,
                output_tokens=output_tokens,
                total_tokens=total_tokens,
                latency=latency,
                success=True,
                cost=cost
            )
            
            self.monitor.record_request(metrics)
            
            return response
        
        except Exception as e:
            # Record failed request
            end_time = time.time()
            latency = end_time - start_time
            
            metrics = RequestMetrics(
                timestamp=start_time,
                model=kwargs.get('model', 'unknown'),
                input_tokens=input_tokens,
                output_tokens=0,
                total_tokens=input_tokens,
                latency=latency,
                success=False,
                error=str(e)
            )
            
            self.monitor.record_request(metrics)
            
            raise e
    
    def get_performance_report(self) -> Dict[str, Any]:
        """Get comprehensive performance report"""
        
        return {
            "summary_1h": self.monitor.get_summary_stats(3600),
            "summary_24h": self.monitor.get_summary_stats(86400),
            "error_analysis": self.monitor.get_error_analysis()
        }
    
    def export_performance_data(self, filename: str):
        """Export performance data"""
        self.monitor.export_metrics(filename)

# Usage
monitored_client = MonitoredDeepSeekClient("sk-your-api-key")

# Make some requests
for i in range(5):
    try:
        response = monitored_client.chat_completions_create(
            model="deepseek-chat",
            messages=[{"role": "user", "content": f"Test question {i+1}"}],
            max_tokens=100
        )
        print(f"✅ Request {i+1} successful")
    except Exception as e:
        print(f"❌ Request {i+1} failed: {e}")

# Get performance report
report = monitored_client.get_performance_report()
print(json.dumps(report, indent=2))

# Export data
monitored_client.export_performance_data("performance_metrics.json")

Best Practices Summary

Performance Checklist

python
class PerformanceChecklist:
    """Performance optimization checklist"""
    
    @staticmethod
    def check_request_optimization(client_config: Dict[str, Any]) -> List[str]:
        """Check request optimization"""
        
        recommendations = []
        
        # Connection pooling
        if not client_config.get("connection_pooling"):
            recommendations.append("✅ Enable HTTP connection pooling")
        
        # Timeout configuration
        if not client_config.get("timeout_configured"):
            recommendations.append("✅ Configure appropriate timeouts")
        
        # Async processing
        if not client_config.get("async_support"):
            recommendations.append("✅ Consider async processing for concurrent requests")
        
        return recommendations
    
    @staticmethod
    def check_token_optimization(prompt_config: Dict[str, Any]) -> List[str]:
        """Check token optimization"""
        
        recommendations = []
        
        # Prompt length
        if prompt_config.get("avg_prompt_tokens", 0) > 2000:
            recommendations.append("✅ Consider shortening prompts")
        
        # Context management
        if not prompt_config.get("context_management"):
            recommendations.append("✅ Implement context window management")
        
        # Token counting
        if not prompt_config.get("token_counting"):
            recommendations.append("✅ Implement token counting and estimation")
        
        return recommendations
    
    @staticmethod
    def check_caching_strategy(cache_config: Dict[str, Any]) -> List[str]:
        """Check caching strategy"""
        
        recommendations = []
        
        # Response caching
        if not cache_config.get("response_caching"):
            recommendations.append("✅ Implement response caching")
        
        # Cache hit rate
        hit_rate = cache_config.get("hit_rate", 0)
        if hit_rate < 20:
            recommendations.append("✅ Improve cache hit rate (currently {hit_rate}%)")
        
        # Semantic caching
        if not cache_config.get("semantic_caching"):
            recommendations.append("✅ Consider semantic caching for similar queries")
        
        return recommendations
    
    @staticmethod
    def check_rate_limiting(rate_config: Dict[str, Any]) -> List[str]:
        """Check rate limiting"""
        
        recommendations = []
        
        # Rate limiter
        if not rate_config.get("rate_limiter"):
            recommendations.append("✅ Implement client-side rate limiting")
        
        # Backoff strategy
        if not rate_config.get("backoff_strategy"):
            recommendations.append("✅ Implement exponential backoff")
        
        # Usage monitoring
        if not rate_config.get("usage_monitoring"):
            recommendations.append("✅ Monitor rate limit usage")
        
        return recommendations
    
    @staticmethod
    def check_monitoring(monitor_config: Dict[str, Any]) -> List[str]:
        """Check monitoring setup"""
        
        recommendations = []
        
        # Performance monitoring
        if not monitor_config.get("performance_monitoring"):
            recommendations.append("✅ Implement performance monitoring")
        
        # Error tracking
        if not monitor_config.get("error_tracking"):
            recommendations.append("✅ Implement error tracking and analysis")
        
        # Metrics export
        if not monitor_config.get("metrics_export"):
            recommendations.append("✅ Set up metrics export and analysis")
        
        return recommendations
    
    @staticmethod
    def generate_full_report(config: Dict[str, Any]) -> Dict[str, List[str]]:
        """Generate full performance optimization report"""
        
        return {
            "request_optimization": PerformanceChecklist.check_request_optimization(
                config.get("client", {})
            ),
            "token_optimization": PerformanceChecklist.check_token_optimization(
                config.get("prompts", {})
            ),
            "caching_strategy": PerformanceChecklist.check_caching_strategy(
                config.get("cache", {})
            ),
            "rate_limiting": PerformanceChecklist.check_rate_limiting(
                config.get("rate_limits", {})
            ),
            "monitoring": PerformanceChecklist.check_monitoring(
                config.get("monitoring", {})
            )
        }

# Usage
config = {
    "client": {
        "connection_pooling": True,
        "timeout_configured": True,
        "async_support": False
    },
    "prompts": {
        "avg_prompt_tokens": 1500,
        "context_management": True,
        "token_counting": True
    },
    "cache": {
        "response_caching": True,
        "hit_rate": 35,
        "semantic_caching": False
    },
    "rate_limits": {
        "rate_limiter": True,
        "backoff_strategy": True,
        "usage_monitoring": False
    },
    "monitoring": {
        "performance_monitoring": False,
        "error_tracking": True,
        "metrics_export": False
    }
}

report = PerformanceChecklist.generate_full_report(config)

for category, recommendations in report.items():
    print(f"\n{category.replace('_', ' ').title()}:")
    for rec in recommendations:
        print(f"  {rec}")
