Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactoring of the ssh module to separate auth check / cmd checks #292

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 89 additions & 82 deletions nxc/protocols/ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import uuid
import logging
import time
from io import StringIO

from nxc.config import process_secret
from nxc.connection import connection, highlight
Expand All @@ -20,6 +21,7 @@ def __init__(self, args, db, host):
self.remote_version = "Unknown SSH Version"
self.server_os_platform = "Linux"
self.uac = ""
self.user_principal = ""
super().__init__(args, db, host)

def proto_flow(self):
Expand Down Expand Up @@ -74,7 +76,7 @@ def create_conn_obj(self):
except OSError:
return False

def check_if_admin(self):
def check_linux_priv(self):
self.admin_privs = False

if self.args.sudo_check:
Expand Down Expand Up @@ -184,99 +186,104 @@ def check_if_admin_sudo(self):
def plaintext_login(self, username, password, private_key=""):
self.username = username
self.password = password
stdout = None
try:
key_content = None
if self.args.key_file or private_key:
self.logger.debug(f"Logging {self.host} with username: {username}, keyfile: {self.args.key_file}")

self.conn.connect(
self.host,
port=self.port,
username=username,
passphrase=password if password != "" else None,
key_filename=private_key if private_key else self.args.key_file,
timeout=self.args.ssh_timeout,
look_for_keys=False,
allow_agent=False,
banner_timeout=self.args.ssh_timeout,
)

cred_id = self.db.add_credential(
"key",
username,
password if password != "" else "",
key=private_key,
)

else:
self.logger.debug(f"Logging {self.host} with username: {self.username}, password: {self.password}")
self.conn.connect(
self.host,
port=self.port,
username=username,
password=password,
timeout=self.args.ssh_timeout,
look_for_keys=False,
allow_agent=False,
banner_timeout=self.args.ssh_timeout,
)
cred_id = self.db.add_credential("plaintext", username, password)

# Some IOT devices will not raise exception in self.conn._transport.auth_password / self.conn._transport.auth_publickey
_, stdout, _ = self.conn.exec_command("id")
stdout = stdout.read().decode(self.args.codec, errors="ignore")
except AuthenticationException:
self.logger.fail(f"{username}:{process_secret(password)}")
except SSHException as e:
if "Invalid key" in str(e):
self.logger.fail(f"{username}:{process_secret(password)} Could not decrypt private key, error: {e}")
if "Error reading SSH protocol banner" in str(e):
self.logger.error(f"Internal Paramiko error for {username}:{process_secret(password)}, {e}")
key_content = self.read_key_file(private_key or self.args.key_file)
self.connect_with_key(username, password, key_content)
cred_type = "key"
else:
self.logger.exception(e)
self.connect_with_password(username, password)
cred_type = "plaintext"

cred_id = self.db.add_credential(cred_type, username, password, key=key_content)
shell_access = self.test_shell_access()
host_id = self.db.get_hosts(self.host)[0].id
if self.admin_privs:
self.logger.debug(f"User {username} logged in successfully and is root!")
self.db.add_admin_user(cred_type, username, password, host_id=host_id, cred_id=cred_id)
display_shell_access = f"({self.user_principal}) {self.server_os_platform} - {'Shell access!' if shell_access else 'Undetermined shell access.'}"
if self.args.key_file:
password = f"{process_secret(password)} (keyfile: {self.args.key_file})"
self.logger.success(f"{username}:{password} {self.mark_pwned()} {highlight(display_shell_access)}")
self.db.add_loggedin_relation(cred_id, host_id, shell=shell_access)
return shell_access
except Exception as e:
self.logger.exception(e)
self.conn.close()
self.handle_connection_failure(e)
return False
else:
shell_access = False
host_id = self.db.get_hosts(self.host)[0].id
finally:
self.conn.close()

