[TRIS-722] Custom Tagging #2144

Open
wants to merge 69 commits into
base: master


@RikishK RikishK commented Nov 13, 2024

Custom tags can be added through the following options, applied in this order:

environment variable dictionary,
step functions CLI create flag,
batch decorator variable dictionary

This PR builds on top of #1628. That original PR did not work in testing, but I would like to credit it here because some of its changes are still used.

@RikishK RikishK force-pushed the fsat/pr-1627--rebased--2.12.28 branch 6 times, most recently from 2810093 to 0d45ad5 Compare November 21, 2024 05:13
@RikishK RikishK force-pushed the fsat/pr-1627--rebased--2.12.28 branch from 0d45ad5 to 4c5cebf Compare November 22, 2024 02:38
Author

RikishK commented Nov 25, 2024

Testing

Setup:

Environment variable:

{ name: 'METAFLOW_BATCH_EMIT_TAGS', value: 'True' },
{ name: 'METAFLOW_BATCH_DEFAULT_TAGS', value: '{"default_greeting": "default_world"}' }

Step functions create command:

step-functions create --aws-tags "hello=world"

Flow:

from metaflow import (
    FlowSpec,
    step,
    batch,
    retry,
    schedule,
    project,
    conda,
    current
)
import logging
import logging_setup

logging_setup.configure()


custom_step_tags = {
    "goodbye": "world",
    "hello": "universe",
    # "inv@lid": "t@g"
}

@project(name="ml_apac_trisolaris")
@schedule(hourly=True)
class CanarySimpleAdhoc(FlowSpec):
    @step
    def start(self):
        self.logger = logging.getLogger(self.__class__.__name__)
        self.logger.info(f"Canary Hello")
        self.next(self.hello)

    @batch(cpu=1, memory=500, tags=custom_step_tags)
    @retry
    @step
    def hello(self):
        self.logger.info("Canary Hello World in Batch!")
        self.next(self.end)

    @step
    def end(self):
        self.logger.info("HelloAWS is finished.")


if __name__ == "__main__":
    CanarySimpleAdhoc()

Output/Result:

Start step batch tags:
image

Hello step batch tags:
image

Additional notes:
Tag validation was tested with "inv@lid": "t@g" which caused an error as expected.

Collaborator

@saikonen saikonen left a comment

Notes for testing: the config variable

METAFLOW_BATCH_EMIT_TAGS=True

needs to be set in order for tags to have any effect (even validation).

For a better UX, there could be a check somewhere in the path that prints a warning when AWS tags are being set but emit tags is set to false.

Validation could also run regardless of the emit_tags feature flag, in order to catch issues early.
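A minimal sketch of the warning check suggested above. The helper name `warn_if_tags_ignored` and its parameters are hypothetical and not part of Metaflow; where such a check would actually live in the code path is left open.

```python
import warnings


def warn_if_tags_ignored(aws_tags, emit_tags):
    """Warn when tags are configured but tag emission is switched off.

    Hypothetical helper: `aws_tags` is the dict of requested tags and
    `emit_tags` is the boolean value of the emit-tags config option.
    Returns True when a warning was issued, False otherwise.
    """
    if aws_tags and not emit_tags:
        warnings.warn(
            "AWS tags are set but METAFLOW_BATCH_EMIT_TAGS is not enabled; "
            "the tags will have no effect.",
            stacklevel=2,
        )
        return True
    return False


# Tags requested while emission is disabled: a warning is issued.
warn_if_tags_ignored({"hello": "world"}, emit_tags=False)
```

Returning a boolean keeps the helper easy to test without capturing the warning itself.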

Collaborator

We need to add logic similar to what is present in, for example, kubernetes_decorator.py#runtime_step_cli, which json.dumps some attributes into cli_args.command_options, namely the new aws_tags in this case.

Otherwise the passed-in value will not be handled correctly in the CLI.

Collaborator

For example, replacing the cli_args.command_options.update(self.attributes) call with the following should work:

# dictionaries need to be json dumped for the CLI
json_types = ["aws_tags"]
for k, v in self.attributes.items():
    if k in json_types:
        cli_args.command_options[k] = json.dumps(v)
    else:
        cli_args.command_options[k] = v
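To see what this does outside of Metaflow, here is a self-contained sketch. `FakeCLIArgs` is a hypothetical stand-in that only models a `command_options` dict; it is not the real runtime object.

```python
import json


class FakeCLIArgs:
    """Hypothetical stand-in for the object passed to runtime_step_cli."""

    def __init__(self):
        self.command_options = {}


def add_attributes(cli_args, attributes, json_types=("aws_tags",)):
    """Copy decorator attributes into the CLI options.

    Dictionary-valued attributes listed in `json_types` are serialized
    to a JSON string so they survive being passed on a command line.
    """
    for key, value in attributes.items():
        if key in json_types:
            cli_args.command_options[key] = json.dumps(value)
        else:
            cli_args.command_options[key] = value
    return cli_args


args = add_attributes(FakeCLIArgs(), {"cpu": 1, "aws_tags": {"hello": "world"}})
print(args.command_options)
```

The receiving CLI option then has to parse the JSON string back into a dict, which is why the option type matters on the batch_cli.py side.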

Collaborator

Also, on the batch_cli.py side, the --aws-tags option needs to be

@click.option("--aws-tags", multiple=False, type=JSONTypeClass(), default=None, help="AWS tags.")

in order to convert types correctly. Note the multiple=False as well: was there a reason to have the plural option act as multiple?

Author

We wanted it to be a multiple arg so --aws-tags can be used like: --aws-tags "hello=world" --aws-tags "something=else"

This makes it a bit easier to generate the command based on specs. But I'm open to changing it if that makes it easier to set up.
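For illustration, a sketch of how repeated "key=value" arguments could be folded into a single dict. The function name `parse_tag_pairs` is hypothetical. Unlike the PR code, which skips entries without an "=", this version raises on malformed input; that is a design choice for the sketch, not something the PR does.

```python
def parse_tag_pairs(pairs):
    """Turn ["hello=world", "something=else"] into a dict.

    Only the first "=" splits key from value, so values may
    themselves contain "=". Later duplicates override earlier ones.
    """
    tags = {}
    for pair in pairs:
        key, sep, value = pair.partition("=")
        if not sep or not key:
            raise ValueError("expected KEY=VALUE, got %r" % pair)
        tags[key] = value
    return tags


print(parse_tag_pairs(["hello=world", "something=else"]))
```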

Comment on lines 332 to 340

if not isinstance(BATCH_DEFAULT_TAGS, dict):
    raise BatchException(
        "The BATCH_DEFAULT_TAGS config option must be a dictionary of key-value tags."
    )

for name, value in BATCH_DEFAULT_TAGS.items():
    aws_tag = {'key': name, 'value': value}
    validate_aws_tag(aws_tag)
    job.tag(name, value)

if step_function_tags is not None:
    aws_tags_list = []
    for tag in step_function_tags:
        key_value = tag.split("=", 1)
        if len(key_value) == 2:
            aws_tags_list.append({
                'key': key_value[0],
                'value': key_value[1]
            })
    for tag in aws_tags_list:
        validate_aws_tag(tag)
        job.tag(tag['key'], tag['value'])

if aws_tags is not None:
    if not isinstance(aws_tags, dict):
        raise BatchException("tags must be a dictionary of key-value tags.")
    decorator_aws_tags_list = [
        {'key': key,
         'value': val} for key, val in aws_tags.items()
    ]
    for tag in decorator_aws_tags_list:
        validate_aws_tag(tag)
        job.tag(tag['key'], tag['value'])

Collaborator

I didn't test this part thoroughly, but there is one significant caveat with running validations in the batch setup phase: when validation fails, the metadata for the run/step/task has already been registered, leading to a failed task with no logs.

An alternative approach with a better UX would be to perform batch tag validation inside the batch_decorator.py#step_init, which will happen before job submission. This allows for erroring out fairly quickly due to malformed tags.

Is there any issue with moving the validation and the BATCH_DEFAULT_TAGS usage into the decorator completely?

Collaborator

saikonen commented Feb 4, 2025

Also, can you verify after the changes that the following flows run as expected, both with batch and with step-functions?

expected to fail

from metaflow import step, FlowSpec, batch

class BatchTags(FlowSpec):
    @batch(aws_tags={"inv@lid": "t@g"})
    @step
    def start(self):
        print("Starting 👋")
        self.next(self.end)

    @step
    def end(self):
        print("Done! 🏁")


if __name__ == "__main__":
    BatchTags()

expected to run

from metaflow import (
    FlowSpec,
    step,
    batch,
)

custom_step_tags = {
    "goodbye": "world",
    "hello": "universe",
    }

class BatchTags2(FlowSpec):
    @step
    def start(self):
        self.next(self.hello)

    @batch(aws_tags=custom_step_tags)
    @step
    def hello(self):
        self.next(self.end)

    @step
    def end(self):
        pass

if __name__ == "__main__":
    BatchTags2()

Comment on lines 187 to 220
if self.attributes["aws_tags"] is not None:
    if not isinstance(self.attributes["aws_tags"], dict) and not all(isinstance(k, str) and isinstance(v, str) for k, v in self.attributes["aws_tags"].items()):
        raise BatchException("aws_tags must be Dict[str, str]")
    decorator_aws_tags_list = [
        {'key': key,
         'value': val} for key, val in self.attributes["aws_tags"].items()
    ]
    for tag in decorator_aws_tags_list:
        validate_aws_tag(tag)

Collaborator

Can you move these under step_init instead, to get cleaner formatting for exceptions? Unless there is some restriction that requires them to be in __init__.
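One detail worth checking when moving the validation: the type check in the commented lines joins its two conditions with `and`. For a real dict the first condition is False, so the key/value type test is never evaluated, and for a non-dict the `.items()` call raises an AttributeError instead of the intended exception. A sketch of a stricter check, written as a standalone helper with a hypothetical name (`ensure_str_dict`) and a plain `TypeError` standing in for `BatchException`:

```python
def ensure_str_dict(value, name="aws_tags"):
    """Raise unless `value` is a dict whose keys and values are all str.

    `or` short-circuits on non-dicts, so `.items()` is only called
    when `value` really is a dict.
    """
    if not isinstance(value, dict) or not all(
        isinstance(k, str) and isinstance(v, str) for k, v in value.items()
    ):
        raise TypeError("%s must be Dict[str, str]" % name)
    return value


ensure_str_dict({"goodbye": "world", "hello": "universe"})
```

Called from step_init, a failure here would surface before any job is submitted.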

from ..aws_utils import (
    compute_resource_attributes,
    get_docker_registry,
    get_ec2_instance_metadata,
)
from .batch import BatchException
from metaflow.tagging_util import validate_tags, validate_aws_tag

Collaborator

validate_tags is unused.

Comment on lines 338 to 341
for name, value in BATCH_DEFAULT_TAGS.items():
    aws_tag = {'key': name, 'value': value}
    validate_aws_tag(aws_tag)
    job.tag(name, value)

Collaborator

merging the default values from BATCH_DEFAULT_TAGS could be moved into the batch_decorator. This way we have a single source to read tags from, and validation can be performed there as well.

Comment on lines 191 to 192
aws_tags=None,
step_function_aws_tags=None,

Collaborator

Do we need the step_function_aws_tags? As a comparison, when custom labels/annotations were implemented for Argo Workflows and Kubernetes, we made a choice to treat the kubernetes decorator as the single source of truth.

right now it seems that step-functions create --aws-tags "test=tag" will only apply the tags to the batch jobs spun up by the state machine. There already exists a mechanism for applying tags to those without the new CLI option though, so keeping things simple might be appropriate at this point.

# tagging all steps
python test.py --with batch:aws_tags='{"test":"tag"}' step-functions create

# or set the environment variable
METAFLOW_BATCH_DEFAULT_TAGS='{"envtag":"testvalue"}' python test.py step-functions create

# tagging a specific step, simply add a batch deco to the step before deploying
@batch(aws_tags={"tracking":"test_step"})

Collaborator

removing the step-function level --aws-tags option would reduce possible confusion about the scope of the option. One might think that it would apply tags to the state machine as well, which at the moment is not the case.

My suggestion is that we should keep this PR scoped to tagging batch jobs and revisit the need to tag other AWS resources as required, unless you already have a need for that?
Candidates for tagging would include the state machine, and eventbridge rules, but deciding how to configure these on the Metaflow side is nontrivial

@RikishK RikishK force-pushed the fsat/pr-1627--rebased--2.12.28 branch 2 times, most recently from bdf3930 to 2040a7c Compare February 16, 2025 23:25
Collaborator

@saikonen saikonen left a comment

Opened a stacked PR in your fork to address some of the low hanging fruit. zendesk#8

@@ -71,6 +73,9 @@ class BatchDecorator(StepDecorator):
A swappiness value of 0 causes swapping not to happen unless absolutely
necessary. A swappiness value of 100 causes pages to be swapped very
aggressively. Accepted values are whole numbers between 0 and 100.
aws_batch_tags: Dict[str, str], optional

Collaborator

calling this aws_batch_tags seems a bit excessive given that we're inside the Batch decorator. Could it simply be aws_tags as before, or was there some issue with that?

self.attributes["aws_batch_tags"] = BATCH_DEFAULT_TAGS

if self.attributes["aws_batch_tags"] is not None:
    if not isinstance(self.attributes["aws_tags"], dict) and not all(isinstance(k, str) and isinstance(v, str) for k, v in self.attributes["aws_batch_tags"].items()):

Collaborator

This should be "aws_batch_tags" in the check. Right now it's throwing an error.

raise BatchException("aws_batch_tags must be Dict[str, str]")

batch_default_tags_copy = BATCH_DEFAULT_TAGS.copy()
self.attributes["aws_batch_tags"] = batch_default_tags_copy.update(self.attributes["aws_batch_tags"])

Collaborator

This leads to aws_batch_tags being None in the default case, because dict.update() mutates the copy in place and returns None, so the following dict comprehension fails. Can it be simplified into

self.attributes["aws_batch_tags"] = {
    **BATCH_DEFAULT_TAGS,
    **self.attributes["aws_batch_tags"]
}

pending ordering. Should env take precedence over decorator values, or the other way around?
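A small standalone demonstration of the difference, using made-up tag values. In this sketch the decorator values win over the environment defaults; the ordering question above is still open, and swapping the two unpacked dicts reverses the precedence.

```python
BATCH_DEFAULT_TAGS = {"default_greeting": "default_world", "hello": "env"}
decorator_tags = {"hello": "universe"}

# dict.update() mutates in place and returns None, so assigning its
# result loses the merged dict.
defaults_copy = BATCH_DEFAULT_TAGS.copy()
result = defaults_copy.update(decorator_tags)
print(result)  # None

# Unpacking builds a new dict; later entries override earlier ones.
# `or {}` guards against the decorator attribute being None.
merged = {**BATCH_DEFAULT_TAGS, **(decorator_tags or {})}
print(merged)  # {'default_greeting': 'default_world', 'hello': 'universe'}
```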

batch_default_tags_copy = BATCH_DEFAULT_TAGS.copy()
self.attributes["aws_batch_tags"] = batch_default_tags_copy.update(self.attributes["aws_batch_tags"])

decorator_aws_tags_list = [

Collaborator

why is this conversion needed? would a dict not suffice?

@@ -179,6 +185,26 @@ def __init__(self, attributes=None, statically_defined=False):
if self.attributes["trainium"] is not None:
self.attributes["inferentia"] = self.attributes["trainium"]

if not isinstance(BATCH_DEFAULT_TAGS, dict) and not all(instance(k, str) and isinstance(v, str) for k, v in BATCH_DEFAULT_TAGS.items()):

Collaborator

isinstance, not instance

@@ -29,6 +30,12 @@ def is_utf8_decodable(x):
# How long may an individual tag value be
MAX_TAG_SIZE = 500

def validate_aws_tag(tag):
    validate_tags([tag['key'], tag['value']])

Collaborator

validate_tags is for Metaflow-specific metadata tag validation. Some of its rules, like the max tag set size, probably do not apply to AWS tags. Does it make sense to reuse the Metaflow tag validation here, or can we do without?

It makes more sense to me that the AWS tag validation be kept completely separate
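A sketch of what a separate AWS-side validator might look like. The limits and character set here are assumptions based on AWS's general tagging guidance (keys up to 128 characters, values up to 256, at most 50 tags per resource, letters, digits, spaces and `_ . : / = + - @`, no `aws:` prefix); individual services can be stricter, and Python's `\w` only approximates "letters and digits". The function name `validate_aws_tags` and the use of `ValueError` are hypothetical.

```python
import re

# Assumed limits, taken from AWS's general tagging restrictions.
MAX_TAGS = 50
MAX_KEY_LEN = 128
MAX_VALUE_LEN = 256
ALLOWED = re.compile(r"[\w +\-=.:/@]*")


def validate_aws_tags(tags):
    """Raise ValueError if `tags` (Dict[str, str]) breaks the assumed rules."""
    if len(tags) > MAX_TAGS:
        raise ValueError("at most %d tags are allowed" % MAX_TAGS)
    for key, value in tags.items():
        if not 1 <= len(key) <= MAX_KEY_LEN:
            raise ValueError("tag key length must be 1-%d: %r" % (MAX_KEY_LEN, key))
        if len(value) > MAX_VALUE_LEN:
            raise ValueError("tag value too long for key %r" % key)
        if key.lower().startswith("aws:"):
            raise ValueError("the 'aws:' prefix is reserved: %r" % key)
        if not ALLOWED.fullmatch(key) or not ALLOWED.fullmatch(value):
            raise ValueError("invalid characters in tag %r=%r" % (key, value))


validate_aws_tags({"goodbye": "world", "hello": "universe"})
```

Under these assumed rules `@` is permitted, so the `"inv@lid": "t@g"` tag used in the tests would pass here; the Metaflow pattern `^[\w\s._-]+$` shown in the deploy output is stricter.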

Collaborator

was this the reason that the tags were turned into a more complex type?

@RikishK RikishK force-pushed the fsat/pr-1627--rebased--2.12.28 branch 6 times, most recently from ddcfefb to 49793ff Compare February 24, 2025 23:25
@RikishK RikishK force-pushed the fsat/pr-1627--rebased--2.12.28 branch 5 times, most recently from f3da850 to b42e360 Compare February 25, 2025 03:22
@RikishK RikishK force-pushed the fsat/pr-1627--rebased--2.12.28 branch from b42e360 to 1442ecf Compare February 25, 2025 03:59
@RikishK RikishK force-pushed the fsat/pr-1627--rebased--2.12.28 branch from a6faf9a to e791a6d Compare March 10, 2025 05:33
@RikishK RikishK force-pushed the fsat/pr-1627--rebased--2.12.28 branch from e791a6d to 7b89b81 Compare March 10, 2025 05:48
@RikishK RikishK force-pushed the fsat/pr-1627--rebased--2.12.28 branch from 61ac55b to 25eae34 Compare March 11, 2025 05:40
@RikishK RikishK force-pushed the fsat/pr-1627--rebased--2.12.28 branch from 25eae34 to 475e282 Compare March 11, 2025 23:25
Author

RikishK commented Mar 12, 2025

Testing

Flow:

Here we set the decorator level tags. NOTE: the invalid tag is commented out for this test

from metaflow import (
    FlowSpec,
    step,
    batch,
    retry,
    schedule,
    project,
    conda,
    current
)
import logging
from common import (
    logging_setup
)

logging_setup.configure()


custom_step_tags = {
    "goodbye": "world",
    "hello": "universe",
    "batch_decorator_tag": "True",
    #"inv@lid": "t@g"
}

custom_step_tags_v2 = {
    "something": "else",
    "batch_decorator_tag": "True"
}

@project(name="ml_apac_trisolaris")
@schedule(hourly=True)
class CanarySimpleAdhoc(FlowSpec):
    @step
    def start(self):
        self.logger = logging.getLogger(self.__class__.__name__)
        self.logger.info(f"Canary Hello")
        self.next(self.hello)

    @batch(cpu=1, memory=500, aws_batch_tags=custom_step_tags)
    @retry
    @step
    def hello(self):
        self.logger.info("Canary Hello World in Batch!")
        self.next(self.goodbye)

    @batch(cpu=1, memory=500, aws_batch_tags=custom_step_tags_v2)
    @retry
    @step
    def goodbye(self):
        self.logger.info("Canary Goodbye World in Batch!")
        self.next(self.end)


    @step
    def end(self):
        self.logger.info("HelloAWS is finished.")


if __name__ == "__main__":
    CanarySimpleAdhoc()

Step functions creation command

Here we set the step functions cli level tags

poetry run python ${pipeline_full_path} --with retry --datastore=s3 step-functions create --aws-batch-tags "hello=world" --aws-batch-tags "cli-tag=true"

Env vars tags

Here we set the environment variables level tags

{ name: 'METAFLOW_BATCH_DEFAULT_TAGS', value: '{"default_greeting": "default_world", "env_var_aws_tag": "True"}' }

Expectations

We should see env vars tags and step functions cli tags being set on batch jobs. Batch jobs with decorator tags will apply their decorator tags last and override env vars and step functions cli tags.
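The expected precedence can be sketched with the values from this test. `resolve_tags` is a hypothetical helper, not the PR's implementation; it simply applies each layer in order so that later layers override earlier ones.

```python
env_tags = {"default_greeting": "default_world", "env_var_aws_tag": "True"}
cli_tags = {"hello": "world", "cli-tag": "true"}
hello_decorator_tags = {
    "goodbye": "world",
    "hello": "universe",
    "batch_decorator_tag": "True",
}


def resolve_tags(*layers):
    """Merge tag layers left to right; later layers win. None is skipped."""
    merged = {}
    for layer in layers:
        merged.update(layer or {})
    return merged


# A step without decorator tags only gets env and CLI tags.
start_step = resolve_tags(env_tags, cli_tags, None)
# The hello step's decorator overrides the CLI value for "hello".
hello_step = resolve_tags(env_tags, cli_tags, hello_decorator_tags)

print(start_step["hello"])  # world
print(hello_step["hello"])  # universe
```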

Output Batch Jobs

Start Step:

image

Hello Step

image

Goodbye step

image

End step

image

Author

RikishK commented Mar 12, 2025

Testing Invalid Tags

Testing with the same flow as previous test but with the invalid tag enabled.

Flow

from metaflow import (
    FlowSpec,
    step,
    batch,
    retry,
    schedule,
    project,
)
import logging
from common import (
    logging_setup
)

logging_setup.configure()


custom_step_tags = {
    "goodbye": "world",
    "hello": "universe",
    "batch_decorator_tag": "True",
    "inv@lid": "t@g"
}

custom_step_tags_v2 = {
    "something": "else",
    "batch_decorator_tag": "True"
}

@project(name="ml_apac_trisolaris")
@schedule(hourly=True)
class CanarySimpleAdhoc(FlowSpec):
    @step
    def start(self):
        self.logger = logging.getLogger(self.__class__.__name__)
        self.logger.info(f"Canary Hello")
        self.next(self.hello)

    @batch(cpu=1, memory=500, aws_batch_tags=custom_step_tags)
    @retry
    @step
    def hello(self):
        self.logger.info("Canary Hello World in Batch!")
        self.next(self.goodbye)

    @batch(cpu=1, memory=500, aws_batch_tags=custom_step_tags_v2)
    @retry
    @step
    def goodbye(self):
        self.logger.info("Canary Goodbye World in Batch!")
        self.next(self.end)


    @step
    def end(self):
        self.logger.info("HelloAWS is finished.")


if __name__ == "__main__":
    CanarySimpleAdhoc()

Output of trying to deploy stepfunction:

"app":"WaitForEnvoy","commit":"","level":"info","msg":"WaitForEnvoy starting up","pid":1,"time":"2025-03-12T03:56:12Z","version":"v1.1.1"}
[pipeline-deploy] Start
[pipeline-deploy] Run Environment [staging]
[pipeline-deploy] AWS_REGION [us-east-1]
[pipeline-deploy] AWS_DEFAULT_REGION [us-east-1]
[pipeline-deploy] Setting USERNAME environment variable to [staging]
[pipeline-deploy] Deploying [canary/canary_simple_adhoc.py] - full path [/app/canary/canary_simple_adhoc.py]
+ poetry run python /app/canary/canary_simple_adhoc.py --with retry --datastore=s3 step-functions create --aws-batch-tags hello=world --aws-batch-tags cli-tag=true
Skipping virtualenv creation, as specified in config file.
Metaflow 2.12.30 executing CanarySimpleAdhoc for user:staging
Project: ml_apac_trisolaris, Branch: user.staging
    Tagging error:
    Tags must match pattern: ^[\w\s._-]+$

BATCH_DEFAULT_TAGS: {'default_greeting': 'default_world', 'env_var_aws_tag': 'True'}
aws_batch_tags: {'goodbye': 'world', 'hello': 'universe', 'batch_decorator_tag': 'True', 'inv@lid': 't@g'}
Generating aws compatible list. Old aws_batch_tags: {'default_greeting': 'default_world', 'env_var_aws_tag': 'True', 'goodbye': 'world', 'hello': 'universe', 'batch_decorator_tag': 'True', 'inv@lid': 't@g'}, new generated list: [{'key': 'default_greeting', 'value': 'default_world'}, {'key': 'env_var_aws_tag', 'value': 'True'}, {'key': 'goodbye', 'value': 'world'}, {'key': 'hello', 'value': 'universe'}, {'key': 'batch_decorator_tag', 'value': 'True'}, {'key': 'inv@lid', 'value': 't@g'}]
BATCH_DEFAULT_TAGS: {'default_greeting': 'default_world', 'env_var_aws_tag': 'True'}
aws_batch_tags: {'something': 'else', 'batch_decorator_tag': 'True'}
Generating aws compatible list. Old aws_batch_tags: {'default_greeting': 'default_world', 'env_var_aws_tag': 'True', 'something': 'else', 'batch_decorator_tag': 'True'}, new generated list: [{'key': 'default_greeting', 'value': 'default_world'}, {'key': 'env_var_aws_tag', 'value': 'True'}, {'key': 'something', 'value': 'else'}, {'key': 'batch_decorator_tag', 'value': 'True'}]

@savingoyal savingoyal self-requested a review March 27, 2025 16:06
@joecorcoran

Love this PR – we could really benefit from it. Is there anything we can do to help get it merged? ❤️

@@ -97,6 +98,12 @@ def step_functions(obj, name=None):
"with the given tag. You can specify this option multiple "
"times to attach multiple tags.",
)
@click.option(
"--aws-batch-tags",

Collaborator

If we are to keep this, the flag itself should be singular in my opinion as it is used to set one tag at a time

--aws-batch-tag some=tag --aws-batch-tag another=tag

4 participants