From 9817632101ed940e556c8f7455453c29f8fb5202 Mon Sep 17 00:00:00 2001 From: maco Date: Thu, 9 May 2024 22:47:55 +0800 Subject: [PATCH 1/7] feat: limiting the size of query results to Dashboard --- src/servers/src/http.rs | 14 ++++++ src/servers/src/http/handler.rs | 42 +++++++++++++++- src/servers/tests/http/http_handler_test.rs | 55 +++++++++++++++++---- src/table/src/test_util/memtable.rs | 6 ++- 4 files changed, 104 insertions(+), 13 deletions(-) diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index 4ba4e56b088c..d42d16c27959 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -336,6 +336,20 @@ impl Display for Epoch { } } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum RequestSource { + Dashboard, +} + +impl RequestSource { + pub fn parse(s: &str) -> Option { + match s { + "dashboard" => Some(RequestSource::Dashboard), + _ => None, + } + } +} + #[derive(Serialize, Deserialize, Debug, JsonSchema)] pub enum HttpResponse { Arrow(ArrowResponse), diff --git a/src/servers/src/http/handler.rs b/src/servers/src/http/handler.rs index 8a16065df2c4..f53f79a8b93d 100644 --- a/src/servers/src/http/handler.rs +++ b/src/servers/src/http/handler.rs @@ -41,7 +41,7 @@ use crate::http::influxdb_result_v1::InfluxdbV1Response; use crate::http::table_result::TableResponse; use crate::http::{ ApiState, Epoch, GreptimeOptionsConfigState, GreptimeQueryOutput, HttpRecordsOutput, - HttpResponse, ResponseFormat, + HttpResponse, RequestSource, ResponseFormat, }; use crate::metrics_handler::MetricsHandler; use crate::query_handler::sql::ServerSqlQueryHandlerRef; @@ -62,6 +62,8 @@ pub struct SqlQuery { // specified time precision. Maybe greptimedb format can support this // param too. 
pub epoch: Option, + // request source, ["dashboard"] + pub source: Option, } /// Handler to execute sql @@ -93,6 +95,11 @@ pub async fn sql( .or(form_params.epoch) .map(|s| s.to_lowercase()) .map(|s| Epoch::parse(s.as_str()).unwrap_or(Epoch::Millisecond)); + let source = if let Some(source) = query_params.source { + RequestSource::parse(&source.to_lowercase()) + } else { + None + }; let result = if let Some(sql) = &sql { if let Some((status, msg)) = validate_schema(sql_handler.clone(), query_ctx.clone()).await { @@ -117,7 +124,7 @@ pub async fn sql( Ok(outputs) => outputs, }; - let resp = match format { + let mut resp = match format { ResponseFormat::Arrow => ArrowResponse::from_output(outputs).await, ResponseFormat::Csv => CsvResponse::from_output(outputs).await, ResponseFormat::Table => TableResponse::from_output(outputs).await, @@ -125,9 +132,40 @@ pub async fn sql( ResponseFormat::InfluxdbV1 => InfluxdbV1Response::from_output(outputs, epoch).await, }; + if let Some(RequestSource::Dashboard) = source { + resp = from_dashboard(resp).await; + } resp.with_execution_time(start.elapsed().as_millis() as u64) } +pub async fn from_dashboard(resp: HttpResponse) -> HttpResponse { + let HttpResponse::GreptimedbV1(response) = resp else { + return resp; + }; + let GreptimedbV1Response { + output, + execution_time_ms, + resp_metrics, + } = response; + + let mut outputs = Vec::with_capacity(output.len()); + for query_result in output { + let GreptimeQueryOutput::Records(mut records) = query_result else { + outputs.push(query_result); + continue; + }; + if records.rows.len() > 1000 { + records.rows.resize(1000, vec![]); + } + outputs.push(GreptimeQueryOutput::Records(records)); + } + HttpResponse::GreptimedbV1(GreptimedbV1Response { + output: outputs, + execution_time_ms, + resp_metrics, + }) +} + /// Create a response from query result pub async fn from_output( outputs: Vec>, diff --git a/src/servers/tests/http/http_handler_test.rs 
b/src/servers/tests/http/http_handler_test.rs index 0f0c8966ca5d..91b077d71fd5 100644 --- a/src/servers/tests/http/http_handler_test.rs +++ b/src/servers/tests/http/http_handler_test.rs @@ -19,10 +19,12 @@ use axum::extract::{Json, Query, RawBody, State}; use axum::http::header; use axum::response::IntoResponse; use axum::Form; +use common_telemetry::info; use headers::HeaderValue; use http_body::combinators::UnsyncBoxBody; use hyper::Response; use mime_guess::mime; +use servers::http::GreptimeQueryOutput::Records; use servers::http::{ handler as http_handler, script as script_handler, ApiState, GreptimeOptionsConfigState, GreptimeQueryOutput, HttpResponse, @@ -48,10 +50,8 @@ async fn test_sql_not_provided() { for format in ["greptimedb_v1", "influxdb_v1", "csv", "table"] { let query = http_handler::SqlQuery { - db: None, - sql: None, format: Some(format.to_string()), - epoch: None, + ..Default::default() }; let HttpResponse::Error(resp) = http_handler::sql( @@ -82,8 +82,9 @@ async fn test_sql_output_rows() { script_handler: None, }; + let query_sql = "select sum(uint32s) from numbers limit 20"; for format in ["greptimedb_v1", "influxdb_v1", "csv", "table"] { - let query = create_query(format); + let query = create_query(format, query_sql, None); let json = http_handler::sql( State(api_state.clone()), query, @@ -176,6 +177,41 @@ async fn test_sql_output_rows() { } } +#[tokio::test] +async fn test_dashboard_sql_limit() { + common_telemetry::init_default_ut_logging(); + + let sql_handler = create_testing_sql_query_handler(MemTable::specified_numbers_table(2000)); + let ctx = QueryContext::arc(); + ctx.set_current_user(Some(auth::userinfo_by_name(None))); + let api_state = ApiState { + sql_handler, + script_handler: None, + }; + let query = create_query( + "greptimedb_v1", + "select * from numbers", + Some("dashboard".to_string()), + ); + let json = http_handler::sql( + State(api_state.clone()), + query, + axum::Extension(ctx.clone()), + 
Form(http_handler::SqlQuery::default()), + ) + .await; + + if let HttpResponse::GreptimedbV1(resp) = json { + let output = resp.output().get(0).unwrap(); + match output { + Records(records) => { + assert_eq!(records.num_rows(), 1000); + } + _ => unreachable!(), + } + } +} + #[tokio::test] async fn test_sql_form() { common_telemetry::init_default_ut_logging(); @@ -484,21 +520,20 @@ fn create_invalid_script_query() -> Query { }) } -fn create_query(format: &str) -> Query<http_handler::SqlQuery> { +fn create_query(format: &str, sql: &str, source: Option<String>) -> Query<http_handler::SqlQuery> { Query(http_handler::SqlQuery { - sql: Some("select sum(uint32s) from numbers limit 20".to_string()), - db: None, + sql: Some(sql.to_string()), format: Some(format.to_string()), - epoch: None, + source, + ..Default::default() }) } fn create_form(format: &str) -> Form<http_handler::SqlQuery> { Form(http_handler::SqlQuery { sql: Some("select sum(uint32s) from numbers limit 20".to_string()), - db: None, format: Some(format.to_string()), - epoch: None, + ..Default::default() }) } diff --git a/src/table/src/test_util/memtable.rs b/src/table/src/test_util/memtable.rs index 22562fa1a719..737ef644f637 100644 --- a/src/table/src/test_util/memtable.rs +++ b/src/table/src/test_util/memtable.rs @@ -101,6 +101,10 @@ impl MemTable { /// Creates a 1 column 100 rows table, with table name "numbers", column name "uint32s" and /// column type "uint32". Column data increased from 0 to 100.
pub fn default_numbers_table() -> TableRef { + Self::specified_numbers_table(100) + } + + pub fn specified_numbers_table(rows: u32) -> TableRef { let column_schemas = vec![ColumnSchema::new( "uint32s", ConcreteDataType::uint32_datatype(), @@ -108,7 +112,7 @@ impl MemTable { )]; let schema = Arc::new(Schema::new(column_schemas)); let columns: Vec = vec![Arc::new(UInt32Vector::from_slice( - (0..100).collect::>(), + (0..rows).collect::>(), ))]; let recordbatch = RecordBatch::new(schema, columns).unwrap(); MemTable::table("numbers", recordbatch) From 59fd0e0aa307739a4534d02a79b7c1abf9cfccda Mon Sep 17 00:00:00 2001 From: maco Date: Fri, 10 May 2024 13:18:39 +0800 Subject: [PATCH 2/7] optimize code --- src/servers/src/http/handler.rs | 20 ++++++++++---------- src/servers/tests/http/http_handler_test.rs | 8 +++----- src/session/src/context.rs | 10 +++++++++- src/session/src/session_config.rs | 15 +++++++++++++++ 4 files changed, 37 insertions(+), 16 deletions(-) diff --git a/src/servers/src/http/handler.rs b/src/servers/src/http/handler.rs index f53f79a8b93d..552828ef607b 100644 --- a/src/servers/src/http/handler.rs +++ b/src/servers/src/http/handler.rs @@ -95,17 +95,16 @@ pub async fn sql( .or(form_params.epoch) .map(|s| s.to_lowercase()) .map(|s| Epoch::parse(s.as_str()).unwrap_or(Epoch::Millisecond)); - let source = if let Some(source) = query_params.source { - RequestSource::parse(&source.to_lowercase()) - } else { - None - }; + let source = query_params + .source + .map(|s| RequestSource::parse(s.to_lowercase().as_str())) + .unwrap_or(None); let result = if let Some(sql) = &sql { if let Some((status, msg)) = validate_schema(sql_handler.clone(), query_ctx.clone()).await { Err((status, msg)) } else { - Ok(sql_handler.do_query(sql, query_ctx).await) + Ok(sql_handler.do_query(sql, query_ctx.clone()).await) } } else { Err(( @@ -133,12 +132,12 @@ pub async fn sql( }; if let Some(RequestSource::Dashboard) = source { - resp = from_dashboard(resp).await; + resp = 
from_dashboard(resp, query_ctx).await; } resp.with_execution_time(start.elapsed().as_millis() as u64) } -pub async fn from_dashboard(resp: HttpResponse) -> HttpResponse { +pub async fn from_dashboard(resp: HttpResponse, ctx: QueryContextRef) -> HttpResponse { let HttpResponse::GreptimedbV1(response) = resp else { return resp; }; @@ -147,6 +146,7 @@ pub async fn from_dashboard(resp: HttpResponse) -> HttpResponse { execution_time_ms, resp_metrics, } = response; + let query_limit = ctx.configuration_parameter().dashboard_query_limit(); let mut outputs = Vec::with_capacity(output.len()); for query_result in output { @@ -154,8 +154,8 @@ pub async fn from_dashboard(resp: HttpResponse) -> HttpResponse { outputs.push(query_result); continue; }; - if records.rows.len() > 1000 { - records.rows.resize(1000, vec![]); + if records.rows.len() > query_limit { + records.rows.resize(query_limit, vec![]); } outputs.push(GreptimeQueryOutput::Records(records)); } diff --git a/src/servers/tests/http/http_handler_test.rs b/src/servers/tests/http/http_handler_test.rs index 91b077d71fd5..6f22ea3d8431 100644 --- a/src/servers/tests/http/http_handler_test.rs +++ b/src/servers/tests/http/http_handler_test.rs @@ -19,7 +19,6 @@ use axum::extract::{Json, Query, RawBody, State}; use axum::http::header; use axum::response::IntoResponse; use axum::Form; -use common_telemetry::info; use headers::HeaderValue; use http_body::combinators::UnsyncBoxBody; use hyper::Response; @@ -179,8 +178,6 @@ async fn test_sql_output_rows() { #[tokio::test] async fn test_dashboard_sql_limit() { - common_telemetry::init_default_ut_logging(); - let sql_handler = create_testing_sql_query_handler(MemTable::specified_numbers_table(2000)); let ctx = QueryContext::arc(); ctx.set_current_user(Some(auth::userinfo_by_name(None))); @@ -202,10 +199,11 @@ async fn test_dashboard_sql_limit() { .await; if let HttpResponse::GreptimedbV1(resp) = json { - let output = resp.output().get(0).unwrap(); + let output = 
resp.output().first().unwrap(); + let query_limit = ctx.configuration_parameter().dashboard_query_limit(); match output { Records(records) => { - assert_eq!(records.num_rows(), 1000); + assert_eq!(records.num_rows(), query_limit); } _ => unreachable!(), } diff --git a/src/session/src/context.rs b/src/session/src/context.rs index 592b56b2fb3a..35bf2614f435 100644 --- a/src/session/src/context.rs +++ b/src/session/src/context.rs @@ -27,7 +27,9 @@ use common_time::Timezone; use derive_builder::Builder; use sql::dialect::{Dialect, GreptimeDbDialect, MySqlDialect, PostgreSqlDialect}; -use crate::session_config::{PGByteaOutputValue, PGDateOrder, PGDateTimeStyle}; +use crate::session_config::{ + DashboardQueryLimit, PGByteaOutputValue, PGDateOrder, PGDateTimeStyle, +}; use crate::SessionRef; pub type QueryContextRef = Arc; @@ -327,6 +329,7 @@ impl Display for Channel { pub struct ConfigurationVariables { postgres_bytea_output: ArcSwap, pg_datestyle_format: ArcSwap<(PGDateTimeStyle, PGDateOrder)>, + dashboard_query_limit: ArcSwap, } impl Clone for ConfigurationVariables { @@ -334,6 +337,7 @@ impl Clone for ConfigurationVariables { Self { postgres_bytea_output: ArcSwap::new(self.postgres_bytea_output.load().clone()), pg_datestyle_format: ArcSwap::new(self.pg_datestyle_format.load().clone()), + dashboard_query_limit: ArcSwap::new(self.dashboard_query_limit.load().clone()), } } } @@ -358,6 +362,10 @@ impl ConfigurationVariables { pub fn set_pg_datetime_style(&self, style: PGDateTimeStyle, order: PGDateOrder) { self.pg_datestyle_format.swap(Arc::new((style, order))); } + + pub fn dashboard_query_limit(&self) -> usize { + self.dashboard_query_limit.load().get() + } } #[cfg(test)] diff --git a/src/session/src/session_config.rs b/src/session/src/session_config.rs index 8b93ce2a2cc9..b8596ee265a9 100644 --- a/src/session/src/session_config.rs +++ b/src/session/src/session_config.rs @@ -176,3 +176,18 @@ impl TryFrom<&Value> for PGDateTimeStyle { } } } + +#[derive(Copy, Clone, 
Debug)] +pub struct DashboardQueryLimit(usize); + +impl Default for DashboardQueryLimit { + fn default() -> Self { + Self(1000) + } +} + +impl DashboardQueryLimit { + pub fn get(&self) -> usize { + self.0 + } +} From fdc410eac08c99d7ab303653af5f3becbdce2fd5 Mon Sep 17 00:00:00 2001 From: maco Date: Sat, 11 May 2024 17:47:12 +0800 Subject: [PATCH 3/7] fix by cr --- src/servers/src/http.rs | 15 ++++++++++++--- src/servers/src/http/handler.rs | 6 +++--- src/servers/tests/http/http_handler_test.rs | 12 ++++++++---- 3 files changed, 23 insertions(+), 10 deletions(-) diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index d42d16c27959..7fa8509e71c5 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -15,6 +15,7 @@ use std::collections::HashMap; use std::fmt::Display; use std::net::SocketAddr; +use std::str::FromStr; use std::sync::Mutex as StdMutex; use std::time::Duration; @@ -41,6 +42,7 @@ use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use serde_json::Value; use snafu::{ensure, ResultExt}; +use strum::EnumString; use tokio::sync::oneshot::{self, Sender}; use tokio::sync::Mutex; use tower::timeout::TimeoutLayer; @@ -190,6 +192,10 @@ impl From for OutputSchema { pub struct HttpRecordsOutput { schema: OutputSchema, rows: Vec>, + // total_rows is equal to rows.len() in most cases, + // the Dashboard query result may be truncated, so we need to return the total_rows. 
+ #[serde(default)] + total_rows: usize, // plan level execution metrics #[serde(skip_serializing_if = "HashMap::is_empty")] @@ -224,6 +230,7 @@ impl HttpRecordsOutput { Ok(HttpRecordsOutput { schema: OutputSchema::from(schema), rows: vec![], + total_rows: 0, metrics: Default::default(), }) } else { @@ -244,6 +251,7 @@ impl HttpRecordsOutput { Ok(HttpRecordsOutput { schema: OutputSchema::from(schema), + total_rows: rows.len(), rows, metrics: Default::default(), }) @@ -336,15 +344,16 @@ impl Display for Epoch { } } -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, EnumString)] pub enum RequestSource { + #[strum(ascii_case_insensitive)] Dashboard, } impl RequestSource { pub fn parse(s: &str) -> Option { - match s { - "dashboard" => Some(RequestSource::Dashboard), + match RequestSource::from_str(s) { + Ok(v) => Some(v), _ => None, } } diff --git a/src/servers/src/http/handler.rs b/src/servers/src/http/handler.rs index 552828ef607b..8d342714b530 100644 --- a/src/servers/src/http/handler.rs +++ b/src/servers/src/http/handler.rs @@ -97,8 +97,7 @@ pub async fn sql( .map(|s| Epoch::parse(s.as_str()).unwrap_or(Epoch::Millisecond)); let source = query_params .source - .map(|s| RequestSource::parse(s.to_lowercase().as_str())) - .unwrap_or(None); + .and_then(|s| RequestSource::parse(s.as_str())); let result = if let Some(sql) = &sql { if let Some((status, msg)) = validate_schema(sql_handler.clone(), query_ctx.clone()).await { @@ -155,7 +154,8 @@ pub async fn from_dashboard(resp: HttpResponse, ctx: QueryContextRef) -> HttpRes continue; }; if records.rows.len() > query_limit { - records.rows.resize(query_limit, vec![]); + records.rows.truncate(query_limit); + records.total_rows = query_limit; } outputs.push(GreptimeQueryOutput::Records(records)); } diff --git a/src/servers/tests/http/http_handler_test.rs b/src/servers/tests/http/http_handler_test.rs index 6f22ea3d8431..1eab725d2a5e 100644 --- a/src/servers/tests/http/http_handler_test.rs 
+++ b/src/servers/tests/http/http_handler_test.rs @@ -112,7 +112,8 @@ async fn test_sql_output_rows() { [ 4950 ] - ] + ], + "total_rows": 1 }"# ); } @@ -253,7 +254,8 @@ async fn test_sql_form() { [ 4950 ] - ] + ], + "total_rows": 1 }"# ); } @@ -427,7 +429,8 @@ def test(n) -> vector[i64]: [ 4 ] - ] + ], + "total_rows": 5 }"# ); } @@ -494,7 +497,8 @@ def test(n, **params) -> vector[i64]: [ 46 ] - ] + ], + "total_rows": 5 }"# ); } From 73da54784e9deb085bfc67a938687619ce94b1fb Mon Sep 17 00:00:00 2001 From: maco Date: Sat, 11 May 2024 20:55:57 +0800 Subject: [PATCH 4/7] fix integration tests error --- tests-integration/tests/http.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 74ce4f6add1d..14c3e8cac265 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -147,7 +147,7 @@ pub async fn test_sql_api(store_type: StorageType) { assert_eq!( output[0], serde_json::from_value::(json!({ - "records" :{"schema":{"column_schemas":[{"name":"number","data_type":"UInt32"}]},"rows":[[0],[1],[2],[3],[4],[5],[6],[7],[8],[9]]} + "records" :{"schema":{"column_schemas":[{"name":"number","data_type":"UInt32"}]},"rows":[[0],[1],[2],[3],[4],[5],[6],[7],[8],[9]],"total_rows":10} })).unwrap() ); @@ -189,7 +189,7 @@ pub async fn test_sql_api(store_type: StorageType) { assert_eq!( output[0], serde_json::from_value::(json!({ - "records":{"schema":{"column_schemas":[{"name":"host","data_type":"String"},{"name":"cpu","data_type":"Float64"},{"name":"memory","data_type":"Float64"},{"name":"ts","data_type":"TimestampMillisecond"}]},"rows":[["host",66.6,1024.0,0]]} + "records":{"schema":{"column_schemas":[{"name":"host","data_type":"String"},{"name":"cpu","data_type":"Float64"},{"name":"memory","data_type":"Float64"},{"name":"ts","data_type":"TimestampMillisecond"}]},"rows":[["host",66.6,1024.0,0]],"total_rows":1} })).unwrap() ); @@ -207,7 +207,7 @@ pub async 
fn test_sql_api(store_type: StorageType) { assert_eq!( output[0], serde_json::from_value::(json!({ - "records":{"schema":{"column_schemas":[{"name":"cpu","data_type":"Float64"},{"name":"ts","data_type":"TimestampMillisecond"}]},"rows":[[66.6,0]]} + "records":{"schema":{"column_schemas":[{"name":"cpu","data_type":"Float64"},{"name":"ts","data_type":"TimestampMillisecond"}]},"rows":[[66.6,0]],"total_rows":1} })).unwrap() ); @@ -224,7 +224,7 @@ pub async fn test_sql_api(store_type: StorageType) { assert_eq!( output[0], serde_json::from_value::(json!({ - "records":{"schema":{"column_schemas":[{"name":"c","data_type":"Float64"},{"name":"time","data_type":"TimestampMillisecond"}]},"rows":[[66.6,0]]} + "records":{"schema":{"column_schemas":[{"name":"c","data_type":"Float64"},{"name":"time","data_type":"TimestampMillisecond"}]},"rows":[[66.6,0]],"total_rows":1} })).unwrap() ); @@ -241,13 +241,13 @@ pub async fn test_sql_api(store_type: StorageType) { assert_eq!( outputs[0], serde_json::from_value::(json!({ - "records":{"schema":{"column_schemas":[{"name":"cpu","data_type":"Float64"},{"name":"ts","data_type":"TimestampMillisecond"}]},"rows":[[66.6,0]]} + "records":{"schema":{"column_schemas":[{"name":"cpu","data_type":"Float64"},{"name":"ts","data_type":"TimestampMillisecond"}]},"rows":[[66.6,0]],"total_rows":1} })).unwrap() ); assert_eq!( outputs[1], serde_json::from_value::(json!({ - "records":{"rows":[], "schema":{"column_schemas":[{"name":"cpu","data_type":"Float64"},{"name":"ts","data_type":"TimestampMillisecond"}]}} + "records":{"rows":[], "schema":{"column_schemas":[{"name":"cpu","data_type":"Float64"},{"name":"ts","data_type":"TimestampMillisecond"}]}, "total_rows":0} })) .unwrap() ); @@ -276,7 +276,7 @@ pub async fn test_sql_api(store_type: StorageType) { assert_eq!( outputs[0], serde_json::from_value::(json!({ - "records":{"schema":{"column_schemas":[{"name":"cpu","data_type":"Float64"},{"name":"ts","data_type":"TimestampMillisecond"}]},"rows":[[66.6,0]]} + 
"records":{"schema":{"column_schemas":[{"name":"cpu","data_type":"Float64"},{"name":"ts","data_type":"TimestampMillisecond"}]},"rows":[[66.6,0]],"total_rows":1} })).unwrap() ); @@ -302,7 +302,7 @@ pub async fn test_sql_api(store_type: StorageType) { assert_eq!( outputs[0], serde_json::from_value::(json!({ - "records":{"schema":{"column_schemas":[{"name":"cpu","data_type":"Float64"},{"name":"ts","data_type":"TimestampMillisecond"}]},"rows":[[66.6,0]]} + "records":{"schema":{"column_schemas":[{"name":"cpu","data_type":"Float64"},{"name":"ts","data_type":"TimestampMillisecond"}]},"rows":[[66.6,0]],"total_rows":1} })).unwrap() ); @@ -673,7 +673,7 @@ def test(n) -> vector[f64]: assert_eq!( output[0], serde_json::from_value::(json!({ - "records":{"schema":{"column_schemas":[{"name":"n","data_type":"Float64"}]},"rows":[[1.0],[2.0],[3.0],[4.0],[5.0],[6.0],[7.0],[8.0],[9.0],[10.0]]} + "records":{"schema":{"column_schemas":[{"name":"n","data_type":"Float64"}]},"rows":[[1.0],[2.0],[3.0],[4.0],[5.0],[6.0],[7.0],[8.0],[9.0],[10.0]],"total_rows": 10} })).unwrap() ); From 98b9d63b64cdf8a963ec0be3a5eb52d05835b485 Mon Sep 17 00:00:00 2001 From: maco Date: Sun, 12 May 2024 22:21:03 +0800 Subject: [PATCH 5/7] remove RequestSource::parse --- src/servers/src/http.rs | 10 ---------- src/servers/src/http/handler.rs | 3 ++- 2 files changed, 2 insertions(+), 11 deletions(-) diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index 7fa8509e71c5..17bbb112f108 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -15,7 +15,6 @@ use std::collections::HashMap; use std::fmt::Display; use std::net::SocketAddr; -use std::str::FromStr; use std::sync::Mutex as StdMutex; use std::time::Duration; @@ -350,15 +349,6 @@ pub enum RequestSource { Dashboard, } -impl RequestSource { - pub fn parse(s: &str) -> Option { - match RequestSource::from_str(s) { - Ok(v) => Some(v), - _ => None, - } - } -} - #[derive(Serialize, Deserialize, Debug, JsonSchema)] pub enum HttpResponse { 
Arrow(ArrowResponse), diff --git a/src/servers/src/http/handler.rs b/src/servers/src/http/handler.rs index 8d342714b530..214e077a9acb 100644 --- a/src/servers/src/http/handler.rs +++ b/src/servers/src/http/handler.rs @@ -14,6 +14,7 @@ use std::collections::HashMap; use std::env; +use std::str::FromStr; use std::time::Instant; use aide::transform::TransformOperation; @@ -97,7 +98,7 @@ pub async fn sql( .map(|s| Epoch::parse(s.as_str()).unwrap_or(Epoch::Millisecond)); let source = query_params .source - .and_then(|s| RequestSource::parse(s.as_str())); + .and_then(|s| RequestSource::from_str(s.as_str()).ok()); let result = if let Some(sql) = &sql { if let Some((status, msg)) = validate_schema(sql_handler.clone(), query_ctx.clone()).await { From 01f81d19f25d26e785b49d87cd7e7a2667a6bdd9 Mon Sep 17 00:00:00 2001 From: maco Date: Mon, 13 May 2024 19:13:24 +0800 Subject: [PATCH 6/7] refactor: sql query params --- src/servers/src/http.rs | 35 +++++++++++++---- src/servers/src/http/csv_result.rs | 6 +++ src/servers/src/http/greptime_result_v1.rs | 6 +++ src/servers/src/http/handler.rs | 43 ++------------------- src/servers/src/http/table_result.rs | 6 +++ src/servers/tests/http/http_handler_test.rs | 9 ++--- src/session/src/context.rs | 10 +---- src/session/src/session_config.rs | 15 ------- 8 files changed, 55 insertions(+), 75 deletions(-) diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index 17bbb112f108..a06b8c6f4b20 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -41,7 +41,6 @@ use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use serde_json::Value; use snafu::{ensure, ResultExt}; -use strum::EnumString; use tokio::sync::oneshot::{self, Sender}; use tokio::sync::Mutex; use tower::timeout::TimeoutLayer; @@ -343,12 +342,6 @@ impl Display for Epoch { } } -#[derive(Debug, Clone, Copy, PartialEq, Eq, EnumString)] -pub enum RequestSource { - #[strum(ascii_case_insensitive)] - Dashboard, -} - #[derive(Serialize, Deserialize, 
Debug, JsonSchema)] pub enum HttpResponse { Arrow(ArrowResponse), @@ -370,6 +363,34 @@ impl HttpResponse { HttpResponse::Error(resp) => resp.with_execution_time(execution_time).into(), } } + + pub fn with_limit(self, limit: usize) -> Self { + match self { + HttpResponse::Csv(resp) => resp.with_limit(limit).into(), + HttpResponse::Table(resp) => resp.with_limit(limit).into(), + HttpResponse::GreptimedbV1(resp) => resp.with_limit(limit).into(), + _ => self, + } + } +} + +pub fn process_with_limit( + mut outputs: Vec<GreptimeQueryOutput>, + limit: usize, +) -> Vec<GreptimeQueryOutput> { + outputs + .drain(..) + .map(|data| match data { + GreptimeQueryOutput::Records(mut records) => { + if records.rows.len() > limit { + records.rows.truncate(limit); + records.total_rows = limit; + } + GreptimeQueryOutput::Records(records) + } + _ => data, + }) + .collect() } impl IntoResponse for HttpResponse { diff --git a/src/servers/src/http/csv_result.rs b/src/servers/src/http/csv_result.rs index ad89ac21b7e4..d6b512653bde 100644 --- a/src/servers/src/http/csv_result.rs +++ b/src/servers/src/http/csv_result.rs @@ -23,6 +23,7 @@ use mime_guess::mime; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; +use super::process_with_limit; use crate::http::error_result::ErrorResponse; use crate::http::header::{GREPTIME_DB_HEADER_EXECUTION_TIME, GREPTIME_DB_HEADER_FORMAT}; use crate::http::{handler, GreptimeQueryOutput, HttpResponse, ResponseFormat}; @@ -65,6 +66,11 @@ impl CsvResponse { pub fn execution_time_ms(&self) -> u64 { self.execution_time_ms } + + pub fn with_limit(mut self, limit: usize) -> Self { + self.output = process_with_limit(self.output, limit); + self + } } impl IntoResponse for CsvResponse { diff --git a/src/servers/src/http/greptime_result_v1.rs b/src/servers/src/http/greptime_result_v1.rs index ee87a5dca639..9cb2924ba689 100644 --- a/src/servers/src/http/greptime_result_v1.rs +++ b/src/servers/src/http/greptime_result_v1.rs @@ -23,6 +23,7 @@ use serde::{Deserialize, Serialize}; use serde_json::Value;
use super::header::GREPTIME_DB_HEADER_METRICS; +use super::process_with_limit; use crate::http::header::{GREPTIME_DB_HEADER_EXECUTION_TIME, GREPTIME_DB_HEADER_FORMAT}; use crate::http::{handler, GreptimeQueryOutput, HttpResponse, ResponseFormat}; @@ -62,6 +63,11 @@ impl GreptimedbV1Response { pub fn execution_time_ms(&self) -> u64 { self.execution_time_ms } + + pub fn with_limit(mut self, limit: usize) -> Self { + self.output = process_with_limit(self.output, limit); + self + } } impl IntoResponse for GreptimedbV1Response { diff --git a/src/servers/src/http/handler.rs b/src/servers/src/http/handler.rs index 214e077a9acb..fa8fe98e4cf1 100644 --- a/src/servers/src/http/handler.rs +++ b/src/servers/src/http/handler.rs @@ -14,7 +14,6 @@ use std::collections::HashMap; use std::env; -use std::str::FromStr; use std::time::Instant; use aide::transform::TransformOperation; @@ -42,7 +41,7 @@ use crate::http::influxdb_result_v1::InfluxdbV1Response; use crate::http::table_result::TableResponse; use crate::http::{ ApiState, Epoch, GreptimeOptionsConfigState, GreptimeQueryOutput, HttpRecordsOutput, - HttpResponse, RequestSource, ResponseFormat, + HttpResponse, ResponseFormat, }; use crate::metrics_handler::MetricsHandler; use crate::query_handler::sql::ServerSqlQueryHandlerRef; @@ -63,8 +62,7 @@ pub struct SqlQuery { // specified time precision. Maybe greptimedb format can support this // param too. 
pub epoch: Option, - // request source, ["dashboard"] - pub source: Option, + pub limit: Option, } /// Handler to execute sql @@ -96,9 +94,6 @@ pub async fn sql( .or(form_params.epoch) .map(|s| s.to_lowercase()) .map(|s| Epoch::parse(s.as_str()).unwrap_or(Epoch::Millisecond)); - let source = query_params - .source - .and_then(|s| RequestSource::from_str(s.as_str()).ok()); let result = if let Some(sql) = &sql { if let Some((status, msg)) = validate_schema(sql_handler.clone(), query_ctx.clone()).await { @@ -131,42 +126,12 @@ pub async fn sql( ResponseFormat::InfluxdbV1 => InfluxdbV1Response::from_output(outputs, epoch).await, }; - if let Some(RequestSource::Dashboard) = source { - resp = from_dashboard(resp, query_ctx).await; + if let Some(limit) = query_params.limit { + resp = resp.with_limit(limit); } resp.with_execution_time(start.elapsed().as_millis() as u64) } -pub async fn from_dashboard(resp: HttpResponse, ctx: QueryContextRef) -> HttpResponse { - let HttpResponse::GreptimedbV1(response) = resp else { - return resp; - }; - let GreptimedbV1Response { - output, - execution_time_ms, - resp_metrics, - } = response; - let query_limit = ctx.configuration_parameter().dashboard_query_limit(); - - let mut outputs = Vec::with_capacity(output.len()); - for query_result in output { - let GreptimeQueryOutput::Records(mut records) = query_result else { - outputs.push(query_result); - continue; - }; - if records.rows.len() > query_limit { - records.rows.truncate(query_limit); - records.total_rows = query_limit; - } - outputs.push(GreptimeQueryOutput::Records(records)); - } - HttpResponse::GreptimedbV1(GreptimedbV1Response { - output: outputs, - execution_time_ms, - resp_metrics, - }) -} - /// Create a response from query result pub async fn from_output( outputs: Vec>, diff --git a/src/servers/src/http/table_result.rs b/src/servers/src/http/table_result.rs index a7fac46e89a7..dacef51beace 100644 --- a/src/servers/src/http/table_result.rs +++ 
b/src/servers/src/http/table_result.rs @@ -24,6 +24,7 @@ use mime_guess::mime; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; +use super::process_with_limit; use crate::http::error_result::ErrorResponse; use crate::http::header::{GREPTIME_DB_HEADER_EXECUTION_TIME, GREPTIME_DB_HEADER_FORMAT}; use crate::http::{handler, GreptimeQueryOutput, HttpResponse, ResponseFormat}; @@ -66,6 +67,11 @@ impl TableResponse { pub fn execution_time_ms(&self) -> u64 { self.execution_time_ms } + + pub fn with_limit(mut self, limit: usize) -> Self { + self.output = process_with_limit(self.output, limit); + self + } } impl Display for TableResponse { diff --git a/src/servers/tests/http/http_handler_test.rs b/src/servers/tests/http/http_handler_test.rs index 1eab725d2a5e..7bbea044306f 100644 --- a/src/servers/tests/http/http_handler_test.rs +++ b/src/servers/tests/http/http_handler_test.rs @@ -189,7 +189,7 @@ async fn test_dashboard_sql_limit() { let query = create_query( "greptimedb_v1", "select * from numbers", - Some("dashboard".to_string()), + Some(1000), ); let json = http_handler::sql( State(api_state.clone()), @@ -201,10 +201,9 @@ async fn test_dashboard_sql_limit() { if let HttpResponse::GreptimedbV1(resp) = json { let output = resp.output().first().unwrap(); - let query_limit = ctx.configuration_parameter().dashboard_query_limit(); match output { Records(records) => { - assert_eq!(records.num_rows(), query_limit); + assert_eq!(records.num_rows(), 1000); } _ => unreachable!(), } @@ -522,11 +521,11 @@ fn create_invalid_script_query() -> Query { }) } -fn create_query(format: &str, sql: &str, source: Option<String>) -> Query<http_handler::SqlQuery> { +fn create_query(format: &str, sql: &str, limit: Option<usize>) -> Query<http_handler::SqlQuery> { Query(http_handler::SqlQuery { sql: Some(sql.to_string()), format: Some(format.to_string()), - source, + limit, ..Default::default() }) } diff --git a/src/session/src/context.rs b/src/session/src/context.rs index 35bf2614f435..592b56b2fb3a 100644 --- a/src/session/src/context.rs +++
b/src/session/src/context.rs @@ -27,9 +27,7 @@ use common_time::Timezone; use derive_builder::Builder; use sql::dialect::{Dialect, GreptimeDbDialect, MySqlDialect, PostgreSqlDialect}; -use crate::session_config::{ - DashboardQueryLimit, PGByteaOutputValue, PGDateOrder, PGDateTimeStyle, -}; +use crate::session_config::{PGByteaOutputValue, PGDateOrder, PGDateTimeStyle}; use crate::SessionRef; pub type QueryContextRef = Arc; @@ -329,7 +327,6 @@ impl Display for Channel { pub struct ConfigurationVariables { postgres_bytea_output: ArcSwap, pg_datestyle_format: ArcSwap<(PGDateTimeStyle, PGDateOrder)>, - dashboard_query_limit: ArcSwap, } impl Clone for ConfigurationVariables { @@ -337,7 +334,6 @@ impl Clone for ConfigurationVariables { Self { postgres_bytea_output: ArcSwap::new(self.postgres_bytea_output.load().clone()), pg_datestyle_format: ArcSwap::new(self.pg_datestyle_format.load().clone()), - dashboard_query_limit: ArcSwap::new(self.dashboard_query_limit.load().clone()), } } } @@ -362,10 +358,6 @@ impl ConfigurationVariables { pub fn set_pg_datetime_style(&self, style: PGDateTimeStyle, order: PGDateOrder) { self.pg_datestyle_format.swap(Arc::new((style, order))); } - - pub fn dashboard_query_limit(&self) -> usize { - self.dashboard_query_limit.load().get() - } } #[cfg(test)] diff --git a/src/session/src/session_config.rs b/src/session/src/session_config.rs index b8596ee265a9..8b93ce2a2cc9 100644 --- a/src/session/src/session_config.rs +++ b/src/session/src/session_config.rs @@ -176,18 +176,3 @@ impl TryFrom<&Value> for PGDateTimeStyle { } } } - -#[derive(Copy, Clone, Debug)] -pub struct DashboardQueryLimit(usize); - -impl Default for DashboardQueryLimit { - fn default() -> Self { - Self(1000) - } -} - -impl DashboardQueryLimit { - pub fn get(&self) -> usize { - self.0 - } -} From cda031a3e45ae45ef8eca7a5752159ad1b16817d Mon Sep 17 00:00:00 2001 From: maco Date: Mon, 13 May 2024 19:51:16 +0800 Subject: [PATCH 7/7] fix: unit test --- 
src/servers/tests/http/http_handler_test.rs | 46 +++++++++++++-------- 1 file changed, 28 insertions(+), 18 deletions(-) diff --git a/src/servers/tests/http/http_handler_test.rs b/src/servers/tests/http/http_handler_test.rs index 7bbea044306f..cb4a5e8ada9f 100644 --- a/src/servers/tests/http/http_handler_test.rs +++ b/src/servers/tests/http/http_handler_test.rs @@ -186,25 +186,35 @@ async fn test_dashboard_sql_limit() { sql_handler, script_handler: None, }; - let query = create_query( - "greptimedb_v1", - "select * from numbers", - Some(1000), - ); - let json = http_handler::sql( - State(api_state.clone()), - query, - axum::Extension(ctx.clone()), - Form(http_handler::SqlQuery::default()), - ) - .await; + for format in ["greptimedb_v1", "csv", "table"] { + let query = create_query(format, "select * from numbers", Some(1000)); + let sql_response = http_handler::sql( + State(api_state.clone()), + query, + axum::Extension(ctx.clone()), + Form(http_handler::SqlQuery::default()), + ) + .await; - if let HttpResponse::GreptimedbV1(resp) = json { - let output = resp.output().first().unwrap(); - match output { - Records(records) => { - assert_eq!(records.num_rows(), 1000); - } + match sql_response { + HttpResponse::GreptimedbV1(resp) => match resp.output().first().unwrap() { + Records(records) => { + assert_eq!(records.num_rows(), 1000); + } + _ => unreachable!(), + }, + HttpResponse::Csv(resp) => match resp.output().first().unwrap() { + Records(records) => { + assert_eq!(records.num_rows(), 1000); + } + _ => unreachable!(), + }, + HttpResponse::Table(resp) => match resp.output().first().unwrap() { + Records(records) => { + assert_eq!(records.num_rows(), 1000); + } + _ => unreachable!(), + }, _ => unreachable!(), } }