Skip to content

Commit

Permalink
Merge branch 'main' into create-view
Browse files Browse the repository at this point in the history
  • Loading branch information
killme2008 authored May 13, 2024
2 parents 46bdd71 + 9d12496 commit e13b4fb
Show file tree
Hide file tree
Showing 30 changed files with 605 additions and 179 deletions.
492 changes: 382 additions & 110 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions config/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@
| `export_metrics.remote_write` | -- | -- | -- |
| `export_metrics.remote_write.url` | String | `""` | The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=information_schema`. |
| `export_metrics.remote_write.headers` | InlineTable | -- | HTTP headers of Prometheus remote-write carry. |
| `tracing` | -- | -- | The tracing options. Only effect when compiled with `tokio-console` feature. |
| `tracing.tokio_console_addr` | String | `None` | The tokio console address. |


## Cluster Mode
Expand Down Expand Up @@ -203,6 +205,8 @@
| `export_metrics.remote_write` | -- | -- | -- |
| `export_metrics.remote_write.url` | String | `""` | The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=information_schema`. |
| `export_metrics.remote_write.headers` | InlineTable | -- | HTTP headers of Prometheus remote-write carry. |
| `tracing` | -- | -- | The tracing options. Only effect when compiled with `tokio-console` feature. |
| `tracing.tokio_console_addr` | String | `None` | The tokio console address. |


### Metasrv
Expand Down Expand Up @@ -259,6 +263,8 @@
| `export_metrics.remote_write` | -- | -- | -- |
| `export_metrics.remote_write.url` | String | `""` | The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=information_schema`. |
| `export_metrics.remote_write.headers` | InlineTable | -- | HTTP headers of Prometheus remote-write carry. |
| `tracing` | -- | -- | The tracing options. Only effect when compiled with `tokio-console` feature. |
| `tracing.tokio_console_addr` | String | `None` | The tokio console address. |


### Datanode
Expand Down Expand Up @@ -370,3 +376,5 @@
| `export_metrics.remote_write` | -- | -- | -- |
| `export_metrics.remote_write.url` | String | `""` | The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=information_schema`. |
| `export_metrics.remote_write.headers` | InlineTable | -- | HTTP headers of Prometheus remote-write carry. |
| `tracing` | -- | -- | The tracing options. Only effect when compiled with `tokio-console` feature. |
| `tracing.tokio_console_addr` | String | `None` | The tokio console address. |
6 changes: 6 additions & 0 deletions config/datanode.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -428,3 +428,9 @@ url = ""

## HTTP headers of Prometheus remote-write carry.
headers = { }

## The tracing options. Only effect when compiled with `tokio-console` feature.
[tracing]
## The tokio console address.
## +toml2docs:none-default
tokio_console_addr = "127.0.0.1"
6 changes: 6 additions & 0 deletions config/frontend.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -186,3 +186,9 @@ url = ""

## HTTP headers of Prometheus remote-write carry.
headers = { }

## The tracing options. Only effect when compiled with `tokio-console` feature.
[tracing]
## The tokio console address.
## +toml2docs:none-default
tokio_console_addr = "127.0.0.1"
6 changes: 6 additions & 0 deletions config/metasrv.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -141,3 +141,9 @@ url = ""

## HTTP headers of Prometheus remote-write carry.
headers = { }

## The tracing options. Only effect when compiled with `tokio-console` feature.
[tracing]
## The tokio console address.
## +toml2docs:none-default
tokio_console_addr = "127.0.0.1"
6 changes: 6 additions & 0 deletions config/standalone.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -471,3 +471,9 @@ url = ""

## HTTP headers of Prometheus remote-write carry.
headers = { }

