Skip to content

Add signal class and helper function #24

New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Merged
merged 1 commit into from
Feb 10, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 7 additions & 8 deletions examples/time_domain.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,18 @@
N = 56 # number of secondary sources
R = 1.5 # radius of spherical/circular array
x0, n0, a0 = sfs.array.circular(N, R) # get secondary source positions
fs = 44100 # sampling rate

# create dirac
signal = [1]
# unit impulse
signal = [1], fs

# POINT SOURCE
xs = [2, 2, 0] # position of virtual source
t = 0.008
# compute driving signals
d_delay, d_weight = sfs.time.drivingfunction.wfs_25d_point(x0, n0, xs)
d, t_offset = sfs.time.drivingfunction.driving_signals(d_delay, d_weight,
signal)
t -= t_offset
d = sfs.time.drivingfunction.driving_signals(d_delay, d_weight, signal)

# test soundfield
a = sfs.mono.drivingfunction.source_selection_point(n0, x0, xs)
twin = sfs.tapering.tukey(a, .3)
Expand All @@ -48,9 +48,8 @@

# compute driving signals
d_delay, d_weight = sfs.time.drivingfunction.wfs_25d_plane(x0, n0, npw)
d, t_offset = sfs.time.drivingfunction.driving_signals(d_delay,
d_weight, signal)
t -= t_offset
d = sfs.time.drivingfunction.driving_signals(d_delay, d_weight, signal)

# test soundfield
a = sfs.mono.drivingfunction.source_selection_plane(n0, npw)
twin = sfs.tapering.tukey(a, .3)
Expand Down
3 changes: 0 additions & 3 deletions sfs/defs.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,3 @@

# tolerance used for secondary source selection
selection_tolerance = 1e-6

# sampling frequency
fs = 44100
58 changes: 26 additions & 32 deletions sfs/time/drivingfunction.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ def wfs_25d_point(x0, n0, xs, xref=[0, 0, 0], c=None):
return delays, weights


def driving_signals(delays, weights, signal, fs=None):
def driving_signals(delays, weights, signal):
"""Get driving signals per secondary source.

Returned signals are the delayed and weighted mono input signal
Expand All @@ -142,58 +142,52 @@ def driving_signals(delays, weights, signal, fs=None):
Delay in seconds for each channel, negative values allowed.
weights : (C,) array_like
Amplitude weighting factor for each channel.
signal : (N,) array_like
Excitation signal (mono) which gets weighted and delayed.
fs: int, optional
Sampling frequency in Hertz.
signal : tuple of (N,) array_like, followed by 1 or 2 scalars
Excitation signal consisting of (mono) audio data, sampling rate
(in Hertz) and optional starting time (in seconds).

Returns
-------
driving_signals : (N, C) numpy.ndarray
Driving signal per channel (column represents channel).
t_offset : float
Simulation point in time offset (seconds).
`DelayedSignal`
A tuple containing the driving signals (in a `numpy.ndarray`
with shape ``(N, C)``), followed by the sampling rate (in Hertz)
and a (possibly negative) time offset (in seconds).

"""
delays = util.asarray_1d(delays)
weights = util.asarray_1d(weights)
d, t_offset = apply_delays(signal, delays, fs)
return d * weights, t_offset
data, samplerate, signal_offset = apply_delays(signal, delays)
return util.DelayedSignal(data * weights, samplerate, signal_offset)


def apply_delays(signal, delays, fs=None):
def apply_delays(signal, delays):
"""Apply delays for every channel.

A mono input signal gets delayed for each channel individually. The
simultation point in time is shifted by the smallest delay provided,
which allows negative delays as well.

Parameters
----------
signal : (N,) array_like
Mono excitation signal (with N samples) which gets delayed.
signal : tuple of (N,) array_like, followed by 1 or 2 scalars
Excitation signal consisting of (mono) audio data, sampling rate
(in Hertz) and optional starting time (in seconds).
delays : (C,) array_like
Delay in seconds for each channel (C), negative values allowed.
fs: int, optional
Sampling frequency in Hertz.

Returns
-------
out : (N, C) numpy.ndarray
Output signals (column represents channel).
t_offset : float
Simulation point in time offset (seconds).
`DelayedSignal`
A tuple containing the delayed signals (in a `numpy.ndarray`
with shape ``(N, C)``), followed by the sampling rate (in Hertz)
and a (possibly negative) time offset (in seconds).

"""
if fs is None:
fs = defs.fs
signal = util.asarray_1d(signal)
data, samplerate, initial_offset = util.as_delayed_signal(signal)
data = util.asarray_1d(data)
delays = util.asarray_1d(delays)
delays += initial_offset

