Skip to main content

libsurfer/
cxxrtl_container.rs

1use futures::executor::block_on;
2use std::{
3    collections::{HashMap, VecDeque},
4    sync::Arc,
5};
6use tokio::sync::mpsc;
7
8use eyre::Result;
9use num::{
10    BigUint,
11    bigint::{ToBigInt, ToBigUint},
12};
13use serde::Deserialize;
14use surfer_translation_types::VariableEncoding;
15use tracing::{error, info};
16
17use crate::wave_container::ScopeRefExt;
18use crate::{
19    channels::IngressReceiver,
20    cxxrtl::{
21        command::CxxrtlCommand,
22        cs_message::CSMessage,
23        query_container::QueryContainer,
24        sc_message::{
25            CommandResponse, CxxrtlSimulationStatus, Event, SCMessage, SimulationStatusType,
26        },
27        timestamp::CxxrtlTimestamp,
28    },
29    message::Message,
30    wave_container::{
31        QueryResult, ScopeId, ScopeRef, SimulationStatus, VarId, VariableMeta, VariableRef,
32        VariableRefExt,
33    },
34};
35
36const DEFAULT_REFERENCE: &str = "ALL_VARIABLES";
37
38type Callback = Box<dyn FnOnce(CommandResponse, &mut CxxrtlData) + Sync + Send>;
39
40#[derive(Deserialize, Debug, Clone)]
41pub(crate) struct CxxrtlScope {}
42
43#[derive(Deserialize, Debug, Clone)]
44pub struct CxxrtlItem {
45    pub width: u32,
46}
47
48/// A piece of data which we cache from Cxxrtl
49pub enum CachedData<T> {
50    /// The data cache is invalidated, the previously held data if it is still useful is
51    /// kept
52    Uncached { prev: Option<Arc<T>> },
53    /// The data cache is invalidated, and a request has been made for new data. However,
54    /// the new data has not been received yet. If the previous data is not useless, it
55    /// can be stored here
56    Waiting { prev: Option<Arc<T>> },
57    /// The cache is up-to-date
58    Filled(Arc<T>),
59}
60
61impl<T> CachedData<T> {
62    fn empty() -> Self {
63        Self::Uncached { prev: None }
64    }
65
66    fn make_uncached(&self) -> Self {
67        // Since the internals here are all Arc, clones are cheap
68        match &self {
69            CachedData::Uncached { prev } => CachedData::Uncached { prev: prev.clone() },
70            CachedData::Waiting { prev } => CachedData::Uncached { prev: prev.clone() },
71            CachedData::Filled(prev) => CachedData::Uncached {
72                prev: Some(prev.clone()),
73            },
74        }
75    }
76
77    pub fn filled(t: T) -> Self {
78        Self::Filled(Arc::new(t))
79    }
80
81    fn get(&self) -> Option<Arc<T>> {
82        match self {
83            CachedData::Uncached { prev } => prev.clone(),
84            CachedData::Waiting { prev } => prev.clone(),
85            CachedData::Filled(val) => Some(val.clone()),
86        }
87    }
88}
89
90impl<T> CachedData<T>
91where
92    T: Clone,
93{
94    /// Return the current value from the cache if it is there. If the cache is
95    /// Uncached run `f` to fetch the new value. The function must make sure that
96    /// the cache is updated eventually. The state is changed to `Waiting`
97    fn fetch_if_needed(&mut self, f: impl FnOnce()) -> Option<Arc<T>> {
98        if let CachedData::Uncached { .. } = self {
99            f();
100        }
101        match self {
102            CachedData::Uncached { prev } => {
103                let result = prev.clone();
104                *self = CachedData::Waiting { prev: prev.clone() };
105                result
106            }
107            CachedData::Waiting { prev } => prev.clone(),
108            CachedData::Filled(val) => Some(val.clone()),
109        }
110    }
111}
112
113pub struct CxxrtlData {
114    scopes_cache: CachedData<HashMap<ScopeRef, CxxrtlScope>>,
115    module_item_cache: HashMap<ScopeRef, CachedData<HashMap<VariableRef, CxxrtlItem>>>,
116    all_items_cache: CachedData<HashMap<VariableRef, CxxrtlItem>>,
117
118    /// We use the `CachedData` system to keep track of if we have sent a query request,
119    /// but the actual data is stored in the `interval_query_cache`.
120    ///
121    /// The held value in the query result is the end timestamp of the current current
122    /// `interval_query_cache`
123    query_result: CachedData<CxxrtlTimestamp>,
124    interval_query_cache: QueryContainer,
125
126    loaded_signals: Vec<VariableRef>,
127    signal_index_map: HashMap<VariableRef, usize>,
128
129    simulation_status: CachedData<CxxrtlSimulationStatus>,
130
131    msg_channel: std::sync::mpsc::Sender<Message>,
132}
133
134impl CxxrtlData {
135    pub fn trigger_redraw(&self) {
136        self.msg_channel
137            .send(Message::InvalidateDrawCommands)
138            .unwrap();
139        if let Some(ctx) = crate::EGUI_CONTEXT.read().unwrap().as_ref() {
140            ctx.request_repaint();
141        }
142    }
143
144    pub fn on_simulation_status_update(&mut self, status: CxxrtlSimulationStatus) {
145        self.simulation_status = CachedData::filled(status);
146        self.trigger_redraw();
147        self.invalidate_query_result();
148    }
149
150    pub fn invalidate_query_result(&mut self) {
151        self.query_result = self.query_result.make_uncached();
152        self.trigger_redraw();
153        // self.interval_query_cache.invalidate();
154    }
155}
156
157macro_rules! expect_response {
158    ($expected:pat, $response:expr) => {
159        let $expected = $response else {
160            error!(
161                "Got unexpected response. Got {:?} expected {}",
162                $response,
163                stringify!(expected)
164            );
165            return;
166        };
167    };
168}
169
170struct CSSender {
171    cs_messages: mpsc::Sender<String>,
172    callback_queue: VecDeque<Callback>,
173}
174
175impl CSSender {
176    fn run_command<F>(&mut self, command: CxxrtlCommand, f: F)
177    where
178        F: 'static + FnOnce(CommandResponse, &mut CxxrtlData) + Sync + Send,
179    {
180        self.callback_queue.push_back(Box::new(f));
181        let json = serde_json::to_string(&CSMessage::command(command))
182            .expect("Failed to encode cxxrtl command");
183        block_on(self.cs_messages.send(json)).unwrap();
184    }
185}
186
187pub struct CxxrtlContainer {
188    data: CxxrtlData,
189    sending: CSSender,
190    sc_messages: IngressReceiver<String>,
191    disconnected_reported: bool,
192}
193
194impl CxxrtlContainer {
195    async fn new(
196        msg_channel: std::sync::mpsc::Sender<Message>,
197        sending: CSSender,
198        sc_messages: IngressReceiver<String>,
199    ) -> Result<Self> {
200        info!("Sending cxxrtl greeting");
201        sending
202            .cs_messages
203            .send(serde_json::to_string(&CSMessage::greeting { version: 0 }).unwrap())
204            .await
205            .unwrap();
206
207        let data = CxxrtlData {
208            scopes_cache: CachedData::empty(),
209            module_item_cache: HashMap::new(),
210            all_items_cache: CachedData::empty(),
211            query_result: CachedData::empty(),
212            interval_query_cache: QueryContainer::empty(),
213            loaded_signals: vec![],
214            signal_index_map: HashMap::new(),
215            simulation_status: CachedData::empty(),
216            msg_channel: msg_channel.clone(),
217        };
218
219        let result = Self {
220            data,
221            sc_messages,
222            sending,
223            disconnected_reported: false,
224        };
225
226        info!("cxxrtl connected");
227
228        Ok(result)
229    }
230
231    #[cfg(not(target_arch = "wasm32"))]
232    pub async fn new_tcp(
233        addr: &str,
234        msg_channel: std::sync::mpsc::Sender<Message>,
235    ) -> Result<Self> {
236        use eyre::Context;
237
238        use crate::channels::IngressSender;
239        use crate::cxxrtl::io_worker;
240
241        let stream = tokio::net::TcpStream::connect(addr)
242            .await
243            .with_context(|| format!("Failed to connect to {addr}"))?;
244
245        let (read, write) = tokio::io::split(stream);
246
247        let (cs_tx, cs_rx) = mpsc::channel(100);
248        let (sc_tx, sc_rx) = mpsc::channel(100);
249        tokio::spawn(
250            io_worker::CxxrtlWorker::new(write, read, IngressSender::new(sc_tx), cs_rx).start(),
251        );
252
253        Self::new(
254            msg_channel,
255            CSSender {
256                cs_messages: cs_tx,
257                callback_queue: VecDeque::new(),
258            },
259            IngressReceiver::new(sc_rx),
260        )
261        .await
262    }
263
264    #[cfg(target_arch = "wasm32")]
265    pub async fn new_wasm_mailbox(msg_channel: std::sync::mpsc::Sender<Message>) -> Result<Self> {
266        use eyre::anyhow;
267
268        use crate::wasm_api::{CXXRTL_CS_HANDLER, CXXRTL_SC_HANDLER};
269
270        let result = Self::new(
271            msg_channel,
272            CSSender {
273                cs_messages: CXXRTL_CS_HANDLER.tx.clone(),
274                callback_queue: VecDeque::new(),
275            },
276            CXXRTL_SC_HANDLER
277                .rx
278                .write()
279                .await
280                .take()
281                .ok_or_else(|| anyhow!("The wasm mailbox has already been consumed."))?,
282        )
283        .await;
284
285        result
286    }
287
288    pub fn tick(&mut self) {
289        loop {
290            match self.sc_messages.try_recv() {
291                Ok(s) => {
292                    info!("CXXRTL S>C: {s}");
293                    let msg = match serde_json::from_str::<SCMessage>(&s) {
294                        Ok(msg) => msg,
295                        Err(e) => {
296                            error!("Got an unrecognised message from the cxxrtl server {e}");
297                            continue;
298                        }
299                    };
300                    match msg {
301                        SCMessage::greeting { .. } => {
302                            info!("Received cxxrtl greeting");
303                        }
304                        SCMessage::response(response) => {
305                            if let Some(cb) = self.sending.callback_queue.pop_front() {
306                                cb(response, &mut self.data);
307                            } else {
308                                error!("Got a CXXRTL message with no corresponding callback");
309                            }
310                        }
311                        SCMessage::error(e) => {
312                            error!("CXXRTL error: '{}'", e.message);
313                            self.sending.callback_queue.pop_front();
314                        }
315                        SCMessage::event(event) => match event {
316                            Event::simulation_paused { time, cause: _ } => {
317                                self.data
318                                    .on_simulation_status_update(CxxrtlSimulationStatus {
319                                        status: SimulationStatusType::paused,
320                                        latest_time: time,
321                                    });
322                            }
323                            Event::simulation_finished { time } => {
324                                self.data
325                                    .on_simulation_status_update(CxxrtlSimulationStatus {
326                                        status: SimulationStatusType::finished,
327                                        latest_time: time,
328                                    });
329                            }
330                        },
331                    }
332                }
333                Err(mpsc::error::TryRecvError::Empty) => {
334                    break;
335                }
336                Err(mpsc::error::TryRecvError::Disconnected) => {
337                    if !self.disconnected_reported {
338                        error!("CXXRTL sender disconnected");
339                        self.disconnected_reported = true;
340                    }
341                    break;
342                }
343            }
344        }
345    }
346
347    fn get_scopes(&mut self) -> Arc<HashMap<ScopeRef, CxxrtlScope>> {
348        self.data
349            .scopes_cache
350            .fetch_if_needed(|| {
351                self.sending.run_command(
352                    CxxrtlCommand::list_scopes { scope: None },
353                    |response, data| {
354                        expect_response!(CommandResponse::list_scopes { scopes }, response);
355
356                        let scopes = scopes
357                            .into_iter()
358                            .map(|(name, s)| {
359                                (
360                                    ScopeRef {
361                                        strs: name.split(' ').map(str::to_string).collect(),
362                                        id: ScopeId::None,
363                                    },
364                                    s,
365                                )
366                            })
367                            .collect();
368
369                        data.scopes_cache = CachedData::filled(scopes);
370                    },
371                );
372            })
373            .unwrap_or_else(|| Arc::new(HashMap::new()))
374    }
375
376    /// Fetches the details on a specific item. For now, this fetches *all* items, but looks
377    /// up the specific item before returning. This is done in order to not have to return
378    /// the whole Item list since we need to lock the data structure to get that.
379    fn fetch_item(&mut self, var: &VariableRef) -> Option<CxxrtlItem> {
380        self.data
381            .all_items_cache
382            .fetch_if_needed(|| {
383                self.sending.run_command(
384                    CxxrtlCommand::list_items { scope: None },
385                    |response, data| {
386                        expect_response!(CommandResponse::list_items { items }, response);
387
388                        let items = Self::item_list_to_hash_map(items);
389
390                        data.all_items_cache = CachedData::filled(items);
391                    },
392                );
393            })
394            .and_then(|d| d.get(var).cloned())
395    }
396
397    fn fetch_all_items(&mut self) -> Option<Arc<HashMap<VariableRef, CxxrtlItem>>> {
398        self.data
399            .all_items_cache
400            .fetch_if_needed(|| {
401                self.sending.run_command(
402                    CxxrtlCommand::list_items { scope: None },
403                    |response, data| {
404                        expect_response!(CommandResponse::list_items { items }, response);
405
406                        let items = Self::item_list_to_hash_map(items);
407
408                        data.all_items_cache = CachedData::filled(items);
409                    },
410                );
411            })
412            .clone()
413    }
414
415    fn fetch_items_in_module(&mut self, scope: &ScopeRef) -> Arc<HashMap<VariableRef, CxxrtlItem>> {
416        let result = self
417            .data
418            .module_item_cache
419            .entry(scope.clone())
420            .or_insert(CachedData::empty())
421            .fetch_if_needed(|| {
422                let scope = scope.clone();
423                self.sending.run_command(
424                    CxxrtlCommand::list_items {
425                        scope: Some(scope.cxxrtl_repr()),
426                    },
427                    move |response, data| {
428                        expect_response!(CommandResponse::list_items { items }, response);
429
430                        let items = Self::item_list_to_hash_map(items);
431
432                        data.module_item_cache
433                            .insert(scope.clone(), CachedData::filled(items));
434                    },
435                );
436            });
437
438        result.unwrap_or_default()
439    }
440
441    fn item_list_to_hash_map(
442        items: HashMap<String, CxxrtlItem>,
443    ) -> HashMap<VariableRef, CxxrtlItem> {
444        items
445            .into_iter()
446            .filter_map(|(k, v)| {
447                let sp = k.split(' ').collect::<Vec<_>>();
448
449                if sp.is_empty() {
450                    error!("Found an empty variable name and scope");
451                    None
452                } else {
453                    Some((
454                        VariableRef {
455                            path: ScopeRef::from_strs(
456                                &sp[0..sp.len() - 1]
457                                    .iter()
458                                    .map(ToString::to_string)
459                                    .collect::<Vec<_>>(),
460                            ),
461                            name: (*sp.last().unwrap()).to_string(),
462                            id: VarId::None,
463                            index: None,
464                        },
465                        v,
466                    ))
467                }
468            })
469            .collect()
470    }
471
472    fn scopes(&mut self) -> Option<Arc<HashMap<ScopeRef, CxxrtlScope>>> {
473        Some(self.get_scopes())
474    }
475
476    pub fn modules(&mut self) -> Vec<ScopeRef> {
477        if let Some(scopes) = &self.scopes() {
478            scopes.keys().cloned().collect()
479        } else {
480            vec![]
481        }
482    }
483
484    pub fn root_modules(&mut self) -> Vec<ScopeRef> {
485        // In the cxxrtl protocol, the root scope is always ""
486        if self.scopes().is_some() {
487            vec![ScopeRef {
488                strs: vec![],
489                id: ScopeId::None,
490            }]
491        } else {
492            vec![]
493        }
494    }
495
496    pub fn module_exists(&mut self, module: &ScopeRef) -> bool {
497        self.scopes().is_some_and(|s| s.contains_key(module))
498    }
499
500    pub fn child_scopes(&mut self, parent: &ScopeRef) -> Vec<ScopeRef> {
501        self.scopes()
502            .map(|scopes| {
503                scopes
504                    .keys()
505                    .filter_map(|scope| {
506                        if scope.strs().len() == parent.strs().len() + 1 {
507                            if scope.strs()[0..parent.strs().len()]
508                                == parent.strs()[0..parent.strs().len()]
509                            {
510                                Some(scope.clone())
511                            } else {
512                                None
513                            }
514                        } else {
515                            None
516                        }
517                    })
518                    .collect()
519            })
520            .unwrap_or_default()
521    }
522
523    pub fn variables_in_module(&mut self, module: &ScopeRef) -> Vec<VariableRef> {
524        self.fetch_items_in_module(module).keys().cloned().collect()
525    }
526
527    pub fn no_variables_in_module(&mut self, module: &ScopeRef) -> bool {
528        self.fetch_items_in_module(module).is_empty()
529    }
530
531    pub fn variable_meta(&mut self, variable: &VariableRef) -> Result<VariableMeta> {
532        Ok(self.fetch_item(variable).map_or_else(
533            || VariableMeta {
534                var: variable.clone(),
535                num_bits: None,
536                variable_type: None,
537                variable_type_name: None,
538                index: None,
539                direction: None,
540                enum_map: Default::default(),
541                encoding: VariableEncoding::BitVector,
542            },
543            |item| VariableMeta {
544                var: variable.clone(),
545                num_bits: Some(item.width),
546                variable_type: None,
547                variable_type_name: None,
548                index: None,
549                direction: None,
550                enum_map: Default::default(),
551                encoding: VariableEncoding::BitVector,
552            },
553        ))
554    }
555
556    #[must_use]
557    pub fn max_displayed_timestamp(&self) -> Option<CxxrtlTimestamp> {
558        self.data.query_result.get().map(|t| (*t).clone())
559    }
560
561    pub fn max_timestamp(&mut self) -> Option<CxxrtlTimestamp> {
562        self.raw_simulation_status().map(|s| s.latest_time)
563    }
564
565    pub fn query_variable(
566        &mut self,
567        variable: &VariableRef,
568        time: &BigUint,
569    ) -> Option<QueryResult> {
570        // Before we can query any signals, we need some other data available. If we don't have
571        // that we'll early return with no value
572        let max_timestamp = self.max_timestamp()?;
573        let info = self.fetch_all_items()?;
574        let loaded_signals = self.data.loaded_signals.clone();
575
576        let res = self
577            .data
578            .query_result
579            .fetch_if_needed(|| {
580                info!("Running query variable");
581
582                self.sending.run_command(
583                    CxxrtlCommand::query_interval {
584                        interval: (CxxrtlTimestamp::zero(), max_timestamp.clone()),
585                        collapse: true,
586                        items: Some(DEFAULT_REFERENCE.to_string()),
587                        item_values_encoding: "base64(u32)",
588                        diagnostics: false,
589                    },
590                    move |response, data| {
591                        expect_response!(CommandResponse::query_interval { samples }, response);
592
593                        data.query_result = CachedData::filled(max_timestamp);
594                        data.interval_query_cache.populate(
595                            loaded_signals.clone(),
596                            info,
597                            samples,
598                            data.msg_channel.clone(),
599                        );
600                    },
601                );
602            })
603            .map(|_cached| {
604                // If we get here, the cache is valid and we we should look into the
605                // interval_query_cache for the query result
606                self.data
607                    .interval_query_cache
608                    .query(variable, time.to_bigint().unwrap())
609            })
610            .unwrap_or_default();
611        Some(res)
612    }
613
614    pub fn load_variables<S: AsRef<VariableRef>, T: Iterator<Item = S>>(&mut self, variables: T) {
615        let data = &mut self.data;
616        for variable in variables {
617            let varref = variable.as_ref().clone();
618
619            if !data.signal_index_map.contains_key(&varref) {
620                let idx = data.loaded_signals.len();
621                data.signal_index_map.insert(varref.clone(), idx);
622                data.loaded_signals.push(varref.clone());
623            }
624        }
625
626        self.sending.run_command(
627            CxxrtlCommand::reference_items {
628                reference: DEFAULT_REFERENCE.to_string(),
629                items: data
630                    .loaded_signals
631                    .iter()
632                    .map(|s| vec![s.cxxrtl_repr()])
633                    .collect(),
634            },
635            |_response, data| {
636                info!("Item references updated");
637                data.invalidate_query_result();
638            },
639        );
640    }
641
642    fn raw_simulation_status(&mut self) -> Option<CxxrtlSimulationStatus> {
643        self.data
644            .simulation_status
645            .fetch_if_needed(|| {
646                self.sending
647                    .run_command(CxxrtlCommand::get_simulation_status, |response, data| {
648                        expect_response!(CommandResponse::get_simulation_status(status), response);
649
650                        data.on_simulation_status_update(status);
651                    });
652            })
653            .map(|s| s.as_ref().clone())
654    }
655
656    pub fn simulation_status(&mut self) -> Option<SimulationStatus> {
657        self.raw_simulation_status().map(|s| match s.status {
658            SimulationStatusType::running => SimulationStatus::Running,
659            SimulationStatusType::paused => SimulationStatus::Paused,
660            SimulationStatusType::finished => SimulationStatus::Finished,
661        })
662    }
663
664    pub fn unpause(&mut self) {
665        let duration = self.raw_simulation_status().map_or_else(
666            || CxxrtlTimestamp::from_femtoseconds(100_000_000u32.to_biguint().unwrap()),
667            |s| {
668                CxxrtlTimestamp::from_femtoseconds(
669                    s.latest_time.as_femtoseconds() + 100_000_000u32.to_biguint().unwrap(),
670                )
671            },
672        );
673
674        let cmd = CxxrtlCommand::run_simulation {
675            until_time: Some(duration),
676            until_diagnostics: vec![],
677            sample_item_values: true,
678        };
679
680        self.sending.run_command(cmd, |_, data| {
681            data.simulation_status = CachedData::filled(CxxrtlSimulationStatus {
682                status: SimulationStatusType::running,
683                latest_time: CxxrtlTimestamp::zero(),
684            });
685            info!("Unpausing simulation");
686        });
687    }
688
689    pub fn pause(&mut self) {
690        self.sending
691            .run_command(CxxrtlCommand::pause_simulation, |response, data| {
692                expect_response!(CommandResponse::pause_simulation { time }, response);
693
694                data.on_simulation_status_update(CxxrtlSimulationStatus {
695                    status: SimulationStatusType::paused,
696                    latest_time: time,
697                });
698            });
699    }
700}