diff --git a/nxc/protocols/ssh.py b/nxc/protocols/ssh.py index 633b1a5d2..2b92885bb 100644 --- a/nxc/protocols/ssh.py +++ b/nxc/protocols/ssh.py @@ -52,7 +52,11 @@ def proto_logger(self): ) def print_host_info(self): - self.logger.display(self.remote_version if self.remote_version != "Unknown SSH Version" else f"{self.remote_version}, skipping...") + self.logger.display( + self.remote_version + if self.remote_version != "Unknown SSH Version" + else f"{self.remote_version}, skipping..." + ) return True def enum_host_info(self): @@ -65,7 +69,13 @@ def create_conn_obj(self): self.conn = paramiko.SSHClient() self.conn.set_missing_host_key_policy(paramiko.AutoAddPolicy()) try: - self.conn.connect(self.host, port=self.port, timeout=self.args.ssh_timeout, look_for_keys=False, allow_agent=False) + self.conn.connect( + self.host, + port=self.port, + timeout=self.args.ssh_timeout, + look_for_keys=False, + allow_agent=False, + ) except AuthenticationException: return True except SSHException: @@ -75,7 +85,7 @@ def create_conn_obj(self): except OSError: return False - def check_if_admin(self): + def check_linux_priv(self): self.admin_privs = False if self.args.sudo_check: @@ -91,7 +101,10 @@ def check_if_admin(self): "(root)": [True, None], "NOPASSWD: ALL": [True, None], "(ALL : ALL) ALL": [True, None], - "(sudo)": [False, f"Current user: '{self.username}' was in 'sudo' group, please try '--sudo-check' to check if user can run sudo shell"] + "(sudo)": [ + False, + f"Current user: '{self.username}' was in 'sudo' group, please try '--sudo-check' to check if user can run sudo shell", + ], } for keyword in admin_flag: match = re.findall(re.escape(keyword), stdout) @@ -108,7 +121,9 @@ def check_if_admin(self): def check_if_admin_sudo(self): if not self.password: - self.logger.error("Check admin with sudo does not support using a private key") + self.logger.error( + "Check admin with sudo does not support using a private key" + ) return if self.args.sudo_check_method: @@ -122,14 +137,20 @@ def check_if_admin_sudo(self): if "stdin" in stdout: shadow_backup = f"/tmp/{uuid.uuid4()}" # sudo support stdin password - self.conn.exec_command(f"echo {self.password} | sudo -S cp /etc/shadow {shadow_backup} >/dev/null 2>&1 &") - self.conn.exec_command(f"echo {self.password} | sudo -S chmod 777 {shadow_backup} >/dev/null 2>&1 &") + self.conn.exec_command( + f"echo {self.password} | sudo -S cp /etc/shadow {shadow_backup} >/dev/null 2>&1 &" + ) + self.conn.exec_command( + f"echo {self.password} | sudo -S chmod 777 {shadow_backup} >/dev/null 2>&1 &" + ) tries = 1 while True: self.logger.info(f"Checking {shadow_backup} if it existed") _, _, stderr = self.conn.exec_command(f"ls {shadow_backup}") if tries >= self.args.get_output_tries: - self.logger.info(f"The file {shadow_backup} does not exist, the pipe may be hanging. Increase the number of tries with the option '--get-output-tries' or change other method with '--sudo-check-method'. If it's still failing, maybe sudo shell does not work with the current user") + self.logger.info( + f"The file {shadow_backup} does not exist, the pipe may be hanging. Increase the number of tries with the option '--get-output-tries' or change other method with '--sudo-check-method'. If it's still failing, maybe sudo shell does not work with the current user" + ) break if stderr.read().decode("utf-8"): time.sleep(2) @@ -139,9 +160,13 @@ def check_if_admin_sudo(self): self.admin_privs = True break self.logger.info(f"Remove up temporary files {shadow_backup}") - self.conn.exec_command(f"echo {self.password} | sudo -S rm -rf {shadow_backup}") + self.conn.exec_command( + f"echo {self.password} | sudo -S rm -rf {shadow_backup}" + ) else: - self.logger.error("Command: 'sudo' not support stdin mode, running command with 'sudo' failed") + self.logger.error( + "Command: 'sudo' not support stdin mode, running command with 'sudo' failed" + ) return else: _, stdout, _ = self.conn.exec_command("mkfifo --help") @@ -152,144 +177,180 @@ def check_if_admin_sudo(self): pipe_stdin = f"/tmp/systemd-{uuid.uuid4()}" pipe_stdout = f"/tmp/systemd-{uuid.uuid4()}" shadow_backup = f"/tmp/{uuid.uuid4()}" - self.conn.exec_command(f"mkfifo {pipe_stdin}; tail -f {pipe_stdin} | /bin/sh 2>&1 > {pipe_stdout} >/dev/null 2>&1 &") + self.conn.exec_command( + f"mkfifo {pipe_stdin}; tail -f {pipe_stdin} | /bin/sh 2>&1 > {pipe_stdout} >/dev/null 2>&1 &" + ) # 'script -qc /bin/sh /dev/null' means "upgrade" the shell, like reverse shell from netcat - self.conn.exec_command(f"echo 'script -qc /bin/sh /dev/null' > {pipe_stdin}") - self.conn.exec_command(f"echo 'sudo -s' > {pipe_stdin} && echo '{self.password}' > {pipe_stdin}") + self.conn.exec_command( + f"echo 'script -qc /bin/sh /dev/null' > {pipe_stdin}" + ) + self.conn.exec_command( + f"echo 'sudo -s' > {pipe_stdin} && echo '{self.password}' > {pipe_stdin}" + ) # Sometime the pipe will hanging(only happen with paramiko) # Can't get "whoami" or "id" result in pipe_stdout, maybe something wrong using pipe with paramiko # But one thing I can confirm, is the command was executed even can't get result from pipe_stdout tries = 1 - self.logger.info(f"Copy /etc/shadow to {shadow_backup} if pass the sudo auth") + self.logger.info( + f"Copy /etc/shadow to {shadow_backup} if pass the sudo auth" + ) while True: self.logger.info(f"Checking {shadow_backup} if it existed") _, _, stderr = self.conn.exec_command(f"ls {shadow_backup}") if tries >= self.args.get_output_tries: - self.logger.info(f"The file {shadow_backup} does not exist, the pipe may be hanging. Increase the number of tries with the option '--get-output-tries' or change other method with '--sudo-check-method'. If it's still failing, maybe sudo shell does not work with the current user") + self.logger.info( + f"The file {shadow_backup} does not exist, the pipe may be hanging. Increase the number of tries with the option '--get-output-tries' or change other method with '--sudo-check-method'. If it's still failing, maybe sudo shell does not work with the current user" + ) break if stderr.read().decode("utf-8"): time.sleep(2) - self.conn.exec_command(f"echo 'cp /etc/shadow {shadow_backup} && chmod 777 {shadow_backup}' > {pipe_stdin}") + self.conn.exec_command( + f"echo 'cp /etc/shadow {shadow_backup} && chmod 777 {shadow_backup}' > {pipe_stdin}" + ) tries += 1 else: self.logger.info(f"{shadow_backup} existed") self.admin_privs = True break - self.logger.info(f"Remove up temporary files {shadow_backup} {pipe_stdin} {pipe_stdout}") - self.conn.exec_command(f"echo 'rm -rf {shadow_backup}' > {pipe_stdin} && rm -rf {pipe_stdin} {pipe_stdout}") + self.logger.info( + f"Remove up temporary files {shadow_backup} {pipe_stdin} {pipe_stdout}" + ) + self.conn.exec_command( + f"echo 'rm -rf {shadow_backup}' > {pipe_stdin} && rm -rf {pipe_stdin} {pipe_stdout}" + ) else: - self.logger.error("Command: 'mkfifo' unavailable, running command with 'sudo' failed") + self.logger.error( + "Command: 'mkfifo' unavailable, running command with 'sudo' failed" + ) return def plaintext_login(self, username, password, private_key=None): self.username = username self.password = password - private_key = "" - stdout = None try: + key_content = None if self.args.key_file or private_key: - self.logger.debug("Logging in with key") - - if self.args.key_file: - with open(self.args.key_file) as f: - private_key = f.read() + key_content = self.read_key_file(private_key or self.args.key_file) + self.connect_with_key(username, password, key_content) + cred_type = "key" + else: + self.connect_with_password(username, password) + cred_type = "plaintext" - pkey = paramiko.RSAKey.from_private_key(StringIO(private_key)) - self.conn.connect( - self.host, - port=self.port, - username=username, - passphrase=password if password != "" else None, - pkey=pkey, - look_for_keys=False, - allow_agent=False, + cred_id = self.db.add_credential( + cred_type, username, password, key=key_content + ) + shell_access = self.test_shell_access() + host_id = self.db.get_hosts(self.host)[0].id + if self.admin_privs: + self.logger.debug( + f"User {username} logged in successfully and is root!" ) - - cred_id = self.db.add_credential( - "key", - username, - password if password != "" else "", - key=private_key, + self.db.add_admin_user( + cred_type, username, password, host_id=host_id, cred_id=cred_id ) - - else: - self.logger.debug(f"Logging {self.host} with username: {self.username}, password: {self.password}") - self.conn.connect( - self.host, - port=self.port, - username=username, - password=password, - look_for_keys=False, - allow_agent=False, - ) - cred_id = self.db.add_credential("plaintext", username, password) - - # Some IOT devices will not raise exception in self.conn._transport.auth_password / self.conn._transport.auth_publickey - _, stdout, _ = self.conn.exec_command("id") - stdout = stdout.read().decode(self.args.codec, errors="ignore") - except Exception as e: + display_shell_access = f"({self.user_principal}) {self.server_os_platform} - {'Shell access!' if shell_access else 'No shell access.'}" if self.args.key_file: password = f"{process_secret(password)} (keyfile: {self.args.key_file})" - if "OpenSSH private key file checkints do not match" in str(e): - self.logger.fail(f"{username}:{password} - Could not decrypt key file, wrong password") - else: - self.logger.fail(f"{username}:{password} {e}") - self.conn.close() + self.logger.success( + f"{username}:{password} {self.mark_pwned()} {highlight(display_shell_access)}" + ) + self.db.add_loggedin_relation(cred_id, host_id, shell=shell_access) + return shell_access + except Exception as e: + self.handle_connection_failure(e) return False - else: - shell_access = False - host_id = self.db.get_hosts(self.host)[0].id + finally: + self.conn.close() - if not stdout: - _, stdout, _ = self.conn.exec_command("whoami /priv") - stdout = stdout.read().decode(self.args.codec, errors="ignore") - self.server_os_platform = "Windows" - self.user_principal = "admin" - if "SeDebugPrivilege" in stdout: - self.admin_privs = True - elif "SeUndockPrivilege" in stdout: - self.admin_privs = True - self.user_principal = "admin (UAC)" - else: - # non admin (low priv) - self.user_principal = "admin (low priv)" + def read_key_file(self, private_key): + if self.args.key_file: + with open(self.args.key_file) as f: + private_key = f.read() + return private_key - if not stdout: + def connect_with_key(self, username, password, key_content): + self.logger.debug( + f"Logging {self.host} with username: {self.username}, key: {self.args.key_file}" + ) + pkey = paramiko.RSAKey.from_private_key(StringIO(key_content)) + self.conn.connect( + self.host, + port=self.port, + username=username, + passphrase=password, + pkey=pkey, + look_for_keys=False, + allow_agent=False, + ) + + def connect_with_password(self, username, password): + self.logger.debug( + f"Logging {self.host} with username: {self.username}, password: {self.password}" + ) + self.conn.connect( + self.host, + port=self.port, + username=username, + password=password, + look_for_keys=False, + allow_agent=False, + ) + + def test_shell_access(self): + try: + intial_stdout = self.execute_command("whoami") + if intial_stdout: + self.user_principal = intial_stdout.strip() + self.logger.debug( + f"Initial access check passed. User: {self.user_principal}" + ) + if ( + "\\" in intial_stdout + ): # Likely a Windows username in DOMAIN\user format + stdout = self.execute_command("whoami /priv") + if stdout: + self.server_os_platform = "Windows" + self.check_windows_priv(stdout) + else: + self.server_os_platform == "Linux" + self.check_linux_priv() + else: self.logger.debug(f"User: {self.username} can't get a basic shell") self.server_os_platform = "Network Devices" - shell_access = False - else: - shell_access = True - - self.db.add_loggedin_relation(cred_id, host_id, shell=shell_access) + return bool(intial_stdout) + except Exception as e: + self.logger.error(f"Authenticated but failed to execute command: {e}") + return False - if shell_access and self.server_os_platform == "Linux": - self.check_if_admin() - if self.admin_privs: - self.logger.debug(f"User {username} logged in successfully and is root!") - if self.args.key_file: - self.db.add_admin_user("key", username, password, host_id=host_id, cred_id=cred_id) - else: - self.db.add_admin_user( - "plaintext", - username, - password, - host_id=host_id, - cred_id=cred_id, - ) + def execute_command(self, command): + _, stdout, _ = self.conn.exec_command(command) + output = stdout.read().decode(self.args.codec, errors="ignore") + return output - if self.args.key_file: - password = f"{process_secret(password)} (keyfile: {self.args.key_file})" + def check_windows_priv(self, output): + if "SeDebugPrivilege" in output: + self.admin_privs = True + self.user_principal = "admin" + elif "SeUndockPrivilege" in output: + self.admin_privs = True + self.user_principal = "admin (UAC)" + else: + self.user_principal = "user (low priv)" - display_shell_access = "{} {} {}".format( - f"({self.user_principal})" if self.admin_privs else f"(non {self.user_principal})", - self.server_os_platform, - "- Shell access!" if shell_access else "" + def handle_connection_failure(self, exception): + error_msg = str(exception) + if self.args.key_file: + self.password = ( + f"{process_secret(self.password)} (keyfile: {self.args.key_file})" ) - self.logger.success(f"{username}:{password} {self.mark_pwned()} {highlight(display_shell_access)}") - - return True + if "OpenSSH private key file checkints do not match" in error_msg: + self.logger.fail( + f"{self.username}:{self.password} - Could not decrypt key file, wrong password" + ) + else: + self.logger.fail(f"{self.username}:{self.password} {exception}") def execute(self, payload=None, get_output=False): if not payload and self.args.execute: