The heuristic did not handle DICOM series named "dwi_acq-(...)b0ref" correctly - it placed them as "_sbref" into dwi/, while they should have been "_epi" in func. These have an opposite phase encoding direction to the main dwi acquisitions, and are intended for field map correction. This change assigns the correct folder and suffix to these files, and adds dir-PA / dir-AP to the dwi and reference acquisitions. As a result, QSIPrep should pick these up as field maps and apply susceptibility distorsion correction.
158 lines
6 KiB
Python
158 lines
6 KiB
Python
from itertools import product
|
|
import re
|
|
from typing import Optional
|
|
from heudiconv.utils import SeqInfo
|
|
|
|
# https://heudiconv.readthedocs.io/en/latest/heuristics.html#populate-intended-for-opts
|
|
POPULATE_INTENDED_FOR_OPTS = {
|
|
"matching_parameters": ["ImagingVolume", "Shims"],
|
|
"criterion": "Closest",
|
|
}
|
|
|
|
|
|
def create_key(
|
|
template: Optional[str],
|
|
outtype: tuple[str, ...] = ("nii.gz",),
|
|
annotation_classes: None = None,
|
|
) -> tuple[str, tuple[str, ...], None]:
|
|
if template is None or not template:
|
|
raise ValueError("Template must be a valid format string")
|
|
return (template, outtype, annotation_classes)
|
|
|
|
|
|
def infotodict(
|
|
seqinfo: list[SeqInfo],
|
|
) -> dict[tuple[str, tuple[str, ...], None], list[str]]:
|
|
"""Heuristic evaluator for determining which runs belong where
|
|
|
|
allowed template fields - follow python string module:
|
|
|
|
item: index within category
|
|
subject: participant id
|
|
seqitem: run number during scanning
|
|
subindex: sub index within group
|
|
"""
|
|
|
|
# regular expressions to match protocol names of each type (named groups capture detail)
|
|
anat_regex = re.compile(r"anat(?!_acq-scout)(_acq-.*)?_(?P<suffix>T[12]w)")
|
|
func_regex = re.compile(r"task-(?P<task>rest(vid[an])?)")
|
|
fmap_regex = re.compile(r"fmap(_se_|_dir-)(?P<dir>ap|pa)(.*_run-(?P<run>\d))?")
|
|
dwi_regex = re.compile(r"dwi_(acq-)?(?P<label>[a-zA-Z0-9]+)")
|
|
|
|
# keys for the mapping
|
|
t1w = create_key("{bids_subject_session_dir}/anat/{bids_subject_session_prefix}_T1w")
|
|
t2w = create_key("{bids_subject_session_dir}/anat/{bids_subject_session_prefix}_T2w")
|
|
rest = create_key("{bids_subject_session_dir}/func/{bids_subject_session_prefix}_task-rest_bold")
|
|
restvida = create_key("{bids_subject_session_dir}/func/{bids_subject_session_prefix}_task-restvida_bold")
|
|
restvidn = create_key("{bids_subject_session_dir}/func/{bids_subject_session_prefix}_task-restvidn_bold")
|
|
fmap_ap = create_key("{bids_subject_session_dir}/fmap/{bids_subject_session_prefix}_dir-ap_run-{item}_epi")
|
|
fmap_pa = create_key("{bids_subject_session_dir}/fmap/{bids_subject_session_prefix}_dir-pa_run-{item}_epi")
|
|
dwi_b1200 = create_key("{bids_subject_session_dir}/dwi/{bids_subject_session_prefix}_acq-b1200_dir-PA_dwi")
|
|
ref_b1200 = create_key("{bids_subject_session_dir}/fmap/{bids_subject_session_prefix}_acq-b1200_dir-AP_epi")
|
|
dwi_mshell = create_key("{bids_subject_session_dir}/dwi/{bids_subject_session_prefix}_acq-mshell_dir-PA_dwi")
|
|
ref_mshell = create_key("{bids_subject_session_dir}/fmap/{bids_subject_session_prefix}_acq-mshell_dir-AP_epi")
|
|
|
|
# generate fieldmap keys with fixed run labels for explicitly declared runs
|
|
fmap_explicit_keys = {
|
|
(dirlabel, runlabel): create_key(
|
|
f"{{bids_subject_session_dir}}/fmap/{{bids_subject_session_prefix}}_dir-{dirlabel}_run-{runlabel}_epi"
|
|
)
|
|
for dirlabel, runlabel in product(("ap", "pa"), ("1", "2", "3"))
|
|
}
|
|
|
|
info: dict[tuple[str, tuple[str, ...], None], list[str]] = {
|
|
t1w: [],
|
|
t2w: [],
|
|
rest: [],
|
|
restvida: [],
|
|
restvidn: [],
|
|
fmap_ap: [],
|
|
fmap_pa: [],
|
|
dwi_b1200: [],
|
|
ref_b1200: [],
|
|
dwi_mshell: [],
|
|
ref_mshell: [],
|
|
}
|
|
|
|
# also include the generated keys
|
|
info.update({k: [] for k in fmap_explicit_keys.values()})
|
|
|
|
for s in seqinfo:
|
|
"""
|
|
The namedtuple `s` contains the following fields:
|
|
|
|
* total_files_till_now
|
|
* example_dcm_file
|
|
* series_id
|
|
* dcm_dir_name
|
|
* unspecified2
|
|
* unspecified3
|
|
* dim1
|
|
* dim2
|
|
* dim3
|
|
* dim4
|
|
* TR
|
|
* TE
|
|
* protocol_name
|
|
* is_motion_corrected
|
|
* is_derived
|
|
* patient_id
|
|
* study_description
|
|
* referring_physician_name
|
|
* series_description
|
|
* image_type
|
|
"""
|
|
|
|
if (m := anat_regex.search(s.protocol_name)) is not None:
|
|
if m.group("suffix") == "T1w":
|
|
info[t1w].append(s.series_id)
|
|
elif m.group("suffix") == "T2w":
|
|
info[t2w].append(s.series_id)
|
|
|
|
elif (m := func_regex.search(s.protocol_name)) is not None:
|
|
if m.group("task") == "rest":
|
|
info[rest].append(s.series_id)
|
|
if m.group("task") == "restvida":
|
|
info[restvida].append(s.series_id)
|
|
if m.group("task") == "restvidn":
|
|
info[restvidn].append(s.series_id)
|
|
|
|
elif (m := dwi_regex.search(s.protocol_name)) is not None:
|
|
if "b1200" in m.group("label") and not s.is_derived:
|
|
info[dwi_b1200].append(s.series_id)
|
|
elif "b0ref" in m.group("label"):
|
|
info[ref_b1200].append(s.series_id)
|
|
elif "mshell" in m.group("label") and not s.is_derived:
|
|
# need to look outside the matched group
|
|
if "ref" in s.protocol_name:
|
|
info[ref_mshell].append(s.series_id)
|
|
else:
|
|
info[dwi_mshell].append(s.series_id)
|
|
|
|
elif (m := fmap_regex.search(s.protocol_name)) is not None:
|
|
if m.group("run") is None:
|
|
# implicit run numbering
|
|
if m.group("dir") == "ap":
|
|
info[fmap_ap].append(s.series_id)
|
|
elif m.group("dir") == "pa":
|
|
info[fmap_pa].append(s.series_id)
|
|
else:
|
|
# explicit run numbering
|
|
key = fmap_explicit_keys.get((m.group("dir"), m.group("run")))
|
|
if key is None:
|
|
# unexpected label combination, ignore
|
|
continue
|
|
info[key].append(s.series_id)
|
|
|
|
# deduplicate
|
|
removed = []
|
|
for key, sid_list in info.items():
|
|
if key not in {fmap_ap, fmap_pa}:
|
|
# we allow (expect) run identifiers in field maps
|
|
continue
|
|
if len(sid_list) > 1:
|
|
# all other should be unique, keep last in case of repetitions
|
|
removed.extend(sid_list[:-1])
|
|
del sid_list[:-1]
|
|
|
|
return info
|