Skip to content

Commit

Permalink
Merge pull request #12 from tibber/toni/resubscribe
Browse files Browse the repository at this point in the history
Fix resubscription for graphql-ws
  • Loading branch information
toini authored Oct 1, 2024
2 parents 7bc46c5 + 501d964 commit 2b17c1d
Showing 1 changed file with 42 additions and 16 deletions.
58 changes: 42 additions & 16 deletions src/Tibber.Sdk/RealTimeMeasurementListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -172,11 +172,18 @@ private void UnsubscribeObserver(HomeStreamObserverCollection collection, IObser
collection.Observers.Remove(observer);
}

private async Task ResubscribeStream(Guid homeId, int subscriptionId, CancellationToken cancellationToken)
{
await UnsubscribeStream(subscriptionId, cancellationToken);
await SubscribeStream(homeId, subscriptionId, cancellationToken);
}

private async Task SubscribeStream(Guid homeId, int subscriptionId, CancellationToken cancellationToken)
{
Trace.WriteLine($"subscribe to {homeId}");
Trace.WriteLine($"subscribe to home id {homeId} with subscription id {subscriptionId}");

await ExecuteStreamRequest(
//$@"{{""payload"":{{""query"":""subscription{{testMeasurement(count:2, complete:false){{timestamp,power,powerReactive,powerProduction,powerProductionReactive,accumulatedConsumption,accumulatedConsumptionLastHour,accumulatedProduction,accumulatedProductionLastHour,accumulatedCost,accumulatedReward,currency,minPower,averagePower,maxPower,minPowerProduction,maxPowerProduction,voltagePhase1,voltagePhase2,voltagePhase3,currentL1,currentL2,currentL3,lastMeterConsumption,lastMeterProduction,powerFactor,signalStrength}}}}"",""variables"":{{}},""extensions"":{{}}}},""type"":""subscribe"",""id"":""{subscriptionId}""}}",
$@"{{""payload"":{{""query"":""subscription{{liveMeasurement(homeId:\""{homeId}\""){{timestamp,power,powerReactive,powerProduction,powerProductionReactive,accumulatedConsumption,accumulatedConsumptionLastHour,accumulatedProduction,accumulatedProductionLastHour,accumulatedCost,accumulatedReward,currency,minPower,averagePower,maxPower,minPowerProduction,maxPowerProduction,voltagePhase1,voltagePhase2,voltagePhase3,currentL1,currentL2,currentL3,lastMeterConsumption,lastMeterProduction,powerFactor,signalStrength}}}}"",""variables"":{{}},""extensions"":{{}}}},""type"":""subscribe"",""id"":""{subscriptionId}""}}",
cancellationToken);

