maestro 0.3.0

New methods for getting schedule status and return values from pipelines

Categories: R, data pipelines, orchestration, maestro, packages, release

Author: Will Hipson

Published: September 23, 2024

Modified: September 23, 2024

maestro 0.3.0 introduces principled methods for obtaining a schedule table, pipeline status, and artifacts (i.e., return values). This is possible because of a major refactoring of the backend to use R6 classes for pipelines and schedules. The refactoring also introduces a few breaking changes, but on the whole, not a lot is different.

If you haven’t heard of maestro, it’s a package that helps you schedule your R scripts all in a single project using tags. You can learn more about it here.

Get it from CRAN:

install.packages("maestro")

A schedule is now a <MaestroSchedule>

Schedules are now a special class with attributes and methods. For most users, this won’t change how they use maestro, but it’ll make it much easier to extend maestro with new features in the future. Let’s see how a typical maestro scenario plays out now:

library(maestro)

# Create a few dummy pipelines for demo purposes.
# open = FALSE and quiet = TRUE keep the demo non-interactive;
# overwrite = TRUE replaces any existing script with the same name.
create_pipeline("my_pipe", open = FALSE, quiet = TRUE, overwrite = TRUE)
create_pipeline(
  "my_pipe2", frequency = "1 week", open = FALSE, quiet = TRUE, overwrite = TRUE
)
create_pipeline(
  "my_pipe3", frequency = "2 hours", open = FALSE, quiet = TRUE, overwrite = TRUE
)

schedule <- build_schedule(quiet = TRUE)

output <- run_schedule(
  schedule,
  orch_frequency = "hourly"
)
── [2024-09-23 14:31:34]
Running pipelines ▶ 
ℹ my_pipe3
✔ my_pipe3 [8ms]
── [2024-09-23 14:31:34]
Pipeline execution completed ■ | 0.101 sec elapsed 
✔ 1 success | → 2 skipped | ! 0 warnings | ✖ 0 errors | ◼ 3 total
────────────────────────────────────────────────────────────────────────────────
── Next scheduled pipelines ❯ 
Pipe name | Next scheduled run
• my_pipe3 | 2024-09-23 20:00:00
• my_pipe | 2024-09-24
• my_pipe2 | 2024-09-30

So far these are the same steps as before. The difference now is in how we interact with the schedule. Here, schedule is no longer a data.frame; it’s a <MaestroSchedule> R6 object. If we want to get the status of each of the pipelines, we can use the new get_status() function:

get_status(schedule)
# A tibble: 3 × 10
  pipe_name script_path  invoked success pipeline_started    pipeline_ended     
  <chr>     <chr>        <lgl>   <lgl>   <dttm>              <dttm>             
1 my_pipe   ./pipelines… FALSE   FALSE   NA                  NA                 
2 my_pipe2  ./pipelines… FALSE   FALSE   NA                  NA                 
3 my_pipe3  ./pipelines… TRUE    TRUE    2024-09-23 17:31:34 2024-09-23 17:31:34
# ℹ 4 more variables: errors <int>, warnings <int>, messages <int>,
#   next_run <dttm>
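Since the status comes back as a regular table, ordinary subsetting is enough to pull out the pipelines that need attention. The sketch below does not call maestro at all: it builds a small stand-in data frame with made-up values, using a subset of the column names shown in the output above, and then filters it.

```r
# Stand-in for the result of get_status(schedule); the values are invented
# for illustration and only a few of the real columns are included.
status <- data.frame(
  pipe_name = c("my_pipe", "my_pipe2", "my_pipe3"),
  invoked   = c(FALSE, TRUE, TRUE),
  success   = c(FALSE, FALSE, TRUE),
  errors    = c(0L, 1L, 0L),
  warnings  = c(0L, 0L, 0L),
  stringsAsFactors = FALSE
)

# Pipelines that ran on this pass but did not succeed
failed <- status[status$invoked & !status$success, ]
failed$pipe_name

# Pipelines that were skipped on this pass
skipped <- status$pipe_name[!status$invoked]
skipped
```

The same expressions should work on the real status table, assuming the column names match those printed above.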

If we just want the schedule, we can use get_schedule():

get_schedule(schedule)
# A tibble: 3 × 9
  script_path      pipe_name frequency start_time          tz    skip  log_level
  <chr>            <chr>     <chr>     <dttm>              <chr> <lgl> <chr>    
1 ./pipelines/my_… my_pipe   1 day     2024-09-23 00:00:00 UTC   FALSE INFO     
2 ./pipelines/my_… my_pipe2  1 week    2024-09-23 00:00:00 UTC   FALSE INFO     
3 ./pipelines/my_… my_pipe3  2 hours   2024-09-23 00:00:00 UTC   FALSE INFO     
# ℹ 2 more variables: frequency_n <int>, frequency_unit <chr>

Note that we called get_status() and get_schedule() on schedule itself rather than on the value returned by run_schedule(). Because schedule is an R6 object, run_schedule() updates it in place to reflect the execution of the schedule.
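For anyone new to R6, this in-place behaviour comes from reference semantics: an R6 object passed to a function is not copied, so changes made inside the function are visible to the caller. Here is a minimal sketch using a toy class. ToySchedule and run_toy() are hypothetical names made up for this illustration and are not how maestro implements <MaestroSchedule>.

```r
library(R6)

# ToySchedule is a hypothetical stand-in, not maestro's actual class
ToySchedule <- R6Class(
  "ToySchedule",
  public = list(
    invoked = FALSE,
    run = function() {
      # Modifies the object's own field; no copy is made
      self$invoked <- TRUE
      invisible(self)
    }
  )
)

# Calling a method changes the object even though the result is discarded
toy <- ToySchedule$new()
toy$run()
toy$invoked

# A plain function receives the same object, not a copy
run_toy <- function(s) s$run()
toy2 <- ToySchedule$new()
run_toy(toy2)
toy2$invoked
```

Both toy$invoked and toy2$invoked are TRUE afterwards, even though nothing was reassigned.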

If any pipelines have return values, we can access those as a named list using get_artifacts():

get_artifacts(schedule)
named list()
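The list is empty here, presumably because none of the dummy pipelines returns anything. As a rough sketch of what a pipeline with a return value could look like, the function below computes a small summary and returns it. The body is hypothetical, and the tag values are assumptions chosen to match the schedule table above; calling the function directly simply shows the value that would be expected to end up as the artifact.

```r
# Sketch of a pipeline script such as ./pipelines/my_pipe.R.
# The function body is hypothetical; the tag values are assumed for the demo.

#' my_pipe maestro pipeline
#'
#' @maestroFrequency 1 day
#' @maestroStartTime 2024-09-23 00:00:00
my_pipe <- function() {
  # Hypothetical work: summarise a built-in dataset
  avg_mpg <- mean(datasets::mtcars$mpg)

  # The function's return value is what would be stored as the artifact
  list(avg_mpg = avg_mpg, n_rows = nrow(datasets::mtcars))
}

# Calling the function by hand, outside of any schedule
result <- my_pipe()
str(result)
```

After a run in which this pipeline executes, I would expect get_artifacts(schedule)$my_pipe to hold the same list, given that the artifacts are named by pipeline.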

Invoking a pipeline

When interactively working in a maestro project, it may sometimes be useful to manually trigger a pipeline. Now, if you’ve created a <MaestroSchedule> object in the environment, you can do this using invoke(). This will execute the pipeline regardless of whether it’s scheduled or not.

invoke(schedule, pipe_name = "my_pipe")
ℹ my_pipe
✔ my_pipe [6ms]

Check out the release notes for more details on what’s new in version 0.3.0. If you find any bugs or want to suggest new features and improvements, please add them here or reach out to me on LinkedIn.

Happy orchestrating!