libsurfer/
channels.rs

1use tokio::sync::{
2    mpsc::{
3        self,
4        error::{SendError, TryRecvError},
5    },
6    RwLock,
7};
8
9use crate::{EGUI_CONTEXT, OUTSTANDING_TRANSACTIONS};
10
11pub struct IngressReceiver<T> {
12    sc_messages: mpsc::Receiver<T>,
13}
14
15impl<T> IngressReceiver<T> {
16    pub fn new(sc_messages: mpsc::Receiver<T>) -> Self {
17        Self { sc_messages }
18    }
19
20    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
21        let result = self.sc_messages.try_recv();
22        match result {
23            Ok(result) => {
24                OUTSTANDING_TRANSACTIONS.fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
25                Ok(result)
26            }
27            Err(TryRecvError::Empty) => Err(TryRecvError::Empty),
28            Err(_) => {
29                OUTSTANDING_TRANSACTIONS.store(0, std::sync::atomic::Ordering::SeqCst);
30                Err(TryRecvError::Disconnected)
31            }
32        }
33    }
34}
35
36pub struct IngressSender<T> {
37    sc_messages: mpsc::Sender<T>,
38}
39
40impl<T> IngressSender<T> {
41    pub fn new(sc_messages: mpsc::Sender<T>) -> Self {
42        Self { sc_messages }
43    }
44
45    pub async fn send(&self, message: T) -> Result<(), SendError<T>> {
46        let result = self.sc_messages.send(message).await;
47        OUTSTANDING_TRANSACTIONS.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
48        if let Some(ctx) = EGUI_CONTEXT.read().unwrap().as_ref() {
49            ctx.request_repaint();
50        }
51        result
52    }
53}
54
55#[cfg_attr(not(target_arch = "wasm32"), allow(dead_code))]
56pub(crate) struct IngressHandler<T> {
57    pub tx: IngressSender<T>,
58    pub rx: RwLock<Option<IngressReceiver<T>>>,
59}
60impl<T> IngressHandler<T> {
61    #[cfg_attr(not(target_arch = "wasm32"), allow(dead_code))]
62    pub fn new() -> Self {
63        let (tx, rx) = mpsc::channel(100);
64        Self {
65            tx: IngressSender::new(tx),
66            rx: RwLock::new(Some(IngressReceiver::new(rx))),
67        }
68    }
69}
70
71#[cfg(target_arch = "wasm32")]
72pub(crate) struct GlobalChannelTx<T> {
73    pub tx: mpsc::Sender<T>,
74    #[cfg_attr(not(target_arch = "wasm32"), allow(dead_code))]
75    pub rx: RwLock<mpsc::Receiver<T>>,
76}
77#[cfg(target_arch = "wasm32")]
78impl<T> GlobalChannelTx<T> {
79    pub fn new() -> Self {
80        let (tx, rx) = mpsc::channel(100);
81        Self {
82            tx,
83            rx: RwLock::new(rx),
84        }
85    }
86}