Expand All @@ -189,13 +196,14 @@ await ExecuteStreamRequest(
private async Task UnsubscribeStream(int subscriptionId, CancellationToken cancellationToken)
{
Trace.WriteLine($"unsubscribe subscription with id {subscriptionId}");
await ExecuteStreamRequest($@"{{""type"":""complete"",""id"":{subscriptionId}}}", cancellationToken);
await ExecuteStreamRequest($@"{{""type"":""complete"",""id"":""{subscriptionId}""}}", cancellationToken);
}

private Task ExecuteStreamRequest(string request, CancellationToken cancellationToken)
{
var stopSubscriptionRequest = new ArraySegment<byte>(Encoding.ASCII.GetBytes(request));
return _wssClient.SendAsync(stopSubscriptionRequest, WebSocketMessageType.Text, true, cancellationToken);
Trace.WriteLine($"send message; client state {_wssClient.State} {_wssClient.CloseStatus} {_wssClient.CloseStatusDescription} {request}");
var requestBytes = new ArraySegment<byte>(Encoding.ASCII.GetBytes(request));
return _wssClient.SendAsync(requestBytes, WebSocketMessageType.Text, true, cancellationToken);
}

private async Task Initialize(Uri websocketSubscriptionUrl, CancellationToken cancellationToken)
Expand All @@ -216,7 +224,7 @@ private async Task Initialize(Uri websocketSubscriptionUrl, CancellationToken ca

Trace.WriteLine("web socket connected");

var connectionInitMessage = new WebSocketConnectionInitMessage{ Payload = connectionInitPayload };
var connectionInitMessage = new WebSocketConnectionInitMessage { Payload = connectionInitPayload };
var json = JsonConvert.SerializeObject(connectionInitMessage, TibberApiClient.JsonSerializerSettings);
var init = new ArraySegment<byte>(Encoding.UTF8.GetBytes(json));

Expand Down Expand Up @@ -253,6 +261,7 @@ private async void StartListening()

do
{
Trace.WriteLine($"receive message; client state {_wssClient.State} {_wssClient.CloseStatus} {_wssClient.CloseStatusDescription}");
result = await _wssClient.ReceiveAsync(_receiveBuffer, _cancellationTokenSource.Token);
var json = Encoding.ASCII.GetString(_receiveBuffer.Array, 0, result.Count);
stringBuilder.Append(json);
Expand Down Expand Up @@ -285,7 +294,7 @@ private async void StartListening()
if (!_cancellationTokenSource.IsCancellationRequested)
{
Trace.WriteLine("connection re-established; re-initialize data streams");
SubscribeStreams(c => true);
ResubscribeStreams(c => true);
continue;
}
}
Expand Down Expand Up @@ -315,6 +324,7 @@ private async void StartListening()
continue;

homeStreamObserverCollection.LastMessageReceivedAt = DateTimeOffset.UtcNow;
homeStreamObserverCollection.ReconnectionAttempts = 0;

foreach (var message in measurementGroup)
{
Expand Down Expand Up @@ -364,13 +374,13 @@ private async void StartListening()
} while (!_cancellationTokenSource.IsCancellationRequested);
}

private void SubscribeStreams(Func<HomeStreamObserverCollection, bool> predicate)
private void ResubscribeStreams(Func<HomeStreamObserverCollection, bool> predicate)
{
lock (_homeObservables)
{
var subscriptionTask = (Task)Task.FromResult(0);
foreach (var collection in _homeObservables.Values.Where(predicate))
subscriptionTask = subscriptionTask.ContinueWith(_ => SubscribeStream(collection.Observable.HomeId, collection.Observable.SubscriptionId, _cancellationTokenSource.Token));
subscriptionTask = subscriptionTask.ContinueWith(_ => ResubscribeStream(collection.Observable.HomeId, collection.Observable.SubscriptionId, _cancellationTokenSource.Token));
}
}

Expand Down Expand Up @@ -441,9 +451,9 @@ private async Task TryReconnect()
{
try
{
var delay = GetDelaySeconds(failures);
Trace.WriteLine($"retrying to connect in {delay} seconds");
await Task.Delay(TimeSpan.FromSeconds(delay), _cancellationTokenSource.Token);
var delay = GetDelay(failures);
Trace.WriteLine($"retrying to connect in {delay.TotalSeconds} seconds");
await Task.Delay(delay, _cancellationTokenSource.Token);

Trace.WriteLine("check there is a valid real time device");
var homes = await _tibberApiClient.ValidateRealtimeDevice();
Expand All @@ -464,20 +474,31 @@ private void CheckDataStreamAlive(object state)
{
var now = DateTimeOffset.UtcNow;

SubscribeStreams(
ResubscribeStreams(
c =>
{
var sinceLastMessageMs = (now - c.LastMessageReceivedAt).TotalMilliseconds;
if (sinceLastMessageMs <= StreamReSubscriptionCheckPeriodMs)
return false;

Trace.WriteLine($"home {c.Observable.HomeId} subscription {c.Observable.SubscriptionId}: no data received during last {sinceLastMessageMs:N0} ms; re-initialize data stream");
c.LastMessageReceivedAt = now;
// Data not received during past minute; delay exponentially and then resubscribe
var sinceLastReconnectionMs = (now - c.LastReconnectionAttemptAt).TotalMilliseconds;
var delay = GetDelay(c.ReconnectionAttempts);
if (sinceLastReconnectionMs <= delay.TotalMilliseconds)
{
Trace.WriteLine($"{now:yyyy-MM-dd HH:mm:ss.fff zzz} home {c.Observable.HomeId} subscription {c.Observable.SubscriptionId}: no data received during last {sinceLastMessageMs:N0} ms; reconnection attempts {c.ReconnectionAttempts}; resubscription delay {delay.TotalSeconds}s not passed yet");
return false;
}

Trace.WriteLine($"{now:yyyy-MM-dd HH:mm:ss.fff zzz} home {c.Observable.HomeId} subscription {c.Observable.SubscriptionId}: no data received during last {sinceLastMessageMs:N0} ms; reconnection attempts {c.ReconnectionAttempts}; re-initialize data stream");
c.ReconnectionAttempts++;
c.LastReconnectionAttemptAt = now;

return true;
});
}

private static int GetDelaySeconds(int failures)
private static TimeSpan GetDelay(int failures)
{
// Jitter of 5 to 60 seconds
var jitter = Random.Next(5, 60);
Expand All @@ -487,7 +508,7 @@ private static int GetDelaySeconds(int failures)

// Max one day 60 * 60 * 24
const double oneDayInSeconds = (double)60 * 60 * 24;
return jitter + (int)Math.Min(delay, oneDayInSeconds);
return TimeSpan.FromSeconds(jitter + (int)Math.Min(delay, oneDayInSeconds));
}

private class WebSocketConnectionInitMessage
Expand Down Expand Up @@ -517,13 +538,18 @@ private class WebSocketData
{
[JsonProperty("liveMeasurement")]
public RealTimeMeasurement RealTimeMeasurement { get; set; }

[JsonProperty("testMeasurement")]
public RealTimeMeasurement TestMeasurement { set { RealTimeMeasurement = value; } }
}

private class HomeStreamObserverCollection
{
public readonly List<IObserver<RealTimeMeasurement>> Observers = new();
public HomeRealTimeMeasurementObservable Observable;
public DateTimeOffset LastMessageReceivedAt = DateTimeOffset.MaxValue;
public DateTimeOffset LastReconnectionAttemptAt = DateTimeOffset.MinValue;
public int ReconnectionAttempts = 0;
}

private class Unsubscriber : IDisposable
Expand Down

0 comments on commit 2b17c1d

Please sign in to comment.