Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .release-please-manifest.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
".": "0.16.2",
"adk": "0.16.2"
".": "0.17.0",
"adk": "0.17.0"
}
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,14 @@

* **tracing:** emit OTel metrics for async span queue depth, batch drain, and SGP export success/failure (HTTP status labels). Disable SDK-side recording with ``AGENTEX_TRACING_METRICS=0``.

## 0.17.0 (2026-07-01)

Full Changelog: [agentex-client-v0.16.2...agentex-client-v0.17.0](https://github.com/scaleapi/scale-agentex-python/compare/agentex-client-v0.16.2...agentex-client-v0.17.0)

### Features

* **temporal:** opt-in continue-as-new for long-lived agent workflows ([#447](https://github.com/scaleapi/scale-agentex-python/issues/447)) ([98cf744](https://github.com/scaleapi/scale-agentex-python/commit/98cf7444002b5f9862f3a922665f016ae6c89af0))

## 0.16.2 (2026-06-29)

Full Changelog: [agentex-client-v0.16.1...agentex-client-v0.16.2](https://github.com/scaleapi/scale-agentex-python/compare/agentex-client-v0.16.1...agentex-client-v0.16.2)
Expand Down
8 changes: 8 additions & 0 deletions adk/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# Changelog

## 0.17.0 (2026-07-01)

Full Changelog: [agentex-sdk-v0.16.2...agentex-sdk-v0.17.0](https://github.com/scaleapi/scale-agentex-python/compare/agentex-sdk-v0.16.2...agentex-sdk-v0.17.0)

### Chores

* **agentex-sdk:** Synchronize agentex versions

## 0.16.2 (2026-06-29)

Full Changelog: [agentex-sdk-v0.15.0...agentex-sdk-v0.16.2](https://github.com/scaleapi/scale-agentex-python/compare/agentex-sdk-v0.15.0...agentex-sdk-v0.16.2)
Expand Down
2 changes: 1 addition & 1 deletion adk/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# (agentex/{__init__.py, _*.py, types/, resources/}) ships from the slim
# sibling package `agentex-client` which is pinned as a runtime dep.
name = "agentex-sdk"
version = "0.16.2"
version = "0.17.0"
description = "Agent Development Kit (ADK) overlay for the Agentex API β€” FastACP server, Temporal workflows, LLM provider integrations, observability"
license = "Apache-2.0"
authors = [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,19 +53,27 @@ async def on_task_event_send(self, params: SendEventParams) -> None:
async def on_task_create(self, params: CreateTaskParams) -> None:
logger.info(f"Received task create params: {params}")

# 1. Acknowledge that the task has been created.
await adk.messages.create(
task_id=params.task.id,
content=TextContent(
author="agent",
content=f"Hello! I've received your task. Normally you can do some state initialization here, or just pass and do nothing until you get your first event. For now I'm just acknowledging that I've received a task with the following params:\n\n{json.dumps(params.params, indent=2)}.\n\nYou should only see this message once, when the task is created. All subsequent events will be handled by the `on_task_event_send` handler.",
),
)
# 1. Acknowledge that the task has been created. Gate this one-time prologue
# on is_continued_run(): run_until_complete below recycles the workflow via
# continue-as-new, which re-enters on_task_create from the top β€” without this
# guard the "you should only see this once" welcome would re-fire on every
# recycle. Original run -> emit; continued (recycled) run -> skip.
if not self.is_continued_run():
await adk.messages.create(
task_id=params.task.id,
content=TextContent(
author="agent",
content=f"Hello! I've received your task. Normally you can do some state initialization here, or just pass and do nothing until you get your first event. For now I'm just acknowledging that I've received a task with the following params:\n\n{json.dumps(params.params, indent=2)}.\n\nYou should only see this message once, when the task is created. All subsequent events will be handled by the `on_task_event_send` handler.",
),
)

# 2. Wait for the task to be completed indefinitely. If we don't do this the workflow will close as soon as this function returns. Temporal can run hundreds of millions of workflows in parallel, so you don't need to worry about too many workflows running at once.

# Thus, if you want this agent to field events indefinitely (or for a long time) you need to wait for a condition to be met.
await workflow.wait_condition(
lambda: self._complete_task,
timeout=None, # Set a timeout if you want to prevent the task from running indefinitely. Generally this is not needed. Temporal can run hundreds of millions of workflows in parallel and more. Only do this if you have a specific reason to do so.
)
# 2. Keep the workflow open to field events. We use run_until_complete
# instead of a bare wait_condition: it still waits indefinitely, but also
# recycles the Temporal event history via continue-as-new before it hits the
# ~50k-event / 50MB limit, so this chat can stay open forever. Adopting
# run_until_complete IS the opt-in β€” agents that keep the old wait_condition
# never recycle. This agent keeps no cross-turn state, so nothing needs
# restoring across a recycle and `params` is the only carry-forward. (Agents
# that DO keep state restore it at the top of @workflow.run on a recycled
# run β€” framework-specific, landing per-integration in follow-up PRs.)
await self.run_until_complete(params, is_complete=lambda: self._complete_task)
Original file line number Diff line number Diff line change
Expand Up @@ -240,42 +240,28 @@ async def stream_messages() -> None:
task_id=task.id,
timeout=90, # Increased timeout for CI environments
):
# A turn emits several messages (user echo, reasoning, agent text),
# each ending in "full" or "done"; consume until the text reply lands.
msg_type = event.get("type")
if msg_type == "full":
task_message_update = StreamTaskMessageFull.model_validate(event)
if task_message_update.parent_task_message and task_message_update.parent_task_message.id:
finished_message = await client.messages.retrieve(task_message_update.parent_task_message.id)
if (
finished_message.content
and finished_message.content.type == "text"
and finished_message.content.author == "user"
):
user_message_found = True
elif (
finished_message.content
and finished_message.content.type == "text"
and finished_message.content.author == "agent"
):
agent_response_found = True
elif finished_message.content and finished_message.content.type == "reasoning":
reasoning_found = True

# Exit early if we have what we need
if user_message_found and agent_response_found:
break

parent_task_message = StreamTaskMessageFull.model_validate(event).parent_task_message
elif msg_type == "done":
task_message_update_done = StreamTaskMessageDone.model_validate(event)
if task_message_update_done.parent_task_message and task_message_update_done.parent_task_message.id:
finished_message = await client.messages.retrieve(task_message_update_done.parent_task_message.id)
if finished_message.content and finished_message.content.type == "reasoning":
reasoning_found = True
elif (
finished_message.content
and finished_message.content.type == "text"
and finished_message.content.author == "agent"
):
agent_response_found = True
parent_task_message = StreamTaskMessageDone.model_validate(event).parent_task_message
else:
continue

if parent_task_message and parent_task_message.id:
finished_message = await client.messages.retrieve(parent_task_message.id)
content = finished_message.content
if content and content.type == "text" and content.author == "user":
user_message_found = True
elif content and content.type == "text" and content.author == "agent":
agent_response_found = True
elif content and content.type == "reasoning":
reasoning_found = True

# Stop once both the user echo and the agent's text reply are seen.
if user_message_found and agent_response_found:
break

stream_task = asyncio.create_task(stream_messages())
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# overlay (formerly `src/agentex/lib/*`) now lives in `adk/` and ships
# as the sibling `agentex-sdk` package β€” see `adk/pyproject.toml`.
name = "agentex-client"
version = "0.16.2"
version = "0.17.0"
description = "The official Python REST client for the Agentex API"
dynamic = ["readme"]
license = "Apache-2.0"
Expand Down
2 changes: 1 addition & 1 deletion src/agentex/_version.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# File generated from our OpenAPI spec by Stainless. See CONTRIBUTING.md for details.

__title__ = "agentex"
__version__ = "0.16.2" # x-release-please-version
__version__ = "0.17.0" # x-release-please-version
2 changes: 1 addition & 1 deletion src/agentex/lib/core/clients/temporal/temporal_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ async def start_workflow(
duplicate_policy: DuplicateWorkflowPolicy = DuplicateWorkflowPolicy.ALLOW_DUPLICATE,
retry_policy: RetryPolicy = DEFAULT_RETRY_POLICY,
task_timeout: timedelta = timedelta(seconds=10),
execution_timeout: timedelta = timedelta(seconds=86400),
execution_timeout: timedelta | None = None,
**kwargs: Any,
) -> str:
temporal_retry_policy = TemporalRetryPolicy(**retry_policy.model_dump(exclude_unset=True))
Expand Down
11 changes: 10 additions & 1 deletion src/agentex/lib/core/temporal/services/temporal_task_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,15 @@ async def submit_task(self, agent: Agent, task: Task, params: dict[str, Any] | N

returns the workflow ID of the temporal workflow
"""
# None / 0 / negative => no execution timeout (workflow can stay open
# indefinitely, which long-lived chat/session agents rely on). A positive
# value bounds the whole continue-as-new chain's wall-clock lifetime.
timeout_seconds = self._env_vars.WORKFLOW_EXECUTION_TIMEOUT_SECONDS
execution_timeout = (
timedelta(seconds=timeout_seconds)
if timeout_seconds and timeout_seconds > 0
else None
)
return await self._temporal_client.start_workflow(
workflow=self._env_vars.WORKFLOW_NAME,
arg=CreateTaskParams(
Expand All @@ -42,7 +51,7 @@ async def submit_task(self, agent: Agent, task: Task, params: dict[str, Any] | N
),
id=task.id,
task_queue=self._env_vars.WORKFLOW_TASK_QUEUE,
execution_timeout=timedelta(seconds=self._env_vars.WORKFLOW_EXECUTION_TIMEOUT_SECONDS),
execution_timeout=execution_timeout,
)

async def get_state(self, task_id: str) -> WorkflowState:
Expand Down
136 changes: 136 additions & 0 deletions src/agentex/lib/core/temporal/workflows/workflow.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
from __future__ import annotations

from abc import ABC, abstractmethod
from typing import Any, Callable
from datetime import timedelta

from temporalio import workflow

Expand All @@ -24,3 +28,135 @@ async def on_task_event_send(self, params: SendEventParams) -> None:
@abstractmethod
async def on_task_create(self, params: CreateTaskParams) -> None:
raise NotImplementedError

# ------------------------------------------------------------------ #
# Continue-as-new lifecycle helpers #
# #
# These let a long-lived chat/session workflow recycle its event #
# history so it can stay open indefinitely without hitting Temporal's #
# ~50k-event / 50MB history limit. They are OPT-IN: an agent gets #
# recycling only by calling `run_until_complete` from its #
# `@workflow.run` instead of the usual indefinite `wait_condition`. #
# The SDK owns the hard Temporal mechanics (recycle decision and #
# draining in-flight handlers before the continue_as_new call). #
# Restoring state after a recycle is the AGENT's job and is #
# framework-specific (rebuild from `adk.messages`, an `adk.state` #
# snapshot, or a framework's own memory); that lands per-integration #
# in follow-up PRs. The 000_hello_acp example shows the minimal #
# stateless adoption that needs no restoration. #
# ------------------------------------------------------------------ #

def should_continue_as_new(self) -> bool:
"""Whether this run should recycle its event history via continue-as-new.

True when Temporal suggests it: ``is_continue_as_new_suggested()`` fires as
the event history approaches the server's size/count limit, so we let
Temporal own the threshold rather than configuring one ourselves.

This reads only a deterministic ``workflow.info()`` value and emits no
commands, so it is safe to use directly as a ``workflow.wait_condition``
predicate, e.g.::

await workflow.wait_condition(
lambda: self._complete_task or self.should_continue_as_new()
)
"""
return workflow.info().is_continue_as_new_suggested()

async def drain_and_continue_as_new(
self,
*args: Any,
is_complete: Callable[[], bool] | None = None,
) -> None:
"""Drain in-flight signal handlers, then continue-as-new.

Call this from the agent's ``@workflow.run`` once the run loop wakes for a
recycle (see :meth:`should_continue_as_new`). ``args`` are forwarded
verbatim to ``workflow.continue_as_new`` and become the new run's input, so
pass whatever your ``@workflow.run`` signature expects β€” typically the
original ``CreateTaskParams`` (the new run keeps the same workflow id / task
id and re-hydrates its state from ``adk.state``).

IMPORTANT: keep your data OUTSIDE workflow state BEFORE calling this β€”
messages in ``adk.messages`` and any other state in ``adk.state``.
In-workflow attributes do NOT survive the recycle; only the forwarded
``args`` do.

Waits on ``all_handlers_finished`` first so an in-flight turn (a signal
handler still running an activity) is never lost or duplicated across the
recycle boundary. ``workflow.continue_as_new`` raises to end the run, so
this never returns normally β€” EXCEPT when ``is_complete`` is given and
returns True after draining: a completion signal can arrive while we wait
for the drain, and the recycled run would start fresh (losing that
completion), so in that case we return without recycling and let the caller
finish.
"""
# Don't recycle until any signal handler still running has finished, so a
# message mid-flight at the boundary is carried into the next run intact.
await workflow.wait_condition(workflow.all_handlers_finished)
# A completion signal may have landed during the drain β€” re-check before
# recycling so a workflow that should finish isn't kept open by the recycle.
if is_complete is not None and is_complete():
return
logger.info(
"Recycling workflow via continue-as-new "
f"(history_length={workflow.info().get_current_history_length()}, "
f"run_id={workflow.info().run_id})"
)
workflow.continue_as_new(*args)

async def run_until_complete(
self,
*continue_as_new_args: Any,
is_complete: Callable[[], bool],
timeout: timedelta | None = None,
) -> None:
"""Keep the workflow open to field events, recycling history as needed.

Drop-in replacement for the usual ``await workflow.wait_condition(
lambda: self._complete_task, timeout=None)`` at the end of an agent's
``@workflow.run``. ``is_complete`` is a no-arg predicate (typically
``lambda: self._complete_task``); ``continue_as_new_args`` are forwarded to
continue-as-new on recycle (typically the original ``CreateTaskParams``).

Adopting this method IS the opt-in to recycling β€” there is no flag. An agent
that keeps the old indefinite ``wait_condition`` never recycles.

``timeout`` is an optional cap on how long to wait with no progress; it
defaults to None = wait indefinitely (the usual case β€” Temporal can keep huge
numbers of idle workflows open). The broader workflow-level lifetime cap is
the execution timeout (``WORKFLOW_EXECUTION_TIMEOUT_SECONDS``, also infinite
by default). On ``timeout`` expiry ``wait_condition`` raises
``asyncio.TimeoutError`` like before.

Comment thread
greptile-apps[bot] marked this conversation as resolved.
Persist anything you need across a recycle OUTSIDE workflow state first β€”
messages in ``adk.messages``, other state in ``adk.state`` β€” and rebuild it
at the top of ``@workflow.run``.
"""
while True:
await workflow.wait_condition(
lambda: is_complete() or self.should_continue_as_new(),
timeout=timeout,
)
if is_complete():
return
# Drains in-flight handlers, then continue-as-new (raises; never
# returns) β€” UNLESS a completion signal arrived during the drain, in
# which case it returns here and the next loop iteration completes.
await self.drain_and_continue_as_new(
*continue_as_new_args, is_complete=is_complete
)
if is_complete():
return

def is_continued_run(self) -> bool:
"""Whether this run was produced by a continue-as-new from a prior run.

True only on a recycled run (``workflow.info().continued_run_id`` is set),
False on the original run a client created. Use it in ``@workflow.run`` to
gate one-time prologue work that must NOT repeat on every recycle β€” e.g. a
welcome message, or rehydrating state only when there's something to restore.
The recycled run re-enters ``@workflow.run`` from the top, so anything not
gated here runs again on each history rollover.
"""
return workflow.info().continued_run_id is not None
Loading
Loading