This repository has been archived by the owner on Sep 12, 2024. It is now read-only.

refactor: move KPI add into a new function in the controller
Added a generalized `add_kpi` function in kpi_controller.
- Handles KPI validation, adding to dashboards, anomaly params validation,
  and running analytics.
- Validation and the analytics run can be switched off.
- Split some unrelated parts into separate functions:
    - `_kpi_query_strip_trailing_semicolon`
    - `_add_kpi_to_dashboards`
    - `queue_kpi_analytics` (added to anomaly_tasks.py)

Updated both the CLI and the KPI view to use this new function.

Tested by adding a simple KPI from the UI and importing a KPI from the
CLI.
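
For illustration, a minimal sketch of how a caller might use the new function (the field names mirror the keys read by `add_kpi`; the example values, the data source id, and the surrounding Flask app context are assumptions, not part of this commit):

```python
from chaos_genius.controllers.kpi_controller import add_kpi

# Hypothetical payload; keys mirror those consumed by add_kpi.
kpi_data = {
    "name": "daily_orders",
    "data_source": 1,                # id of an existing data source (assumed)
    "dataset_type": "table",         # mapped to kpi_type inside add_kpi
    "table_name": "orders",
    "metric": "order_value",
    "aggregation": "sum",
    "datetime_column": "created_at",
    "dimensions": ["country"],
    "dashboard": [2],                # dashboard 0 is always added as well
    # "anomaly_params": {...},       # optional; triggers anomaly validation + task
}

# Must run inside a Flask application context (e.g. the CLI or a view).
new_kpi, err, is_critical = add_kpi(kpi_data, validate=True, run_analytics=True)
if err != "":
    print(f"Could not add KPI {new_kpi.name}: {err} (critical: {is_critical})")
else:
    print(f"KPI {new_kpi.name} added with id {new_kpi.id}")
```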
Samyak2 committed Jan 17, 2022
1 parent 943be98 commit e562d84
Showing 4 changed files with 165 additions and 143 deletions.
93 changes: 9 additions & 84 deletions chaos_genius/commands.py
@@ -10,6 +10,7 @@
from flask.cli import with_appcontext

from chaos_genius.controllers.dashboard_controller import create_dashboard_kpi_mapper
from chaos_genius.controllers.kpi_controller import add_kpi
from chaos_genius.core.utils.kpi_validation import validate_kpi
from chaos_genius.databases.models.kpi_model import Kpi
from chaos_genius.views.anomaly_data_view import (
@@ -196,90 +197,14 @@ def kpi_import(file_name: str):
data: dict

try:
# TODO: separate this and KPI endpoint code in a new function
data["dimensions"] = [] if data.get("dimensions") is None else data["dimensions"]

if data.get("kpi_query", "").strip():
data["kpi_query"] = data["kpi_query"].strip()
# remove trailing semicolon
if data["kpi_query"][-1] == ";":
data["kpi_query"] = data["kpi_query"][:-1]

has_anomaly_setup = "anomaly_params" in data
new_anomaly_params = {}

if has_anomaly_setup:
# validate anomaly params
err, new_anomaly_params = validate_partial_anomaly_params(
data["anomaly_params"]
)
if err != "":
click.echo(f"Error in validating anomaly params for KPI {data['name']}: {err}")
return 1

new_kpi = Kpi(
name=data.get("name"),
is_certified=data.get("is_certified"),
data_source=data.get("data_source"),
kpi_type=data.get("kpi_type"),
kpi_query=data.get("kpi_query"),
schema_name=data.get("schema_name"),
table_name=data.get("table_name"),
metric=data.get("metric"),
aggregation=data.get("aggregation"),
datetime_column=data.get("datetime_column"),
filters=data.get("filters"),
dimensions=data.get("dimensions"),
run_anomaly=data.get("run_anomaly"),
)

# Perform KPI Validation
status, message = validate_kpi(new_kpi.as_dict)
if status is not True:
click.echo(f"KPI validation failed for KPI {new_kpi.name}. Error: {message}")
return 1

new_kpi = new_kpi.save(commit=True)

# Add the dashboard id 0 to the kpi
dashboard_list = data.get("dashboard", []) + [0]
dashboard_list = list(set(dashboard_list))
create_dashboard_kpi_mapper(dashboard_list, [new_kpi.id])

if has_anomaly_setup:
# update anomaly params
err, new_kpi = update_anomaly_params(
new_kpi, new_anomaly_params, check_editable=False
)

if err != "":
click.echo(f"Error updating anomaly params for KPI {new_kpi.name}: {err}")
return 1

# we ensure anomaly task is run as soon as analytics is configured
# we also run RCA at the same time
# TODO: move this import to top and fix import issue
from chaos_genius.jobs.anomaly_tasks import ready_anomaly_task, ready_rca_task

anomaly_task = None
if has_anomaly_setup:
anomaly_task = ready_anomaly_task(new_kpi.id)

rca_task = ready_rca_task(new_kpi.id)
if rca_task is None:
click.echo(
"Could not run RCA task since newly configured KPI "
f"({new_kpi.name}) was not found: {new_kpi.id}"
)
else:
if anomaly_task is None:
click.echo(
"Not running anomaly since it is not configured or KPI "
f"({new_kpi.name}) was not found."
)
else:
anomaly_task.apply_async()
rca_task.apply_async()
kpi, err, _ = add_kpi(data, validate=True, run_analytics=True)

if err != "":
click.echo(click.style(
f"Error in KPI ({kpi.name}): {err}",
fg="red",
bold=True
))

except Exception as e:
click.echo(click.style(
123 changes: 116 additions & 7 deletions chaos_genius/controllers/kpi_controller.py
@@ -1,23 +1,132 @@
import logging
import typing
from datetime import date, datetime, timedelta
from typing import Optional, Union, Iterator
from typing import Any, Dict, Iterator, List, Optional, Tuple, Union

from flask import current_app # noqa: F401
from flask import current_app

from chaos_genius.controllers.dashboard_controller import create_dashboard_kpi_mapper # noqa: F401
from chaos_genius.controllers.task_monitor import checkpoint_failure, checkpoint_success
from chaos_genius.core.anomaly.controller import AnomalyDetectionController
from chaos_genius.core.rca.rca_controller import RootCauseAnalysisController
from chaos_genius.core.utils.data_loader import DataLoader
from chaos_genius.core.utils.kpi_validation import validate_kpi
from chaos_genius.databases.models.kpi_model import Kpi
from chaos_genius.settings import DAYS_OFFSET_FOR_ANALTYICS, MAX_DEEPDRILLS_SLACK_DAYS

from chaos_genius.settings import (
MAX_DEEPDRILLS_SLACK_DAYS,
DAYS_OFFSET_FOR_ANALTYICS,
)
logger = logging.getLogger(__name__)


logger = logging.getLogger(__name__)
def add_kpi(
    data: Dict[str, Any],
    validate=True,
    run_analytics=True
) -> Tuple[Kpi, str, bool]:
    """Adds a new KPI.

    Also handles adding the KPI to the default dashboard, running analytics and
    validations.

    If "anomaly_params" is present in data, it also performs anomaly params
    validation and runs anomaly.

    Arguments:
        data: the KPI data. Field names are the same as in the KPI model.
        validate: whether to perform KPI validation. (Default: True)
        run_analytics: whether to queue analytics (anomaly and RCA) tasks after
            adding. (Default: True)

    Returns:
        A tuple of (newly added KPI, error message - empty if success, boolean
        indicating whether the error was critical).
    """
data["dimensions"] = [] if data.get("dimensions") is None else data["dimensions"]

data["kpi_query"] = _kpi_query_strip_trailing_semicolon(
(data.get("kpi_query", "") or "")
)

has_anomaly_setup = "anomaly_params" in data
new_anomaly_params = {}

new_kpi = Kpi(
name=data.get("name"),
is_certified=data.get("is_certified"),
data_source=data.get("data_source"),
kpi_type=data.get("dataset_type") or data.get("kpi_type"),
kpi_query=data.get("kpi_query"),
schema_name=data.get("schema_name"),
table_name=data.get("table_name"),
metric=data.get("metric"),
aggregation=data.get("aggregation"),
datetime_column=data.get("datetime_column"),
filters=data.get("filters"),
dimensions=data.get("dimensions"),
run_anomaly=data.get("run_anomaly", True),
)

if has_anomaly_setup:
from chaos_genius.views.anomaly_data_view import validate_partial_anomaly_params

# validate anomaly params
err, new_anomaly_params = validate_partial_anomaly_params(
data["anomaly_params"]
)
if err != "":
return (
new_kpi,
f"Error in validating anomaly params for KPI {data['name']}: {err}",
True,
)

if validate:
# Perform KPI Validation
status, message = validate_kpi(new_kpi.as_dict)
if status is not True:
return new_kpi, message, True

new_kpi.save(commit=True)

# Add KPI to dashboard 0 and all required dashboards
_add_kpi_to_dashboards(new_kpi.id, data.get("dashboard", []) + [0])

if has_anomaly_setup:
from chaos_genius.views.anomaly_data_view import update_anomaly_params

# update anomaly params
err, new_kpi = update_anomaly_params(
new_kpi, new_anomaly_params, check_editable=False
)

if err != "":
return (
new_kpi,
f"Error updating anomaly params for KPI {new_kpi.name}: {err}",
True
)

if run_analytics:
# we ensure analytics tasks are run as soon as analytics is configured
from chaos_genius.jobs.anomaly_tasks import queue_kpi_analytics
queue_kpi_analytics(new_kpi.id, has_anomaly_setup)

return new_kpi, "", False


def _kpi_query_strip_trailing_semicolon(query: str) -> str:
    if not query:
        return ""

    query = query.strip()

    # remove trailing semicolon
    if query[-1] == ";":
        query = query[:-1]
    return query


def _add_kpi_to_dashboards(kpi_id: int, dashboards: List[int]):
    dashboards = list(set(dashboards))
    create_dashboard_kpi_mapper(dashboards, [kpi_id])


def _is_data_present_for_end_date(
26 changes: 26 additions & 0 deletions chaos_genius/jobs/anomaly_tasks.py
@@ -301,3 +301,29 @@ def anomaly_scheduler():
g = group(task_group)
res = g.apply_async()
return res


def queue_kpi_analytics(kpi_id: int, run_anomaly=True):
    """Adds analytics tasks to the queue for the given KPI."""
    anomaly_task = None
    if run_anomaly:
        anomaly_task = ready_anomaly_task(kpi_id)

    # run rca as soon as new KPI is added
    rca_task = ready_rca_task(kpi_id)
    if rca_task is None:
        logger.error(
            "Could not run RCA task since newly added KPI was not found: %s",
            kpi_id,
        )
    else:
        rca_task.apply_async()

    if anomaly_task is None:
        logger.error(
            "Not running anomaly since it is not configured or KPI "
            "(%d) was not found.",
            kpi_id,
        )
    else:
        anomaly_task.apply_async()
66 changes: 14 additions & 52 deletions chaos_genius/views/kpi_view.py
@@ -4,6 +4,7 @@
import logging
import traceback # noqa: F401
from datetime import date, datetime, timedelta
from typing import cast

from flask import ( # noqa: F401
Blueprint,
@@ -24,7 +25,7 @@
from chaos_genius.databases.models.rca_data_model import RcaData
from chaos_genius.extensions import cache, db
from chaos_genius.databases.db_utils import chech_editable_field
from chaos_genius.controllers.kpi_controller import get_kpi_data_from_id
from chaos_genius.controllers.kpi_controller import add_kpi, get_kpi_data_from_id
from chaos_genius.controllers.dashboard_controller import (
create_dashboard_kpi_mapper,
get_mapper_obj_by_dashboard_ids,
@@ -46,67 +47,28 @@


@blueprint.route("", methods=["GET", "POST"])
@blueprint.route("/", methods=["GET", "POST"]) # TODO: Remove this
@blueprint.route("/", methods=["GET", "POST"]) # TODO: Remove this
def kpi():
"""kpi list view."""
"""KPI add and list view."""
# Handle logging in
if request.method == "POST":
if not request.is_json:
return jsonify({"error": "The request payload is not in JSON format"})

data = request.get_json()
data["dimensions"] = [] if data["dimensions"] is None else data["dimensions"]

if data.get("kpi_query", "").strip():
data["kpi_query"] = data["kpi_query"].strip()
# remove trailing semicolon
if data["kpi_query"][-1] == ";":
data["kpi_query"] = data["kpi_query"][:-1]

new_kpi = Kpi(
name=data.get("name"),
is_certified=data.get("is_certified"),
data_source=data.get("data_source"),
kpi_type=data.get("dataset_type"),
kpi_query=data.get("kpi_query"),
schema_name=data.get("schema_name"),
table_name=data.get("table_name"),
metric=data.get("metric"),
aggregation=data.get("aggregation"),
datetime_column=data.get("datetime_column"),
filters=data.get("filters"),
dimensions=data.get("dimensions"),
)
# Perform KPI Validation
status, message = validate_kpi(new_kpi.as_dict)
if status is not True:
return jsonify(
{"error": message, "status": "failure", "is_critical": "true"}
)

new_kpi.save(commit=True)

# Add the dashboard id 0 to the kpi
dashboard_list = data.get("dashboard", []) + [0]
dashboard_list = list(set(dashboard_list))
mapper_obj_list = create_dashboard_kpi_mapper(dashboard_list, [new_kpi.id])
if data is None:
return jsonify({"error": "The request payload is not in JSON format"})

# TODO: Fix circular import error
from chaos_genius.jobs.anomaly_tasks import ready_rca_task
kpi, err_msg, critical = add_kpi(data)

# run rca as soon as new KPI is added
rca_task = ready_rca_task(new_kpi.id)
if rca_task is None:
print(
f"Could not run RCA task since newly added KPI was not found: {new_kpi.id}"
)
else:
rca_task.apply_async()
if err_msg != "":
ret = {"error": err_msg, "status": "failure"}
if critical:
ret["is_critical"] = "true"
return jsonify(ret)

return jsonify(
{
"data": {"kpi_id": new_kpi.id},
"message": f"KPI {new_kpi.name} has been created successfully.",
"data": {"kpi_id": kpi.id},
"message": f"KPI {kpi.name} has been created successfully.",
"status": "success",
}
)
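
For context, a rough sketch of how a client might call the updated endpoint (the base URL, port, and blueprint prefix are assumptions; the payload keys and response shape follow the handler and `add_kpi` above):

```python
import requests

# Base URL and blueprint prefix are assumptions; adjust to your deployment.
resp = requests.post(
    "http://localhost:5000/api/kpi",
    json={
        "name": "daily_orders",          # hypothetical example values
        "data_source": 1,
        "dataset_type": "table",
        "table_name": "orders",
        "metric": "order_value",
        "aggregation": "sum",
        "datetime_column": "created_at",
        "dimensions": [],
        "dashboard": [],
    },
)

body = resp.json()
if body.get("status") == "success":
    print("Created KPI with id:", body["data"]["kpi_id"])
else:
    # "is_critical" is present only for critical failures
    print("Error:", body.get("error"))
```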

