Skip to content

Commit

Permalink
fix: add revoke event to delete all current subscriptions & disable t…
Browse files Browse the repository at this point in the history
…he user
  • Loading branch information
Geczy authored Jul 2, 2024
1 parent 499ea28 commit 66125bb
Show file tree
Hide file tree
Showing 2 changed files with 176 additions and 0 deletions.
2 changes: 2 additions & 0 deletions packages/twitch-events/src/SubscribeEvents.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { transformBetData } from './twitch/events/transformers/transformBetData.
import { transformPollData } from './twitch/events/transformers/transformPollData.js'
import { offlineEvent } from './twitch/lib/offlineEvent.js'
import { onlineEvent } from './twitch/lib/onlineEvent.js'
import { revokeEvent } from './twitch/lib/revokeEvent.js'
import { updateUserEvent } from './twitch/lib/updateUserEvent.js'
import { DOTABOD_EVENTS_ROOM, eventsIOConnected, socketIo } from './utils/socketUtils.js'

Expand All @@ -17,6 +18,7 @@ export const SubscribeEvents = (accountIds: string[]) => {
const promises: any[] = []
accountIds.forEach((userId) => {
try {
promises.push(listener.onUserAuthorizationRevoke(userId, revokeEvent))
promises.push(listener.onStreamOnline(userId, onlineEvent))
promises.push(listener.onStreamOffline(userId, offlineEvent))
promises.push(listener.onUserUpdate(userId, updateUserEvent))
Expand Down
174 changes: 174 additions & 0 deletions packages/twitch-events/src/twitch/lib/revokeEvent.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
import { getAppToken } from '@twurple/auth'
import type { EventSubUserAuthorizationRevokeEvent } from '@twurple/eventsub-base'
import supabase from '../../db/supabase'

async function disableChannel(broadcasterId: string) {
const { data: user } = await supabase
.from('accounts')
.select('userId')
.eq('providerAccountId', broadcasterId)
.single()

if (!user) {
console.log('twitch-events Failed to find user', broadcasterId)
return
}

const { data: settings } = await supabase
.from('settings')
.select('key, value')
.eq('userId', user?.userId)

if (!settings) {
console.log('twitch-events Failed to find settings', broadcasterId)
return
}

if (settings.find((s) => s.key === 'commandDisable' && s.value === true)) {
console.log('twitch-events User already disabled', broadcasterId)
return
}

console.log('twitch-events Disabling user', broadcasterId)
await supabase.from('settings').upsert(
{
userId: user.userId,
key: 'commandDisable',
value: true,
},
{
onConflict: 'userId, key',
},
)
}

export async function getTwitchHeaders(): Promise<Record<string, string>> {
const appToken = await getAppToken(
process.env.TWITCH_CLIENT_ID || '',
process.env.TWITCH_CLIENT_SECRET || '',
)

return {
'Client-Id': process.env.TWITCH_CLIENT_ID || '',
Authorization: `Bearer ${appToken?.accessToken}`,
Accept: 'application/json',
'Accept-Encoding': 'gzip',
}
}

const headers = await getTwitchHeaders()

export async function fetchSubscriptions(providerId: string, cursor?: string): Promise<any> {
const url = new URL('https://api.twitch.tv/helix/eventsub/subscriptions')
url.searchParams.append('user_id', providerId)
if (cursor) {
url.searchParams.append('after', cursor)
}

const response = await fetch(url.toString(), { method: 'GET', headers })
if (response.status !== 200) {
throw new Error(
`Failed to fetch subscriptions with providerId ${providerId}: ${
response.status
} // ${await response.text()}`,
)
}
return response.json()
}

function sleep(ms: number) {
return new Promise((resolve) => setTimeout(resolve, ms))
}

// Assuming this counter is defined outside of the deleteSubscription function
let fetchRequestCounter = 0
const maxRequestsBeforePause = 800
const pauseDuration = 65000 // 65 seconds in milliseconds

async function deleteSubscription(id: string) {
let retryDelay = 60 // Start with a 60 second delay
const maxRetries = 5 // Maximum number of retries
let attempt = 0 // Current attempt

while (attempt < maxRetries) {
// Check if we need to pause before making the next request
if (fetchRequestCounter % maxRequestsBeforePause === 0 && fetchRequestCounter !== 0) {
console.log(`Pausing for ${pauseDuration / 1000} seconds to avoid rate limit...`)
await sleep(pauseDuration) // Wait for 65 seconds
}

const response = await fetch(`https://api.twitch.tv/helix/eventsub/subscriptions?id=${id}`, {
method: 'DELETE',
headers,
})

fetchRequestCounter++ // Increment the request counter after each fetch

const rateLimitRemaining = Number.parseInt(
response.headers.get('ratelimit-remaining') || '0',
10,
)
const rateLimitReset =
Number.parseInt(response.headers.get('ratelimit-reset') || '0', 10) * 1000 // Convert to milliseconds
const currentTime = Date.now()

if (response.ok) {
console.log('Delete rate limit:', rateLimitRemaining)
return response // Exit function if request was successful
}

if (rateLimitRemaining === 0 && currentTime < rateLimitReset) {
// Calculate wait time until rate limit reset, plus a small buffer
const waitTime = rateLimitReset - currentTime + 100 // Adding a 100ms buffer
console.log(`Rate limit exceeded. Waiting for ${waitTime}ms`)
await sleep(waitTime) // Wait until rate limit is reset
attempt++ // Increment attempt counter
retryDelay *= 1.2 // Exponential back-off
} else {
// If the request failed for reasons other than rate limit, throw an error
throw new Error(
`Failed to delete subscription: ${response.status} // ${await response.text()}`,
)
}
}

throw new Error('Exceeded maximum retry attempts for deleteSubscription')
}

async function deleteAllSubscriptionsForProvider(providerId: string): Promise<void> {
let cursor: string | undefined
do {
// Fetch subscriptions for the given provider ID
const data = await fetchSubscriptions(providerId, cursor)
const subscriptionsForProvider = data.data.filter(
(sub: any) => sub.condition.broadcaster_user_id === providerId,
)

console.log('Found subscriptions', subscriptionsForProvider.length)

// Delete each subscription found for the provider
for (const sub of subscriptionsForProvider) {
await deleteSubscription(sub.id)
}

// Update cursor for next page of subscriptions, if any
cursor = data.pagination?.cursor
} while (cursor) // Continue until there are no more pages

console.log(`All subscriptions deleted for provider ID: ${providerId}`)
}

export async function revokeEvent(data: EventSubUserAuthorizationRevokeEvent) {
console.log(`${data.userId} just revoked`)

await deleteAllSubscriptionsForProvider(data.userId)

await supabase
.from('accounts')
.update({
requires_refresh: true,
})
.eq('providerAccountId', data.userId)

await disableChannel(data.userId)
}

0 comments on commit 66125bb

Please sign in to comment.