# query_managers.py (14.6 KB)
import logging
import operator
import re
from functools import reduce
from itertools import chain

# https://anaconda.org/conda-forge/boolean.py
import boolean
# https://pypi.org/project/python-intervals/
import intervals as I
from django.core.exceptions import ValidationError
from django.db.models import Q
from django.utils.translation import gettext as _

'''
QueryManager and its subclasses implement make_entry_queries() and make_object_queries()
methods returning lists of Q objects to be applied in a cascade of filter() calls.
The use of Q objects is necessary to allow for alternatives in queries:
the union() and intersection() methods of QuerySets yield a QuerySet that does not support
further filtering.
'''

# https://docs.djangoproject.com/en/2.2/ref/forms/validation/#raising-validationerror
# ValidationError params don’t work with str.format(), must use old-style % formatting

class QueryManager:
    """Base class turning filter values into Django Q objects.

    Subclasses provide _make_queries(); the public make_entry_queries() and
    make_object_queries() call it with the entry or the object lookup.
    """

    def __init__(self, entry_lookup, object_lookup, default_conjunction=True):
        # when no separate object lookup is given, the entry lookup is used for both
        if object_lookup is None:
            object_lookup = entry_lookup
        self.entry_lookup = entry_lookup
        self.object_lookup = object_lookup
        self.default_conjunction = default_conjunction

    # https://stackoverflow.com/questions/310732/in-django-how-does-one-filter-a-queryset-with-dynamic-field-lookups
    def make_Q(self, lookup, value):
        """Return a Q object for a lookup whose name is only known at run time."""
        condition = {lookup: value}
        return Q(**condition)

    def make_entry_queries(self, value, conjunction):
        """Build the queries for value using the entry lookup."""
        lookup = self.entry_lookup
        return self._make_queries(lookup, value, conjunction)

    def make_object_queries(self, value):
        """Build the queries for value using the object lookup (conjunction off)."""
        lookup = self.object_lookup
        return self._make_queries(lookup, value, conjunction=False)

    def _make_queries(self, lookup, value, conjunction):
        """Hook implemented by subclasses."""
        raise NotImplementedError

class SingleValueQueryManager(QueryManager):
    """Query manager for fields carrying exactly one value."""

    def _make_queries(self, lookup, value, conjunction):
        # a single value gives a single query; conjunction is irrelevant here
        query = self.make_Q(lookup, value)
        return [query]
    
class MultiValueQueryManager(QueryManager):
    """Query manager for fields carrying several values."""

    def _make_queries(self, lookup, values, conjunction):
        """Build one Q object per value and combine them.

        With conjunction on, the individual Q objects are returned so that
        each can be applied in its own filter() call. With conjunction off,
        a single Q object ORing all the values is returned.

        An empty values collection yields an empty list in both modes
        (previously the disjunctive mode raised TypeError, because reduce()
        was called on an empty sequence without an initial value).
        """
        queries = [self.make_Q(lookup, value) for value in values]
        if conjunction or not queries:
            return queries
        return [reduce(operator.or_, queries)]

class ExpressionAlgebra(boolean.BooleanAlgebra):
    """Boolean algebra with a configurable tokenizer.

    Subclasses supply the operator table (TOKENS), the rules delimiting
    symbols (valid_symbol_begin() and allowed()) and a validator for symbols.
    """

    # maps lower-cased token strings to boolean.TOKEN_* values; set by subclasses
    TOKENS = None

    def valid_symbol_begin(self, char):
        """Tell whether char may start a symbol."""
        raise NotImplementedError

    def allowed(self, char):
        """Tell whether char may continue a symbol that has already started."""
        raise NotImplementedError

    def literal_validator(self, literal):
        """Validate a single symbol; implemented by subclasses."""
        raise NotImplementedError

    # modified from boolean.BooleanAlgebra.tokenize
    def tokenize(self, expr):
        """Yield (token type, token string, position) triples for expr.

        For a symbol, the reported position is the index of its last
        character. Whitespace is skipped; any other character that neither
        starts a symbol nor is a key of TOKENS raises boolean.ParseError.
        """
        if not isinstance(expr, str):
            raise TypeError('expr must be string but it is %s.' % type(expr))
        blanks = (' ', '\t', '\r', '\n')
        end = len(expr)
        pos = 0
        while pos < end:
            token = expr[pos]
            is_symbol = self.valid_symbol_begin(token)
            if is_symbol:
                # swallow every following character allowed inside a symbol,
                # leaving pos on the symbol's last character
                while pos + 1 < end and self.allowed(expr[pos + 1]):
                    pos += 1
                    token += expr[pos]
            try:
                token_type = self.TOKENS[token.lower()]
            except KeyError:
                # not an operator: either a symbol, ignorable whitespace or an error
                if is_symbol:
                    yield boolean.TOKEN_SYMBOL, token, pos
                elif token not in blanks:
                    raise boolean.ParseError(token_string=token, position=pos, error_code=1)
            else:
                yield token_type, token, pos
            pos += 1

