Skip to contents

A conditional pipeline is a pipeline that executes only if a particular condition is met. Using the @maestroRunIf tag, you can specify a boolean R expression where TRUE executes the pipeline and FALSE skips it. Conditional pipelines can use the input of an upstream DAG pipeline, a resource from the orchestrator as in run_schedule(..., resources = list()), or any arbitrary R code so long as it returns a single TRUE/FALSE.

Simple Conditional

To make a pipeline conditional, simply use the @maestroRunIf tag containing an R expression that evaluates to a single boolean value. This expression can be inline with the tag or span multiple lines.

In this simple example, the pipeline is scheduled to run daily, but will only execute if a TRUE is randomly sampled.

#' ./pipelines/conditional1.R
#' @maestroFrequency 1 day
#' @maestroRunIf sample(c(TRUE, FALSE), size = 1)
random_execution <- function() {
  message("Maybe, maybe not")
}
library(maestro)

schedule <- build_schedule(quiet = TRUE)

status <- run_schedule(
  schedule,
  orch_frequency = "1 day",
  log_to_console = TRUE
)
                                                                                
── [2025-10-28 18:24:02]                                                        
Running pipelines                                                              
 ? random_execution [10ms]                                                     
[random_execution] [INFO] [2025-10-28 18:24:02.875651]: Maybe, maybe not        
 random_execution [32ms]                                                       
                                                                                
── [2025-10-28 18:24:02]                                                        
Pipeline execution completed  | 0.082 sec elapsed                              
 1 success | ! 0 warnings |  0 errors |  1 total                             
────────────────────────────────────────────────────────────────────────────────
                                                                                
── Next scheduled pipelines                                                    
Pipe name | Next scheduled run                                                  
• random_execution | 2025-10-30                                                 

DAG Conditionals

DAG pipelines are several pipelines chained together such that the input from an upstream pipeline is passed to a downstream pipeline. The return value of the upstream pipeline is passed to the downstream pipeline via the .input parameter. This same .input can be accessed in the @maestroRunIf tag.

The example below executes the transform_flights and load_flights pipelines only if the incoming value is a dataframe and its row count is greater than 0.

#' ./pipelines/conditional2.R
#' @maestroFrequency 1 hour
extract_flights <- function() {
  
  # Pretends to fetch data from an API
  data.frame(
    flight_id = 1:5,
    airline_code = c("UA", "AC", "AC", "AA", "DE"),
    departure_time = as.POSIXct("2025-10-14 12:00:00") + c(100, 450, 750, 1450, 1750)
  )
}

#' @maestroInputs extract_flights
#' @maestroRunIf 
#' is.data.frame(.input) && nrow(.input) > 0
transform_flights <- function(.input) {
  
  proc_data <- .input

  proc_data <- proc_data |> 
    dplyr::filter(
      flight_id > 5
    ) |> 
    dplyr::mutate(
      departing_from = "YHZ"
    )

  proc_data
}

#' @maestroInputs transform_flights
#' @maestroRunIf 
#' is.data.frame(.input) && nrow(.input) > 0
load_flights <- function(.input) {

  write.csv("flights.csv")
}
library(maestro)

schedule <- build_schedule(quiet = TRUE)

status <- run_schedule(
  schedule,
  orch_frequency = "1 hour"
)
                                                                                
── [2025-10-28 18:24:03]                                                        
Running pipelines                                                              
 extract_flights [12ms]                                                        
 ? transform_flights [7ms]                                                     
 |-transform_flights [18ms]                                                    
 ? load_flights [7ms]                                                          
                                                                                
── [2025-10-28 18:24:03]                                                        
Pipeline execution completed  | 0.079 sec elapsed                              
 2 successes | ! 0 warnings |  0 errors |  2 total                           
────────────────────────────────────────────────────────────────────────────────
                                                                                
── Next scheduled pipelines                                                    
Pipe name | Next scheduled run                                                  
• extract_flights | 2025-10-28 19:00:00                                         

Resource Conditionals

In maestro, a resource is an argument or variable passed from the orchestrator to all the pipelines that are designed to make use of that resource. It’s a way of allowing a pipeline to use a variable created from the orchestrator context - often it’s useful for global configuration type stuff. Conditional pipeline logic can also make use of these resources. Let’s see a concrete example where we only want to execute a pipeline if we get a prod = TRUE signal from the orchestator:

#' ./pipelines/conditional3.R
#' @maestroFrequency 1 day
#' @maestroRunIf prod
process_payments <- function() {
  
  # Some code that does important payment processing stuff 
  # but only in production!

  message("Payments processed")
}
library(maestro)

schedule <- build_schedule(quiet = TRUE)

status <- run_schedule(
  schedule,
  orch_frequency = "1 day",
  resources = list(
    prod = TRUE
  ),
  log_to_console = TRUE
)
                                                                                
── [2025-10-28 18:24:03]                                                        
Running pipelines                                                              
 ? process_payments [8ms]                                                      
[process_payments] [INFO] [2025-10-28 18:24:03.555837]: Payments processed      
 process_payments [17ms]                                                       
                                                                                
── [2025-10-28 18:24:03]                                                        
Pipeline execution completed  | 0.036 sec elapsed                              
 1 success | ! 0 warnings |  0 errors |  1 total                             
────────────────────────────────────────────────────────────────────────────────
                                                                                
── Next scheduled pipelines                                                    
Pipe name | Next scheduled run                                                  
• process_payments | 2025-10-30