A common task in data engineering is to automate, schedule, and
monitor multiple data processing pipelines. This is called
orchestration. maestro is an R package that helps orchestrate data
pipelines.
A fully realized maestro project involves the following components and actions:
A collection of pipelines (R functions to be orchestrated, such as batch ETL jobs).
An orchestrator: an R script or Quarto doc that orchestrates the pipelines and monitors them.
A process external to R that schedules the orchestrator (e.g., cron, Posit Connect).
Project Setup
Create a maestro project, either in an existing project or in a new
one, using create_maestro() or the New Project wizard in RStudio. This
creates the orchestrator script and the folder of pipelines with one
sample pipeline. Your project should look something like this:
maestro_project
├── maestro_project.Rproj
├── orchestrator.R
└── pipelines
    ├── my_pipe.R
    └── another_pipe.R
Pipelines
Pipelines are the jobs you want to automate, schedule, and monitor. For the most part, they’re regular R functions with a special sprinkling of comments.
Anatomy of a Pipeline
A pipeline is simply an R function with decorators called maestro
tags. Maestro tags are special code comments that communicate the
scheduling and configuration of a pipeline to the orchestrator. Let’s
take a quick look at the sample my_pipe.R:
#' my_pipe maestro pipeline
#'
#' @maestroFrequency 1 day
#' @maestroStartTime 2024-05-24
#' @maestroTz UTC
#' @maestroLogLevel INFO
my_pipe <- function() {
  # Pipeline code
}
my_pipe is a function with an empty body, so right now it won’t do
anything. The comments above are interpreted by maestro as “this
function is scheduled to run every day starting at 2024-05-24
(00:00:00) UTC time”.
maestroFrequency and maestroStartTime are the most important tags for scheduling. Frequency is how often you want the pipeline to run and can be formatted as a single string like hourly, daily, weekly, biweekly, etc. or with a number and a unit (e.g., 1 day, 3 hours, etc.).
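To make the relationship between start time and frequency concrete, the sketch below lists the first few run times that a start time of 2024-05-24 UTC implies for two frequencies. It uses base R only and is an illustration of the arithmetic, not maestro's internal code; the variable names are made up for this example. Note that base R's seq() happens to accept the "number and unit" form, but not keyword forms such as daily.

```r
# Illustration in base R only; this is not maestro's internal code.
start_time <- as.POSIXct("2024-05-24 00:00:00", tz = "UTC")

# First three run times implied by "@maestroFrequency 1 day"
daily_runs <- seq(start_time, by = "1 day", length.out = 3)
format(daily_runs, "%Y-%m-%d %H:%M", tz = "UTC")

# The same start time with a frequency of "3 hours"
three_hourly_runs <- seq(start_time, by = "3 hours", length.out = 3)
format(three_hourly_runs, "%Y-%m-%d %H:%M", tz = "UTC")
```

Every later run time is anchored to the start time, which is why the start time matters even when it lies in the past.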
Note that you don’t need to provide all these tags. A single maestro
tag is enough to mark a function as a pipeline. Pipelines missing tags
will use consistent defaults (e.g., if maestroFrequency is missing, the
default is 1 day/daily).
In most use cases, the code inside my_pipe would run an ETL job
(extract data from a source, transform it, and load it into a file
system or database). In technical terms, it’s the side effect of the
code, not its return value, that is important.
Here’s a more realistic, albeit impractical, example:
#' my_pipe maestro pipeline
#'
#' @maestroFrequency 1 day
#' @maestroStartTime 2024-05-24
#' @maestroTz UTC
#' @maestroLogLevel INFO
my_pipe <- function() {
  random_data <- data.frame(
    letters = sample(letters, 10),
    numbers = sample.int(10)
  )
  write.csv(random_data, file = tempfile())
}
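For a pipeline whose body follows the extract, transform, load shape described earlier, a minimal sketch might look like the following. The pipeline name cars_etl, the use of the built-in mtcars dataset as a stand-in source, and the output path are all assumptions made for illustration; only the tags are taken from the sample above.

```r
#' cars_etl maestro pipeline (hypothetical example)
#'
#' @maestroFrequency 1 day
#' @maestroStartTime 2024-05-24
cars_etl <- function() {
  # Extract: a built-in dataset stands in for a real data source
  raw <- datasets::mtcars

  # Transform: mean miles per gallon for each cylinder count
  summary_tbl <- aggregate(mpg ~ cyl, data = raw, FUN = mean)

  # Load: write the result to a file; this side effect is the point
  out_path <- file.path(tempdir(), "cars_summary.csv")
  write.csv(summary_tbl, file = out_path, row.names = FALSE)

  # Return the path invisibly so the function can be checked by hand
  invisible(out_path)
}
```

Calling cars_etl() directly is a quick way to check the pipeline body before handing it to the orchestrator.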
Adding New Pipelines
A project with a single pipeline is ok, but maestro is more useful
when you have multiple jobs to run. You can add more pipelines to your
pipelines directory manually or use create_pipeline():
create_pipeline(
  pipe_name = "another_pipeline",
  pipeline_dir = "pipelines",
  frequency = "1 hour",
  start_time = "2024-05-17 15:00:00",
  tz = "America/Halifax",
  log_level = "ERROR"
)
Orchestrator
The orchestrator is the process that schedules and monitors the pipelines.
Anatomy of the Orchestrator
The orchestrator can be an R script or a Quarto/RMarkdown doc, but
here we’ll use a regular R script. This is where you’ll run maestro
functions. The two main functions are build_schedule() and
run_schedule().
library(maestro)
schedule <- build_schedule()
output <- run_schedule(
  schedule,
  orch_frequency = "1 hour"
)
ℹ 1 script successfully parsed
── [2025-01-07 14:39:08] Running pipelines ▶
── [2025-01-07 14:39:08] Pipeline execution completed ■ | 0.02 sec elapsed
✔ 0 successes | → 1 skipped | ! 0 warnings | ✖ 0 errors | ◼ 1 total
────────────────────────────────────────────────────────────────────────────────
── Next scheduled pipelines ❯
Pipe name | Next scheduled run
• my_pipe | 2025-01-08
Building the schedule makes maestro look through the pipelines in the
pipelines folder and create a schedule object. You then pass that
object to run_schedule() along with how often the orchestrator is
supposed to run. It is important to tell maestro how often it’ll be
checking the pipelines using the orch_frequency parameter. Here, we’re
informing it that the orchestrator is running every 1 hour.
Importantly, it isn’t maestro’s job to actually run it this often -
it’s your job to make sure it runs at that frequency (e.g., deploying
it via cron or some cloud environment where code can be scheduled).
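One way to see why the orchestrator needs to know its own frequency is the conceptual sketch below. It assumes a simple rule: a pipeline is due when its most recent scheduled run time falls within the orchestrator's current checking window. This is an illustration only; is_due() and its arguments are hypothetical and not part of the maestro API, and maestro's actual scheduling logic may differ.

```r
# Conceptual sketch only; maestro's real scheduling logic may differ.
# is_due() is a hypothetical helper, not part of the maestro API.
is_due <- function(check_time, start_time, pipe_secs, orch_secs) {
  elapsed <- as.numeric(difftime(check_time, start_time, units = "secs"))
  if (elapsed < 0) {
    return(FALSE)  # the pipeline has not reached its start time yet
  }
  # Seconds since the most recent scheduled run of the pipeline
  since_last_run <- elapsed %% pipe_secs
  # Due only if that run falls inside the orchestrator's current window
  since_last_run < orch_secs
}

start <- as.POSIXct("2024-05-24 00:00:00", tz = "UTC")
day <- 24 * 60 * 60
hour <- 60 * 60

# Daily pipeline checked by an hourly orchestrator
is_due(as.POSIXct("2024-05-25 00:05:00", tz = "UTC"), start, day, hour)  # TRUE
is_due(as.POSIXct("2024-05-25 01:05:00", tz = "UTC"), start, day, hour)  # FALSE
```

In this sketch, declaring a window that is wider than the real interval between orchestrator runs could pick up the same scheduled run twice, while a window that is too narrow could miss runs, which is why the declared frequency and the external scheduler should agree.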