Skip to content

Commit

Permalink
dashboard writes to a separate file so we can use it on slurm etc
Browse files Browse the repository at this point in the history
  • Loading branch information
octopuscabbage committed Nov 21, 2022
1 parent ee84b3b commit b81173f
Show file tree
Hide file tree
Showing 6 changed files with 175 additions and 71 deletions.
4 changes: 1 addition & 3 deletions Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@ numpy = "*"
tqdm = "*"
joblib = "*"
humanhash3 = "*"
dill = "*"
dill = ">=0.3.6"
scikit-learn="*"
python-dateutil = "*"

[requires]
python_version = "3.6"
8 changes: 8 additions & 0 deletions scripts/smdash.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import sys

from smallab.dashboard.dashboard import start_dashboard


if __name__ == "__main__":
    # The experiment name is the single required positional argument; without
    # it the dashboard cannot locate the event file, so fail with a usage
    # message instead of an IndexError traceback.
    if len(sys.argv) < 2:
        sys.exit("usage: smdash.py EXPERIMENT_NAME")
    experiment_name = sys.argv[1]
    start_dashboard(experiment_name)
8 changes: 7 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def read(fname):

setup(
name="smallab",
version="2.1.0",
version="2.2.0",
url='https://github.com/octopuscabbage/smallab',
packages=find_packages(),
install_requires=required,
Expand All @@ -29,3 +29,9 @@ def read(fname):
long_description=read('README.md'),
long_description_content_type='text/markdown',
)
setup(
name='smdash',
version='1.0',
packages=find_packages(),
scripts=['scripts/smdash.py'],
)
216 changes: 151 additions & 65 deletions smallab/dashboard/dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import json
import logging
import math
import os
import time
from os.path import join, exists
from pathlib import Path
Expand All @@ -17,7 +18,7 @@
from smallab.dashboard.dashboard_events import (BeginEvent, CompleteEvent, ProgressEvent, LogEvent,
StartExperimentEvent, RegistrationCompleteEvent,
RegisterEvent, FailedEvent)
from smallab.file_locations import get_save_directory
from smallab.file_locations import get_dashboard_file, get_specification_save_dir, get_save_directory


