This content originally appeared on Level Up Coding - Medium and was authored by Itsuki
In one of my previous articles, Rust: Execute Multiple Async Simultaneously: Multithreading vs Futures, I shared a little about multithreading in Rust.
In this article, let’s take a look at how we can transfer data between threads using message passing, which gives us safe concurrency.
The same technique can also be used to send messages from, for example, Axum server handlers to the surrounding application so that it can stop the server, run tasks, and much more!
Let’s check it out together!
Basic Idea
Instead of having multiple actors or threads communicate by sharing memory, we can use message passing, i.e. have them communicate by sending each other messages that contain the data. This helps us avoid data races and keeps concurrency safe.
Channel
The mpsc module (std::sync::mpsc) of the Rust standard library provides an implementation of a channel, a concept by which data is sent from one thread to another.
A channel consists of two parts, a Sender and a Receiver, with the following properties.
- First In First Out: all data sent on the Sender becomes available on the Receiver in the same order as it was sent
- Unbounded buffer: no send will block the calling thread
- Single Receiver. The Sender can be cloned to send to the same channel multiple times, but only one Receiver is supported.
Let’s do an example next to check out the idea in more detail.
Simple Implementation
Here is a simple implementation that sends a message from a spawned thread to the main thread. (I am using anyhow for error handling here. The later snippets rely on the same use lines unless they show their own.)
use anyhow::{Context, Result};
use std::sync::mpsc::channel;
use std::thread;

fn main() -> Result<()> {
    let (sender, receiver) = channel::<String>();
    let handle = thread::spawn(move || -> Result<()> {
        let str = String::from("Hi from spawned thread!");
        sender.send(str)?;
        drop(sender);
        Ok(())
    });
    // Wait for the spawned thread and propagate any error it returned.
    handle.join().expect("Error Joining.")?;
    let received = receiver.recv().context("Error receiving result.")?;
    println!("Received: {received}"); // Received: Hi from spawned thread!
    Ok(())
}
Technically speaking, we don’t have to call handle.join() here, because recv blocks until the message arrives. In a moment, I will point out a case where skipping it does make a difference.
Let’s take a more detailed look at the different parts.
Send
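We call send to send a message and drop to drop the sender once we are done with it.
A single Sender can call send as many times as it likes. To send to the same channel from several places (for example, from several threads), all we have to do is clone the Sender. The example below clones it for every message simply to show that each clone feeds the same Receiver.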
fn main() -> Result<()> {
    let (sender, receiver) = channel::<String>();
    let handle = thread::spawn(move || -> Result<()> {
        let clone = sender.clone();
        clone.send("Start!".to_owned())?;
        drop(clone);
        for i in 1..5 {
            let str = format!("Hi {} from the spawned thread!", i);
            let clone = sender.clone();
            clone.send(str)?;
            drop(clone);
        }
        // Send the last message on the original sender before dropping it.
        sender.send("Finish!".to_owned())?;
        drop(sender);
        Ok(())
    });
    handle.join().expect("Error Joining.")?;
    // The loop ends when recv returns Err, i.e. once every sender is gone.
    while let Ok(received) = receiver.recv() {
        println!("Received: {received}");
        // Received: Start!
        // Received: Hi 1 from the spawned thread!
        // Received: Hi 2 from the spawned thread!
        // Received: Hi 3 from the spawned thread!
        // Received: Hi 4 from the spawned thread!
        // Received: Finish!
    }
    Ok(())
}
Note that all senders (the original and every clone) need to be dropped before Receiver::recv stops waiting for more messages and returns Err. If even one sender is still alive, a loop like the one above never ends.
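To make that rule a bit more concrete, here is a minimal sketch with several producer threads, each owning its own clone of the sender. The helper name collect_from_workers and the message text are my own illustrative choices; only channel, Sender::clone, Sender::send and Receiver::iter come from the standard library.

```rust
use std::sync::mpsc::channel;
use std::thread;

/// Hypothetical helper: spawns `workers` threads that each send one message,
/// then returns everything the receiver collected (sorted for a stable result).
fn collect_from_workers(workers: usize) -> Vec<String> {
    let (sender, receiver) = channel::<String>();

    let handles: Vec<_> = (0..workers)
        .map(|id| {
            // Each thread owns a clone, which is dropped when the thread ends.
            let sender = sender.clone();
            thread::spawn(move || {
                sender
                    .send(format!("Hi from worker {id}"))
                    .expect("receiver is still alive");
            })
        })
        .collect();

    // Drop the original sender too; otherwise the iteration below never ends.
    drop(sender);

    // Iterating a Receiver calls recv repeatedly and stops once every sender is gone.
    let mut messages: Vec<String> = receiver.iter().collect();

    for handle in handles {
        handle.join().expect("worker panicked");
    }

    // Arrival order across threads is not deterministic, so sort before returning.
    messages.sort();
    messages
}

fn main() {
    for message in collect_from_workers(3) {
        println!("Received: {message}");
    }
}
```

If the drop(sender) line is removed, the program should hang at the collect call, because the original sender would still be alive in the same function.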
Receive with recv
In both of our examples above, we used recv. This function blocks the current thread whenever there is no data available and it is still possible for more data to be sent (at least one sender still exists). Once a message is sent on the corresponding Sender, the receiver wakes up and returns that message.
If the corresponding Sender has disconnected, or it disconnects while this call is blocking, the call wakes up and returns Err to indicate that no more messages can ever be received on this channel. However, since channels are buffered, messages sent before the disconnect are still properly received. That is why, even though we dropped the sender right after sending the message, we still received the message successfully.
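As a small sketch of that behaviour, the following snippet sends two values, drops the sender, and only then starts receiving. The function name drain_after_disconnect is hypothetical and exists just for this illustration.

```rust
use std::sync::mpsc::{channel, RecvError};

/// Hypothetical helper: sends two values, drops the sender,
/// then calls recv three times and returns all three results.
fn drain_after_disconnect() -> Vec<Result<i32, RecvError>> {
    let (sender, receiver) = channel::<i32>();
    sender.send(1).expect("receiver is still alive");
    sender.send(2).expect("receiver is still alive");
    // The channel is disconnected from here on, but 1 and 2 stay in the buffer.
    drop(sender);

    // The first two calls return the buffered values; the third one
    // finds an empty buffer with no sender left and returns Err.
    vec![receiver.recv(), receiver.recv(), receiver.recv()]
}

fn main() {
    println!("{:?}", drain_after_disconnect());
}
```

None of the three recv calls blocks here, because the receiver can always tell immediately whether a value or a disconnect is waiting.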
Other Recv Options
We also have a couple of other methods for receiving messages.
First of all, we have try_recv. It attempts to return a pending value on the receiver without blocking, so we can use it as an optimistic check before deciding to block on a receiver.
Compared with recv, this function has two failure cases instead of one: TryRecvError::Disconnected for disconnection, and TryRecvError::Empty for an empty buffer.
Here is a simple example on how we can use it.
fn main() -> Result<()> {
    let (sender, receiver) = channel::<String>();
    let handle = thread::spawn(move || -> Result<()> {
        let str = String::from("Hi from spawned thread!");
        sender.send(str)?;
        drop(sender);
        Ok(())
    });
    handle.join().expect("Error Joining.")?;
    let received = receiver.try_recv().context("Error receiving result.")?;
    println!("Received: {received}"); // Received: Hi from spawned thread!
    Ok(())
}
Just as we expected!
However, if we do not call handle.join() in this case, we will almost certainly get an error, because try_recv does not wait for a message to become available and the spawned thread has usually not sent anything yet.
fn main() -> Result<()> {
    let (sender, receiver) = channel::<String>();
    // Not joined on purpose, hence the underscore prefix.
    let _handle = thread::spawn(move || -> Result<()> {
        let str = String::from("Hi from spawned thread!");
        sender.send(str)?;
        drop(sender);
        Ok(())
    });
    // This is a race: it holds as long as the spawned thread has not sent yet.
    assert!(receiver.try_recv().is_err());
    Ok(())
}
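To tell the two failure cases apart, one option is to match on TryRecvError. The sketch below runs on a single thread so that the outcome does not depend on timing; poll_once is a hypothetical helper name of mine.

```rust
use std::sync::mpsc::{channel, Receiver, TryRecvError};

/// Hypothetical helper: describes the state of the receiver without blocking.
fn poll_once(receiver: &Receiver<String>) -> String {
    match receiver.try_recv() {
        Ok(message) => format!("message: {message}"),
        // The buffer is empty, but at least one sender is still alive.
        Err(TryRecvError::Empty) => "empty".to_owned(),
        // The buffer is empty and every sender has been dropped.
        Err(TryRecvError::Disconnected) => "disconnected".to_owned(),
    }
}

fn main() {
    let (sender, receiver) = channel::<String>();
    println!("{}", poll_once(&receiver)); // nothing has been sent yet
    sender.send("Hi!".to_owned()).expect("receiver is still alive");
    drop(sender);
    println!("{}", poll_once(&receiver)); // the buffered message
    println!("{}", poll_once(&receiver)); // buffer drained, no sender left
}
```

Treating Empty as "try again later" and Disconnected as "stop" is usually what a polling loop wants.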
Here is how we can use try_recv as an optimistic check.
use std::time::Duration;

fn main() -> Result<()> {
    let (sender, receiver) = channel::<String>();
    let _handle = thread::spawn(move || -> Result<()> {
        thread::sleep(Duration::from_millis(5));
        sender.clone().send("Start!".to_owned())?;
        for i in 1..5 {
            let str = format!("Hi {} from the spawned thread!", i);
            sender.clone().send(str)?;
        }
        sender.clone().send("Finish!".to_owned())?;
        drop(sender);
        Ok(())
    });
    // handle.join().expect("Error Joining.")?;
    // Keep working on the main thread until the first message shows up.
    // Note: the message that ends this loop ("Start!") is consumed and discarded.
    while receiver.try_recv().is_err() {
        for i in 1..5 {
            println!("{} from the main thread!", i);
        }
    }
    // Print whatever is already buffered; this stops at the first empty poll.
    while let Ok(received) = receiver.try_recv() {
        println!("Received: {received}");
    }
    Ok(())
}
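Note how we are not calling handle.join in this case? That is because we don’t want to wait for the spawned thread to finish before main can start its own work. Also note that the "Start!" message is consumed by the try_recv check in the first loop, so it does not show up in the output.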
And here is the result.
1 from the main thread!
2 from the main thread!
3 from the main thread!
4 from the main thread!
// ... repeated many times!
1 from the main thread!
2 from the main thread!
3 from the main thread!
4 from the main thread!
Received: Hi 1 from the spawned thread!
Received: Hi 2 from the spawned thread!
Received: Hi 3 from the spawned thread!
Received: Hi 4 from the spawned thread!
Received: Finish!
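If losing the first message to the check is not acceptable, one possible variation is to keep a single loop and match on the result of try_recv, so that every message is kept. This is only a sketch under my own assumptions: poll_until_disconnected is a hypothetical name, the shortened messages are placeholders, and the one-millisecond sleep stands in for whatever work the main thread would do between polls.

```rust
use std::sync::mpsc::{channel, TryRecvError};
use std::thread;
use std::time::Duration;

/// Hypothetical helper: polls the channel while doing other work
/// and returns every message in the order it was sent.
fn poll_until_disconnected() -> Vec<String> {
    let (sender, receiver) = channel::<String>();

    let handle = thread::spawn(move || {
        thread::sleep(Duration::from_millis(5));
        for text in ["Start!", "Hi 1", "Hi 2", "Finish!"] {
            sender.send(text.to_owned()).expect("receiver is still alive");
        }
        // `sender` is dropped here, which disconnects the channel.
    });

    let mut received = Vec::new();
    loop {
        match receiver.try_recv() {
            // Keep the message instead of discarding it in an `is_err()` check.
            Ok(message) => received.push(message),
            // Nothing yet: do a small slice of other work, then poll again.
            Err(TryRecvError::Empty) => thread::sleep(Duration::from_millis(1)),
            // Every sender is gone and the buffer has been drained.
            Err(TryRecvError::Disconnected) => break,
        }
    }

    handle.join().expect("sender thread panicked");
    received
}

fn main() {
    for message in poll_until_disconnected() {
        println!("Received: {message}");
    }
}
```

Because the loop only stops on Disconnected, it also avoids ending early when the buffer happens to be empty for a moment while the sender is still working.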
In addition, we have recv_timeout that attempts to wait for a value on this receiver, returning an error if the corresponding channel has hung up, or if it waits more than timeout. Since we are waiting, this one will also block.
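Here is a rough sketch of recv_timeout in both situations: a message that arrives in time and one that does not. The helper wait_for_message and the durations are assumptions I picked for the illustration; RecvTimeoutError and recv_timeout themselves are part of std::sync::mpsc.

```rust
use std::sync::mpsc::{channel, RecvTimeoutError};
use std::thread;
use std::time::Duration;

/// Hypothetical helper: a spawned thread sends one message after `sender_delay`,
/// while the caller waits at most `timeout` for it.
fn wait_for_message(
    sender_delay: Duration,
    timeout: Duration,
) -> Result<String, RecvTimeoutError> {
    let (sender, receiver) = channel::<String>();

    let handle = thread::spawn(move || {
        thread::sleep(sender_delay);
        // The result is ignored: in the timeout case nobody reads this message.
        let _ = sender.send("Hi from spawned thread!".to_owned());
    });

    // Blocks until a message arrives, the channel disconnects, or `timeout` elapses.
    let result = receiver.recv_timeout(timeout);

    handle.join().expect("sender thread panicked");
    result
}

fn main() {
    // The sender is quick, so the message arrives well within the timeout.
    println!("{:?}", wait_for_message(Duration::ZERO, Duration::from_secs(2)));
    // The sender is slower than the timeout, so we give up first.
    println!(
        "{:?}",
        wait_for_message(Duration::from_millis(300), Duration::from_millis(20))
    );
}
```

The error type distinguishes RecvTimeoutError::Timeout from RecvTimeoutError::Disconnected, so the caller can decide whether waiting again makes sense.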
We also have recv_deadline that will wait for a value on this receiver, returning an error if the corresponding channel has hung up, or if deadline is reached. This one is a nightly-only experimental API. (deadline_api #46316)
Multi-Receiver
We have been using the channel from mpsc (multi-producer, single-consumer), and that’s why we are only allowed to have one receiver.
There is also a channel in the mpmc module of the Rust standard library that provides multi-producer, multi-consumer FIFO queue communication primitives.
Note that this is a nightly-only experimental API. (mpmc_channel #126840)
Synchronous version
The channel we have used above is asynchronous and has an infinite buffer. There is also a synchronous implementation, sync_channel, whose send will block once its buffer limit is reached.
A Couple of Other Use Cases
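As a minimal sketch of the bounded behaviour, the snippet below creates a sync_channel with room for one message and uses try_send, the non-blocking counterpart of send on SyncSender, to show when the buffer is full without actually blocking. The helper name bounded_demo is hypothetical.

```rust
use std::sync::mpsc::sync_channel;

/// Hypothetical helper: tries to push three values into a channel with room
/// for one, receiving once before the last attempt.
/// Returns whether each attempt was accepted.
fn bounded_demo() -> [bool; 3] {
    let (sender, receiver) = sync_channel::<i32>(1);

    // The buffer is empty, so the first value fits.
    let first = sender.try_send(1).is_ok();
    // The buffer is full now; a plain `send` would block at this point.
    let second = sender.try_send(2).is_ok();
    // Receiving frees the single slot again.
    let taken = receiver.recv();
    assert_eq!(taken, Ok(1));
    let third = sender.try_send(3).is_ok();

    [first, second, third]
}

fn main() {
    println!("{:?}", bounded_demo());
}
```

A bounded channel like this gives back-pressure: a fast producer is slowed down to the pace of the consumer instead of filling memory with queued messages.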
Receive on Another Thread
Of course, we are not limited to receiving on the main thread.
fn main() -> Result<()> {
    let (sender, receiver) = channel::<String>();
    let send_handler = thread::spawn(move || -> Result<()> {
        let str = String::from("Hi from send thread!");
        sender.send(str)?;
        drop(sender);
        Ok(())
    });
    // The receiver is moved into a second spawned thread.
    let receive_handler = thread::spawn(move || -> Result<()> {
        let received = receiver.recv().context("Error receiving result.")?;
        println!("Received: {received}"); // Received: Hi from send thread!
        Ok(())
    });
    send_handler.join().expect("Error Joining.")?;
    receive_handler.join().expect("Error Joining.")?;
    Ok(())
}
From Axum Handlers
We can also send messages from Axum handlers, running in a separately spawned server task, to the main function and eventually stop the server if needed.
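use anyhow::Result;
use axum::{extract::State, routing::get, Router};
use std::sync::mpsc::{channel, Sender};

#[tokio::main]
async fn main() -> Result<()> {
    let (sender, receiver) = channel::<String>();
    let handler: tokio::task::JoinHandle<anyhow::Result<()>> = tokio::spawn(async move {
        // The sender becomes the router state, so every handler can get a clone of it.
        let app = Router::new()
            .route("/", get(test_sender))
            .with_state(sender);
        let listener = tokio::net::TcpListener::bind("localhost:3000").await?;
        axum::serve(listener, app).await?;
        Ok(())
    });
    // std's recv is a blocking call: main waits here until a handler sends.
    println!("received: {}", receiver.recv()?); // received: hey
    // Stop the server task once the message has arrived.
    handler.abort();
    Ok(())
}

async fn test_sender(State(sender): State<Sender<String>>) {
    println!("sending");
    sender
        .send("hey".to_owned())
        .expect("Error sending message.");
    // Drops only this request's clone; the router still holds the original.
    drop(sender);
}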
Our server runs until we send a GET request to localhost:3000. At that point the handler sends the message, main receives it, and we shut down the server.
Thank you for reading!
That’s it for today!
Happy channeling!
Rust: Transfer Data Between Threads was originally published in Level Up Coding on Medium, where people are continuing the conversation by highlighting and responding to this story.

Itsuki | Sciencx (2025-01-19T20:02:49+00:00) Rust: Transfer Data Between Threads. Retrieved from https://www.scien.cx/2025/01/19/rust-transfer-data-between-threads/