stonks-oracle/services/shared/db.py
Celes Renata ebea70573b phase 0+1: project scaffold, k8s manifests, CI pipeline, steering, hooks, tests
- Repository structure for all services, infra, lakehouse, dashboards
- K8s manifests targeting stonks-oracle namespace with GHCR images
- Ingress via Traefik with ca-issuer TLS for internal services
- ConfigMap wired to existing cluster services (pg, redis, minio, ollama)
- GitHub Actions workflow for lint, test, multi-service container builds
- Dockerfile with build-arg CMD per service
- Makefile for local build/push/deploy
- Steering rules for TDD workflow, K8s conventions, project context
- Agent hooks for lint-on-save, test-on-save, k8s-validate, phase-commit
- Ruff linter config, all lint issues fixed
- 14 passing tests for schemas, config, redis keys
- PostgreSQL migrations, Trino catalogs, Superset config, MinIO lifecycle
2026-04-11 03:25:08 -07:00


"""Database connection helpers."""
import asyncpg
import redis.asyncio as aioredis
from minio import Minio

from .config import AppConfig


async def get_pg_pool(config: AppConfig) -> asyncpg.Pool:
    """Create a PostgreSQL connection pool."""
    return await asyncpg.create_pool(
        dsn=config.postgres.dsn,
        min_size=2,
        max_size=10,
    )


def get_redis(config: AppConfig) -> aioredis.Redis:
    """Create a Redis async client."""
    return aioredis.from_url(
        config.redis.url,
        decode_responses=True,
    )


def get_minio(config: AppConfig) -> Minio:
    """Create a MinIO client."""
    return Minio(
        config.minio.endpoint,
        access_key=config.minio.access_key,
        secret_key=config.minio.secret_key,
        secure=config.minio.secure,
    )
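
The helpers above only create clients; nothing in this file closes them. As a minimal sketch of how a caller might pair `get_pg_pool` with cleanup, the block below wraps pool creation in an async context manager. The names `pool_lifespan`, `FakePool`, `make_fake_pool`, and `demo` are hypothetical and not part of this repository; `FakePool` stands in for `asyncpg.Pool` so the sketch runs without a database.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator, Awaitable, Callable


@asynccontextmanager
async def pool_lifespan(
    create_pool: Callable[[], Awaitable[Any]],
) -> AsyncIterator[Any]:
    """Open a pool via create_pool() and close it on exit (hypothetical helper)."""
    pool = await create_pool()
    try:
        yield pool
    finally:
        # asyncpg.Pool.close() is a coroutine, so it must be awaited.
        await pool.close()


class FakePool:
    """Stand-in for asyncpg.Pool (assumption: only close() is needed here)."""

    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        self.closed = True


async def make_fake_pool() -> FakePool:
    # Plays the role of get_pg_pool(config) in this sketch.
    return FakePool()


async def demo() -> FakePool:
    async with pool_lifespan(make_fake_pool) as pool:
        # The pool is open and usable inside the block.
        assert not pool.closed
    return pool


demo_pool = asyncio.run(demo())
```

In a real service the factory would presumably be `lambda: get_pg_pool(config)`, so the pool is closed even when the body of the `async with` block raises.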