Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix healthcheck and streaming with whisper #59

Open
wants to merge 16 commits into
base: next
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion healthcheck.sh
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ else
# GPU is being utilized, assuming healthy
exit 0
else
celery --app=celery_app.celeryapp inspect ping -d ${SERVICE_NAME}_worker@$HOSTNAME || exit 1
if [ "$SERVICE_MODE" = "websocket" ]
then
python3 websocket/healthcheck.py ${STREAMING_PORT:=80} || exit 1
AudranBert marked this conversation as resolved.
Show resolved Hide resolved
exit 0
AudranBert marked this conversation as resolved.
Show resolved Hide resolved
else
celery --app=celery_app.celeryapp inspect ping -d ${SERVICE_NAME}_worker@$HOSTNAME || exit 1
fi
fi
fi
1 change: 1 addition & 0 deletions kaldi/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ RUN apt-get update && \
g++ \
make \
cmake \
curl \
git \
zlib1g-dev \
automake \
Expand Down
173 changes: 106 additions & 67 deletions test/test_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,55 @@
import websockets
import json
import os
import time
import shutil
import subprocess
import logging


logging.basicConfig(level=logging.INFO, filename="linstt_streaming.log", filemode="w")
logger = logging.getLogger(__name__)

async def send_data(websocket, stream, logger, stream_config):
    """Asynchronously load and send data to the WebSocket.

    Reads fixed-duration chunks from ``stream`` and forwards each one to
    ``websocket``, sleeping ``stream_config['stream_wait']`` seconds after
    every chunk. Once the loop ends (end of file, cancellation or error),
    waits and then sends the ``{"eof" : 1}`` message.

    Args:
        websocket: Object exposing an awaitable ``send(message)``.
        stream: Object exposing ``read(size)``; an empty read ends the loop
            when ``stream_config['audio_file']`` is truthy.
        logger: Logger used for debug/error messages.
        stream_config: Dict with the keys ``stream_duration`` (seconds per
            chunk), ``stream_wait`` (seconds slept after each chunk),
            ``audio_file`` and ``vad``. Optional keys: ``sample_rate``
            (defaults to 16000) and ``eof_wait`` (seconds to wait before
            sending EOF, defaults to 5).
    """
    duration = 0
    try:
        chunk_duration = stream_config['stream_duration']
        # The chunk size follows the configured sample rate instead of a
        # hard-coded 16000; the factor 2 is the sample width in bytes
        # (matches sample_width=2 passed to the VAD below).
        sample_rate = stream_config.get('sample_rate', 16000)
        chunk_size = int(chunk_duration * 2 * sample_rate)
        while True:
            data = stream.read(chunk_size)
            duration += chunk_duration
            if stream_config['audio_file'] and not data:
                logger.debug("Audio file finished")
                break

            if stream_config['vad']:
                # Imported lazily so auditok is only required when VAD is on.
                import auditok
                audio_events = list(
                    auditok.split(
                        data,
                        min_dur=0.2,
                        max_silence=0.3,
                        energy_threshold=65,
                        sampling_rate=sample_rate,
                        sample_width=2,
                        channels=1,
                    )
                )
                if not audio_events:
                    # Nothing but silence in this chunk: skip sending it.
                    logger.debug(f"Full silence for chunk: {duration - chunk_duration:.1f}s --> {duration:.1f}s")
                    await asyncio.sleep(stream_config['stream_wait'])
                    continue

            await websocket.send(data)
            logger.debug(f"Sent audio chunk: {duration - chunk_duration:.1f}s --> {duration:.1f}s")
            await asyncio.sleep(stream_config['stream_wait'])

    except asyncio.CancelledError:
        # NOTE: cancellation is not re-raised here, so the EOF message below
        # is still sent after a cancel.
        logger.debug("Data sending task cancelled.")
    except Exception as e:
        logger.error(f"Error in data sending: {e}")
    logger.debug("Waiting before sending EOF")
    await asyncio.sleep(stream_config.get('eof_wait', 5))
    logger.debug("Sending EOF")
    await websocket.send('{"eof" : 1}')

def linstt_streaming(*kargs, **kwargs):
text = asyncio.run(_linstt_streaming(*kargs, **kwargs))
Expand All @@ -14,91 +60,83 @@ async def _linstt_streaming(
audio_file,
ws_api = "ws://localhost:8080/streaming",
verbose = False,
language = None
language = None,
apply_vad = False
):

