Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RTO Task Overhaul (BugFix and Support to run multiple subtasks) #14623

Open
wants to merge 73 commits into
base: master
Choose a base branch
from

Conversation

noob-se7en
Copy link
Contributor

@noob-se7en noob-se7en commented Dec 9, 2024

Solves

  1. Issue: Support maxNumRowsPerTask in RealtimeToOfflineSegmentsTask #12857
    Issue description: Currently RealtimeToOfflineSegmentsTask that is used to move real time segments to offline segments, does not have the ability to tune maxNumRowsPerTask. This is the parameter that determines the input to a task. Without this configuration, we end up creating one minion task, which takes in all the input (i.e all segments that meet the criteria to be converted to offline segments) which prevents us from using other minions.
  2. Bugs: [Bug] Data Inconsistency in RealtimeToOffline Minion tasks #14659
    2.1. Currently for RTO task, watermark is updated in executor after segments are uploaded. Hence, there can be possible scenarios where segments were uploaded to offline table but RTO metadata watermark was not updated AND RTO task generator does not validate if the segment has already been processed in the previous minion run.

Proposed Solution:

  1. Divide a task into multiple subtasks based on max num of rows per subtask.
  2. Since Segment Lineage is only kept for replaced segment (not valid of RTO tasks), This PR uses similar logic to add segment lineage like data structure called ExpectedRealtimeToOfflineTaskResultInfo in RTO task metadata.
  3. Once subtasks are generated, during the execution of subtasks, each subtasks atomically updates ExpectedRealtimeToOfflineTaskResultInfo present in task metadata before uploading offline segments.
  4. In next iteration of generating RTO subtasks, using task metadata and current state of offline table, evaluate failed subtasks and re-schedule them. Incase of no failure pick new eligible segments.
  5. Update the watermark based on the startTime of the earliest segment selected only if new eligible segments are picked (i.e. no failure).

Alternate solution (dropped) - #14578

TODO - Need to add more tests for edge cases (pending).

@codecov-commenter
Copy link

codecov-commenter commented Dec 9, 2024

Codecov Report

Attention: Patch coverage is 76.36364% with 78 lines in your changes missing coverage. Please review.

Project coverage is 63.62%. Comparing base (59551e4) to head (279a339).
Report is 1625 commits behind head on master.

Files with missing lines Patch % Lines
...egments/RealtimeToOfflineSegmentsTaskExecutor.java 4.65% 41 Missing ⚠️
.../minion/RealtimeToOfflineSegmentsTaskMetadata.java 66.07% 17 Missing and 2 partials ⚠️
...gments/RealtimeToOfflineSegmentsTaskGenerator.java 92.38% 7 Missing and 9 partials ⚠️
.../minion/RealtimeToOfflineCheckpointCheckPoint.java 90.47% 2 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #14623      +/-   ##
============================================
+ Coverage     61.75%   63.62%   +1.86%     
- Complexity      207     1411    +1204     
============================================
  Files          2436     2709     +273     
  Lines        133233   151767   +18534     
  Branches      20636    23431    +2795     
============================================
+ Hits          82274    96556   +14282     
- Misses        44911    47941    +3030     
- Partials       6048     7270    +1222     
Flag Coverage Δ
custom-integration1 100.00% <ø> (+99.99%) ⬆️
integration 100.00% <ø> (+99.99%) ⬆️
integration1 100.00% <ø> (+99.99%) ⬆️
integration2 ?
java-11 34.03% <59.39%> (-27.68%) ⬇️
java-21 63.61% <76.36%> (+1.99%) ⬆️
skip-bytebuffers-false 63.61% <76.36%> (+1.86%) ⬆️
skip-bytebuffers-true 63.60% <76.36%> (+35.87%) ⬆️
temurin 63.62% <76.36%> (+1.86%) ⬆️
unittests 63.61% <76.36%> (+1.86%) ⬆️
unittests1 56.13% <72.72%> (+9.24%) ⬆️
unittests2 34.03% <59.39%> (+6.30%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.


private final String _tableNameWithType;
private final long _watermarkMs;
private long _watermarkMs;
private final Map<String, List<String>> _realtimeSegmentVsCorrespondingOfflineSegmentMap;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This approach looks more promising.
I think we might have to track this input->output segment on a per task basis and then undo everything if there's a task failure (task's execution should be treated as all or nothing). You could maintain 2 maps where the key of the map is taskId and value is list of segments (inputSegments, outputSegments).
If there's a task failure, you need to undo all that the task has done i.e remove all outputSegments (if they exist in offline table) and redo all inputSegments that the task picked.
The reason for the above is a single input segment can map to multiple output segments and multiple input can map to single output segment. The cleanest approach is undo what the task has done (either in the minion itself & in generator as fallback) if there's failure and retry the input segments.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code handles M -> N segment conversion. This edge case is handled currently as well, no?

