From 7d75e6569d1604bec304d6fc052fb34c509d0cc1 Mon Sep 17 00:00:00 2001 From: songyichun1 Date: Wed, 24 Jun 2026 16:00:59 +0800 Subject: [PATCH 1/3] =?UTF-8?q?feat:=E6=94=AF=E6=8C=81=E8=B0=83=E7=94=A8oa?= =?UTF-8?q?uth=E9=89=B4=E6=9D=83=E7=B1=BB=E5=9E=8Bagent?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/content/docs/cli/harness-cli.en.mdx | 17 ++ docs/content/docs/cli/harness-cli.mdx | 6 + tests/a2a/test_registry_client.py | 112 ++++++++++ veadk/a2a/registry_client.py | 254 +++++++++++++++++++++-- 4 files changed, 371 insertions(+), 18 deletions(-) diff --git a/docs/content/docs/cli/harness-cli.en.mdx b/docs/content/docs/cli/harness-cli.en.mdx index 98a2f112..1bc27a79 100644 --- a/docs/content/docs/cli/harness-cli.en.mdx +++ b/docs/content/docs/cli/harness-cli.en.mdx @@ -178,6 +178,23 @@ configures the runtime's gateway authorizer. in `harness.json` (no key). Calling such a runtime requires your own `Authorization: Bearer ` header — the CLI does not mint it. +### Calling OAuth remote agents from A2A Registry + +When a harness enables the `agentkit_a2a` registry, the runtime dynamically +discovers agents through `SearchAgentCards` / `GetA2aAgent` and mounts them as +per-turn `remote_a2a_*` tools. If a matched remote AgentCard uses `apiKey` +auth, the runtime injects the declared header from `securitySchemes`. If it +uses `oauth2` with `clientCredentials.tokenUrl`, the runtime parses the +Volcengine Identity UserPool from the token URL, finds its +`MACHINE_TO_MACHINE` client, exchanges `client_credentials` for an access token, +and calls the remote agent's `message/send` / `tasks/get` with +`Authorization: Bearer `. + +Normally this only requires the harness runtime to have permission to call the +AgentKit A2A Registry and Identity OpenAPI. If the registry endpoint is custom +and cannot also serve Identity OpenAPI requests, set `REGISTRY_ID_ENDPOINT` or +`AGENTKIT_ID_ENDPOINT` to the Identity OpenAPI endpoint. + ## `veadk harness invoke` Invoke a **deployed** harness and print its output. The `url` and `key` are resolved from `harness.json` (written by `deploy`) by `--name`, so you need not pass them explicitly. diff --git a/docs/content/docs/cli/harness-cli.mdx b/docs/content/docs/cli/harness-cli.mdx index d169134b..afe2ca85 100644 --- a/docs/content/docs/cli/harness-cli.mdx +++ b/docs/content/docs/cli/harness-cli.mdx @@ -169,6 +169,12 @@ auth: - 用户池、客户端、外部身份提供商(如飞书)在 **Identity 控制台**一次性建好,CLI 只**引用** `discovery_url` + `allowed_ids`,**不涉及任何 secret**。 - custom_jwt 部署后 `harness.json` 记录 `{url, runtime_id, auth_type, discovery_url, allowed_ids}`(无 key)。调用该 runtime 需自带 `Authorization: Bearer <用户池签发的 JWT>`,CLI 不代为获取。 +### A2A Registry 调用 OAuth remote-agent + +当 Harness 启用 `agentkit_a2a` registry 后,运行时会通过 `SearchAgentCards` / `GetA2aAgent` 动态发现并挂载 `remote_a2a_*` 工具。命中的远程 Agent 如果是 `apiKey` 鉴权,会按 AgentCard 的 `securitySchemes` 注入对应 header;如果是 `oauth2` 且声明了 `clientCredentials.tokenUrl`,运行时会自动从 token URL 解析火山引擎 Identity UserPool,查询其中的 `MACHINE_TO_MACHINE` client,使用 `client_credentials` 换取 access token,并以 `Authorization: Bearer ` 调用远程 Agent 的 `message/send` / `tasks/get`。 + +通常只需要确保 Harness Runtime 有权限调用 AgentKit A2A Registry 和 Identity OpenAPI。若 registry endpoint 使用了自定义域名且不能同时访问 Identity OpenAPI,可通过 `REGISTRY_ID_ENDPOINT` 或 `AGENTKIT_ID_ENDPOINT` 指定 Identity OpenAPI 地址。 + ## `veadk harness invoke` 调用一个**已部署**的 harness 并打印输出。`url` 与 `key` 默认按 `--name` 从 `harness.json`(由 `deploy` 写入)解析,无需显式传入。 diff --git a/tests/a2a/test_registry_client.py b/tests/a2a/test_registry_client.py index 8fd3315e..b6f916bb 100644 --- a/tests/a2a/test_registry_client.py +++ b/tests/a2a/test_registry_client.py @@ -22,6 +22,7 @@ from veadk.a2a.registry_client import ( AgentKitA2ARegistryConfig, RegistryError, + _OAUTH_TOKEN_CACHE, _agent_auth_headers, _volc_sign_v4, create_task, @@ -68,6 +69,41 @@ def _agent_card() -> dict: } +def _oauth_agent_card() -> dict: + token_url = ( + "https://userpool-61597ac7-4bcb-4acf-a1d8-fdbfb95333ad." + "userpool.auth.id.cn-beijing.volces.com/oauth/token" + ) + return { + "name": "Finance Policy Remote Agent", + "description": "Finance policy agent", + "version": "1.0.0", + "url": " https://oauth-agent.test/a2a/ ", + "security": [{"oauth2": []}], + "securitySchemes": { + "oauth2": { + "type": "oauth2", + "description": "OAuth2 client credentials flow", + "flows": { + "clientCredentials": { + "tokenUrl": f" `{token_url}` ", + "refreshUrl": f" `{token_url}` ", + "scopes": {}, + } + }, + } + }, + "skills": [ + { + "id": "finance-policy", + "name": "Finance policy", + "description": "Answer finance policy questions", + "tags": ["finance", "policy"], + } + ], + } + + @patch.dict( "os.environ", { @@ -190,6 +226,82 @@ def test_create_task_gets_agent_and_sends_message(post: Mock): assert "Authorization" not in serialized +@patch.dict( + "os.environ", + { + "AGENTKIT_ACCESS_KEY": "ak-test", + "AGENTKIT_SECRET_KEY": "sk-test", + }, + clear=False, +) +@patch("veadk.a2a.registry_client.requests.post") +def test_create_task_gets_oauth_agent_token_and_sends_message(post: Mock): + _OAUTH_TOKEN_CACHE.clear() + card = _oauth_agent_card() + post.side_effect = [ + _mock_response( + { + "ResponseMetadata": {"RequestId": "get-req"}, + "Result": { + "Id": "agent-id", + "Status": "running", + "AgentCard": json.dumps(card), + }, + } + ), + _mock_response( + { + "ResponseMetadata": {"RequestId": "list-client-req"}, + "Result": {"Data": [{"Uid": "m2m-client-id"}]}, + } + ), + _mock_response( + { + "ResponseMetadata": {"RequestId": "get-client-req"}, + "Result": {"ClientSecret": "m2m-client-secret"}, + } + ), + _mock_response({"access_token": "oauth-access-token", "expires_in": 3600}), + _mock_response( + { + "result": { + "kind": "message", + "parts": [{"kind": "text", "text": "需要财务审批。"}], + } + } + ), + ] + + result = create_task( + "Finance Policy Remote Agent", + "这笔支出是否需要审批?", + config=AgentKitA2ARegistryConfig( + space_id="space-test", + endpoint="https://open.volcengineapi.com/", + ), + ) + + assert result["outcome"] == "success" + assert result["selected_agent"]["name"] == "Finance Policy Remote Agent" + assert result["response"]["text"] == "需要财务审批。" + + assert post.call_args_list[0].kwargs["params"]["Action"] == "GetA2aAgent" + assert post.call_args_list[1].kwargs["params"]["Action"] == "ListUserPoolClients" + assert post.call_args_list[2].kwargs["params"]["Action"] == "GetUserPoolClient" + assert post.call_args_list[3].args[0].endswith("/oauth/token") + assert post.call_args_list[3].kwargs["headers"]["Authorization"].startswith( + "Basic " + ) + assert post.call_args_list[4].args[0] == "https://oauth-agent.test/a2a/" + assert post.call_args_list[4].kwargs["headers"]["Authorization"] == ( + "Bearer oauth-access-token" + ) + + serialized = json.dumps(result, ensure_ascii=False) + assert "oauth-access-token" not in serialized + assert "m2m-client-secret" not in serialized + + @patch.dict( "os.environ", { diff --git a/veadk/a2a/registry_client.py b/veadk/a2a/registry_client.py index b058984c..daf6a252 100644 --- a/veadk/a2a/registry_client.py +++ b/veadk/a2a/registry_client.py @@ -14,16 +14,18 @@ from __future__ import annotations +import base64 import hashlib import hmac import json import os +import re import time import uuid from dataclasses import dataclass from datetime import datetime, timezone from typing import Any -from urllib.parse import quote, urlparse +from urllib.parse import quote, urlparse, urlunparse import requests @@ -38,6 +40,7 @@ DEFAULT_POLL_INTERVAL_MS = 5000 SEARCH_PROMPT_MAX_BYTES = 2048 TERMINAL_STATES = {"completed", "failed", "canceled", "rejected"} +_OAUTH_TOKEN_CACHE: dict[str, tuple[str, float]] = {} class RegistryError(Exception): @@ -331,15 +334,47 @@ def _request_id(response: dict[str, Any]) -> str | None: def _agentkit_post( config: AgentKitA2ARegistryConfig, action: str, body: dict[str, Any] +) -> tuple[dict[str, Any], int]: + return _signed_openapi_post( + config=config, + endpoint=config.endpoint, + service_name=config.service_name, + action=action, + version=config.version, + body=body, + ) + + +def _identity_post( + config: AgentKitA2ARegistryConfig, action: str, body: dict[str, Any] +) -> tuple[dict[str, Any], int]: + return _signed_openapi_post( + config=config, + endpoint=_identity_endpoint(config), + service_name="id", + action=action, + version=config.version, + body=body, + ) + + +def _signed_openapi_post( + *, + config: AgentKitA2ARegistryConfig, + endpoint: str, + service_name: str, + action: str, + version: str, + body: dict[str, Any], ) -> tuple[dict[str, Any], int]: _require_space_id(config) credentials = _resolve_credentials() started = time.monotonic() body_str = json.dumps(body, ensure_ascii=False) body_bytes = body_str.encode("utf-8") - parsed = urlparse(config.endpoint) + parsed = urlparse(endpoint) path = parsed.path or "/" - query = {"Action": action, "Version": config.version} + query = {"Action": action, "Version": version} headers_to_sign = { "Host": parsed.netloc, "Content-Type": "application/json", @@ -347,7 +382,7 @@ def _agentkit_post( auth_headers = _volc_sign_v4( access_key=credentials.access_key, secret_key=credentials.secret_key, - service=config.service_name, + service=service_name, region=config.region, method="POST", path=path, @@ -366,7 +401,7 @@ def _agentkit_post( response = None try: response = requests.post( - config.endpoint, + endpoint, params=query, headers=request_headers, data=body_bytes, @@ -400,6 +435,21 @@ def _agentkit_post( return data, duration_ms +def _identity_endpoint(config: AgentKitA2ARegistryConfig) -> str: + explicit = _first_env( + ["REGISTRY_ID_ENDPOINT", "AGENTKIT_ID_ENDPOINT", "ID_OPENAPI_ENDPOINT"] + ) + if explicit: + return explicit + + parsed = urlparse(config.endpoint) + host = parsed.netloc + if host.startswith("agentkit."): + host = "id." + host.split(".", 1)[1] + return urlunparse(parsed._replace(netloc=host)) + return config.endpoint + + def _agentkit_http_diagnostics( exc: requests.RequestException, response: requests.Response | None, @@ -459,6 +509,8 @@ def _get_a2a_agent( card = _parse_json_object( result.get("AgentCard"), "AGENT_CARD_PARSE_FAILED", "Result.AgentCard" ) + if "url" in card: + card["url"] = _clean_config_url(card.get("url", "")) if not card.get("url"): raise RegistryError( "AGENT_URL_MISSING", f"Agent {agent_name} AgentCard missing url" @@ -486,7 +538,7 @@ def _send_message( card["url"], "message/send", {"message": message, "configuration": {"blocking": False}}, - _agent_auth_headers(card), + _agent_auth_headers(card, config), config, ) except RegistryError as exc: @@ -514,7 +566,7 @@ def _poll_card( card["url"], "tasks/get", {"id": task_id.strip(), "historyLength": max(0, int(history_length))}, - _agent_auth_headers(card), + _agent_auth_headers(card, config), config, ) state = _task_state(a2a_result) @@ -761,7 +813,9 @@ def _sanitize_get_agent_result( } -def _agent_auth_headers(card: dict[str, Any]) -> dict[str, str]: +def _agent_auth_headers( + card: dict[str, Any], config: AgentKitA2ARegistryConfig | None = None +) -> dict[str, str]: security = card.get("security") or [] schemes = card.get("securitySchemes") or {} headers: dict[str, str] = {} @@ -771,16 +825,23 @@ def _agent_auth_headers(card: dict[str, Any]) -> dict[str, str]: continue for scheme_name, credentials in requirement.items(): scheme = schemes.get(scheme_name) or {} - if scheme.get("type") != "apiKey" or scheme.get("in") != "header": - continue - header_name = scheme.get("name") or "Authorization" - token = ( - credentials[0] - if isinstance(credentials, list) and credentials - else credentials - ) - if isinstance(token, str) and token: - headers[header_name] = token + scheme_type = str(scheme.get("type") or "").lower() + if scheme_type == "apikey" and scheme.get("in") == "header": + header_name = scheme.get("name") or "Authorization" + token = ( + credentials[0] + if isinstance(credentials, list) and credentials + else credentials + ) + if isinstance(token, str) and token: + headers[header_name] = token + elif scheme_type == "oauth2": + headers["Authorization"] = ( + "Bearer " + + _oauth2_client_credentials_token( + scheme, _resolve_config(config) + ) + ) if security and not headers: raise RegistryError( @@ -790,6 +851,163 @@ def _agent_auth_headers(card: dict[str, Any]) -> dict[str, str]: return headers +def _oauth2_client_credentials_token( + scheme: dict[str, Any], config: AgentKitA2ARegistryConfig +) -> str: + token_url = _oauth2_token_url(scheme) + if not token_url: + raise RegistryError( + "AGENT_OAUTH_CONFIG_INVALID", + "OAuth2 AgentCard missing clientCredentials tokenUrl", + ) + + cached = _OAUTH_TOKEN_CACHE.get(token_url) + now = time.time() + if cached and cached[1] > now: + return cached[0] + + user_pool_id = _user_pool_id_from_token_url(token_url) + if not user_pool_id: + raise RegistryError( + "AGENT_OAUTH_CONFIG_INVALID", + "OAuth2 tokenUrl does not contain a Volcengine user pool id", + {"token_url_host": urlparse(token_url).netloc}, + ) + + clients_response, _ = _identity_post( + config, + "ListUserPoolClients", + { + "UserPoolUid": user_pool_id, + "PageSize": 10, + "PageNumber": 1, + "Filter": {"ClientTypes": ["MACHINE_TO_MACHINE"]}, + }, + ) + clients_result = clients_response.get("Result") or {} + clients = clients_result.get("Data") or clients_result.get("Items") or [] + if not clients: + raise RegistryError( + "AGENT_OAUTH_CLIENT_MISSING", + "OAuth2 user pool has no MACHINE_TO_MACHINE client", + {"user_pool_id": user_pool_id}, + ) + + client_uid = str(clients[0].get("Uid") or clients[0].get("ClientUid") or "") + if not client_uid: + raise RegistryError( + "AGENT_OAUTH_CLIENT_INVALID", + "OAuth2 MACHINE_TO_MACHINE client response missing Uid", + ) + + client_response, _ = _identity_post( + config, + "GetUserPoolClient", + {"UserPoolUid": user_pool_id, "ClientUid": client_uid}, + ) + client_result = client_response.get("Result") or {} + client_secret = client_result.get("ClientSecret") or client_result.get( + "Secret" + ) + if not client_secret: + raise RegistryError( + "AGENT_OAUTH_CLIENT_INVALID", + "OAuth2 MACHINE_TO_MACHINE client response missing ClientSecret", + ) + + token_response = _fetch_oauth_access_token( + token_url, client_uid, str(client_secret), config + ) + access_token = token_response.get("access_token") + if not isinstance(access_token, str) or not access_token: + raise RegistryError( + "AGENT_OAUTH_TOKEN_INVALID", + "OAuth2 token endpoint response missing access_token", + ) + + expires_in = token_response.get("expires_in", 3600) + try: + ttl = max(60, int(expires_in) - 60) + except (TypeError, ValueError): + ttl = 3540 + _OAUTH_TOKEN_CACHE[token_url] = (access_token, now + ttl) + return access_token + + +def _oauth2_token_url(scheme: dict[str, Any]) -> str: + flows = scheme.get("flows") or {} + client_credentials = flows.get("clientCredentials") or flows.get( + "client_credentials" + ) + if not isinstance(client_credentials, dict): + return "" + return _clean_config_url( + client_credentials.get("tokenUrl") + or client_credentials.get("token_url") + or client_credentials.get("refreshUrl") + or client_credentials.get("refresh_url") + or "" + ) + + +def _clean_config_url(value: Any) -> str: + if value is None: + return "" + cleaned = str(value).strip() + while cleaned and cleaned[0] in {"`", '"', "'"}: + cleaned = cleaned[1:].strip() + while cleaned and cleaned[-1] in {"`", '"', "'"}: + cleaned = cleaned[:-1].strip() + return cleaned + + +def _user_pool_id_from_token_url(token_url: str) -> str: + host = urlparse(token_url).netloc + match = re.match(r"userpool-([^.]+)\.userpool\.auth\.id\.", host) + return match.group(1) if match else "" + + +def _fetch_oauth_access_token( + token_url: str, + client_id: str, + client_secret: str, + config: AgentKitA2ARegistryConfig, +) -> dict[str, Any]: + credentials = f"{client_id}:{client_secret}".encode("utf-8") + encoded_credentials = base64.b64encode(credentials).decode("ascii") + response = None + try: + response = requests.post( + token_url, + headers={ + "Authorization": f"Basic {encoded_credentials}", + "Content-Type": "application/x-www-form-urlencoded", + }, + data={"grant_type": "client_credentials"}, + timeout=_timeout_seconds(config), + ) + response.raise_for_status() + data = response.json() + except requests.RequestException as exc: + raise RegistryError( + "AGENT_OAUTH_TOKEN_FAILED", + f"OAuth2 token request failed: {exc}", + _http_response_diagnostics(exc, response), + ) from exc + except ValueError as exc: + raise RegistryError( + "AGENT_OAUTH_TOKEN_INVALID", + "OAuth2 token endpoint returned non-JSON response", + ) from exc + + if not isinstance(data, dict): + raise RegistryError( + "AGENT_OAUTH_TOKEN_INVALID", + "OAuth2 token endpoint response is not an object", + ) + return data + + def _text_from_parts(parts: list[Any]) -> str: texts: list[str] = [] for part in parts: From 23021b474d1ad7a50f34cd668dcfd239e1482001 Mon Sep 17 00:00:00 2001 From: songyichun1 Date: Wed, 24 Jun 2026 16:33:03 +0800 Subject: [PATCH 2/3] =?UTF-8?q?feat:=E5=9B=BA=E5=AE=9Aoauth=E8=AE=BF?= =?UTF-8?q?=E9=97=AEid-api=E7=9A=84version?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/a2a/test_registry_client.py | 2 ++ veadk/a2a/registry_client.py | 3 ++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/a2a/test_registry_client.py b/tests/a2a/test_registry_client.py index b6f916bb..364159ee 100644 --- a/tests/a2a/test_registry_client.py +++ b/tests/a2a/test_registry_client.py @@ -287,7 +287,9 @@ def test_create_task_gets_oauth_agent_token_and_sends_message(post: Mock): assert post.call_args_list[0].kwargs["params"]["Action"] == "GetA2aAgent" assert post.call_args_list[1].kwargs["params"]["Action"] == "ListUserPoolClients" + assert post.call_args_list[1].kwargs["params"]["Version"] == "2025-10-30" assert post.call_args_list[2].kwargs["params"]["Action"] == "GetUserPoolClient" + assert post.call_args_list[2].kwargs["params"]["Version"] == "2025-10-30" assert post.call_args_list[3].args[0].endswith("/oauth/token") assert post.call_args_list[3].kwargs["headers"]["Authorization"].startswith( "Basic " diff --git a/veadk/a2a/registry_client.py b/veadk/a2a/registry_client.py index daf6a252..dbc3f3cd 100644 --- a/veadk/a2a/registry_client.py +++ b/veadk/a2a/registry_client.py @@ -33,6 +33,7 @@ DEFAULT_ENDPOINT = "http://volcengineapi.byted.org/" DEFAULT_VERSION = "2025-10-30" +IDENTITY_VERSION = "2025-10-30" DEFAULT_SERVICE_NAME = "agentkit" DEFAULT_REGION = "cn-beijing" DEFAULT_TOP_K = 3 @@ -353,7 +354,7 @@ def _identity_post( endpoint=_identity_endpoint(config), service_name="id", action=action, - version=config.version, + version=IDENTITY_VERSION, body=body, ) From 91351950ecb97ba2ccdddf59c8a7857dead9c5af Mon Sep 17 00:00:00 2001 From: songyichun1 Date: Wed, 24 Jun 2026 17:32:11 +0800 Subject: [PATCH 3/3] =?UTF-8?q?fix:=E4=BF=AE=E5=A4=8Dgithub=E6=A0=BC?= =?UTF-8?q?=E5=BC=8F=E6=A3=80=E6=9F=A5=E9=94=99=E8=AF=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/a2a/test_registry_client.py | 4 ++-- veadk/a2a/registry_client.py | 11 +++-------- 2 files changed, 5 insertions(+), 10 deletions(-) diff --git a/tests/a2a/test_registry_client.py b/tests/a2a/test_registry_client.py index 364159ee..089f15e0 100644 --- a/tests/a2a/test_registry_client.py +++ b/tests/a2a/test_registry_client.py @@ -291,8 +291,8 @@ def test_create_task_gets_oauth_agent_token_and_sends_message(post: Mock): assert post.call_args_list[2].kwargs["params"]["Action"] == "GetUserPoolClient" assert post.call_args_list[2].kwargs["params"]["Version"] == "2025-10-30" assert post.call_args_list[3].args[0].endswith("/oauth/token") - assert post.call_args_list[3].kwargs["headers"]["Authorization"].startswith( - "Basic " + assert ( + post.call_args_list[3].kwargs["headers"]["Authorization"].startswith("Basic ") ) assert post.call_args_list[4].args[0] == "https://oauth-agent.test/a2a/" assert post.call_args_list[4].kwargs["headers"]["Authorization"] == ( diff --git a/veadk/a2a/registry_client.py b/veadk/a2a/registry_client.py index dbc3f3cd..b4aae29b 100644 --- a/veadk/a2a/registry_client.py +++ b/veadk/a2a/registry_client.py @@ -837,11 +837,8 @@ def _agent_auth_headers( if isinstance(token, str) and token: headers[header_name] = token elif scheme_type == "oauth2": - headers["Authorization"] = ( - "Bearer " - + _oauth2_client_credentials_token( - scheme, _resolve_config(config) - ) + headers["Authorization"] = "Bearer " + _oauth2_client_credentials_token( + scheme, _resolve_config(config) ) if security and not headers: @@ -907,9 +904,7 @@ def _oauth2_client_credentials_token( {"UserPoolUid": user_pool_id, "ClientUid": client_uid}, ) client_result = client_response.get("Result") or {} - client_secret = client_result.get("ClientSecret") or client_result.get( - "Secret" - ) + client_secret = client_result.get("ClientSecret") or client_result.get("Secret") if not client_secret: raise RegistryError( "AGENT_OAUTH_CLIENT_INVALID",