libsurfer/wcp/
wcp_server.rs
1use bytes::{Buf, BytesMut};
2use color_eyre::eyre::Result;
3use eframe::egui::Context;
4use serde::Serialize;
5use serde_json::Error as serde_Error;
6use std::sync::{
7 atomic::{AtomicBool, Ordering},
8 Arc,
9};
10use std::time::Duration;
11use tokio::net::tcp::{ReadHalf, WriteHalf};
12use tokio::net::{TcpListener, TcpStream};
13
14#[cfg(target_arch = "wasm32")]
15use crate::channels::IngressSender;
16use log::{error, info, warn};
17use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
18use tokio::sync::mpsc::Receiver;
19#[cfg(not(target_arch = "wasm32"))]
20use tokio::sync::mpsc::Sender;
21use tokio_stream::wrappers::ReceiverStream;
22use tokio_stream::StreamExt;
23
24use super::{proto::WcpCSMessage, proto::WcpCommand, proto::WcpSCMessage};
25
/// Incremental reader for client-to-server WCP messages.
///
/// Bytes read from the connection are accumulated in `buffer`; each run of bytes up to
/// (but not including) a zero byte is treated as one frame and decoded as JSON into a
/// [`WcpCSMessage`].
struct WcpCSReader<'a> {
    /// Buffered wrapper around the borrowed read half of the TCP connection.
    reader: BufReader<ReadHalf<'a>>,
    /// Bytes received so far that have not yet been split off as a complete frame.
    buffer: BytesMut,
}
30
31impl<'a> WcpCSReader<'a> {
32 pub fn new(stream: ReadHalf<'a>) -> Self {
33 WcpCSReader {
34 reader: BufReader::new(stream),
35 buffer: BytesMut::with_capacity(8 * 1024),
36 }
37 }
38
39 pub async fn read_frame(&mut self) -> Result<Option<WcpCSMessage>, serde_Error> {
40 loop {
41 if let Some(frame) = self.try_decode_frame()? {
42 return Ok(Some(frame));
43 }
44
45 match self.reader.read_buf(&mut self.buffer).await {
46 Ok(0) => {
47 return Err(serde_Error::io(std::io::Error::new(
48 std::io::ErrorKind::UnexpectedEof,
49 "EOF",
50 )))
51 }
52 Ok(_) => (),
53 Err(e) => return Err(serde_Error::io(e)),
54 }
55 }
56 }
57
58 fn try_decode_frame(&mut self) -> Result<Option<WcpCSMessage>, serde_Error> {
59 match self.buffer.iter().position(|&x| x == 0) {
60 Some(position) => {
61 let frame_data = self.buffer.split_to(position);
62 self.buffer.advance(1);
63 let msg: Result<WcpCSMessage, _> = serde_json::from_slice(&frame_data);
64 match msg {
65 Ok(msg) => Ok(Some(msg)),
66 Err(e) => Err(e),
67 }
68 }
69 None => Ok(None),
70 }
71 }
72}
73
/// WCP endpoint that either accepts client connections or initiates a single outgoing one.
///
/// Exactly one of `listener` / `stream` is populated by [`WcpServer::new`]; `run` picks the
/// matching mode and takes the value out of its `Option`.
pub struct WcpServer {
    /// Bound listener, set when the server was created with `initiate == false`.
    listener: Option<TcpListener>,
    /// Already-connected stream, set when the server was created with `initiate == true`.
    stream: Option<TcpStream>,
    /// Channel into which decoded client-to-server messages are forwarded.
    #[cfg(target_arch = "wasm32")]
    sender: IngressSender<WcpCSMessage>,
    /// Channel into which decoded client-to-server messages are forwarded.
    #[cfg(not(target_arch = "wasm32"))]
    sender: Sender<WcpCSMessage>,
    /// Stream of server-to-client messages that are serialized and written to the peer.
    receiver: ReceiverStream<WcpSCMessage>,
    /// Polled while serving; once `true` the accept loop and any active client loop end.
    /// Also set to `true` when `run` finishes.
    stop_signal: Arc<AtomicBool>,
    /// Cleared when `run` finishes.
    running_signal: Arc<AtomicBool>,
    /// Cleared after every pass of the accept loop and when `run` finishes.
    greeted_signal: Arc<AtomicBool>,
    /// Optional egui context; a repaint is requested after each forwarded client message.
    ctx: Option<Arc<Context>>,
}
87
88impl WcpServer {
89 #[allow(clippy::too_many_arguments)]
90 pub async fn new(
91 address: String,
92 initiate: bool,
93 #[cfg(target_arch = "wasm32")] c2s_sender: IngressSender<WcpCSMessage>,
94 #[cfg(not(target_arch = "wasm32"))] c2s_sender: Sender<WcpCSMessage>,
95 s2c_receiver: Receiver<WcpSCMessage>,
96 stop_signal: Arc<AtomicBool>,
97 running_signal: Arc<AtomicBool>,
98 greeted_signal: Arc<AtomicBool>,
99 ctx: Option<Arc<Context>>,
100 ) -> Result<Self> {
101 let listener;
102 let stream;
103 if initiate {
104 let the_stream = TcpStream::connect(address).await?;
105 stream = Some(the_stream);
106 listener = None;
107 } else {
108 let the_listener = TcpListener::bind(address).await?;
109 info!(
110 "WCP Server listening on port {}",
111 the_listener.local_addr().unwrap()
112 );
113 listener = Some(the_listener);
114 stream = None;
115 }
116 Ok(WcpServer {
117 listener,
118 stream,
119 sender: c2s_sender,
120 receiver: ReceiverStream::new(s2c_receiver),
121 stop_signal,
122 running_signal,
123 greeted_signal,
124 ctx,
125 })
126 }
127
128 pub async fn run(&mut self) {
129 if self.listener.is_some() {
130 self.listen().await;
131 } else if self.stream.is_some() {
132 self.initiate().await;
133 } else {
134 error!("Internal error: calling `run` with both listener and stream unset");
135 }
136 info!("WCP shutting down");
137 self.greeted_signal.store(false, Ordering::Relaxed);
138 self.running_signal.store(false, Ordering::Relaxed);
139 self.stop_signal.store(true, Ordering::Relaxed);
140 }
141
142 async fn listen(&mut self) {
143 let listener = self.listener.take().unwrap();
144 loop {
145 let stop_signal_clone = self.stop_signal.clone();
146 let stop_signal_waiter = async {
147 while !stop_signal_clone.load(Ordering::Relaxed) {
148 tokio::time::sleep(Duration::from_millis(100)).await;
149 }
150 };
151
152 tokio::select! {
153 result = listener.accept() => {
154 match result {
155 Ok((stream, _addr)) => self.handle_connection(stream).await,
156 Err(ref e)
157 if [std::io::ErrorKind::WouldBlock, std::io::ErrorKind::TimedOut]
158 .contains(&e.kind()) =>
159 {
160 continue
161 }
162 Err(e) => warn!("WCP Connection failed: {e}"),
163 }
164 }
165
166 _ = stop_signal_waiter => {
167 break;
168 }
169 }
170 self.greeted_signal.store(false, Ordering::Relaxed);
171 }
172 }
173
174 async fn initiate(&mut self) {
175 let stream = self.stream.take().unwrap();
176 match self.handle_client(stream).await {
177 Err(error) => warn!("WCP Client disconnected with error: {error:#?}"),
178 Ok(()) => info!("WCP client disconnected"),
179 }
180 }
181
182 async fn handle_connection(&mut self, stream: TcpStream) {
183 info!("WCP New connection: {}", stream.peer_addr().unwrap());
184
185 match self.handle_client(stream).await {
187 Err(error) => warn!("WCP Client disconnected with error: {error:#?}"),
188 Ok(()) => info!("WCP client disconnected"),
189 }
190 }
191
192 async fn send_message<M: Serialize>(&mut self, stream: &mut WriteHalf<'_>, message: &M) {
193 match serde_json::to_string(message) {
194 Ok(message) => {
195 if let Err(error) = stream.write_all(message.as_bytes()).await {
196 warn!("WCP Sending of message failed: {error:#?}")
197 }
198 }
199 Err(error) => warn!("Serializing message failed: {error:#?}"),
200 }
201 if let Err(e) = stream.write_all(b"\0").await {
202 warn!("Failed to send WCP message: {e:#?}");
203 }
204 if let Err(e) = stream.flush().await {
205 warn!("Failed to send WCP message: {e:#?}");
206 }
207 }
208
209 async fn handle_client(&mut self, mut stream: TcpStream) -> Result<(), serde_Error> {
210 let (reader, mut writer) = stream.split();
211 let mut reader = WcpCSReader::new(reader);
212
213 loop {
214 let stop_signal_clone = self.stop_signal.clone();
215 let stop_signal_waiter = async {
216 while !stop_signal_clone.load(Ordering::Relaxed) {
217 tokio::time::sleep(Duration::from_millis(100)).await;
218 }
219 };
220
221 tokio::select! {
222 msg = reader.read_frame() => {
223 let msg = match msg? {
224 Some(msg) => msg,
225 None => continue,
226 };
227
228 if let WcpCSMessage::command(WcpCommand::shutdowmn) = msg {
229 return Ok(());
230 }
231
232 if let Err(e) = self.sender.send(msg).await {
233 error!("Failed to send wcp message into main thread {e}")
234 };
235
236 if let Some(ctx) = &self.ctx {
238 ctx.request_repaint();
239 }
240 }
241
242 Some(s2c) = self.receiver.next() => {
243 self.send_message(&mut writer, &s2c).await;
244 }
245
246 _ = stop_signal_waiter => {
247 return Err(serde_Error::io(std::io::Error::new(
248 std::io::ErrorKind::ConnectionAborted,
249 "Server terminated",
250 )));
251 }
252 }
253 }
254 }
255}