Initial import

This commit is contained in:
Nehu 2022-11-10 21:36:26 +01:00
parent fc22033964
commit f454bb04e4
No known key found for this signature in database
GPG key ID: 78EB319889EB5FBF
8 changed files with 429 additions and 0 deletions

2
.gitignore vendored Normal file
View file

@ -0,0 +1,2 @@
/target
/Cargo.lock

9
Cargo.toml Normal file
View file

@ -0,0 +1,9 @@
[package]
name = "tokio-messaging"
version = "0.0.1"
edition = "2021"
[dependencies]
lazy_static = "1.4"
num-traits = "0.2"
tokio = { version = "1.21", features = ["macros", "rt", "rt-multi-thread", "sync", "time"] }

44
README.md Normal file
View file

@ -0,0 +1,44 @@
# tokio-messaging
A crate which offers non-blocking publish/subscribe functionality using Tokio channels.
Publishing messages and subscribing to them is done using an `Messaging` instance,
which acts as a message broker.
In order to create a message broker, start by defining the structure of messages and their data (payload).
The types should implement `Message` and `MessageData` respectively.
```rust
enum MyMessage
{
Greeting,
Request
}
impl Message for MyMessage {}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct MyPayload(&'static str);
impl MessageData for MyPayload {}
```
Next, create the message broker instance. Usually, you'll have a single, long-living instance.
```rust
lazy_static! {
static ref INSTANCE: Messaging<MyMessage, MyPayload> = { Messaging::new() };
}
pub fn messaging() -> &'static Messaging<MyMessage, MyPayload> { &INSTANCE }
```
Publish messages using the `dispatch()` function and subscribe to them using the `on()` function.
```rust
// Subscribe to messages
tokio::spawn(messaging().on(MyMessage::Request, |data: MyPayload| {
assert_eq!(data.0, "Here's a request!");
}));
// Publish a message
messaging().dispatch(MyMessage::Request, MyPayload("Here's a request!"));
```

104
src/lib.rs Normal file
View file

@ -0,0 +1,104 @@
/*!
A crate which offers non-blocking publish/subscribe functionality using Tokio channels.
Publishing messages and subscribing to them is done using an `Messaging` instance,
which acts as a message broker.
In order to create a message broker, start by defining the structure of messages and their data (payload).
The types should implement `Message` and `MessageData` respectively.
```
# use tokio_messaging::*;
#
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)]
enum MyMessage
{
Greeting,
Request
}
impl Message for MyMessage {}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct MyPayload(&'static str);
impl MessageData for MyPayload {}
```
Next, create the message broker instance. Usually, you'll have a single, long-living instance.
```
# use lazy_static::lazy_static;
# use tokio_messaging::*;
#
# #[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)]
# enum MyMessage
# {
# Greeting,
# Request
# }
#
# impl Message for MyMessage {}
#
# #[derive(Clone, Copy, Debug, PartialEq, Eq)]
# struct MyPayload(&'static str);
#
# impl MessageData for MyPayload {}
#
lazy_static! {
static ref INSTANCE: Messaging<MyMessage, MyPayload> = { Messaging::new() };
}
pub fn messaging() -> &'static Messaging<MyMessage, MyPayload> { &INSTANCE }
```
Publish messages using the `dispatch()` function and subscribe to them using the `on()` function.
```
# use lazy_static::lazy_static;
# use tokio_messaging::*;
#
# #[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)]
# enum MyMessage
# {
# Greeting,
# Request
# }
#
# impl Message for MyMessage {}
#
# #[derive(Clone, Copy, Debug, PartialEq, Eq)]
# struct MyPayload(&'static str);
#
# impl MessageData for MyPayload {}
#
# lazy_static! {
# static ref INSTANCE: Messaging<MyMessage, MyPayload> = { Messaging::new() };
# }
#
# pub fn messaging() -> &'static Messaging<MyMessage, MyPayload> { &INSTANCE }
#
# let mut rt = tokio::runtime::Runtime::new().unwrap();
# rt.block_on(async {
#
# use std::sync::Arc;
# use std::sync::Mutex;
# use std::time::Duration;
#
// Subscribe to messages
tokio::spawn(messaging().on(MyMessage::Request, |data: MyPayload| {
assert_eq!(data.0, "Here's a request!");
}));
// Publish a message
messaging().dispatch(MyMessage::Request, MyPayload("Here's a request!"));
#
# });
```
*/
mod message_data;
pub use message_data::*;
mod message;
pub use message::*;
mod messaging;
pub use messaging::*;

