// libsurfer/cxxrtl_container.rs

1use futures::executor::block_on;
2use std::{
3    collections::{HashMap, VecDeque},
4    sync::Arc,
5};
6use tokio::sync::mpsc;
7
8use eyre::Result;
9use num::{
10    BigUint,
11    bigint::{ToBigInt, ToBigUint},
12};
13use serde::Deserialize;
14use surfer_translation_types::VariableEncoding;
15use tracing::{error, info};
16
17use crate::wave_container::ScopeRefExt;
18use crate::{
19    channels::IngressReceiver,
20    cxxrtl::{
21        command::CxxrtlCommand,
22        cs_message::CSMessage,
23        query_container::QueryContainer,
24        sc_message::{
25            CommandResponse, CxxrtlSimulationStatus, Event, SCMessage, SimulationStatusType,
26        },
27        timestamp::CxxrtlTimestamp,
28    },
29    message::Message,
30    wave_container::{
31        QueryResult, ScopeId, ScopeRef, SimulationStatus, VarId, VariableMeta, VariableRef,
32        VariableRefExt,
33    },
34};
35
/// Name of the item reference used by this client: `load_variables` (re)defines it
/// via `reference_items`, and `query_variable` passes it as the `items` of
/// `query_interval`.
const DEFAULT_REFERENCE: &str = "ALL_VARIABLES";

/// A one-shot handler for the response to a command. It is queued by
/// `CSSender::run_command` and invoked from `CxxrtlContainer::tick` with the
/// response and mutable access to the cached data.
type Callback = Box<dyn FnOnce(CommandResponse, &mut CxxrtlData) + Sync + Send>;
39
/// A scope entry as deserialized from a `list_scopes` response. It currently has no
/// fields; only the scope's name (the map key) is used.
#[derive(Deserialize, Debug, Clone)]
pub(crate) struct CxxrtlScope {}

/// An item entry as deserialized from a `list_items` response.
#[derive(Deserialize, Debug, Clone)]
pub struct CxxrtlItem {
    /// Width of the item; reported as `num_bits` by `CxxrtlContainer::variable_meta`.
    pub width: u32,
}
47
/// A piece of data which we cache from Cxxrtl
pub enum CachedData<T> {
    /// The cache is invalidated and no request for new data is in flight. The
    /// previously held data, if there was any, is kept so it can still be used.
    Uncached { prev: Option<Arc<T>> },
    /// The cache is invalidated and a request has been made for new data, which has
    /// not been received yet. The previously held data, if there was any, is kept
    /// here.
    Waiting { prev: Option<Arc<T>> },
    /// The cache is up-to-date
    Filled(Arc<T>),
}

impl<T> CachedData<T> {
    /// Creates a cache that holds nothing and has no request in flight.
    fn empty() -> Self {
        Self::Uncached { prev: None }
    }

    /// Returns an `Uncached` copy of this cache which keeps whatever value is
    /// currently held as its previous value.
    fn make_uncached(&self) -> Self {
        // Since the internals here are all Arc, clones are cheap
        CachedData::Uncached { prev: self.get() }
    }

    /// Creates an up-to-date cache holding `t`.
    pub fn filled(t: T) -> Self {
        Self::Filled(Arc::new(t))
    }

    /// Returns the held value: the current one if the cache is filled, otherwise
    /// the previous one if there is any.
    fn get(&self) -> Option<Arc<T>> {
        match self {
            CachedData::Uncached { prev } | CachedData::Waiting { prev } => prev.clone(),
            CachedData::Filled(val) => Some(Arc::clone(val)),
        }
    }

