| Title: | Orchestration of Data Pipelines |
|---|---|
| Description: | Framework for creating and orchestrating data pipelines. Organize, orchestrate, and monitor multiple pipelines in a single project. Use tags to decorate functions with scheduling parameters and configuration. |
| Authors: | Will Hipson [cre, aut, cph] (ORCID: <https://orcid.org/0000-0002-3931-2189>), Ryan Garnett [aut, ctb, cph] |
| Maintainer: | Will Hipson <[email protected]> |
| License: | MIT + file LICENSE |
| Version: | 1.1.1 |
| Built: | 2026-05-20 19:23:30 UTC |
| Source: | https://github.com/whipson/maestro |
Builds a MaestroSchedule object for use in run_schedule().
build_schedule(pipeline_dir = "./pipelines", quiet = FALSE)
pipeline_dir |
path to directory containing the pipeline scripts |
quiet |
silence metrics to the console (default = |
This function parses the maestro tags of functions located in pipeline_dir which is
conventionally called 'pipelines'. An orchestrator requires a MaestroSchedule
to determine which pipelines are to run and when. Each pipeline in the schedule
is a parsed function and its scheduling parameters such as its frequency.
The MaestroSchedule is mostly intended to be passed directly to run_schedule().
In other words, it is not recommended to make changes to it.
MaestroSchedule
# Creating a temporary directory for demo purposes! In practice, just
# create a 'pipelines' directory at the project level.
if (interactive()) {
  pipeline_dir <- tempdir()
  create_pipeline("my_new_pipeline", pipeline_dir, open = FALSE)
  build_schedule(pipeline_dir = pipeline_dir)
}
Creates a new maestro project
create_maestro(path, type = "R", overwrite = FALSE, quiet = FALSE, ...)
path |
file path for the orchestrator script |
type |
file type for the orchestrator (supports R, Quarto, and RMarkdown) |
overwrite |
whether to overwrite an existing orchestrator or maestro project |
quiet |
whether to silence messages in the console (default = |
... |
unused |
invisible
# Creates a new maestro project with an R orchestrator
if (interactive()) {
  new_proj_dir <- tempdir()
  create_maestro(new_proj_dir, type = "R", overwrite = TRUE)
  create_maestro(new_proj_dir, type = "Quarto", overwrite = TRUE)
}
Create a new orchestrator
create_orchestrator(
  path,
  type = c("R", "Quarto", "RMarkdown"),
  open = interactive(),
  quiet = FALSE,
  overwrite = FALSE
)
path |
file path for the orchestrator script |
type |
file type for the orchestrator (supports R, Quarto, and RMarkdown) |
open |
whether or not to open the script upon creation |
quiet |
whether to silence messages in the console (default = |
overwrite |
whether to overwrite an existing orchestrator or maestro project |
invisible
Allows the creation of new pipelines (R scripts) and fills in the maestro tags as specified.
create_pipeline(
  pipe_name,
  pipeline_dir = "pipelines",
  frequency = "1 day",
  start_time = Sys.Date(),
  tz = "UTC",
  log_level = "INFO",
  quiet = FALSE,
  open = interactive(),
  overwrite = FALSE,
  skip = FALSE,
  inputs = NULL,
  outputs = NULL,
  priority = NULL
)
pipe_name |
name of the pipeline and function |
pipeline_dir |
directory containing the pipeline scripts |
frequency |
how often the pipeline should run (e.g., 1 day, daily, 3 hours, 4 months). Fills in maestroFrequency tag |
start_time |
start time of the pipeline schedule. Fills in maestroStartTime tag |
tz |
timezone that pipeline will be scheduled in. Fills in maestroTz tag |
log_level |
log level for the pipeline (e.g., INFO, WARN, ERROR). Fills in maestroLogLevel tag |
quiet |
whether to silence messages in the console (default = |
open |
whether or not to open the script upon creation |
overwrite |
whether or not to overwrite an existing pipeline of the same name and location. |
skip |
whether to skip the pipeline when running in the orchestrator (default = |
inputs |
vector of names of pipelines that input into this pipeline (default = |
outputs |
vector of names of pipelines that receive output from this pipeline (default = |
priority |
a single positive integer corresponding to the order in which this pipeline will be invoked in the presence of other simultaneously invoked pipelines. |
invisible
if (interactive()) {
  pipeline_dir <- tempdir()
  create_pipeline(
    "extract_data",
    pipeline_dir = pipeline_dir,
    frequency = "1 hour",
    open = FALSE,
    quiet = TRUE,
    overwrite = TRUE
  )
  create_pipeline(
    "new_job",
    pipeline_dir = pipeline_dir,
    frequency = "20 minutes",
    start_time = as.POSIXct("2024-06-21 12:20:00"),
    log_level = "ERROR",
    open = FALSE,
    quiet = TRUE,
    overwrite = TRUE
  )
}
Artifacts are return values from pipelines. They are accessible as a named list where the names correspond to the names of the pipeline.
get_artifacts(schedule)
schedule |
object of type MaestroSchedule created using |
named list
if (interactive()) {
  pipeline_dir <- tempdir()
  create_pipeline("my_new_pipeline", pipeline_dir, open = FALSE)
  schedule <- build_schedule(pipeline_dir = pipeline_dir)
  schedule <- run_schedule(
    schedule,
    orch_frequency = "1 day",
    quiet = TRUE
  )
  get_artifacts(schedule)
  # Alternatively, use the underlying R6 method
  schedule$get_artifacts()
}
Creates a long data.frame where each row is a flag for each pipeline.
get_flags(schedule)
schedule |
object of type MaestroSchedule created using |
data.frame
if (interactive()) {
  pipeline_dir <- tempdir()
  create_pipeline("my_new_pipeline", pipeline_dir, open = FALSE)
  schedule <- build_schedule(pipeline_dir = pipeline_dir)
  get_flags(schedule)
  # Alternatively, use the underlying R6 method
  schedule$get_flags()
}
Returns the pipeline dependency structure as an edge list data.frame. Each row represents a directed dependency between two pipelines. The result will be empty if there are no DAG pipelines in the schedule.
get_network(schedule)
schedule |
object of type MaestroSchedule created using |
data.frame with columns from and to
See also show_network(), which is deprecated in favour of this function.
if (interactive()) {
  pipeline_dir <- tempdir()
  create_pipeline("my_new_pipeline", pipeline_dir, open = FALSE)
  schedule <- build_schedule(pipeline_dir = pipeline_dir)
  get_network(schedule)
  # Alternatively, use the underlying R6 method
  schedule$get_network()
}
Retrieves the scheduled run times for a given schedule, with optional filtering by number of runs and datetime range.
get_run_sequence(
  schedule,
  n = NULL,
  min_datetime = NULL,
  max_datetime = NULL,
  include_only_primary = FALSE,
  include_skipped = TRUE
)
schedule |
object of type MaestroSchedule created using |
n |
Optional positive integer. If specified, returns only the first |
min_datetime |
Optional minimum datetime filter. Can be a |
max_datetime |
Optional maximum datetime filter. Can be a |
include_only_primary |
only primary pipelines are included (these are pipelines that are scheduled and not downstream nodes in a DAG) |
include_skipped |
whether to include pipelines tagged with |
A data.frame of scheduled run times with columns pipe_name, scheduled_time,
and is_primary.
if (interactive()) {
  pipeline_dir <- tempdir()
  create_pipeline("my_new_pipeline", pipeline_dir, open = FALSE)
  schedule <- build_schedule(pipeline_dir = pipeline_dir)
  get_run_sequence(schedule)
  # Alternatively, use the underlying R6 method
  schedule$get_run_sequence()
}
A schedule is represented as a table where each row is a pipeline and the columns contain scheduling parameters such as the frequency and start time.
get_schedule(schedule)
schedule |
object of type MaestroSchedule created using |
The schedule table is used internally in a MaestroSchedule object but can be accessed using this function or accessing the R6 method of the MaestroSchedule object.
data.frame
if (interactive()) {
  pipeline_dir <- tempdir()
  create_pipeline("my_new_pipeline", pipeline_dir, open = FALSE)
  schedule <- build_schedule(pipeline_dir = pipeline_dir)
  get_schedule(schedule)
  # Alternatively, use the underlying R6 method
  schedule$get_schedule()
}
Get the number of pipelines scheduled to run for each time slot at a particular slot interval. Time slots are times that the orchestrator runs and the slot interval determines the level of granularity to consider.
get_slot_usage(schedule, orch_frequency, slot_interval = "hour")
schedule |
object of type MaestroSchedule created using |
orch_frequency |
frequency of the orchestrator, a single string formatted like "1 day", "2 weeks", "hourly", etc. |
slot_interval |
a time unit indicating the interval of time to consider between slots (e.g., 'hour', 'day') |
This function is particularly useful when you have multiple pipelines in a project and you want to see what recurring time intervals may be available or underused for new pipelines.
Note that this function is intended for use in an interactive session while developing a maestro project. It is not intended for use in the orchestrator.
As an example, consider we have four pipelines running at various frequencies
and the orchestrator running every hour. Then let's say there's to be a new
pipeline that runs every day. One might ask 'what hour should I schedule this new
pipeline to run on?'. By using get_slot_usage(schedule, orch_frequency = '1 hour', slot_interval = 'hour')
on the existing schedule, you could identify for each hour how many pipelines
are already scheduled to run and choose the ones with the lowest usage.
data.frame
if (interactive()) {
  pipeline_dir <- tempdir()
  create_pipeline("my_new_pipeline", pipeline_dir, open = FALSE)
  schedule <- build_schedule(pipeline_dir = pipeline_dir)
  get_slot_usage(
    schedule,
    orch_frequency = "1 hour",
    slot_interval = "hour"
  )
}
A status data.frame contains the names and locations of the pipelines as well as information around whether they were invoked, the status (error, warning, etc.), and the run time.
get_status(schedule)
schedule |
object of type MaestroSchedule created using |
data.frame
if (interactive()) {
  pipeline_dir <- tempdir()
  create_pipeline("my_new_pipeline", pipeline_dir, open = FALSE)
  schedule <- build_schedule(pipeline_dir = pipeline_dir)
  schedule <- run_schedule(
    schedule,
    orch_frequency = "1 day",
    quiet = TRUE
  )
  get_status(schedule)
  # Alternatively, use the underlying R6 method
  schedule$get_status()
}
Instantly run a single pipeline from the schedule. This is useful for testing purposes or if you want to just run something one-off.
invoke(
  schedule,
  pipe_name,
  resources = list(),
  ...,
  quiet = TRUE,
  log_to_console = FALSE
)
schedule |
object of type MaestroSchedule created using |
pipe_name |
name of a single pipeline from the schedule |
resources |
named list of shared resources made available to pipelines as needed |
... |
other arguments passed to |
quiet |
silence metrics to the console (default = |
log_to_console |
whether or not to include pipeline messages, warnings, errors to the console (default = |
Scheduling parameters such as the frequency, start time, and specifiers are ignored.
The pipeline will be run even if maestroSkip is present. If the pipeline is a DAG
pipeline, invoke will attempt to execute the full DAG.
invisible
if (interactive()) {
  pipeline_dir <- tempdir()
  create_pipeline("my_new_pipeline", pipeline_dir, open = FALSE)
  schedule <- build_schedule(pipeline_dir = pipeline_dir)
  invoke(schedule, "my_new_pipeline")
}
Gets the latest schedule build errors following use of build_schedule(). If
the build succeeded or build_schedule() has not been run it will be NULL.
last_build_errors()
error messages
last_build_errors()
Gets the latest pipeline errors following use of run_schedule(). If
all runs succeeded or run_schedule() has not been run it will be NULL.
last_run_errors()
error messages
last_run_errors()
Gets the latest pipeline messages following use of run_schedule(). If
there are no messages or run_schedule() has not been run it will be NULL.
last_run_messages()
Note that setting maestroLogLevel to something greater than INFO will
ignore messages.
messages
last_run_messages()
Gets the latest pipeline warnings following use of run_schedule(). If
there are no warnings or run_schedule() has not been run it will be NULL.
last_run_warnings()
Note that setting maestroLogLevel to something greater than WARN will
ignore warnings.
warning messages
last_run_warnings()
maestro tags are roxygen2 comments for configuring the scheduling and execution of pipelines.
maestro tags follow the format: #' @maestroTagName value
Some tags may not take a value.
maestro tags must be written above the function that is to be included as a pipeline. A typical pipeline with tags could look like this:
#' @maestroFrequency 1 hour
#' @maestroStartTime 12:30:00
#' @maestroLogLevel WARN
my_pipeline <- function() {
# Pipeline code
}
Below are descriptions of all the tags currently available in maestro along with examples.
How often to run the pipeline. This tag takes a time unit indicating how long to wait between subsequent runs of the pipeline. Acceptable values include an integer value followed by one of minute(s), hour(s), day(s), week(s), month(s), and year(s). Note that some combinations of integer + unit may be invalid. Adverbs like 'hourly', 'daily', 'weekly', etc. are also valid.
Default: 1 day
Examples:
#' @maestroFrequency 1 hour
#' @maestroFrequency daily
#' @maestroFrequency 2 weeks
#' @maestroFrequency 3 months
Timestamp, date, or time corresponding to the start time of the pipeline. This
also sets the cadence of the pipeline in some cases. For instance, if the start time
is 2025-03-18 03:00:00 and the frequency is daily, the pipeline will run at 03:00
every day. A value in the future prevents the pipeline from running until that time
has been reached.
For pipelines with a frequency lower than daily, partial anchor formats are supported to make it easier to express natural cycle points without choosing a specific date:
HH:MM:SS — time-of-day anchor for minute, hour, or day frequencies.
e.g., 08:00:00 runs every day at 8am.
Mon HH:MM:SS (weekday abbreviation + time) — anchor for week or multi-week
frequencies. Resolved to that weekday of the current ISO week.
e.g., Mon 04:00:00 with @maestroFrequency 1 week runs every Monday at 4am.
Valid abbreviations: Mon, Tue, Wed, Thu, Fri, Sat, Sun.
DD HH:MM:SS or DD (month-day + optional time) — anchor for month
frequencies. Resolved to that day of the current month.
e.g., 15 04:00:00 with @maestroFrequency 1 month runs on the 15th of every month at 4am.
Default: 2024-01-01 00:00:00
Examples:
#' @maestroStartTime 2025-02-05 12:00:00
#' @maestroStartTime 2025-01-01
#' @maestroStartTime 08:00:00
#' @maestroStartTime Mon 04:00:00
#' @maestroStartTime Wed 09:30:00
#' @maestroStartTime 15 04:00:00
#' @maestroStartTime 1
Timezone in which the maestroStartTime is to be considered. Takes any valid timezone
string that can be found in OlsonNames().
Default: UTC
Examples:
#' @maestroTz Europe/Paris
#' @maestroTz America/Halifax
#' @maestroTz US/Pacific
Minimum logging threshold for messages, warnings, and errors that come from the pipeline.
These levels correspond to those in logger:::log_levels_supported but most commonly
used are ERROR, WARN, INFO. For example, if you use WARN then any messages of lower
urgency (i.e., INFO) will be suppressed, but errors will be logged.
Default: INFO
Examples:
#' @maestroLogLevel ERROR
#' @maestroLogLevel WARN
Specific hours of the day in which to run the pipeline. This only applies for pipelines that run
at an hourly or minutely frequency. Acceptable values are integers from 0-23 separated
by spaces. If empty, pipeline runs at all hours. This tag uses the timezone specified by
maestroTz (will be UTC if empty).
Default:
Examples:
#' @maestroHours 1 4 7 10
#' @maestroHours 0 5 20
Specific days of week or days of month on which to run the pipeline. This only applies for pipelines that run at a minutely, hourly, or daily frequency. Acceptable values are either integers from 1-31 or day of week strings like Mon, Tue, Wed, etc. These two options cannot be used in combination.
Default:
Examples:
#' @maestroDays 1 11 21 31
#' @maestroDays Mon Tue Wed Thu Fri
Specific months of the year on which to run the pipeline. This only applies to pipelines that run at least monthly. Acceptable values are integers (1-12) corresponding to the month of the year (e.g., 1 = January, 2 = February, etc.).
Default:
Examples:
#' @maestroMonths 3 8 12
#' @maestroMonths 10
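The specifier tags above restrict when an otherwise frequent pipeline is allowed to run. As a minimal sketch, assuming that maestroHours and maestroDays can be combined on a single hourly pipeline, the function below would be eligible only at 09:00, 12:00, and 15:00 on weekdays, interpreted in the timezone given by maestroTz. The function name and body are hypothetical placeholders.

```r
# Hypothetical pipeline script; only the tags come from this manual.

#' @maestroFrequency 1 hour
#' @maestroHours 9 12 15
#' @maestroDays Mon Tue Wed Thu Fri
#' @maestroTz America/Halifax
weekday_check <- function() {
  # Placeholder body: a real pipeline would do its work here
  "weekday check complete"
}
```

Because the tags are ordinary roxygen2-style comments, the function can still be sourced and called directly while developing it.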
For a DAG style pipeline, the names of pipelines that input into the pipeline.
These names must match the function name defining the inputting pipeline. Multiple
pipelines can be used as inputs and the input value is used in the target pipeline
via the required .input parameter. Note that this tag could be redundant if the
inputting pipeline uses maestroOutputs.
Default:
Examples:
#' @maestroInputs extract verify
For a DAG style pipeline, the names of pipelines that receive the return value of
this pipeline as input. These names must match the function names defining the
receiving pipelines. A pipeline can output into multiple pipelines. The return value
of the pipeline will be passed into each receiving pipeline. Note that this tag
could be redundant if the receiving pipeline uses maestroInputs.
Default:
Examples:
#' @maestroOutputs transform
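The two DAG tags describe the same relationship from opposite ends. The sketch below shows one way to declare it, using maestroInputs on the downstream pipeline only; the function names and the data are hypothetical. The downstream function takes the required .input parameter, which receives the upstream return value.

```r
# Hypothetical pipeline script (for example, pipelines/etl.R).

#' @maestroFrequency 1 day
extract <- function() {
  # The return value is what gets handed to downstream pipelines
  data.frame(id = 1:3, value = c(10, 20, 30))
}

#' @maestroInputs extract
transform <- function(.input) {
  # .input holds the return value of the upstream pipeline
  .input$value_doubled <- .input$value * 2
  .input
}

# Outside of maestro, the same hand-off can be checked by hand
result <- transform(extract())
```

Declaring `#' @maestroOutputs transform` on extract instead would express the same edge, which is why the text notes that using both tags can be redundant.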
Flags a pipeline to never be executed even if it is scheduled to run. This can
be useful when developing or testing a pipeline. This tag takes no value; instead,
the presence of the tag indicates that the pipeline is skipped. This tag is ignored when
calling run_schedule(..., run_all = TRUE) or when using invoke().
Default:
Examples:
#' @maestroSkip
Determines the order in which pipelines that run at the same scheduled instance
are executed. Values are positive integers from 1 to N. Pipelines run in ascending
order of this value, so 1 indicates the highest priority. Pipelines with the
same priority run in the order in which build_schedule() parses the pipeline
(usually alphabetical according to file name and then line number within file).
By default, all pipelines are given equal priority.
Default:
Examples:
#' @maestroPriority 1
#' @maestroPriority 3
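As an illustration, the sketch below tags two hypothetical pipelines that share the same hourly schedule. Under the ordering described above, the pipeline tagged with priority 1 would be invoked before the one tagged with priority 2 whenever both fall in the same scheduled instance. The function names and bodies are placeholders.

```r
# Hypothetical pipeline script with two pipelines on the same schedule.

#' @maestroFrequency 1 hour
#' @maestroPriority 1
refresh_reference_data <- function() {
  # Intended to run first among pipelines scheduled at the same time
  "reference data refreshed"
}

#' @maestroFrequency 1 hour
#' @maestroPriority 2
build_report <- function() {
  # Intended to run after refresh_reference_data()
  "report built"
}
```

Priority only orders invocation; if one pipeline needs the return value of another, the maestroInputs and maestroOutputs tags are the documented way to pass it along.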
Arbitrary labels which are then made accessible via get_flags(). A pipeline
can have multiple flags separated by spaces.
Default:
Examples:
#' @maestroFlags critical etl cloud
#' @maestroFlags aviation
Generic tag for identifying a maestro pipeline with all the defaults. Useful when you want a pipeline to be scheduled via maestro that accepts all default tag values. Only use this tag if you have no other maestro tags. The tag takes no value.
Default:
Examples:
#' @maestro
Class for a schedule of pipelines
PipelineList |
object of type MaestroPipelineList |
MaestroSchedule$new()
Create a MaestroSchedule object
MaestroSchedule$new(Pipelines = NULL)
Pipelines |
list of MaestroPipelines |
MaestroSchedule
MaestroSchedule$print()
Print the schedule object
MaestroSchedule$print()
MaestroSchedule$run()
Run a MaestroSchedule
MaestroSchedule$run(..., quiet = FALSE, run_all = FALSE, n_show_next = 5)
... |
arguments passed to MaestroPipelineList$run |
quiet |
whether or not to silence console messages |
run_all |
run all pipelines regardless of the schedule (default is FALSE) - useful for testing. |
n_show_next |
show the next n scheduled pipes |
invisible
MaestroSchedule$get_schedule()
Get the schedule as a data.frame
MaestroSchedule$get_schedule()
data.frame
MaestroSchedule$get_status()
Get status of the pipelines as a data.frame
MaestroSchedule$get_status()
data.frame
MaestroSchedule$get_artifacts()
Get artifacts (return values) from the pipelines
MaestroSchedule$get_artifacts()
list
MaestroSchedule$get_network()
Get the network structure of the pipelines as an edge list (will be empty if there are no DAG pipelines)
MaestroSchedule$get_network()
data.frame
MaestroSchedule$get_flags()
Get all pipeline flags as a long data.frame
MaestroSchedule$get_flags()
data.frame
MaestroSchedule$show_network()
Visualize the DAG relationships between pipelines in the schedule
MaestroSchedule$show_network()
interactive visualization
MaestroSchedule$get_run_sequence()
Get full sequence of scheduled executions for all pipelines
MaestroSchedule$get_run_sequence(
  n = NULL,
  min_datetime = NULL,
  max_datetime = NULL,
  include_only_primary = FALSE,
  include_skipped = TRUE
)
n |
optional sequence limit |
min_datetime |
optional minimum datetime |
max_datetime |
optional maximum datetime |
include_only_primary |
only primary pipelines are included (these are pipelines that are scheduled and not downstream nodes in a DAG) |
include_skipped |
whether to include pipelines tagged with @maestroSkip (default TRUE for backwards compatibility) |
data.frame
MaestroSchedule$clone()
The objects of this class are cloneable with this method.
MaestroSchedule$clone(deep = FALSE)
deep |
Whether to make a deep clone. |
if (interactive()) {
  pipeline_dir <- tempdir()
  create_pipeline("my_new_pipeline", pipeline_dir, open = FALSE)
  schedule <- build_schedule(pipeline_dir = pipeline_dir)
}
Given a schedule in a maestro project, runs the pipelines that are scheduled to execute
based on the current time.
run_schedule(
  schedule,
  orch_frequency = "1 day",
  check_datetime = lubridate::now(tzone = "UTC"),
  resources = list(),
  run_all = FALSE,
  n_show_next = 5,
  cores = 1,
  log_file_max_bytes = 1e+06,
  quiet = FALSE,
  log_to_console = FALSE,
  log_to_file = FALSE
)
schedule |
object of type MaestroSchedule created using |
orch_frequency |
frequency of the orchestrator, a single string formatted like "1 day", "2 weeks", "hourly", etc. |
check_datetime |
datetime against which to check the running of pipelines (default is current system time in UTC) |
resources |
named list of shared resources made available to pipelines as needed |
run_all |
run all pipelines regardless of the schedule (default is |
n_show_next |
show the next n scheduled pipes |
cores |
number of cpu cores to run if running in parallel. If > 1, |
log_file_max_bytes |
numeric specifying the maximum number of bytes allowed in the log file before purging the log (within a margin of error) |
quiet |
silence metrics to the console (default = |
log_to_console |
whether or not to include pipeline messages, warnings, errors to the console (default = |
log_to_file |
either a boolean to indicate whether to create and append to a |
The function run_schedule() examines each pipeline in the schedule table and determines
whether it is scheduled to run at the current time using some simple time arithmetic. We assume
run_schedule(schedule, check_datetime = Sys.time()), but this need not be the case.
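To give a feel for the kind of time arithmetic involved, here is a simplified base R sketch. It is not maestro's actual implementation: the helper is_due() and its rule (a pipeline is due when its most recent scheduled time falls within one orchestrator interval before the check time) are assumptions made purely for illustration, and frequencies are expressed in seconds to keep the arithmetic explicit.

```r
# Illustrative only: a hypothetical helper, not part of maestro.
is_due <- function(start_time, freq_secs, orch_secs, check_datetime) {
  # Seconds elapsed between the pipeline's start time and the check time
  elapsed <- as.numeric(difftime(check_datetime, start_time, units = "secs"))
  # A start time in the future means the pipeline is not yet eligible
  if (elapsed < 0) {
    return(FALSE)
  }
  # Seconds since the most recent scheduled run time
  since_last <- elapsed %% freq_secs
  # Due if that scheduled time falls inside the orchestrator's window
  since_last < orch_secs
}

start <- as.POSIXct("2025-03-18 03:00:00", tz = "UTC")
day <- 24 * 60 * 60
hour <- 60 * 60

# Daily pipeline, hourly orchestrator, checked just after 03:00
due_at_three <- is_due(start, day, hour, as.POSIXct("2025-03-20 03:00:10", tz = "UTC"))
# Same pipeline checked at 05:00, two hours after its scheduled time
due_at_five <- is_due(start, day, hour, as.POSIXct("2025-03-20 05:00:00", tz = "UTC"))
```

Passing a different check_datetime, as in the two calls above, is the same idea as supplying check_datetime to run_schedule() to evaluate the schedule against a time other than the present.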
run_schedule() returns the same MaestroSchedule object with modified attributes. Use get_status()
to examine the status of each pipeline and use get_artifacts() to get any return values from the
pipelines as a list.
If a pipeline takes an argument that doesn't include a default value, it can be supplied
in the orchestrator via run_schedule(resources = list(arg1 = val)). The name of the argument
used by the pipeline must match the name of the argument in the list. Currently, each named
resource must refer to a single object. In other words, you can't have two pipes using
the same argument but requiring different values.
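A minimal sketch of passing a resource follows. The pipeline load_table, its argument table_name, and the directory name are hypothetical; the pipeline source is held in a character vector so it can be written to a script for maestro to parse. The maestro calls are guarded so they only run interactively with the package installed, and run_all = TRUE is used so the pipeline runs regardless of its schedule.

```r
# Hypothetical pipeline whose argument has no default value
pipeline_src <- c(
  "#' @maestroFrequency 1 day",
  "load_table <- function(table_name) {",
  "  paste('loading', table_name)",
  "}"
)

if (interactive() && requireNamespace("maestro", quietly = TRUE)) {
  library(maestro)

  # Write the pipeline into a throwaway pipelines directory
  pipeline_dir <- file.path(tempdir(), "pipelines_resources_demo")
  dir.create(pipeline_dir, showWarnings = FALSE)
  writeLines(pipeline_src, file.path(pipeline_dir, "load_table.R"))

  schedule <- build_schedule(pipeline_dir = pipeline_dir)

  # The list name must match the pipeline's argument name
  schedule <- run_schedule(
    schedule,
    orch_frequency = "1 day",
    resources = list(table_name = "sales"),
    run_all = TRUE,
    quiet = TRUE
  )

  get_artifacts(schedule)
}
```

Since each named resource refers to a single object, any other pipeline with a table_name argument would receive the same value in this run.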
Pipelines can be run in parallel using the cores argument. First, you must run future::plan(future::multisession)
in the orchestrator. Then, supply the desired number of cores to the cores argument. Note that
console output appears different in multicore mode.
By default, maestro suppresses pipeline messages, warnings, and errors from appearing in the console, but
messages coming from print() and other console logging packages like cli and logger are not suppressed
and will be interwoven into the output generated from run_schedule(). Messages from cat() and related functions are always suppressed
due to the nature of how those functions operate with standard output.
Users are advised to make use of R's message(), warning(), and stop() functions in their pipelines
for managing conditions. Use log_to_console = TRUE to print these to the console.
Note that log_to_console does not work when running multicore. In the case of multicore, it is recommended to use
log_to_file.
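The sketch below shows a pipeline that signals conditions with message(), warning(), and stop(), as recommended above. The function check_rows and its n_rows argument are hypothetical; how each condition is surfaced then depends on the maestroLogLevel tag and the log_to_console and log_to_file arguments of run_schedule().

```r
# Hypothetical pipeline that reports progress through R conditions.

#' @maestroFrequency 1 hour
#' @maestroLogLevel INFO
check_rows <- function(n_rows = 0) {
  # Informational condition
  message("Starting row check")

  # Recoverable problem: signalled as a warning, execution continues
  if (n_rows == 0) {
    warning("No rows found")
  }

  # Unrecoverable problem: signalled as an error, execution stops
  if (n_rows < 0) {
    stop("Negative row count")
  }

  n_rows
}
```

After a run, last_run_messages(), last_run_warnings(), and last_run_errors() can be used to retrieve whatever was signalled.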
Maestro can generate a log file that is appended to each time the orchestrator is run. Use log_to_file = TRUE or log_to_file = '[path-to-file]' and
maestro will create/append to a file in the project directory.
This log file will be appended to until it exceeds the byte size defined in log_file_max_bytes argument after which
the log file is deleted.
MaestroSchedule object
if (interactive()) {
  pipeline_dir <- tempdir()
  create_pipeline("my_new_pipeline", pipeline_dir, open = FALSE)
  schedule <- build_schedule(pipeline_dir = pipeline_dir)

  # Runs the schedule every 1 day
  run_schedule(
    schedule,
    orch_frequency = "1 day",
    quiet = TRUE
  )

  # Runs the schedule every 15 minutes
  run_schedule(
    schedule,
    orch_frequency = "15 minutes",
    quiet = TRUE
  )
}