Skip to content

Commit

Permalink
Implement FlintJob to handle all query types in WP mode
Browse files Browse the repository at this point in the history
Signed-off-by: Shri Saran Raj N <[email protected]>
  • Loading branch information
Shri Saran Raj N committed Dec 20, 2024
1 parent b9cf14e commit cd81203
Show file tree
Hide file tree
Showing 8 changed files with 1,228 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,56 @@ public final class MetricConstants {
*/
public static final String STREAMING_HEARTBEAT_FAILED_METRIC = "streaming.heartbeat.failed.count";

/**
* Metric for tracking the count of streaming jobs failed during query execution
*/
public static final String STREAMING_EXECUTION_FAILED_METRIC = "streaming.execution.failed.count";

/**
* Metric for tracking the count of streaming jobs failed during query result write
*/
public static final String STREAMING_RESULT_WRITER_FAILED_METRIC = "streaming.writer.failed.count";

/**
* Metric for tracking the latency of query execution (start to complete query execution) excluding result write.
*/
public static final String QUERY_EXECUTION_TIME_METRIC = "query.execution.processingTime";
public static final String QUERY_EXECUTION_TIME_METRIC = "streaming.query.execution.processingTime";

/**
* Metric for tracking the latency of query result write only (excluding query execution)
*/
public static final String QUERY_RESULT_WRITER_TIME_METRIC = "streaming.result.writer.processingTime";

/**
* Metric for tracking the latency of query total execution including result write.
*/
public static final String QUERY_TOTAL_TIME_METRIC = "streaming.query.total.processingTime";

/**
* Metric for tracking the latency of query execution (start to complete query execution) excluding result write.
*/
public static final String STATEMENT_QUERY_EXECUTION_TIME_METRIC = "statement.query.execution.processingTime";

/**
* Metric for tracking the latency of query result write only (excluding query execution)
*/
public static final String STATEMENT_RESULT_WRITER_TIME_METRIC = "statement.result.writer.processingTime";

/**
* Metric for tracking the latency of query total execution including result write.
*/
public static final String STATEMENT_QUERY_TOTAL_TIME_METRIC = "statement.query.total.processingTime";

/**
* Metric for tracking the count of interactive jobs failed during query execution
*/
public static final String STATEMENT_EXECUTION_FAILED_METRIC = "statement.execution.failed.count";

/**
* Metric for tracking the count of interactive jobs failed during query result write
*/
public static final String STATEMENT_RESULT_WRITER_FAILED_METRIC = "statement.writer.failed.count";


/**
* Metric for query count of each query type (DROP/VACUUM/ALTER/REFRESH/CREATE INDEX)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,12 @@ object FlintSparkConf {
.doc("Enable external scheduler for index refresh")
.createWithDefault("false")

val WARMPOOL_ENABLED =
FlintConfig("spark.flint.job.warmpoolEnabled")
.createWithDefault("false")

val MAX_EXECUTORS_COUNT = FlintConfig("spark.dynamicAllocation.maxExecutors").createOptional()

val EXTERNAL_SCHEDULER_INTERVAL_THRESHOLD =
FlintConfig("spark.flint.job.externalScheduler.interval")
.doc("Interval threshold in minutes for external scheduler to trigger index refresh")
Expand Down Expand Up @@ -246,6 +252,10 @@ object FlintSparkConf {
FlintConfig(s"spark.flint.job.requestIndex")
.doc("Request index")
.createOptional()
val RESULT_INDEX =
FlintConfig(s"spark.flint.job.resultIndex")
.doc("Result index")
.createOptional()
val EXCLUDE_JOB_IDS =
FlintConfig(s"spark.flint.deployment.excludeJobs")
.doc("Exclude job ids")
Expand Down
Loading

0 comments on commit cd81203

Please sign in to comment.