libsurfer/
cxxrtl_container.rs

1use futures::executor::block_on;
2use std::{
3    collections::{HashMap, VecDeque},
4    sync::Arc,
5};
6use tokio::sync::mpsc;
7
8use color_eyre::Result;
9use log::{error, info};
10use num::{
11    bigint::{ToBigInt, ToBigUint},
12    BigUint,
13};
14use serde::Deserialize;
15use surfer_translation_types::VariableEncoding;
16
17use crate::wave_container::ScopeRefExt;
18use crate::{
19    channels::IngressReceiver,
20    cxxrtl::{
21        command::CxxrtlCommand,
22        cs_message::CSMessage,
23        query_container::QueryContainer,
24        sc_message::{
25            CommandResponse, CxxrtlSimulationStatus, Event, SCMessage, SimulationStatusType,
26        },
27        timestamp::CxxrtlTimestamp,
28    },
29    message::Message,
30    wave_container::{
31        QueryResult, ScopeId, ScopeRef, SimulationStatus, VarId, VariableMeta, VariableRef,
32        VariableRefExt,
33    },
34};
35
36const DEFAULT_REFERENCE: &str = "ALL_VARIABLES";
37
38type Callback = Box<dyn FnOnce(CommandResponse, &mut CxxrtlData) + Sync + Send>;
39
40#[derive(Deserialize, Debug, Clone)]
41pub(crate) struct CxxrtlScope {}
42
43#[derive(Deserialize, Debug, Clone)]
44pub struct CxxrtlItem {
45    pub width: u32,
46}
47
48/// A piece of data which we cache from Cxxrtl
49pub enum CachedData<T> {
50    /// The data cache is invalidated, the previously held data if it is still useful is
51    /// kept
52    Uncached { prev: Option<Arc<T>> },
53    /// The data cache is invalidated, and a request has been made for new data. However,
54    /// the new data has not been received yet. If the previous data is not useless, it
55    /// can be stored here
56    Waiting { prev: Option<Arc<T>> },
57    /// The cache is up-to-date
58    Filled(Arc<T>),
59}
60
61impl<T> CachedData<T> {
62    fn empty() -> Self {
63        Self::Uncached { prev: None }
64    }
65
66    fn make_uncached(&self) -> Self {
67        // Since the internals here are all Arc, clones are cheap
68        match &self {
69            CachedData::Uncached { prev } => CachedData::Uncached { prev: prev.clone() },
70            CachedData::Waiting { prev } => CachedData::Uncached { prev: prev.clone() },
71            CachedData::Filled(prev) => CachedData::Uncached {
72                prev: Some(prev.clone()),
73            },
74        }
75    }
76
77    pub fn filled(t: T) -> Self {
78        Self::Filled(Arc::new(t))
79    }
80
81    fn get(&self) -> Option<Arc<T>> {
82        match self {
83            CachedData::Uncached { prev } => prev.clone(),
84            CachedData::Waiting { prev } => prev.clone(),
85            CachedData::Filled(val) => Some(val.clone()),
86        }
87    }
88}
89
90impl<T> CachedData<T>
91where
92    T: Clone,
93{
94    /// Return the current value from the cache if it is there. If the cache is
95    /// Uncached run `f` to fetch the new value. The function must make sure that
96    /// the cache is updated eventually. The state is changed to `Waiting`
97    fn fetch_if_needed(&mut self, f: impl FnOnce()) -> Option<Arc<T>> {
98        if let CachedData::Uncached { .. } = self {
99            f();
100        }
101        match self {
102            CachedData::Uncached { prev } => {
103                let result = prev.as_ref().cloned();
104                *self = CachedData::Waiting { prev: prev.clone() };
105                result
106            }
107            CachedData::Waiting { prev } => prev.clone(),
108            CachedData::Filled(val) => Some(val.clone()),
109        }
110    }
111}
112
113pub struct CxxrtlData {
114    scopes_cache: CachedData<HashMap<ScopeRef, CxxrtlScope>>,
115    module_item_cache: HashMap<ScopeRef, CachedData<HashMap<VariableRef, CxxrtlItem>>>,
116    all_items_cache: CachedData<HashMap<VariableRef, CxxrtlItem>>,
117
118    /// We use the CachedData system to keep track of if we have sent a query request,
119    /// but the actual data is stored in the interval_query_cache.
120    ///
121    /// The held value in the query result is the end timestamp of the current current
122    /// interval_query_cache
123    query_result: CachedData<CxxrtlTimestamp>,
124    interval_query_cache: QueryContainer,
125
126    loaded_signals: Vec<VariableRef>,
127    signal_index_map: HashMap<VariableRef, usize>,
128
129    simulation_status: CachedData<CxxrtlSimulationStatus>,
130
131    msg_channel: std::sync::mpsc::Sender<Message>,
132}
133
134impl CxxrtlData {
135    pub fn trigger_redraw(&self) {
136        self.msg_channel
137            .send(Message::InvalidateDrawCommands)
138            .unwrap();
139        if let Some(ctx) = crate::EGUI_CONTEXT.read().unwrap().as_ref() {
140            ctx.request_repaint();
141        }
142    }
143
144    pub fn on_simulation_status_update(&mut self, status: CxxrtlSimulationStatus) {
145        self.simulation_status = CachedData::filled(status);
146        self.trigger_redraw();
147        self.invalidate_query_result();
148    }
149
150    pub fn invalidate_query_result(&mut self) {
151        self.query_result = self.query_result.make_uncached();
152        self.trigger_redraw();
153        // self.interval_query_cache.invalidate();
154    }
155}
156
157macro_rules! expect_response {
158    ($expected:pat, $response:expr) => {
159        let $expected = $response else {
160            log::error!(
161                "Got unexpected response. Got {:?} expected {}",
162                $response,
163                stringify!(expected)
164            );
165            return;
166        };
167    };
168}
169
170struct CSSender {
171    cs_messages: mpsc::Sender<String>,
172    callback_queue: VecDeque<Callback>,
173}
174
175impl CSSender {
176    fn run_command<F>(&mut self, command: CxxrtlCommand, f: F)
177    where
178        F: 'static + FnOnce(CommandResponse, &mut CxxrtlData) + Sync + Send,
179    {
180        self.callback_queue.push_back(Box::new(f));
181        let json = serde_json::to_string(&CSMessage::command(command))
182            .expect("Failed to encode cxxrtl command");
183        block_on(self.cs_messages.send(json)).unwrap();
184    }
185}
186
187pub struct CxxrtlContainer {
188    data: CxxrtlData,
189    sending: CSSender,
190    sc_messages: IngressReceiver<String>,
191    disconnected_reported: bool,
192}
193
194impl CxxrtlContainer {
195    async fn new(
196        msg_channel: std::sync::mpsc::Sender<Message>,
197        sending: CSSender,
198        sc_messages: IngressReceiver<String>,
199    ) -> Result<Self> {
200        info!("Sending cxxrtl greeting");
201        sending
202            .cs_messages
203            .send(serde_json::to_string(&CSMessage::greeting { version: 0 }).unwrap())
204            .await
205            .unwrap();
206
207        let data = CxxrtlData {
208            scopes_cache: CachedData::empty(),
209            module_item_cache: HashMap::new(),
210            all_items_cache: CachedData::empty(),
211            query_result: CachedData::empty(),
212            interval_query_cache: QueryContainer::empty(),
213            loaded_signals: vec![],
214            signal_index_map: HashMap::new(),
215            simulation_status: CachedData::empty(),
216            msg_channel: msg_channel.clone(),
217        };
218
219        let result = Self {
220            data,
221            sc_messages,
222            sending,
223            disconnected_reported: false,
224        };
225
226        info!("cxxrtl connected");
227
228        Ok(result)
229    }
230
231    #[cfg(not(target_arch = "wasm32"))]
232    pub async fn new_tcp(
233        addr: &str,
234        msg_channel: std::sync::mpsc::Sender<Message>,
235    ) -> Result<Self> {
236        use color_eyre::eyre::Context;
237
238        use crate::channels::IngressSender;
239        use crate::cxxrtl::io_worker;
240
241        let stream = tokio::net::TcpStream::connect(addr)
242            .await
243            .with_context(|| format!("Failed to connect to {addr}"))?;
244
245        let (read, write) = tokio::io::split(stream);
246
247        let (cs_tx, cs_rx) = mpsc::channel(100);
248        let (sc_tx, sc_rx) = mpsc::channel(100);
249        tokio::spawn(
250            io_worker::CxxrtlWorker::new(write, read, IngressSender::new(sc_tx), cs_rx).start(),
251        );
252
253        Self::new(
254            msg_channel,
255            CSSender {
256                cs_messages: cs_tx,
257                callback_queue: VecDeque::new(),
258            },
259            IngressReceiver::new(sc_rx),
260        )
261        .await
262    }
263
264    #[cfg(target_arch = "wasm32")]
265    pub async fn new_wasm_mailbox(msg_channel: std::sync::mpsc::Sender<Message>) -> Result<Self> {
266        use color_eyre::eyre::anyhow;
267
268        use crate::wasm_api::{CXXRTL_CS_HANDLER, CXXRTL_SC_HANDLER};
269
270        let result = Self::new(
271            msg_channel,
272            CSSender {
273                cs_messages: CXXRTL_CS_HANDLER.tx.clone(),
274                callback_queue: VecDeque::new(),
275            },
276            CXXRTL_SC_HANDLER
277                .rx
278                .write()
279                .await
280                .take()
281                .ok_or_else(|| anyhow!("The wasm mailbox has already been consumed."))?,
282        )
283        .await;
284
285        result
286    }
287
288    pub fn tick(&mut self) {
289        loop {
290            match self.sc_messages.try_recv() {
291                Ok(s) => {
292                    info!("CXXRTL S>C: {s}");
293                    let msg = match serde_json::from_str::<SCMessage>(&s) {
294                        Ok(msg) => msg,
295                        Err(e) => {
296                            error!("Got an unrecognised message from the cxxrtl server {e}");
297                            continue;
298                        }
299                    };
300                    match msg {
301                        SCMessage::greeting { .. } => {
302                            info!("Received cxxrtl greeting")
303                        }
304                        SCMessage::response(response) => {
305                            if let Some(cb) = self.sending.callback_queue.pop_front() {
306                                cb(response, &mut self.data)
307                            } else {
308                                error!("Got a CXXRTL message with no corresponding callback")
309                            };
310                        }
311                        SCMessage::error(e) => {
312                            error!("CXXRTL error: '{}'", e.message);
313                            self.sending.callback_queue.pop_front();
314                        }
315                        SCMessage::event(event) => match event {
316                            Event::simulation_paused { time, cause: _ } => {
317                                self.data
318                                    .on_simulation_status_update(CxxrtlSimulationStatus {
319                                        status: SimulationStatusType::paused,
320                                        latest_time: time,
321                                    });
322                            }
323                            Event::simulation_finished { time } => {
324                                self.data
325                                    .on_simulation_status_update(CxxrtlSimulationStatus {
326                                        status: SimulationStatusType::finished,
327                                        latest_time: time,
328                                    });
329                            }
330                        },
331                    }
332                }
333                Err(mpsc::error::TryRecvError::Empty) => {
334                    break;
335                }
336                Err(mpsc::error::TryRecvError::Disconnected) => {
337                    if !self.disconnected_reported {
338                        error!("CXXRTL sender disconnected");
339                        self.disconnected_reported = true;
340                    }
341                    break;
342                }
343            }
344        }
345    }
346
347    fn get_scopes(&mut self) -> Arc<HashMap<ScopeRef, CxxrtlScope>> {
348        self.data
349            .scopes_cache
350            .fetch_if_needed(|| {
351                self.sending.run_command(
352                    CxxrtlCommand::list_scopes { scope: None },
353                    |response, data| {
354                        expect_response!(CommandResponse::list_scopes { scopes }, response);
355
356                        let scopes = scopes
357                            .into_iter()
358                            .map(|(name, s)| {
359                                (
360                                    ScopeRef {
361                                        strs: name
362                                            .split(' ')
363                                            .map(std::string::ToString::to_string)
364                                            .collect(),
365                                        id: ScopeId::None,
366                                    },
367                                    s,
368                                )
369                            })
370                            .collect();
371
372                        data.scopes_cache = CachedData::filled(scopes);
373                    },
374                );
375            })
376            .unwrap_or_else(|| Arc::new(HashMap::new()))
377    }
378
379    /// Fetches the details on a specific item. For now, this fetches *all* items, but looks
380    /// up the specific item before returning. This is done in order to not have to return
381    /// the whole Item list since we need to lock the data structure to get that.
382    fn fetch_item(&mut self, var: &VariableRef) -> Option<CxxrtlItem> {
383        self.data
384            .all_items_cache
385            .fetch_if_needed(|| {
386                self.sending.run_command(
387                    CxxrtlCommand::list_items { scope: None },
388                    |response, data| {
389                        expect_response!(CommandResponse::list_items { items }, response);
390
391                        let items = Self::item_list_to_hash_map(items);
392
393                        data.all_items_cache = CachedData::filled(items);
394                    },
395                );
396            })
397            .and_then(|d| d.get(var).cloned())
398    }
399
400    fn fetch_all_items(&mut self) -> Option<Arc<HashMap<VariableRef, CxxrtlItem>>> {
401        self.data
402            .all_items_cache
403            .fetch_if_needed(|| {
404                self.sending.run_command(
405                    CxxrtlCommand::list_items { scope: None },
406                    |response, data| {
407                        expect_response!(CommandResponse::list_items { items }, response);
408
409                        let items = Self::item_list_to_hash_map(items);
410
411                        data.all_items_cache = CachedData::filled(items);
412                    },
413                );
414            })
415            .clone()
416    }
417
418    fn fetch_items_in_module(&mut self, scope: &ScopeRef) -> Arc<HashMap<VariableRef, CxxrtlItem>> {
419        let result = self
420            .data
421            .module_item_cache
422            .entry(scope.clone())
423            .or_insert(CachedData::empty())
424            .fetch_if_needed(|| {
425                let scope = scope.clone();
426                self.sending.run_command(
427                    CxxrtlCommand::list_items {
428                        scope: Some(scope.cxxrtl_repr()),
429                    },
430                    move |response, data| {
431                        expect_response!(CommandResponse::list_items { items }, response);
432
433                        let items = Self::item_list_to_hash_map(items);
434
435                        data.module_item_cache
436                            .insert(scope.clone(), CachedData::filled(items));
437                    },
438                );
439            });
440
441        result.unwrap_or_default()
442    }
443
444    fn item_list_to_hash_map(
445        items: HashMap<String, CxxrtlItem>,
446    ) -> HashMap<VariableRef, CxxrtlItem> {
447        items
448            .into_iter()
449            .filter_map(|(k, v)| {
450                let sp = k.split(' ').collect::<Vec<_>>();
451
452                if sp.is_empty() {
453                    error!("Found an empty variable name and scope");
454                    None
455                } else {
456                    Some((
457                        VariableRef {
458                            path: ScopeRef::from_strs(
459                                &sp[0..sp.len() - 1]
460                                    .iter()
461                                    .map(std::string::ToString::to_string)
462                                    .collect::<Vec<_>>(),
463                            ),
464                            name: sp.last().unwrap().to_string(),
465                            id: VarId::None,
466                        },
467                        v,
468                    ))
469                }
470            })
471            .collect()
472    }
473
474    fn scopes(&mut self) -> Option<Arc<HashMap<ScopeRef, CxxrtlScope>>> {
475        Some(self.get_scopes())
476    }
477
478    pub fn modules(&mut self) -> Vec<ScopeRef> {
479        if let Some(scopes) = &self.scopes() {
480            scopes.iter().map(|(k, _)| k.clone()).collect()
481        } else {
482            vec![]
483        }
484    }
485
486    pub fn root_modules(&mut self) -> Vec<ScopeRef> {
487        // In the cxxrtl protocol, the root scope is always ""
488        if self.scopes().is_some() {
489            vec![ScopeRef {
490                strs: vec![],
491                id: ScopeId::None,
492            }]
493        } else {
494            vec![]
495        }
496    }
497
498    pub fn module_exists(&mut self, module: &ScopeRef) -> bool {
499        self.scopes().is_some_and(|s| s.contains_key(module))
500    }
501
502    pub fn child_scopes(&mut self, parent: &ScopeRef) -> Vec<ScopeRef> {
503        self.scopes()
504            .map(|scopes| {
505                scopes
506                    .keys()
507                    .filter_map(|scope| {
508                        if scope.strs().len() == parent.strs().len() + 1 {
509                            if scope.strs()[0..parent.strs().len()]
510                                == parent.strs()[0..parent.strs().len()]
511                            {
512                                Some(scope.clone())
513                            } else {
514                                None
515                            }
516                        } else {
517                            None
518                        }
519                    })
520                    .collect()
521            })
522            .unwrap_or_default()
523    }
524
525    pub fn variables_in_module(&mut self, module: &ScopeRef) -> Vec<VariableRef> {
526        self.fetch_items_in_module(module).keys().cloned().collect()
527    }
528
529    pub fn no_variables_in_module(&mut self, module: &ScopeRef) -> bool {
530        self.fetch_items_in_module(module).is_empty()
531    }
532
533    pub fn variable_meta(&mut self, variable: &VariableRef) -> Result<VariableMeta> {
534        Ok(self
535            .fetch_item(variable)
536            .map(|item| VariableMeta {
537                var: variable.clone(),
538                num_bits: Some(item.width),
539                variable_type: None,
540                variable_type_name: None,
541                index: None,
542                direction: None,
543                enum_map: Default::default(),
544                encoding: VariableEncoding::BitVector,
545            })
546            .unwrap_or_else(|| VariableMeta {
547                var: variable.clone(),
548                num_bits: None,
549                variable_type: None,
550                variable_type_name: None,
551                index: None,
552                direction: None,
553                enum_map: Default::default(),
554                encoding: VariableEncoding::BitVector,
555            }))
556    }
557
558    pub fn max_displayed_timestamp(&self) -> Option<CxxrtlTimestamp> {
559        self.data.query_result.get().map(|t| (*t).clone())
560    }
561
562    pub fn max_timestamp(&mut self) -> Option<CxxrtlTimestamp> {
563        self.raw_simulation_status().map(|s| s.latest_time)
564    }
565
566    pub fn query_variable(
567        &mut self,
568        variable: &VariableRef,
569        time: &BigUint,
570    ) -> Option<QueryResult> {
571        // Before we can query any signals, we need some other data available. If we don't have
572        // that we'll early return with no value
573        let max_timestamp = self.max_timestamp()?;
574        let info = self.fetch_all_items()?;
575        let loaded_signals = self.data.loaded_signals.clone();
576
577        let res = self
578            .data
579            .query_result
580            .fetch_if_needed(|| {
581                info!("Running query variable");
582
583                self.sending.run_command(
584                    CxxrtlCommand::query_interval {
585                        interval: (CxxrtlTimestamp::zero(), max_timestamp.clone()),
586                        collapse: true,
587                        items: Some(DEFAULT_REFERENCE.to_string()),
588                        item_values_encoding: "base64(u32)",
589                        diagnostics: false,
590                    },
591                    move |response, data| {
592                        expect_response!(CommandResponse::query_interval { samples }, response);
593
594                        data.query_result = CachedData::filled(max_timestamp);
595                        data.interval_query_cache.populate(
596                            loaded_signals.clone(),
597                            info,
598                            samples,
599                            data.msg_channel.clone(),
600                        );
601                    },
602                );
603            })
604            .map(|_cached| {
605                // If we get here, the cache is valid and we we should look into the
606                // interval_query_cache for the query result
607                self.data
608                    .interval_query_cache
609                    .query(variable, time.to_bigint().unwrap())
610            })
611            .unwrap_or_default();
612        Some(res)
613    }
614
615    pub fn load_variables<S: AsRef<VariableRef>, T: Iterator<Item = S>>(&mut self, variables: T) {
616        let data = &mut self.data;
617        for variable in variables {
618            let varref = variable.as_ref().clone();
619
620            if !data.signal_index_map.contains_key(&varref) {
621                let idx = data.loaded_signals.len();
622                data.signal_index_map.insert(varref.clone(), idx);
623                data.loaded_signals.push(varref.clone());
624            }
625        }
626
627        self.sending.run_command(
628            CxxrtlCommand::reference_items {
629                reference: DEFAULT_REFERENCE.to_string(),
630                items: data
631                    .loaded_signals
632                    .iter()
633                    .map(|s| vec![s.cxxrtl_repr()])
634                    .collect(),
635            },
636            |_response, data| {
637                info!("Item references updated");
638                data.invalidate_query_result();
639            },
640        );
641    }
642
643    fn raw_simulation_status(&mut self) -> Option<CxxrtlSimulationStatus> {
644        self.data
645            .simulation_status
646            .fetch_if_needed(|| {
647                self.sending
648                    .run_command(CxxrtlCommand::get_simulation_status, |response, data| {
649                        expect_response!(CommandResponse::get_simulation_status(status), response);
650
651                        data.on_simulation_status_update(status);
652                    });
653            })
654            .map(|s| s.as_ref().clone())
655    }
656
657    pub fn simulation_status(&mut self) -> Option<SimulationStatus> {
658        self.raw_simulation_status().map(|s| match s.status {
659            SimulationStatusType::running => SimulationStatus::Running,
660            SimulationStatusType::paused => SimulationStatus::Paused,
661            SimulationStatusType::finished => SimulationStatus::Finished,
662        })
663    }
664
665    pub fn unpause(&mut self) {
666        let duration = self
667            .raw_simulation_status()
668            .map(|s| {
669                CxxrtlTimestamp::from_femtoseconds(
670                    s.latest_time.as_femtoseconds() + 100_000_000u32.to_biguint().unwrap(),
671                )
672            })
673            .unwrap_or_else(|| {
674                CxxrtlTimestamp::from_femtoseconds(100_000_000u32.to_biguint().unwrap())
675            });
676
677        let cmd = CxxrtlCommand::run_simulation {
678            until_time: Some(duration),
679            until_diagnostics: vec![],
680            sample_item_values: true,
681        };
682
683        self.sending.run_command(cmd, |_, data| {
684            data.simulation_status = CachedData::filled(CxxrtlSimulationStatus {
685                status: SimulationStatusType::running,
686                latest_time: CxxrtlTimestamp::zero(),
687            });
688            info!("Unpausing simulation");
689        });
690    }
691
692    pub fn pause(&mut self) {
693        self.sending
694            .run_command(CxxrtlCommand::pause_simulation, |response, data| {
695                expect_response!(CommandResponse::pause_simulation { time }, response);
696
697                data.on_simulation_status_update(CxxrtlSimulationStatus {
698                    status: SimulationStatusType::paused,
699                    latest_time: time,
700                });
701            });
702    }
703}