Stateful AI Agents Tutorial: The Complete Developer's Guide to Building AI Systems That Actually Remember

Dytto Team
dytto, ai-agents, stateful-agents, llm, memory-management, langgraph, tutorial, python

Building AI agents that remember past interactions is one of the most impactful capabilities you can add to your application. While large language models like GPT-4 and Claude possess vast knowledge, they're fundamentally stateless—each API call starts fresh with no memory of previous conversations unless you explicitly build that capability.

This tutorial walks you through everything you need to know about stateful AI agents: what they are, why they matter, and how to implement them in production systems. Whether you're building a customer support bot that needs to remember user preferences or an autonomous research agent that accumulates findings across sessions, understanding state management is essential.

What Is a Stateful AI Agent?

A stateful AI agent is an AI system that maintains persistent memory across interactions. Unlike stateless agents that treat each request independently, stateful agents can:

  • Remember conversation history from previous sessions
  • Track user preferences and learned behaviors over time
  • Maintain workflow progress across multi-step tasks
  • Build and update a knowledge base from accumulated interactions

The key distinction is simple: a stateless agent handles each request as a standalone transaction, while a stateful agent reads prior state from an external store before responding and writes updated state back afterward.

Here's what this looks like in practice:

# Stateless agent pattern
def stateless_agent(user_input: str) -> str:
    prompt = f"User: {user_input}"
    response = llm.generate(prompt)
    return response  # Nothing saved

# Stateful agent pattern
def stateful_agent(user_id: str, user_input: str) -> str:
    # Load prior context
    state = state_store.get(user_id)
    conversation_history = state.get("messages", [])
    user_preferences = state.get("preferences", {})
    
    # Build context-aware prompt
    prompt = build_prompt(conversation_history, user_preferences, user_input)
    response = llm.generate(prompt)
    
    # Update and persist state
    conversation_history.append({"user": user_input, "assistant": response})
    state["messages"] = conversation_history
    state_store.set(user_id, state)
    
    return response

The agent "remembers" because you made it remember—through explicit state management infrastructure.

Why Stateless LLMs Need Stateful Architecture

Large language models are completely stateless by default. When you make an API call to GPT-4, Claude, or any other LLM, the model has no memory of your previous requests. All the context it has comes from:

  1. Training data: Static knowledge compressed into model weights
  2. Context window: The tokens you send with the current request

What appears as "chat memory" in most LLM SDKs is actually client-side state that your code accumulates and resends with each request. The illusion of memory is entirely on your side.
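A minimal sketch makes this concrete. Here `fake_llm` is a stub standing in for a real (stateless) completion API; the "memory" is nothing more than a list your client code grows and resends:

```python
# What most chat SDKs call "memory": the client keeps the transcript locally
# and resends the whole thing on every call. fake_llm stands in for a real
# stateless completion API.
def fake_llm(messages):
    user_turns = sum(1 for m in messages if m["role"] == "user")
    return f"reply #{user_turns}"

messages = []
for user_turn in ["hi", "what did I just say?"]:
    messages.append({"role": "user", "content": user_turn})
    reply = fake_llm(messages)  # the full history travels with every request
    messages.append({"role": "assistant", "content": reply})
```

Drop the `messages` list and the model forgets everything; that list is the entire memory mechanism.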

This creates several limitations for production applications:

The Context Window Problem

Even with modern context windows reaching 100K+ tokens, you can't simply stuff everything into the prompt:

  • Token costs grow linearly: Sending 50K tokens per request gets expensive fast
  • Latency increases: Larger prompts take longer to process
  • Context pollution degrades performance: Too much irrelevant context can actually hurt response quality
  • Hard truncation at limits: Eventually you hit the wall and lose older context
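One common guard against all four problems is a token budget: before each request, drop the oldest messages until the history fits. The sketch below approximates token counts as `len(text) // 4` — a rough rule of thumb, not a real tokenizer:

```python
# Crude context-budget guard: evict oldest messages first until the
# approximate token count fits. len(text) // 4 is a rough heuristic,
# not a tokenizer.
def trim_to_budget(messages, max_tokens=100):
    def approx_tokens(msgs):
        return sum(len(m["content"]) // 4 for m in msgs)
    trimmed = list(messages)
    while trimmed and approx_tokens(trimmed) > max_tokens:
        trimmed.pop(0)  # oldest first; keep the most recent context
    return trimmed

history = [{"content": "x" * 200} for _ in range(5)]  # ~50 tokens each
kept = trim_to_budget(history, max_tokens=100)
```

Real systems usually summarize evicted messages rather than discard them, as covered later in this tutorial.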

Session Persistence Gaps

Context windows reset with each API call. If your user closes the tab and returns tomorrow, that context is gone unless you've persisted it somewhere. For applications requiring continuity across:

  • Multiple days or weeks of interaction
  • Different devices and clients
  • System restarts and deployments

...you need external state storage.

Cross-Session Learning

The real power of stateful agents comes from learning patterns over time. A customer support agent that notices you always ask about the same feature. A research assistant that remembers which sources you found useful. A personal assistant that adapts to your communication style.

None of this is possible without persistent state.

Types of Agent Memory

Production stateful agents typically implement multiple memory types, each serving different purposes:

Short-Term Memory (Working Memory)

Short-term memory maintains immediate context within the current interaction—the information needed right now to complete the current task. When a user says "Book a flight to Paris, then find hotels near the Louvre," short-term memory tracks the flight booking results to inform the hotel search.

Implementation characteristics:

  • Session-scoped, resets when conversation ends
  • Fast access (sub-millisecond latency)
  • Relatively small (recent messages, current task state)
  • Often in-memory or Redis-backed

from dataclasses import dataclass, field
from typing import List, Dict, Any

@dataclass
class WorkingMemory:
    messages: List[Dict[str, str]] = field(default_factory=list)
    current_task: str = ""
    intermediate_results: Dict[str, Any] = field(default_factory=dict)
    active_tools: List[str] = field(default_factory=list)
    
    def add_message(self, role: str, content: str):
        self.messages.append({"role": role, "content": content})
        # Keep only recent messages for working memory
        if len(self.messages) > 20:
            self.messages = self.messages[-20:]
    
    def store_result(self, key: str, value: Any):
        self.intermediate_results[key] = value
    
    def clear(self):
        self.messages = []
        self.current_task = ""
        self.intermediate_results = {}

Long-Term Memory

Long-term memory persists across sessions, surviving system restarts and allowing agents to build on past interactions over weeks or months. This is where user preferences, learned patterns, and accumulated knowledge live.

Implementation characteristics:

  • Persistent storage (Postgres, Redis with persistence, vector databases)
  • Semantic search capabilities for retrieval
  • Consolidation processes to refine raw data into useful knowledge
  • Scales to large amounts of historical data

from typing import List, Optional
import json

class LongTermMemory:
    def __init__(self, db_client, vector_store):
        self.db = db_client
        self.vectors = vector_store
    
    def store_fact(self, user_id: str, fact: str, category: str):
        """Store a learned fact about the user"""
        embedding = self.vectors.embed(fact)
        self.db.execute(
            """INSERT INTO user_facts (user_id, fact, category, embedding, created_at)
               VALUES (%s, %s, %s, %s, NOW())""",
            (user_id, fact, category, embedding)
        )
    
    def recall_relevant(self, user_id: str, query: str, limit: int = 5) -> List[str]:
        """Retrieve facts relevant to the current query"""
        query_embedding = self.vectors.embed(query)
        results = self.db.execute(
            """SELECT fact FROM user_facts 
               WHERE user_id = %s 
               ORDER BY embedding <-> %s 
               LIMIT %s""",
            (user_id, query_embedding, limit)
        )
        return [row["fact"] for row in results]
    
    def update_preference(self, user_id: str, key: str, value: str):
        """Update or insert a user preference"""
        self.db.execute(
            """INSERT INTO user_preferences (user_id, key, value, updated_at)
               VALUES (%s, %s, %s, NOW())
               ON CONFLICT (user_id, key) DO UPDATE SET value = %s, updated_at = NOW()""",
            (user_id, key, value, value)
        )

Episodic Memory

Episodic memory captures specific past experiences with temporal context—not just what happened, but when and in what sequence. This enables agents to reference specific past interactions: "Last Tuesday you asked about that same error..."

Implementation characteristics:

  • Event-sourced or append-only storage
  • Rich metadata (timestamps, session IDs, outcome markers)
  • Temporal queries (what happened in date range, sequence of events)
  • Often implemented as event logs alongside vector storage

from datetime import datetime, timedelta
from typing import Dict, List, Optional

class EpisodicMemory:
    def __init__(self, event_store, vector_store):
        self.events = event_store
        self.vectors = vector_store
    
    def record_episode(self, user_id: str, event_type: str,
                       content: str, outcome: Optional[str] = None):
        """Record a specific interaction episode"""
        episode = {
            "user_id": user_id,
            "timestamp": datetime.utcnow().isoformat(),
            "event_type": event_type,
            "content": content,
            "outcome": outcome,
            "embedding": self.vectors.embed(content)
        }
        self.events.append(episode)
    
    def recall_similar_episodes(self, user_id: str, situation: str, 
                                 limit: int = 3) -> List[Dict]:
        """Find past episodes similar to current situation"""
        query_embedding = self.vectors.embed(situation)
        return self.events.query(
            user_id=user_id,
            embedding_similarity=query_embedding,
            limit=limit
        )
    
    def get_recent_history(self, user_id: str, days: int = 7) -> List[Dict]:
        """Get chronological history for the past N days"""
        cutoff = datetime.utcnow() - timedelta(days=days)
        return self.events.query(
            user_id=user_id,
            timestamp_after=cutoff,
            order_by="timestamp"
        )

Semantic Memory

Semantic memory stores factual knowledge independent of specific experiences—general information about the domain, user profiles, product specifications. Unlike episodic memory which captures "what happened," semantic memory captures "what is true."

from typing import Dict, List

class SemanticMemory:
    def __init__(self, knowledge_base, vector_store):
        self.kb = knowledge_base
        self.vectors = vector_store
    
    def store_fact(self, entity: str, relation: str, value: str, source: str):
        """Store a fact as entity-relation-value triple"""
        self.kb.add_triple(entity, relation, value, source=source)
        # Also store text representation for semantic search
        fact_text = f"{entity} {relation} {value}"
        self.vectors.upsert(
            id=f"{entity}:{relation}",
            embedding=self.vectors.embed(fact_text),
            metadata={"entity": entity, "relation": relation, "value": value}
        )
    
    def query_entity(self, entity: str) -> Dict[str, str]:
        """Get all known facts about an entity"""
        return self.kb.get_relations(entity)
    
    def semantic_search(self, query: str, limit: int = 10) -> List[Dict]:
        """Find facts semantically related to query"""
        return self.vectors.search(
            embedding=self.vectors.embed(query),
            limit=limit
        )

Building Your First Stateful Agent

Let's build a complete stateful agent implementation. We'll use Python with Redis for state storage, demonstrating the core patterns you'll need for production systems.

Step 1: Define the State Schema

First, define what state your agent needs to track:

from dataclasses import dataclass, field, asdict
from typing import List, Dict, Any, Optional
from datetime import datetime
import json

@dataclass
class AgentState:
    """Complete state for a stateful agent"""
    # Identity
    user_id: str
    session_id: str
    
    # Short-term memory
    conversation_history: List[Dict[str, str]] = field(default_factory=list)
    current_task: Optional[str] = None
    pending_actions: List[str] = field(default_factory=list)
    
    # Long-term memory
    user_preferences: Dict[str, Any] = field(default_factory=dict)
    learned_facts: List[str] = field(default_factory=list)
    
    # Metadata
    created_at: str = field(default_factory=lambda: datetime.utcnow().isoformat())
    last_interaction: str = field(default_factory=lambda: datetime.utcnow().isoformat())
    interaction_count: int = 0
    
    def to_json(self) -> str:
        return json.dumps(asdict(self))
    
    @classmethod
    def from_json(cls, data: str) -> "AgentState":
        return cls(**json.loads(data))
    
    def add_message(self, role: str, content: str):
        self.conversation_history.append({
            "role": role,
            "content": content,
            "timestamp": datetime.utcnow().isoformat()
        })
        self.last_interaction = datetime.utcnow().isoformat()
        self.interaction_count += 1

Step 2: Implement the State Store

Create an abstraction for state persistence:

import redis
from abc import ABC, abstractmethod
from typing import Optional

class StateStore(ABC):
    @abstractmethod
    def get(self, key: str) -> Optional[AgentState]:
        pass
    
    @abstractmethod
    def set(self, key: str, state: AgentState, ttl: Optional[int] = None):
        pass
    
    @abstractmethod
    def delete(self, key: str):
        pass

class RedisStateStore(StateStore):
    def __init__(self, host: str = "localhost", port: int = 6379, db: int = 0):
        self.client = redis.Redis(host=host, port=port, db=db)
    
    def _make_key(self, key: str) -> str:
        return f"agent:state:{key}"
    
    def get(self, key: str) -> Optional[AgentState]:
        data = self.client.get(self._make_key(key))
        if data:
            return AgentState.from_json(data.decode())
        return None
    
    def set(self, key: str, state: AgentState, ttl: Optional[int] = None):
        full_key = self._make_key(key)
        self.client.set(full_key, state.to_json())
        if ttl:
            self.client.expire(full_key, ttl)
    
    def delete(self, key: str):
        self.client.delete(self._make_key(key))
    
    def get_or_create(self, user_id: str, session_id: str) -> AgentState:
        key = f"{user_id}:{session_id}"
        state = self.get(key)
        if not state:
            state = AgentState(user_id=user_id, session_id=session_id)
            self.set(key, state)
        return state
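For unit tests it's convenient to have a dict-backed stand-in with the same `get`/`set`/`delete` surface. The sketch below stores raw JSON strings (rather than `AgentState` objects) so it stays self-contained, and accepts a TTL without enforcing it:

```python
from typing import Optional

class InMemoryStateStore:
    """Dict-backed stand-in with the same surface as RedisStateStore,
    handy as a test double (sketch: TTL is accepted but not enforced,
    and values are raw JSON strings)."""

    def __init__(self):
        self._data = {}  # full key -> JSON string

    def _make_key(self, key: str) -> str:
        return f"agent:state:{key}"

    def get(self, key: str) -> Optional[str]:
        return self._data.get(self._make_key(key))

    def set(self, key: str, state_json: str, ttl: Optional[int] = None):
        self._data[self._make_key(key)] = state_json

    def delete(self, key: str):
        self._data.pop(self._make_key(key), None)

store = InMemoryStateStore()
store.set("u1:s1", '{"user_id": "u1"}')
assert store.get("u1:s1") == '{"user_id": "u1"}'
store.delete("u1:s1")
assert store.get("u1:s1") is None
```

Swapping this in for Redis in tests keeps your agent logic decoupled from the storage backend.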

Step 3: Build the Stateful Agent

Now combine everything into a working agent:

import json
from typing import Any, Dict, List

from openai import OpenAI

class StatefulAgent:
    def __init__(self, 
                 state_store: StateStore,
                 model: str = "gpt-4",
                 system_prompt: str = None):
        self.state_store = state_store
        self.llm = OpenAI()
        self.model = model
        self.system_prompt = system_prompt or self._default_system_prompt()
    
    def _default_system_prompt(self) -> str:
        return """You are a helpful assistant with memory capabilities.
You remember past conversations and user preferences.
Use the provided context to give personalized, relevant responses."""
    
    def _build_messages(self, state: AgentState, user_input: str) -> List[Dict]:
        messages = [{"role": "system", "content": self.system_prompt}]
        
        # Add context from long-term memory
        if state.user_preferences:
            pref_context = "User preferences: " + json.dumps(state.user_preferences)
            messages.append({"role": "system", "content": pref_context})
        
        if state.learned_facts:
            facts_context = "Known facts about user: " + "; ".join(state.learned_facts[-10:])
            messages.append({"role": "system", "content": facts_context})
        
        # Add conversation history (last 10 messages for context window management)
        for msg in state.conversation_history[-10:]:
            messages.append({"role": msg["role"], "content": msg["content"]})
        
        # Add current user input
        messages.append({"role": "user", "content": user_input})
        
        return messages
    
    def _extract_learnings(self, user_input: str, response: str, state: AgentState):
        """Extract facts and preferences from the conversation"""
        # This is a simplified version - production systems would use
        # more sophisticated extraction (embeddings, classification, etc.)
        
        # Example: detect preference statements
        preference_indicators = ["i prefer", "i like", "i want", "always", "never"]
        lower_input = user_input.lower()
        for indicator in preference_indicators:
            if indicator in lower_input:
                state.learned_facts.append(f"User stated: {user_input}")
                break
    
    def chat(self, user_id: str, session_id: str, user_input: str) -> str:
        # Load state
        key = f"{user_id}:{session_id}"
        state = self.state_store.get_or_create(user_id, session_id)
        
        # Build context-aware prompt
        messages = self._build_messages(state, user_input)
        
        # Generate response
        response = self.llm.chat.completions.create(
            model=self.model,
            messages=messages
        )
        assistant_message = response.choices[0].message.content
        
        # Update state
        state.add_message("user", user_input)
        state.add_message("assistant", assistant_message)
        
        # Extract learnings for long-term memory
        self._extract_learnings(user_input, assistant_message, state)
        
        # Persist state
        self.state_store.set(key, state)
        
        return assistant_message
    
    def update_preference(self, user_id: str, session_id: str, 
                          key: str, value: Any):
        """Explicitly update a user preference"""
        state_key = f"{user_id}:{session_id}"
        state = self.state_store.get_or_create(user_id, session_id)
        state.user_preferences[key] = value
        self.state_store.set(state_key, state)

Step 4: Add Semantic Retrieval

For production systems, you'll want vector-based retrieval for long-term memory:

import hashlib
import json
from datetime import datetime
from typing import List

from sentence_transformers import SentenceTransformer
import numpy as np

class VectorMemory:
    def __init__(self, redis_client, embedding_model: str = "all-MiniLM-L6-v2"):
        self.redis = redis_client
        self.encoder = SentenceTransformer(embedding_model)
    
    def store(self, user_id: str, text: str, category: str = "general"):
        """Store a memory with its embedding"""
        embedding = self.encoder.encode(text).tolist()
        # Use a deterministic digest: Python's built-in hash() is salted per process
        memory_id = f"mem:{user_id}:{hashlib.sha256(text.encode()).hexdigest()[:16]}"
        
        self.redis.hset(memory_id, mapping={
            "text": text,
            "category": category,
            "embedding": json.dumps(embedding),
            "created_at": datetime.utcnow().isoformat()
        })
        
        # Add to user's memory index
        self.redis.sadd(f"user:memories:{user_id}", memory_id)
    
    def search(self, user_id: str, query: str, limit: int = 5) -> List[str]:
        """Find memories semantically similar to query"""
        query_embedding = self.encoder.encode(query)
        
        # Get all user memories
        memory_ids = self.redis.smembers(f"user:memories:{user_id}")
        
        results = []
        for mem_id in memory_ids:
            data = self.redis.hgetall(mem_id)
            if data:
                mem_embedding = np.array(json.loads(data[b"embedding"]))
                similarity = np.dot(query_embedding, mem_embedding) / (
                    np.linalg.norm(query_embedding) * np.linalg.norm(mem_embedding)
                )
                results.append((similarity, data[b"text"].decode()))
        
        # Return top-k most similar
        results.sort(reverse=True, key=lambda x: x[0])
        return [text for _, text in results[:limit]]
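The ranking step above is plain cosine similarity over the stored vectors. A dependency-free version of the same scoring, with toy 2-D vectors standing in for real embeddings:

```python
import math

# Cosine similarity and brute-force top-k, mirroring the scan in
# VectorMemory.search (toy 2-D vectors stand in for real embeddings).
def cosine(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm

def top_k(query, memories, k=2):
    scored = sorted(memories, key=lambda m: cosine(query, m[1]), reverse=True)
    return [text for text, _ in scored[:k]]

mems = [
    ("prefers email updates", [1.0, 0.0]),
    ("lives in Berlin",       [0.0, 1.0]),
    ("asked about billing",   [0.9, 0.1]),
]
top = top_k([1.0, 0.2], mems)
```

Note that this full scan is O(n) per query; once a user accumulates thousands of memories, you'll want a proper vector index (Redis Stack, pgvector, or a dedicated vector database) instead.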

State Graphs for Complex Workflows

Simple conversational agents can manage state linearly, but complex workflows—multi-step processes, decision trees, parallel tasks—benefit from explicit state graph definitions.

A state graph defines possible states your agent can be in and valid transitions between them. Each node represents a workflow state, and edges represent transitions triggered by user input, tool results, or other events.

Why State Graphs Matter

State graphs provide:

  1. Explicit control flow: The agent knows exactly which actions are valid at each step
  2. Resumability: After failures, the agent knows exactly where to resume
  3. Coordination: In multi-agent systems, state graphs make handoffs explicit
  4. Auditability: Every transition is logged for debugging and compliance
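Before reaching for a framework, the core idea fits in a few lines: a table of legal transitions plus a step function that refuses anything else. The state names below are illustrative:

```python
# A workflow as a table of legal transitions, plus a step function that
# rejects anything not in the table (state names are illustrative).
TRANSITIONS = {
    "research": {"analyze"},
    "analyze": {"respond"},
    "respond": {"done"},
}

def step(current: str, nxt: str) -> str:
    if nxt not in TRANSITIONS.get(current, set()):
        raise ValueError(f"illegal transition {current} -> {nxt}")
    return nxt

state = "research"
for nxt in ("analyze", "respond", "done"):
    state = step(state, nxt)  # every hop is validated against the table
```

Frameworks like LangGraph add persistence, retries, and conditional routing on top of this same validated-transition idea.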

Implementing with LangGraph

LangGraph is a popular framework for building state-driven agents:

from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated, List
import operator

class WorkflowState(TypedDict):
    messages: Annotated[List[str], operator.add]
    current_step: str
    collected_data: dict
    is_complete: bool

def research_node(state: WorkflowState) -> WorkflowState:
    """Gather information about the user's request"""
    # perform_research (like analyze_data and generate_response below) is a
    # placeholder for your own retrieval/tool logic
    research_results = perform_research(state["messages"][-1])
    return {
        "messages": [f"Research complete: {research_results}"],
        "current_step": "analyze",
        "collected_data": {"research": research_results},
        "is_complete": False
    }

def analyze_node(state: WorkflowState) -> WorkflowState:
    """Analyze gathered information"""
    analysis = analyze_data(state["collected_data"]["research"])
    return {
        "messages": [f"Analysis: {analysis}"],
        "current_step": "respond",
        "collected_data": {**state["collected_data"], "analysis": analysis},
        "is_complete": False
    }

def respond_node(state: WorkflowState) -> WorkflowState:
    """Generate final response"""
    response = generate_response(
        state["collected_data"]["research"],
        state["collected_data"]["analysis"]
    )
    return {
        "messages": [response],
        "current_step": "complete",
        "collected_data": state["collected_data"],
        "is_complete": True
    }

def should_continue(state: WorkflowState) -> str:
    """Determine next step based on current state"""
    if state["is_complete"]:
        return END
    return state["current_step"]

# Build the graph
workflow = StateGraph(WorkflowState)
workflow.add_node("research", research_node)
workflow.add_node("analyze", analyze_node)
workflow.add_node("respond", respond_node)

workflow.add_edge("research", "analyze")
workflow.add_edge("analyze", "respond")
workflow.add_conditional_edges("respond", should_continue)

workflow.set_entry_point("research")
graph = workflow.compile()

Five Failure Modes to Avoid

Building stateful agents introduces failure modes that don't exist in stateless systems. Here are the five most common production failures and how to prevent them:

1. Stale State Reads

When your agent loads state from storage, that data might already be outdated. Another process updated the record. Another agent modified shared state. A state transition happened in a parallel request.

The danger: The agent acts confidently on outdated information without knowing it's stale.

Mitigation strategies:

  • Version state with timestamps or sequence numbers
  • Use compare-and-swap operations for writes
  • For critical decisions, read state at decision time, not at request start
  • Implement optimistic locking for concurrent access

import time

def update_state_with_version(self, key: str, update_fn, max_retries: int = 3):
    """Update state with optimistic locking (assumes state objects carry a version field)"""
    for attempt in range(max_retries):
        state = self.get(key)
        original_version = state.version
        
        # Apply update
        updated_state = update_fn(state)
        updated_state.version = original_version + 1
        
        # Attempt conditional write
        success = self.conditional_set(
            key, 
            updated_state, 
            expected_version=original_version
        )
        
        if success:
            return updated_state
        
        # Retry on conflict
        time.sleep(0.1 * (2 ** attempt))
    
    raise ConcurrencyError(f"Failed to update {key} after {max_retries} attempts")  # app-defined exception
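The `conditional_set` call used above is left abstract. Here's a dict-backed sketch of the compare-and-swap it assumes; in production this would be a Redis `WATCH`/`MULTI` transaction or a conditional SQL `UPDATE`:

```python
import threading

# Minimal in-memory store supporting compare-and-swap on a version counter.
# Illustration only: real systems use Redis WATCH/MULTI or a conditional
# UPDATE ... WHERE version = %s in SQL.
class VersionedStore:
    def __init__(self):
        self._data = {}  # key -> (version, value)
        self._lock = threading.Lock()

    def get(self, key):
        return self._data.get(key, (0, None))

    def conditional_set(self, key, value, expected_version):
        with self._lock:
            current_version, _ = self._data.get(key, (0, None))
            if current_version != expected_version:
                return False  # someone else wrote first; caller retries
            self._data[key] = (expected_version + 1, value)
            return True

store = VersionedStore()
version, _ = store.get("u1")
assert store.conditional_set("u1", {"count": 1}, version)      # first write wins
assert not store.conditional_set("u1", {"count": 2}, version)  # stale version rejected
```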

2. Partial State Updates

Stateful agents often update multiple pieces of state in a single interaction. If the agent updates Redis successfully but the Postgres write fails, you have inconsistent state.

Mitigation strategies:

  • Use atomic transactions where possible
  • Implement saga patterns for distributed state
  • Design for eventual consistency with reconciliation
  • Store all state updates as a single atomic document

def atomic_state_update(self, key: str, updates: dict):
    """Update multiple state fields atomically"""
    with self.redis.pipeline() as pipe:
        try:
            pipe.watch(key)
            current_state = json.loads(pipe.get(key) or "{}")
            
            # Merge updates
            new_state = {**current_state, **updates, "updated_at": datetime.utcnow().isoformat()}
            
            pipe.multi()
            pipe.set(key, json.dumps(new_state))
            pipe.execute()
        except redis.WatchError:
            # State changed during update, retry
            return self.atomic_state_update(key, updates)

3. Race Conditions in Multi-Agent Systems

When multiple agents or processes share state, concurrent access creates race conditions. Two agents read the same state, both make decisions, and their writes conflict.

Mitigation strategies:

  • Use distributed locks for critical sections
  • Implement message queues for serialized processing
  • Design agents with clear ownership boundaries
  • Use event sourcing to merge concurrent updates

from contextlib import contextmanager

@contextmanager
def with_state_lock(self, user_id: str, timeout: int = 30):
    """Context manager for exclusive state access (LockTimeoutError is app-defined)"""
    lock_key = f"lock:agent:state:{user_id}"
    lock = self.redis.lock(lock_key, timeout=timeout)
    
    try:
        if lock.acquire(blocking=True, blocking_timeout=5):
            yield
        else:
            raise LockTimeoutError(f"Could not acquire lock for {user_id}")
    finally:
        if lock.owned():
            lock.release()

4. Context Drift and Prompt Pollution

As state accumulates, the context injected into prompts can drift from what's actually relevant. Old preferences override new ones. Irrelevant facts clutter the context. The agent's behavior becomes unpredictable.

Mitigation strategies:

  • Implement relevance scoring for context selection
  • Use recency weighting for memory retrieval
  • Periodically consolidate and prune old state
  • Separate "must include" context from "optionally relevant" context

def build_relevant_context(self, state: AgentState, query: str) -> str:
    """Select only relevant context for the current query"""
    context_parts = []
    
    # Always include recent conversation (high relevance)
    recent_messages = state.conversation_history[-5:]
    context_parts.append("Recent conversation:\n" + format_messages(recent_messages))
    
    # Semantic search for relevant long-term memories
    relevant_facts = self.vector_memory.search(
        user_id=state.user_id,
        query=query,
        limit=5
    )
    if relevant_facts:
        context_parts.append("Relevant context:\n" + "\n".join(relevant_facts))
    
    # Include active preferences (recency-weighted)
    active_prefs = self.get_active_preferences(state, max_age_days=30)
    if active_prefs:
        context_parts.append("User preferences:\n" + format_preferences(active_prefs))
    
    return "\n\n".join(context_parts)

5. Lost State Across Retries and Failures

When an agent call fails mid-execution, what happens to the state changes it already made? Naive retry logic can cause duplicate writes, missed updates, or corrupted state.

Mitigation strategies:

  • Make state updates idempotent
  • Use checkpointing for multi-step operations
  • Implement dead letter queues for failed operations
  • Design recovery procedures that can resume from checkpoints

from typing import Callable, List

class CheckpointedAgent:
    def execute_with_checkpoints(self, task_id: str, steps: List[Callable]):
        """Execute multi-step task with checkpointing"""
        checkpoint = self.load_checkpoint(task_id)
        start_step = checkpoint.get("completed_steps", 0)
        
        for i, step in enumerate(steps[start_step:], start=start_step):
            try:
                result = step()
                
                # Checkpoint after each successful step
                self.save_checkpoint(task_id, {
                    "completed_steps": i + 1,
                    "last_result": result,
                    "timestamp": datetime.utcnow().isoformat()
                })
            except Exception as e:
                # Log failure with checkpoint state for recovery
                self.log_failure(task_id, i, e)
                raise
        
        # Clear checkpoint on completion
        self.clear_checkpoint(task_id)

Choosing Your Storage Backend

The choice of state storage backend significantly impacts your agent's performance, reliability, and operational complexity.

Redis: Best for Speed

Pros:

  • Sub-millisecond latency for state operations
  • Built-in data structures (hashes, lists, sorted sets)
  • Pub/Sub for multi-agent coordination
  • Vector search capabilities (Redis Stack)

Cons:

  • Memory constraints for large state
  • Requires persistence configuration for durability
  • More operational complexity than managed services

Use when: Latency is critical, state fits in memory, you need real-time coordination.

PostgreSQL: Best for Durability

Pros:

  • ACID guarantees for state consistency
  • Complex queries across state data
  • pgvector for semantic search
  • Battle-tested reliability

Cons:

  • Higher latency than in-memory stores
  • Schema migrations for evolving state
  • Connection management overhead

Use when: State must survive failures, you need complex queries, regulatory compliance matters.

Hybrid Architecture: Best of Both

For production systems, combine both:

class HybridStateStore:
    def __init__(self, redis_client, postgres_pool):
        self.redis = redis_client
        self.pg = postgres_pool
    
    def get(self, key: str) -> AgentState:
        # Try Redis first (hot cache)
        cached = self.redis.get(f"state:{key}")
        if cached:
            return AgentState.from_json(cached)
        
        # Fall back to Postgres (cold storage)
        with self.pg.connection() as conn:
            row = conn.execute(
                "SELECT state_data FROM agent_states WHERE key = %s",
                (key,)
            ).fetchone()
            if row:
                state = AgentState.from_json(row[0])
                # Warm the cache
                self.redis.setex(f"state:{key}", 3600, row[0])
                return state
        
        return None
    
    def set(self, key: str, state: AgentState):
        state_json = state.to_json()
        
        # Write to Postgres (durability)
        with self.pg.connection() as conn:
            conn.execute(
                """INSERT INTO agent_states (key, state_data, updated_at)
                   VALUES (%s, %s, NOW())
                   ON CONFLICT (key) DO UPDATE 
                   SET state_data = %s, updated_at = NOW()""",
                (key, state_json, state_json)
            )
        
        # Update Redis (speed)
        self.redis.setex(f"state:{key}", 3600, state_json)

Scaling Stateful Agents

As your application grows, state management becomes more challenging. Here are patterns for scaling:

Partitioning State

Shard state by user ID or tenant to distribute load:

import hashlib

def get_shard(self, user_id: str, num_shards: int = 16) -> int:
    """Stable hash-based sharding (MD5 digest is deterministic across processes)"""
    return int(hashlib.md5(user_id.encode()).hexdigest(), 16) % num_shards

def get_state_store(self, user_id: str) -> StateStore:
    """Get the appropriate shard for this user"""
    shard = self.get_shard(user_id)
    return self.shards[shard]
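The useful property of hashing the user ID is determinism: every process computes the same shard for the same user, with no coordination. A standalone, runnable version of the same hashing scheme:

```python
import hashlib

def get_shard(user_id: str, num_shards: int = 16) -> int:
    """Map a user ID to a shard index via MD5, as in the snippet above."""
    return int(hashlib.md5(user_id.encode()).hexdigest(), 16) % num_shards

# Deterministic: the same user always routes to the same shard
assert get_shard("alice") == get_shard("alice")

# Users spread across the shard range
shards = {get_shard(f"user-{i}") for i in range(1000)}
print(sorted(shards))
```

Because MD5 is stable across machines and Python versions, a web tier and a background worker will always agree on where a user's state lives.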

Async State Operations

Don't block on state writes for non-critical updates:

import asyncio
from concurrent.futures import ThreadPoolExecutor

class AsyncStateManager:
    def __init__(self, state_store: StateStore):
        self.store = state_store
        self.executor = ThreadPoolExecutor(max_workers=10)
        self.write_queue = asyncio.Queue()
    
    async def async_set(self, key: str, state: AgentState):
        """Non-blocking state write"""
        await self.write_queue.put((key, state))
    
    async def process_writes(self):
        """Background task to process queued writes"""
        while True:
            key, state = await self.write_queue.get()
            await asyncio.get_running_loop().run_in_executor(
                self.executor,
                self.store.set,
                key,
                state
            )
            self.write_queue.task_done()
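Wiring this into an application means starting `process_writes` as a background task and, when you need a flush point, awaiting `write_queue.join()`. The sketch below is self-contained; `InMemoryStore` is a hypothetical stand-in for your real `StateStore`:

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

class InMemoryStore:
    """Hypothetical stand-in for a real StateStore."""
    def __init__(self):
        self.data = {}
    def set(self, key, state):
        self.data[key] = state

class AsyncStateManager:
    def __init__(self, state_store):
        self.store = state_store
        self.executor = ThreadPoolExecutor(max_workers=10)
        self.write_queue = asyncio.Queue()

    async def async_set(self, key, state):
        """Non-blocking state write: enqueue and return immediately."""
        await self.write_queue.put((key, state))

    async def process_writes(self):
        """Background task that drains the queue into the store."""
        while True:
            key, state = await self.write_queue.get()
            await asyncio.get_running_loop().run_in_executor(
                self.executor, self.store.set, key, state
            )
            self.write_queue.task_done()

async def main():
    store = InMemoryStore()
    manager = AsyncStateManager(store)
    writer = asyncio.create_task(manager.process_writes())
    await manager.async_set("user-1:session-9", {"messages": ["hi"]})
    await manager.write_queue.join()  # wait until queued writes have landed
    writer.cancel()
    return store.data

print(asyncio.run(main()))
```

The trade-off to keep in mind: anything still in the queue when the process dies is lost, which is why this pattern suits non-critical updates (analytics, derived summaries) rather than the write that records the user's last message.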

State Compression and Archival

For long-running agents, compress and archive old state:

def compress_conversation_history(self, state: AgentState, max_messages: int = 50):
    """Summarize and compress old conversation history"""
    if len(state.conversation_history) <= max_messages:
        return state
    
    # Keep recent messages
    recent = state.conversation_history[-max_messages:]
    old = state.conversation_history[:-max_messages]
    
    # Summarize old messages
    summary = self.llm.summarize(
        f"Summarize this conversation history:\n{format_messages(old)}"
    )
    
    # Archive old messages
    self.archive_store.store(
        user_id=state.user_id,
        session_id=state.session_id,
        messages=old
    )
    
    # Update state with summary and recent messages
    state.conversation_history = [
        {"role": "system", "content": f"Previous conversation summary: {summary}"}
    ] + recent
    
    return state

Integrating with External Context Sources

Stateful agents become more powerful when connected to external knowledge sources. This is where platforms like Dytto excel—providing a unified context layer that persists user information, preferences, and behavioral patterns across all your AI applications.

The Context Layer Pattern

Instead of each agent maintaining its own isolated state, a context layer provides:

  1. Unified user profiles across applications
  2. Consistent preferences that follow users between services
  3. Behavioral patterns learned from historical interactions
  4. Real-time context (location, weather, calendar) for personalization

Here's a sketch of an agent that combines local state with a shared context layer:

class ContextEnrichedAgent:
    def __init__(self, state_store: StateStore, context_api: ContextAPI):
        self.state_store = state_store
        self.context = context_api
    
    async def chat(self, user_id: str, session_id: str, user_input: str) -> str:
        # Load local state
        state = self.state_store.get_or_create(user_id, session_id)
        
        # Enrich with external context
        user_context = await self.context.get_context(user_id)
        
        # Build prompt with all available context
        messages = self._build_messages(
            state=state,
            user_input=user_input,
            preferences=user_context.get("preferences", {}),
            recent_activity=user_context.get("recent_activity", []),
            patterns=user_context.get("behavioral_patterns", {})
        )
        
        response = await self.llm.generate(messages)
        
        # Update both local state and shared context
        state.add_message("user", user_input)
        state.add_message("assistant", response)
        self.state_store.set(f"{user_id}:{session_id}", state)
        
        # Push learned facts to context layer
        facts = self._extract_facts(user_input, response)
        if facts:
            await self.context.store_facts(user_id, facts)
        
        return response

When to Use External Context APIs

Build your own state management when:

  • You need complete control over data storage
  • Your use case is simple (single app, limited state)
  • You're prototyping or in early development

Use a context API like Dytto when:

  • Multiple applications need to share user context
  • You want consistent personalization across services
  • You need enterprise-grade context management without building it
  • Your agents should learn from user behavior over time

Conclusion

Stateful AI agents represent the next evolution in LLM applications. While the underlying models remain stateless, the infrastructure you build around them determines whether your agent forgets everything between requests or develops genuine understanding over time.

The key principles to remember:

  1. State is explicit: LLMs don't remember anything unless you build persistence
  2. Multiple memory types serve different purposes: short-term for sessions, long-term for learning, episodic for temporal context
  3. State graphs tame complexity: For multi-step workflows, explicit state machines beat implicit control flow
  4. Failure modes are predictable: Stale reads, partial updates, race conditions, context drift, and lost state across retries all have established solutions
  5. Storage choice matters: Redis for speed, Postgres for durability, hybrid for production

Start simple with session-scoped state storage. Add long-term memory when you need cross-session learning. Implement state graphs when workflows grow complex. And always design for the failure modes that will inevitably occur in production.
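As a concrete starting point, session-scoped state can be as small as a dictionary keyed by user and session — a deliberately minimal sketch (the `SessionStateStore` name and shape are illustrative, not a prescribed API) that you can later swap for Redis or Postgres without changing callers:

```python
class SessionStateStore:
    """Minimal in-process, session-scoped state — step one of the progression above."""
    def __init__(self):
        self._sessions = {}

    def get_or_create(self, user_id: str, session_id: str) -> dict:
        key = (user_id, session_id)
        if key not in self._sessions:
            self._sessions[key] = {"conversation_history": []}
        return self._sessions[key]

    def add_message(self, user_id: str, session_id: str, role: str, content: str):
        state = self.get_or_create(user_id, session_id)
        state["conversation_history"].append({"role": role, "content": content})

store = SessionStateStore()
store.add_message("user-1", "s1", "user", "What's my name?")
store.add_message("user-1", "s1", "assistant", "You haven't told me yet.")
history = store.get_or_create("user-1", "s1")["conversation_history"]
print(len(history))  # → 2
```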

The agents that truly remember—that learn from experience and develop deeper understanding over time—are the ones that will provide lasting value. Building that capability isn't magic; it's careful state management infrastructure.


Building stateful AI agents for your application? Dytto provides the context layer infrastructure that makes personalization across sessions, devices, and applications seamless. Check out our API documentation to get started.
