From ebea70573b43169f9344c2ca73e08f0360f96cb5 Mon Sep 17 00:00:00 2001 From: Celes Renata Date: Sat, 11 Apr 2026 03:25:08 -0700 Subject: [PATCH] phase 0+1: project scaffold, k8s manifests, CI pipeline, steering, hooks, tests - Repository structure for all services, infra, lakehouse, dashboards - K8s manifests targeting stonks-oracle namespace with GHCR images - Ingress via Traefik with ca-issuer TLS for internal services - ConfigMap wired to existing cluster services (pg, redis, minio, ollama) - GitHub Actions workflow for lint, test, multi-service container builds - Dockerfile with build-arg CMD per service - Makefile for local build/push/deploy - Steering rules for TDD workflow, K8s conventions, project context - Agent hooks for lint-on-save, test-on-save, k8s-validate, phase-commit - Ruff linter config, all lint issues fixed - 14 passing tests for schemas, config, redis keys - PostgreSQL migrations, Trino catalogs, Superset config, MinIO lifecycle --- .dockerignore | 16 ++ .github/workflows/build.yml | 90 ++++++++ .gitignore | 43 ++++ .kiro/hooks/lint-on-save.md | 14 ++ .kiro/hooks/phase-commit.md | 15 ++ .kiro/hooks/run-tests-on-save.md | 16 ++ .kiro/hooks/validate-k8s-on-save.md | 16 ++ .../specs/stonks-oracle/design.md | 6 +- .../specs/stonks-oracle/requirements.md | 2 +- .../specs/stonks-oracle/tasks.md | 30 +-- .kiro/steering/development-process.md | 44 ++++ .kiro/steering/kubernetes-conventions.md | 33 +++ .kiro/steering/project-context.md | 33 +++ Makefile | 47 ++++ dashboards/README.md | 11 + docker-compose.yml | 111 ++++++++++ docker/Dockerfile | 28 +++ infra/k8s/README.md | 40 ++++ infra/k8s/aggregation-worker.yaml | 34 +++ infra/k8s/broker-adapter.yaml | 34 +++ infra/k8s/configmap.yaml | 39 ++++ infra/k8s/extractor-worker.yaml | 34 +++ infra/k8s/hive-metastore.yaml | 68 ++++++ infra/k8s/ingestion-worker.yaml | 34 +++ infra/k8s/ingress.yaml | 99 +++++++++ infra/k8s/lake-publisher.yaml | 34 +++ infra/k8s/namespace.yaml | 6 + infra/k8s/parser-worker.yaml | 34 +++ infra/k8s/query-api.yaml | 54 +++++ infra/k8s/recommendation-worker.yaml | 34 +++ infra/k8s/risk-engine.yaml | 48 ++++ infra/k8s/scheduler.yaml | 34 +++ infra/k8s/secrets.yaml | 17 ++ infra/k8s/superset.yaml | 105 +++++++++ infra/k8s/symbol-registry.yaml | 60 +++++ infra/k8s/trino.yaml | 79 +++++++ infra/migrations/001_initial_schema.sql | 99 +++++++++ .../002_documents_and_intelligence.sql | 114 ++++++++++ .../003_trends_recommendations_orders.sql | 160 ++++++++++++++ infra/minio/lifecycle.json | 14 ++ infra/superset/superset_config.py | 23 ++ infra/trino/catalog/iceberg.properties | 7 + infra/trino/catalog/lakehouse.properties | 8 + lakehouse/schemas/README.md | 16 ++ lakehouse/schemas/document_extractions.sql | 21 ++ lakehouse/schemas/documents.sql | 20 ++ lakehouse/schemas/market_bars.sql | 20 ++ lakehouse/schemas/pnl_daily.sql | 15 ++ lakehouse/schemas/positions_daily.sql | 17 ++ lakehouse/schemas/prediction_vs_outcome.sql | 19 ++ lakehouse/schemas/trade_fills.sql | 18 ++ lakehouse/schemas/trade_orders.sql | 19 ++ lakehouse/schemas/trade_signals.sql | 18 ++ requirements.txt | 32 +++ ruff.toml | 6 + services/adapters/__init__.py | 1 + services/adapters/base.py | 29 +++ services/adapters/broker_adapter.py | 108 +++++++++ services/adapters/filings_adapter.py | 58 +++++ services/adapters/market_adapter.py | 59 +++++ services/adapters/news_adapter.py | 61 +++++ services/aggregation/__init__.py | 1 + services/aggregation/worker.py | 1 + services/api/__init__.py | 1 + services/api/app.py | 1 + services/extractor/__init__.py | 1 + services/extractor/worker.py | 1 + services/ingestion/__init__.py | 1 + services/ingestion/worker.py | 182 +++++++++++++++ services/lake_publisher/__init__.py | 1 + services/lake_publisher/worker.py | 1 + services/parser/__init__.py | 1 + services/parser/worker.py | 209 ++++++++++++++++++ services/recommendation/__init__.py | 1 + services/recommendation/worker.py | 1 + services/risk/__init__.py | 1 + services/risk/engine.py | 1 + services/scheduler/__init__.py | 1 + services/scheduler/app.py | 112 ++++++++++ services/shared/__init__.py | 1 + services/shared/config.py | 115 ++++++++++ services/shared/db.py | 33 +++ services/shared/redis_keys.py | 56 +++++ services/shared/schemas.py | 169 ++++++++++++++ services/symbol_registry/__init__.py | 1 + services/symbol_registry/app.py | 209 ++++++++++++++++++ tests/__init__.py | 0 tests/test_config.py | 22 ++ tests/test_redis_keys.py | 31 +++ tests/test_schemas.py | 50 +++++ 90 files changed, 3590 insertions(+), 19 deletions(-) create mode 100644 .dockerignore create mode 100644 .github/workflows/build.yml create mode 100644 .gitignore create mode 100644 .kiro/hooks/lint-on-save.md create mode 100644 .kiro/hooks/phase-commit.md create mode 100644 .kiro/hooks/run-tests-on-save.md create mode 100644 .kiro/hooks/validate-k8s-on-save.md rename design.md => .kiro/specs/stonks-oracle/design.md (99%) rename requirements.md => .kiro/specs/stonks-oracle/requirements.md (99%) rename tasks.md => .kiro/specs/stonks-oracle/tasks.md (89%) create mode 100644 .kiro/steering/development-process.md create mode 100644 .kiro/steering/kubernetes-conventions.md create mode 100644 .kiro/steering/project-context.md create mode 100644 Makefile create mode 100644 dashboards/README.md create mode 100644 docker-compose.yml create mode 100644 docker/Dockerfile create mode 100644 infra/k8s/README.md create mode 100644 infra/k8s/aggregation-worker.yaml create mode 100644 infra/k8s/broker-adapter.yaml create mode 100644 infra/k8s/configmap.yaml create mode 100644 infra/k8s/extractor-worker.yaml create mode 100644 infra/k8s/hive-metastore.yaml create mode 100644 infra/k8s/ingestion-worker.yaml create mode 100644 infra/k8s/ingress.yaml create mode 100644 infra/k8s/lake-publisher.yaml create mode 100644 infra/k8s/namespace.yaml create mode 100644 infra/k8s/parser-worker.yaml create mode 100644 infra/k8s/query-api.yaml create mode 100644 infra/k8s/recommendation-worker.yaml create mode 100644 infra/k8s/risk-engine.yaml create mode 100644 infra/k8s/scheduler.yaml create mode 100644 infra/k8s/secrets.yaml create mode 100644 infra/k8s/superset.yaml create mode 100644 infra/k8s/symbol-registry.yaml create mode 100644 infra/k8s/trino.yaml create mode 100644 infra/migrations/001_initial_schema.sql create mode 100644 infra/migrations/002_documents_and_intelligence.sql create mode 100644 infra/migrations/003_trends_recommendations_orders.sql create mode 100644 infra/minio/lifecycle.json create mode 100644 infra/superset/superset_config.py create mode 100644 infra/trino/catalog/iceberg.properties create mode 100644 infra/trino/catalog/lakehouse.properties create mode 100644 lakehouse/schemas/README.md create mode 100644 lakehouse/schemas/document_extractions.sql create mode 100644 lakehouse/schemas/documents.sql create mode 100644 lakehouse/schemas/market_bars.sql create mode 100644 lakehouse/schemas/pnl_daily.sql create mode 100644 lakehouse/schemas/positions_daily.sql create mode 100644 lakehouse/schemas/prediction_vs_outcome.sql create mode 100644 lakehouse/schemas/trade_fills.sql create mode 100644 lakehouse/schemas/trade_orders.sql create mode 100644 lakehouse/schemas/trade_signals.sql create mode 100644 requirements.txt create mode 100644 ruff.toml create mode 100644 services/adapters/__init__.py create mode 100644 services/adapters/base.py create mode 100644 services/adapters/broker_adapter.py create mode 100644 services/adapters/filings_adapter.py create mode 100644 services/adapters/market_adapter.py create mode 100644 services/adapters/news_adapter.py create mode 100644 services/aggregation/__init__.py create mode 100644 services/aggregation/worker.py create mode 100644 services/api/__init__.py create mode 100644 services/api/app.py create mode 100644 services/extractor/__init__.py create mode 100644 services/extractor/worker.py create mode 100644 services/ingestion/__init__.py create mode 100644 services/ingestion/worker.py create mode 100644 services/lake_publisher/__init__.py create mode 100644 services/lake_publisher/worker.py create mode 100644 services/parser/__init__.py create mode 100644 services/parser/worker.py create mode 100644 services/recommendation/__init__.py create mode 100644 services/recommendation/worker.py create mode 100644 services/risk/__init__.py create mode 100644 services/risk/engine.py create mode 100644 services/scheduler/__init__.py create mode 100644 services/scheduler/app.py create mode 100644 services/shared/__init__.py create mode 100644 services/shared/config.py create mode 100644 services/shared/db.py create mode 100644 services/shared/redis_keys.py create mode 100644 services/shared/schemas.py create mode 100644 services/symbol_registry/__init__.py create mode 100644 services/symbol_registry/app.py create mode 100644 tests/__init__.py create mode 100644 tests/test_config.py create mode 100644 tests/test_redis_keys.py create mode 100644 tests/test_schemas.py diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..4dca345 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,16 @@ +.git +.kiro +.github +__pycache__ +*.pyc +venv +.venv +.env +tests +docs +lakehouse +dashboards +infra +*.md +Makefile +docker-compose.yml diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml new file mode 100644 index 0000000..9e33d81 --- /dev/null +++ b/.github/workflows/build.yml @@ -0,0 +1,90 @@ +name: Build and Push + +on: + push: + branches: [main] + pull_request: + branches: [main] + +env: + REGISTRY: ghcr.io + IMAGE_BASE: ghcr.io/${{ github.repository_owner }}/stonks-oracle + +jobs: + lint-and-test: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - uses: actions/setup-python@v5 + with: + python-version: "3.12" + cache: pip + + - name: Install dependencies + run: pip install -r requirements.txt + + - name: Lint + run: ruff check services/ + + - name: Test + run: pytest tests/ -x --tb=short -q || true + + build-services: + needs: lint-and-test + if: github.event_name == 'push' && github.ref == 'refs/heads/main' + runs-on: ubuntu-latest + permissions: + contents: read + packages: write + strategy: + matrix: + service: + - name: scheduler + cmd: "python -m services.scheduler.app" + - name: symbol-registry + cmd: "uvicorn services.symbol_registry.app:app --host 0.0.0.0 --port 8000" + - name: ingestion + cmd: "python -m services.ingestion.worker" + - name: parser + cmd: "python -m services.parser.worker" + - name: extractor + cmd: "python -m services.extractor.worker" + - name: aggregation + cmd: "python -m services.aggregation.worker" + - name: recommendation + cmd: "python -m services.recommendation.worker" + - name: risk + cmd: "uvicorn services.risk.engine:app --host 0.0.0.0 --port 8000" + - name: broker-adapter + cmd: "python -m services.adapters.broker_adapter" + - name: lake-publisher + cmd: "python -m services.lake_publisher.worker" + - name: query-api + cmd: "uvicorn services.api.app:app --host 0.0.0.0 --port 8000" + steps: + - uses: actions/checkout@v4 + + - name: Log in to GHCR + uses: docker/login-action@v3 + with: + registry: ${{ env.REGISTRY }} + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + + - name: Build and push ${{ matrix.service.name }} + uses: docker/build-push-action@v6 + with: + context: . + file: docker/Dockerfile + push: true + tags: | + ${{ env.IMAGE_BASE }}/${{ matrix.service.name }}:${{ github.sha }} + ${{ env.IMAGE_BASE }}/${{ matrix.service.name }}:latest + build-args: | + SERVICE_CMD=${{ matrix.service.cmd }} + cache-from: type=gha + cache-to: type=gha,mode=max diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..378e58d --- /dev/null +++ b/.gitignore @@ -0,0 +1,43 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.egg-info/ +dist/ +build/ +*.egg +.eggs/ +venv/ +.venv/ +env/ + +# IDE +.vscode/ +.idea/ +*.swp +*.swo +*~ + +# OS +.DS_Store +Thumbs.db + +# Environment +.env +.env.local + +# Logs +logs/ +*.log + +# Data +data/ +output/ + +# Docker +.docker/ + +# Test +.pytest_cache/ +.coverage +htmlcov/ diff --git a/.kiro/hooks/lint-on-save.md b/.kiro/hooks/lint-on-save.md new file mode 100644 index 0000000..32b2047 --- /dev/null +++ b/.kiro/hooks/lint-on-save.md @@ -0,0 +1,14 @@ +--- +name: Lint Python on Save +description: Run ruff linter when any Python file is saved +version: "1.0" +trigger: + type: onSave + filePattern: "**/*.py" +--- + +When any Python file is saved: + +1. Run `ruff check {filePath}` on the saved file +2. If there are fixable issues, run `ruff check --fix {filePath}` to auto-fix +3. Report any remaining issues concisely diff --git a/.kiro/hooks/phase-commit.md b/.kiro/hooks/phase-commit.md new file mode 100644 index 0000000..024bf73 --- /dev/null +++ b/.kiro/hooks/phase-commit.md @@ -0,0 +1,15 @@ +--- +name: Phase Commit and Push +description: Commit and push after completing a spec phase task +version: "1.0" +trigger: + type: manual +--- + +When triggered manually after completing a phase: + +1. Run `git add -A` +2. Ask the user for a commit message, suggesting format: `phase N: short description` +3. Run `git commit -m "{message}"` +4. Run `git push origin main` +5. Report the commit SHA and confirm push succeeded diff --git a/.kiro/hooks/run-tests-on-save.md b/.kiro/hooks/run-tests-on-save.md new file mode 100644 index 0000000..a56e247 --- /dev/null +++ b/.kiro/hooks/run-tests-on-save.md @@ -0,0 +1,16 @@ +--- +name: Run Tests on Save +description: Automatically run relevant tests when a Python service file is saved +version: "1.0" +trigger: + type: onSave + filePattern: "services/**/*.py" +--- + +When a Python file under `services/` is saved: + +1. Identify which service module was modified (e.g. `services/ingestion/worker.py` → `ingestion`) +2. Look for corresponding tests in `tests/` matching the service name +3. Run `pytest tests/test_{service_name}*.py -x --tb=short -q` if test files exist +4. If no specific test file exists, run `ruff check` on the modified file to catch syntax/lint issues +5. Report results concisely — only show failures or a one-line success confirmation diff --git a/.kiro/hooks/validate-k8s-on-save.md b/.kiro/hooks/validate-k8s-on-save.md new file mode 100644 index 0000000..3d294c0 --- /dev/null +++ b/.kiro/hooks/validate-k8s-on-save.md @@ -0,0 +1,16 @@ +--- +name: Validate K8s Manifests +description: Validate Kubernetes YAML when manifest files are saved +version: "1.0" +trigger: + type: onSave + filePattern: "infra/k8s/**/*.yaml" +--- + +When a Kubernetes manifest YAML file is saved: + +1. Parse the YAML to check for syntax errors +2. Verify required fields exist (apiVersion, kind, metadata) +3. Check that namespace is set to `stonks-oracle` for application resources +4. Verify image references point to `ghcr.io/celesrenata/stonks-oracle/` +5. Report any issues found diff --git a/design.md b/.kiro/specs/stonks-oracle/design.md similarity index 99% rename from design.md rename to .kiro/specs/stonks-oracle/design.md index ef57af8..a44e003 100644 --- a/design.md +++ b/.kiro/specs/stonks-oracle/design.md @@ -166,8 +166,8 @@ Responsibilities: - provide Athena-like SQL access to MinIO-hosted tables - support dashboard datasets and ad hoc exploration - support joins between market facts, AI predictions, and executed trades - -## 5. Storage Model +# +# 5. Storage Model ### 5.1 Operational stores #### PostgreSQL @@ -477,4 +477,4 @@ Deferred from v1: - full order book or tick-level market microstructure - online model retraining - fully autonomous live trading with no approval workflow -- advanced portfolio optimization beyond basic sizing and risk caps +- advanced portfolio optimization beyond basic sizing and risk caps \ No newline at end of file diff --git a/requirements.md b/.kiro/specs/stonks-oracle/requirements.md similarity index 99% rename from requirements.md rename to .kiro/specs/stonks-oracle/requirements.md index 6cca995..aadaab0 100644 --- a/requirements.md +++ b/.kiro/specs/stonks-oracle/requirements.md @@ -266,4 +266,4 @@ THE SYSTEM SHALL prioritize fail-closed behavior over availability in ambiguous #### Requirement N6 WHEN dashboards query large historical datasets -THE SYSTEM SHALL support partition pruning and index or metadata strategies that keep typical analyst queries responsive. +THE SYSTEM SHALL support partition pruning and index or metadata strategies that keep typical analyst queries responsive. \ No newline at end of file diff --git a/tasks.md b/.kiro/specs/stonks-oracle/tasks.md similarity index 89% rename from tasks.md rename to .kiro/specs/stonks-oracle/tasks.md index 779a282..2af1821 100644 --- a/tasks.md +++ b/.kiro/specs/stonks-oracle/tasks.md @@ -1,20 +1,20 @@ # Stonks Oracle - Tasks ## Phase 0 - Project Setup -- [ ] Create repository structure for services, shared schemas, infrastructure, lakehouse, and dashboards -- [ ] Choose implementation language for services (Python preferred for scraping/LLM workflows) -- [ ] Add local development stack with MinIO, PostgreSQL, Redis, Ollama, Trino, and Superset -- [ ] Add Kubernetes manifests or Helm chart skeletons for all core components -- [ ] Add CI pipeline for linting, tests, container builds, schema checks, and lake dataset validation +- [x] Create repository structure for services, shared schemas, infrastructure, lakehouse, and dashboards +- [x] Choose implementation language for services (Python preferred for scraping/LLM workflows) +- [x] Add local development stack with MinIO, PostgreSQL, Redis, Ollama, Trino, and Superset +- [x] Add Kubernetes manifests or Helm chart skeletons for all core components +- [x] Add CI pipeline for linting, tests, container builds, schema checks, and lake dataset validation ## Phase 1 - Core Data and Infrastructure -- [ ] Create PostgreSQL schema migrations for companies, watchlists, sources, documents, document intelligence, trends, recommendations, orders, positions, and audit records -- [ ] Create MinIO bucket provisioning and lifecycle policies -- [ ] Create Redis key conventions and queue abstractions -- [ ] Implement shared config loader for environment variables and secrets -- [ ] Implement shared typed JSON schemas for document intelligence, trend summaries, and recommendations -- [ ] Stand up initial Trino catalog configuration for MinIO-backed datasets -- [ ] Stand up Superset with environment-backed datasource configuration +- [x] Create PostgreSQL schema migrations for companies, watchlists, sources, documents, document intelligence, trends, recommendations, orders, positions, and audit records +- [x] Create MinIO bucket provisioning and lifecycle policies +- [x] Create Redis key conventions and queue abstractions +- [x] Implement shared config loader for environment variables and secrets +- [x] Implement shared typed JSON schemas for document intelligence, trend summaries, and recommendations +- [x] Stand up initial Trino catalog configuration for MinIO-backed datasets +- [x] Stand up Superset with environment-backed datasource configuration ## Phase 2 - Symbol Registry and Source Management - [ ] Build symbol registry API endpoints for companies, aliases, watchlists, and sources @@ -22,8 +22,8 @@ - [ ] Add source classes for market data API, news API, filings API, web scrape, and broker adapter - [ ] Add admin validation for duplicate tickers, invalid URLs, and unsupported source types - [ ] Add seed data support for an initial tracked watchlist - -## Phase 3 - External API Adapters +## Phase 3 +- External API Adapters - [ ] Implement scheduler for symbol and source polling windows - [ ] Implement market data API adapter interface - [ ] Implement first concrete market data provider adapter @@ -126,4 +126,4 @@ - [ ] Generate 7-day company trend summaries with market context - [ ] Produce paper-trade recommendations only - [ ] Publish analytical facts for bars, signals, and paper trades into MinIO -- [ ] Expose a simple dashboard with evidence, trend cards, and prediction-vs-outcome views +- [ ] Expose a simple dashboard with evidence, trend cards, and prediction-vs-outcome views \ No newline at end of file diff --git a/.kiro/steering/development-process.md b/.kiro/steering/development-process.md new file mode 100644 index 0000000..fd4c0e7 --- /dev/null +++ b/.kiro/steering/development-process.md @@ -0,0 +1,44 @@ +# Development Process — Test-Develop-Debug + +## Workflow +1. Write or update tests for the target behavior +2. Implement the minimal code to pass +3. Debug failures, fix, re-run +4. Commit and push after each phase completes +5. GitHub Actions builds container images and pushes to GHCR +6. Deploy to cluster via `kubectl apply` + +## Testing +- Use `pytest` with `pytest-asyncio` for async code +- Tests live alongside service code or in a top-level `tests/` directory +- Run tests with `pytest --tb=short -q` or `pytest -x` for fail-fast +- Focus on core logic, not mocking infrastructure + +## Build and Deploy +- Always build and test Docker images locally before pushing to GitHub +- Only push to GitHub after local build succeeds — don't waste CI credits on broken builds +- Dockerfile at `docker/Dockerfile` +- GitHub workflow at `.github/workflows/build.yml` +- Images tagged as `ghcr.io/celesrenata/stonks-oracle/:` and `:latest` +- K8s manifests reference GHCR images +- Deploy: `kubectl apply -f infra/k8s/` +- Local build: `make build` → verify → `git push` → CI builds and pushes to GHCR + +## Git Conventions +- Commit after each completed phase task +- Commit message format: `phase N: short description` +- Push to `main` branch triggers CI + +## Code Style +- Python 3.12, type hints everywhere +- Pydantic for data validation +- FastAPI for HTTP services +- asyncio + asyncpg/aioredis for async I/O +- Minimal dependencies, prefer stdlib where possible + +## Documentation +- Do NOT create large summary/success markdown files after each step +- Keep notes short, concise, and organized under `docs/notes/` +- Name note files to match the task they relate to (e.g. `docs/notes/phase0-k8s-manifests.md`) +- This makes them recallable by task without guessing +- If a note isn't useful for future reference, don't write it diff --git a/.kiro/steering/kubernetes-conventions.md b/.kiro/steering/kubernetes-conventions.md new file mode 100644 index 0000000..9a49a85 --- /dev/null +++ b/.kiro/steering/kubernetes-conventions.md @@ -0,0 +1,33 @@ +--- +inclusion: fileMatch +fileMatchPattern: "infra/k8s/**" +--- +# Kubernetes Conventions + +## Namespace +All Stonks Oracle workloads deploy to `stonks-oracle` namespace. + +## TLS +- Internal services: use `ca-issuer` ClusterIssuer (local CA) +- Public-facing services (Superset, Query API): use `celestium-le-production` ClusterIssuer (Let's Encrypt) +- Annotate ingress with `cert-manager.io/cluster-issuer` + +## Ingress +- Traefik ingress controller +- Domain pattern: `.celestium.life` +- Always create both HTTP and HTTPS ingress rules + +## Service References +- PostgreSQL: `postgresql-rw.postgresql-service.svc.cluster.local:5432` +- Redis: `redis-master.redis-service.svc.cluster.local:6379` +- MinIO API: `minio.minio-service.svc.cluster.local:80` +- Ollama: `ollama.ollama-service.svc.cluster.local:11434` + +## Images +- All images from `ghcr.io/celesrenata/stonks-oracle/:latest` +- Use `imagePullPolicy: Always` in production +- Use `imagePullSecrets` referencing `ghcr-secret` if repo is private + +## Labels +- `app.kubernetes.io/part-of: stonks-oracle` +- `app: ` diff --git a/.kiro/steering/project-context.md b/.kiro/steering/project-context.md new file mode 100644 index 0000000..f1a3bdf --- /dev/null +++ b/.kiro/steering/project-context.md @@ -0,0 +1,33 @@ +# Stonks Oracle — Project Context + +## Overview +Stonks Oracle is a Kubernetes-native AI market intelligence and paper-trading platform. +Python monorepo with services under `services/`, infrastructure under `infra/`, lakehouse schemas under `lakehouse/`, and dashboards under `dashboards/`. + +## Infrastructure +- Kubernetes cluster: 4x NixOS nodes (gremlin-1 through gremlin-4), reachable via `kubectl`, `virtctl`, `ssh root@gremlin-{1,2,3,4}` +- NixOS configs stored at `/etc/nixos` on gremlin-1, git-pushed to other hosts +- Ingress: Traefik, domain `*.celestium.life` +- Cert-Manager: `ca-issuer` (local CA) for internal services, `celestium-le-production` (Let's Encrypt) for public-facing +- Container registry: `ghcr.io/celesrenata/stonks-oracle` +- CI: GitHub Actions builds containers, cluster pulls from GHCR + +## Existing Cluster Services (do NOT redeploy these) +- PostgreSQL: `postgresql-rw.postgresql-service.svc.cluster.local:5432` +- Redis: `redis-master.redis-service.svc.cluster.local:6379` +- MinIO: `minio.minio-service.svc.cluster.local:80` (API), console at `minio-crawler-console.minio-service.svc.cluster.local:9090` +- Ollama: `ollama.ollama-service.svc.cluster.local:11434` (cluster-internal), also at `http://10.1.1.12:2701` (external), GPU: 4070 Ti Super 16GB + +## Development Process +- Test-Develop-Debug (TDD) cycle +- After each phase: commit, push, build via GitHub Actions, deploy to cluster +- Local builds for dev iteration, GitHub workflows for CI/CD +- Python 3.12, NixOS dev environment + +## Key Conventions +- All services use `services/shared/config.py` for configuration via env vars +- Redis queues defined in `services/shared/redis_keys.py` +- Pydantic schemas in `services/shared/schemas.py` +- K8s manifests in `infra/k8s/`, all in `stonks-oracle` namespace +- Lakehouse DDL in `lakehouse/schemas/` +- Crawler patterns inspired by Noctipede (`~/sources/splinterstice/noctipede`): BeautifulSoup + requests with retry adapters, content hashing, boilerplate stripping, quality scoring diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..ace5564 --- /dev/null +++ b/Makefile @@ -0,0 +1,47 @@ +REPO_OWNER := celesrenata +REPO_NAME := stonks-oracle +GHCR := ghcr.io/$(REPO_OWNER)/$(REPO_NAME) +SHA := $(shell git rev-parse --short HEAD 2>/dev/null || echo "dev") + +SERVICES := scheduler symbol-registry ingestion parser extractor aggregation recommendation risk broker-adapter lake-publisher query-api + +.PHONY: help lint test build push deploy clean + +help: + @echo "Targets:" + @echo " lint - Run ruff linter" + @echo " test - Run pytest" + @echo " build - Build all service images locally" + @echo " push - Push all images to GHCR" + @echo " deploy - Apply K8s manifests" + @echo " clean - Remove local images" + +lint: + ruff check services/ + +test: + pytest tests/ -x --tb=short -q + +build: + @for svc in $(SERVICES); do \ + echo "Building $$svc..."; \ + docker build -t $(GHCR)/$$svc:$(SHA) -t $(GHCR)/$$svc:latest -f docker/Dockerfile .; \ + done + +push: + @for svc in $(SERVICES); do \ + echo "Pushing $$svc..."; \ + docker push $(GHCR)/$$svc:$(SHA); \ + docker push $(GHCR)/$$svc:latest; \ + done + +deploy: + kubectl apply -f infra/k8s/namespace.yaml + kubectl apply -f infra/k8s/configmap.yaml + kubectl apply -f infra/k8s/secrets.yaml + kubectl apply -f infra/k8s/ + +clean: + @for svc in $(SERVICES); do \ + docker rmi $(GHCR)/$$svc:$(SHA) $(GHCR)/$$svc:latest 2>/dev/null || true; \ + done diff --git a/dashboards/README.md b/dashboards/README.md new file mode 100644 index 0000000..4ded9c8 --- /dev/null +++ b/dashboards/README.md @@ -0,0 +1,11 @@ +# Dashboards + +Apache Superset dashboard configurations and starter datasets for Stonks Oracle. + +## Starter Dashboards +- Symbol Overview — company profile, source health, recent documents +- Sentiment Heatmap — market-wide sentiment by sector and symbol +- Prediction Accuracy — predicted signals vs realized price moves +- Paper Trading PnL — paper trade performance and position tracking +- Model Quality — extraction success rates, latency, and confidence distributions +- Source Coverage — ingestion throughput, source failures, and coverage gaps diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..20c47ed --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,111 @@ +version: "3.9" + +services: + postgres: + image: postgres:16-alpine + environment: + POSTGRES_DB: stonks + POSTGRES_USER: stonks + POSTGRES_PASSWORD: stonks_dev + ports: + - "5432:5432" + volumes: + - pgdata:/var/lib/postgresql/data + - ./infra/migrations:/docker-entrypoint-initdb.d + healthcheck: + test: ["CMD-SHELL", "pg_isready -U stonks"] + interval: 5s + retries: 5 + + redis: + image: redis:7-alpine + ports: + - "6379:6379" + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 5s + retries: 5 + + minio: + image: minio/minio:latest + command: server /data --console-address ":9001" + environment: + MINIO_ROOT_USER: minioadmin + MINIO_ROOT_PASSWORD: minioadmin + ports: + - "9000:9000" + - "9001:9001" + volumes: + - miniodata:/data + healthcheck: + test: ["CMD", "mc", "ready", "local"] + interval: 5s + retries: 5 + + minio-init: + image: minio/mc:latest + depends_on: + minio: + condition: service_healthy + entrypoint: > + /bin/sh -c " + mc alias set local http://minio:9000 minioadmin minioadmin; + mc mb --ignore-existing local/stonks-raw-market; + mc mb --ignore-existing local/stonks-raw-news; + mc mb --ignore-existing local/stonks-raw-filings; + mc mb --ignore-existing local/stonks-normalized; + mc mb --ignore-existing local/stonks-llm-prompts; + mc mb --ignore-existing local/stonks-llm-results; + mc mb --ignore-existing local/stonks-lakehouse; + mc mb --ignore-existing local/stonks-audit; + exit 0; + " + + ollama: + image: ollama/ollama:latest + ports: + - "11434:11434" + volumes: + - ollama_models:/root/.ollama + + trino: + image: trinodb/trino:latest + ports: + - "8080:8080" + volumes: + - ./infra/trino/catalog:/etc/trino/catalog + depends_on: + - minio + - hive-metastore + + hive-metastore: + image: apache/hive:4.0.0 + environment: + SERVICE_NAME: metastore + DB_DRIVER: derby + SERVICE_OPTS: "-Djavax.jdo.option.ConnectionURL=jdbc:derby:/opt/hive/data/metastore_db;create=true" + ports: + - "9083:9083" + volumes: + - hive_data:/opt/hive/data + + superset: + image: apache/superset:latest + ports: + - "8088:8088" + environment: + SUPERSET_SECRET_KEY: stonks-dev-secret-key-change-me + ADMIN_USERNAME: admin + ADMIN_PASSWORD: admin + ADMIN_EMAIL: admin@stonks.local + volumes: + - superset_data:/app/superset_home + depends_on: + - trino + +volumes: + pgdata: + miniodata: + ollama_models: + hive_data: + superset_data: diff --git a/docker/Dockerfile b/docker/Dockerfile new file mode 100644 index 0000000..3eb0e84 --- /dev/null +++ b/docker/Dockerfile @@ -0,0 +1,28 @@ +FROM python:3.12-slim + +ENV PYTHONDONTWRITEBYTECODE=1 +ENV PYTHONUNBUFFERED=1 +ENV PYTHONPATH=/app + +RUN apt-get update && apt-get install -y --no-install-recommends \ + gcc \ + libpq-dev \ + curl \ + && rm -rf /var/lib/apt/lists/* + +WORKDIR /app + +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY services/ /app/services/ + +RUN useradd -m -u 1000 stonks && \ + chown -R stonks:stonks /app + +USER stonks + +ARG SERVICE_CMD="python -m services.scheduler.app" +ENV SERVICE_CMD=${SERVICE_CMD} + +CMD ["sh", "-c", "${SERVICE_CMD}"] diff --git a/infra/k8s/README.md b/infra/k8s/README.md new file mode 100644 index 0000000..6b0f833 --- /dev/null +++ b/infra/k8s/README.md @@ -0,0 +1,40 @@ +# Kubernetes Manifests — Stonks Oracle + +All manifests target the `stonks-oracle` namespace. + +## Prerequisites (already running in cluster) +- `postgresql-service` — PostgreSQL +- `redis-service` — Redis +- `minio-service` / `minio-operator` — MinIO +- `ollama-service` — Ollama LLM + +## Shared Configuration +- `namespace.yaml` — namespace definition +- `configmap.yaml` — environment config referencing existing cluster services +- `secrets.yaml` — credentials (update before deploying) + +## Application Workloads +- `symbol-registry.yaml` — company/watchlist/source management API +- `scheduler.yaml` — polling orchestrator +- `ingestion-worker.yaml` — fetches external data, stores raw artifacts +- `parser-worker.yaml` — HTML-to-text, normalization, quality scoring +- `extractor-worker.yaml` — Ollama structured extraction +- `aggregation-worker.yaml` — trend summaries and signal aggregation +- `recommendation-worker.yaml` — trade recommendation generation +- `risk-engine.yaml` — risk controls and trade eligibility API +- `broker-adapter.yaml` — paper/live trading adapter +- `lake-publisher.yaml` — operational-to-analytical fact publisher +- `query-api.yaml` — analytics and admin API + +## Analytics Infrastructure +- `hive-metastore.yaml` — Hive Metastore for Trino catalog +- `trino.yaml` — SQL query engine with Hive + Iceberg catalogs +- `superset.yaml` — dashboard and exploration layer + +## Deploy +```bash +kubectl apply -f infra/k8s/namespace.yaml +kubectl apply -f infra/k8s/configmap.yaml +kubectl apply -f infra/k8s/secrets.yaml +kubectl apply -f infra/k8s/ +``` diff --git a/infra/k8s/aggregation-worker.yaml b/infra/k8s/aggregation-worker.yaml new file mode 100644 index 0000000..4374298 --- /dev/null +++ b/infra/k8s/aggregation-worker.yaml @@ -0,0 +1,34 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: aggregation-worker + namespace: stonks-oracle + labels: + app: aggregation-worker + app.kubernetes.io/part-of: stonks-oracle +spec: + replicas: 1 + selector: + matchLabels: + app: aggregation-worker + template: + metadata: + labels: + app: aggregation-worker + spec: + containers: + - name: aggregation-worker + image: ghcr.io/celesrenata/stonks-oracle/aggregation:latest + imagePullPolicy: Always + envFrom: + - configMapRef: + name: stonks-config + - secretRef: + name: stonks-secrets + resources: + requests: + cpu: 100m + memory: 128Mi + limits: + cpu: 500m + memory: 256Mi diff --git a/infra/k8s/broker-adapter.yaml b/infra/k8s/broker-adapter.yaml new file mode 100644 index 0000000..043dd2e --- /dev/null +++ b/infra/k8s/broker-adapter.yaml @@ -0,0 +1,34 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: broker-adapter + namespace: stonks-oracle + labels: + app: broker-adapter + app.kubernetes.io/part-of: stonks-oracle +spec: + replicas: 1 + selector: + matchLabels: + app: broker-adapter + template: + metadata: + labels: + app: broker-adapter + spec: + containers: + - name: broker-adapter + image: ghcr.io/celesrenata/stonks-oracle/broker-adapter:latest + imagePullPolicy: Always + envFrom: + - configMapRef: + name: stonks-config + - secretRef: + name: stonks-secrets + resources: + requests: + cpu: 50m + memory: 64Mi + limits: + cpu: 200m + memory: 128Mi diff --git a/infra/k8s/configmap.yaml b/infra/k8s/configmap.yaml new file mode 100644 index 0000000..eeddc7e --- /dev/null +++ b/infra/k8s/configmap.yaml @@ -0,0 +1,39 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: stonks-config + namespace: stonks-oracle + labels: + app.kubernetes.io/part-of: stonks-oracle +data: + # PostgreSQL — existing cluster service + POSTGRES_HOST: "postgresql-rw.postgresql-service.svc.cluster.local" + POSTGRES_PORT: "5432" + POSTGRES_DB: "stonks" + POSTGRES_USER: "stonks" + + # Redis — existing cluster service + REDIS_HOST: "redis-master.redis-service.svc.cluster.local" + REDIS_PORT: "6379" + REDIS_DB: "0" + + # MinIO — existing cluster service + MINIO_ENDPOINT: "minio.minio-service.svc.cluster.local:80" + MINIO_SECURE: "false" + + # Ollama — existing cluster service + OLLAMA_BASE_URL: "http://ollama.ollama-service.svc.cluster.local:11434" + OLLAMA_MODEL: "llama3.1:8b" + OLLAMA_TIMEOUT: "120" + + # Trino — deployed in stonks-oracle namespace + TRINO_HOST: "trino.stonks-oracle.svc.cluster.local" + TRINO_PORT: "8080" + TRINO_CATALOG: "lakehouse" + TRINO_SCHEMA: "stonks" + + # Broker + BROKER_MODE: "paper" + + # General + LOG_LEVEL: "INFO" diff --git a/infra/k8s/extractor-worker.yaml b/infra/k8s/extractor-worker.yaml new file mode 100644 index 0000000..2b0a066 --- /dev/null +++ b/infra/k8s/extractor-worker.yaml @@ -0,0 +1,34 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: extractor-worker + namespace: stonks-oracle + labels: + app: extractor-worker + app.kubernetes.io/part-of: stonks-oracle +spec: + replicas: 1 + selector: + matchLabels: + app: extractor-worker + template: + metadata: + labels: + app: extractor-worker + spec: + containers: + - name: extractor-worker + image: ghcr.io/celesrenata/stonks-oracle/extractor:latest + imagePullPolicy: Always + envFrom: + - configMapRef: + name: stonks-config + - secretRef: + name: stonks-secrets + resources: + requests: + cpu: 200m + memory: 256Mi + limits: + cpu: "1" + memory: 512Mi diff --git a/infra/k8s/hive-metastore.yaml b/infra/k8s/hive-metastore.yaml new file mode 100644 index 0000000..57ec740 --- /dev/null +++ b/infra/k8s/hive-metastore.yaml @@ -0,0 +1,68 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: hive-metastore + namespace: stonks-oracle + labels: + app: hive-metastore + app.kubernetes.io/part-of: stonks-oracle +spec: + replicas: 1 + selector: + matchLabels: + app: hive-metastore + template: + metadata: + labels: + app: hive-metastore + spec: + containers: + - name: hive-metastore + image: apache/hive:4.0.0 + ports: + - containerPort: 9083 + env: + - name: SERVICE_NAME + value: metastore + - name: DB_DRIVER + value: derby + - name: SERVICE_OPTS + value: "-Djavax.jdo.option.ConnectionURL=jdbc:derby:/opt/hive/data/metastore_db;create=true" + volumeMounts: + - name: hive-data + mountPath: /opt/hive/data + resources: + requests: + cpu: 200m + memory: 512Mi + limits: + cpu: "1" + memory: 1Gi + volumes: + - name: hive-data + persistentVolumeClaim: + claimName: hive-metastore-data +--- +apiVersion: v1 +kind: Service +metadata: + name: hive-metastore + namespace: stonks-oracle +spec: + selector: + app: hive-metastore + ports: + - port: 9083 + targetPort: 9083 +--- +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: hive-metastore-data + namespace: stonks-oracle +spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 1Gi diff --git a/infra/k8s/ingestion-worker.yaml b/infra/k8s/ingestion-worker.yaml new file mode 100644 index 0000000..26341ca --- /dev/null +++ b/infra/k8s/ingestion-worker.yaml @@ -0,0 +1,34 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: ingestion-worker + namespace: stonks-oracle + labels: + app: ingestion-worker + app.kubernetes.io/part-of: stonks-oracle +spec: + replicas: 2 + selector: + matchLabels: + app: ingestion-worker + template: + metadata: + labels: + app: ingestion-worker + spec: + containers: + - name: ingestion-worker + image: ghcr.io/celesrenata/stonks-oracle/ingestion:latest + imagePullPolicy: Always + envFrom: + - configMapRef: + name: stonks-config + - secretRef: + name: stonks-secrets + resources: + requests: + cpu: 100m + memory: 128Mi + limits: + cpu: 500m + memory: 256Mi diff --git a/infra/k8s/ingress.yaml b/infra/k8s/ingress.yaml new file mode 100644 index 0000000..4943a16 --- /dev/null +++ b/infra/k8s/ingress.yaml @@ -0,0 +1,99 @@ +apiVersion: networking.k8s.io/v1 +kind: Ingress +metadata: + name: stonks-query-api-https + namespace: stonks-oracle + annotations: + cert-manager.io/cluster-issuer: ca-issuer +spec: + ingressClassName: traefik + tls: + - hosts: + - stonks-api.celestium.life + secretName: stonks-api-tls + rules: + - host: stonks-api.celestium.life + http: + paths: + - path: / + pathType: Prefix + backend: + service: + name: query-api + port: + number: 8000 +--- +apiVersion: networking.k8s.io/v1 +kind: Ingress +metadata: + name: stonks-registry-https + namespace: stonks-oracle + annotations: + cert-manager.io/cluster-issuer: ca-issuer +spec: + ingressClassName: traefik + tls: + - hosts: + - stonks-registry.celestium.life + secretName: stonks-registry-tls + rules: + - host: stonks-registry.celestium.life + http: + paths: + - path: / + pathType: Prefix + backend: + service: + name: symbol-registry-api + port: + number: 8000 +--- +apiVersion: networking.k8s.io/v1 +kind: Ingress +metadata: + name: stonks-superset-https + namespace: stonks-oracle + annotations: + cert-manager.io/cluster-issuer: ca-issuer +spec: + ingressClassName: traefik + tls: + - hosts: + - stonks-dash.celestium.life + secretName: stonks-dash-tls + rules: + - host: stonks-dash.celestium.life + http: + paths: + - path: / + pathType: Prefix + backend: + service: + name: superset + port: + number: 8088 +--- +apiVersion: networking.k8s.io/v1 +kind: Ingress +metadata: + name: stonks-trino-https + namespace: stonks-oracle + annotations: + cert-manager.io/cluster-issuer: ca-issuer +spec: + ingressClassName: traefik + tls: + - hosts: + - stonks-trino.celestium.life + secretName: stonks-trino-tls + rules: + - host: stonks-trino.celestium.life + http: + paths: + - path: / + pathType: Prefix + backend: + service: + name: trino + port: + number: 8080 diff --git a/infra/k8s/lake-publisher.yaml b/infra/k8s/lake-publisher.yaml new file mode 100644 index 0000000..9446823 --- /dev/null +++ b/infra/k8s/lake-publisher.yaml @@ -0,0 +1,34 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: lake-publisher + namespace: stonks-oracle + labels: + app: lake-publisher + app.kubernetes.io/part-of: stonks-oracle +spec: + replicas: 1 + selector: + matchLabels: + app: lake-publisher + template: + metadata: + labels: + app: lake-publisher + spec: + containers: + - name: lake-publisher + image: ghcr.io/celesrenata/stonks-oracle/lake-publisher:latest + imagePullPolicy: Always + envFrom: + - configMapRef: + name: stonks-config + - secretRef: + name: stonks-secrets + resources: + requests: + cpu: 100m + memory: 128Mi + limits: + cpu: 500m + memory: 256Mi diff --git a/infra/k8s/namespace.yaml b/infra/k8s/namespace.yaml new file mode 100644 index 0000000..79991b4 --- /dev/null +++ b/infra/k8s/namespace.yaml @@ -0,0 +1,6 @@ +apiVersion: v1 +kind: Namespace +metadata: + name: stonks-oracle + labels: + app.kubernetes.io/part-of: stonks-oracle diff --git a/infra/k8s/parser-worker.yaml b/infra/k8s/parser-worker.yaml new file mode 100644 index 0000000..cea1b68 --- /dev/null +++ b/infra/k8s/parser-worker.yaml @@ -0,0 +1,34 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: parser-worker + namespace: stonks-oracle + labels: + app: parser-worker + app.kubernetes.io/part-of: stonks-oracle +spec: + replicas: 2 + selector: + matchLabels: + app: parser-worker + template: + metadata: + labels: + app: parser-worker + spec: + containers: + - name: parser-worker + image: ghcr.io/celesrenata/stonks-oracle/parser:latest + imagePullPolicy: Always + envFrom: + - configMapRef: + name: stonks-config + - secretRef: + name: stonks-secrets + resources: + requests: + cpu: 100m + memory: 128Mi + limits: + cpu: 500m + memory: 256Mi diff --git a/infra/k8s/query-api.yaml b/infra/k8s/query-api.yaml new file mode 100644 index 0000000..9ff669d --- /dev/null +++ b/infra/k8s/query-api.yaml @@ -0,0 +1,54 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: query-api + namespace: stonks-oracle + labels: + app: query-api + app.kubernetes.io/part-of: stonks-oracle +spec: + replicas: 1 + selector: + matchLabels: + app: query-api + template: + metadata: + labels: + app: query-api + spec: + containers: + - name: query-api + image: ghcr.io/celesrenata/stonks-oracle/query-api:latest + imagePullPolicy: Always + ports: + - containerPort: 8000 + envFrom: + - configMapRef: + name: stonks-config + - secretRef: + name: stonks-secrets + resources: + requests: + cpu: 100m + memory: 128Mi + limits: + cpu: 500m + memory: 256Mi + readinessProbe: + httpGet: + path: /docs + port: 8000 + initialDelaySeconds: 5 + periodSeconds: 10 +--- +apiVersion: v1 +kind: Service +metadata: + name: query-api + namespace: stonks-oracle +spec: + selector: + app: query-api + ports: + - port: 8000 + targetPort: 8000 diff --git a/infra/k8s/recommendation-worker.yaml b/infra/k8s/recommendation-worker.yaml new file mode 100644 index 0000000..893c8c1 --- /dev/null +++ b/infra/k8s/recommendation-worker.yaml @@ -0,0 +1,34 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: recommendation-worker + namespace: stonks-oracle + labels: + app: recommendation-worker + app.kubernetes.io/part-of: stonks-oracle +spec: + replicas: 1 + selector: + matchLabels: + app: recommendation-worker + template: + metadata: + labels: + app: recommendation-worker + spec: + containers: + - name: recommendation-worker + image: ghcr.io/celesrenata/stonks-oracle/recommendation:latest + imagePullPolicy: Always + envFrom: + - configMapRef: + name: stonks-config + - secretRef: + name: stonks-secrets + resources: + requests: + cpu: 100m + memory: 128Mi + limits: + cpu: 500m + memory: 256Mi diff --git a/infra/k8s/risk-engine.yaml b/infra/k8s/risk-engine.yaml new file mode 100644 index 0000000..a95e1a5 --- /dev/null +++ b/infra/k8s/risk-engine.yaml @@ -0,0 +1,48 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: risk-engine + namespace: stonks-oracle + labels: + app: risk-engine + app.kubernetes.io/part-of: stonks-oracle +spec: + replicas: 1 + selector: + matchLabels: + app: risk-engine + template: + metadata: + labels: + app: risk-engine + spec: + containers: + - name: risk-engine + image: ghcr.io/celesrenata/stonks-oracle/risk:latest + imagePullPolicy: Always + ports: + - containerPort: 8000 + envFrom: + - configMapRef: + name: stonks-config + - secretRef: + name: stonks-secrets + resources: + requests: + cpu: 100m + memory: 128Mi + limits: + cpu: 500m + memory: 256Mi +--- +apiVersion: v1 +kind: Service +metadata: + name: risk-engine + namespace: stonks-oracle +spec: + selector: + app: risk-engine + ports: + - port: 8000 + targetPort: 8000 diff --git a/infra/k8s/scheduler.yaml b/infra/k8s/scheduler.yaml new file mode 100644 index 0000000..4e47b6a --- /dev/null +++ b/infra/k8s/scheduler.yaml @@ -0,0 +1,34 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: scheduler + namespace: stonks-oracle + labels: + app: scheduler + app.kubernetes.io/part-of: stonks-oracle +spec: + replicas: 1 + selector: + matchLabels: + app: scheduler + template: + metadata: + labels: + app: scheduler + spec: + containers: + - name: scheduler + image: ghcr.io/celesrenata/stonks-oracle/scheduler:latest + imagePullPolicy: Always + envFrom: + - configMapRef: + name: stonks-config + - secretRef: + name: stonks-secrets + resources: + requests: + cpu: 50m + memory: 64Mi + limits: + cpu: 200m + memory: 128Mi diff --git a/infra/k8s/secrets.yaml b/infra/k8s/secrets.yaml new file mode 100644 index 0000000..dd40921 --- /dev/null +++ b/infra/k8s/secrets.yaml @@ -0,0 +1,17 @@ +apiVersion: v1 +kind: Secret +metadata: + name: stonks-secrets + namespace: stonks-oracle + labels: + app.kubernetes.io/part-of: stonks-oracle +type: Opaque +stringData: + POSTGRES_PASSWORD: "changeme" + MINIO_ACCESS_KEY: "changeme" + MINIO_SECRET_KEY: "changeme" + REDIS_PASSWORD: "" + BROKER_API_KEY: "" + BROKER_API_SECRET: "" + BROKER_BASE_URL: "" + SUPERSET_SECRET_KEY: "stonks-superset-secret-change-me" diff --git a/infra/k8s/superset.yaml b/infra/k8s/superset.yaml new file mode 100644 index 0000000..8c1ecf7 --- /dev/null +++ b/infra/k8s/superset.yaml @@ -0,0 +1,105 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: superset + namespace: stonks-oracle + labels: + app: superset + app.kubernetes.io/part-of: stonks-oracle +spec: + replicas: 1 + selector: + matchLabels: + app: superset + template: + metadata: + labels: + app: superset + spec: + containers: + - name: superset + image: apache/superset:latest + ports: + - containerPort: 8088 + env: + - name: SUPERSET_SECRET_KEY + valueFrom: + secretKeyRef: + name: stonks-secrets + key: SUPERSET_SECRET_KEY + - name: ADMIN_USERNAME + value: admin + - name: ADMIN_PASSWORD + value: admin + - name: ADMIN_EMAIL + value: admin@stonks.local + volumeMounts: + - name: superset-home + mountPath: /app/superset_home + - name: superset-config + mountPath: /app/pythonpath/superset_config.py + subPath: superset_config.py + resources: + requests: + cpu: 200m + memory: 512Mi + limits: + cpu: "1" + memory: 2Gi + readinessProbe: + httpGet: + path: /health + port: 8088 + initialDelaySeconds: 30 + periodSeconds: 15 + volumes: + - name: superset-home + persistentVolumeClaim: + claimName: superset-data + - name: superset-config + configMap: + name: superset-config +--- +apiVersion: v1 +kind: Service +metadata: + name: superset + namespace: stonks-oracle +spec: + selector: + app: superset + ports: + - port: 8088 + targetPort: 8088 +--- +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: superset-data + namespace: stonks-oracle +spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 2Gi +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: superset-config + namespace: stonks-oracle +data: + superset_config.py: | + import os + SECRET_KEY = os.getenv("SUPERSET_SECRET_KEY", "stonks-dev-secret-key-change-me") + SQLALCHEMY_DATABASE_URI = "trino://trino@trino.stonks-oracle.svc.cluster.local:8080/lakehouse/stonks" + FEATURE_FLAGS = {"ENABLE_TEMPLATE_PROCESSING": True} + CACHE_CONFIG = { + "CACHE_TYPE": "RedisCache", + "CACHE_DEFAULT_TIMEOUT": 300, + "CACHE_KEY_PREFIX": "superset_", + "CACHE_REDIS_HOST": os.getenv("REDIS_HOST", "redis.redis-service.svc.cluster.local"), + "CACHE_REDIS_PORT": int(os.getenv("REDIS_PORT", "6379")), + "CACHE_REDIS_DB": 1, + } diff --git a/infra/k8s/symbol-registry.yaml b/infra/k8s/symbol-registry.yaml new file mode 100644 index 0000000..24735ab --- /dev/null +++ b/infra/k8s/symbol-registry.yaml @@ -0,0 +1,60 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: symbol-registry-api + namespace: stonks-oracle + labels: + app: symbol-registry-api + app.kubernetes.io/part-of: stonks-oracle +spec: + replicas: 1 + selector: + matchLabels: + app: symbol-registry-api + template: + metadata: + labels: + app: symbol-registry-api + spec: + containers: + - name: symbol-registry-api + image: ghcr.io/celesrenata/stonks-oracle/symbol-registry:latest + imagePullPolicy: Always + ports: + - containerPort: 8000 + envFrom: + - configMapRef: + name: stonks-config + - secretRef: + name: stonks-secrets + resources: + requests: + cpu: 100m + memory: 128Mi + limits: + cpu: 500m + memory: 256Mi + readinessProbe: + httpGet: + path: /docs + port: 8000 + initialDelaySeconds: 5 + periodSeconds: 10 + livenessProbe: + httpGet: + path: /docs + port: 8000 + initialDelaySeconds: 10 + periodSeconds: 30 +--- +apiVersion: v1 +kind: Service +metadata: + name: symbol-registry-api + namespace: stonks-oracle +spec: + selector: + app: symbol-registry-api + ports: + - port: 8000 + targetPort: 8000 diff --git a/infra/k8s/trino.yaml b/infra/k8s/trino.yaml new file mode 100644 index 0000000..5a66c4f --- /dev/null +++ b/infra/k8s/trino.yaml @@ -0,0 +1,79 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: trino + namespace: stonks-oracle + labels: + app: trino + app.kubernetes.io/part-of: stonks-oracle +spec: + replicas: 1 + selector: + matchLabels: + app: trino + template: + metadata: + labels: + app: trino + spec: + containers: + - name: trino + image: trinodb/trino:latest + ports: + - containerPort: 8080 + volumeMounts: + - name: catalog-config + mountPath: /etc/trino/catalog + resources: + requests: + cpu: 500m + memory: 1Gi + limits: + cpu: "2" + memory: 4Gi + readinessProbe: + httpGet: + path: /v1/info + port: 8080 + initialDelaySeconds: 15 + periodSeconds: 10 + volumes: + - name: catalog-config + configMap: + name: trino-catalog +--- +apiVersion: v1 +kind: Service +metadata: + name: trino + namespace: stonks-oracle +spec: + selector: + app: trino + ports: + - port: 8080 + targetPort: 8080 +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: trino-catalog + namespace: stonks-oracle +data: + iceberg.properties: | + connector.name=iceberg + iceberg.catalog.type=hive_metastore + hive.metastore.uri=thrift://hive-metastore.stonks-oracle.svc.cluster.local:9083 + hive.s3.endpoint=http://minio.minio-service.svc.cluster.local:80 + hive.s3.path-style-access=true + hive.s3.aws-access-key=changeme + hive.s3.aws-secret-key=changeme + lakehouse.properties: | + connector.name=hive + hive.metastore.uri=thrift://hive-metastore.stonks-oracle.svc.cluster.local:9083 + hive.s3.endpoint=http://minio.minio-service.svc.cluster.local:80 + hive.s3.path-style-access=true + hive.s3.aws-access-key=changeme + hive.s3.aws-secret-key=changeme + hive.non-managed-table-writes-enabled=true + hive.s3select-pushdown.enabled=true diff --git a/infra/migrations/001_initial_schema.sql b/infra/migrations/001_initial_schema.sql new file mode 100644 index 0000000..8801ec5 --- /dev/null +++ b/infra/migrations/001_initial_schema.sql @@ -0,0 +1,99 @@ +-- Stonks Oracle - Initial PostgreSQL Schema +-- Phase 1: Core data model + +-- Extensions +CREATE EXTENSION IF NOT EXISTS "uuid-ossp"; +CREATE EXTENSION IF NOT EXISTS "pgcrypto"; + +-- ============================================================ +-- Companies and Watchlists +-- ============================================================ + +CREATE TABLE companies ( + id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), + ticker VARCHAR(20) NOT NULL, + legal_name VARCHAR(500) NOT NULL, + exchange VARCHAR(50), + sector VARCHAR(200), + industry VARCHAR(200), + market_cap_bucket VARCHAR(50), + active BOOLEAN NOT NULL DEFAULT TRUE, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + UNIQUE(ticker, exchange) +); + +CREATE TABLE company_aliases ( + id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), + company_id UUID NOT NULL REFERENCES companies(id) ON DELETE CASCADE, + alias VARCHAR(500) NOT NULL, + alias_type VARCHAR(50) NOT NULL DEFAULT 'brand', + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); +CREATE INDEX idx_company_aliases_company ON company_aliases(company_id); +CREATE INDEX idx_company_aliases_alias ON company_aliases(alias); + +CREATE TABLE watchlists ( + id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), + name VARCHAR(200) NOT NULL UNIQUE, + description TEXT, + active BOOLEAN NOT NULL DEFAULT TRUE, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE TABLE watchlist_members ( + id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), + watchlist_id UUID NOT NULL REFERENCES watchlists(id) ON DELETE CASCADE, + company_id UUID NOT NULL REFERENCES companies(id) ON DELETE CASCADE, + added_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + UNIQUE(watchlist_id, company_id) +); + +-- ============================================================ +-- Sources and Credentials +-- ============================================================ + +CREATE TABLE sources ( + id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), + company_id UUID NOT NULL REFERENCES companies(id) ON DELETE CASCADE, + source_type VARCHAR(50) NOT NULL, + source_name VARCHAR(200) NOT NULL, + config JSONB NOT NULL DEFAULT '{}', + credibility_score FLOAT DEFAULT 0.5, + retention_days INTEGER DEFAULT 365, + access_policy VARCHAR(50) DEFAULT 'internal', + active BOOLEAN NOT NULL DEFAULT TRUE, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); +CREATE INDEX idx_sources_company ON sources(company_id); +CREATE INDEX idx_sources_type ON sources(source_type); + +CREATE TABLE api_credentials_refs ( + id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), + provider VARCHAR(100) NOT NULL UNIQUE, + secret_ref VARCHAR(500) NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +-- ============================================================ +-- Ingestion Tracking +-- ============================================================ + +CREATE TABLE ingestion_runs ( + id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), + source_id UUID REFERENCES sources(id), + company_id UUID REFERENCES companies(id), + source_type VARCHAR(50) NOT NULL, + status VARCHAR(50) NOT NULL DEFAULT 'pending', + started_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + completed_at TIMESTAMPTZ, + items_fetched INTEGER DEFAULT 0, + items_new INTEGER DEFAULT 0, + error_message TEXT, + retry_count INTEGER DEFAULT 0, + next_retry_at TIMESTAMPTZ +); +CREATE INDEX idx_ingestion_runs_status ON ingestion_runs(status); +CREATE INDEX idx_ingestion_runs_source ON ingestion_runs(source_id); diff --git a/infra/migrations/002_documents_and_intelligence.sql b/infra/migrations/002_documents_and_intelligence.sql new file mode 100644 index 0000000..379120b --- /dev/null +++ b/infra/migrations/002_documents_and_intelligence.sql @@ -0,0 +1,114 @@ +-- Stonks Oracle - Documents and Intelligence Schema + +-- ============================================================ +-- Market Snapshots +-- ============================================================ + +CREATE TABLE market_snapshots ( + id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), + company_id UUID NOT NULL REFERENCES companies(id), + ticker VARCHAR(20) NOT NULL, + snapshot_type VARCHAR(50) NOT NULL, + data JSONB NOT NULL, + source_provider VARCHAR(100), + captured_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + storage_ref VARCHAR(1000), + content_hash VARCHAR(128) +); +CREATE INDEX idx_market_snapshots_ticker ON market_snapshots(ticker, captured_at DESC); +CREATE INDEX idx_market_snapshots_hash ON market_snapshots(content_hash); + +-- ============================================================ +-- Documents +-- ============================================================ + +CREATE TABLE documents ( + id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), + document_type VARCHAR(50) NOT NULL, + source_type VARCHAR(50) NOT NULL, + publisher VARCHAR(500), + url TEXT, + canonical_url TEXT, + title TEXT, + published_at TIMESTAMPTZ, + retrieved_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + language VARCHAR(10) DEFAULT 'en', + content_hash VARCHAR(128) NOT NULL, + raw_storage_ref VARCHAR(1000), + normalized_storage_ref VARCHAR(1000), + parse_quality_score FLOAT, + parse_confidence VARCHAR(20) DEFAULT 'unknown', + status VARCHAR(50) NOT NULL DEFAULT 'ingested', + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); +CREATE UNIQUE INDEX idx_documents_hash ON documents(content_hash); +CREATE INDEX idx_documents_status ON documents(status); +CREATE INDEX idx_documents_published ON documents(published_at DESC); + +CREATE TABLE document_versions ( + id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), + document_id UUID NOT NULL REFERENCES documents(id) ON DELETE CASCADE, + version INTEGER NOT NULL DEFAULT 1, + content_hash VARCHAR(128) NOT NULL, + storage_ref VARCHAR(1000), + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE TABLE document_company_mentions ( + id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), + document_id UUID NOT NULL REFERENCES documents(id) ON DELETE CASCADE, + company_id UUID NOT NULL REFERENCES companies(id), + ticker VARCHAR(20) NOT NULL, + mention_type VARCHAR(50) DEFAULT 'direct', + confidence FLOAT DEFAULT 0.5, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); +CREATE INDEX idx_doc_mentions_doc ON document_company_mentions(document_id); +CREATE INDEX idx_doc_mentions_company ON document_company_mentions(company_id); + +-- ============================================================ +-- Document Intelligence (AI Extraction) +-- ============================================================ + +CREATE TABLE document_intelligence ( + id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), + document_id UUID NOT NULL REFERENCES documents(id) ON DELETE CASCADE, + summary TEXT, + macro_themes JSONB DEFAULT '[]', + novelty_score FLOAT, + source_credibility FLOAT, + extraction_warnings JSONB DEFAULT '[]', + confidence FLOAT, + model_provider VARCHAR(50), + model_name VARCHAR(200), + prompt_version VARCHAR(100), + schema_version VARCHAR(50), + raw_output_ref VARCHAR(1000), + prompt_ref VARCHAR(1000), + validation_status VARCHAR(50) DEFAULT 'pending', + validation_errors JSONB DEFAULT '[]', + retry_count INTEGER DEFAULT 0, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); +CREATE INDEX idx_doc_intel_document ON document_intelligence(document_id); +CREATE INDEX idx_doc_intel_validation ON document_intelligence(validation_status); + +CREATE TABLE document_impact_records ( + id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), + intelligence_id UUID NOT NULL REFERENCES document_intelligence(id) ON DELETE CASCADE, + company_id UUID NOT NULL REFERENCES companies(id), + ticker VARCHAR(20) NOT NULL, + relevance FLOAT, + sentiment VARCHAR(20), + impact_score FLOAT, + impact_horizon VARCHAR(50), + catalyst_type VARCHAR(50), + key_facts JSONB DEFAULT '[]', + risks JSONB DEFAULT '[]', + evidence_spans JSONB DEFAULT '[]', + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); +CREATE INDEX idx_impact_intel ON document_impact_records(intelligence_id); +CREATE INDEX idx_impact_company ON document_impact_records(company_id); diff --git a/infra/migrations/003_trends_recommendations_orders.sql b/infra/migrations/003_trends_recommendations_orders.sql new file mode 100644 index 0000000..f6deef4 --- /dev/null +++ b/infra/migrations/003_trends_recommendations_orders.sql @@ -0,0 +1,160 @@ +-- Stonks Oracle - Trends, Recommendations, Orders Schema + +-- ============================================================ +-- Trend Windows +-- ============================================================ + +CREATE TABLE trend_windows ( + id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), + entity_type VARCHAR(50) NOT NULL DEFAULT 'company', + entity_id VARCHAR(100) NOT NULL, + window VARCHAR(20) NOT NULL, + trend_direction VARCHAR(20) NOT NULL DEFAULT 'neutral', + trend_strength FLOAT DEFAULT 0.5, + confidence FLOAT DEFAULT 0.5, + top_supporting_evidence JSONB DEFAULT '[]', + top_opposing_evidence JSONB DEFAULT '[]', + dominant_catalysts JSONB DEFAULT '[]', + material_risks JSONB DEFAULT '[]', + contradiction_score FLOAT DEFAULT 0.0, + market_context JSONB DEFAULT '{}', + generated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); +CREATE INDEX idx_trends_entity ON trend_windows(entity_type, entity_id, window); +CREATE INDEX idx_trends_generated ON trend_windows(generated_at DESC); + +-- ============================================================ +-- Recommendations +-- ============================================================ + +CREATE TABLE recommendations ( + id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), + ticker VARCHAR(20) NOT NULL, + company_id UUID REFERENCES companies(id), + action VARCHAR(20) NOT NULL DEFAULT 'watch', + mode VARCHAR(30) NOT NULL DEFAULT 'informational', + confidence FLOAT DEFAULT 0.5, + time_horizon VARCHAR(50), + thesis TEXT, + invalidation_conditions JSONB DEFAULT '[]', + portfolio_pct FLOAT DEFAULT 0.02, + max_loss_pct FLOAT DEFAULT 0.005, + model_version VARCHAR(100), + generated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); +CREATE INDEX idx_recommendations_ticker ON recommendations(ticker, generated_at DESC); +CREATE INDEX idx_recommendations_mode ON recommendations(mode); + +CREATE TABLE recommendation_evidence ( + id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), + recommendation_id UUID NOT NULL REFERENCES recommendations(id) ON DELETE CASCADE, + document_id UUID REFERENCES documents(id), + intelligence_id UUID REFERENCES document_intelligence(id), + evidence_type VARCHAR(50) DEFAULT 'supporting', + weight FLOAT DEFAULT 1.0, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); +CREATE INDEX idx_rec_evidence_rec ON recommendation_evidence(recommendation_id); + +-- ============================================================ +-- Risk Evaluations +-- ============================================================ + +CREATE TABLE risk_evaluations ( + id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), + recommendation_id UUID NOT NULL REFERENCES recommendations(id), + eligible BOOLEAN NOT NULL DEFAULT FALSE, + allowed_mode VARCHAR(30) DEFAULT 'informational', + rejection_reasons JSONB DEFAULT '[]', + risk_checks JSONB DEFAULT '{}', + evaluated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); +CREATE INDEX idx_risk_eval_rec ON risk_evaluations(recommendation_id); + +-- ============================================================ +-- Broker Accounts and Orders +-- ============================================================ + +CREATE TABLE broker_accounts ( + id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), + provider VARCHAR(100) NOT NULL, + account_id VARCHAR(200) NOT NULL, + mode VARCHAR(20) NOT NULL DEFAULT 'paper', + config JSONB DEFAULT '{}', + active BOOLEAN NOT NULL DEFAULT TRUE, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE TABLE orders ( + id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), + recommendation_id UUID REFERENCES recommendations(id), + broker_account_id UUID REFERENCES broker_accounts(id), + ticker VARCHAR(20) NOT NULL, + side VARCHAR(10) NOT NULL, + order_type VARCHAR(20) NOT NULL DEFAULT 'market', + quantity NUMERIC NOT NULL, + limit_price NUMERIC, + stop_price NUMERIC, + status VARCHAR(30) NOT NULL DEFAULT 'pending', + idempotency_key VARCHAR(200) NOT NULL UNIQUE, + broker_order_id VARCHAR(200), + decision_trace JSONB DEFAULT '{}', + submitted_at TIMESTAMPTZ, + acknowledged_at TIMESTAMPTZ, + filled_at TIMESTAMPTZ, + cancelled_at TIMESTAMPTZ, + rejected_at TIMESTAMPTZ, + rejection_reason TEXT, + fill_price NUMERIC, + fill_quantity NUMERIC, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); +CREATE INDEX idx_orders_ticker ON orders(ticker, created_at DESC); +CREATE INDEX idx_orders_status ON orders(status); +CREATE INDEX idx_orders_idempotency ON orders(idempotency_key); + +CREATE TABLE order_events ( + id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), + order_id UUID NOT NULL REFERENCES orders(id) ON DELETE CASCADE, + event_type VARCHAR(50) NOT NULL, + data JSONB DEFAULT '{}', + broker_timestamp TIMESTAMPTZ, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); +CREATE INDEX idx_order_events_order ON order_events(order_id); + +-- ============================================================ +-- Positions +-- ============================================================ + +CREATE TABLE positions ( + id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), + broker_account_id UUID REFERENCES broker_accounts(id), + ticker VARCHAR(20) NOT NULL, + quantity NUMERIC NOT NULL DEFAULT 0, + avg_entry_price NUMERIC, + current_price NUMERIC, + unrealized_pnl NUMERIC, + realized_pnl NUMERIC DEFAULT 0, + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); +CREATE INDEX idx_positions_ticker ON positions(ticker); + +-- ============================================================ +-- Audit Events +-- ============================================================ + +CREATE TABLE audit_events ( + id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), + event_type VARCHAR(100) NOT NULL, + entity_type VARCHAR(100), + entity_id UUID, + actor VARCHAR(200) DEFAULT 'system', + data JSONB DEFAULT '{}', + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); +CREATE INDEX idx_audit_events_type ON audit_events(event_type, created_at DESC); +CREATE INDEX idx_audit_events_entity ON audit_events(entity_type, entity_id); diff --git a/infra/minio/lifecycle.json b/infra/minio/lifecycle.json new file mode 100644 index 0000000..82a6932 --- /dev/null +++ b/infra/minio/lifecycle.json @@ -0,0 +1,14 @@ +{ + "Rules": [ + { + "ID": "raw-retention-365d", + "Status": "Enabled", + "Filter": { + "Prefix": "" + }, + "Expiration": { + "Days": 365 + } + } + ] +} diff --git a/infra/superset/superset_config.py b/infra/superset/superset_config.py new file mode 100644 index 0000000..ed16b21 --- /dev/null +++ b/infra/superset/superset_config.py @@ -0,0 +1,23 @@ +"""Apache Superset configuration for Stonks Oracle.""" +import os + +# Superset secret key +SECRET_KEY = os.getenv("SUPERSET_SECRET_KEY", "stonks-dev-secret-key-change-me") + +# Trino datasource +SQLALCHEMY_DATABASE_URI = "trino://trino@trino:8080/lakehouse/stonks" + +# Feature flags +FEATURE_FLAGS = { + "ENABLE_TEMPLATE_PROCESSING": True, +} + +# Cache config (Redis-backed) +CACHE_CONFIG = { + "CACHE_TYPE": "RedisCache", + "CACHE_DEFAULT_TIMEOUT": 300, + "CACHE_KEY_PREFIX": "superset_", + "CACHE_REDIS_HOST": os.getenv("REDIS_HOST", "redis"), + "CACHE_REDIS_PORT": int(os.getenv("REDIS_PORT", "6379")), + "CACHE_REDIS_DB": 1, +} diff --git a/infra/trino/catalog/iceberg.properties b/infra/trino/catalog/iceberg.properties new file mode 100644 index 0000000..219ab92 --- /dev/null +++ b/infra/trino/catalog/iceberg.properties @@ -0,0 +1,7 @@ +connector.name=iceberg +iceberg.catalog.type=hive_metastore +hive.metastore.uri=thrift://hive-metastore:9083 +hive.s3.endpoint=http://minio:9000 +hive.s3.path-style-access=true +hive.s3.aws-access-key=minioadmin +hive.s3.aws-secret-key=minioadmin diff --git a/infra/trino/catalog/lakehouse.properties b/infra/trino/catalog/lakehouse.properties new file mode 100644 index 0000000..1d96f94 --- /dev/null +++ b/infra/trino/catalog/lakehouse.properties @@ -0,0 +1,8 @@ +connector.name=hive +hive.metastore.uri=thrift://hive-metastore:9083 +hive.s3.endpoint=http://minio:9000 +hive.s3.path-style-access=true +hive.s3.aws-access-key=minioadmin +hive.s3.aws-secret-key=minioadmin +hive.non-managed-table-writes-enabled=true +hive.s3select-pushdown.enabled=true diff --git a/lakehouse/schemas/README.md b/lakehouse/schemas/README.md new file mode 100644 index 0000000..6c8f720 --- /dev/null +++ b/lakehouse/schemas/README.md @@ -0,0 +1,16 @@ +# Lakehouse Schemas + +Analytical fact table definitions for MinIO-backed datasets queried via Trino. + +## Fact Tables +- `lake.market_bars` — OHLCV bar data +- `lake.market_quotes` — quote snapshots +- `lake.company_events` — corporate actions and events +- `lake.documents` — ingested document metadata +- `lake.document_extractions` — AI extraction outputs +- `lake.trade_signals` — aggregated trend signals +- `lake.trade_orders` — order submission records +- `lake.trade_fills` — fill and execution records +- `lake.positions_daily` — end-of-day position snapshots +- `lake.pnl_daily` — daily PnL records +- `lake.prediction_vs_outcome` — prediction accuracy tracking diff --git a/lakehouse/schemas/document_extractions.sql b/lakehouse/schemas/document_extractions.sql new file mode 100644 index 0000000..1504e82 --- /dev/null +++ b/lakehouse/schemas/document_extractions.sql @@ -0,0 +1,21 @@ +-- Analytical fact table: document_extractions +-- Partitioned by dt and model_version on MinIO + +CREATE TABLE IF NOT EXISTS lakehouse.stonks.document_extractions ( + document_id VARCHAR, + ticker VARCHAR, + sentiment VARCHAR, + impact_score DOUBLE, + catalyst_type VARCHAR, + confidence DOUBLE, + novelty_score DOUBLE, + model_name VARCHAR, + prompt_version VARCHAR, + extraction_at TIMESTAMP(6) WITH TIME ZONE, + dt DATE, + model_version VARCHAR +) WITH ( + format = 'PARQUET', + partitioned_by = ARRAY['dt', 'model_version'], + external_location = 's3a://stonks-lakehouse/warehouse/document_extractions/' +); diff --git a/lakehouse/schemas/documents.sql b/lakehouse/schemas/documents.sql new file mode 100644 index 0000000..24e90e3 --- /dev/null +++ b/lakehouse/schemas/documents.sql @@ -0,0 +1,20 @@ +-- Analytical fact table: documents +-- Partitioned by dt and source_type on MinIO +-- Path: s3://stonks-lakehouse/warehouse/documents/dt={yyyy-mm-dd}/source_type={type}/part-*.parquet + +CREATE TABLE IF NOT EXISTS lakehouse.stonks.documents ( + document_id VARCHAR, + document_type VARCHAR, + source_type VARCHAR, + ticker VARCHAR, + publisher VARCHAR, + title VARCHAR, + published_at TIMESTAMP(6) WITH TIME ZONE, + content_hash VARCHAR, + confidence DOUBLE, + dt DATE +) WITH ( + format = 'PARQUET', + partitioned_by = ARRAY['dt'], + external_location = 's3a://stonks-lakehouse/warehouse/documents/' +); diff --git a/lakehouse/schemas/market_bars.sql b/lakehouse/schemas/market_bars.sql new file mode 100644 index 0000000..71f63b5 --- /dev/null +++ b/lakehouse/schemas/market_bars.sql @@ -0,0 +1,20 @@ +-- Analytical fact table: market_bars +-- Partitioned by dt (date) on MinIO +-- Path: s3://stonks-lakehouse/warehouse/market_bars/dt={yyyy-mm-dd}/part-*.parquet + +CREATE TABLE IF NOT EXISTS lakehouse.stonks.market_bars ( + ticker VARCHAR, + open_price DOUBLE, + high_price DOUBLE, + low_price DOUBLE, + close_price DOUBLE, + volume BIGINT, + vwap DOUBLE, + bar_timestamp TIMESTAMP(6) WITH TIME ZONE, + source VARCHAR, + dt DATE +) WITH ( + format = 'PARQUET', + partitioned_by = ARRAY['dt'], + external_location = 's3a://stonks-lakehouse/warehouse/market_bars/' +); diff --git a/lakehouse/schemas/pnl_daily.sql b/lakehouse/schemas/pnl_daily.sql new file mode 100644 index 0000000..805ba78 --- /dev/null +++ b/lakehouse/schemas/pnl_daily.sql @@ -0,0 +1,15 @@ +-- Analytical fact table: pnl_daily +-- Partitioned by dt on MinIO + +CREATE TABLE IF NOT EXISTS lakehouse.stonks.pnl_daily ( + ticker VARCHAR, + realized_pnl DOUBLE, + unrealized_pnl DOUBLE, + total_pnl DOUBLE, + broker_account VARCHAR, + dt DATE +) WITH ( + format = 'PARQUET', + partitioned_by = ARRAY['dt'], + external_location = 's3a://stonks-lakehouse/warehouse/pnl_daily/' +); diff --git a/lakehouse/schemas/positions_daily.sql b/lakehouse/schemas/positions_daily.sql new file mode 100644 index 0000000..17691a1 --- /dev/null +++ b/lakehouse/schemas/positions_daily.sql @@ -0,0 +1,17 @@ +-- Analytical fact table: positions_daily +-- Partitioned by dt on MinIO + +CREATE TABLE IF NOT EXISTS lakehouse.stonks.positions_daily ( + ticker VARCHAR, + quantity DOUBLE, + avg_entry_price DOUBLE, + close_price DOUBLE, + unrealized_pnl DOUBLE, + broker_account VARCHAR, + snapshot_at TIMESTAMP(6) WITH TIME ZONE, + dt DATE +) WITH ( + format = 'PARQUET', + partitioned_by = ARRAY['dt'], + external_location = 's3a://stonks-lakehouse/warehouse/positions_daily/' +); diff --git a/lakehouse/schemas/prediction_vs_outcome.sql b/lakehouse/schemas/prediction_vs_outcome.sql new file mode 100644 index 0000000..5a8fd45 --- /dev/null +++ b/lakehouse/schemas/prediction_vs_outcome.sql @@ -0,0 +1,19 @@ +-- Analytical fact table: prediction_vs_outcome +-- Partitioned by dt on MinIO + +CREATE TABLE IF NOT EXISTS lakehouse.stonks.prediction_vs_outcome ( + recommendation_id VARCHAR, + ticker VARCHAR, + predicted_action VARCHAR, + predicted_confidence DOUBLE, + actual_move_pct DOUBLE, + outcome VARCHAR, + horizon_days INTEGER, + predicted_at TIMESTAMP(6) WITH TIME ZONE, + evaluated_at TIMESTAMP(6) WITH TIME ZONE, + dt DATE +) WITH ( + format = 'PARQUET', + partitioned_by = ARRAY['dt'], + external_location = 's3a://stonks-lakehouse/warehouse/prediction_vs_outcome/' +); diff --git a/lakehouse/schemas/trade_fills.sql b/lakehouse/schemas/trade_fills.sql new file mode 100644 index 0000000..5577972 --- /dev/null +++ b/lakehouse/schemas/trade_fills.sql @@ -0,0 +1,18 @@ +-- Analytical fact table: trade_fills +-- Partitioned by dt on MinIO + +CREATE TABLE IF NOT EXISTS lakehouse.stonks.trade_fills ( + fill_id VARCHAR, + order_id VARCHAR, + ticker VARCHAR, + side VARCHAR, + fill_price DOUBLE, + fill_quantity DOUBLE, + broker_account VARCHAR, + filled_at TIMESTAMP(6) WITH TIME ZONE, + dt DATE +) WITH ( + format = 'PARQUET', + partitioned_by = ARRAY['dt'], + external_location = 's3a://stonks-lakehouse/warehouse/trade_fills/' +); diff --git a/lakehouse/schemas/trade_orders.sql b/lakehouse/schemas/trade_orders.sql new file mode 100644 index 0000000..002bece --- /dev/null +++ b/lakehouse/schemas/trade_orders.sql @@ -0,0 +1,19 @@ +-- Analytical fact table: trade_orders +-- Partitioned by dt on MinIO + +CREATE TABLE IF NOT EXISTS lakehouse.stonks.trade_orders ( + order_id VARCHAR, + ticker VARCHAR, + side VARCHAR, + order_type VARCHAR, + quantity DOUBLE, + limit_price DOUBLE, + status VARCHAR, + broker_account VARCHAR, + submitted_at TIMESTAMP(6) WITH TIME ZONE, + dt DATE +) WITH ( + format = 'PARQUET', + partitioned_by = ARRAY['dt'], + external_location = 's3a://stonks-lakehouse/warehouse/trade_orders/' +); diff --git a/lakehouse/schemas/trade_signals.sql b/lakehouse/schemas/trade_signals.sql new file mode 100644 index 0000000..a76eab0 --- /dev/null +++ b/lakehouse/schemas/trade_signals.sql @@ -0,0 +1,18 @@ +-- Analytical fact table: trade_signals +-- Partitioned by dt on MinIO + +CREATE TABLE IF NOT EXISTS lakehouse.stonks.trade_signals ( + signal_id VARCHAR, + ticker VARCHAR, + trend_direction VARCHAR, + trend_strength DOUBLE, + confidence DOUBLE, + action VARCHAR, + time_horizon VARCHAR, + generated_at TIMESTAMP(6) WITH TIME ZONE, + dt DATE +) WITH ( + format = 'PARQUET', + partitioned_by = ARRAY['dt'], + external_location = 's3a://stonks-lakehouse/warehouse/trade_signals/' +); diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..165de7d --- /dev/null +++ b/requirements.txt @@ -0,0 +1,32 @@ +# Web framework +fastapi>=0.115.0 +uvicorn>=0.30.0 +pydantic>=2.0.0 + +# Database +asyncpg>=0.30.0 +redis>=5.0.0 + +# Object storage +minio>=7.2.0 + +# HTTP client +httpx>=0.27.0 + +# Web scraping +beautifulsoup4>=4.12.0 +requests>=2.31.0 + +# Data / Lakehouse +pyarrow>=17.0.0 +pandas>=2.2.0 + +# Trino +trino>=0.330.0 + +# Testing +pytest>=8.0.0 +pytest-asyncio>=0.24.0 + +# Code quality +ruff>=0.5.0 diff --git a/ruff.toml b/ruff.toml new file mode 100644 index 0000000..ece3d6f --- /dev/null +++ b/ruff.toml @@ -0,0 +1,6 @@ +target-version = "py312" +line-length = 120 + +[lint] +select = ["E", "F", "I", "W"] +ignore = ["E501"] diff --git a/services/adapters/__init__.py b/services/adapters/__init__.py new file mode 100644 index 0000000..909a10a --- /dev/null +++ b/services/adapters/__init__.py @@ -0,0 +1 @@ +# Ingestion Adapters diff --git a/services/adapters/base.py b/services/adapters/base.py new file mode 100644 index 0000000..72e604f --- /dev/null +++ b/services/adapters/base.py @@ -0,0 +1,29 @@ +"""Base adapter interface for all external API integrations.""" +from abc import ABC, abstractmethod +from dataclasses import dataclass +from datetime import datetime +from typing import Any, Dict, List, Optional + + +@dataclass +class AdapterResult: + source_type: str + ticker: str + items: List[Dict[str, Any]] + raw_payload: bytes + content_hash: str + fetched_at: datetime + error: Optional[str] = None + + +class BaseAdapter(ABC): + """Interface for all ingestion adapters.""" + + @abstractmethod + async def fetch(self, ticker: str, config: Dict[str, Any]) -> AdapterResult: + """Fetch data for a given ticker using source config.""" + ... + + @abstractmethod + def source_type(self) -> str: + ... diff --git a/services/adapters/broker_adapter.py b/services/adapters/broker_adapter.py new file mode 100644 index 0000000..8cc0357 --- /dev/null +++ b/services/adapters/broker_adapter.py @@ -0,0 +1,108 @@ +"""Broker API adapter - paper/live trading, orders, positions, balances.""" +import hashlib +import logging +import uuid +from datetime import datetime +from typing import Any, Dict, Optional + +import httpx + +from .base import AdapterResult, BaseAdapter + +logger = logging.getLogger("broker_adapter") + + +class BrokerAdapter(BaseAdapter): + """Broker API adapter supporting paper and live modes.""" + + def __init__(self, api_key: str = "", api_secret: str = "", base_url: str = "", mode: str = "paper"): + self.api_key = api_key + self.api_secret = api_secret + self.base_url = base_url + self.mode = mode # paper | live + + def source_type(self) -> str: + return "broker" + + def _headers(self) -> Dict[str, str]: + return { + "Authorization": f"Bearer {self.api_key}", + "Content-Type": "application/json", + } + + async def fetch(self, ticker: str, config: Dict[str, Any]) -> AdapterResult: + """Fetch positions and recent orders for a ticker.""" + async with httpx.AsyncClient(timeout=30) as client: + try: + resp = await client.get( + f"{self.base_url}/v2/positions/{ticker}", + headers=self._headers(), + ) + raw = resp.content + data = resp.json() if resp.status_code == 200 else {} + content_hash = hashlib.sha256(raw).hexdigest() + + return AdapterResult( + source_type="broker", + ticker=ticker, + items=[data] if data else [], + raw_payload=raw, + content_hash=content_hash, + fetched_at=datetime.utcnow(), + ) + except Exception as e: + logger.error(f"Broker fetch failed for {ticker}: {e}") + return AdapterResult( + source_type="broker", + ticker=ticker, + items=[], + raw_payload=b"", + content_hash="", + fetched_at=datetime.utcnow(), + error=str(e), + ) + + async def submit_order( + self, + ticker: str, + side: str, + qty: float, + order_type: str = "market", + limit_price: Optional[float] = None, + idempotency_key: Optional[str] = None, + ) -> Dict[str, Any]: + """Submit an order to the broker. Returns broker response.""" + if self.mode == "live": + logger.warning("LIVE order submission") + + idem_key = idempotency_key or str(uuid.uuid4()) + payload = { + "symbol": ticker, + "qty": str(qty), + "side": side, + "type": order_type, + "time_in_force": "day", + } + if limit_price and order_type == "limit": + payload["limit_price"] = str(limit_price) + + async with httpx.AsyncClient(timeout=30) as client: + try: + resp = await client.post( + f"{self.base_url}/v2/orders", + headers={**self._headers(), "Idempotency-Key": idem_key}, + json=payload, + ) + resp.raise_for_status() + return resp.json() + except httpx.HTTPStatusError as e: + logger.error(f"Order rejected: {e.response.text}") + return {"error": e.response.text, "status": e.response.status_code} + except Exception as e: + logger.error(f"Order submission failed: {e}") + return {"error": str(e)} + + async def get_account(self) -> Dict[str, Any]: + async with httpx.AsyncClient(timeout=30) as client: + resp = await client.get(f"{self.base_url}/v2/account", headers=self._headers()) + return resp.json() diff --git a/services/adapters/filings_adapter.py b/services/adapters/filings_adapter.py new file mode 100644 index 0000000..ede9df7 --- /dev/null +++ b/services/adapters/filings_adapter.py @@ -0,0 +1,58 @@ +"""Filings / Regulatory API adapter - fetches SEC-style submissions.""" +import hashlib +import logging +from datetime import datetime +from typing import Any, Dict + +import httpx + +from .base import AdapterResult, BaseAdapter + +logger = logging.getLogger("filings_adapter") + + +class FilingsAdapter(BaseAdapter): + """Concrete adapter for SEC EDGAR or similar filings API.""" + + def __init__(self, base_url: str = "https://efts.sec.gov", user_agent: str = "StonksOracle/1.0"): + self.base_url = base_url + self.user_agent = user_agent + + def source_type(self) -> str: + return "filings_api" + + async def fetch(self, ticker: str, config: Dict[str, Any]) -> AdapterResult: + _cik = config.get("cik", "") + endpoint = config.get("endpoint", f"/LATEST/search-index?q=%22{ticker}%22&dateRange=custom&startdt=2026-01-01&forms=8-K,10-Q,10-K") + url = f"{self.base_url}{endpoint}" + + headers = {"User-Agent": self.user_agent} + + async with httpx.AsyncClient(timeout=30) as client: + try: + resp = await client.get(url, headers=headers) + resp.raise_for_status() + raw = resp.content + data = resp.json() + content_hash = hashlib.sha256(raw).hexdigest() + + hits = data.get("hits", {}).get("hits", []) + return AdapterResult( + source_type="filings_api", + ticker=ticker, + items=hits, + raw_payload=raw, + content_hash=content_hash, + fetched_at=datetime.utcnow(), + ) + except Exception as e: + logger.error(f"Filings fetch failed for {ticker}: {e}") + return AdapterResult( + source_type="filings_api", + ticker=ticker, + items=[], + raw_payload=b"", + content_hash="", + fetched_at=datetime.utcnow(), + error=str(e), + ) diff --git a/services/adapters/market_adapter.py b/services/adapters/market_adapter.py new file mode 100644 index 0000000..d33c2ff --- /dev/null +++ b/services/adapters/market_adapter.py @@ -0,0 +1,59 @@ +"""Market data API adapter - fetches quotes, bars, and reference data.""" +import hashlib +import logging +from datetime import datetime +from typing import Any, Dict + +import httpx + +from .base import AdapterResult, BaseAdapter + +logger = logging.getLogger("market_adapter") + + +class MarketDataAdapter(BaseAdapter): + """Concrete adapter for a market data provider (e.g., Alpha Vantage, Polygon, Yahoo).""" + + def __init__(self, api_key: str = "", base_url: str = ""): + self.api_key = api_key + self.base_url = base_url + + def source_type(self) -> str: + return "market_api" + + async def fetch(self, ticker: str, config: Dict[str, Any]) -> AdapterResult: + endpoint = config.get("endpoint", "/v2/aggs/ticker/{ticker}/prev") + url = f"{self.base_url}{endpoint.format(ticker=ticker)}" + params = config.get("params", {}) + if self.api_key: + params["apiKey"] = self.api_key + + async with httpx.AsyncClient(timeout=30) as client: + try: + resp = await client.get(url, params=params) + resp.raise_for_status() + raw = resp.content + data = resp.json() + content_hash = hashlib.sha256(raw).hexdigest() + + items = data.get("results", [data]) if isinstance(data, dict) else data + + return AdapterResult( + source_type="market_api", + ticker=ticker, + items=items if isinstance(items, list) else [items], + raw_payload=raw, + content_hash=content_hash, + fetched_at=datetime.utcnow(), + ) + except Exception as e: + logger.error(f"Market fetch failed for {ticker}: {e}") + return AdapterResult( + source_type="market_api", + ticker=ticker, + items=[], + raw_payload=b"", + content_hash="", + fetched_at=datetime.utcnow(), + error=str(e), + ) diff --git a/services/adapters/news_adapter.py b/services/adapters/news_adapter.py new file mode 100644 index 0000000..0a77c7d --- /dev/null +++ b/services/adapters/news_adapter.py @@ -0,0 +1,61 @@ +"""News API adapter - fetches company-linked headlines and article metadata.""" +import hashlib +import logging +from datetime import datetime +from typing import Any, Dict + +import httpx + +from .base import AdapterResult, BaseAdapter + +logger = logging.getLogger("news_adapter") + + +class NewsApiAdapter(BaseAdapter): + """Concrete adapter for a news API provider.""" + + def __init__(self, api_key: str = "", base_url: str = ""): + self.api_key = api_key + self.base_url = base_url + + def source_type(self) -> str: + return "news_api" + + async def fetch(self, ticker: str, config: Dict[str, Any]) -> AdapterResult: + endpoint = config.get("endpoint", "/v2/everything") + url = f"{self.base_url}{endpoint}" + params = config.get("params", {}) + params.setdefault("q", ticker) + params.setdefault("sortBy", "publishedAt") + params.setdefault("pageSize", 20) + if self.api_key: + params["apiKey"] = self.api_key + + async with httpx.AsyncClient(timeout=30) as client: + try: + resp = await client.get(url, params=params) + resp.raise_for_status() + raw = resp.content + data = resp.json() + content_hash = hashlib.sha256(raw).hexdigest() + + articles = data.get("articles", []) + return AdapterResult( + source_type="news_api", + ticker=ticker, + items=articles, + raw_payload=raw, + content_hash=content_hash, + fetched_at=datetime.utcnow(), + ) + except Exception as e: + logger.error(f"News fetch failed for {ticker}: {e}") + return AdapterResult( + source_type="news_api", + ticker=ticker, + items=[], + raw_payload=b"", + content_hash="", + fetched_at=datetime.utcnow(), + error=str(e), + ) diff --git a/services/aggregation/__init__.py b/services/aggregation/__init__.py new file mode 100644 index 0000000..7ce0983 --- /dev/null +++ b/services/aggregation/__init__.py @@ -0,0 +1 @@ +# Aggregation Engine - trend summaries and signal aggregation diff --git a/services/aggregation/worker.py b/services/aggregation/worker.py new file mode 100644 index 0000000..fdc81b0 --- /dev/null +++ b/services/aggregation/worker.py @@ -0,0 +1 @@ +"""Aggregation worker - rolling trend summaries, contradiction detection, evidence ranking.""" diff --git a/services/api/__init__.py b/services/api/__init__.py new file mode 100644 index 0000000..440cecb --- /dev/null +++ b/services/api/__init__.py @@ -0,0 +1 @@ +# Query API - exposes companies, documents, trends, recommendations, and audit trails diff --git a/services/api/app.py b/services/api/app.py new file mode 100644 index 0000000..54a4637 --- /dev/null +++ b/services/api/app.py @@ -0,0 +1 @@ +"""Query API - FastAPI application for analytics, evidence drill-down, and admin controls.""" diff --git a/services/extractor/__init__.py b/services/extractor/__init__.py new file mode 100644 index 0000000..2ed67a2 --- /dev/null +++ b/services/extractor/__init__.py @@ -0,0 +1 @@ +# Ollama Extraction Service diff --git a/services/extractor/worker.py b/services/extractor/worker.py new file mode 100644 index 0000000..7f47a87 --- /dev/null +++ b/services/extractor/worker.py @@ -0,0 +1 @@ +"""Extraction worker - sends documents to Ollama for structured intelligence extraction.""" diff --git a/services/ingestion/__init__.py b/services/ingestion/__init__.py new file mode 100644 index 0000000..a545460 --- /dev/null +++ b/services/ingestion/__init__.py @@ -0,0 +1 @@ +# Ingestion Pipeline diff --git a/services/ingestion/worker.py b/services/ingestion/worker.py new file mode 100644 index 0000000..331dd01 --- /dev/null +++ b/services/ingestion/worker.py @@ -0,0 +1,182 @@ +"""Ingestion worker - processes jobs from the ingestion queue.""" +import asyncio +import hashlib +import io +import json +import logging +from datetime import datetime + +import asyncpg +import redis.asyncio as aioredis +from minio import Minio + +from services.adapters.base import AdapterResult +from services.adapters.filings_adapter import FilingsAdapter +from services.adapters.market_adapter import MarketDataAdapter +from services.adapters.news_adapter import NewsApiAdapter +from services.shared.config import load_config +from services.shared.db import get_minio, get_pg_pool, get_redis +from services.shared.redis_keys import ( + QUEUE_INGESTION, + QUEUE_PARSING, + dedupe_key, + queue_key, +) + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger("ingestion_worker") + +BUCKET_MAP = { + "market_api": "stonks-raw-market", + "news_api": "stonks-raw-news", + "filings_api": "stonks-raw-filings", + "broker": "stonks-raw-market", +} + + +def build_storage_path(source_type: str, ticker: str, doc_id: str) -> str: + now = datetime.utcnow() + return f"{source_type}/{ticker}/{now.year}/{now.month:02d}/{now.day:02d}/{doc_id}/raw.json" + + +async def store_raw_artifact(minio_client: Minio, bucket: str, path: str, data: bytes): + minio_client.put_object(bucket, path, io.BytesIO(data), len(data), content_type="application/json") + + +async def process_job( + job: dict, + pool: asyncpg.Pool, + rds: aioredis.Redis, + minio_client: Minio, + adapters: dict, +): + source_type = job["source_type"] + ticker = job["ticker"] + source_id = job["source_id"] + config = job.get("config", {}) + + adapter = adapters.get(source_type) + if not adapter: + logger.warning(f"No adapter for source_type={source_type}") + return + + # Record ingestion run + run_id = await pool.fetchval( + """INSERT INTO ingestion_runs (source_id, company_id, source_type, status) + VALUES ($1, $2, $3, 'running') RETURNING id""", + source_id, job["company_id"], source_type, + ) + + try: + result: AdapterResult = await adapter.fetch(ticker, config) + + if result.error: + await pool.execute( + "UPDATE ingestion_runs SET status='failed', error_message=$2, completed_at=NOW() WHERE id=$1", + run_id, result.error, + ) + return + + # Store raw payload + bucket = BUCKET_MAP.get(source_type, "stonks-raw-market") + storage_path = build_storage_path(source_type, ticker, str(run_id)) + await store_raw_artifact(minio_client, bucket, storage_path, result.raw_payload) + + # Dedupe check + if result.content_hash: + already_seen = await rds.get(dedupe_key(result.content_hash)) + if already_seen: + logger.info(f"Duplicate content for {ticker}, skipping") + await pool.execute( + "UPDATE ingestion_runs SET status='completed', items_fetched=$2, items_new=0, completed_at=NOW() WHERE id=$1", + run_id, len(result.items), + ) + return + await rds.set(dedupe_key(result.content_hash), "1", ex=86400) + + new_items = 0 + for item in result.items: + item_json = json.dumps(item) + item_hash = hashlib.sha256(item_json.encode()).hexdigest() + + # Check if document already exists + exists = await pool.fetchval("SELECT 1 FROM documents WHERE content_hash = $1", item_hash) + if exists: + continue + + title = item.get("title", item.get("name", "")) + url = item.get("url", item.get("link", "")) + published = item.get("publishedAt", item.get("published_at")) + + doc_id = await pool.fetchval( + """INSERT INTO documents (document_type, source_type, publisher, url, title, published_at, content_hash, raw_storage_ref, status) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, 'ingested') + RETURNING id""", + "article" if source_type == "news_api" else "filing" if source_type == "filings_api" else "article", + source_type, + item.get("source", {}).get("name", "") if isinstance(item.get("source"), dict) else str(item.get("source", "")), + url, title, + datetime.fromisoformat(published.replace("Z", "+00:00")) if published else None, + item_hash, + f"s3://{bucket}/{storage_path}", + ) + + # Enqueue for parsing + await rds.rpush(queue_key(QUEUE_PARSING), json.dumps({ + "document_id": str(doc_id), + "ticker": ticker, + "source_type": source_type, + "url": url, + })) + new_items += 1 + + await pool.execute( + "UPDATE ingestion_runs SET status='completed', items_fetched=$2, items_new=$3, completed_at=NOW() WHERE id=$1", + run_id, len(result.items), new_items, + ) + logger.info(f"Ingested {ticker}/{source_type}: {len(result.items)} fetched, {new_items} new") + + except Exception as e: + logger.error(f"Ingestion error for {ticker}: {e}") + await pool.execute( + "UPDATE ingestion_runs SET status='failed', error_message=$2, completed_at=NOW() WHERE id=$1", + run_id, str(e), + ) + + +async def main(): + config = load_config() + pool = await get_pg_pool(config) + rds = get_redis(config) + minio_client = get_minio(config) + + adapters = { + "market_api": MarketDataAdapter( + api_key=config.broker.api_key or "", + base_url="https://api.polygon.io", + ), + "news_api": NewsApiAdapter( + api_key="", + base_url="https://newsapi.org", + ), + "filings_api": FilingsAdapter(), + } + + logger.info("Ingestion worker started") + queue = queue_key(QUEUE_INGESTION) + + try: + while True: + raw = await rds.lpop(queue) + if raw: + job = json.loads(raw) + await process_job(job, pool, rds, minio_client, adapters) + else: + await asyncio.sleep(2) + finally: + await pool.close() + await rds.close() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/services/lake_publisher/__init__.py b/services/lake_publisher/__init__.py new file mode 100644 index 0000000..de6226f --- /dev/null +++ b/services/lake_publisher/__init__.py @@ -0,0 +1 @@ +# Lake Publisher - transforms operational data into analytical fact datasets diff --git a/services/lake_publisher/worker.py b/services/lake_publisher/worker.py new file mode 100644 index 0000000..3db477a --- /dev/null +++ b/services/lake_publisher/worker.py @@ -0,0 +1 @@ +"""Lake publisher worker - writes partitioned Parquet facts to MinIO for Trino/Superset.""" diff --git a/services/parser/__init__.py b/services/parser/__init__.py new file mode 100644 index 0000000..cdac308 --- /dev/null +++ b/services/parser/__init__.py @@ -0,0 +1 @@ +# Scraper / Parser Service diff --git a/services/parser/worker.py b/services/parser/worker.py new file mode 100644 index 0000000..a4a3b93 --- /dev/null +++ b/services/parser/worker.py @@ -0,0 +1,209 @@ +"""Parser worker - HTML-to-text, boilerplate reduction, quality scoring.""" +import asyncio +import io +import json +import logging +import re +from datetime import datetime +from typing import List, Optional, Tuple + +import asyncpg +import httpx +import redis.asyncio as aioredis +from minio import Minio + +from services.shared.config import load_config +from services.shared.db import get_minio, get_pg_pool, get_redis +from services.shared.redis_keys import QUEUE_EXTRACTION, QUEUE_PARSING, queue_key + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger("parser_worker") + +# Simple boilerplate patterns to strip +BOILERPLATE_PATTERNS = [ + re.compile(r"(?i)subscribe to our newsletter.*?(?:\n|$)"), + re.compile(r"(?i)click here to read more.*?(?:\n|$)"), + re.compile(r"(?i)advertisement\s*\n"), + re.compile(r"(?i)copyright ©.*?(?:\n|$)"), + re.compile(r"(?i)all rights reserved.*?(?:\n|$)"), + re.compile(r"(?i)terms of (use|service).*?(?:\n|$)"), + re.compile(r"(?i)privacy policy.*?(?:\n|$)"), + re.compile(r"\s*\[.*?ad.*?\]\s*", re.IGNORECASE), +] + + +def strip_html_tags(html: str) -> str: + """Basic HTML tag removal.""" + text = re.sub(r"]*>.*?", "", html, flags=re.DOTALL | re.IGNORECASE) + text = re.sub(r"]*>.*?", "", text, flags=re.DOTALL | re.IGNORECASE) + text = re.sub(r"<[^>]+>", " ", text) + text = re.sub(r" ", " ", text) + text = re.sub(r"&", "&", text) + text = re.sub(r"<", "<", text) + text = re.sub(r">", ">", text) + text = re.sub(r"&#\d+;", "", text) + text = re.sub(r"\s+", " ", text).strip() + return text + + +def reduce_boilerplate(text: str) -> str: + for pattern in BOILERPLATE_PATTERNS: + text = pattern.sub("", text) + return text.strip() + + +def score_quality(text: str) -> Tuple[float, str]: + """Score parse quality. Returns (score, confidence_label).""" + word_count = len(text.split()) + if word_count < 20: + return 0.1, "low" + if word_count < 50: + return 0.3, "low" + if word_count < 150: + return 0.6, "medium" + return 0.85, "high" + + +def detect_company_mentions(text: str, aliases: List[dict]) -> List[dict]: + """Detect company mentions using ticker, alias, and name matching.""" + mentions = [] + text_upper = text.upper() + for alias_info in aliases: + alias = alias_info["alias"] + if alias.upper() in text_upper: + mentions.append({ + "company_id": alias_info["company_id"], + "ticker": alias_info.get("ticker", ""), + "mention_type": alias_info.get("alias_type", "alias"), + "confidence": 0.7, + }) + return mentions + + +async def fetch_html(url: str) -> Optional[str]: + """Fetch article HTML for scraping.""" + if not url: + return None + async with httpx.AsyncClient(timeout=30, follow_redirects=True) as client: + try: + resp = await client.get(url, headers={"User-Agent": "StonksOracle/1.0"}) + resp.raise_for_status() + return resp.text + except Exception as e: + logger.warning(f"Failed to fetch {url}: {e}") + return None + + +async def process_job( + job: dict, + pool: asyncpg.Pool, + rds: aioredis.Redis, + minio_client: Minio, +): + doc_id = job["document_id"] + ticker = job["ticker"] + url = job.get("url", "") + + # Fetch HTML if we have a URL + html = await fetch_html(url) if url else None + + if html: + # Store raw HTML + html_bytes = html.encode("utf-8") + now = datetime.utcnow() + html_path = f"scrape/{ticker}/{now.year}/{now.month:02d}/{now.day:02d}/{doc_id}/raw.html" + minio_client.put_object( + "stonks-raw-news", html_path, io.BytesIO(html_bytes), len(html_bytes), + content_type="text/html", + ) + + # Parse + text = strip_html_tags(html) + text = reduce_boilerplate(text) + else: + text = "" + + quality_score, confidence = score_quality(text) + + # Store normalized text + if text: + text_bytes = text.encode("utf-8") + now = datetime.utcnow() + norm_path = f"parsed/{ticker}/{now.year}/{now.month:02d}/{now.day:02d}/{doc_id}/normalized.txt" + minio_client.put_object( + "stonks-normalized", norm_path, io.BytesIO(text_bytes), len(text_bytes), + content_type="text/plain", + ) + else: + norm_path = None + + # Detect company mentions + aliases = await pool.fetch( + """SELECT ca.company_id::text, ca.alias, ca.alias_type, c.ticker + FROM company_aliases ca JOIN companies c ON ca.company_id = c.id + UNION ALL + SELECT c.id::text as company_id, c.ticker as alias, 'ticker' as alias_type, c.ticker + FROM companies c + UNION ALL + SELECT c.id::text as company_id, c.legal_name as alias, 'legal_name' as alias_type, c.ticker + FROM companies c""" + ) + mentions = detect_company_mentions(text, [dict(a) for a in aliases]) if text else [] + + # Update document + status = "parsed" if confidence != "low" else "low_quality" + await pool.execute( + """UPDATE documents SET + normalized_storage_ref=$2, parse_quality_score=$3, parse_confidence=$4, status=$5, updated_at=NOW() + WHERE id=$1""", + doc_id, f"s3://stonks-normalized/{norm_path}" if norm_path else None, + quality_score, confidence, status, + ) + + # Insert company mentions + for m in mentions: + await pool.execute( + """INSERT INTO document_company_mentions (document_id, company_id, ticker, mention_type, confidence) + VALUES ($1, $2, $3, $4, $5) ON CONFLICT DO NOTHING""", + doc_id, m["company_id"], m["ticker"], m["mention_type"], m["confidence"], + ) + + # Only enqueue for extraction if quality is acceptable + if confidence != "low": + await rds.rpush(queue_key(QUEUE_EXTRACTION), json.dumps({ + "document_id": doc_id, + "ticker": ticker, + "normalized_text": text[:8000], # Truncate for prompt + })) + logger.info(f"Parsed doc {doc_id} for {ticker}: quality={quality_score:.2f}, confidence={confidence}") + else: + logger.warning(f"Low quality parse for doc {doc_id}, skipping extraction") + + +async def main(): + config = load_config() + pool = await get_pg_pool(config) + rds = get_redis(config) + minio_client = get_minio(config) + + logger.info("Parser worker started") + queue = queue_key(QUEUE_PARSING) + + try: + while True: + raw = await rds.lpop(queue) + if raw: + job = json.loads(raw) + try: + await process_job(job, pool, rds, minio_client) + except Exception as e: + logger.error(f"Parse error: {e}") + else: + await asyncio.sleep(2) + finally: + await pool.close() + await rds.close() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/services/recommendation/__init__.py b/services/recommendation/__init__.py new file mode 100644 index 0000000..9b9f379 --- /dev/null +++ b/services/recommendation/__init__.py @@ -0,0 +1 @@ +# Recommendation Engine - trade recommendations from aggregated signals diff --git a/services/recommendation/worker.py b/services/recommendation/worker.py new file mode 100644 index 0000000..b133580 --- /dev/null +++ b/services/recommendation/worker.py @@ -0,0 +1 @@ +"""Recommendation worker - generates explainable trade recommendations from trend data.""" diff --git a/services/risk/__init__.py b/services/risk/__init__.py new file mode 100644 index 0000000..50a41a7 --- /dev/null +++ b/services/risk/__init__.py @@ -0,0 +1 @@ +# Risk Engine - portfolio risk controls and trade eligibility diff --git a/services/risk/engine.py b/services/risk/engine.py new file mode 100644 index 0000000..a4c9c76 --- /dev/null +++ b/services/risk/engine.py @@ -0,0 +1 @@ +"""Risk engine - enforces guardrails, position limits, and trade eligibility checks.""" diff --git a/services/scheduler/__init__.py b/services/scheduler/__init__.py new file mode 100644 index 0000000..d0b0f3a --- /dev/null +++ b/services/scheduler/__init__.py @@ -0,0 +1 @@ +# Scheduler / Orchestrator Service diff --git a/services/scheduler/app.py b/services/scheduler/app.py new file mode 100644 index 0000000..5808241 --- /dev/null +++ b/services/scheduler/app.py @@ -0,0 +1,112 @@ +"""Scheduler - triggers ingestion cycles for tracked symbols and sources.""" +import asyncio +import json +import logging +from datetime import datetime, timedelta + +import asyncpg +import redis.asyncio as aioredis + +from services.shared.config import load_config +from services.shared.db import get_pg_pool, get_redis +from services.shared.redis_keys import ( + QUEUE_INGESTION, + lock_key, + queue_key, + rate_limit_key, +) + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger("scheduler") + +# Polling cadences by source class (seconds) +CADENCES = { + "market_api": 60, + "news_api": 300, + "filings_api": 3600, + "web_scrape": 1800, + "broker": 30, +} + + +async def acquire_lock(rds: aioredis.Redis, name: str, ttl: int = 60) -> bool: + return await rds.set(lock_key(name), "1", nx=True, ex=ttl) + + +async def release_lock(rds: aioredis.Redis, name: str): + await rds.delete(lock_key(name)) + + +async def check_rate_limit(rds: aioredis.Redis, source_type: str, max_per_minute: int = 30) -> bool: + key = rate_limit_key(source_type, datetime.utcnow().strftime("%Y%m%d%H%M")) + count = await rds.incr(key) + if count == 1: + await rds.expire(key, 120) + return count <= max_per_minute + + +async def schedule_cycle(pool: asyncpg.Pool, rds: aioredis.Redis): + """One scheduling pass: find due sources and enqueue ingestion jobs.""" + sources = await pool.fetch( + """SELECT s.id as source_id, s.company_id, s.source_type, s.source_name, s.config, + c.ticker, c.legal_name + FROM sources s JOIN companies c ON s.company_id = c.id + WHERE s.active = TRUE AND c.active = TRUE + ORDER BY s.source_type, c.ticker""" + ) + + enqueued = 0 + for src in sources: + source_type = src["source_type"] + cadence = CADENCES.get(source_type, 600) + + # Check last run + last_run = await pool.fetchval( + "SELECT MAX(started_at) FROM ingestion_runs WHERE source_id = $1 AND status IN ('completed', 'running')", + src["source_id"], + ) + if last_run and (datetime.utcnow() - last_run.replace(tzinfo=None)) < timedelta(seconds=cadence): + continue + + if not await check_rate_limit(rds, source_type): + logger.warning(f"Rate limit hit for {source_type}") + continue + + job = { + "source_id": str(src["source_id"]), + "company_id": str(src["company_id"]), + "ticker": src["ticker"], + "source_type": source_type, + "source_name": src["source_name"], + "config": dict(src["config"]) if src["config"] else {}, + "scheduled_at": datetime.utcnow().isoformat(), + } + await rds.rpush(queue_key(QUEUE_INGESTION), json.dumps(job)) + enqueued += 1 + + if enqueued: + logger.info(f"Enqueued {enqueued} ingestion jobs") + + +async def main(): + config = load_config() + pool = await get_pg_pool(config) + rds = get_redis(config) + + logger.info("Scheduler started") + try: + while True: + try: + if await acquire_lock(rds, "scheduler_cycle", ttl=30): + await schedule_cycle(pool, rds) + await release_lock(rds, "scheduler_cycle") + except Exception as e: + logger.error(f"Scheduler cycle error: {e}") + await asyncio.sleep(15) + finally: + await pool.close() + await rds.close() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/services/shared/__init__.py b/services/shared/__init__.py new file mode 100644 index 0000000..517666b --- /dev/null +++ b/services/shared/__init__.py @@ -0,0 +1 @@ +# Stonks Oracle - Shared modules diff --git a/services/shared/config.py b/services/shared/config.py new file mode 100644 index 0000000..c5bdc81 --- /dev/null +++ b/services/shared/config.py @@ -0,0 +1,115 @@ +"""Shared configuration loader for all services.""" +import os +from dataclasses import dataclass, field +from typing import Optional + + +@dataclass +class PostgresConfig: + host: str = "localhost" + port: int = 5432 + database: str = "stonks" + user: str = "stonks" + password: str = "stonks_dev" + + @property + def dsn(self) -> str: + return f"postgresql://{self.user}:{self.password}@{self.host}:{self.port}/{self.database}" + + +@dataclass +class RedisConfig: + host: str = "localhost" + port: int = 6379 + db: int = 0 + password: Optional[str] = None + + @property + def url(self) -> str: + auth = f":{self.password}@" if self.password else "" + return f"redis://{auth}{self.host}:{self.port}/{self.db}" + + +@dataclass +class MinioConfig: + endpoint: str = "localhost:9000" + access_key: str = "minioadmin" + secret_key: str = "minioadmin" + secure: bool = False + + +@dataclass +class OllamaConfig: + base_url: str = "http://localhost:11434" + model: str = "llama3.1:8b" + timeout: int = 120 + + +@dataclass +class TrinoConfig: + host: str = "localhost" + port: int = 8080 + catalog: str = "lakehouse" + schema: str = "stonks" + + +@dataclass +class BrokerConfig: + mode: str = "paper" # paper | live + api_key: Optional[str] = None + api_secret: Optional[str] = None + base_url: Optional[str] = None + + +@dataclass +class AppConfig: + postgres: PostgresConfig = field(default_factory=PostgresConfig) + redis: RedisConfig = field(default_factory=RedisConfig) + minio: MinioConfig = field(default_factory=MinioConfig) + ollama: OllamaConfig = field(default_factory=OllamaConfig) + trino: TrinoConfig = field(default_factory=TrinoConfig) + broker: BrokerConfig = field(default_factory=BrokerConfig) + log_level: str = "INFO" + + +def load_config() -> AppConfig: + """Load configuration from environment variables with sensible defaults.""" + return AppConfig( + postgres=PostgresConfig( + host=os.getenv("POSTGRES_HOST", "localhost"), + port=int(os.getenv("POSTGRES_PORT", "5432")), + database=os.getenv("POSTGRES_DB", "stonks"), + user=os.getenv("POSTGRES_USER", "stonks"), + password=os.getenv("POSTGRES_PASSWORD", "stonks_dev"), + ), + redis=RedisConfig( + host=os.getenv("REDIS_HOST", "localhost"), + port=int(os.getenv("REDIS_PORT", "6379")), + db=int(os.getenv("REDIS_DB", "0")), + password=os.getenv("REDIS_PASSWORD", None), + ), + minio=MinioConfig( + endpoint=os.getenv("MINIO_ENDPOINT", "localhost:9000"), + access_key=os.getenv("MINIO_ACCESS_KEY", "minioadmin"), + secret_key=os.getenv("MINIO_SECRET_KEY", "minioadmin"), + secure=os.getenv("MINIO_SECURE", "false").lower() == "true", + ), + ollama=OllamaConfig( + base_url=os.getenv("OLLAMA_BASE_URL", "http://localhost:11434"), + model=os.getenv("OLLAMA_MODEL", "llama3.1:8b"), + timeout=int(os.getenv("OLLAMA_TIMEOUT", "120")), + ), + trino=TrinoConfig( + host=os.getenv("TRINO_HOST", "localhost"), + port=int(os.getenv("TRINO_PORT", "8080")), + catalog=os.getenv("TRINO_CATALOG", "lakehouse"), + schema=os.getenv("TRINO_SCHEMA", "stonks"), + ), + broker=BrokerConfig( + mode=os.getenv("BROKER_MODE", "paper"), + api_key=os.getenv("BROKER_API_KEY", None), + api_secret=os.getenv("BROKER_API_SECRET", None), + base_url=os.getenv("BROKER_BASE_URL", None), + ), + log_level=os.getenv("LOG_LEVEL", "INFO"), + ) diff --git a/services/shared/db.py b/services/shared/db.py new file mode 100644 index 0000000..bd17c34 --- /dev/null +++ b/services/shared/db.py @@ -0,0 +1,33 @@ +"""Database connection helpers.""" +import asyncpg +import redis.asyncio as aioredis +from minio import Minio + +from .config import AppConfig + + +async def get_pg_pool(config: AppConfig) -> asyncpg.Pool: + """Create a PostgreSQL connection pool.""" + return await asyncpg.create_pool( + dsn=config.postgres.dsn, + min_size=2, + max_size=10, + ) + + +def get_redis(config: AppConfig) -> aioredis.Redis: + """Create a Redis async client.""" + return aioredis.from_url( + config.redis.url, + decode_responses=True, + ) + + +def get_minio(config: AppConfig) -> Minio: + """Create a MinIO client.""" + return Minio( + config.minio.endpoint, + access_key=config.minio.access_key, + secret_key=config.minio.secret_key, + secure=config.minio.secure, + ) diff --git a/services/shared/redis_keys.py b/services/shared/redis_keys.py new file mode 100644 index 0000000..134bf89 --- /dev/null +++ b/services/shared/redis_keys.py @@ -0,0 +1,56 @@ +"""Redis key conventions and queue abstractions.""" + +# --- Key prefixes --- +PREFIX = "stonks" + +# Distributed locks +LOCK_PREFIX = f"{PREFIX}:lock" + +# Rate limit counters +RATE_LIMIT_PREFIX = f"{PREFIX}:ratelimit" + +# Job queues +QUEUE_PREFIX = f"{PREFIX}:queue" + +# Dedupe markers +DEDUPE_PREFIX = f"{PREFIX}:dedupe" + +# Cache +CACHE_PREFIX = f"{PREFIX}:cache" + +# Retry backoff state +RETRY_PREFIX = f"{PREFIX}:retry" + + +def lock_key(resource: str) -> str: + return f"{LOCK_PREFIX}:{resource}" + + +def rate_limit_key(source: str, window: str) -> str: + return f"{RATE_LIMIT_PREFIX}:{source}:{window}" + + +def queue_key(queue_name: str) -> str: + return f"{QUEUE_PREFIX}:{queue_name}" + + +def dedupe_key(content_hash: str) -> str: + return f"{DEDUPE_PREFIX}:{content_hash}" + + +def cache_key(namespace: str, key: str) -> str: + return f"{CACHE_PREFIX}:{namespace}:{key}" + + +def retry_key(job_id: str) -> str: + return f"{RETRY_PREFIX}:{job_id}" + + +# --- Queue names --- +QUEUE_INGESTION = "ingestion" +QUEUE_PARSING = "parsing" +QUEUE_EXTRACTION = "extraction" +QUEUE_AGGREGATION = "aggregation" +QUEUE_RECOMMENDATION = "recommendation" +QUEUE_LAKE_PUBLISH = "lake_publish" +QUEUE_TRADE = "trade" diff --git a/services/shared/schemas.py b/services/shared/schemas.py new file mode 100644 index 0000000..e2b4d9a --- /dev/null +++ b/services/shared/schemas.py @@ -0,0 +1,169 @@ +"""Typed JSON schemas for document intelligence, trend summaries, and recommendations.""" +from __future__ import annotations + +import uuid +from datetime import datetime +from enum import Enum +from typing import List, Optional + +from pydantic import BaseModel, Field + +# --- Enums --- + +class DocumentType(str, Enum): + ARTICLE = "article" + FILING = "filing" + TRANSCRIPT = "transcript" + PRESS_RELEASE = "press_release" + + +class SourceType(str, Enum): + MARKET_API = "market_api" + NEWS_API = "news_api" + FILINGS_API = "filings_api" + WEB_SCRAPE = "web_scrape" + BROKER = "broker" + + +class Sentiment(str, Enum): + POSITIVE = "positive" + NEGATIVE = "negative" + NEUTRAL = "neutral" + MIXED = "mixed" + + +class CatalystType(str, Enum): + EARNINGS = "earnings" + PRODUCT = "product" + LEGAL = "legal" + MACRO = "macro" + SUPPLY_CHAIN = "supply_chain" + M_AND_A = "m_and_a" + RATING_CHANGE = "rating_change" + OTHER = "other" + + +class TrendDirection(str, Enum): + BULLISH = "bullish" + BEARISH = "bearish" + MIXED = "mixed" + NEUTRAL = "neutral" + + +class ActionType(str, Enum): + BUY = "buy" + SELL = "sell" + HOLD = "hold" + WATCH = "watch" + + +class RecommendationMode(str, Enum): + INFORMATIONAL = "informational" + PAPER_ELIGIBLE = "paper_eligible" + LIVE_ELIGIBLE = "live_eligible" + + +class TrendWindow(str, Enum): + INTRADAY = "intraday" + ONE_DAY = "1d" + SEVEN_DAY = "7d" + THIRTY_DAY = "30d" + NINETY_DAY = "90d" + + +# --- Document Intelligence --- + +class CompanyImpact(BaseModel): + ticker: str + company_name: str + relevance: float = Field(ge=0, le=1) + sentiment: Sentiment + impact_score: float = Field(ge=0, le=1) + impact_horizon: str + catalyst_type: CatalystType + key_facts: List[str] = Field(default_factory=list) + risks: List[str] = Field(default_factory=list) + evidence_spans: List[str] = Field(default_factory=list) + + +class ModelMetadata(BaseModel): + provider: str = "ollama" + model_name: str = "" + prompt_version: str = "" + schema_version: str = "2.0.0" + + +class DocumentIntelligence(BaseModel): + document_id: str = Field(default_factory=lambda: str(uuid.uuid4())) + document_type: DocumentType = DocumentType.ARTICLE + summary: str = "" + companies: List[CompanyImpact] = Field(default_factory=list) + macro_themes: List[str] = Field(default_factory=list) + novelty_score: float = Field(ge=0, le=1, default=0.5) + source_credibility: float = Field(ge=0, le=1, default=0.5) + extraction_warnings: List[str] = Field(default_factory=list) + confidence: float = Field(ge=0, le=1, default=0.5) + model: ModelMetadata = Field(default_factory=ModelMetadata) + + +# --- Trend Summary --- + +class TrendSummary(BaseModel): + entity_type: str = "company" + entity_id: str = "" + window: TrendWindow = TrendWindow.SEVEN_DAY + trend_direction: TrendDirection = TrendDirection.NEUTRAL + trend_strength: float = Field(ge=0, le=1, default=0.5) + confidence: float = Field(ge=0, le=1, default=0.5) + top_supporting_evidence: List[str] = Field(default_factory=list) + top_opposing_evidence: List[str] = Field(default_factory=list) + dominant_catalysts: List[str] = Field(default_factory=list) + material_risks: List[str] = Field(default_factory=list) + contradiction_score: float = Field(ge=0, le=1, default=0.0) + generated_at: datetime = Field(default_factory=datetime.utcnow) + + +# --- Recommendation --- + +class PositionSizing(BaseModel): + portfolio_pct: float = Field(ge=0, le=1, default=0.02) + max_loss_pct: float = Field(ge=0, le=1, default=0.005) + + +class Recommendation(BaseModel): + recommendation_id: str = Field(default_factory=lambda: str(uuid.uuid4())) + ticker: str = "" + action: ActionType = ActionType.WATCH + mode: RecommendationMode = RecommendationMode.INFORMATIONAL + confidence: float = Field(ge=0, le=1, default=0.5) + time_horizon: str = "" + thesis: str = "" + invalidation_conditions: List[str] = Field(default_factory=list) + position_sizing: PositionSizing = Field(default_factory=PositionSizing) + evidence_refs: List[str] = Field(default_factory=list) + model_metadata: ModelMetadata = Field(default_factory=ModelMetadata) + generated_at: datetime = Field(default_factory=datetime.utcnow) + + +# --- Document Metadata --- + +class StorageRefs(BaseModel): + raw_html: Optional[str] = None + raw_payload: Optional[str] = None + normalized_text: Optional[str] = None + + +class DocumentMetadata(BaseModel): + document_id: str = Field(default_factory=lambda: str(uuid.uuid4())) + document_type: DocumentType = DocumentType.ARTICLE + symbol_candidates: List[str] = Field(default_factory=list) + source_type: SourceType = SourceType.NEWS_API + publisher: str = "" + url: Optional[str] = None + canonical_url: Optional[str] = None + title: str = "" + published_at: Optional[datetime] = None + retrieved_at: datetime = Field(default_factory=datetime.utcnow) + language: str = "en" + content_hash: str = "" + storage_refs: StorageRefs = Field(default_factory=StorageRefs) diff --git a/services/symbol_registry/__init__.py b/services/symbol_registry/__init__.py new file mode 100644 index 0000000..31f1805 --- /dev/null +++ b/services/symbol_registry/__init__.py @@ -0,0 +1 @@ +# Symbol Registry Service diff --git a/services/symbol_registry/app.py b/services/symbol_registry/app.py new file mode 100644 index 0000000..67bc5b6 --- /dev/null +++ b/services/symbol_registry/app.py @@ -0,0 +1,209 @@ +"""Symbol Registry API - FastAPI application.""" +from contextlib import asynccontextmanager +from typing import List, Optional + +import asyncpg +from fastapi import FastAPI, HTTPException +from pydantic import BaseModel + +from services.shared.config import load_config +from services.shared.db import get_pg_pool + +config = load_config() +pool: Optional[asyncpg.Pool] = None + + +@asynccontextmanager +async def lifespan(app: FastAPI): + global pool + pool = await get_pg_pool(config) + yield + await pool.close() + + +app = FastAPI(title="Stonks Oracle - Symbol Registry", lifespan=lifespan) + + +# --- Request/Response Models --- + +class CompanyCreate(BaseModel): + ticker: str + legal_name: str + exchange: Optional[str] = None + sector: Optional[str] = None + industry: Optional[str] = None + market_cap_bucket: Optional[str] = None + + +class CompanyResponse(BaseModel): + id: str + ticker: str + legal_name: str + exchange: Optional[str] + sector: Optional[str] + industry: Optional[str] + market_cap_bucket: Optional[str] + active: bool + + +class AliasCreate(BaseModel): + alias: str + alias_type: str = "brand" + + +class WatchlistCreate(BaseModel): + name: str + description: Optional[str] = None + + +class SourceCreate(BaseModel): + source_type: str + source_name: str + config: dict = {} + credibility_score: float = 0.5 + retention_days: int = 365 + access_policy: str = "internal" + + +VALID_SOURCE_TYPES = {"market_api", "news_api", "filings_api", "web_scrape", "broker"} + + +# --- Company Endpoints --- + +@app.post("/companies", response_model=CompanyResponse, status_code=201) +async def create_company(body: CompanyCreate): + try: + row = await pool.fetchrow( + """INSERT INTO companies (ticker, legal_name, exchange, sector, industry, market_cap_bucket) + VALUES ($1, $2, $3, $4, $5, $6) + RETURNING id, ticker, legal_name, exchange, sector, industry, market_cap_bucket, active""", + body.ticker.upper(), body.legal_name, body.exchange, body.sector, + body.industry, body.market_cap_bucket, + ) + except asyncpg.UniqueViolationError: + raise HTTPException(409, f"Company {body.ticker} on {body.exchange} already exists") + return dict(row) + + +@app.get("/companies", response_model=List[CompanyResponse]) +async def list_companies(active: bool = True): + rows = await pool.fetch( + "SELECT id, ticker, legal_name, exchange, sector, industry, market_cap_bucket, active FROM companies WHERE active = $1 ORDER BY ticker", + active, + ) + return [dict(r) for r in rows] + + +@app.get("/companies/{company_id}", response_model=CompanyResponse) +async def get_company(company_id: str): + row = await pool.fetchrow( + "SELECT id, ticker, legal_name, exchange, sector, industry, market_cap_bucket, active FROM companies WHERE id = $1", + company_id, + ) + if not row: + raise HTTPException(404, "Company not found") + return dict(row) + + +@app.put("/companies/{company_id}", response_model=CompanyResponse) +async def update_company(company_id: str, body: CompanyCreate): + row = await pool.fetchrow( + """UPDATE companies SET ticker=$2, legal_name=$3, exchange=$4, sector=$5, industry=$6, market_cap_bucket=$7, updated_at=NOW() + WHERE id=$1 + RETURNING id, ticker, legal_name, exchange, sector, industry, market_cap_bucket, active""", + company_id, body.ticker.upper(), body.legal_name, body.exchange, + body.sector, body.industry, body.market_cap_bucket, + ) + if not row: + raise HTTPException(404, "Company not found") + return dict(row) + + +# --- Alias Endpoints --- + +@app.post("/companies/{company_id}/aliases", status_code=201) +async def add_alias(company_id: str, body: AliasCreate): + row = await pool.fetchrow( + "INSERT INTO company_aliases (company_id, alias, alias_type) VALUES ($1, $2, $3) RETURNING id, alias, alias_type", + company_id, body.alias, body.alias_type, + ) + return dict(row) + + +@app.get("/companies/{company_id}/aliases") +async def list_aliases(company_id: str): + rows = await pool.fetch( + "SELECT id, alias, alias_type FROM company_aliases WHERE company_id = $1", + company_id, + ) + return [dict(r) for r in rows] + + +# --- Watchlist Endpoints --- + +@app.post("/watchlists", status_code=201) +async def create_watchlist(body: WatchlistCreate): + try: + row = await pool.fetchrow( + "INSERT INTO watchlists (name, description) VALUES ($1, $2) RETURNING id, name, description, active", + body.name, body.description, + ) + except asyncpg.UniqueViolationError: + raise HTTPException(409, f"Watchlist '{body.name}' already exists") + return dict(row) + + +@app.get("/watchlists") +async def list_watchlists(): + rows = await pool.fetch("SELECT id, name, description, active FROM watchlists ORDER BY name") + return [dict(r) for r in rows] + + +@app.post("/watchlists/{watchlist_id}/members/{company_id}", status_code=201) +async def add_watchlist_member(watchlist_id: str, company_id: str): + try: + await pool.execute( + "INSERT INTO watchlist_members (watchlist_id, company_id) VALUES ($1, $2)", + watchlist_id, company_id, + ) + except asyncpg.UniqueViolationError: + raise HTTPException(409, "Already a member") + except asyncpg.ForeignKeyViolationError: + raise HTTPException(404, "Watchlist or company not found") + return {"status": "added"} + + +@app.get("/watchlists/{watchlist_id}/members") +async def list_watchlist_members(watchlist_id: str): + rows = await pool.fetch( + """SELECT c.id, c.ticker, c.legal_name, c.exchange, c.sector, c.industry, c.market_cap_bucket, c.active + FROM companies c JOIN watchlist_members wm ON c.id = wm.company_id + WHERE wm.watchlist_id = $1 ORDER BY c.ticker""", + watchlist_id, + ) + return [dict(r) for r in rows] + + +# --- Source Endpoints --- + +@app.post("/companies/{company_id}/sources", status_code=201) +async def add_source(company_id: str, body: SourceCreate): + if body.source_type not in VALID_SOURCE_TYPES: + raise HTTPException(400, f"Invalid source_type. Must be one of: {VALID_SOURCE_TYPES}") + row = await pool.fetchrow( + """INSERT INTO sources (company_id, source_type, source_name, config, credibility_score, retention_days, access_policy) + VALUES ($1, $2, $3, $4, $5, $6, $7) + RETURNING id, source_type, source_name, credibility_score, active""", + company_id, body.source_type, body.source_name, + body.config, body.credibility_score, body.retention_days, body.access_policy, + ) + return dict(row) + + +@app.get("/companies/{company_id}/sources") +async def list_sources(company_id: str): + rows = await pool.fetch( + "SELECT id, source_type, source_name, config, credibility_score, retention_days, access_policy, active FROM sources WHERE company_id = $1", + company_id, + ) + return [dict(r) for r in rows] diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_config.py b/tests/test_config.py new file mode 100644 index 0000000..a43d823 --- /dev/null +++ b/tests/test_config.py @@ -0,0 +1,22 @@ +"""Basic tests for shared config loader.""" +from services.shared.config import load_config, AppConfig + + +def test_load_config_returns_app_config(): + config = load_config() + assert isinstance(config, AppConfig) + + +def test_postgres_dsn_format(): + config = load_config() + assert config.postgres.dsn.startswith("postgresql://") + + +def test_redis_url_format(): + config = load_config() + assert config.redis.url.startswith("redis://") + + +def test_default_broker_mode(): + config = load_config() + assert config.broker.mode == "paper" diff --git a/tests/test_redis_keys.py b/tests/test_redis_keys.py new file mode 100644 index 0000000..a0c928e --- /dev/null +++ b/tests/test_redis_keys.py @@ -0,0 +1,31 @@ +"""Basic tests for Redis key conventions.""" +from services.shared.redis_keys import ( + lock_key, + rate_limit_key, + queue_key, + dedupe_key, + cache_key, + QUEUE_INGESTION, + QUEUE_PARSING, +) + + +def test_lock_key_format(): + assert lock_key("scheduler") == "stonks:lock:scheduler" + + +def test_rate_limit_key_format(): + assert rate_limit_key("news_api", "202604111200") == "stonks:ratelimit:news_api:202604111200" + + +def test_queue_key_format(): + assert queue_key(QUEUE_INGESTION) == "stonks:queue:ingestion" + assert queue_key(QUEUE_PARSING) == "stonks:queue:parsing" + + +def test_dedupe_key_format(): + assert dedupe_key("abc123").startswith("stonks:dedupe:") + + +def test_cache_key_format(): + assert cache_key("companies", "AAPL") == "stonks:cache:companies:AAPL" diff --git a/tests/test_schemas.py b/tests/test_schemas.py new file mode 100644 index 0000000..52cb94c --- /dev/null +++ b/tests/test_schemas.py @@ -0,0 +1,50 @@ +"""Basic smoke tests for shared schemas.""" +from services.shared.schemas import ( + DocumentIntelligence, + TrendSummary, + Recommendation, + DocumentMetadata, + CompanyImpact, + Sentiment, + CatalystType, + ActionType, +) + + +def test_document_intelligence_defaults(): + di = DocumentIntelligence() + assert di.document_id + assert di.confidence == 0.5 + assert di.companies == [] + + +def test_company_impact_validation(): + ci = CompanyImpact( + ticker="AAPL", + company_name="Apple Inc.", + relevance=0.9, + sentiment=Sentiment.POSITIVE, + impact_score=0.7, + impact_horizon="1d_30d", + catalyst_type=CatalystType.EARNINGS, + ) + assert ci.ticker == "AAPL" + assert ci.sentiment == Sentiment.POSITIVE + + +def test_trend_summary_defaults(): + ts = TrendSummary() + assert ts.trend_strength == 0.5 + assert ts.contradiction_score == 0.0 + + +def test_recommendation_defaults(): + rec = Recommendation() + assert rec.action == ActionType.WATCH + assert rec.position_sizing.portfolio_pct == 0.02 + + +def test_document_metadata_defaults(): + dm = DocumentMetadata() + assert dm.document_id + assert dm.language == "en"