diff --git a/ocw/lib/EC2.py b/ocw/lib/EC2.py
index 11f6fa4a..2ec1c2d9 100644
--- a/ocw/lib/EC2.py
+++ b/ocw/lib/EC2.py
@@ -1,6 +1,6 @@
 import traceback
 import time
-from datetime import date, datetime, timedelta, timezone
+from datetime import date, datetime, timedelta
 from typing import Dict
 import boto3
 from botocore.exceptions import ClientError
@@ -22,10 +22,6 @@ def __init__(self, namespace: str):
             self.all_regions = ConfigFile().getList('default/ec2_regions')
         else:
             self.all_regions = self.get_all_regions()
-        if PCWConfig.has('clusters/ec2_regions'):
-            self.cluster_regions = ConfigFile().getList('clusters/ec2_regions')
-        else:
-            self.cluster_regions = self.get_all_regions()

     def __new__(cls, vault_namespace: str):
         if vault_namespace not in EC2.__instances:
@@ -140,45 +136,6 @@ def delete_instance(self, region: str, instance_id: str):
             else:
                 raise ex

-    def wait_for_empty_nodegroup_list(self, region: str, cluster_name: str, timeout_minutes: int = 20):
-        if self.dry_run:
-            self.log_info("Skip waiting due to dry-run mode")
-            return None
-        self.log_dbg("Waiting empty nodegroup list in {}", cluster_name)
-        end = datetime.now(timezone.utc) + timedelta(minutes=timeout_minutes)
-        resp_nodegroup = self.eks_client(region).list_nodegroups(clusterName=cluster_name)
-
-        while datetime.now(timezone.utc) < end and len(resp_nodegroup['nodegroups']) > 0:
-            time.sleep(20)
-            resp_nodegroup = self.eks_client(region).list_nodegroups(clusterName=cluster_name)
-            if len(resp_nodegroup['nodegroups']) > 0:
-                self.log_dbg("Still waiting for {} nodegroups to disappear", len(resp_nodegroup['nodegroups']))
-        return None
-
-    def delete_all_clusters(self) -> None:
-        self.log_info("Deleting all clusters!")
-        for region in self.cluster_regions:
-            response = self.eks_client(region).list_clusters()
-            if len(response['clusters']):
-                self.log_dbg("Found {} cluster(s) in {}", len(response['clusters']), region)
-                for cluster in response['clusters']:
-                    resp_nodegroup = self.eks_client(region).list_nodegroups(clusterName=cluster)
-                    if len(resp_nodegroup['nodegroups']):
-                        self.log_dbg("Found {} nodegroups for {}", len(resp_nodegroup['nodegroups']), cluster)
-                        for nodegroup in resp_nodegroup['nodegroups']:
-                            if self.dry_run:
-                                self.log_info("Skipping {} nodegroup deletion due to dry-run mode", nodegroup)
-                            else:
-                                self.log_info("Deleting {}", nodegroup)
-                                self.eks_client(region).delete_nodegroup(
-                                    clusterName=cluster, nodegroupName=nodegroup)
-                        self.wait_for_empty_nodegroup_list(region, cluster)
-                    if self.dry_run:
-                        self.log_info("Skipping {} cluster deletion due to dry-run mode", cluster)
-                    else:
-                        self.log_info("Finally deleting {} cluster", cluster)
-                        self.eks_client(region).delete_cluster(name=cluster)
-
     def cleanup_all(self) -> None:
         valid_period_days = PCWConfig.get_feature_property('cleanup', 'ec2-max-age-days', self._namespace)

@@ -190,27 +147,16 @@ def cleanup_all(self) -> None:
         if PCWConfig.getBoolean('cleanup/vpc_cleanup', self._namespace):
             self.cleanup_vpcs()

-    def delete_elastic_ips(self, region):
-        self.log_info('Deleting elastic IPs in {}'.format(region))
-        response = self.ec2_client(region).describe_addresses()
-        for addr in response['Addresses']:
-            if 'AssociationId' in addr:
-                self.log_info('Disassosiate IP with AssociationId:{}'.format(addr['AssociationId']))
-                self.ec2_client(region).disassociate_address(AssociationId=addr['AssociationId'])
-            self.log_info('Release IP with AllocationId:{}'.format(addr['AllocationId']))
-            self.ec2_client(region).release_address(AllocationId=addr['AllocationId'])
-
-    def delete_vpc(self, region: str, vpc, vpc_id: str) -> None:
+    def delete_vpc(self, region: str, vpc, vpc_id: str):
         try:
             self.log_info('{} has no associated instances. Initializing cleanup of it', vpc)
-            self.delete_elastic_ips(region)
-            self.delete_internet_gw(vpc)
-            self.delete_routing_tables(vpc)
-            self.delete_vpc_endpoints(region, vpc_id)
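+            # delete dependent resources first; the VPC itself is deleted last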
+            self.delete_routing_tables(region, vpc_id)
             self.delete_security_groups(vpc)
-            self.delete_vpc_peering_connections(region, vpc_id)
             self.delete_network_acls(vpc)
             self.delete_vpc_subnets(vpc)
+            self.delete_internet_gw(vpc)
+            self.delete_vpc_endpoints(region, vpc_id)
+            self.delete_vpc_peering_connections(region, vpc_id)
             if self.dry_run:
                 self.log_info('Deletion of VPC skipped due to dry_run mode')
             else:
@@ -277,16 +223,35 @@ def delete_vpc_endpoints(self, region, vpc_id):
                 self.log_info('Deleting {}', end_point)
                 self.ec2_client(region).delete_vpc_endpoints(VpcEndpointIds=[end_point['VpcEndpointId']])

-    def delete_routing_tables(self, vpc) -> None:
+    def delete_routing_tables(self, region: str, vpc_id: str) -> None:
         self.log_dbg('Call delete_routing_tables')
-        for rtable in vpc.route_tables.all():
-            # we can not delete main RouteTable's , not main one don't have associations_attributes
-            if len(rtable.associations_attribute) == 0:
+        vpc_filter = [{"Name": "vpc-id", "Values": [vpc_id]}]
+        route_tables = self.ec2_client(region).describe_route_tables(Filters=vpc_filter)['RouteTables']
+        self.log_dbg('Got {} routing tables', len(route_tables))
+        for route_table in route_tables:
+            for association in route_table['Associations']:
+                # the main route table association cannot be removed; it disappears together with the VPC
+                if not association['Main']:
+                    if self.dry_run:
+                        self.log_info('{} disassociation with routing table won\'t happen due to dry_run mode',
+                                      association['RouteTableAssociationId'])
+                    else:
+                        self.log_info('{} disassociation with routing table will happen',
+                                      association['RouteTableAssociationId'])
+                        self.ec2_client(region).disassociate_route_table(AssociationId=association['RouteTableAssociationId'])
+            for route in route_table['Routes']:
+                # the implicit 'local' route cannot be deleted
+                if route['GatewayId'] != 'local':
+                    if self.dry_run:
+                        self.log_info('{} route will not be deleted due to dry_run mode', route_table)
+                    else:
+                        self.log_info('{} route will be deleted', route_table)
+                        self.ec2_client(region).delete_route(RouteTableId=route_table['RouteTableId'],
+                                                             DestinationCidrBlock=route['DestinationCidrBlock'])
+            if route_table['Associations'] == []:
                 if self.dry_run:
-                    self.log_info('{} will be not deleted due to dry_run mode', rtable)
+                    self.log_info('{} routing table will not be deleted due to dry_run mode', route_table['RouteTableId'])
                 else:
-                    self.log_info('Deleting {}', rtable)
-                    rtable.delete()
+                    self.log_info('{} routing table will be deleted', route_table['RouteTableId'])
+                    self.ec2_client(region).delete_route_table(RouteTableId=route_table['RouteTableId'])

     def delete_internet_gw(self, vpc) -> None:
         self.log_dbg('Call delete_internet_gw')
@@ -307,25 +272,32 @@ def cleanup_vpcs(self) -> None:
             response = self.ec2_client(region).describe_vpcs(Filters=[{'Name': 'isDefault', 'Values': ['false']}])
             self.log_dbg("Found {} VPC\'s in {}", len(response['Vpcs']), region)
             for response_vpc in response['Vpcs']:
-                self.log_dbg('Found {} in {}. (OwnerId={}).', response_vpc['VpcId'], region, response_vpc['OwnerId'])
+                vpc_id = response_vpc['VpcId']
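+                # resources tagged with pcw_ignore are protected from automatic cleanup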
+                if self.volume_protected(response_vpc):
+                    self.log_dbg('{} has protection tag pcw_ignore, skipping it', vpc_id)
+                    continue
+                self.log_dbg('Found {} in {}. (OwnerId={}).', vpc_id, region, response_vpc['OwnerId'])
                 if PCWConfig.getBoolean('cleanup/vpc-notify-only', self._namespace):
-                    vpc_notify.append(response_vpc["VpcId"])
+                    vpc_notify.append(vpc_id)
                 else:
-                    resource_vpc = self.ec2_resource(region).Vpc(response_vpc['VpcId'])
-                    can_be_deleted = True
-                    for subnet in resource_vpc.subnets.all():
-                        if len(list(subnet.instances.all())) > 0:
-                            self.log_info('{} has associated instance(s) so can not be deleted',
-                                          response_vpc['VpcId'])
-                            can_be_deleted = False
-                            break
-                    if can_be_deleted:
-                        del_responce = self.delete_vpc(region, resource_vpc, response_vpc['VpcId'])
+                    resource_vpc = self.ec2_resource(region).Vpc(vpc_id)
+                    if self.vpc_can_be_deleted(resource_vpc, vpc_id):
+                        del_responce = self.delete_vpc(region, resource_vpc, vpc_id)
                         if del_responce is not None:
+                            self.log_err(del_responce)
                             vpc_errors.append(del_responce)
                     elif not self.dry_run:
-                        vpc_locked.append(f'{response_vpc["VpcId"]} (OwnerId={response_vpc["OwnerId"]})\
-                            in {region} is locked')
+                        vpc_locked.append(f'{vpc_id} (OwnerId={response_vpc["OwnerId"]}) in {region} is locked')
+        self.report_cleanup_results(vpc_errors, vpc_notify, vpc_locked)
+
+    def vpc_can_be_deleted(self, resource_vpc, vpc_id) -> bool:
+        for subnet in resource_vpc.subnets.all():
+            if len(list(subnet.instances.all())) > 0:
+                self.log_info('{} has associated instance(s) so can not be deleted', vpc_id)
+                return False
+        return True
+
+    def report_cleanup_results(self, vpc_errors: list, vpc_notify: list, vpc_locked: list) -> None:
         if len(vpc_errors) > 0:
             send_mail(f'Errors on VPC deletion in [{self._namespace}]', '\n'.join(vpc_errors))
         if len(vpc_notify) > 0:
diff --git a/ocw/lib/eks.py b/ocw/lib/eks.py
index 903b8544..6c830111 100644
--- a/ocw/lib/eks.py
+++ b/ocw/lib/eks.py
@@ -1,7 +1,10 @@
 import os
 import json
+import time
+from datetime import datetime, timedelta, timezone
 import kubernetes
 import boto3
+from webui.PCWConfig import PCWConfig, ConfigFile
 from ocw.lib.provider import Provider
 from ocw.lib.k8s import clean_jobs
@@ -11,13 +14,13 @@ class EKS(Provider):

     __instances = {}
     default_region: str = 'eu-central-1'
+    __cluster_regions = []

     def __new__(cls, vault_namespace):
         if vault_namespace not in EKS.__instances:
             EKS.__instances[vault_namespace] = self = object.__new__(cls)
             self.__eks_client = {}
             self.__kubectl_client = {}
-            self.__cluster_regions = None
             self.__aws_dir = None

         return EKS.__instances[vault_namespace]
@@ -25,19 +28,19 @@ def __new__(cls, vault_namespace):

     def __init__(self, namespace: str):
         super().__init__(namespace)
         self.create_credentials_file()
+        if len(EKS.__cluster_regions) == 0:
+            if PCWConfig.has('clusters/ec2_regions'):
+                EKS.__cluster_regions = ConfigFile().getList('clusters/ec2_regions')
+            else:
+                regions_query = self.cmd_exec(f"aws ec2 describe-regions --query 'Regions[].RegionName'\
+ --output json --region {EKS.default_region}")
+                EKS.__cluster_regions = json.loads(regions_query.stdout)

     def aws_dir(self):
         if self.__aws_dir is None:
             self.__aws_dir = os.path.expanduser("~/.aws")
         return self.__aws_dir

-    def list_regions(self):
-        if self.__cluster_regions is None:
-            regions_query = self.cmd_exec(
-                f"aws ec2 describe-regions --query 'Regions[].RegionName' --output json --region {EKS.default_region}")
-            self.__cluster_regions = json.loads(regions_query.stdout)
-        return self.__cluster_regions
-
     def create_credentials_file(self) -> None:
         creds_file = f"{self.aws_dir()}/credentials"
@@ -82,7 +85,7 @@ def kubectl_client(self, region: str, cluster_name: str):

     def all_clusters(self) -> dict:
         clusters = {}
-        for region in self.list_regions():
+        for region in EKS.__cluster_regions:
             self.log_dbg("Checking clusters in {}", region)
             response = self.eks_client(region).list_clusters()
             if 'clusters' in response and len(response['clusters']) > 0:
@@ -98,8 +101,47 @@ def all_clusters(self) -> dict:
                     del clusters[region]
         return clusters

+    def wait_for_empty_nodegroup_list(self, region: str, cluster_name: str, timeout_minutes: int = 20):
+        if self.dry_run:
+            self.log_info("Skip waiting due to dry-run mode")
+            return None
+        self.log_dbg("Waiting empty nodegroup list in {}", cluster_name)
+        end = datetime.now(timezone.utc) + timedelta(minutes=timeout_minutes)
+        resp_nodegroup = self.eks_client(region).list_nodegroups(clusterName=cluster_name)
+
+        while datetime.now(timezone.utc) < end and len(resp_nodegroup['nodegroups']) > 0:
+            time.sleep(20)
+            resp_nodegroup = self.eks_client(region).list_nodegroups(clusterName=cluster_name)
+            if len(resp_nodegroup['nodegroups']) > 0:
+                self.log_dbg("Still waiting for {} nodegroups to disappear", len(resp_nodegroup['nodegroups']))
+        return None
+
+    def delete_all_clusters(self) -> None:
+        self.log_info("Deleting all clusters!")
+        for region in self.__cluster_regions:
+            response = self.eks_client(region).list_clusters()
+            if len(response['clusters']):
+                self.log_dbg("Found {} cluster(s) in {}", len(response['clusters']), region)
+                for cluster in response['clusters']:
+                    resp_nodegroup = self.eks_client(region).list_nodegroups(clusterName=cluster)
+                    if len(resp_nodegroup['nodegroups']):
+                        self.log_dbg("Found {} nodegroups for {}", len(resp_nodegroup['nodegroups']), cluster)
+                        for nodegroup in resp_nodegroup['nodegroups']:
+                            if self.dry_run:
+                                self.log_info("Skipping {} nodegroup deletion due to dry-run mode", nodegroup)
+                            else:
+                                self.log_info("Deleting {}", nodegroup)
+                                self.eks_client(region).delete_nodegroup(
+                                    clusterName=cluster, nodegroupName=nodegroup)
+                        self.wait_for_empty_nodegroup_list(region, cluster)
+                    if self.dry_run:
+                        self.log_info("Skipping {} cluster deletion due to dry-run mode", cluster)
+                    else:
+                        self.log_info("Finally deleting {} cluster", cluster)
+                        self.eks_client(region).delete_cluster(name=cluster)
+
     def cleanup_k8s_jobs(self):
-        for region in self.list_regions():
+        for region in self.__cluster_regions:
             self.log_dbg(f"Region {region}")
             clusters = self.eks_client(region).list_clusters()['clusters']
             for cluster_name in clusters:
diff --git a/tests/test_ec2.py b/tests/test_ec2.py
index 62c636bb..027afbc1 100644
--- a/tests/test_ec2.py
+++ b/tests/test_ec2.py
@@ -93,6 +93,39 @@ class MockedEC2Client():
     snapshotid_i_have_ami = 'you_can_not_delete_me'
     delete_snapshot_raise_error = False
     delete_vpc_endpoints_called = False
+    disassociate_route_table_called = False
+    delete_route_called = False
+    routing_tables = {'RouteTables': [
+        {
+            'Associations': [
+                {'Main': True},
+                {'RouteTableAssociationId': '1',
+                 'Main': False}
+            ],
+            'Routes': [
+                {'GatewayId': 'local'},
+                {'GatewayId': 'not_local',
+                 'RouteTableId': '1',
+                 'DestinationCidrBlock': 'CIDR'
+                 }
+            ],
+            'RouteTableId': '1'
+        },
+        {
+            'Associations': [
+                {'RouteTableAssociationId': '2',
+                 'Main': False}
+            ],
+            'Routes': [
+                {'GatewayId': 'local'},
+                {'GatewayId': 'not_local',
+                 'RouteTableId': '2',
+                 'DestinationCidrBlock': 'CIDR'
+                 }
+            ],
+            'RouteTableId': '2'
+        }
+    ]}

     ec2_snapshots = {snapshotid_to_delete: 'snapshot', snapshotid_i_have_ami: 'snapshot'}

@@ -130,6 +163,17 @@ def delete_vpc_endpoints(self, VpcEndpointIds):

     def describe_vpc_peering_connections(self, Filters):
         return MockedEC2Client.response

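+    # boto3 route-table calls mocked for the new delete_routing_tables tests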
+    def disassociate_route_table(self, AssociationId):
+        if AssociationId == '2':
+            MockedEC2Client.disassociate_route_table_called = True
+
+    def describe_route_tables(self, Filters):
+        return MockedEC2Client.routing_tables
+
+    def delete_route(self, RouteTableId, DestinationCidrBlock):
+        if RouteTableId == '2':
+            MockedEC2Client.delete_route_called = True
+

 class MockedSMTP:
     mimetext = ''
@@ -295,13 +339,11 @@ def test_cleanup_uploader_vpc_no_mail_sent_due_dry_run(ec2_patch_for_vpc):


 def test_delete_vpc_deleting_everything(ec2_patch, monkeypatch):
-    def mocked_delete_elastic_ips(arg1, arg2):
-        delete_vpc_calls_stack.append('delete_elastic_ips')

     def mocked_delete_internet_gw(arg1, arg2):
         delete_vpc_calls_stack.append('delete_internet_gw')

-    def mocked_delete_routing_tables(arg1, arg2):
+    def mocked_delete_routing_tables(arg1, arg2, arg3):
         delete_vpc_calls_stack.append('delete_routing_tables')

     def mocked_delete_vpc_endpoints(arg1, arg2, arg3):
@@ -322,7 +364,6 @@ def mocked_delete_vpc_subnets(arg1, arg2):

     # emulated that there is no linked running instance to VPC which we trying to delete
     MockedInstances.is_empty = True
-    monkeypatch.setattr(EC2, 'delete_elastic_ips', mocked_delete_elastic_ips)
     monkeypatch.setattr(EC2, 'delete_internet_gw', mocked_delete_internet_gw)
     monkeypatch.setattr(EC2, 'delete_routing_tables', mocked_delete_routing_tables)
     monkeypatch.setattr(EC2, 'delete_vpc_endpoints', mocked_delete_vpc_endpoints)
@@ -332,17 +373,16 @@ def mocked_delete_vpc_subnets(arg1, arg2):
     monkeypatch.setattr(EC2, 'delete_vpc_subnets', mocked_delete_vpc_subnets)

     ec2_patch.delete_vpc('region', MockedVpc('vpcId'), 'vpcId')
-    assert delete_vpc_calls_stack == ['delete_elastic_ips', 'delete_internet_gw', 'delete_routing_tables',
-                                      'delete_vpc_endpoints', 'delete_security_groups',
-                                      'delete_vpc_peering_connections', 'delete_network_acls',
-                                      'delete_vpc_subnets', 'boto3_delete_vpc']
+    assert delete_vpc_calls_stack == ['delete_routing_tables', 'delete_security_groups', 'delete_network_acls',
+                                      'delete_vpc_subnets', 'delete_internet_gw', 'delete_vpc_endpoints',
+                                      'delete_vpc_peering_connections', 'boto3_delete_vpc']


 def test_delete_vpc_return_exception_str(ec2_patch_for_vpc, monkeypatch):
-    def mocked_dont_call_it(arg1, arg2):
+    def mocked_dont_call_it(arg1, arg2, arg3):
         raise Exception

-    monkeypatch.setattr(EC2, 'delete_elastic_ips', mocked_dont_call_it)
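+    # delete_routing_tables is the first helper called by delete_vpc, so raising here exercises the error path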
+    monkeypatch.setattr(EC2, 'delete_routing_tables', mocked_dont_call_it)
     ret = ec2_patch_for_vpc.delete_vpc('region', MockedVpc('vpcId'), 'vpcId')
     assert '[vpcId]Exception on VPC deletion. Traceback (most recent call last)' in ret

@@ -367,8 +407,9 @@ def test_delete_internet_gw(ec2_patch):


 def test_delete_routing_tables(ec2_patch):
-    ec2_patch.delete_routing_tables(MockedVpc('vpcId'))
-    assert MockedCollectionItem.delete_called == 2
+    ec2_patch.delete_routing_tables(MockedVpc('vpcId'), 'vpcId')
+    assert MockedEC2Client.disassociate_route_table_called
+    assert MockedEC2Client.delete_route_called


 def test_delete_vpc_endpoints(ec2_patch):
@@ -379,7 +420,7 @@ def test_delete_vpc_endpoints(ec2_patch):

 def test_delete_security_groups(ec2_patch):
     ec2_patch.delete_security_groups(MockedVpc('vpcId'))
-    assert MockedCollectionItem.delete_called == 3
+    assert MockedCollectionItem.delete_called == 2


 def test_delete_vpc_peering_connections(ec2_patch):
@@ -390,12 +431,12 @@ def test_delete_network_acls(ec2_patch):
     ec2_patch.delete_network_acls(MockedVpc('vpcId'))
-    assert MockedCollectionItem.delete_called == 4
+    assert MockedCollectionItem.delete_called == 3


 def test_delete_vpc_subnets(ec2_patch):
     ec2_patch.delete_vpc_subnets(MockedVpc('vpcId'))
-    assert MockedCollectionItem.delete_called == 5
+    assert MockedCollectionItem.delete_called == 4
     assert MockedInterface.delete_called
diff --git a/tests/test_eks.py b/tests/test_eks.py
index 729b6869..6afd1ecf 100644
--- a/tests/test_eks.py
+++ b/tests/test_eks.py
@@ -1,7 +1,7 @@
 import os
+from datetime import datetime, timezone, timedelta
 import pytest
 import kubernetes
-from datetime import datetime, timezone, timedelta
 from ocw.lib.provider import Provider
 from ocw.lib.eks import EKS
 from webui.PCWConfig import PCWConfig
@@ -94,23 +94,24 @@ def __init__(self, name, age):


 class MockedSubprocessReturn():
-    def __init__(self, returncode=0, stderr=""):
+    def __init__(self, returncode=0, stdout="", stderr=""):
         self.returncode = returncode
         self.stderr = stderr
+        self.stdout = stdout


 @pytest.fixture
 def eks_patch(monkeypatch):
+    def mocked_cmd_exec(self, cmd):
+        if "describe-regions" in cmd:
+            return MockedSubprocessReturn(stdout="[\"region1\"]")
+        return MockedSubprocessReturn(0)
+
     monkeypatch.setattr(PCWConfig, 'get_feature_property', mock_get_feature_property)
     monkeypatch.setattr(Provider, 'read_auth_json', lambda *args, **kwargs: {'access_key': 'key', 'secret_key': 'secret'})
-    monkeypatch.setattr(EKS, 'list_regions', lambda self: ['region1'])
-    monkeypatch.setattr(Provider, 'cmd_exec', lambda *args, **kwargs: MockedSubprocessReturn(0))
+    monkeypatch.setattr(Provider, 'cmd_exec', mocked_cmd_exec)
     monkeypatch.setattr(EKS, 'aws_dir', lambda self: '/tmp')
-
-    # monkeypatch.setattr(EC2, 'check_credentials', lambda *args, **kwargs: True)
-    # monkeypatch.setattr(EKS, 'create_credentials_file', lambda *args, **kwargs: '{}')
-
     return EKS('fake')
@@ -139,7 +140,7 @@ def test_create_credentials_file(eks_patch, monkeypatch):
     assert os.path.exists("/tmp/credentials")

     # Invalid credentials, 'aws sts get-caller-identity' fails
-    monkeypatch.setattr(Provider, 'cmd_exec', lambda *args, **kwargs: MockedSubprocessReturn(1, "test"))
+    monkeypatch.setattr(Provider, 'cmd_exec', lambda *args, **kwargs: MockedSubprocessReturn(returncode=1, stderr="test"))
     error = "Invalid credentials, the credentials cannot be verified by'aws sts get-caller-identity' with the error: test"
     with pytest.raises(RuntimeError, match=error):
         eks_patch.create_credentials_file()