diff --git a/engine/src/engine/observe/adapter/enrich.rs b/engine/src/engine/observe/adapter/enrich.rs index 2328cb6..dc162e5 100644 --- a/engine/src/engine/observe/adapter/enrich.rs +++ b/engine/src/engine/observe/adapter/enrich.rs @@ -3,6 +3,7 @@ use petgraph::visit::EdgeRef; use super::*; use crate::engine::graph::{Behavior, Reachability}; use crate::engine::observe::Attribution; +use crate::engine::observe::ip_index::IpIndex; /// Annotates Image nodes with vulnerability findings (Vulnerability port). Like /// [`ExposureAdapter`], it enriches existing Image nodes, so it runs after the @@ -45,6 +46,16 @@ impl Adapter for RuntimeAdapter { if snapshot.runtime_events.is_empty() { return; } + // IP → cluster-object index, built once per pass from the SAME Pod/Service + // objects the reflector stores already hold (JEF: resolve-connection-peers). It + // turns a raw `IP:port` connection peer into the workload/service it belongs to + // (`analytics/influxdb:8086 (10.42.1.159)`) so the dashboard AND the adjudicator + // prompt — which both render `Behavior::summary()` — show *what* a pod connects + // to, not a bare IP, with NO change to either rendering site. A pure in-memory + // lookup: zero outbound calls, so the zero-egress invariant holds (we explicitly + // do NOT do reverse DNS; cluster pod IPs aren't in external DNS and a PTR lookup + // would leave the cluster). + let ip_index = IpIndex::from_snapshot(snapshot); // UID → the Pod from the watch, so events a sensor attributed by cgroup UID (the // eBPF agent) resolve to a workload without the agent ever touching the cluster // API (ADR-0014). The full Pod (not just ns/name) is needed to refine a raw @@ -108,6 +119,16 @@ impl Adapter for RuntimeAdapter { continue; } }, + // Resolve a connection peer's cluster IP to the workload/service it + // belongs to (JEF: resolve-connection-peers). `resolve_peer` keeps an + // internet/unknown/unresolvable peer exactly as the raw `IP:port`, so + // this only ever *enriches* a same-cluster pod/service peer; the resolved + // name then flows through `Behavior::summary()` to both the prompt and the + // dashboard unchanged. + Behavior::NetworkConnection { peer, internet } => Behavior::NetworkConnection { + peer: ip_index.resolve_peer(peer, *internet), + internet: *internet, + }, other => other.clone(), }; // Carry the sensor's identity and observation time into the provenance: @@ -828,6 +849,87 @@ mod tests { ); } + /// The `NetworkConnection` behaviors attached to the (single) workload after the + /// full adapter pipeline — i.e. the peer strings as `Behavior::summary()` (the + /// prompt + dashboard) will render them. + fn connection_peers(snap: Snapshot) -> Vec<(String, bool)> { + let graph = super::super::build_graph(&snap, &super::super::default_adapters()); + graph + .inner() + .node_weights() + .find_map(|n| match n { + Node::Workload(w) => Some( + w.runtime + .iter() + .filter_map(|o| match &o.behavior { + Behavior::NetworkConnection { peer, internet } => { + Some((peer.clone(), *internet)) + } + _ => None, + }) + .collect::>(), + ), + _ => None, + }) + .expect("workload node exists") + } + + #[test] + fn runtime_adapter_resolves_cluster_connection_peers_to_names() { + // app/web connects to a cluster pod (analytics/influxdb-0), a cluster service + // (analytics/influxdb), an unknown cluster IP, and the internet. After the + // pipeline the pod/service peers are resolved to ns/name:port (raw-ip); the + // unknown IP stays raw; the internet peer stays raw (egress, not resolved). + let web = pod(json!({ + "apiVersion": "v1", "kind": "Pod", + "metadata": {"name": "web", "namespace": "app", "labels": {"app": "web"}}, + "spec": {"containers": [{"name": "web", "image": "web:1"}]} + })); + let influx_pod = pod(json!({ + "apiVersion": "v1", "kind": "Pod", + "metadata": {"name": "influxdb-0", "namespace": "analytics"}, + "spec": {"containers": [{"name": "influxdb", "image": "influxdb:2"}]}, + "status": {"podIP": "10.42.1.159"} + })); + let influx_svc: k8s_openapi::api::core::v1::Service = serde_json::from_value(json!({ + "apiVersion": "v1", "kind": "Service", + "metadata": {"name": "influxdb", "namespace": "analytics"}, + "spec": {"clusterIP": "10.43.0.10"} + })) + .expect("valid Service"); + let conn = |peer: &str, internet: bool| RuntimeObservation { + attribution: Attribution::by_namespaced_name("app", "web"), + source: None, + observed_at_ms: None, + behavior: Behavior::NetworkConnection { + peer: peer.into(), + internet, + }, + }; + let snap = Snapshot { + pods: vec![web, influx_pod], + services: vec![influx_svc], + runtime_events: vec![ + conn("10.42.1.159:8086", false), // a cluster pod + conn("10.43.0.10:8086", false), // a cluster service ClusterIP + conn("10.99.0.1:443", false), // an unresolvable cluster IP + conn("1.2.3.4:443", true), // internet egress + ], + ..Default::default() + }; + let mut peers = connection_peers(snap); + peers.sort(); + assert_eq!( + peers, + vec![ + ("1.2.3.4:443".to_string(), true), + ("10.99.0.1:443".to_string(), false), + ("analytics/influxdb-0:8086 (10.42.1.159)".to_string(), false), + ("analytics/influxdb:8086 (10.43.0.10)".to_string(), false), + ] + ); + } + #[test] fn cve_without_pkg_name_stays_unknown() { // No package name to correlate against → the CVE keeps Unknown even with a load. diff --git a/engine/src/engine/observe/ip_index.rs b/engine/src/engine/observe/ip_index.rs new file mode 100644 index 0000000..facf8d6 --- /dev/null +++ b/engine/src/engine/observe/ip_index.rs @@ -0,0 +1,217 @@ +//! An in-memory IP → cluster-object index (JEF: resolve-connection-peers). +//! +//! A `NetworkConnection` behavior carries a raw `IP:port` peer (e.g. +//! `10.42.1.159:8086`), which reads as opaque in the dashboard and the adjudicator +//! prompt. This index turns the IP half into the workload/service it belongs to — +//! `analytics/influxdb:8086 (10.42.1.159)` — so an operator (and the model) see +//! *what* a pod connects to. +//! +//! ## Why not reverse DNS +//! Cluster pod IPs (10.42.x.x) aren't in external DNS, and any outbound PTR lookup +//! would violate the zero-egress invariant (the security graph and evidence never +//! leave the cluster — see CLAUDE.md / docs/adr). So resolution is a *pure in-memory +//! lookup* against the Pod/Service objects the engine's reflector stores already +//! watch — `status.podIP` on a Pod, `spec.clusterIP` on a Service. No network call, +//! ever, on the hot path: the index is built from a [`Snapshot`] read and queried +//! with [`IpIndex::resolve`]. +//! +//! The index lives in the engine (which has cluster access), never in the shared +//! `behavior` crate (which has none) — the wire type stays pure data. + +use std::collections::HashMap; + +use k8s_openapi::api::core::v1::{Pod, Service}; + +use super::Snapshot; + +/// What an IP resolves to: the namespace + name of the owning cluster object. A pod +/// IP resolves to its pod's `namespace/name`; a service ClusterIP to the service's +/// `namespace/name`. (We don't keep the `kind` distinct in the rendered label — both +/// render as `namespace/name` — but it's tracked so a future caller can tell them +/// apart without rebuilding the index.) +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ResolvedPeer { + pub namespace: String, + pub name: String, + pub kind: PeerKind, +} + +/// Which kind of cluster object an IP belongs to. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum PeerKind { + /// A Pod, resolved from `status.podIP`. + Pod, + /// A Service, resolved from `spec.clusterIP`. + Service, +} + +impl ResolvedPeer { + /// The `namespace/name` label rendered into the peer string. Derived from cluster + /// object names (namespace + pod/service name) — still untrusted-adjacent, so the + /// prompt/render sites sanitize it exactly as they already do the raw peer. + fn label(&self) -> String { + format!("{}/{}", self.namespace, self.name) + } +} + +/// A pure, in-memory map from a bare cluster IP to the object that owns it. Built from +/// a [`Snapshot`]'s Pods and Services (the same objects the reflector stores hold), so +/// a lookup is a hashmap probe with zero IO. +/// +/// A Pod entry takes precedence over a Service entry on the rare event of a collision +/// (a ClusterIP and a podIP should never coincide, but if they did, the concrete pod +/// is the more specific answer). Pods are indexed after services so they win. +#[derive(Debug, Default, Clone)] +pub struct IpIndex { + by_ip: HashMap, +} + +impl IpIndex { + /// Build the index from the cluster objects in `snapshot`. Pure: it reads + /// `spec.clusterIP` off each Service and `status.podIP`/`status.podIPs` off each + /// Pod and nothing else — no network, no blocking. + pub fn from_snapshot(snapshot: &Snapshot) -> Self { + let mut by_ip = HashMap::new(); + // Services first, then Pods, so a Pod IP wins any (pathological) collision. + for svc in &snapshot.services { + Self::index_service(&mut by_ip, svc); + } + for pod in &snapshot.pods { + Self::index_pod(&mut by_ip, pod); + } + Self { by_ip } + } + + fn index_service(by_ip: &mut HashMap, svc: &Service) { + let (Some(namespace), Some(name)) = + (svc.metadata.namespace.clone(), svc.metadata.name.clone()) + else { + return; + }; + let Some(spec) = &svc.spec else { return }; + // `clusterIP` is the primary; `clusterIPs` carries it plus any dual-stack + // sibling. Index every concrete one (skip the "None" headless sentinel and + // empty strings — a headless Service has no ClusterIP to resolve). + let ips = spec + .cluster_ip + .iter() + .chain(spec.cluster_ips.iter().flatten()); + for ip in ips { + if is_resolvable_ip(ip) { + by_ip.insert( + ip.clone(), + ResolvedPeer { + namespace: namespace.clone(), + name: name.clone(), + kind: PeerKind::Service, + }, + ); + } + } + } + + fn index_pod(by_ip: &mut HashMap, pod: &Pod) { + let (Some(namespace), Some(name)) = + (pod.metadata.namespace.clone(), pod.metadata.name.clone()) + else { + return; + }; + let Some(status) = &pod.status else { return }; + // `podIP` is the primary address; `podIPs` carries it plus any dual-stack + // sibling. Index every concrete one so either family resolves. + let ips = status + .pod_ip + .iter() + .chain(status.pod_ips.iter().flatten().map(|p| &p.ip)); + for ip in ips { + if is_resolvable_ip(ip) { + by_ip.insert( + ip.clone(), + ResolvedPeer { + namespace: namespace.clone(), + name: name.clone(), + kind: PeerKind::Pod, + }, + ); + } + } + } + + /// Resolve a bare IP to the object that owns it, or `None` if unknown. Pure + /// hashmap probe — no IO. + pub fn resolve(&self, ip: &str) -> Option<&ResolvedPeer> { + self.by_ip.get(ip) + } + + /// Rewrite a `NetworkConnection` peer string into its resolved cluster form, or + /// return it unchanged when it can't be resolved. + /// + /// Rules (JEF: resolve-connection-peers): + /// - `internet` peers are left as the raw `IP:port` — they're external egress, not + /// a cluster object, so there's nothing in-cluster to resolve to (the caller's + /// `internet` flag still labels them as egress downstream). + /// - A same-cluster pod or service IP becomes + /// `namespace/name:port (raw-ip)` — the resolved name, the original port, and the + /// raw IP kept in parens for forensics. + /// - An unknown / unresolvable IP is left exactly as the raw `IP:port` — we never + /// fabricate a name. + /// + /// Deterministic and pure given the index. + pub fn resolve_peer(&self, peer: &str, internet: bool) -> String { + if internet { + // External egress — nothing in-cluster to resolve to; keep it raw. + return peer.to_string(); + } + let Some((ip, port)) = split_ip_port(peer) else { + // Not in `IP:port` shape — leave it untouched rather than guess. + return peer.to_string(); + }; + match self.resolve(ip) { + Some(resolved) => format!("{}:{port} ({ip})", resolved.label()), + None => peer.to_string(), + } + } + + /// The number of indexed IPs — for observability/tests. + pub fn len(&self) -> usize { + self.by_ip.len() + } + + pub fn is_empty(&self) -> bool { + self.by_ip.is_empty() + } +} + +/// Whether an IP string is a concrete address we can index. Kubernetes uses the +/// literal `"None"` for a headless Service's ClusterIP and may carry an empty string; +/// neither is a real address, so we skip them. +fn is_resolvable_ip(ip: &str) -> bool { + !ip.is_empty() && ip != "None" +} + +/// Split a `peer` of the form `IP:port` into `(ip, port)`. Handles both IPv4 +/// (`10.42.1.159:8086`) and bracketed IPv6 (`[fd00::1]:8086`); returns `None` when +/// there's no `:port` suffix to split on (so the caller leaves the peer untouched). +/// +/// For IPv4 we split on the *last* colon (an IPv4 address has none of its own); for a +/// bracketed IPv6 literal we split after the closing bracket so the address's internal +/// colons are preserved. +fn split_ip_port(peer: &str) -> Option<(&str, &str)> { + if let Some(rest) = peer.strip_prefix('[') { + // Bracketed IPv6: `[addr]:port` → ip = `addr`, port after `]:`. + let (addr, after) = rest.split_once(']')?; + let port = after.strip_prefix(':')?; + if addr.is_empty() || port.is_empty() { + return None; + } + return Some((addr, port)); + } + let (ip, port) = peer.rsplit_once(':')?; + if ip.is_empty() || port.is_empty() { + return None; + } + Some((ip, port)) +} + +#[cfg(test)] +mod tests; diff --git a/engine/src/engine/observe/ip_index/tests.rs b/engine/src/engine/observe/ip_index/tests.rs new file mode 100644 index 0000000..a52ab7e --- /dev/null +++ b/engine/src/engine/observe/ip_index/tests.rs @@ -0,0 +1,161 @@ +use super::*; +use crate::engine::observe::Snapshot; +use serde_json::json; + +fn pod(value: serde_json::Value) -> Pod { + serde_json::from_value(value).expect("valid Pod fixture") +} + +fn service(value: serde_json::Value) -> Service { + serde_json::from_value(value).expect("valid Service fixture") +} + +/// A snapshot with one pod (10.42.1.159) and one service (10.43.0.10) — the fixture +/// the resolution tests probe against. +fn fixture() -> Snapshot { + Snapshot { + pods: vec![pod(json!({ + "apiVersion": "v1", "kind": "Pod", + "metadata": {"name": "influxdb-0", "namespace": "analytics"}, + "spec": {"containers": [{"name": "influxdb", "image": "influxdb:2"}]}, + "status": {"podIP": "10.42.1.159"} + }))], + services: vec![service(json!({ + "apiVersion": "v1", "kind": "Service", + "metadata": {"name": "influxdb", "namespace": "analytics"}, + "spec": {"clusterIP": "10.43.0.10", "clusterIPs": ["10.43.0.10"]} + }))], + ..Default::default() + } +} + +#[test] +fn pod_ip_resolves_to_namespace_name() { + let index = IpIndex::from_snapshot(&fixture()); + let resolved = index.resolve("10.42.1.159").expect("pod IP is indexed"); + assert_eq!(resolved.namespace, "analytics"); + assert_eq!(resolved.name, "influxdb-0"); + assert_eq!(resolved.kind, PeerKind::Pod); +} + +#[test] +fn service_cluster_ip_resolves_to_namespace_name() { + let index = IpIndex::from_snapshot(&fixture()); + let resolved = index + .resolve("10.43.0.10") + .expect("service ClusterIP is indexed"); + assert_eq!(resolved.namespace, "analytics"); + assert_eq!(resolved.name, "influxdb"); + assert_eq!(resolved.kind, PeerKind::Service); +} + +#[test] +fn unknown_ip_does_not_resolve() { + let index = IpIndex::from_snapshot(&fixture()); + assert!(index.resolve("8.8.8.8").is_none()); +} + +#[test] +fn resolve_peer_rewrites_a_pod_peer_with_raw_ip_kept_for_forensics() { + // The issue's canonical case: a cluster pod IP becomes ns/name:port (raw-ip). + let index = IpIndex::from_snapshot(&fixture()); + assert_eq!( + index.resolve_peer("10.42.1.159:8086", false), + "analytics/influxdb-0:8086 (10.42.1.159)" + ); +} + +#[test] +fn resolve_peer_rewrites_a_service_cluster_ip() { + let index = IpIndex::from_snapshot(&fixture()); + assert_eq!( + index.resolve_peer("10.43.0.10:8086", false), + "analytics/influxdb:8086 (10.43.0.10)" + ); +} + +#[test] +fn resolve_peer_leaves_an_unknown_ip_raw() { + // An unresolvable IP must stay EXACTLY as the raw IP:port — never fabricate a name. + let index = IpIndex::from_snapshot(&fixture()); + assert_eq!(index.resolve_peer("10.99.0.1:443", false), "10.99.0.1:443"); +} + +#[test] +fn resolve_peer_leaves_internet_peers_raw() { + // internet: true peers are external egress — not resolved, kept raw (the caller's + // `internet` flag still labels them as egress downstream). + let index = IpIndex::from_snapshot(&fixture()); + // Even an IP that *happens* to be indexed isn't rewritten when flagged internet. + assert_eq!( + index.resolve_peer("10.42.1.159:8086", true), + "10.42.1.159:8086" + ); + assert_eq!(index.resolve_peer("1.2.3.4:443", true), "1.2.3.4:443"); +} + +#[test] +fn resolve_peer_leaves_a_non_ip_port_peer_untouched() { + // A peer that isn't in IP:port shape (no usable suffix) is passed through, not + // guessed at. + let index = IpIndex::from_snapshot(&fixture()); + assert_eq!(index.resolve_peer("just-a-host", false), "just-a-host"); + assert_eq!(index.resolve_peer("10.42.1.159", false), "10.42.1.159"); +} + +#[test] +fn ipv6_bracketed_peer_resolves() { + let snap = Snapshot { + pods: vec![pod(json!({ + "apiVersion": "v1", "kind": "Pod", + "metadata": {"name": "web-0", "namespace": "app"}, + "spec": {"containers": [{"name": "web", "image": "web:1"}]}, + "status": {"podIP": "fd00::1", "podIPs": [{"ip": "fd00::1"}]} + }))], + ..Default::default() + }; + let index = IpIndex::from_snapshot(&snap); + assert_eq!( + index.resolve_peer("[fd00::1]:5432", false), + "app/web-0:5432 (fd00::1)" + ); +} + +#[test] +fn headless_service_clusterip_none_is_not_indexed() { + // A headless Service carries the literal "None" — not a real address, so it must + // not land in the index (and an empty string is skipped too). + let snap = Snapshot { + services: vec![service(json!({ + "apiVersion": "v1", "kind": "Service", + "metadata": {"name": "headless", "namespace": "app"}, + "spec": {"clusterIP": "None", "clusterIPs": ["None"]} + }))], + ..Default::default() + }; + let index = IpIndex::from_snapshot(&snap); + assert!(index.is_empty(), "headless ClusterIP 'None' is not indexed"); + assert!(index.resolve("None").is_none()); +} + +#[test] +fn pod_wins_a_collision_with_a_service() { + // Pathological: a Service and a Pod sharing an IP. The concrete Pod is the more + // specific answer, so it wins (Pods are indexed after Services). + let snap = Snapshot { + pods: vec![pod(json!({ + "apiVersion": "v1", "kind": "Pod", + "metadata": {"name": "p", "namespace": "ns"}, + "spec": {"containers": [{"name": "c", "image": "i:1"}]}, + "status": {"podIP": "10.0.0.1"} + }))], + services: vec![service(json!({ + "apiVersion": "v1", "kind": "Service", + "metadata": {"name": "s", "namespace": "ns"}, + "spec": {"clusterIP": "10.0.0.1"} + }))], + ..Default::default() + }; + let index = IpIndex::from_snapshot(&snap); + assert_eq!(index.resolve("10.0.0.1").unwrap().kind, PeerKind::Pod); +} diff --git a/engine/src/engine/observe/mod.rs b/engine/src/engine/observe/mod.rs index 68f7b21..9cceffb 100644 --- a/engine/src/engine/observe/mod.rs +++ b/engine/src/engine/observe/mod.rs @@ -12,6 +12,7 @@ pub mod exec_class; pub mod exploit_intel; pub mod health; pub mod ingest_guard; +pub mod ip_index; pub mod linkerd; pub mod runtime; pub mod trivy;