5
src/message.rs Normal file
View file

@ -0,0 +1,5 @@
use std::{fmt::Debug, hash::Hash};
pub trait Message : Copy + Debug + Eq + Hash
{
}

4
src/message_data.rs Normal file
View file

@ -0,0 +1,4 @@
use std::fmt::Debug;
pub trait MessageData: Clone + Debug + Send
{}

85
src/messaging.rs Normal file
View file

@ -0,0 +1,85 @@
use std::{collections::HashMap, future::Future};
use std::sync::Arc;
use std::sync::RwLock;
use tokio::sync::broadcast;
use crate::{Message, MessageData};
/// The message broker which allows publishing and subscribing to messages.
pub struct Messaging<M: Message, D: MessageData>
{
channels: RwLock<HashMap<M, Arc<(broadcast::Sender<D>, broadcast::Receiver<D>)>>>
}
impl<M: Message, D: MessageData + 'static> Messaging<M, D>
{
/// Create a new instance of the messaging broker.
pub fn new() -> Self
{
return Messaging {
channels: RwLock::new(HashMap::new())
};
}
fn get_channel(&self, message: M) -> Arc<(broadcast::Sender<D>, broadcast::Receiver<D>)>
{
let mut channels = self.channels.write().unwrap();
let channel = channels.entry(message).or_insert_with(|| {
return Arc::new(broadcast::channel::<D>(16));
});
return channel.clone();
}
/// Dispatch a message to all listeners.
///
/// The data may get cloned in the process.
pub fn dispatch(&self, message: M, data: D)
{
let tx = &self.get_channel(message).0;
tx.send(data.clone()).unwrap();
}
/// Listen for a message from any source and invokes the specified callback for each recieved
/// message.
pub async fn on<F>(&self, message: M, mut callback: F)
where F: FnMut(D) -> () + Send + 'static
{
let tx = &self.get_channel(message).0;
let mut receiver = tx.subscribe();
loop {
if let Ok(data) = receiver.recv().await {
callback(data.clone());
}
else {
// TODO: a better handling (at least a warning) might be necessary.
// Err can happen if the receiver lagged behind.
break;
}
}
}
/// Listen for a message from any source and invokes the specified callback for each recieved
/// message.
pub async fn on_async<F, R>(&self, message: M, mut callback: F)
where F: FnMut(D) -> R + Send + 'static,
R: Future<Output = ()>
{
let mut receiver = self.get_channel(message).0.subscribe();
loop {
if let Ok(data) = receiver.recv().await {
//println!("Message received: {:?}", message);
callback(data).await;
}
else {
// TODO: a better handling (at least a warning) might be necessary.
// Err can happen if the receiver lagged behind.
break;
}
}
}
}

176
tests/tokio_messaging.rs Normal file
View file

