"""Tests for Kubernetes manifest security hardening. Validates that all deployments in infra/k8s/ follow security best practices: - Scoped secrets (no monolithic stonks-secrets) - Pod security contexts (runAsNonRoot, seccompProfile) - Container security contexts (no privilege escalation, drop ALL caps) - automountServiceAccountToken disabled - Broker secrets only on trading-tier pods """ from __future__ import annotations from pathlib import Path import yaml K8S_DIR = Path("infra/k8s") # Services that legitimately need broker secrets BROKER_SECRET_ALLOWED = {"broker-adapter", "risk-engine"} # Services that legitimately need market-data secrets MARKET_SECRET_ALLOWED = {"ingestion-worker"} def _load_deployments() -> list[tuple[str, dict]]: """Load all Deployment objects from infra/k8s/*.yaml.""" deployments = [] for path in sorted(K8S_DIR.glob("*.yaml")): with open(path) as f: for doc in yaml.safe_load_all(f): if doc and doc.get("kind") == "Deployment": name = doc["metadata"]["name"] deployments.append((name, doc)) return deployments def _get_secret_refs(spec: dict) -> list[str]: """Extract all secretRef names from a pod spec's envFrom.""" refs = [] for container in spec.get("containers", []): for env_from in container.get("envFrom", []): secret = env_from.get("secretRef", {}) if secret.get("name"): refs.append(secret["name"]) return refs class TestSecretScoping: """Verify that the monolithic stonks-secrets is no longer used.""" def test_no_monolithic_secret_ref(self): """No deployment should reference the old stonks-secrets.""" for name, dep in _load_deployments(): pod_spec = dep["spec"]["template"]["spec"] refs = _get_secret_refs(pod_spec) assert "stonks-secrets" not in refs, ( f"Deployment {name} still references monolithic stonks-secrets" ) def test_broker_secrets_only_on_trading_tier(self): """Only broker-adapter and risk-engine should have broker secrets.""" for name, dep in _load_deployments(): pod_spec = dep["spec"]["template"]["spec"] refs = _get_secret_refs(pod_spec) if "stonks-broker-secrets" in refs: assert name in BROKER_SECRET_ALLOWED, ( f"Deployment {name} has broker secrets but is not in " f"allowed set {BROKER_SECRET_ALLOWED}" ) def test_market_secrets_only_on_ingestion(self): """Only ingestion-worker should have market-data secrets.""" for name, dep in _load_deployments(): pod_spec = dep["spec"]["template"]["spec"] refs = _get_secret_refs(pod_spec) if "stonks-market-secrets" in refs: assert name in MARKET_SECRET_ALLOWED, ( f"Deployment {name} has market secrets but is not in " f"allowed set {MARKET_SECRET_ALLOWED}" ) class TestPodSecurityContext: """Verify pod-level security settings.""" def test_run_as_non_root(self): for name, dep in _load_deployments(): pod_sec = dep["spec"]["template"]["spec"].get("securityContext", {}) assert pod_sec.get("runAsNonRoot") is True, ( f"Deployment {name} missing runAsNonRoot: true" ) def test_seccomp_profile(self): for name, dep in _load_deployments(): pod_sec = dep["spec"]["template"]["spec"].get("securityContext", {}) seccomp = pod_sec.get("seccompProfile", {}) assert seccomp.get("type") == "RuntimeDefault", ( f"Deployment {name} missing seccompProfile RuntimeDefault" ) def test_automount_service_account_disabled(self): for name, dep in _load_deployments(): pod_spec = dep["spec"]["template"]["spec"] assert pod_spec.get("automountServiceAccountToken") is False, ( f"Deployment {name} should set automountServiceAccountToken: false" ) class TestContainerSecurityContext: """Verify container-level security settings.""" def test_no_privilege_escalation(self): for name, dep in _load_deployments(): for container in dep["spec"]["template"]["spec"]["containers"]: sec = container.get("securityContext", {}) assert sec.get("allowPrivilegeEscalation") is False, ( f"Deployment {name}, container {container['name']} " f"missing allowPrivilegeEscalation: false" ) def test_drop_all_capabilities(self): for name, dep in _load_deployments(): for container in dep["spec"]["template"]["spec"]["containers"]: sec = container.get("securityContext", {}) caps = sec.get("capabilities", {}) assert "ALL" in caps.get("drop", []), ( f"Deployment {name}, container {container['name']} " f"should drop ALL capabilities" ) class TestNetworkPolicies: """Verify network policy manifests exist and cover key patterns.""" def _load_netpols(self) -> list[dict]: policies = [] for path in K8S_DIR.glob("*.yaml"): with open(path) as f: for doc in yaml.safe_load_all(f): if doc and doc.get("kind") == "NetworkPolicy": policies.append(doc) return policies def test_default_deny_exists(self): policies = self._load_netpols() deny_policies = [ p for p in policies if p["metadata"]["name"] == "default-deny-ingress" ] assert len(deny_policies) == 1, "Missing default-deny-ingress NetworkPolicy" def test_broker_adapter_denied_ingress(self): policies = self._load_netpols() broker_policies = [ p for p in policies if p["spec"].get("podSelector", {}).get("matchLabels", {}).get("app") == "broker-adapter" ] assert len(broker_policies) >= 1, "Missing NetworkPolicy for broker-adapter" # Should have empty ingress (deny all inbound) for p in broker_policies: assert p["spec"].get("ingress") == [] or p["spec"].get("ingress") is None, ( "broker-adapter should deny all ingress" ) def test_risk_engine_restricted_ingress(self): policies = self._load_netpols() risk_policies = [ p for p in policies if p["spec"].get("podSelector", {}).get("matchLabels", {}).get("app") == "risk-engine" ] assert len(risk_policies) >= 1, "Missing NetworkPolicy for risk-engine" class TestSecretsManifest: """Verify the secrets manifest uses scoped secrets.""" def _load_secrets(self) -> list[dict]: secrets = [] path = K8S_DIR / "secrets.yaml" with open(path) as f: for doc in yaml.safe_load_all(f): if doc and doc.get("kind") == "Secret": secrets.append(doc) return secrets def test_scoped_secrets_exist(self): secrets = self._load_secrets() names = {s["metadata"]["name"] for s in secrets} assert "stonks-core-secrets" in names assert "stonks-broker-secrets" in names assert "stonks-market-secrets" in names assert "stonks-dashboard-secrets" in names def test_no_monolithic_secret(self): secrets = self._load_secrets() names = {s["metadata"]["name"] for s in secrets} assert "stonks-secrets" not in names, ( "Monolithic stonks-secrets should be replaced by scoped secrets" ) def test_no_plaintext_defaults(self): """Secret values should be REPLACE_ME placeholders, not real defaults.""" secrets = self._load_secrets() for secret in secrets: for key, value in secret.get("stringData", {}).items(): if value: # skip empty strings (e.g. REDIS_PASSWORD) assert value != "changeme", ( f"Secret {secret['metadata']['name']}.{key} " f"still has 'changeme' default" )