libsurfer/wcp/
wcp_server.rs

1use bytes::{Buf, BytesMut};
2use color_eyre::eyre::Result;
3use eframe::egui::Context;
4use serde::Serialize;
5use serde_json::Error as serde_Error;
6use std::sync::{
7    atomic::{AtomicBool, Ordering},
8    Arc,
9};
10use std::time::Duration;
11use tokio::net::tcp::{ReadHalf, WriteHalf};
12use tokio::net::{TcpListener, TcpStream};
13
14#[cfg(target_arch = "wasm32")]
15use crate::channels::IngressSender;
16use log::{error, info, warn};
17use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
18use tokio::sync::mpsc::Receiver;
19#[cfg(not(target_arch = "wasm32"))]
20use tokio::sync::mpsc::Sender;
21use tokio_stream::wrappers::ReceiverStream;
22use tokio_stream::StreamExt;
23
24use super::{proto::WcpCSMessage, proto::WcpCommand, proto::WcpSCMessage};
25
26struct WcpCSReader<'a> {
27    reader: BufReader<ReadHalf<'a>>,
28    buffer: BytesMut,
29}
30
31impl<'a> WcpCSReader<'a> {
32    pub fn new(stream: ReadHalf<'a>) -> Self {
33        WcpCSReader {
34            reader: BufReader::new(stream),
35            buffer: BytesMut::with_capacity(8 * 1024),
36        }
37    }
38
39    pub async fn read_frame(&mut self) -> Result<Option<WcpCSMessage>, serde_Error> {
40        loop {
41            if let Some(frame) = self.try_decode_frame()? {
42                return Ok(Some(frame));
43            }
44
45            match self.reader.read_buf(&mut self.buffer).await {
46                Ok(0) => {
47                    return Err(serde_Error::io(std::io::Error::new(
48                        std::io::ErrorKind::UnexpectedEof,
49                        "EOF",
50                    )))
51                }
52                Ok(_) => (),
53                Err(e) => return Err(serde_Error::io(e)),
54            }
55        }
56    }
57
58    fn try_decode_frame(&mut self) -> Result<Option<WcpCSMessage>, serde_Error> {
59        match self.buffer.iter().position(|&x| x == 0) {
60            Some(position) => {
61                let frame_data = self.buffer.split_to(position);
62                self.buffer.advance(1);
63                let msg: Result<WcpCSMessage, _> = serde_json::from_slice(&frame_data);
64                match msg {
65                    Ok(msg) => Ok(Some(msg)),
66                    Err(e) => Err(e),
67                }
68            }
69            None => Ok(None),
70        }
71    }
72}
73
74pub struct WcpServer {
75    listener: Option<TcpListener>,
76    stream: Option<TcpStream>,
77    #[cfg(target_arch = "wasm32")]
78    sender: IngressSender<WcpCSMessage>,
79    #[cfg(not(target_arch = "wasm32"))]
80    sender: Sender<WcpCSMessage>,
81    receiver: ReceiverStream<WcpSCMessage>,
82    stop_signal: Arc<AtomicBool>,
83    running_signal: Arc<AtomicBool>,
84    greeted_signal: Arc<AtomicBool>,
85    ctx: Option<Arc<Context>>,
86}
87
88impl WcpServer {
89    #[allow(clippy::too_many_arguments)]
90    pub async fn new(
91        address: String,
92        initiate: bool,
93        #[cfg(target_arch = "wasm32")] c2s_sender: IngressSender<WcpCSMessage>,
94        #[cfg(not(target_arch = "wasm32"))] c2s_sender: Sender<WcpCSMessage>,
95        s2c_receiver: Receiver<WcpSCMessage>,
96        stop_signal: Arc<AtomicBool>,
97        running_signal: Arc<AtomicBool>,
98        greeted_signal: Arc<AtomicBool>,
99        ctx: Option<Arc<Context>>,
100    ) -> Result<Self> {
101        let listener;
102        let stream;
103        if initiate {
104            let the_stream = TcpStream::connect(address).await?;
105            stream = Some(the_stream);
106            listener = None;
107        } else {
108            let the_listener = TcpListener::bind(address).await?;
109            info!(
110                "WCP Server listening on port {}",
111                the_listener.local_addr().unwrap()
112            );
113            listener = Some(the_listener);
114            stream = None;
115        }
116        Ok(WcpServer {
117            listener,
118            stream,
119            sender: c2s_sender,
120            receiver: ReceiverStream::new(s2c_receiver),
121            stop_signal,
122            running_signal,
123            greeted_signal,
124            ctx,
125        })
126    }
127
128    pub async fn run(&mut self) {
129        if self.listener.is_some() {
130            self.listen().await;
131        } else if self.stream.is_some() {
132            self.initiate().await;
133        } else {
134            error!("Internal error: calling `run` with both listener and stream unset");
135        }
136        info!("WCP shutting down");
137        self.greeted_signal.store(false, Ordering::Relaxed);
138        self.running_signal.store(false, Ordering::Relaxed);
139        self.stop_signal.store(true, Ordering::Relaxed);
140    }
141
142    async fn listen(&mut self) {
143        let listener = self.listener.take().unwrap();
144        loop {
145            let stop_signal_clone = self.stop_signal.clone();
146            let stop_signal_waiter = async {
147                while !stop_signal_clone.load(Ordering::Relaxed) {
148                    tokio::time::sleep(Duration::from_millis(100)).await;
149                }
150            };
151
152            tokio::select! {
153                result = listener.accept() => {
154                    match result {
155                        Ok((stream, _addr)) => self.handle_connection(stream).await,
156                        Err(ref e)
157                            if [std::io::ErrorKind::WouldBlock, std::io::ErrorKind::TimedOut]
158                                .contains(&e.kind()) =>
159                        {
160                            continue
161                        }
162                        Err(e) => warn!("WCP Connection failed: {e}"),
163                    }
164                }
165
166                _ = stop_signal_waiter => {
167                    break;
168                }
169            }
170            self.greeted_signal.store(false, Ordering::Relaxed);
171        }
172    }
173
174    async fn initiate(&mut self) {
175        let stream = self.stream.take().unwrap();
176        match self.handle_client(stream).await {
177            Err(error) => warn!("WCP Client disconnected with error: {error:#?}"),
178            Ok(()) => info!("WCP client disconnected"),
179        }
180    }
181
182    async fn handle_connection(&mut self, stream: TcpStream) {
183        info!("WCP New connection: {}", stream.peer_addr().unwrap());
184
185        //handle connection from client
186        match self.handle_client(stream).await {
187            Err(error) => warn!("WCP Client disconnected with error: {error:#?}"),
188            Ok(()) => info!("WCP client disconnected"),
189        }
190    }
191
192    async fn send_message<M: Serialize>(&mut self, stream: &mut WriteHalf<'_>, message: &M) {
193        match serde_json::to_string(message) {
194            Ok(message) => {
195                if let Err(error) = stream.write_all(message.as_bytes()).await {
196                    warn!("WCP Sending of message failed: {error:#?}")
197                }
198            }
199            Err(error) => warn!("Serializing message failed: {error:#?}"),
200        }
201        if let Err(e) = stream.write_all(b"\0").await {
202            warn!("Failed to send WCP message: {e:#?}");
203        }
204        if let Err(e) = stream.flush().await {
205            warn!("Failed to send WCP message: {e:#?}");
206        }
207    }
208
209    async fn handle_client(&mut self, mut stream: TcpStream) -> Result<(), serde_Error> {
210        let (reader, mut writer) = stream.split();
211        let mut reader = WcpCSReader::new(reader);
212
213        loop {
214            let stop_signal_clone = self.stop_signal.clone();
215            let stop_signal_waiter = async {
216                while !stop_signal_clone.load(Ordering::Relaxed) {
217                    tokio::time::sleep(Duration::from_millis(100)).await;
218                }
219            };
220
221            tokio::select! {
222                msg = reader.read_frame() => {
223                    let msg = match msg? {
224                        Some(msg) => msg,
225                        None => continue,
226                    };
227
228                    if let WcpCSMessage::command(WcpCommand::shutdowmn) = msg {
229                        return Ok(());
230                    }
231
232                    if let Err(e) = self.sender.send(msg).await {
233                        error!("Failed to send wcp message into main thread {e}")
234                    };
235
236                    // request repaint of the Surfer UI
237                    if let Some(ctx) = &self.ctx {
238                        ctx.request_repaint();
239                    }
240                }
241
242                Some(s2c) = self.receiver.next() => {
243                    self.send_message(&mut writer, &s2c).await;
244                }
245
246                _ = stop_signal_waiter => {
247                    return Err(serde_Error::io(std::io::Error::new(
248                        std::io::ErrorKind::ConnectionAborted,
249                        "Server terminated",
250                    )));
251                }
252            }
253        }
254    }
255}