Salud_UT/backend/src/extraer_autorizacion_pdf.py
2026-01-13 19:02:37 -05:00

757 lines
23 KiB
Python

import json
import os
import re
import sys
import unicodedata
def normalize(text):
    """Return *text* upper-cased, without accents and with whitespace collapsed.

    Falsy input (``None``, ``""``) is treated as the empty string.
    """
    decomposed = unicodedata.normalize("NFD", text or "")
    # After NFD decomposition the accents are separate combining marks,
    # so filtering those out leaves the bare base letters.
    base_chars = [
        char for char in decomposed if not unicodedata.combining(char)
    ]
    single_spaced = re.sub(r"\s+", " ", "".join(base_chars)).strip()
    return single_spaced.upper()
def extract_digits_from_text(text, min_len=6, max_len=20):
    """Return the first number of ``min_len``..``max_len`` digits in *text*.

    A contiguous, word-bounded run of digits is preferred. Failing that,
    digits separated by whitespace or hyphens are glued together and the
    first group whose digit count is within range is returned.

    Returns ``None`` when *text* is empty or nothing qualifies.
    """
    if not text:
        return None
    contiguous = re.search(rf"\b\d{{{min_len},{max_len}}}\b", text)
    if contiguous is not None:
        return contiguous.group(0)
    # Second chance: numbers broken up by spaces or hyphens.
    for chunk in re.findall(rf"(?:\d[\s\-]*){{{min_len},{max_len}}}", text):
        only_digits = re.sub(r"\D", "", chunk)
        if min_len <= len(only_digits) <= max_len:
            return only_digits
    return None
def extract_date_from_text(text):
    """Return the first date-looking token in *text*, or ``None``.

    Two layouts are recognised, with ``/`` or ``-`` as separator: a 1-2 digit
    field first (e.g. ``13/01/2026``) or a 4 digit field first
    (e.g. ``2026-01-13``). The token is returned exactly as written.
    """
    if not text:
        return None
    found = re.search(
        r"\b(\d{1,2}[/-]\d{1,2}[/-]\d{2,4}|\d{4}[/-]\d{1,2}[/-]\d{1,2})\b",
        text,
    )
    return found.group(1) if found else None
def extract_time_from_text(text):
    """Return the first ``H:MM`` / ``HH:MM`` token in *text*, or ``None``."""
    if not text:
        return None
    found = re.search(r"\b(\d{1,2}:\d{2})\b", text)
    return found.group(1) if found else None
def extract_text_pdfplumber(path):
    """Extract the text layer of the PDF at *path* using pdfplumber.

    Returns the text of every page joined with newlines. Returns ``""`` when
    pdfplumber cannot be imported or fails to parse the document, so that a
    caller checking for empty text can try another extractor.

    Raises:
        OSError: file-system problems (missing file, permissions) are not
            swallowed, so an unreadable path is still reported.
    """
    try:
        import pdfplumber  # type: ignore
    except Exception:
        return ""
    try:
        with pdfplumber.open(path) as pdf:
            return "\n".join(page.extract_text() or "" for page in pdf.pages)
    except OSError:
        raise
    except Exception:
        # A parser failure on a malformed PDF must not abort the whole run:
        # report "no text" instead of propagating.
        return ""
def extract_text_fitz(path):
    """Extract the text layer of the PDF at *path* using PyMuPDF (``fitz``).

    Returns the text of every page joined with newlines, or ``""`` when the
    ``fitz`` module cannot be imported. Errors raised while opening or
    reading the document propagate to the caller.
    """
    try:
        import fitz  # type: ignore
    except Exception:
        return ""
    document = fitz.open(path)
    try:
        page_texts = [page.get_text() or "" for page in document]
    finally:
        # Always release the document handle, even if a page fails.
        document.close()
    return "\n".join(page_texts)
def configure_tesseract():
    """Point pytesseract at a Tesseract executable, if one can be located.

    The ``TESSERACT_PATH`` environment variable is checked first, followed by
    two default Windows install locations. The first candidate that exists
    as a file is stored in ``pytesseract.pytesseract.tesseract_cmd``.

    Returns the chosen path, or ``None`` when pytesseract cannot be imported
    or none of the candidates exists (pytesseract's own setting is then left
    untouched).
    """
    try:
        import pytesseract  # type: ignore
    except Exception:
        return None
    default_locations = [
        r"C:\Program Files\Tesseract-OCR\tesseract.exe",
        r"C:\Program Files (x86)\Tesseract-OCR\tesseract.exe",
    ]
    override = os.environ.get("TESSERACT_PATH")
    search_order = ([override] if override else []) + default_locations
    for candidate in search_order:
        if candidate and os.path.isfile(candidate):
            pytesseract.pytesseract.tesseract_cmd = candidate
            return candidate
    return None
