Skip to content

Commit

Permalink
Merge pull request #284 from peppy/pp-2024
Browse files Browse the repository at this point in the history
Score processing command updates for upcoming (and future PP deploys)
  • Loading branch information
smoogipoo authored Oct 21, 2024
2 parents 69b9c80 + 4fdf3a9 commit cd9e5ac
Show file tree
Hide file tree
Showing 6 changed files with 242 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ protected async Task ProcessUserScores(uint[] userIds, CancellationToken cancell
await ProcessPartitioned(userIds, async userId =>
{
using (var db = DatabaseAccess.GetConnection())
await ScoreProcessor.ProcessUserScoresAsync(userId, RulesetId, db);
await ScoreProcessor.ProcessUserScoresAsync(userId, RulesetId, db, cancellationToken: cancellationToken);

Console.WriteLine($"Processed {Interlocked.Increment(ref processedCount)} of {userIds.Length}");
}, cancellationToken);
Expand All @@ -103,7 +103,6 @@ protected async Task ProcessPartitioned<T>(IEnumerable<T> values, Func<T, Task>
await Task.WhenAll(Partitioner
.Create(values)
.GetPartitions(Threads)
.AsParallel()
.Select(processPartition));

async Task processPartition(IEnumerator<T> partition)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
// See the LICENCE file in the repository root for full licence text.

using System;
using System.Data;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -12,76 +14,92 @@

namespace osu.Server.Queues.ScoreStatisticsProcessor.Commands.Performance.Scores
{
[Command(Name = "all", Description = "Computes pp of all scores from all users.")]
[Command(Name = "all-users", Description = "Computes pp of all scores from all users.")]
public class UpdateAllScoresByUserCommand : PerformanceCommand
{
private const int max_users_per_query = 10000;
private const int max_users_per_query = 1000;

[Option(Description = "Continue where a previously aborted 'all' run left off.")]
public bool Continue { get; set; }

/// <summary>
/// Whether to adjust processing rate based on slave latency. Defaults to <c>false</c>.
/// </summary>
[Option(CommandOptionType.SingleOrNoValue, Template = "--check-slave-latency")]
public bool CheckSlaveLatency { get; set; }

protected override async Task<int> ExecuteAsync(CancellationToken cancellationToken)
{
using var db = DatabaseAccess.GetConnection();

LegacyDatabaseHelper.RulesetDatabaseInfo databaseInfo = LegacyDatabaseHelper.GetRulesetSpecifics(RulesetId);

long currentUserId;

using (var db = DatabaseAccess.GetConnection())
if (Continue)
currentUserId = await DatabaseHelper.GetCountAsync(databaseInfo.LastProcessedPpUserCount, db);
else
{
if (Continue)
currentUserId = await DatabaseHelper.GetCountAsync(databaseInfo.LastProcessedPpUserCount, db);
else
{
currentUserId = 0;
await DatabaseHelper.SetCountAsync(databaseInfo.LastProcessedPpUserCount, 0, db);
}
currentUserId = 0;
await DatabaseHelper.SetCountAsync(databaseInfo.LastProcessedPpUserCount, 0, db);
}

int? totalCount;
ulong totalScores = 0;
double rate = 0;
Stopwatch sw = new Stopwatch();

using (var db = DatabaseAccess.GetConnection())
Console.WriteLine("Fetching users all users...");

ulong? totalUsers = await db.QuerySingleAsync<ulong?>($"SELECT COUNT(`user_id`) FROM {databaseInfo.UserStatsTable} WHERE `user_id` >= @UserId", new
{
totalCount = await db.QuerySingleAsync<int?>($"SELECT COUNT(`user_id`) FROM {databaseInfo.UserStatsTable} WHERE `user_id` >= @UserId", new
{
UserId = currentUserId
});
UserId = currentUserId
}, commandTimeout: 300000);

if (totalCount == null)
throw new InvalidOperationException("Could not find user ID count.");
}
if (totalUsers == null)
throw new InvalidOperationException("Could not find user ID count.");

Console.WriteLine($"Processing all users starting from UserID {currentUserId}");
Console.WriteLine($"Processing all {totalUsers:N0} users starting from UserID {currentUserId:N0}");

int processedCount = 0;
int processedUsers = 0;

while (!cancellationToken.IsCancellationRequested)
{
uint[] userIds;
if (CheckSlaveLatency)
await SlaveLatencyChecker.CheckSlaveLatency(db, cancellationToken);

using (var db = DatabaseAccess.GetConnection())
sw.Restart();

uint[] userIds = (await db.QueryAsync<uint>($"SELECT `user_id` FROM {databaseInfo.UserStatsTable} WHERE `user_id` > @UserId ORDER BY `user_id` LIMIT @Limit", new
{
userIds = (await db.QueryAsync<uint>($"SELECT `user_id` FROM {databaseInfo.UserStatsTable} WHERE `user_id` > @UserId ORDER BY `user_id` LIMIT @Limit", new
{
UserId = currentUserId,
Limit = max_users_per_query
})).ToArray();
}
UserId = currentUserId,
Limit = max_users_per_query
})).ToArray();

if (userIds.Length == 0)
break;

await ProcessPartitioned(userIds, async userId =>
{
using (var db = DatabaseAccess.GetConnection())
await ScoreProcessor.ProcessUserScoresAsync(userId, RulesetId, db);

Console.WriteLine($"Processed {Interlocked.Increment(ref processedCount)} of {totalCount}");
using (var partitionDb = DatabaseAccess.GetConnection())
using (var transaction = await partitionDb.BeginTransactionAsync(IsolationLevel.ReadUncommitted, cancellationToken))
{
Interlocked.Add(ref totalScores, (ulong)await ScoreProcessor.ProcessUserScoresAsync(userId, RulesetId, db, transaction, cancellationToken));
await transaction.CommitAsync(cancellationToken);
}
}, cancellationToken);

currentUserId = userIds.Max();
processedUsers += userIds.Length;

if (rate == 0)
rate = ((double)userIds.Length / sw.ElapsedMilliseconds * 1000);
else
rate = rate * 0.95 + 0.05 * ((double)userIds.Length / sw.ElapsedMilliseconds * 1000);

Console.WriteLine(ScoreProcessor.BeatmapStore?.GetCacheStats());
Console.WriteLine($"id: {currentUserId:N0} changed scores: {totalScores:N0} ({processedUsers:N0} of {totalUsers:N0} {(float)processedUsers / totalUsers:P1}) {rate:N0}/s");

using (var db = DatabaseAccess.GetConnection())
await DatabaseHelper.SetCountAsync(databaseInfo.LastProcessedPpUserCount, currentUserId, db);
await DatabaseHelper.SetCountAsync(databaseInfo.LastProcessedPpUserCount, currentUserId, db);
}

return 0;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Copyright (c) ppy Pty Ltd <[email protected]>. Licensed under the MIT Licence.
// See the LICENCE file in the repository root for full licence text.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Data;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Dapper;
using McMaster.Extensions.CommandLineUtils;
using osu.Server.QueueProcessor;
using osu.Server.Queues.ScoreStatisticsProcessor.Helpers;
using osu.Server.Queues.ScoreStatisticsProcessor.Models;

namespace osu.Server.Queues.ScoreStatisticsProcessor.Commands.Performance.Scores
{
[Command(Name = "all", Description = "Computes pp of all scores from all users.")]
public class UpdateAllScoresCommand : PerformanceCommand
{
private const int max_scores_per_query = 5000;

[Option(Description = "Score ID to start processing from.")]
public ulong From { get; set; }

/// <summary>
/// Whether to adjust processing rate based on slave latency. Defaults to <c>false</c>.
/// </summary>
[Option(CommandOptionType.SingleOrNoValue, Template = "--check-slave-latency")]
public bool CheckSlaveLatency { get; set; }

protected override async Task<int> ExecuteAsync(CancellationToken cancellationToken)
{
using var db = DatabaseAccess.GetConnection();

ulong currentScoreId = From;
ulong? lastScoreId = await db.QuerySingleAsync<ulong>("SELECT MAX(id) FROM scores");

ulong processedCount = 0;
ulong changedPp = 0;
double rate = 0;
Stopwatch sw = new Stopwatch();

var scoresQuery = db.Query<SoloScore>("SELECT * FROM scores WHERE `id` > @ScoreId AND `id` <= @LastScoreId ORDER BY `id`", new
{
ScoreId = currentScoreId,
LastScoreId = lastScoreId,
}, buffered: false);

using var scoresEnum = scoresQuery.GetEnumerator();

Console.WriteLine($"Processing all scores up to {lastScoreId}, starting from {currentScoreId}");

Task<List<SoloScore>> nextScores = getNextScores();

while (!cancellationToken.IsCancellationRequested)
{
if (CheckSlaveLatency)
await SlaveLatencyChecker.CheckSlaveLatency(db, cancellationToken);

sw.Restart();

var scores = await nextScores;
nextScores = getNextScores();

if (scores.Count == 0)
break;

await Task.WhenAll(Partitioner.Create(scores).GetPartitions(Threads).Select(async partition =>
{
using (var connection = DatabaseAccess.GetConnection())
using (var transaction = await connection.BeginTransactionAsync(IsolationLevel.ReadUncommitted, cancellationToken))
using (partition)
{
while (partition.MoveNext())
{
if (cancellationToken.IsCancellationRequested)
return;

bool changed = await ScoreProcessor.ProcessScoreAsync(partition.Current, connection, transaction);

if (changed)
Interlocked.Increment(ref changedPp);
}

await transaction.CommitAsync(cancellationToken);
}
}));

if (cancellationToken.IsCancellationRequested)
return -1;

Interlocked.Add(ref processedCount, (ulong)scores.Count);

currentScoreId = scores.Last().id;

if (rate == 0)
rate = ((double)scores.Count / sw.ElapsedMilliseconds * 1000);
else
rate = rate * 0.95 + 0.05 * ((double)scores.Count / sw.ElapsedMilliseconds * 1000);

Console.WriteLine(ScoreProcessor.BeatmapStore?.GetCacheStats());
Console.WriteLine($"processed up to: {currentScoreId:N0} changed: {changedPp:N0} {(float)processedCount / (lastScoreId - From):P1} {rate:N0}/s");
}

return 0;

Task<List<SoloScore>> getNextScores() => Task.Run(() =>
{
List<SoloScore> scores = new List<SoloScore>(max_scores_per_query);

for (int i = 0; i < max_scores_per_query && scoresEnum.MoveNext(); i++)
scores.Add(scoresEnum.Current);

return scores;
}, cancellationToken);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
namespace osu.Server.Queues.ScoreStatisticsProcessor.Commands.Performance
{
[Command("scores", Description = "Updates individual score PP values.")]
[Subcommand(typeof(UpdateAllScoresCommand))]
[Subcommand(typeof(UpdateAllScoresByUserCommand))]
[Subcommand(typeof(UpdateScoresFromListCommand))]
[Subcommand(typeof(UpdateScoresFromSqlCommand))]
Expand Down
Loading

0 comments on commit cd9e5ac

Please sign in to comment.