You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

274 lines
10 KiB

import os
import sys
import cv2
import numpy as np
from classification import classify_projection, classify_stitched
from config import load_config
from detect_vertebrae import detect_vertebrae
from io_dicom import read_dicom, save_dicom_sr, save_secondary_capture
from model_loader import (
get_classification_model,
get_detection_model,
get_projection_model,
)
from report_builder import build_scoliosis_description
from scoliosis_arcs import (
build_conclusion,
cobb_for_arc,
detect_arcs_from_dx,
merge_arcs,
split_arcs,
scoliosis_type,
)
from scoliosis_geometry import (
arc_centerline,
fit_spine_axis_dx,
fit_spine_axis_dx_endpoints,
fit_spine_axis_dx_local,
smooth_1d,
)
from vis import draw_arc_contours
# Detector class id -> vertebra label. Ids are consecutive from 0, so the
# position of a label in the tuple below is its class id (0 -> "L1", 16 -> "T12").
CLASS_NAMES = dict(
    enumerate(
        (
            "L1", "L2", "L3", "L4", "L5",
            "T1", "T2", "T3", "T4", "T5",
            "T6", "T7", "T8", "T9", "T10",
            "T11", "T12",
        )
    )
)

# Configuration is loaded once, at import time, and shared by every function
# in this module.
CONFIG = load_config()

# Flags forwarded to detect_vertebrae() by run_scoliosis_pipeline().
POSTPROCESSING, INTERPOLATE_MISSING = (
    CONFIG.postprocessing,
    CONFIG.interpolate_missing,
)
def build_labelmap(class_names):
    """Build a dense list of labels indexed by class id.

    Args:
        class_names: Mapping from integer class id to label string.

    Returns:
        A list of length ``max(class_names) + 1`` in which position ``i``
        holds ``class_names[i]``; ids absent from the mapping are filled with
        ``"Unknown_<i>"``. An empty mapping yields an empty list.
    """
    # max() raises ValueError on an empty iterable; an empty mapping simply
    # has no labels, so return an empty label map instead of crashing.
    if not class_names:
        return []
    max_id = max(class_names)
    return [class_names.get(i, f"Unknown_{i}") for i in range(max_id + 1)]
def vertebrae_from_detect_result(detect_result):
    """Convert a detection result into a list of vertebra records.

    Args:
        detect_result: Mapping from label to a 6-item box
            ``(x_center, y_center, width, height, angle, conf)``.

    Returns:
        A new list of dicts, one per detection, ordered by ascending centre
        y coordinate. Each dict has the keys:
        ``"id"`` and ``"l"`` (both the label), ``"c"`` (float32 array with the
        centre ``[x, y]``), ``"b"`` (float32 corner points returned by
        ``cv2.boxPoints`` for the rotated box), ``"ang"`` (angle as float) and
        ``"conf"`` (confidence as float).
    """
    records = []
    for name, (cx, cy, box_w, box_h, rotation, score) in detect_result.items():
        # cv2.boxPoints expects ((cx, cy), (w, h), angle) built from plain floats.
        corners = cv2.boxPoints(
            ((float(cx), float(cy)), (float(box_w), float(box_h)), float(rotation))
        ).astype(np.float32)
        records.append(
            {
                "id": name,
                "l": name,
                "c": np.array([cx, cy], dtype=np.float32),
                "b": corners,
                "ang": float(rotation),
                "conf": float(score),
            }
        )
    # sorted() is stable, so detections with equal y keep their input order.
    return sorted(records, key=lambda rec: rec["c"][1])
