Hello maestro

A new package for orchestrating data pipelines in R

Categories: R, data engineering, data pipelines, orchestration, maestro, packages, deployment, release

Author: Will Hipson

Published: June 19, 2024

Modified: August 26, 2024

I’m thrilled to announce the release of maestro, a new package for developing, scheduling, and monitoring data pipelines. Here, I’ll walk through what maestro does and why you’d want to use it.

install.packages("maestro")

The Motivation

Let’s start with the why. Data engineers often have to manage many data pipelines, sometimes dozens or even hundreds. Scheduling and monitoring these pipelines in production poses challenges, particularly as the number of pipelines increases. Maestro takes a holistic approach by managing multiple independent pipelines in a single R project.1 There’s no need to maintain several R projects or use bulky orchestration tools.

One Orchestrator, Many Pipelines

There are two components to a maestro project: an orchestrator and a collection of pipelines. A pipeline is a function that performs a task, such as extracting data and then storing it somewhere. The goal of maestro is to manage (i.e., schedule and monitor) multiple pipelines. This is done using an orchestrator script. The orchestrator script runs maestro commands like build_schedule() and run_schedule() to execute the pipelines and monitor them.

Getting Started

library(maestro)

It is best to use maestro in an R project. After installing the package, you can create a new project using create_maestro() or in RStudio > New Project > Maestro Project. You’ll find yourself in an R project with a structure like this:

my_project_directory
├── orchestrator.R
└── pipelines
    └── my_pipe.R
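If you prefer the console over the RStudio dialog, the same scaffolding can be created with create_maestro(). A minimal sketch, assuming the first argument is the project path:

# Scaffold a new maestro project (orchestrator.R plus a pipelines/ folder)
create_maestro("my_project_directory")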

Our First Pipeline

Let’s take a look at my_pipe.R that was created:

#' my_pipe maestro pipeline
#'
#' @maestroFrequency 1 day
#' @maestroStartTime 2024-06-03
#' @maestroTz UTC
#' @maestroLogLevel INFO

my_pipe <- function() {

  # Pipeline code
}

The script defines a function called my_pipe. The body is empty, so it won’t do anything just yet. Above the function declaration are some specially formatted code comments. These are maestro tags, and they’re used to configure the scheduling and operation of the pipeline.2

The tags most important for scheduling are maestroFrequency and maestroStartTime. maestroFrequency is how often to run the pipeline. It takes a number and a time unit, like 1 day, 3 hours, 15 minutes, or 6 months. maestroStartTime is a Date (yyyy-mm-dd) or timestamp (yyyy-mm-dd HH:MM:SS) indicating when the schedule starts.

Configuring the start time is important if there are specific times you want the pipeline to run. If, for example, you want the pipeline to run once daily at 12:30, you’d use @maestroStartTime 2024-06-03 12:30:00 (note that the date part doesn’t matter unless you schedule it to start in the future).
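Putting those two tags together, a pipeline that runs every day at 12:30 UTC would look something like this (the function name and empty body are just placeholders):

#' @maestroFrequency 1 day
#' @maestroStartTime 2024-06-03 12:30:00
daily_report <- function() {
  # Runs once per day at 12:30
}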

We won’t concern ourselves with the other tags for now; just know that there are more and they all have default values.

Now let’s get the pipeline to do something. In the spirit of typical data engineering tasks, we’ll create an ETL (Extract, Transform, Load) pipeline that gets data from a source, transforms it by adding a new column, and loads it into storage.

Making a Useful Pipeline

We’ll use the open API from Environment Canada called GeoMet for meteorological data, and we’ll use DuckDB for storage. We’ll need the httr2 and duckdb packages for extraction and storage, respectively (the pipeline below also leans on DBI, janitor, dplyr, and lubridate for database access and data cleaning).

#' Located in ./pipelines/my_pipe.R
#' @maestroFrequency 1 day
#' @maestroStartTime 2024-06-03
geomet_stations <- function() {
  
  # Formulate the request  
  req <- httr2::request(
    "https://api.weather.gc.ca/collections/climate-stations/items"
  ) |> 
    httr2::req_url_query(
      limit = 1000,
      skipGeometry = TRUE
    )
  
  # Send the request and interpret the response
  resp <- req |> 
    httr2::req_perform() |> 
    httr2::resp_body_json(simplifyVector = TRUE)
  
  # Get the properties element where the rectangular data is located
  stations_dat <- resp$features$properties
  
  # Clean the names
  stations_clean <- stations_dat |> 
    janitor::clean_names() |> 
    janitor::remove_empty(which = c("rows", "cols")) |> 
    dplyr::mutate(
      insert_time = lubridate::now(tzone = "UTC")
    )
  
  # Connect to a local in-memory duckdb
  # (in-memory for demo purposes; pass dbdir = "path/to/file.duckdb" to persist)
  conn <- DBI::dbConnect(duckdb::duckdb())
  
  # Create and write to a table
  DBI::dbWriteTable(
    conn, 
    name = "geomet_stations_transactional", 
    value = stations_clean
  )
  
  # Test that it worked in the return
  res <- DBI::dbGetQuery(
    conn, 
    statement = "
    select * from geomet_stations_transactional
    order by insert_time desc
    limit 10
    "
  ) |>
    dplyr::as_tibble()
  
  DBI::dbDisconnect(conn, shutdown = TRUE)
  
  return(res)
}
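Because a maestro pipeline is just an R function, you can sanity-check it interactively before handing it to the orchestrator:

# Run the pipeline once by hand; it returns the 10 most recently inserted rows
test_rows <- geomet_stations()
test_rows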

Orchestrate It

Now that we have a single useful pipeline, let’s orchestrate it (in practice, we’d probably have more than one pipeline). We’ll set the orchestrator to run at a daily frequency; note that this does not by itself cause it to run daily, since something external to the R project has to actually run it. For testing purposes, we’ll run it interactively:

# Located in ./orchestrator.R
library(maestro)

schedule <- build_schedule()

orch_result <- run_schedule(
  schedule,
  orch_frequency = "1 day"
)
1 script successfully parsed

── Running pipelines

./pipelines/my_pipe.R geomet_stations [749ms]

── Pipeline execution completed | 0.761 sec elapsed

1 success | 0 skipped | 0 warnings | 0 errors | 1 total
────────────────────────────────────────────────────────────────────────────────

── Next scheduled pipelines

Pipe name | Next scheduled run
• geomet_stations | 2024-06-21

What we get back

We can see from the console output that the pipeline ran successfully. Because we saved the result to a variable, we also get back a list with two elements: status and artifacts.

$status

The $status element is a data.frame where each row is a pipeline. It has information about the status and runtime of each pipeline.

orch_result$status
# A tibble: 1 × 10
  pipe_name  script_path invoked success pipeline_started    pipeline_ended     
  <chr>      <chr>       <lgl>   <lgl>   <dttm>              <dttm>             
1 geomet_st… ./pipeline… TRUE    TRUE    2024-06-19 14:50:51 2024-06-19 14:50:51
# ℹ 4 more variables: errors <int>, warnings <int>, messages <int>,
#   next_run <dttm>
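Because $status is a plain tibble, it’s easy to build lightweight monitoring on top of it. A minimal sketch (the alerting logic here is my own, not a maestro feature):

# Flag any pipelines that ran but did not succeed
failures <- orch_result$status |>
  dplyr::filter(invoked, !success)

if (nrow(failures) > 0) {
  warning(
    "Failed pipelines: ",
    paste(failures$pipe_name, collapse = ", ")
  )
}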

$artifacts

$artifacts is where any return values from the pipelines will be. In our case, it’s the test sample of data inserted into the table:

orch_result$artifacts[[1]]
# A tibble: 10 × 33
   wmo_identifier   longitude fre_stn_operator_name                 display_code
   <chr>                <int> <chr>                                        <int>
 1 <NA>           -1114200000 <NA>                                             8
 2 <NA>           -1140700000 <NA>                                             4
 3 <NA>           -1114500000 <NA>                                             9
 4 <NA>           -1120200000 <NA>                                             4
 5 <NA>           -1105000000 <NA>                                             6
 6 71634           -653552000 Environnement et Changement climatiq…           NA
 7 <NA>            -633931040 <NA>                                            NA
 8 <NA>            -640340090 Réseau coopératif de stations climat…           NA
 9 71603           -660517000 NAV Canada                                      NA
10 71706           -630709000 NAV Canada                                      NA
# ℹ 29 more variables: eng_stn_operator_acronym <chr>,
#   fre_stn_operator_acronym <chr>, mly_first_date <chr>, hly_first_date <chr>,
#   fre_prov_name <chr>, country <chr>, climate_identifier <chr>,
#   publication_code <int>, timezone <chr>, last_date <chr>,
#   hly_last_date <chr>, has_hourly_data <chr>, elevation <chr>, stn_id <int>,
#   dly_first_date <chr>, dly_last_date <chr>, eng_stn_operator_name <chr>,
#   station_name <chr>, eng_prov_name <chr>, mly_last_date <chr>, …
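Since $artifacts is an ordinary list, standard list tooling applies. For instance, to peek at the first few rows of every pipeline’s return value:

lapply(orch_result$artifacts, head)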

Deployment

Ok, so we ran the orchestrator interactively, but this is only useful for testing. In practice, we want to deploy this on a server and have it run every day. To be clear: maestro does not do this for you; it assumes you are doing it and behaves accordingly. In other words, when you declare orch_frequency = "1 day" you are saying that you intend to run the orchestrator every 1 day.

The first decision to make about deployment is local vs. cloud. If you own the server, it’s local3; if you rent the hardware and connect to it remotely, it’s cloud. Here, we’ll run through a straightforward local deployment because it requires less configuration and won’t cost you anything.

Mac/Linux: cronR

Cron is a job scheduler for macOS and Linux systems. You can use the cronR package to interface with it from R.

library(cronR)

cmd <- cron_rscript(
  "orchestrator.R", 
  workdir = getwd()
)

cron_add(
  cmd, 
  frequency = "daily",
  id = "maestro",
  ask = FALSE
)
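You can confirm the job was registered using cronR’s other helpers:

# List registered cron jobs; remove the maestro job if you no longer need it
cron_ls()
cron_rm(id = "maestro")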

Windows: taskscheduleR

Windows users can use taskscheduleR to schedule a job via R:

library(taskscheduleR)

taskscheduler_create(
  taskname = "maestro", 
  rscript = "orchestrator.R", 
  schedule = "DAILY",
  exec_path = getwd()
)
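As with cronR, taskscheduleR provides helpers for inspecting and removing scheduled tasks:

# List scheduled tasks; delete the maestro task if you no longer need it
taskscheduler_ls()
taskscheduler_delete(taskname = "maestro")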

Final Remarks

In this post, we saw how maestro can be used to orchestrate multiple pipelines in a single R project. Maestro is in early development, but I encourage you to try it out. We’re using it in production to orchestrate 18 pipelines at the Halifax International Airport!

I’d be remiss not to mention a few caveats of maestro:

  1. It should only be used for light-to-medium-scale batch jobs. If you need streaming and/or heavy-load jobs, it’s probably not for you.
  2. Configuring the schedule for the orchestrator requires some thought. If you have several pipelines at different intervals and times, you’ll want to choose a frequency that makes sense. You can use suggest_orch_frequency() to get a heuristic suggestion based on a schedule (see the sketch below). Even then, you need to make sure you actually run the orchestrator at that frequency. Strange things happen if your orchestrator actually runs more or less often than you said it would.
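For example, once the schedule is built you can ask maestro for a sensible cadence (the exact value returned depends on your pipelines’ frequencies):

schedule <- build_schedule()
suggest_orch_frequency(schedule)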

That said, I think maestro is great for small-to-medium-scale pipeline orchestration. If you’re looking to deploy maestro on the cloud, this blog post will help you get started on Google Cloud Platform (GCP).

Notes

This post was created using R version 4.4.1 (2024-06-14) and maestro version 0.1.2.

Footnotes

  1. A popular R package for data pipelines that has been around for some time is targets. Maestro is different from targets in that the focus of maestro is to orchestrate multiple independent data pipelines, whereas targets manages a single connected pipeline. Maestro is better suited to ETL-style data engineering, while targets is designed for complex analytics pipelines.↩︎

  2. R users will recognize that maestro uses custom roxygen2 tags.↩︎

  3. Yes, this includes your personal laptop or desktop; however, it probably goes to sleep when not used. If you’re using a Mac, you can use pmset in the command line to get around this (https://www.dssw.co.uk/reference/pmset/).↩︎