An identifier's creator and an activity's object can now be inlined (a bare PID remains valid). When downloading the SPDX licenses file (which is saved to a JSON file), the in-memory cache is disabled, so no unnecessary caching takes place (this also avoids importing requests for that single download). The pid_of function is duplicated from another file and should be a target for refactoring in the future; for now this is acceptable, since we don't have to worry about package structure and the scripts stay self-contained.
401 lines
14 KiB
Python
401 lines
14 KiB
Python
import json
|
|
from urllib.parse import urljoin
|
|
from pathlib import Path
|
|
import re
|
|
import warnings
|
|
|
|
import click
|
|
from lxml import html
|
|
from requests_cache import CachedSession
|
|
|
|
|
|
def consult_spdx_license(license_uri: str) -> str | None:
    """Match the license uri against spdx data

    This function consults the spdx license file, trying to match the
    given url against "reference" or "see also" links. Returns the
    label ("licenseId"), if match is found, and None otherwise.

    Uris starting with "https://spdx.org" are compared to the
    "reference" of each license, all other uris to its "seeAlso"
    links. The whole link has to match; only a "/legalcode" suffix, a
    short file extension, and a trailing slash are optional. If several
    licenses match, the last one in the file is used.

    """
    spdx_list_data = load_spdx_licenses()

    base_uri = license_uri.rstrip("/")
    if not base_uri:
        # nothing to match against (an empty pattern would fit anything)
        return None

    # uri may be lacking "/legalcode" and/or extension, compared to spdx;
    # the uri is escaped, so that characters like ".", "+" or "?" are
    # taken literally instead of being interpreted as regex syntax
    pat = re.compile(rf"{re.escape(base_uri)}(/legalcode)?(\.[a-z]{{1,5}})?/?")

    # explicit match for "reference", otherwise match "see also" urls
    use_reference = license_uri.startswith("https://spdx.org")

    # go through the licenses searching for matching one
    res = None
    for entry in spdx_list_data["licenses"]:
        if use_reference:
            candidates = [entry.get("reference", "")]
        else:
            candidates = entry.get("seeAlso", [])
        # fullmatch, so that e.g. ".../GPL-2.0" does not also pick up
        # ".../GPL-2.0-or-later.html"
        if any(pat.fullmatch(link) is not None for link in candidates):
            res = entry

    return res["licenseId"] if res is not None else None
|
|
|
|
|
|
def csl_abstract(d: dict) -> str | None:
|
|
"""Get abstract from csl
|
|
|
|
Some abstracts seen in the wild are marked up with jats tags, and
|
|
the top-level may include (a combination of) sections, titles and
|
|
paragraphs (usually, a section itself contains a title and one
|
|
paragraph). We can use the paragraphs, and mix in the section
|
|
titles. Otherwise, remove all tags (return text content).
|
|
|
|
"""
|
|
if abstract := d.get("abstract", False):
|
|
h = html.fromstring(abstract)
|
|
if {x.tag for x in h} <= {"jats:p", "jats:title", "jats:sec"}:
|
|
return jats2md(h)
|
|
else:
|
|
return h.text_content()
|
|
else:
|
|
return None
|
|
|
|
|
|
def csl_license(d: dict) -> list:
    """Get license from doi content-negotiation json

    Collects the "URL" of every entry of the "license" list that is
    marked as "vor" ("version of record"). Entries lacking either the
    "content-version" or the "URL" are ignored.

    Returns a list of unique urls, in order of first appearance (empty
    if there are none).

    """
    # dict keys are unique and keep insertion order, which deduplicates
    # (just in case) while keeping the result deterministic
    license_urls = {
        entry["URL"]: None
        for entry in d.get("license") or []
        if isinstance(entry, dict)
        and entry.get("content-version") == "vor"  # "version of record"
        and entry.get("URL")
    }
    return list(license_urls)
