This repository has been archived by the owner on Apr 30, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathgrant-ssh-access-forced-command.py
executable file
·318 lines (234 loc) · 10.8 KB
/
grant-ssh-access-forced-command.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
#!/usr/bin/env python3
'''
Grant SSH access for a given user by fetching his public key from the server.
This script should be used as SSH forced command.
Testing this script with a local mock service:
.. code-block:: bash
$ sudo touch /etc/ssh-access-granting-service.yaml
$ sudo chown $USER /etc/ssh-access-granting-service.yaml
$ echo 'ssh_access_granting_service_url: "http://localhost:9000"' > /etc/ssh-access-granting-service.yaml
$ # serve your own public key via HTTP
$ mkdir -p public-keys/testuser
$ cp ~/.ssh/id_rsa.pub public-keys/testuser/sshkey.pub
$ python3 -m http.server 9000 &
$ ./grant-ssh-access-forced-command.py grant-ssh-access testuser
$ ssh testuser@localhost # try logging in
'''
import argparse
import datetime
import ipaddress
import os
import pwd
import re
import requests
import shlex
import socket
import subprocess
import sys
import syslog
import tempfile
import yaml
import time
from pathlib import Path
USER_NAME_PATTERN = re.compile('^[a-z][a-z0-9-]{0,31}$')
HOST_NAME_PATTERN = re.compile('^[a-z0-9.-]{0,255}$')
CONFIG_FILE_PATH = Path('/etc/ssh-access-granting-service.yaml')
MARKER = '(generated by even)'
WELCOME_MESSAGE = 'Your SSH access was granted by even (SSH access granting service) on {date}'
REVOKED_MESSAGE = 'Your SSH access was revoked by even (SSH access granting service) on {date}'
USER_COMMENT = 'SSH user created by even (SSH access granting service) on {date}'
DEFAULT_SHELL = '/bin/bash'
def date():
now = datetime.datetime.now()
return now.strftime('%Y-%m-%d %H:%M:%S')
def fix_ssh_pubkey(user: str, pubkey: str):
'''Validate that the given public SSH key looks like a valid OpenSSH key which can be used in authorized_keys'''
pubkey = pubkey.strip()
parts = pubkey.split()[:2] # just the type and the key, the "mail" is probably wrong
if not parts:
raise ValueError('Invalid SSH public key... the key is empty')
if not (parts[0].startswith('ssh-') or parts[0].startswith('ecdsa-')):
raise ValueError('Invalid SSH public key... no "rsa", "dsa" or "ecdsa" key...')
# TODO? check if it can be base64 decoded?
if len(parts[1]) % 4:
raise ValueError('Invalid SSH public key... length modulo 4 is not 0')
pubkey = ' '.join(parts)
if pubkey.find('@') != -1:
raise ValueError('Invalid SSH public key... no space between key and mail address')
# add user name as comment
pubkey += ' %s' % user
return pubkey
def get_config():
if CONFIG_FILE_PATH.exists():
with CONFIG_FILE_PATH.open('rb') as fd:
config = yaml.safe_load(fd)
else:
config = yaml.safe_load(subprocess.check_output(['sudo', 'cat', '/var/lib/cloud/instance/user-data.txt']))
return config
def get_service_url():
'''Get the service URL from the global config file or from cloud config YAML'''
config = get_config()
url = config['ssh_access_granting_service_url'].rstrip('/')
return url
def download_public_key(url, name):
'''Download the SSH public key for the given user name from URL'''
r = requests.get('{url}/public-keys/{name}/sshkey.pub'.format(url=url, name=name), timeout=10)
if r.status_code != 200:
raise Exception('Failed to download public key for "{}" from {}: server returned status {}'.format(
name, url, r.status_code))
pubkey = fix_ssh_pubkey(name, r.text)
return pubkey
def add_our_mark(pubkey):
return '{} {}'.format(pubkey, MARKER)
def add_forced_command(pubkey, forced_command):
if forced_command:
return 'command="{}",no-port-forwarding,no-X11-forwarding,no-agent-forwarding,no-pty {}'.format(forced_command, pubkey)
else:
return pubkey
def user_exists(user_name: str) -> bool:
try:
pwd.getpwnam(user_name)
return True
except:
return False
def get_keys_file_path(user_name: str) -> Path:
pw_entry = pwd.getpwnam(user_name)
ssh_dir = Path(pw_entry.pw_dir) / '.ssh'
keys_file = ssh_dir / 'authorized_keys'
return keys_file
def generate_authorized_keys(user_name: str, keys_file: Path, pubkey: str, forced_command: str=None):
ssh_dir = keys_file.parent
subprocess.check_call(['sudo', 'mkdir', '-p', str(ssh_dir)])
subprocess.check_call(['sudo', 'chown', user_name, str(ssh_dir)])
subprocess.check_call(['sudo', 'chmod', '0700', str(ssh_dir)])
# NOTE: we write the temporary SSH public key into tmpfs (shm) to also work in "disk full" situations
with tempfile.NamedTemporaryFile(suffix='{name}-sshkey.pub'.format(name=user_name), dir='/dev/shm') as fd:
fd.write(add_our_mark(add_forced_command(pubkey, forced_command)).encode('utf-8'))
fd.flush()
shell_template = 'cat {temp} > {keys_file} && chown {name} {keys_file} && chmod 600 {keys_file}'
subprocess.check_call(['sudo', 'sh', '-c',
shell_template.format(temp=fd.name, name=user_name, keys_file=keys_file)])
def write_welcome_message(home_dir: Path):
'''Write SSH welcome banner to ~/.profile'''
profile_path = home_dir / '.profile'
command = 'echo "echo {}" > {}'.format(shlex.quote(WELCOME_MESSAGE.format(date=date())), profile_path)
subprocess.check_call(['sudo', 'sh', '-c', command])
def is_remote_host_allowed(remote_host: str):
config = get_config()
allowed_networks = config.get('allowed_remote_networks', [])
host_ips = []
for addrinfo in socket.getaddrinfo(remote_host, 22, proto=socket.IPPROTO_TCP):
host_ips.append(ipaddress.ip_address(addrinfo[4][0]))
for net in allowed_networks:
for host_ip in host_ips:
if host_ip in ipaddress.ip_network(net):
return True
return False
def grant_ssh_access(args):
user_name = args.name
url = get_service_url()
pubkey = download_public_key(url, user_name)
try:
pwd.getpwnam(user_name)
except:
config = get_config()
try:
subprocess.check_call(['sudo', 'useradd',
'--user-group',
'--groups', ','.join(config.get('user_groups', ['adm'])),
'--shell', DEFAULT_SHELL,
'--create-home',
# colon is not allowed in the comment field..
'--comment', USER_COMMENT.format(date=date()).replace(':', '-'),
user_name])
except:
# out of disk space? try to continue anyway
pass
try:
keys_file = get_keys_file_path(user_name)
generate_authorized_keys(user_name, keys_file, pubkey)
write_welcome_message(keys_file.parent.parent)
except:
# out of disk space? use fallback and allow login via root
# /root/.ssh/ must be mounted as tmpfs (memory disk) for this to work!
generate_authorized_keys('root', Path('/root/.ssh/authorized_keys'), pubkey)
if args.remote_host:
if not is_remote_host_allowed(args.remote_host):
raise Exception('Remote host "{}" is not in one of the allowed networks'.format(args.remote_host))
grant_ssh_access_on_remote_host(user_name, args.remote_host)
def execute_ssh(user: str, host: str, command: str):
subprocess.check_call(['ssh',
'-o', 'UserKnownHostsFile=/dev/null',
'-o', 'StrictHostKeyChecking=no',
'-o', 'BatchMode=yes',
'-o', 'ConnectTimeout=10',
'-l', 'granting-service', host, command, user])
def grant_ssh_access_on_remote_host(user: str, host: str):
execute_ssh(user, host, 'grant-ssh-access')
def revoke_ssh_access_on_remote_host(user: str, host: str):
execute_ssh(user, host, 'revoke-ssh-access')
def is_generated_by_us(keys_file):
'''verify that the user was created by us'''
output = subprocess.check_output(['sudo', 'cat', str(keys_file)])
return MARKER.encode('utf-8') in output
def kill_all_processes(user_name: str):
'''try to write session before killing all processes'''
subprocess.call(['sudo', 'killall', '-u', user_name])
time.sleep(2)
subprocess.call(['sudo', 'killall', '-KILL', '-u', user_name, '-w'])
def revoke_ssh_access(args: list):
user_name = args.name
if not args.keep_local and user_exists(user_name):
url = get_service_url()
pubkey = download_public_key(url, user_name)
keys_file = get_keys_file_path(user_name)
if not is_generated_by_us(keys_file):
raise Exception('Cannot revoke SSH access from user "{}": ' +
'the user was not created by this script.\n'.format(user_name))
forced_command = 'echo {}'.format(shlex.quote(REVOKED_MESSAGE.format(date=date())))
generate_authorized_keys(user_name, keys_file, pubkey, forced_command)
kill_all_processes(user_name)
if args.remote_host:
if not is_remote_host_allowed(args.remote_host):
raise Exception('Remote host "{}" is not in one of the allowed networks'.format(args.remote_host))
revoke_ssh_access_on_remote_host(user_name, args.remote_host)
def fail_on_missing_command():
sys.stderr.write('Missing command argument\n')
sys.exit(1)
def user_name(val: str):
'''Validate user name parameter'''
if not USER_NAME_PATTERN.match(val):
raise argparse.ArgumentTypeError('Invalid user name')
return val
def host_name(val: str):
'''Validate host name parameter'''
if not HOST_NAME_PATTERN.match(val):
raise argparse.ArgumentTypeError('Invalid host name')
return val
def main(argv: list):
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers()
cmd = subparsers.add_parser('grant-ssh-access')
cmd.set_defaults(func=grant_ssh_access)
cmd.add_argument('name', help='User name', type=user_name)
cmd.add_argument('--remote-host', help='Remote host to add user on', type=host_name)
cmd = subparsers.add_parser('revoke-ssh-access')
cmd.set_defaults(func=revoke_ssh_access)
cmd.add_argument('name', help='User name', type=user_name)
cmd.add_argument('--remote-host', help='Remote host to remove user from', type=host_name)
cmd.add_argument('--keep-local', help='Keep local SSH access, only remove on remote host', action='store_true')
args = parser.parse_args(argv)
if not hasattr(args, 'func'):
fail_on_missing_command()
syslog.openlog(ident=os.path.basename(__file__), logoption=syslog.LOG_PID, facility=syslog.LOG_AUTH)
syslog.syslog(' '.join(argv))
try:
args.func(args)
except Exception as e:
sys.stderr.write('ERROR: {}\n'.format(e))
sys.exit(1)
if __name__ == '__main__':
original_command = os.environ.get('SSH_ORIGINAL_COMMAND')
if original_command:
sys.argv[1:] = original_command.split()
main(sys.argv[1:])