From 12b52ecf432083fea9d9cb12d6bc695201633950 Mon Sep 17 00:00:00 2001 From: AndrielFR Date: Wed, 14 May 2025 19:01:41 -0300 Subject: [PATCH] feat(cache): add caching mechanism for chats and users - Introduced a new `Cache` module to manage chat and user data. - Implemented methods to load, save, and retrieve cached data using `bincode` for serialization. - Integrated the cache into the `Client`, `Context`, and `Dispatcher` modules. - Added auto-save functionality for the cache at regular intervals and during application shutdown. - Updated `Cargo.toml` to include `bincode` as a dependency for encoding/decoding cache data. - Enhanced `Dispatcher` to save chats and users to the cache when processing updates. --- lib/ferogram/Cargo.toml | 7 +- lib/ferogram/src/cache.rs | 148 +++++++++++++++++++++++++++++++++ lib/ferogram/src/client.rs | 48 +++++++++-- lib/ferogram/src/context.rs | 17 +++- lib/ferogram/src/dispatcher.rs | 70 +++++++++------- lib/ferogram/src/lib.rs | 2 + 6 files changed, 252 insertions(+), 40 deletions(-) create mode 100644 lib/ferogram/src/cache.rs diff --git a/lib/ferogram/Cargo.toml b/lib/ferogram/Cargo.toml index 2c7a65f..a967a4e 100644 --- a/lib/ferogram/Cargo.toml +++ b/lib/ferogram/Cargo.toml @@ -29,13 +29,14 @@ ferogram-macros = { path = "../ferogram-macros", version = "0.1.0", optional = t grammers-client = { git = "https://github.com/Lonami/grammers.git", version = "0.7.0" } grammers-mtsender = { git = "https://github.com/Lonami/grammers.git", version = "0.7.0" } -log = "0.4.25" +log = "^0.4" url = { version = "^2.5", optional = true } mlua = { version = "^0.10", features = ["async", "lua54", "module"], optional = true } pyo3 = { version = "^0.23", features = ["experimental-async", "macros"], optional = true } -regex = "1.11.1" +regex = "^1.11" tokio = { version = "^1.43", features = ["fs", "rt", "signal", "sync"] } -rpassword = "7.3.1" +bincode = { version = "^2.0" } +rpassword = "^7.3" async-trait = "^0.1" futures-util = { version = "^0.3", default-features = false, features = ["alloc"] } async-recursion = "^1.1" diff --git a/lib/ferogram/src/cache.rs b/lib/ferogram/src/cache.rs new file mode 100644 index 0000000..258b452 --- /dev/null +++ b/lib/ferogram/src/cache.rs @@ -0,0 +1,148 @@ +// Copyright 2024-2025 - Andriel Ferreira +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! Cache module. + +use std::{collections::HashMap, path::Path, sync::Arc}; + +use bincode::{ + Decode, Encode, config, + de::Decoder, + enc::Encoder, + error::{DecodeError, EncodeError}, +}; +use grammers_client::types::PackedChat; +use tokio::sync::RwLock; + +/// The cache. +#[derive(Clone, Debug, Default)] +pub struct Cache { + /// The inner cache. + inner: Arc>, +} + +impl Cache { + /// Load a previous cache instance from a file or create one if it doesn’t exist. + pub fn load_file_or_create>(path: P) -> crate::Result { + // try to open the cache file. + if let Ok(mut file) = std::fs::File::open(&path) { + // get the standard config. + let config = config::standard(); + + // construct the inner cache. + let inner: InnerCache = bincode::decode_from_std_read(&mut file, config)?; + + log::debug!("loaded {} chats from cache", inner.chats.len()); + + Ok(Self { + inner: Arc::new(RwLock::new(inner)), + }) + } else { + log::debug!("no cache was found, generating a new one"); + + Ok(Self { + inner: Arc::new(RwLock::new(InnerCache::default())), + }) + } + } + + /// Try to save the cache to a file. + pub async fn save_to_file>(&self, path: P) -> crate::Result<()> { + // delete the cache file if it exists. + if std::fs::exists(&path)? { + std::fs::remove_file(&path)?; + } + + // create the cache file. + let mut file = std::fs::File::create(path)?; + + // get the standard config. + let config = config::standard(); + + // clone the inner. + let inner = self.inner.write().await.clone(); + + // write to the file. + bincode::encode_into_std_write(inner, &mut file, config)?; + + Ok(()) + } + + /// Gets a saved chat by its ID. + pub fn get_chat(&self, chat_id: i64) -> Option { + let inner = self.inner.try_read().expect("failed to get saved chats"); + + inner.chats.get(&chat_id).cloned() + } + + /// Saves a chat in the cache. + pub(crate) async fn save_chat(&self, chat: PackedChat) -> crate::Result<()> { + let mut inner = self.inner.write().await; + + if !inner.chat_exists(chat.id) { + log::trace!("saved a new chat: {:?}", chat); + + inner.push_chat(chat); + } + + Ok(()) + } +} + +/// The inner cache. +#[derive(Clone, Debug, Default)] +struct InnerCache { + /// The packed chat map. + chats: HashMap, +} + +impl InnerCache { + /// Pushes a chat. + pub fn push_chat(&mut self, chat: PackedChat) { + self.chats.entry(chat.id).or_insert(chat); + } + + /// Checks if a chat exists. + pub fn chat_exists(&self, chat_id: i64) -> bool { + self.chats.contains_key(&chat_id) + } +} + +impl Encode for InnerCache { + fn encode(&self, encoder: &mut E) -> Result<(), EncodeError> { + // convert chats to bytes. + let chats = self + .chats + .clone() + .into_iter() + .map(|(id, chat)| (id, chat.to_bytes())) + .collect::>(); + + Encode::encode(&chats, encoder)?; + + Ok(()) + } +} + +impl Decode for InnerCache { + fn decode>(decoder: &mut D) -> Result { + // convert bytes to chats. + let encoded_chats: HashMap = Decode::decode(decoder)?; + let chats = encoded_chats + .into_iter() + .map(|(id, bytes)| { + ( + id, + PackedChat::from_bytes(&bytes).expect("failed to decode chat bytes"), + ) + }) + .collect::>(); + + Ok(Self { chats }) + } +} diff --git a/lib/ferogram/src/client.rs b/lib/ferogram/src/client.rs index d5c3ceb..88973a5 100644 --- a/lib/ferogram/src/client.rs +++ b/lib/ferogram/src/client.rs @@ -8,14 +8,14 @@ //! Client module. -use std::path::Path; +use std::{path::Path, time::Duration}; use grammers_client::{ Config, InitParams, ReconnectionPolicy, SignInError, grammers_tl_types as tl, session::Session, }; use grammers_mtsender::ServerAddr; -use crate::{Context, Dispatcher, ErrorHandler, Result, di, utils::prompt}; +use crate::{Cache, Context, Dispatcher, ErrorHandler, Result, di, utils::prompt}; /// Wrapper about grammers' `Client` instance. pub struct Client { @@ -26,6 +26,8 @@ pub struct Client { /// The inner grammers' `Client` instance. inner_client: grammers_client::Client, + /// The session cache. + cache: Cache, /// The session file path. session_file: Option, @@ -33,7 +35,7 @@ pub struct Client { is_connected: bool, /// Whether is to update Telegram's bot commands. set_bot_commands: bool, - /// Wheter is to wait for a `Ctrl + C` signal to close the connection and exit the app. + /// Whether is to wait for a `Ctrl + C` signal to close the connection and exit the app. wait_for_ctrl_c: bool, /// The global error handler. @@ -226,7 +228,7 @@ impl Client { pub fn new_ctx(&self) -> Context { let upd_receiver = self.dispatcher.upd_sender.subscribe(); - Context::new(&self.inner_client, upd_receiver) + Context::new(&self.cache, &self.inner_client, upd_receiver) } /// Listen to Telegram's updates and send them to the dispatcher's routers. @@ -239,6 +241,7 @@ impl Client { /// # } /// ``` pub async fn run(self) -> Result<()> { + let cache = self.cache.clone(); let handle = self.inner_client; let dispatcher = self.dispatcher; let err_handler = self.err_handler; @@ -286,12 +289,13 @@ impl Client { loop { match handle.next_update().await { Ok(update) => { + let cache = cache.clone(); let client = handle.clone(); let mut dp = dispatcher.clone(); let err_handler = err_handler.clone(); tokio::task::spawn(async move { - if let Err(e) = dp.handle_update(&client, &update).await { + if let Err(e) = dp.handle_update(&cache, &client, &update).await { if let Some(err_handler) = err_handler.as_ref() { err_handler.run(client, update, e).await; } else { @@ -308,6 +312,21 @@ impl Client { }); if self.wait_for_ctrl_c { + let session_file = self.session_file.as_deref().unwrap_or("./ferogram.session"); + let cache_file = format!("{}.cc", session_file); + + let cache = self.cache.clone(); + let path = cache_file.clone(); + tokio::task::spawn(async move { + loop { + tokio::time::sleep(Duration::from_secs(60)).await; + + if let Err(e) = cache.save_to_file(&path).await { + log::error!("failed to auto-save cache: {}", e); + } + } + }); + tokio::signal::ctrl_c().await?; if let Some(mut handler) = self.exit_handler { @@ -317,8 +336,8 @@ impl Client { handler.handle(&mut injector).await.unwrap(); } - let session_file = self.session_file.as_deref().unwrap_or("./ferogram.session"); client.session().save_to_file(session_file)?; + self.cache.save_to_file(cache_file).await?; } Ok(()) @@ -335,6 +354,7 @@ impl Client { /// ``` pub async fn keep_alive(self) -> Result<()> { let handle = self.inner_client; + let client = handle.clone(); tokio::task::spawn(async move { loop { @@ -344,6 +364,12 @@ impl Client { if self.wait_for_ctrl_c { tokio::signal::ctrl_c().await?; + + let session_file = self.session_file.as_deref().unwrap_or("./ferogram.session"); + let cache_file = &format!("{}.cc", session_file); + + client.session().save_to_file(session_file)?; + self.cache.save_to_file(cache_file).await?; } Ok(()) @@ -427,6 +453,15 @@ impl ClientBuilder { /// ``` pub async fn build(self) -> Result { let session_file = self.session_file.as_deref().unwrap_or("./ferogram.session"); + let cache_file = &format!("{}.cc", session_file); + + // check if the session file don't exists and the cache file exists. + if !std::fs::exists(session_file)? && std::fs::exists(cache_file)? { + // delete the cache file if the session file changes. + std::fs::remove_file(cache_file)?; + + log::debug!("session file changed, deleting the cache file"); + } let inner_client = grammers_client::Client::connect(Config { session: Session::load_file_or_create(session_file)?, @@ -441,6 +476,7 @@ impl ClientBuilder { client_type: self.client_type, inner_client, + cache: Cache::load_file_or_create(cache_file)?, session_file: Some(session_file.to_string()), is_connected: false, diff --git a/lib/ferogram/src/context.rs b/lib/ferogram/src/context.rs index 5aef849..e7b7aaf 100644 --- a/lib/ferogram/src/context.rs +++ b/lib/ferogram/src/context.rs @@ -23,12 +23,14 @@ use tokio::{ sync::{Mutex, broadcast::Receiver}, }; -use crate::{Filter, utils::bytes_to_string}; +use crate::{Cache, Filter, utils::bytes_to_string}; /// The context of an update. #[derive(Debug)] pub struct Context { - // The client that received the update. + /// The session cache. + pub cache: Cache, + /// The client that received the update. client: grammers_client::Client, /// The update itself. update: Option, @@ -38,8 +40,13 @@ pub struct Context { impl Context { /// Creates a new context. - pub fn new(client: &grammers_client::Client, upd_receiver: Receiver) -> Self { + pub fn new( + cache: &Cache, + client: &grammers_client::Client, + upd_receiver: Receiver, + ) -> Self { Self { + cache: cache.clone(), client: client.clone(), update: None, upd_receiver: Arc::new(Mutex::new(upd_receiver)), @@ -48,11 +55,13 @@ impl Context { /// Creates a new context with an update. pub fn with( + cache: &Cache, client: &grammers_client::Client, update: &Update, upd_receiver: Receiver, ) -> Self { Self { + cache: cache.clone(), client: client.clone(), update: Some(update.clone()), upd_receiver: Arc::new(Mutex::new(upd_receiver)), @@ -77,6 +86,7 @@ impl Context { .expect("Failed to lock receiver"); Self { + cache: self.cache.clone(), client: self.client.clone(), update: Some(update.clone()), upd_receiver: Arc::new(Mutex::new(upd_receiver.resubscribe())), @@ -1259,6 +1269,7 @@ impl Clone for Context { .expect("Failed to lock receiver"); Self { + cache: self.cache.clone(), client: self.client.clone(), update: self.update.clone(), upd_receiver: Arc::new(Mutex::new(upd_receiver.resubscribe())), diff --git a/lib/ferogram/src/dispatcher.rs b/lib/ferogram/src/dispatcher.rs index 89d4f94..3d98655 100644 --- a/lib/ferogram/src/dispatcher.rs +++ b/lib/ferogram/src/dispatcher.rs @@ -11,7 +11,9 @@ use grammers_client::{Client, Update, types::Chat}; use tokio::sync::broadcast::Sender; -use crate::{Context, Plugin, Result, Router, di, filters::Command, middleware::MiddlewareStack}; +use crate::{ + Cache, Context, Plugin, Result, Router, di, filters::Command, middleware::MiddlewareStack, +}; /// A dispatcher. /// @@ -172,11 +174,16 @@ impl Dispatcher { /// let dispatcher = dispatcher.handle_update(&client, &update).await?; /// # } /// ``` - pub(crate) async fn handle_update(&mut self, client: &Client, update: &Update) -> Result<()> { + pub(crate) async fn handle_update( + &mut self, + cache: &Cache, + client: &Client, + update: &Update, + ) -> Result<()> { let mut injector = di::Injector::default(); let upd_receiver = self.upd_sender.subscribe(); - let context = Context::with(client, update, upd_receiver); + let context = Context::with(cache, client, update, upd_receiver); injector.insert(context); self.upd_sender @@ -187,39 +194,46 @@ impl Dispatcher { injector.insert(update.clone()); injector.extend(&mut self.injector.clone()); - if !self.allow_from_self { - match update { - Update::NewMessage(message) | Update::MessageEdited(message) => { - if let Some(Chat::User(user)) = message.sender() { - if user.is_self() { - return Ok(()); - } - } - } - Update::CallbackQuery(query) => { - if let Chat::User(user) = query.sender() { - if user.is_self() { - return Ok(()); - } - } - } - Update::InlineQuery(query) => { - let user = query.sender(); + match update { + Update::NewMessage(message) | Update::MessageEdited(message) => { + let chat = message.chat(); + cache.save_chat(chat.pack()).await?; - if user.is_self() { + if let Some(Chat::User(user)) = message.sender() { + cache.save_chat(user.pack()).await?; + + if !self.allow_from_self && user.is_self() { return Ok(()); } } - Update::InlineSend(inline_send) => { - let user = inline_send.sender(); + } + Update::CallbackQuery(query) => { + if let Chat::User(user) = query.sender() { + cache.save_chat(user.pack()).await?; - if user.is_self() { + if !self.allow_from_self && user.is_self() { return Ok(()); } } - _ => {} - }; - } + } + Update::InlineQuery(query) => { + let user = query.sender(); + cache.save_chat(user.pack()).await?; + + if !self.allow_from_self && user.is_self() { + return Ok(()); + } + } + Update::InlineSend(inline_send) => { + let user = inline_send.sender(); + cache.save_chat(user.pack()).await?; + + if user.is_self() { + return Ok(()); + } + } + _ => {} + }; for router in self.routers.iter_mut() { match router diff --git a/lib/ferogram/src/lib.rs b/lib/ferogram/src/lib.rs index 4b82047..77cdc41 100644 --- a/lib/ferogram/src/lib.rs +++ b/lib/ferogram/src/lib.rs @@ -10,6 +10,7 @@ //! //! The main module of the library. +mod cache; mod client; mod context; pub(crate) mod di; @@ -25,6 +26,7 @@ mod plugin; mod router; pub mod utils; +pub(crate) use cache::Cache; pub use client::{Client, ClientBuilder as Builder}; pub use context::Context; pub use di::Injector;