Skip to content
This repository has been archived by the owner on Oct 5, 2023. It is now read-only.

Commit

Permalink
Fix memory leak on client disconnect
Browse files Browse the repository at this point in the history
  • Loading branch information
Slash Nephy committed Nov 8, 2018
1 parent ab58f00 commit 14b44e1
Show file tree
Hide file tree
Showing 8 changed files with 81 additions and 72 deletions.
24 changes: 15 additions & 9 deletions src/main/kotlin/jp/nephy/tweetstorm/Config.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import jp.nephy.jsonkt.nullableString
import jp.nephy.jsonkt.parse
import jp.nephy.penicillin.PenicillinClient
import jp.nephy.penicillin.core.allIds
import kotlinx.io.core.use
import java.nio.file.Path
import java.nio.file.Paths

Expand Down Expand Up @@ -104,20 +105,25 @@ data class Config(override val json: ImmutableJsonObject): JsonModel {
val activity by lazy { activitySec?.times(1000) ?: activityMs }
}

val twitter = PenicillinClient {
account {
application(ck, cs)
token(at, ats)
val twitter: PenicillinClient
get() = PenicillinClient {
account {
application(ck, cs)
token(at, ats)
}
skipEmulationChecking()
httpClient(Apache)
}
skipEmulationChecking()
httpClient(Apache)
}

val user by lazy {
twitter.account.verifyCredentials().complete().result
twitter.use {
it.account.verifyCredentials().complete().result
}
}
val friends by lazy {
twitter.friend.listIds(count = 5000).untilLast().allIds
twitter.use {
it.friend.listIds(count = 5000).untilLast().allIds
}
}
}
}
29 changes: 16 additions & 13 deletions src/main/kotlin/jp/nephy/tweetstorm/TaskManager.kt
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class TaskManager(initialStream: AuthenticatedStream): Closeable {
it += initialStream
}
private val streamsMutex = Mutex()
private val twitterClient = account.twitter

