diff --git a/packages/cubejs-backend-native/src/stream.rs b/packages/cubejs-backend-native/src/stream.rs index c05807e1a992d..9c7c45f2dcbbb 100644 --- a/packages/cubejs-backend-native/src/stream.rs +++ b/packages/cubejs-backend-native/src/stream.rs @@ -1,6 +1,7 @@ use cubesql::compile::engine::df::scan::{ transform_response, FieldValue, MemberField, RecordBatch, SchemaRef, ValueObject, }; +use std::borrow::Cow; use std::cell::RefCell; use std::future::Future; @@ -211,7 +212,7 @@ impl ValueObject for JsValueObject<'_> { CubeError::user(format!("Can't get '{}' field value: {}", field_name, e)) })?; if let Ok(s) = value.downcast::(&mut self.cx) { - Ok(FieldValue::String(s.value(&mut self.cx))) + Ok(FieldValue::String(Cow::Owned(s.value(&mut self.cx)))) } else if let Ok(n) = value.downcast::(&mut self.cx) { Ok(FieldValue::Number(n.value(&mut self.cx))) } else if let Ok(b) = value.downcast::(&mut self.cx) { diff --git a/rust/cubesql/cubesql/src/compile/engine/df/scan.rs b/rust/cubesql/cubesql/src/compile/engine/df/scan.rs index 73e3baab72d14..27dd997793e27 100644 --- a/rust/cubesql/cubesql/src/compile/engine/df/scan.rs +++ b/rust/cubesql/cubesql/src/compile/engine/df/scan.rs @@ -1,10 +1,3 @@ -use std::{ - any::Any, - fmt, - sync::Arc, - task::{Context, Poll}, -}; - use async_trait::async_trait; use cubeclient::models::{V1LoadRequestQuery, V1LoadResult, V1LoadResultAnnotation}; pub use datafusion::{ @@ -27,6 +20,13 @@ pub use datafusion::{ }; use futures::Stream; use log::warn; +use std::{ + any::Any, + borrow::Cow, + fmt, + sync::Arc, + task::{Context, Poll}, +}; use crate::{ compile::{ @@ -446,8 +446,12 @@ struct CubeScanExecutionPlan { } #[derive(Debug)] -pub enum FieldValue { - String(String), +pub enum FieldValue<'a> { + // Why Cow? + // We use N-API via Neon (only for streaming), which doesn't allow us to build string reference, + // because V8 uses UTF-16 It allocates/converts a new strings while doing JsString.value() + // @see v8 WriteUtf8 for more details. Cow::Owned is used for this variant + String(Cow<'a, str>), Number(f64), Bool(bool), Null, @@ -475,31 +479,26 @@ impl ValueObject for JsonValueObject { Ok(self.rows.len()) } - fn get<'a>( - &'a mut self, + fn get( + &mut self, index: usize, field_name: &str, ) -> std::result::Result { - let option = self.rows[index].as_object_mut(); - let as_object = if let Some(as_object) = option { - as_object - } else { + let Some(as_object) = self.rows[index].as_object() else { return Err(CubeError::user(format!( "Unexpected response from Cube, row is not an object: {:?}", self.rows[index] ))); }; - let value = as_object - .get(field_name) - .unwrap_or(&Value::Null) - // TODO expose strings as references to avoid clonning - .clone(); + + let value = as_object.get(field_name).unwrap_or(&Value::Null); + Ok(match value { - Value::String(s) => FieldValue::String(s), + Value::String(s) => FieldValue::String(Cow::Borrowed(s)), Value::Number(n) => FieldValue::Number(n.as_f64().ok_or( DataFusionError::Execution(format!("Can't convert {:?} to float", n)), )?), - Value::Bool(b) => FieldValue::Bool(b), + Value::Bool(b) => FieldValue::Bool(*b), Value::Null => FieldValue::Null, x => { return Err(CubeError::user(format!( @@ -851,12 +850,15 @@ async fn load_data( .map(|v| v.iter().filter(|d| d.granularity.is_some()).count()) .unwrap_or(0) == 0; + let result = if no_members_query { let limit = request.limit.unwrap_or(1); let mut data = Vec::new(); + for _ in 0..limit { data.push(serde_json::Value::Null) } + V1LoadResult::new( V1LoadResultAnnotation { measures: json!(Vec::::new()), @@ -881,9 +883,9 @@ async fn load_data( data } else { - return Err(ArrowError::ComputeError(format!( - "Unable to extract result from Cube.js response", - ))); + return Err(ArrowError::ComputeError( + "Unable to extract results from response: results is empty".to_string(), + )); } }; @@ -1011,7 +1013,7 @@ pub fn transform_response( field_name, { (FieldValue::Number(number), builder) => builder.append_value(number.round() as i64)?, - (FieldValue::String(s), builder) => match s.parse::() { + (FieldValue::String(s), builder) => match s.parse::() { Ok(v) => builder.append_value(v)?, Err(error) => { warn!( @@ -1086,7 +1088,7 @@ pub fn transform_response( field_name, { (FieldValue::Bool(v), builder) => builder.append_value(v)?, - (FieldValue::String(v), builder) => match v.as_str() { + (FieldValue::String(v), builder) => match v.as_ref() { "true" | "1" => builder.append_value(true)?, "false" | "0" => builder.append_value(false)?, _ => { @@ -1109,12 +1111,12 @@ pub fn transform_response( field_name, { (FieldValue::String(s), builder) => { - let timestamp = NaiveDateTime::parse_from_str(s.as_str(), "%Y-%m-%dT%H:%M:%S%.f") - .or_else(|_| NaiveDateTime::parse_from_str(s.as_str(), "%Y-%m-%d %H:%M:%S%.f")) - .or_else(|_| NaiveDateTime::parse_from_str(s.as_str(), "%Y-%m-%dT%H:%M:%S")) - .or_else(|_| NaiveDateTime::parse_from_str(s.as_str(), "%Y-%m-%dT%H:%M:%S%.fZ")) + let timestamp = NaiveDateTime::parse_from_str(s.as_ref(), "%Y-%m-%dT%H:%M:%S%.f") + .or_else(|_| NaiveDateTime::parse_from_str(s.as_ref(), "%Y-%m-%d %H:%M:%S%.f")) + .or_else(|_| NaiveDateTime::parse_from_str(s.as_ref(), "%Y-%m-%dT%H:%M:%S")) + .or_else(|_| NaiveDateTime::parse_from_str(s.as_ref(), "%Y-%m-%dT%H:%M:%S%.fZ")) .or_else(|_| { - NaiveDate::parse_from_str(s.as_str(), "%Y-%m-%d").map(|date| { + NaiveDate::parse_from_str(s.as_ref(), "%Y-%m-%d").map(|date| { date.and_hms_opt(0, 0, 0).unwrap() }) }) @@ -1145,12 +1147,12 @@ pub fn transform_response( field_name, { (FieldValue::String(s), builder) => { - let timestamp = NaiveDateTime::parse_from_str(s.as_str(), "%Y-%m-%dT%H:%M:%S%.f") - .or_else(|_| NaiveDateTime::parse_from_str(s.as_str(), "%Y-%m-%d %H:%M:%S%.f")) - .or_else(|_| NaiveDateTime::parse_from_str(s.as_str(), "%Y-%m-%dT%H:%M:%S")) - .or_else(|_| NaiveDateTime::parse_from_str(s.as_str(), "%Y-%m-%dT%H:%M:%S%.fZ")) + let timestamp = NaiveDateTime::parse_from_str(s.as_ref(), "%Y-%m-%dT%H:%M:%S%.f") + .or_else(|_| NaiveDateTime::parse_from_str(s.as_ref(), "%Y-%m-%d %H:%M:%S%.f")) + .or_else(|_| NaiveDateTime::parse_from_str(s.as_ref(), "%Y-%m-%dT%H:%M:%S")) + .or_else(|_| NaiveDateTime::parse_from_str(s.as_ref(), "%Y-%m-%dT%H:%M:%S%.fZ")) .or_else(|_| { - NaiveDate::parse_from_str(s.as_str(), "%Y-%m-%d").map(|date| { + NaiveDate::parse_from_str(s.as_ref(), "%Y-%m-%d").map(|date| { date.and_hms_opt(0, 0, 0).unwrap() }) }) @@ -1181,10 +1183,10 @@ pub fn transform_response( field_name, { (FieldValue::String(s), builder) => { - let date = NaiveDate::parse_from_str(s.as_str(), "%Y-%m-%d") + let date = NaiveDate::parse_from_str(s.as_ref(), "%Y-%m-%d") // FIXME: temporary solution for cases when expected type is Date32 // but underlying data is a Timestamp - .or_else(|_| NaiveDate::parse_from_str(s.as_str(), "%Y-%m-%dT00:00:00.000")) + .or_else(|_| NaiveDate::parse_from_str(s.as_ref(), "%Y-%m-%dT00:00:00.000")) .map_err(|e| { DataFusionError::Execution(format!( "Can't parse date: '{}': {}", @@ -1395,11 +1397,11 @@ mod tests { "timeDimensions": [] }, "data": [ - {"KibanaSampleDataEcommerce.count": null, "KibanaSampleDataEcommerce.maxPrice": null, "KibanaSampleDataEcommerce.isBool": null, "KibanaSampleDataEcommerce.orderDate": null}, - {"KibanaSampleDataEcommerce.count": 5, "KibanaSampleDataEcommerce.maxPrice": 5.05, "KibanaSampleDataEcommerce.isBool": true, "KibanaSampleDataEcommerce.orderDate": "2022-01-01 00:00:00.000"}, - {"KibanaSampleDataEcommerce.count": "5", "KibanaSampleDataEcommerce.maxPrice": "5.05", "KibanaSampleDataEcommerce.isBool": false, "KibanaSampleDataEcommerce.orderDate": "2023-01-01 00:00:00.000"}, - {"KibanaSampleDataEcommerce.count": null, "KibanaSampleDataEcommerce.maxPrice": null, "KibanaSampleDataEcommerce.isBool": "true", "KibanaSampleDataEcommerce.orderDate": "9999-12-31 00:00:00.000"}, - {"KibanaSampleDataEcommerce.count": null, "KibanaSampleDataEcommerce.maxPrice": null, "KibanaSampleDataEcommerce.isBool": "false", "KibanaSampleDataEcommerce.orderDate": null} + {"KibanaSampleDataEcommerce.count": null, "KibanaSampleDataEcommerce.maxPrice": null, "KibanaSampleDataEcommerce.isBool": null, "KibanaSampleDataEcommerce.orderDate": null, "KibanaSampleDataEcommerce.city": "City 1"}, + {"KibanaSampleDataEcommerce.count": 5, "KibanaSampleDataEcommerce.maxPrice": 5.05, "KibanaSampleDataEcommerce.isBool": true, "KibanaSampleDataEcommerce.orderDate": "2022-01-01 00:00:00.000", "KibanaSampleDataEcommerce.city": "City 2"}, + {"KibanaSampleDataEcommerce.count": "5", "KibanaSampleDataEcommerce.maxPrice": "5.05", "KibanaSampleDataEcommerce.isBool": false, "KibanaSampleDataEcommerce.orderDate": "2023-01-01 00:00:00.000", "KibanaSampleDataEcommerce.city": "City 3"}, + {"KibanaSampleDataEcommerce.count": null, "KibanaSampleDataEcommerce.maxPrice": null, "KibanaSampleDataEcommerce.isBool": "true", "KibanaSampleDataEcommerce.orderDate": "9999-12-31 00:00:00.000", "KibanaSampleDataEcommerce.city": "City 4"}, + {"KibanaSampleDataEcommerce.count": null, "KibanaSampleDataEcommerce.maxPrice": null, "KibanaSampleDataEcommerce.isBool": "false", "KibanaSampleDataEcommerce.orderDate": null, "KibanaSampleDataEcommerce.city": null} ] } "#; @@ -1456,6 +1458,8 @@ mod tests { #[tokio::test] async fn test_df_cube_scan_execute() { + assert_eq!(std::mem::size_of::(), 24); + let schema = Arc::new(Schema::new(vec![ Field::new("KibanaSampleDataEcommerce.count", DataType::Utf8, false), Field::new("KibanaSampleDataEcommerce.count", DataType::Utf8, false), @@ -1475,6 +1479,7 @@ mod tests { DataType::Boolean, false, ), + Field::new("KibanaSampleDataEcommerce.city", DataType::Utf8, false), ])); let scan_node = CubeScanExecutionPlan { @@ -1498,6 +1503,7 @@ mod tests { dimensions: Some(vec![ "KibanaSampleDataEcommerce.isBool".to_string(), "KibanaSampleDataEcommerce.orderDate".to_string(), + "KibanaSampleDataEcommerce.city".to_string(), ]), segments: None, time_dimensions: None, @@ -1577,6 +1583,13 @@ mod tests { Some(false) ])) as ArrayRef, Arc::new(BooleanArray::from(vec![None, None, None, None, None,])) as ArrayRef, + Arc::new(StringArray::from(vec![ + Some("City 1"), + Some("City 2"), + Some("City 3"), + Some("City 4"), + None + ])) as ArrayRef, ], ) .unwrap()