Source code for drf_network_pipeline.pipeline.tasks

from __future__ import absolute_import, unicode_literals
import json
import pandas as pd
from django.conf import settings
from django.db.models import Q
from celery import shared_task
from spylunking.log.setup_logging import build_colorized_logger
from antinex_utils.utils import ev
from antinex_utils.utils import ppj
from antinex_utils.consts import VALID
from drf_network_pipeline.pipeline.models import MLPrepare
from drf_network_pipeline.pipeline.models import MLJob
from drf_network_pipeline.pipeline.models import MLJobResult
from antinex_utils.consts import SUCCESS
from antinex_utils.consts import ERR
from antinex_utils.prepare_dataset_tools import build_csv
from antinex_utils.prepare_dataset_tools import find_all_pipeline_csvs
from antinex_utils.build_training_request import build_training_request
from antinex_utils.make_predictions import make_predictions
from drf_network_pipeline.users.db_lookup_user import \
    db_lookup_user
from drf_network_pipeline.pipeline.create_ml_prepare_record import \
    create_ml_prepare_record
from drf_network_pipeline.pipeline.process_worker_results import \
    process_worker_results
from drf_network_pipeline.job_utils.build_task_response import \
    build_task_response
from kombu import Connection
from kombu import Producer
from kombu import Exchange
from kombu import Queue


log = build_colorized_logger(
    name='ml.tasks')


# allow tasks to be sent straight to the worker
@shared_task(
    name=("drf_network_pipeline.pipeline.tasks."
          "task_ml_prepare"),
    queue=("drf_network_pipeline.pipeline.tasks."
           "task_ml_prepare"),
    bind=True)
def task_ml_prepare(
        self=None,
        req_node=None):
    """task_ml_prepare

    :param self: parent task object for bind=True
    :param req_node: job utils dictionary for passing a dictionary
    """

    log.info(("task - {} - start "
              "req_node={}")
             .format(
                req_node["task_name"],
                ppj(req_node)))

    ml_prepare_data = req_node["data"].get("ml_prepare_data", None)

    user_obj = None
    ml_prepare_obj = None

    if req_node["use_cache"]:
        ml_prepare_obj = MLPrepare.objects.select_related().filter(
            Q(id=int(ml_prepare_data["id"]))).cache().first()
    else:
        ml_prepare_obj = MLPrepare.objects.select_related().filter(
            Q(id=int(ml_prepare_data["id"]))).first()
    # end of finding the MLPrepare record

    create_new_record = False

    # create the response node from request
    res = build_task_response(
        use_cache=req_node["use_cache"],
        celery_enabled=req_node["celery_enabled"],
        cache_key=req_node["cache_key"])

    # response payload - initialized here because the error
    # path below assigns into it before it is otherwise set
    data = {}

    try:

        if create_new_record:
            create_res = create_ml_prepare_record(
                req_node=req_node)
            user_obj = create_res.get(
                "user_obj",
                None)
            ml_prepare_obj = create_res.get(
                "ml_prepare_obj",
                None)
            if not user_obj:
                res["error"] = ("{} - Failed to find User").format(
                    req_node["task_name"])
                res["status"] = ERR
                res["error"] = create_res.get("err", "error not set")
                res["data"] = None
                log.error(res["error"])
                return res
            if not ml_prepare_obj:
                res["error"] = ("{} - Failed to create MLPrepare").format(
                    req_node["task_name"])
                res["status"] = ERR
                res["error"] = create_res.get("err", "error not set")
                res["data"] = None
                log.error(res["error"])
                return res
        # end of create_new_record

        last_step = ("starting user={} prepare={} "
                     "pipeline={} clean={} full={} "
                     "post={} label={} tracking={}").format(
                        ml_prepare_obj.user_id,
                        ml_prepare_obj.id,
                        ml_prepare_obj.pipeline_files,
                        ml_prepare_obj.clean_file,
                        ml_prepare_obj.full_file,
                        ml_prepare_obj.post_proc,
                        ml_prepare_obj.label_rules,
                        ml_prepare_obj.tracking_id)
        log.info(last_step)

        log_id = "job={}".format(
            ml_prepare_obj.id)

        log.info(("prepare={} csvs={}")
                 .format(
                    ml_prepare_obj.id,
                    ml_prepare_obj.ds_glob_path))

        ml_prepare_obj.pipeline_files = find_all_pipeline_csvs(
            use_log_id=log_id,
            csv_glob_path=ml_prepare_obj.ds_glob_path)

        log.info(("preparing={} clean={} full={} "
                  "meta_suffix={} files={}")
                 .format(
                    ml_prepare_obj.id,
                    ml_prepare_obj.clean_file,
                    ml_prepare_obj.full_file,
                    ml_prepare_obj.meta_suffix,
                    ml_prepare_obj.pipeline_files))

        save_node = build_csv(
            use_log_id=log_id,
            pipeline_files=ml_prepare_obj.pipeline_files,
            fulldata_file=ml_prepare_obj.full_file,
            clean_file=ml_prepare_obj.clean_file,
            post_proc_rules=ml_prepare_obj.post_proc,
            label_rules=ml_prepare_obj.label_rules,
            meta_suffix=ml_prepare_obj.meta_suffix)

        if save_node["status"] == VALID:
            log.info("successfully processed datasets:")

            ml_prepare_obj.post_proc = save_node["post_proc_rules"]
            ml_prepare_obj.post_proc["features_to_process"] = \
                save_node["features_to_process"]
            ml_prepare_obj.post_proc["ignore_features"] = \
                save_node["ignore_features"]
            ml_prepare_obj.post_proc["feature_to_predict"] = \
                save_node["feature_to_predict"]
            ml_prepare_obj.label_rules = save_node["label_rules"]
            ml_prepare_obj.pipeline_files = save_node["pipeline_files"]
            ml_prepare_obj.full_file = save_node["fulldata_file"]
            ml_prepare_obj.clean_file = save_node["clean_file"]
            ml_prepare_obj.status = "finished"
            ml_prepare_obj.control_state = "finished"
            ml_prepare_obj.save()
            log.info(("saved prepare={}")
                     .format(
                        ml_prepare_obj.id))

            if ev("SHOW_SUMMARY", "0") == "1":
                log.info(("Full csv: {}")
                         .format(
                            save_node["fulldata_file"]))
                log.info(("Full meta: {}")
                         .format(
                            save_node["fulldata_metadata_file"]))
                log.info(("Clean csv: {}")
                         .format(
                            save_node["clean_file"]))
                log.info(("Clean meta: {}")
                         .format(
                            save_node["clean_metadata_file"]))
                log.info("------------------------------------------")
                log.info(("Predicting Feature: {}")
                         .format(
                            save_node["feature_to_predict"]))
                log.info(("Features to Process: {}")
                         .format(
                            ppj(save_node["features_to_process"])))
                log.info(("Ignored Features: {}")
                         .format(ppj(
                            save_node["ignore_features"])))
                log.info("------------------------------------------")
            # end of show summary

            log.info("Full: {}".format(
                save_node["fulldata_file"]))
            log.info("Cleaned (no-NaNs in columns): {}".format(
                save_node["clean_file"]))

            data = ml_prepare_obj.get_public()
            res["status"] = SUCCESS
            res["err"] = ""
            res["data"] = data
        else:
            last_step = ("failed to prepare csv status={} "
                         "errors: {}").format(
                            save_node["status"],
                            save_node["err"])
            log.error(last_step)
            ml_prepare_obj.status = "error"
            ml_prepare_obj.control_state = "error"
            ml_prepare_obj.save()
            data["prepare"] = ml_prepare_obj.get_public()
            data["ready"] = {}
            res["status"] = ERR
            res["error"] = last_step
            res["data"] = data
            return res
        # end of checking it started
    except Exception as e:
        res["status"] = ERR
        res["err"] = ("Failed task={} with "
                      "ex={}").format(
                        req_node["task_name"],
                        e)
        res["data"] = None
        log.error(res["err"])
    # end of try/ex

    log.info(("task - {} - done")
             .format(
                req_node["task_name"]))

    return res
# end of task_ml_prepare
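A minimal sketch of calling this task directly (all values below are hypothetical; in practice the job utils layer builds ``req_node``, and only the keys read above - ``task_name``, ``use_cache``, ``celery_enabled``, ``cache_key``, and ``data.ml_prepare_data.id`` - are confirmed by this code):

    req_node = {
        "task_name": "task_ml_prepare",
        "use_cache": False,
        "celery_enabled": True,
        "cache_key": None,
        "data": {
            "ml_prepare_data": {
                "id": 1  # hypothetical MLPrepare record id
            }
        }
    }
    # with Celery running; call task_ml_prepare(req_node=req_node) without it
    task_ml_prepare.delay(req_node=req_node)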
# allow tasks to be sent straight to the worker
@shared_task(
    name=("drf_network_pipeline.pipeline.tasks."
          "task_publish_to_core"),
    queue=("drf_network_pipeline.pipeline.tasks."
           "task_publish_to_core"),
    bind=True,
    ignore_result=True)
def task_publish_to_core(
        self=None,
        publish_node=None):
    """task_publish_to_core

    :param self: parent task object for bind=True
    :param publish_node: dictionary to send to the AntiNex Core Worker
    """

    if settings.ANTINEX_WORKER_ENABLED:

        conn = None
        dataset = publish_node["body"].get("dataset", None)
        predict_rows = publish_node["body"].get("predict_rows", None)

        if not dataset and not predict_rows:
            log.info(("skipping antinex core publish body={} - "
                      "is missing dataset and predict_rows")
                     .format(
                        publish_node))
            return None
        # end of checking for supported requests to the core

        log.info(("task_publish_to_core - start req={}").format(
            str(publish_node)[0:32]))

        if not predict_rows:
            log.info(("building predict_rows from dataset={}")
                     .format(
                        dataset))
            predict_rows = []
            predict_rows_df = pd.read_csv(dataset)
            for idx, org_row in predict_rows_df.iterrows():
                new_row = json.loads(org_row.to_json())
                new_row["idx"] = len(predict_rows) + 1
                predict_rows.append(new_row)
            # end of building predict rows

            publish_node["body"]["apply_scaler"] = True
            publish_node["body"]["predict_rows"] = pd.DataFrame(
                predict_rows).to_json()
        # end of validating

        publish_node["body"]["ml_type"] = \
            publish_node["body"]["manifest"]["ml_type"]

        log.debug(("NEXCORE - ssl={} exchange={} routing_key={}")
                  .format(
                    settings.ANTINEX_SSL_OPTIONS,
                    settings.ANTINEX_EXCHANGE_NAME,
                    settings.ANTINEX_ROUTING_KEY))

        try:
            if settings.ANTINEX_WORKER_SSL_ENABLED:
                log.debug("connecting with ssl")
                conn = Connection(
                    settings.ANTINEX_AUTH_URL,
                    login_method="EXTERNAL",
                    ssl=settings.ANTINEX_SSL_OPTIONS)
            else:
                log.debug("connecting without ssl")
                conn = Connection(
                    settings.ANTINEX_AUTH_URL)
            # end of connecting

            conn.connect()

            log.debug("getting channel")
            channel = conn.channel()

            core_exchange = Exchange(
                settings.ANTINEX_EXCHANGE_NAME,
                type=settings.ANTINEX_EXCHANGE_TYPE,
                durable=True)

            log.debug("creating producer")
            producer = Producer(
                channel=channel,
                auto_declare=True,
                serializer="json")

            try:
                log.debug("declaring exchange")
                producer.declare()
            except Exception as k:
                log.error(("declare exchange failed with ex={}")
                          .format(
                            k))
            # end of try to declare exchange which can fail if it exists

            core_queue = Queue(
                settings.ANTINEX_QUEUE_NAME,
                core_exchange,
                routing_key=settings.ANTINEX_ROUTING_KEY,
                durable=True)

            try:
                log.debug("declaring queue")
                core_queue.maybe_bind(conn)
                core_queue.declare()
            except Exception as k:
                log.error(("declare queue={} routing_key={} "
                           "failed with ex={}")
                          .format(
                            settings.ANTINEX_QUEUE_NAME,
                            settings.ANTINEX_ROUTING_KEY,
                            k))
            # end of try to declare queue which can fail if it exists

            log.info(("publishing exchange={} routing_key={} persist={}")
                     .format(
                        core_exchange.name,
                        settings.ANTINEX_ROUTING_KEY,
                        settings.ANTINEX_DELIVERY_MODE))

            producer.publish(
                body=publish_node["body"],
                exchange=core_exchange.name,
                routing_key=settings.ANTINEX_ROUTING_KEY,
                auto_declare=True,
                serializer="json",
                delivery_mode=settings.ANTINEX_DELIVERY_MODE)
        except Exception as e:
            log.info(("Failed to publish to core req={} with ex={}")
                     .format(
                        publish_node,
                        e))
        # try/ex

        if conn:
            conn.release()

        log.info(("task_publish_to_core - done"))
    else:
        log.debug("core - disabled")
    # publish to the core if enabled

    return None
# end of task_publish_to_core
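For reference, ``task_ml_job`` below publishes with ``publish_node={"body": prediction_req}``. A hand-built request is sketched here; the label and csv path are hypothetical, but the task itself requires either ``dataset`` or ``predict_rows`` and always reads ``manifest["ml_type"]``:

    publish_node = {
        "body": {
            "label": "example-model",       # hypothetical model label
            "dataset": "/tmp/cleaned.csv",  # hypothetical csv path
            "manifest": {
                "ml_type": "classification"
            }
        }
    }
    task_publish_to_core.delay(publish_node=publish_node)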
# allow tasks to be sent straight to the worker
@shared_task(
    name=("drf_network_pipeline.pipeline.tasks."
          "task_ml_process_results"),
    queue=("drf_network_pipeline.pipeline.tasks."
           "task_ml_process_results"),
    bind=True,
    ignore_result=True)
def task_ml_process_results(
        self=None,
        res_node=None):
    """task_ml_process_results

    Core workers send results back to the REST API worker here

    :param self: parent task object for bind=True
    :param res_node: results dictionary from the core
    """
    if settings.ANTINEX_WORKER_ENABLED:
        log.info("processing worker results")
        process_worker_results(
            res_node=res_node)
    else:
        log.info("no worker to get results")
    return None
# end of task_ml_process_results
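The shape of ``res_node`` is defined by the AntiNex Core and consumed by ``process_worker_results``, so only the entry point is sketched here (the payload below is a placeholder, not the real schema):

    res_node = {"status": 0, "data": {}}  # placeholder; real payload comes from the core
    task_ml_process_results.delay(res_node=res_node)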
# allow tasks to be sent straight to the worker
@shared_task(
    name=("drf_network_pipeline.pipeline.tasks."
          "task_ml_job"),
    queue=("drf_network_pipeline.pipeline.tasks."
           "task_ml_job"),
    bind=True)
def task_ml_job(
        self=None,
        req_node=None):
    """task_ml_job

    :param self: parent task object for bind=True
    :param req_node: job utils dictionary for passing a dictionary
    """

    log.info(("task - {} - start "
              "req_node={}")
             .format(
                req_node["task_name"],
                ppj(req_node)))

    user_data = req_node["data"].get("user_data", None)
    ml_job = req_node["data"].get("ml_job_data", None)
    ml_result = req_node["data"].get("ml_result_data", None)
    model_desc = req_node["data"].get("model_desc", None)
    label_rules = req_node["data"].get("label_rules", None)
    predict_rows = req_node["data"].get("predict_rows", None)

    user_res = db_lookup_user(
        user_id=user_data["id"])
    user_obj = user_res.get(
        "user_obj",
        None)

    ml_job_id = None
    ml_result_id = None
    ml_job_obj = None
    found_predictions = []
    found_accuracy = None

    if req_node["use_cache"]:
        ml_job_obj = MLJob.objects.select_related().filter(
            Q(id=int(ml_job["id"])) & Q(user=user_obj)).cache().first()
    else:
        ml_job_obj = MLJob.objects.select_related().filter(
            Q(id=int(ml_job["id"])) & Q(user=user_obj)).first()
    # end of finding the MLJob record

    ml_result_obj = None
    if req_node["use_cache"]:
        ml_result_obj = MLJobResult.objects.select_related().filter(
            Q(id=int(ml_result["id"])) & Q(user=user_obj)).cache().first()
    else:
        ml_result_obj = MLJobResult.objects.select_related().filter(
            Q(id=int(ml_result["id"])) & Q(user=user_obj)).first()
    # end of finding the MLJobResult record

    res = build_task_response(
        use_cache=req_node["use_cache"],
        celery_enabled=req_node["celery_enabled"],
        cache_key=req_node["cache_key"])

    last_step = "not started"
    data = {}
    data["job"] = {}
    data["results"] = {}

    try:

        res["status"] = ERR
        res["error"] = ""

        predict_manifest = ml_job_obj.predict_manifest
        csv_file = predict_manifest.get("csv_file", None)
        meta_file = predict_manifest.get("meta_file", None)
        epochs = int(predict_manifest.get("epochs", "5"))
        test_size = float(predict_manifest.get("test_size", "0.2"))
        batch_size = int(predict_manifest.get("batch_size", "32"))
        verbose = int(predict_manifest.get("verbose", "1"))

        # use pre-trained models in memory by label
        use_model_name = ml_job_obj.predict_manifest.get(
            "use_model_name",
            None)
        dataset = ml_job_obj.predict_manifest.get(
            "dataset",
            None)
        predict_rows = ml_job_obj.predict_manifest.get(
            "predict_rows",
            None)
        predict_feature = ml_job_obj.predict_manifest.get(
            "predict_feature",
            None)
        features_to_process = ml_job_obj.predict_manifest.get(
            "features_to_process",
            None)
        ignore_features = ml_job_obj.predict_manifest.get(
            "ignore_features",
            None)
        publish_to_core = ml_job_obj.predict_manifest.get(
            "publish_to_core",
            None)
        apply_scaler = ml_job_obj.predict_manifest.get(
            "apply_scaler",
            True)
        sort_values = ml_job_obj.predict_manifest.get(
            "sort_values",
            None)
        max_records = int(ml_job_obj.predict_manifest.get(
            "max_records",
            "100000"))
        loss = ml_job_obj.predict_manifest.get(
            "loss",
            "binary_crossentropy")
        metrics = ml_job_obj.predict_manifest.get(
            "metrics",
            [
                "accuracy"
            ])
        optimizer = ml_job_obj.predict_manifest.get(
            "optimizer",
            "adam")
        histories = ml_job_obj.predict_manifest.get(
            "histories",
            [
                "val_loss",
                "val_acc",
                "loss",
                "acc"
            ])

        # flag for bypassing the build inside django
        # instead of antinex-utils
        needs_local_builder = True
        if ((dataset or predict_rows)
                and features_to_process):
            log.info(("using antinex builder dataset={} predict_rows={} "
                      "features_to_process={}")
                     .format(
                        dataset,
                        predict_rows,
                        features_to_process))
            needs_local_builder = False

        image_file = ml_result_obj.acc_image_file
        version = ml_job_obj.version

        ml_job_id = ml_job_obj.id
        ml_result_id = ml_result_obj.id

        last_step = ("starting user={} "
                     "job.id={} result.id={} predict={} "
                     "model_desc={} "
                     "csv={} meta={}").format(
                        ml_job_obj.user.id,
                        ml_job_id,
                        ml_result_id,
                        ml_job_obj.predict_feature,
                        model_desc,
                        csv_file,
                        meta_file)
        log.info(last_step)

        ml_job_obj.status = "analyzing"
        ml_job_obj.save()

        if needs_local_builder:

            log.info("starting local build_training_request")

            ml_req = build_training_request(
                csv_file=csv_file,
                meta_file=meta_file,
                predict_feature=ml_job_obj.predict_feature,
                test_size=test_size)

            if ml_req["status"] != VALID:
                last_step = ("Stopping for status={} "
                             "errors: {}").format(
                                ml_req["status"],
                                ml_req["err"])
                log.error(last_step)
                ml_job_obj.status = "error"
                ml_job_obj.control_state = "error"
                log.info(("saving job={}")
                         .format(
                            ml_job_id))
                ml_job_obj.save()
                data["job"] = ml_job_obj.get_public()
                error_data = {
                    "status": ml_req["status"],
                    "err": ml_req["err"]
                }
                data["results"] = error_data
                res["status"] = ERR
                res["error"] = last_step
                res["data"] = data
                return res
            else:
                predict_manifest["ignore_features"] = \
                    ml_req.get("ignore_features", [])
                predict_manifest["features_to_process"] = \
                    ml_req.get("features_to_process", [])
                if label_rules:
                    predict_manifest["label_rules"] = \
                        label_rules
                else:
                    predict_manifest["label_rules"] = \
                        ml_req["meta_data"]["label_rules"]
                predict_manifest["post_proc_rules"] = \
                    ml_req["meta_data"]["post_proc_rules"]
                predict_manifest["version"] = version

                last_step = ("job.id={} built_training_request={} "
                             "predict={} features={} ignore={} "
                             "label_rules={} post_proc={}").format(
                                ml_job_obj.id,
                                ml_req["status"],
                                predict_manifest["predict_feature"],
                                predict_manifest["features_to_process"],
                                predict_manifest["ignore_features"],
                                predict_manifest["label_rules"],
                                predict_manifest["post_proc_rules"])
                log.info(last_step)

                if ml_job_obj.ml_type == "regression":
                    log.info(("using Keras - regression - "
                              "sequential model ml_type={}")
                             .format(
                                ml_job_obj.ml_type))
                    loss = "mse"
                    metrics = [
                        "mse",
                        "mae",
                        "mape",
                        "cosine"
                    ]
                    histories = [
                        "mean_squared_error",
                        "mean_absolute_error",
                        "mean_absolute_percentage_error",
                        "cosine_proximity"
                    ]
                else:
                    log.info(("using Keras - sequential model "
                              "ml_type={}")
                             .format(ml_job_obj.ml_type))
                # end of classification vs regression

                ml_job_obj.predict_manifest["epochs"] = epochs
                ml_job_obj.predict_manifest["batch_size"] = batch_size
                ml_job_obj.predict_manifest["verbose"] = verbose
                ml_job_obj.predict_manifest["loss"] = loss
                ml_job_obj.predict_manifest["metrics"] = metrics
                ml_job_obj.predict_manifest["optimizer"] = optimizer
                ml_job_obj.predict_manifest["histories"] = histories
                ml_job_obj.predict_manifest = predict_manifest
            # end of updating without antinex-utils
        # end of if needs_local_builder:

        ml_job_obj.status = "started"
        ml_job_obj.save()

        scores = None

        prediction_req = {
            "label": "job_{}_result_{}".format(
                ml_job_id,
                ml_result_id),
            "manifest": ml_job_obj.predict_manifest,
            "model_json": ml_result_obj.model_json,
            "model_desc": model_desc,
            "weights_json": ml_result_obj.model_weights,
        }

        if dataset:
            prediction_req["dataset"] = dataset
        if max_records:
            prediction_req["max_records"] = max_records
        if predict_rows:
            prediction_req["predict_rows"] = json.dumps(predict_rows)
        if features_to_process:
            prediction_req["features_to_process"] = features_to_process
        if ignore_features:
            prediction_req["ignore_features"] = ignore_features
        if apply_scaler:
            prediction_req["apply_scaler"] = apply_scaler
        if sort_values:
            prediction_req["sort_values"] = sort_values
        if loss:
            prediction_req["loss"] = loss
        if metrics:
            prediction_req["metrics"] = metrics
        if optimizer:
            prediction_req["optimizer"] = optimizer
        if histories:
            prediction_req["histories"] = histories
        if predict_feature:
            prediction_req["predict_feature"] = predict_feature
        if csv_file:
            prediction_req["csv_file"] = csv_file
        if meta_file:
            prediction_req["meta_file"] = meta_file

        already_predicted = False

        # if you just want to use the core without django training:
        if publish_to_core or settings.ANTINEX_WORKER_ONLY:
            log.info(("model_name={} only publish={} worker={}")
                     .format(
                        use_model_name,
                        publish_to_core,
                        settings.ANTINEX_WORKER_ONLY))
            ml_job_obj.status = "launched"
            ml_job_obj.control_state = "launched"
            ml_job_obj.save()
            ml_result_obj.status = "launched"
            ml_result_obj.control_state = "launched"
            ml_result_obj.save()
        else:
            log.info(("start make_predictions req={}").format(
                ppj(prediction_req)))
            prediction_res = make_predictions(
                req=prediction_req)

            if prediction_res["status"] != SUCCESS:
                last_step = ("Stopping for prediction_status={} "
                             "errors: {}").format(
                                prediction_res["status"],
                                prediction_res["err"])
                log.error(last_step)
                ml_job_obj.status = "error"
                ml_job_obj.control_state = "error"
                log.info(("saving job={}")
                         .format(
                            ml_job_id))
                ml_job_obj.save()
                data["job"] = ml_job_obj.get_public()
                error_data = {
                    "status": prediction_res["status"],
                    "err": prediction_res["err"]
                }
                data["results"] = error_data
                res["status"] = ERR
                res["error"] = last_step
                res["data"] = data
                return res

            already_predicted = True

            res_data = prediction_res["data"]
            model = res_data["model"]
            model_weights = res_data["weights"]
            scores = res_data["scores"]
            acc_data = res_data["acc"]
            error_data = res_data["err"]
            predictions_json = {
                "predictions": json.loads(
                    pd.Series(
                        res_data["sample_predictions"]).to_json(
                            orient="records"))
            }
            found_predictions = res_data["sample_predictions"]
            found_accuracy = acc_data.get(
                "accuracy",
                None)

            last_step = ("job={} accuracy={}").format(
                ml_job_id,
                scores[1] * 100)
            log.info(last_step)

            ml_job_obj.status = "finished"
            ml_job_obj.control_state = "finished"
            ml_job_obj.save()
            log.info(("saved job={}")
                     .format(
                        ml_job_id))

            data["job"] = ml_job_obj.get_public()
            acc_data = {
                "accuracy": scores[1] * 100
            }
            error_data = None

            log.info(("converting job={} model to json")
                     .format(
                        ml_job_id))
            model_json = json.loads(model.to_json())

            log.info(("saving job={} weights_file={}")
                     .format(
                        ml_job_id,
                        ml_result_obj.model_weights_file))

            log.info(("building job={} results")
                     .format(
                        ml_job_id))

            ml_result_obj.status = "finished"
            ml_result_obj.acc_data = acc_data
            ml_result_obj.error_data = error_data
            ml_result_obj.model_json = model_json
            ml_result_obj.model_weights = model_weights
            ml_result_obj.acc_image_file = image_file
            ml_result_obj.predictions_json = predictions_json
            ml_result_obj.version = version
        # end of handing off to core worker without a database connection

        log.info(("saving job={} results")
                 .format(
                    ml_job_id))

        # OpenShift 9.6 Postgres container killed the worker here.
        # Interested to see if this is a jsonb/jsonfield problem
        # 2018-05-20
        try:
            ml_result_obj.save()
        except Exception as e:
            res["error"] = (
                "Failed saving model job.id={} with ex={}").format(
                    ml_job_id,
                    e)
            res["status"] = ERR
            res["data"] = data
            log.error(
                res["error"])
            return res
        # end try/ex

        log.info(("done saving job={} results")
                 .format(
                    ml_job_id))

        data["job"] = ml_job_obj.get_public()
        data["results"] = ml_result_obj.get_public()
        res["status"] = SUCCESS
        res["error"] = ""
        res["data"] = data

        if settings.ANTINEX_WORKER_ENABLED and not already_predicted:
            if use_model_name:
                prediction_req["label"] = use_model_name
            log.info(("publishing to core use_model_name={} "
                      "worker={} already_predicted={}")
                     .format(
                        use_model_name,
                        settings.ANTINEX_WORKER_ENABLED,
                        already_predicted))
            publish_req = {
                "body": prediction_req
            }
            if settings.CELERY_ENABLED:
                task_publish_to_core.delay(
                    publish_node=publish_req)
            else:
                task_publish_to_core(
                    publish_node=publish_req)
        else:
            log.info(("skip - worker={} already_predicted={}")
                     .format(
                        settings.ANTINEX_WORKER_ENABLED,
                        already_predicted))
        # send to core
    except Exception as e:
        res["status"] = ERR
        res["err"] = ("Failed task={} with "
                      "ex={}").format(
                        req_node["task_name"],
                        e)
        if ml_job_obj:
            data["job"] = ml_job_obj.get_public()
        else:
            data["job"] = None
        if ml_result_obj:
            data["results"] = ml_result_obj.get_public()
        else:
            data["results"] = None
        log.error(res["err"])
    # end of try/ex

    log.info(("task - {} - done - "
              "ml_job.id={} ml_result.id={} "
              "accuracy={} predictions={}")
             .format(
                req_node["task_name"],
                ml_job_id,
                ml_result_id,
                found_accuracy,
                len(found_predictions)))

    return res
# end of task_ml_job
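As with ``task_ml_prepare``, a direct call is sketched below with hypothetical ids; the keys match what the task reads from ``req_node["data"]`` above (``user_data``, ``ml_job_data``, ``ml_result_data``, ``model_desc``, ``label_rules``, ``predict_rows``):

    req_node = {
        "task_name": "task_ml_job",
        "use_cache": False,
        "celery_enabled": True,
        "cache_key": None,
        "data": {
            "user_data": {"id": 1},       # hypothetical user id
            "ml_job_data": {"id": 1},     # hypothetical MLJob id
            "ml_result_data": {"id": 1},  # hypothetical MLJobResult id
            "model_desc": None,
            "label_rules": None,
            "predict_rows": None
        }
    }
    task_ml_job.delay(req_node=req_node)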