1use bincode::Options;
3use eyre::{Context, Result, anyhow, bail};
4use http_body_util::Full;
5use hyper::body::Bytes;
6use hyper::server::conn::http1;
7use hyper::service::service_fn;
8use hyper::{Request, Response, StatusCode};
9use hyper_util::rt::TokioIo;
10use std::collections::HashMap;
11use std::fs;
12use std::iter::repeat_with;
13use std::net::SocketAddr;
14use std::sync::atomic::{AtomicU64, Ordering};
15use std::sync::mpsc::Sender;
16use std::sync::{Arc, RwLock};
17use std::time::{Instant, SystemTime};
18use tokio::net::TcpListener;
19use tokio::sync::Notify;
20use tracing::{error, info, warn};
21use wellen::{
22 CompressedSignal, CompressedTimeTable, FileFormat, Hierarchy, Signal, SignalRef, Time, viewers,
23};
24
25use crate::{
26 BINCODE_OPTIONS, HTTP_SERVER_KEY, HTTP_SERVER_VALUE_SURFER, SURFER_VERSION, SurverFileInfo,
27 SurverStatus, WELLEN_SURFER_DEFAULT_OPTIONS, WELLEN_VERSION, X_SURFER_VERSION,
28 X_WELLEN_VERSION,
29};
30
31struct ReadOnly {
32 url: String,
33 token: String,
34}
35
36struct FileInfo {
37 filename: String,
38 hierarchy: Hierarchy,
39 file_format: FileFormat,
40 header_len: u64,
41 body_len: u64,
42 body_progress: Arc<AtomicU64>,
43 notify: Arc<Notify>,
44 timetable: Vec<Time>,
45 signals: HashMap<SignalRef, Signal>,
46 reloading: bool,
47 last_reload_ok: bool,
48 last_reload_time: Option<Instant>,
49 last_file_mtime: Option<SystemTime>,
50}
51
52#[derive(Default)]
53struct SurverState {
54 file_infos: Vec<FileInfo>,
55}
56
57impl FileInfo {
58 pub fn modification_time_string(&self) -> String {
59 if let Some(mtime) = self.last_file_mtime {
60 let dur = mtime
61 .duration_since(std::time::UNIX_EPOCH)
62 .unwrap_or_default();
63 return chrono::DateTime::<chrono::Utc>::from_timestamp(
64 dur.as_secs() as i64,
65 dur.subsec_nanos(),
66 )
67 .map_or_else(
68 || "Incorrect timestamp".to_string(),
69 |dt| dt.format("%Y-%m-%d %H:%M:%S UTC").to_string(),
70 );
71 }
72 "unknown".to_string()
73 }
74
75 pub fn reload_time_string(&self) -> String {
76 if let Some(time) = self.last_reload_time {
77 return format!("{:?} ago", time.elapsed());
78 }
79 "never".to_string()
80 }
81
82 pub fn html_table_line(&self) -> String {
83 let bytes_loaded = self.body_progress.load(Ordering::SeqCst);
84
85 let progress = if bytes_loaded == self.body_len {
86 format!(
87 "{} loaded",
88 bytesize::ByteSize::b(self.body_len + self.header_len)
89 )
90 } else {
91 format!(
92 "{} / {}",
93 bytesize::ByteSize::b(bytes_loaded + self.header_len),
94 bytesize::ByteSize::b(self.body_len + self.header_len)
95 )
96 };
97
98 format!(
99 "<tr><td>{}</td><td>{}</td><td>{}</td><td>{}</td></tr>",
100 self.filename,
101 progress,
102 self.modification_time_string(),
103 self.reload_time_string()
104 )
105 }
106}
107
108enum LoaderMessage {
109 SignalRequest(SignalRequest),
110 Reload,
111}
112
113type SignalRequest = Vec<SignalRef>;
114
115fn get_info_page(shared: &Arc<ReadOnly>, state: &Arc<RwLock<SurverState>>) -> String {
116 let state_guard = state.read().expect("State lock poisoned in get_info_page");
117 let html_table_content = state_guard
118 .file_infos
119 .iter()
120 .map(FileInfo::html_table_line)
121 .collect::<Vec<_>>()
122 .join("\n");
123 drop(state_guard);
124
125 format!(
126 r#"
127 <!DOCTYPE html><html lang="en">
128 <head>
129 <link rel="icon" href="favicon.ico" sizes="any">
130 <title>Surver - Surfer Remote Server</title>
131 </head>
132 <body>
133 <h1>Surver - Surfer Remote Server</h1>
134 <b>To connect, run:</b> <code>surfer {}</code><br>
135 <b>Wellen version:</b> {WELLEN_VERSION}<br>
136 <b>Surfer version:</b> {SURFER_VERSION}<br>
137 <table border="1" cellpadding="5" cellspacing="0">
138 <tr><th>Filename</th><th>Load progress</th><th>File modification time</th><th>(Re)load time</th></tr>
139 {}
140 </table>
141 </body></html>
142 "#,
143 shared.url, html_table_content
144 )
145}
146
147fn get_hierarchy(state: &Arc<RwLock<SurverState>>, file_index: usize) -> Result<Vec<u8>> {
148 let state_guard = state.read().expect("State lock poisoned in get_hierarchy");
149 let file_info = &state_guard.file_infos[file_index];
150 let mut raw = BINCODE_OPTIONS.serialize(&file_info.file_format)?;
151 let mut raw2 = BINCODE_OPTIONS.serialize(&file_info.hierarchy)?;
152 drop(state_guard);
153 raw.append(&mut raw2);
154 let compressed = lz4_flex::compress_prepend_size(&raw);
155 info!(
156 "Sending hierarchy. {} raw, {} compressed.",
157 bytesize::ByteSize::b(raw.len() as u64),
158 bytesize::ByteSize::b(compressed.len() as u64)
159 );
160 Ok(compressed)
161}
162
163async fn get_timetable(state: &Arc<RwLock<SurverState>>, file_index: usize) -> Result<Vec<u8>> {
164 #[allow(unused_assignments)]
166 let mut table = vec![];
167 loop {
168 {
169 let state = state.read().unwrap();
170 if !state.file_infos[file_index].timetable.is_empty() {
171 table.clone_from(&state.file_infos[file_index].timetable);
172 break;
173 }
174 }
175 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
176 }
177 let raw_size = table.len() * std::mem::size_of::<Time>();
178 let compressed = BINCODE_OPTIONS.serialize(&CompressedTimeTable::compress(&table))?;
179 info!(
180 "Sending timetable. {} raw, {} compressed.",
181 bytesize::ByteSize::b(raw_size as u64),
182 bytesize::ByteSize::b(compressed.len() as u64)
183 );
184 Ok(compressed)
185}
186
187fn get_status(state: &Arc<RwLock<SurverState>>) -> Result<Vec<u8>> {
188 let state_guard = state.read().expect("State lock poisoned in get_status");
189 let mut file_infos = Vec::new();
190 for file_info in &state_guard.file_infos {
191 file_infos.push(SurverFileInfo {
192 bytes: file_info.body_len + file_info.header_len,
193 bytes_loaded: file_info.body_progress.load(Ordering::SeqCst) + file_info.header_len,
194 filename: file_info.filename.clone(),
195 format: file_info.file_format,
196 reloading: file_info.reloading,
197 last_load_ok: file_info.last_reload_ok,
198 last_load_time: file_info.last_reload_time.map(|t| t.elapsed().as_secs()),
199 });
200 }
201 drop(state_guard);
202 let status = SurverStatus {
203 wellen_version: WELLEN_VERSION.to_string(),
204 surfer_version: SURFER_VERSION.to_string(),
205 file_infos,
206 };
207 Ok(serde_json::to_vec(&status)?)
208}
209
210async fn get_signals(
211 state: &Arc<RwLock<SurverState>>,
212 file_index: usize,
213 txs: &[Sender<LoaderMessage>],
214 id_strings: &[&str],
215) -> Result<Vec<u8>> {
216 let mut ids = Vec::with_capacity(id_strings.len());
217 for id in id_strings {
218 let index = id.parse::<u64>()? as usize;
219 let signal_ref = SignalRef::from_index(index)
220 .ok_or_else(|| anyhow!("Invalid signal index: {}", index))?;
221 ids.push(signal_ref);
222 }
223
224 if ids.is_empty() {
225 return Ok(vec![]);
226 }
227 let num_ids = ids.len();
228
229 txs[file_index].send(LoaderMessage::SignalRequest(ids.clone()))?;
231
232 let notify = {
233 let state_guard = state.read().expect("State lock poisoned in get_signals");
234 state_guard.file_infos[file_index].notify.clone()
235 };
236
237 let mut data = vec![];
239 leb128::write::unsigned(&mut data, num_ids as u64)?;
240 let mut raw_size = 0;
241 loop {
242 {
243 let state_guard = state.read().expect("State lock poisoned in get_signals");
244 if ids
245 .iter()
246 .all(|id| state_guard.file_infos[file_index].signals.contains_key(id))
247 {
248 for id in ids {
249 let signal = &state_guard.file_infos[file_index].signals[&id];
250 raw_size += BINCODE_OPTIONS.serialize(signal)?.len();
251 let comp = CompressedSignal::compress(signal);
252 data.append(&mut BINCODE_OPTIONS.serialize(&comp)?);
253 }
254 break;
255 }
256 };
257 notify.notified().await;
259 }
260 info!(
261 "Sending {} signals. {} raw, {} compressed.",
262 num_ids,
263 bytesize::ByteSize::b(raw_size as u64),
264 bytesize::ByteSize::b(data.len() as u64)
265 );
266 Ok(data)
267}
268
269const CONTENT_TYPE: &str = "Content-Type";
270const JSON_MIME: &str = "application/json";
271const OCTET_MIME: &str = "application/octet-stream";
272const HTML_MIME: &str = "text/html; charset=utf-8";
273
274trait DefaultHeader {
275 fn default_header(self) -> Self;
276}
277
278impl DefaultHeader for hyper::http::response::Builder {
279 fn default_header(self) -> Self {
280 self.header(HTTP_SERVER_KEY, HTTP_SERVER_VALUE_SURFER)
281 .header(X_WELLEN_VERSION, WELLEN_VERSION)
282 .header(X_SURFER_VERSION, SURFER_VERSION)
283 .header("Cache-Control", "no-cache")
284 }
285}
286
287async fn handle_cmd(
288 state: &Arc<RwLock<SurverState>>,
289 txs: &[Sender<LoaderMessage>],
290 cmd: &str,
291 file_index: Option<usize>,
292 args: &[&str],
293) -> Result<Response<Full<Bytes>>> {
294 let response = match (file_index, cmd, args) {
295 (_, "get_status", []) => {
296 let body = get_status(state)?;
297 Response::builder()
298 .status(StatusCode::OK)
299 .header(CONTENT_TYPE, JSON_MIME)
300 .default_header()
301 .body(Full::from(body))
302 }
303 (Some(file_index), "get_hierarchy", []) => {
304 let body = get_hierarchy(state, file_index)?;
305 Response::builder()
306 .status(StatusCode::OK)
307 .header(CONTENT_TYPE, OCTET_MIME)
308 .default_header()
309 .body(Full::from(body))
310 }
311 (Some(file_index), "get_time_table", []) => {
312 let body = get_timetable(state, file_index).await?;
313 Response::builder()
314 .status(StatusCode::OK)
315 .header(CONTENT_TYPE, OCTET_MIME)
316 .default_header()
317 .body(Full::from(body))
318 }
319 (Some(file_index), "get_signals", id_strings) => {
320 let body = get_signals(state, file_index, txs, id_strings).await?;
321 Response::builder()
322 .status(StatusCode::OK)
323 .header(CONTENT_TYPE, OCTET_MIME)
324 .default_header()
325 .body(Full::from(body))
326 }
327 (Some(file_index), "reload", []) => {
328 let mut state_guard = state.write().expect("State lock poisoned in reload");
329 let Ok(meta) = fs::metadata(state_guard.file_infos[file_index].filename.clone()) else {
331 drop(state_guard);
332 return Ok(Response::builder()
333 .status(StatusCode::NOT_FOUND)
334 .header(CONTENT_TYPE, JSON_MIME)
335 .default_header()
336 .body(Full::from(b"error: file not found".to_vec()))?);
337 };
338 let mtime = meta.modified().unwrap_or(std::time::SystemTime::UNIX_EPOCH);
339 let unchanged = state_guard.file_infos[file_index].last_file_mtime == Some(mtime)
341 && state_guard.file_infos[file_index].last_reload_ok;
342 if unchanged {
343 drop(state_guard);
344 return Ok(Response::builder()
345 .status(StatusCode::NOT_MODIFIED)
346 .header(CONTENT_TYPE, JSON_MIME)
347 .default_header()
348 .body(Full::from(b"info: file unchanged".to_vec()))?);
349 }
350 state_guard.file_infos[file_index].last_file_mtime = Some(mtime);
351 info!(
352 "File modification time updated to {}",
353 state_guard.file_infos[file_index].modification_time_string()
354 );
355 state_guard.file_infos[file_index].reloading = true;
356 state_guard.file_infos[file_index].last_reload_ok = false;
357 drop(state_guard);
358 info!("Reload requested");
359 txs[file_index].send(LoaderMessage::Reload)?;
360 let body = get_status(state)?;
361 Response::builder()
362 .status(StatusCode::ACCEPTED)
363 .header(CONTENT_TYPE, JSON_MIME)
364 .default_header()
365 .body(Full::from(body))
366 }
367 _ => {
368 Response::builder()
370 .status(StatusCode::NOT_FOUND)
371 .header(CONTENT_TYPE, OCTET_MIME)
372 .default_header()
373 .body(Full::from(vec![]))
374 }
375 };
376 Ok(response?)
377}
378
379async fn handle(
380 state: Arc<RwLock<SurverState>>,
381 shared: Arc<ReadOnly>,
382 txs: Vec<Sender<LoaderMessage>>,
383 req: Request<hyper::body::Incoming>,
384) -> Result<Response<Full<Bytes>>> {
385 if req.uri().path() == "/favicon.ico" {
387 let favicon_data = include_bytes!("../assets/favicon.ico");
388 return Ok(Response::builder()
389 .status(StatusCode::OK)
390 .header("Content-Type", "image/x-icon")
391 .header("Cache-Control", "public, max-age=604800")
392 .body(Full::from(&favicon_data[..]))?);
393 }
394 let path_parts = req.uri().path().split('/').skip(1).collect::<Vec<_>>();
396
397 if let Some(provided_token) = path_parts.first() {
399 if *provided_token != shared.token {
400 warn!(
401 "Received request with invalid token: {provided_token} != {}\n{:?}",
402 shared.token,
403 req.uri()
404 );
405 return Ok(Response::builder()
406 .status(StatusCode::NOT_FOUND)
407 .header(CONTENT_TYPE, OCTET_MIME)
408 .default_header()
409 .body(Full::from(vec![]))?);
410 }
411 } else {
412 warn!("Received request with no token: {:?}", req.uri());
414 return Ok(Response::builder()
415 .status(StatusCode::NOT_FOUND)
416 .header(CONTENT_TYPE, OCTET_MIME)
417 .default_header()
418 .body(Full::from(vec![]))?);
419 }
420
421 let (file_index, cmd_idx) = if path_parts.len() >= 2 {
422 let file_index_str = path_parts[1];
424 match file_index_str.parse::<usize>() {
425 Ok(idx) => (Some(idx), 2),
426 Err(_) => (None, 1), }
428 } else {
429 (None, 1) };
431 let response = if let Some(cmd) = path_parts.get(cmd_idx) {
433 handle_cmd(&state, &txs, cmd, file_index, &path_parts[cmd_idx + 1..]).await?
434 } else {
435 let body = Full::from(get_info_page(&shared, &state));
437 Response::builder()
438 .status(StatusCode::OK)
439 .header(CONTENT_TYPE, HTML_MIME)
440 .default_header()
441 .body(body)?
442 };
443
444 Ok(response)
445}
446
447const MIN_TOKEN_LEN: usize = 8;
448const RAND_TOKEN_LEN: usize = 24;
449
450pub type ServerStartedFlag = Arc<std::sync::atomic::AtomicBool>;
451
452pub async fn surver_main(
453 port: u16,
454 bind_address: String,
455 token: Option<String>,
456 filenames: &[String],
457 started: Option<ServerStartedFlag>,
458) -> Result<()> {
459 let token = token.unwrap_or_else(|| {
461 repeat_with(fastrand::alphanumeric)
463 .take(RAND_TOKEN_LEN)
464 .collect()
465 });
466
467 if token.len() < MIN_TOKEN_LEN {
468 bail!("Token `{token}` is too short. At least {MIN_TOKEN_LEN} characters are required!");
469 }
470
471 let state = Arc::new(RwLock::new(SurverState { file_infos: vec![] }));
472
473 let mut txs: Vec<Sender<LoaderMessage>> = Vec::new();
474 for (file_index, filename) in filenames.iter().enumerate() {
476 let start_read_header = web_time::Instant::now();
477 let header_result = wellen::viewers::read_header_from_file(
478 filename.clone(),
479 &WELLEN_SURFER_DEFAULT_OPTIONS,
480 )
481 .map_err(|e| anyhow!("{e:?}"))
482 .with_context(|| format!("Failed to parse wave file: {filename}"))?;
483 info!(
484 "Loaded header of {filename} in {:?}",
485 start_read_header.elapsed()
486 );
487
488 let file_info = FileInfo {
489 filename: filename.clone(),
490 hierarchy: header_result.hierarchy,
491 file_format: header_result.file_format,
492 header_len: 0, body_len: header_result.body_len,
494 body_progress: Arc::new(AtomicU64::new(0)),
495 notify: Arc::new(Notify::new()),
496 timetable: vec![],
497 signals: HashMap::new(),
498 reloading: false,
499 last_reload_ok: true,
500 last_reload_time: None,
501 last_file_mtime: None,
502 };
503 {
504 let mut state_guard = state.write().expect("State lock poisoned when adding file");
505 state_guard.file_infos.push(file_info);
506 }
507 let (tx, rx) = std::sync::mpsc::channel::<LoaderMessage>();
509 txs.push(tx.clone());
510 let state_2 = state.clone();
512 std::thread::spawn(move || loader(&state_2, header_result.body, file_index, &rx));
513 }
514 let ip_addr: std::net::IpAddr = bind_address
515 .parse()
516 .with_context(|| format!("Invalid bind address: {bind_address}"))?;
517 let use_localhost = ip_addr.is_loopback();
518 if !use_localhost {
519 warn!(
520 "Server is binding to {bind_address} instead of 127.0.0.1/0:0:0:0:0:0:0:1 (localhost)"
521 );
522 warn!("This may make the server accessible from external networks");
523 warn!("Surver traffic is unencrypted and unauthenticated - use with caution!");
524 }
525
526 let addr = SocketAddr::new(ip_addr, port);
528 let url = format!("http://{addr}/{token}");
529 let url_copy = url.clone();
530 let token_copy = token.clone();
531 let shared = Arc::new(ReadOnly { url, token });
532
533 info!("Starting server on {addr}. To use:");
535 info!("1. Setup an ssh tunnel: -L {port}:localhost:{port}");
536 let hostname = whoami::hostname();
537 if let Ok(hostname) = hostname.as_ref()
538 && hostname != "localhost"
539 && let Ok(username) = whoami::username()
540 {
541 info!(
542 " The correct command may be: ssh -L {port}:localhost:{port} {username}@{hostname} "
543 );
544 }
545
546 info!("2. Start Surfer: surfer {url_copy} ");
547 if !use_localhost && let Ok(hostname) = hostname {
548 let hosturl = format!("http://{hostname}:{port}/{token_copy}");
549 info!("or, if the host is directly accessible:");
550 info!("1. Start Surfer: surfer {hosturl} ");
551 }
552 let listener = TcpListener::bind(&addr).await?;
554
555 if let Some(started) = started {
557 started.store(true, Ordering::SeqCst);
558 }
559
560 loop {
562 let (stream, _) = listener.accept().await?;
563 let io = TokioIo::new(stream);
564
565 let state = state.clone();
566 let shared = shared.clone();
567 let txs = txs.clone();
568 tokio::task::spawn(async move {
569 let service =
570 service_fn(move |req| handle(state.clone(), shared.clone(), txs.clone(), req));
571 if let Err(e) = http1::Builder::new().serve_connection(io, service).await {
572 error!("server error: {e}");
573 }
574 });
575 }
576}
577
578fn loader(
580 state: &Arc<RwLock<SurverState>>,
581 mut body_cont: viewers::ReadBodyContinuation<std::io::BufReader<std::fs::File>>,
582 file_index: usize,
583 rx: &std::sync::mpsc::Receiver<LoaderMessage>,
584) -> Result<()> {
585 loop {
586 let start_load_body = web_time::Instant::now();
588 let state_guard = state
589 .read()
590 .expect("State lock poisoned in loader before body load");
591 let file_info = &state_guard.file_infos[file_index];
592 let filename = file_info.filename.clone();
593 let body_result = viewers::read_body(
594 body_cont,
595 &file_info.hierarchy,
596 Some(file_info.body_progress.clone()),
597 )
598 .map_err(|e| anyhow!("{e:?}"))
599 .with_context(|| format!("Failed to parse body of wave file: {filename}"))?;
600 drop(state_guard);
601 info!(
602 "Loaded body of {} in {:?}",
603 filename,
604 start_load_body.elapsed()
605 );
606
607 {
609 let mut state_guard = state
610 .write()
611 .expect("State lock poisoned in loader after body load");
612 state_guard.file_infos[file_index].timetable = body_result.time_table;
613 state_guard.file_infos[file_index].signals.clear(); if let Ok(meta) = fs::metadata(&state_guard.file_infos[file_index].filename) {
615 state_guard.file_infos[file_index].last_file_mtime = Some(meta.modified()?);
616 info!(
617 "File modification time of {} set to {}",
618 filename,
619 state_guard.file_infos[file_index].modification_time_string()
620 );
621 }
622 state_guard.file_infos[file_index].last_reload_time = Some(Instant::now());
623 state_guard.file_infos[file_index].reloading = false;
624 state_guard.file_infos[file_index].last_reload_ok = true;
625 state_guard.file_infos[file_index].notify.notify_waiters();
626 }
627 let mut source = body_result.source;
629
630 loop {
632 let msg = rx.recv()?;
633
634 match msg {
635 LoaderMessage::SignalRequest(ids) => {
636 let mut filtered_ids = {
638 let state_guard = state
639 .read()
640 .expect("State lock poisoned in loader signal request");
641 ids.iter()
642 .filter(|id| {
643 !state_guard.file_infos[file_index].signals.contains_key(id)
644 })
645 .copied()
646 .collect::<Vec<_>>()
647 };
648
649 if filtered_ids.is_empty() {
651 continue;
652 }
653
654 filtered_ids.sort();
656 filtered_ids.dedup();
657 let result = {
658 let state_guard = state
659 .read()
660 .expect("State lock poisoned in loader signal request");
661 source.load_signals(
662 &filtered_ids,
663 &state_guard.file_infos[file_index].hierarchy,
664 true,
665 )
666 };
667
668 {
670 let mut state_guard = state
671 .write()
672 .expect("State lock poisoned in loader when storing signals");
673 for (id, signal) in result {
674 state_guard.file_infos[file_index]
675 .signals
676 .insert(id, signal);
677 }
678 state_guard.file_infos[file_index].notify.notify_waiters();
679 }
680 }
681 LoaderMessage::Reload => {
682 let state_guard = state
683 .read()
684 .expect("State lock poisoned in loader before reload");
685 info!(
686 "Reloading waveform file: {}",
687 state_guard.file_infos[file_index].filename
688 );
689 state_guard.file_infos[file_index]
691 .body_progress
692 .store(0, Ordering::SeqCst);
693
694 let header_result = wellen::viewers::read_header_from_file(
696 state_guard.file_infos[file_index].filename.clone(),
697 &WELLEN_SURFER_DEFAULT_OPTIONS,
698 )
699 .map_err(|e| anyhow!("{e:?}"))
700 .with_context(|| {
701 format!(
702 "Failed to reload wave file: {}",
703 state_guard.file_infos[file_index].filename
704 )
705 })?;
706
707 body_cont = header_result.body;
708 break; }
710 }
711 }
712 }
713}