A Slurm-native sibling of experimentTmux and
experimentFuture. Submits n_workers long-lived SBATCH
jobs that each call tmuxRunWorkerLoop against the shared
queue, claiming and running rows until the queue is empty (or the
worker's stop file appears). Same queue / global.R / runNameLabel
/ statusCalculate semantics as the other two runners.
Returns a non-blocking handle of class "experimentSBATCH" carrying
the Slurm job IDs. Use awaitExperimentSBATCH to poll
squeue until all jobs leave the queue, or
killExperimentSBATCH to stop them (gracefully via stop files,
or immediately via scancel).
experimentSBATCH(
df,
global_path = "global.R",
n_workers = 4L,
queue_path = NULL,
on_interrupt = c("requeue", "fail"),
ss_id = NULL,
forceLocalQueueToGS = FALSE,
email = getOption("gargle_oauth_email"),
cache_path = getOption("gargle_oauth_cache"),
runNameLabel = quote(colnames(q)[1:2]),
log_dir = "logs",
activeRunningPath = getOption("spades.activeRunningPath"),
sbatch_opts = list(),
sbatch_cmd = "sbatch",
r_cmd = file.path(R.home("bin"), "Rscript"),
r_libs = .libPaths(),
dry_run = FALSE,
...
)
df: data.frame of parameter combinations. Each row is one job.
Ignored if queue_path already exists (workers resume from the
existing queue).
global_path: Path to the R script each worker sources per job.
Defaults to "global.R" in the current directory. Must be on a
filesystem visible to compute nodes (e.g. shared NFS / Lustre).
n_workers: Number of SBATCH jobs to submit. Defaults to 4L.
queue_path: Path to the local RDS queue file. Created automatically
if it does not yet exist. Defaults to
<dirname(global_path)>/sbatch_queue.rds. Must be on shared
storage so all worker nodes can read/write it.
on_interrupt: "requeue" (default) re-queues interrupted jobs
as PENDING; "fail" marks them INTERRUPTED permanently.
ss_id: Google Sheets ID (or Drive folder ID) for the shared queue.
When provided, workers use the GS backend in addition to the local RDS
file (mirroring experimentFuture).
forceLocalQueueToGS: If TRUE, overwrite the GS sheet with the
local df even if the sheet already contains rows.
email: Gargle OAuth e-mail for Google Sheets auth (only used when
ss_id is non-NULL).
cache_path: Gargle OAuth cache directory.
runNameLabel: Quoted expression evaluated in the job environment to
derive a human-readable run name (used in log messages and queue
metadata). Defaults to quote(colnames(q)[1:2]).
log_dir: Directory for per-worker log files, generated job scripts,
and stop files. Created if needed. Defaults to "logs" relative
to the current working directory. Must be on shared storage.
activeRunningPath: Directory for Running_*.rds marker files
(file-based backend only).
sbatch_opts: Named list of SBATCH directives. Each name = value
becomes #SBATCH --<name>=<value> in the generated job script.
Underscores in names are translated to hyphens, so
cpus_per_task = 4 becomes #SBATCH --cpus-per-task=4.
Set a value to NULL or TRUE for flag-only directives
(#SBATCH --<name>). Common keys: partition, time,
mem, cpus_per_task, account, nodes,
ntasks_per_node, constraint, gres.
sbatch_cmd: Path to the sbatch executable. Defaults to
"sbatch" (resolved on $PATH). Override on systems where
sbatch is wrapped or non-standard.
r_cmd: Path to the R interpreter to invoke on compute nodes.
Defaults to file.path(R.home("bin"), "Rscript").
r_libs: Character vector of library paths to set via .libPaths()
inside each worker. Defaults to the master's .libPaths() so the
worker sees the same package set; override when compute nodes have a
different filesystem layout.
dry_run: If TRUE, generate the job scripts but do not submit
them. Returns a handle whose job_ids are all NA. Useful
for inspecting what would be submitted.
...: Additional named arguments stored in .sbatch_dots.rds and
loaded into each worker's .GlobalEnv before sourcing
global_path.
An object of class "experimentSBATCH" (a list) containing:
job_ids: Integer vector of Slurm job IDs (or NA
under dry_run = TRUE).
job_scripts: Character vector of generated SBATCH script paths.
log_files: Character vector of log file paths.
stop_files: Character vector of stop-file paths.
log_dir: Absolute path to the log directory.
queue_path: Absolute path to the queue RDS file.
if (FALSE) { # \dontrun{
## -- Minimal: build a tiny global.R, then run a 2 x 2 experiment ---------
# Use a directory on your shared HPC filesystem (NFS / Lustre / BeeGFS).
# tempdir() is used below only to keep the example self-contained; it is
# often node-local on clusters, so substitute a shared path in real use.
tdir <- file.path(tempdir(), "experimentSBATCH-demo")
dir.create(tdir, showWarnings = FALSE, recursive = TRUE)
writeLines(
  'message("scenario=", .scenario, " rep=", .rep); Sys.sleep(2)',
  file.path(tdir, "global.R")
)
expt <- expand.grid(
  .scenario = c("A", "B"), .rep = 1:2,
  stringsAsFactors = FALSE
)
es <- experimentSBATCH(
  df = expt,
  global_path = file.path(tdir, "global.R"),
  n_workers = 2L,
  queue_path = file.path(tdir, "sbatch_queue.rds"),
  log_dir = file.path(tdir, "logs"),
  sbatch_opts = list(partition = "compute", time = "00:30:00", mem = "1G")
)
## -- Live inspection while jobs run --------------------------------------
print(es)                                      # job IDs + squeue status
queueRead(es$queue_path)                       # full queue snapshot
experimentMonitor(queue_paths = es$queue_path) # cluster-wide pid + machine
experimentMonitor(queue_paths = es$queue_path, stats = TRUE) # + CPU/RAM
awaitExperimentSBATCH(es)                      # block until squeue empty
## -- Larger experiment with full sbatch_opts ------------------------------
# Give this run its own queue_path: the queue above is now entirely DONE,
# and `df` is ignored whenever queue_path already exists, so reusing it
# would submit workers that find nothing to claim.
large_queue <- file.path(tdir, "sbatch_queue_large.rds")
es <- experimentSBATCH(
  df = expt,
  global_path = file.path(tdir, "global.R"),
  n_workers = 4L,
  queue_path = large_queue,
  log_dir = file.path(tdir, "logs"),
  sbatch_opts = list(
    partition = "compute",
    time = "24:00:00",
    mem = "16G",
    cpus_per_task = 4,
    account = "my_alloc"
  )
)
print(es) # job IDs + squeue status per worker
# Either block until every job ID leaves squeue:
#   awaitExperimentSBATCH(es)
# ... or stop the run early (awaiting first would leave nothing to stop).
# Graceful stop (workers exit between jobs, queue rows stay PENDING):
killExperimentSBATCH(es)
# Immediate stop via scancel; RUNNING rows left behind by the cancelled
# workers are stale, so clean them up afterwards:
killExperimentSBATCH(es, force = TRUE)
tmuxRefreshQueueStatus(es$queue_path)
## -- Resume after stop ---------------------------------------------------
# Same queue_path as the stopped run -> DONE rows are skipped, demoted
# PENDING rows are re-claimed by the new sbatch jobs.
es2 <- experimentSBATCH(
  df = expt, # ignored because the queue already exists
  global_path = file.path(tdir, "global.R"),
  n_workers = 2L,
  queue_path = large_queue,
  log_dir = file.path(tdir, "logs"),
  sbatch_opts = list(partition = "compute", time = "00:30:00", mem = "1G")
)
awaitExperimentSBATCH(es2)
table(queueRead(es2$queue_path)$status) # all DONE
} # }