From f3bb2baa4f0dcd72e3d5a69f1f8f23e7996a7c60 Mon Sep 17 00:00:00 2001 From: Arik Mitschang Date: Fri, 4 Mar 2022 16:52:14 -0500 Subject: [PATCH 1/3] Add configurable progress and session info indicators Configuration options are added to allow override of the spark statement progress indicator and for the html table of session info upon startup. Both come with defaults that implement the same function as was done prior to this change, so there would be no impact in case options are not set. --- CHANGELOG.md | 5 ++- README.md | 37 +++++++++++++++++++ sparkmagic/example_config.json | 17 +++++---- .../sparkmagic/livyclientlib/command.py | 16 +++----- .../sparkmagic/livyclientlib/livysession.py | 18 +++++---- sparkmagic/sparkmagic/utils/configuration.py | 19 +++++++--- sparkmagic/sparkmagic/utils/progress.py | 32 ++++++++++++++++ sparkmagic/sparkmagic/utils/startupinfo.py | 20 ++++++++++ sparkmagic/sparkmagic/utils/utils.py | 32 ++++++++++------ 9 files changed, 151 insertions(+), 45 deletions(-) create mode 100644 sparkmagic/sparkmagic/utils/progress.py create mode 100644 sparkmagic/sparkmagic/utils/startupinfo.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 9ec3c033a..f0a868c98 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ ## NEXT RELEASE +* Added customization options for startup information and progress indicator + ## 0.19.1 ### Bug Fixes @@ -16,7 +18,7 @@ ### Features * Added one internal magic to enable retry of session creation. Thanks @edwardps -* New `%%pretty` magic for pretty printing a dataframe as an HTML table. Thanks @hegary +* New `%%pretty` magic for pretty printing a dataframe as an HTML table. Thanks @hegary * Update Endpoint widget to shield passwords when entering them in the ipywidget. Thanks @J0rg3M3nd3z @jodom961 ## 0.18.0 @@ -112,4 +114,3 @@ * Updated code to work with Livy 0.5 and later, where Python 3 support is not a different kind of session. Thanks to Gianmario Spacagna for contributing some of the code, and G-Research for sponsoring Itamar Turner-Trauring's time. * Fixed `AttributeError` on `None`, thanks to Eric Dill. * `recovering` session status won't cause a blow up anymore. Thanks to G-Research for sponsoring Itamar Turner-Trauring's time. - diff --git a/README.md b/README.md index 41e89eacd..3278169c8 100644 --- a/README.md +++ b/README.md @@ -172,6 +172,43 @@ If you want any registered livy sessions to be cleaned up on exit regardless of } ``` +## Notebook customizations + +There are several ways in which sparkmagic gives feedback via the Jupyter notebook display system. This section +describes customizations available to modify the default behavior for such interactions, where applicable. + +### Startup info table + +When a session starts up, sparkmagic displays a table of session information including application name and the links to +sparkUI and logs. The display may be overridden by specifying a custom class: + +```json +{ + "startup_info_display_class": "module.path.classname" +} +``` + +The class should be a subclass of [StartupInfoDisplay](sparkmagic/sparkmagic/utils/startupinfo.py). It will be passed +the ipython_display, the LivySession object and the current session id. It should implement the `write_msg(msg)` method +to write a line of status output (by default this writes text to the current cell output), and the `display_info()` +method to show session information (by default, this displays an HTML table). + +### Statement progress indicator + +By default, sparkmagic uses the FloatProgress ipython widget to display the progress of a statement in the cell +output. If this is not desired, override the class used to construct the progress indicator: + +```json +{ + "progress_indicator_class": "module.path.classname" +} +``` + +The class should be a subclass of [ProgressIndicator](sparkmagic/sparkmagic/utils/progress.py) and will be passed the +session object and statement_id as arguments to the constructor. The `update(value_in_pct)` method will be called on +progress and the `close()` method will be called when the statement completes. + + ### Conf overrides in code In addition to the conf at `~/.sparkmagic/config.json`, sparkmagic conf can be overridden programmatically in a notebook. diff --git a/sparkmagic/example_config.json b/sparkmagic/example_config.json index f2891a7c6..c19a77a26 100644 --- a/sparkmagic/example_config.json +++ b/sparkmagic/example_config.json @@ -21,20 +21,20 @@ "logging_config": { "version": 1, "formatters": { - "magicsFormatter": { + "magicsFormatter": { "format": "%(asctime)s\t%(levelname)s\t%(message)s", "datefmt": "" } }, "handlers": { - "magicsHandler": { + "magicsHandler": { "class": "hdijupyterutils.filehandler.MagicsFileHandler", "formatter": "magicsFormatter", "home_path": "~/.sparkmagic" } }, "loggers": { - "magicsLogger": { + "magicsLogger": { "handlers": ["magicsHandler"], "level": "DEBUG", "propagate": 0 @@ -43,7 +43,7 @@ }, "authenticators": { "Kerberos": "sparkmagic.auth.kerberos.Kerberos", - "None": "sparkmagic.auth.customauth.Authenticator", + "None": "sparkmagic.auth.customauth.Authenticator", "Basic_Access": "sparkmagic.auth.basic.Basic" }, @@ -63,15 +63,18 @@ "coerce_dataframe": true, "max_results_sql": 2500, "pyspark_dataframe_encoding": "utf-8", - + "heartbeat_refresh_seconds": 30, "livy_server_heartbeat_timeout_seconds": 0, "heartbeat_retry_seconds": 10, "server_extension_default_kernel_name": "pysparkkernel", "custom_headers": {}, - + "retry_policy": "configurable", "retry_seconds_to_sleep_list": [0.2, 0.5, 1, 3, 5], - "configurable_retry_policy_max_retries": 8 + "configurable_retry_policy_max_retries": 8, + + "progress_indicator_class": "sparkmagic.utils.progress.defaultProgressIndicator", + "startup_info_display_class": "sparkmagic.utils.startupinfo.defaultStartupInfoDisplay" } diff --git a/sparkmagic/sparkmagic/livyclientlib/command.py b/sparkmagic/sparkmagic/livyclientlib/command.py index 309a6158a..ce8f9fbb5 100644 --- a/sparkmagic/sparkmagic/livyclientlib/command.py +++ b/sparkmagic/sparkmagic/livyclientlib/command.py @@ -11,6 +11,7 @@ import sparkmagic.utils.configuration as conf from sparkmagic.utils.sparklogger import SparkLog from sparkmagic.utils.sparkevents import SparkEvents +from sparkmagic.utils.utils import get_progress_indicator_class from sparkmagic.utils.constants import MAGICS_LOGGER_NAME, FINAL_STATEMENT_STATUS, \ MIMETYPE_IMAGE_PNG, MIMETYPE_TEXT_HTML, MIMETYPE_TEXT_PLAIN, \ COMMAND_INTERRUPTED_MSG, COMMAND_CANCELLATION_FAILED_MSG @@ -26,6 +27,7 @@ def __init__(self, code, spark_events=None): if spark_events is None: spark_events = SparkEvents() self._spark_events = spark_events + self.progress_indicator_class = get_progress_indicator_class() def __repr__(self): return "Command({}, ...)".format(repr(self.code)) @@ -69,16 +71,8 @@ def execute(self, session): def _get_statement_output(self, session, statement_id): retries = 1 - progress = FloatProgress(value=0.0, - min=0, - max=1.0, - step=0.01, - description='Progress:', - bar_style='info', - orientation='horizontal', - layout=Layout(width='50%', height='25px') - ) - session.ipython_display.display(progress) + + progress = self.progress_indicator_class(session, statement_id) while True: statement = session.http_client.get_statement(session.id, statement_id) @@ -87,7 +81,7 @@ def _get_statement_output(self, session, statement_id): self.logger.debug(u"Status of statement {} is {}.".format(statement_id, status)) if status not in FINAL_STATEMENT_STATUS: - progress.value = statement.get('progress', 0.0) + progress.update(statement.get('progress', 0.0)) session.sleep(retries) retries += 1 else: diff --git a/sparkmagic/sparkmagic/livyclientlib/livysession.py b/sparkmagic/sparkmagic/livyclientlib/livysession.py index 81512d856..1e0659270 100644 --- a/sparkmagic/sparkmagic/livyclientlib/livysession.py +++ b/sparkmagic/sparkmagic/livyclientlib/livysession.py @@ -8,7 +8,7 @@ import sparkmagic.utils.constants as constants from sparkmagic.utils.sparklogger import SparkLog from sparkmagic.utils.sparkevents import SparkEvents -from sparkmagic.utils.utils import get_sessions_info_html +from sparkmagic.utils.utils import get_startup_info_display_class from .configurableretrypolicy import ConfigurableRetryPolicy from .command import Command from .exceptions import LivyClientTimeoutException, \ @@ -118,6 +118,8 @@ def __init__(self, http_client, properties, ipython_display, self.id = session_id self.session_info = u"" + self.startup_info_display = get_startup_info_display_class() + self._heartbeat_thread = None if session_id == -1: self.status = constants.NOT_STARTED_SESSION_STATUS @@ -139,7 +141,8 @@ def start(self): self.id = r[u"id"] self.status = str(r[u"state"]) - self.ipython_display.writeln(u"Starting Spark application") + startup_info = self.startup_info_display(self.ipython_display, [self], self.id) + startup_info.write_msg(u"Starting Spark application") # Start heartbeat thread to keep Livy interactive session alive. self._start_heartbeat_thread() @@ -151,24 +154,23 @@ def start(self): raise LivyClientTimeoutException(u"Session {} did not start up in {} seconds." .format(self.id, conf.livy_session_startup_timeout_seconds())) - html = get_sessions_info_html([self], self.id) - self.ipython_display.html(html) + startup_info.display_info() command = Command("spark") (success, out, mimetype) = command.execute(self) if success: - self.ipython_display.writeln(u"SparkSession available as 'spark'.") + startup_info.write_msg(u"SparkSession available as 'spark'.") self.sql_context_variable_name = "spark" else: command = Command("sqlContext") (success, out, mimetype) = command.execute(self) if success: - self.ipython_display.writeln(u"SparkContext available as 'sc'.") + startup_info.write_msg(u"SparkContext available as 'sc'.") if ("hive" in out.lower()): - self.ipython_display.writeln(u"HiveContext available as 'sqlContext'.") + startup_info.write_msg(u"HiveContext available as 'sqlContext'.") else: - self.ipython_display.writeln(u"SqlContext available as 'sqlContext'.") + startup_info.write_msg(u"SqlContext available as 'sqlContext'.") self.sql_context_variable_name = "sqlContext" else: raise SqlContextNotFoundException(u"Neither SparkSession nor HiveContext/SqlContext is available.") diff --git a/sparkmagic/sparkmagic/utils/configuration.py b/sparkmagic/sparkmagic/utils/configuration.py index 515492b43..8498bb351 100644 --- a/sparkmagic/sparkmagic/utils/configuration.py +++ b/sparkmagic/sparkmagic/utils/configuration.py @@ -22,7 +22,7 @@ d = {} path = join_paths(HOME_PATH, CONFIG_FILE) - + def override(config, value): _override(d, path, config, value) @@ -60,8 +60,8 @@ def authenticators(): u"None": u"sparkmagic.auth.customauth.Authenticator", u"Basic_Access": u"sparkmagic.auth.basic.Basic" } - - + + # Configs def get_session_properties(language): @@ -78,8 +78,8 @@ def session_configs(): @_with_override def kernel_python_credentials(): return {u'username': u'', u'base64_password': u'', u'url': u'http://localhost:8998', u'auth': NO_AUTH} - - + + def base64_kernel_python_credentials(): return _credentials_override(kernel_python_credentials) @@ -99,7 +99,7 @@ def kernel_scala_credentials(): return {u'username': u'', u'base64_password': u'', u'url': u'http://localhost:8998', u'auth': NO_AUTH} -def base64_kernel_scala_credentials(): +def base64_kernel_scala_credentials(): return _credentials_override(kernel_scala_credentials) @_with_override @@ -271,6 +271,13 @@ def kerberos_auth_configuration(): "mutual_authentication": REQUIRED } +@_with_override +def progress_indicator_class(): + return 'sparkmagic.utils.progress.defaultProgressIndicator' + +@_with_override +def startup_info_display_class(): + return 'sparkmagic.utils.startupinfo.defaultStartupInfoDisplay' def _credentials_override(f): """Provides special handling for credentials. It still calls _override(). diff --git a/sparkmagic/sparkmagic/utils/progress.py b/sparkmagic/sparkmagic/utils/progress.py new file mode 100644 index 000000000..a4ca3d2d5 --- /dev/null +++ b/sparkmagic/sparkmagic/utils/progress.py @@ -0,0 +1,32 @@ +from ipywidgets.widgets import FloatProgress, Layout + +class ProgressIndicator: + def __init__(self, session, statement_id): + pass + + def update(self, value): + pass + + def close(self): + pass + +class defaultProgressIndicator(ProgressIndicator): + def __init__(self, session, statement_id): + self.session = session + self.statement_id = statement_id + self.progress = FloatProgress(value=0.0, + min=0, + max=1.0, + step=0.01, + description='Progress:', + bar_style='info', + orientation='horizontal', + layout=Layout(width='50%', height='25px') + ) + self.session.ipython_display.display(self.progress) + + def update(self, value): + self.progress.value = value + + def close(self): + self.progress.close() diff --git a/sparkmagic/sparkmagic/utils/startupinfo.py b/sparkmagic/sparkmagic/utils/startupinfo.py new file mode 100644 index 000000000..2cf8de82a --- /dev/null +++ b/sparkmagic/sparkmagic/utils/startupinfo.py @@ -0,0 +1,20 @@ +from sparkmagic.utils.utils import get_sessions_info_html + +class StartupInfoDisplay: + def __init__(self, ipython_display, sessions_info, current_session_id): + self.ipython_display = ipython_display + self.sessions_info = sessions_info + self.current_session_id = current_session_id + + def write_msg(self, msg): + pass + + def display_info(self): + pass + +class defaultStartupInfoDisplay(StartupInfoDisplay): + def write_msg(self, msg): + self.ipython_display.writeln(msg) + + def display_info(self): + self.ipython_display.html(get_sessions_info_html(self.sessions_info, self.current_session_id)) diff --git a/sparkmagic/sparkmagic/utils/utils.py b/sparkmagic/sparkmagic/utils/utils.py index 5af205182..454f5c74d 100644 --- a/sparkmagic/sparkmagic/utils/utils.py +++ b/sparkmagic/sparkmagic/utils/utils.py @@ -27,12 +27,12 @@ def parse_argstring_or_throw(magic_func, argstring, parse_argstring=parse_argstr return parse_argstring(magic_func, argstring) except UsageError as e: raise BadUserDataException(str(e)) - - + + def coerce_pandas_df_to_numeric_datetime(df): for column_name in df.columns: coerced = False - + if df[column_name].isnull().all(): continue @@ -64,7 +64,7 @@ def records_to_dataframe(records_text, kind, coerce=None): df = pd.DataFrame(data_array) - if len(data_array) > 0: + if len(data_array) > 0: # This will assign the columns in the right order. If we simply did # df = pd.DataFrame(data_array, columns=data_array[0].keys()) # in the code defining df, above, we could get an issue where the first element @@ -75,7 +75,7 @@ def records_to_dataframe(records_text, kind, coerce=None): if len(data.keys()) == len(df.columns): df = df[list(data.keys())] break - + if coerce is None: coerce = conf.coerce_dataframe() if coerce: @@ -94,6 +94,12 @@ def get_sessions_info_html(info_sessions, current_session_id): return html +def load_class_from_string(full_class): + module, class_name = full_class.rsplit('.', 1) + class_module = importlib.import_module(module) + class_class = getattr(class_module, class_name) + return class_class + def initialize_auth(args): """Creates an authenticatior class instance for the given auth type @@ -103,10 +109,10 @@ def initialize_auth(args): Returns: An instance of a valid Authenticator or None if args.auth is 'None' - + Raises: sparkmagic.livyclientlib.BadUserConfigurationException: if args.auth is not a valid - authenticator class. + authenticator class. """ if args.auth is None: auth = conf.get_auth_value(args.user, args.password) @@ -114,15 +120,19 @@ def initialize_auth(args): auth = args.auth if auth == constants.NO_AUTH: return None - else: + else: full_class = conf.authenticators().get(auth) if full_class is None: raise BadUserConfigurationException(u"Auth '{}' not supported".format(auth)) - module, class_name = (full_class).rsplit('.', 1) - events_handler_module = importlib.import_module(module) - auth_class = getattr(events_handler_module, class_name) + auth_class = load_class_from_string(full_class) return auth_class(args) +def get_progress_indicator_class(): + return load_class_from_string(conf.progress_indicator_class()) + +def get_startup_info_display_class(): + return load_class_from_string(conf.startup_info_display_class()) + class Namespace: """Namespace to initialize authenticator class with""" def __init__(self, **kwargs): From 6004297168105078b5eb624334c37b709aaca32d Mon Sep 17 00:00:00 2001 From: Arik Mitschang Date: Tue, 3 May 2022 13:42:00 -0400 Subject: [PATCH 2/3] Updates from PR --- README.md | 9 +++++---- sparkmagic/sparkmagic/livyclientlib/command.py | 1 + .../sparkmagic/livyclientlib/livysession.py | 2 +- .../sparkmagic/tests/test_configuration.py | 16 +++++++++++++--- sparkmagic/sparkmagic/utils/configuration.py | 4 ++-- sparkmagic/sparkmagic/utils/progress.py | 7 ++++++- sparkmagic/sparkmagic/utils/startupinfo.py | 10 +++++----- 7 files changed, 33 insertions(+), 16 deletions(-) diff --git a/README.md b/README.md index 3278169c8..5744b4c8b 100644 --- a/README.md +++ b/README.md @@ -190,7 +190,7 @@ sparkUI and logs. The display may be overridden by specifying a custom class: The class should be a subclass of [StartupInfoDisplay](sparkmagic/sparkmagic/utils/startupinfo.py). It will be passed the ipython_display, the LivySession object and the current session id. It should implement the `write_msg(msg)` method -to write a line of status output (by default this writes text to the current cell output), and the `display_info()` +to write a line of status output (by default this writes text to the current cell output), and the `display()` method to show session information (by default, this displays an HTML table). ### Statement progress indicator @@ -205,9 +205,10 @@ output. If this is not desired, override the class used to construct the progres ``` The class should be a subclass of [ProgressIndicator](sparkmagic/sparkmagic/utils/progress.py) and will be passed the -session object and statement_id as arguments to the constructor. The `update(value_in_pct)` method will be called on -progress and the `close()` method will be called when the statement completes. - +session object and statement_id as arguments to the constructor. The `display()` method will be called after +initialization and should arrange to display the widget via the `ipython_display` attribute of the session object. The +`update(value_in_pct)` method will be called on progress and the `close()` method will be called when the statement +completes. By default this uses a horizontal FloatProgress widget. ### Conf overrides in code diff --git a/sparkmagic/sparkmagic/livyclientlib/command.py b/sparkmagic/sparkmagic/livyclientlib/command.py index ce8f9fbb5..20679dc2b 100644 --- a/sparkmagic/sparkmagic/livyclientlib/command.py +++ b/sparkmagic/sparkmagic/livyclientlib/command.py @@ -73,6 +73,7 @@ def _get_statement_output(self, session, statement_id): retries = 1 progress = self.progress_indicator_class(session, statement_id) + progress.display() while True: statement = session.http_client.get_statement(session.id, statement_id) diff --git a/sparkmagic/sparkmagic/livyclientlib/livysession.py b/sparkmagic/sparkmagic/livyclientlib/livysession.py index 1e0659270..208eb73f2 100644 --- a/sparkmagic/sparkmagic/livyclientlib/livysession.py +++ b/sparkmagic/sparkmagic/livyclientlib/livysession.py @@ -154,7 +154,7 @@ def start(self): raise LivyClientTimeoutException(u"Session {} did not start up in {} seconds." .format(self.id, conf.livy_session_startup_timeout_seconds())) - startup_info.display_info() + startup_info.display() command = Command("spark") (success, out, mimetype) = command.execute(self) diff --git a/sparkmagic/sparkmagic/tests/test_configuration.py b/sparkmagic/sparkmagic/tests/test_configuration.py index 52ca0e2f5..6a9a1febd 100644 --- a/sparkmagic/sparkmagic/tests/test_configuration.py +++ b/sparkmagic/sparkmagic/tests/test_configuration.py @@ -1,15 +1,17 @@ from mock import MagicMock -from nose.tools import assert_equals, assert_not_equals, raises, with_setup +from nose.tools import assert_equals, assert_not_equals, assert_true, raises, with_setup import json import sparkmagic.utils.configuration as conf from sparkmagic.livyclientlib.exceptions import BadUserConfigurationException from sparkmagic.utils.constants import AUTH_BASIC, NO_AUTH - +from sparkmagic.utils.utils import get_progress_indicator_class, get_startup_info_display_class +from sparkmagic.utils.progress import ProgressIndicator +from sparkmagic.utils.startupinfo import StartupInfoDisplay def _setup(): conf.override_all({}) - + @with_setup(_setup) def test_configuration_override_base64_password(): @@ -78,3 +80,11 @@ def test_share_config_between_pyspark_and_pyspark3(): kpc = { 'username': 'U', 'password': 'P', 'base64_password': 'cGFzc3dvcmQ=', 'url': 'L', 'auth': AUTH_BASIC } overrides = { conf.kernel_python_credentials.__name__: kpc } assert_equals(conf.base64_kernel_python3_credentials(), conf.base64_kernel_python_credentials()) + +@with_setup(_setup) +def test_default_progress_class_valid(): + assert_true(issubclass(get_progress_indicator_class(), ProgressIndicator)) + +@with_setup(_setup) +def test_default_startup_class_valid(): + assert_true(issubclass(get_startup_info_display_class(), StartupInfoDisplay)) diff --git a/sparkmagic/sparkmagic/utils/configuration.py b/sparkmagic/sparkmagic/utils/configuration.py index 8498bb351..040053cdb 100644 --- a/sparkmagic/sparkmagic/utils/configuration.py +++ b/sparkmagic/sparkmagic/utils/configuration.py @@ -273,11 +273,11 @@ def kerberos_auth_configuration(): @_with_override def progress_indicator_class(): - return 'sparkmagic.utils.progress.defaultProgressIndicator' + return 'sparkmagic.utils.progress.HorizontalFloatProgressWidgetIndicator' @_with_override def startup_info_display_class(): - return 'sparkmagic.utils.startupinfo.defaultStartupInfoDisplay' + return 'sparkmagic.utils.startupinfo.HTMLTableStartupInfoDisplay' def _credentials_override(f): """Provides special handling for credentials. It still calls _override(). diff --git a/sparkmagic/sparkmagic/utils/progress.py b/sparkmagic/sparkmagic/utils/progress.py index a4ca3d2d5..d54671066 100644 --- a/sparkmagic/sparkmagic/utils/progress.py +++ b/sparkmagic/sparkmagic/utils/progress.py @@ -4,13 +4,16 @@ class ProgressIndicator: def __init__(self, session, statement_id): pass + def display(self): + pass + def update(self, value): pass def close(self): pass -class defaultProgressIndicator(ProgressIndicator): +class HorizontalFloatProgressWidgetIndicator(ProgressIndicator): def __init__(self, session, statement_id): self.session = session self.statement_id = statement_id @@ -23,6 +26,8 @@ def __init__(self, session, statement_id): orientation='horizontal', layout=Layout(width='50%', height='25px') ) + + def display(self): self.session.ipython_display.display(self.progress) def update(self, value): diff --git a/sparkmagic/sparkmagic/utils/startupinfo.py b/sparkmagic/sparkmagic/utils/startupinfo.py index 2cf8de82a..dfcdcc3cd 100644 --- a/sparkmagic/sparkmagic/utils/startupinfo.py +++ b/sparkmagic/sparkmagic/utils/startupinfo.py @@ -9,12 +9,12 @@ def __init__(self, ipython_display, sessions_info, current_session_id): def write_msg(self, msg): pass - def display_info(self): + def display(self): pass -class defaultStartupInfoDisplay(StartupInfoDisplay): +class HTMLTableStartupInfoDisplay(StartupInfoDisplay): + def display(self): + self.ipython_display.html(get_sessions_info_html(self.sessions_info, self.current_session_id)) + def write_msg(self, msg): self.ipython_display.writeln(msg) - - def display_info(self): - self.ipython_display.html(get_sessions_info_html(self.sessions_info, self.current_session_id)) From acfdb850825f49982330e7a92e21a379800337ea Mon Sep 17 00:00:00 2001 From: Arik Mitschang Date: Tue, 3 May 2022 16:04:42 -0400 Subject: [PATCH 3/3] Merge latest from upstream --- .../sparkmagic/livyclientlib/command.py | 21 ++++++++++++----- .../sparkmagic/livyclientlib/livysession.py | 16 +++++++------ .../sparkmagic/tests/test_configuration.py | 23 +++++++++++++++---- sparkmagic/sparkmagic/utils/configuration.py | 14 ++++++++--- sparkmagic/sparkmagic/utils/progress.py | 19 ++++++++------- sparkmagic/sparkmagic/utils/startupinfo.py | 6 ++++- sparkmagic/sparkmagic/utils/utils.py | 9 ++++++-- 7 files changed, 77 insertions(+), 31 deletions(-) diff --git a/sparkmagic/sparkmagic/livyclientlib/command.py b/sparkmagic/sparkmagic/livyclientlib/command.py index 20280a06a..66c734abf 100644 --- a/sparkmagic/sparkmagic/livyclientlib/command.py +++ b/sparkmagic/sparkmagic/livyclientlib/command.py @@ -12,11 +12,20 @@ from sparkmagic.utils.sparklogger import SparkLog from sparkmagic.utils.sparkevents import SparkEvents from sparkmagic.utils.utils import get_progress_indicator_class -from sparkmagic.utils.constants import MAGICS_LOGGER_NAME, FINAL_STATEMENT_STATUS, \ - MIMETYPE_IMAGE_PNG, MIMETYPE_TEXT_HTML, MIMETYPE_TEXT_PLAIN, \ - COMMAND_INTERRUPTED_MSG, COMMAND_CANCELLATION_FAILED_MSG -from .exceptions import LivyUnexpectedStatusException, SparkStatementCancelledException, \ - SparkStatementCancellationFailedException +from sparkmagic.utils.constants import ( + MAGICS_LOGGER_NAME, + FINAL_STATEMENT_STATUS, + MIMETYPE_IMAGE_PNG, + MIMETYPE_TEXT_HTML, + MIMETYPE_TEXT_PLAIN, + COMMAND_INTERRUPTED_MSG, + COMMAND_CANCELLATION_FAILED_MSG, +) +from .exceptions import ( + LivyUnexpectedStatusException, + SparkStatementCancelledException, + SparkStatementCancellationFailedException, +) class Command(ObjectWithGuid): @@ -112,7 +121,7 @@ def _get_statement_output(self, session, statement_id): ) if status not in FINAL_STATEMENT_STATUS: - progress.update(statement.get('progress', 0.0)) + progress.update(statement.get("progress", 0.0)) session.sleep(retries) retries += 1 else: diff --git a/sparkmagic/sparkmagic/livyclientlib/livysession.py b/sparkmagic/sparkmagic/livyclientlib/livysession.py index 112e9e3ee..cad47b777 100644 --- a/sparkmagic/sparkmagic/livyclientlib/livysession.py +++ b/sparkmagic/sparkmagic/livyclientlib/livysession.py @@ -166,8 +166,10 @@ def start(self): self.id = r["id"] self.status = str(r["state"]) - startup_info = self.startup_info_display(self.ipython_display, [self], self.id) - startup_info.write_msg(u"Starting Spark application") + startup_info = self.startup_info_display( + self.ipython_display, [self], self.id + ) + startup_info.write_msg("Starting Spark application") # Start heartbeat thread to keep Livy interactive session alive. self._start_heartbeat_thread() @@ -188,17 +190,17 @@ def start(self): (success, out, mimetype) = command.execute(self) if success: - startup_info.write_msg(u"SparkSession available as 'spark'.") + startup_info.write_msg("SparkSession available as 'spark'.") self.sql_context_variable_name = "spark" else: command = Command("sqlContext") (success, out, mimetype) = command.execute(self) if success: - startup_info.write_msg(u"SparkContext available as 'sc'.") - if ("hive" in out.lower()): - startup_info.write_msg(u"HiveContext available as 'sqlContext'.") + startup_info.write_msg("SparkContext available as 'sc'.") + if "hive" in out.lower(): + startup_info.write_msg("HiveContext available as 'sqlContext'.") else: - startup_info.write_msg(u"SqlContext available as 'sqlContext'.") + startup_info.write_msg("SqlContext available as 'sqlContext'.") self.sql_context_variable_name = "sqlContext" else: raise SqlContextNotFoundException( diff --git a/sparkmagic/sparkmagic/tests/test_configuration.py b/sparkmagic/sparkmagic/tests/test_configuration.py index e03f5e288..b43df151d 100644 --- a/sparkmagic/sparkmagic/tests/test_configuration.py +++ b/sparkmagic/sparkmagic/tests/test_configuration.py @@ -5,10 +5,14 @@ import sparkmagic.utils.configuration as conf from sparkmagic.livyclientlib.exceptions import BadUserConfigurationException from sparkmagic.utils.constants import AUTH_BASIC, NO_AUTH -from sparkmagic.utils.utils import get_progress_indicator_class, get_startup_info_display_class +from sparkmagic.utils.utils import ( + get_progress_indicator_class, + get_startup_info_display_class, +) from sparkmagic.utils.progress import ProgressIndicator from sparkmagic.utils.startupinfo import StartupInfoDisplay + def _setup(): conf.override_all({}) @@ -116,14 +120,25 @@ def test_configuration_raise_error_for_bad_base64_password(): @with_setup(_setup) def test_share_config_between_pyspark_and_pyspark3(): - kpc = { 'username': 'U', 'password': 'P', 'base64_password': 'cGFzc3dvcmQ=', 'url': 'L', 'auth': AUTH_BASIC } - overrides = { conf.kernel_python_credentials.__name__: kpc } - assert_equals(conf.base64_kernel_python3_credentials(), conf.base64_kernel_python_credentials()) + kpc = { + "username": "U", + "password": "P", + "base64_password": "cGFzc3dvcmQ=", + "url": "L", + "auth": AUTH_BASIC, + } + overrides = {conf.kernel_python_credentials.__name__: kpc} + assert_equals( + conf.base64_kernel_python3_credentials(), + conf.base64_kernel_python_credentials(), + ) + @with_setup(_setup) def test_default_progress_class_valid(): assert_true(issubclass(get_progress_indicator_class(), ProgressIndicator)) + @with_setup(_setup) def test_default_startup_class_valid(): assert_true(issubclass(get_startup_info_display_class(), StartupInfoDisplay)) diff --git a/sparkmagic/sparkmagic/utils/configuration.py b/sparkmagic/sparkmagic/utils/configuration.py index 8e55742f7..c59426d2e 100644 --- a/sparkmagic/sparkmagic/utils/configuration.py +++ b/sparkmagic/sparkmagic/utils/configuration.py @@ -96,7 +96,12 @@ def session_configs(): @_with_override def kernel_python_credentials(): - return {u'username': u'', u'base64_password': u'', u'url': u'http://localhost:8998', u'auth': NO_AUTH} + return { + "username": "", + "base64_password": "", + "url": "http://localhost:8998", + "auth": NO_AUTH, + } def base64_kernel_python_credentials(): @@ -299,13 +304,16 @@ def cleanup_all_sessions_on_exit(): def kerberos_auth_configuration(): return {"mutual_authentication": REQUIRED} + @_with_override def progress_indicator_class(): - return 'sparkmagic.utils.progress.HorizontalFloatProgressWidgetIndicator' + return "sparkmagic.utils.progress.HorizontalFloatProgressWidgetIndicator" + @_with_override def startup_info_display_class(): - return 'sparkmagic.utils.startupinfo.HTMLTableStartupInfoDisplay' + return "sparkmagic.utils.startupinfo.HTMLTableStartupInfoDisplay" + def _credentials_override(f): """Provides special handling for credentials. It still calls _override(). diff --git a/sparkmagic/sparkmagic/utils/progress.py b/sparkmagic/sparkmagic/utils/progress.py index d54671066..22f442a27 100644 --- a/sparkmagic/sparkmagic/utils/progress.py +++ b/sparkmagic/sparkmagic/utils/progress.py @@ -1,5 +1,6 @@ from ipywidgets.widgets import FloatProgress, Layout + class ProgressIndicator: def __init__(self, session, statement_id): pass @@ -13,18 +14,20 @@ def update(self, value): def close(self): pass + class HorizontalFloatProgressWidgetIndicator(ProgressIndicator): def __init__(self, session, statement_id): self.session = session self.statement_id = statement_id - self.progress = FloatProgress(value=0.0, - min=0, - max=1.0, - step=0.01, - description='Progress:', - bar_style='info', - orientation='horizontal', - layout=Layout(width='50%', height='25px') + self.progress = FloatProgress( + value=0.0, + min=0, + max=1.0, + step=0.01, + description="Progress:", + bar_style="info", + orientation="horizontal", + layout=Layout(width="50%", height="25px"), ) def display(self): diff --git a/sparkmagic/sparkmagic/utils/startupinfo.py b/sparkmagic/sparkmagic/utils/startupinfo.py index dfcdcc3cd..1606c1750 100644 --- a/sparkmagic/sparkmagic/utils/startupinfo.py +++ b/sparkmagic/sparkmagic/utils/startupinfo.py @@ -1,5 +1,6 @@ from sparkmagic.utils.utils import get_sessions_info_html + class StartupInfoDisplay: def __init__(self, ipython_display, sessions_info, current_session_id): self.ipython_display = ipython_display @@ -12,9 +13,12 @@ def write_msg(self, msg): def display(self): pass + class HTMLTableStartupInfoDisplay(StartupInfoDisplay): def display(self): - self.ipython_display.html(get_sessions_info_html(self.sessions_info, self.current_session_id)) + self.ipython_display.html( + get_sessions_info_html(self.sessions_info, self.current_session_id) + ) def write_msg(self, msg): self.ipython_display.writeln(msg) diff --git a/sparkmagic/sparkmagic/utils/utils.py b/sparkmagic/sparkmagic/utils/utils.py index 7b7895bb0..94a338cf7 100644 --- a/sparkmagic/sparkmagic/utils/utils.py +++ b/sparkmagic/sparkmagic/utils/utils.py @@ -105,12 +105,14 @@ def get_sessions_info_html(info_sessions, current_session_id): return html + def load_class_from_string(full_class): - module, class_name = full_class.rsplit('.', 1) + module, class_name = full_class.rsplit(".", 1) class_module = importlib.import_module(module) class_class = getattr(class_module, class_name) return class_class + def initialize_auth(args): """Creates an authenticatior class instance for the given auth type @@ -134,16 +136,19 @@ def initialize_auth(args): else: full_class = conf.authenticators().get(auth) if full_class is None: - raise BadUserConfigurationException(u"Auth '{}' not supported".format(auth)) + raise BadUserConfigurationException("Auth '{}' not supported".format(auth)) auth_class = load_class_from_string(full_class) return auth_class(args) + def get_progress_indicator_class(): return load_class_from_string(conf.progress_indicator_class()) + def get_startup_info_display_class(): return load_class_from_string(conf.startup_info_display_class()) + class Namespace: """Namespace to initialize authenticator class with"""