Loading...
Усі статті
AI Infrastructure · 9 min read

Надійні AI-агенти з Temporal та LangGraph

Durable, retryable, observable AI-агенти, побудовані поєднанням Temporal workflows з міркуванням LangGraph. Обробляє провали LLM, довгі tool calls та saga-style компенсацію.

Проблема, яку мають агенти

Кожна команда, що відправила автономного або напівавтономного агента у продакшн, зіткнулась з тією самою стіною. Happy path — це демо. Реальний світ — це бардак часткових відмов: виклик LLM тайм-аутнувся посеред 20-крокового плану, інструмент повернув 503, модель видала трохи неправильний JSON, що ламає parser, процес рестартнувся і втратив стан, downstream сервіс rate-limited, користувач пішов.

Більшість agent-фреймворків — LangGraph, CrewAI, AutoGen — дають вам гарний DAG міркування і нічний кошмар, коли будь-який node падає. Вони спроєктовані навколо моделі міркування, а не моделі durability. Ви закінчуєте тим, що вручну погано реалізовуєте retry logic, checkpointing, компенсацію і observability.

Ця стаття — про патерн, який ми використовуємо у продакшні: LangGraph як шар міркування, Temporal як шар durability. LangGraph вирішує що робити; Temporal гарантує, що це дійсно буде зроблено. Комбінація нудніша і надійніша, ніж будь-яка з них окремо.

Temporal у двох абзацах

Temporal — це двигун durable execution. Ви пишете код, що виглядає як звичайна функція — послідовна, імперативна, типізована — але Temporal перехоплює кожен зовнішній виклик і персистить його входи, виходи і проміжний стан у базу даних. Якщо worker-процес падає посеред функції, Temporal відтворює функцію на новому worker, і вона продовжує з того місця, де зупинилась, детерміновано і невидимо для викликача.

Два примітиви — це workflows і activities. Workflows — це orchestration-код, durable, детермінований, long-running. Activities — це side effects: API-виклики, записи в БД, LLM completions. Activities можна ретраїти незалежно від workflow. Workflow, що працює три тижні і виживає відмову кластера — це підтримуваний use case, а не фокус.

Чому це так добре підходить агентам

Агенти структурно — це long-running orchestrations недетермінованих кроків із важкими вимогами до retry. Вони — Temporal workflows у LLM-капелюсі. Конкретно:

  • LLM-виклики — це activities. Retry на transient failure, timeout, невалідний output.
  • Tool-виклики — це activities. Та сама логіка.
  • Цикл міркування — це workflow. Він виживає падіння; checkpoints безкоштовні.
  • Людське схвалення — це signal. Workflow блокується на Temporal signal, чекає годинами чи днями і продовжується.
  • Компенсація — це saga. Якщо крок 7 провалюється після того, як крок 5 списав з картки, ви запускаєте activity компенсації для повернення коштів.

LangGraph у цій архітектурі — це бібліотека, що викликається з workflow, а не runtime. Ви використовуєте його state graph та патерни міркування, щоб вирішувати наступну дію; Temporal її виконує.

Мінімальний workflow агента

Зберімо customer-support агента. Він шукає по knowledge base, викликає ticketing API, опціонально ескалує людині і надсилає email-відповідь. Ми використаємо Temporal Python SDK (1.8+) та LangGraph 0.3+.

Activities

Activities — це просто Python-функції з декоратором. Вони запускаються в окремому процесі від workflow для безпеки.

from __future__ import annotations

from dataclasses import dataclass

from langchain_anthropic import ChatAnthropic
from temporalio import activity


@dataclass
class LLMRequest:
    system: str
    user: str
    model: str = "claude-sonnet-4-6-20260115"


@dataclass
class LLMResponse:
    text: str
    input_tokens: int
    output_tokens: int


