Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions langgraph_plugin/graph_api/human_in_the_loop/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ Demonstrates pausing a graph with LangGraph's `interrupt()` and waiting indefini
1. The Workflow starts and the `generate_draft` node produces a response.
2. The `human_review` node calls `interrupt(draft)`, pausing execution.
3. The Workflow stores the draft (visible via the query) and calls `workflow.wait_condition()` — blocking durably until the signal sets `_human_input`. This can wait indefinitely; Temporal persists the state.
4. An external process (UI, CLI, etc.) queries the draft and sends approval via signal.
4. An external process (UI, CLI, etc.) queries the draft and sends the human's feedback via signal.
5. The graph resumes — `interrupt()` returns the signal value and the node completes.

## Running the Sample
Expand All @@ -25,14 +25,16 @@ Prerequisites: `uv sync --group langgraph` and a running Temporal dev server (`t
# Terminal 1: start the worker
uv run langgraph_plugin/graph_api/human_in_the_loop/run_worker.py

# Terminal 2: start the workflow (polls for draft, then auto-approves)
# Terminal 2: start the workflow (polls for the draft, then prompts you for feedback)
uv run langgraph_plugin/graph_api/human_in_the_loop/run_workflow.py
```

When the draft is ready, you'll be prompted at the terminal. Type `approve` to accept it as-is, or type revision feedback and the draft will be regenerated by an LLM incorporating your notes.

## Files

| File | Description |
|------|-------------|
| `workflow.py` | Graph node functions, graph definition, and `ChatbotWorkflow` definition |
| `run_worker.py` | Builds graph, registers with `LangGraphPlugin`, starts worker |
| `run_workflow.py` | Starts workflow, polls draft via query, sends approval via signal |
| `run_workflow.py` | Starts workflow, polls draft via query, prompts for human feedback, sends it via signal |
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,11 @@ async def main() -> None:

print(f"Draft for review: {draft}")

# Send approval via signal (a UI would trigger this)
await handle.signal(ChatbotWorkflow.provide_feedback, "approve")
# Prompt for human feedback instead of auto-approving.
feedback = await asyncio.to_thread(
input, "Enter 'approve' to accept, or type revision feedback: "
)
await handle.signal(ChatbotWorkflow.provide_feedback, feedback)

result = await handle.result()
print(f"Final response: {result}")
Expand Down
23 changes: 14 additions & 9 deletions langgraph_plugin/graph_api/human_in_the_loop/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from datetime import timedelta

from langchain.chat_models import init_chat_model
from langchain_core.runnables import RunnableConfig
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import START, StateGraph
Expand All @@ -20,21 +21,25 @@ class State(TypedDict):


async def generate_draft(state: State) -> dict[str, str]:
"""Generate a draft response. Replace with an LLM call in production."""
return {
"value": (
f"Here's my response to '{state['value']}': "
"The answer is 42. Let me know if this helps!"
)
}
"""Generate a draft response with an LLM."""
response = await init_chat_model("claude-sonnet-4-6").ainvoke(
f"Please respond concisely to: {state['value']}"
)
return {"value": str(response.content)}


async def human_review(state: State) -> dict[str, str]:
"""Present draft to human for review via interrupt."""
"""Present draft to human for review via interrupt; revise with LLM on feedback."""
feedback = interrupt(state["value"])
if feedback == "approve":
return {"value": state["value"]}
return {"value": f"[Revised] {state['value']} (incorporating feedback: {feedback})"}
response = await init_chat_model("claude-sonnet-4-6").ainvoke(
"Revise the following draft according to the reviewer's feedback. "
"Output only the revised draft, with no preamble.\n\n"
f"Draft:\n{state['value']}\n\n"
f"Feedback:\n{feedback}"
)
return {"value": str(response.content)}


def make_chatbot_graph() -> StateGraph:
Expand Down
131 changes: 80 additions & 51 deletions tests/langgraph_plugin/human_in_the_loop_test.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
import sys
import uuid
from unittest.mock import patch

import pytest
from temporalio.client import Client
Expand All @@ -18,36 +19,59 @@
)


