Commit 37152f70 authored by LE GAC Renaud's avatar LE GAC Renaud
Browse files

Delete CheckAndFix class

parent 85e065c7
""" harvest_tools.checkandfix
"""
import logging
import numpy as np
import re
from datetime import datetime
from gluon import current
from store_tools import (CheckException,
MSG_NO_CONF,
MSG_NO_THESIS,
OAI_URL,
RecordCdsConfPaper,
RecordCdsPubli,
RecordCdsThesis,
RecordHepConfPaper,
RecordHepPubli,
RecordHepThesis,
REG_OAI,
REG_YEAR,
search_synonym,
ToolException)
from store_tools.publicationinfomixin import PAPER_REFERENCE_KEYS
from plugin_dbui import CLEAN_SPACES, get_id, UNDEF_ID
# arXiv report number: the two 2-digit groups following "arXiv:" are captured
# (they are used as year and month by CheckAndFix._recover_submitted)
DECODE_ARXIV = re.compile(r"arXiv:(\d{2})(\d{2})\.")

# Decode submitted date: DD MMM YYYY or DD MM YYYY
DECODE_DD_MMM_YYYY = re.compile(r"(\d{1,2}) *([A-Za-z]{3}) *(\d{4})")
DECODE_DD_MM_YYYY = re.compile(r"(\d{1,2}) +(\d{1,2}) +(\d{4})")

# three-letter month abbreviation (English and French spelling)
# to two-digit month number
MONTHS = {
    "Jan": "01",
    "Feb": "02",
    "Fev": "02",
    "Mar": "03",
    "Apr": "04",
    "Avr": "04",
    "May": "05",
    "Mai": "05",
    "Jun": "06",
    "Jul": "07",
    "Aug": "08",
    "Sep": "09",
    "Oct": "10",
    "Nov": "11",
    "Dec": "12",
}

# messages attached to the exceptions raised by the checks
MSG_FAUTHOR_COLLABORATION = "Reject first author is a Collaboration"
MSG_NO_AUTHOR = "Reject no author(s)"
MSG_NO_CONF_DATE = "Reject no conference date"
MSG_NO_DATE = "Reject no submission date"
MSG_NO_MY_AUTHOR = "Reject no authors of my institute"
MSG_NO_REF = "Reject incomplete paper reference. Check "
MSG_TEMPORARY_RECORD = "Temporary record"
MSG_UNKNOWN_COLLABORATION = "Reject collaboration is unknown."
MSG_UNKNOWN_COUNTRY = "Reject country is unknown."
MSG_UNKNOWN_PUBLISHER = "Reject publisher is unknown."
MSG_WELL_FORMED_DATE = "Reject submission date is not well formed"

# conference dates within a single month, e.g. 12 - 15 Mar 2014
# groups: opening day, closing day, month, year
# (raw strings: "\d" is an invalid escape sequence in a plain literal)
REG_CONF_DATES_1 = re.compile(
    r"0?(\d+) *-? *0?(\d+) *([A-Z][a-z]{2})[a-z]* *(\d{4})")

# conference dates spanning two months, e.g. 29 Feb - 1 Mar 2014
# groups: opening day, opening month, closing day, closing month, year
REG_CONF_DATES_2 = re.compile(
    r"0?(\d+) *([A-Z][a-z]{2})[a-z]* *-? *"
    r"0?(\d+) *([A-Z][a-z]{2})[a-z]* *(\d{4})")

REG_DOI = re.compile(r"\d+\.\d+/([a-zA-Z]+)\.(\d+)\.(\w+)")

# layouts accepted as is by CheckAndFix.conference_date
REG_WELL_FORMED_CONF_DATES_1 = re.compile(
    r"\d{1,2}-\d{1,2} [A-Z][a-z]{2} \d{4}")

REG_WELL_FORMED_CONF_DATES_2 = re.compile(
    r"\d{1,2} [A-Z][a-z]{2} - \d{1,2} [A-Z][a-z]{2} \d{4}")

