diff --git a/src/cache/src/lib.rs b/src/cache/src/lib.rs index d8a4785fa965..85dc9c05f1f3 100644 --- a/src/cache/src/lib.rs +++ b/src/cache/src/lib.rs @@ -19,9 +19,8 @@ use std::time::Duration; use catalog::kvbackend::new_table_cache; use common_meta::cache::{ - new_composite_table_route_cache, new_table_flownode_set_cache, new_table_info_cache, - new_table_name_cache, new_table_route_cache, CacheRegistry, CacheRegistryBuilder, - LayeredCacheRegistryBuilder, + new_table_flownode_set_cache, new_table_info_cache, new_table_name_cache, + new_table_route_cache, CacheRegistry, CacheRegistryBuilder, LayeredCacheRegistryBuilder, }; use common_meta::kv_backend::KvBackendRef; use moka::future::CacheBuilder; @@ -38,7 +37,6 @@ pub const TABLE_NAME_CACHE_NAME: &str = "table_name_cache"; pub const TABLE_CACHE_NAME: &str = "table_cache"; pub const TABLE_FLOWNODE_SET_CACHE_NAME: &str = "table_flownode_set_cache"; pub const TABLE_ROUTE_CACHE_NAME: &str = "table_route_cache"; -pub const COMPOSITE_TABLE_ROUTE_CACHE: &str = "composite_table_route_cache"; pub fn build_fundamental_cache_registry(kv_backend: KvBackendRef) -> CacheRegistry { // Builds table info cache @@ -103,9 +101,6 @@ pub fn with_default_composite_cache_registry( let table_name_cache = builder.get().context(error::CacheRequiredSnafu { name: TABLE_NAME_CACHE_NAME, })?; - let table_route_cache = builder.get().context(error::CacheRequiredSnafu { - name: TABLE_ROUTE_CACHE_NAME, - })?; // Builds table cache let cache = CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY) @@ -119,20 +114,8 @@ pub fn with_default_composite_cache_registry( table_name_cache, )); - // Builds composite table route cache - let cache = CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY) - .time_to_live(DEFAULT_CACHE_TTL) - .time_to_idle(DEFAULT_CACHE_TTI) - .build(); - let composite_table_route_cache = Arc::new(new_composite_table_route_cache( - COMPOSITE_TABLE_ROUTE_CACHE.to_string(), - cache, - table_route_cache, - )); - let registry = CacheRegistryBuilder::default() .add_cache(table_cache) - .add_cache(composite_table_route_cache) .build(); Ok(builder.add_cache_registry(registry)) diff --git a/src/catalog/src/kvbackend/manager.rs b/src/catalog/src/kvbackend/manager.rs index 93b4ee617f0e..e7a4ef4be39c 100644 --- a/src/catalog/src/kvbackend/manager.rs +++ b/src/catalog/src/kvbackend/manager.rs @@ -22,7 +22,7 @@ use common_catalog::consts::{ }; use common_config::Mode; use common_error::ext::BoxedError; -use common_meta::cache::CompositeTableRouteCacheRef; +use common_meta::cache::TableRouteCacheRef; use common_meta::key::catalog_name::CatalogNameKey; use common_meta::key::schema_name::SchemaNameKey; use common_meta::key::table_info::TableInfoValue; @@ -72,14 +72,14 @@ impl KvBackendCatalogManager { meta_client: Option>, backend: KvBackendRef, table_cache: TableCacheRef, - composite_table_route_cache: CompositeTableRouteCacheRef, + table_route_cache: TableRouteCacheRef, ) -> Arc { Arc::new_cyclic(|me| Self { mode, meta_client, partition_manager: Arc::new(PartitionRuleManager::new( backend.clone(), - composite_table_route_cache, + table_route_cache, )), table_metadata_manager: Arc::new(TableMetadataManager::new(backend)), system_catalog: SystemCatalog { diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs index 71353179b86c..0a07cfe49c8b 100644 --- a/src/cmd/src/frontend.rs +++ b/src/cmd/src/frontend.rs @@ -17,8 +17,8 @@ use std::time::Duration; use async_trait::async_trait; use cache::{ - build_fundamental_cache_registry, with_default_composite_cache_registry, - COMPOSITE_TABLE_ROUTE_CACHE, TABLE_CACHE_NAME, + build_fundamental_cache_registry, with_default_composite_cache_registry, TABLE_CACHE_NAME, + TABLE_ROUTE_CACHE_NAME, }; use catalog::kvbackend::{CachedMetaKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend}; use clap::Parser; @@ -268,18 +268,18 @@ impl StartCommand { .context(error::CacheRequiredSnafu { name: TABLE_CACHE_NAME, })?; - let composite_table_route_cache = + let table_route_cache = layered_cache_registry .get() .context(error::CacheRequiredSnafu { - name: COMPOSITE_TABLE_ROUTE_CACHE, + name: TABLE_ROUTE_CACHE_NAME, })?; let catalog_manager = KvBackendCatalogManager::new( opts.mode, Some(meta_client.clone()), cached_meta_backend.clone(), table_cache, - composite_table_route_cache, + table_route_cache, ) .await; diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index d501dafc8738..b2f8ceae3d74 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -17,8 +17,8 @@ use std::{fs, path}; use async_trait::async_trait; use cache::{ - build_fundamental_cache_registry, with_default_composite_cache_registry, - COMPOSITE_TABLE_ROUTE_CACHE, TABLE_CACHE_NAME, + build_fundamental_cache_registry, with_default_composite_cache_registry, TABLE_CACHE_NAME, + TABLE_ROUTE_CACHE_NAME, }; use catalog::kvbackend::KvBackendCatalogManager; use clap::Parser; @@ -399,16 +399,15 @@ impl StartCommand { let table_cache = layered_cache_registry.get().context(CacheRequiredSnafu { name: TABLE_CACHE_NAME, })?; - let composite_table_route_cache = - layered_cache_registry.get().context(CacheRequiredSnafu { - name: COMPOSITE_TABLE_ROUTE_CACHE, - })?; + let table_route_cache = layered_cache_registry.get().context(CacheRequiredSnafu { + name: TABLE_ROUTE_CACHE_NAME, + })?; let catalog_manager = KvBackendCatalogManager::new( dn_opts.mode, None, kv_backend.clone(), table_cache, - composite_table_route_cache, + table_route_cache, ) .await; diff --git a/src/common/meta/src/cache.rs b/src/common/meta/src/cache.rs index f776e54a7e48..b7d13a6f0ec0 100644 --- a/src/common/meta/src/cache.rs +++ b/src/common/meta/src/cache.rs @@ -24,8 +24,7 @@ pub use registry::{ LayeredCacheRegistryBuilder, LayeredCacheRegistryRef, }; pub use table::{ - new_composite_table_route_cache, new_table_info_cache, new_table_name_cache, - new_table_route_cache, CompositeTableRoute, CompositeTableRouteCache, - CompositeTableRouteCacheRef, TableInfoCache, TableInfoCacheRef, TableNameCache, - TableNameCacheRef, TableRouteCache, TableRouteCacheRef, + new_table_info_cache, new_table_name_cache, new_table_route_cache, TableInfoCache, + TableInfoCacheRef, TableNameCache, TableNameCacheRef, TableRoute, TableRouteCache, + TableRouteCacheRef, }; diff --git a/src/common/meta/src/cache/table.rs b/src/common/meta/src/cache/table.rs index a595118c23f9..fa3bcbd30994 100644 --- a/src/common/meta/src/cache/table.rs +++ b/src/common/meta/src/cache/table.rs @@ -12,14 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -mod composite_table_route; mod table_info; mod table_name; mod table_route; -pub use composite_table_route::{ - new_composite_table_route_cache, CompositeTableRoute, CompositeTableRouteCache, - CompositeTableRouteCacheRef, -}; pub use table_info::{new_table_info_cache, TableInfoCache, TableInfoCacheRef}; pub use table_name::{new_table_name_cache, TableNameCache, TableNameCacheRef}; pub use table_route::{new_table_route_cache, TableRoute, TableRouteCache, TableRouteCacheRef}; diff --git a/src/common/meta/src/cache/table/composite_table_route.rs b/src/common/meta/src/cache/table/composite_table_route.rs deleted file mode 100644 index 4369a0cc06f2..000000000000 --- a/src/common/meta/src/cache/table/composite_table_route.rs +++ /dev/null @@ -1,296 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::HashMap; -use std::sync::Arc; - -use moka::future::Cache; -use snafu::OptionExt; -use store_api::storage::TableId; -use tokio::sync::RwLock; - -use crate::cache::table::{TableRoute, TableRouteCacheRef}; -use crate::cache::{CacheContainer, Initializer, Invalidator}; -use crate::error; -use crate::instruction::CacheIdent; -use crate::key::table_route::{LogicalTableRouteValue, PhysicalTableRouteValue}; - -/// [CompositeTableRoute] stores all level routes of a table. -/// - stores [PhysicalTableRouteValue] for the physical table. -/// - stores [LogicalTableRouteValue], [PhysicalTableRouteValue] for the logical table. -#[derive(Clone)] -pub enum CompositeTableRoute { - Physical(Arc), - Logical(Arc, Arc), -} - -impl CompositeTableRoute { - /// Returns true if it's physical table. - pub fn is_physical(&self) -> bool { - matches!(self, CompositeTableRoute::Physical(_)) - } - - /// Returns [PhysicalTableRouteValue] reference. - pub fn as_physical_table_route_ref(&self) -> &Arc { - match self { - CompositeTableRoute::Physical(route) => route, - CompositeTableRoute::Logical(_, route) => route, - } - } - - /// Returns [LogicalTableRouteValue] reference if it's [CompositeTableRoute::Logical]; Otherwise returns [None]. - pub fn as_logical_table_route_ref(&self) -> Option<&Arc> { - match self { - CompositeTableRoute::Physical(_) => None, - CompositeTableRoute::Logical(route, _) => Some(route), - } - } -} - -/// [CompositeTableRouteCache] caches the [TableId] to [CompositeTableRoute] mapping. -pub type CompositeTableRouteCache = CacheContainer, CacheIdent>; - -pub type CompositeTableRouteCacheRef = Arc; - -/// Constructs a [CompositeTableRouteCache]. -pub fn new_composite_table_route_cache( - name: String, - cache: Cache>, - table_route_cache: TableRouteCacheRef, -) -> CompositeTableRouteCache { - let physical_table_to_logical_tables = Arc::new(RwLock::new(Default::default())); - let init = init_factory(table_route_cache, physical_table_to_logical_tables.clone()); - let invalidator = invalidator_factory(physical_table_to_logical_tables); - CacheContainer::new(name, cache, Box::new(invalidator), init, Box::new(filter)) -} - -fn init_factory( - table_route_cache: TableRouteCacheRef, - physical_table_to_logical_tables: Arc>>>, -) -> Initializer> { - Arc::new(move |table_id| { - let table_route_cache = table_route_cache.clone(); - let physical_table_to_logical_tables = physical_table_to_logical_tables.clone(); - Box::pin(async move { - let table_route_value = table_route_cache - .get(*table_id) - .await? - .context(error::ValueNotExistSnafu)?; - match table_route_value.as_ref() { - TableRoute::Physical(physical_table_route) => Ok(Some(Arc::new( - CompositeTableRoute::Physical(physical_table_route.clone()), - ))), - TableRoute::Logical(logical_table_route) => { - let physical_table_id = logical_table_route.physical_table_id(); - let physical_table_route = table_route_cache - .get(physical_table_id) - .await? - .context(error::ValueNotExistSnafu)?; - - let physical_table_route = physical_table_route - .as_physical_table_route_ref() - .with_context(|| error::UnexpectedSnafu { - err_msg: format!( - "Expected the physical table route, but got logical table route, table: {table_id}" - ), - })?; - physical_table_to_logical_tables - .write() - .await - .entry(physical_table_id) - .or_default() - .push(*table_id); - - Ok(Some(Arc::new(CompositeTableRoute::Logical( - logical_table_route.clone(), - physical_table_route.clone(), - )))) - } - } - }) - }) -} - -fn invalidator_factory( - physical_table_to_logical_tables: Arc>>>, -) -> Invalidator, CacheIdent> { - Box::new(move |cache, ident| { - let physical_table_to_logical_tables = physical_table_to_logical_tables.clone(); - Box::pin(async move { - if let CacheIdent::TableId(table_id) = ident { - cache.invalidate(table_id).await; - if let Some(logical_table_ids) = physical_table_to_logical_tables - .write() - .await - .remove(table_id) - { - for logical_table_id in logical_table_ids { - cache.invalidate(&logical_table_id).await; - } - } - } - Ok(()) - }) - }) -} - -fn filter(ident: &CacheIdent) -> bool { - matches!(ident, CacheIdent::TableId(_)) -} - -#[cfg(test)] -mod tests { - use std::collections::HashMap; - use std::sync::Arc; - - use moka::future::CacheBuilder; - use store_api::storage::RegionId; - - use super::*; - use crate::cache::new_table_route_cache; - use crate::ddl::test_util::create_table::test_create_table_task; - use crate::ddl::test_util::test_create_logical_table_task; - use crate::key::table_route::TableRouteValue; - use crate::key::TableMetadataManager; - use crate::kv_backend::memory::MemoryKvBackend; - use crate::peer::Peer; - use crate::rpc::router::{Region, RegionRoute}; - - #[tokio::test] - async fn test_cache_with_physical_table_route() { - let mem_kv = Arc::new(MemoryKvBackend::default()); - let table_metadata_manager = TableMetadataManager::new(mem_kv.clone()); - let cache = CacheBuilder::new(128).build(); - let table_route_cache = Arc::new(new_table_route_cache( - "test".to_string(), - cache, - mem_kv.clone(), - )); - let cache = CacheBuilder::new(128).build(); - let cache = - new_composite_table_route_cache("test".to_string(), cache, table_route_cache.clone()); - - let result = cache.get(1024).await.unwrap(); - assert!(result.is_none()); - let task = test_create_table_task("my_table", 1024); - let table_id = 10; - let region_id = RegionId::new(table_id, 1); - let peer = Peer::empty(1); - let region_routes = vec![RegionRoute { - region: Region::new_test(region_id), - leader_peer: Some(peer.clone()), - ..Default::default() - }]; - table_metadata_manager - .create_table_metadata( - task.table_info.clone(), - TableRouteValue::physical(region_routes.clone()), - HashMap::new(), - ) - .await - .unwrap(); - let table_route = cache.get(1024).await.unwrap().unwrap(); - assert_eq!( - (*table_route) - .clone() - .as_physical_table_route_ref() - .region_routes, - region_routes - ); - - assert!(table_route_cache.contains_key(&1024)); - assert!(cache.contains_key(&1024)); - cache - .invalidate(&[CacheIdent::TableId(1024)]) - .await - .unwrap(); - assert!(!cache.contains_key(&1024)); - } - - #[tokio::test] - async fn test_cache_with_logical_table_route() { - let mem_kv = Arc::new(MemoryKvBackend::default()); - let table_metadata_manager = TableMetadataManager::new(mem_kv.clone()); - let cache = CacheBuilder::new(128).build(); - let table_route_cache = Arc::new(new_table_route_cache( - "test".to_string(), - cache, - mem_kv.clone(), - )); - let cache = CacheBuilder::new(128).build(); - let cache = - new_composite_table_route_cache("test".to_string(), cache, table_route_cache.clone()); - - let result = cache.get(1024).await.unwrap(); - assert!(result.is_none()); - // Prepares table routes - let task = test_create_table_task("my_table", 1024); - let table_id = 10; - let region_id = RegionId::new(table_id, 1); - let peer = Peer::empty(1); - let region_routes = vec![RegionRoute { - region: Region::new_test(region_id), - leader_peer: Some(peer.clone()), - ..Default::default() - }]; - table_metadata_manager - .create_table_metadata( - task.table_info.clone(), - TableRouteValue::physical(region_routes.clone()), - HashMap::new(), - ) - .await - .unwrap(); - let mut task = test_create_logical_table_task("logical"); - task.table_info.ident.table_id = 1025; - table_metadata_manager - .create_logical_tables_metadata(vec![( - task.table_info, - TableRouteValue::logical(1024, vec![RegionId::new(1025, 0)]), - )]) - .await - .unwrap(); - - assert!(!cache.contains_key(&1025)); - // Gets logical table route - let table_route = cache.get(1025).await.unwrap().unwrap(); - assert_eq!( - table_route - .as_logical_table_route_ref() - .unwrap() - .physical_table_id(), - 1024 - ); - assert_eq!( - table_route.as_physical_table_route_ref().region_routes, - region_routes - ); - - assert!(!cache.contains_key(&1024)); - // Gets physical table route - let table_route = cache.get(1024).await.unwrap().unwrap(); - assert_eq!( - table_route.as_physical_table_route_ref().region_routes, - region_routes - ); - assert!(table_route.is_physical()); - - cache - .invalidate(&[CacheIdent::TableId(1024)]) - .await - .unwrap(); - assert!(!cache.contains_key(&1024)); - assert!(!cache.contains_key(&1025)); - } -} diff --git a/src/frontend/src/instance/builder.rs b/src/frontend/src/instance/builder.rs index 70dcad5c7e0f..f0993458a6aa 100644 --- a/src/frontend/src/instance/builder.rs +++ b/src/frontend/src/instance/builder.rs @@ -14,10 +14,10 @@ use std::sync::Arc; -use cache::{COMPOSITE_TABLE_ROUTE_CACHE, TABLE_FLOWNODE_SET_CACHE_NAME}; +use cache::{TABLE_FLOWNODE_SET_CACHE_NAME, TABLE_ROUTE_CACHE_NAME}; use catalog::CatalogManagerRef; use common_base::Plugins; -use common_meta::cache::{CompositeTableRouteCacheRef, LayeredCacheRegistryRef}; +use common_meta::cache::{LayeredCacheRegistryRef, TableRouteCacheRef}; use common_meta::cache_invalidator::{CacheInvalidatorRef, DummyCacheInvalidator}; use common_meta::ddl::ProcedureExecutorRef; use common_meta::key::TableMetadataManager; @@ -98,15 +98,15 @@ impl FrontendBuilder { let node_manager = self.node_manager; let plugins = self.plugins.unwrap_or_default(); - let composite_table_route_cache: CompositeTableRouteCacheRef = self - .layered_cache_registry - .get() - .context(error::CacheRequiredSnafu { - name: COMPOSITE_TABLE_ROUTE_CACHE, - })?; + let table_route_cache: TableRouteCacheRef = + self.layered_cache_registry + .get() + .context(error::CacheRequiredSnafu { + name: TABLE_ROUTE_CACHE_NAME, + })?; let partition_manager = Arc::new(PartitionRuleManager::new( kv_backend.clone(), - composite_table_route_cache.clone(), + table_route_cache.clone(), )); let local_cache_invalidator = self @@ -169,7 +169,7 @@ impl FrontendBuilder { kv_backend.clone(), local_cache_invalidator, inserter.clone(), - composite_table_route_cache, + table_route_cache, )); plugins.insert::(statement_executor.clone()); diff --git a/src/operator/src/statement.rs b/src/operator/src/statement.rs index 654550fb7e7d..8864d4b78f59 100644 --- a/src/operator/src/statement.rs +++ b/src/operator/src/statement.rs @@ -26,7 +26,7 @@ use std::sync::Arc; use catalog::CatalogManagerRef; use common_error::ext::BoxedError; -use common_meta::cache::CompositeTableRouteCacheRef; +use common_meta::cache::TableRouteCacheRef; use common_meta::cache_invalidator::CacheInvalidatorRef; use common_meta::ddl::ProcedureExecutorRef; use common_meta::key::flow::{FlowMetadataManager, FlowMetadataManagerRef}; @@ -81,7 +81,7 @@ impl StatementExecutor { kv_backend: KvBackendRef, cache_invalidator: CacheInvalidatorRef, inserter: InserterRef, - composite_table_route_cache: CompositeTableRouteCacheRef, + table_route_cache: TableRouteCacheRef, ) -> Self { Self { catalog_manager, @@ -89,10 +89,7 @@ impl StatementExecutor { procedure_executor, table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())), flow_metadata_manager: Arc::new(FlowMetadataManager::new(kv_backend.clone())), - partition_manager: Arc::new(PartitionRuleManager::new( - kv_backend, - composite_table_route_cache, - )), + partition_manager: Arc::new(PartitionRuleManager::new(kv_backend, table_route_cache)), cache_invalidator, inserter, } diff --git a/src/operator/src/tests/partition_manager.rs b/src/operator/src/tests/partition_manager.rs index 4949d95e33fa..4a30928dc8c5 100644 --- a/src/operator/src/tests/partition_manager.rs +++ b/src/operator/src/tests/partition_manager.rs @@ -16,9 +16,7 @@ use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; use catalog::kvbackend::MetaKvBackend; -use common_meta::cache::{ - new_composite_table_route_cache, new_table_route_cache, CompositeTableRouteCacheRef, -}; +use common_meta::cache::{new_table_route_cache, TableRouteCacheRef}; use common_meta::key::table_route::TableRouteValue; use common_meta::key::TableMetadataManager; use common_meta::kv_backend::memory::MemoryKvBackend; @@ -85,18 +83,12 @@ fn new_test_region_wal_options(regions: Vec) -> HashMap CompositeTableRouteCacheRef { +fn test_new_table_route_cache(kv_backend: KvBackendRef) -> TableRouteCacheRef { let cache = CacheBuilder::new(128).build(); - let table_route_cache = Arc::new(new_table_route_cache( + Arc::new(new_table_route_cache( "table_route_cache".to_string(), cache, kv_backend.clone(), - )); - let cache = CacheBuilder::new(128).build(); - Arc::new(new_composite_table_route_cache( - "composite_table_route_cache".to_string(), - cache, - table_route_cache, )) } @@ -120,11 +112,8 @@ pub(crate) async fn create_partition_rule_manager( kv_backend: KvBackendRef, ) -> PartitionRuleManagerRef { let table_metadata_manager = TableMetadataManager::new(kv_backend.clone()); - let composite_table_route_cache = test_new_composite_table_route_cache(kv_backend.clone()); - let partition_manager = Arc::new(PartitionRuleManager::new( - kv_backend, - composite_table_route_cache, - )); + let table_route_cache = test_new_table_route_cache(kv_backend.clone()); + let partition_manager = Arc::new(PartitionRuleManager::new(kv_backend, table_route_cache)); let regions = vec![1u32, 2, 3]; let region_wal_options = new_test_region_wal_options(regions.clone()); @@ -270,11 +259,8 @@ async fn test_find_regions() { let kv_backend = Arc::new(MetaKvBackend { client: Arc::new(MetaClient::default()), }); - let composite_table_route_cache = test_new_composite_table_route_cache(kv_backend.clone()); - let partition_manager = Arc::new(PartitionRuleManager::new( - kv_backend, - composite_table_route_cache, - )); + let table_route_cache = test_new_table_route_cache(kv_backend.clone()); + let partition_manager = Arc::new(PartitionRuleManager::new(kv_backend, table_route_cache)); // PARTITION BY RANGE (a) ( // PARTITION r1 VALUES LESS THAN (10), diff --git a/src/partition/src/error.rs b/src/partition/src/error.rs index 5cad00bb8de3..f5a245dba335 100644 --- a/src/partition/src/error.rs +++ b/src/partition/src/error.rs @@ -192,6 +192,13 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Unexpected: {err_msg}"))] + Unexpected { + err_msg: String, + #[snafu(implicit)] + location: Location, + }, } impl ErrorExt for Error { @@ -210,6 +217,7 @@ impl ErrorExt for Error { | Error::InvalidInsertRequest { .. } | Error::InvalidDeleteRequest { .. } => StatusCode::InvalidArguments, Error::SerializeJson { .. } | Error::DeserializeJson { .. } => StatusCode::Internal, + Error::Unexpected { .. } => StatusCode::Unexpected, Error::InvalidTableRouteData { .. } => StatusCode::Internal, Error::ConvertScalarValue { .. } => StatusCode::Internal, Error::FindDatanode { .. } => StatusCode::InvalidArguments, diff --git a/src/partition/src/manager.rs b/src/partition/src/manager.rs index 742e37f0a628..3d192fc97700 100644 --- a/src/partition/src/manager.rs +++ b/src/partition/src/manager.rs @@ -16,12 +16,11 @@ use std::collections::{HashMap, HashSet}; use std::sync::Arc; use api::v1::Rows; -use common_meta::cache::CompositeTableRouteCacheRef; +use common_meta::cache::{TableRoute, TableRouteCacheRef}; use common_meta::key::table_route::{PhysicalTableRouteValue, TableRouteManager}; use common_meta::kv_backend::KvBackendRef; use common_meta::peer::Peer; -use common_meta::rpc::router; -use common_meta::rpc::router::RegionRoute; +use common_meta::rpc::router::{self, RegionRoute}; use common_query::prelude::Expr; use datafusion_expr::{BinaryExpr, Expr as DfExpr, Operator}; use datatypes::prelude::Value; @@ -52,7 +51,7 @@ pub type PartitionRuleManagerRef = Arc; /// - filters (in case of select, deletion and update) pub struct PartitionRuleManager { table_route_manager: TableRouteManager, - composite_table_route_cache: CompositeTableRouteCacheRef, + table_route_cache: TableRouteCacheRef, } #[derive(Debug)] @@ -62,29 +61,46 @@ pub struct PartitionInfo { } impl PartitionRuleManager { - pub fn new( - kv_backend: KvBackendRef, - composite_table_route_cache: CompositeTableRouteCacheRef, - ) -> Self { + pub fn new(kv_backend: KvBackendRef, table_route_cache: TableRouteCacheRef) -> Self { Self { table_route_manager: TableRouteManager::new(kv_backend), - composite_table_route_cache, + table_route_cache, } } - pub async fn find_table_route( + pub async fn find_physical_table_route( &self, table_id: TableId, ) -> Result> { - let table_route = self - .composite_table_route_cache + match self + .table_route_cache .get(table_id) .await .context(error::TableRouteManagerSnafu)? - .context(error::TableRouteNotFoundSnafu { table_id })?; - - let table_route = table_route.as_physical_table_route_ref().clone(); - Ok(table_route) + .context(error::TableRouteNotFoundSnafu { table_id })? + .as_ref() + { + TableRoute::Physical(physical_table_route) => Ok(physical_table_route.clone()), + TableRoute::Logical(logical_table_route) => { + let physical_table_id = logical_table_route.physical_table_id(); + let physical_table_route = self + .table_route_cache + .get(physical_table_id) + .await + .context(error::TableRouteManagerSnafu)? + .context(error::TableRouteNotFoundSnafu { table_id })?; + + let physical_table_route = physical_table_route + .as_physical_table_route_ref() + .context(error::UnexpectedSnafu{ + err_msg: format!( + "Expected the physical table route, but got logical table route, table: {table_id}" + ), + })?; + + Ok(physical_table_route.clone()) + } + } } pub async fn batch_find_region_routes( @@ -108,7 +124,10 @@ impl PartitionRuleManager { } pub async fn find_table_partitions(&self, table_id: TableId) -> Result> { - let region_routes = &self.find_table_route(table_id).await?.region_routes; + let region_routes = &self + .find_physical_table_route(table_id) + .await? + .region_routes; ensure!( !region_routes.is_empty(), error::FindTableRoutesSnafu { table_id } @@ -241,7 +260,7 @@ impl PartitionRuleManager { pub async fn find_region_leader(&self, region_id: RegionId) -> Result { let region_routes = &self - .find_table_route(region_id.table_id()) + .find_physical_table_route(region_id.table_id()) .await? .region_routes; diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index 13f431a738d7..4131d36b30d0 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -364,13 +364,13 @@ impl GreptimeDbClusterBuilder { ); let table_cache = cache_registry.get().unwrap(); - let composite_table_route_cache = cache_registry.get().unwrap(); + let table_route_cache = cache_registry.get().unwrap(); let catalog_manager = KvBackendCatalogManager::new( Mode::Distributed, Some(meta_client.clone()), cached_meta_backend.clone(), table_cache, - composite_table_route_cache, + table_route_cache, ) .await;