Building a Notification Service in Rust with AWS SNS

Cover image

Notifications are a very helpful tool for notifying services. This is particularly relevant if you’re using microservice architecture. One notification mechanism we can use is AWS SNS (Simple Notification Service), which lets us create topics, subscribe to them and push messages to topics. By the end of this article, you’ll have two fully functioning web services with the following overall functionality:

  • One web service will have a single endpoint that can receive a message from AWS SNS
  • One web service will handle the job of sending messages

Interested in just deploying? You can find the repository here, complete with instructions on how to deploy.

Getting started

Pre-requisites

To get started, you will need two keys from AWS:

  • Your access key ID
  • Your secret access key

Receiving AWS SNS messages

To start with, let’s have a look at how we can receive SNS messages. We can do this with shuttle init, picking Axum as our framework. This service will have a single endpoint that takes the SNS messages and prints the message out.

To make this idiomatic, we’ll want to create a struct called SnsMessage that implements axum::FromRequest. This will allow us to use it as an extractor, rather than trying to parse the POST requests from SNS manually.

use axum::{routing::{get, post}, Router, response::{IntoResponse, Response},
    http::StatusCode, extract::{Request, FromRequest},
    Json, RequestExt
};
use serde::Deserialize;

#[derive(Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct SnsMessage {
    #[serde(rename = "Type")]
    kind: String,
    message_id: String,
    topic_arn: String,
    subject: String,
    message: String,
    timestamp: String,
    signature_version: String,
    signature: String,
    signing_cert_url: String,
    unsubscribe_url: String
}

#[axum::async_trait]
impl<S> FromRequest<S> for SnsMessage {
    type Rejection = Response;

    async fn from_request(req: Request, state: &S) -> Result<Self, Self::Rejection> {
        let headers = req.headers();

        // if none of these headers are sent in the request
        // automatically send a BAD_REQUEST status code
        if !headers.contains_key("x-amz-sns-message-type")
            | !headers.contains_key("x-amz-message-id")
            | !headers.contains_key("x-amz-topic-arn")
            | !headers.contains_key("x-amz-subscription-arn") {
            return Err((StatusCode::BAD_REQUEST).into_response())
        }

        let Json(payload): axum::Json<SnsMessage> = req.extract()
            .await.map_err(|_| (StatusCode::BAD_REQUEST).into_response())?;

        Ok(payload)
    }
}

Note that this FromRequest<S> implementation is primarily to show the fundamentals. In production use cases, you would want to verify the signature sent from AWS to make sure it’s accurate. You can find out more about the basics of this here.

To use our new struct, you can use SnsMessage as an extractor for a handler function like this:

async fn receive_sns(
    sns_message: SnsMessage
) -> StatusCode {
    println!("{}", sns_message.message);
    StatusCode::OK
}

You can find out more about receiving POST requests from AWS SNS here. To extend this, you may want to think about verifying the signature sent by AWS here.

We’ll tie this all together by adding the handler function to our router:

use axum::routing::post;

#[shuttle_runtime::main]
async fn axum() -> shuttle_axum::ShuttleAxum {
    let router = Router::new().route("/", get(hello_world))
        .route("/sns", post(receive_sns));

    Ok(router.into())
}

Now it’s complete!

Deploying

To deploy, you can use shuttle deploy (with --allow-dirty if on a Git branch and uncommitted changes). Take note of the deployment URL! We will need this later on.

Sending AWS SNS messages

Now for the second part of our notification service: sending messages to our receiver!

Setup

We’ll get started by using shuttle init once again, picking Axum as our choice of framework. Make sure to follow the prompt until the end to finalise your project.

Once done, we can install our dependencies with this script:

cargo add aws-config@1.1.8 -F behavior-version-latest
cargo add aws-credential-types@1.1.8 -F hardcoded-credentials
cargo add aws-sdk-sns@1.18.0
cargo add anyhow@1.0.81
cargo add serde@1.0.197 -F derive
cargo add serde-json@1.0.114
cargo add thiserror@1.0.58

Remember your AWS keys from earlier? We’ll add them to Secrets.toml file in the root of our project folder. It should look like this:

AWS_ACCESS_KEY_ID = "your-access-key-id-here"
AWS_SECRET_ACCESS_KEY = "your-secret-access-key-here"

Setting up our AWS config

