Browse Source

Initial scoliosis pipeline (without models)\

main
Roman 4 days ago
commit
83dfbcc238
  1. 21
      .gitignore
  2. 139
      classification.py
  3. 78
      config.py
  4. 374
      detect_vertebrae.py
  5. 191
      io_dicom.py
  6. 88
      model_loader.py
  7. 274
      pipeline.py
  8. 13
      report_builder.py
  9. 311
      scoliosis_arcs.py
  10. 130
      scoliosis_geometry.py
  11. 19
      vis.py

21
.gitignore vendored

@ -0,0 +1,21 @@
<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD> <EFBFBD><EFBFBD><EFBFBD> <EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD> <EFBFBD><EFBFBD> <EFBFBD><EFBFBD> (ECHO) <EFBFBD><EFBFBD><EFBFBD><EFBFBD>祭.
# Python
__pycache__/
*.pyc
results/
outputs/
tmp/
# OS
.DS_Store
Thumbs.db
# IDE
.vscode/
.idea/
# Models
models/*.pt
models/*.onnx
models/*.engine

139
classification.py

@ -0,0 +1,139 @@
import os
import cv2
import torch
import torch.nn.functional as F
from PIL import Image, ImageOps
from torchvision import transforms
from torchvision.transforms import InterpolationMode
STITCH_IMG_SIZE = 512
STITCH_THRESHOLD = float(os.environ.get("STITCH_THRESHOLD", 0.8))
TRIM_THR = 30
TRIM_BLACK_FRAC = 0.97
TRIM_MAX_PX = 600
def pad_to_square(img: Image.Image, fill: int = 0) -> Image.Image:
"""Дополненяет изображения полями до квадратного формата, чтобы не искажать при масштабировании."""
w, h = img.size
if w == h:
return img
side = max(w, h)
pl = (side - w) // 2
pr = side - w - pl
pt = (side - h) // 2
pb = side - h - pt
return ImageOps.expand(img, border=(pl, pt, pr, pb), fill=fill)
def trim_black_frame(img: Image.Image) -> Image.Image:
"""Обрезает чёрную рамку по краям; если обрезка слишком агрессивная — возвращает оригинал."""
g = img.convert("L")
w, h = g.size
px = g.load()
step_y = max(1, h // 180)
step_x = max(1, w // 180)
def col_black_frac(x: int) -> float:
total = 0
black = 0
for y in range(0, h, step_y):
total += 1
if px[x, y] <= TRIM_THR:
black += 1
return black / max(1, total)
def row_black_frac(y: int) -> float:
total = 0
black = 0
for x in range(0, w, step_x):
total += 1
if px[x, y] <= TRIM_THR:
black += 1
return black / max(1, total)
left = 0
for x in range(w):
if col_black_frac(x) < TRIM_BLACK_FRAC:
break
left += 1
if left >= TRIM_MAX_PX:
break
right = 0
for x in range(w - 1, -1, -1):
if col_black_frac(x) < TRIM_BLACK_FRAC:
break
right += 1
if right >= TRIM_MAX_PX:
break
top = 0
for y in range(h):
if row_black_frac(y) < TRIM_BLACK_FRAC:
break
top += 1
if top >= TRIM_MAX_PX:
break
bottom = 0
for y in range(h - 1, -1, -1):
if row_black_frac(y) < TRIM_BLACK_FRAC:
break
bottom += 1
if bottom >= TRIM_MAX_PX:
break
x1 = min(left, w - 2)
y1 = min(top, h - 2)
x2 = max(x1 + 2, w - right)
y2 = max(y1 + 2, h - bottom)
if x2 <= x1 + 1 or y2 <= y1 + 1:
return img
if (x2 - x1) < w * 0.35 or (y2 - y1) < h * 0.35:
return img
return img.crop((x1, y1, x2, y2))
STITCH_TFM = transforms.Compose([
transforms.Lambda(lambda arr: Image.fromarray(cv2.cvtColor(arr, cv2.COLOR_BGR2RGB))),
transforms.Lambda(trim_black_frame),
transforms.Lambda(pad_to_square),
transforms.Resize((STITCH_IMG_SIZE, STITCH_IMG_SIZE), interpolation=InterpolationMode.BILINEAR),
transforms.ToTensor(),
transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225)),
])
PROJ_TFM = transforms.Compose([
transforms.Lambda(lambda arr: Image.fromarray(cv2.cvtColor(arr, cv2.COLOR_BGR2RGB))),
transforms.Resize((224, 224), interpolation=InterpolationMode.BILINEAR),
transforms.ToTensor(),
transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225)),
])
def classify_stitched(model, img_bgr):
"""Модель распознает склеенное/одиночное изображение. Возвращает класс 0(склеенный)/1(одиночный) и уверенность"""
device = next(model.parameters()).device
x = STITCH_TFM(img_bgr).unsqueeze(0).to(device)
with torch.no_grad():
logits = model(x)
if logits.shape[1] == 1 or (logits.ndim == 2 and logits.shape[-1] == 1):
prob = torch.sigmoid(logits)[0, 0].item()
pred = 1 if prob >= STITCH_THRESHOLD else 0
conf = prob if pred == 1 else (1 - prob)
return pred, float(conf)
probs = F.softmax(logits, dim=1)
conf, pred = probs.max(dim=1)
return int(pred.item()), float(conf.item())
def classify_projection(model, img_bgr):
"""Определяет проекцию 0(боковая)/1(фронтальная), возвращает класс и уверенность."""
device = next(model.parameters()).device
x = PROJ_TFM(img_bgr).unsqueeze(0).to(device)
with torch.no_grad():
logits = model(x)
probs = F.softmax(logits, dim=1)
conf, pred = probs.max(dim=1)
return int(pred.item()), float(conf.item())

78
config.py

@ -0,0 +1,78 @@
import os
from dataclasses import dataclass
def _as_bool(value: str, default: bool) -> bool:
if value is None:
return default
return value.strip() not in {"0", "false", "False", ""}
def _env_path(key: str, default_path: str) -> str:
return os.path.abspath(os.environ.get(key, default_path))
@dataclass(frozen=True)
class ModelPaths:
classification: str
projection: str
detection: str
@dataclass(frozen=True)
class AppConfig:
models: ModelPaths
postprocessing: bool
interpolate_missing: bool
results_dir: str
detect_eps_px: float
detect_min_len: int
detect_zero_gap: int
detect_edge_zero_extend: int
merge_max_gap: int
min_cobb_main: float
min_cobb_second_abs: float
min_cobb_second_rel: float
min_len_struct_small: int
min_len_struct_large: int
min_apex_margin_small: int
min_apex_margin_large: int
def load_config() -> AppConfig:
base_dir = os.path.dirname(__file__)
default_results = os.path.join(base_dir, "results")
models = ModelPaths(
classification=_env_path(
"MODEL_CLASSIFICATION",
os.path.join(base_dir, "models", "classification.pt"),
),
projection=_env_path(
"MODEL_PROJECTION",
os.path.join(base_dir, "models", "projection.pt"),
),
detection=_env_path(
"MODEL_DETECTION",
os.path.join(base_dir, "models", "1_best_yolo8(L).pt"),
),
)
return AppConfig(
models=models,
postprocessing=_as_bool(os.environ.get("POSTPROCESSING"), True),
interpolate_missing=_as_bool(os.environ.get("INTERPOLATE_MISSING"), False),
results_dir=_env_path("RESULTS_DIR", default_results),
detect_eps_px=float(os.environ.get("DETECT_EPS_PX", 1.0)),
detect_min_len=int(os.environ.get("DETECT_MIN_LEN", 2)),
detect_zero_gap=int(os.environ.get("DETECT_ZERO_GAP", 2)),
detect_edge_zero_extend=int(os.environ.get("DETECT_EDGE_ZERO_EXTEND", 3)),
merge_max_gap=int(os.environ.get("MERGE_MAX_GAP", 1)),
min_cobb_main=float(os.environ.get("MIN_COBB_MAIN", 1.0)),
min_cobb_second_abs=float(os.environ.get("MIN_COBB_SECOND_ABS", 5.0)),
min_cobb_second_rel=float(os.environ.get("MIN_COBB_SECOND_REL", 0.35)),
min_len_struct_small=int(os.environ.get("MIN_LEN_STRUCT_SMALL", 3)),
min_len_struct_large=int(os.environ.get("MIN_LEN_STRUCT_LARGE", 4)),
min_apex_margin_small=int(os.environ.get("MIN_APEX_MARGIN_SMALL", 1)),
min_apex_margin_large=int(os.environ.get("MIN_APEX_MARGIN_LARGE", 2)),
)

374
detect_vertebrae.py

@ -0,0 +1,374 @@
import os
import cv2
import numpy as np
def detect_vertebrae(
image,
model_results,
labelmap,
debug_save_path=None,
enable_postprocessing=True,
interpolate_missing=False,
spine_sequence=None,
):
"""
Detect vertebrae with optional postprocessing.
Returns:
dict[label] = [x_center, y_center, width, height, angle, confidence]
"""
default_spine_sequence = [
"T1", "T2", "T3", "T4", "T5", "T6", "T7", "T8", "T9", "T10", "T11", "T12",
"L1", "L2", "L3", "L4", "L5",
]
if spine_sequence is None:
spine_sequence = default_spine_sequence
label_to_index = {label: idx for idx, label in enumerate(spine_sequence)}
best_boxes = {}
colors = [
(0, 0, 255), # Red
(0, 255, 0), # Green
(255, 0, 0), # Blue
(0, 255, 255), # Yellow
(255, 0, 255), # Magenta
(255, 255, 0), # Cyan
(0, 165, 255), # Orange
(255, 20, 147), # Pink
(147, 20, 255), # Violet
(0, 215, 255), # Gold
(255, 215, 0), # Turquoise
(255, 105, 180), # Hot pink
(0, 255, 127), # Spring green
(255, 69, 0), # Orange red
(72, 61, 139), # Dark slate blue
(47, 255, 173), # Aquamarine
(255, 140, 0), # Dark orange
]
if model_results:
result = model_results[0]
if hasattr(result, "obb") and result.obb is not None and len(result.obb) > 0:
obb_data = result.obb
for i in range(len(obb_data)):
try:
conf = float(obb_data.conf[i].cpu().numpy())
cls_id = int(obb_data.cls[i].cpu().numpy())
label = labelmap[cls_id] if cls_id < len(labelmap) else f"Unknown_{cls_id}"
if hasattr(obb_data, "xyxyxyxy"):
box_points = obb_data.xyxyxyxy[i].cpu().numpy()
elif hasattr(obb_data, "xyxyxyxyn"):
box_points = obb_data.xyxyxyxyn[i].cpu().numpy()
h, w = image.shape[:2]
box_points = box_points.reshape(4, 2)
box_points[:, 0] *= w
box_points[:, 1] *= h
box_points = box_points.flatten()
else:
box_points = result.obb.xyxyxyxy[i].cpu().numpy()
points = box_points.reshape(4, 2).astype(np.float32)
rect = cv2.minAreaRect(points)
center, size, angle = rect
x_center, y_center = center
width, height = size
if width < height:
angle += 90
width, height = height, width
result_box = np.array([x_center, y_center, width, height, angle, conf], dtype=np.float32)
if label not in best_boxes or conf > best_boxes[label][-1]:
best_boxes[label] = result_box
except Exception as exc:
print(f"Box parse error: {exc}")
continue
if enable_postprocessing and best_boxes:
boxes_list = [(label, box) for label, box in best_boxes.items()]
boxes_list.sort(key=lambda x: x[1][-1], reverse=True)
kept_boxes = []
for label, box in boxes_list:
x_center, y_center, width, height, _, _ = box
is_duplicate = False
for kept_label, kept_box in kept_boxes:
xk, yk, wk, hk, _, _ = kept_box
dist = np.sqrt((x_center - xk) ** 2 + (y_center - yk) ** 2)
threshold = 0.3 * min(height, hk)
if dist < threshold:
is_duplicate = True
print(f"Duplicate removed: {label} overlaps with {kept_label}")
break
if not is_duplicate:
kept_boxes.append((label, box))
kept_boxes.sort(key=lambda x: x[1][1])
if kept_boxes:
valid_boxes = []
last_index = -1
last_y = -1
y_gaps = []
for label, box in kept_boxes:
x_center, y_center, width, height, angle, conf = box
if label not in label_to_index:
print(f"Unknown vertebra skipped: {label}")
continue
current_index = label_to_index[label]
if last_index == -1:
valid_boxes.append((label, box))
last_index = current_index
last_y = y_center
continue
expected_index = last_index + 1
if current_index == expected_index:
valid_boxes.append((label, box))
y_gaps.append(y_center - last_y)
last_index = current_index
last_y = y_center
continue
if current_index > expected_index:
valid_boxes.append((label, box))
last_index = current_index
last_y = y_center
continue
avg_gap = np.median(y_gaps) if y_gaps else 50.0
expected_y = last_y + avg_gap
y_diff = abs(y_center - expected_y)
if y_diff < avg_gap * 0.6 and expected_index < len(spine_sequence):
new_label = spine_sequence[expected_index]
new_box = box.copy()
new_box[5] = -1.0
valid_boxes.append((new_label, new_box))
y_gaps.append(y_center - last_y)
print(f"Order corrected: {label} -> {new_label} (conf=-1)")
last_index = expected_index
last_y = y_center
continue
print(f"Out-of-order skipped: {label} after {valid_boxes[-1][0]}")
if interpolate_missing and len(valid_boxes) > 1:
valid_boxes.sort(key=lambda x: x[1][1])
gaps = []
for i in range(1, len(valid_boxes)):
prev_y = valid_boxes[i - 1][1][1]
curr_y = valid_boxes[i][1][1]
gaps.append(curr_y - prev_y)
avg_gap = np.median(gaps) if gaps else 0
max_allowed_gap = avg_gap * 1.8 if avg_gap > 0 else float("inf")
reliable_angles = []
reliable_ratios = []
for _, box in valid_boxes:
_, _, width, height, angle, conf = box
if conf > 0:
norm_angle = angle % 180
if norm_angle > 90:
norm_angle -= 180
reliable_angles.append(norm_angle)
aspect_ratio = width / max(height, 1)
reliable_ratios.append(aspect_ratio)
median_angle = np.median(reliable_angles) if reliable_angles else 0.0
median_ratio = np.median(reliable_ratios) if reliable_ratios else 2.0
new_boxes = []
for i in range(len(valid_boxes)):
current_label, current_box = valid_boxes[i]
current_idx = label_to_index[current_label]
new_boxes.append((current_label, current_box))
if i < len(valid_boxes) - 1:
next_label, next_box = valid_boxes[i + 1]
next_idx = label_to_index[next_label]
y_gap = next_box[1] - current_box[1]
index_gap = next_idx - current_idx
if index_gap > 1 and y_gap > max_allowed_gap * 0.7:
num_missing = index_gap - 1
print(f"Missing between {current_label} and {next_label}: {num_missing}")
x1, y1, w1, h1, ang1, _ = current_box
x2, y2, w2, h2, ang2, _ = next_box
for k in range(1, num_missing + 1):
missing_idx = current_idx + k
if missing_idx >= len(spine_sequence):
continue
missing_label = spine_sequence[missing_idx]
fraction = k / index_gap
x_center = (x1 + x2) / 2
y_center = y1 + fraction * (y2 - y1)
avg_width = (w1 + w2) / 2
avg_height = (h1 + h2) / 2
if median_ratio > 1:
width = avg_width
height = width / median_ratio
else:
height = avg_height
width = height * median_ratio
norm_ang1 = ang1 % 180
norm_ang2 = ang2 % 180
if abs(norm_ang1 - norm_ang2) > 90:
if norm_ang1 > norm_ang2:
norm_ang1 -= 180
else:
norm_ang2 -= 180
angle = norm_ang1 + fraction * (norm_ang2 - norm_ang1)
angle = (angle + 90) % 180 - 90
if reliable_angles:
angle = 0.7 * angle + 0.3 * median_angle
if height > width:
width, height = height, width
angle = (angle + 90) % 180
conf = -0.5
interpolated_box = np.array(
[x_center, y_center, width, height, angle, conf],
dtype=np.float32,
)
print(
"Interpolated: "
f"{missing_label} x={x_center:.1f} y={y_center:.1f} "
f"w={width:.1f} h={height:.1f} angle={angle:.2f}"
)
new_boxes.append((missing_label, interpolated_box))
new_boxes.sort(key=lambda x: x[1][1])
valid_boxes = new_boxes
best_boxes = {label: box for label, box in valid_boxes}
if debug_save_path:
if len(image.shape) == 2:
vis_image = cv2.cvtColor(image, cv2.COLOR_GRAY2BGR)
elif image.shape[2] == 3:
vis_image = image.copy()
else:
vis_image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
for idx, (label, box) in enumerate(best_boxes.items()):
try:
x_center, y_center, width, height, angle, conf = box
color = colors[idx % len(colors)]
is_corrected = conf < 0
thickness = 4 if is_corrected else 3
alpha = 0.35 if is_corrected else 0.2
rect = ((x_center, y_center), (width, height), angle)
box_points = cv2.boxPoints(rect)
box_points = np.int64(box_points)
contour_color = (0, 0, 255) if is_corrected else color
cv2.drawContours(vis_image, [box_points], 0, contour_color, thickness)
overlay = vis_image.copy()
fill_color = (0, 0, 255) if is_corrected else color
cv2.fillPoly(overlay, [box_points], fill_color)
cv2.addWeighted(overlay, alpha, vis_image, 1 - alpha, 0, vis_image)
if is_corrected:
text = f"{label} CV({conf:.1f})"
else:
text = f"{label} ({conf:.3f})"
font = cv2.FONT_HERSHEY_SIMPLEX
font_scale = 0.6
thickness = 2
(text_width, text_height), _ = cv2.getTextSize(text, font, font_scale, thickness)
text_x = int(x_center - text_width / 2)
text_y = int(y_center + text_height / 2)
padding = 5
bg_rect = [
(text_x - padding, text_y - text_height - padding),
(text_x + text_width + padding, text_y + padding),
]
text_overlay = vis_image.copy()
bg_color = (0, 0, 100) if is_corrected else (0, 0, 0)
cv2.rectangle(text_overlay, bg_rect[0], bg_rect[1], bg_color, -1)
text_alpha = 0.7
cv2.addWeighted(text_overlay, text_alpha, vis_image, 1 - text_alpha, 0, vis_image)
text_color = (255, 255, 255) if not is_corrected else (255, 200, 200)
cv2.putText(
vis_image,
text,
(text_x, text_y),
font,
font_scale,
text_color,
thickness,
)
center_color = (0, 0, 255) if is_corrected else (255, 255, 255)
cv2.circle(vis_image, (int(x_center), int(y_center)), 5, center_color, -1)
cv2.circle(vis_image, (int(x_center), int(y_center)), 2, (0, 0, 0), -1)
except Exception as exc:
print(f"Draw error for {label}: {exc}")
continue
dir_path = os.path.dirname(debug_save_path)
if dir_path:
os.makedirs(dir_path, exist_ok=True)
try:
if len(vis_image.shape) == 2:
vis_image = cv2.cvtColor(vis_image, cv2.COLOR_GRAY2BGR)
cv2.imwrite(debug_save_path, vis_image, [cv2.IMWRITE_JPEG_QUALITY, 95])
print(f"Debug saved: {debug_save_path}")
print(f"Detections: {len(best_boxes)}")
corrected_count = sum(1 for box in best_boxes.values() if box[5] < 0)
if corrected_count > 0:
print(f"Corrected order: {corrected_count}")
except Exception as exc:
print(f"Debug save error: {exc}")
return {label: box.astype(float).tolist() for label, box in best_boxes.items()}

191
io_dicom.py

@ -0,0 +1,191 @@
import os
from typing import Optional
import cv2
import numpy as np
import pydicom
def read_dicom(path: str) -> np.ndarray:
ds = pydicom.dcmread(path)
img = ds.pixel_array
if getattr(ds, "PhotometricInterpretation", "") == "MONOCHROME1":
img = np.max(img) - img
img = img.astype(np.float32)
img -= img.min()
if img.max() > 0:
img /= img.max()
img = (img * 255.0).clip(0, 255).astype(np.uint8)
if img.ndim == 2:
img = cv2.cvtColor(img, cv2.COLOR_GRAY2BGR)
elif img.shape[2] == 3:
img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
return img
def save_secondary_capture(
original_dicom_path: str,
img_bgr: np.ndarray,
out_path: str,
series_description: str = "RG-SCOLIOSIS-CONTOUR",
overlay_text: Optional[str] = None,
):
import pydicom
from pydicom.uid import generate_uid
ds = pydicom.dcmread(original_dicom_path)
img_draw = img_bgr.copy()
if overlay_text:
cv2.putText(
img_draw,
overlay_text,
(20, 40),
cv2.FONT_HERSHEY_SIMPLEX,
1.0,
(0, 255, 0),
2,
)
img_rgb = cv2.cvtColor(img_draw, cv2.COLOR_BGR2RGB)
rows, cols = img_rgb.shape[:2]
ds.SOPClassUID = "1.2.840.10008.5.1.4.1.1.7"
ds.SOPInstanceUID = generate_uid()
ds.SeriesInstanceUID = generate_uid()
ds.Modality = "OT"
ds.SeriesDescription = series_description
if "SeriesNumber" in ds:
try:
ds.SeriesNumber = int(ds.SeriesNumber) + 1000
except Exception:
ds.SeriesNumber = 1000
else:
ds.SeriesNumber = 1000
ds.Rows = rows
ds.Columns = cols
ds.SamplesPerPixel = 3
ds.PhotometricInterpretation = "RGB"
ds.PlanarConfiguration = 0
ds.BitsAllocated = 8
ds.BitsStored = 8
ds.HighBit = 7
ds.PixelRepresentation = 0
ds.PixelData = img_rgb.tobytes()
os.makedirs(os.path.dirname(out_path), exist_ok=True)
ds.save_as(out_path, write_like_original=False)
def save_dicom_sr(
original_dicom_path: str,
description_text: str,
conclusion_text: str,
out_path: str,
series_description: str = "SCOLIOSIS_SR",
extra_struct: Optional[dict] = None,
):
import datetime as _dt
import pydicom
from pydicom.dataset import Dataset, FileDataset
from pydicom.uid import ExplicitVRLittleEndian, generate_uid
src = pydicom.dcmread(original_dicom_path)
file_meta = Dataset()
file_meta.MediaStorageSOPClassUID = "1.2.840.10008.5.1.4.1.1.88.11"
file_meta.MediaStorageSOPInstanceUID = generate_uid()
file_meta.TransferSyntaxUID = ExplicitVRLittleEndian
file_meta.ImplementationClassUID = generate_uid()
ds = FileDataset(out_path, {}, file_meta=file_meta, preamble=b"\0" * 128)
for tag in (
"PatientName",
"PatientID",
"PatientBirthDate",
"PatientSex",
"StudyInstanceUID",
"StudyID",
"AccessionNumber",
"StudyDate",
"StudyTime",
):
if tag in src:
ds[tag] = src[tag]
ds.SOPClassUID = file_meta.MediaStorageSOPClassUID
ds.SOPInstanceUID = file_meta.MediaStorageSOPInstanceUID
ds.SeriesInstanceUID = generate_uid()
ds.Modality = "SR"
ds.SeriesDescription = series_description
ds.SeriesNumber = 2000
ds.InstanceNumber = 1
if "SpecificCharacterSet" in src:
ds.SpecificCharacterSet = src.SpecificCharacterSet
else:
ds.SpecificCharacterSet = "ISO_IR 192"
now = _dt.datetime.now()
ds.ContentDate = now.strftime("%Y%m%d")
ds.ContentTime = now.strftime("%H%M%S")
root = Dataset()
root.ValueType = "CONTAINER"
root.ContinuityOfContent = "SEPARATE"
root.ConceptNameCodeSequence = [Dataset()]
root.ConceptNameCodeSequence[0].CodeValue = "11528-7"
root.ConceptNameCodeSequence[0].CodingSchemeDesignator = "LN"
root.ConceptNameCodeSequence[0].CodeMeaning = "Imaging Report"
desc_item = Dataset()
desc_item.ValueType = "TEXT"
desc_item.ConceptNameCodeSequence = [Dataset()]
desc_item.ConceptNameCodeSequence[0].CodeValue = "121070"
desc_item.ConceptNameCodeSequence[0].CodingSchemeDesignator = "DCM"
desc_item.ConceptNameCodeSequence[0].CodeMeaning = "Findings"
desc_item.TextValue = str(description_text)
concl_item = Dataset()
concl_item.ValueType = "TEXT"
concl_item.ConceptNameCodeSequence = [Dataset()]
concl_item.ConceptNameCodeSequence[0].CodeValue = "121106"
concl_item.ConceptNameCodeSequence[0].CodingSchemeDesignator = "DCM"
concl_item.ConceptNameCodeSequence[0].CodeMeaning = "Summary"
concl_item.TextValue = str(conclusion_text)
content_items = [desc_item, concl_item]
if extra_struct:
def _text_item(code, meaning, value, scheme="DCM"):
item = Dataset()
item.ValueType = "TEXT"
item.ConceptNameCodeSequence = [Dataset()]
item.ConceptNameCodeSequence[0].CodeValue = code
item.ConceptNameCodeSequence[0].CodingSchemeDesignator = scheme
item.ConceptNameCodeSequence[0].CodeMeaning = meaning
item.TextValue = str(value)
return item
if "degree_class" in extra_struct:
content_items.append(_text_item("SCL-DEGREE", "Scoliosis degree class (1-4, 0=none)", extra_struct["degree_class"]))
if "scoliosis_type" in extra_struct:
content_items.append(_text_item("SCL-TYPE", "Scoliosis type", extra_struct["scoliosis_type"]))
if "main_cobb" in extra_struct:
content_items.append(_text_item("SCL-MAIN-COBB", "Main Cobb angle, deg", f"{extra_struct['main_cobb']:.1f}"))
if "prob_scoliosis" in extra_struct:
content_items.append(_text_item("SCL-PROB", "Probability of scoliosis (0-1)", f"{extra_struct['prob_scoliosis']:.2f}"))
root.ContentSequence = content_items
ds.ContentSequence = [root]
ds.CompletionFlag = "COMPLETE"
ds.VerificationFlag = "UNVERIFIED"
os.makedirs(os.path.dirname(out_path), exist_ok=True)
ds.save_as(out_path, write_like_original=False)

88
model_loader.py

@ -0,0 +1,88 @@
import os
import threading
from typing import Optional
import torch
import torch.nn as nn
from torchvision import models
from ultralytics import YOLO
from config import load_config
_LOCK = threading.Lock()
_CACHE = {}
_CFG = load_config()
def _cache_key(kind: str, path: str, device: Optional[str]) -> str:
return f"{kind}|{os.path.abspath(path)}|{device or 'auto'}"
def _load_shufflenet(path: str, device: Optional[str]):
"""Загружает shufflenet‑классификатор из state_dict"""
sd = torch.load(path, map_location=device or "cpu")
num_classes = sd["fc.weight"].shape[0]
candidates = [
models.shufflenet_v2_x0_5(weights=None),
models.shufflenet_v2_x1_0(weights=None),
]
model = None
for m in candidates:
m.fc = nn.Linear(m.fc.in_features, num_classes)
try:
m.load_state_dict(sd, strict=True)
model = m
break
except Exception:
continue
if model is None:
raise RuntimeError(f"Cannot load state_dict from {path} into supported shufflenet variants")
model.eval()
if device:
model.to(device)
return model
def _load_mobilenet_v3_large(path: str, device: Optional[str]):
"""Загружает mobilenet_v3_large из чекпойнта, настраивает число классов, переносит на device, ставит eval()"""
ckpt = torch.load(path, map_location=device or "cpu")
sd = ckpt.get("model_state", ckpt)
num_classes = sd["classifier.3.weight"].shape[0]
model = models.mobilenet_v3_large(weights=None)
model.classifier[3] = nn.Linear(model.classifier[3].in_features, num_classes)
model.load_state_dict(sd, strict=True)
model.eval()
if device:
model.to(device)
return model
def get_detection_model(device: Optional[str] = None, model_path: Optional[str] = None):
"""Получает YOLO‑детектор: берёт путь (из аргумента или конфига), достаёт из кеша или загружает и кэширует"""
path = model_path or _CFG.models.detection
key = _cache_key("detection", path, device)
with _LOCK:
if key not in _CACHE:
model = YOLO(path)
if device:
try:
model.to(device)
except Exception:
if hasattr(model, "model"):
model.model.to(device)
_CACHE[key] = model
return _CACHE[key]
def get_classification_model(device: Optional[str] = None, model_path: Optional[str] = None):
"""Получает классификатор stitched/single (mobilenet): кеш + загрузка при первом вызове"""
path = model_path or _CFG.models.classification
key = _cache_key("classification", path, device)
with _LOCK:
if key not in _CACHE:
_CACHE[key] = _load_mobilenet_v3_large(path, device)
return _CACHE[key]
def get_projection_model(device: Optional[str] = None, model_path: Optional[str] = None):
"""Получает классификатор проекции (shufflenet): кеш + загрузка при первом вызове"""
path = model_path or _CFG.models.projection
key = _cache_key("projection", path, device)
with _LOCK:
if key not in _CACHE:
_CACHE[key] = _load_shufflenet(path, device)
return _CACHE[key]

274
pipeline.py

@ -0,0 +1,274 @@
import os
import sys
import cv2
import numpy as np
from classification import classify_projection, classify_stitched
from config import load_config
from detect_vertebrae import detect_vertebrae
from io_dicom import read_dicom, save_dicom_sr, save_secondary_capture
from model_loader import (
get_classification_model,
get_detection_model,
get_projection_model,
)
from report_builder import build_scoliosis_description
from scoliosis_arcs import (
build_conclusion,
cobb_for_arc,
detect_arcs_from_dx,
merge_arcs,
split_arcs,
scoliosis_type,
)
from scoliosis_geometry import (
arc_centerline,
fit_spine_axis_dx,
fit_spine_axis_dx_endpoints,
fit_spine_axis_dx_local,
smooth_1d,
)
from vis import draw_arc_contours
CLASS_NAMES = {
0: "L1", 1: "L2", 2: "L3", 3: "L4", 4: "L5",
5: "T1", 6: "T2", 7: "T3", 8: "T4", 9: "T5",
10: "T6", 11: "T7", 12: "T8", 13: "T9", 14: "T10",
15: "T11", 16: "T12",
}
CONFIG = load_config()
POSTPROCESSING = CONFIG.postprocessing
INTERPOLATE_MISSING = CONFIG.interpolate_missing
def build_labelmap(class_names):
"""Делает список меток по индексам (0..max),
чтобы по cls_id получить имя позвонка."""
max_id = max(class_names)
return [class_names.get(i, f"Unknown_{i}") for i in range(max_id + 1)]
def vertebrae_from_detect_result(detect_result):
"""Преобразует результат детекции в список позвонков:
центр, коробка, угол, уверенность; сортирует по Y."""
vertebrae = []
for label, box in detect_result.items():
x_center, y_center, width, height, angle, conf = box
rect = (
(float(x_center), float(y_center)),
(float(width), float(height)),
float(angle),
)
pts = cv2.boxPoints(rect).astype(np.float32)
center = np.array([x_center, y_center], dtype=np.float32)
vertebrae.append({
"id": label,
"l": label,
"c": center,
"b": pts,
"ang": float(angle),
"conf": float(conf),
})
vertebrae.sort(key=lambda x: x["c"][1])
return vertebrae
def run_pipeline(img_bgr, vertebrae):
"""Основная логика анализа: строит оси/сигналы, находит дуги, считает Cobb,
делит дуги на структурные/минорные, формирует заключение и итоговые метрики."""
if img_bgr is None:
raise ValueError("img_bgr is None")
h, w = img_bgr.shape[:2]
dx_global = fit_spine_axis_dx(vertebrae)
dx_seg = smooth_1d(dx_global, iters=3, alpha=0.25)
dx_local = fit_spine_axis_dx_local(vertebrae, window=7)
dx_cobb = smooth_1d(dx_local, iters=3, alpha=0.25)
dx_ref = smooth_1d(fit_spine_axis_dx_endpoints(vertebrae), iters=3, alpha=0.25)
arcs = detect_arcs_from_dx(
dx_cobb,
eps_px=CONFIG.detect_eps_px,
min_len=CONFIG.detect_min_len,
zero_gap=CONFIG.detect_zero_gap,
edge_zero_extend=CONFIG.detect_edge_zero_extend,
)
arcs = merge_arcs(arcs, dx_ref, max_gap=CONFIG.merge_max_gap)
global_amp = float(np.max(np.abs(dx_seg)) + 1e-6)
arc_infos = []
for (s, e) in arcs:
info = cobb_for_arc(vertebrae, s, e, dx_cobb, dx_ref)
info["amp_rel"] = float(np.max(np.abs(dx_seg[s:e + 1])) / global_amp)
info["apex_margin"] = int(min(info["apex_idx"] - s, e - info["apex_idx"]))
info["centerline_pts"] = arc_centerline(vertebrae, info, smooth_window=5)
arc_infos.append(info)
n = len(vertebrae)
min_len_struct = CONFIG.min_len_struct_small if n <= 7 else CONFIG.min_len_struct_large
min_apex_margin_struct = CONFIG.min_apex_margin_small if n <= 7 else CONFIG.min_apex_margin_large
structural, minor = split_arcs(
arc_infos,
min_cobb_main=CONFIG.min_cobb_main,
min_cobb_second_abs=CONFIG.min_cobb_second_abs,
min_cobb_second_rel=CONFIG.min_cobb_second_rel,
min_len_struct=min_len_struct,
min_apex_margin_struct=min_apex_margin_struct,
)
conclusion = build_conclusion(arc_infos, structural=structural, minor=minor)
main = max(arc_infos, key=lambda x: x["cobb_deg"]) if arc_infos else None
degree_class = 0
prob_scoliosis = 0.0
if main:
cd = main["cobb_deg"]
if cd >= 50:
degree_class = 4
elif cd >= 26:
degree_class = 3
elif cd >= 11:
degree_class = 2
elif cd >= 1:
degree_class = 1
prob_scoliosis = min(1.0, cd / 50.0)
scol_type = "C-сколиоз"
if structural:
scol_type = scoliosis_type(structural)
else:
scol_type = "нет сколиоза"
return {
"img_bgr": img_bgr,
"h": h, "w": w,
"vertebrae": vertebrae,
"arc_infos": arc_infos,
"structural": structural,
"minor": minor,
"conclusion": conclusion,
"main_arc": main,
"degree_class": degree_class,
"scoliosis_type": scol_type,
"prob_scoliosis": prob_scoliosis,
}
def _split_if_stitched(img_bgr, cls_model):
"""Проверяет, “сшитый” ли снимок.
Если да делит на левую/правую половины,
иначе возвращает исходный."""
pred, conf = classify_stitched(cls_model, img_bgr)
if pred != 1: # 0 = single
return [img_bgr], {"classification": pred, "classification_conf": conf}
h, w = img_bgr.shape[:2]
mid = w // 2
left = img_bgr[:, :mid].copy()
right = img_bgr[:, mid:].copy()
return [left, right], {"classification": pred, "classification_conf": conf}
def _select_frontal(images, proj_model):
"""Из списка изображений выбирает фронтальную проекцию;
если нет берёт с максимальной уверенностью."""
best_img = images[0]
best_meta = {"projection": None, "projection_conf": None}
for img in images:
pred, conf = classify_projection(proj_model, img)
if pred == 1: # 1 = frontal
return img, {"projection": pred, "projection_conf": conf}
if best_meta["projection_conf"] is None or conf > best_meta["projection_conf"]:
best_img, best_meta = img, {"projection": pred, "projection_conf": conf}
return best_img, best_meta
def run_scoliosis_pipeline(infer_dicom_path, model=None, model_path=None, out_dir=None):
"""Полный пайплайн: читает DICOM, выбирает проекцию, детектит позвонки,
запускает run_pipeline, сохраняет SC и SR DICOM, возвращает пути и результат."""
model_path = model_path or CONFIG.models.detection
base_results_dir = out_dir or CONFIG.results_dir
image_stem = os.path.splitext(os.path.basename(infer_dicom_path))[0]
run_dir = os.path.join(base_results_dir, f"{image_stem}_result")
os.makedirs(run_dir, exist_ok=True)
det_model = model or get_detection_model(model_path=model_path)
cls_model = get_classification_model()
proj_model = get_projection_model()
img_bgr_full = read_dicom(infer_dicom_path)
split_imgs, cls_meta = _split_if_stitched(img_bgr_full, cls_model)
img_bgr, proj_meta = _select_frontal(split_imgs, proj_model)
print(f"classification pred={cls_meta['classification']} conf={cls_meta['classification_conf']:.3f}; "
f"projection pred={proj_meta['projection']} conf={proj_meta['projection_conf']:.3f}")
yolo_results = det_model(img_bgr, verbose=False)
labelmap = build_labelmap(CLASS_NAMES)
detect_result = detect_vertebrae(
img_bgr,
yolo_results,
labelmap,
enable_postprocessing=POSTPROCESSING,
interpolate_missing=INTERPOLATE_MISSING,
)
if not detect_result:
return {
"ok": False,
"reason": "no_detections",
}
vertebrae = vertebrae_from_detect_result(detect_result)
result = run_pipeline(img_bgr, vertebrae)
sc_img = draw_arc_contours(img_bgr, vertebrae, result["structural"])
sc_out_path = os.path.join(run_dir, f"sc_{image_stem}.dcm")
overlay_txt = f"prob={result.get('prob_scoliosis', 0.0):.2f} deg={result.get('degree_class', 0)} type={result.get('scoliosis_type', '')}"
save_secondary_capture(
infer_dicom_path,
sc_img,
sc_out_path,
series_description="RG-SCOLIOSIS-CONTOUR",
overlay_text=overlay_txt,
)
sr_desc = build_scoliosis_description(result["arc_infos"])
sr_out_path = os.path.join(run_dir, f"sr_{image_stem}.dcm")
main_cobb = result["main_arc"]["cobb_deg"] if result.get("main_arc") else 0.0
save_dicom_sr(
infer_dicom_path,
sr_desc,
result["conclusion"],
sr_out_path,
series_description="RG-SCOLIOSIS-SR",
extra_struct={
"degree_class": result.get("degree_class", 0),
"scoliosis_type": result.get("scoliosis_type", ""),
"main_cobb": main_cobb,
"prob_scoliosis": result.get("prob_scoliosis", 0.0),
},
)
return {
"ok": True,
"sr_dicom_path": sr_out_path,
"sc_dicom_path": sc_out_path,
"conclusion": result["conclusion"],
"arc_infos": result["arc_infos"],
"out_dir": run_dir,
}
if __name__ == "__main__":
MODEL_PATH = CONFIG.models.detection
INFER_IMG_PATH = os.environ.get("INFER_IMG_PATH", r"C:\Users\Роман Владимирович\Desktop\XR_SCOLIOS\XR_SCOLIOS\1.2.643.5.1.13.13.12.2.77.8252.00090213030609010011090905030205\1.2.643.5.1.13.13.12.2.77.8252.09051310090608150606061508100113\1.2.643.5.1.13.13.12.2.77.8252.03061304011101150513090811030403.dcm")
OUT_DIR = CONFIG.results_dir
if not INFER_IMG_PATH:
sys.exit(0)
model = get_detection_model(model_path=MODEL_PATH)
out = run_scoliosis_pipeline(INFER_IMG_PATH, model=model, out_dir=OUT_DIR)
if not out.get("ok"):
sys.exit(1)

13
report_builder.py

@ -0,0 +1,13 @@
from scoliosis_arcs import chaklin_degree
def build_scoliosis_description(arc_infos):
if not arc_infos:
return "Признаков сколиоза не выявлено."
lines = []
for i, a in enumerate(sorted(arc_infos, key=lambda x: x["cobb_deg"], reverse=True), start=1):
degree = chaklin_degree(a["cobb_deg"])
lines.append(
f"Дуга {i}: {a['direction']}, уровни {a['upper_end']}-{a['lower_end']}, "
f"вершина ~ {a['apex_label']}, Cobb={a['cobb_deg']:.1f}°, степень={degree}."
)
return " ".join(lines)

311
scoliosis_arcs.py

@ -0,0 +1,311 @@
import numpy as np
from scoliosis_geometry import angle_diff_deg
def fill_zero_gaps(sign: np.ndarray, max_gap: int) -> np.ndarray:
if max_gap <= 0:
return sign
sign = sign.copy()
n = len(sign)
i = 0
while i < n:
if sign[i] != 0:
i += 1
continue
j = i
while j < n and sign[j] == 0:
j += 1
left = sign[i - 1] if i > 0 else 0
right = sign[j] if j < n else 0
if (j - i) <= max_gap and left != 0 and left == right:
sign[i:j] = left
i = j
return sign
def runs_of_same_sign(sign: np.ndarray) -> list[tuple[int, int, int]]:
"Находит непрерывные куски одинакового знака (+1 подряд, -1 подряд)"
n = len(sign)
out = []
i = 0
while i < n:
if sign[i] == 0:
i += 1
continue
sgn = int(sign[i])
s = i
while i < n and sign[i] == sgn:
i += 1
e = i - 1
out.append((sgn, s, e))
return out
def extend_through_zeros(sign: np.ndarray, s: int, e: int, max_extend: int) -> tuple[int, int]:
"""Если дуга заканчивается рядом с нулями,
функция слегка "расширяет" дугу в нули,
чтобы включить край (где dx почти 0)"""
n = len(sign)
k = 0
while k < max_extend and s > 0 and sign[s - 1] == 0:
s -= 1
k += 1
k = 0
while k < max_extend and e < n - 1 and sign[e + 1] == 0:
e += 1
k += 1
return s, e
def dx_to_sign(dx: np.ndarray, eps: float) -> np.ndarray:
return np.where(dx > eps, 1, np.where(dx < -eps, -1, 0)).astype(np.int8)
def compute_eps(dx: np.ndarray, eps_px: float, eps_rel: float = 0.01) -> float:
maxabs = float(np.max(np.abs(dx)) + 1e-6)
return max(float(eps_px), float(eps_rel) * maxabs)
def detect_arcs_from_dx(dx, eps_px=2.0, min_len=3, zero_gap=1, edge_zero_extend=2):
"Основная детекция дуг"
dx = np.asarray(dx, dtype=np.float32)
if len(dx) < 2:
return []
eps = compute_eps(dx, eps_px, eps_rel=0.01)
sign = dx_to_sign(dx, eps)
sign = fill_zero_gaps(sign, zero_gap)
arcs = []
for sgn, s, e in runs_of_same_sign(sign):
if (e - s + 1) >= int(min_len):
s, e = extend_through_zeros(sign, s, e, edge_zero_extend)
arcs.append((s, e))
return arcs
def arc_sign(signal: np.ndarray, s: int, e: int) -> int:
"""Определяет знак дуги по сигналу на участке (берёт медиану).
Нужно чтобы знак не ломался из-за шума"""
return 1 if float(np.median(signal[s:e + 1])) > 0 else -1
def merge_arcs(arcs, sign_signal, max_gap=0):
"""Сливает дуги, если:
- знак одинаковый
- и они либо пересекаются, либо рядом (max_gap позволяет "почти рядом")
Это финальная "склейка дуг", чтобы не было дробления"""
if not arcs:
return []
arcs = sorted(arcs, key=lambda t: (t[0], t[1]))
out = [[arcs[0][0], arcs[0][1], arc_sign(sign_signal, arcs[0][0], arcs[0][1])]]
for s, e in arcs[1:]:
sgn = arc_sign(sign_signal, s, e)
ps, pe, psgn = out[-1]
if sgn == psgn and s <= pe + max_gap:
out[-1][1] = max(pe, e)
else:
out.append([s, e, sgn])
return [(s, e) for s, e, _ in out]
def cobb_for_arc(vertebrae, s, e, dx_cobb, dx_ref_sign):
idxs = np.arange(s, e + 1)
apex_i = int(idxs[np.argmax(np.abs(dx_cobb[s:e + 1]))])
upper_candidates = range(s, apex_i + 1)
lower_candidates = range(apex_i, e + 1)
best_cobb = -1.0
best_pair = (s, e)
for ui in upper_candidates:
ta = vertebrae[ui]["ang"]
for li in lower_candidates:
c = float(angle_diff_deg(ta, vertebrae[li]["ang"]))
if c > best_cobb:
best_cobb = c
best_pair = (ui, li)
upper_idx, lower_idx = best_pair
side_sign = arc_sign(dx_ref_sign, s, e)
direction = "левосторонний" if side_sign > 0 else "правосторонний"
return {
"start_idx": int(s), "end_idx": int(e),
"apex_idx": int(apex_i), "apex_label": vertebrae[apex_i]["l"],
"direction": direction,
"side_sign": int(side_sign),
"cobb_deg": float(best_cobb),
"upper_end": vertebrae[upper_idx]["l"],
"lower_end": vertebrae[lower_idx]["l"],
"upper_idx": int(upper_idx),
"lower_idx": int(lower_idx),
}
def arc_len(a):
"Длина дуги в позвонках: end-start+1"
return a["end_idx"] - a["start_idx"] + 1
def is_force(a, force_cobb, force_len):
""""Форс-правило": если дуга очень большая по Cobb и не слишком короткая
- считаем её структурной, даже если остальные критерии спорные."""
return (a["cobb_deg"] >= force_cobb) and (arc_len(a) >= force_len)
def dedupe_by_range(arcs):
"Функция удаляет дубликаты по (start_idx, end_idx)"
seen = set()
out = []
for a in arcs:
key = (a["start_idx"], a["end_idx"])
if key not in seen:
out.append(a)
seen.add(key)
return out
def split_arcs(
arc_infos,
# 1) Главная дуга всегда structural, если вообще есть "сколиоз"
min_cobb_main=1.0,
# 2) Вторая (для S-типа) должна быть не "шумом"
min_cobb_second_abs=5.0,
min_cobb_second_rel=0.35,
min_len_struct=4,
min_len_minor=3,
min_apex_margin_struct=2,
min_amp_rel_struct=0.25,
min_cobb_minor=3.0,
force_struct_cobb=40.0,
force_struct_min_len=3
):
"""
Разделяет все дуги на:
- structural (важные, формируют тип C/S)
- minor (дополнительные)
"""
if not arc_infos:
return [], []
main = max(arc_infos, key=lambda a: a["cobb_deg"])
def passes_geom(a):
amp_rel = a.get("amp_rel")
if amp_rel is None:
return False
apex_margin = int(a.get("apex_margin", 999))
return (
(arc_len(a) >= min_len_struct) and
(apex_margin >= min_apex_margin_struct) and
(amp_rel >= min_amp_rel_struct)
)
structural, minor = [], []
main_is_force = (main["cobb_deg"] >= force_struct_cobb) and (arc_len(main) >= force_struct_min_len)
if main_is_force or (main["cobb_deg"] >= min_cobb_main):
structural.append(main)
second_thr = max(min_cobb_second_abs, min_cobb_second_rel * float(main["cobb_deg"]))
main_sign = int(main.get("side_sign", 0))
for a in arc_infos:
if a is main:
continue
length = arc_len(a)
a_sign = int(a.get("side_sign", 0))
if is_force(a, force_struct_cobb, force_struct_min_len):
structural.append(a)
continue
is_opposite = (main_sign != 0) and (a_sign != 0) and (a_sign != main_sign)
if is_opposite and (a["cobb_deg"] >= second_thr) and passes_geom(a):
structural.append(a)
else:
if length >= min_len_minor and a["cobb_deg"] >= min_cobb_minor:
minor.append(a)
return dedupe_by_range(structural), dedupe_by_range(minor)
def chaklin_degree(cobb_deg):
"Переводит Cobb в степень по Чаклину (0/I/II/III/IV)"
a = float(cobb_deg)
if a < 1.0:
return "0 ст. (практически нет)"
if a <= 10.:
return "I ст."
if a <= 25.:
return "II ст."
if a <= 50.:
return "III ст."
return "IV ст."
def scoliosis_type(structural_arcs):
if len(structural_arcs) <= 1:
return "C-сколиоз"
signs = {int(a["side_sign"]) for a in structural_arcs if a.get("side_sign") is not None}
if len(structural_arcs) >= 3 and len(signs) >= 2:
return "Z-сколиоз"
if len(signs) >= 2:
return "S-сколиоз"
return "C-сколиоз"
def build_conclusion(arc_infos, structural=None, minor=None):
"""
Собирает текст заключения:
- выбирает главную дугу
- пишет тип, направление, Cobb, степень
- выводит список структурных дуг
- выводит список дополнительных дуг (minor)
"""
if not arc_infos:
return "Угловая деформация оси позвоночника не выявлена. Рентгенологических признаков сколиоза нет."
if structural is None and minor is None:
structural, minor = split_arcs(arc_infos)
else:
structural = structural or []
minor = minor or []
def fmt_arc(i, a, with_len=False):
arc_degree = chaklin_degree(a["cobb_deg"])
s = (f" Дуга {i}: {a['direction']}, уровни {a['upper_end']}-{a['lower_end']}, "
f"вершина ~ {a['apex_label']}, Cobb={a['cobb_deg']:.1f}°, степень={arc_degree}")
if with_len:
length = arc_len(a)
s += f", длина={length} позв."
return s + "."
main = max(arc_infos, key=lambda x: x["cobb_deg"])
scol_type = scoliosis_type(structural) if structural else "не определён (нет структурных дуг)"
degree = chaklin_degree(main["cobb_deg"])
lines = [
f"1. Тип сколиоза - {scol_type}.",
f"2. Направление основной дуги - {main['direction']} (по изображению).",
f"3. Угол деформации (Cobb) основной дуги - {main['cobb_deg']:.1f}°.",
f"4. Степень сколиоза по углу деформации - {degree}.",
"",
"Структурные дуги (учитываются для типа):"
]
if structural:
for i, a in enumerate(sorted(structural, key=lambda a: a["cobb_deg"], reverse=True), start=1):
lines.append(fmt_arc(i, a))
else:
lines.append(" Нет (не прошли критерии структурности).")
if minor:
lines += ["", "Дополнительные дуги (не учитываются для типа):"]
for i, a in enumerate(sorted(minor, key=lambda a: a["cobb_deg"], reverse=True), start=1):
lines.append(fmt_arc(i, a, with_len=True))
return "\n".join(lines)

130
scoliosis_geometry.py

@ -0,0 +1,130 @@
import numpy as np
def edge_angle_deg(p1, p2):
"Находит угол наклона отрезка p1→p2 в градусах (от -180 до +180)."
dx = float(p2[0] - p1[0])
dy = float(p2[1] - p1[1])
return np.degrees(np.arctan2(dy, dx))
def normalize_angle_minus90_90(a):
"Приводит угол к диапазону [-90, 90)."
return (a + 90.0) % 180.0 - 90.0
def angle_diff_deg(a, b):
"Возвращает насколько отличаются две линии по углу, но только 'острый' вариант (0..90)"
d = abs(a - b) % 180.0
if d > 90.0:
d = 180.0 - d
return d
def order_points_ccw(pts):
"Приводит 4 точки OBB к стабильному порядку по кругу"
pts = np.asarray(pts, dtype=np.float32)
c = pts.mean(axis=0)
ang = np.arctan2(pts[:, 1] - c[1], pts[:, 0] - c[0])
return pts[np.argsort(ang)]
def vertebra_endplates_angles(obb_pts):
"Нахождит углов верхней и нижней замыкательных пластинок"
p = order_points_ccw(obb_pts)
edges = []
for i in range(4):
a = p[i]
b = p[(i + 1) % 4]
ang = normalize_angle_minus90_90(edge_angle_deg(a, b))
mean_y = float((a[1] + b[1]) / 2.0)
score = abs(ang)
edges.append((score, mean_y, float(ang)))
edges.sort(key=lambda t: t[0])
e1, e2 = edges[0], edges[1]
top, bottom = (e1, e2) if e1[1] <= e2[1] else (e2, e1)
return top[2], bottom[2]
def fit_line_x_of_y(y, x, eps=1e-6):
"Строит прямую вида x = a*y + b по точкам"
if np.std(y) < eps:
return 0.0, float(np.mean(x))
a, b = np.polyfit(y, x, 1)
return float(a), float(b)
def fit_spine_axis_dx(vertebrae):
"""Строит глобальную ось позвоночника (одна прямая по всем центрам).
Возвращает dx: для каждого позвонка разницу показывая,
насколько позвонок левее/правее глобальной оси"""
centers = np.array([v["c"] for v in vertebrae], dtype=np.float32)
xs, ys = centers[:, 0], centers[:, 1]
a, b = fit_line_x_of_y(ys, xs)
return xs - (a * ys + b)
def fit_spine_axis_dx_local(vertebrae, window=7):
"Вычисляет локальные изгибы относительно соседей"
centers = np.array([v["c"] for v in vertebrae], dtype=np.float32)
xs, ys = centers[:, 0], centers[:, 1]
n = len(xs)
dx = np.zeros(n, dtype=np.float32)
half = window // 2
for i in range(n):
s = max(0, i - half)
e = min(n, i + half + 1)
a, b = fit_line_x_of_y(ys[s:e], xs[s:e])
dx[i] = xs[i] - (a * ys[i] + b)
return dx
def smooth_1d(x, iters=3, alpha=0.25):
"Простое сглаживание 1D-сигнала (dx)"
x = x.astype(np.float32).copy()
for _ in range(int(iters)):
x[1:-1] = x[1:-1] + alpha * (x[:-2] - 2 * x[1:-1] + x[2:])
return x
def fit_spine_axis_dx_endpoints(vertebrae):
"Строит ось как линию через первый и последний позвонок (а не через регрессию)"
centers = np.array([v["c"] for v in vertebrae], dtype=np.float32)
xs, ys = centers[:, 0], centers[:, 1]
x0, y0 = float(xs[0]), float(ys[0])
x1, y1 = float(xs[-1]), float(ys[-1])
if abs(y1 - y0) < 1e-6:
return xs - float(np.mean(xs))
t = (ys - y0) / (y1 - y0)
x_line = x0 + t * (x1 - x0)
return xs - x_line
def _smooth_chain(points, window=5):
"""сглаживает цепочку точек скользящим средним по окну window.
Делает окно нечётным, берёт среднее по соседям и возвращает сглаженные точки"""
pts = np.asarray(points, dtype=np.float32)
if len(pts) < 3 or window <= 2:
return pts
window = int(window)
if window % 2 == 0:
window += 1
half = window // 2
out = pts.copy()
for i in range(half, len(pts) - half):
out[i] = np.mean(pts[i - half:i + half + 1], axis=0)
return out
def arc_centerline(vertebrae, info, smooth_window=5):
"""Строит центральную линию дуги"""
s = info.get("upper_idx", info["start_idx"])
e = info.get("lower_idx", info["end_idx"])
if s > e:
s, e = e, s
centers = [vertebrae[k]["c"] for k in range(s, e + 1)]
if not centers:
return None
centers = np.array(centers, dtype=np.float32)
centers = centers[np.argsort(centers[:, 1])]
if smooth_window and smooth_window > 2:
centers = _smooth_chain(centers, window=smooth_window)
return centers

19
vis.py

@ -0,0 +1,19 @@
import cv2
import numpy as np
def draw_arc_contours(
img_bgr: np.ndarray,
vertebrae,
arc_infos,
colors=((255, 0, 255), (0, 255, 255), (255, 255, 0)),
):
"""Визуализация контура дуги деформации"""
vis = img_bgr.copy()
for i, info in enumerate(arc_infos):
pts = info.get("centerline_pts")
if pts is None:
continue
pts_i = np.round(pts).astype(np.int32).reshape(-1, 1, 2)
color = colors[i % len(colors)]
cv2.polylines(vis, [pts_i], isClosed=False, color=color, thickness=3)
return vis
Loading…
Cancel
Save