AI Infrastructure · 9 min read

Reliable AI Agents with Temporal and LangGraph

Durable, retryable, observable AI agents built by combining Temporal workflows with LangGraph reasoning. Handles LLM failures, long-running tool calls, and saga-style compensation.

The Problem Agents Have

Every team that's shipped an autonomous or semi-autonomous agent to production has hit the same wall. The happy path is a demo. The real world is a mess of partial failures: the LLM call times out halfway through a 20-step plan, a tool returns a 503, the model produces a slightly wrong JSON that breaks the parser, the process restarts and loses state, a downstream service is rate-limited, the user walks away.

Most agent frameworks (LangGraph, CrewAI, AutoGen) give you a beautiful graph of reasoning steps and a nightmare when any node fails. They're designed around the reasoning model, not around the durability model. You end up re-implementing retry logic, checkpointing, compensation, and observability by hand, poorly.

This post is about the pattern we use in production: LangGraph as the reasoning layer, Temporal as the durability layer. LangGraph decides what to do; Temporal ensures it actually gets done. The combination is more boring and more reliable than either alone.

Temporal In Two Paragraphs

Temporal is a durable execution engine. You write code that looks like a regular function (sequential, imperative, typed), and Temporal records every step that function schedules, such as activity calls, timers, and signals, together with its inputs and results in a persisted event history. If the worker process crashes mid-function, Temporal replays the function on a new worker, feeding it the recorded results until it reaches the point where it left off, and then continues. This only works because the function is deterministic, and the caller never notices.

The two primitives are workflows and activities. Workflows are the orchestration code — durable, deterministic, long-running. Activities are the side effects — API calls, database writes, LLM completions. Activities can be retried independently of the workflow. A workflow that runs for three weeks and survives a cluster failure is a supported use case, not a party trick.
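To make the replay idea concrete, here is a toy model in plain asyncio. It is not Temporal's implementation: `ToyHistory`, `ToyContext`, and the two fetch functions are hypothetical names invented for this sketch. It only shows the core trick, which is that a step with a recorded result is never executed a second time.

```python
from __future__ import annotations

import asyncio
from typing import Any, Awaitable, Callable


class ToyHistory:
    """Hypothetical stand-in for a persisted event history."""

    def __init__(self) -> None:
        self.events: list[Any] = []  # recorded step results, in order
        self.calls_made = 0          # how many side effects really ran


class ToyContext:
    """Hands out recorded results first, then runs new steps for real."""

    def __init__(self, history: ToyHistory) -> None:
        self._history = history
        self._cursor = 0

    async def execute(self, step: Callable[[], Awaitable[Any]]) -> Any:
        if self._cursor < len(self._history.events):
            # Replay: this step finished in an earlier run, so reuse its result.
            result = self._history.events[self._cursor]
        else:
            # First execution: run the side effect and record the outcome.
            result = await step()
            self._history.calls_made += 1
            self._history.events.append(result)
        self._cursor += 1
        return result


async def fetch_greeting() -> str:
    return "hello"


async def fetch_name() -> str:
    return "world"


async def toy_workflow(ctx: ToyContext, crash_after_first: bool = False) -> str:
    greeting = await ctx.execute(fetch_greeting)
    if crash_after_first:
        raise RuntimeError("worker crashed")  # simulate losing the process
    name = await ctx.execute(fetch_name)
    return f"{greeting} {name}"


def run_with_crash_and_replay() -> tuple[str, int]:
    history = ToyHistory()
    try:
        asyncio.run(toy_workflow(ToyContext(history), crash_after_first=True))
    except RuntimeError:
        pass  # the "worker" died; the history survives
    # A fresh context replays step one from history and runs only step two.
    result = asyncio.run(toy_workflow(ToyContext(history)))
    return result, history.calls_made
```

`run_with_crash_and_replay()` finishes the workflow with two real side-effect calls in total, even though the function body ran twice. That is the property the rest of this post leans on.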

Why This Fits Agents So Well

Agents are, structurally, long-running orchestrations of non-deterministic steps with heavy retry requirements. They are Temporal workflows wearing an LLM hat. Specifically:

  • LLM calls are activities. Retry on transient failure, timeout, invalid output.
  • Tool calls are activities. Same reasoning.
  • The reasoning loop is a workflow. It survives crashes; checkpoints are free.
  • Human approval is a signal. The workflow blocks on a Temporal signal, waits hours or days, and continues.
  • Compensation is a saga. If step 7 fails after step 5 charged a credit card, you run a compensation activity to refund.

LangGraph, in this architecture, is a library called from a workflow, not a runtime. You use its state graph and reasoning patterns to decide the next action; Temporal executes it.

A Minimal Agent Workflow

Let's build a customer-support agent. It searches a knowledge base, calls a ticketing API, optionally escalates to a human, and sends a reply email. We'll use the Temporal Python SDK (1.8+) and LangGraph 0.3+.

Activities

Activities are just Python functions with a decorator. They run outside the workflow sandbox as ordinary code on the worker, so they are free to do I/O and other non-deterministic work.

from __future__ import annotations

from dataclasses import dataclass

from langchain_anthropic import ChatAnthropic
from temporalio import activity


@dataclass
class LLMRequest:
    system: str
    user: str
    model: str = "claude-sonnet-4-6-20260115"


@dataclass
class LLMResponse:
    text: str
    input_tokens: int
    output_tokens: int


@activity.defn
async def call_llm(req: LLMRequest) -> LLMResponse:
    llm = ChatAnthropic(model=req.model, max_tokens=1024)
    result = await llm.ainvoke(
        [
            ("system", req.system),
            ("user", req.user),
        ]
    )
    return LLMResponse(
        text=result.content,
        input_tokens=result.usage_metadata["input_tokens"],
        output_tokens=result.usage_metadata["output_tokens"],
    )


@dataclass
class SearchArgs:
    query: str
    limit: int = 5


@dataclass
class KBResult:
    title: str
    url: str
    snippet: str


@activity.defn
async def search_kb(args: SearchArgs) -> list[KBResult]:
    # Real implementation calls your search backend
    return [
        KBResult(
            title="Reset password",
            url="https://kb.example.com/reset",
            snippet="To reset your password...",
        ),
    ]


@dataclass
class Ticket:
    id: str
    title: str
    body: str
    email: str


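@activity.defn
async def create_ticket(t: Ticket) -> str:
    # Real implementation calls your ticketing API and should send an
    # idempotency key so a retried attempt cannot create a duplicate ticket.
    # Placeholder only: str hashes are salted per process, so this ID is not stable.
    return f"T-{hash(t.title) % 100000}"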


@dataclass
class Email:
    to: str
    subject: str
    body: str


@activity.defn
async def send_email(msg: Email) -> None:
    # Real implementation calls SES/Mailgun/etc
    activity.logger.info("would send email", extra={"to": msg.to})

Retry Policies

Every activity gets an explicit retry policy. LLM calls need aggressive retry because providers fail with 429s and 5xxs regularly. Ticket creation needs idempotency, not just retry.

from datetime import timedelta

from temporalio.common import RetryPolicy

LLM_RETRY = RetryPolicy(
    initial_interval=timedelta(seconds=1),
    backoff_coefficient=2.0,
    maximum_interval=timedelta(seconds=30),
    maximum_attempts=6,
    non_retryable_error_types=["ValueError"],
)

TOOL_RETRY = RetryPolicy(
    initial_interval=timedelta(seconds=2),
    backoff_coefficient=2.0,
    maximum_interval=timedelta(seconds=60),
    maximum_attempts=4,
)
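It helps to see what those numbers mean in wall-clock terms. The sketch below computes the wait before each retry from the same four parameters. `retry_delays` is a hypothetical helper written for this post, not part of the Temporal SDK, and it ignores the time each attempt itself takes.

```python
from __future__ import annotations

from datetime import timedelta


def retry_delays(
    initial_interval: timedelta,
    backoff_coefficient: float,
    maximum_interval: timedelta,
    maximum_attempts: int,
) -> list[timedelta]:
    """Waits between attempts: initial * coefficient**n, capped at the maximum."""
    delays = []
    # N attempts means at most N - 1 waits between them.
    for retry in range(maximum_attempts - 1):
        delay = initial_interval * (backoff_coefficient ** retry)
        delays.append(min(delay, maximum_interval))
    return delays


# Same values as LLM_RETRY and TOOL_RETRY above.
llm_delays = retry_delays(timedelta(seconds=1), 2.0, timedelta(seconds=30), 6)
tool_delays = retry_delays(timedelta(seconds=2), 2.0, timedelta(seconds=60), 4)

llm_total = sum(llm_delays, timedelta())
tool_total = sum(tool_delays, timedelta())
```

With these settings an LLM call spends at most 31 seconds waiting between its six attempts, and a tool call at most 14 seconds between its four. Neither policy ever reaches its maximum_interval cap.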

The Workflow

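The workflow glues it together. To keep the example short, this version writes the routing as plain Python inside the workflow; StateGraph is imported but not wired in yet. When you do build a graph, its nodes should delegate to Temporal activities rather than calling LLMs directly.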

from __future__ import annotations

from dataclasses import dataclass, field
from datetime import timedelta
from typing import Literal

from temporalio import workflow

with workflow.unsafe.imports_passed_through():
    from langgraph.graph import StateGraph, END
    from .activities import (
        call_llm,
        search_kb,
        create_ticket,
        send_email,
        LLMRequest,
        SearchArgs,
        Ticket,
        Email,
    )
    from .retry import LLM_RETRY, TOOL_RETRY


@dataclass
class SupportInput:
    customer_email: str
    message: str


@dataclass
class AgentState:
    customer_email: str
    message: str
    kb_results: list = field(default_factory=list)
    draft_reply: str = ""
    ticket_id: str | None = None
    needs_human: bool = False


@workflow.defn
class SupportAgent:
    def __init__(self) -> None:
        self._human_approval: Literal["approve", "reject"] | None = None

    @workflow.signal
    def human_decision(self, decision: Literal["approve", "reject"]) -> None:
        self._human_approval = decision

    @workflow.run
    async def run(self, input: SupportInput) -> str:
        state = AgentState(
            customer_email=input.customer_email,
            message=input.message,
        )

        kb_results = await workflow.execute_activity(
            search_kb,
            SearchArgs(query=input.message, limit=5),
            start_to_close_timeout=timedelta(seconds=30),
            retry_policy=TOOL_RETRY,
        )
        state.kb_results = kb_results

        classify = await workflow.execute_activity(
            call_llm,
            LLMRequest(
                system="You classify support messages. Respond with one of: faq, account, billing, escalate",
                user=input.message,
            ),
            start_to_close_timeout=timedelta(seconds=60),
            retry_policy=LLM_RETRY,
        )
        category = classify.text.strip().lower()

        if category == "escalate":
            state.needs_human = True
            state.ticket_id = await workflow.execute_activity(
                create_ticket,
                Ticket(
                    id="",
                    title=f"Escalated: {input.message[:60]}",
                    body=input.message,
                    email=input.customer_email,
                ),
                start_to_close_timeout=timedelta(seconds=30),
                retry_policy=TOOL_RETRY,
            )
            return f"escalated:{state.ticket_id}"

        draft = await workflow.execute_activity(
            call_llm,
            LLMRequest(
                system=_draft_prompt(state.kb_results),
                user=input.message,
            ),
            start_to_close_timeout=timedelta(seconds=60),
            retry_policy=LLM_RETRY,
        )
        state.draft_reply = draft.text

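        try:
            await workflow.wait_condition(
                lambda: self._human_approval is not None,
                timeout=timedelta(hours=24),
            )
        except TimeoutError:
            # No decision within 24 hours. On Python < 3.11 catch
            # asyncio.TimeoutError instead (the two are the same class from 3.11 on).
            return "approval_timeout"

        if self._human_approval == "reject":
            return "rejected"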

        await workflow.execute_activity(
            send_email,
            Email(
                to=state.customer_email,
                subject="Re: your request",
                body=state.draft_reply,
            ),
            start_to_close_timeout=timedelta(seconds=30),
            retry_policy=TOOL_RETRY,
        )

        return "sent"


def _draft_prompt(results) -> str:
    joined = "\n".join(f"- {r.title}: {r.snippet}" for r in results)
    return (
        "You are a support assistant. Use the following knowledge base entries "
        f"to draft a reply:\n{joined}\n"
        "Be concise and friendly. If you are not sure, say so."
    )

There's a lot in there, so let's call out the important parts.

  • No LLM calls inside the workflow itself. All model calls go through execute_activity. This is non-negotiable — workflows must be deterministic.
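  • Every activity has a timeout and an explicit retry policy. The SDK rejects an activity call that sets neither start_to_close_timeout nor schedule_to_close_timeout, and if you omit the retry policy Temporal falls back to its default, which retries indefinitely with exponential backoff. Set both so the number of attempts is bounded on purpose.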
  • The human approval step is a signal with a 24-hour timeout. The workflow persists while waiting. Cluster restart doesn't matter.
  • Failures are structured. non_retryable_error_types lets you distinguish "network blip" from "model refused."
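One detail the list above doesn't cover: the workflow compares classify.text.strip().lower() against "escalate" directly, so an answer like "Billing." or "Category: faq" falls through to the drafting path without anyone noticing. Below is a sketch of a stricter parse that is pure and deterministic, so it is safe to call from workflow code. `normalize_category` and the choice to fall back to "escalate" are assumptions for illustration, not part of the original agent.

```python
from __future__ import annotations

import re

# The four labels the classification prompt asks for.
ALLOWED = ("faq", "account", "billing", "escalate")


def normalize_category(raw: str, fallback: str = "escalate") -> str:
    """Map free-form model output onto one allowed label, or the fallback."""
    cleaned = raw.strip().lower().strip(".\"'`")
    if cleaned in ALLOWED:
        return cleaned
    # The model answered with a phrase: accept it only when exactly one
    # allowed label appears as a whole word.
    words = set(re.findall(r"[a-z]+", cleaned))
    matches = [label for label in ALLOWED if label in words]
    if len(matches) == 1:
        return matches[0]
    # Ambiguous or empty output goes to a human rather than a guess.
    return fallback
```

Sending ambiguous output to a human costs a ticket, while guessing risks an irrelevant reply. If escalation is expensive for you, choose a different fallback.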

Saga Pattern For Compensation

A more advanced case: an agent that takes an action halfway through and needs to roll back if a later step fails. The classic saga pattern. Temporal handles it gracefully because you can register compensation logic with try/except.

@workflow.defn
class PurchaseAgent:
    @workflow.run
    async def run(self, input: "PurchaseInput") -> str:
        compensations: list = []
        try:
            reservation_id = await workflow.execute_activity(
                reserve_inventory,
                input.sku,
                start_to_close_timeout=timedelta(seconds=30),
                retry_policy=TOOL_RETRY,
            )
            compensations.append(("release", reservation_id))

            charge_id = await workflow.execute_activity(
                charge_card,
                input.payment,
                start_to_close_timeout=timedelta(seconds=60),
                retry_policy=TOOL_RETRY,
            )
            compensations.append(("refund", charge_id))

            shipment = await workflow.execute_activity(
                create_shipment,
                input.address,
                start_to_close_timeout=timedelta(seconds=30),
                retry_policy=TOOL_RETRY,
            )

            return f"shipped:{shipment}"

        except Exception:
            for kind, handle in reversed(compensations):
                if kind == "release":
                    await workflow.execute_activity(
                        release_inventory,
                        handle,
                        start_to_close_timeout=timedelta(seconds=30),
                    )
                elif kind == "refund":
                    await workflow.execute_activity(
                        refund_charge,
                        handle,
                        start_to_close_timeout=timedelta(seconds=30),
                    )
            raise

This is the only correct way we've found to give an agent the ability to write-and-undo. Hand-rolling it in application code is a trail of tears.
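Stripped of the Temporal calls, the control flow is small enough to run on its own. The following is a minimal sketch in plain asyncio, with hypothetical step functions standing in for the activities. It shows the two rules that matter: an undo is registered only after its step succeeds, and undos run in reverse order.

```python
from __future__ import annotations

import asyncio
from typing import Awaitable, Callable

Action = Callable[[], Awaitable[str]]
Undo = Callable[[str], Awaitable[None]]


async def run_saga(steps: list[tuple[Action, Undo]]) -> list[str]:
    """Run steps in order; on failure, undo the completed ones newest-first."""
    undo_stack: list[tuple[Undo, str]] = []
    handles: list[str] = []
    try:
        for action, undo in steps:
            handle = await action()
            # Register the undo only once the step has actually succeeded.
            undo_stack.append((undo, handle))
            handles.append(handle)
    except Exception:
        for undo, handle in reversed(undo_stack):
            await undo(handle)
        raise
    return handles


def demo() -> list[str]:
    log: list[str] = []

    async def reserve() -> str:
        log.append("reserve")
        return "res-1"

    async def release(handle: str) -> None:
        log.append(f"release {handle}")

    async def charge() -> str:
        log.append("charge")
        return "ch-1"

    async def refund(handle: str) -> None:
        log.append(f"refund {handle}")

    async def ship() -> str:
        raise RuntimeError("carrier unavailable")

    async def no_undo(handle: str) -> None:
        pass

    try:
        asyncio.run(run_saga([(reserve, release), (charge, refund), (ship, no_undo)]))
    except RuntimeError:
        log.append("failed")
    return log
```

Calling `demo()` records the reserve and the charge, then the refund before the release. What Temporal adds on top is that the undo stack and the position in the loop survive a crash, which this in-memory version cannot do.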

Handling Non-Determinism

Workflows in Temporal must be deterministic on replay. LLM outputs are not deterministic. This is fine because you put LLM calls in activities, and Temporal persists their results. On replay, the SDK returns the persisted output instead of calling the LLM again.

The things that will break your workflow:

  • Calling random.random() or time.time() directly — use workflow.random() and workflow.now().
  • Using threads or asyncio constructs that aren't deterministic.
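  • Importing modules that do I/O or have other side effects at load time inside the workflow sandbox. (Import them under workflow.unsafe.imports_passed_through() and keep the side effects themselves in activities.)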
  • Mutating global state inside the workflow function.

If you follow the rules, LLM non-determinism is invisible. If you don't, you'll hit a NondeterminismError on replay and have to debug it.
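The failure is easier to picture with a toy. The sketch below records which steps a function schedules on its first run and checks a second run against that record. It is a rough analogy for how replay detects divergence, not the SDK's algorithm, and every name in it is hypothetical.

```python
from __future__ import annotations

import random
from typing import Callable


class ToyNondeterminismError(Exception):
    """Raised when a replay schedules a different step than the recorded run."""


class CommandRecorder:
    def __init__(self, recorded: list[str] | None = None) -> None:
        self.recorded = recorded or []  # steps from the earlier run, if any
        self.issued: list[str] = []     # steps scheduled in this run

    def schedule(self, name: str) -> None:
        position = len(self.issued)
        if position < len(self.recorded) and self.recorded[position] != name:
            raise ToyNondeterminismError(
                f"step {position}: history has {self.recorded[position]!r}, "
                f"code asked for {name!r}"
            )
        self.issued.append(name)


def workflow_body(rec: CommandRecorder, coin: Callable[[], float]) -> None:
    rec.schedule("search_kb")
    # Branching on a random value is only safe if replay sees the same value.
    rec.schedule("call_llm" if coin() < 0.5 else "create_ticket")


def replays_cleanly(first_coin: Callable[[], float], replay_coin: Callable[[], float]) -> bool:
    first = CommandRecorder()
    workflow_body(first, first_coin)
    try:
        workflow_body(CommandRecorder(recorded=first.issued), replay_coin)
    except ToyNondeterminismError:
        return False
    return True


# Identically seeded generators reproduce the branch, which is the point of
# using a workflow-provided random source instead of the global one.
seeded_ok = replays_cleanly(random.Random(7).random, random.Random(7).random)
# Different values on replay take the other branch and diverge from history.
diverged = not replays_cleanly(lambda: 0.1, lambda: 0.9)
```

The same reasoning applies to clocks, UUIDs, and anything else that can differ between two runs of the same code.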

Testing With Time-Skipping

Temporal's test environment lets you skip time. A workflow that waits 24 hours for approval can be tested in milliseconds.

import pytest
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker

from app.activities import search_kb, call_llm, create_ticket, send_email
from app.workflows import SupportAgent, SupportInput


@pytest.mark.asyncio
async def test_support_agent_approval() -> None:
    async with await WorkflowEnvironment.start_time_skipping() as env:
        async with Worker(
            env.client,
            task_queue="support",
            workflows=[SupportAgent],
            activities=[search_kb, call_llm, create_ticket, send_email],
        ):
            handle = await env.client.start_workflow(
                SupportAgent.run,
                SupportInput(
                    customer_email="customer@example.com",  # placeholder address
                    message="I cannot log in",
                ),
                id="test-support-1",
                task_queue="support",
            )
            await handle.signal(SupportAgent.human_decision, "approve")
            result = await handle.result()
            assert result == "sent"

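start_time_skipping() runs the workflow against a test server that fast-forwards time whenever the workflow is only waiting on timers. The 24-hour wait_condition timeout therefore costs nothing in wall-clock time: the workflow resumes as soon as the signal arrives, and a wait that is never signalled times out almost immediately. Testing a multi-day workflow in a unit test is a real thing.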

You will still want to mock the LLM activity, for example by registering a test activity under the same name (@activity.defn(name="call_llm")) on the test worker in place of the real one.

Observability

Temporal ships with a Web UI that shows every workflow's history: events, inputs, outputs, retries, timings. For agents this is gold — you can replay exactly what the LLM saw and exactly what it said.

Wire OpenTelemetry in on top and you get the spans in your regular tracing backend too:

from temporalio.contrib.opentelemetry import TracingInterceptor
from temporalio.client import Client

client = await Client.connect(
    "temporal:7233",
    interceptors=[TracingInterceptor()],
)

The resulting traces show the workflow as a parent span with each activity as a child — exactly what you want for debugging "why did the agent do that."

When Not To Use This Stack

Temporal is overkill for a single-turn LLM call behind an HTTP endpoint. Use Temporal when:

  • The workflow has multiple steps with external side effects.
  • You need retry or compensation semantics.
  • Workflows are long-running (minutes to days).
  • Reliability matters more than latency.
  • You need a full audit trail of every action.

Skip Temporal when:

  • The agent is one LLM call and a response.
  • Sub-second latency is critical and adding a workflow hop is unacceptable.
  • You don't yet understand determinism constraints — walk before running.

Next Steps

Agents will move from demo to production in every org over the next year, and the teams that ship reliably will be the ones who took orchestration seriously from the start. Temporal plus LangGraph is the most pragmatic combination we've found: LangGraph for the reasoning and graph structure, Temporal for the durability and retries. Start with a simple workflow, get the retries and timeouts right, then add compensation and human-in-the-loop. If you want help architecting agent systems on Temporal, get in touch.
