A conditional pipeline is a pipeline that executes only if a
particular condition is met. Using the @maestroRunIf tag,
you can specify a boolean R expression: TRUE executes
the pipeline and FALSE skips it. The expression can reference
the output of an upstream DAG pipeline (available as .input), a resource
passed from the orchestrator as in run_schedule(..., resources = list()),
or any other R code, so long as it returns a single TRUE/FALSE.
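The "single TRUE/FALSE" requirement is easy to miss with vectorised R code. The sketch below uses plain base R and does not touch maestro; is_single_flag is a hypothetical helper written only for this illustration, showing which kinds of expressions yield exactly one logical value.

```r
# Hypothetical helper (not part of maestro): TRUE only for a single,
# non-missing logical value
is_single_flag <- function(x) {
  is.logical(x) && length(x) == 1 && !is.na(x)
}

values <- c(3, 0, 7)

# Comparing a length gives one flag
scalar_check <- is_single_flag(length(values) > 0)

# An element-wise comparison gives one flag per element, here three
vector_check <- is_single_flag(values > 0)

# all() or any() collapses the element-wise result back to one flag
collapsed_check <- is_single_flag(all(values > 0))
```

Wrapping an element-wise comparison in all() or any() is usually the simplest way to get an expression suitable for @maestroRunIf.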
Simple Conditional
To make a pipeline conditional, simply use the
@maestroRunIf tag containing an R expression that evaluates
to a single boolean value. This expression can be inline with the tag or
span multiple lines.
In this simple example, the pipeline is scheduled to run daily, but
will only execute if a TRUE is randomly sampled.
#' ./pipelines/conditional1.R
#' @maestroFrequency 1 day
#' @maestroRunIf sample(c(TRUE, FALSE), size = 1)
random_execution <- function() {
  message("Maybe, maybe not")
}
library(maestro)
schedule <- build_schedule(quiet = TRUE)
status <- run_schedule(
  schedule,
  orch_frequency = "1 day",
  log_to_console = TRUE
)
── [2025-10-28 18:24:02]
Running pipelines ▶
✔ ? random_execution [10ms]
[random_execution] [INFO] [2025-10-28 18:24:02.875651]: Maybe, maybe not
✔ random_execution [32ms]
── [2025-10-28 18:24:02]
Pipeline execution completed ■ | 0.082 sec elapsed
✔ 1 success | ! 0 warnings | ✖ 0 errors | ◼ 1 total
────────────────────────────────────────────────────────────────────────────────
── Next scheduled pipelines ❯
Pipe name | Next scheduled run
• random_execution | 2025-10-30
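As a further sketch of a condition built from arbitrary R code, the pipeline below is meant to run only on weekdays. The file name and the weekday_report function are hypothetical, and the expression is placed on the line after the tag, the same layout used elsewhere in this article. The last line evaluates the same expression by hand to confirm it yields one logical value.

```r
#' ./pipelines/weekday_report.R (hypothetical file)
#' @maestroFrequency 1 day
#' @maestroRunIf
#' as.POSIXlt(Sys.Date())$wday %in% 1:5
weekday_report <- function() {
  message("Running the weekday report")
}

# The same expression evaluated outside maestro.
# In POSIXlt, wday runs from 0 (Sunday) to 6 (Saturday), so 1:5 is Mon-Fri.
runs_today <- as.POSIXlt(Sys.Date())$wday %in% 1:5
```

Checking a condition this way before adding it to a pipeline is a cheap guard against expressions that return a vector or NA.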
DAG Conditionals
DAG pipelines
are several pipelines chained together such that the return value of an
upstream pipeline is passed to a downstream pipeline via the
.input parameter. This same .input can be accessed in the
@maestroRunIf tag.
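The example below executes the transform_flights and
load_flights pipelines only if the incoming value is a
data frame with more than 0 rows. Here transform_flights keeps only
rows with flight_id > 5, which leaves no rows, so the condition for
load_flights evaluates to FALSE and that pipeline is skipped.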
#' ./pipelines/conditional2.R
#' @maestroFrequency 1 hour
extract_flights <- function() {
  # Pretends to fetch data from an API
  data.frame(
    flight_id = 1:5,
    airline_code = c("UA", "AC", "AC", "AA", "DE"),
    departure_time = as.POSIXct("2025-10-14 12:00:00") + c(100, 450, 750, 1450, 1750)
  )
}
#' @maestroInputs extract_flights
#' @maestroRunIf
#' is.data.frame(.input) && nrow(.input) > 0
transform_flights <- function(.input) {
  proc_data <- .input
  proc_data <- proc_data |>
    dplyr::filter(
      flight_id > 5
    ) |>
    dplyr::mutate(
      departing_from = "YHZ"
    )
  proc_data
}
#' @maestroInputs transform_flights
#' @maestroRunIf
#' is.data.frame(.input) && nrow(.input) > 0
load_flights <- function(.input) {
  # Write the incoming data frame to disk
  write.csv(.input, "flights.csv")
}
library(maestro)
schedule <- build_schedule(quiet = TRUE)
status <- run_schedule(
  schedule,
  orch_frequency = "1 hour"
)
── [2025-10-28 18:24:03]
Running pipelines ▶
✔ extract_flights [12ms]
✔ ? transform_flights [7ms]
✔ |-transform_flights [18ms]
✔ ? load_flights [7ms]
── [2025-10-28 18:24:03]
Pipeline execution completed ■ | 0.079 sec elapsed
✔ 2 successes | ! 0 warnings | ✖ 0 errors | ◼ 2 total
────────────────────────────────────────────────────────────────────────────────
── Next scheduled pipelines ❯
Pipe name | Next scheduled run
• extract_flights | 2025-10-28 19:00:00
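The run above reports 2 successes out of 2 even though three pipelines are defined. One way to see why is to evaluate the same condition by hand at each step. The following is a base R sketch that does not call maestro: run_if is a hypothetical stand-in for the @maestroRunIf expression, and the dplyr filter is replaced with equivalent base R subsetting.

```r
# Hypothetical stand-in for the @maestroRunIf expression used in the example
run_if <- function(.input) is.data.frame(.input) && nrow(.input) > 0

# What extract_flights returns (departure_time omitted for brevity)
extracted <- data.frame(
  flight_id = 1:5,
  airline_code = c("UA", "AC", "AC", "AA", "DE")
)

# Condition for transform_flights: five rows arrive, so it runs
run_transform <- run_if(extracted)

# Base R equivalent of dplyr::filter(flight_id > 5): no rows survive
transformed <- extracted[extracted$flight_id > 5, , drop = FALSE]

# Condition for load_flights: zero rows arrive, so it is skipped
run_load <- run_if(transformed)
```

Because && short-circuits, nrow() is never called on a value that is not a data frame, which keeps the condition safe if an upstream pipeline returns NULL or some other type.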
Resource Conditionals
In maestro, a resource is an argument or variable passed from the
orchestrator to every pipeline designed to use it. It lets a pipeline
use a value created in the orchestrator context, which is often useful
for global configuration. Conditional pipeline logic can also make use
of these resources. In the example below, the pipeline executes only if
the orchestrator passes prod = TRUE:
#' ./pipelines/conditional3.R
#' @maestroFrequency 1 day
#' @maestroRunIf prod
process_payments <- function() {
  # Some code that does important payment processing stuff
  # but only in production!
  message("Payments processed")
}
library(maestro)
schedule <- build_schedule(quiet = TRUE)
status <- run_schedule(
  schedule,
  orch_frequency = "1 day",
  resources = list(
    prod = TRUE
  ),
  log_to_console = TRUE
)
── [2025-10-28 18:24:03]
Running pipelines ▶
✔ ? process_payments [8ms]
[process_payments] [INFO] [2025-10-28 18:24:03.555837]: Payments processed
✔ process_payments [17ms]
── [2025-10-28 18:24:03]
Pipeline execution completed ■ | 0.036 sec elapsed
✔ 1 success | ! 0 warnings | ✖ 0 errors | ◼ 1 total
────────────────────────────────────────────────────────────────────────────────
── Next scheduled pipelines ❯
Pipe name | Next scheduled run
• process_payments | 2025-10-30
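In practice the prod flag would rarely be hard-coded. One possible approach, sketched below, derives it in the orchestrator from an environment variable. DEPLOY_ENV and the value "production" are assumptions made for this illustration and are not something maestro defines. The run_schedule call is left as a comment because it needs a built schedule.

```r
# Hypothetical environment variable; falls back to "dev" when unset
deploy_env <- Sys.getenv("DEPLOY_ENV", unset = "dev")

# identical() always returns a single TRUE/FALSE, which is what
# `@maestroRunIf prod` needs
resources <- list(prod = identical(deploy_env, "production"))

# Passed to the orchestrator in the same way as the example above:
# status <- run_schedule(
#   schedule,
#   orch_frequency = "1 day",
#   resources = resources
# )
```

With this setup the same pipeline file can be deployed everywhere, and process_payments would only execute where DEPLOY_ENV is set to "production".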