|
|
|
|
|
|
def csl_publish_date(d: dict, allow_incomplete: bool = True) -> str | None:
|
|
"""Get one publication date out of csl"""
|
|
if "issued" in d:
|
|
date = d["issued"]["date-parts"]
|
|
elif "published-online" in d:
|
|
date = d["published-online"]["date-parts"]
|
|
else:
|
|
return None
|
|
|
|
# partial date, a nested array of numbers
|
|
if len(date[0]) == 1 or (len(date[0]) < 3 and not allow_incomplete):
|
|
isodate = f"{date[0][0]}" # yyyy (only year is required)
|
|
elif len(date[0]) == 2:
|
|
isodate = f"{date[0][0]}-{date[0][1]:02}" # yyyy-mm
|
|
else:
|
|
isodate = f"{date[0][0]}-{date[0][1]:02}-{date[0][2]:02}" # yyyy-mm-dd
|
|
|
|
return isodate
|
|
|
|
|
|
def discover_authors(
    publication: dict, all_our_people: dict[str, dict], citeproc_record: dict
) -> list[dict]:
    """Find known people who are authors but are not attributed yet

    Authors listed (with an ORCID) in the citeproc record are compared
    with the people already declared in "attributed_to" of the
    publication. Only authors whose ORCID is a key of all_our_people
    are considered.

    Parameters:
      publication: publication record, may contain "attributed_to"
      all_our_people: person records keyed by ORCID url
        ("https://orcid.org/...")
      citeproc_record: csl metadata, may contain "author"

    Returns a list of attributions ({"object": person, "roles": [role]})
    to be added; empty if nothing is missing.

    """
    # the object of an attribution can be an inlined person or just a
    # PID; map PIDs to ORCID urls to be able to resolve the latter
    orcid_url_by_pid = {
        record["pid"]: orcid_url
        for orcid_url, record in all_our_people.items()
        if isinstance(record, dict) and record.get("pid")
    }

    # check which contributors with orcids are already declared
    declared_contributor_orcids = set()
    for attribution in publication.get("attributed_to", []):
        if not isinstance(attribution, dict):
            continue
        person = attribution.get("object", {})
        if isinstance(person, dict):
            if (orcid := process_orcid(person)) is not None:
                declared_contributor_orcids.add(f"https://orcid.org/{orcid}")
                continue
            # inlined, but without an ORCID identifier: try the PID
            person = person.get("pid")
        if isinstance(person, str) and person in orcid_url_by_pid:
            declared_contributor_orcids.add(orcid_url_by_pid[person])

    # compare to contributors with orcids in the citeproc record
    missing_attributions = []
    for author in citeproc_record.get("author", []):
        orcid = author.get("ORCID")
        if (
            orcid is None
            or orcid not in all_our_people
            or orcid in declared_contributor_orcids
        ):
            continue

        sequence = author.get("sequence")
        if sequence == "first":
            role = "obo:MS_1002034"  # first author
        elif sequence == "additional":
            role = "obo:MS_1002036"  # co-author
        else:
            role = "marcrel:aut"
        missing_attributions.append(
            {"object": all_our_people[orcid], "roles": [role]}
        )

    return missing_attributions
|
|
|
|
|
|
def jats2md(span: html.HtmlElement, rstrip: bool = True) -> str:
    """Turn jats-tagged content into text

    Goes through the children of the given element:

    - titles are followed by ": " (or " ", if ending with a period);
      empty titles and the title "abstract" are left out
    - paragraphs are followed by an empty line; plain links (where the
      link text equals the link target) are wrapped in angle brackets
    - sections are processed recursively
    - text content of anything else is used as it is

    Trailing whitespace is removed from the result unless rstrip is
    False (which is used for the nested sections).

    """
    chunks = []
    for elem in span:
        if elem.tag == "jats:title":
            # text_content also covers titles with nested markup, for
            # which the plain .text may be None
            title = elem.text_content()
            label = title.strip().lower()
            if label and label != "abstract":
                # we know an abstract is an abstract
                chunks.append(title)
                chunks.append(" " if title.endswith(".") else ": ")
        elif elem.tag == "jats:p":
            this_text = elem.text_content()
            wrapped = set()
            for sub in elem:
                if sub.tag != "jats:ext-link":
                    continue
                # wrap at least plain links for unambiguous parsing by hugo
                href = sub.get("xlink:href")
                if href and href not in wrapped and href == sub.text_content():
                    # replace covers all occurrences at once - remember
                    # the link in order not to wrap it again
                    this_text = this_text.replace(href, f"<{href}>")
                    wrapped.add(href)
            chunks.append(this_text)
            chunks.append("\n\n")
        elif elem.tag == "jats:sec":
            chunks.append(jats2md(elem, rstrip=False))
        else:
            chunks.append(elem.text_content())

    full_text = "".join(chunks)
    return full_text.rstrip() if rstrip else full_text
