phase 17: extractor fetches normalized text from MinIO when not in job payload
This commit is contained in:
@@ -61,6 +61,25 @@ async def main() -> None:
|
|||||||
ticker = job.get("ticker", "")
|
ticker = job.get("ticker", "")
|
||||||
text = job.get("text", "") or job.get("normalized_text", "")
|
text = job.get("text", "") or job.get("normalized_text", "")
|
||||||
|
|
||||||
|
# If no text in job, try to fetch from MinIO via the document's normalized_storage_ref
|
||||||
|
if not text:
|
||||||
|
ref_row = await pool.fetchrow(
|
||||||
|
"SELECT normalized_storage_ref FROM documents WHERE id = $1::uuid",
|
||||||
|
document_id,
|
||||||
|
)
|
||||||
|
if ref_row and ref_row["normalized_storage_ref"]:
|
||||||
|
try:
|
||||||
|
ref = ref_row["normalized_storage_ref"]
|
||||||
|
# ref format: s3://bucket/path
|
||||||
|
parts = ref.replace("s3://", "").split("/", 1)
|
||||||
|
if len(parts) == 2:
|
||||||
|
obj = minio_client.get_object(parts[0], parts[1])
|
||||||
|
text = obj.read().decode("utf-8")
|
||||||
|
obj.close()
|
||||||
|
obj.release_conn()
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning("Could not fetch normalized text for doc %s: %s", document_id, e)
|
||||||
|
|
||||||
logger.info("Processing extraction job for doc %s / %s", document_id, ticker)
|
logger.info("Processing extraction job for doc %s / %s", document_id, ticker)
|
||||||
|
|
||||||
# Refresh company map every 100 jobs
|
# Refresh company map every 100 jobs
|
||||||
|
|||||||
Reference in New Issue
Block a user