Skip to content
Snippets Groups Projects
Commit 397e4286 authored by Arkhangelskiy, Timofey
Browse files

Start Litterae query parser (GET requests)

parent 0f505c26
Branches
No related tags found
No related merge requests found
from urllib.parse import quote
import re
import json
import copy
import random
import urllib.request
from .query_parser import QueryParser
from .config import ResourceConfig
from .diagnostics import Diagnostic, DiagnosticTypes
class LitteraeQueryParser(QueryParser):
"""
Parses search queries for Formulae, Litterae, Chartae.
"""
def build_get_string(self, getParams, config):
    """
    Build a GET string (everything after the ?) from a description
    of the GET parameters in the getParams list.

    Each item of getParams is read by position: [0] is the parameter
    name, [1] is the term index and [2] is the value. A positive term
    index is appended to the name as a suffix; the value is URL-quoted.
    Only the first 'bool_operator' item is emitted. Every 'q_' item is
    followed by a fixed set of per-term defaults (fuzziness, slop,
    in_order, search_field, exclude_q).
    """
    # The string always starts with fixed settings and a random search ID.
    parts = ['source=advanced&sort=urn&lemma_search=False&simple_search_id='
             + str(random.randint(100000, 1000000))]
    boolOperatorMentioned = False
    for param in getParams:
        name, index, value = param[0], param[1], param[2]
        nSfx = str(index) if index > 0 else ''
        if name == 'bool_operator':
            # Nested boolean expressions contribute one item each,
            # but the parameter may only appear once in the request.
            if boolOperatorMentioned:
                continue
            boolOperatorMentioned = True
        parts.append(name + nSfx + '=' + quote(str(value)))
        if name == 'q_':
            parts.extend([
                'fuzziness_' + nSfx + '=0',
                'slop_' + nSfx + '=0',
                'in_order_' + nSfx + '=False',
                'search_field_' + nSfx + '=text',
                'exclude_q_' + nSfx + '=',
            ])
    return '&'.join(parts)
def term_query(self, query, config):
    """
    Return list of query parameters for one term or sequence of terms.

    A pair of enclosing double quotes is removed from the query first.
    The result is a one-item list, [['q_', 1, query]].

    Raises Diagnostic (sru, 10) if the term is empty after unquoting.
    """
    if len(query) >= 2 and query.startswith('"') and query.endswith('"'):
        query = query[1:-1]
    if len(query) <= 0:
        # Raised, not returned: callers take len() of the result and
        # pass it on as a parameter list.
        raise Diagnostic(DiagnosticTypes.sru, 10)
    return [['q_', 1, query]]
def binary_bool(self, strOp, operandL, operandR, config):
    """
    Combine the parameter lists of two operands joined by AND or OR.

    The term indexes of the right operand are shifted by the largest
    index used in the left operand, the two lists are concatenated and
    a ['bool_operator', -1, 'must' | 'should'] item is appended.

    Raises Diagnostic (sru, 10) if either operand is empty,
    (sru, 48) if the operands already contain the other boolean
    operator, and (sru, 37) if strOp is neither 'AND' nor 'OR'.
    """
    if len(operandL) <= 0 or len(operandR) <= 0:
        raise Diagnostic(DiagnosticTypes.sru, 10)
    termsL = self.term_indexes(operandL)
    operandR = self.shift_term_indexes(operandR, max(termsL))

    # operator -> (value emitted for it, value that signals a mixed query)
    boolValues = {'AND': ('must', 'should'), 'OR': ('should', 'must')}
    if strOp not in boolValues:
        raise Diagnostic(DiagnosticTypes.sru, 37, details=strOp)
    ownValue, conflictingValue = boolValues[strOp]

    getParamsNew = operandL + operandR
    if any(p[0] == 'bool_operator' and p[2] == conflictingValue for p in getParamsNew):
        message = 'FLC web interface does not support queries that ' \
                  'combine AND and OR.'
        raise Diagnostic(DiagnosticTypes.sru, 48, message=message)
    return getParamsNew + [['bool_operator', -1, ownValue]]
def not_bool(self, operand, config):
    """
    Negate an operand. Not available yet: always raises
    NotImplementedError.
    """
    # TODO: implement
    raise NotImplementedError()
