diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..4fffb2f --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +/target +/Cargo.lock diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..27db33b --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "tokio-messaging" +version = "0.0.1" +edition = "2021" + +[dependencies] +lazy_static = "1.4" +num-traits = "0.2" +tokio = { version = "1.21", features = ["macros", "rt", "rt-multi-thread", "sync", "time"] } diff --git a/README.md b/README.md new file mode 100644 index 0000000..250541c --- /dev/null +++ b/README.md @@ -0,0 +1,44 @@ +# tokio-messaging + +A crate which offers non-blocking publish/subscribe functionality using Tokio channels. + +Publishing messages and subscribing to them is done using an `Messaging` instance, +which acts as a message broker. + +In order to create a message broker, start by defining the structure of messages and their data (payload). +The types should implement `Message` and `MessageData` respectively. + +```rust +enum MyMessage +{ + Greeting, + Request +} + +impl Message for MyMessage {} + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +struct MyPayload(&'static str); + +impl MessageData for MyPayload {} +``` + +Next, create the message broker instance. Usually, you'll have a single, long-living instance. +```rust +lazy_static! { + static ref INSTANCE: Messaging = { Messaging::new() }; +} + +pub fn messaging() -> &'static Messaging { &INSTANCE } +``` + +Publish messages using the `dispatch()` function and subscribe to them using the `on()` function. +```rust +// Subscribe to messages +tokio::spawn(messaging().on(MyMessage::Request, |data: MyPayload| { + assert_eq!(data.0, "Here's a request!"); +})); + +// Publish a message +messaging().dispatch(MyMessage::Request, MyPayload("Here's a request!")); +``` diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..aa3401a --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,104 @@ +/*! +A crate which offers non-blocking publish/subscribe functionality using Tokio channels. + +Publishing messages and subscribing to them is done using an `Messaging` instance, +which acts as a message broker. + +In order to create a message broker, start by defining the structure of messages and their data (payload). +The types should implement `Message` and `MessageData` respectively. + +``` +# use tokio_messaging::*; +# +#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)] +enum MyMessage +{ + Greeting, + Request +} + +impl Message for MyMessage {} + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +struct MyPayload(&'static str); + +impl MessageData for MyPayload {} +``` + +Next, create the message broker instance. Usually, you'll have a single, long-living instance. +``` +# use lazy_static::lazy_static; +# use tokio_messaging::*; +# +# #[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)] +# enum MyMessage +# { +# Greeting, +# Request +# } +# +# impl Message for MyMessage {} +# +# #[derive(Clone, Copy, Debug, PartialEq, Eq)] +# struct MyPayload(&'static str); +# +# impl MessageData for MyPayload {} +# +lazy_static! { + static ref INSTANCE: Messaging = { Messaging::new() }; +} + +pub fn messaging() -> &'static Messaging { &INSTANCE } +``` + +Publish messages using the `dispatch()` function and subscribe to them using the `on()` function. +``` +# use lazy_static::lazy_static; +# use tokio_messaging::*; +# +# #[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)] +# enum MyMessage +# { +# Greeting, +# Request +# } +# +# impl Message for MyMessage {} +# +# #[derive(Clone, Copy, Debug, PartialEq, Eq)] +# struct MyPayload(&'static str); +# +# impl MessageData for MyPayload {} +# +# lazy_static! { +# static ref INSTANCE: Messaging = { Messaging::new() }; +# } +# +# pub fn messaging() -> &'static Messaging { &INSTANCE } +# +# let mut rt = tokio::runtime::Runtime::new().unwrap(); +# rt.block_on(async { +# +# use std::sync::Arc; +# use std::sync::Mutex; +# use std::time::Duration; +# +// Subscribe to messages +tokio::spawn(messaging().on(MyMessage::Request, |data: MyPayload| { + assert_eq!(data.0, "Here's a request!"); +})); + +// Publish a message +messaging().dispatch(MyMessage::Request, MyPayload("Here's a request!")); +# +# }); +``` +*/ +mod message_data; +pub use message_data::*; + +mod message; +pub use message::*; + +mod messaging; +pub use messaging::*; \ No newline at end of file diff --git a/src/message.rs b/src/message.rs new file mode 100644 index 0000000..1e2a6a7 --- /dev/null +++ b/src/message.rs @@ -0,0 +1,5 @@ +use std::{fmt::Debug, hash::Hash}; + +pub trait Message : Copy + Debug + Eq + Hash +{ +} \ No newline at end of file diff --git a/src/message_data.rs b/src/message_data.rs new file mode 100644 index 0000000..68c5251 --- /dev/null +++ b/src/message_data.rs @@ -0,0 +1,4 @@ +use std::fmt::Debug; + +pub trait MessageData: Clone + Debug + Send +{} \ No newline at end of file diff --git a/src/messaging.rs b/src/messaging.rs new file mode 100644 index 0000000..f09f69e --- /dev/null +++ b/src/messaging.rs @@ -0,0 +1,85 @@ +use std::{collections::HashMap, future::Future}; +use std::sync::Arc; +use std::sync::RwLock; + +use tokio::sync::broadcast; + +use crate::{Message, MessageData}; + +/// The message broker which allows publishing and subscribing to messages. +pub struct Messaging +{ + channels: RwLock, broadcast::Receiver)>>> +} + +impl Messaging +{ + /// Create a new instance of the messaging broker. + pub fn new() -> Self + { + return Messaging { + channels: RwLock::new(HashMap::new()) + }; + } + + fn get_channel(&self, message: M) -> Arc<(broadcast::Sender, broadcast::Receiver)> + { + let mut channels = self.channels.write().unwrap(); + + let channel = channels.entry(message).or_insert_with(|| { + return Arc::new(broadcast::channel::(16)); + }); + + return channel.clone(); + } + + /// Dispatch a message to all listeners. + /// + /// The data may get cloned in the process. + pub fn dispatch(&self, message: M, data: D) + { + let tx = &self.get_channel(message).0; + tx.send(data.clone()).unwrap(); + } + + /// Listen for a message from any source and invokes the specified callback for each recieved + /// message. + pub async fn on(&self, message: M, mut callback: F) + where F: FnMut(D) -> () + Send + 'static + { + let tx = &self.get_channel(message).0; + let mut receiver = tx.subscribe(); + + loop { + if let Ok(data) = receiver.recv().await { + callback(data.clone()); + } + else { + // TODO: a better handling (at least a warning) might be necessary. + // Err can happen if the receiver lagged behind. + break; + } + } + } + + /// Listen for a message from any source and invokes the specified callback for each recieved + /// message. + pub async fn on_async(&self, message: M, mut callback: F) + where F: FnMut(D) -> R + Send + 'static, + R: Future + { + let mut receiver = self.get_channel(message).0.subscribe(); + + loop { + if let Ok(data) = receiver.recv().await { + //println!("Message received: {:?}", message); + callback(data).await; + } + else { + // TODO: a better handling (at least a warning) might be necessary. + // Err can happen if the receiver lagged behind. + break; + } + } + } +} \ No newline at end of file diff --git a/tests/tokio_messaging.rs b/tests/tokio_messaging.rs new file mode 100644 index 0000000..251649b --- /dev/null +++ b/tests/tokio_messaging.rs @@ -0,0 +1,176 @@ +extern crate tokio_messaging; + +use std::sync::Arc; +use std::sync::Mutex; +use std::time::Duration; + +use tokio::time::timeout; + +use tokio_messaging::*; + +#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)] +enum TestMessage +{ + Greeting, + Request +} + +impl Message for TestMessage {} + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +struct TestData(&'static str); + +impl MessageData for TestData {} + +fn messaging() -> &'static mut Messaging +{ + static mut INSTANCE: Option> = None; + + unsafe { + if let None = INSTANCE { + INSTANCE = Some(Messaging::new()); + } + + return INSTANCE.as_mut().unwrap(); + } +} + +/// A single sender sends a message while a single receiver is listening for that message. +/// Ensure the receiver receives the message. +#[tokio::test] +async fn spsc() +{ + let sender_handle = tokio::spawn(async move { + // Wait a bit until the receiver has started listening + tokio::time::sleep(Duration::from_millis(500)).await; + + messaging().dispatch(TestMessage::Request, TestData("Hello from sender")); + }); + + let receiver_handle = tokio::spawn(async move { + let result: Arc>> = Arc::new(Mutex::new(None)); + let result_to_be_filled = result.clone(); + let task = messaging().on(TestMessage::Request, move |data: TestData| { + result_to_be_filled.lock().unwrap().replace(data); + }); + + // Wait a bit until the sender has sent and the callback has processed the information, then + // interrupt the listening after 1s + timeout(Duration::from_millis(1000), task).await.expect_err("The listening task is expected to time out."); + + return result.lock().unwrap().clone(); + }); + + let (_sender_result, receiver_result) = tokio::join!(sender_handle, receiver_handle); + let receiver_result = receiver_result.unwrap(); + + assert_eq!(receiver_result.unwrap().0, "Hello from sender"); +} + +/// A single sender sends a message while a multiple receivers are listening for that message. +/// Ensure each receiver receives the message. +#[tokio::test] +async fn spmc() +{ + let sender_handle = tokio::spawn(async move { + // Wait a bit until the receiver has started listening + tokio::time::sleep(Duration::from_millis(500)).await; + + messaging().dispatch(TestMessage::Request, TestData("Hello from sender")); + }); + + let receiver = || async { + let result: Arc>> = Arc::new(Mutex::new(None)); + let result_to_be_filled = result.clone(); + let task = messaging().on(TestMessage::Request, move |data: TestData| { + result_to_be_filled.lock().unwrap().replace(data); + }); + + // Wait a bit until the sender has sent and the callback has processed the information, then + // interrupt the listening after 1s + timeout(Duration::from_millis(1000), task).await.expect_err("The listening task is expected to time out."); + + return result.lock().unwrap().clone(); + }; + let receiver_handle_a = tokio::spawn(receiver()); + let receiver_handle_b = tokio::spawn(receiver()); + let receiver_handle_c = tokio::spawn(receiver()); + + let (_, receiver_a_result, receiver_b_result, receiver_c_result) = tokio::join!( + sender_handle, + receiver_handle_a, + receiver_handle_b, + receiver_handle_c); + let receiver_a_result = receiver_a_result.unwrap(); + let receiver_b_result = receiver_b_result.unwrap(); + let receiver_c_result = receiver_c_result.unwrap(); + + assert_eq!(receiver_a_result.unwrap().0, "Hello from sender"); + assert_eq!(receiver_b_result.unwrap().0, "Hello from sender"); + assert_eq!(receiver_c_result.unwrap().0, "Hello from sender"); +} + +/// Multiple senders send a message while a single receiver is listening for that message. +/// Ensure the receiver receives all the messages. +#[tokio::test] +async fn mpsc() +{ + let sender = || async { + // Wait a bit until the receiver has started listening + tokio::time::sleep(Duration::from_millis(500)).await; + + messaging().dispatch(TestMessage::Request, TestData("Hello from sender")); + }; + let sender_handle_a = tokio::spawn(sender()); + let sender_handle_b = tokio::spawn(sender()); + let sender_handle_c = tokio::spawn(sender()); + + let receiver_handle = tokio::spawn(async move { + let result: Arc> = Arc::new(Mutex::new(0)); + let result_to_be_filled = result.clone(); + let task = messaging().on(TestMessage::Request, move |_data: TestData| { + *result_to_be_filled.lock().unwrap() += 1; + }); + + // Wait a bit until the sender has sent and the callback has processed the information, then + // interrupt the listening after 1s + timeout(Duration::from_millis(1000), task).await.expect_err("The listening task is expected to time out."); + + return result.lock().unwrap().clone(); + }); + + let (_, _, _, receiver_result) = tokio::join!( + sender_handle_a, + sender_handle_b, + sender_handle_c, + receiver_handle); + let received_messages = receiver_result.unwrap(); + + assert_eq!(received_messages, 3); +} + +/// Send a message and listen for another one. Ensure the listener does not receive the sent message. +#[tokio::test] +async fn message_isolation() +{ + let sender_handle = tokio::spawn(async move { + // Wait a bit until the receiver has started listening + tokio::time::sleep(Duration::from_millis(500)).await; + + messaging().dispatch(TestMessage::Request, TestData("Hello from sender")); + }); + + let receiver_handle = tokio::spawn(async move { + let task = messaging().on(TestMessage::Greeting, move |_data: TestData| { + panic!("Received data; this shouldn't happen."); + }); + + // Wait a bit until the sender has sent and the callback has processed the information, then + // interrupt the listening after 1s + timeout(Duration::from_millis(1000), task).await.expect_err("The listening task is expected to time out."); + }); + + let results = tokio::join!(sender_handle, receiver_handle); + results.0.unwrap(); + results.1.unwrap(); +} \ No newline at end of file