diff --git a/src/client.rs b/src/client.rs index a569477..92e980f 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,5 +1,5 @@ use crate::{ - queue::{Message, MsgType}, + queue::{Message, MsgType, Queue}, utils::GenID, }; use std::{ @@ -62,10 +62,6 @@ impl ClientRegistry { } } - fn create_storage() -> Arc>>> { - Arc::new(Mutex::new(HashMap::new())) - } - fn get_id<'a>( gen: &mut impl Iterator, data: &HashMap>, @@ -78,7 +74,6 @@ impl ClientRegistry { } pub fn add(&mut self, tx: Sender) -> Uuid { - let id = Uuid::new_v4(); let mut reg = self.registry.lock().unwrap(); let mut gen_id = GenID::new(); let id = ClientRegistry::get_id(&mut gen_id, ®); @@ -158,7 +153,7 @@ impl ClientLink { } } - pub fn send(&mut self, req: Request) -> Receiver { + pub fn send(&mut self, _req: Request) -> Receiver { let (tx, rx) = channel(); let mut msg = Message::new(MsgType::ClientRequest); let id = self.registry.add(tx); @@ -211,7 +206,7 @@ impl Client { } } - pub fn start() -> ClientLink { + pub fn start(_queue: Queue) -> ClientLink { let (tx, rx) = channel(); let mut client = Client::new(rx); let link = ClientLink::new(tx, client.get_registry()); @@ -242,7 +237,8 @@ mod clients { #[test] fn start_client() { - let mut link = Client::start(); + let queue = Queue::new(); + let mut link = Client::start(queue.clone()); let req = create_request(); link.send(req); } diff --git a/src/field.rs b/src/field.rs index 4968786..d3c01a8 100644 --- a/src/field.rs +++ b/src/field.rs @@ -1,5 +1,4 @@ -use crate::queue::Message; -use std::{fmt, sync::mpsc::Sender}; +use std::fmt; use uuid::Uuid; #[derive(Clone, Debug)] @@ -55,7 +54,6 @@ impl fmt::Display for Field { #[cfg(test)] mod fields { use super::*; - use std::sync::mpsc::channel; #[test] fn string_to_field() { diff --git a/src/lib.rs b/src/lib.rs index 2787807..72c5c1d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,23 +5,18 @@ mod utils; use client::{Client, ClientLink, Reply, Request}; use field::Field; -use queue::Message; -use std::sync::{ - mpsc::{channel, Sender}, - Arc, RwLock, -}; +use queue::Queue; #[derive(Clone)] pub struct MoreThanText { client_link: ClientLink, - registry: Arc>>>, } impl MoreThanText { pub fn new() -> Self { + let queue = Queue::new(); Self { - client_link: Client::start(), - registry: Arc::new(RwLock::new(Vec::new())), + client_link: Client::start(queue.clone()), } } diff --git a/src/queue.rs b/src/queue.rs index e9dfc8c..e10c0b9 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -2,18 +2,15 @@ use crate::{client::Request, field::Field}; use std::{ collections::HashMap, sync::{ - mpsc::{channel, Receiver, Sender}, - Arc, Mutex, RwLock, + mpsc::Sender, + Arc, RwLock, }, - thread::spawn, }; use uuid::Uuid; #[derive(Clone)] pub enum MsgType { - ClientMessage, ClientRequest, - NewClientMessage, NoOp, } @@ -27,7 +24,7 @@ pub struct Message { impl Message { pub fn new(msg_type: MsgType) -> Self { Self { - id: Uuid::nil(), + id: Uuid::new_v4(), class: msg_type, data: HashMap::new(), } @@ -37,7 +34,7 @@ impl Message { Self { id: self.id.clone(), class: data, - data: self.data.clone(), + data: HashMap::new(), } } @@ -59,7 +56,7 @@ impl Message { } impl From for Message { - fn from(value: Request) -> Self { + fn from(_value: Request) -> Self { let msg = Message::new(MsgType::ClientRequest); msg.reply(MsgType::ClientRequest) } @@ -72,7 +69,6 @@ mod messages { #[test] fn new_message() { let msg = Message::new(MsgType::NoOp); - assert_eq!(msg.id, Uuid::nil()); match msg.class { MsgType::NoOp => (), _ => unreachable!("new defaults to noop"), @@ -80,18 +76,31 @@ mod messages { assert!(msg.data.is_empty()); } + #[test] + fn message_ids_are_random() { + let mut ids: Vec = Vec::new(); + for _ in 0..10 { + let msg = Message::new(MsgType::NoOp); + let id = msg.id.clone(); + assert!(!ids.contains(&id), "{} is a duplicate", id); + ids.push(id); + } + } + #[test] fn create_reply() { let id = Uuid::new_v4(); let mut msg = Message::new(MsgType::NoOp); msg.id = id.clone(); - let data = MsgType::NewClientMessage; + msg.add_data("test", "test"); + let data = MsgType::ClientRequest; let result = msg.reply(data); assert_eq!(result.id, id); match result.class { - MsgType::NewClientMessage => {} + MsgType::ClientRequest => {} _ => unreachable!("should have been a registration request"), } + assert!(result.data.is_empty()); } #[test] @@ -116,24 +125,25 @@ mod messages { } } -struct ServiceRegistry { - store: Arc>>>, +#[derive(Clone)] +pub struct Queue { + store: Arc>>>, } -impl ServiceRegistry { - fn new() -> Self { +impl Queue { + pub fn new() -> Self { Self { - store: Arc::new(Mutex::new(Vec::new())), + store: Arc::new(RwLock::new(Vec::new())), } } fn add(&self, tx: Sender) { - let mut store = self.store.lock().unwrap(); + let mut store = self.store.write().unwrap(); store.push(tx); } fn send(&self, msg: Message) { - let mut store = self.store.lock().unwrap(); + let store = self.store.read().unwrap(); for sender in store.iter() { sender.send(msg.clone()).unwrap(); } @@ -142,99 +152,18 @@ impl ServiceRegistry { #[cfg(test)] mod serviceredistries { + use std::sync::mpsc::channel; use super::*; #[test] - fn create_registry() { - let reg = ServiceRegistry::new(); + fn create_queue() { + let queue = Queue::new(); let (tx1, rx1) = channel(); let (tx2, rx2) = channel(); - reg.add(tx1); - reg.add(tx2); - reg.send(Message::new(MsgType::NoOp)); + queue.add(tx1); + queue.add(tx2); + queue.send(Message::new(MsgType::NoOp)); rx1.recv().unwrap(); rx2.recv().unwrap(); } } - -struct Queue { - registry: Arc>>>, - rx: Receiver, -} - -impl Queue { - fn new(rx: Receiver, registry: Arc>>>) -> Self { - Self { - registry: registry, - rx: rx, - } - } - - fn start(registry: Arc>>>) -> Sender { - let (tx, rx) = channel(); - spawn(move || { - let mut queue = Queue::new(rx, registry); - queue.listen(); - }); - tx - } - - fn listen(&mut self) { - loop { - let mut msg = self.rx.recv().unwrap(); - msg.id = Uuid::new_v4(); - let senders = self.registry.read().unwrap(); - for sender in senders.iter() { - sender.send(msg.reply(MsgType::ClientMessage)).unwrap(); - } - } - } -} - -#[cfg(test)] -mod queues { - use super::*; - use std::time::Duration; - - static TIMEOUT: Duration = Duration::from_millis(500); - - fn start_queue() -> (Sender, Receiver) { - let reg: Arc>>> = Arc::new(RwLock::new(Vec::new())); - let (tx, rx) = channel::(); - let mut data = reg.write().unwrap(); - data.push(tx); - drop(data); - let queue_tx = Queue::start(Arc::clone(®)); - (queue_tx, rx) - } - - #[test] - fn get_new_client_message() { - let (tx, rx) = start_queue(); - let initial = Message::new(MsgType::NoOp); - let msg = initial.reply(MsgType::NewClientMessage); - tx.send(msg).unwrap(); - let msg = rx.recv_timeout(TIMEOUT).unwrap(); - match msg.class { - MsgType::ClientMessage => assert_ne!(msg.id, initial.id), - _ => unreachable!("should have been a client message"), - } - } - - #[test] - fn new_client_messages_are_unique() { - let (tx, rx) = start_queue(); - let msg = Message::new(MsgType::NoOp); - let mut ids: Vec = Vec::new(); - for _ in 0..10 { - tx.send(msg.reply(MsgType::NewClientMessage)).unwrap(); - let result = rx.recv().unwrap(); - assert!( - !ids.contains(&result.id.clone()), - "{} is a duplicate", - &result.id - ); - ids.push(result.id); - } - } -}