Skip to content

Commit

Permalink
Merge pull request #11 from airflow-laminar/tkp/config
Browse files Browse the repository at this point in the history
integrate with airflow-config
  • Loading branch information
timkpaine authored Aug 8, 2024
2 parents b4fd71c + deb598c commit 41c9219
Show file tree
Hide file tree
Showing 14 changed files with 107 additions and 19 deletions.
17 changes: 17 additions & 0 deletions airflow_priority/common.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import os
from pathlib import Path
from types import MappingProxyType
from typing import Literal, Optional, Tuple

Expand Down Expand Up @@ -29,6 +31,21 @@ class AirflowPriorityConfigurationOptionNotFound(RuntimeError): ...


def get_config_option(section, key, required=True, default=None):
try:
try:
from airflow_config import ConfigNotFoundError, Configuration

config = Configuration.load("config", "config", basepath=str(Path(os.environ.get("AIRFLOW_HOME", "")) / "dags"), _offset=4)
ret = getattr(getattr(config.extensions.get("priority", None), section, None), key, None)
if ret is not None:
return ret
except ConfigNotFoundError:
# SKIP
pass
except ImportError:
# SKIP
pass

import airflow.configuration

config_option = airflow.configuration.conf.get(f"priority.{section}", key, default)
Expand Down
6 changes: 3 additions & 3 deletions airflow_priority/plugins/datadog.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from datadog_api_client.v2.model.metric_resource import MetricResource
from datadog_api_client.v2.model.metric_series import MetricSeries

from airflow_priority import DagStatus, get_config_option, has_priority_tag
from airflow_priority import AirflowPriorityConfigurationOptionNotFound, DagStatus, get_config_option, has_priority_tag

