forked from Lynnesbian/mstdn-ebooks
-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathfetch_posts.py
executable file
·259 lines (220 loc) · 8.61 KB
/
fetch_posts.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
#!/usr/bin/env python3
# SPDX-License-Identifier: AGPL-3.0-only
import sys
import anyio
import aiohttp
import platform
import pendulum
import operator
import aiosqlite
import contextlib
from yarl import URL
from pleroma import Pleroma, HandleRateLimits
from bs4 import BeautifulSoup
from functools import partial
from typing import Iterable, NewType
from utils import shield, suppress, http_session_factory
from third_party.utils import extract_post_content
UTC = pendulum.timezone('UTC')
JSON_CONTENT_TYPE = 'application/json'
ACTIVITYPUB_CONTENT_TYPE = 'application/activity+json'
MIGRATION_VERSION = 1
class PostFetcher:
def __init__(self, *, config):
self.config = config
self.erroneous_accounts = []
async def __aenter__(self):
stack = contextlib.AsyncExitStack()
self._fedi = await stack.enter_async_context(
Pleroma(api_base_url=self.config['site'], access_token=self.config['access_token']),
)
self._http = await stack.enter_async_context(
http_session_factory(
headers={'Accept': ', '.join([JSON_CONTENT_TYPE, ACTIVITYPUB_CONTENT_TYPE])},
trust_env=True,
raise_for_status=True,
),
)
self._rl_handler = HandleRateLimits(self._http)
self._db = await stack.enter_async_context(aiosqlite.connect(self.config['db_path']))
await self._maybe_run_migrations()
await self._db.commit()
self._db.row_factory = aiosqlite.Row
self._ctx_stack = stack
return self
async def _maybe_run_migrations(self):
async with self._db.cursor() as cur, suppress(aiosqlite.OperationalError):
if await (await cur.execute('SELECT migration_version FROM migrations')).fetchone(): return
await self._run_migrations()
async def _run_migrations(self):
# TODO proper migrations, not just "has the schema ever been run" migrations
async with await (anyio.Path(__file__).parent/'schema.sql').open() as f:
schema = await f.read()
async with self._db.cursor() as cur:
await cur.executescript(schema)
await cur.execute('INSERT INTO migrations (migration_version) VALUES (?)', (MIGRATION_VERSION,))
async def __aexit__(self, *excinfo):
return await self._ctx_stack.__aexit__(*excinfo)
# username@instance
AccountHandle = NewType('AccountHandle', str)
async def fetch_all(self):
"""fetch all following accounts, or an iterable of accounts if provided"""
await self._fedi.verify_credentials()
self._completed_accounts = {}
async with anyio.create_task_group() as tg:
for fqn in map(self.fqn, await self._fedi.following()):
tg.start_soon(self._do_account, fqn)
def fqn(self, acc: dict):
try:
return acc['fqn']
except KeyError:
fqn = acc['acct']
if '@' in fqn: return fqn
return fqn + '@' + URL(self.config['site']).host
async def _do_account(self, acc: AccountHandle):
async with anyio.create_task_group() as tg:
self._completed_accounts[acc] = done_ev = anyio.Event()
tx, rx = anyio.create_memory_object_stream()
async with rx, tx:
tg.start_soon(self._process_pages, rx, acc)
tg.start_soon(self._fetch_account, tx, acc)
await done_ev.wait()
# processing is complete, so halt fetching.
# processing may complete before fetching if we get caught up on new posts.
tg.cancel_scope.cancel()
async def _process_pages(self, stream, account):
done_ev = self._completed_accounts[account]
try:
async for activity in stream:
try:
await self._insert_activity(activity)
except aiosqlite.IntegrityError as exc:
# LOL sqlite error handling is so bad
if exc.args[0].startswith('UNIQUE constraint failed: '):
# this means we've encountered an item we already have saved
# TODO we need to ignore this if we don't actually have all the posts.
# For example, if a prior fetch was interrupted, we'll have k pages of the most recent posts,
# but no more. But since we still have the most recent page saved, it'll *look* like
# we've saved everything, since we stop as soon as we encounter a post we already have.
# To fix this we can check against totalItems in the user's outbox.
break
self.erroneous_accounts.append(account)
raise
finally:
print('Saving posts from', account, 'to the DB')
await self._db.commit()
done_ev.set()
async def _insert_activity(self, activity):
if activity['type'] != 'Create':
# this isn't a post but something else (like, boost, reaction, etc)
return
obj = activity['object']
await self._db.execute(
"""
INSERT INTO posts (post_id, summary, content, published_at)
VALUES (?, ?, ?, ?)
""",
(
obj['id'],
# Pleroma returns an empty string here for posts without a CW,
# which is semantically incorrect IMO
obj['summary'] or None,
extract_post_content(obj['content']),
pendulum.parse(obj['published']).astimezone(pendulum.timezone('UTC')).timestamp(),
),
)
# TODO figure out why i put shield here lol
@shield
async def _fetch_account(self, tx, account: AccountHandle):
done_ev = self._completed_accounts[account]
try:
outbox = await self.fetch_outbox(account)
except Exception as exc:
import traceback
traceback.print_exception(type(exc), exc, exc.__traceback__)
done_ev.set()
self.erroneous_accounts.append(account)
return
print(f'Fetching posts for {account}...')
next_page_url = outbox['first']
while True:
print(f'Fetching {next_page_url}... ')
async with self._rl_handler.request('GET', next_page_url) as resp: page = await resp.json()
for activity in page['orderedItems']:
try:
await tx.send(activity)
except anyio.BrokenResourceError:
# already closed means we're already done
return
# show progress
#print('.', end='', flush=True)
if not (next_page_url := page.get('next')):
#done_ev.set()
break
done_ev.set()
async def fetch_outbox(self, handle):
"""finger handle, a fully-qualified ActivityPub actor name, returning their outbox URL"""
# it's fucking incredible how overengineered ActivityPub is btw
print('Fingering ', handle, '...', sep='')
username, at, instance = handle.lstrip('@').partition('@')
assert at == '@'
# i was planning on doing /.well-known/host-meta to find the webfinger URL, but
# 1) honk does not support host-meta
# 2) WebFinger is always located at the same location anyway
profile_url = await self._finger_actor(username, instance)
try:
async with self._http.get(profile_url) as resp: profile = await resp.json()
except aiohttp.ContentTypeError:
# we didn't get JSON, so just guess the outbox URL
outbox_url = profile_url + '/outbox'
else:
outbox_url = profile['outbox']
async with self._http.get(outbox_url) as resp: outbox = await resp.json()
assert outbox['type'] == 'OrderedCollection'
return outbox
async def _finger_actor(self, username, instance):
# despite HTTP being a direct violation of the WebFinger spec, assume e.g. Tor instances do not support
# HTTPS-over-onion
finger_url = f'http://{instance}/.well-known/webfinger?resource=acct:{username}@{instance}'
async with self._http.get(finger_url) as resp: finger_result = await resp.json()
return (profile_url := self._parse_webfinger_result(username, instance, finger_result))
def _parse_webfinger_result(self, username, instance, finger_result):
"""given webfinger data, return profile URL for handle"""
def check_content_type(type, ct): return ct == type or ct.startswith(type+';')
check_ap = partial(check_content_type, ACTIVITYPUB_CONTENT_TYPE)
try:
# note: the server might decide to return multiple links
# so we need to decide how to prefer one.
# i'd put "and yarl.URL(template).host == instance" here,
# but some instances have no subdomain for the handle yet use a subdomain for the canonical URL.
# Additionally, an instance could theoretically serve profile pages over I2P and the clearnet,
# for example.
return (profile_url := next(
link['href']
for link in finger_result['links']
if link['rel'] == 'self' and check_ap(link['type'])
))
except StopIteration:
# this should never happen either
raise RuntimeError(f'fatal: while fingering {username}@{instance}, failed to find a profile URL')
async def amain():
import json5 as json
import third_party.utils as utils
args = utils.arg_parser_factory(description='Fetch posts from all followed accounts').parse_args()
config = utils.load_config(args.cfg)
async with PostFetcher(config=config) as fetcher: await fetcher.fetch_all()
if (accs := fetcher.erroneous_accounts):
print(
'Exiting unsuccessfully due to previous errors in these accounts:',
', '.join(accs),
file=sys.stderr,
)
sys.exit(1)
def main():
try:
anyio.run(amain)
except KeyboardInterrupt:
# suppress the lengthy traceback
sys.exit(1)
if __name__ == '__main__':
main()