# Copyright 2021 Sean Robertson
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Filters and filter banks"""
import abc
import math
from typing import Mapping, Optional, Tuple, Union
import numpy as np
import pydrobert.speech.config as config
from pydrobert.speech.alias import AliasedFactory, alias_factory_subclass_from_arg
from pydrobert.speech.scales import MelScaling
from pydrobert.speech.scales import ScalingFunction
from pydrobert.speech.util import angular_to_hertz
from pydrobert.speech.util import hertz_to_angular
__all__ = [
"BartlettWindow",
"BlackmanWindow",
"ComplexGammatoneFilterBank",
"GaborFilterBank",
"GammaWindow",
"HammingWindow",
"HannWindow",
"LinearFilterBank",
"TriangularOverlappingFilterBank",
"WindowFunction",
]
# banks
[docs]
class LinearFilterBank(AliasedFactory):
"""A collection of linear, time invariant filters
A :class:`LinearFilterBank` instance is expected to provide factory methods for
instantiating a fixed number of LTI filters in either the time or frequency domain.
Filters should be organized lowest frequency first.
"""
@abc.abstractproperty
def is_real(self) -> bool:
"""bool : Whether the filters are real or complex"""
pass
@abc.abstractproperty
def is_analytic(self) -> bool:
"""bool : Whether the filters are (approximately) analytic
An analytic signal has no negative frequency components. A real signal cannot
be analytic.
"""
pass
@abc.abstractproperty
def is_zero_phase(self) -> bool:
"""bool : Whether the filters are zero phase or not
Zero phase filters are even functions with no imaginary part in the fourier
domain. Their impulse responses center around 0.
"""
pass
@abc.abstractproperty
def num_filts(self) -> int:
"""int : Number of filters in the bank"""
pass
@abc.abstractproperty
def sampling_rate(self) -> float:
"""float : Number of samples in a second of a target recording"""
pass
@abc.abstractproperty
def supports_hz(self) -> Tuple[Tuple[float, float], ...]:
"""tuple : Boundaries of effective support of filter freq responses, in Hz.
Returns a tuple of length `num_filts` containing pairs of floats of the low and
high frequencies. Frequencies outside the span have a response of approximately
(with magnitude up to :obj:`pydrobert.speech.EFFECTIVE_SUPPORT_SIGNAL`) zero.
The boundaries need not be tight, i.e. the region inside the boundaries could be
zero. It is more important to guarantee that the region outside the boundaries
is approximately zero.
The boundaries ignore the Hermitian symmetry of the filter if it is real. Bounds
of ``(10, 20)`` for a real filter imply that the region ``(-20, -10)`` could
also be nonzero.
The user is responsible for adjusting the for the periodicity induced by
sampling. For example, if the boundaries are ``(-5, 10)`` and the filter is
sampled at 15Hz, then all bins of an associated DFT could be nonzero.
"""
pass
@abc.abstractproperty
def supports(self) -> Tuple[Tuple[float, float], ...]:
"""tuple : Boundaries of effective support of filter impulse resps, in samples
Returns a tuple of length `num_filts` containing pairs of integers of the first
and last (effectively) nonzero samples.
The boundaries need not be tight, i.e. the region inside the boundaries could be
zero. It is more important to guarantee that the region outside the boundaries
is approximately zero.
If a filter is instantiated using a buffer that is unable to fully contain the
supported region, samples will wrap around the boundaries of the buffer.
Noncausal filters will have start indices less than 0. These samples will wrap
to the end of the filter buffer when the filter is instantiated.
"""
pass
@property
def supports_ms(self) -> Tuple[Tuple[float, float], ...]:
"""tuple : Boundaries of effective support of filter impulse resps, in ms"""
return tuple(
(s[0] * 1000 / self.sampling_rate, s[1] * 1000 / self.sampling_rate,)
for s in self.supports
)
[docs]
@abc.abstractmethod
def get_impulse_response(self, filt_idx: int, width: int) -> np.ndarray:
"""Construct filter impulse response in a fixed-width buffer
Construct the filter in the time domain.
Parameters
----------
filt_idx
The index of the filter to generate. Less than `num_filts`
width
The length of the buffer, in samples. If less than the support of the
filter, the filter will alias.
Returns
-------
ir : np.ndarray
1D float64 or complex128 numpy array of length `width`
"""
pass
[docs]
@abc.abstractmethod
def get_frequency_response(
self, filt_idx: int, width: int, half: bool = False
) -> np.ndarray:
"""Construct filter frequency response in a fixed-width buffer
Construct the 2pi-periodized filter in the frequency domain. Zero-phase filters
are returned as 8-byte float arrays. Otherwise, they will be 16-byte complex
floats.
Parameters
----------
filt_idx
The index of the filter to generate. Less than `num_filts`
width
The length of the DFT to output
half
Whether to return only the DFT bins between ``[0,pi]``
Returns
-------
fr : np.ndarray
If `half` is :obj:`False`, returns a 1D float64 or complex128 numpy array of
length `width`. If `half` is :obj:`True` and `width` is even, the returned
array is of length ``width // 2 + 1``. If `width` is odd, the returned array
is of length ``(width + 1) // 2``.
"""
pass
[docs]
@abc.abstractmethod
def get_truncated_response(
self, filt_idx: int, width: int
) -> Tuple[int, np.ndarray]:
"""Get nonzero region of filter frequency response
Many filters will be compactly supported in frequency (or approximately so).
This method generates a tuple `(bin_idx, buf)` of the nonzero region.
In the case of a complex filter, ``bin_idx + len(buf)`` may be greater than
`width`; the filter wraps around in this case. The full frequency response can
be calculated from the truncated response by:
>>> bin_idx, trnc = bank.get_truncated_response(filt_idx, width)
>>> full = numpy.zeros(width, dtype=trnc.dtype)
>>> wrap = min(bin_idx + len(trnc), width) - bin_idx
>>> full[bin_idx:bin_idx + wrap] = trnc[:wrap]
>>> full[:len(trnc) - wrap] = tnc[wrap:]
In the case of a real filter, only the nonzero region between ``[0, pi]``
(half-spectrum) is returned. No wrapping can occur since it would inevitably
interfere with itself due to conjugate symmetry. The half-spectrum can easily be
recovered by:
>>> half_width = (width + width % 2) // 2 + 1 - width % 2
>>> half = numpy.zeros(half_width, dtype=trnc.dtype)
>>> half[bin_idx:bin_idx + len(trnc)] = trnc
And the full spectrum by:
>>> full[bin_idx:bin_idx + len(trnc)] = trnc
>>> full[width - bin_idx - len(trnc) + 1:width - bin_idx + 1] = \\
... trnc[:None if bin_idx else 0:-1].conj()
(the embedded if-statement is necessary when bin_idx is 0, as the full fft
excludes its symmetric bin)
Parameters
----------
filt_idx
The index of the filter to generate. Less than `num_filts`
width
The length of the DFT to output
Returns
-------
tfr: tuple of int, array
"""
pass
[docs]
class TriangularOverlappingFilterBank(LinearFilterBank):
"""Triangular frequency response whose vertices are along the scale
The vertices of the filters are sampled uniformly along the passed scale. If the
scale is nonlinear, the triangles will be asymmetrical. This is closely related to,
but not identical to, the filters described in [povey2011]_ and [young]_.
Parameters
----------
scaling_function
Dictates the layout of filters in the Fourier domain. Can be a
:class:`ScalingFunction` or something compatible with
:func:`pydrobert.speech.alias.alias_factory_subclass_from_arg`
num_filts
The number of filters in the bank
high_hz
The topmost edge of the filter frequencies. The default is the Nyquist for
`sampling_rate`.
low_hz
The bottommost edge of the filter frequences.
sampling_rate
The sampling rate (cycles/sec) of the target recordings
analytic
Whether to use an analytic form of the bank. The analytic form is easily derived
from the real form in [povey2011]_ and [young]_. Since the filter is compactly
supported in frequency, the analytic form is simply the suppression of the
``[-pi, 0)`` frequencies
Raises
------
ValueError
If `high_hz` is above the Nyquist, or `low_hz` is below :obj:`0`, or
``high_hz <= low_hz``
"""
aliases = {"tri", "triangular"} #:
def __init__(
self,
scaling_function: Union[ScalingFunction, Mapping, str],
num_filts: int = 40,
high_hz: Optional[float] = None,
low_hz: float = 20.0,
sampling_rate: float = 16000,
analytic: bool = False,
):
scaling_function = alias_factory_subclass_from_arg(
ScalingFunction, scaling_function
)
nyquist = sampling_rate / 2
if high_hz is None:
high_hz = nyquist
# allow 1Hz-leeway for floating point / serialization errors
if not (0 <= low_hz < high_hz <= nyquist + 1):
raise ValueError(
"Invalid frequency range: ({:.2f},{:.2f}".format(low_hz, high_hz)
)
high_hz = min(high_hz, nyquist)
self._rate = sampling_rate
# compute vertices
scale_low = scaling_function.hertz_to_scale(low_hz)
scale_high = scaling_function.hertz_to_scale(high_hz)
scale_delta = (scale_high - scale_low) / (num_filts + 1)
self._vertices = tuple(
scaling_function.scale_to_hertz(scale_low + scale_delta * idx)
for idx in range(0, num_filts + 2)
)
self._analytic = analytic
@property
def is_real(self) -> bool:
return not self._analytic
@property
def is_analytic(self) -> bool:
return self._analytic
@property
def is_zero_phase(self) -> bool:
return True
@property
def num_filts(self) -> int:
return len(self._vertices) - 2
@property
def sampling_rate(self) -> float:
return self._rate
@property
def centers_hz(self) -> Tuple[float, ...]:
"""The point of maximum gain in each filter's frequency response, in Hz
This property gives the so-called "center frequencies" - the point of maximum
gain - of each filter.
"""
return self._vertices[1:-1]
@property
def supports_hz(self) -> Tuple[Tuple[float, float], ...]:
return tuple(
(low, high) for low, high in zip(self._vertices[:-2], self._vertices[2:])
)
@property
def supports(self) -> Tuple[Tuple[float, float], ...]:
# A given filter is bound from above by
# 2(w_r - w_l) / ((w_c - w_l)(w_r - w_c)t^2pi)
supports = []
for idx in range(len(self._vertices) - 2):
left = hertz_to_angular(self._vertices[idx], self._rate)
mid = hertz_to_angular(self._vertices[idx + 1], self._rate)
right = hertz_to_angular(self._vertices[idx + 2], self._rate)
K = np.sqrt(8 * (right - left) / np.pi)
K /= np.sqrt(config.EFFECTIVE_SUPPORT_THRESHOLD)
K /= np.sqrt(mid - left) * np.sqrt(right - mid)
K = int(np.ceil(K))
supports.append((-K // 2 - 1, K // 2 + 1))
return tuple(supports)
def get_impulse_response(self, filt_idx: int, width: int) -> np.ndarray:
left = hertz_to_angular(self._vertices[filt_idx], self._rate)
mid = hertz_to_angular(self._vertices[filt_idx + 1], self._rate)
right = hertz_to_angular(self._vertices[filt_idx + 2], self._rate)
res = np.zeros(width, dtype=np.complex128 if self._analytic else np.float64)
# for numerical stability (angles can get pretty small)
if right - mid > mid - left:
denom = right - mid
div_term = mid - left
else:
denom = mid - left
div_term = right - mid
denom *= (int(self._analytic) + 1) * np.pi
for t in range(1, width + 1):
if self._analytic:
numer = (right - left) / div_term * np.exp(1j * mid * t)
numer -= (right - mid) / div_term * np.exp(1j * left * t)
numer -= (mid - left) / div_term * np.exp(1j * right * t)
else:
numer = (right - left) / div_term * np.cos(mid * t)
numer -= (right - mid) / div_term * np.cos(left * t)
numer -= (mid - left) / div_term * np.cos(right * t)
val = numer / t ** 2
if t < width:
res[t] += val
res[-t] += val.conj()
else:
res[0] += val
numer = mid / div_term * (right ** 2 - left ** 2)
numer += right / div_term * (left ** 2 - mid ** 2)
numer += left / div_term * (mid ** 2 - right ** 2)
res[0] += numer / 2
res /= denom
return res
def get_frequency_response(
self, filt_idx: int, width: int, half: bool = False
) -> np.ndarray:
left = self._vertices[filt_idx]
mid = self._vertices[filt_idx + 1]
right = self._vertices[filt_idx + 2]
left_idx = int(np.ceil(width * left / self._rate))
right_idx = int(width * right / self._rate)
assert self._rate * (left_idx - 1) / width <= left
assert self._rate * (right_idx + 1) / width >= right, width
dft_size = width
if half:
if width % 2:
dft_size = (width + 1) // 2
else:
dft_size = width // 2 + 1
res = np.zeros(dft_size, dtype=np.float64)
for idx in range(left_idx, min(dft_size, right_idx + 1)):
hz = self._rate * idx / width
if hz <= mid:
val = (hz - left) / (mid - left)
else:
val = (right - hz) / (right - mid)
res[idx] = val
if not half and not self._analytic:
res[-idx] = val
return res
def get_truncated_response(
self, filt_idx: int, width: int
) -> Tuple[int, np.ndarray]:
left = self._vertices[filt_idx]
mid = self._vertices[filt_idx + 1]
right = self._vertices[filt_idx + 2]
left_idx = int(np.ceil(width * left / self._rate))
right_idx = int(width * right / self._rate)
assert self._rate * (left_idx - 1) / width <= left
assert self._rate * (right_idx + 1) / width >= right, width
res = np.zeros(1 + right_idx - left_idx, dtype=np.float64)
for idx in range(left_idx, min(width, right_idx + 1)):
hz = self._rate * idx / width
if hz <= mid:
res[idx - left_idx] = (hz - left) / (mid - left)
else:
res[idx - left_idx] = (right - hz) / (right - mid)
return left_idx, res
class Fbank(LinearFilterBank):
"""A mel-triangular filter bank that is square-rooted
An ``Fbank`` instance is intended to replicate the filters from Kaldi [povey2011]_
and HTK [young]_. Its scale is fixed to Mel-scale. Like a
``TriangularOverlappingFilterBank``, ``Fbank`` places the vertices of triangular
filters uniformly along the target scale. However, an ``Fbank`` is triangular in the
Mel-scale, whereas the triangular bank is triangular in frequency.
Parameters
----------
num_filts
The number of filters in the bank
high_hz
The topmost edge of the filter frequencies. The default is the Nyquist for
`sampling_rate`.
low_hz
The bottommost edge of the filter frequences.
sampling_rate
The sampling rate (cycles/sec) of the target recordings
analytic
Whether to use an analytic form of the bank. The analytic form is easily derived
from the real form in [povey2011]_ and [young]_. Since the filter is compactly
supported in frequency, the analytic form is simply the suppression of the
``[-pi, 0)`` frequencies
Notes
-----
In a standard mel-filterbank spectrogram, the power spectrum is calculated before
filtering. This module's spectrogram takes the power spectrum after filtering. To
recreate the frequency response of the alternate order, we can take the pointwise
square root of the frequency response.
"""
aliases = {"fbank"} #:
def __init__(
self,
num_filts: int = 40,
high_hz: Optional[float] = None,
low_hz: float = 20.0,
sampling_rate: float = 16000,
analytic: bool = False,
):
scaling_function = MelScaling()
if low_hz < 0 or (
high_hz and (high_hz <= low_hz or high_hz > sampling_rate // 2)
):
raise ValueError(
"Invalid frequency range: ({:.2f},{:.2f}".format(low_hz, high_hz)
)
self._rate = sampling_rate
if high_hz is None:
high_hz = sampling_rate // 2
# compute vertices
scale_low = scaling_function.hertz_to_scale(low_hz)
scale_high = scaling_function.hertz_to_scale(high_hz)
scale_delta = (scale_high - scale_low) / (num_filts + 1)
self._vertices = tuple(
scaling_function.scale_to_hertz(scale_low + scale_delta * idx)
for idx in range(0, num_filts + 2)
)
self._analytic = analytic
@property
def is_real(self) -> bool:
return not self._analytic
@property
def is_analytic(self) -> bool:
return self._analytic
@property
def is_zero_phase(self) -> bool:
return True
@property
def num_filts(self) -> int:
return len(self._vertices) - 2
@property
def sampling_rate(self) -> float:
return self._rate
@property
def centers_hz(self) -> Tuple[float, ...]:
"""tuple : The point of maximum gain in each filter's frequency response, in Hz
This property gives the so-called "center frequencies" - the point of maximum
gain - of each filter.
"""
return self._vertices[1:-1]
@property
def supports_hz(self) -> Tuple[Tuple[float, float], ...]:
return tuple(
(low, high) for low, high in zip(self._vertices[:-2], self._vertices[2:])
)
@property
def supports(self) -> Tuple[Tuple[float, float], ...]:
# A given filter is bound above for t > 0 by
# ((w_r - w_c) ** .5 + (w_c - w_l) ** .5) /
# (2 ** 3 * t ** 3 * (w_c - w_l) * (w_r - w_c) * pi) ** .5
supports = []
for idx in range(len(self._vertices) - 2):
left = hertz_to_angular(self._vertices[idx], self._rate)
mid = hertz_to_angular(self._vertices[idx + 1], self._rate)
right = hertz_to_angular(self._vertices[idx + 2], self._rate)
K = right - left + 2 * ((right - mid) * (mid - left)) ** 2
K /= config.EFFECTIVE_SUPPORT_THRESHOLD ** 2 * np.pi
K /= (right - mid) * (mid - left)
K /= np.sqrt(config.EFFECTIVE_SUPPORT_THRESHOLD)
K /= np.sqrt(mid - left) * np.sqrt(right - mid)
K **= 0.3333
K = int(np.ceil(K))
supports.append((-K // 2 - 1, K // 2 + 1))
return tuple(supports)
def get_impulse_response(self, filt_idx: int, width: int) -> np.ndarray:
# For the time being, I'll just invert the frequency response
if self.is_analytic:
freq_response = self.get_frequency_response(filt_idx, width, half=False)
return np.fft.ifft(freq_response)
else:
freq_response = self.get_frequency_response(filt_idx, width, half=True)
return np.fft.irfft(freq_response, n=width)
def get_frequency_response(
self, filt_idx: int, width: int, half: bool = False
) -> np.ndarray:
scaling_function = MelScaling()
left_hz = self._vertices[filt_idx]
mid_hz = self._vertices[filt_idx + 1]
right_hz = self._vertices[filt_idx + 2]
left_mel = scaling_function.hertz_to_scale(left_hz)
mid_mel = scaling_function.hertz_to_scale(mid_hz)
right_mel = scaling_function.hertz_to_scale(right_hz)
left_idx = int(np.ceil(width * left_hz / self._rate))
right_idx = int(width * right_hz / self._rate)
assert self._rate * (left_idx - 1) / width <= left_hz
assert self._rate * (right_idx + 1) / width >= right_hz, width
dft_size = width
if half:
if width % 2:
dft_size = (width + 1) // 2
else:
dft_size = width // 2 + 1
res = np.zeros(dft_size, dtype=np.float64)
for idx in range(left_idx, min(dft_size, right_idx + 1)):
hz = self._rate * idx / width
mel = scaling_function.hertz_to_scale(hz)
if mel <= mid_mel:
val = (mel - left_mel) / (mid_mel - left_mel)
else:
val = (right_mel - mel) / (right_mel - mid_mel)
res[idx] = val ** 0.5
if not half and not self._analytic:
res[-idx] = val ** 0.5
return res
def get_truncated_response(
self, filt_idx: int, width: int
) -> Tuple[int, np.ndarray]:
scaling_function = MelScaling()
left_hz = self._vertices[filt_idx]
mid_hz = self._vertices[filt_idx + 1]
right_hz = self._vertices[filt_idx + 2]
left_mel = scaling_function.hertz_to_scale(left_hz)
mid_mel = scaling_function.hertz_to_scale(mid_hz)
right_mel = scaling_function.hertz_to_scale(right_hz)
left_idx = int(np.ceil(width * left_hz / self._rate))
right_idx = int(width * right_hz / self._rate)
assert self._rate * (left_idx - 1) / width <= left_hz
assert self._rate * (right_idx + 1) / width >= right_hz, width
res = np.zeros(min(width, right_idx + 1) - left_idx, dtype=np.float64)
for idx in range(left_idx, min(width, right_idx + 1)):
hz = self._rate * idx / width
mel = scaling_function.hertz_to_scale(hz)
if mel <= mid_mel:
res[idx - left_idx] = (mel - left_mel) / (mid_mel - left_mel)
else:
res[idx - left_idx] = (right_mel - mel) / (right_mel - mid_mel)
return left_idx, res ** 0.5
[docs]
class GaborFilterBank(LinearFilterBank):
r"""Gabor filters with ERBs between points from a scale
Gabor filters are complex, mostly analytic filters that have a Gaussian envelope in
both the time and frequency domains. They are defined as
.. math::
f(t) = C \sigma^{-1/2} \pi^{-1/4}
e^{\frac{-t^2}{2\sigma^2} + i\xi t}
in the time domain and
.. math::
\widehat{f}(\omega) = C \sqrt{2\sigma} \pi^{1/4}
e^{\frac{-\sigma^2(\xi - \omega)^2}{2}}
in the frequency domain. Though Gaussians never truly reach 0, in either domain,
they are effectively compactly supported. Gabor filters are optimal with respect to
their time-bandwidth product.
`scaling_function` is used to split up the frequencies between `high_hz` and
`low_hz` into a series of filters. Every subsequent filter's width is scaled such
that, if the filters are all of the same height, the intersection with the precedent
filter's response matches the filter's Equivalent Rectangular Bandwidth (``erb ==
True``) or its 3dB bandwidths (``erb == False``). The ERB is the width of a
rectangular filter with the same height as the filter's maximum frequency response
that has the same :math:`L^2` norm.
Parameters
----------
scaling_function
Dictates the layout of filters in the Fourier domain. Can be a
:class:`ScalingFunction` or something compatible with
:func:`pydrobert.speech.alias.alias_factory_subclass_from_arg`
num_filts
The number of filters in the bank
high_hz
The topmost edge of the filter frequencies. The default is the Nyquist for
`sampling_rate`.
low_hz
The bottommost edge of the filter frequences.
sampling_rate
The sampling rate (cycles/sec) of the target recordings
scale_l2_norm
Whether to scale the l2 norm of each filter to 1. Otherwise the frequency
response of each filter will max out at an absolute value of 1.
erb : bool
See Also
--------
pydrobert.speech.config.EFFECTIVE_SUPPORT_THRESHOLD
The absolute value below which counts as zero
"""
aliases = {"gabor"} #:
def __init__(
self,
scaling_function: Union[ScalingFunction, Mapping, str],
num_filts: int = 40,
high_hz: Optional[float] = None,
low_hz: float = 20.0,
sampling_rate: float = 16000,
scale_l2_norm: bool = False,
erb: bool = False,
):
scaling_function = alias_factory_subclass_from_arg(
ScalingFunction, scaling_function
)
self._scale_l2_norm = scale_l2_norm
self._erb = erb
if low_hz < 0 or (
high_hz and (high_hz <= low_hz or high_hz > sampling_rate // 2)
):
raise ValueError(
"Invalid frequency range: ({:.2f},{:.2f}".format(low_hz, high_hz)
)
self._rate = sampling_rate
if high_hz is None:
high_hz = sampling_rate // 2
scale_low = scaling_function.hertz_to_scale(low_hz)
scale_high = scaling_function.hertz_to_scale(high_hz)
scale_delta = (scale_high - scale_low) / (num_filts + 1)
# edges dictate the points where filters should intersect. We
# make a pretend intersection halfway between low_hz and
# the first filter center in the scaled domain. Likewise with
# high_hz and the last filter center. Intersections are spaced
# uniformly in the scaled domain
edges = tuple(
scaling_function.scale_to_hertz(scale_low + scale_delta * (idx + 0.5))
for idx in range(0, num_filts + 1)
)
centers_hz = []
centers_ang = []
stds = []
supports_ang = []
supports = []
wrap_supports_ang = []
self._wrap_below = False
log_2 = np.log(2)
log_pi = np.log(np.pi)
t_support_const = -2 * np.log(config.EFFECTIVE_SUPPORT_THRESHOLD)
f_support_const = t_support_const
if scale_l2_norm:
f_support_const += log_2 + 0.5 * log_pi
t_support_const -= 0.5 * log_pi
else:
t_support_const -= log_2 + log_pi
if erb:
bandwidth_const = np.sqrt(np.pi) / 2
else:
bandwidth_const = np.sqrt(3 / 10 * np.log(10))
for left_intersect, right_intersect in zip(edges[:-1], edges[1:]):
center_hz = (left_intersect + right_intersect) / 2
center_ang = hertz_to_angular(center_hz, self._rate)
std = bandwidth_const / hertz_to_angular(
center_hz - left_intersect, self._rate
)
log_std = np.log(std)
if scale_l2_norm:
diff_ang = np.sqrt(log_std + f_support_const) / std
wrap_diff_ang = np.sqrt(log_std + f_support_const + log_2) / std
diff_samps = int(np.ceil(std * np.sqrt(t_support_const - log_std)))
else:
diff_ang = np.sqrt(f_support_const) / std
wrap_diff_ang = np.sqrt(f_support_const + log_2) / std
diff_samps = int(np.ceil(std * np.sqrt(t_support_const - 2 * log_std)))
supp_ang_low = center_ang - diff_ang
if supp_ang_low < 0:
self._wrap_below = True
centers_hz.append(center_hz)
centers_ang.append(center_ang)
supports_ang.append((center_ang - diff_ang, center_ang + diff_ang))
wrap_supports_ang.append(2 * wrap_diff_ang)
supports.append((-diff_samps, diff_samps))
stds.append(std)
self._centers_ang = tuple(centers_ang)
self._centers_hz = tuple(centers_hz)
self._stds = tuple(stds)
self._supports_ang = tuple(supports_ang)
self._wrap_supports_ang = tuple(wrap_supports_ang)
self._supports_hz = tuple(
(angular_to_hertz(ang_l, self._rate), angular_to_hertz(ang_h, self._rate),)
for ang_l, ang_h in supports_ang
)
self._supports = tuple(supports)
@property
def is_real(self) -> bool:
return False
@property
def is_analytic(self) -> bool:
return not self._wrap_below
@property
def num_filts(self) -> int:
return len(self._centers_hz)
@property
def is_zero_phase(self) -> bool:
return True
@property
def sampling_rate(self) -> float:
return self._rate
@property
def centers_hz(self) -> Tuple[float, ...]:
"""tuple :The point of maximum gain in each filter's frequency response, in Hz
This property gives the so-called "center frequencies" - the
point of maximum gain - of each filter.
"""
return self._centers_hz
@property
def supports_hz(self) -> Tuple[Tuple[float, float], ...]:
return self._supports_hz
@property
def supports(self) -> Tuple[Tuple[float, float], ...]:
return self._supports
@property
def scaled_l2_norm(self) -> bool:
return self._scale_l2_norm
@property
def erb(self) -> bool:
return self._erb
def get_impulse_response(self, filt_idx: int, width: int) -> np.ndarray:
center_ang = self._centers_ang[filt_idx]
std = self._stds[filt_idx]
res = np.zeros(width, dtype=np.complex128)
if self._scale_l2_norm:
const_term = -0.5 * np.log(std) - 0.25 * np.log(np.pi)
else:
const_term = -0.5 * np.log(2 * np.pi) - np.log(std)
denom_term = 2 * std ** 2
for t in range(width + 1):
val = -(t ** 2) / denom_term + const_term + 1j * center_ang * t
val = np.exp(val)
if t != width:
res[t] += val
if t:
res[-t] += val.conj()
return res
def get_frequency_response(
self, filt_idx: int, width: int, half: bool = False
) -> np.ndarray:
center_ang = self._centers_ang[filt_idx]
lowest_ang, highest_ang = self._supports_ang[filt_idx]
std = self._stds[filt_idx]
dft_size = width
if half:
if width % 2:
dft_size = (width + 1) // 2
else:
dft_size = width // 2 + 1
res = np.zeros(dft_size, dtype=np.float64)
if self._scale_l2_norm:
const_term = 0.5 * np.log(2 * std) + 0.25 * np.log(np.pi)
else:
const_term = 0
num_term = -(std ** 2) / 2
for idx in range(dft_size):
for period in range(
-1 - int(max(-lowest_ang, 0) / (2 * np.pi)),
2 + int(highest_ang / (2 * np.pi)),
):
omega = (idx / width + period) * 2 * np.pi
val = num_term * (center_ang - omega) ** 2 + const_term
val = np.exp(val)
res[idx] += val
return res
def get_truncated_response(
self, filt_idx: int, width: int
) -> Tuple[int, np.ndarray]:
# wrap_supports_ang contains the angular supports of each filter
# if the effective support threshold were halved. If this
# support exceeds the 2pi period, overlap from aliasing in the
# periphery will exceed the effective support, meaning the
# entire period lies in the support
if self._wrap_supports_ang[filt_idx] >= 2 * np.pi:
return 0, self.get_frequency_response(filt_idx, width)
center_ang = self._centers_ang[filt_idx]
std = self._stds[filt_idx]
lowest_ang, highest_ang = self._supports_ang[filt_idx]
left_idx = int(np.ceil(width * lowest_ang / (2 * np.pi)))
right_idx = int(width * highest_ang / (2 * np.pi))
res = np.zeros(1 + right_idx - left_idx, dtype=np.float64)
if self._scale_l2_norm:
const_term = 0.5 * np.log(2 * std) + 0.25 * np.log(np.pi)
else:
const_term = 0
num_term = -(std ** 2) / 2
for idx in range(left_idx, right_idx + 1):
for period in range(
-int(max(-lowest_ang, 0) / (2 * np.pi)),
1 + int(highest_ang / (2 * np.pi)),
):
omega = (idx / width + period) * 2 * np.pi
val = num_term * (center_ang - omega) ** 2 + const_term
val = np.exp(val)
res[idx - left_idx] += val
return left_idx % width, res
[docs]
class ComplexGammatoneFilterBank(LinearFilterBank):
r"""Gammatone filters with complex carriers
A complex gammatone filter [flanagan1960]_ [aertsen1981]_ can be defined as
.. math::
h(t) = c t^{n - 1} e^{- \alpha t + i\xi t} u(t)
in the time domain, where :math:`\alpha` is the bandwidth parameter, :math:`\xi` is
the carrier frequency, :math:`n` is the order of the function, :math:`u(t)` is the
step function, and :math:`c` is a normalization constant. In the frequency domain,
the filter is defined as
.. math::
H(\omega) = \frac{c(n - 1)!}{\left(
\alpha + i(\omega - \xi) \right)^n}
For large :math:`\xi`, the complex gammatone is approximately analytic.
`scaling_function` is used to split up the frequencies between `high_hz` and
`low_hz` into a series of filters. Every subsequent filter's width is scaled such
that, if the filters are all of the same height, the intersection with the precedent
filter's response matches the filter's Equivalent Rectangular Bandwidth (``erb ==
True``) or its 3dB bandwidths (``erb == False``). The ERB is the width of a
rectangular filter with the same height as the filter's maximum frequency response
that has the same :math:`L^2` norm.
Parameters
----------
scaling_function
Dictates the layout of filters in the Fourier domain. Can be a
:class:`ScalingFunction` or something compatible with
:func:`pydrobert.speech.alias.alias_factory_subclass_from_arg`
num_filts
The number of filters in the bank
high_hz
The topmost edge of the filter frequencies. The default is the Nyquist for
`sampling_rate`.
low_hz
The bottommost edge of the filter frequences.
sampling_rate : float, optional
The sampling rate (cycles/sec) of the target recordings
order
The :math:`n` parameter in the Gammatone. Should be positive. Larger orders
will make the gammatone more symmetrical.
max_centered
While normally causal, setting `max_centered` to true will shift all filters in
the bank such that the maximum absolute value in time is centered at sample 0.
scale_l2_norm
Whether to scale the l2 norm of each filter to 1. Otherwise the frequency
response of each filter will max out at an absolute value of 1.
erb
See Also
--------
pydrobert.speech.config.EFFECTIVE_SUPPORT_THRESHOLD
The absolute value below which counts as zero
"""
aliases = {"gammatone", "tonebank"} #:
def __init__(
self,
scaling_function: Union[ScalingFunction, Mapping, str],
num_filts: int = 40,
high_hz: Optional[float] = None,
low_hz: float = 20.0,
sampling_rate: float = 16000,
order: int = 4,
max_centered: bool = False,
scale_l2_norm: bool = False,
erb: bool = False,
):
scaling_function = alias_factory_subclass_from_arg(
ScalingFunction, scaling_function
)
self._scale_l2_norm = scale_l2_norm
self._erb = erb
if low_hz < 0 or (
high_hz and (high_hz <= low_hz or high_hz > sampling_rate // 2)
):
raise ValueError(
"Invalid frequency range: ({:.2f},{:.2f}".format(low_hz, high_hz)
)
if not isinstance(order, int) or order <= 0:
raise ValueError("order must be a positive integer")
self._order = order
self._rate = sampling_rate
if high_hz is None:
high_hz = sampling_rate // 2
scale_low = scaling_function.hertz_to_scale(low_hz)
scale_high = scaling_function.hertz_to_scale(high_hz)
scale_delta = (scale_high - scale_low) / (num_filts + 1)
# see gabor filters for more info
edges = tuple(
scaling_function.scale_to_hertz(scale_low + scale_delta * (idx + 0.5))
for idx in range(0, num_filts + 1)
)
self._centers_hz = []
self._xis = []
self._alphas = []
self._cs = []
self._offsets = []
self._supports = []
self._supports_ang = []
self._wrap_supports_ang = []
self._wrap_below = False
log_eps = np.log(config.EFFECTIVE_SUPPORT_THRESHOLD)
log_double_factorial = np.log(math.factorial(2 * order - 2))
log_factorial = np.log(math.factorial(order - 1))
log_2 = np.log(2)
if erb:
alpha_const = log_2 * (2 * order - 1)
alpha_const += 2 * log_factorial
alpha_const -= log_double_factorial
else:
alpha_const = -0.5 * np.log(4 * (2 ** (1 / order)) - 4)
for left_intersect, right_intersect in zip(edges[:-1], edges[1:]):
center_hz = (left_intersect + right_intersect) / 2
xi = hertz_to_angular(center_hz, self._rate)
log_alpha = alpha_const + np.log(
hertz_to_angular(right_intersect - left_intersect, self._rate)
)
alpha = np.exp(log_alpha)
if scale_l2_norm:
log_c = 0.5 * (log_2 + log_alpha + log_double_factorial)
log_c -= order * (log_alpha + log_2)
else:
log_c = order * log_alpha - log_factorial
c = np.exp(log_c)
if max_centered:
offset = -(order - 1) / alpha
else:
offset = 0
supp_a = (2 / order) * (log_c + log_factorial - log_eps)
wrap_supp_a = supp_a + (2 / order) * log_2
supp_b = np.exp(2 * log_alpha)
diff_ang = (np.exp(supp_a) - supp_b) ** 0.5
wrap_diff_ang = (np.exp(wrap_supp_a) - supp_b) ** 0.5
self._centers_hz.append(center_hz)
self._xis.append(xi)
self._alphas.append(alpha)
self._cs.append(c)
self._offsets.append(offset)
self._supports.append(self._calculate_temp_support(-1))
self._supports_ang.append((xi - diff_ang, xi + diff_ang))
if self._supports_ang[-1][0] < 0:
self._wrap_below = True
self._wrap_supports_ang.append(2 * wrap_diff_ang)
self._xis = tuple(self._xis)
self._cs = tuple(self._cs)
self._alphas = tuple(self._alphas)
self._offsets = tuple(self._offsets)
self._centers_hz = tuple(self._centers_hz)
self._supports_ang = tuple(self._supports_ang)
self._wrap_supports_ang = tuple(self._wrap_supports_ang)
self._supports_hz = tuple(
(angular_to_hertz(ang_l, self._rate), angular_to_hertz(ang_h, self._rate),)
for ang_l, ang_h in self._supports_ang
)
self._supports = tuple(self._supports)
@property
def is_real(self) -> bool:
return False
@property
def is_analytic(self) -> bool:
return not self._wrap_below
@property
def num_filts(self) -> int:
return len(self._centers_hz)
@property
def order(self) -> int:
return self._order
@property
def is_zero_phase(self) -> bool:
return False
@property
def sampling_rate(self) -> float:
return self._rate
@property
def centers_hz(self) -> Tuple[float, ...]:
"""int : The point of maximum gain in each filter's frequency response, in Hz
This property gives the so-called "center frequencies" - the point of maximum
gain - of each filter.
"""
return self._centers_hz
@property
def supports_hz(self) -> Tuple[Tuple[float, float], ...]:
return self._supports_hz
@property
def supports(self) -> Tuple[Tuple[float, float], ...]:
return self._supports
@property
def scaled_l2_norm(self) -> bool:
return self._scale_l2_norm
@property
def erb(self) -> bool:
return self._erb
def get_impulse_response(self, filt_idx: int, width: int) -> np.ndarray:
left_sup, right_sup = self.supports[filt_idx]
left_period = int(np.floor(left_sup / width))
right_period = int(np.ceil(right_sup / width))
res = np.zeros(width, dtype=np.complex128)
for period in range(left_period, right_period + 1):
for idx in range(width):
t = period * width + idx
res[idx] += self._h(t, filt_idx)
return res
def get_frequency_response(
self, filt_idx: int, width: int, half: bool = False
) -> np.ndarray:
left_sup, right_sup = self._supports_ang[filt_idx]
left_period = int(np.floor(left_sup / 2 / np.pi))
right_period = int(np.ceil(right_sup / 2 / np.pi))
if half:
if width % 2:
dft_size = (width + 1) // 2
else:
dft_size = width // 2 + 1
else:
dft_size = width
res = np.zeros(dft_size, dtype=np.complex128)
omega = np.arange(dft_size, dtype=np.float64) * 2 * np.pi / width
for period in range(left_period, right_period + 1):
res += self._H(omega + 2 * np.pi * period, filt_idx)
return res
def get_truncated_response(
self, filt_idx: int, width: int
) -> Tuple[int, np.ndarray]:
left_sup, right_sup = self._supports_ang[filt_idx]
wrap_ang = self._wrap_supports_ang[filt_idx]
# wrap_ang is the additional support needed to hit
# half the effective support threshold. If that support is
# greater than the 2pi periodization, some points could exceed
# the threshold due to wrapping.
if right_sup - left_sup + wrap_ang >= 2 * np.pi:
return 0, self.get_frequency_response(filt_idx, width)
left_idx = int(np.ceil(width * left_sup / (2 * np.pi)))
right_idx = int(width * right_sup / (2 * np.pi))
omega = np.arange(left_idx, right_idx + 1, dtype=np.float64)
omega *= 2 * np.pi / width
return left_idx % width, self._H(omega, filt_idx)
def _h(self, t, idx):
# calculate impulse response of filt idx at sample t
offset = self._offsets[idx]
if t <= offset:
return 0j
alpha = self._alphas[idx]
log_c = np.log(self._cs[idx])
xi = self._xis[idx]
n = self._order
r = log_c + (n - 1) * np.log(t - offset)
r += (-alpha + 1j * xi) * (t - offset)
return np.exp(r)
def _H(self, omega, idx):
# calculate frequency response of filt idx at ang freqs omega
alpha = self._alphas[idx]
c = self._cs[idx]
xi = self._xis[idx]
offset = self._offsets[idx]
n = self._order
numer = np.exp(-1j * omega * offset) * c * math.factorial(n - 1)
denom = (alpha + 1j * (omega - xi)) ** n
return numer / denom
def _calculate_temp_support(self, idx):
# calculate the nonzero region of the temp support of filt idx
alpha = self._alphas[idx]
c = self._cs[idx]
# xi = self._xis[idx]
offset = self._offsets[idx]
n = self._order
eps = config.EFFECTIVE_SUPPORT_THRESHOLD
if n == 1:
right = int(np.ceil((np.log(c) - np.log(eps) / alpha)))
else:
def _d(t):
# derivative of abs func
v = c * np.exp(-alpha * t) * t ** (n - 2)
v *= (n - 1) - alpha * t
return v
right = (n - 1 + np.sqrt((n - 1) / 2)) / alpha
h_0 = np.abs(self._h(right, idx))
while h_0 > eps:
d_0 = _d(right)
right -= h_0 / d_0
h_0 = np.abs(self._h(right, idx))
return (int(np.floor(offset)), int(np.ceil(right) + offset))
# windows
[docs]
class WindowFunction(AliasedFactory):
"""A real linear filter, usually lowpass"""
[docs]
@abc.abstractmethod
def get_impulse_response(self, width: int) -> np.ndarray:
"""Write the filter into a numpy array of fixed width
Parameters
----------
width
The length of the window in samples
Returns
-------
ir : np.ndarray
A 1D vector of length `width`
"""
pass
[docs]
class BartlettWindow(WindowFunction):
"""A unit-normalized triangular window
See Also
--------
numpy.bartlett
"""
aliases = {"bartlett", "triangular", "tri"} #:
def get_impulse_response(self, width: int) -> np.ndarray:
window = np.bartlett(width)
window /= max(1, width - 1) / 2
return window
[docs]
class BlackmanWindow(WindowFunction):
"""A unit-normalized Blackman window
See Also
--------
numpy.blackman
"""
aliases = {"blackman", "black"} #:
def get_impulse_response(self, width: int) -> np.ndarray:
window = np.blackman(width)
window /= 0.42 * max(1, width - 1)
return window
[docs]
class HammingWindow(WindowFunction):
"""A unit-normalized Hamming window
See Also
--------
numpy.hamming
"""
aliases = {"hamming"} #:
def get_impulse_response(self, width: int) -> np.ndarray:
window = np.hamming(width)
window /= 0.54 * max(1, width - 1)
return window
[docs]
class HannWindow(WindowFunction):
"""A unit-normalized Hann window
See Also
--------
numpy.hanning
"""
aliases = {"hanning", "hann"} #:
def get_impulse_response(self, width: int) -> np.ndarray:
window = np.hanning(width)
window /= 0.5 * max(1, width - 1)
return window
[docs]
class GammaWindow(WindowFunction):
r"""A lowpass filter based on the Gamma function
A Gamma function is defined as:
.. math:: p(t; \alpha, n) = t^{n - 1} e^{-\alpha t} u(t)
Where :math:`n` is the order of the function, :math:`\alpha` controls the bandwidth
of the filter, and :math:`u` is the step function.
This function returns a window based off a reflected Gamma function. :math:`\alpha`
is chosen such that the maximum value of the window aligns with `peak`. The window
is clipped to the width. For reasonable values of `peak` (i.e. in the last quarter
of samples), the majority of the support should lie in this interval anyways.
Arguments
---------
order
peak
``peak * width``, where ``width`` is the length of the window in samples, is
where the approximate maximal value of the window lies
"""
order: int #:
peak: float #:
aliases = {"gamma"} #:
def __init__(self, order: int = 4, peak: float = 0.75):
self.order = order
self.peak = peak
def get_impulse_response(self, width: np.ndarray) -> np.ndarray:
if width <= 0:
return np.array([], dtype=float)
elif width == 1:
return np.array([1], dtype=float)
peak = self.peak * width
ret = np.arange(width - 1, -1, -1, dtype=float)
if self.order > 1:
alpha = (self.order - 1) / (width - peak)
offs = width - 1
else:
# align alpha roughly with a support in M
alpha = 5 / width
offs = width
ln_c = self.order * np.log(alpha)
ln_c -= np.log(math.factorial(self.order - 1))
ret[:offs] = ret[:offs] ** (self.order - 1) * np.exp(-alpha * ret[:offs] + ln_c)
return ret