"""Serializers for ``drf_network_pipeline.sz.ml`` (prepare jobs, ML jobs, job results)."""

from django.contrib.auth import get_user_model
from django.contrib.auth.models import User
from django.conf import settings
from django.db.models import Q
from rest_framework import serializers
from rest_framework import status as drf_status
from spylunking.log.setup_logging import build_colorized_logger
from drf_network_pipeline.pipeline.consts import SUCCESS
from drf_network_pipeline.pipeline.consts import FAILED
from drf_network_pipeline.pipeline.consts import ERR
from drf_network_pipeline.pipeline.consts import NOTDONE
from drf_network_pipeline.sz.user import UserSerializer
from drf_network_pipeline.pipeline.models import MLJob
from drf_network_pipeline.pipeline.models import MLPrepare
from drf_network_pipeline.pipeline.models import MLJobResult
from drf_network_pipeline.job_utils.run_task import run_task
from drf_network_pipeline.pipeline.tasks import task_ml_prepare
from drf_network_pipeline.pipeline.tasks import task_ml_job
from drf_network_pipeline.pipeline.create_ml_prepare_record import \
    create_ml_prepare_record
from drf_network_pipeline.pipeline.create_ml_job_record import \
    create_ml_job_record


name = 'ml-sz'
log = build_colorized_logger(
    name=name)


User = get_user_model()  # noqa


