Skip to contents
## Error in get(paste0(generic, ".", class), envir = get_method_env()) : 
##   object 'type_sum.accel' not found

A common task in data engineering is to automate, schedule, and monitor multiple data processing pipelines. This is called orchestration. maestro is an R package that helps orchestrate data pipelines.

A fully realized maestro project involves the following components and actions:

  1. Collection of pipelines (R functions to be orchestrated, such as batch ETL jobs)

  2. Orchestrator - an R script or Quarto doc that orchestrates the pipelines and monitors them

  3. A process external to R to schedule the orchestrator (e.g., cron, Posit Connect).

Project Setup

Create a maestro project in an existing project or a new project using create_maestro() or the New Project wizard in RStudio. This creates the orchestrator script and the folder of pipelines with one sample pipeline. Your project should look something like this:

maestro_project
├── maestro_project.Rproj
├── orchestrator.R
└── pipelines
    ├── my_pipe.R
    └── another_pipe.R

Pipelines

Pipelines are the jobs you want to automate, schedule, and monitor. For the most part, they’re regular R functions with a special sprinkling of comments.

Anatomy of a Pipeline

A pipeline is simply an R function with decorators called maestro tags. Maestro tags are special code comments used for communicating the scheduling and configuration of a pipeline to the orchestrator. Let’s take a quick look at the sample my_pipe.R:

#' my_pipe maestro pipeline
#'
#' @maestroFrequency 1 day
#' @maestroStartTime 2024-05-24
#' @maestroTz UTC
#' @maestroLogLevel INFO

my_pipe <- function() {

  # Pipeline code
}

my_pipe is a function with an empty body - so right now it won’t do anything. The comments above are interpreted by maestro as “this function is scheduled to run every day starting at 2024-05-24 (00:00:00) UTC time”.

maestroFrequency and maestroStartTime are the most important tags for scheduling. Frequency is how often you want the pipeline to run and can be formatted as a single string like hourly, daily, weekly, biweekly, etc. or with a number and a unit (e.g., 1 day, 3 hours, etc.).

Note that you don’t need to provide all these tags. A single maestro tag is enough to distinguish it as a pipeline. Pipelines missing tags will use consistent defaults (e.g., if maestroFrequency is missing the default is 1 day/daily).

In most use cases, the actual code inside of my_pipe would be to run an ETL job (extract data from a source, transform it, and load it into a file system or database). In technical terms, it’s the side effect of the code and not its return value that is important.

Here’s a more realistic, albeit impractical, example:

#' my_pipe maestro pipeline
#'
#' @maestroFrequency 1 day
#' @maestroStartTime 2024-05-24
#' @maestroTz UTC
#' @maestroLogLevel INFO

my_pipe <- function() {

  random_data <- data.frame(
    letters = sample(letters, 10),
    numbers = sample.int(10)
  )
  
  write.csv(random_data, file = tempfile())
}

Adding New Pipelines

A project with a single pipeline is ok, but in maestro is more useful when you have multiple jobs to run. You can add more pipelines to your pipelines directory manually or use create_pipeline():

create_pipeline(
  pipe_name = "another_pipeline",
  pipeline_dir = "pipelines",
  frequency = "1 hour",
  start_time = "2024-05-17 15:00:00",
  tz = "America/Halifax",
  log_level = "ERROR"
)

Orchestrator

The orchestrator is the process that schedules and monitors the pipelines.

Anatomy of the Orchestrator

The orchestrator can be an R script, Quarto/RMarkdown doc, but here we’ll use a regular R script. Here is where you’ll run maestro functions. The two main functions are build_schedule() and run_schedule().

library(maestro)

schedule <- build_schedule()

output <- run_schedule(
  schedule,
  orch_frequency = "1 hour"
)
Error in get(paste0(generic, ".", class), envir = get_method_env()) :           
  object 'type_sum.accel' not found                                             
 1 script successfully parsed                                                  
                                                                                
── [2025-01-07 14:39:08]                                                        
Running pipelines                                                              
                                                                                
── [2025-01-07 14:39:08]                                                        
Pipeline execution completed  | 0.02 sec elapsed                               
 0 successes |  1 skipped | ! 0 warnings |  0 errors |  1 total             
────────────────────────────────────────────────────────────────────────────────
                                                                                
── Next scheduled pipelines                                                    
Pipe name | Next scheduled run                                                  
• my_pipe | 2025-01-08                                                          

Building the schedule gets maestro to look through the pipelines in the pipelines folder and creates a schedule object. Then, you pass that to run_schedule() along with how often the orchestrator is supposed to run. It is important to tell maestro how often it’ll be checking the pipelines using the orch_frequency parameter. Here, we’re informing it that the orchestrator is running every 1 hour.

Importantly, it isn’t maestro’s job to actually run it this often - it’s your job to make sure it runs at that frequency (e.g., deploying it via cron or some cloud environment where code can be scheduled).1