
Implement FlintJob to handle all query types in warmpool mode #979

Open
wants to merge 2 commits into base: main

Conversation

@saranrajnk commented Dec 9, 2024

Description

This PR introduces support for FlintJob to handle all types of queries — interactive, streaming, and batch — with all data sources in warmpool mode. Additionally, FlintJob will also support non-warmpool mode for streaming and batch queries, configurable via a Spark configuration setting.

Changes:

  • FlintJob does not use sessionId for query execution.
  • The client assigns the query at runtime for all query types in warmpool mode and executes them.
  • FlintJob now supports interactive queries by reusing several functions from FlintREPL, with access modifiers updated to public.
  • Added configuration for non-warmpool mode to support streaming and batch queries.
  • Emits success, failure, and latency metrics

Related Issues

Check List

  • Updated documentation (docs/ppl-lang/README.md)
  • Implemented unit tests
  • Implemented tests for combination with other commands
  • New added source code should include a copyright header
  • Commits are signed per the DCO using --signoff

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

logInfo(s"WarmpoolEnabled: ${warmpoolEnabled}")

if (!warmpoolEnabled) {
val jobType = sparkSession.conf.get("spark.flint.job.type", FlintJobType.BATCH)
Contributor:

Any particular reason to have the conf key hard-coded here?
We could probably use FlintSparkConf.JOB_TYPE.key, similar to FlintSparkConf.WARMPOOL_ENABLED.key above.
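A sketch of the suggested change (assuming FlintSparkConf.JOB_TYPE exists alongside WARMPOOL_ENABLED, as the surrounding code implies):

```scala
// Sketch: read the job type via the FlintSparkConf constant rather than
// the hard-coded "spark.flint.job.type" string, mirroring how
// FlintSparkConf.WARMPOOL_ENABLED.key is used above.
val jobType = sparkSession.conf.get(FlintSparkConf.JOB_TYPE.key, FlintJobType.BATCH)
```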

Author:

The existing code was doing the same; I just wrapped it in an if block. I can modify this if needed.

CustomLogging.logInfo(s"""Job type is: ${jobType}""")
sparkSession.conf.set(FlintSparkConf.JOB_TYPE.key, jobType)

val dataSource = conf.get("spark.flint.datasource.name", "")
Contributor:

Same for DATA_SOURCE_NAME

Author:

The existing code was doing the same; I just wrapped it in an if block. I can modify this if needed.

Collaborator @ykmr1224 left a comment:

Can you clarify and document how WarmPool is abstracted and can be enabled/disabled?

val warmpoolEnabled = conf.get(FlintSparkConf.WARMPOOL_ENABLED.key, "false").toBoolean
logInfo(s"WarmpoolEnabled: ${warmpoolEnabled}")

if (!warmpoolEnabled) {
Collaborator:

This introduces a huge if/else block, which reduces readability and maintainability a lot. Can you split the class into a warmpool job and the original interactive job?

Collaborator:

+1 on this, let's abstract the common interface and move from there.
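One possible shape for that abstraction (a sketch only; the trait and class names here are illustrative, not part of the PR):

```scala
// Hypothetical sketch: the warmpool and original paths become two
// implementations of one trait, so the entry point branches once on
// warmpoolEnabled instead of carrying a large if/else block throughout.
trait FlintJobRunner {
  def run(): Unit
}

class WarmpoolJobRunner extends FlintJobRunner {
  override def run(): Unit = {
    // warmpool path: poll for assigned queries and execute them
  }
}

class BatchStreamingJobRunner extends FlintJobRunner {
  override def run(): Unit = {
    // original path: run the single configured batch/streaming query
  }
}

object FlintJobEntry {
  // Select the implementation once at startup.
  def runnerFor(warmpoolEnabled: Boolean): FlintJobRunner =
    if (warmpoolEnabled) new WarmpoolJobRunner else new BatchStreamingJobRunner
}
```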

Author:

Ack

val queryId = conf.get(FlintSparkConf.QUERY_ID.key, "")
}

def queryLoop(commandContext: CommandContext, segmentName: String): Unit = {
Collaborator:

This looks similar to the FlintREPL.queryLoop method, but modified. It would become very difficult to maintain, since we would need to carefully keep both consistent.
Can you add an abstraction and avoid the duplication?

Author:

This redundancy is expected. After this PR is merged, FlintREPL will be deprecated. FlintJob will be the single point of entry for all types of queries.

Comment on lines +550 to +644
def getSegmentName(sparkSession: SparkSession): String = {
val maxExecutorsCount =
sparkSession.conf.get(FlintSparkConf.MAX_EXECUTORS_COUNT.key, "unknown")
String.format("%se", maxExecutorsCount)
}
Collaborator:

This segmentName is specific to the warmpool logic; let's create an abstraction for warmpool and record metrics via AOP.

@@ -610,15 +610,15 @@ object FlintREPL extends Logging with FlintJobExecutor {
}
}

private def handleCommandTimeout(
def handleCommandTimeout(
Collaborator:

Both FlintJob and FlintREPL extend FlintJobExecutor; consider refactoring the common methods into FlintJobExecutor.

Author:

Ack

@@ -32,7 +32,8 @@ case class JobOperator(
dataSource: String,
resultIndex: String,
jobType: String,
streamingRunningCount: AtomicInteger)
streamingRunningCount: AtomicInteger,
statementContext: Map[String, Any] = Map.empty[String, Any])
Collaborator:

What's the purpose of adding statementContext, which belongs to the FlintStatement model?

Author:

The getNextStatement call in FlintJob retrieves all of the query information, including the statementContext. However, when the query is a streaming or batch query, we need to invoke JobOperator. Currently, JobOperator only accepts the query, queryId, and related information, not the statementContext, so when JobOperator calls executeStatement we may run into issues. To resolve this, the statementContext should be passed to JobOperator.

Additionally, the FlintStatement constructed before calling executeStatement does not currently include the statementContext. With the introduction of warmpool, it becomes necessary to include it, as there is client-side logic that depends on it.
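The threading described above might look roughly like this (a self-contained sketch; the parameter names follow the diff, while the simplified class shapes are hypothetical):

```scala
// Hypothetical sketch: FlintJob forwards the statementContext it received
// from getNextStatement into JobOperator, which attaches it to the
// FlintStatement it builds before calling executeStatement.
case class FlintStatement(
    query: String,
    queryId: String,
    context: Map[String, Any]) // the statement now carries its context

case class JobOperator(
    query: String,
    queryId: String,
    statementContext: Map[String, Any] = Map.empty[String, Any]) {

  // Client-side logic that depends on the context keeps working in
  // warmpool mode because the statement carries it end to end.
  def buildStatement(): FlintStatement =
    FlintStatement(query, queryId, statementContext)
}
```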


@saranrajnk force-pushed the nexus-wp-feat branch 2 times, most recently from cd81203 to 044aeea on December 20, 2024 20:42
Signed-off-by: Shri Saran Raj N <[email protected]>
val DEFAULT_QUERY_LOOP_EXECUTION_FREQUENCY = 100L
}

case class WarmpoolJob(
Collaborator:

Looks like it does not have tests. Can you add tests to cover the warmpool use cases?

Author:

Can I take this up in subsequent PRs?

Collaborator:

It is not good practice to separate the implementation and the unit tests into different PRs. It would also be harder to review the unit tests separately.

* @param flintStatement
* Flint statement
*/
private def finalizeCommand(
Collaborator:

Is this a duplicate of FlintREPL? Can you check for other duplicates and avoid them whenever possible?

Author:

This differs from the FlintREPL code; it requires new changes that can't be achieved with the function used in FlintREPL, so some redundancy in this function is unavoidable.

Collaborator:

Looks like the only differences are metric emission and logging. If those are beneficial for warmpool, should we generalize them and use them in FlintREPL as well?

Collaborator @noCharger left a comment:

Can we remove the concept of interactive / batch / streaming job for warm pool?

"spark.flint.job.queryLoopExecutionFrequency",
DEFAULT_QUERY_LOOP_EXECUTION_FREQUENCY)

val sessionManager = instantiateSessionManager(sparkSession, resultIndexOption)
Collaborator:

Why do we need a session manager for warmpool?

Author:

There are a bunch of places where the sessionManager needs to be passed. The resultIndex for an interactive query with a custom data source was being read from this sessionManager implementation, so this still has a minimal dependency on it.

Author:

> Can we remove the concept of interactive / batch / streaming job for warm pool?

Why? This classification is required for the WP logic. Also, it would be difficult to remove at this point. Maybe we can pick it up later if required.

}
}

def queryLoop(commandContext: CommandContext): Unit = {
Collaborator:

Why do we need the concept of a query loop for warm pool?

Author:

Warmpool also requires multiple iterations before running the actual query.
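The iteration the reply describes could be sketched as a simple polling loop (illustrative only; the fetch and execute callbacks stand in for the actual Flint client calls):

```scala
// Hypothetical sketch: the warmpool client keeps polling for the next
// assigned query and executing it until nothing remains to run.
@scala.annotation.tailrec
def queryLoop(fetchNext: () => Option[String], execute: String => Unit): Unit =
  fetchNext() match {
    case Some(query) =>
      execute(query)
      queryLoop(fetchNext, execute) // iterate again for the next assignment
    case None =>
      () // nothing assigned; stop (or idle the warm executor)
  }
```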
