Given a schedule in a maestro
project, runs the pipelines that are scheduled to execute
based on the current time.
Arguments
- schedule
a table of scheduled pipelines generated from build_schedule()
- orch_frequency
frequency of the orchestrator, a single string formatted like "1 day", "2 weeks", "hourly", etc.
- check_datetime
datetime against which to check the running of pipelines (default is current system time in UTC)
- resources
named list of shared resources made available to pipelines as needed
- run_all
run all pipelines regardless of the schedule (default is FALSE); useful for testing. Does not apply to pipes with a maestroSkip tag.
- n_show_next
show the next n scheduled pipes
- cores
number of CPU cores to use when running in parallel. If > 1, furrr is used and a multisession plan must be executed in the orchestrator (see details)
- logging
whether or not to write the logs to a file (default = FALSE)
- log_file
path to the log file (ignored if logging == FALSE)
- log_file_max_bytes
numeric specifying the maximum number of bytes allowed in the log file before purging the log (within a margin of error)
- quiet
silence metrics to the console (default = FALSE)
Details
Pipeline schedule logic
The function run_schedule() examines each pipeline in the schedule table and uses simple time arithmetic to determine whether it is scheduled to run at the time given by check_datetime. By default this is the current time, as in run_schedule(schedule, check_datetime = Sys.time()), but any other datetime can be supplied.
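The kind of time arithmetic involved can be sketched in base R. This is an illustration only and not maestro's actual implementation: the helper is_due() and its arguments are hypothetical, frequencies are given directly in seconds, and a pipeline is treated as due when its most recent scheduled occurrence falls within the last orchestrator interval.

```r
# Hypothetical helper, not part of maestro: decide whether a pipeline is due.
# start_time     : first scheduled run of the pipeline (POSIXct)
# pipe_secs      : pipeline frequency in seconds
# check_datetime : time the orchestrator checks against (POSIXct)
# orch_secs      : orchestrator frequency in seconds
is_due <- function(start_time, pipe_secs, check_datetime, orch_secs) {
  elapsed <- as.numeric(difftime(check_datetime, start_time, units = "secs"))
  if (elapsed < 0) {
    return(FALSE)  # the pipeline has not started yet
  }
  # Seconds since the most recent scheduled occurrence of the pipeline
  since_last <- elapsed %% pipe_secs
  # Due if that occurrence fell within the last orchestrator interval
  since_last < orch_secs
}

start <- as.POSIXct("2024-01-01 00:00:00", tz = "UTC")
day_secs <- 24 * 60 * 60
hour_secs <- 60 * 60

# Daily pipeline, hourly orchestrator, checked 30 minutes after midnight: due
due_early <- is_due(start, day_secs, as.POSIXct("2024-01-03 00:30:00", tz = "UTC"), hour_secs)

# Same pipeline checked at 05:00: the daily occurrence was handled hours ago
due_late <- is_due(start, day_secs, as.POSIXct("2024-01-03 05:00:00", tz = "UTC"), hour_secs)
```

Passing a fixed check_datetime in this way makes the outcome reproducible, which is the main reason to override the default.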
Output
run_schedule() returns a list with two elements: status and artifacts. Status is a data.frame in which each row is a pipeline and the columns hold information about the pipeline status, execution time, etc. Artifacts are any values returned from pipelines.
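A minimal sketch of how the returned list might be inspected. The object below is a hand-built stand-in with the same shape as the documented output, so the values are made up. The column names pipe_name, invoked and success come from the printed status table; naming the artifacts by pipeline is an assumption.

```r
# Stand-in for the value returned by run_schedule(); all values are made up.
result <- list(
  status = data.frame(
    pipe_name = c("pipe_a", "pipe_b", "pipe_c"),
    invoked = c(TRUE, TRUE, FALSE),
    success = c(TRUE, FALSE, NA),  # NA when the pipeline was not invoked
    stringsAsFactors = FALSE
  ),
  # Assumption: artifacts are named after the pipeline that returned them
  artifacts = list(pipe_a = 42)
)

status <- result$status

# Pipelines that ran on this orchestrator pass
invoked_pipes <- status$pipe_name[status$invoked]

# Pipelines that ran but did not succeed
failed_pipes <- status$pipe_name[status$invoked & !status$success]

# Value returned by a single pipeline
pipe_a_value <- result$artifacts[["pipe_a"]]
```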
Pipelines with arguments (resources)
If a pipeline takes an argument that has no default value, the value can be supplied in the orchestrator via run_schedule(resources = list(arg1 = val)). The name of the argument used by the pipeline must match the name of the element in the list. Currently, each named resource must refer to a single object. In other words, you can't have two pipes using the same argument name but requiring different values.
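The name matching described above can be illustrated in base R. This is a sketch of the idea, not how maestro dispatches resources internally: call_with_resources() and the two pipeline functions are hypothetical.

```r
# Shared resources, as they might be passed to run_schedule(resources = ...)
resources <- list(db_path = "data/app.sqlite", n_rows = 10)

# Two hypothetical pipelines; their argument names match names in `resources`
load_data <- function(db_path) {
  paste("loading from", db_path)
}
trim_data <- function(n_rows, db_path) {
  paste("keeping", n_rows, "rows from", db_path)
}

# Hypothetical dispatcher: pass each pipeline only the resources it asks for
call_with_resources <- function(pipe, resources) {
  needed <- intersect(names(formals(pipe)), names(resources))
  do.call(pipe, resources[needed])
}

load_msg <- call_with_resources(load_data, resources)
trim_msg <- call_with_resources(trim_data, resources)
```

Both pipelines receive the same db_path because the resource is looked up by name alone. This is why two pipes cannot share an argument name while expecting different values.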
Examples
# Run the schedule, assuming the orchestrator runs every 1 day
run_schedule(
  example_schedule,
  orch_frequency = "1 day",
  quiet = TRUE
)
#> $status
#> # A tibble: 14 × 10
#> pipe_name script_path invoked success pipeline_started pipeline_ended
#> <chr> <chr> <lgl> <lgl> <dttm> <dttm>
#> 1 pipe tests/test… TRUE FALSE 2024-08-28 11:24:41 2024-08-28 11:24:41
#> 2 get_mtca… tests/test… TRUE FALSE 2024-08-28 11:24:41 2024-08-28 11:24:41
#> 3 wait tests/test… FALSE NA NA NA
#> 4 add tests/test… FALSE NA NA NA
#> 5 something tests/test… FALSE NA NA NA
#> 6 something tests/test… TRUE FALSE 2024-08-28 11:24:41 2024-08-28 11:24:41
#> 7 get_mtca… tests/test… TRUE FALSE 2024-08-28 11:24:41 2024-08-28 11:24:41
#> 8 specific… tests/test… TRUE FALSE 2024-08-28 11:24:41 2024-08-28 11:24:41
#> 9 specific… tests/test… FALSE NA NA NA
#> 10 specific… tests/test… TRUE FALSE 2024-08-28 11:24:41 2024-08-28 11:24:41
#> 11 specific… tests/test… FALSE NA NA NA
#> 12 specific… tests/test… FALSE NA NA NA
#> 13 specific… tests/test… FALSE NA NA NA
#> 14 specific… tests/test… FALSE NA NA NA
#> # ℹ 4 more variables: errors <int>, warnings <int>, messages <int>,
#> # next_run <dttm>
#>
#> $artifacts
#> named list()
#>
# Run the schedule, assuming the orchestrator runs every 15 minutes
run_schedule(
  example_schedule,
  orch_frequency = "15 minutes",
  quiet = TRUE
)
#> $status
#> # A tibble: 14 × 10
#> pipe_name script_path invoked success pipeline_started pipeline_ended
#> <chr> <chr> <lgl> <lgl> <dttm> <dttm>
#> 1 pipe tests/test… TRUE FALSE 2024-08-28 11:24:42 2024-08-28 11:24:42
#> 2 get_mtca… tests/test… FALSE NA NA NA
#> 3 wait tests/test… FALSE NA NA NA
#> 4 add tests/test… FALSE NA NA NA
#> 5 something tests/test… FALSE NA NA NA
#> 6 something tests/test… FALSE NA NA NA
#> 7 get_mtca… tests/test… FALSE NA NA NA
#> 8 specific… tests/test… FALSE NA NA NA
#> 9 specific… tests/test… FALSE NA NA NA
#> 10 specific… tests/test… FALSE NA NA NA
#> 11 specific… tests/test… FALSE NA NA NA
#> 12 specific… tests/test… FALSE NA NA NA
#> 13 specific… tests/test… FALSE NA NA NA
#> 14 specific… tests/test… FALSE NA NA NA
#> # ℹ 4 more variables: errors <int>, warnings <int>, messages <int>,
#> # next_run <dttm>
#>
#> $artifacts
#> named list()
#>