Skip to content

Commit

Permalink
SNOW-1637215: Add describe query telemetry (#2622)
Browse files Browse the repository at this point in the history
<!---
Please answer these questions before creating your pull request. Thanks!
--->

1. Which Jira issue is this PR addressing? Make sure that there is an
accompanying issue to your PR.

   <!---
   In this section, please add a Snowflake Jira issue number.

Note that if a corresponding GitHub issue exists, you should still
include
   the Snowflake Jira issue number. For example, for GitHub issue
#1400, you should
   add "SNOW-1335071" here.
    --->

   Fixes SNOW-1637215

2. Fill out the following pre-review checklist:

- [ ] I am adding a new automated test(s) to verify correctness of my
new code
- [ ] If this test skips Local Testing mode, I'm requesting review from
@snowflakedb/local-testing
   - [ ] I am adding new logging messages
   - [ ] I am adding a new telemetry message
   - [ ] I am adding new credentials
   - [ ] I am adding a new dependency
- [ ] If this is a new feature/behavior, I'm adding the Local Testing
parity changes.
- [x] I acknowledge that I have ensured my changes to be thread-safe.
Follow the link for more information: [Thread-safe Developer
Guidelines](https://docs.google.com/document/d/162d_i4zZ2AfcGRXojj0jByt8EUq-DrSHPPnTa4QvwbA/edit#bookmark=id.e82u4nekq80k)

3. Please describe how your code solves the related issue.

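Add telemetry for describe queries, as specified in the design document:
https://docs.google.com/document/d/1DAN1I99BQd1CBBu_pLvWb9vLReTC7HIbfppxeVbXCSg/edit?tab=t.0#heading=h.r285g4qcslsz
Each describe query issued through `analyze_attributes` now sends a
`snowpark_describe_query_details` telemetry message containing the session ID,
the SQL text, the end-to-end time of the describe call, and the caller's stack trace.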
  • Loading branch information
sfc-gh-jdu authored Nov 19, 2024
1 parent 53291b4 commit 60a35bf
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 11 deletions.
9 changes: 8 additions & 1 deletion src/snowflake/snowpark/_internal/analyzer/metadata_utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#
# Copyright (c) 2012-2024 Snowflake Computing Inc. All rights reserved.
#

from enum import Enum
from dataclasses import dataclass
from typing import TYPE_CHECKING, DefaultDict, Dict, List, Optional

Expand All @@ -17,6 +17,13 @@
from snowflake.snowpark._internal.analyzer.analyzer import Analyzer


class DescribeQueryTelemetryField(Enum):
    """Telemetry message type and payload keys for describe-query details."""

    # Value used as the telemetry message type.
    TYPE_DESCRIBE_QUERY_DETAILS = "snowpark_describe_query_details"

    # Keys of the message's data payload.
    SQL_TEXT = "sql_text"
    E2E_TIME = "e2e_time"
    STACK_TRACE = "stack_trace"


@dataclass(frozen=True)
class PlanMetadata:
"""
Expand Down
15 changes: 13 additions & 2 deletions src/snowflake/snowpark/_internal/analyzer/schema_utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
#
# Copyright (c) 2012-2024 Snowflake Computing Inc. All rights reserved.
#

import time
import traceback
from typing import TYPE_CHECKING, List, Union

import snowflake.snowpark
Expand Down Expand Up @@ -90,7 +91,17 @@ def analyze_attributes(
session._conn._cursor.description, session._conn.max_string_size
)

return session._get_result_attributes(sql)
# collect describe query details for telemetry
stack = traceback.extract_stack(limit=10)[:-1]
stack_trace = [frame.line for frame in stack] if len(stack) > 0 else None
start_time = time.time()
attributes = session._get_result_attributes(sql)
e2e_time = time.time() - start_time
session._conn._telemetry_client.send_describe_query_details(
session._session_id, sql, e2e_time, stack_trace
)

return attributes


def convert_result_meta_to_attribute(
Expand Down
23 changes: 23 additions & 0 deletions src/snowflake/snowpark/_internal/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
from snowflake.snowpark._internal.compiler.telemetry_constants import (
CompilationStageTelemetryField,
)
from snowflake.snowpark._internal.analyzer.metadata_utils import (
DescribeQueryTelemetryField,
)
from snowflake.snowpark._internal.utils import (
get_application_name,
get_os_name,
Expand Down Expand Up @@ -561,3 +564,23 @@ def send_reduce_describe_query_telemetry(
},
}
self.send(message)

def send_describe_query_details(
    self,
    session_id: int,
    sql_text: str,
    e2e_time: float,
    stack_trace: Optional[List[Optional[str]]],
):
    """Send a telemetry message with the details of one describe query.

    Args:
        session_id: ID of the session the describe query belongs to.
        sql_text: SQL text of the query that was described.
        e2e_time: End-to-end duration of the describe call; the caller
            visible in this change measures it with ``time.time()``,
            i.e. in seconds.
        stack_trace: Source lines of the calling frames, or ``None`` when
            no frames were captured; individual entries may be ``None``.
    """
    # Start from the common telemetry envelope, tagged with this message type.
    envelope = self._create_basic_telemetry_data(
        DescribeQueryTelemetryField.TYPE_DESCRIBE_QUERY_DETAILS.value
    )
    details = {
        TelemetryField.SESSION_ID.value: session_id,
        DescribeQueryTelemetryField.SQL_TEXT.value: sql_text,
        DescribeQueryTelemetryField.E2E_TIME.value: e2e_time,
        DescribeQueryTelemetryField.STACK_TRACE.value: stack_trace,
    }
    # Copy the envelope so the payload is attached to a fresh dict.
    message = dict(envelope)
    message[TelemetryField.KEY_DATA.value] = details
    self.send(message)
41 changes: 33 additions & 8 deletions tests/integ/test_telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -1098,8 +1098,8 @@ def process(self, a, b) -> Iterable[Tuple[int]]:

expected_data = {"func_name": "UDTFRegistration.register", "category": "create"}
assert telemetry_tracker.find_message_in_log_data(
2, sum_udtf_partial, expected_data
), f"could not find expected message: {expected_data} in the last 2 message log entries"
3, sum_udtf_partial, expected_data
), f"could not find expected message: {expected_data} in the last 3 message log entries"

sum_udtf = sum_udtf_partial()
select_partial = partial(df.select, sum_udtf(df.a, df.b))
Expand All @@ -1108,8 +1108,8 @@ def process(self, a, b) -> Iterable[Tuple[int]]:
"category": "usage",
}
assert telemetry_tracker.find_message_in_log_data(
2, select_partial, expected_data
), f"could not find expected message: {expected_data} in the last 2 message log entries"
3, select_partial, expected_data
), f"could not find expected message: {expected_data} in the last 3 message log entries"

# udtf register from file
test_files = TestFiles(resources_path)
Expand All @@ -1127,8 +1127,8 @@ def process(self, a, b) -> Iterable[Tuple[int]]:
"category": "create",
}
assert telemetry_tracker.find_message_in_log_data(
2, my_udtf_partial, expected_data
), f"could not find expected message: {expected_data} in the last 2 message log entries"
3, my_udtf_partial, expected_data
), f"could not find expected message: {expected_data} in the last 3 message log entries"
my_udtf = my_udtf_partial()

invoke_partial = partial(
Expand All @@ -1149,8 +1149,8 @@ def process(self, a, b) -> Iterable[Tuple[int]]:
"category": "usage",
}
assert telemetry_tracker.find_message_in_log_data(
2, invoke_partial, expected_data
), f"could not find expected message: {expected_data} in the last 2 message log entries"
3, invoke_partial, expected_data
), f"could not find expected message: {expected_data} in the last 3 message log entries"


@pytest.mark.skip(
Expand Down Expand Up @@ -1284,3 +1284,28 @@ def send_telemetry():
data, type_, _ = telemetry_tracker.extract_telemetry_log_data(-1, send_telemetry)
assert data == expected_data
assert type_ == "snowpark_cursor_created"


def test_describe_query_details(session):
    """Verify the type and payload of the describe-query details message."""
    telemetry_client = session._conn._telemetry_client

    def send_telemetry():
        telemetry_client.send_describe_query_details(
            session.session_id,
            sql_text="select 1 as a, 2 as b",
            e2e_time=0.01,
            stack_trace=["line1", "line2"],
        )

    tracker = TelemetryDataTracker(session)

    # The payload should echo back exactly what was passed to the client.
    expected_type = "snowpark_describe_query_details"
    expected_data = dict(
        session_id=session.session_id,
        sql_text="select 1 as a, 2 as b",
        e2e_time=0.01,
        stack_trace=["line1", "line2"],
    )

    # Index -1 selects the most recent telemetry log entry.
    data, type_, _ = tracker.extract_telemetry_log_data(-1, send_telemetry)
    assert data == expected_data
    assert type_ == expected_type

0 comments on commit 60a35bf

Please sign in to comment.