@activity.defn
async def call_llm(req: LLMRequest) -> LLMResponse:
    llm = ChatAnthropic(model=req.model, max_tokens=1024)
    result = await llm.ainvoke(
        [
            ("system", req.system),
            ("user", req.user),
        ]
    )
    return LLMResponse(
        text=result.content,
        input_tokens=result.usage_metadata["input_tokens"],
        output_tokens=result.usage_metadata["output_tokens"],
    )


@dataclass
class SearchArgs:
    query: str
    limit: int = 5


@dataclass
class KBResult:
    title: str
    url: str
    snippet: str


@activity.defn
async def search_kb(args: SearchArgs) -> list[KBResult]:
    # Real implementation calls your search backend
    return [
        KBResult(
            title="Reset password",
            url="https://kb.example.com/reset",
            snippet="To reset your password...",
        ),
    ]


@dataclass
class Ticket:
    id: str
    title: str
    body: str
    email: str


@activity.defn
async def create_ticket(t: Ticket) -> str:
    # Real implementation calls your ticketing API
    return f"T-{hash(t.title) % 100000}"


@dataclass
class Email:
    to: str
    subject: str
    body: str


@activity.defn
async def send_email(msg: Email) -> None:
    # Real implementation calls SES/Mailgun/etc
    activity.logger.info("would send email", extra={"to": msg.to})

Retry Policies

Кожна activity отримує явну retry policy. LLM-виклики потребують агресивного retry, бо провайдери регулярно провалюються з 429 і 5xx. Створення тикета потребує ідемпотентності, не просто retry.

from datetime import timedelta

from temporalio.common import RetryPolicy

LLM_RETRY = RetryPolicy(
    initial_interval=timedelta(seconds=1),
    backoff_coefficient=2.0,
    maximum_interval=timedelta(seconds=30),
    maximum_attempts=6,
    non_retryable_error_types=["ValueError"],
)

TOOL_RETRY = RetryPolicy(
    initial_interval=timedelta(seconds=2),
    backoff_coefficient=2.0,
    maximum_interval=timedelta(seconds=60),
    maximum_attempts=4,
)

Workflow

Workflow склеює все разом. Зверніть увагу, що StateGraph LangGraph будується всередині workflow, але його nodes делегують Temporal activities замість того, щоб викликати LLM напряму.

from __future__ import annotations

from dataclasses import dataclass, field
from datetime import timedelta
from typing import Literal

from temporalio import workflow

with workflow.unsafe.imports_passed_through():
    from langgraph.graph import StateGraph, END
    from .activities import (
        call_llm,
        search_kb,
        create_ticket,
        send_email,
        LLMRequest,
        SearchArgs,
        Ticket,
        Email,
    )
    from .retry import LLM_RETRY, TOOL_RETRY


@dataclass
class SupportInput:
    customer_email: str
    message: str


@dataclass
class AgentState:
    customer_email: str
    message: str
    kb_results: list = field(default_factory=list)
    draft_reply: str = ""
    ticket_id: str | None = None
    needs_human: bool = False


@workflow.defn
class SupportAgent:
    def __init__(self) -> None:
        self._human_approval: Literal["approve", "reject"] | None = None

    @workflow.signal
    def human_decision(self, decision: Literal["approve", "reject"]) -> None:
        self._human_approval = decision

    @workflow.run
    async def run(self, input: SupportInput) -> str:
        state = AgentState(
            customer_email=input.customer_email,
            message=input.message,
        )

        kb_results = await workflow.execute_activity(
            search_kb,
            SearchArgs(query=input.message, limit=5),
            start_to_close_timeout=timedelta(seconds=30),
            retry_policy=TOOL_RETRY,
        )
        state.kb_results = kb_results

        classify = await workflow.execute_activity(
            call_llm,
            LLMRequest(
                system="You classify support messages. Respond with one of: faq, account, billing, escalate",
                user=input.message,
            ),
            start_to_close_timeout=timedelta(seconds=60),
            retry_policy=LLM_RETRY,
        )
        category = classify.text.strip().lower()

        if category == "escalate":
            state.needs_human = True
            state.ticket_id = await workflow.execute_activity(
                create_ticket,
                Ticket(
                    id="",
                    title=f"Escalated: {input.message[:60]}",
                    body=input.message,
                    email=input.customer_email,
                ),
                start_to_close_timeout=timedelta(seconds=30),
                retry_policy=TOOL_RETRY,
            )
            return f"escalated:{state.ticket_id}"

        draft = await workflow.execute_activity(
            call_llm,
            LLMRequest(
                system=_draft_prompt(state.kb_results),
                user=input.message,
            ),
            start_to_close_timeout=timedelta(seconds=60),
            retry_policy=LLM_RETRY,
        )
        state.draft_reply = draft.text

        await workflow.wait_condition(
            lambda: self._human_approval is not None,
            timeout=timedelta(hours=24),
        )

        if self._human_approval == "reject":
            return "rejected"

        await workflow.execute_activity(
            send_email,
            Email(
                to=state.customer_email,
                subject="Re: your request",
                body=state.draft_reply,
            ),
            start_to_close_timeout=timedelta(seconds=30),
            retry_policy=TOOL_RETRY,
        )

        return "sent"


