A tmux-free alternative to experimentTmux that dispatches parallel workers as background R processes via callr. Workers claim jobs from a Google Sheets or local RDS queue, source global_path for each job, and write all console output to per-worker log files on localhost.
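The claim step can be pictured with a minimal sketch of a worker taking the next PENDING row from a local RDS queue. The helper claim_next_job() and the `status`/`pid` column layout are hypothetical illustrations, not the package's internal schema, and the sketch takes no file lock, which real concurrent workers would need.

```r
# Hypothetical sketch: claim the next PENDING job from a local RDS queue.
# The `status` and `pid` columns are illustrative, not the package's schema.
# No file lock is taken, so concurrent workers could race on the same row.
claim_next_job <- function(queue_path, pid = Sys.getpid()) {
  q <- readRDS(queue_path)
  if (is.null(q$pid)) q$pid <- NA_integer_   # add a claim-owner column once
  idx <- which(q$status == "PENDING")
  if (length(idx) == 0L) return(NULL)        # queue drained: the worker can exit
  i <- idx[1L]
  q$status[i] <- "RUNNING"
  q$pid[i] <- as.integer(pid)
  saveRDS(q, queue_path)
  q[i, , drop = FALSE]                       # one-row data.frame for this job
}

qf <- tempfile(fileext = ".rds")
saveRDS(data.frame(.scenario = c("A", "B"), status = "PENDING",
                   stringsAsFactors = FALSE), qf)
claim_next_job(qf)$.scenario   # "A"
claim_next_job(qf)$.scenario   # "B"
is.null(claim_next_job(qf))    # TRUE
```

A worker loop would then be `while (!is.null(job <- claim_next_job(qf))) { ... }`, sourcing global_path once per claimed row.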

The function returns immediately (non-blocking) with a handle object of class "experimentFuture". Use awaitExperimentFuture to block until all workers finish, or print(ef) to check live status. Because workers write directly to files via the stdout and stderr arguments of callr::r_bg(), logs appear in real time and can be followed with tail -f.
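The logging mechanism can be sketched with callr directly: a background process whose stdout goes to a file and whose stderr is merged into it with the "2>&1" shorthand. The worker function and the log file name are illustrative only; this is not the package's launcher.

```r
# Sketch: start a background R process whose console output streams to a
# log file. The function body and file name are illustrative only.
log_file <- file.path(tempdir(), "worker_01.log")
p <- callr::r_bg(
  function() {
    cat("job started\n")              # goes to stdout
    message("a message on stderr")    # goes to stderr
    invisible(NULL)
  },
  stdout = log_file,
  stderr = "2>&1"                     # merge stderr into the same log file
)
p$is_alive()    # non-blocking status check
p$wait()        # block until the process exits
readLines(log_file)
```

While such a process runs, `tail -f` on the log file from a shell shows lines as they are written, which is what makes the per-worker logs followable.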

experimentFuture(
  df,
  global_path = "global.R",
  cores = NULL,
  n_workers = if (is.null(cores)) 4L else length(cores),
  queue_path = NULL,
  on_interrupt = c("requeue", "fail"),
  ss_id = NULL,
  forceLocalQueueToGS = FALSE,
  email = getOption("gargle_oauth_email"),
  cache_path = getOption("gargle_oauth_cache"),
  runNameLabel = quote(colnames(q)[1:2]),
  log_dir = "logs",
  activeRunningPath = getOption("spades.activeRunningPath"),
  sp_dev_path = NULL,
  local_pat_file = NULL,
  copyModules = FALSE,
  ...
)

Arguments

df

data.frame of parameter combinations. Each row is one job.

global_path

Path to the R script each worker sources per job. Defaults to "global.R" in the current directory.

cores

NULL for local parallel workers, or a character vector of SSH hostnames for remote workers. When cores is provided, .setup_remote_machine is called for each unique host before workers are launched, replicating the full SpaDES environment (packages, GitHub PAT, gargle OAuth cache) on each remote machine.

n_workers

Number of parallel workers. Defaults to length(cores) for remote workers, or 4L for local workers.
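The n_workers default reuses the expression from the signature. The sketch below evaluates it and counts how many one-time host setups a cores vector implies; default_n_workers() is a hypothetical helper, and treating a repeated hostname as several workers on one host is an assumption consistent with setup running once per unique host.

```r
# Sketch of the documented n_workers default and of per-host setup counts.
# default_n_workers() is hypothetical; host names are placeholders.
default_n_workers <- function(cores = NULL) {
  if (is.null(cores)) 4L else length(cores)   # same expression as the signature
}
cores <- c("node01", "node01", "node02")
default_n_workers(NULL)    # 4 local workers
default_n_workers(cores)   # 3 workers, one per element of cores
unique(cores)              # setup would run once per unique host: 2 hosts
```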

queue_path

Path to the local RDS queue file. Created automatically if it does not yet exist. Defaults to <dirname(global_path)>/future_queue.rds.
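The documented queue_path default can be written as a one-line rule; default_queue_path() is a hypothetical helper. Note that the returned handle stores an absolute path, which this sketch does not normalize.

```r
# Sketch of the documented queue_path default; default_queue_path() is a
# hypothetical helper, not a package function.
default_queue_path <- function(global_path = "global.R", queue_path = NULL) {
  if (!is.null(queue_path)) return(queue_path)   # an explicit path wins
  file.path(dirname(global_path), "future_queue.rds")
}
default_queue_path("global.R")             # "./future_queue.rds"
default_queue_path("/proj/run/global.R")   # "/proj/run/future_queue.rds"
```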

on_interrupt

"requeue" (default) re-queues interrupted jobs as PENDING; "fail" marks them INTERRUPTED permanently.

ss_id

Google Sheets ID (or Drive folder ID) for the shared queue. When provided, workers use the GS backend instead of the local RDS file.

forceLocalQueueToGS

If TRUE, overwrite the GS sheet with the local df even if the sheet already contains rows.
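Taken together, ss_id selects the backend and forceLocalQueueToGS decides whether df replaces rows already in the sheet. The sketch below encodes that reading; both helpers are hypothetical, and the assumption that an empty sheet is seeded from df without forcing is inferred from the wording "even if the sheet already contains rows".

```r
# Hypothetical sketch: backend selection and when df would be written to
# the sheet. Seeding an empty sheet automatically is an assumption.
choose_backend <- function(ss_id = NULL) if (is.null(ss_id)) "rds" else "gs"

should_seed_sheet <- function(sheet_nrow, forceLocalQueueToGS = FALSE) {
  sheet_nrow == 0L || isTRUE(forceLocalQueueToGS)
}
choose_backend()                  # "rds"
choose_backend("YOUR_SHEET_ID")   # "gs"
should_seed_sheet(0L)             # TRUE: empty sheet receives df
should_seed_sheet(12L)            # FALSE: existing rows are kept
should_seed_sheet(12L, TRUE)      # TRUE: forced overwrite
```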

email

gargle OAuth email used for Google Sheets authentication.

cache_path

gargle OAuth token cache directory.

runNameLabel

Quoted expression evaluated in the job environment to derive a human-readable run name (used in log messages and queue metadata).
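With the default runNameLabel, the expression yields the first two column names of the job row `q`. The sketch evaluates it against a one-row data.frame and then joins names with their values; the "name=value" joining rule is an assumption for illustration, not necessarily how the package formats run names.

```r
# Sketch: evaluate the default runNameLabel against a one-row job `q`,
# then combine the selected columns with their values (joining is assumed).
runNameLabel <- quote(colnames(q)[1:2])
q <- data.frame(.scenario = "A", .rep = 2L, extra = TRUE)
cols <- eval(runNameLabel, envir = list(q = q))   # c(".scenario", ".rep")
vals <- vapply(q[1, cols, drop = FALSE], as.character, character(1))
paste(cols, vals, sep = "=", collapse = "_")      # ".scenario=A_.rep=2"
```

Using as.character() per column keeps factor columns readable as their labels rather than integer codes.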

log_dir

Directory for per-worker log files. Created if needed. Defaults to "logs" relative to the current working directory.
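A sketch of preparing log_dir and deriving one log path per worker. The "worker_NN.log" naming scheme and make_log_files() are hypothetical; the normalization mirrors the absolute log_dir stored in the returned handle.

```r
# Sketch: create log_dir if needed and derive one log file per worker.
# The naming scheme and helper are hypothetical.
make_log_files <- function(log_dir = "logs", n_workers = 4L) {
  dir.create(log_dir, showWarnings = FALSE, recursive = TRUE)
  log_dir <- normalizePath(log_dir)   # absolute path, like the handle's log_dir
  file.path(log_dir, sprintf("worker_%02d.log", seq_len(n_workers)))
}
lf <- make_log_files(file.path(tempdir(), "logs"), 2L)
basename(lf)   # "worker_01.log" "worker_02.log"
```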

activeRunningPath

Directory for Running_*.rds marker files (file-based backend only).
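Finding the markers is a plain directory listing. This hypothetical sketch only locates Running_*.rds files; what each marker contains is not documented here and is not assumed.

```r
# Hypothetical sketch: list Running_*.rds marker files in activeRunningPath.
list_running_markers <- function(activeRunningPath) {
  if (is.null(activeRunningPath) || !dir.exists(activeRunningPath))
    return(character(0))
  list.files(activeRunningPath, pattern = "^Running_.*\\.rds$",
             full.names = TRUE)
}
d <- file.path(tempdir(), "active")
dir.create(d, showWarnings = FALSE)
invisible(file.create(file.path(d, c("Running_job1.rds", "notes.txt"))))
basename(list_running_markers(d))   # "Running_job1.rds"
```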

sp_dev_path

Local path to a SpaDES.project source tree to sync to remote workers. Optional; if NULL, the installed SpaDES.project is used.

local_pat_file

Path to a file containing a GitHub PAT to copy to remote workers.

copyModules

Logical. If TRUE and remote hosts are present, rsyncs the directory given by getOption("spades.modulePath") to the same absolute path on each remote host after .setup_remote_machine() completes. Issues a warning and skips if the option is unset. Default FALSE.
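The copyModules step amounts to one rsync per remote host, mirroring the module directory to the same absolute path. The sketch only builds the command strings; module_sync_commands() and the `-az` flags are assumptions, not the package's actual invocation.

```r
# Hypothetical sketch: build (but do not run) one rsync command per unique
# remote host. The rsync flags are an assumption.
module_sync_commands <- function(hosts,
                                 module_path = getOption("spades.modulePath")) {
  if (is.null(module_path)) {
    warning("spades.modulePath is unset; skipping module copy")
    return(character(0))
  }
  src <- path.expand(module_path)
  vapply(unique(hosts), function(h) {
    sprintf("rsync -az %s %s",
            shQuote(paste0(src, "/"), type = "sh"),            # local dir contents
            shQuote(paste0(h, ":", src, "/"), type = "sh"))    # same path remotely
  }, character(1))
}
module_sync_commands(c("node01", "node02"), "/proj/modules")
```

The trailing slashes make rsync copy the directory's contents into the matching remote directory rather than nesting a second `modules` folder inside it.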

...

Additional named arguments stored in .future_dots.rds and loaded into each worker's .GlobalEnv before sourcing global_path.
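The `...` hand-off is a save/load round trip through an RDS file. In this minimal sketch, save_dots() and load_dots() are hypothetical names; the target environment defaults to the global environment, as the description states for workers, but can be swapped for a scratch environment.

```r
# Hypothetical sketch of the `...` hand-off through .future_dots.rds.
save_dots <- function(path, ...) {
  dots <- list(...)
  # every extra argument must be named so it can become a variable
  stopifnot(length(dots) == 0L || !is.null(names(dots)),
            all(nzchar(names(dots))))
  saveRDS(dots, path)
  invisible(path)
}
load_dots <- function(path, envir = globalenv()) {
  dots <- readRDS(path)
  list2env(dots, envir = envir)   # each name becomes a variable in envir
  invisible(names(dots))
}

f <- file.path(tempdir(), ".future_dots.rds")
save_dots(f, studyArea = "AB", nReps = 3L)
e <- new.env()
load_dots(f, envir = e)
e$nReps   # 3
```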

Value

An object of class "experimentFuture" (a list) containing:

procs

List of callr::r_bg process objects, one per local worker (or future objects for remote cluster workers).

log_files

Character vector of log file paths.

log_dir

Absolute path to the log directory.

queue_path

Absolute path to the queue RDS file.

cores

The cores argument as supplied.
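A live-status view can be derived from the procs and log_files fields. This hypothetical sketch relies on the is_alive() method that callr::r_bg process objects provide; for the future objects used by remote workers, future::resolved() would play the analogous role. Stand-in objects are used so the sketch runs without child processes.

```r
# Hypothetical sketch: summarize worker state from a handle's procs list.
summarize_workers <- function(ef) {
  alive <- vapply(ef$procs, function(p) p$is_alive(), logical(1))
  data.frame(worker = seq_along(alive),
             state  = ifelse(alive, "alive", "done"),
             log    = basename(ef$log_files),
             stringsAsFactors = FALSE)
}
# Stand-ins exposing is_alive(), mimicking callr::r_bg process objects
fake_proc <- function(alive) list(is_alive = function() alive)
ef <- structure(
  list(procs = list(fake_proc(TRUE), fake_proc(FALSE)),
       log_files = c("logs/worker_01.log", "logs/worker_02.log")),
  class = "experimentFuture"
)
summarize_workers(ef)
```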

Examples

if (FALSE) { # \dontrun{
## -- Minimal: build a tiny global.R, then run a 2 x 2 experiment ---------
tdir <- file.path(tempdir(), "experimentFuture-demo")
dir.create(tdir, showWarnings = FALSE, recursive = TRUE)
writeLines(
  'message("scenario=", .scenario, " rep=", .rep); Sys.sleep(2)',
  file.path(tdir, "global.R")
)
expt <- expand.grid(.scenario = c("A", "B"), .rep = 1:2,
                    stringsAsFactors = FALSE)

ef <- experimentFuture(
  df          = expt,
  global_path = file.path(tdir, "global.R"),
  n_workers   = 2L,
  queue_path  = file.path(tdir, "future_queue.rds"),
  log_dir     = file.path(tdir, "logs")
)

## -- Live inspection while workers run -----------------------------------
print(ef)                                         # alive/done per worker
experimentMonitor(ef)                             # pid + machine + runName
experimentMonitor(ef, stats = TRUE)               # adds CPU / RAM / state
queueRead(ef$queue_path)                          # full queue snapshot
experimentFutureList(ef)                          # cluster-wide pid list
cat(readLines(ef$log_files[[1L]]), sep = "\n")    # print one log so far

awaitExperimentFuture(ef)    # blocks until both workers exit

## -- Killing workers ----------------------------------------------------

# Graceful stop: workers finish their CURRENT job, then exit.
# Any remaining PENDING jobs stay in the queue and can be resumed later
# by calling experimentFuture() again with the same queue_path.
killExperimentFuture(ef)

# Immediate stop (force): workers are killed without finishing their job.
# Jobs that were mid-execution may remain RUNNING in the queue. The GS
# backend reclaims stale RUNNING entries automatically before each new
# claim; with the file-based backend, reset them explicitly:
killExperimentFuture(ef, force = TRUE)
tmuxRefreshQueueStatus(ef$queue_path)   # file-based backend: clean up stale RUNNING entries

# Cluster-wide kill (works for `cores = c(...)` clusters too):
# sends SIGTERM to every worker on every machine, waits for exit, runs
# tmuxRefreshQueueStatus(), and pushes the demotion to the Google Sheet
# if `ss_id` was used (via the <queue_path>.ss_id sidecar).
experimentFutureList(ef, kill = TRUE)

## -- Resuming after a kill ----------------------------------------------

# Jobs still PENDING (including interrupted jobs re-queued under the default
# on_interrupt = "requeue") are picked up automatically when you call
# experimentFuture() again with the same queue_path; df is then ignored.
ef2 <- experimentFuture(
  df          = expt,         # ignored if queue_path already exists
  global_path = file.path(tdir, "global.R"),
  n_workers   = 2L,
  queue_path  = file.path(tdir, "future_queue.rds"),
  log_dir     = file.path(tdir, "logs")
)
awaitExperimentFuture(ef2)   # wait for remaining jobs to finish

queueRead(ef2$queue_path)    # full snapshot (data.table)
table(queueRead(ef2$queue_path)$status)   # expect all DONE

cat(readLines(ef2$log_files[[1]]), sep = "\n")   # inspect worker 1 log

## -- Remote workers (pre-setup required) -------------------------------
ef <- experimentFuture(
  df             = expt,
  global_path    = file.path(tdir, "global.R"),
  cores          = c("node01", "node02"),
  n_workers      = 2L,
  ss_id          = "YOUR_GOOGLE_SHEET_ID",
  email          = "you@example.com",
  cache_path     = "~/.cache/gargle",
  local_pat_file = "~/.github_pat"
)
killExperimentFuture(ef)     # graceful stop on remote workers too
} # }