Data Parallelism with Rust and Rayon

Cover image

Hello world! Today we’ll be talking about using the Rayon crate to speed up synchronous data processing. Rayon is a library crate that allows you to ergonomically parallelize your computations with high-level methods. You can also use the more low level API to divide the work up yourself!

What makes rayon good?

Parallel execution is hard. When writing a library that can carry out parallel execution, you are likely to induce a significantly difficult-to-fix bug or a data race. Rayon’s APIs guarantee data race freedom and use an ergonomic API via traits that means if your code compiles normally, it will still do mostly the same thing as it did before. If your iterators have side effects however, they may occur in a different order.

Rayon’s core primitive is called join, which essentially just “joins” two functions in FnOnce closures that may or may not run in parallel, depending on if there is an idle core available. This approach of potential parallelism rather than forced parallelism can also provide an upside to performance as there are times when sequential work can be more value. It is also difficult to predict when parallelism is a good thing, which adds to the complexity.

The join function is implemented using work stealing (like the Tokio async runtime). Rayon uses a global thread pool to be able to take advantage of this. Essentially this means that if you have a thread doing work that finishes, it will then look for other units of work. For example, let’s have task A and task B. A thread might execute task A, while adding B to a local queue of work to be done. Threads from the thread pool will actively look for work to execute - so another idle thread from the thread pool might try to execute task B.

The great thing about join is that the way it is set up is inherently safe. Let’s take a look at the below code which doesn’t compile and if it did, would cause bad things to happen:

fn share_rc<T:PartialOrd+Send>(rc: Rc<i32>) {
    // In the closures below, the calls to `clone` increment the
    // reference count. These calls MIGHT execute in parallel.
    // Would not be good!
    rayon::join(|| something(rc.clone()),
                || something(rc.clone()));
}

You can’t have two closures that are simultaneously in scope and access the same &mut data. &mut types themselves can only be borrowed once; by trying to use it over both closures, we’ve violated this rule. Additionally, the Rc type doesn’t actually implement the Send marker trait - which is required to send values across threads!

Using Rayon

Getting started

To get started, add rayon to your Rust application:

cargo add rayon

Parallelizing array work

The simplest way to use Rayon is to convert your iterators into parallel iterators. If you’re using .iter(), you simply change the method to .par_iter() and you’re done! No change required.

use rayon::prelude::*;

fn sum_of_squares(input: &[i32]) -> i32 {
    input.par_iter()
         .map(|i| i * i)
         .sum()
}

You can also parallelise extending a vector or array by using the ParallelExtend trait. This also requires IntoParallelIterator, the parallel version of the IntoIterator trait from the standard library.

If you require indexes, you can use the IndexParallelIterator trait. It supports random access, allowing you to split an array at arbitrary indices and draw data from a given point of your choosing.

Turn an iterator into a parallel iterator

Some types may implement Iterator but are otherwise impossible or extremely difficult to implement ParallelIterator for. This is where the par_bridge() function comes in. By using this function on an iterator, it lets you bridge an Iterator type to IterBridge (which then allows conversion to ParallelIterator). A quick example might look something like this:

use rayon::iter::ParallelBridge;
use rayon::prelude::ParallelIterator;
use std::sync::mpsc::channel;

let rx = {
    let (tx, rx) = channel();

    tx.send("one!");
    tx.send("two!");
    tx.send("three!");

    rx
};

let mut output: Vec<&'static str> = rx.into_iter().par_bridge().collect();
output.sort_unstable();

assert_eq!(&*output, &["one!", "three!", "two!"]);

Note that while the final iterator type generated from this is generally not as good as implementing ParallelIterator yourself, it can be a great deal faster than purely sequential work.

Using your own thread pool

For even more low level work, you may want to use your own thread pool for work or customize the global thread pool for rayon. A quick way to get started would be using it like this:

// as a variable
let pool = rayon::ThreadPoolBuilder::new().num_threads(8).build().unwrap();

// globally
rayon::ThreadPoolBuilder::new().num_threads(8).build_global().unwrap();

