From dbb54f9291455921201f940eb495ee250af879ab Mon Sep 17 00:00:00 2001 From: whilenot Date: Thu, 5 Dec 2024 13:43:17 +0100 Subject: [PATCH 01/13] add process_num to Process --- tests/supervisors/test_multiprocess.py | 4 ++-- uvicorn/supervisors/multiprocess.py | 13 ++++++++----- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/tests/supervisors/test_multiprocess.py b/tests/supervisors/test_multiprocess.py index e1f594efe..283ddf42c 100644 --- a/tests/supervisors/test_multiprocess.py +++ b/tests/supervisors/test_multiprocess.py @@ -50,13 +50,13 @@ def run(sockets: list[socket.socket] | None) -> None: def test_process_ping_pong() -> None: - process = Process(Config(app=app), target=lambda x: None, sockets=[]) + process = Process(Config(app=app), target=lambda x: None, sockets=[], process_num=0) threading.Thread(target=process.always_pong, daemon=True).start() assert process.ping() def test_process_ping_pong_timeout() -> None: - process = Process(Config(app=app), target=lambda x: None, sockets=[]) + process = Process(Config(app=app), target=lambda x: None, sockets=[], process_num=0) assert not process.ping(0.1) diff --git a/uvicorn/supervisors/multiprocess.py b/uvicorn/supervisors/multiprocess.py index e198fe780..c085d1cce 100644 --- a/uvicorn/supervisors/multiprocess.py +++ b/uvicorn/supervisors/multiprocess.py @@ -28,7 +28,9 @@ def __init__( config: Config, target: Callable[[list[socket] | None], None], sockets: list[socket], + process_num: int, ) -> None: + self.process_num = process_num self.real_target = target self.parent_conn, self.child_conn = Pipe() @@ -120,8 +122,8 @@ def __init__( signal.signal(sig, lambda sig, frame: self.signal_queue.append(sig)) def init_processes(self) -> None: - for _ in range(self.processes_num): - process = Process(self.config, self.target, self.sockets) + for process_num in range(self.processes_num): + process = Process(self.config, self.target, self.sockets, process_num) process.start() self.processes.append(process) @@ -137,7 +139,7 @@ def restart_all(self) -> None: for idx, process in enumerate(self.processes): process.terminate() process.join() - new_process = Process(self.config, self.target, self.sockets) + new_process = Process(self.config, self.target, self.sockets, process.process_num) new_process.start() self.processes[idx] = new_process @@ -174,7 +176,7 @@ def keep_subprocess_alive(self) -> None: return # pragma: full coverage logger.info(f"Child process [{process.pid}] died") - process = Process(self.config, self.target, self.sockets) + process = Process(self.config, self.target, self.sockets, process.process_num) process.start() self.processes[idx] = process @@ -206,8 +208,9 @@ def handle_hup(self) -> None: # pragma: py-win32 def handle_ttin(self) -> None: # pragma: py-win32 logger.info("Received SIGTTIN, increasing the number of processes.") + processes_num = self.processes_num self.processes_num += 1 - process = Process(self.config, self.target, self.sockets) + process = Process(self.config, self.target, self.sockets, processes_num) process.start() self.processes.append(process) From 06f774340721a325c80746c60436a9d7cd053470 Mon Sep 17 00:00:00 2001 From: whilenot Date: Thu, 5 Dec 2024 13:45:52 +0100 Subject: [PATCH 02/13] pass process_num to app state --- tests/supervisors/test_multiprocess.py | 4 ++-- uvicorn/lifespan/on.py | 4 ++++ uvicorn/server.py | 18 +++++++++++------- uvicorn/supervisors/multiprocess.py | 6 +++--- 4 files changed, 20 insertions(+), 12 deletions(-) diff --git a/tests/supervisors/test_multiprocess.py b/tests/supervisors/test_multiprocess.py index 283ddf42c..f51c16f69 100644 --- a/tests/supervisors/test_multiprocess.py +++ b/tests/supervisors/test_multiprocess.py @@ -50,13 +50,13 @@ def run(sockets: list[socket.socket] | None) -> None: def test_process_ping_pong() -> None: - process = Process(Config(app=app), target=lambda x: None, sockets=[], process_num=0) + process = Process(Config(app=app), target=lambda x, y: None, sockets=[], process_num=0) threading.Thread(target=process.always_pong, daemon=True).start() assert process.ping() def test_process_ping_pong_timeout() -> None: - process = Process(Config(app=app), target=lambda x: None, sockets=[], process_num=0) + process = Process(Config(app=app), target=lambda x, y: None, sockets=[], process_num=0) assert not process.ping(0.1) diff --git a/uvicorn/lifespan/on.py b/uvicorn/lifespan/on.py index 09df984ea..21327c88c 100644 --- a/uvicorn/lifespan/on.py +++ b/uvicorn/lifespan/on.py @@ -78,6 +78,10 @@ async def shutdown(self) -> None: async def main(self) -> None: try: app = self.config.loaded_app + + # inject worker id into app state + app.app.state.uvicorn_worker_id = self.state['uvicorn_worker_id'] + scope: LifespanScope = { "type": "lifespan", "asgi": {"version": self.config.asgi_version, "spec_version": "2.0"}, diff --git a/uvicorn/server.py b/uvicorn/server.py index f14026f16..8253d9aa9 100644 --- a/uvicorn/server.py +++ b/uvicorn/server.py @@ -60,15 +60,15 @@ def __init__(self, config: Config) -> None: self._captured_signals: list[int] = [] - def run(self, sockets: list[socket.socket] | None = None) -> None: + def run(self, sockets: list[socket.socket] | None = None, process_num: int = 0) -> None: self.config.setup_event_loop() - return asyncio.run(self.serve(sockets=sockets)) + return asyncio.run(self.serve(sockets=sockets, process_num=process_num)) - async def serve(self, sockets: list[socket.socket] | None = None) -> None: + async def serve(self, sockets: list[socket.socket] | None = None, process_num: int = 0) -> None: with self.capture_signals(): - await self._serve(sockets) + await self._serve(sockets, process_num) - async def _serve(self, sockets: list[socket.socket] | None = None) -> None: + async def _serve(self, sockets: list[socket.socket] | None = None, process_num: int = 0) -> None: process_id = os.getpid() config = self.config @@ -81,7 +81,7 @@ async def _serve(self, sockets: list[socket.socket] | None = None) -> None: color_message = "Started server process [" + click.style("%d", fg="cyan") + "]" logger.info(message, process_id, extra={"color_message": color_message}) - await self.startup(sockets=sockets) + await self.startup(sockets=sockets, process_num=process_num) if self.should_exit: return await self.main_loop() @@ -91,7 +91,11 @@ async def _serve(self, sockets: list[socket.socket] | None = None) -> None: color_message = "Finished server process [" + click.style("%d", fg="cyan") + "]" logger.info(message, process_id, extra={"color_message": color_message}) - async def startup(self, sockets: list[socket.socket] | None = None) -> None: + async def startup(self, sockets: list[socket.socket] | None = None, process_num: int = 0) -> None: + # inject process_num as worker id into lifespan state + worker_id = process_num + 1 + self.lifespan.state['uvicorn_worker_id'] = worker_id + await self.lifespan.startup() if self.lifespan.should_exit: self.should_exit = True diff --git a/uvicorn/supervisors/multiprocess.py b/uvicorn/supervisors/multiprocess.py index c085d1cce..1c4af9cca 100644 --- a/uvicorn/supervisors/multiprocess.py +++ b/uvicorn/supervisors/multiprocess.py @@ -26,7 +26,7 @@ class Process: def __init__( self, config: Config, - target: Callable[[list[socket] | None], None], + target: Callable[[list[socket] | None, int], None], sockets: list[socket], process_num: int, ) -> None: @@ -62,7 +62,7 @@ def target(self, sockets: list[socket] | None = None) -> Any: # pragma: no cove ) threading.Thread(target=self.always_pong, daemon=True).start() - return self.real_target(sockets) + return self.real_target(sockets, self.process_num) def is_alive(self, timeout: float = 5) -> bool: if not self.process.is_alive(): @@ -105,7 +105,7 @@ class Multiprocess: def __init__( self, config: Config, - target: Callable[[list[socket] | None], None], + target: Callable[[list[socket] | None, int], None], sockets: list[socket], ) -> None: self.config = config From e3018052b481c0c047bc1d43bb12d43819d73ee4 Mon Sep 17 00:00:00 2001 From: whilenot Date: Thu, 5 Dec 2024 14:46:08 +0100 Subject: [PATCH 03/13] remove unnecessary state "processes_num" --- uvicorn/supervisors/multiprocess.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/uvicorn/supervisors/multiprocess.py b/uvicorn/supervisors/multiprocess.py index 1c4af9cca..fc5f85c3f 100644 --- a/uvicorn/supervisors/multiprocess.py +++ b/uvicorn/supervisors/multiprocess.py @@ -112,7 +112,6 @@ def __init__( self.target = target self.sockets = sockets - self.processes_num = config.workers self.processes: list[Process] = [] self.should_exit = threading.Event() @@ -121,8 +120,12 @@ def __init__( for sig in SIGNALS: signal.signal(sig, lambda sig, frame: self.signal_queue.append(sig)) + @property + def processes_num(self) -> int: + return len(self.processes) + def init_processes(self) -> None: - for process_num in range(self.processes_num): + for process_num in range(self.config.workers): process = Process(self.config, self.target, self.sockets, process_num) process.start() self.processes.append(process) @@ -208,9 +211,8 @@ def handle_hup(self) -> None: # pragma: py-win32 def handle_ttin(self) -> None: # pragma: py-win32 logger.info("Received SIGTTIN, increasing the number of processes.") - processes_num = self.processes_num - self.processes_num += 1 - process = Process(self.config, self.target, self.sockets, processes_num) + process_num = self.processes_num + process = Process(self.config, self.target, self.sockets, process_num) process.start() self.processes.append(process) @@ -219,7 +221,6 @@ def handle_ttou(self) -> None: # pragma: py-win32 if self.processes_num <= 1: logger.info("Already reached one process, cannot decrease the number of processes anymore.") return - self.processes_num -= 1 process = self.processes.pop() process.terminate() process.join() From c2b45bccae650587e1b2cf70ce05c21ae7112936 Mon Sep 17 00:00:00 2001 From: whilenot Date: Thu, 5 Dec 2024 15:09:54 +0100 Subject: [PATCH 04/13] docs about new worker id --- docs/deployment.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/deployment.md b/docs/deployment.md index d69fcf88e..47ce6c988 100644 --- a/docs/deployment.md +++ b/docs/deployment.md @@ -198,6 +198,8 @@ You can also manage child processes by sending specific signals to the main proc - `SIGTTIN`: Increase the number of worker processes by one. - `SIGTTOU`: Decrease the number of worker processes by one. +Additionally, if the built-in process manager is used uvicorn will provide you with an unique worker ID for each worker. This worker ID will be injected into the [state](https://asgi.readthedocs.io/en/latest/specs/lifespan.html#lifespan-state) of your application as the entry `'uvicorn_worker_id': int`. This ID is consistent across restarts and enables you to define idempotent startup- and shutdown-routines for each worker process. + ### Gunicorn !!! warning From 396f846b40bf29693185c706249282814dd804f2 Mon Sep 17 00:00:00 2001 From: whilenot Date: Thu, 5 Dec 2024 16:15:37 +0100 Subject: [PATCH 05/13] make linter happy --- uvicorn/lifespan/on.py | 2 +- uvicorn/server.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/uvicorn/lifespan/on.py b/uvicorn/lifespan/on.py index 21327c88c..90c9ec35d 100644 --- a/uvicorn/lifespan/on.py +++ b/uvicorn/lifespan/on.py @@ -80,7 +80,7 @@ async def main(self) -> None: app = self.config.loaded_app # inject worker id into app state - app.app.state.uvicorn_worker_id = self.state['uvicorn_worker_id'] + app.app.state.uvicorn_worker_id = self.state["uvicorn_worker_id"] scope: LifespanScope = { "type": "lifespan", diff --git a/uvicorn/server.py b/uvicorn/server.py index 8253d9aa9..d2576de90 100644 --- a/uvicorn/server.py +++ b/uvicorn/server.py @@ -94,7 +94,7 @@ async def _serve(self, sockets: list[socket.socket] | None = None, process_num: async def startup(self, sockets: list[socket.socket] | None = None, process_num: int = 0) -> None: # inject process_num as worker id into lifespan state worker_id = process_num + 1 - self.lifespan.state['uvicorn_worker_id'] = worker_id + self.lifespan.state["uvicorn_worker_id"] = worker_id await self.lifespan.startup() if self.lifespan.should_exit: From fe854b4ede2397317c0c69bf870c18b5846f7465 Mon Sep 17 00:00:00 2001 From: whilenot Date: Thu, 5 Dec 2024 16:18:58 +0100 Subject: [PATCH 06/13] adapt stub run function for tests --- tests/supervisors/test_multiprocess.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/supervisors/test_multiprocess.py b/tests/supervisors/test_multiprocess.py index f51c16f69..aff274f41 100644 --- a/tests/supervisors/test_multiprocess.py +++ b/tests/supervisors/test_multiprocess.py @@ -44,7 +44,7 @@ async def app(scope: Scope, receive: ASGIReceiveCallable, send: ASGISendCallable pass # pragma: no cover -def run(sockets: list[socket.socket] | None) -> None: +def run(sockets: list[socket.socket] | None, process_num: int) -> None: while True: # pragma: no cover time.sleep(1) From 976df967b414997d21dbd9658c0d467058364be7 Mon Sep 17 00:00:00 2001 From: whilenot Date: Thu, 5 Dec 2024 16:27:38 +0100 Subject: [PATCH 07/13] treat uvicorn_worker_id as optional in lifespan state --- uvicorn/lifespan/on.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/uvicorn/lifespan/on.py b/uvicorn/lifespan/on.py index 90c9ec35d..dba8f86f5 100644 --- a/uvicorn/lifespan/on.py +++ b/uvicorn/lifespan/on.py @@ -3,7 +3,7 @@ import asyncio import logging from asyncio import Queue -from typing import Any, Union +from typing import cast, Any, Optional, Union from uvicorn import Config from uvicorn._types import ( @@ -80,7 +80,9 @@ async def main(self) -> None: app = self.config.loaded_app # inject worker id into app state - app.app.state.uvicorn_worker_id = self.state["uvicorn_worker_id"] + uvicorn_worker_id = cast(Optional[int], self.state.get("uvicorn_worker_id")) + if uvicorn_worker_id is not None: + app.app.state.uvicorn_worker_id = uvicorn_worker_id scope: LifespanScope = { "type": "lifespan", From fadc0d8c100f103cd1bfed1dec72eb1528be32fb Mon Sep 17 00:00:00 2001 From: whilenot Date: Thu, 5 Dec 2024 16:35:53 +0100 Subject: [PATCH 08/13] fix import order --- uvicorn/lifespan/on.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/uvicorn/lifespan/on.py b/uvicorn/lifespan/on.py index dba8f86f5..7a0fe3aad 100644 --- a/uvicorn/lifespan/on.py +++ b/uvicorn/lifespan/on.py @@ -3,7 +3,7 @@ import asyncio import logging from asyncio import Queue -from typing import cast, Any, Optional, Union +from typing import Any, Optional, Union, cast from uvicorn import Config from uvicorn._types import ( From decf3b81548b3ffb90617cf413b45bb554adba47 Mon Sep 17 00:00:00 2001 From: whilenot Date: Thu, 5 Dec 2024 16:48:20 +0100 Subject: [PATCH 09/13] pass test pipeline --- tests/protocols/test_websocket.py | 4 ++-- uvicorn/lifespan/on.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/protocols/test_websocket.py b/tests/protocols/test_websocket.py index 15ccfdd7d..b0d3b38c8 100644 --- a/tests/protocols/test_websocket.py +++ b/tests/protocols/test_websocket.py @@ -1147,8 +1147,8 @@ async def open_connection(url: str): async def test_lifespan_state(ws_protocol_cls: WSProtocol, http_protocol_cls: HTTPProtocol, unused_tcp_port: int): expected_states: list[dict[str, typing.Any]] = [ - {"a": 123, "b": [1]}, - {"a": 123, "b": [1, 2]}, + {"a": 123, "b": [1], "uvicorn_worker_id": 1}, + {"a": 123, "b": [1, 2], "uvicorn_worker_id": 1}, ] actual_states: list[dict[str, typing.Any]] = [] diff --git a/uvicorn/lifespan/on.py b/uvicorn/lifespan/on.py index 7a0fe3aad..bd75fbec0 100644 --- a/uvicorn/lifespan/on.py +++ b/uvicorn/lifespan/on.py @@ -81,7 +81,7 @@ async def main(self) -> None: # inject worker id into app state uvicorn_worker_id = cast(Optional[int], self.state.get("uvicorn_worker_id")) - if uvicorn_worker_id is not None: + if uvicorn_worker_id is not None and not callable(app.app) and "state" in app.app: app.app.state.uvicorn_worker_id = uvicorn_worker_id scope: LifespanScope = { From f5fd21ab2503133b04bf67ce4ce4fa37b7b33be3 Mon Sep 17 00:00:00 2001 From: whilenot Date: Thu, 5 Dec 2024 16:54:42 +0100 Subject: [PATCH 10/13] improve condition for uvicorn_worker_id injection into app state --- uvicorn/lifespan/on.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/uvicorn/lifespan/on.py b/uvicorn/lifespan/on.py index bd75fbec0..65f24d6b6 100644 --- a/uvicorn/lifespan/on.py +++ b/uvicorn/lifespan/on.py @@ -81,7 +81,7 @@ async def main(self) -> None: # inject worker id into app state uvicorn_worker_id = cast(Optional[int], self.state.get("uvicorn_worker_id")) - if uvicorn_worker_id is not None and not callable(app.app) and "state" in app.app: + if uvicorn_worker_id is not None and hasattr(app.app, "__iter__") and "state" in app.app: app.app.state.uvicorn_worker_id = uvicorn_worker_id scope: LifespanScope = { From 9b491729f37b69be95e4a522a3e7c80e31b4603e Mon Sep 17 00:00:00 2001 From: whilenot Date: Thu, 5 Dec 2024 17:08:50 +0100 Subject: [PATCH 11/13] use more modern version for types --- uvicorn/lifespan/on.py | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/uvicorn/lifespan/on.py b/uvicorn/lifespan/on.py index 65f24d6b6..1badbc61d 100644 --- a/uvicorn/lifespan/on.py +++ b/uvicorn/lifespan/on.py @@ -3,7 +3,7 @@ import asyncio import logging from asyncio import Queue -from typing import Any, Optional, Union, cast +from typing import Any from uvicorn import Config from uvicorn._types import ( @@ -16,13 +16,16 @@ LifespanStartupFailedEvent, ) -LifespanReceiveMessage = Union[LifespanStartupEvent, LifespanShutdownEvent] -LifespanSendMessage = Union[ - LifespanStartupFailedEvent, - LifespanShutdownFailedEvent, - LifespanStartupCompleteEvent, - LifespanShutdownCompleteEvent, -] +LifespanReceiveMessage = ( + LifespanStartupEvent + | LifespanShutdownEvent +) +LifespanSendMessage = ( + LifespanStartupFailedEvent + | LifespanShutdownFailedEvent + | LifespanStartupCompleteEvent + | LifespanShutdownCompleteEvent +) STATE_TRANSITION_ERROR = "Got invalid state transition on lifespan protocol." @@ -80,7 +83,7 @@ async def main(self) -> None: app = self.config.loaded_app # inject worker id into app state - uvicorn_worker_id = cast(Optional[int], self.state.get("uvicorn_worker_id")) + uvicorn_worker_id: int | None = self.state.get("uvicorn_worker_id") if uvicorn_worker_id is not None and hasattr(app.app, "__iter__") and "state" in app.app: app.app.state.uvicorn_worker_id = uvicorn_worker_id From 339d6445aea1113a5d6ef0ef4fbcf3c3bc1c9918 Mon Sep 17 00:00:00 2001 From: whilenot Date: Thu, 5 Dec 2024 17:32:50 +0100 Subject: [PATCH 12/13] Revert "use more modern version for types" This reverts commit 9b491729f37b69be95e4a522a3e7c80e31b4603e. --- uvicorn/lifespan/on.py | 21 +++++++++------------ 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/uvicorn/lifespan/on.py b/uvicorn/lifespan/on.py index 1badbc61d..65f24d6b6 100644 --- a/uvicorn/lifespan/on.py +++ b/uvicorn/lifespan/on.py @@ -3,7 +3,7 @@ import asyncio import logging from asyncio import Queue -from typing import Any +from typing import Any, Optional, Union, cast from uvicorn import Config from uvicorn._types import ( @@ -16,16 +16,13 @@ LifespanStartupFailedEvent, ) -LifespanReceiveMessage = ( - LifespanStartupEvent - | LifespanShutdownEvent -) -LifespanSendMessage = ( - LifespanStartupFailedEvent - | LifespanShutdownFailedEvent - | LifespanStartupCompleteEvent - | LifespanShutdownCompleteEvent -) +LifespanReceiveMessage = Union[LifespanStartupEvent, LifespanShutdownEvent] +LifespanSendMessage = Union[ + LifespanStartupFailedEvent, + LifespanShutdownFailedEvent, + LifespanStartupCompleteEvent, + LifespanShutdownCompleteEvent, +] STATE_TRANSITION_ERROR = "Got invalid state transition on lifespan protocol." @@ -83,7 +80,7 @@ async def main(self) -> None: app = self.config.loaded_app # inject worker id into app state - uvicorn_worker_id: int | None = self.state.get("uvicorn_worker_id") + uvicorn_worker_id = cast(Optional[int], self.state.get("uvicorn_worker_id")) if uvicorn_worker_id is not None and hasattr(app.app, "__iter__") and "state" in app.app: app.app.state.uvicorn_worker_id = uvicorn_worker_id From 7be3aeaecb64c7d252b07b4940a61124b7322cc8 Mon Sep 17 00:00:00 2001 From: whilenot Date: Thu, 5 Dec 2024 17:35:24 +0100 Subject: [PATCH 13/13] fix condition --- uvicorn/lifespan/on.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/uvicorn/lifespan/on.py b/uvicorn/lifespan/on.py index 65f24d6b6..8b40a8a27 100644 --- a/uvicorn/lifespan/on.py +++ b/uvicorn/lifespan/on.py @@ -81,7 +81,7 @@ async def main(self) -> None: # inject worker id into app state uvicorn_worker_id = cast(Optional[int], self.state.get("uvicorn_worker_id")) - if uvicorn_worker_id is not None and hasattr(app.app, "__iter__") and "state" in app.app: + if uvicorn_worker_id is not None and hasattr(app.app, "state"): app.app.state.uvicorn_worker_id = uvicorn_worker_id scope: LifespanScope = {