# -*- coding: utf-8 -*-
""" harvest_tools.automaton

Base automaton harvesting publications from an invenio store, plus the
``search_synonym`` helper resolving a value through the *synonyms* field.

"""
import re
import traceback

8

9
from base import MSG_FIX_ORIGIN, MSG_IN_DB, ToolException
10
from gluon.storage import Storage
11 12 13
from invenio_tools import (CheckAndFix,
                           InvenioStore,
                           Marc12,
14
                           OAI_URL)
15 16
from msg import Msg
from msgcollection import MsgCollection
17
from plugin_dbui import CALLBACK_ERRORS, get_id, UNDEF_ID
18

19

20 21 22
# messages used when the team, the project or the category is missing
MSG_NO_CAT = 'Select a "category" !!!'
MSG_NO_PROJECT = 'Select a "project" !!!'
MSG_NO_TEAM = 'Select a "team" !!!'

# messages explaining why a record is rejected
MSG_TOOMANY_SYNONYM = "Reject too many %s synonyms."
# NOTE: the constant name (without the leading "I") is kept as it is,
# since it is referenced under that name.
MSG_NSERT_FAIL = "Fail to insert the new record in the database."
MSG_NO_OAI = "Reject no OAI identifier"
MSG_WELL_FORM_OAI = "Reject OAI is not well formed"
28

29

30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84
def search_synonym(table, fieldname, value, create=False):
    """Get the database identifier for the record having the database field
    or the synonyms field matching the value.

    The search is performed in two steps: first an exact match on
    ``fieldname`` and then, when nothing is found, a search of the value
    in the *synonyms* field.

    Note:
        The database table must have a field name *synonyms*.
        It is a string containing values separated by a comma.

    Args:
        table (gluon.DAL.Table): database table.
        fieldname (unicode): field of the database table
            identified by its name.
        value (unicode): value to be matched.
        create (bool): create a new entry in the database table when
            it is ``True`` and when neither the field nor the synonyms
            match the value.

    Returns:
        int:
            * the id of the database record.
            * UNDEF_ID if value is not defined.

    Raises:
        ToolException:
            * when more than one synonym is found.
            * when nothing is found and ``create`` is ``False``.

    """
    if not value:
        return UNDEF_ID

    kwargs = {fieldname: value}

    # exact match on the field itself
    id_rec = get_id(table, **kwargs)
    if id_rec is not None:
        return id_rec

    # nothing found, have a look to the synonyms field
    setrows = table._db(table.synonyms.contains(value))
    ncount = setrows.count()

    # one synonym found
    if ncount == 1:
        return setrows.select(table.id).first().id

    # more than one synonyms - don't know how to choose
    if ncount > 1:
        raise ToolException(MSG_TOOMANY_SYNONYM % table._tablename)

    # no synonym found, create the entry
    if create:
        return table.insert(**kwargs)

    # no entry, no synonym and no creation: the reject has to be explained
    # by its real cause and not by the "too many synonyms" message
    msg = "Reject no %s entry or synonym found." % table._tablename
    raise ToolException(msg)


