Setting up message routing.

This commit is contained in:
2025-07-17 09:12:23 -04:00
parent 9de7e75740
commit f673673dac
4 changed files with 271 additions and 153 deletions

View File

@@ -1,5 +1,8 @@
use crate::queue::{Message, MsgType, Queue};
use std::{sync::mpsc::{channel, Receiver}, thread::spawn};
use std::{
sync::mpsc::{channel, Receiver},
thread::spawn,
};
const RESPONDS_TO: [MsgType; 0] = [];

View File

@@ -1,39 +1,47 @@
use std::{collections::HashMap, sync::{Arc, RwLock, mpsc::{Sender, Receiver, channel}}};
use crate::field::Field;
use std::{
collections::HashMap,
sync::{
mpsc::{channel, Receiver, Sender},
Arc, RwLock,
},
};
use uuid::Uuid;
#[derive(Clone, Debug)]
enum MTTError {
DocumentAlreadyExists(String),
DocumentNotFound(String),
RouteNoListeners,
}
#[derive(Clone, Debug, PartialEq)]
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
enum Action {
New,
NewDocumentType,
Query,
Reply,
Update,
}
#[derive(Clone)]
enum DocumentID {
enum NameID {
ID(Uuid),
Name(String),
}
impl From<&str> for DocumentID {
impl From<&str> for NameID {
fn from(value: &str) -> Self {
Self::Name(value.to_string())
}
}
impl From<String> for DocumentID {
impl From<String> for NameID {
fn from(value: String) -> Self {
Self::Name(value)
}
}
impl From<Uuid> for DocumentID {
impl From<Uuid> for NameID {
fn from(value: Uuid) -> Self {
Self::ID(value)
}
@@ -42,17 +50,20 @@ impl From<Uuid> for DocumentID {
#[derive(Clone)]
struct Message {
msg_id: Uuid,
document_id: DocumentID,
document_id: NameID,
action: Action,
//instructions: ?,
}
impl Message {
fn new<D>(doc_id: D, action: Action) -> Self where D: Into<DocumentID> {
fn new<D>(doc_id: D, action: Action) -> Self
where
D: Into<NameID>,
{
Self {
msg_id: Uuid::new_v4(),
document_id: doc_id.into(),
action: action
action: action,
}
}
@@ -60,16 +71,12 @@ impl Message {
&self.msg_id
}
fn get_document_id(&self) -> &DocumentID {
fn get_document_id(&self) -> &NameID {
&self.document_id
}
fn reply(&self) -> Self {
Self {
msg_id: self.msg_id.clone(),
document_id: DocumentID::Name("fred".to_string()),
action: Action::Update,
}
fn get_action(&self) -> &Action {
&self.action
}
}
@@ -81,12 +88,12 @@ mod messages {
fn can_the_document_be_a_stringi_reference() {
let dts = ["one", "two"];
for document in dts.into_iter() {
let msg = Message::new(document, Action::New);
let msg = Message::new(document, Action::NewDocumentType);
match msg.get_document_id() {
DocumentID::ID(_) => unreachable!("should have been a string id"),
DocumentID::Name(data) => assert_eq!(data, document),
NameID::ID(_) => unreachable!("should have been a string id"),
NameID::Name(data) => assert_eq!(data, document),
}
assert_eq!(msg.action, Action::New);
assert_eq!(msg.get_action(), &Action::NewDocumentType);
}
}
@@ -96,10 +103,10 @@ mod messages {
for document in dts.into_iter() {
let msg = Message::new(document.clone(), Action::Update);
match msg.get_document_id() {
DocumentID::ID(_) => unreachable!("should have been a string id"),
DocumentID::Name(data) => assert_eq!(data, &document),
NameID::ID(_) => unreachable!("should have been a string id"),
NameID::Name(data) => assert_eq!(data, &document),
}
assert_eq!(msg.action, Action::Update);
assert_eq!(msg.get_action(), &Action::Update);
}
}
@@ -108,8 +115,8 @@ mod messages {
let document = Uuid::new_v4();
let msg = Message::new(document.clone(), Action::Query);
match msg.get_document_id() {
DocumentID::ID(data) => assert_eq!(data, &document),
DocumentID::Name(_) => unreachable!("should have been an id"),
NameID::ID(data) => assert_eq!(data, &document),
NameID::Name(_) => unreachable!("should have been an id"),
}
assert_eq!(msg.action, Action::Query);
}
@@ -118,77 +125,107 @@ mod messages {
fn is_the_message_id_random() {
let mut ids: Vec<Uuid> = Vec::new();
for _ in 0..5 {
let msg = Message::new("tester", Action::New);
let msg = Message::new("tester", Action::NewDocumentType);
let id = msg.get_message_id().clone();
assert!(!ids.contains(&id), "{:?} containts {}", ids, id);
ids.push(id);
}
}
}
#[test]
fn does_the_message_reply_have_the_same_message_id() {
let msg = Message::new("tester", Action::New);
let reply = msg.reply();
assert_eq!(reply.get_message_id(), msg.get_message_id());
#[derive(Eq, Hash, PartialEq)]
struct Route {
action: Action,
doc_type: Option<Uuid>,
}
impl Route {
fn new(doc_type: Option<Uuid>, action: Action) -> Self {
Self {
action: action,
doc_type: doc_type,
}
}
}
struct QueueData {
senders: HashMap<Uuid, Sender<Message>>,
names: HashMap<String, Uuid>,
routes: HashMap<Route, Uuid>,
}
impl QueueData {
fn new() -> Self {
Self {
Self {
senders: HashMap::new(),
names: HashMap::new(),
routes: HashMap::new(),
}
}
fn register(&mut self, name: String, tx: Sender<Message>) -> Result<Uuid, MTTError> {
match self.names.get(&name) {
Some(_) => return Err(MTTError::DocumentAlreadyExists(name)),
None => {},
None => {}
}
let mut id = Uuid::new_v4();
while self.senders.contains_key(&id) {
id = Uuid::new_v4();
}
let id = Uuid::new_v4();
self.senders.insert(id.clone(), tx);
self.names.insert(name, id.clone());
Ok(id)
}
fn send(&self, msg: Message) -> Result<(), MTTError> {
let tx = match msg.get_document_id() {
DocumentID::ID(id) => self.senders.get(id).unwrap(),
DocumentID::Name(name) => {
let docid = match self.names.get(name) {
Some(id) => id,
None => {
return Err(MTTError::DocumentNotFound(name.to_string()));
&Uuid::new_v4()
}
};
self.senders.get(docid).unwrap()
let doc_id = match msg.get_document_id() {
NameID::Name(name) => match self.names.get(name) {
Some(id) => Some(id.clone()),
None => return Err(MTTError::DocumentNotFound(name.clone())),
},
NameID::ID(id) => Some(id.clone()),
};
let route = Route::new(doc_id, msg.get_action().clone());
let sender_id = match self.routes.get(&route) {
Some(sender_id) => sender_id,
None => return Ok(()),
};
let tx = self.senders.get(sender_id).unwrap();
tx.send(msg).unwrap();
Ok(())
}
fn add_route(
&mut self,
sender_id: &Uuid,
doc_type: Option<String>,
action: Action,
) -> Result<(), MTTError> {
let doc_id = match doc_type {
Some(name) => Some(self.names.get(&name).unwrap().clone()),
None => None,
};
let route = Route::new(doc_id, action);
self.routes.insert(route, sender_id.clone());
Ok(())
}
}
#[cfg(test)]
mod queuedatas {
use super::*;
use std::time::Duration;
use std::{sync::mpsc::RecvTimeoutError, time::Duration};
static TIMEOUT: Duration = Duration::from_millis(500);
#[test]
fn can_a_new_document_type_be_rgistered() {
let name = Uuid::new_v4().to_string();
let action = Action::Query;
let (tx, rx) = channel();
let mut queuedata = QueueData::new();
let id = queuedata.register(name.clone(), tx).unwrap();
queuedata.add_route(&id, Some(name.clone()), action);
let msg = Message::new(name.clone(), Action::Query);
queuedata.send(msg.clone()).unwrap();
let result = rx.recv_timeout(TIMEOUT).unwrap();
@@ -228,6 +265,62 @@ mod queuedatas {
},
}
}
#[test]
fn is_send_okay_if_no_one_is_listening() {
let mut queuedata = QueueData::new();
let name = "something";
let (tx, _) = channel();
queuedata.register(name.to_string(), tx).unwrap();
let msg = Message::new("something", Action::NewDocumentType);
match queuedata.send(msg) {
Ok(_) => {}
Err(err) => unreachable!("got {:?}: should not error", err),
}
}
#[test]
fn can_certain_messages_be_ignored() {
let mut queuedata = QueueData::new();
let doctype = "test";
let (tx, rx) = channel();
let id = queuedata.register(doctype.to_string(), tx).unwrap();
queuedata.add_route(&id, Some(doctype.to_string()), Action::Query);
let msg = Message::new(doctype, Action::Query);
queuedata.send(msg.clone()).unwrap();
let result = rx.recv_timeout(TIMEOUT).unwrap();
assert_eq!(result.get_message_id(), msg.get_message_id());
let msg = Message::new(doctype, Action::Reply);
match rx.recv_timeout(TIMEOUT) {
Ok(_) => unreachable!("should timeout"),
Err(err) => match err {
RecvTimeoutError::Timeout => {}
_ => unreachable!("should timeout"),
},
}
}
/*
#[test]
fn can_messages_be_directed() {
let mut queuedata = QueueData::new();
let (tx1, rx1) = channel();
let (tx2, rx2) = channel();
let id1 = queuedata.register("task".to_string(), tx1);
let id2 = queuedata.register("work".to_string(), tx2);
let msg = Message::new("task".to_string(), Action::Query);
queuedata.send(msg.clone()).unwrap();
let result = rx1.recv_timeout(TIMEOUT).unwrap();
assert_eq!(result.get_message_id(), msg.get_message_id());
match rx2.recv_timeout(TIMEOUT) {
Ok(_) => unreachable!("should timeout"),
Err(err) => match err {
RecvTimeoutError::Timeout => {},
_ => unreachable!("should timeout"),
}
}
}
*/
}
#[derive(Clone)]
@@ -260,11 +353,9 @@ impl Document {
Self {}
}
fn start(queue: Queue) {
}
fn start(queue: Queue) {}
fn listen(&self) {
}
fn listen(&self) {}
}
#[cfg(test)]
@@ -274,13 +365,13 @@ mod documents {
#[test]
fn create_document_creation() {
let queue = Queue::new();
Document::start(queue);
Document::start(queue.clone());
}
}
// Create a double hash map. posswible names that leads to an id that is int eh ids
// \and the second is the id and the sender to be used.and a third for who wants to
// listen to what.
// Create a double hash map. posswible names that leads to an id that is int eh ids
// \and the second is the id and the sender to be used.and a third for who wants to
// listen to what.
//
// The queue has a read write lock on the abbove strucutee. A clone of this is given to
// every process.
// every process.

View File

@@ -218,7 +218,7 @@ pub mod sessions {
let mut msg = Message::new(MsgType::SessionValidate);
match lang {
Some(data) => msg.add_data("language", data.clone()),
None => {},
None => {}
}
queue.send(msg.clone()).unwrap();
let holder = rx.recv_timeout(TIMEOUT).unwrap();
@@ -357,7 +357,10 @@ pub mod sessions {
let reply = rx.recv_timeout(TIMEOUT).unwrap();
assert_eq!(reply.get_id(), msg.get_id());
assert_eq!(reply.get_msg_type(), &MsgType::Session);
assert_eq!(reply.get_data("language").unwrap().to_language().unwrap(), DEFAULT_LANG);
assert_eq!(
reply.get_data("language").unwrap().to_language().unwrap(),
DEFAULT_LANG
);
}
#[test]
@@ -372,7 +375,10 @@ pub mod sessions {
let reply = rx.recv_timeout(TIMEOUT).unwrap();
assert_eq!(reply.get_id(), msg.get_id());
assert_eq!(reply.get_msg_type(), &MsgType::Session);
assert_eq!(reply.get_data("language").unwrap().to_language().unwrap(), lang);
assert_eq!(
reply.get_data("language").unwrap().to_language().unwrap(),
lang
);
}
}
}