From 3355358ac5adfe86eb9c4895d691f7b7acd43be9 Mon Sep 17 00:00:00 2001 From: Jeff Baskin Date: Wed, 2 Apr 2025 14:26:09 -0400 Subject: [PATCH] Moved client responses to mutex. --- src/client.rs | 170 ++++++++++++++++++++++++++++++++++++++++---------- src/lib.rs | 17 ++--- src/main.rs | 2 +- src/queue.rs | 16 ++++- src/utils.rs | 17 +++++ 5 files changed, 179 insertions(+), 43 deletions(-) create mode 100644 src/utils.rs diff --git a/src/client.rs b/src/client.rs index 91f5fdb..9280d15 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,40 +1,28 @@ +use crate::{queue::Message, utils::GenID}; use std::{ - sync::mpsc::{channel, Receiver, Sender}, + collections::HashMap, + sync::{ + mpsc::{channel, Receiver, Sender}, + Arc, Mutex, + }, thread::spawn, }; +use uuid::Uuid; -pub struct Request { - tx: Sender, -} +pub struct Request; impl Request { - pub fn new(tx: Sender) -> Self { - Self { tx: tx } - } - - fn get_sender(&self) -> &Sender { - &self.tx + pub fn new() -> Self { + Self {} } } #[cfg(test)] mod requests { use super::*; - use replies::create_reply; - pub fn create_request() -> (Request, Receiver) { - let (tx, rx) = channel(); - let req = Request::new(tx); - (req, rx) - } - - #[test] - fn new_request() { - let (tx, rx) = channel(); - let req = Request::new(tx); - let sender = req.get_sender(); - sender.send(create_reply()).unwrap(); - rx.recv().unwrap(); + pub fn create_request() -> Request { + Request::new() } } @@ -59,28 +47,145 @@ mod replies { } } +#[derive(Clone)] +pub struct ClientRegistry { + registry: Arc>>>, +} + +impl ClientRegistry { + pub fn new() -> Self { + Self { + registry: Arc::new(Mutex::new(HashMap::new())), + } + } + + fn create_storage() -> Arc>>> { + Arc::new(Mutex::new(HashMap::new())) + } + + fn get_id<'a>( + gen: &mut impl Iterator, + data: &HashMap>, + ) -> Uuid { + let mut id = gen.next().unwrap(); + while data.contains_key(&id) { + id = gen.next().unwrap(); + } + id.clone() + } + + pub fn add(&mut self, tx: Sender) -> Uuid { + let id = Uuid::new_v4(); + let mut reg = self.registry.lock().unwrap(); + let mut gen_id = GenID::new(); + let id = ClientRegistry::get_id(&mut gen_id, ®); + reg.insert(id.clone(), tx); + id + } + + fn send(&mut self, id: &Uuid, msg: Reply) { + let mut reg = self.registry.lock().unwrap(); + let tx = reg.get(id).unwrap(); + tx.send(msg).unwrap(); + reg.remove(id).unwrap(); + } +} + +#[cfg(test)] +mod clientregistries { + use super::*; + use std::{ + sync::mpsc::{channel, Receiver}, + time::Duration, + }; + + static TIMEOUT: Duration = Duration::from_millis(500); + + #[test] + fn create_client_registry() { + let reg = ClientRegistry::new(); + let data = reg.registry.lock().unwrap(); + assert!(data.is_empty(), "needs to create an empty hashmap"); + } + + #[test] + fn send_from_client() { + let mut reg = ClientRegistry::new(); + let count = 10; + let mut rxs: HashMap> = HashMap::new(); + for _ in 0..count { + let (tx, rx) = channel::(); + let id = reg.add(tx); + rxs.insert(id, rx); + } + assert_eq!(rxs.len(), count, "should have been {} receivers", count); + for (id, rx) in rxs.iter() { + let msg = Reply {}; + reg.send(id, msg); + rx.recv_timeout(TIMEOUT).unwrap(); + } + let data = reg.registry.lock().unwrap(); + assert!(data.is_empty(), "should remove sender after sending"); + } + + #[test] + fn prevent_duplicates() { + let mut reg = ClientRegistry::new(); + let (tx, rx) = channel::(); + let existing = reg.add(tx); + let expected = Uuid::new_v4(); + let ids = [existing.clone(), expected.clone()]; + let data = reg.registry.lock().unwrap(); + let result = ClientRegistry::get_id(&mut ids.into_iter(), &data); + assert_eq!(result, expected); + } +} + +#[derive(Clone)] +pub struct ClientLink; + +impl ClientLink { + fn new() -> Self { + Self {} + } + + pub fn forward(&self, req: Request) -> Reply { + Reply {} + } +} + +#[cfg(test)] +mod clientlinks { + use super::*; + + #[test] + fn create_client_link() { + ClientLink::new(); + } +} + pub struct Client { - rx: Receiver, + rx: Receiver, } impl Client { - fn new(rx: Receiver) -> Self { + fn new(rx: Receiver) -> Self { Self { rx: rx } } - pub fn start() -> Sender { + pub fn start() -> ClientLink { let (tx, rx) = channel(); spawn(move || { let client = Client::new(rx); client.listen(); }); - tx + ClientLink::new() } fn listen(&self) { loop { let req = self.rx.recv().unwrap(); - req.get_sender().send(Reply {}).unwrap(); + //req.get_sender().send(Reply {}).unwrap(); } } } @@ -92,9 +197,8 @@ mod clients { #[test] fn start_client() { - let tx = Client::start(); - let (req, rx) = create_request(); - tx.send(req).unwrap(); - rx.recv().unwrap(); + let link = Client::start(); + let req = create_request(); + link.forward(req); } } diff --git a/src/lib.rs b/src/lib.rs index 041f628..c12c9bc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,8 +1,9 @@ mod client; mod field; mod queue; +mod utils; -use client::{Client, Reply, Request}; +use client::{Client, ClientLink, Reply, Request}; use field::Field; use queue::Message; use std::sync::{ @@ -12,26 +13,28 @@ use std::sync::{ #[derive(Clone)] pub struct MoreThanText { + client_link: ClientLink, registry: Arc>>>, - tx: Sender, } impl MoreThanText { pub fn new() -> Self { - let tx = Client::start(); Self { + client_link: Client::start(), registry: Arc::new(RwLock::new(Vec::new())), - tx: tx, } } - pub fn request(&self, _session: Option) -> Reply + pub fn request(&mut self, _session: Option) -> Reply where F: Into, { - let (tx, rx) = channel(); + let req = Request::new(); + self.client_link.forward(req) + /* let req = Request::new(tx); - self.tx.send(req).unwrap(); + self.tx.send(req.into()).unwrap(); rx.recv().unwrap() + */ } } diff --git a/src/main.rs b/src/main.rs index faa32ff..d22ec34 100644 --- a/src/main.rs +++ b/src/main.rs @@ -40,7 +40,7 @@ async fn mtt_conn(jar: CookieJar, state: State) -> impl IntoRespon }; let (tx, mut rx) = channel(5); spawn(async move { - tx.send(state.request(sid)).await.unwrap(); + tx.send(state.clone().request(sid)).await.unwrap(); }); let reply = rx.recv().await.unwrap(); let cookie = Cookie::build((SESSION_KEY, reply.get_session())); diff --git a/src/queue.rs b/src/queue.rs index 7a42de3..4aa7368 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -1,4 +1,4 @@ -use crate::field::Field; +use crate::{client::Request, field::Field}; use std::{ collections::HashMap, sync::{ @@ -11,6 +11,7 @@ use uuid::Uuid; enum MsgType { ClientMessage, + ClientRequest, NewClientMessage, NoOp, } @@ -55,6 +56,13 @@ impl Message { } } +impl From for Message { + fn from(value: Request) -> Self { + let msg = Message::new(); + msg.reply(MsgType::ClientRequest) + } +} + #[cfg(test)] mod messages { use super::*; @@ -178,7 +186,11 @@ mod queues { for _ in 0..10 { tx.send(msg.reply(MsgType::NewClientMessage)).unwrap(); let result = rx.recv().unwrap(); - assert!(!ids.contains(&result.id.clone()), "{} is a duplicate", &result.id); + assert!( + !ids.contains(&result.id.clone()), + "{} is a duplicate", + &result.id + ); ids.push(result.id); } } diff --git a/src/utils.rs b/src/utils.rs new file mode 100644 index 0000000..6726153 --- /dev/null +++ b/src/utils.rs @@ -0,0 +1,17 @@ +use uuid::Uuid; + +pub struct GenID; + +impl GenID { + pub fn new() -> Self { + GenID {} + } +} + +impl Iterator for GenID { + type Item = Uuid; + + fn next(&mut self) -> Option { + Some(Uuid::new_v4()) + } +}