Skip to content
Snippets Groups Projects
Commit 7b6550fc authored by Hartung, Michael's avatar Hartung, Michael
Browse files

renaming netex to drugstone and celery integration (with flower and celery-beat)

Former-commit-id: 052fe34e6eded1074e13f95dfc24e3974daaf86d [formerly 5990bcf48d8adfc0e903da2e7a7f156728e1520f]
Former-commit-id: f2342f0a48d6d3ca54333c04f7cf6c633fb0359a
parent c1279300
No related branches found
No related tags found
No related merge requests found
Showing with 240 additions and 195 deletions
.idea/ .idea/
.env/
venv/ venv/
*.pyc *.pyc
*.pydevproject *.pydevproject
...@@ -8,4 +9,8 @@ db.sqlite3 ...@@ -8,4 +9,8 @@ db.sqlite3
supervisord.log supervisord.log
supervisord.log supervisord.log
supervisord.pid supervisord.pid
docker-entrypoint.lock docker-entrypoint.lock
\ No newline at end of file celerybeat-schedule.bak
celerybeat-schedule.dat
celerybeat-schedule.dir
docker-django.env.prod
\ No newline at end of file
...@@ -22,11 +22,11 @@ RUN pip install -r /usr/src/drugstone/requirements.txt ...@@ -22,11 +22,11 @@ RUN pip install -r /usr/src/drugstone/requirements.txt
RUN pip install gunicorn RUN pip install gunicorn
COPY ./supervisord.conf /etc/supervisor/conf.d/supervisord.conf COPY ./supervisord.conf /etc/supervisor/conf.d/supervisord.conf
COPY ./docker-entrypoint.sh /entrypoint.sh COPY ./docker-entrypoint.sh /usr/src/drugstone/docker-entrypoint.sh
COPY ./import-data.sh /import.sh # COPY ./scripts/ /usr/src/drugstone/scripts/
COPY . /usr/src/drugstone/ COPY . /usr/src/drugstone/
#EXPOSE 8000 #EXPOSE 8000
ENTRYPOINT ["sh", "/entrypoint.sh"] # ENTRYPOINT ["sh", "/entrypoint.sh"]
...@@ -25,4 +25,4 @@ python3 manage.py make_graphs ...@@ -25,4 +25,4 @@ python3 manage.py make_graphs
### Docker DEV environment (building is optional) ### Docker DEV environment (building is optional)
``docker-compose -f docker-compose.yml -f docker-compose.dev.yml up -d --build`` ``docker-compose -f docker-compose.yml up -d --build``
version: '3.3' version: '3.8'
services: services:
app: app:
image: drugstone_backend image: drugstone_backend
container_name: drugstone_backend container_name: drugstone_backend
command:
- "sh"
- "/usr/src/drugstone/docker-entrypoint.sh"
build: . build: .
env_file: env_file:
- 'docker-django.env' - 'docker-django.env.dev'
restart: always restart: always
volumes: volumes:
- drugstone_usr_volume:/usr/src/drugstone/data_drugstone/ - ./:/usr/src/drugstone/
ports: ports:
- 8001:8000 - 8001:8000
networks: networks:
...@@ -32,49 +35,66 @@ services: ...@@ -32,49 +35,66 @@ services:
- POSTGRES_DB=drugstone - POSTGRES_DB=drugstone
- POSTGRES_USER=drugstone - POSTGRES_USER=drugstone
- POSTGRES_PASSWORD=t6278yczAH7rPKVMxaDD - POSTGRES_PASSWORD=t6278yczAH7rPKVMxaDD
command: command:
- "postgres" - "postgres"
- "-c" - "-c"
- "max_connections=10000" - "max_connections=10000"
- "-c" - "-c"
- "shared_buffers=2GB" - "shared_buffers=2GB"
redis: redis:
image: redis image: redis
container_name: drugstone_redis container_name: drugstone_redis
hostname: drugstone_redis
restart: always restart: always
command: ["redis-server"] command: ["redis-server"]
networks: networks:
- drugstone_net - drugstone_net
ports:
- 6379:6379
celery: celery:
command:
- "sh"
- "/usr/src/drugstone/scripts/start_celery_worker.sh"
restart: always restart: always
build: . build: .
container_name: drugstone_celery container_name: drugstone_celery
hostname: drugstone_celery hostname: drugstone_celery
command: celery -A core worker -l info env_file:
- './docker-django.env.dev'
volumes: volumes:
- ./:/usr/src/app/ - ./:/usr/src/drugstone/
depends_on: depends_on:
- redis - redis
- db - db
networks: networks:
- drugstone_net - drugstone_net
celery-beat: celery-beat:
command:
- "sh"
- "/usr/src/drugstone/scripts/start_celery_beat.sh"
build: . build: .
container_name: drugstone_celery_beat container_name: drugstone_celery_beat
hostname: drugstone_celery_beat hostname: drugstone_celery_beat
command: celery -A core beat -l info env_file:
- './docker-django.env.dev'
volumes: volumes:
- ./:/usr/src/app/ - ./:/usr/src/drugstone/
depends_on: depends_on:
- redis - redis
- db - db
networks: networks:
- drugstone_net - drugstone_net
flower:
image: mher/flower
container_name: drugstone_flower
env_file:
- './docker-django.env.dev'
ports:
- 8888:8888
networks:
- drugstone_net
networks: networks:
drugstone_net: drugstone_net:
volumes: volumes:
drugstone_postgres_volume: drugstone_postgres_volume:
drugstone_usr_volume: \ No newline at end of file
\ No newline at end of file
DEBUG=1
SECRET_KEY="0&y9v0@9%@c^woz8m+h2(^$#3gd^c@d82kmmq8tu*nesc_x9i+"
SUPERUSER_NAME=admin
SUPERUSER_PASS=Mb2R7CbqAPbpaKawKg7Z
SQL_ENGINE=django.db.backends.postgresql
SQL_DATABASE=drugstone
SQL_USER=drugstone
SQL_PASSWORD=t6278yczAH7rPKVMxaDD
SQL_HOST=drugstone_postgres
SQL_PORT=5432
REDIS_HOST=redis
REDIS_PORT=6379
DJANGO_SETTINGS_MODULE=drugstone.settings
CELERY_BROKER_URL=redis://redis:6379/0
FLOWER_PORT=8888
FLOWER_BASIC_AUTH=drugstone:test
\ No newline at end of file
#!/bin/bash #!/bin/bash
python3 manage.py migrate --run-syncdb
python3 manage.py createfixtures
python3 manage.py cleanuptasks
file="docker-entrypoint.lock" file="docker-entrypoint.lock"
# exit if entrypoint.lock exists to prevent new import of data every time docker is restarted # exit if entrypoint.lock exists to prevent new import of data every time docker is restarted
if ! test -f "$file"; then if ! test -f "$file"; then
sh import-data.sh python3 manage.py createfixtures
python3 manage.py cleanuptasks
sh scripts/import-data.sh
touch $file touch $file
fi fi
......
from .wsgi import application # noqa: F401 from drugstone.wsgi import application # noqa: F401
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from drugstone.celery import app as celery_app
__all__ = ['celery_app']
\ No newline at end of file
import json
from datetime import datetime
import redis
import rq
import os
from tasks.task_hook import TaskHook
# Binary-mode client used only as the rq queue connection (rq requires
# decode_responses=False for its pickled payloads).
# NOTE(review): REDIS_PORT falls back to int 6379 but an env override arrives
# as a string — redis-py accepts both, but worth confirming.
qr_r = redis.Redis(host=os.getenv('REDIS_HOST', 'redis'),
                   port=os.getenv('REDIS_PORT', 6379),
                   db=0,
                   decode_responses=False)