|
|
|
|
|
|
def load_spdx_licenses(lic_file: Path = Path(".cache/licenses.json")) -> dict:
    """Load spdx license file - from Internet or disk

    If the given file exists, its (json) content is returned.
    Otherwise the license list is downloaded from spdx.org and stored
    in the file for future use; missing parent directories are created.

    If the download fails, a warning is issued, nothing is stored, and
    a dictionary with an empty list of "licenses" is returned.

    """
    if lic_file.exists():
        with lic_file.open(encoding="utf-8") as f:
            return json.load(f)

    # "permanently" cache by downloading - the file is our cache, so
    # the cache of the session is disabled for the request
    session = CachedSession(backend="memory")
    with session.cache_disabled():
        r = session.get("https://spdx.org/licenses/licenses.json")

    if not r.ok:
        warnings.warn("Failed to retrieve the spdx license file")
        return {"licenses": []}

    d = r.json()
    # the target directory may not be there yet
    lic_file.parent.mkdir(parents=True, exist_ok=True)
    with lic_file.open("w", encoding="utf-8") as f:
        json.dump(d, f)
    return d
|
|
|
|
|
|
def pid_of(x: str | dict) -> str:
|
|
"""Return a PID of an object, inlined or not
|
|
|
|
A shortcut - makes a pid string or an inlined dict (where pid is a
|
|
property) equivalent. Does not do further validation, but it could
|
|
be added here.
|
|
|
|
"""
|
|
return x.get("pid", "") if isinstance(x, dict) else x
|
|
|
|
|
|
def process_doi(paper: dict) -> str | None:
|
|
"""Return a DOI from identifiers"""
|
|
|
|
for identifier in paper.get("identifiers", []):
|
|
if (
|
|
pid_of(identifier.get("creator")) == "ror:01fyxcz70"
|
|
or identifier.get("schema_type") == "dlthings:DOI"
|
|
):
|
|
return identifier.get("notation")
|
|
|
|
|
|
def process_orcid(person: dict) -> str | None:
|
|
"""Return an ORCID from identifiers"""
|
|
|
|
for identifier in person.get("identifiers", []):
|
|
if (
|
|
identifier.get("schema_type") == "trr379ri:ORCID"
|
|
or pid_of(identifier.get("creator")) == "ror:04fa4r544"
|
|
):
|
|
return identifier.get("notation")
|
|
|
|
|
|
def publishing_process(d: dict) -> dict[str, str] | None:
    """Describe the publishing process based on csl metadata

    The result is an activity with "obo:IAO_0000444" (publishing
    process) as its object. If available, the publication date is
    given as "at_time", and the ISSN as "at_location" (with an "ISSN:"
    prefix).

    Returns None if neither the date nor the ISSN is known.

    """
    res = {"object": "obo:IAO_0000444"}

    if (pubdate := csl_publish_date(d)) is not None:
        res["at_time"] = pubdate

    # not all records have an ISSN (and the list could be empty)
    issn = d.get("ISSN")
    if isinstance(issn, str):
        # a plain string instead of a list: indexing would pick 1st letter
        issn = [issn]
    if issn:
        # there can be more than one (e.g. different for print / online)
        # if that's the case, use the 1st - we have no more data at hand
        res["at_location"] = f"ISSN:{issn[0]}"

    # anything beyond the object means that we have some detail
    return res if len(res) > 1 else None
