From 0bdcfce68537e1d8c8847dd44fc1786acef5fe9d Mon Sep 17 00:00:00 2001 From: Jeff Baskin Date: Thu, 10 Apr 2025 13:42:43 -0400 Subject: [PATCH] Added a Time message. --- src/clock.rs | 91 ++++++++++++++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 3 ++ src/queue.rs | 1 + 3 files changed, 95 insertions(+) create mode 100644 src/clock.rs diff --git a/src/clock.rs b/src/clock.rs new file mode 100644 index 0000000..6c689cc --- /dev/null +++ b/src/clock.rs @@ -0,0 +1,91 @@ +use crate::queue::{Message, MsgType, Queue}; +use chrono::prelude::*; +use std::{ + thread::{sleep, spawn}, + time::Duration, +}; + +const SLEEP_FOR: Duration = Duration::from_millis(1000); + +pub struct Clock { + queue: Queue, +} + +impl Clock { + fn new(queue: Queue) -> Self { + Self { queue: queue } + } + + pub fn start(queue: Queue) { + let clock = Clock::new(queue); + spawn(move || { + clock.listen(); + }); + } + + fn listen(&self) { + loop { + let mut msg = Message::new(MsgType::Time); + msg.add_data("time", Utc::now()); + match self.queue.send(msg) { + Ok(_) => {} + Err(_) => {} + }; + sleep(SLEEP_FOR); + } + } +} + +#[cfg(test)] +mod clocks { + use super::*; + use std::{ + sync::mpsc::{channel, Receiver}, + time::{Duration, Instant}, + }; + + static TIMEOUT: Duration = Duration::from_millis(500); + + fn start_clock(listen_for: Vec) -> Receiver { + let queue = Queue::new(); + let (tx, rx) = channel(); + queue.add(tx, listen_for); + Clock::start(queue); + rx + } + + #[test] + fn sends_timestamp() { + let rx = start_clock([MsgType::Time].to_vec()); + let msg = rx.recv_timeout(TIMEOUT).unwrap(); + match msg.get_class() { + MsgType::Time => { + msg.get_data("time").unwrap().to_datetime().unwrap(); + } + _ => unreachable!("should have been a time message"), + } + } + + #[test] + fn continues_to_send_time() { + let start = Instant::now(); + let rx = start_clock([MsgType::Time].to_vec()); + let msg1 = rx.recv_timeout(TIMEOUT).unwrap(); + let msg2 = rx.recv().unwrap(); + assert!( + start.elapsed() >= SLEEP_FOR, + "did not pause long enough betwee sends" + ); + assert!( + msg2.get_data("time").unwrap().to_datetime().unwrap() + > msg1.get_data("time").unwrap().to_datetime().unwrap(), + "shoould present the latest time" + ); + } + + #[test] + fn does_not_panic_without_listeners() { + let rx = start_clock([MsgType::SessionValidate].to_vec()); + assert!(rx.recv_timeout(TIMEOUT).is_err(), "should timeout"); + } +} diff --git a/src/lib.rs b/src/lib.rs index 1339b71..5deed41 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,10 +1,12 @@ mod client; +mod clock; mod field; mod queue; mod session; mod utils; use client::{Client, ClientLink, Reply, Request}; +use clock::Clock; use field::Field; use queue::Queue; use session::Session; @@ -17,6 +19,7 @@ pub struct MoreThanText { impl MoreThanText { pub fn new() -> Self { let queue = Queue::new(); + Clock::start(queue.clone()); Session::start(queue.clone()); Self { client_link: Client::start(queue.clone()), diff --git a/src/queue.rs b/src/queue.rs index c040460..ad15aa6 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -10,6 +10,7 @@ pub enum MsgType { ClientRequest, SessionValidate, Session, + Time, } #[derive(Clone)]