@ -0,0 +1,176 @@
extern crate tokio_messaging;
use std::sync::Arc;
use std::sync::Mutex;
use std::time::Duration;
use tokio::time::timeout;
use tokio_messaging::*;
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)]
enum TestMessage
{
Greeting,
Request
}
impl Message for TestMessage {}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct TestData(&'static str);
impl MessageData for TestData {}
fn messaging() -> &'static mut Messaging<TestMessage, TestData>
{
static mut INSTANCE: Option<Messaging<TestMessage, TestData>> = None;
unsafe {
if let None = INSTANCE {
INSTANCE = Some(Messaging::new());
}
return INSTANCE.as_mut().unwrap();
}
}
/// A single sender sends a message while a single receiver is listening for that message.
/// Ensure the receiver receives the message.
#[tokio::test]
async fn spsc()
{
let sender_handle = tokio::spawn(async move {
// Wait a bit until the receiver has started listening
tokio::time::sleep(Duration::from_millis(500)).await;
messaging().dispatch(TestMessage::Request, TestData("Hello from sender"));
});
let receiver_handle = tokio::spawn(async move {
let result: Arc<Mutex<Option<TestData>>> = Arc::new(Mutex::new(None));
let result_to_be_filled = result.clone();
let task = messaging().on(TestMessage::Request, move |data: TestData| {
result_to_be_filled.lock().unwrap().replace(data);
});
// Wait a bit until the sender has sent and the callback has processed the information, then
// interrupt the listening after 1s
timeout(Duration::from_millis(1000), task).await.expect_err("The listening task is expected to time out.");
return result.lock().unwrap().clone();
});
let (_sender_result, receiver_result) = tokio::join!(sender_handle, receiver_handle);
let receiver_result = receiver_result.unwrap();
assert_eq!(receiver_result.unwrap().0, "Hello from sender");
}
/// A single sender sends a message while a multiple receivers are listening for that message.
/// Ensure each receiver receives the message.
#[tokio::test]
async fn spmc()
{
let sender_handle = tokio::spawn(async move {
// Wait a bit until the receiver has started listening
tokio::time::sleep(Duration::from_millis(500)).await;
messaging().dispatch(TestMessage::Request, TestData("Hello from sender"));
});
let receiver = || async {
let result: Arc<Mutex<Option<TestData>>> = Arc::new(Mutex::new(None));
let result_to_be_filled = result.clone();
let task = messaging().on(TestMessage::Request, move |data: TestData| {
result_to_be_filled.lock().unwrap().replace(data);
});
// Wait a bit until the sender has sent and the callback has processed the information, then
// interrupt the listening after 1s
timeout(Duration::from_millis(1000), task).await.expect_err("The listening task is expected to time out.");
return result.lock().unwrap().clone();
};
let receiver_handle_a = tokio::spawn(receiver());
let receiver_handle_b = tokio::spawn(receiver());
let receiver_handle_c = tokio::spawn(receiver());
let (_, receiver_a_result, receiver_b_result, receiver_c_result) = tokio::join!(
sender_handle,
receiver_handle_a,
receiver_handle_b,
receiver_handle_c);
let receiver_a_result = receiver_a_result.unwrap();
let receiver_b_result = receiver_b_result.unwrap();
let receiver_c_result = receiver_c_result.unwrap();
assert_eq!(receiver_a_result.unwrap().0, "Hello from sender");
assert_eq!(receiver_b_result.unwrap().0, "Hello from sender");
assert_eq!(receiver_c_result.unwrap().0, "Hello from sender");
}
/// Multiple senders send a message while a single receiver is listening for that message.
/// Ensure the receiver receives all the messages.
#[tokio::test]
async fn mpsc()
{
let sender = || async {
// Wait a bit until the receiver has started listening
tokio::time::sleep(Duration::from_millis(500)).await;
messaging().dispatch(TestMessage::Request, TestData("Hello from sender"));
};
let sender_handle_a = tokio::spawn(sender());
let sender_handle_b = tokio::spawn(sender());
let sender_handle_c = tokio::spawn(sender());
let receiver_handle = tokio::spawn(async move {
let result: Arc<Mutex<u32>> = Arc::new(Mutex::new(0));
let result_to_be_filled = result.clone();
let task = messaging().on(TestMessage::Request, move |_data: TestData| {
*result_to_be_filled.lock().unwrap() += 1;
});
// Wait a bit until the sender has sent and the callback has processed the information, then
// interrupt the listening after 1s
timeout(Duration::from_millis(1000), task).await.expect_err("The listening task is expected to time out.");
return result.lock().unwrap().clone();
});
let (_, _, _, receiver_result) = tokio::join!(
sender_handle_a,
sender_handle_b,
sender_handle_c,
receiver_handle);
let received_messages = receiver_result.unwrap();
assert_eq!(received_messages, 3);
}
/// Send a message and listen for another one. Ensure the listener does not receive the sent message.
#[tokio::test]
async fn message_isolation()
{
let sender_handle = tokio::spawn(async move {
// Wait a bit until the receiver has started listening
tokio::time::sleep(Duration::from_millis(500)).await;
messaging().dispatch(TestMessage::Request, TestData("Hello from sender"));
});
let receiver_handle = tokio::spawn(async move {
let task = messaging().on(TestMessage::Greeting, move |_data: TestData| {
panic!("Received data; this shouldn't happen.");
});
// Wait a bit until the sender has sent and the callback has processed the information, then
// interrupt the listening after 1s
timeout(Duration::from_millis(1000), task).await.expect_err("The listening task is expected to time out.");
});
let results = tokio::join!(sender_handle, receiver_handle);
results.0.unwrap();
results.1.unwrap();
}