Select Git revision
-
Seseke, David authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
annis_query_parser.py 10.70 KiB
from urllib.parse import quote
import re
import json
import urllib.request
from .query_parser import QueryParser
from .config import ResourceConfig
from .diagnostics import Diagnostic, DiagnosticTypes
class AnnisQueryParser(QueryParser):
    """
    Parses search queries for ANNIS-based corpora.

    A parsed query is a flat list of parameters of two shapes:

    - query words: [annotation_layer, query_word_number, value, operator]
    - relations between query words:
      [relation, query_word_number_1, query_word_number_2]

    The helpers term_indexes() and shift_term_indexes() as well as the
    rxQuantifierExact / rxQuantifierInterval patterns are not defined in
    this class; presumably they are inherited from QueryParser.
    """

    rxTsakorpusBool = re.compile('[()|,]')
    # Operators for setting relations between query words: '^*', '|', or
    # '.' optionally followed by '*', digits and commas ('.', '.*', '.2,5').
    rxRelOps = re.compile('^(?:\\^\\*|\\||\\.[*,0-9]*)$')

    def build_get_string(self, params, config: ResourceConfig, withinClause=''):
        """
        Build a payload for an ANNIS search request.

        ANNIS uses POST with JSON payload rather than GET, but the
        function name is the same as in the other classes for
        compatibility.

        params is a list of query-word and relation parameters (see the
        class docstring). config supplies annis_corpus_list and max_hits.
        withinClause may be empty, 'text' or 'session'; any other value
        raises Diagnostic (sru, 48).

        Return a dictionary with the keys 'query', 'query_language',
        'corpora', 'limit' and 'order'.
        """
        if len(withinClause) > 0 and withinClause not in ('text', 'session'):
            raise Diagnostic(DiagnosticTypes.sru, 48,
                             message='ANNIS only supports multi-word search within '
                                     'a text (with a default maximum distance of '
                                     '50 tokens).')

        # Separate query words from relations between them. Only relation
        # parameters have an operator from rxRelOps in the first position.
        wordParams = []
        relParams = []
        for param in params:
            if self.rxRelOps.search(param[0]) is not None:
                relParams.append(param)
            else:
                wordParams.append(param)

        clauses = []
        # Relations address query words as '#<query_word_number>', so the
        # word clauses are emitted in the order of their word numbers
        # (rather than alphabetically by layer name) to keep positions and
        # numbers aligned.
        for param in sorted(wordParams, key=lambda p: (p[1], p[0])):
            if param[0] == 'tok' and param[3] == '=':
                # Simplified form for token search.
                # NOTE(review): stripping the quotes leaves a bare word;
                # confirm that AQL reads it as a token text search and not
                # as an annotation name.
                clauses.append(param[2].replace('"', ''))
            else:
                clauses.append(param[0] + param[3] + param[2])
        # NOTE(review): the producers in this class put the later word
        # first (e.g. ['.', 2, 1]), which is emitted as '#2 . #1'; confirm
        # the operand order against the AQL precedence operators.
        for param in sorted(relParams):
            clauses.append('#' + str(param[1]) + ' ' + param[0] + ' #' + str(param[2]))

        # Joining avoids the dangling '&' that appeared when a query had
        # no relations at all (e.g. a single-token search).
        return {
            'query': ' & '.join(clauses),
            'query_language': 'AQL',
            'corpora': config.annis_corpus_list,
            'limit': config.max_hits,
            'order': 'Randomized'
        }

    def term_query(self, query: str, config: ResourceConfig):
        """
        Return list of query parameters for one term or sequence of terms.

        The query may be enclosed in double quotes. It is split on single
        spaces; each non-empty term becomes a 'tok' parameter, and each
        term after the first is linked to its predecessor with a '.'
        relation. Raise Diagnostic (sru, 10) if the query contains no
        terms.
        """
        if len(query) >= 2 and query.startswith('"') and query.endswith('"'):
            query = query[1:-1]
        if len(query) <= 0:
            raise Diagnostic(DiagnosticTypes.sru, 10)
        getParams = []
        iTerm = 0
        for term in query.split(' '):
            if len(term) <= 0:
                continue
            iTerm += 1
            getParams.append(['tok', iTerm, '"' + term.replace('"', '') + '"', '='])
            if iTerm >= 2:
                getParams.append(['.', iTerm, iTerm - 1])
        if iTerm == 0:
            # The query consisted of spaces only: treat it like an empty one
            # instead of returning a parameter list without any terms.
            raise Diagnostic(DiagnosticTypes.sru, 10)
        return getParams

    def binary_bool(self, strOp: str, operandL, operandR, config):
        """
        Join two parameter lists with the boolean operator AND or OR.

        The word numbers of operandR are shifted past those of operandL,
        and a relation ('^*' for AND, '|' for OR) between the last word
        of the left operand and the first word of the right operand is
        appended.

        Raise Diagnostic (sru, 10) for an empty operand, (sru, 47) if
        the first parameter of either operand is not a 'tok' parameter,
        (sru, 37) for an unknown operator and (sru, 48) for combinations
        that are not supported.
        """
        if len(operandL) <= 0 or len(operandR) <= 0:
            raise Diagnostic(DiagnosticTypes.sru, 10)
        termsL = self.term_indexes(operandL)
        operandR = self.shift_term_indexes(operandR, max(termsL))
        termsR = self.term_indexes(operandR)
        if operandL[0][0] != 'tok' or operandR[0][0] != 'tok':
            raise Diagnostic(DiagnosticTypes.sru, 47)

        if strOp == 'AND':
            relOp = '^*'
        elif strOp == 'OR':
            relOp = '|'
        else:
            raise Diagnostic(DiagnosticTypes.sru, 37, details=strOp)

        # With more than one word on either side, the operands may only
        # consist of plain tokens and relations of the very same kind.
        allowed = ('tok', relOp)
        if ((len(termsL) > 1 or len(termsR) > 1)
                and any(op[0] not in allowed for op in operandL + operandR)):
            message = 'ANNIS does not support queries that combine several ' \
                      'multi-word sequences with boolean operators or multiple ' \
                      'boolean operators.'
            raise Diagnostic(DiagnosticTypes.sru, 48, message=message)
        return operandL + operandR + [[relOp, max(termsL), min(termsR)]]

    def not_bool(self, operand, config):
        """
        Negate a single-token subquery by flipping its operator
        ('=' becomes '!=', anything else becomes '=').

        operand is a parameter list; it must contain exactly one 'tok'
        parameter. Raise Diagnostic (sru, 10) for an empty operand and
        (sru, 48) for anything other than a single token. The operand
        itself is not modified.
        """
        if len(operand) <= 0:
            raise Diagnostic(DiagnosticTypes.sru, 10)
        # operand is a list of parameters, so the layer name of the only
        # parameter is operand[0][0].
        if not (len(operand) == 1 and operand[0][0] == 'tok'):
            message = 'ANNIS does not support queries that negate anything ' \
                      'other than a single-token subquery.'
            raise Diagnostic(DiagnosticTypes.sru, 48, message=message)
        negated = operand[0][:]
        negated[3] = '!=' if negated[3] == '=' else '='
        return [negated]

    def adv_term_query_proper(self, identifier: str, op: str, value: str, flags: str, config: ResourceConfig):
        """
        Return list of query parameters for one term in an advanced query.

        The identifier 'text' is mapped to the 'tok' layer; any other
        identifier is used as the layer name as is. For 'pos', the value
        is replaced through config.pos_convert_reverse if it is listed
        there. The value is wrapped in slashes, with slashes inside the
        value escaped.

        Raise Diagnostic (sru, 10) for an empty value and (sru, 48) for
        flags other than '', 'I' or 'C'. The accepted flags currently
        have no effect on the returned parameter.
        """
        flags = flags.strip('/')
        if len(value) <= 0:
            raise Diagnostic(DiagnosticTypes.sru, 10)
        if flags not in ('', 'I', 'C'):
            raise Diagnostic(DiagnosticTypes.sru, 48, message='ANNIS does not support regex flags.')
        if identifier == 'text':
            layer = 'tok'
        else:
            layer = identifier
            if identifier == 'pos' and value in config.pos_convert_reverse:
                # UD to corpus-specific POS tags
                value = config.pos_convert_reverse[value]
        return [[layer, 1, '/' + value.replace('/', '\\/') + '/', op]]

    def adv_quantify_segment(self, getParams, quantifier: str, config: ResourceConfig):
        """
        Turn a quantified empty token query into a distance relation.

        getParams must consist of the single parameter
        ['tok', n, '/.*/', '=']; otherwise Diagnostic (sru, 48) is raised.
        The quantifier ('?', '+', '*', an exact count or an interval) says
        how many arbitrary tokens may stand between two query words.
        The result is a one-element list [[op, n, n - 1]], where op is
        '.*' for the default range and '.<min>,<max>' otherwise.

        Raise Diagnostic (sru, 10) if the quantifier is not recognized.
        """
        if len(getParams) != 1 or getParams[0][0] != 'tok' or getParams[0][2] != '/.*/' or getParams[0][3] != '=':
            raise Diagnostic(DiagnosticTypes.sru, 48,
                             message='Token quantifiers are only allowed with empty token queries '
                                     'in ANNIS (for setting distance constraints).')
        # Distances count token positions: n intervening tokens correspond
        # to a distance of n + 1.
        minDist = 1
        maxDist = 50
        if quantifier == '?':
            maxDist = 2
        elif quantifier == '+':
            minDist = 2
        elif quantifier == '*':
            # Zero or more intervening tokens: the defaults already say so.
            pass
        elif self.rxQuantifierExact.search(quantifier) is not None:
            # Same "+ 1" conversion as in the interval branch below.
            minDist = maxDist = int(quantifier[1:-1]) + 1
        else:
            m = self.rxQuantifierInterval.search(quantifier)
            if m is None:
                raise Diagnostic(DiagnosticTypes.sru, 10,
                                 message='Something is wrong with a token quantifier.')
            if len(m.group(1)) > 0:
                minDist = int(m.group(1)) + 1
            if len(m.group(2)) > 0:
                maxDist = int(m.group(2)) + 1
        op = '.*'
        if minDist > 1 or maxDist != 50:
            op = '.' + str(minDist) + ',' + str(maxDist)
        wordNumber = getParams[0][1]
        return [[op, wordNumber, wordNumber - 1]]

    def adv_main_sequence(self, operandL, operandR, config: ResourceConfig):
        """
        Join two parts of an advanced query that follow each other.

        The word numbers of operandR are shifted past those of operandL.
        Unless a relation starting with '.' is already present, a '.'
        relation between the first word of the right operand and the last
        word of the left operand is added.

        Raise Diagnostic (sru, 10) if either operand is empty.
        """
        if len(operandL) <= 0 or len(operandR) <= 0:
            raise Diagnostic(DiagnosticTypes.sru, 10)
        termsL = self.term_indexes(operandL)
        operandR = self.shift_term_indexes(operandR, max(termsL))
        termsR = self.term_indexes(operandR)
        lastL = max(termsL)
        # Find out if there is already a distance constraint.
        # NOTE(review): any '.' relation anywhere in operandL counts here,
        # so joining a third plain token to a two-token sequence adds no
        # relation for it -- confirm that this is intended.
        wordRelPresent = (any(param[0].startswith('.') for param in operandL)
                          or any(param[0].startswith('.') and param[2] == lastL
                                 for param in operandR))
        if not wordRelPresent:
            operandR += [['.', min(termsR), lastL]]
        return operandL + operandR

    # TODO: continue here
    def adv_binary_bool(self, strOp: str, operandL, operandR, config: ResourceConfig):
        """
        Join multiple constraints on one word in an advanced query.

        Constraints on the same layer are merged into one parameter whose
        value is '(<left>)<op>(<right>)', where '&' is replaced by ','.
        Constraints on layers that occur in only one operand are copied.

        Raise Diagnostic (sru, 48) for a conjunction on the same layer
        (other than 'gr') and for a disjunction across different layers.

        TODO: this is still the Tsakorpus logic. The merged parameters
        have three elements, whereas build_get_string() also reads the
        operator in the fourth position.
        """
        getParams = []
        if strOp == '&':
            strOp = ','
        layersR = {paramR[0] for paramR in operandR}
        for paramR in operandR:
            paramExists = False
            for paramL in operandL:
                if paramL[0] != paramR[0]:
                    continue
                if strOp == ',' and paramL[0] != 'gr':
                    raise Diagnostic(DiagnosticTypes.sru, 48,
                                     message='Tsakorpus endpoint does not support conjunctions '
                                             'of multiple constraints for the same layer '
                                             'within the same word.')
                paramExists = True
                getParams.append([paramL[0], paramL[1], '(' + paramL[2] + ')' + strOp + '(' + paramR[2] + ')'])
            if not paramExists:
                getParams.append(paramR[:])
        for paramL in operandL:
            if paramL[0] in layersR:
                # Already merged with its right-hand counterpart above.
                continue
            if strOp == '|':
                raise Diagnostic(DiagnosticTypes.sru, 48,
                                 message='Tsakorpus does not support disjunctions '
                                         'of constraints for multiple layers '
                                         'within the same word.')
            getParams.append(paramL[:])
        return getParams

    def send_query(self, strGetParams: str, config: ResourceConfig):
        """
        Send the translated query to the corpus and return the decoded
        JSON response.

        The request is a GET to '<resource_base_url>/search_sent?' followed
        by strGetParams. The response is decoded with the charset from its
        headers, falling back to UTF-8.

        NOTE(review): this is still the Tsakorpus-style request.
        build_get_string() of this class returns a dictionary meant for a
        POST request, which cannot be appended to the URL string.
        """
        url = config.resource_base_url.strip('/') + '/search_sent?' + strGetParams
        # The context manager closes the connection even if reading fails.
        with urllib.request.urlopen(url) as response:
            data = response.read()
            encoding = response.info().get_content_charset('utf-8')
        return json.loads(data.decode(encoding))
if __name__ == '__main__':
    # Nothing happens when the module is run as a script.
    pass