# keypathwayminer_task.py
# Author: AndiMajore
import base64
import datetime
import json
import random
import string
import time
from os.path import join
import requests
from tasks.task_hook import TaskHook
from drugstone.models import Protein, EnsemblGene
# Base URL of the KPM-web REST API.
# The commented-out entry points at a local/dev deployment of KPM-web.
# url = 'http://172.25.0.1:9003/keypathwayminer/requests/'
url = 'https://exbio.wzw.tum.de/keypathwayminer/requests/'
# Random 32-character identifier shared by all requests of this worker
# process; KPM-web uses it to associate uploaded datasets with a run.
# Not security-sensitive, so `random` (rather than `secrets`) is fine here.
attached_to_id = ''.join(random.choices(string.ascii_uppercase + string.digits, k=32))
def send_request(sub_url, data):
    """
    Send a POST request with form-data to a given sub-URL and retrieve the JSON response.
    Throws a RuntimeError if there was an error while submitting
    :param sub_url: Sub-URL to send the POST request to
    :param data: Data dictionary that is sent via the POST request
    :return: JSON-object from the server request
    :raises RuntimeError: if the server is unreachable, answers with a
        non-200 status code, or returns a body that is not valid JSON
    """
    # NOTE(review): os.path.join is only URL-safe on POSIX (would produce
    # backslashes on Windows); kept for behavioral compatibility.
    request_url = join(url, sub_url)
    try:
        response = requests.post(url=request_url, data=data)
    except Exception as e:
        # Previously the exception was only printed, after which the code
        # fell through and hit an UnboundLocalError on 'response' below,
        # masking the real network failure. Fail loudly instead.
        raise RuntimeError(f'Could not reach KPM server at "{request_url}".') from e
    # Check if submitting the job was successful
    if response.status_code != 200:
        raise RuntimeError(f'KPM server response code was "{response.status_code}", expected "200".')
    try:
        response_json = response.json()
    except json.decoder.JSONDecodeError:
        raise RuntimeError(f'The response could not be decoded as JSON, please check the URL:\n{request_url}')
    return response_json
