Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 102 additions & 0 deletions engine/src/engine/observe/adapter/enrich.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use petgraph::visit::EdgeRef;
use super::*;
use crate::engine::graph::{Behavior, Reachability};
use crate::engine::observe::Attribution;
use crate::engine::observe::ip_index::IpIndex;

/// Annotates Image nodes with vulnerability findings (Vulnerability port). Like
/// [`ExposureAdapter`], it enriches existing Image nodes, so it runs after the
Expand Down Expand Up @@ -45,6 +46,16 @@ impl Adapter for RuntimeAdapter {
if snapshot.runtime_events.is_empty() {
return;
}
// IP → cluster-object index, built once per pass from the SAME Pod/Service
// objects the reflector stores already hold (JEF: resolve-connection-peers). It
// turns a raw `IP:port` connection peer into the workload/service it belongs to
// (`analytics/influxdb:8086 (10.42.1.159)`) so the dashboard AND the adjudicator
// prompt — which both render `Behavior::summary()` — show *what* a pod connects
// to, not a bare IP, with NO change to either rendering site. A pure in-memory
// lookup: zero outbound calls, so the zero-egress invariant holds (we explicitly
// do NOT do reverse DNS; cluster pod IPs aren't in external DNS and a PTR lookup
// would leave the cluster).
let ip_index = IpIndex::from_snapshot(snapshot);
// UID → the Pod from the watch, so events a sensor attributed by cgroup UID (the
// eBPF agent) resolve to a workload without the agent ever touching the cluster
// API (ADR-0014). The full Pod (not just ns/name) is needed to refine a raw
Expand Down Expand Up @@ -108,6 +119,16 @@ impl Adapter for RuntimeAdapter {
continue;
}
},
// Resolve a connection peer's cluster IP to the workload/service it
// belongs to (JEF: resolve-connection-peers). `resolve_peer` keeps an
// internet/unknown/unresolvable peer exactly as the raw `IP:port`, so
// this only ever *enriches* a same-cluster pod/service peer; the resolved
// name then flows through `Behavior::summary()` to both the prompt and the
// dashboard unchanged.
Behavior::NetworkConnection { peer, internet } => Behavior::NetworkConnection {
peer: ip_index.resolve_peer(peer, *internet),
internet: *internet,
},
other => other.clone(),
};
// Carry the sensor's identity and observation time into the provenance:
Expand Down Expand Up @@ -828,6 +849,87 @@ mod tests {
);
}

/// Runs `snap` through the default adapter pipeline and returns every
/// `NetworkConnection` observation recorded on the first Workload node found, as
/// `(peer, internet)` pairs. The peer strings are returned exactly as stored on the
/// behavior, i.e. after any rewriting the adapters performed.
///
/// Panics with "workload node exists" when the graph contains no Workload node.
fn connection_peers(snap: Snapshot) -> Vec<(String, bool)> {
    let adapters = super::super::default_adapters();
    let graph = super::super::build_graph(&snap, &adapters);
    let workload = graph
        .inner()
        .node_weights()
        .find_map(|node| match node {
            Node::Workload(w) => Some(w),
            _ => None,
        })
        .expect("workload node exists");

    // Keep only the connection behaviors; every other runtime observation is ignored.
    let mut connections = Vec::new();
    for observation in workload.runtime.iter() {
        if let Behavior::NetworkConnection { peer, internet } = &observation.behavior {
            connections.push((peer.clone(), *internet));
        }
    }
    connections
}

