Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Docs for workers #140

Merged
merged 17 commits into from
Dec 9, 2024
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Package: hipercow
Title: High Performance Computing
Version: 1.0.51
Version: 1.0.52
Authors@R: c(person("Rich", "FitzJohn", role = c("aut", "cre"),
email = "[email protected]"),
person("Wes", "Hinsley", role = "aut"),
Expand Down
1 change: 1 addition & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ export(hipercow_purge)
export(hipercow_resources)
export(hipercow_resources_validate)
export(hipercow_rrq_controller)
export(hipercow_rrq_stop_workers_once_idle)
export(hipercow_rrq_workers_submit)
export(hipercow_unconfigure)
export(task_cancel)
Expand Down
4 changes: 2 additions & 2 deletions R/environment.R
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,9 @@ hipercow_environment_create <- function(name = "default", packages = NULL,

if (name == "rrq" && is_rrq_enabled(root)) {
cli::cli_alert_info("Refreshing existing rrq worker environments")
controller <- hipercow_rrq_controller(root)
controller <- hipercow_rrq_controller(root = root)
## or rrq::rrq_worker_refresh(controller) in recent rrq
rrq_message_send("REFRESH", controller = controller)
rrq::rrq_message_send("REFRESH", controller = controller)
}
}

Expand Down
20 changes: 6 additions & 14 deletions R/example.R
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@
##'
##' @param runner Start a runner? If `TRUE` (the default) we start a
##' background process with `callr::r_bg` that will pick tasks off a
##' queue and run them. Or pass an integer, and start more than one
##' runner.
##' queue and run them.
##'
##' @param with_logging Run each task with logging; this is quite a
##' bit slower, but enables examples that use [task_log_show] etc.
Expand Down Expand Up @@ -45,15 +44,10 @@ hipercow_example_helper <- function(runner = TRUE,
owd <- setwd(normalize_path(path))
}

if (is.logical(runner)) {
runner <- as.integer(runner)
}
if (runner > 0) {
if (runner) {
args <- list(path, with_logging)
px <- lapply(seq_len(runner), function(i) {
callr::r_bg(example_runner, args, package = TRUE,
stdout = NULL, stderr = NULL)
})
px <- callr::r_bg(example_runner, args, package = TRUE,
stdout = NULL, stderr = NULL)
}

## Set required envvar for pkgdepends (see conan2's setup.R which
Expand All @@ -65,10 +59,8 @@ hipercow_example_helper <- function(runner = TRUE,

cleanup <- function() {
cli::cli_alert_info("Cleaning up example")
if (runner > 0) {
for (p in px) {
p$kill()
}
if (runner) {
px$kill()
}
if (new_directory) {
setwd(owd)
Expand Down
29 changes: 29 additions & 0 deletions R/rrq.R
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,35 @@ hipercow_rrq_workers_submit <- function(n,
}


##' Tell workers to exit once work is complete
##'
##' @title Tell workers to exit once complete
##'
##' @inheritParams hipercow_rrq_controller
##'
##' @return Nothing, called for side effects only
##' @export
hipercow_rrq_stop_workers_once_idle <- function(root = NULL) {
r <- hipercow_rrq_controller(root = root)
worker_ids <- rrq::rrq_worker_list(controller = r)
n <- length(worker_ids)
if (n == 0) {
cli::cli_alert_warning("No workers to send messages to")
} else {
rrq::rrq_message_send("TIMEOUT_SET", 0, worker_ids, controller = r)
cfg <- rrq::rrq_worker_config_read("hipercow", controller = r)
cli::cli_alert_success("Sent message to {n} worker{?s}")
cli::cli_alert_info(
"Workers will stop {cfg$poll_queue} second{?s} after their last task")
status <- table(rrq::rrq_worker_status(worker_ids, controller = r))
status_str <- paste(
sprintf("%s (%d)", names(status), status), collapse = ", ")
cli::cli_alert_info(
"Current worker status: {status_str}")
}
}


rrq_prepare <- function(driver, root, offload_threshold_size,
..., call = NULL) {
ensure_package("rrq")
Expand Down
2 changes: 2 additions & 0 deletions _pkgdown.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ articles:
- title: Advanced topics
navbar: Advanced topics
contents:
- workers
- administration
- migration

Expand Down Expand Up @@ -132,6 +133,7 @@ reference:
contents:
- hipercow_rrq_controller
- hipercow_rrq_workers_submit
- hipercow_rrq_stop_workers_once_idle

- title: Utilities
desc: >-
Expand Down
2 changes: 1 addition & 1 deletion drivers/windows/DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Package: hipercow.windows
Title: DIDE HPC Support for Windows
Version: 1.0.51
Version: 1.0.52
Authors@R: c(person("Rich", "FitzJohn", role = c("aut", "cre"),
email = "[email protected]"),
person("Wes", "Hinsley", role = "aut"),
Expand Down
2 changes: 2 additions & 0 deletions hipercow.Rproj
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,5 @@ LaTeX: pdfLaTeX
BuildType: Package
PackageUseDevtools: Yes
PackageInstallArgs: --no-multiarch --with-keep.source

SpellingDictionary: en_GB
3 changes: 1 addition & 2 deletions man/hipercow_example_helper.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 18 additions & 0 deletions man/hipercow_rrq_stop_workers_once_idle.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 26 additions & 2 deletions tests/testthat/test-rrq.R
Original file line number Diff line number Diff line change
Expand Up @@ -302,8 +302,8 @@ test_that("checks rrq version", {
numeric_version("0.7.19"),
numeric_version("0.7.20"),
numeric_version("0.7.21"))
mockery::stub(hipercow_rrq_controller, "package_version_if_installed",
mock_version, depth = 2)
testthat::local_mocked_bindings(
package_version_if_installed = mock_version)

expect_error(hipercow_rrq_controller(root = path),
paste("Package rrq is not installed. Version 0.7.20 or greater",
Expand All @@ -317,3 +317,27 @@ test_that("checks rrq version", {

expect_no_error(suppressMessages(hipercow_rrq_controller(root = path)))
})


test_that("refresh worker environment when updating rrq", {
skip_if_no_redis()
path <- withr::local_tempdir()
init_quietly(path, driver = "example")
withr::defer(rrq::rrq_default_controller_clear())
writeLines("a <- 1", file.path(path, "src.R"))
msg <- capture_messages(
hipercow_environment_create("rrq", sources = "src.R", root = path))
expect_length(msg, 1)
expect_match(msg[[1]], "Created environment 'rrq'")

expect_message(
r <- hipercow_rrq_controller(root = path),
"Created new rrq queue")

msg <- capture_messages(
hipercow_environment_create("rrq", sources = "src.R", root = path))
expect_length(msg, 3)
expect_match(msg[[1]], "Environment 'rrq' is unchanged")
expect_match(msg[[2]], "Refreshing existing rrq worker environments")
expect_match(msg[[3]], "Using existing rrq queue")
})
38 changes: 38 additions & 0 deletions tests/testthat/test-zzz-integration.R
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,41 @@ test_that("Can run task in parallel", {
expect_equal(res[[2]][[1]], 2)
expect_true(res[[1]][[2]] != res[[2]][[2]])
})


test_that("Can turn off workers by message", {
skip_if_not_installed("callr")
skip_if_no_redis()

path <- withr::local_tempdir()
init_quietly(path, driver = "example")
suppressMessages({
r <- hipercow_rrq_controller(root = path)
cfg <- rrq::rrq_worker_config_read("hipercow", controller = r)
cfg$poll_queue <- 1
rrq::rrq_worker_config_save("hipercow", cfg, controller = r)
launch_example_workers(path)
info <- withr::with_dir(path, hipercow_rrq_workers_submit(1))
id <- info$worker_id
})

msg <- capture_messages(hipercow_rrq_stop_workers_once_idle(path))
expect_length(msg, 4)
expect_match(msg[[1]], "Using existing rrq queue")
expect_match(msg[[2]], "Sent message to 1 worker")
expect_match(msg[[3]], "Workers will stop 1 second after their last task")
expect_match(msg[[4]], "Current worker status: IDLE (1)", fixed = TRUE)

Sys.sleep(2)
expect_equal(unname(rrq::rrq_worker_status(id)), "EXITED")
logs <- rrq::rrq_worker_log_tail(id, n = 4)
expect_equal(logs$command,
c("MESSAGE", "RESPONSE", "HEARTBEAT", "STOP"))
expect_equal(logs$message,
c("TIMEOUT_SET", "TIMEOUT_SET", "stopping", "OK (TIMEOUT)"))

msg <- capture_messages(hipercow_rrq_stop_workers_once_idle(path))
expect_length(msg, 2)
expect_match(msg[[1]], "Using existing rrq queue")
expect_match(msg[[2]], "No workers to send messages to")
})
2 changes: 2 additions & 0 deletions vignettes/administration.Rmd
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,10 @@ in your `.Renviron` indicating where we should work. We'll make lots of directo
Each vignette can be built by running (ideally in a fresh session with the working directory as the package root)

```r
hipercow.windows:::windows_check_credentials()
knitr::knit("vignettes_src/windows.Rmd", "vignettes/windows.Rmd")
knitr::knit("vignettes_src/packages.Rmd", "vignettes/packages.Rmd")
knitr::knit("vignettes_src/workers.Rmd", "vignettes/workers.Rmd")
knitr::knit("vignettes_src/stan.Rmd", "vignettes/stan.Rmd")
```

Expand Down
Loading
Loading