def send_query(self, strGetParams: str, config: ResourceConfig):
    """
    Send the translated query to the Litterae instance. Return the
    response body decoded to a string.

    The request URL is config.resource_base_url (slashes stripped from
    both ends) followed by '/results?' and strGetParams. The body is
    decoded with the charset announced by the server, or UTF-8 if none
    is announced.
    """
    url = config.resource_base_url.strip('/') + '/results?' + strGetParams
    # The context manager closes the connection even if reading fails.
    with urllib.request.urlopen(url) as response:
        data = response.read()
        encoding = response.info().get_content_charset('utf-8')
    return data.decode(encoding)
if __name__ == '__main__':
pass
from urllib.parse import quote
import re
import json
import html
from lxml.html import fragment_fromstring
from .enums import *
from .config import ResourceConfig
from .search_retrieve import Record
from .diagnostics import Diagnostic, DiagnosticTypes
class POSConvertor:
    """
    Translate corpus-specific part-of-speech / grammar tags into UPOS
    with the help of regex correspondences taken from the config.
    """

    def __init__(self, config: ResourceConfig):
        """
        Compile the (regex, replacement) pairs found in
        config.pos_convert.
        """
        self.posConvert = config.pos_convert
        self.posTests = []
        for pattern, upos in self.posConvert:
            self.posTests.append((re.compile(pattern), upos))

    def convert_pos(self, pos):
        """
        Return the replacement paired with the first regex that is
        found in pos. If no regex matches, return pos unchanged.
        """
        replacements = (upos for rx, upos in self.posTests
                        if rx.search(pos) is not None)
        return next(replacements, pos)
class LitteraeResponseParser:
"""
Parses responses from a Litterae instance.
TODO: implement
"""
def __init__(self):
    """
    Create a parser without a POS convertor.
    """
    # POS convertor, rebuilt with each parse call
    self.pc = None
def parse_annotation(self, anno, segID, record):
    """
    Parse the HTML annotation of one word taken from a hit and
    store the result in the 'pos' and 'lemma' layers of the record.

    Lemmas are read from the popup_lex spans and POS tags from the
    popup_pos spans; POS tags are passed through self.pc.convert_pos.
    Distinct values are sorted and joined with '|'. A layer value of
    '_' means that nothing was found. Each layer entry refers to the
    segment through segID.
    """
    annoTree = fragment_fromstring(anno, create_parent='div')
    lexPath = 'div[@class="popup_word"]/div[@class="popup_ana"]/span[@class="popup_lex"]'
    posPath = 'div[@class="popup_word"]/div[@class="popup_ana"]/span[@class="popup_pos"]'

    lemmas = {node.text for node in annoTree.xpath(lexPath)
              if node.text is not None}

    pos = set()
    for node in annoTree.xpath(posPath):
        if node.text is None:
            continue
        # Strip whitespace-like noise before looking up the tag.
        cleaned = re.sub('&nbsp;|[ \t\ufeff]+', '', node.text)
        pos.add(self.pc.convert_pos(cleaned))

    lemmasStr = '|'.join(sorted(lemmas)) if len(lemmas) > 0 else '_'
    posStr = '|'.join(sorted(pos)) if len(pos) > 0 else '_'

    for layer, value in (('pos', posStr), ('lemma', lemmasStr)):
        if layer not in record.layers:
            record.layers[layer] = []
        record.layers[layer].append({
            'ref': segID,
            'value': value
        })
def parse_span(self, el, record, advancedHits=False):
    """
    Parse one <span> element from the HTML representation
    of one hit returned by a Litterae instance. Add the extracted
    text to the record object.

    record.text receives the text with matched words wrapped in
    <hits:Hit> tags; record.textNoHighlight receives the plain text.
    If advancedHits is set, every word span also adds a segment to
    record.segments, and its data-ana attribute (if present) is passed
    to parse_annotation. Segment offsets describe the word inside
    record.textNoHighlight: start is the number of characters before
    the word plus one, end is that number plus the word length.
    """
    classes = el.attrib.get('class', '')
    if el.tag == 'span' and 'sentence_meta' in classes:
        # This is the introductory span that only contains the header
        # (title, author etc.), so only the text after it is kept.
        if el.tail is not None:
            record.text += el.tail.strip('\n\t ')
        return

    if el.text is not None:
        isWord = re.search('\\bword\\b', classes) is not None
        isMatch = isWord and re.search('\\bwmatch\\b', classes) is not None
        if isWord:
            # Offsets have to be taken before the word is appended;
            # otherwise the segment would point past the word.
            # NOTE(review): the nesting of this append under the word
            # check is reconstructed from flattened source -- confirm.
            nCharsBefore = len(record.textNoHighlight)
            record.textNoHighlight += el.text
            if advancedHits:
                segID = 's' + str(len(record.segments))
                record.segments.append({
                    'id': segID,
                    'start': nCharsBefore + 1,
                    'end': nCharsBefore + len(el.text)
                })
                if 'data-ana' in el.attrib:
                    self.parse_annotation(el.attrib['data-ana'], segID, record)
        if isMatch:
            record.text += '<hits:Hit>' + el.text + '</hits:Hit>'
        else:
            record.text += el.text

    if el.tail is not None:
        record.text += el.tail
        record.textNoHighlight += el.tail
