maestro 0.6.0

New capabilities for flagging pipelines, setting priorities, and optimizing your orchestration

R
data pipelines
orchestration
maestro
packages
release
Author

Will Hipson

Published

May 15, 2025

Modified

May 20, 2025

There are a bunch of new features to share as part of the 0.6.0 release of maestro:

  1. maestroFlags tag and accompanying get_flags() function for tagging pipelines.
  2. maestroPriority tag for determining the order in which simultaneously scheduled pipelines are executed.
  3. New get_slot_usage() function to help identify busy (or quiet) time slots in the schedule.
  4. maestroStartTime tag is more flexible to allow for HH:MM:SS formats.

If you haven’t heard of maestro, it’s a package that helps you schedule your R scripts all in a single project using tags. You can learn more about it here.

Get it from CRAN:

install.packages("maestro")

Flags

A flag is an arbitrary string used to classify or label a pipeline (see footnote 1). You can now add any number of flags to a pipeline using the maestroFlags tag like so:

# ./pipelines

# You could use flags to classify a pipeline as critical
#' @maestroFrequency 1 day
#' @maestroStartTime 2024-06-03
#' @maestroFlags critical
super_important <- function() {
  # Obv. does something important
}

# You can have as many flags as you want separated by spaces
#' @maestroFrequency hourly
#' @maestroStartTime 2025-04-05 12:30:00
#' @maestroFlags aviation api-access
airlines <- function() {
  # Accesses airlines from an API or whatever
}

Once you’ve flagged some pipelines, you can access the flags for all pipelines in the schedule as a data.frame using get_flags().

library(maestro)

schedule <- build_schedule(quiet = TRUE)

get_flags(schedule)
# A tibble: 3 × 2
  pipe_name       flag      
  <chr>           <chr>     
1 super_important critical  
2 airlines        aviation  
3 airlines        api-access

This table could be used, for example, to send status reports to particular groups based on the flags, or to trigger warnings/errors based on the criticality of the pipelines that failed. In these cases, it’s helpful to join the table with the output of either get_status() or get_schedule().
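As a rough sketch of that join, the snippet below uses base R merge() on stand-in data frames rather than live maestro output. The flags table is copied from the get_flags() result above; the status table and its success column are assumptions made for illustration, so check the columns that get_status() actually returns in your project.

```r
# Stand-in for get_flags(schedule), copied from the output shown above
flags <- data.frame(
  pipe_name = c("super_important", "airlines", "airlines"),
  flag = c("critical", "aviation", "api-access"),
  stringsAsFactors = FALSE
)

# Hypothetical stand-in for get_status(schedule);
# the `success` column is an assumption for this sketch
status <- data.frame(
  pipe_name = c("super_important", "airlines"),
  success = c(FALSE, TRUE),
  stringsAsFactors = FALSE
)

# Attach each pipeline's status to every one of its flags
flagged_status <- merge(flags, status, by = "pipe_name")

# Keep only pipelines flagged as critical that did not succeed
critical_failures <- flagged_status[
  flagged_status$flag == "critical" & !flagged_status$success,
]

if (nrow(critical_failures) > 0) {
  message(
    "Critical pipelines failed: ",
    paste(critical_failures$pipe_name, collapse = ", ")
  )
}
```

The same pattern would apply to routing notifications: filter the joined table by flag, then hand the matching rows to whatever alerting tool you use.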

Priority

Sometimes multiple pipelines are scheduled to run at the same time, for example two hourly pipelines on the same cadence. You may want to control the order in which these pipelines are executed (see footnote 2). The new maestroPriority tag lets you configure that order:

#' @maestroFrequency 1 hour
#' @maestroStartTime 10:00:00
im_less_important <- function() {
  # some less important stuff
}

#' @maestroFrequency 1 hour
#' @maestroStartTime 10:00:00
#' @maestroPriority 1
i_go_first <- function() {
  # this needs to happen first
}

These pipelines run every hour on the 00 minute. The second pipeline has maestroPriority 1, indicating that it goes first when the orchestrator kicks off the pipelines. Pipelines without a priority always go last, and pipelines that share a priority level fall back to the default ordering (alphabetical by script path name, then line number) within that level.
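The ordering rule described above can be mimicked in base R to make it concrete. This is only an illustration of the rule, not maestro's internal code. It assumes that lower priority numbers run earlier (the text only confirms that priority 1 goes first), and the also_first and second pipelines, the script paths, and the line numbers are all hypothetical.

```r
# Hypothetical pipelines scheduled for the same time slot
pipelines <- data.frame(
  pipe_name = c("im_less_important", "i_go_first", "also_first", "second"),
  priority = c(NA, 1, 1, 2),  # NA means no maestroPriority tag
  script_path = c("pipelines/a.R", "pipelines/a.R",
                  "pipelines/b.R", "pipelines/a.R"),
  line = c(1, 7, 1, 14),
  stringsAsFactors = FALSE
)

# Sort by priority (missing priorities last), then break ties by
# script path and line number
run_order <- order(
  pipelines$priority,
  pipelines$script_path,
  pipelines$line,
  na.last = TRUE
)

ordered_names <- pipelines$pipe_name[run_order]
print(ordered_names)
```

Here the two priority-1 pipelines come first, ordered by script path, followed by the priority-2 pipeline, with the untagged pipeline last.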

Slot Usage

As a maestro project grows, it becomes increasingly difficult to know the best time to schedule a pipeline. You typically want to avoid scheduling a bunch of pipelines at the same time (unless they need to be executed together or at that particular time), and you don’t want a ton of empty time slots (i.e., times where the orchestrator kicks off no pipelines).

Behold, the get_slot_usage() function!

This function looks ahead to all scheduled runs of pipelines in the project and returns a data.frame indicating the pipelines that are scheduled to run on each time slot. It’s easier to understand how this works in practice.

Let’s create a bunch of pipelines first:

# ./pipelines
#' @maestroFrequency hourly
#' @maestroStartTime 14:00:00
hourly <- function() {
  
}

#' @maestroFrequency daily
#' @maestroStartTime 14:00:00
daily <- function() {
  
}

#' @maestroFrequency 3 hours
#' @maestroStartTime 00:00:00
every_3_hours <- function() {
  
}

#' @maestroFrequency weekly
#' @maestroStartTime 2025-05-15 04:00:00
weekly <- function() {
  
}

#' @maestroFrequency daily
#' @maestroDays 4 9 16 20
some_days <- function() {
  
}

In this example, we plan to run the orchestrator every hour and want to see which pipelines are scheduled to run in each hourly time slot:

schedule <- build_schedule(quiet = TRUE)

get_slot_usage(
  schedule,
  orch_frequency = "1 hour",
  slot_interval = "hour"
)
# A tibble: 24 × 3
   slot  n_runs pipe_names                      
   <chr>  <int> <chr>                           
 1 00:00      3 hourly, every_3_hours, some_days
 2 01:00      1 hourly                          
 3 02:00      1 hourly                          
 4 03:00      2 hourly, every_3_hours           
 5 04:00      2 hourly, weekly                  
 6 05:00      1 hourly                          
 7 06:00      2 hourly, every_3_hours           
 8 07:00      1 hourly                          
 9 08:00      1 hourly                          
10 09:00      2 hourly, every_3_hours           
# ℹ 14 more rows

We can see that things are fairly evenly distributed aside from the 00:00 slot, which has 3 pipelines scheduled. There are also many slots where only 1 pipeline runs, so a new daily pipeline would be best scheduled in one of those quieter slots.
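Picking out the quiet and busy slots can also be done programmatically. The sketch below rebuilds only the ten rows shown in the output above as a plain data.frame (the remaining 14 rows are not reproduced here) and filters on n_runs. With real output you would apply the same filter to the full result of get_slot_usage().

```r
# The first ten rows of the hourly slot usage shown above
usage <- data.frame(
  slot = sprintf("%02d:00", 0:9),
  n_runs = c(3L, 1L, 1L, 2L, 2L, 1L, 2L, 1L, 1L, 2L),
  stringsAsFactors = FALSE
)

# Slots with the fewest scheduled runs are candidates for a new pipeline
quiet_slots <- usage$slot[usage$n_runs == min(usage$n_runs)]

# Slots with the most scheduled runs are the ones to avoid
busy_slots <- usage$slot[usage$n_runs == max(usage$n_runs)]

print(quiet_slots)
print(busy_slots)
```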

We can change the slot_interval argument to any other valid unit of time to get a different picture.

get_slot_usage(
  schedule,
  orch_frequency = "1 hour",
  slot_interval = "day"
)
# A tibble: 31 × 3
   slot  n_runs pipe_names                                     
   <chr>  <int> <chr>                                          
 1 01         4 hourly, daily, every_3_hours, weekly           
 2 02         4 hourly, daily, every_3_hours, weekly           
 3 03         4 hourly, daily, every_3_hours, weekly           
 4 04         5 hourly, daily, every_3_hours, weekly, some_days
 5 05         4 hourly, daily, every_3_hours, weekly           
 6 06         4 hourly, daily, every_3_hours, weekly           
 7 07         4 hourly, daily, every_3_hours, weekly           
 8 08         4 hourly, daily, every_3_hours, weekly           
 9 09         5 hourly, daily, every_3_hours, weekly, some_days
10 10         4 hourly, daily, every_3_hours, weekly           
# ℹ 21 more rows

A few things to consider when using get_slot_usage():

  1. It looks at all future instances of when a pipeline will run, not just the next unit of time. In the last example, the weekly pipeline appears to run every day, but that’s only because, across all months and years, each day of the month eventually coincides with one of its runs.
  2. Usually you should keep orch_frequency the same as it is in your use of run_schedule(), but slot_interval depends on the frequency of the pipeline you’re planning. In general, choose a slot_interval one unit of time finer than the frequency of your proposed pipeline. For example, if you’re planning a daily pipeline, use slot_interval = "hour" to identify which hour it should run on.
  3. This function is meant to be used interactively when you’re developing a maestro project. It doesn’t serve much value running in production.
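The first point can be checked with plain date arithmetic. The snippet below is not how get_slot_usage() is implemented; it simply generates weekly run dates from the weekly pipeline's start date (2025-05-15) over an arbitrarily chosen horizon of 520 weeks and collects the days of the month they fall on.

```r
# Weekly runs starting from the weekly pipeline's start date;
# the 520-week horizon is an arbitrary choice for this illustration
weekly_runs <- seq(as.Date("2025-05-15"), by = "week", length.out = 520)

# Within a single month, only a handful of days are hit
first_month <- format(
  weekly_runs[format(weekly_runs, "%Y-%m") == "2025-05"],
  "%d"
)
print(first_month)

# Across all future runs, every day of the month shows up eventually
days_of_month <- sort(unique(format(weekly_runs, "%d")))
print(length(days_of_month))
```

This is why a day-level slot view lists the weekly pipeline under every day, even though it only runs once a week.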

Flexible Start Time

A minor improvement was made to the maestroStartTime tag to allow HH:MM:SS formatting for timestamps. This is particularly useful for pipelines that run hourly or more frequently, where the choice of start date is arbitrary. When only a time is given, maestro assumes that the pipeline’s start date is the date on which the schedule is built.
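To illustrate the behaviour described above, here is a minimal base R sketch of how a time-only value could be resolved to a full timestamp. The resolve_start_time() helper is hypothetical and is not part of maestro, and the use of UTC is an assumption made for the example.

```r
# Hypothetical helper (not a maestro function): if the start time is given
# as HH:MM:SS only, prepend the date on which the schedule is built
resolve_start_time <- function(start_time, build_date = Sys.Date()) {
  time_only <- grepl("^[0-9]{2}:[0-9]{2}:[0-9]{2}$", start_time)
  if (time_only) {
    start_time <- paste(format(build_date, "%Y-%m-%d"), start_time)
  }
  # UTC is an assumption for this sketch
  as.POSIXct(start_time, tz = "UTC")
}

# Time-only value: the date comes from the build date
resolved <- resolve_start_time("10:00:00", build_date = as.Date("2025-05-15"))
print(format(resolved, "%Y-%m-%d %H:%M:%S"))

# Full timestamp: used as given
explicit <- resolve_start_time("2025-04-05 12:30:00")
print(format(explicit, "%Y-%m-%d %H:%M:%S"))
```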

Conclusion

Check out the release notes for more details on what’s new in version 0.6.0. If you find any bugs or want to suggest new features and improvements, please add them here or reach out to me on LinkedIn.

Happy orchestrating!

Footnotes

  1. Perhaps a more appropriate name for this concept is tag. However, it would be confusing to have a maestro tag that is itself called maestroTags.

  2. This is not the same as having a DAG pipeline where pipelines are chained together.