#[test]
fn runtime_adapter_resolves_cluster_connection_peers_to_names() {
    // Scenario: app/web opens four connections — to a cluster pod
    // (analytics/influxdb-0), to a cluster service (analytics/influxdb), to a cluster
    // IP nothing in the snapshot owns, and to the internet. Expected after the
    // pipeline: the pod and service peers read `ns/name:port (raw-ip)`, while the
    // unowned IP and the internet peer are left as the raw `IP:port`.

    /// A connection observation attributed to app/web.
    fn connection(peer: &str, internet: bool) -> RuntimeObservation {
        RuntimeObservation {
            attribution: Attribution::by_namespaced_name("app", "web"),
            source: None,
            observed_at_ms: None,
            behavior: Behavior::NetworkConnection {
                peer: peer.into(),
                internet,
            },
        }
    }

    let source_pod = pod(json!({
        "apiVersion": "v1", "kind": "Pod",
        "metadata": {"name": "web", "namespace": "app", "labels": {"app": "web"}},
        "spec": {"containers": [{"name": "web", "image": "web:1"}]}
    }));
    let target_pod = pod(json!({
        "apiVersion": "v1", "kind": "Pod",
        "metadata": {"name": "influxdb-0", "namespace": "analytics"},
        "spec": {"containers": [{"name": "influxdb", "image": "influxdb:2"}]},
        "status": {"podIP": "10.42.1.159"}
    }));
    let target_service: k8s_openapi::api::core::v1::Service = serde_json::from_value(json!({
        "apiVersion": "v1", "kind": "Service",
        "metadata": {"name": "influxdb", "namespace": "analytics"},
        "spec": {"clusterIP": "10.43.0.10"}
    }))
    .expect("valid Service");

    let events = vec![
        connection("10.42.1.159:8086", false), // owned by the influxdb-0 pod
        connection("10.43.0.10:8086", false),  // the influxdb Service's ClusterIP
        connection("10.99.0.1:443", false),    // cluster-range IP with no owner
        connection("1.2.3.4:443", true),       // internet egress
    ];
    let snap = Snapshot {
        pods: vec![source_pod, target_pod],
        services: vec![target_service],
        runtime_events: events,
        ..Default::default()
    };

    // Sort so the assertion does not depend on observation order.
    let mut observed = connection_peers(snap);
    observed.sort();

    let expected: Vec<(String, bool)> = [
        ("1.2.3.4:443", true),
        ("10.99.0.1:443", false),
        ("analytics/influxdb-0:8086 (10.42.1.159)", false),
        ("analytics/influxdb:8086 (10.43.0.10)", false),
    ]
    .iter()
    .map(|&(peer, internet)| (peer.to_string(), internet))
    .collect();
    assert_eq!(observed, expected);
}

#[test]
fn cve_without_pkg_name_stays_unknown() {
// No package name to correlate against → the CVE keeps Unknown even with a load.
Expand Down
217 changes: 217 additions & 0 deletions engine/src/engine/observe/ip_index.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
//! An in-memory IP → cluster-object index (JEF: resolve-connection-peers).
//!
//! A `NetworkConnection` behavior carries a raw `IP:port` peer (e.g.
//! `10.42.1.159:8086`), which reads as opaque in the dashboard and the adjudicator
//! prompt. This index turns the IP half into the workload/service it belongs to —
//! `analytics/influxdb:8086 (10.42.1.159)` — so an operator (and the model) see
//! *what* a pod connects to.
//!
//! ## Why not reverse DNS
//! Cluster pod IPs (10.42.x.x) aren't in external DNS, and any outbound PTR lookup
//! would violate the zero-egress invariant (the security graph and evidence never
//! leave the cluster — see CLAUDE.md / docs/adr). So resolution is a *pure in-memory
//! lookup* against the Pod/Service objects the engine's reflector stores already
//! watch — `status.podIP`/`status.podIPs` on a Pod, `spec.clusterIP`/`spec.clusterIPs`
//! on a Service. No network call, ever, on the hot path: the index is built from a
//! [`Snapshot`] read and queried with [`IpIndex::resolve`] (a bare IP) or
//! [`IpIndex::resolve_peer`] (an `IP:port` peer string).
//!
//! The index lives in the engine (which has cluster access), never in the shared
//! `behavior` crate (which has none) — the wire type stays pure data.

use std::collections::HashMap;

use k8s_openapi::api::core::v1::{Pod, Service};

use super::Snapshot;

/// What an IP resolves to: the namespace + name of the owning cluster object. A pod
/// IP resolves to its pod's `namespace/name`; a service ClusterIP to the service's
/// `namespace/name`. (We don't keep the `kind` distinct in the rendered label — both
/// render as `namespace/name` — but it's tracked so a future caller can tell them
/// apart without rebuilding the index.)
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ResolvedPeer {
/// Namespace of the owning object, copied from its `metadata.namespace`.
pub namespace: String,
/// Name of the owning object, copied from its `metadata.name`.
pub name: String,
/// Whether the IP was found on a Pod or on a Service.
pub kind: PeerKind,
}

/// Which kind of cluster object an IP belongs to. Recorded on every
/// [`ResolvedPeer`]; it does not affect the rendered `namespace/name` label.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PeerKind {
/// A Pod, resolved from `status.podIP` (or an entry of `status.podIPs`).
Pod,
/// A Service, resolved from `spec.clusterIP` (or an entry of `spec.clusterIPs`).
Service,
}