# Work queue for algorithm jobs; consumed by the rq workers started in
# supervisord ("rq worker ... drugstone_tasks").
rq_tasks = rq.Queue('drugstone_tasks', connection=qr_r)
# Text-mode client (decode_responses=True) used for task state/progress keys.
r = redis.Redis(host=os.getenv('REDIS_HOST', 'redis'),
                port=os.getenv('REDIS_PORT', 6379),
                db=0,
                decode_responses=True)
def run_task(token, algorithm, parameters):
    """Execute the named algorithm for one task, publishing state to redis.

    All bookkeeping (progress, status, worker/job ids, timestamps, result,
    failure flag) is stored under keys prefixed with the task `token`.
    `parameters` is the task's JSON-encoded parameter blob.
    """

    def report_progress(progress, status):
        # Publish a progress fraction and a human-readable status line.
        r.set(f'{token}_progress', f'{progress}')
        r.set(f'{token}_status', f'{status}')

    def publish_result(results):
        # Persist the final payload, stamp the finish time and mark done.
        r.set(f'{token}_result', json.dumps(results, allow_nan=True))
        r.set(f'{token}_finished_at', str(datetime.now().timestamp()))
        r.set(f'{token}_done', '1')
        report_progress(1.0, 'Done.')

    report_progress(0.0, 'Computation started')
    # Record which rq worker/job picked this task up (env vars set by rq).
    r.set(f'{token}_worker_id', f'{os.getenv("RQ_WORKER_ID")}')
    r.set(f'{token}_job_id', f'{os.getenv("RQ_JOB_ID")}')
    r.set(f'{token}_started_at', str(datetime.now().timestamp()))
    hook = TaskHook(json.loads(parameters), './data_drugstone/Networks/', report_progress, publish_result)
    try:
        # Algorithm modules are imported lazily so a broken/heavy module only
        # costs the jobs that actually use it.
        if algorithm == 'dummy':
            raise RuntimeError('Dummy algorithm for testing purposes.')
        elif algorithm == 'multisteiner':
            from tasks.multi_steiner import multi_steiner
            multi_steiner(hook)
        elif algorithm == 'keypathwayminer':
            from tasks.keypathwayminer_task import kpm_task
            kpm_task(hook)
        elif algorithm == 'trustrank':
            from tasks.trust_rank import trust_rank
            trust_rank(hook)
        elif algorithm == 'closeness':
            from tasks.closeness_centrality import closeness_centrality
            closeness_centrality(hook)
        elif algorithm == 'degree':
            from tasks.degree_centrality import degree_centrality
            degree_centrality(hook)
        elif algorithm == 'proximity':
            from tasks.network_proximity import network_proximity
            network_proximity(hook)
        elif algorithm == 'betweenness':
            from tasks.betweenness_centrality import betweenness_centrality
            betweenness_centrality(hook)
        elif algorithm in ['quick', 'super']:
            from tasks.quick_task import quick_task
            quick_task(hook)
    except Exception as e:
        # Surface the failure to pollers via the status/failed keys.
        r.set(f'{token}_status', f'{e}')
        r.set(f'{token}_failed', '1')
