Skip to main content

libsurfer/
channels.rs

1use tokio::sync::{
2    RwLock,
3    mpsc::{
4        self,
5        error::{SendError, TryRecvError},
6    },
7};
8
9use crate::{EGUI_CONTEXT, OUTSTANDING_TRANSACTIONS};
10
11const CHANNEL_SIZE: usize = 100;
12
13pub struct IngressReceiver<T> {
14    sc_messages: mpsc::Receiver<T>,
15}
16
17impl<T> IngressReceiver<T> {
18    pub fn new(sc_messages: mpsc::Receiver<T>) -> Self {
19        Self { sc_messages }
20    }
21
22    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
23        let result = self.sc_messages.try_recv();
24        match result {
25            Ok(result) => {
26                OUTSTANDING_TRANSACTIONS.fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
27                Ok(result)
28            }
29            Err(TryRecvError::Empty) => Err(TryRecvError::Empty),
30            Err(_) => {
31                OUTSTANDING_TRANSACTIONS.store(0, std::sync::atomic::Ordering::SeqCst);
32                Err(TryRecvError::Disconnected)
33            }
34        }
35    }
36}
37
38pub struct IngressSender<T> {
39    sc_messages: mpsc::Sender<T>,
40}
41
42impl<T> IngressSender<T> {
43    pub fn new(sc_messages: mpsc::Sender<T>) -> Self {
44        Self { sc_messages }
45    }
46
47    pub async fn send(&self, message: T) -> Result<(), SendError<T>> {
48        let result = self.sc_messages.send(message).await;
49        OUTSTANDING_TRANSACTIONS.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
50        if let Some(ctx) = EGUI_CONTEXT.read().unwrap().as_ref() {
51            ctx.request_repaint();
52        }
53        result
54    }
55}
56
57#[cfg_attr(not(target_arch = "wasm32"), allow(dead_code))]
58pub(crate) struct IngressHandler<T> {
59    pub tx: IngressSender<T>,
60    pub rx: RwLock<Option<IngressReceiver<T>>>,
61}
62impl<T> IngressHandler<T> {
63    #[cfg_attr(not(target_arch = "wasm32"), allow(dead_code))]
64    pub fn new() -> Self {
65        let (tx, rx) = mpsc::channel(CHANNEL_SIZE);
66        Self {
67            tx: IngressSender::new(tx),
68            rx: RwLock::new(Some(IngressReceiver::new(rx))),
69        }
70    }
71}
72
73#[cfg(target_arch = "wasm32")]
74pub(crate) struct GlobalChannelTx<T> {
75    pub tx: mpsc::Sender<T>,
76    #[cfg_attr(not(target_arch = "wasm32"), allow(dead_code))]
77    pub rx: RwLock<mpsc::Receiver<T>>,
78}
79#[cfg(target_arch = "wasm32")]
80impl<T> GlobalChannelTx<T> {
81    pub fn new() -> Self {
82        let (tx, rx) = mpsc::channel(CHANNEL_SIZE);
83        Self {
84            tx,
85            rx: RwLock::new(rx),
86        }
87    }
88}