Select Git revision
AdditionalPDFs.py
-
Jack Christopher Hutchinson Rolph authored
Update example.py, requirements.txt, Model.py, LossFunctions.py, HelperFunctions.py, AdditionalPDFs.py, PeakOTron.py
Jack Christopher Hutchinson Rolph authoredUpdate example.py, requirements.txt, Model.py, LossFunctions.py, HelperFunctions.py, AdditionalPDFs.py, PeakOTron.py
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
query_parser.py 7.25 KiB
from .enums import *
from .diagnostics import Diagnostic
from .config import ResourceConfig
import re
import copy
class QueryParser:
"""
This class contains commonly used methods for initial parsing of a GET
query. It does not include platform-specific methods.
"""
rxTermQuery = re.compile('^(?:(?:[^ "]|\\\\")*|"(?:[^"]|\\\\")*")$')
def __init__(self):
pass
@staticmethod
def find_operator(strQuery, start=0, end=-1):
if end == -1:
end = len(strQuery) - 1
if strQuery[start:start+3] == 'NOT':
return start, 'NOT'
parenthBalance = 0
inQuotes = False
for i in range(start, end):
if inQuotes:
if strQuery[i] == '"':
inQuotes = False
continue
if strQuery[i] == '"':
inQuotes = True
continue
if strQuery[i] == '(':
parenthBalance += 1
elif strQuery[i] == ')':
parenthBalance -= 1
elif parenthBalance == 0:
if strQuery[i:i+3] == 'AND':
return i, 'AND'
elif strQuery[i:i+2] == 'OR':
return i, 'OR'
return -1, ''
@staticmethod
def shift_term_indexes(getParams, shift):
"""
Increase all search term indexes in the GET parameters
specified by getParams by shift.
"""
getParamsShifted = []
for param in getParams:
if type(param[2]) is int:
newParam = (param[0], param[1] + shift, param[2] + shift)
elif param[1] >= 0:
newParam = (param[0], param[1] + shift, param[2])
else:
newParam = copy.deepcopy(param)
getParamsShifted.append(newParam)
return getParamsShifted
@staticmethod
def term_indexes(getParams):
"""
Find all search term indexes used in the GET parameters
specified by getParams list. Return list of integers (1-based).
"""
terms = set()
for param in getParams:
if type(param[1]) is int:
terms.add(param[1])
elif type(param[1]) is list:
for t in param[1]:
terms.add(t)
return [t for t in sorted(terms)]
def build_get_string(self, getParams, config):
# Abstract function
raise NotImplementedError()
def term_query(self, query, config):
# Abstract function
raise NotImplementedError()
def binary_bool(self, strOp, operandL, operandR, config):
# Abstract function
raise NotImplementedError()
def not_bool(self, operand, config):
# Abstract function
raise NotImplementedError()
def translate_fcsql(self, query: str, config: ResourceConfig, basicSearch: bool = False, start=0, end=-1):
"""
Translate an FCS-QL query into a corpus-specific query (GET query,
JSON Elasticsearch query or whatever).
If something is wrong with the query, raise a Diagnostic exception.
This is a top-level platform-independent function. It recursively
parses the query by locating the hierarchically highest logical operator
in the current query and then calling a respective lower-level
function, which may be platform-specific.
The function is recursive and only looks at the part of the string
delimited by start and end parameters.
"""
if end == -1:
# Top-level call, so return a finalized corpus-specific query
end = len(query)
if end == 0:
raise Diagnostic(DiagnosticTypes.sru, 27)
if self.rxTermQuery.search(query) is not None:
return self.build_get_string(self.term_query(query, config), config)
return self.build_get_string(self.translate_fcsql(query, config,
basicSearch=basicSearch,
start=start, end=end),
config)
# if query.count('(') != query.count(')'):
# return None
if len(query) <= 0:
raise Diagnostic(DiagnosticTypes.sru, 27)
if start >= len(query) - 1 or end <= 0:
raise Diagnostic(DiagnosticTypes.sru, 10)
while start < len(query) and query[start] in ' \t\n':
start += 1
while end > 0 and query[end - 1] in ' \t\n':
end -= 1
if start >= end:
raise Diagnostic(DiagnosticTypes.sru, 10)
iOpPos, strOp = self.find_operator(query, start, end)
if iOpPos == -1:
if query[start] == '(' and query[end - 1] == ')':
return self.translate_fcsql(query, config, basicSearch=basicSearch, start=start + 1, end=end - 1)
else:
return self.term_query(query[start:end], config)
if strOp in ('AND', 'OR'):
resultLeft = self.translate_fcsql(query, config, basicSearch=basicSearch, start=start, end=iOpPos)
resultRight = self.translate_fcsql(query, config, basicSearch=basicSearch, start=iOpPos + len(strOp), end=end)
if len(resultLeft) <= 0 or len(resultRight) <= 0:
raise Diagnostic(DiagnosticTypes.sru, 10)
return self.binary_bool(strOp, resultLeft, resultRight, config)
elif strOp == 'NOT':
resultRight = self.translate_fcsql(query, config, basicSearch=basicSearch, start=iOpPos + len(strOp),
end=end)
return self.not_bool(resultRight, config)
return {}
def validate_query(self, operation, version, queryType, query,
xFcsEndpointDescription, xFcsContext,
xFcsDataviews, xFcsRewritesAllowed):
"""
Check if the query parameters contain a valid combination of values.
:param operation:
:param version:
:param queryType:
:param query:
:param xFcsEndpointDescription:
:param xFcsContext:
:param xFcsDataviews:
:param xFcsRewritesAllowed:
:return: Return a list of diagnostics describing problems with the query.
If the query is prima facie valid and can be processed further, an empty
list will be returned.
"""
diagnostics = []
# Check if additional parameters combine with the operation requested
# (FCS specifications, 4.1)
if len(xFcsEndpointDescription) > 0 and operation != Operation.explain:
diagnostics.append(Diagnostic(DiagnosticTypes.sru, 8, details='x-fcs-endpoint-description'))
if len(xFcsContext) > 0 and operation != Operation.searchRetrieve:
diagnostics.append(Diagnostic(DiagnosticTypes.sru, 8, details='x-fcs-context'))
if len(xFcsDataviews) > 0 and operation != Operation.searchRetrieve:
diagnostics.append(Diagnostic(DiagnosticTypes.sru, 8, details='x-fcs-dataviews'))
if len(xFcsRewritesAllowed) > 0 and operation != Operation.searchRetrieve:
diagnostics.append(Diagnostic(DiagnosticTypes.sru, 8, details='x-fcs-rewrites-allowed'))
return diagnostics
if __name__ == '__main__':
pass