Given a schedule in a maestro project, runs the pipelines that are scheduled to execute based on the current time.


run_schedule(
  schedule,
  orch_frequency = "1 day",
  check_datetime = lubridate::now(tzone = "UTC"),
  resources = list(),
  run_all = FALSE,
  n_show_next = 5,
  cores = 1,
  logging = FALSE,
  log_file = NULL,
  log_file_max_bytes = 1e+06,
  quiet = FALSE
)



schedule
a table of scheduled pipelines generated from build_schedule()

orch_frequency
frequency of the orchestrator, a single string formatted like "1 day", "2 weeks", "hourly", etc.

check_datetime
datetime against which to check the running of pipelines (default is the current system time in UTC)

resources
named list of shared resources made available to pipelines as needed

run_all
run all pipelines regardless of the schedule (default is FALSE); useful for testing. Does not apply to pipes with a maestroSkip tag.

n_show_next
show the next n scheduled pipes

cores
number of CPU cores to use when running in parallel. If greater than 1, furrr is used and a multisession plan must be set in the orchestrator (see details)

logging
whether or not to write the logs to a file (default is FALSE)

log_file
path to the log file (ignored if logging is FALSE)

log_file_max_bytes
numeric specifying the maximum number of bytes allowed in the log file before the log is purged (within a margin of error)

quiet
suppress metrics output to the console (default is FALSE)


A list with two named elements, status and artifacts.


Pipeline schedule logic

The function run_schedule() examines each pipeline in the schedule table and determines whether it is scheduled to run at the current time using simple time arithmetic. The description here assumes the call run_schedule(schedule, check_datetime = Sys.time()), but any datetime can be passed as check_datetime.
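The kind of time arithmetic involved can be sketched in base R. This is only an illustration under stated assumptions, not maestro's actual implementation: the helper is_due() and its arguments start_time, freq_secs, and orch_secs are hypothetical names, and the rule used (a pipeline is due if its most recent scheduled occurrence falls within the last orchestrator interval) is an assumed simplification.

```r
# Hypothetical helper, not part of maestro: decide whether a pipeline that
# repeats every `freq_secs` seconds from `start_time` is due at
# `check_datetime`, given an orchestrator that runs every `orch_secs` seconds.
is_due <- function(start_time, freq_secs, check_datetime, orch_secs) {
  elapsed <- as.numeric(difftime(check_datetime, start_time, units = "secs"))
  if (elapsed < 0) {
    # The pipeline has not started yet
    return(FALSE)
  }
  # Seconds since the most recent scheduled occurrence; the pipeline is due
  # if that occurrence falls inside the current orchestrator window
  (elapsed %% freq_secs) < orch_secs
}

start <- as.POSIXct("2024-01-01 00:00:00", tz = "UTC")
hourly <- 60 * 60
orch_15_min <- 15 * 60

# An hourly pipeline checked 10 minutes and ~25 minutes past the hour
due_early <- is_due(start, hourly,
                    as.POSIXct("2024-08-28 11:10:00", tz = "UTC"), orch_15_min)
due_late <- is_due(start, hourly,
                   as.POSIXct("2024-08-28 11:24:41", tz = "UTC"), orch_15_min)
```

Under this assumed rule, the hourly pipeline is picked up by the check at 11:10 but not by the one at 11:24:41, which is why the orchestrator frequency should match how often the orchestrator is actually invoked.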


run_schedule() returns a list with two elements: status and artifacts. Status is a data.frame where each row is a pipeline and the columns are information about the pipeline status, execution time, etc. Artifacts are any values returned from pipelines.
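As a rough sketch of how the returned list might be inspected, the example below builds a small stand-in for the output by hand (it is not real run_schedule() output) using the column names pipe_name, invoked, and success that appear in the printed status table, then pulls out the pipelines that were invoked but did not succeed.

```r
# Hand-built stand-in for the list returned by run_schedule();
# the values are made up for illustration only
output <- list(
  status = data.frame(
    pipe_name = c("pipe", "wait", "add"),
    invoked = c(TRUE, FALSE, TRUE),
    success = c(FALSE, NA, TRUE),
    stringsAsFactors = FALSE
  ),
  artifacts = list()
)

# Pipelines that ran but failed; rows where the condition is NA are dropped
failed <- subset(output$status, invoked & !success)$pipe_name

# Names of pipelines that returned a value (none in this stand-in)
artifact_names <- names(output$artifacts)
```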

Pipelines with arguments (resources)

If a pipeline takes an argument that has no default value, the value can be supplied in the orchestrator via run_schedule(resources = list(arg1 = val)). The name of the argument used by the pipeline must match the name of the element in the list. Currently, each named resource must refer to a single object. In other words, you can't have two pipes using the same argument name but requiring different values.
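The name matching described above can be illustrated in base R. This is a sketch of the idea only and makes no claim about how maestro resolves resources internally; my_pipe and db_conn are hypothetical names.

```r
# A hypothetical pipeline function whose argument has no default value
my_pipe <- function(db_conn) {
  paste("using", db_conn)
}

# Shared resources as they would be passed to the orchestrator, e.g.
# run_schedule(schedule, resources = list(db_conn = "conn-1", api_key = "abc"))
resources <- list(db_conn = "conn-1", api_key = "abc")

# Keep only the resources whose names match the pipeline's argument names,
# then call the pipeline with them
needed <- intersect(names(formals(my_pipe)), names(resources))
result <- do.call(my_pipe, resources[needed])
```

Because matching is by name, a second pipeline that also declares db_conn would receive the same object, which is the limitation noted above.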

Running in parallel

Pipelines can be run in parallel using the cores argument. First, you must run future::plan(future::multisession) in the orchestrator. Then, supply the desired number of cores to the cores argument. Note that console output looks different when pipelines run in parallel.
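Put together, an orchestrator script for a parallel run might look like the sketch below. It assumes that maestro and future are installed and that a schedule object has already been created with build_schedule(); the guard is there only so the snippet does nothing when those assumptions do not hold, and the choice of 2 cores is arbitrary.

```r
n_cores <- 2  # arbitrary choice for illustration

if (requireNamespace("maestro", quietly = TRUE) &&
    requireNamespace("future", quietly = TRUE) &&
    exists("schedule")) {
  # Set the multisession plan before calling the orchestrator
  future::plan(future::multisession)

  output <- maestro::run_schedule(
    schedule,
    orch_frequency = "1 day",
    cores = n_cores
  )

  # Return to sequential processing afterwards
  future::plan(future::sequential)
}
```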


# Runs the schedule every 1 day
run_schedule(
  schedule,
  orch_frequency = "1 day",
  quiet = TRUE
)
#> $status
#> # A tibble: 14 × 10
#>    pipe_name script_path invoked success pipeline_started    pipeline_ended     
#>    <chr>     <chr>       <lgl>   <lgl>   <dttm>              <dttm>             
#>  1 pipe      tests/test… TRUE    FALSE   2024-08-28 11:24:41 2024-08-28 11:24:41
#>  2 get_mtca… tests/test… TRUE    FALSE   2024-08-28 11:24:41 2024-08-28 11:24:41
#>  3 wait      tests/test… FALSE   NA      NA                  NA                 
#>  4 add       tests/test… FALSE   NA      NA                  NA                 
#>  5 something tests/test… FALSE   NA      NA                  NA                 
#>  6 something tests/test… TRUE    FALSE   2024-08-28 11:24:41 2024-08-28 11:24:41
#>  7 get_mtca… tests/test… TRUE    FALSE   2024-08-28 11:24:41 2024-08-28 11:24:41
#>  8 specific… tests/test… TRUE    FALSE   2024-08-28 11:24:41 2024-08-28 11:24:41
#>  9 specific… tests/test… FALSE   NA      NA                  NA                 
#> 10 specific… tests/test… TRUE    FALSE   2024-08-28 11:24:41 2024-08-28 11:24:41
#> 11 specific… tests/test… FALSE   NA      NA                  NA                 
#> 12 specific… tests/test… FALSE   NA      NA                  NA                 
#> 13 specific… tests/test… FALSE   NA      NA                  NA                 
#> 14 specific… tests/test… FALSE   NA      NA                  NA                 
#> # ℹ 4 more variables: errors <int>, warnings <int>, messages <int>,
#> #   next_run <dttm>
#> $artifacts
#> named list()

# Runs the schedule every 15 minutes
run_schedule(
  schedule,
  orch_frequency = "15 minutes",
  quiet = TRUE
)
#> $status
#> # A tibble: 14 × 10
#>    pipe_name script_path invoked success pipeline_started    pipeline_ended     
#>    <chr>     <chr>       <lgl>   <lgl>   <dttm>              <dttm>             
#>  1 pipe      tests/test… TRUE    FALSE   2024-08-28 11:24:42 2024-08-28 11:24:42
#>  2 get_mtca… tests/test… FALSE   NA      NA                  NA                 
#>  3 wait      tests/test… FALSE   NA      NA                  NA                 
#>  4 add       tests/test… FALSE   NA      NA                  NA                 
#>  5 something tests/test… FALSE   NA      NA                  NA                 
#>  6 something tests/test… FALSE   NA      NA                  NA                 
#>  7 get_mtca… tests/test… FALSE   NA      NA                  NA                 
#>  8 specific… tests/test… FALSE   NA      NA                  NA                 
#>  9 specific… tests/test… FALSE   NA      NA                  NA                 
#> 10 specific… tests/test… FALSE   NA      NA                  NA                 
#> 11 specific… tests/test… FALSE   NA      NA                  NA                 
#> 12 specific… tests/test… FALSE   NA      NA                  NA                 
#> 13 specific… tests/test… FALSE   NA      NA                  NA                 
#> 14 specific… tests/test… FALSE   NA      NA                  NA                 
#> # ℹ 4 more variables: errors <int>, warnings <int>, messages <int>,
#> #   next_run <dttm>
#> $artifacts
#> named list()