diff --git a/alpha_automl/automl_manager.py b/alpha_automl/automl_manager.py index 2ff85b22..86a29a81 100644 --- a/alpha_automl/automl_manager.py +++ b/alpha_automl/automl_manager.py @@ -14,6 +14,7 @@ NEW_PRIMITIVES = {} SPLITTING_STRATEGY = 'holdout' SAMPLE_SIZE = 2000 +MAX_RUNNING_PROCESSES = multiprocessing.cpu_count() logger = logging.getLogger(__name__) @@ -29,6 +30,7 @@ def __init__(self, output_folder, time_bound, time_bound_run, task, verbose): self.y = None self.scoring = None self.splitting_strategy = None + self.found_pipelines = None self.verbose = verbose def search_pipelines(self, X, y, scoring, splitting_strategy, automl_hyperparams=None): @@ -57,13 +59,16 @@ def _search_pipelines(self, automl_hyperparams): queue = multiprocessing.Queue() search_process = multiprocessing.Process(target=search_pipelines_proc, args=(X, y, self.scoring, internal_splitting_strategy, self.task, - self.time_bound, automl_hyperparams, metadata, - self.output_folder, self.verbose, queue + automl_hyperparams, metadata, self.output_folder, self.verbose, + queue ) ) search_process.start() - found_pipelines = 0 + self.found_pipelines = 0 + scoring_pool = multiprocessing.Pool() + self.running_processes = 0 + pipelines_to_score = [] while True: result = queue.get() @@ -71,7 +76,7 @@ def _search_pipelines(self, automl_hyperparams): if result == 'DONE': search_process.terminate() search_process.join(10) - logger.debug(f'Found {found_pipelines} pipelines') + logger.debug(f'Found {self.found_pipelines} pipelines') logger.debug('Search done') break @@ -81,24 +86,33 @@ def _search_pipelines(self, automl_hyperparams): yield {'pipeline': pipeline, 'message': 'FOUND'} if need_rescoring: - score, start_time, end_time = score_pipeline(pipeline.get_pipeline(), self.X, self.y, self.scoring, - self.splitting_strategy, self.task) - pipeline.set_score(score) - pipeline.set_start_time(start_time) - pipeline.set_end_time(end_time) - - if score is not None: + pipelines_to_score.append(pipeline) + if self.running_processes < MAX_RUNNING_PROCESSES: + scoring_pool.apply_async(score_pipeline, args=(pipelines_to_score.pop(0).get_pipeline(), self.X, + self.y, self.scoring, self.splitting_strategy, + self.task), + callback=self._callback_score_pipeline) + else: logger.debug(f'Pipeline scored successfully, score={score}') - found_pipelines += 1 + self.found_pipelines += 1 yield {'pipeline': pipeline, 'message': 'SCORED'} if time.time() > search_start_time + self.time_bound: logger.debug('Reached search timeout') search_process.terminate() search_process.join(10) - logger.debug(f'Found {found_pipelines} pipelines') + scoring_pool.close() + scoring_pool.join() + logger.debug(f'Found {self.found_pipelines} pipelines') break + def _callback_score_pipeline(self, result): + pipeline = result + logger.debug(f'Pipeline scored successfully, score={pipeline.get_score()}') + self.found_pipelines += 1 + self.running_processes -= 1 + yield {'pipeline': pipeline, 'message': 'SCORED'} + def check_automl_hyperparams(self, automl_hyperparams): if 'use_automatic_grammar' not in automl_hyperparams: automl_hyperparams['use_automatic_grammar'] = USE_AUTOMATIC_GRAMMAR