In this article we’re going to talk about how you can write your own cron jobs as a web service using Shuttle!
Cron jobs (or “scheduled tasks”) are useful for many things. They let you automate work such as:
- Backing up data.
- Sending daily reminders (for example, to customers who have signed up to a service you own but haven’t started using it yet).
- Creating reports.
If you’re interested in the final code and want to deploy quickly, you can find the repo here. You can deploy it in two simple steps:
- Run cargo shuttle init --from shuttle-hq/shuttle-examples --subfolder shuttle-cron and follow the prompt (requires cargo-shuttle installed).
- Run cargo shuttle deploy. That’s it!
Getting Started
If you don’t have cargo-shuttle installed, you can install it by running the command below (requires Rust + Cargo installed):
cargo install cargo-shuttle
To get started, you’ll want to initialise a project with cargo-shuttle:
cargo shuttle init
You’ll want to make sure to follow the prompt and choose None when the framework options come up. This will spawn a custom service that you can use with Shuttle. For this article, we will be using the project name shuttle-example-cron.
Once done, we’ll want to install the following dependencies:
- serde: allows us to de/serialize tasks
- chrono: allows us to use timestamps
- apalis: the task queue framework we’ll be using
- sqlx: allows you to interact with your provisioned database
- shuttle-shared-db: provisions a database for you from Shuttle (locally via Docker, via the runtime in deployment)
- tower: used with apalis so that the cron job can be hosted
We can install all of these with the following shell snippet:
cargo add apalis -F cron,postgres,extensions,retry
cargo add chrono -F serde,clock
cargo add serde -F derive
cargo add shuttle-shared-db -F postgres
cargo add sqlx -F runtime-tokio-native-tls,postgres
cargo add tower
Now that everything is installed, it’s time to get coding!
Writing our first cron job
Adding a database
Before we do anything else, let’s provision a database using the Shuttle runtime. Add the shuttle-shared-db annotation to an argument of your main function, then use the connection string it provides to set up a Postgres connection pool:
use sqlx::{PgPool, postgres::PgPoolOptions};

pub struct MyService {
    db: PgPool,
}

#[shuttle_runtime::main]
async fn shuttle_main(
    #[shuttle_shared_db::Postgres] conn_string: String,
) -> Result<MyService, shuttle_runtime::Error> {
    let db = PgPoolOptions::new()
        .min_connections(5)
        .max_connections(5)
        .connect(&conn_string)
        .await
        .unwrap();
    Ok(MyService { db })
}
As you can see, pretty simple! You would otherwise need to run a Docker command to spin this up. In production, you would also need to manually instantiate and manage your Postgres instance or rely on an IaC (infrastructure as code) tool like Terraform.
Then we need to implement a few new things:
- A struct that implements apalis::prelude::Job
- A struct that can be added as a shared data extension to our cron service, with a function that will do the actual work we want
- A function that gets called by apalis when work needs to be done (according to the cron job schedule)
Let’s start with implementing our job struct. Note that when using apalis jobs in a cron job context, they must also implement From<chrono::DateTime<chrono::Utc>>. The job has to be constructible from the tick timestamp alone, so in practice it can’t hold any other fields:
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use apalis::prelude::Job;

#[derive(Default, Debug, Clone, Serialize, Deserialize)]
struct Reminder(DateTime<Utc>);

impl From<DateTime<Utc>> for Reminder {
    fn from(t: DateTime<Utc>) -> Self {
        Reminder(t)
    }
}

// set up an identifier for apalis
impl Job for Reminder {
    const NAME: &'static str = "reminder::DailyReminder";
}
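To see why the From implementation matters, here is a minimal, dependency-free sketch of the same pattern. It is not apalis code: std::time::SystemTime stands in for chrono::DateTime<Utc>, and job_for_tick is a hypothetical helper that mimics a scheduler which has nothing but a timestamp to hand over on each tick.

```rust
use std::time::SystemTime;

// Stand-in for `chrono::DateTime<Utc>`, used only so this sketch needs no crates.
type Timestamp = SystemTime;

// A tuple struct with exactly one field: the time of the tick that created the job.
#[derive(Debug, Clone, PartialEq)]
struct Reminder(Timestamp);

impl From<Timestamp> for Reminder {
    fn from(t: Timestamp) -> Self {
        Reminder(t)
    }
}

// Hypothetical helper (an assumption, not an apalis function): on every tick the
// scheduler only knows the current time, so any job type it produces must be
// constructible from that timestamp alone.
fn job_for_tick<J: From<Timestamp>>(tick: Timestamp) -> J {
    J::from(tick)
}
```

Calling job_for_tick::<Reminder>(SystemTime::now()) yields a Reminder that wraps the tick time. Anything else the job needs at run time has to come from somewhere other than the job struct.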
Now we want to implement the function that will do the actual work when called by apalis. We can set this up so that it just says “Hello world from say_hello_world()!”:
use apalis::prelude::JobContext;

// the leading underscores silence "unused variable" warnings for now
async fn say_hello_world(_job: Reminder, _ctx: JobContext) {
    println!("Hello world from `say_hello_world()`!");
}
However, if you want to add extra variables (for example, a database connection pool) you need to add an extension layer in your service to be able to access them. Let’s have a look at what this struct might look like:
#[derive(Clone)]
struct CronjobData {
    message: String,
}

impl CronjobData {
    // `_item` is unused here; a real job would read the tick time from it
    fn execute(&self, _item: Reminder) {
        println!("{} from CronjobData::execute()!", self.message);
    }
}
You can then feed this struct back into the say_hello_world function, which uses the execute() function:
async fn say_hello_world(job: Reminder, ctx: JobContext) {
    println!("Hello world from say_hello_world()!");
    // this lets you use variables stored in the CronjobData struct
    // (unwrap() panics if the extension layer was never added)
    let svc = ctx.data_opt::<CronjobData>().unwrap();
    // this executes CronjobData::execute()
    svc.execute(job);
}
The variables for the CronjobData struct come from instantiating the struct and then adding it as a shared data extension to the Tower service that we use. The job function can then retrieve the data by calling .data_opt() on the JobContext struct.
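If the shared data extension idea is new to you, the sketch below shows the general technique with only the standard library: a map keyed by type, with a lookup that returns an Option. SketchContext and greeting are hypothetical names invented for this illustration, and this is not how apalis implements JobContext. It only illustrates why a lookup like data_opt() can come back empty and how you might handle that without unwrap().

```rust
use std::any::{Any, TypeId};
use std::collections::HashMap;

// Hypothetical stand-in for the context a job handler receives.
#[derive(Default)]
struct SketchContext {
    data: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
}

impl SketchContext {
    // Store one value per type; inserting the same type again replaces it.
    fn insert<T: Any + Send + Sync>(&mut self, value: T) {
        self.data.insert(TypeId::of::<T>(), Box::new(value));
    }

    // Look a value up by its type; `None` means nothing of that type was added.
    fn data_opt<T: Any + Send + Sync>(&self) -> Option<&T> {
        self.data
            .get(&TypeId::of::<T>())
            .and_then(|boxed| boxed.downcast_ref::<T>())
    }
}

#[derive(Clone)]
struct CronjobData {
    message: String,
}

// Handle the missing-extension case explicitly instead of calling `unwrap()`.
fn greeting(ctx: &SketchContext) -> String {
    match ctx.data_opt::<CronjobData>() {
        Some(data) => format!("{} from the shared data!", data.message),
        None => "no CronjobData extension was registered".to_string(),
    }
}
```

With an empty context, greeting falls back to the "no CronjobData" message. After inserting a CronjobData value, it returns the stored message instead. The same match could replace the unwrap() in say_hello_world if you would rather log a missing extension than panic.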
Hooking it all up
Now we’re going to hook the structs we made previously into our main app!
To start with, we need to set up the PostgresStorage type so that we can use Postgres for durable job queues. Without durable job queues, our jobs would disappear if our web service has any outages! We can convert this directly from the PgPool type stored in our main struct and run the migrations for it like so:
use apalis::postgres::PostgresStorage;

#[shuttle_runtime::async_trait]
impl shuttle_runtime::Service for MyService {
    async fn bind(
        self,
        _addr: std::net::SocketAddr
    ) -> Result<(), shuttle_runtime::Error> {
        // set up Postgres-backed storage
        let storage = PostgresStorage::new(self.db);
        // run the migrations that create the job queue tables
        storage.setup().await.expect("Unable to run migrations :(");
        Ok(())
    }
}
The shuttle_runtime::Service trait is what allows your custom service to be packaged into a project that the Shuttle runtime can use. It also provides an HTTP address that you can optionally bind an HTTP service to (for example, a web server).
Now we can create a tower service that holds the job we want to run. We can use the DefaultRetryPolicy provided by apalis to add automatic retries, and layer the shared data on as an Extension:
use tower::ServiceBuilder;
// DefaultRetryPolicy (not DefaultRetryLayer) is the type used below
use apalis::layers::{DefaultRetryPolicy, Extension, RetryLayer};
use apalis::prelude::job_fn;

// .. your previous code
let cron_service_ext = CronjobData {
    message: "Hello world".to_string(),
};
// create a servicebuilder for the cronjob
let service = ServiceBuilder::new()
    .layer(RetryLayer::new(DefaultRetryPolicy))
    .layer(Extension(cron_service_ext))
    .service(job_fn(say_hello_world));
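As a rough mental model of what a retry layer adds, here is a conceptual, standard-library-only sketch of bounded retries. run_with_retries and flaky_job are hypothetical names, and the sketch does not claim to reproduce how DefaultRetryPolicy decides when to retry. It only shows the general idea of re-running a failed job up to a limit.

```rust
// Hypothetical helper: run `task` until it succeeds or `max_attempts` is reached.
// The task always runs at least once. Returns the final result together with
// the number of attempts that were made.
fn run_with_retries<T, E>(
    max_attempts: u32,
    mut task: impl FnMut(u32) -> Result<T, E>,
) -> (Result<T, E>, u32) {
    let mut attempt = 1;
    loop {
        match task(attempt) {
            Ok(value) => return (Ok(value), attempt),
            // Out of attempts: give up and surface the last error.
            Err(err) if attempt >= max_attempts => return (Err(err), attempt),
            // Otherwise try again.
            Err(_) => attempt += 1,
        }
    }
}

// A job that fails on its first two attempts and succeeds on the third.
fn flaky_job(attempt: u32) -> Result<&'static str, String> {
    if attempt < 3 {
        Err(format!("attempt {attempt} failed"))
    } else {
        Ok("reminder sent")
    }
}
```

With a limit of five attempts, flaky_job succeeds on its third run. With a limit of two, the caller gets the error from the second attempt. A production retry policy would usually also wait between attempts, which this sketch leaves out.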
Now that we’ve built the service, we need to build the worker and then finally create an apalis::Monitor process that will carry the work out:
use std::str::FromStr; // needed for Schedule::from_str
use apalis::prelude::timer::TokioTimer;
use apalis::cron::{Schedule, CronStream};
use apalis::prelude::{WorkerBuilder, Monitor};

// .. your previous code
// six-field cron expression with seconds first; this one fires every second
let schedule = Schedule::from_str("* * * * * *").expect("Couldn't start the scheduler!");
// create a worker that uses the service created from the cronjob
let worker = WorkerBuilder::new("morning-cereal")
    .with_storage(storage.clone())
    .stream(
        CronStream::new(schedule)
            .timer(TokioTimer)
            .to_stream()
    )
    .build(service);
// start your worker up
Monitor::new().register(worker).run().await.expect("Unable to start worker");
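The schedule string above has six fields rather than the five of a traditional crontab entry. Assuming the usual layout for this style of expression (seconds first, then minute, hour, day of month, month and day of week), "* * * * * *" fires every second, while something like "0 0 8 * * *" would fire once a day at 08:00. The helper below is a hypothetical, standard-library-only sketch that labels the fields so an expression is easier to read. It does not validate the values and is not part of apalis.

```rust
// Field names in the order assumed by this sketch (seconds come first).
const FIELD_NAMES: [&str; 6] = [
    "second",
    "minute",
    "hour",
    "day of month",
    "month",
    "day of week",
];

// Pair each whitespace-separated field with its name.
// Returns `None` unless the expression has exactly six fields.
fn label_fields(expr: &str) -> Option<Vec<(&'static str, &str)>> {
    let fields: Vec<&str> = expr.split_whitespace().collect();
    if fields.len() != FIELD_NAMES.len() {
        return None;
    }
    Some(FIELD_NAMES.iter().copied().zip(fields).collect())
}
```

For a worker named "morning-cereal", a once-a-day expression is probably closer to what you want in production than an every-second one, which is mainly handy for checking that the job runs at all.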
Here is the full code for our main application:
#[shuttle_runtime::async_trait]
impl shuttle_runtime::Service for MyService {
    async fn bind(
        self,
        _addr: std::net::SocketAddr
    ) -> Result<(), shuttle_runtime::Error> {
        let storage = PostgresStorage::new(self.db.clone());
        // set up storage
        storage.setup().await.expect("Unable to run migrations :(");

        let cron_service_ext = CronjobData {
            message: "Hello world".to_string()
        };

        // create a servicebuilder for the cronjob
        let service = ServiceBuilder::new()
            .layer(RetryLayer::new(DefaultRetryPolicy))
            .layer(Extension(cron_service_ext))
            .service(job_fn(say_hello_world));

        let schedule = Schedule::from_str("* * * * * *")
            .expect("Couldn't start the scheduler!");

        // create a worker that uses the service created from the cronjob
        let worker = WorkerBuilder::new("morning-cereal")
            .with_storage(storage.clone())
            .stream(CronStream::new(schedule).timer(TokioTimer).to_stream())
            .build(service);

        // start your worker up
        Monitor::new()
            .register(worker)
            .run()
            .await
            .expect("Unable to start worker");

        Ok(())
    }
}
Deploying
Now that we’ve written everything, all you need to do is run cargo shuttle deploy (with the --allow-dirty flag if working on a dirty Git branch). When the deployment is finished, you’ll get the deployment information as well as the deployment database URL string.
Extending
Now that we’ve finished our new cron service, here are a few ideas you can use to extend it:
- Add more cron jobs! You can also abstract creating the scheduler, worker, etc. into a new struct (or enum) so that you don’t need to instantiate everything manually. This is useful when you want the same defaults across many, if not all, of your jobs.
- Augment the service to use Axum (or another web framework!) instead of purely tower. You can find out more about this from this GitHub example.
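For the first idea, one possible shape for that abstraction is a small config struct with shared defaults and chainable overrides. Everything in this sketch is hypothetical: the CronJobConfig name, its fields and the default values are illustrative assumptions, and wiring it into WorkerBuilder and CronStream is left out.

```rust
// Hypothetical settings shared by every cron job in the service.
#[derive(Debug, Clone, PartialEq)]
struct CronJobConfig {
    worker_name: String,
    schedule: String,
    max_retries: u32,
}

impl CronJobConfig {
    // Defaults applied to every job unless overridden (values are illustrative).
    fn new(worker_name: &str) -> Self {
        Self {
            worker_name: worker_name.to_string(),
            schedule: "0 0 8 * * *".to_string(),
            max_retries: 3,
        }
    }

    // Override the cron expression for one job.
    fn schedule(mut self, schedule: &str) -> Self {
        self.schedule = schedule.to_string();
        self
    }

    // Override the retry limit for one job.
    fn max_retries(mut self, max_retries: u32) -> Self {
        self.max_retries = max_retries;
        self
    }
}
```

A job that is happy with the defaults only needs CronJobConfig::new("morning-cereal"), while a noisier one can chain .schedule("* * * * * *").max_retries(0). A single function could then turn each config into a worker, so the defaults live in one place.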
Finishing Up
Thanks for reading! I hope this cron job template helps you.