From 48bf4f7e7e1d5929a32d6335f6ff47ada16f822f Mon Sep 17 00:00:00 2001 From: Celes Renata Date: Sun, 12 Apr 2026 03:24:10 -0700 Subject: [PATCH] phase 17: extractor fetches normalized text from MinIO when not in job payload --- services/extractor/main.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/services/extractor/main.py b/services/extractor/main.py index a7e25c7..1ebe0b0 100644 --- a/services/extractor/main.py +++ b/services/extractor/main.py @@ -61,6 +61,25 @@ async def main() -> None: ticker = job.get("ticker", "") text = job.get("text", "") or job.get("normalized_text", "") + # If no text in job, try to fetch from MinIO via the document's normalized_storage_ref + if not text: + ref_row = await pool.fetchrow( + "SELECT normalized_storage_ref FROM documents WHERE id = $1::uuid", + document_id, + ) + if ref_row and ref_row["normalized_storage_ref"]: + try: + ref = ref_row["normalized_storage_ref"] + # ref format: s3://bucket/path + parts = ref.replace("s3://", "").split("/", 1) + if len(parts) == 2: + obj = minio_client.get_object(parts[0], parts[1]) + text = obj.read().decode("utf-8") + obj.close() + obj.release_conn() + except Exception as e: + logger.warning("Could not fetch normalized text for doc %s: %s", document_id, e) + logger.info("Processing extraction job for doc %s / %s", document_id, ticker) # Refresh company map every 100 jobs