[docs]class MLPrepareSerializer(serializers.Serializer): """ AntiNex Prepare Dataset Serializer """ title = serializers.CharField( max_length=256, required=False, default=None) desc = serializers.CharField( max_length=1024, required=False, default=None) full_file = serializers.CharField( max_length=1024, allow_blank=False, default="/tmp/fulldata_attack_scans.csv") clean_file = serializers.CharField( max_length=1024, allow_blank=False, default="/tmp/cleaned_attack_scans.csv") meta_suffix = serializers.CharField( max_length=256, allow_blank=False, default="metadata") output_dir = serializers.CharField( max_length=1024, allow_blank=True, required=False, allow_null=True, default="/tmp") ds_dir = serializers.CharField( max_length=1024, allow_blank=True, required=False, allow_null=True, default="/opt/antinex/datasets") ds_glob_path = serializers.CharField( max_length=1024, min_length=None, allow_blank=False, default="/opt/antinex/datasets/*/*.csv") pipeline_files = serializers.CharField( max_length=None, min_length=None, allow_blank=True, trim_whitespace=True) meta_data = serializers.CharField( max_length=None, min_length=None, allow_blank=True, trim_whitespace=True) post_proc = serializers.CharField( max_length=None, min_length=None, allow_blank=False, trim_whitespace=True) label_rules = serializers.CharField( max_length=None, min_length=None, required=False, allow_blank=False, trim_whitespace=True) version = serializers.IntegerField( default=1, required=False) request = None class_name = "MLPrepare" class Meta: fields = ( "id", "status", "control_state", "title", "desc", "full_file", "clean_file", "meta_suffix", "output_dir", "ds_dir", "ds_glob_path", "pipeline_files", "post_proc", "label_rules", "meta_data", "version", )
[docs] def create(self, request, validated_data): """create Start a new Prepare Job :param request: http request :param validated_data: post dictionary """ last_step = "" data = {} res = { "status": FAILED, "code": drf_status.HTTP_400_BAD_REQUEST, "error": "not run", "data": data } try: user_id = request.user.id log.info(("{} create user_id={} data={}") .format(self.class_name, user_id, validated_data)) # if the Full Stack is running with Celery # then it is assumed the task will be published # to the worker and only the MLPrepare record will be # returned for polling the status # of the long-running job prepare_task_name = "ml_prepare" get_result = True prepare_task = task_ml_prepare if settings.CELERY_ENABLED: prepare_task = task_ml_prepare.delay get_result = False req_data = validated_data req_data["user_id"] = user_id create_res = create_ml_prepare_record( req_data=req_data) user_obj = create_res.get( "user_obj", None) ml_prepare_obj = create_res.get( "ml_prepare_obj", None) if not user_obj: res["error"] = ("{} - Failed to find User").format( prepare_task_name) res["status"] = ERR res["error"] = create_res.get("err", "error not set") res["data"] = None log.error(res["error"]) return res if not ml_prepare_obj: res["error"] = ("{} - Failed to create MLPrepare").format( prepare_task_name) res["status"] = ERR res["error"] = create_res.get("err", "error not set") res["data"] = None log.error(res["error"]) return res req_data["user_data"] = { "id": user_obj.id, "email": user_obj.email, "username": user_obj.username } req_data["ml_prepare_data"] = ml_prepare_obj.get_public() job_res = run_task( task_method=prepare_task, task_name=prepare_task_name, req_data=req_data, get_result=get_result) log.info(("task={} res={}") .format( prepare_task_name, str(job_res)[0:30])) # in sync mode the data is in the task # response object, so send it back # because the client is blocking on it if job_res["status"] == SUCCESS: res = { "status": SUCCESS, "code": 
drf_status.HTTP_201_CREATED, "error": "", "data": job_res["data"] } # if celery (full stack async mode) # is working on this task, # return the initial record data we have. # from there the client will have to poll # to get the final results elif not get_result and job_res["status"] == NOTDONE: res = { "status": SUCCESS, "code": drf_status.HTTP_201_CREATED, "error": "", "data": req_data["ml_prepare_data"] } else: res = { "status": ERR, "code": drf_status.HTTP_500_INTERNAL_SERVER_ERROR, "error": job_res["err"], "data": job_res["data"] } # end of processing result except Exception as e: last_step = ("{} Failed with ex={}").format( self.class_name, e) log.error(last_step) res = { "status": ERR, "code": drf_status.HTTP_400_BAD_REQUEST, "error": last_step, "data": last_step } # end of try/ex log.info(("{} create res={}") .format( self.class_name, str(res)[0:30])) return res
# end of create
[docs] def update(self, request, validated_data, pk=None): """update Update an MLPrepare :param request: http request :param validated_data: dict of values :param pk: MLPrepare.id """ data = {} res = { "status": FAILED, "code": drf_status.HTTP_400_BAD_REQUEST, "error": "not run", "data": data } try: log.info(("{} update pk={}") .format(self.class_name, pk)) res = { "status": SUCCESS, "code": drf_status.HTTP_200_OK, "error": "not implemented yet", "data": data } except Exception as e: last_step = ("{} Failed with ex={}").format( self.class_name, e) log.error(last_step) res = { "status": ERR, "code": drf_status.HTTP_400_BAD_REQUEST, "error": last_step, "data": last_step } # end of try/ex log.info(("{} update res={}") .format( self.class_name, str(res)[0:30])) return res
# end of update
[docs] def get(self, request, pk): """get Get MLPrepare record :param request: http request :param pk: MLPrepare.id """ data = {} res = { "status": FAILED, "code": drf_status.HTTP_400_BAD_REQUEST, "error": "not run", "data": data } try: log.info(("{} get user_id={} pk={}") .format(self.class_name, request.user.id, pk)) db_query = (Q(user=request.user.id) & Q(id=pk)) qset = [] if pk: qset = MLPrepare.objects.select_related() \ .filter(db_query) else: db_query = (Q(user=request.user.id)) qset = MLPrepare.objects.select_related().filter( db_query).order_by( "-created").all()[:settings.MAX_RECS_ML_JOB_RESULT] if len(qset) == 1: data = qset[0].get_public() res["code"] = drf_status.HTTP_200_OK res["error"] = "" elif len(qset) > 1: data["prepares"] = [] for i in qset: data["prepares"].append(i.get_public()) res["code"] = drf_status.HTTP_200_OK res["error"] = "" else: data = {} res["code"] = drf_status.HTTP_200_OK res["error"] = "" res["status"] = SUCCESS res["data"] = data except Exception as e: last_step = ("{} Failed with ex={}").format( self.class_name, e) log.error(last_step) res = { "status": ERR, "code": drf_status.HTTP_400_BAD_REQUEST, "error": last_step, "data": last_step } # end of try/ex log.info(("{} get res={}") .format( self.class_name, str(res)[0:30])) return res
# end of get
[docs] def delete(self, request, pk): """delete Delete an MLPrepare :param request: http request :param pk: MLPrepare.id """ data = {} res = { "status": FAILED, "code": drf_status.HTTP_400_BAD_REQUEST, "error": "not run", "data": data } try: log.info(("{} delete pk={}") .format(self.class_name, pk)) res = { "status": SUCCESS, "code": drf_status.HTTP_200_OK, "error": "not implemented yet", "data": data } except Exception as e: last_step = ("{} Failed with ex={}").format( self.class_name, e) log.error(last_step) res = { "status": ERR, "code": drf_status.HTTP_400_BAD_REQUEST, "error": last_step, "data": last_step } # end of try/ex log.info(("{} delete res={}") .format( self.class_name, str(res)[0:30])) return res
# end of delete # end of MLPrepareSerializer
class MLJobsSerializer(serializers.Serializer):
    """AntiNex AI Job Serializer

    Validates ML training-job requests and drives the ``ml_job``
    task either inline (blocking) or through Celery when
    ``settings.CELERY_ENABLED`` is set.
    """

    title = serializers.CharField(
        required=False,
        default=None)
    desc = serializers.CharField(
        required=False,
        default=None)
    csv_file = serializers.CharField(
        max_length=1024,
        allow_blank=False,
        required=False)
    meta_file = serializers.CharField(
        max_length=1024,
        allow_blank=False,
        required=False)
    ds_name = serializers.CharField(
        max_length=1024,
        allow_blank=False,
        required=False)
    algo_name = serializers.CharField(
        max_length=1024,
        allow_blank=True,
        required=False,
        allow_null=True,
        default="")
    ml_type = serializers.CharField(
        max_length=256,
        allow_blank=True,
        required=False,
        allow_null=True,
        default="Predict with Filter")
    image_file = serializers.CharField(
        max_length=256,
        allow_blank=True,
        required=False,
        allow_null=True)
    status = serializers.CharField(
        max_length=256,
        allow_blank=True,
        required=False,
        allow_null=True,
        default="initial")
    control_state = serializers.CharField(
        max_length=256,
        allow_blank=True,
        required=False,
        allow_null=True,
        default="active")
    predict_feature = serializers.CharField(
        max_length=256,
        allow_blank=True,
        required=False,
        allow_null=True,
        default="")
    training_data = serializers.CharField(
        max_length=None,
        min_length=None,
        allow_blank=False,
        trim_whitespace=True)
    pre_proc = serializers.CharField(
        max_length=None,
        min_length=None,
        allow_blank=False,
        trim_whitespace=True)
    post_proc = serializers.CharField(
        max_length=None,
        min_length=None,
        allow_blank=False,
        trim_whitespace=True)
    meta_data = serializers.CharField(
        max_length=None,
        min_length=None,
        allow_blank=False,
        trim_whitespace=True)
    tracking_id = serializers.CharField(
        max_length=512,
        allow_blank=True,
        required=False,
        allow_null=True,
        default="")
    version = serializers.IntegerField(
        default=1,
        required=False)

    # request is set by the calling view; class_name is used in log lines
    request = None
    class_name = "MLJob"

    class Meta:
        fields = (
            "id",
            "csv_file",
            "meta_file",
            "title",
            "desc",
            "ds_name",
            "algo_name",
            "ml_type",
            "image_file",
            "status",
            "control_state",
            "predict_feature",
            "training_data",
            "pre_proc",
            "post_proc",
            "meta_data",
            "tracking_id",
            "version",
        )

    def create(self, request, validated_data):
        """create

        Start a new MLJob

        :param request: http request
        :param validated_data: post dictionary
        :return: dict with ``status``, ``code``, ``error`` and ``data``
        """
        last_step = ""
        data = {}
        res = {
            "status": FAILED,
            "code": drf_status.HTTP_400_BAD_REQUEST,
            "error": "not run",
            "data": data
        }
        try:
            user_id = request.user.id
            log.info(("{} create user_id={} data={}")
                     .format(self.class_name, user_id, validated_data))

            # if the Full Stack is running with Celery
            # then it is assumed the task will be published
            # to the worker and only the MLJob and MLJobResult
            # records will be returned for polling the status
            # of the long-running training job
            task_name = "ml_job"
            get_result = True
            ml_job_task = task_ml_job
            if settings.CELERY_ENABLED:
                ml_job_task = task_ml_job.delay
                get_result = False

            req_data = validated_data
            req_data["user_id"] = user_id

            create_res = create_ml_job_record(
                req_data=req_data)

            user_obj = create_res.get(
                "user_obj",
                None)
            ml_job_obj = create_res.get(
                "ml_job_obj",
                None)
            ml_result_obj = create_res.get(
                "ml_result_obj",
                None)

            # fix: the original assigned generic "Failed to ..." messages
            # that were immediately overwritten by create_res["err"] -
            # report the real error once
            if not user_obj:
                res["status"] = ERR
                res["code"] = drf_status.HTTP_400_BAD_REQUEST
                res["error"] = create_res.get("err", "error not set")
                res["data"] = None
                log.error(res["error"])
                return res
            if not ml_job_obj:
                res["status"] = ERR
                res["code"] = drf_status.HTTP_400_BAD_REQUEST
                res["error"] = create_res.get("err", "error not set")
                res["data"] = None
                log.error(res["error"])
                return res
            if not ml_result_obj:
                res["status"] = ERR
                res["code"] = drf_status.HTTP_400_BAD_REQUEST
                res["error"] = create_res.get("err", "error not set")
                res["data"] = None
                log.error(res["error"])
                return res

            req_data["user_data"] = {
                "id": user_obj.id,
                "email": user_obj.email,
                "username": user_obj.username
            }
            req_data["ml_job_data"] = ml_job_obj.get_public()
            req_data["ml_result_data"] = ml_result_obj.get_public()

            job_res = run_task(
                task_method=ml_job_task,
                task_name=task_name,
                req_data=req_data,
                get_result=get_result)

            log.info(("task={} res={}")
                     .format(
                        task_name,
                        str(job_res)[0:30]))

            # in sync mode the data is in the task
            # response object, so send it back
            # because the client is blocking on it
            if job_res["status"] == SUCCESS:
                res = {
                    "status": SUCCESS,
                    "code": drf_status.HTTP_201_CREATED,
                    "error": "",
                    "data": {
                        "job": job_res["data"]["job"],
                        "results": job_res["data"]["results"]
                    }
                }
            # if celery (full stack async mode)
            # is working on this task,
            # return the initial record data we have.
            # from there the client will have to poll
            # to get the final results
            elif not get_result and job_res["status"] == NOTDONE:
                res = {
                    "status": SUCCESS,
                    "code": drf_status.HTTP_201_CREATED,
                    "error": "",
                    "data": {
                        "job": req_data["ml_job_data"],
                        "results": req_data["ml_result_data"]
                    }
                }
            else:
                res = {
                    "status": ERR,
                    "code": drf_status.HTTP_500_INTERNAL_SERVER_ERROR,
                    "error": job_res["err"],
                    "data": {
                        "job": None,
                        "results": None
                    }
                }
            # end of processing result
        except Exception as e:
            last_step = ("{} Failed with ex={}").format(
                self.class_name,
                e)
            log.error(last_step)
            res = {
                "status": ERR,
                "code": drf_status.HTTP_400_BAD_REQUEST,
                "error": last_step,
                "data": last_step
            }
        # end of try/ex

        log.info(("{} create res={}")
                 .format(
                    self.class_name,
                    str(res)[0:30]))

        return res
    # end of create

    def update(self, request, validated_data, pk=None):
        """update

        Update an MLJob

        :param request: http request
        :param validated_data: dict of values
        :param pk: MLJob.id
        """
        data = {}
        res = {
            "status": FAILED,
            "code": drf_status.HTTP_400_BAD_REQUEST,
            "error": "not run",
            "data": data
        }
        try:
            log.info(("{} update pk={}")
                     .format(self.class_name, pk))
            # placeholder - update is not implemented yet
            res = {
                "status": SUCCESS,
                "code": drf_status.HTTP_200_OK,
                "error": "not implemented yet",
                "data": data
            }
        except Exception as e:
            last_step = ("{} Failed with ex={}").format(
                self.class_name,
                e)
            log.error(last_step)
            res = {
                "status": ERR,
                "code": drf_status.HTTP_400_BAD_REQUEST,
                "error": last_step,
                "data": last_step
            }
        # end of try/ex

        log.info(("{} update res={}")
                 .format(
                    self.class_name,
                    str(res)[0:30]))

        return res
    # end of update

    def get(self, request, pk):
        """get

        Get MLJob or Get Recent ML Jobs for User (if pk=None)

        :param request: http request
        :param pk: MLJob.id
        """
        data = {}
        res = {
            "status": FAILED,
            "code": drf_status.HTTP_400_BAD_REQUEST,
            "error": "not run",
            "data": data
        }
        try:
            log.info(("{} get user_id={} pk={}")
                     .format(self.class_name, request.user.id, pk))

            db_query = (Q(user=request.user.id) & Q(id=pk))
            qset = []
            if pk:
                qset = MLJob.objects.select_related() \
                    .filter(db_query)
            else:
                db_query = (Q(user=request.user.id))
                qset = MLJob.objects.select_related() \
                    .filter(db_query) \
                    .order_by("-created") \
                    .all()[:settings.MAX_RECS_ML_JOB]

            if len(qset) == 1:
                data = qset[0].get_public()
                res["code"] = drf_status.HTTP_200_OK
                res["error"] = ""
            elif len(qset) > 1:
                data["jobs"] = []
                for i in qset:
                    data["jobs"].append(i.get_public())
                res["code"] = drf_status.HTTP_200_OK
                res["error"] = ""
            else:
                # no records is still a successful (empty) lookup
                data = {}
                res["code"] = drf_status.HTTP_200_OK
                res["error"] = ""

            res["status"] = SUCCESS
            res["data"] = data
        except Exception as e:
            last_step = ("{} Failed with ex={}").format(
                self.class_name,
                e)
            log.error(last_step)
            res = {
                "status": ERR,
                "code": drf_status.HTTP_400_BAD_REQUEST,
                "error": last_step,
                "data": last_step
            }
        # end of try/ex

        log.info(("{} get res={}")
                 .format(
                    self.class_name,
                    str(res)[0:30]))

        return res
    # end of get

    def delete(self, request, pk):
        """delete

        Delete an MLJob

        :param request: http request
        :param pk: MLJob.id
        """
        data = {}
        res = {
            "status": FAILED,
            "code": drf_status.HTTP_400_BAD_REQUEST,
            "error": "not run",
            "data": data
        }
        try:
            log.info(("{} delete pk={}")
                     .format(self.class_name, pk))
            # placeholder - delete is not implemented yet
            res = {
                "status": SUCCESS,
                "code": drf_status.HTTP_200_OK,
                "error": "not implemented yet",
                "data": data
            }
        except Exception as e:
            last_step = ("{} Failed with ex={}").format(
                self.class_name,
                e)
            log.error(last_step)
            # fix: originally reported "not run" here, discarding the
            # captured exception text - report last_step like every
            # other handler in this module
            res = {
                "status": ERR,
                "code": drf_status.HTTP_400_BAD_REQUEST,
                "error": last_step,
                "data": last_step
            }
        # end of try/ex

        log.info(("{} delete res={}")
                 .format(
                    self.class_name,
                    str(res)[0:30]))

        return res
    # end of delete

