Skip to content

Commit

Permalink
758 Bufferization pipe
Browse files Browse the repository at this point in the history
#758

[author: andLaing]

Moves some of the pipe from buffy to a compound component in components so it
can be used by the parametrized detsim city (under development).

Also adds a couple of protections to the buffer functions so that they are safe
for both cities.

[reviewer: mmkekic]

This PR adapts bufferization functions and extract the pipe that creates buffers
from the buffy flow allowing it to be reused in the detsim city as
well (#691,#700). Tests are added to show that the functions work both with
pandas and numpy inputs. Thanks for the effort!
  • Loading branch information
mmkekic authored and carmenromo committed Nov 27, 2020
2 parents 7191327 + 35a1c31 commit 6ef9a98
Show file tree
Hide file tree
Showing 4 changed files with 155 additions and 64 deletions.
79 changes: 30 additions & 49 deletions invisible_cities/cities/buffy.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,26 +23,23 @@
from typing import Callable
from typing import List

from .. core import system_of_units as units
from .. detsim.buffer_functions import buffer_calculator
from .. detsim.sensor_utils import first_and_last_times
from .. detsim.sensor_utils import get_n_sensors
from .. detsim.sensor_utils import sensor_order
from .. detsim.sensor_utils import pmt_and_sipm_bin_width
from .. detsim.sensor_utils import trigger_times
from .. io .event_filter_io import event_filter_writer
from .. io .rwf_io import buffer_writer
from .. reco import tbl_functions as tbl

from .. dataflow import dataflow as fl

from . components import city
from . components import collect
from . components import copy_mc_info
from . components import mcsensors_from_file
from . components import print_every
from . components import signal_finder
from . components import wf_binner
from .. core import system_of_units as units
from .. detsim.sensor_utils import first_and_last_times
from .. detsim.sensor_utils import get_n_sensors
from .. detsim.sensor_utils import sensor_order
from .. detsim.sensor_utils import pmt_and_sipm_bin_width
from .. io .event_filter_io import event_filter_writer
from .. reco import tbl_functions as tbl

from .. dataflow import dataflow as fl

from . components import city
from . components import collect
from . components import copy_mc_info
from . components import calculate_and_save_buffers
from . components import mcsensors_from_file
from . components import print_every
from . components import wf_binner


@city
Expand All @@ -68,22 +65,6 @@ def buffy(files_in , file_out , compression , event_range,
args = ("sipm_resp", "min_time", "max_time"),
out = ("sipm_bins", "sipm_bin_wfs") )

find_signal = fl.map(signal_finder(buffer_length, pmt_wid,
trigger_threshold ),
args = "pmt_bin_wfs" ,
out = "pulses" )

event_times = fl.map(trigger_times ,
args = ("pulses", "timestamp", "pmt_bins"),
out = "evt_times" )

calculate_buffers = fl.map(buffer_calculator(buffer_length, pre_trigger,
pmt_wid , sipm_wid),
args = ("pulses",
"pmt_bins" , "pmt_bin_wfs",
"sipm_bins", "sipm_bin_wfs") ,
out = "buffers" )

order_sensors = fl.map(sensor_order(detector_db, run_number,
nsamp_pmt , nsamp_sipm) ,
args = ("pmt_bin_wfs", "sipm_bin_wfs",
Expand All @@ -96,14 +77,18 @@ def buffy(files_in , file_out , compression , event_range,
events_with_resp = fl.count_filter(bool, args="event_passed")

with tb.open_file(file_out, "w", filters=tbl.filters(compression)) as h5out:
buffer_writer_ = fl.sink(buffer_writer(h5out ,
run_number = run_number,
n_sens_eng = npmt ,
n_sens_trk = nsipm ,
length_eng = nsamp_pmt ,
length_trk = nsamp_sipm),
args = ("event_number", "evt_times" ,
"ordered_buffers" ))
buffer_calculation = calculate_and_save_buffers(buffer_length ,
pre_trigger ,
pmt_wid ,
sipm_wid ,
trigger_threshold,
h5out ,
run_number ,
npmt ,
nsipm ,
nsamp_pmt ,
nsamp_sipm ,
order_sensors )

write_filter = fl.sink(event_filter_writer(h5out, "detected_events"),
args=("event_number", "event_passed") )
Expand All @@ -125,13 +110,9 @@ def buffy(files_in , file_out , compression , event_range,
extract_tminmax ,
bin_pmt_wf ,
bin_sipm_wf ,
find_signal ,
event_times ,
calculate_buffers ,
order_sensors ,
fl.branch("event_number" ,
evtnum_collect.sink),
buffer_writer_ ) ,
buffer_calculation ) ,
result = dict(events_in = event_count_in.future ,
events_resp = events_with_resp.future,
evtnum_list = evtnum_collect.future ))
Expand Down
51 changes: 51 additions & 0 deletions invisible_cities/cities/components.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
from .. core .configure import event_range_help
from .. core .random_sampling import NoiseSampler
from .. detsim import buffer_functions as bf
from .. detsim .sensor_utils import trigger_times
from .. reco import calib_functions as cf
from .. reco import sensor_functions as sf
from .. reco import calib_sensors_functions as csf
Expand All @@ -59,6 +60,7 @@
from .. io .dst_io import load_dst
from .. io .event_filter_io import event_filter_writer
from .. io .pmaps_io import pmap_writer
from .. io .rwf_io import buffer_writer
from .. types .ic_types import xy
from .. types .ic_types import NN
from .. types .ic_types import NNN
Expand Down Expand Up @@ -826,3 +828,52 @@ def compute_and_write_pmaps(detector_db, run_number, pmt_samp_wid, sipm_samp_wid
compute_pmaps = pipe(*filter(None, fn_list))

return compute_pmaps, empty_indices, empty_pmaps


def calculate_and_save_buffers(buffer_length : float ,
pre_trigger : float ,
pmt_wid : float ,
sipm_wid : float ,
trigger_threshold: int ,
h5out : tb.File ,
run_number : int ,
npmt : int ,
nsipm : int ,
nsamp_pmt : int ,
nsamp_sipm : int ,
order_sensors : Callable=None):
find_signal = fl.map(signal_finder(buffer_length, pmt_wid,
trigger_threshold ),
args = "pmt_bin_wfs" ,
out = "pulses" )

event_times = fl.map(trigger_times ,
args = ("pulses", "timestamp", "pmt_bins"),
out = "evt_times" )

calculate_buffers = fl.map(bf.buffer_calculator(buffer_length, pre_trigger,
pmt_wid , sipm_wid),
args = ("pulses",
"pmt_bins" , "pmt_bin_wfs",
"sipm_bins", "sipm_bin_wfs") ,
out = "buffers" )

saved_buffers = "buffers" if order_sensors is None else "ordered_buffers"
buffer_writer_ = sink(buffer_writer(h5out ,
run_number = run_number,
n_sens_eng = npmt ,
n_sens_trk = nsipm ,
length_eng = nsamp_pmt ,
length_trk = nsamp_sipm),
args = ("event_number", "evt_times" ,
saved_buffers ))

fn_list = (find_signal ,
event_times ,
calculate_buffers,
order_sensors ,
buffer_writer_ )

# Filter out order_sensors if it is not set
buffer_definition = pipe(*filter(None, fn_list))
return buffer_definition
38 changes: 23 additions & 15 deletions invisible_cities/detsim/buffer_functions.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import numpy as np
import pandas as pd

from typing import Callable
from typing import List
from typing import Tuple
from typing import Callable
from typing import List
from typing import Tuple
from typing import Union

from .. reco.peak_functions import indices_and_wf_above_threshold
from .. reco.peak_functions import split_in_peaks
Expand Down Expand Up @@ -41,14 +42,17 @@ def bin_sensors(sensors : pd.DataFrame,

## !! to-do: clarify for non-pmt versions of next
## !! to-do: Check on integral instead of only threshold?
def find_signal_start(wfs : pd.Series,
bin_threshold: float ,
stand_off : int ) -> List[int]:
def find_signal_start(wfs : Union[pd.Series, np.ndarray],
bin_threshold: float ,
stand_off : int ) -> List[int]:
"""
Finds signal in the binned waveforms and
identifies candidate triggers.
"""
eng_sum = wfs.sum()
if isinstance(wfs, np.ndarray):
eng_sum = wfs.sum(axis=0)
else:
eng_sum = wfs.sum()
indices = indices_and_wf_above_threshold(eng_sum,
bin_threshold).indices
## Just using this and the stand_off for now
Expand Down Expand Up @@ -122,20 +126,24 @@ def generate_slice(trigger : int ,
pad_safe(sipm_charge[:, sipm_slice], sipm_pad))


def position_signal(triggers : List,
pmt_bins : np.ndarray,
pmt_charge : pd.Series,
sipm_bins : np.ndarray,
sipm_charge: pd.Series
def position_signal(triggers : List ,
pmt_bins : np.ndarray ,
pmt_charge : Union[pd.Series, np.ndarray],
sipm_bins : np.ndarray ,
sipm_charge: Union[pd.Series, np.ndarray]
) -> List[Tuple[np.ndarray, np.ndarray]]:
"""
Synchronises the SiPMs and PMTs for each identified
trigger and calls the padding function to fill with
zeros where necessary.
"""
pmt_q = np.asarray(pmt_charge.tolist())
sipm_q = np.empty((0,0))\
if sipm_charge.empty else np.asarray(sipm_charge.tolist())
if isinstance(pmt_charge, pd.Series):
pmt_q = np.asarray(pmt_charge.tolist())
sipm_q = np.empty((0,0))\
if sipm_charge.empty else np.asarray(sipm_charge.tolist())
else:
pmt_q = pmt_charge
sipm_q = sipm_charge
return [generate_slice(trigger, pmt_bins, pmt_q, sipm_bins, sipm_q)
for trigger in triggers ]
return position_signal
51 changes: 51 additions & 0 deletions invisible_cities/detsim/buffer_functions_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,22 @@ def test_find_signal_start(binned_waveforms, signal_thresh):
assert np.all(pmt_sum[pulses] > signal_thresh)


@mark.parametrize("signal_thresh", (2, 10))
def test_find_signal_start_numpy(binned_waveforms, signal_thresh):

pmt_bins, pmt_wfs, *_ = binned_waveforms

buffer_length = 800 * units.mus
bin_width = np.diff(pmt_bins)[0]
stand_off = int(buffer_length / bin_width)

pmt_sum = pmt_wfs.sum()
pmt_wfs_np = np.asarray(pmt_wfs.tolist())
pulses = find_signal_start(pmt_wfs_np, signal_thresh, stand_off)

assert np.all(pmt_sum[pulses] > signal_thresh)


def test_find_signal_start_correct_index():

thresh = 5
Expand Down Expand Up @@ -109,3 +125,38 @@ def test_buffer_calculator(mc_waveforms, binned_waveforms,
assert evt_pmt .shape[1] == int(buffer_length / pmt_binwid)
assert evt_sipm.shape[1] == int(buffer_length / sipm_binwid)
assert np.sum(evt_pmt, axis=0)[pre_trg_samp] == pmt_sum[pulses[i]]


def test_buffer_calculator_pandas_numpy(mc_waveforms, binned_waveforms):

_, pmt_binwid, sipm_binwid, _ = mc_waveforms

pmt_bins, pmt_wfs, sipm_bins, sipm_wfs = binned_waveforms

buffer_length = 800 * units.mus
bin_width = np.diff(pmt_bins)[0]
stand_off = int(buffer_length / bin_width)
pre_trigger = 100 * units.mus
signal_thresh = 2

pulses_pd = find_signal_start(pmt_wfs, signal_thresh, stand_off)
pmt_nparr = np.asarray(pmt_wfs.tolist())
pulses_np = find_signal_start(pmt_nparr, signal_thresh, stand_off)

calculate_buffers = buffer_calculator(buffer_length,
pre_trigger ,
pmt_binwid ,
sipm_binwid )

buffers_pd = calculate_buffers(pulses_pd, *binned_waveforms)
buffers_np = calculate_buffers(pulses_np ,
pmt_bins ,
pmt_nparr ,
sipm_bins ,
np.asarray(sipm_wfs.tolist()))

assert len(buffers_pd) == len(buffers_np) == 1
evtpd_buffers = buffers_pd[0]
evtnp_buffers = buffers_np[0]
assert np.all([np.all(evtpd_buffers[0] == evtnp_buffers[0]),
np.all(evtpd_buffers[1] == evtnp_buffers[1])])

0 comments on commit 6ef9a98

Please # to comment.