Given a schedule in a maestro project, runs the pipelines that are scheduled to execute based on the current time.
Arguments
- schedule
object of type MaestroSchedule created using build_schedule()
- orch_frequency
frequency of the orchestrator, a single string formatted like "1 day", "2 weeks", "hourly", etc.
- check_datetime
datetime against which to check which pipelines should run (default is the current system time in UTC)
- resources
named list of shared resources made available to pipelines as needed
- run_all
run all pipelines regardless of the schedule (default is FALSE); useful for testing. Does not apply to pipes with a maestroSkip tag.
- n_show_next
show the next n scheduled pipes
- cores
number of CPU cores to use if running in parallel. If > 1, furrr is used and a multisession plan must be executed in the orchestrator (see details).
- logging
whether or not to write the logs to a file (default = FALSE)
- log_file
path to the log file (ignored if logging == FALSE)
- log_file_max_bytes
numeric specifying the maximum number of bytes allowed in the log file before purging the log (within a margin of error)
- quiet
silence metrics to the console (default = FALSE)
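The orch_frequency argument takes a human-readable interval. As a rough illustration of how such a string maps to a fixed interval, here is a minimal sketch using a hypothetical helper, parse_frequency_seconds(), which is not part of maestro. It assumes the "<number> <unit>" form plus a few one-word shorthands such as "hourly", and it deliberately leaves out months and years because they have no fixed length in seconds. maestro's own parser may accept other forms.

```r
# Hypothetical helper (not part of maestro): convert an orch_frequency-style
# string such as "1 day", "2 weeks", "15 minutes" or "hourly" into seconds.
parse_frequency_seconds <- function(x) {
  unit_secs <- c(second = 1, minute = 60, hour = 3600, day = 86400, week = 604800)
  # One-word shorthands are rewritten into the "<number> <unit>" form
  shorthand <- c(hourly = "1 hour", daily = "1 day", weekly = "1 week")
  x <- tolower(trimws(x))
  if (x %in% names(shorthand)) x <- shorthand[[x]]
  parts <- strsplit(x, "\\s+")[[1]]
  if (length(parts) != 2) stop("Expected '<number> <unit>', got: ", x)
  n <- suppressWarnings(as.numeric(parts[1]))
  unit <- sub("s$", "", parts[2])  # accept plurals: "weeks" -> "week"
  if (is.na(n) || !unit %in% names(unit_secs)) stop("Unrecognised frequency: ", x)
  n * unit_secs[[unit]]
}

parse_frequency_seconds("2 weeks")     # 1209600
parse_frequency_seconds("15 minutes")  # 900
parse_frequency_seconds("hourly")      # 3600
```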
Details
Pipeline schedule logic
The function run_schedule() examines each pipeline in the schedule table and determines whether it is scheduled to run at the current time using some simple time arithmetic. By default it assumes run_schedule(schedule, check_datetime = Sys.time()), but any other datetime can be passed as check_datetime.
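The time arithmetic can be pictured with a simplified sketch. The function is_scheduled() below is hypothetical and is not maestro's implementation: it assumes a pipeline runs at its start time plus whole multiples of its interval, and treats the pipeline as due if the next such instant falls within one orchestrator interval of check_datetime. maestro's real logic may differ, for example in how it rounds the check time.

```r
# Hypothetical sketch (not maestro's implementation) of deciding whether a
# pipeline is due during the current orchestrator run. All intervals in seconds.
is_scheduled <- function(start_time, pipe_secs, check_datetime, orch_secs) {
  # Seconds between the pipeline's first scheduled run and the check time
  elapsed <- as.numeric(difftime(check_datetime, start_time, units = "secs"))
  # Seconds from check_datetime until the next scheduled instant (0 if exactly on one)
  until_next <- if (elapsed <= 0) {
    -elapsed  # the first run has not happened yet
  } else {
    (pipe_secs - elapsed %% pipe_secs) %% pipe_secs
  }
  # Due if that instant falls in [check_datetime, check_datetime + orch_secs)
  until_next < orch_secs
}

start <- as.POSIXct("2024-01-01 00:00:00", tz = "UTC")
# Daily pipeline checked by an orchestrator that runs every hour
is_scheduled(start, 86400, as.POSIXct("2024-01-03 00:00:00", tz = "UTC"), 3600)  # TRUE
is_scheduled(start, 86400, as.POSIXct("2024-01-03 01:00:00", tz = "UTC"), 3600)  # FALSE
```

Because each orchestrator run covers exactly one interval, an orchestrator that fires on time claims every scheduled instant exactly once.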
Output
run_schedule() returns the same MaestroSchedule object with modified attributes. Use get_status() to examine the status of each pipeline and use get_artifacts() to get any return values from the pipelines as a list.
Pipelines with arguments (resources)
If a pipeline takes arguments that don't have default values, they can be supplied in the orchestrator via run_schedule(resources = list(arg1 = val)). The name of the argument used by the pipeline must match the name of the element in the list. Currently, each named resource must refer to a single object. In other words, you can't have two pipes that use the same argument name but require different values.
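To make the name-matching rule concrete, here is a minimal sketch with a hypothetical call_with_resources() helper; it is not maestro's internal code. It passes each pipeline function only the resources whose names match its formal arguments. Because each name maps to a single object, every pipeline that asks for con receives the same value, which is the limitation described above.

```r
# Hypothetical sketch (not maestro's internals): supply a function's
# arguments from a named list of shared resources by matching names.
call_with_resources <- function(fn, resources = list()) {
  arg_names <- names(formals(fn))
  # Keep only the resources whose names match one of fn's arguments
  matched <- resources[intersect(names(resources), arg_names)]
  do.call(fn, matched)
}

# Two illustrative pipeline functions sharing the `con` resource
pipe_a <- function(con) paste("pipe_a uses", con)
pipe_b <- function(con, n = 5) paste("pipe_b uses", con, "with n =", n)

resources <- list(con = "db_connection", api_key = "secret")
call_with_resources(pipe_a, resources)  # "pipe_a uses db_connection"
call_with_resources(pipe_b, resources)  # "pipe_b uses db_connection with n = 5"
```

Unused resources such as api_key are simply ignored, while an argument without a default that has no matching resource would raise R's usual "argument is missing" error.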
Examples
if (interactive()) {
  pipeline_dir <- tempdir()
  create_pipeline("my_new_pipeline", pipeline_dir, open = FALSE)
  schedule <- build_schedule(pipeline_dir = pipeline_dir)

  # Check the schedule assuming the orchestrator runs every 1 day
  run_schedule(
    schedule,
    orch_frequency = "1 day",
    quiet = TRUE
  )

  # Check the schedule assuming the orchestrator runs every 15 minutes
  run_schedule(
    schedule,
    orch_frequency = "15 minutes",
    quiet = TRUE
  )
}