From 572abbeda87f7aa6306c3779b53fb4bc844fccc1 Mon Sep 17 00:00:00 2001 From: Jeff Baskin Date: Sat, 8 Apr 2023 15:04:04 -0400 Subject: [PATCH] Got the cache channels setup. --- src/morethantext/mod.rs | 135 +++++++++++++++++++++++++++++++++++----- 1 file changed, 121 insertions(+), 14 deletions(-) diff --git a/src/morethantext/mod.rs b/src/morethantext/mod.rs index 08fb705..dbbde18 100644 --- a/src/morethantext/mod.rs +++ b/src/morethantext/mod.rs @@ -1,9 +1,9 @@ use async_std::{ - channel::{unbounded, Sender}, + channel::{unbounded, Receiver, Sender}, path::PathBuf, task::spawn, }; -use std::{error::Error, fmt}; +use std::{collections::HashMap, error::Error, fmt}; const ENTRY: &str = "EntryPoint"; @@ -180,17 +180,33 @@ mod datatypes { } } +#[derive(Debug)] +enum FromCache { + Data(HashMap), + Error(MTTError), +} + +struct CacheQuery { + ids: Vec, + reply: Sender, +} + +enum ToCache { + Query(CacheQuery), +} + #[derive(Clone)] pub struct MoreThanText { session: Vec, - channel: Sender, + cache: Sender>, } impl MoreThanText { - async fn new() {} - - async fn get_entry(&self, id: String) { - self.channel.send(id).await.unwrap(); + async fn new(cache: Sender>) -> Result { + Ok(Self { + session: [ENTRY.to_string()].to_vec(), + cache: cache, + }) } } @@ -200,25 +216,116 @@ mod mtt { #[async_std::test] async fn create() { - MoreThanText::new().await; + let (s, _) = unbounded(); + let mtt = MoreThanText::new(s).await.unwrap(); + assert_eq!(mtt.session, [ENTRY]); } } -pub async fn start_db

(dir: P) -> Result +struct Cache { + channel: Receiver, +} + +impl Cache { + async fn new

(_dir: P, channel: Receiver) -> Result + where + P: Into, + { + Ok(Self { channel: channel }) + } + + async fn start(&self) { + loop { + match self.channel.recv().await.unwrap() { + ToCache::Query(data) => { + for id in data.ids { + if id == ENTRY { + let mut holder = HashMap::new(); + holder.insert(ENTRY.to_string(), DataType::new("store").unwrap()); + data.reply.send(FromCache::Data(holder)).await.unwrap(); + } else { + data.reply + .send(FromCache::Error(MTTError::new("fred"))) + .await + .unwrap(); + } + } + } + } + } + } +} + +#[cfg(test)] +mod caches { + use super::*; + use tempfile::tempdir; + + async fn start_cache

(dir: P) -> Sender + where + P: Into, + { + let (s, r) = unbounded(); + let datadir = dir.into(); + spawn(async move { + let cache = Cache::new(datadir, r).await.unwrap(); + cache.start().await; + }); + s + } + + #[async_std::test] + async fn create() { + let dir = tempdir().unwrap(); + let s_cache = start_cache(dir.path()).await; + let (s_rep, r_rep) = unbounded(); + let request = ToCache::Query(CacheQuery { + ids: [ENTRY.to_string()].to_vec(), + reply: s_rep, + }); + s_cache.send(request).await.unwrap(); + let result = r_rep.recv().await.unwrap(); + match result { + FromCache::Data(data) => match data.get(ENTRY) { + Some(output) => match output { + DataType::DBMap(_) => (), + _ => assert!(false, "{:?} is not a database store.", output), + }, + None => assert!(false, "Should contain entry point."), + }, + _ => assert!(false, "{:?} should have been a store.", result), + } + } + + #[async_std::test] + async fn bad_entry() { + let dir = tempdir().unwrap(); + let s_cache = start_cache(dir.path()).await; + let (s_rep, r_rep) = unbounded(); + let request = ToCache::Query(CacheQuery { + ids: ["bad_id".to_string()].to_vec(), + reply: s_rep, + }); + s_cache.send(request).await.unwrap(); + let result = r_rep.recv().await.unwrap(); + match result { + FromCache::Error(_) => (), + _ => assert!(false, "{:?} should have been an error.", result), + } + } +} + +pub async fn start_db

(_dir: P) -> Result where P: Into, { - let data_dir = dir.into(); let (s, r) = unbounded(); spawn(async move { loop { r.recv().await.unwrap(); } }); - Ok(MoreThanText { - session: [ENTRY.to_string()].to_vec(), - channel: s, - }) + Ok(MoreThanText::new(s).await.unwrap()) } #[cfg(test)]