Chain your maestro pipelines DAG style

Version 0.4.0 introduces tags for creating connected pipelines

R
data pipelines
orchestration
maestro
packages
release
Author

Will Hipson

Published

November 22, 2024

Modified

November 22, 2024

I’m thrilled to introduce the biggest maestro update yet. DAGs! A DAG (directed acyclic graph) in the context of data pipelines is when you have data processing steps that connect into other steps until a final step is reached. Almost all data orchestration platforms use the concept of DAGs to increase reusability and isolation of discrete components. As of maestro 0.4.0, DAGs are now possible using maestroInputs and maestroOutputs tags. This post will go through the motivation and implementation of this new feature.

If you haven’t heard of maestro, it’s a package that helps you schedule your R scripts all in a single project using tags. You can learn more about it here.

Get it from CRAN:

install.packages("maestro")

Why DAGs?

Let’s imagine we have a data pipeline where we want to extract data, clean/transform it, train a model, and send the predictions to a database. We can take each of these steps and chain them together so that the output of ‘extract’ is automatically fed into ‘clean/transform’, and so on.

The advantage of doing this in maestro is that you get better observability and retracability along each step. As we’ll see, we can more clearly identify where errors occur and even recover intermediate results.

DAGs in maestro

In short, a DAG pipeline is created using either maestroInputs or maestroOutputs tags. Both are valid but usually only one is needed. Simply put, a pipeline with a tag #' @maestroInputs start_pipe receives the output from a pipeline called start_pipe. Alternatively, we could use #' @maestroOutputs end_pipe to indicate that the pipeline called end_pipe receives the input of the current pipeline.

Let’s see an example where we make model predictions on the nycflights13 data.

#' /pipelines/model_flights.R
#' @maestroFrequency daily
#' @maestroStartTime 2024-11-22 09:00:00
#' @maestroOutputs process_flights
extract_flights <- function() {
  
  # Imagine this is from a source where the data changes
  nycflights13::flights
}

#' @maestroOutputs train_model
process_flights <- function(.input) {
  
  daily_flights <- .input |> 
    dplyr::mutate(date = lubridate::make_date(year, month, day)) |> 
    dplyr::summarise(
      n_flights = dplyr::n(), .by = date
    )
  
  # A simple time series
  ts(data = daily_flights$n_flights, frequency = 365)
}

#' @maestroOutputs forecast_flights
train_model <- function(.input) {
  
  # A simple ARIMA model (using the {forecast} package would be better)
  .input |> 
    arima(order = c(1, 1, 1))
}

#' @maestro
forecast_flights <- function(.input) {
  
  # Forecast the next 7 days
  pred_obj <- predict(.input, n.ahead = 7)
  pred_obj$pred
}

We won’t focus much on the content of the functions. Instead, pay attention to the use of maestroOutputs. Each function that outputs into another references the name of that function. The last function forecast_flights just uses a generic #' @maestro tag to indicate that it is part of the maestro project. Also note the use of the special keyword .input. This argument must be supplied to all functions receiving an input. Use this argument to capture the data being passed each step along the pipeline.

Now we can build the schedule like always.

# /orchestrator.R
library(maestro)

schedule <- build_schedule(quiet = TRUE)

We can verify that the DAG is properly defined using the show_network() function on our newly created schedule.

show_network(schedule)

Now we can run the schedule. For testing purposes, we’ll set run_all = TRUE so that the pipeline runs no matter what the scheduling is.

run_schedule(
  schedule,
  run_all = TRUE
)
── [2024-11-22 14:25:11]
Running pipelines ▶ 
ℹ extract_flights
✔ extract_flights [768ms]
ℹ |-process_flights
✔ |-process_flights [24ms]
ℹ   |-train_model
✔   |-train_model [9ms]
ℹ   |-  |-forecast_flights
✔   |-  |-forecast_flights [5ms]
── [2024-11-22 14:25:11]
Pipeline execution completed ■ | 0.833 sec elapsed 
✔ 4 successes | → 0 skipped | ! 0 warnings | ✖ 0 errors | ◼ 4 total
────────────────────────────────────────────────────────────────────────────────
── Maestro Schedule with 4 pipelines:  
• Success

We can see from the console output that the whole pipeline ran successfully. If we want to get the output from each of the steps, we can use get_artifacts(). This returns intermediate results too, which can be helpful if you want to retrieve state after a failed run of the schedule.

artifacts <- get_artifacts(schedule)
artifacts$forecast_flights
Time Series:
Start = c(2, 1) 
End = c(2, 7) 
Frequency = 365 
[1] 942.7232 934.9722 933.7773 933.5931 933.5647 933.5603 933.5596

Maestro can be used to create any valid DAG (e.g., branching, joining, etc.). I hope this new addition to maestro super charges your data orchestration.

Check out the release notes for more details on what’s new in version 0.4.0. If you find any bugs or want to suggest new features and improvements, please add them here or reach out to me on LinkedIn.

Happy orchestrating!