Skip to content

Commit

Permalink
refactor: remove substrait serde for region query in standalone
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelScofield committed Apr 26, 2024
1 parent 934c7e3 commit ee45357
Show file tree
Hide file tree
Showing 21 changed files with 176 additions and 129 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ prost.workspace = true
rand.workspace = true
serde_json.workspace = true
snafu.workspace = true
substrait.workspace = true
tokio.workspace = true
tokio-stream = { workspace = true, features = ["net"] }
tonic.workspace = true
Expand All @@ -42,7 +43,6 @@ tonic.workspace = true
common-grpc-expr.workspace = true
datanode.workspace = true
derive-new = "0.5"
substrait.workspace = true
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

Expand Down
16 changes: 14 additions & 2 deletions src/client/src/region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use std::sync::Arc;

use api::region::RegionResponse;
use api::v1::region::{QueryRequest, RegionRequest};
use api::v1::region::RegionRequest;
use api::v1::ResponseHeader;
use arc_swap::ArcSwapOption;
use arrow_flight::Ticket;
Expand All @@ -24,14 +24,15 @@ use async_trait::async_trait;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_grpc::flight::{FlightDecoder, FlightMessage};
use common_meta::datanode_manager::Datanode;
use common_meta::datanode_manager::{Datanode, QueryRequest};
use common_meta::error::{self as meta_error, Result as MetaResult};
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
use common_telemetry::error;
use common_telemetry::tracing_context::TracingContext;
use prost::Message;
use snafu::{location, Location, OptionExt, ResultExt};
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use tokio_stream::StreamExt;

use crate::error::{
Expand Down Expand Up @@ -63,6 +64,17 @@ impl Datanode for RegionRequester {
}

async fn handle_query(&self, request: QueryRequest) -> MetaResult<SendableRecordBatchStream> {
let plan = DFLogicalSubstraitConvertor
.encode(&request.plan)
.map_err(BoxedError::new)
.context(meta_error::ExternalSnafu)?
.to_vec();
let request = api::v1::region::QueryRequest {
header: request.header,
region_id: request.region_id.as_u64(),
plan,
};

let ticket = Ticket {
ticket: request.encode_to_vec().into(),
};
Expand Down
1 change: 1 addition & 0 deletions src/common/meta/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ common-recordbatch.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
common-wal.workspace = true
datafusion-expr.workspace = true
datatypes.workspace = true
derive_builder.workspace = true
etcd-client.workspace = true
Expand Down
16 changes: 15 additions & 1 deletion src/common/meta/src/datanode_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,27 @@
use std::sync::Arc;

use api::region::RegionResponse;
use api::v1::region::{QueryRequest, RegionRequest};
use api::v1::region::{RegionRequest, RegionRequestHeader};
pub use common_base::AffectedRows;
use common_recordbatch::SendableRecordBatchStream;
use datafusion_expr::LogicalPlan;
use store_api::storage::RegionId;

use crate::error::Result;
use crate::peer::Peer;

/// The query request to be handled by the RegionServer(Datanode).
pub struct QueryRequest {
/// The header of this request. Often to store some context of the query. None means all to defaults.
pub header: Option<RegionRequestHeader>,

/// The id of the region to be queried.
pub region_id: RegionId,

/// The form of the query: a logical plan.
pub plan: LogicalPlan,
}

/// The trait for handling requests to datanode.
#[async_trait::async_trait]
pub trait Datanode: Send + Sync {
Expand Down
3 changes: 2 additions & 1 deletion src/common/meta/src/ddl/drop_database/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,12 @@ mod tests {
use std::sync::Arc;

use api::region::RegionResponse;
use api::v1::region::{QueryRequest, RegionRequest};
use api::v1::region::RegionRequest;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_error::ext::BoxedError;
use common_recordbatch::SendableRecordBatchStream;

use crate::datanode_manager::QueryRequest;
use crate::ddl::drop_database::cursor::DropDatabaseCursor;
use crate::ddl::drop_database::executor::DropDatabaseExecutor;
use crate::ddl::drop_database::{DropDatabaseContext, DropTableTarget, State};
Expand Down
3 changes: 2 additions & 1 deletion src/common/meta/src/ddl/test_util/datanode_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@
// limitations under the License.

use api::region::RegionResponse;
use api::v1::region::{QueryRequest, RegionRequest};
use api::v1::region::RegionRequest;
use common_error::ext::{BoxedError, ErrorExt, StackError};
use common_error::status_code::StatusCode;
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::debug;
use snafu::{ResultExt, Snafu};
use tokio::sync::mpsc;

use crate::datanode_manager::QueryRequest;
use crate::error::{self, Error, Result};
use crate::peer::Peer;
use crate::test_util::MockDatanodeHandler;
Expand Down
6 changes: 4 additions & 2 deletions src/common/meta/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
use std::sync::Arc;

use api::region::RegionResponse;
use api::v1::region::{QueryRequest, RegionRequest};
use api::v1::region::RegionRequest;
pub use common_base::AffectedRows;
use common_recordbatch::SendableRecordBatchStream;

use crate::cache_invalidator::DummyCacheInvalidator;
use crate::datanode_manager::{Datanode, DatanodeManager, DatanodeManagerRef, DatanodeRef};
use crate::datanode_manager::{
Datanode, DatanodeManager, DatanodeManagerRef, DatanodeRef, QueryRequest,
};
use crate::ddl::table_meta::TableMetadataAllocator;
use crate::ddl::DdlContext;
use crate::error::Result;
Expand Down
10 changes: 9 additions & 1 deletion src/datanode/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,13 @@ pub enum Error {
source: mito2::error::Error,
location: Location,
},

#[snafu(display("DataFusion"))]
DataFusion {
#[snafu(source)]
error: datafusion::error::DataFusionError,
location: Location,
},
}

pub type Result<T> = std::result::Result<T, Error>;
Expand Down Expand Up @@ -338,7 +345,8 @@ impl ErrorExt for Error {
| ShutdownInstance { .. }
| RegionEngineNotFound { .. }
| UnsupportedOutput { .. }
| GetRegionMetadata { .. } => StatusCode::Internal,
| GetRegionMetadata { .. }
| DataFusion { .. } => StatusCode::Internal,

RegionNotFound { .. } => StatusCode::RegionNotFound,
RegionNotReady { .. } => StatusCode::RegionNotReady,
Expand Down
Loading

0 comments on commit ee45357

Please sign in to comment.