class _FakeMessage:
def __init__(self, content: str) -> None:
self.content = content


class _EchoModel:
"""Stand-in for a chat model that echoes the prompt back as its response."""

async def ainvoke(self, prompt: str) -> _FakeMessage:
return _FakeMessage(prompt)


def _fake_init_chat_model(*args: object, **kwargs: object) -> _EchoModel:
return _EchoModel()


_patch_llm = lambda: patch(
"langgraph_plugin.graph_api.human_in_the_loop.workflow.init_chat_model",
_fake_init_chat_model,
)


async def test_human_in_the_loop_approve(client: Client) -> None:
task_queue = f"hitl-test-{uuid.uuid4()}"
plugin = LangGraphPlugin(graphs={"chatbot": make_chatbot_graph()})

async with Worker(
client,
task_queue=task_queue,
workflows=[ChatbotWorkflow],
plugins=[plugin],
):
handle = await client.start_workflow(
ChatbotWorkflow.run,
"test message",
id=f"hitl-{uuid.uuid4()}",
with _patch_llm():
async with Worker(
client,
task_queue=task_queue,
)

# Poll for draft to be ready
draft = None
for _ in range(40):
await asyncio.sleep(0.25)
draft = await handle.query(ChatbotWorkflow.get_draft)
if draft is not None:
break
assert draft is not None
assert "test message" in draft

# Approve
await handle.signal(ChatbotWorkflow.provide_feedback, "approve")
result = await handle.result()
workflows=[ChatbotWorkflow],
plugins=[plugin],
):
handle = await client.start_workflow(
ChatbotWorkflow.run,
"test message",
id=f"hitl-{uuid.uuid4()}",
task_queue=task_queue,
)

# Poll for draft to be ready
draft = None
for _ in range(40):
await asyncio.sleep(0.25)
draft = await handle.query(ChatbotWorkflow.get_draft)
if draft is not None:
break
assert draft is not None
assert "test message" in draft

# Approve
await handle.signal(ChatbotWorkflow.provide_feedback, "approve")
result = await handle.result()

assert result == draft # approved draft returned as-is

Expand All @@ -56,31 +80,36 @@ async def test_human_in_the_loop_revise(client: Client) -> None:
task_queue = f"hitl-revise-test-{uuid.uuid4()}"
plugin = LangGraphPlugin(graphs={"chatbot": make_chatbot_graph()})

async with Worker(
client,
task_queue=task_queue,
workflows=[ChatbotWorkflow],
plugins=[plugin],
):
handle = await client.start_workflow(
ChatbotWorkflow.run,
"test message",
id=f"hitl-revise-{uuid.uuid4()}",
with _patch_llm():
async with Worker(
client,
task_queue=task_queue,
)

# Poll for draft
draft = None
for _ in range(40):
await asyncio.sleep(0.25)
draft = await handle.query(ChatbotWorkflow.get_draft)
if draft is not None:
break
assert draft is not None

# Send revision feedback
await handle.signal(ChatbotWorkflow.provide_feedback, "please be more concise")
result = await handle.result()

assert "[Revised]" in result
workflows=[ChatbotWorkflow],
plugins=[plugin],
):
handle = await client.start_workflow(
ChatbotWorkflow.run,
"test message",
id=f"hitl-revise-{uuid.uuid4()}",
task_queue=task_queue,
)

# Poll for draft
draft = None
for _ in range(40):
await asyncio.sleep(0.25)
draft = await handle.query(ChatbotWorkflow.get_draft)
if draft is not None:
break
assert draft is not None

# Send revision feedback
await handle.signal(
ChatbotWorkflow.provide_feedback, "please be more concise"
)
result = await handle.result()

# The revision node feeds the draft and feedback into the LLM; the echo
# stand-in returns the revision prompt, which contains both.
assert "please be more concise" in result
assert "test message" in result
Loading