# end of MLJobsSerializer
class MLJobResultsSerializer(serializers.Serializer):
    """AntiNex AI Job Results Serializer

    Read/CRUD surface for ``MLJobResult`` records; ``create``,
    ``update`` and ``delete`` are placeholders, ``get`` returns one
    record by id or the user's most recent results.
    """

    user = UserSerializer(
        many=False)
    job = MLJobsSerializer(
        many=False,
        required=True)
    status = serializers.CharField(
        max_length=20,
        allow_blank=True,
        required=False,
        allow_null=True,
        default="initial")
    acc_data = serializers.CharField(
        max_length=None,
        min_length=None,
        allow_blank=False,
        trim_whitespace=True)
    error_data = serializers.CharField(
        max_length=None,
        min_length=None,
        allow_blank=False,
        trim_whitespace=True)
    version = serializers.IntegerField(
        default=1,
        required=False)

    # request is set by the calling view; class_name is used in log lines
    request = None
    class_name = "MLJobResults"

    class Meta:
        fields = (
            "id",
            "user",
            "job",
            "status",
            "acc_data",
            "error_data",
            "version",
        )

    def create(self, request, validated_data):
        """create

        Create new MLJobResult

        :param request: http request
        :param validated_data: post dictionary
        """
        payload = {}
        res = {
            "status": FAILED,
            "code": drf_status.HTTP_400_BAD_REQUEST,
            "error": "not run",
            "data": payload
        }
        try:
            log.info(("{} create data={}")
                     .format(self.class_name, validated_data))
            # placeholder - create is not implemented yet
            res = {
                "status": SUCCESS,
                "code": drf_status.HTTP_200_OK,
                "error": "not implemented yet",
                "data": payload
            }
        except Exception as e:
            err_msg = ("{} Failed with ex={}").format(
                self.class_name, e)
            log.error(err_msg)
            res = {
                "status": ERR,
                "code": drf_status.HTTP_400_BAD_REQUEST,
                "error": err_msg,
                "data": err_msg
            }
        # end of try/ex

        log.info(("{} create res={}")
                 .format(self.class_name, str(res)[:30]))
        return res
    # end of create

    def update(self, request, validated_data, pk=None):
        """update

        Update an MLJobResult

        :param request: http request
        :param validated_data: dict of values
        :param pk: MLJobResult.id
        """
        payload = {}
        res = {
            "status": FAILED,
            "code": drf_status.HTTP_400_BAD_REQUEST,
            "error": "not run",
            "data": payload
        }
        try:
            log.info(("{} update pk={}")
                     .format(self.class_name, pk))
            # placeholder - update is not implemented yet
            res = {
                "status": SUCCESS,
                "code": drf_status.HTTP_200_OK,
                "error": "not implemented yet",
                "data": payload
            }
        except Exception as e:
            err_msg = ("{} Failed with ex={}").format(
                self.class_name, e)
            log.error(err_msg)
            res = {
                "status": ERR,
                "code": drf_status.HTTP_400_BAD_REQUEST,
                "error": err_msg,
                "data": err_msg
            }
        # end of try/ex

        log.info(("{} update res={}")
                 .format(self.class_name, str(res)[:30]))
        return res
    # end of update

    def get(self, request, pk):
        """get

        Get MLResult record - one record when ``pk`` is set, otherwise
        the user's most recent results (capped by
        ``settings.MAX_RECS_ML_JOB_RESULT``).

        :param request: http request
        :param pk: MLJobResult.id
        """
        payload = {}
        res = {
            "status": FAILED,
            "code": drf_status.HTTP_400_BAD_REQUEST,
            "error": "not run",
            "data": payload
        }
        try:
            log.info(("{} get user_id={} pk={}")
                     .format(self.class_name, request.user.id, pk))

            if pk:
                query = (Q(user=request.user.id) & Q(id=pk))
                records = MLJobResult.objects.select_related().filter(
                    query)
            else:
                query = (Q(user=request.user.id))
                records = MLJobResult.objects.select_related().filter(
                    query).order_by(
                        "-created").all()[:settings.MAX_RECS_ML_JOB_RESULT]

            num_found = len(records)
            if num_found == 1:
                # NOTE(review): the single-record path does not pass the
                # include_model/include_weights flags - confirm intended
                payload = records[0].get_public()
                res["code"] = drf_status.HTTP_200_OK
                res["error"] = ""
            elif num_found > 1:
                payload = {
                    "results": [
                        rec.get_public(
                            include_model=settings.INCLUDE_ML_MODEL,
                            include_weights=settings.INCLUDE_ML_WEIGHTS)
                        for rec in records
                    ]
                }
                res["code"] = drf_status.HTTP_200_OK
                res["error"] = ""
            else:
                # no records is still a successful (empty) lookup
                payload = {}
                res["code"] = drf_status.HTTP_200_OK
                res["error"] = ""

            res["status"] = SUCCESS
            res["data"] = payload
        except Exception as e:
            err_msg = ("{} Failed with ex={}").format(
                self.class_name, e)
            log.error(err_msg)
            res = {
                "status": ERR,
                "code": drf_status.HTTP_400_BAD_REQUEST,
                "error": err_msg,
                "data": err_msg
            }
        # end of try/ex

        log.info(("{} get res={}")
                 .format(self.class_name, str(res)[:30]))
        return res
    # end of get

    def delete(self, request, pk):
        """delete

        Delete an MLJobResult

        :param request: http request
        :param pk: MLJobResult.id
        """
        payload = {}
        res = {
            "status": FAILED,
            "code": drf_status.HTTP_400_BAD_REQUEST,
            "error": "not run",
            "data": payload
        }
        try:
            log.info(("{} delete pk={}")
                     .format(self.class_name, pk))
            # placeholder - delete is not implemented yet
            res = {
                "status": SUCCESS,
                "code": drf_status.HTTP_200_OK,
                "error": "not implemented yet",
                "data": payload
            }
        except Exception as e:
            err_msg = ("{} Failed with ex={}").format(
                self.class_name, e)
            log.error(err_msg)
            res = {
                "status": ERR,
                "code": drf_status.HTTP_400_BAD_REQUEST,
                "error": err_msg,
                "data": err_msg
            }
        # end of try/ex

        log.info(("{} delete res={}")
                 .format(self.class_name, str(res)[:30]))
        return res
    # end of delete

# end of MLJobResultsSerializer