def is_tesseract_available():
    """Return ``True`` when pytesseract is importable and Tesseract responds.

    ``configure_tesseract()`` is run first; availability is then decided by
    whether ``pytesseract.get_tesseract_version()`` succeeds.
    """
    try:
        import pytesseract  # type: ignore
    except Exception:
        return False
    configure_tesseract()
    try:
        pytesseract.get_tesseract_version()
    except Exception:
        return False
    return True
def extract_text_ocr(path, max_pages=2):
    """OCR the first *max_pages* pages of the PDF at *path*.

    Each page is rendered at 200 dpi and passed to Tesseract, first with the
    ``spa`` language and, if that call raises, with ``eng``.

    Returns:
        A ``(text, error_code)`` tuple. ``error_code`` is ``""`` on a clean
        run, or one of:

        * ``"ocr_modules_missing"`` - fitz, pytesseract or PIL not importable
          (text is ``""``).
        * ``"tesseract_not_found"`` - the Tesseract version query failed
          (text is ``""``).
        * ``"tesseract_lang_missing"`` - both ``spa`` and ``eng`` failed on a
          page (text is ``""``).
        * ``"tesseract_lang_missing_spa"`` - at least one page needed the
          ``eng`` fallback; the recognised text is still returned.
    """
    try:
        import fitz  # type: ignore
        import pytesseract  # type: ignore
        from PIL import Image  # type: ignore
    except Exception:
        return "", "ocr_modules_missing"
    configure_tesseract()
    try:
        pytesseract.get_tesseract_version()
    except Exception:
        return "", "tesseract_not_found"

    page_texts = []
    warning = ""
    document = fitz.open(path)
    try:
        for page_number in range(min(max_pages, document.page_count)):
            pixmap = document.load_page(page_number).get_pixmap(dpi=200)
            image = Image.frombytes(
                "RGB", (pixmap.width, pixmap.height), pixmap.samples
            )
            try:
                recognized = pytesseract.image_to_string(image, lang="spa") or ""
            except Exception:
                # Spanish failed; retry the same page in English.
                try:
                    recognized = (
                        pytesseract.image_to_string(image, lang="eng") or ""
                    )
                except Exception:
                    return "", "tesseract_lang_missing"
                if not warning:
                    warning = "tesseract_lang_missing_spa"
            page_texts.append(recognized)
    finally:
        document.close()
    return "\n".join(page_texts), warning
def extract_name(lines, norm_lines):
    """Find the patient name in the document.

    Args:
        lines: raw, stripped text lines.
        norm_lines: the same lines after ``normalize()``, index-aligned with
            *lines*.

    When a "DATOS DEL USUARIO" / "DATOS DEL PACIENTE" header exists, the first
    of the (up to) five following lines that is not a field-label line and
    has at least two words is returned verbatim. Without such a header, the
    text after ``PACIENTE:`` (up to the first hyphen) is used instead.

    Returns ``None`` when neither strategy yields a name.
    """
    # Headers are tried in priority order: every line is checked for the
    # first header before the second one is considered.
    header_row = next(
        (
            row
            for header in ("DATOS DEL USUARIO", "DATOS DEL PACIENTE")
            for row, norm in enumerate(norm_lines)
            if header in norm
        ),
        None,
    )

    if header_row is None:
        for raw in lines:
            if not normalize(raw).startswith("PACIENTE:"):
                continue
            _, colon, tail = raw.partition(":")
            if colon:
                name = tail.split("-")[0].strip()
                if name:
                    return name
        return None

    label_fragments = (
        "1ER APELLIDO",
        "2DO APELLIDO",
        "1ER NOMBRE",
        "2DO NOMBRE",
        "TIPO DOCUMENTO",
        "DOCUMENTO DE IDENTIFICACION",
        "REGISTRO CIVIL",
        "TARJETA",
        "CEDULA",
        "NUIP",
    )
    last_row = min(header_row + 6, len(lines))
    for row in range(header_row + 1, last_row):
        norm = norm_lines[row]
        is_label_line = any(fragment in norm for fragment in label_fragments)
        if not is_label_line and len(lines[row].split()) >= 2:
            return lines[row]
    return None
