
Advanced DAGs - Dynamic Fan-out and Collect
Source: vignettes/maestro-4b-directed-acyclic-graphs.Rmd
More complex types of DAGs involve dynamically spawning new pipelines based on a list or vector (dynamic fan-out) and/or collecting inputs from multiple pipelines into a single pipeline (fan-in, or collect).
Dynamic Fan-out
Sometimes an upstream pipeline returns a collection of values, and you
want to run a downstream pipeline once per element, a pattern called
fan-out or scatter. To enable this, add @maestroMap to the
downstream pipeline. An empty @maestroMap tag iterates
directly over each element of the upstream return value.
get_letters
|-shout[1]
|-shout[2]
|-shout[3]
#' @maestroFrequency daily
get_letters <- function() {
c("a", "b", "c")
}
#' @maestroInputs get_letters
#' @maestroMap
shout <- function(.input) {
toupper(.input)
}
shout executes three times, once each for "a", "b", and "c",
and the CLI output labels each branch with its iteration index in
square brackets.
library(maestro)
schedule <- build_schedule(quiet = TRUE)
run_schedule(schedule, run_all = TRUE)
get_artifacts(schedule)
── [2026-06-15 13:54:16]
Running pipelines ▶
✔ get_letters [24ms]
✔ |-shout[1] [29ms]
✔ |-shout[2] [25ms]
✔ |-shout[3] [10ms]
── [2026-06-15 13:54:16]
Pipeline execution completed ■ | 0.242 sec elapsed
✔ 4 successes | ! 0 warnings | ✖ 0 errors | ◼ 4 total
────────────────────────────────────────────────────────────────────────────────
── Maestro Schedule with 2 pipelines:
• Success
$get_letters
[1] "a" "b" "c"
$shout
$shout$uuP1Tj
[1] "A"
$shout$yTKKHP
[1] "B"
$shout$fjPLUt
[1] "C"
Note that there is no @maestroOutputs equivalent for defining
dynamic fan-out: the downstream pipeline must use @maestroInputs
together with @maestroMap.
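To make the semantics concrete, here is a plain base-R sketch of what an empty @maestroMap does with the example above. It uses no maestro functions and is not how maestro is implemented; it only assumes, as described above, that the downstream function is called once per element of the upstream return value.

```r
# Conceptual sketch in base R; not maestro's implementation.
# Emulates an empty @maestroMap on the get_letters -> shout example.
get_letters <- function() {
  c("a", "b", "c")
}

shout <- function(.input) {
  toupper(.input)
}

upstream <- get_letters()

# One "branch" per element; the index i corresponds to the number the
# CLI shows in square brackets (shout[1], shout[2], shout[3]).
branches <- lapply(seq_along(upstream), function(i) shout(upstream[[i]]))

for (i in seq_along(branches)) {
  cat(sprintf("shout[%d] -> %s\n", i, branches[[i]]))
}
```

Each branch sees a single element, which is why shout can call toupper() on .input without any looping of its own.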
Iterating over a field of a list
When the upstream pipeline returns a named list, pass an expression
such as .input$letter to @maestroMap to select which field to scatter
over. Each branch still receives the full list as .input, with only the
mapped field replaced by its current element, so the other fields remain
accessible.
#' @maestroFrequency daily
get_letters <- function() {
list(
letter = letters[1:3],
greeting = "hello"
)
}
#' @maestroInputs get_letters
#' @maestroMap .input$letter
make_message <- function(.input) {
paste(.input$greeting, toupper(.input$letter))
}
make_message runs once per element of letter, producing "hello A",
"hello B", and "hello C". The greeting field is available in every
branch because the full list is passed as .input each time, with
letter set to the current element.
library(maestro)
schedule <- build_schedule(quiet = TRUE)
run_schedule(schedule, run_all = TRUE)
get_artifacts(schedule)
── [2026-06-15 13:54:17]
Running pipelines ▶
✔ get_letters [10ms]
✔ |-make_message[1] [10ms]
✔ |-make_message[2] [10ms]
✔ |-make_message[3] [10ms]
── [2026-06-15 13:54:17]
Pipeline execution completed ■ | 0.099 sec elapsed
✔ 4 successes | ! 0 warnings | ✖ 0 errors | ◼ 4 total
────────────────────────────────────────────────────────────────────────────────
── Maestro Schedule with 2 pipelines:
• Success
$get_letters
$get_letters$letter
[1] "a" "b" "c"
$get_letters$greeting
[1] "hello"
$make_message
$make_message$xFnbSr
[1] "hello A"
$make_message$NfwRfN
[1] "hello B"
$make_message$yVywTt
[1] "hello C"
If the field name in @maestroMap does not exist in the
upstream return value, maestro records an informative error on the
downstream pipeline rather than silently producing zero branches.
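The same idea, applied to a named list, can be sketched in base R. The helper map_field() is hypothetical and not part of maestro; it assumes each branch receives the full upstream list with only the mapped field replaced by its current element, and it raises an error for a missing field, mirroring the behavior described above.

```r
# Conceptual sketch in base R; not maestro's implementation.
# Emulates `@maestroMap .input$letter` on the example pipelines.
get_letters <- function() {
  list(letter = letters[1:3], greeting = "hello")
}

make_message <- function(.input) {
  paste(.input$greeting, toupper(.input$letter))
}

# Hypothetical helper (not part of maestro): call `fn` once per element of
# input[[field]], passing the full list with only that field replaced by
# its i-th element.
map_field <- function(input, field, fn) {
  if (!field %in% names(input)) {
    # Fail loudly instead of silently producing zero branches.
    stop(sprintf("field '%s' not found in upstream output", field))
  }
  lapply(seq_along(input[[field]]), function(i) {
    branch_input <- input
    branch_input[[field]] <- input[[field]][[i]]
    fn(branch_input)
  })
}

msgs <- map_field(get_letters(), "letter", make_message)
unlist(msgs)  # "hello A" "hello B" "hello C"
```

Calling map_field(get_letters(), "nope", make_message) stops with an error rather than returning an empty list.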
Iterating over multiple fields simultaneously
You can supply multiple space-separated expressions to
@maestroMap to zip across several fields at once, similar to
purrr::pmap(). Each iteration receives .input with all specified
fields replaced by their i-th element.
#' @maestroFrequency daily
get_data <- function() {
list(
letter = letters[1:3],
greeting = c("hello", "cheers", "hi")
)
}
#' @maestroInputs get_data
#' @maestroMap .input$letter .input$greeting
make_message <- function(.input) {
paste(.input$greeting, toupper(.input$letter))
}
This produces "hello A", "cheers B", and "hi C": each branch
receives a distinct (letter, greeting) pair.
All mapped fields must have the same length or length 1 (a length-1 field is recycled across all iterations). Mismatched lengths produce a pipeline error.
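A base-R sketch of zipping over several fields, including the length rules, might look like this. map_fields() is a hypothetical helper, not a maestro function, and maestro's exact recycling and error handling may differ in detail.

```r
# Conceptual sketch in base R; not maestro's implementation.
# Emulates `@maestroMap .input$letter .input$greeting`.
get_data <- function() {
  list(letter = letters[1:3], greeting = c("hello", "cheers", "hi"))
}

make_message <- function(.input) {
  paste(.input$greeting, toupper(.input$letter))
}

# Hypothetical helper (not part of maestro): zip over several fields,
# recycling length-1 fields and rejecting any other length mismatch.
map_fields <- function(input, fields, fn) {
  lens <- vapply(input[fields], length, integer(1))
  n <- max(lens)
  if (any(lens != n & lens != 1L)) {
    stop("mapped fields must have equal length or length 1")
  }
  lapply(seq_len(n), function(i) {
    branch_input <- input
    for (f in fields) {
      # A length-1 field is recycled: every branch sees the same value.
      j <- if (lens[[f]] == 1L) 1L else i
      branch_input[[f]] <- input[[f]][[j]]
    }
    fn(branch_input)
  })
}

unlist(map_fields(get_data(), c("letter", "greeting"), make_message))
# "hello A" "cheers B" "hi C"
```

Replacing greeting with the single string "hello" makes every branch reuse it, while a two-element greeting against three letters stops with an error.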
Fan-in (Collect)
Fan-in is the complement of fan-out: the outputs of multiple upstream
pipelines are gathered into a single downstream pipeline. To enable this,
wrap one or more upstream pipeline names with collect() in
@maestroInputs.
letter_a ─┐
          |-+combine
letter_b ─┘
The downstream pipeline receives a named list as .input,
where each name corresponds to an upstream pipeline and each value is
that pipeline’s return value.
#' @maestroFrequency daily
letter_a <- function() "a"
#' @maestroFrequency daily
letter_b <- function() "b"
#' @maestroInputs collect(letter_a, letter_b)
combine <- function(.input) {
paste0(.input$letter_a, .input$letter_b)
}
combine fires only after both letter_a and letter_b have succeeded.
Inside combine, .input$letter_a is "a" and .input$letter_b is "b".
Collect pipelines are shown with a |-+ prefix in the CLI to distinguish
them from regular downstream pipelines.
library(maestro)
schedule <- build_schedule(quiet = TRUE)
run_schedule(schedule, run_all = TRUE)
get_status(schedule)[, c("pipe_name", "invoked", "success", "input_run_id", "lineage")]
── [2026-06-15 13:54:17]
Running pipelines ▶
✔ letter_a [13ms]
✔ letter_b [10ms]
✔ |-+combine [13ms]
── [2026-06-15 13:54:17]
Pipeline execution completed ■ | 0.087 sec elapsed
✔ 3 successes | ! 0 warnings | ✖ 0 errors | ◼ 3 total
────────────────────────────────────────────────────────────────────────────────
── Maestro Schedule with 3 pipelines:
• Success
# A tibble: 3 × 5
  pipe_name invoked success input_run_id   lineage
  <chr>     <lgl>   <lgl>   <chr>          <chr>
1 letter_a  TRUE    TRUE    NA             letter_a
2 letter_b  TRUE    TRUE    NA             letter_b
3 combine   TRUE    TRUE    BRVGSu, s1XzVQ letter_a&letter_b->combine
If any upstream pipeline fails, the collect pipeline will not fire.
The failed pipeline’s run ID is also excluded from
input_run_id in get_status().
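Conceptually, a static collect() behaves like the following base-R sketch. collect_all() is hypothetical and not part of maestro; returning NULL simply stands in for "the collect pipeline does not fire".

```r
# Conceptual sketch in base R; not maestro's implementation.
# Emulates `@maestroInputs collect(letter_a, letter_b)`.
letter_a <- function() "a"
letter_b <- function() "b"

combine <- function(.input) {
  paste0(.input$letter_a, .input$letter_b)
}

# Hypothetical helper (not part of maestro): run each upstream function and,
# only if all of them succeed, pass their results to `fn` as a list named
# after the upstream pipelines.
collect_all <- function(upstreams, fn) {
  results <- lapply(upstreams, function(up) {
    tryCatch(up(), error = function(e) e)
  })
  failed <- vapply(results, inherits, logical(1), what = "error")
  if (any(failed)) {
    return(NULL)  # stands in for "the collect pipeline will not fire"
  }
  fn(results)
}

collect_all(list(letter_a = letter_a, letter_b = letter_b), combine)  # "ab"
```

Because lapply() keeps the names of the input list, combine can address each upstream result by pipeline name, just as .input$letter_a does in the maestro example.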
Fan-out into Fan-in
@maestroMap and collect() compose naturally. A pipeline can fan out
with @maestroMap, and a pipeline further downstream can gather all
successful iterations back together with collect(). Note that, unlike
the static fan-in case, where any upstream failure stops the collect
pipeline from firing, in the dynamic fan-out to fan-in case the
downstream pipeline runs if at least one upstream iteration has
succeeded.
numbers
|-multiply[1] ─┐
|-multiply[2] ──|-+add
|-multiply[3] ─┘
#' @maestroFrequency daily
numbers <- function() 1:3
#' @maestroInputs numbers
#' @maestroMap
multiply <- function(.input) .input * 3
#' @maestroInputs collect(multiply)
add <- function(.input) {
sum(unlist(.input))
}
Here multiply executes three times (once per element of 1:3), then add
collects all three results and sums them: 3 + 6 + 9 = 18. The .input
received by add is a list of the return values of the successful
iterations.
library(maestro)
schedule <- build_schedule(quiet = TRUE)
run_schedule(schedule, run_all = TRUE)
get_artifacts(schedule)$add
── [2026-06-15 13:54:18]
Running pipelines ▶
✔ numbers [10ms]
✔ |-multiply[1] [31ms]
✔ |-multiply[2] [10ms]
✔ |-multiply[3] [10ms]
✔ |-+add [11ms]
── [2026-06-15 13:54:18]
Pipeline execution completed ■ | 0.143 sec elapsed
✔ 5 successes | ! 0 warnings | ✖ 0 errors | ◼ 5 total
────────────────────────────────────────────────────────────────────────────────
── Maestro Schedule with 3 pipelines:
• Success
[1] 18
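The "at least one success" rule for fan-out into fan-in can also be sketched in base R. map_then_collect() is a hypothetical helper, not a maestro function; it drops failed iterations and runs the collector as long as at least one iteration succeeded, as described above.

```r
# Conceptual sketch in base R; not maestro's implementation.
# Emulates @maestroMap on multiply followed by collect(multiply) in add.
numbers <- function() 1:3
multiply <- function(.input) .input * 3
add <- function(.input) sum(unlist(.input))

# Hypothetical helper (not part of maestro): fan out over `values`, keep
# only successful iterations, and run the collector if any succeeded.
map_then_collect <- function(values, map_fn, collect_fn) {
  results <- lapply(values, function(v) {
    tryCatch(map_fn(v), error = function(e) e)
  })
  ok <- !vapply(results, inherits, logical(1), what = "error")
  if (!any(ok)) {
    return(NULL)  # no successful iteration: the collector does not run
  }
  collect_fn(results[ok])
}

map_then_collect(numbers(), multiply, add)  # 18

# A hypothetical iteration that fails only for the second element:
flaky <- function(.input) if (.input == 2L) stop("bad input") else .input * 3
map_then_collect(numbers(), flaky, add)     # 12: iteration 2 is dropped
```

The design choice here is to filter failures before collecting, which is what lets add still produce a result when only some branches succeed.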