Проблема, яку мають агенти
Кожна команда, що відправила автономного або напівавтономного агента у продакшн, зіткнулась з тією самою стіною. Happy path — це демо. Реальний світ — це бардак часткових відмов: виклик LLM тайм-аутнувся посеред 20-крокового плану, інструмент повернув 503, модель видала трохи неправильний JSON, що ламає parser, процес рестартнувся і втратив стан, downstream сервіс rate-limited, користувач пішов.
Більшість agent-фреймворків — LangGraph, CrewAI, AutoGen — дають вам гарний DAG міркування і нічний кошмар, коли будь-який node падає. Вони спроєктовані навколо моделі міркування, а не моделі durability. Ви закінчуєте тим, що вручну погано реалізовуєте retry logic, checkpointing, компенсацію і observability.
Ця стаття — про патерн, який ми використовуємо у продакшні: LangGraph як шар міркування, Temporal як шар durability. LangGraph вирішує що робити; Temporal гарантує, що це дійсно буде зроблено. Комбінація нудніша і надійніша, ніж будь-яка з них окремо.
Temporal у двох абзацах
Temporal — це двигун durable execution. Ви пишете код, що виглядає як звичайна функція — послідовна, імперативна, типізована — але Temporal перехоплює кожен зовнішній виклик і персистить його входи, виходи і проміжний стан у базу даних. Якщо worker-процес падає посеред функції, Temporal відтворює функцію на новому worker, і вона продовжує з того місця, де зупинилась, детерміновано і невидимо для викликача.
Два примітиви — це workflows і activities. Workflows — це orchestration-код, durable, детермінований, long-running. Activities — це side effects: API-виклики, записи в БД, LLM completions. Activities можна ретраїти незалежно від workflow. Workflow, що працює три тижні і виживає відмову кластера — це підтримуваний use case, а не фокус.
Чому це так добре підходить агентам
Агенти структурно — це long-running orchestrations недетермінованих кроків із важкими вимогами до retry. Вони — Temporal workflows у LLM-капелюсі. Конкретно:
- LLM-виклики — це activities. Retry на transient failure, timeout, невалідний output.
- Tool-виклики — це activities. Та сама логіка.
- Цикл міркування — це workflow. Він виживає падіння; checkpoints безкоштовні.
- Людське схвалення — це signal. Workflow блокується на Temporal signal, чекає годинами чи днями і продовжується.
- Компенсація — це saga. Якщо крок 7 провалюється після того, як крок 5 списав з картки, ви запускаєте activity компенсації для повернення коштів.
LangGraph у цій архітектурі — це бібліотека, що викликається з workflow, а не runtime. Ви використовуєте його state graph та патерни міркування, щоб вирішувати наступну дію; Temporal її виконує.
Мінімальний workflow агента
Зберімо customer-support агента. Він шукає по knowledge base, викликає ticketing API, опціонально ескалує людині і надсилає email-відповідь. Ми використаємо Temporal Python SDK (1.8+) та LangGraph 0.3+.
Activities
Activities — це просто Python-функції з декоратором. Вони запускаються в окремому процесі від workflow для безпеки.
from __future__ import annotations
from dataclasses import dataclass
from langchain_anthropic import ChatAnthropic
from temporalio import activity
@dataclass
class LLMRequest:
system: str
user: str
model: str = "claude-sonnet-4-6-20260115"
@dataclass
class LLMResponse:
text: str
input_tokens: int
output_tokens: int
@activity.defn
async def call_llm(req: LLMRequest) -> LLMResponse:
llm = ChatAnthropic(model=req.model, max_tokens=1024)
result = await llm.ainvoke(
[
("system", req.system),
("user", req.user),
]
)
return LLMResponse(
text=result.content,
input_tokens=result.usage_metadata["input_tokens"],
output_tokens=result.usage_metadata["output_tokens"],
)
@dataclass
class SearchArgs:
query: str
limit: int = 5
@dataclass
class KBResult:
title: str
url: str
snippet: str
@activity.defn
async def search_kb(args: SearchArgs) -> list[KBResult]:
# Real implementation calls your search backend
return [
KBResult(
title="Reset password",
url="https://kb.example.com/reset",
snippet="To reset your password...",
),
]
@dataclass
class Ticket:
id: str
title: str
body: str
email: str
@activity.defn
async def create_ticket(t: Ticket) -> str:
# Real implementation calls your ticketing API
return f"T-{hash(t.title) % 100000}"
@dataclass
class Email:
to: str
subject: str
body: str
@activity.defn
async def send_email(msg: Email) -> None:
# Real implementation calls SES/Mailgun/etc
activity.logger.info("would send email", extra={"to": msg.to})
Retry Policies
Кожна activity отримує явну retry policy. LLM-виклики потребують агресивного retry, бо провайдери регулярно провалюються з 429 і 5xx. Створення тикета потребує ідемпотентності, не просто retry.
from datetime import timedelta
from temporalio.common import RetryPolicy
LLM_RETRY = RetryPolicy(
initial_interval=timedelta(seconds=1),
backoff_coefficient=2.0,
maximum_interval=timedelta(seconds=30),
maximum_attempts=6,
non_retryable_error_types=["ValueError"],
)
TOOL_RETRY = RetryPolicy(
initial_interval=timedelta(seconds=2),
backoff_coefficient=2.0,
maximum_interval=timedelta(seconds=60),
maximum_attempts=4,
)
Workflow
Workflow склеює все разом. Зверніть увагу, що StateGraph LangGraph будується всередині workflow, але його nodes делегують Temporal activities замість того, щоб викликати LLM напряму.
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import timedelta
from typing import Literal
from temporalio import workflow
with workflow.unsafe.imports_passed_through():
from langgraph.graph import StateGraph, END
from .activities import (
call_llm,
search_kb,
create_ticket,
send_email,
LLMRequest,
SearchArgs,
Ticket,
Email,
)
from .retry import LLM_RETRY, TOOL_RETRY
@dataclass
class SupportInput:
customer_email: str
message: str
@dataclass
class AgentState:
customer_email: str
message: str
kb_results: list = field(default_factory=list)
draft_reply: str = ""
ticket_id: str | None = None
needs_human: bool = False
@workflow.defn
class SupportAgent:
def __init__(self) -> None:
self._human_approval: Literal["approve", "reject"] | None = None
@workflow.signal
def human_decision(self, decision: Literal["approve", "reject"]) -> None:
self._human_approval = decision
@workflow.run
async def run(self, input: SupportInput) -> str:
state = AgentState(
customer_email=input.customer_email,
message=input.message,
)
kb_results = await workflow.execute_activity(
search_kb,
SearchArgs(query=input.message, limit=5),
start_to_close_timeout=timedelta(seconds=30),
retry_policy=TOOL_RETRY,
)
state.kb_results = kb_results
classify = await workflow.execute_activity(
call_llm,
LLMRequest(
system="You classify support messages. Respond with one of: faq, account, billing, escalate",
user=input.message,
),
start_to_close_timeout=timedelta(seconds=60),
retry_policy=LLM_RETRY,
)
category = classify.text.strip().lower()
if category == "escalate":
state.needs_human = True
state.ticket_id = await workflow.execute_activity(
create_ticket,
Ticket(
id="",
title=f"Escalated: {input.message[:60]}",
body=input.message,
email=input.customer_email,
),
start_to_close_timeout=timedelta(seconds=30),
retry_policy=TOOL_RETRY,
)
return f"escalated:{state.ticket_id}"
draft = await workflow.execute_activity(
call_llm,
LLMRequest(
system=_draft_prompt(state.kb_results),
user=input.message,
),
start_to_close_timeout=timedelta(seconds=60),
retry_policy=LLM_RETRY,
)
state.draft_reply = draft.text
await workflow.wait_condition(
lambda: self._human_approval is not None,
timeout=timedelta(hours=24),
)
if self._human_approval == "reject":
return "rejected"
await workflow.execute_activity(
send_email,
Email(
to=state.customer_email,
subject="Re: your request",
body=state.draft_reply,
),
start_to_close_timeout=timedelta(seconds=30),
retry_policy=TOOL_RETRY,
)
return "sent"
def _draft_prompt(results) -> str:
joined = "\n".join(f"- {r.title}: {r.snippet}" for r in results)
return (
"You are a support assistant. Use the following knowledge base entries "
f"to draft a reply:\n{joined}\n"
"Be concise and friendly. If you are not sure, say so."
)
Тут багато, тож виділимо важливі частини.
- Жодних LLM-викликів усередині самого workflow. Усі виклики моделей ідуть через
execute_activity. Це не обговорюється — workflows мають бути детермінованими. - Кожна activity має timeout і retry policy. Temporal не ретраїтиме без них і не тайм-аутне без них.
- Крок людського схвалення — це signal із 24-годинним timeout. Workflow персистить, поки чекає. Рестарт кластера не має значення.
- Failures структуровані.
non_retryable_error_typesдозволяє відрізнити "network blip" від "model refused".
Saga-патерн для компенсації
Складніший випадок: агент, що виконує дію посередині, і має відкатити її, якщо пізніший крок провалюється. Класичний saga-патерн. Temporal обробляє це елегантно, бо ви можете реєструвати compensation-логіку через try/except.
@workflow.defn
class PurchaseAgent:
@workflow.run
async def run(self, input: "PurchaseInput") -> str:
compensations: list = []
try:
reservation_id = await workflow.execute_activity(
reserve_inventory,
input.sku,
start_to_close_timeout=timedelta(seconds=30),
retry_policy=TOOL_RETRY,
)
compensations.append(("release", reservation_id))
charge_id = await workflow.execute_activity(
charge_card,
input.payment,
start_to_close_timeout=timedelta(seconds=60),
retry_policy=TOOL_RETRY,
)
compensations.append(("refund", charge_id))
shipment = await workflow.execute_activity(
create_shipment,
input.address,
start_to_close_timeout=timedelta(seconds=30),
retry_policy=TOOL_RETRY,
)
return f"shipped:{shipment}"
except Exception:
for kind, handle in reversed(compensations):
if kind == "release":
await workflow.execute_activity(
release_inventory,
handle,
start_to_close_timeout=timedelta(seconds=30),
)
elif kind == "refund":
await workflow.execute_activity(
refund_charge,
handle,
start_to_close_timeout=timedelta(seconds=30),
)
raise
Це єдиний правильний спосіб, який ми знайшли, щоб дати агенту можливість write-and-undo. Реалізувати це вручну в коді застосунку — це стежка сліз.
Обробка недетермінізму
Workflows у Temporal мають бути детермінованими на replay. LLM-виводи не детерміновані. Це нормально, бо ви кладете LLM-виклики в activities, і Temporal персистить їхні результати. На replay SDK повертає persisted output замість того, щоб знову викликати LLM.
Речі, що зламають ваш workflow:
- Виклик
random.random()чиtime.time()напряму — використовуйтеworkflow.random()іworkflow.now(). - Використання потоків чи asyncio-конструкцій, що не детерміновані.
- Імпорт бібліотек із side effects на module load. (Використовуйте
workflow.unsafe.imports_passed_through().) - Мутація глобального стану всередині функції workflow.
Якщо ви дотримуєтесь правил, недетермінізм LLM невидимий. Якщо ні — ви зіткнетесь з NonDeterminismError на replay і муситимете його дебажити.
Тестування з time-skipping
Тестове середовище Temporal дозволяє пропускати час. Workflow, що чекає 24 години на схвалення, можна протестувати за мілісекунди.
import pytest
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker
from app.activities import search_kb, call_llm, create_ticket, send_email
from app.workflows import SupportAgent, SupportInput
@pytest.mark.asyncio
async def test_support_agent_approval() -> None:
async with await WorkflowEnvironment.start_time_skipping() as env:
async with Worker(
env.client,
task_queue="support",
workflows=[SupportAgent],
activities=[search_kb, call_llm, create_ticket, send_email],
):
handle = await env.client.start_workflow(
SupportAgent.run,
SupportInput(
customer_email="[email protected]",
message="I cannot log in",
),
id="test-support-1",
task_queue="support",
)
await handle.signal(SupportAgent.human_decision, "approve")
result = await handle.result()
assert result == "sent"
start_time_skipping() заміняє звичайний годинник, тож wait_condition(timeout=24h) повертається негайно, якщо signal отримано. Тестувати multi-day workflow в unit test — це реальна річ.
Ви все одно захочете замокати LLM-activity — або реєстрацією тестової activity з тим самим підписом, або через activity replacement фічу Temporal.
Observability
Temporal постачається з Web UI, що показує історію кожного workflow: події, входи, виходи, retries, тайминги. Для агентів це золото — ви можете replay-ити точно те, що LLM бачив, і точно те, що він сказав.
Підключіть OpenTelemetry зверху — і ви отримаєте spans у вашому звичайному tracing-бекенді також:
from temporalio.contrib.opentelemetry import TracingInterceptor
from temporalio.client import Client
client = await Client.connect(
"temporal:7233",
interceptors=[TracingInterceptor()],
)
Результуючі traces показують workflow як parent span, а кожну activity — як child, саме те, що ви хочете для дебагу "чому агент зробив це".
Коли не використовувати цей стек
Temporal — це overkill для single-turn LLM-виклику за HTTP-ендпоінтом. Використовуйте Temporal, коли:
- Workflow має кілька кроків із зовнішніми side effects.
- Вам потрібна семантика retry чи компенсації.
- Workflows long-running (хвилини — дні).
- Надійність важливіша за latency.
- Вам потрібен повний audit trail кожної дії.
Пропустіть Temporal, коли:
- Агент — це один LLM-виклик і відповідь.
- Sub-second latency критична, і додавання workflow-hop неприйнятне.
- Ви ще не розумієте обмеження детермінізму — спочатку крок, потім біг.
Наступні кроки
Агенти перейдуть з демо у продакшн у кожній організації протягом наступного року, і команди, що відвантажуватимуть надійно, будуть тими, хто серйозно поставився до orchestration з самого початку. Temporal плюс LangGraph — найпрагматичніша комбінація, яку ми знайшли: LangGraph для міркування і графової структури, Temporal для durability і retries. Почніть з простого workflow, правильно налаштуйте retries і timeouts, а потім додайте компенсацію і human-in-the-loop. Якщо хочете допомоги з проєктуванням агентних систем на Temporal, напишіть нам.