def extract_document(lines, norm_lines):
    """Find the patient's identification number.

    Args:
        lines: raw, stripped text lines.
        norm_lines: the same lines after ``normalize()``, index-aligned with
            *lines*.

    The first line mentioning a document-type/number label opens an
    eight-line window (label line included) that is searched for a number.
    If that fails, every line mentioning CEDULA, DOCUMENTO, NUIP or PASAPORTE
    is searched on its own.

    Returns the digits found by ``extract_digits_from_text``, or ``None``.
    """
    header_markers = (
        "TIPO DOCUMENTO",
        "TIPO DE DOCUMENTO",
        "NUMERO DOCUMENTO",
        "DOCUMENTO DE IDENTIFICACION",
    )
    loose_markers = ("CEDULA", "DOCUMENTO", "NUIP", "PASAPORTE")

    header_row = next(
        (
            row
            for row, norm in enumerate(norm_lines)
            if any(marker in norm for marker in header_markers)
        ),
        None,
    )
    if header_row is not None:
        for raw in lines[header_row:header_row + 8]:
            number = extract_digits_from_text(raw)
            if number:
                return number

    for row, norm in enumerate(norm_lines):
        if not any(marker in norm for marker in loose_markers):
            continue
        number = extract_digits_from_text(lines[row])
        if number:
            return number
    return None
def extract_cups_code(text):
    """Return the first code-like token in *text*, or ``None``.

    A code is a word of 4 to 10 letters/digits containing at least one digit
    (case-insensitive). If no such word exists, a run of 4 to 10 digits found
    by ``extract_digits_from_text`` is used instead.
    """
    if not text:
        return None
    token = re.search(
        r"\b(?=[A-Z0-9]{4,10}\b)[A-Z]*\d[A-Z0-9]*\b", text, re.IGNORECASE
    )
    if token is not None:
        return token.group(0)
    return extract_digits_from_text(text, min_len=4, max_len=10) or None
def extract_cups_list(lines, norm_lines):
    """Collect ``(code, description)`` pairs for the ordered procedures.

    Args:
        lines: raw, stripped text lines.
        norm_lines: the same lines after ``normalize()``, index-aligned with
            *lines*.

    Strategy 1: locate a table header line containing both "CUPS" and
    "CODIGO" and read up to 19 following rows, stopping at the first row
    that contains a stop word. A leading quantity-like number is removed
    from each description.

    Strategy 2 (only if strategy 1 found nothing): for each line mentioning
    CUPS / C.U.P.S / CUP, scan that line and the next seven; stop at the
    first such window that yields at least one code.

    In both strategies an empty description falls back to the text of the
    next line. Codes are unique; descriptions may be ``None``.
    """
    found = []
    known_codes = set()

    def remember(code, desc):
        # Keep only the first occurrence of each code.
        if not code or code in known_codes:
            return
        known_codes.add(code)
        found.append((code, desc or None))

    def describe(row, code, drop_quantity):
        raw = lines[row]
        desc = raw.split(code, 1)[-1].strip(" -:") if code in raw else ""
        if drop_quantity:
            # Table rows may carry a quantity column right after the code.
            desc = re.sub(r"^\d+[.,]\d{1,2}(?:\s+|$)", "", desc)
            desc = re.sub(r"^\d+(?:\s+|$)", "", desc)
        if not desc and row + 1 < len(lines):
            desc = lines[row + 1].strip()
        return desc

    stop_words = (
        "JUSTIFICACION",
        "IMPRESION",
        "DIAGNOSTICO",
        "INFORMACION",
        "NOMBRE",
        "RESPONSABLE",
        "SOLICITA",
        "SOLICITANTE",
        "FIRMA",
    )

    header_row = next(
        (
            row
            for row, norm in enumerate(norm_lines)
            if "CUPS" in norm and "CODIGO" in norm
        ),
        -1,
    )
    if header_row != -1:
        for row in range(header_row + 1, min(header_row + 20, len(lines))):
            if any(word in norm_lines[row] for word in stop_words):
                break
            code = extract_cups_code(lines[row])
            if code:
                remember(code, describe(row, code, True))

    if found:
        return found

    for start, norm in enumerate(norm_lines):
        mentions_cups = (
            "CUPS" in norm
            or re.search(
                r"C\s*[\.\-]?\s*U\s*[\.\-]?\s*P\s*[\.\-]?\s*S",
                norm,
                re.IGNORECASE,
            )
            or re.search(r"\bCUP\b", norm, re.IGNORECASE)
        )
        if not mentions_cups:
            continue
        for row in range(start, min(start + 8, len(lines))):
            code = extract_cups_code(lines[row])
            if code:
                remember(code, describe(row, code, False))
        if found:
            break
    return found
def extract_cups(lines, norm_lines):
    """Return the first ``(code, description)`` pair from ``extract_cups_list``.

    Returns ``(None, None)`` when no code was found.
    """
    entries = extract_cups_list(lines, norm_lines)
    return entries[0] if entries else (None, None)
def normalize_cie10_code(code):
    """Return *code* upper-cased and reduced to ``A-Z`` / ``0-9`` characters.

    Whitespace, dots and any other character are dropped, e.g.
    ``"j 18.9"`` becomes ``"J189"``. Returns ``None`` for empty input or when
    nothing is left after cleaning.
    """
    if not code:
        return None
    compact = re.sub(r"[\s\.]", "", code).upper()
    alnum_only = re.sub(r"[^A-Z0-9]", "", compact)
    return alnum_only if alnum_only else None