|
|
|
|
|
|
def query_doi_citation(session: CachedSession, doi: str) -> str | None:
    """Ask doi.org for a formatted (apa style) citation text

    Uses the given session to request the DOI url with an "Accept"
    header asking for a bibliography text. Returns the text of the
    response, or None if the request was not successful.

    """
    response = session.get(
        urljoin("https://doi.org/", doi),
        headers={"Accept": "text/x-bibliography; style=apa"},
    )
    if not response.ok:
        return None

    detected = response.apparent_encoding
    if response.encoding != detected and detected == "utf-8":
        # if it appears like utf-8, it likely is utf-8
        # see https://stackoverflow.com/questions/44203397/
        response.encoding = detected
    return response.text
|
|
|
|
|
|
def query_doi_csl(session: CachedSession, doi: str) -> dict | None:
    """Ask doi.org for csl (citeproc) json metadata

    Uses the given session to request the DOI url with an "Accept"
    header asking for csl json. Returns the decoded json of the
    response, or None if the request was not successful.

    """
    csl_headers = {"Accept": "application/vnd.citationstyles.csl+json"}
    response = session.get(urljoin("https://doi.org", doi), headers=csl_headers)
    if response.ok:
        return response.json()
    return None
|
|
|
|
|
|
def remap_person_records(records: list[dict]) -> dict[str, dict]:
    """Index person records by their ORCID url

    Records for which process_orcid finds no ORCID are left out. The
    keys have the form "https://orcid.org/<orcid>"; should more records
    share an ORCID, the last one is kept.

    """
    by_orcid_url = {}
    for person in records:
        found = process_orcid(person)
        if found is None:
            continue
        by_orcid_url[f"https://orcid.org/{found}"] = person
    return by_orcid_url
|
|
|
|
|
|
def rules(citeproc_record: dict) -> list[str]:
    """Get license rules for a publication from its csl metadata

    The license urls of the record are matched against the spdx
    license list. For each recognized license, "spdxlic:<license id>"
    is reported; urls which can not be matched are skipped.

    Returns a sorted list without duplicates (empty if no license was
    recognized).

    """
    # a set, because different urls may point to the same license
    labels = set()
    for url in csl_license(citeproc_record):
        if (license_label := consult_spdx_license(url)) is not None:
            labels.add(f"spdxlic:{license_label}")
    return sorted(labels)
