Event-driven Microservices using Kafka and Rust

When it comes to real-time data handling, regular HTTP-based services face some specific challenges. Resiliency, consistency and reliability are all qualities that a production-grade system processing real-time data should have. Event-driven architecture and microservices are one way to become more resilient to these kinds of issues. In this post, we'll take a deep dive into writing an event-driven microservice and how we can use Change Data Capture to assist with taking functionality out of a monolith and putting it into a microservice.

Interested in deploying or got stuck somewhere in the article? You can find the final repository here.

What does “event-driven” mean?

The term "event-driven" simply means that the service is not driven by HTTP requests; instead, it consumes events from event sources and executes logic based on the event type. In practice, events may be delivered by a message queue (like AMQP) or an event store like Kafka, where messages are sent and received by one or more services.

Event-driven microservices are often used as part of an event-driven architecture, where the whole system is based on passing published messages around and using them as the source of truth. The advantages of event-driven architecture are clear:

  • Ideal for handling real-time data in large quantities
  • Because event-driven services rely on events rather than synchronous HTTP calls, there is less chance of a cascading failure across your architecture if something fails.
  • Using a message broker promotes loose coupling, meaning one component doesn't need to know the implementation details of the components it talks to.

Of course, with advantages there are also new challenges that need to be overcome:

  • How do you deal with out-of-order messages?
  • How do you migrate functionality from a monolith to an event-driven microservice?
  • How do you test that your system actually works?

We’ll be diving into all this and more in this article!

Getting started

Pre-requisites

If you want to connect locally to a Kafka instance, you'll either need Apache Kafka and Zookeeper installed and running, or Docker installed so that you can spin up Kafka and Zookeeper instances.

If you want to deploy this service to the web somewhere, you'll also want a service that provides Kafka. Shuttle currently doesn't provide it, but you can use a provider like Upstash. You'll need to store your connection details in the Secrets.toml file as described later on.

Project setup

To get started, we’ll want to make a new project using cargo shuttle init - don’t forget to select Axum as the framework! You will need cargo-shuttle installed for this. If you don’t have it installed, you can use cargo install cargo-shuttle.

We’ll then want to use the following shell snippet to install our project dependencies:

cargo add serde@1.0.198 -F derive
cargo add serde_json@1.0.116
cargo add shuttle-service
cargo add shuttle-shared-db -F postgres,sqlx
cargo add sqlx -F runtime-tokio-rustls,postgres,macros
cargo add thiserror@1.0.59
cargo add rdkafka@0.36.2 -F cmake-build
cargo add futures@0.3.30
cargo add pretty_env_logger@0.5.0 --dev
cargo add testcontainers@0.15.0 --dev
cargo add testcontainers-modules@0.3.7 --dev -F kafka

Then we’ll add our macro annotations to our application:

#[shuttle_runtime::main]
async fn main(
    #[shuttle_shared_db::Postgres] db: PgPool,
    #[shuttle_runtime::Secrets] secrets: SecretStore,
) -> shuttle_axum::ShuttleAxum {
    // your function code here
}

With only three annotations, we've added a database (provisioned locally by Docker and on Shuttle's servers in deployment!), our secrets file and the deployment wiring to the application!

We will also be using sqlx to handle our database migrations and queries, which means we'll want sqlx-cli to help manage our migrations. We'll get started by installing it (the default installation includes Postgres support):

cargo install sqlx-cli

Next, you can use sqlx migrate add init to generate a new migration file in the migrations folder (a subfolder of your project root). We'll generate a simple table:

-- Add migration script here
create table if not exists messages (
     message_id int primary key,
     name varchar not null,
     message varchar not null,
     last_updated date not null default current_date
);

We’ll be expanding on this later, but for the basics this is all we need.

We’ll also be making use of SQLx’s macros to enable type-checked queries. For this, we’ll need to spin up a database, use sqlx migrate run to run the migrations against the database then use cargo sqlx prepare to generate our .sqlx folder. Once this folder gets checked into version control, we won’t need a database anymore when compiling our query macros.

Here’s a shell snippet you can use to accomplish this quickly (requires Docker to be running):

docker run -d -t -p 8081:5432 -e POSTGRES_PASSWORD=postgres --name kafka-shuttle-pg postgres
sqlx migrate run --database-url postgres://postgres:postgres@localhost:8081/postgres
DATABASE_URL=postgres://postgres:postgres@localhost:8081/postgres cargo sqlx prepare
docker rm -f kafka-shuttle-pg

You’ll want to make sure to keep your Kafka endpoint URL somewhere, as we’ll be putting this in a Secrets.toml file:

KAFKA_URL = "<your-kafka-endpoint-url>"

Because we're deploying via Shuttle, we will be using secrets rather than raw environment variables. If you later need to expose your secrets as environment variables, you can iterate through the SecretStore type like in the snippet below:

secret_store.into_iter().for_each(|x| std::env::set_var(x.0, x.1));

When you deploy or run locally, the secrets will get taken from this file.

Set up Kafka using Docker

A simple Kafka setup can easily be done using Docker Compose:

# docker-compose.yml
version: "3"

services:
# kafka
  zookeeper-1:
    container_name: zookeeper-1
    image: zookeeper
    restart: always
    ports:
      - 2181:2181
    environment:
      - ZOOKEEPER_CLIENT_PORT=2181
    volumes:
      - ./config/zookeeper-1/zookeeper.properties:/kafka/config/zookeeper.properties

  kafka-1:
    container_name: kafka-1
    image: bitnami/kafka
    restart: on-failure
    depends_on:
      - zookeeper-1
    ports:
      - 9092:9092
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-1:2181
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
      - KAFKA_CREATE_TOPICS=messages:1:3
    healthcheck:
      test: [ "CMD-SHELL", "kafka-topics.sh --bootstrap-server localhost:9092 --list" ]
      interval: 5s
      timeout: 10s
      retries: 5
networks:
  net:
    name: "net"
    driver: bridge

If you run this docker-compose.yml setup, it should automatically spawn Zookeeper and Kafka instances for you. Both instances produce a lot of logs on startup, so you may want to either run them detached (using the -d flag) or open a separate terminal window for this.

You may find on startup that the topic is not initialised properly, in which case you can use this shell snippet to auto-create a topic in the Docker container:

#!/usr/bin/env sh

docker exec kafka-1 /opt/bitnami/kafka/bin/kafka-topics.sh \
--bootstrap-server localhost:9092 --create --if-not-exists --topic messages \
--replication-factor 1 --partitions 1

Building

Error Handling

Before we get started, we should think about the kinds of errors we can get from using our application. There are a few that immediately come to mind:

  • An RDKafka error
  • An error from using Kafka itself
  • serde_json errors (particularly when serializing and deserializing messages)
  • Oneshot messages being cancelled (from futures::channel)

We can represent all of these errors using an enum that uses the thiserror crate to easily derive error messages:

#[derive(Debug, thiserror::Error)]
pub enum ApiError {
    #[error("RDKafka error: {0}")]
    RDKafka(#[from] rdkafka::error::RDKafkaError),
    #[error("Kafka error: {0}")]
    Kafka(rdkafka::error::KafkaError),
    #[error("De/serialization error: {0}")]
    SerdeJson(#[from] serde_json::Error),
    #[error("Oneshot message was canceled")]
    CanceledMessage(#[from] futures::channel::oneshot::Canceled),
}

Note that while three of our variants use the #[from] attribute to derive the From<T> implementation, converting a KafkaError into our enum variant is a little trickier. The rdkafka methods that return this error typically return it as a tuple containing both the error and the record (or message) where the error occurred. We can implement the conversions like this:

impl<'a>
    From<(
        rdkafka::error::KafkaError,
        rdkafka::producer::FutureRecord<'a, str, std::vec::Vec<u8>>,
    )> for ApiError
{
    fn from(
        e: (
            rdkafka::error::KafkaError,
            rdkafka::producer::FutureRecord<'a, str, std::vec::Vec<u8>>,
        ),
    ) -> Self {
        Self::Kafka(e.0)
    }
}

impl From<(rdkafka::error::KafkaError, rdkafka::message::OwnedMessage)> for ApiError {
    fn from(e: (rdkafka::error::KafkaError, rdkafka::message::OwnedMessage)) -> Self {
        Self::Kafka(e.0)
    }
}

To use this error type with our Axum handlers, we need to implement the IntoResponse trait, which represents a type that can be turned into an HTTP response. We can do so by pattern matching on the enum like so:

use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};

impl IntoResponse for ApiError {
    fn into_response(self) -> Response {
        let (status, body) = match self {
            Self::Kafka(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()),
            Self::RDKafka(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()),
            Self::SerdeJson(e) => (StatusCode::BAD_REQUEST, e.to_string()),
            Self::CanceledMessage(e) => (StatusCode::BAD_REQUEST, e.to_string()),
        };

        (status, body).into_response()
    }
}

Setting up a producer and consumer

To get started with rdkafka, we need to create a publisher and a consumer. We can do this with these two functions:

pub fn create_kafka_producer(secrets: &SecretStore) -> FutureProducer {
    let url = secrets.get("KAFKA_URL").unwrap();

    ClientConfig::new()
        .set("bootstrap.servers", url)
        .set("message.timeout.ms", "5000")
        .set("allow.auto.create.topics", "true")
        .create()
        .expect("Producer creation error")
}

pub fn create_kafka_consumer(secrets: &SecretStore) -> StreamConsumer {
    let url = secrets.get("KAFKA_URL").unwrap();

    ClientConfig::new()
        .set("group.id", "shuttle-kafka")
        .set("bootstrap.servers", url)
        .set("enable.partition.eof", "false")
        .set("session.timeout.ms", "6000")
        .set("enable.auto.commit", "true")
        // only store offset from the consumer
        .set("enable.auto.offset.store", "false")
        .set_log_level(RDKafkaLogLevel::Debug)
        .create()
        .expect("Consumer creation failed")
}

These settings may be changed according to your liking. You can find a full list of configuration properties here.

You may note that above, we've enabled auto-commit while disabling automatic offset storing. The reason for this is that it lets us rely on the underlying Kafka logic to commit at regular intervals, while the consumer only stores an offset once a message has been fully processed. This helps prevent message loss, and is also known as at-least-once delivery.

Note that the producer has permissions to automatically create topics. In production, you may want to remove this and create topics manually. Allowing a producer to create topics freely may result in some unexpected behaviour!
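If you do want to create topics from code rather than from a shell script, one option is rdkafka's admin client. Here's a minimal sketch; the partition and replication counts are illustrative only:

use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};
use rdkafka::client::DefaultClientContext;
use rdkafka::config::ClientConfig;

// Sketch: creating the "messages" topic explicitly instead of relying on
// auto-creation. Partition and replication counts are illustrative.
pub async fn create_messages_topic(url: &str) -> Result<(), rdkafka::error::KafkaError> {
    let admin: AdminClient<DefaultClientContext> = ClientConfig::new()
        .set("bootstrap.servers", url)
        .create()?;

    let topic = NewTopic::new("messages", 1, TopicReplication::Fixed(1));
    admin.create_topics(&[topic], &AdminOptions::new()).await?;

    Ok(())
}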

Additionally, some hosted Kafka services require SASL or SSL authentication. You can find more about the required system dependencies in the rust-rdkafka repo here. Note that if you are unable to install the dependencies, rdkafka also has feature flags for vendored versions of them.
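As a rough sketch of what that might look like against a SASL-protected broker: the security protocol, mechanism and credentials below are placeholders, so check your provider's documentation for the correct values (and you may also need rdkafka's ssl or ssl-vendored feature flag).

use rdkafka::config::ClientConfig;
use rdkafka::producer::FutureProducer;

// Sketch only: connecting a producer to a SASL/SSL-protected broker.
// The mechanism and credential values are placeholders for whatever
// your hosted Kafka provider hands you.
pub fn create_sasl_producer(url: &str, username: &str, password: &str) -> FutureProducer {
    ClientConfig::new()
        .set("bootstrap.servers", url)
        .set("security.protocol", "SASL_SSL")
        .set("sasl.mechanisms", "SCRAM-SHA-256")
        .set("sasl.username", username)
        .set("sasl.password", password)
        .set("message.timeout.ms", "5000")
        .create()
        .expect("Producer creation error")
}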

Next, we’ll want to create our AppState which will hold the FutureProducer created by the create_kafka_producer function:

// src/state.rs
use crate::kafka;
use rdkafka::producer::FutureProducer;
use shuttle_runtime::SecretStore;

#[derive(Clone)]
pub struct AppState {
    kafka_producer: FutureProducer,
}

impl AppState {
    pub fn new(secrets: &SecretStore) -> Self {
        let kafka_producer = kafka::create_kafka_producer(secrets);

        Self { kafka_producer }
    }

    pub fn producer(&self) -> &FutureProducer {
        &self.kafka_producer
    }
}

To tie this all together, we’ll add it to our main function:

#[shuttle_runtime::main]
async fn main(
    #[shuttle_shared_db::Postgres] db: PgPool,
    #[shuttle_runtime::Secrets] secrets: SecretStore,
) -> shuttle_axum::ShuttleAxum {
    sqlx::migrate!().run(&db).await.unwrap();

    let state = AppState::new(&secrets);

    let rtr = Router::new().route("/", get(hello_world)).with_state(state);

    Ok(rtr.into())
}

Using a Kafka producer

To use our producer, we’ll first need something to send. Let’s create a simple struct that will have an action, as well as a message ID, name and the message itself:

use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
struct CustomMessage {
    name: String,
    message: String,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct KafkaMessage {
    action: Action,
    message_id: i32,
    data: Option<CustomMessage>,
}

#[derive(Debug, Serialize, Deserialize)]
enum Action {
    Create,
    Update,
    Delete,
}

Here we have modeled a message that can take three different forms: a create action, an update action and a delete action. The data field has been left as an Option, as the Delete action doesn't require any data. We can then use these types in a new Axum handler function like so:

async fn send_message(
    State(state): State<AppState>,
    Json(message): Json<KafkaMessage>,
) -> Result<&'static str, ApiError> {
    let msg = serde_json::to_vec(&message)?;
    let record: FutureRecord<str, Vec<u8>> = FutureRecord::to("messages").payload(&msg).key("1");

    state.producer().send_result(record)?.await??;

    tracing::info!("Message sent with data: {message:?}");

    Ok("Message sent!")
}

This is pretty much the only endpoint we need at the moment. To use it, we add it to our router like so:

let rtr = Router::new()
     .route("/", get(hello_world))
     .route("/send", post(send_message))
     .with_state(state);

If you use cargo shuttle run to start your application (assuming Kafka is running) and run this curl command:

curl localhost:8000/send -H 'Content-Type: application/json' \
     -d '{"action":"Create","message_id":4,"data":{"name":"Josh","message":"Hello world!"}}'

You should see “Message sent!” as a response.

Using a Kafka consumer

Next, the important part: receiving our messages! As a basic example, we will spawn a Tokio task to handle this.

Here is a short example of how we can use a StreamConsumer to subscribe to a channel, then loop while waiting for the message stream to receive a message:

// src/kafka.rs
#[tracing::instrument(skip_all)]
pub async fn kafka_consumer_task(con: StreamConsumer, db: sqlx::PgPool) {
    con.subscribe(&["messages"]).expect("Failed to subscribe to topics");

    tracing::info!("Starting the consumer loop...");

    loop {
        match con.recv().await {
            Err(e) => tracing::warn!("Kafka error: {}", e),
            Ok(m) => {
                let Some(payload) = m.payload() else {
                    tracing::error!("Could not find a payload :(");
                    continue;
                };

                // here we use `from_slice()` as we initially sent it as &[u8]
                let message: KafkaMessage = match serde_json::from_slice(payload) {
                    Ok(res) => res,
                    Err(e) => {
                        // if there is a deserialization error, print an error
                        // and go to the next loop iteration
                        tracing::error!("Deserialization error: {e}");
                        continue;
                    }
                };

                // print out our payload
                tracing::info!("Got payload: {message:?}");

                let _ = con
                    .store_offset_from_message(&m)
                    .inspect_err(|e| tracing::warn!("Error while storing offset: {}", e));
            }
        };
    }
}

Currently, we just get a message and do nothing with it. However, in production when your app is communicating with other applications you probably want your consumer to do something!

Let’s do something with our message payloads by carrying out an SQL query for each action:

use crate::kafka::KafkaMessage;

#[tracing::instrument]
pub async fn create_message(message: KafkaMessage, db: &sqlx::PgPool) {
    let _ = sqlx::query!(
        "INSERT INTO MESSAGES
                           (message_id, name, message)
                            VALUES
                            ($1, $2, $3)
                            ON CONFLICT (message_id) DO NOTHING",
        message.message_id(),
        message.data().name(),
        message.data().message()
    )
    .execute(db)
    .await
    .inspect_err(|e| tracing::error!("Error while inserting message: {e}"));
}

#[tracing::instrument]
pub async fn update_message(message: KafkaMessage, db: &sqlx::PgPool) {
    let _ = sqlx::query!(
        "UPDATE MESSAGES
                            SET
                            name = $1,
                            message = $2
                            where message_id = $3",
        message.data().name(),
        message.data().message(),
        message.message_id()
    )
    .execute(db)
    .await
    .inspect_err(|e| tracing::error!("Error while updating message: {e}"));
}

#[tracing::instrument]
pub async fn delete_message(message: KafkaMessage, db: &sqlx::PgPool) {
    let _ = sqlx::query!(
        "DELETE from messages where message_id = $1",
        message.message_id()
    )
    .execute(db)
    .await
    .inspect_err(|e| tracing::error!("Error while deleting message: {e}"));
}
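
These query functions call accessor methods - message_id(), data(), name() and message() - that aren't shown in the snippets above. A minimal sketch of those getters might look like the following; it assumes CustomMessage is also made pub so the queries module can see it, and that Create/Update messages always carry a data payload:

impl KafkaMessage {
    pub fn message_id(&self) -> i32 {
        self.message_id
    }

    // Assumes Create/Update messages always carry data; panics otherwise.
    pub fn data(&self) -> &CustomMessage {
        self.data.as_ref().expect("message has no data payload")
    }
}

impl CustomMessage {
    pub fn name(&self) -> &str {
        &self.name
    }

    pub fn message(&self) -> &str {
        &self.message
    }
}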

Next, we can add a short pattern-matching snippet to our consumer task function:

tracing::info!("Got payload: {message:?}");
match message.action {
    Action::Create => queries::create_message(message, &db).await,
    Action::Update => queries::update_message(message, &db).await,
    Action::Delete => queries::delete_message(message, &db).await,
}

Now whenever we receive a message, the following should happen:

  • We attempt to get the message payload
  • We attempt to deserialize the payload to KafkaMessage
  • Depending on the action, we either create a new record, update an existing record or delete a record
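
Before moving on: the consumer loop is just a free function, so it still needs to be started somewhere. One way to wire it up - a sketch that reuses the create_kafka_consumer helper from earlier - is to spawn it as a background Tokio task in main, alongside the HTTP router:

#[shuttle_runtime::main]
async fn main(
    #[shuttle_shared_db::Postgres] db: PgPool,
    #[shuttle_runtime::Secrets] secrets: SecretStore,
) -> shuttle_axum::ShuttleAxum {
    sqlx::migrate!().run(&db).await.unwrap();

    let state = AppState::new(&secrets);

    // Run the Kafka consumer in the background, next to the HTTP server.
    let consumer = kafka::create_kafka_consumer(&secrets);
    tokio::spawn(kafka::kafka_consumer_task(consumer, db.clone()));

    let rtr = Router::new()
        .route("/", get(hello_world))
        .route("/send", post(send_message))
        .with_state(state);

    Ok(rtr.into())
}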

Beyond the basics

Introducing Change Data Capture

If you’re considering migrating from a monolith to a microservice that uses a database, there are probably a few concerns that you have:

  • How do I maintain data consistency?
  • How do I deal with out of order messages?

To solve these issues, one pattern you can leverage is Change Data Capture (or CDC). CDC typically refers to tracking changes to data in a source system - for example, a database or data warehouse - so those changes can be captured and replayed in destination systems.

CDC is useful because it allows us to track changes across a database reliably. For example, say a record was updated, and then another event of the same type arrives and updates the same field again. By tracking these changes, we know which version of the record the field should actually end up at!

How you do this typically depends on the database you're using. Since we're using Postgres in our application, we'll look at a great way to carry out Change Data Capture in Postgres: triggers.

Postgres Trigger-based Change Data Capture

Given our current Postgres migrations, we can also create a table for CDC logs that holds a generated row ID, the message ID, operation type, timestamp, pre-operation values and post-operation values:

create table if not exists messages_cdc (
    cdc_id SERIAL PRIMARY KEY,
    message_id INT,
    operation_type VARCHAR(10),
    timestamp TIMESTAMP,
    name_before VARCHAR,
    message_before VARCHAR,
    name_after VARCHAR,
    message_after VARCHAR
);

Next, we’ll need to create an SQL function that returns a trigger.

CREATE OR REPLACE FUNCTION capture_changes() RETURNS TRIGGER AS $$
BEGIN
   IF (TG_OP = 'DELETE') THEN
      -- Log DELETE operation
      INSERT INTO
         messages_cdc (message_id, operation_type, timestamp, name_before, message_before)
      VALUES
         (OLD.message_id, 'DELETE', NOW(), OLD.name, OLD.message);
   ELSIF (TG_OP = 'UPDATE') THEN
      -- Log UPDATE operation
      INSERT INTO
         messages_cdc (message_id, operation_type, timestamp, name_before, message_before, name_after, message_after)
      VALUES
         (NEW.message_id, 'UPDATE', NOW(), OLD.name, OLD.message, NEW.name, NEW.message);
   ELSIF (TG_OP = 'INSERT') THEN
      -- Log INSERT operation
      INSERT INTO
         messages_cdc (message_id, operation_type, timestamp, name_after, message_after)
      VALUES
         (NEW.message_id, 'INSERT', NOW(), NEW.name, NEW.message);
   END IF;
   RETURN NEW;
END;
$$ LANGUAGE plpgsql;

Finally, we need to create a trigger:

CREATE TRIGGER messages_trigger
AFTER INSERT OR UPDATE OR DELETE
ON MESSAGES
FOR EACH ROW
EXECUTE FUNCTION capture_changes();

And we’re done! You can put this in a new migration file, start your application up and you’ll have added both the new function and trigger to your database.

As you can see, there is a not-insignificant amount of code required to implement this, and you're also required to know some SQL beyond the fundamentals. In exchange, Postgres triggers are very reliable, comprehensive, and capture changes the instant they happen. You can also create triggers for lots of different types of events! However, this does put extra strain on the database. If you find that your database is starting to slow down, you may need to create a read-only replica to minimise resource usage on the primary.
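
To give an idea of how the CDC log might be used from the service, here's a hypothetical helper that looks up the most recent operation recorded for a message - for instance, to check whether an incoming event is stale before applying it:

use sqlx::Row;

// Hypothetical helper: fetch the most recent CDC entry for a message.
pub async fn latest_operation(
    message_id: i32,
    db: &sqlx::PgPool,
) -> Result<Option<String>, sqlx::Error> {
    let row = sqlx::query(
        "SELECT operation_type FROM messages_cdc
         WHERE message_id = $1
         ORDER BY cdc_id DESC
         LIMIT 1",
    )
    .bind(message_id)
    .fetch_optional(db)
    .await?;

    // Returns e.g. Some("UPDATE") if the message has been touched before.
    Ok(row.map(|row| row.get::<String, _>("operation_type")))
}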

Telemetry

Of course, while writing a microservice architecture, you will probably want a way to track events across your service - for example, when a payload is retrieved successfully, or when an error occurs while inserting a new record into your database. With the tracing libraries, this can be as simple as adding the #[tracing::instrument] macro to your function and then using any of the event macros. Below is our consumer task loop, fully instrumented:

#[tracing::instrument(skip(con, db))]
pub async fn kafka_consumer_task(con: StreamConsumer, db: sqlx::PgPool) {
    con.subscribe(&["messages"])
        .expect("Failed to subscribe to topics");

    tracing::warn!("Starting the consumer loop...");

    loop {
        match con.recv().await {
            Err(e) => tracing::warn!("Kafka error: {}", e),
            Ok(m) => {
                let payload = match m.payload() {
                    Some(payload) => payload,
                    None => {
                        tracing::error!("Could not find a payload :(");
                        continue;
                    }
                };

                let message: KafkaMessage = match serde_json::from_slice(payload) {
                    Ok(res) => res,
                    Err(e) => {
                        tracing::error!("Deserialization error: {e}");
                        continue;
                    }
                };

                tracing::info!("Got payload: {message:?}");
                match message.action {
                    Action::Create => queries::create_message(message, &db).await,
                    Action::Update => queries::update_message(message, &db).await,
                    Action::Delete => queries::delete_message(message, &db).await,
                }

                con.commit_message(&m, CommitMode::Async).unwrap();
            }
        };
    }
}

Note that we skip recording the consumer (and the database pool) in our traces. For a value to be recorded in a trace, it needs to implement std::fmt::Debug - which the consumer doesn't.

Rate limiting

As mentioned previously, event-driven architecture can be considered naturally more resilient than other types of microservice architecture. Because the message queue acts as a natural buffer between microservices, an event-driven service won't immediately fall over in the event of a failed message. Compare this with HTTP request-driven services, which can fail almost immediately if a request fails for any reason (unless you add retry logic).

However, this doesn't make them invincible. Naturally, we should make every effort to avoid load peaks that cause services to fall over. A few different things can cause a Kafka consumer to lag, which can eventually topple services:

  • A producer starts producing far more messages than the consumer can handle
  • Slow consumer processing
  • The consumer doesn't have enough capacity

One way to tackle these issues is rate limiting. In event-driven services, rate limiting is generally implemented using backpressure: a mechanism that signals to the upstream system to either slow down or stop producing messages. One way to implement this is to control the polling interval (max.poll.interval.ms) and disable auto-commits (the enable.auto.commit property) so that we only commit once processing is complete. This slows the consumer down, but gives Kafka much better control over how much is being processed and alleviates memory load.
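
As a rough sketch of what that could look like with rdkafka - the values here are arbitrary and should be tuned for your workload:

use rdkafka::config::ClientConfig;
use rdkafka::consumer::StreamConsumer;

// Sketch: a consumer configured for manual commits and a generous poll
// interval, so offsets are only committed once processing has finished.
pub fn create_backpressure_consumer(url: &str) -> StreamConsumer {
    ClientConfig::new()
        .set("group.id", "shuttle-kafka")
        .set("bootstrap.servers", url)
        // allow up to 10 minutes between polls before the broker evicts us
        .set("max.poll.interval.ms", "600000")
        // no auto-commit: commit explicitly after each message is processed
        .set("enable.auto.commit", "false")
        .create()
        .expect("Consumer creation failed")
}

In the consumer loop, you would then commit explicitly - for example with con.commit_message(&m, CommitMode::Async) - once a message has been fully handled, which is what the instrumented loop above already does.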

More specifically for Kafka, you can also pause and resume consumption on the consumer, as well as adjust the max.poll.records option (in clients that support it) to control how much is fetched per poll.

On the HTTP side, you may have a public-facing web service. Should you need it, HTTP rate limiting is also made easy with tower-governor. You can find more about this in our article here, where we talk about implementing a naive sliding-window rate limiter, as well as more production-ready rate limiting.

Testing

To avoid having to spin up a Kafka container manually every time you want to test, you can use Testcontainers to spin up a container without any external input. Since we already installed testcontainers and testcontainers-modules (with the kafka feature flag), we can get coding immediately. We can put this code snippet in a test:

let docker = clients::Cli::default();
let kafka_node = docker.run(kafka::Kafka::default());

let bootstrap_servers = format!("127.0.0.1:{}",
     kafka_node.get_host_port_ipv4(kafka::KAFKA_PORT)
     );

The bootstrap address points directly at the Kafka node, which we can then use to create a FutureProducer, StreamConsumer and so on.
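
For a fuller picture, here's a sketch of what an end-to-end round-trip test could look like. It assumes topic auto-creation is enabled on the test broker and uses a generous timeout, since the container can take a moment to become ready:

use std::time::Duration;

use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::Message;
use testcontainers::clients;
use testcontainers_modules::kafka;

#[tokio::test]
async fn message_roundtrips_through_kafka() {
    let docker = clients::Cli::default();
    let kafka_node = docker.run(kafka::Kafka::default());

    let bootstrap_servers = format!(
        "127.0.0.1:{}",
        kafka_node.get_host_port_ipv4(kafka::KAFKA_PORT)
    );

    let producer: FutureProducer = ClientConfig::new()
        .set("bootstrap.servers", &bootstrap_servers)
        .set("message.timeout.ms", "5000")
        .create()
        .expect("Producer creation error");

    let consumer: StreamConsumer = ClientConfig::new()
        .set("bootstrap.servers", &bootstrap_servers)
        .set("group.id", "test-group")
        .set("auto.offset.reset", "earliest")
        .create()
        .expect("Consumer creation failed");
    consumer.subscribe(&["messages"]).unwrap();

    // Produce a raw payload, then check it comes back out of the topic.
    producer
        .send(
            FutureRecord::to("messages").key("1").payload("hello"),
            Duration::from_secs(5),
        )
        .await
        .expect("message was not delivered");

    let received = tokio::time::timeout(Duration::from_secs(30), consumer.recv())
        .await
        .expect("timed out waiting for a message")
        .expect("Kafka error while receiving");

    assert_eq!(received.payload(), Some("hello".as_bytes()));
}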

testcontainers is primarily designed around isolated testing - if you want a persistent connection across several tests, you'll probably need to either reconfigure your tests or think about a different approach, for example using bollard to power your automated testing.

Deploying

To deploy, simply run cargo shuttle deploy --allow-dirty (or --ad for short) and watch the magic happen! After the first deployment, subsequent builds benefit from incremental compilation.

Finishing up

Thanks for reading! Hopefully with this article, you’ve gained a better understanding of how to use Kafka and when it would be useful to do so in an application.

This blog post is powered by shuttle - The Rust-native, open source, cloud development platform. If you have any questions, or want to provide feedback, join our Discord server!