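# query_parser.py (7.25 KiB)
# Authored by Arkhangelskiy, Timofey.
# NOTE: the lines below are residue of the repository web view this file was
# copied from; they are not part of the module and are kept only as comments.
#   Select Git revision
#   Code owners
#   Assign users and groups as approvers for specific file changes. Learn more.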
from .enums import *
from .diagnostics import Diagnostic
from .config import ResourceConfig
import re
import copy
class QueryParser:
    """
    This class contains commonly used methods for initial parsing of a GET
    query. It does not include platform-specific methods.

    Subclasses are expected to implement build_get_string(), term_query(),
    binary_bool() and not_bool(); the versions defined here only raise
    NotImplementedError.
    """

    # Matches a query that consists of one single term: either a run of
    # characters containing no spaces and no double quotes (a backslash-escaped
    # quote \" is allowed), or one double-quoted string.
    rxTermQuery = re.compile(r'^(?:(?:[^ "]|\\")*|"(?:[^"]|\\")*")$')

    def __init__(self):
        pass

    @staticmethod
    def _is_operator_at(strQuery, pos, strOp, start, end):
        """
        Check if the operator strOp occurs at position pos of strQuery as
        a standalone token.

        The operator must not be glued to a letter, a digit or an underscore
        on either side, so that terms such as ORANGE, FORD or NOTHING are not
        mistaken for operators. The edges of the examined range [start, end)
        count as token boundaries.
        """
        opEnd = pos + len(strOp)
        if strQuery[pos:opEnd] != strOp:
            return False
        if pos > start:
            chPrev = strQuery[pos - 1]
            if chPrev.isalnum() or chPrev == '_':
                return False
        if opEnd < end and opEnd < len(strQuery):
            chNext = strQuery[opEnd]
            if chNext.isalnum() or chNext == '_':
                return False
        return True

    @staticmethod
    def find_operator(strQuery, start=0, end=-1):
        """
        Find the first top-level logical operator in strQuery[start:end].

        A NOT located exactly at start wins over everything else. Otherwise,
        the string is scanned from left to right for the first AND or OR
        that is outside double quotes and outside parentheses. Operators
        are only recognized in upper case and only as standalone tokens
        (see _is_operator_at).

        :param strQuery: the query string.
        :param start: index of the first character to examine.
        :param end: index after the last character to examine; -1 means
        the end of the string.
        :return: a tuple (position, operator), where operator is 'NOT',
        'AND' or 'OR'. If no operator is found, (-1, '') is returned.
        """
        if end == -1:
            end = len(strQuery)
        if QueryParser._is_operator_at(strQuery, start, 'NOT', start, end):
            return start, 'NOT'
        parenthBalance = 0
        inQuotes = False
        for i in range(start, end):
            ch = strQuery[i]
            if ch == '"':
                # A quote either opens or closes a quoted string
                inQuotes = not inQuotes
                continue
            if inQuotes:
                continue
            if ch == '(':
                parenthBalance += 1
            elif ch == ')':
                parenthBalance -= 1
            elif parenthBalance == 0:
                # Only operators outside of any parentheses are top-level
                for strOp in ('AND', 'OR'):
                    if QueryParser._is_operator_at(strQuery, i, strOp, start, end):
                        return i, strOp
        return -1, ''

    @staticmethod
    def shift_term_indexes(getParams, shift):
        """
        Increase all search term indexes in the GET parameters
        specified by getParams by shift.

        Each parameter is read as a three-element sequence. If its third
        element is an int, both the second and the third elements are
        shifted. Otherwise, the second element is shifted if it is
        non-negative. Parameters with a negative second element are
        deep-copied unchanged.

        :return: a new list of parameters; getParams is not modified.
        """
        getParamsShifted = []
        for param in getParams:
            if type(param[2]) is int:
                newParam = (param[0], param[1] + shift, param[2] + shift)
            elif param[1] >= 0:
                newParam = (param[0], param[1] + shift, param[2])
            else:
                newParam = copy.deepcopy(param)
            getParamsShifted.append(newParam)
        return getParamsShifted

    @staticmethod
    def term_indexes(getParams):
        """
        Find all search term indexes used in the GET parameters
        specified by getParams list. Return list of integers (1-based).

        The index is taken from the second element of each parameter,
        which may be an int or a list of indexes; anything else is
        ignored. The result is sorted and contains no duplicates.
        """
        terms = set()
        for param in getParams:
            if type(param[1]) is int:
                terms.add(param[1])
            elif type(param[1]) is list:
                terms.update(param[1])
        return sorted(terms)

    def build_get_string(self, getParams, config):
        """
        Abstract function. Called by translate_fcsql() on the result of
        the top-level call to produce the final corpus-specific query.
        """
        raise NotImplementedError()

    def term_query(self, query, config):
        """
        Abstract function. Called by translate_fcsql() for a part of
        the query that contains no top-level logical operator.
        """
        raise NotImplementedError()

    def binary_bool(self, strOp, operandL, operandR, config):
        """
        Abstract function. Called by translate_fcsql() to combine two
        already translated operands with strOp ('AND' or 'OR').
        """
        raise NotImplementedError()

    def not_bool(self, operand, config):
        """
        Abstract function. Called by translate_fcsql() to negate
        an already translated operand.
        """
        raise NotImplementedError()

    def translate_fcsql(self, query: str, config: "ResourceConfig", basicSearch: bool = False, start=0, end=-1):
        """
        Translate an FCS-QL query into a corpus-specific query (GET query,
        JSON Elasticsearch query or whatever).
        If something is wrong with the query, raise a Diagnostic exception.
        This is a top-level platform-independent function. It recursively
        parses the query by locating the hierarchically highest logical operator
        in the current query and then calling a respective lower-level
        function, which may be platform-specific.
        The function is recursive and only looks at the part of the string
        delimited by start and end parameters.

        :param query: the whole query string.
        :param config: resource configuration, passed through to
        the lower-level functions.
        :param basicSearch: passed through to the recursive calls unchanged.
        :param start: index of the first character of the part to parse.
        :param end: index after the last character of the part to parse;
        -1 marks the top-level call.
        :return: for the top-level call, whatever build_get_string() returns;
        for recursive calls, the intermediate result of term_query(),
        binary_bool() or not_bool().
        :raises Diagnostic: SRU diagnostic 27 for an empty query, SRU
        diagnostic 10 for an empty operand or an invalid range.
        """
        if end == -1:
            # Top-level call, so return a finalized corpus-specific query
            end = len(query)
            if end == 0:
                raise Diagnostic(DiagnosticTypes.sru, 27)
            if self.rxTermQuery.search(query) is not None:
                # The whole query is one term, no need to look for operators
                return self.build_get_string(self.term_query(query, config), config)
            return self.build_get_string(self.translate_fcsql(query, config,
                                                              basicSearch=basicSearch,
                                                              start=start, end=end),
                                         config)

        if len(query) <= 0:
            raise Diagnostic(DiagnosticTypes.sru, 27)
        if start >= len(query) - 1 or end <= 0:
            raise Diagnostic(DiagnosticTypes.sru, 10)

        # Trim whitespace on both sides of the current part
        while start < len(query) and query[start] in ' \t\n':
            start += 1
        while end > 0 and query[end - 1] in ' \t\n':
            end -= 1
        if start >= end:
            raise Diagnostic(DiagnosticTypes.sru, 10)

        iOpPos, strOp = self.find_operator(query, start, end)
        if iOpPos == -1:
            if query[start] == '(' and query[end - 1] == ')':
                # No top-level operator: strip the parentheses and go deeper
                return self.translate_fcsql(query, config, basicSearch=basicSearch,
                                            start=start + 1, end=end - 1)
            return self.term_query(query[start:end], config)

        if strOp in ('AND', 'OR'):
            resultLeft = self.translate_fcsql(query, config, basicSearch=basicSearch,
                                              start=start, end=iOpPos)
            resultRight = self.translate_fcsql(query, config, basicSearch=basicSearch,
                                               start=iOpPos + len(strOp), end=end)
            if len(resultLeft) <= 0 or len(resultRight) <= 0:
                raise Diagnostic(DiagnosticTypes.sru, 10)
            return self.binary_bool(strOp, resultLeft, resultRight, config)
        elif strOp == 'NOT':
            resultRight = self.translate_fcsql(query, config, basicSearch=basicSearch,
                                               start=iOpPos + len(strOp), end=end)
            return self.not_bool(resultRight, config)
        return {}

    def validate_query(self, operation, version, queryType, query,
                       xFcsEndpointDescription, xFcsContext,
                       xFcsDataviews, xFcsRewritesAllowed):
        """
        Check if the query parameters contain a valid combination of values.

        Only the combination of the x-fcs-* parameters with the operation
        is checked here; version, queryType and query are accepted, but
        not examined.

        :param operation: the requested operation (a member of Operation).
        :param version: not examined.
        :param queryType: not examined.
        :param query: not examined.
        :param xFcsEndpointDescription: only allowed with Operation.explain.
        :param xFcsContext: only allowed with Operation.searchRetrieve.
        :param xFcsDataviews: only allowed with Operation.searchRetrieve.
        :param xFcsRewritesAllowed: only allowed with Operation.searchRetrieve.
        :return: Return a list of diagnostics describing problems with the query.
        If the query is prima facie valid and can be processed further, an empty
        list will be returned.
        """
        # Check if additional parameters combine with the operation requested
        # (FCS specifications, 4.1)
        extraParams = (
            ('x-fcs-endpoint-description', xFcsEndpointDescription, Operation.explain),
            ('x-fcs-context', xFcsContext, Operation.searchRetrieve),
            ('x-fcs-dataviews', xFcsDataviews, Operation.searchRetrieve),
            ('x-fcs-rewrites-allowed', xFcsRewritesAllowed, Operation.searchRetrieve),
        )
        diagnostics = []
        for paramName, paramValue, allowedOperation in extraParams:
            # A non-empty extra parameter with the wrong operation is reported
            # as SRU diagnostic 8
            if len(paramValue) > 0 and operation != allowedOperation:
                diagnostics.append(Diagnostic(DiagnosticTypes.sru, 8, details=paramName))
        return diagnostics
# Running this module directly does nothing; the guard is a placeholder.
if __name__ == '__main__':
    pass