1use tokio::sync::{
2 RwLock,
3 mpsc::{
4 self,
5 error::{SendError, TryRecvError},
6 },
7};
8
9use crate::{EGUI_CONTEXT, OUTSTANDING_TRANSACTIONS};
10
/// Capacity passed to `mpsc::channel` for every bounded channel created in this module.
const CHANNEL_SIZE: usize = 100;
12
13pub struct IngressReceiver<T> {
14 sc_messages: mpsc::Receiver<T>,
15}
16
17impl<T> IngressReceiver<T> {
18 pub fn new(sc_messages: mpsc::Receiver<T>) -> Self {
19 Self { sc_messages }
20 }
21
22 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
23 let result = self.sc_messages.try_recv();
24 match result {
25 Ok(result) => {
26 OUTSTANDING_TRANSACTIONS.fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
27 Ok(result)
28 }
29 Err(TryRecvError::Empty) => Err(TryRecvError::Empty),
30 Err(_) => {
31 OUTSTANDING_TRANSACTIONS.store(0, std::sync::atomic::Ordering::SeqCst);
32 Err(TryRecvError::Disconnected)
33 }
34 }
35 }
36}
37
38pub struct IngressSender<T> {
39 sc_messages: mpsc::Sender<T>,
40}
41
42impl<T> IngressSender<T> {
43 pub fn new(sc_messages: mpsc::Sender<T>) -> Self {
44 Self { sc_messages }
45 }
46
47 pub async fn send(&self, message: T) -> Result<(), SendError<T>> {
48 let result = self.sc_messages.send(message).await;
49 OUTSTANDING_TRANSACTIONS.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
50 if let Some(ctx) = EGUI_CONTEXT.read().unwrap().as_ref() {
51 ctx.request_repaint();
52 }
53 result
54 }
55}
56
57#[cfg_attr(not(target_arch = "wasm32"), allow(dead_code))]
58pub(crate) struct IngressHandler<T> {
59 pub tx: IngressSender<T>,
60 pub rx: RwLock<Option<IngressReceiver<T>>>,
61}
62impl<T> IngressHandler<T> {
63 #[cfg_attr(not(target_arch = "wasm32"), allow(dead_code))]
64 pub fn new() -> Self {
65 let (tx, rx) = mpsc::channel(CHANNEL_SIZE);
66 Self {
67 tx: IngressSender::new(tx),
68 rx: RwLock::new(Some(IngressReceiver::new(rx))),
69 }
70 }
71}
72
/// Plain (uncounted) bounded channel with both ends kept together.
///
/// Only compiled for `wasm32` targets.
#[cfg(target_arch = "wasm32")]
pub(crate) struct GlobalChannelTx<T> {
    /// Sending end of the channel.
    pub tx: mpsc::Sender<T>,
    /// Receiving end of the channel, guarded by a tokio `RwLock`.
    // NOTE(review): the struct itself is wasm32-only, so the `not(wasm32)`
    // condition below can never hold and this attribute never applies.
    #[cfg_attr(not(target_arch = "wasm32"), allow(dead_code))]
    pub rx: RwLock<mpsc::Receiver<T>>,
}

#[cfg(target_arch = "wasm32")]
impl<T> GlobalChannelTx<T> {
    /// Creates a bounded channel of `CHANNEL_SIZE` slots.
    pub fn new() -> Self {
        let (tx, receiver) = mpsc::channel(CHANNEL_SIZE);
        let rx = RwLock::new(receiver);
        Self { tx, rx }
    }
}