Multi-Session AI Context: The Complete Guide to Building AI Agents That Remember Across Conversations
Building AI agents that maintain context across multiple sessions is one of the most challenging problems in production AI development. While single-session chatbots have become commoditized, multi-session AI context—the ability for an agent to remember, reason about, and act upon information from previous interactions—remains the frontier that separates toy demos from production-grade systems.
This comprehensive guide covers everything you need to know about implementing multi-session context for AI agents: the architecture patterns, storage strategies, synchronization mechanisms, and practical code examples that will help you build AI systems that truly remember.
What Is Multi-Session AI Context?
Multi-session AI context refers to an AI agent's ability to maintain and utilize information across separate conversation sessions. Unlike single-session memory, which exists only within the current conversation window, multi-session context persists beyond session boundaries and can be retrieved in future interactions.
Consider the difference:
Single-session context: A user asks a chatbot about their order status. The chatbot checks the database, provides the status, and forgets everything when the session ends. Tomorrow, the user must re-explain everything.
Multi-session context: The same user returns tomorrow. The AI remembers their previous inquiry, knows their order history, recalls their communication preferences, and can proactively provide an update without being asked.
The distinction might seem subtle, but it fundamentally changes what AI agents can accomplish. Multi-session context enables:
- Continuous relationship building with users over time
- Accumulation of relevant knowledge about user preferences
- Long-running task completion that spans multiple interactions
- Contextual awareness that improves with every conversation
- True personalization based on historical interactions
Why Single-Session Memory Falls Short
Most AI applications today operate with what we might call "goldfish memory"—they remember everything brilliantly within a conversation, then promptly forget it all when the session ends. This limitation manifests in several painful ways.
The Monday Morning Amnesia Problem
You spend Friday afternoon working with an AI agent on a complex project. You establish requirements, make design decisions, create a roadmap. Monday morning, you return to continue the work, and the agent acts as if nothing happened. Worse, it might confidently claim that previous work was "completed" without any ability to verify this against actual state.
This isn't just inconvenient—it's actively harmful. The agent's false confidence can lead to:
- Implementation drift where new work contradicts previous decisions
- Wasted time re-explaining context that should already be known
- Quality degradation as the agent makes assumptions instead of checking documented history
- Erosion of trust as users learn they cannot rely on continuity
Context Window Limitations
Even within a session, LLM context windows impose hard limits. Claude's context window is generous at 200K tokens, but complex projects can easily exceed this. Code reviews, documentation analysis, multi-file refactoring—these tasks frequently push beyond what any context window can hold.
Without multi-session context architecture, developers resort to awkward workarounds: copy-pasting previous outputs, maintaining manual summaries, or simply accepting that context will be lost. None of these scale.
The Personalization Gap
Modern users expect personalization. They expect services to know their preferences, remember their history, and adapt to their needs over time. AI agents operating in single-session mode cannot deliver this. Every interaction starts from zero, requiring users to re-establish context that any good assistant should already have.
Architecture Patterns for Multi-Session Context
Building effective multi-session context requires thoughtful architecture. The right approach depends on your use case, scale requirements, and latency tolerance. Let's examine the major patterns.
Pattern 1: Session-State Storage with Retrieval
The most straightforward pattern stores complete session state and retrieves relevant portions for new sessions.
```python
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional, Dict, Any
import json


@dataclass
class SessionState:
    session_id: str
    user_id: str
    created_at: datetime
    updated_at: datetime
    messages: List[Dict[str, Any]]
    metadata: Dict[str, Any]
    summary: Optional[str] = None


class MultiSessionContextManager:
    def __init__(self, storage_backend):
        self.storage = storage_backend

    async def save_session(self, state: SessionState):
        """Persist session state for future retrieval."""
        await self.storage.upsert(
            table="sessions",
            data={
                "session_id": state.session_id,
                "user_id": state.user_id,
                "created_at": state.created_at.isoformat(),
                "updated_at": datetime.utcnow().isoformat(),
                "messages": json.dumps(state.messages),
                "metadata": json.dumps(state.metadata),
                "summary": state.summary
            }
        )

    async def get_relevant_sessions(
        self,
        user_id: str,
        query: Optional[str] = None,
        limit: int = 5
    ) -> List[SessionState]:
        """Retrieve sessions relevant to the current context."""
        if query:
            # Semantic search across session summaries
            sessions = await self.storage.vector_search(
                table="sessions",
                query=query,
                filter={"user_id": user_id},
                limit=limit
            )
        else:
            # Fall back to recency-based retrieval
            sessions = await self.storage.query(
                table="sessions",
                filter={"user_id": user_id},
                order_by="updated_at DESC",
                limit=limit
            )
        return [self._deserialize_session(s) for s in sessions]

    async def build_context_for_session(
        self,
        user_id: str,
        current_input: str,
        max_tokens: int = 4000
    ) -> str:
        """Build a context string from relevant historical sessions."""
        relevant_sessions = await self.get_relevant_sessions(
            user_id=user_id,
            query=current_input,
            limit=5
        )
        context_parts = []
        total_tokens = 0
        for session in relevant_sessions:
            session_context = self._format_session_context(session)
            session_tokens = self._estimate_tokens(session_context)
            if total_tokens + session_tokens > max_tokens:
                break
            context_parts.append(session_context)
            total_tokens += session_tokens
        return "\n\n---\n\n".join(context_parts)

    def _format_session_context(self, session: SessionState) -> str:
        """Format a session for inclusion in context."""
        date = session.updated_at.strftime("%Y-%m-%d")
        if session.summary:
            return f"Previous conversation ({date}):\n{session.summary}"
        # Fall back to a condensed message history
        recent_messages = session.messages[-10:]
        formatted = []
        for msg in recent_messages:
            role = msg.get("role", "unknown")
            content = msg.get("content", "")[:500]
            formatted.append(f"{role}: {content}")
        return f"Previous conversation ({date}):\n" + "\n".join(formatted)

    def _deserialize_session(self, row: Dict[str, Any]) -> SessionState:
        """Rehydrate a SessionState from a storage row."""
        return SessionState(
            session_id=row["session_id"],
            user_id=row["user_id"],
            created_at=datetime.fromisoformat(row["created_at"]),
            updated_at=datetime.fromisoformat(row["updated_at"]),
            messages=json.loads(row["messages"]),
            metadata=json.loads(row["metadata"]),
            summary=row.get("summary")
        )

    def _estimate_tokens(self, text: str) -> int:
        """Rough token estimate (~4 characters per token)."""
        return len(text) // 4
```
This pattern works well for applications where:
- Sessions are relatively self-contained
- Historical context can be effectively summarized
- Retrieval latency is acceptable (typically 50-200ms)
- Storage costs are manageable
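The greedy token-budgeting loop inside `build_context_for_session` can be sketched standalone. The `pack_context` helper and its chars/4 estimate are illustrative assumptions, not part of the pattern's API:

```python
def pack_context(summaries, max_tokens):
    """Greedily add summaries, most relevant first, until the budget is exhausted."""
    picked, total = [], 0
    for summary in summaries:
        tokens = len(summary) // 4  # rough chars-per-token estimate
        if total + tokens > max_tokens:
            break
        picked.append(summary)
        total += tokens
    return picked

# Three ~100-token summaries against a 250-token budget: only two fit.
summaries = ["a" * 400, "b" * 400, "c" * 400]
packed = pack_context(summaries, max_tokens=250)
assert len(packed) == 2
```

Because the loop breaks on the first overflow rather than skipping oversized items, ordering summaries by relevance matters more than packing efficiency.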
Pattern 2: Hierarchical Memory with Compaction
For long-running agent systems, a hierarchical approach manages context more efficiently. This pattern maintains multiple "levels" of memory with automatic compaction.
```python
from enum import Enum
from typing import List, Dict, Any
import asyncio
import json


class MemoryLevel(Enum):
    WORKING = "working"        # Current session, full detail
    SHORT_TERM = "short_term"  # Recent sessions, summarized
    LONG_TERM = "long_term"    # Historical, highly compressed
    SEMANTIC = "semantic"      # Extracted facts and relationships


class HierarchicalMemoryManager:
    def __init__(self, storage, llm_client):
        self.storage = storage
        self.llm = llm_client

    async def process_session_end(self, session_id: str, user_id: str):
        """Process session completion with hierarchical storage."""
        # Get the full session
        session = await self.storage.get_session(session_id)
        # Generate a summary for short-term memory
        summary = await self._generate_summary(session.messages)
        # Extract semantic facts for long-term storage
        facts = await self._extract_facts(session.messages)
        # Store at the appropriate levels
        await asyncio.gather(
            self._store_short_term(user_id, session_id, summary),
            self._store_semantic_facts(user_id, facts),
            self._maybe_compact_short_term(user_id)
        )

    async def _generate_summary(self, messages: List[Dict]) -> str:
        """Generate a concise summary of the conversation."""
        prompt = f"""Summarize this conversation in 2-3 sentences, focusing on:
1. Key decisions or conclusions reached
2. Important information shared by the user
3. Any commitments or next steps

Conversation:
{self._format_messages(messages)}

Summary:"""
        response = await self.llm.complete(prompt, max_tokens=200)
        return response.strip()

    async def _extract_facts(self, messages: List[Dict]) -> List[Dict]:
        """Extract durable facts from the conversation."""
        prompt = f"""Extract specific facts from this conversation that would be useful to remember long-term.
Focus on: user preferences, stated facts about themselves, decisions, and commitments.

Conversation:
{self._format_messages(messages)}

Return as JSON array of objects with 'fact', 'category', and 'confidence' fields.
Only include facts with high confidence. Example:
[{{"fact": "User prefers dark mode", "category": "preference", "confidence": 0.95}}]

Facts:"""
        response = await self.llm.complete(prompt, max_tokens=500)
        try:
            return json.loads(response)
        except json.JSONDecodeError:
            return []

    async def _maybe_compact_short_term(self, user_id: str):
        """Compact short-term memory if it exceeds the threshold."""
        short_term = await self.storage.get_short_term_memories(user_id)
        if len(short_term) > 20:  # Threshold for compaction
            # Summarize the oldest memories into long-term storage
            oldest = short_term[:10]
            combined_summary = await self._generate_combined_summary(oldest)
            await self.storage.store_long_term_memory(
                user_id=user_id,
                content=combined_summary,
                source_count=len(oldest)
            )
            # Remove compacted memories from short-term storage
            for memory in oldest:
                await self.storage.delete_short_term_memory(memory.id)

    async def _store_short_term(self, user_id: str, session_id: str, summary: str):
        await self.storage.store_short_term_memory(user_id, session_id, summary)

    async def _store_semantic_facts(self, user_id: str, facts: List[Dict]):
        await self.storage.store_semantic_facts(user_id, facts)

    async def _generate_combined_summary(self, memories: List) -> str:
        """Combine several short-term memories into one archival summary."""
        joined = "\n".join(f"- {m.summary}" for m in memories)
        prompt = f"Combine these conversation summaries into one coherent summary:\n{joined}\n\nCombined summary:"
        return await self.llm.complete(prompt, max_tokens=300)

    def _format_messages(self, messages: List[Dict]) -> str:
        return "\n".join(
            f"{m.get('role', 'unknown')}: {m.get('content', '')}" for m in messages
        )

    async def get_context(self, user_id: str, query: str) -> Dict[str, Any]:
        """Retrieve context from all memory levels."""
        results = await asyncio.gather(
            self.storage.get_short_term_memories(user_id, limit=5),
            self.storage.search_long_term_memories(user_id, query, limit=3),
            self.storage.search_semantic_facts(user_id, query, limit=10)
        )
        return {
            "recent_sessions": results[0],
            "historical_context": results[1],
            "known_facts": results[2]
        }
```
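The compaction trigger reduces to a simple list operation: once short-term memory crosses a threshold, the oldest batch is split off for archival. A sketch using the same thresholds as above (20 memories, oldest 10 compacted), with integers standing in for memory records:

```python
def maybe_compact(short_term, threshold=20, batch=10):
    """Return (remaining_memories, memories_to_archive)."""
    if len(short_term) <= threshold:
        return short_term, []
    # Oldest entries (front of the list) get summarized into long-term storage
    return short_term[batch:], short_term[:batch]

remaining, archived = maybe_compact(list(range(25)))
assert len(archived) == 10 and len(remaining) == 15
```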
Pattern 3: Event-Sourced Context
For systems requiring auditability or complex state reconstruction, event sourcing provides a powerful foundation.
```python
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, List, Optional
from abc import ABC, abstractmethod


@dataclass
class ContextEvent(ABC):
    event_id: str
    user_id: str
    timestamp: datetime
    session_id: str

    @abstractmethod
    def apply(self, state: 'UserContextState') -> 'UserContextState':
        pass


@dataclass
class MessageAddedEvent(ContextEvent):
    role: str
    content: str

    def apply(self, state: 'UserContextState') -> 'UserContextState':
        state.messages.append({
            "role": self.role,
            "content": self.content,
            "timestamp": self.timestamp.isoformat()
        })
        return state


@dataclass
class FactLearnedEvent(ContextEvent):
    fact: str
    category: str
    confidence: float
    source_message_id: Optional[str] = None

    def apply(self, state: 'UserContextState') -> 'UserContextState':
        state.facts.append({
            "fact": self.fact,
            "category": self.category,
            "confidence": self.confidence,
            "learned_at": self.timestamp.isoformat()
        })
        return state


@dataclass
class PreferenceUpdatedEvent(ContextEvent):
    key: str
    value: Any
    previous_value: Optional[Any] = None

    def apply(self, state: 'UserContextState') -> 'UserContextState':
        state.preferences[self.key] = {
            "value": self.value,
            "updated_at": self.timestamp.isoformat()
        }
        return state


@dataclass
class UserContextState:
    user_id: str
    messages: List[Dict]
    facts: List[Dict]
    preferences: Dict[str, Any]
    relationships: List[Dict]

    @classmethod
    def initial(cls, user_id: str) -> 'UserContextState':
        return cls(
            user_id=user_id,
            messages=[],
            facts=[],
            preferences={},
            relationships=[]
        )


class EventSourcedContextManager:
    def __init__(self, event_store):
        self.event_store = event_store
        self.state_cache = {}

    async def append_event(self, event: ContextEvent):
        """Append an event and update the cached state."""
        await self.event_store.append(event)
        if event.user_id in self.state_cache:
            self.state_cache[event.user_id] = event.apply(
                self.state_cache[event.user_id]
            )

    async def get_state(self, user_id: str) -> UserContextState:
        """Reconstruct or retrieve cached state."""
        if user_id in self.state_cache:
            return self.state_cache[user_id]
        state = UserContextState.initial(user_id)
        events = await self.event_store.get_events(user_id)
        for event in events:
            state = event.apply(state)
        self.state_cache[user_id] = state
        return state

    async def get_state_at_time(
        self,
        user_id: str,
        timestamp: datetime
    ) -> UserContextState:
        """Reconstruct state at a specific point in time."""
        state = UserContextState.initial(user_id)
        events = await self.event_store.get_events(
            user_id,
            before=timestamp
        )
        for event in events:
            state = event.apply(state)
        return state
```
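Stripped of the class machinery, event sourcing is just a fold: replaying an ordered event log produces the current state, and truncating the log at a timestamp produces the state at that time. A minimal sketch of the replay idea, using tuple events instead of dataclasses:

```python
def replay(events, until=None):
    """Fold (timestamp, kind, payload) events into a state dict."""
    state = {"facts": [], "preferences": {}}
    for ts, kind, payload in events:
        if until is not None and ts > until:
            break  # point-in-time reconstruction: ignore later events
        if kind == "fact_learned":
            state["facts"].append(payload)
        elif kind == "preference_updated":
            key, value = payload
            state["preferences"][key] = value
    return state

log = [
    (1, "fact_learned", "prefers async APIs"),
    (2, "preference_updated", ("theme", "light")),
    (3, "preference_updated", ("theme", "dark")),  # later event supersedes
]
assert replay(log)["preferences"]["theme"] == "dark"
assert replay(log, until=2)["preferences"]["theme"] == "light"
```

This is why event sourcing gives you auditability for free: the log, not the materialized state, is the source of truth.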
Storage Backends for Multi-Session Context
Choosing the right storage backend significantly impacts performance, cost, and capabilities of your multi-session context system.
PostgreSQL with pgvector
PostgreSQL with the pgvector extension provides an excellent balance of reliability, flexibility, and vector search capabilities.
```python
import asyncpg
import json
from typing import Dict, List, Optional


class PostgresContextStorage:
    def __init__(self, connection_string: str):
        self.connection_string = connection_string
        self.pool = None

    async def initialize(self):
        """Initialize the connection pool and ensure the schema exists."""
        self.pool = await asyncpg.create_pool(self.connection_string)
        async with self.pool.acquire() as conn:
            await conn.execute("""
                CREATE EXTENSION IF NOT EXISTS vector;

                CREATE TABLE IF NOT EXISTS user_sessions (
                    id SERIAL PRIMARY KEY,
                    session_id VARCHAR(255) UNIQUE NOT NULL,
                    user_id VARCHAR(255) NOT NULL,
                    created_at TIMESTAMPTZ DEFAULT NOW(),
                    updated_at TIMESTAMPTZ DEFAULT NOW(),
                    messages JSONB NOT NULL DEFAULT '[]',
                    metadata JSONB NOT NULL DEFAULT '{}',
                    summary TEXT,
                    summary_embedding vector(1536)
                );

                CREATE TABLE IF NOT EXISTS user_facts (
                    id SERIAL PRIMARY KEY,
                    user_id VARCHAR(255) NOT NULL,
                    fact TEXT NOT NULL,
                    category VARCHAR(100),
                    confidence FLOAT NOT NULL,
                    embedding vector(1536),
                    created_at TIMESTAMPTZ DEFAULT NOW(),
                    source_session_id VARCHAR(255)
                );

                CREATE INDEX IF NOT EXISTS idx_sessions_user_id
                    ON user_sessions(user_id);
                CREATE INDEX IF NOT EXISTS idx_sessions_updated
                    ON user_sessions(updated_at DESC);
                CREATE INDEX IF NOT EXISTS idx_facts_user_id
                    ON user_facts(user_id);
                CREATE INDEX IF NOT EXISTS idx_sessions_embedding
                    ON user_sessions USING ivfflat (summary_embedding vector_cosine_ops);
                CREATE INDEX IF NOT EXISTS idx_facts_embedding
                    ON user_facts USING ivfflat (embedding vector_cosine_ops);
            """)

    async def upsert_session(
        self,
        session_id: str,
        user_id: str,
        messages: List[Dict],
        metadata: Dict,
        summary: Optional[str] = None,
        summary_embedding: Optional[List[float]] = None
    ):
        """Insert or update a session."""
        async with self.pool.acquire() as conn:
            embedding_str = None
            if summary_embedding:
                embedding_str = f"[{','.join(map(str, summary_embedding))}]"
            await conn.execute("""
                INSERT INTO user_sessions
                    (session_id, user_id, messages, metadata, summary, summary_embedding)
                VALUES ($1, $2, $3, $4, $5, $6::vector)
                ON CONFLICT (session_id) DO UPDATE SET
                    messages = EXCLUDED.messages,
                    metadata = EXCLUDED.metadata,
                    summary = EXCLUDED.summary,
                    summary_embedding = EXCLUDED.summary_embedding,
                    updated_at = NOW()
            """, session_id, user_id, json.dumps(messages),
                json.dumps(metadata), summary, embedding_str)

    async def search_sessions_by_similarity(
        self,
        user_id: str,
        query_embedding: List[float],
        limit: int = 5
    ) -> List[Dict]:
        """Find sessions similar to the query embedding."""
        async with self.pool.acquire() as conn:
            embedding_str = f"[{','.join(map(str, query_embedding))}]"
            rows = await conn.fetch("""
                SELECT
                    session_id,
                    user_id,
                    messages,
                    metadata,
                    summary,
                    created_at,
                    updated_at,
                    1 - (summary_embedding <=> $1::vector) AS similarity
                FROM user_sessions
                WHERE user_id = $2 AND summary_embedding IS NOT NULL
                ORDER BY summary_embedding <=> $1::vector
                LIMIT $3
            """, embedding_str, user_id, limit)
            return [dict(row) for row in rows]

    async def search_facts_by_similarity(
        self,
        user_id: str,
        query_embedding: List[float],
        limit: int = 10
    ) -> List[Dict]:
        """Find facts similar to the query embedding."""
        async with self.pool.acquire() as conn:
            embedding_str = f"[{','.join(map(str, query_embedding))}]"
            rows = await conn.fetch("""
                SELECT
                    fact,
                    category,
                    confidence,
                    created_at,
                    1 - (embedding <=> $1::vector) AS similarity
                FROM user_facts
                WHERE user_id = $2 AND embedding IS NOT NULL
                ORDER BY embedding <=> $1::vector
                LIMIT $3
            """, embedding_str, user_id, limit)
            return [dict(row) for row in rows]
```
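The SQL above leans on pgvector's `<=>` operator, which returns cosine *distance*; the queries convert it to similarity with `1 - distance`. The same computation in pure Python makes the relationship explicit:

```python
import math

def cosine_distance(a, b):
    """What pgvector's <=> operator computes: 1 minus cosine similarity."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / (norm_a * norm_b)

# Same direction: distance 0 (similarity 1); orthogonal: distance 1 (similarity 0)
assert abs(cosine_distance([1.0, 0.0], [2.0, 0.0])) < 1e-9
assert abs(cosine_distance([1.0, 0.0], [0.0, 1.0]) - 1.0) < 1e-9
```

Ordering by the raw `summary_embedding <=> $1::vector` ascending therefore returns the most similar rows first, which is why the queries sort on the operator itself rather than the derived `similarity` column.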
Redis for High-Performance Caching
For applications requiring sub-millisecond latency on context retrieval, Redis provides excellent caching capabilities.
```python
import json
from typing import Dict, List, Optional

import redis.asyncio as redis


class RedisContextCache:
    def __init__(self, redis_url: str, default_ttl: int = 3600):
        self.redis = redis.from_url(redis_url)
        self.default_ttl = default_ttl

    async def cache_user_context(
        self,
        user_id: str,
        context: Dict,
        ttl: Optional[int] = None
    ):
        """Cache user context with a TTL."""
        key = f"context:{user_id}"
        await self.redis.setex(
            key,
            ttl or self.default_ttl,
            json.dumps(context)
        )

    async def get_cached_context(self, user_id: str) -> Optional[Dict]:
        """Retrieve cached context."""
        key = f"context:{user_id}"
        data = await self.redis.get(key)
        return json.loads(data) if data else None

    async def update_session_context(
        self,
        user_id: str,
        session_id: str,
        update: Dict
    ):
        """Update a specific session within the cached context."""
        key = f"context:{user_id}:sessions"
        await self.redis.hset(key, session_id, json.dumps(update))
        await self.redis.expire(key, self.default_ttl)

    async def get_recent_sessions(
        self,
        user_id: str,
        limit: int = 5
    ) -> List[Dict]:
        """Get recently accessed sessions from the cache."""
        key = f"context:{user_id}:recent"
        sessions = await self.redis.lrange(key, 0, limit - 1)
        return [json.loads(s) for s in sessions]

    async def add_to_recent_sessions(
        self,
        user_id: str,
        session_summary: Dict,
        max_recent: int = 10
    ):
        """Add a session to the recent-sessions list."""
        key = f"context:{user_id}:recent"
        await self.redis.lpush(key, json.dumps(session_summary))
        await self.redis.ltrim(key, 0, max_recent - 1)
        await self.redis.expire(key, self.default_ttl)
```
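The `lpush`/`ltrim` pair in `add_to_recent_sessions` maintains a newest-first list capped at `max_recent` entries. The same semantics in plain Python, for intuition:

```python
def push_recent(recent, item, max_recent=10):
    """LPUSH then LTRIM 0..max_recent-1: prepend, then cap the length."""
    return ([item] + recent)[:max_recent]

recent = []
for i in range(12):
    recent = push_recent(recent, i, max_recent=10)

assert recent[0] == 11    # newest entry sits at the head
assert len(recent) == 10  # the two oldest entries were trimmed away
assert recent[-1] == 2
```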
Synchronization Strategies for Multi-Agent Systems
When multiple AI agents or sessions need to share context, synchronization becomes critical. Without proper coordination, agents can develop inconsistent views of user state.
Write-Through with Event Broadcasting
```python
import asyncio
import json
from datetime import datetime
from typing import Callable, Dict, List


class ContextSynchronizer:
    def __init__(self, primary_storage, cache, message_broker):
        self.storage = primary_storage
        self.cache = cache
        self.broker = message_broker
        self.subscribers: List[Callable] = []

    async def update_context(
        self,
        user_id: str,
        update: Dict,
        source_agent: str
    ):
        """Update context with write-through and event broadcast."""
        # Write to primary storage
        await self.storage.update_user_context(user_id, update)
        # Update the cache
        await self.cache.invalidate_and_set(user_id, update)
        # Broadcast the update event
        event = {
            "type": "context_updated",
            "user_id": user_id,
            "update": update,
            "source_agent": source_agent,
            "timestamp": datetime.utcnow().isoformat()
        }
        await self.broker.publish(f"context:{user_id}", json.dumps(event))
        # Notify local subscribers
        for subscriber in self.subscribers:
            asyncio.create_task(subscriber(event))

    async def subscribe_to_updates(
        self,
        user_id: str,
        callback: Callable
    ):
        """Subscribe to context updates for a user."""
        self.subscribers.append(callback)
        await self.broker.subscribe(f"context:{user_id}", callback)

    async def get_context_with_freshness(
        self,
        user_id: str,
        max_staleness_seconds: int = 60
    ) -> Dict:
        """Get context with a staleness guarantee."""
        cached = await self.cache.get_with_age(user_id)
        if cached and cached['age_seconds'] < max_staleness_seconds:
            return cached['data']
        # Cache miss or too stale - fetch from primary
        fresh = await self.storage.get_user_context(user_id)
        await self.cache.set(user_id, fresh)
        return fresh
```
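`get_context_with_freshness` is a staleness-bounded cache-aside read. A self-contained sketch of the same policy, with explicit clock values instead of wall time and a `fetch` callback standing in for the primary store (both are assumptions for illustration):

```python
class StalenessBoundedCache:
    def __init__(self):
        self._entries = {}  # key -> (value, written_at)

    def put(self, key, value, now):
        self._entries[key] = (value, now)

    def get(self, key, max_staleness, fetch, now):
        entry = self._entries.get(key)
        if entry is not None and now - entry[1] < max_staleness:
            return entry[0]   # fresh enough: serve from cache
        value = fetch(key)    # miss or too stale: hit the primary store
        self.put(key, value, now)
        return value

cache = StalenessBoundedCache()
cache.put("u1", {"v": 1}, now=0)
# Within the 60s bound: cached value wins; beyond it: primary is consulted.
assert cache.get("u1", 60, lambda k: {"v": 2}, now=30) == {"v": 1}
assert cache.get("u1", 60, lambda k: {"v": 2}, now=120) == {"v": 2}
```

Passing the clock in explicitly makes the staleness policy trivially testable, which is worth doing in production code as well.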
Conflict Resolution for Concurrent Updates
When multiple agents update context simultaneously, conflicts must be resolved consistently.
```python
import asyncio
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Any, Callable, Dict


class ConflictResolution(Enum):
    LAST_WRITE_WINS = "last_write_wins"
    MERGE = "merge"
    REJECT = "reject"


class VersionConflictError(Exception):
    """Raised by the storage layer when a conditional write loses the race."""


class MaxRetriesExceededError(Exception):
    """Raised when repeated optimistic updates keep conflicting."""


@dataclass
class VersionedContext:
    user_id: str
    data: Dict[str, Any]
    version: int
    updated_at: datetime
    updated_by: str


class OptimisticConcurrencyManager:
    def __init__(self, storage):
        self.storage = storage

    async def update_with_optimistic_lock(
        self,
        user_id: str,
        update_fn: Callable[[Dict], Dict],
        agent_id: str,
        max_retries: int = 3
    ) -> VersionedContext:
        """Update context with optimistic concurrency control."""
        for attempt in range(max_retries):
            # Read the current version
            current = await self.storage.get_versioned_context(user_id)
            # Apply the update function
            new_data = update_fn(current.data.copy())
            # Attempt a conditional write
            try:
                updated = await self.storage.conditional_update(
                    user_id=user_id,
                    new_data=new_data,
                    expected_version=current.version,
                    updated_by=agent_id
                )
                return updated
            except VersionConflictError:
                if attempt == max_retries - 1:
                    raise
                # Wait with exponential backoff before retrying
                await asyncio.sleep(0.1 * (2 ** attempt))
        raise MaxRetriesExceededError(f"Failed to update after {max_retries} attempts")

    async def merge_conflicting_updates(
        self,
        base: Dict,
        update_a: Dict,
        update_b: Dict
    ) -> Dict:
        """Three-way merge for conflicting updates."""
        result = base.copy()
        # Find keys modified in each update
        a_changes = {k: v for k, v in update_a.items() if base.get(k) != v}
        b_changes = {k: v for k, v in update_b.items() if base.get(k) != v}
        # Apply non-conflicting changes
        for key, value in a_changes.items():
            if key not in b_changes:
                result[key] = value
        for key, value in b_changes.items():
            if key not in a_changes:
                result[key] = value
        # Handle conflicts: here B simply wins; a timestamp- or type-aware
        # strategy could be substituted
        for key in set(a_changes.keys()) & set(b_changes.keys()):
            result[key] = update_b[key]
        return result
```
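Concretely, the three-way merge lets two agents change *different* keys without losing either update, and only falls back to last-write-wins on a genuine collision. A standalone sketch of the same logic with a worked example:

```python
def three_way_merge(base, update_a, update_b):
    """Merge two updates against a common base; B wins on true conflicts."""
    a_changes = {k: v for k, v in update_a.items() if base.get(k) != v}
    b_changes = {k: v for k, v in update_b.items() if base.get(k) != v}
    result = dict(base)
    result.update(a_changes)  # apply A's changes...
    result.update(b_changes)  # ...then B's, which also settles conflicts
    return result

base = {"theme": "light", "lang": "en"}
a = {"theme": "dark", "lang": "en"}   # agent A changed only the theme
b = {"theme": "light", "lang": "fr"}  # agent B changed only the language
# Disjoint changes: both survive the merge.
assert three_way_merge(base, a, b) == {"theme": "dark", "lang": "fr"}
```

Note that diffing against the base is what distinguishes "B left the key alone" from "B set it back to the base value"; a plain two-way `dict.update` would clobber A's change in the first case.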
Building a Complete Multi-Session Context System
Let's bring everything together into a production-ready system that handles the full lifecycle of multi-session context.
```python
import asyncio
import json
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, List, Optional


@dataclass
class ContextConfig:
    max_working_memory_tokens: int = 4000
    max_short_term_sessions: int = 20
    max_facts_per_query: int = 15
    compaction_threshold: int = 50
    cache_ttl_seconds: int = 3600
    embedding_model: str = "text-embedding-3-small"


class MultiSessionContextSystem:
    """
    Complete multi-session context management system.

    Handles:
    - Session persistence and retrieval
    - Hierarchical memory with automatic compaction
    - Semantic search across historical context
    - Real-time synchronization across agents
    - Caching for low-latency access
    """

    def __init__(
        self,
        storage: PostgresContextStorage,
        cache: RedisContextCache,
        embedding_client,
        llm_client,
        config: Optional[ContextConfig] = None
    ):
        self.storage = storage
        self.cache = cache
        self.embeddings = embedding_client
        self.llm = llm_client
        self.config = config or ContextConfig()

    async def start_session(
        self,
        user_id: str,
        session_id: str,
        initial_context: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Initialize a new session with relevant historical context.
        Returns a context package to inject into the agent prompt.
        """
        # Gather context from all memory levels
        context_parts = await asyncio.gather(
            self._get_user_facts(user_id, initial_context),
            self._get_recent_session_summaries(user_id),
            self._get_relevant_historical_context(user_id, initial_context)
        )
        facts, recent_sessions, historical = context_parts
        # Build the context package
        context_package = {
            "user_facts": facts,
            "recent_conversations": recent_sessions,
            "relevant_history": historical,
            "session_id": session_id
        }
        # Cache it for the session
        await self.cache.cache_user_context(user_id, context_package)
        return context_package

    async def process_message(
        self,
        user_id: str,
        session_id: str,
        role: str,
        content: str
    ):
        """Process a message and update context accordingly."""
        # Store the message
        await self.storage.append_message(session_id, {
            "role": role,
            "content": content,
            "timestamp": datetime.utcnow().isoformat()
        })
        # Extract facts in the background from user messages
        if role == "user":
            asyncio.create_task(
                self._maybe_extract_facts(user_id, session_id, content)
            )

    async def end_session(
        self,
        user_id: str,
        session_id: str
    ):
        """Finalize the session and process it for long-term storage."""
        # Get the full session
        session = await self.storage.get_session(session_id)
        if not session or len(session.messages) < 2:
            return  # Nothing worth storing
        # Generate a summary
        summary = await self._generate_session_summary(session.messages)
        # Generate an embedding for semantic search
        embedding = await self.embeddings.embed(summary)
        # Update the session with summary and embedding
        await self.storage.upsert_session(
            session_id=session_id,
            user_id=user_id,
            messages=session.messages,
            metadata=session.metadata,
            summary=summary,
            summary_embedding=embedding
        )
        # Check whether compaction is needed
        await self._maybe_compact_memories(user_id)
        # Invalidate the cache
        await self.cache.invalidate(user_id)

    async def query_context(
        self,
        user_id: str,
        query: str,
        max_tokens: Optional[int] = None
    ) -> str:
        """
        Query for relevant context to inject into a prompt.
        Returns a formatted context string within the token budget.
        """
        max_tokens = max_tokens or self.config.max_working_memory_tokens
        # Get the query embedding
        query_embedding = await self.embeddings.embed(query)
        # Search across all context types
        results = await asyncio.gather(
            self.storage.search_facts_by_similarity(
                user_id, query_embedding,
                limit=self.config.max_facts_per_query
            ),
            self.storage.search_sessions_by_similarity(
                user_id, query_embedding,
                limit=10
            )
        )
        facts, sessions = results
        # Build context within the token budget
        context_parts = []
        token_count = 0
        # Add relevant facts first (usually the most valuable)
        if facts:
            facts_text = self._format_facts(facts)
            facts_tokens = self._estimate_tokens(facts_text)
            if token_count + facts_tokens <= max_tokens:
                context_parts.append(f"Known facts about user:\n{facts_text}")
                token_count += facts_tokens
        # Add session summaries within the remaining budget
        for session in sessions:
            if session.get('summary'):
                date = session['updated_at'].strftime('%Y-%m-%d')
                session_text = f"Previous conversation ({date}): {session['summary']}"
                session_tokens = self._estimate_tokens(session_text)
                if token_count + session_tokens <= max_tokens:
                    context_parts.append(session_text)
                    token_count += session_tokens
                else:
                    break
        return "\n\n".join(context_parts)

    async def _get_user_facts(
        self,
        user_id: str,
        context_hint: Optional[str]
    ) -> List[Dict]:
        """Get relevant facts about the user."""
        if context_hint:
            embedding = await self.embeddings.embed(context_hint)
            return await self.storage.search_facts_by_similarity(
                user_id, embedding, limit=10
            )
        return await self.storage.get_recent_facts(user_id, limit=10)

    async def _get_recent_session_summaries(
        self,
        user_id: str
    ) -> List[Dict]:
        """Get summaries of recent sessions."""
        return await self.storage.get_recent_sessions(
            user_id,
            limit=5,
            with_summary=True
        )

    async def _get_relevant_historical_context(
        self,
        user_id: str,
        query: Optional[str]
    ) -> List[Dict]:
        """Search for relevant historical context."""
        if not query:
            return []
        embedding = await self.embeddings.embed(query)
        return await self.storage.search_sessions_by_similarity(
            user_id, embedding, limit=3
        )

    async def _generate_session_summary(
        self,
        messages: List[Dict]
    ) -> str:
        """Generate a concise summary of the session."""
        formatted_messages = "\n".join([
            f"{m['role']}: {m['content'][:500]}"
            for m in messages[-20:]  # Last 20 messages max
        ])
        prompt = f"""Summarize this conversation in 2-3 sentences. Focus on:
- Key topics discussed
- Decisions made or conclusions reached
- Important information shared

Conversation:
{formatted_messages}

Summary:"""
        return await self.llm.complete(prompt, max_tokens=150)

    async def _maybe_extract_facts(
        self,
        user_id: str,
        session_id: str,
        user_message: str
    ):
        """Extract facts from a user message, if any are present."""
        prompt = f"""Does this message contain factual information about the user that would be worth remembering long-term?
Look for: preferences, personal details, stated intentions, corrections of prior assumptions.

Message: "{user_message}"

If yes, respond with JSON: {{"facts": [{{"fact": "...", "category": "preference|personal|intention|correction", "confidence": 0.0-1.0}}]}}
If no useful facts, respond with: {{"facts": []}}

Response:"""
        response = await self.llm.complete(prompt, max_tokens=200)
        try:
            result = json.loads(response)
            for fact_data in result.get('facts', []):
                if fact_data.get('confidence', 0) >= 0.8:
                    embedding = await self.embeddings.embed(fact_data['fact'])
                    await self.storage.store_fact(
                        user_id=user_id,
                        fact=fact_data['fact'],
                        category=fact_data.get('category'),
                        confidence=fact_data['confidence'],
                        embedding=embedding,
                        source_session_id=session_id
                    )
        except json.JSONDecodeError:
            pass  # Silently skip malformed responses

    async def _maybe_compact_memories(self, user_id: str):
        """Compact memories if the threshold is exceeded."""
        session_count = await self.storage.count_sessions(user_id)
        if session_count > self.config.compaction_threshold:
            # Get the oldest sessions for compaction
            oldest = await self.storage.get_oldest_sessions(
                user_id,
                limit=session_count - self.config.max_short_term_sessions
            )
            # Generate a combined summary
            summaries = [s['summary'] for s in oldest if s.get('summary')]
            combined = await self._generate_combined_summary(summaries)
            # Store it in the long-term archive
            await self.storage.archive_sessions(
                user_id=user_id,
                session_ids=[s['session_id'] for s in oldest],
                combined_summary=combined
            )

    async def _generate_combined_summary(
        self,
        summaries: List[str]
    ) -> str:
        """Generate a combined summary for archival."""
        bullet_list = "\n".join(f"- {s}" for s in summaries)
        prompt = f"""Combine these conversation summaries into a single coherent summary of key themes and information:

{bullet_list}

Combined summary (preserve all important details):"""
        return await self.llm.complete(prompt, max_tokens=300)

    def _format_facts(self, facts: List[Dict]) -> str:
        """Format facts for inclusion in context."""
        return "\n".join([
            f"- {f['fact']} ({f['category']}, confidence: {f['confidence']:.0%})"
            for f in facts
        ])

    def _estimate_tokens(self, text: str) -> int:
        """Rough token estimation (~4 characters per token)."""
        return len(text) // 4
```
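The chars/4 heuristic in `_estimate_tokens` is roughly right for English prose but drifts for code and non-Latin text; a real tokenizer (e.g. tiktoken, if available for your model) is worth the dependency when budgets are tight. The heuristic itself, isolated:

```python
def estimate_tokens(text: str) -> int:
    """Rough budget estimate: ~4 characters per token for English prose."""
    return len(text) // 4

sample = "The quick brown fox jumps over the lazy dog."  # 44 characters
assert estimate_tokens(sample) == 11
assert estimate_tokens("") == 0
```

Because the estimate is used only to stop packing context, underestimating is the dangerous direction: it can overflow the real window, so treat budgets as soft limits with headroom.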
Integration with Dytto's Context API
Building a multi-session context system from scratch is complex. Dytto provides a production-ready context layer that handles the heavy lifting, letting you focus on building your AI application.
```python
import httpx
from typing import Any, Dict, List, Optional


class DyttoContextClient:
    """
    Client for Dytto's context API.

    Dytto handles:
    - Persistent user context across all sessions
    - Automatic fact extraction and storage
    - Semantic search across user history
    - Real-time context synchronization
    - Privacy-preserving context management
    """

    def __init__(self, api_key: str, base_url: str = "https://api.dytto.app"):
        self.api_key = api_key
        self.base_url = base_url
        self.client = httpx.AsyncClient(
            headers={"Authorization": f"Bearer {api_key}"}
        )

    async def get_context(
        self,
        user_id: str,
        query: Optional[str] = None,
        categories: Optional[List[str]] = None
    ) -> Dict[str, Any]:
        """
        Retrieve user context for injection into AI prompts.

        Args:
            user_id: The user to get context for
            query: Optional query to find relevant context
            categories: Filter by context categories

        Returns:
            Context package ready for prompt injection
        """
        params = {"user_id": user_id}
        if query:
            params["query"] = query
        if categories:
            params["categories"] = ",".join(categories)
        response = await self.client.get(
            f"{self.base_url}/v1/context",
            params=params
        )
        response.raise_for_status()
        return response.json()

    async def store_fact(
        self,
        user_id: str,
        fact: str,
        category: str,
        confidence: float = 0.9,
        source: Optional[str] = None
    ):
        """
        Store a fact about a user.

        Dytto automatically handles:
        - Embedding generation for semantic search
        - Deduplication against existing facts
        - Conflict resolution with existing knowledge
        """
        response = await self.client.post(
            f"{self.base_url}/v1/facts",
            json={
                "user_id": user_id,
                "fact": fact,
                "category": category,
                "confidence": confidence,
                "source": source
            }
        )
        response.raise_for_status()
        return response.json()

    async def search_context(
        self,
        user_id: str,
        query: str,
        limit: int = 10
    ) -> List[Dict]:
        """Semantic search across a user's context history."""
        response = await self.client.get(
            f"{self.base_url}/v1/context/search",
            params={
                "user_id": user_id,
                "query": query,
                "limit": limit
            }
        )
        response.raise_for_status()
        return response.json()["results"]

    async def get_user_summary(self, user_id: str) -> Dict[str, Any]:
        """
        Get a comprehensive summary of what we know about a user.

        Useful for:
        - Onboarding prompts
        - Personalization
        - Context briefings
        """
        response = await self.client.get(
            f"{self.base_url}/v1/users/{user_id}/summary"
        )
        response.raise_for_status()
        return response.json()
```
# Example usage with an AI agent
async def build_personalized_agent_prompt(
    dytto: DyttoContextClient,
    user_id: str,
    user_message: str,
    base_prompt: str
) -> str:
    """Build a prompt enriched with user context."""
    # Get relevant context for this message
    context = await dytto.get_context(
        user_id=user_id,
        query=user_message
    )

    # Format context for injection
    context_block = ""
    if context.get("facts"):
        facts_text = "\n".join([
            f"- {f['fact']}" for f in context["facts"]
        ])
        context_block += f"\n\nKnown facts about this user:\n{facts_text}"
    if context.get("preferences"):
        prefs_text = "\n".join([
            f"- {k}: {v}" for k, v in context["preferences"].items()
        ])
        context_block += f"\n\nUser preferences:\n{prefs_text}"
    if context.get("recent_interactions"):
        recent = context["recent_interactions"][:3]
        recent_text = "\n".join([
            f"- {r['summary']}" for r in recent
        ])
        context_block += f"\n\nRecent interactions:\n{recent_text}"

    return f"""{base_prompt}

## User Context{context_block}

## Current Conversation
User: {user_message}
"""
Best Practices for Multi-Session Context
1. Design for Context Limits
Even with sophisticated retrieval, you cannot inject unlimited context. Design your system to prioritize the most relevant information.
class ContextPrioritizer:
    """Prioritize context within token budgets."""

    PRIORITY_WEIGHTS = {
        "active_task": 1.0,     # Highest priority
        "recent_decision": 0.9,
        "user_preference": 0.8,
        "related_history": 0.6,
        "general_fact": 0.4
    }

    def prioritize(
        self,
        context_items: List[Dict],
        token_budget: int
    ) -> List[Dict]:
        """Select highest-priority items within budget."""
        # Score each item
        scored = []
        for item in context_items:
            score = self.PRIORITY_WEIGHTS.get(item['type'], 0.3)
            score *= item.get('relevance', 1.0)       # From semantic search
            score *= item.get('recency_factor', 1.0)
            scored.append((score, item))

        # Sort by score descending
        scored.sort(key=lambda x: x[0], reverse=True)

        # Greedily select items until the budget is exhausted
        selected = []
        remaining = token_budget
        for score, item in scored:
            tokens = self._estimate_tokens(item)
            if tokens <= remaining:
                selected.append(item)
                remaining -= tokens
        return selected

    def _estimate_tokens(self, item: Dict) -> int:
        # Rough ~4 characters-per-token heuristic; assumes each item
        # carries its text under a 'content' key (adapt to your schema)
        return len(item.get('content', '')) // 4
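For intuition, here is a self-contained run of the same greedy selection on sample data. The item shapes, the weights shown, and the length-based token estimate are all illustrative.

```python
# Standalone sketch of priority-weighted context selection under a token
# budget, mirroring ContextPrioritizer; all sample data is illustrative.
PRIORITY_WEIGHTS = {"active_task": 1.0, "user_preference": 0.8, "general_fact": 0.4}

def estimate_tokens(item: dict) -> int:
    return len(item["text"]) // 4  # rough ~4 characters per token

def prioritize(items: list, token_budget: int) -> list:
    # Score each item by type weight times semantic relevance, then sort
    scored = sorted(
        ((PRIORITY_WEIGHTS.get(i["type"], 0.3) * i.get("relevance", 1.0), i)
         for i in items),
        key=lambda pair: pair[0],
        reverse=True,
    )
    selected, remaining = [], token_budget
    for _, item in scored:
        cost = estimate_tokens(item)
        if cost <= remaining:  # greedy: take the item if it still fits
            selected.append(item)
            remaining -= cost
    return selected

items = [
    {"type": "general_fact", "text": "User lives in Berlin.", "relevance": 0.5},
    {"type": "active_task", "text": "Refund for order #4412 is in progress.", "relevance": 0.9},
    {"type": "user_preference", "text": "Prefers short answers.", "relevance": 0.7},
]
picked = prioritize(items, token_budget=25)
```

Note the selection is greedy by score, so a high-priority item that fits always beats a lower-priority one, regardless of arrival order.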
2. Handle Context Staleness
Context can become stale. Build mechanisms to detect and handle outdated information.
from dataclasses import dataclass
from datetime import datetime

@dataclass
class ContextValidity:
    is_valid: bool
    staleness_seconds: float
    confidence: float
    should_refresh: bool

class StalenessChecker:
    """Check and handle context staleness."""

    STALENESS_THRESHOLDS = {
        "preference": 86400 * 30,   # 30 days
        "fact": 86400 * 7,          # 7 days
        "session_summary": 3600,    # 1 hour
        "active_task": 300          # 5 minutes
    }

    def check_validity(
        self,
        context_item: Dict,
        current_time: datetime
    ) -> ContextValidity:
        item_type = context_item.get('type', 'unknown')
        threshold = self.STALENESS_THRESHOLDS.get(item_type, 3600)
        updated_at = context_item.get('updated_at')
        if not updated_at:
            # No timestamp: treat as invalid and force a refresh
            return ContextValidity(
                is_valid=False,
                staleness_seconds=float('inf'),
                confidence=0.0,
                should_refresh=True
            )
        age = (current_time - updated_at).total_seconds()
        staleness_ratio = age / threshold
        return ContextValidity(
            is_valid=staleness_ratio < 1.0,
            staleness_seconds=age,
            confidence=max(0.0, 1 - staleness_ratio),
            should_refresh=staleness_ratio > 0.8
        )
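A quick self-contained illustration of the threshold logic: an item is invalid once its age exceeds the threshold for its type, and a refresh is suggested at 80% of that age. The two thresholds match the values above; the ages are made up.

```python
# Threshold-based staleness, mirroring StalenessChecker above
THRESHOLDS = {"preference": 86400 * 30, "active_task": 300}  # seconds

def check(item_type: str, age_seconds: float) -> dict:
    threshold = THRESHOLDS.get(item_type, 3600)
    ratio = age_seconds / threshold
    return {
        "is_valid": ratio < 1.0,
        "confidence": max(0.0, 1 - ratio),
        "should_refresh": ratio > 0.8,
    }

# A 10-minute-old active task is already stale; a 10-minute-old preference is fresh
task = check("active_task", 600)
pref = check("preference", 600)
```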
3. Provide Context Transparency
Let users understand and control what the AI remembers about them.
async def generate_context_report(
    storage,
    user_id: str
) -> Dict[str, Any]:
    """Generate user-facing report of stored context."""
    facts = await storage.get_all_facts(user_id)
    sessions = await storage.get_session_summaries(user_id)

    # Group facts by category (sessions are assumed ordered newest-first)
    facts_by_category: Dict[str, List[Dict]] = {}
    for f in facts:
        facts_by_category.setdefault(f['category'], []).append(f)

    return {
        "summary": {
            "total_facts": len(facts),
            "total_sessions": len(sessions),
            "memory_since": sessions[-1]['created_at'] if sessions else None
        },
        "facts_by_category": facts_by_category,
        "recent_sessions": sessions[:10],
        "data_controls": {
            "export_url": f"/api/users/{user_id}/export",
            "delete_url": f"/api/users/{user_id}/delete",
            "preferences_url": f"/api/users/{user_id}/preferences"
        }
    }
Common Pitfalls and How to Avoid Them
Pitfall 1: Over-Retrieval
Retrieving too much context can confuse the model and waste tokens. Always limit and prioritize.
Bad:
# Don't do this - retrieves everything
context = await storage.get_all_context(user_id)
prompt = f"{base_prompt}\n\nContext:\n{json.dumps(context)}"
Good:
# Retrieve only what's relevant
relevant_context = await storage.search_context(
user_id=user_id,
query=user_message,
limit=10,
max_tokens=2000
)
prompt = build_prompt_with_context(base_prompt, relevant_context)
Pitfall 2: Ignoring Context Conflicts
Facts can contradict each other. Build conflict detection.
async def detect_conflicts(
    storage,
    llm,
    user_id: str,
    new_fact: str,
    category: str
) -> List[Dict]:
    """Detect potential conflicts with existing facts."""
    # Get existing facts in same category
    existing = await storage.get_facts(user_id, category=category)

    # Use LLM to detect conflicts, pairwise
    conflicts = []
    for fact in existing:
        prompt = f"""Do these two facts conflict?

Fact 1: {fact['fact']}
Fact 2: {new_fact}

Answer YES or NO, then explain briefly."""
        response = await llm.complete(prompt, max_tokens=100)
        if response.strip().upper().startswith('YES'):
            conflicts.append({
                'existing_fact': fact,
                'new_fact': new_fact,
                'explanation': response
            })
    return conflicts
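One LLM call per fact pair gets expensive as a user's fact store grows. A cheap lexical pre-filter (purely an illustrative heuristic, not part of the pipeline above) can limit the LLM check to pairs that plausibly overlap:

```python
# Illustrative pre-filter: only escalate fact pairs with meaningful word
# overlap to the LLM conflict check, cutting per-fact LLM calls.
def overlap_score(a: str, b: str) -> float:
    wa, wb = set(a.lower().split()), set(b.lower().split())
    # Share of the smaller fact's words that also appear in the other
    return len(wa & wb) / max(1, min(len(wa), len(wb)))

def candidate_conflicts(existing_facts: list, new_fact: str,
                        min_overlap: float = 0.3) -> list:
    return [f for f in existing_facts if overlap_score(f, new_fact) >= min_overlap]

cands = candidate_conflicts(
    ["User prefers email contact", "User lives in Berlin"],
    "User prefers phone contact",
)
```

Embedding similarity would be a stronger pre-filter than word overlap, at the cost of an embedding lookup per pair.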
Pitfall 3: Privacy Violations
Multi-session context stores sensitive information. Implement proper access controls.
class ContextAccessControl:
    """Enforce access control on context operations."""

    async def check_access(
        self,
        requester_id: str,
        target_user_id: str,
        operation: str
    ) -> bool:
        """Check if requester can perform operation on target's context."""
        # Users can always access their own context
        if requester_id == target_user_id:
            return True
        # Check for explicit grants
        grant = await self.storage.get_access_grant(
            requester_id, target_user_id, operation
        )
        return grant is not None and grant.is_valid()

    async def audit_access(
        self,
        requester_id: str,
        target_user_id: str,
        operation: str,
        result: Any
    ):
        """Log context access for auditing."""
        await self.storage.log_access(
            requester_id=requester_id,
            target_user_id=target_user_id,
            operation=operation,
            timestamp=datetime.utcnow(),
            result_summary=self._summarize_result(result)
        )
Measuring Multi-Session Context Effectiveness
Track metrics to understand how well your context system performs.
class ContextMetrics:
    """Track multi-session context effectiveness."""

    async def record_session_metrics(
        self,
        session_id: str,
        user_id: str,
        metrics: Dict
    ):
        """Record metrics for a session."""
        await self.storage.insert('context_metrics', {
            'session_id': session_id,
            'user_id': user_id,
            'timestamp': datetime.utcnow(),

            # Context retrieval metrics
            'context_tokens_used': metrics.get('context_tokens', 0),
            'facts_retrieved': metrics.get('facts_count', 0),
            'sessions_retrieved': metrics.get('sessions_count', 0),
            'retrieval_latency_ms': metrics.get('retrieval_ms', 0),

            # Effectiveness metrics
            'context_hit_rate': metrics.get('hit_rate', 0),
            'user_corrections': metrics.get('corrections', 0),
            'repeat_explanations': metrics.get('repeats', 0),

            # Quality metrics
            'user_satisfaction': metrics.get('satisfaction', None),
            'task_completion': metrics.get('completed', None)
        })

    async def get_effectiveness_report(
        self,
        time_range: tuple
    ) -> Dict[str, Any]:
        """Generate effectiveness report."""
        metrics = await self.storage.query_metrics(time_range)
        return {
            'avg_context_tokens': np.mean([m['context_tokens_used'] for m in metrics]),
            'avg_retrieval_latency': np.mean([m['retrieval_latency_ms'] for m in metrics]),
            'context_hit_rate': np.mean([m['context_hit_rate'] for m in metrics]),
            'correction_rate': np.mean([m['user_corrections'] for m in metrics]),
            'completion_rate': np.mean([
                m['task_completion'] for m in metrics
                if m['task_completion'] is not None
            ])
        }
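The report aggregation above uses numpy, but the same means can be computed with the standard library. A sketch over illustrative metric rows, with `None` completions excluded from the completion rate as in the code above:

```python
from statistics import mean

# Stdlib version of the effectiveness aggregation; the rows are made up.
rows = [
    {"context_tokens_used": 1800, "retrieval_latency_ms": 40, "task_completion": 1},
    {"context_tokens_used": 2200, "retrieval_latency_ms": 60, "task_completion": None},
    {"context_tokens_used": 2000, "retrieval_latency_ms": 50, "task_completion": 0},
]
report = {
    "avg_context_tokens": mean(r["context_tokens_used"] for r in rows),
    "avg_retrieval_latency": mean(r["retrieval_latency_ms"] for r in rows),
    # Sessions without a recorded outcome don't count toward the rate
    "completion_rate": mean(
        r["task_completion"] for r in rows if r["task_completion"] is not None
    ),
}
```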
Conclusion
Multi-session AI context transforms AI agents from forgetful assistants into genuine partners that accumulate knowledge and improve over time. The technical challenges are significant—storage, retrieval, synchronization, and privacy all require careful design—but the benefits justify the investment.
The key principles to remember:
1. Design for retrieval, not just storage. It's not enough to store context; you need to retrieve the right context at the right time.
2. Prioritize ruthlessly. Context windows have limits. Build systems that surface the most relevant information within those limits.
3. Handle staleness explicitly. Context ages, and old preferences may no longer apply. Build freshness into your retrieval logic.
4. Respect privacy. Multi-session context is inherently sensitive. Implement proper access controls, auditing, and user transparency.
5. Measure effectiveness. Track whether your context system actually improves outcomes: fewer user corrections, fewer repeat explanations, and fewer task failures.
Building this infrastructure from scratch is substantial work. For teams focused on building AI applications rather than infrastructure, Dytto provides a production-ready context layer that handles storage, retrieval, synchronization, and privacy out of the box. This lets you focus on what matters: building AI experiences that truly remember.
The future of AI isn't stateless chatbots that forget everything. It's intelligent agents that build genuine understanding over time. Multi-session context is how we get there.
Ready to add persistent context to your AI agents? Try Dytto's context API and start building AI that remembers.