Hourly Pipeline

This pipeline is an example of a standard extract, transform, load (ETL) workflow. The pipeline is scheduled to run every 3 hours, starting on 2024-04-25 at 05:45:00 (America/Halifax time). The pipeline performs the following steps:

  • access an online hosted CSV file
  • perform light data wrangling
  • write the result to local storage in Parquet format

This example is set up as a simple sequence of tasks, each creating an object that is used by the next. All components of the pipeline are within the pipeline_wildfire_hourly function, which has no parameters.

#' pipeline_wildfire_hourly maestro pipeline
#'
#' @maestroFrequency 3 hour
#' @maestroStartTime 2024-04-25 05:45:00
#' @maestroTz America/Halifax


pipeline_wildfire_hourly <- function() {
  
  # load libraries
  library(dplyr)
  library(readr)
  library(sf)
  library(sfarrow)

  # Access active wildfire data from hosted csv
  df <- readr::read_csv("https://cwfis.cfs.nrcan.gc.ca/downloads/activefires/activefires.csv")
  
  # Data wrangling
  df_geom <- df |>
    dplyr::mutate(insert_datetime = Sys.time()) |>
    sf::st_as_sf(coords = c("lon", "lat"), crs = 4326)
  
  
  # Write active wildfires to file
  basename <- paste("cdn_wildfire", as.integer(Sys.time()), sep = "_")
  
  df_geom |>
    sfarrow::write_sf_dataset("~/data/wildfires",
                              format = "parquet",
                              basename_template = paste0(basename,
                                                         "-{i}.parquet"))
}
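
To make the schedule tags concrete, the sketch below computes the first few run times implied by a 3 hour frequency and the start time above. It uses only base R and illustrates the arithmetic; it is not how maestro itself evaluates the schedule, and the variable names are chosen here for illustration.

```r
# First scheduled run, taken from @maestroStartTime and @maestroTz
start_time <- as.POSIXct("2024-04-25 05:45:00", tz = "America/Halifax")

# The first four run times at a 3 hour frequency (illustrative only)
run_times <- seq(start_time, by = "3 hours", length.out = 4)

format(run_times, "%Y-%m-%d %H:%M")
```

Under these assumptions, the runs after the first fall at 08:45, 11:45, and 14:45 local time on the same day.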

Daily Pipeline

This pipeline is an example of a standard extract, transform, load (ETL) workflow. The pipeline is scheduled to run once a day, starting on 2024-04-25 at 06:30:00 (America/Halifax time). The pipeline performs the following steps:

  • submit a request to an API
  • extract data from the API response
  • add an insert datetime column
  • write the result to local storage in Parquet format

This example defines a custom function that accesses and extracts the data from the API; its output is then piped into the remaining tasks. All components of the pipeline are within the pipeline_climate_daily function, which has no parameters.

#' pipeline_climate_daily maestro pipeline
#'
#' @maestroFrequency 1 day
#' @maestroStartTime 2024-04-25 06:30:00
#' @maestroTz America/Halifax

pipeline_climate_daily <- function() {
  
  # load libraries
  library(dplyr)
  library(httr2)
  library(arrow)
  
  # Custom function for accessing api climate data
  get_hourly_climate_info <- function(station_id, request_limit = 24) {

    # Validate parameters (length first, so `&&` only ever sees a single value)
    stopifnot("`station_id` must be a length-one vector" = length(station_id) == 1)
    stopifnot("`station_id` must be a real number" = is.numeric(station_id) && station_id > 0)

    # Build the request for hourly climate data from the GeoMet API
    hourly_req <- httr2::request("https://api.weather.gc.ca/collections/climate-hourly/items") |>
      httr2::req_url_query(
        lang = "en-CA",
        offset = 0,
        CLIMATE_IDENTIFIER = station_id,
        LOCAL_DATE = Sys.Date() - 1,
        limit = request_limit
      )

    # Perform the request
    hourly_resp <- hourly_req |>
      httr2::req_perform()

    # Parse the JSON response; simplifyVector = TRUE returns data frames where possible
    geomet_json <- hourly_resp |>
      httr2::resp_body_json(simplifyVector = TRUE)

    # Return the station records
    geomet_json$features
  }
  
  # Write climate hourly to file
  basename <- paste("climate_hourly", as.integer(Sys.time()), sep = "_")

  get_hourly_climate_info(8202251) |>
    dplyr::mutate(insert_datetime = Sys.time()) |>
    arrow::write_dataset(
      "~/data/climate",
      format = "parquet",
      # arrow requires "{i}" in the template; it is replaced by a file counter
      basename_template = paste0(basename, "-{i}.parquet")
    )
}
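
The parameter checks in get_hourly_climate_info can be exercised without calling the API. The following is a minimal sketch in base R, assuming two hypothetical helpers that are not part of the pipeline: validate_station_id repeats the same two checks (length first), and check returns the error message instead of stopping the session.

```r
# Hypothetical helper repeating the checks from get_hourly_climate_info()
validate_station_id <- function(station_id) {
  stopifnot("`station_id` must be a length-one vector" = length(station_id) == 1)
  stopifnot("`station_id` must be a real number" = is.numeric(station_id) && station_id > 0)
  invisible(station_id)
}

# Hypothetical wrapper: return "ok" or the error message as a string
check <- function(x) {
  tryCatch(
    {
      validate_station_id(x)
      "ok"
    },
    error = function(e) conditionMessage(e)
  )
}

check(8202251)    # valid: numeric, positive, length one
check("8202251")  # invalid: character, not numeric
check(c(1, 2))    # invalid: more than one value
```

Named arguments to stopifnot supply the error message, so a failed check reports which rule was broken rather than the raw expression.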