From 933da8332d93434c2f33140cc99faf6609523a21 Mon Sep 17 00:00:00 2001 From: Matt Pryor Date: Tue, 23 Jul 2024 11:48:00 +0100 Subject: [PATCH 1/2] Rewrite operator to use easykube controllers --- Dockerfile | 10 +- capi_addons/models/v1alpha1/base.py | 8 +- capi_addons/operator.py | 678 ++++++++++++++-------------- requirements.txt | 17 +- setup.cfg | 5 +- 5 files changed, 348 insertions(+), 370 deletions(-) diff --git a/Dockerfile b/Dockerfile index 4f77b3d..d69712a 100644 --- a/Dockerfile +++ b/Dockerfile @@ -66,12 +66,4 @@ COPY --from=python-builder /venv /venv USER $APP_UID ENTRYPOINT ["tini", "-g", "--"] -CMD [ \ - "/venv/bin/kopf", \ - "run", \ - "--module", \ - "capi_addons.operator", \ - "--all-namespaces", \ - "--liveness", \ - "http://0.0.0.0:8000/healthz" \ -] +CMD ["/venv/bin/cluster-api-addon-provider"] diff --git a/capi_addons/models/v1alpha1/base.py b/capi_addons/models/v1alpha1/base.py index 485d316..cb672ff 100644 --- a/capi_addons/models/v1alpha1/base.py +++ b/capi_addons/models/v1alpha1/base.py @@ -378,15 +378,15 @@ async def init_metadata( else settings.secret_annotation_prefix ) cloud_identity_name = cloud_identity["metadata"]["name"] - cloud_identity_label = f"{cloud_identity_label_prefix}/{cloud_identity_name}" + cloud_identity_label = f"{cloud_identity_name}.{cloud_identity_label_prefix}/uses" self.metadata.labels[cloud_identity_label] = "" # Add labels that allow us to filter by the configmaps and secrets that we reference self.metadata.labels.update({ - f"{settings.configmap_annotation_prefix}/{name}": "" + f"{name}.{settings.configmap_annotation_prefix}/uses": "" for name in self.list_configmaps() }) self.metadata.labels.update({ - f"{settings.secret_annotation_prefix}/{name}": "" + f"{name}.{settings.secret_annotation_prefix}/uses": "" for name in self.list_secrets() }) labels_updated = self.metadata.labels != previous_labels @@ -707,7 +707,7 @@ async def get_chart( cluster: typing.Dict[str, typing.Any], infra_cluster: typing.Dict[str, typing.Any], cloud_identity: typing.Optional[typing.Dict[str, typing.Any]] - ) -> contextlib.AbstractAsyncContextManager[Chart]: + ) -> typing.AsyncIterator[Chart]: # Write the files for the ephemeral chart with tempfile.TemporaryDirectory() as chart_directory: # Create the directories for the CRDs and templates diff --git a/capi_addons/operator.py b/capi_addons/operator.py index b9afce2..fa082ba 100644 --- a/capi_addons/operator.py +++ b/capi_addons/operator.py @@ -4,21 +4,21 @@ import functools import hashlib import logging -import sys import tempfile +import typing as t -import kopf +from pydantic.json import pydantic_encoder -from easykube import Configuration, ApiError, resources as k8s, PRESENT +from easykube import PRESENT, Configuration, AsyncClient, ApiError, runtime -from kube_custom_resource import CustomResourceRegistry +from kube_custom_resource import runtime_utils from pyhelm3 import Client as HelmClient from pyhelm3 import errors as helm_errors from . import models, template, utils -from .models import v1alpha1 as api from .config import settings +from .models import v1alpha1 as api logger = logging.getLogger(__name__) @@ -28,101 +28,38 @@ template_loader = template.Loader() -# Initialise an easykube config object from the environment -from pydantic.json import pydantic_encoder -ek_config = Configuration.from_environment(json_encoder = pydantic_encoder) - - -def create_ek_client(): - """ - Returns an easykube client for the default config. 
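# This module-level client factory goes away: run(), at the end of this patch,
# builds a single long-lived client that every controller shares. A minimal
# sketch of that replacement lifecycle, mirroring the easykube calls used in
# run() below (the settings import path is the operator's own config module):

from easykube import Configuration
from pydantic.json import pydantic_encoder

from capi_addons.config import settings

config = Configuration.from_environment(json_encoder = pydantic_encoder)
client = config.async_client(default_field_manager = settings.easykube_field_manager)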
- """ - return ek_config.async_client(default_field_manager = settings.easykube_field_manager) - - -# Create a registry of custom resources and populate it from the models module -registry = CustomResourceRegistry(settings.api_group, settings.crd_categories) -registry.discover_models(models) - - -@kopf.on.startup() -async def apply_settings(**kwargs): - """ - Apply kopf settings and register CRDs. - """ - kopf_settings = kwargs["settings"] - kopf_settings.persistence.finalizer = f"{settings.annotation_prefix}/finalizer" - kopf_settings.persistence.progress_storage = kopf.AnnotationsProgressStorage( - prefix = settings.annotation_prefix - ) - kopf_settings.persistence.diffbase_storage = kopf.AnnotationsDiffBaseStorage( - prefix = settings.annotation_prefix, - key = "last-handled-configuration", - ) - kopf_settings.watching.client_timeout = settings.watch_timeout - async with create_ek_client() as ek_client: - # Apply the CRDs - for crd in registry: - try: - await ek_client.apply_object(crd.kubernetes_resource()) - except Exception: - logger.exception( - "error applying CRD %s.%s - exiting", - crd.plural_name, - crd.api_group - ) - sys.exit(1) - # Give Kubernetes a chance to create the APIs for the CRDs - await asyncio.sleep(0.5) - # Check to see if the APIs for the CRDs are up - # If they are not, the kopf watches will not start properly so we exit and get restarted - for crd in registry: - preferred_version = next(k for k, v in crd.versions.items() if v.storage) - api_version = f"{crd.api_group}/{preferred_version}" - try: - _ = await ek_client.get(f"/apis/{api_version}/{crd.plural_name}") - except Exception: - logger.exception( - "api for %s.%s not available - exiting", - crd.plural_name, - crd.api_group - ) - sys.exit(1) - - -def addon_handler(register_fn, **kwargs): +async def fetch_ref( + client: AsyncClient, + ref: t.Dict[str, t.Any], + default_namespace: str +) -> t.Dict[str, t.Any]: """ - Decorator that registers a handler with kopf for every addon that is defined. + Returns the object that is referred to by a ref. """ - def decorator(func): - @functools.wraps(func) - async def handler(**handler_kwargs): - # Get the model instance associated with the Kubernetes resource, making - # sure to account for nested addon handlers - if "addon" not in handler_kwargs: - handler_kwargs["addon"] = registry.get_model_instance(handler_kwargs["body"]) - return await func(**handler_kwargs) - for crd in registry: - preferred_version = next(k for k, v in crd.versions.items() if v.storage) - api_version = f"{crd.api_group}/{preferred_version}" - handler = register_fn(api_version, crd.kind, **kwargs)(handler) - return handler - return decorator + # By default, look for a secret unless otherwise specified + api_version = ref.get("apiVersion", "v1") + kind = ref.get("kind", "Secret") + name = ref["name"] + namespace = ref.get("namespace", default_namespace) + resource = await client.api(api_version).resource(kind) + return await resource.fetch(name, namespace = namespace) @contextlib.asynccontextmanager -async def clients_for_cluster(kubeconfig_secret): +async def clients_for_cluster( + kubeconfig_secret: t.Dict[str, t.Any] +) -> t.AsyncIterator[t.Tuple[AsyncClient, HelmClient]]: """ Context manager that yields a tuple of (easykube client, Helm client) configured to target the given Cluster API cluster. 
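# A usage sketch for clients_for_cluster, mirroring how the reconcile functions
# later in this patch call it (the function name and parameters here are
# illustrative): fetch the CAPI-generated "<cluster>-kubeconfig" secret, then
# deploy against the workload cluster through the yielded clients.

async def deploy_example(client, cluster, addon):
    secrets = await client.api("v1").resource("secrets")
    secret = await secrets.fetch(
        f"{cluster.metadata.name}-kubeconfig",
        namespace = cluster.metadata.namespace
    )
    async with clients_for_cluster(secret) as (client_target, helm_client):
        ...  # e.g. addon.install_or_upgrade(...) or addon.uninstall(...)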
""" # We pass a Helm client that is configured for the target cluster with tempfile.NamedTemporaryFile() as kubeconfig: - kubeconfig_data = base64.b64decode(kubeconfig_secret.data["value"]) + kubeconfig_data = base64.b64decode(kubeconfig_secret["data"]["value"]) kubeconfig.write(kubeconfig_data) kubeconfig.flush() # Get an easykube client for the target cluster - ek_client_target = ( + ek_client = ( Configuration .from_kubeconfig_data(kubeconfig_data, json_encoder = pydantic_encoder) .async_client(default_field_manager = settings.easykube_field_manager) @@ -137,323 +74,380 @@ async def clients_for_cluster(kubeconfig_secret): unpack_directory = settings.helm_client.unpack_directory ) # Yield the clients as a tuple - async with ek_client_target: - yield (ek_client_target, helm_client) - - -async def fetch_ref(ek_client, ref, default_namespace): - """ - Returns the object that is referred to by a ref. - """ - # By default, look for a secret unless otherwise specified - api_version = ref.get("apiVersion", "v1") - kind = ref.get("kind", "Secret") - name = ref["name"] - namespace = ref.get("namespace", default_namespace) - resource = await ek_client.api(api_version).resource(kind) - return await resource.fetch(name, namespace = namespace) + async with ek_client: + yield (ek_client, helm_client) -async def until_deleted(addon): +async def until_addon_deleted(client: AsyncClient, addon: api.Addon): """ Runs until the given addon is deleted. Used as a circuit-breaker to stop an in-progress install/upgrade when an addon is deleted. """ - async with create_ek_client() as ek_client: - ekapi = ek_client.api(addon.api_version) - resource = await ekapi.resource(addon._meta.plural_name) - try: - while True: - await asyncio.sleep(5) - addon_obj = await resource.fetch( - addon.metadata.name, - namespace = addon.metadata.namespace - ) - if addon_obj.metadata.get("deletionTimestamp"): - return - except asyncio.CancelledError: - return - - -@addon_handler(kopf.on.create) -# Run when the annotations are updated as well as the spec -# This means that when we change an annotation in response to a configmap or secret being -# changed, the upgrade logic will be executed for the new configmap or secret content -@addon_handler(kopf.on.update, field = "metadata.annotations") -@addon_handler(kopf.on.update, field = "spec") -# Also run on resume - if nothing has changed, no Helm release will be made -@addon_handler(kopf.on.resume) -async def handle_addon_updated(addon, **kwargs): - """ - Executes whenever an addon is created or the annotations or spec of an addon are updated. + ekapi = client.api(addon.api_version) + resource = await ekapi.resource(addon._meta.plural_name) + try: + while True: + await asyncio.sleep(5) + addon_obj = await resource.fetch( + addon.metadata.name, + namespace = addon.metadata.namespace + ) + if addon_obj.metadata.get("deletionTimestamp"): + return + except asyncio.CancelledError: + return - Every type of addon eventually ends up as a Helm release on the target cluster as we - want release semantics even if the manifests are not rendered by Helm - in particular - we want to identify and remove resources that are no longer part of the addon. - For non-Helm addons, this is done by turning the manifests into an "ephemeral chart" - which is rendered and then disposed. 
+async def reconcile_addon_normal( + client: AsyncClient, + addon: api.Addon, + logger: logging.Logger +) -> t.Tuple[api.Addon, runtime.Result]: """ - async with create_ek_client() as ek_client: - try: - # Make sure the status has been initialised at the earliest possible opportunity - await addon.init_status(ek_client) - # Check that the cluster associated with the addon exists - ekapi = await ek_client.api_preferred_version("cluster.x-k8s.io") - resource = await ekapi.resource("clusters") - cluster = await resource.fetch( - addon.spec.cluster_name, + Reconciles the given instance. + """ + try: + # Make sure the status has been initialised at the earliest possible opportunity + await addon.init_status(client) + # Check that the cluster associated with the addon exists + ekapi = await client.api_preferred_version("cluster.x-k8s.io") + resource = await ekapi.resource("clusters") + cluster = await resource.fetch( + addon.spec.cluster_name, + namespace = addon.metadata.namespace + ) + # Get the infra cluster for the CAPI cluster + infra_cluster = await fetch_ref( + client, + cluster.spec["infrastructureRef"], + cluster.metadata.namespace + ) + # Get the cloud identity for the infra cluster, if it exists + # It is made available to templates in case they need to configure access to the cloud + id_ref = infra_cluster.spec.get("identityRef") + if id_ref: + cloud_identity = await fetch_ref(client, id_ref, cluster.metadata.namespace) + else: + cloud_identity = None + # Make sure that the cluster owns the addon and the addon has the required labels + await addon.init_metadata(client, cluster, cloud_identity) + # Make sure any referenced configmaps or secrets will be watched + configmaps = await client.api("v1").resource("configmaps") + for configmap in addon.list_configmaps(): + await configmaps.patch( + configmap, + {"metadata": {"labels": {settings.watch_label: ""}}}, namespace = addon.metadata.namespace ) - # Get the infra cluster for the CAPI cluster - infra_cluster = await fetch_ref( - ek_client, - cluster.spec["infrastructureRef"], - cluster.metadata.namespace + secrets = await client.api("v1").resource("secrets") + for secret in addon.list_secrets(): + await secrets.patch( + secret, + {"metadata": {"labels": {settings.watch_label: ""}}}, + namespace = addon.metadata.namespace ) - # Get the cloud identity for the infra cluster, if it exists - # It is made available to templates in case they need to configure access to the cloud - id_ref = infra_cluster.spec.get("identityRef") - if id_ref: - cloud_identity = await fetch_ref(ek_client, id_ref, cluster.metadata.namespace) - else: - cloud_identity = None - # Make sure that the cluster owns the addon and the addon has the required labels - await addon.init_metadata(ek_client, cluster, cloud_identity) - # Make sure any referenced configmaps or secrets will be watched - configmaps = await ek_client.api("v1").resource("configmaps") - for configmap in addon.list_configmaps(): - await configmaps.patch( - configmap, - {"metadata": {"labels": {settings.watch_label: ""}}}, - namespace = addon.metadata.namespace - ) - secrets = await ek_client.api("v1").resource("secrets") - for secret in addon.list_secrets(): - await secrets.patch( - secret, - {"metadata": {"labels": {settings.watch_label: ""}}}, - namespace = addon.metadata.namespace - ) - # If the cloud identity is a secret or configmap in the same namespace, watch it - if ( - cloud_identity and - cloud_identity["apiVersion"] == "v1" and - cloud_identity["kind"] in {"ConfigMap", "Secret"} and - 
cloud_identity["metadata"]["namespace"] == cluster.metadata.namespace and - settings.watch_label not in cloud_identity.metadata.get("labels", {}) - ): - cloud_identity = await ek_client.patch_object( - cloud_identity, - {"metadata": {"labels": {settings.watch_label: ""}}} - ) - # Ensure that the cluster is in a suitable state to proceed - # For bootstrap addons, just wait for the control plane to be initialised - # For non-bootstrap addons, wait for the cluster to be ready - ready = utils.check_condition( - cluster, - "ControlPlaneInitialized" if addon.spec.bootstrap else "Ready" + # If the cloud identity is a secret or configmap in the same namespace, watch it + if ( + cloud_identity and + cloud_identity["apiVersion"] == "v1" and + cloud_identity["kind"] in {"ConfigMap", "Secret"} and + cloud_identity["metadata"]["namespace"] == cluster.metadata.namespace and + settings.watch_label not in cloud_identity.metadata.get("labels", {}) + ): + cloud_identity = await client.patch_object( + cloud_identity, + {"metadata": {"labels": {settings.watch_label: ""}}} ) - if not ready: - raise kopf.TemporaryError( - f"cluster '{addon.spec.cluster_name}' is not ready", - delay = settings.temporary_error_delay + # Ensure that the cluster is in a suitable state to proceed + # For bootstrap addons, just wait for the control plane to be initialised + # For non-bootstrap addons, wait for the cluster to be ready + ready = utils.check_condition( + cluster, + "ControlPlaneInitialized" if addon.spec.bootstrap else "Ready" + ) + if not ready: + logger.warn("cluster '%s' is not ready", addon.spec.cluster_name) + return addon, runtime.Result(True, settings.temporary_error_delay) + # The kubeconfig for the cluster is in a secret + secret = await secrets.fetch( + f"{cluster.metadata.name}-kubeconfig", + namespace = cluster.metadata.namespace + ) + # Get easykube and Helm clients for the target cluster and use them to deploy the addon + async with clients_for_cluster(secret) as (client_target, helm_client): + # If the addon is deleted while an install or upgrade is in progress, cancel it + install_upgrade_task = asyncio.create_task( + addon.install_or_upgrade( + template_loader, + client, + client_target, + helm_client, + cluster, + infra_cluster, + cloud_identity ) - # The kubeconfig for the cluster is in a secret - secret = await k8s.Secret(ek_client).fetch( - f"{cluster.metadata.name}-kubeconfig", - namespace = cluster.metadata.namespace ) - # Get easykube and Helm clients for the target cluster and use them to deploy the addon - async with clients_for_cluster(secret) as (ek_client_target, helm_client): - # If the addon is deleted while an install or upgrade is in progress, cancel it - install_upgrade_task = asyncio.create_task( - addon.install_or_upgrade( - template_loader, - ek_client, - ek_client_target, - helm_client, - cluster, - infra_cluster, - cloud_identity - ) - ) - wait_for_delete_task = asyncio.create_task(until_deleted(addon)) - done, pending = await asyncio.wait( - { install_upgrade_task, wait_for_delete_task }, - return_when = asyncio.FIRST_COMPLETED - ) - # Cancel the pending tasks and give them a chance to clean up - for task in pending: - task.cancel() - try: - _ = await task - except asyncio.CancelledError: - continue - # Ensure that any exceptions from the completed tasks are raised - for task in done: + wait_for_delete_task = asyncio.create_task(until_addon_deleted(client, addon)) + done, pending = await asyncio.wait( + { install_upgrade_task, wait_for_delete_task }, + return_when = 
asyncio.FIRST_COMPLETED + ) + # Cancel the pending tasks and give them a chance to clean up + for task in pending: + task.cancel() + try: _ = await task - # Handle expected errors by converting them to kopf errors - # This suppresses the stack trace in logs/events - # Let unexpected errors bubble without suppressing the stack trace so they can be debugged - except ApiError as exc: - # 404 and 409 are recoverable - if exc.status_code in {404, 409}: - raise kopf.TemporaryError(str(exc), delay = settings.temporary_error_delay) - # All other 4xx errors are probably permanent, as they are client errors - # They can likely only be resolved by changing the spec - elif 400 <= exc.status_code < 500: - addon.set_phase(api.AddonPhase.FAILED, str(exc)) - await addon.save_status(ek_client) - raise kopf.PermanentError(str(exc)) - else: - raise - except ( - helm_errors.ConnectionError, - helm_errors.CommandCancelledError - ) as exc: - # Assume connection errors are temporary - raise kopf.TemporaryError(str(exc), delay = settings.temporary_error_delay) - except ( - helm_errors.ChartNotFoundError, - helm_errors.FailedToRenderChartError, - helm_errors.ResourceAlreadyExistsError, - helm_errors.InvalidResourceError - ) as exc: - # These are permanent errors that can only be resolved by changing the spec + except asyncio.CancelledError: + continue + # Ensure that any exceptions from the completed tasks are raised + for task in done: + _ = await task + # Handle expected errors by converting them to results + # This suppresses the stack trace in logs/events + # Let unexpected errors bubble without suppressing the stack trace so they can be debugged + except ApiError as exc: + # 404 and 409 are recoverable + if exc.status_code in {404, 409}: + logger.warn(str(exc)) + return addon, runtime.Result(True, settings.temporary_error_delay) + # All other 4xx errors are probably permanent, as they are client errors + # They can likely only be resolved by changing the spec + elif 400 <= exc.status_code < 500: + logger.error(str(exc)) addon.set_phase(api.AddonPhase.FAILED, str(exc)) - await addon.save_status(ek_client) - raise kopf.PermanentError(str(exc)) + await addon.save_status(client) + return addon, runtime.Result() + else: + raise + except ( + helm_errors.ConnectionError, + helm_errors.CommandCancelledError + ) as exc: + logger.warn(str(exc)) + return addon, runtime.Result(True, settings.temporary_error_delay) + except ( + helm_errors.ChartNotFoundError, + helm_errors.FailedToRenderChartError, + helm_errors.ResourceAlreadyExistsError, + helm_errors.InvalidResourceError + ) as exc: + # These are permanent errors that can only be resolved by changing the spec + logger.error(str(exc)) + addon.set_phase(api.AddonPhase.FAILED, str(exc)) + await addon.save_status(client) + return addon, runtime.Result() + else: + return addon, runtime.Result() -@addon_handler(kopf.on.delete) -async def handle_addon_deleted(addon, **kwargs): +async def reconcile_addon_delete( + client: AsyncClient, + addon: api.Addon, + logger: logging.Logger +) -> t.Tuple[api.Addon, runtime.Result]: """ - Executes whenever an addon is deleted. + Reconciles the deletion of the given instance. 
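# A note on the error handling in reconcile_addon_normal above, which preserves
# the old kopf semantics as return values:
#   * ApiError 404/409 and Helm connection/cancellation errors are treated as
#     transient - requeue after settings.temporary_error_delay;
#   * other 4xx ApiErrors and chart/render/resource errors are treated as
#     permanent - the addon is marked FAILED and not requeued, since only a
#     change to the spec can fix them;
#   * anything unexpected is re-raised so the stack trace stays visible.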
""" - async with create_ek_client() as ek_client: - try: - ekapi = await ek_client.api_preferred_version("cluster.x-k8s.io") - resource = await ekapi.resource("clusters") - cluster = await resource.fetch( - addon.spec.cluster_name, - namespace = addon.metadata.namespace - ) - except ApiError as exc: - # If the cluster does not exist, we are done - if exc.status_code == 404: - return - else: - raise + try: + ekapi = await client.api_preferred_version("cluster.x-k8s.io") + resource = await ekapi.resource("clusters") + cluster = await resource.fetch( + addon.spec.cluster_name, + namespace = addon.metadata.namespace + ) + except ApiError as exc: + # If the cluster does not exist, we are done + if exc.status_code == 404: + return addon, runtime.Result() else: - # If the cluster is deleting, there is nothing for us to do - if cluster.status.phase == "Deleting": - return + raise + else: + # If the cluster is deleting, there is nothing for us to do + if cluster.status.phase == "Deleting": + return addon, runtime.Result() + secrets = await client.api("v1").resource("secrets") + try: + secret = await secrets.fetch( + f"{cluster.metadata.name}-kubeconfig", + namespace = cluster.metadata.namespace + ) + except ApiError as exc: + # If the kubeconfig does not exist, we are done + if exc.status_code == 404: + return addon, runtime.Result() + else: + raise + async with clients_for_cluster(secret) as (client_target, helm_client): try: - secret = await k8s.Secret(ek_client).fetch( - f"{cluster.metadata.name}-kubeconfig", - namespace = cluster.metadata.namespace - ) + await addon.uninstall(client, client_target, helm_client) except ApiError as exc: - # If the cluster does not exist, we are done - if exc.status_code == 404: - return + if exc.status_code == 409: + logger.warn(str(exc)) + return addon, runtime.Result(True, settings.temporary_error_delay) else: raise - clients = clients_for_cluster(secret) - async with clients as (ek_client_target, helm_client): + except helm_errors.ReleaseNotFoundError: + # If the release doesn't exist, we are done + return addon, runtime.Result() + except helm_errors.ConnectionError as exc: + # Assume connection errors are temporary + logger.warn(str(exc)) + return addon, runtime.Result(True, settings.temporary_error_delay) + else: + return addon, runtime.Result() + + +async def reconcile_addon( + model: t.Type[api.Addon], + client: AsyncClient, + request: runtime.Request +) -> t.Optional[runtime.Result]: + """ + Handles a request to reconcile an addon. 
+ """ + model_logger = runtime_utils.AnnotatedLogger(logger, settings.api_group, model) + mapper = runtime_utils.Mapper(client, settings.api_group, model) + addon = await mapper.fetch(request) + if not addon: + model_logger.info("Could not find instance", extra = {"instance": request.key}) + return runtime.Result() + addon_logger = runtime_utils.AnnotatedLogger(logger, settings.api_group, addon) + if not addon.metadata.deletion_timestamp: + addon = await mapper.ensure_finalizer(addon, settings.api_group) + _, result = await reconcile_addon_normal(client, addon, addon_logger) + return result + else: + addon, result = await reconcile_addon_delete(client, addon, addon_logger) + # If a delete is reconciled without a requeue, assume we can remove the finalizer + if not result.requeue: try: - await addon.uninstall(ek_client, ek_client_target, helm_client) + await mapper.remove_finalizer(addon, settings.api_group) except ApiError as exc: if exc.status_code == 409: - raise kopf.TemporaryError(str(exc), delay = settings.temporary_error_delay) + return runtime.Result(True, settings.temporary_error_delay) else: raise - except helm_errors.ReleaseNotFoundError: - # If the release doesn't exist, we are done - return - except helm_errors.ConnectionError as exc: - # Assume connection errors are temporary - raise kopf.TemporaryError(str(exc), delay = settings.temporary_error_delay) + return result -def compute_checksum(data): +async def reconcile_config( + kind: t.Literal["ConfigMap", "Secret"], + client: AsyncClient, + request: runtime.Request +) -> t.Optional[runtime.Result]: """ - Compute the checksum of the given data from a configmap or secret. - """ - data_str = ";".join(sorted(f"{k}={v}" for k, v in data.items())).encode() - return hashlib.sha256(data_str).hexdigest() - - -async def handle_config_event(ek_client, type, name, namespace, body, annotation_prefix): - """ - Handles an event for a watched configmap or secret by updating annotations - using the specified prefix. + Handles a request to reconcile a configmap or secret by annotating addons that use the + config with the current checksum of the data. 
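# The checksum annotation is the bridge between config changes and addon
# upgrades: patching the annotation below modifies the addon object itself,
# which flows back through the addon controllers above and re-runs the
# install/upgrade with the new configmap/secret content. The checksum (same
# computation as in the function body) is order-independent over the data keys:

import hashlib

def config_checksum(data: dict) -> str:
    data_str = ";".join(sorted(f"{k}={v}" for k, v in data.items())).encode()
    return hashlib.sha256(data_str).hexdigest()

assert config_checksum({"b": "2", "a": "1"}) == config_checksum({"a": "1", "b": "2"})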
""" + resource = await client.api("v1").resource(kind) + try: + config = await resource.fetch(request.name, namespace = request.namespace) + except ApiError as exc: + if exc.status_code == 404: + config = None + else: + raise # Compute the annotation value - if type == "DELETED": + if not config or config.metadata.get("deletionTimestamp"): # If the config was deleted, indicate that in the annotation annotation_value = "deleted" else: # Otherwise, compute the checksum of the config data - data = body.get("data", {}) + data = config.get("data", {}) data_str = ";".join(sorted(f"{k}={v}" for k, v in data.items())).encode() annotation_value = hashlib.sha256(data_str).hexdigest() - # Update any addons that depend on the config - for crd in registry: - preferred_version = next(k for k, v in crd.versions.items() if v.storage) - ekapi = ek_client.api(f"{crd.api_group}/{preferred_version}") - resource = await ekapi.resource(crd.plural_name) - async for addon_obj in resource.list( - labels = {f"{annotation_prefix}/{name}": PRESENT}, - namespace = namespace - ): - addon = registry.get_model_instance(addon_obj) - annotation = f"{annotation_prefix}/{name}" + # Calculate the search labels and checksum annotation names + prefix = ( + settings.configmap_annotation_prefix + if kind == "ConfigMap" + else settings.secret_annotation_prefix + ) + search_labels = {f"{request.name}.{prefix}/uses": PRESENT} + checksum_annotation = f"{request.name}.{prefix}/checksum" + # Annotate any addons that depend on the config + for model in [api.Manifests, api.HelmRelease]: + ekapi = client.api(f"{settings.api_group}/{model._meta.version}") + resource = await ekapi.resource(model._meta.plural_name) + addons = resource.list(labels = search_labels, namespace = request.namespace) + async for addon in addons: # If the annotation is already set to the correct value, we are done - if addon.metadata.annotations.get(annotation) == annotation_value: + if addon.metadata.get("annotations", {}).get(checksum_annotation) == annotation_value: continue # Otherwise, try to patch the annotation for the addon try: _ = await resource.patch( addon.metadata.name, - {"metadata": {"annotations": {annotation: annotation_value}}}, + {"metadata": {"annotations": {checksum_annotation: annotation_value}}}, namespace = addon.metadata.namespace ) except ApiError as exc: # If the addon got deleted, that is fine if exc.status_code != 404: raise + return runtime.Result() -@kopf.on.event("configmap", labels = { settings.watch_label: kopf.PRESENT }) -async def handle_configmap_event(type, name, namespace, body, **kwargs): +async def run(): """ - Executes every time a watched configmap is changed. + Runs the operator logic. 
""" - async with create_ek_client() as ek_client: - await handle_config_event( - ek_client, - type, - name, - namespace, - body, - settings.configmap_annotation_prefix + logger.info("Creating Kubernetes client") + config = Configuration.from_environment(json_encoder = pydantic_encoder) + client = config.async_client(default_field_manager = settings.easykube_field_manager) + + async with client: + logger.info("Registering CRDs") + await runtime_utils.register_crds( + client, + settings.api_group, + models, + categories = settings.crd_categories ) + # Create a manager + manager = runtime.Manager() -@kopf.on.event("secret", labels = { settings.watch_label: kopf.PRESENT }) -async def handle_secret_event(type, name, namespace, body, **kwargs): + # Register the controllers + logger.info("Creating Manifests controller") + runtime_utils.create_controller_for_model( + manager, + settings.api_group, + api.Manifests, + functools.partial(reconcile_addon, api.Manifests) + ) + logger.info("Creating HelmRelease controller") + runtime_utils.create_controller_for_model( + manager, + settings.api_group, + api.HelmRelease, + functools.partial(reconcile_addon, api.HelmRelease) + ) + logger.info("Creating ConfigMap controller") + manager.create_controller( + "v1", + "ConfigMap", + functools.partial(reconcile_config, "ConfigMap"), + labels = {settings.watch_label: PRESENT} + ) + logger.info("Creating Secret controller") + manager.create_controller( + "v1", + "Secret", + functools.partial(reconcile_config, "Secret"), + labels = {settings.watch_label: PRESENT} + ) + + # Run the manager + logger.info("Starting manager") + await manager.run(client) + + +def main(): """ - Executes every time a watched secret is changed. + Launches the operator using asyncio. """ - async with create_ek_client() as ek_client: - await handle_config_event( - ek_client, - type, - name, - namespace, - body, - settings.secret_annotation_prefix - ) + # Configure the logging + settings.logging.apply() + # Run the operator using the default loop + asyncio.run(run()) diff --git a/requirements.txt b/requirements.txt index 5a37341..dc34b72 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,31 +1,20 @@ -aiohttp==3.9.5 -aiosignal==1.3.1 annotated-types==0.7.0 anyio==4.4.0 -async-timeout==4.0.3 -attrs==23.2.0 certifi==2024.7.4 -charset-normalizer==3.3.2 click==8.1.7 configomatic==0.3.1 -easykube==0.3.3 +-e git+https://github.com/stackhpc/easykube.git@feat/k8s-controller#egg=easykube exceptiongroup==1.2.2 -frozenlist==1.4.1 h11==0.14.0 httpcore==1.0.5 httpx==0.27.0 idna==3.7 -iso8601==2.1.0 Jinja2==3.1.4 -kopf==1.37.2 -kube-custom-resource==0.4.0 +-e git+https://github.com/stackhpc/kube-custom-resource.git@feat/runtime-utils#egg=kube_custom_resource MarkupSafe==2.1.5 -multidict==6.0.5 pydantic==2.8.2 pydantic_core==2.20.1 -pyhelm3==0.3.3 -python-json-logger==2.0.7 +pyhelm3==0.3.4 PyYAML==6.0.1 sniffio==1.3.1 typing_extensions==4.12.2 -yarl==1.9.4 diff --git a/setup.cfg b/setup.cfg index db71eeb..280db8b 100755 --- a/setup.cfg +++ b/setup.cfg @@ -16,7 +16,10 @@ install_requires = easykube jinja2 kube-custom-resource - kopf pydantic pyhelm3 pyyaml + +[options.entry_points] +console_scripts = + cluster-api-addon-provider = capi_addons.operator:main From 6fca8de8ddcdc158a8ac0044aeeebe0ecaf7d926 Mon Sep 17 00:00:00 2001 From: Matt Pryor Date: Tue, 23 Jul 2024 11:53:56 +0100 Subject: [PATCH 2/2] Include git in builder image for now --- Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index 
d69712a..620809d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -20,7 +20,7 @@ RUN set -ex; \ FROM ubuntu:jammy AS python-builder RUN apt-get update && \ - apt-get install -y python3 python3-venv && \ + apt-get install -y git python3 python3-venv && \ rm -rf /var/lib/apt/lists/* RUN python3 -m venv /venv && \