commit 83dfbcc2385ab7307b991a944b83214de2f9e678 Author: Roman Date: Fri Jan 23 20:35:07 2026 +0500 Initial scoliosis pipeline (without models)\ diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..734889d --- /dev/null +++ b/.gitignore @@ -0,0 +1,21 @@ +����� �뢮�� ������ �� ��࠭ (ECHO) ����祭. +# Python +__pycache__/ +*.pyc + +results/ +outputs/ +tmp/ + +# OS +.DS_Store +Thumbs.db + +# IDE +.vscode/ +.idea/ + +# Models +models/*.pt +models/*.onnx +models/*.engine diff --git a/classification.py b/classification.py new file mode 100644 index 0000000..43ea5d8 --- /dev/null +++ b/classification.py @@ -0,0 +1,139 @@ +import os + +import cv2 +import torch +import torch.nn.functional as F +from PIL import Image, ImageOps +from torchvision import transforms +from torchvision.transforms import InterpolationMode + +STITCH_IMG_SIZE = 512 +STITCH_THRESHOLD = float(os.environ.get("STITCH_THRESHOLD", 0.8)) + +TRIM_THR = 30 +TRIM_BLACK_FRAC = 0.97 +TRIM_MAX_PX = 600 + +def pad_to_square(img: Image.Image, fill: int = 0) -> Image.Image: + """Дополненяет изображения полями до квадратного формата, чтобы не искажать при масштабировании.""" + w, h = img.size + if w == h: + return img + side = max(w, h) + pl = (side - w) // 2 + pr = side - w - pl + pt = (side - h) // 2 + pb = side - h - pt + return ImageOps.expand(img, border=(pl, pt, pr, pb), fill=fill) + +def trim_black_frame(img: Image.Image) -> Image.Image: + """Обрезает чёрную рамку по краям; если обрезка слишком агрессивная — возвращает оригинал.""" + g = img.convert("L") + w, h = g.size + px = g.load() + step_y = max(1, h // 180) + step_x = max(1, w // 180) + + def col_black_frac(x: int) -> float: + total = 0 + black = 0 + for y in range(0, h, step_y): + total += 1 + if px[x, y] <= TRIM_THR: + black += 1 + return black / max(1, total) + + def row_black_frac(y: int) -> float: + total = 0 + black = 0 + for x in range(0, w, step_x): + total += 1 + if px[x, y] <= TRIM_THR: + black += 1 + return black / max(1, total) + + left = 0 + for x in range(w): + if col_black_frac(x) < TRIM_BLACK_FRAC: + break + left += 1 + if left >= TRIM_MAX_PX: + break + + right = 0 + for x in range(w - 1, -1, -1): + if col_black_frac(x) < TRIM_BLACK_FRAC: + break + right += 1 + if right >= TRIM_MAX_PX: + break + + top = 0 + for y in range(h): + if row_black_frac(y) < TRIM_BLACK_FRAC: + break + top += 1 + if top >= TRIM_MAX_PX: + break + + bottom = 0 + for y in range(h - 1, -1, -1): + if row_black_frac(y) < TRIM_BLACK_FRAC: + break + bottom += 1 + if bottom >= TRIM_MAX_PX: + break + + x1 = min(left, w - 2) + y1 = min(top, h - 2) + x2 = max(x1 + 2, w - right) + y2 = max(y1 + 2, h - bottom) + + if x2 <= x1 + 1 or y2 <= y1 + 1: + return img + if (x2 - x1) < w * 0.35 or (y2 - y1) < h * 0.35: + return img + + return img.crop((x1, y1, x2, y2)) + +STITCH_TFM = transforms.Compose([ + transforms.Lambda(lambda arr: Image.fromarray(cv2.cvtColor(arr, cv2.COLOR_BGR2RGB))), + transforms.Lambda(trim_black_frame), + transforms.Lambda(pad_to_square), + transforms.Resize((STITCH_IMG_SIZE, STITCH_IMG_SIZE), interpolation=InterpolationMode.BILINEAR), + transforms.ToTensor(), + transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225)), +]) + +PROJ_TFM = transforms.Compose([ + transforms.Lambda(lambda arr: Image.fromarray(cv2.cvtColor(arr, cv2.COLOR_BGR2RGB))), + transforms.Resize((224, 224), interpolation=InterpolationMode.BILINEAR), + transforms.ToTensor(), + transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225)), +]) + +def classify_stitched(model, img_bgr): + """Модель распознает склеенное/одиночное изображение. Возвращает класс 0(склеенный)/1(одиночный) и уверенность""" + device = next(model.parameters()).device + x = STITCH_TFM(img_bgr).unsqueeze(0).to(device) + with torch.no_grad(): + logits = model(x) + if logits.shape[1] == 1 or (logits.ndim == 2 and logits.shape[-1] == 1): + prob = torch.sigmoid(logits)[0, 0].item() + pred = 1 if prob >= STITCH_THRESHOLD else 0 + conf = prob if pred == 1 else (1 - prob) + return pred, float(conf) + probs = F.softmax(logits, dim=1) + conf, pred = probs.max(dim=1) + return int(pred.item()), float(conf.item()) + +def classify_projection(model, img_bgr): + """Определяет проекцию 0(боковая)/1(фронтальная), возвращает класс и уверенность.""" + device = next(model.parameters()).device + x = PROJ_TFM(img_bgr).unsqueeze(0).to(device) + with torch.no_grad(): + logits = model(x) + probs = F.softmax(logits, dim=1) + conf, pred = probs.max(dim=1) + return int(pred.item()), float(conf.item()) + diff --git a/config.py b/config.py new file mode 100644 index 0000000..ee67382 --- /dev/null +++ b/config.py @@ -0,0 +1,78 @@ +import os +from dataclasses import dataclass + + +def _as_bool(value: str, default: bool) -> bool: + if value is None: + return default + return value.strip() not in {"0", "false", "False", ""} + + +def _env_path(key: str, default_path: str) -> str: + return os.path.abspath(os.environ.get(key, default_path)) + + +@dataclass(frozen=True) +class ModelPaths: + classification: str + projection: str + detection: str + + +@dataclass(frozen=True) +class AppConfig: + models: ModelPaths + postprocessing: bool + interpolate_missing: bool + results_dir: str + detect_eps_px: float + detect_min_len: int + detect_zero_gap: int + detect_edge_zero_extend: int + merge_max_gap: int + min_cobb_main: float + min_cobb_second_abs: float + min_cobb_second_rel: float + min_len_struct_small: int + min_len_struct_large: int + min_apex_margin_small: int + min_apex_margin_large: int + + +def load_config() -> AppConfig: + base_dir = os.path.dirname(__file__) + default_results = os.path.join(base_dir, "results") + + models = ModelPaths( + classification=_env_path( + "MODEL_CLASSIFICATION", + os.path.join(base_dir, "models", "classification.pt"), + ), + projection=_env_path( + "MODEL_PROJECTION", + os.path.join(base_dir, "models", "projection.pt"), + ), + detection=_env_path( + "MODEL_DETECTION", + os.path.join(base_dir, "models", "1_best_yolo8(L).pt"), + ), + ) + + return AppConfig( + models=models, + postprocessing=_as_bool(os.environ.get("POSTPROCESSING"), True), + interpolate_missing=_as_bool(os.environ.get("INTERPOLATE_MISSING"), False), + results_dir=_env_path("RESULTS_DIR", default_results), + detect_eps_px=float(os.environ.get("DETECT_EPS_PX", 1.0)), + detect_min_len=int(os.environ.get("DETECT_MIN_LEN", 2)), + detect_zero_gap=int(os.environ.get("DETECT_ZERO_GAP", 2)), + detect_edge_zero_extend=int(os.environ.get("DETECT_EDGE_ZERO_EXTEND", 3)), + merge_max_gap=int(os.environ.get("MERGE_MAX_GAP", 1)), + min_cobb_main=float(os.environ.get("MIN_COBB_MAIN", 1.0)), + min_cobb_second_abs=float(os.environ.get("MIN_COBB_SECOND_ABS", 5.0)), + min_cobb_second_rel=float(os.environ.get("MIN_COBB_SECOND_REL", 0.35)), + min_len_struct_small=int(os.environ.get("MIN_LEN_STRUCT_SMALL", 3)), + min_len_struct_large=int(os.environ.get("MIN_LEN_STRUCT_LARGE", 4)), + min_apex_margin_small=int(os.environ.get("MIN_APEX_MARGIN_SMALL", 1)), + min_apex_margin_large=int(os.environ.get("MIN_APEX_MARGIN_LARGE", 2)), + ) diff --git a/detect_vertebrae.py b/detect_vertebrae.py new file mode 100644 index 0000000..6e72abb --- /dev/null +++ b/detect_vertebrae.py @@ -0,0 +1,374 @@ +import os + +import cv2 +import numpy as np + + +def detect_vertebrae( + image, + model_results, + labelmap, + debug_save_path=None, + enable_postprocessing=True, + interpolate_missing=False, + spine_sequence=None, +): + """ + Detect vertebrae with optional postprocessing. + + Returns: + dict[label] = [x_center, y_center, width, height, angle, confidence] + """ + default_spine_sequence = [ + "T1", "T2", "T3", "T4", "T5", "T6", "T7", "T8", "T9", "T10", "T11", "T12", + "L1", "L2", "L3", "L4", "L5", + ] + + if spine_sequence is None: + spine_sequence = default_spine_sequence + + label_to_index = {label: idx for idx, label in enumerate(spine_sequence)} + + best_boxes = {} + + colors = [ + (0, 0, 255), # Red + (0, 255, 0), # Green + (255, 0, 0), # Blue + (0, 255, 255), # Yellow + (255, 0, 255), # Magenta + (255, 255, 0), # Cyan + (0, 165, 255), # Orange + (255, 20, 147), # Pink + (147, 20, 255), # Violet + (0, 215, 255), # Gold + (255, 215, 0), # Turquoise + (255, 105, 180), # Hot pink + (0, 255, 127), # Spring green + (255, 69, 0), # Orange red + (72, 61, 139), # Dark slate blue + (47, 255, 173), # Aquamarine + (255, 140, 0), # Dark orange + ] + + if model_results: + result = model_results[0] + if hasattr(result, "obb") and result.obb is not None and len(result.obb) > 0: + obb_data = result.obb + for i in range(len(obb_data)): + try: + conf = float(obb_data.conf[i].cpu().numpy()) + cls_id = int(obb_data.cls[i].cpu().numpy()) + + label = labelmap[cls_id] if cls_id < len(labelmap) else f"Unknown_{cls_id}" + + if hasattr(obb_data, "xyxyxyxy"): + box_points = obb_data.xyxyxyxy[i].cpu().numpy() + elif hasattr(obb_data, "xyxyxyxyn"): + box_points = obb_data.xyxyxyxyn[i].cpu().numpy() + h, w = image.shape[:2] + box_points = box_points.reshape(4, 2) + box_points[:, 0] *= w + box_points[:, 1] *= h + box_points = box_points.flatten() + else: + box_points = result.obb.xyxyxyxy[i].cpu().numpy() + + points = box_points.reshape(4, 2).astype(np.float32) + rect = cv2.minAreaRect(points) + center, size, angle = rect + + x_center, y_center = center + width, height = size + + if width < height: + angle += 90 + width, height = height, width + + result_box = np.array([x_center, y_center, width, height, angle, conf], dtype=np.float32) + + if label not in best_boxes or conf > best_boxes[label][-1]: + best_boxes[label] = result_box + except Exception as exc: + print(f"Box parse error: {exc}") + continue + + if enable_postprocessing and best_boxes: + boxes_list = [(label, box) for label, box in best_boxes.items()] + + boxes_list.sort(key=lambda x: x[1][-1], reverse=True) + kept_boxes = [] + for label, box in boxes_list: + x_center, y_center, width, height, _, _ = box + is_duplicate = False + + for kept_label, kept_box in kept_boxes: + xk, yk, wk, hk, _, _ = kept_box + dist = np.sqrt((x_center - xk) ** 2 + (y_center - yk) ** 2) + threshold = 0.3 * min(height, hk) + + if dist < threshold: + is_duplicate = True + print(f"Duplicate removed: {label} overlaps with {kept_label}") + break + + if not is_duplicate: + kept_boxes.append((label, box)) + + kept_boxes.sort(key=lambda x: x[1][1]) + + if kept_boxes: + valid_boxes = [] + last_index = -1 + last_y = -1 + y_gaps = [] + + for label, box in kept_boxes: + x_center, y_center, width, height, angle, conf = box + + if label not in label_to_index: + print(f"Unknown vertebra skipped: {label}") + continue + + current_index = label_to_index[label] + + if last_index == -1: + valid_boxes.append((label, box)) + last_index = current_index + last_y = y_center + continue + + expected_index = last_index + 1 + + if current_index == expected_index: + valid_boxes.append((label, box)) + y_gaps.append(y_center - last_y) + last_index = current_index + last_y = y_center + continue + + if current_index > expected_index: + valid_boxes.append((label, box)) + last_index = current_index + last_y = y_center + continue + + avg_gap = np.median(y_gaps) if y_gaps else 50.0 + expected_y = last_y + avg_gap + + y_diff = abs(y_center - expected_y) + if y_diff < avg_gap * 0.6 and expected_index < len(spine_sequence): + new_label = spine_sequence[expected_index] + new_box = box.copy() + new_box[5] = -1.0 + + valid_boxes.append((new_label, new_box)) + y_gaps.append(y_center - last_y) + print(f"Order corrected: {label} -> {new_label} (conf=-1)") + + last_index = expected_index + last_y = y_center + continue + + print(f"Out-of-order skipped: {label} after {valid_boxes[-1][0]}") + + if interpolate_missing and len(valid_boxes) > 1: + valid_boxes.sort(key=lambda x: x[1][1]) + + gaps = [] + for i in range(1, len(valid_boxes)): + prev_y = valid_boxes[i - 1][1][1] + curr_y = valid_boxes[i][1][1] + gaps.append(curr_y - prev_y) + + avg_gap = np.median(gaps) if gaps else 0 + max_allowed_gap = avg_gap * 1.8 if avg_gap > 0 else float("inf") + + reliable_angles = [] + reliable_ratios = [] + for _, box in valid_boxes: + _, _, width, height, angle, conf = box + if conf > 0: + norm_angle = angle % 180 + if norm_angle > 90: + norm_angle -= 180 + reliable_angles.append(norm_angle) + + aspect_ratio = width / max(height, 1) + reliable_ratios.append(aspect_ratio) + + median_angle = np.median(reliable_angles) if reliable_angles else 0.0 + median_ratio = np.median(reliable_ratios) if reliable_ratios else 2.0 + + new_boxes = [] + for i in range(len(valid_boxes)): + current_label, current_box = valid_boxes[i] + current_idx = label_to_index[current_label] + new_boxes.append((current_label, current_box)) + + if i < len(valid_boxes) - 1: + next_label, next_box = valid_boxes[i + 1] + next_idx = label_to_index[next_label] + + y_gap = next_box[1] - current_box[1] + index_gap = next_idx - current_idx + + if index_gap > 1 and y_gap > max_allowed_gap * 0.7: + num_missing = index_gap - 1 + print(f"Missing between {current_label} and {next_label}: {num_missing}") + + x1, y1, w1, h1, ang1, _ = current_box + x2, y2, w2, h2, ang2, _ = next_box + + for k in range(1, num_missing + 1): + missing_idx = current_idx + k + if missing_idx >= len(spine_sequence): + continue + + missing_label = spine_sequence[missing_idx] + fraction = k / index_gap + + x_center = (x1 + x2) / 2 + y_center = y1 + fraction * (y2 - y1) + + avg_width = (w1 + w2) / 2 + avg_height = (h1 + h2) / 2 + + if median_ratio > 1: + width = avg_width + height = width / median_ratio + else: + height = avg_height + width = height * median_ratio + + norm_ang1 = ang1 % 180 + norm_ang2 = ang2 % 180 + + if abs(norm_ang1 - norm_ang2) > 90: + if norm_ang1 > norm_ang2: + norm_ang1 -= 180 + else: + norm_ang2 -= 180 + + angle = norm_ang1 + fraction * (norm_ang2 - norm_ang1) + angle = (angle + 90) % 180 - 90 + + if reliable_angles: + angle = 0.7 * angle + 0.3 * median_angle + + if height > width: + width, height = height, width + angle = (angle + 90) % 180 + + conf = -0.5 + + interpolated_box = np.array( + [x_center, y_center, width, height, angle, conf], + dtype=np.float32, + ) + + print( + "Interpolated: " + f"{missing_label} x={x_center:.1f} y={y_center:.1f} " + f"w={width:.1f} h={height:.1f} angle={angle:.2f}" + ) + new_boxes.append((missing_label, interpolated_box)) + + new_boxes.sort(key=lambda x: x[1][1]) + valid_boxes = new_boxes + + best_boxes = {label: box for label, box in valid_boxes} + + if debug_save_path: + if len(image.shape) == 2: + vis_image = cv2.cvtColor(image, cv2.COLOR_GRAY2BGR) + elif image.shape[2] == 3: + vis_image = image.copy() + else: + vis_image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR) + + for idx, (label, box) in enumerate(best_boxes.items()): + try: + x_center, y_center, width, height, angle, conf = box + + color = colors[idx % len(colors)] + + is_corrected = conf < 0 + thickness = 4 if is_corrected else 3 + alpha = 0.35 if is_corrected else 0.2 + + rect = ((x_center, y_center), (width, height), angle) + box_points = cv2.boxPoints(rect) + box_points = np.int64(box_points) + + contour_color = (0, 0, 255) if is_corrected else color + cv2.drawContours(vis_image, [box_points], 0, contour_color, thickness) + + overlay = vis_image.copy() + fill_color = (0, 0, 255) if is_corrected else color + cv2.fillPoly(overlay, [box_points], fill_color) + cv2.addWeighted(overlay, alpha, vis_image, 1 - alpha, 0, vis_image) + + if is_corrected: + text = f"{label} CV({conf:.1f})" + else: + text = f"{label} ({conf:.3f})" + + font = cv2.FONT_HERSHEY_SIMPLEX + font_scale = 0.6 + thickness = 2 + + (text_width, text_height), _ = cv2.getTextSize(text, font, font_scale, thickness) + text_x = int(x_center - text_width / 2) + text_y = int(y_center + text_height / 2) + + padding = 5 + bg_rect = [ + (text_x - padding, text_y - text_height - padding), + (text_x + text_width + padding, text_y + padding), + ] + + text_overlay = vis_image.copy() + bg_color = (0, 0, 100) if is_corrected else (0, 0, 0) + cv2.rectangle(text_overlay, bg_rect[0], bg_rect[1], bg_color, -1) + + text_alpha = 0.7 + cv2.addWeighted(text_overlay, text_alpha, vis_image, 1 - text_alpha, 0, vis_image) + + text_color = (255, 255, 255) if not is_corrected else (255, 200, 200) + cv2.putText( + vis_image, + text, + (text_x, text_y), + font, + font_scale, + text_color, + thickness, + ) + + center_color = (0, 0, 255) if is_corrected else (255, 255, 255) + cv2.circle(vis_image, (int(x_center), int(y_center)), 5, center_color, -1) + cv2.circle(vis_image, (int(x_center), int(y_center)), 2, (0, 0, 0), -1) + except Exception as exc: + print(f"Draw error for {label}: {exc}") + continue + + dir_path = os.path.dirname(debug_save_path) + if dir_path: + os.makedirs(dir_path, exist_ok=True) + + try: + if len(vis_image.shape) == 2: + vis_image = cv2.cvtColor(vis_image, cv2.COLOR_GRAY2BGR) + + cv2.imwrite(debug_save_path, vis_image, [cv2.IMWRITE_JPEG_QUALITY, 95]) + print(f"Debug saved: {debug_save_path}") + print(f"Detections: {len(best_boxes)}") + + corrected_count = sum(1 for box in best_boxes.values() if box[5] < 0) + if corrected_count > 0: + print(f"Corrected order: {corrected_count}") + except Exception as exc: + print(f"Debug save error: {exc}") + + return {label: box.astype(float).tolist() for label, box in best_boxes.items()} diff --git a/io_dicom.py b/io_dicom.py new file mode 100644 index 0000000..58aa3ea --- /dev/null +++ b/io_dicom.py @@ -0,0 +1,191 @@ +import os +from typing import Optional + +import cv2 +import numpy as np +import pydicom + +def read_dicom(path: str) -> np.ndarray: + ds = pydicom.dcmread(path) + img = ds.pixel_array + + if getattr(ds, "PhotometricInterpretation", "") == "MONOCHROME1": + img = np.max(img) - img + + img = img.astype(np.float32) + img -= img.min() + if img.max() > 0: + img /= img.max() + img = (img * 255.0).clip(0, 255).astype(np.uint8) + + if img.ndim == 2: + img = cv2.cvtColor(img, cv2.COLOR_GRAY2BGR) + elif img.shape[2] == 3: + img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR) + + return img + + +def save_secondary_capture( + original_dicom_path: str, + img_bgr: np.ndarray, + out_path: str, + series_description: str = "RG-SCOLIOSIS-CONTOUR", + overlay_text: Optional[str] = None, +): + import pydicom + from pydicom.uid import generate_uid + + ds = pydicom.dcmread(original_dicom_path) + + img_draw = img_bgr.copy() + if overlay_text: + cv2.putText( + img_draw, + overlay_text, + (20, 40), + cv2.FONT_HERSHEY_SIMPLEX, + 1.0, + (0, 255, 0), + 2, + ) + + img_rgb = cv2.cvtColor(img_draw, cv2.COLOR_BGR2RGB) + rows, cols = img_rgb.shape[:2] + + ds.SOPClassUID = "1.2.840.10008.5.1.4.1.1.7" + ds.SOPInstanceUID = generate_uid() + ds.SeriesInstanceUID = generate_uid() + ds.Modality = "OT" + ds.SeriesDescription = series_description + + if "SeriesNumber" in ds: + try: + ds.SeriesNumber = int(ds.SeriesNumber) + 1000 + except Exception: + ds.SeriesNumber = 1000 + else: + ds.SeriesNumber = 1000 + + ds.Rows = rows + ds.Columns = cols + ds.SamplesPerPixel = 3 + ds.PhotometricInterpretation = "RGB" + ds.PlanarConfiguration = 0 + ds.BitsAllocated = 8 + ds.BitsStored = 8 + ds.HighBit = 7 + ds.PixelRepresentation = 0 + ds.PixelData = img_rgb.tobytes() + + os.makedirs(os.path.dirname(out_path), exist_ok=True) + ds.save_as(out_path, write_like_original=False) + + +def save_dicom_sr( + original_dicom_path: str, + description_text: str, + conclusion_text: str, + out_path: str, + series_description: str = "SCOLIOSIS_SR", + extra_struct: Optional[dict] = None, +): + import datetime as _dt + import pydicom + from pydicom.dataset import Dataset, FileDataset + from pydicom.uid import ExplicitVRLittleEndian, generate_uid + + src = pydicom.dcmread(original_dicom_path) + + file_meta = Dataset() + file_meta.MediaStorageSOPClassUID = "1.2.840.10008.5.1.4.1.1.88.11" + file_meta.MediaStorageSOPInstanceUID = generate_uid() + file_meta.TransferSyntaxUID = ExplicitVRLittleEndian + file_meta.ImplementationClassUID = generate_uid() + + ds = FileDataset(out_path, {}, file_meta=file_meta, preamble=b"\0" * 128) + + for tag in ( + "PatientName", + "PatientID", + "PatientBirthDate", + "PatientSex", + "StudyInstanceUID", + "StudyID", + "AccessionNumber", + "StudyDate", + "StudyTime", + ): + if tag in src: + ds[tag] = src[tag] + + ds.SOPClassUID = file_meta.MediaStorageSOPClassUID + ds.SOPInstanceUID = file_meta.MediaStorageSOPInstanceUID + ds.SeriesInstanceUID = generate_uid() + ds.Modality = "SR" + ds.SeriesDescription = series_description + ds.SeriesNumber = 2000 + ds.InstanceNumber = 1 + + if "SpecificCharacterSet" in src: + ds.SpecificCharacterSet = src.SpecificCharacterSet + else: + ds.SpecificCharacterSet = "ISO_IR 192" + + now = _dt.datetime.now() + ds.ContentDate = now.strftime("%Y%m%d") + ds.ContentTime = now.strftime("%H%M%S") + + root = Dataset() + root.ValueType = "CONTAINER" + root.ContinuityOfContent = "SEPARATE" + root.ConceptNameCodeSequence = [Dataset()] + root.ConceptNameCodeSequence[0].CodeValue = "11528-7" + root.ConceptNameCodeSequence[0].CodingSchemeDesignator = "LN" + root.ConceptNameCodeSequence[0].CodeMeaning = "Imaging Report" + + desc_item = Dataset() + desc_item.ValueType = "TEXT" + desc_item.ConceptNameCodeSequence = [Dataset()] + desc_item.ConceptNameCodeSequence[0].CodeValue = "121070" + desc_item.ConceptNameCodeSequence[0].CodingSchemeDesignator = "DCM" + desc_item.ConceptNameCodeSequence[0].CodeMeaning = "Findings" + desc_item.TextValue = str(description_text) + + concl_item = Dataset() + concl_item.ValueType = "TEXT" + concl_item.ConceptNameCodeSequence = [Dataset()] + concl_item.ConceptNameCodeSequence[0].CodeValue = "121106" + concl_item.ConceptNameCodeSequence[0].CodingSchemeDesignator = "DCM" + concl_item.ConceptNameCodeSequence[0].CodeMeaning = "Summary" + concl_item.TextValue = str(conclusion_text) + + content_items = [desc_item, concl_item] + + if extra_struct: + def _text_item(code, meaning, value, scheme="DCM"): + item = Dataset() + item.ValueType = "TEXT" + item.ConceptNameCodeSequence = [Dataset()] + item.ConceptNameCodeSequence[0].CodeValue = code + item.ConceptNameCodeSequence[0].CodingSchemeDesignator = scheme + item.ConceptNameCodeSequence[0].CodeMeaning = meaning + item.TextValue = str(value) + return item + + if "degree_class" in extra_struct: + content_items.append(_text_item("SCL-DEGREE", "Scoliosis degree class (1-4, 0=none)", extra_struct["degree_class"])) + if "scoliosis_type" in extra_struct: + content_items.append(_text_item("SCL-TYPE", "Scoliosis type", extra_struct["scoliosis_type"])) + if "main_cobb" in extra_struct: + content_items.append(_text_item("SCL-MAIN-COBB", "Main Cobb angle, deg", f"{extra_struct['main_cobb']:.1f}")) + if "prob_scoliosis" in extra_struct: + content_items.append(_text_item("SCL-PROB", "Probability of scoliosis (0-1)", f"{extra_struct['prob_scoliosis']:.2f}")) + + root.ContentSequence = content_items + ds.ContentSequence = [root] + ds.CompletionFlag = "COMPLETE" + ds.VerificationFlag = "UNVERIFIED" + + os.makedirs(os.path.dirname(out_path), exist_ok=True) + ds.save_as(out_path, write_like_original=False) diff --git a/model_loader.py b/model_loader.py new file mode 100644 index 0000000..ad13dd6 --- /dev/null +++ b/model_loader.py @@ -0,0 +1,88 @@ +import os +import threading +from typing import Optional + +import torch +import torch.nn as nn +from torchvision import models +from ultralytics import YOLO + +from config import load_config + +_LOCK = threading.Lock() +_CACHE = {} +_CFG = load_config() + +def _cache_key(kind: str, path: str, device: Optional[str]) -> str: + return f"{kind}|{os.path.abspath(path)}|{device or 'auto'}" + +def _load_shufflenet(path: str, device: Optional[str]): + """Загружает shufflenet‑классификатор из state_dict""" + sd = torch.load(path, map_location=device or "cpu") + num_classes = sd["fc.weight"].shape[0] + candidates = [ + models.shufflenet_v2_x0_5(weights=None), + models.shufflenet_v2_x1_0(weights=None), + ] + model = None + for m in candidates: + m.fc = nn.Linear(m.fc.in_features, num_classes) + try: + m.load_state_dict(sd, strict=True) + model = m + break + except Exception: + continue + if model is None: + raise RuntimeError(f"Cannot load state_dict from {path} into supported shufflenet variants") + model.eval() + if device: + model.to(device) + return model + +def _load_mobilenet_v3_large(path: str, device: Optional[str]): + """Загружает mobilenet_v3_large из чекпойнта, настраивает число классов, переносит на device, ставит eval()""" + ckpt = torch.load(path, map_location=device or "cpu") + sd = ckpt.get("model_state", ckpt) + num_classes = sd["classifier.3.weight"].shape[0] + model = models.mobilenet_v3_large(weights=None) + model.classifier[3] = nn.Linear(model.classifier[3].in_features, num_classes) + model.load_state_dict(sd, strict=True) + model.eval() + if device: + model.to(device) + return model + +def get_detection_model(device: Optional[str] = None, model_path: Optional[str] = None): + """Получает YOLO‑детектор: берёт путь (из аргумента или конфига), достаёт из кеша или загружает и кэширует""" + path = model_path or _CFG.models.detection + key = _cache_key("detection", path, device) + with _LOCK: + if key not in _CACHE: + model = YOLO(path) + if device: + try: + model.to(device) + except Exception: + if hasattr(model, "model"): + model.model.to(device) + _CACHE[key] = model + return _CACHE[key] + +def get_classification_model(device: Optional[str] = None, model_path: Optional[str] = None): + """Получает классификатор stitched/single (mobilenet): кеш + загрузка при первом вызове""" + path = model_path or _CFG.models.classification + key = _cache_key("classification", path, device) + with _LOCK: + if key not in _CACHE: + _CACHE[key] = _load_mobilenet_v3_large(path, device) + return _CACHE[key] + +def get_projection_model(device: Optional[str] = None, model_path: Optional[str] = None): + """Получает классификатор проекции (shufflenet): кеш + загрузка при первом вызове""" + path = model_path or _CFG.models.projection + key = _cache_key("projection", path, device) + with _LOCK: + if key not in _CACHE: + _CACHE[key] = _load_shufflenet(path, device) + return _CACHE[key] diff --git a/pipeline.py b/pipeline.py new file mode 100644 index 0000000..df6f2ba --- /dev/null +++ b/pipeline.py @@ -0,0 +1,274 @@ +import os +import sys + +import cv2 +import numpy as np + +from classification import classify_projection, classify_stitched +from config import load_config +from detect_vertebrae import detect_vertebrae +from io_dicom import read_dicom, save_dicom_sr, save_secondary_capture +from model_loader import ( + get_classification_model, + get_detection_model, + get_projection_model, +) +from report_builder import build_scoliosis_description +from scoliosis_arcs import ( + build_conclusion, + cobb_for_arc, + detect_arcs_from_dx, + merge_arcs, + split_arcs, + scoliosis_type, +) +from scoliosis_geometry import ( + arc_centerline, + fit_spine_axis_dx, + fit_spine_axis_dx_endpoints, + fit_spine_axis_dx_local, + smooth_1d, +) +from vis import draw_arc_contours + +CLASS_NAMES = { + 0: "L1", 1: "L2", 2: "L3", 3: "L4", 4: "L5", + 5: "T1", 6: "T2", 7: "T3", 8: "T4", 9: "T5", + 10: "T6", 11: "T7", 12: "T8", 13: "T9", 14: "T10", + 15: "T11", 16: "T12", +} + +CONFIG = load_config() +POSTPROCESSING = CONFIG.postprocessing +INTERPOLATE_MISSING = CONFIG.interpolate_missing + +def build_labelmap(class_names): + """Делает список меток по индексам (0..max), + чтобы по cls_id получить имя позвонка.""" + max_id = max(class_names) + return [class_names.get(i, f"Unknown_{i}") for i in range(max_id + 1)] + +def vertebrae_from_detect_result(detect_result): + """Преобразует результат детекции в список позвонков: + центр, коробка, угол, уверенность; сортирует по Y.""" + vertebrae = [] + for label, box in detect_result.items(): + x_center, y_center, width, height, angle, conf = box + rect = ( + (float(x_center), float(y_center)), + (float(width), float(height)), + float(angle), + ) + pts = cv2.boxPoints(rect).astype(np.float32) + center = np.array([x_center, y_center], dtype=np.float32) + vertebrae.append({ + "id": label, + "l": label, + "c": center, + "b": pts, + "ang": float(angle), + "conf": float(conf), + }) + vertebrae.sort(key=lambda x: x["c"][1]) + return vertebrae + +def run_pipeline(img_bgr, vertebrae): + """Основная логика анализа: строит оси/сигналы, находит дуги, считает Cobb, + делит дуги на структурные/минорные, формирует заключение и итоговые метрики.""" + if img_bgr is None: + raise ValueError("img_bgr is None") + + h, w = img_bgr.shape[:2] + + dx_global = fit_spine_axis_dx(vertebrae) + dx_seg = smooth_1d(dx_global, iters=3, alpha=0.25) + + dx_local = fit_spine_axis_dx_local(vertebrae, window=7) + dx_cobb = smooth_1d(dx_local, iters=3, alpha=0.25) + + dx_ref = smooth_1d(fit_spine_axis_dx_endpoints(vertebrae), iters=3, alpha=0.25) + + arcs = detect_arcs_from_dx( + dx_cobb, + eps_px=CONFIG.detect_eps_px, + min_len=CONFIG.detect_min_len, + zero_gap=CONFIG.detect_zero_gap, + edge_zero_extend=CONFIG.detect_edge_zero_extend, + ) + arcs = merge_arcs(arcs, dx_ref, max_gap=CONFIG.merge_max_gap) + + global_amp = float(np.max(np.abs(dx_seg)) + 1e-6) + + arc_infos = [] + for (s, e) in arcs: + info = cobb_for_arc(vertebrae, s, e, dx_cobb, dx_ref) + info["amp_rel"] = float(np.max(np.abs(dx_seg[s:e + 1])) / global_amp) + info["apex_margin"] = int(min(info["apex_idx"] - s, e - info["apex_idx"])) + info["centerline_pts"] = arc_centerline(vertebrae, info, smooth_window=5) + arc_infos.append(info) + + n = len(vertebrae) + min_len_struct = CONFIG.min_len_struct_small if n <= 7 else CONFIG.min_len_struct_large + min_apex_margin_struct = CONFIG.min_apex_margin_small if n <= 7 else CONFIG.min_apex_margin_large + + structural, minor = split_arcs( + arc_infos, + min_cobb_main=CONFIG.min_cobb_main, + min_cobb_second_abs=CONFIG.min_cobb_second_abs, + min_cobb_second_rel=CONFIG.min_cobb_second_rel, + min_len_struct=min_len_struct, + min_apex_margin_struct=min_apex_margin_struct, + ) + + conclusion = build_conclusion(arc_infos, structural=structural, minor=minor) + + main = max(arc_infos, key=lambda x: x["cobb_deg"]) if arc_infos else None + degree_class = 0 + prob_scoliosis = 0.0 + if main: + cd = main["cobb_deg"] + if cd >= 50: + degree_class = 4 + elif cd >= 26: + degree_class = 3 + elif cd >= 11: + degree_class = 2 + elif cd >= 1: + degree_class = 1 + prob_scoliosis = min(1.0, cd / 50.0) + + scol_type = "C-сколиоз" + if structural: + scol_type = scoliosis_type(structural) + else: + scol_type = "нет сколиоза" + + return { + "img_bgr": img_bgr, + "h": h, "w": w, + "vertebrae": vertebrae, + "arc_infos": arc_infos, + "structural": structural, + "minor": minor, + "conclusion": conclusion, + "main_arc": main, + "degree_class": degree_class, + "scoliosis_type": scol_type, + "prob_scoliosis": prob_scoliosis, + } + +def _split_if_stitched(img_bgr, cls_model): + """Проверяет, “сшитый” ли снимок. + Если да — делит на левую/правую половины, + иначе возвращает исходный.""" + pred, conf = classify_stitched(cls_model, img_bgr) + if pred != 1: # 0 = single + return [img_bgr], {"classification": pred, "classification_conf": conf} + h, w = img_bgr.shape[:2] + mid = w // 2 + left = img_bgr[:, :mid].copy() + right = img_bgr[:, mid:].copy() + return [left, right], {"classification": pred, "classification_conf": conf} + +def _select_frontal(images, proj_model): + """Из списка изображений выбирает фронтальную проекцию; + если нет — берёт с максимальной уверенностью.""" + best_img = images[0] + best_meta = {"projection": None, "projection_conf": None} + for img in images: + pred, conf = classify_projection(proj_model, img) + if pred == 1: # 1 = frontal + return img, {"projection": pred, "projection_conf": conf} + if best_meta["projection_conf"] is None or conf > best_meta["projection_conf"]: + best_img, best_meta = img, {"projection": pred, "projection_conf": conf} + return best_img, best_meta + +def run_scoliosis_pipeline(infer_dicom_path, model=None, model_path=None, out_dir=None): + """Полный пайплайн: читает DICOM, выбирает проекцию, детектит позвонки, + запускает run_pipeline, сохраняет SC и SR DICOM, возвращает пути и результат.""" + model_path = model_path or CONFIG.models.detection + base_results_dir = out_dir or CONFIG.results_dir + image_stem = os.path.splitext(os.path.basename(infer_dicom_path))[0] + run_dir = os.path.join(base_results_dir, f"{image_stem}_result") + os.makedirs(run_dir, exist_ok=True) + + det_model = model or get_detection_model(model_path=model_path) + cls_model = get_classification_model() + proj_model = get_projection_model() + + img_bgr_full = read_dicom(infer_dicom_path) + split_imgs, cls_meta = _split_if_stitched(img_bgr_full, cls_model) + img_bgr, proj_meta = _select_frontal(split_imgs, proj_model) + print(f"classification pred={cls_meta['classification']} conf={cls_meta['classification_conf']:.3f}; " + f"projection pred={proj_meta['projection']} conf={proj_meta['projection_conf']:.3f}") + + yolo_results = det_model(img_bgr, verbose=False) + + labelmap = build_labelmap(CLASS_NAMES) + detect_result = detect_vertebrae( + img_bgr, + yolo_results, + labelmap, + enable_postprocessing=POSTPROCESSING, + interpolate_missing=INTERPOLATE_MISSING, + ) + + if not detect_result: + return { + "ok": False, + "reason": "no_detections", + } + + vertebrae = vertebrae_from_detect_result(detect_result) + result = run_pipeline(img_bgr, vertebrae) + + sc_img = draw_arc_contours(img_bgr, vertebrae, result["structural"]) + sc_out_path = os.path.join(run_dir, f"sc_{image_stem}.dcm") + overlay_txt = f"prob={result.get('prob_scoliosis', 0.0):.2f} deg={result.get('degree_class', 0)} type={result.get('scoliosis_type', '')}" + save_secondary_capture( + infer_dicom_path, + sc_img, + sc_out_path, + series_description="RG-SCOLIOSIS-CONTOUR", + overlay_text=overlay_txt, + ) + + sr_desc = build_scoliosis_description(result["arc_infos"]) + sr_out_path = os.path.join(run_dir, f"sr_{image_stem}.dcm") + main_cobb = result["main_arc"]["cobb_deg"] if result.get("main_arc") else 0.0 + save_dicom_sr( + infer_dicom_path, + sr_desc, + result["conclusion"], + sr_out_path, + series_description="RG-SCOLIOSIS-SR", + extra_struct={ + "degree_class": result.get("degree_class", 0), + "scoliosis_type": result.get("scoliosis_type", ""), + "main_cobb": main_cobb, + "prob_scoliosis": result.get("prob_scoliosis", 0.0), + }, + ) + + return { + "ok": True, + "sr_dicom_path": sr_out_path, + "sc_dicom_path": sc_out_path, + "conclusion": result["conclusion"], + "arc_infos": result["arc_infos"], + "out_dir": run_dir, + } + +if __name__ == "__main__": + MODEL_PATH = CONFIG.models.detection + INFER_IMG_PATH = os.environ.get("INFER_IMG_PATH", r"C:\Users\Роман Владимирович\Desktop\XR_SCOLIOS\XR_SCOLIOS\1.2.643.5.1.13.13.12.2.77.8252.00090213030609010011090905030205\1.2.643.5.1.13.13.12.2.77.8252.09051310090608150606061508100113\1.2.643.5.1.13.13.12.2.77.8252.03061304011101150513090811030403.dcm") + OUT_DIR = CONFIG.results_dir + + if not INFER_IMG_PATH: + sys.exit(0) + + model = get_detection_model(model_path=MODEL_PATH) + out = run_scoliosis_pipeline(INFER_IMG_PATH, model=model, out_dir=OUT_DIR) + + if not out.get("ok"): + sys.exit(1) diff --git a/report_builder.py b/report_builder.py new file mode 100644 index 0000000..9e7e3ba --- /dev/null +++ b/report_builder.py @@ -0,0 +1,13 @@ +from scoliosis_arcs import chaklin_degree + +def build_scoliosis_description(arc_infos): + if not arc_infos: + return "Признаков сколиоза не выявлено." + lines = [] + for i, a in enumerate(sorted(arc_infos, key=lambda x: x["cobb_deg"], reverse=True), start=1): + degree = chaklin_degree(a["cobb_deg"]) + lines.append( + f"Дуга {i}: {a['direction']}, уровни {a['upper_end']}-{a['lower_end']}, " + f"вершина ~ {a['apex_label']}, Cobb={a['cobb_deg']:.1f}°, степень={degree}." + ) + return " ".join(lines) diff --git a/scoliosis_arcs.py b/scoliosis_arcs.py new file mode 100644 index 0000000..dfb46ac --- /dev/null +++ b/scoliosis_arcs.py @@ -0,0 +1,311 @@ +import numpy as np + +from scoliosis_geometry import angle_diff_deg + +def fill_zero_gaps(sign: np.ndarray, max_gap: int) -> np.ndarray: + if max_gap <= 0: + return sign + sign = sign.copy() + n = len(sign) + i = 0 + while i < n: + if sign[i] != 0: + i += 1 + continue + j = i + while j < n and sign[j] == 0: + j += 1 + left = sign[i - 1] if i > 0 else 0 + right = sign[j] if j < n else 0 + if (j - i) <= max_gap and left != 0 and left == right: + sign[i:j] = left + i = j + return sign + +def runs_of_same_sign(sign: np.ndarray) -> list[tuple[int, int, int]]: + "Находит непрерывные куски одинакового знака (+1 подряд, -1 подряд)" + n = len(sign) + out = [] + i = 0 + while i < n: + if sign[i] == 0: + i += 1 + continue + sgn = int(sign[i]) + s = i + while i < n and sign[i] == sgn: + i += 1 + e = i - 1 + out.append((sgn, s, e)) + return out + +def extend_through_zeros(sign: np.ndarray, s: int, e: int, max_extend: int) -> tuple[int, int]: + """Если дуга заканчивается рядом с нулями, + функция слегка "расширяет" дугу в нули, + чтобы включить край (где dx почти 0)""" + n = len(sign) + k = 0 + while k < max_extend and s > 0 and sign[s - 1] == 0: + s -= 1 + k += 1 + k = 0 + while k < max_extend and e < n - 1 and sign[e + 1] == 0: + e += 1 + k += 1 + return s, e + +def dx_to_sign(dx: np.ndarray, eps: float) -> np.ndarray: + return np.where(dx > eps, 1, np.where(dx < -eps, -1, 0)).astype(np.int8) + +def compute_eps(dx: np.ndarray, eps_px: float, eps_rel: float = 0.01) -> float: + maxabs = float(np.max(np.abs(dx)) + 1e-6) + return max(float(eps_px), float(eps_rel) * maxabs) + +def detect_arcs_from_dx(dx, eps_px=2.0, min_len=3, zero_gap=1, edge_zero_extend=2): + "Основная детекция дуг" + dx = np.asarray(dx, dtype=np.float32) + if len(dx) < 2: + return [] + + eps = compute_eps(dx, eps_px, eps_rel=0.01) + sign = dx_to_sign(dx, eps) + sign = fill_zero_gaps(sign, zero_gap) + + arcs = [] + for sgn, s, e in runs_of_same_sign(sign): + if (e - s + 1) >= int(min_len): + s, e = extend_through_zeros(sign, s, e, edge_zero_extend) + arcs.append((s, e)) + return arcs + +def arc_sign(signal: np.ndarray, s: int, e: int) -> int: + """Определяет знак дуги по сигналу на участке (берёт медиану). + Нужно чтобы знак не ломался из-за шума""" + return 1 if float(np.median(signal[s:e + 1])) > 0 else -1 + +def merge_arcs(arcs, sign_signal, max_gap=0): + """Сливает дуги, если: + - знак одинаковый + - и они либо пересекаются, либо рядом (max_gap позволяет "почти рядом") + Это финальная "склейка дуг", чтобы не было дробления""" + if not arcs: + return [] + + arcs = sorted(arcs, key=lambda t: (t[0], t[1])) + out = [[arcs[0][0], arcs[0][1], arc_sign(sign_signal, arcs[0][0], arcs[0][1])]] + + for s, e in arcs[1:]: + sgn = arc_sign(sign_signal, s, e) + ps, pe, psgn = out[-1] + + if sgn == psgn and s <= pe + max_gap: + out[-1][1] = max(pe, e) + else: + out.append([s, e, sgn]) + + return [(s, e) for s, e, _ in out] + +def cobb_for_arc(vertebrae, s, e, dx_cobb, dx_ref_sign): + idxs = np.arange(s, e + 1) + apex_i = int(idxs[np.argmax(np.abs(dx_cobb[s:e + 1]))]) + + upper_candidates = range(s, apex_i + 1) + lower_candidates = range(apex_i, e + 1) + + best_cobb = -1.0 + best_pair = (s, e) + + for ui in upper_candidates: + ta = vertebrae[ui]["ang"] + for li in lower_candidates: + c = float(angle_diff_deg(ta, vertebrae[li]["ang"])) + if c > best_cobb: + best_cobb = c + best_pair = (ui, li) + + upper_idx, lower_idx = best_pair + + side_sign = arc_sign(dx_ref_sign, s, e) + direction = "левосторонний" if side_sign > 0 else "правосторонний" + + return { + "start_idx": int(s), "end_idx": int(e), + "apex_idx": int(apex_i), "apex_label": vertebrae[apex_i]["l"], + "direction": direction, + "side_sign": int(side_sign), + "cobb_deg": float(best_cobb), + "upper_end": vertebrae[upper_idx]["l"], + "lower_end": vertebrae[lower_idx]["l"], + "upper_idx": int(upper_idx), + "lower_idx": int(lower_idx), + } + +def arc_len(a): + "Длина дуги в позвонках: end-start+1" + return a["end_idx"] - a["start_idx"] + 1 + +def is_force(a, force_cobb, force_len): + """"Форс-правило": если дуга очень большая по Cobb и не слишком короткая + - считаем её структурной, даже если остальные критерии спорные.""" + return (a["cobb_deg"] >= force_cobb) and (arc_len(a) >= force_len) + +def dedupe_by_range(arcs): + "Функция удаляет дубликаты по (start_idx, end_idx)" + seen = set() + out = [] + for a in arcs: + key = (a["start_idx"], a["end_idx"]) + if key not in seen: + out.append(a) + seen.add(key) + return out + +def split_arcs( + arc_infos, + # 1) Главная дуга всегда structural, если вообще есть "сколиоз" + min_cobb_main=1.0, + + # 2) Вторая (для S-типа) должна быть не "шумом" + min_cobb_second_abs=5.0, + min_cobb_second_rel=0.35, + + min_len_struct=4, + min_len_minor=3, + min_apex_margin_struct=2, + min_amp_rel_struct=0.25, + + min_cobb_minor=3.0, + + force_struct_cobb=40.0, + force_struct_min_len=3 +): + """ + Разделяет все дуги на: + - structural (важные, формируют тип C/S) + - minor (дополнительные) + """ + if not arc_infos: + return [], [] + + main = max(arc_infos, key=lambda a: a["cobb_deg"]) + + def passes_geom(a): + amp_rel = a.get("amp_rel") + if amp_rel is None: + return False + + apex_margin = int(a.get("apex_margin", 999)) + + return ( + (arc_len(a) >= min_len_struct) and + (apex_margin >= min_apex_margin_struct) and + (amp_rel >= min_amp_rel_struct) + ) + + structural, minor = [], [] + + main_is_force = (main["cobb_deg"] >= force_struct_cobb) and (arc_len(main) >= force_struct_min_len) + + if main_is_force or (main["cobb_deg"] >= min_cobb_main): + structural.append(main) + + second_thr = max(min_cobb_second_abs, min_cobb_second_rel * float(main["cobb_deg"])) + main_sign = int(main.get("side_sign", 0)) + + for a in arc_infos: + if a is main: + continue + + length = arc_len(a) + a_sign = int(a.get("side_sign", 0)) + + if is_force(a, force_struct_cobb, force_struct_min_len): + structural.append(a) + continue + + is_opposite = (main_sign != 0) and (a_sign != 0) and (a_sign != main_sign) + + if is_opposite and (a["cobb_deg"] >= second_thr) and passes_geom(a): + structural.append(a) + else: + if length >= min_len_minor and a["cobb_deg"] >= min_cobb_minor: + minor.append(a) + + return dedupe_by_range(structural), dedupe_by_range(minor) + +def chaklin_degree(cobb_deg): + "Переводит Cobb в степень по Чаклину (0/I/II/III/IV)" + a = float(cobb_deg) + if a < 1.0: + return "0 ст. (практически нет)" + if a <= 10.: + return "I ст." + if a <= 25.: + return "II ст." + if a <= 50.: + return "III ст." + return "IV ст." + +def scoliosis_type(structural_arcs): + if len(structural_arcs) <= 1: + return "C-сколиоз" + + signs = {int(a["side_sign"]) for a in structural_arcs if a.get("side_sign") is not None} + + if len(structural_arcs) >= 3 and len(signs) >= 2: + return "Z-сколиоз" + if len(signs) >= 2: + return "S-сколиоз" + return "C-сколиоз" + +def build_conclusion(arc_infos, structural=None, minor=None): + """ + Собирает текст заключения: + - выбирает главную дугу + - пишет тип, направление, Cobb, степень + - выводит список структурных дуг + - выводит список дополнительных дуг (minor) + """ + if not arc_infos: + return "Угловая деформация оси позвоночника не выявлена. Рентгенологических признаков сколиоза нет." + if structural is None and minor is None: + structural, minor = split_arcs(arc_infos) + else: + structural = structural or [] + minor = minor or [] + + def fmt_arc(i, a, with_len=False): + arc_degree = chaklin_degree(a["cobb_deg"]) + s = (f" Дуга {i}: {a['direction']}, уровни {a['upper_end']}-{a['lower_end']}, " + f"вершина ~ {a['apex_label']}, Cobb={a['cobb_deg']:.1f}°, степень={arc_degree}") + if with_len: + length = arc_len(a) + s += f", длина={length} позв." + return s + "." + + main = max(arc_infos, key=lambda x: x["cobb_deg"]) + + scol_type = scoliosis_type(structural) if structural else "не определён (нет структурных дуг)" + degree = chaklin_degree(main["cobb_deg"]) + + lines = [ + f"1. Тип сколиоза - {scol_type}.", + f"2. Направление основной дуги - {main['direction']} (по изображению).", + f"3. Угол деформации (Cobb) основной дуги - {main['cobb_deg']:.1f}°.", + f"4. Степень сколиоза по углу деформации - {degree}.", + "", + "Структурные дуги (учитываются для типа):" + ] + + if structural: + for i, a in enumerate(sorted(structural, key=lambda a: a["cobb_deg"], reverse=True), start=1): + lines.append(fmt_arc(i, a)) + else: + lines.append(" Нет (не прошли критерии структурности).") + + if minor: + lines += ["", "Дополнительные дуги (не учитываются для типа):"] + for i, a in enumerate(sorted(minor, key=lambda a: a["cobb_deg"], reverse=True), start=1): + lines.append(fmt_arc(i, a, with_len=True)) + + return "\n".join(lines) diff --git a/scoliosis_geometry.py b/scoliosis_geometry.py new file mode 100644 index 0000000..54d82b5 --- /dev/null +++ b/scoliosis_geometry.py @@ -0,0 +1,130 @@ +import numpy as np + +def edge_angle_deg(p1, p2): + "Находит угол наклона отрезка p1→p2 в градусах (от -180 до +180)." + dx = float(p2[0] - p1[0]) + dy = float(p2[1] - p1[1]) + return np.degrees(np.arctan2(dy, dx)) + +def normalize_angle_minus90_90(a): + "Приводит угол к диапазону [-90, 90)." + return (a + 90.0) % 180.0 - 90.0 + +def angle_diff_deg(a, b): + "Возвращает насколько отличаются две линии по углу, но только 'острый' вариант (0..90)" + d = abs(a - b) % 180.0 + if d > 90.0: + d = 180.0 - d + return d + +def order_points_ccw(pts): + "Приводит 4 точки OBB к стабильному порядку по кругу" + pts = np.asarray(pts, dtype=np.float32) + c = pts.mean(axis=0) + ang = np.arctan2(pts[:, 1] - c[1], pts[:, 0] - c[0]) + return pts[np.argsort(ang)] + +def vertebra_endplates_angles(obb_pts): + "Нахождит углов верхней и нижней замыкательных пластинок" + p = order_points_ccw(obb_pts) + + edges = [] + for i in range(4): + a = p[i] + b = p[(i + 1) % 4] + + ang = normalize_angle_minus90_90(edge_angle_deg(a, b)) + mean_y = float((a[1] + b[1]) / 2.0) + score = abs(ang) + + edges.append((score, mean_y, float(ang))) + + edges.sort(key=lambda t: t[0]) + e1, e2 = edges[0], edges[1] + top, bottom = (e1, e2) if e1[1] <= e2[1] else (e2, e1) + + return top[2], bottom[2] + +def fit_line_x_of_y(y, x, eps=1e-6): + "Строит прямую вида x = a*y + b по точкам" + if np.std(y) < eps: + return 0.0, float(np.mean(x)) + a, b = np.polyfit(y, x, 1) + return float(a), float(b) + +def fit_spine_axis_dx(vertebrae): + """Строит глобальную ось позвоночника (одна прямая по всем центрам). + Возвращает dx: для каждого позвонка разницу показывая, + насколько позвонок левее/правее глобальной оси""" + centers = np.array([v["c"] for v in vertebrae], dtype=np.float32) + xs, ys = centers[:, 0], centers[:, 1] + + a, b = fit_line_x_of_y(ys, xs) + return xs - (a * ys + b) + +def fit_spine_axis_dx_local(vertebrae, window=7): + "Вычисляет локальные изгибы относительно соседей" + centers = np.array([v["c"] for v in vertebrae], dtype=np.float32) + xs, ys = centers[:, 0], centers[:, 1] + n = len(xs) + dx = np.zeros(n, dtype=np.float32) + + half = window // 2 + for i in range(n): + s = max(0, i - half) + e = min(n, i + half + 1) + a, b = fit_line_x_of_y(ys[s:e], xs[s:e]) + dx[i] = xs[i] - (a * ys[i] + b) + return dx + +def smooth_1d(x, iters=3, alpha=0.25): + "Простое сглаживание 1D-сигнала (dx)" + x = x.astype(np.float32).copy() + for _ in range(int(iters)): + x[1:-1] = x[1:-1] + alpha * (x[:-2] - 2 * x[1:-1] + x[2:]) + return x + +def fit_spine_axis_dx_endpoints(vertebrae): + "Строит ось как линию через первый и последний позвонок (а не через регрессию)" + centers = np.array([v["c"] for v in vertebrae], dtype=np.float32) + xs, ys = centers[:, 0], centers[:, 1] + + x0, y0 = float(xs[0]), float(ys[0]) + x1, y1 = float(xs[-1]), float(ys[-1]) + + if abs(y1 - y0) < 1e-6: + return xs - float(np.mean(xs)) + + t = (ys - y0) / (y1 - y0) + x_line = x0 + t * (x1 - x0) + return xs - x_line + +def _smooth_chain(points, window=5): + """сглаживает цепочку точек скользящим средним по окну window. + Делает окно нечётным, берёт среднее по соседям и возвращает сглаженные точки""" + pts = np.asarray(points, dtype=np.float32) + if len(pts) < 3 or window <= 2: + return pts + window = int(window) + if window % 2 == 0: + window += 1 + half = window // 2 + out = pts.copy() + for i in range(half, len(pts) - half): + out[i] = np.mean(pts[i - half:i + half + 1], axis=0) + return out + +def arc_centerline(vertebrae, info, smooth_window=5): + """Строит центральную линию дуги""" + s = info.get("upper_idx", info["start_idx"]) + e = info.get("lower_idx", info["end_idx"]) + if s > e: + s, e = e, s + centers = [vertebrae[k]["c"] for k in range(s, e + 1)] + if not centers: + return None + centers = np.array(centers, dtype=np.float32) + centers = centers[np.argsort(centers[:, 1])] + if smooth_window and smooth_window > 2: + centers = _smooth_chain(centers, window=smooth_window) + return centers diff --git a/vis.py b/vis.py new file mode 100644 index 0000000..44a9719 --- /dev/null +++ b/vis.py @@ -0,0 +1,19 @@ +import cv2 +import numpy as np + +def draw_arc_contours( + img_bgr: np.ndarray, + vertebrae, + arc_infos, + colors=((255, 0, 255), (0, 255, 255), (255, 255, 0)), +): + """Визуализация контура дуги деформации""" + vis = img_bgr.copy() + for i, info in enumerate(arc_infos): + pts = info.get("centerline_pts") + if pts is None: + continue + pts_i = np.round(pts).astype(np.int32).reshape(-1, 1, 2) + color = colors[i % len(colors)] + cv2.polylines(vis, [pts_i], isClosed=False, color=color, thickness=3) + return vis