Skip to content

Commit

Permalink
refactoring of the ssh module to separate auth check / cmd checks
Browse files Browse the repository at this point in the history
  • Loading branch information
nikaiw committed May 7, 2024
1 parent c499d92 commit 0d84cbe
Showing 1 changed file with 169 additions and 108 deletions.
277 changes: 169 additions & 108 deletions nxc/protocols/ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,11 @@ def proto_logger(self):
)

def print_host_info(self):
self.logger.display(self.remote_version if self.remote_version != "Unknown SSH Version" else f"{self.remote_version}, skipping...")
self.logger.display(
self.remote_version
if self.remote_version != "Unknown SSH Version"
else f"{self.remote_version}, skipping..."
)
return True

def enum_host_info(self):
Expand All @@ -65,7 +69,13 @@ def create_conn_obj(self):
self.conn = paramiko.SSHClient()
self.conn.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
self.conn.connect(self.host, port=self.port, timeout=self.args.ssh_timeout, look_for_keys=False, allow_agent=False)
self.conn.connect(
self.host,
port=self.port,
timeout=self.args.ssh_timeout,
look_for_keys=False,
allow_agent=False,
)
except AuthenticationException:
return True
except SSHException:
Expand All @@ -75,7 +85,7 @@ def create_conn_obj(self):
except OSError:
return False

def check_if_admin(self):
def check_linux_priv(self):
self.admin_privs = False

if self.args.sudo_check:
Expand All @@ -91,7 +101,10 @@ def check_if_admin(self):
"(root)": [True, None],
"NOPASSWD: ALL": [True, None],
"(ALL : ALL) ALL": [True, None],
"(sudo)": [False, f"Current user: '{self.username}' was in 'sudo' group, please try '--sudo-check' to check if user can run sudo shell"]
"(sudo)": [
False,
f"Current user: '{self.username}' was in 'sudo' group, please try '--sudo-check' to check if user can run sudo shell",
],
}
for keyword in admin_flag:
match = re.findall(re.escape(keyword), stdout)
Expand All @@ -108,7 +121,9 @@ def check_if_admin(self):

def check_if_admin_sudo(self):
if not self.password:
self.logger.error("Check admin with sudo does not support using a private key")
self.logger.error(
"Check admin with sudo does not support using a private key"
)
return

if self.args.sudo_check_method:
Expand All @@ -122,14 +137,20 @@ def check_if_admin_sudo(self):
if "stdin" in stdout:
shadow_backup = f"/tmp/{uuid.uuid4()}"
# sudo support stdin password
self.conn.exec_command(f"echo {self.password} | sudo -S cp /etc/shadow {shadow_backup} >/dev/null 2>&1 &")
self.conn.exec_command(f"echo {self.password} | sudo -S chmod 777 {shadow_backup} >/dev/null 2>&1 &")
self.conn.exec_command(
f"echo {self.password} | sudo -S cp /etc/shadow {shadow_backup} >/dev/null 2>&1 &"
)
self.conn.exec_command(
f"echo {self.password} | sudo -S chmod 777 {shadow_backup} >/dev/null 2>&1 &"
)
tries = 1
while True:
self.logger.info(f"Checking {shadow_backup} if it existed")
_, _, stderr = self.conn.exec_command(f"ls {shadow_backup}")
if tries >= self.args.get_output_tries:
self.logger.info(f"The file {shadow_backup} does not exist, the pipe may be hanging. Increase the number of tries with the option '--get-output-tries' or change other method with '--sudo-check-method'. If it's still failing, maybe sudo shell does not work with the current user")
self.logger.info(
f"The file {shadow_backup} does not exist, the pipe may be hanging. Increase the number of tries with the option '--get-output-tries' or change other method with '--sudo-check-method'. If it's still failing, maybe sudo shell does not work with the current user"
)
break
if stderr.read().decode("utf-8"):
time.sleep(2)
Expand All @@ -139,9 +160,13 @@ def check_if_admin_sudo(self):
self.admin_privs = True
break
self.logger.info(f"Remove up temporary files {shadow_backup}")
self.conn.exec_command(f"echo {self.password} | sudo -S rm -rf {shadow_backup}")
self.conn.exec_command(
f"echo {self.password} | sudo -S rm -rf {shadow_backup}"
)
else:
self.logger.error("Command: 'sudo' not support stdin mode, running command with 'sudo' failed")
self.logger.error(
"Command: 'sudo' not support stdin mode, running command with 'sudo' failed"
)
return
else:
_, stdout, _ = self.conn.exec_command("mkfifo --help")
Expand All @@ -152,144 +177,180 @@ def check_if_admin_sudo(self):
pipe_stdin = f"/tmp/systemd-{uuid.uuid4()}"
pipe_stdout = f"/tmp/systemd-{uuid.uuid4()}"
shadow_backup = f"/tmp/{uuid.uuid4()}"
self.conn.exec_command(f"mkfifo {pipe_stdin}; tail -f {pipe_stdin} | /bin/sh 2>&1 > {pipe_stdout} >/dev/null 2>&1 &")
self.conn.exec_command(
f"mkfifo {pipe_stdin}; tail -f {pipe_stdin} | /bin/sh 2>&1 > {pipe_stdout} >/dev/null 2>&1 &"
)
# 'script -qc /bin/sh /dev/null' means "upgrade" the shell, like reverse shell from netcat
self.conn.exec_command(f"echo 'script -qc /bin/sh /dev/null' > {pipe_stdin}")
self.conn.exec_command(f"echo 'sudo -s' > {pipe_stdin} && echo '{self.password}' > {pipe_stdin}")
self.conn.exec_command(
f"echo 'script -qc /bin/sh /dev/null' > {pipe_stdin}"
)
self.conn.exec_command(
f"echo 'sudo -s' > {pipe_stdin} && echo '{self.password}' > {pipe_stdin}"
)
# Sometime the pipe will hanging(only happen with paramiko)
# Can't get "whoami" or "id" result in pipe_stdout, maybe something wrong using pipe with paramiko
# But one thing I can confirm, is the command was executed even can't get result from pipe_stdout
tries = 1
self.logger.info(f"Copy /etc/shadow to {shadow_backup} if pass the sudo auth")
self.logger.info(
f"Copy /etc/shadow to {shadow_backup} if pass the sudo auth"
)
while True:
self.logger.info(f"Checking {shadow_backup} if it existed")
_, _, stderr = self.conn.exec_command(f"ls {shadow_backup}")
if tries >= self.args.get_output_tries:
self.logger.info(f"The file {shadow_backup} does not exist, the pipe may be hanging. Increase the number of tries with the option '--get-output-tries' or change other method with '--sudo-check-method'. If it's still failing, maybe sudo shell does not work with the current user")
self.logger.info(
f"The file {shadow_backup} does not exist, the pipe may be hanging. Increase the number of tries with the option '--get-output-tries' or change other method with '--sudo-check-method'. If it's still failing, maybe sudo shell does not work with the current user"
)
break

