Skip to content

Commit

Permalink
Updated cleanup to remove firewall rules and raise error if VPC wasn'…
Browse files Browse the repository at this point in the history
…t deleted properly
  • Loading branch information
Costya-Y committed Sep 3, 2024
1 parent b16d674 commit cf417a6
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 53 deletions.
17 changes: 12 additions & 5 deletions cloudshell/cp/gcp/flows/cleanup_infra_flow.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
from __future__ import annotations

from contextlib import suppress

from attr import define
from cloudshell.cp.core.flows.cleanup_sandbox_infra import \
AbstractCleanupSandboxInfraFlow
from cloudshell.cp.core.request_actions.models import CleanupNetwork
from google.api_core.exceptions import NotFound
from typing_extensions import TYPE_CHECKING

from cloudshell.cp.gcp.handlers.firewall_rule import FirewallRuleHandler
from cloudshell.cp.gcp.handlers.ssh_keys import SSHKeysHandler
from cloudshell.cp.gcp.handlers.subnet import SubnetHandler
from cloudshell.cp.gcp.handlers.vpc import VPCHandler
Expand Down Expand Up @@ -66,11 +70,10 @@ def delete_vpc_components(self, network_name: str) -> None:
subnet_handler.delete(subnet_name=subnet)

# Delete firewall rules
# firewall_rules = self.firewall_client.list(project=self.credentials.project_id, filter=f"network={network_name}")
# for rule in firewall_rules:
# self.logger.info(f"Deleting firewall rule: {rule.name}")
# operation = self.firewall_client.delete(project=self.credentials.project_id, firewall=rule.name)
# self.wait_for_operation(name=operation.name)
firewall_handler = FirewallRuleHandler(self.config.credentials)
for rule in firewall_handler.list_firewall_rules_by_network(network_name):
self.logger.info(f"Deleting firewall rule: {rule.name}")
firewall_handler.delete(rule_name=rule.name)

# Delete routes
# routes = self.route_client.list(project=self.credentials.project_id, filter=f"network={network_name}")
Expand All @@ -82,5 +85,9 @@ def delete_vpc_components(self, network_name: str) -> None:
network_handler = VPCHandler(self.config.credentials)
network_handler.delete(network_name=network_name)

with suppress(NotFound):
network_handler.get_vpc_by_name(network_name)
raise Exception(f"Failed to delete VPC '{network_name}'.")

self.logger.info(f"All components of VPC '{network_name}' deleted "
f"successfully.")
62 changes: 14 additions & 48 deletions cloudshell/cp/gcp/handlers/firewall_rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from google.api_core.exceptions import NotFound
from google.cloud import compute_v1
from google.cloud.compute_v1.services.firewalls.pagers import ListPager
from google.cloud.compute_v1.types import Allowed, Denied

from cloudshell.cp.gcp.handlers.base import BaseGCPHandler
Expand Down Expand Up @@ -34,33 +35,19 @@ def get_lower_priority(self, security_group_name: str) -> int:
priorities = [rule.priority for rule in firewall.allowed]
return min(priorities) - 1 if priorities else 1000

# def create(
# self,
# firewall_policy_name: str,
# network_name: str,
# allow_policy_rules: list,
# deny_policy_rules: list,
# ) -> Firewall:
# """"""
# # Define the firewall settings
# firewall = compute_v1.AddRuleRegionNetworkFirewallPolicyRequest()
# firewall.name = firewall_policy_name
# firewall.network = (
# f"projects/{self.credentials.project_id}/global/networks/{network_name}"
# )
# firewall.allowed = allow_policy_rules
# firewall.denied = deny_policy_rules
#
# # Create the firewall
# operation = self.firewall_client.insert(
# project=self.credentials.project_id, firewall_resource=firewall
# )
#
# # Wait for the operation to complete
# self.wait_for_operation(name=operation.name)
#
# logger.info(f"Security group '{firewall_policy_name}' created successfully.")
# return self.get_firewall_policy_by_name(firewall_policy_name).name
def list_firewall_rules_by_network(self, network_name) -> list[Firewall]:
"""List all Security Groups."""
rules = []
request = compute_v1.ListFirewallsRequest(project=self.credentials.project_id)

for firewall in self.firewall_client.list(request=request):
if firewall.network.endswith(
f"projects/{self.credentials.project_id}"
f"/global/networks/{network_name}"
):
rules.append(firewall)

return rules

def get_firewall_rule_by_name(self, rule_name: str) -> (
Firewall):
Expand All @@ -70,27 +57,6 @@ def get_firewall_rule_by_name(self, rule_name: str) -> (
project=self.credentials.project_id, firewall=rule_name
)

# def get_or_create_firewall_policy_by_name(
# self,
# firewall_policy_name: str,
# network: str,
# allow_policy_rules: list,
# deny_policy_rules: list
# ) -> Firewall:
# """Get Security Group instance by its name."""
# logger.info("Getting security group")
# with suppress(NotFound):
# return self.firewall_client.get(
# project=self.credentials.project_id,
# firewall=firewall_policy_name,
# )
# return self.create(
# firewall_policy_name,
# network,
# allow_policy_rules,
# deny_policy_rules
# )

def delete(self, rule_name: str) -> None:
"""Delete Security Group."""
operation = self.firewall_client.delete(
Expand Down

0 comments on commit cf417a6

Please sign in to comment.