# prefix of the debug messages
T6 = " "*6

TMP_REC = "*Temporary record*"
UNIVERSITY = "University"
class CheckAndFix(object):
"""A collection of tools to check and repair the content of record.
"""
def __init__(self):
    """Bind the database and the logger, reset the private caches and
    build the regular expression for the affiliation of my institute.
    """
    self.db = current.db
    self.logger = logging.getLogger("web2py.app.limbra")

    # private cache for my authors list
    self._my_authors = {}

    # private cache for my_author rescue list
    # (slots read by _get_author_rescue_list)
    self.__reference = None
    self.__par = None

    # built last since it reads self.db
    self.reg_institute = self._get_reg_institute()
@staticmethod
def _get_conference_dates(record):
    """Return the opening and closing dates of a conference.

    The dates are read from the ``meeting_name`` field, either from the
    ``opening_date`` / ``closing_date`` subfields or by decoding the
    ``date`` subfield.

    Args:
        record (RecordCdsConfPaper):
            record describing a conference proceeding or talk.

    Returns:
        tuple of datetime.datetime:
            opening and closing dates.

    Raises:
        ToolException:
            no conference date found, including the cases where the
            ``date`` subfield is missing or cannot be decoded.
        ValueError:
            raised by ``datetime.strptime`` when the decoded values do
            not form a valid date.

    """
    if "meeting_name" not in record:
        raise ToolException(MSG_NO_CONF_DATE)

    meeting = record["meeting_name"]

    # several meetings: only the first one is considered
    if isinstance(meeting, list):
        if not meeting:
            raise ToolException(MSG_NO_CONF_DATE)
        meeting = meeting[0]

    # CDS has the opening and closing dates encoded as 20141231
    # (dashes are removed, therefore 2014-12-31 is accepted too)
    if "opening_date" in meeting and "closing_date" in meeting:
        opening = datetime.strptime(
            meeting["opening_date"].replace("-", ""), "%Y%m%d")
        closing = datetime.strptime(
            meeting["closing_date"].replace("-", ""), "%Y%m%d")
        return (opening, closing)

    # both CDS and INSPIRE have the dates subfield
    # a missing subfield is reported like an undecodable one
    if "date" not in meeting:
        raise ToolException(MSG_NO_CONF_DATE)

    val = meeting["date"]
    fmt = "%d-%b-%Y"

    # date is encoded as 12 - 15 Mar 2014
    m = REG_CONF_DATES_1.match(val)
    if m:
        day1, day2, month, year = m.groups()
        opening = datetime.strptime(f"{day1}-{month}-{year}", fmt)
        closing = datetime.strptime(f"{day2}-{month}-{year}", fmt)
        return (opening, closing)

    # dates are encoded 29 Feb - 1 Mar 2014
    m = REG_CONF_DATES_2.match(val)
    if m:
        day1, month1, day2, month2, year = m.groups()
        opening = datetime.strptime(f"{day1}-{month1}-{year}", fmt)
        closing = datetime.strptime(f"{day2}-{month2}-{year}", fmt)
        return (opening, closing)

    raise ToolException(MSG_NO_CONF_DATE)
def _get_reg_institute(self):
    r"""Get the regular expression defining the affiliation of my institute.

    It is obtained by concatenating the affiliation keys.
    Affiliation key can contain characters like ``(``, ``)`` or ``&``.
    Every character having a special meaning in a regular expression is
    escaped (``\(`` *etc*) so that a key is always matched literally.

    Returns:
        str:
            the value of ``current.app.reg_institute`` when it is set,
            otherwise the alternation built from the rows of the
            ``affiliation_keys`` table (empty string when there is none).

    """
    reg_institute = current.app.reg_institute
    if reg_institute:
        return reg_institute

    # alias
    db = self.db

    # regular expression for the affiliation keys:
    # - re.escape protects all special characters, not only a subset
    # - each key is anchored between the start / end of the string
    #   or a literal "|" for an exact match
    return r"|".join(
        r"(^|\|){}($|\|)".format(re.escape(row.key_u))
        for row in db(db.affiliation_keys.id > 0).iterselect())