def _draft_prompt(results) -> str:
    joined = "\n".join(f"- {r.title}: {r.snippet}" for r in results)
    return (
        "You are a support assistant. Use the following knowledge base entries "
        f"to draft a reply:\n{joined}\n"
        "Be concise and friendly. If you are not sure, say so."
    )

Тут багато, тож виділимо важливі частини.

  • Жодних LLM-викликів усередині самого workflow. Усі виклики моделей ідуть через execute_activity. Це не обговорюється — workflows мають бути детермінованими.
  • Кожна activity має timeout і retry policy. Temporal не ретраїтиме без них і не тайм-аутне без них.
  • Крок людського схвалення — це signal із 24-годинним timeout. Workflow персистить, поки чекає. Рестарт кластера не має значення.
  • Failures структуровані. non_retryable_error_types дозволяє відрізнити "network blip" від "model refused".

Saga-патерн для компенсації

Складніший випадок: агент, що виконує дію посередині, і має відкатити її, якщо пізніший крок провалюється. Класичний saga-патерн. Temporal обробляє це елегантно, бо ви можете реєструвати compensation-логіку через try/except.

@workflow.defn
class PurchaseAgent:
    @workflow.run
    async def run(self, input: "PurchaseInput") -> str:
        compensations: list = []
        try:
            reservation_id = await workflow.execute_activity(
                reserve_inventory,
                input.sku,
                start_to_close_timeout=timedelta(seconds=30),
                retry_policy=TOOL_RETRY,
            )
            compensations.append(("release", reservation_id))

            charge_id = await workflow.execute_activity(
                charge_card,
                input.payment,
                start_to_close_timeout=timedelta(seconds=60),
                retry_policy=TOOL_RETRY,
            )
            compensations.append(("refund", charge_id))

            shipment = await workflow.execute_activity(
                create_shipment,
                input.address,
                start_to_close_timeout=timedelta(seconds=30),
                retry_policy=TOOL_RETRY,
            )

            return f"shipped:{shipment}"

        except Exception:
            for kind, handle in reversed(compensations):
                if kind == "release":
                    await workflow.execute_activity(
                        release_inventory,
                        handle,
                        start_to_close_timeout=timedelta(seconds=30),
                    )
                elif kind == "refund":
                    await workflow.execute_activity(
                        refund_charge,
                        handle,
                        start_to_close_timeout=timedelta(seconds=30),
                    )
            raise

Це єдиний правильний спосіб, який ми знайшли, щоб дати агенту можливість write-and-undo. Реалізувати це вручну в коді застосунку — це стежка сліз.

Обробка недетермінізму

Workflows у Temporal мають бути детермінованими на replay. LLM-виводи не детерміновані. Це нормально, бо ви кладете LLM-виклики в activities, і Temporal персистить їхні результати. На replay SDK повертає persisted output замість того, щоб знову викликати LLM.

