Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Bufferization pipe #758

Merged
merged 3 commits into from
Nov 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 30 additions & 49 deletions invisible_cities/cities/buffy.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,26 +23,23 @@
from typing import Callable
from typing import List

from .. core import system_of_units as units
from .. detsim.buffer_functions import buffer_calculator
from .. detsim.sensor_utils import first_and_last_times
from .. detsim.sensor_utils import get_n_sensors
from .. detsim.sensor_utils import sensor_order
from .. detsim.sensor_utils import pmt_and_sipm_bin_width
from .. detsim.sensor_utils import trigger_times
from .. io .event_filter_io import event_filter_writer
from .. io .rwf_io import buffer_writer
from .. reco import tbl_functions as tbl

from .. dataflow import dataflow as fl

from . components import city
from . components import collect
from . components import copy_mc_info
from . components import mcsensors_from_file
from . components import print_every
from . components import signal_finder
from . components import wf_binner
from .. core import system_of_units as units
from .. detsim.sensor_utils import first_and_last_times
from .. detsim.sensor_utils import get_n_sensors
from .. detsim.sensor_utils import sensor_order
from .. detsim.sensor_utils import pmt_and_sipm_bin_width
from .. io .event_filter_io import event_filter_writer
from .. reco import tbl_functions as tbl

from .. dataflow import dataflow as fl

from . components import city
from . components import collect
from . components import copy_mc_info
from . components import calculate_and_save_buffers
from . components import mcsensors_from_file
from . components import print_every
from . components import wf_binner


@city
Expand All @@ -68,22 +65,6 @@ def buffy(files_in , file_out , compression , event_range,
args = ("sipm_resp", "min_time", "max_time"),
out = ("sipm_bins", "sipm_bin_wfs") )

find_signal = fl.map(signal_finder(buffer_length, pmt_wid,
trigger_threshold ),
args = "pmt_bin_wfs" ,
out = "pulses" )

event_times = fl.map(trigger_times ,
args = ("pulses", "timestamp", "pmt_bins"),
out = "evt_times" )

calculate_buffers = fl.map(buffer_calculator(buffer_length, pre_trigger,
pmt_wid , sipm_wid),
args = ("pulses",
"pmt_bins" , "pmt_bin_wfs",
"sipm_bins", "sipm_bin_wfs") ,
out = "buffers" )