if stderr.read().decode("utf-8"):
time.sleep(2)
self.conn.exec_command(f"echo 'cp /etc/shadow {shadow_backup} && chmod 777 {shadow_backup}' > {pipe_stdin}")
self.conn.exec_command(
f"echo 'cp /etc/shadow {shadow_backup} && chmod 777 {shadow_backup}' > {pipe_stdin}"
)
tries += 1
else:
self.logger.info(f"{shadow_backup} existed")
self.admin_privs = True
break
self.logger.info(f"Remove up temporary files {shadow_backup} {pipe_stdin} {pipe_stdout}")
self.conn.exec_command(f"echo 'rm -rf {shadow_backup}' > {pipe_stdin} && rm -rf {pipe_stdin} {pipe_stdout}")
self.logger.info(
f"Remove up temporary files {shadow_backup} {pipe_stdin} {pipe_stdout}"
)
self.conn.exec_command(
f"echo 'rm -rf {shadow_backup}' > {pipe_stdin} && rm -rf {pipe_stdin} {pipe_stdout}"
)
else:
self.logger.error("Command: 'mkfifo' unavailable, running command with 'sudo' failed")
self.logger.error(
"Command: 'mkfifo' unavailable, running command with 'sudo' failed"
)
return

def plaintext_login(self, username, password, private_key=None):
self.username = username
self.password = password
private_key = ""
stdout = None
try:
key_content = None
if self.args.key_file or private_key:
self.logger.debug("Logging in with key")

if self.args.key_file:
with open(self.args.key_file) as f:
private_key = f.read()
key_content = self.read_key_file(private_key or self.args.key_file)
self.connect_with_key(username, password, key_content)
cred_type = "key"
else:
self.connect_with_password(username, password)
cred_type = "plaintext"