|
|
|
|
|
|
def short_name_from_citeproc(d: dict) -> str | None:
|
|
"""Generate file name based on citeproc data
|
|
|
|
Combines last name of the first author, (short) container title,
|
|
and date to form something that is human-readable and likely
|
|
unique enough.
|
|
|
|
Required properties are usually present, but they are not
|
|
required, so we proceed only if we find all three.
|
|
|
|
"""
|
|
|
|
if not (
|
|
"author" in d
|
|
and ("container-title-short" in d or "container-title" in d)
|
|
and "issued" in d
|
|
):
|
|
return None
|
|
|
|
# first author (et al)
|
|
author = d["author"]
|
|
if len(author) == 1:
|
|
# family is required (at least in crossref) - define default to be safe
|
|
author_part = author[0].get("family", "unknown")
|
|
else:
|
|
author_part = author[0].get("family", "unknown") + "_etal"
|
|
|
|
# journal title (abbreviated)
|
|
if container := d.get("container-title-short", False):
|
|
journal_part = container.replace(" ", "_")
|
|
elif ((container := d.get("container-title")) is not None) and container != []:
|
|
# todo: iso4?
|
|
journal_part = container.replace(" ", "_")
|
|
else:
|
|
# none of those are mandatory
|
|
journal_part = d.get("group-title", "")
|
|
institution = d.get("institution", [{}])[0].get("name")
|
|
if institution == "bioRxiv":
|
|
# "biorxiv-neuroscience" over "neuroscience"
|
|
journal_part = institution + "-" + "journal_part"
|
|
if journal_part == "":
|
|
journal_part = "unknown"
|
|
journal_part = re.sub(r"[^\w]", "", journal_part) # keep alphanumerics
|
|
|
|
date_part = csl_publish_date(d).replace("-", "_") # pyright:ignore
|
|
|
|
return "_".join((author_part, journal_part, date_part)) + ".md"
|
|
|
|
|
|
@click.command()
@click.argument("input", type=click.File("rb"))
@click.argument("persons", type=click.File("rb"))
@click.argument("output", type=click.File("wt"))
@click.option("--extras", is_flag=True)
def main(input, persons, output, extras):
    """Complete publication records with metadata obtained for their DOI.

    INPUT and PERSONS are read as JSON lines (one record per line,
    blank lines are skipped), and one record per INPUT record is
    written to OUTPUT. Values already present in a record are kept;
    only missing ones (attributions, publishing process, title,
    description, rules) are added. With --extras, a citation text and
    a suggested file name are added too.
    """

    session = CachedSession(
        ".cache/requests-cache/http_cache",
        backend="sqlite",
        match_headers=["Accept"],
        expire_after=7200,
    )

    # blank lines are not records, and would fail to parse
    all_people = [json.loads(line) for line in persons if line.strip()]
    all_people_dict = remap_person_records(all_people)

    for line in input:
        if not line.strip():
            continue

        paper = json.loads(line)
        doi = process_doi(paper)
        citeproc_metadata = query_doi_csl(session, doi) if doi is not None else None
        citation_text = (
            query_doi_citation(session, doi) if doi is not None and extras else None
        )

        if citation_text is not None:
            paper["x_citation"] = citation_text

        if citeproc_metadata is None:
            # nothing to do, emit unchanged
            click.echo(json.dumps(paper), output)
            continue

        # contributors
        more_attributions = discover_authors(paper, all_people_dict, citeproc_metadata)
        if len(more_attributions) > 0:
            if "attributed_to" not in paper:
                paper["attributed_to"] = more_attributions
            else:
                paper["attributed_to"].extend(more_attributions)

        # publishing activity (date / ISSN)
        citeproc_pp = publishing_process(citeproc_metadata)
        activities = paper.get("generated_by", [])

        # find (the first) publishing process in publication
        pp_idx = next(
            (
                i
                for i, activity in enumerate(activities)
                if pid_of(activity.get("object")) == "obo:IAO_0000444"
            ),
            None,
        )

        # update publishing activity (date & issn)
        if citeproc_pp is not None:
            if "generated_by" not in paper:
                # no activities so far: add a list
                paper["generated_by"] = [citeproc_pp]
            elif pp_idx is None:
                # activities but no publishing process: append
                paper["generated_by"].append(citeproc_pp)
            else:
                # activities incl. publishing process: merge keeping original values
                paper["generated_by"][pp_idx] = (
                    citeproc_pp | paper["generated_by"][pp_idx]
                )
                # override date if is more precise in citeproc
                # (precision = number of "-" separated date components)
                if len(citeproc_pp.get("at_time", "").split("-")) > len(
                    paper["generated_by"][pp_idx].get("at_time", "").split("-")
                ):
                    paper["generated_by"][pp_idx]["at_time"] = citeproc_pp["at_time"]

        # title
        if paper.get("title") is None and citeproc_metadata.get("title") is not None:
            paper["title"] = citeproc_metadata.get("title")

        # abstract
        if (
            paper.get("description") is None
            and (citeproc_abstract := csl_abstract(citeproc_metadata)) is not None
        ):
            paper["description"] = citeproc_abstract

        # rules (licenses)
        if paper.get("rules") is None:
            citeproc_rules = rules(citeproc_metadata)
            if len(citeproc_rules) > 0:
                paper["rules"] = citeproc_rules

        # suggested output file name
        if extras and (sn := short_name_from_citeproc(citeproc_metadata)) is not None:
            paper["x_suggested_name"] = sn

        click.echo(json.dumps(paper), output)
|
|
|
|
|
|
# run the command line interface only when executed as a script
if __name__ == "__main__":
    main()
|