Skip to content

Commit

Permalink
Merge pull request #281 from peppy/latency-check-as-helper
Browse files Browse the repository at this point in the history
Split latency checker out into a helper class
  • Loading branch information
bdach authored Oct 21, 2024
2 parents 6bfa8e5 + e6c594c commit ae99a8f
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
using System.Threading.Tasks;
using Dapper;
using McMaster.Extensions.CommandLineUtils;
using MySqlConnector;
using osu.Game.Rulesets;
using osu.Server.QueueProcessor;
using osu.Server.Queues.ScoreStatisticsProcessor.Helpers;
Expand Down Expand Up @@ -66,20 +65,9 @@ public class ImportHighScoresCommand

private long lastCommitTimestamp;
private long startupTimestamp;
private long lastLatencyCheckTimestamp;

private ElasticQueuePusher? elasticQueueProcessor;

/// <summary>
/// The number of seconds between checks for slave latency.
/// </summary>
private const int seconds_between_latency_checks = 60;

/// <summary>
/// The latency a slave is allowed to fall behind before we start to panic.
/// </summary>
private const int maximum_slave_latency_seconds = 120;

private ulong maxProcessableId;
private ulong lastProcessedId;

Expand Down Expand Up @@ -113,7 +101,7 @@ public async Task<int> OnExecuteAsync(CancellationToken cancellationToken)
while (!cancellationToken.IsCancellationRequested)
{
if (CheckSlaveLatency)
await checkSlaveLatency(db, cancellationToken);
await SlaveLatencyChecker.CheckSlaveLatency(db, cancellationToken);

Console.WriteLine($"Fetching next scores from {lastProcessedId}...");
var highScores = await db.QueryAsync<HighScore>($"SELECT h.*, s.id as new_id, s.user_id as new_user_id FROM {highScoreTable} h "
Expand Down Expand Up @@ -269,33 +257,5 @@ private ulong getMaxProcessable(Ruleset ruleset)
}
}
}

private async Task checkSlaveLatency(MySqlConnection db, CancellationToken cancellationToken)
{
long currentTimestamp = DateTimeOffset.Now.ToUnixTimeSeconds();

if (currentTimestamp - lastLatencyCheckTimestamp < seconds_between_latency_checks)
return;

lastLatencyCheckTimestamp = DateTimeOffset.Now.ToUnixTimeSeconds();

int? latency;

do
{
// This latency is best-effort, and randomly queried from available hosts (with rough precedence of the importance of the host).
// When we detect a high latency value, a recovery period should be introduced where we are pretty sure that we're back in a good
// state before resuming operations.
latency = db.QueryFirstOrDefault<int?>("SELECT `count` FROM `osu_counts` WHERE NAME = 'slave_latency'");

if (latency == null || latency < maximum_slave_latency_seconds)
return;

Console.WriteLine($"Current slave latency of {latency} seconds exceeded maximum of {maximum_slave_latency_seconds} seconds.");
Console.WriteLine("Sleeping to allow catch-up.");

await Task.Delay(maximum_slave_latency_seconds * 1000, cancellationToken);
} while (latency > maximum_slave_latency_seconds);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright (c) ppy Pty Ltd <[email protected]>. Licensed under the MIT Licence.
// See the LICENCE file in the repository root for full licence text.

using System;
using System.Threading;
using System.Threading.Tasks;
using Dapper;
using MySqlConnector;

namespace osu.Server.Queues.ScoreStatisticsProcessor.Helpers
{
public static class SlaveLatencyChecker
{
/// <summary>
/// The number of seconds between checks for slave latency.
/// </summary>
private const int seconds_between_latency_checks = 60;

/// <summary>
/// The latency a slave is allowed to fall behind before we start to panic.
/// </summary>
private const int maximum_slave_latency_seconds = 120;

private static long lastLatencyCheckTimestamp;

public static async Task CheckSlaveLatency(MySqlConnection db, CancellationToken cancellationToken)
{
long currentTimestamp = DateTimeOffset.Now.ToUnixTimeSeconds();

if (currentTimestamp - lastLatencyCheckTimestamp < seconds_between_latency_checks)
return;

lastLatencyCheckTimestamp = DateTimeOffset.Now.ToUnixTimeSeconds();

int? latency;

do
{
// This latency is best-effort, and randomly queried from available hosts (with rough precedence of the importance of the host).
// When we detect a high latency value, a recovery period should be introduced where we are pretty sure that we're back in a good
// state before resuming operations.
latency = db.QueryFirstOrDefault<int?>("SELECT `count` FROM `osu_counts` WHERE NAME = 'slave_latency'");

if (latency == null || latency < maximum_slave_latency_seconds)
return;

Console.WriteLine($"Current slave latency of {latency} seconds exceeded maximum of {maximum_slave_latency_seconds} seconds.");
Console.WriteLine("Sleeping to allow catch-up.");

await Task.Delay(maximum_slave_latency_seconds * 1000, cancellationToken);
} while (latency > maximum_slave_latency_seconds);
}
}
}

0 comments on commit ae99a8f

Please sign in to comment.