Skip to content

Commit

Permalink
Merge branch 'master' into order-by-score-id
Browse files Browse the repository at this point in the history
  • Loading branch information
bdach authored Feb 1, 2024
2 parents 8a5edb2 + dab042e commit 7ca7a66
Showing 1 changed file with 68 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ public class VerifyImportedScoresCommand
[Option(CommandOptionType.SingleValue, Template = "--start-id")]
public ulong? StartId { get; set; }

/// <summary>
/// The ruleset to run this verify job for.
/// </summary>
[Option(CommandOptionType.SingleValue, Template = "--ruleset-id")]
public int RulesetId { get; set; }

/// <summary>
/// The number of scores to run in each batch. Setting this higher will cause larger SQL statements for insert.
/// </summary>
Expand All @@ -40,62 +46,75 @@ public class VerifyImportedScoresCommand

private ElasticQueuePusher? elasticQueueProcessor;

private int skipOutput;

public async Task<int> OnExecuteAsync(CancellationToken cancellationToken)
{
var rulesetSpecifics = LegacyDatabaseHelper.GetRulesetSpecifics(RulesetId);

ulong lastId = StartId ?? 0;
int deleted = 0;
int fail = 0;

using var conn = DatabaseAccess.GetConnection();

lastId = await conn.QuerySingleAsync<ulong?>(
"SELECT id FROM scores WHERE ruleset_id = @rulesetId AND legacy_score_id = (SELECT MIN(legacy_score_id) FROM scores WHERE ruleset_id = @rulesetId AND id >= @lastId AND legacy_score_id > 0)", new
{
lastId,
rulesetId = RulesetId,
}) ?? lastId;

Console.WriteLine();
Console.WriteLine($"Verifying scores starting from {lastId}");
Console.WriteLine($"Verifying scores starting from {lastId} for ruleset {RulesetId}");

elasticQueueProcessor = new ElasticQueuePusher();
Console.WriteLine($"Indexing to elasticsearch queue(s) {elasticQueueProcessor.ActiveQueues}");

if (DryRun)
Console.WriteLine("RUNNING IN DRY RUN MODE.");

using var conn = DatabaseAccess.GetConnection();

while (!cancellationToken.IsCancellationRequested)
{
HashSet<ElasticQueuePusher.ElasticScoreItem> elasticItems = new HashSet<ElasticQueuePusher.ElasticScoreItem>();

IEnumerable<ComparableScore> importedScores = await conn.QueryAsync<ComparableScore>(
IEnumerable<ComparableScore> importedScores = await conn.QueryAsync(
"SELECT `id`, "
+ "`ruleset_id`, "
+ "`legacy_score_id`, "
+ "`legacy_total_score`, "
+ "`total_score`, "
+ "`rank`, "
+ "`pp` "
+ "FROM scores "
+ "WHERE id >= @lastId AND legacy_score_id IS NOT NULL ORDER BY id LIMIT @batchSize", new
+ "s.`rank`, "
+ "s.`pp`, "
+ "h.* "
+ "FROM scores s "
+ $"LEFT JOIN {rulesetSpecifics.HighScoreTable} h ON (legacy_score_id = score_id)"
+ "WHERE id BETWEEN @lastId AND (@lastId + @batchSize - 1) AND legacy_score_id IS NOT NULL AND ruleset_id = @rulesetId ORDER BY id",
(ComparableScore score, HighScore highScore) =>
{
score.HighScore = highScore;
return score;
},
new
{
lastId,
rulesetId = RulesetId,
batchSize = BatchSize
});
}, splitOn: "score_id");

// gather high scores for each ruleset
foreach (var rulesetScores in importedScores.GroupBy(s => s.ruleset_id))
if (!importedScores.Any())
{
var rulesetSpecifics = LegacyDatabaseHelper.GetRulesetSpecifics(rulesetScores.Key);

var highScores = (await conn.QueryAsync<HighScore>(
$"SELECT * FROM {rulesetSpecifics.HighScoreTable} WHERE score_id IN ({string.Join(',', rulesetScores.Select(s => s.legacy_score_id))})"))
.ToDictionary(s => s.score_id, s => s);

foreach (var score in rulesetScores)
if (lastId > await conn.QuerySingleAsync<ulong>("SELECT MAX(id) FROM scores"))
{
if (highScores.TryGetValue(score.legacy_score_id!.Value, out var highScore))
score.HighScore = highScore;
Console.WriteLine("All done!");
break;
}
}

if (!importedScores.Any())
{
Console.WriteLine("All done!");
break;
lastId += (ulong)BatchSize;

if (++skipOutput % 100 == 0)
Console.WriteLine($"Skipped up to {lastId}...");
continue;
}

elasticItems.Clear();
Expand All @@ -121,23 +140,35 @@ public async Task<int> OnExecuteAsync(CancellationToken cancellationToken)

try
{
// Score was set via lazer, we have nothing to verify.
if (importedScore.legacy_score_id == null) continue;

// Score was deleted in legacy table.
//
// Importantly, `legacy_score_id` of 0 implies a non-high-score (which doesn't have a matching entry).
// We should leave these.
if (importedScore.HighScore == null)
{
Interlocked.Increment(ref deleted);
requiresIndexing = true;

if (!DryRun)
if (importedScore.legacy_score_id > 0)
{
await conn.ExecuteAsync("DELETE FROM scores WHERE id = @id", new
Interlocked.Increment(ref deleted);
requiresIndexing = true;

if (!DryRun)
{
id = importedScore.id
});
}
await conn.ExecuteAsync("DELETE FROM scores WHERE id = @id", new
{
id = importedScore.id
});
}

continue;
continue;
}
else
{
// Score was sourced from the osu_scores table, and we don't really care about verifying these.
continue;
}
}

if (DeleteOnly)
Expand Down Expand Up @@ -246,10 +277,10 @@ public async Task<int> OnExecuteAsync(CancellationToken cancellationToken)
Console.WriteLine($"Queued {elasticItems.Count} items for indexing");
}

lastId = importedScores.Max(s => s.id) + 1;

Console.SetCursorPosition(0, Console.GetCursorPosition().Top);
Console.Write($"Processed up to {lastId} ({deleted} deleted {fail} failed)");
Console.Write($"Processed up to {importedScores.Max(s => s.id)} ({deleted} deleted {fail} failed)");

lastId += (ulong)BatchSize;
}

return 0;
Expand Down

0 comments on commit 7ca7a66

Please sign in to comment.