Skip to contents
## Error in get(paste0(generic, ".", class), envir = get_method_env()) : 
##   object 'type_sum.accel' not found

A directed acyclic graph or DAG is a kind of network graph where nodes are connected by edges, and these connections cannot loop back or cycle. Most data orchestration tools lay out a data pipeline as a DAG where data is passed from one function to the next until it reaches the end. This allows for more module, single-purpose functions and can make it easier to identify where errors are occurring.

You can create DAG pipelines in maestro using the maestroInputs and/or maestroOutputs tags. Let’s see this in action.

We’ll create three simple pipelines. start outputs a vector, high_road takes an input and makes it all uppercase, low_road makes the input all lowercase. We use the maestroOutputs tag to indicate the names of the downstream pipelines (i.e., these pipelines use the output of the target pipeline as input) and we use the maestroInputs tag to indicate the names of pipelines that are used as input.1

Note the use of .input as a parameter for all pipelines that receive an input. It is important to have this here to enable the passing of data from inputs to outputs. It must be named .input.

#' ./pipelines/dags.R
#' @maestroOutputs high_road low_road
start <- function() {
  c('a', 'A')
}

#' @maestroInputs start
high_road <- function(.input) {
  toupper(.input)
}

#' @maestroInputs start
low_road <- function(.input) {
  tolower(.input)
}

Now we’ll create and run the schedule. Notice that the output in the console will reflect the network structure of the DAG.

# ./orchestrator.R
library(maestro)

schedule <- build_schedule(quiet = TRUE)

status <- run_schedule(
  schedule,
  run_all = TRUE
)

get_artifacts(schedule)
Error in get(paste0(generic, ".", class), envir = get_method_env()) :           
  object 'type_sum.accel' not found                                             
                                                                                
── [2025-01-07 14:39:10]                                                        
Running pipelines                                                              
 start [15ms]                                                                  
 |-high_road [29ms]                                                            
 |-low_road [8ms]                                                              
                                                                                
── [2025-01-07 14:39:10]                                                        
Pipeline execution completed  | 0.096 sec elapsed                              
 3 successes |  0 skipped | ! 0 warnings |  0 errors |  3 total             
────────────────────────────────────────────────────────────────────────────────
$start                                                                          
[1] "a" "A"                                                                     
                                                                                
$high_road                                                                      
[1] "A" "A"                                                                     
                                                                                
$low_road                                                                       
[1] "a" "a"                                                                     
                                                                                

ETL Example

A great case for using DAGs is with ETL/ELT pipelines. Each component of extract, transform, and load could be a single element in the DAG. Consider the example on the home page:

#' Example ETL pipeline
#' @maestroFrequency 1 day
#' @maestroStartTime 2024-03-25 12:30:00
my_etl <- function() {
  
  # Pretend we're getting data from a source
  message("Get data")
  extracted <- mtcars
  
  # Transform
  message("Transforming")
  transformed <- extracted |> 
    dplyr::mutate(hp_deviation = hp - mean(hp))
  
  # Load - write to a location
  message("Writing")
  write.csv(transformed, file = paste0("transformed_mtcars_", Sys.Date(), ".csv"))
}

It’s pretty concise, so we probably wouldn’t bother breaking it apart in practice, but let’s do it for illustrative purposes (and also get rid of the messaging).

[1] TRUE
#' @maestroFrequency 1 day
#' @maestroStartTime 2024-03-25 12:30:00
#' @maestroOutputs transform
extract <- function() {
  # Imagine this is something way more complicated, like a database call
  mtcars
}

#' @maestroOutputs load
transform <- function(.input) {
  .input |> 
    dplyr::mutate(hp_deviation = hp - mean(hp))
}

#' @maestro
load <- function(.input) {
  write.csv(.input, file = paste0("transformed_mtcars.csv"))
}
library(maestro)

schedule <- build_schedule(quiet = TRUE)

status <- run_schedule(
  schedule,
  run_all = TRUE
)
                                                                                
── [2025-01-07 14:39:11]                                                        
Running pipelines                                                              
 extract [9ms]                                                                 
 |-transform [15ms]                                                            
   |-load [9ms]                                                                
                                                                                
── [2025-01-07 14:39:11]                                                        
Pipeline execution completed  | 0.056 sec elapsed                              
 3 successes |  0 skipped | ! 0 warnings |  0 errors |  3 total             
────────────────────────────────────────────────────────────────────────────────

When developing these pipelines, it is helpful to visualize the dependency structure. We can do this by calling show_network() on the schedule: