# bulk_index.py
"""Use this to populate a search index for testing."""
import os
import json
import tempfile
import operator
from typing import List
from itertools import groupby
import click
from search.process import transform
from search.domain import asdict, DocMeta
from search.services import metadata, index
from search.factory import create_ui_web_app
# Build the web application and push its application context at import time,
# so that everything below runs with an active app context.
app = create_ui_web_app()
app.app_context().push()
# Key function returning an object's ``paper_id`` attribute; used below to
# sort and group retrieved metadata by paper.
get_paper_id = operator.attrgetter("paper_id")
@app.cli.command()
@click.option(
    "--print_indexable",
    "-i",
    is_flag=True,
    help="Index to ES and print the indexable JSON to stdout.",
)
@click.option("--paper_id", "-p", help="Index specified paper id")
@click.option(
    "--id_list", "-l", help="Index paper IDs in a file (one ID per line)"
)
@click.option("--cache-dir", "-c", help="Specify the cache directory."
              " Install papers from a cache on disk. Note: this will"
              " preempt checking for new versions of papers that are"
              " in the cache.")
@click.option("--no-cache", help="Disable the cache feature",
              is_flag=True, default=False)
@click.option("--quiet", "-q", help="Only print on failure",
              is_flag=True, default=False)
def populate(
    print_indexable: bool,
    paper_id: str,
    id_list: str,
    cache_dir: str,
    no_cache: bool,
    quiet: bool,
) -> None:
    """Populate the search index with some test data."""
    # NOTE: the docstring above doubles as the command's ``--help`` text, so
    # the longer description lives in comments.
    #
    # Paper IDs come from ``--paper_id``, else ``--id_list``, else the bundled
    # test sample. For each ID the local cache is consulted first; cache
    # misses are collected and fetched with ``metadata.bulk_retrieve`` in
    # chunks of ``retrieve_chunk_size``. Accumulated metadata is transformed
    # and bulk-indexed whenever at least ``index_chunk_size`` entries are
    # pending, and once more on the final paper.
    #
    # Raises RuntimeError when both ``--cache-dir`` and ``--no-cache`` are
    # given.
    if cache_dir and no_cache:
        raise RuntimeError("Cannot set both no cache and cache dir")

    if not no_cache:
        cache_dir = init_cache(cache_dir)
    else:
        cache_dir = None

    index_count = 0
    if paper_id:  # Index a single paper.
        to_index = [paper_id]
    elif id_list:  # Index a list of papers.
        to_index = load_id_list(id_list)
    else:
        to_index = load_id_sample()
    approx_size = len(to_index)

    retrieve_chunk_size = 50
    index_chunk_size = 250
    chunk: List[str] = []    # IDs waiting to be fetched from the service.
    meta: List[DocMeta] = []  # Metadata waiting to be indexed.

    index.SearchSession.create_index()

    progress = NoopContextManager() if quiet \
        else click.progressbar(length=approx_size)
    try:
        with progress as index_bar:
            last = len(to_index) - 1
            for i, current_id in enumerate(to_index):
                this_meta = []
                if cache_dir:
                    this_meta = from_cache(cache_dir, current_id)
                if this_meta:
                    meta += this_meta
                else:
                    chunk.append(current_id)

                if len(chunk) == retrieve_chunk_size or (chunk and i == last):
                    try:
                        new_meta = metadata.bulk_retrieve(chunk)
                    except metadata.ConnectionFailed:  # Try again.
                        new_meta = metadata.bulk_retrieve(chunk)
                    meta += new_meta
                    chunk = []
                    if not no_cache:
                        # Add metadata to the cache, one file per paper.
                        # groupby needs its input sorted by the same key.
                        new_meta_srt = sorted(new_meta, key=get_paper_id)
                        for cached_id, grp in groupby(new_meta_srt,
                                                      get_paper_id):
                            to_cache(cache_dir, cached_id, list(grp))

                # Index papers on a different chunk cycle, and at the very end.
                if len(meta) >= index_chunk_size or i == last:
                    # Transform to Document.
                    docs = [transform.to_search_document(dm) for dm in meta]
                    # Add to index.
                    index.SearchSession.bulk_add_documents(docs)
                    if print_indexable:
                        for document in docs:
                            # Serialise each document on its own line.
                            click.echo(json.dumps(asdict(document)))
                    index_count += len(docs)
                    meta = []

                # ``update`` advances the bar by a number of steps (it is not
                # an absolute position), so move one step per paper handled.
                index_bar.update(1)
    finally:
        # Summary output is suppressed entirely in quiet mode.
        if not quiet:
            click.echo(f"Indexed {index_count} documents in total")
            if cache_dir:
                click.echo(
                    f"Cache path: {cache_dir}; use `-c {cache_dir}` to reuse in"
                    f" subsequent calls")
