From 23cff13ce1f55cedb4806f6599475d89dc95b7d6 Mon Sep 17 00:00:00 2001 From: Matthias Geier Date: Wed, 16 Nov 2016 22:50:31 +0100 Subject: [PATCH] Add DelayedSignal class and as_delayed_signal() --- examples/time_domain.py | 15 ++++---- sfs/defs.py | 3 -- sfs/time/drivingfunction.py | 58 ++++++++++++++----------------- sfs/time/soundfield.py | 30 ++++++++-------- sfs/time/source.py | 27 +++++++-------- sfs/util.py | 69 +++++++++++++++++++++++++++++++++++++ 6 files changed, 129 insertions(+), 73 deletions(-) diff --git a/examples/time_domain.py b/examples/time_domain.py index d790372..2edb23f 100644 --- a/examples/time_domain.py +++ b/examples/time_domain.py @@ -15,18 +15,18 @@ N = 56 # number of secondary sources R = 1.5 # radius of spherical/circular array x0, n0, a0 = sfs.array.circular(N, R) # get secondary source positions +fs = 44100 # sampling rate -# create dirac -signal = [1] +# unit impulse +signal = [1], fs # POINT SOURCE xs = [2, 2, 0] # position of virtual source t = 0.008 # compute driving signals d_delay, d_weight = sfs.time.drivingfunction.wfs_25d_point(x0, n0, xs) -d, t_offset = sfs.time.drivingfunction.driving_signals(d_delay, d_weight, - signal) -t -= t_offset +d = sfs.time.drivingfunction.driving_signals(d_delay, d_weight, signal) + # test soundfield a = sfs.mono.drivingfunction.source_selection_point(n0, x0, xs) twin = sfs.tapering.tukey(a, .3) @@ -48,9 +48,8 @@ # compute driving signals d_delay, d_weight = sfs.time.drivingfunction.wfs_25d_plane(x0, n0, npw) -d, t_offset = sfs.time.drivingfunction.driving_signals(d_delay, - d_weight, signal) -t -= t_offset +d = sfs.time.drivingfunction.driving_signals(d_delay, d_weight, signal) + # test soundfield a = sfs.mono.drivingfunction.source_selection_plane(n0, npw) twin = sfs.tapering.tukey(a, .3) diff --git a/sfs/defs.py b/sfs/defs.py index 0365e45..c7a36ad 100644 --- a/sfs/defs.py +++ b/sfs/defs.py @@ -8,6 +8,3 @@ # tolerance used for secondary source selection selection_tolerance = 1e-6 - -# sampling frequency -fs = 44100 diff --git a/sfs/time/drivingfunction.py b/sfs/time/drivingfunction.py index 88d5c70..7bff1a7 100644 --- a/sfs/time/drivingfunction.py +++ b/sfs/time/drivingfunction.py @@ -130,7 +130,7 @@ def wfs_25d_point(x0, n0, xs, xref=[0, 0, 0], c=None): return delays, weights -def driving_signals(delays, weights, signal, fs=None): +def driving_signals(delays, weights, signal): """Get driving signals per secondary source. Returned signals are the delayed and weighted mono input signal @@ -142,58 +142,52 @@ def driving_signals(delays, weights, signal, fs=None): Delay in seconds for each channel, negative values allowed. weights : (C,) array_like Amplitude weighting factor for each channel. - signal : (N,) array_like - Excitation signal (mono) which gets weighted and delayed. - fs: int, optional - Sampling frequency in Hertz. + signal : tuple of (N,) array_like, followed by 1 or 2 scalars + Excitation signal consisting of (mono) audio data, sampling rate + (in Hertz) and optional starting time (in seconds). Returns ------- - driving_signals : (N, C) numpy.ndarray - Driving signal per channel (column represents channel). - t_offset : float - Simulation point in time offset (seconds). + `DelayedSignal` + A tuple containing the driving signals (in a `numpy.ndarray` + with shape ``(N, C)``), followed by the sampling rate (in Hertz) + and a (possibly negative) time offset (in seconds). """ delays = util.asarray_1d(delays) weights = util.asarray_1d(weights) - d, t_offset = apply_delays(signal, delays, fs) - return d * weights, t_offset + data, samplerate, signal_offset = apply_delays(signal, delays) + return util.DelayedSignal(data * weights, samplerate, signal_offset) -def apply_delays(signal, delays, fs=None): +def apply_delays(signal, delays): """Apply delays for every channel. - A mono input signal gets delayed for each channel individually. The - simultation point in time is shifted by the smallest delay provided, - which allows negative delays as well. - Parameters ---------- - signal : (N,) array_like - Mono excitation signal (with N samples) which gets delayed. + signal : tuple of (N,) array_like, followed by 1 or 2 scalars + Excitation signal consisting of (mono) audio data, sampling rate + (in Hertz) and optional starting time (in seconds). delays : (C,) array_like Delay in seconds for each channel (C), negative values allowed. - fs: int, optional - Sampling frequency in Hertz. Returns ------- - out : (N, C) numpy.ndarray - Output signals (column represents channel). - t_offset : float - Simulation point in time offset (seconds). + `DelayedSignal` + A tuple containing the delayed signals (in a `numpy.ndarray` + with shape ``(N, C)``), followed by the sampling rate (in Hertz) + and a (possibly negative) time offset (in seconds). """ - if fs is None: - fs = defs.fs - signal = util.asarray_1d(signal) + data, samplerate, initial_offset = util.as_delayed_signal(signal) + data = util.asarray_1d(data) delays = util.asarray_1d(delays) + delays += initial_offset - delays_samples = np.rint(fs * delays).astype(int) + delays_samples = np.rint(samplerate * delays).astype(int) offset_samples = delays_samples.min() delays_samples -= offset_samples - out = np.zeros((delays_samples.max() + len(signal), len(delays_samples))) - for channel, cdelay in enumerate(delays_samples): - out[cdelay:cdelay + len(signal), channel] = signal - return out, offset_samples / fs + out = np.zeros((delays_samples.max() + len(data), len(delays_samples))) + for column, row in enumerate(delays_samples): + out[row:row + len(data), column] = data + return util.DelayedSignal(out, samplerate, offset_samples / samplerate) diff --git a/sfs/time/soundfield.py b/sfs/time/soundfield.py index a223a66..a34416c 100644 --- a/sfs/time/soundfield.py +++ b/sfs/time/soundfield.py @@ -1,26 +1,26 @@ """Compute sound field.""" from __future__ import division -import numpy as np from .. import util from .. import defs from .source import point -def p_array(x0, d, channel_weights, t, grid, source=point, fs=None, c=None): +def p_array(x0, signals, weights, observation_time, grid, source=point, + c=None): """Compute sound field for an array of secondary sources. Parameters ---------- x0 : (N, 3) array_like Sequence of secondary source positions. - d : (N, C) array_like - Specifies the signals (with N samples) fed into each secondary - source channel C (columns). - channel_weights : (C,) array_like + signals : tuple of (N, C) array_like, followed by 1 or 2 scalars + Driving signals consisting of audio data (C channels), sampling + rate (in Hertz) and optional starting time (in seconds). + weights : (C,) array_like Additional weights applied during integration, e.g. source tapering. - t : float + observation_time : float Simulation point in time (seconds). grid : triple of array_like The grid that is used for the sound field calculations. @@ -28,8 +28,6 @@ def p_array(x0, d, channel_weights, t, grid, source=point, fs=None, c=None): source: function, optional Source type is a function, returning scalar field. For default, see `sfs.time.source.point()`. - fs: int, optional - Sampling frequency in Hertz. c : float, optional Speed of sound. @@ -39,19 +37,19 @@ def p_array(x0, d, channel_weights, t, grid, source=point, fs=None, c=None): Sound pressure at grid positions. """ - if fs is None: - fs = defs.fs if c is None: c = defs.c x0 = util.asarray_of_rows(x0) - channel_weights = util.asarray_1d(channel_weights) - d = np.asarray(d) - if not (len(channel_weights) == len(x0) == d.shape[1]): + data, samplerate, signal_offset = util.as_delayed_signal(signals) + weights = util.asarray_1d(weights) + channels = data.T + if not (len(weights) == len(x0) == len(channels)): raise ValueError("Length mismatch") # synthesize soundfield p = 0 - for signal, weight, position in zip(d.T, channel_weights, x0): + for channel, weight, position in zip(channels, weights, x0): if weight != 0: - p_s = source(position, signal, t, grid, fs, c) + signal = channel, samplerate, signal_offset + p_s = source(position, signal, observation_time, grid, c) p += p_s * weight # integrate over secondary sources return p diff --git a/sfs/time/source.py b/sfs/time/source.py index 2d66a53..6bce271 100644 --- a/sfs/time/source.py +++ b/sfs/time/source.py @@ -12,7 +12,7 @@ from .. import defs -def point(xs, signal, t, grid, fs=None, c=None): +def point(xs, signal, observation_time, grid, c=None): r"""Source model for a point source: 3D Green's function. Calculates the scalar sound pressure field for a given point in @@ -22,15 +22,14 @@ def point(xs, signal, t, grid, fs=None, c=None): ---------- xs : (3,) array_like Position of source in cartesian coordinates. - signal : (N,) array_like - Excitation signal. - t : float + signal : tuple of (N,) array_like, followed by 1 or 2 scalars + Excitation signal consisting of (mono) audio data, sampling rate + (in Hertz) and optional starting time (in seconds). + observation_time : float Observed point in time. grid : triple of array_like The grid that is used for the sound field calculations. See `sfs.util.xyz_grid()`. - fs: int, optional - Sampling frequency in Hertz. c : float, optional Speed of sound. @@ -49,16 +48,16 @@ def point(xs, signal, t, grid, fs=None, c=None): """ xs = util.asarray_1d(xs) - signal = util.asarray_1d(signal) + data, samplerate, signal_offset = util.as_delayed_signal(signal) + data = util.asarray_1d(data) grid = util.as_xyz_components(grid) - if fs is None: - fs = defs.fs if c is None: c = defs.c r = np.linalg.norm(grid - xs) # evaluate g over grid - g_amplitude = 1 / (4 * np.pi * r) - g_time = r / c - p = np.interp(t - g_time, np.arange(len(signal)) / fs, signal, - left=0, right=0) - return p * g_amplitude + weights = 1 / (4 * np.pi * r) + delays = r / c + base_time = observation_time - signal_offset + return weights * np.interp(base_time - delays, + np.arange(len(data)) / samplerate, + data, left=0, right=0) diff --git a/sfs/util.py b/sfs/util.py index 86b0683..64429f3 100644 --- a/sfs/util.py +++ b/sfs/util.py @@ -1,5 +1,6 @@ """Various utility functions.""" +import collections import numpy as np from . import defs @@ -116,6 +117,59 @@ def as_xyz_components(components, **kwargs): return XyzComponents([np.asarray(c, **kwargs) for c in components]) +def as_delayed_signal(arg, **kwargs): + """Make sure that the given argument can be used as a signal. + + Parameters + ---------- + arg : sequence of 1 array_like followed by 1 or 2 scalars + The first element is converted to a NumPy array, the second + element is used a the sampling rate (in Hertz) and the optional + third element is used as the starting time of the signal (in + seconds). Default starting time is 0. + **kwargs + All keyword arguments are forwarded to :func:`numpy.asarray`. + + Returns + ------- + `DelayedSignal` + A named tuple consisting of a `numpy.ndarray` containing the + audio data, followed by the sampling rate and the starting time + of the signal. + + Examples + -------- + Typically, this is used together with tuple unpacking to assign the + audio data, the sampling rate and the starting time to separate + variables: + + >>> import sfs + >>> sig = [1], 44100 + >>> data, fs, signal_offset = sfs.util.as_delayed_signal(sig) + >>> data + array([1]) + >>> fs + 44100 + >>> signal_offset + 0 + + """ + try: + # In Python 3, this could be: data, samplerate, *time = arg + data, samplerate, time = arg[0], arg[1], arg[2:] + time, = time or [0] + except (IndexError, TypeError, ValueError): + pass + else: + valid_arguments = (not np.isscalar(data) and + np.isscalar(samplerate) and + np.isscalar(time)) + if valid_arguments: + data = np.asarray(data, **kwargs) + return DelayedSignal(data, samplerate, time) + raise TypeError('expected audio data, samplerate, optional start time') + + def strict_arange(start, stop, step=1, endpoint=False, dtype=None, **kwargs): """Like :func:`numpy.arange`, but compensating numeric errors. @@ -346,3 +400,18 @@ def apply(self, func, *args, **kwargs): """ return XyzComponents([func(i, *args, **kwargs) for i in self]) + + +DelayedSignal = collections.namedtuple('DelayedSignal', 'data samplerate time') +"""A tuple of audio data, sampling rate and start time. + +This class (a `collections.namedtuple`) is not meant to be instantiated +by users. + +To pass a signal to a function, just use a simple `tuple` or `list` +containing the audio data and the sampling rate, with an optional +starting time (in seconds) as a third item. +If you want to ensure that a given variable contains a valid signal, use +`sfs.util.as_delayed_signal()`. + +"""