Loading...
Alle Artikel
AI Infrastructure · 9 min read

Zuverlässige AI-Agents mit Temporal und LangGraph

Durable, retryable, observable AI-Agents durch die Kombination von Temporal-Workflows mit LangGraph-Reasoning. Beherrscht LLM-Fehler, langlaufende Tool-Calls und Saga-Kompensation.

Das Problem, das Agents haben

Jedes Team, das einen autonomen oder halbautonomen Agent in Produktion gebracht hat, ist gegen dieselbe Wand gelaufen. Der Happy Path ist ein Demo. Die reale Welt ist ein Sammelsurium von Teilfehlern: Der LLM-Call läuft mitten in einem 20-Schritt-Plan in ein Timeout, ein Tool liefert 503 zurück, das Modell erzeugt ein leicht falsches JSON, das den Parser sprengt, der Prozess startet neu und verliert State, ein Downstream-Service wird rate-limited, der Nutzer läuft weg.

Die meisten Agent-Frameworks – LangGraph, CrewAI, AutoGen – geben Ihnen einen schönen Reasoning-DAG und einen Albtraum, sobald ein Node ausfällt. Sie sind um das Reasoning-Modell herum entworfen, nicht um das Durability-Modell. Am Ende reimplementieren Sie Retry-Logik, Checkpointing, Kompensation und Observability von Hand – schlecht.

Dieser Beitrag beschreibt das Muster, das wir in Produktion einsetzen: LangGraph als Reasoning-Schicht, Temporal als Durability-Schicht. LangGraph entscheidet, was zu tun ist; Temporal sorgt dafür, dass es tatsächlich getan wird. Die Kombination ist langweiliger und zuverlässiger als jede der beiden allein.

Temporal in zwei Absätzen

Temporal ist eine Durable-Execution-Engine. Sie schreiben Code, der wie eine normale Funktion aussieht – sequenziell, imperativ, typisiert –, aber Temporal fängt jeden externen Aufruf ab und persistiert dessen Inputs, Outputs und Zwischenzustand in eine Datenbank. Stürzt der Worker-Prozess mitten in der Funktion ab, replay’t Temporal die Funktion auf einem neuen Worker, und sie läuft dort weiter, wo sie stehen geblieben war – deterministisch und für den Aufrufer unsichtbar.

Die zwei Primitiven sind Workflows und Activities. Workflows sind der Orchestrierungscode – durable, deterministisch, langlaufend. Activities sind die Side Effects – API-Aufrufe, DB-Writes, LLM-Completions. Activities können unabhängig vom Workflow gerettet werden. Ein Workflow, der drei Wochen läuft und einen Cluster-Ausfall überlebt, ist ein unterstützter Anwendungsfall, kein Partytrick.

Warum das so gut zu Agents passt

Agents sind strukturell langlaufende Orchestrierungen nicht-deterministischer Schritte mit hohen Retry-Anforderungen. Sie sind Temporal-Workflows mit LLM-Hut. Konkret:

  • LLM-Calls sind Activities. Retry bei transienten Fehlern, Timeout, invalidem Output.
  • Tool-Calls sind Activities. Gleiche Logik.
  • Der Reasoning-Loop ist ein Workflow. Er überlebt Crashes; Checkpoints sind gratis.
  • Menschliche Freigabe ist ein Signal. Der Workflow blockt auf ein Temporal-Signal, wartet Stunden oder Tage und läuft weiter.
  • Kompensation ist eine Saga. Scheitert Schritt 7, nachdem Schritt 5 eine Kreditkarte belastet hat, führen Sie eine Kompensations-Activity aus, um zu erstatten.

LangGraph ist in dieser Architektur eine Library, die aus einem Workflow heraus aufgerufen wird, keine Runtime. Sie nutzen seinen State Graph und die Reasoning-Muster, um die nächste Aktion zu entscheiden; Temporal führt sie aus.

Ein minimaler Agent-Workflow

Bauen wir einen Customer-Support-Agent. Er durchsucht eine Knowledge Base, ruft eine Ticketing-API auf, eskaliert optional an einen Menschen und verschickt eine Antwort-Mail. Wir verwenden das Temporal Python SDK (1.8+) und LangGraph 0.3+.

Activities

Activities sind einfach Python-Funktionen mit einem Decorator. Sie laufen aus Sicherheitsgründen in einem eigenen Prozess.

from __future__ import annotations

from dataclasses import dataclass

from langchain_anthropic import ChatAnthropic
from temporalio import activity


