Source code for drf_network_pipeline.job_utils.run_task

from django.conf import settings
from antinex_utils.utils import ppj
from drf_network_pipeline.pipeline.consts import SUCCESS
from drf_network_pipeline.pipeline.consts import ERR
from drf_network_pipeline.pipeline.consts import NOTRUN
from drf_network_pipeline.pipeline.consts import NOTDONE
from spylunking.log.setup_logging import build_colorized_logger
from drf_network_pipeline.job_utils.build_task_request import \
    build_task_request
from drf_network_pipeline.job_utils.build_task_response import \
    build_task_response
from drf_network_pipeline.job_utils.handle_task_method import \
    handle_task_method


name = 'run-task'
log = build_colorized_logger(
    name=name)


[docs]def run_task( task_method=None, task_name="please-set-name", req_data=None, get_result=False, delay_timeout=settings.CELERY_GET_RESULT_TIMEOUT, use_cache=settings.CACHEOPS_ENABLED, cache_record=False, cache_key=None): """run_task Handles Celery sync/async task processing :param task_method: requested method :param task_name: name of the task for logging :param req_data: requested data :param get_result: get the result from task :param delay_timeout: seconds to wait for the task to finish :param use_cache: use the cached record if available :param cache_record: cache the result in redis after done :param cache_key: cache the result in this redis key """ req_node = build_task_request( task_name=task_name, use_cache=use_cache, cache_record=cache_record, cache_key=cache_key, data=req_data) res_node = build_task_response( status=NOTRUN, data=None, err="not-run") try: res_node = handle_task_method( req_node=req_node, get_result=get_result, delay_timeout=delay_timeout, task_method=task_method) if "celery_enabled" not in res_node: log.error(("Invalid return node from task={} " "task_method={} with req_node={} " "returned data={}") .format( task_name, task_method, ppj(req_node), ppj(res_node))) if res_node["status"] == SUCCESS: log.info(("celery={} - running task with data={}") .format( res_node["celery_enabled"], str(res_node["data"])[0:32])) elif not get_result and res_node["status"] == NOTDONE: log.info(("celery={} - running task with data={}") .format( res_node["celery_enabled"], str(res_node["data"])[0:32])) else: res_node["data"] = None res_node["status"] = res_node["status"] res_node["err"] = ("task={} method={} " "status={} err={}").format( task_name, task_method, res_node["status"], res_node["err"]) log.error(("Failed {}") .format( res_node["err"])) # end of handling success/failure except Exception as e: res_node = build_task_response( status=ERR, data=None, err=("Failed to run {} celery={} " "with ex={}").format( task_name, res_node.get( "celery_enabled", None), e)) log.error(res_node["err"]) # try/ex handling Celery task return res_node
# end of run_task