From cad8a2ab2863cd33ac3b31eb6e1be4926c9bfb37 Mon Sep 17 00:00:00 2001 From: elementl Date: Fri, 17 Jan 2025 19:58:47 +0000 Subject: [PATCH] Release 1.9.9 (tracks 1786e0c997605854e1b85405d4c1572e83891678) --- .../deployment/alert_policies/commands.py | 3 - .../alert_policies/config_schema.py | 490 ------------------ .../dagster_cloud_cli/version.py | 2 +- dagster-cloud-cli/pyproject.toml | 2 +- .../dagster_cloud_examples/version.py | 2 +- dagster-cloud-examples/pyproject.toml | 2 +- dagster-cloud/dagster_cloud/version.py | 2 +- .../dagster_cloud/workspace/ecs/launcher.py | 30 +- .../workspace/ecs/run_launcher.py | 1 + .../dagster_cloud/workspace/ecs/utils.py | 10 +- .../user_code_launcher/user_code_launcher.py | 2 +- dagster-cloud/pyproject.toml | 4 +- 12 files changed, 46 insertions(+), 504 deletions(-) delete mode 100644 dagster-cloud-cli/dagster_cloud_cli/commands/deployment/alert_policies/config_schema.py diff --git a/dagster-cloud-cli/dagster_cloud_cli/commands/deployment/alert_policies/commands.py b/dagster-cloud-cli/dagster_cloud_cli/commands/deployment/alert_policies/commands.py index d4058c4..c5a5a08 100644 --- a/dagster-cloud-cli/dagster_cloud_cli/commands/deployment/alert_policies/commands.py +++ b/dagster-cloud-cli/dagster_cloud_cli/commands/deployment/alert_policies/commands.py @@ -6,7 +6,6 @@ from .... import gql, ui from ....config_utils import dagster_cloud_options -from .config_schema import INSIGHTS_ALERT_POLICIES_SCHEMA, process_alert_policies_config DEFAULT_ALERT_POLICIES_YAML_FILENAME = "alert_policies.yaml" @@ -47,8 +46,6 @@ def sync_command( config = yaml.load(f.read(), Loader=yaml.SafeLoader) try: - process_alert_policies_config(config, INSIGHTS_ALERT_POLICIES_SCHEMA) - alert_policies = gql.reconcile_alert_policies(client, config) ui.print(f"Synced alert policies: {', '.join(alert_policies)}") diff --git a/dagster-cloud-cli/dagster_cloud_cli/commands/deployment/alert_policies/config_schema.py b/dagster-cloud-cli/dagster_cloud_cli/commands/deployment/alert_policies/config_schema.py deleted file mode 100644 index b578a24..0000000 --- a/dagster-cloud-cli/dagster_cloud_cli/commands/deployment/alert_policies/config_schema.py +++ /dev/null @@ -1,490 +0,0 @@ -from typing import Any - -import dagster._check as check -from dagster import Array, Enum, EnumValue, Field, Selector, Shape -from dagster._config import validate_config -from dagster._core.definitions.repository_definition.valid_definitions import ( - SINGLETON_REPOSITORY_NAME, -) - -from dagster_cloud_cli.core.alert_types import InsightsAlertComparisonOperator - - -def validate_alert_policy_config(alert_policy_config, schema: Any): - validation = validate_config(schema, alert_policy_config) - return [error.message for error in validation.errors] if validation.errors else [] - - -def validate_alert_policies_config(alert_policies_config, schema: Any): - validation = validate_config(schema, alert_policies_config) - return [error.message for error in validation.errors] if validation.errors else [] - - -def process_alert_policies_config(alert_policies_config, schema: Any): - validation = validate_config(ALERT_POLICIES_SCHEMA, alert_policies_config) - - check.invariant( - validation.success, - ", ".join([error.message for error in validation.errors] if validation.errors else []), - ) - - # Validate each individual alert policy - for alert_policy_config in alert_policies_config["alert_policies"]: - check.invariant( - "tags" not in alert_policy_config or len(alert_policy_config["tags"]) > 0, - "When setting tags for an alert policy, the configuration " - "must contain at least one tag.", - ) - - email_notification_service = alert_policy_config["notification_service"].get("email") - - # Validate email alerts - if email_notification_service: - check.invariant( - len(email_notification_service["email_addresses"]) > 0, - "When creating an alert policy to send email alerts, " - "the configuration must contain at least one email address.", - ) - - -TAGS_FIELD = Field( - config=Array( - Shape( - fields={ - "key": Field( - config=str, - is_required=True, - description="Specify a tag key.", - ), - "value": Field( - config=str, - is_required=True, - description="Specify a tag value.", - ), - }, - description="A tag key-value pair.", - ) - ), - description=( - "The alert policy will apply to code artifacts that have all the specified tags." - " When tags are explicitly omitted, this alert policy will apply to all code" - " artifacts." - ), - is_required=False, -) - -TARGET_TYPES_SCHEMA = { - "asset_group_target": Field( - config=Shape( - fields={ - "asset_group": Field( - config=str, - is_required=True, - description="The name of the asset group.", - ), - "location_name": Field( - config=str, - is_required=True, - description=("The name of the code location that contains the asset" " group."), - ), - "repo_name": Field( - config=str, - is_required=False, - description=( - "The name of the repository that contains the asset" - " group. Only required if there are multiple" - " repositories with the same code location." - ), - default_value=SINGLETON_REPOSITORY_NAME, - ), - } - ) - ), - "asset_selection_target": Field( - config=Shape( - fields={ - "asset_selection": Field( - config=str, - is_required=True, - description="The asset selection to target.", - ), - "location_name": Field( - config=str, - is_required=False, - description=("The name of the code location to target. Optional."), - ), - "repo_name": Field( - config=str, - is_required=False, - description=("The name of the repository to target. Optional."), - ), - } - ) - ), - "asset_key_target": Field( - config=Shape( - fields={ - "asset_key": Field( - config=Array(str), - is_required=True, - description="The key of the asset.", - ) - } - ) - ), - "long_running_job_threshold_target": Field( - config=Shape( - fields={ - "threshold_seconds": Field( - config=float, - is_required=True, - description="The threshold value to alert if exceeded.", - ), - "tags": TAGS_FIELD, - } - ) - ), - "run_result_target": Field( - config=Shape( - fields={ - "tags": TAGS_FIELD, - } - ) - ), -} - - -insights_operator_enum = Enum.from_python_enum(InsightsAlertComparisonOperator) - - -INSIGHTS_TARGET_TYPES_SCHEMA = { - **TARGET_TYPES_SCHEMA, - "insights_deployment_threshold_target": Field( - config=Shape( - fields={ - "metric_name": Field( - config=str, - is_required=True, - description="The name of the metric to target.", - ), - "threshold": Field( - config=float, - is_required=True, - description="The threshold value to alert if exceeded.", - ), - "selection_period_days": Field( - config=int, - is_required=True, - description="The number of days to use for the selection period.", - ), - "operator": Field( - config=insights_operator_enum, - is_required=True, - description="The operator to use for the threshold comparison.", - ), - } - ) - ), - "insights_asset_group_threshold_target": Field( - config=Shape( - fields={ - "metric_name": Field( - config=str, - is_required=True, - description="The name of the metric to target.", - ), - "threshold": Field( - config=float, - is_required=True, - description="The threshold value to alert if exceeded.", - ), - "selection_period_days": Field( - config=int, - is_required=True, - description="The number of days to use for the selection period.", - ), - "operator": Field( - config=insights_operator_enum, - is_required=True, - description="The operator to use for the threshold comparison.", - ), - "asset_group": Shape( - fields={ - "location_name": Field( - config=str, - is_required=True, - description="The name of the code location that contains the asset group.", - ), - "asset_group_name": Field( - config=str, - is_required=True, - description="The name of the asset group.", - ), - "repo_name": Field( - config=str, - is_required=False, - description=( - "The name of the repository that contains the asset group." - ), - default_value=SINGLETON_REPOSITORY_NAME, - ), - } - ), - } - ) - ), - "insights_asset_threshold_target": Field( - config=Shape( - fields={ - "metric_name": Field( - config=str, - is_required=True, - description="The name of the metric to target.", - ), - "threshold": Field( - config=float, - is_required=True, - description="The threshold value to alert if exceeded.", - ), - "selection_period_days": Field( - config=int, - is_required=True, - description="The number of days to use for the selection period.", - ), - "operator": Field( - config=insights_operator_enum, - is_required=True, - description="The operator to use for the threshold comparison.", - ), - "asset_key": Field( - config=Array(str), - is_required=True, - description="The key of the asset.", - ), - } - ) - ), - "insights_job_threshold_target": Field( - config=Shape( - fields={ - "metric_name": Field( - config=str, - is_required=True, - description="The name of the metric to target.", - ), - "threshold": Field( - config=float, - is_required=True, - description="The threshold value to alert if exceeded.", - ), - "selection_period_days": Field( - config=int, - is_required=True, - description="The number of days to use for the selection period.", - ), - "operator": Field( - config=insights_operator_enum, - is_required=True, - description="The operator to use for the threshold comparison.", - ), - "job": Shape( - fields={ - "job_name": Field( - config=str, - is_required=True, - description="The name of the job.", - ), - "location_name": Field( - config=str, - is_required=True, - description="The name of the code location that contains the job.", - ), - "repo_name": Field( - config=str, - is_required=False, - description=("The name of the repository that contains the job."), - default_value=SINGLETON_REPOSITORY_NAME, - ), - } - ), - } - ) - ), - "credit_limit_target": Field(config={}), -} - -ALERT_EVENT_TYPES = [ - EnumValue("JOB_FAILURE", description="Alert on job failure."), - EnumValue("JOB_SUCCESS", description="Alert on job success."), - EnumValue("JOB_LONG_RUNNING", description="Alert on job running past a specified time limit."), - EnumValue("TICK_FAILURE", description="Alert on schedule/sensor failure."), - EnumValue("AGENT_UNAVAILABLE", description="Alert on agent downtime."), - EnumValue("CODE_LOCATION_ERROR", description="Alert on code location error."), - EnumValue( - "ASSET_MATERIALIZATION_SUCCESS", - description="Alert when an asset successfully materializes.", - ), - EnumValue( - "ASSET_MATERIALIZATION_FAILURE", - description=("Alert when a planned asset materialization fails to occur."), - ), - EnumValue("ASSET_CHECK_PASSED", description="Alert on asset check success."), - EnumValue( - "ASSET_CHECK_EXECUTION_FAILURE", - description=("Alert when a planned asset check fails before it evaluates."), - ), - EnumValue( - "ASSET_CHECK_SEVERITY_WARN", - description=("Alert when a planned asset check fails with severity warn."), - ), - EnumValue( - "ASSET_CHECK_SEVERITY_ERROR", - description=("Alert when a planned asset check fails with severity error."), - ), - EnumValue( - "ASSET_OVERDUE", - description="Alert when an asset is overdue, based on its freshness policy.", - ), -] - -INSIGHTS_ALERT_EVENT_TYPES = [ - *ALERT_EVENT_TYPES, - EnumValue( - "INSIGHTS_CONSUMPTION_EXCEEDED", - description="Alert when insights consumption exceeds the threshold.", - ), -] - -ALERT_POLICY_SCHEMA, INSIGHTS_ALERT_POLICY_SCHEMA = [ - Shape( - fields={ - "name": Field( - config=str, - is_required=True, - description="Alert policy name.", - ), - "description": Field( - config=str, - default_value="", - description="Description of alert policy", - ), - "tags": TAGS_FIELD, - "event_types": Field( - config=Array( - Enum( - name="AlertPolicyEventType", - enum_values=event_types, - ) - ), - description="The selected system event types that will trigger the alert policy.", - ), - "notification_service": Field( - Selector( - fields={ - "email": Field( - config=Shape( - fields={ - "email_addresses": Field( - config=Array(str), - is_required=True, - description="Email addresses to send alerts to.", - ) - } - ), - description=( - "Details to customize email notifications for this alert policy." - ), - ), - "slack": Field( - config=Shape( - fields={ - "slack_workspace_name": Field( - config=str, - is_required=True, - description="The name of your slack workspace.", - ), - "slack_channel_name": Field( - config=str, - is_required=True, - description=( - "The name of the slack channel in which to post alerts." - ), - ), - } - ) - ), - "email_owners": Field( - config=Shape( - fields={ - "default_email_addresses": Field( - config=Array(str), - is_required=False, - default_value=[], - description="Default email addresses to send notifications for assets without defined owners.", - ) - } - ) - ), - "microsoft_teams": Field( - config=Shape( - fields={ - "webhook_url": Field( - config=str, - is_required=True, - description="The incoming webhook URL for your Microsoft Team connector. " - "Must match the form https://xxxxx.webhook.office.com/xxxxx", - ) - } - ) - ), - "pagerduty": Field( - config=Shape( - fields={ - "integration_key": Field( - config=str, - is_required=True, - description="The integration key for your PagerDuty app.", - ) - } - ) - ), - } - ), - is_required=True, - description="Configure how the alert policy should send a notification.", - ), - "enabled": Field( - config=bool, - default_value=True, - description="Whether the alert policy is active or not.", - ), - "alert_targets": Field( - config=Array( - Selector( - fields=target_types_schema, - description=( - "Information for targeting events for this alert policy. If no target is" - " specified, the alert policy will apply to all events of a particular" - " type." - ), - ), - ), - is_required=False, - ), - }, - description="Details to customize an alert policy in Dagster Cloud.", - ) - for target_types_schema, event_types in ( - (TARGET_TYPES_SCHEMA, ALERT_EVENT_TYPES), - (INSIGHTS_TARGET_TYPES_SCHEMA, INSIGHTS_ALERT_EVENT_TYPES), - ) -] - -INSIGHTS_ALERT_POLICIES_SCHEMA, ALERT_POLICIES_SCHEMA = [ - Shape( - fields={ - "alert_policies": Array(alert_policy_schema), - } - ) - for alert_policy_schema in (INSIGHTS_ALERT_POLICY_SCHEMA, ALERT_POLICY_SCHEMA) -] diff --git a/dagster-cloud-cli/dagster_cloud_cli/version.py b/dagster-cloud-cli/dagster_cloud_cli/version.py index 6394d54..5986f10 100644 --- a/dagster-cloud-cli/dagster_cloud_cli/version.py +++ b/dagster-cloud-cli/dagster_cloud_cli/version.py @@ -1 +1 @@ -__version__ = "1.9.8" +__version__ = "1.9.9" diff --git a/dagster-cloud-cli/pyproject.toml b/dagster-cloud-cli/pyproject.toml index 6a3d839..42cfed8 100644 --- a/dagster-cloud-cli/pyproject.toml +++ b/dagster-cloud-cli/pyproject.toml @@ -22,7 +22,7 @@ classifiers = [ "Operating System :: OS Independent", ] dependencies = [ - "dagster==1.9.8", + "dagster==1.9.9", "packaging>=20.9", "questionary", "requests", diff --git a/dagster-cloud-examples/dagster_cloud_examples/version.py b/dagster-cloud-examples/dagster_cloud_examples/version.py index 6394d54..5986f10 100644 --- a/dagster-cloud-examples/dagster_cloud_examples/version.py +++ b/dagster-cloud-examples/dagster_cloud_examples/version.py @@ -1 +1 @@ -__version__ = "1.9.8" +__version__ = "1.9.9" diff --git a/dagster-cloud-examples/pyproject.toml b/dagster-cloud-examples/pyproject.toml index ed92c31..1616452 100644 --- a/dagster-cloud-examples/pyproject.toml +++ b/dagster-cloud-examples/pyproject.toml @@ -19,7 +19,7 @@ classifiers = [ "Operating System :: OS Independent", ] dependencies = [ - "dagster-cloud==1.9.8", + "dagster-cloud==1.9.9", ] [project.license] diff --git a/dagster-cloud/dagster_cloud/version.py b/dagster-cloud/dagster_cloud/version.py index 6394d54..5986f10 100644 --- a/dagster-cloud/dagster_cloud/version.py +++ b/dagster-cloud/dagster_cloud/version.py @@ -1 +1 @@ -__version__ = "1.9.8" +__version__ = "1.9.9" diff --git a/dagster-cloud/dagster_cloud/workspace/ecs/launcher.py b/dagster-cloud/dagster_cloud/workspace/ecs/launcher.py index 9dc8c5e..ac5fa9f 100644 --- a/dagster-cloud/dagster_cloud/workspace/ecs/launcher.py +++ b/dagster-cloud/dagster_cloud/workspace/ecs/launcher.py @@ -84,6 +84,8 @@ def __init__( run_ecs_tags: Optional[Sequence[Mapping[str, Optional[str]]]] = None, server_health_check: Optional[Mapping[str, Any]] = None, enable_ecs_exec=False, + server_task_definition_prefix: str = "server", + run_task_definition_prefix: str = "run", **kwargs, ): self.ecs = boto3.client("ecs") @@ -141,6 +143,22 @@ def __init__( run_sidecar_containers, "run_sidecar_containers" ) + self.server_task_definition_prefix = check.str_param( + server_task_definition_prefix, "server_task_definition_prefix" + ) + check.invariant( + len(self.server_task_definition_prefix) <= 16, + "server_task_definition_prefix must be at most 16 characters", + ) + self.run_task_definition_prefix = check.str_param( + run_task_definition_prefix, "run_task_definition_prefix" + ) + + check.invariant( + len(self.run_task_definition_prefix) <= 16, + "run_task_definition_prefix must be at most 16 characters", + ) + self.server_ecs_tags = check.opt_sequence_param(server_ecs_tags, "server_ecs_tags") self.run_ecs_tags = check.opt_sequence_param(run_ecs_tags, "run_ecs_tags") @@ -269,6 +287,12 @@ def config_type(cls): is_required=False, default_value=False, ), + "server_task_definition_prefix": Field( + str, is_required=False, default_value="server" + ), + "run_task_definition_prefix": Field( + str, is_required=False, default_value="dagsterrun" + ), }, SHARED_ECS_CONFIG, SHARED_USER_CODE_LAUNCHER_CONFIG, @@ -428,7 +452,10 @@ def _start_new_server_spinup( self._logger.info(f"Creating a new service for {deployment_name}:{location_name}...") family = get_server_task_definition_family( - self._instance.organization_name, deployment_name, location_name + self.server_task_definition_prefix, + self._instance.organization_name, + deployment_name, + location_name, ) system_tags = {**self._get_dagster_tags(deployment_name, location_name), **tags} @@ -696,6 +723,7 @@ def _run_launcher_kwargs(self) -> Dict[str, Any]: run_ecs_tags=self.run_ecs_tags, container_name=CONTAINER_NAME, run_resources=self.run_resources, + task_definition_prefix=self.run_task_definition_prefix, ) def run_launcher(self) -> CloudEcsRunLauncher: # pyright: ignore[reportIncompatibleMethodOverride], fix me! diff --git a/dagster-cloud/dagster_cloud/workspace/ecs/run_launcher.py b/dagster-cloud/dagster_cloud/workspace/ecs/run_launcher.py index 722ef4b..a3c0665 100644 --- a/dagster-cloud/dagster_cloud/workspace/ecs/run_launcher.py +++ b/dagster-cloud/dagster_cloud/workspace/ecs/run_launcher.py @@ -9,6 +9,7 @@ class CloudEcsRunLauncher(EcsRunLauncher[DagsterCloudAgentInstance]): def _get_run_task_definition_family(self, run) -> str: return get_run_task_definition_family( + self._task_definition_prefix, self._instance.organization_name, check.not_none(self._instance.deployment_name), check.not_none(run.remote_job_origin), diff --git a/dagster-cloud/dagster_cloud/workspace/ecs/utils.py b/dagster-cloud/dagster_cloud/workspace/ecs/utils.py index 0c8325c..12a4119 100644 --- a/dagster-cloud/dagster_cloud/workspace/ecs/utils.py +++ b/dagster-cloud/dagster_cloud/workspace/ecs/utils.py @@ -33,6 +33,7 @@ def _get_family_hash(name, max_length=32, hash_size=8): def get_server_task_definition_family( + task_definition_prefix: str, organization_name: Optional[str], deployment_name: str, location_name: str, @@ -43,9 +44,12 @@ def get_server_task_definition_family( m = hashlib.sha1() m.update(location_name.encode("utf-8")) + # '{16}_{64}_{64}_{64}': max 211 characters truncated_location_name = _get_family_hash(location_name, max_length=64) - final_family = f"server_{organization_name}_{deployment_name}_{truncated_location_name}" + final_family: str = ( + f"{task_definition_prefix}_{organization_name}_{deployment_name}_{truncated_location_name}" + ) assert len(final_family) <= 255 @@ -53,6 +57,7 @@ def get_server_task_definition_family( def get_run_task_definition_family( + task_definition_prefix: str, organization_name: Optional[str], deployment_name: str, job_origin: RemoteJobOrigin, @@ -64,12 +69,13 @@ def get_run_task_definition_family( repo_name = job_origin.repository_origin.repository_name location_name = job_origin.repository_origin.code_location_origin.location_name + assert len(task_definition_prefix) <= 16 assert len(str(organization_name)) <= 64 assert len(deployment_name) <= 64 # '{16}_{64}_{64}_{32}_{32}_{32}': max 245 characters - final_family = f"run_{organization_name}_{deployment_name}_{_get_family_hash(location_name)}_{_get_family_hash(repo_name)}_{_get_family_hash(job_name)}" + final_family = f"{task_definition_prefix}_{organization_name}_{deployment_name}_{_get_family_hash(location_name)}_{_get_family_hash(repo_name)}_{_get_family_hash(job_name)}" assert len(final_family) <= 255 diff --git a/dagster-cloud/dagster_cloud/workspace/user_code_launcher/user_code_launcher.py b/dagster-cloud/dagster_cloud/workspace/user_code_launcher/user_code_launcher.py index cbe795d..a5f6f1b 100644 --- a/dagster-cloud/dagster_cloud/workspace/user_code_launcher/user_code_launcher.py +++ b/dagster-cloud/dagster_cloud/workspace/user_code_launcher/user_code_launcher.py @@ -655,7 +655,7 @@ def _ensure_snapshot_uploaded( file = _file_for_format(object_bytes, upload_data.format) response = self._instance.requests_managed_retries_session.put( url=upload_data.presigned_put_url, - files={"file": file}, + data=file, ) raise_http_error(response) diff --git a/dagster-cloud/pyproject.toml b/dagster-cloud/pyproject.toml index d34442b..7e53f22 100644 --- a/dagster-cloud/pyproject.toml +++ b/dagster-cloud/pyproject.toml @@ -29,8 +29,8 @@ classifiers = [ "Operating System :: OS Independent", ] dependencies = [ - "dagster==1.9.8", - "dagster-cloud-cli==1.9.8", + "dagster==1.9.9", + "dagster-cloud-cli==1.9.9", "opentelemetry-api>=1.28.2,<2", "opentelemetry-sdk>=1.28.2,<2", "opentelemetry-exporter-otlp-proto-grpc>=1.28.2,<2",