From dfdcc7a7778835dc2def0d2e2947f46b21469a2f Mon Sep 17 00:00:00 2001 From: Adam Ling Date: Thu, 2 Jan 2025 09:33:27 -0800 Subject: [PATCH 1/9] poc --- .../snowpark/_internal/analyzer/analyzer.py | 39 ++++++++++++-- .../snowpark/_internal/analyzer/expression.py | 22 +++++++- .../_internal/analyzer/select_statement.py | 4 ++ .../_internal/analyzer/snowflake_plan.py | 51 ++++++++++++++++++- src/snowflake/snowpark/_internal/utils.py | 21 ++++++++ src/snowflake/snowpark/column.py | 4 +- src/snowflake/snowpark/dataframe.py | 22 ++++++-- 7 files changed, 149 insertions(+), 14 deletions(-) diff --git a/src/snowflake/snowpark/_internal/analyzer/analyzer.py b/src/snowflake/snowpark/_internal/analyzer/analyzer.py index 328a7d5d0b3..7bc3a267dad 100644 --- a/src/snowflake/snowpark/_internal/analyzer/analyzer.py +++ b/src/snowflake/snowpark/_internal/analyzer/analyzer.py @@ -151,7 +151,10 @@ ) from snowflake.snowpark._internal.error_message import SnowparkClientExceptionMessages from snowflake.snowpark._internal.telemetry import TelemetryField -from snowflake.snowpark._internal.utils import quote_name +from snowflake.snowpark._internal.utils import ( + quote_name, + merge_multiple_dicts_with_assertion, +) from snowflake.snowpark.types import BooleanType, _NumericType ARRAY_BIND_THRESHOLD = 512 @@ -165,8 +168,11 @@ def __init__(self, session: "snowflake.snowpark.session.Session") -> None: self.session = session self.plan_builder = SnowflakePlanBuilder(self.session) self.generated_alias_maps = {} + # key: expr_id, snowflake_plan_uuid ) -> value: alias + self.generated_alias_maps_v2 = {} self.subquery_plans = [] self.alias_maps_to_use: Optional[Dict[uuid.UUID, str]] = None + self.alias_maps_to_use_v2: Optional[Dict[uuid.UUID, str]] = None def analyze( self, @@ -368,7 +374,13 @@ def analyze( if isinstance(expr, Attribute): assert self.alias_maps_to_use is not None name = self.alias_maps_to_use.get(expr.expr_id, expr.name) - return quote_name(name) + name2 = self.alias_maps_to_use_v2.get( + (expr.expr_id, expr.snowflake_plan_uuid), expr.name + ) + if isinstance(name2, tuple): + name2 = name2[0] + print(name, name2) + return quote_name(name2) if isinstance(expr, UnresolvedAttribute): if expr.df_alias: @@ -630,7 +642,13 @@ def unary_expression_extractor( quoted_name = quote_name(expr.name) if isinstance(expr.child, Attribute): self.generated_alias_maps[expr.child.expr_id] = quoted_name + assert expr.child.snowflake_plan_uuid is not None + self.generated_alias_maps_v2[ + (expr.child.expr_id, expr.child.snowflake_plan_uuid) + ] = quoted_name assert self.alias_maps_to_use is not None + # TODO: + # assert self.alias_maps_to_use_v2 is not None for k, v in self.alias_maps_to_use.items(): if v == expr.child.name: self.generated_alias_maps[k] = quoted_name @@ -724,6 +742,7 @@ def window_frame_boundary( boundary: Expression, df_aliased_col_name_to_real_col_name: DefaultDict[str, Dict[str, str]], ) -> str: + # it means interval preceding if isinstance(boundary, UnaryMinus) and isinstance(boundary.child, Interval): return window_frame_boundary_expression( @@ -772,10 +791,14 @@ def to_sql_try_avoid_cast( def resolve(self, logical_plan: LogicalPlan) -> SnowflakePlan: self.subquery_plans = [] self.generated_alias_maps = {} + self.generated_alias_maps_v2 = {} result = self.do_resolve(logical_plan) - + # result is a snowflake plan result.add_aliases(self.generated_alias_maps) + for k, v in self.generated_alias_maps_v2.items(): + new_dict = {k: (v, result.uuid)} + result.add_aliases_v2(new_dict) if self.subquery_plans: result = result.with_subqueries(self.subquery_plans) @@ -803,8 +826,11 @@ def do_resolve(self, logical_plan: LogicalPlan) -> SnowflakePlan: if isinstance(logical_plan, Selectable): # Selectable doesn't have children. It already has the expr_to_alias dict. self.alias_maps_to_use = logical_plan.expr_to_alias.copy() + self.alias_maps_to_use_v2 = logical_plan.expr_to_alias_v2.copy() else: + use_maps = {} + use_maps_v2 = {} # get counts of expr_to_alias keys counts = Counter() for v in resolved_children.values(): @@ -821,6 +847,13 @@ def do_resolve(self, logical_plan: LogicalPlan) -> SnowflakePlan: self.alias_maps_to_use = use_maps + use_maps_v2 = merge_multiple_dicts_with_assertion( + *[v.expr_to_alias_v2 for v in resolved_children.values()] + ) + + self.alias_maps_to_use_v2 = use_maps_v2 + # self.expr_to_alias_v2 = merge_multiple_dicts_with_assertion(*[v.expr_to_alias_v2 for v in resolved_children.values()]) + res = self.do_resolve_with_resolved_children( logical_plan, resolved_children, df_aliased_col_name_to_real_col_name ) diff --git a/src/snowflake/snowpark/_internal/analyzer/expression.py b/src/snowflake/snowpark/_internal/analyzer/expression.py index 34dc51763a8..a6215bb52ec 100644 --- a/src/snowflake/snowpark/_internal/analyzer/expression.py +++ b/src/snowflake/snowpark/_internal/analyzer/expression.py @@ -223,20 +223,38 @@ def individual_node_complexity(self) -> Dict[PlanNodeCategory, int]: class Attribute(Expression, NamedExpression): - def __init__(self, name: str, datatype: DataType, nullable: bool = True) -> None: + def __init__( + self, + name: str, + datatype: DataType, + nullable: bool = True, + *, + snowflake_plan_uuid: str = None, + ) -> None: super().__init__() self.name = name self.datatype: DataType = datatype self.nullable = nullable + # non-breaking way to add snowflake_plan_uuid + self.snowflake_plan_uuid = snowflake_plan_uuid - def with_name(self, new_name: str) -> "Attribute": + def with_name( + self, new_name: str, *, snowflake_plan_uuid: str = None + ) -> "Attribute": if self.name == new_name: + # lazy update snowflake_plan_uuid + if not self.snowflake_plan_uuid: + self.snowflake_plan_uuid = snowflake_plan_uuid + else: + # one attribute can only belong to one snowflake plan + assert self.snowflake_plan_uuid == snowflake_plan_uuid return self else: return Attribute( snowflake.snowpark._internal.utils.quote_name(new_name), self.datatype, self.nullable, + snowflake_plan_uuid=snowflake_plan_uuid, ) @property diff --git a/src/snowflake/snowpark/_internal/analyzer/select_statement.py b/src/snowflake/snowpark/_internal/analyzer/select_statement.py index 13ac9c8e6db..8c4463f0d8e 100644 --- a/src/snowflake/snowpark/_internal/analyzer/select_statement.py +++ b/src/snowflake/snowpark/_internal/analyzer/select_statement.py @@ -236,6 +236,7 @@ def __init__( self._column_states: Optional[ColumnStateDict] = None self._snowflake_plan: Optional[SnowflakePlan] = None self.expr_to_alias = {} + self.expr_to_alias_v2 = {} self.df_aliased_col_name_to_real_col_name: DefaultDict[ str, Dict[str, str] ] = defaultdict(dict) @@ -315,6 +316,7 @@ def get_snowflake_plan(self, skip_schema_query) -> SnowflakePlan: df_aliased_col_name_to_real_col_name=self.df_aliased_col_name_to_real_col_name, source_plan=self, referenced_ctes=self.referenced_ctes, + expr_to_alias_v2=self.expr_to_alias_v2, ) # set api_calls to self._snowflake_plan outside of the above constructor # because the constructor copy api_calls. @@ -588,6 +590,7 @@ def __init__(self, snowflake_plan: LogicalPlan, *, analyzer: "Analyzer") -> None else analyzer.resolve(snowflake_plan) ) self.expr_to_alias.update(self._snowflake_plan.expr_to_alias) + self.expr_to_alias_v2.update(self._snowflake_plan.expr_to_alias_v2) self.df_aliased_col_name_to_real_col_name.update( self._snowflake_plan.df_aliased_col_name_to_real_col_name ) @@ -680,6 +683,7 @@ def __init__( self._projection_in_str = None self._query_params = None self.expr_to_alias.update(self.from_.expr_to_alias) + self.expr_to_alias_v2.update(self.from_.expr_to_alias_v2) self.df_aliased_col_name_to_real_col_name.update( self.from_.df_aliased_col_name_to_real_col_name ) diff --git a/src/snowflake/snowpark/_internal/analyzer/snowflake_plan.py b/src/snowflake/snowpark/_internal/analyzer/snowflake_plan.py index d9209d7abb4..f553c6b4799 100644 --- a/src/snowflake/snowpark/_internal/analyzer/snowflake_plan.py +++ b/src/snowflake/snowpark/_internal/analyzer/snowflake_plan.py @@ -121,6 +121,14 @@ else: from collections.abc import Iterable +iiid = 0 + + +def get_next_id(): + global iiid + iiid += 1 + return str(iiid) + class SnowflakePlan(LogicalPlan): class Decorator: @@ -226,12 +234,14 @@ def __init__( referenced_ctes: Optional[Dict[WithQueryBlock, int]] = None, *, session: "snowflake.snowpark.session.Session", + expr_to_alias_v2: Optional[Dict] = None, ) -> None: super().__init__() self.queries = queries self.schema_query = schema_query self.post_actions = post_actions if post_actions else [] self.expr_to_alias = expr_to_alias if expr_to_alias else {} + self.expr_to_alias_v2 = expr_to_alias_v2 if expr_to_alias_v2 else {} self.session = session self.source_plan = source_plan self.is_ddl_on_temp_object = is_ddl_on_temp_object @@ -258,7 +268,8 @@ def __init__( self._cumulative_node_complexity: Optional[Dict[PlanNodeCategory, int]] = None # UUID for the plan to uniquely identify the SnowflakePlan object. We also use this # to UUID track queries that are generated from the same plan. - self._uuid = str(uuid.uuid4()) + self._uuid = get_next_id() + # self._uuid = str(uuid.uuid4()) # Metadata for the plan self._metadata: PlanMetadata = infer_metadata( self.source_plan, @@ -361,11 +372,16 @@ def attributes(self) -> List[Attribute]: # No simplifier case relies on this schema_query change to update SHOW TABLES to a nested sql friendly query. if not self.schema_query or not self.session.sql_simplifier_enabled: self.schema_query = schema_value_statement(attributes) + for attr in attributes: + attr.plan_uuid = self.uuid return attributes @cached_property def output(self) -> List[Attribute]: - return [Attribute(a.name, a.datatype, a.nullable) for a in self.attributes] + return [ + Attribute(a.name, a.datatype, a.nullable, snowflake_plan_uuid=self.uuid) + for a in self.attributes + ] @property def output_dict(self) -> Dict[str, Any]: @@ -509,6 +525,21 @@ def __deepcopy__(self, memodict={}) -> "SnowflakePlan": # noqa: B006 def add_aliases(self, to_add: Dict) -> None: self.expr_to_alias = {**self.expr_to_alias, **to_add} + def add_aliases_v2(self, to_add: Dict) -> None: + conflicted = False + for key in self.expr_to_alias_v2.keys() & to_add.keys(): # Find common keys + if self.expr_to_alias_v2[key] != to_add[key]: + conflicted = True + print( + f"need to overwrite, Conflict for key '{key}': {self.expr_to_alias_v2[key]} != {to_add[key]}" + ) + self.expr_to_alias_v2[key] = to_add[key] + for key in to_add.keys() - self.expr_to_alias_v2.keys(): # Find new keys + self.expr_to_alias_v2[key] = to_add[key] + if conflicted: + print("new mapping:", self.expr_to_alias_v2) + # self.expr_to_alias_v2 = {**to_add, **self.expr_to_alias_v2} + class SnowflakePlanBuilder: def __init__( @@ -592,6 +623,21 @@ def build_binary( }.items() if k not in common_columns } + + from snowflake.snowpark._internal.utils import ( + merge_multiple_dicts_with_assertion, + ) + + new_expr_to_alias_v2 = merge_multiple_dicts_with_assertion( + select_left.expr_to_alias_v2, select_right.expr_to_alias_v2 + ) + # new_expr_to_alias_v2 = { + # k: v + # for k, v in { + # **select_left.expr_to_alias_v2, + # **select_right.expr_to_alias_v2, + # }.items() + # } api_calls = [*select_left.api_calls, *select_right.api_calls] # Need to do a deduplication to avoid repeated query. @@ -639,6 +685,7 @@ def build_binary( api_calls=api_calls, session=self.session, referenced_ctes=referenced_ctes, + expr_to_alias_v2=new_expr_to_alias_v2, ) def query( diff --git a/src/snowflake/snowpark/_internal/utils.py b/src/snowflake/snowpark/_internal/utils.py index bd27e46f2fd..42130155fcc 100644 --- a/src/snowflake/snowpark/_internal/utils.py +++ b/src/snowflake/snowpark/_internal/utils.py @@ -1422,3 +1422,24 @@ def next(self) -> int: global_counter: GlobalCounter = GlobalCounter() + + +def merge_multiple_dicts_with_assertion(*dicts): + """ + Merge multiple dictionaries with an assertion that checks if values for + duplicate keys are the same. + + :param dicts: Multiple dictionaries to merge + :return: Merged dictionary + """ + merged_dict = {} + + for d in dicts: + for key, value in d.items(): + if key in merged_dict: + assert ( + merged_dict[key] == value + ), f"Conflict for key '{key}': {merged_dict[key]} != {value}" + merged_dict[key] = value + + return merged_dict diff --git a/src/snowflake/snowpark/column.py b/src/snowflake/snowpark/column.py index 7e9588d9d5d..43b26d7d882 100644 --- a/src/snowflake/snowpark/column.py +++ b/src/snowflake/snowpark/column.py @@ -1241,12 +1241,12 @@ def as_(self, alias: str, _emit_ast: bool = True) -> "Column": """Returns a new renamed Column. Alias of :func:`name`.""" return self.name(alias, variant="as_", _emit_ast=_emit_ast) - @publicapi + # @publicapi def alias(self, alias: str, _emit_ast: bool = True) -> "Column": """Returns a new renamed Column. Alias of :func:`name`.""" return self.name(alias, variant="alias", _emit_ast=_emit_ast) - @publicapi + # @publicapi def name( self, alias: str, diff --git a/src/snowflake/snowpark/dataframe.py b/src/snowflake/snowpark/dataframe.py index 9bf175a18e4..4a9509c44bc 100644 --- a/src/snowflake/snowpark/dataframe.py +++ b/src/snowflake/snowpark/dataframe.py @@ -330,6 +330,15 @@ def _disambiguate( ], _emit_ast=False, ) + new_map = {} + for k, v in lhs_remapped._plan.expr_to_alias_v2.items(): + new_map[(k[0], lhs._plan.uuid)] = v + lhs_remapped._plan.expr_to_alias_v2 = new_map.copy() + + new_map = {} + for k, v in rhs_remapped._plan.expr_to_alias_v2.items(): + new_map[(k[0], rhs._plan.uuid)] = v + rhs_remapped._plan.expr_to_alias_v2 = new_map.copy() return lhs_remapped, rhs_remapped @@ -593,6 +602,7 @@ def __init__( if isinstance(plan, (SelectStatement, MockSelectStatement)): self._select_statement = plan plan.expr_to_alias.update(self._plan.expr_to_alias) + plan.expr_to_alias_v2.update(self._plan.expr_to_alias_v2) plan.df_aliased_col_name_to_real_col_name.update( self._plan.df_aliased_col_name_to_real_col_name ) @@ -624,8 +634,8 @@ def _set_ast_ref(self, sp_dataframe_expr_builder: Any) -> None: Given a field builder expression of the AST type SpDataframeExpr, points the builder to reference this dataframe. """ # TODO SNOW-1762262: remove once we generate the correct AST. - debug_check_missing_ast(self._ast_id, self) - sp_dataframe_expr_builder.sp_dataframe_ref.id.bitfield1 = self._ast_id + # debug_check_missing_ast(self._ast_id, self) + # sp_dataframe_expr_builder.sp_dataframe_ref.id.bitfield1 = self._ast_id @property def stat(self) -> DataFrameStatFunctions: @@ -1339,8 +1349,8 @@ def col(self, col_name: str, _emit_ast: bool = True) -> Column: else: return Column(self._resolve(col_name), _ast=expr) - @df_api_usage - @publicapi + # @df_api_usage + # @publicapi def select( self, *cols: Union[ @@ -5557,7 +5567,9 @@ def _resolve(self, col_name: str) -> Union[Expression, NamedExpression]: ) if len(cols) == 1: - return cols[0].with_name(normalized_col_name) + return cols[0].with_name( + normalized_col_name, snowflake_plan_uuid=self._plan.uuid + ) else: raise SnowparkClientExceptionMessages.DF_CANNOT_RESOLVE_COLUMN_NAME( col_name From ea70f2fe7651f4cfdb3287d8c738363e83bcf0e7 Mon Sep 17 00:00:00 2001 From: Adam Ling Date: Thu, 9 Jan 2025 20:50:38 -0800 Subject: [PATCH 2/9] update --- .../snowpark/_internal/analyzer/analyzer.py | 19 ++++++++----------- .../_internal/analyzer/select_statement.py | 1 + .../_internal/analyzer/snowflake_plan.py | 10 ++++++++++ 3 files changed, 19 insertions(+), 11 deletions(-) diff --git a/src/snowflake/snowpark/_internal/analyzer/analyzer.py b/src/snowflake/snowpark/_internal/analyzer/analyzer.py index 7bc3a267dad..395694beb45 100644 --- a/src/snowflake/snowpark/_internal/analyzer/analyzer.py +++ b/src/snowflake/snowpark/_internal/analyzer/analyzer.py @@ -372,14 +372,13 @@ def analyze( return expr.sql if isinstance(expr, Attribute): - assert self.alias_maps_to_use is not None - name = self.alias_maps_to_use.get(expr.expr_id, expr.name) + # assert self.alias_maps_to_use is not None + # name = self.alias_maps_to_use.get(expr.expr_id, expr.name) + assert self.alias_maps_to_use_v2 is not None name2 = self.alias_maps_to_use_v2.get( (expr.expr_id, expr.snowflake_plan_uuid), expr.name ) - if isinstance(name2, tuple): - name2 = name2[0] - print(name, name2) + # print(name, name2) return quote_name(name2) if isinstance(expr, UnresolvedAttribute): @@ -647,8 +646,6 @@ def unary_expression_extractor( (expr.child.expr_id, expr.child.snowflake_plan_uuid) ] = quoted_name assert self.alias_maps_to_use is not None - # TODO: - # assert self.alias_maps_to_use_v2 is not None for k, v in self.alias_maps_to_use.items(): if v == expr.child.name: self.generated_alias_maps[k] = quoted_name @@ -796,9 +793,10 @@ def resolve(self, logical_plan: LogicalPlan) -> SnowflakePlan: result = self.do_resolve(logical_plan) # result is a snowflake plan result.add_aliases(self.generated_alias_maps) - for k, v in self.generated_alias_maps_v2.items(): - new_dict = {k: (v, result.uuid)} - result.add_aliases_v2(new_dict) + result.add_aliases_v2(self.generated_alias_maps_v2) + # for k, v in self.generated_alias_maps_v2.items(): + # new_dict = {k: (v, result.uuid)} + # result.add_aliases_v2(new_dict) if self.subquery_plans: result = result.with_subqueries(self.subquery_plans) @@ -852,7 +850,6 @@ def do_resolve(self, logical_plan: LogicalPlan) -> SnowflakePlan: ) self.alias_maps_to_use_v2 = use_maps_v2 - # self.expr_to_alias_v2 = merge_multiple_dicts_with_assertion(*[v.expr_to_alias_v2 for v in resolved_children.values()]) res = self.do_resolve_with_resolved_children( logical_plan, resolved_children, df_aliased_col_name_to_real_col_name diff --git a/src/snowflake/snowpark/_internal/analyzer/select_statement.py b/src/snowflake/snowpark/_internal/analyzer/select_statement.py index 8c4463f0d8e..164792a008b 100644 --- a/src/snowflake/snowpark/_internal/analyzer/select_statement.py +++ b/src/snowflake/snowpark/_internal/analyzer/select_statement.py @@ -1046,6 +1046,7 @@ def select(self, cols: List[Expression]) -> "SelectStatement": new.expr_to_alias = copy( self.expr_to_alias ) # use copy because we don't want two plans to share the same list. If one mutates, the other ones won't be impacted. + new.expr_to_alias_v2 = copy(self.expr_to_alias_v2) new.flatten_disabled = self.flatten_disabled # no need to flatten the projection complexity since the select projection is already flattened. new._merge_projection_complexity_with_subquery = False diff --git a/src/snowflake/snowpark/_internal/analyzer/snowflake_plan.py b/src/snowflake/snowpark/_internal/analyzer/snowflake_plan.py index f553c6b4799..bc6a3d846ca 100644 --- a/src/snowflake/snowpark/_internal/analyzer/snowflake_plan.py +++ b/src/snowflake/snowpark/_internal/analyzer/snowflake_plan.py @@ -339,6 +339,7 @@ def with_subqueries(self, subquery_plans: List["SnowflakePlan"]) -> "SnowflakePl new_schema_query, post_actions=new_post_actions, expr_to_alias=self.expr_to_alias, + expr_to_alias_v2=self.expr_to_alias_v2, session=self.session, source_plan=self.source_plan, api_calls=api_calls, @@ -473,6 +474,9 @@ def __copy__(self) -> "SnowflakePlan": self.df_aliased_col_name_to_real_col_name, session=self.session, referenced_ctes=self.referenced_ctes, + expr_to_alias_v2=dict(self.expr_to_alias_v2) + if self.expr_to_alias_v2 + else None, ) else: return SnowflakePlan( @@ -486,6 +490,9 @@ def __copy__(self) -> "SnowflakePlan": self.df_aliased_col_name_to_real_col_name, session=self.session, referenced_ctes=self.referenced_ctes, + expr_to_alias_v2=dict(self.expr_to_alias_v2) + if self.expr_to_alias_v2 + else None, ) def __deepcopy__(self, memodict={}) -> "SnowflakePlan": # noqa: B006 @@ -503,6 +510,9 @@ def __deepcopy__(self, memodict={}) -> "SnowflakePlan": # noqa: B006 expr_to_alias=copy.deepcopy(self.expr_to_alias) if self.expr_to_alias else None, + expr_to_alias_v2=copy.deepcopy(self.expr_to_alias_v2) + if self.expr_to_alias_v2 + else None, source_plan=copied_source_plan, is_ddl_on_temp_object=self.is_ddl_on_temp_object, api_calls=copy.deepcopy(self.api_calls) if self.api_calls else None, From 6d7de6c0eceea8acb436fe11ac7cd62fdb95faec Mon Sep 17 00:00:00 2001 From: Adam Ling Date: Thu, 9 Jan 2025 21:52:18 -0800 Subject: [PATCH 3/9] minor --- .../snowpark/_internal/analyzer/analyzer.py | 1 - .../snowpark/_internal/analyzer/expression.py | 1 + .../snowpark/_internal/analyzer/snowflake_plan.py | 13 +++++-------- 3 files changed, 6 insertions(+), 9 deletions(-) diff --git a/src/snowflake/snowpark/_internal/analyzer/analyzer.py b/src/snowflake/snowpark/_internal/analyzer/analyzer.py index 395694beb45..0516a413592 100644 --- a/src/snowflake/snowpark/_internal/analyzer/analyzer.py +++ b/src/snowflake/snowpark/_internal/analyzer/analyzer.py @@ -378,7 +378,6 @@ def analyze( name2 = self.alias_maps_to_use_v2.get( (expr.expr_id, expr.snowflake_plan_uuid), expr.name ) - # print(name, name2) return quote_name(name2) if isinstance(expr, UnresolvedAttribute): diff --git a/src/snowflake/snowpark/_internal/analyzer/expression.py b/src/snowflake/snowpark/_internal/analyzer/expression.py index a6215bb52ec..4f400a1d204 100644 --- a/src/snowflake/snowpark/_internal/analyzer/expression.py +++ b/src/snowflake/snowpark/_internal/analyzer/expression.py @@ -241,6 +241,7 @@ def __init__( def with_name( self, new_name: str, *, snowflake_plan_uuid: str = None ) -> "Attribute": + assert snowflake_plan_uuid if self.name == new_name: # lazy update snowflake_plan_uuid if not self.snowflake_plan_uuid: diff --git a/src/snowflake/snowpark/_internal/analyzer/snowflake_plan.py b/src/snowflake/snowpark/_internal/analyzer/snowflake_plan.py index bc6a3d846ca..895799e431e 100644 --- a/src/snowflake/snowpark/_internal/analyzer/snowflake_plan.py +++ b/src/snowflake/snowpark/_internal/analyzer/snowflake_plan.py @@ -536,19 +536,16 @@ def add_aliases(self, to_add: Dict) -> None: self.expr_to_alias = {**self.expr_to_alias, **to_add} def add_aliases_v2(self, to_add: Dict) -> None: - conflicted = False + # conflicted = False for key in self.expr_to_alias_v2.keys() & to_add.keys(): # Find common keys if self.expr_to_alias_v2[key] != to_add[key]: - conflicted = True - print( - f"need to overwrite, Conflict for key '{key}': {self.expr_to_alias_v2[key]} != {to_add[key]}" - ) + # conflicted = True + # print( + # f"need to overwrite, the expr has been realiased for key '{key}', old value {self.expr_to_alias_v2[key]} -> new value {to_add[key]}" + # ) self.expr_to_alias_v2[key] = to_add[key] for key in to_add.keys() - self.expr_to_alias_v2.keys(): # Find new keys self.expr_to_alias_v2[key] = to_add[key] - if conflicted: - print("new mapping:", self.expr_to_alias_v2) - # self.expr_to_alias_v2 = {**to_add, **self.expr_to_alias_v2} class SnowflakePlanBuilder: From 9d26c618381650e8565d8fbe81c7cb623c7dde8d Mon Sep 17 00:00:00 2001 From: Adam Ling Date: Fri, 10 Jan 2025 10:38:06 -0800 Subject: [PATCH 4/9] set init plan uuid --- src/snowflake/snowpark/dataframe.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/snowflake/snowpark/dataframe.py b/src/snowflake/snowpark/dataframe.py index 881e1d9c9df..d6f271e6d02 100644 --- a/src/snowflake/snowpark/dataframe.py +++ b/src/snowflake/snowpark/dataframe.py @@ -630,6 +630,9 @@ def __init__( self.replace = self._na.replace self._alias: Optional[str] = None + for attr in self._output: + assert not attr.snowflake_plan_uuid + attr.snowflake_plan_uuid = self._plan.uuid def _set_ast_ref(self, sp_dataframe_expr_builder: Any) -> None: """ From 32601f5b3d950a8e8507beed19cea99795d185bb Mon Sep 17 00:00:00 2001 From: Adam Ling Date: Fri, 10 Jan 2025 10:39:28 -0800 Subject: [PATCH 5/9] minor --- tests/unit/conftest.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py index 988590c52ce..13f730741d0 100644 --- a/tests/unit/conftest.py +++ b/tests/unit/conftest.py @@ -56,6 +56,7 @@ def mock_snowflake_plan(mock_query) -> Analyzer: fake_snowflake_plan = mock.create_autospec(SnowflakePlan) fake_snowflake_plan._id = "dummy id" fake_snowflake_plan.expr_to_alias = {} + fake_snowflake_plan.expr_to_alias_v2 = {} fake_snowflake_plan.df_aliased_col_name_to_real_col_name = {} fake_snowflake_plan.queries = [mock_query] fake_snowflake_plan.post_actions = [] From 12fc79225992f90f4f85b32040efad1ede6b8cce Mon Sep 17 00:00:00 2001 From: Adam Ling Date: Fri, 10 Jan 2025 10:46:12 -0800 Subject: [PATCH 6/9] remove unrelated code --- src/snowflake/snowpark/column.py | 4 ++-- src/snowflake/snowpark/dataframe.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/snowflake/snowpark/column.py b/src/snowflake/snowpark/column.py index 43b26d7d882..7e9588d9d5d 100644 --- a/src/snowflake/snowpark/column.py +++ b/src/snowflake/snowpark/column.py @@ -1241,12 +1241,12 @@ def as_(self, alias: str, _emit_ast: bool = True) -> "Column": """Returns a new renamed Column. Alias of :func:`name`.""" return self.name(alias, variant="as_", _emit_ast=_emit_ast) - # @publicapi + @publicapi def alias(self, alias: str, _emit_ast: bool = True) -> "Column": """Returns a new renamed Column. Alias of :func:`name`.""" return self.name(alias, variant="alias", _emit_ast=_emit_ast) - # @publicapi + @publicapi def name( self, alias: str, diff --git a/src/snowflake/snowpark/dataframe.py b/src/snowflake/snowpark/dataframe.py index d6f271e6d02..d6e1793a964 100644 --- a/src/snowflake/snowpark/dataframe.py +++ b/src/snowflake/snowpark/dataframe.py @@ -1354,8 +1354,8 @@ def col(self, col_name: str, _emit_ast: bool = True) -> Column: else: return Column(self._resolve(col_name), _ast=expr) - # @df_api_usage - # @publicapi + @df_api_usage + @publicapi def select( self, *cols: Union[ From 747afc74930dee40d441ba0c550320fdd78d8944 Mon Sep 17 00:00:00 2001 From: Adam Ling Date: Fri, 10 Jan 2025 11:50:36 -0800 Subject: [PATCH 7/9] remove assertion --- src/snowflake/snowpark/_internal/analyzer/analyzer.py | 2 +- src/snowflake/snowpark/dataframe.py | 8 +++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/src/snowflake/snowpark/_internal/analyzer/analyzer.py b/src/snowflake/snowpark/_internal/analyzer/analyzer.py index 0516a413592..9a97dbcb5ef 100644 --- a/src/snowflake/snowpark/_internal/analyzer/analyzer.py +++ b/src/snowflake/snowpark/_internal/analyzer/analyzer.py @@ -640,7 +640,7 @@ def unary_expression_extractor( quoted_name = quote_name(expr.name) if isinstance(expr.child, Attribute): self.generated_alias_maps[expr.child.expr_id] = quoted_name - assert expr.child.snowflake_plan_uuid is not None + # assert expr.child.snowflake_plan_uuid is not None self.generated_alias_maps_v2[ (expr.child.expr_id, expr.child.snowflake_plan_uuid) ] = quoted_name diff --git a/src/snowflake/snowpark/dataframe.py b/src/snowflake/snowpark/dataframe.py index d6e1793a964..9199cda8b83 100644 --- a/src/snowflake/snowpark/dataframe.py +++ b/src/snowflake/snowpark/dataframe.py @@ -630,9 +630,11 @@ def __init__( self.replace = self._na.replace self._alias: Optional[str] = None - for attr in self._output: - assert not attr.snowflake_plan_uuid - attr.snowflake_plan_uuid = self._plan.uuid + # for attr in self._output: + # if not attr.snowflake_plan_uuid: + # attr.snowflake_plan_uuid = self._plan.uuid + # else: + # assert attr.snowflake_plan_uuid == self._plan.uuid def _set_ast_ref(self, sp_dataframe_expr_builder: Any) -> None: """ From 0c8054bb336f5071640c92a83bb9e6ecb748c4ca Mon Sep 17 00:00:00 2001 From: Adam Ling Date: Fri, 10 Jan 2025 13:33:03 -0800 Subject: [PATCH 8/9] update --- .../snowpark/_internal/analyzer/expression.py | 12 ++++++------ tests/unit/test_query_plan_analysis.py | 4 ++++ 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/src/snowflake/snowpark/_internal/analyzer/expression.py b/src/snowflake/snowpark/_internal/analyzer/expression.py index 4f400a1d204..68b5f1fc9f9 100644 --- a/src/snowflake/snowpark/_internal/analyzer/expression.py +++ b/src/snowflake/snowpark/_internal/analyzer/expression.py @@ -243,12 +243,12 @@ def with_name( ) -> "Attribute": assert snowflake_plan_uuid if self.name == new_name: - # lazy update snowflake_plan_uuid - if not self.snowflake_plan_uuid: - self.snowflake_plan_uuid = snowflake_plan_uuid - else: - # one attribute can only belong to one snowflake plan - assert self.snowflake_plan_uuid == snowflake_plan_uuid + # # lazy update snowflake_plan_uuid + # if not self.snowflake_plan_uuid: + # self.snowflake_plan_uuid = snowflake_plan_uuid + # else: + # # one attribute can only belong to one snowflake plan + # assert self.snowflake_plan_uuid == snowflake_plan_uuid return self else: return Attribute( diff --git a/tests/unit/test_query_plan_analysis.py b/tests/unit/test_query_plan_analysis.py index 3a78efabf12..dc73d4b40f4 100644 --- a/tests/unit/test_query_plan_analysis.py +++ b/tests/unit/test_query_plan_analysis.py @@ -166,6 +166,7 @@ def test_select_statement_individual_node_complexity( from_.pre_actions = None from_.post_actions = None from_.expr_to_alias = {} + from_.expr_to_alias_v2 = {} from_.df_aliased_col_name_to_real_col_name = {} plan_node = SelectStatement(from_=from_, analyzer=mock_analyzer) @@ -203,6 +204,7 @@ def test_set_statement_individual_node_complexity(mock_analyzer, set_operator): mock_selectable.pre_actions = None mock_selectable.post_actions = None mock_selectable.expr_to_alias = {} + mock_selectable.expr_to_alias_v2 = {} mock_selectable.df_aliased_col_name_to_real_col_name = {} set_operands = [ SetOperand(mock_selectable, set_operator), @@ -394,6 +396,7 @@ def test_select_statement_get_complexity_map_no_column_state(mock_analyzer): mock_from.pre_actions = None mock_from.post_actions = None mock_from.expr_to_alias = {} + mock_from.expr_to_alias_v2 = {} mock_from.df_aliased_col_name_to_real_col_name = {} select_statement = SelectStatement(analyzer=mock_analyzer, from_=mock_from) @@ -412,6 +415,7 @@ def test_select_statement_get_complexity_map_mismatch_projection_length(mock_ana mock_from.pre_actions = None mock_from.post_actions = None mock_from.expr_to_alias = {} + mock_from.expr_to_alias_v2 = {} mock_from.df_aliased_col_name_to_real_col_name = {} # create a select_statement with 2 projections From 7c200ed798e652f0360be6f974914aefaae5da9e Mon Sep 17 00:00:00 2001 From: Adam Ling Date: Fri, 10 Jan 2025 14:51:12 -0800 Subject: [PATCH 9/9] update --- src/snowflake/snowpark/dataframe.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/snowflake/snowpark/dataframe.py b/src/snowflake/snowpark/dataframe.py index 9199cda8b83..aac8fb38706 100644 --- a/src/snowflake/snowpark/dataframe.py +++ b/src/snowflake/snowpark/dataframe.py @@ -630,11 +630,9 @@ def __init__( self.replace = self._na.replace self._alias: Optional[str] = None - # for attr in self._output: - # if not attr.snowflake_plan_uuid: - # attr.snowflake_plan_uuid = self._plan.uuid - # else: - # assert attr.snowflake_plan_uuid == self._plan.uuid + # update the output attributes to point to the current plan + for attr in self._output: + attr.snowflake_plan_uuid = self._plan.uuid def _set_ast_ref(self, sp_dataframe_expr_builder: Any) -> None: """