Речі, що зламають ваш workflow:

  • Виклик random.random() чи time.time() напряму — використовуйте workflow.random() і workflow.now().
  • Використання потоків чи asyncio-конструкцій, що не детерміновані.
  • Імпорт бібліотек із side effects на module load. (Використовуйте workflow.unsafe.imports_passed_through().)
  • Мутація глобального стану всередині функції workflow.

Якщо ви дотримуєтесь правил, недетермінізм LLM невидимий. Якщо ні — ви зіткнетесь з NonDeterminismError на replay і муситимете його дебажити.

Тестування з time-skipping

Тестове середовище Temporal дозволяє пропускати час. Workflow, що чекає 24 години на схвалення, можна протестувати за мілісекунди.

import pytest
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker

from app.activities import search_kb, call_llm, create_ticket, send_email
from app.workflows import SupportAgent, SupportInput


@pytest.mark.asyncio
async def test_support_agent_approval() -> None:
    async with await WorkflowEnvironment.start_time_skipping() as env:
        async with Worker(
            env.client,
            task_queue="support",
            workflows=[SupportAgent],
            activities=[search_kb, call_llm, create_ticket, send_email],
        ):
            handle = await env.client.start_workflow(
                SupportAgent.run,
                SupportInput(
                    customer_email="[email protected]",
                    message="I cannot log in",
                ),
                id="test-support-1",
                task_queue="support",
            )
            await handle.signal(SupportAgent.human_decision, "approve")
            result = await handle.result()
            assert result == "sent"

start_time_skipping() заміняє звичайний годинник, тож wait_condition(timeout=24h) повертається негайно, якщо signal отримано. Тестувати multi-day workflow в unit test — це реальна річ.

Ви все одно захочете замокати LLM-activity — або реєстрацією тестової activity з тим самим підписом, або через activity replacement фічу Temporal.

Observability

Temporal постачається з Web UI, що показує історію кожного workflow: події, входи, виходи, retries, тайминги. Для агентів це золото — ви можете replay-ити точно те, що LLM бачив, і точно те, що він сказав.

Підключіть OpenTelemetry зверху — і ви отримаєте spans у вашому звичайному tracing-бекенді також:

from temporalio.contrib.opentelemetry import TracingInterceptor
from temporalio.client import Client

client = await Client.connect(
    "temporal:7233",
    interceptors=[TracingInterceptor()],
)

Результуючі traces показують workflow як parent span, а кожну activity — як child, саме те, що ви хочете для дебагу "чому агент зробив це".

Коли не використовувати цей стек

Temporal — це overkill для single-turn LLM-виклику за HTTP-ендпоінтом. Використовуйте Temporal, коли:

  • Workflow має кілька кроків із зовнішніми side effects.
  • Вам потрібна семантика retry чи компенсації.
  • Workflows long-running (хвилини — дні).
  • Надійність важливіша за latency.
  • Вам потрібен повний audit trail кожної дії.

Пропустіть Temporal, коли:

  • Агент — це один LLM-виклик і відповідь.
  • Sub-second latency критична, і додавання workflow-hop неприйнятне.
  • Ви ще не розумієте обмеження детермінізму — спочатку крок, потім біг.

Наступні кроки

Агенти перейдуть з демо у продакшн у кожній організації протягом наступного року, і команди, що відвантажуватимуть надійно, будуть тими, хто серйозно поставився до orchestration з самого початку. Temporal плюс LangGraph — найпрагматичніша комбінація, яку ми знайшли: LangGraph для міркування і графової структури, Temporal для durability і retries. Почніть з простого workflow, правильно налаштуйте retries і timeouts, а потім додайте компенсацію і human-in-the-loop. Якщо хочете допомоги з проєктуванням агентних систем на Temporal, напишіть нам.

у категорії
temporallanggraphagentsworkflowsai
працювати з нами

Хочете, щоб наша команда допомогла з вашою інфраструктурою?

talk to an engineerFree 30-min discovery callBook
close