diff --git a/README.md b/README.md index 4d7091bc..f8ed460f 100644 --- a/README.md +++ b/README.md @@ -97,6 +97,7 @@ without wrapping them in a workflow. * [worker_specific_task_queues](worker_specific_task_queues) - Use unique task queues to ensure activities run on specific workers. * [worker_versioning](worker_versioning) - Use the Worker Versioning feature to more easily version your workflows & other code. * [worker_multiprocessing](worker_multiprocessing) - Leverage Python multiprocessing to parallelize workflow tasks and other CPU bound operations by running multiple workers. +* [workflow_pause](workflow_pause/) - Demonstrate the experimental Workflow Pause feature: pause/unpause, signals, queries, updates, activities, and cancel/terminate. ## Test diff --git a/tests/workflow_pause/__init__.py b/tests/workflow_pause/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/workflow_pause/activities_test.py b/tests/workflow_pause/activities_test.py new file mode 100644 index 00000000..5b923fbf --- /dev/null +++ b/tests/workflow_pause/activities_test.py @@ -0,0 +1,27 @@ +import uuid + +from temporalio.client import Client +from temporalio.testing import WorkflowEnvironment +from temporalio.worker import Worker + +from workflow_pause.activities import TASK_QUEUE +from workflow_pause.activities.activities import process_item +from workflow_pause.activities.workflow import ActivityPauseWorkflow + + +async def test_activity_workflow_retries_then_succeeds( + client: Client, env: WorkflowEnvironment +): + async with Worker( + client, + task_queue=TASK_QUEUE, + workflows=[ActivityPauseWorkflow], + activities=[process_item], + ): + result = await client.execute_workflow( + ActivityPauseWorkflow.run, + "widget", + id=f"activities-{uuid.uuid4()}", + task_queue=TASK_QUEUE, + ) + assert result == "processed widget" diff --git a/tests/workflow_pause/basic_test.py b/tests/workflow_pause/basic_test.py new file mode 100644 index 00000000..98ea9320 --- /dev/null +++ b/tests/workflow_pause/basic_test.py @@ -0,0 +1,19 @@ +import uuid + +from temporalio.client import Client +from temporalio.testing import WorkflowEnvironment +from temporalio.worker import Worker + +from workflow_pause.basic import TASK_QUEUE +from workflow_pause.basic.workflow import BasicPauseWorkflow + + +async def test_basic_workflow_completes(client: Client, env: WorkflowEnvironment): + async with Worker(client, task_queue=TASK_QUEUE, workflows=[BasicPauseWorkflow]): + result = await client.execute_workflow( + BasicPauseWorkflow.run, + 3, + id=f"basic-{uuid.uuid4()}", + task_queue=TASK_QUEUE, + ) + assert result == 3 diff --git a/tests/workflow_pause/cancel_terminate_test.py b/tests/workflow_pause/cancel_terminate_test.py new file mode 100644 index 00000000..6dc9ec86 --- /dev/null +++ b/tests/workflow_pause/cancel_terminate_test.py @@ -0,0 +1,29 @@ +import asyncio +import uuid + +import pytest +from temporalio.client import Client, WorkflowFailureError +from temporalio.exceptions import CancelledError +from temporalio.testing import WorkflowEnvironment +from temporalio.worker import Worker + +from workflow_pause.cancel_terminate import TASK_QUEUE +from workflow_pause.cancel_terminate.workflow import CancelTerminatePauseWorkflow + + +async def test_cancellation_runs_cleanup(client: Client, env: WorkflowEnvironment): + async with Worker( + client, task_queue=TASK_QUEUE, workflows=[CancelTerminatePauseWorkflow] + ): + handle = await client.start_workflow( + CancelTerminatePauseWorkflow.run, + 20, + id=f"cancel-terminate-{uuid.uuid4()}", + task_queue=TASK_QUEUE, + ) + # Let the workflow start its loop, then cancel it. + await asyncio.sleep(1) + await handle.cancel() + with pytest.raises(WorkflowFailureError) as exc_info: + await handle.result() + assert isinstance(exc_info.value.cause, CancelledError) diff --git a/tests/workflow_pause/queries_test.py b/tests/workflow_pause/queries_test.py new file mode 100644 index 00000000..27efccb2 --- /dev/null +++ b/tests/workflow_pause/queries_test.py @@ -0,0 +1,19 @@ +import uuid + +from temporalio.client import Client +from temporalio.testing import WorkflowEnvironment +from temporalio.worker import Worker + +from workflow_pause.queries import TASK_QUEUE +from workflow_pause.queries.workflow import QueryPauseWorkflow + + +async def test_query_returns_count(client: Client, env: WorkflowEnvironment): + async with Worker(client, task_queue=TASK_QUEUE, workflows=[QueryPauseWorkflow]): + result = await client.execute_workflow( + QueryPauseWorkflow.run, + 2, + id=f"queries-{uuid.uuid4()}", + task_queue=TASK_QUEUE, + ) + assert result == 2 diff --git a/tests/workflow_pause/signals_test.py b/tests/workflow_pause/signals_test.py new file mode 100644 index 00000000..424e5144 --- /dev/null +++ b/tests/workflow_pause/signals_test.py @@ -0,0 +1,22 @@ +import uuid + +from temporalio.client import Client +from temporalio.testing import WorkflowEnvironment +from temporalio.worker import Worker + +from workflow_pause.signals import TASK_QUEUE +from workflow_pause.signals.workflow import SignalPauseWorkflow + + +async def test_signals_collected_then_done(client: Client, env: WorkflowEnvironment): + async with Worker(client, task_queue=TASK_QUEUE, workflows=[SignalPauseWorkflow]): + handle = await client.start_workflow( + SignalPauseWorkflow.run, + id=f"signals-{uuid.uuid4()}", + task_queue=TASK_QUEUE, + ) + await handle.signal(SignalPauseWorkflow.add_message, "hello") + await handle.signal(SignalPauseWorkflow.add_message, "world") + await handle.signal(SignalPauseWorkflow.add_message, "done") + result = await handle.result() + assert result == ["hello", "world"] diff --git a/tests/workflow_pause/updates_test.py b/tests/workflow_pause/updates_test.py new file mode 100644 index 00000000..510c2baf --- /dev/null +++ b/tests/workflow_pause/updates_test.py @@ -0,0 +1,23 @@ +import uuid + +from temporalio.client import Client +from temporalio.testing import WorkflowEnvironment +from temporalio.worker import Worker + +from workflow_pause.updates import TASK_QUEUE +from workflow_pause.updates.workflow import UpdatePauseWorkflow + + +async def test_update_accumulates_then_finishes( + client: Client, env: WorkflowEnvironment +): + async with Worker(client, task_queue=TASK_QUEUE, workflows=[UpdatePauseWorkflow]): + handle = await client.start_workflow( + UpdatePauseWorkflow.run, + id=f"updates-{uuid.uuid4()}", + task_queue=TASK_QUEUE, + ) + assert await handle.execute_update(UpdatePauseWorkflow.add, 5) == 5 + assert await handle.execute_update(UpdatePauseWorkflow.add, 3) == 8 + await handle.execute_update(UpdatePauseWorkflow.finish) + assert await handle.result() == 8 diff --git a/workflow_pause/README.md b/workflow_pause/README.md new file mode 100644 index 00000000..b0e6ce47 --- /dev/null +++ b/workflow_pause/README.md @@ -0,0 +1,30 @@ +# Workflow Pause + +These samples demonstrate the experimental **Workflow Pause** feature. Pausing a Workflow Execution +tells the Temporal Service to stop dispatching workflow tasks for it — the workflow makes no forward +progress (timers don't advance, signals buffer, queries and updates are rejected) until it is +**unpaused**. See the [Temporal CLI docs](https://docs.temporal.io/cli/workflow#pause). + +## Prerequisites + +First see the main [README.md](../README.md) for general prerequisites. Then note: + +- Requires **Temporal Server 1.31.0+ / CLI 1.7.1+**. The feature is experimental. +- **Pause must be enabled server-side.** Start your dev server with the pause dynamic-config flag: + + ```bash + temporal server start-dev --dynamic-config-value frontend.WorkflowPauseEnabled=true + ``` + + Without it, `temporal workflow pause` returns + `Error: workflow pause is not enabled for namespace: default`. + +## Samples + +* [basic](basic/) — Dead-simple pause / unpause of a workflow waiting on a timer. +* [activities](activities/) — How pause interacts with in-flight activities, plus activity-level + pause (`temporal activity pause`) to halt retries. +* [signals](signals/) — Signals sent while paused are buffered and processed on unpause. +* [queries](queries/) — Queries are rejected while paused. +* [updates](updates/) — Updates are rejected while paused. +* [cancel_terminate](cancel_terminate/) — Terminate is immediate; cancel is deferred until unpause. diff --git a/workflow_pause/__init__.py b/workflow_pause/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/workflow_pause/activities/README.md b/workflow_pause/activities/README.md new file mode 100644 index 00000000..3675ef39 --- /dev/null +++ b/workflow_pause/activities/README.md @@ -0,0 +1,54 @@ +# Workflow Pause: in-flight activities & activity-level pause + +Demonstrates how pause interacts with activities. The workflow runs a single long-running activity +(`process-item`) that heartbeats for ~5 seconds and is configured to fail its first two attempts +before succeeding, so you can observe both kinds of pause. + +> Workflow Pause is experimental. The dev server must be started with the pause flag enabled — +> see the [workflow_pause README](../README.md) for prerequisites. First see the main +> [README.md](../../README.md) for general prerequisites. + +Run the worker in one terminal: + +```bash +uv run workflow_pause/activities/worker.py +``` + +Start the workflow in a second terminal: + +```bash +uv run workflow_pause/activities/starter.py +``` + +## Demo A — pause the workflow while the activity is in flight + +While the worker log shows the activity processing (attempt 1), pause the workflow: + +```bash +temporal workflow pause -w pause-activities-wf --reason demo +``` + +The currently running activity attempt is **not** killed — it runs to its conclusion — but because +the workflow is paused, the next workflow task is not dispatched, so the workflow does not advance. +Unpause to let it continue: + +```bash +temporal workflow unpause -w pause-activities-wf +``` + +## Demo B — pause the activity (halt retries) + +The activity fails its first two attempts, so it enters a retry backoff. Pause the **activity** so +its retries stop: + +```bash +temporal activity pause --activity-id process-item -w pause-activities-wf +``` + +The activity will not be retried while paused. Resume retries with: + +```bash +temporal activity unpause --activity-id process-item -w pause-activities-wf +``` + +On the third attempt the activity succeeds and the workflow completes with `processed widget`. diff --git a/workflow_pause/activities/__init__.py b/workflow_pause/activities/__init__.py new file mode 100644 index 00000000..9df7361c --- /dev/null +++ b/workflow_pause/activities/__init__.py @@ -0,0 +1,3 @@ +TASK_QUEUE = "workflow-pause-activities-task-queue" +WORKFLOW_ID = "pause-activities-wf" +ACTIVITY_ID = "process-item" diff --git a/workflow_pause/activities/activities.py b/workflow_pause/activities/activities.py new file mode 100644 index 00000000..40b836af --- /dev/null +++ b/workflow_pause/activities/activities.py @@ -0,0 +1,24 @@ +import asyncio + +from temporalio import activity + + +@activity.defn +async def process_item(item: str) -> str: + """Long-running activity that heartbeats, and fails its first two attempts. + + The heartbeats + sleep make the activity observably "in flight" so you can + pause the workflow while it runs. The deliberate failures on the first two + attempts let you demonstrate `temporal activity pause`, which halts retries. + """ + info = activity.info() + activity.logger.info("Processing %s (attempt %d)", item, info.attempt) + + for _ in range(5): + await asyncio.sleep(1) + activity.heartbeat() + + if info.attempt < 3: + raise RuntimeError(f"transient failure on attempt {info.attempt}") + + return f"processed {item}" diff --git a/workflow_pause/activities/starter.py b/workflow_pause/activities/starter.py new file mode 100644 index 00000000..a59b7691 --- /dev/null +++ b/workflow_pause/activities/starter.py @@ -0,0 +1,37 @@ +import asyncio +import logging + +from temporalio.client import Client +from temporalio.common import WorkflowIDReusePolicy +from temporalio.envconfig import ClientConfig + +from workflow_pause.activities import ACTIVITY_ID, TASK_QUEUE, WORKFLOW_ID +from workflow_pause.activities.workflow import ActivityPauseWorkflow + + +async def main(): + logging.basicConfig(level=logging.INFO) + + config = ClientConfig.load_client_connect_config() + config.setdefault("target_host", "localhost:7233") + client = await Client.connect(**config) + + handle = await client.start_workflow( + ActivityPauseWorkflow.run, + "widget", + id=WORKFLOW_ID, + task_queue=TASK_QUEUE, + id_reuse_policy=WorkflowIDReusePolicy.TERMINATE_IF_RUNNING, + ) + print(f"Started workflow with ID: {handle.id}") + print( + f"Pause the workflow: temporal workflow pause -w {WORKFLOW_ID} --reason demo" + ) + print( + f"Pause the activity: temporal activity pause " + f"--activity-id {ACTIVITY_ID} -w {WORKFLOW_ID}" + ) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/workflow_pause/activities/worker.py b/workflow_pause/activities/worker.py new file mode 100644 index 00000000..b837de07 --- /dev/null +++ b/workflow_pause/activities/worker.py @@ -0,0 +1,39 @@ +import asyncio +import logging + +from temporalio.client import Client +from temporalio.envconfig import ClientConfig +from temporalio.worker import Worker + +from workflow_pause.activities import TASK_QUEUE +from workflow_pause.activities.activities import process_item +from workflow_pause.activities.workflow import ActivityPauseWorkflow + +interrupt_event = asyncio.Event() + + +async def main(): + logging.basicConfig(level=logging.INFO) + + config = ClientConfig.load_client_connect_config() + config.setdefault("target_host", "localhost:7233") + client = await Client.connect(**config) + + async with Worker( + client, + task_queue=TASK_QUEUE, + workflows=[ActivityPauseWorkflow], + activities=[process_item], + ): + logging.info("Worker started, ctrl+c to exit") + await interrupt_event.wait() + logging.info("Shutting down") + + +if __name__ == "__main__": + loop = asyncio.new_event_loop() + try: + loop.run_until_complete(main()) + except KeyboardInterrupt: + interrupt_event.set() + loop.run_until_complete(loop.shutdown_asyncgens()) diff --git a/workflow_pause/activities/workflow.py b/workflow_pause/activities/workflow.py new file mode 100644 index 00000000..ef06dab9 --- /dev/null +++ b/workflow_pause/activities/workflow.py @@ -0,0 +1,37 @@ +from datetime import timedelta + +from temporalio import workflow +from temporalio.common import RetryPolicy + +from workflow_pause.activities import ACTIVITY_ID + +with workflow.unsafe.imports_passed_through(): + from workflow_pause.activities.activities import process_item + + +@workflow.defn +class ActivityPauseWorkflow: + """Runs a single long-running, retrying activity. + + Two things to demonstrate: + 1. Pausing the *workflow* while the activity is in flight: the running + activity attempt is not killed, but once it finishes the next workflow + task is not dispatched, so the workflow does not advance until unpause. + 2. Pausing the *activity* with `temporal activity pause`: retries stop + after the current attempt ends, and resume on `temporal activity unpause`. + """ + + @workflow.run + async def run(self, item: str) -> str: + return await workflow.execute_activity( + process_item, + item, + activity_id=ACTIVITY_ID, + start_to_close_timeout=timedelta(seconds=30), + heartbeat_timeout=timedelta(seconds=10), + retry_policy=RetryPolicy( + initial_interval=timedelta(seconds=3), + backoff_coefficient=2.0, + maximum_attempts=10, + ), + ) diff --git a/workflow_pause/basic/README.md b/workflow_pause/basic/README.md new file mode 100644 index 00000000..ec156460 --- /dev/null +++ b/workflow_pause/basic/README.md @@ -0,0 +1,41 @@ +# Workflow Pause: basic pause / unpause + +The simplest demonstration of [Workflow Pause](https://docs.temporal.io/cli/workflow#pause). +The workflow loops, sleeping on a timer each iteration. While it is **paused** the timer does +not advance and the iteration count stops; **unpausing** resumes it. + +> Workflow Pause is experimental. The dev server must be started with the pause flag enabled — +> see the [workflow_pause README](../README.md) for prerequisites. First see the main +> [README.md](../../README.md) for general prerequisites. + +Run the worker in one terminal: + +```bash +uv run workflow_pause/basic/worker.py +``` + +Start the workflow in a second terminal: + +```bash +uv run workflow_pause/basic/starter.py +``` + +Watch the worker terminal log "Completed iteration N". Now pause it: + +```bash +temporal workflow pause -w pause-basic-wf --reason demo +``` + +The iterations stop — no new "Completed iteration" lines appear. Confirm it is paused: + +```bash +temporal workflow describe -w pause-basic-wf +``` + +The output shows a `Pause Info` section. Now unpause it: + +```bash +temporal workflow unpause -w pause-basic-wf +``` + +The worker resumes logging iterations and the workflow runs to completion. diff --git a/workflow_pause/basic/__init__.py b/workflow_pause/basic/__init__.py new file mode 100644 index 00000000..e381a2e6 --- /dev/null +++ b/workflow_pause/basic/__init__.py @@ -0,0 +1,2 @@ +TASK_QUEUE = "workflow-pause-basic-task-queue" +WORKFLOW_ID = "pause-basic-wf" diff --git a/workflow_pause/basic/starter.py b/workflow_pause/basic/starter.py new file mode 100644 index 00000000..273ab7f6 --- /dev/null +++ b/workflow_pause/basic/starter.py @@ -0,0 +1,32 @@ +import asyncio +import logging + +from temporalio.client import Client +from temporalio.common import WorkflowIDReusePolicy +from temporalio.envconfig import ClientConfig + +from workflow_pause.basic import TASK_QUEUE, WORKFLOW_ID +from workflow_pause.basic.workflow import BasicPauseWorkflow + + +async def main(): + logging.basicConfig(level=logging.INFO) + + config = ClientConfig.load_client_connect_config() + config.setdefault("target_host", "localhost:7233") + client = await Client.connect(**config) + + handle = await client.start_workflow( + BasicPauseWorkflow.run, + 20, + id=WORKFLOW_ID, + task_queue=TASK_QUEUE, + id_reuse_policy=WorkflowIDReusePolicy.TERMINATE_IF_RUNNING, + ) + print(f"Started workflow with ID: {handle.id}") + print(f"Pause it with: temporal workflow pause -w {WORKFLOW_ID} --reason demo") + print(f"Unpause it with: temporal workflow unpause -w {WORKFLOW_ID}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/workflow_pause/basic/worker.py b/workflow_pause/basic/worker.py new file mode 100644 index 00000000..92677189 --- /dev/null +++ b/workflow_pause/basic/worker.py @@ -0,0 +1,33 @@ +import asyncio +import logging + +from temporalio.client import Client +from temporalio.envconfig import ClientConfig +from temporalio.worker import Worker + +from workflow_pause.basic import TASK_QUEUE +from workflow_pause.basic.workflow import BasicPauseWorkflow + +interrupt_event = asyncio.Event() + + +async def main(): + logging.basicConfig(level=logging.INFO) + + config = ClientConfig.load_client_connect_config() + config.setdefault("target_host", "localhost:7233") + client = await Client.connect(**config) + + async with Worker(client, task_queue=TASK_QUEUE, workflows=[BasicPauseWorkflow]): + logging.info("Worker started, ctrl+c to exit") + await interrupt_event.wait() + logging.info("Shutting down") + + +if __name__ == "__main__": + loop = asyncio.new_event_loop() + try: + loop.run_until_complete(main()) + except KeyboardInterrupt: + interrupt_event.set() + loop.run_until_complete(loop.shutdown_asyncgens()) diff --git a/workflow_pause/basic/workflow.py b/workflow_pause/basic/workflow.py new file mode 100644 index 00000000..a45b2cd7 --- /dev/null +++ b/workflow_pause/basic/workflow.py @@ -0,0 +1,29 @@ +from datetime import timedelta + +from temporalio import workflow + + +@workflow.defn +class BasicPauseWorkflow: + """A loop that logs progress and sleeps on a timer each iteration. + + While the workflow is paused, no workflow tasks are dispatched, so the + timer does not advance and the iteration count stops moving. Unpausing + lets it resume from where it left off. + """ + + def __init__(self) -> None: + self._completed = 0 + + @workflow.run + async def run(self, iterations: int) -> int: + for i in range(iterations): + workflow.logger.info("Starting iteration %d of %d", i + 1, iterations) + await workflow.sleep(timedelta(seconds=3)) + self._completed += 1 + workflow.logger.info("Completed iteration %d of %d", i + 1, iterations) + return self._completed + + @workflow.query + def completed(self) -> int: + return self._completed diff --git a/workflow_pause/cancel_terminate/README.md b/workflow_pause/cancel_terminate/README.md new file mode 100644 index 00000000..a59d6e3e --- /dev/null +++ b/workflow_pause/cancel_terminate/README.md @@ -0,0 +1,62 @@ +# Workflow Pause: cancel vs terminate on a paused workflow + +Shows how cancel and terminate differ when a workflow is paused: + +- **Terminate** takes effect immediately, even while paused. +- **Cancel** records a `WorkflowExecutionCancelRequested` event, but the workflow stays `Paused` and + its cancellation handling does not run until you **unpause** it. + +> Workflow Pause is experimental. The dev server must be started with the pause flag enabled — +> see the [workflow_pause README](../README.md) for prerequisites. First see the main +> [README.md](../../README.md) for general prerequisites. + +Run the worker in one terminal: + +```bash +uv run workflow_pause/cancel_terminate/worker.py +``` + +## Terminate a paused workflow (immediate) + +Start the workflow in a second terminal: + +```bash +uv run workflow_pause/cancel_terminate/starter.py +``` + +Pause then terminate it — it ends right away: + +```bash +temporal workflow pause -w pause-cancel-terminate-wf --reason demo +temporal workflow terminate -w pause-cancel-terminate-wf +temporal workflow describe -w pause-cancel-terminate-wf # Status: Terminated +``` + +## Cancel a paused workflow (deferred until unpause) + +Start a fresh run (re-running the starter terminates the previous one): + +```bash +uv run workflow_pause/cancel_terminate/starter.py +``` + +Pause it, then request cancellation: + +```bash +temporal workflow pause -w pause-cancel-terminate-wf --reason demo +temporal workflow cancel -w pause-cancel-terminate-wf +``` + +Describe it — the cancel is recorded but the status is still `Paused`: + +```bash +temporal workflow describe -w pause-cancel-terminate-wf # Status: Paused +``` + +Unpause it — now the workflow processes the cancellation (the worker logs +"Cancellation received — running cleanup") and ends as `Canceled`: + +```bash +temporal workflow unpause -w pause-cancel-terminate-wf +temporal workflow describe -w pause-cancel-terminate-wf # Status: Canceled +``` diff --git a/workflow_pause/cancel_terminate/__init__.py b/workflow_pause/cancel_terminate/__init__.py new file mode 100644 index 00000000..836e225f --- /dev/null +++ b/workflow_pause/cancel_terminate/__init__.py @@ -0,0 +1,2 @@ +TASK_QUEUE = "workflow-pause-cancel-terminate-task-queue" +WORKFLOW_ID = "pause-cancel-terminate-wf" diff --git a/workflow_pause/cancel_terminate/starter.py b/workflow_pause/cancel_terminate/starter.py new file mode 100644 index 00000000..93593af9 --- /dev/null +++ b/workflow_pause/cancel_terminate/starter.py @@ -0,0 +1,33 @@ +import asyncio +import logging + +from temporalio.client import Client +from temporalio.common import WorkflowIDReusePolicy +from temporalio.envconfig import ClientConfig + +from workflow_pause.cancel_terminate import TASK_QUEUE, WORKFLOW_ID +from workflow_pause.cancel_terminate.workflow import CancelTerminatePauseWorkflow + + +async def main(): + logging.basicConfig(level=logging.INFO) + + config = ClientConfig.load_client_connect_config() + config.setdefault("target_host", "localhost:7233") + client = await Client.connect(**config) + + handle = await client.start_workflow( + CancelTerminatePauseWorkflow.run, + 20, + id=WORKFLOW_ID, + task_queue=TASK_QUEUE, + id_reuse_policy=WorkflowIDReusePolicy.TERMINATE_IF_RUNNING, + ) + print(f"Started workflow with ID: {handle.id}") + print(f"Pause it with: temporal workflow pause -w {WORKFLOW_ID} --reason demo") + print(f"Terminate it with: temporal workflow terminate -w {WORKFLOW_ID}") + print(f"Cancel it with: temporal workflow cancel -w {WORKFLOW_ID}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/workflow_pause/cancel_terminate/worker.py b/workflow_pause/cancel_terminate/worker.py new file mode 100644 index 00000000..fcc79813 --- /dev/null +++ b/workflow_pause/cancel_terminate/worker.py @@ -0,0 +1,35 @@ +import asyncio +import logging + +from temporalio.client import Client +from temporalio.envconfig import ClientConfig +from temporalio.worker import Worker + +from workflow_pause.cancel_terminate import TASK_QUEUE +from workflow_pause.cancel_terminate.workflow import CancelTerminatePauseWorkflow + +interrupt_event = asyncio.Event() + + +async def main(): + logging.basicConfig(level=logging.INFO) + + config = ClientConfig.load_client_connect_config() + config.setdefault("target_host", "localhost:7233") + client = await Client.connect(**config) + + async with Worker( + client, task_queue=TASK_QUEUE, workflows=[CancelTerminatePauseWorkflow] + ): + logging.info("Worker started, ctrl+c to exit") + await interrupt_event.wait() + logging.info("Shutting down") + + +if __name__ == "__main__": + loop = asyncio.new_event_loop() + try: + loop.run_until_complete(main()) + except KeyboardInterrupt: + interrupt_event.set() + loop.run_until_complete(loop.shutdown_asyncgens()) diff --git a/workflow_pause/cancel_terminate/workflow.py b/workflow_pause/cancel_terminate/workflow.py new file mode 100644 index 00000000..9c5d258c --- /dev/null +++ b/workflow_pause/cancel_terminate/workflow.py @@ -0,0 +1,27 @@ +import asyncio +from datetime import timedelta + +from temporalio import workflow + + +@workflow.defn +class CancelTerminatePauseWorkflow: + """A loop that runs cleanup logic when cancelled. + + On a PAUSED workflow: + - `temporal workflow terminate` takes effect immediately. + - `temporal workflow cancel` records a WorkflowExecutionCancelRequested + event, but the workflow stays Paused and its cancellation handling (the + cleanup below) does not run until the workflow is unpaused. + """ + + @workflow.run + async def run(self, iterations: int) -> str: + try: + for i in range(iterations): + workflow.logger.info("Working, iteration %d", i + 1) + await workflow.sleep(timedelta(seconds=3)) + return "completed" + except asyncio.CancelledError: + workflow.logger.info("Cancellation received — running cleanup") + raise diff --git a/workflow_pause/queries/README.md b/workflow_pause/queries/README.md new file mode 100644 index 00000000..0e6b1f6d --- /dev/null +++ b/workflow_pause/queries/README.md @@ -0,0 +1,46 @@ +# Workflow Pause: queries are rejected while paused + +Queries succeed against a running workflow, but are **rejected** while the workflow is paused. +Unpausing makes it queryable again. + +> Workflow Pause is experimental. The dev server must be started with the pause flag enabled — +> see the [workflow_pause README](../README.md) for prerequisites. First see the main +> [README.md](../../README.md) for general prerequisites. + +Run the worker in one terminal: + +```bash +uv run workflow_pause/queries/worker.py +``` + +Start the workflow in a second terminal: + +```bash +uv run workflow_pause/queries/starter.py +``` + +Query it while running — this returns the current count: + +```bash +temporal workflow query -w pause-queries-wf --type current_count +``` + +Pause it: + +```bash +temporal workflow pause -w pause-queries-wf --reason demo +``` + +Query again — it is now **rejected**: + +```bash +temporal workflow query -w pause-queries-wf --type current_count +# Error: query was rejected, workflow has status: Paused +``` + +Unpause it and the query works again: + +```bash +temporal workflow unpause -w pause-queries-wf +temporal workflow query -w pause-queries-wf --type current_count +``` diff --git a/workflow_pause/queries/__init__.py b/workflow_pause/queries/__init__.py new file mode 100644 index 00000000..4d010f20 --- /dev/null +++ b/workflow_pause/queries/__init__.py @@ -0,0 +1,2 @@ +TASK_QUEUE = "workflow-pause-queries-task-queue" +WORKFLOW_ID = "pause-queries-wf" diff --git a/workflow_pause/queries/starter.py b/workflow_pause/queries/starter.py new file mode 100644 index 00000000..0a757079 --- /dev/null +++ b/workflow_pause/queries/starter.py @@ -0,0 +1,34 @@ +import asyncio +import logging + +from temporalio.client import Client +from temporalio.common import WorkflowIDReusePolicy +from temporalio.envconfig import ClientConfig + +from workflow_pause.queries import TASK_QUEUE, WORKFLOW_ID +from workflow_pause.queries.workflow import QueryPauseWorkflow + + +async def main(): + logging.basicConfig(level=logging.INFO) + + config = ClientConfig.load_client_connect_config() + config.setdefault("target_host", "localhost:7233") + client = await Client.connect(**config) + + handle = await client.start_workflow( + QueryPauseWorkflow.run, + 20, + id=WORKFLOW_ID, + task_queue=TASK_QUEUE, + id_reuse_policy=WorkflowIDReusePolicy.TERMINATE_IF_RUNNING, + ) + print(f"Started workflow with ID: {handle.id}") + print( + f"Query it with: temporal workflow query -w {WORKFLOW_ID} --type current_count" + ) + print(f"Pause it with: temporal workflow pause -w {WORKFLOW_ID} --reason demo") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/workflow_pause/queries/worker.py b/workflow_pause/queries/worker.py new file mode 100644 index 00000000..918ffc1d --- /dev/null +++ b/workflow_pause/queries/worker.py @@ -0,0 +1,33 @@ +import asyncio +import logging + +from temporalio.client import Client +from temporalio.envconfig import ClientConfig +from temporalio.worker import Worker + +from workflow_pause.queries import TASK_QUEUE +from workflow_pause.queries.workflow import QueryPauseWorkflow + +interrupt_event = asyncio.Event() + + +async def main(): + logging.basicConfig(level=logging.INFO) + + config = ClientConfig.load_client_connect_config() + config.setdefault("target_host", "localhost:7233") + client = await Client.connect(**config) + + async with Worker(client, task_queue=TASK_QUEUE, workflows=[QueryPauseWorkflow]): + logging.info("Worker started, ctrl+c to exit") + await interrupt_event.wait() + logging.info("Shutting down") + + +if __name__ == "__main__": + loop = asyncio.new_event_loop() + try: + loop.run_until_complete(main()) + except KeyboardInterrupt: + interrupt_event.set() + loop.run_until_complete(loop.shutdown_asyncgens()) diff --git a/workflow_pause/queries/workflow.py b/workflow_pause/queries/workflow.py new file mode 100644 index 00000000..cf3ec05f --- /dev/null +++ b/workflow_pause/queries/workflow.py @@ -0,0 +1,27 @@ +from datetime import timedelta + +from temporalio import workflow + + +@workflow.defn +class QueryPauseWorkflow: + """A loop exposing its progress via a query. + + Queries succeed while the workflow is running, but are REJECTED while it is + paused with: `query was rejected, workflow has status: Paused`. Unpausing + makes the workflow queryable again. + """ + + def __init__(self) -> None: + self._count = 0 + + @workflow.run + async def run(self, iterations: int) -> int: + for _ in range(iterations): + await workflow.sleep(timedelta(seconds=3)) + self._count += 1 + return self._count + + @workflow.query + def current_count(self) -> int: + return self._count diff --git a/workflow_pause/signals/README.md b/workflow_pause/signals/README.md new file mode 100644 index 00000000..dfe68c7d --- /dev/null +++ b/workflow_pause/signals/README.md @@ -0,0 +1,54 @@ +# Workflow Pause: signals are buffered while paused + +A signal sent to a **paused** workflow is accepted and recorded in history, but its handler does not +run until the workflow is **unpaused** — then the buffered signals are processed in order. + +> Workflow Pause is experimental. The dev server must be started with the pause flag enabled — +> see the [workflow_pause README](../README.md) for prerequisites. First see the main +> [README.md](../../README.md) for general prerequisites. + +Run the worker in one terminal: + +```bash +uv run workflow_pause/signals/worker.py +``` + +Start the workflow in a second terminal: + +```bash +uv run workflow_pause/signals/starter.py +``` + +Pause the workflow: + +```bash +temporal workflow pause -w pause-signals-wf --reason demo +``` + +Now send a signal while paused — it succeeds (it is recorded in history): + +```bash +temporal workflow signal -w pause-signals-wf --name add_message --input '"hello"' +``` + +But the handler has **not** run yet. Confirm by querying once unpaused (queries are rejected while +paused — see the `queries` sample). Unpause: + +```bash +temporal workflow unpause -w pause-signals-wf +``` + +The worker now logs `Received message: hello` — the buffered signal was processed on unpause. +Confirm the buffered message landed by querying the workflow (queries work again now that it is +unpaused): + +```bash +temporal workflow query -w pause-signals-wf --type messages +# ["hello"] +``` + +Send a `"done"` signal to let the workflow complete: + +```bash +temporal workflow signal -w pause-signals-wf --name add_message --input '"done"' +``` diff --git a/workflow_pause/signals/__init__.py b/workflow_pause/signals/__init__.py new file mode 100644 index 00000000..750b8248 --- /dev/null +++ b/workflow_pause/signals/__init__.py @@ -0,0 +1,2 @@ +TASK_QUEUE = "workflow-pause-signals-task-queue" +WORKFLOW_ID = "pause-signals-wf" diff --git a/workflow_pause/signals/starter.py b/workflow_pause/signals/starter.py new file mode 100644 index 00000000..f80d0cc3 --- /dev/null +++ b/workflow_pause/signals/starter.py @@ -0,0 +1,34 @@ +import asyncio +import logging + +from temporalio.client import Client +from temporalio.common import WorkflowIDReusePolicy +from temporalio.envconfig import ClientConfig + +from workflow_pause.signals import TASK_QUEUE, WORKFLOW_ID +from workflow_pause.signals.workflow import SignalPauseWorkflow + + +async def main(): + logging.basicConfig(level=logging.INFO) + + config = ClientConfig.load_client_connect_config() + config.setdefault("target_host", "localhost:7233") + client = await Client.connect(**config) + + handle = await client.start_workflow( + SignalPauseWorkflow.run, + id=WORKFLOW_ID, + task_queue=TASK_QUEUE, + id_reuse_policy=WorkflowIDReusePolicy.TERMINATE_IF_RUNNING, + ) + print(f"Started workflow with ID: {handle.id}") + print(f"Pause it with: temporal workflow pause -w {WORKFLOW_ID} --reason demo") + print( + f"Signal it with: temporal workflow signal -w {WORKFLOW_ID} " + f"--name add_message --input '\"hello\"'" + ) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/workflow_pause/signals/worker.py b/workflow_pause/signals/worker.py new file mode 100644 index 00000000..9cfdb881 --- /dev/null +++ b/workflow_pause/signals/worker.py @@ -0,0 +1,33 @@ +import asyncio +import logging + +from temporalio.client import Client +from temporalio.envconfig import ClientConfig +from temporalio.worker import Worker + +from workflow_pause.signals import TASK_QUEUE +from workflow_pause.signals.workflow import SignalPauseWorkflow + +interrupt_event = asyncio.Event() + + +async def main(): + logging.basicConfig(level=logging.INFO) + + config = ClientConfig.load_client_connect_config() + config.setdefault("target_host", "localhost:7233") + client = await Client.connect(**config) + + async with Worker(client, task_queue=TASK_QUEUE, workflows=[SignalPauseWorkflow]): + logging.info("Worker started, ctrl+c to exit") + await interrupt_event.wait() + logging.info("Shutting down") + + +if __name__ == "__main__": + loop = asyncio.new_event_loop() + try: + loop.run_until_complete(main()) + except KeyboardInterrupt: + interrupt_event.set() + loop.run_until_complete(loop.shutdown_asyncgens()) diff --git a/workflow_pause/signals/workflow.py b/workflow_pause/signals/workflow.py new file mode 100644 index 00000000..53eac33f --- /dev/null +++ b/workflow_pause/signals/workflow.py @@ -0,0 +1,32 @@ +from temporalio import workflow + + +@workflow.defn +class SignalPauseWorkflow: + """Collects messages delivered by signal until a "done" signal arrives. + + Signals sent while the workflow is paused are accepted and recorded in + history, but the signal handler does not run until the workflow is + unpaused — at which point the buffered signals are processed in order. + """ + + def __init__(self) -> None: + self._messages: list[str] = [] + self._done = False + + @workflow.run + async def run(self) -> list[str]: + await workflow.wait_condition(lambda: self._done) + return self._messages + + @workflow.signal + async def add_message(self, message: str) -> None: + if message == "done": + self._done = True + return + workflow.logger.info("Received message: %s", message) + self._messages.append(message) + + @workflow.query + def messages(self) -> list[str]: + return self._messages diff --git a/workflow_pause/updates/README.md b/workflow_pause/updates/README.md new file mode 100644 index 00000000..87752572 --- /dev/null +++ b/workflow_pause/updates/README.md @@ -0,0 +1,47 @@ +# Workflow Pause: updates are rejected while paused + +An update issued against a **paused** workflow is rejected immediately. Unpausing lets updates be +admitted and executed again. + +> Workflow Pause is experimental. The dev server must be started with the pause flag enabled — +> see the [workflow_pause README](../README.md) for prerequisites. First see the main +> [README.md](../../README.md) for general prerequisites. + +Run the worker in one terminal: + +```bash +uv run workflow_pause/updates/worker.py +``` + +Start the workflow in a second terminal: + +```bash +uv run workflow_pause/updates/starter.py +``` + +Send an update while running — it returns the new total: + +```bash +temporal workflow update execute -w pause-updates-wf --name add --input 5 +``` + +Pause it: + +```bash +temporal workflow pause -w pause-updates-wf --reason demo +``` + +Send an update while paused — it is **rejected** immediately: + +```bash +temporal workflow update execute -w pause-updates-wf --name add --input 5 +# Error: unable to update workflow: Workflow is paused. Cannot update the workflow. +``` + +Unpause it and updates work again. Finish the workflow with the `finish` update: + +```bash +temporal workflow unpause -w pause-updates-wf +temporal workflow update execute -w pause-updates-wf --name add --input 3 +temporal workflow update execute -w pause-updates-wf --name finish +``` diff --git a/workflow_pause/updates/__init__.py b/workflow_pause/updates/__init__.py new file mode 100644 index 00000000..b5bdde06 --- /dev/null +++ b/workflow_pause/updates/__init__.py @@ -0,0 +1,2 @@ +TASK_QUEUE = "workflow-pause-updates-task-queue" +WORKFLOW_ID = "pause-updates-wf" diff --git a/workflow_pause/updates/starter.py b/workflow_pause/updates/starter.py new file mode 100644 index 00000000..860f066f --- /dev/null +++ b/workflow_pause/updates/starter.py @@ -0,0 +1,34 @@ +import asyncio +import logging + +from temporalio.client import Client +from temporalio.common import WorkflowIDReusePolicy +from temporalio.envconfig import ClientConfig + +from workflow_pause.updates import TASK_QUEUE, WORKFLOW_ID +from workflow_pause.updates.workflow import UpdatePauseWorkflow + + +async def main(): + logging.basicConfig(level=logging.INFO) + + config = ClientConfig.load_client_connect_config() + config.setdefault("target_host", "localhost:7233") + client = await Client.connect(**config) + + handle = await client.start_workflow( + UpdatePauseWorkflow.run, + id=WORKFLOW_ID, + task_queue=TASK_QUEUE, + id_reuse_policy=WorkflowIDReusePolicy.TERMINATE_IF_RUNNING, + ) + print(f"Started workflow with ID: {handle.id}") + print( + f"Update it with: temporal workflow update execute -w {WORKFLOW_ID} " + f"--name add --input 5" + ) + print(f"Pause it with: temporal workflow pause -w {WORKFLOW_ID} --reason demo") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/workflow_pause/updates/worker.py b/workflow_pause/updates/worker.py new file mode 100644 index 00000000..ac12a767 --- /dev/null +++ b/workflow_pause/updates/worker.py @@ -0,0 +1,33 @@ +import asyncio +import logging + +from temporalio.client import Client +from temporalio.envconfig import ClientConfig +from temporalio.worker import Worker + +from workflow_pause.updates import TASK_QUEUE +from workflow_pause.updates.workflow import UpdatePauseWorkflow + +interrupt_event = asyncio.Event() + + +async def main(): + logging.basicConfig(level=logging.INFO) + + config = ClientConfig.load_client_connect_config() + config.setdefault("target_host", "localhost:7233") + client = await Client.connect(**config) + + async with Worker(client, task_queue=TASK_QUEUE, workflows=[UpdatePauseWorkflow]): + logging.info("Worker started, ctrl+c to exit") + await interrupt_event.wait() + logging.info("Shutting down") + + +if __name__ == "__main__": + loop = asyncio.new_event_loop() + try: + loop.run_until_complete(main()) + except KeyboardInterrupt: + interrupt_event.set() + loop.run_until_complete(loop.shutdown_asyncgens()) diff --git a/workflow_pause/updates/workflow.py b/workflow_pause/updates/workflow.py new file mode 100644 index 00000000..a04ced4f --- /dev/null +++ b/workflow_pause/updates/workflow.py @@ -0,0 +1,34 @@ +from temporalio import workflow + + +@workflow.defn +class UpdatePauseWorkflow: + """Maintains a running total adjusted via updates. + + An update issued against a paused workflow is REJECTED immediately with: + `Workflow is paused. Cannot update the workflow.` Unpausing lets updates be + admitted and executed again. + """ + + def __init__(self) -> None: + self._total = 0 + self._finished = False + + @workflow.run + async def run(self) -> int: + await workflow.wait_condition(lambda: self._finished) + return self._total + + @workflow.update + async def add(self, amount: int) -> int: + self._total += amount + workflow.logger.info("Total is now %d", self._total) + return self._total + + @workflow.update + async def finish(self) -> None: + self._finished = True + + @workflow.query + def total(self) -> int: + return self._total