-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathopenstreetmap.py
135 lines (108 loc) Β· 4.89 KB
/
openstreetmap.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
from collections.abc import Iterable
from dataclasses import dataclass
from typing import Literal
import xmltodict
from asyncache import cached
from cachetools import TTLCache
from config import CHANGESET_ID_PLACEHOLDER, TAG_MAX_LENGTH
from utils import ensure_list, get_http_client
@dataclass(frozen=True, kw_only=True, slots=True)
class UploadResult:
ok: bool
error_code: int | None
error_message: str | None
changeset_id: int | None
class OpenStreetMap:
def __init__(self, *, access_token: str | None = None):
headers = {'Authorization': f'Bearer {access_token}'} if access_token else None
self._http = get_http_client('https://api.openstreetmap.org/api', headers=headers)
async def __aenter__(self) -> 'OpenStreetMap':
await self._http.__aenter__()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self._http.__aexit__(exc_type, exc_val, exc_tb)
async def get_changeset_maxsize(self) -> int:
r = await self._http.get('/capabilities')
r.raise_for_status()
caps = xmltodict.parse(r.text)
return int(caps['osm']['api']['changesets']['@maximum_elements'])
async def get_relation(self, relation_id: str | int, *, json: bool = True) -> dict:
return (await self._get_elements('relations', (relation_id,), json=json))[0]
async def get_way(self, way_id: str | int, *, json: bool = True) -> dict:
return (await self._get_elements('ways', (way_id,), json=json))[0]
async def get_node(self, node_id: str | int, *, json: bool = True) -> dict:
return (await self._get_elements('nodes', (node_id,), json=json))[0]
async def get_relations(self, relation_ids: Iterable[str | int], *, json: bool = True) -> list[dict]:
return await self._get_elements('relations', relation_ids, json=json)
async def get_ways(self, way_ids: Iterable[str | int], *, json: bool = True) -> list[dict]:
return await self._get_elements('ways', way_ids, json=json)
async def get_nodes(self, node_ids: Iterable[str | int], *, json: bool = True) -> list[dict]:
return await self._get_elements('nodes', node_ids, json=json)
@cached(TTLCache(maxsize=1024, ttl=60))
async def _get_elements(
self,
elements_type: Literal['nodes', 'ways', 'relations'],
element_ids: Iterable[str | int],
json: bool,
) -> list[dict]:
r = await self._http.get(
f'/0.6/{elements_type}{".json" if json else ""}',
params={elements_type: ','.join(map(str, element_ids))},
)
r.raise_for_status()
if json:
return r.json()['elements']
else:
return ensure_list(xmltodict.parse(r.text)['osm'][elements_type[:-1]])
async def get_authorized_user(self) -> dict:
r = await self._http.get('/0.6/user/details.json')
r.raise_for_status()
return r.json()['user']
async def upload_osm_change(self, osm_change: str, tags: dict[str, str]) -> UploadResult:
assert 'comment' in tags, 'You must provide a comment'
for key, value in tuple(tags.items()):
# remove empty tags
if not value:
del tags[key]
continue
# stringify the value
if not isinstance(value, str):
value = str(value)
tags[key] = value
# trim value if too long
if len(value) > TAG_MAX_LENGTH:
print(f'π§ Warning: Trimming {key} value because it exceeds {TAG_MAX_LENGTH} characters: {value}')
tags[key] = value[: TAG_MAX_LENGTH - 1] + 'β¦'
changeset_dict = {'osm': {'changeset': {'tag': [{'@k': k, '@v': v} for k, v in tags.items()]}}}
changeset = xmltodict.unparse(changeset_dict)
r = await self._http.put(
'/0.6/changeset/create',
content=changeset,
headers={'Content-Type': 'text/xml; charset=utf-8'},
follow_redirects=False,
)
r.raise_for_status()
changeset_id_raw = r.text
changeset_id = int(changeset_id_raw)
osm_change = osm_change.replace(CHANGESET_ID_PLACEHOLDER, changeset_id_raw)
upload_resp = await self._http.post(
f'/0.6/changeset/{changeset_id_raw}/upload',
content=osm_change,
headers={'Content-Type': 'text/xml; charset=utf-8'},
timeout=150,
)
r = await self._http.put(f'/0.6/changeset/{changeset_id_raw}/close')
r.raise_for_status()
if not upload_resp.is_success:
return UploadResult(
ok=False,
error_code=upload_resp.status_code,
error_message=upload_resp.text,
changeset_id=changeset_id,
)
return UploadResult(
ok=True,
error_code=None,
error_message=None,
changeset_id=changeset_id,
)