class RangesAlgebra(ExpressionAlgebra):
    """Algebra whose symbols are integer ranges written as [lo,hi].

    Either end of a range may be OPEN_RANGE ('*') instead of a number.

    NOTE(review): the word operators ('and', 'or', 'not') are listed in
    TOKENS, but with the inherited tokenizer only tokens starting with '['
    can span more than one character, so they do not look reachable here;
    confirm whether only the one-character forms are meant to be supported.
    """

    TOKENS = {
        '&'   : boolean.TOKEN_AND,
        'and' : boolean.TOKEN_AND,
        '|'   : boolean.TOKEN_OR,
        'or'  : boolean.TOKEN_OR,
        '~'   : boolean.TOKEN_NOT,
        '!'   : boolean.TOKEN_NOT,
        'not' : boolean.TOKEN_NOT,
    }

    # placeholder standing for a missing (infinite) end of a range
    OPEN_RANGE = '*'

    def valid_symbol_begin(self, char):
        """A range symbol always starts with an opening square bracket."""
        return char == '['

    def allowed(self, char):
        """Tell whether char may appear inside a range symbol."""
        # isdecimal() rather than isdigit(): the latter also accepts characters
        # such as '²', which int() cannot convert later on
        return char.isdecimal() or char in ',]' + self.OPEN_RANGE

    def literal_validator(self, literal):
        """Check that a symbol is a well-formed, non-empty range.

        Raises:
            ValidationError: when the brackets are missing, the number of
                ends is not two, an end is neither a number nor OPEN_RANGE,
                or the lower end is greater than the upper one.
        """
        text = literal.obj
        if text[0] != '[' or text[-1] != ']':
            raise ValidationError('Zakres musi być ograniczony nawiasami kwadratowymi [...]: %(x)s.', params={'x': text}, code='invalid')
        ends = [x.strip() for x in text[1:-1].split(',')]
        if len(ends) != 2:
            # message fixed: the verb ('mieć') was missing from the sentence
            raise ValidationError('Zakres musi mieć dwa końce (podano %(n)d): %(x)s.', params={'n' : len(ends), 'x': text}, code='invalid')
        lo, hi = ends
        for e in (lo, hi):
            # isdecimal() guarantees that the int() calls below cannot fail
            if not e.isdecimal() and e != self.OPEN_RANGE:
                raise ValidationError('Ograniczenie zakresu musi być liczbą lub znakiem %(c)s: %(x)s.', params={'c' : self.OPEN_RANGE, 'x': e}, code='invalid')
        if lo.isdecimal() and hi.isdecimal() and int(lo) > int(hi):
            raise ValidationError('Pusty zakres: %(x)s.', params={'x': text}, code='invalid')
    
class RegexAlgebra(ExpressionAlgebra):
    """Algebra whose symbols are regular expressions.

    Round brackets are allowed inside symbols (see ALLOWED), so square
    brackets serve as the parentheses of the boolean expression.
    """

    TOKENS = {
        '&': boolean.TOKEN_AND,
        'and': boolean.TOKEN_AND,
        '|': boolean.TOKEN_OR,
        'or': boolean.TOKEN_OR,
        '~': boolean.TOKEN_NOT,
        '!': boolean.TOKEN_NOT,
        'not': boolean.TOKEN_NOT,
        '[': boolean.TOKEN_LPAR,
        ']': boolean.TOKEN_RPAR,
    }

    # non-alphanumeric characters that may appear inside a symbol
    ALLOWED = '.*+,()_\\|'

    def valid_symbol_begin(self, char):
        """Tell whether char may start a regex symbol (a letter or a dot)."""
        # TODO what else can a valid regex start with in the context of filters?
        if char.isalpha():
            return True
        return char in '.'

    def allowed(self, char):
        """Tell whether char may continue a regex symbol."""
        if char.isalnum():
            return True
        return char in self.ALLOWED

    def literal_validator(self, literal):
        """Raise ValidationError unless the symbol compiles as a regular expression."""
        try:
            re.compile(literal.obj)
        except re.error as e:
            details = {'x': literal.obj, 'msg': _(str(e))}
            raise ValidationError('Niepoprawne wyrażenie regularne: %(x)s (%(msg)s).', params=details, code='invalid')

