Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Fix non-interpolated deconvolution #906

Merged
merged 7 commits into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions invisible_cities/reco/deconv_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from scipy import ndimage as ndi

from ..core .core_functions import shift_to_bin_centers
from .. core.core_functions import binedges_from_bincenters
from ..core .core_functions import in_range
from ..core .configure import check_annotations

Expand Down Expand Up @@ -210,15 +211,17 @@ def deconvolution_input(data : Tuple[np.ndarray, ...],
weight : np.ndarray
) -> Tuple[np.ndarray, Tuple[np.ndarray, ...]]:

ranges = [[coord.min() - 1.5 * sw, coord.max() + 1.5 * sw] for coord, sw in zip(data, sample_width)]
if inter_method in (InterpolationMethod.linear, InterpolationMethod.cubic, InterpolationMethod.nearest):
allbins = [np.arange(rang[0], rang[1] + np.finfo(np.float32).eps, sw) for rang, sw in zip(ranges, sample_width)]
Hs, edges = np.histogramdd(data, bins=allbins, normed=False, weights=weight)
elif inter_method is InterpolationMethod.nointerpolation:
eps = np.finfo(np.float32).eps
ranges = [ [ coord.min() - 1.5 * sw
, coord.max() + 1.5 * sw + eps]
jwaiton marked this conversation as resolved.
Show resolved Hide resolved
for coord, sw in zip(data, sample_width) ]
if inter_method is InterpolationMethod.nointerpolation:
allbins = [grid[in_range(grid, *rang)] for rang, grid in zip(ranges, det_grid)]
allbins = [binedges_from_bincenters(bins) for bins in allbins]
Hs, edges = np.histogramdd(data, bins=allbins, normed=False, weights=weight)
else:
raise ValueError(f'inter_method {inter_method} is not a valid interpolatin mode.')
allbins = [np.arange(*rang, sw) for rang, sw in zip(ranges, sample_width)]
Hs, edges = np.histogramdd(data, bins=allbins, normed=False, weights=weight)
gonzaponte marked this conversation as resolved.
Show resolved Hide resolved

inter_points = np.meshgrid(*(shift_to_bin_centers(edge) for edge in edges), indexing='ij')
inter_points = tuple (inter_p.flatten() for inter_p in inter_points)
Expand Down
65 changes: 53 additions & 12 deletions invisible_cities/reco/deconv_functions_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from pytest import mark
from pytest import raises
from pytest import fixture

from hypothesis import given
from hypothesis.strategies import floats
Expand Down Expand Up @@ -140,31 +141,71 @@ def test_interpolate_signal():
assert np.allclose(grid , sorted(set(inter_position[1])))


def test_deconvolution_input(data_hdst, data_hdst_deconvolved):
@fixture(scope="session")
def new_grid_1mm():
bin_sizes = (1., 1.)
det_db = DataSiPM('new', 0)
det_grid = [ np.arange( det_db[var].min() - bs/2
, det_db[var].max() + bs/2 + np.finfo(np.float32).eps, bs)
for var, bs in zip("X Y".split(), bin_sizes)]
return det_grid


@fixture(scope="session")
def data_hdst_first_peak(data_hdst):
event = 3021916
peak = 0
hdst = ( load_dst(data_hdst, 'RECO', 'Events')
.groupby("event npeak".split())
.get_group((event, peak))
.groupby(['X', 'Y']).Q.sum().reset_index()
.loc[lambda df: df.Q > 40]
)
return hdst


def test_deconvolution_input_exact_result(data_hdst_first_peak, data_hdst_deconvolved, new_grid_1mm):
"""
Compare the output of the deconvolution with a reference output
stored in a file.
"""
hdst = data_hdst_first_peak
ref_interpolation = np.load(data_hdst_deconvolved)

hdst = load_dst(data_hdst, 'RECO', 'Events')
h = hdst[(hdst.event == 3021916) & (hdst.npeak == 0)]
h = h.groupby(['X', 'Y']).Q.sum().reset_index()
h = h[h.Q > 40]

det_db = DataSiPM('new', 0)
det_grid = [np.arange(det_db[var].min() + bs/2, det_db[var].max() - bs/2 + np.finfo(np.float32).eps, bs)
for var, bs in zip(['X', 'Y'], [1., 1.])]
interpolator = deconvolution_input([10., 10.], det_grid, InterpolationMethod.cubic)
inter = interpolator((h.X, h.Y), h.Q)
interpolator = deconvolution_input([10., 10.], new_grid_1mm, InterpolationMethod.cubic)
inter = interpolator((hdst.X, hdst.Y), hdst.Q)

assert np.allclose(ref_interpolation['e_inter'], inter[0])
assert np.allclose(ref_interpolation['x_inter'], inter[1][0])
assert np.allclose(ref_interpolation['y_inter'], inter[1][1])


@mark.parametrize("interp_method", InterpolationMethod)
def test_deconvolution_input_interpolation_method(data_hdst_first_peak, new_grid_1mm, interp_method):
"""
Check that it runs with all interpolation methods.
Select one event/peak from input and apply a cut to obtain an
input image that results in round numbers.
"""
hdst = data_hdst_first_peak
interpolator = deconvolution_input([10., 10.], new_grid_1mm, interp_method)
output = interpolator((hdst.X, hdst.Y), hdst.Q)

shape = 120, 120
nelements = shape[0] * shape[1]
assert output[0].shape == shape
assert len(output[1]) == 2
assert output[1][0].shape == (nelements,)
assert output[1][1].shape == (nelements,)


@mark.parametrize("interp_method", InterpolationMethod.__members__)
def test_deconvolution_input_interpolation_method(data_hdst, data_hdst_deconvolved, interp_method):
def test_deconvolution_input_wrong_interpolation_method_raises(interp_method):
with raises(ValueError):
deconvolution_input([10., 10.], [1., 1.], interp_method)


#TODO: use data_hdst_first_peak and new_grid_1mm here too, if possible
def test_deconvolve(data_hdst, data_hdst_deconvolved):
ref_interpolation = np.load (data_hdst_deconvolved)

Expand Down
Loading