if verbose:
logger.setLevel(logging.DEBUG)
stream_config = {"language": language, "sample_rate": 16000, "vad": apply_vad, "stream_duration": 0.5, "stream_wait": 0.5}
if audio_file is None:
import pyaudio
# Init pyaudio
audio = pyaudio.PyAudio()
stream = audio.open(format=pyaudio.paInt16, channels=1, rate=16000, input=True, frames_per_buffer=2048)
if verbose > 1:
print("Start recording")
stream = audio.open(format=pyaudio.paInt16, channels=1, rate=stream_config['sample_rate'], input=True, frames_per_buffer=2048)
logger.debug("Start recording")
stream_config["audio_file"] = None
else:
subprocess.run(["ffmpeg", "-y", "-i", audio_file, "-acodec", "pcm_s16le", "-ar", "16000", "-ac", "1", "tmp.wav"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
subprocess.run(["ffmpeg", "-y", "-i", audio_file, "-acodec", "pcm_s16le", "-ar", str(stream_config['sample_rate']), "-ac", "1", "tmp.wav"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stream = open("tmp.wav", "rb")
logger.debug(f"Start streaming file {audio_file}")
stream_config["audio_file"] = audio_file
text = ""
partial = None
async with websockets.connect(ws_api) as websocket:
duration = 0
async with websockets.connect(ws_api, ping_interval=None, ping_timeout=None) as websocket:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ping_interval / ping_timeout: is this related to TCP breaks (HTTP/WS keep-alive)? See the comment below about handling only ConnectionClosedOK.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is to prevent the websocket from closing when that is not wanted. When testing, if I did not send any messages for a long time, the websocket closed by itself. With the ping disabled, it no longer closes.

if language is not None:
config = {"config" : {"sample_rate": 16000, "language": language}}
config = {"config" : {"sample_rate": stream_config['sample_rate'], "language": stream_config['language']}}
else:
config = {"config" : {"sample_rate": 16000}}
config = {"config" : {"sample_rate": stream_config['sample_rate']}}
await websocket.send(json.dumps(config))
while True:
data = stream.read(2*2*16000)
if audio_file and not data:
if verbose > 1:
print("\nAudio file finished")
break
await websocket.send(data)
res = await websocket.recv()
message = json.loads(res)
if message is None:
if verbose > 1:
print("\n Received None")
continue
if "partial" in message.keys():
partial = message["partial"]
if partial and verbose:
print_partial(partial)
elif "text" in message.keys():
line = message["text"]
if line and verbose:
print_final(line)
if line:
if text:
text += "\n"
text += line
elif verbose:
print(f"??? {message}")
if verbose > 1:
print("Sending EOF")
await websocket.send('{"eof" : 1}')
res = await websocket.recv()
message = json.loads(res)
if isinstance(message, str):
message = json.loads(message)
if verbose > 1:
print("Received EOF", message)
if text:
text += " "
text += message["text"]
send_task = asyncio.create_task(send_data(websocket, stream, logger, stream_config))
last_text_partial = None
try:
while True:
res = await websocket.recv()
message = json.loads(res)
if message is None:
logger.debug("\n Received None")
continue
if "text" in message.keys():
line = message["text"]
if line and verbose:
print_streaming(line, partial=False, last_partial=last_text_partial)
logger.debug(f'Final (after {duration:.1f}s): "{line}"')
last_text_partial = None
if line:
if text:
text += "\n"
text += line
elif "partial" in message.keys():
partial = message["partial"]
if partial and verbose:
print_streaming(partial, partial=True, last_partial=last_text_partial)
last_text_partial = partial
logger.debug(f'Partial (after {duration:.1f}s): "{partial}"')
elif verbose:
logger.debug(f"??? {message}")
except asyncio.CancelledError:
logger.debug("Message processing thread stopped as websocket was closed.")
except websockets.exceptions.ConnectionClosedOK:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, only ConnectionClosedOK is handled to cancel the websocket. Please review whether other exceptions could occur in this context. For example, a TCP RST might trigger a ConnectionClosed event, similar to how WebSocketDisconnect is handled in FastAPI.

Additionally, a finally block might be missing to handle unexpected exceptions and actively close the context for the current socket.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this script is only meant for testing the streaming locally, I'm not sure we can encounter other errors. I will add a `finally` to close the websocket and check whether anything else can be triggered.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok.
Please check the actual implementation in the real websocket server to ensure corner cases are handled well.

logger.debug("Websocket closed")
if verbose:
print_final("= FULL TRANSCRIPTION ", background="=")
terminal_size = shutil.get_terminal_size()
width = terminal_size.columns
print()
print(" FULL TRANSCRIPTION ".center(width, "-"))
print(text)
if audio_file is not None:
os.remove("tmp.wav")
return text

def print_partial(text):
    """Print a partial transcription on the current line, ending with a carriage return.

    An ellipsis is appended to ``text``. When the result fits within the
    terminal width it is printed as-is; otherwise the line is blanked first
    and only the trailing part of the text is printed.
    """
    shown = text + "…"
    width = shutil.get_terminal_size().columns
    last_index = len(shown) - 1
    # Offset of the last full-width "row" the text would occupy.
    start = (last_index // width) * width
    if not start > 0:
        print(shown, end="\r")
        return
    # Blank the line before printing the trailing part.
    print(" " * width, end="\r")
    if start < last_index:
        tail = "…" + shown[start + 1:] + " " * (width - len(shown) - start - 1)
    else:
        tail = shown[-width:]
    print(tail, end="\r")

def print_final(text, background=" "):
def print_streaming(text, partial=True, last_partial=None):
if partial:
text = text + "…"
terminal_size = shutil.get_terminal_size()
width = terminal_size.columns
print(background * width, end="\r")
if last_partial is not None:
number_of_lines = ((len(last_partial)+1)//width)+1
for i in range(number_of_lines):
print("\033[F\033[K", end="")
print(text)

if __name__ == "__main__":
Expand All @@ -110,9 +148,10 @@ def print_final(text, background=" "):
parser.add_argument('--server', help='Transcription server',
default="ws://localhost:8080/streaming",
)
parser.add_argument("-v", "--verbose", action="store_true", help="Verbose mode")
parser.add_argument("-v", "--verbose", default=False, action="store_true", help="Verbose mode")
parser.add_argument("--audio_file", default=None, help="A path to an audio file to transcribe (if not provided, use mic)")
parser.add_argument("--language", default=None, help="Language model to use")
parser.add_argument("--apply_vad", default=False, action="store_true", help="Apply VAD to the audio stream before sending it to the server")
args = parser.parse_args()

res = linstt_streaming(args.audio_file, args.server, verbose=2 if args.verbose else 1, language=args.language)
res = linstt_streaming(args.audio_file, args.server, verbose=args.verbose, language=args.language, apply_vad=args.apply_vad)
17 changes: 17 additions & 0 deletions websocket/healthcheck.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import argparse
import asyncio
import websockets

async def _linstt_streaming(port):
    """Probe the local websocket server and exit with a healthcheck status.

    Opens a websocket connection to ``ws://localhost:<port>`` and closes it
    immediately. Always terminates the process by raising ``SystemExit``:
    status 0 when the connection could be opened, status 1 (with a message
    on stderr) when it could not.

    Args:
        port: TCP port of the websocket server on localhost.

    Raises:
        SystemExit: always; the exit code carries the health status.
    """
    ws_api = f"ws://localhost:{port}"
    try:
        # Pings are disabled: the probe only checks that a connection opens.
        async with websockets.connect(ws_api, ping_interval=None, ping_timeout=None) as websocket:
            await websocket.close()
    except (OSError, asyncio.TimeoutError, websockets.exceptions.WebSocketException) as err:
        # A string argument makes the interpreter print it to stderr and
        # exit with status 1.
        raise SystemExit(f"Healthcheck failed for {ws_api}: {err}") from err
    # SystemExit is raised directly rather than through the site-provided
    # exit() helper, which is not guaranteed to exist.
    raise SystemExit(0)

if __name__ == "__main__":
    # Command-line entry point: probe the websocket server on the given port.
    parser = argparse.ArgumentParser(description="Test the streaming STT")
    # nargs="?" makes the positional optional; without it argparse requires
    # the argument and the default of 80 is never used.
    parser.add_argument("port", type=int, nargs="?", default=80, help="Port to connect to")
    args = parser.parse_args()

    asyncio.run(_linstt_streaming(args.port))
AudranBert marked this conversation as resolved.
Show resolved Hide resolved
2 changes: 1 addition & 1 deletion websocket/websocketserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ async def _fun_wrapper(ws):

async def WSServer(port: int):
"""Launch the websocket server"""
async with websockets.serve(_fun_wrapper, "0.0.0.0", serving_port):
async with websockets.serve(_fun_wrapper, "0.0.0.0", serving_port, ping_interval=None, ping_timeout=None):
await asyncio.Future()


Expand Down
2 changes: 1 addition & 1 deletion whisper/Dockerfile.ctranslate2
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
FROM ghcr.io/opennmt/ctranslate2:latest-ubuntu20.04-cuda12.2
LABEL maintainer="[email protected], [email protected], [email protected]"

RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends ffmpeg git
RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends ffmpeg git curl

# Install python dependencies
COPY whisper/requirements.ctranslate2.txt ./
Expand Down
2 changes: 1 addition & 1 deletion whisper/Dockerfile.ctranslate2.cpu
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
FROM python:3.9
LABEL maintainer="[email protected], [email protected], [email protected]"

RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends ffmpeg git
RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends ffmpeg git curl

# Install python dependencies
COPY whisper/requirements.ctranslate2.txt ./
Expand Down
2 changes: 1 addition & 1 deletion whisper/Dockerfile.torch
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
FROM python:3.9
LABEL maintainer="[email protected], [email protected], [email protected]"

RUN apt-get update && apt-get install -y --no-install-recommends ffmpeg
RUN apt-get update && apt-get install -y --no-install-recommends ffmpeg curl

# Install python dependencies
COPY whisper/requirements.torch.txt ./
Expand Down
2 changes: 1 addition & 1 deletion whisper/Dockerfile.torch.cpu
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
FROM python:3.9
LABEL maintainer="[email protected], [email protected], [email protected]"

RUN apt-get update && apt-get install -y --no-install-recommends ffmpeg
RUN apt-get update && apt-get install -y --no-install-recommends ffmpeg curl

# Force CPU versions of torch
RUN pip3 install \
Expand Down
Loading