__all__ = (
"send_metric_datadog",
Expand Down Expand Up @@ -87,10 +87,10 @@ def on_dag_run_failed(dag_run: DagRun, msg: str):

try:
# Call once to ensure plugin will work
get_configuration()
get_config_option("datadog", "api_key")

class DatadogPriorityPlugin(AirflowPlugin):
name = "DatadogPriorityPlugin"
listeners = [sys.modules[__name__]]
except Exception:
except AirflowPriorityConfigurationOptionNotFound:
_log.exception("Plugin could not be enabled")
7 changes: 4 additions & 3 deletions airflow_priority/plugins/discord.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from airflow.plugins_manager import AirflowPlugin
from discord import Client, Intents

from airflow_priority import DagStatus, get_config_option, has_priority_tag
from airflow_priority import AirflowPriorityConfigurationOptionNotFound, DagStatus, get_config_option, has_priority_tag

__all__ = ("get_client", "send_metric_discord", "on_dag_run_failed", "DiscordPriorityPlugin")

Expand Down Expand Up @@ -67,10 +67,11 @@ def on_dag_run_failed(dag_run: DagRun, msg: str):

try:
# Call once to ensure plugin will work
get_client()
get_config_option("discord", "channel")
get_config_option("discord", "token")

class DiscordPriorityPlugin(AirflowPlugin):
name = "DiscordPriorityPlugin"
listeners = [sys.modules[__name__]]
except Exception:
except AirflowPriorityConfigurationOptionNotFound:
_log.exception("Plugin could not be enabled")
7 changes: 3 additions & 4 deletions airflow_priority/plugins/newrelic.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from airflow.plugins_manager import AirflowPlugin
from newrelic_telemetry_sdk import GaugeMetric, MetricClient

from airflow_priority import DagStatus, get_config_option, has_priority_tag
from airflow_priority import AirflowPriorityConfigurationOptionNotFound, DagStatus, get_config_option, has_priority_tag

__all__ = (
"send_metric_newrelic",
Expand Down Expand Up @@ -55,11 +55,10 @@ def on_dag_run_failed(dag_run: DagRun, msg: str):

try:
# Call once to ensure plugin will work
get_client()
get_config_option("newrelic", "api_key")

class NewRelicPriorityPlugin(AirflowPlugin):
name = "NewRelicPriorityPlugin"
listeners = [sys.modules[__name__]]

except Exception:
except AirflowPriorityConfigurationOptionNotFound:
_log.exception("Plugin could not be enabled")
7 changes: 4 additions & 3 deletions airflow_priority/plugins/slack.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from airflow.plugins_manager import AirflowPlugin
from slack_sdk import WebClient

from airflow_priority import DagStatus, get_config_option, has_priority_tag
from airflow_priority import AirflowPriorityConfigurationOptionNotFound, DagStatus, get_config_option, has_priority_tag

__all__ = ("get_client", "get_channel_id", "send_metric_slack", "on_dag_run_failed", "SlackPriorityPlugin")

Expand Down Expand Up @@ -58,10 +58,11 @@ def on_dag_run_failed(dag_run: DagRun, msg: str):

try:
# Call once to ensure plugin will work
get_client()
get_config_option("slack", "token")
get_config_option("slack", "channel")

class SlackPriorityPlugin(AirflowPlugin):
name = "SlackPriorityPlugin"
listeners = [sys.modules[__name__]]
except Exception:
except AirflowPriorityConfigurationOptionNotFound:
_log.exception("Plugin could not be enabled")
6 changes: 3 additions & 3 deletions airflow_priority/plugins/symphony.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import ssl
import sys
from functools import lru_cache
from httpx import post
from logging import getLogger

from airflow.listeners import hookimpl
from airflow.models.dagrun import DagRun
from airflow.plugins_manager import AirflowPlugin
from httpx import post

from airflow_priority import DagStatus, get_config_option, has_priority_tag
from airflow_priority import AirflowPriorityConfigurationOptionNotFound, DagStatus, get_config_option, has_priority_tag

__all__ = ("get_config_options", "get_headers", "get_room_id", "send_metric_symphony", "on_dag_run_failed", "SymphonyPriorityPlugin")

Expand Down Expand Up @@ -103,5 +103,5 @@ def on_dag_run_failed(dag_run: DagRun, msg: str):
class SymphonyPriorityPlugin(AirflowPlugin):
name = "SymphonyPriorityPlugin"
listeners = [sys.modules[__name__]]
except Exception:
except AirflowPriorityConfigurationOptionNotFound:
_log.exception("Plugin could not be enabled")
2 changes: 1 addition & 1 deletion airflow_priority/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def airflow_config():
airflow.configuration.load_standard_airflow_configuration(airflow_config_parser)
airflow_config_parser.validate()
airflow.configuration.conf = airflow_config_parser
yield
yield str(Path(td))


@pytest.fixture(scope="function", autouse=True)
Expand Down
47 changes: 47 additions & 0 deletions airflow_priority/tests/test_airflow_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
from pathlib import Path

conf_text = """# @package _global_
_target_: airflow_config.Configuration
default_args:
_target_: airflow_config.DefaultArgs
owner: test
extensions:
priority:
_target_: airflow_config.PriorityConfiguration
slack:
_target_: airflow_config.SlackConfiguration
token: abc
channel: def
"""


def test_config_loading_via_airflow_config(airflow_config, dag_run):
dags_path = Path(airflow_config) / "dags"
conf_path = dags_path / "config"
conf_path.mkdir(parents=True, exist_ok=True)
conf_file = conf_path / "config.yaml"
conf_file.write_text(conf_text)

from airflow_config import Configuration

config = Configuration.load("config", "config", basepath=str(dags_path), _offset=4)
ret = getattr(getattr(config.extensions.get("priority", None), "slack", None), "token", None)
assert ret == "abc"


def test_call_plugin_via_airflow_config(airflow_config, dag_run):
dags_path = Path(airflow_config) / "dags"
conf_path = dags_path / "config"
conf_path.mkdir(parents=True, exist_ok=True)
conf_file = conf_path / "config.yaml"
conf_file.write_text(conf_text)
from airflow_priority.plugins.slack import get_client

client = get_client()
assert client is not None
assert client.token == "abc"
# with patch("airflow_priority.plugins.slack.send_metric_slack") as p2, \
# patch("airflow_priority.plugins.slack.send_metric_slack") as p1:
# on_dag_run_failed(dag_run, "test")

# assert p1.call_args == []
4 changes: 4 additions & 0 deletions airflow_priority/tests/test_datadog.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import os
from unittest.mock import patch

import pytest


@pytest.mark.skipif(os.environ.get("DATADOG_API_KEY") is None, reason="Datadog key not set")
def test_datadog_send(airflow_config, dag_run):
from airflow_priority.plugins.datadog import send_metric_datadog

Expand Down
4 changes: 4 additions & 0 deletions airflow_priority/tests/test_discord.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import os
from unittest.mock import patch

import pytest


@pytest.mark.skipif(os.environ.get("DISCORD_TOKEN") is None, reason="Discord token not set")
def test_discord_send(airflow_config, dag_run):
from airflow_priority.plugins.discord import send_metric_discord

Expand Down
4 changes: 4 additions & 0 deletions airflow_priority/tests/test_newrelic.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import os
from unittest.mock import patch

import pytest


@pytest.mark.skipif(os.environ.get("NEWRELIC_API_KEY") is None, reason="New relic token not set")
def test_newrelic_send(airflow_config, dag_run):
from airflow_priority.plugins.newrelic import send_metric_newrelic

Expand Down
7 changes: 6 additions & 1 deletion airflow_priority/tests/test_slack.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
import os
from unittest.mock import patch

import pytest


@pytest.mark.skipif(os.environ.get("SLACK_TOKEN") is None, reason="Slack token token not set")
def test_slack_send(airflow_config, dag_run):
from airflow_priority.plugins.slack import send_metric_slack
from airflow_priority.plugins.slack import get_client, send_metric_slack

get_client.cache_clear()
send_metric_slack("UNIT TEST", 1, "BEEN TESTED")


Expand Down
3 changes: 2 additions & 1 deletion airflow_priority/tests/test_symphony.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import os
import pytest
from unittest.mock import patch

import pytest


@pytest.mark.skipif(os.environ.get("SYMPHONY_ROOM_NAME", "") == "", reason="no symphony credentials")
def test_symphony_send(airflow_config, dag_run):
Expand Down
5 changes: 5 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,16 @@ develop = [
"httpx",
"newrelic-telemetry-sdk",
"slack-sdk",
# Config
"airflow-config>=0.1.2",
]
aws = [
"boto3",
"botocore",
]
config = [
"airflow-config>=0.1.2",
]
datadog = [
"datadog-api-client",
]
Expand Down

0 comments on commit 41c9219

Please # to comment.