Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 77 additions & 0 deletions .github/workflows/conformance.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
name: Conformance

on:
push:
branches: [main]
pull_request:
branches: [main]
workflow_dispatch:

concurrency:
group: conformance-${{ github.ref }}
cancel-in-progress: true

env:
# Pinned for reproducible runs; bump deliberately when the suite updates.
CONFORMANCE_VERSION: "0.1.16"

jobs:
server:
runs-on: ubuntu-latest
permissions:
contents: read
steps:
- uses: actions/checkout@v7

- name: Install Rust toolchain
uses: dtolnay/rust-toolchain@stable

- uses: Swatinem/rust-cache@v2

# Build the whole package (server + client bins): the conformance crate is
# excluded from the workspace default-members, so this is the only CI job
# that catches compile breakage in it.
- name: Build conformance binaries
run: cargo build -p mcp-conformance

- name: Start conformance server
run: |
PORT=8001 ./target/debug/conformance-server &
echo $! > server.pid
for _ in $(seq 1 30); do
if curl -s -o /dev/null http://127.0.0.1:8001/mcp; then
exit 0
fi
sleep 1
done
echo "conformance server did not become ready" >&2
exit 1

- name: Run server conformance suite
run: |
npx -y "@modelcontextprotocol/conformance@${CONFORMANCE_VERSION}" server \
--url http://127.0.0.1:8001/mcp \
--spec-version 2025-11-25 \
-o conformance-results

# These pass today but are excluded from the default "active" suite;
# run them explicitly so regressions are still caught.
- name: Run pending scenarios
run: |
for scenario in json-schema-2020-12 server-sse-polling; do
npx -y "@modelcontextprotocol/conformance@${CONFORMANCE_VERSION}" server \
--url http://127.0.0.1:8001/mcp \
--scenario "$scenario" \
-o conformance-results
done

- name: Stop conformance server
if: always()
run: kill "$(cat server.pid)" 2>/dev/null || true

