install.packages("maestro")
maestro
0.3.0 introduces principled methods for obtaining a schedule table, pipeline status, and artifacts (i.e., return values). This is now possible because of a major refactoring of the backend to use R6 classes for pipelines and schedules. This also introduced a few breaking changes, but on the whole, not a lot is different.
If you haven’t heard of maestro, it’s a package that helps you schedule your R scripts all in a single project using tags. You can learn more about it here.
Get it from CRAN:
A schedule is now a <MaestroSchedule>
Schedules are now a special class with attributes and methods. For most users, this won’t change how they use maestro, but it’ll make it much easier to extend maestro with new features in the future. Let’s see how a typical maestro scenario plays out now:
library(maestro)
# Create a few dummy pipelines for demo purposes
create_pipeline(
"my_pipe", open = FALSE, quiet = TRUE, overwrite = TRUE
)create_pipeline(
"my_pipe2", frequency = "1 week", open = FALSE, quiet = TRUE, overwrite = TRUE
)create_pipeline(
"my_pipe3", frequency = "2 hours", open = FALSE, quiet = TRUE, overwrite = TRUE
)
<- build_schedule(quiet = TRUE)
schedule
<- run_schedule(
output
schedule,orch_frequency = "hourly"
)
── [2024-09-23 14:31:34]
Running pipelines ▶
ℹ my_pipe3
✔ my_pipe3 [8ms]
── [2024-09-23 14:31:34]
Pipeline execution completed ■ | 0.101 sec elapsed
✔ 1 success | → 2 skipped | ! 0 warnings | ✖ 0 errors | ◼ 3 total
────────────────────────────────────────────────────────────────────────────────
── Next scheduled pipelines ❯
Pipe name | Next scheduled run
• my_pipe3 | 2024-09-23 20:00:00
• my_pipe | 2024-09-24
• my_pipe2 | 2024-09-30
So far these are the same steps as before. The difference is now in how we interact with the schedule. Here, schedule
is no longer a data.frame, it’s a <MaestroSchedule>
R6 object. If we want to get the status of each of the pipelines, we can use the new get_status()
function:
get_status(schedule)
# A tibble: 3 × 10
pipe_name script_path invoked success pipeline_started pipeline_ended
<chr> <chr> <lgl> <lgl> <dttm> <dttm>
1 my_pipe ./pipelines… FALSE FALSE NA NA
2 my_pipe2 ./pipelines… FALSE FALSE NA NA
3 my_pipe3 ./pipelines… TRUE TRUE 2024-09-23 17:31:34 2024-09-23 17:31:34
# ℹ 4 more variables: errors <int>, warnings <int>, messages <int>,
# next_run <dttm>
If we just want the schedule, we can use get_schedule()
:
get_schedule(schedule)
# A tibble: 3 × 9
script_path pipe_name frequency start_time tz skip log_level
<chr> <chr> <chr> <dttm> <chr> <lgl> <chr>
1 ./pipelines/my_… my_pipe 1 day 2024-09-23 00:00:00 UTC FALSE INFO
2 ./pipelines/my_… my_pipe2 1 week 2024-09-23 00:00:00 UTC FALSE INFO
3 ./pipelines/my_… my_pipe3 2 hours 2024-09-23 00:00:00 UTC FALSE INFO
# ℹ 2 more variables: frequency_n <int>, frequency_unit <chr>
Note that we didn’t have to assign a new object when we ran run_schedule()
. The object schedule
is updated to reflect the execution of the schedule.
If any pipelines have return values, we can access those as a named list using get_artifacts()
:
get_artifacts(schedule)
named list()
Invoking a pipeline
When interactively working in a maestro project, it may sometimes be useful to manually trigger a pipeline. Now, if you’ve created a <MaestroSchedule>
object in the environment, you can do this using invoke()
. This will execute the pipeline regardless of whether it’s scheduled or not.
invoke(schedule, pipe_name = "my_pipe")
ℹ my_pipe
✔ my_pipe [6ms]
Check out the release notes for more details on what’s new in version 0.3.0. If you find any bugs or want to suggest new features and improvements, please add them here or reach out to me on LinkedIn.
Happy orchestrating!