Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions lib/ferogram/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,14 @@ ferogram-macros = { path = "../ferogram-macros", version = "0.1.0", optional = t
grammers-client = { git = "https://github.com/Lonami/grammers.git", version = "0.7.0" }
grammers-mtsender = { git = "https://github.com/Lonami/grammers.git", version = "0.7.0" }

log = "0.4.25"
log = "^0.4"
url = { version = "^2.5", optional = true }
mlua = { version = "^0.10", features = ["async", "lua54", "module"], optional = true }
pyo3 = { version = "^0.23", features = ["experimental-async", "macros"], optional = true }
regex = "1.11.1"
regex = "^1.11"
tokio = { version = "^1.43", features = ["fs", "rt", "signal", "sync"] }
rpassword = "7.3.1"
bincode = { version = "^2.0" }
rpassword = "^7.3"
async-trait = "^0.1"
futures-util = { version = "^0.3", default-features = false, features = ["alloc"] }
async-recursion = "^1.1"
Expand Down
148 changes: 148 additions & 0 deletions lib/ferogram/src/cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
// Copyright 2024-2025 - Andriel Ferreira
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// https://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or https://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.

//! Cache module.
use std::{collections::HashMap, path::Path, sync::Arc};

use bincode::{
Decode, Encode, config,
de::Decoder,
enc::Encoder,
error::{DecodeError, EncodeError},
};
use grammers_client::types::PackedChat;
use tokio::sync::RwLock;

/// The cache.
#[derive(Clone, Debug, Default)]
pub struct Cache {
/// The inner cache.
inner: Arc<RwLock<InnerCache>>,
}

impl Cache {
/// Load a previous cache instance from a file or create one if it doesn’t exist.
pub fn load_file_or_create<P: AsRef<Path>>(path: P) -> crate::Result<Self> {
// try to open the cache file.
if let Ok(mut file) = std::fs::File::open(&path) {
// get the standard config.
let config = config::standard();

// construct the inner cache.
let inner: InnerCache = bincode::decode_from_std_read(&mut file, config)?;

log::debug!("loaded {} chats from cache", inner.chats.len());

Ok(Self {
inner: Arc::new(RwLock::new(inner)),
})
} else {
log::debug!("no cache was found, generating a new one");

Ok(Self {
inner: Arc::new(RwLock::new(InnerCache::default())),
})
}
}

/// Try to save the cache to a file.
pub async fn save_to_file<P: AsRef<Path>>(&self, path: P) -> crate::Result<()> {
// delete the cache file if it exists.
if std::fs::exists(&path)? {
std::fs::remove_file(&path)?;
}

// create the cache file.
let mut file = std::fs::File::create(path)?;

// get the standard config.
let config = config::standard();

// clone the inner.
let inner = self.inner.write().await.clone();

// write to the file.
bincode::encode_into_std_write(inner, &mut file, config)?;

Ok(())
}

/// Gets a saved chat by its ID.
pub fn get_chat(&self, chat_id: i64) -> Option<PackedChat> {
let inner = self.inner.try_read().expect("failed to get saved chats");

inner.chats.get(&chat_id).cloned()
}

/// Saves a chat in the cache.
pub(crate) async fn save_chat(&self, chat: PackedChat) -> crate::Result<()> {
let mut inner = self.inner.write().await;

if !inner.chat_exists(chat.id) {
log::trace!("saved a new chat: {:?}", chat);

inner.push_chat(chat);
}

Ok(())
}
}

/// The inner cache.
#[derive(Clone, Debug, Default)]
struct InnerCache {
/// The packed chat map.
chats: HashMap<i64, PackedChat>,
}

impl InnerCache {
/// Pushes a chat.
pub fn push_chat(&mut self, chat: PackedChat) {
self.chats.entry(chat.id).or_insert(chat);
}

/// Checks if a chat exists.
pub fn chat_exists(&self, chat_id: i64) -> bool {
self.chats.contains_key(&chat_id)
}
}

impl Encode for InnerCache {
fn encode<E: Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
// convert chats to bytes.
let chats = self
.chats
.clone()
.into_iter()
.map(|(id, chat)| (id, chat.to_bytes()))
.collect::<HashMap<_, _>>();

Encode::encode(&chats, encoder)?;

Ok(())
}
}

