1use tokio::sync::{
2 RwLock,
3 mpsc::{
4 self,
5 error::{SendError, TryRecvError},
6 },
7};
8use tracing::error;
9
10use crate::{EGUI_CONTEXT, OUTSTANDING_TRANSACTIONS, message::Message};
11
12const CHANNEL_SIZE: usize = 100;
13
14pub struct IngressReceiver<T> {
15 sc_messages: mpsc::Receiver<T>,
16}
17
18impl<T> IngressReceiver<T> {
19 pub fn new(sc_messages: mpsc::Receiver<T>) -> Self {
20 Self { sc_messages }
21 }
22
23 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
24 let result = self.sc_messages.try_recv();
25 match result {
26 Ok(result) => {
27 OUTSTANDING_TRANSACTIONS.fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
28 Ok(result)
29 }
30 Err(TryRecvError::Empty) => Err(TryRecvError::Empty),
31 Err(_) => {
32 OUTSTANDING_TRANSACTIONS.store(0, std::sync::atomic::Ordering::SeqCst);
33 Err(TryRecvError::Disconnected)
34 }
35 }
36 }
37}
38
39pub struct IngressSender<T> {
40 sc_messages: mpsc::Sender<T>,
41}
42
43impl<T> IngressSender<T> {
44 pub fn new(sc_messages: mpsc::Sender<T>) -> Self {
45 Self { sc_messages }
46 }
47
48 pub async fn send(&self, message: T) -> Result<(), SendError<T>> {
49 let result = self.sc_messages.send(message).await;
50 OUTSTANDING_TRANSACTIONS.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
51 if let Some(ctx) = EGUI_CONTEXT.read().unwrap().as_ref() {
52 ctx.request_repaint();
53 }
54 result
55 }
56}
57
58#[cfg_attr(not(target_arch = "wasm32"), allow(dead_code))]
59pub(crate) struct IngressHandler<T> {
60 pub tx: IngressSender<T>,
61 pub rx: RwLock<Option<IngressReceiver<T>>>,
62}
63impl<T> IngressHandler<T> {
64 #[cfg_attr(not(target_arch = "wasm32"), allow(dead_code))]
65 pub fn new() -> Self {
66 let (tx, rx) = mpsc::channel(CHANNEL_SIZE);
67 Self {
68 tx: IngressSender::new(tx),
69 rx: RwLock::new(Some(IngressReceiver::new(rx))),
70 }
71 }
72}
73
74#[cfg(target_arch = "wasm32")]
75pub(crate) struct GlobalChannelTx<T> {
76 pub tx: mpsc::Sender<T>,
77 #[cfg_attr(not(target_arch = "wasm32"), allow(dead_code))]
78 pub rx: RwLock<mpsc::Receiver<T>>,
79}
80#[cfg(target_arch = "wasm32")]
81impl<T> GlobalChannelTx<T> {
82 pub fn new() -> Self {
83 let (tx, rx) = mpsc::channel(CHANNEL_SIZE);
84 Self {
85 tx,
86 rx: RwLock::new(rx),
87 }
88 }
89}
90
91#[inline]
92pub fn checked_send(sender: &std::sync::mpsc::Sender<Message>, msg: Message) {
93 if let Err(e) = sender.send(msg) {
94 error!("Failed to send message: {e}");
95 }
96}
97
98#[inline]
99pub fn checked_send_many(sender: &std::sync::mpsc::Sender<Message>, msgs: Vec<Message>) {
100 for msg in msgs {
101 checked_send(sender, msg);
102 }
103}