def clean_diagnosis_desc(desc):
    """Strip diagnosis field labels from *desc*, leaving the description.

    Removes (case-insensitively, anywhere in the string) the labels
    "impresion diagnostica", "diagnostico(s)" and "dx" with their optional
    qualifiers, plus "CIE-10" / "CIE 10" / "CIE10". The result is then
    trimmed of spaces, hyphens and colons.

    The input is raw document text, so the labels are matched with or
    without the Spanish acute accent ("Diagnóstico", "Impresión").

    Returns the cleaned description, or ``None`` if *desc* is empty or
    nothing remains.
    """
    if not desc:
        return None
    # Matches "o", precomposed "ó" (U+00F3) and "o" + combining acute
    # (U+0301); with (?i) the upper-case forms are covered as well.
    o_acute = r"(?:\u00f3|o\u0301?)"
    label_patterns = (
        rf"(?i)impresi{o_acute}n diagn{o_acute}stica",
        rf"(?i)diagn{o_acute}stico(s)?( principal| de egreso| egreso)?",
        r"(?i)dx( principal| de egreso| egreso| secundaria| secundario)?",
        r"(?i)cie[-\s]*10",
    )
    for pattern in label_patterns:
        desc = re.sub(pattern, "", desc)
    desc = desc.strip(" -:")
    return desc or None
def extract_cie10_codes_from_line(line):
    """Return every distinct diagnosis-code-like token found in *line*.

    A token is one letter followed by 2-4 digits, optionally a dot with 1-2
    more digits and one trailing letter/digit; inner whitespace is allowed.
    Each hit is cleaned with ``normalize_cie10_code``. Order of first
    appearance is preserved. Returns ``[]`` for empty input.
    """
    if not line:
        return []
    unique_codes = []
    for raw_code in re.findall(
        r"\b([A-Z]\s*\d{2,4}(?:\s*\.\s*\d{1,2})?[A-Z0-9]?)\b",
        line,
        re.IGNORECASE,
    ):
        normalized = normalize_cie10_code(raw_code)
        if normalized and normalized not in unique_codes:
            unique_codes.append(normalized)
    return unique_codes
def parse_cie10_from_line(line):
    """Split *line* into a diagnosis code and its title-cased description.

    A code written in parentheses is preferred; the description is then the
    text before it. Otherwise the first bare code-like token is used and the
    description is the text after it, or the text before it when nothing
    follows. Descriptions go through ``clean_diagnosis_desc``.

    Returns ``(code, description)``; either element may be ``None``, and both
    are ``None`` when the line holds no code.
    """
    if not line:
        return None, None

    def titled(description):
        return description.title() if description else None

    bracketed = re.search(
        r"\(([A-Z][0-9]{2,4}(?:\.[0-9]{1,2})?[A-Z0-9]?)\)",
        line,
        re.IGNORECASE,
    )
    if bracketed:
        code = normalize_cie10_code(bracketed.group(1))
        leading = line[: bracketed.start()].strip(" -:")
        return code, titled(clean_diagnosis_desc(leading))

    bare = re.search(
        r"\b([A-Z]\s*\d{2,4}(?:\s*\.\s*\d{1,2})?[A-Z0-9]?)\b",
        line,
        re.IGNORECASE,
    )
    if bare is None:
        return None, None
    code = normalize_cie10_code(bare.group(1))
    # Prefer what follows the code; fall back to what precedes it.
    surrounding = (
        line[bare.end():].strip(" -:") or line[: bare.start()].strip(" -:")
    )
    return code, titled(clean_diagnosis_desc(surrounding))
def extract_cups_hint(lines, norm_lines):
    """Return a free-text procedure name found under a procedures heading.

    Args:
        lines: raw, stripped text lines.
        norm_lines: the same lines after ``normalize()``, index-aligned with
            *lines*.

    The heading is located by trying the phrases below from most to least
    specific. Up to nine following lines are inspected; order-number lines
    and single-word lines are skipped. From the first remaining line, the
    text before any "(" is taken and the word AUTOMATIZADO removed.

    Returns ``None`` when there is no heading or no usable line.
    """
    headings = (
        "EXAMENES Y PROCEDIMIENTOS ORDENADOS",
        "EXAMENES Y PROCEDIMIENTOS",
        "PROCEDIMIENTOS ORDENADOS",
        "PROCEDIMIENTOS",
    )
    heading_row = next(
        (
            row
            for heading in headings
            for row, norm in enumerate(norm_lines)
            if heading in norm
        ),
        None,
    )
    if heading_row is None:
        return None

    for row in range(heading_row + 1, min(heading_row + 10, len(lines))):
        bullet_free = lines[row].strip(" -*")
        norm = norm_lines[row]
        if not bullet_free or "ORDEN NRO" in norm or "ORDEN NO" in norm:
            continue
        if len(bullet_free.split()) < 2:
            continue
        before_paren = bullet_free.split("(")[0].strip(" -")
        hint = re.sub(r"(?i)AUTOMATIZADO", "", before_paren).strip(" -")
        if hint:
            return hint
    return None