To get started, we’ll want to add our resource annotations to our main function - which should look like this:

#[shuttle_runtime::main]
async fn main(
    #[shuttle_runtime::Secrets] secrets: SecretStore,
) -> shuttle_axum::ShuttleAxum {
   // .. your code here
}

When we use shuttle run, our secrets will now get provisioned to us without needing to do anything!

Before we start adding more code, let’s write our AppState struct to hold Axum state. This will allow our handler functions to access our AWS SNS client whenever we want.

use aws_sdk_sns::Client;

#[derive(Clone)]
pub struct AppState {
    sns: Client,
    topic_arn: String
}

Next, we’ll want to add code to our main function that gets the secrets, creates aws_credential_types::Credentials and creates an AWS config from the credentials, as well as adding a region. We’ll use eu-west-02 to reduce latency (because the Shuttle servers are hosted on eu-west-02). We then create a new client, initialize our app state and then add it to the axum::Router;

let access_key_id = secrets.get("AWS_ACCESS_KEY_ID").expect("AWS_ACCESS_KEY_ID not set in Secrets.toml");
    let secret_access_key = secrets.get("AWS_SECRET_ACCESS_KEY").expect("AWS_ACCESS_KEY_ID not set in Secrets.toml");

        let creds = Credentials::from_keys(
               access_key_id,
               secret_access_key,
               None
            );

        let cfg = aws_config::from_env()
            .region(Region::new("eu-west-02"))
            .credentials_provider(creds)
            .load()
            .await;

       let sns = Client::new(&cfg);

    let state = AppState { sns };

    let router = Router::new().route("/", get(hello_world)).with_state(state);

Error handling

Error handling for the ShuttleAxum type

The shuttle_axum::ShuttleAxum return type defaults to anyhow::Error for user errors. This doesn’t affect handler functions. For functions used in the main function however, they will need to either be able to convert to anyhow::Error or be of the same type.

Error handling for API route functions

To create errors easily, we’ll use the thiserror crate we installed to generate From<T> implementations for our error type, as well as the error messages. Let’s have a look.

use thiserror::Error;

use aws_sdk_sns::error::SdkError;
use aws_sdk_sns::operation::publish::PublishError

#[derive(Debug, Error)]
pub enum ApiError {
    #[error("Error while publishing message: {0}")]
    PublishMessage(#[from] SdkError<PublishError>),
}

We use the thiserror::Error derive macro to enable the attribute macros. A few things are happening here:

