from .enums import *
from .diagnostics import Diagnostic
from .config import ResourceConfig
import re
import copy


class QueryParser:
    """
    This class contains commonly used methods for initial parsing of a
    GET query. It does not include platform-specific methods: subclasses
    are expected to implement build_get_string(), term_query(),
    binary_bool() and not_bool().
    """

    # Matches a query that consists of a single term: either an unquoted
    # string without spaces or a double-quoted string. In both cases a
    # double quote may appear inside the term if escaped with a backslash.
    rxTermQuery = re.compile('^(?:(?:[^ "]|\\\\")*|"(?:[^"]|\\\\")*")$')

    # Characters that translate_fcsql() strips around operands.
    _WHITESPACE = ' \t\n'
    # Characters that may directly precede / follow a boolean operator.
    # Anything else means the letters are part of a longer word.
    _OP_BOUNDARY_BEFORE = _WHITESPACE + ')"'
    _OP_BOUNDARY_AFTER = _WHITESPACE + '("'

    def __init__(self):
        pass

    @staticmethod
    def _operator_at(strQuery, strOp, pos, start, end):
        """
        Check if strOp occurs in strQuery at position pos as a separate
        token, looking only at the [start, end) part of the string.

        The operator counts as a separate token if it is delimited on both
        sides by whitespace, a parenthesis, a double quote or the edge of
        the [start, end) range. This keeps terms such as SANDWICH or
        NOTHING from being split in the middle.
        """
        # startswith() with an end argument never matches past `end`.
        if not strQuery.startswith(strOp, pos, end):
            return False
        if (pos > start
                and strQuery[pos - 1] not in QueryParser._OP_BOUNDARY_BEFORE):
            return False
        iAfter = pos + len(strOp)
        return (iAfter >= end
                or strQuery[iAfter] in QueryParser._OP_BOUNDARY_AFTER)

    @staticmethod
    def find_operator(strQuery, start=0, end=-1):
        """
        Locate the boolean operator that splits the [start, end) part of
        strQuery at the top level.

        A NOT at the very beginning of the range wins. Otherwise, the
        leftmost AND / OR that is outside of quotes and outside of any
        parentheses is returned. Operators are only recognized in upper
        case and only as separate tokens. A backslash escapes the next
        character, so an escaped double quote does not open or close
        a quoted string.

        :param strQuery: the query string.
        :param start: index of the first character to look at.
        :param end: index after the last character to look at;
            -1 means "up to the end of the string".
        :return: (position, operator) or (-1, '') if there is no
            top-level operator.
        """
        if end == -1:
            end = len(strQuery)
        end = min(end, len(strQuery))

        if QueryParser._operator_at(strQuery, 'NOT', start, start, end):
            return start, 'NOT'

        parenthBalance = 0
        inQuotes = False
        escaped = False
        for i in range(start, end):
            c = strQuery[i]
            if escaped:
                # Previous character was a backslash: c is a literal.
                escaped = False
                continue
            if c == '\\':
                escaped = True
                continue
            if c == '"':
                inQuotes = not inQuotes
                continue
            if inQuotes:
                continue
            if c == '(':
                parenthBalance += 1
            elif c == ')':
                parenthBalance -= 1
            elif parenthBalance == 0:
                for strOp in ('AND', 'OR'):
                    if QueryParser._operator_at(strQuery, strOp,
                                                i, start, end):
                        return i, strOp
        return -1, ''

    @staticmethod
    def shift_term_indexes(getParams, shift):
        """
        Increase all search term indexes in the GET parameters
        specified by getParams by shift.

        Each parameter is indexed as param[0], param[1], param[2].
        If param[2] is an int, both param[1] and param[2] are shifted.
        Otherwise, param[1] is shifted if it is non-negative, and the
        parameter is copied unchanged if it is negative.
        NOTE(review): presumably param[1] is a term index and param[2]
        is either a second term index or a value -- confirm with the
        platform-specific subclasses.

        :return: a new list; getParams itself is not modified.
        """
        getParamsShifted = []
        for param in getParams:
            # type() rather than isinstance(): bools must not be treated
            # as term indexes.
            if type(param[2]) is int:
                newParam = (param[0], param[1] + shift, param[2] + shift)
            elif param[1] >= 0:
                newParam = (param[0], param[1] + shift, param[2])
            else:
                newParam = copy.deepcopy(param)
            getParamsShifted.append(newParam)
        return getParamsShifted

    @staticmethod
    def term_indexes(getParams):
        """
        Find all search term indexes used in the GET parameters
        specified by getParams list. An index is taken from param[1],
        which may be a single int or a list of indexes.
        Return a sorted list of integers (1-based) without duplicates.
        """
        terms = set()
        for param in getParams:
            if type(param[1]) is int:
                terms.add(param[1])
            elif type(param[1]) is list:
                terms.update(param[1])
        return sorted(terms)

    def build_get_string(self, getParams, config):
        """
        Abstract function. Turn the intermediate result of
        translate_fcsql() into the finalized corpus-specific query.
        """
        raise NotImplementedError()

    def term_query(self, query, config):
        """
        Abstract function. Translate a single term (a part of the query
        without top-level boolean operators) into the intermediate
        corpus-specific representation.
        """
        raise NotImplementedError()

    def binary_bool(self, strOp, operandL, operandR, config):
        """
        Abstract function. Combine two already translated operands
        with the binary operator strOp ('AND' or 'OR').
        """
        raise NotImplementedError()

    def not_bool(self, operand, config):
        """
        Abstract function. Negate an already translated operand.
        """
        raise NotImplementedError()

    def translate_fcsql(self, query: str, config: ResourceConfig,
                        basicSearch: bool = False, start=0, end=-1):
        """
        Translate an FCS-QL query into a corpus-specific query (GET query,
        JSON Elasticsearch query or whatever).
        If something is wrong with the query, raise a Diagnostic exception.
        This is a top-level platform-independent function. It recursively
        parses the query by locating the hierarchically highest logical
        operator in the current query and then calling a respective
        lower-level function, which may be platform-specific.
        The function is recursive and only looks at the part of the string
        delimited by start and end parameters.

        :param query: the whole query string.
        :param config: resource configuration, passed through to the
            platform-specific functions.
        :param basicSearch: passed through to recursive calls unchanged.
        :param start: index of the first character of the current part.
        :param end: index after the last character of the current part.
            -1 marks the top-level call; only that call passes its result
            through build_get_string().
        :raises Diagnostic: SRU diagnostic 27 for an empty query,
            SRU diagnostic 10 for an empty operand.
        """
        if end == -1:
            # Top-level call, so return a finalized corpus-specific query
            end = len(query)
            if end == 0:
                raise Diagnostic(DiagnosticTypes.sru, 27)
            if self.rxTermQuery.search(query) is not None:
                # Single term: no need to look for operators
                return self.build_get_string(
                    self.term_query(query, config), config)
            return self.build_get_string(
                self.translate_fcsql(query, config, basicSearch=basicSearch,
                                     start=start, end=end),
                config)

        # NOTE: the balance of parentheses is not validated here.
        if len(query) <= 0:
            raise Diagnostic(DiagnosticTypes.sru, 27)
        # start == len(query) - 1 is a valid one-character operand,
        # so only reject a start that lies outside of the string.
        if start >= len(query) or end <= 0:
            raise Diagnostic(DiagnosticTypes.sru, 10)

        # Strip whitespace on both sides of the current part
        while start < len(query) and query[start] in self._WHITESPACE:
            start += 1
        while end > 0 and query[end - 1] in self._WHITESPACE:
            end -= 1
        if start >= end:
            raise Diagnostic(DiagnosticTypes.sru, 10)

        iOpPos, strOp = self.find_operator(query, start, end)
        if iOpPos == -1:
            if query[start] == '(' and query[end - 1] == ')':
                # Strip the outer parentheses and parse what is inside
                return self.translate_fcsql(query, config,
                                            basicSearch=basicSearch,
                                            start=start + 1, end=end - 1)
            return self.term_query(query[start:end], config)

        if strOp in ('AND', 'OR'):
            resultLeft = self.translate_fcsql(query, config,
                                              basicSearch=basicSearch,
                                              start=start, end=iOpPos)
            resultRight = self.translate_fcsql(query, config,
                                               basicSearch=basicSearch,
                                               start=iOpPos + len(strOp),
                                               end=end)
            if len(resultLeft) <= 0 or len(resultRight) <= 0:
                raise Diagnostic(DiagnosticTypes.sru, 10)
            return self.binary_bool(strOp, resultLeft, resultRight, config)
        elif strOp == 'NOT':
            resultRight = self.translate_fcsql(query, config,
                                               basicSearch=basicSearch,
                                               start=iOpPos + len(strOp),
                                               end=end)
            return self.not_bool(resultRight, config)
        # Fallback for an operator this function does not know about
        return {}

    def validate_query(self, operation, version, queryType, query,
                       xFcsEndpointDescription, xFcsContext,
                       xFcsDataviews, xFcsRewritesAllowed):
        """
        Check if the query parameters contain a valid combination
        of values. Currently, only the combination of the x-fcs-...
        parameters with the operation is checked; version, queryType
        and query are not looked at.

        :param operation: the requested operation (an Operation value).
        :param version: not used here.
        :param queryType: not used here.
        :param query: not used here.
        :param xFcsEndpointDescription: value of x-fcs-endpoint-description;
            only allowed with the explain operation.
        :param xFcsContext: value of x-fcs-context;
            only allowed with the searchRetrieve operation.
        :param xFcsDataviews: value of x-fcs-dataviews;
            only allowed with the searchRetrieve operation.
        :param xFcsRewritesAllowed: value of x-fcs-rewrites-allowed;
            only allowed with the searchRetrieve operation.
        :return: Return a list of diagnostics describing problems with
            the query. If the query is prima facie valid and can be
            processed further, an empty list will be returned.
        """
        # Check if additional parameters combine with the operation
        # requested (FCS specifications, 4.1).
        # Each entry: (parameter value, the only operation it is allowed
        # with, parameter name for the diagnostic details).
        paramRestrictions = (
            (xFcsEndpointDescription, Operation.explain,
             'x-fcs-endpoint-description'),
            (xFcsContext, Operation.searchRetrieve,
             'x-fcs-context'),
            (xFcsDataviews, Operation.searchRetrieve,
             'x-fcs-dataviews'),
            (xFcsRewritesAllowed, Operation.searchRetrieve,
             'x-fcs-rewrites-allowed'),
        )
        # Truthiness instead of len(): an absent (None) parameter is
        # treated the same way as an empty one.
        return [
            Diagnostic(DiagnosticTypes.sru, 8, details=paramName)
            for value, allowedOperation, paramName in paramRestrictions
            if value and operation != allowedOperation
        ]


if __name__ == '__main__':
    pass