In computationally demanding analysis projects, statisticians and data
scientists asynchronously deploy long-running tasks to distributed
systems, ranging from traditional clusters to cloud services. The
crew.aws.batch
package extends the
mirai
-powered ‘crew’ package
with a worker launcher plugin for AWS
Batch. Inspiration also comes from
packages mirai
,
future
,
rrq
,
clustermq
, and
batchtools
.
Type | Source | Command |
---|---|---|
Release | CRAN | install.packages("crew.aws.batch") |
Development | GitHub | remotes::install_github("wlandau/crew.aws.batch") |
Development | R-universe | install.packages("crew.aws.batch", repos = "https://wlandau.r-universe.dev") |
Please see https://wlandau.github.io/crew.aws.batch/ for documentation, including a full function reference and usage tutorial.
crew.aws.batch
launches AWS Batch
jobs to run crew
workers. This
comes with a set of special requirements:
- Understand AWS Batch and its official documentation.
- Your job
definitions
must each have Docker-compatible
container image with R and
crew.aws.batch
installed. You may wish to inherit from an existing rocker image. - At minimum, for the launcher plugin to work, your IAM policies need permission to submit and terminate jobs. To appropriately monitor jobs, your policies also need permission to list and describe jobs. In addition, managing job definitions as described below requires permission to register, deregister, and describe job definitions. To view CloudWatch logs, you need permission to get log events.
- In the compute
environment,
the security
group
must permit all inbound and outbound TCP traffic within itself.1
The controller and the workers must run in this security group so
they can communicate within the firewalled local network.2 If
your security group ID is
sg-00000
and belongs to VPCvpc-00000
, then your inbound and outbound rules may look something like this:
client <- paws.compute::ec2()
groups <- client$describe_security_groups(GroupIds = "sg-00000")
str(groups$SecurityGroups[[1L]])
#> List of 8
#> $ Description : chr "Allow TCP traffic on ephemeral ports"
#> $ GroupName : chr "self-pointing-group"
#> $ IpPermissions :List of 1
#> ..$ :List of 7
#> .. ..$ FromPort : num 1024
#> .. ..$ IpProtocol : chr "tcp"
#> .. ..$ IpRanges : list()
#> .. ..$ Ipv6Ranges : list()
#> .. ..$ PrefixListIds : list()
#> .. ..$ ToPort : num 65535
#> .. ..$ UserIdGroupPairs:List of 1
#> .. .. ..$ :List of 7
#> .. .. .. ..$ Description : chr "Accept traffic from other jobs in group."
#> .. .. .. ..$ GroupId : chr "sg-00000"
#> .. .. .. ..$ GroupName : chr(0)
#> .. .. .. ..$ PeeringStatus : chr(0)
#> .. .. .. ..$ UserId : chr "CENSORED"
#> .. .. .. ..$ VpcId : chr(0)
#> .. .. .. ..$ VpcPeeringConnectionId: chr(0)
#> $ OwnerId : chr "CENSORED"
#> $ GroupId : chr "sg-00000"
#> $ IpPermissionsEgress:List of 1
#> ..$ :List of 7
#> .. ..$ FromPort : num 1024
#> .. ..$ IpProtocol : chr "tcp"
#> .. ..$ IpRanges : list()
#> .. ..$ Ipv6Ranges : list()
#> .. ..$ PrefixListIds : list()
#> .. ..$ ToPort : num 65535
#> .. ..$ UserIdGroupPairs:List of 1
#> .. .. ..$ :List of 7
#> .. .. .. ..$ Description : chr "Allow traffic to other jobs in group."
#> .. .. .. ..$ GroupId : chr "sg-00000"
#> .. .. .. ..$ GroupName : chr(0)
#> .. .. .. ..$ PeeringStatus : chr(0)
#> .. .. .. ..$ UserId : chr "CENSORED"
#> .. .. .. ..$ VpcId : chr(0)
#> .. .. .. ..$ VpcPeeringConnectionId: chr(0)
#> $ Tags : list()
#> $ VpcId : chr "vpc-00000"
Before submitting jobs, AWS Batch requires a job definition to describe
the container image and resource requirements. You can do this through
the AWS web console, the AWS command line interface (CLI), a software
development kit (SDK) like the paws
R package, or the job definition
class in crew.aws.batch
. For crew.aws.batch
, first create a job
definition object.
definition <- crew_definition_aws_batch(
job_definition = "YOUR_JOB_DEFINITION_NAME",
job_queue = "YOUR_JOB_QUEUE_NAME"
)
The job definition may or may not exist at this point. If it does not
exist, you can register with register()
, an oversimplified
limited-scope method which creates container-based job definitions with
the "awslogs"
log driver (for CloudWatch).3 Below, your container
image can be as simple as a Docker Hub identifier (like
"alpine:latest:
) or a full URI of an ECR image.4
definition$register(
image = "AWS_ACCOUNT_ID.dkr.ecr.AWS_REGION.amazonaws.com/ECR_REPOSITORY_NAME:IMAGE_TAG",
platform_capabilities = "EC2",
memory_units = "gigabytes",
memory = 8,
cpus = 2
)
#> # A tibble: 1 × 3
#> name revision arn
#> <chr> <int> <chr>
#> 1 YOUR_JOB_DEFINITION_NAME 81 arn:aws:batch:us-east-1:CENSORED:jo…
The describe()
method shows information about current and past
revisions of the job definition. Set active
to TRUE
to see just the
active revisions.
definition$describe(active = TRUE)
#> # A tibble: 2 × 16
#> name arn revision status type scheduling_priority parameters
#> <chr> <chr> <int> <chr> <chr> <dbl> <list>
#> 1 YOUR_JOB_DEFIN… arn:… 82 active cont… 3 <list [0]>
#> 2 YOUR_JOB_DEFIN… arn:… 81 active cont… 3 <list [0]>
#> # ℹ 9 more variables: retry_strategy <list>, container_properties <list>,
#> # timeout <list>, node_properties <list>, tags <list>,
#> # propagate_tags <lgl>, platform_capabilities <chr>,
#> # eks_properties <list>, container_orchestration_type <chr>
Use deregister()
to deregister a revision of a job definition. If a
revision number is not supplied, then it defaults to the greatest active
revision number.
definition$deregister()
#> # A tibble: 1 × 16
#> name arn revision status type scheduling_priority parameters
#> <chr> <chr> <int> <chr> <chr> <dbl> <list>
#> 1 YOUR_JOB_DEFIN… arn:… 81 active cont… 3 <list [0]>
#> # ℹ 9 more variables: retry_strategy <list>, container_properties <list>,
#> # timeout <list>, node_properties <list>, tags <list>,
#> # propagate_tags <lgl>, platform_capabilities <chr>,
#> # eks_properties <list>, container_orchestration_type <chr>
With crew.aws.batch
, your crew
controller automatically submits jobs
to AWS Batch. These jobs may fail or linger for any number of reasons,
which could impede work and increase costs. So before you use
crew_controller_aws_batch()
, please learn how to monitor and terminate
AWS Batch jobs manually.
crew_monitor_aws_batch()
defines a “monitor” to help you manually
list, inspect, and terminate jobs. You will need to supply a job
definition name and a job queue name.
monitor <- crew_monitor_aws_batch(
job_definition = "YOUR_JOB_DEFINITION_NAME",
job_queue = "YOUR_JOB_QUEUE_NAME"
)
You can submit individual AWS Batch jobs to test your computing environment.
job1 <- monitor$submit(name = "job1", command = c("echo", "hello\nworld"))
job2 <- monitor$submit(name = "job2", command = c("echo", "job\nsubmitted"))
job2
#> # A tibble: 1 × 3
#> name id arn
#> <chr> <chr> <chr>
#> 1 job2 c38d55ad-4a86-4371-9994-6ea8882f5726 arn:aws:batch:us-east-2:0…
Method status()
checks the status of an individual job.
monitor$status(id = job2$id)
#> # A tibble: 1 × 8
#> name id arn status reason created started stopped
#> <chr> <chr> <chr> <chr> <chr> <dbl> <dbl> <dbl>
#> 1 job2 c38d55ad-4a86-43… arn:… runnable NA 1.70e12 NA NA
The jobs()
method gets the status of all the jobs within the job queue
and job definition you originally supplied to
crew_monitor_aws_batch()
. This may include many more jobs than the
ones you submitted during the life cycle of the current monitor
object.
monitor$jobs()
#> # A tibble: 2 × 8
#> name id arn status reason created started stopped
#> <chr> <chr> <chr> <chr> <chr> <dbl> <dbl> <dbl>
#> 1 job1 653df636-ac74-43… arn:… succeeded Essen… 1.70e12 1.70e12 1.70e12
#> 2 job2 c38d55ad-4a86-43… arn:… runnable NA 1.70e12 NA NA
The job
state
can be "submitted"
, "pending"
, "runnable"
, "starting"
,
"running"
, "succeeded"
, or "failed"
. The monitor has a method for
each job state to get only the jobs with that state.
monitor$succeeded()
#> # A tibble: 1 × 8
#> name id arn status reason created started stopped
#> <chr> <chr> <chr> <chr> <chr> <dbl> <dbl> <dbl>
#> 1 job1 653df636-ac74-43… arn:… succeeded NA 1.70e12 1.70e12 1.70e12
In addition, there is an active()
method for just states
"submitted"
, "pending"
, "runnable"
, "starting"
, and "running"
,
and there is an inactive()
method for just the "succeeded"
and
"failed"
states.
monitor$inactive()
#> # A tibble: 1 × 8
#> name id arn status reason created started stopped
#> <chr> <chr> <chr> <chr> <chr> <dbl> <dbl> <dbl>
#> 1 job1 653df636-ac74-43… arn:… succeeded NA 1.70e12 1.70e12 1.70e12
To terminate a job, use the terminate()
method. This has the effect of
both canceling and terminating the job, although you may not see the
change right away if the job is currently "runnable"
. Manually
terminated jobs are listed as failed.
monitor$terminate(id = job2$id)
To get the CloudWatch logs of a job, use the log()
method. This method
returns a tibble
with the log messages and numeric timestamps.
log <- monitor$log(id = job1$id)
log
#> # A tibble: 2 × 3
#> message timestamp ingestion_time
#> <chr> <dbl> <dbl>
#> 1 hello 1702068378163 1702068378245
#> 2 world 1702068378163 1702068378245
If the log messages are too long to conveniently view in the tibble
,
you can print them to your screen with cat()
or writeLines()
.
writeLines(log$message)
#> hello
#> world
To start using crew.aws.batch
in earnest, first create a controller
object. Also supply the names of your job queue and job definition, as
well as any optional flags and settings you may need. If you do not
already have a job definition, the “monitor” object above can help you
create one (see above).
library(crew.aws.batch)
controller <- crew_controller_aws_batch(
name = "my_workflow", # for informative job names
workers = 16,
tasks_max = 2, # to avoid reaching wall time limits (if any exist)
seconds_launch = 600, # to allow a 10-minute startup window
seconds_idle = 60, # to release resources when they are not needed
processes = NULL, # See the "Asynchronous worker management" section below.
options_aws_batch = crew_options_aws_batch(
job_definition = "YOUR_JOB_DEFINITION_NAME",
job_queue = "YOUR_JOB_QUEUE_NAME",
cpus = 2,
gpus = 0,
# Launch workers with 4 GB memory, then 8 GB if the worker crashes,
# then 16 GB on all subsequent launches. Go back to 4 GB if the worker
# completes all its tasks before exiting.
memory = c(4, 8, 16),
memory_units = "gigabytes"
)
)
controller$start()
At this point, usage is exactly the same as basic
crew
. The push()
method submits
tasks and auto-scales AWS Batch workers
to meet demand.
controller$push(name = "do work", command = do_work())
The pop()
method retrieves available tasks.
controller$pop()
#> # A tibble: 1 × 11
#> name command result seconds seed error trace warni…¹ launc…² worker insta…³
#> <chr> <chr> <list> <dbl> <int> <chr> <chr> <chr> <chr> <int> <chr>
#> 1 do work … do_work… <int> 0 1.56e8 NA NA NA 79e71c… 1 7686b2…
#> # … with abbreviated variable names ¹warnings, ²launcher, ³instance
Remember to terminate the controller when you are done.
controller$terminate()
HTTP requests to submit and terminate jobs may take up to 1 or 2
seconds, and this overhead may be burdensome if there are many workers.
To run these requests asynchronously, set the processes
argument of
crew_controller_aws_batch()
to the number of local mirai
daemons you
want to process the requests. These processes will start on
controller$start()
and end on controller$terminate()
or when your
local R session ends. controller$launcher$async$errors()
shows the
most recent error messages generated on launch or termination for all
workers.
processes = NULL
disables async and makes launch/termination errors
immediate and easier to see. You may also wish to set
options(paws.log_level = 3L)
to increase the verbosity of paws
messages.
-
Charlie Gao created
mirai
andnanonext
and graciously accommodated the complicated and demanding feature requests that madecrew
and its ecosystem possible. - Thanks to Henrik Bengtsson, David Kretch, Adam Banker, and Michael Schubert for edifying conversations about cloud computing in R.
Please note that the crew
project is released with a Contributor Code
of
Conduct.
By contributing to this project, you agree to abide by its terms.
citation("crew.aws.batch")
To cite package 'crew.aws.batch' in publications use:
Landau WM (????). _crew.aws.batch: A Crew Launcher Plugin for AWS
Batch_. R package version 0.0.7,
https://github.com/wlandau/crew.aws.batch,
<https://wlandau.github.io/crew.aws.batch/>.
A BibTeX entry for LaTeX users is
@Manual{,
title = {crew.aws.batch: A Crew Launcher Plugin for AWS Batch},
author = {William Michael Landau},
note = {R package version 0.0.7,
https://github.com/wlandau/crew.aws.batch},
url = {https://wlandau.github.io/crew.aws.batch/},
}
Footnotes
-
If you already know the TCP port you will supply to
port
argument ofcrew_controller_aws_batch()
, you can restrict the port range to only use that port number. ↩ -
Please read about the risks and keep TLS encryption turned on (default:
tls = crew_tls(mode = "automatic")
). Please understand and comply with all the security policies of your organization. ↩ -
The log group supplied to
crew_monitor_aws_batch()
must be valid. The default is"/aws/batch/log"
, which may not exist if your system administrator has a custom logging policy. ↩ -
For the
crew
controller, you will definitely want an image with R andcrew
installed. For the purposes of testing the monitor,"alpine:latest"
will work. ↩