Dashboard: decouple the curses UI from the experiment process.

The runner now spawns write_dashboard(), which drains the event queue into
get_dashboard_file(name) (".dashboard.csv" inside the save directory) as one
comma-separated line per event.  The curses UI (start_dashboard(), also
reachable through scripts/smdash.py) re-reads that file every frame and only
processes the lines it has not seen yet.  TimeEstimator is replaced by
SimpleTimeEstimator, which extrapolates from elapsed time per completed
iteration.

Review changes folded into this patch:
  * run(): strip the trailing newline before splitting an event line, and
    write_dashboard(): spell REGISTRATION_COMPLETE correctly.  Previously the
    key of that single-field line was "REGISRTATION_COMPLETE\n", never matched,
    and fell through to the "Dashboard action not understood" print.
  * SimpleTimeEstimator.compute_expectation(): return None while no iteration
    has completed instead of dividing by zero; draw_header_widget() already
    checks `t is not None`.
  * draw_header_widget(): do not divide by total_iterations while it is 0.
  * run(): `isinstance(key, LogEvent)` could never be true (key is a str);
    compare against 'LOG', the tag used by the commented-out writer line.
  * setup.py: register scripts/smdash.py on the existing setup() call rather
    than calling setup() a second time.

NOTE(review): this patch was re-flowed from a copy whose whitespace had been
collapsed.  Indentation and the position of blank context lines are inferred;
if a hunk is rejected, try `git apply --ignore-whitespace` or regenerate the
patch from the working tree.  The blob ids on the `index` lines predate the
review changes.

diff --git a/Pipfile b/Pipfile
index b607ca9..ab17b9c 100644
--- a/Pipfile
+++ b/Pipfile
@@ -11,9 +11,7 @@ numpy = "*"
 tqdm = "*"
 joblib = "*"
 humanhash3 = "*"
-dill = "*"
+dill = ">=0.3.6"
 scikit-learn="*"
 python-dateutil = "*"
 
-[requires]
-python_version = "3.6"
diff --git a/scripts/smdash.py b/scripts/smdash.py
new file mode 100644
index 0000000..cd2d77a
--- /dev/null
+++ b/scripts/smdash.py
@@ -0,0 +1,8 @@
+import sys
+
+from smallab.dashboard.dashboard import start_dashboard
+
+
+if __name__ == "__main__":
+    experiment_name = sys.argv[1]
+    start_dashboard(experiment_name)
\ No newline at end of file
diff --git a/setup.py b/setup.py
index f235279..1291105 100644
--- a/setup.py
+++ b/setup.py
@@ -19,7 +19,7 @@ def read(fname):
 
 setup(
     name="smallab",
-    version="2.1.0",
+    version="2.2.0",
     url='https://github.com/octopuscabbage/smallab',
     packages=find_packages(),
     install_requires=required,
@@ -29,3 +29,4 @@ def read(fname):
     long_description=read('README.md'),
     long_description_content_type='text/markdown',
+    scripts=['scripts/smdash.py'],
 )
diff --git a/smallab/dashboard/dashboard.py b/smallab/dashboard/dashboard.py
index cff9cd0..0f3131b 100644
--- a/smallab/dashboard/dashboard.py
+++ b/smallab/dashboard/dashboard.py
@@ -6,6 +6,7 @@
 import json
 import logging
 import math
+import os
 import time
 from os.path import join, exists
 from pathlib import Path
@@ -17,7 +18,7 @@
 
 from smallab.dashboard.dashboard_events import (BeginEvent, CompleteEvent, ProgressEvent, LogEvent,
                                                 StartExperimentEvent, RegistrationCompleteEvent, RegisterEvent, FailedEvent)
-from smallab.file_locations import get_save_directory
+from smallab.file_locations import get_dashboard_file, get_specification_save_dir, get_save_directory
 
 
 # This is a globally accessible queue which stores the events for the dashboard
