diff --git a/scripts/check_input.py b/scripts/check_input.py new file mode 100644 index 0000000..1f2344b --- /dev/null +++ b/scripts/check_input.py @@ -0,0 +1,45 @@ +from minio import Minio +import os, json + +mc = Minio(os.environ["MINIO_ENDPOINT"], access_key=os.environ["MINIO_ACCESS_KEY"], secret_key=os.environ["MINIO_SECRET_KEY"], secure=False) + +# Check the most recent extraction - what text did the model get? +# Look at the normalized text for a known doc +import asyncio, asyncpg + +async def main(): + pool = await asyncpg.create_pool( + host=os.environ["POSTGRES_HOST"], + port=int(os.environ["POSTGRES_PORT"]), + database=os.environ["POSTGRES_DB"], + user=os.environ["POSTGRES_USER"], + password=os.environ["POSTGRES_PASSWORD"], + ) + + # Get a recently extracted doc + row = await pool.fetchrow( + "SELECT id, title, normalized_storage_ref, parse_quality_score " + "FROM documents WHERE source_type = 'news_api' AND parse_quality_score > 0.8 " + "ORDER BY updated_at DESC LIMIT 1" + ) + + if row: + print(f"Doc: {row['id']}") + print(f"Title: {row['title']}") + print(f"Quality: {row['parse_quality_score']}") + print(f"Ref: {row['normalized_storage_ref']}") + + ref = row["normalized_storage_ref"] + parts = ref.replace("s3://", "").split("/", 1) + if len(parts) == 2: + obj = mc.get_object(parts[0], parts[1]) + text = obj.read().decode("utf-8") + obj.close() + obj.release_conn() + print(f"Text length: {len(text)} chars") + print(f"First 500 chars:") + print(text[:500]) + + await pool.close() + +asyncio.run(main()) diff --git a/scripts/reenqueue_test.py b/scripts/reenqueue_test.py new file mode 100644 index 0000000..5aa4ddc --- /dev/null +++ b/scripts/reenqueue_test.py @@ -0,0 +1,29 @@ +import asyncio, asyncpg, json, os, redis + +async def main(): + pool = await asyncpg.create_pool( + host=os.environ["POSTGRES_HOST"], + port=int(os.environ["POSTGRES_PORT"]), + database=os.environ["POSTGRES_DB"], + user=os.environ["POSTGRES_USER"], + password=os.environ["POSTGRES_PASSWORD"], + ) + r = redis.from_url(f"redis://:{os.environ.get('REDIS_PASSWORD', '')}@{os.environ['REDIS_HOST']}:{os.environ['REDIS_PORT']}/0") + + rows = await pool.fetch( + "SELECT d.id, dcm.ticker FROM documents d " + "LEFT JOIN document_company_mentions dcm ON d.id = dcm.document_id " + "WHERE d.source_type = 'news_api' AND d.parse_quality_score > 0.7 " + "ORDER BY d.parse_quality_score DESC LIMIT 5" + ) + + for row in rows: + r.rpush("stonks:queue:extraction", json.dumps({ + "document_id": str(row["id"]), + "ticker": row["ticker"] or "", + })) + + print(f"Enqueued {len(rows)} high-quality docs for re-extraction") + await pool.close() + +asyncio.run(main()) diff --git a/services/extractor/prompts.py b/services/extractor/prompts.py index 086dac0..9bb0c40 100644 --- a/services/extractor/prompts.py +++ b/services/extractor/prompts.py @@ -8,7 +8,6 @@ Requirements: 5.1, 5.2, 5.3, 5.4, 5.5 """ from __future__ import annotations -import json from typing import Any from services.extractor.schemas import SCHEMA_VERSION, generate_json_schema @@ -110,8 +109,6 @@ def build_extraction_prompt( "Use your judgment — but only include companies where the connection is clear from the text." ) - schema_str = json.dumps(EXTRACTION_JSON_SCHEMA, indent=2) - doc_id_line = f"Document ID: {document_id}\n" if document_id else "" user_prompt = f"""\ @@ -120,8 +117,7 @@ Extract structured intelligence from the following document. {doc_id_line}Document type: {document_type} {doctype_guidance} {ticker_hint} -Your output MUST be a single JSON object conforming to this schema: -{schema_str} +Return a JSON object with: summary, companies (array with ticker, company_name, relevance, sentiment, impact_score, impact_horizon, catalyst_type, key_facts, risks, evidence_spans), macro_themes, novelty_score, confidence, extraction_warnings. REMEMBER: - Only extract what is explicitly in the text below.