def parse_context(self, hit, config: ResourceConfig, lang='', advancedHits=False):
    """
    Parse one hit. Return it as a Record object.

    If lang is empty, config.search_lang_id is used. The HTML is read
    from hit['languages'][lang]['text']; if that path is missing, an
    empty record is returned. Line breaks, tabs and BOM characters are
    removed before the HTML is parsed, and each top-level element is
    passed to parse_span.
    """
    record = Record(advancedHits=advancedHits)
    if len(lang) <= 0:
        lang = config.search_lang_id
    if ('languages' not in hit
            or lang not in hit['languages']
            or 'text' not in hit['languages'][lang]):
        return record
    contentTxt = re.sub('[\r\n\t\ufeff]+', '', hit['languages'][lang]['text'])
    content = fragment_fromstring(contentTxt,
                                  create_parent='div')
    for el in content:
        self.parse_span(el, record, advancedHits)
    return record
def parse(self, response, config: ResourceConfig, xFcsDataviews, lang=''):
    """
    Read a dictionary with the first N hits returned by a Litterae
    instance. Return a list of Record objects and the total number of
    records found.

    xFcsDataviews is a comma-separated list of requested data views;
    'adv' switches on advanced hits. The total is taken from
    response['n_sentences'] and the hits from response['contexts'].
    """
    self.pc = POSConvertor(config)

    requestedViews = {v.strip() for v in xFcsDataviews.split(',') if len(v.strip()) > 0}
    advancedHits = 'adv' in requestedViews
    # NOTE(review): diagnostics for unsupported data views are collected
    # here, but this method does not return them.
    diagnostics = [Diagnostic(DiagnosticTypes.fcs, 4, details=v)
                   for v in requestedViews
                   if v not in ('hits', 'adv')]

    nRecords = response['n_sentences'] if 'n_sentences' in response else 0
    if nRecords <= 0 or 'contexts' not in response:
        return [], nRecords

    records = [self.parse_context(context, config, lang, advancedHits)
               for context in response['contexts']]
    return records, nRecords
if __name__ == '__main__':
pass
from .enums import *
from .diagnostics import Diagnostic
from .config import ResourceConfig
import re
import copy
class QueryParser:
......@@ -41,6 +43,107 @@ class QueryParser:
return i, 'OR'
return -1, ''
@staticmethod
def shift_term_indexes(getParams, shift):
"""
Increase all search term indexes in the GET parameters
specified by getParams by shift.
"""
getParamsShifted = []
for param in getParams:
if type(param[2]) is int:
newParam = (param[0], param[1] + shift, param[2] + shift)
elif param[1] >= 0:
newParam = (param[0], param[1] + shift, param[2])
else:
newParam = copy.deepcopy(param)
getParamsShifted.append(newParam)
return getParamsShifted
@staticmethod
def term_indexes(getParams):
"""
Find all search term indexes used in the GET parameters
specified by getParams list. Return list of integers (1-based).
"""
terms = set()
for param in getParams:
if type(param[1]) is int:
terms.add(param[1])
elif type(param[1]) is list:
for t in param[1]:
terms.add(t)
return [t for t in sorted(terms)]
def build_get_string(self, getParams, config):
    """
    Abstract function: always raises NotImplementedError here.
    """
    raise NotImplementedError()
def term_query(self, query, config):
    """
    Abstract function: always raises NotImplementedError here.
    """
    raise NotImplementedError()
def binary_bool(self, strOp, operandL, operandR, config):
    """
    Abstract function: always raises NotImplementedError here.
    """
    raise NotImplementedError()
def not_bool(self, operand, config):
    """
    Abstract function: always raises NotImplementedError here.
    """
    raise NotImplementedError()