@@ -373,14 +374,26 @@ def draw_header_widget(row, stdscr, experiment_name, width, complete, active, re
         completed_failed_string += " !!! You are logging too fast !!!"
     stdscr.addstr(row, 0, completed_failed_string)
 
-    t = timeestimator.compute_time_stats(specification_progress, active, registered)
+    t = timeestimator.compute_expectation()
     if t is not None:
-        expected, lower, higher = t
-        completion_stats_string = "Completion - Expected: {expectedstr} - Upper: {upperstr} - Lower: {lowerstr}".format(
-            expectedstr=display_time(expected), upperstr=display_time(higher), lowerstr=display_time(lower))
+        expected = t
+        completion_stats_string = "Completion - Expected: {expectedstr}".format(
+            expectedstr=display_time(expected) )
         stdscr.addstr(row, width - len(completion_stats_string), completion_stats_string)
     row += 1
-    stdscr.addstr(row, 0, "-" * width)
+    stdscr.addstr(row, 0, "=" * width)
+    row += 1
+    completed_iterations = timeestimator.completed_iterations
+    total_iterations = timeestimator.total_number_of_iterations
+    progress_str = f"Iteration Progress: {completed_iterations} / {total_iterations} "
+    stdscr.addstr(row, 0, progress_str)
+    bar_width = width - len(progress_str)
+    fraction_done = completed_iterations / total_iterations if total_iterations else 0  # total is 0 until the first PROGRESS event
+    num_astericks = math.floor(fraction_done * bar_width)
+    stdscr.addstr(row, len(progress_str), "*" * num_astericks + "-" * (bar_width - num_astericks))
+    row += 1
+    stdscr.addstr(row, 0, "=" * width)
+    row += 1
     return row
 
 
@@ -390,7 +403,7 @@ def draw_specifications_widget(row, stdscr, active, registered, width, specifica
     start_row = row
     # Decide to draw in single or double column
     second_column_begins = math.floor(width / 2)
-    max_height = math.floor(height / 2)
+    max_height = math.floor(height - 4)
     use_double_column_layout = width >= 40 and len(active) + len(registered) > max_height - row
     on_second_column = False
     try:
@@ -433,24 +446,24 @@ def draw_specifications_widget(row, stdscr, active, registered, width, specifica
             stdscr.addstr(row, width - len(status_string), status_string)
             specification_readout_end_index = width - len(status_string)
 
-            specification = str(specification_id_to_specification[active_specification])
-            specification_string_start_index = specification_readout_index % len(specification)
-            max_allowed_length = specification_readout_end_index - specification_readout_start_index - 1
-            if len(specification) <= max_allowed_length:
-                stdscr.addstr(row, specification_readout_start_index,
-                              specification)
-            else:
-                overflow = specification_string_start_index + max_allowed_length - len(specification) - 1
-                if overflow > 0:
-                    stdscr.addstr(row, specification_readout_start_index,
-                                  specification[
-                                  specification_string_start_index:specification_string_start_index + max_allowed_length] + " " + specification[
-                                                                                                                                    :overflow])
+            # specification = str(specification_id_to_specification[active_specification])
+            # specification_string_start_index = specification_readout_index % len(specification)
+            # max_allowed_length = specification_readout_end_index - specification_readout_start_index - 1
+            # if len(specification) <= max_allowed_length:
+            #     stdscr.addstr(row, specification_readout_start_index,
+            #                   specification)
+            # else:
+            #     overflow = specification_string_start_index + max_allowed_length - len(specification) - 1
+            #     if overflow > 0:
+            #         stdscr.addstr(row, specification_readout_start_index,
+            #                       specification[
+            #                       specification_string_start_index:specification_string_start_index + max_allowed_length] + " " + specification[
+            #                                                                                                                         :overflow])
 
-                else:
-                    stdscr.addstr(row, specification_readout_start_index,
-                                  specification[
-                                  specification_string_start_index:specification_string_start_index + max_allowed_length])
+            #     else:
+            #         stdscr.addstr(row, specification_readout_start_index,
+            #                       specification[
+            #                       specification_string_start_index:specification_string_start_index + max_allowed_length])
             row += 1
             if row >= max_height:
                 if use_double_column_layout and not on_second_column:
@@ -480,61 +493,101 @@ def draw_log_widget(row, stdscr, width, height, log_spool):
         row += 1
     return row
 
+class SimpleTimeEstimator():
+    def __init__(self):
+        self.total_number_of_iterations = 0
+        self.completed_iterations = 0
+        self.start_time = 0
+        self.specification_progress = dict()
+
+    def record_progress(self,specification,progress,maximum):
+        if specification not in self.specification_progress:
+            self.total_number_of_iterations += maximum
+            self.specification_progress[specification] = 0
+        previous_progress = self.specification_progress[specification]
+        self.completed_iterations += progress - previous_progress
+        self.specification_progress[specification] = progress
+
+
+    def record_start_time(self,start_time):
+        self.start_time = start_time
+
+    def compute_expectation(self):
+        if self.completed_iterations <= 0:
+            return None  # nothing completed yet, so there is no rate to extrapolate from
+        time_elapsed = time.time() - self.start_time
+        seconds_per_iteration = time_elapsed / self.completed_iterations
+
+        remaining_iterations = self.total_number_of_iterations - self.completed_iterations
+        return remaining_iterations * seconds_per_iteration
 
-def run(stdscr, eventQueue, name):
+
+
+
+def run(stdscr, name):
     specification_ids_to_specification = dict()
     max_events_per_frame = 1000
     max_log_spool_events = 10**3
-    timeestimator = TimeEstimator(name)
+    timeestimator = SimpleTimeEstimator()
     log_spool = []
     active = []
     complete = []
-    experiment_name = ""
+    experiment_name = name
     specification_progress = dict()
     registered = []
     failed = []
     start_time = time.time()
     specification_readout_index = 0
+    i = 0
     while True:
         if time.time() - start_time > 1:
             specification_readout_index += 1
         # Drain Queue:
         try:
-            i = 0
-            while not eventQueue.empty() and i < max_events_per_frame:
-                i += 1
-                event = eventQueue.get()
-                if isinstance(event, BeginEvent):
-                    registered.remove(event.specification_id)
-                    active.append(event.specification_id)
-                    timeestimator.record_start(event.specification_id)
-
-                elif isinstance(event, CompleteEvent):
-                    active.remove(event.specification_id)
-                    complete.append(event.specification_id)
-                    timeestimator.record_completion(event.specification_id)
-                elif isinstance(event, ProgressEvent):
-                    specification_progress[event.specification_id] = (event.progress, event.max)
-                    timeestimator.record_iteration(event.specification_id, event.progress)
-                elif isinstance(event, LogEvent):
-                    if not event.message.isspace():
-                        log_spool.append(event.message)
-                elif isinstance(event, StartExperimentEvent):
-                    experiment_name = event.name
-                elif isinstance(event, RegisterEvent):
-                    registered.append(event.specification_id)
-                    spec = dict(event.specification)
-                    specification_ids_to_specification[event.specification_id] = spec
-                    timeestimator.update_specification_ids(specification_ids_to_specification)
-                    timeestimator.update_possible_values(spec)
-                elif isinstance(event, FailedEvent):
-                    active.remove(event.specification_id)
-                    failed.append(event.specification_id)
-
-                elif isinstance(event, RegistrationCompleteEvent):
-                    timeestimator.fit_encoders()
-                else:
-                    print("Dashboard action not understood")
+            with open(get_dashboard_file(name),"r") as f:
+                lines = f.readlines()
+            for event_string in lines[i:]:
+                split = event_string.rstrip("\n").split(",")  # single-field lines would otherwise keep "\n" in the key
+                key = split[0]
+                if key == 'BEGIN':
+                    specification_id = split[1].replace("\n","")
+                    start_time = float(split[2])
+                    registered.remove(specification_id)
+                    active.append(specification_id)
+                elif key == 'COMPLETE':
+                    specification_id = split[1].replace("\n","")
+                    active.remove(specification_id)
+                    complete.append(specification_id)
+                elif key == 'PROGRESS':
+                    specification_id = split[1].replace("\n","")
+                    progress = float(split[2])
+                    maximum = float(split[3])
+
+                    specification_progress[specification_id] = (progress, maximum)
+                    timeestimator.record_progress(specification_id, progress,maximum)
+                elif key == 'LOG':
+                    pass
+                    #if not event.message.isspace():
+                    #    log_spool.apame
+                elif key == 'REGISTER':
+                    specification_id = split[1].replace("\n","")
+                    registered.append(specification_id)
+                    # with open(os.path.join(get_specification_save_dir(name), specification_id + ".json"),"r") as j_f:
+                    #     specification = json.load(j_f)
+                    #     spec = dict(specification)
+                    #specification_ids_to_specification[specification_id] = spec
+                elif key == "FAILED":
+                    specification_id = split[1].replace("\n","")
+                    active.remove(specification_id)
+                    failed.append(specification_id)
+
+                elif key == "REGISTRATION_COMPLETE":
+                    pass
+                elif key == "START":
+                    timeestimator.record_start_time(start_time)
+                else:
+                    print(f"Dashboard action not understood: {split}")
+            i = len(lines)
             in_slow_mode = False
             if i == max_events_per_frame:
                 in_slow_mode = True
@@ -547,7 +600,7 @@ def run(stdscr, eventQueue, name):
                                      specification_progress, timeestimator, failed, in_slow_mode=in_slow_mode)
             row = draw_specifications_widget(row, stdscr, active, registered, width, specification_progress, height,
                                              failed, specification_ids_to_specification, specification_readout_index)
-            row = draw_log_widget(row, stdscr, width, height, log_spool)
+            #row = draw_log_widget(row, stdscr, width, height, log_spool)
             stdscr.refresh()
             time.sleep(0.1)
             log_spool = log_spool[-max_log_spool_events:]
@@ -555,6 +608,39 @@ def run(stdscr, eventQueue, name):
             logging.getLogger("smallab.dashboard").error("Dashboard Error {}".format(e), exc_info=True)
 
 
-def start_dashboard(eventQueue, name):
-    curses.wrapper(run, eventQueue, name)
+def start_dashboard(name):
+    curses.wrapper(run, name)
 
+
+def write_dashboard(eventQueue,name):
+    try:
+        os.remove(get_dashboard_file(name))
+    except FileNotFoundError:
+        pass
+    os.makedirs(get_specification_save_dir(name),exist_ok=True)
+    while True:
+        with open(get_dashboard_file(name),"a") as f:
+            while not eventQueue.empty():
+                event = eventQueue.get()
+                if isinstance(event, BeginEvent):
+                    f.write(f"BEGIN,{event.specification_id},{time.time()}\n")
+                elif isinstance(event, CompleteEvent):
+                    f.write(f"COMPLETE,{event.specification_id}\n" )
+                elif isinstance(event, ProgressEvent):
+                    f.write(f"PROGRESS,{event.specification_id},{event.progress},{event.max}\n")
+                elif isinstance(event, LogEvent):
+                    #f.write(f"LOG,{event.message}")
+                    pass
+                elif isinstance(event, StartExperimentEvent):
+                    f.write(f"START,{event.name}\n")
+                elif isinstance(event, RegisterEvent):
+                    # with open(os.path.join(get_specification_save_dir(name), event.specification_id + ".json"),"w") as j_f:
+                    #     json.dump(event.specification,j_f)
+                    f.write(f"REGISTER,{event.specification_id}\n")
+                elif isinstance(event, FailedEvent):
+                    f.write(f"FAILED,{event.specification_id}\n")
+                elif isinstance(event, RegistrationCompleteEvent):
+                    f.write("REGISTRATION_COMPLETE\n")
+                else:
+                    print("Dashboard action not understood")
+        time.sleep(0.2)
\ No newline at end of file
diff --git a/smallab/file_locations.py b/smallab/file_locations.py
index c17a758..14f1bc3 100644
--- a/smallab/file_locations.py
+++ b/smallab/file_locations.py
@@ -78,3 +78,9 @@ def get_experiment_local_storage(name):
 def get_specification_local_storage(name, specification, experiment: ExperimentBase):
     expr_name = experiment.get_name(specification)
     return os.path.join(get_experiment_local_storage(name),expr_name)
+
+def get_dashboard_file(name):
+    return os.path.join(get_save_directory(name), ".dashboard.csv")
+
+def get_specification_save_dir(name):
+    return os.path.join(get_save_directory(name), "specifications")
\ No newline at end of file
diff --git a/smallab/runner/runner.py b/smallab/runner/runner.py
index 7f61f03..a68d5e2 100644
--- a/smallab/runner/runner.py
+++ b/smallab/runner/runner.py
@@ -7,7 +7,7 @@
 import os
 
 from smallab.callbacks import CallbackManager
-from smallab.dashboard.dashboard import start_dashboard
+from smallab.dashboard.dashboard import start_dashboard, write_dashboard
 from smallab.dashboard.dashboard_events import StartExperimentEvent, RegisterEvent, RegistrationCompleteEvent, \
     ProgressEvent
 from smallab.dashboard.utils import put_in_event_queue, LogToEventQueue
@@ -137,7 +137,7 @@ def run(self, name: typing.AnyStr, specifications: typing.List[Specification], e
                 sh.setFormatter(formatter)
                 logger.addHandler(sh)
             else:
-                dashboard_process = ctx.Process(target=start_dashboard, args=(eventQueue, name))
+                dashboard_process = ctx.Process(target=write_dashboard, args=(eventQueue,name))
                 dashboard_process.start()
 
             experiment.set_logging_folder(folder_loc)