    /// Return the current value from the cache if it is there. If the cache is
    /// `Uncached`, run `f` to fetch the new value and change the state to `Waiting`.
    /// The function must make sure that the cache is updated eventually.
    ///
    /// `f` is not called when the cache is `Waiting` or `Filled`.
    fn fetch_if_needed(&mut self, f: impl FnOnce()) -> Option<Arc<T>> {
        match self {
            CachedData::Uncached { prev } => {
                f();
                // Move the previous value out of the old state; only the `Arc` is
                // cloned, so `T` itself does not need to be `Clone`.
                let prev = prev.take();
                *self = CachedData::Waiting { prev: prev.clone() };
                prev
            }
            CachedData::Waiting { prev } => prev.clone(),
            CachedData::Filled(val) => Some(Arc::clone(val)),
        }
    }
}
112
113pub struct CxxrtlData {
114    scopes_cache: CachedData<HashMap<ScopeRef, CxxrtlScope>>,
115    module_item_cache: HashMap<ScopeRef, CachedData<HashMap<VariableRef, CxxrtlItem>>>,
116    all_items_cache: CachedData<HashMap<VariableRef, CxxrtlItem>>,
117
118    /// We use the `CachedData` system to keep track of if we have sent a query request,
119    /// but the actual data is stored in the `interval_query_cache`.
120    ///
121    /// The held value in the query result is the end timestamp of the current current
122    /// `interval_query_cache`
123    query_result: CachedData<CxxrtlTimestamp>,
124    interval_query_cache: QueryContainer,
125
126    loaded_signals: Vec<VariableRef>,
127    signal_index_map: HashMap<VariableRef, usize>,
128
129    simulation_status: CachedData<CxxrtlSimulationStatus>,
130
131    msg_channel: std::sync::mpsc::Sender<Message>,
132}
133
134impl CxxrtlData {
135    pub fn trigger_redraw(&self) {
136        self.msg_channel
137            .send(Message::InvalidateDrawCommands)
138            .unwrap();
139        if let Some(ctx) = crate::EGUI_CONTEXT.read().unwrap().as_ref() {
140            ctx.request_repaint();
141        }
142    }
143
144    pub fn on_simulation_status_update(&mut self, status: CxxrtlSimulationStatus) {
145        self.simulation_status = CachedData::filled(status);
146        self.trigger_redraw();
147        self.invalidate_query_result();
148    }
149
150    pub fn invalidate_query_result(&mut self) {
151        self.query_result = self.query_result.make_uncached();
152        self.trigger_redraw();
153        // self.interval_query_cache.invalidate();
154    }
155}
156
/// Destructures `$response` with the pattern `$expected`, making the pattern's
/// bindings available to the code following the macro call.
///
/// If the response does not match, an error naming the received value and the
/// expected pattern is logged, and the enclosing function or closure returns
/// early. The macro can therefore only be used where a bare `return;` is valid.
macro_rules! expect_response {
    ($expected:pat, $response:expr) => {
        let $expected = $response else {
            tracing::error!(
                "Got unexpected response. Got {:?} expected {}",
                $response,
                // The `$` is required: without it the literal word `expected` is
                // printed instead of the pattern that was passed in.
                stringify!($expected)
            );
            return;
        };
    };
}
169
170struct CSSender {
171    cs_messages: mpsc::Sender<String>,
172    callback_queue: VecDeque<Callback>,
173}
174
175impl CSSender {
176    fn run_command<F>(&mut self, command: CxxrtlCommand, f: F)
177    where
178        F: 'static + FnOnce(CommandResponse, &mut CxxrtlData) + Sync + Send,
179    {
180        self.callback_queue.push_back(Box::new(f));
181        let json = serde_json::to_string(&CSMessage::command(command))
182            .expect("Failed to encode cxxrtl command");
183        block_on(self.cs_messages.send(json)).unwrap();
184    }
185}
186
/// A wave container backed by a connection to a CXXRTL server.
pub struct CxxrtlContainer {
    /// Data cached from the server; updated by response callbacks and events.
    data: CxxrtlData,
    /// Outgoing channel and the queue of callbacks awaiting responses.
    sending: CSSender,
    /// Incoming server messages, drained by `tick`.
    sc_messages: IngressReceiver<String>,
    /// Set once the disconnect of `sc_messages` has been logged, so that `tick`
    /// does not log it again.
    disconnected_reported: bool,
}
193
194impl CxxrtlContainer {
195    async fn new(
196        msg_channel: std::sync::mpsc::Sender<Message>,
197        sending: CSSender,
198        sc_messages: IngressReceiver<String>,
199    ) -> Result<Self> {
200        info!("Sending cxxrtl greeting");
201        sending
202            .cs_messages
203            .send(serde_json::to_string(&CSMessage::greeting { version: 0 }).unwrap())
204            .await
205            .unwrap();
206
207        let data = CxxrtlData {
208            scopes_cache: CachedData::empty(),
209            module_item_cache: HashMap::new(),
210            all_items_cache: CachedData::empty(),
211            query_result: CachedData::empty(),
212            interval_query_cache: QueryContainer::empty(),
213            loaded_signals: vec![],
214            signal_index_map: HashMap::new(),
215            simulation_status: CachedData::empty(),
216            msg_channel: msg_channel.clone(),
217        };
218
219        let result = Self {
220            data,
221            sc_messages,
222            sending,
223            disconnected_reported: false,
224        };
225
226        info!("cxxrtl connected");
227
228        Ok(result)
229    }
230
231    #[cfg(not(target_arch = "wasm32"))]
232    pub async fn new_tcp(
233        addr: &str,
234        msg_channel: std::sync::mpsc::Sender<Message>,
235    ) -> Result<Self> {
236        use eyre::Context;
237
238        use crate::channels::IngressSender;
239        use crate::cxxrtl::io_worker;
240
241        let stream = tokio::net::TcpStream::connect(addr)
242            .await
243            .with_context(|| format!("Failed to connect to {addr}"))?;
244
245        let (read, write) = tokio::io::split(stream);
246
247        let (cs_tx, cs_rx) = mpsc::channel(100);
248        let (sc_tx, sc_rx) = mpsc::channel(100);
249        tokio::spawn(
250            io_worker::CxxrtlWorker::new(write, read, IngressSender::new(sc_tx), cs_rx).start(),
251        );
252
253        Self::new(
254            msg_channel,
255            CSSender {
256                cs_messages: cs_tx,
257                callback_queue: VecDeque::new(),
258            },
259            IngressReceiver::new(sc_rx),
260        )
261        .await
262    }
263
264    #[cfg(target_arch = "wasm32")]
265    pub async fn new_wasm_mailbox(msg_channel: std::sync::mpsc::Sender<Message>) -> Result<Self> {
266        use eyre::anyhow;
267
268        use crate::wasm_api::{CXXRTL_CS_HANDLER, CXXRTL_SC_HANDLER};
269
270        let result = Self::new(
271            msg_channel,
272            CSSender {
273                cs_messages: CXXRTL_CS_HANDLER.tx.clone(),
274                callback_queue: VecDeque::new(),
275            },
276            CXXRTL_SC_HANDLER
277                .rx
278                .write()
279                .await
280                .take()
281                .ok_or_else(|| anyhow!("The wasm mailbox has already been consumed."))?,
282        )
283        .await;
284
285        result
286    }
287
288    pub fn tick(&mut self) {
289        loop {
290            match self.sc_messages.try_recv() {
291                Ok(s) => {
292                    info!("CXXRTL S>C: {s}");
293                    let msg = match serde_json::from_str::<SCMessage>(&s) {
294                        Ok(msg) => msg,
295                        Err(e) => {
296                            error!("Got an unrecognised message from the cxxrtl server {e}");
297                            continue;
298                        }
299                    };
300                    match msg {
301                        SCMessage::greeting { .. } => {
302                            info!("Received cxxrtl greeting");
303                        }
304                        SCMessage::response(response) => {
305                            if let Some(cb) = self.sending.callback_queue.pop_front() {
306                                cb(response, &mut self.data);
307                            } else {
308                                error!("Got a CXXRTL message with no corresponding callback");
309                            }
310                        }
311                        SCMessage::error(e) => {
312                            error!("CXXRTL error: '{}'", e.message);
313                            self.sending.callback_queue.pop_front();
314                        }
315                        SCMessage::event(event) => match event {
316                            Event::simulation_paused { time, cause: _ } => {
317                                self.data
318                                    .on_simulation_status_update(CxxrtlSimulationStatus {
319                                        status: SimulationStatusType::paused,
320                                        latest_time: time,
321                                    });
322                            }
323                            Event::simulation_finished { time } => {
324                                self.data
325                                    .on_simulation_status_update(CxxrtlSimulationStatus {
326                                        status: SimulationStatusType::finished,
327                                        latest_time: time,
328                                    });
329                            }
330                        },
331                    }
332                }
333                Err(mpsc::error::TryRecvError::Empty) => {
334                    break;
335                }
336                Err(mpsc::error::TryRecvError::Disconnected) => {
337                    if !self.disconnected_reported {
338                        error!("CXXRTL sender disconnected");
339                        self.disconnected_reported = true;
340                    }
341                    break;
342                }
343            }
344        }
345    }
346
347    fn get_scopes(&mut self) -> Arc<HashMap<ScopeRef, CxxrtlScope>> {
348        self.data
349            .scopes_cache
350            .fetch_if_needed(|| {
351                self.sending.run_command(
352                    CxxrtlCommand::list_scopes { scope: None },
353                    |response, data| {
354                        expect_response!(CommandResponse::list_scopes { scopes }, response);
355
356                        let scopes = scopes
357                            .into_iter()
358                            .map(|(name, s)| {
359                                (
360                                    ScopeRef {
361                                        strs: name.split(' ').map(str::to_string).collect(),
362                                        id: ScopeId::None,
363                                    },
364                                    s,
365                                )
366                            })
367                            .collect();
368
369                        data.scopes_cache = CachedData::filled(scopes);
370                    },
371                );
372            })
373            .unwrap_or_else(|| Arc::new(HashMap::new()))
374    }
375
376    /// Fetches the details on a specific item. For now, this fetches *all* items, but looks
377    /// up the specific item before returning. This is done in order to not have to return
378    /// the whole Item list since we need to lock the data structure to get that.
379    fn fetch_item(&mut self, var: &VariableRef) -> Option<CxxrtlItem> {
380        self.data
381            .all_items_cache
382            .fetch_if_needed(|| {
383                self.sending.run_command(
384                    CxxrtlCommand::list_items { scope: None },
385                    |response, data| {
386                        expect_response!(CommandResponse::list_items { items }, response);
387
388                        let items = Self::item_list_to_hash_map(items);
389
390                        data.all_items_cache = CachedData::filled(items);
391                    },
392                );
393            })
394            .and_then(|d| d.get(var).cloned())
395    }
396
397    fn fetch_all_items(&mut self) -> Option<Arc<HashMap<VariableRef, CxxrtlItem>>> {
398        self.data
399            .all_items_cache
400            .fetch_if_needed(|| {
401                self.sending.run_command(
402                    CxxrtlCommand::list_items { scope: None },
403                    |response, data| {
404                        expect_response!(CommandResponse::list_items { items }, response);
405
406                        let items = Self::item_list_to_hash_map(items);
407
408                        data.all_items_cache = CachedData::filled(items);
409                    },
410                );
411            })
412            .clone()
413    }
414
415    fn fetch_items_in_module(&mut self, scope: &ScopeRef) -> Arc<HashMap<VariableRef, CxxrtlItem>> {
416        let result = self
417            .data
418            .module_item_cache
419            .entry(scope.clone())
420            .or_insert(CachedData::empty())
421            .fetch_if_needed(|| {
422                let scope = scope.clone();
423                self.sending.run_command(
424                    CxxrtlCommand::list_items {
425                        scope: Some(scope.cxxrtl_repr()),
426                    },
427                    move |response, data| {
428                        expect_response!(CommandResponse::list_items { items }, response);
429
430                        let items = Self::item_list_to_hash_map(items);
431
432                        data.module_item_cache
433                            .insert(scope.clone(), CachedData::filled(items));
434                    },
435                );
436            });
437
438        result.unwrap_or_default()
439    }
440
441    fn item_list_to_hash_map(
442        items: HashMap<String, CxxrtlItem>,
443    ) -> HashMap<VariableRef, CxxrtlItem> {
444        items
445            .into_iter()
446            .filter_map(|(k, v)| {
447                let sp = k.split(' ').collect::<Vec<_>>();
448
449                if sp.is_empty() {
450                    error!("Found an empty variable name and scope");
451                    None
452                } else {
453                    Some((
454                        VariableRef {
455                            path: ScopeRef::from_strs(
456                                &sp[0..sp.len() - 1]
457                                    .iter()
458                                    .map(ToString::to_string)
459                                    .collect::<Vec<_>>(),
460                            ),
461                            name: (*sp.last().unwrap()).to_string(),
462                            id: VarId::None,
463                        },
464                        v,
465                    ))
466                }
467            })
468            .collect()
469    }
470
471    fn scopes(&mut self) -> Option<Arc<HashMap<ScopeRef, CxxrtlScope>>> {
472        Some(self.get_scopes())
473    }
474
475    pub fn modules(&mut self) -> Vec<ScopeRef> {
476        if let Some(scopes) = &self.scopes() {
477            scopes.keys().cloned().collect()
478        } else {
479            vec![]
480        }
481    }
482
483    pub fn root_modules(&mut self) -> Vec<ScopeRef> {
484        // In the cxxrtl protocol, the root scope is always ""
485        if self.scopes().is_some() {
486            vec![ScopeRef {
487                strs: vec![],
488                id: ScopeId::None,
489            }]
490        } else {
491            vec![]
492        }
493    }
494
495    pub fn module_exists(&mut self, module: &ScopeRef) -> bool {
496        self.scopes().is_some_and(|s| s.contains_key(module))
497    }
498
499    pub fn child_scopes(&mut self, parent: &ScopeRef) -> Vec<ScopeRef> {
500        self.scopes()
501            .map(|scopes| {
502                scopes
503                    .keys()
504                    .filter_map(|scope| {
505                        if scope.strs().len() == parent.strs().len() + 1 {
506                            if scope.strs()[0..parent.strs().len()]
507                                == parent.strs()[0..parent.strs().len()]
508                            {
509                                Some(scope.clone())
510                            } else {
511                                None
512                            }
513                        } else {
514                            None
515                        }
516                    })
517                    .collect()
518            })
519            .unwrap_or_default()
520    }
521
522    pub fn variables_in_module(&mut self, module: &ScopeRef) -> Vec<VariableRef> {
523        self.fetch_items_in_module(module).keys().cloned().collect()
524    }
525
526    pub fn no_variables_in_module(&mut self, module: &ScopeRef) -> bool {
527        self.fetch_items_in_module(module).is_empty()
528    }
529
530    pub fn variable_meta(&mut self, variable: &VariableRef) -> Result<VariableMeta> {
531        Ok(self
532            .fetch_item(variable)
533            .map(|item| VariableMeta {
534                var: variable.clone(),
535                num_bits: Some(item.width),
536                variable_type: None,
537                variable_type_name: None,
538                index: None,
539                direction: None,
540                enum_map: Default::default(),
541                encoding: VariableEncoding::BitVector,
542            })
543            .unwrap_or_else(|| VariableMeta {
544                var: variable.clone(),
545                num_bits: None,
546                variable_type: None,
547                variable_type_name: None,
548                index: None,
549                direction: None,
550                enum_map: Default::default(),
551                encoding: VariableEncoding::BitVector,
552            }))
553    }
554
555    #[must_use]
556    pub fn max_displayed_timestamp(&self) -> Option<CxxrtlTimestamp> {
557        self.data.query_result.get().map(|t| (*t).clone())
558    }
559
560    pub fn max_timestamp(&mut self) -> Option<CxxrtlTimestamp> {
561        self.raw_simulation_status().map(|s| s.latest_time)
562    }
563
564    pub fn query_variable(
565        &mut self,
566        variable: &VariableRef,
567        time: &BigUint,
568    ) -> Option<QueryResult> {
569        // Before we can query any signals, we need some other data available. If we don't have
570        // that we'll early return with no value
571        let max_timestamp = self.max_timestamp()?;
572        let info = self.fetch_all_items()?;
573        let loaded_signals = self.data.loaded_signals.clone();
574
575        let res = self
576            .data
577            .query_result
578            .fetch_if_needed(|| {
579                info!("Running query variable");
580
581                self.sending.run_command(
582                    CxxrtlCommand::query_interval {
583                        interval: (CxxrtlTimestamp::zero(), max_timestamp.clone()),
584                        collapse: true,
585                        items: Some(DEFAULT_REFERENCE.to_string()),
586                        item_values_encoding: "base64(u32)",
587                        diagnostics: false,
588                    },
589                    move |response, data| {
590                        expect_response!(CommandResponse::query_interval { samples }, response);
591
592                        data.query_result = CachedData::filled(max_timestamp);
593                        data.interval_query_cache.populate(
594                            loaded_signals.clone(),
595                            info,
596                            samples,
597                            data.msg_channel.clone(),
598                        );
599                    },
600                );
601            })
602            .map(|_cached| {
603                // If we get here, the cache is valid and we we should look into the
604                // interval_query_cache for the query result
605                self.data
606                    .interval_query_cache
607                    .query(variable, time.to_bigint().unwrap())
608            })
609            .unwrap_or_default();
610        Some(res)
611    }
612
613    pub fn load_variables<S: AsRef<VariableRef>, T: Iterator<Item = S>>(&mut self, variables: T) {
614        let data = &mut self.data;
615        for variable in variables {
616            let varref = variable.as_ref().clone();
617
618            if !data.signal_index_map.contains_key(&varref) {
619                let idx = data.loaded_signals.len();
620                data.signal_index_map.insert(varref.clone(), idx);
621                data.loaded_signals.push(varref.clone());
622            }
623        }
624
625        self.sending.run_command(
626            CxxrtlCommand::reference_items {
627                reference: DEFAULT_REFERENCE.to_string(),
628                items: data
629                    .loaded_signals
630                    .iter()
631                    .map(|s| vec![s.cxxrtl_repr()])
632                    .collect(),
633            },
634            |_response, data| {
635                info!("Item references updated");
636                data.invalidate_query_result();
637            },
638        );
639    }
640
641    fn raw_simulation_status(&mut self) -> Option<CxxrtlSimulationStatus> {
642        self.data
643            .simulation_status
644            .fetch_if_needed(|| {
645                self.sending
646                    .run_command(CxxrtlCommand::get_simulation_status, |response, data| {
647                        expect_response!(CommandResponse::get_simulation_status(status), response);
648
649                        data.on_simulation_status_update(status);
650                    });
651            })
652            .map(|s| s.as_ref().clone())
653    }
654
655    pub fn simulation_status(&mut self) -> Option<SimulationStatus> {
656        self.raw_simulation_status().map(|s| match s.status {
657            SimulationStatusType::running => SimulationStatus::Running,
658            SimulationStatusType::paused => SimulationStatus::Paused,
659            SimulationStatusType::finished => SimulationStatus::Finished,
660        })
661    }
662
663    pub fn unpause(&mut self) {
664        let duration = self
665            .raw_simulation_status()
666            .map(|s| {
667                CxxrtlTimestamp::from_femtoseconds(
668                    s.latest_time.as_femtoseconds() + 100_000_000u32.to_biguint().unwrap(),
669                )
670            })
671            .unwrap_or_else(|| {
672                CxxrtlTimestamp::from_femtoseconds(100_000_000u32.to_biguint().unwrap())
673            });
674
675        let cmd = CxxrtlCommand::run_simulation {
676            until_time: Some(duration),
677            until_diagnostics: vec![],
678            sample_item_values: true,
679        };
680
681        self.sending.run_command(cmd, |_, data| {
682            data.simulation_status = CachedData::filled(CxxrtlSimulationStatus {
683                status: SimulationStatusType::running,
684                latest_time: CxxrtlTimestamp::zero(),
685            });
686            info!("Unpausing simulation");
687        });
688    }
689
690    pub fn pause(&mut self) {
691        self.sending
692            .run_command(CxxrtlCommand::pause_simulation, |response, data| {
693                expect_response!(CommandResponse::pause_simulation { time }, response);
694
695                data.on_simulation_status_update(CxxrtlSimulationStatus {
696                    status: SimulationStatusType::paused,
697                    latest_time: time,
698                });
699            });
700    }
701}