forked from icgc-dcc/Daco2Ego
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_ego_client.py
84 lines (61 loc) · 2.5 KB
/
test_ego_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import json
from oauthlib.oauth2 import BackendApplicationClient
from requests.auth import HTTPBasicAuth
from requests_oauthlib import OAuth2Session
from ego_client import EgoClient
def read_config(name="tests/test_ego.conf"):
    """Load the JSON test configuration and return the parsed object.

    Args:
        name: Path of the JSON configuration file. The default is a
            relative path, so it is resolved against the current working
            directory of the test run.

    Returns:
        Whatever ``json.load`` produces for the file contents.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file is not valid JSON.
    """
    # Decode explicitly as UTF-8 instead of relying on the platform's
    # locale-dependent default, so non-ASCII values load the same everywhere.
    with open(name, encoding="utf-8") as f:
        return json.load(f)
def get_oauth_authenticated_client(base_url, client_id, client_secret):
    """Return an ``OAuth2Session`` for which a token has been fetched.

    The session wraps a ``BackendApplicationClient`` created for
    ``client_id``. The token is requested from ``base_url + '/oauth/token'``
    using HTTP Basic credentials built from ``client_id`` and
    ``client_secret``.

    Args:
        base_url: Base URL to which ``/oauth/token`` is appended.
        client_id: Application client id.
        client_secret: Application client secret.

    Returns:
        The ``OAuth2Session`` after ``fetch_token`` has been called on it.
    """
    token_url = base_url + '/oauth/token'
    session = OAuth2Session(
        client=BackendApplicationClient(client_id=client_id)
    )
    session.fetch_token(
        token_url=token_url,
        auth=HTTPBasicAuth(client_id, client_secret),
    )
    return session
def init():
    """Build an ``EgoClient`` from the ``client`` section of the test config.

    Reads ``client_id``, ``client_secret``, ``base_url`` and ``dac_api_url``
    from the configuration, obtains an OAuth-authenticated session for those
    credentials, and hands the session to ``EgoClient``.

    Returns:
        The constructed ``EgoClient``.
    """
    settings = read_config()['client']
    # Resolve every required key up front, in a fixed order, so a missing
    # key fails before any network request is made.
    client_id, client_secret, base_url, dac_api_url = (
        settings[key]
        for key in ('client_id', 'client_secret', 'base_url', 'dac_api_url')
    )
    session = get_oauth_authenticated_client(base_url, client_id, client_secret)
    return EgoClient(base_url, session, dac_api_url)
def test_ego_client():
    """Walk one user through adding to and removing from both groups.

    The DACO and cloud group names come from the ``client`` section of the
    test configuration. After each ``add``/``remove`` call the user's
    membership in both groups is asserted.
    """
    client = init()
    config = read_config()
    daco_group_name = config['client']['daco_group']
    cloud_group_name = config['client']['cloud_group']

    # Users cannot be created manually in Ego 4, so this test needs to be run
    # with an existing user. That user should not be in the daco or cloud
    # group when the test starts.
    # NOTE(review): the address below looks like an e-mail-obfuscation
    # placeholder rather than a real account -- confirm the intended user.
    user = "[email protected]"

    if not client.user_exists(user):
        assert client.ego_user_not_found(user)

    # Starting state: user exists and is in neither group.
    exists(client, user, True)
    has_daco(client, user, False)
    has_cloud(client, user, False)

    # Adding to the DACO group must not affect cloud membership.
    client.add(daco_group_name, [user])
    exists(client, user, True)
    has_daco(client, user, True)
    has_cloud(client, user, False)

    client.add(cloud_group_name, [user])
    has_daco(client, user, True)
    has_cloud(client, user, True)

    # Removing from the cloud group must leave DACO membership intact.
    client.remove(cloud_group_name, [user])
    has_daco(client, user, True)
    has_cloud(client, user, False)

    # Back to the starting state; the user itself must still exist.
    client.remove(daco_group_name, [user])
    has_daco(client, user, False)
    has_cloud(client, user, False)
    exists(client, user, True)
def exists(client, user, status):
    """Assert that ``client.user_exists(user)`` equals ``status``.

    Args:
        client: Object providing ``user_exists``.
        user: User identifier passed through to ``user_exists``.
        status: Expected result of the existence check.

    Raises:
        AssertionError: If the reported value differs from ``status``.
    """
    found = client.user_exists(user)
    assert found == status
def has_daco(client, user, status):
    """Assert that ``user``'s membership in the DACO group equals ``status``.

    The group name is read from ``client.daco_group`` in the test
    configuration on every call.

    Raises:
        AssertionError: If ``client.is_member`` disagrees with ``status``.
    """
    group = read_config()['client']['daco_group']
    is_in_group = client.is_member(group, user)
    assert is_in_group == status
def has_cloud(client, user, status):
    """Assert that ``user``'s membership in the cloud group equals ``status``.

    The group name is read from ``client.cloud_group`` in the test
    configuration on every call.

    Raises:
        AssertionError: If ``client.is_member`` disagrees with ``status``.
    """
    group = read_config()['client']['cloud_group']
    is_in_group = client.is_member(group, user)
    assert is_in_group == status