  • The #[from] implementation automatically derives From<T> so that the given type will automatically turn into a given enum variant. Note however that if the error type is an enum, it will convert it regardless of the enum variant. If you want more custom error handling, you may want to implement From<T> manually.
  • #[error("..")] automatically generates the std::fmt::Display implementation for our error type. Note that we need to use this on every enum variant, otherwise we will receive compile-time errors.
  • Using thiserror::Error automatically implements std::error::Error for ApiError so there’s no need for us to re-implement it!

Next, we’ll want to implement axum::response::IntoResponse so that we can return ApiError from our Axum handler functions.

use axum::{
    http::StatusCode,
    response::{IntoResponse, Response},
};

impl IntoResponse for ApiError {
    fn into_response(self) -> Response {
        (StatusCode::INTERNAL_SERVER_ERROR, self.to_string().into_response())
    }
}

Currently we just have all of our error variants return internal server errors. To go further into this, you could create a response depending on the internal error enum variant or source (via pattern matching).

Topics

To create a topic, we can use the AWS SNS client to create a builder object and then add tags or attributes to it depending on what we want.

For the endpoint, it is mostly about creating the builder, appending the attributes and using send() to create it.

async fn create_topic(sns: &Client, name: &str) -> AnyhowResult<String> {
    let topic = sns.create_topic().name(name);

    let output = topic.send().await.unwrap();

    println!("Topic created: {output:?}");
    Ok(output.topic_arn.unwrap())
}

The output from sending the topic will contain the ARN. We need to store this to be able to retrieve a topic later on. Note that attempting to create two topics with the same name will return the current ARN instead of creating a new resource.

If you’re interested in customising this further, you can check out more about the CreateTopicFluentBuilder here.

Subscriptions

Receiving published messages requires a topic subscription. Creating a subscription requires declaring an endpoint, the topic ARN as well as the protocol used. We can refer to the SubscribeFluentBuilder for this here.

The function will look something like this:

async fn subscribe_to_topic(sns: &Client, url: &str, arn: &str) -> AnyhowResult<()> {
    let sub = sns
        .subscribe()
        .protocol("https".to_string())
        .endpoint(url)
        .topic_arn(arn);

    let output = sub
        .send()
        .await
        .map_err(|e| anyhow::anyhow!("error: {e}"))?;
    println!("New subscriber created: {output:?}");

    Ok(())
}

There is not much to talk about here as it’s primarily just setting it up and then sending it. If we wanted to, we could also create multiple subscriptions to the same topic ARN, giving us the benefit of being able to fan-out our notifications.

Because we’re setting up an HTTPS endpoint, we need to confirm the subscription. When we create the subscription, SNS will send a subscription confirmation message to our receiver. This requires our receiver to already be deployed. Because we already print out the confirmation message in our logs, we can easily access and visit the URL to confirm the subscription. You can find out more about this here.

To check that the subscription exists, we can do this:

async fn subscription_exists(sns: &Client, url: &str, arn: &str) -> AnyhowResult<bool> {
    let subscribers = sns
        .list_subscriptions()
        .send()
        .await
        .map_err(|e| anyhow::anyhow!("error: {e}"))?;

    if let Some(subs) = subscribers.subscriptions {
        if subs.iter().any(|sub| {
            sub.clone().endpoint.unwrap() == *url && sub.clone().topic_arn.unwrap() == *arn
        }) {
            return Ok(true);
        }

        return Ok(false);
    }

    Ok(false)
}

Publishing messages

To be able to publish messages, we’ll need the topic ARN we want to subscribe to, the message as well as the subject.

#[derive(Deserialize)]
struct PublishMessageParams {
    message: String,
    subject: String
}

async fn publish_message(
    State(state): State<AppState>,
    Json(json): Json<PublishMessageParams>
) -> Result<impl IntoResponse, ApiError> {
    let res = state.sns.publish()
        .topic_arn(state.arn)
        .message(json.message)
        .subject(json.subject)
        .send().await?;

    Ok(StatusCode::OK)
}

All of the endpoints who are subscribed to the topic will now automatically receive the message!

Interested in customising this further? You can check out more about the PublishFluentBuilder here.

Connecting it all together

To connect it all, let’s revisit our main function and fill it in with the new functions:

#[shuttle_runtime::main]
async fn main(#[shuttle_secrets::Secrets] secrets: SecretStore) -> shuttle_axum::ShuttleAxum {
    let access_key_id = secrets
        .get("AWS_ACCESS_KEY_ID")
        .expect("AWS_ACCESS_KEY_ID not set in Secrets.toml");
    let secret_access_key = secrets
        .get("AWS_SECRET_ACCESS_KEY")
        .expect("AWS_ACCESS_KEY_ID not set in Secrets.toml");

    let creds = Credentials::from_keys(access_key_id, secret_access_key, None);

    let cfg = aws_config::from_env()
        .region(Region::new("eu-west-02"))
        .credentials_provider(creds)
        .load()
        .await;

    let sns = aws_sdk_sns::Client::new(&cfg);
    let topic_arn = create_topic(&sns, "my_topic").await?;

    // NOTE: Change this to your deployment URL for your receiver service!
    // The SNS receiver route should be on `/sns` as instructed previously
    let url = "https://sns-receiver.shuttleapp.rs/sns";

    if !subscription_exists(&sns, url, &topic_arn).await? {
        subscribe_to_topic(&sns, url, &topic_arn).await?;
    }

    let state = AppState { sns, topic_arn };

    let router = Router::new().route("/", get(hello_world)).with_state(state);

    Ok(router.into())
}

Deploying

To deploy, we just need to use shuttle deploy (with the --allow-dirty flag if on an uncommitted Git branch) and watch the magic happen!

Finishing up

Thanks for reading! AWS SNS is a powerful tool to be able to get going with notifying services, particularly if you need fan-out from something like AWS SQS or other message queues.

Read more:

This blog post is powered by shuttle - The Rust-native, open source, cloud development platform. If you have any questions, or want to provide feedback, join our Discord server!
Share article
rocket

Build the Future of Backend Development with us

Join the movement and help revolutionize the world of backend development. Together, we can create the future!