Benchmark dependencies (ServiceNow#220)
gasse authored and qipeng committed Nov 20, 2024
1 parent eee8010 commit 8166163
Showing 6 changed files with 1,068 additions and 818 deletions.
@@ -1,6 +1,8 @@
import fnmatch
import logging
import typing
from collections import defaultdict
from copy import deepcopy
from dataclasses import dataclass, field
from typing import Literal, Optional

@@ -10,7 +12,12 @@
from browsergym.core.action.highlevel import HighLevelActionSet
from browsergym.experiments.loop import EnvArgs

from .metadata.utils import task_list_from_metadata
from .metadata.utils import (
build_env_args_dependency_graphs,
build_full_task_dependency_graph_from_metadata,
extract_sparse_task_dependency_graph_from_subset,
task_list_from_metadata,
)

logger = logging.getLogger(__name__)

@@ -55,6 +62,7 @@ class Benchmark(DataClassJsonMixin):
name: str
high_level_action_set_args: HighLevelActionSetArgs
is_multi_tab: bool
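    # whether several seeds of the same task can run in parallel (set to False further below for webarena/visualwebarena, which require instance resets between runs)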
supports_parallel_seeds: bool
env_args_list: list[EnvArgs]
backends: list[BenchmarkBackend]
task_metadata: Optional[pd.DataFrame] = field(
@@ -135,7 +143,7 @@ def subset_from_split(self, split: Literal["train", "valid", "test"]):
split_column = "browsergym_split"

# check for a split column in metadata
if not split_column in self.task_metadata.columns:
if split_column not in self.task_metadata.columns:
raise NotImplementedError(
f"This benchmark does not provide default train/valid/test splits (missing a {repr(split_column)} column in task metadata)"
)
@@ -164,6 +172,7 @@ def subset_from_regexp(self, column, regexp):
name=f"{self.name}[{column}=/{regexp}/]",
high_level_action_set_args=self.high_level_action_set_args,
is_multi_tab=self.is_multi_tab,
supports_parallel_seeds=self.supports_parallel_seeds,
backends=self.backends,
env_args_list=[
env_args
@@ -172,3 +181,39 @@ def subset_from_regexp(self, column, regexp):
],
task_metadata=self.task_metadata,
)

def dependency_graph_over_tasks(self) -> dict[str, list[str]]:
# recover all unique task_names present in the benchmark
task_names = list(set([env_args.task_name for env_args in self.env_args_list]))

# if "depends_on" column is missing, raise a warning and deal with it
# (we don't want the "depends_on" column to be mandatory)
if "depends_on" not in self.task_metadata.columns:
logger.warning(
f'This benchmark does not provide a dependency graph (missing a "depends_on" column in task metadata). Assuming no task dependencies.'
)
zero_dependencies = {task_name: [] for task_name in task_names}
return zero_dependencies

# recover the task dependency graph, for tasks in the benchmark only
task_dependencies = extract_sparse_task_dependency_graph_from_subset(
task_subset=task_names,
parents=build_full_task_dependency_graph_from_metadata(
task_metadata=self.task_metadata
),
)

return task_dependencies

def dependency_graphs_over_env_args(self) -> list[dict[str, list[str]]]:
"""
        Returns a list of dependency graphs to be executed sequentially, typically with a full instance reset in between.
        Ideally, a job scheduler should connect these graphs by injecting a reset task between them, one that depends on all previous tasks being completed.
"""
task_dependencies = self.dependency_graph_over_tasks()
env_args_dependencies = build_env_args_dependency_graphs(
env_args_list=self.env_args_list,
task_dependencies=task_dependencies,
supports_parallel_seeds=self.supports_parallel_seeds,
)
return env_args_dependencies
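
For context, a minimal sketch of how a scheduler might consume these graphs. It assumes a Benchmark instance named benchmark; run_env_args and reset_instance are hypothetical helpers, not part of this commit.

# Hypothetical scheduling sketch (run_env_args / reset_instance are placeholders).
for graph in benchmark.dependency_graphs_over_env_args():
    done = set()
    # naive topological execution: run a job once all of its parents are done
    while len(done) < len(graph):
        for job, parents in graph.items():
            if job not in done and all(p in done for p in parents):
                run_env_args(benchmark.env_args_list[job])  # hypothetical helper
                done.add(job)
    reset_instance()  # hypothetical full instance reset between graphs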
@@ -94,6 +94,7 @@
name="miniwob",
high_level_action_set_args=DEFAULT_HIGHLEVEL_ACTION_SET_ARGS["miniwob_all"],
is_multi_tab=False,
supports_parallel_seeds=True,
backends=["miniwob"],
env_args_list=make_env_args_list_from_repeat_tasks(
task_list=task_list_from_metadata(metadata=task_metadata("miniwob")),
@@ -107,6 +108,7 @@
name="miniwob_tiny_test",
high_level_action_set_args=DEFAULT_HIGHLEVEL_ACTION_SET_ARGS["miniwob_all"],
is_multi_tab=False,
supports_parallel_seeds=True,
backends=["miniwob"],
env_args_list=make_env_args_list_from_repeat_tasks(
task_list=["miniwob.click-dialog", "miniwob.click-checkboxes"],
@@ -120,6 +122,7 @@
name="webarena",
high_level_action_set_args=DEFAULT_HIGHLEVEL_ACTION_SET_ARGS["webarena"],
is_multi_tab=True,
supports_parallel_seeds=False,
backends=["webarena"],
env_args_list=make_env_args_list_from_repeat_tasks(
task_list=task_list_from_metadata(metadata=task_metadata("webarena")),
@@ -133,6 +136,7 @@
name="visualwebarena",
high_level_action_set_args=DEFAULT_HIGHLEVEL_ACTION_SET_ARGS["visualwebarena"],
is_multi_tab=True,
supports_parallel_seeds=False,
backends=["visualwebarena"],
env_args_list=make_env_args_list_from_repeat_tasks(
task_list=task_list_from_metadata(metadata=task_metadata("visualwebarena")),
@@ -146,6 +150,7 @@
name="workarena_l1",
high_level_action_set_args=DEFAULT_HIGHLEVEL_ACTION_SET_ARGS["workarena"],
is_multi_tab=False,
supports_parallel_seeds=True,
backends=["workarena"],
env_args_list=make_env_args_list_from_workarena_curriculum(
level="l1",
@@ -161,6 +166,7 @@
name="workarena_l2_agent_curriculum_eval",
high_level_action_set_args=DEFAULT_HIGHLEVEL_ACTION_SET_ARGS["workarena++"],
is_multi_tab=True,
supports_parallel_seeds=True,
backends=["workarena"],
env_args_list=make_env_args_list_from_workarena_curriculum(
level="l2",
@@ -175,6 +181,7 @@
name="workarena_l3_agent_curriculum_eval",
high_level_action_set_args=DEFAULT_HIGHLEVEL_ACTION_SET_ARGS["workarena++"],
is_multi_tab=True,
supports_parallel_seeds=True,
backends=["workarena"],
env_args_list=make_env_args_list_from_workarena_curriculum(
level="l3",
@@ -189,6 +196,7 @@
name="assistantbench",
high_level_action_set_args=DEFAULT_HIGHLEVEL_ACTION_SET_ARGS["assistantbench"],
is_multi_tab=True,
supports_parallel_seeds=True,
backends=["assistantbench"],
env_args_list=make_env_args_list_from_repeat_tasks(
task_list=task_list_from_metadata(
@@ -204,6 +212,7 @@
name="weblinx",
high_level_action_set_args=DEFAULT_HIGHLEVEL_ACTION_SET_ARGS["assistantbench"],
is_multi_tab=True,
supports_parallel_seeds=True,
backends=["weblinx"],
env_args_list=make_env_args_list_from_repeat_tasks(
task_list=task_list_from_metadata(metadata=task_metadata("weblinx")),
@@ -1,8 +1,13 @@
import io
import pkgutil
from collections import defaultdict
from copy import deepcopy
from typing import Any, Optional, Union

import pandas as pd

from browsergym.experiments.loop import EnvArgs


def task_metadata(benchmark_name: str):
return task_metadata_from_csv(
@@ -22,3 +27,128 @@ def task_list_from_metadata(metadata: pd.DataFrame, filter: dict[str, str] = {})
df = df[col_filter]
# return only the task names
return list(df["task_name"])


def build_full_task_dependency_graph_from_metadata(
task_metadata: pd.DataFrame,
) -> dict[str, list[str]]:
if "depends_on" not in task_metadata.columns:
raise ValueError(
'task metadata column "depends_on" is required to compute task dependencies.'
)

graph_parents = {
task_name: depends_on.split()
for task_name, depends_on in zip(task_metadata["task_name"], task_metadata["depends_on"])
}

return graph_parents
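
# Illustrative example (not part of this commit): "depends_on" holds space-separated
# parent task names, so a metadata row with task_name="c" and depends_on="a b"
# contributes {"c": ["a", "b"]} to the returned parent mapping.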


def extract_graph_children(parents: dict[Any, list[Any]]) -> dict[Any, list[Any]]:
    # start with empty children lists
    children = {task_name: [] for task_name in parents.keys()}
    # build children lists by inverting the parent mapping
    for child, parent_list in parents.items():
        for parent in parent_list:
            children[parent].append(child)
return children
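
# Illustrative example (not part of this commit): inverting a parent mapping.
# extract_graph_children({"a": [], "b": ["a"], "c": ["a"]})
# => {"a": ["b", "c"], "b": [], "c": []}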


def extract_sparse_task_dependency_graph_from_subset(
task_subset: list[str],
parents: dict[str, list[str]],
children: Optional[dict[str, list[str]]] = None,
return_children: bool = False,
) -> Union[dict[str, list[str]], tuple[dict[str, list[str]], dict[str, list[str]]]]:

if children is None:
children = extract_graph_children(parents)

# consistency check
assert all([task in parents for task in task_subset])
assert all([task in children for task in task_subset])

# copy the graph
subgraph_parents = deepcopy(parents)
subgraph_children = deepcopy(children)

# prune the graph (node contraction)
for task_name in parents.keys():
# if task is not present in the target task subset, drop its node
if task_name not in task_subset:
# connect node's children to node's parents
for child in subgraph_children[task_name]:
subgraph_parents[child].remove(task_name)
subgraph_parents[child].extend(subgraph_parents[task_name])
# connect node's parents to node's children
for parent in subgraph_parents[task_name]:
subgraph_children[parent].remove(task_name)
subgraph_children[parent].extend(subgraph_children[task_name])
# remove node
del subgraph_parents[task_name]
del subgraph_children[task_name]

# return parent and (optionally) children mappings
if return_children:
return subgraph_parents, subgraph_children

return subgraph_parents
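
# Illustrative example (not part of this commit): contracting the chain a -> b -> c
# (parents = {"a": [], "b": ["a"], "c": ["b"]}) onto the subset ["a", "c"]
# yields {"a": [], "c": ["a"]}: dropping "b" reconnects "c" directly to "a".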


def build_env_args_dependency_graphs(
env_args_list: list[EnvArgs],
task_dependencies: dict[str, list[str]],
supports_parallel_seeds: bool,
) -> list[dict[int, list[int]]]:
"""
    Returns a list of dependency graphs to be executed sequentially, typically with a full instance reset in between.
    Ideally, a job scheduler should connect these graphs by injecting a reset task between them, one that depends on all previous tasks being completed.
"""
# build mapping from tasks to run subsets (task_name -> list of env_args_list indices)
task_runs = defaultdict(list)
for i, env_args in enumerate(env_args_list):
task_runs[env_args.task_name].append(i)
task_runs = dict(task_runs)

# consistency check
assert all([task in task_dependencies for task in task_runs.keys()])

# divide same-task runs into distinct splits if needed
task_runs_splits = []
if supports_parallel_seeds:
# if parallel task runs over seeds is supported, do not split
task_runs_splits.append(task_runs)
else:
# else, split runs so that each task has only one run (seed) in each split
while task_runs:
# extract first task run only (one seed per task)
split = {task_name: [runs.pop(0)] for task_name, runs in task_runs.items()}
task_runs_splits.append(split)
# update task list to only those with remaining runs (seeds)
task_runs = {task_name: runs for task_name, runs in task_runs.items() if runs}

# recover the parent and child mappings of the task dependency graph
task_parents = task_dependencies
task_children = extract_graph_children(task_parents)

# for each split, build the task dependency subgraph (task_name nodes), then build the run dependency graph (env_args index nodes)
run_parents_split = []
for split in task_runs_splits:
# build the task dependency graph (task_name nodes)
split_task_names = list(split.keys())
split_task_parents = extract_sparse_task_dependency_graph_from_subset(
task_subset=split_task_names,
parents=task_parents,
children=task_children,
)
# then, build the run dependency graph (env_args index nodes)
split_run_parents = defaultdict(list)
for task_name, runs in split.items():
parent_tasks_runs = sum([split[parent] for parent in split_task_parents[task_name]], [])
for run in runs:
split_run_parents[run].extend(parent_tasks_runs)
split_run_parents = dict(split_run_parents)
run_parents_split.append(split_run_parents)

return run_parents_split
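
# Illustrative example (not part of this commit): with env_args_list indices
# 0, 1 -> task "a" (two seeds) and 2 -> task "b", task_dependencies
# {"a": [], "b": ["a"]}, and supports_parallel_seeds=False, this yields two
# graphs to execute sequentially: [{0: [], 2: [0]}, {1: []}]
# -- one seed of "a" per split, with "b" waiting on the first seed.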