## The tracing options. Only effect when compiled with `tokio-console` feature.
[tracing]
## The tokio console address.
## +toml2docs:none-default
tokio_console_addr = "127.0.0.1"
2 changes: 1 addition & 1 deletion src/cmd/src/bin/greptime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ async fn start(cli: Command) -> Result<()> {
let _guard = common_telemetry::init_global_logging(
&app_name,
opts.logging_options(),
cli.global_options.tracing_options(),
&cli.global_options.tracing_options(),
opts.node_id(),
);

Expand Down
6 changes: 6 additions & 0 deletions src/cmd/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use async_trait::async_trait;
use catalog::kvbackend::MetaKvBackend;
use clap::Parser;
use common_telemetry::info;
use common_telemetry::logging::TracingOptions;
use common_wal::config::DatanodeWalConfig;
use datanode::config::DatanodeOptions;
use datanode::datanode::{Datanode, DatanodeBuilder};
Expand Down Expand Up @@ -146,6 +147,11 @@ impl StartCommand {
opts.logging.level.clone_from(&global_options.log_level);
}

opts.tracing = TracingOptions {
#[cfg(feature = "tokio-console")]
tokio_console_addr: global_options.tokio_console_addr.clone(),
};

if let Some(addr) = &self.rpc_addr {
opts.rpc_addr.clone_from(addr);
}
Expand Down
6 changes: 6 additions & 0 deletions src/cmd/src/frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use client::client_manager::DatanodeClients;
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
use common_meta::heartbeat::handler::HandlerGroupExecutor;
use common_telemetry::info;
use common_telemetry::logging::TracingOptions;
use common_time::timezone::set_default_timezone;
use frontend::frontend::FrontendOptions;
use frontend::heartbeat::handler::invalidate_table_cache::InvalidateTableCacheHandler;
Expand Down Expand Up @@ -162,6 +163,11 @@ impl StartCommand {
opts.logging.level.clone_from(&global_options.log_level);
}

opts.tracing = TracingOptions {
#[cfg(feature = "tokio-console")]
tokio_console_addr: global_options.tokio_console_addr.clone(),
};

let tls_opts = TlsOption::new(
self.tls_mode.clone(),
self.tls_cert_path.clone(),
Expand Down
24 changes: 15 additions & 9 deletions src/cmd/src/metasrv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::time::Duration;
use async_trait::async_trait;
use clap::Parser;
use common_telemetry::info;
use common_telemetry::logging::TracingOptions;
use meta_srv::bootstrap::MetasrvInstance;
use meta_srv::metasrv::MetasrvOptions;
use snafu::ResultExt;
Expand Down Expand Up @@ -98,8 +99,8 @@ struct StartCommand {
bind_addr: Option<String>,
#[clap(long)]
server_addr: Option<String>,
#[clap(long)]
store_addr: Option<String>,
#[clap(long, aliases = ["store-addr"], value_delimiter = ',', num_args = 1..)]
store_addrs: Option<Vec<String>>,
#[clap(short, long)]
config_file: Option<String>,
#[clap(short, long)]
Expand Down Expand Up @@ -141,6 +142,11 @@ impl StartCommand {
opts.logging.level.clone_from(&global_options.log_level);
}

opts.tracing = TracingOptions {
#[cfg(feature = "tokio-console")]
tokio_console_addr: global_options.tokio_console_addr.clone(),
};

if let Some(addr) = &self.bind_addr {
opts.bind_addr.clone_from(addr);
}
Expand All @@ -149,8 +155,8 @@ impl StartCommand {
opts.server_addr.clone_from(addr);
}

if let Some(addr) = &self.store_addr {
opts.store_addr.clone_from(addr);
if let Some(addrs) = &self.store_addrs {
opts.store_addrs.clone_from(addrs);
}

if let Some(selector_type) = &self.selector {
Expand Down Expand Up @@ -230,7 +236,7 @@ mod tests {
let cmd = StartCommand {
bind_addr: Some("127.0.0.1:3002".to_string()),
server_addr: Some("127.0.0.1:3002".to_string()),
store_addr: Some("127.0.0.1:2380".to_string()),
store_addrs: Some(vec!["127.0.0.1:2380".to_string()]),
selector: Some("LoadBased".to_string()),
..Default::default()
};
Expand All @@ -239,7 +245,7 @@ mod tests {
unreachable!()
};
assert_eq!("127.0.0.1:3002".to_string(), options.bind_addr);
assert_eq!("127.0.0.1:2380".to_string(), options.store_addr);
assert_eq!(vec!["127.0.0.1:2380".to_string()], options.store_addrs);
assert_eq!(SelectorType::LoadBased, options.selector);
}

Expand Down Expand Up @@ -275,7 +281,7 @@ mod tests {
};
assert_eq!("127.0.0.1:3002".to_string(), options.bind_addr);
assert_eq!("127.0.0.1:3002".to_string(), options.server_addr);
assert_eq!("127.0.0.1:2379".to_string(), options.store_addr);
assert_eq!(vec!["127.0.0.1:2379".to_string()], options.store_addrs);
assert_eq!(SelectorType::LeaseBased, options.selector);
assert_eq!("debug", options.logging.level.as_ref().unwrap());
assert_eq!("/tmp/greptimedb/test/logs".to_string(), options.logging.dir);
Expand Down Expand Up @@ -309,7 +315,7 @@ mod tests {
let cmd = StartCommand {
bind_addr: Some("127.0.0.1:3002".to_string()),
server_addr: Some("127.0.0.1:3002".to_string()),
store_addr: Some("127.0.0.1:2380".to_string()),
store_addrs: Some(vec!["127.0.0.1:2380".to_string()]),
selector: Some("LoadBased".to_string()),
..Default::default()
};
Expand Down Expand Up @@ -395,7 +401,7 @@ mod tests {
assert_eq!(opts.http.addr, "127.0.0.1:14000");

// Should be default value.
assert_eq!(opts.store_addr, "127.0.0.1:2379");
assert_eq!(opts.store_addrs, vec!["127.0.0.1:2379".to_string()]);
},
);
}
Expand Down
9 changes: 8 additions & 1 deletion src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use common_meta::sequence::SequenceBuilder;
use common_meta::wal_options_allocator::{WalOptionsAllocator, WalOptionsAllocatorRef};
use common_procedure::ProcedureManagerRef;
use common_telemetry::info;
use common_telemetry::logging::LoggingOptions;
use common_telemetry::logging::{LoggingOptions, TracingOptions};
use common_time::timezone::set_default_timezone;
use common_wal::config::StandaloneWalConfig;
use datanode::config::{DatanodeOptions, ProcedureConfig, RegionEngineConfig, StorageConfig};
Expand Down Expand Up @@ -124,6 +124,7 @@ pub struct StandaloneOptions {
/// Options for different store engines.
pub region_engine: Vec<RegionEngineConfig>,
pub export_metrics: ExportMetricsOption,
pub tracing: TracingOptions,
}

impl StandaloneOptions {
Expand Down Expand Up @@ -156,6 +157,7 @@ impl Default for StandaloneOptions {
RegionEngineConfig::Mito(MitoConfig::default()),
RegionEngineConfig::File(FileEngineConfig::default()),
],
tracing: TracingOptions::default(),
}
}
}
Expand Down Expand Up @@ -302,6 +304,11 @@ impl StartCommand {
opts.logging.level.clone_from(&global_options.log_level);
}

opts.tracing = TracingOptions {
#[cfg(feature = "tokio-console")]
tokio_console_addr: global_options.tokio_console_addr.clone(),
};

let tls_opts = TlsOption::new(
self.tls_mode.clone(),
self.tls_cert_path.clone(),
Expand Down
2 changes: 1 addition & 1 deletion src/common/datasource/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ derive_builder.workspace = true
futures.workspace = true
lazy_static.workspace = true
object-store.workspace = true
orc-rust = { git = "https://github.com/MichaelScofield/orc-rs.git", rev = "17347f5f084ac937863317df882218055c4ea8c1" }
orc-rust = { git = "https://github.com/datafusion-contrib/datafusion-orc.git", rev = "502217315726314c4008808fe169764529640599" }
parquet.workspace = true
paste = "1.0"
regex = "1.7"
Expand Down
2 changes: 1 addition & 1 deletion src/common/datasource/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: orc_rust::error::Error,
error: orc_rust::error::OrcError,
},

#[snafu(display("Failed to read object from path: {}", path))]
Expand Down
23 changes: 7 additions & 16 deletions src/common/datasource/src/file_format/orc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,8 @@ use datafusion::datasource::physical_plan::{FileMeta, FileOpenFuture, FileOpener
use datafusion::error::{DataFusionError, Result as DfResult};
use futures::{StreamExt, TryStreamExt};
use object_store::ObjectStore;
use orc_rust::arrow_reader::{create_arrow_schema, Cursor};
use orc_rust::arrow_reader::ArrowReaderBuilder;
use orc_rust::async_arrow_reader::ArrowStreamReader;
use orc_rust::reader::Reader;
use snafu::ResultExt;
use tokio::io::{AsyncRead, AsyncSeek};

Expand All @@ -33,28 +32,20 @@ use crate::file_format::FileFormat;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct OrcFormat;

pub async fn new_orc_cursor<R: AsyncRead + AsyncSeek + Unpin + Send + 'static>(
reader: R,
) -> Result<Cursor<R>> {
let reader = Reader::new_async(reader)
.await
.context(error::OrcReaderSnafu)?;
let cursor = Cursor::root(reader).context(error::OrcReaderSnafu)?;
Ok(cursor)
}

pub async fn new_orc_stream_reader<R: AsyncRead + AsyncSeek + Unpin + Send + 'static>(
reader: R,
) -> Result<ArrowStreamReader<R>> {
let cursor = new_orc_cursor(reader).await?;
Ok(ArrowStreamReader::new(cursor, None))
let reader_build = ArrowReaderBuilder::try_new_async(reader)
.await
.context(error::OrcReaderSnafu)?;
Ok(reader_build.build_async())
}

pub async fn infer_orc_schema<R: AsyncRead + AsyncSeek + Unpin + Send + 'static>(
reader: R,
) -> Result<Schema> {
let cursor = new_orc_cursor(reader).await?;
Ok(create_arrow_schema(&cursor))
let reader = new_orc_stream_reader(reader).await?;
Ok(reader.schema().as_ref().clone())
}