pkey = paramiko.RSAKey.from_private_key(StringIO(private_key))
self.conn.connect(
self.host,
port=self.port,
username=username,
passphrase=password if password != "" else None,
pkey=pkey,
look_for_keys=False,
allow_agent=False,
cred_id = self.db.add_credential(
cred_type, username, password, key=key_content
)
shell_access = self.test_shell_access()
host_id = self.db.get_hosts(self.host)[0].id
if self.admin_privs:
self.logger.debug(
f"User {username} logged in successfully and is root!"
)

cred_id = self.db.add_credential(
"key",
username,
password if password != "" else "",
key=private_key,
self.db.add_admin_user(
cred_type, username, password, host_id=host_id, cred_id=cred_id
)

else:
self.logger.debug(f"Logging {self.host} with username: {self.username}, password: {self.password}")
self.conn.connect(
self.host,
port=self.port,
username=username,
password=password,
look_for_keys=False,
allow_agent=False,
)
cred_id = self.db.add_credential("plaintext", username, password)

# Some IOT devices will not raise exception in self.conn._transport.auth_password / self.conn._transport.auth_publickey
_, stdout, _ = self.conn.exec_command("id")
stdout = stdout.read().decode(self.args.codec, errors="ignore")
except Exception as e:
display_shell_access = f"({self.user_principal}) {self.server_os_platform} - {'Shell access!' if shell_access else 'No shell access.'}"
if self.args.key_file:
password = f"{process_secret(password)} (keyfile: {self.args.key_file})"
if "OpenSSH private key file checkints do not match" in str(e):
self.logger.fail(f"{username}:{password} - Could not decrypt key file, wrong password")
else:
self.logger.fail(f"{username}:{password} {e}")
self.conn.close()
self.logger.success(
f"{username}:{password} {self.mark_pwned()} {highlight(display_shell_access)}"
)
self.db.add_loggedin_relation(cred_id, host_id, shell=shell_access)
return shell_access
except Exception as e:
self.handle_connection_failure(e)
return False
else:
shell_access = False
host_id = self.db.get_hosts(self.host)[0].id
finally:
self.conn.close()

if not stdout:
_, stdout, _ = self.conn.exec_command("whoami /priv")
stdout = stdout.read().decode(self.args.codec, errors="ignore")
self.server_os_platform = "Windows"
self.user_principal = "admin"
if "SeDebugPrivilege" in stdout:
self.admin_privs = True
elif "SeUndockPrivilege" in stdout:
self.admin_privs = True
self.user_principal = "admin (UAC)"
else:
# non admin (low priv)
self.user_principal = "admin (low priv)"
def read_key_file(self, private_key):
if self.args.key_file:
with open(self.args.key_file) as f:
private_key = f.read()
return private_key

if not stdout:
def connect_with_key(self, username, password, key_content):
self.logger.debug(
f"Logging {self.host} with username: {self.username}, key: {self.args.key_file}"
)
pkey = paramiko.RSAKey.from_private_key(StringIO(key_content))
self.conn.connect(
self.host,
port=self.port,
username=username,
passphrase=password,
pkey=pkey,
look_for_keys=False,
allow_agent=False,
)

def connect_with_password(self, username, password):
self.logger.debug(
f"Logging {self.host} with username: {self.username}, password: {self.password}"
)
self.conn.connect(
self.host,
port=self.port,
username=username,
password=password,
look_for_keys=False,
allow_agent=False,
)

def test_shell_access(self):
try:
intial_stdout = self.execute_command("whoami")
if intial_stdout:
self.user_principal = intial_stdout.strip()
self.logger.debug(
f"Initial access check passed. User: {self.user_principal}"
)
if (
"\\" in intial_stdout
): # Likely a Windows username in DOMAIN\user format
stdout = self.execute_command("whoami /priv")
if stdout:
self.server_os_platform = "Windows"
self.check_windows_priv(stdout)
else:
self.server_os_platform == "Linux"
self.check_linux_priv()
else:
self.logger.debug(f"User: {self.username} can't get a basic shell")
self.server_os_platform = "Network Devices"
shell_access = False
else:
shell_access = True

self.db.add_loggedin_relation(cred_id, host_id, shell=shell_access)
return bool(intial_stdout)
except Exception as e:
self.logger.error(f"Authenticated but failed to execute command: {e}")
return False

if shell_access and self.server_os_platform == "Linux":
self.check_if_admin()
if self.admin_privs:
self.logger.debug(f"User {username} logged in successfully and is root!")
if self.args.key_file:
self.db.add_admin_user("key", username, password, host_id=host_id, cred_id=cred_id)
else:
self.db.add_admin_user(
"plaintext",
username,
password,
host_id=host_id,
cred_id=cred_id,
)
def execute_command(self, command):
_, stdout, _ = self.conn.exec_command(command)
output = stdout.read().decode(self.args.codec, errors="ignore")
return output

if self.args.key_file:
password = f"{process_secret(password)} (keyfile: {self.args.key_file})"
def check_windows_priv(self, output):
if "SeDebugPrivilege" in output:
self.admin_privs = True
self.user_principal = "admin"
elif "SeUndockPrivilege" in output:
self.admin_privs = True
self.user_principal = "admin (UAC)"
else:
self.user_principal = "user (low priv)"

display_shell_access = "{} {} {}".format(
f"({self.user_principal})" if self.admin_privs else f"(non {self.user_principal})",
self.server_os_platform,
"- Shell access!" if shell_access else ""
def handle_connection_failure(self, exception):
error_msg = str(exception)
if self.args.key_file:
self.password = (
f"{process_secret(self.password)} (keyfile: {self.args.key_file})"
)
self.logger.success(f"{username}:{password} {self.mark_pwned()} {highlight(display_shell_access)}")

return True
if "OpenSSH private key file checkints do not match" in error_msg:
self.logger.fail(
f"{self.username}:{self.password} - Could not decrypt key file, wrong password"
)
else:
self.logger.fail(f"{self.username}:{self.password} {exception}")

def execute(self, payload=None, get_output=False):
if not payload and self.args.execute:
Expand Down

0 comments on commit 0d84cbe

Please sign in to comment.