Skip to content

Commit

Permalink
Merge pull request #531 from Pennyw0rth/neff-refactor-ssh
Browse files Browse the repository at this point in the history
  • Loading branch information
NeffIsBack authored Jan 4, 2025
2 parents fb96e78 + 23b4a23 commit f9ce149
Showing 1 changed file with 114 additions and 102 deletions.
216 changes: 114 additions & 102 deletions nxc/protocols/ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ def __init__(self, args, db, host):
self.protocol = "SSH"
self.remote_version = "Unknown SSH Version"
self.server_os_platform = "Linux"
self.shell_access = False
self.admin_privs = False
self.uac = ""
super().__init__(args, db, host)

Expand Down Expand Up @@ -77,11 +79,118 @@ def create_conn_obj(self):
except OSError:
return False

def check_if_admin(self):
self.admin_privs = False
def plaintext_login(self, username, password, private_key=""):
self.username = username
self.password = password
try:
if self.args.key_file or private_key:
self.logger.debug(f"Logging {self.host} with username: {username}, keyfile: {self.args.key_file}")
self.conn.connect(
self.host,
port=self.port,
username=username,
passphrase=password if password != "" else None,
pkey=private_key,
key_filename=self.args.key_file,
timeout=self.args.ssh_timeout,
look_for_keys=False,
allow_agent=False,
banner_timeout=self.args.ssh_timeout,
)
# If we get the private key from the file, we need to load it into the database
if self.args.key_file:
with open(self.args.key_file) as f:
private_key = f.read().rstrip("\n")
cred_id = self.db.add_credential("key", username, password, key=private_key)
else:
self.logger.debug(f"Logging {self.host} with username: {self.username}, password: {self.password}")
self.conn.connect(
self.host,
port=self.port,
username=username,
password=password,
timeout=self.args.ssh_timeout,
look_for_keys=False,
allow_agent=False,
banner_timeout=self.args.ssh_timeout,
)
cred_id = self.db.add_credential("plaintext", username, password)

self.check_shell(cred_id)

secret = process_secret(self.password) if not self.args.key_file else f"{process_secret(self.password)} (keyfile: {self.args.key_file})"
display_shell_access = f"{self.uac}{self.server_os_platform}{' - Shell access!' if self.shell_access else ''}"
self.logger.success(f"{self.username}:{process_secret(secret)} {self.mark_pwned()} {highlight(display_shell_access)}")
return True
except AuthenticationException as e:
if "Private key file is encrypted" in str(e):
self.logger.fail(f"{username}:{process_secret(password)} Could not load private key, error: {e}")
else:
self.logger.fail(f"{username}:{process_secret(password)}")
except SSHException as e:
if "Invalid key" in str(e):
self.logger.fail(f"{username}:{process_secret(password)} Could not decrypt private key, invalid password")
elif "Error reading SSH protocol banner" in str(e):
self.logger.error(f"Internal Paramiko error for {username}:{process_secret(password)}, {e}")
else:
self.logger.exception(e)
except Exception as e:
self.logger.exception(e)
self.conn.close()
return False

def check_shell(self, cred_id):
host_id = self.db.get_hosts(self.host)[0].id

# Some IOT devices will not raise exception in self.conn._transport.auth_password / self.conn._transport.auth_publickey
# Check Linux
stdout = self.conn.exec_command("id")[1].read().decode(self.args.codec, errors="ignore")
if stdout:
self.server_os_platform = "Linux"
self.logger.debug(f"Linux detected for user: {stdout}")
self.shell_access = True
self.db.add_loggedin_relation(cred_id, host_id, shell=self.shell_access)
self.check_linux_priv()
if self.admin_privs:
self.logger.debug(f"User {self.username} logged in successfully and is root!")
if self.args.key_file:
self.db.add_admin_user("key", self.username, self.password, host_id=host_id, cred_id=cred_id)
else:
self.db.add_admin_user("plaintext", self.username, self.password, host_id=host_id, cred_id=cred_id)
return

# Check Windows
stdout = self.conn.exec_command("whoami /priv")[1].read().decode(self.args.codec, errors="ignore")
if stdout:
self.server_os_platform = "Windows"
self.logger.debug("Windows detected")
self.shell_access = True
self.db.add_loggedin_relation(cred_id, host_id, shell=self.shell_access)
self.check_windows_priv(stdout)
if self.admin_privs:
self.logger.debug(f"User {self.username} logged in successfully and is admin!")
if self.args.key_file:
self.db.add_admin_user("key", self.username, self.password, host_id=host_id, cred_id=cred_id)
else:
self.db.add_admin_user("plaintext", self.username, self.password, host_id=host_id, cred_id=cred_id)
return

