Fixed lost sender issue.
This commit is contained in:
parent
f8bf13e91b
commit
c3f24d58c3
@ -1,8 +1,5 @@
|
|||||||
use crate::queue::Message;
|
use crate::queue::Message;
|
||||||
use std::{
|
use std::{fmt, sync::mpsc::Sender};
|
||||||
fmt,
|
|
||||||
sync::mpsc::Sender,
|
|
||||||
};
|
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
@ -65,8 +62,8 @@ impl fmt::Display for Field {
|
|||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod fields {
|
mod fields {
|
||||||
use std::sync::mpsc::channel;
|
|
||||||
use super::*;
|
use super::*;
|
||||||
|
use std::sync::mpsc::channel;
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn string_to_field() {
|
fn string_to_field() {
|
||||||
@ -128,7 +125,7 @@ mod fields {
|
|||||||
let msg = Message::new();
|
let msg = Message::new();
|
||||||
sender.send(msg).unwrap();
|
sender.send(msg).unwrap();
|
||||||
rx.recv().unwrap();
|
rx.recv().unwrap();
|
||||||
},
|
}
|
||||||
_ => unreachable!("should have been a sender"),
|
_ => unreachable!("should have been a sender"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
12
src/lib.rs
12
src/lib.rs
@ -4,17 +4,25 @@ mod queue;
|
|||||||
|
|
||||||
use client::{Client, Reply, Request};
|
use client::{Client, Reply, Request};
|
||||||
use field::Field;
|
use field::Field;
|
||||||
use std::sync::mpsc::{channel, Sender};
|
use queue::Message;
|
||||||
|
use std::sync::{
|
||||||
|
mpsc::{channel, Sender},
|
||||||
|
Arc, RwLock,
|
||||||
|
};
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct MoreThanText {
|
pub struct MoreThanText {
|
||||||
|
registry: Arc<RwLock<Vec<Sender<Message>>>>,
|
||||||
tx: Sender<Request>,
|
tx: Sender<Request>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl MoreThanText {
|
impl MoreThanText {
|
||||||
pub fn new() -> Self {
|
pub fn new() -> Self {
|
||||||
let tx = Client::start();
|
let tx = Client::start();
|
||||||
Self { tx: tx }
|
Self {
|
||||||
|
registry: Arc::new(RwLock::new(Vec::new())),
|
||||||
|
tx: tx,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn request<F>(&self, _session: Option<F>) -> Reply
|
pub fn request<F>(&self, _session: Option<F>) -> Reply
|
||||||
|
86
src/queue.rs
86
src/queue.rs
@ -1,16 +1,18 @@
|
|||||||
use crate::field::Field;
|
use crate::field::Field;
|
||||||
use std::{
|
use std::{
|
||||||
collections::HashMap,
|
collections::HashMap,
|
||||||
sync::mpsc::{channel, Receiver, Sender},
|
sync::{
|
||||||
|
mpsc::{channel, Receiver, Sender},
|
||||||
|
Arc, RwLock,
|
||||||
|
},
|
||||||
thread::spawn,
|
thread::spawn,
|
||||||
};
|
};
|
||||||
|
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
enum MsgType {
|
enum MsgType {
|
||||||
ClientNewMessage,
|
ClientMessage,
|
||||||
|
NewClientMessage,
|
||||||
NoOp,
|
NoOp,
|
||||||
Register,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct Message {
|
pub struct Message {
|
||||||
@ -40,7 +42,11 @@ impl Message {
|
|||||||
&self.class
|
&self.class
|
||||||
}
|
}
|
||||||
|
|
||||||
fn add_data<S, F>(&mut self, name: S, data: F) where S: Into<String>, F: Into<Field> {
|
fn add_data<S, F>(&mut self, name: S, data: F)
|
||||||
|
where
|
||||||
|
S: Into<String>,
|
||||||
|
F: Into<Field>,
|
||||||
|
{
|
||||||
self.data.insert(name.into(), data.into());
|
self.data.insert(name.into(), data.into());
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -69,11 +75,11 @@ mod messages {
|
|||||||
let id = Uuid::new_v4();
|
let id = Uuid::new_v4();
|
||||||
let mut msg = Message::new();
|
let mut msg = Message::new();
|
||||||
msg.id = id.clone();
|
msg.id = id.clone();
|
||||||
let data = MsgType::Register;
|
let data = MsgType::NewClientMessage;
|
||||||
let result = msg.reply(data);
|
let result = msg.reply(data);
|
||||||
assert_eq!(result.id, id);
|
assert_eq!(result.id, id);
|
||||||
match result.class {
|
match result.class {
|
||||||
MsgType::Register => {},
|
MsgType::NewClientMessage => {}
|
||||||
_ => unreachable!("should have been a registration request"),
|
_ => unreachable!("should have been a registration request"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -82,7 +88,7 @@ mod messages {
|
|||||||
fn get_message_type() {
|
fn get_message_type() {
|
||||||
let msg = Message::new();
|
let msg = Message::new();
|
||||||
match msg.get_class() {
|
match msg.get_class() {
|
||||||
MsgType::NoOp => {},
|
MsgType::NoOp => {}
|
||||||
_ => unreachable!("should have bneen noopn"),
|
_ => unreachable!("should have bneen noopn"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -101,22 +107,22 @@ mod messages {
|
|||||||
}
|
}
|
||||||
|
|
||||||
struct Queue {
|
struct Queue {
|
||||||
senders: Vec<Sender<Message>>,
|
registry: Arc<RwLock<Vec<Sender<Message>>>>,
|
||||||
rx: Receiver<Message>,
|
rx: Receiver<Message>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Queue {
|
impl Queue {
|
||||||
fn new(rx: Receiver<Message>) -> Self {
|
fn new(rx: Receiver<Message>, registry: Arc<RwLock<Vec<Sender<Message>>>>) -> Self {
|
||||||
Self {
|
Self {
|
||||||
senders: Vec::new(),
|
registry: registry,
|
||||||
rx: rx,
|
rx: rx,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn start() -> Sender<Message> {
|
fn start(registry: Arc<RwLock<Vec<Sender<Message>>>>) -> Sender<Message> {
|
||||||
let (tx, rx) = channel();
|
let (tx, rx) = channel();
|
||||||
spawn(move || {
|
spawn(move || {
|
||||||
let mut queue = Queue::new(rx);
|
let mut queue = Queue::new(rx, registry);
|
||||||
queue.listen();
|
queue.listen();
|
||||||
});
|
});
|
||||||
tx
|
tx
|
||||||
@ -125,56 +131,40 @@ impl Queue {
|
|||||||
fn listen(&mut self) {
|
fn listen(&mut self) {
|
||||||
loop {
|
loop {
|
||||||
let msg = self.rx.recv().unwrap();
|
let msg = self.rx.recv().unwrap();
|
||||||
match msg.get_class() {
|
let senders = self.registry.read().unwrap();
|
||||||
MsgType::Register => self.register(msg.get_data()).unwrap(),
|
for sender in senders.iter() {
|
||||||
_ => {
|
sender.send(Message::new()).unwrap();
|
||||||
for sender in self.senders.iter() {
|
|
||||||
sender.send(Message::new()).unwrap();
|
|
||||||
}
|
|
||||||
},
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn register(&mut self, data: &HashMap<String, Field>) -> Result<(), String> {
|
|
||||||
match data.get("tx") {
|
|
||||||
Some(data) => {
|
|
||||||
self.senders.push(data.to_sender().unwrap());
|
|
||||||
Ok(())
|
|
||||||
},
|
|
||||||
None => Err("missing tx sender".to_string()),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod queues {
|
mod queues {
|
||||||
use std::time::Duration;
|
|
||||||
use super::*;
|
use super::*;
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
fn start_queue() -> (Sender<Message>, Receiver<Message>) {
|
static TIMEOUT: Duration = Duration::from_millis(500);
|
||||||
|
|
||||||
|
fn start_queue() -> (
|
||||||
|
Sender<Message>,
|
||||||
|
Receiver<Message>,
|
||||||
|
) {
|
||||||
|
let reg: Arc<RwLock<Vec<Sender<Message>>>> = Arc::new(RwLock::new(Vec::new()));
|
||||||
let (tx, rx) = channel::<Message>();
|
let (tx, rx) = channel::<Message>();
|
||||||
let initial = Message::new();
|
let mut data = reg.write().unwrap();
|
||||||
let mut msg = initial.reply(MsgType::Register);
|
data.push(tx.clone());
|
||||||
msg.add_data("tx", tx);
|
drop(data);
|
||||||
let queue_tx = Queue::start();
|
let queue_tx = Queue::start(Arc::clone(®));
|
||||||
(queue_tx, rx)
|
(queue_tx, rx)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn run_queue() {
|
|
||||||
let queue_tx = Queue::start();
|
|
||||||
let (tx, rx) = channel();
|
|
||||||
let msg = Message::new();
|
|
||||||
let mut reply = msg.reply(MsgType::Register);
|
|
||||||
reply.add_data("tx", tx);
|
|
||||||
queue_tx.send(reply);
|
|
||||||
queue_tx.send(msg.reply(MsgType::NoOp));
|
|
||||||
rx.recv_timeout(Duration::from_millis(400)).unwrap();
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn get_new_client_message() {
|
fn get_new_client_message() {
|
||||||
let (tx, rx) = start_queue();
|
let (tx, rx) = start_queue();
|
||||||
|
let initial = Message::new();
|
||||||
|
let msg = initial.reply(MsgType::NewClientMessage);
|
||||||
|
tx.send(msg).unwrap();
|
||||||
|
rx.recv_timeout(TIMEOUT).unwrap();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user