Skip to content

Commit

Permalink
Use a pool of workers to score pipelines
Browse files Browse the repository at this point in the history
  • Loading branch information
roquelopez committed Feb 12, 2024
1 parent 5113daa commit b2f2a3d
Showing 1 changed file with 27 additions and 13 deletions.
40 changes: 27 additions & 13 deletions alpha_automl/automl_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
NEW_PRIMITIVES = {}
SPLITTING_STRATEGY = 'holdout'
SAMPLE_SIZE = 2000
MAX_RUNNING_PROCESSES = multiprocessing.cpu_count()

logger = logging.getLogger(__name__)

Expand All @@ -29,6 +30,7 @@ def __init__(self, output_folder, time_bound, time_bound_run, task, verbose):
self.y = None
self.scoring = None
self.splitting_strategy = None
self.found_pipelines = None
self.verbose = verbose

def search_pipelines(self, X, y, scoring, splitting_strategy, automl_hyperparams=None):
Expand Down Expand Up @@ -57,21 +59,24 @@ def _search_pipelines(self, automl_hyperparams):
queue = multiprocessing.Queue()
search_process = multiprocessing.Process(target=search_pipelines_proc,
args=(X, y, self.scoring, internal_splitting_strategy, self.task,
self.time_bound, automl_hyperparams, metadata,
self.output_folder, self.verbose, queue
automl_hyperparams, metadata, self.output_folder, self.verbose,
queue
)
)

search_process.start()
found_pipelines = 0
self.found_pipelines = 0
scoring_pool = multiprocessing.Pool()
self.running_processes = 0
pipelines_to_score = []

while True:
result = queue.get()

if result == 'DONE':
search_process.terminate()
search_process.join(10)
logger.debug(f'Found {found_pipelines} pipelines')
logger.debug(f'Found {self.found_pipelines} pipelines')
logger.debug('Search done')
break

Expand All @@ -81,24 +86,33 @@ def _search_pipelines(self, automl_hyperparams):
yield {'pipeline': pipeline, 'message': 'FOUND'}

if need_rescoring:
score, start_time, end_time = score_pipeline(pipeline.get_pipeline(), self.X, self.y, self.scoring,
self.splitting_strategy, self.task)
pipeline.set_score(score)
pipeline.set_start_time(start_time)
pipeline.set_end_time(end_time)

if score is not None:
pipelines_to_score.append(pipeline)
if self.running_processes < MAX_RUNNING_PROCESSES:
scoring_pool.apply_async(score_pipeline, args=(pipelines_to_score.pop(0).get_pipeline(), self.X,
self.y, self.scoring, self.splitting_strategy,
self.task),
callback=self._callback_score_pipeline)
else:
logger.debug(f'Pipeline scored successfully, score={score}')
found_pipelines += 1
self.found_pipelines += 1
yield {'pipeline': pipeline, 'message': 'SCORED'}

if time.time() > search_start_time + self.time_bound:
logger.debug('Reached search timeout')
search_process.terminate()
search_process.join(10)
logger.debug(f'Found {found_pipelines} pipelines')
scoring_pool.close()
scoring_pool.join()
logger.debug(f'Found {self.found_pipelines} pipelines')
break

def _callback_score_pipeline(self, result):
pipeline = result
logger.debug(f'Pipeline scored successfully, score={pipeline.get_score()}')
self.found_pipelines += 1
self.running_processes -= 1
yield {'pipeline': pipeline, 'message': 'SCORED'}

def check_automl_hyperparams(self, automl_hyperparams):
if 'use_automatic_grammar' not in automl_hyperparams:
automl_hyperparams['use_automatic_grammar'] = USE_AUTOMATIC_GRAMMAR
Expand Down

0 comments on commit b2f2a3d

Please sign in to comment.