@dataclass
class LLMRequest:
    system: str
    user: str
    model: str = "claude-sonnet-4-6-20260115"


@dataclass
class LLMResponse:
    text: str
    input_tokens: int
    output_tokens: int


@activity.defn
async def call_llm(req: LLMRequest) -> LLMResponse:
    llm = ChatAnthropic(model=req.model, max_tokens=1024)
    result = await llm.ainvoke(
        [
            ("system", req.system),
            ("user", req.user),
        ]
    )
    return LLMResponse(
        text=result.content,
        input_tokens=result.usage_metadata["input_tokens"],
        output_tokens=result.usage_metadata["output_tokens"],
    )


@dataclass
class SearchArgs:
    query: str
    limit: int = 5


@dataclass
class KBResult:
    title: str
    url: str
    snippet: str


@activity.defn
async def search_kb(args: SearchArgs) -> list[KBResult]:
    # Real implementation calls your search backend
    return [
        KBResult(
            title="Reset password",
            url="https://kb.example.com/reset",
            snippet="To reset your password...",
        ),
    ]


@dataclass
class Ticket:
    id: str
    title: str
    body: str
    email: str


@activity.defn
async def create_ticket(t: Ticket) -> str:
    # Real implementation calls your ticketing API
    return f"T-{hash(t.title) % 100000}"


@dataclass
class Email:
    to: str
    subject: str
    body: str


@activity.defn
async def send_email(msg: Email) -> None:
    # Real implementation calls SES/Mailgun/etc
    activity.logger.info("would send email", extra={"to": msg.to})

Retry Policies

Jede Activity bekommt eine explizite Retry Policy. LLM-Calls brauchen aggressives Retry, weil Anbieter regelmäßig 429 und 5xx werfen. Ticket-Erstellung braucht Idempotenz, nicht nur Retry.

from datetime import timedelta

from temporalio.common import RetryPolicy

LLM_RETRY = RetryPolicy(
    initial_interval=timedelta(seconds=1),
    backoff_coefficient=2.0,
    maximum_interval=timedelta(seconds=30),
    maximum_attempts=6,
    non_retryable_error_types=["ValueError"],
)

TOOL_RETRY = RetryPolicy(
    initial_interval=timedelta(seconds=2),
    backoff_coefficient=2.0,
    maximum_interval=timedelta(seconds=60),
    maximum_attempts=4,
)

Der Workflow

Der Workflow fügt alles zusammen. Beachten Sie, dass LangGraphs StateGraph im Workflow konstruiert wird, aber seine Nodes an Temporal-Activities delegieren, statt direkt LLMs aufzurufen.

from __future__ import annotations

from dataclasses import dataclass, field
from datetime import timedelta
from typing import Literal

from temporalio import workflow

with workflow.unsafe.imports_passed_through():
    from langgraph.graph import StateGraph, END
    from .activities import (
        call_llm,
        search_kb,
        create_ticket,
        send_email,
        LLMRequest,
        SearchArgs,
        Ticket,
        Email,
    )
    from .retry import LLM_RETRY, TOOL_RETRY


@dataclass
class SupportInput:
    customer_email: str
    message: str


@dataclass
class AgentState:
    customer_email: str
    message: str
    kb_results: list = field(default_factory=list)
    draft_reply: str = ""
    ticket_id: str | None = None
    needs_human: bool = False


