If you have ever tried to deploy a service in the last three to four years, there’s a good chance someone has suggested using Kubernetes; however, if this is your first time hearing of Kubernetes, welcome. Kubernetes is an orchestrator that makes it easy to run large-scale containerized applications.
What makes Kubernetes really great is its extensibility. Through the API, users can build custom applications that don’t ship with Kubernetes out of the box. It's how we have great tools like ArgoCD.
If you clicked on this post, you are likely familiar with Kubernetes and looking to take things a step further. Whatever brought you here, in this article we will look at a bunch of useful operations for interacting with Kubernetes using the planet's most loved crab language, Rust.
Getting Started
To create a local Kubernetes cluster, we will use KinD, a tool that allows you to run Kubernetes in Docker, hence the name KinD. Don't forget to have Docker installed!
If you’re on Mac, you can install kind using brew:
brew install kind
For Linux users, if you have Go installed and path set correctly, you can install KinD using:
go install sigs.k8s.io/kind@v0.24.0
With that installed, you can create a new cluster by running:
kind create cluster --name krust-playground
Pre-requisites
Along with KinD, you will also need to install kubectl. If you are feeling adventurous, you can install it right away on an x86-64 Linux machine using:
curl -LO "https://dl.k8s.io/release/$(curl -L -s https://dl.k8s.io/release/stable.txt)/bin/linux/amd64/kubectl"
And for ARM machines:
curl -LO https://dl.k8s.io/release/v1.31.0/bin/linux/arm64/kubectl
Otherwise, take a look at this section of the Kubernetes docs for more specific installation instructions.
Kube-rs?
To interact with the Kubernetes API, we will need a client library. As described in the project’s readme, kube-rs is:
| A Rust client for Kubernetes in the style of a more generic client-go, a runtime abstraction inspired by controller-runtime, and a derive macro for CRDs inspired by Kubebuilder
Project Setup
Create a new cargo project:
cargo init k8s-rs
Add the following crates to your Cargo.toml
[dependencies]
tokio = { version = "1", features = ["full"] }
kube = { version = "0.95.0", features = ["runtime", "derive" , "ws"] }
k8s-openapi = { version = "0.23.0", features = ["latest"] }
serde_json = "1.0"
tracing = "0.1.37"
tokio-util = { version = "0.7.8", features = ["io"] }
tokio-stream = { version = "0.1.9", features = ["net"] }
tracing-subscriber = "0.3.17"
futures = "0.3.28"
anyhow = "1.0.71"
schemars = "0.8.12"
serde = { version = "1.0", features = ["derive"] }
yaml-rust2 = "0.8"
Connecting to Your Cluster
We’ll begin by looking at how to connect to the cluster we created earlier. By default, your connection information is stored in ~/.kube/config. To use this in Rust, we can use the following code:
use kube::{Config};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let config = Config::infer().await?;
println!(
"Connected to cluster at {:?}",
config.cluster_url.host().unwrap()
);
Ok(())
}
Using Config::infer(), kube-rs will attempt to load the connection details for the cluster from your default kubeconfig.
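To make "default kubeconfig" a little more concrete, here is a minimal, standard-library-only sketch of how a client might decide which file to load. It assumes the common convention that a non-empty KUBECONFIG environment variable takes precedence over ~/.kube/config. The function name kubeconfig_path is made up for illustration and is not part of kube-rs, whose real lookup handles more cases (such as merging several files or running inside a cluster).

```rust
use std::path::PathBuf;

/// Pick the kubeconfig file to load (hypothetical helper, not a kube-rs API).
/// `kubeconfig_env` stands in for the value of $KUBECONFIG and `home` for the
/// user's home directory; both are passed in so the logic is easy to test.
fn kubeconfig_path(kubeconfig_env: Option<&str>, home: &str) -> PathBuf {
    match kubeconfig_env {
        // An explicit, non-empty KUBECONFIG wins. The variable may list several
        // colon-separated files on Unix; this sketch only takes the first one.
        Some(value) if !value.is_empty() => {
            let first = value.split(':').next().unwrap_or(value);
            PathBuf::from(first)
        }
        // Otherwise fall back to the conventional default location.
        _ => PathBuf::from(home).join(".kube").join("config"),
    }
}

fn main() {
    let env = std::env::var("KUBECONFIG").ok();
    let home = std::env::var("HOME").unwrap_or_else(|_| ".".to_string());
    println!(
        "would load {}",
        kubeconfig_path(env.as_deref(), &home).display()
    );
}
```

Passing the environment in as arguments, rather than reading it inside the function, keeps the sketch deterministic and easy to exercise.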
Creating a Client
Using the config is great for printing out cluster information, but we can’t use it to interact with the cluster. To create a new client, we can use Client::try_default(), which calls Config::infer() under the hood and returns a new client:
use kube::{Client};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client = Client::try_default().await?;
Ok(())
}
Working with Pods
Pods are a fundamental resource in Kubernetes. To create one using kube-rs, we can use the following code:
Create a pod
use k8s_openapi::api::core::v1::Pod;
use kube::{Api, Client};
use serde_json::json;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client = Client::try_default().await?;
let pods: Api<Pod> = Api::default_namespaced(client);
// Define the Pod using a JSON spec
let pod_json = json!({
"apiVersion": "v1",
"kind": "Pod",
"metadata": {
"name": "pong-pod"
},
"spec": {
"containers": [{
"name": "pong-container",
"image": "ghcr.io/s1ntaxe770r/pong"
}]
}
});
let pod = serde_json::from_value(pod_json)?;
let pod = pods.create(&kube::api::PostParams::default(), &pod).await?;
println!("Pod created: {}", pod.metadata.name.unwrap());
Ok(())
}
This works, but as we start to work with larger objects, it can quickly get confusing, and who really likes writing JSON?
Alternatively, you can create a pod by using a struct; we will be using this method going forward.
use k8s_openapi::api::core::v1::{Container, Pod, PodSpec};
use kube::api::{ObjectMeta, PostParams};
use kube::{Api, Client};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client = Client::try_default().await?;
let pods: Api<Pod> = Api::default_namespaced(client);
let pod = Pod {
metadata: ObjectMeta {
name: Some("pong-pod".to_string()),
..Default::default()
},
spec: Some(PodSpec {
containers: vec![Container {
name: "pong-container".to_string(),
image: Some("ghcr.io/s1ntaxe770r/pong".to_string()),
..Default::default()
}],
..Default::default()
}),
..Default::default()
};
let pod = pods.create(&PostParams::default(), &pod).await;
match pod {
Ok(pod) => {
println!("created pod {}", pod.metadata.name.unwrap());
}
Err(e) => {
println!("unable to create pod {}", e)
}
}
Ok(())
}
Listing pods
Another common operation is listing pods. With the code snippet below, we can list the pods in the kube-system namespace.
use k8s_openapi::api::core::v1::Pod;
use kube::{api::ListParams, Api, Client};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client = Client::try_default().await?;
let pods: Api<Pod> = Api::namespaced(client, "kube-system");
let list_params = ListParams::default();
for pod in pods.list(&list_params).await? {
println!("Found Pod {:?}", pod.metadata.name.unwrap())
}
Ok(())
}
We can also filter which pods are returned by using labels. Labels are customizable key/value tags attached to objects, used for organization.
use k8s_openapi::api::core::v1::Pod;
use kube::{api::ListParams, Api, Client};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client = Client::try_default().await?;
let pods: Api<Pod> = Api::namespaced(client, "kube-system");
let list_params = ListParams::default().labels("component=kube-apiserver");
for pod in pods.list(&list_params).await? {
println!("Found Pod {:?}", pod.metadata.name.unwrap())
}
Ok(())
}
In this example, only pods with the label component=kube-apiserver will be returned.
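To illustrate what an equality-based label selector such as component=kube-apiserver means, here is a small standard-library-only sketch. In practice the API server does the filtering for you; the function matches_selector is a made-up name for illustration, and it only handles comma-separated key=value requirements, not set-based selectors.

```rust
use std::collections::BTreeMap;

/// Returns true when `labels` satisfies every `key=value` pair in `selector`.
/// Only the equality form is handled; set-based selectors such as
/// `env in (a,b)` are out of scope for this sketch.
fn matches_selector(labels: &BTreeMap<String, String>, selector: &str) -> bool {
    selector
        .split(',')
        .map(str::trim)
        .filter(|req| !req.is_empty())
        .all(|req| match req.split_once('=') {
            Some((key, value)) => {
                labels.get(key.trim()).map(String::as_str) == Some(value.trim())
            }
            // Anything that is not a key=value requirement is treated as a miss.
            None => false,
        })
}

fn main() {
    let mut labels = BTreeMap::new();
    labels.insert("component".to_string(), "kube-apiserver".to_string());
    labels.insert("tier".to_string(), "control-plane".to_string());

    println!(
        "matches: {}",
        matches_selector(&labels, "component=kube-apiserver")
    );
}
```

Every requirement in the selector must hold, so adding more comma-separated pairs can only narrow the result.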
Updating Pods
While creating and listing pods are everyday tasks, you'll often need to update existing pods. Let's look at an example of adding labels to a pod.
First, let's add a label to our existing pong-pod:
use k8s_openapi::api::core::v1::Pod;
use kube::{Api, Client};
use kube::api::{Patch, PatchParams};
use serde::{Serialize, Deserialize};
#[derive(Serialize, Deserialize,Debug)]
struct PodPatch {
metadata: PodMetadataPatch,
}
#[derive(Serialize, Deserialize,Debug)]
struct PodMetadataPatch {
labels: std::collections::BTreeMap<String, String>,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client = Client::try_default().await?;
let pods: Api<Pod> = Api::default_namespaced(client);
let mut new_labels = std::collections::BTreeMap::new();
new_labels.insert("environment".to_string(), "production".to_string());
let patch = PodPatch {
metadata: PodMetadataPatch {
labels: new_labels,
},
};
let params = PatchParams::default();
let patched_pod = pods.patch("pong-pod", &params, &Patch::Merge(&patch)).await?;
println!("Pod updated with new label: {:?}", patched_pod.metadata.labels);
Ok(())
}
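The example above sends a merge patch, so it helps to know how such a patch combines with what is already on the object. The sketch below models JSON merge patch semantics for a flat label map using only the standard library: a value sets or overwrites a key, a null (modelled here as None) removes it, and unmentioned keys are left alone. The merge_labels function is hypothetical and only mimics what the API server does on its side.

```rust
use std::collections::BTreeMap;

/// Apply a merge-patch style update to a label map (illustrative only).
/// `Some(value)` sets or overwrites a key, `None` (JSON null) removes it,
/// and keys not mentioned in the patch are left untouched.
fn merge_labels(
    existing: &mut BTreeMap<String, String>,
    patch: BTreeMap<String, Option<String>>,
) {
    for (key, value) in patch {
        match value {
            Some(v) => {
                existing.insert(key, v);
            }
            None => {
                existing.remove(&key);
            }
        }
    }
}

fn main() {
    let mut labels = BTreeMap::new();
    labels.insert("app".to_string(), "pong".to_string());

    let mut patch = BTreeMap::new();
    patch.insert("environment".to_string(), Some("production".to_string()));

    merge_labels(&mut labels, patch);
    println!("{:?}", labels);
}
```

This is why patching in the environment label does not wipe out labels the pod already had.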
Deleting Pods
Finally, we can delete pods using the following code:
use k8s_openapi::api::core::v1::Pod;
use kube::api::DeleteParams;
use kube::{Api, Client};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client = Client::try_default().await?;
let pods: Api<Pod> = Api::default_namespaced(client);
let delete_params = DeleteParams::default();
pods.delete("pong-pod", &delete_params).await?;
println!("deleted pod");
Ok(())
}
Interacting with running Pods
Switching gears a little, let’s explore how to interact with running pods.
Fetching logs from a Pod
Often, you'll want to see what's happening inside a pod to debug issues or monitor application behavior. We can mimic the functionality of kubectl logs using kube-rs. Here's a snippet showing how to fetch logs from the API server:
use k8s_openapi::api::core::v1::{Container, ContainerPort, Pod, PodSpec};
use kube::api::PostParams;
use kube::{Api, Client, ResourceExt};
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Initialize the Kubernetes client
let client = Client::try_default().await?;
let pods: Api<Pod> = Api::default_namespaced(client);
// Define the Pod with port 80 exposed
let pod = Pod {
metadata: kube::api::ObjectMeta {
name: Some("nginx-example".to_string()),
..Default::default()
},
spec: Some(PodSpec {
containers: vec![Container {
name: "nginx".to_string(),
image: Some("nginx:latest".to_string()),
ports: Some(vec![ContainerPort {
container_port: 80,
..Default::default()
}]),
..Default::default()
}],
..Default::default()
}),
..Default::default()
};
// Create the Pod
let pod = pods.create(&PostParams::default(), &pod).await?;
println!("Created pod: {}", pod.name_any());
// Wait for the Pod to be ready
let timeout = Duration::from_secs(60);
let start = std::time::Instant::now();
loop {
let pod = pods.get("nginx-example").await?;
let status = pod.status.as_ref().expect("Pod status should be available");
let Some(phase) = &status.phase else {
if start.elapsed() > timeout {
return Err("Timed out waiting for pod to be ready".into());
}
sleep(Duration::from_secs(1)).await;
continue;
};
if phase == "Running" {
println!("Pod is running");
break;
}
if start.elapsed() > timeout {
return Err("Timed out waiting for pod to be ready".into());
}
sleep(Duration::from_secs(1)).await;
}
// Fetch logs
let logs = pods.logs("nginx-example", &Default::default()).await?;
println!("Pod logs:\n{}", logs);
Ok(())
}
This example demonstrates a slightly more realistic scenario of working with Pods in a Kubernetes cluster. We create a Pod running Nginx, wait for it to be ready, and then fetch its logs.
After creating the Pod, we use the status field to determine if the Pod is running:
let Some(phase) = &status.phase else {
if start.elapsed() > timeout {
return Err("Timed out waiting for pod to be ready".into());
}
sleep(Duration::from_secs(1)).await;
continue;
};
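The wait loop above is an instance of a general "poll until ready or give up" pattern. As a rough sketch of that pattern on its own, here is a synchronous, standard-library-only helper. The name poll_until and the closure standing in for the pods.get(...) call are assumptions made for illustration; the article's async version uses tokio's sleep instead of blocking the thread.

```rust
use std::thread::sleep;
use std::time::{Duration, Instant};

/// Call `check` every `interval` until it returns `Some(value)` or `timeout`
/// elapses. Hypothetical helper that mirrors the shape of the wait loop.
fn poll_until<T>(
    timeout: Duration,
    interval: Duration,
    mut check: impl FnMut() -> Option<T>,
) -> Result<T, String> {
    let deadline = Instant::now() + timeout;
    loop {
        if let Some(value) = check() {
            return Ok(value);
        }
        if Instant::now() >= deadline {
            return Err(format!("timed out after {:?}", timeout));
        }
        sleep(interval);
    }
}

fn main() {
    // Stand-in for fetching the pod: reports "Running" on the third poll.
    let mut polls = 0;
    let phase = poll_until(Duration::from_secs(5), Duration::from_millis(10), || {
        polls += 1;
        if polls >= 3 {
            Some("Running")
        } else {
            None
        }
    });
    println!("{:?} after {} polls", phase, polls);
}
```

Keeping the timeout check in one place avoids repeating it in every branch of the loop.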
Port forwarding to a Pod
Sometimes, you need to interact directly with a service running in a pod. We can replicate the functionality of kubectl port-forward. Here's a stripped-down version of port forwarding, adapted from the kube-rs repository:
First, let's set up the Kubernetes client and wait for our pod to be running:
use anyhow::Context;
use std::net::SocketAddr;
use k8s_openapi::api::core::v1::Pod;
use kube::{
api::Api,
runtime::wait::{await_condition, conditions::is_pod_running},
Client, ResourceExt,
};
use tracing::*;
use futures::StreamExt;
use futures::TryStreamExt;
use tokio::net::{TcpListener, TcpStream};
use tokio_stream::wrappers::TcpListenerStream;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt::init();
let client = Client::try_default().await?;
let pods: Api<Pod> = Api::default_namespaced(client);
// Get the Pod
let p = pods.get("nginx-example").await?;
info!("Found pod: {}", p.name_any());
// Wait until the pod is running
let running = await_condition(pods.clone(), "nginx-example", is_pod_running());
tokio::time::timeout(std::time::Duration::from_secs(60), running).await??;
info!("Pod is running");
}
Next, we'll set up the local address to listen on and the pod port to forward traffic to:
// ... (in main function)
let addr = SocketAddr::from(([127, 0, 0, 1], 8080));
let pod_port = 80;
info!(local_addr = %addr, pod_port, "forwarding traffic to the pod");
info!(
"try opening http://{0} in a browser, or `curl http://{0}`",
addr
);
info!("use Ctrl-C to stop the server and delete the pod");
Using a TcpListenerStream, we set up a local TCP server that listens for incoming connections:
let server = TcpListenerStream::new(TcpListener::bind(addr).await?)
.take_until(tokio::signal::ctrl_c())
.try_for_each(|client_conn| async {
if let Ok(peer_addr) = client_conn.peer_addr() {
info!(%peer_addr, "new connection");
}
let pods = pods.clone();
tokio::spawn(async move {
if let Err(e) = forward_connection(&pods, "nginx-example", 80, client_conn).await {
error!(
error = e.as_ref() as &dyn std::error::Error,
"failed to forward connection"
);
}
});
Ok(())
});
if let Err(e) = server.await {
error!(error = &e as &dyn std::error::Error, "server error");
}
info!("Shutting down");
Finally, let's look at the function that forwards the connection:
async fn forward_connection(
pods: &Api<Pod>,
pod_name: &str,
port: u16,
mut client_conn: TcpStream,
) -> anyhow::Result<()> {
let mut forwarder = pods.portforward(pod_name, &[port]).await?;
let mut upstream_conn = forwarder
.take_stream(port)
.context("port not found in forwarder")?;
tokio::io::copy_bidirectional(&mut client_conn, &mut upstream_conn).await?;
drop(upstream_conn);
forwarder.join().await?;
info!("connection closed");
Ok(())
}
Working with Custom Resources
Kubernetes has an interesting concept called custom resources. These allow you to extend the Kubernetes API and add new resources. This by itself isn’t very descriptive, so let’s take things a step further with an example.
Say you wanted to inform Kubernetes about a new resource called crustacean. By default, if you run kubectl get crustacean, you will get an error saying error: the server doesn't have a resource type "crustacean". To create the resource, start out by defining a new custom resource:
use kube::CustomResource;
use kube::ResourceExt;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
#[derive(CustomResource, Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[kube(
group = "experiments.gopherlabs.io",
version = "v1",
kind = "Crustacean",
printcolumn = r#"
{"name": "Habitat", "type": "string", "jsonPath": ".spec.habitat"}
"#
)]
// The resource's name lives in its metadata, so the spec only carries species and habitat
pub struct CrustaceanSpec {
pub species: String,
pub habitat: Option<String>,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
Ok(())
}
We can generate a manifest using the following code:
use kube::CustomResource;
use kube::CustomResourceExt;
use schemars::schema_for;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use std::fs::File;
use std::io::Write;
use yaml_rust2::{YamlEmitter, YamlLoader};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let larry = Crustacean::new(
"larry",
CrustaceanSpec {
species: "lobster".to_string(),
habitat: Some("Atlantic Ocean".to_string()),
},
);
// Serialize the Crustacean instance to JSON first
let crd_json = serde_json::to_string(&larry)?;
// Parse the JSON to yaml_rust2::Yaml
let docs = YamlLoader::load_from_str(&crd_json)?;
let doc = &docs[0];
// Emit YAML
let mut out_str = String::new();
{
let mut emitter = YamlEmitter::new(&mut out_str);
emitter.dump(doc)?;
}
// Write the CRD to a YAML file
let mut file = File::create("crustacean_crd.yaml")?;
file.write_all(out_str.as_bytes())?;
println!("\nCRD written to crustacean_crd.yaml");
Ok(())
}
Running the above code will produce the following manifest:
apiVersion: experiments.gopherlabs.io/v1
kind: Crustacean
metadata:
name: larry
spec:
species: lobster
habitat: Atlantic Ocean
Great, but Kubernetes still has no idea how to handle this resource. For that, we need to register the custom resource definition (CRD) with the cluster and then apply the manifest:
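use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition;
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
use kube::{
api::{ListParams, PostParams},
Api, Client, CustomResource, CustomResourceExt,
};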
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use std::fs;
use yaml_rust2::YamlLoader;
impl Crustacean {
pub fn new_crustacean(species: String, habitat: Option<String>, name: String) -> Self {
Crustacean {
metadata: ObjectMeta {
name: Some(name),
..Default::default()
},
spec: CrustaceanSpec { species, habitat },
}
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create a client to interact with the Kubernetes API
let client = Client::try_default().await?;
// Make sure the CRD is registered before creating instances of it
ensure_crd(&client).await?;
// Read the Crustacean instance from the YAML file
let crustacean_yaml = fs::read_to_string("crustacean_crd.yaml")?;
let yaml_docs = YamlLoader::load_from_str(&crustacean_yaml)?;
let yaml_doc = &yaml_docs[0];
// Extract values from YAML
let name = yaml_doc["metadata"]["name"].as_str().unwrap_or("").to_string();
let species = yaml_doc["spec"]["species"]
.as_str()
.unwrap_or("")
.to_string();
let habitat = yaml_doc["spec"]["habitat"].as_str().map(String::from);
// Create Crustacean instance
let crustacean = Crustacean::new_crustacean(species, habitat, name);
// Apply the Crustacean instance
let crustaceans: Api<Crustacean> = Api::all(client.clone());
crustaceans
.create(&PostParams::default(), &crustacean)
.await?;
println!("Crustacean instance applied successfully");
Ok(())
}
There are a couple of things going on in the snippet above. The important parts:
- Using the std::fs module, we load the YAML file from the current directory.
- We introduce a helper method called new_crustacean to help with some of the setup for the Crustacean instance. After extracting the values from the YAML file, we pass the parameters to the new_crustacean function and apply the manifest.
- The ensure_crd function checks if the CRD exists in the cluster before applying the manifest. This is essential because Kubernetes will try to validate the manifest we apply against the CRD schema.
async fn ensure_crd(client: &Client) -> Result<(), Box<dyn std::error::Error>> {
let crds: Api<CustomResourceDefinition> = Api::all(client.clone());
let lp = ListParams::default().fields("metadata.name=crustaceans.experiments.gopherlabs.io");
let existing_crds = crds.list(&lp).await?;
if existing_crds.items.is_empty() {
println!("CRD not found. Creating new CRD.");
let crd = Crustacean::crd();
crds.create(&PostParams::default(), &crd).await?;
println!("CRD created successfully")
} else {
println!("CRD already exists. Skipping creation.");
}
Ok(())
}
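The field selector in ensure_crd relies on how CRDs are named: a CustomResourceDefinition's metadata.name has the form <plural>.<group>, while instances carry an apiVersion of <group>/<version>. The sketch below shows how those strings are put together for the Crustacean example. The helper names are made up for illustration, and naive_plural is a deliberately simple assumption (lowercase plus "s"), not the pluralization logic kube-rs itself uses.

```rust
/// Build the name a CustomResourceDefinition must have: `<plural>.<group>`.
fn crd_name(plural: &str, group: &str) -> String {
    format!("{plural}.{group}")
}

/// Naive pluralization for illustration only: lowercase the kind, append "s".
fn naive_plural(kind: &str) -> String {
    format!("{}s", kind.to_lowercase())
}

/// The `apiVersion` field of an instance: `<group>/<version>`.
fn api_version(group: &str, version: &str) -> String {
    format!("{group}/{version}")
}

fn main() {
    let group = "experiments.gopherlabs.io";
    println!("{}", crd_name(&naive_plural("Crustacean"), group));
    println!("{}", api_version(group, "v1"));
}
```

If you change the group or kind in the derive attributes, the name used in the field selector has to change with it.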
At this point, you should be able to run:
kubectl get crustaceans
The output is similar to:
NAME HABITAT
larry Atlantic Ocean
Watching for Resource Changes
So we've got our Crustacean CRD up and running. But creating resources is only half the battle. More often, you want to react to events in the cluster. This is where watchers come in: as the name suggests, watchers are used to monitor specific events.
In our case, we'll monitor for new Crustacean resources and create a pod for each one.
use futures::{stream, StreamExt, TryStreamExt};
use k8s_openapi::{api::core::v1::Pod, apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition};
use kube::{api::{PostParams, ListParams}, runtime::{watcher, WatchStreamExt}, Api, Client, CustomResource, CustomResourceExt, runtime::wait::{await_condition, conditions::is_pod_running},};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use tracing::*;
use tokio::time::Duration;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client = Client::try_default().await?;
tracing_subscriber::fmt::init();
// Watch for Crustacean resources
let crustaceans: Api<Crustacean> = Api::all(client.clone());
let lp = ListParams::default();
let mut crustacean_watcher = watcher(crustaceans, watcher::Config::default()).applied_objects().boxed();
// Watch loop: create a pod when a Crustacean resource is applied
while let Some(crustacean) = crustacean_watcher.try_next().await? {
let crustacean_name = crustacean.metadata.name.unwrap();
let pod_name = crustacean_name.clone();
info!("got crustacean {} , creating pod....",crustacean_name.clone());
// Create a pod with the same name as the Crustacean resource
let pods: Api<Pod> = Api::namespaced(client.clone(), "default");
let pod = Pod {
metadata: kube::api::ObjectMeta {
name: Some(crustacean_name.clone()),
..Default::default()
},
spec: Some(k8s_openapi::api::core::v1::PodSpec {
containers: vec![k8s_openapi::api::core::v1::Container {
name: "crustacean-container".to_string(),
image: Some("nginx".to_string()),
..Default::default()
}],
..Default::default()
}),
..Default::default()
};
pods.create(&PostParams::default(), &pod).await?;
let running = await_condition(pods.clone(), &pod_name, is_pod_running());
tokio::time::timeout(Duration::from_secs(60), running).await??;
println!("Pod '{}' created for Crustacean resource.", crustacean_name);
}
Ok(())
}
The key part of the snippet above is this:
let mut crustacean_watcher = watcher(crustaceans, watcher::Config::default())
.applied_objects()
.boxed();
We set up a watcher to monitor the Crustacean custom resource. The applied_objects() method ensures that we only see resources that have been applied (created or updated); it filters out other events such as deletions.
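As a rough mental model of that filtering step, the sketch below uses a simplified, hypothetical Event enum in place of the richer event type that kube-rs watchers actually yield, and plain vectors in place of an async stream. It only aims to show the idea of keeping applied objects and dropping deletions.

```rust
/// Simplified, hypothetical stand-in for the events a watcher yields.
#[derive(Debug, Clone, PartialEq)]
enum Event {
    Applied(String),
    Deleted(String),
}

/// Keep only the objects that were created or updated, dropping deletions.
fn applied_only(events: Vec<Event>) -> Vec<String> {
    events
        .into_iter()
        .filter_map(|event| match event {
            Event::Applied(name) => Some(name),
            Event::Deleted(_) => None,
        })
        .collect()
}

fn main() {
    let events = vec![
        Event::Applied("larry".to_string()),
        Event::Deleted("sebastian".to_string()),
        Event::Applied("larry".to_string()), // an update shows up as another apply
    ];
    println!("{:?}", applied_only(events));
}
```

Note that an update to an existing object arrives as another applied event, so code reacting to the stream should expect to see the same name more than once.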
In the while loop, we watch for events in the stream provided by the crustacean_watcher. For each Crustacean resource that the watcher detects, we create a Pod.
Calling await_condition(pods.clone(), &pod_name, is_pod_running()) creates a future that waits for the condition where the Pod is running. is_pod_running() is a predefined condition function that checks the Pod's status to see if it's in the "Running" state.
Running the example should result in the following output:
warning: `k8s-rs` (bin "k8s-rs") generated 4 warnings (run `cargo fix --bin "k8s-rs"` to apply 3 suggestions)
Finished `dev` profile [unoptimized + debuginfo] target(s) in 2.42s
Running `target/debug/k8s-rs`
2024-09-23T20:59:43.549885Z INFO k8s_rs: got crustacean larry , creating pod....
Pod 'larry' created for Crustacean resource.
And if we run kubectl get pods:
NAME READY STATUS RESTARTS AGE
app-deployment-6c8d544675-rn56b 1/1 Running 0 30h
larry 1/1 Running 0 9s
nginx-example 1/1 Running 0 19h
patch-add-ephemeral-container 1/1 Running 64 (8m51s ago) 45h
pong-deployment-84d87cb454-tqct9 2/2 Running 0 30h
Sure enough, Larry should be there.
Summary
If you made it to this part, you are probably really interested in kube-rs. While the library is pretty neat, Kubernetes has a LOT of APIs and can quickly get confusing. When working with kube-rs or Kubernetes in general, here are some things to keep in mind.
Know when to use a controller or an operator.
These concepts have some overlap and can be very confusing, which is why I avoided mentioning them so as not to blow this out of scope. Luckily, people far smarter than me have written on this topic. Check out Ivan Velichko’s guide on Kubernetes operators. Looking for something with more Rust? The folks at Metalbear have an entire guide on creating an operator using kube-rs.
Examples don’t bite
The maintainers of kube-rs did an amazing job of creating several examples of using the crate. This guide is not an exhaustive list of all its features; some of the examples used here were adapted from the repo.
Using Rust to interact with Kubernetes can be quite refreshing, especially if you come from the Kubebuilder world. Creating a CRD by adding a macro is extremely powerful, and in my experience thus far, I have written less code to achieve common functionality. If you’re curious about admission controllers, check out the guide I wrote here.
Conclusion
Of course, you don’t need Kubernetes for everything. If you’re interested in something simpler, Shuttle is your one-stop shop for easy Rust deployments. Create (or migrate) your project, use shuttle deploy, and watch the magic happen!