Moved queue into separate module.
Some checks failed
Gitea Actions Demo / Explore-Gitea-Actions (push) Failing after 1s

This commit is contained in:
2026-01-30 13:58:55 -05:00
parent a5e6d1c6e2
commit 5833d20ea6
9 changed files with 26 additions and 17 deletions

496
src/queue/data_director.rs Normal file
View File

@@ -0,0 +1,496 @@
use crate::{
message::{Action, Message, MsgAction},
mtterror::MTTError,
name::{Name, NameType, Names},
queue::router::Queue,
};
use std::{
collections::{HashMap, HashSet},
sync::mpsc::Receiver,
thread::spawn,
};
use uuid::Uuid;
#[derive(Clone, Debug, Eq, Hash)]
pub enum Include<T> {
All,
Just(T),
}
impl<T: PartialEq> PartialEq for Include<T> {
fn eq(&self, other: &Self) -> bool {
match self {
Include::All => true,
Include::Just(data) => match other {
Include::All => true,
Include::Just(other_data) => data == other_data,
},
}
}
}
#[cfg(test)]
mod includes {
use super::*;
#[test]
fn does_all_equal_evberything() {
let a: Include<isize> = Include::All;
let b: Include<isize> = Include::Just(5);
let c: Include<isize> = Include::Just(7);
assert!(a == a, "all should equal all");
assert!(a == b, "all should equal just");
assert!(b == a, "just should equal all");
assert!(b == b, "same just should equal");
assert!(b != c, "different justs do not equal");
}
}
#[derive(Clone, Debug)]
pub enum RegMsg {
AddRoute(Path),
AddDocName(Vec<Name>),
DocumentNameID(Uuid),
Error(MTTError),
GetNameID(Name),
Ok,
RemoveSender(Uuid),
RouteID(RouteID),
}
#[derive(Clone, Debug)]
pub struct Register {
msg: RegMsg,
sender_id: Uuid,
}
impl Register {
pub fn new(sender_id: Uuid, reg_msg: RegMsg) -> Self {
Self {
msg: reg_msg,
sender_id: sender_id,
}
}
pub fn get_msg(&self) -> &RegMsg {
&self.msg
}
pub fn get_sender_id(&self) -> &Uuid {
&self.sender_id
}
pub fn response(&self, reg_msg: RegMsg) -> Self {
Self {
msg: reg_msg,
sender_id: self.sender_id.clone(),
}
}
}
#[derive(Clone, Debug)]
pub struct Path {
pub msg_id: Include<Uuid>,
pub doc: Include<NameType>,
pub action: Include<Action>,
}
impl Path {
pub fn new(id: Include<Uuid>, doc: Include<NameType>, action: Include<Action>) -> Self {
Self {
msg_id: id,
doc: doc,
action: action,
}
}
pub fn for_message<NT>(name: NT, action: &MsgAction) -> Self
where
NT: Into<NameType>,
{
Self {
msg_id: Include::Just(Uuid::new_v4()),
doc: Include::Just(name.into()),
action: Include::Just(action.into()),
}
}
}
#[cfg(test)]
mod paths {
use super::*;
use crate::{
message::{MsgAction, Records},
name::{Name, Names},
};
#[test]
fn can_create_for_message() {
let input = [
(Name::english("one"), MsgAction::Show),
(
Name::english("two"),
MsgAction::Records(Records::new(Names::new())),
),
];
for item in input.iter() {
let path = Path::for_message(item.0.clone(), &item.1);
match path.doc {
Include::Just(name) => assert_eq!(name, item.0.clone().into()),
_ => unreachable!("should have returned document name"),
}
match path.action {
Include::Just(action) => assert_eq!(action, item.1.clone().into()),
_ => unreachable!("should have returned action type"),
}
}
}
#[test]
fn message_ids_are_unique_for_message_paths() {
let count = 10;
let mut ids: Vec<Uuid> = Vec::new();
for _ in 0..count {
let path = Path::for_message(NameType::None, &MsgAction::Show);
let id = match path.msg_id {
Include::Just(data) => data.clone(),
Include::All => unreachable!("should have been a message id"),
};
assert!(!ids.contains(&id), "{:?} is duplicated in {:?}", id, ids);
ids.push(id);
}
}
}
#[derive(Clone, Debug, PartialEq)]
pub struct Route {
pub action: Include<Action>,
pub doc_id: Include<Uuid>,
pub msg_id: Include<Uuid>,
}
impl Route {
pub fn new(msg_id: Include<Uuid>, doc: Include<Uuid>, action: Include<Action>) -> Self {
Self {
action: action,
doc_id: doc,
msg_id: msg_id,
}
}
}
impl Default for Route {
fn default() -> Self {
Self {
action: Include::All,
doc_id: Include::All,
msg_id: Include::All,
}
}
}
impl From<RouteID> for Route {
fn from(value: RouteID) -> Self {
Self::from(&value)
}
}
impl From<&RouteID> for Route {
fn from(value: &RouteID) -> Self {
Self {
action: match &value.action {
Some(data) => Include::Just(data.clone()),
None => Include::All,
},
doc_id: match &value.doc_id {
Some(doc) => Include::Just(doc.clone()),
None => Include::All,
},
msg_id: match &value.msg_id {
Some(msg) => Include::Just(msg.clone()),
None => Include::All,
},
}
}
}
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct RouteID {
action: Option<Action>,
doc_id: Option<Uuid>,
msg_id: Option<Uuid>,
}
impl From<Route> for RouteID {
fn from(value: Route) -> Self {
Self {
action: match value.action {
Include::All => None,
Include::Just(action) => Some(action.clone()),
},
doc_id: match value.doc_id {
Include::All => None,
Include::Just(doc) => Some(doc.clone()),
},
msg_id: match value.msg_id {
Include::All => None,
Include::Just(id) => Some(id.clone()),
},
}
}
}
struct RouteStorage {
data: HashMap<RouteID, HashSet<Uuid>>,
}
impl RouteStorage {
fn new() -> Self {
Self {
data: HashMap::new(),
}
}
fn add(&mut self, route: Route, sender_id: Uuid) -> RouteID {
let route_id: RouteID = route.into();
let set = match self.data.get_mut(&route_id) {
Some(result) => result,
None => {
let holder = HashSet::new();
self.data.insert(route_id.clone(), holder);
self.data.get_mut(&route_id).unwrap()
}
};
set.insert(sender_id);
route_id
}
fn remove_sender_id(&mut self, sender_id: &Uuid) {
let mut removal: Vec<RouteID> = Vec::new();
for (route_id, set) in self.data.iter_mut() {
set.remove(sender_id);
if set.is_empty() {
removal.push(route_id.clone());
}
}
for route_id in removal.iter() {
self.data.remove(route_id);
}
}
fn get(&self, route: Route) -> HashSet<Uuid> {
let mut output = HashSet::new();
for (route_id, set) in self.data.iter() {
if route == route_id.into() {
output = output.union(set).cloned().collect();
}
}
output
}
}
#[cfg(test)]
mod route_storeage {
use super::*;
#[test]
fn can_add_routes() {
let mut routes = RouteStorage::new();
let id1 = Uuid::new_v4();
let id2 = Uuid::new_v4();
let route1 = Route::new(Include::Just(Uuid::new_v4()), Include::All, Include::All);
let route2 = Route::new(Include::Just(Uuid::new_v4()), Include::All, Include::All);
let route_id1 = routes.add(route1.clone(), id1.clone());
let route_id2 = routes.add(route2.clone(), id2.clone());
let result1 = routes.get(route1.clone());
assert_eq!(result1.len(), 1);
assert!(
result1.contains(&id1),
"{:?} not found in {:?}",
id1,
result1
);
assert_eq!(route_id1, route1.into());
let result2 = routes.get(route2.clone());
assert_eq!(result2.len(), 1);
assert!(
result2.contains(&id2),
"{:?} not found in {:?}",
id2,
result2
);
assert_eq!(route_id2, route2.into());
}
#[test]
fn returns_empty_set_when_nothing_is_available() {
let routes = RouteStorage::new();
let route = Route::new(Include::Just(Uuid::new_v4()), Include::All, Include::All);
let result = routes.get(route);
assert_eq!(result.len(), 0);
}
#[test]
fn returns_all_entries_using_the_same_route() {
let count = 5;
let mut routes = RouteStorage::new();
let mut ids: HashSet<Uuid> = HashSet::new();
while ids.len() < count {
ids.insert(Uuid::new_v4());
}
let route = Route::new(Include::Just(Uuid::new_v4()), Include::All, Include::All);
for id in ids.iter() {
routes.add(route.clone(), id.clone());
}
let result = routes.get(route);
assert_eq!(result, ids);
}
#[test]
fn routes_are_not_duplicated() {
let count = 5;
let mut routes = RouteStorage::new();
let id = Uuid::new_v4();
let route = Route::new(Include::Just(Uuid::new_v4()), Include::All, Include::All);
for _ in 0..count {
routes.add(route.clone(), id.clone());
}
let result = routes.get(route);
assert_eq!(result.len(), 1);
assert!(result.contains(&id), "{:?} not found in {:?}", id, result);
}
#[test]
fn overlapping_routes_are_combined() {
let mut routes = RouteStorage::new();
let id1 = Uuid::new_v4();
let id2 = Uuid::new_v4();
let route1 = Route::new(Include::Just(Uuid::new_v4()), Include::All, Include::All);
let route2 = Route::new(Include::Just(Uuid::new_v4()), Include::All, Include::All);
routes.add(route1.clone(), id1.clone());
routes.add(route2.clone(), id2.clone());
let retrieve = Route::new(Include::All, Include::All, Include::All);
let result = routes.get(retrieve);
assert_eq!(result.len(), 2);
assert!(result.contains(&id1), "{:?} not found in {:?}", id1, result);
assert!(result.contains(&id2), "{:?} not found in {:?}", id2, result);
}
#[test]
fn can_remove_sender_id() {
let mut routes = RouteStorage::new();
let count = 5;
let mut ids: HashSet<Uuid> = HashSet::new();
while ids.len() < count {
ids.insert(Uuid::new_v4());
}
let route = Route::new(Include::Just(Uuid::new_v4()), Include::All, Include::All);
for id in ids.iter() {
routes.add(route.clone(), id.clone());
}
let removed = ids.iter().last().unwrap().clone();
ids.remove(&removed);
routes.remove_sender_id(&removed);
let result = routes.get(route);
assert_eq!(result, ids);
}
#[test]
fn empty_routes_are_release_memory() {
let mut routes = RouteStorage::new();
let id = Uuid::new_v4();
let route = Route::new(Include::Just(Uuid::new_v4()), Include::All, Include::All);
routes.add(route.clone(), id.clone());
routes.remove_sender_id(&id);
assert_eq!(routes.data.len(), 0);
}
}
pub struct DocRegistry {
doc_names: Names,
queue: Queue,
receiver: Receiver<Message>,
routes: RouteStorage,
}
impl DocRegistry {
fn new(queue: Queue, rx: Receiver<Message>) -> Self {
Self {
doc_names: Names::new(),
queue: queue,
receiver: rx,
routes: RouteStorage::new(),
}
}
pub fn start(queue: Queue, rx: Receiver<Message>) {
let mut doc_names = DocRegistry::new(queue, rx);
spawn(move || {
doc_names.listen();
});
}
fn listen(&mut self) {
loop {
let mut msg = self.receiver.recv().unwrap();
match msg.get_action() {
MsgAction::Register(data) => {
let id = data.get_sender_id();
let reply = msg.response(self.register_action(data));
self.queue.forward(id, reply);
}
_ => match self.path_to_route(&msg.get_path()) {
Ok(route) => {
msg.set_route(route.clone());
for sender_id in self.routes.get(route).iter() {
self.queue.forward(sender_id, msg.clone());
}
}
Err(err) => self
.queue
.send(msg.response(MsgAction::Error(err)))
.unwrap(),
},
}
}
}
fn path_to_route(&self, path: &Path) -> Result<Route, MTTError> {
let doc_id = match &path.doc {
Include::Just(name) => match self.doc_names.get_id(name) {
Ok(id) => Include::Just(id),
Err(err) => return Err(err),
},
Include::All => Include::All,
};
Ok(Route::new(path.msg_id.clone(), doc_id, path.action.clone()))
}
fn register_action(&mut self, reg: &Register) -> Register {
match reg.get_msg() {
RegMsg::AddDocName(names) => match self.doc_names.add_names(names.clone()) {
Ok(id) => reg.response(RegMsg::DocumentNameID(id.clone())),
Err(err) => reg.response(RegMsg::Error(err)),
},
RegMsg::AddRoute(path) => {
let response = match self.path_to_route(path) {
Ok(route) => {
let id = self.routes.add(route, reg.get_sender_id().clone());
RegMsg::RouteID(id)
}
Err(err) => RegMsg::Error(err),
};
reg.response(response)
}
RegMsg::GetNameID(name) => match self.doc_names.get_id(name) {
Ok(id) => reg.response(RegMsg::DocumentNameID(id.clone())),
Err(err) => reg.response(RegMsg::Error(err)),
},
RegMsg::RemoveSender(sender_id) => {
self.routes.remove_sender_id(sender_id);
reg.response(RegMsg::Ok)
}
_ => reg.response(RegMsg::Ok),
}
}
}

