-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathiam_role.py
266 lines (248 loc) · 9.75 KB
/
iam_role.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0
"""
This module hosts the class `iam_role` used to retrieve AWS IAM roles
through AWS Config advanced queries
"""
import logging
import json
import re
import urllib.parse
import pandas
from data_perimeter_helper.referential import (
config_adv
)
from data_perimeter_helper.variables import Variables as Var
from data_perimeter_helper.referential.ResourceType import ResourceType
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Captures whatever precedes ".amazonaws.com" in a service principal
# (used with `.match`, so the pattern is anchored at the start only).
regex_service_name = re.compile(r"(.*)\.amazonaws\.com")

# ARN of a role located under the "aws-service-role/" path.
regex_service_linked_role = re.compile(r"arn:aws:iam::\d+:role/aws-service-role/.*")
class iam_role(ResourceType):
    """All AWS IAM roles.

    Resource type backed by an AWS Config advanced query: `populate` builds
    a DataFrame of the IAM roles returned by the query and enriches it with
    columns derived from each role's ARN and trust policy.
    """

    def __init__(self):
        super().__init__(
            type_name="AWS::IAM::Role",
            unknown_value="IAM_ROLE_NOT_IN_CONFIG_AGGREGATOR"
        )

    def populate(self, *args, **kwargs) -> pandas.DataFrame:
        """Retrieve all IAM roles inventoried in AWS Config aggregator.

        https://github.com/awslabs/aws-config-resource-schema/blob/master/config/properties/resource-types/AWS::IAM::Role.properties.json

        The trust policy of each role is URL-decoded and parsed, then used
        to compute the columns `allowedPrincipalList`, `isServiceRole`,
        `isServiceLinkedRole` and `isNetworkPerimeterHumanRole`. The parsed
        `assumeRolePolicyDocument` column is dropped before returning.

        :return: DataFrame with all IAM roles; an empty DataFrame when the
            query returns no result.
        """
        # Query text is kept verbatim: it is sent as-is to AWS Config.
        config_query = '''
SELECT
accountId,
configuration.assumeRolePolicyDocument,
configuration.roleId,
configuration.arn,
configuration.tags
WHERE
resourceType = 'AWS::IAM::Role'
'''
        logger.debug("[-] Submitting Config advanced query")
        results = config_adv.submit_config_advanced_query(
            query=config_query,
            transform_to_pandas=False,
        )
        logger.debug("[+] Submitting Config advanced query")
        logger.debug("[-] Converting results to DataFrame")
        # Type narrowing only: each item is expected to be a JSON string.
        assert isinstance(results, list)  # nosec: B101
        df = pandas.DataFrame(
            [json.loads(raw_item) for raw_item in results]
        )
        if len(df.index) == 0:
            return df
        # Flatten the nested `configuration` object into top-level columns
        df_configuration = pandas.json_normalize(df['configuration'])  # type: ignore
        df = pandas.concat([df['accountId'], df_configuration], axis=1)
        logger.debug("[+] Converting results to DataFrame")
        self.detect_duplicate(df)
        logger.debug("[-] Enriching result")
        # URL decode trust policy
        df['assumeRolePolicyDocument'] = [
            json.loads(urllib.parse.unquote(trust_policy))
            for trust_policy in df['assumeRolePolicyDocument']
        ]
        # Retrieves principals listed in the trust policy
        df['allowedPrincipalList'] = [
            iam_role.get_principal_from_trust_policy(trust_policy)
            for trust_policy in df['assumeRolePolicyDocument']
        ]
        # Checks if roles are assumable by an AWS service
        df['isServiceRole'] = [
            iam_role.is_service_role(arn, allowed_principal_list)
            for arn, allowed_principal_list in zip(
                df['arn'], df['allowedPrincipalList']
            )
        ]
        # Checks if roles are service-linked roles
        df['isServiceLinkedRole'] = [
            iam_role.is_service_linked_role(arn)
            for arn in df['arn']
        ]
        # Checks if roles are declared as network perimeter human roles
        # in the account configuration
        df['isNetworkPerimeterHumanRole'] = [
            iam_role.is_network_perimeter_human_role(account_id, role_arn)
            for account_id, role_arn in zip(
                df['accountId'], df['arn']
            )
        ]
        # Dropping unneeded columns
        df = df.drop(columns=['assumeRolePolicyDocument'])
        logger.debug("[+] Enriching result")
        return df

    @staticmethod
    def detect_duplicate(
        dataframe: pandas.DataFrame
    ) -> bool:
        """Detect duplicate of IAM roles and raise a warning.

        This can happen if AWS Config has been configured to record global
        resources in multiple AWS regions.

        :param dataframe: DataFrame of IAM roles, one row per role.
        :return: True if at least one `roleId` value appears more than once;
            False otherwise, including when the DataFrame is empty or has no
            `roleId` column.
        """
        logger.debug("[-] Checking for duplicates in results")
        if len(dataframe.index) == 0 or "roleId" not in dataframe:
            return False
        # Missing role IDs are ignored so that several rows without a
        # roleId are not reported as duplicates of each other.
        role_ids = dataframe["roleId"].dropna()
        if not role_ids.duplicated().any():
            return False
        logger.warning(
            "Duplicates records of AWS IAM roles detected in AWS Config "
            "advanced query results. You may have enabled in AWS Config "
            "recording of global resources for multiple AWS regions. "
            "The best practice is to record global resources only in your "
            "main AWS region."
        )
        return True

    @staticmethod
    def flat_list_tag(list_tag: list) -> dict:
        """Take a list of tags formatted as [{'key': str, 'value': str}] and
        return a flattened dict {key: value}.

        :param list_tag: list of tags; any non-list input yields an empty
            dict.
        :return: dict mapping each tag key to its value. Entries that are
            not mappings or miss 'key'/'value' are logged and skipped.
        """
        tags = {}
        if not isinstance(list_tag, list):
            return tags
        for tag_as_dict in list_tag:
            try:
                tags[tag_as_dict['key']] = tag_as_dict['value']
            except (KeyError, TypeError):
                # KeyError: missing 'key'/'value'; TypeError: entry is not
                # subscriptable by string (e.g. a str or None).
                logger.error("Malformatted tags: %s", tag_as_dict)
        return tags

    @staticmethod
    def get_principal_from_principal_field(statement: dict) -> list:
        """Retrieve the principals of a given statement of the trust
        relationship policy.

        :param statement: statement of the trust relationship policy; must
            contain a `Principal` element.
        :raises KeyError: if the statement has no `Principal` element
        :raises ValueError: if the `Principal` element is neither a string
            nor a mapping, or if one of its values is neither a string nor
            a list
        :return: List of principals in the statement
        [
            {
                'type': key of the `Principal` mapping (for example
                        'Service'), or 'PRINCIPAL_AS_STRING' when
                        `Principal` is a plain string,
                'principal': str
            }
        ]
        """
        principal_field = statement['Principal']
        if isinstance(principal_field, str):
            return [
                {
                    "type": "PRINCIPAL_AS_STRING",
                    "principal": principal_field
                }
            ]
        if not isinstance(principal_field, dict):
            raise ValueError(
                "principal_field is neither a string nor a dict"
            )
        list_principal_statement = []
        for principal_type, principal_value in principal_field.items():
            if isinstance(principal_value, str):
                # Normalize the single-value form to a one-item list
                principal_value = [principal_value]
            elif not isinstance(principal_value, list):
                raise ValueError(
                    "principal_value is neither a string nor a list"
                )
            list_principal_statement.extend(
                {
                    "type": principal_type,
                    "principal": item
                } for item in principal_value
            )
        return list_principal_statement

    @staticmethod
    def get_principal_from_trust_policy(trust_policy: dict) -> list:
        """Parse statements of an IAM trust policy and call function
        get_principal_from_principal_field for each statement.

        The effect of the statements is not inspected: principals of every
        statement are returned.

        :param trust_policy: parsed trust policy document; its `Statement`
            element is either a list of statements or a single statement.
        :return: list of principals in the IAM role trust policy.
        [
            {
                'type': str,
                'principal': str
            }
        ]
        """
        list_statement = trust_policy['Statement']
        if isinstance(list_statement, dict):
            # A policy may carry a single statement object instead of a
            # list; iterating the dict directly would yield its keys.
            list_statement = [list_statement]
        list_principal_from_trust_policy = []
        for statement in list_statement:
            list_principal_from_trust_policy.extend(
                iam_role.get_principal_from_principal_field(statement)
            )
        return list_principal_from_trust_policy

    @staticmethod
    def is_service_role(role_arn: str, list_allowed_principal: list) -> bool:
        """Return True if an IAM role is a service role.

        A service-linked role is never reported as a service role.

        :param role_arn: IAM role ARN
        :param list_allowed_principal: list of principals in the IAM role
            trust policy, as returned by get_principal_from_trust_policy.
        :return: True if at least one principal of type 'Service' matches
            the AWS service principal pattern.
        """
        if regex_service_linked_role.match(role_arn):
            return False
        return any(
            allowed_principal['type'] == 'Service'
            and regex_service_name.match(
                allowed_principal['principal']
            ) is not None
            for allowed_principal in list_allowed_principal
        )

    @staticmethod
    def is_service_linked_role(role_arn: str) -> bool:
        """Return True if IAM role is a service-linked role.

        :param role_arn: IAM role ARN.
        :return: True if the IAM role is a service-linked role.
        """
        return regex_service_linked_role.match(role_arn) is not None

    @staticmethod
    def is_network_perimeter_human_role(
        account_id: str,
        role_arn: str,
    ) -> bool:
        """Return True if an IAM role is listed as a network perimeter human
        role in the data perimeter helper configuration file.

        Each configured entry is used as a case-insensitive regular
        expression searched in the role ARN.

        :param account_id: ID of the AWS account whose configuration is
            read (key `network_perimeter_human_role_arn`).
        :param role_arn: IAM role ARN.
        :return: True if the IAM role is identified as a network perimeter
            human role in the data perimeter helper configuration file;
            False when nothing is configured for the account.
        """
        list_network_perimeter_human_principal = Var.get_account_configuration(
            account_id=account_id,
            configuration_key='network_perimeter_human_role_arn'
        )
        # Empty entries are discarded: an empty alternative (or an empty
        # pattern altogether) matches any ARN and would flag every role.
        list_pattern = [
            pattern
            for pattern in (list_network_perimeter_human_principal or [])
            if pattern
        ]
        if not list_pattern:
            return False
        regex_from_list = "|".join(list_pattern)
        return re.search(f"(?i)({regex_from_list})", role_arn) is not None