# No shell access
self.shell_access = False
self.logger.debug(f"User: {self.username} can't get a basic shell")
self.server_os_platform = "Network Devices"
self.db.add_loggedin_relation(cred_id, host_id, shell=self.shell_access)

def check_windows_priv(self, stdout):
if "SeDebugPrivilege" in stdout:
self.admin_privs = True
elif "SeUndockPrivilege" in stdout:
self.admin_privs = True
self.uac = "with UAC - "

def check_linux_priv(self):
if self.args.sudo_check:
self.check_if_admin_sudo()
self.check_linux_priv_sudo()
return

# we could add in another method to check by piping in the password to sudo
Expand All @@ -108,7 +217,7 @@ def check_if_admin(self):
self.logger.display(tips)
return

def check_if_admin_sudo(self):
def check_linux_priv_sudo(self):
if not self.password:
self.logger.error("Check admin with sudo does not support using a private key")
return
Expand Down Expand Up @@ -184,103 +293,6 @@ def check_if_admin_sudo(self):
self.logger.error("Command: 'mkfifo' unavailable, running command with 'sudo' failed")
return

def plaintext_login(self, username, password, private_key=""):
self.username = username
self.password = password
stdout = None
try:
if self.args.key_file or private_key:
self.logger.debug(f"Logging {self.host} with username: {username}, keyfile: {self.args.key_file}")

self.conn.connect(
self.host,
port=self.port,
username=username,
passphrase=password if password != "" else None,
key_filename=private_key if private_key else self.args.key_file,
timeout=self.args.ssh_timeout,
look_for_keys=False,
allow_agent=False,
banner_timeout=self.args.ssh_timeout,
)

cred_id = self.db.add_credential(
"key",
username,
password if password != "" else "",
key=private_key,
)

else:
self.logger.debug(f"Logging {self.host} with username: {self.username}, password: {self.password}")
self.conn.connect(
self.host,
port=self.port,
username=username,
password=password,
timeout=self.args.ssh_timeout,
look_for_keys=False,
allow_agent=False,
banner_timeout=self.args.ssh_timeout,
)
cred_id = self.db.add_credential("plaintext", username, password)

# Some IOT devices will not raise exception in self.conn._transport.auth_password / self.conn._transport.auth_publickey
_, stdout, _ = self.conn.exec_command("id")
stdout = stdout.read().decode(self.args.codec, errors="ignore")
except AuthenticationException:
self.logger.fail(f"{username}:{process_secret(password)}")
except SSHException as e:
if "Invalid key" in str(e):
self.logger.fail(f"{username}:{process_secret(password)} Could not decrypt private key, error: {e}")
if "Error reading SSH protocol banner" in str(e):
self.logger.error(f"Internal Paramiko error for {username}:{process_secret(password)}, {e}")
else:
self.logger.exception(e)
except Exception as e:
self.logger.exception(e)
self.conn.close()
return False
else:
shell_access = False
host_id = self.db.get_hosts(self.host)[0].id

if not stdout:
_, stdout, _ = self.conn.exec_command("whoami /priv")
stdout = stdout.read().decode(self.args.codec, errors="ignore")
self.server_os_platform = "Windows"
if "SeDebugPrivilege" in stdout:
self.admin_privs = True
elif "SeUndockPrivilege" in stdout:
self.admin_privs = True
self.uac = "with UAC - "

if not stdout:
self.logger.debug(f"User: {self.username} can't get a basic shell")
self.server_os_platform = "Network Devices"
shell_access = False
else:
shell_access = True

self.db.add_loggedin_relation(cred_id, host_id, shell=shell_access)

if shell_access and self.server_os_platform == "Linux":
self.check_if_admin()
if self.admin_privs:
self.logger.debug(f"User {username} logged in successfully and is root!")
if self.args.key_file:
self.db.add_admin_user("key", username, password, host_id=host_id, cred_id=cred_id)
else:
self.db.add_admin_user("plaintext", username, password, host_id=host_id, cred_id=cred_id)

if self.args.key_file:
password = f"{process_secret(password)} (keyfile: {self.args.key_file})"

display_shell_access = f"{self.uac}{self.server_os_platform}{' - Shell access!' if shell_access else ''}"
self.logger.success(f"{username}:{process_secret(password)} {self.mark_pwned()} {highlight(display_shell_access)}")

return True

def put_file_single(self, sftp_conn, src, dst):
self.logger.display(f'Copying "{src}" to "{dst}"')
try:
Expand Down Expand Up @@ -325,6 +337,6 @@ def execute(self, payload=None, get_output=False):
else:
self.logger.success("Executed command")
if get_output:
for line in stdout.split("\n"):
for line in stdout.replace("\r\n", "\n").rstrip("\n").split("\n"):
self.logger.highlight(line.strip("\n"))
return stdout

0 comments on commit f9ce149

Please sign in to comment.