I’m thrilled to introduce the biggest maestro update yet: DAGs! A DAG (directed acyclic graph), in the context of data pipelines, is a series of data processing steps that feed into other steps until a final step is reached. Almost all data orchestration platforms use DAGs to increase the reusability and isolation of discrete components. As of maestro 0.4.0, DAGs are now possible using the maestroInputs and maestroOutputs tags. This post goes through the motivation and implementation of this new feature.
If you haven’t heard of maestro, it’s a package that helps you schedule your R scripts all in a single project using tags. You can learn more about it here.
Get it from CRAN:
install.packages("maestro")
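If tags are new to you, a basic (non-DAG) maestro pipeline is just an R function decorated with roxygen-style tags. Here’s a minimal sketch (the file path and function name are illustrative):

#' /pipelines/my_pipe.R

#' @maestroFrequency daily
#' @maestroStartTime 2024-11-22 09:00:00
my_pipe <- function() {
  # Runs daily starting at the given time once scheduled
  message("Hello from my_pipe")
}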
Why DAGs?
Let’s imagine we have a data pipeline where we want to extract data, clean/transform it, train a model, and send the predictions to a database. We can take each of these steps and chain them together so that the output of ‘extract’ is automatically fed into ‘clean/transform’, and so on.
The advantage of doing this in maestro is that you get better observability and traceability at each step. As we’ll see, we can more clearly identify where errors occur and even recover intermediate results.
DAGs in maestro
In short, a DAG pipeline is created using either the maestroInputs or maestroOutputs tag. Both are valid, but usually only one is needed. Simply put, a pipeline with the tag #' @maestroInputs start_pipe receives the output of a pipeline called start_pipe. Alternatively, we could use #' @maestroOutputs end_pipe to indicate that a pipeline called end_pipe receives the output of the current pipeline.
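Here’s a minimal sketch of that wiring using the maestroOutputs variant (the pipeline bodies are illustrative placeholders):

#' @maestroFrequency daily
#' @maestroStartTime 2024-11-22 09:00:00
#' @maestroOutputs end_pipe
start_pipe <- function() {
  mtcars # some data to pass downstream
}

#' @maestro
end_pipe <- function(.input) {
  head(.input) # .input receives start_pipe's output
}

Tagging end_pipe with #' @maestroInputs start_pipe (and dropping @maestroOutputs from start_pipe) would define the same DAG.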
Let’s see an example where we make model predictions on the nycflights13 data.
#' /pipelines/model_flights.R

#' @maestroFrequency daily
#' @maestroStartTime 2024-11-22 09:00:00
#' @maestroOutputs process_flights
extract_flights <- function() {
  # Imagine this is from a source where the data changes
  nycflights13::flights
}

#' @maestroOutputs train_model
process_flights <- function(.input) {
  daily_flights <- .input |>
    dplyr::mutate(date = lubridate::make_date(year, month, day)) |>
    dplyr::summarise(
      n_flights = dplyr::n(),
      .by = date
    )

  # A simple time series
  ts(data = daily_flights$n_flights, frequency = 365)
}

#' @maestroOutputs forecast_flights
train_model <- function(.input) {
  # A simple ARIMA model (using the {forecast} package would be better)
  .input |>
    arima(order = c(1, 1, 1))
}

#' @maestro
forecast_flights <- function(.input) {
  # Forecast the next 7 days
  pred_obj <- predict(.input, n.ahead = 7)
  pred_obj$pred
}
We won’t focus much on the content of the functions. Instead, pay attention to the use of maestroOutputs: each function that outputs into another references the name of that function. The last function, forecast_flights, just uses the generic #' @maestro tag to indicate that it is part of the maestro project. Also note the special .input argument, which must be declared by every function that receives an input. This argument captures the data passed along each step of the pipeline.
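Conceptually, the DAG above behaves like a chain of function calls, with each return value passed to the next function’s .input. This is just an illustration of the data flow, not maestro’s literal internals:

# Not real maestro code -- an illustration of how .input flows
res_extract <- extract_flights()
res_process <- process_flights(.input = res_extract)
res_model   <- train_model(.input = res_process)
predictions <- forecast_flights(.input = res_model)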
Now we can build the schedule like always.
# /orchestrator.R
library(maestro)

schedule <- build_schedule(quiet = TRUE)
We can verify that the DAG is properly defined using the show_network() function on our newly created schedule.
show_network(schedule)
Now we can run the schedule. For testing purposes, we’ll set run_all = TRUE so that all pipelines run regardless of their schedules.
run_schedule(
  schedule,
  run_all = TRUE
)
── [2024-11-22 14:25:11] Running pipelines ▶
ℹ extract_flights
✔ extract_flights [768ms]
ℹ |-process_flights
✔ |-process_flights [24ms]
ℹ |-train_model
✔ |-train_model [9ms]
ℹ |- |-forecast_flights
✔ |- |-forecast_flights [5ms]
── [2024-11-22 14:25:11] Pipeline execution completed ■ | 0.833 sec elapsed
✔ 4 successes | → 0 skipped | ! 0 warnings | ✖ 0 errors | ◼ 4 total
────────────────────────────────────────────────────────────────────────────────
── Maestro Schedule with 4 pipelines:
• Success
We can see from the console output that the whole pipeline ran successfully. If we want the output from each of the steps, we can use get_artifacts(). This returns intermediate results too, which can be helpful if you want to retrieve state after a failed run of the schedule.
artifacts <- get_artifacts(schedule)
artifacts$forecast_flights
Time Series:
Start = c(2, 1)
End = c(2, 7)
Frequency = 365
[1] 942.7232 934.9722 933.7773 933.5931 933.5647 933.5603 933.5596
Maestro can be used to create any valid DAG (e.g., branching, joining, etc.), as sketched below. I hope this new addition to maestro supercharges your data orchestration.
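As a sketch of what branching might look like (the function names here are illustrative, not from the release), two pipelines can each declare the same upstream pipeline in @maestroInputs so that both receive its output:

#' @maestroFrequency daily
#' @maestroStartTime 2024-11-22 09:00:00
extract_data <- function() {
  nycflights13::flights
}

#' @maestroInputs extract_data
summarise_carriers <- function(.input) {
  dplyr::count(.input, carrier) # flights per carrier
}

#' @maestroInputs extract_data
summarise_origins <- function(.input) {
  dplyr::count(.input, origin) # flights per origin airport
}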
Check out the release notes for more details on what’s new in version 0.4.0. If you find any bugs or want to suggest new features and improvements, please add them here or reach out to me on LinkedIn.
Happy orchestrating!