if not stdout:
_, stdout, _ = self.conn.exec_command("whoami /priv")
stdout = stdout.read().decode(self.args.codec, errors="ignore")
self.server_os_platform = "Windows"
if "SeDebugPrivilege" in stdout:
self.admin_privs = True
elif "SeUndockPrivilege" in stdout:
self.admin_privs = True
self.uac = "with UAC - "
def read_key_file(self, private_key):
if self.args.key_file:
with open(self.args.key_file) as f:
private_key = f.read()
return private_key

if not stdout:
self.logger.debug(f"User: {self.username} can't get a basic shell")
self.server_os_platform = "Network Devices"
shell_access = False
else:
shell_access = True
def connect_with_key(self, username, password, key_content):
self.logger.debug(f"Logging {self.host} with username: {self.username}, key: {self.args.key_file}")
pkey = paramiko.RSAKey.from_private_key(StringIO(key_content))
self.conn.connect(self.host, port=self.port, username=username, passphrase=password, pkey=pkey, look_for_keys=False, allow_agent=False)

self.db.add_loggedin_relation(cred_id, host_id, shell=shell_access)
def connect_with_password(self, username, password):
self.logger.debug(f"Logging {self.host} with username: {self.username}, password: {self.password}")
self.conn.connect(self.host, port=self.port, username=username, password=password, look_for_keys=False, allow_agent=False)

if shell_access and self.server_os_platform == "Linux":
self.check_if_admin()
if self.admin_privs:
self.logger.debug(f"User {username} logged in successfully and is root!")
if self.args.key_file:
self.db.add_admin_user("key", username, password, host_id=host_id, cred_id=cred_id)
def test_shell_access(self):
try:
user_stdout = self.execute_command("whoami")
if user_stdout:
self.user_principal = user_stdout.strip()
self.logger.debug(f"User: {self.user_principal}")
if "\\" in user_stdout: # Likely a Windows username in DOMAIN\user format
priv_stdout = self.execute_command("whoami /priv")
if priv_stdout:
self.server_os_platform = "Windows"
self.check_windows_priv(priv_stdout)
else:
self.db.add_admin_user("plaintext", username, password, host_id=host_id, cred_id=cred_id)

if self.args.key_file:
password = f"{process_secret(password)} (keyfile: {self.args.key_file})"
self.logger.error(f"User: {self.user_principal} unable to determine Windows privileges")
return True
else:
self.server_os_platform = "Linux"
self.check_linux_priv()
return True
else:
uname_stdout = self.execute_command("uname")
if uname_stdout.strip():
self.logger.debug("Basic shell access check passed.")
self.server_os_platform = uname_stdout
return True
else:
self.logger.debug(f"User: {self.username} Shell status undermined; key commands not found.")
self.server_os_platform = "Network Devices"
return False
except Exception as e:
self.logger.error(f"Authenticated but failed to execute command: {e}")
return False

def execute_command(self, command):
_, stdout, _ = self.conn.exec_command(command)
return stdout.read().decode(self.args.codec, errors="ignore")


display_shell_access = f"{self.uac}{self.server_os_platform}{' - Shell access!' if shell_access else ''}"
self.logger.success(f"{username}:{process_secret(password)} {self.mark_pwned()} {highlight(display_shell_access)}")
def check_windows_priv(self, output):
if "SeDebugPrivilege" in output:
self.admin_privs = True
self.user_principal = "admin"
elif "SeUndockPrivilege" in output:
self.admin_privs = True
self.user_principal = "admin (UAC)"
else:
self.user_principal = "user (low priv)"

return True
def handle_connection_failure(self, exception):
error_msg = str(exception)
if self.args.key_file:
self.password = f"{process_secret(self.password)} (keyfile: {self.args.key_file})"
if "OpenSSH private key file checkints do not match" in error_msg:
self.logger.fail(f"{self.username}:{self.password} - Could not decrypt key file, wrong password")
else:
self.logger.fail(f"{self.username}:{self.password} {exception}")

def execute(self, payload=None, get_output=False):
if not payload and self.args.execute:
Expand Down