def run_pipeline(img_bgr, vertebrae):
    """Run the curvature analysis on a list of detected vertebrae.

    Steps: build three smoothed deviation signals from the spine-axis helpers,
    detect and merge arcs, measure every arc with ``cobb_for_arc``, split the
    arcs into structural and minor ones, build the textual conclusion and
    derive the summary metrics.

    Args:
        img_bgr: Image array; only its first two shape entries are read here.
        vertebrae: Vertebra records passed to the geometry/arc helpers
            (presumably the output of ``vertebrae_from_detect_result`` --
            confirm against callers).

    Returns:
        Dict with keys ``img_bgr``, ``h``, ``w``, ``vertebrae``, ``arc_infos``,
        ``structural``, ``minor``, ``conclusion``, ``main_arc`` (arc with the
        largest ``cobb_deg`` or ``None``), ``degree_class`` (0-4),
        ``scoliosis_type`` and ``prob_scoliosis`` (``cobb_deg / 50`` capped
        at 1.0).

    Raises:
        ValueError: If ``img_bgr`` is ``None``.
    """
    if img_bgr is None:
        raise ValueError("img_bgr is None")
    height, width = img_bgr.shape[:2]

    # Three signals, each smoothed with the same settings: a global fit (used
    # for relative amplitudes), a local-window fit (used for arc detection and
    # Cobb measurement) and an endpoint fit (used as reference).
    seg_signal = smooth_1d(fit_spine_axis_dx(vertebrae), iters=3, alpha=0.25)
    cobb_signal = smooth_1d(
        fit_spine_axis_dx_local(vertebrae, window=7), iters=3, alpha=0.25
    )
    ref_signal = smooth_1d(
        fit_spine_axis_dx_endpoints(vertebrae), iters=3, alpha=0.25
    )

    arc_spans = merge_arcs(
        detect_arcs_from_dx(
            cobb_signal,
            eps_px=CONFIG.detect_eps_px,
            min_len=CONFIG.detect_min_len,
            zero_gap=CONFIG.detect_zero_gap,
            edge_zero_extend=CONFIG.detect_edge_zero_extend,
        ),
        ref_signal,
        max_gap=CONFIG.merge_max_gap,
    )

    # The small epsilon keeps the per-arc division below away from zero.
    global_amp = float(np.max(np.abs(seg_signal)) + 1e-6)

    arc_infos = []
    for start, end in arc_spans:
        arc = cobb_for_arc(vertebrae, start, end, cobb_signal, ref_signal)
        arc["amp_rel"] = float(
            np.max(np.abs(seg_signal[start:end + 1])) / global_amp
        )
        apex = arc["apex_idx"]
        # Distance from the apex to the nearer end of the arc.
        arc["apex_margin"] = int(min(apex - start, end - apex))
        arc["centerline_pts"] = arc_centerline(vertebrae, arc, smooth_window=5)
        arc_infos.append(arc)

    # Inputs with at most 7 vertebrae use the "small" structural thresholds.
    few_vertebrae = len(vertebrae) <= 7
    structural, minor = split_arcs(
        arc_infos,
        min_cobb_main=CONFIG.min_cobb_main,
        min_cobb_second_abs=CONFIG.min_cobb_second_abs,
        min_cobb_second_rel=CONFIG.min_cobb_second_rel,
        min_len_struct=(
            CONFIG.min_len_struct_small
            if few_vertebrae
            else CONFIG.min_len_struct_large
        ),
        min_apex_margin_struct=(
            CONFIG.min_apex_margin_small
            if few_vertebrae
            else CONFIG.min_apex_margin_large
        ),
    )
    conclusion = build_conclusion(arc_infos, structural=structural, minor=minor)

    main = max(arc_infos, key=lambda arc: arc["cobb_deg"]) if arc_infos else None

    degree_class = 0
    prob_scoliosis = 0.0
    # NOTE(review): the source this block was recovered from had lost its
    # indentation. The final `else` is bound to `if main:` here, which is the
    # reading in which every assignment is reachable and `scol_type` is always
    # defined -- confirm against the upstream file.
    if main:
        cd = main["cobb_deg"]
        # (lower bound in degrees, degree class), checked from the largest down.
        grade_table = ((50, 4), (26, 3), (11, 2), (1, 1))
        degree_class = next(
            (grade for bound, grade in grade_table if cd >= bound), 0
        )
        prob_scoliosis = min(1.0, cd / 50.0)
        scol_type = scoliosis_type(structural) if structural else "C-сколиоз"
    else:
        scol_type = "нет сколиоза"

    return {
        "img_bgr": img_bgr,
        "h": height,
        "w": width,
        "vertebrae": vertebrae,
        "arc_infos": arc_infos,
        "structural": structural,
        "minor": minor,
        "conclusion": conclusion,
        "main_arc": main,
        "degree_class": degree_class,
        "scoliosis_type": scol_type,
        "prob_scoliosis": prob_scoliosis,
    }
def _split_if_stitched(img_bgr, cls_model):
    """Split the image into left/right halves when it is classified as stitched.

    Args:
        img_bgr: Image array with at least two dimensions (rows, columns).
        cls_model: Model handed to ``classify_stitched``.

    Returns:
        ``(images, meta)`` where ``images`` is ``[left, right]`` (independent
        copies, split at ``width // 2``) when the prediction equals 1, and
        ``[img_bgr]`` (the original object, not a copy) for any other
        prediction. ``meta`` holds the prediction and its confidence under
        ``"classification"`` and ``"classification_conf"``.
    """
    label, score = classify_stitched(cls_model, img_bgr)
    meta = {"classification": label, "classification_conf": score}

    if label == 1:
        half = img_bgr.shape[1] // 2
        return [img_bgr[:, :half].copy(), img_bgr[:, half:].copy()], meta

    # Every prediction other than 1 is treated as a single (non-stitched) image.
    return [img_bgr], meta
def _select_frontal(images, proj_model):
    """Pick the frontal projection from a non-empty list of images.

    Images are classified in order with ``classify_projection``. The first one
    predicted as class 1 (frontal) is returned immediately, without classifying
    the rest. If none is frontal, the image whose prediction had the highest
    confidence is returned; on ties the earliest image wins.

    Args:
        images: Non-empty sequence of images.
        proj_model: Model handed to ``classify_projection``.

    Returns:
        ``(image, meta)`` where ``meta`` has the keys ``"projection"`` and
        ``"projection_conf"`` for the returned image.
    """
    fallback_img = images[0]
    fallback_pred = None
    fallback_conf = None

    for candidate in images:
        pred, conf = classify_projection(proj_model, candidate)
        if pred == 1:  # 1 = frontal
            return candidate, {"projection": pred, "projection_conf": conf}
        if fallback_conf is None or conf > fallback_conf:
            fallback_img, fallback_pred, fallback_conf = candidate, pred, conf

    return fallback_img, {
        "projection": fallback_pred,
        "projection_conf": fallback_conf,
    }
