diff --git a/src/audio/sinks/audioworklets/webaudio_sink_processor.audioworklet.ts b/src/audio/sinks/audioworklets/webaudio_sink_processor.audioworklet.ts
index 8a475c6d..d6986c07 100644
--- a/src/audio/sinks/audioworklets/webaudio_sink_processor.audioworklet.ts
+++ b/src/audio/sinks/audioworklets/webaudio_sink_processor.audioworklet.ts
@@ -1,6 +1,5 @@
 import { CircularTypedArray } from '../../../utils/circularTypedArray';
-import { OPUS_ENCODER_RATE, OPUS_ENCODER_CHUNK_SAMPLES_COUNT, MAX_LATENCY } from '../../../utils/constants';
-import { SynchronizedAudioBuffer } from '../../../utils/audio/synchronizedAudioBuffer';
+import { OPUS_ENCODER_RATE, MAX_LATENCY } from '../../../utils/constants';
 
 const CHANNELS = 2;
 const BUFFER_SIZE = MAX_LATENCY * (OPUS_ENCODER_RATE / 1000) * CHANNELS;
@@ -14,11 +13,7 @@ declare const sampleRate: number;
 // @ts-ignore
 class RawPcmPlayerProcessor extends AudioWorkletProcessor {
   chunkBuffer = new Float32Array(128 * CHANNELS);
-  currentSampleIndex = -1;
   buffer = new CircularTypedArray(Float32Array, BUFFER_SIZE);
-  synchronizedBuffer: SynchronizedAudioBuffer;
-  lastReceivedStreamTime = -1;
-  audioClockDrift = -1;
 
   port: MessagePort;
 
@@ -28,27 +23,18 @@ class RawPcmPlayerProcessor extends AudioWorkletProcessor {
   }
 
   handleMessage_(event) {
-    if (event.data.type === 'init') {
-      this.synchronizedBuffer = new SynchronizedAudioBuffer(this.buffer, CHANNELS, this.getIdealAudioPosition, { debug: event.data.debug });
-    }
     if (event.data.type === 'chunk') {
-      const offset = event.data.i * OPUS_ENCODER_CHUNK_SAMPLES_COUNT * CHANNELS;
-      this.buffer.set(event.data.chunk, offset);
-      this.audioClockDrift = event.data.audioClockDrift;
+      this.buffer.set(event.data.chunk, event.data.offset);
+      console.log(`received chunk at diff: ${(event.data.offset / 2) - currentFrame}`);
     }
   }
 
-  getIdealAudioPosition = () => currentFrame - this.audioClockDrift
-
   process(inputs, outputs) {
-    if (!this.synchronizedBuffer || this.audioClockDrift === -1) {
-      return true;
-    }
-    const chunkBuffer = this.synchronizedBuffer.readNextChunk(outputs[0][0].length);
+    this.buffer.getInTypedArray(this.chunkBuffer, currentFrame * CHANNELS, this.chunkBuffer.length);
 
     for (let sampleIndex = 0; sampleIndex < outputs[0][0].length; sampleIndex++) {
-      outputs[0][0][sampleIndex] = chunkBuffer[sampleIndex * CHANNELS];
-      outputs[0][1][sampleIndex] = chunkBuffer[sampleIndex * CHANNELS + 1];
+      outputs[0][0][sampleIndex] = this.chunkBuffer[sampleIndex * CHANNELS];
+      outputs[0][1][sampleIndex] = this.chunkBuffer[sampleIndex * CHANNELS + 1];
     }
 
     return true;
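
The hunks above strip synchronization out of the worklet: the processor no longer tracks drift or owns a SynchronizedAudioBuffer. It simply writes each incoming chunk at the absolute `offset` chosen by the sink, reads blindly at `currentFrame * CHANNELS`, and de-interleaves into the two planar output channels (the `console.log` on every chunk reads like leftover debug tracing). A standalone model of this write/read path, with an invented buffer size, is sketched below:

```ts
// Standalone model of the new write/read path (constants invented, not part of the patch).
const CHANNELS = 2;
const BUFFER_SIZE = 1024 * CHANNELS; // stands in for MAX_LATENCY * (OPUS_ENCODER_RATE / 1000) * CHANNELS
const storage = new Float32Array(BUFFER_SIZE);

// What `this.buffer.set(chunk, offset)` amounts to for non-negative offsets:
const writeChunk = (chunk: Float32Array, offset: number): void => {
  for (let i = 0; i < chunk.length; i += 1) {
    storage[(offset + i) % BUFFER_SIZE] = chunk[i];
  }
};

// What `process()` does each 128-frame quantum: read at the audio clock and de-interleave.
const readQuantum = (currentFrame: number, left: Float32Array, right: Float32Array): void => {
  for (let frame = 0; frame < left.length; frame += 1) {
    const base = ((currentFrame + frame) * CHANNELS) % BUFFER_SIZE;
    left[frame] = storage[base];
    right[frame] = storage[base + 1];
  }
};

// A chunk posted at a given offset is heard once currentFrame reaches offset / CHANNELS,
// which is exactly the quantity the worklet's debug log prints.
writeChunk(new Float32Array(128 * CHANNELS).fill(0.5), 256 * CHANNELS);
```

Since both sides index the same circular buffer by absolute interleaved sample count, staying in sync reduces to the sink posting each chunk at the right offset before `currentFrame` reaches it.
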
} from '../../utils/audio/synchronizedAudioBuffer'; const AUDIO_DRIFT_HISTORY_INTERVAL = 50; const AUDIO_DRIFT_HISTORY_DURATION = 2 * 60 * 1000; @@ -19,10 +19,11 @@ export const isWebAudioAvailable = () => typeof AudioContext === 'function' && t export class WebAudioSink extends AudioSink { type: 'webaudio' = 'webaudio'; local: true = true; - workletNode: AudioWorkletNode; - context: AudioContext; - cleanAudioContext; - audioClockDriftHistory = new NumericStatsTracker((v) => v, AUDIO_DRIFT_HISTORY_DURATION / AUDIO_DRIFT_HISTORY_INTERVAL); + private workletNode: AudioWorkletNode; + private context: AudioContext; + private cleanAudioContext: () => void; + private audioClockDriftHistory = new NumericStatsTracker((v) => v, AUDIO_DRIFT_HISTORY_DURATION / AUDIO_DRIFT_HISTORY_INTERVAL); + private audioBufferTransformer: DriftAwareAudioBufferTransformer; constructor(descriptor: WebAudioSinkDescriptor, manager: AudioSourcesSinksManager) { super(descriptor, manager); @@ -38,18 +39,6 @@ export class WebAudioSink extends AudioSink { throw new Error('Webaudio sink already started'); } - // we cannot put this class in the global file scope as it will be created by the nodejs process - // which will throw an error because AudioWorkletNode only exists browser side - class RawPcmPlayerWorklet extends AudioWorkletNode { - constructor(context) { - super(context, 'rawPcmPlayerProcessor', { - numberOfOutputs: 1, - numberOfInputs: 0, - outputChannelCount: [source.channels], - }); - } - } - if (!this.context) { this.context = new AudioContext({ sampleRate: OPUS_ENCODER_RATE, @@ -59,25 +48,18 @@ export class WebAudioSink extends AudioSink { latencyHint: 0.01, }); } - // eslint-disable-next-line - const audioworkletPath = require('./audioworklets/webaudio_sink_processor.audioworklet.ts'); - await this.context.audioWorklet.addModule(audioworkletPath); - this.workletNode = new RawPcmPlayerWorklet(this.context); - this.workletNode.port.postMessage({ - type: 'init', - debug: debug.enabled('soundsync:audioSinkDebug'), - }); const volumeNode = this.context.createGain(); volumeNode.gain.value = this.volume; - this.workletNode.connect(volumeNode); - volumeNode.connect(this.context.destination); - - this.context.resume(); - const syncDeviceVolume = () => { volumeNode.gain.value = this.volume; }; this.on('update', syncDeviceVolume); + + this.audioBufferTransformer = new DriftAwareAudioBufferTransformer( + this.channels, + () => Math.floor(this.audioClockDriftHistory.mean()), + ); + const driftRegisterInterval = setInterval(this.registerDrift, AUDIO_DRIFT_HISTORY_INTERVAL); // this should be set before any await to make sure we have the clean method available if _stopSink is called between _startSink ends this.cleanAudioContext = () => { @@ -87,9 +69,30 @@ export class WebAudioSink extends AudioSink { this.context.suspend(); delete this.context; this.cleanAudioContext = undefined; + this.audioClockDriftHistory.flush(); clearInterval(driftRegisterInterval); }; + // we cannot put this class in the global file scope as it will be created by the nodejs process + // which will throw an error because AudioWorkletNode only exists browser side + class RawPcmPlayerWorklet extends AudioWorkletNode { + constructor(context) { + super(context, 'rawPcmPlayerProcessor', { + numberOfOutputs: 1, + numberOfInputs: 0, + outputChannelCount: [source.channels], + }); + } + } + // eslint-disable-next-line + const audioworkletPath = require('./audioworklets/webaudio_sink_processor.audioworklet.ts'); + await 
diff --git a/src/utils/audio/synchronizedAudioBuffer.ts b/src/utils/audio/synchronizedAudioBuffer.ts
index e28852c5..6cfb59c9 100644
--- a/src/utils/audio/synchronizedAudioBuffer.ts
+++ b/src/utils/audio/synchronizedAudioBuffer.ts
@@ -3,7 +3,6 @@
 // It will hard or soft sync depending on the clock drift between the audio device and the ideal time
 
 import { l } from '../environment/log';
-import { CircularTypedArray } from '../circularTypedArray';
 import { HARD_SYNC_MIN_AUDIO_DRIFT, SOFT_SYNC_MIN_AUDIO_DRIFT, OPUS_ENCODER_RATE } from '../constants';
 
 const DRIFT_CORRECTION_MIN_INTERVAL = OPUS_ENCODER_RATE * 5; // force minimum 5 seconds between each latency correction
@@ -114,83 +113,3 @@
     return { bufferTimestamp, buffer };
   }
 }
-
-export class SynchronizedAudioBuffer {
-  private returnBuffer = Buffer.alloc(128 * Float32Array.BYTES_PER_ELEMENT * 2); // start with a reasonably large buffer that will be resized if necessary
-  private typedReturnBuffer = new Float32Array(this.returnBuffer.buffer);
-  private delayedDriftCorrection = 0;
-  // number of samples to wait until starting to correct for drift again, used to prevent over correction
-  private ignoreDriftFor = 0;
-  public softSyncThreshold: number;
-  public hardSyncThreshold: number;
-
-  constructor(
-    public buffer: CircularTypedArray,
-    public channels: number,
-    public idealPositionPerChannelGetter: () => number,
-    {
-      debug = false,
-      softSyncThreshold = SOFT_SYNC_MIN_AUDIO_DRIFT,
-      hardSyncThreshold = HARD_SYNC_MIN_AUDIO_DRIFT,
-    } = {},
-  ) {
-    // eslint-disable-next-line no-console
-    this.log = debug ? console.log : () => null;
-    this.softSyncThreshold = softSyncThreshold;
-    this.hardSyncThreshold = hardSyncThreshold;
-  }
-
-  log: (str: string) => void;
-
-  readNextChunk(samplesPerChannel: number) {
-    const idealBufferPosition = this.idealPositionPerChannelGetter() * this.channels;
-    let sampleDelta = 0;
-    if (this.buffer.getReaderPointer() === 0) {
-      this.buffer.setReaderPointer(idealBufferPosition);
-    }
-    // this.log(`= ideal position ${idealBufferPosition}, current position ${this.buffer.getReaderPointer()}, diff ${idealBufferPosition - this.buffer.getReaderPointer()}`);
-    if (this.delayedDriftCorrection) {
-      sampleDelta = Math.floor(Math.min(samplesPerChannel * 0.02, Math.abs(this.delayedDriftCorrection) * 0.1)) * Math.sign(this.delayedDriftCorrection); // max 1% sample to remove or duplicate, or 10% of drift
-      this.delayedDriftCorrection -= sampleDelta;
-      if (sampleDelta === 0) {
-        this.log(`= finished delayed soft drift correction`);
-        this.delayedDriftCorrection = 0;
-        this.ignoreDriftFor = DRIFT_CORRECTION_MIN_INTERVAL;
-      }
-    } else if (!this.ignoreDriftFor) {
-      const drift = Math.floor(idealBufferPosition - this.buffer.getReaderPointer());
-      const driftDuration = drift / (OPUS_ENCODER_RATE / 1000);
-      if (Math.abs(driftDuration) > this.hardSyncThreshold) {
-        // the drift is too important, this can happens in case the CPU was locked for a while (after suspending the device for example)
-        // this will induce a audible glitch
-        this.buffer.setReaderPointer(idealBufferPosition);
-        this.ignoreDriftFor = DRIFT_CORRECTION_MIN_INTERVAL;
-        this.log(`= hard sync: ${driftDuration}ms`);
-      } else if (Math.abs(driftDuration) > this.softSyncThreshold) {
-        // we should be correcting for the drift but it's small enough that we can do this only by adding
-        // or removing some samples in the output buffer
-        // if drift is > 0, it means the audio device is going too fast
-        // so we need to slow down the rate at which we read from the audio buffer to go back to the correct time
-        sampleDelta = Math.floor(Math.min(samplesPerChannel * 0.02, Math.abs(drift) * 0.1)) * Math.sign(drift); // max 1% sample to remove or duplicate, or 10% of drift
-        this.ignoreDriftFor = DRIFT_CORRECTION_MIN_INTERVAL;
-        this.delayedDriftCorrection = Math.floor((drift - sampleDelta) * 0.4);
-        this.log(`= soft sync: ${driftDuration}ms (${drift} samples), injecting ${sampleDelta} samples now`);
-      }
-    }
-    if (this.ignoreDriftFor) {
-      this.ignoreDriftFor = Math.max(0, this.ignoreDriftFor - samplesPerChannel);
-    }
-    const samplesToReadByChannel = samplesPerChannel + sampleDelta;
-    if (this.returnBuffer.byteLength !== samplesToReadByChannel * this.channels * Float32Array.BYTES_PER_ELEMENT) {
-      this.returnBuffer = Buffer.alloc(samplesToReadByChannel * this.channels * Float32Array.BYTES_PER_ELEMENT);
-      this.typedReturnBuffer = new Float32Array(this.returnBuffer.buffer);
-    }
-    this.buffer.getAtReaderPointerInTypedArray(this.typedReturnBuffer, samplesToReadByChannel * this.channels);
-    const buffer = smartResizeAudioBuffer(
-      this.typedReturnBuffer,
-      samplesPerChannel,
-      this.channels,
-    );
-    return buffer;
-  }
-}
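
The deleted `SynchronizedAudioBuffer` did the same job at read time, inside the worklet: a hard sync jumped the reader pointer whenever the drift exceeded `HARD_SYNC_MIN_AUDIO_DRIFT`, and a soft sync smeared the correction out by duplicating or dropping a bounded share of samples per callback (`samplesPerChannel * 0.02`, i.e. 2%, despite the 'max 1%' comment). The surviving `DriftAwareAudioBufferTransformer` applies the same trick at write time. The core resampling idea can be sketched as follows (a hypothetical helper, not the file's actual `smartResizeAudioBuffer`):

```ts
// Stretch or shrink an interleaved buffer by duplicating or dropping evenly
// spaced frames -- the primitive behind both soft-sync implementations.
const resizeInterleaved = (input: Float32Array, targetFrames: number, channels: number): Float32Array => {
  const inputFrames = input.length / channels;
  const output = new Float32Array(targetFrames * channels);
  for (let frame = 0; frame < targetFrames; frame += 1) {
    // nearest-frame mapping: repeats frames when growing, skips frames when shrinking
    const srcFrame = Math.min(inputFrames - 1, Math.floor((frame * inputFrames) / targetFrames));
    for (let channel = 0; channel < channels; channel += 1) {
      output[frame * channels + channel] = input[srcFrame * channels + channel];
    }
  }
  return output;
};
```
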
diff --git a/src/utils/circularTypedArray.ts b/src/utils/circularTypedArray.ts
index 2a52bdc2..ef99b0f8 100644
--- a/src/utils/circularTypedArray.ts
+++ b/src/utils/circularTypedArray.ts
@@ -53,7 +53,10 @@
   }
 
   set(data: T, offset: number) {
-    const realOffset = offset % this.buffer.length;
+    let realOffset = offset % this.buffer.length;
+    if (realOffset < 0) {
+      realOffset += this.buffer.length;
+    }
     const overflow = Math.max(0, (realOffset + data.length) - this.buffer.length);
     if (!overflow) {
       this.buffer.set(data, realOffset);
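
Finally, the guard added to `CircularTypedArray.set` matters because offsets are now derived from drift-adjusted timestamps and can plausibly be negative, and JavaScript's `%` operator keeps the sign of the dividend:

```ts
// Remainder vs. modulo in JavaScript (illustration, not part of the patch):
const length = 8;
console.log(-3 % length); // -3: unusable as an array index
console.log(((-3 % length) + length) % length); // 5: the non-negative wrap set() now computes
```
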