pub mod action; mod document; mod message; mod mtterror; pub mod name; mod queue; use document::{Clock, CreateDoc, Session}; use isolang::Language; use message::{wrapper::Message, MessageAction, MessageID}; use queue::{ data_director::{RegMsg, Register}, router::Queue, SenderID, }; use std::{ sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender}, time::Duration, }; use uuid::Uuid; pub use action::*; pub use document::MissingTranslation; pub use mtterror::{ErrorID, MTTError}; pub use name::{Name, NameType}; pub use queue::data_director::{Include, Path}; #[cfg(test)] mod support_tests { use super::*; use std::time::Duration; pub static TIMEOUT: Duration = Duration::from_millis(500); pub fn random_name() -> Name { Name::english(Uuid::new_v4().to_string().as_str()) } } static TIMEOUT: Duration = Duration::from_secs(10); struct PreBuildClient { queue: Queue, msg_id: MessageID, rx: Receiver, sender_id: SenderID, } impl PreBuildClient { fn new(mut queue: Queue) -> Self { let sess_name = Session::doc_names()[0].clone(); let (tx, rx) = channel(); let sender_id = queue.add_sender(tx); let msg_id = MessageID::new(); let paths = [ Path::new( Include::Just(msg_id.clone()), Include::Just(sess_name.clone().into()), Include::Just(Action::Records), ), Path::new( Include::Just(msg_id.clone()), Include::All, Include::Just(Action::Error), ), ]; for path in paths.iter().cloned() { let reg_msg = Register::new(sender_id.clone(), RegMsg::AddRoute(path)); queue.send(Message::with_id(msg_id.clone(), reg_msg)); let result = rx.recv().unwrap(); } Self { queue: queue, msg_id: msg_id, rx: rx, sender_id: sender_id, } } fn send(&self, action: MA) -> Records where MA: Into, { self.queue .send(Message::with_id(self.msg_id.clone(), action)); let result = self.rx.recv().unwrap(); match result.get_action() { MsgAction::Records(data) => data.clone(), _ => unreachable!("session requests should always return data"), } } } pub struct MTTClient { queue: Queue, rx: Receiver, sender_id: SenderID, session_id: Uuid, } impl MTTClient { fn internal_new(prebuild: PreBuildClient, lang: Option) -> Self { let mut add = Addition::new(Session::doc_names()[0].clone()); match lang { Some(language) => { let field: Field = language.into(); add.add_field(Session::language_field_names()[0].clone(), field); } None => {} } let result = prebuild.send(add); let rec = result.iter().last().unwrap(); let session_id = match rec.get(Session::id_field_names()[0].clone()).unwrap() { Field::Uuid(data) => data.clone(), _ => unreachable!("should always be uuid"), }; Self { queue: prebuild.queue, rx: prebuild.rx, sender_id: prebuild.sender_id, session_id: session_id, } } fn new(mut queue: Queue, lang: Option) -> Self { let prebuild = PreBuildClient::new(queue); Self::internal_new(prebuild, lang) } fn with_session(queue: Queue, id: String, lang: Option) -> Self { let prebuild = PreBuildClient::new(queue.clone()); let sess_id = match Uuid::try_from(id.as_str()) { Ok(data) => data, Err(_) => return Self::internal_new(prebuild, lang), }; let mut qry = Query::new(Session::doc_names()[0].clone()); let mut calc = Calculation::new(Operand::Equal); calc.add_value(CalcValue::Existing(FieldType::Uuid)) .unwrap(); calc.add_value(sess_id).unwrap(); qry.add(Session::id_field_names()[0].clone(), calc); let result = prebuild.send(qry); if result.len() == 0 { Self::internal_new(prebuild, lang) } else { let rec = result.iter().last().unwrap(); let session_id = match rec.get(Session::id_field_names()[0].clone()).unwrap() { Field::Uuid(data) => data.clone(), _ => unreachable!("should always be uuid"), }; Self { queue: queue, rx: prebuild.rx, sender_id: prebuild.sender_id.clone(), session_id: session_id, } } } pub fn session_id(&self) -> String { self.session_id.to_string() } } impl Drop for MTTClient { fn drop(&mut self) { self.queue.remove_sender(&self.sender_id); } } #[derive(Clone)] pub struct MoreThanText { queue: Queue, } impl MoreThanText { pub fn new() -> Self { let queue = Queue::new(); let mut output = Self { queue: queue.clone(), }; Clock::start(queue.clone()); CreateDoc::start(queue.clone()); output .create_document(Session::document_definition()) .unwrap(); output } pub fn client(&self) -> MTTClient { MTTClient::new(self.queue.clone(), None) } pub fn client_with_language(&self, lang: Language) -> MTTClient { MTTClient::new(self.queue.clone(), Some(lang)) } pub fn client_with_session(&self, id: String, lang: Option) -> MTTClient { MTTClient::with_session(self.queue.clone(), id, lang) } fn new_session(lang: Option) -> ClientAction { let mut output = Addition::new(Session::doc_names()[0].clone()); match lang { Some(data) => { let name = Session::language_field_names()[0].clone(); let field: Field = data.into(); output.add_field(name, field); } None => {} } output.into() } fn recursive_message_request(&mut self, action: CA, lang: Option) -> Uuid where CA: Into, { match self.records(action) { Ok(data) => { if data.len() == 0 { self.recursive_message_request(MoreThanText::new_session(lang), lang) } else { let rec = data.iter().last().unwrap(); match rec.get(Name::english("id")).unwrap() { Field::Uuid(id) => id, _ => unreachable!("should always return uuid"), } } } Err(_) => self.recursive_message_request(MoreThanText::new_session(lang), lang), } } pub fn validate_session(&mut self, session: Option, lang: Option) -> Uuid { let action = match session { Some(data) => match Uuid::try_from(data.as_str()) { Ok(id) => { let mut query = Query::new(Session::doc_names()[0].clone()); let mut calc = Calculation::new(Operand::Equal); calc.add_value(CalcValue::Existing(FieldType::Uuid)) .unwrap(); calc.add_value(id).unwrap(); query.add(Name::english("id"), calc); query.into() } Err(_) => MoreThanText::new_session(lang), }, None => MoreThanText::new_session(lang), }; self.recursive_message_request(action, lang) } pub fn records(&mut self, request: UA) -> Result where UA: Into, { let req = request.into(); let (tx, rx) = channel(); let sender_id = self.queue.add_sender(tx); let doc_id = req.doc_name().clone(); let msg = Message::new(req); let msg_id = msg.get_message_id(); let paths = [ Path::new( Include::Just(msg_id.clone()), Include::Just(doc_id.clone()), Include::Just(Action::Records), ), Path::new( Include::Just(msg_id.clone()), Include::All, Include::Just(Action::Error), ), ]; for path in paths.iter() { let reg_msg = Register::new(sender_id.clone(), RegMsg::AddRoute(path.clone())); self.queue.send(Message::new(reg_msg)); let result = rx.recv().unwrap(); let action = result.get_action(); match action { MsgAction::Register(status) => match status.get_msg() { RegMsg::Error(err) => { let mut error = err.clone(); error.add_parent(ErrorID::Document(msg.doc_name().clone())); self.queue.remove_sender(&sender_id); return Err(error); } _ => {} }, _ => unreachable!("got {:?} should have been a registry message", action), } } self.queue.send(msg); let output = match rx.recv_timeout(TIMEOUT) { Ok(data) => match data.get_action() { MsgAction::Records(data) => Ok(data.clone()), MsgAction::Error(err) => Err(err.clone()), _ => unreachable!("should only receive records or errors"), }, Err(_) => Err(MTTError::new(ErrorID::TimeOut)), }; self.queue.remove_sender(&sender_id); output } pub fn create_document(&mut self, docdef: DocDef) -> Result<(), MTTError> { let (tx, rx) = channel(); let sender_id = self.queue.add_sender(tx); let msg = Message::new(docdef); let msg_id = msg.get_message_id(); let paths = [ Path::new( Include::Just(msg_id.clone()), Include::All, Include::Just(Action::DocumentCreated), ), Path::new( Include::Just(msg_id.clone()), Include::All, Include::Just(Action::Error), ), ]; for path in paths.iter() { let reg_msg = Register::new(sender_id.clone(), RegMsg::AddRoute(path.clone())); self.queue.send(Message::new(reg_msg)); rx.recv().unwrap(); // Wait for completion. } self.queue.send(msg); let output = match rx.recv_timeout(TIMEOUT) { Ok(data) => match data.get_action() { MsgAction::DocumentCreated => Ok(()), MsgAction::Error(err) => Err(err.clone()), _ => unreachable!("should only receive records or errors"), }, Err(_) => Err(MTTError::new(ErrorID::TimeOut)), }; self.queue.remove_sender(&sender_id); output } pub fn get_document(&self, name: &str, id: &str) -> Result { if name == "page" { Ok("something".to_string()) } else { Err(MTTError::new(ErrorID::DocumentNotFound)) } } } pub struct TestMoreThanText { mtt: MoreThanText, queue: Queue, channel: Option>, } impl TestMoreThanText { pub fn new() -> Self { let mut mtt = MoreThanText::new(); let queue = mtt.queue.clone(); Self { mtt: mtt, queue: queue, channel: None, } } pub fn get_morethantext(&self) -> MoreThanText { self.mtt.clone() } pub fn send_time_pulse(&self) { let msg = Clock::gen_message(); self.queue.send(msg); } pub fn register_channel(&mut self, paths: Vec) { let mut queue = self.mtt.queue.clone(); let (tx, rx) = channel(); let sender_id = queue.add_sender(tx); for path in paths.iter() { let reg_msg = Register::new(sender_id.clone(), RegMsg::AddRoute(path.clone())); queue.send(Message::new(reg_msg)); rx.recv().unwrap(); // Wait for completion. } self.channel = Some(rx); } pub fn recv(&self) -> Result { match &self.channel { Some(rx) => rx.recv_timeout(Duration::from_millis(500)), None => panic!("test environment does not have a channel setup"), } } pub fn get_trigger_records(&self, action: Action) -> Records { let msg = self.recv().unwrap(); let msg_action = msg.get_action(); if action == msg_action.clone().into() { match msg_action { MsgAction::OnAddition(data) => data.clone(), MsgAction::OnDelete(data) => data.clone(), MsgAction::OnQuery(data) => data.clone(), MsgAction::OnUpdate(data) => data.clone(), _ => panic!("{:?} is not a trigger", action), } } else { panic!("received {:?} instead of {:?} trigger", msg, action); } } }