def kpm_task(task_hook: TaskHook):
    """
    Run KeyPathwayMiner on given proteins and parameters remotely using the RESTful API of KPM-web.

    Submits the job asynchronously, polls the KPM-web server every second and
    forwards the reported progress to the TaskHook, then writes the resulting
    network back via ``task_hook.set_results()`` as a dict with ``network``
    and ``node_attributes`` keys.

    :param task_hook: Needs 'k' set as a parameter (str or int), a 'config'
        dict with an 'identifier' key, and a list of seeds
    :return: None
    :raises RuntimeError: if job submission, status polling or result
        retrieval fails on the KPM-web side
    """
    # --- Fetch and generate the datasets
    dataset_name = 'indicatorMatrix'
    id_space = task_hook.parameters["config"].get("identifier", "symbol")

    # Resolve the seeds from the configured identifier space to Protein rows.
    # Unknown identifier spaces yield an empty protein list (and thus an
    # empty indicator matrix), as before.
    proteins = []
    if id_space == 'symbol':
        proteins = Protein.objects.filter(gene__in=task_hook.seeds)
    elif id_space == 'entrez':
        proteins = Protein.objects.filter(entrez__in=task_hook.seeds)
    elif id_space == 'uniprot':
        proteins = Protein.objects.filter(uniprot_code__in=task_hook.seeds)
    elif id_space == 'ensg':
        protein_ids = {ensg.protein_id for ensg in EnsemblGene.objects.filter(name__in=task_hook.seeds)}
        proteins = Protein.objects.filter(id__in=protein_ids)
    protein_backend_ids = {p.id for p in proteins}

    # KPM expects an indicator matrix: one line per seed, flagged with 1.
    indicator_matrix_string = ''.join(f'{protein.uniprot_code}\t1\n' for protein in proteins)
    datasets = [
        {
            'name': dataset_name,
            'attachedToID': attached_to_id,
            'contentBase64': base64.b64encode(indicator_matrix_string.encode('UTF-8')).decode('ascii')
        }
    ]
    datasets_data = json.dumps(datasets)

    # --- Generate KPM settings
    k_val = str(task_hook.parameters['k'])
    kpm_settings = {
        'parameters': {
            'name': f'Drugstone run on {datetime.datetime.now()}',
            'algorithm': 'Greedy',
            'strategy': 'INES',
            'removeBENs': 'true',
            'unmapped_nodes': 'Add to negative list',
            'computed_pathways': 1,
            'graphID': 27,
            'l_samePercentage': 'false',
            'samePercentage_val': 0,
            # Fixed k (no range): val == val_max, use_range disabled.
            'k_values': {
                'val': k_val,
                'val_step': '1',
                'val_max': k_val,
                'use_range': 'false',
                'isPercentage': 'false'
            },
            'l_values': {
                'val': '0',
                'val_step': '1',
                'val_max': '0',
                'use_range': 'false',
                'isPercentage': 'false',
                'datasetName': dataset_name
            }
        },
        'withPerturbation': 'false',
        # Perturbation block is required by the API even though
        # withPerturbation is 'false'.
        'perturbation': [
            {
                'technique': 'Node-swap',
                'startPercent': '5',
                'stepPercent': '1',
                'maxPercent': '15',
                'graphsPerStep': '1'
            }
        ],
        'linkType': 'OR',
        'attachedToID': attached_to_id,
        'positiveNodes': '',
        'negativeNodes': ''
    }
    kpm_settings_data = json.dumps(kpm_settings)

    # --- Submit kpm job asynchronously
    kpm_job_data = {'kpmSettings': kpm_settings_data,
                    'datasets': datasets_data}
    submit_json = send_request('submitAsync', kpm_job_data)
    # Check if the submission was correct (add check whether parameters were correct)
    if not submit_json["success"]:
        print(f'Job submission failed. Server response:\n{submit_json}')
        raise RuntimeError(f'Job submission failed. Server response:\n{submit_json}')
    # Obtain questID for getting the result
    quest_id_data = {'questID': submit_json['questID']}

    # --- Retrieve status and update task_hook every 1s
    old_progress = -1
    while True:
        status_json = send_request('runStatus', quest_id_data)
        # Check if the questID exists (should)
        if not status_json['runExists']:
            raise RuntimeError(f'Job status retrieval failed. Run does not exist:\n{status_json}')
        # Set progress only when it changed
        progress = status_json['progress']
        if old_progress != progress:
            task_hook.set_progress(progress=progress, status='')
            old_progress = progress
        # Stop and go to results
        if status_json['completed'] or status_json['cancelled']:
            break
        time.sleep(1)

    # --- Retrieve results and write back
    results_json = send_request('results', quest_id_data)
    if not results_json['success']:
        raise RuntimeError(f'Job terminated but was unsuccessful:\n{results_json}')
    graphs_json = results_json['resultGraphs']

    # Build the network. Since 'computed_pathways' is 1, a single non-union
    # graph is expected; if several arrive, the last one wins (as before).
    network = None
    if graphs_json:
        for graph in graphs_json:
            # Ignore the union set
            if graph['isUnionSet']:
                continue
            nodes = [node['name'] for node in graph['nodes']]
            edges = [{'from': edge['source'], 'to': edge['target']} for edge in graph['edges']]
            network = {'nodes': nodes, 'edges': edges}
    if network is None:
        # No (non-union) result graph returned. Previously this crashed with
        # a TypeError in the remapping below; report an empty result instead.
        network = {'nodes': [], 'edges': []}

    # Remapping everything from UniProt Accession numbers to internal IDs
    result_nodes = Protein.objects.filter(uniprot_code__in=network["nodes"])
    node_map = {node.uniprot_code: node.id for node in result_nodes}
    # NOTE(review): a result node whose accession is unknown to the local DB
    # raises KeyError here — unchanged from the original implementation.
    network["nodes"] = list(map(lambda uniprot: "p" + str(node_map[uniprot]), network["nodes"]))
    network["edges"] = list(map(
        lambda uniprot_edge: {"from": "p" + str(node_map[uniprot_edge["from"]]),
                              "to": "p" + str(node_map[uniprot_edge["to"]])},
        network["edges"]))

    node_types = {node: "protein" for node in network["nodes"]}
    # Build the seed-ID set once instead of once per node (was O(n^2)).
    seed_node_ids = {"p" + str(pid) for pid in protein_backend_ids}
    is_seed = {node: node in seed_node_ids for node in network["nodes"]}
    result_dict = {
        "network": network,
        "node_attributes": {"node_types": node_types, "is_seed": is_seed}
    }
    task_hook.set_results(results=result_dict)