def extract_cie10_list(lines, norm_lines):
    """Collect ``(code, description)`` pairs for the diagnoses.

    Args:
        lines: raw, stripped text lines.
        norm_lines: the same lines after ``normalize()``, index-aligned with
            *lines*.

    Every line containing a diagnosis keyword opens a twelve-line window
    (keyword line included). In each window line, the code/description
    parsed by ``parse_cie10_from_line`` is recorded first, then any further
    codes on the line are recorded with a ``None`` description. Each code is
    reported once, in order of first appearance.
    """
    keywords = (
        "DIAGNOSTICO",
        "DIAGNOSTICOS",
        "IMPRESION DIAGNOSTICA",
        "CIE10",
        "CIE-10",
        "DX",
        "DX PRINCIPAL",
        "DX EGRESO",
        "DIAGNOSTICO PRINCIPAL",
        "DIAGNOSTICO DE EGRESO",
        "DIAGNOSTICO EGRESO",
    )
    diagnoses = []
    known_codes = set()

    def record(code, desc):
        if code and code not in known_codes:
            known_codes.add(code)
            diagnoses.append((code, desc))

    for start, norm in enumerate(norm_lines):
        if not any(keyword in norm for keyword in keywords):
            continue
        for raw in lines[start:start + 12]:
            main_code, main_desc = parse_cie10_from_line(raw)
            record(main_code, main_desc)
            for other_code in extract_cie10_codes_from_line(raw):
                record(other_code, None)
    return diagnoses
def extract_cie10(lines, norm_lines):
    """Return the first ``(code, description)`` pair from ``extract_cie10_list``.

    Returns ``(None, None)`` when no diagnosis code was found.
    """
    diagnoses = extract_cie10_list(lines, norm_lines)
    return diagnoses[0] if diagnoses else (None, None)
def extract_fecha_ingreso_urgencias(lines, norm_lines):
    """Find the admission date, using the first line that yields one.

    Args:
        lines: raw, stripped text lines.
        norm_lines: the same lines after ``normalize()``, index-aligned with
            *lines*.

    For each line containing an admission keyword:

    * if a date follows INGRESO/INGRESA within 20 non-digit characters, that
      date alone is returned;
    * otherwise a date is looked up on the line, then on the next line, and
      returned as ``"<date> <time>"`` when a time is found on either line,
      or just ``"<date>"``.

    Returns ``None`` when no keyword line yields a date.
    """
    admission_markers = (
        "INGRESO A URGENCIAS",
        "INGRESO URGENCIAS",
        "FECHA DE INGRESO",
        "FECHA INGRESO",
        "INGRESA",
        "INGRESO",
    )
    for row, norm in enumerate(norm_lines):
        if not any(marker in norm for marker in admission_markers):
            continue
        inline = re.search(
            r"INGRES[AO][^0-9]{0,20}(\d{1,2}[/-]\d{1,2}[/-]\d{2,4})",
            norm,
        )
        if inline:
            return inline.group(1)

        following = lines[row + 1] if row + 1 < len(lines) else None
        date = extract_date_from_text(lines[row])
        if not date and following is not None:
            date = extract_date_from_text(following)
        if not date:
            continue

        time = extract_time_from_text(lines[row])
        if not time and following is not None:
            time = extract_time_from_text(following)
        return f"{date} {time}" if time else date
    return None
def extract_fecha_egreso(lines, norm_lines):
    """Find the discharge date, using the last line that yields one.

    Args:
        lines: raw, stripped text lines.
        norm_lines: the same lines after ``normalize()``, index-aligned with
            *lines*.

    For each line containing a discharge keyword, the date is taken from
    right after EGRESO/EGRESA/ALTA (within 40 non-digit characters), else
    from anywhere on the line, else from the next line. A time found on the
    line or the next line is appended as ``"<date> <time>"``.

    Returns the value from the last qualifying line, or ``None``.
    """
    discharge_markers = (
        "FECHA DE EGRESO",
        "FECHA EGRESO",
        "EGRESO",
        "EGRESA",
        "ALTA MEDICA",
        "ALTA HOSPITALARIA",
        "FECHA DE ALTA",
        "FECHA ALTA",
    )
    latest = None
    for row, norm in enumerate(norm_lines):
        if not any(marker in norm for marker in discharge_markers):
            continue
        inline = re.search(
            r"(EGRESO|EGRESA|ALTA)[^0-9]{0,40}(\d{1,2}[/-]\d{1,2}[/-]\d{2,4})",
            norm,
        )
        date = inline.group(2) if inline else None
        has_next = row + 1 < len(lines)
        if not date:
            date = extract_date_from_text(lines[row])
        if not date and has_next:
            date = extract_date_from_text(lines[row + 1])
        if not date:
            continue
        time = extract_time_from_text(lines[row]) or (
            extract_time_from_text(lines[row + 1]) if has_next else None
        )
        # Later matches overwrite earlier ones: the last one wins.
        latest = f"{date} {time}" if time else date
    return latest