impl ResolvedPeer {
    /// Renders this peer as `namespace/name`, the form spliced into a resolved peer
    /// string. Both halves come from cluster object metadata, so they are still
    /// untrusted-adjacent: the prompt/render sites are expected to sanitize the
    /// result just as they do a raw peer.
    fn label(&self) -> String {
        let Self {
            namespace, name, ..
        } = self;
        format!("{namespace}/{name}")
    }
}

/// A pure, in-memory map from a bare cluster IP to the object that owns it. Built from
/// a [`Snapshot`]'s Pods and Services (the same objects the reflector stores hold), so
/// a lookup is a hashmap probe with zero IO.
///
/// A Pod entry takes precedence over a Service entry on the rare event of a collision
/// (a ClusterIP and a podIP should never coincide, but if they did, the concrete pod
/// is the more specific answer). Pods are indexed after services so they win.
#[derive(Debug, Default, Clone)]
pub struct IpIndex {
/// Keyed by the IP exactly as the Kubernetes object reports it (no port, no
/// brackets); a later insert for the same key replaces the earlier one.
by_ip: HashMap<String, ResolvedPeer>,
}

impl IpIndex {
    /// Build the index from the cluster objects in `snapshot`.
    ///
    /// Pure: it reads `spec.clusterIP`/`spec.clusterIPs` off each Service and
    /// `spec.hostNetwork`, `status.phase` and `status.podIP`/`status.podIPs` off each
    /// Pod, and nothing else — no network, no blocking.
    pub fn from_snapshot(snapshot: &Snapshot) -> Self {
        let mut by_ip = HashMap::new();
        // Services first, then Pods, so a Pod IP wins any (pathological) collision.
        for svc in &snapshot.services {
            Self::index_service(&mut by_ip, svc);
        }
        for pod in &snapshot.pods {
            Self::index_pod(&mut by_ip, pod);
        }
        Self { by_ip }
    }

    /// Index every concrete ClusterIP of `svc`. A Service missing its namespace,
    /// name or spec contributes nothing.
    fn index_service(by_ip: &mut HashMap<String, ResolvedPeer>, svc: &Service) {
        let (Some(namespace), Some(name)) = (&svc.metadata.namespace, &svc.metadata.name) else {
            return;
        };
        let Some(spec) = &svc.spec else { return };
        // `clusterIP` is the primary; `clusterIPs` carries it plus any dual-stack
        // sibling. Index every concrete one (skip the "None" headless sentinel and
        // empty strings — a headless Service has no ClusterIP to resolve).
        let ips = spec
            .cluster_ip
            .iter()
            .chain(spec.cluster_ips.iter().flatten());
        for ip in ips {
            if is_resolvable_ip(ip) {
                by_ip.insert(
                    ip.clone(),
                    ResolvedPeer {
                        namespace: namespace.clone(),
                        name: name.clone(),
                        kind: PeerKind::Service,
                    },
                );
            }
        }
    }

    /// Index every concrete IP of `pod`, provided the IP identifies that pod alone.
    ///
    /// Skipped entirely:
    /// - a Pod missing its namespace, name or status;
    /// - a `hostNetwork` Pod — it reports its *node's* address as `status.podIP`,
    ///   so every host-network pod on that node shares the key and a connection to
    ///   the node would be labelled as whichever of them happened to be indexed last;
    /// - a Pod in a terminal phase (`Succeeded`/`Failed`) — its recorded IP may
    ///   already belong to a different, live pod, which it must not shadow.
    ///
    /// In both skipped cases the peer stays a raw `IP:port`, which is the documented
    /// fallback: we never fabricate a name.
    fn index_pod(by_ip: &mut HashMap<String, ResolvedPeer>, pod: &Pod) {
        let (Some(namespace), Some(name)) = (&pod.metadata.namespace, &pod.metadata.name) else {
            return;
        };
        let shares_node_network = pod
            .spec
            .as_ref()
            .and_then(|spec| spec.host_network)
            .unwrap_or(false);
        if shares_node_network {
            return;
        }
        let Some(status) = &pod.status else { return };
        // An absent phase is treated as "not terminal" so a sparsely-populated Pod
        // object is still indexed.
        if matches!(status.phase.as_deref(), Some("Succeeded" | "Failed")) {
            return;
        }
        // `podIP` is the primary address; `podIPs` carries it plus any dual-stack
        // sibling. Index every concrete one so either family resolves.
        let ips = status
            .pod_ip
            .iter()
            .chain(status.pod_ips.iter().flatten().map(|p| &p.ip));
        for ip in ips {
            if is_resolvable_ip(ip) {
                by_ip.insert(
                    ip.clone(),
                    ResolvedPeer {
                        namespace: namespace.clone(),
                        name: name.clone(),
                        kind: PeerKind::Pod,
                    },
                );
            }
        }
    }