def _get_author_rescue_list(self, record, id_project, id_team):
    """Get the rescue list for my authors.

    The year is taken from the submission date of the record. When it is
    not defined, it is recovered from the year of the paper or from the
    dates of the conference. The list is read from the ``my_authors``
    table for that year, project and team. The parameters and the result
    of the last lookup are cached.

    Args:
        record (RecordCdsPubli):
            record describing a publication.

        id_project (int):
            identifier of the project in the database.

        id_team (int):
            identifier of the team in the database.

    Returns:
        list:
            empty when not defined

    """
    year = record.submitted()

    # try to recover year when not defined
    if not year:
        paper_year = record.df_info.year.iloc[0]

        # published article, proceeding
        if paper_year != "":
            year = paper_year

        # date of a conference (cds.cern.ch)
        elif "meeting_name" in record:
            if record._get("meeting_name", "opening_date") != "":
                year = record._get("meeting_name", "opening_date")

            # end date of a conference
            elif record._get("meeting_name", "closing_date") != "":
                year = record._get("meeting_name", "closing_date")

            else:
                return []

        # date of a conference (inspirehep.net)
        elif "conference" in record:
            val = record.conference.get("opening_date", None)
            if val is None:
                return []
            year = val[:4]

    # nothing could be recovered
    if not year:
        return []

    #
    # protection
    # submitted and paper year are protect against erratum, but ...
    # keep the earliest value without sorting the list of the record
    # in place
    #
    if isinstance(year, list):
        year = min(year)

    # the value can have several format 1992, 1992-12-31, ....
    m = REG_YEAR.search(year)
    if not m:
        return []
    year = m.group(1)

    # caching: same parameters as the previous call
    key = (year, id_project, id_team)
    if key == self.__par:
        return self.__reference

    # extract the list from the database
    row = self.db.my_authors(year=year,
                             id_projects=id_project,
                             id_teams=id_team)

    self.__reference = row['authors'].strip("\n").split(', ') if row else []

    # remember the parameters, otherwise the cache is never hit
    self.__par = key

    return self.__reference
def _is_synonym(self, tablename, value):
"""Check that the synonym field contains *value*.
Args:
tablename (str): name of the database table
value (str): value to be searched
Returns:
bool: ``True`` if *one* row is found, ``False`` otherwise.
"""
query = self.db[tablename].synonyms.contains(value)
if db(query).count() == 1:
return True
return False
def _recover_submitted(self, record):
    """Rebuild the submission date from the conference, the thesis,
    the preprint number or, as a last resort, the creation date of
    the record.

    Args:
        record (RecordCdsPubli):
            record describing a publication.

    Returns:
        str:
            target at least YYYY-MM
            empty when procedure failed

    """
    submitted = ""

    # step 1: opening date of the conference or defense date of the thesis
    if isinstance(record, RecordCdsConfPaper):
        submitted = \
            self._get_conference_dates(record)[0].strftime("%Y-%m-%d")

    elif isinstance(record, RecordHepConfPaper):
        # NOTE(review): the key is "opening_data" while other methods
        # read "opening_date" -- possibly a typo, to be confirmed
        opening = record.get("opening_data", None)
        if opening is not None:
            submitted = opening.strftime("%Y-%m-%d")

    elif isinstance(record, (RecordHepThesis, RecordCdsThesis)):
        submitted = record.these_defense()

    # step 2: try with a preprint number
    # (an empty string is also shorter than YYYY-MM)
    if len(submitted) < 7:
        report = record.preprint_number()
        m_arxiv = DECODE_ARXIV.match(report) if report else None
        if m_arxiv:
            submitted = "20%s-%s" % (m_arxiv.group(1), m_arxiv.group(2))

    # step 3: last chance, use the creation date for the record
    if len(submitted) < 7:
        cds_classes = (RecordCdsConfPaper, RecordCdsPubli, RecordCdsThesis)
        if isinstance(record, cds_classes):
            submitted = record["creation_date"][0:7]
        else:
            submitted = record.get("legacy_creation_date", "")[0:7]

    return submitted
