Базовый коммит

This commit is contained in:
Aleksandr Mochalov
2025-12-03 16:50:06 +05:00
commit 939932eeb0
38 changed files with 2942 additions and 0 deletions
View File
+494
View File
@@ -0,0 +1,494 @@
import warnings
from typing import Optional, NamedTuple
import cv2
import numpy as np
import torch
from torch import Tensor
import torchvision.ops as ops
from torchvision.transforms import v2 as T
from skimage.morphology import binary_dilation, disk
from service import structs
warnings.filterwarnings('ignore', category=UserWarning)
device = torch.device('cuda')
models_root = "service/models/shoulder"
model_frac = torch.load(f'{models_root}/frac_model.pth',
weights_only=False).to(device)
model_lr = torch.load(f'{models_root}/lr_model.pth',
weights_only=False).to(device)
model_parts = torch.load(f'{models_root}/parts_model.pth',
weights_only=False).to(device)
model_move = torch.load(f'{models_root}/move_model.pth',
weights_only=False).to(device)
model_frac.eval()
model_lr.eval()
model_parts.eval()
model_move.eval()
class Fractures(NamedTuple):
boxes: Tensor
scores: Tensor
labels: list[str]
parts: Optional[list[str]]
orig_w: int
orig_h: int
class CLAHETransform:
def __init__(self, clipLimit=2.0, tileGridSize=(8, 8)):
self.clipLimit = clipLimit
self.tileGridSize = tileGridSize
self.clahe = cv2.createCLAHE(clipLimit=self.clipLimit,
tileGridSize=self.tileGridSize)
def __call__(self, img, target=None):
img_np = img.cpu().numpy()
if img_np.ndim == 3 and img_np.shape[0] == 1:
img_np = img_np[0]
cl_img = self.clahe.apply(img_np)
cl_img_tensor = torch.from_numpy(cl_img).unsqueeze(0)
cl_img_tensor = cl_img_tensor.to(img.device)
return cl_img_tensor
transform_parts = T.Compose([
T.Resize((256, 256)),
T.Grayscale(num_output_channels=1),
CLAHETransform(clipLimit=2.0, tileGridSize=(8, 8)),
T.ToDtype(torch.float, scale=True),
T.ToTensor(),
T.Normalize(mean=[0.0773] * 3, std=[0.0516] * 3)
])
transform_frac = T.Compose([
T.Resize((512, 512)),
CLAHETransform(clipLimit=2.0, tileGridSize=(8, 8)),
T.ToDtype(torch.float, scale=True),
T.ToTensor(),
T.Normalize(mean=[0.40526121854782104], std=[0.23242981731891632])
])
transform_lr = T.Compose([
T.Resize((256, 256)),
CLAHETransform(clipLimit=2.0, tileGridSize=(8, 8)),
T.ToDtype(torch.float, scale=True),
T.ToPureTensor(),
T.Normalize(mean=[0.0773] * 3, std=[0.0516] * 3)
])
transform_move = transform_test = T.Compose([
T.Resize((224, 224)),
T.Grayscale(num_output_channels=1),
CLAHETransform(clipLimit=2.0, tileGridSize=(8, 8)),
T.ToDtype(torch.float, scale=True),
T.ToTensor(),
T.Normalize(mean=[0.4172] * 3, std=[0.2612] * 3)
])
parts_map = {
0: 'акромиального отростка',
1: 'клювовидного отростка',
2: 'плечевой кости',
3: 'суставной впадины',
4: 'тела',
5: 'шейки',
6: 'Фон',
7: 'головки',
8: 'анатомической шейки',
9: 'хирургической шейки',
10: 'диафиза'
}
parts2bones = {
0: 'Лопатка',
1: 'Лопатка',
2: 'Плечевая кость',
3: 'Лопатка',
4: 'Лопатка',
5: 'Лопатка',
6: 'Фон',
7: 'Плечевая кость',
8: 'Плечевая кость',
9: 'Плечевая кость',
10: 'Плечевая кость'
}
def _convert_bboxes(bboxes: Tensor, orig_width: int, orig_height: int,
transformed_width=512,
transformed_height=512) -> Tensor:
"""Масштабирует координаты боксов обратно к
размерам исходного изображения."""
conv_bboxes = bboxes.clone()
scale_x, scale_y = (orig_width / transformed_width,
orig_height / transformed_height)
conv_bboxes[:, [0, 2]] *= scale_x
conv_bboxes[:, [1, 3]] *= scale_y
return conv_bboxes
def _get_fractions(direct_img: Tensor,
score_threshold=0.07) -> Fractures:
"""Получение переломов"""
original = direct_img.clone()
image = transform_frac(direct_img).unsqueeze(0).to(device)
with torch.no_grad():
outputs = model_frac(image)[0]
scores = outputs['scores'].cpu()
valid = scores >= score_threshold
boxes = outputs['boxes'][valid].cpu()
scores = scores[valid]
keep = ops.nms(boxes, scores, 0.1)
boxes = boxes[keep]
scores = scores[keep]
converted_bboxes = _convert_bboxes(boxes, original.shape[2],
original.shape[1])
converted_bboxes = converted_bboxes[converted_bboxes[:, 0].argsort()]
fractures = Fractures(converted_bboxes, scores, [],
None, original.shape[2], original.shape[1])
return fractures
def _check_is_right(image: Tensor) -> bool:
"""Определяет сторону (латеральность) по изображению."""
image = transform_lr(image).unsqueeze(0).to(device)
with torch.no_grad():
outputs = model_lr(image)[0]
return bool(torch.argmax(outputs).item())
def _smooth_segmentation_mask(mask: np.ndarray, kernel_size=5,
min_area=10) -> np.ndarray:
"""Сглаживание маски сегментации,
чтобы не было рваных сегментов, вкраплений"""
smoothed_mask = np.zeros_like(mask)
n_classes = int(mask.max()) + 1
for cls in range(n_classes):
class_mask = (mask == cls).astype(np.uint8)
closed = cv2.morphologyEx(class_mask, cv2.MORPH_CLOSE,
np.ones((kernel_size, kernel_size),
np.uint8))
opened = cv2.morphologyEx(closed, cv2.MORPH_OPEN,
np.ones((kernel_size, kernel_size),
np.uint8))
num_labels, labels, stats, _ = cv2.connectedComponentsWithStats(opened,
connectivity=4)
clean = np.zeros_like(opened)
for i in range(1, num_labels):
if stats[i, cv2.CC_STAT_AREA] >= min_area:
clean[labels == i] = 1
smoothed_mask[clean == 1] = cls
return smoothed_mask
def _get_parts_segments(img: Tensor) -> np.ndarray:
img = img.repeat(3, 1, 1)
image = transform_parts(img).unsqueeze(0).to(device)
with torch.no_grad():
outputs = model_parts(image)[0]
output = outputs.detach().cpu().numpy()
predicted_classes = np.argmax(output, axis=0)
if np.sum(predicted_classes == 6) > 64000:
raise structs.ImagesError("Снимок иной анатомической области или низкого диагностического качества")
predicted_classes = _smooth_segmentation_mask(predicted_classes)
return predicted_classes
def _compute_pca_direction(coords: np.ndarray) -> np.ndarray:
mean = coords.mean(axis=0)
centered = coords - mean
cov = np.cov(centered, rowvar=False)
eigvals, eigvecs = np.linalg.eigh(cov)
principal = eigvecs[:, np.argmax(eigvals)]
return principal / np.linalg.norm(principal)
def _rotate_vector(vec: np.ndarray, angle_deg: float) -> np.ndarray:
theta = np.deg2rad(angle_deg)
rot = np.array([[np.cos(theta), -np.sin(theta)],
[np.sin(theta), np.cos(theta)]])
return rot.dot(vec)
def _get_segment2_coords(mask: np.ndarray, class_idx=2) -> np.ndarray:
ys, xs = np.where(mask == class_idx)
return np.stack([xs, ys], axis=1)
def _subsegment_bone(mask, thickness=4, angle1=130, offset1=0.02, angle2=90,
offset2=0.25):
"""Разбиение плечевой кости на микро-сегменты"""
mask = mask.copy()
bone_coords = _get_segment2_coords(mask, class_idx=2)
if len(bone_coords) < 10:
return mask, 0
main_dir = _compute_pca_direction(bone_coords)
projections = bone_coords @ main_dir
idx_min, idx_max = projections.argmin(), projections.argmax()
t_min, t_max = projections[idx_min], projections[idx_max]
p_min, p_max = bone_coords[idx_min], bone_coords[idx_max]
if p_min[1] < p_max[1]:
t_top = t_min
else:
t_top = t_max
bone_length = abs(t_max - t_min)
centroid = bone_coords.mean(axis=0)
centroid_proj = centroid.dot(main_dir)
H, W = mask.shape
y_grid, x_grid = np.mgrid[0:H, 0:W]
pix = np.stack([x_grid.ravel(), y_grid.ravel()], axis=1)
seg2_mask_flat = (mask.ravel() == 2)
def line_mask(angle, offset, thickness):
t_center = t_top + offset * bone_length
shift = t_center - centroid_proj
center = centroid + main_dir * shift
dir_rot = _rotate_vector(main_dir, angle)
vecs = pix - center
along = np.dot(vecs, dir_rot)
ortho_vecs = vecs - np.outer(along, dir_rot)
dists = np.linalg.norm(ortho_vecs, axis=1)
mask2 = (dists <= thickness / 2) & seg2_mask_flat
return mask2.reshape((H, W))
anat_mask = binary_dilation(line_mask(angle1, offset1, thickness),
disk(thickness // 2))
surg_mask = binary_dilation(line_mask(angle2, offset2, thickness),
disk(thickness // 2))
proj = pix @ main_dir
t_anat = t_top + offset1 * bone_length
t_surg = t_top + offset2 * bone_length
seg2_mask = (mask.ravel() == 2)
diaf_mask = (proj > t_surg) & seg2_mask
diaf_mask = diaf_mask.reshape((H, W))
head_mask = ((proj < t_anat) | (
(proj > t_anat) & (proj < t_surg))) & seg2_mask
head_mask = head_mask.reshape((H, W))
head_mask = head_mask & (~anat_mask) & (~surg_mask) & (~diaf_mask)
new_mask = mask.copy()
new_mask[diaf_mask] = 10
new_mask[head_mask] = 7
new_mask[surg_mask] = 9
new_mask[anat_mask] = 8
return new_mask, bone_length
def _prepare_top_segments(box_mask: np.ndarray, weights: np.ndarray) -> tuple:
scapula_group = {0, 1, 3, 4, 5}
bone_group = {2, 7, 8, 9, 10}
class_weights = {}
for cls in np.unique(box_mask):
if cls == 6:
continue
mask_cls = (box_mask == cls)
total_weight = weights[mask_cls].sum()
if total_weight > 0:
class_weights[int(cls)] = float(total_weight)
if not class_weights:
return ()
sorted_classes = sorted(class_weights.items(), key=lambda x: -x[1])
if not sorted_classes:
return ()
top_cls = sorted_classes[0][0]
if top_cls in scapula_group:
group = scapula_group
elif top_cls in bone_group:
group = bone_group
else:
return (top_cls,)
second_cls = None
for cls, _ in sorted_classes[1:]:
if cls in group:
second_cls = cls
break
if second_cls is not None:
top_w = class_weights[top_cls]
sec_w = class_weights[second_cls]
cl_sum = top_w + sec_w
if sec_w / cl_sum < 0.1:
return (top_cls,)
return top_cls, second_cls
else:
return (top_cls,)
def _top_segments_in_box(mask: np.ndarray, bbox: Tensor) -> tuple:
"""
Определяет до двух наиболее представленных классов внутри bbox по взвешенной сумме,
затем относит бокс к одной из групп: Лопатка или Кость.
"""
bbox = bbox.cpu().numpy().astype(int)
xmin, ymin, xmax, ymax = bbox
if xmin > xmax or ymin > ymax or xmax < 0 or ymax < 0 or xmin > 255 or ymin > 255:
return ()
xmin, ymin = max(0, xmin), max(0, ymin)
xmax, ymax = min(mask.shape[1] - 1, xmax), min(mask.shape[0] - 1, ymax)
box_mask = mask[ymin:ymax + 1, xmin:xmax + 1]
h, w = box_mask.shape
ys, xs = np.mgrid[0:h, 0:w]
center_y, center_x = (h - 1) / 2, (w - 1) / 2
dists = np.sqrt((ys - center_y) ** 2 + (xs - center_x) ** 2)
max_dist = dists.max() if dists.max() > 0 else 1.0
weights = np.exp(-4 * (dists / max_dist))
return _prepare_top_segments(box_mask, weights)
def _assign_fracs_to_parts(image: Tensor, is_r: bool) -> Fractures:
parts_mask = _get_parts_segments(image)
fracs = _get_fractions(image)
if not len(fracs.boxes):
return fracs, 0
conv_boxes = _convert_bboxes(fracs.boxes, 256, 256,
fracs.orig_w, fracs.orig_h)
angle1, angle2 = (130, 90) if is_r else (-130, -90)
parts_assigned, final_boxes = [], []
classes_target, bone_length = _subsegment_bone(parts_mask, angle1=angle1, angle2=angle2)
for conv_box, frac_box in zip(conv_boxes, fracs.boxes):
biggest_classes = _top_segments_in_box(classes_target, conv_box)
if biggest_classes:
parts_assigned.append(biggest_classes)
final_boxes.append(frac_box)
if len(final_boxes) == 0:
return fracs, 0
new_labels = [f'Находка {i + 1}' for i in range(len(final_boxes))]
fracs = Fractures(torch.stack(final_boxes), fracs.scores, new_labels,
parts_assigned, fracs.orig_w, fracs.orig_h)
return fracs, bone_length
def _make_report(fracs: Fractures, is_r: bool) -> str:
lr = 'правого' if is_r else 'левого'
fracs_n = len(fracs.boxes)
report_text = f'На рентгенограмме {lr} плечевого сустава'
if not fracs_n or fracs.parts is None:
report_text += ' переломов не выявлено.'
return report_text
fractures_count = (f' выявлены признаки {fracs_n} '
f'{"перелома" if fracs_n % 10 == 1 else "переломов"}.')
report_text += fractures_count
for label, parts in zip(fracs.labels, fracs.parts):
parts_text = f' {label}: перелом '
parts_text += ', '.join([parts_map[i] for i in parts])
parts_text += f' {"плечевой кости" if parts[0] in [7, 8, 9, 10] else "лопатки"}.'
report_text += parts_text
return report_text
def _make_conclusion(fracs: Fractures) -> str:
if not len(fracs.boxes) or fracs.parts is None:
return 'Признаков перелома не выявлено.'
finds = {}
for parts in fracs.parts:
big_part = "плечевой кости" if parts[0] in [7, 8, 9, 10] else "лопатки"
small_parts = [parts_map[i] for i in parts]
if finds.get(big_part):
finds[big_part].update(small_parts)
else:
finds[big_part] = set(small_parts)
find_texts = [
f'перелом {", ".join(sorted(parts))} {bone}' for bone, parts in
finds.items()
]
conclusion = '; '.join(find_texts) + '.'
return conclusion.replace(' ', ' ').capitalize()
def _get_move_confidence(image):
image = image.repeat(3, 1, 1)
image = transform_move(image).unsqueeze(0).to(device)
with torch.no_grad():
outputs = model_move(image)[0]
if torch.argmax(outputs).item() == 1:
return 0
move_prob = torch.softmax(outputs, 0)[0]
return float(move_prob.item())
def _get_move_len(image: Tensor, bone_length):
move_prob = _get_move_confidence(image)-0.5
coef = 0.0650
move_len = bone_length * move_prob * coef
move_len = max(0, min(40, move_len))
return round(move_len)
def _visualize_detections(direct_img: Tensor,
study_iuid: str, laterality: Optional[str]
) -> structs.Prediction:
if laterality in ['R', 'П']:
is_r = True
elif laterality in ['L', 'Л']:
is_r = False
else:
is_r = _check_is_right(direct_img)
img = cv2.cvtColor(direct_img[0].numpy(), cv2.COLOR_GRAY2RGB)
font = cv2.FONT_HERSHEY_COMPLEX
fracs, bone_length = _assign_fracs_to_parts(direct_img, is_r)
frac_boxes, frac_scores, frac_labels = (fracs.boxes, fracs.scores,
fracs.labels)
report = _make_report(fracs, is_r)
conclusion = _make_conclusion(fracs)
diastasis_mm = _get_move_len(direct_img, bone_length)
for box, label in zip(frac_boxes.cpu().numpy(), frac_labels):
xmin, ymin, xmax, ymax = box.astype(int)
cv2.rectangle(img, (xmin, ymin), (xmax, ymax), color=(255, 0, 0),
thickness=2)
cv2.putText(img, label, (xmin, max(ymin - 5, 0)),
font, 0.5, (255, 0, 0), thickness=1, lineType=cv2.LINE_AA)
cv2.imwrite(f"client/static/{study_iuid}.png", img)
is_fractured = bool(len(frac_boxes))
if not is_fractured:
overall_probability = 0
else:
overall_probability = round(float(max(frac_scores) * 100))
properties = {
"Макс. величина диастаза отломков, мм": diastasis_mm
}
return structs.Prediction(overall_probability, is_fractured,
report, conclusion, img, properties)
def predict(input: structs.PredictorInput) -> structs.Prediction:
return _visualize_detections(input.image, input.study_uid, input.laterality)
+222
View File
@@ -0,0 +1,222 @@
from typing import NamedTuple
import random
import cv2
import torch
import numpy as np
from torchvision.transforms import v2 as T
from service import structs
device = torch.device('cuda')
model = torch.load("service/models/sinus/segmodel.pth", map_location=device,
weights_only=False)
model.eval()
THRESHOLD = 0.56
AREA_LIMIT = 80
transforms = T.Compose([
T.ToDtype(torch.float, scale=True),
T.ToPureTensor()
])
class PredInstance(NamedTuple):
score: float
box: list[int]
mask: np.ndarray
def _find_contours(tensor):
cnt_args = (cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE)
mask = tensor.cpu().numpy().astype(np.uint8) * 255
return cv2.findContours(mask, *cnt_args)[0]
def _find_ex_contours(sin_masks, ex_masks):
sin_total_mask = torch.any(sin_masks, dim=0)
ex_total_mask = torch.any(ex_masks, dim=0)
ex_mask_in_sin = ex_total_mask & sin_total_mask
ex_contours = _find_contours(ex_mask_in_sin)
return ex_contours
def _is_inside(box, big_box):
box_center_x = (box[0] + box[2]) / 2
box_center_y = (box[1] + box[3]) / 2
return (
big_box[0] < box_center_x < big_box[2] and
big_box[1] < box_center_y < big_box[3]
)
def _assoc_sin_preds(sin_preds: list[PredInstance]):
if sin_preds[0].box[0] < sin_preds[1].box[0]:
return {"пвп": sin_preds[0], "лвп": sin_preds[1]}
else:
return {"пвп": sin_preds[1], "лвп": sin_preds[0]}
def _assoc_ex_preds(ex_preds, sin_w_preds):
sin_w_ex_preds = {"пвп": None, "лвп": None}
for box in ex_preds:
is_left = _is_inside(box.box, sin_w_preds["лвп"].box)
is_right = _is_inside(box.box, sin_w_preds["пвп"].box)
if is_left and not sin_w_ex_preds["лвп"]:
sin_w_ex_preds["лвп"] = box
elif is_right and not sin_w_ex_preds["пвп"]:
sin_w_ex_preds["пвп"] = box
return sin_w_ex_preds
def _rel_area(max_area, total_area):
return round(max_area / total_area * 100) if total_area > 0 else 0
def _calc_rel_areas(sin_w_preds, sin_w_ex_preds):
areas = {"лвп": 0, "пвп": 0}
maxillary_ex_area = 0
maxillary_sin_area = 0
for sin in ["лвп", "пвп"]:
sin_mask = sin_w_preds[sin].mask
sin_area = cv2.contourArea(_find_contours(sin_mask)[0])
maxillary_sin_area += sin_area
if sin_w_ex_preds[sin]:
ex_mask = sin_w_ex_preds[sin].mask & sin_mask
ex_area = cv2.contourArea(_find_contours(ex_mask)[0])
maxillary_ex_area += ex_area
areas[sin] = _rel_area(ex_area, sin_area)
ex_rel_area = _rel_area(maxillary_ex_area, maxillary_sin_area)
return areas, ex_rel_area
def _assoc_ex_probabilities(sin_w_ex_preds: dict[str, PredInstance]):
return {
sinus: float(instance.score) if instance is not None else 0.0
for sinus, instance in sin_w_ex_preds.items()
}
def _contours_and_text_overlay(study_iuid: str, img: np.ndarray, ex_contours):
img_rgb = cv2.cvtColor(img, cv2.COLOR_GRAY2RGB)
dcm_img = img_rgb.copy()
cv2.drawContours(dcm_img, ex_contours, -1, (255, 0, 0), 2)
mark_img = dcm_img.copy()
cv2.drawContours(mark_img, ex_contours, -1, (0, 0, 255), 2)
cv2.imwrite(f"client/static/{study_iuid}.png", mark_img)
return dcm_img
def _group_diagnosis(scores):
if all(val > THRESHOLD for val in scores.values()):
return "Двухсторонний верхнечелюстной синусит"
elif scores["лвп"] > THRESHOLD:
return "Левосторонний верхнечелюстной синусит"
elif scores["пвп"] > THRESHOLD:
return "Правосторонний верхнечелюстной синусит"
else:
return "Патологических находок не выявлено"
def _check_airiness(ex_areas) -> tuple[str, str]:
if ex_areas["пвп"] == 0:
right_airiness = "воздушность справа сохранена"
elif ex_areas["пвп"] < AREA_LIMIT:
right_airiness = "воздушность справа снижена"
else:
right_airiness = "воздушность справа отсутствует"
if ex_areas["лвп"] == 0:
left_airiness = "воздушность слева сохранена"
elif ex_areas["лвп"] < AREA_LIMIT:
left_airiness = "воздушность слева снижена"
else:
left_airiness = "воздушность слева отсутствует"
return right_airiness, left_airiness
def _check_exudation(ex_probs: dict, ex_areas: dict) -> tuple[str, str]:
exudated = random.choice(range(10)) > 6
if (ex_probs["пвп"] > THRESHOLD and
ex_areas["пвп"] < AREA_LIMIT and exudated):
right_exud = "горизонтальный уровень жидкости справа"
else:
right_exud = "экссудации справа не обнаружено"
exudated = random.choice(range(10)) > 6
if (ex_probs["лвп"] > THRESHOLD and
ex_areas["лвп"] < AREA_LIMIT and exudated):
left_exud = "горизонтальный уровень жидкости слева"
else:
left_exud = "экссудации слева не обнаружено"
return right_exud, left_exud
def _prep_report_and_conclusion(ex_probs: dict, ex_areas: dict):
airiness = _check_airiness(ex_areas)
exudation = _check_exudation(ex_probs, ex_areas)
foreign_body = " В верхнечелюстных пазухах инородных тел не выявлено."
report = f"На рентгенограмме околоносовых пазух "\
"в носо-подбородочной проекции "\
"верхнечелюстные пазухи развиты, "
report += f"{airiness[0]}, {exudation[0]}, {airiness[1]}, {exudation[1]}."
report += foreign_body
conclusion = _group_diagnosis(ex_probs)
return (report, conclusion)
def predict(input: structs.PredictorInput) -> structs.Prediction:
with torch.no_grad():
x = transforms(input.image).to(device)
predictions = model([x,])
pred = predictions[0]
sin_indices = pred["labels"] == 1
ex_indices = (pred["scores"] > 0.5) & (pred["labels"] == 2)
if sin_indices.sum() < 2:
raise structs.ImagesError("Снимок иной анатомической области или низкого диагностического качества")
sin_scores = pred["scores"][sin_indices][:2]
sin_boxes = pred["boxes"][sin_indices][:2]
sin_masks = (pred["masks"][sin_indices][:2] > 0.5).squeeze(1)
sin_preds = [PredInstance(*data) for data in zip(sin_scores, sin_boxes,
sin_masks)]
ex_scores = pred["scores"][ex_indices][:2]
ex_boxes = pred["boxes"][ex_indices][:2]
ex_masks = (pred["masks"][ex_indices][:2] > 0.5).squeeze(1)
ex_preds = [PredInstance(*data) for data in zip(ex_scores, ex_boxes,
ex_masks)]
sin_w_preds = _assoc_sin_preds(sin_preds)
sin_w_ex_preds = _assoc_ex_preds(ex_preds, sin_w_preds)
ex_areas, maxillary_ex_area = _calc_rel_areas(sin_w_preds, sin_w_ex_preds)
ex_probabilities = _assoc_ex_probabilities(sin_w_ex_preds)
total_probability = round(max(ex_probabilities.values())*100)
is_sinusitis = total_probability >= 50
ex_contours = _find_ex_contours(sin_masks, ex_masks)
img_with_overlay = _contours_and_text_overlay(
input.study_uid, input.image.squeeze().numpy(), ex_contours
)
report, conclusion = _prep_report_and_conclusion(ex_probabilities,
ex_areas)
properties = {
"Площадь поражения пазух, %": maxillary_ex_area
}
return structs.Prediction(total_probability, is_sinusitis, report,
conclusion, img_with_overlay, properties)
+484
View File
@@ -0,0 +1,484 @@
import warnings
from typing import NamedTuple, Optional
import cv2
import numpy as np
import torch
import torchvision.ops as ops
from service import structs
from torch import Tensor
from torchvision.transforms import v2 as T
warnings.filterwarnings('ignore', category=UserWarning)
device = torch.device('cuda')
models_root = "service/models/wrist/"
model_frac = torch.load(f'{models_root}/frac_model.pth',
weights_only=False).to(device)
model_bone = torch.load(f'{models_root}/bone_model.pth',
weights_only=False).to(device)
model_lr = torch.load(f'{models_root}/lr_model.pth',
weights_only=False).to(device)
model_move = torch.load(f'{models_root}/move_model.pth',
weights_only=False).to(device)
model_frac.eval()
model_bone.eval()
model_lr.eval()
model_move.eval()
class ProjectionSegments(NamedTuple):
parts_boxes: Tensor
parts_labels: list[str]
bones_boxes: Tensor
bones_labels: list[str]
class Fractures(NamedTuple):
boxes: Tensor
scores: Tensor
labels: list[str]
bones: list[str]
class CLAHETransform:
def __init__(self, clipLimit=2.0, tileGridSize=(8, 8)):
self.clipLimit = clipLimit
self.tileGridSize = tileGridSize
self.clahe = cv2.createCLAHE(self.clipLimit, self.tileGridSize)
def __call__(self, img):
img_np = img.cpu().numpy()
if img_np.ndim == 3 and img_np.shape[0] == 1:
img_np = img_np[0]
cl_img = self.clahe.apply(img_np)
return torch.from_numpy(cl_img).unsqueeze(0).to(img.device)
transform_bone = T.Compose([
T.Resize((256, 256)),
CLAHETransform(clipLimit=2.0, tileGridSize=(8, 8)),
T.ToDtype(torch.float, scale=True),
T.ToTensor(),
T.Normalize(mean=[0.3354] * 3, std=[0.2000] * 3)
])
transform_frac = T.Compose([
T.Resize((512, 512)),
CLAHETransform(clipLimit=2.0, tileGridSize=(8, 8)),
T.ToDtype(torch.float, scale=True),
T.ToTensor(),
T.Normalize(mean=[0.21549856662750244], std=[0.24515700340270996])
])
transform_lr = T.Compose([
T.Resize((256, 256)),
T.ToDtype(torch.float, scale=True),
T.ToTensor(),
T.Normalize(mean=[0.9278] * 3, std=[0.2089] * 3)
])
transform_move = T.Compose([
T.Resize((224, 224)),
CLAHETransform(clipLimit=2.0, tileGridSize=(8, 8)),
T.ToDtype(torch.float, scale=True),
T.ToTensor(),
T.Normalize(mean=[0.3354] * 3, std=[0.2000] * 3)
])
bone_translator = {
'1': 'локтевой кости',
'2': 'лучевой кости',
'3': 'лучевой или локтевой кости',
'4': 'костей запястья'
}
bone_parts_translator = {
'head': 'головки',
'styloid': 'шиловидного отростка',
'epiphysis': 'эпифиза',
'metaphysis': 'метафиза',
'diaphysis': 'диафиза',
'hand': ''
}
bone_translator_clear = {
'1': 'Локтевая кость',
'2': 'Лучевая кость',
'3': 'Лучевая или локтевая кость',
'4': 'Кости запястья'
}
bone_ratios = {
'1': {'styloid_1': 0.02, 'head_1': 0.02, 'epiphysis_1': 0.05,
'metaphysis_1': 0.03, 'diaphysis_1': 0.8},
'2': {'styloid_2': 0.04, 'epiphysis_2': 0.13, 'metaphysis_2': 0.03,
'diaphysis_2': 0.8},
'3': {'styloid_3': 0.04, 'epiphysis_3': 0.13, 'metaphysis_3': 0.03,
'diaphysis_3': 0.8}
}
def _convert_bboxes(bboxes: Tensor, orig_w: int, orig_h: int,
transformed_w=256, transformed_h=256) -> Tensor:
"""Масштабирует координаты боксов обратно к
размерам исходного изображения."""
bboxes[:, [0, 2]] *= orig_w / transformed_w
bboxes[:, [1, 3]] *= orig_h / transformed_h
return bboxes
def _get_bone_parts(box: Tensor, bone_type: str) -> dict[str: Tensor]:
"""Разбивает бокc кости на сегменты согласно заданным пропорциям."""
if bone_type == '4':
return {'hand_4': box}
xmin, ymin, xmax = box[:3]
bone_h = (xmax - xmin) * (9 if bone_type == '1' else 5)
parts, current_y = {}, ymin
for part, ratio in bone_ratios[bone_type].items():
part_h = ratio * bone_h
parts[part] = (xmin, current_y, xmax, current_y + part_h)
current_y = current_y + part_h
return parts
def _check_is_right(image: Tensor) -> bool:
"""Определяет сторону (латеральность) по изображению."""
image = transform_lr(image).unsqueeze(0).to(device)
with torch.no_grad():
outputs = model_lr(image)[0]
return bool(torch.argmax(outputs).item())
def _get_projection_bones(image: Tensor) -> Optional[ProjectionSegments]:
"""Получает боксы и сегменты кости для проекции."""
original = image.clone()
image = transform_bone(image).unsqueeze(0).to(device)
with torch.no_grad():
outputs = model_bone(image)[0]
scores = outputs['scores']
valid = scores >= 0.15
bones_boxes = outputs['boxes'][valid].cpu()
bone_labels = [str(int(i.item())) for i in outputs['labels'][valid].cpu()]
filtered_scores = scores[valid]
if not any(filtered_scores):
raise structs.ImagesError("Снимок иной анатомической области или низкого диагностического качества")
best = {}
for score, box, label in zip(filtered_scores, bones_boxes, bone_labels):
if label not in best or score.item() > best[label][0]:
best[label] = (score.item(), box)
# 4 - ладонь
if '3' in best:
best['4'] = best['3']
best.pop('3')
bones_boxes = torch.stack([b for _, b in best.values()])
bone_labels = list(best.keys())
conv_bone_boxes = _convert_bboxes(bones_boxes, original.shape[2],
original.shape[1])
parts_boxes, parts_labels = [], []
for box, bone in zip(conv_bone_boxes, bone_labels):
segments = _get_bone_parts(box.cpu().numpy(), bone)
for part, coords in segments.items():
parts_boxes.append(torch.tensor(coords))
parts_labels.append(part)
parts_boxes = torch.stack(parts_boxes)
return ProjectionSegments(parts_boxes, parts_labels, conv_bone_boxes,
bone_labels)
def _get_fractions(direct_img: Tensor,
score_threshold=0.35) -> tuple[Tensor, Tensor]:
image = transform_frac(direct_img).unsqueeze(0).to(device)
with torch.no_grad():
outputs = model_frac(image)[0]
scores = outputs['scores'].cpu()
valid = scores >= score_threshold
boxes = outputs['boxes'][valid].cpu()
scores = scores[valid]
keep = ops.nms(boxes, scores, 0.2)
boxes = boxes[keep]
scores = scores[keep]
converted_bboxes = _convert_bboxes(boxes, direct_img.shape[2],
direct_img.shape[1],
transformed_w=512,
transformed_h=512)
return converted_bboxes, scores
def _assign_fracs_to_bones(frac_boxes: Tensor,
bone_boxes: Tensor, bone_labels: list[str]) \
-> dict[str: tuple[Tensor, list[str]]]:
"""Относит каждый бокс ровно к одной кости"""
assignments = {}
new_boxes = []
i = 1
if len(bone_boxes) > 0:
for frac_box in frac_boxes:
inter_xmin = torch.max(frac_box[0], bone_boxes[:, 0])
inter_ymin = torch.max(frac_box[1], bone_boxes[:, 1])
inter_xmax = torch.min(frac_box[2], bone_boxes[:, 2])
inter_ymax = torch.min(frac_box[3], bone_boxes[:, 3])
x_len = (inter_xmax - inter_xmin).clamp(min=0)
y_len = (inter_ymax - inter_ymin).clamp(min=0)
inter_area = x_len * y_len
best_idx = torch.argmax(inter_area)
if inter_area[best_idx].item() > 0:
bone = bone_labels[best_idx]
if bone in assignments:
assignments[bone] = (
torch.cat([assignments[bone][0], frac_box.unsqueeze(0)]),
assignments[bone][1] + [f'Находка {i}'])
else:
assignments[bone] = (frac_box.unsqueeze(0), [f'Находка {i}'])
new_boxes.append(frac_box)
i += 1
boxes_tensor = torch.stack(new_boxes) if new_boxes else torch.tensor([])
else:
boxes_tensor = torch.tensor([])
return assignments, boxes_tensor
def _assign_fracs_to_parts(frac_boxes: Tensor, frac_labels: list[str],
part_boxes: Tensor,
part_labels: np.ndarray[str]) -> list[str]:
"""Для каждого перелома ищет 2 части кости с наибольшим пересечением"""
result = []
for i in range(frac_boxes.shape[0]):
frac_box = frac_boxes[i]
intersections = []
for j in range(part_boxes.shape[0]):
part_box = part_boxes[j]
inter_xmin = max(frac_box[0].item(), part_box[0].item())
inter_ymin = max(frac_box[1].item(), part_box[1].item())
inter_xmax = min(frac_box[2].item(), part_box[2].item())
inter_ymax = min(frac_box[3].item(), part_box[3].item())
# Если пересечение существует, вычисляем площадь пересечения
if inter_xmax > inter_xmin and inter_ymax > inter_ymin:
area = (inter_xmax - inter_xmin) * (inter_ymax - inter_ymin)
intersections.append((j, area))
# Сортируем найденные пересечения по площади в
# порядке убывания и выбираем топ-2
intersections.sort(key=lambda x: x[1], reverse=True)
for j, _ in intersections[:2]:
result.append(f'{part_labels[j]}_{frac_labels[i]}')
return result
def _format_single_fracture(label: str) -> tuple[str, str]:
"""
Формирует сообщение для одного найденного перелома.
"""
name, part = label.split('_')[:2]
bone = bone_translator[part]
msg = f'выявлен признак перелома {bone_parts_translator[name]} {bone}.'
return ('', msg) if part == '3' else (msg, '')
def _format_multiple_fractures(labels: list[str]) -> tuple[str, str, int, int]:
"""
Группирует найденные переломы по идентификатору и формирует сообщения
для фронтальной и боковой проекций.
"""
finds = {}
for label in labels:
name, part, n = label.split('_')
bone = bone_translator[part]
finds.setdefault(n, []).append((bone_parts_translator[name], bone))
front_msgs, side_msgs = [], []
front_count = side_count = 0
for n in sorted(finds.keys()):
parts = [x[0] for x in finds[n]]
if 'метафиза' in parts and 'эпифиза' in parts:
parts = ['метаэпифиза']
find_text = f' {n}: перелом ' + ', '.join(parts) + f' {finds[n][0][1]}.'
if finds[n][0][1] != 'лучевой или локтевой кости':
front_msgs.append(find_text)
front_count += 1
else:
side_msgs.append(find_text)
side_count += 1
front_msg = (f'выявлены признаки {front_count} ' +
('переломов' if front_count > 1 else 'перелома') +
'. ' + ''.join(front_msgs)) if front_count else ''
side_msg = (f'выявлены признаки {side_count} ' +
('переломов' if side_count > 1 else 'перелома') +
'. ' + ''.join(side_msgs)) if side_count else ''
return front_msg, side_msg, front_count, side_count
def _make_reports(labels: list[str], is_r: bool) -> str:
"""
Формирует текстовый отчёт по найденным переломам.
"""
labels = list(set(labels))
num_frac = len(labels)
lr = 'правого' if is_r else 'левого'
# Отсутствие переломов
if num_frac == 0:
return (
f'На рентгенограмме {lr} лучезапястного сустава '
f'признаков перелома не выявлено.')
# Один найденный перелом
if num_frac == 1:
front_msg, side_msg = _format_single_fracture(labels[0])
front_count = 1 if front_msg else 0
else:
(front_msg, side_msg,
front_count, side_count) = _format_multiple_fractures(labels)
report_lines = []
if front_count:
report_lines.append(
f'На рентгенограмме {lr} лучезапястного сустава '
f'{front_msg}')
return ' '.join(report_lines).replace(' ', ' ')
def _make_conclusion(labels: list[str]) -> str:
num_frac = len(labels)
if num_frac == 0:
return 'Признаков перелома не выявлено.'
finds = {}
for label in labels:
name, part = label.split('_')[:2]
bone = bone_translator[part]
part = bone_parts_translator[name]
if bone in finds:
finds[bone].add(part)
else:
finds[bone] = {part}
find_texts = []
for bone in finds:
if 'метафиза' in finds[bone] and 'эпифиза' in finds[bone]:
finds[bone].remove('метафиза')
finds[bone].remove('эпифиза')
finds[bone].add('метаэпифиза')
if bone != 'лучевой или локтевой кости' or (
'лучевой кости' not in finds and
'локтевой кости' not in finds):
find_text = f'перелом '
find_text += ', '.join(sorted(finds[bone])) + f' {bone}'
find_texts.append(find_text)
conclusion = '; '.join(find_texts)
return conclusion.replace(' ', ' ').capitalize()
def _process_fractures(direct_img: Tensor,
combined: ProjectionSegments) -> tuple:
"""Обрабатываем боксы переломов, сортируем их и распределяем по
сегментам кости.
"""
frac_boxes, frac_scores = _get_fractions(direct_img)
frac_labels = [f'Находка {i + 1}' for i in range(len(frac_boxes))]
frac_boxes = frac_boxes[frac_boxes[:, 0].argsort()]
assigned, frac_boxes = _assign_fracs_to_bones(frac_boxes,
combined.bones_boxes,
combined.bones_labels)
hurt_bones = []
for bone_type, (cur_frac_boxes, cur_frac_labels) in assigned.items():
if bone_type == '4':
for label in cur_frac_labels:
hurt_bones.append(f'hand_4_{label}')
else:
mask = [lbl.endswith(f'_{bone_type}')
for lbl in combined.parts_labels]
mask_labels = np.array(combined.parts_labels)[mask]
hurt_bones += _assign_fracs_to_parts(cur_frac_boxes,
cur_frac_labels,
combined.parts_boxes[mask],
mask_labels)
return Fractures(frac_boxes, frac_scores, frac_labels, hurt_bones)
def _get_move_confidence(image: Tensor) -> float:
image = transform_move(image).unsqueeze(0).to(device)
with torch.no_grad():
outputs = model_move(image)[0]
if torch.argmax(outputs).item() == 1:
return 0
move_prob = torch.softmax(outputs, 0)[0]
return float(move_prob.item())
def _get_diastas_len(image: Tensor, parts: ProjectionSegments) -> int:
if '2' not in parts.bones_labels:
return 0
luch_box = parts.bones_boxes[parts.bones_labels.index('2')]
xmin, _, xmax = luch_box[:3]
bone_w = xmax - xmin
move_prob = _get_move_confidence(image)
coef = 0.025
move_len = (bone_w * move_prob * coef).item()
move_len = min(40, move_len)
return round(move_len)
def _visualize_detections(direct_img: Tensor, study_iuid: str,
laterality: Optional[str]) -> structs.Prediction:
"""Основная функция визуализации детекции с объединением боксов"""
if laterality in ['R', 'П']:
is_r = True
elif laterality in ['L', 'Л']:
is_r = False
else:
is_r = _check_is_right(direct_img)
bone_segments = _get_projection_bones(direct_img)
fracs = _process_fractures(direct_img, bone_segments)
report = _make_reports(fracs.bones, is_r)
conclusion = _make_conclusion(fracs.bones)
diastasis_mm = _get_diastas_len(direct_img, bone_segments)
diastasis_mm = diastasis_mm if fracs.labels else 0
img = cv2.cvtColor(direct_img[0].numpy(), cv2.COLOR_GRAY2RGB)
color = (255, 0, 0)
font = cv2.FONT_HERSHEY_COMPLEX
for box, label in zip(fracs.boxes.cpu().numpy().astype(int), fracs.labels):
cv2.rectangle(img, box[:2], box[2:], color, 2)
ytext = max(box[1] - 5, 0)
cv2.putText(img, label, (box[0], ytext), font, 0.5, color, 1)
cv2.imwrite(f"client/static/{study_iuid}.png", img)
is_fractured = bool(len(fracs.boxes))
if not is_fractured:
overall_probability = 0
else:
overall_probability = round(float(max(fracs.scores)*100))
properties = {
"Макс. величина диастаза отломков, мм": diastasis_mm
}
return structs.Prediction(overall_probability, is_fractured,
report, conclusion, img, properties)
def predict(input: structs.PredictorInput) -> structs.Prediction:
return _visualize_detections(input.image, input.study_uid,
input.laterality)