fun anyClients(): Boolean {
return streams.isNotEmpty()
Expand All @@ -37,46 +38,46 @@ class TaskManager(initialStream: AuthenticatedStream): Closeable {
private val tasks = object {
val produce = mutableListOf<ProduceTask<*>>().also {
if (account.listId != null) {
it += ListTimeline(account)
it += ListTimeline(account, twitterClient)

val listContainsSelf = try {
account.twitter.list.member(listId = account.listId, userId = account.user.id).complete()
twitterClient.list.member(listId = account.listId, userId = account.user.id).complete()
true
} catch (e: PenicillinException) {
false
}

if (listContainsSelf && account.syncList.enabled) {
it += UserTimeline(account) { status ->
it += UserTimeline(account, twitterClient) { status ->
status.retweetedStatus == null && status.inReplyToUserId != null && status.inReplyToUserId!! !in account.friends
}
it += MentionTimeline(account) { status ->
it += MentionTimeline(account, twitterClient) { status ->
status.user.id !in account.friends
}
} else {
it += UserTimeline(account)
it += MentionTimeline(account)
it += UserTimeline(account, twitterClient)
it += MentionTimeline(account, twitterClient)
}
} else {
it += HomeTimeline(account)
it += UserTimeline(account) { status ->
it += HomeTimeline(account, twitterClient)
it += UserTimeline(account, twitterClient) { status ->
status.retweetedStatus == null && status.inReplyToUserId != null && status.inReplyToUserId!! !in account.friends
}
it += MentionTimeline(account) { status ->
it += MentionTimeline(account, twitterClient) { status ->
status.user.id !in account.friends
}
}

if (account.enableDirectMessage) {
it += DirectMessage(account)
it += DirectMessage(account, twitterClient)
}

if (account.filterStream.tracks.isNotEmpty() || account.filterStream.follows.isNotEmpty()) {
it += FilterStream(account)
it += FilterStream(account, twitterClient)
}

if (account.enableSampleStream) {
it += SampleStream(account)
it += SampleStream(account, twitterClient)
}

if (account.enableActivity && account.t4i.at != null && account.t4i.ats != null) {
Expand All @@ -88,7 +89,7 @@ class TaskManager(initialStream: AuthenticatedStream): Closeable {

val regular = mutableListOf<RegularTask>().also {
if (account.syncList.enabled && account.listId != null) {
it += SyncList(account)
it += SyncList(account, twitterClient)
}
}
}
Expand Down Expand Up @@ -185,6 +186,8 @@ class TaskManager(initialStream: AuthenticatedStream): Closeable {
}

override fun close() {
twitterClient.close()

runBlocking(Dispatchers.Default) {
masterJob.cancelChildren()
masterJob.cancelAndJoin()
Expand Down
22 changes: 11 additions & 11 deletions src/main/kotlin/jp/nephy/tweetstorm/task/producer/Activity.kt
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,22 @@ import java.util.concurrent.TimeUnit
import kotlin.coroutines.CoroutineContext

class Activity(account: Config.Account): ProduceTask<JsonData>(account) {
private val twitter = PenicillinClient {
account {
application(OfficialClient.OAuth1a.TwitterForiPhone)
token(account.t4i.at!!, account.t4i.ats!!)
}
emulationMode = EmulationMode.TwitterForiPhone
skipEmulationChecking()
}

@ExperimentalCoroutinesApi
override fun channel(context: CoroutineContext, parent: Job) = GlobalScope.produce(context + parent) {
val lastId = atomic(0L)
while (isActive) {
try {
val activities = twitter.activity.aboutMe().awaitWithTimeout(config.app.apiTimeout, TimeUnit.MILLISECONDS) ?: continue
val activities = PenicillinClient {
account {
application(OfficialClient.OAuth1a.TwitterForiPhone)
token(account.t4i.at!!, account.t4i.ats!!)
}
emulationMode = EmulationMode.TwitterForiPhone
skipEmulationChecking()
}.use {
it.activity.aboutMe().awaitWithTimeout(config.app.apiTimeout, TimeUnit.MILLISECONDS)
} ?: continue

if (activities.isNotEmpty()) {
val lastIdOrNull = if (lastId.value > 0) lastId.value else null
if (lastIdOrNull != null) {
Expand Down Expand Up @@ -95,7 +96,6 @@ class Activity(account: Config.Account): ProduceTask<JsonData>(account) {

delay(account.refresh.activity)
} catch (e: CancellationException) {
twitter.close()
break
} catch (e: PenicillinException) {
if (e.error == TwitterErrorMessage.RateLimitExceeded) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package jp.nephy.tweetstorm.task.producer

import jp.nephy.penicillin.PenicillinClient
import jp.nephy.penicillin.core.PenicillinException
import jp.nephy.penicillin.core.TwitterErrorMessage
import jp.nephy.tweetstorm.Config
Expand All @@ -17,13 +18,13 @@ import java.time.Instant
import java.util.concurrent.TimeUnit
import kotlin.coroutines.CoroutineContext

class DirectMessage(account: Config.Account): ProduceTask<JsonModelData>(account) {
class DirectMessage(account: Config.Account, private val client: PenicillinClient): ProduceTask<JsonModelData>(account) {
@ExperimentalCoroutinesApi
override fun channel(context: CoroutineContext, parent: Job) = GlobalScope.produce(context + parent) {
val lastId = atomic(0L)
while (isActive) {
try {
val messages = account.twitter.directMessageEvent.list(count = 200).awaitWithTimeout(config.app.apiTimeout, TimeUnit.MILLISECONDS) ?: continue
val messages = client.directMessageEvent.list(count = 200).awaitWithTimeout(config.app.apiTimeout, TimeUnit.MILLISECONDS) ?: continue
if (messages.result.events.isNotEmpty()) {
val lastIdOrNull = if (lastId.value > 0) lastId.value else null
if (lastIdOrNull != null) {
Expand Down
20 changes: 10 additions & 10 deletions src/main/kotlin/jp/nephy/tweetstorm/task/producer/FilterStream.kt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package jp.nephy.tweetstorm.task.producer

import jp.nephy.jsonkt.ImmutableJsonObject
import jp.nephy.penicillin.PenicillinClient
import jp.nephy.penicillin.core.streaming.FilterStreamListener
import jp.nephy.tweetstorm.Config
import jp.nephy.tweetstorm.task.ProduceTask
Expand All @@ -9,10 +10,10 @@ import kotlinx.coroutines.*
import kotlinx.coroutines.channels.produce
import kotlin.coroutines.CoroutineContext

class FilterStream(account: Config.Account): ProduceTask<JsonObjectData>(account) {
class FilterStream(account: Config.Account, private val client: PenicillinClient): ProduceTask<JsonObjectData>(account) {
@ExperimentalCoroutinesApi
override fun channel(context: CoroutineContext, parent: Job) = GlobalScope.produce(context + parent) {
val stream = account.twitter.stream.filter(track = account.filterStream.tracks, follow = account.filterStream.follows).await().listen(object: FilterStreamListener {
client.stream.filter(track = account.filterStream.tracks, follow = account.filterStream.follows).await().listen(object: FilterStreamListener {
override suspend fun onAnyJson(json: ImmutableJsonObject) {
send(JsonObjectData(json))
}
Expand All @@ -24,14 +25,13 @@ class FilterStream(account: Config.Account): ProduceTask<JsonObjectData>(account
override suspend fun onDisconnect() {
logger.warn { "Disconnected from FilterStream." }
}
}).startAsync(autoReconnect = true)

while (isActive) {
try {
delay(1000)
} catch (e: CancellationException) {
stream.close()
break
}).startAsync(autoReconnect = true).use {
while (isActive) {
try {
delay(1000)
} catch (e: CancellationException) {
break
}
}
}
}
Expand Down
20 changes: 10 additions & 10 deletions src/main/kotlin/jp/nephy/tweetstorm/task/producer/SampleStream.kt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package jp.nephy.tweetstorm.task.producer

import jp.nephy.jsonkt.ImmutableJsonObject
import jp.nephy.penicillin.PenicillinClient
import jp.nephy.penicillin.core.streaming.SampleStreamListener
import jp.nephy.tweetstorm.Config
import jp.nephy.tweetstorm.task.ProduceTask
Expand All @@ -9,10 +10,10 @@ import kotlinx.coroutines.*
import kotlinx.coroutines.channels.produce
import kotlin.coroutines.CoroutineContext

class SampleStream(account: Config.Account): ProduceTask<JsonObjectData>(account) {
class SampleStream(account: Config.Account, private val client: PenicillinClient): ProduceTask<JsonObjectData>(account) {
@ExperimentalCoroutinesApi
override fun channel(context: CoroutineContext, parent: Job) = GlobalScope.produce(context + parent) {
val stream = account.twitter.stream.sample().await().listen(object: SampleStreamListener {
client.stream.sample().await().listen(object: SampleStreamListener {
override suspend fun onAnyJson(json: ImmutableJsonObject) {
send(JsonObjectData(json))
}
Expand All @@ -24,14 +25,13 @@ class SampleStream(account: Config.Account): ProduceTask<JsonObjectData>(account
override suspend fun onDisconnect() {
logger.warn { "Disconnected from SampleStream." }
}
}).startAsync(autoReconnect = true)

while (isActive) {
try {
delay(1000)
} catch (e: CancellationException) {
stream.close()
break
}).startAsync(autoReconnect = true).use {
while (isActive) {
try {
delay(1000)
} catch (e: CancellationException) {
break
}
}
}
}
Expand Down
17 changes: 9 additions & 8 deletions src/main/kotlin/jp/nephy/tweetstorm/task/producer/Timeline.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package jp.nephy.tweetstorm.task.producer

import jp.nephy.jsonkt.asMutable
import jp.nephy.jsonkt.string
import jp.nephy.penicillin.PenicillinClient
import jp.nephy.penicillin.core.PenicillinException
import jp.nephy.penicillin.core.PenicillinJsonArrayAction
import jp.nephy.penicillin.core.TwitterErrorMessage
Expand All @@ -20,20 +21,20 @@ import java.time.Instant
import java.util.concurrent.TimeUnit
import kotlin.coroutines.CoroutineContext

class ListTimeline(account: Config.Account, filter: (Status) -> Boolean = { true }): TimelineTask(account, account.refresh.listTimeline, {
account.twitter.list.timeline(listId = account.listId, count = 200, sinceId = it, includeEntities = true, includeRTs = true, includeMyRetweet = true, tweetMode = "extended")
class ListTimeline(account: Config.Account, client: PenicillinClient, filter: (Status) -> Boolean = { true }): TimelineTask(account, account.refresh.listTimeline, {
client.list.timeline(listId = account.listId, count = 200, sinceId = it, includeEntities = true, includeRTs = true, includeMyRetweet = true, tweetMode = "extended")
}, filter)

class HomeTimeline(account: Config.Account, filter: (Status) -> Boolean = { true }): TimelineTask(account, account.refresh.homeTimeline, {
account.twitter.timeline.home(count = 200, sinceId = it, includeEntities = true, includeRTs = true, includeMyRetweet = true, tweetMode = "extended")
class HomeTimeline(account: Config.Account, client: PenicillinClient, filter: (Status) -> Boolean = { true }): TimelineTask(account, account.refresh.homeTimeline, {
client.timeline.home(count = 200, sinceId = it, includeEntities = true, includeRTs = true, includeMyRetweet = true, tweetMode = "extended")
}, filter)

class UserTimeline(account: Config.Account, filter: (Status) -> Boolean = { true }): TimelineTask(account, account.refresh.userTimeline, {
account.twitter.timeline.user(count = 200, sinceId = it, includeEntities = true, includeRTs = true, includeMyRetweet = true, tweetMode = "extended")
class UserTimeline(account: Config.Account, client: PenicillinClient, filter: (Status) -> Boolean = { true }): TimelineTask(account, account.refresh.userTimeline, {
client.timeline.user(count = 200, sinceId = it, includeEntities = true, includeRTs = true, includeMyRetweet = true, tweetMode = "extended")
}, filter)

class MentionTimeline(account: Config.Account, filter: (Status) -> Boolean = { true }): TimelineTask(account, account.refresh.mentionTimeline, {
account.twitter.timeline.mention(count = 200, sinceId = it, includeEntities = true, includeRTs = true, includeMyRetweet = true, tweetMode = "extended")
class MentionTimeline(account: Config.Account, client: PenicillinClient, filter: (Status) -> Boolean = { true }): TimelineTask(account, account.refresh.mentionTimeline, {
client.timeline.mention(count = 200, sinceId = it, includeEntities = true, includeRTs = true, includeMyRetweet = true, tweetMode = "extended")
}, filter)

abstract class TimelineTask(account: Config.Account, private val time: Long, private val source: (lastId: Long?) -> PenicillinJsonArrayAction<Status>, private val filter: (Status) -> Boolean): ProduceTask<JsonObjectData>(account) {
Expand Down
16 changes: 7 additions & 9 deletions src/main/kotlin/jp/nephy/tweetstorm/task/regular/SyncList.kt
Original file line number Diff line number Diff line change
@@ -1,38 +1,36 @@
package jp.nephy.tweetstorm.task.regular

import jp.nephy.penicillin.PenicillinClient
import jp.nephy.penicillin.core.allIds
import jp.nephy.penicillin.core.allUsers
import jp.nephy.tweetstorm.Config
import jp.nephy.tweetstorm.task.RegularTask
import java.util.concurrent.TimeUnit

class SyncList(account: Config.Account): RegularTask(account, 5, TimeUnit.MINUTES) {
class SyncList(account: Config.Account, private val client: PenicillinClient): RegularTask(account, 5, TimeUnit.MINUTES) {
override suspend fun run() {
val followingIds = if (account.syncList.includeSelf) {
account.twitter.friend.listIds(count = 5000).untilLast().allIds + account.user.id
client.friend.listIds(count = 5000).untilLast().allIds + account.user.id
} else {
account.twitter.friend.listIds(count = 5000).untilLast().allIds
client.friend.listIds(count = 5000).untilLast().allIds
}

if (followingIds.size > 5000) {
logger.warn { "This list exceeded 5000 members limit." }
return
}

val listMemberIds = account.twitter.list.members(listId = account.listId, count = 5000).untilLast().allUsers.map { it.id }

val listMemberIds = client.list.members(listId = account.listId, count = 5000).untilLast().allUsers.map { it.id }
val willBeRemoved = listMemberIds - followingIds
if (willBeRemoved.isNotEmpty()) {
willBeRemoved.chunked(100).forEach {
account.twitter.list.removeMembers(listId = account.listId, userIds = it).await()
client.list.removeMembers(listId = account.listId, userIds = it).await()
}
logger.debug { "Removing ${willBeRemoved.size} user(s)." }
}

val willBeAdded = followingIds - listMemberIds
if (willBeAdded.isNotEmpty()) {
willBeAdded.chunked(100).forEach {
account.twitter.list.addMembers(listId = account.listId, userIds = it).await()
client.list.addMembers(listId = account.listId, userIds = it).await()
}
logger.debug { "Adding ${willBeAdded.size} user(s)." }
}
Expand Down

0 comments on commit 14b44e1

Please sign in to comment.