From f5d88225f75ff8883b429dce989dfe83d67551d3 Mon Sep 17 00:00:00 2001 From: Bogdan Budescu Date: Wed, 4 Dec 2024 16:37:20 +0200 Subject: [PATCH 01/21] - train rf model in the background while worker processes use older version - minor fix: type hint --- smac/intensifier/abstract_intensifier.py | 2 +- smac/model/random_forest/random_forest.py | 86 ++++++++++++++++++++--- 2 files changed, 76 insertions(+), 12 deletions(-) diff --git a/smac/intensifier/abstract_intensifier.py b/smac/intensifier/abstract_intensifier.py index b7a5ae1ca..41c80c22f 100644 --- a/smac/intensifier/abstract_intensifier.py +++ b/smac/intensifier/abstract_intensifier.py @@ -62,7 +62,7 @@ def __init__( ): self._scenario = scenario self._config_selector: ConfigSelector | None = None - self._config_generator: Iterator[ConfigSelector] | None = None + self._config_generator: Iterator[Configuration] | None = None self._runhistory: RunHistory | None = None if seed is None: diff --git a/smac/model/random_forest/random_forest.py b/smac/model/random_forest/random_forest.py index 72685803f..9a1062cea 100644 --- a/smac/model/random_forest/random_forest.py +++ b/smac/model/random_forest/random_forest.py @@ -1,6 +1,7 @@ from __future__ import annotations -from typing import Any +from typing import Any, TYPE_CHECKING +from threading import Thread, Condition, Event, Lock import numpy as np from ConfigSpace import ConfigurationSpace @@ -11,10 +12,68 @@ from smac.constants import N_TREES, VERY_SMALL_NUMBER from smac.model.random_forest import AbstractRandomForest +if TYPE_CHECKING: + from pyrfr.regression import forest_opts as ForestOpts + __copyright__ = "Copyright 2022, automl.org" __license__ = "3-clause BSD" +class RFTrainer(Thread): + def __init__(self): + self._model: BinaryForest | None = None + # we could use a RWLockFair from https://pypi.org/project/readerwriterlock/, but it seems to be a bit of an + # overkill since critical section are rather short + self.model_lock = Lock() + self.model_available = Event() + + self.data = None + self.opts = None + self.data_cv = Condition() + + super().__init__(daemon=True) + self.start() + + @property + def model(self): + self.model_available.wait() + with self.model_lock: + return self._model + + def submit_for_training(self, data: DataContainer, opts: ForestOpts): + # use condition variable to wake up the trainer thread if it's sleeping + with self.data_cv: + assert data is not None + # overwrite with latest training data + self.data = data + self.opts = opts + self.data_cv.notify() + + def run(self) -> None: + while True: + # sleep until new data is submitted for training + with self.data_cv: + while self.data is None: + self.data_cv.wait() + data = self.data + self.data = None + + # here we could (conditionally) call self.model_available.clear() in order to make _some_ worker threads + # wait for training to finish before receiving a new configuration to try, depending on CPU load; we might + # have to replace the Event by a Condition + + _rf = regression.binary_rss_forest() + _rf.options = self.opts + + _rf.fit(data, rng=self._rng) + + with self.model_lock: + self._model = _rf + + if not self.model_available.is_set(): + self.model_available.set() + + class RandomForest(AbstractRandomForest): """Random forest that takes instance features into account. 
@@ -85,7 +144,7 @@ def __init__( self._rf_opts.tree_opts.epsilon_purity = eps_purity self._rf_opts.tree_opts.max_num_nodes = max_nodes self._rf_opts.compute_law_of_total_variance = False - self._rf: BinaryForest | None = None + self._rf = RFTrainer() self._log_y = log_y # Case to `int` incase we get an `np.integer` type @@ -142,16 +201,18 @@ def _train(self, X: np.ndarray, y: np.ndarray) -> RandomForest: # self.X = X # self.y = y.flatten() + data = self._init_data_container(X, y) + if self._n_points_per_tree <= 0: self._rf_opts.num_data_points_per_tree = X.shape[0] else: self._rf_opts.num_data_points_per_tree = self._n_points_per_tree - self._rf = regression.binary_rss_forest() - self._rf.options = self._rf_opts + self._rf.submit_for_training(data, self._rf_opts) - data = self._init_data_container(X, y) - self._rf.fit(data, rng=self._rng) + # call this to make sure that there exists a trained model before returning (actually, not sure this is + # required, since we check within predict() anyway) + # _ = self._rf.model return self @@ -198,7 +259,9 @@ def _predict( if covariance_type != "diagonal": raise ValueError("`covariance_type` can only take `diagonal` for this model.") - assert self._rf is not None + rf = self._rf.model + + assert rf is not None X = self._impute_inactive(X) if self._log_y: @@ -207,7 +270,7 @@ def _predict( # Gather data in a list of 2d arrays and get statistics about the required size of the 3d array for row_X in X: - preds_per_tree = self._rf.all_leaf_values(row_X) + preds_per_tree = rf.all_leaf_values(row_X) all_preds.append(preds_per_tree) max_num_leaf_data = max(map(len, preds_per_tree)) third_dimension = max(max_num_leaf_data, third_dimension) @@ -227,7 +290,7 @@ def _predict( else: means, vars_ = [], [] for row_X in X: - mean_, var = self._rf.predict_mean_var(row_X) + mean_, var = rf.predict_mean_var(row_X) means.append(mean_) vars_.append(var) @@ -273,11 +336,12 @@ def predict_marginalized(self, X: np.ndarray) -> tuple[np.ndarray, np.ndarray]: if X.shape[1] != len(self._bounds): raise ValueError("Rows in X should have %d entries but have %d!" % (len(self._bounds), X.shape[1])) - assert self._rf is not None + rf = self._rf.model + assert rf is not None X = self._impute_inactive(X) X_feat = list(self._instance_features.values()) - dat_ = self._rf.predict_marginalized_over_instances_batch(X, X_feat, self._log_y) + dat_ = rf.predict_marginalized_over_instances_batch(X, X_feat, self._log_y) dat_ = np.array(dat_) # 3. 
compute statistics across trees From 642d8fc3f346d737d3246d475e57641de7662ce9 Mon Sep 17 00:00:00 2001 From: Bogdan Budescu Date: Tue, 10 Dec 2024 17:04:20 +0200 Subject: [PATCH 02/21] work in progress: replace threads by multiprocessing, use shared memory (backed by mmap on posix-compliant OSs) for data, and queues for synchronization --- smac/model/random_forest/random_forest.py | 154 ++++++++++++++++++++-- 1 file changed, 145 insertions(+), 9 deletions(-) diff --git a/smac/model/random_forest/random_forest.py b/smac/model/random_forest/random_forest.py index 9a1062cea..b1c6d403f 100644 --- a/smac/model/random_forest/random_forest.py +++ b/smac/model/random_forest/random_forest.py @@ -1,9 +1,14 @@ from __future__ import annotations -from typing import Any, TYPE_CHECKING -from threading import Thread, Condition, Event, Lock +from typing import Any, Optional, TYPE_CHECKING +from enum import IntEnum, unique, auto + +import math +from multiprocessing import Process, Queue, Lock, shared_memory +from contextlib import contextmanager import numpy as np +import numpy.typing as npt from ConfigSpace import ConfigurationSpace from pyrfr import regression from pyrfr.regression import binary_rss_forest as BinaryForest @@ -19,25 +24,156 @@ __license__ = "3-clause BSD" -class RFTrainer(Thread): +# make it IntEnum for easier serialization +@unique +class DataCommand(IntEnum): + RESIZE = auto() # trainer proc doesn't have to reinit shared mem, just read more lines from the buffer + GROW = auto() # trainer proc has to reint shared mem bc it has been reallocated + SHUTDOWN = auto() # join thread + + +def dtypes_are_equal(dtype1: np.dtype, dtype2: np.dtype) -> bool: + return np.issubdtype(dtype2, dtype1) and np.issubdtype(dtype1, dtype2) + + +@contextmanager +def single_read_shared_mem(name: str): + shm = shared_memory.SharedMemory(name) + try: + yield shm + finally: + shm.close() + + +class GrowingSharedArrayReaderView: + basename_X: str = 'X' + basename_y: str = 'y' + + def __init__(self, lock: Lock): + self.lock = lock + self.shm_X: Optional[shared_memory.SharedMemory] = None + self.shm_y: Optional[shared_memory.SharedMemory] = None + + def __del__(self): + if self.shm_X is not None: + self.shm_X.close() + if self.shm_y is not None: + self.shm_y.close() + + @property + def capacity(self) -> Optional[int]: + if self.shm_y is None: + return None + assert self.shm_y.size % np.float64.itemsize == 0 + return self.shm_y.size / np.float64.itemsize + + @property + def row_size(self) -> Optional[int]: + if self.shm_X is None: + return None + if self.shm_X.size == 0: + assert self.shm_y.size == 0 + return 0 + assert self.shm_X.size % self.shm_y.size == 0 + return self.shm_X.size // self.shm_y.size + + def np_view(self, size: int) -> tuple[npt.NDArray[np.float64], npt.NDArray[np.float64]]: + X = np.ndarray(shape=(self.capacity, self.row_size), dtype=np.float64, buffer=self.shm_X.buf) + y = np.ndarray(shape=(self.capacity,), dtype=np.float64, buffer=self.shm_y.buf) + return X[:size], y[:size] + + def get_data(self, shm_id: int, size: int) -> tuple[npt.NDArray[np.float64], npt.NDArray[np.float64]]: + # TODO: measure perf: currently, we opted for releasing the memory immediately after read, which might make + # things a bit slower because we need to create the shared mem object every time even if it wasn't changed. 
+ # This has the advantage that it requires less memory when reallocation occurs, and it simplifies memory + # management (i.e., I don't know if `unlink` blocks until all handles are `close`d) + # [parentheses in 'with' only work starting with python 3.10] + with self.lock, single_read_shared_mem(f'{self.basename_X}_{shm_id}') as shm_X, single_read_shared_mem(f'{self.basename_y}_{shm_id}') as shm_y: + self.shm_X, self.shm_y = shm_X, shm_y + shared_X, shared_y = self.np_view(size) + X, y = np.array(shared_X), np.array(shared_y) # make copies + self.shm_X = self.shm_y = None + return X, y + + +class GrowingSharedArray(GrowingSharedArrayReaderView): + def __init__(self): + self.shm_id: int = 0 + self.growth_rate = 1.5 + super().__init__(lock=Lock()) + + def set_data(self, X: npt.NDArray[np.float64], y: npt.NDArray[np.float64]) -> None: + assert len(X) == len(y) + assert X.ndim == 2 + assert y.ndim == 1 + assert dtypes_are_equal(X.dtype, np.float64) + assert dtypes_are_equal(y.dtype, np.float64) + assert X.dtype.itemsize == 8 + assert y.dtype.itemsize == 8 + + size = len(y) + grow = size > self.capacity + if grow: + if self.capacity: + n_growth = math.ceil(math.log(size / self.capacity, self.growth_rate)) + capacity = int(math.ceil(self.capacity * self.growth_rate ** n_growth)) + self.shm_id += 1 + else: + assert self.shm_X is None + assert self.shm_y is None + capacity = size + + if self.row_size is not None: + assert X.shape[1] == self.row_size + + shm_X = shared_memory.SharedMemory(f'{self.basename_X}_{self.shm_id}', create=True, + size=capacity * self.row_size * X.dtype.itemsize) + shm_y = shared_memory.SharedMemory(f'{self.basename_y}_{self.shm_id}', create=True, + size=capacity * y.dtype.itemsize) + + with self.lock: + if grow: + if self.capacity: + assert self.shm_X is not None + self.shm_X.close() + self.shm_X.unlink() + assert self.shm_y is not None + self.shm_y.close() + self.shm_y.unlink() + self.shm_X = shm_X + self.shm_y = shm_y + X_buf, y_buf = self.np_view(size) + X_buf[...] = X + y_buf[...] 
= y + + +class RFTrainer(Process): def __init__(self): self._model: BinaryForest | None = None - # we could use a RWLockFair from https://pypi.org/project/readerwriterlock/, but it seems to be a bit of an - # overkill since critical section are rather short self.model_lock = Lock() - self.model_available = Event() + self.model_queue = Queue(maxsize=1) - self.data = None + self.X = None + self.y = None self.opts = None - self.data_cv = Condition() + self.data_queue = Queue(maxsize=1) super().__init__(daemon=True) self.start() @property def model(self): - self.model_available.wait() + model = None + while True: + m = self.model_queue.get(block=False) + if m is None: + break + else: + model = m + with self.model_lock: + if model is not None: + self._model = model return self._model def submit_for_training(self, data: DataContainer, opts: ForestOpts): From 7492c48fd88529a7f43da5d8dd63b9445ed97c08 Mon Sep 17 00:00:00 2001 From: Bogdan Budescu Date: Wed, 11 Dec 2024 10:46:26 +0200 Subject: [PATCH 03/21] work in progress: keep handle to shared memory until change instead of re-opening every time --- smac/model/random_forest/random_forest.py | 46 +++++++++++------------ 1 file changed, 21 insertions(+), 25 deletions(-) diff --git a/smac/model/random_forest/random_forest.py b/smac/model/random_forest/random_forest.py index b1c6d403f..db789845c 100644 --- a/smac/model/random_forest/random_forest.py +++ b/smac/model/random_forest/random_forest.py @@ -5,7 +5,6 @@ import math from multiprocessing import Process, Queue, Lock, shared_memory -from contextlib import contextmanager import numpy as np import numpy.typing as npt @@ -36,21 +35,13 @@ def dtypes_are_equal(dtype1: np.dtype, dtype2: np.dtype) -> bool: return np.issubdtype(dtype2, dtype1) and np.issubdtype(dtype1, dtype2) -@contextmanager -def single_read_shared_mem(name: str): - shm = shared_memory.SharedMemory(name) - try: - yield shm - finally: - shm.close() - - class GrowingSharedArrayReaderView: basename_X: str = 'X' basename_y: str = 'y' def __init__(self, lock: Lock): self.lock = lock + self.shm_id: Optional[int] = None self.shm_X: Optional[shared_memory.SharedMemory] = None self.shm_y: Optional[shared_memory.SharedMemory] = None @@ -83,22 +74,28 @@ def np_view(self, size: int) -> tuple[npt.NDArray[np.float64], npt.NDArray[np.fl return X[:size], y[:size] def get_data(self, shm_id: int, size: int) -> tuple[npt.NDArray[np.float64], npt.NDArray[np.float64]]: - # TODO: measure perf: currently, we opted for releasing the memory immediately after read, which might make - # things a bit slower because we need to create the shared mem object every time even if it wasn't changed. 
- # This has the advantage that it requires less memory when reallocation occurs, and it simplifies memory - # management (i.e., I don't know if `unlink` blocks until all handles are `close`d) - # [parentheses in 'with' only work starting with python 3.10] - with self.lock, single_read_shared_mem(f'{self.basename_X}_{shm_id}') as shm_X, single_read_shared_mem(f'{self.basename_y}_{shm_id}') as shm_y: - self.shm_X, self.shm_y = shm_X, shm_y + with self.lock: + # single_read_shared_mem() as shm_X, single_read_shared_mem(f'{self.basename_y}_{shm_id}') as shm_y: + if shm_id != self.shm_id: + self.shm_X.close() + del self.shm_X + self.shm_X = None + + self.shm_y.close() + del self.shm_y + self.shm_y = None + + self.shm_X = shared_memory.SharedMemory(f'{self.basename_X}_{shm_id}') + self.shm_y = shared_memory.SharedMemory(f'{self.basename_y}_{shm_id}') + shared_X, shared_y = self.np_view(size) X, y = np.array(shared_X), np.array(shared_y) # make copies - self.shm_X = self.shm_y = None + return X, y class GrowingSharedArray(GrowingSharedArrayReaderView): def __init__(self): - self.shm_id: int = 0 self.growth_rate = 1.5 super().__init__(lock=Lock()) @@ -122,6 +119,7 @@ def set_data(self, X: npt.NDArray[np.float64], y: npt.NDArray[np.float64]) -> No assert self.shm_X is None assert self.shm_y is None capacity = size + self.shm_id = 0 if self.row_size is not None: assert X.shape[1] == self.row_size @@ -153,8 +151,6 @@ def __init__(self): self.model_lock = Lock() self.model_queue = Queue(maxsize=1) - self.X = None - self.y = None self.opts = None self.data_queue = Queue(maxsize=1) @@ -176,7 +172,7 @@ def model(self): self._model = model return self._model - def submit_for_training(self, data: DataContainer, opts: ForestOpts): + def submit_for_training(self, X: npt.NDArray[np.float64], y: npt.NDArray[np.float64], opts: ForestOpts): # use condition variable to wake up the trainer thread if it's sleeping with self.data_cv: assert data is not None @@ -198,6 +194,8 @@ def run(self) -> None: # wait for training to finish before receiving a new configuration to try, depending on CPU load; we might # have to replace the Event by a Condition + data = self._init_data_container(X, y) + _rf = regression.binary_rss_forest() _rf.options = self.opts @@ -337,14 +335,12 @@ def _train(self, X: np.ndarray, y: np.ndarray) -> RandomForest: # self.X = X # self.y = y.flatten() - data = self._init_data_container(X, y) - if self._n_points_per_tree <= 0: self._rf_opts.num_data_points_per_tree = X.shape[0] else: self._rf_opts.num_data_points_per_tree = self._n_points_per_tree - self._rf.submit_for_training(data, self._rf_opts) + self._rf.submit_for_training(X, y, self._rf_opts) # call this to make sure that there exists a trained model before returning (actually, not sure this is # required, since we check within predict() anyway) From d9c3867661e3e387cc0f4e30222f0583e9fa42e5 Mon Sep 17 00:00:00 2001 From: Bogdan Budescu Date: Wed, 11 Dec 2024 11:41:44 +0200 Subject: [PATCH 04/21] refactor: encapsulation: move multiprocessing implementation to dedicated subpackage --- .../multiproc_util/GrowingSharedArray.py | 125 +++++++++++ .../random_forest/multiproc_util/RFTrainer.py | 75 +++++++ .../random_forest/multiproc_util/__init__.py | 22 ++ smac/model/random_forest/random_forest.py | 196 +----------------- 4 files changed, 224 insertions(+), 194 deletions(-) create mode 100644 smac/model/random_forest/multiproc_util/GrowingSharedArray.py create mode 100644 smac/model/random_forest/multiproc_util/RFTrainer.py create mode 100644 
smac/model/random_forest/multiproc_util/__init__.py diff --git a/smac/model/random_forest/multiproc_util/GrowingSharedArray.py b/smac/model/random_forest/multiproc_util/GrowingSharedArray.py new file mode 100644 index 000000000..6f4575ea1 --- /dev/null +++ b/smac/model/random_forest/multiproc_util/GrowingSharedArray.py @@ -0,0 +1,125 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from typing import Optional + +import math +from multiprocessing import Lock, shared_memory + +import numpy as np +from numpy import typing as npt + + +def dtypes_are_equal(dtype1: np.dtype, dtype2: np.dtype) -> bool: + return np.issubdtype(dtype2, dtype1) and np.issubdtype(dtype1, dtype2) + + +class GrowingSharedArrayReaderView: + basename_X: str = 'X' + basename_y: str = 'y' + + def __init__(self, lock: Lock): + self.lock = lock + self.shm_id: Optional[int] = None + self.shm_X: Optional[shared_memory.SharedMemory] = None + self.shm_y: Optional[shared_memory.SharedMemory] = None + + def __del__(self): + if self.shm_X is not None: + self.shm_X.close() + if self.shm_y is not None: + self.shm_y.close() + + @property + def capacity(self) -> Optional[int]: + if self.shm_y is None: + return None + assert self.shm_y.size % np.float64.itemsize == 0 + return self.shm_y.size / np.float64.itemsize + + @property + def row_size(self) -> Optional[int]: + if self.shm_X is None: + return None + if self.shm_X.size == 0: + assert self.shm_y.size == 0 + return 0 + assert self.shm_X.size % self.shm_y.size == 0 + return self.shm_X.size // self.shm_y.size + + def np_view(self, size: int) -> tuple[npt.NDArray[np.float64], npt.NDArray[np.float64]]: + X = np.ndarray(shape=(self.capacity, self.row_size), dtype=np.float64, buffer=self.shm_X.buf) + y = np.ndarray(shape=(self.capacity,), dtype=np.float64, buffer=self.shm_y.buf) + return X[:size], y[:size] + + def get_data(self, shm_id: int, size: int) -> tuple[npt.NDArray[np.float64], npt.NDArray[np.float64]]: + with self.lock: + # single_read_shared_mem() as shm_X, single_read_shared_mem(f'{self.basename_y}_{shm_id}') as shm_y: + if shm_id != self.shm_id: + self.shm_X.close() + del self.shm_X + self.shm_X = None + + self.shm_y.close() + del self.shm_y + self.shm_y = None + + self.shm_X = shared_memory.SharedMemory(f'{self.basename_X}_{shm_id}') + self.shm_y = shared_memory.SharedMemory(f'{self.basename_y}_{shm_id}') + + shared_X, shared_y = self.np_view(size) + X, y = np.array(shared_X), np.array(shared_y) # make copies + + return X, y + + +class GrowingSharedArray(GrowingSharedArrayReaderView): + def __init__(self): + self.growth_rate = 1.5 + super().__init__(lock=Lock()) + + def set_data(self, X: npt.NDArray[np.float64], y: npt.NDArray[np.float64]) -> None: + assert len(X) == len(y) + assert X.ndim == 2 + assert y.ndim == 1 + assert dtypes_are_equal(X.dtype, np.float64) + assert dtypes_are_equal(y.dtype, np.float64) + assert X.dtype.itemsize == 8 + assert y.dtype.itemsize == 8 + + size = len(y) + grow = size > self.capacity + if grow: + if self.capacity: + n_growth = math.ceil(math.log(size / self.capacity, self.growth_rate)) + capacity = int(math.ceil(self.capacity * self.growth_rate ** n_growth)) + self.shm_id += 1 + else: + assert self.shm_X is None + assert self.shm_y is None + capacity = size + self.shm_id = 0 + + if self.row_size is not None: + assert X.shape[1] == self.row_size + + shm_X = shared_memory.SharedMemory(f'{self.basename_X}_{self.shm_id}', create=True, + size=capacity * self.row_size * X.dtype.itemsize) + shm_y = 
shared_memory.SharedMemory(f'{self.basename_y}_{self.shm_id}', create=True, + size=capacity * y.dtype.itemsize) + + with self.lock: + if grow: + if self.capacity: + assert self.shm_X is not None + self.shm_X.close() + self.shm_X.unlink() + assert self.shm_y is not None + self.shm_y.close() + self.shm_y.unlink() + self.shm_X = shm_X + self.shm_y = shm_y + X_buf, y_buf = self.np_view(size) + X_buf[...] = X + y_buf[...] = y + diff --git a/smac/model/random_forest/multiproc_util/RFTrainer.py b/smac/model/random_forest/multiproc_util/RFTrainer.py new file mode 100644 index 000000000..69807f40e --- /dev/null +++ b/smac/model/random_forest/multiproc_util/RFTrainer.py @@ -0,0 +1,75 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from typing import TYPE_CHECKING, Optional + +from multiprocessing import Process, Lock, Queue + +from numpy import typing as npt +import numpy as np + +if TYPE_CHECKING: + from pyrfr.regression import binary_rss_forest as BinaryForest, forest_opts as ForestOpts + + +class RFTrainer(Process): + def __init__(self): + self._model: Optional[BinaryForest] = None + self.model_lock = Lock() + self.model_queue = Queue(maxsize=1) + + self.opts = None + self.data_queue = Queue(maxsize=1) + + super().__init__(daemon=True) + self.start() + + @property + def model(self): + model = None + while True: + m = self.model_queue.get(block=False) + if m is None: + break + else: + model = m + + with self.model_lock: + if model is not None: + self._model = model + return self._model + + def submit_for_training(self, X: npt.NDArray[np.float64], y: npt.NDArray[np.float64], opts: ForestOpts): + # use condition variable to wake up the trainer thread if it's sleeping + with self.data_cv: + assert data is not None + # overwrite with latest training data + self.data = data + self.opts = opts + self.data_cv.notify() + + def run(self) -> None: + while True: + # sleep until new data is submitted for training + with self.data_cv: + while self.data is None: + self.data_cv.wait() + data = self.data + self.data = None + + # here we could (conditionally) call self.model_available.clear() in order to make _some_ worker threads + # wait for training to finish before receiving a new configuration to try, depending on CPU load; we might + # have to replace the Event by a Condition + + data = self._init_data_container(X, y) + + _rf = regression.binary_rss_forest() + _rf.options = self.opts + + _rf.fit(data, rng=self._rng) + + with self.model_lock: + self._model = _rf + + if not self.model_available.is_set(): + self.model_available.set() diff --git a/smac/model/random_forest/multiproc_util/__init__.py b/smac/model/random_forest/multiproc_util/__init__.py new file mode 100644 index 000000000..b5937cb18 --- /dev/null +++ b/smac/model/random_forest/multiproc_util/__init__.py @@ -0,0 +1,22 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" +doc of what is in the file +""" + +__author__ = "Iddo Software " +__copyright__ = "Copyright © 2022, Client & Iddo Software. All Rights Reserved." 
+__license__ = "Proprietary" +__version__ = "0.1" +__maintainer__ = "Iddo Software " +__email__ = "bogdan@iddo.ro" +__status__ = "Development" # can also be "Prototype" or "Production" + + +def main(): + pass + + +if __name__ == '__main__': + main() diff --git a/smac/model/random_forest/random_forest.py b/smac/model/random_forest/random_forest.py index db789845c..2e5f527ef 100644 --- a/smac/model/random_forest/random_forest.py +++ b/smac/model/random_forest/random_forest.py @@ -1,213 +1,21 @@ from __future__ import annotations -from typing import Any, Optional, TYPE_CHECKING -from enum import IntEnum, unique, auto - -import math -from multiprocessing import Process, Queue, Lock, shared_memory +from typing import Any import numpy as np -import numpy.typing as npt from ConfigSpace import ConfigurationSpace from pyrfr import regression -from pyrfr.regression import binary_rss_forest as BinaryForest + from pyrfr.regression import default_data_container as DataContainer from smac.constants import N_TREES, VERY_SMALL_NUMBER from smac.model.random_forest import AbstractRandomForest -if TYPE_CHECKING: - from pyrfr.regression import forest_opts as ForestOpts __copyright__ = "Copyright 2022, automl.org" __license__ = "3-clause BSD" -# make it IntEnum for easier serialization -@unique -class DataCommand(IntEnum): - RESIZE = auto() # trainer proc doesn't have to reinit shared mem, just read more lines from the buffer - GROW = auto() # trainer proc has to reint shared mem bc it has been reallocated - SHUTDOWN = auto() # join thread - - -def dtypes_are_equal(dtype1: np.dtype, dtype2: np.dtype) -> bool: - return np.issubdtype(dtype2, dtype1) and np.issubdtype(dtype1, dtype2) - - -class GrowingSharedArrayReaderView: - basename_X: str = 'X' - basename_y: str = 'y' - - def __init__(self, lock: Lock): - self.lock = lock - self.shm_id: Optional[int] = None - self.shm_X: Optional[shared_memory.SharedMemory] = None - self.shm_y: Optional[shared_memory.SharedMemory] = None - - def __del__(self): - if self.shm_X is not None: - self.shm_X.close() - if self.shm_y is not None: - self.shm_y.close() - - @property - def capacity(self) -> Optional[int]: - if self.shm_y is None: - return None - assert self.shm_y.size % np.float64.itemsize == 0 - return self.shm_y.size / np.float64.itemsize - - @property - def row_size(self) -> Optional[int]: - if self.shm_X is None: - return None - if self.shm_X.size == 0: - assert self.shm_y.size == 0 - return 0 - assert self.shm_X.size % self.shm_y.size == 0 - return self.shm_X.size // self.shm_y.size - - def np_view(self, size: int) -> tuple[npt.NDArray[np.float64], npt.NDArray[np.float64]]: - X = np.ndarray(shape=(self.capacity, self.row_size), dtype=np.float64, buffer=self.shm_X.buf) - y = np.ndarray(shape=(self.capacity,), dtype=np.float64, buffer=self.shm_y.buf) - return X[:size], y[:size] - - def get_data(self, shm_id: int, size: int) -> tuple[npt.NDArray[np.float64], npt.NDArray[np.float64]]: - with self.lock: - # single_read_shared_mem() as shm_X, single_read_shared_mem(f'{self.basename_y}_{shm_id}') as shm_y: - if shm_id != self.shm_id: - self.shm_X.close() - del self.shm_X - self.shm_X = None - - self.shm_y.close() - del self.shm_y - self.shm_y = None - - self.shm_X = shared_memory.SharedMemory(f'{self.basename_X}_{shm_id}') - self.shm_y = shared_memory.SharedMemory(f'{self.basename_y}_{shm_id}') - - shared_X, shared_y = self.np_view(size) - X, y = np.array(shared_X), np.array(shared_y) # make copies - - return X, y - - -class GrowingSharedArray(GrowingSharedArrayReaderView): 
- def __init__(self): - self.growth_rate = 1.5 - super().__init__(lock=Lock()) - - def set_data(self, X: npt.NDArray[np.float64], y: npt.NDArray[np.float64]) -> None: - assert len(X) == len(y) - assert X.ndim == 2 - assert y.ndim == 1 - assert dtypes_are_equal(X.dtype, np.float64) - assert dtypes_are_equal(y.dtype, np.float64) - assert X.dtype.itemsize == 8 - assert y.dtype.itemsize == 8 - - size = len(y) - grow = size > self.capacity - if grow: - if self.capacity: - n_growth = math.ceil(math.log(size / self.capacity, self.growth_rate)) - capacity = int(math.ceil(self.capacity * self.growth_rate ** n_growth)) - self.shm_id += 1 - else: - assert self.shm_X is None - assert self.shm_y is None - capacity = size - self.shm_id = 0 - - if self.row_size is not None: - assert X.shape[1] == self.row_size - - shm_X = shared_memory.SharedMemory(f'{self.basename_X}_{self.shm_id}', create=True, - size=capacity * self.row_size * X.dtype.itemsize) - shm_y = shared_memory.SharedMemory(f'{self.basename_y}_{self.shm_id}', create=True, - size=capacity * y.dtype.itemsize) - - with self.lock: - if grow: - if self.capacity: - assert self.shm_X is not None - self.shm_X.close() - self.shm_X.unlink() - assert self.shm_y is not None - self.shm_y.close() - self.shm_y.unlink() - self.shm_X = shm_X - self.shm_y = shm_y - X_buf, y_buf = self.np_view(size) - X_buf[...] = X - y_buf[...] = y - - -class RFTrainer(Process): - def __init__(self): - self._model: BinaryForest | None = None - self.model_lock = Lock() - self.model_queue = Queue(maxsize=1) - - self.opts = None - self.data_queue = Queue(maxsize=1) - - super().__init__(daemon=True) - self.start() - - @property - def model(self): - model = None - while True: - m = self.model_queue.get(block=False) - if m is None: - break - else: - model = m - - with self.model_lock: - if model is not None: - self._model = model - return self._model - - def submit_for_training(self, X: npt.NDArray[np.float64], y: npt.NDArray[np.float64], opts: ForestOpts): - # use condition variable to wake up the trainer thread if it's sleeping - with self.data_cv: - assert data is not None - # overwrite with latest training data - self.data = data - self.opts = opts - self.data_cv.notify() - - def run(self) -> None: - while True: - # sleep until new data is submitted for training - with self.data_cv: - while self.data is None: - self.data_cv.wait() - data = self.data - self.data = None - - # here we could (conditionally) call self.model_available.clear() in order to make _some_ worker threads - # wait for training to finish before receiving a new configuration to try, depending on CPU load; we might - # have to replace the Event by a Condition - - data = self._init_data_container(X, y) - - _rf = regression.binary_rss_forest() - _rf.options = self.opts - - _rf.fit(data, rng=self._rng) - - with self.model_lock: - self._model = _rf - - if not self.model_available.is_set(): - self.model_available.set() - - class RandomForest(AbstractRandomForest): """Random forest that takes instance features into account. 
From 6a449b9e3e5eb76a64579a222e369c48653161ac Mon Sep 17 00:00:00 2001 From: Bogdan Budescu Date: Wed, 11 Dec 2024 11:43:10 +0200 Subject: [PATCH 05/21] delete metadata --- .../random_forest/multiproc_util/__init__.py | 20 ------------------- 1 file changed, 20 deletions(-) diff --git a/smac/model/random_forest/multiproc_util/__init__.py b/smac/model/random_forest/multiproc_util/__init__.py index b5937cb18..56fafa58b 100644 --- a/smac/model/random_forest/multiproc_util/__init__.py +++ b/smac/model/random_forest/multiproc_util/__init__.py @@ -1,22 +1,2 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- - -""" -doc of what is in the file -""" - -__author__ = "Iddo Software " -__copyright__ = "Copyright © 2022, Client & Iddo Software. All Rights Reserved." -__license__ = "Proprietary" -__version__ = "0.1" -__maintainer__ = "Iddo Software " -__email__ = "bogdan@iddo.ro" -__status__ = "Development" # can also be "Prototype" or "Production" - - -def main(): - pass - - -if __name__ == '__main__': - main() From 413e905383af08486dae8715eab126a2000a92b6 Mon Sep 17 00:00:00 2001 From: Bogdan Budescu Date: Wed, 11 Dec 2024 12:21:17 +0200 Subject: [PATCH 06/21] - refactor: encapsulation of some code in growing array class - work in progress: switch from threads to processes --- .../multiproc_util/GrowingSharedArray.py | 31 +++++++------ .../random_forest/multiproc_util/RFTrainer.py | 43 ++++++++++++++++--- 2 files changed, 53 insertions(+), 21 deletions(-) diff --git a/smac/model/random_forest/multiproc_util/GrowingSharedArray.py b/smac/model/random_forest/multiproc_util/GrowingSharedArray.py index 6f4575ea1..15045fe06 100644 --- a/smac/model/random_forest/multiproc_util/GrowingSharedArray.py +++ b/smac/model/random_forest/multiproc_util/GrowingSharedArray.py @@ -24,11 +24,26 @@ def __init__(self, lock: Lock): self.shm_X: Optional[shared_memory.SharedMemory] = None self.shm_y: Optional[shared_memory.SharedMemory] = None - def __del__(self): + def open(self, shm_id: int): + if shm_id != self.shm_id: + self.close() + self.shm_X = shared_memory.SharedMemory(f'{self.basename_X}_{shm_id}') + self.shm_y = shared_memory.SharedMemory(f'{self.basename_y}_{shm_id}') + self.shm_id = shm_id + + def close(self): if self.shm_X is not None: self.shm_X.close() + del self.shm_X + self.shm_X = None if self.shm_y is not None: self.shm_y.close() + del self.shm_y + self.shm_y = None + self.shm_id = None + + def __del__(self): + self.close() @property def capacity(self) -> Optional[int]: @@ -54,19 +69,7 @@ def np_view(self, size: int) -> tuple[npt.NDArray[np.float64], npt.NDArray[np.fl def get_data(self, shm_id: int, size: int) -> tuple[npt.NDArray[np.float64], npt.NDArray[np.float64]]: with self.lock: - # single_read_shared_mem() as shm_X, single_read_shared_mem(f'{self.basename_y}_{shm_id}') as shm_y: - if shm_id != self.shm_id: - self.shm_X.close() - del self.shm_X - self.shm_X = None - - self.shm_y.close() - del self.shm_y - self.shm_y = None - - self.shm_X = shared_memory.SharedMemory(f'{self.basename_X}_{shm_id}') - self.shm_y = shared_memory.SharedMemory(f'{self.basename_y}_{shm_id}') - + self.open(shm_id) shared_X, shared_y = self.np_view(size) X, y = np.array(shared_X), np.array(shared_y) # make copies diff --git a/smac/model/random_forest/multiproc_util/RFTrainer.py b/smac/model/random_forest/multiproc_util/RFTrainer.py index 69807f40e..00a8812c2 100644 --- a/smac/model/random_forest/multiproc_util/RFTrainer.py +++ b/smac/model/random_forest/multiproc_util/RFTrainer.py @@ -3,16 +3,49 @@ from typing import 
TYPE_CHECKING, Optional -from multiprocessing import Process, Lock, Queue +from multiprocessing import Lock, Queue from numpy import typing as npt import numpy as np +from pyrfr.regression import binary_rss_forest if TYPE_CHECKING: from pyrfr.regression import binary_rss_forest as BinaryForest, forest_opts as ForestOpts +from .GrowingSharedArray import GrowingSharedArrayReaderView -class RFTrainer(Process): + +SHUTDOWN = -1 + + +def training_loop_proc(data_queue: Queue, data_lock: Lock): + shared_arrs = GrowingSharedArrayReaderView(data_lock) + while True: + # discard all but the last msg in the queue + msg = None + while True: + m = data_queue.get(block=False) + if m is None: + break + else: + msg = m + + if msg == SHUTDOWN: + break + + shm_id, size = msg + + X, y = shared_arrs.get_data(shm_id, size) + + data = init_data_container(X, y) + + _rf = binary_rss_forest() + _rf.options = opts + + _rf.fit(data, rng=self._rng) + + +class RFTrainer: def __init__(self): self._model: Optional[BinaryForest] = None self.model_lock = Lock() @@ -26,6 +59,7 @@ def __init__(self): @property def model(self): + # discard all but the last model in the queue model = None while True: m = self.model_queue.get(block=False) @@ -61,12 +95,7 @@ def run(self) -> None: # wait for training to finish before receiving a new configuration to try, depending on CPU load; we might # have to replace the Event by a Condition - data = self._init_data_container(X, y) - - _rf = regression.binary_rss_forest() - _rf.options = self.opts - _rf.fit(data, rng=self._rng) with self.model_lock: self._model = _rf From 984c59fc8aa8783c74ba31e1b5e616c1a4615efb Mon Sep 17 00:00:00 2001 From: Bogdan Budescu Date: Wed, 11 Dec 2024 16:52:05 +0200 Subject: [PATCH 07/21] - refactor: encapsulation: move training data init where it can be used within background thread - finish first version of switch from threads to processes --- .../random_forest/multiproc_util/RFTrainer.py | 163 +++++++++++------- smac/model/random_forest/random_forest.py | 57 +----- smac/model/random_forest/util.py | 43 +++++ 3 files changed, 148 insertions(+), 115 deletions(-) create mode 100644 smac/model/random_forest/util.py diff --git a/smac/model/random_forest/multiproc_util/RFTrainer.py b/smac/model/random_forest/multiproc_util/RFTrainer.py index 00a8812c2..dd1f7d8fc 100644 --- a/smac/model/random_forest/multiproc_util/RFTrainer.py +++ b/smac/model/random_forest/multiproc_util/RFTrainer.py @@ -1,34 +1,55 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- -from typing import TYPE_CHECKING, Optional +from typing import Iterable, Optional -from multiprocessing import Lock, Queue +from multiprocessing import Lock, Queue, Process +import queue from numpy import typing as npt import numpy as np -from pyrfr.regression import binary_rss_forest +from pyrfr import regression +from pyrfr.regression import binary_rss_forest as BinaryForest -if TYPE_CHECKING: - from pyrfr.regression import binary_rss_forest as BinaryForest, forest_opts as ForestOpts - -from .GrowingSharedArray import GrowingSharedArrayReaderView +from .GrowingSharedArray import GrowingSharedArrayReaderView, GrowingSharedArray +from ..util import init_data_container SHUTDOWN = -1 -def training_loop_proc(data_queue: Queue, data_lock: Lock): +def rf_training_loop( + model_queue: Queue, data_queue: Queue, data_lock: Lock, + # init rf train + bounds: Iterable[tuple[float, float]], seed: int, + # rf opts + n_trees: int, bootstrapping: bool, max_features: int, min_samples_split: int, min_samples_leaf: int, + max_depth: int, 
eps_purity: float, max_nodes: int, n_points_per_tree: int +): + rf_opts = regression.forest_opts() + rf_opts.num_trees = n_trees + rf_opts.do_bootstrapping = bootstrapping + rf_opts.tree_opts.max_features = max_features + rf_opts.tree_opts.min_samples_to_split = min_samples_split + rf_opts.tree_opts.min_samples_in_leaf = min_samples_leaf + rf_opts.tree_opts.max_depth = max_depth + rf_opts.tree_opts.epsilon_purity = eps_purity + rf_opts.tree_opts.max_num_nodes = max_nodes + rf_opts.compute_law_of_total_variance = False + if n_points_per_tree > 0: + rf_opts.num_data_points_per_tree = n_points_per_tree + + # Case to `int` incase we get an `np.integer` type + rng = regression.default_random_engine(int(seed)) shared_arrs = GrowingSharedArrayReaderView(data_lock) while True: + msg = data_queue.get() # wait for training data or shutdown signal # discard all but the last msg in the queue - msg = None while True: - m = data_queue.get(block=False) - if m is None: + try: + msg = data_queue.get(block=False) + except queue.Empty: break - else: - msg = m if msg == SHUTDOWN: break @@ -37,68 +58,82 @@ def training_loop_proc(data_queue: Queue, data_lock: Lock): X, y = shared_arrs.get_data(shm_id, size) - data = init_data_container(X, y) + data = init_data_container(X, y, bounds) + + if n_points_per_tree <= 0: + rf_opts.num_data_points_per_tree = len(X) + + rf = BinaryForest() + rf.options = rf_opts - _rf = binary_rss_forest() - _rf.options = opts + rf.fit(data, rng) - _rf.fit(data, rng=self._rng) + # remove previous models from pipe, if any + while True: + m = model_queue.get(block=False) + if m is None: + break + model_queue.put(rf) class RFTrainer: - def __init__(self): + def __init__( + # init rf train + self, bounds: Iterable[tuple[float, float]], seed: int, + # rf opts + n_trees: int, bootstrapping: bool, max_features: int, min_samples_split: int, min_samples_leaf: int, + max_depth: int, eps_purity: float, max_nodes: int, n_points_per_tree: int + ) -> None: self._model: Optional[BinaryForest] = None - self.model_lock = Lock() self.model_queue = Queue(maxsize=1) - - self.opts = None self.data_queue = Queue(maxsize=1) - - super().__init__(daemon=True) - self.start() + self.shared_arrs = GrowingSharedArray() + + self.training_loop_proc = Process(daemon=True, target=rf_training_loop, name='rf_trainer', args=( + self.model_queue, self.data_queue, self.shared_arrs.lock, tuple(bounds), seed, n_trees, bootstrapping, + max_features, min_samples_split, min_samples_leaf, max_depth, eps_purity, max_nodes, n_points_per_tree + )) + self.training_loop_proc.start() + + super().__init__() + + def close(self): + # I think this might be redundant, since according to the official docs, close and join_thread are called + # anyway when garbage-collecting queues, and we don't use JoinableQueues + if self.data_queue is not None: + if self.training_loop_proc is not None: + self.data_queue.put(SHUTDOWN) + self.data_queue.close() + self.data_queue.join_thread() + del self.data_queue + self.data_queue = None + + if self.training_loop_proc is not None: + # wait for training to finish + self.training_loop_proc.join() + del self.training_loop_proc + self.training_loop_proc = None + + if self.model_queue is not None: + _ = self.model # flush the model queue, and return latest model + self.model_queue.close() + self.model_queue.join_thread() + del self.model_queue + self.model_queue = None + + def __del__(self): + self.close() @property - def model(self): + def model(self) -> Optional[BinaryForest]: # discard all but the last model 
in the queue - model = None while True: - m = self.model_queue.get(block=False) - if m is None: + try: + self._model = self.model_queue.get(block=False) + except queue.Empty: break - else: - model = m - - with self.model_lock: - if model is not None: - self._model = model - return self._model - - def submit_for_training(self, X: npt.NDArray[np.float64], y: npt.NDArray[np.float64], opts: ForestOpts): - # use condition variable to wake up the trainer thread if it's sleeping - with self.data_cv: - assert data is not None - # overwrite with latest training data - self.data = data - self.opts = opts - self.data_cv.notify() - - def run(self) -> None: - while True: - # sleep until new data is submitted for training - with self.data_cv: - while self.data is None: - self.data_cv.wait() - data = self.data - self.data = None - - # here we could (conditionally) call self.model_available.clear() in order to make _some_ worker threads - # wait for training to finish before receiving a new configuration to try, depending on CPU load; we might - # have to replace the Event by a Condition - - - - with self.model_lock: - self._model = _rf + return self._model - if not self.model_available.is_set(): - self.model_available.set() + def submit_for_training(self, X: npt.NDArray[np.float64], y: npt.NDArray[np.float64]): + self.shared_arrs.set_data(X, y) + self.data_queue.put((self.shared_arrs.shm_id, len(X))) diff --git a/smac/model/random_forest/random_forest.py b/smac/model/random_forest/random_forest.py index 2e5f527ef..cd82f6ee1 100644 --- a/smac/model/random_forest/random_forest.py +++ b/smac/model/random_forest/random_forest.py @@ -6,10 +6,9 @@ from ConfigSpace import ConfigurationSpace from pyrfr import regression -from pyrfr.regression import default_data_container as DataContainer - from smac.constants import N_TREES, VERY_SMALL_NUMBER -from smac.model.random_forest import AbstractRandomForest +from . 
import AbstractRandomForest +from .multiproc_util.RFTrainer import RFTrainer __copyright__ = "Copyright 2022, automl.org" @@ -76,20 +75,10 @@ def __init__( max_features = 0 if ratio_features > 1.0 else max(1, int(len(self._types) * ratio_features)) - self._rf_opts = regression.forest_opts() - self._rf_opts.num_trees = n_trees - self._rf_opts.do_bootstrapping = bootstrapping - self._rf_opts.tree_opts.max_features = max_features - self._rf_opts.tree_opts.min_samples_to_split = min_samples_split - self._rf_opts.tree_opts.min_samples_in_leaf = min_samples_leaf - self._rf_opts.tree_opts.max_depth = max_depth - self._rf_opts.tree_opts.epsilon_purity = eps_purity - self._rf_opts.tree_opts.max_num_nodes = max_nodes - self._rf_opts.compute_law_of_total_variance = False - self._rf = RFTrainer() + self._rf = RFTrainer(self._bounds, seed, n_trees, bootstrapping, max_features, min_samples_split, + min_samples_leaf, max_depth, eps_purity, max_nodes, n_points_per_tree) self._log_y = log_y - # Case to `int` incase we get an `np.integer` type self._rng = regression.default_random_engine(int(seed)) self._n_trees = n_trees @@ -143,12 +132,7 @@ def _train(self, X: np.ndarray, y: np.ndarray) -> RandomForest: # self.X = X # self.y = y.flatten() - if self._n_points_per_tree <= 0: - self._rf_opts.num_data_points_per_tree = X.shape[0] - else: - self._rf_opts.num_data_points_per_tree = self._n_points_per_tree - - self._rf.submit_for_training(X, y, self._rf_opts) + self._rf.submit_for_training(X, y) # call this to make sure that there exists a trained model before returning (actually, not sure this is # required, since we check within predict() anyway) @@ -156,35 +140,6 @@ def _train(self, X: np.ndarray, y: np.ndarray) -> RandomForest: return self - def _init_data_container(self, X: np.ndarray, y: np.ndarray) -> DataContainer: - """Fills a pyrfr default data container s.t. the forest knows categoricals and bounds for continous data. - - Parameters - ---------- - X : np.ndarray [#samples, #hyperparameter + #features] - Input data points. - Y : np.ndarray [#samples, #objectives] - The corresponding target values. - - Returns - ------- - data : DataContainer - The filled data container that pyrfr can interpret. 
- """ - # Retrieve the types and the bounds from the ConfigSpace - data = regression.default_data_container(X.shape[1]) - - for i, (mn, mx) in enumerate(self._bounds): - if np.isnan(mx): - data.set_type_of_feature(i, mn) - else: - data.set_bounds_of_feature(i, mn, mx) - - for row_X, row_y in zip(X, y): - data.add_data_point(row_X, row_y) - - return data - def _predict( self, X: np.ndarray, @@ -216,7 +171,7 @@ def _predict( third_dimension = max(max_num_leaf_data, third_dimension) # Transform list of 2d arrays into a 3d array - preds_as_array = np.zeros((X.shape[0], self._rf_opts.num_trees, third_dimension)) * np.nan + preds_as_array = np.zeros((X.shape[0], self._n_trees, third_dimension)) * np.nan for i, preds_per_tree in enumerate(all_preds): for j, pred in enumerate(preds_per_tree): preds_as_array[i, j, : len(pred)] = pred diff --git a/smac/model/random_forest/util.py b/smac/model/random_forest/util.py new file mode 100644 index 000000000..56d76753a --- /dev/null +++ b/smac/model/random_forest/util.py @@ -0,0 +1,43 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from typing import Iterable, TYPE_CHECKING + +import numpy as np +from pyrfr import regression + +if TYPE_CHECKING: + from pyrfr.regression import default_data_container as DataContainer + from numpy import typing as npt + + +def init_data_container( + X: npt.NDArray[np.float64], y: npt.NDArray[np.float64], bounds: Iterable[tuple[float, float]] +) -> DataContainer: + """Fills a pyrfr default data container s.t. the forest knows categoricals and bounds for continous data. + + Parameters + ---------- + X : np.ndarray [#samples, #hyperparameter + #features] + Input data points. + Y : np.ndarray [#samples, #objectives] + The corresponding target values. + + Returns + ------- + data : DataContainer + The filled data container that pyrfr can interpret. 
+ """ + # Retrieve the types and the bounds from the ConfigSpace + data = regression.default_data_container(X.shape[1]) + + for i, (mn, mx) in enumerate(bounds): + if np.isnan(mx): + data.set_type_of_feature(i, mn) + else: + data.set_bounds_of_feature(i, mn, mx) + + for row_X, row_y in zip(X, y): + data.add_data_point(row_X, row_y) + + return data From 75eb34edd011cfc403e44d7429cda4feef806a57 Mon Sep 17 00:00:00 2001 From: Bogdan Budescu Date: Wed, 11 Dec 2024 17:58:11 +0200 Subject: [PATCH 08/21] minor fixes in sync semantics --- .../random_forest/multiproc_util/RFTrainer.py | 35 ++++++++++++++----- 1 file changed, 27 insertions(+), 8 deletions(-) diff --git a/smac/model/random_forest/multiproc_util/RFTrainer.py b/smac/model/random_forest/multiproc_util/RFTrainer.py index dd1f7d8fc..2cebb7235 100644 --- a/smac/model/random_forest/multiproc_util/RFTrainer.py +++ b/smac/model/random_forest/multiproc_util/RFTrainer.py @@ -115,7 +115,7 @@ def close(self): self.training_loop_proc = None if self.model_queue is not None: - _ = self.model # flush the model queue, and return latest model + _ = self.model # try to flush the model queue, and store the latest model self.model_queue.close() self.model_queue.join_thread() del self.model_queue @@ -125,15 +125,34 @@ def __del__(self): self.close() @property - def model(self) -> Optional[BinaryForest]: - # discard all but the last model in the queue - while True: - try: - self._model = self.model_queue.get(block=False) - except queue.Empty: - break + def model(self) -> BinaryForest: + if self._model is None: + if self.model_queue is None: + raise RuntimeError('rf training loop process has been stopped before being able to train a model') + # wait until the first training is done + self._model = self.model_queue.get() + + if self.model_queue is not None: + # discard all but the last model in the queue + while True: + try: + self._model = self.model_queue.get(block=False) + except queue.Empty: + break return self._model def submit_for_training(self, X: npt.NDArray[np.float64], y: npt.NDArray[np.float64]): self.shared_arrs.set_data(X, y) + + if self.data_queue is None: + raise RuntimeError('rf training loop process has been stopped, so we cannot submit new training data') + + # flush queue before pushing new data onto it + while True: + try: + old_data = self.data_queue.get(block=False) + except queue.Empty: + break + else: + assert old_data != SHUTDOWN self.data_queue.put((self.shared_arrs.shm_id, len(X))) From ed506248b834b5ae5e62e59e49ad4e5e94c03862 Mon Sep 17 00:00:00 2001 From: Bogdan Budescu Date: Wed, 11 Dec 2024 19:05:47 +0200 Subject: [PATCH 09/21] minor fixes in sync --- .../random_forest/multiproc_util/RFTrainer.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/smac/model/random_forest/multiproc_util/RFTrainer.py b/smac/model/random_forest/multiproc_util/RFTrainer.py index 2cebb7235..99467bd12 100644 --- a/smac/model/random_forest/multiproc_util/RFTrainer.py +++ b/smac/model/random_forest/multiproc_util/RFTrainer.py @@ -42,8 +42,9 @@ def rf_training_loop( # Case to `int` incase we get an `np.integer` type rng = regression.default_random_engine(int(seed)) shared_arrs = GrowingSharedArrayReaderView(data_lock) + while True: - msg = data_queue.get() # wait for training data or shutdown signal + msg = data_queue.get() # if queue is empty, wait for training data or shutdown signal # discard all but the last msg in the queue while True: try: @@ -68,11 +69,13 @@ def rf_training_loop( rf.fit(data, rng) - # remove previous 
models from pipe, if any + # remove previous models from queue, if any, and replace them with the latest while True: - m = model_queue.get(block=False) - if m is None: + try: + old_rf = model_queue.get(block=False) + except queue.Empty: break + model_queue.put(rf) @@ -85,10 +88,10 @@ def __init__( max_depth: int, eps_purity: float, max_nodes: int, n_points_per_tree: int ) -> None: self._model: Optional[BinaryForest] = None - self.model_queue = Queue(maxsize=1) - self.data_queue = Queue(maxsize=1) self.shared_arrs = GrowingSharedArray() + self.model_queue = Queue(maxsize=1) + self.data_queue = Queue(maxsize=1) self.training_loop_proc = Process(daemon=True, target=rf_training_loop, name='rf_trainer', args=( self.model_queue, self.data_queue, self.shared_arrs.lock, tuple(bounds), seed, n_trees, bootstrapping, max_features, min_samples_split, min_samples_leaf, max_depth, eps_purity, max_nodes, n_points_per_tree From 65edb8d9e2715502417c57e27c893e0a0ede196b Mon Sep 17 00:00:00 2001 From: Bogdan Budescu Date: Thu, 12 Dec 2024 08:38:26 +0200 Subject: [PATCH 10/21] add flag to allow user switch to emulating the old behavior, i.e., to wait until training is done before being able to query the model to suggest a new config to try --- .../random_forest/multiproc_util/RFTrainer.py | 26 ++++++++++--------- 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/smac/model/random_forest/multiproc_util/RFTrainer.py b/smac/model/random_forest/multiproc_util/RFTrainer.py index 99467bd12..5463a2434 100644 --- a/smac/model/random_forest/multiproc_util/RFTrainer.py +++ b/smac/model/random_forest/multiproc_util/RFTrainer.py @@ -15,7 +15,7 @@ from ..util import init_data_container -SHUTDOWN = -1 +SHUTDOWN = None def rf_training_loop( @@ -25,7 +25,7 @@ def rf_training_loop( # rf opts n_trees: int, bootstrapping: bool, max_features: int, min_samples_split: int, min_samples_leaf: int, max_depth: int, eps_purity: float, max_nodes: int, n_points_per_tree: int -): +) -> None: rf_opts = regression.forest_opts() rf_opts.num_trees = n_trees rf_opts.do_bootstrapping = bootstrapping @@ -51,14 +51,12 @@ def rf_training_loop( msg = data_queue.get(block=False) except queue.Empty: break - - if msg == SHUTDOWN: - break + else: + if msg == SHUTDOWN: + return shm_id, size = msg - X, y = shared_arrs.get_data(shm_id, size) - data = init_data_container(X, y, bounds) if n_points_per_tree <= 0: @@ -66,16 +64,14 @@ def rf_training_loop( rf = BinaryForest() rf.options = rf_opts - rf.fit(data, rng) - # remove previous models from queue, if any, and replace them with the latest + # remove previous models from queue, if any, before pushing the latest model while True: try: - old_rf = model_queue.get(block=False) + _ = model_queue.get(block=False) except queue.Empty: break - model_queue.put(rf) @@ -85,10 +81,13 @@ def __init__( self, bounds: Iterable[tuple[float, float]], seed: int, # rf opts n_trees: int, bootstrapping: bool, max_features: int, min_samples_split: int, min_samples_leaf: int, - max_depth: int, eps_purity: float, max_nodes: int, n_points_per_tree: int + max_depth: int, eps_purity: float, max_nodes: int, n_points_per_tree: int, + # process synchronization + sync: bool = False ) -> None: self._model: Optional[BinaryForest] = None self.shared_arrs = GrowingSharedArray() + self.sync = sync self.model_queue = Queue(maxsize=1) self.data_queue = Queue(maxsize=1) @@ -159,3 +158,6 @@ def submit_for_training(self, X: npt.NDArray[np.float64], y: npt.NDArray[np.floa else: assert old_data != SHUTDOWN 
self.data_queue.put((self.shared_arrs.shm_id, len(X))) + + if self.sync: + self._model = self.model_queue.get() From 818ffd0e35b56a09932790dd7abca7bb3f7fa4c4 Mon Sep 17 00:00:00 2001 From: Bogdan Budescu Date: Thu, 12 Dec 2024 12:56:10 +0200 Subject: [PATCH 11/21] - fix bugs in arrays backed by shared memory - better synchronization / signalling between optimization loop and training loop - refactor: - improve shared array semantics - encapsulation: reuse more allocation / cleanup code - defensive: extra checks - other minor fixes / improvements --- .../multiproc_util/GrowingSharedArray.py | 80 ++++++++++++------- .../random_forest/multiproc_util/RFTrainer.py | 80 ++++++++++++++----- smac/model/random_forest/random_forest.py | 3 + smac/model/random_forest/util.py | 2 + tests/test_model/test_rf.py | 2 +- 5 files changed, 118 insertions(+), 49 deletions(-) diff --git a/smac/model/random_forest/multiproc_util/GrowingSharedArray.py b/smac/model/random_forest/multiproc_util/GrowingSharedArray.py index 15045fe06..6d86034a9 100644 --- a/smac/model/random_forest/multiproc_util/GrowingSharedArray.py +++ b/smac/model/random_forest/multiproc_util/GrowingSharedArray.py @@ -23,55 +23,68 @@ def __init__(self, lock: Lock): self.shm_id: Optional[int] = None self.shm_X: Optional[shared_memory.SharedMemory] = None self.shm_y: Optional[shared_memory.SharedMemory] = None + self.size: Optional[int] = None - def open(self, shm_id: int): + def open(self, shm_id: int, size: int): if shm_id != self.shm_id: self.close() self.shm_X = shared_memory.SharedMemory(f'{self.basename_X}_{shm_id}') self.shm_y = shared_memory.SharedMemory(f'{self.basename_y}_{shm_id}') self.shm_id = shm_id + self.size = size - def close(self): + def close_impl(self, unlink=False): if self.shm_X is not None: self.shm_X.close() + if unlink: + self.shm_X.unlink() del self.shm_X self.shm_X = None if self.shm_y is not None: self.shm_y.close() + if unlink: + self.shm_y.unlink() del self.shm_y self.shm_y = None self.shm_id = None + self.size = None + + def close(self): + self.close_impl() def __del__(self): self.close() @property - def capacity(self) -> Optional[int]: + def capacity(self) -> int: if self.shm_y is None: - return None - assert self.shm_y.size % np.float64.itemsize == 0 - return self.shm_y.size / np.float64.itemsize + return 0 + assert self.shm_y.size % np.dtype(np.float64).itemsize == 0 + return self.shm_y.size // np.dtype(np.float64).itemsize @property def row_size(self) -> Optional[int]: if self.shm_X is None: return None if self.shm_X.size == 0: - assert self.shm_y.size == 0 - return 0 + return None assert self.shm_X.size % self.shm_y.size == 0 return self.shm_X.size // self.shm_y.size - def np_view(self, size: int) -> tuple[npt.NDArray[np.float64], npt.NDArray[np.float64]]: + @property + def X(self): X = np.ndarray(shape=(self.capacity, self.row_size), dtype=np.float64, buffer=self.shm_X.buf) + return X[:self.size] + + @property + def y(self): y = np.ndarray(shape=(self.capacity,), dtype=np.float64, buffer=self.shm_y.buf) - return X[:size], y[:size] + return y[:self.size] def get_data(self, shm_id: int, size: int) -> tuple[npt.NDArray[np.float64], npt.NDArray[np.float64]]: with self.lock: - self.open(shm_id) - shared_X, shared_y = self.np_view(size) - X, y = np.array(shared_X), np.array(shared_y) # make copies + self.open(shm_id, size) + X, y = np.array(self.X), np.array(self.y) # make copies and release lock to minimize critical section return X, y @@ -81,6 +94,12 @@ def __init__(self): self.growth_rate = 1.5 
super().__init__(lock=Lock()) + def close(self): + self.close_impl(unlink=True) + + def __del__(self): + self.close() + def set_data(self, X: npt.NDArray[np.float64], y: npt.NDArray[np.float64]) -> None: assert len(X) == len(y) assert X.ndim == 2 @@ -96,33 +115,34 @@ def set_data(self, X: npt.NDArray[np.float64], y: npt.NDArray[np.float64]) -> No if self.capacity: n_growth = math.ceil(math.log(size / self.capacity, self.growth_rate)) capacity = int(math.ceil(self.capacity * self.growth_rate ** n_growth)) - self.shm_id += 1 + shm_id = self.shm_id + 1 else: assert self.shm_X is None assert self.shm_y is None capacity = size - self.shm_id = 0 + shm_id = 0 + row_size = X.shape[1] if self.row_size is not None: - assert X.shape[1] == self.row_size - - shm_X = shared_memory.SharedMemory(f'{self.basename_X}_{self.shm_id}', create=True, - size=capacity * self.row_size * X.dtype.itemsize) - shm_y = shared_memory.SharedMemory(f'{self.basename_y}_{self.shm_id}', create=True, + assert row_size == self.row_size + shm_X = shared_memory.SharedMemory(f'{self.basename_X}_{shm_id}', create=True, + size=capacity * row_size * X.dtype.itemsize) + shm_y = shared_memory.SharedMemory(f'{self.basename_y}_{shm_id}', create=True, size=capacity * y.dtype.itemsize) with self.lock: if grow: if self.capacity: - assert self.shm_X is not None - self.shm_X.close() - self.shm_X.unlink() - assert self.shm_y is not None - self.shm_y.close() - self.shm_y.unlink() + # TODO: here before rallocating we unlink the underlying shared memory without making sure that the + # training loop process has had a chance to close it first, so this might lead to some warnings + # references: + # - https://stackoverflow.com/a/63004750/2447427 + # - https://github.com/python/cpython/issues/84140 + # - https://github.com/python/cpython/issues/82300 - provides a fix that turns off tracking + self.close() self.shm_X = shm_X self.shm_y = shm_y - X_buf, y_buf = self.np_view(size) - X_buf[...] = X - y_buf[...] = y - + self.shm_id = shm_id + self.size = size + self.X[...] = X + self.y[...] 
= y diff --git a/smac/model/random_forest/multiproc_util/RFTrainer.py b/smac/model/random_forest/multiproc_util/RFTrainer.py index 5463a2434..c4c10c634 100644 --- a/smac/model/random_forest/multiproc_util/RFTrainer.py +++ b/smac/model/random_forest/multiproc_util/RFTrainer.py @@ -45,6 +45,8 @@ def rf_training_loop( while True: msg = data_queue.get() # if queue is empty, wait for training data or shutdown signal + must_shutdown = msg == SHUTDOWN + # discard all but the last msg in the queue while True: try: @@ -52,11 +54,15 @@ def rf_training_loop( except queue.Empty: break else: - if msg == SHUTDOWN: - return + must_shutdown |= msg == SHUTDOWN + if must_shutdown: + shared_arrs.close() + model_queue.put(SHUTDOWN) + break shm_id, size = msg X, y = shared_arrs.get_data(shm_id, size) + # TODO: when shm_id changes, notify main thread it can call unlink the shared memory bc we called close() on it data = init_data_container(X, y, bounds) if n_points_per_tree <= 0: @@ -76,19 +82,34 @@ def rf_training_loop( class RFTrainer: - def __init__( - # init rf train - self, bounds: Iterable[tuple[float, float]], seed: int, - # rf opts - n_trees: int, bootstrapping: bool, max_features: int, min_samples_split: int, min_samples_leaf: int, - max_depth: int, eps_purity: float, max_nodes: int, n_points_per_tree: int, - # process synchronization - sync: bool = False - ) -> None: - self._model: Optional[BinaryForest] = None - self.shared_arrs = GrowingSharedArray() + def __init__(self, + # init rf train + bounds: Iterable[tuple[float, float]], seed: int, + # rf opts + n_trees: int, bootstrapping: bool, max_features: int, min_samples_split: int, min_samples_leaf: int, + max_depth: int, eps_purity: float, max_nodes: int, n_points_per_tree: int, + # process synchronization + sync: bool = False) -> None: self.sync = sync + self._model: Optional[BinaryForest] = None + self.shared_arrs: Optional[GrowingSharedArray] = None + self.model_queue: Optional[Queue] = None + self.data_queue: Optional[Queue] = None + self.training_loop_proc: Optional[Process] = None + + self.open(bounds, seed, n_trees, bootstrapping, max_features, min_samples_split, min_samples_leaf, max_depth, + eps_purity, max_nodes, n_points_per_tree) + + super().__init__() + + def open(self, + # init rf train + bounds: Iterable[tuple[float, float]], seed: int, + # rf opts + n_trees: int, bootstrapping: bool, max_features: int, min_samples_split: int, min_samples_leaf: int, + max_depth: int, eps_purity: float, max_nodes: int, n_points_per_tree: int) -> None: + self.shared_arrs = GrowingSharedArray() self.model_queue = Queue(maxsize=1) self.data_queue = Queue(maxsize=1) self.training_loop_proc = Process(daemon=True, target=rf_training_loop, name='rf_trainer', args=( @@ -97,8 +118,6 @@ def __init__( )) self.training_loop_proc.start() - super().__init__() - def close(self): # I think this might be redundant, since according to the official docs, close and join_thread are called # anyway when garbage-collecting queues, and we don't use JoinableQueues @@ -117,12 +136,28 @@ def close(self): self.training_loop_proc = None if self.model_queue is not None: - _ = self.model # try to flush the model queue, and store the latest model + # flush the model queue, and store the latest model + while True: + msg = self.model_queue.get() + # wait for SHUTDOWN message, because that guarantees that shared_arrs.close() has been called within + # the training process; this way we make sure we call unlink only after close has had the chance to be + # called within the child process + 
if msg == SHUTDOWN: + break + else: + self._model = msg self.model_queue.close() self.model_queue.join_thread() del self.model_queue self.model_queue = None + # make sure this is called after SHUTDOWN was received because we want the trainer process to call + # shared_arrs.close() before we call unlink + if self.shared_arrs is not None: + self.shared_arrs.close() + del self.shared_arrs + self.shared_arrs = None + def __del__(self): self.close() @@ -132,15 +167,24 @@ def model(self) -> BinaryForest: if self.model_queue is None: raise RuntimeError('rf training loop process has been stopped before being able to train a model') # wait until the first training is done - self._model = self.model_queue.get() + msg = self.model_queue.get() + if msg == SHUTDOWN: + raise RuntimeError("the shutdown message wasn't supposed to end up here") + else: + self._model = msg if self.model_queue is not None: # discard all but the last model in the queue while True: try: - self._model = self.model_queue.get(block=False) + msg = self.model_queue.get(block=False) except queue.Empty: break + else: + if msg == SHUTDOWN: + raise RuntimeError("the shutdown message wasn't supposed to end up here") + else: + self._model = msg return self._model def submit_for_training(self, X: npt.NDArray[np.float64], y: npt.NDArray[np.float64]): diff --git a/smac/model/random_forest/random_forest.py b/smac/model/random_forest/random_forest.py index cd82f6ee1..ce1bd950a 100644 --- a/smac/model/random_forest/random_forest.py +++ b/smac/model/random_forest/random_forest.py @@ -105,6 +105,9 @@ def __init__( # self._seed, # ] + def __del__(self): + self._rf.close() + @property def meta(self) -> dict[str, Any]: # noqa: D102 meta = super().meta diff --git a/smac/model/random_forest/util.py b/smac/model/random_forest/util.py index 56d76753a..973445a3b 100644 --- a/smac/model/random_forest/util.py +++ b/smac/model/random_forest/util.py @@ -1,6 +1,8 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- +from __future__ import annotations + from typing import Iterable, TYPE_CHECKING import numpy as np diff --git a/tests/test_model/test_rf.py b/tests/test_model/test_rf.py index c81549a16..8fd80e5d2 100644 --- a/tests/test_model/test_rf.py +++ b/tests/test_model/test_rf.py @@ -19,7 +19,7 @@ def _get_cs(n_dimensions): configspace = ConfigurationSpace(seed=0) for i in range(n_dimensions): - configspace.add_hyperparameter(UniformFloatHyperparameter("x%d" % i, 0, 1)) + configspace.add(UniformFloatHyperparameter("x%d" % i, 0, 1)) return configspace From cc89e1646bb65f57eb96237dcdf770f49ee43eac Mon Sep 17 00:00:00 2001 From: Bogdan Budescu Date: Thu, 12 Dec 2024 13:07:20 +0200 Subject: [PATCH 12/21] disable resource tracking for shared memory instances to avoid warnings --- .../multiproc_util/GrowingSharedArray.py | 27 +++++----- .../random_forest/multiproc_util/RFTrainer.py | 4 +- .../multiproc_util/SharedMemory.py | 52 +++++++++++++++++++ smac/model/random_forest/random_forest.py | 12 ++--- 4 files changed, 76 insertions(+), 19 deletions(-) create mode 100644 smac/model/random_forest/multiproc_util/SharedMemory.py diff --git a/smac/model/random_forest/multiproc_util/GrowingSharedArray.py b/smac/model/random_forest/multiproc_util/GrowingSharedArray.py index 6d86034a9..afcaec2d4 100644 --- a/smac/model/random_forest/multiproc_util/GrowingSharedArray.py +++ b/smac/model/random_forest/multiproc_util/GrowingSharedArray.py @@ -4,7 +4,8 @@ from typing import Optional import math -from multiprocessing import Lock, shared_memory +from multiprocessing import Lock 
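+# SharedMemory here is a thin local wrapper around multiprocessing.shared_memory.SharedMemory whose `track`
+# flag can skip resource-tracker registration, so the tracker does not warn about (or prematurely unlink)
+# segments that the other process is still using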
+from .SharedMemory import SharedMemory import numpy as np from numpy import typing as npt @@ -21,15 +22,15 @@ class GrowingSharedArrayReaderView: def __init__(self, lock: Lock): self.lock = lock self.shm_id: Optional[int] = None - self.shm_X: Optional[shared_memory.SharedMemory] = None - self.shm_y: Optional[shared_memory.SharedMemory] = None + self.shm_X: Optional[SharedMemory] = None + self.shm_y: Optional[SharedMemory] = None self.size: Optional[int] = None def open(self, shm_id: int, size: int): if shm_id != self.shm_id: self.close() - self.shm_X = shared_memory.SharedMemory(f'{self.basename_X}_{shm_id}') - self.shm_y = shared_memory.SharedMemory(f'{self.basename_y}_{shm_id}') + self.shm_X = SharedMemory(f'{self.basename_X}_{shm_id}', track=False) + self.shm_y = SharedMemory(f'{self.basename_y}_{shm_id}', track=False) self.shm_id = shm_id self.size = size @@ -125,20 +126,22 @@ def set_data(self, X: npt.NDArray[np.float64], y: npt.NDArray[np.float64]) -> No row_size = X.shape[1] if self.row_size is not None: assert row_size == self.row_size - shm_X = shared_memory.SharedMemory(f'{self.basename_X}_{shm_id}', create=True, - size=capacity * row_size * X.dtype.itemsize) - shm_y = shared_memory.SharedMemory(f'{self.basename_y}_{shm_id}', create=True, - size=capacity * y.dtype.itemsize) + shm_X = SharedMemory(f'{self.basename_X}_{shm_id}', create=True, + size=capacity * row_size * X.dtype.itemsize, track=False) + shm_y = SharedMemory(f'{self.basename_y}_{shm_id}', create=True, size=capacity * y.dtype.itemsize, + track=False) with self.lock: if grow: if self.capacity: - # TODO: here before rallocating we unlink the underlying shared memory without making sure that the - # training loop process has had a chance to close it first, so this might lead to some warnings + # here, before, reallocating we unlink the underlying shared memory without making sure that the + # training loop process has had a chance to close() it first, so this might lead to some warnings # references: # - https://stackoverflow.com/a/63004750/2447427 # - https://github.com/python/cpython/issues/84140 - # - https://github.com/python/cpython/issues/82300 - provides a fix that turns off tracking + # - https://github.com/python/cpython/issues/82300 + # - comment provides a fix that turns off tracking: + # https://github.com/python/cpython/issues/82300#issuecomment-2169035092 self.close() self.shm_X = shm_X self.shm_y = shm_y diff --git a/smac/model/random_forest/multiproc_util/RFTrainer.py b/smac/model/random_forest/multiproc_util/RFTrainer.py index c4c10c634..2b79eced2 100644 --- a/smac/model/random_forest/multiproc_util/RFTrainer.py +++ b/smac/model/random_forest/multiproc_util/RFTrainer.py @@ -62,7 +62,9 @@ def rf_training_loop( shm_id, size = msg X, y = shared_arrs.get_data(shm_id, size) - # TODO: when shm_id changes, notify main thread it can call unlink the shared memory bc we called close() on it + # when shm_id changes, here we should notify main thread it can call unlink the shared memory bc we called + # close() on it + # UPDATE: we avoided the warnings by disabling tracking for shared memory data = init_data_container(X, y, bounds) if n_points_per_tree <= 0: diff --git a/smac/model/random_forest/multiproc_util/SharedMemory.py b/smac/model/random_forest/multiproc_util/SharedMemory.py new file mode 100644 index 000000000..6420a83ac --- /dev/null +++ b/smac/model/random_forest/multiproc_util/SharedMemory.py @@ -0,0 +1,52 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + 
+"""https://github.com/python/cpython/issues/82300#issuecomment-2169035092 +""" + +from typing import Optional +import sys +import threading +from multiprocessing import resource_tracker as _mprt +from multiprocessing import shared_memory as _mpshm + + +if sys.version_info >= (3, 13): + SharedMemory = _mpshm.SharedMemory +else: + class SharedMemory(_mpshm.SharedMemory): + __lock = threading.Lock() + + def __init__( + self, name: Optional[str] = None, create: bool = False, + size: int = 0, *, track: bool = True + ) -> None: + self._track = track + + # if tracking, normal init will suffice + if track: + return super().__init__(name=name, create=create, size=size) + + # lock so that other threads don't attempt to use the + # register function during this time + with self.__lock: + # temporarily disable registration during initialization + orig_register = _mprt.register + _mprt.register = self.__tmp_register + + # initialize; ensure original register function is + # re-instated + try: + super().__init__(name=name, create=create, size=size) + finally: + _mprt.register = orig_register + + @staticmethod + def __tmp_register(*args, **kwargs) -> None: + return + + def unlink(self) -> None: + if _mpshm._USE_POSIX and self._name: + _mpshm._posixshmem.shm_unlink(self._name) + if self._track: + _mprt.unregister(self._name, "shared_memory") \ No newline at end of file diff --git a/smac/model/random_forest/random_forest.py b/smac/model/random_forest/random_forest.py index ce1bd950a..5b5834bfb 100644 --- a/smac/model/random_forest/random_forest.py +++ b/smac/model/random_forest/random_forest.py @@ -75,8 +75,8 @@ def __init__( max_features = 0 if ratio_features > 1.0 else max(1, int(len(self._types) * ratio_features)) - self._rf = RFTrainer(self._bounds, seed, n_trees, bootstrapping, max_features, min_samples_split, - min_samples_leaf, max_depth, eps_purity, max_nodes, n_points_per_tree) + self._rf_trainer = RFTrainer(self._bounds, seed, n_trees, bootstrapping, max_features, min_samples_split, + min_samples_leaf, max_depth, eps_purity, max_nodes, n_points_per_tree) self._log_y = log_y self._rng = regression.default_random_engine(int(seed)) @@ -106,7 +106,7 @@ def __init__( # ] def __del__(self): - self._rf.close() + self._rf_trainer.close() @property def meta(self) -> dict[str, Any]: # noqa: D102 @@ -135,7 +135,7 @@ def _train(self, X: np.ndarray, y: np.ndarray) -> RandomForest: # self.X = X # self.y = y.flatten() - self._rf.submit_for_training(X, y) + self._rf_trainer.submit_for_training(X, y) # call this to make sure that there exists a trained model before returning (actually, not sure this is # required, since we check within predict() anyway) @@ -157,7 +157,7 @@ def _predict( if covariance_type != "diagonal": raise ValueError("`covariance_type` can only take `diagonal` for this model.") - rf = self._rf.model + rf = self._rf_trainer.model assert rf is not None X = self._impute_inactive(X) @@ -234,7 +234,7 @@ def predict_marginalized(self, X: np.ndarray) -> tuple[np.ndarray, np.ndarray]: if X.shape[1] != len(self._bounds): raise ValueError("Rows in X should have %d entries but have %d!" 
% (len(self._bounds), X.shape[1])) - rf = self._rf.model + rf = self._rf_trainer.model assert rf is not None X = self._impute_inactive(X) From 880e0cc3f714a25473640781b1cb5935e88932e1 Mon Sep 17 00:00:00 2001 From: Bogdan Budescu Date: Thu, 12 Dec 2024 13:55:56 +0200 Subject: [PATCH 13/21] use uuids instead of incremental ids to avoid collisions when running multiple sessions in parallel, e.g., when running tests --- .../random_forest/multiproc_util/GrowingSharedArray.py | 6 +++--- smac/model/random_forest/multiproc_util/RFTrainer.py | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/smac/model/random_forest/multiproc_util/GrowingSharedArray.py b/smac/model/random_forest/multiproc_util/GrowingSharedArray.py index afcaec2d4..92cf1b51e 100644 --- a/smac/model/random_forest/multiproc_util/GrowingSharedArray.py +++ b/smac/model/random_forest/multiproc_util/GrowingSharedArray.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- - +import uuid from typing import Optional import math @@ -116,12 +116,12 @@ def set_data(self, X: npt.NDArray[np.float64], y: npt.NDArray[np.float64]) -> No if self.capacity: n_growth = math.ceil(math.log(size / self.capacity, self.growth_rate)) capacity = int(math.ceil(self.capacity * self.growth_rate ** n_growth)) - shm_id = self.shm_id + 1 else: assert self.shm_X is None assert self.shm_y is None capacity = size - shm_id = 0 + + shm_id = uuid.uuid4().int row_size = X.shape[1] if self.row_size is not None: diff --git a/smac/model/random_forest/multiproc_util/RFTrainer.py b/smac/model/random_forest/multiproc_util/RFTrainer.py index 2b79eced2..074cc9efd 100644 --- a/smac/model/random_forest/multiproc_util/RFTrainer.py +++ b/smac/model/random_forest/multiproc_util/RFTrainer.py @@ -54,7 +54,7 @@ def rf_training_loop( except queue.Empty: break else: - must_shutdown |= msg == SHUTDOWN + must_shutdown = must_shutdown or msg == SHUTDOWN if must_shutdown: shared_arrs.close() model_queue.put(SHUTDOWN) @@ -133,7 +133,7 @@ def close(self): if self.training_loop_proc is not None: # wait for training to finish - self.training_loop_proc.join() + self.training_loop_proc.join() # TODO: fix: this happens to hang del self.training_loop_proc self.training_loop_proc = None From 8458d416c6323eff9668527d5d21b9084ca7d084 Mon Sep 17 00:00:00 2001 From: Bogdan Budescu Date: Thu, 12 Dec 2024 16:49:37 +0200 Subject: [PATCH 14/21] minor aesthetic refactors --- .../multiproc_util/GrowingSharedArray.py | 18 +++++++++++------- .../multiproc_util/SharedMemory.py | 3 ++- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/smac/model/random_forest/multiproc_util/GrowingSharedArray.py b/smac/model/random_forest/multiproc_util/GrowingSharedArray.py index 92cf1b51e..1f378829d 100644 --- a/smac/model/random_forest/multiproc_util/GrowingSharedArray.py +++ b/smac/model/random_forest/multiproc_util/GrowingSharedArray.py @@ -5,7 +5,12 @@ import math from multiprocessing import Lock -from .SharedMemory import SharedMemory + +# from multiprocessing.shared_memory import SharedMemory +from .SharedMemory import SharedMemory as TrackableSharedMemory +def SharedMemory(*args, **kwargs) -> TrackableSharedMemory: + return TrackableSharedMemory(*args, track=False, **kwargs) + import numpy as np from numpy import typing as npt @@ -29,8 +34,8 @@ def __init__(self, lock: Lock): def open(self, shm_id: int, size: int): if shm_id != self.shm_id: self.close() - self.shm_X = SharedMemory(f'{self.basename_X}_{shm_id}', track=False) - self.shm_y = 
SharedMemory(f'{self.basename_y}_{shm_id}', track=False) + self.shm_X = SharedMemory(f'{self.basename_X}_{shm_id}') + self.shm_y = SharedMemory(f'{self.basename_y}_{shm_id}') self.shm_id = shm_id self.size = size @@ -121,15 +126,14 @@ def set_data(self, X: npt.NDArray[np.float64], y: npt.NDArray[np.float64]) -> No assert self.shm_y is None capacity = size - shm_id = uuid.uuid4().int + shm_id = uuid.uuid4().int # self.shm_id + 1 if self.shm_id else 0 row_size = X.shape[1] if self.row_size is not None: assert row_size == self.row_size shm_X = SharedMemory(f'{self.basename_X}_{shm_id}', create=True, - size=capacity * row_size * X.dtype.itemsize, track=False) - shm_y = SharedMemory(f'{self.basename_y}_{shm_id}', create=True, size=capacity * y.dtype.itemsize, - track=False) + size=capacity * row_size * X.dtype.itemsize) + shm_y = SharedMemory(f'{self.basename_y}_{shm_id}', create=True, size=capacity * y.dtype.itemsize) with self.lock: if grow: diff --git a/smac/model/random_forest/multiproc_util/SharedMemory.py b/smac/model/random_forest/multiproc_util/SharedMemory.py index 6420a83ac..31dc5b194 100644 --- a/smac/model/random_forest/multiproc_util/SharedMemory.py +++ b/smac/model/random_forest/multiproc_util/SharedMemory.py @@ -25,7 +25,8 @@ def __init__( # if tracking, normal init will suffice if track: - return super().__init__(name=name, create=create, size=size) + super().__init__(name=name, create=create, size=size) + return # lock so that other threads don't attempt to use the # register function during this time From ae9b821b4afd54957eab91a92ba8780604e61bfd Mon Sep 17 00:00:00 2001 From: Bogdan Budescu Date: Thu, 12 Dec 2024 16:51:03 +0200 Subject: [PATCH 15/21] more minor aesthetic refactors --- tests/test_acquisition/test_maximizers.py | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/tests/test_acquisition/test_maximizers.py b/tests/test_acquisition/test_maximizers.py index d7698e0a2..6407540db 100644 --- a/tests/test_acquisition/test_maximizers.py +++ b/tests/test_acquisition/test_maximizers.py @@ -1,21 +1,14 @@ from __future__ import annotations -from typing import Any - -import os import unittest import unittest.mock import numpy as np import pytest from ConfigSpace import ( - Categorical, Configuration, ConfigurationSpace, - EqualsCondition, Float, - InCondition, - Integer, ) from ConfigSpace.hyperparameters import ( BetaIntegerHyperparameter, @@ -24,7 +17,6 @@ UniformFloatHyperparameter, UniformIntegerHyperparameter, ) -from ConfigSpace.read_and_write import pcs from scipy.spatial.distance import euclidean from smac.acquisition.function import EI @@ -35,8 +27,6 @@ RandomSearch, ) from smac.model.random_forest.random_forest import RandomForest -from smac.runhistory.runhistory import RunHistory -from smac.runner.abstract_runner import StatusType __copyright__ = "Copyright 2021, AutoML.org Freiburg-Hannover" __license__ = "3-clause BSD" From f05d0bf95fc1ef17659fef5b1dbe4e583f4fbbdb Mon Sep 17 00:00:00 2001 From: Bogdan Budescu Date: Fri, 13 Dec 2024 10:21:34 +0200 Subject: [PATCH 16/21] - add cleanup semantics in hierarchy of objects containing random forests to terminate the training loop process gracefully (to as high as an extent as possible) - add (and then disable) some code that prints to console to help debug inter-process synchronization - refactor: renames for improved legibility - refactor: encapsulate and reuse - add option to run testing code without pytest for debug - modify some testing code to avoid deprecation warnings --- 
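The cleanup additions below all follow one cascade: every object that directly or indirectly owns the random
forest gains a close() that forwards to its children, with __del__ kept only as a best-effort fallback, so the
background training process can be shut down deterministically. A minimal sketch of the pattern, with
illustrative names rather than the actual SMAC classes:

class Owner:
    def __init__(self, child):
        self.child = child  # e.g. a facade owning a model, or a model owning an RFTrainer

    def close(self):
        # idempotent: forward the shutdown to whatever we own
        if self.child is not None:
            self.child.close()
            self.child = None

    def __del__(self):
        # fallback only; explicit close() is preferred because __del__ may run late, or not at all,
        # when reference cycles keep the object graph alive for the garbage collector
        self.close()

Calling close() explicitly, as the reworked test fixtures do, avoids exactly the garbage-collection hang
described in the test comments further down.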
.../function/abstract_acquisition_function.py | 7 ++ .../abstract_acqusition_maximizer.py | 7 ++ smac/facade/abstract_facade.py | 13 +++ smac/intensifier/abstract_intensifier.py | 7 ++ smac/main/config_selector.py | 11 +++ smac/model/abstract_model.py | 6 ++ .../multiproc_util/GrowingSharedArray.py | 6 +- .../random_forest/multiproc_util/RFTrainer.py | 88 ++++++++++++------ smac/model/random_forest/random_forest.py | 5 +- tests/test_acquisition/test_maximizers.py | 90 +++++++++++++++++-- .../test_ask_and_tell_intensifier.py | 28 +++++- 11 files changed, 228 insertions(+), 40 deletions(-) diff --git a/smac/acquisition/function/abstract_acquisition_function.py b/smac/acquisition/function/abstract_acquisition_function.py index 519f5b3d0..a0fb3533f 100644 --- a/smac/acquisition/function/abstract_acquisition_function.py +++ b/smac/acquisition/function/abstract_acquisition_function.py @@ -23,6 +23,13 @@ class AbstractAcquisitionFunction: def __init__(self) -> None: self._model: AbstractModel | None = None + def close(self): + if self._model: + self._model.close() + + def __del__(self): + self.close() + @property def name(self) -> str: """Returns the full name of the acquisition function.""" diff --git a/smac/acquisition/maximizer/abstract_acqusition_maximizer.py b/smac/acquisition/maximizer/abstract_acqusition_maximizer.py index e148cb6ed..4892e021c 100644 --- a/smac/acquisition/maximizer/abstract_acqusition_maximizer.py +++ b/smac/acquisition/maximizer/abstract_acqusition_maximizer.py @@ -46,6 +46,13 @@ def __init__( self._seed = seed self._rng = np.random.RandomState(seed=seed) + def close(self): + if self.acquisition_function: + self.acquisition_function.close() + + def __del__(self): + self.close() + @property def acquisition_function(self) -> AbstractAcquisitionFunction | None: """The acquisition function used for maximization.""" diff --git a/smac/facade/abstract_facade.py b/smac/facade/abstract_facade.py index 9a2031099..e0e4a7530 100644 --- a/smac/facade/abstract_facade.py +++ b/smac/facade/abstract_facade.py @@ -230,6 +230,19 @@ def __init__( # every time new information are available self._optimizer.register_callback(self._intensifier.get_callback(), index=0) + def close(self): + if self._model: + self._model.close() + if self._acquisition_function: + self._acquisition_function.close() + if self._acquisition_maximizer: + self._acquisition_maximizer.close() + if self._config_selector: + self._config_selector.close() + + def __del__(self): + self.close() + @property def scenario(self) -> Scenario: """The scenario object which holds all environment information.""" diff --git a/smac/intensifier/abstract_intensifier.py b/smac/intensifier/abstract_intensifier.py index 41c80c22f..5eed76d66 100644 --- a/smac/intensifier/abstract_intensifier.py +++ b/smac/intensifier/abstract_intensifier.py @@ -80,6 +80,13 @@ def __init__( # Reset everything self.reset() + def close(self): + if self._config_selector: + self._config_selector.close() + + def __del__(self): + self.close() + def reset(self) -> None: """Reset the internal variables of the intensifier.""" self._tf_seeds: list[int] = [] diff --git a/smac/main/config_selector.py b/smac/main/config_selector.py index 4e3574d58..ea688ace7 100644 --- a/smac/main/config_selector.py +++ b/smac/main/config_selector.py @@ -82,6 +82,17 @@ def __init__( # Processed configurations should be stored here; this is important to not return the same configuration twice self._processed_configs: list[Configuration] = [] + def close(self): + if self._model: + 
self._model.close() + if self._acquisition_maximizer: + self._acquisition_maximizer.close() + if self._acquisition_function: + self._acquisition_function.close() + + def __del__(self): + self.close() + def _set_components( self, initial_design: AbstractInitialDesign, diff --git a/smac/model/abstract_model.py b/smac/model/abstract_model.py index 80a7312c4..f6106a202 100644 --- a/smac/model/abstract_model.py +++ b/smac/model/abstract_model.py @@ -82,6 +82,12 @@ def __init__( # Initial types array which is used to reset the type array at every call to `self.train()` self._initial_types = copy.deepcopy(self._types) + def close(self): + pass + + def __del__(self): + self.close() + @property def meta(self) -> dict[str, Any]: """Returns the meta data of the created object.""" diff --git a/smac/model/random_forest/multiproc_util/GrowingSharedArray.py b/smac/model/random_forest/multiproc_util/GrowingSharedArray.py index 1f378829d..e060c64f6 100644 --- a/smac/model/random_forest/multiproc_util/GrowingSharedArray.py +++ b/smac/model/random_forest/multiproc_util/GrowingSharedArray.py @@ -7,9 +7,9 @@ from multiprocessing import Lock # from multiprocessing.shared_memory import SharedMemory -from .SharedMemory import SharedMemory as TrackableSharedMemory -def SharedMemory(*args, **kwargs) -> TrackableSharedMemory: - return TrackableSharedMemory(*args, track=False, **kwargs) +from .SharedMemory import SharedMemory as UntrackableSharedMemory +def SharedMemory(*args, **kwargs) -> UntrackableSharedMemory: + return UntrackableSharedMemory(*args, track=False, **kwargs) import numpy as np diff --git a/smac/model/random_forest/multiproc_util/RFTrainer.py b/smac/model/random_forest/multiproc_util/RFTrainer.py index 074cc9efd..76f355802 100644 --- a/smac/model/random_forest/multiproc_util/RFTrainer.py +++ b/smac/model/random_forest/multiproc_util/RFTrainer.py @@ -1,7 +1,7 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- -from typing import Iterable, Optional +from typing import Iterable, Optional, Union from multiprocessing import Lock, Queue, Process import queue @@ -44,8 +44,12 @@ def rf_training_loop( shared_arrs = GrowingSharedArrayReaderView(data_lock) while True: + # print('TRAINER WAIT MSG', flush=True) msg = data_queue.get() # if queue is empty, wait for training data or shutdown signal + # print(f'TRAINER GOT MSG: {msg}', flush=True) must_shutdown = msg == SHUTDOWN + # if must_shutdown: + # print(f'TRAINER GOT SHUTDOWN 1', flush=True) # discard all but the last msg in the queue while True: @@ -54,10 +58,18 @@ def rf_training_loop( except queue.Empty: break else: + # if msg == SHUTDOWN: + # print(f'TRAINER GOT SHUTDOWN 2', flush=True) must_shutdown = must_shutdown or msg == SHUTDOWN if must_shutdown: shared_arrs.close() + # TODO: empty queue before pushing SHUTDOWN + # print(f'TRAINER SENDS SHUTDOWN CONFIRMATION', flush=True) model_queue.put(SHUTDOWN) + # print(f'TRAINER FINISHED SEND SHUTDOWN CONFIRMATION', flush=True) + model_queue.close() + # model_queue.join_thread() # TODO: enable this again + # print(f'TRAINER BYE BYE', flush=True) break shm_id, size = msg @@ -74,13 +86,17 @@ def rf_training_loop( rf.options = rf_opts rf.fit(data, rng) + # print(f'TRAINER FINISHED TRAINING', flush=True) + # remove previous models from queue, if any, before pushing the latest model while True: try: _ = model_queue.get(block=False) except queue.Empty: break + # print(f'TRAINER SENDING MODEL', flush=True) model_queue.put(rf) + # print(f'TRAINER SENDING MODEL DONE', flush=True) class RFTrainer: @@ -114,33 +130,33 @@ def 
open(self, self.shared_arrs = GrowingSharedArray() self.model_queue = Queue(maxsize=1) self.data_queue = Queue(maxsize=1) - self.training_loop_proc = Process(daemon=True, target=rf_training_loop, name='rf_trainer', args=( - self.model_queue, self.data_queue, self.shared_arrs.lock, tuple(bounds), seed, n_trees, bootstrapping, - max_features, min_samples_split, min_samples_leaf, max_depth, eps_purity, max_nodes, n_points_per_tree - )) + self.training_loop_proc = Process( + target=rf_training_loop, daemon=True, name='rf_trainer', + args=(self.model_queue, self.data_queue, self.shared_arrs.lock, tuple(bounds), seed, n_trees, bootstrapping, + max_features, min_samples_split, min_samples_leaf, max_depth, eps_purity, max_nodes, + n_points_per_tree) + ) self.training_loop_proc.start() def close(self): - # I think this might be redundant, since according to the official docs, close and join_thread are called - # anyway when garbage-collecting queues, and we don't use JoinableQueues + # send kill signal to training process if self.data_queue is not None: if self.training_loop_proc is not None: - self.data_queue.put(SHUTDOWN) + # print('MAIN SEND SHUTDOWN', flush=True) + self.send_to_training_loop_proc(SHUTDOWN) + # print('MAIN FINISHED SEND SHUTDOWN', flush=True) + # make sure the shutdown message is flush before moving on self.data_queue.close() self.data_queue.join_thread() - del self.data_queue - self.data_queue = None - - if self.training_loop_proc is not None: - # wait for training to finish - self.training_loop_proc.join() # TODO: fix: this happens to hang - del self.training_loop_proc - self.training_loop_proc = None - if self.model_queue is not None: + # wait till the training process died + if self.model_queue is not None and self.training_loop_proc is not None and self.training_loop_proc.is_alive(): # flush the model queue, and store the latest model while True: + # print('MAIN WAIT SHUTDOWN CONFIRM', flush=True) msg = self.model_queue.get() + # print(f'MAIN RECEIVED {"SHUTDOWN CONFIRMATION" if msg == SHUTDOWN else msg} ' + # f'AFTER WAITING FOR SHUTDOWN CONFIRMATION', flush=True) # wait for SHUTDOWN message, because that guarantees that shared_arrs.close() has been called within # the training process; this way we make sure we call unlink only after close has had the chance to be # called within the child process @@ -148,8 +164,23 @@ def close(self): break else: self._model = msg - self.model_queue.close() - self.model_queue.join_thread() + + if self.training_loop_proc is not None: + # wait for training to finish + if self.training_loop_proc.is_alive(): + self.training_loop_proc.join() + del self.training_loop_proc + self.training_loop_proc = None + + # I think this might be redundant, since according to the official docs, close and join_thread are called + # anyway when garbage-collecting queues, and we don't use JoinableQueues + if self.data_queue is not None: + del self.data_queue + self.data_queue = None + + if self.model_queue is not None: + # self.model_queue.close() + # self.model_queue.join_thread() del self.model_queue self.model_queue = None @@ -189,13 +220,8 @@ def model(self) -> BinaryForest: self._model = msg return self._model - def submit_for_training(self, X: npt.NDArray[np.float64], y: npt.NDArray[np.float64]): - self.shared_arrs.set_data(X, y) - - if self.data_queue is None: - raise RuntimeError('rf training loop process has been stopped, so we cannot submit new training data') - - # flush queue before pushing new data onto it + def send_to_training_loop_proc(self, 
data_info: Union[tuple[int, int], type[SHUTDOWN]]): + # empty queue before pushing new data onto it while True: try: old_data = self.data_queue.get(block=False) @@ -203,7 +229,15 @@ def submit_for_training(self, X: npt.NDArray[np.float64], y: npt.NDArray[np.floa break else: assert old_data != SHUTDOWN - self.data_queue.put((self.shared_arrs.shm_id, len(X))) + self.data_queue.put(data_info) + + def submit_for_training(self, X: npt.NDArray[np.float64], y: npt.NDArray[np.float64]): + self.shared_arrs.set_data(X, y) + + if self.data_queue is None: + raise RuntimeError('rf training loop process has been stopped, so we cannot submit new training data') + + self.send_to_training_loop_proc((self.shared_arrs.shm_id, len(X))) if self.sync: self._model = self.model_queue.get() diff --git a/smac/model/random_forest/random_forest.py b/smac/model/random_forest/random_forest.py index 5b5834bfb..4049ffd1a 100644 --- a/smac/model/random_forest/random_forest.py +++ b/smac/model/random_forest/random_forest.py @@ -105,9 +105,12 @@ def __init__( # self._seed, # ] - def __del__(self): + def close(self): self._rf_trainer.close() + def __del__(self): + self.close() + @property def meta(self) -> dict[str, Any]: # noqa: D102 meta = super().meta diff --git a/tests/test_acquisition/test_maximizers.py b/tests/test_acquisition/test_maximizers.py index 6407540db..9fbd89892 100644 --- a/tests/test_acquisition/test_maximizers.py +++ b/tests/test_acquisition/test_maximizers.py @@ -175,9 +175,7 @@ def test_get_next_by_random_search(): # TestLocalSearch # -------------------------------------------------------------- - -@pytest.fixture -def configspace() -> ConfigurationSpace: +def get_configspace() -> ConfigurationSpace: cs = ConfigurationSpace(seed=0) a = Float("a", (0, 1), default=0.5) @@ -191,25 +189,42 @@ def configspace() -> ConfigurationSpace: @pytest.fixture -def model(configspace: ConfigurationSpace): +def configspace() -> ConfigurationSpace: + return get_configspace() + + +def get_model(configspace: ConfigurationSpace) -> RandomForest: model = RandomForest(configspace) np.random.seed(0) - X = np.random.rand(100, len(configspace.get_hyperparameters())) - y = 1 - (np.sum(X, axis=1) / len(configspace.get_hyperparameters())) + X = np.random.rand(100, len(configspace.values())) + y = 1 - (np.sum(X, axis=1) / len(configspace.values())) model.train(X, y) return model @pytest.fixture -def acquisition_function(model): +def model(configspace: ConfigurationSpace) -> RandomForest: + model = get_model(configspace) + # return model + yield model + model.close() + + + +def get_acquisition_function(model): ei = EI() ei.update(model=model, eta=0.5) return ei +@pytest.fixture +def acquisition_function(model): + return get_acquisition_function(model) + + def test_local_search(configspace): def acquisition_function(points): rval = [] @@ -257,6 +272,9 @@ class AcquisitionFunction: def __call__(self, X): return np.array([x.get_array().sum() for x in X]).reshape((-1, 1)) + def close(self): + pass + ls = LocalSearch( configspace=configspace, acquisition_function=AcquisitionFunction(), @@ -375,6 +393,9 @@ def __call__(self, arrays): rval.append([-rosenbrock_4d(array)]) return np.array(rval) + def close(self): + pass + budget_kwargs = {"max_steps": 2, "n_steps_plateau_walk": 2, "local_search_iterations": 2} prs_0 = LocalAndSortedRandomSearch( @@ -417,3 +438,58 @@ def test_differential_evolution(configspace, acquisition_function): values = rs._maximize(start_points, 1) values[0][1].origin == "Acquisition Function Maximizer: Differential 
Evolution" + + +# manual testing + +def differential_evolution(): + cs = get_configspace() + m = get_model(cs) + af = get_acquisition_function(m) + test_differential_evolution(cs, af) + + +def min_repro_differential_evolution_bug(): + cs = ConfigurationSpace(seed=0) + a = Float("a", (0, 1), default=0.5) + cs.add(a) + + model = RandomForest(cs) + + af = EI() + af.update(model=model, eta=0.5) + + np.random.seed(0) + X = np.random.rand(100, len(cs.values())) + y = 1 - (np.sum(X, axis=1) / len(cs.values())) + model.train(X, y) + + start_points = cs.sample_configuration(100) + # start_point = cs.sample_configuration() # this circumvents bug + rs = DifferentialEvolution(cs, af, challengers=1000) + values = rs._maximize(start_points, 1) + values[0][1].origin == "Acquisition Function Maximizer: Differential Evolution" + # model._rf_trainer.close() # this circumvents the bug + + +def random_search(): + cs = get_configspace() + m = get_model(cs) + af = get_acquisition_function(m) + test_random_search(cs, af) + + +def main(): + # TODO: running all these three IN THIS ORDER causes a hang, probably because of the dependency graph growing too + # complex for the garbage collector to handle, so RFTrainer.close() is never called. In order to avoid hangs while + # running tests, we explicitly call RFTrainer.close() during model fixture teardown + print('differential_evolution:') + differential_evolution() + print('\nmin_repro_differential_evolution_bug:') + min_repro_differential_evolution_bug() + print('\nrandom_search:') + random_search() + + +if __name__ == '__main__': + main() diff --git a/tests/test_ask_and_tell/test_ask_and_tell_intensifier.py b/tests/test_ask_and_tell/test_ask_and_tell_intensifier.py index 4fd5ab1ca..2c491bdb9 100644 --- a/tests/test_ask_and_tell/test_ask_and_tell_intensifier.py +++ b/tests/test_ask_and_tell/test_ask_and_tell_intensifier.py @@ -9,8 +9,7 @@ __license__ = "3-clause BSD" -@pytest.fixture -def make_facade(digits_dataset, make_sgd) -> HyperparameterOptimizationFacade: +def get_make_facade(digits_dataset, make_sgd) -> HyperparameterOptimizationFacade: def create( deterministic: bool = True, use_instances: bool = False, max_config_calls: int = 5 ) -> HyperparameterOptimizationFacade: @@ -47,6 +46,11 @@ def create( return create +@pytest.fixture +def make_facade(digits_dataset, make_sgd) -> HyperparameterOptimizationFacade: + return get_make_facade(digits_dataset, make_sgd) + + # -------------------------------------------------------------- # Test tell without ask # -------------------------------------------------------------- @@ -171,3 +175,23 @@ def test_multiple_asks_successively(make_facade): # Make sure the trials are different assert trial_info not in info info += [trial_info] + + +def ask_and_tell_after_optimization(): + from ..fixtures.datasets import DigitsDataset, Dataset + from ..fixtures.models import SGD + digits_dataset = DigitsDataset() + + def make_sgd(dataset: Dataset) -> SGD: + return SGD(dataset) + + make_facade = get_make_facade(digits_dataset, make_sgd) + test_ask_and_tell_after_optimization(make_facade) + + +def main(): + ask_and_tell_after_optimization() + + +if __name__ == '__main__': + main() From cb76f8c5512e4d93d7e2099a8b738cae93ccb3fc Mon Sep 17 00:00:00 2001 From: Bogdan Budescu Date: Fri, 13 Dec 2024 12:16:36 +0200 Subject: [PATCH 17/21] - improve sync between optimization and training loops - refactor: - renames: improved legibility - easier debug printing for sync between opt and train loop processes --- 
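The synchronization being tightened here is a sentinel handshake over two single-slot queues: the optimization
loop pushes SHUTDOWN on the data queue, the trainer echoes SHUTDOWN on the model queue once it has closed its
own view of the shared memory, and only then does the optimization loop unlink the segments and join the
process. A stripped-down sketch of that handshake (hypothetical helper names, not the exact SMAC code):

from multiprocessing import Process, Queue

SHUTDOWN = None  # same sentinel convention the patch uses


def trainer(data_queue: Queue, model_queue: Queue) -> None:
    while True:
        msg = data_queue.get()  # block until new data or the shutdown signal arrives
        if msg is SHUTDOWN:
            # ... close shared-memory views here ...
            model_queue.put(SHUTDOWN)  # confirm, so the parent may safely unlink
            return
        model_queue.put(f"model trained on {msg}")


if __name__ == "__main__":
    data_q, model_q = Queue(maxsize=1), Queue(maxsize=1)
    proc = Process(target=trainer, args=(data_q, model_q), daemon=True)
    proc.start()
    data_q.put("dataset-1")
    print(model_q.get())  # latest trained model
    data_q.put(SHUTDOWN)
    assert model_q.get() is SHUTDOWN  # wait for the confirmation before any unlink()
    proc.join()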
.../multiproc_util/GrowingSharedArray.py | 6 +- .../random_forest/multiproc_util/RFTrainer.py | 107 ++++++++++-------- tests/test_acquisition/test_maximizers.py | 8 +- 3 files changed, 65 insertions(+), 56 deletions(-) diff --git a/smac/model/random_forest/multiproc_util/GrowingSharedArray.py b/smac/model/random_forest/multiproc_util/GrowingSharedArray.py index e060c64f6..9eda8c406 100644 --- a/smac/model/random_forest/multiproc_util/GrowingSharedArray.py +++ b/smac/model/random_forest/multiproc_util/GrowingSharedArray.py @@ -39,7 +39,7 @@ def open(self, shm_id: int, size: int): self.shm_id = shm_id self.size = size - def close_impl(self, unlink=False): + def close_shm(self, unlink=False): if self.shm_X is not None: self.shm_X.close() if unlink: @@ -56,7 +56,7 @@ def close_impl(self, unlink=False): self.size = None def close(self): - self.close_impl() + self.close_shm() def __del__(self): self.close() @@ -101,7 +101,7 @@ def __init__(self): super().__init__(lock=Lock()) def close(self): - self.close_impl(unlink=True) + self.close_shm(unlink=True) def __del__(self): self.close() diff --git a/smac/model/random_forest/multiproc_util/RFTrainer.py b/smac/model/random_forest/multiproc_util/RFTrainer.py index 76f355802..383e66fb2 100644 --- a/smac/model/random_forest/multiproc_util/RFTrainer.py +++ b/smac/model/random_forest/multiproc_util/RFTrainer.py @@ -5,6 +5,7 @@ from multiprocessing import Lock, Queue, Process import queue +import sys from numpy import typing as npt import numpy as np @@ -18,6 +19,18 @@ SHUTDOWN = None +ENABLE_DBG_PRINT = False + + +def debug_print(*args, file=sys.stdout, **kwargs): + if ENABLE_DBG_PRINT: + print(*args, **kwargs, flush=True, file=file) + file.flush() + +# TODO: the type of the value passed for the 'bounds' param below is a tuple of tuples. Might this add some memory +# dependency between the processes which might mess up the cleanup process? 
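+# (most likely not: Process arguments are pickled under the spawn start method, and merely copied into the
+# child's address space under fork, so the child ends up with its own copy of the bounds tuple rather than a
+# live reference back into the parent)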
+ + def rf_training_loop( model_queue: Queue, data_queue: Queue, data_lock: Lock, # init rf train @@ -43,36 +56,44 @@ def rf_training_loop( rng = regression.default_random_engine(int(seed)) shared_arrs = GrowingSharedArrayReaderView(data_lock) + def send_to_optimization_loop_process(msg: Union[BinaryForest, type(SHUTDOWN)]): + # remove previous models from queue, if any, before pushing the latest model + while True: + try: + _ = model_queue.get(block=False) + except queue.Empty: + break + debug_print(f'TRAINER SENDING {"SHUTDOWN CONFIRM" if msg == SHUTDOWN else "MODEL"}', file=sys.stderr) + model_queue.put(msg) + debug_print(f'TRAINER SENDING {"SHUTDOWN CONFIRM" if msg == SHUTDOWN else "MODEL"} DONE', file=sys.stderr) + while True: - # print('TRAINER WAIT MSG', flush=True) - msg = data_queue.get() # if queue is empty, wait for training data or shutdown signal - # print(f'TRAINER GOT MSG: {msg}', flush=True) - must_shutdown = msg == SHUTDOWN - # if must_shutdown: - # print(f'TRAINER GOT SHUTDOWN 1', flush=True) - - # discard all but the last msg in the queue + debug_print('TRAINER WAIT MSG', file=sys.stderr) + data_msg = data_queue.get() # if queue is empty, wait for training data or shutdown signal + debug_print(f'TRAINER GOT MSG: {data_msg}', file=sys.stderr) + must_shutdown = data_msg == SHUTDOWN + if must_shutdown: + debug_print(f'TRAINER GOT SHUTDOWN 1', file=sys.stderr) + + # discard all but the last data_msg in the queue while True: try: - msg = data_queue.get(block=False) + data_msg = data_queue.get(block=False) except queue.Empty: break else: - # if msg == SHUTDOWN: - # print(f'TRAINER GOT SHUTDOWN 2', flush=True) - must_shutdown = must_shutdown or msg == SHUTDOWN + if data_msg == SHUTDOWN: + debug_print(f'TRAINER GOT SHUTDOWN 2', file=sys.stderr) + must_shutdown = must_shutdown or data_msg == SHUTDOWN if must_shutdown: shared_arrs.close() - # TODO: empty queue before pushing SHUTDOWN - # print(f'TRAINER SENDS SHUTDOWN CONFIRMATION', flush=True) - model_queue.put(SHUTDOWN) - # print(f'TRAINER FINISHED SEND SHUTDOWN CONFIRMATION', flush=True) + send_to_optimization_loop_process(SHUTDOWN) + # don't kill current process until we make sure the queue's underlying pipe is flushed model_queue.close() - # model_queue.join_thread() # TODO: enable this again - # print(f'TRAINER BYE BYE', flush=True) + model_queue.join_thread() break - shm_id, size = msg + shm_id, size = data_msg X, y = shared_arrs.get_data(shm_id, size) # when shm_id changes, here we should notify main thread it can call unlink the shared memory bc we called # close() on it @@ -84,19 +105,11 @@ def rf_training_loop( rf = BinaryForest() rf.options = rf_opts + debug_print(f'TRAINER STARTS TRAINING', file=sys.stderr) rf.fit(data, rng) - - # print(f'TRAINER FINISHED TRAINING', flush=True) - - # remove previous models from queue, if any, before pushing the latest model - while True: - try: - _ = model_queue.get(block=False) - except queue.Empty: - break - # print(f'TRAINER SENDING MODEL', flush=True) - model_queue.put(rf) - # print(f'TRAINER SENDING MODEL DONE', flush=True) + debug_print(f'TRAINER FINISHED TRAINING', file=sys.stderr) + send_to_optimization_loop_process(rf) + debug_print(f'TRAINER BYE BYE', file=sys.stderr) class RFTrainer: @@ -131,7 +144,9 @@ def open(self, self.model_queue = Queue(maxsize=1) self.data_queue = Queue(maxsize=1) self.training_loop_proc = Process( - target=rf_training_loop, daemon=True, name='rf_trainer', + target=rf_training_loop, + daemon=True, + # name='rf_trainer', args=(self.model_queue, 
self.data_queue, self.shared_arrs.lock, tuple(bounds), seed, n_trees, bootstrapping, max_features, min_samples_split, min_samples_leaf, max_depth, eps_purity, max_nodes, n_points_per_tree) @@ -142,21 +157,23 @@ def close(self): # send kill signal to training process if self.data_queue is not None: if self.training_loop_proc is not None: - # print('MAIN SEND SHUTDOWN', flush=True) + debug_print('MAIN SEND SHUTDOWN') self.send_to_training_loop_proc(SHUTDOWN) - # print('MAIN FINISHED SEND SHUTDOWN', flush=True) + debug_print('MAIN FINISHED SEND SHUTDOWN') # make sure the shutdown message is flush before moving on self.data_queue.close() self.data_queue.join_thread() + del self.data_queue + self.data_queue = None # wait till the training process died if self.model_queue is not None and self.training_loop_proc is not None and self.training_loop_proc.is_alive(): # flush the model queue, and store the latest model while True: - # print('MAIN WAIT SHUTDOWN CONFIRM', flush=True) + debug_print('MAIN WAIT SHUTDOWN CONFIRM') msg = self.model_queue.get() - # print(f'MAIN RECEIVED {"SHUTDOWN CONFIRMATION" if msg == SHUTDOWN else msg} ' - # f'AFTER WAITING FOR SHUTDOWN CONFIRMATION', flush=True) + debug_print(f'MAIN RECEIVED {"SHUTDOWN CONFIRMATION" if msg == SHUTDOWN else "MODEL"}' + f' AFTER WAITING FOR SHUTDOWN CONFIRMATION') # wait for SHUTDOWN message, because that guarantees that shared_arrs.close() has been called within # the training process; this way we make sure we call unlink only after close has had the chance to be # called within the child process @@ -172,15 +189,7 @@ def close(self): del self.training_loop_proc self.training_loop_proc = None - # I think this might be redundant, since according to the official docs, close and join_thread are called - # anyway when garbage-collecting queues, and we don't use JoinableQueues - if self.data_queue is not None: - del self.data_queue - self.data_queue = None - if self.model_queue is not None: - # self.model_queue.close() - # self.model_queue.join_thread() del self.model_queue self.model_queue = None @@ -221,6 +230,9 @@ def model(self) -> BinaryForest: return self._model def send_to_training_loop_proc(self, data_info: Union[tuple[int, int], type[SHUTDOWN]]): + if self.data_queue is None: + raise RuntimeError('rf training loop process has been stopped, so we cannot submit new training data') + # empty queue before pushing new data onto it while True: try: @@ -233,11 +245,6 @@ def send_to_training_loop_proc(self, data_info: Union[tuple[int, int], type[SHUT def submit_for_training(self, X: npt.NDArray[np.float64], y: npt.NDArray[np.float64]): self.shared_arrs.set_data(X, y) - - if self.data_queue is None: - raise RuntimeError('rf training loop process has been stopped, so we cannot submit new training data') - self.send_to_training_loop_proc((self.shared_arrs.shm_id, len(X))) - if self.sync: self._model = self.model_queue.get() diff --git a/tests/test_acquisition/test_maximizers.py b/tests/test_acquisition/test_maximizers.py index 9fbd89892..fdac06619 100644 --- a/tests/test_acquisition/test_maximizers.py +++ b/tests/test_acquisition/test_maximizers.py @@ -480,9 +480,11 @@ def random_search(): def main(): - # TODO: running all these three IN THIS ORDER causes a hang, probably because of the dependency graph growing too - # complex for the garbage collector to handle, so RFTrainer.close() is never called. 
In order to avoid hangs while - # running tests, we explicitly call RFTrainer.close() during model fixture teardown + from smac.model.random_forest.multiproc_util import RFTrainer + RFTrainer.ENABLE_DBG_PRINT = True + # TODO: running ALL these three IN THIS ORDER causes a hang, probably because of the dependency graph growing too + # complex and circular for the garbage collector to handle, so RFTrainer.close() is never called. In order to avoid + # hangs while running tests, we explicitly call RFTrainer.close() in model fixture teardown print('differential_evolution:') differential_evolution() print('\nmin_repro_differential_evolution_bug:') From 6198d8f8afff3923e22089630295d8f99578ac87 Mon Sep 17 00:00:00 2001 From: Bogdan Budescu Date: Fri, 13 Dec 2024 16:46:26 +0200 Subject: [PATCH 18/21] add flag to switch off (concurrent) background training entirely (default to old behavior) --- .../random_forest/multiproc_util/RFTrainer.py | 120 ++++++++++-------- smac/model/random_forest/random_forest.py | 7 +- smac/model/random_forest/util.py | 45 ++++++- tests/test_acquisition/test_functions.py | 18 ++- 4 files changed, 125 insertions(+), 65 deletions(-) diff --git a/smac/model/random_forest/multiproc_util/RFTrainer.py b/smac/model/random_forest/multiproc_util/RFTrainer.py index 383e66fb2..41b6b975c 100644 --- a/smac/model/random_forest/multiproc_util/RFTrainer.py +++ b/smac/model/random_forest/multiproc_util/RFTrainer.py @@ -9,11 +9,21 @@ from numpy import typing as npt import numpy as np -from pyrfr import regression -from pyrfr.regression import binary_rss_forest as BinaryForest +from pyrfr.regression import (binary_rss_forest as BinaryForest, default_random_engine as DefaultRandomEngine, + forest_opts as ForestOpts) from .GrowingSharedArray import GrowingSharedArrayReaderView, GrowingSharedArray -from ..util import init_data_container +from ..util import get_rf_opts, train + +from enum import Enum, auto, unique + + +@unique +class Concurrency(Enum): + THREADING = auto() + THREADING_SYNCED = auto() + MULTIPROC = auto() + MULTIPROC_SYNCED = auto() SHUTDOWN = None @@ -27,10 +37,9 @@ def debug_print(*args, file=sys.stdout, **kwargs): print(*args, **kwargs, flush=True, file=file) file.flush() + # TODO: the type of the value passed for the 'bounds' param below is a tuple of tuples. Might this add some memory # dependency between the processes which might mess up the cleanup process? 
-
-
 def rf_training_loop(
         model_queue: Queue, data_queue: Queue, data_lock: Lock,
         # init rf train
@@ -39,21 +48,11 @@ def rf_training_loop(
         n_trees: int, bootstrapping: bool, max_features: int, min_samples_split: int, min_samples_leaf: int,
         max_depth: int, eps_purity: float, max_nodes: int, n_points_per_tree: int
 ) -> None:
-    rf_opts = regression.forest_opts()
-    rf_opts.num_trees = n_trees
-    rf_opts.do_bootstrapping = bootstrapping
-    rf_opts.tree_opts.max_features = max_features
-    rf_opts.tree_opts.min_samples_to_split = min_samples_split
-    rf_opts.tree_opts.min_samples_in_leaf = min_samples_leaf
-    rf_opts.tree_opts.max_depth = max_depth
-    rf_opts.tree_opts.epsilon_purity = eps_purity
-    rf_opts.tree_opts.max_num_nodes = max_nodes
-    rf_opts.compute_law_of_total_variance = False
-    if n_points_per_tree > 0:
-        rf_opts.num_data_points_per_tree = n_points_per_tree
-
-    # Case to `int` incase we get an `np.integer` type
-    rng = regression.default_random_engine(int(seed))
+    rf_opts = get_rf_opts(n_trees, bootstrapping, max_features, min_samples_split, min_samples_leaf, max_depth,
+                          eps_purity, max_nodes, n_points_per_tree)
+
+    # Cast to `int` in case we get an `np.integer` type
+    rng = DefaultRandomEngine(int(seed))
     shared_arrs = GrowingSharedArrayReaderView(data_lock)
 
     def send_to_optimization_loop_process(msg: Union[BinaryForest, type(SHUTDOWN)]):
@@ -98,16 +97,9 @@ def send_to_optimization_loop_process(msg: Union[BinaryForest, type(SHUTDOWN)]):
         # when shm_id changes, here we should notify main thread it can call unlink the shared memory bc we called
         # close() on it
         # UPDATE: we avoided the warnings by disabling tracking for shared memory
-        data = init_data_container(X, y, bounds)
-
-        if n_points_per_tree <= 0:
-            rf_opts.num_data_points_per_tree = len(X)
+        rf = train(rng, rf_opts, n_points_per_tree, bounds, X, y)
 
-        rf = BinaryForest()
-        rf.options = rf_opts
-        debug_print(f'TRAINER STARTS TRAINING', file=sys.stderr)
-        rf.fit(data, rng)
-        debug_print(f'TRAINER FINISHED TRAINING', file=sys.stderr)
         send_to_optimization_loop_process(rf)
 
     debug_print(f'TRAINER BYE BYE', file=sys.stderr)
@@ -120,8 +112,8 @@ def __init__(self,
                  n_trees: int, bootstrapping: bool, max_features: int, min_samples_split: int, min_samples_leaf: int,
                  max_depth: int, eps_purity: float, max_nodes: int, n_points_per_tree: int,
                  # process synchronization
-                 sync: bool = False) -> None:
-        self.sync = sync
+                 background_training: Optional[Concurrency] = Concurrency.MULTIPROC) -> None:
+        self.background_training = background_training
 
         self._model: Optional[BinaryForest] = None
         self.shared_arrs: Optional[GrowingSharedArray] = None
@@ -129,29 +121,43 @@ def __init__(self,
         self.data_queue: Optional[Queue] = None
         self.training_loop_proc: Optional[Process] = None
 
-        self.open(bounds, seed, n_trees, bootstrapping, max_features, min_samples_split, min_samples_leaf, max_depth,
-                  eps_purity, max_nodes, n_points_per_tree)
+        # in case we disable training in the background, and we need these objects in the main thread
+        self.opts: ForestOpts = get_rf_opts(n_trees, bootstrapping, max_features, min_samples_split, min_samples_leaf,
+                                            max_depth, eps_purity, max_nodes, n_points_per_tree)
+        self.n_points_per_tree: int = n_points_per_tree
+        self.bounds = tuple(bounds)
+
+        # this is NOT used when training in background
+        # Cast to `int` in case we get an `np.integer` type
+        self.rng = DefaultRandomEngine(int(seed))
+
+        self.open(seed)
 
         super().__init__()
 
-    def open(self,
-             # init rf train
-             bounds: Iterable[tuple[float, float]], seed: int,
-             # rf opts
-             n_trees: int, bootstrapping: bool, max_features: int, min_samples_split: int, min_samples_leaf: int,
-             max_depth: int, eps_purity: float, max_nodes: int, n_points_per_tree: int) -> None:
-        self.shared_arrs = GrowingSharedArray()
-        self.model_queue = Queue(maxsize=1)
-        self.data_queue = Queue(maxsize=1)
-        self.training_loop_proc = Process(
-            target=rf_training_loop,
-            daemon=True,
-            # name='rf_trainer',
-            args=(self.model_queue, self.data_queue, self.shared_arrs.lock, tuple(bounds), seed, n_trees, bootstrapping,
-                  max_features, min_samples_split, min_samples_leaf, max_depth, eps_purity, max_nodes,
-                  n_points_per_tree)
-        )
-        self.training_loop_proc.start()
+    def open(self, seed: int) -> None:
+        assert self.background_training is None or self.background_training in Concurrency
+        if self.background_training is None:
+            pass
+        elif self.background_training is Concurrency.THREADING:
+            raise NotImplementedError
+        elif self.background_training is Concurrency.THREADING_SYNCED:
+            raise NotImplementedError
+        else:
+            self.shared_arrs = GrowingSharedArray()
+            self.model_queue = Queue(maxsize=1)
+            self.data_queue = Queue(maxsize=1)
+            self.training_loop_proc = Process(
+                target=rf_training_loop,
+                daemon=True,
+                name='rf_trainer',
+                args=(self.model_queue, self.data_queue, self.shared_arrs.lock, self.bounds, seed, self.opts.num_trees,
+                      self.opts.do_bootstrapping, self.opts.tree_opts.max_features,
                      self.opts.tree_opts.min_samples_to_split, self.opts.tree_opts.min_samples_in_leaf,
+                      self.opts.tree_opts.max_depth, self.opts.tree_opts.epsilon_purity,
+                      self.opts.tree_opts.max_num_nodes, self.n_points_per_tree)
+            )
+            self.training_loop_proc.start()
 
     def close(self):
         # send kill signal to training process
@@ -227,6 +233,7 @@ def model(self) -> BinaryForest:
                 raise RuntimeError("the shutdown message wasn't supposed to end up here")
             else:
                 self._model = msg
+        return self._model
 
     def send_to_training_loop_proc(self, data_info: Union[tuple[int, int], type[SHUTDOWN]]):
@@ -244,7 +251,12 @@ def send_to_training_loop_proc(self, data_info: Union[tuple[int, int], type[SHUT
         self.data_queue.put(data_info)
 
     def submit_for_training(self, X: npt.NDArray[np.float64], y: npt.NDArray[np.float64]):
-        self.shared_arrs.set_data(X, y)
-        self.send_to_training_loop_proc((self.shared_arrs.shm_id, len(X)))
-        if self.sync:
-            self._model = self.model_queue.get()
+        if self.background_training is None:
+            self._model = train(self.rng, self.opts, self.n_points_per_tree, self.bounds, X, y)
+        else:
+            if self.background_training in (Concurrency.THREADING, Concurrency.THREADING_SYNCED):
+                raise NotImplementedError
+            self.shared_arrs.set_data(X, y)
+            self.send_to_training_loop_proc((self.shared_arrs.shm_id, len(X)))
+            if self.background_training in Concurrency.MULTIPROC_SYNCED:
+                self._model = self.model_queue.get()
diff --git a/smac/model/random_forest/random_forest.py b/smac/model/random_forest/random_forest.py
index 4049ffd1a..9afcda9b5 100644
--- a/smac/model/random_forest/random_forest.py
+++ b/smac/model/random_forest/random_forest.py
@@ -4,7 +4,6 @@
 import numpy as np
 from ConfigSpace import ConfigurationSpace
-from pyrfr import regression
 
 from smac.constants import N_TREES, VERY_SMALL_NUMBER
 
 from . import AbstractRandomForest
@@ -76,10 +75,12 @@ def __init__(
         max_features = 0 if ratio_features > 1.0 else max(1, int(len(self._types) * ratio_features))
 
         self._rf_trainer = RFTrainer(self._bounds, seed, n_trees, bootstrapping, max_features, min_samples_split,
-                                     min_samples_leaf, max_depth, eps_purity, max_nodes, n_points_per_tree)
+                                     min_samples_leaf, max_depth, eps_purity, max_nodes, n_points_per_tree,
+                                     background_training=None)
         self._log_y = log_y
 
-        self._rng = regression.default_random_engine(int(seed))
+        # this is NOT used when training in background
+        self._rng = self._rf_trainer.rng
 
         self._n_trees = n_trees
         self._n_points_per_tree = n_points_per_tree
diff --git a/smac/model/random_forest/util.py b/smac/model/random_forest/util.py
index 973445a3b..d520eb1d3 100644
--- a/smac/model/random_forest/util.py
+++ b/smac/model/random_forest/util.py
@@ -6,16 +6,33 @@
 from typing import Iterable, TYPE_CHECKING
 
 import numpy as np
-from pyrfr import regression
+from pyrfr.regression import (default_data_container as DataContainer, forest_opts as ForestOpts,
+                              binary_rss_forest as BinaryForest, default_random_engine as DefaultRandomEngine)
 
 if TYPE_CHECKING:
-    from pyrfr.regression import default_data_container as DataContainer
     from numpy import typing as npt
 
 
-def init_data_container(
-    X: npt.NDArray[np.float64], y: npt.NDArray[np.float64], bounds: Iterable[tuple[float, float]]
-) -> DataContainer:
+def get_rf_opts(n_trees: int, bootstrapping: bool, max_features: int, min_samples_split: int, min_samples_leaf: int,
+                max_depth: int, eps_purity: float, max_nodes: int, n_points_per_tree: int) -> ForestOpts:
+    rf_opts = ForestOpts()
+    rf_opts.num_trees = n_trees
+    rf_opts.do_bootstrapping = bootstrapping
+    rf_opts.tree_opts.max_features = max_features
+    rf_opts.tree_opts.min_samples_to_split = min_samples_split
+    rf_opts.tree_opts.min_samples_in_leaf = min_samples_leaf
+    rf_opts.tree_opts.max_depth = max_depth
+    rf_opts.tree_opts.epsilon_purity = eps_purity
+    rf_opts.tree_opts.max_num_nodes = max_nodes
+    rf_opts.compute_law_of_total_variance = False
+    if n_points_per_tree > 0:
+        rf_opts.num_data_points_per_tree = n_points_per_tree
+
+    return rf_opts
+
+
+def init_data_container(X: npt.NDArray[np.float64], y: npt.NDArray[np.float64],
+                        bounds: Iterable[tuple[float, float]]) -> DataContainer:
     """Fills a pyrfr default data container s.t. the forest knows categoricals and bounds for continous data.
 
     Parameters
@@ -31,7 +48,7 @@ def init_data_container(
         The filled data container that pyrfr can interpret.
""" # Retrieve the types and the bounds from the ConfigSpace - data = regression.default_data_container(X.shape[1]) + data = DataContainer(X.shape[1]) for i, (mn, mx) in enumerate(bounds): if np.isnan(mx): @@ -43,3 +60,19 @@ def init_data_container( data.add_data_point(row_X, row_y) return data + + +def train(rng: DefaultRandomEngine, rf_opts: ForestOpts, n_points_per_tree: int, bounds: Iterable[tuple[float, float]], + X: npt.NDArray[np.float64], y: npt.NDArray[np.float64]) -> BinaryForest: + data = init_data_container(X, y, bounds) + + if n_points_per_tree <= 0: + rf_opts.num_data_points_per_tree = len(X) + + rf = BinaryForest() + rf.options = rf_opts + + rf.fit(data, rng) + + return rf + diff --git a/tests/test_acquisition/test_functions.py b/tests/test_acquisition/test_functions.py index 53c8b5f3a..f4e093a72 100644 --- a/tests/test_acquisition/test_functions.py +++ b/tests/test_acquisition/test_functions.py @@ -36,6 +36,9 @@ def predict_marginalized(self, X): [np.mean(X, axis=1).reshape((1, -1))] * self.num_targets ).reshape((-1, 1)) + def close(self): + pass + class MockModelDual: def __init__(self, num_targets=1): @@ -46,6 +49,9 @@ def predict_marginalized(self, X): [np.mean(X, axis=1).reshape((1, -1))] * self.num_targets ).reshape((-1, 2)) + def close(self): + pass + class MockPrior: def __init__(self, pdf, max_density): @@ -116,6 +122,9 @@ def predict_marginalized(self, X): def update_prior(self, hyperparameter_dict): self._configspace.get_hyperparameters_dict.return_value = hyperparameter_dict + def close(self): + pass + class MockModelRNG(MockModel): def __init__(self, num_targets=1, seed=0): @@ -154,9 +163,13 @@ def acquisition_function(model): # Test AbstractAcquisitionFunction # -------------------------------------------------------------- +class CloseableString(str): + def close(self): + pass + def test_update_model_and_eta(model, acquisition_function): - model = "abc" + model = CloseableString("abc") assert acquisition_function._eta is None acquisition_function.update(model=model, eta=0.1) assert acquisition_function.model == model @@ -164,7 +177,8 @@ def test_update_model_and_eta(model, acquisition_function): def test_update_with_kwargs(acquisition_function): - acquisition_function.update(model="abc", eta=0.0, other="hi there:)") + model = CloseableString("abc") + acquisition_function.update(model=model, eta=0.0, other="hi there:)") assert acquisition_function.model == "abc" From d956bfbe9e4a0a24953e1b02cbbfe12df067a416 Mon Sep 17 00:00:00 2001 From: Bogdan Budescu Date: Fri, 13 Dec 2024 17:00:08 +0200 Subject: [PATCH 19/21] fix typo that caused bug --- smac/model/random_forest/multiproc_util/RFTrainer.py | 4 ++-- smac/model/random_forest/random_forest.py | 10 ++++++---- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/smac/model/random_forest/multiproc_util/RFTrainer.py b/smac/model/random_forest/multiproc_util/RFTrainer.py index 41b6b975c..ffce5196e 100644 --- a/smac/model/random_forest/multiproc_util/RFTrainer.py +++ b/smac/model/random_forest/multiproc_util/RFTrainer.py @@ -112,7 +112,7 @@ def __init__(self, n_trees: int, bootstrapping: bool, max_features: int, min_samples_split: int, min_samples_leaf: int, max_depth: int, eps_purity: float, max_nodes: int, n_points_per_tree: int, # process synchronization - background_training: Optional[Concurrency] = Concurrency.MULTIPROC) -> None: + background_training: Optional[Concurrency] = None) -> None: self.background_training = background_training self._model: Optional[BinaryForest] = None @@ -258,5 +258,5 @@ def 
submit_for_training(self, X: npt.NDArray[np.float64], y: npt.NDArray[np.floa raise NotImplementedError self.shared_arrs.set_data(X, y) self.send_to_training_loop_proc((self.shared_arrs.shm_id, len(X))) - if self.background_training in Concurrency.MULTIPROC_SYNCED: + if self.background_training is Concurrency.MULTIPROC_SYNCED: self._model = self.model_queue.get() diff --git a/smac/model/random_forest/random_forest.py b/smac/model/random_forest/random_forest.py index 9afcda9b5..a3e00deef 100644 --- a/smac/model/random_forest/random_forest.py +++ b/smac/model/random_forest/random_forest.py @@ -7,7 +7,7 @@ from smac.constants import N_TREES, VERY_SMALL_NUMBER from . import AbstractRandomForest -from .multiproc_util.RFTrainer import RFTrainer +from .multiproc_util.RFTrainer import RFTrainer, Concurrency __copyright__ = "Copyright 2022, automl.org" @@ -74,9 +74,11 @@ def __init__( max_features = 0 if ratio_features > 1.0 else max(1, int(len(self._types) * ratio_features)) - self._rf_trainer = RFTrainer(self._bounds, seed, n_trees, bootstrapping, max_features, min_samples_split, - min_samples_leaf, max_depth, eps_purity, max_nodes, n_points_per_tree, - background_training=None) + self._rf_trainer = RFTrainer( + self._bounds, seed, n_trees, bootstrapping, max_features, min_samples_split, + min_samples_leaf, max_depth, eps_purity, max_nodes, n_points_per_tree, + # background_training=Concurrency.MULTIPROC + ) self._log_y = log_y # this is NOT used when training in background From 396d2f54e1d1a897f7da57c7cae32dac5b668260 Mon Sep 17 00:00:00 2001 From: Bogdan Budescu Date: Tue, 17 Dec 2024 08:45:35 +0200 Subject: [PATCH 20/21] default enable background training to run tests --- smac/model/random_forest/random_forest.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/smac/model/random_forest/random_forest.py b/smac/model/random_forest/random_forest.py index a3e00deef..6a798694d 100644 --- a/smac/model/random_forest/random_forest.py +++ b/smac/model/random_forest/random_forest.py @@ -77,7 +77,7 @@ def __init__( self._rf_trainer = RFTrainer( self._bounds, seed, n_trees, bootstrapping, max_features, min_samples_split, min_samples_leaf, max_depth, eps_purity, max_nodes, n_points_per_tree, - # background_training=Concurrency.MULTIPROC + background_training=Concurrency.MULTIPROC ) self._log_y = log_y From 4b3016795c1f15af930f07b556b65befafa19154 Mon Sep 17 00:00:00 2001 From: Bogdan Budescu Date: Wed, 18 Dec 2024 12:47:27 +0200 Subject: [PATCH 21/21] use setproctitle, if available to rename rf trainer process, so it's more easily identifiable in monitoring tools --- .../multiproc_util/GrowingSharedArray.py | 11 ++++++----- smac/model/random_forest/multiproc_util/RFTrainer.py | 10 ++++++++++ 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/smac/model/random_forest/multiproc_util/GrowingSharedArray.py b/smac/model/random_forest/multiproc_util/GrowingSharedArray.py index 9eda8c406..2a4762137 100644 --- a/smac/model/random_forest/multiproc_util/GrowingSharedArray.py +++ b/smac/model/random_forest/multiproc_util/GrowingSharedArray.py @@ -1,19 +1,20 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- -import uuid from typing import Optional import math from multiprocessing import Lock +import uuid +import numpy as np +from numpy import typing as npt + # from multiprocessing.shared_memory import SharedMemory from .SharedMemory import SharedMemory as UntrackableSharedMemory -def SharedMemory(*args, **kwargs) -> UntrackableSharedMemory: - return UntrackableSharedMemory(*args, 
track=False, **kwargs) -import numpy as np -from numpy import typing as npt +def SharedMemory(*args, **kwargs) -> UntrackableSharedMemory: + return UntrackableSharedMemory(*args, track=False, **kwargs) def dtypes_are_equal(dtype1: np.dtype, dtype2: np.dtype) -> bool: diff --git a/smac/model/random_forest/multiproc_util/RFTrainer.py b/smac/model/random_forest/multiproc_util/RFTrainer.py index ffce5196e..365055c97 100644 --- a/smac/model/random_forest/multiproc_util/RFTrainer.py +++ b/smac/model/random_forest/multiproc_util/RFTrainer.py @@ -17,6 +17,13 @@ from enum import Enum, auto, unique +try: + from setproctitle import setproctitle +except ImportError: + setproctitle = None +else: + import uuid + @unique class Concurrency(Enum): @@ -48,6 +55,9 @@ def rf_training_loop( n_trees: int, bootstrapping: bool, max_features: int, min_samples_split: int, min_samples_leaf: int, max_depth: int, eps_purity: float, max_nodes: int, n_points_per_tree: int ) -> None: + if setproctitle is not None: + setproctitle(f'rf_trainer_{uuid.uuid4().int}'[:15]) + rf_opts = get_rf_opts(n_trees, bootstrapping, max_features, min_samples_split, min_samples_leaf, max_depth, eps_purity, max_nodes, n_points_per_tree)
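
For orientation, the sketch below shows how the RFTrainer API introduced by the patches above is meant to be driven. It is a minimal, hedged example, not part of the patch series: the constructor arguments, feature bounds, and data are illustrative placeholders rather than SMAC defaults, and only the names visible in the diffs (RFTrainer, Concurrency, submit_for_training, model, close, and the module path smac/model/random_forest/multiproc_util/RFTrainer.py) are taken from the patches themselves.

import numpy as np

from smac.model.random_forest.multiproc_util.RFTrainer import RFTrainer, Concurrency

# Ten continuous features, each bounded in [0, 1]; a NaN upper bound would mark a categorical column.
bounds = [(0.0, 1.0)] * 10

trainer = RFTrainer(
    bounds, seed=0,
    # illustrative hyperparameters, not SMAC defaults
    n_trees=10, bootstrapping=True, max_features=5, min_samples_split=3, min_samples_leaf=3,
    max_depth=2 ** 20, eps_purity=1e-8, max_nodes=2 ** 20, n_points_per_tree=-1,
    # MULTIPROC_SYNCED waits for the worker process to finish fitting; MULTIPROC returns immediately.
    background_training=Concurrency.MULTIPROC_SYNCED,
)

X = np.random.rand(100, 10)  # 100 hypothetical configurations
y = np.random.rand(100)      # their observed costs

# Copies the data into the shared-memory buffer and notifies the trainer process via the data
# queue; with MULTIPROC_SYNCED this call also waits for the freshly fitted forest on the model queue.
trainer.submit_for_training(X, y)

rf = trainer.model                     # latest binary_rss_forest received from the trainer process
mean, var = rf.predict_mean_var(X[0])  # query it like a locally trained forest

trainer.close()  # send the shutdown message and release the shared memory

With Concurrency.MULTIPROC instead, submit_for_training returns without reading the model queue, so callers should expect trainer.model to keep reflecting an earlier fit until the background process delivers the new forest.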