Multi-Session AI Context: The Complete Guide to Building AI Agents That Remember Across Conversations

Dytto Team
dytto · ai · context-engineering · memory · llm · tutorial

Building AI agents that maintain context across multiple sessions is one of the most challenging problems in production AI development. While single-session chatbots have become commoditized, multi-session AI context—the ability for an agent to remember, reason about, and act upon information from previous interactions—remains the frontier that separates toy demos from production-grade systems.

This comprehensive guide covers everything you need to know about implementing multi-session context for AI agents: the architecture patterns, storage strategies, synchronization mechanisms, and practical code examples that will help you build AI systems that truly remember.

What Is Multi-Session AI Context?

Multi-session AI context refers to an AI agent's ability to maintain and utilize information across separate conversation sessions. Unlike single-session memory, which exists only within the current conversation window, multi-session context persists beyond session boundaries and can be retrieved in future interactions.

Consider the difference:

Single-session context: A user asks a chatbot about their order status. The chatbot checks the database, provides the status, and forgets everything when the session ends. Tomorrow, the user must re-explain everything.

Multi-session context: The same user returns tomorrow. The AI remembers their previous inquiry, knows their order history, recalls their communication preferences, and can proactively provide an update without being asked.

The distinction might seem subtle, but it fundamentally changes what AI agents can accomplish. Multi-session context enables:

  • Continuous relationship building with users over time
  • Accumulation of relevant knowledge about user preferences
  • Long-running task completion that spans multiple interactions
  • Contextual awareness that improves with every conversation
  • True personalization based on historical interactions

Why Single-Session Memory Falls Short

Most AI applications today operate with what we might call "goldfish memory"—they remember everything brilliantly within a conversation, then promptly forget it all when the session ends. This limitation manifests in several painful ways.

The Monday Morning Amnesia Problem

You spend Friday afternoon working with an AI agent on a complex project. You establish requirements, make design decisions, create a roadmap. Monday morning, you return to continue the work, and the agent acts as if nothing happened. Worse, it might confidently claim that previous work was "completed" without any ability to verify this against actual state.

This isn't just inconvenient—it's actively harmful. The agent's false confidence can lead to:

  • Implementation drift where new work contradicts previous decisions
  • Wasted time re-explaining context that should already be known
  • Quality degradation as the agent makes assumptions instead of checking documented history
  • Erosion of trust as users learn they cannot rely on continuity

Context Window Limitations

Even within a session, LLM context windows impose hard limits. Claude's context window is generous at 200K tokens, but complex projects can easily exceed this. Code reviews, documentation analysis, multi-file refactoring—these tasks frequently push beyond what any context window can hold.

Without multi-session context architecture, developers resort to awkward workarounds: copy-pasting previous outputs, maintaining manual summaries, or simply accepting that context will be lost. None of these scale.
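The disciplined alternative is an explicit token budget: estimate the cost of each piece of history and greedily pack what fits. A minimal sketch of the idea (the 4-characters-per-token heuristic and these function names are ours, not from any particular library):

```python
def estimate_tokens(text: str) -> int:
    # Rough heuristic: ~4 characters per token for English text.
    return max(1, len(text) // 4)

def pack_history(summaries: list, budget_tokens: int) -> list:
    """Greedily include summaries (assumed most-recent-first) until the budget is spent."""
    packed, used = [], 0
    for summary in summaries:
        cost = estimate_tokens(summary)
        if used + cost > budget_tokens:
            break
        packed.append(summary)
        used += cost
    return packed
```

A real system would use the tokenizer of its target model rather than a character heuristic, but the budgeting loop stays the same.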

The Personalization Gap

Modern users expect personalization. They expect services to know their preferences, remember their history, and adapt to their needs over time. AI agents operating in single-session mode cannot deliver this. Every interaction starts from zero, requiring users to re-establish context that any good assistant should already have.

Architecture Patterns for Multi-Session Context

Building effective multi-session context requires thoughtful architecture. The right approach depends on your use case, scale requirements, and latency tolerance. Let's examine the major patterns.

Pattern 1: Session-State Storage with Retrieval

The most straightforward pattern stores complete session state and retrieves relevant portions for new sessions.

from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional, Dict, Any
import json

@dataclass
class SessionState:
    session_id: str
    user_id: str
    created_at: datetime
    updated_at: datetime
    messages: List[Dict[str, Any]]
    metadata: Dict[str, Any]
    summary: Optional[str] = None

class MultiSessionContextManager:
    def __init__(self, storage_backend):
        self.storage = storage_backend
    
    async def save_session(self, state: SessionState):
        """Persist session state for future retrieval."""
        await self.storage.upsert(
            table="sessions",
            data={
                "session_id": state.session_id,
                "user_id": state.user_id,
                "created_at": state.created_at.isoformat(),
                "updated_at": datetime.utcnow().isoformat(),
                "messages": json.dumps(state.messages),
                "metadata": json.dumps(state.metadata),
                "summary": state.summary
            }
        )
    
    async def get_relevant_sessions(
        self, 
        user_id: str, 
        query: Optional[str] = None,
        limit: int = 5
    ) -> List[SessionState]:
        """Retrieve sessions relevant to current context."""
        if query:
            # Semantic search across session summaries
            sessions = await self.storage.vector_search(
                table="sessions",
                query=query,
                filter={"user_id": user_id},
                limit=limit
            )
        else:
            # Fallback to recency-based retrieval
            sessions = await self.storage.query(
                table="sessions",
                filter={"user_id": user_id},
                order_by="updated_at DESC",
                limit=limit
            )
        return [self._deserialize_session(s) for s in sessions]
    
    async def build_context_for_session(
        self, 
        user_id: str, 
        current_input: str,
        max_tokens: int = 4000
    ) -> str:
        """Build context string from relevant historical sessions."""
        relevant_sessions = await self.get_relevant_sessions(
            user_id=user_id,
            query=current_input,
            limit=5
        )
        
        context_parts = []
        total_tokens = 0
        
        for session in relevant_sessions:
            session_context = self._format_session_context(session)
            session_tokens = self._estimate_tokens(session_context)
            
            if total_tokens + session_tokens > max_tokens:
                break
                
            context_parts.append(session_context)
            total_tokens += session_tokens
        
        return "\n\n---\n\n".join(context_parts)
    
    def _format_session_context(self, session: SessionState) -> str:
        """Format a session for inclusion in context."""
        if session.summary:
            return f"Previous conversation ({session.updated_at.strftime('%Y-%m-%d')}):\n{session.summary}"
        
        # Fallback to condensed message history
        recent_messages = session.messages[-10:]
        formatted = []
        for msg in recent_messages:
            role = msg.get("role", "unknown")
            content = msg.get("content", "")[:500]
            formatted.append(f"{role}: {content}")
        
        return f"Previous conversation ({session.updated_at.strftime('%Y-%m-%d')}):\n" + "\n".join(formatted)
    
    def _estimate_tokens(self, text: str) -> int:
        """Rough token estimate (~4 characters per token for English text)."""
        return max(1, len(text) // 4)
    
    def _deserialize_session(self, row: Dict[str, Any]) -> SessionState:
        """Rebuild a SessionState from a storage row."""
        return SessionState(
            session_id=row["session_id"],
            user_id=row["user_id"],
            created_at=datetime.fromisoformat(row["created_at"]),
            updated_at=datetime.fromisoformat(row["updated_at"]),
            messages=json.loads(row["messages"]),
            metadata=json.loads(row["metadata"]),
            summary=row.get("summary")
        )

This pattern works well for applications where:

  • Sessions are relatively self-contained
  • Historical context can be effectively summarized
  • Retrieval latency is acceptable (typically 50-200ms)
  • Storage costs are manageable

Pattern 2: Hierarchical Memory with Compaction

For long-running agent systems, a hierarchical approach manages context more efficiently. This pattern maintains multiple "levels" of memory with automatic compaction.

from enum import Enum
from typing import List, Dict, Any
import asyncio
import json

class MemoryLevel(Enum):
    WORKING = "working"      # Current session, full detail
    SHORT_TERM = "short_term"  # Recent sessions, summarized
    LONG_TERM = "long_term"    # Historical, highly compressed
    SEMANTIC = "semantic"      # Extracted facts and relationships

class HierarchicalMemoryManager:
    def __init__(self, storage, llm_client):
        self.storage = storage
        self.llm = llm_client
        
    async def process_session_end(self, session_id: str, user_id: str):
        """Process session completion with hierarchical storage."""
        # Get full session
        session = await self.storage.get_session(session_id)
        
        # Generate summary for short-term memory
        summary = await self._generate_summary(session.messages)
        
        # Extract semantic facts for long-term storage
        facts = await self._extract_facts(session.messages)
        
        # Store at appropriate levels
        await asyncio.gather(
            self._store_short_term(user_id, session_id, summary),
            self._store_semantic_facts(user_id, facts),
            self._maybe_compact_short_term(user_id)
        )
    
    async def _generate_summary(self, messages: List[Dict]) -> str:
        """Generate concise summary of conversation."""
        prompt = f"""Summarize this conversation in 2-3 sentences, focusing on:
1. Key decisions or conclusions reached
2. Important information shared by the user
3. Any commitments or next steps

Conversation:
{self._format_messages(messages)}

Summary:"""
        
        response = await self.llm.complete(prompt, max_tokens=200)
        return response.strip()
    
    async def _extract_facts(self, messages: List[Dict]) -> List[Dict]:
        """Extract durable facts from conversation."""
        prompt = f"""Extract specific facts from this conversation that would be useful to remember long-term.
Focus on: user preferences, stated facts about themselves, decisions, and commitments.

Conversation:
{self._format_messages(messages)}

Return as JSON array of objects with 'fact', 'category', and 'confidence' fields.
Only include facts with high confidence. Example:
[{{"fact": "User prefers dark mode", "category": "preference", "confidence": 0.95}}]

Facts:"""
        
        response = await self.llm.complete(prompt, max_tokens=500)
        try:
            return json.loads(response)
        except json.JSONDecodeError:
            return []
    
    async def _maybe_compact_short_term(self, user_id: str):
        """Compact short-term memory if it exceeds threshold."""
        short_term = await self.storage.get_short_term_memories(user_id)
        
        if len(short_term) > 20:  # Threshold for compaction
            # Summarize oldest memories into long-term storage
            oldest = short_term[:10]
            combined_summary = await self._generate_combined_summary(oldest)
            
            await self.storage.store_long_term_memory(
                user_id=user_id,
                content=combined_summary,
                source_count=len(oldest)
            )
            
            # Remove compacted memories from short-term
            for memory in oldest:
                await self.storage.delete_short_term_memory(memory.id)
    
    async def get_context(self, user_id: str, query: str) -> Dict[str, Any]:
        """Retrieve context from all memory levels."""
        results = await asyncio.gather(
            self.storage.get_short_term_memories(user_id, limit=5),
            self.storage.search_long_term_memories(user_id, query, limit=3),
            self.storage.search_semantic_facts(user_id, query, limit=10)
        )
        
        return {
            "recent_sessions": results[0],
            "historical_context": results[1],
            "known_facts": results[2]
        }
    
    def _format_messages(self, messages: List[Dict]) -> str:
        """Render messages as plain 'role: content' lines for prompting."""
        return "\n".join(
            f"{m.get('role', 'unknown')}: {m.get('content', '')}" for m in messages
        )
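The threshold logic in `_maybe_compact_short_term` can be pulled out as a pure function, which makes it easy to test in isolation. A sketch (the function name and the memory-dict shape are ours; the real version would also call the LLM to produce the combined summary):

```python
def select_for_compaction(memories: list, threshold: int = 20, batch: int = 10):
    """Return (to_compact, to_keep). Once the short-term store exceeds
    `threshold` entries, the oldest `batch` are handed off for compaction."""
    if len(memories) <= threshold:
        return [], memories
    ordered = sorted(memories, key=lambda m: m["created_at"])
    return ordered[:batch], ordered[batch:]
```

With 25 short-term memories and the defaults above, the 10 oldest would be summarized into long-term storage and the remaining 15 kept.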

Pattern 3: Event-Sourced Context

For systems requiring auditability or complex state reconstruction, event sourcing provides a powerful foundation.

from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, List, Optional
from abc import ABC, abstractmethod

@dataclass
class ContextEvent(ABC):
    event_id: str
    user_id: str
    timestamp: datetime
    session_id: str
    
    @abstractmethod
    def apply(self, state: 'UserContextState') -> 'UserContextState':
        pass

@dataclass
class MessageAddedEvent(ContextEvent):
    role: str
    content: str
    
    def apply(self, state: 'UserContextState') -> 'UserContextState':
        state.messages.append({
            "role": self.role,
            "content": self.content,
            "timestamp": self.timestamp.isoformat()
        })
        return state

@dataclass
class FactLearnedEvent(ContextEvent):
    fact: str
    category: str
    confidence: float
    source_message_id: Optional[str] = None
    
    def apply(self, state: 'UserContextState') -> 'UserContextState':
        state.facts.append({
            "fact": self.fact,
            "category": self.category,
            "confidence": self.confidence,
            "learned_at": self.timestamp.isoformat()
        })
        return state

@dataclass
class PreferenceUpdatedEvent(ContextEvent):
    key: str
    value: Any
    previous_value: Optional[Any] = None
    
    def apply(self, state: 'UserContextState') -> 'UserContextState':
        state.preferences[self.key] = {
            "value": self.value,
            "updated_at": self.timestamp.isoformat()
        }
        return state

@dataclass
class UserContextState:
    user_id: str
    messages: List[Dict]
    facts: List[Dict]
    preferences: Dict[str, Any]
    relationships: List[Dict]
    
    @classmethod
    def initial(cls, user_id: str) -> 'UserContextState':
        return cls(
            user_id=user_id,
            messages=[],
            facts=[],
            preferences={},
            relationships=[]
        )

class EventSourcedContextManager:
    def __init__(self, event_store):
        self.event_store = event_store
        self.state_cache = {}
    
    async def append_event(self, event: ContextEvent):
        """Append event and update cached state."""
        await self.event_store.append(event)
        
        if event.user_id in self.state_cache:
            self.state_cache[event.user_id] = event.apply(
                self.state_cache[event.user_id]
            )
    
    async def get_state(self, user_id: str) -> UserContextState:
        """Reconstruct or retrieve cached state."""
        if user_id in self.state_cache:
            return self.state_cache[user_id]
        
        state = UserContextState.initial(user_id)
        events = await self.event_store.get_events(user_id)
        
        for event in events:
            state = event.apply(state)
        
        self.state_cache[user_id] = state
        return state
    
    async def get_state_at_time(
        self, 
        user_id: str, 
        timestamp: datetime
    ) -> UserContextState:
        """Reconstruct state at a specific point in time."""
        state = UserContextState.initial(user_id)
        events = await self.event_store.get_events(
            user_id, 
            before=timestamp
        )
        
        for event in events:
            state = event.apply(state)
        
        return state
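Under the hood, reconstruction is just a left fold over the ordered event log. A minimal, dataclass-free sketch (the event shapes here are ours, for illustration) that also demonstrates point-in-time replay:

```python
def replay(events, before=None):
    """Fold events (ordered by timestamp) into a state dict.
    Passing `before` reconstructs state as of that point in time."""
    state = {"facts": [], "preferences": {}}
    for event in events:
        if before is not None and event["timestamp"] >= before:
            continue  # skip events at or after the cutoff
        if event["type"] == "fact_learned":
            state["facts"].append(event["fact"])
        elif event["type"] == "preference_updated":
            state["preferences"][event["key"]] = event["value"]
    return state

log = [
    {"type": "fact_learned", "timestamp": 1, "fact": "prefers dark mode"},
    {"type": "preference_updated", "timestamp": 2, "key": "theme", "value": "dark"},
    {"type": "preference_updated", "timestamp": 3, "key": "theme", "value": "light"},
]
current = replay(log)            # theme is "light"
as_of_t3 = replay(log, before=3) # theme is still "dark"
```

Because state is always derived from the log, "what did the agent know last Tuesday?" becomes a query rather than a guess.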

Storage Backends for Multi-Session Context

Choosing the right storage backend significantly impacts performance, cost, and capabilities of your multi-session context system.

PostgreSQL with pgvector

PostgreSQL with the pgvector extension provides an excellent balance of reliability, flexibility, and vector search capabilities.

import asyncpg
from typing import List, Dict, Optional
import json

class PostgresContextStorage:
    def __init__(self, connection_string: str):
        self.connection_string = connection_string
        self.pool = None
    
    async def initialize(self):
        """Initialize connection pool and ensure schema exists."""
        self.pool = await asyncpg.create_pool(self.connection_string)
        
        async with self.pool.acquire() as conn:
            await conn.execute("""
                CREATE EXTENSION IF NOT EXISTS vector;
                
                CREATE TABLE IF NOT EXISTS user_sessions (
                    id SERIAL PRIMARY KEY,
                    session_id VARCHAR(255) UNIQUE NOT NULL,
                    user_id VARCHAR(255) NOT NULL,
                    created_at TIMESTAMPTZ DEFAULT NOW(),
                    updated_at TIMESTAMPTZ DEFAULT NOW(),
                    messages JSONB NOT NULL DEFAULT '[]',
                    metadata JSONB NOT NULL DEFAULT '{}',
                    summary TEXT,
                    summary_embedding vector(1536)
                );
                
                CREATE TABLE IF NOT EXISTS user_facts (
                    id SERIAL PRIMARY KEY,
                    user_id VARCHAR(255) NOT NULL,
                    fact TEXT NOT NULL,
                    category VARCHAR(100),
                    confidence FLOAT NOT NULL,
                    embedding vector(1536),
                    created_at TIMESTAMPTZ DEFAULT NOW(),
                    source_session_id VARCHAR(255)
                );
                
                CREATE INDEX IF NOT EXISTS idx_sessions_user_id 
                    ON user_sessions(user_id);
                CREATE INDEX IF NOT EXISTS idx_sessions_updated 
                    ON user_sessions(updated_at DESC);
                CREATE INDEX IF NOT EXISTS idx_facts_user_id 
                    ON user_facts(user_id);
                CREATE INDEX IF NOT EXISTS idx_sessions_embedding 
                    ON user_sessions USING ivfflat (summary_embedding vector_cosine_ops);
                CREATE INDEX IF NOT EXISTS idx_facts_embedding 
                    ON user_facts USING ivfflat (embedding vector_cosine_ops);
            """)
    
    async def upsert_session(
        self,
        session_id: str,
        user_id: str,
        messages: List[Dict],
        metadata: Dict,
        summary: Optional[str] = None,
        summary_embedding: Optional[List[float]] = None
    ):
        """Insert or update a session."""
        async with self.pool.acquire() as conn:
            embedding_str = None
            if summary_embedding:
                embedding_str = f"[{','.join(map(str, summary_embedding))}]"
            
            await conn.execute("""
                INSERT INTO user_sessions 
                    (session_id, user_id, messages, metadata, summary, summary_embedding)
                VALUES ($1, $2, $3, $4, $5, $6::vector)
                ON CONFLICT (session_id) DO UPDATE SET
                    messages = EXCLUDED.messages,
                    metadata = EXCLUDED.metadata,
                    summary = EXCLUDED.summary,
                    summary_embedding = EXCLUDED.summary_embedding,
                    updated_at = NOW()
            """, session_id, user_id, json.dumps(messages), 
                json.dumps(metadata), summary, embedding_str)
    
    async def search_sessions_by_similarity(
        self,
        user_id: str,
        query_embedding: List[float],
        limit: int = 5
    ) -> List[Dict]:
        """Find sessions similar to query embedding."""
        async with self.pool.acquire() as conn:
            embedding_str = f"[{','.join(map(str, query_embedding))}]"
            
            rows = await conn.fetch("""
                SELECT 
                    session_id,
                    user_id,
                    messages,
                    metadata,
                    summary,
                    created_at,
                    updated_at,
                    1 - (summary_embedding <=> $1::vector) as similarity
                FROM user_sessions
                WHERE user_id = $2 AND summary_embedding IS NOT NULL
                ORDER BY summary_embedding <=> $1::vector
                LIMIT $3
            """, embedding_str, user_id, limit)
            
            return [dict(row) for row in rows]
    
    async def search_facts_by_similarity(
        self,
        user_id: str,
        query_embedding: List[float],
        limit: int = 10
    ) -> List[Dict]:
        """Find facts similar to query embedding."""
        async with self.pool.acquire() as conn:
            embedding_str = f"[{','.join(map(str, query_embedding))}]"
            
            rows = await conn.fetch("""
                SELECT 
                    fact,
                    category,
                    confidence,
                    created_at,
                    1 - (embedding <=> $1::vector) as similarity
                FROM user_facts
                WHERE user_id = $2 AND embedding IS NOT NULL
                ORDER BY embedding <=> $1::vector
                LIMIT $3
            """, embedding_str, user_id, limit)
            
            return [dict(row) for row in rows]
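The `1 - (summary_embedding <=> $1::vector)` expressions convert pgvector's cosine distance operator into a similarity score. In pure Python the relationship looks like this (illustrative only; production systems should let the database do this work):

```python
import math

def cosine_distance(a, b):
    """pgvector's `<=>` operator: 1 minus cosine similarity."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / (norm_a * norm_b)

# Identical direction -> distance 0.0, similarity 1.0
# Orthogonal vectors  -> distance 1.0, similarity 0.0
similarity = 1 - cosine_distance([1.0, 0.0], [1.0, 0.0])
```

Note that cosine distance ignores magnitude, which is why scaled copies of the same embedding are treated as identical.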

Redis for High-Performance Caching

For applications requiring sub-millisecond latency on context retrieval, Redis provides excellent caching capabilities.

import redis.asyncio as redis
import json
from typing import Optional, Dict, List
from datetime import timedelta

class RedisContextCache:
    def __init__(self, redis_url: str, default_ttl: int = 3600):
        self.redis = redis.from_url(redis_url)
        self.default_ttl = default_ttl
    
    async def cache_user_context(
        self,
        user_id: str,
        context: Dict,
        ttl: Optional[int] = None
    ):
        """Cache user context with TTL."""
        key = f"context:{user_id}"
        await self.redis.setex(
            key,
            ttl or self.default_ttl,
            json.dumps(context)
        )
    
    async def get_cached_context(self, user_id: str) -> Optional[Dict]:
        """Retrieve cached context."""
        key = f"context:{user_id}"
        data = await self.redis.get(key)
        return json.loads(data) if data else None
    
    async def invalidate(self, user_id: str):
        """Drop cached context for a user (e.g. after a session ends)."""
        await self.redis.delete(f"context:{user_id}")
    
    async def update_session_context(
        self,
        user_id: str,
        session_id: str,
        update: Dict
    ):
        """Update specific session within cached context."""
        key = f"context:{user_id}:sessions"
        await self.redis.hset(key, session_id, json.dumps(update))
        await self.redis.expire(key, self.default_ttl)
    
    async def get_recent_sessions(
        self,
        user_id: str,
        limit: int = 5
    ) -> List[Dict]:
        """Get recently accessed sessions from cache."""
        key = f"context:{user_id}:recent"
        sessions = await self.redis.lrange(key, 0, limit - 1)
        return [json.loads(s) for s in sessions]
    
    async def add_to_recent_sessions(
        self,
        user_id: str,
        session_summary: Dict,
        max_recent: int = 10
    ):
        """Add session to recent sessions list."""
        key = f"context:{user_id}:recent"
        await self.redis.lpush(key, json.dumps(session_summary))
        await self.redis.ltrim(key, 0, max_recent - 1)
        await self.redis.expire(key, self.default_ttl)

Synchronization Strategies for Multi-Agent Systems

When multiple AI agents or sessions need to share context, synchronization becomes critical. Without proper coordination, agents can develop inconsistent views of user state.

Write-Through with Event Broadcasting

import asyncio
from datetime import datetime
from typing import Callable, Dict, List
import json

class ContextSynchronizer:
    def __init__(self, primary_storage, cache, message_broker):
        self.storage = primary_storage
        self.cache = cache
        self.broker = message_broker
        self.subscribers: List[Callable] = []
    
    async def update_context(
        self,
        user_id: str,
        update: Dict,
        source_agent: str
    ):
        """Update context with write-through and event broadcast."""
        # Write to primary storage
        await self.storage.update_user_context(user_id, update)
        
        # Update cache
        await self.cache.invalidate_and_set(user_id, update)
        
        # Broadcast update event
        event = {
            "type": "context_updated",
            "user_id": user_id,
            "update": update,
            "source_agent": source_agent,
            "timestamp": datetime.utcnow().isoformat()
        }
        await self.broker.publish(f"context:{user_id}", json.dumps(event))
        
        # Notify local subscribers
        for subscriber in self.subscribers:
            asyncio.create_task(subscriber(event))
    
    async def subscribe_to_updates(
        self,
        user_id: str,
        callback: Callable
    ):
        """Subscribe to context updates for a user."""
        self.subscribers.append(callback)
        await self.broker.subscribe(f"context:{user_id}", callback)
    
    async def get_context_with_freshness(
        self,
        user_id: str,
        max_staleness_seconds: int = 60
    ) -> Dict:
        """Get context with staleness guarantee."""
        cached = await self.cache.get_with_age(user_id)
        
        if cached and cached['age_seconds'] < max_staleness_seconds:
            return cached['data']
        
        # Cache miss or too stale - fetch from primary
        fresh = await self.storage.get_user_context(user_id)
        await self.cache.set(user_id, fresh)
        return fresh
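The freshness check in `get_context_with_freshness` boils down to comparing an entry's age against the staleness budget. A minimal in-process sketch (class and method names are ours, with an injectable clock so the behavior is testable):

```python
import time

class AgedCache:
    """Minimal in-process stand-in for a cache that tracks entry age."""
    def __init__(self):
        self._data = {}

    def set(self, key, value, now=None):
        # Record the value alongside the time it was stored.
        self._data[key] = (value, now if now is not None else time.time())

    def get_if_fresh(self, key, max_staleness_seconds, now=None):
        """Return the value only if it is younger than the staleness budget."""
        if key not in self._data:
            return None
        value, stored_at = self._data[key]
        now = now if now is not None else time.time()
        if now - stored_at < max_staleness_seconds:
            return value
        return None  # too stale: caller should fall back to primary storage
```

The staleness budget is a per-call parameter rather than a global TTL, so latency-sensitive reads can tolerate older data while correctness-sensitive ones force a refresh.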

Conflict Resolution for Concurrent Updates

When multiple agents update context simultaneously, conflicts must be resolved consistently.

from dataclasses import dataclass
from datetime import datetime
from typing import Optional, Dict, Any, Callable
from enum import Enum
import asyncio

class VersionConflictError(Exception):
    """Raised when a conditional write sees an unexpected version."""
    pass

class MaxRetriesExceededError(Exception):
    pass

class ConflictResolution(Enum):
    LAST_WRITE_WINS = "last_write_wins"
    MERGE = "merge"
    REJECT = "reject"

@dataclass
class VersionedContext:
    user_id: str
    data: Dict[str, Any]
    version: int
    updated_at: datetime
    updated_by: str

class OptimisticConcurrencyManager:
    def __init__(self, storage):
        self.storage = storage
    
    async def update_with_optimistic_lock(
        self,
        user_id: str,
        update_fn: Callable[[Dict], Dict],
        agent_id: str,
        max_retries: int = 3
    ) -> VersionedContext:
        """Update context with optimistic concurrency control."""
        for attempt in range(max_retries):
            # Read current version
            current = await self.storage.get_versioned_context(user_id)
            
            # Apply update function
            new_data = update_fn(current.data.copy())
            
            # Attempt conditional write
            try:
                updated = await self.storage.conditional_update(
                    user_id=user_id,
                    new_data=new_data,
                    expected_version=current.version,
                    updated_by=agent_id
                )
                return updated
            except VersionConflictError:
                if attempt == max_retries - 1:
                    raise
                # Wait with exponential backoff before retry
                await asyncio.sleep(0.1 * (2 ** attempt))
        
        raise MaxRetriesExceededError(f"Failed to update after {max_retries} attempts")
    
    async def merge_conflicting_updates(
        self,
        base: Dict,
        update_a: Dict,
        update_b: Dict
    ) -> Dict:
        """Three-way merge for conflicting updates."""
        result = base.copy()
        
        # Find keys modified in each update
        a_changes = {k: v for k, v in update_a.items() if base.get(k) != v}
        b_changes = {k: v for k, v in update_b.items() if base.get(k) != v}
        
        # Apply non-conflicting changes
        for key, value in a_changes.items():
            if key not in b_changes:
                result[key] = value
        
        for key, value in b_changes.items():
            if key not in a_changes:
                result[key] = value
        
        # Handle conflicts (here: last-write-wins based on timestamp)
        for key in set(a_changes.keys()) & set(b_changes.keys()):
            # Could implement more sophisticated merging based on data type
            result[key] = update_b[key]  # Simplistic: B wins
        
        return result
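To make the merge behavior concrete, here is the same logic as a standalone function with a worked example (a simplified restatement for illustration, not a drop-in replacement):

```python
def merge3(base: dict, update_a: dict, update_b: dict) -> dict:
    """Three-way merge: apply non-conflicting changes from both sides;
    on a true conflict, let update_b win (last-write-wins stand-in)."""
    result = dict(base)
    a_changes = {k: v for k, v in update_a.items() if base.get(k) != v}
    b_changes = {k: v for k, v in update_b.items() if base.get(k) != v}
    for k, v in a_changes.items():
        if k not in b_changes:
            result[k] = v
    for k, v in b_changes.items():
        result[k] = v  # covers both b-only keys and conflicts (b wins)
    return result

base = {"theme": "light", "lang": "en"}
merged = merge3(base, {"theme": "dark"}, {"lang": "de", "theme": "solarized"})
# "lang" changed only in update_b, so it merges cleanly; "theme" changed
# in both, so update_b's value wins.
```

Note that a key counts as "changed" only if it differs from the base, so an agent that writes back an unchanged value never causes a spurious conflict.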

Building a Complete Multi-Session Context System

Let's bring everything together into a production-ready system that handles the full lifecycle of multi-session context.

from dataclasses import dataclass
from typing import Optional, Dict, List, Any
from datetime import datetime
import asyncio

@dataclass
class ContextConfig:
    max_working_memory_tokens: int = 4000
    max_short_term_sessions: int = 20
    max_facts_per_query: int = 15
    compaction_threshold: int = 50
    cache_ttl_seconds: int = 3600
    embedding_model: str = "text-embedding-3-small"

class MultiSessionContextSystem:
    """
    Complete multi-session context management system.
    
    Handles:
    - Session persistence and retrieval
    - Hierarchical memory with automatic compaction
    - Semantic search across historical context
    - Real-time synchronization across agents
    - Caching for low-latency access
    """
    
    def __init__(
        self,
        storage: PostgresContextStorage,
        cache: RedisContextCache,
        embedding_client,
        llm_client,
        config: Optional[ContextConfig] = None
    ):
        self.storage = storage
        self.cache = cache
        self.embeddings = embedding_client
        self.llm = llm_client
        self.config = config or ContextConfig()
    
    async def start_session(
        self,
        user_id: str,
        session_id: str,
        initial_context: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Initialize a new session with relevant historical context.
        
        Returns context package to inject into agent prompt.
        """
        # Gather context from all memory levels
        context_parts = await asyncio.gather(
            self._get_user_facts(user_id, initial_context),
            self._get_recent_session_summaries(user_id),
            self._get_relevant_historical_context(user_id, initial_context)
        )
        
        facts, recent_sessions, historical = context_parts
        
        # Build context package
        context_package = {
            "user_facts": facts,
            "recent_conversations": recent_sessions,
            "relevant_history": historical,
            "session_id": session_id
        }
        
        # Cache for the session
        await self.cache.cache_user_context(user_id, context_package)
        
        return context_package
    
    async def process_message(
        self,
        user_id: str,
        session_id: str,
        role: str,
        content: str
    ):
        """Process a message and update context accordingly."""
        # Store message
        await self.storage.append_message(session_id, {
            "role": role,
            "content": content,
            "timestamp": datetime.utcnow().isoformat()
        })
        
        # Extract facts in the background for user messages
        if role == "user":
            asyncio.create_task(
                self._maybe_extract_facts(user_id, session_id, content)
            )
    
    async def end_session(
        self,
        user_id: str,
        session_id: str
    ):
        """Finalize session and process for long-term storage."""
        # Get full session
        session = await self.storage.get_session(session_id)
        
        if not session or len(session.messages) < 2:
            return  # Nothing worth storing
        
        # Generate summary
        summary = await self._generate_session_summary(session.messages)
        
        # Generate embedding for semantic search
        embedding = await self.embeddings.embed(summary)
        
        # Update session with summary and embedding
        await self.storage.upsert_session(
            session_id=session_id,
            user_id=user_id,
            messages=session.messages,
            metadata=session.metadata,
            summary=summary,
            summary_embedding=embedding
        )
        
        # Check if compaction needed
        await self._maybe_compact_memories(user_id)
        
        # Invalidate cache
        await self.cache.invalidate(user_id)
    
    async def query_context(
        self,
        user_id: str,
        query: str,
        max_tokens: int = None
    ) -> str:
        """
        Query for relevant context to inject into a prompt.
        
        Returns formatted context string within token budget.
        """
        max_tokens = max_tokens or self.config.max_working_memory_tokens
        
        # Get query embedding
        query_embedding = await self.embeddings.embed(query)
        
        # Search across all context types
        results = await asyncio.gather(
            self.storage.search_facts_by_similarity(
                user_id, query_embedding, 
                limit=self.config.max_facts_per_query
            ),
            self.storage.search_sessions_by_similarity(
                user_id, query_embedding,
                limit=10
            )
        )
        
        facts, sessions = results
        
        # Build context within token budget
        context_parts = []
        token_count = 0
        
        # Add relevant facts first (usually most valuable)
        if facts:
            facts_text = self._format_facts(facts)
            facts_tokens = self._estimate_tokens(facts_text)
            if token_count + facts_tokens <= max_tokens:
                context_parts.append(f"Known facts about user:\n{facts_text}")
                token_count += facts_tokens
        
        # Add session summaries within remaining budget
        for session in sessions:
            if session.get('summary'):
                session_text = f"Previous conversation ({session['updated_at'].strftime('%Y-%m-%d')}): {session['summary']}"
                session_tokens = self._estimate_tokens(session_text)
                if token_count + session_tokens <= max_tokens:
                    context_parts.append(session_text)
                    token_count += session_tokens
                else:
                    break
        
        return "\n\n".join(context_parts)
    
    async def _get_user_facts(
        self,
        user_id: str,
        context_hint: Optional[str]
    ) -> List[Dict]:
        """Get relevant facts about user."""
        if context_hint:
            embedding = await self.embeddings.embed(context_hint)
            return await self.storage.search_facts_by_similarity(
                user_id, embedding, limit=10
            )
        return await self.storage.get_recent_facts(user_id, limit=10)
    
    async def _get_recent_session_summaries(
        self,
        user_id: str
    ) -> List[Dict]:
        """Get summaries of recent sessions."""
        return await self.storage.get_recent_sessions(
            user_id, 
            limit=5,
            with_summary=True
        )
    
    async def _get_relevant_historical_context(
        self,
        user_id: str,
        query: Optional[str]
    ) -> List[Dict]:
        """Search for relevant historical context."""
        if not query:
            return []
        
        embedding = await self.embeddings.embed(query)
        return await self.storage.search_sessions_by_similarity(
            user_id, embedding, limit=3
        )
    
    async def _generate_session_summary(
        self,
        messages: List[Dict]
    ) -> str:
        """Generate concise summary of session."""
        formatted_messages = "\n".join([
            f"{m['role']}: {m['content'][:500]}"
            for m in messages[-20:]  # Last 20 messages max
        ])
        
        prompt = f"""Summarize this conversation in 2-3 sentences. Focus on:
- Key topics discussed
- Decisions made or conclusions reached
- Important information shared

Conversation:
{formatted_messages}

Summary:"""
        
        return await self.llm.complete(prompt, max_tokens=150)
    
    async def _maybe_extract_facts(
        self,
        user_id: str,
        session_id: str,
        user_message: str
    ):
        """Extract facts from user message if any present."""
        prompt = f"""Does this message contain factual information about the user that would be worth remembering long-term?
Look for: preferences, personal details, stated intentions, corrections of prior assumptions.

Message: "{user_message}"

If yes, respond with JSON: {{"facts": [{{"fact": "...", "category": "preference|personal|intention|correction", "confidence": 0.0-1.0}}]}}
If no useful facts, respond with: {{"facts": []}}

Response:"""
        
        response = await self.llm.complete(prompt, max_tokens=200)
        
        try:
            result = json.loads(response)
            for fact_data in result.get('facts', []):
                if fact_data.get('confidence', 0) >= 0.8:
                    embedding = await self.embeddings.embed(fact_data['fact'])
                    await self.storage.store_fact(
                        user_id=user_id,
                        fact=fact_data['fact'],
                        category=fact_data.get('category'),
                        confidence=fact_data['confidence'],
                        embedding=embedding,
                        source_session_id=session_id
                    )
        except json.JSONDecodeError:
            pass  # Silently handle malformed responses
    
    async def _maybe_compact_memories(self, user_id: str):
        """Compact memories if threshold exceeded."""
        session_count = await self.storage.count_sessions(user_id)
        
        if session_count > self.config.compaction_threshold:
            # Get oldest sessions for compaction
            oldest = await self.storage.get_oldest_sessions(
                user_id, 
                limit=session_count - self.config.max_short_term_sessions
            )
            
            # Generate combined summary
            summaries = [s['summary'] for s in oldest if s.get('summary')]
            combined = await self._generate_combined_summary(summaries)
            
            # Store in long-term archive
            await self.storage.archive_sessions(
                user_id=user_id,
                session_ids=[s['session_id'] for s in oldest],
                combined_summary=combined
            )
    
    async def _generate_combined_summary(
        self,
        summaries: List[str]
    ) -> str:
        """Generate combined summary for archival."""
        prompt = f"""Combine these conversation summaries into a single coherent summary of key themes and information:

{chr(10).join(f'- {s}' for s in summaries)}

Combined summary (preserve all important details):"""
        
        return await self.llm.complete(prompt, max_tokens=300)
    
    def _format_facts(self, facts: List[Dict]) -> str:
        """Format facts for inclusion in context."""
        return "\n".join([
            f"- {f['fact']} ({f['category']}, confidence: {f['confidence']:.0%})"
            for f in facts
        ])
    
    def _estimate_tokens(self, text: str) -> int:
        """Rough token estimation."""
        return len(text) // 4  # Approximate
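The token-budget loop inside `query_context` follows a simple greedy pattern that is worth isolating and testing on its own. The sketch below (with an illustrative name, `pack_within_budget`, not part of the class above) uses the same rough 4-characters-per-token estimate:

```python
def pack_within_budget(items, max_tokens, estimate=lambda t: len(t) // 4):
    """Greedy packing: keep items in priority order while the budget holds."""
    selected, used = [], 0
    for text in items:
        cost = estimate(text)
        if used + cost <= max_tokens:
            selected.append(text)
            used += cost
        else:
            break  # items arrive pre-sorted by priority, so stop at first overflow
    return selected, used
```

With a 10-token budget and items costing 5, 4, and 25 tokens, only the first two fit; the greedy cutoff guarantees the most relevant items survive truncation.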

Integration with Dytto's Context API

Building a multi-session context system from scratch is complex. Dytto provides a production-ready context layer that handles the heavy lifting, letting you focus on building your AI application.

import httpx
from typing import Dict, Any, Optional, List

class DyttoContextClient:
    """
    Client for Dytto's context API.
    
    Dytto handles:
    - Persistent user context across all sessions
    - Automatic fact extraction and storage
    - Semantic search across user history
    - Real-time context synchronization
    - Privacy-preserving context management
    """
    
    def __init__(self, api_key: str, base_url: str = "https://api.dytto.app"):
        self.api_key = api_key
        self.base_url = base_url
        self.client = httpx.AsyncClient(
            headers={"Authorization": f"Bearer {api_key}"}
        )
    
    async def get_context(
        self,
        user_id: str,
        query: Optional[str] = None,
        categories: Optional[List[str]] = None
    ) -> Dict[str, Any]:
        """
        Retrieve user context for injection into AI prompts.
        
        Args:
            user_id: The user to get context for
            query: Optional query to find relevant context
            categories: Filter by context categories
        
        Returns:
            Context package ready for prompt injection
        """
        params = {"user_id": user_id}
        if query:
            params["query"] = query
        if categories:
            params["categories"] = ",".join(categories)
        
        response = await self.client.get(
            f"{self.base_url}/v1/context",
            params=params
        )
        response.raise_for_status()
        return response.json()
    
    async def store_fact(
        self,
        user_id: str,
        fact: str,
        category: str,
        confidence: float = 0.9,
        source: Optional[str] = None
    ):
        """
        Store a fact about a user.
        
        Dytto automatically handles:
        - Embedding generation for semantic search
        - Deduplication against existing facts
        - Conflict resolution with existing knowledge
        """
        response = await self.client.post(
            f"{self.base_url}/v1/facts",
            json={
                "user_id": user_id,
                "fact": fact,
                "category": category,
                "confidence": confidence,
                "source": source
            }
        )
        response.raise_for_status()
        return response.json()
    
    async def search_context(
        self,
        user_id: str,
        query: str,
        limit: int = 10
    ) -> List[Dict]:
        """
        Semantic search across user's context history.
        """
        response = await self.client.get(
            f"{self.base_url}/v1/context/search",
            params={
                "user_id": user_id,
                "query": query,
                "limit": limit
            }
        )
        response.raise_for_status()
        return response.json()["results"]
    
    async def get_user_summary(self, user_id: str) -> Dict[str, Any]:
        """
        Get a comprehensive summary of what we know about a user.
        
        Useful for:
        - Onboarding prompts
        - Personalization
        - Context briefings
        """
        response = await self.client.get(
            f"{self.base_url}/v1/users/{user_id}/summary"
        )
        response.raise_for_status()
        return response.json()

# Example usage with an AI agent
async def build_personalized_agent_prompt(
    dytto: DyttoContextClient,
    user_id: str,
    user_message: str,
    base_prompt: str
) -> str:
    """Build a prompt enriched with user context."""
    
    # Get relevant context for this message
    context = await dytto.get_context(
        user_id=user_id,
        query=user_message
    )
    
    # Format context for injection
    context_block = ""
    
    if context.get("facts"):
        facts_text = "\n".join([
            f"- {f['fact']}" for f in context["facts"]
        ])
        context_block += f"\n\nKnown facts about this user:\n{facts_text}"
    
    if context.get("preferences"):
        prefs_text = "\n".join([
            f"- {k}: {v}" for k, v in context["preferences"].items()
        ])
        context_block += f"\n\nUser preferences:\n{prefs_text}"
    
    if context.get("recent_interactions"):
        recent = context["recent_interactions"][:3]
        recent_text = "\n".join([
            f"- {r['summary']}" for r in recent
        ])
        context_block += f"\n\nRecent interactions:\n{recent_text}"
    
    return f"""{base_prompt}

## User Context{context_block}

## Current Conversation
User: {user_message}
"""

Best Practices for Multi-Session Context

1. Design for Context Limits

Even with sophisticated retrieval, you cannot inject unlimited context. Design your system to prioritize the most relevant information.

class ContextPrioritizer:
    """Prioritize context within token budgets."""
    
    PRIORITY_WEIGHTS = {
        "active_task": 1.0,      # Highest priority
        "recent_decision": 0.9,
        "user_preference": 0.8,
        "related_history": 0.6,
        "general_fact": 0.4
    }
    
    def prioritize(
        self,
        context_items: List[Dict],
        token_budget: int
    ) -> List[Dict]:
        """Select highest-priority items within budget."""
        # Score each item
        scored = []
        for item in context_items:
            score = self.PRIORITY_WEIGHTS.get(item['type'], 0.3)
            score *= item.get('relevance', 1.0)  # From semantic search
            score *= item.get('recency_factor', 1.0)
            scored.append((score, item))
        
        # Sort by score descending
        scored.sort(key=lambda x: x[0], reverse=True)
        
        # Select within budget
        selected = []
        remaining = token_budget
        
        for score, item in scored:
            tokens = self._estimate_tokens(item)
            if tokens <= remaining:
                selected.append(item)
                remaining -= tokens
        
        return selected
    
    def _estimate_tokens(self, item: Dict) -> int:
        """Rough estimate: ~4 characters per token."""
        return len(str(item)) // 4

2. Handle Context Staleness

Context can become stale. Build mechanisms to detect and handle outdated information.

@dataclass
class ContextValidity:
    is_valid: bool
    staleness_seconds: float
    confidence: float
    should_refresh: bool

class StalenessChecker:
    """Check and handle context staleness."""
    
    STALENESS_THRESHOLDS = {
        "preference": 86400 * 30,  # 30 days
        "fact": 86400 * 7,         # 7 days
        "session_summary": 3600,   # 1 hour
        "active_task": 300         # 5 minutes
    }
    
    def check_validity(
        self,
        context_item: Dict,
        current_time: datetime
    ) -> ContextValidity:
        item_type = context_item.get('type', 'unknown')
        threshold = self.STALENESS_THRESHOLDS.get(item_type, 3600)
        
        updated_at = context_item.get('updated_at')
        if not updated_at:
            return ContextValidity(
                is_valid=False,
                staleness_seconds=float('inf'),
                confidence=0.0,
                should_refresh=True
            )
        
        age = (current_time - updated_at).total_seconds()
        staleness_ratio = age / threshold
        
        return ContextValidity(
            is_valid=staleness_ratio < 1.0,
            staleness_seconds=age,
            confidence=max(0, 1 - staleness_ratio),
            should_refresh=staleness_ratio > 0.8
        )
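The confidence decay above is linear in the staleness ratio. Extracted as a standalone helper for clarity (illustrative, not part of the class):

```python
def staleness_confidence(age_seconds: float, threshold_seconds: float) -> float:
    """Linear decay: 1.0 when fresh, 0.0 at or beyond the staleness threshold."""
    return max(0.0, 1.0 - age_seconds / threshold_seconds)
```

For example, a fact with a 7-day threshold that is 3.5 days old retains confidence 0.5, and crosses the `should_refresh` line at 80% of its threshold age.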

3. Provide Context Transparency

Let users understand and control what the AI remembers about them.

async def generate_context_report(
    storage,
    user_id: str
) -> Dict[str, Any]:
    """Generate user-facing report of stored context."""
    
    facts = await storage.get_all_facts(user_id)
    sessions = await storage.get_session_summaries(user_id)
    
    return {
        "summary": {
            "total_facts": len(facts),
            "total_sessions": len(sessions),
            "memory_since": sessions[-1]['created_at'] if sessions else None
        },
        "facts_by_category": group_by(facts, 'category'),
        "recent_sessions": sessions[:10],
        "data_controls": {
            "export_url": f"/api/users/{user_id}/export",
            "delete_url": f"/api/users/{user_id}/delete",
            "preferences_url": f"/api/users/{user_id}/preferences"
        }
    }
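The `group_by` helper used above isn't defined in this snippet; a minimal stdlib version might look like:

```python
from collections import defaultdict
from typing import Any, Dict, List

def group_by(items: List[Dict], key: str) -> Dict[Any, List[Dict]]:
    """Group a list of dicts by the value stored under `key`."""
    grouped = defaultdict(list)
    for item in items:
        grouped[item.get(key)].append(item)
    return dict(grouped)
```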

Common Pitfalls and How to Avoid Them

Pitfall 1: Over-Retrieval

Retrieving too much context can confuse the model and waste tokens. Always limit and prioritize.

Bad:

# Don't do this - retrieves everything
context = await storage.get_all_context(user_id)
prompt = f"{base_prompt}\n\nContext:\n{json.dumps(context)}"

Good:

# Retrieve only what's relevant
relevant_context = await storage.search_context(
    user_id=user_id,
    query=user_message,
    limit=10,
    max_tokens=2000
)
prompt = build_prompt_with_context(base_prompt, relevant_context)

Pitfall 2: Ignoring Context Conflicts

Facts can contradict each other. Build conflict detection.

async def detect_conflicts(
    storage,
    llm,
    user_id: str,
    new_fact: str,
    category: str
) -> List[Dict]:
    """Detect potential conflicts with existing facts."""
    
    # Get existing facts in same category
    existing = await storage.get_facts(user_id, category=category)
    
    # Use LLM to detect conflicts
    conflicts = []
    for fact in existing:
        prompt = f"""Do these two facts conflict?
Fact 1: {fact['fact']}
Fact 2: {new_fact}

Answer YES or NO, then explain briefly."""
        
        response = await llm.complete(prompt, max_tokens=100)
        if response.upper().startswith('YES'):
            conflicts.append({
                'existing_fact': fact,
                'new_fact': new_fact,
                'explanation': response
            })
    
    return conflicts
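Note that `detect_conflicts` makes one LLM call per existing fact, which gets expensive at scale. A cheap prefilter can narrow the candidate set first; the sketch below (illustrative names and threshold) uses stdlib `difflib` string similarity as a stand-in for the embedding similarity you would use in production:

```python
from difflib import SequenceMatcher
from typing import List

def prefilter_conflict_candidates(
    existing_facts: List[str],
    new_fact: str,
    threshold: float = 0.6
) -> List[str]:
    """Keep only facts similar enough to plausibly conflict with the new one."""
    return [
        fact for fact in existing_facts
        if SequenceMatcher(None, fact.lower(), new_fact.lower()).ratio() >= threshold
    ]
```

Only the survivors go on to the LLM conflict check, turning N calls into a handful.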

Pitfall 3: Privacy Violations

Multi-session context stores sensitive information. Implement proper access controls.

class ContextAccessControl:
    """Enforce access control on context operations."""
    
    async def check_access(
        self,
        requester_id: str,
        target_user_id: str,
        operation: str
    ) -> bool:
        """Check if requester can perform operation on target's context."""
        
        # Users can always access their own context
        if requester_id == target_user_id:
            return True
        
        # Check for explicit grants
        grant = await self.storage.get_access_grant(
            requester_id, target_user_id, operation
        )
        
        return grant is not None and grant.is_valid()
    
    async def audit_access(
        self,
        requester_id: str,
        target_user_id: str,
        operation: str,
        result: Any
    ):
        """Log context access for auditing."""
        await self.storage.log_access(
            requester_id=requester_id,
            target_user_id=target_user_id,
            operation=operation,
            timestamp=datetime.utcnow(),
            result_summary=self._summarize_result(result)
        )

Measuring Multi-Session Context Effectiveness

Track metrics to understand how well your context system performs.

import numpy as np  # used for the aggregate statistics below

class ContextMetrics:
    """Track multi-session context effectiveness."""
    
    async def record_session_metrics(
        self,
        session_id: str,
        user_id: str,
        metrics: Dict
    ):
        """Record metrics for a session."""
        await self.storage.insert('context_metrics', {
            'session_id': session_id,
            'user_id': user_id,
            'timestamp': datetime.utcnow(),
            
            # Context retrieval metrics
            'context_tokens_used': metrics.get('context_tokens', 0),
            'facts_retrieved': metrics.get('facts_count', 0),
            'sessions_retrieved': metrics.get('sessions_count', 0),
            'retrieval_latency_ms': metrics.get('retrieval_ms', 0),
            
            # Effectiveness metrics
            'context_hit_rate': metrics.get('hit_rate', 0),
            'user_corrections': metrics.get('corrections', 0),
            'repeat_explanations': metrics.get('repeats', 0),
            
            # Quality metrics
            'user_satisfaction': metrics.get('satisfaction', None),
            'task_completion': metrics.get('completed', None)
        })
    
    async def get_effectiveness_report(
        self,
        time_range: tuple
    ) -> Dict[str, Any]:
        """Generate effectiveness report."""
        metrics = await self.storage.query_metrics(time_range)
        
        return {
            'avg_context_tokens': np.mean([m['context_tokens_used'] for m in metrics]),
            'avg_retrieval_latency': np.mean([m['retrieval_latency_ms'] for m in metrics]),
            'context_hit_rate': np.mean([m['context_hit_rate'] for m in metrics]),
            'correction_rate': np.mean([m['user_corrections'] for m in metrics]),
            'completion_rate': np.mean([
                m['task_completion'] for m in metrics 
                if m['task_completion'] is not None
            ])
        }

Conclusion

Multi-session AI context transforms AI agents from forgetful assistants into genuine partners that accumulate knowledge and improve over time. The technical challenges are significant—storage, retrieval, synchronization, and privacy all require careful design—but the benefits justify the investment.

The key principles to remember:

  1. Design for retrieval, not just storage. It's not enough to store context; you need to retrieve the right context at the right time.

  2. Prioritize ruthlessly. Context windows have limits. Build systems that surface the most relevant information within those limits.

  3. Handle staleness explicitly. Context ages. Old preferences may no longer apply. Build freshness into your retrieval logic.

  4. Respect privacy. Multi-session context is inherently sensitive. Implement proper access controls, auditing, and user transparency.

  5. Measure effectiveness. Track whether your context system actually improves outcomes: fewer user corrections, fewer repeated explanations, fewer task failures.

Building this infrastructure from scratch is substantial work. For teams focused on building AI applications rather than infrastructure, Dytto provides a production-ready context layer that handles storage, retrieval, synchronization, and privacy out of the box. This lets you focus on what matters: building AI experiences that truly remember.

The future of AI isn't stateless chatbots that forget everything. It's intelligent agents that build genuine understanding over time. Multi-session context is how we get there.


Ready to add persistent context to your AI agents? Try Dytto's context API and start building AI that remembers.