def clean_ips_name(value):
    """Trim a provider-name candidate down to the name itself.

    Everything from the first whole-word CC, NUMERO, NIT or CODIGO
    (case-insensitive) onwards is discarded, then surrounding spaces, hyphens
    and colons are stripped. Returns ``""`` for empty input.
    """
    if not value:
        return ""
    name = value.strip()
    for cut_marker in (r"\bCC\b", r"\bNUMERO\b", r"\bNIT\b", r"\bCODIGO\b"):
        name = re.split(cut_marker, name, maxsplit=1, flags=re.IGNORECASE)[0]
    return name.strip(" -:")
def extract_ips(lines, norm_lines):
    """Find the provider's name and NIT number.

    Args:
        lines: raw, stripped text lines.
        norm_lines: the same lines after ``normalize()``, index-aligned with
            *lines*.

    Under an "INFORMACION DEL PRESTADOR" heading, up to seven following lines
    are scanned: the NIT is the first 6-15 digit number on a line mentioning
    NIT; the name is either the value after ``NOMBRE...:`` or the first line
    carrying none of the NOMBRE/NIT/CODIGO labels. If no name was found, the
    first ten lines of the document are searched for a line mentioning
    HOSPITAL, CLINICA, ESE, IPS or CENTRO MEDICO. A name needs at least two
    words to be accepted.

    Returns ``(nombre, nit)``; either may be ``None``.
    """
    nombre = None
    nit = None

    heading_row = next(
        (
            row
            for row, norm in enumerate(norm_lines)
            if "INFORMACION DEL PRESTADOR" in norm
        ),
        -1,
    )
    if heading_row != -1:
        for row in range(heading_row + 1, min(heading_row + 8, len(lines))):
            norm = norm_lines[row]
            raw = lines[row]
            if not nit and "NIT" in norm:
                nit = extract_digits_from_text(raw, min_len=6, max_len=15)
            if not nombre:
                if "NOMBRE" in norm and ":" in raw and "NIT" not in norm:
                    # Labelled form: "Nombre: <value>".
                    labelled = clean_ips_name(raw.split(":", 1)[1].strip())
                    if len(labelled.split()) >= 2:
                        nombre = labelled
                elif (
                    "NOMBRE" not in norm
                    and "NIT" not in norm
                    and "CODIGO" not in norm
                ):
                    # Unlabelled line: take it as the name itself.
                    unlabelled = clean_ips_name(raw)
                    if len(unlabelled.split()) >= 2:
                        nombre = unlabelled
            if nombre and nit:
                break

    if not nombre:
        for raw in lines[:10]:
            if not re.search(
                r"\b(HOSPITAL|CLINICA|ESE|IPS|CENTRO MEDICO)\b", raw, re.IGNORECASE
            ):
                continue
            guess = clean_ips_name(raw)
            if len(guess.split()) >= 2:
                nombre = guess
                break
    return nombre or None, nit or None
def detect_format(norm_text, norm_lines):
    """Classify the document from marker phrases in its normalized text.

    Returns ``"ANEXO_TECNICO"`` when *norm_text* contains "ANEXO TECNICO" or
    "SOLICITUD DE AUTORIZACION"; otherwise ``"ANEXO_URGENCIAS"`` when any
    single line of *norm_lines* contains "ATENCION INICIAL DE URGENCIAS";
    otherwise ``"DESCONOCIDO"``.
    """
    for marker in ("ANEXO TECNICO", "SOLICITUD DE AUTORIZACION"):
        if marker in norm_text:
            return "ANEXO_TECNICO"
    if any("ATENCION INICIAL DE URGENCIAS" in norm for norm in norm_lines):
        return "ANEXO_URGENCIAS"
    return "DESCONOCIDO"
def merge_unique(base_list, extra_list):
    """Return a new list: *base_list* followed by the unseen items of *extra_list*.

    Items of *base_list* are copied as they are (duplicates included); an
    item of *extra_list* is appended only if it is not already in the result.
    ``None`` is accepted for either argument. The inputs are not modified.
    """
    merged = [*(base_list or [])]
    for candidate in extra_list or []:
        if candidate in merged:
            continue
        merged.append(candidate)
    return merged