def authors(self, record):
    """Verify the authors of the record:

        * at least one author is defined.
        * the first author is not something like ATLAS Collaboration

    Args:
        record (RecordCdsPubli):
            record describing a publication.

    Raises:
        CheckException:
            when there is no authors or when the first author
            contains the word collaboration.

    """
    self.logger.debug(f"{T6}check authors")

    if not record.is_authors():
        raise CheckException(MSG_NO_AUTHOR)

    # case-insensitive search
    first_author = record.first_author().lower()
    if "collaboration" in first_author:
        raise CheckException(MSG_FAUTHOR_COLLABORATION)
def collaboration(self, record):
    """Replace a synonym of the collaboration by the value stored
    in the ``collaborations`` table.

    Args:
        record (RecordCdsPubli):
            record describing a publication.

    Raises:
        CheckException:
            * the collaboration is unknown (neither collaboration
              or synonym)
            * any ``ToolException`` raised while searching the synonym
              is re-raised as a ``CheckException``.

    """
    self.logger.debug(f"{T6}check collaboration")

    name = record.collaboration()
    if not name:
        return

    try:
        db = self.db

        dbid = search_synonym(db.collaborations, "collaboration", name)
        if dbid == UNDEF_ID:
            raise ToolException(MSG_UNKNOWN_COLLABORATION)

        # nothing to fix when the record already has the proper value
        proper = db.collaborations[dbid].collaboration
        if proper == name:
            return

        # cds.cern.ch
        if "corporate_name" in record:

            if isinstance(record["corporate_name"], dict):
                # one collaboration
                record["corporate_name"]["collaboration"] = proper

            else:
                # several collaboration
                # replace the list of dictionary by a single one
                record["corporate_name"] = {"collaboration": proper}

        # inspirehep.net
        elif "collaborations" in record:
            record["collaborations"] = [{"value": proper}]

    except ToolException as e:
        raise CheckException(*e.args)
def country(self, record):
    """Replace a synonym of the conference country by the value stored
    in the ``countries`` table. Only ``RecordCdsConfPaper`` records
    are processed.

    Args:
        record (RecordCdsPubli):
            record describing a publication.

    Raises:
        CheckException:
            * the country is not defined in the record
            * the country is unknown (neither country nor synonym)
            * any ``ToolException`` raised while searching the synonym
              is re-raised as a ``CheckException``.

    """
    self.logger.debug(f"{T6}check country")

    if not isinstance(record, RecordCdsConfPaper):
        return

    name = record.conference_country()
    if len(name) == 0:
        raise CheckException(MSG_UNKNOWN_COUNTRY)

    try:
        db = self.db

        dbid = search_synonym(db.countries, "country", name)
        if dbid == UNDEF_ID:
            raise ToolException(MSG_UNKNOWN_COUNTRY)

        # nothing to fix when the record already has the proper value
        proper = db.countries[dbid].country
        if proper == name:
            return

        meeting = record["meeting_name"]

        if isinstance(meeting, dict):
            # one meeting
            record["meeting_name"]["location"] = \
                meeting["location"].replace(name, proper)

        else:
            # several meetings: fix each of them having a location
            for item in meeting:
                if "location" in item:
                    item["location"] = \
                        item["location"].replace(name, proper)

            record["meeting_name"] = meeting

    except ToolException as e:
        raise CheckException(*e.args)
