forked from openmaptiles/openmaptiles-tools
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrefresh-views
executable file
·151 lines (132 loc) · 5.95 KB
/
refresh-views
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
#!/usr/bin/env python
"""
Refresh all PostgreSQL materialized views in parallel, taking into account cross-dependencies.
Usage:
refresh-views [--parallel=<count>] [--schema=<schema>]
refresh-views --help
refresh-views --version
Options:
-p --parallel=<count> Run up to this many parallel queries at the same time [default: 4].
-s --schema=<schema> Limit refreshes to a single schema.
-v --verbose Print additional debugging information.
--help Show this screen.
--version Show version.
PostgreSQL Connection Options:
-h --pghost=<host> Postgres hostname. By default uses PGHOST env or "localhost" if not set.
-P --pgport=<port> Postgres port. By default uses PGPORT env or "5432" if not set.
-d --dbname=<db> Postgres db name. By default uses PGDATABASE env or "openmaptiles" if not set.
-U --user=<user> Postgres user. By default uses PGUSER env or "openmaptiles" if not set.
--password=<password> Postgres password. By default uses PGPASSWORD env or "openmaptiles" if not set.
Postgres
These legacy environment variables should not be used, but they are still supported:
POSTGRES_HOST, POSTGRES_PORT, POSTGRES_DB, POSTGRES_USER, POSTGRES_PASSWORD
"""
import asyncio
import os.path
from collections import defaultdict
from datetime import datetime
from typing import List
import asyncpg
from docopt import docopt
import openmaptiles
from openmaptiles.utils import Action, run_actions
from openmaptiles.utils import coalesce
def main(args):
    """Resolve the run settings from *args* and the environment, then refresh.

    *args* is the option dictionary produced by docopt.  Every connection
    setting is chosen by ``coalesce`` from, in this order: the command-line
    option, the legacy ``POSTGRES_*`` environment variable, the standard
    ``PG*`` environment variable, and a hard-coded fallback.
    NOTE(review): ``coalesce`` comes from ``openmaptiles.utils``; presumably
    it returns the first candidate that is set -- confirm against its source.
    """

    def pick(option, legacy_env, standard_env, fallback):
        # ``args.get`` is used because the option may be absent from *args*.
        return coalesce(
            args.get(option), os.getenv(legacy_env), os.getenv(standard_env),
            fallback)

    settings = dict(
        max_queries=int(args['--parallel']),
        schema=args['--schema'],
        pghost=pick("--pghost", 'POSTGRES_HOST', 'PGHOST', 'localhost'),
        pgport=pick("--pgport", 'POSTGRES_PORT', 'PGPORT', '5432'),
        dbname=pick("--dbname", 'POSTGRES_DB', 'PGDATABASE', 'openmaptiles'),
        user=pick("--user", 'POSTGRES_USER', 'PGUSER', 'openmaptiles'),
        password=pick(
            "--password", 'POSTGRES_PASSWORD', 'PGPASSWORD', 'openmaptiles'),
        verbose=args.get('--verbose'),
    )
    asyncio.run(async_main(**settings))
# This query lists PostgreSQL relations together with the relations they
# depend on, as recorded through rewrite rules (pg_depend -> pg_rewrite).
# Only materialized views get refreshed later on, but the other relation kinds
# are needed too, because a materialized view may depend on another
# materialized view indirectly, e.g. through a plain view.
# The relkind filter covers: r = ordinary table, m = materialized view,
# v = view, t = TOAST table, f = foreign table.
# The second half of the UNION adds every materialized view that does not
# depend on anything, so that it still gets refreshed.
#
# The query deliberately reads only catalogs that any role may query.  It must
# not touch pg_authid: that catalog is readable by superusers only, and joining
# it would make the whole query fail with "permission denied" for ordinary
# roles without changing the result (every relation has exactly one owner).
SQL_GET_MATERIALIZED_VIEWS = """\
WITH objects AS (
    SELECT distinct
        c2.oid AS oid,
        n2.nspname AS objSchema,
        c2.relname AS name,
        c2.relkind AS type,
        n.nspname AS dependsOnSchema,
        c.relname AS dependsOnName,
        c.relkind AS dependsOnType
    FROM
        pg_class c
            JOIN pg_namespace n ON n.oid=c.relnamespace
            JOIN pg_depend d ON d.refobjid=c.oid
            JOIN pg_rewrite r ON r.oid=d.objid
            JOIN pg_class c2 ON c2.oid=r.ev_class AND c2.oid != c.oid
            JOIN pg_namespace n2 ON c2.relnamespace=n2.oid
    WHERE c.relkind IN ('r','m','v','t','f')
)
(
    -- all objects with all of their dependencies
    SELECT * FROM objects
    UNION ALL
    -- any materialized views that have no dependencies
    SELECT
        c.oid AS oid, n.nspname AS objSchema, c.relname AS name, c.relkind AS type,
        NULL AS dependsOnSchema, NULL AS dependsOnName, NULL AS dependsOnType
    FROM
        pg_class c
            JOIN pg_namespace n ON n.oid=c.relnamespace
    WHERE c.relkind = 'm' AND c.oid not in (select o.oid from objects o)
) order by objSchema, name, dependsOnSchema, dependsOnName
"""
def _quote_ident(identifier: str) -> str:
    """Return *identifier* as a double-quoted PostgreSQL identifier."""
    # Embedded double quotes are escaped by doubling them.
    return '"' + identifier.replace('"', '""') + '"'