class ExpressionQueryManager(QueryManager):
    """Base class for query managers whose values are boolean expressions.

    Subclasses set expr_parser to an algebra instance providing parse() and
    literal_validator().
    """

    # algebra instance used for parsing and validation; set by subclasses
    expr_parser = None

    def __init__(self, entry_lookup, object_lookup, additional_operators=False, **kwargs):
        super().__init__(entry_lookup, object_lookup, **kwargs)
        # whether the '!&' and '&&' operators are accepted by the validator
        self.additional_operators = additional_operators

    def expression_validator(self, value):
        """Validate a filter expression.

        The values '.*' and '' are always accepted. Otherwise the value is
        split on '!&' or '&&', every part is parsed and each of its symbols
        is passed to the parser's literal_validator().

        Raises:
            ValidationError: when a disallowed operator is used, when an
                operand of '!&' is not a literal, when a symbol is rejected
                by the literal validator, or when parsing fails.
        """
        try:
            if value in ('.*', ''):
                return
            if not self.additional_operators:
                for op in ('!&', '&&'):
                    if op in value:
                        raise ValidationError('To pole nie dopuszcza operatora %(op)s.', params={'op': op}, code='invalid')
            if '!&' in value:
                # operands of '!&' must be plain (possibly negated) symbols
                for v in value.split('!&'):
                    expr = self.expr_parser.parse(v)
                    if not expr.isliteral:
                        raise ValidationError('Operator !& nie dopuszcza zagnieżdżonych wyrażeń: %(expr)s.', params={'expr': v.strip()}, code='invalid')
                    self.expr_parser.literal_validator(expr.get_symbols()[0])
                return
            for part in value.split('&&'):
                expr = self.expr_parser.parse(part)
                for symbol in expr.get_symbols():
                    self.expr_parser.literal_validator(symbol)
        # calls to self.expr_parser.parse will raise exceptions if the expression is malformed
        except boolean.boolean.ParseError as pe:
            raise ValidationError('Niepoprawne wyrażenie: %(msg)s.', params={'msg': _(str(pe))}, code='invalid') from pe

