Hello world! Today we’ll be talking about using the Rayon crate to speed up synchronous data processing. Rayon is a library that lets you ergonomically parallelize your computations with high-level methods. You can also use the lower-level API to divide the work up yourself!
What makes rayon good?
Parallel execution is hard. When writing code that runs in parallel, it is easy to introduce bugs that are significantly difficult to fix, such as data races. Rayon’s APIs guarantee data-race freedom, and their ergonomic, trait-based design means that if your code compiles, it will still do mostly the same thing as it did before. If your iterators have side effects, however, those effects may occur in a different order.
Rayon’s core primitive is called `join`, which essentially just “joins” two `FnOnce` closures that may or may not run in parallel, depending on whether there is an idle core available. This approach of potential parallelism rather than forced parallelism can also be an upside for performance, as there are times when sequential work is more valuable. It is also difficult to predict when parallelism is a good thing, which adds to the complexity.
The `join` function is implemented using work stealing (like the Tokio async runtime). Rayon uses a global thread pool to take advantage of this. Essentially, this means that a thread that finishes its work will then look for other units of work to pick up. For example, say we have task A and task B. A thread might execute task A while adding B to a local queue of work to be done. Threads from the thread pool actively look for work to execute, so another idle thread might pick up task B and run it.
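As a rough sketch (the 1,000-element cutoff is an arbitrary choice here), a recursive parallel sum built on `join` might look like this:

fn parallel_sum(slice: &[i32]) -> i32 {
    // Small inputs aren't worth splitting any further.
    if slice.len() <= 1_000 {
        return slice.iter().sum();
    }
    let (left, right) = slice.split_at(slice.len() / 2);
    // The two closures MAY run on different threads, if a worker is idle.
    let (a, b) = rayon::join(|| parallel_sum(left), || parallel_sum(right));
    a + b
}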
The great thing about `join` is that the way it is set up is inherently safe. Let’s take a look at the code below, which doesn’t compile (and if it did, would cause bad things to happen):
use std::rc::Rc;

fn share_rc(rc: Rc<i32>) {
    // In the closures below, the calls to `clone` increment the
    // reference count. These calls MIGHT execute in parallel.
    // Would not be good!
    rayon::join(|| something(rc.clone()),
                || something(rc.clone()));
}
You can’t have two closures that are simultaneously in scope and access the same `&mut` data; a `&mut` reference can only be borrowed once, so trying to use one across both closures would violate this rule. Additionally, the `Rc` type doesn’t implement the `Send` marker trait, which is required to send values across threads!
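For contrast, here’s a minimal sketch of the same shape of code that does compile, swapping `Rc` for `Arc` (which is `Send`); the `something` function is just a stand-in for real work:

use std::sync::Arc;

fn something(n: Arc<i32>) -> i32 {
    *n * 2
}

fn share_arc(arc: Arc<i32>) {
    // Arc's reference count is atomic, so cloning it from two
    // threads at once is safe and the compiler accepts it.
    rayon::join(|| something(arc.clone()), || something(arc.clone()));
}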
Using Rayon
Getting started
To get started, add rayon
to your Rust application:
cargo add rayon
Parallelizing array work
The simplest way to use Rayon is to convert your iterators into parallel iterators. If you’re using `.iter()`, you simply change the method to `.par_iter()` and you’re done! No other changes required.
use rayon::prelude::*;

fn sum_of_squares(input: &[i32]) -> i32 {
    input.par_iter()
         .map(|i| i * i)
         .sum()
}
You can also parallelise extending a vector or other collection by using the `ParallelExtend` trait. This also requires `IntoParallelIterator`, the parallel version of the `IntoIterator` trait from the standard library.
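A rough sketch of what that can look like (the range and filter here are arbitrary examples):

use rayon::prelude::*;

fn main() {
    let mut evens: Vec<i32> = Vec::new();
    // ParallelExtend lets an existing collection absorb the results
    // of a parallel iterator, much like Extend does for sequential ones.
    evens.par_extend((0..1_000).into_par_iter().filter(|x| x % 2 == 0));
    assert_eq!(evens.len(), 500);
}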
If you require indexes, you can use the `IndexedParallelIterator` trait. It supports random access, allowing you to split the data at arbitrary indices and draw from a given point of your choosing.
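As a small illustration, methods like `enumerate` are only available on indexed parallel iterators, since they rely on knowing each element’s position:

use rayon::prelude::*;

fn main() {
    let data = vec![10, 20, 30, 40, 50];
    // par_iter() over a slice is indexed, so enumerate() is available.
    let indexed: Vec<(usize, i32)> = data.par_iter()
        .copied()
        .enumerate()
        .collect();
    assert_eq!(indexed[2], (2, 30));
}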
Turn an iterator into a parallel iterator
Some types may implement `Iterator` but are otherwise impossible or extremely difficult to implement `ParallelIterator` for. This is where the `par_bridge()` function comes in. Calling it on an `Iterator` wraps it in an `IterBridge`, which implements `ParallelIterator`. A quick example might look something like this:
use rayon::iter::ParallelBridge;
use rayon::prelude::ParallelIterator;
use std::sync::mpsc::channel;
let rx = {
    let (tx, rx) = channel();

    tx.send("one!").unwrap();
    tx.send("two!").unwrap();
    tx.send("three!").unwrap();

    rx
};
let mut output: Vec<&'static str> = rx.into_iter().par_bridge().collect();
output.sort_unstable();
assert_eq!(&*output, &["one!", "three!", "two!"]);
Note that while the parallel iterator produced this way is generally not as fast as implementing `ParallelIterator` yourself, it can still be a great deal faster than purely sequential work.
Using your own thread pool
For even more low-level control, you may want to use your own thread pool, or customize the global thread pool that `rayon` uses. A quick way to get started looks like this:
// as a variable
let pool = rayon::ThreadPoolBuilder::new().num_threads(8).build().unwrap();
// globally
rayon::ThreadPoolBuilder::new().num_threads(8).build_global().unwrap();
When using the `ThreadPool`, you can do a few things:
- Carry out a function on every single thread in the pool using `.broadcast()`
- Use `.join()` to start some parallel work
- Use `.install()`, which takes a closure and runs it inside the pool, so any Rayon work spawned within it uses that pool's threads (see the sketch below)
Interested in learning more? You can find more about the `ThreadPool` type here.
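Here’s a minimal sketch of `install` in action, using a hypothetical four-thread pool separate from the global one:

use rayon::prelude::*;

fn main() {
    let pool = rayon::ThreadPoolBuilder::new()
        .num_threads(4)
        .build()
        .unwrap();

    // Parallel iterators used inside `install` run on this pool
    // instead of the global one.
    let sum: i32 = pool.install(|| (0..1_000).into_par_iter().sum());
    println!("{sum}");
}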
Use cases for rayon
Log analysis
Text processing is a great area where Rayon can provide a big performance boost! Here we analyse a log file by filtering for any lines that contain the word “ERROR” in uppercase.
use rayon::prelude::*;
use std::fs::read_to_string;

fn main() {
    // read a log file into a string
    let my_string = read_to_string("my_file.txt").unwrap();
    let my_vec = my_string.lines().collect::<Vec<&str>>();

    let my_vec: Vec<String> = my_vec.into_par_iter()
        .filter(|x| x.contains("ERROR"))
        .map(|x| x.to_owned())
        .collect();

    // this should now print only the lines that contain "ERROR"
    println!("{:?}", my_vec);
}
CSV processing
Of course, where there are mountains of data, data parallelism will usually pay off. You can read a file, split it into records, then parallelise the processing. In the example below, we parse a CSV file and turn every value into a “Hello world!” string:
use rayon::prelude::*;
use std::fs::read_to_string;

fn main() {
    // read a csv file to a string
    let my_string = read_to_string("my_file.csv").unwrap();
    let my_vec: Vec<String> = my_string
        .lines()
        .filter(|x| !x.is_empty())
        .map(|x| x.to_string())
        .collect();
    println!("{my_vec:?}");

    let my_vec: Vec<Vec<String>> = my_vec
        .into_par_iter()
        .map(|line| {
            // split each record into its fields
            let fields: Vec<String> = line.split(',').map(|x| x.to_string()).collect();
            // replace every field with "Hello world!", in parallel
            let res: Vec<String> = fields
                .into_par_iter()
                .map(|_| "Hello world!".to_string())
                .collect();
            res
        })
        .collect();

    // this should now print a vec of vecs
    // where every single value is the "Hello world!" string
    println!("{:?}", my_vec);
}
The more sensible option here, most of the time, would be to use the `csv` crate. However, that doesn’t stop us from implementing our own CSV parser, particularly if we’re not using the `serde` crate to serialize and deserialize our CSV records.
Pitfalls
Rayon is a very powerful crate. However, there are some pitfalls you can run into if you’re not careful.
Blocking rayon threads (deadlocks)
Work done on parallel Rayon threads that blocks the other threads is something you absolutely want to avoid. One example of this is a mutex that gets locked on every iteration:
use rayon::prelude::*;
use std::sync::{Arc, Mutex};

fn main() {
    let arcmutex = Arc::new(Mutex::new(0));
    (0..5000).into_par_iter().for_each(|x| {
        // every iteration takes the lock, so the threads mostly wait on each other
        let mut mutex_locked = arcmutex.lock().unwrap();
        *mutex_locked += x;
    });
    println!("{}", arcmutex.lock().unwrap());
}
Here, you can see that the mutex gets locked on every iteration! That’s a terrible way to do it. Clearly, we need a better way. We can avoid this entirely by summing the parallelized work first, and then adding the result to our wrapped number:
use rayon::prelude::*;
use std::sync::{Arc, Mutex};

fn main() {
    let arcmutex = Arc::new(Mutex::new(0));
    // do all the parallel work first, then take the lock exactly once
    let res: i32 = (0..5000).into_par_iter().sum();
    let mut locked_mutex = arcmutex.lock().unwrap();
    *locked_mutex += res;
    println!("{locked_mutex:?}");
}
Workload too small
If your workload is too small, the overhead from using the `rayon` crate can cancel out any performance gains. This is mainly due to thread pool management and work-stealing. Whether this matters to you will typically depend on your use case.
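If profiling shows this is a problem, one possible mitigation (sketched below with an arbitrary minimum of 1,000 items) is `with_min_len`, which tells Rayon not to split the work into pieces smaller than that:

use rayon::prelude::*;

fn main() {
    let data: Vec<i32> = (0..10_000).collect();
    // hand out at least 1,000 items per piece so the tiny per-item
    // work isn't swamped by splitting and scheduling overhead
    let total: i64 = data.par_iter()
        .with_min_len(1_000)
        .map(|&x| x as i64)
        .sum();
    println!("{total}");
}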
Finishing up
Thanks for reading! Rayon is an awesome crate to help you speed up your data processing.
Read more: