From b43a6c83139a03e7589bf22c6ddbdb8b16f39c10 Mon Sep 17 00:00:00 2001 From: Ning Sun Date: Wed, 22 Jan 2025 14:48:55 +0800 Subject: [PATCH] refactor: address review comments --- src/servers/src/pipeline.rs | 23 ++++++++--------------- 1 file changed, 8 insertions(+), 15 deletions(-) diff --git a/src/servers/src/pipeline.rs b/src/servers/src/pipeline.rs index c4b01b489ef6..5d11df2eff2b 100644 --- a/src/servers/src/pipeline.rs +++ b/src/servers/src/pipeline.rs @@ -32,23 +32,20 @@ use crate::metrics::{ use crate::query_handler::PipelineHandlerRef; #[inline] -pub(crate) fn pipeline_exec_with_intermediate_state( +fn pipeline_exec_with_intermediate_state( pipeline: &Arc>, intermediate_state: &mut Vec, transformed: &mut Vec, dispatched: &mut BTreeMap>>, db: &str, transform_timer: &Instant, - is_top_level: bool, ) -> Result<()> { let r = pipeline .exec_mut(intermediate_state) .inspect_err(|_| { - if is_top_level { - METRIC_HTTP_LOGS_TRANSFORM_ELAPSED - .with_label_values(&[db, METRIC_FAILURE_VALUE]) - .observe(transform_timer.elapsed().as_secs_f64()); - } + METRIC_HTTP_LOGS_TRANSFORM_ELAPSED + .with_label_values(&[db, METRIC_FAILURE_VALUE]) + .observe(transform_timer.elapsed().as_secs_f64()); }) .context(PipelineTransformSnafu) .context(PipelineSnafu)?; @@ -118,22 +115,20 @@ pub(crate) async fn run_pipeline( let pipeline = get_pipeline(pipeline_definition, state, query_ctx).await?; let transform_timer = std::time::Instant::now(); - let mut intermediate_state = pipeline.init_intermediate_state(); let mut transformed = Vec::with_capacity(values.len()); let mut dispatched: BTreeMap>> = BTreeMap::new(); match values { PipelineExecInput::Original(array) => { + let mut intermediate_state = pipeline.init_intermediate_state(); for v in array { pipeline .prepare(v, &mut intermediate_state) .inspect_err(|_| { - if is_top_level { - METRIC_HTTP_LOGS_TRANSFORM_ELAPSED - .with_label_values(&[db, METRIC_FAILURE_VALUE]) - .observe(transform_timer.elapsed().as_secs_f64()); - } + METRIC_HTTP_LOGS_TRANSFORM_ELAPSED + .with_label_values(&[db, METRIC_FAILURE_VALUE]) + .observe(transform_timer.elapsed().as_secs_f64()); }) .context(PipelineTransformSnafu) .context(PipelineSnafu)?; @@ -145,7 +140,6 @@ pub(crate) async fn run_pipeline( &mut dispatched, db, &transform_timer, - is_top_level, )?; pipeline.reset_intermediate_state(&mut intermediate_state); @@ -160,7 +154,6 @@ pub(crate) async fn run_pipeline( &mut dispatched, db, &transform_timer, - is_top_level, )?; } }