order_sensors = fl.map(sensor_order(detector_db, run_number,
nsamp_pmt , nsamp_sipm) ,
args = ("pmt_bin_wfs", "sipm_bin_wfs",
Expand All @@ -96,14 +77,18 @@ def buffy(files_in , file_out , compression , event_range,
events_with_resp = fl.count_filter(bool, args="event_passed")

with tb.open_file(file_out, "w", filters=tbl.filters(compression)) as h5out:
buffer_writer_ = fl.sink(buffer_writer(h5out ,
run_number = run_number,
n_sens_eng = npmt ,
n_sens_trk = nsipm ,
length_eng = nsamp_pmt ,
length_trk = nsamp_sipm),
args = ("event_number", "evt_times" ,
"ordered_buffers" ))
buffer_calculation = calculate_and_save_buffers(buffer_length ,
pre_trigger ,
pmt_wid ,
sipm_wid ,
trigger_threshold,
h5out ,
run_number ,
npmt ,
nsipm ,
nsamp_pmt ,
nsamp_sipm ,
order_sensors )

write_filter = fl.sink(event_filter_writer(h5out, "detected_events"),
args=("event_number", "event_passed") )
Expand All @@ -125,13 +110,9 @@ def buffy(files_in , file_out , compression , event_range,
extract_tminmax ,
bin_pmt_wf ,
bin_sipm_wf ,
find_signal ,
event_times ,
calculate_buffers ,
order_sensors ,
fl.branch("event_number" ,
evtnum_collect.sink),
buffer_writer_ ) ,
buffer_calculation ) ,
result = dict(events_in = event_count_in.future ,
events_resp = events_with_resp.future,
evtnum_list = evtnum_collect.future ))
Expand Down
51 changes: 51 additions & 0 deletions invisible_cities/cities/components.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
from .. core .configure import event_range_help
from .. core .random_sampling import NoiseSampler
from .. detsim import buffer_functions as bf
from .. detsim .sensor_utils import trigger_times
from .. reco import calib_functions as cf
from .. reco import sensor_functions as sf
from .. reco import calib_sensors_functions as csf
Expand All @@ -59,6 +60,7 @@
from .. io .dst_io import load_dst
from .. io .event_filter_io import event_filter_writer
from .. io .pmaps_io import pmap_writer
from .. io .rwf_io import buffer_writer
from .. types .ic_types import xy
from .. types .ic_types import NN
from .. types .ic_types import NNN
Expand Down Expand Up @@ -826,3 +828,52 @@ def compute_and_write_pmaps(detector_db, run_number, pmt_samp_wid, sipm_samp_wid
compute_pmaps = pipe(*filter(None, fn_list))

return compute_pmaps, empty_indices, empty_pmaps


def calculate_and_save_buffers(buffer_length : float ,
pre_trigger : float ,
pmt_wid : float ,
sipm_wid : float ,
trigger_threshold: int ,
h5out : tb.File ,
run_number : int ,
npmt : int ,
nsipm : int ,
nsamp_pmt : int ,
nsamp_sipm : int ,
order_sensors : Callable=None):
find_signal = fl.map(signal_finder(buffer_length, pmt_wid,
trigger_threshold ),
args = "pmt_bin_wfs" ,
out = "pulses" )

event_times = fl.map(trigger_times ,
args = ("pulses", "timestamp", "pmt_bins"),
out = "evt_times" )

calculate_buffers = fl.map(bf.buffer_calculator(buffer_length, pre_trigger,
pmt_wid , sipm_wid),
args = ("pulses",
"pmt_bins" , "pmt_bin_wfs",
"sipm_bins", "sipm_bin_wfs") ,
out = "buffers" )

saved_buffers = "buffers" if order_sensors is None else "ordered_buffers"
buffer_writer_ = sink(buffer_writer(h5out ,
run_number = run_number,
n_sens_eng = npmt ,
n_sens_trk = nsipm ,
length_eng = nsamp_pmt ,
length_trk = nsamp_sipm),
args = ("event_number", "evt_times" ,
saved_buffers ))

fn_list = (find_signal ,
event_times ,
calculate_buffers,
order_sensors ,
buffer_writer_ )

# Filter out order_sensors if it is not set
buffer_definition = pipe(*filter(None, fn_list))
return buffer_definition
38 changes: 23 additions & 15 deletions invisible_cities/detsim/buffer_functions.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import numpy as np
import pandas as pd

from typing import Callable
from typing import List
from typing import Tuple
from typing import Callable
from typing import List
from typing import Tuple
from typing import Union

from .. reco.peak_functions import indices_and_wf_above_threshold
from .. reco.peak_functions import split_in_peaks
Expand Down Expand Up @@ -41,14 +42,17 @@ def bin_sensors(sensors : pd.DataFrame,

## !! to-do: clarify for non-pmt versions of next
## !! to-do: Check on integral instead of only threshold?
def find_signal_start(wfs : pd.Series,
bin_threshold: float ,
stand_off : int ) -> List[int]:
def find_signal_start(wfs : Union[pd.Series, np.ndarray],
bin_threshold: float ,
stand_off : int ) -> List[int]:
"""
Finds signal in the binned waveforms and
identifies candidate triggers.
"""
eng_sum = wfs.sum()
if isinstance(wfs, np.ndarray):
eng_sum = wfs.sum(axis=0)
else:
eng_sum = wfs.sum()
indices = indices_and_wf_above_threshold(eng_sum,
bin_threshold).indices
## Just using this and the stand_off for now
Expand Down Expand Up @@ -122,20 +126,24 @@ def generate_slice(trigger : int ,
pad_safe(sipm_charge[:, sipm_slice], sipm_pad))


def position_signal(triggers : List,
pmt_bins : np.ndarray,
pmt_charge : pd.Series,
sipm_bins : np.ndarray,
sipm_charge: pd.Series
def position_signal(triggers : List ,
pmt_bins : np.ndarray ,
pmt_charge : Union[pd.Series, np.ndarray],
sipm_bins : np.ndarray ,
sipm_charge: Union[pd.Series, np.ndarray]
) -> List[Tuple[np.ndarray, np.ndarray]]:
"""
Synchronises the SiPMs and PMTs for each identified
trigger and calls the padding function to fill with
zeros where necessary.
"""
pmt_q = np.asarray(pmt_charge.tolist())
sipm_q = np.empty((0,0))\
if sipm_charge.empty else np.asarray(sipm_charge.tolist())
if isinstance(pmt_charge, pd.Series):
pmt_q = np.asarray(pmt_charge.tolist())
sipm_q = np.empty((0,0))\
if sipm_charge.empty else np.asarray(sipm_charge.tolist())
else:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add a test for this, it should be very straightforward. Something similar to
test_buffer_calculator flow but just asserting that the output is the same when using pandas and numpy array as input?

pmt_q = pmt_charge
sipm_q = sipm_charge
return [generate_slice(trigger, pmt_bins, pmt_q, sipm_bins, sipm_q)
for trigger in triggers ]
return position_signal
51 changes: 51 additions & 0 deletions invisible_cities/detsim/buffer_functions_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,22 @@ def test_find_signal_start(binned_waveforms, signal_thresh):
assert np.all(pmt_sum[pulses] > signal_thresh)


@mark.parametrize("signal_thresh", (2, 10))
def test_find_signal_start_numpy(binned_waveforms, signal_thresh):

pmt_bins, pmt_wfs, *_ = binned_waveforms

buffer_length = 800 * units.mus
bin_width = np.diff(pmt_bins)[0]
stand_off = int(buffer_length / bin_width)

pmt_sum = pmt_wfs.sum()
pmt_wfs_np = np.asarray(pmt_wfs.tolist())
pulses = find_signal_start(pmt_wfs_np, signal_thresh, stand_off)

assert np.all(pmt_sum[pulses] > signal_thresh)


def test_find_signal_start_correct_index():

thresh = 5
Expand Down Expand Up @@ -109,3 +125,38 @@ def test_buffer_calculator(mc_waveforms, binned_waveforms,
assert evt_pmt .shape[1] == int(buffer_length / pmt_binwid)
assert evt_sipm.shape[1] == int(buffer_length / sipm_binwid)
assert np.sum(evt_pmt, axis=0)[pre_trg_samp] == pmt_sum[pulses[i]]


def test_buffer_calculator_pandas_numpy(mc_waveforms, binned_waveforms):

_, pmt_binwid, sipm_binwid, _ = mc_waveforms

pmt_bins, pmt_wfs, sipm_bins, sipm_wfs = binned_waveforms

buffer_length = 800 * units.mus
bin_width = np.diff(pmt_bins)[0]
stand_off = int(buffer_length / bin_width)
pre_trigger = 100 * units.mus
signal_thresh = 2

pulses_pd = find_signal_start(pmt_wfs, signal_thresh, stand_off)
pmt_nparr = np.asarray(pmt_wfs.tolist())
pulses_np = find_signal_start(pmt_nparr, signal_thresh, stand_off)

calculate_buffers = buffer_calculator(buffer_length,
pre_trigger ,
pmt_binwid ,
sipm_binwid )

buffers_pd = calculate_buffers(pulses_pd, *binned_waveforms)
buffers_np = calculate_buffers(pulses_np ,
pmt_bins ,
pmt_nparr ,
sipm_bins ,
np.asarray(sipm_wfs.tolist()))

assert len(buffers_pd) == len(buffers_np) == 1
evtpd_buffers = buffers_pd[0]
evtnp_buffers = buffers_np[0]
assert np.all([np.all(evtpd_buffers[0] == evtnp_buffers[0]),
np.all(evtpd_buffers[1] == evtnp_buffers[1])])