-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathkedro_cli.py
executable file
·214 lines (184 loc) · 7.2 KB
/
kedro_cli.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
import os
from itertools import chain
from pathlib import Path
from typing import Dict, Iterable, Tuple
import click
from kedro.framework.cli import main as kedro_main
from kedro.framework.cli.catalog import catalog as catalog_group
from kedro.framework.cli.jupyter import jupyter as jupyter_group
from kedro.framework.cli.pipeline import pipeline as pipeline_group
from kedro.framework.cli.project import project_group
from kedro.framework.cli.utils import KedroCliError, env_option, split_string
from kedro.framework.context import load_context
from kedro.utils import load_obj
# Make click accept `-h` as well as `--help` on the command group.
CONTEXT_SETTINGS = {"help_option_names": ["-h", "--help"]}

# Absolute path of the directory containing this file; the script entry point
# at the bottom of the file changes into it before starting the CLI.
PROJ_PATH = Path(__file__).resolve().parent
# Help texts for the command line options. The strings are shown verbatim by
# `--help`, so their wording and line breaks are user-facing.
ENV_ARG_HELP = """Run the pipeline in a configured environment. If not specified,
pipeline will run using environment `local`."""
FROM_INPUTS_HELP = (
    """A list of dataset names which should be used as a starting point."""
)
FROM_NODES_HELP = """A list of node names which should be used as a starting point."""
TO_NODES_HELP = """A list of node names which should be used as an end point."""
NODE_ARG_HELP = """Run only nodes with specified names."""
RUNNER_ARG_HELP = """Specify a runner that you want to run the pipeline with.
Available runners: `SequentialRunner`, `ParallelRunner` and `ThreadRunner`.
This option cannot be used together with --parallel."""
PARALLEL_ARG_HELP = """Run the pipeline using the `ParallelRunner`.
If not specified, use the `SequentialRunner`. This flag cannot be used together
with --runner."""
TAG_ARG_HELP = """Construct the pipeline using only nodes which have this tag
attached. Option can be used multiple times, what results in a
pipeline constructed from nodes having any of those tags."""
LOAD_VERSION_HELP = """Specify a particular dataset version (timestamp) for loading."""
CONFIG_FILE_HELP = """Specify a YAML configuration file to load the run
command arguments from. If command line arguments are provided, they will
override the loaded ones."""
PIPELINE_ARG_HELP = """Name of the modular pipeline to run.
If not set, the project pipeline is run by default."""
# The parser for `--params` splits every item on its first colon, which is why
# values may contain further colons while keys may not.
PARAMS_ARG_HELP = """Specify extra parameters that you want to pass
to the context initializer. Items must be separated by comma, keys - by colon,
example: param1:value1,param2:value2. Each parameter is split by the first colon,
so parameter values are allowed to contain colons, parameter keys are not."""
def _config_file_callback(ctx, param, value): # pylint: disable=unused-argument
"""Config file callback, that replaces command line options with config file
values. If command line options are passed, they override config file values.
"""
# for performance reasons
import anyconfig # pylint: disable=import-outside-toplevel
ctx.default_map = ctx.default_map or {}
section = ctx.info_name
if value:
config = anyconfig.load(value)[section]
ctx.default_map.update(config)
return value
def _get_values_as_tuple(values: Iterable[str]) -> Tuple[str, ...]:
return tuple(chain.from_iterable(value.split(",") for value in values))
def _reformat_load_versions( # pylint: disable=unused-argument
ctx, param, value
) -> Dict[str, str]:
"""Reformat data structure from tuple to dictionary for `load-version`.
E.g ('dataset1:time1', 'dataset2:time2') -> {"dataset1": "time1", "dataset2": "time2"}.
"""
load_versions_dict = {}
for load_version in value:
load_version_list = load_version.split(":", 1)
if len(load_version_list) != 2:
raise KedroCliError(
f"Expected the form of `load_version` to be "
f"`dataset_name:YYYY-MM-DDThh.mm.ss.sssZ`,"
f"found {load_version} instead"
)
load_versions_dict[load_version_list[0]] = load_version_list[1]
return load_versions_dict
def _split_params(ctx, param, value):
if isinstance(value, dict):
return value
result = {}
for item in split_string(ctx, param, value):
item = item.split(":", 1)
if len(item) != 2:
ctx.fail(
f"Invalid format of `{param.name}` option: Item `{item[0]}` must contain "
f"a key and a value separated by `:`."
)
key = item[0].strip()
if not key:
ctx.fail(
f"Invalid format of `{param.name}` option: Parameter key "
f"cannot be an empty string."
)
value = item[1].strip()
result[key] = _try_convert_to_numeric(value)
return result
def _try_convert_to_numeric(value):
try:
value = float(value)
except ValueError:
return value
return int(value) if value.is_integer() else value
# Root click group of this project's command line interface. The `run`
# command and Kedro's own command groups are attached to it further down in
# this file. The docstring below is kept short on purpose: click displays it
# as the group's help text.
@click.group(context_settings=CONTEXT_SETTINGS, name=__file__)
def cli():
    """Command line tools for manipulating a Kedro project."""