331
src/queue/router.rs Normal file
View File

@@ -0,0 +1,331 @@
use crate::{
message::Message,
mtterror::MTTError,
name::NameType,
queue::data_director::{DocRegistry, RegMsg, Register},
};
use std::{
collections::HashMap,
sync::{
mpsc::{channel, Sender},
Arc, RwLock,
},
};
use uuid::Uuid;
struct Router {
doc_registry: Sender<Message>,
senders: HashMap<Uuid, Sender<Message>>,
}
impl Router {
fn new(tx: Sender<Message>) -> Self {
Self {
doc_registry: tx,
senders: HashMap::new(),
}
}
fn add_sender(&mut self, sender: Sender<Message>) -> Uuid {
let mut id = Uuid::new_v4();
while self.senders.contains_key(&id) {
id = Uuid::new_v4();
}
self.senders.insert(id.clone(), sender);
id
}
fn remove_sender(&mut self, id: &Uuid) {
let action = Register::new(Uuid::nil(), RegMsg::RemoveSender(id.clone()));
self.doc_registry
.send(Message::new(NameType::None, action))
.unwrap();
self.senders.remove(id);
}
fn forward(&self, id: &Uuid, msg: Message) {
if id == &Uuid::nil() {
return;
}
match self.senders.get(id) {
Some(sender) => sender.send(msg).unwrap(),
None => {}
}
}
fn send(&self, msg: Message) {
self.doc_registry.send(msg).unwrap();
}
}
#[derive(Clone)]
pub struct Queue {
router: Arc<RwLock<Router>>,
}
impl Queue {
pub fn new() -> Self {
let (tx, rx) = channel();
let output = Self {
router: Arc::new(RwLock::new(Router::new(tx))),
};
DocRegistry::start(output.clone(), rx);
output
}
pub fn add_sender(&mut self, sender: Sender<Message>) -> Uuid {
let mut router = self.router.write().unwrap();
router.add_sender(sender)
}
pub fn remove_sender(&mut self, id: &Uuid) {
let mut router = self.router.write().unwrap();
router.remove_sender(id);
}
pub fn forward(&self, id: &Uuid, msg: Message) {
let router = self.router.read().unwrap();
router.forward(id, msg);
}
pub fn send(&self, msg: Message) -> Result<(), MTTError> {
let router = self.router.read().unwrap();
router.send(msg.clone());
Ok(())
}
}
#[cfg(test)]
mod routers {
use super::*;
use crate::{
message::{MsgAction, Query},
name::Name,
support_tests::TIMEOUT,
};
use std::{
collections::HashSet,
sync::mpsc::{Receiver, RecvTimeoutError},
};
struct Setup {
test_mod: Router,
rx: Receiver<Message>,
}
impl Setup {
fn new() -> Self {
let (tx, rx) = channel();
Self {
test_mod: Router::new(tx),
rx: rx,
}
}
fn get_router(&self) -> &Router {
&self.test_mod
}
fn get_router_mut(&mut self) -> &mut Router {
&mut self.test_mod
}
fn recv(&self) -> Result<Message, RecvTimeoutError> {
self.rx.recv_timeout(TIMEOUT)
}
}
#[test]
fn can_pass_message() {
let setup = Setup::new();
let router = setup.get_router();
let msg = Message::new(Name::english("task"), Query::new());
router.send(msg.clone());
let result = setup.recv().unwrap();
assert_eq!(result.get_message_id(), msg.get_message_id());
}
#[test]
fn can_forward_message() {
let mut setup = Setup::new();
let router = setup.get_router_mut();
let mut receivers: HashMap<Uuid, Receiver<Message>> = HashMap::new();
for _ in 0..10 {
let (tx, rx) = channel();
let id = router.add_sender(tx);
receivers.insert(id, rx);
}
for (id, recv) in receivers.iter() {
let msg = Message::new(Name::english(id.to_string().as_str()), Query::new());
router.forward(id, msg.clone());
let result = recv.recv_timeout(TIMEOUT).unwrap();
assert_eq!(result.get_message_id(), msg.get_message_id());
}
}
#[test]
fn sender_ids_are_unique() {
let mut setup = Setup::new();
let router = setup.get_router_mut();
let count = 10;
let mut holder: HashSet<Uuid> = HashSet::new();
for _ in 0..count {
let (tx, _) = channel();
holder.insert(router.add_sender(tx));
}
assert_eq!(holder.len(), count, "had duplicate keys");
}
#[test]
fn can_remove_sender() {
let mut setup = Setup::new();
let router = setup.get_router_mut();
let mut receivers: HashMap<Uuid, Receiver<Message>> = HashMap::new();
for _ in 0..10 {
let (tx, rx) = channel();
let id = router.add_sender(tx);
receivers.insert(id, rx);
}
let removed = receivers.keys().last().unwrap().clone();
router.remove_sender(&removed);
let router = setup.get_router();
let removed_recv = receivers.remove(&removed).unwrap();
router.forward(&removed, Message::new(NameType::None, Query::new()));
match removed_recv.recv_timeout(TIMEOUT) {
Err(err) => match err {
RecvTimeoutError::Disconnected => {}
_ => unreachable!("got {:?}, should have been disconnected", err),
},
_ => unreachable!("should have returned an error"),
}
let announce = setup.recv().unwrap();
let action = announce.get_action();
match action {
MsgAction::Register(data) => {
let output = data.get_msg();
match output {
RegMsg::RemoveSender(id) => assert_eq!(id, &removed),
_ => unreachable!("got {:?} should have been sender removal", output),
}
}
_ => unreachable!("got {:?}, should have been register", action),
}
for (id, recv) in receivers.iter() {
let msg = Message::new(Name::english(id.to_string().as_str()), Query::new());
router.forward(id, msg.clone());
let result = recv.recv_timeout(TIMEOUT).unwrap();
assert_eq!(result.get_message_id(), msg.get_message_id());
}
}
#[test]
fn ignores_bad_id_removals() {
let mut setup = Setup::new();
let router = setup.get_router_mut();
let removed = Uuid::new_v4();
router.remove_sender(&removed);
assert_eq!(router.senders.len(), 0, "should have no senders.");
let announce = setup.recv().unwrap();
let action = announce.get_action();
match action {
MsgAction::Register(data) => {
let output = data.get_msg();
match output {
RegMsg::RemoveSender(id) => assert_eq!(id, &removed),
_ => unreachable!("got {:?} should have been sender removal", output),
}
}
_ => unreachable!("got {:?}, should have been register", action),
}
}
}
#[cfg(test)]
mod queues {
use super::*;
use crate::{
message::MsgAction,
name::Name,
queue::data_director::{Include, Path},
support_tests::TIMEOUT,
};
use std::sync::mpsc::{Receiver, RecvTimeoutError};
struct Setup {
test_mod: Queue,
rx: Receiver<Message>,
rx_id: Uuid,
}
impl Setup {
fn new() -> Self {
let mut queue = Queue::new();
let (tx, rx) = channel();
let id = queue.add_sender(tx);
Self {
test_mod: queue,
rx: rx,
rx_id: id,
}
}
fn send_reg_msg(&self, msg: RegMsg) {
let reg_msg = Register::new(self.rx_id.clone(), msg);
self.test_mod
.send(Message::new(NameType::None, reg_msg))
.unwrap();
}
fn recv(&self) -> Result<Message, RecvTimeoutError> {
self.rx.recv_timeout(TIMEOUT)
}
}
#[test]
fn can_add_names_registry() {
let setup = Setup::new();
let name = Name::english(Uuid::new_v4().to_string().as_str());
let reg = RegMsg::AddDocName(vec![name.clone()]);
setup.send_reg_msg(reg);
let result = setup.recv().unwrap();
let action = result.get_action();
match action {
MsgAction::Register(data) => {
let regmsg = data.get_msg();
match data.get_msg() {
RegMsg::DocumentNameID(_) => {}
_ => unreachable!("got {:?} should have been document id", regmsg),
}
}
_ => unreachable!("got {:?} should have been register", action),
}
}
#[test]
fn returns_error_when_document_name_not_found() {
let setup = Setup::new();
let name = Name::english(Uuid::new_v4().to_string().as_str());
let path = Path::new(
Include::All,
Include::Just(name.clone().into()),
Include::All,
);
let reg = RegMsg::AddRoute(path);
setup.send_reg_msg(reg);
let result = setup.recv().unwrap();
let action = result.get_action();
match action {
MsgAction::Register(data) => {
let regmsg = data.get_msg();
match data.get_msg() {
RegMsg::Error(err) => match err {
MTTError::NameNotFound(failed_name) => assert_eq!(failed_name, &name),
_ => unreachable!("got {:?} should have been missing name", err),
},
_ => unreachable!("got {:?} should have been error", regmsg),
}
}
_ => unreachable!("got {:?} should have been register", action),
}
}
}