""" harvest_tools.automaton

"""
import logging
import re

from gluon import current
from gluon.storage import Storage
from plugin_dbui import CALLBACK_ERRORS, get_id
from store_tools import (build_store,
                         OAI,
                         OAI_URL,
                         StoreException,
                         ToolException)
from store_tools.factory import build_record

from .base import get_rex_institute, MSG_FIX_ORIGIN, MSG_IN_DB
from .msg import Msg
from .msgcollection import MsgCollection

# Messages raised when a mandatory selector value is missing.
MSG_NO_CAT = 'Select a "category" !!!'
MSG_NO_PROJECT = 'Select a "project" !!!'
MSG_NO_TEAM = 'Select a "team" !!!'

# Default message logged when the insert is refused without explanation.
MSG_INSERT_FAIL = "Fail to insert the new record in the database."

# Template of the OAI identifier: host (str) and record identifier (int).
# NOTE: this assignment shadows the ``OAI`` name imported from store_tools.
OAI = "oai:%s:%i"

# Indentation prefixes used to align the messages sent to the logger.
T2 = " " * 2
T4 = " " * 4
T6 = " " * 6


class Automaton(object):
    """Base class to search and process publications:

        * Decode the selector defining user criteria.
        * Search in the store publications matching user criteria.
        * Instantiate the record and check it.
        * Insert new records in the database.

    Note:
        The parameters of the search are defined by the current ``request``.

    The logic implemented in the ``Automaton`` class is the following:

        #. Ask the store for all the `record_id` satisfying the user request.
        #. Reject `record_id` contained in the *origin* field of a
           database entry.
        #. Request to the store, the JSON description of the publications
           and decode them.
        #. Reject the record for which the *secondary_oai_url* is contained in
           the *origin* field of a database entry. Update the *origin* field
           of the database record.
        #. Check that the *oai* of the publication is defined and well formed.
           Recover it, if it is not the case. At this stage the OAI is always
           defined.
        #. Reject temporarily publication.
        #. Check that *authors* are defined.
           Reject the publication if it is not the case.
        #. Check that *my institute* is in the list of the institutes
           signing the publication. Reject the publication if it is
           not the case. When the affiliations are not defined,
           try to recover this case, by finding the author of my institute
           signing the publication. This recovery procedure uses
           the *author rescue list*. Reject the record when the recovery
           procedure failed.
        #. Check that the *collaboration*, if defined, is well formed.
           Reject the publication if it is not the case.
        #. Several checks are applied depending on the publication type.
        #. At the end of this process, the publisher, the authors are
           formatted and the list of signatories of my institute extracted.

    Args:
        db (gluon.DAL):
            the database connection.

        id_team (int):
            the identifier of the team in the database.

        id_project (int):
            the identifier of the project in the database.

        automaton (str):
            the name of the automaton which will be used to process the data.
            Possible values are: ``articles``, ``notes``, ``preprints``,
            ``proceedings``, ``reports``, ``talks`` and ``theses``.

        id_category (int):
            the identifier of the category of publication

        year_start (int):
            starting year for the scan

        year_end (int):
            ending year of the scan

        dry_run (bool):
            new records are not inserted in the database when ``True``.

    Raises:
        ToolException:
            * team or project or the publication category not defined

    """

    def __init__(self,
                 db,
                 id_team,
                 id_project,
                 automaton,
                 id_category,
                 year_start=None,
                 year_end=None,
                 dry_run=True):

        # protection team, project and/or category have to be defined
        if not id_team:
            raise ToolException(MSG_NO_TEAM)

        if not id_project:
            raise ToolException(MSG_NO_PROJECT)

        if not id_category:
            raise ToolException(MSG_NO_CAT)

        self.collection_logs = []
        self.controller = automaton
        self.db = db
        self.dry_run = dry_run
        self.id_category = id_category
        self.id_team = id_team
        self.id_project = id_project
        self.logs = []
        self.logger = logging.getLogger("web2py.app.limbra")
        self.rex_institute = get_rex_institute(db, current.app)
        self.store = None
        self.year_start = year_start
        self.year_end = year_end

        # Construct harvester Storage needed for the log
        self.harvester = Storage(id_teams=id_team,
                                 id_projects=id_project,
                                 controller=automaton,
                                 id_categories=id_category)

        # Identifier of the categories preprint and articles
        # Used by the method _is_record_in_db
        self._id_preprint = get_id(db.categories, code="PRE")
        self._id_article = get_id(db.categories, code="ACL")

        # Keep track of the shelf for inspirehep.net
        self.shelf = None

    def _insert_in_db(self, log_year="", **fields):
        """Insert the record in the database, handling database exception.

        Note:
            The outcome of a failed insert is reported through the last
            entry of ``self.logs`` which is marked as rejected.

        Args:
            log_year (str): year of the record for the log

        Keyword Args:
            **fields:
                keyword arguments defining the record values to be
                inserted in the database.

        Returns:
            int:
                one when the record is inserted in the database,
                zero otherwise.

        """
        db = self.db

        try:
            rec_id = db.publications.insert(**fields)
            if rec_id:
                return 1

            # operation can be rejected by callback table._before_insert
            msg = MSG_INSERT_FAIL
            if CALLBACK_ERRORS in db.publications:
                msg = db.publications._callback_errors

            # reduce the error message to its first and last elements
            if isinstance(msg, list):
                msg = "%s %s" % (msg[0], msg[-1])

            self.logs[-1].reject(msg, year=log_year)
            return 0

        # operation can be rejected by the database.
        # The broad catch is deliberate: the harvester must keep running
        # and the reason of the failure is kept in the log.
        except Exception as dbe:
            self.logs[-1].reject(str(dbe), year=log_year)
            return 0

    def _is_record_in_db(self,
                         collection_title,
                         host=None,
                         rec_id=None,
                         oai_url=None):
        """Return the database identifier when the publication is registered.
        The search is based on the ``origin`` field and on the primary OAI.

        Note:
            A new log entry is created when a record is found.

        Args:
            collection_title (str):
                the title of the collection, used for the log entry.

        Keyword Args:
            host (str):
                the store. possible values are ``cds.cern.ch`` or
                ``inspirehep.net``. To be used with *rec_id*.

            rec_id (int):
                the record identifier in the store

            oai_url (str):
                the URL of the record in the store.
                Either use *host* and *rec_id* or *oai_url*

        Returns:
            int:
                the id of the record in the database when a record is found,
                0 otherwise. Zero is also returned when the database entry
                is a preprint and the harvester looks for articles.

        Raises:
            ValueError:
                * keyword arguments are not defined properly.

        """
        db = self.db
        harvester = self.harvester

        # build the OAI URL
        if host is not None and rec_id is not None and oai_url is None:
            url = OAI_URL % (host, rec_id)
        elif host is None and rec_id is None and oai_url is not None:
            url = oai_url
        else:
            raise ValueError(
                "use either 'host' and 'rec_id' or 'oai_url'")

        # protection empty URL
        if not url:
            return 0

        # check the OAI: a single select is enough since
        # Rows.first() returns None when nothing matches
        columns = [db.publications.id,
                   db.publications.id_categories,
                   db.publications.title,
                   db.publications.year]
        query = db.publications.origin.contains(url)
        publication = db(query).select(*columns).first()

        if publication is None:
            return 0

        # Note:
        # The category for the publication and the harvester have to be equal.
        # However, keep the record if it is a preprint when the harvester
        # looks for articles. This is required to transform a preprint
        # into article
        #
        # Category can disagree when the publication is an article and
        # the harvester look for preprint. In that case, keep the article
        #
        is_preprint_to_article = (
            publication.id_categories != harvester.id_categories
            and publication.id_categories == self._id_preprint
            and harvester.id_categories == self._id_article)

        if is_preprint_to_article:
            return 0

        # log
        self.logs.append(Msg(harvester=harvester,
                             collection=collection_title,
                             record_id=rec_id,
                             title=publication.title))

        self.logs[-1].idle(MSG_IN_DB, publication.year)

        logger = self.logger
        logger.info(f"{T2}record {rec_id} in db with id {publication.id}")

        return publication.id

    def check_record(self, record):
        """Check the content of the record in order to fix non-conformities.
        Return ``False`` when non-conformities are found and can not be
        corrected.

        Note:
            To be implemented by inherited classes.
            The base implementation always returns ``False``.

        Args:
            record (Record):
                JSON record describing the publication.

        Returns:
            bool:
                ``False`` when a non-conformity is found and can not be
                corrected.

        """
        return False

    def get_record_by_fields(self, oai_url, year, **kwargs):
        """Get database record matching fields values defined
        in the keyword arguments.

        Note:
            This method is required to deal with publication entered by hand
            and found later by an harvester.

        Args:
            oai_url (str):
                the oai_url, *e.g.* ``http://cds.cern.ch/record/123456``.
                The origin field of the existing database record is updated
                to **oai_url** when a match is found, except in dry run mode.

            year (int):
                the year of the publication. It is used
                by the search algorithm and by the logger.

        Keyword Args:
            kwargs (str):
                 a series of key, value pair where the key is the name of a
                 publications database field.

        Returns:
            tuple:
                ``(id, status)`` which contains the ``id`` of the record.
                The ``id`` is equal to ``None`` when there is no matching.
                The ``status`` is equal to one when the existing record was
                modified zero otherwise.

        """
        self.logger.debug(f"{T6}get existing record by fields")

        # alias
        db = self.db
        logs = self.logs

        # add the publication year to search criteria
        if year:
            kwargs["year"] = year

        # look for an existing record
        rec_id = get_id(db.publications, **kwargs)
        if not rec_id:
            return (None, 0)

        # fix origin field
        publication = db.publications[rec_id]
        ok = publication.origin and publication.origin == oai_url
        if not ok:
            if not self.dry_run:
                # write the new origin in the database.
                # Rebinding a local variable would leave the record as is.
                db.publications[rec_id] = dict(origin=oai_url)

            logs[-1].modify(MSG_FIX_ORIGIN, year)
            return (rec_id, 1)

        logs[-1].idle(MSG_IN_DB, year)
        return (rec_id, 0)

    def insert_record(self, record):
        """Insert the record in the database.

        Note:
            This method depend on the type of publications.
            It has to be implemented for each inherited class.
            The base implementation inserts nothing and returns zero.

        Args:
            record (Record):
                record describing the publication.

        Returns:
            int:
                one when the record is inserted / updated in the database,
                zero otherwise.

        """
        return 0

    def process_collection(self, collection):
        """Retrieve JSON objects from the invenio store and for the given
        collection. Corresponding records are inserted in the database.

        Args:
            collection (str):
                name of the collection to be interrogated.

        Note:
            * Design to never stop although exceptions are raised
            * Have a look to the attributes ``collection_logs`` and ``logs``
              in order to understand what happen.

        """
        logger = self.logger
        logger.info(f"process collection {collection}")

        # alias
        collection_logs = self.collection_logs
        controller = self.controller
        host = self.harvester.host
        project = self.db.projects[self.id_project].project
        store = self.store

        # log collection information
        # A collection is identified as "Project Controller collection"
        ctitle = "%s / %s / %s" % (project, controller, collection)
        collection_logs.append(MsgCollection(title=ctitle))

        # get search parameters for the collection including user criteria
        kwargs = store.search_parameters(collection,
                                         year_start=self.year_start,
                                         year_end=self.year_end)

        logger.debug(f"search parameters {kwargs}")

        # get the list of record identifier matching the search criteria
        try:
            rec_ids = store.get_ids(**kwargs)

        except StoreException as error:
            logger.info(f"exit process_collection: {error}")
            collection_logs[-1].url = store.last_search_url()
            collection_logs[-1].error = error
            return

        # log the number of record found for the collection
        collection_logs[-1].url = store.last_search_url()
        collection_logs[-1].found = len(rec_ids)

        if len(rec_ids) == 0:
            logger.info(f"no records found in {collection}")
            return

        logger.info(f"{len(rec_ids)} records found in {collection}")
        logger.info("")

        # remove from the list identifiers already registered in the
        # database and log them. The whole list is filtered before any
        # record is processed in order to keep the order of the log entries.
        is_in_db = self._is_record_in_db
        pending = [el for el in rec_ids if is_in_db(ctitle, host, el) == 0]

        # process the remaining identifiers
        for rec_id in pending:
            self.process_recid(rec_id)

    def process_recjson(self, recjson):
        """Process the publication provided as a JSON record:

            * instantiate the record (RecordPubli, RecordConf, RecordThesis)
            * check the record
            * insert new record in the database

        Args:
            recjson (dict):
                record provided by the store.

        """
        logger = self.logger
        logger.info(f"{T4}process record (process_recjson)")

        collection_logs = self.collection_logs
        harvester = self.harvester
        logs = self.logs

        # instantiate the record
        record = build_record(recjson, shelf=self.shelf)
        logger.debug(f"{T4}{record.title()[:72]}")

        # start the log for the record
        logs.append(Msg(harvester=harvester,
                        collection=collection_logs[-1].title,
                        origin=record.oai(),
                        record_id=record.id(),
                        title=record.title()))

        # check that the record is well formed
        # repair non-conformity as far as possible
        if not self.check_record(record):
            logger.info(f"{T4}{logs[-1].txt}")
            return

        txt = ("(dry run)" if self.dry_run else "")
        logger.debug(f"{T4}insert record in the database {txt}")

        # insert the record in the database
        self.insert_record(record)

        # the summary is only built when it is going to be emitted
        if logger.getEffectiveLevel() <= logging.INFO:
            log = logs[-1]
            action = log.action
            action = (action.upper() if isinstance(action, str) else action)
            logger.info(f"{T4}log: {action} {log.txt}")

    def process_recid(self, rec_id):
        """Process the publication identified by its record identifier:

            * get the publication data from the store using its identifier
            * instantiate the record: ``RecordPubli``, ``RecordConf``
              or ``RecordThesis``
            * process OAI data
            * check the record
            * insert new record in the database

        Note:
            * Design to never stop although exception are raised
            * Have a look to the attribute ``collection_logs`` and ``logs`` in
              order to understand what happen.

        Args:
            rec_id (int):
                identifier of the publication in the store.

        """
        logger = self.logger
        logger.info("")
        logger.info(f"{T2}get record {rec_id} (process_recid)")

        collection_logs = self.collection_logs
        harvester = self.harvester
        logs = self.logs

        try:
            recjson = self.store.get_record(rec_id)
            self.process_recjson(recjson)

        # The broad catch is deliberate: a faulty record is logged as
        # rejected and the harvester moves to the next one.
        except Exception as e:
            logger.debug(f"{T2}{str(e)}")
            url = OAI_URL % (harvester.host, rec_id)
            logs.append(Msg(harvester=harvester,
                            collection=collection_logs[-1].title,
                            origin=OAI % (harvester.host, rec_id),
                            record_id=rec_id,
                            title=url))
            logs[-1].reject(e)
            return

    def process_url(self, host, collections):
        """Retrieve JSON objects from the invenio store and
        insert corresponding records in the database.

        Note:
            * Design to never stop although exceptions are raised
            * Have a look to the attributes ``collection_logs`` and ``logs``
              in order to understand what happen.

        Args:
            host (str):
                host name to query for publications, either
                ``cds.cern.ch`` or ``inspirehep.net``.

            collections (str):
                list of collection to be interrogated.
                Collections are separated by a comma.

        """
        code = self.db.categories[self.id_category].code

        self.logger.info("")
        self.logger.info(f"process URL -- {host} -- {collections} -- {code}")

        # extend harvester for logs
        self.harvester.host = host
        self.harvester.collections = collections

        # instantiate the store
        self.shelf = ("literature" if host == "inspirehep.net" else None)
        self.store = build_store(host, shelf=self.shelf)

        # list of collections, dropping the spaces around the commas
        names = re.split(r" *, *", collections)

        # process
        for name in names:
            self.process_collection(name)

    def report(self):
        """Build the processing report.

        Returns:
            dict:
                * ``collection_logs`` list of :class:`MsgCollection`
                * ``controller`` str
                * ``logs`` list of :class:`Msg`

        """
        return dict(collection_logs=self.collection_logs,
                    controller=self.controller,
                    logs=self.logs)