@workflow.defn
class SupportAgent:
    def __init__(self) -> None:
        self._human_approval: Literal["approve", "reject"] | None = None

    @workflow.signal
    def human_decision(self, decision: Literal["approve", "reject"]) -> None:
        self._human_approval = decision

    @workflow.run
    async def run(self, input: SupportInput) -> str:
        state = AgentState(
            customer_email=input.customer_email,
            message=input.message,
        )

        kb_results = await workflow.execute_activity(
            search_kb,
            SearchArgs(query=input.message, limit=5),
            start_to_close_timeout=timedelta(seconds=30),
            retry_policy=TOOL_RETRY,
        )
        state.kb_results = kb_results

        classify = await workflow.execute_activity(
            call_llm,
            LLMRequest(
                system="You classify support messages. Respond with one of: faq, account, billing, escalate",
                user=input.message,
            ),
            start_to_close_timeout=timedelta(seconds=60),
            retry_policy=LLM_RETRY,
        )
        category = classify.text.strip().lower()

        if category == "escalate":
            state.needs_human = True
            state.ticket_id = await workflow.execute_activity(
                create_ticket,
                Ticket(
                    id="",
                    title=f"Escalated: {input.message[:60]}",
                    body=input.message,
                    email=input.customer_email,
                ),
                start_to_close_timeout=timedelta(seconds=30),
                retry_policy=TOOL_RETRY,
            )
            return f"escalated:{state.ticket_id}"

        draft = await workflow.execute_activity(
            call_llm,
            LLMRequest(
                system=_draft_prompt(state.kb_results),
                user=input.message,
            ),
            start_to_close_timeout=timedelta(seconds=60),
            retry_policy=LLM_RETRY,
        )
        state.draft_reply = draft.text

        await workflow.wait_condition(
            lambda: self._human_approval is not None,
            timeout=timedelta(hours=24),
        )

        if self._human_approval == "reject":
            return "rejected"

        await workflow.execute_activity(
            send_email,
            Email(
                to=state.customer_email,
                subject="Re: your request",
                body=state.draft_reply,
            ),
            start_to_close_timeout=timedelta(seconds=30),
            retry_policy=TOOL_RETRY,
        )

        return "sent"


def _draft_prompt(results) -> str:
    joined = "\n".join(f"- {r.title}: {r.snippet}" for r in results)
    return (
        "You are a support assistant. Use the following knowledge base entries "
        f"to draft a reply:\n{joined}\n"
        "Be concise and friendly. If you are not sure, say so."
    )

Da steckt einiges drin, deshalb die wichtigen Punkte:

  • Keine LLM-Calls im Workflow selbst. Alle Model-Calls gehen über execute_activity. Nicht verhandelbar – Workflows müssen deterministisch sein.
  • Jede Activity hat Timeout und Retry Policy. Temporal retry’t nicht ohne, und es timeoutet auch nicht ohne.
  • Der Human-Approval-Schritt ist ein Signal mit 24-Stunden-Timeout. Der Workflow persistiert während des Wartens. Cluster-Restart ist egal.
  • Fehler sind strukturiert. non_retryable_error_types erlaubt Ihnen, zwischen „Netzwerk-Schluckauf" und „Modell hat verweigert" zu unterscheiden.

Saga-Muster für Kompensation

Ein anspruchsvollerer Fall: ein Agent, der mittendrin eine Aktion ausführt und sie rollbacken muss, falls ein späterer Schritt scheitert. Das klassische Saga-Muster. Temporal handhabt es elegant, weil Sie Kompensationslogik mit try/except registrieren können.

@workflow.defn
class PurchaseAgent:
    @workflow.run
    async def run(self, input: "PurchaseInput") -> str:
        compensations: list = []
        try:
            reservation_id = await workflow.execute_activity(
                reserve_inventory,
                input.sku,
                start_to_close_timeout=timedelta(seconds=30),
                retry_policy=TOOL_RETRY,
            )
            compensations.append(("release", reservation_id))

            charge_id = await workflow.execute_activity(
                charge_card,
                input.payment,
                start_to_close_timeout=timedelta(seconds=60),
                retry_policy=TOOL_RETRY,
            )
            compensations.append(("refund", charge_id))

            shipment = await workflow.execute_activity(
                create_shipment,
                input.address,
                start_to_close_timeout=timedelta(seconds=30),
                retry_policy=TOOL_RETRY,
            )

            return f"shipped:{shipment}"

        except Exception:
            for kind, handle in reversed(compensations):
                if kind == "release":
                    await workflow.execute_activity(
                        release_inventory,
                        handle,
                        start_to_close_timeout=timedelta(seconds=30),
                    )
                elif kind == "refund":
                    await workflow.execute_activity(
                        refund_charge,
                        handle,
                        start_to_close_timeout=timedelta(seconds=30),
                    )
            raise

Das ist der einzig korrekte Weg, den wir gefunden haben, einem Agent Write-and-undo beizubringen. Das im Application Code handzurollen, ist ein Tränental.

Umgang mit Nichtdeterminismus

Workflows in Temporal müssen beim Replay deterministisch sein. LLM-Outputs sind nicht deterministisch. Das ist in Ordnung, weil Sie LLM-Calls in Activities stecken und Temporal deren Ergebnisse persistiert. Beim Replay liefert das SDK den persistierten Output, statt das LLM erneut aufzurufen.