When using the ThreadPool, you can do a few things:

  • Carry out a function on every single thread using .broadcast()
  • Use .join() to start some parallel work
  • Use install() which takes a join type

Interested in learning more? You can find more about the ThreadPool type here.

Use cases for rayon

Log analysis

Text processing is a great area where Rayon can provide a big performance bonus! Here we can analyse a log file that has been extracted from somewhere by filtering for any lines that contain the word “ERROR” in uppercase.

use std::fs::read_to_string;
use rayon::iter::{IntoParallelIterator, IntoParallelRefIterator, ParallelIterator};

fn main() {
  // read a csv file to a string
    let my_string = read_to_string("my_file.txt").unwrap();

  let my_vec = my_string.lines().collect::<Vec<&str>>();
  let my_vec: Vec<String> = my_vec.into_par_iter()
      .filter(|x| x.contains("ERROR"))
      .map(|x| x.to_owned())
      .collect();

  // this should now print a vec of vecs
  // where every single value is the "Hello world!" string
  println!("{:?}", my_vec);

}

CSV processing

Of course, where there’s mountains of data, data parallelism will always be a good thing. You can parse a file, split it then parallelise the reading. In the below example, we parse a CSV file and turn every value into a “Hello world!” string:

use rayon::iter::{IntoParallelIterator, IntoParallelRefIterator, ParallelIterator};
use std::fs::read_to_string;
use std::str::Split;

fn main() {
    // read a csv file to a string
    let my_string = read_to_string("my_file.csv").unwrap();

    let my_vec: Vec<String> = my_string
        .lines()
        .filter(|x| *x != String::new())
        .map(|x| x.to_string())
        .collect();
    println!("{my_vec:?}");

    let my_vec: Vec<Vec<String>> = my_vec
        .into_par_iter()
        .map(|x| {
            let x = x.split(",").map(|x| x.to_string()).collect::<Vec<String>>();
            let res: Vec<String> = x
                .into_par_iter()
                .map(|_| "Hello world!".to_string())
                .collect();
            res
        })
        .collect();

    // this should now print a vec of vecs
    // where every single value is the "Hello world!" string
    println!("{:?}", my_vec);
}

For many use cases, the more sensible option here most of the time would be to use the csv crate. However, that doesn’t stop us from implementing our own CSV parser - particularly if we’re not using the serde crate to deserialize and serialize our CSV records.

Pitfalls

Rayon is a very powerful crate. However, there are some pitfalls that can happen while using it if you’re not careful.

Blocking rayon threads (deadlocks)

Work done in parallel rayon threads that block each others’ threads are something you absolutely want to avoid here. One example of this may be a mutex that gets locked on every iteration.

fn main() {
let arcmutex = Arc::new(Mutex::new(0));

    let res = (0..5000).par_iter().for_each|x| {
        let mut mutex_locked = arcmutex.lock().unwrap();
        mutex_locked += x;
    }.collect();

println!("{res}");
}

Here, you can see that the mutex gets locked on every iteration! That’s a terrible way to do it. Clearly, we need a better way. We can avoid this totally by simply collecting the parallelized work, and then adding it to our wrapped number:

fn main() {
let arcmutex = Arc::new(Mutex::new(0));

let res = (0..5000).par_iter().sum();
let mut locked_mutex = arcmutex.lock().unwrap();
*locked_mutex += res;

println!("{locked_mutex:?}");
}

Workload too small

If your workload is too small, the overhead from using the rayon crate can neutralise any performance gains. This is particularly because of the thread pool management and dealing with work-stealing. However, whether this matters to you will typically depend on your use case.

Finishing up

Thanks for reading! Rayon is an awesome crate to help you speed up your data processing.

Read more:

This blog post is powered by shuttle - The Rust-native, open source, cloud development platform. If you have any questions, or want to provide feedback, join our Discord server!
Share article
rocket

Build the Future of Backend Development with us

Join the movement and help revolutionize the world of backend development. Together, we can create the future!