phantom-mri-dicoms/code/heuristic-q01.py
Michał Szczepanik 34f2bfd434 Fix DWI fieldmap handling
The heuristic did not handle DICOM series named "dwi_acq-(...)b0ref"
correctly - it placed them as "_sbref" into dwi/, while they should
have been "_epi" in func. These have an opposite phase encoding
direction to the main dwi acquisitions, and are intended for field map
correction.

This change assigns the correct folder and suffix to these files, and
adds dir-PA / dir-AP to the dwi and reference acquisitions. As a
result, QSIPrep should pick these up as field maps and apply
susceptibility distorsion correction.
2026-05-11 12:26:00 +02:00

158 lines
6 KiB
Python

from itertools import product
import re
from typing import Optional
from heudiconv.utils import SeqInfo
# https://heudiconv.readthedocs.io/en/latest/heuristics.html#populate-intended-for-opts
POPULATE_INTENDED_FOR_OPTS = {
"matching_parameters": ["ImagingVolume", "Shims"],
"criterion": "Closest",
}
def create_key(
template: Optional[str],
outtype: tuple[str, ...] = ("nii.gz",),
annotation_classes: None = None,
) -> tuple[str, tuple[str, ...], None]:
if template is None or not template:
raise ValueError("Template must be a valid format string")
return (template, outtype, annotation_classes)
def infotodict(
seqinfo: list[SeqInfo],
) -> dict[tuple[str, tuple[str, ...], None], list[str]]:
"""Heuristic evaluator for determining which runs belong where
allowed template fields - follow python string module:
item: index within category
subject: participant id
seqitem: run number during scanning
subindex: sub index within group
"""
# regular expressions to match protocol names of each type (named groups capture detail)
anat_regex = re.compile(r"anat(?!_acq-scout)(_acq-.*)?_(?P<suffix>T[12]w)")
func_regex = re.compile(r"task-(?P<task>rest(vid[an])?)")
fmap_regex = re.compile(r"fmap(_se_|_dir-)(?P<dir>ap|pa)(.*_run-(?P<run>\d))?")
dwi_regex = re.compile(r"dwi_(acq-)?(?P<label>[a-zA-Z0-9]+)")
# keys for the mapping
t1w = create_key("{bids_subject_session_dir}/anat/{bids_subject_session_prefix}_T1w")
t2w = create_key("{bids_subject_session_dir}/anat/{bids_subject_session_prefix}_T2w")
rest = create_key("{bids_subject_session_dir}/func/{bids_subject_session_prefix}_task-rest_bold")
restvida = create_key("{bids_subject_session_dir}/func/{bids_subject_session_prefix}_task-restvida_bold")
restvidn = create_key("{bids_subject_session_dir}/func/{bids_subject_session_prefix}_task-restvidn_bold")
fmap_ap = create_key("{bids_subject_session_dir}/fmap/{bids_subject_session_prefix}_dir-ap_run-{item}_epi")
fmap_pa = create_key("{bids_subject_session_dir}/fmap/{bids_subject_session_prefix}_dir-pa_run-{item}_epi")
dwi_b1200 = create_key("{bids_subject_session_dir}/dwi/{bids_subject_session_prefix}_acq-b1200_dir-PA_dwi")
ref_b1200 = create_key("{bids_subject_session_dir}/fmap/{bids_subject_session_prefix}_acq-b1200_dir-AP_epi")
dwi_mshell = create_key("{bids_subject_session_dir}/dwi/{bids_subject_session_prefix}_acq-mshell_dir-PA_dwi")
ref_mshell = create_key("{bids_subject_session_dir}/fmap/{bids_subject_session_prefix}_acq-mshell_dir-AP_epi")
# generate fieldmap keys with fixed run labels for explicitly declared runs
fmap_explicit_keys = {
(dirlabel, runlabel): create_key(
f"{{bids_subject_session_dir}}/fmap/{{bids_subject_session_prefix}}_dir-{dirlabel}_run-{runlabel}_epi"
)
for dirlabel, runlabel in product(("ap", "pa"), ("1", "2", "3"))
}
info: dict[tuple[str, tuple[str, ...], None], list[str]] = {
t1w: [],
t2w: [],
rest: [],
restvida: [],
restvidn: [],
fmap_ap: [],
fmap_pa: [],
dwi_b1200: [],
ref_b1200: [],
dwi_mshell: [],
ref_mshell: [],
}
# also include the generated keys
info.update({k: [] for k in fmap_explicit_keys.values()})
for s in seqinfo:
"""
The namedtuple `s` contains the following fields:
* total_files_till_now
* example_dcm_file
* series_id
* dcm_dir_name
* unspecified2
* unspecified3
* dim1
* dim2
* dim3
* dim4
* TR
* TE
* protocol_name
* is_motion_corrected
* is_derived
* patient_id
* study_description
* referring_physician_name
* series_description
* image_type
"""
if (m := anat_regex.search(s.protocol_name)) is not None:
if m.group("suffix") == "T1w":
info[t1w].append(s.series_id)
elif m.group("suffix") == "T2w":
info[t2w].append(s.series_id)
elif (m := func_regex.search(s.protocol_name)) is not None:
if m.group("task") == "rest":
info[rest].append(s.series_id)
if m.group("task") == "restvida":
info[restvida].append(s.series_id)
if m.group("task") == "restvidn":
info[restvidn].append(s.series_id)
elif (m := dwi_regex.search(s.protocol_name)) is not None:
if "b1200" in m.group("label") and not s.is_derived:
info[dwi_b1200].append(s.series_id)
elif "b0ref" in m.group("label"):
info[ref_b1200].append(s.series_id)
elif "mshell" in m.group("label") and not s.is_derived:
# need to look outside the matched group
if "ref" in s.protocol_name:
info[ref_mshell].append(s.series_id)
else:
info[dwi_mshell].append(s.series_id)
elif (m := fmap_regex.search(s.protocol_name)) is not None:
if m.group("run") is None:
# implicit run numbering
if m.group("dir") == "ap":
info[fmap_ap].append(s.series_id)
elif m.group("dir") == "pa":
info[fmap_pa].append(s.series_id)
else:
# explicit run numbering
key = fmap_explicit_keys.get((m.group("dir"), m.group("run")))
if key is None:
# unexpected label combination, ignore
continue
info[key].append(s.series_id)
# deduplicate
removed = []
for key, sid_list in info.items():
if key not in {fmap_ap, fmap_pa}:
# we allow (expect) run identifiers in field maps
continue
if len(sid_list) > 1:
# all other should be unique, keep last in case of repetitions
removed.extend(sid_list[:-1])
del sid_list[:-1]
return info