from urllib.parse import quote import re import json import requests from .query_parser import QueryParser from .config import ResourceConfig from .diagnostics import Diagnostic, DiagnosticTypes class AnnisQueryParser(QueryParser): """ Parses search queries for ANNIS-based corpora. """ rxRelOps = re.compile('^(?:\\^\\*|\\||\\.[*,0-9]*)|_=_$') # Operators for setting relations between query words rxFramingQuotes = re.compile('^[/"]|(?<!\\\\)[/"]$') rxNodeIDPfx = re.compile('^[^/]*::') def build_get_string(self, params, config: ResourceConfig, searchOptions: dict, withinClause=''): """ Build a payload for an ANNIS search request. ANNIS uses POST with JSON payload rather than GET, but the function name is the same as in the other classes for compatibility. """ if len(withinClause) > 0 and withinClause not in ('text', 'session'): raise Diagnostic(DiagnosticTypes.sru, 48, message='ANNIS only supports multi-word search within' 'a text (with a default maximum distance of ' '50 tokens).') q = { 'query': '', 'query_language': 'AQL', 'corpora': [config.annis_corpus_id], 'limit': min(config.max_hits, searchOptions['maximumRecords']), 'order': 'Randomized' } termIndexes = self.term_indexes(params) queryFront = '' queryTail = '' params = self.rename_params(params, config) for param in sorted(params): # For query words: # param = [annotation_layer, query_word_number, value, operator] # For relations between query words: # param = [relation, query_word_number_1, query_word_number_2] if param[0] == 'tok' and param[3] == '=': # Simplified form for token search queryFront += param[2].replace('"', '') + ' & ' elif self.rxRelOps.search(param[0]) is not None: queryTail += '#' + str(param[1]) + ' ' + param[0] + ' #' + str(param[2]) + ' & ' else: queryFront += param[0] + param[3] + param[2] + ' & ' q['query'] = (queryFront.strip(' ') + ' ' + queryTail).strip(' &') return q def term_query(self, query: str, config: ResourceConfig): """ Return list of query parameters for one term or sequence of terms. """ if len(query) >= 2 and query.startswith('"') and query.endswith('"'): query = query[1:len(query)-1] if len(query) <= 0: raise Diagnostic(DiagnosticTypes.sru, 10) getParams = [] iTerm = 0 for term in query.split(' '): if len(term) > 0: iTerm += 1 getParams.append(['tok', iTerm, '"' + term.replace('"', '') + '"', '=']) if iTerm >= 2: getParams.append(['.', iTerm-1, iTerm]) return getParams def binary_bool(self, strOp: str, operandL, operandR, config): if len(operandL) <= 0 or len(operandR) <= 0: raise Diagnostic(DiagnosticTypes.sru, 10) termsL = self.term_indexes(operandL) operandR = self.shift_term_indexes(operandR, max(termsL)) termsR = self.term_indexes(operandR) if operandL[0][0] != 'tok' or operandR[0][0] != 'tok': raise Diagnostic(DiagnosticTypes.sru, 47) if strOp == 'AND': if ((len(termsL) > 1 or len(termsR) > 1) and (any(op[0] not in ('tok', '^*') for op in operandR) or any(op[0] not in ('tok', '^*') for op in operandL))): message = 'ANNIS does not support queries that combine several ' \ 'multi-word sequences with boolean operators or multiple ' \ 'boolean operators.' raise Diagnostic(DiagnosticTypes.sru, 48, message=message) return operandL + operandR + [['^*', max(termsL), min(termsR)]] elif strOp == 'OR': if ((len(operandL) > 1 or len(operandR) > 1) or operandL[0][0] != 'tok' or operandR[0][0] != 'tok' or operandL[0][3] != operandR[0][3]): message = 'ANNIS does not support queries that combine several ' \ 'multi-word sequences with boolean operators or multiple ' \ 'boolean operators.' raise Diagnostic(DiagnosticTypes.sru, 48, message=message) paramNew = [operandL[0][0], operandL[0][1], '/(' + self.rxFramingQuotes.sub('', operandL[0][2]) + ')|(' + self.rxFramingQuotes.sub('', operandR[0][2]) + ')/', operandL[0][3]] return [paramNew] raise Diagnostic(DiagnosticTypes.sru, 37, details=strOp) def not_bool(self, operand, config): if len(operand) <= 0: raise Diagnostic(DiagnosticTypes.sru, 10) if not (len(operand) == 1 and operand[0] == 'tok'): message = 'ANNIS does not support queries that negate anything ' \ 'other than a single-token subquery.' raise Diagnostic(DiagnosticTypes.sru, 48, message=message) result = operand[:] if result[3] == '=': result[3] = '!=' else: result[3] = '=' return result def adv_term_query_proper(self, identifier: str, op: str, value: str, flags: str, config: ResourceConfig): """ Return list of query parameters for one term in an advanced query. """ flags = flags.strip('/') if len(value) <= 0: raise Diagnostic(DiagnosticTypes.sru, 10) if flags not in ('', 'I', 'C'): raise Diagnostic(DiagnosticTypes.sru, 48, message='ANNIS does not support regex flags.') getParams = [] if identifier == 'text': getParams.append(['tok', 1, '/' + value.replace('/', '\\/') + '/', op]) elif identifier == 'lemma': getParams.append(['lemma', 1, '/' + value.replace('/', '\\/') + '/', op]) elif identifier == 'pos': if value in config.pos_convert_reverse: # UD to corpus-specific POS tags value = config.pos_convert_reverse[value] getParams.append(['pos', 1, '/' + value.replace('/', '\\/') + '/', op]) else: getParams.append([identifier, 1, '/' + value.replace('/', '\\/') + '/', op]) # raise Diagnostic(DiagnosticTypes.sru, 10, # message='The identifier ' + identifier + ' is not supported in ANNIS.') return getParams def adv_quantify_segment(self, getParams, quantifier: str, config: ResourceConfig): if len(getParams) != 1 or getParams[0][0] != 'tok' or getParams[0][2] != '/.*/' or getParams[0][3] != '=': raise Diagnostic(DiagnosticTypes.sru, 48, message='Token quantifiers are only allowed with empty token queries ' 'in ANNIS (for setting distance constraints).') minDist = 1 maxDist = 50 if quantifier == '?': maxDist = 2 elif quantifier == '+': minDist = 2 elif self.rxQuantifierExact.search(quantifier) is not None: minDist = maxDist = int(quantifier[1:len(quantifier)-1]) else: m = self.rxQuantifierInterval.search(quantifier) if m is None: raise Diagnostic(DiagnosticTypes.sru, 10, message='Something is wrong with a token quantifier.') if len(m.group(1)) > 0: minDist = int(m.group(1)) + 1 if len(m.group(2)) > 0: maxDist = int(m.group(2)) + 1 op = '.*' if minDist > 1 or maxDist != 50: op = '.' + str(minDist) + ',' + str(maxDist) getParams = [ [op, getParams[0][1] - 1, getParams[0][1]] ] return getParams def adv_main_sequence(self, operandL, operandR, config: ResourceConfig): # print('SEQUENCE JOIN', str(operandL), str(operandR)) if len(operandL) <= 0 or len(operandR) <= 0: raise Diagnostic(DiagnosticTypes.sru, 10) termsL = self.term_indexes(operandL) operandR = self.shift_term_indexes(operandR, max(termsL)) termsR = self.term_indexes(operandR) # Find out if there is already a distance constraint wordRelPresent = (any(param[0].startswith('.') for param in operandL) or any(param[0].startswith('.') and param[1] == max(termsL) for param in operandR)) if not wordRelPresent: wordRelParams = [ ['.', max(termsL), min(termsR)] ] operandR += wordRelParams return operandL + operandR def adv_binary_bool(self, strOp: str, operandL, operandR, config: ResourceConfig): # Join multiple constraints on one word in an advanced query # print('ADVANCED INTERNAL BOOL', strOp, str(operandL), str(operandR)) if strOp == '|': if (len(operandL) == 1 and len(operandR) == 1 and operandL[0][0] == operandR[0][0] and self.rxRelOps.search(operandL[0][0]) is None and operandL[0][3] == operandR[0][3]): # Disjunction of two values of the same layer: join as regex paramNew = [operandL[0][0], operandL[0][1], '/(' + self.rxFramingQuotes.sub('', operandL[0][2]) + ')|(' + self.rxFramingQuotes.sub('', operandR[0][2]) + ')/', operandL[0][3]] return [paramNew] raise Diagnostic(DiagnosticTypes.sru, 48, message='ANNIS endpoint does not support disjunctions ' 'of constraints on different layers ' 'within the same word.') # If we are here, strOp == '&' # Operands are either single parameter queries or conjunctions thereof # (arbitrary disjunctions raise an exception, see above) termsL = self.term_indexes(operandL) operandR = self.shift_term_indexes(operandR, max(termsL)) termsR = self.term_indexes(operandR) wordRelParams = [ ['_=_', min(termsR), max(termsL)] ] operandR += wordRelParams return operandL + operandR def send_query(self, query, config: ResourceConfig): """ Send the translated query to the ANNIS API. Return JSON results returned by the corpus. """ res = { 'n_hits': -1, 'hit_ids': [], 'hits': [] } urlCount = config.resource_base_url.strip('/') + '/v1/search/count' response = requests.post(urlCount, json=query, timeout=config.query_timeout) try: res['n_hits'] = response.json()['match_count'] except: pass if res['n_hits'] > 0: # First, find IDs for the matches urlFind = config.resource_base_url.strip('/') + '/v1/search/find' response = requests.post(urlFind, json=query, timeout=config.query_timeout) res['hit_ids'] = [list(tokenIDs.split(' ')) for tokenIDs in response.content.decode('utf-8').strip('\n').split('\n') if len(tokenIDs) > 0] # Second, find subgraphs including those matches and some context urlSubgraph = config.resource_base_url.strip('/') + '/v1/corpora/' \ + config.annis_corpus_id + '/subgraph' for hitIDs in res['hit_ids']: subgraphQuery = { 'node_ids': [self.rxNodeIDPfx.sub('', hitID) for hitID in hitIDs], 'left': config.annis_context_size, 'right': config.annis_context_size } response = requests.post(urlSubgraph, json=subgraphQuery, timeout=config.query_timeout) res['hits'].append(response.content) return res if __name__ == '__main__': pass