class RangesQueryManager(ExpressionQueryManager):
    """Builds queries from boolean expressions over integer ranges.

    The expression is evaluated with interval arithmetic and every atomic
    interval of the result is translated into one Q object.
    """

    expr_parser = RangesAlgebra()

    def literal2intervals(self, literal):
        """Convert a (possibly negated) range symbol into an interval."""
        # a plain symbol has no operator attribute, a negated one reports '~'
        op = getattr(literal, 'operator', None)
        symbols = literal.get_symbols()
        assert (len(symbols) == 1)
        open_end = self.expr_parser.OPEN_RANGE
        lo, hi = symbols[0].obj.strip('[]').split(',')
        lo = int(lo) if lo != open_end else -I.inf
        hi = int(hi) if hi != open_end else I.inf
        interv = I.closed(lo, hi)
        if op == '~':
            interv = ~interv
        return interv

    def cnf2intervals(self, expr):
        """Evaluate a parsed expression into an interval.

        AND, OR and NOT map onto the interval operators &, | and ~, so the
        expression does not have to be normalised first.

        Raises:
            TypeError: for an expression node of an unsupported type.
        """
        if isinstance(expr, (boolean.AND, boolean.OR)):
            combine = operator.and_ if isinstance(expr, boolean.AND) else operator.or_
            return reduce(combine, map(self.cnf2intervals, expr.args))
        if expr.isliteral:
            return self.literal2intervals(expr)
        if isinstance(expr, boolean.NOT):
            # negation of a non-literal, e.g. a double negation
            return ~self.cnf2intervals(expr.args[0])
        raise TypeError('Unsupported expression in a range filter: %s (%s).' % (expr, type(expr)))

    def atomic_interval2query(self, interval, lookup):
        """Translate one atomic interval into a Q object.

        Returns None for the unbounded interval (no constraint at all). An
        empty interval (e.g. the result of [1,2] & [5,6]) gives a query that
        matches nothing.
        """
        if interval.is_empty():
            # the bounds of an empty interval are infinities and cannot be
            # adjusted below; an __in lookup with no values never matches
            return self.make_Q(lookup + '__in', [])
        lo, hi = None, None
        if interval.lower != -I.inf:
            lo = interval.lower
            # open interval: the first integer inside is one above the bound
            if not interval.left:
                lo += 1
        if interval.upper != I.inf:
            hi = interval.upper
            # open interval: the last integer inside is one below the bound
            if not interval.right:
                hi -= 1
        if lo == hi:
            # (-inf,+inf)
            if lo is None:
                return None
            return self.make_Q(lookup, lo)
        qs = []
        if lo is not None:
            qs.append(self.make_Q(lookup + '__gte', lo))
        if hi is not None:
            qs.append(self.make_Q(lookup + '__lte', hi))
        return reduce(operator.and_, qs)

    def _make_queries(self, lookup, value, conjunction):
        """Return a list with one Q object ORing the atomic intervals of value.

        An empty list (no constraint) is returned for an empty value and for
        an expression covering all integers. The empty-value case used to
        return [[]], which is not a list of Q objects like the other results.
        """
        if not value:
            return []
        expr = self.expr_parser.parse(value)
        atoms = self.cnf2intervals(expr)._intervals
        candidates = (self.atomic_interval2query(atom, lookup) for atom in atoms)
        queries = [query for query in candidates if query is not None]
        if queries:
            return [reduce(operator.or_, queries)]
        return []