- name: Upload results
if: always()
uses: actions/upload-artifact@v7
with:
name: conformance-server-results
path: conformance-results
17 changes: 7 additions & 10 deletions conformance/src/bin/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -656,11 +656,11 @@ impl ServerHandler for ConformanceServer {
"test_prompt_with_arguments",
Some("A test prompt that accepts arguments"),
Some(vec![
PromptArgument::new("name")
.with_description("The name to greet")
PromptArgument::new("arg1")
.with_description("First test argument")
.with_required(true),
PromptArgument::new("style")
.with_description("The greeting style")
PromptArgument::new("arg2")
.with_description("Second test argument")
.with_required(false),
]),
),
Expand Down Expand Up @@ -692,14 +692,11 @@ impl ServerHandler for ConformanceServer {
.with_description("A simple test prompt")),
"test_prompt_with_arguments" => {
let args = request.arguments.unwrap_or_default();
let name = args.get("name").and_then(|v| v.as_str()).unwrap_or("World");
let style = args
.get("style")
.and_then(|v| v.as_str())
.unwrap_or("friendly");
let arg1 = args.get("arg1").and_then(|v| v.as_str()).unwrap_or("");
let arg2 = args.get("arg2").and_then(|v| v.as_str()).unwrap_or("");
Ok(GetPromptResult::new(vec![PromptMessage::new_text(
Role::User,
format!("Please greet {} in a {} style.", name, style),
format!("Prompt with arguments: arg1='{}', arg2='{}'", arg1, arg2),
)])
.with_description("A prompt with arguments"))
}
Expand Down
2 changes: 1 addition & 1 deletion crates/rmcp/src/model/content.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ impl ContentBlock {
ContentBlock::Resource(EmbeddedResource::new(
ResourceContents::TextResourceContents {
uri: uri.into(),
mime_type: Some("text".to_string()),
mime_type: Some("text/plain".to_string()),
text: content.into(),
meta: None,
},
Expand Down
2 changes: 1 addition & 1 deletion crates/rmcp/src/model/resource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ impl ResourceContents {
pub fn text(text: impl Into<String>, uri: impl Into<String>) -> Self {
Self::TextResourceContents {
uri: uri.into(),
mime_type: Some("text".into()),
mime_type: Some("text/plain".into()),
text: text.into(),
meta: None,
}
Expand Down
17 changes: 11 additions & 6 deletions crates/rmcp/src/task_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ impl OperationDescriptor {
self
}

/// Time-to-live in milliseconds, matching `TaskMetadata.ttl` from the MCP spec.
pub fn with_ttl(mut self, ttl: u64) -> Self {
self.ttl = Some(ttl);
self
Expand All @@ -75,7 +76,11 @@ pub trait OperationResultTransport: Send + Sync + 'static {
}

// ===== Operation Processor =====
pub const DEFAULT_TASK_TIMEOUT_SECS: u64 = 300; // 5 minutes
#[deprecated(note = "use DEFAULT_TASK_TIMEOUT_MS; ttl values are milliseconds per the MCP spec")]
pub const DEFAULT_TASK_TIMEOUT_SECS: u64 = 300;
/// Default execution timeout (5 minutes), in milliseconds, applied when a
/// descriptor does not specify a `ttl`.
pub const DEFAULT_TASK_TIMEOUT_MS: u64 = 300_000;
/// Operation processor that coordinates extractors and handlers
pub struct OperationProcessor {
/// Currently running tasks keyed by id
Expand Down Expand Up @@ -165,13 +170,13 @@ impl OperationProcessor {
fn spawn_async_task(&mut self, message: OperationMessage) {
let OperationMessage { descriptor, future } = message;
let task_id = descriptor.operation_id.clone();
let timeout_secs = descriptor.ttl.or(Some(DEFAULT_TASK_TIMEOUT_SECS));
let timeout_ms = descriptor.ttl.or(Some(DEFAULT_TASK_TIMEOUT_MS));
let sender = self.task_result_sender.clone();
let descriptor_for_result = descriptor.clone();

let timed_future = async move {
if let Some(secs) = timeout_secs {
match timeout(Duration::from_secs(secs), future).await {
if let Some(ms) = timeout_ms {
match timeout(Duration::from_millis(ms), future).await {
Ok(result) => result,
Err(_) => Err(Error::TaskError("Operation timed out".to_string())),
}
Expand All @@ -191,7 +196,7 @@ impl OperationProcessor {
let running_task = RunningTask {
task_handle: handle,
started_at: std::time::Instant::now(),
timeout: timeout_secs,
timeout: timeout_ms,
descriptor,
};
self.running_tasks.insert(task_id, running_task);
Expand All @@ -213,7 +218,7 @@ impl OperationProcessor {

for (task_id, task) in &self.running_tasks {
if let Some(timeout_duration) = task.timeout {
if now.duration_since(task.started_at).as_secs() > timeout_duration {
if now.duration_since(task.started_at).as_millis() > u128::from(timeout_duration) {
task.task_handle.abort();
timed_out_tasks.push(task_id.clone());
}
Expand Down
41 changes: 40 additions & 1 deletion crates/rmcp/src/transport/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1687,7 +1687,10 @@ impl AuthorizationManager {
debug!("refresh token present, attempting refresh");

let refresh_token_value = RefreshToken::new(refresh_token.secret().to_string());
let mut refresh_request = oauth_client.exchange_refresh_token(&refresh_token_value);
let mut refresh_request = oauth_client
.exchange_refresh_token(&refresh_token_value)
// RFC 8707: the resource indicator is required on token requests, including refreshes
.add_extra_param("resource", self.base_url.to_string());
let mut refresh_scopes = stored_credentials.granted_scopes;
self.add_offline_access_if_supported(&mut refresh_scopes);
for scope in refresh_scopes {
Expand Down Expand Up @@ -5482,6 +5485,42 @@ mod tests {
assert_eq!(scope_parts, vec!["read", "write"]);
}

#[tokio::test]
async fn refresh_token_includes_resource_parameter() {
let (base_url, captured) = start_token_server().await;

let mut manager = manager_with_metadata(Some(AuthorizationMetadata {
authorization_endpoint: format!("{}/authorize", base_url),
token_endpoint: format!("{}/token", base_url),
..Default::default()
}))
.await;
manager.configure_client(test_client_config()).unwrap();

let stored = StoredCredentials {
client_id: "my-client".to_string(),
token_response: Some(make_token_response_with_refresh(
"old-token",
"my-refresh-token",
)),
granted_scopes: vec![],
token_received_at: Some(AuthorizationManager::now_epoch_secs()),
};
manager.credential_store.save(stored).await.unwrap();

manager.refresh_token().await.unwrap();

let body = captured.lock().unwrap().take().unwrap();
let params: std::collections::HashMap<_, _> = url::form_urlencoded::parse(body.as_bytes())
.into_owned()
.collect();
assert_eq!(
params.get("resource").map(String::as_str),
Some("http://localhost/"),
"refresh requests must carry the RFC 8707 resource parameter, got body: {body}"
);
}

#[tokio::test]
async fn refresh_token_adds_offline_access_when_as_supports_it() {
let (base_url, captured) = start_token_server().await;
Expand Down
32 changes: 32 additions & 0 deletions crates/rmcp/tests/test_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,38 @@ async fn rejects_duplicate_operation_ids() {
assert!(format!("{err}").contains("already running"));
}

#[tokio::test]
async fn ttl_is_interpreted_as_milliseconds() {
let mut processor = OperationProcessor::new();
let descriptor = OperationDescriptor::new("slow", "dummy").with_ttl(50);
let future = Box::pin(async {
tokio::time::sleep(Duration::from_millis(500)).await;
Ok(Box::new(DummyTransport {
id: "slow".to_string(),
value: 0,
}) as Box<dyn OperationResultTransport>)
});

processor
.submit_operation(OperationMessage::new(descriptor, future))
.expect("submit operation");

tokio::time::sleep(Duration::from_millis(200)).await;
let results = processor.peek_completed();
assert_eq!(
results.len(),
1,
"50ms ttl should have timed out the operation well within 200ms"
);
match &results[0].result {
Err(err) => assert!(
err.to_string().contains("timed out"),
"unexpected error: {err}"
),
Ok(_) => panic!("expected the operation to time out, but it completed"),
}
}

#[test]
fn task_status_notification_param_preserves_meta() {
let raw = json!({
Expand Down