From 52b65060886c5121447dffdb376157cf386689f8 Mon Sep 17 00:00:00 2001 From: Jeff Baskin Date: Wed, 6 Nov 2024 21:05:52 -0500 Subject: [PATCH] rebuilt to use message router. --- src/cache.rs | 124 -------------------- src/client.rs | 157 ++++++++++++++++++++++++++ src/lib.rs | 136 ++++++++++++---------- src/main.rs | 12 +- src/message.rs | 93 +++++++++++++++ src/messages.rs | 21 ---- src/router.rs | 66 +++++++++++ src/session.rs | 238 +++++++++++++++++++++++++++++++++++++++ test/test_single_boot.py | 2 + 9 files changed, 640 insertions(+), 209 deletions(-) delete mode 100644 src/cache.rs create mode 100644 src/client.rs create mode 100644 src/message.rs delete mode 100644 src/messages.rs create mode 100644 src/router.rs create mode 100644 src/session.rs diff --git a/src/cache.rs b/src/cache.rs deleted file mode 100644 index 465ce75..0000000 --- a/src/cache.rs +++ /dev/null @@ -1,124 +0,0 @@ -use super::messages::{ReceiveMsg, SendMsg}; -use std::sync::mpsc::Receiver; -use uuid::Uuid; - -/// MoreThanText database Cache -pub struct Cache { - data: Vec, - rx: Receiver, -} - -impl Cache { - /// Create database cache - /// - /// This should not be called directly. - /// It is part of the MoreThanText::new function. - pub fn new(rx: Receiver) -> Self { - Self { - data: Vec::new(), - rx: rx, - } - } - - /// Starts listening for database requests. - /// - /// Should not be directly called. - /// Part of the MoreThanText::new function. - pub fn listen(&mut self) { - loop { - match self.rx.recv().unwrap() { - SendMsg::OpenSession(msg) => msg.tx.send(self.get_session(msg.id)).unwrap(), - } - } - } - - fn get_session(&mut self, id: Option) -> ReceiveMsg { - let sid: ReceiveMsg; - match id { - Some(input) => match Uuid::parse_str(&input) { - Ok(vid) => { - if self.data.contains(&vid) { - sid = ReceiveMsg::Session(vid); - } else { - sid = self.new_session(); - } - } - Err(_) => sid = self.new_session(), - }, - None => sid = self.new_session(), - } - sid - } - - fn new_session(&mut self) -> ReceiveMsg { - let mut id = Uuid::new_v4(); - while self.data.contains(&id) { - id = Uuid::new_v4(); - } - self.data.push(id.clone()); - ReceiveMsg::Session(id) - } -} - -#[cfg(test)] -mod session { - use super::*; - use std::sync::mpsc::channel; - - #[test] - fn unique_ids() { - let (_, rx) = channel(); - let mut cache = Cache::new(rx); - let mut ids: Vec = Vec::new(); - for _ in 1..10 { - let id = cache.get_session(None); - match id { - ReceiveMsg::Session(sid) => { - if ids.contains(&sid) { - assert!(false, "{} was a duplicate id.", sid) - } - ids.push(sid); - } - } - } - } - - #[test] - fn existing_ids_are_reused() { - let (_, rx) = channel(); - let mut cache = Cache::new(rx); - let id1: Uuid; - let id2: Uuid; - match cache.get_session(None) { - ReceiveMsg::Session(sid) => id1 = sid, - } - match cache.get_session(Some(id1.to_string())) { - ReceiveMsg::Session(sid) => id2 = sid, - } - assert_eq!(id2, id1); - } - - #[test] - fn bad_ids_generate_new_ones() { - let (_, rx) = channel(); - let mut cache = Cache::new(rx); - let id: Uuid; - let bad_id = "A very bad id"; - match cache.get_session(Some(bad_id.to_string())) { - ReceiveMsg::Session(sid) => id = sid, - } - assert_ne!(id.to_string(), bad_id); - } - - #[test] - fn expired_ids_generate_new_ids() { - let (_, rx) = channel(); - let mut cache = Cache::new(rx); - let old_id = Uuid::new_v4(); - let id: Uuid; - match cache.get_session(Some(old_id.to_string())) { - ReceiveMsg::Session(sid) => id = sid, - } - assert_ne!(id, old_id); - } -} diff --git a/src/client.rs b/src/client.rs new file mode 100644 index 0000000..be783d9 --- /dev/null +++ b/src/client.rs @@ -0,0 +1,157 @@ +use super::{Message, Msg, MsgData, SessionMsg}; +use std::{ + collections::HashMap, + sync::mpsc::{channel, Receiver, Sender}, + thread::spawn, +}; +use uuid::Uuid; + +#[derive(Clone)] +pub enum ClientMsg { + OpenSession(Option, Sender), +} + +impl Msg for ClientMsg { + fn to_msgdata(&self) -> MsgData { + MsgData::Client(self.clone()) + } +} + +pub struct Client { + router_tx: Sender, + client_rx: Receiver, + requests: HashMap>, +} + +impl Client { + fn new(router_tx: Sender, client_rx: Receiver) -> Self { + Self { + router_tx: router_tx, + client_rx: client_rx, + requests: HashMap::new(), + } + } + + pub fn start(router_tx: Sender) -> Sender { + let (client_tx, client_rx) = channel(); + spawn(move || { + let mut client = Self::new(router_tx, client_rx); + client.listen(); + }); + client_tx + } + + fn listen(&mut self) { + loop { + let msg = self.client_rx.recv().unwrap(); + match self.requests.get(&msg.get_id()) { + Some(tx) => match msg.get_message() { + MsgData::Session(sess) => match sess { + SessionMsg::Opened(_) => tx.send(msg).unwrap(), + _ => {} + }, + _ => {} + }, + None => match msg.get_message() { + MsgData::Client(req) => match req { + ClientMsg::OpenSession(id, tx) => { + let sess = SessionMsg::Get(id.clone()); + self.requests.insert(msg.get_id(), tx.clone()); + self.router_tx.send(msg.reply(&sess)).unwrap(); + } + }, + _ => {} + }, + } + } + } +} + +#[cfg(test)] +mod messages { + use super::{super::SessionData, *}; + use std::time::Duration; + use uuid::Uuid; + + fn setup_client() -> (Sender, Receiver) { + let (tx, rx) = channel(); + let client_tx = Client::start(tx); + (client_tx, rx) + } + + fn opensession(id: &str) -> (Message, Receiver) { + let (tx, rx) = channel(); + let sess = ClientMsg::OpenSession(Some(id.to_string()), tx); + let msg = Message::new(&sess); + (msg, rx) + } + + #[test] + fn open_session() { + let (tx, rx) = setup_client(); + let sess_id = Uuid::new_v4().to_string(); + let (sess_msg, _) = opensession(&sess_id); + tx.send(sess_msg.clone()).unwrap(); + let result1 = rx.recv().unwrap(); + assert_eq!(result1.get_id(), sess_msg.get_id()); + match result1.get_message() { + MsgData::Session(req) => match req { + SessionMsg::Get(sess_data) => match sess_data { + Some(id) => assert_eq!(id.to_string(), sess_id), + _ => unreachable!("Should have returned some data."), + }, + _ => unreachable!("Should have been a get session message."), + }, + _ => unreachable!("Should have been a Session message."), + } + } + + #[test] + fn respond_session() { + let (tx, _rx) = setup_client(); + let (sess_msg, client_rx) = opensession(&Uuid::new_v4().to_string()); + tx.send(sess_msg.clone()).unwrap(); + let expected = Uuid::new_v4(); + let right = SessionData::new(expected.clone()); + let exp_msg = SessionMsg::Opened(right); + tx.send(sess_msg.reply(&exp_msg)).unwrap(); + let result = client_rx.recv().unwrap(); + assert_eq!(sess_msg.get_id(), result.get_id(), "Different message ids."); + match result.get_message() { + MsgData::Session(req) => match req { + SessionMsg::Opened(sess) => assert_eq!( + sess.to_string(), + expected.to_string(), + "Different sesssion ids." + ), + _ => unreachable!("Should have been an opened session."), + }, + _ => unreachable!("Should have been a session message."), + } + } + + #[test] + fn does_not_react_if_not_requested() { + let (tx, rx) = setup_client(); + let sess = SessionData::new(Uuid::new_v4()); + let exp_msg = SessionMsg::Opened(sess); + tx.send(Message::new(&exp_msg)).unwrap(); + match rx.recv_timeout(Duration::from_millis(500)) { + Err(_) => {} + _ => unreachable!("Should not receive anything."), + } + } + + #[test] + fn ignores_other_session_messages() { + let (tx, _rx) = setup_client(); + let (sess_msg, client_rx) = opensession(&Uuid::new_v4().to_string()); + tx.send(sess_msg.clone()).unwrap(); + let req = SessionMsg::Get(None); + tx.send(sess_msg.reply(&req)).unwrap(); + match client_rx.recv_timeout(Duration::from_millis(500)) { + Err(_) => {} + _ => unreachable!("Should not return anything."), + } + } +} diff --git a/src/lib.rs b/src/lib.rs index ad58504..84fac69 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,20 +1,48 @@ -mod cache; -mod messages; +mod client; +mod message; +mod router; +mod session; -use cache::Cache; -use messages::{ReceiveMsg, SendMsg, SessionRequest}; +//use client::ClientMsg; +//use router::Router; +//use session::{Session, SessionFilter, SessionMsg}; +use client::{Client, ClientMsg}; +use message::{Message, MsgData}; +use router::Router; +use session::{Session, SessionData, SessionMsg}; use std::{ sync::mpsc::{channel, Sender}, - thread::spawn, }; -use uuid::Uuid; -/// Application connection to the database +/// Support functions for Messages. +pub trait Msg { + fn to_msgdata(&self) -> MsgData; +} + +#[cfg(test)] +mod test_message { + use super::*; + + pub enum Tester { + Test1, + Test2, + } + + impl Msg for Tester { + fn to_msgdata(&self) -> MsgData { + match self { + Tester::Test1 => MsgData::Test1, + Tester::Test2 => MsgData::Test2, + } + } + } +} + +/// Application client to MoreThanText #[derive(Clone)] pub struct MoreThanText { - id: Option, - tx: Sender, - nodes: Vec + session: Option, + tx: Sender, } impl MoreThanText { @@ -25,38 +53,48 @@ impl MoreThanText { /// ``` /// use morethantext::MoreThanText; /// - /// MoreThanText::new(Vec::new()); + /// MoreThanText::new(); /// ``` - pub fn new(_nodes: Vec) -> Self { + pub fn new() -> Self { let (tx, rx) = channel(); - spawn(move || { - let mut cache = Cache::new(rx); - cache.listen(); - }); - Self { id: None, tx: tx, nodes: Vec::new() } + let mut senders = Vec::new(); + senders.push(Client::start(tx.clone())); + senders.push(Session::start(tx.clone())); + Router::start(senders, rx); + Self { + session: None, + tx: tx, + } } /// Opens an existing or new session with the database. /// /// If the string is None, incorrect, or expired, - /// a new session will be created. + /// a new session will be created. /// /// Example: /// /// ``` /// use morethantext::MoreThanText; /// - /// let mut mtt = MoreThanText::new(Vec::new()); + /// let mut mtt = MoreThanText::new(); /// mtt.open_session(None); /// mtt.open_session(Some("7b1ff340-7dfa-4f29-b144-601384e54423".to_string())); /// ``` pub fn open_session(&mut self, id: Option) { let (tx, rx) = channel(); - let request = SessionRequest { id: id, tx: tx }; - self.tx.send(SendMsg::OpenSession(request)).unwrap(); - match rx.recv().unwrap() { - ReceiveMsg::Session(sid) => self.id = Some(sid), - } + let req = ClientMsg::OpenSession(id, tx); + let msg = Message::new(&req); + self.tx.send(msg).unwrap(); + match rx.recv().unwrap().get_message() { + MsgData::Session(data) => { + match data { + SessionMsg::Opened(sess) => self.session = Some(sess.clone()), + _ => {}, + } + }, + _ => {}, + }; } /// Get the session id @@ -66,13 +104,13 @@ impl MoreThanText { /// ``` /// use morethantext::MoreThanText; /// - /// let mut mtt = MoreThanText::new(Vec::new()); - /// mtt.get_id(); + /// let mut mtt = MoreThanText::new(); + /// let id = mtt.get_id(); /// ``` pub fn get_id(&self) -> String { - match self.id { - Some(sid) => sid.to_string(), - None => "none".to_string(), + match &self.session { + Some(id) => id.to_string(), + None => "".to_string(), } } } @@ -82,40 +120,24 @@ mod mtt_client { use super::*; #[test] - fn uniques_ids() { - let mut mtt = MoreThanText::new(Vec::new()); - let mut ids: Vec = Vec::new(); - for _ in 1..10 { - mtt.open_session(None); - let id = mtt.id.clone().unwrap(); - if ids.contains(&id) { - assert!(false, "{} is a duplicate id", id); - } - ids.push(id); - } + fn default_values() { + let mtt = MoreThanText::new(); + assert!(mtt.session.is_none()); } #[test] - fn existing_ids_are_reused() { - let mut mtt = MoreThanText::new(Vec::new()); + fn new_session() { + let mut mtt = MoreThanText::new(); mtt.open_session(None); - let holder = mtt.id.clone().unwrap().to_string(); - mtt.open_session(Some(holder.clone())); - assert_eq!(mtt.id.clone().unwrap().to_string(), holder); + assert!(mtt.session.is_some()); } #[test] - fn bad_ids_generate_new_ones() { - let mut mtt = MoreThanText::new(Vec::new()); - mtt.open_session(Some("bad test string".to_string())); - assert!(mtt.id.is_some()); - } - - #[test] - fn incorrect_ids_generate_new_ones() { - let mut mtt = MoreThanText::new(Vec::new()); - let holder = Uuid::new_v4(); - mtt.open_session(Some(holder.clone().to_string())); - assert_ne!(mtt.id, Some(holder)); + fn session_ids_are_unique() { + let mut mtt = MoreThanText::new(); + mtt.open_session(None); + let id1 = mtt.get_id(); + mtt.open_session(None); + assert_ne!(mtt.get_id(), id1); } } diff --git a/src/main.rs b/src/main.rs index c2727fd..8a9ef75 100644 --- a/src/main.rs +++ b/src/main.rs @@ -33,8 +33,7 @@ mod http_session { async fn main() { let args = Args::parse(); let addr = format!("{}:{}", args.address, args.port); - let nodes = args.node; - let state = MoreThanText::new(nodes); + let state = MoreThanText::new(); let app = Router::new().route("/", get(handler)).with_state(state); let listener = tokio::net::TcpListener::bind(&addr).await.unwrap(); axum::serve(listener, app.into_make_service()) @@ -43,12 +42,11 @@ async fn main() { } async fn handler(jar: CookieJar, mut state: State) -> impl IntoResponse { - let sid: Option; let mut cookies = jar.clone(); - match jar.get(SESSION_KEY) { - Some(cookie) => sid = Some(cookie.value().to_string()), - None => sid = None, - } + let sid = match jar.get(SESSION_KEY) { + Some(cookie) => Some(cookie.value().to_string()), + None => None, + }; state.open_session(sid.clone()); if !sid.is_some_and(|x| x == state.get_id()) { let cookie = Cookie::build((SESSION_KEY, state.get_id())); diff --git a/src/message.rs b/src/message.rs new file mode 100644 index 0000000..9f37c74 --- /dev/null +++ b/src/message.rs @@ -0,0 +1,93 @@ +use super::{ClientMsg, Msg, SessionMsg}; +use uuid::Uuid; + +/// Message Types +#[derive(Clone)] +pub enum MsgData { + Client(ClientMsg), + Session(SessionMsg), + Test1, + Test2, +} + +/// MoreThanText Message Structure +#[derive(Clone)] +pub struct Message { + id: Uuid, + msg: MsgData, +} + +impl Message { + pub fn new(data: &D) -> Self + where + D: Msg, + { + Self { + id: Uuid::new_v4(), + msg: data.to_msgdata(), + } + } + + pub fn get_message(&self) -> &MsgData { + &self.msg + } + + pub fn get_id(&self) -> Uuid { + self.id.clone() + } + + pub fn reply(&self, data: &D) -> Self + where + D: Msg, + { + Self { + id: self.id.clone(), + msg: data.to_msgdata(), + } + } +} + +#[cfg(test)] +mod messages { + use super::{super::test_message::Tester, *}; + + #[test] + fn new_messagees() { + let data = Tester::Test1; + let msg = Message::new(&data); + match msg.get_message() { + MsgData::Test1 => {} + _ => unreachable!("Should have received Test1"), + } + let data = Tester::Test2; + let msg = Message::new(&data); + match msg.get_message() { + MsgData::Test2 => {} + _ => unreachable!("Should have received Test1"), + } + } + + #[test] + fn message_ids_are_unique() { + let mut ids: Vec = Vec::new(); + let data = Tester::Test1; + for _ in 1..10 { + let msg = Message::new(&data); + assert!(!ids.contains(&msg.get_id()), "Had a duplicate id"); + ids.push(msg.get_id()); + } + } + + #[test] + fn create_replies() { + let data1 = Tester::Test1; + let data2 = Tester::Test2; + let msg = Message::new(&data1); + let reply = msg.reply(&data2); + assert_eq!(reply.get_id(), msg.get_id()); + match reply.get_message() { + MsgData::Test2 => {} + _ => unreachable!("Should have been a Test1"), + } + } +} diff --git a/src/messages.rs b/src/messages.rs deleted file mode 100644 index 3675a0a..0000000 --- a/src/messages.rs +++ /dev/null @@ -1,21 +0,0 @@ -/// These are the nessages that are used by the database. -use std::sync::mpsc::Sender; -use uuid::Uuid; - -/// Requests of the database. -pub enum SendMsg { - OpenSession(SessionRequest), -} - -/// Responses to database requests -pub enum ReceiveMsg { - Session(Uuid), -} - -/// Items needed for a session request. -pub struct SessionRequest { - /// Optional string reprosentation of the session id. - pub id: Option, - /// Channel Sender for the reply. - pub tx: Sender, -} diff --git a/src/router.rs b/src/router.rs new file mode 100644 index 0000000..0ca85cd --- /dev/null +++ b/src/router.rs @@ -0,0 +1,66 @@ +use super::Message; +use std::{ + sync::mpsc::{Receiver, Sender}, + thread::spawn, +}; + +pub struct Router { + txs: Vec>, + rx: Receiver, +} + +impl Router { + fn new(senders: Vec>, rx: Receiver) -> Self { + Self { + txs: senders, + rx: rx, + } + } + + pub fn start(senders: Vec>, rx: Receiver) { + spawn(move || { + let router = Router::new(senders, rx); + router.listen(); + }); + } + + fn listen(&self) { + loop { + let msg = self.rx.recv().unwrap(); + for tx in self.txs.iter() { + tx.send(msg.clone()).unwrap(); + } + } + } +} + +#[cfg(test)] +mod messages { + use std::sync::mpsc::channel; + use super::super::MsgData; + use super::{super::test_message::Tester, *}; + + #[test] + fn forward_messages() { + let (tx, rx) = channel(); + let (tx1, rx1) = channel(); + let (tx2, rx2) = channel(); + let senders = vec![tx1, tx2]; + Router::start(senders, rx); + let data = Tester::Test1; + let msg = Message::new(&data); + tx.send(msg.clone()).unwrap(); + let result1 = rx1.recv().unwrap(); + assert_eq!(result1.get_id(), msg.get_id()); + match result1.get_message() { + MsgData::Test1 => {} + _ => unreachable!("Should have been test1."), + } + let result2 = rx2.recv().unwrap(); + assert_eq!(result2.get_id(), msg.get_id()); + match result2.get_message() { + MsgData::Test1 => {} + _ => unreachable!("Should have been test1."), + } + } +} diff --git a/src/session.rs b/src/session.rs new file mode 100644 index 0000000..85148f9 --- /dev/null +++ b/src/session.rs @@ -0,0 +1,238 @@ +use super::{Message, Msg, MsgData}; +use std::{ + fmt, + sync::mpsc::{channel, Receiver, Sender}, + thread::spawn, +}; +use uuid::Uuid; + +#[derive(Clone)] +pub enum SessionMsg { + Get(Option), + Opened(SessionData), +} + +impl Msg for SessionMsg { + fn to_msgdata(&self) -> MsgData { + MsgData::Session(self.clone()) + } +} + +#[derive(Clone)] +pub struct SessionData { + id: Uuid, +} + +impl SessionData { + pub fn new(id: Uuid) -> Self { + Self { id: id } + } +} + +impl fmt::Display for SessionData { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.id) + } +} + +pub struct Session { + router_tx: Sender, + session_rx: Receiver, + ids: Vec, +} + +impl Session { + fn new(router_tx: Sender, session_rx: Receiver) -> Self { + Self { + router_tx: router_tx, + session_rx: session_rx, + ids: Vec::new(), + } + } + + pub fn start(router_tx: Sender) -> Sender { + let (session_tx, session_rx) = channel(); + spawn(move || { + let mut session = Session::new(router_tx, session_rx); + session.listen(); + }); + session_tx + } + + fn listen(&mut self) { + loop { + let msg = self.session_rx.recv().unwrap(); + match msg.get_message() { + MsgData::Session(data) => match data { + SessionMsg::Get(req_id) => { + let id: Uuid; + match req_id { + Some(req) => { + match Uuid::try_parse(req) { + Ok(data) => { + if self.ids.contains(&data) { + id = data; + } else { + id = self.create_session(); + } + }, + Err(_) => id = self.create_session(), + } + }, + None => id = self.create_session(), + } + let data = SessionMsg::Opened(SessionData::new(id)); + self.router_tx.send(msg.reply(&data)).unwrap() + }, + _ => {} + }, + _ => {} + } + } + } + + fn create_session(&mut self) -> Uuid { + let id = Uuid::new_v4(); + self.ids.push(id.clone()); + id + } +} + +#[cfg(test)] +mod messages { + use super::{super::test_message::Tester, *}; + use std::time::Duration; + + fn setup_session() -> (Sender, Receiver) { + let (tx, rx) = channel(); + let session_tx = Session::start(tx); + (session_tx, rx) + } + + #[test] + fn ignore_unwanted_messages() { + let (tx, rx) = setup_session(); + let data = Tester::Test1; + let msg = Message::new(&data); + tx.send(msg).unwrap(); + match rx.recv_timeout(Duration::from_millis(500)) { + Err(_) => {} + _ => unreachable!("Should not receive anything."), + } + } + + #[test] + fn create_new_session() { + let (tx, rx) = setup_session(); + let msgdata = SessionMsg::Get(None); + let msg = Message::new(&msgdata); + tx.send(msg.clone()).unwrap(); + let result = rx.recv().unwrap(); + assert_eq!(result.get_id(), msg.get_id()); + match result.get_message() { + MsgData::Session(data) => match data { + SessionMsg::Opened(_) => {} + _ => unreachable!("Should have been an opened response."), + }, + _ => unreachable!("Should be a session responsee."), + } + } + + #[test] + fn ignore_session_replies() { + let (tx, rx) = setup_session(); + let msgdata = SessionMsg::Opened(SessionData::new(Uuid::new_v4())); + let msg = Message::new(&msgdata); + tx.send(msg).unwrap(); + match rx.recv_timeout(Duration::from_millis(500)) { + Err(_) => {} + _ => unreachable!("Should not receive anything."), + } + } + + #[test] + fn ids_must_be_unique() { + let (tx, rx) = setup_session(); + let msgdata = SessionMsg::Get(None); + let mut ids: Vec = Vec::new(); + for _ in 0..10 { + let msg = Message::new(&msgdata); + tx.send(msg).unwrap(); + match rx.recv().unwrap().get_message() { + MsgData::Session(msg) => match msg { + SessionMsg::Opened(sess) => { + let id = sess.to_string(); + assert!(!ids.contains(&id), "duplicated id found."); + ids.push(id); + } + _ => unreachable!("Shouuld have opened a session."), + }, + _ => unreachable!("Should be a session message"), + } + } + } + + #[test] + fn expired_ids_get_new() { + let (tx, rx) = setup_session(); + let old_id = Uuid::new_v4(); + let msgdata = SessionMsg::Get(Some(old_id.to_string())); + let msg = Message::new(&msgdata); + tx.send(msg.clone()).unwrap(); + let result = rx.recv().unwrap(); + assert_eq!(result.get_id(), msg.get_id()); + match result.get_message() { + MsgData::Session(msg) => match msg { + SessionMsg::Opened(sess) => assert_ne!(sess.to_string(), old_id.to_string()), + _ => unreachable!("Should habe been an Opened message."), + }, + _ => unreachable!("Should have been a session message."), + } + } + + #[test] + fn bad_session_ids_get_new() { + let (tx, rx) = setup_session(); + let id = "something badA"; + let msgdata = SessionMsg::Get(Some(id.to_string())); + let msg = Message::new(&msgdata); + tx.send(msg.clone()).unwrap(); + let result = rx.recv().unwrap(); + assert_eq!(result.get_id(), msg.get_id()); + match result.get_message() { + MsgData::Session(data) => match data { + SessionMsg::Opened(sess) => assert_ne!(sess.to_string(), id), + _ => unreachable!("Should habe been an Opened message."), + }, + _ => unreachable!("Should have been a session message."), + } + } + + #[test] + fn uses_existing_session() { + let (tx, rx) = setup_session(); + let msgdata = SessionMsg::Get(None); + let msg = Message::new(&msgdata); + tx.send(msg).unwrap(); + let result = rx.recv().unwrap(); + let thesess = match result.get_message() { + MsgData::Session(data) => match data { + SessionMsg::Opened(sess) => sess, + _ => unreachable!("Should habe been an Opened message."), + }, + _ => unreachable!("Should have been a session message."), + }; + let msgdata = SessionMsg::Get(Some(thesess.to_string())); + let msg = Message::new(&msgdata); + tx.send(msg.clone()).unwrap(); + let result = rx.recv().unwrap(); + assert_eq!(result.get_id(), msg.get_id()); + match result.get_message() { + MsgData::Session(data) => match data { + SessionMsg::Opened(sess) => assert_eq!(sess.to_string(), thesess.to_string(), "Should use existing sesssion."), + _ => unreachable!("Should habe been an Opened message."), + }, + _ => unreachable!("Should have been a session message."), + } + } +} diff --git a/test/test_single_boot.py b/test/test_single_boot.py index 27b0f56..223d544 100644 --- a/test/test_single_boot.py +++ b/test/test_single_boot.py @@ -1,6 +1,7 @@ """Tests for single server boot ups.""" from socket import gethostbyname, gethostname +from unittest import skip from aiohttp import ClientSession from .mtt_tc import MTTClusterTC, SESSION_KEY @@ -86,6 +87,7 @@ class BootUpTC(MTTClusterTC): self.assertIn(SESSION_KEY, response.cookies) self.assertNotEqual(response.cookies[SESSION_KEY].value, value) + @skip("Code not availaable yet.") async def test_sessions_are_shared_between_servers(self): """Does the session apply to the cluster.""" await self.create_cluster()