def refresh_from_redis(task):
    """Copy the task's live state out of redis onto the `task` object.

    Returns early (leaving the object untouched beyond worker_id) when no
    worker has claimed the task yet.
    """
    task.worker_id = r.get(f'{task.token}_worker_id')
    if not task.worker_id:
        return
    task.job_id = r.get(f'{task.token}_job_id')
    task.progress = float(r.get(f'{task.token}_progress'))
    task.done = bool(r.get(f'{task.token}_done'))
    task.failed = bool(r.get(f'{task.token}_failed'))
    status = r.get(f'{task.token}_status')
    # Truncate long statuses to 255 chars — presumably the DB column limit;
    # TODO confirm against the Task model.
    task.status = status if not status or len(status) < 255 else status[:255]
    started_at = r.get(f'{task.token}_started_at')
    if started_at:
        task.started_at = datetime.fromtimestamp(float(started_at))
    finished_at = r.get(f'{task.token}_finished_at')
    if finished_at:
        task.finished_at = datetime.fromtimestamp(float(finished_at))
    task.result = r.get(f'{task.token}_result')
def start_task(task):
    """Enqueue the task on the rq work queue and record the rq job id.

    Jobs are killed after a 30 minute timeout.
    """
    queued_job = rq_tasks.enqueue(run_task, task.token, task.algorithm, task.parameters, job_timeout=30 * 60)
    task.job_id = queued_job.id