@noob-se7en noob-se7en changed the title Adds Support of maxNumRowsPerTask in RTO Generator Adds Support of maxNumRowsPerTask in RTO Generator and Fixes Bugs Dec 10, 2024
@noob-se7en noob-se7en marked this pull request as ready for review December 10, 2024 09:45

private final String _tableNameWithType;
private final long _watermarkMs;
private long _windowStartMs;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would this change cause backward incompatibility issues during deployments?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. _windowStartMs will be read from znRecord.getLongField("watermarkMs", 0) like before.
So there shouldn't be any backward incompatibility issue.

* when a prev minion task is failed.
*
*/
public class ExpectedRealtimeToOfflineTaskResultInfo {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: needs a better and shorter name

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use RTOTaskResult instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would like to include Expected . Maybe ExpectedRTOSubtaskResult

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

have refactored var names.

* The <code>_segmentsTo</code> denotes the expected offline segemnts.
* The <code>_id</code> denotes the unique identifier of object.
* The <code>_taskID</code> denotes the minion taskId.
* The <code>_taskFailure</code> denotes the status of minion task handling the
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_taskStatus would be more apt.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

have refactored the code doc. We just only want to track whether task failed or not, Rest status are irrelevant here.

*/
public class RealtimeToOfflineSegmentsTaskMetadata extends BaseTaskMetadata {

private static final String WATERMARK_KEY = "watermarkMs";
private static final String WINDOW_START_KEY = "watermarkMs";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Value should be windowStartMs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For Backward compatibility the value should remain same. Like when we deploy, watermark should be picked what was previously set.

* when a prev minion task is failed.
*
*/
public class ExpectedRealtimeToOfflineTaskResultInfo {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use RTOTaskResult instead?

private long _windowStartMs;
private long _windowEndMs;
private final Map<String, ExpectedRealtimeToOfflineTaskResultInfo> _idVsExpectedRealtimeToOfflineTaskResultInfo;
private final Map<String, String> _segmentNameVsExpectedRealtimeToOfflineTaskResultInfoId;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename as _taskResultsMap instead? Consider simplifying the variable names throughtout.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Simplified var names.

_tableNameWithType = tableNameWithType;
_watermarkMs = watermarkMs;
_idVsExpectedRealtimeToOfflineTaskResultInfo = new HashMap<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Variable name of the form taskToResults is easier to follow than taskVsResults

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

renamed.

@@ -93,7 +93,7 @@ public class RealtimeToOfflineSegmentsTaskGenerator extends BaseTaskGenerator {

private static final String DEFAULT_BUCKET_PERIOD = "1d";
private static final String DEFAULT_BUFFER_PERIOD = "2d";
private static final int DEFAULT_MAX_NUM_RECORDS_PER_TASK = 50_000_000;
private static final int DEFAULT_MAX_NUM_RECORDS_PER_TASK = Integer.MAX_VALUE;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is important for case where this PR is deployed to controller first.

_taskFailure = taskFailure;
}

public String getTaskID() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this itself be a uniqueId instead of new _id field?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we want to separate out logic of setting _taskId and this _id. This _taskId is just needed for logging of failed minion task.

* when a prev minion task is failed.
*
*/
public class ExpectedSubtaskResult {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this applicable for other tasks. If not, I'd suggest moving this data structure to a RTO specific module.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually we have already kept TaskMetadata in pinot-common module. We will have to move both RTOTaskMetadata and this class to RTO specific module. Because pinot-common does not import RTO specific module i.e. pinot-minion-builtin-tasks.

* when a prev minion task is failed.
*
*/
public class ExpectedSubtaskResult {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we name this something like RealtimeToOfflineCheckpoint or something ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed

@@ -99,12 +107,10 @@ public void preProcess(PinotTaskConfig pinotTaskConfig) {
RealtimeToOfflineSegmentsTaskMetadata realtimeToOfflineSegmentsTaskMetadata =
RealtimeToOfflineSegmentsTaskMetadata.fromZNRecord(realtimeToOfflineSegmentsTaskZNRecord);
long windowStartMs = Long.parseLong(configs.get(RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY));
Preconditions.checkState(realtimeToOfflineSegmentsTaskMetadata.getWatermarkMs() <= windowStartMs,
Preconditions.checkState(realtimeToOfflineSegmentsTaskMetadata.getWindowStartMs() == windowStartMs,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice stricter check ! Please confirm this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes this is fine

@@ -156,6 +162,11 @@ protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig,
// Segment config
segmentProcessorConfigBuilder.setSegmentConfig(MergeTaskUtils.getSegmentConfig(configs));

// Since multiple subtasks run in parallel, there shouldn't be a name conflict.
// Append uuid
segmentProcessorConfigBuilder.setSegmentNameGenerator(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel this should be left to the user to setup the naming scheme and we pick a default/generic naming scheme, instead of using a specific one. Can we not use the default name generator in SegmentProcessorFramework. Isn't time based normalized name generator used?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmmm, Then we will have to add notes in documentations that this edge case can occur and user should keep uuid based segmentNameGenerator. I think user should not bother about this and Executor should handle this edge-case.

ExpectedSubtaskResult expectedSubtaskResult =
expectedSubtaskResultMap.get(id);
// if already marked as failure, no need to delete again.
if (expectedSubtaskResult.isTaskFailure()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this state ? Can we not use offline table as source of truth of whether a segment exists or not.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is needed and referenced in Executor.
Preconditions.checkState(prevExpectedSubtaskResult.isTaskFailure(), "ExpectedSubtaskResult can only be replaced if it's of a failed task");

Let's say There were 2 consecutive failures in RTO for a realtime segment. Executor will try to update the expectedSubtaskResult for a segment in the map only if existing entry is marked as failed.

private long _windowStartMs;
private long _windowEndMs;
private final Map<String, ExpectedSubtaskResult> _expectedSubtaskResultMap;
private final Map<String, String> _segmentNameToExpectedSubtaskResultID;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this Map? Can we not use the previous map itself to get the list of segmentsFrom and segmentsTo for a specific taskId? We can the construct this map in memory, from that right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need this map. consider edge case where there are more than one successive failures of subtask.
For a given RealtimeSegment we don't know which is the latest ExpectedSubtaskResult.

Copy link
Contributor Author

@noob-se7en noob-se7en Jan 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me see if I can refactor to simplify the readibility of the code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have refactored the code, pls re-check.

// we can clear the state of prev minion tasks as now it's useless.
if (!realtimeToOfflineSegmentsTaskMetadata.getSegmentNameToExpectedSubtaskResultID().
isEmpty()) {
realtimeToOfflineSegmentsTaskMetadata.getSegmentNameToExpectedSubtaskResultID().clear();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is idempotent and should be fine upon re-execution? (i.e if there's failure prior to updating this state in Zk)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes.
Since we reach here only when no prev minon failures are found, In the next generator run we will evaluate same and will reach here again.

Map<String, String> configs = MinionTaskUtils.getPushTaskConfig(realtimeTableName, taskConfigs,
_clusterInfoAccessor);
configs.putAll(getBaseTaskConfigs(tableConfig, segmentNameList));
configs.put(MinionConstants.DOWNLOAD_URL_KEY, StringUtils.join(downloadURLList, MinionConstants.URL_SEPARATOR));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the order of download url list match segmentNameList, so that minion correctly uses the download url ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes order matches.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unit tests are pending, which will validate all these things

skipGenerate = true;
break;

// Get all offline table segments.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add sufficient log.info (without flooding:)), that can help someone debug whats going on (specifically differentiating between happy and failure paths and when there's failure, what action took place/which segments had to be reprocessed or deleted...

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants