diff --git a/.github/workflows/conformance.yml b/.github/workflows/conformance.yml new file mode 100644 index 00000000..ac1dfe20 --- /dev/null +++ b/.github/workflows/conformance.yml @@ -0,0 +1,77 @@ +name: Conformance + +on: + push: + branches: [main] + pull_request: + branches: [main] + workflow_dispatch: + +concurrency: + group: conformance-${{ github.ref }} + cancel-in-progress: true + +env: + # Pinned for reproducible runs; bump deliberately when the suite updates. + CONFORMANCE_VERSION: "0.1.16" + +jobs: + server: + runs-on: ubuntu-latest + permissions: + contents: read + steps: + - uses: actions/checkout@v7 + + - name: Install Rust toolchain + uses: dtolnay/rust-toolchain@stable + + - uses: Swatinem/rust-cache@v2 + + # Build the whole package (server + client bins): the conformance crate is + # excluded from the workspace default-members, so this is the only CI job + # that catches compile breakage in it. + - name: Build conformance binaries + run: cargo build -p mcp-conformance + + - name: Start conformance server + run: | + PORT=8001 ./target/debug/conformance-server & + echo $! > server.pid + for _ in $(seq 1 30); do + if curl -s -o /dev/null http://127.0.0.1:8001/mcp; then + exit 0 + fi + sleep 1 + done + echo "conformance server did not become ready" >&2 + exit 1 + + - name: Run server conformance suite + run: | + npx -y "@modelcontextprotocol/conformance@${CONFORMANCE_VERSION}" server \ + --url http://127.0.0.1:8001/mcp \ + --spec-version 2025-11-25 \ + -o conformance-results + + # These pass today but are excluded from the default "active" suite; + # run them explicitly so regressions are still caught. + - name: Run pending scenarios + run: | + for scenario in json-schema-2020-12 server-sse-polling; do + npx -y "@modelcontextprotocol/conformance@${CONFORMANCE_VERSION}" server \ + --url http://127.0.0.1:8001/mcp \ + --scenario "$scenario" \ + -o conformance-results + done + + - name: Stop conformance server + if: always() + run: kill "$(cat server.pid)" 2>/dev/null || true + + - name: Upload results + if: always() + uses: actions/upload-artifact@v7 + with: + name: conformance-server-results + path: conformance-results diff --git a/conformance/src/bin/server.rs b/conformance/src/bin/server.rs index b0b0d635..4fc89bfe 100644 --- a/conformance/src/bin/server.rs +++ b/conformance/src/bin/server.rs @@ -656,11 +656,11 @@ impl ServerHandler for ConformanceServer { "test_prompt_with_arguments", Some("A test prompt that accepts arguments"), Some(vec![ - PromptArgument::new("name") - .with_description("The name to greet") + PromptArgument::new("arg1") + .with_description("First test argument") .with_required(true), - PromptArgument::new("style") - .with_description("The greeting style") + PromptArgument::new("arg2") + .with_description("Second test argument") .with_required(false), ]), ), @@ -692,14 +692,11 @@ impl ServerHandler for ConformanceServer { .with_description("A simple test prompt")), "test_prompt_with_arguments" => { let args = request.arguments.unwrap_or_default(); - let name = args.get("name").and_then(|v| v.as_str()).unwrap_or("World"); - let style = args - .get("style") - .and_then(|v| v.as_str()) - .unwrap_or("friendly"); + let arg1 = args.get("arg1").and_then(|v| v.as_str()).unwrap_or(""); + let arg2 = args.get("arg2").and_then(|v| v.as_str()).unwrap_or(""); Ok(GetPromptResult::new(vec![PromptMessage::new_text( Role::User, - format!("Please greet {} in a {} style.", name, style), + format!("Prompt with arguments: arg1='{}', arg2='{}'", arg1, arg2), )]) .with_description("A prompt with arguments")) } diff --git a/crates/rmcp/src/model/content.rs b/crates/rmcp/src/model/content.rs index 680136f8..a468de8f 100644 --- a/crates/rmcp/src/model/content.rs +++ b/crates/rmcp/src/model/content.rs @@ -297,7 +297,7 @@ impl ContentBlock { ContentBlock::Resource(EmbeddedResource::new( ResourceContents::TextResourceContents { uri: uri.into(), - mime_type: Some("text".to_string()), + mime_type: Some("text/plain".to_string()), text: content.into(), meta: None, }, diff --git a/crates/rmcp/src/model/resource.rs b/crates/rmcp/src/model/resource.rs index a5ad9506..0381d4d8 100644 --- a/crates/rmcp/src/model/resource.rs +++ b/crates/rmcp/src/model/resource.rs @@ -193,7 +193,7 @@ impl ResourceContents { pub fn text(text: impl Into, uri: impl Into) -> Self { Self::TextResourceContents { uri: uri.into(), - mime_type: Some("text".into()), + mime_type: Some("text/plain".into()), text: text.into(), meta: None, } diff --git a/crates/rmcp/src/task_manager.rs b/crates/rmcp/src/task_manager.rs index 32bcf8f0..21adb38b 100644 --- a/crates/rmcp/src/task_manager.rs +++ b/crates/rmcp/src/task_manager.rs @@ -49,6 +49,7 @@ impl OperationDescriptor { self } + /// Time-to-live in milliseconds, matching `TaskMetadata.ttl` from the MCP spec. pub fn with_ttl(mut self, ttl: u64) -> Self { self.ttl = Some(ttl); self @@ -75,7 +76,11 @@ pub trait OperationResultTransport: Send + Sync + 'static { } // ===== Operation Processor ===== -pub const DEFAULT_TASK_TIMEOUT_SECS: u64 = 300; // 5 minutes +#[deprecated(note = "use DEFAULT_TASK_TIMEOUT_MS; ttl values are milliseconds per the MCP spec")] +pub const DEFAULT_TASK_TIMEOUT_SECS: u64 = 300; +/// Default execution timeout (5 minutes), in milliseconds, applied when a +/// descriptor does not specify a `ttl`. +pub const DEFAULT_TASK_TIMEOUT_MS: u64 = 300_000; /// Operation processor that coordinates extractors and handlers pub struct OperationProcessor { /// Currently running tasks keyed by id @@ -165,13 +170,13 @@ impl OperationProcessor { fn spawn_async_task(&mut self, message: OperationMessage) { let OperationMessage { descriptor, future } = message; let task_id = descriptor.operation_id.clone(); - let timeout_secs = descriptor.ttl.or(Some(DEFAULT_TASK_TIMEOUT_SECS)); + let timeout_ms = descriptor.ttl.or(Some(DEFAULT_TASK_TIMEOUT_MS)); let sender = self.task_result_sender.clone(); let descriptor_for_result = descriptor.clone(); let timed_future = async move { - if let Some(secs) = timeout_secs { - match timeout(Duration::from_secs(secs), future).await { + if let Some(ms) = timeout_ms { + match timeout(Duration::from_millis(ms), future).await { Ok(result) => result, Err(_) => Err(Error::TaskError("Operation timed out".to_string())), } @@ -191,7 +196,7 @@ impl OperationProcessor { let running_task = RunningTask { task_handle: handle, started_at: std::time::Instant::now(), - timeout: timeout_secs, + timeout: timeout_ms, descriptor, }; self.running_tasks.insert(task_id, running_task); @@ -213,7 +218,7 @@ impl OperationProcessor { for (task_id, task) in &self.running_tasks { if let Some(timeout_duration) = task.timeout { - if now.duration_since(task.started_at).as_secs() > timeout_duration { + if now.duration_since(task.started_at).as_millis() > u128::from(timeout_duration) { task.task_handle.abort(); timed_out_tasks.push(task_id.clone()); } diff --git a/crates/rmcp/src/transport/auth.rs b/crates/rmcp/src/transport/auth.rs index c4c03872..ba99c1e3 100644 --- a/crates/rmcp/src/transport/auth.rs +++ b/crates/rmcp/src/transport/auth.rs @@ -1687,7 +1687,10 @@ impl AuthorizationManager { debug!("refresh token present, attempting refresh"); let refresh_token_value = RefreshToken::new(refresh_token.secret().to_string()); - let mut refresh_request = oauth_client.exchange_refresh_token(&refresh_token_value); + let mut refresh_request = oauth_client + .exchange_refresh_token(&refresh_token_value) + // RFC 8707: the resource indicator is required on token requests, including refreshes + .add_extra_param("resource", self.base_url.to_string()); let mut refresh_scopes = stored_credentials.granted_scopes; self.add_offline_access_if_supported(&mut refresh_scopes); for scope in refresh_scopes { @@ -5482,6 +5485,42 @@ mod tests { assert_eq!(scope_parts, vec!["read", "write"]); } + #[tokio::test] + async fn refresh_token_includes_resource_parameter() { + let (base_url, captured) = start_token_server().await; + + let mut manager = manager_with_metadata(Some(AuthorizationMetadata { + authorization_endpoint: format!("{}/authorize", base_url), + token_endpoint: format!("{}/token", base_url), + ..Default::default() + })) + .await; + manager.configure_client(test_client_config()).unwrap(); + + let stored = StoredCredentials { + client_id: "my-client".to_string(), + token_response: Some(make_token_response_with_refresh( + "old-token", + "my-refresh-token", + )), + granted_scopes: vec![], + token_received_at: Some(AuthorizationManager::now_epoch_secs()), + }; + manager.credential_store.save(stored).await.unwrap(); + + manager.refresh_token().await.unwrap(); + + let body = captured.lock().unwrap().take().unwrap(); + let params: std::collections::HashMap<_, _> = url::form_urlencoded::parse(body.as_bytes()) + .into_owned() + .collect(); + assert_eq!( + params.get("resource").map(String::as_str), + Some("http://localhost/"), + "refresh requests must carry the RFC 8707 resource parameter, got body: {body}" + ); + } + #[tokio::test] async fn refresh_token_adds_offline_access_when_as_supports_it() { let (base_url, captured) = start_token_server().await; diff --git a/crates/rmcp/tests/test_task.rs b/crates/rmcp/tests/test_task.rs index ca0f4af5..6f9d6604 100644 --- a/crates/rmcp/tests/test_task.rs +++ b/crates/rmcp/tests/test_task.rs @@ -80,6 +80,38 @@ async fn rejects_duplicate_operation_ids() { assert!(format!("{err}").contains("already running")); } +#[tokio::test] +async fn ttl_is_interpreted_as_milliseconds() { + let mut processor = OperationProcessor::new(); + let descriptor = OperationDescriptor::new("slow", "dummy").with_ttl(50); + let future = Box::pin(async { + tokio::time::sleep(Duration::from_millis(500)).await; + Ok(Box::new(DummyTransport { + id: "slow".to_string(), + value: 0, + }) as Box) + }); + + processor + .submit_operation(OperationMessage::new(descriptor, future)) + .expect("submit operation"); + + tokio::time::sleep(Duration::from_millis(200)).await; + let results = processor.peek_completed(); + assert_eq!( + results.len(), + 1, + "50ms ttl should have timed out the operation well within 200ms" + ); + match &results[0].result { + Err(err) => assert!( + err.to_string().contains("timed out"), + "unexpected error: {err}" + ), + Ok(_) => panic!("expected the operation to time out, but it completed"), + } +} + #[test] fn task_status_notification_param_preserves_meta() { let raw = json!({