# -*- coding: utf-8 -*- """selector module """ import json from datetime import date, timedelta from gluon import current from gluon.dal import Field from plugin_dbui import get_id, Selector RAWSQL_ACTIVITY = \ """ SELECT start_date, end_date FROM history WHERE id_events=%i AND id_people=%i AND id_domains=%i AND id_teams=%i ORDER BY start_date, end_date """ class SelectorActiveItemsException(BaseException): pass class SelectorActiveItems(Selector): """Selector to get records active during a given period of time. The period of time are defined by the selector fields ``year_start`` and ``year_end``. Args: table (gluon.dal.Table): the virtual table defining the selector. exclude_fields (list of string): name of the selector fields which are excluded in the query. """ def __init__(self, table, exclude_fields=("year_end", "year_start")): self._cache_period = None self._period_end = None self._period_start = None Selector.__init__(self, table, exclude_fields=exclude_fields) if self.year_start and not self.year_end: self._period_start = date(self.year_start, 1, 1) self._period_end = date(self.year_start, 12, 31) elif self.year_start and self.year_end: self._period_start = date(self.year_start, 1, 1) self._period_end = date(self.year_end, 12, 31) else: raise SelectorActiveItemsException("Period is not defined.") def get_years(self): """The list of year between period start and end. Returns: list: """ if self._cache_period: start, end = self._cache_period else: start = self._period_start.year end = self._period_end.year return range(start, end + 1) def query(self, table): """Build the database query for the database *table*. The query includes inner join for foreign keys, selector constraints as well as extra queries. Args: table (gluon.dal.Table): the database table in which records are selected. Returns: gluon.dal.Query: The query selects active items during the given period of time. It only works for table having the database fields ``start_date`` and ``end_date``. """ query = Selector.query(self, table) if self._period_start and self._period_end: query &= table.start_date <= self._period_end q2 = table.end_date == None q3 = table.end_date >= self._period_start query &= (q2 | q3) return query def reset_period(self): """Reset the period to the initial values. """ if self._cache_period: self._period_start, self._period_end = self._cache_period self._cache_period = None def set_year(self, year): """Modify the period attributes in order to query records active during the given year. Args: year (integer): """ # keep original period if self._cache_period is None: self._cache_period = (self._period_start, self._period_end) self._period_start = date(year, 1, 1) self._period_end = date(year, 12, 31) class EvtSelector(SelectorActiveItems): """Main selector to build list, metric or graph according to the user criteria: - focus on history record and on a given event. - Select record active during the selected period of time. - Selection can be performed on category defined as a string. - For each record computes virtual field: ``age``, ``coverage``, ``duration``, ``fte`` and ``is_over``. Args: table (gluon.dal.Table): the virtual table defining the selector. exclude_fields (tuple of string): extend the list selector fields which are excluded in the query. The field related to people / object categories, year_end, year_start and year are systematically excluded. id_event (int): the event identifier """ def __init__(self, table, id_event=0, exclude_fields=()): li = ["data", "id_object_categories", "id_object_code", "id_people_categories", "id_people_code", "year_end", "year_start"] li.extend(exclude_fields) SelectorActiveItems.__init__(self, table, exclude_fields=li) self._db = db = current.globalenv["db"] self._id_event = id_event # add virtual fields history = db.history virtual = Field.Virtual db.people.age = virtual("age", self._age, ftype="integer") history.coverage = virtual("coverage", self._coverage, ftype="double") history.duration = virtual("duration", self._duration, ftype="double") history.fte = virtual("fte", self._fte, ftype="double") history.is_end = virtual("is_end", self._is_end, ftype="boolean") history.is_over = virtual("is_over", self._is_over, ftype="boolean") history.is_start = virtual("is_start", self._is_start, ftype="boolean") def _active_period(self, id_people, id_domain, id_team): """Determine the period when a person is active in a given domain and team and for a given event Args: id_people (int): id_domain (int): id_team (int): Returns tuple: start (datetime or None) stop (datetime or None) """ rawsql = RAWSQL_ACTIVITY % (self._id_event, id_people, id_domain, id_team) rows = self._db.executesql(rawsql) nrows = len(rows) if nrows == 1: return (rows[0][0], rows[0][1]) elif nrows > 1: return (rows[0][0], rows[-1][1]) else: return (None, None) def _age(self, row): """Compute the age of the person associated to the history record, now. Args: row (gluon.dal.Row): row of the history table. Returns: int or None: """ value = None now = date.today() if row.people.birth_date: value = ((now - row.people.birth_date).days / 365) return value def _coverage(self, row): """Compute the time coverage of the history record with respect to the period range. Args: row (gluon.dal.Row): row of the history table. Returns: float: a value between ``0`` and ``1``. """ value = 0. period_end = self._period_end period_start = self._period_start if period_start and period_end: start = max(period_start, row.history.start_date) end = period_end if row.history.end_date: end = min(period_end, row.history.end_date) x = (end - start).days y = (period_end - period_start).days value = float(x) / float(y) return value def _duration(self, row): """Compute the duration of the history record. The end date is ``now`` when the history record is not finished. Args: row (gluon.dal.Row): row of the history table. Returns: float: in seconds """ value = timedelta(seconds=0) end = row.history.end_date now = date.today() start = row.history.start_date if start and end and start <= now <= end: value = now - start elif start and end is None and start <= now: value = now - start elif start and end and end < now: value = end - start return value.total_seconds() def _fte(self, row): """Compute the full time equivalent of the person associated to the history record. It is the product of the time coverage and the percentage of the person assigned to the record. Args: row (gluon.dal.Row): row of the history table. Returns: float: a value between 0 and 1. """ value = 0. if row.history.percentage: value = self._coverage(row) * (row.history.percentage / 100.) return value def _is_end(self, row): """Return true is the active period (for a given person, domain and team) ends during the period range. Args: row (gluon.dal.Row): row of the history table. Returns: bool: """ history = row.history start, end = self._active_period(history.id_people, history.id_domains, history.id_teams) period_end = self._period_end period_start = self._period_start if end and start != end: return period_start <= end <= period_end return False def _is_over(self, row): """Return true is the active period (for a given person, domain and team) is over now. Args: row (gluon.dal.Row): row of the history table. Returns: bool: """ history = row.history start, end = self._active_period(history.id_people, history.id_domains, history.id_teams) if end and start != end: return end < date.today() return False def _is_start(self, row): """Return true is the active period (for a given person, domain and team) starts during the period range. Args: row (gluon.dal.Row): row of the history table. Returns: bool: """ history = row.history start, end = self._active_period(history.id_people, history.id_domains, history.id_teams) period_end = self._period_end period_start = self._period_start if start and start != end: return period_start <= start <= period_end return False def query(self, table): """Supersede the base class method to handle categories constraints. Args: table (gluon.dal.Table): """ db = table._db query = SelectorActiveItems.query(self, table) # history data block # list of key, value pair: [[None, "v1"], ["k2", "v2"], [None, "v5"]] # apply the like operator: %bla%bla% data = json.loads(self.data) if len(data) > 0: conditions = [] for key, val in data: if key is None and val is None: pass elif key is None: conditions.append(val) elif val is None: conditions.append('"%s":' % key) elif val in ("true", "false"): conditions.append('"%s": %s' % (key, val)) elif isinstance(val, (unicode, str)): conditions.append('"%s": "%s"' % (key, val)) else: conditions.append('"%s": %s' % (key, val)) conditions = "%%%s%%" % "%".join(conditions) query &= db.history.data.like(conditions, case_sensitive=False) # object category and code id_object_code = self.id_object_code if id_object_code: query &= db.history.id_object_categories == id_object_code id_object_category = self.id_object_categories if id_object_category: object_categories = db.object_categories category = object_categories[id_object_category].category qcat = object_categories.category == category myset = {row.id for row in db(qcat).select(object_categories.id)} query &= db.history.id_object_categories.belongs(myset) # people category and code id_people_code = self.id_people_code if id_people_code: query &= db.history.id_people_categories == id_people_code id_people_category = self.id_people_categories if id_people_category: people_categories = db.people_categories category = people_categories[id_people_category].category qcat = people_categories.category == category myset = {row.id for row in db(qcat).select(people_categories.id)} query &= db.history.id_people_categories.belongs(myset) return query def reset_extra_queries(self): self._extra_queries = []