From 3ca042fb2b03585efaf3cbb6ce132c09ce56fabe Mon Sep 17 00:00:00 2001 From: Draper <27962761+Drapersniper@users.noreply.github.com> Date: Wed, 30 Mar 2022 11:19:31 +0100 Subject: [PATCH] add port conflict resolution Signed-off-by: Draper <27962761+Drapersniper@users.noreply.github.com> --- redbot/cogs/audio/core/__init__.py | 1 + redbot/cogs/audio/core/abc.py | 2 + redbot/cogs/audio/core/commands/audioset.py | 2 + redbot/cogs/audio/core/tasks/lavalink.py | 58 ++++++++++++++++++--- redbot/cogs/audio/errors.py | 4 ++ redbot/cogs/audio/manager.py | 57 +++++++++++++++++++- 6 files changed, 116 insertions(+), 8 deletions(-) diff --git a/redbot/cogs/audio/core/__init__.py b/redbot/cogs/audio/core/__init__.py index 3d436423f71c..b9c6a8bcbf05 100644 --- a/redbot/cogs/audio/core/__init__.py +++ b/redbot/cogs/audio/core/__init__.py @@ -65,6 +65,7 @@ def __init__(self, bot: Red): self.skip_votes = {} self.play_lock = {} self.antispam: Dict[int, Dict[str, AntiSpam]] = defaultdict(lambda: defaultdict(AntiSpam)) + self._runtime_external_node = False self.lavalink_connect_task = None self._restore_task = None diff --git a/redbot/cogs/audio/core/abc.py b/redbot/cogs/audio/core/abc.py index 0e3d4e357504..c4548213fcea 100644 --- a/redbot/cogs/audio/core/abc.py +++ b/redbot/cogs/audio/core/abc.py @@ -66,6 +66,8 @@ class MixinMeta(ABC): _disconnected_players: MutableMapping[int, bool] global_api_user: MutableMapping[str, Any] + _runtime_external_node: bool + cog_cleaned_up: bool lavalink_connection_aborted: bool diff --git a/redbot/cogs/audio/core/commands/audioset.py b/redbot/cogs/audio/core/commands/audioset.py index a23949019f2d..0dd307bf78a8 100644 --- a/redbot/cogs/audio/core/commands/audioset.py +++ b/redbot/cogs/audio/core/commands/audioset.py @@ -1115,6 +1115,8 @@ async def command_audioset_settings(self, ctx: commands.Context): lavalink_version=lavalink.__version__, use_external_lavalink=_("Enabled") if global_data["use_external_lavalink"] + else _("Enabled (Temporary)") + if self._runtime_external_node else _("Disabled"), ) if ( diff --git a/redbot/cogs/audio/core/tasks/lavalink.py b/redbot/cogs/audio/core/tasks/lavalink.py index d3c961030825..04115741a345 100644 --- a/redbot/cogs/audio/core/tasks/lavalink.py +++ b/redbot/cogs/audio/core/tasks/lavalink.py @@ -1,7 +1,8 @@ import asyncio -from pathlib import Path +import pathlib import lavalink +import yaml from red_commons.logging import getLogger from redbot.core import data_manager @@ -11,7 +12,7 @@ from ..cog_utils import CompositeMetaClass log = getLogger("red.cogs.Audio.cog.Tasks.lavalink") -_ = Translator("Audio", Path(__file__)) +_ = Translator("Audio", pathlib.Path(__file__)) class LavalinkTasks(MixinMeta, metaclass=CompositeMetaClass): @@ -41,9 +42,8 @@ async def lavalink_attempt_connect(self, timeout: int = 50, manual: bool = False if self._restore_task: self._restore_task.cancel() if self.managed_node_controller is not None: - if not self.managed_node_controller._shutdown: - await self.managed_node_controller.shutdown() - await asyncio.sleep(5) + await self.managed_node_controller.shutdown() + await asyncio.sleep(5) await lavalink.close(self.bot) while retry_count < max_retries: configs = await self.config.all() @@ -65,6 +65,44 @@ async def lavalink_attempt_connect(self, timeout: int = 50, manual: bool = False except asyncio.TimeoutError: if self.managed_node_controller is not None: await self.managed_node_controller.shutdown() + if self._runtime_external_node is True: + log.warning("Attempting to connect to existing Lavalink Node.") + self.lavalink_connection_aborted = False + matching_processes = ( + await self.managed_node_controller.get_lavalink_process( + lazy_match=True + ) + ) + log.debug( + "Found %s processes with lavalink in the cmdline.", + len(matching_processes), + ) + valid_working_dirs = [ + cwd + for d in matching_processes + if d.get("name") == "java" and (cwd := d.get("cwd")) + ] + log.debug( + "Found %s java processed with a cwd set.", len(valid_working_dirs) + ) + for cwd in valid_working_dirs: + config = pathlib.Path(cwd) / "application.yml" + if config.exists() and config.is_file(): + log.debug( + "The following config file exists for an unmanaged Lavalink node %s", + config, + ) + try: + with config.open(mode="r") as config_data: + data = yaml.safe_load(config_data) + host = data["server"]["address"] + port = data["server"]["port"] + password = data["lavalink"]["server"]["password"] + break + except Exception: + log.verbose("Failed to read contents of %s", config) + continue + break if self.lavalink_connection_aborted is not True: log.critical( "Managed node startup timeout, aborting managed node startup." @@ -117,9 +155,15 @@ async def lavalink_attempt_connect(self, timeout: int = 50, manual: bool = False return except asyncio.TimeoutError: await lavalink.close(self.bot) - log.warning("Connecting to Lavalink node timed out, retrying...") retry_count += 1 - await asyncio.sleep(1) # prevent busylooping + if self._runtime_external_node is True: + log.warning( + "Attempt to connect to existing Lavalink node failed, aborting future reconnects." + ) + self.lavalink_connection_aborted = True + return + log.warning("Connecting to Lavalink node timed out, retrying...") + await asyncio.sleep(1) except Exception as exc: log.exception( "Unhandled exception whilst connecting to Lavalink node, aborting...", diff --git a/redbot/cogs/audio/errors.py b/redbot/cogs/audio/errors.py index 05d3d49c9a71..c0a9303af5e0 100644 --- a/redbot/cogs/audio/errors.py +++ b/redbot/cogs/audio/errors.py @@ -31,6 +31,10 @@ class ManagedLavalinkStartFailure(ManagedLavalinkNodeException): """Exception thrown when a managed Lavalink node fails to start""" +class PortAlreadyInUse(ManagedLavalinkStartFailure): + """Exception thrown when a managed Lavalink node fails to start due to a port conflict""" + + class ManagedLavalinkPreviouslyShutdownException(ManagedLavalinkNodeException): """Exception thrown when a managed Lavalink node already has been shutdown""" diff --git a/redbot/cogs/audio/manager.py b/redbot/cogs/audio/manager.py index 1e767c3fb599..1f40d40cda13 100644 --- a/redbot/cogs/audio/manager.py +++ b/redbot/cogs/audio/manager.py @@ -36,6 +36,7 @@ IncorrectProcessFound, NoProcessFound, NodeUnhealthy, + PortAlreadyInUse, ) from .utils import ( change_dict_naming_convention, @@ -127,6 +128,7 @@ class ServerManager: def __init__(self, config: Config, cog: "Audio", timeout: Optional[int] = None) -> None: self.ready: asyncio.Event = asyncio.Event() + self.abort_for_unmanaged: asyncio.Event = asyncio.Event() self._config = config self._proc: Optional[asyncio.subprocess.Process] = None # pylint:disable=no-member self._node_pid: Optional[int] = None @@ -135,6 +137,7 @@ def __init__(self, config: Config, cog: "Audio", timeout: Optional[int] = None) self.timeout = timeout self.cog = cog self._args = [] + self._current_config = {} @property def path(self) -> Optional[str]: @@ -218,6 +221,7 @@ async def _start(self, java_path: str) -> None: async def process_settings(self): data = change_dict_naming_convention(await self._config.yaml.all()) + self._current_config = data with open(LAVALINK_APP_YML, "w") as f: yaml.safe_dump(data, f) @@ -320,6 +324,14 @@ async def _wait_for_launcher(self) -> None: log.info("Managed Lavalink node is ready to receive requests.") break if _FAILED_TO_START.search(line): + if ( + f"Port {self._current_config['server']['port']} was already in use".encode() + in line + ): + raise PortAlreadyInUse( + f"Port {self._current_config['server']['port']} already in use. " + f"Managed Lavalink startup aborted." + ) raise ManagedLavalinkStartFailure( f"Lavalink failed to start: {line.decode().strip()}" ) @@ -333,6 +345,7 @@ async def _wait_for_launcher(self) -> None: async def shutdown(self) -> None: if self.start_monitor_task is not None: self.start_monitor_task.cancel() + self.abort_for_unmanaged.clear() await self._partial_shutdown() async def _partial_shutdown(self) -> None: @@ -441,8 +454,44 @@ async def maybe_download_jar(self): if not (LAVALINK_JAR_FILE.exists() and await self._is_up_to_date()): await self._download_jar() + @staticmethod + async def get_lavalink_process( + *matches: str, cwd: Optional[str] = None, lazy_match: bool = False + ): + process_list = [] + filter = [cwd] if cwd else [] + async for proc in AsyncIter(psutil.process_iter()): + try: + if cwd: + if not (proc.cwd() in filter): + continue + cmdline = proc.cmdline() + if (matches and all(a in cmdline for a in matches)) or ( + lazy_match and any("lavalink" in arg.lower() for arg in cmdline) + ): + proc_as_dict = proc.as_dict( + attrs=["pid", "name", "create_time", "status", "cmdline", "cwd"] + ) + process_list.append(proc_as_dict) + except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess): + pass + return process_list + async def wait_until_ready(self, timeout: Optional[float] = None): - await asyncio.wait_for(self.ready.wait(), timeout=timeout or self.timeout) + tasks = [ + asyncio.create_task(c) for c in [self.ready.wait(), self.abort_for_unmanaged.wait()] + ] + done, pending = await asyncio.wait( + tasks, timeout=timeout or self.timeout, return_when=asyncio.FIRST_COMPLETED + ) + for task in pending: + task.cancel() + if done: + done.pop().result() + if self.abort_for_unmanaged.is_set(): + raise asyncio.TimeoutError + if not self.ready.is_set(): + raise asyncio.TimeoutError async def start_monitor(self, java_path: str): retry_count = 0 @@ -528,6 +577,12 @@ async def start_monitor(self, java_path: str): log.critical(exc) self.cog.lavalink_connection_aborted = True return await self.shutdown() + except PortAlreadyInUse as exc: + log.critical(exc) + self.cog.lavalink_connection_aborted = False + self.cog._runtime_external_node = True + self.abort_for_unmanaged.set() + return await self.shutdown() except ManagedLavalinkNodeException as exc: delay = backoff.delay() log.critical(