Skip to content

Commit

Permalink
refactor: address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
sunng87 committed Jan 22, 2025
1 parent 5bd8798 commit b43a6c8
Showing 1 changed file with 8 additions and 15 deletions.
23 changes: 8 additions & 15 deletions src/servers/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,23 +32,20 @@ use crate::metrics::{
use crate::query_handler::PipelineHandlerRef;

#[inline]
pub(crate) fn pipeline_exec_with_intermediate_state(
fn pipeline_exec_with_intermediate_state(
pipeline: &Arc<Pipeline<GreptimeTransformer>>,
intermediate_state: &mut Vec<pipeline::Value>,
transformed: &mut Vec<Row>,
dispatched: &mut BTreeMap<DispatchedTo, Vec<Vec<pipeline::Value>>>,
db: &str,
transform_timer: &Instant,
is_top_level: bool,
) -> Result<()> {
let r = pipeline
.exec_mut(intermediate_state)
.inspect_err(|_| {
if is_top_level {
METRIC_HTTP_LOGS_TRANSFORM_ELAPSED
.with_label_values(&[db, METRIC_FAILURE_VALUE])
.observe(transform_timer.elapsed().as_secs_f64());
}
METRIC_HTTP_LOGS_TRANSFORM_ELAPSED
.with_label_values(&[db, METRIC_FAILURE_VALUE])
.observe(transform_timer.elapsed().as_secs_f64());
})
.context(PipelineTransformSnafu)
.context(PipelineSnafu)?;
Expand Down Expand Up @@ -118,22 +115,20 @@ pub(crate) async fn run_pipeline(
let pipeline = get_pipeline(pipeline_definition, state, query_ctx).await?;

let transform_timer = std::time::Instant::now();
let mut intermediate_state = pipeline.init_intermediate_state();

let mut transformed = Vec::with_capacity(values.len());
let mut dispatched: BTreeMap<DispatchedTo, Vec<Vec<pipeline::Value>>> = BTreeMap::new();

match values {
PipelineExecInput::Original(array) => {
let mut intermediate_state = pipeline.init_intermediate_state();
for v in array {
pipeline
.prepare(v, &mut intermediate_state)
.inspect_err(|_| {
if is_top_level {
METRIC_HTTP_LOGS_TRANSFORM_ELAPSED
.with_label_values(&[db, METRIC_FAILURE_VALUE])
.observe(transform_timer.elapsed().as_secs_f64());
}
METRIC_HTTP_LOGS_TRANSFORM_ELAPSED
.with_label_values(&[db, METRIC_FAILURE_VALUE])
.observe(transform_timer.elapsed().as_secs_f64());
})
.context(PipelineTransformSnafu)
.context(PipelineSnafu)?;
Expand All @@ -145,7 +140,6 @@ pub(crate) async fn run_pipeline(
&mut dispatched,
db,
&transform_timer,
is_top_level,
)?;

pipeline.reset_intermediate_state(&mut intermediate_state);
Expand All @@ -160,7 +154,6 @@ pub(crate) async fn run_pipeline(
&mut dispatched,
db,
&transform_timer,
is_top_level,
)?;
}
}
Expand Down

0 comments on commit b43a6c8

Please sign in to comment.