diff --git a/src/main/kotlin/jp/nephy/tweetstorm/Config.kt b/src/main/kotlin/jp/nephy/tweetstorm/Config.kt index a44aadb..44feb52 100644 --- a/src/main/kotlin/jp/nephy/tweetstorm/Config.kt +++ b/src/main/kotlin/jp/nephy/tweetstorm/Config.kt @@ -8,6 +8,7 @@ import jp.nephy.jsonkt.nullableString import jp.nephy.jsonkt.parse import jp.nephy.penicillin.PenicillinClient import jp.nephy.penicillin.core.allIds +import kotlinx.io.core.use import java.nio.file.Path import java.nio.file.Paths @@ -104,20 +105,25 @@ data class Config(override val json: ImmutableJsonObject): JsonModel { val activity by lazy { activitySec?.times(1000) ?: activityMs } } - val twitter = PenicillinClient { - account { - application(ck, cs) - token(at, ats) + val twitter: PenicillinClient + get() = PenicillinClient { + account { + application(ck, cs) + token(at, ats) + } + skipEmulationChecking() + httpClient(Apache) } - skipEmulationChecking() - httpClient(Apache) - } val user by lazy { - twitter.account.verifyCredentials().complete().result + twitter.use { + it.account.verifyCredentials().complete().result + } } val friends by lazy { - twitter.friend.listIds(count = 5000).untilLast().allIds + twitter.use { + it.friend.listIds(count = 5000).untilLast().allIds + } } } } diff --git a/src/main/kotlin/jp/nephy/tweetstorm/TaskManager.kt b/src/main/kotlin/jp/nephy/tweetstorm/TaskManager.kt index eefafcc..237a43a 100644 --- a/src/main/kotlin/jp/nephy/tweetstorm/TaskManager.kt +++ b/src/main/kotlin/jp/nephy/tweetstorm/TaskManager.kt @@ -29,6 +29,7 @@ class TaskManager(initialStream: AuthenticatedStream): Closeable { it += initialStream } private val streamsMutex = Mutex() + private val twitterClient = account.twitter fun anyClients(): Boolean { return streams.isNotEmpty() @@ -37,46 +38,46 @@ class TaskManager(initialStream: AuthenticatedStream): Closeable { private val tasks = object { val produce = mutableListOf>().also { if (account.listId != null) { - it += ListTimeline(account) + it += ListTimeline(account, twitterClient) val listContainsSelf = try { - account.twitter.list.member(listId = account.listId, userId = account.user.id).complete() + twitterClient.list.member(listId = account.listId, userId = account.user.id).complete() true } catch (e: PenicillinException) { false } if (listContainsSelf && account.syncList.enabled) { - it += UserTimeline(account) { status -> + it += UserTimeline(account, twitterClient) { status -> status.retweetedStatus == null && status.inReplyToUserId != null && status.inReplyToUserId!! !in account.friends } - it += MentionTimeline(account) { status -> + it += MentionTimeline(account, twitterClient) { status -> status.user.id !in account.friends } } else { - it += UserTimeline(account) - it += MentionTimeline(account) + it += UserTimeline(account, twitterClient) + it += MentionTimeline(account, twitterClient) } } else { - it += HomeTimeline(account) - it += UserTimeline(account) { status -> + it += HomeTimeline(account, twitterClient) + it += UserTimeline(account, twitterClient) { status -> status.retweetedStatus == null && status.inReplyToUserId != null && status.inReplyToUserId!! !in account.friends } - it += MentionTimeline(account) { status -> + it += MentionTimeline(account, twitterClient) { status -> status.user.id !in account.friends } } if (account.enableDirectMessage) { - it += DirectMessage(account) + it += DirectMessage(account, twitterClient) } if (account.filterStream.tracks.isNotEmpty() || account.filterStream.follows.isNotEmpty()) { - it += FilterStream(account) + it += FilterStream(account, twitterClient) } if (account.enableSampleStream) { - it += SampleStream(account) + it += SampleStream(account, twitterClient) } if (account.enableActivity && account.t4i.at != null && account.t4i.ats != null) { @@ -88,7 +89,7 @@ class TaskManager(initialStream: AuthenticatedStream): Closeable { val regular = mutableListOf().also { if (account.syncList.enabled && account.listId != null) { - it += SyncList(account) + it += SyncList(account, twitterClient) } } } @@ -185,6 +186,8 @@ class TaskManager(initialStream: AuthenticatedStream): Closeable { } override fun close() { + twitterClient.close() + runBlocking(Dispatchers.Default) { masterJob.cancelChildren() masterJob.cancelAndJoin() diff --git a/src/main/kotlin/jp/nephy/tweetstorm/task/producer/Activity.kt b/src/main/kotlin/jp/nephy/tweetstorm/task/producer/Activity.kt index 079bde5..ac3af79 100644 --- a/src/main/kotlin/jp/nephy/tweetstorm/task/producer/Activity.kt +++ b/src/main/kotlin/jp/nephy/tweetstorm/task/producer/Activity.kt @@ -26,21 +26,22 @@ import java.util.concurrent.TimeUnit import kotlin.coroutines.CoroutineContext class Activity(account: Config.Account): ProduceTask(account) { - private val twitter = PenicillinClient { - account { - application(OfficialClient.OAuth1a.TwitterForiPhone) - token(account.t4i.at!!, account.t4i.ats!!) - } - emulationMode = EmulationMode.TwitterForiPhone - skipEmulationChecking() - } - @ExperimentalCoroutinesApi override fun channel(context: CoroutineContext, parent: Job) = GlobalScope.produce(context + parent) { val lastId = atomic(0L) while (isActive) { try { - val activities = twitter.activity.aboutMe().awaitWithTimeout(config.app.apiTimeout, TimeUnit.MILLISECONDS) ?: continue + val activities = PenicillinClient { + account { + application(OfficialClient.OAuth1a.TwitterForiPhone) + token(account.t4i.at!!, account.t4i.ats!!) + } + emulationMode = EmulationMode.TwitterForiPhone + skipEmulationChecking() + }.use { + it.activity.aboutMe().awaitWithTimeout(config.app.apiTimeout, TimeUnit.MILLISECONDS) + } ?: continue + if (activities.isNotEmpty()) { val lastIdOrNull = if (lastId.value > 0) lastId.value else null if (lastIdOrNull != null) { @@ -95,7 +96,6 @@ class Activity(account: Config.Account): ProduceTask(account) { delay(account.refresh.activity) } catch (e: CancellationException) { - twitter.close() break } catch (e: PenicillinException) { if (e.error == TwitterErrorMessage.RateLimitExceeded) { diff --git a/src/main/kotlin/jp/nephy/tweetstorm/task/producer/DirectMessage.kt b/src/main/kotlin/jp/nephy/tweetstorm/task/producer/DirectMessage.kt index dbc6163..0a955c8 100644 --- a/src/main/kotlin/jp/nephy/tweetstorm/task/producer/DirectMessage.kt +++ b/src/main/kotlin/jp/nephy/tweetstorm/task/producer/DirectMessage.kt @@ -1,5 +1,6 @@ package jp.nephy.tweetstorm.task.producer +import jp.nephy.penicillin.PenicillinClient import jp.nephy.penicillin.core.PenicillinException import jp.nephy.penicillin.core.TwitterErrorMessage import jp.nephy.tweetstorm.Config @@ -17,13 +18,13 @@ import java.time.Instant import java.util.concurrent.TimeUnit import kotlin.coroutines.CoroutineContext -class DirectMessage(account: Config.Account): ProduceTask(account) { +class DirectMessage(account: Config.Account, private val client: PenicillinClient): ProduceTask(account) { @ExperimentalCoroutinesApi override fun channel(context: CoroutineContext, parent: Job) = GlobalScope.produce(context + parent) { val lastId = atomic(0L) while (isActive) { try { - val messages = account.twitter.directMessageEvent.list(count = 200).awaitWithTimeout(config.app.apiTimeout, TimeUnit.MILLISECONDS) ?: continue + val messages = client.directMessageEvent.list(count = 200).awaitWithTimeout(config.app.apiTimeout, TimeUnit.MILLISECONDS) ?: continue if (messages.result.events.isNotEmpty()) { val lastIdOrNull = if (lastId.value > 0) lastId.value else null if (lastIdOrNull != null) { diff --git a/src/main/kotlin/jp/nephy/tweetstorm/task/producer/FilterStream.kt b/src/main/kotlin/jp/nephy/tweetstorm/task/producer/FilterStream.kt index 936db13..1268985 100644 --- a/src/main/kotlin/jp/nephy/tweetstorm/task/producer/FilterStream.kt +++ b/src/main/kotlin/jp/nephy/tweetstorm/task/producer/FilterStream.kt @@ -1,6 +1,7 @@ package jp.nephy.tweetstorm.task.producer import jp.nephy.jsonkt.ImmutableJsonObject +import jp.nephy.penicillin.PenicillinClient import jp.nephy.penicillin.core.streaming.FilterStreamListener import jp.nephy.tweetstorm.Config import jp.nephy.tweetstorm.task.ProduceTask @@ -9,10 +10,10 @@ import kotlinx.coroutines.* import kotlinx.coroutines.channels.produce import kotlin.coroutines.CoroutineContext -class FilterStream(account: Config.Account): ProduceTask(account) { +class FilterStream(account: Config.Account, private val client: PenicillinClient): ProduceTask(account) { @ExperimentalCoroutinesApi override fun channel(context: CoroutineContext, parent: Job) = GlobalScope.produce(context + parent) { - val stream = account.twitter.stream.filter(track = account.filterStream.tracks, follow = account.filterStream.follows).await().listen(object: FilterStreamListener { + client.stream.filter(track = account.filterStream.tracks, follow = account.filterStream.follows).await().listen(object: FilterStreamListener { override suspend fun onAnyJson(json: ImmutableJsonObject) { send(JsonObjectData(json)) } @@ -24,14 +25,13 @@ class FilterStream(account: Config.Account): ProduceTask(account override suspend fun onDisconnect() { logger.warn { "Disconnected from FilterStream." } } - }).startAsync(autoReconnect = true) - - while (isActive) { - try { - delay(1000) - } catch (e: CancellationException) { - stream.close() - break + }).startAsync(autoReconnect = true).use { + while (isActive) { + try { + delay(1000) + } catch (e: CancellationException) { + break + } } } } diff --git a/src/main/kotlin/jp/nephy/tweetstorm/task/producer/SampleStream.kt b/src/main/kotlin/jp/nephy/tweetstorm/task/producer/SampleStream.kt index a03c9dc..b02b8d4 100644 --- a/src/main/kotlin/jp/nephy/tweetstorm/task/producer/SampleStream.kt +++ b/src/main/kotlin/jp/nephy/tweetstorm/task/producer/SampleStream.kt @@ -1,6 +1,7 @@ package jp.nephy.tweetstorm.task.producer import jp.nephy.jsonkt.ImmutableJsonObject +import jp.nephy.penicillin.PenicillinClient import jp.nephy.penicillin.core.streaming.SampleStreamListener import jp.nephy.tweetstorm.Config import jp.nephy.tweetstorm.task.ProduceTask @@ -9,10 +10,10 @@ import kotlinx.coroutines.* import kotlinx.coroutines.channels.produce import kotlin.coroutines.CoroutineContext -class SampleStream(account: Config.Account): ProduceTask(account) { +class SampleStream(account: Config.Account, private val client: PenicillinClient): ProduceTask(account) { @ExperimentalCoroutinesApi override fun channel(context: CoroutineContext, parent: Job) = GlobalScope.produce(context + parent) { - val stream = account.twitter.stream.sample().await().listen(object: SampleStreamListener { + client.stream.sample().await().listen(object: SampleStreamListener { override suspend fun onAnyJson(json: ImmutableJsonObject) { send(JsonObjectData(json)) } @@ -24,14 +25,13 @@ class SampleStream(account: Config.Account): ProduceTask(account override suspend fun onDisconnect() { logger.warn { "Disconnected from SampleStream." } } - }).startAsync(autoReconnect = true) - - while (isActive) { - try { - delay(1000) - } catch (e: CancellationException) { - stream.close() - break + }).startAsync(autoReconnect = true).use { + while (isActive) { + try { + delay(1000) + } catch (e: CancellationException) { + break + } } } } diff --git a/src/main/kotlin/jp/nephy/tweetstorm/task/producer/Timeline.kt b/src/main/kotlin/jp/nephy/tweetstorm/task/producer/Timeline.kt index 0a555be..7281976 100644 --- a/src/main/kotlin/jp/nephy/tweetstorm/task/producer/Timeline.kt +++ b/src/main/kotlin/jp/nephy/tweetstorm/task/producer/Timeline.kt @@ -2,6 +2,7 @@ package jp.nephy.tweetstorm.task.producer import jp.nephy.jsonkt.asMutable import jp.nephy.jsonkt.string +import jp.nephy.penicillin.PenicillinClient import jp.nephy.penicillin.core.PenicillinException import jp.nephy.penicillin.core.PenicillinJsonArrayAction import jp.nephy.penicillin.core.TwitterErrorMessage @@ -20,20 +21,20 @@ import java.time.Instant import java.util.concurrent.TimeUnit import kotlin.coroutines.CoroutineContext -class ListTimeline(account: Config.Account, filter: (Status) -> Boolean = { true }): TimelineTask(account, account.refresh.listTimeline, { - account.twitter.list.timeline(listId = account.listId, count = 200, sinceId = it, includeEntities = true, includeRTs = true, includeMyRetweet = true, tweetMode = "extended") +class ListTimeline(account: Config.Account, client: PenicillinClient, filter: (Status) -> Boolean = { true }): TimelineTask(account, account.refresh.listTimeline, { + client.list.timeline(listId = account.listId, count = 200, sinceId = it, includeEntities = true, includeRTs = true, includeMyRetweet = true, tweetMode = "extended") }, filter) -class HomeTimeline(account: Config.Account, filter: (Status) -> Boolean = { true }): TimelineTask(account, account.refresh.homeTimeline, { - account.twitter.timeline.home(count = 200, sinceId = it, includeEntities = true, includeRTs = true, includeMyRetweet = true, tweetMode = "extended") +class HomeTimeline(account: Config.Account, client: PenicillinClient, filter: (Status) -> Boolean = { true }): TimelineTask(account, account.refresh.homeTimeline, { + client.timeline.home(count = 200, sinceId = it, includeEntities = true, includeRTs = true, includeMyRetweet = true, tweetMode = "extended") }, filter) -class UserTimeline(account: Config.Account, filter: (Status) -> Boolean = { true }): TimelineTask(account, account.refresh.userTimeline, { - account.twitter.timeline.user(count = 200, sinceId = it, includeEntities = true, includeRTs = true, includeMyRetweet = true, tweetMode = "extended") +class UserTimeline(account: Config.Account, client: PenicillinClient, filter: (Status) -> Boolean = { true }): TimelineTask(account, account.refresh.userTimeline, { + client.timeline.user(count = 200, sinceId = it, includeEntities = true, includeRTs = true, includeMyRetweet = true, tweetMode = "extended") }, filter) -class MentionTimeline(account: Config.Account, filter: (Status) -> Boolean = { true }): TimelineTask(account, account.refresh.mentionTimeline, { - account.twitter.timeline.mention(count = 200, sinceId = it, includeEntities = true, includeRTs = true, includeMyRetweet = true, tweetMode = "extended") +class MentionTimeline(account: Config.Account, client: PenicillinClient, filter: (Status) -> Boolean = { true }): TimelineTask(account, account.refresh.mentionTimeline, { + client.timeline.mention(count = 200, sinceId = it, includeEntities = true, includeRTs = true, includeMyRetweet = true, tweetMode = "extended") }, filter) abstract class TimelineTask(account: Config.Account, private val time: Long, private val source: (lastId: Long?) -> PenicillinJsonArrayAction, private val filter: (Status) -> Boolean): ProduceTask(account) { diff --git a/src/main/kotlin/jp/nephy/tweetstorm/task/regular/SyncList.kt b/src/main/kotlin/jp/nephy/tweetstorm/task/regular/SyncList.kt index bc5490d..74632f7 100644 --- a/src/main/kotlin/jp/nephy/tweetstorm/task/regular/SyncList.kt +++ b/src/main/kotlin/jp/nephy/tweetstorm/task/regular/SyncList.kt @@ -1,38 +1,36 @@ package jp.nephy.tweetstorm.task.regular +import jp.nephy.penicillin.PenicillinClient import jp.nephy.penicillin.core.allIds import jp.nephy.penicillin.core.allUsers import jp.nephy.tweetstorm.Config import jp.nephy.tweetstorm.task.RegularTask import java.util.concurrent.TimeUnit -class SyncList(account: Config.Account): RegularTask(account, 5, TimeUnit.MINUTES) { +class SyncList(account: Config.Account, private val client: PenicillinClient): RegularTask(account, 5, TimeUnit.MINUTES) { override suspend fun run() { val followingIds = if (account.syncList.includeSelf) { - account.twitter.friend.listIds(count = 5000).untilLast().allIds + account.user.id + client.friend.listIds(count = 5000).untilLast().allIds + account.user.id } else { - account.twitter.friend.listIds(count = 5000).untilLast().allIds + client.friend.listIds(count = 5000).untilLast().allIds } if (followingIds.size > 5000) { logger.warn { "This list exceeded 5000 members limit." } return } - - val listMemberIds = account.twitter.list.members(listId = account.listId, count = 5000).untilLast().allUsers.map { it.id } - + val listMemberIds = client.list.members(listId = account.listId, count = 5000).untilLast().allUsers.map { it.id } val willBeRemoved = listMemberIds - followingIds if (willBeRemoved.isNotEmpty()) { willBeRemoved.chunked(100).forEach { - account.twitter.list.removeMembers(listId = account.listId, userIds = it).await() + client.list.removeMembers(listId = account.listId, userIds = it).await() } logger.debug { "Removing ${willBeRemoved.size} user(s)." } } - val willBeAdded = followingIds - listMemberIds if (willBeAdded.isNotEmpty()) { willBeAdded.chunked(100).forEach { - account.twitter.list.addMembers(listId = account.listId, userIds = it).await() + client.list.addMembers(listId = account.listId, userIds = it).await() } logger.debug { "Adding ${willBeAdded.size} user(s)." } }