Skip to content

Commit

Permalink
Merge pull request #168 from peppy/use-helper-method
Browse files Browse the repository at this point in the history
  • Loading branch information
smoogipoo authored Nov 14, 2023
2 parents ecbd1f5 + b5fda07 commit 2fae652
Show file tree
Hide file tree
Showing 17 changed files with 59 additions and 51 deletions.
10 changes: 0 additions & 10 deletions osu.Server.Queues.ScoreStatisticsProcessor/Commands/BaseCommand.cs

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@
using Dapper;
using McMaster.Extensions.CommandLineUtils;
using MySqlConnector;
using osu.Server.QueueProcessor;
using osu.Server.Queues.ScoreStatisticsProcessor.Models;

namespace osu.Server.Queues.ScoreStatisticsProcessor.Commands.Maintenance
{
[Command("cleanup", Description = "Delete non-preserved scores which are stale enough.")]
public class DeleteNonPreservedScoresCommand : BaseCommand
public class DeleteNonPreservedScoresCommand
{
/// <summary>
/// How many hours non-preserved scores should be retained before being purged.
Expand All @@ -21,8 +22,8 @@ public class DeleteNonPreservedScoresCommand : BaseCommand

public async Task<int> OnExecuteAsync(CancellationToken cancellationToken)
{
using (var readConnection = Queue.GetDatabaseConnection())
using (var deleteConnection = Queue.GetDatabaseConnection())
using (var readConnection = DatabaseAccess.GetConnection())
using (var deleteConnection = DatabaseAccess.GetConnection())
using (var deleteCommand = deleteConnection.CreateCommand())
{
deleteCommand.CommandText =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@
using Dapper;
using McMaster.Extensions.CommandLineUtils;
using MySqlConnector;
using osu.Server.QueueProcessor;
using osu.Server.Queues.ScoreStatisticsProcessor.Helpers;
using osu.Server.Queues.ScoreStatisticsProcessor.Models;

namespace osu.Server.Queues.ScoreStatisticsProcessor.Commands.Maintenance
{
[Command("mark-non-preserved", Description = "Mark any scores which no longer need to be preserved.")]
public class MarkNonPreservedScoresCommand : BaseCommand
public class MarkNonPreservedScoresCommand
{
private readonly ElasticQueueProcessor elasticQueueProcessor = new ElasticQueueProcessor();

Expand All @@ -28,7 +29,7 @@ public async Task<int> OnExecuteAsync(CancellationToken cancellationToken)

Console.WriteLine($"Running for ruleset {RulesetId}");

using (var db = Queue.GetDatabaseConnection())
using (var db = DatabaseAccess.GetConnection())
{
Console.WriteLine("Fetching all users...");
int[] userIds = (await db.QueryAsync<int>($"SELECT `user_id` FROM {databaseInfo.UserStatsTable}")).ToArray();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@
using osu.Game.Rulesets;
using osu.Game.Rulesets.Scoring;
using osu.Game.Scoring;
using osu.Server.QueueProcessor;
using osu.Server.Queues.ScoreStatisticsProcessor.Helpers;

namespace osu.Server.Queues.ScoreStatisticsProcessor.Commands.Maintenance
{
[Command("migrate-playlist-scores", Description = "Migrate scores from `multiplayer_scores` to `solo_scores`.")]
public class MigratePlaylistScoresToSoloScoresCommand : BaseCommand
public class MigratePlaylistScoresToSoloScoresCommand
{
/// <summary>
/// The playlist room ID to reprocess.
Expand All @@ -32,7 +33,7 @@ public class MigratePlaylistScoresToSoloScoresCommand : BaseCommand

public async Task<int> OnExecuteAsync(CancellationToken cancellationToken)
{
using var db = Queue.GetDatabaseConnection();
using var db = DatabaseAccess.GetConnection();

int[] playlistIds;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@
using osu.Game.Rulesets;
using osu.Game.Rulesets.Scoring;
using osu.Game.Scoring;
using osu.Server.QueueProcessor;
using osu.Server.Queues.ScoreStatisticsProcessor.Helpers;

// ReSharper disable InconsistentNaming
namespace osu.Server.Queues.ScoreStatisticsProcessor.Commands.Maintenance
{
[Command("recalculate-playlist-scores", Description = "Process all scores in a specific playlist, recalculating and writing any changes.")]
public class RecalculatePlaylistTotalScoresCommand : BaseCommand
public class RecalculatePlaylistTotalScoresCommand
{
/// <summary>
/// The playlist room ID to reprocess.
Expand All @@ -35,7 +36,7 @@ public async Task<int> OnExecuteAsync(CancellationToken cancellationToken)
{
foreach (string id in PlaylistIds.Split(','))
{
using (var db = Queue.GetDatabaseConnection())
using (var db = DatabaseAccess.GetConnection())
{
var playlistItems = await db.QueryAsync<MultiplayerPlaylistItem>("SELECT * FROM multiplayer_playlist_items WHERE room_id = @PlaylistId", new
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@
using System.Threading;
using System.Threading.Tasks;
using McMaster.Extensions.CommandLineUtils;
using osu.Server.QueueProcessor;
using osu.Server.Queues.ScoreStatisticsProcessor.Helpers;
using osu.Server.Queues.ScoreStatisticsProcessor.Processors;

namespace osu.Server.Queues.ScoreStatisticsProcessor.Commands.Performance
{
public abstract class PerformanceCommand : BaseCommand
public abstract class PerformanceCommand
{
protected ScorePerformanceProcessor ScoreProcessor { get; private set; } = null!;
protected UserTotalPerformanceProcessor TotalProcessor { get; private set; } = null!;
Expand Down Expand Up @@ -61,7 +62,7 @@ protected async Task ProcessUserTotals(int[] userIds, CancellationToken cancella

await ProcessPartitioned(userIds, async userId =>
{
using (var db = Queue.GetDatabaseConnection())
using (var db = DatabaseAccess.GetConnection())
{
var userStats = await DatabaseHelper.GetUserStatsAsync(userId, RulesetId, db);

Expand Down Expand Up @@ -90,7 +91,7 @@ protected async Task ProcessUserScores(int[] userIds, CancellationToken cancella

await ProcessPartitioned(userIds, async userId =>
{
using (var db = Queue.GetDatabaseConnection())
using (var db = DatabaseAccess.GetConnection())
await ScoreProcessor.ProcessUserScoresAsync(userId, RulesetId, db);
Console.WriteLine($"Processed {Interlocked.Increment(ref processedCount)} of {userIds.Length}");
}, cancellationToken);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.Threading.Tasks;
using Dapper;
using McMaster.Extensions.CommandLineUtils;
using osu.Server.QueueProcessor;
using osu.Server.Queues.ScoreStatisticsProcessor.Helpers;

namespace osu.Server.Queues.ScoreStatisticsProcessor.Commands.Performance.Scores
Expand All @@ -25,7 +26,7 @@ protected override async Task<int> ExecuteAsync(CancellationToken cancellationTo

long currentUserId;

using (var db = Queue.GetDatabaseConnection())
using (var db = DatabaseAccess.GetConnection())
{
if (Continue)
currentUserId = await DatabaseHelper.GetCountAsync(databaseInfo.LastProcessedPpUserCount, db);
Expand All @@ -38,7 +39,7 @@ protected override async Task<int> ExecuteAsync(CancellationToken cancellationTo

int? totalCount;

using (var db = Queue.GetDatabaseConnection())
using (var db = DatabaseAccess.GetConnection())
{
totalCount = await db.QuerySingleAsync<int?>($"SELECT COUNT(`user_id`) FROM {databaseInfo.UserStatsTable} WHERE `user_id` >= @UserId", new
{
Expand All @@ -57,7 +58,7 @@ protected override async Task<int> ExecuteAsync(CancellationToken cancellationTo
{
int[] userIds;

using (var db = Queue.GetDatabaseConnection())
using (var db = DatabaseAccess.GetConnection())
{
userIds = (await db.QueryAsync<int>($"SELECT `user_id` FROM {databaseInfo.UserStatsTable} WHERE `user_id` > @UserId ORDER BY `user_id` LIMIT @Limit", new
{
Expand All @@ -71,14 +72,14 @@ protected override async Task<int> ExecuteAsync(CancellationToken cancellationTo

await ProcessPartitioned(userIds, async userId =>
{
using (var db = Queue.GetDatabaseConnection())
using (var db = DatabaseAccess.GetConnection())
await ScoreProcessor.ProcessUserScoresAsync(userId, RulesetId, db);
Console.WriteLine($"Processed {Interlocked.Increment(ref processedCount)} of {totalCount}");
}, cancellationToken);

currentUserId = userIds.Max();

using (var db = Queue.GetDatabaseConnection())
using (var db = DatabaseAccess.GetConnection())
await DatabaseHelper.SetCountAsync(databaseInfo.LastProcessedPpUserCount, currentUserId, db);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.Threading.Tasks;
using JetBrains.Annotations;
using McMaster.Extensions.CommandLineUtils;
using osu.Server.QueueProcessor;

namespace osu.Server.Queues.ScoreStatisticsProcessor.Commands.Performance.Scores
{
Expand All @@ -28,7 +29,7 @@ protected override async Task<int> ExecuteAsync(CancellationToken cancellationTo

await ProcessPartitioned(scoreIds, async id =>
{
using (var db = Queue.GetDatabaseConnection())
using (var db = DatabaseAccess.GetConnection())
await ScoreProcessor.ProcessScoreAsync(id, db);
Console.WriteLine($"Processed {Interlocked.Increment(ref processedCount)} of {scoreIds.Length}");
}, cancellationToken);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using Dapper;
using JetBrains.Annotations;
using McMaster.Extensions.CommandLineUtils;
using osu.Server.QueueProcessor;

namespace osu.Server.Queues.ScoreStatisticsProcessor.Commands.Performance.Scores
{
Expand All @@ -23,7 +24,7 @@ protected override async Task<int> ExecuteAsync(CancellationToken cancellationTo
{
int[] userIds;

using (var db = Queue.GetDatabaseConnection())
using (var db = DatabaseAccess.GetConnection())
userIds = (await db.QueryAsync<int>(Statement)).ToArray();

await ProcessUserScores(userIds, cancellationToken);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.Threading.Tasks;
using Dapper;
using McMaster.Extensions.CommandLineUtils;
using osu.Server.QueueProcessor;
using osu.Server.Queues.ScoreStatisticsProcessor.Helpers;

namespace osu.Server.Queues.ScoreStatisticsProcessor.Commands.Performance.UserTotals
Expand All @@ -22,7 +23,7 @@ protected override async Task<int> ExecuteAsync(CancellationToken cancellationTo

Console.WriteLine("Fetching all users...");

using (var db = Queue.GetDatabaseConnection())
using (var db = DatabaseAccess.GetConnection())
userIds = (await db.QueryAsync<int>($"SELECT `user_id` FROM {databaseInfo.UserStatsTable}")).ToArray();

Console.WriteLine($"Fetched {userIds.Length} users");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@
namespace osu.Server.Queues.ScoreStatisticsProcessor.Commands.Queue
{
[Command("clear", Description = "Completely empties the processing queue")]
public class ClearQueueCommand : BaseCommand
public class ClearQueueCommand
{
private readonly ScoreStatisticsQueueProcessor queue = new ScoreStatisticsQueueProcessor();

public Task<int> OnExecuteAsync(CancellationToken cancellationToken)
{
Queue.ClearQueue();
queue.ClearQueue();
Console.WriteLine("Queue has been cleared!");
return Task.FromResult(0);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System.Threading.Tasks;
using Dapper;
using McMaster.Extensions.CommandLineUtils;
using osu.Server.QueueProcessor;
using osu.Server.Queues.ScoreStatisticsProcessor.Models;

namespace osu.Server.Queues.ScoreStatisticsProcessor.Commands.Queue
Expand All @@ -31,7 +32,7 @@ namespace osu.Server.Queues.ScoreStatisticsProcessor.Commands.Queue
/// non-imported (lazer-first) scores anyway...
/// </remarks>
[Command("delete-high-scores", Description = "Deletes already-imported high scores from the solo_scores table.")]
public class DeleteImportedHighScoresCommand : BaseCommand
public class DeleteImportedHighScoresCommand
{
/// <summary>
/// The high score ID to start deleting imported high scores from.
Expand All @@ -54,7 +55,7 @@ public async Task<int> OnExecuteAsync(CancellationToken cancellationToken)

Thread.Sleep(5000);

using (var conn = Queue.GetDatabaseConnection())
using (var conn = DatabaseAccess.GetConnection())
{
while (!cancellationToken.IsCancellationRequested)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
using osu.Game.Rulesets.Scoring.Legacy;
using osu.Game.Scoring;
using osu.Game.Scoring.Legacy;
using osu.Server.QueueProcessor;
using osu.Server.Queues.ScoreStatisticsProcessor.Helpers;
using osu.Server.Queues.ScoreStatisticsProcessor.Models;

Expand All @@ -38,7 +39,7 @@ namespace osu.Server.Queues.ScoreStatisticsProcessor.Commands.Queue
/// which can be used for tie-breaker scenarios.
/// </remarks>
[Command("import-high-scores", Description = "Imports high scores from the osu_scores_high tables into the new solo_scores table.")]
public class ImportHighScoresCommand : BaseCommand
public class ImportHighScoresCommand
{
/// <summary>
/// The ruleset to run this import job for.
Expand Down Expand Up @@ -141,7 +142,7 @@ public async Task<int> OnExecuteAsync(CancellationToken cancellationToken)
lastId = StartId.Value;
else
{
using (var db = Queue.GetDatabaseConnection())
using (var db = DatabaseAccess.GetConnection())
lastId = db.QuerySingleOrDefault<ulong?>($"SELECT MAX(old_score_id) FROM solo_scores_legacy_id_map WHERE ruleset_id = {RulesetId}") ?? 0;

Console.WriteLine($"StartId not provided, using last legacy ID map entry ({lastId})");
Expand All @@ -159,7 +160,7 @@ public async Task<int> OnExecuteAsync(CancellationToken cancellationToken)

Thread.Sleep(5000);

using (var dbMainQuery = Queue.GetDatabaseConnection())
using (var dbMainQuery = DatabaseAccess.GetConnection())
{
while (!cancellationToken.IsCancellationRequested)
{
Expand Down Expand Up @@ -276,7 +277,7 @@ void queueNextBatch()
if (batch.Count == 0)
return;

runningBatches.Add(new BatchInserter(ruleset, () => Queue.GetDatabaseConnection(), batch.ToArray(), SkipExisting, SkipNew));
runningBatches.Add(new BatchInserter(ruleset, batch.ToArray(), SkipExisting, SkipNew));
batch.Clear();
}
}
Expand Down Expand Up @@ -353,7 +354,6 @@ private void checkSlaveLatency(MySqlConnection db)
private class BatchInserter
{
private readonly Ruleset ruleset;
private readonly Func<MySqlConnection> getConnection;
private readonly bool skipExisting;
private readonly bool skipNew;

Expand All @@ -363,10 +363,9 @@ private class BatchInserter

public List<long> IndexableSoloScoreIDs { get; } = new List<long>();

public BatchInserter(Ruleset ruleset, Func<MySqlConnection> getConnection, HighScore[] scores, bool skipExisting, bool skipNew)
public BatchInserter(Ruleset ruleset, HighScore[] scores, bool skipExisting, bool skipNew)
{
this.ruleset = ruleset;
this.getConnection = getConnection;
this.skipExisting = skipExisting;
this.skipNew = skipNew;

Expand All @@ -376,7 +375,7 @@ public BatchInserter(Ruleset ruleset, Func<MySqlConnection> getConnection, HighS

public async Task Run(HighScore[] scores)
{
using (var db = getConnection())
using (var db = DatabaseAccess.GetConnection())
using (var transaction = await db.BeginTransactionAsync())
using (var insertCommand = db.CreateCommand())
using (var updateCommand = db.CreateCommand())
Expand Down
Loading

0 comments on commit 2fae652

Please sign in to comment.