delays_samples = np.rint(fs * delays).astype(int)
delays_samples = np.rint(samplerate * delays).astype(int)
offset_samples = delays_samples.min()
delays_samples -= offset_samples
out = np.zeros((delays_samples.max() + len(signal), len(delays_samples)))
for channel, cdelay in enumerate(delays_samples):
out[cdelay:cdelay + len(signal), channel] = signal
return out, offset_samples / fs
out = np.zeros((delays_samples.max() + len(data), len(delays_samples)))
for column, row in enumerate(delays_samples):
out[row:row + len(data), column] = data
return util.DelayedSignal(out, samplerate, offset_samples / samplerate)
30 changes: 14 additions & 16 deletions sfs/time/soundfield.py
Original file line number Diff line number Diff line change
@@ -1,35 +1,33 @@
"""Compute sound field."""

from __future__ import division
import numpy as np
from .. import util
from .. import defs
from .source import point


def p_array(x0, d, channel_weights, t, grid, source=point, fs=None, c=None):
def p_array(x0, signals, weights, observation_time, grid, source=point,
c=None):
"""Compute sound field for an array of secondary sources.

Parameters
----------
x0 : (N, 3) array_like
Sequence of secondary source positions.
d : (N, C) array_like
Specifies the signals (with N samples) fed into each secondary
source channel C (columns).
channel_weights : (C,) array_like
signals : tuple of (N, C) array_like, followed by 1 or 2 scalars
Driving signals consisting of audio data (C channels), sampling
rate (in Hertz) and optional starting time (in seconds).
weights : (C,) array_like
Additional weights applied during integration, e.g. source
tapering.
t : float
observation_time : float
Simulation point in time (seconds).
grid : triple of array_like
The grid that is used for the sound field calculations.
See `sfs.util.xyz_grid()`.
source: function, optional
Source type is a function, returning scalar field.
For default, see `sfs.time.source.point()`.
fs: int, optional
Sampling frequency in Hertz.
c : float, optional
Speed of sound.

Expand All @@ -39,19 +37,19 @@ def p_array(x0, d, channel_weights, t, grid, source=point, fs=None, c=None):
Sound pressure at grid positions.

"""
if fs is None:
fs = defs.fs
if c is None:
c = defs.c
x0 = util.asarray_of_rows(x0)
channel_weights = util.asarray_1d(channel_weights)
d = np.asarray(d)
if not (len(channel_weights) == len(x0) == d.shape[1]):
data, samplerate, signal_offset = util.as_delayed_signal(signals)
weights = util.asarray_1d(weights)
channels = data.T
if not (len(weights) == len(x0) == len(channels)):
raise ValueError("Length mismatch")
# synthesize soundfield
p = 0
for signal, weight, position in zip(d.T, channel_weights, x0):
for channel, weight, position in zip(channels, weights, x0):
if weight != 0:
p_s = source(position, signal, t, grid, fs, c)
signal = channel, samplerate, signal_offset
p_s = source(position, signal, observation_time, grid, c)
p += p_s * weight # integrate over secondary sources
return p
27 changes: 13 additions & 14 deletions sfs/time/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from .. import defs


