213 lines
8.2 KiB
Python
213 lines
8.2 KiB
Python
"""Tests for Kubernetes manifest security hardening.
|
|
|
|
Validates that all deployments in infra/k8s/ follow security best practices:
|
|
- Scoped secrets (no monolithic stonks-secrets)
|
|
- Pod security contexts (runAsNonRoot, seccompProfile)
|
|
- Container security contexts (no privilege escalation, drop ALL caps)
|
|
- automountServiceAccountToken disabled
|
|
- Broker secrets only on trading-tier pods
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import glob
|
|
from pathlib import Path
|
|
|
|
import yaml
|
|
|
|
|
|
# Directory containing the Kubernetes manifests under test; it is relative,
# so it resolves against the working directory the tests are run from.
K8S_DIR = Path("infra", "k8s")

# Deployment names that may reference `stonks-broker-secrets`.
BROKER_SECRET_ALLOWED = {
    "broker-adapter",
    "risk-engine",
}

# Deployment names that may reference `stonks-market-secrets`.
MARKET_SECRET_ALLOWED = {
    "ingestion-worker",
}
|
|
|
|
|
|
def _load_deployments() -> list[tuple[str, dict]]:
    """Load all Deployment objects from ``infra/k8s/*.yaml``.

    Files are visited in sorted path order and each file may hold several
    YAML documents. Only documents that are mappings with
    ``kind: Deployment`` are kept.

    Returns:
        A list of ``(metadata.name, manifest)`` pairs, in file order and then
        document order.

    Raises:
        KeyError: If a Deployment document has no ``metadata.name``.
    """
    deployments: list[tuple[str, dict]] = []
    for path in sorted(K8S_DIR.glob("*.yaml")):
        # Decode explicitly as UTF-8 so parsing does not depend on the
        # platform's locale encoding.
        with open(path, encoding="utf-8") as f:
            for doc in yaml.safe_load_all(f):
                # Empty documents load as None, and a document may also be a
                # list or scalar; only mappings can describe a Deployment.
                if isinstance(doc, dict) and doc.get("kind") == "Deployment":
                    deployments.append((doc["metadata"]["name"], doc))
    return deployments
|
|
|
|
|
|
def _get_secret_refs(spec: dict) -> list[str]:
|
|
"""Extract all secretRef names from a pod spec's envFrom."""
|
|
refs = []
|
|
for container in spec.get("containers", []):
|
|
for env_from in container.get("envFrom", []):
|
|
secret = env_from.get("secretRef", {})
|
|
if secret.get("name"):
|
|
refs.append(secret["name"])
|
|
return refs
|
|
|
|
|
|
class TestSecretScoping:
    """Check which secrets each deployment pulls in through ``envFrom``."""

    @staticmethod
    def _deployment_secret_refs():
        """Lazily yield ``(deployment name, secretRef names)`` per deployment."""
        for dep_name, manifest in _load_deployments():
            yield dep_name, _get_secret_refs(manifest["spec"]["template"]["spec"])

    def test_no_monolithic_secret_ref(self):
        """The legacy ``stonks-secrets`` must not be referenced anywhere."""
        for dep_name, secret_names in self._deployment_secret_refs():
            uses_monolith = "stonks-secrets" in secret_names
            assert not uses_monolith, (
                f"Deployment {dep_name} still references monolithic stonks-secrets"
            )

    def test_broker_secrets_only_on_trading_tier(self):
        """Broker secrets are limited to names in ``BROKER_SECRET_ALLOWED``."""
        for dep_name, secret_names in self._deployment_secret_refs():
            if "stonks-broker-secrets" not in secret_names:
                continue
            assert dep_name in BROKER_SECRET_ALLOWED, (
                f"Deployment {dep_name} has broker secrets but is not in "
                f"allowed set {BROKER_SECRET_ALLOWED}"
            )

    def test_market_secrets_only_on_ingestion(self):
        """Market secrets are limited to names in ``MARKET_SECRET_ALLOWED``."""
        for dep_name, secret_names in self._deployment_secret_refs():
            if "stonks-market-secrets" not in secret_names:
                continue
            assert dep_name in MARKET_SECRET_ALLOWED, (
                f"Deployment {dep_name} has market secrets but is not in "
                f"allowed set {MARKET_SECRET_ALLOWED}"
            )
|
|
|
|
|
|
class TestPodSecurityContext:
    """Pod-level hardening checks applied to every loaded deployment."""

    def test_run_as_non_root(self):
        """The pod securityContext must set ``runAsNonRoot`` to exactly True."""
        for dep_name, manifest in _load_deployments():
            pod_spec = manifest["spec"]["template"]["spec"]
            context = pod_spec.get("securityContext", {})
            non_root = context.get("runAsNonRoot")
            assert non_root is True, (
                f"Deployment {dep_name} missing runAsNonRoot: true"
            )

    def test_seccomp_profile(self):
        """The pod seccompProfile type must be ``RuntimeDefault``."""
        for dep_name, manifest in _load_deployments():
            pod_spec = manifest["spec"]["template"]["spec"]
            context = pod_spec.get("securityContext", {})
            profile_type = context.get("seccompProfile", {}).get("type")
            assert profile_type == "RuntimeDefault", (
                f"Deployment {dep_name} missing seccompProfile RuntimeDefault"
            )

    def test_automount_service_account_disabled(self):
        """``automountServiceAccountToken`` must be explicitly False."""
        for dep_name, manifest in _load_deployments():
            automount = manifest["spec"]["template"]["spec"].get(
                "automountServiceAccountToken"
            )
            # An absent key yields None, which fails the identity check too.
            assert automount is False, (
                f"Deployment {dep_name} should set automountServiceAccountToken: false"
            )
|
|
|
|
|
|
class TestContainerSecurityContext:
    """Verify container-level security settings.

    The checks cover regular containers and init containers alike. Each
    container carries its own ``securityContext``, so an unhardened init
    container would otherwise slip past these tests.
    """

    @staticmethod
    def _iter_containers():
        """Yield ``(deployment name, container)`` for every container.

        Regular containers come first, then any ``initContainers``. A pod spec
        without ``containers`` raises ``KeyError``.
        """
        for name, dep in _load_deployments():
            pod_spec = dep["spec"]["template"]["spec"]
            for container in pod_spec["containers"]:
                yield name, container
            # `initContainers` is optional and may be present but null.
            for container in pod_spec.get("initContainers") or []:
                yield name, container

    def test_no_privilege_escalation(self):
        """Every container must set ``allowPrivilegeEscalation: false``."""
        for name, container in self._iter_containers():
            # `or {}`: a null securityContext should fail the assertion below
            # rather than raise AttributeError.
            sec = container.get("securityContext") or {}
            assert sec.get("allowPrivilegeEscalation") is False, (
                f"Deployment {name}, container {container['name']} "
                f"missing allowPrivilegeEscalation: false"
            )

    def test_drop_all_capabilities(self):
        """Every container must list ``ALL`` under ``capabilities.drop``."""
        for name, container in self._iter_containers():
            sec = container.get("securityContext") or {}
            caps = sec.get("capabilities") or {}
            dropped = caps.get("drop") or []
            assert "ALL" in dropped, (
                f"Deployment {name}, container {container['name']} "
                f"should drop ALL capabilities"
            )
|
|
|
|
|
|
class TestNetworkPolicies:
    """Check that the expected NetworkPolicy manifests are present."""

    def _load_netpols(self) -> list[dict]:
        """Collect every NetworkPolicy document from ``K8S_DIR/*.yaml``."""
        found = []
        for manifest_path in K8S_DIR.glob("*.yaml"):
            with open(manifest_path) as handle:
                # The generator is consumed here, while the file is still open.
                found.extend(
                    doc
                    for doc in yaml.safe_load_all(handle)
                    if doc and doc.get("kind") == "NetworkPolicy"
                )
        return found

    def _policies_selecting(self, app_label):
        """Return policies whose podSelector matchLabels ``app`` equals *app_label*."""
        selected = []
        for policy in self._load_netpols():
            labels = policy["spec"].get("podSelector", {}).get("matchLabels", {})
            if labels.get("app") == app_label:
                selected.append(policy)
        return selected

    def test_default_deny_exists(self):
        """Exactly one policy must be named ``default-deny-ingress``."""
        matches = sum(
            1
            for policy in self._load_netpols()
            if policy["metadata"]["name"] == "default-deny-ingress"
        )
        assert matches == 1, "Missing default-deny-ingress NetworkPolicy"

    def test_broker_adapter_denied_ingress(self):
        """Policies selecting broker-adapter must carry no ingress rules."""
        broker_policies = self._policies_selecting("broker-adapter")
        assert broker_policies, "Missing NetworkPolicy for broker-adapter"
        # Should have empty ingress (deny all inbound)
        # NOTE(review): a missing `ingress` key is accepted without looking at
        # `policyTypes` - confirm that is the intended strictness.
        for policy in broker_policies:
            ingress_rules = policy["spec"].get("ingress")
            assert ingress_rules is None or ingress_rules == [], (
                "broker-adapter should deny all ingress"
            )

    def test_risk_engine_restricted_ingress(self):
        """At least one policy must select the risk-engine pods."""
        risk_policies = self._policies_selecting("risk-engine")
        assert risk_policies, "Missing NetworkPolicy for risk-engine"
|
|
|
|
|
|
class TestSecretsManifest:
    """Checks on the Secret objects declared in ``secrets.yaml``."""

    def _load_secrets(self) -> list[dict]:
        """Return every ``kind: Secret`` document from ``K8S_DIR/secrets.yaml``."""
        with open(K8S_DIR / "secrets.yaml") as handle:
            # The list is built before the file is closed.
            return [
                doc
                for doc in yaml.safe_load_all(handle)
                if doc and doc.get("kind") == "Secret"
            ]

    def _secret_names(self):
        """Return the set of ``metadata.name`` values of the loaded secrets."""
        return {doc["metadata"]["name"] for doc in self._load_secrets()}

    def test_scoped_secrets_exist(self):
        """All four scoped secrets must be declared."""
        declared = self._secret_names()
        for expected in (
            "stonks-core-secrets",
            "stonks-broker-secrets",
            "stonks-market-secrets",
            "stonks-dashboard-secrets",
        ):
            assert expected in declared

    def test_no_monolithic_secret(self):
        """The legacy ``stonks-secrets`` object must not be declared."""
        has_monolith = "stonks-secrets" in self._secret_names()
        assert not has_monolith, (
            "Monolithic stonks-secrets should be replaced by scoped secrets"
        )

    def test_no_plaintext_defaults(self):
        """No non-empty ``stringData`` value may be the literal ``changeme``.

        NOTE(review): the intent appears to be that values are REPLACE_ME
        placeholders, but only ``changeme`` is rejected here - confirm.
        """
        for secret in self._load_secrets():
            for key, value in secret.get("stringData", {}).items():
                if not value:
                    # skip empty strings (e.g. REDIS_PASSWORD)
                    continue
                assert value != "changeme", (
                    f"Secret {secret['metadata']['name']}.{key} "
                    f"still has 'changeme' default"
                )
|