@cli.command()
@click.option(
    "--from-inputs", type=str, default="", help=FROM_INPUTS_HELP, callback=split_string
)
@click.option(
    "--from-nodes", type=str, default="", help=FROM_NODES_HELP, callback=split_string
)
@click.option(
    "--to-nodes", type=str, default="", help=TO_NODES_HELP, callback=split_string
)
@click.option("--node", "-n", "node_names", type=str, multiple=True, help=NODE_ARG_HELP)
@click.option(
    "--runner", "-r", type=str, default=None, multiple=False, help=RUNNER_ARG_HELP
)
@click.option("--parallel", "-p", is_flag=True, multiple=False, help=PARALLEL_ARG_HELP)
@env_option
@click.option("--tag", "-t", type=str, multiple=True, help=TAG_ARG_HELP)
@click.option(
    "--load-version",
    "-lv",
    type=str,
    multiple=True,
    help=LOAD_VERSION_HELP,
    callback=_reformat_load_versions,
)
@click.option("--pipeline", type=str, default=None, help=PIPELINE_ARG_HELP)
@click.option(
    "--config",
    "-c",
    type=click.Path(exists=True, dir_okay=False, resolve_path=True),
    help=CONFIG_FILE_HELP,
    callback=_config_file_callback,
)
@click.option(
    "--params", type=str, default="", help=PARAMS_ARG_HELP, callback=_split_params
)
def run(
    tag,
    env,
    parallel,
    runner,
    node_names,
    to_nodes,
    from_nodes,
    from_inputs,
    load_version,
    pipeline,
    config,
    params,
):
    """Run the pipeline."""
    # NOTE: the docstring above doubles as the command's help text in click,
    # so further explanation lives in comments. `config` is not read here; its
    # option callback has already done the work by the time this body runs.

    # --parallel selects one specific runner, so it cannot be combined with an
    # explicit --runner.
    if parallel and runner:
        raise KedroCliError(
            "Both --parallel and --runner options cannot be used together. "
            "Please use either --parallel or --runner."
        )

    if parallel:
        runner_name = "ParallelRunner"
    else:
        runner_name = runner or "SequentialRunner"
    runner_class = load_obj(runner_name, "kedro.runner")

    # Each --tag / --node occurrence may itself hold a comma-separated list;
    # empty selections are passed on unchanged.
    if tag:
        tag = _get_values_as_tuple(tag)
    if node_names:
        node_names = _get_values_as_tuple(node_names)

    # The project context is loaded from the current working directory.
    context = load_context(Path.cwd(), env=env, extra_params=params)
    context.run(
        tags=tag,
        runner=runner_class(),
        node_names=node_names,
        from_nodes=from_nodes,
        to_nodes=to_nodes,
        from_inputs=from_inputs,
        load_versions=load_version,
        pipeline_name=pipeline,
    )
# Attach the command groups imported from Kedro to this project's CLI.
cli.add_command(pipeline_group)
cli.add_command(catalog_group)
cli.add_command(jupyter_group)

# Register every command of Kedro's project group directly on `cli`, so each
# one becomes a top-level command of this CLI rather than a nested sub-command.
for command in project_group.commands.values():
    cli.add_command(command)

if __name__ == "__main__":
    # When executed as a script, switch to the directory containing this file
    # before handing over to the entry point imported from kedro.framework.cli.
    os.chdir(str(PROJ_PATH))
    kedro_main()