def translate_fcsql(self, query: str, config: ResourceConfig, basicSearch: bool = False, start=0, end=-1):
    """
    Translate an FCS-QL query into a corpus-specific query (GET query,
    JSON Elasticsearch query or whatever).
    If something is wrong with the query, raise a Diagnostic exception.

    This is the platform-independent entry point. It finds the
    hierarchically highest logical operator in the current part of the
    query and hands the operands to term_query, binary_bool or
    not_bool, which may be platform-specific.
    The function is recursive and only looks at the part of the string
    delimited by start and end. A call with end == -1 is the top-level
    call; its result is finalized with build_get_string.
    """
    if end == -1:
        # Top-level call, so return a finalized corpus-specific query
        end = len(query)
        if end == 0:
            raise Diagnostic(DiagnosticTypes.sru, 27)
        if self.rxTermQuery.search(query) is not None:
            queryParams = self.term_query(query, config)
        else:
            queryParams = self.translate_fcsql(query, config,
                                               basicSearch=basicSearch,
                                               start=start, end=end)
        return self.build_get_string(queryParams, config)

    if len(query) <= 0:
        raise Diagnostic(DiagnosticTypes.sru, 27)
    if start >= len(query) - 1 or end <= 0:
        raise Diagnostic(DiagnosticTypes.sru, 10)

    # Shrink the window so that it has no whitespace at either end.
    blanks = ' \t\n'
    while start < len(query) and query[start] in blanks:
        start += 1
    while end > 0 and query[end - 1] in blanks:
        end -= 1
    if start >= end:
        raise Diagnostic(DiagnosticTypes.sru, 10)

    opPos, strOp = self.find_operator(query, start, end)
    if opPos == -1:
        # No operator: either a parenthesized expression or a term.
        if query[start] == '(' and query[end - 1] == ')':
            return self.translate_fcsql(query, config, basicSearch=basicSearch,
                                        start=start + 1, end=end - 1)
        return self.term_query(query[start:end], config)

    afterOp = opPos + len(strOp)
    if strOp == 'NOT':
        negated = self.translate_fcsql(query, config, basicSearch=basicSearch,
                                       start=afterOp, end=end)
        return self.not_bool(negated, config)
    if strOp in ('AND', 'OR'):
        resultLeft = self.translate_fcsql(query, config, basicSearch=basicSearch,
                                          start=start, end=opPos)
        resultRight = self.translate_fcsql(query, config, basicSearch=basicSearch,
                                           start=afterOp, end=end)
        if len(resultLeft) <= 0 or len(resultRight) <= 0:
            raise Diagnostic(DiagnosticTypes.sru, 10)
        return self.binary_bool(strOp, resultLeft, resultRight, config)
    return {}
def validate_query(self, operation, version, queryType, query,
xFcsEndpointDescription, xFcsContext,
xFcsDataviews, xFcsRewritesAllowed):
......
......@@ -54,37 +54,6 @@ class TsakorpusQueryParser(QueryParser):
getParams.append(['word_dist_to_', iTerm, '1'])
return getParams
def term_indexes(self, getParams):
    """
    Collect all search term indexes used in the GET parameters
    of the getParams list. Item 1 of each parameter may be an int
    or a list of ints. Return the distinct indexes as a sorted list.
    """
    seen = set()
    for param in getParams:
        termIndex = param[1]
        if type(termIndex) is int:
            seen.add(termIndex)
        elif type(termIndex) is list:
            seen.update(termIndex)
    return sorted(seen)
def shift_term_indexes(self, getParams, shift):
    """
    Return a copy of getParams in which all search term indexes
    are increased by shift.

    The term index (item 1) of every parameter is shifted. The value
    (item 2) is shifted as well if it is an int. Every parameter is
    returned as a tuple.
    """
    getParamsShifted = []
    for param in getParams:
        value = param[2]
        if type(value) is int:
            value = value + shift
        getParamsShifted.append((param[0], param[1] + shift, value))
    return getParamsShifted
