from urllib.parse import quote import re import json import urllib.request from .query_parser import QueryParser from .config import ResourceConfig from .diagnostics import Diagnostic, DiagnosticTypes class AnnisQueryParser(QueryParser): """ Parses search queries for ANNIS-based corpora. """ rxTsakorpusBool = re.compile('[()|,]') rxRelOps = re.compile('^(?:\\^\\*|\\||\\.[*,0-9]*)$') # Operators for setting relations between query words def build_get_string(self, params, config: ResourceConfig, withinClause=''): """ Build a payload for an ANNIS search request. ANNIS uses POST with JSON payload rather than GET, but the function name is the same as in the other classes for compatibility. """ if len(withinClause) > 0 and withinClause not in ('text', 'session'): raise Diagnostic(DiagnosticTypes.sru, 48, message='ANNIS only supports multi-word search within' 'a text (with a default maximum distance of ' '50 tokens).') q = { 'query': '', 'query_language': 'AQL', 'corpora': config.annis_corpus_list, 'limit': config.max_hits, 'order': 'Randomized' } termIndexes = self.term_indexes(params) queryFront = '' queryTail = '' for param in sorted(params): # For query words: # param = [annotation_layer, query_word_number, value, operator] # For relations between query words: # param = [relation, query_word_number_1, query_word_number_2] if param[0] == 'tok' and param[3] == '=': # Simplified form for token search queryFront += param[2].replace('"', '') + ' & ' elif self.rxRelOps.search(param[0]) is not None: queryTail += '#' + str(param[1]) + ' ' + param[0] + ' #' + str(param[2]) + ' & ' else: queryFront += param[0] + param[3] + param[2] + ' & ' q['query'] = queryFront.strip(' ') + ' ' + queryTail.strip(' &') return q def term_query(self, query: str, config: ResourceConfig): """ Return list of query parameters for one term or sequence of terms. """ if len(query) >= 2 and query.startswith('"') and query.endswith('"'): query = query[1:len(query)-1] if len(query) <= 0: raise Diagnostic(DiagnosticTypes.sru, 10) getParams = [] iTerm = 0 for term in query.split(' '): if len(term) > 0: iTerm += 1 getParams.append(['tok', iTerm, '"' + term.replace('"', '') + '"', '=']) if iTerm >= 2: getParams.append(['.', iTerm, iTerm-1]) return getParams def binary_bool(self, strOp: str, operandL, operandR, config): if len(operandL) <= 0 or len(operandR) <= 0: raise Diagnostic(DiagnosticTypes.sru, 10) termsL = self.term_indexes(operandL) operandR = self.shift_term_indexes(operandR, max(termsL)) termsR = self.term_indexes(operandR) if operandL[0][0] != 'tok' or operandR[0][0] != 'tok': raise Diagnostic(DiagnosticTypes.sru, 47) if strOp == 'AND': if ((len(termsL) > 1 or len(termsR) > 1) and (any(op[0] not in ('tok', '^*') for op in operandR) or any(op[0] not in ('tok', '^*') for op in operandL))): message = 'ANNIS does not support queries that combine several ' \ 'multi-word sequences with boolean operators or multiple ' \ 'boolean operators.' raise Diagnostic(DiagnosticTypes.sru, 48, message=message) return operandL + operandR + [['^*', max(termsL), min(termsR)]] elif strOp == 'OR': if ((len(termsL) > 1 or len(termsR) > 1) and (any(op[0] not in ('tok', '|') for op in operandR) or any(op[0] not in ('tok', '|') for op in operandL))): message = 'ANNIS does not support queries that combine several ' \ 'multi-word sequences with boolean operators or multiple ' \ 'boolean operators.' raise Diagnostic(DiagnosticTypes.sru, 48, message=message) return operandL + operandR + [['|', max(termsL), min(termsR)]] raise Diagnostic(DiagnosticTypes.sru, 37, details=strOp) def not_bool(self, operand, config): if len(operand) <= 0: raise Diagnostic(DiagnosticTypes.sru, 10) if not (len(operand) == 1 and operand[0] == 'tok'): message = 'ANNIS does not support queries that negate anything ' \ 'other than a single-token subquery.' raise Diagnostic(DiagnosticTypes.sru, 48, message=message) result = operand[:] if result[3] == '=': result[3] = '!=' else: result[3] = '=' return result def adv_term_query_proper(self, identifier: str, op: str, value: str, flags: str, config: ResourceConfig): """ Return list of query parameters for one term in an advanced query. """ flags = flags.strip('/') if len(value) <= 0: raise Diagnostic(DiagnosticTypes.sru, 10) if flags not in ('', 'I', 'C'): raise Diagnostic(DiagnosticTypes.sru, 48, message='ANNIS does not support regex flags.') getParams = [] if identifier == 'text': getParams.append(['tok', 1, '/' + value.replace('/', '\\/') + '/', op]) elif identifier == 'lemma': getParams.append(['lemma', 1, '/' + value.replace('/', '\\/') + '/', op]) elif identifier == 'pos': if value in config.pos_convert_reverse: # UD to corpus-specific POS tags value = config.pos_convert_reverse[value] getParams.append(['pos', 1, '/' + value.replace('/', '\\/') + '/', op]) else: getParams.append([identifier, 1, '/' + value.replace('/', '\\/') + '/', op]) # raise Diagnostic(DiagnosticTypes.sru, 10, # message='The identifier ' + identifier + ' is not supported in ANNIS.') return getParams def adv_quantify_segment(self, getParams, quantifier: str, config: ResourceConfig): if len(getParams) != 1 or getParams[0][0] != 'tok' or getParams[0][2] != '/.*/' or getParams[0][3] != '=': raise Diagnostic(DiagnosticTypes.sru, 48, message='Token quantifiers are only allowed with empty token queries ' 'in ANNIS (for setting distance constraints).') minDist = 1 maxDist = 50 if quantifier == '?': maxDist = 2 elif quantifier == '+': minDist = 2 elif self.rxQuantifierExact.search(quantifier) is not None: minDist = maxDist = int(quantifier[1:len(quantifier)-1]) else: m = self.rxQuantifierInterval.search(quantifier) if m is None: raise Diagnostic(DiagnosticTypes.sru, 10, message='Something is wrong with a token quantifier.') if len(m.group(1)) > 0: minDist = int(m.group(1)) + 1 if len(m.group(2)) > 0: maxDist = int(m.group(2)) + 1 op = '.*' if minDist > 1 or maxDist != 50: op = '.' + str(minDist) + ',' + str(maxDist) getParams = [ [op, getParams[0][1], getParams[0][1] - 1] ] return getParams def adv_main_sequence(self, operandL, operandR, config: ResourceConfig): # print('SEQUENCE JOIN', str(operandL), str(operandR)) if len(operandL) <= 0 or len(operandR) <= 0: raise Diagnostic(DiagnosticTypes.sru, 10) termsL = self.term_indexes(operandL) operandR = self.shift_term_indexes(operandR, max(termsL)) termsR = self.term_indexes(operandR) # Find out if there is already a distance constraint wordRelPresent = (any(param[0].startswith('.') for param in operandL) or any(param[0].startswith('.') and param[2] == max(termsL) for param in operandR)) if not wordRelPresent: wordRelParams = [ ['.', min(termsR), max(termsL)] ] operandR += wordRelParams return operandL + operandR # TODO: continue here def adv_binary_bool(self, strOp: str, operandL, operandR, config: ResourceConfig): # Join multiple constraints on one word in an advanced query print('ADVANCED INTERNAL BOOL', strOp, str(operandL), str(operandR)) getParams = [] if strOp == '&': strOp = ',' paramsR = {paramR[0] for paramR in operandR} for paramR in operandR: paramExists = False for paramL in operandL: if paramL[0] == paramR[0]: if strOp == ',' and paramL[0] != 'gr': raise Diagnostic(DiagnosticTypes.sru, 48, message='Tsakorpus endpoint does not support conjunctions ' 'of multiple constraints for the same layer ' 'within the same word.') paramExists = True getParams.append([paramL[0], paramL[1], '(' + paramL[2] + ')' + strOp + '(' + paramR[2] + ')']) if not paramExists: getParams.append(paramR[:]) for paramL in operandL: if paramL[0] not in paramsR: if strOp == '|': raise Diagnostic(DiagnosticTypes.sru, 48, message='Tsakorpus does not support disjunctions ' 'of constraints for multiple layers ' 'within the same word.') getParams.append(paramL[:]) return getParams def send_query(self, strGetParams: str, config: ResourceConfig): """ Send the translated query to the Tsakorpus instance. Return JSON results returned by the corpus. """ url = config.resource_base_url.strip('/') + '/search_sent?' + strGetParams print(url) response = urllib.request.urlopen(url) data = response.read() encoding = response.info().get_content_charset('utf-8') responseJSON = json.loads(data.decode(encoding)) return responseJSON if __name__ == '__main__': pass