def run_scoliosis_pipeline(infer_dicom_path, model=None, model_path=None, out_dir=None):
    """Run the full pipeline on one DICOM file and write the result DICOMs.

    Reads the DICOM, splits it if it is classified as stitched, selects the
    frontal projection, detects vertebrae, runs ``run_pipeline`` and saves a
    secondary-capture DICOM (contour overlay) and a structured-report DICOM.

    Args:
        infer_dicom_path: Path of the input DICOM file.
        model: Detection model to use; when falsy, one is loaded with
            ``get_detection_model``.
        model_path: Detection model path; defaults to
            ``CONFIG.models.detection``.
        out_dir: Base results directory; defaults to ``CONFIG.results_dir``.

    Returns:
        On success, a dict with ``ok=True`` and the keys ``sr_dicom_path``,
        ``sc_dicom_path``, ``conclusion``, ``arc_infos`` and ``out_dir``.
        When ``detect_vertebrae`` returns nothing, ``{"ok": False,
        "reason": "no_detections"}``. The per-file output directory is created
        before detection, so it exists in both cases.
    """
    model_path = model_path or CONFIG.models.detection
    results_root = out_dir or CONFIG.results_dir

    # Outputs go to "<results_root>/<input file stem>_result".
    stem, _ext = os.path.splitext(os.path.basename(infer_dicom_path))
    run_dir = os.path.join(results_root, f"{stem}_result")
    os.makedirs(run_dir, exist_ok=True)

    det_model = model or get_detection_model(model_path=model_path)
    cls_model = get_classification_model()
    proj_model = get_projection_model()

    full_image = read_dicom(infer_dicom_path)
    candidates, cls_meta = _split_if_stitched(full_image, cls_model)
    img_bgr, proj_meta = _select_frontal(candidates, proj_model)
    status_line = (
        f"classification pred={cls_meta['classification']} conf={cls_meta['classification_conf']:.3f}; "
        f"projection pred={proj_meta['projection']} conf={proj_meta['projection_conf']:.3f}"
    )
    print(status_line)

    yolo_results = det_model(img_bgr, verbose=False)
    detect_result = detect_vertebrae(
        img_bgr,
        yolo_results,
        build_labelmap(CLASS_NAMES),
        enable_postprocessing=POSTPROCESSING,
        interpolate_missing=INTERPOLATE_MISSING,
    )
    if not detect_result:
        return {"ok": False, "reason": "no_detections"}

    vertebrae = vertebrae_from_detect_result(detect_result)
    result = run_pipeline(img_bgr, vertebrae)

    # Summary values shared by the image overlay and the structured report.
    prob = result.get("prob_scoliosis", 0.0)
    degree = result.get("degree_class", 0)
    kind = result.get("scoliosis_type", "")

    # Secondary capture: the image with the structural arcs drawn on it.
    sc_out_path = os.path.join(run_dir, f"sc_{stem}.dcm")
    save_secondary_capture(
        infer_dicom_path,
        draw_arc_contours(img_bgr, vertebrae, result["structural"]),
        sc_out_path,
        series_description="RG-SCOLIOSIS-CONTOUR",
        overlay_text=f"prob={prob:.2f} deg={degree} type={kind}",
    )

    # Structured report: description of the arcs plus the conclusion.
    sr_desc = build_scoliosis_description(result["arc_infos"])
    sr_out_path = os.path.join(run_dir, f"sr_{stem}.dcm")
    main_arc = result.get("main_arc")
    save_dicom_sr(
        infer_dicom_path,
        sr_desc,
        result["conclusion"],
        sr_out_path,
        series_description="RG-SCOLIOSIS-SR",
        extra_struct={
            "degree_class": degree,
            "scoliosis_type": kind,
            "main_cobb": main_arc["cobb_deg"] if main_arc else 0.0,
            "prob_scoliosis": prob,
        },
    )

    return {
        "ok": True,
        "sr_dicom_path": sr_out_path,
        "sc_dicom_path": sc_out_path,
        "conclusion": result["conclusion"],
        "arc_infos": result["arc_infos"],
        "out_dir": run_dir,
    }
if __name__ == "__main__":
    # Script entry point: process the DICOM named by the INFER_IMG_PATH
    # environment variable and exit with status 1 if the pipeline reports
    # failure.
    MODEL_PATH = CONFIG.models.detection
    # NOTE(review): the fallback below is an absolute path on a developer
    # machine; it only works there. Consider requiring INFER_IMG_PATH.
    INFER_IMG_PATH = os.environ.get(
        "INFER_IMG_PATH",
        r"C:\Users\Роман Владимирович\Desktop\XR_SCOLIOS\XR_SCOLIOS\1.2.643.5.1.13.13.12.2.77.8252.00090213030609010011090905030205\1.2.643.5.1.13.13.12.2.77.8252.09051310090608150606061508100113\1.2.643.5.1.13.13.12.2.77.8252.03061304011101150513090811030403.dcm",
    )
    OUT_DIR = CONFIG.results_dir

    # Because of the fallback above, this only triggers when INFER_IMG_PATH is
    # set to an empty string; the script then exits with status 0.
    if not INFER_IMG_PATH:
        raise SystemExit(0)

    # Load the detector once here and pass it in, so the pipeline does not
    # load it again.
    model = get_detection_model(model_path=MODEL_PATH)
    out = run_scoliosis_pipeline(INFER_IMG_PATH, model=model, out_dir=OUT_DIR)
    if not out.get("ok"):
        raise SystemExit(1)