# This is a globally accessible queue which stores the events for the dashboard
Expand Down Expand Up @@ -373,14 +374,26 @@ def draw_header_widget(row, stdscr, experiment_name, width, complete, active, re
completed_failed_string += " !!! You are logging too fast !!!"
stdscr.addstr(row, 0, completed_failed_string)

t = timeestimator.compute_time_stats(specification_progress, active, registered)
t = timeestimator.compute_expectation()
if t is not None:
expected, lower, higher = t
completion_stats_string = "Completion - Expected: {expectedstr} - Upper: {upperstr} - Lower: {lowerstr}".format(
expectedstr=display_time(expected), upperstr=display_time(higher), lowerstr=display_time(lower))
expected = t
completion_stats_string = "Completion - Expected: {expectedstr}".format(
expectedstr=display_time(expected) )
stdscr.addstr(row, width - len(completion_stats_string), completion_stats_string)
row += 1
stdscr.addstr(row, 0, "-" * width)
stdscr.addstr(row, 0, "=" * width)
row+=1
completed_iterations = timeestimator.completed_iterations
total_iterations = timeestimator.total_number_of_iterations
progress_str = f"Iteration Progress: {completed_iterations} / {total_iterations} "
stdscr.addstr(row, 0, progress_str)
num_astericks = math.floor(completed_iterations / total_iterations * (width - len(progress_str)))
num_dashes = width-len(progress_str) - num_astericks
stdscr.addstr(row, len(progress_str), "*" * num_astericks)
stdscr.addstr(row, len(progress_str) + num_astericks, "-" * num_dashes)
row+= 1
stdscr.addstr(row, 0, "=" * width)

row += 1
return row

Expand All @@ -390,7 +403,7 @@ def draw_specifications_widget(row, stdscr, active, registered, width, specifica
start_row = row
# Decide to draw in single or double column
second_column_begins = math.floor(width / 2)
max_height = math.floor(height / 2)
max_height = math.floor(height - 4)
use_double_column_layout = width >= 40 and len(active) + len(registered) > max_height - row
on_second_column = False
try:
Expand Down Expand Up @@ -433,24 +446,24 @@ def draw_specifications_widget(row, stdscr, active, registered, width, specifica
stdscr.addstr(row, width - len(status_string), status_string)
specification_readout_end_index = width - len(status_string)

specification = str(specification_id_to_specification[active_specification])
specification_string_start_index = specification_readout_index % len(specification)
max_allowed_length = specification_readout_end_index - specification_readout_start_index - 1
if len(specification) <= max_allowed_length:
stdscr.addstr(row, specification_readout_start_index,
specification)
else:
overflow = specification_string_start_index + max_allowed_length - len(specification) - 1
if overflow > 0:
stdscr.addstr(row, specification_readout_start_index,
specification[
specification_string_start_index:specification_string_start_index + max_allowed_length] + " " + specification[
:overflow])
# specification = str(specification_id_to_specification[active_specification])
# specification_string_start_index = specification_readout_index % len(specification)
# max_allowed_length = specification_readout_end_index - specification_readout_start_index - 1
# if len(specification) <= max_allowed_length:
# stdscr.addstr(row, specification_readout_start_index,
# specification)
# else:
# overflow = specification_string_start_index + max_allowed_length - len(specification) - 1
# if overflow > 0:
# stdscr.addstr(row, specification_readout_start_index,
# specification[
# specification_string_start_index:specification_string_start_index + max_allowed_length] + " " + specification[
# :overflow])

else:
stdscr.addstr(row, specification_readout_start_index,
specification[
specification_string_start_index:specification_string_start_index + max_allowed_length])
# else:
# stdscr.addstr(row, specification_readout_start_index,
# specification[
# specification_string_start_index:specification_string_start_index + max_allowed_length])
row += 1
if row >= max_height:
if use_double_column_layout and not on_second_column:
Expand Down Expand Up @@ -480,61 +493,101 @@ def draw_log_widget(row, stdscr, width, height, log_spool):
row += 1
return row

class SimpleTimeEstimator():
    """Estimate remaining run time from the average time per completed iteration.

    The estimator keeps a running total of iterations expected across all
    specifications and of iterations completed so far, and extrapolates the
    remaining time linearly from the elapsed time since ``start_time``.
    """

    def __init__(self):
        # Sum of the ``maximum`` reported by every specification seen so far.
        self.total_number_of_iterations = 0
        # Sum of the latest progress reported by every specification.
        self.completed_iterations = 0
        # Timestamp (as returned by ``time.time()``) the elapsed time is measured from.
        self.start_time = 0
        # Latest progress value reported per specification.
        self.specification_progress = dict()

    def record_progress(self, specification, progress, maximum):
        """Record that ``specification`` has reached ``progress`` out of ``maximum``.

        ``maximum`` is only added to the overall total the first time a
        specification is seen; afterwards only the change in progress since
        the previous report is added to the completed count.
        """
        if specification not in self.specification_progress:
            self.total_number_of_iterations += maximum
            self.specification_progress[specification] = 0
        previous_progress = self.specification_progress[specification]
        self.completed_iterations += progress - previous_progress
        self.specification_progress[specification] = progress

    def record_start_time(self, start_time):
        """Set the timestamp from which elapsed time is measured."""
        self.start_time = start_time

    def compute_expectation(self):
        """Return the estimated remaining seconds, or ``None`` if unknown.

        ``None`` is returned while no iteration has completed, because no
        per-iteration rate can be derived yet (this previously raised
        ``ZeroDivisionError``).
        """
        if self.completed_iterations <= 0:
            return None
        time_elapsed = time.time() - self.start_time
        seconds_per_iteration = time_elapsed / self.completed_iterations
        remaining_iterations = self.total_number_of_iterations - self.completed_iterations
        return remaining_iterations * seconds_per_iteration

def run(stdscr, eventQueue, name):



def run(stdscr, name):
specification_ids_to_specification = dict()
max_events_per_frame = 1000
max_log_spool_events = 10**3
timeestimator = TimeEstimator(name)
timeestimator = SimpleTimeEstimator()
log_spool = []
active = []
complete = []
experiment_name = ""
experiment_name = name
specification_progress = dict()
registered = []
failed = []
start_time = time.time()
specification_readout_index = 0
i = 0
while True:
if time.time() - start_time > 1:
specification_readout_index += 1
# Drain Queue:
try:
i = 0
while not eventQueue.empty() and i < max_events_per_frame:
i += 1
event = eventQueue.get()
if isinstance(event, BeginEvent):
registered.remove(event.specification_id)
active.append(event.specification_id)
timeestimator.record_start(event.specification_id)

elif isinstance(event, CompleteEvent):
active.remove(event.specification_id)
complete.append(event.specification_id)
timeestimator.record_completion(event.specification_id)
elif isinstance(event, ProgressEvent):
specification_progress[event.specification_id] = (event.progress, event.max)
timeestimator.record_iteration(event.specification_id, event.progress)
elif isinstance(event, LogEvent):
if not event.message.isspace():
log_spool.append(event.message)
elif isinstance(event, StartExperimentEvent):
experiment_name = event.name
elif isinstance(event, RegisterEvent):
registered.append(event.specification_id)
spec = dict(event.specification)
specification_ids_to_specification[event.specification_id] = spec
timeestimator.update_specification_ids(specification_ids_to_specification)
timeestimator.update_possible_values(spec)
elif isinstance(event, FailedEvent):
active.remove(event.specification_id)
failed.append(event.specification_id)

elif isinstance(event, RegistrationCompleteEvent):
timeestimator.fit_encoders()
else:
print("Dashboard action not understood")
with open(get_dashboard_file(name),"r") as f:
lines = f.readlines()
for event_string in lines[i:]:
split = event_string.split(",")
key = split[0]
if key == 'BEGIN':
specification_id = split[1].replace("\n","")
start_time = float(split[2])
registered.remove(specification_id)
active.append(specification_id)
elif key == 'COMPLETE':
specification_id = split[1].replace("\n","")
active.remove(specification_id)
complete.append(specification_id)
elif key == 'PROGRESS':
specification_id = split[1].replace("\n","")
progress = float(split[2])
maximum = float(split[3])

specification_progress[specification_id] = (progress, maximum)
timeestimator.record_progress(specification_id, progress,maximum)
elif isinstance(key, LogEvent):
pass
#if not event.message.isspace():
# log_spool.apame
elif key == 'REGISTER':
specification_id = split[1].replace("\n","")
registered.append(specification_id)
# with open(os.path.join(get_specification_save_dir(name), specification_id + ".json"),"r") as j_f:
# specification = json.load(j_f)
# spec = dict(specification)
#specification_ids_to_specification[specification_id] = spec
elif key == "FAILED":
specification_id = split[1].replace("\n","")
active.remove(specification_id)
failed.append(specification_id)

elif key == "REGISTRATION_COMPLETE":
pass
elif key == "START":
timeestimator.record_start_time(start_time)
else:
print(f"Dashboard action not understood: {split}")
i = len(lines)
in_slow_mode = False
if i == max_events_per_frame:
in_slow_mode = True
Expand All @@ -547,14 +600,47 @@ def run(stdscr, eventQueue, name):
specification_progress, timeestimator, failed, in_slow_mode=in_slow_mode)
row = draw_specifications_widget(row, stdscr, active, registered, width, specification_progress, height,
failed, specification_ids_to_specification, specification_readout_index)
row = draw_log_widget(row, stdscr, width, height, log_spool)
#row = draw_log_widget(row, stdscr, width, height, log_spool)
stdscr.refresh()
time.sleep(0.1)
log_spool = log_spool[-max_log_spool_events:]
except Exception as e:
logging.getLogger("smallab.dashboard").error("Dashboard Error {}".format(e), exc_info=True)


def start_dashboard(eventQueue, name):
curses.wrapper(run, eventQueue, name)
def start_dashboard(name):
curses.wrapper(run, name)

def write_dashboard(eventQueue, name):
    """Continuously drain dashboard events from ``eventQueue`` into a file.

    Each event is appended as one comma-separated line, with the event tag
    first, to the file returned by ``get_dashboard_file(name)``. Any file left
    over from a previous run is removed first. This function loops forever
    and is meant to be run in its own process.

    :param eventQueue: queue of dashboard event objects; must provide
        ``empty()`` and ``get()``
    :param name: experiment name used to locate the dashboard file
    """
    # Start from an empty event file so a reader never sees a previous run.
    try:
        os.remove(get_dashboard_file(name))
    except FileNotFoundError:
        pass
    os.makedirs(get_specification_save_dir(name), exist_ok=True)
    while True:
        # Re-open on every pass so the appended lines are flushed to disk
        # when the file is closed, before sleeping.
        with open(get_dashboard_file(name), "a") as f:
            while not eventQueue.empty():
                event = eventQueue.get()
                if isinstance(event, BeginEvent):
                    f.write(f"BEGIN,{event.specification_id},{time.time()}\n")
                elif isinstance(event, CompleteEvent):
                    f.write(f"COMPLETE,{event.specification_id}\n")
                elif isinstance(event, ProgressEvent):
                    f.write(f"PROGRESS,{event.specification_id},{event.progress},{event.max}\n")
                elif isinstance(event, LogEvent):
                    # Log messages are currently not persisted to the file.
                    pass
                elif isinstance(event, StartExperimentEvent):
                    f.write(f"START,{event.name}\n")
                elif isinstance(event, RegisterEvent):
                    # Only the id is recorded; the specification body is not written.
                    f.write(f"REGISTER,{event.specification_id}\n")
                elif isinstance(event, FailedEvent):
                    f.write(f"FAILED,{event.specification_id}\n")
                elif isinstance(event, RegistrationCompleteEvent):
                    # Tag spelling fixed (was "REGISRTATION_COMPLETE"). The
                    # trailing comma keeps the newline out of the first
                    # comma-separated field, so a reader comparing
                    # ``line.split(",")[0]`` to the tag gets an exact match.
                    f.write("REGISTRATION_COMPLETE,\n")
                else:
                    print("Dashboard action not understood")
        time.sleep(0.2)
6 changes: 6 additions & 0 deletions smallab/file_locations.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,9 @@ def get_experiment_local_storage(name):
def get_specification_local_storage(name, specification, experiment: ExperimentBase):
expr_name = experiment.get_name(specification)
return os.path.join(get_experiment_local_storage(name),expr_name)

def get_dashboard_file(name):
    """Return the path of the ``.dashboard.csv`` file inside the save directory of ``name``."""
    experiment_directory = get_save_directory(name)
    return os.path.join(experiment_directory, ".dashboard.csv")

def get_specification_save_dir(name):
    """Return the path of the ``specifications`` folder inside the save directory of ``name``."""
    experiment_directory = get_save_directory(name)
    return os.path.join(experiment_directory, "specifications")
4 changes: 2 additions & 2 deletions smallab/runner/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import os

from smallab.callbacks import CallbackManager
from smallab.dashboard.dashboard import start_dashboard
from smallab.dashboard.dashboard import start_dashboard, write_dashboard
from smallab.dashboard.dashboard_events import StartExperimentEvent, RegisterEvent, RegistrationCompleteEvent, \
ProgressEvent
from smallab.dashboard.utils import put_in_event_queue, LogToEventQueue
Expand Down Expand Up @@ -137,7 +137,7 @@ def run(self, name: typing.AnyStr, specifications: typing.List[Specification], e
sh.setFormatter(formatter)
logger.addHandler(sh)
else:
dashboard_process = ctx.Process(target=start_dashboard, args=(eventQueue, name))
dashboard_process = ctx.Process(target=write_dashboard, args=(eventQueue,name))
dashboard_process.start()
experiment.set_logging_folder(folder_loc)

Expand Down

0 comments on commit b81173f

Please sign in to comment.