feat: improve handling of optional dependencies #213

Merged · 3 commits · Nov 17, 2023
46 changes: 17 additions & 29 deletions analyses/cms-open-data-ttbar/ttbar_analysis_pipeline.ipynb
@@ -1,7 +1,6 @@
{
"cells": [
{
"attachments": {},
"cell_type": "markdown",
"id": "0dc37683",
"metadata": {},
@@ -25,7 +24,6 @@
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "404927fe",
"metadata": {},
@@ -37,7 +35,6 @@
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "bd72d323",
"metadata": {},
@@ -65,8 +62,6 @@
"from coffea.nanoevents import NanoAODSchema\n",
"from coffea.analysis_tools import PackedSelection\n",
"import copy\n",
"from func_adl import ObjectStream\n",
"from func_adl_servicex import ServiceXSourceUpROOT\n",
"import hist\n",
"import matplotlib.pyplot as plt\n",
"import numpy as np\n",
@@ -78,7 +73,6 @@
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "cd573bb1",
"metadata": {},
@@ -135,7 +129,6 @@
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "141d6520",
"metadata": {},
@@ -407,7 +400,6 @@
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "90dd4c9e",
"metadata": {},
@@ -454,7 +446,6 @@
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "b910d3d5",
"metadata": {},
@@ -473,7 +464,7 @@
},
"outputs": [],
"source": [
"def get_query(source: ObjectStream) -> ObjectStream:\n",
"def get_query(source):\n",
" \"\"\"Query for event / column selection: >=4j >=1b, ==1 lep with pT>30 GeV + additional cuts,\n",
" return relevant columns\n",
" *NOTE* jet pT cut is set lower to account for systematic variations to jet pT\n",
@@ -554,7 +545,6 @@
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "d8f08fc1",
"metadata": {},
@@ -574,6 +564,12 @@
"outputs": [],
"source": [
"if USE_SERVICEX:\n",
" try:\n",
" from func_adl_servicex import ServiceXSourceUpROOT\n",
" except ImportError:\n",
" print(\"cannot import func_adl_servicex, which is a required dependency when using ServiceX\")\n",
" raise\n",
"\n",
" # dummy dataset on which to generate the query\n",
" dummy_ds = ServiceXSourceUpROOT(\"cernopendata://dummy\", \"Events\", backend_name=\"uproot\")\n",
"\n",
@@ -597,7 +593,6 @@
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "c28a9e49",
"metadata": {},
@@ -649,10 +644,10 @@
" executor = processor.FuturesExecutor(workers=utils.config[\"benchmarking\"][\"NUM_CORES\"])\n",
"\n",
"run = processor.Runner(\n",
" executor=executor, \n",
" schema=NanoAODSchema, \n",
" savemetrics=True, \n",
" metadata_cache={}, \n",
" executor=executor,\n",
" schema=NanoAODSchema,\n",
" savemetrics=True,\n",
" metadata_cache={},\n",
" chunksize=utils.config[\"benchmarking\"][\"CHUNKSIZE\"])\n",
"\n",
"if USE_SERVICEX:\n",
@@ -670,8 +665,8 @@
"t0 = time.monotonic()\n",
"# processing\n",
"all_histograms, metrics = run(\n",
" fileset, \n",
" treename, \n",
" fileset,\n",
" treename,\n",
" processor_instance=TtbarAnalysis(USE_INFERENCE, USE_TRITON)\n",
")\n",
"exec_time = time.monotonic() - t0\n",
@@ -701,7 +696,6 @@
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "d7bb4428",
"metadata": {},
@@ -732,6 +726,8 @@
}
],
"source": [
"import utils.plotting # noqa: E402\n",
"\n",
"utils.plotting.set_style()\n",
"\n",
"all_histograms[\"hist_dict\"][\"4j1b\"][120j::hist.rebin(2), :, \"nominal\"].stack(\"process\")[::-1].plot(stack=True, histtype=\"fill\", linewidth=1, edgecolor=\"grey\")\n",
@@ -767,7 +763,6 @@
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "bed3df8b",
"metadata": {},
@@ -884,7 +879,6 @@
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "7c334dd3",
"metadata": {},
@@ -917,7 +911,6 @@
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "e904cd3c",
"metadata": {},
@@ -941,6 +934,8 @@
},
"outputs": [],
"source": [
"import utils.rebinning # noqa: E402\n",
"\n",
"cabinetry_config = cabinetry.configuration.load(\"cabinetry_config.yml\")\n",
"\n",
"# rebinning: lower edge 110 GeV, merge bins 2->1\n",
@@ -952,7 +947,6 @@
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "f36dc601",
"metadata": {},
@@ -1000,7 +994,6 @@
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "c74e4361",
"metadata": {},
@@ -1038,7 +1031,6 @@
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "bd480eec",
"metadata": {},
@@ -1069,7 +1061,6 @@
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "3a293479",
"metadata": {},
@@ -1125,7 +1116,6 @@
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "14dc4b23",
"metadata": {},
@@ -1173,7 +1163,6 @@
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "7f60c316",
"metadata": {},
@@ -1317,7 +1306,6 @@
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "a2ce2d14",
"metadata": {},
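The notebook also moves `import utils.plotting` and `import utils.rebinning` into the cells that first use them, tagged `# noqa: E402` because the jupytext-exported script no longer has these imports at the top of the file. A hedged sketch of the resulting plotting cell, assuming the repository layout above:

```python
# Deferred import: plotting utilities are imported where plots are made,
# presumably so worker nodes never need the plotting dependencies.
# E402 (import not at top of file) is suppressed in the cell-ordered .py export.
import utils.plotting  # noqa: E402

utils.plotting.set_style()
```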
28 changes: 18 additions & 10 deletions analyses/cms-open-data-ttbar/ttbar_analysis_pipeline.py
@@ -6,7 +6,7 @@
# extension: .py
# format_name: percent
# format_version: '1.3'
# jupytext_version: 1.14.7
# jupytext_version: 1.15.2
# kernelspec:
# display_name: Python 3 (ipykernel)
# language: python
@@ -52,8 +52,6 @@
from coffea.nanoevents import NanoAODSchema
from coffea.analysis_tools import PackedSelection
import copy
from func_adl import ObjectStream
from func_adl_servicex import ServiceXSourceUpROOT
import hist
import matplotlib.pyplot as plt
import numpy as np
@@ -388,7 +386,7 @@ def postprocess(self, accumulator):
# Define the func_adl query to be used for the purpose of extracting columns and filtering.

# %%
def get_query(source: ObjectStream) -> ObjectStream:
def get_query(source):
"""Query for event / column selection: >=4j >=1b, ==1 lep with pT>30 GeV + additional cuts,
return relevant columns
*NOTE* jet pT cut is set lower to account for systematic variations to jet pT
@@ -475,6 +473,12 @@ def get_query(source: ObjectStream) -> ObjectStream:

# %%
if USE_SERVICEX:
try:
from func_adl_servicex import ServiceXSourceUpROOT
except ImportError:
print("cannot import func_adl_servicex, which is a required dependency when using ServiceX")
raise

# dummy dataset on which to generate the query
dummy_ds = ServiceXSourceUpROOT("cernopendata://dummy", "Events", backend_name="uproot")

@@ -512,10 +516,10 @@ def get_query(source: ObjectStream) -> ObjectStream:
executor = processor.FuturesExecutor(workers=utils.config["benchmarking"]["NUM_CORES"])

run = processor.Runner(
executor=executor,
schema=NanoAODSchema,
savemetrics=True,
metadata_cache={},
executor=executor,
schema=NanoAODSchema,
savemetrics=True,
metadata_cache={},
chunksize=utils.config["benchmarking"]["CHUNKSIZE"])

if USE_SERVICEX:
@@ -533,8 +537,8 @@ def get_query(source: ObjectStream) -> ObjectStream:
t0 = time.monotonic()
# processing
all_histograms, metrics = run(
fileset,
treename,
fileset,
treename,
processor_instance=TtbarAnalysis(USE_INFERENCE, USE_TRITON)
)
exec_time = time.monotonic() - t0
@@ -552,6 +556,8 @@ def get_query(source: ObjectStream) -> ObjectStream:
# We built histograms in two phase space regions, for multiple physics processes and systematic variations.

# %%
import utils.plotting # noqa: E402

utils.plotting.set_style()

all_histograms["hist_dict"]["4j1b"][120j::hist.rebin(2), :, "nominal"].stack("process")[::-1].plot(stack=True, histtype="fill", linewidth=1, edgecolor="grey")
@@ -644,6 +650,8 @@ def get_query(source: ObjectStream) -> ObjectStream:
# We will use `cabinetry` to combine all histograms into a `pyhf` workspace and fit the resulting statistical model to the pseudodata we built.

# %%
import utils.rebinning # noqa: E402

cabinetry_config = cabinetry.configuration.load("cabinetry_config.yml")

# rebinning: lower edge 110 GeV, merge bins 2->1
4 changes: 3 additions & 1 deletion analyses/cms-open-data-ttbar/utils/__init__.py
@@ -6,5 +6,7 @@
from . import metrics as metrics
from . import ml as ml
from . import plotting as plotting
from . import rebinning as rebinning
from . import systematics as systematics


# to avoid issues: only import submodules if dependencies are present on worker nodes too
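The closing comment states the convention rather than enforcing it; a hypothetical guard (not part of this diff) for a submodule with heavy optional dependencies could look like:

```python
# Hypothetical sketch: make `import utils` survive a missing optional
# dependency on a worker node; the guarded submodule is then simply
# absent, and only code that actually uses it will fail.
try:
    from . import ml as ml  # may pull in optional inference dependencies
except ImportError:
    pass
```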
11 changes: 10 additions & 1 deletion analyses/cms-open-data-ttbar/utils/file_input.py
@@ -2,10 +2,18 @@
import numpy as np
import os
from pathlib import Path
from servicex import ServiceXDataset
import tqdm
import urllib


try:
from servicex import ServiceXDataset
except ImportError:
# if servicex is not available, ServiceXDatasetGroup cannot be used
# this is fine for worker nodes: only needed where main notebook is executed
pass


# If local_data_cache is a writable path, this function will download any missing file into it and
# then return file paths corresponding to these local copies.
def construct_fileset(n_files_max_per_sample, use_xcache=False, af_name="", local_data_cache=None, input_from_eos=False, xcache_atlas_prefix=None):
@@ -112,6 +120,7 @@ def download_file(url, out_file):
with tqdm.tqdm(unit='B', unit_scale=True, unit_divisor=1024, miniters=1, desc=out_path.name) as t:
urllib.request.urlretrieve(url, out_path.absolute(), reporthook=tqdm_urlretrieve_hook(t))


class ServiceXDatasetGroup():
def __init__(self, fileset, backend_name="uproot", ignore_cache=False):
self.fileset = fileset
Expand Down