85
class Automaton(object):
    """Base class to search and process publications:

        * Decode the selector defining user criteria.
        * Search for publications in the store, according to
          user criteria.
        * Decode the XML string returned by the store.
        * Insert new records in the database.

    Note:
        The parameters of the search are defined by the current ``request``.

    The logic implemented in the ``Automaton`` class is the following:

        #. Ask the store for all the `record_id` satisfying the user request.
        #. Reject `record_id` matching the `origin` field of database entry.
        #. Request from the store the XML description of the publication
           and decode it.
        #. Check that the *oai* of the publication is defined and well formed.
           Recover it, if it is not the case. From time to time, the `id`
           encoded in the `oai` field is different from the `record_id`.
           This happens when an old record is redirected to a new one
           for obscure reasons. The record is ignored if a database entry
           is found with the bad OAI.
        #. Reject temporary publications.
        #. Check that *authors* are defined.
           Reject the publication if it is not the case.
        #. Check that *my institute* is in the list of the institutes
           signing the publication. Reject the publication if it is
           not the case. When the affiliations are not defined,
           try to recover this case, by finding the author of my institute
           signing the publication. This recovery procedure uses
           the *author rescue list*. Reject the record when the recovery
           procedure failed.
        #. Check that the *collaboration*, if defined, is well formed.
           Reject the publication if it is not the case.
        #. Several checks are applied depending on the publication type.
        #. At the end of this process, the publisher, the authors are
           formatted and the list of signatories of my institute extracted.

    Args:
        db (gluon.DAL): the database connection.
        id_team (int): the identifier of the team in the database.
        id_project (int): the identifier of the project in the database.
        automaton (unicode): the name of the automaton which
            will be used to process the data. Possible values are:
            ``articles``, ``notes``, ``preprints``, ``proceedings``,
            ``reports``, ``talks`` and ``theses``.
        id_category (int): the identifier of the category of publication
        year_start (int): starting year for the scan
        year_end (int): ending year of the scan
        dry_run (bool): new records are not inserted in the database
            when ``True``.
        debug (bool): activate the verbose mode when ``True``.

    Raises:
        ToolException: when the team or the project or the publication category
            is not defined.

    Note:
        Debug messages are written with the single argument form
        ``print(...)``. It gives the same output with the print statement
        and with the print function.

    """
    def __init__(self,
                 db,
                 id_team,
                 id_project,
                 automaton,
                 id_category,
                 year_start=None,
                 year_end=None,
                 dry_run=True,
                 debug=False):

        self.collection_logs = []
        self.db = db
        self.id_team = id_team
        self.id_project = id_project
        self.controller = automaton
        self.id_category = id_category
        self.year_start = year_start
        self.year_end = year_end
        self.dry_run = dry_run
        self.dbg = debug
        self.logs = []

        self.check = CheckAndFix()
        self.marc12 = Marc12()

        # check parameters
        # protection team, project and/or category have to be defined
        if not self.id_team:
            raise ToolException(MSG_NO_TEAM)

        if not self.id_project:
            raise ToolException(MSG_NO_PROJECT)

        if not self.id_category:
            raise ToolException(MSG_NO_CAT)

        # Construct harvester Storage needed for the log
        self.harvester = Storage(id_teams=self.id_team,
                                 id_projects=self.id_project,
                                 controller=self.controller,
                                 id_categories=self.id_category)

    def _insert_in_db(self, log_year="", **fields):
        """Insert the record in the database, handling database exception.

        A failure is never propagated: it is reported as a *reject* in
        the last log entry.

        Args:
            log_year (unicode): year of the record for the log

        Keyword Args:
            **fields: keyword arguments defining the record values to be
                inserted in the database.

        Returns:
            int: one when the record is inserted in the database,
                zero otherwise.

        """
        db = self.db

        try:
            rec_id = db.publications.insert(**fields)
            if rec_id:
                return 1

            # operation can be rejected by callbacks table._before_insert
            msg = MSG_NSERT_FAIL
            if CALLBACK_ERRORS in db.publications:
                msg = db.publications._callback_errors

            # reduce the error message to its first and last elements
            if isinstance(msg, list):
                if msg:
                    msg = "%s %s" % (msg[0], msg[-1])
                else:
                    msg = MSG_NSERT_FAIL

            self.logs[-1].reject(msg, log_year)
            return 0

        # operation can be rejected by the database
        except Exception as dbe:
            # the attribute message is empty when the exception is built
            # with several arguments and it does not exist any more in
            # recent python version: fall back on the string representation
            txt = getattr(dbe, "message", "") or str(dbe)
            self.logs[-1].reject(txt, log_year)
            return 0

    def _is_record_in_db(self, rec_id, title):
        """Return ``True`` when the record is already in the database.
        The search is based on the origin field.

        Note:
            A new log entry is created when a record is found with
            the category of the harvester.

        Args:
            rec_id (int): record identifier
            title (unicode): title of the collection

        Returns:
            bool: ``True`` when a record is found, ``False`` otherwise.

        """
        db = self.db
        harvester = self.harvester

        # the origin field contains the URL of the record in the store
        url = OAI_URL % (harvester.host, rec_id)
        db_id = get_id(db.publications, origin=url)

        if db_id is None:
            return False

        publication = db.publications[db_id]

        # same category for the publication and the harvester
        # keep the record if it is not the case
        # this is required to transform a preprint into article
        if publication.id_categories != harvester.id_categories:
            return False

        # log
        log = Msg(harvester=harvester,
                  collection=title,
                  record_id=rec_id,
                  title=publication.title)

        self.logs.append(log)
        log.idle(MSG_IN_DB, publication.year)

        return True

    def _search_parameters(self, collection):
        """Build the keywords to steer the URL search in invenio store.
        The main parameter is the collection and the date range defined
        in the selector.

        When neither ``year_start`` nor ``year_end`` is defined, no date
        criterion is applied.

        Args:
            collection (unicode): string defining the collection in the
                store. The syntax depends on the invenio store:

                    * ``"find cn d0 and tc p and not tc c"``
                    * ``"LHCb Papers"``.

        Returns:
            dict: the key are a sub-set of those defined in
                :py:meth:invenio_tools.InvenioStore.get_ids.

        """
        start, end = self.year_start, self.year_end

        # INSPIREHEP store
        if collection.startswith('find'):

            query = collection

            if start and end:
                query += " and date > %s and date < %s " \
                         % (start - 1, end + 1)

            elif start or end:
                query += " and date %s" % (start or end)

            return dict(p=query,  # SPIRES-style query
                        rg=1000,  # maximum number of records returned
                        sf='year',  # sort by date
                        so='d')  # descending order

        # CERN INVENIO store
        dic = dict(cc=collection,  # collection
                   sf='year',  # sort by date
                   so='d')  # descending order

        if start and end:
            rex = '|'.join([str(year) for year in range(start, end + 1)])

        elif start or end:
            rex = start or end

        # no year at all: the search is not restricted
        else:
            return dic

        dic.update(f1='year',  # search on year
                   m1='r',  # use regular expression
                   p1=rex)  # regular expression defining year
        return dic

    def check_record(self, record):
        """Check the content of the record in order to fix non-conformities.
        Return ``False`` when non-conformities are found and can not be
        corrected.

        Note:
            Some checks depend on the type of publications and have to be
            implemented in inherited class.

        Note:
            The order of the checks matter. It should be oai,
            temporary record, authors, my authors and then a series of checks
            specific to the publication type.

        Note:
            Any exception raised by a check is reported as a *reject*
            in the last log entry.

        Args:
            record (Record): Marc12 record describing the publication.

        Returns:
            bool: ``False`` when a non-conformity is found and can not be
                corrected.

        """
        if self.dbg:
            print("check record")

        check = self.check

        try:
            check.recover_oai(record, self.harvester.host)

            if check.is_bad_oai_used(record):
                self.logs[-1].idle(MSG_IN_DB, record.year())
                return False

            check.temporary_record(record)
            check.authors(record)
            check.my_affiliation(record, self.id_project, self.id_team)
            check.collaboration(record)

        except Exception as e:
            self.logs[-1].reject(e, record=record)
            return False

        return True

    def get_record_by_fields(self, **kwargs):
        """Get database record matching fields values defined
        in the keyword arguments.

        Keyword Args:
            oai_url (unicode): *e.g* ``"http://cds.cern.ch/record/123456"``.
                It is mandatory and it is not used for the search.
            year (int): the year of the publication. It is mandatory.
            **kwargs: all keyword arguments but ``oai_url`` are passed
                to ``get_id`` in order to find the record.

        Note:
            Fix the field origin when a match is found. In dry run mode
            the database is not modified but the fix is logged.

        Returns:
            tuple: ``(id, status)`` which contains the ``id`` of the record.
            The ``id`` is equal to ``None`` when there is no matching.
            The ``status`` is equal to one when the origin of the existing
            record has to be fixed, zero otherwise.

        """
        if self.dbg:
            print("get existing record by fields")

        db = self.db

        # origin can't be used for the search
        oai_url = kwargs.pop("oai_url")

        # look for an existing record
        rec_id = get_id(db.publications, **kwargs)
        if not rec_id:
            return (None, 0)

        # the record is already linked to the store
        origin = db.publications[rec_id].origin
        if origin and origin == oai_url:
            self.logs[-1].idle(MSG_IN_DB, kwargs["year"])
            return (rec_id, 0)

        # fix origin field
        if not self.dry_run:
            db.publications[rec_id] = dict(origin=oai_url)

        self.logs[-1].modify(MSG_FIX_ORIGIN, kwargs["year"])
        return (rec_id, 1)

    def insert_record(self, record):
        """Insert the record in the database.

        Note:
            This method depend on the type of publications.
            It has to be implemented for each inherited class.
            The base implementation does nothing.

        Args:
            record (Record): Marc12 record describing the publication.

        Returns:
            int: one when the record is inserted / updated in the database,
                zero otherwise.

        """
        return 0

    def process_xml(self, xml):
        """Decode the XML string and insert the corresponding records
        in the database.

        Args:
            xml (unicode): XML string encoding the publication records.
                The format follows the MARC12 standard.

        Note:
            The exceptions raised when decoding the XML string
            are not caught by this method.

        """
        if self.dbg:
            print("start processing %s" % self.__class__.__name__)
            print("decode request")
            print("get harvest parameters")

        # decode the XML request
        self.collection_logs.append(MsgCollection(found=1))
        self.decode_xml(xml)

    def process_url(self, host, collections):
        """Retrieve the XML string from the invenio store and
        insert corresponding records in the database.

        Args:
            host (unicode): host name to query for publications, either
                ``cds.cern.ch`` or ``inspirehep.net``.

            collections (unicode): list of collections to be interrogated,
                separated by commas. Blanks around a name and empty names
                are ignored.

        Note:
            * An exception raised when searching a collection is kept
              in the log of the collection and the next collection
              is processed.
            * An exception raised when processing a record is reported
              as a *reject* in a new log entry and the next record
              is processed.

        """
        if self.dbg:
            print("process URL search")

        # extend harvester for logs
        self.harvester.host = host
        self.harvester.collections = collections

        store = InvenioStore(host)

        # list of collections
        # an empty name would trigger a search without any collection
        names = [el.strip() for el in collections.split(',')]
        names = [el for el in names if el]

        # alias
        controller = self.controller
        project = self.db.projects[self.id_project].project

        # extract the list of publications from the store for each collection
        # the search is performed on a range of creation date
        # if not defined all elements are returned
        #
        # The method used here minimizes the memory usage
        # on the server as well as on the client side

        for collection in names:

            # log collection information
            # A collection is identified as "Project Controller collection"
            title = "%s / %s / %s" % (project, controller, collection)
            collection_log = MsgCollection(title=title)
            self.collection_logs.append(collection_log)

            # search record in the harvester repository
            kwargs = self._search_parameters(collection)

            try:
                rec_ids = store.get_ids(**kwargs)

            except Exception as error:
                collection_log.url = store.last_search_url()
                collection_log.error = error
                continue

            collection_log.url = store.last_search_url()
            collection_log.found = len(rec_ids)

            if not rec_ids:
                continue

            if self.dbg:
                print('%i records found in %s' % (len(rec_ids), collection))

            for rec_id in rec_ids:

                if self.dbg:
                    print("\nprocessing record %s" % (rec_id,))

                try:
                    if self._is_record_in_db(rec_id, title):
                        continue

                    xml = store.get_record(rec_id)
                    self.decode_xml(xml)

                except Exception as e:
                    print(traceback.format_exc())
                    url = OAI_URL % (host, rec_id)
                    self.logs.append(Msg(harvester=self.harvester,
                                         collection=title,
                                         record_id=rec_id,
                                         title=url))
                    self.logs[-1].reject(e)

    def decode_xml(self, xml):
        """Decode the MARC XML string and insert records in the database.

        A log entry is created for each decoded record.

        Args:
            xml (unicode): MARC XML string

        """
        if self.dbg:
            print("process xml record")

        # NOTE
        # BaseException and inherited class
        # are caught by the previous stage
        records = self.marc12(xml)

        # process individual record
        for record in records:

            if self.dbg:
                print("record decoded")

            # start the log for the record
            self.logs.append(Msg(harvester=self.harvester,
                                 collection=self.collection_logs[-1].title,
                                 record_id=record.id(),
                                 title=record.title()))

            # check that the record is well formed
            # repair non-conformity as far as possible
            if not self.check_record(record):
                continue

            if self.dbg:
                print("insert record in the database")

            # insert the record in the database
            self.insert_record(record)

            if self.dbg:
                log = self.logs[-1]
                print("%s %s" % (log.action.upper(), log.txt))

    def report(self):
        """Build the processing report.

        Returns:
            dict:
                * ``collection_logs`` list of :class:MsgCollection
                * ``controller`` unicode
                * ``logs`` list of :class:Msg

        """
        return dict(collection_logs=self.collection_logs,
                    controller=self.controller,
                    logs=self.logs)

    def search_collaboration(self, value):
        """Get the database collaboration identifier using synonyms.
        The collaboration is created when it does not exist.

        Args:
            value (unicode): the name of the collaboration.

        Returns:
            int:
                * the id of the collaboration record.
                * UNDEF_ID if value is not defined.

        Raises:
            ToolException: when more than one synonym is found.

        """
        return search_synonym(self.db.collaborations,
                              "collaboration",
                              value,
                              True)

    def search_country(self, value):
        """Get the database country identifier using synonyms.

        Args:
            value (unicode): the name of the country.

        Returns:
            int:
                * the id of the country record.
                * UNDEF_ID if value is not defined.

        Raises:
            ToolException: raised by ``search_synonym`` when the country
                can not be identified.

        """
        return search_synonym(self.db.countries, "country", value)

    def search_publisher(self, value):
        """Get the database publisher identifier using synonyms.

        Args:
            value (unicode): the abbreviation of the publisher.

        Returns:
            int:
                * the id of the publisher record.
                * UNDEF_ID if value is not defined.

        Raises:
            ToolException: raised by ``search_synonym`` when the publisher
                can not be identified.

        """
        return search_synonym(self.db.publishers, "abbreviation", value)