    /// Resolve a bare IP to the object that owns it, or `None` if unknown. Pure
    /// hashmap probe — no IO. The lookup is an exact string match on the IP as the
    /// cluster object reported it.
    pub fn resolve(&self, ip: &str) -> Option<&ResolvedPeer> {
        self.by_ip.get(ip)
    }

    /// Rewrite a `NetworkConnection` peer string into its resolved cluster form, or
    /// return it unchanged when it can't be resolved.
    ///
    /// Rules (JEF: resolve-connection-peers):
    /// - `internet` peers are left as the raw `IP:port` — they're external egress, not
    ///   a cluster object, so there's nothing in-cluster to resolve to (the caller's
    ///   `internet` flag still labels them as egress downstream).
    /// - A same-cluster pod or service IP becomes
    ///   `namespace/name:port (raw-ip)` — the resolved name, the original port, and the
    ///   raw IP kept in parens for forensics.
    /// - An unknown / unresolvable IP, or a peer not in `IP:port` shape, is left
    ///   exactly as given — we never fabricate a name.
    ///
    /// Deterministic and pure given the index.
    pub fn resolve_peer(&self, peer: &str, internet: bool) -> String {
        if internet {
            // External egress — nothing in-cluster to resolve to; keep it raw.
            return peer.to_string();
        }
        let Some((ip, port)) = split_ip_port(peer) else {
            // Not in `IP:port` shape — leave it untouched rather than guess.
            return peer.to_string();
        };
        match self.resolve(ip) {
            Some(resolved) => format!("{}:{port} ({ip})", resolved.label()),
            None => peer.to_string(),
        }
    }

    /// The number of indexed IPs — for observability/tests.
    pub fn len(&self) -> usize {
        self.by_ip.len()
    }

    /// Whether no IP has been indexed at all.
    pub fn is_empty(&self) -> bool {
        self.by_ip.is_empty()
    }
}

/// Returns `true` when `ip` is worth indexing, i.e. it is neither the empty string
/// nor the literal `"None"` that Kubernetes uses as a headless Service's ClusterIP.
/// No further validation is done: any other string is accepted as-is.
fn is_resolvable_ip(ip: &str) -> bool {
    !matches!(ip, "" | "None")
}

/// Breaks an `IP:port` peer string into its `(ip, port)` halves, borrowing from
/// `peer`.
///
/// Two shapes are understood:
/// - bracketed IPv6, `[fd00::1]:8086` — the address is whatever sits between `[` and
///   the first `]`, and the port is what follows the `:` right after that bracket, so
///   the address's own colons are never touched;
/// - anything else, e.g. IPv4 `10.42.1.159:8086` — split on the *last* colon.
///
/// Returns `None` when there is no `:port` suffix, or when either half would be
/// empty, so the caller can leave the peer untouched.
fn split_ip_port(peer: &str) -> Option<(&str, &str)> {
    let (ip, port) = match peer.strip_prefix('[') {
        Some(bracketed) => {
            let close = bracketed.find(']')?;
            let addr = &bracketed[..close];
            // `]` is a single byte, so `close + 1` is the start of the remainder.
            let tail = &bracketed[close + 1..];
            (addr, tail.strip_prefix(':')?)
        }
        None => peer.rsplit_once(':')?,
    };
    (!ip.is_empty() && !port.is_empty()).then_some((ip, port))
}

#[cfg(test)]
mod tests;
Loading