A tmux-free alternative to experimentTmux that dispatches
parallel workers as background R processes via callr. Workers claim
jobs from a Google Sheets or local-RDS queue, source global_path for
each job, and write all console output to per-worker log files on localhost.
The function returns immediately (non-blocking) with a handle object of
class "experimentFuture". Use awaitExperimentFuture to
block until all workers finish, or print(ef) to check live status.
Because workers write to files via callr::r_bg()'s stdout /
stderr arguments, logs appear in real time and can be followed with
tail -f.
experimentFuture(
df,
global_path = "global.R",
cores = NULL,
n_workers = if (is.null(cores)) 4L else length(cores),
queue_path = NULL,
on_interrupt = c("requeue", "fail"),
ss_id = NULL,
forceLocalQueueToGS = FALSE,
email = getOption("gargle_oauth_email"),
cache_path = getOption("gargle_oauth_cache"),
runNameLabel = quote(colnames(q)[1:2]),
log_dir = "logs",
activeRunningPath = getOption("spades.activeRunningPath"),
sp_dev_path = NULL,
local_pat_file = NULL,
copyModules = FALSE,
...
)
df: data.frame of parameter combinations. Each row is one job.
global_path: Path to the R script each worker sources for each job.
Defaults to "global.R", resolved relative to the current working directory.
cores: NULL for local parallel workers, or a character vector
of SSH hostnames for remote workers. When cores is provided,
.setup_remote_machine is called for each unique host before
workers are launched, replicating the full SpaDES environment (packages,
GitHub PAT, gargle OAuth cache) on each remote machine.
n_workers: Number of parallel workers. Defaults to
length(cores) when remote hosts are given, or 4L for local workers.
queue_path: Path to the local RDS queue file. Created automatically
if it does not yet exist. Defaults to
<dirname(global_path)>/future_queue.rds.
on_interrupt: "requeue" (default) re-queues interrupted jobs
as PENDING; "fail" marks them INTERRUPTED permanently.
ss_id: Google Sheets ID (or Drive folder ID) for the shared queue. When provided, workers use the Google Sheets backend instead of the local RDS file.
forceLocalQueueToGS: If TRUE, overwrite the Google Sheet with the
local df even if the sheet already contains rows.
email: Gargle OAuth e-mail used for Google Sheets authentication.
cache_path: Gargle OAuth cache directory.
runNameLabel: Quoted expression evaluated in the job environment to derive a human-readable run name (used in log messages and queue metadata).
log_dir: Directory for per-worker log files. Created if needed.
Defaults to "logs", relative to the current working directory.
activeRunningPath: Directory for Running_*.rds marker files
(file-based backend only).
sp_dev_path: Local path to a SpaDES.project source tree to sync to
remote workers (optional; if NULL, the installed SpaDES.project is used).
local_pat_file: Path to a local file containing a GitHub PAT to copy to remote workers.
copyModules: Logical. If TRUE and remote hosts are present,
rsyncs the directory given by getOption("spades.modulePath") to the
same absolute path on each remote host after .setup_remote_machine()
completes. Issues a warning and skips the copy if the option is unset.
Default FALSE.
...: Additional named arguments saved to .future_dots.rds and
loaded into each worker's .GlobalEnv before sourcing
global_path.
An object of class "experimentFuture" (a list) containing:
procs: List of callr::r_bg process objects, one per
local worker (or future objects for remote cluster workers).
log_files: Character vector of log file paths.
log_dir: Absolute path to the log directory.
queue_path: Absolute path to the queue RDS file.
cores: The cores argument as supplied.
if (FALSE) { # \dontrun{
## -- Minimal: build a tiny global.R, then run a 2 x 2 experiment ---------
# global.R reads .scenario and .rep, the column names of `expt` below.
tdir <- file.path(tempdir(), "experimentFuture-demo")
dir.create(tdir, showWarnings = FALSE, recursive = TRUE)
writeLines(
'message("scenario=", .scenario, " rep=", .rep); Sys.sleep(2)',
file.path(tdir, "global.R")
)
# 2 scenarios x 2 reps = 4 jobs, one per row.
expt <- expand.grid(.scenario = c("A", "B"), .rep = 1:2,
stringsAsFactors = FALSE)
# Returns immediately; the 2 workers run in the background.
ef <- experimentFuture(
df = expt,
global_path = file.path(tdir, "global.R"),
n_workers = 2L,
queue_path = file.path(tdir, "future_queue.rds"),
log_dir = file.path(tdir, "logs")
)
## -- Live inspection while workers run -----------------------------------
print(ef) # alive/done per worker
experimentMonitor(ef) # pid + machine + runName
experimentMonitor(ef, stats = TRUE) # adds CPU / RAM / state
queueRead(ef$queue_path) # full queue snapshot
experimentFutureList(ef) # cluster-wide pid list
cat(readLines(ef$log_files[[1L]]), sep = "\n") # print worker 1's whole log so far (use tail -f to follow it live)
awaitExperimentFuture(ef) # blocks until both workers exit
## -- Killing workers ----------------------------------------------------
# The kill calls below are shown after awaitExperimentFuture() only for
# illustration; in practice call them while workers are still running.
# Graceful stop: workers finish their CURRENT job, then exit.
# Any remaining PENDING jobs stay in the queue and can be resumed later
# by calling experimentFuture() again with the same queue_path.
killExperimentFuture(ef)
# Immediate stop (force): workers are killed immediately.
# Jobs that were mid-execution may remain as RUNNING in the queue; reset them with:
# tmuxRefreshQueueStatus(ef$queue_path) # file-based backend
# The GS backend reclaims stale RUNNING entries automatically before each new claim.
killExperimentFuture(ef, force = TRUE)
tmuxRefreshQueueStatus(ef$queue_path) # clean up stale RUNNING entries
# Cluster-wide kill (works for `cores = c(...)` clusters too):
# sends SIGTERM to every worker on every machine, waits for exit, runs
# tmuxRefreshQueueStatus(), and pushes the demotion to the Google Sheet
# if `ss_id` was used (via the <queue_path>.ss_id sidecar).
experimentFutureList(ef, kill = TRUE)
## -- Resuming after a kill ----------------------------------------------
# Jobs left as PENDING (or INTERRUPTED with on_interrupt = "requeue") are
# automatically picked up when you call experimentFuture() again with the
# same queue_path -- no need to re-specify df.
ef2 <- experimentFuture(
df = expt, # ignored if queue_path already exists
global_path = file.path(tdir, "global.R"),
n_workers = 2L,
queue_path = file.path(tdir, "future_queue.rds"),
log_dir = file.path(tdir, "logs")
)
awaitExperimentFuture(ef2) # wait for remaining jobs to finish
queueRead(ef2$queue_path) # full snapshot (data.table)
table(queueRead(ef2$queue_path)$status) # expect every job to be DONE once ef2 finishes
cat(readLines(ef2$log_files[[1]]), sep = "\n") # inspect worker 1 log
## -- Remote workers (pre-setup required) -------------------------------
# NOTE(review): per the `cores` docs, .setup_remote_machine() runs
# automatically for each host; "pre-setup" presumably means SSH access
# to node01/node02 must already work -- confirm.
# With ss_id set, workers share a Google Sheets queue instead of the RDS file.
ef <- experimentFuture(
df = expt,
global_path = file.path(tdir, "global.R"),
cores = c("node01", "node02"),
n_workers = 2L,
ss_id = "YOUR_GOOGLE_SHEET_ID",
email = "you@example.com",
cache_path = "~/.cache/gargle",
local_pat_file = "~/.github_pat"
)
killExperimentFuture(ef) # graceful stop on remote workers too
} # }