libsurfer/
channels.rs
1use tokio::sync::{
2 mpsc::{
3 self,
4 error::{SendError, TryRecvError},
5 },
6 RwLock,
7};
8
9use crate::{EGUI_CONTEXT, OUTSTANDING_TRANSACTIONS};
10
11pub struct IngressReceiver<T> {
12 sc_messages: mpsc::Receiver<T>,
13}
14
15impl<T> IngressReceiver<T> {
16 pub fn new(sc_messages: mpsc::Receiver<T>) -> Self {
17 Self { sc_messages }
18 }
19
20 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
21 let result = self.sc_messages.try_recv();
22 match result {
23 Ok(result) => {
24 OUTSTANDING_TRANSACTIONS.fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
25 Ok(result)
26 }
27 Err(TryRecvError::Empty) => Err(TryRecvError::Empty),
28 Err(_) => {
29 OUTSTANDING_TRANSACTIONS.store(0, std::sync::atomic::Ordering::SeqCst);
30 Err(TryRecvError::Disconnected)
31 }
32 }
33 }
34}
35
36pub struct IngressSender<T> {
37 sc_messages: mpsc::Sender<T>,
38}
39
40impl<T> IngressSender<T> {
41 pub fn new(sc_messages: mpsc::Sender<T>) -> Self {
42 Self { sc_messages }
43 }
44
45 pub async fn send(&self, message: T) -> Result<(), SendError<T>> {
46 let result = self.sc_messages.send(message).await;
47 OUTSTANDING_TRANSACTIONS.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
48 if let Some(ctx) = EGUI_CONTEXT.read().unwrap().as_ref() {
49 ctx.request_repaint();
50 }
51 result
52 }
53}
54
55#[cfg_attr(not(target_arch = "wasm32"), allow(dead_code))]
56pub(crate) struct IngressHandler<T> {
57 pub tx: IngressSender<T>,
58 pub rx: RwLock<Option<IngressReceiver<T>>>,
59}
60impl<T> IngressHandler<T> {
61 #[cfg_attr(not(target_arch = "wasm32"), allow(dead_code))]
62 pub fn new() -> Self {
63 let (tx, rx) = mpsc::channel(100);
64 Self {
65 tx: IngressSender::new(tx),
66 rx: RwLock::new(Some(IngressReceiver::new(rx))),
67 }
68 }
69}
70
/// A plain bounded channel whose two ends are stored side by side
/// (only compiled for `wasm32`).
///
/// Unlike `IngressHandler`, the sender and receiver here are the raw `mpsc`
/// types, so nothing in this type touches `OUTSTANDING_TRANSACTIONS`.
#[cfg(target_arch = "wasm32")]
pub(crate) struct GlobalChannelTx<T> {
    pub tx: mpsc::Sender<T>,
    #[cfg_attr(not(target_arch = "wasm32"), allow(dead_code))]
    pub rx: RwLock<mpsc::Receiver<T>>,
}

#[cfg(target_arch = "wasm32")]
impl<T> GlobalChannelTx<T> {
    /// Creates the channel with room for 100 buffered messages.
    pub fn new() -> Self {
        let (sender, receiver) = mpsc::channel(100);
        GlobalChannelTx {
            tx: sender,
            rx: RwLock::new(receiver),
        }
    }
}