install.packages("maestro")
I’m thrilled to announce the release of maestro, a new package for developing, scheduling, and monitoring data pipelines. Here, I’ll walk through what maestro does and why you’d want to use it.
The Motivation
Let’s start with the why. Data engineers often have to manage multiple data pipelines - dozens or maybe even hundreds of pipelines. Scheduling and monitoring these pipelines in production poses challenges, particularly as the number of pipelines increases. Maestro takes a holistic approach by managing multiple independent pipelines in a single R project.1 No need to maintain several R projects or use bulky orchestration tools.
One Orchestrator, Many Pipelines
There are two components to a maestro project: an orchestrator and a collection of pipelines. A pipeline is a function that performs a task, such as extracting data and then storing it somewhere. The goal of maestro is to manage (i.e., schedule and monitor) multiple pipelines. This is done using an orchestrator script. The orchestrator script runs maestro commands like build_schedule() and run_schedule() to execute the pipelines and monitor them.
Getting Started
library(maestro)
It is best to use maestro in an R project. After installing the package, you can create a new project using create_maestro() or in RStudio via New Project > Maestro Project. You’ll find yourself in an R project with a structure like this:
my_project_directory
├── orchestrator.R
└── pipelines
└── my_pipe.R
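If you prefer to scaffold the project from the console rather than the RStudio menu, a minimal call might look like this (the directory name here is just an example; see ?create_maestro for the available options):

library(maestro)

# Create a new maestro project in the given directory
create_maestro("my_project_directory")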
Our First Pipeline
Let’s take a look at my_pipe.R that was created:
#' my_pipe maestro pipeline
#'
#' @maestroFrequency 1 day
#' @maestroStartTime 2024-06-03
#' @maestroTz UTC
#' @maestroLogLevel INFO
my_pipe <- function() {

# Pipeline code
}
The script has been populated with R code to assign a function called my_pipe. The function has no code in the body, so it won’t do anything just yet. Above the function declaration are some specially formatted code comments. These are maestro tags, and they’re used to configure the scheduling and operation of the pipeline.2
The tags most important for scheduling are maestroFrequency and maestroStartTime. maestroFrequency is how often to run the pipeline. It takes a number and a time unit, like 1 day, 3 hours, 15 minutes, or 6 months. maestroStartTime is a Date (yyyy-mm-dd) or timestamp (yyyy-mm-dd HH:MM:SS) indicating when the schedule starts.
Configuring the start time is important if you have specific times you want the pipeline to run. If, for example, you want the pipeline to run once daily at 12:30, you’d use @maestroStartTime 2024-06-03 12:30:00 (note here that the date part doesn’t matter unless you schedule it in the future).
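As a concrete sketch, a pipeline intended to fire at 12:30 each day could be tagged like this (the function name and body are placeholders, not part of the package):

#' @maestroFrequency 1 day
#' @maestroStartTime 2024-06-03 12:30:00
noon_thirty_pipe <- function() {
  # Pipeline code; maestro will schedule this for 12:30 each day
}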
We won’t concern ourselves with the other tags for now; just know that there are more and they all have default values.
Now let’s get the pipeline to do something. In the spirit of typical data engineering tasks, we’ll create an ETL (Extract, Transform, Load) pipeline that gets data from a source, transforms it by adding a new column, and loads it into storage.
Making a Useful Pipeline
We’ll use the open API from Environment Canada called Geomet for meteorological data, and we’ll use DuckDB for storage. We’ll need the httr2 and duckdb packages for extraction and storage, respectively.
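The pipeline body below also calls DBI, janitor, dplyr, and lubridate, so if any of those aren’t already on your machine, an install along these lines should cover it:

install.packages(c("httr2", "duckdb", "DBI", "janitor", "dplyr", "lubridate"))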
#' Located in ./pipelines/my_pipe.R
#' @maestroFrequency 1 day
#' @maestroStartTime 2024-06-03
geomet_stations <- function() {

  # Formulate the request
  req <- httr2::request(
    "https://api.weather.gc.ca/collections/climate-stations/items"
  ) |>
    httr2::req_url_query(
      limit = 1000,
      skipGeometry = TRUE
    )

  # Send the request and interpret the response
  resp <- req |>
    httr2::req_perform() |>
    httr2::resp_body_json(simplifyVector = TRUE)

  # Get the properties element where the rectangular data is located
  stations_dat <- resp$features$properties

  # Clean the names
  stations_clean <- stations_dat |>
    janitor::clean_names() |>
    janitor::remove_empty(which = c("rows", "cols")) |>
    dplyr::mutate(
      insert_time = lubridate::now(tzone = "UTC")
    )

  # Connect to a local in-memory duckdb
  conn <- DBI::dbConnect(duckdb::duckdb())

  # Create and write to a table
  DBI::dbWriteTable(
    conn,
    name = "geomet_stations_transactional",
    value = stations_clean
  )

  # Test that it worked in the return
  res <- DBI::dbGetQuery(
    conn,
    statement = "
      select * from geomet_stations_transactional
      order by insert_time desc
      limit 10
    "
  ) |>
    dplyr::as_tibble()

  DBI::dbDisconnect(conn)

  return(res)
}
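Because a maestro pipeline is just a regular R function, you can give it a quick interactive test before handing it to the orchestrator:

# Smoke test: should return a 10-row tibble of recently inserted stations
geomet_stations()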
Orchestrate It
Now that we have a single useful pipeline, let’s orchestrate it (in practice, we’d probably have more than one pipeline). We’ll set the orchestrator to run at a daily frequency (this does not actually cause it to run daily; something external to the R project needs to trigger it). For testing purposes, we’ll then run this interactively:
# Located in ./orchestrator.R
library(maestro)
schedule <- build_schedule()

orch_result <- run_schedule(
  schedule,
  orch_frequency = "1 day"
)
ℹ 1 script successfully parsed
── [2024-09-23 10:29:02] Running pipelines ▶
✔ geomet_stations [566ms]
── [2024-09-23 10:29:02] Pipeline execution completed ■ | 0.57 sec elapsed
✔ 1 success | → 0 skipped | ! 0 warnings | ✖ 0 errors | ◼ 1 total
────────────────────────────────────────────────────────────────────────────────
── Next scheduled pipelines ❯
Pipe name | Next scheduled run
• geomet_stations | 2024-09-25
Deployment
Ok, so we ran the orchestrator interactively, but this is only useful for testing. In practice, we want to deploy this on a server and have it run every day. To be clear: maestro does not do this for you - it just assumes that you are doing this and behaves accordingly. In other words, when you declare orch_frequency = "1 day", you are saying that you intend to run the orchestrator every 1 day.
The first decision to make about deployment is local vs. cloud. If you own the server, it’s local3; if you rent the hardware and connect to it remotely, it’s cloud. Here, we’ll run through a straightforward local deployment because it requires less configuration and won’t cost you anything.
Mac/Linux: cronR
Cron is a job scheduler for Mac and Linux systems. You can use cronR to interface with it via R.
library(cronR)
cmd <- cron_rscript(
  "orchestrator.R",
  workdir = getwd()
)

cron_add(
  cmd,
  frequency = "daily",
  id = "maestro",
  ask = FALSE
)
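If you want to confirm the job was registered, you can list the current cron entries from R:

# The "maestro" entry should appear among the listed jobs
cron_ls()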
Windows: taskscheduleR
Windows users can use taskscheduleR to schedule a job via R:
library(taskscheduleR)
taskscheduler_create(
  taskname = "maestro",
  rscript = "orchestrator.R",
  schedule = "DAILY",
  exec_path = getwd()
)
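As with cron, it’s worth confirming the task was created; the calls below reflect my reading of the taskscheduleR package, so double-check the docs:

# List scheduled tasks; a "maestro" entry should appear
taskscheduler_ls()

# Remove the task later if needed
# taskscheduler_delete(taskname = "maestro")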
Final Remarks
In this post, we saw how maestro can be used to orchestrate multiple pipelines in a single R project. Maestro is in its early development, but I encourage you to try it out. We’re using it to orchestrate 18 production pipelines at the Halifax International Airport!
I’d be remiss not to mention a few caveats of maestro:
- It should only be used for light-to-medium-scale batch jobs. If you need to do streaming and/or heavy-load jobs, it’s probably not for you.
- Configuring the schedule for the orchestrator requires some thought. If you have several pipelines at different intervals and times, you’ll want to choose a frequency that makes sense. You can use suggest_orch_frequency() to get a heuristic suggestion based on a schedule (see the sketch just after this list). Even then, you need to make sure you actually run the orchestrator at this frequency. Weird things happen if your orchestrator actually runs more or less frequently than you said it would.
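Here’s roughly what that check looks like, run interactively from the orchestrator project (the suggestion will depend on your pipelines):

library(maestro)

# Build the schedule from ./pipelines, then ask for a sensible orchestrator frequency
schedule <- build_schedule()
suggest_orch_frequency(schedule)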
That said, I think maestro is great for small-to-medium-sized pipeline orchestration. If you’re looking to deploy maestro on the cloud, this blog post will help you get started on Google Cloud Platform (GCP).
Notes
This post was created using R version 4.4.1 (2024-06-14) and maestro version 0.3.0.
Footnotes
A popular R package for data pipelines that has been around for some time is targets. Maestro is different from targets in that the focus of maestro is to orchestrate multiple independent data pipelines whereas targets is for a single connected pipeline. Maestro would be more suited toward ETL data engineering where targets is for complex analytics pipelines.↩︎
R users will recognize that maestro uses custom roxygen2 tags.↩︎
Yes, this includes your personal laptop or desktop; however, it probably goes to sleep when not used. If you’re using a Mac, you can use pmset in the command line to get around this (https://www.dssw.co.uk/reference/pmset/).↩︎