Skip to content
Snippets Groups Projects
Select Git revision
  • c58dd603dd7f35a0c2ce48c6bf052d3461b88aec
  • main default protected
  • sumlab
  • dev/test_tobias
  • jack.rolph-main-patch-16563
  • jack.rolph-main-patch-96201
  • jack.rolph-main-patch-18340
  • jack.rolph-main-patch-15793
  • jack.rolph-main-patch-74592
  • 1.0.0
10 results

PeakOTron.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    annis_query_parser.py 10.70 KiB
    from urllib.parse import quote
    import re
    import json
    import urllib.request
    from .query_parser import QueryParser
    from .config import ResourceConfig
    from .diagnostics import Diagnostic, DiagnosticTypes
    
    
    class AnnisQueryParser(QueryParser):
        """
        Parses search queries for ANNIS-based corpora.
        """
    
        rxTsakorpusBool = re.compile('[()|,]')
        rxRelOps = re.compile('^(?:\\^\\*|\\||\\.[*,0-9]*)$')      # Operators for setting relations between query words
    
        def build_get_string(self, params, config: ResourceConfig, withinClause=''):
            """
            Build a payload for an ANNIS search request.
            ANNIS uses POST with JSON payload rather than GET, but the
            function name is the same as in the other classes for
            compatibility.

            params is a list of query parameters. Each of them describes
            either a query word,
            [annotation_layer, query_word_number, value, operator],
            or a relation between two query words,
            [relation, query_word_number_1, query_word_number_2].
            Return a dictionary that contains the AQL query together with
            the corpus list and the hit limit taken from config.
            Raise Diagnostic (SRU, 48) if withinClause is non-empty and is
            neither 'text' nor 'session'.
            """
            if len(withinClause) > 0 and withinClause not in ('text', 'session'):
                raise Diagnostic(DiagnosticTypes.sru, 48, message='ANNIS only supports multi-word search within '
                                                                  'a text (with a default maximum distance of '
                                                                  '50 tokens).')
            nodeClauses = []    # one clause per query word
            relClauses = []     # one clause per relation between query words
            for param in sorted(params):
                if param[0] == 'tok' and param[3] == '=':
                    # Simplified form for token search
                    # NOTE(review): double quotes are removed from the value here,
                    # so a value like '"dog"' is emitted as a bare word -- confirm
                    # that this is the intended AQL form.
                    nodeClauses.append(param[2].replace('"', ''))
                elif self.rxRelOps.search(param[0]) is not None:
                    relClauses.append('#' + str(param[1]) + ' ' + param[0] + ' #' + str(param[2]))
                else:
                    nodeClauses.append(param[0] + param[3] + param[2])
            # Joining (rather than appending ' & ' after each clause) avoids
            # a dangling '&' when the query contains no relations.
            q = {
                'query': ' & '.join(nodeClauses + relClauses),
                'query_language': 'AQL',
                'corpora': config.annis_corpus_list,
                'limit': config.max_hits,
                'order': 'Randomized'
            }
            return q
    
        def term_query(self, query: str, config: ResourceConfig):
            """
            Return list of query parameters for one term or sequence of terms.

            A pair of double quotes surrounding the query is removed, then
            the query is split on spaces. Each non-empty term becomes a token
            parameter ['tok', term_number, '"term"', '='] (numbering starts
            at 1, double quotes inside the term are dropped). Starting from
            the second term, each token parameter is followed by a relation
            parameter ['.', term_number, term_number - 1].
            Raise Diagnostic (SRU, 10) if the query contains no terms.
            """
            if len(query) >= 2 and query.startswith('"') and query.endswith('"'):
                query = query[1:-1]
            terms = [term for term in query.split(' ') if len(term) > 0]
            if len(terms) <= 0:
                # Covers the empty query as well as a query made of spaces only
                raise Diagnostic(DiagnosticTypes.sru, 10)
            getParams = []
            for iTerm, term in enumerate(terms, start=1):
                getParams.append(['tok', iTerm, '"' + term.replace('"', '') + '"', '='])
                if iTerm >= 2:
                    getParams.append(['.', iTerm, iTerm - 1])
            return getParams
    
        def binary_bool(self, strOp: str, operandL, operandR, config):
            """
            Join two operands (lists of query parameters) with the boolean
            operator strOp ('AND' or 'OR').

            The term numbers of the right operand are shifted past the
            highest term number of the left operand. The result is both
            operands followed by one relation parameter that links the last
            term of the left operand to the first term of the right one:
            '^*' for AND, '|' for OR.
            Raise Diagnostic (SRU, 10) if an operand is empty, (SRU, 47) if
            an operand does not start with a 'tok' parameter, (SRU, 37) for
            any other operator, and (SRU, 48) if a multi-word operand is
            combined with parameters other than 'tok' and the relation of
            the current operator.
            """
            if len(operandL) <= 0 or len(operandR) <= 0:
                raise Diagnostic(DiagnosticTypes.sru, 10)
            leftTerms = self.term_indexes(operandL)
            operandR = self.shift_term_indexes(operandR, max(leftTerms))
            rightTerms = self.term_indexes(operandR)

            if operandL[0][0] != 'tok' or operandR[0][0] != 'tok':
                raise Diagnostic(DiagnosticTypes.sru, 47)

            relationByOp = {'AND': '^*', 'OR': '|'}
            if strOp not in relationByOp:
                raise Diagnostic(DiagnosticTypes.sru, 37, details=strOp)
            relation = relationByOp[strOp]

            multiWord = len(leftTerms) > 1 or len(rightTerms) > 1
            if multiWord:
                allowed = ('tok', relation)
                onlyAllowed = (all(param[0] in allowed for param in operandR)
                               and all(param[0] in allowed for param in operandL))
                if not onlyAllowed:
                    message = 'ANNIS does not support queries that combine several ' \
                              'multi-word sequences with boolean operators or multiple ' \
                              'boolean operators.'
                    raise Diagnostic(DiagnosticTypes.sru, 48, message=message)
            link = [relation, max(leftTerms), min(rightTerms)]
            return operandL + operandR + [link]
    
        def not_bool(self, operand, config):
            """
            Negate a single-token subquery.

            operand is a list of query parameters; it has to consist of
            exactly one 'tok' parameter,
            ['tok', query_word_number, value, operator].
            Return a new one-element list where the operator of that
            parameter is inverted: '=' becomes '!=', anything else becomes
            '='. The operand itself is left unchanged.
            Raise Diagnostic (SRU, 10) if the operand is empty and
            Diagnostic (SRU, 48) if it is not a single 'tok' parameter.
            """
            if len(operand) <= 0:
                raise Diagnostic(DiagnosticTypes.sru, 10)
            # The layer name is the first element of the parameter,
            # not of the parameter list.
            if not (len(operand) == 1 and operand[0][0] == 'tok'):
                message = 'ANNIS does not support queries that negate anything ' \
                          'other than a single-token subquery.'
                raise Diagnostic(DiagnosticTypes.sru, 48, message=message)
            negated = list(operand[0])  # copy, so that the caller's parameter is not modified
            negated[3] = '!=' if negated[3] == '=' else '='
            return [negated]
    
        def adv_term_query_proper(self, identifier: str, op: str, value: str, flags: str, config: ResourceConfig):
            """
            Return list of query parameters for one term in an advanced query.

            The result is a one-element list [[layer, 1, '/value/', op]],
            where slashes inside the value are escaped with a backslash.
            The identifier 'text' is mapped to the layer 'tok'; any other
            identifier is used as the layer name as is. For 'pos', the value
            is replaced by its counterpart from config.pos_convert_reverse
            if it is listed there.
            Raise Diagnostic (SRU, 10) if the value is empty and
            Diagnostic (SRU, 48) if flags (with slashes stripped) are
            anything other than '', 'I' or 'C'.
            """
            flags = flags.strip('/')
            if len(value) <= 0:
                raise Diagnostic(DiagnosticTypes.sru, 10)
            if flags not in ('', 'I', 'C'):
                raise Diagnostic(DiagnosticTypes.sru, 48, message='ANNIS does not support regex flags.')
            layer = 'tok' if identifier == 'text' else identifier
            if identifier == 'pos' and value in config.pos_convert_reverse:
                # UD to corpus-specific POS tags
                value = config.pos_convert_reverse[value]
            regex = '/' + value.replace('/', '\\/') + '/'
            return [[layer, 1, regex, op]]
    
        def adv_quantify_segment(self, getParams, quantifier: str, config: ResourceConfig):
            """
            Turn a quantified empty token query into a distance constraint.

            getParams has to consist of exactly one parameter of the form
            ['tok', query_word_number, '/.*/', '=']. The quantifier says how
            many arbitrary tokens may stand in that position; the distance
            between the neighboring query words is that number plus one:
            '*' gives 1 to 50, '?' gives 1 to 2, '+' gives 2 to 50,
            an exact number n gives n + 1, and an interval adds one to
            each bound that is present.
            Return a one-element list
            [[operator, query_word_number, query_word_number - 1]], where the
            operator is '.*' for the default range (1 to 50) and
            '.min,max' otherwise.
            Raise Diagnostic (SRU, 48) if getParams is not an empty token
            query and Diagnostic (SRU, 10) if the quantifier is not
            recognized.
            """
            if len(getParams) != 1 or getParams[0][0] != 'tok' or getParams[0][2] != '/.*/' or getParams[0][3] != '=':
                raise Diagnostic(DiagnosticTypes.sru, 48,
                                 message='Token quantifiers are only allowed with empty token queries '
                                         'in ANNIS (for setting distance constraints).')
            minDist = 1
            maxDist = 50
            if quantifier == '*':
                # Zero or more tokens in between: the default range applies
                pass
            elif quantifier == '?':
                maxDist = 2
            elif quantifier == '+':
                minDist = 2
            elif self.rxQuantifierExact.search(quantifier) is not None:
                # n tokens in between means a distance of n + 1,
                # just like in the interval case below
                minDist = maxDist = int(quantifier[1:-1]) + 1
            else:
                m = self.rxQuantifierInterval.search(quantifier)
                if m is None:
                    raise Diagnostic(DiagnosticTypes.sru, 10,
                                     message='Something is wrong with a token quantifier.')
                if len(m.group(1)) > 0:
                    minDist = int(m.group(1)) + 1
                if len(m.group(2)) > 0:
                    maxDist = int(m.group(2)) + 1
            op = '.*'
            if minDist > 1 or maxDist != 50:
                op = '.' + str(minDist) + ',' + str(maxDist)
            getParams = [
                [op, getParams[0][1], getParams[0][1] - 1]
            ]
            return getParams
    
        def adv_main_sequence(self, operandL, operandR, config: ResourceConfig):
            """
            Join two operands (lists of query parameters) of an advanced
            query into one sequence.

            The term numbers of the right operand are shifted past the
            highest term number of the left operand. A relation parameter
            ['.', first_right_term, last_left_term] is added to the right
            operand, unless the left operand contains any parameter whose
            name starts with '.', or the right operand contains such a
            parameter whose third element is the last term of the left
            operand.
            Return the left operand followed by the right operand.
            Raise Diagnostic (SRU, 10) if an operand is empty.
            """
            if len(operandL) <= 0 or len(operandR) <= 0:
                raise Diagnostic(DiagnosticTypes.sru, 10)
            leftTerms = self.term_indexes(operandL)
            lastLeft = max(leftTerms)
            operandR = self.shift_term_indexes(operandR, lastLeft)
            rightTerms = self.term_indexes(operandR)

            # Find out if there is already a distance constraint
            hasDistance = False
            for param in operandL:
                if param[0].startswith('.'):
                    hasDistance = True
                    break
            if not hasDistance:
                for param in operandR:
                    if param[0].startswith('.') and param[2] == lastLeft:
                        hasDistance = True
                        break

            if not hasDistance:
                operandR += [['.', min(rightTerms), lastLeft]]
            return operandL + operandR
    
        # TODO: continue here
        def adv_binary_bool(self, strOp: str, operandL, operandR, config: ResourceConfig):
            """
            Join multiple constraints on one word in an advanced query.

            The operator '&' is replaced with ','. For every pair of a left
            and a right parameter with the same layer (first element), one
            parameter [layer, left_number, '(left_value)OP(right_value)']
            is produced. Right parameters without a counterpart on the left
            and left parameters whose layer does not occur on the right are
            copied to the result.
            Raise Diagnostic (SRU, 48) if two constraints on the same layer
            other than 'gr' are joined with ',', or if the operator is '|'
            and a left parameter has a layer that does not occur on the
            right.
            The operator and both operands are printed to stdout.
            """
            print('ADVANCED INTERNAL BOOL', strOp, str(operandL), str(operandR))
            joiner = ',' if strOp == '&' else strOp
            layersR = set()
            for right in operandR:
                layersR.add(right[0])

            merged = []
            for right in operandR:
                nMatches = 0
                for left in operandL:
                    if left[0] != right[0]:
                        continue
                    if joiner == ',' and left[0] != 'gr':
                        raise Diagnostic(DiagnosticTypes.sru, 48,
                                         message='Tsakorpus endpoint does not support conjunctions '
                                                 'of multiple constraints for the same layer '
                                                 'within the same word.')
                    nMatches += 1
                    combined = ''.join(('(', left[2], ')', joiner, '(', right[2], ')'))
                    merged.append([left[0], left[1], combined])
                if nMatches == 0:
                    merged.append(right[:])

            for left in operandL:
                if left[0] in layersR:
                    continue
                if joiner == '|':
                    raise Diagnostic(DiagnosticTypes.sru, 48,
                                     message='Tsakorpus does not support disjunctions '
                                             'of constraints for multiple layers '
                                             'within the same word.')
                merged.append(left[:])
            return merged
    
        def send_query(self, strGetParams: str, config: ResourceConfig):
            """
            Send the translated query to the corpus and return the parsed
            JSON response.

            The request is a GET to
            <config.resource_base_url>/search_sent?<strGetParams>; the URL
            is printed to stdout before the request is made. The response
            body is decoded using the charset announced by the server
            (UTF-8 if none is given).

            NOTE(review): build_get_string() in this class returns a
            dictionary meant for a POST request, while this method expects
            a GET parameter string -- confirm how the two are supposed to
            work together.
            """
            url = config.resource_base_url.strip('/') + '/search_sent?' + strGetParams
            print(url)
            # The context manager closes the connection even if reading fails
            with urllib.request.urlopen(url) as response:
                data = response.read()
                encoding = response.info().get_content_charset('utf-8')
            responseJSON = json.loads(data.decode(encoding))
            return responseJSON
    
    
    if __name__ == '__main__':
        # Running the module as a script does nothing.
        pass