def binary_bool(self, strOp, operandL, operandR, config):
if len(operandL) <= 0 or len(operandR) <= 0:
raise Diagnostic(DiagnosticTypes.sru, 10)
......@@ -111,53 +80,9 @@ class TsakorpusQueryParser(QueryParser):
return getParamsNew
raise Diagnostic(DiagnosticTypes.sru, 37, details=strOp)
def translate_fcsql(self, query: str, config: ResourceConfig, basicSearch: bool = False, start=0, end=-1):
    """
    Translate an FCS-QL query into a Tsakorpus GET query.
    If something is wrong with the query, raise a Diagnostic exception.
    The function is recursive and only looks at the part of the string
    delimited by start and end parameters. A call with end == -1 is the
    top-level call; its result is finalized with build_get_string.
    """
    if end == -1:
        # Top-level call, so return a finalized GET string
        end = len(query)
        if end == 0:
            raise Diagnostic(DiagnosticTypes.sru, 27)
        if self.rxTermQuery.search(query) is not None:
            return self.build_get_string(self.term_query(query, config), config)
        return self.build_get_string(self.translate_fcsql(query, config,
                                                          basicSearch=basicSearch,
                                                          start=start, end=end),
                                     config)
    if len(query) <= 0:
        raise Diagnostic(DiagnosticTypes.sru, 27)
    if start >= len(query) - 1 or end <= 0:
        raise Diagnostic(DiagnosticTypes.sru, 10)
    # Ignore whitespace at both ends of the current part of the query.
    while start < len(query) and query[start] in ' \t\n':
        start += 1
    while end > 0 and query[end - 1] in ' \t\n':
        end -= 1
    if start >= end:
        raise Diagnostic(DiagnosticTypes.sru, 10)
    iOpPos, strOp = self.find_operator(query, start, end)
    if iOpPos == -1:
        # No operator: either a parenthesized expression or a term.
        if query[start] == '(' and query[end - 1] == ')':
            return self.translate_fcsql(query, config, basicSearch=basicSearch, start=start + 1, end=end - 1)
        return self.term_query(query[start:end], config)
    if strOp in ('AND', 'OR'):
        resultLeft = self.translate_fcsql(query, config, basicSearch=basicSearch, start=start, end=iOpPos)
        resultRight = self.translate_fcsql(query, config, basicSearch=basicSearch,
                                           start=iOpPos + len(strOp), end=end)
        if len(resultLeft) <= 0 or len(resultRight) <= 0:
            raise Diagnostic(DiagnosticTypes.sru, 10)
        return self.binary_bool(strOp, resultLeft, resultRight, config)
    elif strOp == 'NOT':
        resultRight = self.translate_fcsql(query, config, basicSearch=basicSearch,
                                           start=iOpPos + len(strOp), end=end)
        # not_bool takes the config as well, like binary_bool.
        return self.not_bool(resultRight, config)
    return {}
def not_bool(self, operand, config):
    """
    Negate an operand. Not available yet: always raises
    NotImplementedError.
    """
    # TODO: implement
    raise NotImplementedError()
def send_query(self, strGetParams: str, config: ResourceConfig):
"""
......
{
"host": "0.0.0.0",
"port": "80",
"max_hits": 8,
"platform": "litterae",
"resource_base_url": "https://werkstatt.formulae.uni-hamburg.de/search/",
"pos_convert": [
]
}
\ No newline at end of file
......@@ -4,6 +4,8 @@ from fastapi.templating import Jinja2Templates
from fastapi.encoders import jsonable_encoder
from fastapi.responses import JSONResponse
from common.query_parser import QueryParser
from common.litterae_query_parser import LitteraeQueryParser
from common.litterae_response_parser import LitteraeResponseParser
from common.tsakorpus_query_parser import TsakorpusQueryParser
from common.tsakorpus_response_parser import TsakorpusResponseParser
from common.enums import *
......@@ -19,6 +21,9 @@ app.mount('/static', StaticFiles(directory='static'), name='static')
templates = Jinja2Templates(directory='static')
app.qp = QueryParser()
app.qp_litterae = LitteraeQueryParser()
app.rp_litterae = LitteraeResponseParser()
app.qp_tsakorpus = TsakorpusQueryParser()
app.rp_tsakorpus = TsakorpusResponseParser()
app.configs = read_configs()
......@@ -85,6 +90,24 @@ def endpoint(
'records': records
},
media_type='application/xml')
elif config.platform == CorpPlatform.litterae:
try:
strGetParams = app.qp_litterae.translate_fcsql(query, config)
print(strGetParams)
return strGetParams
res = app.qp_litterae.send_query(strGetParams, config)
except Diagnostic as diag:
print('diag', str(diag))
return Response(content=str(diag), media_type='application/xml')
records, nHits = app.rp_litterae.parse(res, config, xFcsDataviews)
records = [r.as_dict() for r in records]
return templates.TemplateResponse('search_retrieve_response.xml',
{
'request': request,
'n_hits': nHits,
'records': records
},
media_type='application/xml')
# return str(res)
return {'operation': operation, 'version': version}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment