diff --git a/Cargo.lock b/Cargo.lock index 6a209e795c8b..e8bfcdf74e82 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1900,6 +1900,7 @@ dependencies = [ "common-telemetry", "common-time", "common-wal", + "datafusion-expr", "datatypes", "derive_builder 0.12.0", "etcd-client", diff --git a/src/client/Cargo.toml b/src/client/Cargo.toml index 3c5f2a68fb76..afe6c462e603 100644 --- a/src/client/Cargo.toml +++ b/src/client/Cargo.toml @@ -34,6 +34,7 @@ prost.workspace = true rand.workspace = true serde_json.workspace = true snafu.workspace = true +substrait.workspace = true tokio.workspace = true tokio-stream = { workspace = true, features = ["net"] } tonic.workspace = true @@ -42,7 +43,6 @@ tonic.workspace = true common-grpc-expr.workspace = true datanode.workspace = true derive-new = "0.5" -substrait.workspace = true tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/src/client/src/region.rs b/src/client/src/region.rs index a401fa434803..9c746be3c2f9 100644 --- a/src/client/src/region.rs +++ b/src/client/src/region.rs @@ -15,7 +15,7 @@ use std::sync::Arc; use api::region::RegionResponse; -use api::v1::region::{QueryRequest, RegionRequest}; +use api::v1::region::RegionRequest; use api::v1::ResponseHeader; use arc_swap::ArcSwapOption; use arrow_flight::Ticket; @@ -24,7 +24,7 @@ use async_trait::async_trait; use common_error::ext::{BoxedError, ErrorExt}; use common_error::status_code::StatusCode; use common_grpc::flight::{FlightDecoder, FlightMessage}; -use common_meta::datanode_manager::Datanode; +use common_meta::datanode_manager::{Datanode, QueryRequest}; use common_meta::error::{self as meta_error, Result as MetaResult}; use common_recordbatch::error::ExternalSnafu; use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream}; @@ -32,6 +32,7 @@ use common_telemetry::error; use common_telemetry::tracing_context::TracingContext; use prost::Message; use snafu::{location, Location, OptionExt, ResultExt}; +use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan}; use tokio_stream::StreamExt; use crate::error::{ @@ -63,6 +64,17 @@ impl Datanode for RegionRequester { } async fn handle_query(&self, request: QueryRequest) -> MetaResult { + let plan = DFLogicalSubstraitConvertor + .encode(&request.plan) + .map_err(BoxedError::new) + .context(meta_error::ExternalSnafu)? + .to_vec(); + let request = api::v1::region::QueryRequest { + header: request.header, + region_id: request.region_id.as_u64(), + plan, + }; + let ticket = Ticket { ticket: request.encode_to_vec().into(), }; diff --git a/src/common/meta/Cargo.toml b/src/common/meta/Cargo.toml index 9132f6de559b..ca950dfda985 100644 --- a/src/common/meta/Cargo.toml +++ b/src/common/meta/Cargo.toml @@ -28,6 +28,7 @@ common-recordbatch.workspace = true common-telemetry.workspace = true common-time.workspace = true common-wal.workspace = true +datafusion-expr.workspace = true datatypes.workspace = true derive_builder.workspace = true etcd-client.workspace = true diff --git a/src/common/meta/src/datanode_manager.rs b/src/common/meta/src/datanode_manager.rs index 58990ce01dec..4c281a5a0180 100644 --- a/src/common/meta/src/datanode_manager.rs +++ b/src/common/meta/src/datanode_manager.rs @@ -15,13 +15,27 @@ use std::sync::Arc; use api::region::RegionResponse; -use api::v1::region::{QueryRequest, RegionRequest}; +use api::v1::region::{RegionRequest, RegionRequestHeader}; pub use common_base::AffectedRows; use common_recordbatch::SendableRecordBatchStream; +use datafusion_expr::LogicalPlan; +use store_api::storage::RegionId; use crate::error::Result; use crate::peer::Peer; +/// The query request to be handled by the RegionServer(Datanode). +pub struct QueryRequest { + /// The header of this request. Often to store some context of the query. None means all to defaults. + pub header: Option, + + /// The id of the region to be queried. + pub region_id: RegionId, + + /// The form of the query: a logical plan. + pub plan: LogicalPlan, +} + /// The trait for handling requests to datanode. #[async_trait::async_trait] pub trait Datanode: Send + Sync { diff --git a/src/common/meta/src/ddl/drop_database/executor.rs b/src/common/meta/src/ddl/drop_database/executor.rs index e3bcf0c004d6..a2727e0211df 100644 --- a/src/common/meta/src/ddl/drop_database/executor.rs +++ b/src/common/meta/src/ddl/drop_database/executor.rs @@ -123,11 +123,12 @@ mod tests { use std::sync::Arc; use api::region::RegionResponse; - use api::v1::region::{QueryRequest, RegionRequest}; + use api::v1::region::RegionRequest; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_error::ext::BoxedError; use common_recordbatch::SendableRecordBatchStream; + use crate::datanode_manager::QueryRequest; use crate::ddl::drop_database::cursor::DropDatabaseCursor; use crate::ddl::drop_database::executor::DropDatabaseExecutor; use crate::ddl::drop_database::{DropDatabaseContext, DropTableTarget, State}; diff --git a/src/common/meta/src/ddl/test_util/datanode_handler.rs b/src/common/meta/src/ddl/test_util/datanode_handler.rs index 1649ebc00d47..7a7c77ad93ca 100644 --- a/src/common/meta/src/ddl/test_util/datanode_handler.rs +++ b/src/common/meta/src/ddl/test_util/datanode_handler.rs @@ -13,7 +13,7 @@ // limitations under the License. use api::region::RegionResponse; -use api::v1::region::{QueryRequest, RegionRequest}; +use api::v1::region::RegionRequest; use common_error::ext::{BoxedError, ErrorExt, StackError}; use common_error::status_code::StatusCode; use common_recordbatch::SendableRecordBatchStream; @@ -21,6 +21,7 @@ use common_telemetry::debug; use snafu::{ResultExt, Snafu}; use tokio::sync::mpsc; +use crate::datanode_manager::QueryRequest; use crate::error::{self, Error, Result}; use crate::peer::Peer; use crate::test_util::MockDatanodeHandler; diff --git a/src/common/meta/src/test_util.rs b/src/common/meta/src/test_util.rs index 4ed396509ada..3a91a6c6d67a 100644 --- a/src/common/meta/src/test_util.rs +++ b/src/common/meta/src/test_util.rs @@ -15,12 +15,14 @@ use std::sync::Arc; use api::region::RegionResponse; -use api::v1::region::{QueryRequest, RegionRequest}; +use api::v1::region::RegionRequest; pub use common_base::AffectedRows; use common_recordbatch::SendableRecordBatchStream; use crate::cache_invalidator::DummyCacheInvalidator; -use crate::datanode_manager::{Datanode, DatanodeManager, DatanodeManagerRef, DatanodeRef}; +use crate::datanode_manager::{ + Datanode, DatanodeManager, DatanodeManagerRef, DatanodeRef, QueryRequest, +}; use crate::ddl::table_meta::TableMetadataAllocator; use crate::ddl::DdlContext; use crate::error::Result; diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index d7873812cf5d..792eeb3a0f3a 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -288,6 +288,13 @@ pub enum Error { source: mito2::error::Error, location: Location, }, + + #[snafu(display("DataFusion"))] + DataFusion { + #[snafu(source)] + error: datafusion::error::DataFusionError, + location: Location, + }, } pub type Result = std::result::Result; @@ -338,7 +345,8 @@ impl ErrorExt for Error { | ShutdownInstance { .. } | RegionEngineNotFound { .. } | UnsupportedOutput { .. } - | GetRegionMetadata { .. } => StatusCode::Internal, + | GetRegionMetadata { .. } + | DataFusion { .. } => StatusCode::Internal, RegionNotFound { .. } => StatusCode::RegionNotFound, RegionNotReady { .. } => StatusCode::RegionNotReady, diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index 1af0fda462cf..505d1b422a87 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -19,13 +19,14 @@ use std::ops::Deref; use std::sync::{Arc, Mutex, RwLock}; use api::region::RegionResponse; -use api::v1::region::{region_request, QueryRequest, RegionResponse as RegionResponseV1}; +use api::v1::region::{region_request, RegionResponse as RegionResponseV1}; use api::v1::{ResponseHeader, Status}; use arrow_flight::{FlightData, Ticket}; use async_trait::async_trait; use bytes::Bytes; use common_error::ext::BoxedError; use common_error::status_code::StatusCode; +use common_meta::datanode_manager::QueryRequest; use common_query::logical_plan::Expr; use common_query::physical_plan::DfPhysicalPlanAdapter; use common_query::{DfPhysicalPlan, OutputData}; @@ -37,11 +38,14 @@ use common_telemetry::{info, warn}; use dashmap::DashMap; use datafusion::catalog::schema::SchemaProvider; use datafusion::catalog::{CatalogProvider, CatalogProviderList}; -use datafusion::datasource::TableProvider; +use datafusion::datasource::{provider_as_source, TableProvider}; use datafusion::error::Result as DfResult; use datafusion::execution::context::SessionState; +use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter}; use datafusion_common::DataFusionError; -use datafusion_expr::{Expr as DfExpr, TableProviderFilterPushDown, TableType}; +use datafusion_expr::{ + Expr as DfExpr, LogicalPlan, TableProviderFilterPushDown, TableSource, TableType, +}; use datatypes::arrow::datatypes::SchemaRef; use futures_util::future::try_join_all; use metric_engine::engine::MetricEngine; @@ -51,7 +55,7 @@ use servers::error::{self as servers_error, ExecuteGrpcRequestSnafu, Result as S use servers::grpc::flight::{FlightCraft, FlightRecordBatchStream, TonicStream}; use servers::grpc::region_server::RegionServerHandler; use session::context::{QueryContextBuilder, QueryContextRef}; -use snafu::{OptionExt, ResultExt}; +use snafu::{ensure, OptionExt, ResultExt}; use store_api::metadata::RegionMetadataRef; use store_api::metric_engine_consts::{METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY}; use store_api::region_engine::{RegionEngineRef, RegionRole, SetReadonlyResponse}; @@ -62,10 +66,10 @@ use table::table::scan::StreamScanAdapter; use tonic::{Request, Response, Result as TonicResult}; use crate::error::{ - self, BuildRegionRequestsSnafu, DecodeLogicalPlanSnafu, ExecuteLogicalPlanSnafu, - FindLogicalRegionsSnafu, GetRegionMetadataSnafu, HandleRegionRequestSnafu, - RegionEngineNotFoundSnafu, RegionNotFoundSnafu, Result, StopRegionEngineSnafu, UnexpectedSnafu, - UnsupportedOutputSnafu, + self, BuildRegionRequestsSnafu, DataFusionSnafu, DecodeLogicalPlanSnafu, + ExecuteLogicalPlanSnafu, FindLogicalRegionsSnafu, GetRegionMetadataSnafu, + HandleRegionRequestSnafu, RegionEngineNotFoundSnafu, RegionNotFoundSnafu, RegionNotReadySnafu, + Result, StopRegionEngineSnafu, UnexpectedSnafu, UnsupportedOutputSnafu, }; use crate::event_listener::RegionServerEventListenerRef; @@ -133,9 +137,97 @@ impl RegionServer { self.inner.handle_request(region_id, request).await } + async fn table_provider(&self, region_id: RegionId) -> Result> { + let status = self + .inner + .region_map + .get(®ion_id) + .context(RegionNotFoundSnafu { region_id })? + .clone(); + ensure!( + matches!(status, RegionEngineWithStatus::Ready(_)), + RegionNotReadySnafu { region_id } + ); + + self.inner + .table_provider_factory + .create(region_id, status.into_engine()) + .await + } + + /// Handle reads from remote. They're often query requests received by our Arrow Flight service. + pub async fn handle_remote_read( + &self, + request: api::v1::region::QueryRequest, + ) -> Result { + let region_id = RegionId::from_u64(request.region_id); + let provider = self.table_provider(region_id).await?; + let catalog_list = Arc::new(DummyCatalogList::with_table_provider(provider)); + + let query_ctx: QueryContextRef = request + .header + .as_ref() + .map(|h| Arc::new(h.into())) + .unwrap_or_else(|| QueryContextBuilder::default().build()); + let state = self + .inner + .query_engine + .engine_context(query_ctx.clone()) + .state() + .clone(); + + let plan = DFLogicalSubstraitConvertor + .decode( + Bytes::from(request.plan), + catalog_list, + state, + query_ctx.clone(), + ) + .await + .context(DecodeLogicalPlanSnafu)?; + + self.inner + .handle_read(QueryRequest { + header: request.header, + region_id, + plan, + }) + .await + } + #[tracing::instrument(skip_all)] pub async fn handle_read(&self, request: QueryRequest) -> Result { - self.inner.handle_read(request).await + let provider = self.table_provider(request.region_id).await?; + + struct RegionDataSourceInjector { + source: Arc, + } + + impl TreeNodeRewriter for RegionDataSourceInjector { + type Node = LogicalPlan; + + fn f_up(&mut self, node: Self::Node) -> DfResult> { + Ok(match node { + LogicalPlan::TableScan(mut scan) => { + scan.source = self.source.clone(); + Transformed::yes(LogicalPlan::TableScan(scan)) + } + _ => Transformed::no(node), + }) + } + } + + let plan = request + .plan + .rewrite(&mut RegionDataSourceInjector { + source: provider_as_source(provider), + }) + .context(DataFusionSnafu)? + .data; + + self.inner + .handle_read(QueryRequest { plan, ..request }) + .await } /// Returns all opened and reportable regions. @@ -296,7 +388,7 @@ impl FlightCraft for RegionServer { request: Request, ) -> TonicResult>> { let ticket = request.into_inner().ticket; - let request = QueryRequest::decode(ticket.as_ref()) + let request = api::v1::region::QueryRequest::decode(ticket.as_ref()) .context(servers_error::InvalidFlightTicketSnafu)?; let tracing_context = request .header @@ -305,7 +397,7 @@ impl FlightCraft for RegionServer { .unwrap_or_default(); let result = self - .handle_read(request) + .handle_remote_read(request) .trace(tracing_context.attach(info_span!("RegionServer::handle_read"))) .await?; @@ -333,10 +425,6 @@ impl RegionEngineWithStatus { RegionEngineWithStatus::Ready(engine) => engine, } } - - pub fn is_registering(&self) -> bool { - matches!(self, Self::Registering(_)) - } } impl Deref for RegionEngineWithStatus { @@ -613,51 +701,16 @@ impl RegionServerInner { pub async fn handle_read(&self, request: QueryRequest) -> Result { // TODO(ruihang): add metrics and set trace id - let QueryRequest { - header, - region_id, - plan, - } = request; - let region_id = RegionId::from_u64(region_id); - // Build query context from gRPC header - let ctx: QueryContextRef = header + let query_ctx: QueryContextRef = request + .header .as_ref() .map(|h| Arc::new(h.into())) .unwrap_or_else(|| QueryContextBuilder::default().build()); - // build dummy catalog list - let region_status = self - .region_map - .get(®ion_id) - .with_context(|| RegionNotFoundSnafu { region_id })? - .clone(); - - if region_status.is_registering() { - return error::RegionNotReadySnafu { region_id }.fail(); - } - - let table_provider = self - .table_provider_factory - .create(region_id, region_status.into_engine()) - .await?; - - let catalog_list = Arc::new(DummyCatalogList::with_table_provider(table_provider)); - let query_engine_ctx = self.query_engine.engine_context(ctx.clone()); - // decode substrait plan to logical plan and execute it - let logical_plan = DFLogicalSubstraitConvertor - .decode( - Bytes::from(plan), - catalog_list, - query_engine_ctx.state().clone(), - ctx.clone(), - ) - .await - .context(DecodeLogicalPlanSnafu)?; - let result = self .query_engine - .execute(logical_plan.into(), ctx) + .execute(request.plan.into(), query_ctx) .await .context(ExecuteLogicalPlanSnafu)?; diff --git a/src/frontend/src/instance/region_query.rs b/src/frontend/src/instance/region_query.rs index 844b9a7735ab..b2361175faca 100644 --- a/src/frontend/src/instance/region_query.rs +++ b/src/frontend/src/instance/region_query.rs @@ -14,16 +14,14 @@ use std::sync::Arc; -use api::v1::region::QueryRequest; use async_trait::async_trait; use common_error::ext::BoxedError; -use common_meta::datanode_manager::DatanodeManagerRef; +use common_meta::datanode_manager::{DatanodeManagerRef, QueryRequest}; use common_recordbatch::SendableRecordBatchStream; use partition::manager::PartitionRuleManagerRef; use query::error::{RegionQuerySnafu, Result as QueryResult}; use query::region_query::RegionQueryHandler; use snafu::ResultExt; -use store_api::storage::RegionId; use crate::error::{FindTableRouteSnafu, RequestQuerySnafu, Result}; @@ -56,7 +54,7 @@ impl RegionQueryHandler for FrontendRegionQueryHandler { impl FrontendRegionQueryHandler { async fn do_get_inner(&self, request: QueryRequest) -> Result { - let region_id = RegionId::from_u64(request.region_id); + let region_id = request.region_id; let peer = &self .partition_manager diff --git a/src/frontend/src/instance/standalone.rs b/src/frontend/src/instance/standalone.rs index 4001c4e59d61..6ee2840f10d2 100644 --- a/src/frontend/src/instance/standalone.rs +++ b/src/frontend/src/instance/standalone.rs @@ -15,11 +15,11 @@ use std::sync::Arc; use api::region::RegionResponse; -use api::v1::region::{QueryRequest, RegionRequest, RegionResponse as RegionResponseV1}; +use api::v1::region::{RegionRequest, RegionResponse as RegionResponseV1}; use async_trait::async_trait; use client::region::check_response_header; use common_error::ext::BoxedError; -use common_meta::datanode_manager::{Datanode, DatanodeManager, DatanodeRef}; +use common_meta::datanode_manager::{Datanode, DatanodeManager, DatanodeRef, QueryRequest}; use common_meta::error::{self as meta_error, Result as MetaResult}; use common_meta::peer::Peer; use common_recordbatch::SendableRecordBatchStream; diff --git a/src/meta-srv/src/handler/failure_handler.rs b/src/meta-srv/src/handler/failure_handler.rs index 004d32e4a295..75d15ae2c221 100644 --- a/src/meta-srv/src/handler/failure_handler.rs +++ b/src/meta-srv/src/handler/failure_handler.rs @@ -18,7 +18,6 @@ use std::sync::Arc; use api::v1::meta::{HeartbeatRequest, Role}; use async_trait::async_trait; -use common_catalog::consts::default_engine; use common_meta::RegionIdent; use crate::error::Result; @@ -91,8 +90,7 @@ impl HeartbeatHandler for RegionFailureHandler { datanode_id: stat.id, table_id: region_id.table_id(), region_number: region_id.region_number(), - // TODO(LFC): Use the actual table engine (maybe retrieve from heartbeat). - engine: default_engine().to_string(), + engine: x.engine.clone(), } }) .collect(), @@ -109,6 +107,7 @@ impl HeartbeatHandler for RegionFailureHandler { mod tests { use std::assert_matches::assert_matches; + use common_catalog::consts::default_engine; use common_meta::key::MAINTENANCE_KEY; use store_api::region_engine::RegionRole; use store_api::storage::RegionId; diff --git a/src/query/src/dist_plan/merge_scan.rs b/src/query/src/dist_plan/merge_scan.rs index 1f294a836bff..57941c915d4e 100644 --- a/src/query/src/dist_plan/merge_scan.rs +++ b/src/query/src/dist_plan/merge_scan.rs @@ -18,9 +18,9 @@ use std::time::Duration; use arrow_schema::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef}; use async_stream::stream; -use common_base::bytes::Bytes; use common_catalog::parse_catalog_and_schema_from_db_string; use common_error::ext::BoxedError; +use common_meta::datanode_manager::QueryRequest; use common_meta::table_name::TableName; use common_plugins::GREPTIME_EXEC_READ_COST; use common_query::physical_plan::TaskContext; @@ -41,7 +41,7 @@ use datafusion_expr::{Extension, LogicalPlan, UserDefinedLogicalNodeCore}; use datafusion_physical_expr::EquivalenceProperties; use datatypes::schema::{Schema, SchemaRef}; use futures_util::StreamExt; -use greptime_proto::v1::region::{QueryRequest, RegionRequestHeader}; +use greptime_proto::v1::region::RegionRequestHeader; use meter_core::data::ReadItem; use meter_macros::read_meter; use session::context::QueryContextRef; @@ -121,7 +121,7 @@ impl MergeScanLogicalPlan { pub struct MergeScanExec { table: TableName, regions: Vec, - substrait_plan: Bytes, + plan: LogicalPlan, schema: SchemaRef, arrow_schema: ArrowSchemaRef, region_query_handler: RegionQueryHandlerRef, @@ -144,7 +144,7 @@ impl MergeScanExec { pub fn new( table: TableName, regions: Vec, - substrait_plan: Bytes, + plan: LogicalPlan, arrow_schema: &ArrowSchema, region_query_handler: RegionQueryHandlerRef, query_ctx: QueryContextRef, @@ -160,7 +160,7 @@ impl MergeScanExec { Ok(Self { table, regions, - substrait_plan, + plan, schema: schema_without_metadata, arrow_schema: arrow_schema_without_metadata, region_query_handler, @@ -171,7 +171,6 @@ impl MergeScanExec { } pub fn to_stream(&self, context: Arc) -> Result { - let substrait_plan = self.substrait_plan.to_vec(); let regions = self.regions.clone(); let region_query_handler = self.region_query_handler.clone(); let metric = MergeScanMetric::new(&self.metric); @@ -181,6 +180,7 @@ impl MergeScanExec { let tracing_context = TracingContext::from_json(context.session_id().as_str()); let tz = self.query_ctx.timezone().to_string(); + let plan = self.plan.clone(); let stream = Box::pin(stream!({ MERGE_SCAN_REGIONS.observe(regions.len() as f64); let _finish_timer = metric.finish_time().timer(); @@ -194,8 +194,8 @@ impl MergeScanExec { dbname: dbname.clone(), timezone: tz.clone(), }), - region_id: region_id.into(), - plan: substrait_plan.clone(), + region_id, + plan: plan.clone(), }; let mut stream = region_query_handler .do_get(request) diff --git a/src/query/src/dist_plan/planner.rs b/src/query/src/dist_plan/planner.rs index 1d29fe7aba29..786eb4effcd3 100644 --- a/src/query/src/dist_plan/planner.rs +++ b/src/query/src/dist_plan/planner.rs @@ -25,19 +25,17 @@ use datafusion::datasource::DefaultTableSource; use datafusion::execution::context::SessionState; use datafusion::physical_plan::ExecutionPlan; use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner}; -use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeVisitor}; +use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor}; use datafusion_common::TableReference; use datafusion_expr::{LogicalPlan, UserDefinedLogicalNode}; use datafusion_optimizer::analyzer::Analyzer; use session::context::QueryContext; use snafu::{OptionExt, ResultExt}; use store_api::storage::RegionId; -use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan}; pub use table::metadata::TableType; use table::table::adapter::DfTableProviderAdapter; use crate::dist_plan::merge_scan::{MergeScanExec, MergeScanLogicalPlan}; -use crate::error; use crate::error::{CatalogSnafu, TableNotFoundSnafu}; use crate::region_query::RegionQueryHandlerRef; @@ -99,12 +97,6 @@ impl ExtensionPlanner for DistExtensionPlanner { // TODO(ruihang): generate different execution plans for different variant merge operation let schema = optimized_plan.schema().as_ref().into(); // Pass down the original plan, allow execution nodes to do their optimization - let amended_plan = Self::plan_with_full_table_name(input_plan.clone(), &table_name)?; - let substrait_plan = DFLogicalSubstraitConvertor - .encode(&amended_plan) - .context(error::EncodeSubstraitLogicalPlanSnafu)? - .into(); - let query_ctx = session_state .config() .get_extension() @@ -112,7 +104,7 @@ impl ExtensionPlanner for DistExtensionPlanner { let merge_scan_plan = MergeScanExec::new( table_name, regions, - substrait_plan, + input_plan.clone(), &schema, self.region_query_handler.clone(), query_ctx, @@ -129,12 +121,6 @@ impl DistExtensionPlanner { Ok(extractor.table_name) } - /// Apply the fully resolved table name to the TableScan plan - fn plan_with_full_table_name(plan: LogicalPlan, name: &TableName) -> Result { - plan.transform(&|plan| TableNameRewriter::rewrite_table_name(plan, name)) - .map(|x| x.data) - } - async fn get_regions(&self, table_name: &TableName) -> Result> { let table = self .catalog_manager @@ -229,24 +215,3 @@ impl TreeNodeVisitor for TableNameExtractor { } } } - -struct TableNameRewriter; - -impl TableNameRewriter { - fn rewrite_table_name( - plan: LogicalPlan, - name: &TableName, - ) -> datafusion_common::Result> { - Ok(match plan { - LogicalPlan::TableScan(mut table_scan) => { - table_scan.table_name = TableReference::full( - name.catalog_name.clone(), - name.schema_name.clone(), - name.table_name.clone(), - ); - Transformed::yes(LogicalPlan::TableScan(table_scan)) - } - _ => Transformed::no(plan), - }) - } -} diff --git a/src/query/src/error.rs b/src/query/src/error.rs index f8fcb13abf09..b5538ba246e3 100644 --- a/src/query/src/error.rs +++ b/src/query/src/error.rs @@ -121,12 +121,6 @@ pub enum Error { location: Location, }, - #[snafu(display("Failed to encode Substrait logical plan"))] - EncodeSubstraitLogicalPlan { - source: substrait::error::Error, - location: Location, - }, - #[snafu(display("General SQL error"))] Sql { location: Location, @@ -273,7 +267,6 @@ impl ErrorExt for Error { | ColumnSchemaNoDefault { .. } => StatusCode::InvalidArguments, BuildBackend { .. } | ListObjects { .. } => StatusCode::StorageUnavailable, - EncodeSubstraitLogicalPlan { source, .. } => source.status_code(), ParseFileFormat { source, .. } | InferSchema { source, .. } => source.status_code(), diff --git a/src/query/src/region_query.rs b/src/query/src/region_query.rs index f9861103e62b..fe5fb0d7d8ec 100644 --- a/src/query/src/region_query.rs +++ b/src/query/src/region_query.rs @@ -14,8 +14,8 @@ use std::sync::Arc; -use api::v1::region::QueryRequest; use async_trait::async_trait; +use common_meta::datanode_manager::QueryRequest; use common_recordbatch::SendableRecordBatchStream; use crate::error::Result; diff --git a/src/store-api/src/metadata.rs b/src/store-api/src/metadata.rs index 2df29580984b..6f24001ac436 100644 --- a/src/store-api/src/metadata.rs +++ b/src/store-api/src/metadata.rs @@ -75,7 +75,8 @@ impl ColumnMetadata { column_def.datatype_extension.clone(), ) .into(); - ColumnSchema::new(column_def.name, data_type, column_def.is_nullable) + ColumnSchema::new(&column_def.name, data_type, column_def.is_nullable) + .with_time_index(column_def.semantic_type() == SemanticType::Timestamp) .with_default_constraint(default_constrain) .context(ConvertDatatypesSnafu) } diff --git a/src/table/src/metadata.rs b/src/table/src/metadata.rs index 6e91b37fd84f..08d79e1960e7 100644 --- a/src/table/src/metadata.rs +++ b/src/table/src/metadata.rs @@ -647,8 +647,6 @@ pub struct TableInfo { /// Id and version of the table. #[builder(default, setter(into))] pub ident: TableIdent, - - // TODO(LFC): Remove the catalog, schema and table names from TableInfo. /// Name of the table. #[builder(setter(into))] pub name: String, diff --git a/tests-integration/src/grpc.rs b/tests-integration/src/grpc.rs index 6d7179b18da1..d379ea4daa44 100644 --- a/tests-integration/src/grpc.rs +++ b/tests-integration/src/grpc.rs @@ -552,7 +552,7 @@ CREATE TABLE {table_name} ( let region_id = RegionId::new(table_id, *region); let stream = region_server - .handle_read(RegionQueryRequest { + .handle_remote_read(RegionQueryRequest { region_id: region_id.as_u64(), plan: plan.to_vec(), ..Default::default() diff --git a/tests-integration/src/instance.rs b/tests-integration/src/instance.rs index 1e52162ef3ca..a9380c4d06fb 100644 --- a/tests-integration/src/instance.rs +++ b/tests-integration/src/instance.rs @@ -246,7 +246,7 @@ mod tests { let region_id = RegionId::new(table_id, *region); let stream = region_server - .handle_read(QueryRequest { + .handle_remote_read(QueryRequest { region_id: region_id.as_u64(), plan: plan.to_vec(), ..Default::default()