def init_cache(cache_dir: str) -> str:
    """
    Choose the directory used as the local docmeta cache.

    Parameters
    ----------
    cache_dir : str
        Requested cache directory; may be empty or ``None``.

    Returns
    -------
    str
        ``cache_dir`` itself when it names an existing, writable path;
        otherwise the path of a newly created temporary directory.

    """
    usable = (
        cache_dir
        and os.path.exists(cache_dir)
        and os.access(cache_dir, os.W_OK)
    )
    if not usable:
        # The requested directory is NOT created; fall back to a fresh
        # temporary directory instead.
        cache_dir = tempfile.mkdtemp()
    return cache_dir
def from_cache(cache_dir: str, arxiv_id: str) -> List[DocMeta]:
    """
    Get the docmeta documents for a paper from a local cache, if available.

    Parameters
    ----------
    cache_dir : str
        Directory holding the cache files; a falsy value means caching is
        disabled.
    arxiv_id : str
        Paper identifier. Any ``/`` is replaced with ``_`` to build the
        cache file name ``<id>.json``.

    Returns
    -------
    list
        :class:`.DocMeta` instances built from the cached JSON, or an empty
        list when caching is disabled or no cache file exists for
        ``arxiv_id``.

    """
    if not cache_dir:
        return []  # caching is disabled

    fname = "%s.json" % arxiv_id.replace("/", "_")
    cache_path = os.path.join(cache_dir, fname)
    if not os.path.exists(cache_path):
        return []  # nothing cached for this paper

    with open(cache_path) as f:
        # The cache file holds a JSON list with one object per DocMeta.
        data: List[dict] = json.load(f)
    return [DocMeta(**datum) for datum in data]  # type: ignore
    # See https://github.com/python/mypy/issues/3937
def to_cache(cache_dir: str, arxiv_id: str, docmeta: List[DocMeta]) -> None:
    """
    Write the docmeta for one paper to the local cache.

    Does nothing when ``cache_dir`` is falsy.

    Parameters
    ----------
    cache_dir : str
        Directory in which the cache file is written.
    arxiv_id : str
        Paper identifier; ``/`` becomes ``_`` in the file name.
    docmeta : list
        The :class:`.DocMeta` items to store, serialised as a JSON list.

    Raises
    ------
    RuntimeError
        Raised when the document could not be written to the cache; the
        underlying exception is chained as the cause.

    """
    if cache_dir:
        target = os.path.join(
            cache_dir, "%s.json" % arxiv_id.replace("/", "_")
        )
        try:
            with open(target, "w") as handle:
                json.dump(list(map(asdict, docmeta)), handle)
        except Exception as exc:
            raise RuntimeError(str(exc)) from exc
def load_id_list(path: str) -> List[str]:
    """
    Load a list of paper IDs from ``path``.

    Parameters
    ----------
    path : str
        Text file with one paper ID per line.

    Returns
    -------
    list
        The IDs in file order, with surrounding whitespace removed. Blank
        lines are skipped so that no empty ID is returned.

    Raises
    ------
    RuntimeError
        Raised when ``path`` does not exist.

    """
    if not os.path.exists(path):
        raise RuntimeError("Path does not exist: %s" % path)
    with open(path) as f:
        # Read line by line rather than slurping the whole file as one string.
        return [ident for ident in map(str.strip, f) if ident]
def load_id_sample() -> List[str]:
    """
    Return the paper IDs found in the testing sample file.

    Reads ``tests/data/sample.json`` (resolved against the current working
    directory) and collects the ``"id"`` value of every entry listed under
    its ``"sample"`` key.
    """
    with open("tests/data/sample.json") as sample_file:
        payload = json.load(sample_file)
    ids = []
    for entry in payload.get("sample"):
        ids.append(entry["id"])
    return ids
class NoopContextManager:
    """Do-nothing stand-in for the click progress bar, used in quiet mode."""

    def __enter__(self):
        """Enter the context and hand back this same object."""
        return self

    def __exit__(self, *exc_info):
        """Leave the context; exceptions are not suppressed."""
        return None

    def update(self, i):
        """Accept a progress update and discard it."""
        return None
# Run the ``populate`` command when this file is executed as a script.
if __name__ == "__main__":
    populate()