diff --git a/docs/modules.rst b/docs/modules.rst index 986b66b..fdaea0d 100644 --- a/docs/modules.rst +++ b/docs/modules.rst @@ -4,4 +4,6 @@ Modules The following gives detailed information about all haiopy modules. .. toctree:: - :maxdepth: 1 + :maxdepth: 1 + + modules/haiopy.buffers diff --git a/docs/modules/haiopy.buffers.rst b/docs/modules/haiopy.buffers.rst new file mode 100644 index 0000000..5886ff8 --- /dev/null +++ b/docs/modules/haiopy.buffers.rst @@ -0,0 +1,8 @@ +haiopy.buffers +============== + +.. automodule:: haiopy.buffers + :members: + :inherited-members: + :undoc-members: + :show-inheritance: diff --git a/haiopy/__init__.py b/haiopy/__init__.py index ab59fe2..7ad5d4f 100644 --- a/haiopy/__init__.py +++ b/haiopy/__init__.py @@ -3,3 +3,11 @@ __author__ = """The pyfar developers""" __email__ = 'marco.berzborn@akustik.rwth-aachen.de' __version__ = '0.1.0' + + +from .devices import AudioDevice + + +__all__ = [ + 'AudioDevice' +] diff --git a/haiopy/buffers.py b/haiopy/buffers.py new file mode 100644 index 0000000..fd71a8d --- /dev/null +++ b/haiopy/buffers.py @@ -0,0 +1,231 @@ +import numpy as np +import pyfar as pf +from abc import abstractmethod +from threading import Event + + +class _Buffer(object): + """Abstract base class for audio buffers for block-wise iteration. + + The base class primarily implements buffer state related functionality. + """ + + def __init__(self, block_size) -> None: + """Create a Buffer object with a given block size. + + Parameters + ---------- + block_size : _type_ + _description_ + """ + self._check_block_size(block_size) + self._block_size = block_size + self._buffer = None + self._is_active = Event() + self._is_finished = Event() + + def _check_block_size(self, block_size): + """Check if the block size is an integer.""" + if type(block_size) != int: + raise ValueError("The block size needs to be an integer") + + def _set_block_size(self, block_size): + """Private block size setter implementing validity checks.""" + self.check_if_active() + self._check_block_size(block_size) + self._block_size = block_size + + @property + def block_size(self): + """Returns the block size of the buffer in samples""" + return self._block_size + + @block_size.setter + def block_size(self, block_size): + """Set the block size in samples. Only integer values are supported""" + self._set_block_size(block_size) + + @property + @abstractmethod + def sampling_rate(self): + """Return sampling rate.""" + pass + + def __iter__(self): + return self + + def __next__(self): + """Next dunder method for iteration""" + self._start() + return self.next() + + @abstractmethod + def next(self): + """Next method which for sub-class specific handling of data.""" + raise NotImplementedError() + + @property + def is_active(self): + """Return the state of the buffer. + `True` if the buffer is active, `False` if inactive.""" + return self._is_active.is_set() + + def check_if_active(self): + """Check if the buffer is active and raise an exception if so. + If the buffer is active a BufferError exception is raised. In case the + buffer is currently inactive, the method simply passes without any + return value. This method should always be called before attempting to + modify properties of the buffer to prevent undefined behavior during + iteration of the buffer. + + Raises + ------ + BufferError + Exception is raised if the buffer is currently active. + """ + if self.is_active: + raise BufferError( + "The buffer needs to be inactive to be modified.") + + def _stop(self, msg="Buffer iteration stopped."): + """Stop buffer iteration and set the state to inactive.""" + self._is_active.clear() + self._is_finished.set() + raise StopIteration(msg) + + def _start(self): + """Set the state to active. + Additional operations required before iterating the sub-class can be + implemented in the respective sub-class.""" + self._is_active.set() + self._is_finished.clear() + + def _reset(self): + """Stop and reset the buffer. + Resetting the buffer is implemented in the respective sub-class""" + self._is_active.clear() + self._is_finished.clear() + raise StopIteration("Resetting the buffer.") + + +class SignalBuffer(_Buffer): + """Buffer to block wise iterate a `pyfar.Signal` + + Examples + -------- + + >>> import pyfar as pf + >>> from haiopy.buffers import SignalBuffer + >>> block_size = 512 + >>> sine = pf.signals.sine(440, 4*block_size) + >>> buffer = SignalBuffer(block_size, sine) + >>> for block in buffer: + >>> print(block) + + + """ + + def __init__(self, block_size, signal) -> None: + """Initialize a `SignalBuffer` with a given block size from a + `pyfar.Signal`. + If the number of audio samples is not an integer multiple of the + block size, the last block will be filled with zeros. + + Parameters + ---------- + block_size : int + The block size in samples + signal : pyfar.Signal + The audio data to be block wise iterated. + + """ + super().__init__(block_size) + if not isinstance(signal, pf.Signal): + raise ValueError("signal must be a pyfar.Signal object.") + if signal.time.ndim > 2: + raise ValueError("Only one-dimensional arrays are allowed") + self._data = self._pad_data(signal) + self._update_data() + self._index = 0 + + def _pad_data(self, data): + """Pad the signal with zeros to avoid partially filled blocks + + Parameters + ---------- + data : pyfar.Signal + The input audio signal. + + Returns + ------- + pyfar.Signal + Zero-padded signal. + """ + n_samples = data.n_samples + if np.mod(n_samples, self._block_size) > 0: + pad_samples = self.block_size - np.mod(n_samples, self.block_size) + return pf.dsp.pad_zeros(data, pad_samples, mode='after') + else: + return data + + @property + def n_channels(self): + """The number of audio channels as integer.""" + return self.data.cshape[0] + + @property + def sampling_rate(self): + """The sampling rate of the underlying data.""" + return self.data.sampling_rate + + @property + def n_blocks(self): + """The number of blocks contained in the buffer.""" + return self._n_blocks + + @property + def index(self): + """The current block index as integer.""" + return self._index + + @property + def data(self): + """Return the underlying signal if the buffer is not active.""" + self.check_if_active() + return self._data + + @data.setter + def data(self, data): + """Set the underlying signal if the buffer is not active.""" + self.check_if_active() + self._data = self._pad_data(data) + self._update_data() + + def _set_block_size(self, block_size): + super()._set_block_size(block_size) + self._update_data() + + def _update_data(self): + """Update the data block strided of the underlying data. + The function creates a block-wise view of the numpy data array storing + the time domain data. + """ + self.check_if_active() + self._n_blocks = int(np.ceil(self.data.n_samples / self.block_size)) + self._strided_data = np.lib.stride_tricks.as_strided( + self.data.time, + (*self.data.cshape, self.n_blocks, self.block_size)) + + def next(self): + """Return the next audio block as numpy array and increment the block + index. + """ + if self._index < self._n_blocks: + current = self._index + self._index += 1 + return self._strided_data[..., current, :] + self._stop("The buffer is empty.") + + def _reset(self): + self._index = 0 + super()._reset() diff --git a/haiopy/devices.py b/haiopy/devices.py index bd2a3c1..28b0ef0 100644 --- a/haiopy/devices.py +++ b/haiopy/devices.py @@ -1,4 +1,11 @@ +from multiprocessing import Event +import numpy as np +import sys import sounddevice as sd +from abc import abstractmethod + +from haiopy.buffers import SignalBuffer +import pyfar as pf def list_devices(): @@ -8,89 +15,522 @@ def list_devices(): class _Device(object): def __init__( self, - id, + name, sampling_rate, block_size, dtype): super().__init__() + self._name = name + self._sampling_rate = sampling_rate + self._block_size = block_size + self._dtype = dtype @property def name(self): - raise NotImplementedError('Abstract method') + return self._name - def playback(): - pass + @property + def id(self): + return self._id - def record(): - pass + def sampling_rate(self): + return self._sampling_rate - def playback_record(): - pass + def block_size(self): + return self._block_size - def initialize_playback(): - pass + def dtype(self): + return self.dtype - def initialize_record(): - pass - def initialize_playback_record(): - pass +class AudioDevice(_Device): + def __init__( + self, + identifier=0, + sampling_rate=44100, + block_size=512, + dtype='float32', + ): + + identifier = sd.query_devices(identifier)['name'] + + super().__init__( + name=sd.query_devices(identifier)['name'], + sampling_rate=sampling_rate, + block_size=block_size, + dtype=dtype + ) + self._id = identifier + # self._extra_settings = extra_settings + + self._callback = None + self._stream = None + self._input_buffer = None + self._output_buffer = None - def abort(): + self._stream_finished = Event() + + @property + def id(self): + return self._id + + @abstractmethod + def check_settings(**kwargs): pass + @property + def name(self): + """The name of the device + """ + return self._name -class AudioDevice(_Device): + @property + def sampling_rate(self): + """The sampling rate of the audio device. + """ + return self._sampling_rate + + @sampling_rate.setter + def sampling_rate(self, value): + self.check_settings(value, self.dtype, self._extra_settings) + + @property + def block_size(self): + """The block size of the audio buffer. + """ + return self._block_size + + @block_size.setter + def block_size(self, block_size): + self._block_size = block_size + self.output_buffer.block_size = block_size + + @property + def dtype(self): + return self._dtype + + @property + @abstractmethod + def stream(self): + """The sounddevice audio stream. + """ + return self._stream + + def finished_callback(self) -> None: + """Custom callback after a audio stream has finished.""" + print("I'm finished.") + + def _finished_callback(self) -> None: + """Private portaudio callback after a audio stream has finished.""" + self._stream_finished.set() + self.finished_callback() + self.stream.stop() + + def start(self): + """Start the audio stream""" + if self.stream.closed: + print("Stream is closed. Try re-initializing.", file=sys.stderr) + return + + elif not self.stream.active: + self._stream_finished.clear() + self.stream.start() + else: + print("Stream is already active.", file=sys.stderr) + + def wait(self): + """Wait for the audio stream to finish.""" + self._stream_finished.wait(timeout=None) + + def abort(self): + """Stop the audio steam without finishing remaining buffers.""" + if self.stream.active is True: + self.stream.abort() + self._stop_buffer() + + def close(self): + """Close the audio device and release the sound card lock.""" + if self.stream is not None: + self.stream.close() + self._stop_buffer() + + def stop(self): + """Stop the audio stream after finishing the current buffer.""" + if self.stream.active is True: + self.stream.stop() + self._stop_buffer() + + @abstractmethod + def _stop_buffer(self): + raise NotImplementedError() + + @abstractmethod + def _reset_buffer(self): + raise NotImplementedError() + + +class InputAudioDevice(AudioDevice): def __init__( self, - id, - sampling_rate, - block_size, - dtype, + identifier=sd.default.device['input'], + sampling_rate=44100, + block_size=512, + channels=[1], + dtype='float32', + input_buffer=None, latency=None, extra_settings=None, - # finished_callback=None, clip_off=None, dither_off=None, never_drop_input=None, prime_output_buffers_using_stream_callback=None ): - super().__init__(id, sampling_rate, block_size, dtype) + + max_channel = np.max(channels) + n_channels = len(channels) + + sd.check_input_settings( + device=identifier, + channels=np.max([n_channels, max_channel+1]), + dtype=dtype, + extra_settings=extra_settings, + samplerate=sampling_rate) + + super().__init__( + identifier=identifier, + sampling_rate=sampling_rate, + block_size=block_size, + dtype=dtype) + + self._input_channels = channels + self.initialize() + + if input_buffer is None: + input_buffer = SignalBuffer( + self.block_size, + pf.Signal(np.zeros( + (self.n_channels_input, self.block_size), + dtype=self.dtype), + self.sampling_rate, fft_norm='rms')) + + self._input_buffer = input_buffer + + def check_settings( + self, + n_channels=None, + sampling_rate=None, + dtype=None, + extra_settings=None): + """Check if settings are compatible with the physical devices. + + Parameters + ---------- + n_channels : int + The number of channels to be used + sampling_rate : int + The audio sampling rate + dtype : np.float32, np.int8, np.int16, np.int32 + The audio buffer data type + extra_settings : extra settings + Audio API specific settings. + """ + sd.check_input_settings( + device=self.id, + channels=n_channels, + dtype=dtype, + extra_settings=extra_settings, + samplerate=sampling_rate) @property - def stream(): - pass + def input_channels(self): + """The number of input channels supported by the device""" + return self._input_channels - @staticmethod - def callback(): - pass + @property + def n_channels_input(self): + """The total number of output channels. - def playback(data): - # fill queue, stream.start() - pass + Returns + ------- + int + The number of output channels + """ + return len(self._input_channels) - def record(n_samples): - # stream start, read into the queue - pass + @property + def _n_channels_stream(self): + """The number of output channels required for the stream. + This includes a number of unused pre-pended channels which need to be + filled with zeros before writing the portaudio buffer. + """ + return np.max(self._input_channels) + 1 - def playback_record(data): - # see combination above - pass + @property + def max_channels_input(self): + """The number of output channels supported by the device""" + return sd.query_devices(self.id, 'input')['max_output_channels'] - def initialize_playback(channels): - # init queue, define callback, init stream - pass + def _set_block_size(self, block_size): + self.input_buffer.block_size = block_size - def initialize_record(channels): - pass + def input_callback(self, indata, frames, time, status): + assert frames == self.block_size + if status.output_underflow: + print('Output underflow: increase blocksize?', file=sys.stderr) + raise sd.CallbackAbort('Buffer underflow') + assert not status - def initialize_playback_record(input_channels, output_channels): - pass + try: + next(self.input_buffer)[:] = indata[..., self.input_channels].T + except StopIteration as e: + raise sd.CallbackStop("Buffer empty") from e - def abort(): - # abort - pass + def initialize(self): + """Initialize the playback stream for a given number of channels. + """ + ostream = sd.InputStream( + self.sampling_rate, + self.block_size, + self.id, + self._n_channels_stream, + self.dtype, + callback=self.input_callback, + finished_callback=self._finished_callback) + self._stream = ostream - def close(): - # remove stream - pass + def initialize_buffer(self): + """Initialize the buffer + """ + self.input_buffer._start() + self.input_buffer._is_active.wait() + + @property + def input_buffer(self): + return self._input_buffer + + @input_buffer.setter + def input_buffer(self, buffer): + if buffer.block_size != self.block_size: + raise ValueError( + "The buffer's block size does not match. ", + f"Needs to be {self.block_size}") + + if buffer.n_channels != self.n_channels_input: + raise ValueError( + "The buffer's channel number does not match the channel " + f"mapping. Currently used channels are {self.output_channels}") + + self._input_buffer = buffer + + def _stop_buffer(self): + self.input_buffer._stop() + + def start(self): + self.initialize_buffer() + super().start() + + def wait(self): + super().wait() + self.input_buffer._is_finished.wait() + + +class OutputAudioDevice(AudioDevice): + + def __init__( + self, + identifier=sd.default.device['output'], + sampling_rate=44100, + block_size=512, + channels=[1], + dtype='float32', + output_buffer=None, + latency=None, + extra_settings=None, + clip_off=None, + dither_off=None, + never_drop_input=None, + prime_output_buffers_using_stream_callback=None): + + # First check the settings before continuing + max_channel = np.max(channels) + n_channels = len(channels) + sd.check_output_settings( + device=identifier, + channels=np.max([n_channels, max_channel+1]), + dtype=dtype, + extra_settings=extra_settings, + samplerate=sampling_rate) + + super().__init__( + identifier=identifier, + sampling_rate=sampling_rate, + block_size=block_size, + dtype=dtype) + + self._output_channels = channels + + if output_buffer is None: + output_buffer = SignalBuffer( + self.block_size, + pf.Signal(np.zeros( + (self.n_channels_output, self.block_size), + dtype=self.dtype), + self.sampling_rate, fft_norm='rms')) + # if output_buffer.data.shape[0] != self.n_channels_output: + # raise ValueError( + # "The shape of the buffer does not match the channel mapping") + self.output_buffer = output_buffer + self.initialize() + + def check_settings( + self, + n_channels=None, + sampling_rate=None, + dtype=None, + extra_settings=None): + """Check if settings are compatible with the physical devices. + + Parameters + ---------- + n_channels : int + The number of channels to be used + sampling_rate : int + The audio sampling rate + dtype : np.float32, np.int8, np.int16, np.int32 + The audio buffer data type + extra_settings : extra settings + Audio API specific settings. + + Raises + ------ + PortAudioError + If the settings are incompatible with the device an exception is + raised. + """ + sd.check_output_settings( + device=self.id, + channels=n_channels, + dtype=dtype, + extra_settings=extra_settings, + samplerate=sampling_rate) + + @property + def output_channels(self): + return self._output_channels + + @property + def n_channels_output(self): + """The total number of output channels. + + Returns + ------- + int + The number of output channels + """ + return len(self._output_channels) + + @property + def _n_channels_stream(self): + """The number of output channels required for the stream. + + This includes a number of unused pre-pended channels which need to be + filled with zeros before writing the portaudio buffer. In case of + using only the first channel, portaudio plays back a mono signal, + which will be broadcasted to the first two channels. To avoid this, + the minimum number of channels opened is always two, the unused second + channel is filled with zeros. + """ + return np.max((2, np.max(self._output_channels) + 1)) + + @property + def max_channels_output(self): + """The number of output channels supported by the device""" + return sd.query_devices(self.id, 'output')['max_output_channels'] + + def output_callback(self, outdata, frames, time, status) -> None: + """Portudio callback for output streams + + Parameters + ---------- + outdata : array + Output buffer view + frames : int + Length of the buffer + time : PaTimestamp + Timestamp of the callback event + status : sounddevice.CallbackFlags + Portaudio status flags + + Raises + ------ + sd.CallbackAbort + Abort the playback if a buffer underflow occurs. + sd.CallbackStop + Stop the playback if the output queue is empty. + """ + assert frames == self.block_size + if status.output_underflow: + print('Output underflow: increase blocksize?', file=sys.stderr) + raise sd.CallbackAbort('Buffer underflow') + assert not status + + try: + # Write a block to an array with all required output channels + # including zeros for unused channels. Required as sounddevice does + # not support routing matrices + self._stream_block_out[self.output_channels] = next( + self.output_buffer) + outdata[:] = self._stream_block_out.T + except StopIteration as e: + raise sd.CallbackStop("Buffer empty") from e + + def initialize(self): + """Initialize the playback stream for a given number of channels.""" + ostream = sd.OutputStream( + self.sampling_rate, + self.block_size, + self.id, + self._n_channels_stream, + self.dtype, + callback=self.output_callback, + finished_callback=self._finished_callback) + self._stream = ostream + # Init array buffering a block of all required output channels + # including zeros for unused channels. Required as sounddevice does + # not support routing matrices + self._stream_block_out = np.zeros( + (self._n_channels_stream, self.block_size), dtype=self.dtype) + + def initialize_buffer(self): + self.output_buffer._start() + self.output_buffer._is_active.wait() + + @property + def output_buffer(self): + return self._output_buffer + + @output_buffer.setter + def output_buffer(self, buffer): + if buffer.block_size != self.block_size: + raise ValueError( + "The buffer's block size does not match. ", + f"Needs to be {self.block_size}") + + if buffer.n_channels != self.n_channels_output: + raise ValueError( + "The buffer's channel number does not match the channel " + f"mapping. Currently used channels are {self.output_channels}") + + self._output_buffer = buffer + + def _stop_buffer(self): + self._output_buffer._stop() + + def start(self): + self.output_buffer._start() + self.output_buffer._is_active.wait() + super().start() + + def wait(self): + super().wait() + self.output_buffer._is_finished.wait() diff --git a/haiopy/io.py b/haiopy/io.py index 9cfa4bc..6feef3d 100644 --- a/haiopy/io.py +++ b/haiopy/io.py @@ -1,4 +1,4 @@ -from . import devices +from . import devices # noqa: F401 TODO: remove this after implementation class _AudioIO(object): diff --git a/setup.cfg b/setup.cfg index 930a60b..33e9721 100644 --- a/setup.cfg +++ b/setup.cfg @@ -22,5 +22,6 @@ exclude = docs test = pytest [tool:pytest] -collect_ignore = ['setup.py'] +collect_ignore = ['setup.py', 'private'] +norecursedirs = private build dist *.egg venv *vendor* diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..f04c8da --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,52 @@ +import pytest +from .utils import signal_buffer_stub +from haiopy.buffers import SignalBuffer +import pyfar as pf +import numpy as np + + +@pytest.fixture +def empty_buffer_stub(): + """Create a stub representing an empty ArrayBuffer. + + Returns + ------- + ArrayBuffer + Stub of ArrayBuffer + """ + + block_size = 512 + n_blocks = 10 + data = np.zeros((1, n_blocks*block_size), dtype='float32') + + buffer = signal_buffer_stub(block_size, data) + duration = block_size*n_blocks/buffer.sampling_rate + + return buffer, duration + + +@pytest.fixture +def sine_buffer_stub(): + """Create a stub representing an empty ArrayBuffer. + + Returns + ------- + buffer: SignalBuffer + Stub of SignalBuffer + duration: float + Duration of the buffer in seconds. Required if waiting for the buffer + to finish is required. + + """ + sampling_rate = 44100 + block_size = 512 + n_blocks = 86 + data = np.zeros((1, n_blocks*block_size), dtype='float32') + t = np.arange(0, block_size*n_blocks) + data = np.sin(2*np.pi*t*(block_size + 1)/sampling_rate)*10**(-6/20) + + data = np.atleast_2d(data).astype('float32') + buffer = SignalBuffer(block_size, pf.Signal(data, sampling_rate)) + duration = block_size*n_blocks/sampling_rate + + return buffer, duration diff --git a/tests/sounddevice_mocks.py b/tests/sounddevice_mocks.py new file mode 100644 index 0000000..4aaef64 --- /dev/null +++ b/tests/sounddevice_mocks.py @@ -0,0 +1,23 @@ +import pytest +from unittest.mock import MagicMock + +import numpy as np +import sounddevice as sd + + +def output_stream_mock(block_size=512, sampling_rate=44100, channels=1): + # pass + stream = MagicMock(spec_set=sd.OutputStream) + stream.samplerate = sampling_rate + stream.blocksize = block_size + stream.device = 0 + stream.channels = channels + stream.dtype = np.float32 + stream.latency = 0.1 + # stream.extra_settings = None + # stream.clip_off = False + # stream.dither_off = False + # stream.never_drop_input = False + # stream.prime_output_buffers_using_stream_callback = False + + return stream diff --git a/tests/test_buffers.py b/tests/test_buffers.py new file mode 100644 index 0000000..7c3a98c --- /dev/null +++ b/tests/test_buffers.py @@ -0,0 +1,209 @@ +import numpy as np +import numpy.testing as npt +from haiopy.buffers import _Buffer, SignalBuffer +import pytest +import pyfar as pf + + +def test_buffer_block_size(): + + block_size = 512 + buffer = _Buffer(block_size) + + assert buffer._block_size == block_size + + assert buffer.block_size == block_size + + new_block_size = 128 + buffer.block_size = int(new_block_size) + assert buffer._block_size == new_block_size + + with pytest.raises( + ValueError, match='The block size needs to be an integer'): + buffer.block_size = float(10) + + with pytest.raises( + ValueError, match='The block size needs to be an integer'): + _Buffer(float(10)) + + +def test_buffer_state(): + block_size = 512 + + # create new buffer + buffer = _Buffer(block_size) + buffer._start() + assert buffer.is_active is True + + # check if the correct error is raised on stopping + with pytest.raises(StopIteration, match="iteration stopped"): + buffer._stop() + + assert buffer.is_active is False + + # The buffer will be automatically set to be active after the first call + # to the __next__ method + # The pytest raises is required here, as the sub-class specific next + # method is an abstract class method + with pytest.raises(NotImplementedError): + next(buffer) + assert buffer._is_active is True + + # check_if_active() raises an exception if buffer is active + with pytest.raises(BufferError, match="needs to be inactive"): + buffer.check_if_active() + + # check iterator dunder + assert iter(buffer) == buffer + + +def test_signal_buffer(): + sampling_rate = 44100 + n_blocks = 10 + block_size = 512 + n_samples = block_size*n_blocks + sine = pf.signals.sine( + 440, n_samples, amplitude=[1, 1], sampling_rate=sampling_rate) + + with pytest.raises(ValueError, match='one-dimensional'): + SignalBuffer( + block_size, + pf.Signal(np.zeros((2, 3, block_size), 'float32'), sampling_rate)) + + with pytest.raises(ValueError, match='must be a pyfar.Signal'): + SignalBuffer(block_size, [1, 2, 3]) + + buffer = SignalBuffer(block_size, sine) + + assert buffer._n_blocks == n_blocks + assert buffer.n_blocks == n_blocks + + # test sampling rate getter + assert buffer.sampling_rate == sampling_rate + + # test number of channels + assert buffer.n_channels == 2 + + # check if the initial index s correct + assert buffer._index == 0 + assert buffer.index == 0 + + # check if the data arrays are correct + npt.assert_array_equal(buffer._data.time, sine.time) + npt.assert_array_equal(buffer.data.time, sine.time) + + # check if the data strides are correct + strided_buffer_data = np.lib.stride_tricks.as_strided( + sine.time, (*sine.cshape, n_blocks, block_size)) + npt.assert_array_equal( + buffer._strided_data, strided_buffer_data) + + # check first step + block_data = next(buffer) + npt.assert_array_equal(block_data, strided_buffer_data[..., 0, :]) + + # check second step + block_data = next(buffer) + npt.assert_array_equal(block_data, strided_buffer_data[..., 1, :]) + + # check if a error is raised if the end of the buffer is reached + with pytest.raises(StopIteration, match="buffer is empty"): + while True: + next(buffer) + + # test the looping blocks + buffer = SignalBuffer(block_size, sine) + for idx, block in enumerate(buffer): + assert buffer.is_active is True + npt.assert_array_equal( + block, strided_buffer_data[..., idx, :]) + + # check if state is set to inactive after loop finished + assert buffer.is_active is False + + +def test_signal_buffer_padding(): + sampling_rate = 44100 + n_samples = 800 + + n_blocks = 2 + block_size = 512 + sine = pf.signals.sine( + 440, n_samples, amplitude=[1], sampling_rate=sampling_rate) + + buffer = SignalBuffer(block_size, sine) + + assert buffer.data.n_samples == n_blocks*block_size + + expected_data = np.concatenate(( + np.squeeze(sine.time), + np.zeros(n_blocks*block_size-n_samples, dtype=float))) + + npt.assert_equal(np.squeeze(buffer.data.time), expected_data) + + +def test_writing_signal_buffer(): + sampling_rate = 44100 + block_size = 512 + + block_data = np.atleast_2d(np.arange(block_size)) + + sig = pf.Signal(np.zeros(block_size, dtype='float32'), sampling_rate) + buffer = SignalBuffer(block_size, sig) + + next(buffer)[:] = block_data + + # we need to stop the buffer which raises a StopIteration error + with pytest.raises(StopIteration): + buffer._stop() + np.testing.assert_array_equal(buffer.data.time, block_data) + + +def test_signal_buffer_updates(): + sampling_rate = 44100 + n_blocks = 10 + block_size = 512 + n_samples = block_size*n_blocks + noise = pf.signals.noise( + n_samples, rms=[1, 1], sampling_rate=sampling_rate) + sine = pf.signals.sine( + 440, n_samples, amplitude=[1, 1], sampling_rate=sampling_rate) + + # Create a new buffer + buffer = SignalBuffer(block_size, noise) + + # Set a new signal as data for the buffer + buffer.data = sine + npt.assert_array_equal(buffer._data.time, sine.time) + npt.assert_array_equal(buffer.data.time, sine.time) + + # The new block size is 4 times smaller than the old one + new_block_size = 128 + buffer.block_size = new_block_size + + # The data itself is not touched in this case + npt.assert_array_equal(buffer._data.time, sine.time) + npt.assert_array_equal(buffer.data.time, sine.time) + + # Stride the array with the new block size + # The new number of blocks is an integer multiple of the old block size + new_n_blocks = n_samples // new_block_size + strided_buffer_data = np.lib.stride_tricks.as_strided( + sine.time, (*sine.time.shape[:-1], new_n_blocks, new_block_size)) + npt.assert_array_equal( + buffer._strided_data, strided_buffer_data) + + # Check if Errors are raised when buffer is in use + next(buffer) + assert buffer.is_active is True + + # Setting the block size is not allowed if the buffer is active + with pytest.raises(BufferError, match="needs to be inactive"): + buffer.block_size = 512 + + # Setting and getting the data is not allowed if the buffer is active + with pytest.raises(BufferError, match="needs to be inactive"): + buffer.data = sine + + with pytest.raises(BufferError, match="needs to be inactive"): + buffer.data diff --git a/tests/test_devices.py b/tests/test_devices.py new file mode 100644 index 0000000..51287a5 --- /dev/null +++ b/tests/test_devices.py @@ -0,0 +1,25 @@ +from haiopy import devices +from . import utils +from . import sounddevice_mocks as sdm +from unittest.mock import patch + + +@patch('sounddevice.query_devices', new=utils.query_devices) +def test_audio_device(): + devices.AudioDevice(0) + + +@patch('sounddevice.query_devices', new=utils.query_devices) +@patch('sounddevice.check_input_settings', new=utils.check_input_settings) +def test_check_input_settings(): + in_device = devices.InputAudioDevice() + in_device.check_settings() + + +@patch('sounddevice.query_devices', new=utils.query_devices) +@patch('sounddevice.check_output_settings', new=utils.check_output_settings) +@patch('sounddevice.OutputStream', new=sdm.output_stream_mock()) +def test_check_output_settings(empty_buffer_stub): + out_device = devices.OutputAudioDevice( + output_buffer=empty_buffer_stub) + out_device.check_settings() diff --git a/tests/test_devices_physical.py b/tests/test_devices_physical.py new file mode 100644 index 0000000..26b1aaa --- /dev/null +++ b/tests/test_devices_physical.py @@ -0,0 +1,147 @@ +from haiopy import devices +import sounddevice as sd +from . import utils +from unittest.mock import patch, MagicMock +import time +import pytest +import pyfar as pf + + +def default_device_multiface_fireface(): + device_list = sd.query_devices() + found = False + + valid_devices = ['Multiface', 'Fireface', 'Scarlett 2i4'] + + for valid_device in valid_devices: + for identifier, device in enumerate(device_list): + if valid_device in device['name']: + found = True + break + if not found: + raise ValueError( + "Please connect Fireface or Multiface, or specify test device.") + + return identifier, device + # default = MagicMock(spec_sec=sd.default) + # default.device = [idx, idx] + # default._default_device = (idx, idx) + + # return default + + +def test_default_device_helper(): + identifier, device = default_device_multiface_fireface() + fireface = 'Fireface' in sd.query_devices(identifier)['name'] + multiface = 'Multiface' in sd.query_devices(identifier)['name'] + scarlett = 'Scarlett 2i4' in sd.query_devices(identifier)['name'] + assert fireface or multiface or scarlett + + if fireface: + assert device['max_input_channels'] == 18 + assert device['max_output_channels'] == 18 + + if scarlett: + assert device['max_input_channels'] == 2 + assert device['max_output_channels'] == 4 + +# ----------------------------------------------------------------------------- +# Input Device Tests +# ----------------------------------------------------------------------------- + + +def test_check_input_settings(): + identifier, config = default_device_multiface_fireface() + + default_sampling_rate = config['default_samplerate'] + + # Create device + in_device = devices.InputAudioDevice(identifier) + + # Check sampling rate + in_device.check_settings(sampling_rate=default_sampling_rate) + with pytest.raises(sd.PortAudioError, match="Invalid"): + in_device.check_settings(sampling_rate=10) + + # Check the dtype, apparently this raises a ValueError if invalid + in_device.check_settings(dtype='float32') + with pytest.raises(ValueError, match="Invalid"): + in_device.check_settings(dtype=float) + + # Check number of channels + in_device.check_settings(n_channels=config['max_input_channels']) + with pytest.raises(sd.PortAudioError, match="Invalid"): + in_device.check_settings(config['max_input_channels']+10) + + +def test_recording(empty_buffer_stub): + + buffer = empty_buffer_stub[0] + assert pf.dsp.rms(buffer.data) < 1e-14 + + identifier, config = default_device_multiface_fireface() + + in_device = devices.InputAudioDevice( + identifier=identifier, + input_buffer=buffer, + channels=[1]) + in_device.check_settings() + + in_device.start() + assert in_device.input_buffer.is_active is True + in_device.wait() + assert in_device.input_buffer.is_active is False + + assert pf.dsp.rms(in_device.input_buffer.data) > 1e-10 + +# ----------------------------------------------------------------------------- +# Output Device Tests +# ----------------------------------------------------------------------------- + + +def test_check_output_settings(empty_buffer_stub): + identifier, config = default_device_multiface_fireface() + channels = [3] + block_size = 512 + + buffer = empty_buffer_stub[0] + + out_device = devices.OutputAudioDevice( + identifier, 44100, block_size, channels=channels, dtype='float32', + output_buffer=buffer) + + # Check sampling rate + out_device.check_settings(sampling_rate=config['default_samplerate']) + with pytest.raises(sd.PortAudioError, match="Invalid"): + out_device.check_settings(sampling_rate=10) + + # Check the dtype, apparently this raises a ValueError if invalid + out_device.check_settings(dtype='float32') + with pytest.raises(ValueError, match="Invalid"): + out_device.check_settings(dtype=float) + + # Check number of channels + out_device.check_settings(n_channels=config['max_output_channels']) + with pytest.raises(sd.PortAudioError, match="Invalid"): + out_device.check_settings(config['max_output_channels']+10) + + +def test_sine_playback(sine_buffer_stub): + + buffer = sine_buffer_stub[0] + duration = sine_buffer_stub[1] + identifier, config = default_device_multiface_fireface() + + sampling_rate = config['default_samplerate'] + + out_device = devices.OutputAudioDevice( + identifier=identifier, + output_buffer=buffer, + channels=[0], + sampling_rate=sampling_rate) + out_device.check_settings() + + out_device.start() + assert out_device.output_buffer.is_active is True + out_device.wait() + assert out_device.output_buffer.is_active is False diff --git a/tests/utils.py b/tests/utils.py new file mode 100644 index 0000000..9ba1894 --- /dev/null +++ b/tests/utils.py @@ -0,0 +1,109 @@ +import pytest +from unittest import mock +import numpy as np +import pyfar as pf +from haiopy.buffers import SignalBuffer + + +def default_devices(): + return [0, 0] + + +def query_devices(id=None, kind=None): + if kind == 'input': + return { + 'name': "MockDevice", + 'index': 0, + 'hostapi': 'CoreAudio', + 'max_input_channels': 8, + 'default_low_input_latency': 0.1, + 'default_high_input_latency': 0.15, + 'default_samplerate': 44100 + } + elif kind == 'output': + return { + 'name': "MockInputDevice", + 'index': 0, + 'hostapi': 'CoreAudio', + 'max_output_channels': 8, + 'default_low_output_latency': 0.1, + 'default_high_output_latency': 0.15, + 'default_samplerate': 44100 + } + else: + return { + 'name': "MockOutput", + 'index': 0, + 'hostapi': 'CoreAudio', + 'max_input_channels': 8, + 'max_output_channels': 8, + 'default_low_input_latency': 0.1, + 'default_low_output_latency': 0.1, + 'default_high_input_latency': 0.15, + 'default_high_output_latency': 0.15, + 'default_samplerate': 44100 + } + + +def supported_mock_device_parameters(): + return { + 'samplerate': [44.1e3, 48e3, 2*44.1e3, 96e3, 192e3], + 'dtype': ['float32'], + 'channels': [8]} + + +def check_output_settings( + device=None, + channels=None, + dtype=None, + extra_settings=None, + samplerate=None): + """So far this only passes for all settings""" + pass + + +def check_input_settings( + device=None, + channels=None, + dtype=None, + extra_settings=None, + samplerate=None): + """So far this only passes for all settings""" + pass + + +def signal_buffer_stub(block_size=512, data=np.zeros((1, 512))): + """Generate a ArrayBuffer Stub with given block size and data + + Parameters + ---------- + block_size : int + Block size for the sound card callback + data : array_like, float32, int24, int16, int8 + The data of the buffer + """ + if np.mod(data.shape[-1], block_size) != 0: + raise ValueError( + 'The data needs to be an integer multiple of the block size') + + n_blocks = data.shape[-1] // block_size + + sig = pf.Signal(data, 44100, fft_norm='rms') + + # def next_block(): + # strided = np.lib.stride_tricks.as_strided( + # data, (*data.shape[:-1], n_blocks, block_size)) + + # for idx in range(n_blocks): + # yield strided[..., idx, :] + + # buffer = mock.MagicMock(spec_set=ArrayBuffer(block_size, data)) + buffer = SignalBuffer(block_size, sig) + + # buffer.data = data + # buffer._strided_data = np.atleast_3d(data) + # buffer.next = np.atleast_3d(next_block) + # buffer.n_blocks = n_blocks + # buffer.block_size = block_size + + return buffer