Skip to main content

libsurfer/
channels.rs

1use tokio::sync::{
2    RwLock,
3    mpsc::{
4        self,
5        error::{SendError, TryRecvError},
6    },
7};
8use tracing::error;
9
10use crate::{EGUI_CONTEXT, OUTSTANDING_TRANSACTIONS, message::Message};
11
/// Capacity handed to `mpsc::channel` for every channel created in this module
/// (see `IngressHandler::new` and `GlobalChannelTx::new`).
const CHANNEL_SIZE: usize = 100;
13
14pub struct IngressReceiver<T> {
15    sc_messages: mpsc::Receiver<T>,
16}
17
18impl<T> IngressReceiver<T> {
19    pub fn new(sc_messages: mpsc::Receiver<T>) -> Self {
20        Self { sc_messages }
21    }
22
23    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
24        let result = self.sc_messages.try_recv();
25        match result {
26            Ok(result) => {
27                OUTSTANDING_TRANSACTIONS.fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
28                Ok(result)
29            }
30            Err(TryRecvError::Empty) => Err(TryRecvError::Empty),
31            Err(_) => {
32                OUTSTANDING_TRANSACTIONS.store(0, std::sync::atomic::Ordering::SeqCst);
33                Err(TryRecvError::Disconnected)
34            }
35        }
36    }
37}
38
39pub struct IngressSender<T> {
40    sc_messages: mpsc::Sender<T>,
41}
42
43impl<T> IngressSender<T> {
44    pub fn new(sc_messages: mpsc::Sender<T>) -> Self {
45        Self { sc_messages }
46    }
47
48    pub async fn send(&self, message: T) -> Result<(), SendError<T>> {
49        let result = self.sc_messages.send(message).await;
50        OUTSTANDING_TRANSACTIONS.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
51        if let Some(ctx) = EGUI_CONTEXT.read().unwrap().as_ref() {
52            ctx.request_repaint();
53        }
54        result
55    }
56}
57
58#[cfg_attr(not(target_arch = "wasm32"), allow(dead_code))]
59pub(crate) struct IngressHandler<T> {
60    pub tx: IngressSender<T>,
61    pub rx: RwLock<Option<IngressReceiver<T>>>,
62}
63impl<T> IngressHandler<T> {
64    #[cfg_attr(not(target_arch = "wasm32"), allow(dead_code))]
65    pub fn new() -> Self {
66        let (tx, rx) = mpsc::channel(CHANNEL_SIZE);
67        Self {
68            tx: IngressSender::new(tx),
69            rx: RwLock::new(Some(IngressReceiver::new(rx))),
70        }
71    }
72}
73
/// Both ends of a plain tokio channel, compiled only for `wasm32` targets.
///
/// Unlike `IngressHandler`, the raw tokio sender and receiver are stored
/// directly, so no transaction counting happens here.
#[cfg(target_arch = "wasm32")]
pub(crate) struct GlobalChannelTx<T> {
    /// Sending half of the channel.
    pub tx: mpsc::Sender<T>,
    /// Receiving half of the channel, guarded by an async `RwLock`.
    #[cfg_attr(not(target_arch = "wasm32"), allow(dead_code))]
    pub rx: RwLock<mpsc::Receiver<T>>,
}

#[cfg(target_arch = "wasm32")]
impl<T> GlobalChannelTx<T> {
    /// Creates a channel with capacity `CHANNEL_SIZE` and stores both halves.
    pub fn new() -> Self {
        let (sender, receiver) = mpsc::channel(CHANNEL_SIZE);
        let rx = RwLock::new(receiver);
        Self { tx: sender, rx }
    }
}
90
91#[inline]
92pub fn checked_send(sender: &std::sync::mpsc::Sender<Message>, msg: Message) {
93    if let Err(e) = sender.send(msg) {
94        error!("Failed to send message: {e}");
95    }
96}
97
98#[inline]
99pub fn checked_send_many(sender: &std::sync::mpsc::Sender<Message>, msgs: Vec<Message>) {
100    for msg in msgs {
101        checked_send(sender, msg);
102    }
103}