1use bincode::Options;
3use eyre::{Result, WrapErr as _, anyhow, bail};
4use http_body_util::Full;
5use hyper::body::Bytes;
6use hyper::server::conn::http1;
7use hyper::service::service_fn;
8use hyper::{Request, Response, StatusCode};
9use hyper_util::rt::TokioIo;
10use std::collections::HashMap;
11use std::fs;
12use std::iter::repeat_with;
13use std::net::SocketAddr;
14use std::sync::atomic::{AtomicU64, Ordering};
15use std::sync::mpsc::Sender;
16use std::sync::{Arc, RwLock};
17use std::time::{Instant, SystemTime};
18use tokio::net::TcpListener;
19use tokio::sync::Notify;
20use tracing::{error, info, warn};
21use wellen::{
22 CompressedSignal, CompressedTimeTable, FileFormat, Hierarchy, Signal, SignalRef, Time, viewers,
23};
24
25use crate::{
26 BINCODE_OPTIONS, HTTP_SERVER_KEY, HTTP_SERVER_VALUE_SURFER, SURFER_VERSION, SurverFileInfo,
27 SurverStatus, WELLEN_SURFER_DEFAULT_OPTIONS, WELLEN_VERSION, X_SURFER_VERSION,
28 X_WELLEN_VERSION, modification_time_string,
29};
30
31struct ReadOnly {
32 url: String,
33 token: String,
34}
35
36struct FileInfo {
37 filename: String,
38 hierarchy: Arc<Hierarchy>,
39 file_format: FileFormat,
40 header_len: u64,
41 body_len: u64,
42 body_progress: Arc<AtomicU64>,
43 notify: Arc<Notify>,
44 timetable: Vec<Time>,
45 signals: HashMap<SignalRef, Signal>,
46 reloading: bool,
47 requested_in_session: bool,
48 last_reload_ok: bool,
49 last_reload_time: Option<Instant>,
50 last_modification_time: Option<SystemTime>,
51}
52
53#[derive(Default)]
54struct SurverState {
55 file_infos: Vec<FileInfo>,
56}
57
58impl FileInfo {
59 fn modification_time_string(&self) -> String {
60 modification_time_string(self.last_modification_time)
61 }
62
63 fn reload_time_string(&self) -> String {
64 if let Some(time) = self.last_reload_time {
65 return format!("{:?} ago", time.elapsed());
66 }
67 "never".to_string()
68 }
69
70 pub fn html_table_line(&self) -> String {
71 let bytes_loaded = self.body_progress.load(Ordering::SeqCst);
72
73 let progress = if bytes_loaded == self.body_len {
74 format!(
75 "{} loaded",
76 bytesize::ByteSize::b(self.body_len + self.header_len)
77 )
78 } else {
79 format!(
80 "{} / {}",
81 bytesize::ByteSize::b(bytes_loaded + self.header_len),
82 bytesize::ByteSize::b(self.body_len + self.header_len)
83 )
84 };
85
86 format!(
87 "<tr><td>{}</td><td>{}</td><td>{}</td><td>{}</td></tr>",
88 self.filename,
89 progress,
90 self.modification_time_string(),
91 self.reload_time_string()
92 )
93 }
94}
95
96impl From<&FileInfo> for SurverFileInfo {
97 fn from(file_info: &FileInfo) -> Self {
98 Self {
99 bytes: file_info.body_len + file_info.header_len,
100 bytes_loaded: file_info.body_progress.load(Ordering::SeqCst) + file_info.header_len,
101 filename: file_info.filename.clone(),
102 format: file_info.file_format,
103 reloading: file_info.reloading,
104 last_load_ok: file_info.last_reload_ok,
105 last_modification_time: file_info.last_modification_time,
106 }
107 }
108}
109enum LoaderMessage {
110 SignalRequest(SignalRequest),
111 Reload,
112}
113
114type SignalRequest = Vec<SignalRef>;
115
116fn get_info_page(shared: &Arc<ReadOnly>, state: &Arc<RwLock<SurverState>>) -> String {
117 let state_guard = state.read().expect("State lock poisoned in get_info_page");
118 let html_table_content = state_guard
119 .file_infos
120 .iter()
121 .map(FileInfo::html_table_line)
122 .collect::<Vec<_>>()
123 .join("\n");
124 drop(state_guard);
125
126 format!(
127 r#"
128 <!DOCTYPE html><html lang="en">
129 <head>
130 <link rel="icon" href="favicon.ico" sizes="any">
131 <title>Surver - Surfer Remote Server</title>
132 </head>
133 <body>
134 <h1>Surver - Surfer Remote Server</h1>
135 <b>To connect, run:</b> <code>surfer {}</code><br>
136 <b>Wellen version:</b> {WELLEN_VERSION}<br>
137 <b>Surfer version:</b> {SURFER_VERSION}<br>
138 <table border="1" cellpadding="5" cellspacing="0">
139 <tr><th>Filename</th><th>Load progress</th><th>File modification time</th><th>(Re)load time</th></tr>
140 {}
141 </table>
142 </body></html>
143 "#,
144 shared.url, html_table_content
145 )
146}
147
148fn get_hierarchy(state: &Arc<RwLock<SurverState>>, file_index: usize) -> Result<Vec<u8>> {
149 let state_guard = state.read().expect("State lock poisoned in get_hierarchy");
150 let file_info = &state_guard.file_infos[file_index];
151 let mut raw = BINCODE_OPTIONS.serialize(&file_info.file_format)?;
152 let mut raw2 = BINCODE_OPTIONS.serialize(file_info.hierarchy.as_ref())?;
153 drop(state_guard);
154 raw.append(&mut raw2);
155 let compressed = lz4_flex::compress_prepend_size(&raw);
156 info!(
157 "Sending hierarchy. {} raw, {} compressed.",
158 bytesize::ByteSize::b(raw.len() as u64),
159 bytesize::ByteSize::b(compressed.len() as u64)
160 );
161 Ok(compressed)
162}
163
164async fn get_timetable(state: &Arc<RwLock<SurverState>>, file_index: usize) -> Result<Vec<u8>> {
165 let notify = {
166 let state_guard = state.read().expect("State lock poisoned in get_timetable");
167 state_guard.file_infos[file_index].notify.clone()
168 };
169
170 let table = loop {
172 {
173 let state_guard = state.read().expect("State lock poisoned in get_timetable");
174 let timetable = &state_guard.file_infos[file_index].timetable;
175 if !timetable.is_empty() {
176 break timetable.clone();
177 }
178 }
179
180 notify.notified().await;
181 };
182
183 let raw_size = table.len() * std::mem::size_of::<Time>();
184 let compressed = BINCODE_OPTIONS.serialize(&CompressedTimeTable::compress(&table))?;
185 info!(
186 "Sending timetable. {} raw, {} compressed.",
187 bytesize::ByteSize::b(raw_size as u64),
188 bytesize::ByteSize::b(compressed.len() as u64)
189 );
190 Ok(compressed)
191}
192
193fn get_status(state: &Arc<RwLock<SurverState>>) -> Result<Vec<u8>> {
194 let state_guard = state.read().expect("State lock poisoned in get_status");
195 let file_infos = state_guard
196 .file_infos
197 .iter()
198 .map(SurverFileInfo::from)
199 .collect::<Vec<_>>();
200 drop(state_guard);
201 let status = SurverStatus {
202 wellen_version: WELLEN_VERSION.to_string(),
203 surfer_version: SURFER_VERSION.to_string(),
204 file_infos,
205 };
206 Ok(serde_json::to_vec(&status)?)
207}
208
209async fn get_signals(
210 state: &Arc<RwLock<SurverState>>,
211 file_index: usize,
212 txs: &[Sender<LoaderMessage>],
213 id_strings: &[&str],
214) -> Result<Vec<u8>> {
215 let ids = id_strings
216 .iter()
217 .map(|id_str| {
218 id_str
219 .parse::<u64>()
220 .map_err(|e| anyhow!("Failed to parse signal id `{id_str}`: {e:#}"))
221 .and_then(|index| {
222 SignalRef::from_index(index as usize)
223 .ok_or_else(|| anyhow!("Invalid signal index: {}", index))
224 })
225 })
226 .collect::<Result<Vec<SignalRef>>>()?;
227
228 if ids.is_empty() {
229 return Ok(vec![]);
230 }
231 let num_ids = ids.len();
232
233 txs[file_index].send(LoaderMessage::SignalRequest(ids.clone()))?;
235
236 let notify = {
237 let state_guard = state.read().expect("State lock poisoned in get_signals");
238 state_guard.file_infos[file_index].notify.clone()
239 };
240
241 let mut data = vec![];
243 leb128::write::unsigned(&mut data, num_ids as u64)?;
244 let mut raw_size = 0;
245 loop {
246 {
247 let state_guard = state.read().expect("State lock poisoned in get_signals");
248 if ids
249 .iter()
250 .all(|id| state_guard.file_infos[file_index].signals.contains_key(id))
251 {
252 for id in ids {
253 let signal = &state_guard.file_infos[file_index].signals[&id];
254 raw_size += BINCODE_OPTIONS.serialize(signal)?.len();
255 let comp = CompressedSignal::compress(signal);
256 data.append(&mut BINCODE_OPTIONS.serialize(&comp)?);
257 }
258 break;
259 }
260 };
261 notify.notified().await;
263 }
264 info!(
265 "Sending {} signals. {} raw, {} compressed.",
266 num_ids,
267 bytesize::ByteSize::b(raw_size as u64),
268 bytesize::ByteSize::b(data.len() as u64)
269 );
270 Ok(data)
271}
272
273const CONTENT_TYPE: &str = "Content-Type";
274const JSON_MIME: &str = "application/json";
275const OCTET_MIME: &str = "application/octet-stream";
276const HTML_MIME: &str = "text/html; charset=utf-8";
277
278trait DefaultHeader {
279 fn default_header(self) -> Self;
280}
281
282impl DefaultHeader for hyper::http::response::Builder {
283 fn default_header(self) -> Self {
284 self.header(HTTP_SERVER_KEY, HTTP_SERVER_VALUE_SURFER)
285 .header(X_WELLEN_VERSION, WELLEN_VERSION)
286 .header(X_SURFER_VERSION, SURFER_VERSION)
287 .header("Cache-Control", "no-cache")
288 }
289}
290
291fn build_response(
292 status: StatusCode,
293 content_type: &str,
294 body: Vec<u8>,
295) -> Result<Response<Full<Bytes>>> {
296 Ok(Response::builder()
297 .status(status)
298 .header(CONTENT_TYPE, content_type)
299 .default_header()
300 .body(Full::from(body))?)
301}
302
303fn not_found_response(message: &[u8]) -> Result<Response<Full<Bytes>>> {
304 build_response(StatusCode::NOT_FOUND, OCTET_MIME, message.to_vec())
305}
306
307fn mark_file_requested(state: &Arc<RwLock<SurverState>>, file_index: usize) {
308 let mut state_guard = state
309 .write()
310 .expect("State lock poisoned in request tracking");
311 state_guard.file_infos[file_index].requested_in_session = true;
312}
313
314fn handle_reload_cmd(
315 state: &Arc<RwLock<SurverState>>,
316 txs: &[Sender<LoaderMessage>],
317 file_index: usize,
318) -> Result<Response<Full<Bytes>>> {
319 let mtime = {
320 let state_guard = state
321 .read()
322 .expect("State lock poisoned in reload before metadata");
323 let Ok(meta) = fs::metadata(&state_guard.file_infos[file_index].filename) else {
325 return not_found_response(b"error: file not found");
326 };
327 meta.modified().unwrap_or(std::time::SystemTime::UNIX_EPOCH)
328 };
329
330 let mut state_guard = state.write().expect("State lock poisoned in reload");
331 let file_info = &mut state_guard.file_infos[file_index];
332
333 let unchanged = file_info.last_modification_time == Some(mtime) && file_info.last_reload_ok;
335 if unchanged {
336 if file_info.requested_in_session {
337 drop(state_guard);
338 return build_response(
339 StatusCode::NOT_MODIFIED,
340 JSON_MIME,
341 b"info: file unchanged".to_vec(),
342 );
343 }
344 file_info.requested_in_session = true;
347 drop(state_guard);
348 let body = get_status(state)?;
349 return build_response(StatusCode::ACCEPTED, JSON_MIME, body);
350 }
351 file_info.requested_in_session = true;
352 file_info.last_modification_time = Some(mtime);
353 info!(
354 "File modification time updated to {}",
355 file_info.modification_time_string()
356 );
357 file_info.reloading = true;
358 file_info.last_reload_ok = false;
359 drop(state_guard);
360 info!("Reload requested");
361 txs[file_index].send(LoaderMessage::Reload)?;
362 let body = get_status(state)?;
363 build_response(StatusCode::ACCEPTED, JSON_MIME, body)
364}
365
366async fn handle_cmd(
367 state: &Arc<RwLock<SurverState>>,
368 txs: &[Sender<LoaderMessage>],
369 cmd: &str,
370 file_index: Option<usize>,
371 args: &[&str],
372) -> Result<Response<Full<Bytes>>> {
373 if let Some(file_index) = file_index {
375 let state_guard = state.read().expect("State lock poisoned in handle_cmd");
376 if file_index >= state_guard.file_infos.len() {
377 drop(state_guard);
378 return not_found_response(b"Invalid file index");
379 }
380 }
381 match (file_index, cmd, args) {
382 (_, "get_status", []) => {
383 let body = get_status(state)?;
384 build_response(StatusCode::OK, JSON_MIME, body)
385 }
386 (Some(file_index), "get_hierarchy", []) => {
387 mark_file_requested(state, file_index);
388 let body = get_hierarchy(state, file_index)?;
389 build_response(StatusCode::OK, OCTET_MIME, body)
390 }
391 (Some(file_index), "get_time_table", []) => {
392 mark_file_requested(state, file_index);
393 let body = get_timetable(state, file_index).await?;
394 build_response(StatusCode::OK, OCTET_MIME, body)
395 }
396 (Some(file_index), "get_signals", id_strings) => {
397 mark_file_requested(state, file_index);
398 let body = get_signals(state, file_index, txs, id_strings).await?;
399 build_response(StatusCode::OK, OCTET_MIME, body)
400 }
401 (Some(file_index), "reload", []) => handle_reload_cmd(state, txs, file_index),
402 _ => {
403 not_found_response(&[])
405 }
406 }
407}
408
409async fn handle(
410 state: Arc<RwLock<SurverState>>,
411 shared: Arc<ReadOnly>,
412 txs: Vec<Sender<LoaderMessage>>,
413 req: Request<hyper::body::Incoming>,
414) -> Result<Response<Full<Bytes>>> {
415 if req.uri().path() == "/favicon.ico" {
417 let favicon_data = include_bytes!("../assets/favicon.ico");
418 return Ok(Response::builder()
419 .status(StatusCode::OK)
420 .header("Content-Type", "image/x-icon")
421 .header("Cache-Control", "public, max-age=604800")
422 .body(Full::from(&favicon_data[..]))?);
423 }
424 let path_parts = req.uri().path().split('/').skip(1).collect::<Vec<_>>();
426
427 if let Some(provided_token) = path_parts.first() {
429 if *provided_token != shared.token {
430 warn!(
431 "Received request with invalid token: {provided_token} != {}\n{:?}",
432 shared.token,
433 req.uri()
434 );
435 return not_found_response(&[]);
436 }
437 } else {
438 warn!("Received request with no token: {:?}", req.uri());
440 return not_found_response(&[]);
441 }
442
443 let (file_index, cmd_idx) = path_parts
445 .get(1)
446 .and_then(|s| s.parse::<usize>().ok())
447 .map_or((None, 1), |idx| (Some(idx), 2));
448 let response = if let Some(cmd) = path_parts.get(cmd_idx) {
450 handle_cmd(&state, &txs, cmd, file_index, &path_parts[cmd_idx + 1..]).await?
451 } else {
452 let body = Full::from(get_info_page(&shared, &state));
454 Response::builder()
455 .status(StatusCode::OK)
456 .header(CONTENT_TYPE, HTML_MIME)
457 .default_header()
458 .body(body)?
459 };
460
461 Ok(response)
462}
463
464const MIN_TOKEN_LEN: usize = 8;
465const RAND_TOKEN_LEN: usize = 24;
466
467pub type ServerStartedFlag = Arc<std::sync::atomic::AtomicBool>;
468
469pub async fn surver_main(
470 port: u16,
471 bind_address: String,
472 token: Option<String>,
473 filenames: &[String],
474 started: Option<ServerStartedFlag>,
475) -> Result<()> {
476 let token = token.unwrap_or_else(|| {
478 repeat_with(fastrand::alphanumeric)
480 .take(RAND_TOKEN_LEN)
481 .collect()
482 });
483
484 if token.len() < MIN_TOKEN_LEN {
485 bail!("Token `{token}` is too short. At least {MIN_TOKEN_LEN} characters are required!");
486 }
487
488 let state = Arc::new(RwLock::new(SurverState { file_infos: vec![] }));
489
490 let mut txs: Vec<Sender<LoaderMessage>> = Vec::new();
491 for (file_index, filename) in filenames.iter().enumerate() {
493 let start_read_header = web_time::Instant::now();
494 let header_result = wellen::viewers::read_header_from_file(
495 filename.clone(),
496 &WELLEN_SURFER_DEFAULT_OPTIONS,
497 )
498 .map_err(|e| anyhow!("{e:?}"))
499 .with_context(|| format!("Failed to parse wave file: {filename}"))?;
500 info!(
501 "Loaded header of {filename} in {:?}",
502 start_read_header.elapsed()
503 );
504
505 let file_info = FileInfo {
506 filename: filename.clone(),
507 hierarchy: Arc::new(header_result.hierarchy),
508 file_format: header_result.file_format,
509 header_len: 0, body_len: header_result.body_len,
511 body_progress: Arc::new(AtomicU64::new(0)),
512 notify: Arc::new(Notify::new()),
513 timetable: vec![],
514 signals: HashMap::new(),
515 reloading: false,
516 requested_in_session: false,
517 last_reload_ok: true,
518 last_reload_time: None,
519 last_modification_time: None,
520 };
521 {
522 let mut state_guard = state.write().expect("State lock poisoned when adding file");
523 state_guard.file_infos.push(file_info);
524 }
525 let (tx, rx) = std::sync::mpsc::channel::<LoaderMessage>();
527 txs.push(tx.clone());
528 let state_2 = state.clone();
530 std::thread::spawn(move || loader(&state_2, header_result.body, file_index, &rx));
531 }
532 let ip_addr: std::net::IpAddr = bind_address
533 .parse()
534 .with_context(|| format!("Invalid bind address: {bind_address}"))?;
535 let use_localhost = ip_addr.is_loopback();
536 if !use_localhost {
537 warn!(
538 "Server is binding to {bind_address} instead of 127.0.0.1/0:0:0:0:0:0:0:1 (localhost)"
539 );
540 warn!("This may make the server accessible from external networks");
541 warn!("Surver traffic is unencrypted and unauthenticated - use with caution!");
542 }
543
544 let addr = SocketAddr::new(ip_addr, port);
546 let url = format!("http://{addr}/{token}");
547 let url_copy = url.clone();
548 let token_copy = token.clone();
549 let shared = Arc::new(ReadOnly { url, token });
550
551 info!("Starting server on {addr}. To use:");
553 info!("1. Setup an ssh tunnel: -L {port}:localhost:{port}");
554 let hostname = whoami::hostname();
555 if let Ok(hostname) = hostname.as_ref()
556 && hostname != "localhost"
557 && let Ok(username) = whoami::username()
558 {
559 info!(
560 " The correct command may be: ssh -L {port}:localhost:{port} {username}@{hostname} "
561 );
562 }
563
564 info!("2. Start Surfer: surfer {url_copy} ");
565 if !use_localhost && let Ok(hostname) = hostname {
566 let hosturl = format!("http://{hostname}:{port}/{token_copy}");
567 info!("or, if the host is directly accessible:");
568 info!("1. Start Surfer: surfer {hosturl} ");
569 }
570 let listener = TcpListener::bind(&addr).await?;
572
573 if let Some(started) = started {
575 started.store(true, Ordering::SeqCst);
576 }
577
578 loop {
580 let (stream, _) = listener.accept().await?;
581 let io = TokioIo::new(stream);
582
583 let state = state.clone();
584 let shared = shared.clone();
585 let txs = txs.clone();
586 tokio::task::spawn(async move {
587 let service =
588 service_fn(move |req| handle(state.clone(), shared.clone(), txs.clone(), req));
589 if let Err(e) = http1::Builder::new().serve_connection(io, service).await {
590 error!("server error: {e}");
591 }
592 });
593 }
594}
595
596fn loader(
598 state: &Arc<RwLock<SurverState>>,
599 mut body_cont: viewers::ReadBodyContinuation<std::io::BufReader<std::fs::File>>,
600 file_index: usize,
601 rx: &std::sync::mpsc::Receiver<LoaderMessage>,
602) -> Result<()> {
603 loop {
604 let start_load_body = web_time::Instant::now();
606 let (filename, hierarchy, body_progress) = {
607 let state_guard = state
608 .read()
609 .expect("State lock poisoned in loader before body load");
610 let file_info = &state_guard.file_infos[file_index];
611 (
612 file_info.filename.clone(),
613 file_info.hierarchy.clone(),
614 file_info.body_progress.clone(),
615 )
616 };
617
618 let body_result = viewers::read_body(body_cont, &hierarchy, Some(body_progress))
620 .map_err(|e| anyhow!("{e:?}"))
621 .with_context(|| format!("Failed to parse body of wave file: {filename}"))?;
622
623 info!(
624 "Loaded body of {} in {:?}",
625 filename,
626 start_load_body.elapsed()
627 );
628
629 {
631 let mut state_guard = state
632 .write()
633 .expect("State lock poisoned in loader after body load");
634 let file_info = &mut state_guard.file_infos[file_index];
635 file_info.timetable = body_result.time_table;
636 file_info.signals.clear(); if let Ok(meta) = fs::metadata(&file_info.filename) {
638 file_info.last_modification_time = Some(meta.modified()?);
639 info!(
640 "File modification time of {} set to {}",
641 filename,
642 file_info.modification_time_string()
643 );
644 }
645 file_info.last_reload_time = Some(Instant::now());
646 file_info.reloading = false;
647 file_info.last_reload_ok = true;
648 file_info.notify.notify_waiters();
649 }
650 let mut source = body_result.source;
652
653 loop {
655 let msg = rx.recv()?;
656
657 match msg {
658 LoaderMessage::SignalRequest(ids) => {
659 let mut filtered_ids = {
661 let state_guard = state
662 .read()
663 .expect("State lock poisoned in loader signal request");
664 ids.iter()
665 .filter(|id| {
666 !state_guard.file_infos[file_index].signals.contains_key(id)
667 })
668 .copied()
669 .collect::<Vec<_>>()
670 };
671
672 if filtered_ids.is_empty() {
674 continue;
675 }
676
677 filtered_ids.sort();
679 filtered_ids.dedup();
680 let result = {
681 let state_guard = state
682 .read()
683 .expect("State lock poisoned in loader signal request");
684 source.load_signals(
685 &filtered_ids,
686 &state_guard.file_infos[file_index].hierarchy,
687 true,
688 )
689 };
690
691 {
693 let mut state_guard = state
694 .write()
695 .expect("State lock poisoned in loader when storing signals");
696 for signal in result {
697 state_guard.file_infos[file_index]
698 .signals
699 .insert(signal.signal_ref(), signal);
700 }
701 state_guard.file_infos[file_index].notify.notify_waiters();
702 }
703 }
704 LoaderMessage::Reload => {
705 let state_guard = state
706 .read()
707 .expect("State lock poisoned in loader before reload");
708 info!(
709 "Reloading waveform file: {}",
710 state_guard.file_infos[file_index].filename
711 );
712 state_guard.file_infos[file_index]
714 .body_progress
715 .store(0, Ordering::SeqCst);
716
717 let header_result = wellen::viewers::read_header_from_file(
719 state_guard.file_infos[file_index].filename.clone(),
720 &WELLEN_SURFER_DEFAULT_OPTIONS,
721 )
722 .map_err(|e| anyhow!("{e:?}"))
723 .with_context(|| {
724 format!(
725 "Failed to reload wave file: {}",
726 state_guard.file_infos[file_index].filename
727 )
728 })?;
729
730 body_cont = header_result.body;
731 break; }
733 }
734 }
735 }
736}