def point(xs, signal, t, grid, fs=None, c=None):
def point(xs, signal, observation_time, grid, c=None):
r"""Source model for a point source: 3D Green's function.

Calculates the scalar sound pressure field for a given point in
Expand All @@ -22,15 +22,14 @@ def point(xs, signal, t, grid, fs=None, c=None):
----------
xs : (3,) array_like
Position of source in cartesian coordinates.
signal : (N,) array_like
Excitation signal.
t : float
signal : tuple of (N,) array_like, followed by 1 or 2 scalars
Excitation signal consisting of (mono) audio data, sampling rate
(in Hertz) and optional starting time (in seconds).
observation_time : float
Observed point in time.
grid : triple of array_like
The grid that is used for the sound field calculations.
See `sfs.util.xyz_grid()`.
fs: int, optional
Sampling frequency in Hertz.
c : float, optional
Speed of sound.

Expand All @@ -49,16 +48,16 @@ def point(xs, signal, t, grid, fs=None, c=None):

"""
xs = util.asarray_1d(xs)
signal = util.asarray_1d(signal)
data, samplerate, signal_offset = util.as_delayed_signal(signal)
data = util.asarray_1d(data)
grid = util.as_xyz_components(grid)
if fs is None:
fs = defs.fs
if c is None:
c = defs.c
r = np.linalg.norm(grid - xs)
# evaluate g over grid
g_amplitude = 1 / (4 * np.pi * r)
g_time = r / c
p = np.interp(t - g_time, np.arange(len(signal)) / fs, signal,
left=0, right=0)
return p * g_amplitude
weights = 1 / (4 * np.pi * r)
delays = r / c
base_time = observation_time - signal_offset
return weights * np.interp(base_time - delays,
np.arange(len(data)) / samplerate,
data, left=0, right=0)
69 changes: 69 additions & 0 deletions sfs/util.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Various utility functions."""

import collections
import numpy as np
from . import defs

Expand Down Expand Up @@ -116,6 +117,59 @@ def as_xyz_components(components, **kwargs):
return XyzComponents([np.asarray(c, **kwargs) for c in components])


def as_delayed_signal(arg, **kwargs):
"""Make sure that the given argument can be used as a signal.

Parameters
----------
arg : sequence of 1 array_like followed by 1 or 2 scalars
The first element is converted to a NumPy array, the second
element is used a the sampling rate (in Hertz) and the optional
third element is used as the starting time of the signal (in
seconds). Default starting time is 0.
**kwargs
All keyword arguments are forwarded to :func:`numpy.asarray`.

Returns
-------
`DelayedSignal`
A named tuple consisting of a `numpy.ndarray` containing the
audio data, followed by the sampling rate and the starting time
of the signal.

Examples
--------
Typically, this is used together with tuple unpacking to assign the
audio data, the sampling rate and the starting time to separate
variables:

>>> import sfs
>>> sig = [1], 44100
>>> data, fs, signal_offset = sfs.util.as_delayed_signal(sig)
>>> data
array([1])
>>> fs
44100
>>> signal_offset
0

"""
try:
# In Python 3, this could be: data, samplerate, *time = arg
data, samplerate, time = arg[0], arg[1], arg[2:]
time, = time or [0]
except (IndexError, TypeError, ValueError):
pass
else:
valid_arguments = (not np.isscalar(data) and
np.isscalar(samplerate) and
np.isscalar(time))
if valid_arguments:
data = np.asarray(data, **kwargs)
return DelayedSignal(data, samplerate, time)
raise TypeError('expected audio data, samplerate, optional start time')


def strict_arange(start, stop, step=1, endpoint=False, dtype=None, **kwargs):
"""Like :func:`numpy.arange`, but compensating numeric errors.

Expand Down Expand Up @@ -346,3 +400,18 @@ def apply(self, func, *args, **kwargs):

"""
return XyzComponents([func(i, *args, **kwargs) for i in self])


DelayedSignal = collections.namedtuple('DelayedSignal', 'data samplerate time')
"""A tuple of audio data, sampling rate and start time.

This class (a `collections.namedtuple`) is not meant to be instantiated
by users.

To pass a signal to a function, just use a simple `tuple` or `list`
containing the audio data and the sampling rate, with an optional
starting time (in seconds) as a third item.
If you want to ensure that a given variable contains a valid signal, use
`sfs.util.as_delayed_signal()`.

"""