def task_stats(task):
    """Return queue length and this task's 1-based queue position.

    If the job is no longer queued, the position is queue length + 1.
    """
    position = 1
    for queued in rq_tasks.jobs:
        if queued.id == task.job_id:
            break
        position += 1
    return {
        'queueLength': rq_tasks.count,
        'queuePosition': position,
    }
def task_result(task):
    """Return the decoded result payload, or None while the task is running.

    Non-finite JSON constants (NaN, Infinity) are mapped to None so the
    payload stays JSON-serializable downstream.
    """
    if not task.done:
        return None
    return json.loads(task.result, parse_constant=lambda _: None)
def task_parameters(task):
    """Decode and return the task's JSON-encoded parameter blob."""
    return json.loads(task.parameters)
...@@ -3,8 +3,8 @@ import os ...@@ -3,8 +3,8 @@ import os
from celery import Celery from celery import Celery
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "core.settings") os.environ.setdefault("DJANGO_SETTINGS_MODULE", "drugstone.settings")
app = Celery("core") app = Celery("drugstone")
app.config_from_object("django.conf:settings", namespace="CELERY") app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks() app.autodiscover_tasks()
\ No newline at end of file
from celery import shared_task
from celery.utils.log import get_task_logger

from drugstone.util.nedrex import fetch_nedrex_data, integrate_nedrex_data

logger = get_task_logger(__name__)


# Fix: the @shared_task decorator was missing here, so celery never registered
# the task and the beat schedule entry "drugstone.tasks.task_update_db_from_nedrex"
# could not resolve it. (The committed file's diff shows the decorator present.)
@shared_task
def task_update_db_from_nedrex():
    """Periodic celery task: refresh the local database from NeDRex.

    Fetches the upstream NeDRex data and integrates it into the DB; progress
    is reported via the celery task logger. Scheduled from CELERY_BEAT_SCHEDULE.
    """
    logger.info("Updating DB from NeDRex.")
    logger.info("Fetching data...")
    fetch_nedrex_data()
    logger.info("Integrating data...")
    integrate_nedrex_data()
    logger.info("Done.")
from .settings import *
\ No newline at end of file
from celery.schedules import crontab

