diff --git a/services/extractor/main.py b/services/extractor/main.py
index a7e25c7..1ebe0b0 100644
--- a/services/extractor/main.py
+++ b/services/extractor/main.py
@@ -61,6 +61,36 @@ async def main() -> None:
             ticker = job.get("ticker", "")
             text = job.get("text", "") or job.get("normalized_text", "")
 
+            # Fallback: the job carried no text, so load it from object storage using
+            # the document's normalized_storage_ref. Failures are logged, not raised.
+            # NOTE(review): get_object()/read() are not awaited, so they presumably block
+            # the event loop during the download -- confirm the client is synchronous and
+            # consider offloading the fetch to a worker thread.
+            if not text:
+                ref_row = await pool.fetchrow(
+                    "SELECT normalized_storage_ref FROM documents WHERE id = $1::uuid",
+                    document_id,
+                )
+                if ref_row and ref_row["normalized_storage_ref"]:
+                    try:
+                        ref = ref_row["normalized_storage_ref"]
+                        # ref format: s3://bucket/path. Strip only the leading scheme so an
+                        # "s3://" substring inside the object path is left untouched.
+                        location = ref[len("s3://"):] if ref.startswith("s3://") else ref
+                        parts = location.split("/", 1)
+                        if len(parts) == 2:
+                            obj = minio_client.get_object(parts[0], parts[1])
+                            try:
+                                text = obj.read().decode("utf-8")
+                            finally:
+                                # Release the connection even when read/decode raises.
+                                obj.close()
+                                obj.release_conn()
+                        else:
+                            logger.warning("Unrecognized normalized_storage_ref %r for doc %s", ref, document_id)
+                    except Exception as e:
+                        logger.warning("Could not fetch normalized text for doc %s: %s", document_id, e)
+
             logger.info("Processing extraction job for doc %s / %s", document_id, ticker)
 
             # Refresh company map every 100 jobs