Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Operator runner improvements #128

Merged
merged 7 commits into from
Jun 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/CSET/operators/RECIPES/extract_instant_air_temp.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ steps:
- operator: filters.filter_cubes
constraint:
operator: constraints.combine_constraints
constraint:
stash_constraint:
operator: constraints.generate_stash_constraint
stash: m01s03i236
cell_methods_constraint:
Expand Down
16 changes: 11 additions & 5 deletions src/CSET/operators/_internal.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ def execute_recipe(
"""

def step_parser(step: dict, step_input: any, output_file_path: Path) -> str:
"""Executes a recipe step, recursively executing any sub-steps."""
logging.debug(f"Executing step: {step}")
kwargs = {}
for key in step.keys():
Expand All @@ -105,10 +106,8 @@ def step_parser(step: dict, step_input: any, output_file_path: Path) -> str:
kwargs[key] = output_file_path
else:
kwargs[key] = step[key]

logging.debug(f"args: {kwargs}")
logging.debug(f"step_input: {step_input}")

# If first argument of operator is explicitly defined, use that rather
# than step_input. This is known through introspection of the operator.
first_arg = next(iter(inspect.signature(operator).parameters.keys()))
Expand All @@ -126,18 +125,25 @@ def step_parser(step: dict, step_input: any, output_file_path: Path) -> str:
except ruamel.yaml.error.YAMLStreamError:
raise TypeError("Must provide a stream (with a read method)")

# Checking that the recipe actually has some steps, and providing helpful
# error messages otherwise.
logging.debug(recipe)
step_input = input_file
try:
if len(recipe["steps"]) < 1:
raise ValueError("Recipe must have at least 1 step.")
for step in recipe["steps"]:
step_input = step_parser(step, step_input, output_file)
except KeyError as err:
raise ValueError("Invalid Recipe:", err)
except TypeError as err:
if recipe is None:
raise ValueError("Recipe must have at least 1 step.")
if type(recipe) != dict:
raise ValueError("Recipe must either be YAML, or a Path.")
# This should never be reached.
raise err # pragma: no cover

# Execute the recipe.
step_input = input_file
for step in recipe["steps"]:
step_input = step_parser(step, step_input, output_file)

logging.info(f"Recipe output: {step_input}")
17 changes: 11 additions & 6 deletions src/CSET/operators/constraints.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from datetime import datetime


def generate_stash_constraint(stash: str) -> iris.AttributeConstraint:
def generate_stash_constraint(stash: str, **kwargs) -> iris.AttributeConstraint:
"""
Operator that takes a stash string, and uses iris to generate a constraint
to be passed into the read operator to minimize the CubeList the read
Expand All @@ -43,7 +43,7 @@ def generate_stash_constraint(stash: str) -> iris.AttributeConstraint:
return stash_constraint


def generate_var_constraint(varname: str) -> iris.Constraint:
def generate_var_constraint(varname: str, **kwargs) -> iris.Constraint:
"""
Operator that takes a CF compliant variable name string, and uses iris to
generate a constraint to be passed into the read operator to minimize the
Expand All @@ -63,7 +63,7 @@ def generate_var_constraint(varname: str) -> iris.Constraint:
return varname_constraint


def generate_cell_methods_constraint(cell_methods: list) -> iris.Constraint:
def generate_cell_methods_constraint(cell_methods: list, **kwargs) -> iris.Constraint:
"""
Operator that takes a list of cell methods and generates a constraint from
that.
Expand All @@ -89,7 +89,7 @@ def check_cell_methods(cube: iris.cube.Cube):


def generate_time_constraint(
time_start: str, time_end: str = None
time_start: str, time_end: str = None, **kwargs
) -> iris.AttributeConstraint:
"""
Operator that takes one or two ISO 8601 date strings, and returns a
Expand Down Expand Up @@ -119,7 +119,7 @@ def generate_time_constraint(


def combine_constraints(
constraint: iris.Constraint = iris.Constraint(), **kwargs
constraint: iris.Constraint = None, **kwargs
) -> iris.Constraint:
"""
Operator that combines multiple constraints into one.
Expand All @@ -144,8 +144,13 @@ def combine_constraints(
TypeError
If the provided arguments are not constraints.
"""
# If the first argument is not a constraint, it is ignored. This handles the
# automatic passing of the previous step's output.
if isinstance(constraint, iris.Constraint):
combined_constraint = constraint
else:
combined_constraint = iris.Constraint()

combined_constraint = constraint
for constr in kwargs.values():
combined_constraint = combined_constraint & constr
return combined_constraint
2 changes: 1 addition & 1 deletion src/CSET/operators/filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@


def filter_cubes(
cubelist: iris.cube.CubeList, constraint: iris.Constraint
cubelist: iris.cube.CubeList, constraint: iris.Constraint, **kwargs
) -> iris.cube.Cube:
"""
Filters a cubelist down to a single cube based on a constraint.
Expand Down
2 changes: 1 addition & 1 deletion src/CSET/operators/plot.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import matplotlib.pyplot as plt


def spatial_contour_plot(cube: iris.cube.Cube, file_path: Path) -> Path:
def spatial_contour_plot(cube: iris.cube.Cube, file_path: Path, **kwargs) -> Path:
"""
Plots a spatial variable onto a map.

Expand Down
2 changes: 1 addition & 1 deletion src/CSET/operators/read.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@


def read_cubes(
loadpath: Path, constraint: iris.Constraint = None
loadpath: Path, constraint: iris.Constraint = None, **kwargs
) -> iris.cube.CubeList:
"""
Read operator that takes a path string (can include wildcards), and uses
Expand Down
2 changes: 1 addition & 1 deletion src/CSET/operators/write.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@


def write_cube_to_nc(
cube: Union[iris.cube.Cube, iris.cube.CubeList], file_path: Path
cube: Union[iris.cube.Cube, iris.cube.CubeList], file_path: Path, **kwargs
) -> str:
"""
A write operator that sits after the read operator. This operator expects
Expand Down
2 changes: 2 additions & 0 deletions tests/test_data/noop_recipe.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,5 @@ steps:
- operator: misc.noop
test_argument: Banana
dict_argument: {"key": "value"}
substep:
operator: constraints.combine_constraints
7 changes: 5 additions & 2 deletions tests/test_data/plot_instant_air_temp.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ steps:
- operator: filters.filter_cubes
constraint:
operator: constraints.combine_constraints
constraint:
stash_constraint:
operator: constraints.generate_stash_constraint
stash: m01s03i236
cell_methods_constraint:
Expand All @@ -23,6 +23,9 @@ steps:
time_start: 2022-09-21T03:00:00
time_end: 2022-09-21T03:30:00

- operator: plot.spatial_contour_plot
- operator: write.write_cube_to_nc
# This is a magic value that becomes the runtime output file path.
file_path: CSET_OUTPUT_PATH
plot_data_as_well:
operator: plot.spatial_contour_plot
file_path: CSET_OUTPUT_PATH
8 changes: 8 additions & 0 deletions tests/test_recipe_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,14 @@ def test_execute_recipe():
exception_happened = True
assert exception_happened

# Test exception for recipe that parses to a non-dict.
exception_happened = False
try:
internal.execute_recipe("[]", os.devnull, os.devnull)
except ValueError:
exception_happened = True
assert exception_happened

# Test happy case (this is really an integration test).
output_file = Path(f"{tempfile.gettempdir()}/{uuid4()}.nc")
recipe_file = RECIPES.extract_instant_air_temp
Expand Down