I’m thrilled to introduce the biggest maestro update yet: DAGs! A DAG (directed acyclic graph), in the context of data pipelines, is a set of data processing steps that feed into other steps until a final step is reached. Almost all data orchestration platforms use the concept of DAGs to increase the reusability and isolation of discrete components. As of maestro 0.4.0, DAGs are now possible using the maestroInputs and maestroOutputs tags. This post will go through the motivation and implementation of this new feature.
If you haven’t heard of maestro, it’s a package that helps you schedule your R scripts all in a single project using tags. You can learn more about it here.
Get it from CRAN:

install.packages("maestro")
Why DAGs?
Let’s imagine we have a data pipeline where we want to extract data, clean/transform it, train a model, and send the predictions to a database. We can take each of these steps and chain them together so that the output of ‘extract’ is automatically fed into ‘clean/transform’, and so on.
The advantage of doing this in maestro is that you get better observability and traceability at each step. As we’ll see, we can more clearly identify where errors occur and even recover intermediate results.
DAGs in maestro
In short, a DAG pipeline is created using either the maestroInputs or maestroOutputs tag. Both are valid, but usually only one is needed. Simply put, a pipeline with the tag #' @maestroInputs start_pipe receives the output of a pipeline called start_pipe. Alternatively, we could use #' @maestroOutputs end_pipe to indicate that a pipeline called end_pipe receives the output of the current pipeline.
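To make that concrete, here’s a minimal sketch of the two tags expressing the same edge from opposite ends. The file name, pipeline bodies, and scheduling tags here are placeholders of my own, not from the example below:

#' /pipelines/example.R (hypothetical file)

#' @maestroFrequency daily
#' @maestroStartTime 2024-11-22 09:00:00
start_pipe <- function() {
  # Placeholder data source
  mtcars
}

#' Declares the edge start_pipe -> end_pipe from the receiving side;
#' tagging start_pipe with @maestroOutputs end_pipe instead would
#' define the same edge
#' @maestroInputs start_pipe
end_pipe <- function(.input) {
  # .input holds whatever start_pipe returned
  nrow(.input)
}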
Let’s see an example where we make model predictions on the nycflights13 data.
#' /pipelines/model_flights.R

#' @maestroFrequency daily
#' @maestroStartTime 2024-11-22 09:00:00
#' @maestroOutputs process_flights
extract_flights <- function() {
  # Imagine this is from a source where the data changes
  nycflights13::flights
}

#' @maestroOutputs train_model
process_flights <- function(.input) {
  daily_flights <- .input |>
    dplyr::mutate(date = lubridate::make_date(year, month, day)) |>
    dplyr::summarise(
      n_flights = dplyr::n(), .by = date
    )

  # A simple time series
  ts(data = daily_flights$n_flights, frequency = 365)
}

#' @maestroOutputs forecast_flights
train_model <- function(.input) {
  # A simple ARIMA model (using the {forecast} package would be better)
  .input |>
    arima(order = c(1, 1, 1))
}

#' @maestro
forecast_flights <- function(.input) {
  # Forecast the next 7 days
  pred_obj <- predict(.input, n.ahead = 7)
  pred_obj$pred
}
We won’t focus much on the content of the functions. Instead, pay attention to the use of maestroOutputs: each function that outputs into another references the name of that function. The last function, forecast_flights, just uses the generic #' @maestro tag to indicate that it is part of the maestro project. Also note the use of the special argument .input. This argument must be supplied to all functions receiving an input; use it to capture the data passed along at each step of the pipeline.
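For example, if we added another hypothetical consumer of extract_flights (not part of the example above), it too would need to accept .input:

#' A hypothetical extra step; .input will hold the data frame
#' returned by extract_flights on each run
#' @maestroInputs extract_flights
count_flights <- function(.input) {
  nrow(.input)
}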
Now we can build the schedule like always.
# /orchestrator.R
library(maestro)
schedule <- build_schedule(quiet = TRUE)
We can verify that the DAG is properly defined using the show_network() function on our newly created schedule.
show_network(schedule)
Now we can run the schedule. For testing purposes, we’ll set run_all = TRUE so that the pipelines run regardless of their schedules.
run_schedule(
  schedule,
  run_all = TRUE
)
── [2024-11-22 14:25:11] Running pipelines ▶
ℹ extract_flights
✔ extract_flights [768ms]
ℹ |-process_flights
✔ |-process_flights [24ms]
ℹ |-train_model
✔ |-train_model [9ms]
ℹ |- |-forecast_flights
✔ |- |-forecast_flights [5ms]
── [2024-11-22 14:25:11] Pipeline execution completed ■ | 0.833 sec elapsed
✔ 4 successes | → 0 skipped | ! 0 warnings | ✖ 0 errors | ◼ 4 total
────────────────────────────────────────────────────────────────────────────────
── Maestro Schedule with 4 pipelines:
• Success
We can see from the console output that the whole pipeline ran successfully. If we want to get the output from each of the steps, we can use get_artifacts(). This returns intermediate results too, which can be helpful if you want to retrieve state after a failed run of the schedule.
artifacts <- get_artifacts(schedule)
artifacts$forecast_flights
Time Series:
Start = c(2, 1)
End = c(2, 7)
Frequency = 365
[1] 942.7232 934.9722 933.7773 933.5931 933.5647 933.5603 933.5596
Maestro can be used to create any valid DAG (e.g., with branching and joining), as sketched below.
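Here’s a rough sketch of what branching and joining can look like under the tag semantics described above. All names here are hypothetical, and I’m not showing how the combined upstream results are presented in .input at the join:

#' /pipelines/branch_and_join.R (hypothetical file)

#' @maestroFrequency daily
#' @maestroStartTime 2024-11-22 09:00:00
extract_raw <- function() {
  data.frame(group = c("a", "a", "b", "b"), value = 1:4)
}

#' Branch: this pipeline and count_rows both read from extract_raw
#' @maestroInputs extract_raw
#' @maestroOutputs combine_results
sum_by_group <- function(.input) {
  aggregate(value ~ group, data = .input, FUN = sum)
}

#' @maestroInputs extract_raw
#' @maestroOutputs combine_results
count_rows <- function(.input) {
  data.frame(n = nrow(.input))
}

#' Join: both branches output into this pipeline
#' @maestro
combine_results <- function(.input) {
  # Stand-in for whatever real aggregation you'd do with the
  # combined upstream results
  print(.input)
}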
I hope this new addition to maestro supercharges your data orchestration. Check out the release notes for more details on what’s new in version 0.4.0. If you find any bugs or want to suggest new features and improvements, please add them here or reach out to me on LinkedIn.
Happy orchestrating!