impl<Context> Decode<Context> for InnerCache {
fn decode<D: Decoder<Context = Context>>(decoder: &mut D) -> Result<Self, DecodeError> {
// convert bytes to chats.
let encoded_chats: HashMap<i64, [u8; 17]> = Decode::decode(decoder)?;
let chats = encoded_chats
.into_iter()
.map(|(id, bytes)| {
(
id,
PackedChat::from_bytes(&bytes).expect("failed to decode chat bytes"),
)
})
.collect::<HashMap<_, _>>();

Ok(Self { chats })
}
}
48 changes: 42 additions & 6 deletions lib/ferogram/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@

//! Client module.
use std::path::Path;
use std::{path::Path, time::Duration};

use grammers_client::{
Config, InitParams, ReconnectionPolicy, SignInError, grammers_tl_types as tl, session::Session,
};
use grammers_mtsender::ServerAddr;

use crate::{Context, Dispatcher, ErrorHandler, Result, di, utils::prompt};
use crate::{Cache, Context, Dispatcher, ErrorHandler, Result, di, utils::prompt};

/// Wrapper about grammers' `Client` instance.
pub struct Client {
Expand All @@ -26,14 +26,16 @@ pub struct Client {
/// The inner grammers' `Client` instance.
inner_client: grammers_client::Client,

/// The session cache.
cache: Cache,
/// The session file path.
session_file: Option<String>,

/// Whether the client is connected.
is_connected: bool,
/// Whether is to update Telegram's bot commands.
set_bot_commands: bool,
/// Wheter is to wait for a `Ctrl + C` signal to close the connection and exit the app.
/// Whether is to wait for a `Ctrl + C` signal to close the connection and exit the app.
wait_for_ctrl_c: bool,

/// The global error handler.
Expand Down Expand Up @@ -226,7 +228,7 @@ impl Client {
pub fn new_ctx(&self) -> Context {
let upd_receiver = self.dispatcher.upd_sender.subscribe();

Context::new(&self.inner_client, upd_receiver)
Context::new(&self.cache, &self.inner_client, upd_receiver)
}

/// Listen to Telegram's updates and send them to the dispatcher's routers.
Expand All @@ -239,6 +241,7 @@ impl Client {
/// # }
/// ```
pub async fn run(self) -> Result<()> {
let cache = self.cache.clone();
let handle = self.inner_client;
let dispatcher = self.dispatcher;
let err_handler = self.err_handler;
Expand Down Expand Up @@ -286,12 +289,13 @@ impl Client {
loop {
match handle.next_update().await {
Ok(update) => {
let cache = cache.clone();
let client = handle.clone();
let mut dp = dispatcher.clone();
let err_handler = err_handler.clone();

tokio::task::spawn(async move {
if let Err(e) = dp.handle_update(&client, &update).await {
if let Err(e) = dp.handle_update(&cache, &client, &update).await {
if let Some(err_handler) = err_handler.as_ref() {
err_handler.run(client, update, e).await;
} else {
Expand All @@ -308,6 +312,21 @@ impl Client {
});

if self.wait_for_ctrl_c {
let session_file = self.session_file.as_deref().unwrap_or("./ferogram.session");
let cache_file = format!("{}.cc", session_file);

let cache = self.cache.clone();
let path = cache_file.clone();
tokio::task::spawn(async move {
loop {
tokio::time::sleep(Duration::from_secs(60)).await;

if let Err(e) = cache.save_to_file(&path).await {
log::error!("failed to auto-save cache: {}", e);
}
}
});

tokio::signal::ctrl_c().await?;

if let Some(mut handler) = self.exit_handler {
Expand All @@ -317,8 +336,8 @@ impl Client {
handler.handle(&mut injector).await.unwrap();
}

let session_file = self.session_file.as_deref().unwrap_or("./ferogram.session");
client.session().save_to_file(session_file)?;
self.cache.save_to_file(cache_file).await?;
}

Ok(())
Expand All @@ -335,6 +354,7 @@ impl Client {
/// ```
pub async fn keep_alive(self) -> Result<()> {
let handle = self.inner_client;
let client = handle.clone();

tokio::task::spawn(async move {
loop {
Expand All @@ -344,6 +364,12 @@ impl Client {

if self.wait_for_ctrl_c {
tokio::signal::ctrl_c().await?;

let session_file = self.session_file.as_deref().unwrap_or("./ferogram.session");
let cache_file = &format!("{}.cc", session_file);

client.session().save_to_file(session_file)?;
self.cache.save_to_file(cache_file).await?;
}

Ok(())
Expand Down Expand Up @@ -427,6 +453,15 @@ impl ClientBuilder {
/// ```
pub async fn build(self) -> Result<Client> {
let session_file = self.session_file.as_deref().unwrap_or("./ferogram.session");
let cache_file = &format!("{}.cc", session_file);

// check if the session file don't exists and the cache file exists.
if !std::fs::exists(session_file)? && std::fs::exists(cache_file)? {
// delete the cache file if the session file changes.
std::fs::remove_file(cache_file)?;

log::debug!("session file changed, deleting the cache file");
}

let inner_client = grammers_client::Client::connect(Config {
session: Session::load_file_or_create(session_file)?,
Expand All @@ -441,6 +476,7 @@ impl ClientBuilder {
client_type: self.client_type,
inner_client,

cache: Cache::load_file_or_create(cache_file)?,
session_file: Some(session_file.to_string()),

is_connected: false,
Expand Down
17 changes: 14 additions & 3 deletions lib/ferogram/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@ use tokio::{
sync::{Mutex, broadcast::Receiver},
};

use crate::{Filter, utils::bytes_to_string};
use crate::{Cache, Filter, utils::bytes_to_string};

/// The context of an update.
#[derive(Debug)]
pub struct Context {
// The client that received the update.
/// The session cache.
pub cache: Cache,
/// The client that received the update.
client: grammers_client::Client,
/// The update itself.
update: Option<Update>,
Expand All @@ -38,8 +40,13 @@ pub struct Context {

impl Context {
/// Creates a new context.
pub fn new(client: &grammers_client::Client, upd_receiver: Receiver<Update>) -> Self {
pub fn new(
cache: &Cache,
client: &grammers_client::Client,
upd_receiver: Receiver<Update>,
) -> Self {
Self {
cache: cache.clone(),
client: client.clone(),
update: None,
upd_receiver: Arc::new(Mutex::new(upd_receiver)),
Expand All @@ -48,11 +55,13 @@ impl Context {

/// Creates a new context with an update.
pub fn with(
cache: &Cache,
client: &grammers_client::Client,
update: &Update,
upd_receiver: Receiver<Update>,
) -> Self {
Self {
cache: cache.clone(),
client: client.clone(),
update: Some(update.clone()),
upd_receiver: Arc::new(Mutex::new(upd_receiver)),
Expand All @@ -77,6 +86,7 @@ impl Context {
.expect("Failed to lock receiver");

Self {
cache: self.cache.clone(),
client: self.client.clone(),
update: Some(update.clone()),
upd_receiver: Arc::new(Mutex::new(upd_receiver.resubscribe())),
Expand Down Expand Up @@ -1259,6 +1269,7 @@ impl Clone for Context {
.expect("Failed to lock receiver");

Self {
cache: self.cache.clone(),
client: self.client.clone(),
update: self.update.clone(),
upd_receiver: Arc::new(Mutex::new(upd_receiver.resubscribe())),
Expand Down
Loading
Loading