#[async_trait]
Expand Down
16 changes: 7 additions & 9 deletions src/common/meta/src/ddl/create_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use async_trait::async_trait;
use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DefaultOnNull};
use snafu::{ensure, ResultExt};
use strum::AsRefStr;

Expand All @@ -39,7 +40,7 @@ impl CreateDatabaseProcedure {
catalog: String,
schema: String,
create_if_not_exists: bool,
options: Option<HashMap<String, String>>,
options: HashMap<String, String>,
context: DdlContext,
) -> Self {
Self {
Expand Down Expand Up @@ -85,19 +86,14 @@ impl CreateDatabaseProcedure {
}

pub async fn on_create_metadata(&mut self) -> Result<Status> {
let value: Option<SchemaNameValue> = self
.data
.options
.as_ref()
.map(|hash_map_ref| hash_map_ref.try_into())
.transpose()?;
let value: SchemaNameValue = (&self.data.options).try_into()?;

self.context
.table_metadata_manager
.schema_manager()
.create(
SchemaNameKey::new(&self.data.catalog, &self.data.schema),
value,
Some(value),
self.data.create_if_not_exists,
)
.await?;
Expand Down Expand Up @@ -142,11 +138,13 @@ pub enum CreateDatabaseState {
CreateMetadata,
}

#[serde_as]
#[derive(Debug, Serialize, Deserialize)]
pub struct CreateDatabaseData {
pub state: CreateDatabaseState,
pub catalog: String,
pub schema: String,
pub create_if_not_exists: bool,
pub options: Option<HashMap<String, String>>,
#[serde_as(deserialize_as = "DefaultOnNull")]
pub options: HashMap<String, String>,
}
Loading

0 comments on commit e13b4fb

Please sign in to comment.