def _merge_code_desc_pairs(base_codes, base_descs, extra_codes, extra_descs):
    """Merge two parallel (codes, descriptions) lists, keeping them aligned.

    Base entries come first. An extra code is appended with its own
    description only if the code is new; if the code already exists without
    a description, the extra description fills the gap. Inputs are not
    modified. Returns ``(codes, descriptions)`` of equal length (provided the
    base descriptions were not longer than the base codes).
    """
    codes = list(base_codes or [])
    descs = list(base_descs or [])
    # Give every base code a description slot so indexes line up.
    descs.extend([None] * (len(codes) - len(descs)))
    extra_descs = list(extra_descs or [])
    for pos, code in enumerate(extra_codes or []):
        desc = extra_descs[pos] if pos < len(extra_descs) else None
        if code in codes:
            slot = codes.index(code)
            if not descs[slot] and desc:
                descs[slot] = desc
            continue
        codes.append(code)
        descs.append(desc)
    return codes, descs


def merge_response(base, extra):
    """Combine two extraction results, letting *base* take precedence.

    Args:
        base: the preferred result dict.
        extra: a second result dict used to fill what *base* is missing.

    Rules:

    * Procedure lists (``cups_codigos`` / ``cups_descripciones``) are taken
      from *base* untouched when it has any code; otherwise they are merged.
    * Diagnosis lists (``cie10_codigos`` / ``cie10_descripciones``) are
      always merged.
    * Merging is done per (code, description) pair so the two lists stay
      index-aligned; merging them independently would drop repeated
      ``None``/equal descriptions and break the pairing.
    * Scalar fields are copied from *extra* only when empty in *base*;
      ``formato`` is replaced only when *base* says ``"DESCONOCIDO"``.
    * ``*_not_found`` warnings are removed once the matching data exists.

    Returns a new dict; *base* and *extra* are not modified.
    """
    result = dict(base)

    if base.get("cups_codigos"):
        result["cups_codigos"] = list(base.get("cups_codigos") or [])
        result["cups_descripciones"] = list(base.get("cups_descripciones") or [])
    else:
        cups_codes, cups_descs = _merge_code_desc_pairs(
            base.get("cups_codigos"),
            base.get("cups_descripciones"),
            extra.get("cups_codigos"),
            extra.get("cups_descripciones"),
        )
        result["cups_codigos"] = cups_codes
        result["cups_descripciones"] = cups_descs

    cie_codes, cie_descs = _merge_code_desc_pairs(
        base.get("cie10_codigos"),
        base.get("cie10_descripciones"),
        extra.get("cie10_codigos"),
        extra.get("cie10_descripciones"),
    )
    result["cie10_codigos"] = cie_codes
    result["cie10_descripciones"] = cie_descs

    # Code and description are copied together so they stay consistent.
    if not result.get("cup_codigo") and extra.get("cup_codigo"):
        result["cup_codigo"] = extra.get("cup_codigo")
        result["cup_descripcion"] = extra.get("cup_descripcion")
    if not result.get("cie10_codigo") and extra.get("cie10_codigo"):
        result["cie10_codigo"] = extra.get("cie10_codigo")
        result["cie10_descripcion"] = extra.get("cie10_descripcion")

    for field in (
        "ips_nombre",
        "ips_nit",
        "fecha_ingreso_urgencias",
        "fecha_egreso",
        "cups_busqueda",
    ):
        if not result.get(field) and extra.get(field):
            result[field] = extra.get(field)

    if result.get("formato") == "DESCONOCIDO" and extra.get("formato"):
        result["formato"] = extra.get("formato")

    # Last resort: derive the scalar fields from the merged lists.
    if not result.get("cup_codigo") and result.get("cups_codigos"):
        result["cup_codigo"] = result["cups_codigos"][0]
    if not result.get("cie10_codigo") and result.get("cie10_codigos"):
        result["cie10_codigo"] = ", ".join(result["cie10_codigos"])
    if not result.get("cie10_descripcion") and result.get("cie10_descripciones"):
        result["cie10_descripcion"] = ", ".join(
            [d for d in result["cie10_descripciones"] if d]
        )

    resolved = set()
    if result.get("cup_codigo") or result.get("cups_codigos"):
        resolved.add("cups_not_found")
    if result.get("cie10_codigo") or result.get("cie10_codigos"):
        resolved.add("cie10_not_found")
    if result.get("ips_nombre") or result.get("ips_nit"):
        resolved.add("ips_not_found")
    result["warnings"] = [
        warning
        for warning in (result.get("warnings") or [])
        if warning not in resolved
    ]
    return result