# TODO this got complicated, write more comments?
class RegexQueryManager(ExpressionQueryManager):
    """Builds queries from boolean expressions over regular expressions.

    A value may consist of several expressions separated by '&&'. Each one
    is converted to CNF and translated into a list of Q objects (one per
    conjunct), so _make_queries() returns a list of lists.

    inner_class and outer_lookup are optional; when outer_lookup is given,
    make_entry_queries() filters inner_class objects first and selects
    entries through outer_lookup.
    """

    expr_parser = RegexAlgebra()

    def __init__(self, entry_lookup, object_lookup, inner_class=None, outer_lookup=None, **kwargs):
        super().__init__(entry_lookup, object_lookup, **kwargs)
        self.inner_class = inner_class
        self.outer_lookup = outer_lookup

    def literal2query(self, literal, lookup):
        """Translate a (possibly negated) regex symbol into a Q object."""
        # a plain symbol has no operator attribute, a negated one reports '~'
        op = getattr(literal, 'operator', None)
        symbols = literal.get_symbols()
        assert (len(symbols) == 1)
        # The pattern is wrapped in a group so that both anchors apply to all
        # of it: without the group, 'a|b' would become '^a|b$', i.e. "starts
        # with a OR ends with b". A plain (capturing) group is used because
        # not every database backend understands '(?:...)'.
        q = self.make_Q(lookup + '__iregex', r'^({})$'.format(symbols[0].obj))
        if op == '~':
            q = ~q
        return q

    # the argument is assumed to be a conjunct of a CNF
    # (e.g. either a literal or a disjunction of literals)
    def disjunction2query(self, disjunction, lookup):
        """Translate one CNF conjunct into a single Q object."""
        if disjunction.isliteral:
            return self.literal2query(disjunction, lookup)
        assert (disjunction.operator == '|')
        return reduce(operator.or_, (self.literal2query(a, lookup) for a in disjunction.args))

    def cnf2queries(self, expr, lookup, tab=' '):
        """Translate a CNF expression into a list of Q objects, one per conjunct.

        An expression simplified to TRUE gives an empty list (no constraint);
        one simplified to FALSE gives a single query that matches nothing.
        The tab parameter is unused and kept for compatibility.
        """
        if expr.isliteral:
            return [self.literal2query(expr, lookup)]
        if isinstance(expr, boolean.boolean._TRUE):
            return []
        if isinstance(expr, boolean.boolean._FALSE):
            # e.g. 'a & ~a': an __in lookup with no values never matches
            return [self.make_Q(lookup + '__in', [])]
        assert (expr.operator in '|&')
        if expr.operator == '|':
            return [self.disjunction2query(expr, lookup)]
        return [self.disjunction2query(disjunction, lookup) for disjunction in expr.args]

    # TODO this operator is a horror...
    def exclusive_and2queries(self, lookup, value, conjunction):
        """Placeholder for the '!&' operator: not implemented, returns no queries."""
        logger = logging.getLogger(__name__)
        exprs = [self.expr_parser.parse(v) for v in value.split('!&')]
        logger.debug('exclusive and: %s', exprs)
        for expr in exprs:
            logger.debug('%s %s', expr, boolean.NOT(expr).simplify())
        return []

    # value has been validated as a proper expression
    def _make_queries(self, lookup, value, conjunction):
        """Return a list of lists of Q objects for value.

        With conjunction on there is one inner list per '&&'-separated
        expression; with conjunction off the expressions are ORed together
        first, so there is a single inner list. The values '.*' and '' (both
        accepted by the validator as "anything") give [[]].
        """
        if not value or value == '.*':
            return [[]]
        if '!&' in value:
            return self.exclusive_and2queries(lookup, value, conjunction)
        logger = logging.getLogger(__name__)
        exprs = [self.expr_parser.parse(v) for v in value.split('&&')]
        logger.debug('parsed: %s', ' * '.join(map(str, exprs)))
        if not conjunction:
            exprs = [reduce(operator.or_, exprs)]
            logger.debug('merged: %s', ' * '.join(map(str, exprs)))
        cnf_exprs = [self.expr_parser.cnf(e) for e in exprs]
        logger.debug('cnf: %s', ' * '.join(map(str, cnf_exprs)))
        return [self.cnf2queries(e, lookup) for e in cnf_exprs]

    def make_object_queries(self, value):
        """Return the flat list of Q objects for value built on the object lookup."""
        # _make_queries returns a single list of queries when conjunction=False;
        # it may also return nothing at all (the '!&' placeholder), in which
        # case indexing would raise IndexError
        queries = self._make_queries(self.object_lookup, value, conjunction=False)
        return queries[0] if queries else []

    # TODO (?):
    # Using '&' on Q objects yields the first behavior described in
    # https://docs.djangoproject.com/en/2.2/topics/db/queries/#spanning-multi-valued-relationships
    # Instead, a cascade of filter() calls seems necessary:
    # https://stackoverflow.com/questions/6230897/django-combining-and-and-or-queries-with-manytomany-field
    # but to keep consistent with the QueryManager interface (returning lists of Q objects),
    # Q objects for individual object specifications are created the ugly way, using the __in lookup
    def make_entry_queries(self, value, conjunction):
        """Return the list of Q objects selecting entries for value.

        Without outer_lookup the queries are built directly on the entry
        lookup. With outer_lookup, each '&&'-separated expression is first
        resolved to a queryset of inner_class objects (by successive filter()
        calls) and turned into one Q object on outer_lookup; the conjunction
        argument is not consulted in that branch.
        """
        if self.outer_lookup is None:
            return list(chain.from_iterable(self._make_queries(self.entry_lookup, value, conjunction)))
        logger = logging.getLogger(__name__)
        object_queries = self._make_queries(self.object_lookup, value, conjunction=True)
        logger.debug('object queries: %s', object_queries)
        entry_queries = []
        for queries in object_queries:
            # an empty list means "no constraint": nothing to add for it
            if not queries:
                continue
            objects = self.inner_class.objects.all()
            for query in queries:
                objects = objects.filter(query)
            entry_queries.append(self.make_Q(self.outer_lookup, objects))
        return entry_queries
    
# for MultiValueField-based filter fields
# doesn’t support operator switching for component queries (TODO?)
class MultiQueryManager(QueryManager):
    """Holds the query managers of the components of a multi-value filter field."""

    def __init__(self, managers):
        # QueryManager.__init__ is not called here, so no lookups are set
        self.managers = managers

    # Earlier draft, not in use:
    # def make_query(self, values, op):
    #     queries = [m.make_query(v, op) for m, v in zip(self.managers, values)]
    #     return reduce(operator.and_, queries)