# Copyright 2021 Sean Robertson
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Compute features from speech signals"""
import abc
from itertools import count
from typing import Mapping, Optional, Union
try:
from typing import Literal
except ImportError:
from typing_extensions import Literal
import numpy as np
from pydrobert.speech.alias import AliasedFactory, alias_factory_subclass_from_arg
from pydrobert.speech import config
from pydrobert.speech.filters import GammaWindow
from pydrobert.speech.filters import HannWindow
from pydrobert.speech.filters import LinearFilterBank
from pydrobert.speech.filters import WindowFunction
__all__ = [
"frame_by_frame_calculation",
"FrameComputer",
"LinearFilterBankFrameComputer",
"ShortIntegrationFrameComputer",
"ShortTimeFourierTransformFrameComputer",
"SIFrameComputer",
"STFTFrameComputer",
]
[docs]
class FrameComputer(AliasedFactory):
"""Construct features from a signal from fixed-length segments
A signal is treated as a (possibly overlapping) time series of frames. Each frame is
transformed into a fixed-length vector of coefficients.
Features can be computed one at a time, for example:
>>> chunk_size = 2 ** 10
>>> while len(signal):Z
>>> segment = signal[:chunk_size]
>>> feats = computer.compute_chunk(segment)
>>> # do something with feats
>>> signal = signal[chunk_size:]
>>> feats = computer.finalize()
Or all at once (which can be much faster, depending on how the computer is
optimized):
>>> feats = computer.compute_full(signal)
The k-th frame can be roughly localized to the signal offset to about ``signal[k *
computer.frame_shift]``. The signal's exact region of influence is dictated by the
`frame_style` property.
"""
@abc.abstractproperty
def frame_style(self) -> Literal["causal", "centered"]:
"""str : Dictates how the signal is split into frames
If :obj:`'causal'`, the k-th frame is computed over the indices ``signal[k *
frame_shift:k * frame_shift + frame_length]`` (at most). If :obj:`'centered'`,
the k-th frame is computed over the indices ``signal[k * frame_shift -
(frame_length + 1) // 2 + 1:k * frame_shift + frame_length // 2 + 1]``. Any
range beyond the bounds of the signal is generated in an implementation-specific
way.
"""
pass
@abc.abstractproperty
def sampling_rate(self) -> float:
"""float : Number of samples in a second of a target recording"""
pass
@abc.abstractproperty
def frame_length(self) -> int:
"""int : Number of samples which dictate a feature vector"""
pass
@property
def frame_length_ms(self) -> float:
"""float : Number of milliseconds of audio which dictate a feature vector"""
return self.frame_length * 1000 / self.sampling_rate
@abc.abstractproperty
def frame_shift(self) -> int:
"""int : Number of samples absorbed between successive frame computations"""
pass
@property
def frame_shift_ms(self) -> float:
"""float : Number of milliseconds between succecssive frame computations"""
return self.frame_shift * 1000 / self.sampling_rate
@abc.abstractproperty
def num_coeffs(self) -> int:
"""int : Number of coefficients returned per frame"""
pass
@abc.abstractproperty
def started(self) -> bool:
"""bool : Whether computations for a signal have started
Becomes :obj:`True` after the first call to :func:`compute_chunk`. Becomes
:obj:`False` after call to :func:`finalize`
"""
pass
[docs]
@abc.abstractmethod
def compute_chunk(self, chunk: np.ndarray) -> np.ndarray:
"""Compute some coefficients, given a chunk of audio
Parameters
----------
chunk
A 1D float array of the signal. Should be contiguous and non-overlapping
with any previously processed segments in the audio stream
Returns
-------
chunk : np.ndarray
A 2D float array of shape ``(num_frames, num_coeffs)``. ``num_frames`` is
nonnegative (possibly 0). Contains some number of feature vectors, ordered
in time over axis 0.
"""
pass
[docs]
@abc.abstractmethod
def finalize(self) -> np.ndarray:
"""Conclude processing a stream of audio, processing any stored buffer
Returns
-------
chunk : np.ndarray
A 2D float array of shape ``(num_frames, num_coeffs)``. ``num_frames`` is
either 1 or 0.
"""
pass
[docs]
def compute_full(self, signal: np.ndarray) -> np.ndarray:
"""Compute a full signal's worth of feature coefficients
Parameters
----------
signal
A 1D float array of the entire signal
Returns
-------
spec : np.ndarray
A 2D float array of shape ``(num_frames, num_coeffs)``. ``num_frames`` is
nonnegative (possibly 0). Contains some number of feature vectors, ordered
in time over axis 0.
Raises
------
ValueError
If already begin computing frames (``started=True``), and :func:`finalize`
has not been called
"""
return frame_by_frame_calculation(self, signal)
[docs]
class LinearFilterBankFrameComputer(FrameComputer):
"""Frame computers whose features are derived from linear filter banks
Computers based on linear filter banks have a predictable number of coefficients and
organization. Like the banks, the features with lower indices correspond to filters
with lower bandwidths. `num_coeffs` will be simply ``bank.num_filts +
int(include_energy)``.
Parameters
----------
bank
Each filter in the bank corresponds to a coefficient in a frame vector. Can be a
:class:`LinearFilterBank` or something compatible with
:func:`pydrobert.speech.alias.alias_factory_subclass_from_arg`
include_energy
Whether to include a coefficient based on the energy of the signal within the
frame. If :obj:`True`, the energy coefficient will be inserted at index 0.
"""
def __init__(
self, bank: Union[LinearFilterBank, Mapping, str], include_energy: bool = False
):
self._bank = alias_factory_subclass_from_arg(LinearFilterBank, bank)
self._include_energy = bool(include_energy)
@property
def bank(self) -> LinearFilterBank:
"""LinearFilterBank : The LinearFilterBank from which features are derived"""
return self._bank
@property
def includes_energy(self) -> bool:
"""bool : Whether the first coefficient is an energy coefficient"""
return self._include_energy
@property
def num_coeffs(self) -> int:
return self._bank.num_filts + int(self._include_energy)
def _power(x):
return np.linalg.norm(x, ord=2) ** 2
def _mag(x):
return np.sum(np.abs(x))
[docs]
class ShortTimeFourierTransformFrameComputer(LinearFilterBankFrameComputer):
"""Compute features of a signal by integrating STFTs
Computations are per frame and as follows:
1. The current frame is multiplied with some window (rectangular, Hamming, Hanning,
etc)
2. A DFT is performed on the result
3. For each filter in the provided input bank:
a. Multiply the result of 2. with the frequency response of the filter
b. Sum either the pointwise square or absolute value of elements in the buffer
from 3a.
c. Optionally take the log of the sum
Warning
--------
This behaviour differs from that of [povey2011]_ or [young]_ in three ways. First,
the sum (3b) comes after the filtering (3a), which changes the result in the squared
case. Second, the sum is over the full power spectrum, rather than just between 0
and the Nyquist. This doubles the value at the end of 3c. if a real filter is used.
Third, frame boundaries are calculated diffferently.
Parameters
----------
bank
Each filter in the bank corresponds to a coefficient in a frame vector. Can be a
:class:`LinearFilterBank` or something compatible with
:func:`pydrobert.speech.alias.alias_factory_subclass_from_arg`
frame_length_ms
The length of a frame, in milliseconds. Defaults to the length of the largest
filter in the bank
frame_shift_ms : float, optional
The offset between successive frames, in milliseconds
frame_style
Defaults to :obj:`'centered'` if ``bank.is_zero_phase``, :obj:`'causal'`
otherwise.
include_energy
pad_to_nearest_power_of_two
Whether the DFT should be a padded to a power of two for computational
efficiency
window_function
The window used in step 1. Can be a :class:`WindowFunction` or something
compatible with :func:`pydrobert.speech.alias_factory_subclass_from_arg`.
Defaults to :class:`pydrobert.speech.filters.GammaWindow` when `frame_style` is
:obj:`'causal'`, otherwise :class:`pydrobert.speech.filters.HannWindow`.
use_log
Whether to take the log of the sum from 3b.
use_power
Whether to sum the power spectrum or the magnitude spectrum
kaldi_shift
Dictates how to center frames when `frame_style` is :obj:`'centered'`. If
:obj:`True`, the k-th frame will be computed using the signal between ``signal[
k * frame_shift - frame_length // 2 + frame_shift // 2:k * frame_shift +
(frame_length + 1) // 2 + frame_shift // 2]``. These are the frame bounds for
Kaldi [povey2011]_. Otherwise, the k-th frame is ``signal[ k * frame_shift -
(frame_length + 1) // 2 + 1: k * frame_shift + frame_length // 2 + 1]``.
"""
aliases = {"stft"} #:
def __init__(
self,
bank: Union[LinearFilterBank, Mapping, str],
frame_length_ms: Optional[float] = None,
frame_shift_ms: Optional[float] = 10,
frame_style: Optional[Literal["causal", "centered"]] = None,
include_energy: bool = False,
pad_to_nearest_power_of_two: bool = True,
window_function: Optional[Union[WindowFunction, Mapping, str]] = None,
use_log: bool = True,
use_power: bool = False,
kaldi_shift: bool = False,
):
bank = alias_factory_subclass_from_arg(LinearFilterBank, bank)
self._rate = bank.sampling_rate
self._frame_shift = int(0.001 * frame_shift_ms * self._rate)
self._log = use_log
self._power = use_power
self._real = bank.is_real
self._started = False
self._first_frame = True
self._buf_len = 0
self._chunk_dtype = np.float64
self._kaldi_shift = kaldi_shift
if frame_style is None:
frame_style = "centered" if bank.is_zero_phase else "causal"
elif frame_style not in ("centered", "causal"):
raise ValueError('Invalid frame style: "{}"'.format(frame_style))
self._frame_style = frame_style
if frame_length_ms is None:
self._frame_length = max(
max(right - left for left, right in bank.supports),
# ensure at least one dft bin is nonzero per filter
int(
np.ceil(
2
* self._rate
/ min(right - left for left, right in bank.supports_hz)
)
),
)
else:
self._frame_length = int(0.001 * frame_length_ms * bank.sampling_rate)
self._buf = np.empty(self._frame_length, dtype=np.float64)
if window_function is None:
if frame_style == "causal":
window_function = GammaWindow()
else:
window_function = HannWindow()
else:
window_function = alias_factory_subclass_from_arg(
WindowFunction, window_function
)
self._window = window_function.get_impulse_response(self._frame_length)
if pad_to_nearest_power_of_two:
self._dft_size = int(2 ** np.ceil(np.log2(self._frame_length)))
else:
self._dft_size = self._frame_length
if self._power:
self._nonlin_op = _power
else:
self._nonlin_op = _mag
self._truncated_filts = []
self._filt_start_idxs = []
for filt_idx in range(bank.num_filts):
start_idx, truncated_filt = bank.get_truncated_response(
filt_idx, self._dft_size
)
self._filt_start_idxs.append(start_idx)
self._truncated_filts.append(truncated_filt)
super(ShortTimeFourierTransformFrameComputer, self).__init__(
bank, include_energy=include_energy
)
@property
def frame_style(self) -> str:
return self._frame_style
@property
def sampling_rate(self) -> float:
return self._rate
@property
def frame_length(self) -> int:
return self._frame_length
@property
def frame_shift(self) -> int:
return self._frame_shift
@property
def started(self) -> bool:
return self._started
@property
def kaldi_shift(self) -> bool:
return self._kaldi_shift
def _compute_frame(self, frame, coeffs):
# given the frame, store feature values within coeff
assert len(frame) == self._frame_length
assert len(coeffs) == self.num_coeffs
if self.includes_energy:
coeffs[0] = np.inner(frame, frame) / self._frame_length
if not self._power:
coeffs[0] **= 0.5
if self._log:
coeffs[0] = np.log(max(coeffs[0], config.LOG_FLOOR_VALUE))
coeffs = coeffs[1:]
if config.USE_FFTPACK:
from scipy import fftpack
is_odd = self._dft_size % 2
buffered_frame = np.zeros(self._dft_size + 2 - is_odd, dtype=np.float64)
buffered_frame[1 : self._frame_length + 1] = frame
buffered_frame[1 : self._frame_length + 1] *= self._window
buffered_frame[1 : self._dft_size + 1] = fftpack.rfft(
buffered_frame[1 : self._dft_size + 1], overwrite_x=True
)
buffered_frame[0] = buffered_frame[1]
buffered_frame[1] = 0
half_spect = buffered_frame.view(np.complex128)
else:
half_spect = np.fft.rfft(frame * self._window, n=self._dft_size)
assert half_spect.dtype == np.complex128
half_len = len(half_spect)
for filt_idx in range(len(self._filt_start_idxs)):
start_idx = self._filt_start_idxs[filt_idx]
truncated_filt = self._truncated_filts[filt_idx]
trunc_len = len(truncated_filt)
consumed = 0
conjugate = False
val = 0
while consumed < trunc_len:
if conjugate:
seg_len = (
min(
start_idx + trunc_len - consumed,
half_len - 2 + half_len % 2,
)
- start_idx
)
seg_len = max(0, seg_len)
if seg_len:
val += self._nonlin_op(
half_spect[
(-2 + (half_len % 2) - start_idx) : (
-2 + (half_len % 2) - start_idx - seg_len
) : -1
].conj()
* truncated_filt[consumed : consumed + seg_len]
)
start_idx -= half_len - 2 + half_len % 2
else:
seg_len = min(start_idx + trunc_len - consumed, half_len)
seg_len -= start_idx
seg_len = max(0, seg_len)
if seg_len:
val += self._nonlin_op(
half_spect[start_idx : start_idx + seg_len]
* truncated_filt[consumed : consumed + seg_len]
)
start_idx -= half_len
conjugate = not conjugate
consumed += seg_len
start_idx = max(0, start_idx)
if self._real:
val *= 2
if self._log:
val = np.log(max(val, config.LOG_FLOOR_VALUE))
coeffs[filt_idx] = val
def compute_chunk(self, chunk: np.ndarray) -> np.ndarray:
self._chunk_dtype = chunk.dtype # needed for `finalize`
# algorithm should work when frame shift is greater than frame
# length - buf_len may be negative, which will skip samples
buf_len = self._buf_len
chunk_len = len(chunk)
total_len = chunk_len + buf_len
noncausal_first = self._frame_style == "centered"
noncausal_first &= self._first_frame
if noncausal_first:
if self._kaldi_shift:
frame_length = (self._frame_length + 1) // 2
frame_length += self._frame_shift // 2
else:
frame_length = self._frame_length // 2 + 1
else:
frame_length = self._frame_length
frame_shift = self._frame_shift
num_frames = max(0, (total_len - frame_length) // frame_shift + 1)
coeffs = np.empty((num_frames, self.num_coeffs), dtype=self._chunk_dtype)
for frame_idx in range(num_frames):
frame_start_idx = frame_idx * frame_shift
if frame_start_idx < buf_len:
frame = np.concatenate(
[
self._buf[-(buf_len - frame_start_idx) :],
chunk[: frame_length - buf_len + frame_start_idx],
]
)
else:
frame = chunk[
(frame_start_idx - buf_len) : (
frame_start_idx - buf_len + frame_length
)
]
if noncausal_first:
# the first frame's l.h.s is a reflection of its r.h.s.
# shove the reflection into the buf - later frames
# might need it
chunk = chunk[(frame_length - buf_len) :]
chunk_len -= frame_length - buf_len
frame_length = self._frame_length
if self._kaldi_shift:
self._buf[:] = np.pad(
frame,
(self._frame_length // 2 - self._frame_shift // 2, 0),
"symmetric",
)
else:
self._buf[:] = np.pad(
frame, ((frame_length + 1) // 2 - 1, 0), "symmetric"
)
frame = self._buf
total_len = chunk_len + frame_length
buf_len = frame_length
noncausal_first = False
self._compute_frame(frame, coeffs[frame_idx])
self._first_frame = False
rem_len = total_len - num_frames * frame_shift
assert rem_len < frame_length
if rem_len > 0:
throw_away = total_len - rem_len
if throw_away < buf_len:
rem_ring_len = buf_len - throw_away
assert rem_ring_len < rem_len or (
rem_ring_len <= rem_len and not len(chunk)
)
self._buf[
self._frame_length
- rem_len : self._frame_length
- rem_len
+ rem_ring_len
] = self._buf[self._frame_length - rem_ring_len :]
self._buf[self._frame_length - (rem_len - rem_ring_len) :] = chunk
else:
self._buf[-rem_len:] = chunk[-rem_len:]
self._buf_len = rem_len
self._started = True
return coeffs
def finalize(self) -> np.ndarray:
buf_len = self._buf_len
frame_length = self._frame_length
frame_shift = self._frame_shift
if self._frame_style == "causal":
pad_left = 0
elif self._kaldi_shift:
pad_left = frame_length // 2 - frame_shift // 2
else:
pad_left = (frame_length + 1) // 2 - 1
num_frames = buf_len + frame_shift // 2
if not self._first_frame:
num_frames -= pad_left
pad_left = 0
num_frames //= frame_shift
if num_frames >= 1:
pad_right = (num_frames - 1) * frame_shift + frame_length - buf_len
pad_right -= pad_left
coeffs = np.empty((num_frames, self.num_coeffs), dtype=self._chunk_dtype)
frames = np.pad(self._buf[-buf_len:], (pad_left, pad_right), "symmetric",)
for frame_idx in range(num_frames):
frame = frames[
frame_idx * frame_shift : frame_idx * frame_shift + frame_length
]
self._compute_frame(frame, coeffs[frame_idx])
else:
coeffs = np.empty((0, self.num_coeffs), dtype=self._chunk_dtype)
self._buf_len = 0
self._started = False
self._first_frame = True
return coeffs
def compute_full(self, signal: np.ndarray) -> np.ndarray:
if self.started:
raise ValueError("Already started computing frames")
# there should be a nicer way to calculate this
frame_length = self._frame_length
frame_shift = self._frame_shift
if len(signal) < frame_length // 2 + 1:
return np.empty((0, self.num_coeffs), dtype=signal.dtype)
if self._frame_style == "causal":
pad_left = 0
elif self._kaldi_shift:
pad_left = frame_length // 2 - frame_shift // 2
else:
pad_left = (self._frame_length + 1) // 2 - 1
# total_len = pad_left + len(signal)
# num_frames = max(0, (total_len - frame_length) // frame_shift + 1)
# rem_len = total_len - num_frames * frame_shift
# if rem_len >= frame_length // 2 + 1:
# num_frames += 1
# pad_right = frame_length - rem_len
# else:
# pad_right = 0
num_frames = max(0, (len(signal) + frame_shift // 2) // frame_shift)
total_len = (num_frames - 1) * frame_shift - pad_left + frame_length
pad_right = max(0, total_len - len(signal))
if pad_left or pad_right:
signal = np.pad(signal, (pad_left, pad_right), "symmetric")
coeffs = np.zeros((num_frames, self.num_coeffs), dtype=signal.dtype)
for frame_idx in range(num_frames):
frame_left = frame_idx * frame_shift
self._compute_frame(
signal[frame_left : frame_left + frame_length], coeffs[frame_idx]
)
return coeffs
STFTFrameComputer = ShortTimeFourierTransformFrameComputer
[docs]
class ShortIntegrationFrameComputer(LinearFilterBankFrameComputer):
"""Compute features by integrating over the filter modulus
Each filter in the bank is convolved with the signal. A pointwise nonlinearity
pushes the frequency band towards zero. Most of the energy of the signal can be
captured in a short time integration. Though best suited to processing whole
utterances at once, short integration is compatable with the frame analogy if the
frame is assumed to be the cone of influence of the maximum-length filter.
For computational purposes, each filter's impulse response is clamped to zero
outside the support of the largest filter in the bank, making it a finite impulse
response filter. This effectively decreases the frequency resolution of the filters
which aren't already FIR. For better frequency resolution at the cost of
computational time, increase
:obj:`pydrobert.speech.config.EFFECTIVE_SUPPORT_THRESHOLD`.
Parameters
----------
bank
Each filter in the bank corresponds to a coefficient in a frame vector. Can be a
:class:`LinearFilterBank` or something compatible with
:func:`pydrobert.speech.alias.alias_factory_subclass_from_arg`
frame_shift_ms
The offset between successive frames, in milliseconds. Also the length of the
integration
frame_style
Defaults to :obj:`'centered'` if `bank.is_zero_phase`, :obj:`'causal'`
otherwise. If :obj:`'centered'` each filter of the bank is translated so that
its support lies in the center of the frame
include_energy
pad_to_nearest_power_of_two
Pad the DFTs used in computation to a power of two for efficient computation
window_function : pydrobert.speech.filters.WindowFunction, dict, or str
The window used to weigh integration. Can be a :class:`WindowFunction` or
something compatible with
:func:`pydrobert.speech.alias_factory_subclass_from_arg`. Defaults to
:class:`pydrobert.speech.filters.GammaWindow` when ``frame_style`` is
:obj:`'causal'`, otherwise :class:`pydrobert.speech.filters.HannWindow`.
use_power
Whether the pointwise linearity is the signal's power or magnitude
use_log
Whether to take the log of the integration
"""
aliases = {"si"} #:
def __init__(
self,
bank: Union[LinearFilterBank, Mapping, str],
frame_shift_ms: float = 10,
frame_style: Optional[Literal["causal", "centered"]] = None,
include_energy: bool = False,
pad_to_nearest_power_of_two: bool = True,
window_function: Optional[Union[WindowFunction, Mapping, str]] = None,
use_power: bool = False,
use_log: bool = True,
):
bank = alias_factory_subclass_from_arg(LinearFilterBank, bank)
self._rate = bank.sampling_rate
self._frame_shift = int(0.001 * frame_shift_ms * self._rate)
self._log = bool(use_log)
self._power = bool(use_power)
self._real = bank.is_real
self._ret_dtype = np.float64
self._x_rem, self._y_rem, self._skip = 0, 0, 0
self._started = False
if frame_style is None:
frame_style = "centered" if bank.is_zero_phase else "causal"
elif frame_style not in ("centered", "causal"):
raise ValueError('Invalid frame style: "{}"'.format(frame_style))
self._frame_style = frame_style
if window_function is None:
if frame_style == "causal":
window_function = GammaWindow()
else:
window_function = HannWindow()
else:
window_function = alias_factory_subclass_from_arg(
WindowFunction, window_function
)
window = window_function.get_impulse_response(2 * self._frame_shift)
self._window = window.reshape(2, self._frame_shift)
if frame_style == "centered":
# we will recenter all filters so that their zero sample
# is at max_support // 2
self._max_support = max(right - left for left, right in bank.supports)
self._translation = self._max_support // 2
else:
# we will shift all filters by whatever minimum value makes them all
# supported above/equal 0. We treat all that translated space as nonzero for
# the sake of the overlap-add algorithm
self._translation = 0
self._max_support = 0
for left, right in bank.supports:
self._translation = max(-left, self._translation)
self._max_support = max(self._max_support, right)
self._max_support += self._translation
min_support_hz = min(right - left for left, right in bank.supports_hz)
self._frame_length = self._max_support + self._frame_shift - 1
self._dft_size = max(
self._frame_length,
# make sure the effective support is represented in at least
# one dft bin
int(np.ceil(2 * self._rate / min_support_hz)),
)
if pad_to_nearest_power_of_two:
self._dft_size = int(2 ** np.ceil(np.log2(self._dft_size)))
self._x_buf = np.empty(self._dft_size, dtype=np.float64)
self._filts = []
if include_energy:
# this is a hacky way to make sure we get an accurate energy
# coefficient - dirac deltas return the signal or a
# translation of it
dirac_filter = np.zeros(self._dft_size, dtype=np.float64)
dirac_filter[self._translation] = 1
if self._real:
dirac_filter = np.fft.rfft(dirac_filter)
else:
dirac_filter = np.fft.fft(dirac_filter)
self._filts.append(dirac_filter)
for filt_idx in range(bank.num_filts):
filt = bank.get_impulse_response(filt_idx, self._dft_size)
if frame_style == "centered":
left_samp, right_samp = bank.supports[filt_idx]
mid_samp = (left_samp + right_samp) // 2
filt = np.roll(filt, self._translation - mid_samp + 1)
else:
filt = np.roll(filt, self._translation)
# we clamp the support in time to make the filter FIR.
self._filts.append(self._compute_dft(filt[: self._max_support]))
# we don't have to store the filtered signal, just the values
# that are accumulated in each frame shift. Since integration windows
# are not in general uniform, we add an index for taking the dot
# product of the first and second half of the window
y_blocks = self._dft_size - self._max_support + 2 * self._frame_shift
y_blocks = int(np.ceil(y_blocks / self._frame_shift))
self._y_buf = np.empty((y_blocks, 2, len(self._filts)), dtype=np.float64)
super(ShortIntegrationFrameComputer, self).__init__(
bank, include_energy=include_energy
)
@property
def frame_style(self) -> str:
return self._frame_style
@property
def sampling_rate(self) -> float:
return self._rate
@property
def frame_length(self) -> int:
return self._frame_length
@property
def frame_shift(self):
return self._frame_shift
@property
def started(self) -> bool:
return self._started
def compute_chunk(self, chunk: np.ndarray) -> np.ndarray:
self._compute_preamble(chunk)
chunk = self._handle_skip(chunk)
chunk_len = len(chunk)
valid_samples_per_dft = self._dft_size - self._max_support + 1
num_raw = self._x_rem + chunk_len
num_dfts = num_raw // valid_samples_per_dft
num_frames = max(0, (num_raw + self._y_rem) // self._frame_shift - 1)
if num_frames:
num_processed = (num_frames + 1) * self._frame_shift
else:
num_processed = self._y_rem
if num_processed - self._y_rem > num_dfts * valid_samples_per_dft:
num_dfts += 1
coeffs = np.empty((num_frames, self.num_coeffs), dtype=self._ret_dtype)
cur_frame, chunk_copied = 0, 0
for dft_idx in range(num_dfts):
end_idx = min(
(dft_idx + 1) * valid_samples_per_dft - self._x_rem, chunk_len
)
assert end_idx >= 0
y_keep = end_idx - dft_idx * valid_samples_per_dft + self._x_rem
start_idx = end_idx - self._dft_size # relative to chunk
if start_idx < 0:
chunk_to_copy = end_idx - chunk_copied
assert chunk_to_copy < self._dft_size
self._x_buf[: self._dft_size - chunk_to_copy] = self._x_buf[
chunk_to_copy:
]
self._x_buf[self._dft_size - chunk_to_copy :] = chunk[
chunk_copied:end_idx
]
chunk_copied = end_idx
cur_buf = self._x_buf
else:
cur_buf = chunk[start_idx:end_idx]
X_buf = self._compute_dft(cur_buf)
self._fill_y_buf(X_buf, y_keep)
del X_buf
while self._y_rem >= 2 * self._frame_shift:
self._compute_frame(coeffs[cur_frame, :])
cur_frame += 1
assert cur_frame == num_frames, (cur_frame, num_frames)
if chunk_len - chunk_copied:
chunk_to_copy = min(self._dft_size, chunk_len - chunk_copied)
self._x_buf[:-chunk_to_copy] = self._x_buf[chunk_to_copy:]
self._x_buf[-chunk_to_copy:] = chunk[-chunk_to_copy:]
self._x_rem = max(0, num_raw - num_dfts * valid_samples_per_dft)
return coeffs
def finalize(self) -> np.ndarray:
coeffs = np.empty((0, self.num_coeffs), dtype=self._ret_dtype)
if self._started:
frame_shift = self._frame_shift
frame_length = self._frame_length
if self._frame_style == "centered":
# we 'borrowed' a half frame's worth of coefficients
# from the start of the sequence in order to center the
# integration, so we discount that from the remaining
# samples
borrowed = frame_shift
else:
borrowed = 0
buf_len = self._translation - self._skip + self._x_rem
buf_len += self._y_rem - borrowed
num_frames = max(0, (buf_len + frame_shift // 2) // frame_shift)
if num_frames >= 1:
pad_right = (num_frames - 1) * frame_shift + frame_length
pad_right -= buf_len
coeffs = self.compute_chunk(np.zeros(pad_right, dtype=self._ret_dtype))[
:num_frames
]
self._started = False
return coeffs
def compute_full(self, signal: np.ndarray) -> np.ndarray:
if self._started:
raise ValueError("Already started computing frames")
return np.concatenate([self.compute_chunk(signal), self.finalize()])
def _compute_preamble(self, chunk):
# check for data type consistency, handle stuff if just started
if self._started:
if chunk.dtype != self._ret_dtype:
raise ValueError("Chunk does not share a type with previous chunks")
else:
if not np.issubdtype(chunk.dtype, np.floating):
raise ValueError("Chunk must be a float type")
self._ret_dtype = chunk.dtype
self._x_buf.fill(0)
self._y_buf.fill(0)
self._x_rem = 0
self._y_rem = 0
if self._frame_style == "centered":
self._skip = self._translation - self._frame_shift
if self._skip < 0:
self._x_rem = -self._skip
self._skip = 0
else:
self._skip = self._translation
self._started = True
def _handle_skip(self, chunk):
# 'skip' refers to some number of samples at the beginning of
# the utterance that won't count towards frames. We add them
# to x_buf, but do not increment x_rem
if not self._skip:
return chunk
assert not self._x_rem
consumed = min(self._skip, len(chunk))
x_len = len(self._x_buf)
if consumed < x_len:
self._x_buf[: x_len - consumed] = self._x_buf[consumed:]
self._x_buf[x_len - consumed :] = chunk[:consumed]
else:
self._x_buf[:] = chunk[consumed - x_len : consumed]
self._skip -= consumed
return chunk[consumed:]
def _fill_y_buf(self, X_buf, y_keep):
# using the fourier domain of the raw signal window, compute
# convolutions and store accumulations in y_buf
block_offs = self._y_rem // self._frame_shift
second_block_start = (block_offs + 1) * self._frame_shift - self._y_rem
for filt_idx in range(self.num_coeffs):
Y_buf = X_buf * self._filts[filt_idx]
y_valid = self._compute_idft(Y_buf)[-y_keep:]
if self._power:
y_valid[:] = y_valid * y_valid.conj()
else:
y_valid[:] = np.abs(y_valid)
del Y_buf
for block_end, block_idx in zip(
list(
range(
second_block_start,
y_keep + self._frame_shift,
self._frame_shift,
)
),
count(block_offs),
):
active_end = min(block_end, y_keep)
active_start = max(0, block_end - self._frame_shift)
y_active = y_valid[active_start:block_end].real
window_start = max(0, self._frame_shift - block_end)
window_end = self._frame_shift - block_end + active_end
window_active = self._window[:, window_start:window_end]
# block_accum = np.sum(y_active * window_active, axis=1)
self._y_buf[block_idx, :, filt_idx] += np.sum(
y_active * window_active, axis=1
)
self._y_rem += y_keep
def _compute_dft(self, buff):
# given a buffer, compute its fourier transform. Always copies
# the data
assert len(buff) <= self._dft_size
if config.USE_FFTPACK and self._real:
from scipy import fftpack
buffered_frame = np.zeros(
self._dft_size + 2 - self._dft_size % 2, dtype=np.float64
)
buffered_frame[1 : len(buff) + 1] = buff
buffered_frame[1 : self._dft_size + 1] = fftpack.rfft(
buffered_frame[1 : self._dft_size + 1], overwrite_x=True
)
buffered_frame[0] = buffered_frame[1]
buffered_frame[1] = 0
fourier_frame = buffered_frame.view(np.complex128)
elif self._real:
fourier_frame = np.fft.rfft(buff, n=self._dft_size)
elif config.USE_FFTPACK:
from scipy import fftpack
complex_frame = np.zeros(self._dft_size, dtype=np.complex128)
complex_frame[: len(buff)] = buff # implicit upcast if f32
fourier_frame = fftpack.fft(complex_frame, overwrite_x=True)
else:
fourier_frame = np.fft.fft(buff, n=self._dft_size)
assert fourier_frame.dtype == np.complex128
return fourier_frame
def _compute_idft(self, fourier_buff):
# given a buffer, compute its inverse fourier transform. Assume
# it's ok to modify the buffer.
assert fourier_buff.dtype == np.complex128
if config.USE_FFTPACK and self._real:
from scipy import fftpack
fourier_buff = fourier_buff.view(np.float64)
fourier_buff[1] = fourier_buff[0]
if self._dft_size % 2:
fourier_buff = fourier_buff[1:]
else:
fourier_buff = fourier_buff[1:-1]
idft = fftpack.irfft(fourier_buff, overwrite_x=True)
elif self._real:
idft = np.fft.irfft(fourier_buff, n=self._dft_size)
elif config.USE_FFTPACK:
from scipy import fftpack
idft = fftpack.ifft(fourier_buff, overwrite_x=True)
else:
idft = np.fft.ifft(fourier_buff)
return idft
def _compute_frame(self, coeffs):
# compute a frame's worth of coefficients from y_rem
assert self._y_rem >= 2 * self._frame_shift
# y_buf[0, 0, :] contains the accumulators of the first half of
# the frame (the first block) multiplied with the first half of
# the window
# y_buf[1, 1, :] contains the accumulators of the second half of
# the frame (the second block) multiplied with the second half
# of the window
coeffs[:] = self._y_buf[0, 0, :] + self._y_buf[1, 1, :]
if self._log:
coeffs[:] = np.log(np.maximum(coeffs, config.LOG_FLOOR_VALUE))
self._y_buf[:-1] = self._y_buf[1:]
self._y_buf[-1] = 0
self._y_rem -= self._frame_shift
SIFrameComputer = ShortIntegrationFrameComputer
[docs]
def frame_by_frame_calculation(
computer: FrameComputer, signal: np.ndarray, chunk_size: int = 2 ** 10
):
"""Compute feature representation of entire signal iteratively
This function constructs a feature matrix of a signal through successive calls to
``computer.compute_chunk``. Its return value should be identical to that of calling
``computer.compute_full(signal)``, but is possibly much slower.
``computer.compute_full`` should be favoured.
Parameters
----------
computer
signal
A 1D float array of the entire signal
chunk_size
The length of the signal buffer to process at a given time
Returns
-------
spec : np.ndarray
A 2D float array of shape ``(num_frames, num_coeffs)``. ``num_frames`` is
nonnegative (possibly 0). Contains some number of feature vectors, ordered in
time over axis 0.
Raises
------
ValueError
If already begin computing frames (``computer.started == True``)
"""
if computer.started:
raise ValueError("Already started computing frames")
coeffs = []
while len(signal):
coeffs.append(computer.compute_chunk(signal[:chunk_size]))
signal = signal[chunk_size:]
coeffs.append(computer.finalize())
return np.concatenate(coeffs)