databeam/cache/
redis.rs

1//! Redis connection manager
2use redis::Commands;
3use serde::{de::DeserializeOwned, Serialize};
4
5use super::{Cache, TimedObject, EXPIRE_AT};
6
7#[derive(Clone)]
8pub struct RedisCache {
9    pub client: redis::Client,
10}
11
12impl Cache for RedisCache {
13    type Item = String;
14    type Client = redis::Connection;
15
16    async fn new() -> Self {
17        Self {
18            client: redis::Client::open("redis://127.0.0.1:6379").unwrap(),
19        }
20    }
21
22    async fn get_con(&self) -> Self::Client {
23        self.client.get_connection().unwrap()
24    }
25
26    async fn get(&self, id: Self::Item) -> Option<String> {
27        self.get_con().await.get(id).ok()
28    }
29
30    async fn set(&self, id: Self::Item, content: Self::Item) -> bool {
31        let mut c = self.get_con().await;
32        let res: Result<String, redis::RedisError> = c.set_ex(id, content, 604800);
33
34        res.is_ok()
35    }
36
37    async fn update(&self, id: Self::Item, content: Self::Item) -> bool {
38        self.set(id, content).await
39    }
40
41    async fn remove(&self, id: Self::Item) -> bool {
42        let mut c = self.get_con().await;
43        let res: Result<String, redis::RedisError> = c.del(id);
44
45        res.is_ok()
46    }
47
48    async fn remove_starting_with(&self, id: Self::Item) -> bool {
49        let mut c = self.get_con().await;
50
51        // get keys
52        let mut cmd = redis::cmd("DEL");
53        let keys: Result<Vec<String>, redis::RedisError> = c.keys(id);
54
55        for key in keys.unwrap() {
56            cmd.arg(key);
57        }
58
59        // remove
60        let res: Result<String, redis::RedisError> = cmd.query(&mut c);
61
62        res.is_ok()
63    }
64
65    async fn incr(&self, id: Self::Item) -> bool {
66        let mut c = self.get_con().await;
67        let res: Result<String, redis::RedisError> = c.incr(id, 1);
68
69        res.is_ok()
70    }
71
72    async fn decr(&self, id: Self::Item) -> bool {
73        let mut c = self.get_con().await;
74        let res: Result<String, redis::RedisError> = c.decr(id, 1);
75
76        res.is_ok()
77    }
78
79    async fn get_timed<T: Serialize + DeserializeOwned>(
80        &self,
81        id: Self::Item,
82    ) -> Option<TimedObject<T>> {
83        let mut c = self.get_con().await;
84        let res: Result<String, redis::RedisError> = c.get(&id);
85
86        match res {
87            Ok(d) => match serde_json::from_str::<TimedObject<T>>(&d) {
88                Ok(d) => {
89                    // check time
90                    let now = rainbeam_shared::epoch_timestamp(2024);
91
92                    if now - d.0 >= EXPIRE_AT {
93                        // expired key, remove and return None
94                        self.remove(id).await;
95                        return None;
96                    }
97
98                    // return
99                    Some(d)
100                }
101                Err(_) => None,
102            },
103            Err(_) => None,
104        }
105    }
106
107    async fn set_timed<T: Serialize + DeserializeOwned>(&self, id: Self::Item, content: T) -> bool {
108        let mut c = self.get_con().await;
109        let res: Result<String, redis::RedisError> = c.set(
110            id,
111            match serde_json::to_string::<TimedObject<T>>(&(
112                rainbeam_shared::epoch_timestamp(2024),
113                content,
114            )) {
115                Ok(s) => s,
116                Err(_) => return false,
117            },
118        );
119
120        res.is_ok()
121    }
122}