automaton.py 19.4 KB
Newer Older
1
# -*- coding: utf-8 -*-
2
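""" harvest_tools.automaton

Base class to search publications in an invenio store (cds.cern.ch or
inspirehep.net), check them and insert them in the database.

"""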
import re
import traceback

8

9
from base import MSG_FIX_ORIGIN, MSG_IN_DB, ToolException
10
from gluon.storage import Storage
11 12 13
from invenio_tools import (CheckAndFix,
                           InvenioStore,
                           Marc12,
14
                           OAI_URL)
15 16
from msg import Msg
from msgcollection import MsgCollection
17
from plugin_dbui import CALLBACK_ERRORS, get_create_id, get_id, UNDEF_ID


# messages raised when the harvester parameters are incomplete
MSG_NO_CAT = 'Select a "category" !!!'
MSG_NO_PROJECT = 'Select a "project" !!!'
MSG_NO_TEAM = 'Select a "team" !!!'

# messages used to reject a record
MSG_NSERT_FAIL = "Fail to insert the new record in the database."
MSG_NO_OAI = "Reject no OAI identifier"
MSG_WELL_FORM_OAI = "Reject OAI is not well formed"


class Automaton(object):
    """Base class to search and process publications:

        * Decode the selector defining user criteria.
        * Search for publications in the store, according to
          user criteria.
        * Decode the XML string returned by the store.
        * Insert new records in the database.

    Note:
        The parameters of the search are defined by the current ``request``.

    The logic implemented in the ``Automaton`` class is the following:

        #. Ask the store for all the `record_id` satisfying the user request.
        #. Reject `record_id` matching the `origin` field of a database entry.
        #. Request from the store the XML description of the publication
           and decode it.
        #. Check that the *oai* of the publication is defined and well formed.
           Recover it, if it is not the case. From time to time, the `id`
           encoded in the `oai` field is different from the `record_id`.
           This happens when an old record is redirected to a new one
           for obscure reasons. The record is ignored if a database entry
           is found with the bad OAI.
        #. Reject temporary publications.
        #. Check that *authors* are defined.
           Reject the publication if it is not the case.
        #. Check that *my institute* is in the list of the institutes
           signing the publication. Reject the publication if it is
           not the case. When the affiliations are not defined,
           try to recover this case, by finding the author of my institute
           signing the publication. This recovery procedure uses
           the *author rescue list*. Reject the record when the recovery
           procedure failed.
        #. Check that the *collaboration*, if defined, is well formed.
           Reject the publication if it is not the case.
        #. Several checks are applied depending on the publication type.
        #. At the end of this process, the publisher and the authors are
           formatted and the list of signatories of my institute extracted.

    Args:
        db (gluon.DAL): the database connection.
        id_team (int): the identifier of the team in the database.
        id_project (int): the identifier of the project in the database.
        controller (unicode): the name of the automaton which
            will be used to process the data. Possible values are:
            ``articles``, ``notes``, ``preprints``, ``proceedings``,
            ``reports``, ``talks`` and ``theses``.
        id_category (int): the identifier of the category of publication
        year_start (int): starting year for the scan
        year_end (int): ending year of the scan
        dry_run (bool): new records are not inserted in the database
            when ``True``.
        debug (bool): activate the verbose mode when ``True``.

    Raises:
        ToolException: when the team or the project or the publication category
            is not defined.

    """
    def __init__(self,
                 db,
                 id_team,
                 id_project,
                 controller,
                 id_category,
                 year_start=None,
                 year_end=None,
                 dry_run=True,
                 debug=False):

        self.collection_logs = []
        self.db = db
        self.id_team = id_team
        self.id_project = id_project
        self.controller = controller
        self.id_category = id_category
        self.year_start = year_start
        self.year_end = year_end
        self.dry_run = dry_run
        self.dbg = debug
        self.logs = []

        self.check = CheckAndFix()
        self.marc12 = Marc12()

        # check parameters
        # protection team, project and/or category have to be defined
        if not self.id_team:
            raise ToolException(MSG_NO_TEAM)

        if not self.id_project:
            raise ToolException(MSG_NO_PROJECT)

        if not self.id_category:
            raise ToolException(MSG_NO_CAT)

        # Construct harvester Storage needed for the log
        self.harvester = Storage(id_teams=self.id_team,
                                 id_projects=self.id_project,
                                 controller=self.controller,
                                 id_categories=self.id_category)

    def _insert_in_db(self, log_year="", **fields):
        """Insert the record in the database, handling database exception.

        Args:
            log_year (unicode): year of the record for the log

        Keyword Args:
            **fields: keyword arguments defining the record values to be
                inserted in the database.

        Returns:
            int: one when the record is inserted / updated in the database,
                zero otherwise.

        """
        db = self.db

        try:
            rec_id = db.publications.insert(**fields)

        # operation can be rejected by the database
        except Exception as dbe:
            # log the exception itself, as check_record and process_url do:
            # ``dbe.message`` is empty for exceptions built with several
            # arguments, which is typical of DB-API errors
            self.logs[-1].reject(dbe, log_year)
            return 0

        if rec_id:
            return 1

        # operation can be rejected by callbacks table._before_insert
        msg = MSG_NSERT_FAIL
        if CALLBACK_ERRORS in db.publications:
            # read back the attribute which was just tested
            msg = getattr(db.publications, CALLBACK_ERRORS)

        # reduce the error message (an empty list keeps the default one)
        if isinstance(msg, list):
            msg = "%s %s" % (msg[0], msg[-1]) if msg else MSG_NSERT_FAIL

        self.logs[-1].reject(msg, log_year)
        return 0

    def _is_record_in_db(self, rec_id, title):
        """Return ``True`` when the record is already in the database.
        The search is based on the origin field.

        Note:
            A new log entry is created when a record is found.

        Args:
            rec_id (int): record identifier
            title (unicode): title of the collection

        Returns:
            bool: ``True`` when a record is found, ``False`` otherwise.

        """
        db = self.db
        harvester = self.harvester

        # check
        url = OAI_URL % (harvester.host, rec_id)
        db_id = get_id(db.publications, origin=url)

        if db_id is None:
            return False

        publication = db.publications[db_id]

        # same category for the publication and the harvester
        # keep the record if it is not the case
        # this is required to transform a preprint into article
        if publication.id_categories != harvester.id_categories:
            return False

        # log
        self.logs.append(Msg(harvester=harvester,
                             collection=title,
                             record_id=rec_id,
                             title=publication.title))

        self.logs[-1].idle(MSG_IN_DB, publication.year)

        return True

    def _search_parameters(self, collection):
        """Build the keywords to steer the URL search in invenio store.
        The main parameter is the collection and the date range defined
        in the selector.

        Args:
            collection (unicode): string defining the collection in the
                store. The syntax depends on the invenio store:

                    * ``"find cn d0 and tc p and not tc c"``
                    * ``"LHCb Papers"``.

        Returns:
            dict: the keys are a sub-set of those defined in
                :py:meth:invenio_tools.InvenioStore.get_ids.
                When neither ``year_start`` nor ``year_end`` is defined,
                no year criterion is added.

        """
        year_start, year_end = self.year_start, self.year_end

        # INSPIREHEP store
        if collection.startswith('find'):

            query = collection

            if year_start and not year_end:
                query += " and date %s" % year_start

            elif not year_start and year_end:
                query += " and date %s" % year_end

            elif year_start and year_end:
                query += " and date > %s and date < %s " \
                         % (year_start - 1, year_end + 1)

            return dict(p=query,  # query à la spires
                        rg=1000,  # maximum number of records returned
                        sf='year',  # sort by date
                        so='d')  # descending order

        # CERN INVENIO store
        dic = dict(cc=collection,  # collection
                   sf='year',  # sort by date
                   so='d')  # descending order

        rex = None
        if year_start and not year_end:
            rex = year_start

        elif not year_start and year_end:
            rex = year_end

        elif year_start and year_end:
            years = range(year_start, year_end + 1)
            rex = '|'.join(str(year) for year in years)

        # without any year the whole collection is scanned
        # (previously ``rex`` was unbound and the method crashed)
        if rex is not None:
            dic.update(f1='year',  # search on year
                       m1='r',  # use regular expression
                       p1=rex)  # regular expression defining year

        return dic

    def check_record(self, record):
        """Check the content of the record in order to fix non-conformities.
        Return ``False`` when non-conformities are found and can not be
        corrected.

        Note:
            Some checks depend on the type of publications and have to be
            implemented in inherited class.

        Note:
            The order of the checks matter. It should be oai,
            temporary record, authors, my authors and then a series of checks
            specific to the publication type.

        Args:
            record (Record): Marc12 record describing the publication.

        Returns:
            bool: ``False`` when a non-conformity is found and can not be
                corrected.

        """
        if self.dbg:
            print("check record")

        try:
            self.check.recover_oai(record, self.harvester.host)

            if self.check.is_bad_oai_used(record):
                self.logs[-1].idle(MSG_IN_DB, record.year())
                return False

            self.check.temporary_record(record)
            self.check.authors(record)
            self.check.my_affiliation(record, self.id_project, self.id_team)
            self.check.collaboration(record)

        except Exception as e:
            self.logs[-1].reject(e, record.year())
            return False

        return True

    def get_record_by_fields(self, **kwargs):
        """Get database record matching fields values defined
        in the keyword arguments.

        Args:
            oai_url (unicode): *e.g* ``"http://cds.cern.ch/record/123456"``
            year (int): the year of the publication.
            **kwargs: other field values used to look for the record.

        Note:
            Fix the field origin when a match is found.

        Note:
            All keyword arguments except ``oai_url`` (the year included)
            are used to search the record. The year is also used by
            the logger.

        Returns:
            tuple: ``(id, status)`` which contains the ``id`` of the record.
            The ``id`` is equal to ``None`` when there is no matching.
            The ``status`` is equal to one when the origin of the existing
            record has to be fixed (even in dry run mode, where the database
            is not touched) zero otherwise.

        """
        if self.dbg:
            print("get existing record by fields")

        db = self.db

        # origin can't be used for the search
        oai_url = kwargs.pop("oai_url")

        # look for an existing record
        rec_id = get_id(db.publications, **kwargs)
        if not rec_id:
            return (None, 0)

        year = kwargs["year"]

        # the origin is correct: nothing to do
        origin = db.publications[rec_id].origin
        if origin and origin == oai_url:
            self.logs[-1].idle(MSG_IN_DB, year)
            return (rec_id, 0)

        # fix origin field
        if not self.dry_run:
            db.publications[rec_id] = dict(origin=oai_url)

        self.logs[-1].modify(MSG_FIX_ORIGIN, year)
        return (rec_id, 1)

    def get_create_collaboration(self, value):
        """Get the database collaboration identifier.
        Create it, if it does not exist.

        Args:
            value (unicode): the name of the collaboration.

        Returns:
            int: the id of the collaboration.
                ``UNDEF_ID`` when ``value`` is empty.

        """
        if not value:
            return UNDEF_ID

        return get_create_id(self.db.collaborations, collaboration=value)

    def get_create_publisher(self, value):
        """Get the database publisher identifier.
        Create it, if it does not exist.

        Args:
            value (unicode): the abbreviation of the publisher name.

        Returns:
            int: the id of the publisher.
                ``UNDEF_ID`` when ``value`` is empty.

        """
        if not value:
            return UNDEF_ID

        return get_create_id(self.db.publishers, abbreviation=value)

    def insert_record(self, record):
        """Insert the record in the database.

        Note:
            This method depends on the type of publications.
            It has to be implemented for each inherited class.

        Args:
            record (Record): Marc12 record describing the publication.

        Returns:
            int: one when the record is inserted / updated in the database,
                zero otherwise.

        """
        return 0

    def process_xml(self, xml):
        """Decode the XML string and insert the corresponding records
        in the database.

        Args:
            xml (unicode): XML string encoding the publication records.
                The format follows the MARC12 standard.

        Raises:
           ToolException: when project, team or category identifier
              are not defined.
           StoreException: when something goes wrong interrogating the
              store.
           Marc12Exception: when something goes wrong decoding the XML
              string return by the store.
           CheckException: when the record has non-conformities.
           Exception: when the python code crashes.

        """
        if self.dbg:
            print("start processing %s" % self.__class__.__name__)
            print("decode request")
            print("get harvest parameters")

        # decode the XML request
        self.collection_logs.append(MsgCollection(found=1))
        self.decode_xml(xml)

    def process_url(self, host, collections):
        """Retrieve the XML string from the invenio store and
        insert corresponding records in the database.

        Args:
            host (unicode): host name to query for publications, either
                ``cds.cern.ch`` or ``inspirehep.net``.
            collections (unicode): comma separated list of collections
                to be interrogated. Blanks around the names are ignored
                as well as empty names.

        Raises:
           StoreException: when something goes wrong interrogating the
              store.
           Marc12Exception: when something goes wrong decoding the XML
              string return by the store.
           CheckException: when the record has non-conformities.
           Exception: when the python code crashes.

        """
        if self.dbg:
            print("process URL search")

        # extend harvester for logs
        self.harvester.host = host
        self.harvester.collections = collections

        store = InvenioStore(host)

        # list of collections: strip every name so that the INSPIRE
        # prefix "find" is detected and skip empty names which would
        # otherwise trigger a search without collection
        names = [name.strip() for name in collections.split(',')]
        names = [name for name in names if name]

        # alias
        controller = self.controller
        project = self.db.projects[self.id_project].project

        # extract the list of publications from the store for each collection
        # the search is perform on a range of creation date
        # if not defined all element are return
        #
        # The method use here minimize the memory usage
        # on the server as well as on the client side

        for collection in names:

            # log collection information
            # A collection is identified as "Project Controller collection"
            title = "%s / %s / %s" % (project, controller, collection)
            self.collection_logs.append(MsgCollection(title=title))
            collection_log = self.collection_logs[-1]

            # search record in the harvester repository
            kwargs = self._search_parameters(collection)

            try:
                rec_ids = store.get_ids(**kwargs)

            except Exception as error:
                collection_log.url = store.last_search_url()
                collection_log.error = error
                continue

            collection_log.url = store.last_search_url()
            collection_log.found = len(rec_ids)

            if not rec_ids:
                continue

            if self.dbg:
                print('%i records found in %s' % (len(rec_ids), collection))

            for rec_id in rec_ids:

                if self.dbg:
                    print("\nprocessing record %s" % rec_id)

                try:
                    if self._is_record_in_db(rec_id, title):
                        continue

                    xml = store.get_record(rec_id)
                    self.decode_xml(xml)

                except Exception as e:
                    print(traceback.format_exc())
                    url = OAI_URL % (host, rec_id)
                    self.logs.append(Msg(harvester=self.harvester,
                                         collection=title,
                                         record_id=rec_id,
                                         title=url))
                    self.logs[-1].reject(e)

    def decode_xml(self, xml):
        """Decode the MARC XML string and insert records in the database.

        Args:
            xml (unicode): MARC XML string

        """
        if self.dbg:
            print("process xml record")

        # NOTE
        # BaseException and inherited class
        # are caught by the previous stage
        records = self.marc12(xml)

        # process individual record
        for record in records:

            if self.dbg:
                print("record decoded")

            # start the log for the record
            self.logs.append(Msg(harvester=self.harvester,
                                 collection=self.collection_logs[-1].title,
                                 record_id=record.id(),
                                 title=record.title()))

            # check that the record is well formed
            # repair non-conformity as far as possible
            if not self.check_record(record):
                continue

            if self.dbg:
                print("insert record in the database")

            # insert the record in the database
            self.insert_record(record)

            if self.dbg:
                print("%s %s" % (self.logs[-1].action.upper(),
                                 self.logs[-1].txt))

    def report(self):
        """Build the processing report.

        Returns:
            dict:
                * ``collection_logs`` list of :class:MsgCollection
                * ``controller`` unicode
                * ``logs`` list of :class:Msg

        """
        return dict(collection_logs=self.collection_logs,
                    controller=self.controller,
                    logs=self.logs)