diff --git a/chaos_genius/app.py b/chaos_genius/app.py index d0faf6f17..cd2544b9e 100644 --- a/chaos_genius/app.py +++ b/chaos_genius/app.py @@ -121,3 +121,4 @@ def register_commands(app): app.cli.add_command(commands.reinstall_db) app.cli.add_command(commands.insert_demo_data) app.cli.add_command(commands.run_anomaly_rca_scheduler) + app.cli.add_command(commands.kpi_import) diff --git a/chaos_genius/commands.py b/chaos_genius/commands.py index 68ebdafe1..b489967fa 100644 --- a/chaos_genius/commands.py +++ b/chaos_genius/commands.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- """Click commands.""" +import json import os from datetime import datetime from glob import glob @@ -8,6 +9,7 @@ import click from flask.cli import with_appcontext +from chaos_genius.controllers.kpi_controller import add_kpi HERE = os.path.abspath(os.path.dirname(__file__)) PROJECT_ROOT = os.path.join(HERE, os.pardir) @@ -145,6 +147,66 @@ def run_anomaly_rca_scheduler(): click.echo("Completed running scheduler. Tasks should be running in the worker.") +@click.command() +@with_appcontext +@click.argument("file_name") +def kpi_import(file_name: str): + """Adds KPIs defined in given JSON file. 
+ + The JSON must be in the following format: + + \b + ``` + [ + { + "name": "", + "is_certified": false, + "data_source": 0, + "kpi_type": "", + "kpi_query": "", + "schema_name": null, + "table_name": "", + "metric": "", + "aggregation": "", + "datetime_column": "", + "filters": [], + "dimensions": [], + "run_anomaly": true, + "anomaly_params": { + "frequency": "D", + "anomaly_period": 90, + "seasonality": [], + "model_name": "ProphetModel", + "sensitivity": "High" + } + } + ] + ``` + """ + with open(file_name) as f: + kpis = json.load(f) + + for data in kpis: + data: dict + + try: + kpi, err, _ = add_kpi(data, validate=True, run_analytics=True) + + if err != "": + click.echo(click.style( + f"Error in KPI ({kpi.name}): {err}", + fg="red", + bold=True + )) + + except Exception as e: + click.echo(click.style( + f"Could not set up KPI with name: {data['name']}, skipping. Error: {e}", + fg="red", + bold=True + )) + + @click.command() @with_appcontext def reinstall_db(): diff --git a/chaos_genius/controllers/kpi_controller.py b/chaos_genius/controllers/kpi_controller.py index 8cecc3d81..3bc5caa54 100644 --- a/chaos_genius/controllers/kpi_controller.py +++ b/chaos_genius/controllers/kpi_controller.py @@ -1,23 +1,132 @@ import logging import typing from datetime import date, datetime, timedelta -from typing import Optional, Union, Iterator +from typing import Any, Dict, Iterator, List, Optional, Tuple, Union -from flask import current_app # noqa: F401 +from flask import current_app +from chaos_genius.controllers.dashboard_controller import create_dashboard_kpi_mapper # noqa: F401 from chaos_genius.controllers.task_monitor import checkpoint_failure, checkpoint_success from chaos_genius.core.anomaly.controller import AnomalyDetectionController from chaos_genius.core.rca.rca_controller import RootCauseAnalysisController from chaos_genius.core.utils.data_loader import DataLoader +from chaos_genius.core.utils.kpi_validation import validate_kpi from 
chaos_genius.databases.models.kpi_model import Kpi +from chaos_genius.settings import DAYS_OFFSET_FOR_ANALTYICS, MAX_DEEPDRILLS_SLACK_DAYS -from chaos_genius.settings import ( - MAX_DEEPDRILLS_SLACK_DAYS, - DAYS_OFFSET_FOR_ANALTYICS, -) +logger = logging.getLogger(__name__) -logger = logging.getLogger(__name__) +def add_kpi( + data: Dict[str, Any], + validate=True, + run_analytics=True +) -> Tuple[Kpi, str, bool]: + """Adds a new KPI. + + Also handles adding the KPI to the default dashboard, running analytics and + validations. + + If "anomaly_params" is present in data, it also performs anomaly params validation + and runs anomaly. + + Arguments: + data: the KPI data. Field names are same as the KPI model. + validate: whether to perform KPI validation. (Default: True). + run_analytics: whether to queue analytics tasks (RCA and, if anomaly is configured, anomaly) after adding. (Default: True). + + Returns: + A tuple of (newly added KPI, error message - empty if success, boolean + indicating whether error was critical) + """ + data["dimensions"] = data.get("dimensions", []) + + data["kpi_query"] = _kpi_query_strip_trailing_semicolon( + (data.get("kpi_query", "") or "") + ) + + has_anomaly_setup = "anomaly_params" in data + new_anomaly_params = {} + + new_kpi = Kpi( + name=data.get("name"), + is_certified=data.get("is_certified"), + data_source=data.get("data_source"), + kpi_type=data.get("dataset_type") or data.get("kpi_type"), + kpi_query=data.get("kpi_query"), + schema_name=data.get("schema_name"), + table_name=data.get("table_name"), + metric=data.get("metric"), + aggregation=data.get("aggregation"), + datetime_column=data.get("datetime_column"), + filters=data.get("filters"), + dimensions=data.get("dimensions"), + run_anomaly=data.get("run_anomaly", True), + ) + + if has_anomaly_setup: + from chaos_genius.views.anomaly_data_view import validate_partial_anomaly_params + + # validate anomaly params + err, new_anomaly_params = validate_partial_anomaly_params( + data["anomaly_params"] + ) + if err != "": + return ( + 
new_kpi, + f"Error in validating anomaly params for KPI {data['name']}: {err}", + True, + ) + + if validate: + # Perform KPI Validation + status, message = validate_kpi(new_kpi.as_dict) + if status is not True: + return new_kpi, message, True + + new_kpi.save(commit=True) + + # Add KPI to dashboard 0 and all required dashboards + _add_kpi_to_dashboards(new_kpi.id, data.get("dashboard", []) + [0]) + + if has_anomaly_setup: + from chaos_genius.views.anomaly_data_view import update_anomaly_params + + # update anomaly params + err, new_kpi = update_anomaly_params( + new_kpi, new_anomaly_params, check_editable=False + ) + + if err != "": + return ( + new_kpi, + f"Error updating anomaly params for KPI {new_kpi.name}: {err}", + True + ) + + if run_analytics: + # we ensure analytics tasks are run as soon as analytics is configured + from chaos_genius.jobs.anomaly_tasks import queue_kpi_analytics + queue_kpi_analytics(new_kpi.id, has_anomaly_setup) + + return new_kpi, "", False + + +def _kpi_query_strip_trailing_semicolon(query: str) -> str: + if not query: + return "" + + query = query.strip() + + # remove trailing semicolon + if query[-1] == ";": + query = query[:-1] + return query + + +def _add_kpi_to_dashboards(kpi_id: int, dashboards: List[int]): + dashboards = list(set(dashboards)) + create_dashboard_kpi_mapper(dashboards, [kpi_id]) def _is_data_present_for_end_date( diff --git a/chaos_genius/jobs/anomaly_tasks.py b/chaos_genius/jobs/anomaly_tasks.py index 89b8c283f..bbc1f83c5 100644 --- a/chaos_genius/jobs/anomaly_tasks.py +++ b/chaos_genius/jobs/anomaly_tasks.py @@ -301,3 +301,30 @@ def anomaly_scheduler(): g = group(task_group) res = g.apply_async() return res + + +def queue_kpi_analytics(kpi_id: int, run_anomaly=True): + """Adds analytics tasks to the queue for given KPI.""" + anomaly_task = None + if run_anomaly: + anomaly_task = ready_anomaly_task(kpi_id) + + # run rca as soon as new KPI is added + rca_task = ready_rca_task(kpi_id) + if rca_task is None: + 
logger.error( + "Could not run RCA task since newly added KPI was not found: %s", + kpi_id, + ) + else: + rca_task.apply_async() + + if run_anomaly: + if anomaly_task is None: + logger.error( + "Not running anomaly since it is not configured or KPI " + "(%d) was not found.", + kpi_id, + ) + else: + anomaly_task.apply_async() diff --git a/chaos_genius/views/kpi_view.py b/chaos_genius/views/kpi_view.py index c0dee88dc..5ce44afbc 100644 --- a/chaos_genius/views/kpi_view.py +++ b/chaos_genius/views/kpi_view.py @@ -4,6 +4,7 @@ import logging import traceback # noqa: F401 from datetime import date, datetime, timedelta +from typing import cast from flask import ( # noqa: F401 Blueprint, @@ -24,7 +25,7 @@ from chaos_genius.databases.models.rca_data_model import RcaData from chaos_genius.extensions import cache, db from chaos_genius.databases.db_utils import chech_editable_field -from chaos_genius.controllers.kpi_controller import get_kpi_data_from_id +from chaos_genius.controllers.kpi_controller import add_kpi, get_kpi_data_from_id from chaos_genius.controllers.dashboard_controller import ( create_dashboard_kpi_mapper, get_mapper_obj_by_dashboard_ids, @@ -46,67 +47,28 @@ @blueprint.route("", methods=["GET", "POST"]) -@blueprint.route("/", methods=["GET", "POST"]) # TODO: Remove this +@blueprint.route("/", methods=["GET", "POST"]) # TODO: Remove this def kpi(): - """kpi list view.""" + """KPI add and list view.""" # Handle logging in if request.method == "POST": - if not request.is_json: - return jsonify({"error": "The request payload is not in JSON format"}) - data = request.get_json() - data["dimensions"] = [] if data["dimensions"] is None else data["dimensions"] - - if data.get("kpi_query", "").strip(): - data["kpi_query"] = data["kpi_query"].strip() - # remove trailing semicolon - if data["kpi_query"][-1] == ";": - data["kpi_query"] = data["kpi_query"][:-1] - - new_kpi = Kpi( - name=data.get("name"), - is_certified=data.get("is_certified"), - 
data_source=data.get("data_source"), - kpi_type=data.get("dataset_type"), - kpi_query=data.get("kpi_query"), - schema_name=data.get("schema_name"), - table_name=data.get("table_name"), - metric=data.get("metric"), - aggregation=data.get("aggregation"), - datetime_column=data.get("datetime_column"), - filters=data.get("filters"), - dimensions=data.get("dimensions"), - ) - # Perform KPI Validation - status, message = validate_kpi(new_kpi.as_dict) - if status is not True: - return jsonify( - {"error": message, "status": "failure", "is_critical": "true"} - ) - new_kpi.save(commit=True) - - # Add the dashboard id 0 to the kpi - dashboard_list = data.get("dashboard", []) + [0] - dashboard_list = list(set(dashboard_list)) - mapper_obj_list = create_dashboard_kpi_mapper(dashboard_list, [new_kpi.id]) + if data is None: + return jsonify({"error": "The request payload is not in JSON format"}) - # TODO: Fix circular import error - from chaos_genius.jobs.anomaly_tasks import ready_rca_task + kpi, err_msg, critical = add_kpi(data) - # run rca as soon as new KPI is added - rca_task = ready_rca_task(new_kpi.id) - if rca_task is None: - print( - f"Could not run RCA task since newly added KPI was not found: {new_kpi.id}" - ) - else: - rca_task.apply_async() + if err_msg != "": + ret = {"error": err_msg, "status": "failure"} + if critical: + ret["is_critical"] = "true" + return jsonify(ret) return jsonify( { - "data": {"kpi_id": new_kpi.id}, - "message": f"KPI {new_kpi.name} has been created successfully.", + "data": {"kpi_id": kpi.id}, + "message": f"KPI {kpi.name} has been created successfully.", "status": "success", } )