def conference_date(self, record):
    """Verify that the conference dates exist and rewrite them as
    ``12-15 Mar 2014`` or ``29 Feb - 1 Mar 2014`` when they use
    another layout. Only ``RecordCdsConfPaper`` records are processed.

    Args:
        record (RecordCdsConfPaper):
            record describing a talk or a proceeding.

    Raises:
        CheckException:
            dates are not found.

    """
    self.logger.debug(f"{T6}check conference date")

    # conference information are available, i.e proceeding
    if not isinstance(record, RecordCdsConfPaper):
        return

    dates = record.conference_dates()
    if len(dates) == 0:
        raise CheckException(MSG_NO_CONF_DATE)

    # nothing to do when the dates are well formed
    well_formed = (REG_WELL_FORMED_CONF_DATES_1,
                   REG_WELL_FORMED_CONF_DATES_2)
    if any(reg.match(dates) for reg in well_formed):
        return

    # format the date properly
    opening, closing = self._get_conference_dates(record)
    opening_month = opening.strftime("%b")

    if opening.month == closing.month:
        dates = "%i-%i %s %i" % (opening.day,
                                 closing.day,
                                 opening_month,
                                 opening.year)
    else:
        dates = "%i %s - %i %s %i" % (opening.day,
                                      opening_month,
                                      closing.day,
                                      closing.strftime("%b"),
                                      opening.year)

    # several meetings: the first one receives the new value
    meeting = record["meeting_name"]
    if isinstance(meeting, list):
        meeting = meeting[0]

    meeting["date"] = dates
def is_bad_oai_used(self, record):
    """Tell whether the database contains a publication registered
    with a bad OAI.

    Bad OAI is when the ``id`` in the OAI field is different from
    the ``record id``. This happens when an old record is redirected
    to new one.

    Args:
        record (RecordCdsPubli):
            record describing a publication.

    Returns:
        bool:
            ``True`` when a record is found in the database with
            the bad OAI.

    """
    self.logger.debug(f"{T6}check is bad oai used")

    # NOTE(review): match is None when the OAI does not follow REG_OAI,
    # the next line then raises AttributeError -- confirm that callers
    # guarantee a well formed OAI
    match = REG_OAI.match(record.oai())
    oai_id = match.group(2)

    # the OAI and the record agree
    if int(oai_id) == record.id():
        return False

    # a record with the bad OAI exists in the database
    bad_oai_url = OAI_URL % (match.group(1), oai_id)
    if get_id(self.db.publications, origin=bad_oai_url):
        return True

    return False
def is_oai(self, record):
    """``True`` when the OAI is defined in the record.

    Note:
        make sense only for record from cds.cern.ch or old.inspirehep.net.
        The answer is always ``True`` when the host of the record is
        ``inspirehep.net``.

    Args:
        record (RecordCdsPubli): record describing a publication.

    Returns:
        bool:
            ``True`` when the record has the ``oai`` field with the
            ``value`` subfield or the ``FIXME_OAI`` field with the
            ``id`` subfield.

    """
    self.logger.debug(f"{T6}check is oai")

    # make no sense for record from new inspirehep.net (March 2020)
    if record.host() == "inspirehep.net":
        return True

    # field / subfield depends on the store
    if "oai" in record and "value" in record["oai"]:
        return True

    return "FIXME_OAI" in record and "id" in record["FIXME_OAI"]
def format_authors(self, record, fmt="Last, First"):
    """Rewrite the author names of the record by delegating to
    ``record.reformat_authors``.

    Args:
        record (RecordCdsPubli):
            record describing a publication.

        fmt (str):
            layout of the author names.
            Possible values are ``First, Last``, ``F. Last``, ``Last``,
            ``Last, First`` and ``Last F.``

    """
    self.logger.debug(f"{T6}format authors")

    record.reformat_authors(fmt)
def format_editor(self, record):
"""Format the editor abbreviation.
The encoding depends on the store::
INVENIO: Phys. Lett. B + volume 673
INSPIREHEP: Phys.Lett + volume B673
Standardise the answer as ``Phys. Lett. B``.
Args:
record (RecordCdsPubli):
record describing a publication.
Raises:
CheckException:
when the editor is not well formed.
"""
self.logger.debug(f"{T6}format editor")
if not record.is_published():
return
df = record.df_info.iloc[0]
editor = df.title