Given a schedule in a maestro
project, runs the pipelines that are scheduled to execute
based on the current time.
Usage
run_schedule(
  schedule,
  orch_frequency = "1 day",
  check_datetime = lubridate::now(tzone = "UTC"),
  resources = list(),
  run_all = FALSE,
  n_show_next = 5,
  cores = 1,
  logging = lifecycle::deprecated(),
  log_file = lifecycle::deprecated(),
  log_file_max_bytes = 1e+06,
  quiet = FALSE,
  log_to_console = FALSE,
  log_to_file = FALSE
)
Arguments
- schedule
object of type MaestroSchedule created using
build_schedule()
- orch_frequency
frequency of the orchestrator, given as a single string formatted like "1 day", "2 weeks", "hourly", etc.
- check_datetime
datetime against which to check the running of pipelines (default is current system time in UTC)
- resources
named list of shared resources made available to pipelines as needed
- run_all
run all pipelines regardless of the schedule (default is FALSE) - useful for testing. Does not apply to pipes with a maestroSkip tag.
- n_show_next
show the next n scheduled pipes
- cores
number of CPU cores to use when running in parallel. If > 1, furrr is used and a multisession plan must be set in the orchestrator (see Details)
- logging
whether or not to write the logs to a file (deprecated in 0.5.0 - use the log_to_file and/or log_to_console arguments instead)
- log_file
path to the log file (ignored if log_to_file == FALSE) (deprecated in 0.5.0 - use log_to_file)
- log_file_max_bytes
numeric specifying the maximum number of bytes allowed in the log file before the log is purged (within a margin of error)
- quiet
silence metrics to the console (default = FALSE). Note that this does not affect messages generated from pipelines when log_to_console = TRUE.
- log_to_console
whether or not to print pipeline messages, warnings, and errors to the console (default = FALSE) (see Logging & Console Output section)
- log_to_file
either a boolean indicating whether to create and append to a maestro.log, or a character path to a specific log file. If FALSE or NULL, nothing is logged to a file.
Details
Pipeline schedule logic
The function run_schedule() examines each pipeline in the schedule table and determines
whether it is scheduled to run at the current time using some simple time arithmetic. By default
the check is made against the current time, as in run_schedule(schedule, check_datetime = Sys.time()),
but any datetime can be passed as check_datetime.
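To give a feel for the kind of time arithmetic involved, here is a minimal sketch in base R. It is not maestro's actual implementation: the helper is_due() and its arguments are hypothetical, and frequencies are expressed in seconds for simplicity. It treats a pipeline as due when one of its scheduled run times falls inside the orchestrator window that starts at check_datetime.

```r
# Hypothetical helper, not part of maestro: TRUE if a pipeline that first runs
# at `start_time` and repeats every `freq_secs` seconds has a scheduled run in
# the window [check_datetime, check_datetime + orch_secs).
is_due <- function(start_time, freq_secs, check_datetime, orch_secs) {
  elapsed <- as.numeric(difftime(check_datetime, start_time, units = "secs"))
  if (elapsed < 0) {
    # First run is still in the future: due only if it falls inside the window
    return(-elapsed < orch_secs)
  }
  # Seconds from check_datetime to the next scheduled run (0 if exactly on one)
  wait <- (freq_secs - elapsed %% freq_secs) %% freq_secs
  wait < orch_secs
}

start <- as.POSIXct("2024-01-01 00:00:00", tz = "UTC")
day <- 24 * 60 * 60
hour <- 60 * 60

# A daily pipeline checked by an hourly orchestrator
on_time <- is_due(start, day, as.POSIXct("2024-01-03 00:00:00", tz = "UTC"), hour)
off_time <- is_due(start, day, as.POSIXct("2024-01-03 01:00:00", tz = "UTC"), hour)
```

Under these assumptions the midnight check lands on a scheduled run and the 01:00 check does not. Passing a fixed check_datetime to run_schedule() is a convenient way to test this kind of behaviour against a real schedule.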
Output
run_schedule() returns the same MaestroSchedule object with modified attributes.
Use get_status() to examine the status of each pipeline and use get_artifacts()
to get any return values from the pipelines as a list.
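As an illustration of inspecting the result, the sketch below builds a one-pipeline project in a temporary directory, forces it to run, and then queries the returned schedule. The directory name is arbitrary, and run_all = TRUE is used only so the pipeline runs regardless of when the example is executed.

```r
library(maestro)

# Use a dedicated directory so only this example's pipeline is picked up
pipeline_dir <- file.path(tempdir(), "maestro_output_demo")
dir.create(pipeline_dir, showWarnings = FALSE)
create_pipeline("my_new_pipeline", pipeline_dir, open = FALSE)

schedule <- build_schedule(pipeline_dir = pipeline_dir)

# run_all = TRUE forces the pipeline to run regardless of its schedule
schedule <- run_schedule(schedule, run_all = TRUE, quiet = TRUE)

status <- get_status(schedule)        # status of each pipeline
artifacts <- get_artifacts(schedule)  # return values from the pipelines
```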
Pipelines with arguments (resources)
If a pipeline takes an argument that doesn't have a default value, its value can be supplied
in the orchestrator via run_schedule(resources = list(arg1 = val)). The name of the argument
used by the pipeline must match the name of the corresponding element in the list. Currently, each named
resource must refer to a single object. In other words, you can't have two pipes using
the same argument name but requiring different values.
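A minimal sketch of passing a resource is shown below. The pipeline greet() and its greeting argument are made up for illustration, and the pipeline file is written by hand with the maestroFrequency and maestroStartTime tags, which are assumed here to follow maestro's usual roxygen-style tag syntax.

```r
library(maestro)

pipeline_dir <- file.path(tempdir(), "maestro_resources_demo")
dir.create(pipeline_dir, showWarnings = FALSE)

# Hypothetical pipeline whose `greeting` argument has no default value
writeLines(
  c(
    "#' @maestroFrequency 1 day",
    "#' @maestroStartTime 2024-01-01 00:00:00",
    "greet <- function(greeting) {",
    "  paste(greeting, \"from greet\")",
    "}"
  ),
  file.path(pipeline_dir, "greet.R")
)

schedule <- build_schedule(pipeline_dir = pipeline_dir)

# The list element name must match the pipeline's argument name
schedule <- run_schedule(
  schedule,
  resources = list(greeting = "hello"),
  run_all = TRUE,
  quiet = TRUE
)

artifacts <- get_artifacts(schedule)
```

Because resources are matched by name, a second pipeline that also declares a greeting argument would receive the same "hello" value.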
Running in parallel
Pipelines can be run in parallel using the cores argument. First, you must run
future::plan(future::multisession) in the orchestrator. Then, supply the desired number
of cores to the cores argument. Note that console output looks different when pipelines run in parallel.
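The two steps above might look like the following in an orchestrator script. This is a sketch that assumes the future and furrr packages are installed; the pipeline names and the choice of two cores are arbitrary, and resetting the plan afterwards is a general future convention rather than something maestro requires.

```r
library(maestro)

pipeline_dir <- file.path(tempdir(), "maestro_parallel_demo")
dir.create(pipeline_dir, showWarnings = FALSE)
create_pipeline("pipe_a", pipeline_dir, open = FALSE)
create_pipeline("pipe_b", pipeline_dir, open = FALSE)

schedule <- build_schedule(pipeline_dir = pipeline_dir)

# Step 1: set a multisession plan in the orchestrator
future::plan(future::multisession)

# Step 2: ask run_schedule() to use more than one core
schedule <- run_schedule(
  schedule,
  run_all = TRUE,
  cores = 2,
  quiet = TRUE
)

# Return to sequential evaluation once the run is finished
future::plan(future::sequential)
```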
Logging & Console Output
By default, maestro suppresses pipeline messages, warnings, and errors from appearing in the console, but
messages coming from print() and other console logging packages like cli and logger are not suppressed
and will be interwoven into the output generated from run_schedule(). Messages from cat()
and related functions are always suppressed due to the nature of how those functions operate with standard output.
Users are advised to make use of R's message(), warning(), and stop() functions in their pipelines
for managing conditions. Use log_to_console = TRUE to print these to the console.
Maestro can generate a log file that is appended to each time the orchestrator is run. Use log_to_file = TRUE
or log_to_file = '[path-to-file]' and maestro will create/append to a file in the project directory.
This log file will be appended to until it exceeds the byte size defined in the log_file_max_bytes
argument, after which the log file is deleted.
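The sketch below combines both options: a hypothetical pipeline signals a message() and a warning(), and the orchestrator prints them to the console and appends them to a log file. The pipeline, its tags, and the log path are illustrative assumptions; the log file is read back only if it exists, since what gets written depends on what the pipelines emit.

```r
library(maestro)

pipeline_dir <- file.path(tempdir(), "maestro_logging_demo")
dir.create(pipeline_dir, showWarnings = FALSE)

# Hypothetical pipeline that signals conditions with message() and warning()
writeLines(
  c(
    "#' @maestroFrequency 1 day",
    "#' @maestroStartTime 2024-01-01 00:00:00",
    "noisy_pipe <- function() {",
    "  message(\"Loading data\")",
    "  warning(\"Some rows were dropped\")",
    "}"
  ),
  file.path(pipeline_dir, "noisy_pipe.R")
)

schedule <- build_schedule(pipeline_dir = pipeline_dir)

# Arbitrary log location chosen for this example
log_path <- file.path(tempdir(), "maestro_demo.log")

schedule <- run_schedule(
  schedule,
  run_all = TRUE,
  log_to_console = TRUE,  # print pipeline messages and warnings
  log_to_file = log_path  # also append them to this file
)

# Inspect the log if one was written
if (file.exists(log_path)) {
  print(readLines(log_path))
}
```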
Examples
if (interactive()) {
  pipeline_dir <- tempdir()
  create_pipeline("my_new_pipeline", pipeline_dir, open = FALSE)
  schedule <- build_schedule(pipeline_dir = pipeline_dir)

  # Run the schedule, assuming the orchestrator runs every 1 day
  run_schedule(
    schedule,
    orch_frequency = "1 day",
    quiet = TRUE
  )

  # Run the schedule, assuming the orchestrator runs every 15 minutes
  run_schedule(
    schedule,
    orch_frequency = "15 minutes",
    quiet = TRUE
  )
}