async def async_main(schema, dbname, pghost, pgport, user, password, max_queries,
                     verbose):
    """Connect to PostgreSQL and refresh the materialized views.

    Args:
        schema: when set, only materialized views in this schema are refreshed;
            objects from other schemas still take part as dependencies.
        dbname, pghost, pgport, user, password: connection settings handed to
            ``asyncpg.create_pool``.
        max_queries: upper bound of the connection pool size, i.e. the maximum
            number of queries that can run at the same time.
        verbose: forwarded to ``run_actions``.

    One ``Query`` action is built per object returned by
    ``SQL_GET_MATERIALIZED_VIEWS`` and all of them are handed to
    ``run_actions`` together with their ``depends_on`` lists.
    NOTE(review): the scheduling itself lives in ``openmaptiles.utils``;
    presumably an action starts once everything it depends on has finished.
    """
    print(f'Connecting to PostgreSQL at {pghost}:{pgport}, db={dbname}, user={user}...')
    async with asyncpg.create_pool(
        database=dbname, host=pghost, port=pgport, user=user, password=password,
        min_size=1, max_size=max_queries,
    ) as pool:
        print(f"Loading all materialized views from {dbname}...")
        # Maps (schema, name, is_materialized_view) -> list of "schema.name"
        # dependencies; the list holds a single None for an object that
        # depends on nothing.
        rows = defaultdict(list)
        for row in await pool.fetch(SQL_GET_MATERIALIZED_VIEWS):
            depends_on = None
            if row['dependsonschema']:
                depends_on = f"{row['dependsonschema']}.{row['dependsonname']}"
            # relkind is compared against bytes, as the driver is expected to
            # deliver the "char" column that way; the str form is accepted too
            # so that a different decoding cannot silently disable every refresh.
            is_matview = row['type'] in (b'm', 'm')
            obj_id = (row['objschema'], row['name'], is_matview)
            rows[obj_id].append(depends_on)

        actions = []
        for (obj_schema, obj_name, is_matview), dependencies in rows.items():
            if is_matview and (not schema or obj_schema == schema):
                # Names come from the catalog verbatim, so they have to be
                # quoted: unquoted identifiers are case-folded and would break
                # on mixed-case, reserved or otherwise unusual names.
                query = (f"REFRESH MATERIALIZED VIEW "
                         f"{_quote_ident(obj_schema)}.{_quote_ident(obj_name)};")
            else:
                # Not refreshed itself, but kept so others can depend on it.
                query = None
            actions.append(Query(
                # The id stays unquoted: it must match the depends_on entries.
                action_id=f"{obj_schema}.{obj_name}",
                depends_on=[v for v in dependencies if v],
                query=query,
            ))

        async def executor(action: Query, _: List):
            if not action.query:
                return  # Actions without queries are treated as awaitable placeholders
            info = f"{datetime.utcnow()} refreshing {action.action_id}"
            if action.depends_on:
                info += f" because {', '.join(action.depends_on)} finished."
            print(info)
            await pool.execute(action.query)
            print(f"{datetime.utcnow()} finished refreshing {action.action_id}.")

        print(f"Refreshing {sum((1 for v in actions if v.query))} materialized views"
              f"{f' in schema {schema}' if schema else ''}...")
        await run_actions(actions, executor, ignore_unknown=True, verbose=verbose)
class Query(Action):
    """An ``Action`` that additionally carries the SQL statement to execute.

    ``query`` may be ``None``: such an action runs nothing and only serves as
    a node that other actions can depend on.
    """

    def __init__(self, action_id: str, query: str, depends_on: List[str] = None):
        # The base class receives the id and the dependency list positionally.
        super(Query, self).__init__(action_id, depends_on)
        # SQL text handed to the executor, or None for a placeholder action.
        self.query = query
# Script entry point: the module docstring doubles as the docopt usage text.
if __name__ == '__main__':
    cli_args = docopt(__doc__, version=openmaptiles.__version__)
    main(cli_args)