feat: Added support for sending custom datadog metrics from querybook #1390

Open · wants to merge 11 commits into master
26 changes: 26 additions & 0 deletions docker-compose.yml
@@ -127,6 +127,32 @@ services:
      timeout: 30s
      retries: 3

  #
  # OPTIONAL SERVICES
  # Start separately if you need them via `docker compose up <service>`
  #

  datadog:
    image: gcr.io/datadoghq/agent:latest
    ports:
      - 8125:8125/udp
    expose:
      - 8125/udp
    environment:
      DD_DOGSTATSD_NON_LOCAL_TRAFFIC: "true"
      DD_USE_DOGSTATSD: "true"
      DD_APM_ENABLED: "false"
      DD_AC_EXCLUDE: '.*'
      DD_ENV: 'dev'
      DD_SERVICE: 'querybook'
      DD_VERSION: 'latest'
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
      - /proc/:/host/proc/:ro
      - /sys/fs/cgroup:/host/sys/fs/cgroup:ro
    profiles:
      - datadog
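With the `datadog` profile, the agent only starts when explicitly requested (e.g. `docker compose --profile datadog up datadog`). For a quick smoke test that DogStatsD is listening, a minimal sketch using the `datadog` Python client; the host, port, metric name, and tag below are illustrative assumptions:

from datadog import initialize, statsd

# Assumes the agent container is reachable on localhost:8125 (per the
# ports mapping above); adjust the host for in-network containers.
initialize(statsd_host="localhost", statsd_port=8125)

# Fires one counter over UDP; it should surface in Datadog provided the
# agent was started with a valid DD_API_KEY.
statsd.increment("querybook.smoke_test", tags=["env:dev"])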

  # EMAIL SERVER EXAMPLE
  # If you need email to work, use this
  # dockerhostforward:
7 changes: 7 additions & 0 deletions querybook/server/env.py
@@ -139,3 +139,10 @@ class QuerybookSettings(object):
    VECTOR_STORE_CONFIG = get_env_config("VECTOR_STORE_CONFIG") or {}
    EMBEDDINGS_PROVIDER = get_env_config("EMBEDDINGS_PROVIDER")
    EMBEDDINGS_CONFIG = get_env_config("EMBEDDINGS_CONFIG") or {}

    # Datadog
    DD_AGENT_HOST = get_env_config("DD_AGENT_HOST", optional=True)
    DD_DOGSTATSD_PORT = int(get_env_config("DD_DOGSTATSD_PORT", optional=True) or 8125)
    DD_PREFIX = get_env_config("DD_PREFIX", optional=True)
    DD_SERVICE = get_env_config("DD_SERVICE", optional=True) or "querybook"
    DD_TAGS = get_env_config("DD_TAGS", optional=True) or ""
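For reference, a sketch of how these settings are typically supplied; the values are illustrative, and only DD_TAGS has a non-obvious shape (a comma-separated string that the Datadog logger later splits into constant tags):

# Example environment (values are illustrative):
#   DD_AGENT_HOST=datadog
#   DD_DOGSTATSD_PORT=8125
#   DD_SERVICE=querybook
#   DD_TAGS=env:dev,team:data

# DD_TAGS stays a plain string here; DatadogStatsLogger.initialize()
# performs the split:
assert "env:dev,team:data".split(",") == ["env:dev", "team:data"]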
27 changes: 27 additions & 0 deletions querybook/server/lib/celery/celery_stats.py
@@ -0,0 +1,27 @@
import time
import threading

from lib.stats_logger import stats_logger, ACTIVE_WORKERS, ACTIVE_TASKS


def send_stats_logger_metrics(celery):
    while True:
        active = celery.control.inspect().active() or {}

        # Workers are the dict keys; in-flight tasks are the per-worker lists.
        active_tasks = sum(len(tasks) for tasks in active.values())

        stats_logger.gauge(ACTIVE_WORKERS, len(active))
        stats_logger.gauge(ACTIVE_TASKS, active_tasks)
        time.sleep(5)


def start_stats_logger_monitor(celery):
    thread = threading.Thread(
        target=send_stats_logger_metrics, args=[celery], daemon=True
    )
    thread.start()
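For context, `celery.control.inspect().active()` returns a mapping of worker hostname to its in-flight tasks, so the gauges above count keys and sum list lengths. A sketch of the shape (worker names and fields are illustrative):

# {
#     "celery@worker-1": [
#         {"id": "a1b2c3", "name": "tasks.run_query.run_query_task", ...},
#     ],
#     "celery@worker-2": [],  # an idle worker still counts toward ACTIVE_WORKERS
# }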
4 changes: 4 additions & 0 deletions querybook/server/lib/stats_logger/__init__.py
@@ -7,8 +7,12 @@
WS_CONNECTIONS = "ws.connections"
SQL_SESSION_FAILURES = "sql_session.failures"
TASK_FAILURES = "task.failures"
TASK_SUCCESSES = "task.successes"
TASK_RECEIVED = "task.received"
REDIS_OPERATIONS = "redis.operations"
QUERY_EXECUTIONS = "query.executions"
ACTIVE_WORKERS = "celery.active_workers"
ACTIVE_TASKS = "celery.active_tasks"


logger_name = QuerybookSettings.STATS_LOGGER_NAME
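These constants are passed straight to whichever logger is configured; a minimal usage sketch, assuming the module-level `stats_logger` has been initialized:

from lib.stats_logger import stats_logger, QUERY_EXECUTIONS, ACTIVE_TASKS

stats_logger.incr(QUERY_EXECUTIONS)  # counter: one more query execution
stats_logger.gauge(ACTIVE_TASKS, 3)  # gauge: point-in-time value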
12 changes: 11 additions & 1 deletion querybook/server/lib/stats_logger/all_stats_loggers.py
@@ -1,4 +1,6 @@
from lib.utils.import_helper import import_module_with_default

from .loggers.datadog_stats_logger import DatadogStatsLogger
from .loggers.null_stats_logger import NullStatsLogger
from .loggers.console_stats_logger import ConsoleStatsLogger

@@ -8,11 +10,19 @@
    default=[],
)

-ALL_STATS_LOGGERS = [NullStatsLogger(), ConsoleStatsLogger()] + ALL_PLUGIN_STATS_LOGGERS
ALL_STATS_LOGGERS = [
    NullStatsLogger(),
    ConsoleStatsLogger(),
    DatadogStatsLogger(),
] + ALL_PLUGIN_STATS_LOGGERS


def get_stats_logger_class(name: str):
    for logger in ALL_STATS_LOGGERS:
        if logger.logger_name == name:
            if hasattr(logger, "initialize") and callable(
                getattr(logger, "initialize")
            ):
                logger.initialize()
            return logger
    raise ValueError(f"Unknown stats logger name {name}")
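Resolution is by `logger_name`, so switching to Datadog is purely a settings change; a sketch, assuming the datadog requirements are installed:

# With STATS_LOGGER_NAME=datadog, the registry hands back the
# (lazily initialized) DatadogStatsLogger instance:
logger = get_stats_logger_class("datadog")
assert logger.logger_name == "datadog"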
93 changes: 93 additions & 0 deletions querybook/server/lib/stats_logger/loggers/datadog_stats_logger.py
@@ -0,0 +1,93 @@
from lib.stats_logger.base_stats_logger import BaseStatsLogger
from env import QuerybookSettings
from lib.logger import get_logger

LOG = get_logger(__file__)


class DatadogStatsLogger(BaseStatsLogger):
    """
    Stats logger implementation for Datadog using DogStatsD.

    Required environment variables:
    - DD_API_KEY: The API key for Datadog (consumed by the Datadog agent, not by this logger).
    - DD_AGENT_HOST: The host of the Datadog agent.
    - DD_DOGSTATSD_PORT: The port of the Datadog agent, defaults to 8125.
    - DD_SERVICE: The service name, defaults to "querybook".

    Optional environment variables:
    - DD_PREFIX: The prefix for all metrics, defaults to DD_SERVICE.
    - DD_TAGS: Comma-separated tags added to all metrics.
    """

    metric_prefix = ""
    dd_tags = []
    _statsd = None

    def metric_prefix_helper(self, key):
        if not self.metric_prefix:
            return key
        return self.metric_prefix + "." + key

    def tag_helper(self, tags):
        if tags:
            return [f"{k}:{v}" for k, v in tags.items()]
        return []

    def initialize(self):
        try:
            from datadog import initialize, statsd

            self._statsd = statsd
        except ImportError:
            raise ImportError(
                "Datadog is not installed. Please install `requirements/datadog/datadog.txt` to use the Datadog stats logger."
            )

        if QuerybookSettings.DD_AGENT_HOST and QuerybookSettings.DD_DOGSTATSD_PORT:
            LOG.info("Initializing Datadog")

            self.dd_tags = (
                QuerybookSettings.DD_TAGS.split(",")
                if QuerybookSettings.DD_TAGS
                else []
            )
            self.metric_prefix = (
                QuerybookSettings.DD_PREFIX or QuerybookSettings.DD_SERVICE
            )

            options = {
                "statsd_host": QuerybookSettings.DD_AGENT_HOST,
                "statsd_port": QuerybookSettings.DD_DOGSTATSD_PORT,
                "statsd_constant_tags": self.dd_tags,
            }

            initialize(**options)
        else:
            LOG.info("Datadog environment variables are not set")

    @property
    def logger_name(self) -> str:
        return "datadog"

    def incr(self, key: str, tags: dict[str, str] = None) -> None:
        self._statsd.increment(
            self.metric_prefix_helper(key),
            1,
            tags=self.tag_helper(tags),
        )

    def decr(self, key: str, tags: dict[str, str] = None) -> None:
        self._statsd.decrement(
            self.metric_prefix_helper(key),
            1,
            tags=self.tag_helper(tags),
        )

    def timing(self, key: str, value: float, tags: dict[str, str] = None) -> None:
        self._statsd.histogram(
            self.metric_prefix_helper(key), value, tags=self.tag_helper(tags)
        )

    def gauge(self, key: str, value: float, tags: dict[str, str] = None) -> None:
        self._statsd.gauge(
            self.metric_prefix_helper(key), value, tags=self.tag_helper(tags)
        )
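To illustrate the prefix and tag translation above, a sketch assuming the datadog package is installed, DD_AGENT_HOST is set, DD_SERVICE defaults to "querybook", and no DD_PREFIX is configured; the metric names are made up:

logger = DatadogStatsLogger()
logger.initialize()

logger.incr("task.failures", tags={"task_type": "adhoc"})
# -> statsd.increment("querybook.task.failures", 1, tags=["task_type:adhoc"])

logger.timing("api.request_latency", 42.0, tags={"endpoint": "/ds"})
# -> statsd.histogram("querybook.api.request_latency", 42.0, tags=["endpoint:/ds"])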
41 changes: 36 additions & 5 deletions querybook/server/tasks/all_tasks.py
@@ -1,12 +1,23 @@
-from celery.signals import celeryd_init, task_failure
from celery.signals import (
    celeryd_init,
    task_failure,
    task_success,
    task_received,
    beat_init,
)
from celery.utils.log import get_task_logger
from importlib import import_module

from app.flask_app import celery
from env import QuerybookSettings
from lib.logger import get_logger
-from lib.stats_logger import TASK_FAILURES, stats_logger
-from logic.schedule import get_schedule_task_type
from lib.stats_logger import (
    TASK_FAILURES,
    TASK_SUCCESSES,
    TASK_RECEIVED,
    stats_logger,
)
from lib.celery.celery_stats import start_stats_logger_monitor

from .export_query_execution import export_query_execution_task
from .run_query import run_query_task
@@ -20,6 +31,7 @@
from .presto_hive_function_scrapper import presto_hive_function_scrapper
from .db_clean_up_jobs import run_all_db_clean_up_jobs
from .disable_scheduled_docs import disable_scheduled_docs
from logic.schedule import get_schedule_task_type

LOG = get_logger(__file__)

@@ -60,5 +72,24 @@ def configure_workers(sender=None, conf=None, **kwargs):

@task_failure.connect
def handle_task_failure(sender, signal, *args, **kwargs):
-    task_type = get_schedule_task_type(sender.name)
-    stats_logger.incr(TASK_FAILURES, tags={"task_type": task_type})
    tags = {"task_name": sender.name, "task_type": get_schedule_task_type(sender.name)}
    stats_logger.incr(TASK_FAILURES, tags=tags)


@task_success.connect
def handle_task_success(sender, signal, *args, **kwargs):
    tags = {"task_name": sender.name, "task_type": get_schedule_task_type(sender.name)}
    stats_logger.incr(TASK_SUCCESSES, tags=tags)


@task_received.connect
def handle_task_received(sender, signal, *args, **kwargs):
    tags = {"task_name": kwargs["request"].name}
    tags["task_type"] = get_schedule_task_type(tags["task_name"])
    stats_logger.incr(TASK_RECEIVED, tags=tags)


@beat_init.connect
def start_celery_stats_logging(sender=None, conf=None, **kwargs):
    LOG.info("Celery Beat started; launching stats logger monitor")
    start_stats_logger_monitor(celery)
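Beyond these built-in signal metrics, any task body can emit its own custom metrics through the same interface, which is what enables the custom Datadog metrics this PR advertises; a sketch with a hypothetical task and metric name:

from app.flask_app import celery
from lib.stats_logger import stats_logger


@celery.task(bind=True)
def my_custom_task(self):  # hypothetical task, for illustration only
    # Lands in Datadog as <prefix>.custom.my_event with the tag below
    stats_logger.incr("custom.my_event", tags={"source": "my_custom_task"})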
2 changes: 2 additions & 0 deletions requirements/datadog/datadog.txt
@@ -0,0 +1,2 @@
datadog==0.47.0
ddtrace==2.0.2