Model assignment refactor 10 01 25 #478

Merged · 3 commits · Jan 14, 2025
129 changes: 97 additions & 32 deletions refact_webgui/webgui/selfhost_model_assigner.py
@@ -9,14 +9,14 @@
from refact_webgui.webgui.selfhost_webutils import log
from refact_known_models import models_mini_db, passthrough_mini_db

from typing import List, Dict, Any
from typing import List, Dict, Any, Set, Optional


__all__ = ["ModelAssigner"]


ALLOWED_N_CTX = [2 ** p for p in range(10, 20)]
SHARE_GPU_BACKENDS = ["transformers", "autogptq"]
ALLOWED_GPUS_SHARD = [2 ** p for p in range(10)]


def has_context_switch(filter_caps: List[str]) -> bool:
@@ -47,6 +47,41 @@ def gpus_shard(self) -> int:
return max([rec["gpus_shard"] for rec in self.model_assign.values()])


@dataclass
class ModelWatchdogDConfig:
backend: str
model_name: str
gpus: List[int]
share_gpu: bool
n_ctx: Optional[int] = None
has_loras: bool = False

def dump(self, model_cfg_j: Dict) -> str:
model_cfg_j["command_line"].extend(["--model", self.model_name])
if self.backend not in ["transformers", "autogptq"]:
if self.n_ctx is not None:
model_cfg_j["command_line"].extend(["--n-ctx", self.n_ctx])
if not self.has_loras:
model_cfg_j["command_line"].append("--loraless")

model_cfg_j["gpus"] = self.gpus
model_cfg_j["share_gpu"] = self.share_gpu
del model_cfg_j["unfinished"]

cfg_fn = "-".join([
"model",
self.model_name.lower().replace('/', '-'),
*[f"{gpu:02d}" for gpu in self.gpus]
]) + ".cfg"

fn = os.path.join(env.DIR_WATCHDOG_D, cfg_fn)
with open(fn + ".tmp", "w") as f:
json.dump(model_cfg_j, f, indent=4)
os.rename(fn + ".tmp", fn)

return cfg_fn


class ModelAssigner:

def __init__(self):
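
The new ModelWatchdogDConfig dataclass pulls the per-model watchdog config dump out of _model_inference_setup. Below is a minimal standalone sketch of the same command-line and filename logic; dump_watchdog_cfg, the template dict, the temporary directory, and the model name are all placeholders for illustration, not the repo's real model.cfg template or env.DIR_WATCHDOG_D (the real code also deletes the template's "unfinished" marker key).

# Standalone sketch of the ModelWatchdogDConfig.dump logic above; inputs are illustrative.
import json
import os
import tempfile
from typing import Dict, List, Optional


def dump_watchdog_cfg(backend: str, model_name: str, gpus: List[int], share_gpu: bool,
                      watchdog_dir: str, template: Dict,
                      n_ctx: Optional[int] = None, has_loras: bool = False) -> str:
    cfg = dict(template, command_line=list(template["command_line"]))
    cfg["command_line"].extend(["--model", model_name])
    # non-transformers/autogptq backends get an explicit context size and a --loraless flag
    if backend not in ("transformers", "autogptq"):
        if n_ctx is not None:
            cfg["command_line"].extend(["--n-ctx", n_ctx])
        if not has_loras:
            cfg["command_line"].append("--loraless")
    cfg["gpus"] = gpus
    cfg["share_gpu"] = share_gpu

    # filename encodes the model and the GPUs it occupies
    cfg_fn = "-".join(["model", model_name.lower().replace("/", "-"),
                       *[f"{gpu:02d}" for gpu in gpus]]) + ".cfg"
    fn = os.path.join(watchdog_dir, cfg_fn)
    with open(fn + ".tmp", "w") as f:
        json.dump(cfg, f, indent=4)
    os.rename(fn + ".tmp", fn)  # rename last, so the watchdog never reads a partial config
    return cfg_fn


with tempfile.TemporaryDirectory() as d:
    name = dump_watchdog_cfg("transformers", "example/model-1b", [0, 1], share_gpu=False,
                             watchdog_dir=d, template={"command_line": ["python", "-m", "worker"]})
    print(name)  # model-example-model-1b-00-01.cfg

Each sharded replica therefore gets its own uniquely named config file, which is what the allowed_to_exist bookkeeping later compares against the configs already on disk.
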
@@ -55,6 +90,14 @@ def __init__(self):
for model_name, model_info in self.models_db.items()
}

@property
def shard_gpu_backends(self) -> Set[str]:
return {"transformers"}

@property
def share_gpu_backends(self) -> Set[str]:
return {"transformers", "autogptq"}

@property
def models_db(self) -> Dict[str, Any]:
return models_mini_db
@@ -74,13 +117,17 @@ def _model_assign_to_groups(self, model_assign: Dict[str, Dict]) -> List[ModelGr
if model_name not in self.models_db.keys():
log(f"unknown model '{model_name}', skipping")
continue
if assignment["gpus_shard"] not in [1, 2, 4]:
model_dict = self.models_db[model_name]
if (assignment["gpus_shard"] not in ALLOWED_GPUS_SHARD or
assignment["gpus_shard"] > model_dict.get("max_gpus_shard", assignment["gpus_shard"])):
log(f"invalid shard count {assignment['gpus_shard']}, skipping '{model_name}'")
continue
if self.models_db[model_name]["backend"] not in ["transformers"] and assignment["gpus_shard"] > 1:
log(f"sharding not supported for '{self.models_db['backend']}' backend, skipping '{model_name}'")
if (assignment["gpus_shard"] > 1 and
model_dict["backend"] not in self.shard_gpu_backends):
log(f"sharding not supported for '{model_dict['backend']}' backend, skipping '{model_name}'")
continue
if assignment.get("share_gpu", False) and self.models_db[model_name]["backend"] in SHARE_GPU_BACKENDS:
if (assignment.get("share_gpu", False)
and model_dict["backend"] in self.share_gpu_backends):
if not shared_group.model_assign:
model_groups.append(shared_group)
shared_group.model_assign[model_name] = assignment
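
The shard-count check now accepts any power of two from ALLOWED_GPUS_SHARD, capped by an optional max_gpus_shard in the model record, instead of the hard-coded [1, 2, 4]. A small sketch of the same predicate; the model records here are invented for illustration.

# Sketch of the relaxed shard validation above; model records are illustrative.
from typing import Dict

ALLOWED_GPUS_SHARD = [2 ** p for p in range(10)]  # 1, 2, 4, ..., 512


def shard_count_ok(gpus_shard: int, model_rec: Dict) -> bool:
    if gpus_shard not in ALLOWED_GPUS_SHARD:
        return False  # must be a power of two
    # models may declare an upper bound; an absent bound means no extra cap
    return gpus_shard <= model_rec.get("max_gpus_shard", gpus_shard)


print(shard_count_ok(8, {"backend": "transformers"}))                       # True
print(shard_count_ok(8, {"backend": "transformers", "max_gpus_shard": 4}))  # False
print(shard_count_ok(3, {"backend": "transformers"}))                       # False, not a power of two

The group-level check then additionally rejects gpus_shard > 1 for any backend outside shard_gpu_backends.
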
@@ -109,7 +156,7 @@ def _model_assign_filter(self, model_assign: Dict[str, Any]) -> Dict[str, Any]:

def _share_gpu_filter(self, model_assign: Dict[str, Any]) -> Dict[str, Any]:
def _update_share_gpu(model: str, record: Dict) -> Dict:
allow_share_gpu = self.models_db[model]["backend"] in SHARE_GPU_BACKENDS
allow_share_gpu = self.models_db[model]["backend"] in self.share_gpu_backends
record["share_gpu"] = record.get("share_gpu", False) and allow_share_gpu
return record

@@ -118,45 +165,55 @@ def _update_share_gpu(model: str, record: Dict) -> Dict:
for model_name, model_cfg in model_assign.items()
}

@property
def _model_cfg_template(self) -> Dict:
return json.load(open(os.path.join(env.DIR_WATCHDOG_TEMPLATES, "model.cfg")))

def _has_loras(self, model_name: str) -> bool:
active_loras = get_active_loras(self.models_db)
return bool(active_loras.get(model_name, {}).get("loras", []))

def _model_inference_setup(self, inference_config: Dict[str, Any]) -> Dict[str, Any]:
gpus = self.gpus["gpus"]
model_groups = self._model_assign_to_groups(inference_config["model_assign"])
cursor = 0
allowed_to_exist = []
required_memory_exceed_available = False
more_models_than_gpus = False
more_models_than_gpus = sum([mg.gpus_shard() for mg in model_groups]) > len(gpus)

model_configs = []
for model_group in model_groups:
models_message = ' '.join([f"'{model_name}'" for model_name in model_group.model_assign.keys()])
log(f"assign models {models_message}, cursor {cursor}, gpus_shard {model_group.gpus_shard()}")
next_cursor = cursor + model_group.gpus_shard()
if cursor + model_group.gpus_shard() > len(gpus):
more_models_than_gpus = True
if next_cursor > len(gpus):
break

for model_name, assignment in model_group.model_assign.items():
for idx, model_cursor in enumerate(range(cursor, next_cursor, assignment["gpus_shard"])):
cfg_out = f"model-{model_name.lower().replace('/', '-')}-{idx}.cfg"
allowed_to_exist.append(cfg_out)
fn = os.path.join(env.DIR_WATCHDOG_D, cfg_out)
with open(fn + ".tmp", "w") as f:
model_cfg_j = self._model_cfg_template()
model_cfg_j["command_line"].append("--model")
model_cfg_j["command_line"].append(model_name)
model_cfg_j["gpus"] = list(range(model_cursor, model_cursor + assignment["gpus_shard"]))
model_cfg_j["share_gpu"] = assignment.get("share_gpu", False)
del model_cfg_j["unfinished"]
json.dump(model_cfg_j, f, indent=4)
os.rename(fn + ".tmp", fn)
for _ in range(model_group.gpus_shard()):
if gpus[cursor]["mem_total_mb"] < model_group.required_memory_mb(self.models_db):
required_memory_exceed_available = True
cursor += 1
for model_cursor in range(cursor, next_cursor, assignment["gpus_shard"]):
model_configs.append(ModelWatchdogDConfig(
backend=self.models_db.get(model_name, {}).get("backend", ""),
model_name=model_name,
gpus=list(range(model_cursor, model_cursor + assignment["gpus_shard"])),
share_gpu=assignment.get("share_gpu", False),
n_ctx=assignment.get("n_ctx", None),
has_loras=self._has_loras(model_name),
))
for _ in range(model_group.gpus_shard()):
if gpus[cursor]["mem_total_mb"] < model_group.required_memory_mb(self.models_db):
required_memory_exceed_available = True
cursor += 1

# dump configs
allowed_to_exist = set()
for config in model_configs:
fn = config.dump(self._model_cfg_template)
allowed_to_exist.add(fn)
log(f"assign model {config.model_name}, gpus {config.gpus}: {fn}")

log("required_memory_exceed_available %d" % required_memory_exceed_available)
log("more_models_than_gpus %d" % more_models_than_gpus)
cfgs_on_disk = [cfg for cfg in os.listdir(env.DIR_WATCHDOG_D) if
cfg.endswith(".cfg") and cfg.startswith("model-")]

# remove configs that are not allowed now
for cfg_fn in cfgs_on_disk:
if cfg_fn not in allowed_to_exist:
try:
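
The setup loop now walks a GPU cursor across the model groups, emitting one ModelWatchdogDConfig per shard slice, and derives more_models_than_gpus from the total shard demand up front rather than inside the loop. Below is a simplified, self-contained walk-through of that cursor arithmetic; assign_gpus, the group contents, and the GPU count are invented, and the per-GPU memory check and share_gpu details of the real method are omitted.

# Simplified sketch of the cursor-based GPU assignment in _model_inference_setup.
from typing import Dict, List, Tuple


def assign_gpus(model_groups: List[Dict[str, int]], n_gpus: int) -> Tuple[List[Tuple[str, List[int]]], bool]:
    """Each group maps model_name -> gpus_shard; a group occupies gpus_shard consecutive GPUs."""
    more_models_than_gpus = sum(max(g.values()) for g in model_groups) > n_gpus
    assignments: List[Tuple[str, List[int]]] = []
    cursor = 0
    for group in model_groups:
        group_shard = max(group.values())
        next_cursor = cursor + group_shard
        if next_cursor > n_gpus:
            break  # no room left; the flag above already records the overflow
        for model_name, gpus_shard in group.items():
            for model_cursor in range(cursor, next_cursor, gpus_shard):
                assignments.append((model_name, list(range(model_cursor, model_cursor + gpus_shard))))
        cursor = next_cursor
    return assignments, more_models_than_gpus


configs, overflow = assign_gpus(
    [{"model-a": 2}, {"model-b": 1, "model-c": 1}],  # a sharded model, then a shared group
    n_gpus=3,
)
print(configs)   # [('model-a', [0, 1]), ('model-b', [2]), ('model-c', [2])]
print(overflow)  # False

In the real method each emitted assignment becomes a ModelWatchdogDConfig, and required_memory_exceed_available is checked per GPU as the cursor advances.
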
@@ -236,6 +293,14 @@ def models_info(self):
available_n_ctx = list(filter(lambda n_ctx: n_ctx <= default_n_ctx, ALLOWED_N_CTX))
assert default_n_ctx in available_n_ctx, \
f"default n_ctx {default_n_ctx} not in {available_n_ctx}"
available_shards = [1]
if rec["backend"] in self.shard_gpu_backends:
max_gpus = len(self.gpus["gpus"])
max_available_shards = min(max_gpus, rec.get("max_gpus_shard", max_gpus))
available_shards = [
gpus_shard for gpus_shard in ALLOWED_GPUS_SHARD
if gpus_shard <= max_available_shards
]
info.append({
"name": k,
"backend": rec["backend"],
@@ -245,10 +310,10 @@ def models_info(self):
"has_finetune": has_finetune,
"has_embeddings": bool("embeddings" in rec["filter_caps"]),
"has_chat": bool("chat" in rec["filter_caps"]),
"has_sharding": rec["backend"] in ["transformers"],
"has_share_gpu": rec["backend"] in SHARE_GPU_BACKENDS,
"has_share_gpu": rec["backend"] in self.share_gpu_backends,
"default_n_ctx": default_n_ctx,
"available_n_ctx": available_n_ctx,
"available_shards": available_shards,
"is_deprecated": bool(rec.get("deprecated", False)),
"repo_status": self._models_repo_status[k],
"repo_url": f"https://huggingface.co/{rec['model_path']}",
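
models_info now derives available_shards per model from the number of visible GPUs and an optional max_gpus_shard, replacing the fixed has_sharding flag the UI keyed on. A quick sketch of that computation; the model records and GPU count are illustrative.

# Sketch of the available_shards computation in models_info; inputs are illustrative.
from typing import Dict, List

ALLOWED_GPUS_SHARD = [2 ** p for p in range(10)]
SHARD_GPU_BACKENDS = {"transformers"}  # mirrors ModelAssigner.shard_gpu_backends


def available_shards(rec: Dict, n_gpus: int) -> List[int]:
    shards = [1]
    if rec["backend"] in SHARD_GPU_BACKENDS:
        max_available = min(n_gpus, rec.get("max_gpus_shard", n_gpus))
        shards = [s for s in ALLOWED_GPUS_SHARD if s <= max_available]
    return shards


print(available_shards({"backend": "transformers"}, n_gpus=8))                       # [1, 2, 4, 8]
print(available_shards({"backend": "transformers", "max_gpus_shard": 2}, n_gpus=8))  # [1, 2]
print(available_shards({"backend": "autogptq"}, n_gpus=8))                           # [1]

The JS change below then renders one shard radio button per entry in available_shards instead of the fixed [1, 2, 4].
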
4 changes: 2 additions & 2 deletions refact_webgui/webgui/static/tab-model-hosting.js
@@ -271,13 +271,13 @@ function render_models_assigned(models) {
}
finetune_info_factory(index, models_info, finetune_info, finetune_runs, models_data.multiple_loras);

if (models_info[index].hasOwnProperty('has_sharding') && models_info[index].has_sharding) {
if (models_info[index].available_shards.length > 1) {
const select_gpus_div = document.createElement("div");
select_gpus_div.setAttribute("class", "btn-group btn-group-sm");
select_gpus_div.setAttribute("role", "group");
select_gpus_div.setAttribute("aria-label", "basic radio toggle button group");

[1, 2, 4].forEach((gpus_shard_n) => {
models_info[index].available_shards.forEach((gpus_shard_n) => {
const input_name = `gpu-${index}`;
const input_id = `${input_name}-${gpus_shard_n}`;

2 changes: 1 addition & 1 deletion refact_webgui/webgui/tab_models_host.py
@@ -31,7 +31,7 @@ def validate_mode(cls, v: str):


class TabHostModelRec(BaseModel):
gpus_shard: int = Query(default=1, ge=1, le=4)
gpus_shard: int = Query(default=1, ge=1, le=1024)
share_gpu: bool = False
n_ctx: Optional[int] = None
