This adds a Python script that fixes the DWI field maps. The initial heuristic incorrectly placed series named "dwi_acq-(...)b0ref" as _sbref in the DWI folder. These should have been _epi in the func folder instead. This is a Python reimplementation of the bash script proposed by @richard0nkrumah in all/status#1 Here, we expect to run within a datalad run call, so we are using OS operations (via Path, rename and unlink) instead of git mv / rm; the subsequent datalad save should set the record straight. We do not require any NIfTIs to be present (we still need some of the jsons because we are writing the IntendedFor field). We use heudiconv's function for saving pretty JSON to minimize changes in the files we touch.
182 lines
6.1 KiB
Python
182 lines
6.1 KiB
Python
"""Fix up incorrect naming of DWI files
|
|
|
|
The initial heuristic incorrectly placed series named
|
|
"dwi_acq-(...)b0ref" as _sbref in the DWI folder instead of _epi in
|
|
the func folder. These series are intended to be used for field map
|
|
correction of the actual dwi images because they have opposite phase
|
|
encoding durations.
|
|
|
|
This is a Python reimplementation of the correction proposed by
|
|
richard0nkrumah.
|
|
|
|
Here, we expect to run within a datalad run call, so OS operations
|
|
(via Path) are used to rename or remove files (subsequent datalad save
|
|
should set the record straight). We use heudiconv's function for
|
|
saving pretty JSON to minimize changes in the files we touch.
|
|
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
from pathlib import Path
|
|
import re
|
|
|
|
|
|
def fixup_session(ss_dir: Path) -> None:
|
|
|
|
subject, session = ss_labels(ss_dir)
|
|
|
|
repls = {}
|
|
for acq in ("b1200", "mshell"):
|
|
|
|
for p in ss_dir.glob(f"dwi/sub-{subject}_ses-{session}_acq-{acq}_dwi.*"):
|
|
# dwi files with no dir-<label> in name (should have been dir-PA)
|
|
new_stem = p.stem.replace("_dwi", "_dir-PA_dwi")
|
|
new_p = p.rename(p.with_stem(new_stem))
|
|
if p.suffix == ".gz":
|
|
# .nii.gz also goes to scans file
|
|
repls[p.relative_to(ss_dir)] = new_p.relative_to(ss_dir)
|
|
|
|
for p in ss_dir.glob(f"dwi/sub-{subject}_ses-{session}_acq-{acq}_sbref.*"):
|
|
# sbref (should have been in fmap/ with dir-AP and _epi suffix)
|
|
if p.suffix in (".bval", ".bvec"):
|
|
p.unlink()
|
|
elif p.suffix == ".gz":
|
|
new_p = p.rename(ss_dir / "fmap" / p.name.replace("_sbref", "_dir-AP_epi"))
|
|
repls[p.relative_to(ss_dir)] = new_p.relative_to(ss_dir)
|
|
elif p.suffix == ".json":
|
|
new_p = p.rename(ss_dir / "fmap" / p.name.replace("_sbref", "_dir-AP_epi"))
|
|
ensure_intended_for(
|
|
new_p,
|
|
[
|
|
f"ses-{session}/dwi/sub-{subject}_ses-{session}_acq-{acq}_dir-PA_dwi.nii.gz"
|
|
],
|
|
)
|
|
|
|
if len(repls) > 0:
|
|
edit_scans_file(ss_dir / f"sub-{subject}_ses-{session}_scans.tsv", repls)
|
|
|
|
|
|
def ss_labels(ss_dir: Path) -> tuple[str, str]:
|
|
subject_label = ss_dir.parent.name.split("-")[-1]
|
|
session_label = ss_dir.name.split("-")[-1]
|
|
return subject_label, session_label
|
|
|
|
|
|
def edit_scans_file(scans_tsv: Path, repls: dict[Path, Path]) -> None:
|
|
try:
|
|
txt = scans_tsv.read_text()
|
|
except FileNotFoundError:
|
|
print(f"{scans_tsv} file is missing, did you get it?")
|
|
return
|
|
|
|
for old, new in repls.items():
|
|
txt = txt.replace(str(old), str(new))
|
|
|
|
try:
|
|
scans_tsv.write_text(txt)
|
|
except PermissionError:
|
|
print(f"{scans_tsv} permission error, did you unlock it?")
|
|
|
|
|
|
def ensure_intended_for(sidecar: Path, targets: list[str]) -> None:
|
|
"""Ensure the given value of IntendedFor
|
|
|
|
Will not write to the file if the given value is already there.
|
|
|
|
"""
|
|
try:
|
|
with sidecar.open() as jp:
|
|
j = json.load(jp)
|
|
except FileNotFoundError:
|
|
print(f"{sidecar.name} file is missing, did you get it?")
|
|
return
|
|
|
|
if j.get("IntendedFor") == targets:
|
|
print(f"〰 {sidecar.name}: nothing to do")
|
|
else:
|
|
print(f"🔨 {sidecar.name}: updating IntendedFor (preset)")
|
|
j.update({"IntendedFor": targets})
|
|
try:
|
|
save_pretty_json(sidecar, j)
|
|
except PermissionError:
|
|
print(f"{sidecar.name} permission error, did you unlock it?")
|
|
|
|
|
|
def save_pretty_json(filename: Path, data: dict) -> None:
|
|
j = json_dumps_pretty(data)
|
|
with filename.open("w") as fp:
|
|
fp.write(j)
|
|
|
|
|
|
def json_dumps_pretty(j: dict, indent: int = 2, sort_keys: bool = True) -> str:
|
|
"""Given a json structure, pretty print it by colliding numeric arrays
|
|
into a line.
|
|
|
|
If resultant structure differs from original -- throws exception.
|
|
|
|
Copied & minimally changed from heudiconv 1.4.0 (utils.py).
|
|
https://github.com/nipy/heudiconv/
|
|
|
|
Copyright HeuDiConv developers; licensed under the Apache License,
|
|
Version 2.0. http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
"""
|
|
js = json.dumps(j, indent=indent, sort_keys=sort_keys)
|
|
# trim away \n and spaces between entries of numbers
|
|
js_ = re.sub(
|
|
'[\n ]+("?[-+.0-9e]+"?,?) *\n(?= *"?[-+.0-9e]+"?)',
|
|
r" \1",
|
|
js,
|
|
flags=re.MULTILINE,
|
|
)
|
|
# uniform no spaces before ]
|
|
js_ = re.sub(r" *\]", "]", js_)
|
|
# uniform spacing before numbers
|
|
# But that thing could screw up dates within strings which would have 2 spaces
|
|
# in a date like Mar 3 2017, so we do negative lookahead to avoid changing
|
|
# in those cases
|
|
# import pdb; pdb.set_trace()
|
|
js_ = re.sub(
|
|
r"(?<!\w{3})" # negative lookbehind for the month
|
|
r' *("?[-+.0-9e]+"?)'
|
|
r"(?! [123]\d{3})" # negative lookahead for a year
|
|
r"(?P<space> ?)[ \n]*",
|
|
r" \1\g<space>",
|
|
js_,
|
|
)
|
|
# no spaces after [
|
|
js_ = re.sub(r"\[ ", "[", js_)
|
|
# the load from the original dump and reload from tuned up
|
|
# version should result in identical values since no value
|
|
# must be changed, just formatting.
|
|
j_just_reloaded = json.loads(js)
|
|
j_tuned = json.loads(js_)
|
|
|
|
assert j_just_reloaded == j_tuned, (
|
|
"Values differed when they should have not. "
|
|
"Report to the heudiconv developers"
|
|
)
|
|
|
|
return js_
|
|
|
|
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument("bids_root", type=Path)
|
|
parser.add_argument("--subject", nargs="*", help="Subject label(s)")
|
|
parser.add_argument("--session", nargs="*", help="Session label(s)")
|
|
args = parser.parse_args()
|
|
|
|
subjects = set(args.subject) if args.subject is not None else None
|
|
sessions = set(args.session) if args.session is not None else None
|
|
|
|
# glob the BIDS root and restrict to subjects / sessions from CLI
|
|
dirs: list[Path] = []
|
|
for p in sorted(args.bids_root.glob("sub-*/ses-*")):
|
|
subject, session = ss_labels(p)
|
|
if (subjects is None or subject in subjects) and (sessions is None or session in sessions):
|
|
dirs.append(p)
|
|
|
|
# fix up the sidecars for each subject-session
|
|
for session_dir in dirs:
|
|
fixup_session(session_dir)
|