diff --git a/invisible_cities/reco/deconv_functions.py b/invisible_cities/reco/deconv_functions.py index dd0bcd3cf..799d8e08d 100644 --- a/invisible_cities/reco/deconv_functions.py +++ b/invisible_cities/reco/deconv_functions.py @@ -14,6 +14,7 @@ from scipy import ndimage as ndi from ..core .core_functions import shift_to_bin_centers +from .. core.core_functions import binedges_from_bincenters from ..core .core_functions import in_range from ..core .configure import check_annotations @@ -210,15 +211,17 @@ def deconvolution_input(data : Tuple[np.ndarray, ...], weight : np.ndarray ) -> Tuple[np.ndarray, Tuple[np.ndarray, ...]]: - ranges = [[coord.min() - 1.5 * sw, coord.max() + 1.5 * sw] for coord, sw in zip(data, sample_width)] - if inter_method in (InterpolationMethod.linear, InterpolationMethod.cubic, InterpolationMethod.nearest): - allbins = [np.arange(rang[0], rang[1] + np.finfo(np.float32).eps, sw) for rang, sw in zip(ranges, sample_width)] - Hs, edges = np.histogramdd(data, bins=allbins, normed=False, weights=weight) - elif inter_method is InterpolationMethod.nointerpolation: + eps = np.finfo(np.float32).eps + ranges = [ [ coord.min() - 1.5 * sw + , coord.max() + 1.5 * sw + eps] + for coord, sw in zip(data, sample_width) ] + if inter_method is InterpolationMethod.nointerpolation: allbins = [grid[in_range(grid, *rang)] for rang, grid in zip(ranges, det_grid)] + allbins = [binedges_from_bincenters(bins) for bins in allbins] Hs, edges = np.histogramdd(data, bins=allbins, normed=False, weights=weight) else: - raise ValueError(f'inter_method {inter_method} is not a valid interpolatin mode.') + allbins = [np.arange(*rang, sw) for rang, sw in zip(ranges, sample_width)] + Hs, edges = np.histogramdd(data, bins=allbins, normed=False, weights=weight) inter_points = np.meshgrid(*(shift_to_bin_centers(edge) for edge in edges), indexing='ij') inter_points = tuple (inter_p.flatten() for inter_p in inter_points) diff --git a/invisible_cities/reco/deconv_functions_test.py b/invisible_cities/reco/deconv_functions_test.py index 8841b7fea..b9c9b584f 100644 --- a/invisible_cities/reco/deconv_functions_test.py +++ b/invisible_cities/reco/deconv_functions_test.py @@ -6,6 +6,7 @@ from pytest import mark from pytest import raises +from pytest import fixture from hypothesis import given from hypothesis.strategies import floats @@ -140,31 +141,71 @@ def test_interpolate_signal(): assert np.allclose(grid , sorted(set(inter_position[1]))) -def test_deconvolution_input(data_hdst, data_hdst_deconvolved): +@fixture(scope="session") +def new_grid_1mm(): + bin_sizes = (1., 1.) + det_db = DataSiPM('new', 0) + det_grid = [ np.arange( det_db[var].min() - bs/2 + , det_db[var].max() + bs/2 + np.finfo(np.float32).eps, bs) + for var, bs in zip("X Y".split(), bin_sizes)] + return det_grid + + +@fixture(scope="session") +def data_hdst_first_peak(data_hdst): + event = 3021916 + peak = 0 + hdst = ( load_dst(data_hdst, 'RECO', 'Events') + .groupby("event npeak".split()) + .get_group((event, peak)) + .groupby(['X', 'Y']).Q.sum().reset_index() + .loc[lambda df: df.Q > 40] + ) + return hdst + + +def test_deconvolution_input_exact_result(data_hdst_first_peak, data_hdst_deconvolved, new_grid_1mm): + """ + Compare the output of the deconvolution with a reference output + stored in a file. + """ + hdst = data_hdst_first_peak ref_interpolation = np.load(data_hdst_deconvolved) - hdst = load_dst(data_hdst, 'RECO', 'Events') - h = hdst[(hdst.event == 3021916) & (hdst.npeak == 0)] - h = h.groupby(['X', 'Y']).Q.sum().reset_index() - h = h[h.Q > 40] - - det_db = DataSiPM('new', 0) - det_grid = [np.arange(det_db[var].min() + bs/2, det_db[var].max() - bs/2 + np.finfo(np.float32).eps, bs) - for var, bs in zip(['X', 'Y'], [1., 1.])] - interpolator = deconvolution_input([10., 10.], det_grid, InterpolationMethod.cubic) - inter = interpolator((h.X, h.Y), h.Q) + interpolator = deconvolution_input([10., 10.], new_grid_1mm, InterpolationMethod.cubic) + inter = interpolator((hdst.X, hdst.Y), hdst.Q) assert np.allclose(ref_interpolation['e_inter'], inter[0]) assert np.allclose(ref_interpolation['x_inter'], inter[1][0]) assert np.allclose(ref_interpolation['y_inter'], inter[1][1]) +@mark.parametrize("interp_method", InterpolationMethod) +def test_deconvolution_input_interpolation_method(data_hdst_first_peak, new_grid_1mm, interp_method): + """ + Check that it runs with all interpolation methods. + Select one event/peak from input and apply a cut to obtain an + input image that results in round numbers. + """ + hdst = data_hdst_first_peak + interpolator = deconvolution_input([10., 10.], new_grid_1mm, interp_method) + output = interpolator((hdst.X, hdst.Y), hdst.Q) + + shape = 120, 120 + nelements = shape[0] * shape[1] + assert output[0].shape == shape + assert len(output[1]) == 2 + assert output[1][0].shape == (nelements,) + assert output[1][1].shape == (nelements,) + + @mark.parametrize("interp_method", InterpolationMethod.__members__) -def test_deconvolution_input_interpolation_method(data_hdst, data_hdst_deconvolved, interp_method): +def test_deconvolution_input_wrong_interpolation_method_raises(interp_method): with raises(ValueError): deconvolution_input([10., 10.], [1., 1.], interp_method) +#TODO: use data_hdst_first_peak and new_grid_1mm here too, if possible def test_deconvolve(data_hdst, data_hdst_deconvolved): ref_interpolation = np.load (data_hdst_deconvolved)