Skip to content
Snippets Groups Projects
Select Git revision
  • 397e428687e0494566f7bd3fe8ccb44e33c6dba2
  • master default protected
2 results

query_parser.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    query_parser.py 7.25 KiB
    from .enums import *
    from .diagnostics import Diagnostic
    from .config import ResourceConfig
    import re
    import copy
    
    
    class QueryParser:
        """
        This class contains commonly used methods for initial parsing of a GET
        query. It does not include platform-specific methods.

        Subclasses are expected to override build_get_string, term_query,
        binary_bool and not_bool, which only raise NotImplementedError here.
        """

        # Matches a query that consists of a single term: either a run of
        # characters containing no space and no double quote (an escaped \" is
        # allowed), or one double-quoted string (an escaped \" is allowed inside).
        rxTermQuery = re.compile('^(?:(?:[^ "]|\\\\")*|"(?:[^"]|\\\\")*")$')

        def __init__(self):
            pass

        @staticmethod
        def _is_word_char(ch):
            """
            Return True if ch may be part of a bare search term (letter, digit
            or underscore), i.e. an operator keyword adjacent to it is really
            a part of a longer word.
            """
            return ch.isalnum() or ch == '_'

        @staticmethod
        def _operator_at(strQuery, pos, strOp, start, end):
            """
            Check whether the operator keyword strOp occupies
            strQuery[pos:pos + len(strOp)] as a standalone token inside the
            span [start, end). The edges of the span count as token boundaries.
            """
            opEnd = pos + len(strOp)
            if opEnd > end or strQuery[pos:opEnd] != strOp:
                return False
            # Reject keywords glued to a word, e.g. the OR in "ORANGE" or "FOR".
            if pos > start and QueryParser._is_word_char(strQuery[pos - 1]):
                return False
            if opEnd < end and QueryParser._is_word_char(strQuery[opEnd]):
                return False
            return True

        @staticmethod
        def find_operator(strQuery, start=0, end=-1):
            """
            Locate the logical operator that splits strQuery[start:end].

            :param strQuery: whole query string.
            :param start: index of the first character to look at.
            :param end: index after the last character to look at (exclusive);
            -1 means "until the end of the string".
            :return: tuple (position, operator). A NOT located exactly at start
            takes priority; otherwise this is the leftmost AND / OR that lies
            outside of double quotes and outside of parentheses. If there is
            no such operator, (-1, '') is returned.

            Operators are only recognized in upper case and only as standalone
            tokens, so that words such as ORANGE or NOTHING remain search terms.
            A backslash escapes the following character, which makes \\" inside
            a quoted term behave the same way as in rxTermQuery.
            """
            if end == -1 or end > len(strQuery):
                end = len(strQuery)
            if QueryParser._operator_at(strQuery, start, 'NOT', start, end):
                return start, 'NOT'
            parenthBalance = 0
            inQuotes = False
            escaped = False
            for i in range(start, end):
                ch = strQuery[i]
                if escaped:
                    # This character was escaped by the preceding backslash.
                    escaped = False
                    continue
                if ch == '\\':
                    escaped = True
                    continue
                if ch == '"':
                    inQuotes = not inQuotes
                    continue
                if inQuotes:
                    continue
                if ch == '(':
                    parenthBalance += 1
                elif ch == ')':
                    parenthBalance -= 1
                elif parenthBalance == 0:
                    for strOp in ('AND', 'OR'):
                        if QueryParser._operator_at(strQuery, i, strOp, start, end):
                            return i, strOp
            return -1, ''

        @staticmethod
        def shift_term_indexes(getParams, shift):
            """
            Increase all search term indexes in the GET parameters
            specified by getParams by shift.

            Each parameter is a sequence whose element 1 holds a term index
            (an integer or a list of integers) and whose element 2 holds a
            value. If the value is itself an integer, it is shifted together
            with the index. A negative integer index combined with a
            non-integer value is left untouched and the parameter is copied
            as is. Shifted parameters are returned as 3-element tuples; the
            input list is not modified.
            """
            getParamsShifted = []
            for param in getParams:
                index, value = param[1], param[2]
                # type(...) is int (rather than isinstance) keeps booleans
                # from being treated as indexes.
                if type(index) is list:
                    # term_indexes() accepts lists of indexes, so shift them too.
                    newIndex = [t + shift for t in index]
                    newValue = value + shift if type(value) is int else value
                    newParam = (param[0], newIndex, newValue)
                elif type(value) is int:
                    newParam = (param[0], index + shift, value + shift)
                elif index >= 0:
                    newParam = (param[0], index + shift, value)
                else:
                    newParam = copy.deepcopy(param)
                getParamsShifted.append(newParam)
            return getParamsShifted

        @staticmethod
        def term_indexes(getParams):
            """
            Find all search term indexes used in the GET parameters
            specified by getParams list. Return list of integers (1-based).

            Element 1 of each parameter is examined: integers are collected
            directly, lists contribute all of their items, anything else is
            ignored. The result is sorted and contains no duplicates.
            """
            terms = set()
            for param in getParams:
                if type(param[1]) is int:
                    terms.add(param[1])
                elif type(param[1]) is list:
                    terms.update(param[1])
            return sorted(terms)

        def build_get_string(self, getParams, config):
            """
            Abstract function. translate_fcsql() passes the parsed query to it
            on the top level and returns whatever it returns.
            """
            raise NotImplementedError()

        def term_query(self, query, config):
            """
            Abstract function. Called by translate_fcsql() with the text of
            a single search term.
            """
            raise NotImplementedError()

        def binary_bool(self, strOp, operandL, operandR, config):
            """
            Abstract function. Called by translate_fcsql() with strOp equal to
            'AND' or 'OR' and the already translated left and right operands.
            """
            raise NotImplementedError()

        def not_bool(self, operand, config):
            """
            Abstract function. Called by translate_fcsql() with the already
            translated operand of a NOT operator.
            """
            raise NotImplementedError()

        def translate_fcsql(self, query: str, config: 'ResourceConfig', basicSearch: bool = False, start=0, end=-1):
            """
            Translate an FCS-QL query into a corpus-specific query (GET query,
            JSON Elasticsearch query or whatever).
            If something is wrong with the query, raise a Diagnostic exception.
            This is a top-level platform-independent function. It recursively
            parses the query by locating the hierarchically highest logical operator
            in the current query and then calling a respective lower-level
            function, which may be platform-specific.
            The function is recursive and only looks at the part of the string
            delimited by start and end parameters.

            :param query: whole query string.
            :param config: resource configuration, passed through unchanged to
            the platform-specific functions.
            :param basicSearch: passed through unchanged to recursive calls.
            :param start: index of the first character of the current span.
            :param end: index after the last character of the current span;
            -1 marks the top-level call.
            :return: on the top level, the result of build_get_string();
            in recursive calls, the result of term_query(), binary_bool()
            or not_bool().
            :raises Diagnostic: SRU diagnostic 27 (empty term unsupported) for
            an empty query, SRU diagnostic 10 (query syntax error) for an
            empty operand.

            Grouping: a NOT at the beginning of a span applies to the entire
            rest of that span, and a span is split at its leftmost top-level
            AND / OR, so "a AND b OR c" is read as "a AND (b OR c)". Use
            parentheses to group differently.
            """
            if len(query) <= 0:
                raise Diagnostic(DiagnosticTypes.sru, 27)
            if end == -1:
                # Top-level call, so return a finalized corpus-specific query
                if self.rxTermQuery.search(query) is not None:
                    return self.build_get_string(self.term_query(query, config), config)
                return self.build_get_string(self.translate_fcsql(query, config,
                                                                  basicSearch=basicSearch,
                                                                  start=start, end=len(query)),
                                             config)
            if start >= len(query) - 1 or end <= 0:
                raise Diagnostic(DiagnosticTypes.sru, 10)
            # Trim whitespace on both sides of the current span
            while start < len(query) and query[start] in ' \t\n':
                start += 1
            while end > 0 and query[end - 1] in ' \t\n':
                end -= 1
            if start >= end:
                raise Diagnostic(DiagnosticTypes.sru, 10)
            iOpPos, strOp = self.find_operator(query, start, end)
            if iOpPos == -1:
                if query[start] == '(' and query[end - 1] == ')':
                    # The span is wrapped in parentheses: parse what is inside
                    return self.translate_fcsql(query, config, basicSearch=basicSearch, start=start + 1, end=end - 1)
                return self.term_query(query[start:end], config)
            if strOp in ('AND', 'OR'):
                resultLeft = self.translate_fcsql(query, config, basicSearch=basicSearch, start=start, end=iOpPos)
                resultRight = self.translate_fcsql(query, config, basicSearch=basicSearch, start=iOpPos + len(strOp), end=end)
                if len(resultLeft) <= 0 or len(resultRight) <= 0:
                    raise Diagnostic(DiagnosticTypes.sru, 10)
                return self.binary_bool(strOp, resultLeft, resultRight, config)
            elif strOp == 'NOT':
                resultRight = self.translate_fcsql(query, config, basicSearch=basicSearch, start=iOpPos + len(strOp),
                                                   end=end)
                return self.not_bool(resultRight, config)
            return {}

        def validate_query(self, operation, version, queryType, query,
                           xFcsEndpointDescription, xFcsContext,
                           xFcsDataviews, xFcsRewritesAllowed):
            """
            Check if the query parameters contain a valid combination of values.
            :param operation: requested operation (compared to Operation members).
            :param version: currently not inspected.
            :param queryType: currently not inspected.
            :param query: currently not inspected.
            :param xFcsEndpointDescription: only allowed with the explain operation.
            :param xFcsContext: only allowed with the searchRetrieve operation.
            :param xFcsDataviews: only allowed with the searchRetrieve operation.
            :param xFcsRewritesAllowed: only allowed with the searchRetrieve operation.
            :return: Return a list of diagnostics describing problems with the query.
            If the query is prima facie valid and can be processed further, an empty
            list will be returned.

            Extra parameters that are empty or None count as not supplied.
            """
            diagnostics = []

            # Check if additional parameters combine with the operation requested
            # (FCS specifications, 4.1)
            if xFcsEndpointDescription and operation != Operation.explain:
                diagnostics.append(Diagnostic(DiagnosticTypes.sru, 8, details='x-fcs-endpoint-description'))
            if xFcsContext and operation != Operation.searchRetrieve:
                diagnostics.append(Diagnostic(DiagnosticTypes.sru, 8, details='x-fcs-context'))
            if xFcsDataviews and operation != Operation.searchRetrieve:
                diagnostics.append(Diagnostic(DiagnosticTypes.sru, 8, details='x-fcs-dataviews'))
            if xFcsRewritesAllowed and operation != Operation.searchRetrieve:
                diagnostics.append(Diagnostic(DiagnosticTypes.sru, 8, details='x-fcs-rewrites-allowed'))

            return diagnostics
    
    
    # Entry-point guard: running this module as a script does nothing.
    if __name__ == '__main__':
        pass