def build_response(text, ocr_used, ocr_available, ocr_error):
    """Run every field extractor over *text* and assemble the result dict.

    Args:
        text: the extracted document text (may be empty or ``None``).
        ocr_used: stored as ``ocr_usado``.
        ocr_available: stored as ``ocr_disponible``.
        ocr_error: stored as ``ocr_error`` (empty values become ``None``).

    The text is split on newlines into non-empty stripped lines, which are
    passed to the extractors together with their normalized counterparts.
    All diagnosis codes (and all non-empty descriptions) are also joined with
    ``", "`` into single strings. ``warnings`` lists what could not be found.
    """
    stripped_lines = (raw.strip() for raw in (text or "").split("\n"))
    lines = [line for line in stripped_lines if line]
    norm_lines = list(map(normalize, lines))
    norm_text = normalize(text)

    nombre = extract_name(lines, norm_lines)
    documento = extract_document(lines, norm_lines)

    cups_list = extract_cups_list(lines, norm_lines)
    if cups_list:
        cup_codigo, cup_desc = cups_list[0]
    else:
        cup_codigo, cup_desc = None, None
    cups_codigos = [code for code, _ in cups_list]
    cups_descripciones = [desc for _, desc in cups_list]
    cups_busqueda = extract_cups_hint(lines, norm_lines)

    cie_list = extract_cie10_list(lines, norm_lines)
    cie_codigos = [code for code, _ in cie_list]
    cie_descs = [desc for _, desc in cie_list]
    named_descs = [desc for desc in cie_descs if desc]
    cie_codigo = ", ".join(cie_codigos) if cie_codigos else None
    cie_desc = ", ".join(named_descs) if named_descs else None

    ips_nombre, ips_nit = extract_ips(lines, norm_lines)
    fecha_ingreso = extract_fecha_ingreso_urgencias(lines, norm_lines)
    fecha_egreso = extract_fecha_egreso(lines, norm_lines)
    formato = detect_format(norm_text, norm_lines)

    checks = (
        ("no_text_extracted", not text),
        ("cups_not_found", not cup_codigo and not cups_codigos),
        ("cie10_not_found", not cie_codigo and not cie_codigos),
        ("ips_not_found", not ips_nombre and not ips_nit),
    )
    warnings = [label for label, failed in checks if failed]

    response = {}
    response["ok"] = True
    response["text_length"] = len(norm_text)
    response["ocr_usado"] = ocr_used
    response["ocr_disponible"] = ocr_available
    response["ocr_error"] = ocr_error or None
    response["formato"] = formato
    response["nombre_paciente"] = nombre
    response["numero_documento"] = documento
    response["cup_codigo"] = cup_codigo
    response["cup_descripcion"] = cup_desc
    response["cups_codigos"] = cups_codigos
    response["cups_descripciones"] = cups_descripciones
    response["cups_busqueda"] = cups_busqueda
    response["cie10_codigo"] = cie_codigo
    response["cie10_descripcion"] = cie_desc
    response["cie10_codigos"] = cie_codigos
    response["cie10_descripciones"] = cie_descs
    response["ips_nombre"] = ips_nombre
    response["ips_nit"] = ips_nit
    response["fecha_ingreso_urgencias"] = fecha_ingreso
    response["fecha_egreso"] = fecha_egreso
    response["warnings"] = warnings
    return response
def main():
    """Command-line entry point: extract fields from a PDF and print JSON.

    Expects the PDF path as the first argument; without it, prints
    ``{"ok": false, "error": "missing_file"}`` and returns.

    Text is read with pdfplumber, then with fitz if that yields nothing.
    When Tesseract is available and the text result lacks a procedure code
    or a diagnosis code, the document is also OCR'd and the OCR result is
    merged in (text-based values take precedence). The final dict is printed
    as one ASCII-only JSON document on stdout.
    """
    args = sys.argv[1:]
    if not args:
        print(json.dumps({"ok": False, "error": "missing_file"}, ensure_ascii=True))
        return
    path = args[0]

    text = extract_text_pdfplumber(path) or extract_text_fitz(path)
    ocr_available = is_tesseract_available()
    response = build_response(text, False, ocr_available, None)

    ocr_used = False
    ocr_error = ""
    has_both_codes = response.get("cup_codigo") and response.get("cie10_codigo")
    if ocr_available and not has_both_codes:
        ocr_text, ocr_error = extract_text_ocr(path)
        if ocr_text:
            ocr_used = True
            from_ocr = build_response(ocr_text, True, ocr_available, ocr_error)
            response = merge_response(response, from_ocr)
        if ocr_error:
            response["ocr_error"] = ocr_error

    # The OCR flags describe the whole run, not just the text pass.
    response["ocr_usado"] = bool(ocr_used)
    response["ocr_disponible"] = bool(ocr_available)
    response.setdefault("ocr_error", ocr_error or None)
    print(json.dumps(response, ensure_ascii=True))
# Run the extractor only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()