Dinge, die Ihren Workflow brechen:

  • random.random() oder time.time() direkt aufrufen – nutzen Sie workflow.random() und workflow.now().
  • Threads oder asyncio-Konstrukte, die nicht deterministisch sind.
  • Libraries mit Seiteneffekten beim Modul-Load importieren. (Nutzen Sie workflow.unsafe.imports_passed_through().)
  • Globalen State innerhalb der Workflow-Funktion mutieren.

Wenn Sie die Regeln einhalten, ist LLM-Nichtdeterminismus unsichtbar. Wenn nicht, bekommen Sie beim Replay einen NonDeterminismError und dürfen debuggen.

Testen mit Time-Skipping

Temporals Test-Environment erlaubt Ihnen, Zeit zu überspringen. Ein Workflow, der 24 Stunden auf Freigabe wartet, lässt sich in Millisekunden testen.

import pytest
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker

from app.activities import search_kb, call_llm, create_ticket, send_email
from app.workflows import SupportAgent, SupportInput


@pytest.mark.asyncio
async def test_support_agent_approval() -> None:
    async with await WorkflowEnvironment.start_time_skipping() as env:
        async with Worker(
            env.client,
            task_queue="support",
            workflows=[SupportAgent],
            activities=[search_kb, call_llm, create_ticket, send_email],
        ):
            handle = await env.client.start_workflow(
                SupportAgent.run,
                SupportInput(
                    customer_email="[email protected]",
                    message="I cannot log in",
                ),
                id="test-support-1",
                task_queue="support",
            )
            await handle.signal(SupportAgent.human_decision, "approve")
            result = await handle.result()
            assert result == "sent"

start_time_skipping() ersetzt die normale Uhr, sodass wait_condition(timeout=24h) sofort zurückkehrt, wenn das Signal empfangen wurde. Einen mehrtägigen Workflow in einem Unit-Test zu testen ist real.

Das LLM-Activity werden Sie dennoch mocken wollen – entweder durch Registrierung einer Test-Activity mit derselben Signatur oder über Temporals Activity-Replacement-Feature.

Observability

Temporal bringt eine Web-UI mit, die die History jedes Workflows zeigt: Events, Inputs, Outputs, Retries, Timings. Für Agents ist das Gold – Sie können exakt abspielen, was das LLM gesehen und was es gesagt hat.

Verdrahten Sie OpenTelemetry obendrauf, und Sie bekommen die Spans auch in Ihrem normalen Tracing-Backend:

from temporalio.contrib.opentelemetry import TracingInterceptor
from temporalio.client import Client

client = await Client.connect(
    "temporal:7233",
    interceptors=[TracingInterceptor()],
)

Die resultierenden Traces zeigen den Workflow als Parent Span mit jeder Activity als Child – genau das, was Sie für das Debugging des „Warum hat der Agent das gemacht?" wollen.

Wann dieser Stack nicht passt

Temporal ist Overkill für einen Single-Turn-LLM-Call hinter einem HTTP-Endpoint. Nutzen Sie Temporal, wenn:

  • Der Workflow mehrere Schritte mit externen Seiteneffekten hat.
  • Sie Retry- oder Kompensationssemantik benötigen.
  • Workflows langlaufend sind (Minuten bis Tage).
  • Zuverlässigkeit wichtiger ist als Latenz.
  • Sie einen vollständigen Audit-Trail jeder Aktion benötigen.

Überspringen Sie Temporal, wenn:

  • Der Agent ein LLM-Call und eine Antwort ist.
  • Subsekunden-Latenz kritisch und ein Workflow-Hop inakzeptabel ist.
  • Sie Determinismus-Constraints noch nicht verstanden haben – erst gehen, dann rennen.

Nächste Schritte

Agents werden im kommenden Jahr in jeder Organisation von Demo zu Produktion wandern, und die Teams, die zuverlässig shippen, sind die, die Orchestrierung von Beginn an ernst genommen haben. Temporal plus LangGraph ist die pragmatischste Kombination, die wir gefunden haben: LangGraph für Reasoning und Graph-Struktur, Temporal für Durability und Retries. Beginnen Sie mit einem einfachen Workflow, bekommen Sie Retries und Timeouts richtig hin, ergänzen Sie dann Kompensation und Human-in-the-Loop. Wenn Sie Unterstützung bei der Architektur von Agent-Systemen auf Temporal wünschen, nehmen Sie Kontakt auf.

abgelegt unter
temporallanggraphagentsworkflowsai
mit uns arbeiten

Soll unser Team Ihrer Infrastruktur helfen?

talk to an engineerFree 30-min discovery callBook
close