# Streaming Guide

Learn how to implement real-time streaming responses with the DeepSeek API for improved user experience and responsiveness.
## Overview

Streaming lets you receive partial responses as they are generated, providing:

- **Real-time feedback**: users see responses as they're generated
- **Improved perceived performance**: faster time to first token
- **Better user experience**: progressive content loading
- **Reduced latency**: start processing partial responses immediately
- **Interactive applications**: build chat interfaces and real-time tools
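With the OpenAI-compatible Python SDK, each streamed event arrives as a chat-completion chunk whose incremental text sits in `choices[0].delta.content`. A rough sketch of one chunk's shape (field values here are illustrative, not an exact wire dump):

```python
# Illustrative shape of a single streamed chunk (values are made up):
chunk_shape = {
    "id": "chatcmpl-...",
    "object": "chat.completion.chunk",
    "model": "deepseek-chat",
    "choices": [{
        "index": 0,
        "delta": {"content": "Hel"},  # None on role-only or finish events
        "finish_reason": None,        # e.g. "stop" on the final chunk
    }],
}

# With the SDK, read the incremental text as:
# text = chunk.choices[0].delta.content
```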
## Basic Streaming

### Simple Streaming Example

```python
from openai import OpenAI

client = OpenAI(
    api_key="sk-your-api-key",
    base_url="https://api.deepseek.com/v1"
)

def basic_streaming():
    """Basic streaming example"""
    stream = client.chat.completions.create(
        model="deepseek-chat",
        messages=[
            {"role": "user", "content": "Write a short story about a robot learning to paint."}
        ],
        stream=True,  # Enable streaming
        max_tokens=500
    )

    print("🤖 DeepSeek: ", end="", flush=True)
    for chunk in stream:
        if chunk.choices[0].delta.content is not None:
            content = chunk.choices[0].delta.content
            print(content, end="", flush=True)
    print("\n")  # New line at the end

# Run the example
basic_streaming()
```
### Streaming with Error Handling

```python
import time
from typing import Dict, Generator, List

def robust_streaming(
    messages: List[Dict[str, str]],
    model: str = "deepseek-chat",
    max_tokens: int = 500,
    temperature: float = 0.7
) -> Generator[str, None, None]:
    """Robust streaming with error handling and retries.

    Note: if the stream fails mid-response, a retry restarts the answer
    from scratch, so consumers may see duplicated text. See the
    RobustStreamer recovery example later for a resume strategy.
    """
    max_retries = 3
    retry_delay = 1.0

    for attempt in range(max_retries):
        try:
            stream = client.chat.completions.create(
                model=model,
                messages=messages,
                stream=True,
                max_tokens=max_tokens,
                temperature=temperature
            )

            for chunk in stream:
                if chunk.choices[0].delta.content is not None:
                    yield chunk.choices[0].delta.content

                # Stop once the API reports a finish reason
                if chunk.choices[0].finish_reason is not None:
                    break

            return  # Success, exit retry loop

        except Exception as e:
            print(f"❌ Streaming error (attempt {attempt + 1}): {e}")
            if attempt < max_retries - 1:
                print(f"⏳ Retrying in {retry_delay} seconds...")
                time.sleep(retry_delay)
                retry_delay *= 2  # Exponential backoff
            else:
                print("🚫 Max retries exceeded")
                raise

# Usage
messages = [
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "Explain quantum computing in simple terms."}
]

print("🤖 DeepSeek: ", end="", flush=True)
try:
    for content in robust_streaming(messages):
        print(content, end="", flush=True)
        time.sleep(0.01)  # Small delay for visual effect
    print("\n")
except Exception as e:
    print(f"\n❌ Streaming failed: {e}")
```
## Advanced Streaming Techniques

### Streaming with Token Counting

```python
import time
import tiktoken
from dataclasses import dataclass
from typing import Any, Dict, Generator, List, Tuple

@dataclass
class StreamingStats:
    """Statistics for a streaming response"""
    total_tokens: int = 0
    chunks_received: int = 0
    time_to_first_token: float = 0.0
    total_time: float = 0.0
    tokens_per_second: float = 0.0

class StreamingTokenCounter:
    """Count tokens during streaming"""

    def __init__(self, model: str = "deepseek-chat"):
        # cl100k_base only approximates DeepSeek's tokenizer,
        # so treat these counts as estimates.
        self.encoding = tiktoken.get_encoding("cl100k_base")
        self.model = model

    def count_tokens(self, text: str) -> int:
        """Count (approximate) tokens in text"""
        return len(self.encoding.encode(text))

    def stream_with_stats(
        self,
        messages: List[Dict[str, str]],
        **kwargs
    ) -> Tuple[Generator[str, None, None], StreamingStats]:
        """Stream with token counting and statistics"""
        stats = StreamingStats()
        start_time = time.time()
        first_token_time = None
        accumulated_content = ""

        def token_generator():
            nonlocal first_token_time, accumulated_content
            stream = client.chat.completions.create(
                messages=messages,
                stream=True,
                **kwargs
            )
            for chunk in stream:
                if chunk.choices[0].delta.content is not None:
                    content = chunk.choices[0].delta.content
                    accumulated_content += content

                    # Record first token time
                    if first_token_time is None:
                        first_token_time = time.time()
                        stats.time_to_first_token = first_token_time - start_time

                    stats.chunks_received += 1
                    yield content

            # Calculate final statistics once the stream is exhausted
            end_time = time.time()
            stats.total_time = end_time - start_time
            stats.total_tokens = self.count_tokens(accumulated_content)
            if stats.total_time > 0:
                stats.tokens_per_second = stats.total_tokens / stats.total_time

        # Note: the stats fields are only final after the generator is consumed
        return token_generator(), stats

# Usage
counter = StreamingTokenCounter()
messages = [
    {"role": "user", "content": "Write a detailed explanation of machine learning algorithms."}
]

print("🤖 DeepSeek: ", end="", flush=True)
token_stream, stats = counter.stream_with_stats(
    messages,
    model="deepseek-chat",
    max_tokens=800
)

for content in token_stream:
    print(content, end="", flush=True)

print(f"\n\n📊 Streaming Statistics:")
print(f"   Total tokens: {stats.total_tokens}")
print(f"   Chunks received: {stats.chunks_received}")
print(f"   Time to first token: {stats.time_to_first_token:.2f}s")
print(f"   Total time: {stats.total_time:.2f}s")
print(f"   Tokens per second: {stats.tokens_per_second:.1f}")
```
### Streaming with Content Processing

```python
import re
from typing import Any, Callable, Dict, Generator, List

class StreamingProcessor:
    """Process streaming content in real-time"""

    def __init__(self):
        self.processors = []
        self.accumulated_content = ""

    def add_processor(self, processor: Callable[[str], Any]):
        """Add a content processor"""
        self.processors.append(processor)

    def stream_with_processing(
        self,
        messages: List[Dict[str, str]],
        **kwargs
    ) -> Generator[Dict[str, Any], None, None]:
        """Stream with real-time content processing"""
        stream = client.chat.completions.create(
            messages=messages,
            stream=True,
            **kwargs
        )

        for chunk in stream:
            if chunk.choices[0].delta.content is not None:
                content = chunk.choices[0].delta.content
                self.accumulated_content += content

                # Run every registered processor over the accumulated text
                processed_data = {
                    "content": content,
                    "accumulated": self.accumulated_content,
                    "processed": {}
                }
                for i, processor in enumerate(self.processors):
                    try:
                        result = processor(self.accumulated_content)
                        processed_data["processed"][f"processor_{i}"] = result
                    except Exception as e:
                        processed_data["processed"][f"processor_{i}"] = f"Error: {e}"

                yield processed_data

# Example processors
def extract_code_blocks(content: str) -> List[str]:
    """Extract code blocks from content"""
    pattern = r'```[\w]*\n(.*?)\n```'
    return re.findall(pattern, content, re.DOTALL)

def count_sentences(content: str) -> int:
    """Count sentences in content"""
    sentences = re.split(r'[.!?]+', content)
    return len([s for s in sentences if s.strip()])

def extract_keywords(content: str) -> List[str]:
    """Extract potential keywords (simple implementation)"""
    words = re.findall(r'\b[A-Z][a-z]+\b', content)
    return list(set(words))

# Usage
processor = StreamingProcessor()
processor.add_processor(extract_code_blocks)
processor.add_processor(count_sentences)
processor.add_processor(extract_keywords)

messages = [
    {"role": "user", "content": "Write a Python function to calculate fibonacci numbers with examples."}
]

print("🤖 Streaming with real-time processing:\n")
for data in processor.stream_with_processing(
    messages,
    model="deepseek-chat",
    max_tokens=600
):
    # Display content
    print(data["content"], end="", flush=True)

    # Show processing results roughly every 100 accumulated characters
    if len(data["accumulated"]) % 100 < 10:
        print(f"\n[Processing: {len(data['processed']['processor_0'])} code blocks, "
              f"{data['processed']['processor_1']} sentences, "
              f"{len(data['processed']['processor_2'])} keywords]", end="")

print("\n")
```
## Streaming for Different Use Cases

### Chat Interface Streaming

```python
import asyncio
from datetime import datetime
from typing import Any, AsyncGenerator, Dict, List, Optional

class StreamingChatInterface:
    """Streaming chat interface implementation"""

    def __init__(self, api_key: str):
        self.client = OpenAI(
            api_key=api_key,
            base_url="https://api.deepseek.com/v1"
        )
        self.conversation_history = []

    async def stream_chat_response(
        self,
        user_message: str,
        system_prompt: Optional[str] = None
    ) -> AsyncGenerator[Dict[str, Any], None]:
        """Stream chat response with conversation context.

        Note: this uses the sync client inside an async generator, which
        blocks the event loop while waiting for chunks. For production,
        prefer an async client (see the AsyncOpenAI sketch later in this
        guide).
        """
        # Add user message to history
        self.conversation_history.append({
            "role": "user",
            "content": user_message,
            "timestamp": datetime.now().isoformat()
        })

        # Prepare messages for API
        messages = []
        if system_prompt:
            messages.append({"role": "system", "content": system_prompt})

        # Add conversation history (limit to last 10 messages for context)
        for msg in self.conversation_history[-10:]:
            messages.append({
                "role": msg["role"],
                "content": msg["content"]
            })

        # Stream response
        accumulated_response = ""
        try:
            stream = self.client.chat.completions.create(
                model="deepseek-chat",
                messages=messages,
                stream=True,
                max_tokens=1000,
                temperature=0.7
            )

            for chunk in stream:
                if chunk.choices[0].delta.content is not None:
                    content = chunk.choices[0].delta.content
                    accumulated_response += content
                    yield {
                        "type": "content",
                        "content": content,
                        "accumulated": accumulated_response,
                        "timestamp": datetime.now().isoformat()
                    }

                # Check for completion
                if chunk.choices[0].finish_reason is not None:
                    yield {
                        "type": "complete",
                        "finish_reason": chunk.choices[0].finish_reason,
                        "total_content": accumulated_response,
                        "timestamp": datetime.now().isoformat()
                    }
                    break

            # Add assistant response to history
            self.conversation_history.append({
                "role": "assistant",
                "content": accumulated_response,
                "timestamp": datetime.now().isoformat()
            })

        except Exception as e:
            yield {
                "type": "error",
                "error": str(e),
                "timestamp": datetime.now().isoformat()
            }

    def get_conversation_history(self) -> List[Dict[str, Any]]:
        """Get conversation history"""
        return self.conversation_history.copy()

    def clear_history(self):
        """Clear conversation history"""
        self.conversation_history.clear()

# Usage example
async def chat_demo():
    """Demo chat interface"""
    chat = StreamingChatInterface("sk-your-api-key")
    system_prompt = "You are a helpful AI assistant. Be concise but informative."

    questions = [
        "What is artificial intelligence?",
        "How does machine learning work?",
        "Can you give me a simple example?"
    ]

    for question in questions:
        print(f"\n👤 User: {question}")
        print("🤖 DeepSeek: ", end="", flush=True)

        async for response in chat.stream_chat_response(question, system_prompt):
            if response["type"] == "content":
                print(response["content"], end="", flush=True)
            elif response["type"] == "complete":
                print(f"\n   [Completed: {response['finish_reason']}]")
            elif response["type"] == "error":
                print(f"\n   [Error: {response['error']}]")

        await asyncio.sleep(1)  # Brief pause between questions

# Run the demo
# asyncio.run(chat_demo())
```
### Document Generation Streaming

```python
class StreamingDocumentGenerator:
    """Generate documents with streaming progress"""

    def __init__(self, api_key: str):
        self.client = OpenAI(
            api_key=api_key,
            base_url="https://api.deepseek.com/v1"
        )

    def generate_document_sections(
        self,
        document_type: str,
        topic: str,
        sections: List[str]
    ) -> Generator[Dict[str, Any], None, None]:
        """Generate document sections with streaming"""
        for i, section in enumerate(sections):
            print(f"\n📝 Generating section {i+1}/{len(sections)}: {section}")

            prompt = (
                f"Write a detailed {section} section for a {document_type} "
                f"about {topic}. Make it comprehensive and well-structured."
            )
            messages = [
                {"role": "system", "content": f"You are writing a {document_type} about {topic}."},
                {"role": "user", "content": prompt}
            ]

            section_content = ""
            try:
                stream = self.client.chat.completions.create(
                    model="deepseek-chat",
                    messages=messages,
                    stream=True,
                    max_tokens=800,
                    temperature=0.3
                )

                for chunk in stream:
                    if chunk.choices[0].delta.content is not None:
                        content = chunk.choices[0].delta.content
                        section_content += content
                        yield {
                            "section_index": i,
                            "section_name": section,
                            "content_chunk": content,
                            "accumulated_content": section_content,
                            "progress": (i + 1) / len(sections) * 100
                        }

                yield {
                    "section_index": i,
                    "section_name": section,
                    "content_chunk": "",
                    "accumulated_content": section_content,
                    "section_complete": True,
                    "progress": (i + 1) / len(sections) * 100
                }

            except Exception as e:
                yield {
                    "section_index": i,
                    "section_name": section,
                    "error": str(e),
                    "progress": (i + 1) / len(sections) * 100
                }

# Usage
generator = StreamingDocumentGenerator("sk-your-api-key")

document_sections = [
    "Introduction",
    "Background and Context",
    "Main Analysis",
    "Key Findings",
    "Recommendations",
    "Conclusion"
]

print("📄 Generating Technical Report: 'AI in Healthcare'")
print("=" * 50)

complete_document = {}
for update in generator.generate_document_sections(
    "technical report",
    "AI in Healthcare",
    document_sections
):
    if "content_chunk" in update and update["content_chunk"]:
        print(update["content_chunk"], end="", flush=True)

    if update.get("section_complete"):
        section_name = update["section_name"]
        complete_document[section_name] = update["accumulated_content"]
        print(f"\n\n✅ Section '{section_name}' completed")
        print(f"📊 Progress: {update['progress']:.1f}%")

    if update.get("error"):
        print(f"\n❌ Error in section '{update['section_name']}': {update['error']}")

print("\n🎉 Document generation complete!")
print(f"📋 Generated {len(complete_document)} sections")
```
### Code Generation Streaming

```python
class StreamingCodeGenerator:
    """Generate code with streaming and syntax detection"""

    def __init__(self, api_key: str):
        self.client = OpenAI(
            api_key=api_key,
            base_url="https://api.deepseek.com/v1"
        )
        self.current_code_block = ""
        self.in_code_block = False

    def stream_code_generation(
        self,
        task_description: str,
        language: str = "python",
        include_tests: bool = True
    ) -> Generator[Dict[str, Any], None, None]:
        """Stream code generation with syntax detection"""
        prompt = (
            f"Write {language} code for the following task: {task_description}\n\n"
            "Requirements:\n"
            "- Include clear comments\n"
            "- Follow best practices\n"
            "- Make the code production-ready"
        )
        if include_tests:
            prompt += "\n- Include unit tests"

        messages = [
            {"role": "system", "content": f"You are an expert {language} developer."},
            {"role": "user", "content": prompt}
        ]

        accumulated_content = ""
        stream = self.client.chat.completions.create(
            model="deepseek-coder",  # Use code-specific model
            messages=messages,
            stream=True,
            max_tokens=1500,
            temperature=0.1  # Lower temperature for code
        )

        for chunk in stream:
            if chunk.choices[0].delta.content is not None:
                content = chunk.choices[0].delta.content
                accumulated_content += content

                # Detect completed code blocks in the accumulated text
                code_blocks = self._extract_code_blocks(accumulated_content)

                yield {
                    "content_chunk": content,
                    "accumulated_content": accumulated_content,
                    "code_blocks": code_blocks,
                    "language": language,
                    "task": task_description
                }

    def _extract_code_blocks(self, content: str) -> List[Dict[str, str]]:
        """Extract code blocks from content"""
        pattern = r'```(\w+)?\n(.*?)\n```'
        matches = re.findall(pattern, content, re.DOTALL)

        code_blocks = []
        for lang, code in matches:
            code_blocks.append({
                "language": lang or "text",
                "code": code.strip()
            })
        return code_blocks

# Usage
code_generator = StreamingCodeGenerator("sk-your-api-key")

task = "Create a Python class for managing a simple todo list with add, remove, and list functionality"

print(f"💻 Generating code for: {task}")
print("=" * 60)

all_code_blocks = []
for update in code_generator.stream_code_generation(
    task,
    language="python",
    include_tests=True
):
    # Display streaming content
    print(update["content_chunk"], end="", flush=True)

    # Track code blocks
    if update["code_blocks"]:
        all_code_blocks = update["code_blocks"]

print(f"\n\n🎉 Code generation complete!")
print(f"📝 Generated {len(all_code_blocks)} code blocks")

# Display extracted code blocks
for i, block in enumerate(all_code_blocks, 1):
    print(f"\n--- Code Block {i} ({block['language']}) ---")
    print(block['code'])
```
## Streaming Performance Optimization

### Buffered Streaming

```python
import threading
import queue

class BufferedStreamer:
    """Buffer streaming content for smoother display"""

    def __init__(self, buffer_size: int = 10, flush_interval: float = 0.1):
        self.buffer_size = buffer_size
        self.flush_interval = flush_interval
        self.buffer = []
        self.buffer_lock = threading.Lock()  # Buffer is shared by two threads
        self.output_queue = queue.Queue()
        self.stop_event = threading.Event()

    def start_buffering(self, stream_generator: Generator[str, None, None]):
        """Start buffering stream content"""

        def buffer_worker():
            """Worker thread that fills the buffer from the stream"""
            for content in stream_generator:
                with self.buffer_lock:
                    self.buffer.append(content)
                    buffer_full = len(self.buffer) >= self.buffer_size

                # Flush buffer when full
                if buffer_full:
                    self._flush_buffer()

                if self.stop_event.is_set():
                    break

            # Flush remaining content
            self._flush_buffer()
            self.output_queue.put(None)  # Signal completion

        def flush_worker():
            """Worker thread for periodic flushing"""
            while not self.stop_event.is_set():
                time.sleep(self.flush_interval)
                self._flush_buffer()

        # Start worker threads
        buffer_thread = threading.Thread(target=buffer_worker)
        flush_thread = threading.Thread(target=flush_worker)
        buffer_thread.start()
        flush_thread.start()

        return buffer_thread, flush_thread

    def _flush_buffer(self):
        """Flush the buffer to the output queue (thread-safe)"""
        with self.buffer_lock:
            if not self.buffer:
                return
            content = "".join(self.buffer)
            self.buffer.clear()
        self.output_queue.put(content)

    def get_buffered_content(self) -> Generator[str, None, None]:
        """Yield buffered content until the completion signal arrives"""
        while True:
            try:
                content = self.output_queue.get(timeout=1.0)
                if content is None:  # Completion signal
                    break
                yield content
            except queue.Empty:
                continue

    def stop(self):
        """Stop buffering"""
        self.stop_event.set()

# Usage
def create_stream():
    """Create a sample stream"""
    messages = [
        {"role": "user", "content": "Write a long essay about the future of artificial intelligence."}
    ]
    stream = client.chat.completions.create(
        model="deepseek-chat",
        messages=messages,
        stream=True,
        max_tokens=1000
    )
    for chunk in stream:
        if chunk.choices[0].delta.content is not None:
            yield chunk.choices[0].delta.content

# Create buffered streamer
buffered_streamer = BufferedStreamer(buffer_size=5, flush_interval=0.2)

print("🔄 Starting buffered streaming...")

# Start buffering
buffer_thread, flush_thread = buffered_streamer.start_buffering(create_stream())

print("🤖 DeepSeek: ", end="", flush=True)

# Display buffered content
try:
    for content in buffered_streamer.get_buffered_content():
        print(content, end="", flush=True)
        time.sleep(0.05)  # Smooth display
finally:
    buffered_streamer.stop()
    buffer_thread.join()
    flush_thread.join()

print("\n✅ Buffered streaming complete!")
```
### Streaming with Backpressure

```python
import asyncio
from asyncio import Queue

class BackpressureStreamer:
    """Handle streaming with backpressure control"""

    def __init__(self, max_queue_size: int = 100):
        self.max_queue_size = max_queue_size
        self.content_queue = Queue(maxsize=max_queue_size)
        self.processing_speed = 200.0  # Characters the consumer can handle per second

    async def stream_with_backpressure(
        self,
        messages: List[Dict[str, str]],
        **kwargs
    ) -> AsyncGenerator[str, None]:
        """Stream with backpressure control"""
        # Start streaming task
        stream_task = asyncio.create_task(
            self._stream_producer(messages, **kwargs)
        )

        # Consume from the queue
        try:
            while True:
                try:
                    # Get content with timeout
                    content = await asyncio.wait_for(
                        self.content_queue.get(),
                        timeout=1.0
                    )
                    if content is None:  # End signal
                        break

                    yield content

                    # Simulate processing time (backpressure)
                    processing_time = len(content) / self.processing_speed
                    await asyncio.sleep(processing_time)

                except asyncio.TimeoutError:
                    # Check if the producer is still active
                    if stream_task.done():
                        break
                    continue
        finally:
            if not stream_task.done():
                stream_task.cancel()
                try:
                    await stream_task
                except asyncio.CancelledError:
                    pass

    async def _stream_producer(self, messages: List[Dict[str, str]], **kwargs):
        """Producer coroutine for streaming"""
        try:
            # Note: this is a simplified wrapper around the sync client;
            # in practice, you'd use an async HTTP client (see the
            # AsyncOpenAI sketch below). The blocking calls run in a
            # worker thread so the event loop stays responsive.
            loop = asyncio.get_event_loop()
            stream = await loop.run_in_executor(
                None,
                lambda: client.chat.completions.create(
                    messages=messages,
                    stream=True,
                    **kwargs
                )
            )
            iterator = iter(stream)

            def next_content():
                """Advance the sync iterator to the next content chunk"""
                for chunk in iterator:
                    if chunk.choices[0].delta.content is not None:
                        return chunk.choices[0].delta.content
                return None  # Stream exhausted

            while True:
                content = await loop.run_in_executor(None, next_content)
                if content is None:
                    break
                # Add to queue (suspends the producer if the queue is full)
                await self.content_queue.put(content)

            # Signal completion
            await self.content_queue.put(None)

        except Exception as e:
            # Signal error
            await self.content_queue.put(f"Error: {e}")
            await self.content_queue.put(None)

# Usage
async def backpressure_demo():
    """Demo backpressure streaming"""
    streamer = BackpressureStreamer(max_queue_size=50)

    messages = [
        {"role": "user", "content": "Write a detailed technical explanation of neural networks."}
    ]

    print("🔄 Starting backpressure streaming...")
    print("🤖 DeepSeek: ", end="", flush=True)

    async for content in streamer.stream_with_backpressure(
        messages,
        model="deepseek-chat",
        max_tokens=800
    ):
        if content.startswith("Error:"):
            print(f"\n❌ {content}")
            break
        print(content, end="", flush=True)

    print("\n✅ Backpressure streaming complete!")

# Run the demo
# asyncio.run(backpressure_demo())
```
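For a fully non-blocking pipeline you can skip the executor wrapper entirely: the `openai` package also ships an `AsyncOpenAI` client whose streams are async iterators. A minimal sketch against the same endpoint:

```python
import asyncio
from openai import AsyncOpenAI

async_client = AsyncOpenAI(
    api_key="sk-your-api-key",
    base_url="https://api.deepseek.com/v1"
)

async def async_stream_demo():
    """Stream chunks without blocking the event loop."""
    stream = await async_client.chat.completions.create(
        model="deepseek-chat",
        messages=[{"role": "user", "content": "Explain backpressure in one paragraph."}],
        stream=True,
        max_tokens=300
    )
    async for chunk in stream:
        if chunk.choices[0].delta.content is not None:
            print(chunk.choices[0].delta.content, end="", flush=True)
    print()

# asyncio.run(async_stream_demo())
```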
## Error Handling and Recovery

### Robust Streaming with Recovery

```python
class RobustStreamer:
    """Robust streaming with error recovery"""

    def __init__(self, api_key: str, max_retries: int = 3):
        self.client = OpenAI(
            api_key=api_key,
            base_url="https://api.deepseek.com/v1"
        )
        self.max_retries = max_retries

    def stream_with_recovery(
        self,
        messages: List[Dict[str, str]],
        **kwargs
    ) -> Generator[Dict[str, Any], None, None]:
        """Stream with automatic error recovery"""
        retry_count = 0
        accumulated_content = ""

        while retry_count <= self.max_retries:
            try:
                # If retrying, ask the model to continue from the previous content
                if retry_count > 0 and accumulated_content:
                    recovery_message = {
                        "role": "system",
                        "content": f"Continue from where you left off. Previous content: ...{accumulated_content[-100:]}"
                    }
                    recovery_messages = [recovery_message] + messages
                else:
                    recovery_messages = messages

                stream = self.client.chat.completions.create(
                    messages=recovery_messages,
                    stream=True,
                    **kwargs
                )

                chunk_count = 0
                for chunk in stream:
                    if chunk.choices[0].delta.content is not None:
                        content = chunk.choices[0].delta.content
                        accumulated_content += content
                        chunk_count += 1

                        yield {
                            "type": "content",
                            "content": content,
                            "accumulated": accumulated_content,
                            "chunk_count": chunk_count,
                            "retry_count": retry_count
                        }

                    if chunk.choices[0].finish_reason is not None:
                        yield {
                            "type": "complete",
                            "finish_reason": chunk.choices[0].finish_reason,
                            "total_content": accumulated_content,
                            "total_chunks": chunk_count,
                            "retry_count": retry_count
                        }
                        break

                return  # Success, exit retry loop

            except Exception as e:
                retry_count += 1
                yield {
                    "type": "error",
                    "error": str(e),
                    "retry_count": retry_count,
                    "max_retries": self.max_retries,
                    "accumulated_content": accumulated_content
                }

                if retry_count <= self.max_retries:
                    wait_time = 2 ** retry_count  # Exponential backoff
                    yield {
                        "type": "retry",
                        "wait_time": wait_time,
                        "retry_count": retry_count,
                        "max_retries": self.max_retries
                    }
                    time.sleep(wait_time)
                else:
                    yield {
                        "type": "failed",
                        "error": "Max retries exceeded",
                        "final_content": accumulated_content
                    }
                    return

# Usage
robust_streamer = RobustStreamer("sk-your-api-key", max_retries=3)

messages = [
    {"role": "user", "content": "Write a comprehensive guide to Python web development."}
]

print("🛡️ Starting robust streaming with recovery...")

for update in robust_streamer.stream_with_recovery(
    messages,
    model="deepseek-chat",
    max_tokens=1000
):
    if update["type"] == "content":
        print(update["content"], end="", flush=True)
    elif update["type"] == "error":
        print(f"\n⚠️ Error (attempt {update['retry_count']}): {update['error']}")
    elif update["type"] == "retry":
        print(f"\n🔄 Retrying in {update['wait_time']} seconds... (attempt {update['retry_count']}/{update['max_retries']})")
    elif update["type"] == "complete":
        print(f"\n✅ Streaming completed successfully!")
        print(f"   Total chunks: {update['total_chunks']}")
        print(f"   Retry count: {update['retry_count']}")
    elif update["type"] == "failed":
        print(f"\n❌ Streaming failed after {update['retry_count']} attempts")
        if update["final_content"]:
            print(f"   Partial content received: {len(update['final_content'])} characters")
```
## Best Practices

### Streaming Optimization Tips

```python
class StreamingBestPractices:
    """Best practices for streaming implementation"""

    @staticmethod
    def optimize_streaming_parameters() -> Dict[str, Any]:
        """Recommended parameters for optimal streaming"""
        return {
            "stream": True,
            "max_tokens": 1000,        # Reasonable limit for streaming
            "temperature": 0.7,        # Balanced creativity
            "top_p": 0.9,              # Focused but diverse
            "frequency_penalty": 0.1,  # Reduce repetition
            "presence_penalty": 0.1    # Encourage variety
        }

    @staticmethod
    def streaming_checklist() -> List[str]:
        """Checklist for streaming implementation"""
        return [
            "✅ Enable streaming with stream=True",
            "✅ Handle partial content gracefully",
            "✅ Implement error handling and retries",
            "✅ Use appropriate buffer sizes",
            "✅ Monitor token usage during streaming",
            "✅ Implement timeout handling",
            "✅ Provide user feedback during streaming",
            "✅ Handle network interruptions",
            "✅ Optimize for perceived performance",
            "✅ Test with various content lengths",
            "✅ Implement proper cleanup on errors",
            "✅ Consider backpressure for slow consumers"
        ]

    @staticmethod
    def common_pitfalls() -> List[str]:
        """Common streaming pitfalls to avoid"""
        return [
            "❌ Not handling None content in chunks",
            "❌ Blocking the main thread during streaming",
            "❌ Not implementing proper error recovery",
            "❌ Ignoring finish_reason signals",
            "❌ Not providing user feedback during delays",
            "❌ Accumulating too much content in memory",
            "❌ Not handling network timeouts",
            "❌ Missing proper cleanup on cancellation",
            "❌ Not testing with poor network conditions",
            "❌ Overwhelming users with too-fast streaming"
        ]

# Display best practices
practices = StreamingBestPractices()

print("🎯 Streaming Best Practices:")
for tip in practices.streaming_checklist():
    print(f"   {tip}")

print("\n⚠️ Common Pitfalls to Avoid:")
for pitfall in practices.common_pitfalls():
    print(f"   {pitfall}")

print("\n⚙️ Recommended Parameters:")
params = practices.optimize_streaming_parameters()
for key, value in params.items():
    print(f"   {key}: {value}")
```