# Beat schedule: periodically refresh the DB from NeDRex.
# Fix: the original wrote crontab(minute="*/1000"), but the minute field only
# spans 0-59; celery reduces "*/1000" to {0}, i.e. it effectively ran hourly
# at minute 0. minute=0 states that schedule explicitly.
# NOTE(review): if a rarer cadence (e.g. daily/weekly) was actually intended,
# add hour=/day_of_week= accordingly.
CELERY_BEAT_SCHEDULE = {
    "update_db": {
        "task": "drugstone.tasks.task_update_db_from_nedrex",
        "schedule": crontab(minute=0),
    },
}
\ No newline at end of file
...@@ -11,9 +11,7 @@ https://docs.djangoproject.com/en/3.0/ref/settings/ ...@@ -11,9 +11,7 @@ https://docs.djangoproject.com/en/3.0/ref/settings/
""" """
import os import os
from .celery_schedule import *
# celery beat
from celery.schedules import crontab
# Build paths inside the project like this: os.path.join(BASE_DIR, ...) # Build paths inside the project like this: os.path.join(BASE_DIR, ...)
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
...@@ -23,20 +21,15 @@ BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) ...@@ -23,20 +21,15 @@ BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# See https://docs.djangoproject.com/en/3.0/howto/deployment/checklist/ # See https://docs.djangoproject.com/en/3.0/howto/deployment/checklist/
# SECURITY WARNING: keep the secret key used in production secret! # SECURITY WARNING: keep the secret key used in production secret!
SECRET_KEY = os.environ.get('SECRET_KEY', '0&y9v0@9%@c^woz8m+h2(^$#3gd^c@d82kmmq8tu*nesc_x9i+') SECRET_KEY = os.environ.get('SECRET_KEY')
# SECURITY WARNING: don't run with debug turned on in production! # SECURITY WARNING: don't run with debug turned on in production!
DEBUG = os.environ.get('DEBUG', True) DEBUG = os.environ.get('DEBUG', False)
ALLOWED_HOSTS = [ ALLOWED_HOSTS = [
'www.exbio.wzw.tum.de',
'exbio.wzw.tum.de',
'10.162.163.32', # oskar
'localhost', 'localhost',
'127.0.0.1', '127.0.0.1',
'10.162.163.20', # alfred 'drugstone-backend.zbh.uni-hamburg.de',
'ml-s-zbhdock1.ad.uni-hamburg.de',
'cosy-test.zbh.uni-hamburg.de',
'drugst.one' 'drugst.one'
] ]
...@@ -94,11 +87,11 @@ WSGI_APPLICATION = 'drugstone.wsgi.application' ...@@ -94,11 +87,11 @@ WSGI_APPLICATION = 'drugstone.wsgi.application'
DATABASES = { DATABASES = {
'default': { 'default': {
'ENGINE': os.environ.get('SQL_ENGINE', 'django.db.backends.postgresql'), 'ENGINE': os.environ.get('SQL_ENGINE', 'django.db.backends.postgresql'),
'NAME': os.environ.get('SQL_DATABASE', 'drugstone'), # os.path.join(BASE_DIR, 'db.sqlite3') 'NAME': os.environ.get('SQL_DATABASE'),
'USER': os.environ.get('SQL_USER', 'drugstone'), 'USER': os.environ.get('SQL_USER'),
'PASSWORD': os.environ.get('SQL_PASSWORD', 't6278yczAH7rPKVMxaDD'), 'PASSWORD': os.environ.get('SQL_PASSWORD'),
'HOST': os.environ.get('SQL_HOST', 'drugstone_postgres'), 'HOST': os.environ.get('SQL_HOST'),
'PORT': os.environ.get('SQL_PORT', '5432'), 'PORT': os.environ.get('SQL_PORT'),
} }
} }
...@@ -165,20 +158,14 @@ SESSION_ENGINE = 'django.contrib.sessions.backends.cache' ...@@ -165,20 +158,14 @@ SESSION_ENGINE = 'django.contrib.sessions.backends.cache'
CACHES = { CACHES = {
'default': { 'default': {
'BACKEND': 'django_redis.cache.RedisCache', 'BACKEND': 'django_redis.cache.RedisCache',
'LOCATION': f'redis://{os.environ.get("REDIS_HOST", "drugstone_redis")}: \ 'LOCATION': f'redis://{os.environ.get("REDIS_HOST")}: \
{os.environ.get("REDIS_PORT", "6379")}/1', {os.environ.get("REDIS_PORT")}/1',
'OPTIONS': { 'OPTIONS': {
'CLIENT_CLASS': 'django_redis.client.DefaultClient', 'CLIENT_CLASS': 'django_redis.client.DefaultClient',
} }
} }
} }
CELERY_BROKER_URL = "redis://redis:6379" CELERY_BROKER_URL = os.environ.get("CELERY_BROKER_URL")
CELERY_RESULT_BACKEND = "redis://redis:6379" # timezones: https://en.wikipedia.org/wiki/List_of_tz_database_time_zones
CELERY_TIMEZONE = 'Europe/Berlin'
CELERY_BEAT_SCHEDULE = {
"update_db": {
"task": "control.celery_tasks.task_update_db_from_nedrex",
"schedule": crontab(minute="*/3"),
}
}
drugstone/tasks.py 100755 → 100644
import json from celery import shared_task
from datetime import datetime from celery.utils.log import get_task_logger
from drugstone.util.nedrex import fetch_nedrex_data, integrate_nedrex_data
import redis logger = get_task_logger(__name__)
import rq
import os
from tasks.task_hook import TaskHook @shared_task
def task_update_db_from_nedrex():
logger.info("Updating DB from NeDRex.")
print('here')
logger.info("Fetching data...")
fetch_nedrex_data()
qr_r = redis.Redis(host=os.getenv('REDIS_HOST', 'drugstone_redis'), logger.info("Integrating data...")
port=os.getenv('REDIS_PORT', 6379), integrate_nedrex_data()
db=0, logger.info("Done.")
decode_responses=False)
rq_tasks = rq.Queue('drugstone_tasks', connection=qr_r)
r = redis.Redis(host=os.getenv('REDIS_HOST', 'drugstone_redis'),
port=os.getenv('REDIS_PORT', 6379),
db=0,
decode_responses=True)
def run_task(token, algorithm, parameters):
def set_progress(progress, status):
r.set(f'{token}_progress', f'{progress}')
r.set(f'{token}_status', f'{status}')
def set_result(results):
r.set(f'{token}_result', json.dumps(results, allow_nan=True))
r.set(f'{token}_finished_at', str(datetime.now().timestamp()))
r.set(f'{token}_done', '1')
set_progress(1.0, 'Done.')
set_progress(0.0, 'Computation started')
worker_id = os.getenv('RQ_WORKER_ID')
r.set(f'{token}_worker_id', f'{worker_id}')
job_id = os.getenv('RQ_JOB_ID')
r.set(f'{token}_job_id', f'{job_id}')
r.set(f'{token}_started_at', str(datetime.now().timestamp()))
task_hook = TaskHook(json.loads(parameters), './data_drugstone/Networks/', set_progress, set_result)
try:
if algorithm == 'dummy':
raise RuntimeError('Dummy algorithm for testing purposes.')
elif algorithm == 'multisteiner':
from tasks.multi_steiner import multi_steiner
multi_steiner(task_hook)
elif algorithm == 'keypathwayminer':
from tasks.keypathwayminer_task import kpm_task
kpm_task(task_hook)
elif algorithm == 'trustrank':
from tasks.trust_rank import trust_rank
trust_rank(task_hook)
elif algorithm == 'closeness':
from tasks.closeness_centrality import closeness_centrality
closeness_centrality(task_hook)
elif algorithm == 'degree':
from tasks.degree_centrality import degree_centrality
degree_centrality(task_hook)
elif algorithm == 'proximity':
from tasks.network_proximity import network_proximity
network_proximity(task_hook)
elif algorithm == 'betweenness':
from tasks.betweenness_centrality import betweenness_centrality
betweenness_centrality(task_hook)
elif algorithm in ['quick', 'super']:
from tasks.quick_task import quick_task
quick_task(task_hook)
except Exception as e:
r.set(f'{token}_status', f'{e}')
r.set(f'{token}_failed', '1')
def refresh_from_redis(task):
task.worker_id = r.get(f'{task.token}_worker_id')
if not task.worker_id:
return
task.job_id = r.get(f'{task.token}_job_id')
task.progress = float(r.get(f'{task.token}_progress'))
task.done = True if r.get(f'{task.token}_done') else False
task.failed = True if r.get(f'{task.token}_failed') else False
status = r.get(f'{task.token}_status')
if not status or len(status) < 255:
task.status = status
else:
task.status = status[:255]
started_at = r.get(f'{task.token}_started_at')
if started_at:
task.started_at = datetime.fromtimestamp(float(started_at))
finished_at = r.get(f'{task.token}_finished_at')
if finished_at:
task.finished_at = datetime.fromtimestamp(float(finished_at))
task.result = r.get(f'{task.token}_result')
def start_task(task):
job = rq_tasks.enqueue(run_task, task.token, task.algorithm, task.parameters, job_timeout=30*60)
task.job_id = job.id
def task_stats(task):
pos = 1
for j in rq_tasks.jobs:
if j.id == task.job_id:
break
pos += 1
return {
'queueLength': rq_tasks.count,
'queuePosition': pos,
}
def task_result(task):
if not task.done:
return None
return json.loads(task.result, parse_constant=lambda c: None)
def task_parameters(task):
return json.loads(task.parameters)
...@@ -24,7 +24,7 @@ from drugstone.models import Protein, Task, ProteinDrugInteraction, \ ...@@ -24,7 +24,7 @@ from drugstone.models import Protein, Task, ProteinDrugInteraction, \
from drugstone.serializers import ProteinSerializer, TaskSerializer, \ from drugstone.serializers import ProteinSerializer, TaskSerializer, \
ProteinDrugInteractionSerializer, DrugSerializer, TaskStatusSerializer, TissueSerializer, NetworkSerializer, \ ProteinDrugInteractionSerializer, DrugSerializer, TaskStatusSerializer, TissueSerializer, NetworkSerializer, \
ProteinDisorderAssociationSerializer, DisorderSerializer, DrugDisorderIndicationSerializer ProteinDisorderAssociationSerializer, DisorderSerializer, DrugDisorderIndicationSerializer
from drugstone.tasks import start_task, refresh_from_redis, task_stats, task_result, task_parameters from drugstone.backend_tasks import start_task, refresh_from_redis, task_stats, task_result, task_parameters
# we might want to replace this class with some ProteinProteinInteraction view of user input proteins # we might want to replace this class with some ProteinProteinInteraction view of user input proteins
......
#!/bin/bash #!/bin/bash
python3 manage.py migrate --run-syncdb
python3 manage.py populate_db --delete_model PPI,PDI,Drug,Protein,Tissue,Disorder,PDiAssociations python3 manage.py populate_db --delete_model PPI,PDI,Drug,Protein,Tissue,Disorder,PDiAssociations
python3 manage.py populate_db --data_dir . -p protein-file.txt python3 manage.py populate_db --data_dir . -p protein-file.txt
......
celery -A drugstone beat -l INFO
\ No newline at end of file
celery -A drugstone worker -l INFO
\ No newline at end of file
[supervisord] [supervisord]
nodaemon=true nodaemon=true
user=root # user=root
[program:drugstone_app] [program:drugstone_django]
command=gunicorn --bind 0.0.0.0:8000 --timeout 1200 --workers 8 --log-level debug drugstone:application command=gunicorn --bind 0.0.0.0:8000 --timeout 1200 --workers 8 --log-level debug drugstone:application
directory=/usr/src/drugstone/ directory=/usr/src/drugstone/
user=nobody user=nobody
...@@ -13,7 +13,7 @@ stdout_logfile_maxbytes=0 ...@@ -13,7 +13,7 @@ stdout_logfile_maxbytes=0
[program:drugstone_worker] [program:drugstone_worker]
process_name=drugstone_worker_%(process_num)02d process_name=drugstone_worker_%(process_num)02d
command=rq worker --url redis://redis:6379/0 drugsotne_tasks command=rq worker --url redis://redis:6379/0 drugstone_tasks
directory=/usr/src/drugstone/ directory=/usr/src/drugstone/
numprocs=20 numprocs=20
autostart=true autostart=true
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment