lyquor/
lyquor.rs

1use std::path::Path;
2use std::str::FromStr;
3use std::sync::Arc;
4
5use actix::prelude::*;
6use actix::{Actor, Addr};
7use alloy_node_bindings::{Anvil, AnvilInstance};
8use lyquor_primitives::{LyquidNumber, debug_struct_name};
9use tokio::signal::unix::{SignalKind, signal};
10use tokio::sync::Notify;
11
12use lyquor_api::{
13    actor::Stop,
14    anyhow::{self, Context as _Context},
15    kvstore::{KVStore, Key, PrefixedKVStore, ShadowKVStore},
16    subkey_builder,
17};
18use lyquor_crypto::{EvmSigner, LocalEvmSigner, ProdEvmSigner};
19use lyquor_db::{MemDB, RocksDB};
20use lyquor_jsonrpc::client::{ClientConfig, ClientHandle};
21use lyquor_lib::{api, lyquid, utils};
22use lyquor_primitives::{Address, Bytes, LyquidID, LyteLog, Network as OracleNetwork, RegisterEvent};
23use lyquor_seq::{SequenceBackend, eth, fco::FCO, repo};
24use lyquor_vm::log::Log;
25
26subkey_builder!(RootSubKey(
27    ([0x01])-lyquid() => Key,
28    ([0x02])-fco() => Key,
29    ([0x03])-log() => Key,
30    ([0x04])-event_store() => Key,
31));
32
33fn subkey() -> &'static RootSubKey {
34    static KEY: std::sync::OnceLock<RootSubKey> = std::sync::OnceLock::new();
35    KEY.get_or_init(|| RootSubKey::new(Bytes::new().into()))
36}
37
38const LYQUOR_DB_ROOT: &str = "./lyquor_db";
39const ANVIL_STATE_FILENAME: &str = "anvil.state";
40
41static ANVIL_PID: std::sync::OnceLock<u32> = std::sync::OnceLock::new();
42
43#[derive(Debug, Message)]
44#[rtype(result = "()")]
45struct LoadLyquid {
46    id: LyquidID,
47    deps: Vec<LyquidID>,
48}
49
50#[derive(Debug, Message)]
51#[rtype(result = "()")]
52struct SetPool {
53    pool: Addr<lyquid::LyquidPool>,
54}
55
56struct LyquidDiscovery {
57    pool: Option<Addr<lyquid::LyquidPool>>,
58    register: Arc<Notify>,
59    bartender_id: LyquidID,
60    bartender_addr: Address,
61}
62
63lyquor_primitives::debug_struct_name!(LyquidDiscovery);
64
65impl Handler<lyquor_seq::eth::GetContract> for LyquidDiscovery {
66    type Result = ResponseFuture<Address>;
67    #[tracing::instrument(level = "trace", skip(self, msg, _ctx))]
68    fn handle(&mut self, msg: lyquor_seq::eth::GetContract, _ctx: &mut Context<Self>) -> Self::Result {
69        let lyquor_seq::eth::GetContract { id, idx } = msg;
70        if id == self.bartender_id && idx == Some(0) {
71            return Box::pin(std::future::ready(self.bartender_addr));
72        }
73        let pool = self.pool.clone().expect("LyquidDiscovery: Pool not set.");
74        let reg = self.register.clone();
75        Box::pin(async move {
76            loop {
77                // FIXME: improve the efficiency of this by filtering the reigster event for
78                // the specific lyquid in bartender
79                // idx=None means latest; else nth deployment
80                if let Some(n) = idx {
81                    match pool.send(crate::lyquid::GetLyquidDeploymentInfo { id, idx: n }).await {
82                        Ok(Some(info)) => return info.contract,
83                        Ok(None) => reg.notified().await,
84                        Err(e) => {
85                            tracing::error!("Failed to get lyquid deployment info: {}", e);
86                            // Stall forever for safety
87                            std::future::pending::<()>().await;
88                        }
89                    }
90                } else {
91                    match pool.send(crate::lyquid::GetLatestLyquidInfo { id }).await {
92                        Ok(Some(info)) => return info.contract,
93                        Ok(None) => reg.notified().await,
94                        Err(e) => {
95                            tracing::error!("Failed to get latest lyquid info: {}", e);
96                            std::future::pending::<()>().await;
97                        }
98                    }
99                }
100            }
101        })
102    }
103}
104
105// latest variant handled in GetContract via idx=None
106
107impl Handler<lyquor_vm::log::LogItem> for LyquidDiscovery {
108    type Result = ();
109
110    #[tracing::instrument(level = "trace", skip(msg))]
111    fn handle(&mut self, msg: lyquor_vm::log::LogItem, ctx: &mut Context<Self>) -> Self::Result {
112        let event: Option<RegisterEvent> = lyquor_primitives::decode_object(&msg.0.data);
113        if let Some(event) = event {
114            ctx.spawn(self.load(event.id, event.deps).into_actor(self));
115        }
116    }
117}
118
119impl Handler<LoadLyquid> for LyquidDiscovery {
120    type Result = ResponseFuture<()>;
121
122    #[tracing::instrument(level = "trace")]
123    fn handle(&mut self, msg: LoadLyquid, _ctx: &mut Context<Self>) -> Self::Result {
124        Box::pin(self.load(msg.id, msg.deps))
125    }
126}
127
128impl Handler<SetPool> for LyquidDiscovery {
129    type Result = ();
130
131    #[tracing::instrument(level = "trace")]
132    fn handle(&mut self, msg: SetPool, ctx: &mut Context<Self>) -> Self::Result {
133        if self.pool.is_some() {
134            return
135        }
136        let pool = msg.pool;
137        let me = ctx.address();
138        self.pool = Some(pool.clone());
139        ctx.spawn(
140            async move {
141                // TODO: right now we just subscribe to every lyquid known in history, need to change this
142                // to user-configured set (or groups)
143                let lyquids_with_deps = match pool.send(crate::lyquid::GetRegisteredLyquidList).await.ok().flatten() {
144                    None => {
145                        // FIXME: this could be triggered when bartender has only executed one slot
146                        // upon recovery. There is a race that before bartender repeat the
147                        // execution of the first slot, its state is without image so this call
148                        // will fail.
149                        tracing::error!("Failed to obtain the registered Lyquid list.");
150                        return
151                    }
152                    Some(lyquids_with_deps) => lyquids_with_deps,
153                };
154                // Now we have the actual dependency list from the bartender
155                for (id, deps) in lyquids_with_deps.into_iter() {
156                    if let Err(e) = me.send(LoadLyquid { id, deps }).await {
157                        tracing::error!("Failed to send LoadLyquid message: {}", e);
158                    }
159                }
160            }
161            .into_actor(self),
162        );
163    }
164}
165
166impl LyquidDiscovery {
167    fn load(&mut self, id: LyquidID, deps: Vec<LyquidID>) -> impl Future<Output = ()> + 'static {
168        let pool = self.pool.clone().expect("LyquidDiscovery: Pool not set.");
169        let register = self.register.clone();
170        async move {
171            match pool.send(crate::lyquid::LoadLyquid { id, deps }).await {
172                Ok(result) => match result {
173                    Ok(already_exists) => {
174                        if !already_exists {
175                            tracing::info!("Discovered {}", id);
176                        }
177                    }
178                    Err(e) => tracing::error!("Failed to register discovered Lyquid: {e}"),
179                },
180                Err(e) => tracing::error!("Failed to send LoadLyquid message: {}", e),
181            }
182            register.notify_waiters();
183        }
184    }
185}
186
187impl Handler<Stop> for LyquidDiscovery {
188    type Result = ();
189
190    #[tracing::instrument(level = "trace")]
191    fn handle(&mut self, _: Stop, ctx: &mut Context<Self>) -> Self::Result {
192        ctx.stop();
193    }
194}
195
196impl Actor for LyquidDiscovery {
197    type Context = actix::Context<Self>;
198
199    #[tracing::instrument(level = "trace", skip(_ctx))]
200    fn started(&mut self, _ctx: &mut Self::Context) {}
201
202    #[tracing::instrument(level = "trace", skip(_ctx))]
203    fn stopped(&mut self, _ctx: &mut Self::Context) {}
204}
205
206struct DBFlusher {
207    last_bn: u64,
208    store: Arc<ShadowKVStore<Box<dyn KVStore>>>,
209    fco: Addr<FCO>,
210    anvil_chain: Option<ClientHandle>,
211    flush_interval_secs: u64,
212}
213
214lyquor_primitives::debug_struct_name!(DBFlusher);
215
216impl DBFlusher {
217    fn prepare_flush(&self) -> impl Future<Output = anyhow::Result<()>> + 'static {
218        let anvil_chain = self.anvil_chain.clone();
219        let fco = self.fco.clone();
220        async move {
221            fco.send(lyquor_seq::fco::Commit)
222                .await
223                .map_err(anyhow::Error::from)
224                .and_then(|e| e.map_err(anyhow::Error::from))?;
225
226            if let Some(client) = &anvil_chain {
227                // If running devnet (anvil), dump chain state.
228                //
229                // We dump the anvil chain state *after* the commit of FCO, so FCO's progress will
230                // not outpace the dumped chain state (i.e., includes execution caused by the
231                // blocks later). Of course, this somehow assume Anvil's API handling has an
232                // ordering (the dump chain state is more recent than that handled previous API
233                // responses).
234                use lyquor_jsonrpc::types::{AnvilDumpState, AnvilDumpStateResp};
235                let dump = client
236                    .request::<AnvilDumpState, AnvilDumpStateResp>(AnvilDumpState)
237                    .await
238                    .map(|r| r.0);
239
240                match dump {
241                    Ok(state) => {
242                        // Write the anvil state to file
243                        let devnet_path = format!("{}/devnet", LYQUOR_DB_ROOT);
244                        std::fs::create_dir_all(&devnet_path)?;
245                        std::fs::write(&format!("{}/{}", devnet_path, ANVIL_STATE_FILENAME), &state)?;
246                    }
247                    Err(_) => {
248                        tracing::error!(
249                            "DBFlusher: Failed to dump state from anvil. This could cause issue due to the loss of local chain state upon next startup."
250                        );
251                    }
252                }
253            }
254
255            Ok(())
256        }
257    }
258
259    fn flush(&mut self, res: anyhow::Result<()>) {
260        if let Err(e) = res.and_then(|_| self.store.flush().map_err(|e| e.into())) {
261            tracing::error!("Failed to write: {e}");
262        } else {
263            tracing::info!("Written to the storage backend (block={}).", self.last_bn);
264        }
265    }
266}
267
268#[derive(Message, Debug)]
269#[rtype(result = "()")]
270struct FlushDB;
271
272impl Handler<FlushDB> for DBFlusher {
273    type Result = ResponseActFuture<Self, ()>;
274
275    #[tracing::instrument(level = "trace", skip(self, _ctx))]
276    fn handle(&mut self, _: FlushDB, _ctx: &mut actix::Context<Self>) -> Self::Result {
277        Box::pin(self.prepare_flush().into_actor(self).map(move |res, actor, _ctx| {
278            actor.flush(res);
279        }))
280    }
281}
282
283impl Handler<Stop> for DBFlusher {
284    type Result = ResponseActFuture<Self, ()>;
285
286    #[tracing::instrument(level = "trace")]
287    fn handle(&mut self, _: Stop, _ctx: &mut actix::Context<Self>) -> Self::Result {
288        Box::pin(self.prepare_flush().into_actor(self).map(move |res, actor, _ctx| {
289            actor.flush(res);
290        }))
291    }
292}
293
294impl Actor for DBFlusher {
295    type Context = actix::Context<Self>;
296
297    #[tracing::instrument(level = "trace", skip_all)]
298    fn started(&mut self, ctx: &mut Self::Context) {
299        if self.flush_interval_secs > 0 {
300            let periodic_write = tokio::time::Duration::from_secs(self.flush_interval_secs);
301            ctx.run_interval(periodic_write, |_act, ctx| {
302                ctx.address().do_send(FlushDB);
303            });
304        }
305    }
306
307    #[tracing::instrument(level = "trace", skip_all)]
308    fn stopped(&mut self, _ctx: &mut Self::Context) {}
309}
310
311async fn setup_eth_backend(
312    profile: &utils::LyquorProfile, reg: lyquor_seq::eth::ContractRegistry, jsonrpc_client: ClientHandle,
313) -> anyhow::Result<eth::Backend> {
314    let finality_tag: lyquor_jsonrpc::types::BlockNumber =
315        serde_json::from_str(profile.finality()).context("Invalid finality tag.")?;
316
317    Ok(eth::Backend::new(
318        eth::Config::builder()
319            .finality_tag(finality_tag)
320            .registry(reg)
321            .blocks_per_request(std::num::NonZeroUsize::new(10).context("Block per request should be >0.")?)
322            .build(),
323        jsonrpc_client,
324    ))
325}
326
327#[derive(Default)]
328struct Subsystems {
329    fco: Option<Addr<FCO>>,
330    log: Option<Addr<Log>>,
331    pool: Option<Addr<lyquid::LyquidPool>>,
332    api: Option<Addr<api::EthAPIServer>>,
333    db: Option<Addr<DBFlusher>>,
334    dis: Option<Addr<LyquidDiscovery>>,
335    client: Option<ClientHandle>,
336    eth_backend: Option<lyquor_seq::eth::Backend>,
337}
338debug_struct_name!(Subsystems);
339
340impl Subsystems {
341    #[tracing::instrument(level = "trace")]
342    async fn stop(&mut self) {
343        if let Some(api) = self.api.take() {
344            api.send(Stop).await.ok();
345        }
346        if let Some(eth_backend) = self.eth_backend.take() {
347            eth_backend.stop().await;
348        }
349        if let Some(log) = self.log.take() {
350            log.send(Stop).await.ok();
351        }
352        if let Some(dis) = self.dis.take() {
353            dis.send(Stop).await.ok();
354        }
355        if let Some(pool) = self.pool.take() {
356            pool.send(Stop).await.ok();
357        }
358        if let Some(db) = self.db.take() {
359            db.send(Stop).await.ok();
360        }
361        if let Some(dis) = self.dis.take() {
362            dis.send(Stop).await.ok();
363        }
364        if let Some(fco) = self.fco.take() {
365            fco.send(Stop).await.ok();
366        }
367    }
368}
369
370struct NodeConfig {
371    mem_db: bool,
372    seq_eth_url: String,
373    api_url: String,
374    profile: utils::LyquorProfile,
375    started_anvil: bool,
376    flush_interval_secs: u64,
377}
378
379/// TODO: refactor this with actix's Actor model.
380struct Node {
381    sys: Subsystems,
382    config: Arc<NodeConfig>,
383    network: Option<lyquor_net::hub::Hub>,
384}
385
386lyquor_primitives::debug_struct_name!(Node);
387
388impl Handler<Stop> for Node {
389    type Result = AtomicResponse<Self, ()>;
390
391    #[tracing::instrument(level = "trace")]
392    fn handle(&mut self, _: Stop, _ctx: &mut Context<Self>) -> Self::Result {
393        let mut sys = std::mem::replace(&mut self.sys, Subsystems::default());
394        AtomicResponse::new(Box::pin(
395            async move {
396                sys.stop().await;
397            }
398            .into_actor(self)
399            .map(|_, _act, ctx| {
400                ctx.stop();
401            }),
402        ))
403    }
404}
405
406impl Actor for Node {
407    type Context = Context<Self>;
408
409    #[tracing::instrument(level = "trace", skip(ctx))]
410    fn started(&mut self, ctx: &mut Self::Context) {
411        // NOTE: use spawn here instead of wait so Stop can be handled
412        ctx.spawn(
413            Self::init(
414                self.config.clone(),
415                self.network.take().expect("Node: Network not found."),
416            )
417            .into_actor(self)
418            .map(|sys, act, ctx| {
419                match sys {
420                    Ok(sys) => {
421                        act.sys = sys;
422                    }
423                    Err(e) => {
424                        tracing::error!("Failed to initialize node: {e:?}");
425                        ctx.stop();
426                    }
427                };
428            }),
429        );
430    }
431
432    #[tracing::instrument(level = "trace", skip(_ctx))]
433    fn stopped(&mut self, _ctx: &mut Self::Context) {}
434}
435
436impl Node {
437    #[tracing::instrument(level = "trace", skip_all)]
438    async fn init(config: Arc<NodeConfig>, network: lyquor_net::hub::Hub) -> anyhow::Result<Subsystems> {
439        let mut sys = Subsystems::default();
440
441        //// 1. DB & Storage
442        let store: Arc<ShadowKVStore<Box<dyn KVStore>>>;
443        let lyq_store: Arc<dyn utils::LVMStoreFactory> = if config.mem_db {
444            tracing::warn!("using MemDB, all data will be lost upon exit");
445            store = Arc::new(ShadowKVStore::new(Box::new(MemDB::new())));
446            Arc::new(utils::LVMMemStore)
447        } else {
448            store = Arc::new(ShadowKVStore::new(Box::new(RocksDB::new(
449                &Path::new(LYQUOR_DB_ROOT).join(config.profile.id()),
450            )?)));
451            Arc::new(utils::LVMDBStore(Arc::new(PrefixedKVStore::new(
452                store.clone(),
453                subkey().lyquid(),
454            ))))
455        };
456        ////
457
458        //// 2. Fate-Constrainted Ordering Subsystem.
459        // Start discovering Lyquid and sequencing contracts.
460        let dis = LyquidDiscovery {
461            pool: None,
462            register: Arc::new(Notify::new()),
463            bartender_id: *config.profile.bartender_id(),
464            bartender_addr: Address::from_str(config.profile.bartender_addr())
465                .context("Invalid bartender address in profile.")?
466                .into(),
467        }
468        .start();
469        sys.dis = Some(dis.clone());
470
471        // Set up Ethereum sequence backend.
472        let jsonrpc_client = ClientConfig::builder()
473            .url(config.seq_eth_url.clone().parse()?)
474            .build()
475            .into_client();
476        sys.client = Some(jsonrpc_client.clone());
477
478        // Load anvil state if we started our own anvil and state file exists
479        let anvil_chain = if config.started_anvil && !config.mem_db {
480            let state_file = format!("{}/{}/{}", LYQUOR_DB_ROOT, config.profile.id(), ANVIL_STATE_FILENAME);
481            if std::path::Path::new(&state_file).exists() {
482                use lyquor_jsonrpc::types::{AnvilLoadState, AnvilLoadStateResp};
483                let state_data = std::fs::read(&state_file)?;
484                let load_result = jsonrpc_client
485                    .request::<AnvilLoadState, AnvilLoadStateResp>(AnvilLoadState(state_data.into()))
486                    .await?;
487                tracing::info!("Loaded anvil state from {}: {}.", state_file, load_result.0);
488            } else {
489                tracing::info!(
490                    "No existing anvil state file found at {}, starting with fresh state.",
491                    state_file
492                );
493            }
494            Some(jsonrpc_client.clone())
495        } else {
496            None
497        };
498
499        let eth_backend = setup_eth_backend(&config.profile, dis.clone().recipient(), jsonrpc_client.clone()).await?;
500
501        let fco_store = PrefixedKVStore::new(store.clone(), subkey().fco());
502        let fco = FCO::new(
503            eth_backend.clone(),
504            Arc::new(fco_store),
505            Some(config.profile.init_chain_position()),
506        )?
507        .start();
508        sys.fco = Some(fco.clone());
509        // Keep a separate reference for shutdown management
510        sys.eth_backend = Some(eth_backend);
511        ////
512
513        //// 3. Lyquor Virtual Machine Subsystem.
514        // Configure Log subsystem.
515        let log_store = PrefixedKVStore::new(store.clone(), subkey().log());
516        let log = Log::new(Arc::new(lyquor_state::DBSimpleStateStore::new(log_store))).start();
517        sys.log = Some(log.clone());
518
519        // Select oracle ECDSA signer profile based on the configured network.
520        let oracle_network = match config.profile.id() {
521            "devnet" => OracleNetwork::Devnet,
522            "testnet" => OracleNetwork::Testnet,
523            "mainnet" => OracleNetwork::Mainnet,
524            _ => OracleNetwork::Devnet,
525        };
526
527        let evm_signer: Arc<dyn EvmSigner> = match oracle_network {
528            OracleNetwork::Devnet => Arc::new(LocalEvmSigner::new(std::iter::once(network.get_id()))),
529            OracleNetwork::Testnet | OracleNetwork::Mainnet => Arc::new(ProdEvmSigner),
530        };
531
532        let lvm = lyquid::LVM::new(
533            log.clone(),
534            network,
535            10,
536            Some(lyquor_vm::InterOps {
537                inter: fco.clone().recipient(),
538                submit: fco.clone().recipient(),
539            }),
540            evm_signer,
541        );
542        let api_subs = api::Subscriptions::new().start();
543        let event_store = Arc::new(lyquor_seq::event_store::EventStore::new(
544            Arc::new(PrefixedKVStore::new(store.clone(), subkey().event_store())),
545            1000,
546        ));
547        ////
548
549        //// 4. Pool of all Lyquids
550        let image_repo_store = RocksDB::new(Path::new("./lyquid_image_db"))?;
551        let image_repo: Arc<dyn repo::LyquidImageRepo> = Arc::new(repo::KVStoreRepo::new(image_repo_store));
552        let bartender_id = *config.profile.bartender_id();
553        let pool = lyquid::LyquidPool::new(
554            bartender_id,
555            lvm,
556            lyquid::LyquidPoolSetup {
557                fco: fco.clone(),
558                store_factory: lyq_store,
559                image_repo,
560                event_store,
561                api_subs: api_subs.clone(),
562                config: lyquid::LyquidConfig::builder().build(),
563            },
564        )
565        .await?
566        .start();
567        sys.pool = Some(pool.clone());
568        ////
569
570        // Configure bartender.
571        let bartender_setup = {
572            let register_topic = LyteLog::tagged_value_topic("Register");
573            match log
574                .send(lyquor_vm::log::GetNumberOfRecords {
575                    id: bartender_id,
576                    topic: register_topic,
577                })
578                .await
579            {
580                Ok(count) if count > 0 => None,
581                Ok(_) => Some(log.send(lyquor_vm::log::WaitNext {
582                    id: bartender_id,
583                    topic: register_topic,
584                })),
585                Err(e) => return Err(e).context("Failed to get number of records during bartender setup."),
586            }
587        };
588
589        // Start the API server - use the same jsonrpc client as sequencer
590        let api = lyquor_lib::api::EthAPIServer::new(
591            &config.api_url,
592            jsonrpc_client.clone(),
593            api_subs,
594            fco.clone(),
595            pool.clone(),
596        )
597        .await?;
598        sys.api = Some(api.start());
599
600        // Wait for bartender to be registered.
601        if let Some(reg) = bartender_setup {
602            tracing::info!("Waiting for bartender to be registered.");
603            reg.await.context("Failed to wait for bartender registration.")?;
604        }
605        // Bartender, as a normal Lyquid, may have some re-execution, let's make sure it is done.
606        let bartender_ins = pool
607            .send(lyquid::GetBartender)
608            .await
609            .expect("Failed to obtain bartender.");
610        let bartender_ln = bartender_ins
611            .instance()
612            .read()
613            .max_number()
614            .unwrap_or(LyquidNumber::ZERO);
615        tracing::debug!("Waiting for bartender to reach {bartender_ln}.");
616        bartender_ins
617            .wait_until(bartender_ln)
618            .await
619            .context("Failed to wait for the initial re-execution of bartender.")?;
620
621        // Finish up LyquidDiscovery.
622        log.send(lyquor_vm::log::Subscribe {
623            id: bartender_id,
624            topic: LyteLog::tagged_value_topic("Register"),
625            subscriber: dis.clone().recipient(),
626        })
627        .await
628        .context("Failed to subscribe to log system for LyquidDiscovery.")?;
629        dis.send(SetPool { pool: pool.clone() })
630            .await
631            .context("Failed to set pool in LyquidDiscovery.")?;
632
633        tracing::info!("INIT COMPLETE (bartender={bartender_id})");
634
635        // Start a DB writer that persist when node is idle.
636        let db = DBFlusher {
637            last_bn: 0,
638            store,
639            fco,
640            anvil_chain,
641            flush_interval_secs: config.flush_interval_secs,
642        }
643        .start();
644        sys.db = Some(db);
645
646        Ok(sys)
647    }
648}
649
650fn generate_unused_port() -> anyhow::Result<u16> {
651    let listener = std::net::TcpListener::bind("127.0.0.1:0").context("Failed to bind to local address.")?;
652    let port = listener.local_addr().context("Failed to get local address.")?.port();
653    Ok(port)
654}
655
656fn start_devnet_anvil() -> anyhow::Result<AnvilInstance> {
657    let anvil_port = generate_unused_port()?;
658    // Mask the SIGINT signal so anvil process won't terminate on SIGINT (it'll only
659    // terminate when dropped)
660    use nix::sys::signal::{self, SigSet, SigmaskHow, Signal};
661    let mut sigset = SigSet::empty();
662    sigset.add(Signal::SIGINT);
663    let mut old_mask = SigSet::empty();
664    signal::sigprocmask(SigmaskHow::SIG_BLOCK, Some(&sigset), Some(&mut old_mask))
665        .context("Failed to mask SIGINT for anvil.")?;
666
667    let mut anvil = Anvil::new().port(anvil_port).keep_stdout().try_spawn()?;
668
669    let stdout_reader = std::io::BufReader::new(
670        anvil
671            .child_mut()
672            .stdout
673            .take()
674            .context("Failed to read from anvil stdout.")?,
675    );
676
677    tokio::task::spawn_blocking(move || {
678        use std::io::BufRead;
679        let mut stdout_lines = stdout_reader.lines();
680        while let Some(line) = stdout_lines.next() {
681            tracing::debug!(target: "lyquor_anvil", "{}", line.unwrap_or_else(|_| "".to_string()));
682        }
683    });
684
685    tracing::info!("Anvil started at port {}.", anvil_port);
686    signal::sigprocmask(SigmaskHow::SIG_SETMASK, Some(&old_mask), None)
687        .context("Failed to restore old signal mask for anvil.")?;
688
689    Ok(anvil)
690}
691
692#[actix::main]
693async fn main() -> anyhow::Result<()> {
694    // Install panic hook to cleanup anvil on panic
695    std::panic::set_hook(Box::new(|_| {
696        if let Some(&pid) = ANVIL_PID.get() {
697            tracing::warn!("Panic detected, killing anvil process {}", pid);
698            use nix::sys::signal::{Signal, kill};
699            let _ = kill(nix::unistd::Pid::from_raw(pid as i32), Signal::SIGTERM);
700        }
701    }));
702
703    lyquor_cli::setup_tracing()?;
704
705    println!("{}", lyquor_cli::format_logo_banner());
706
707    let matches = clap::command!()
708        .propagate_version(true)
709        .arg(
710            clap::arg!(--"mem-db" "Use in-memory DB simulation (for debugging or some benchmarks).")
711                .required(false)
712                .action(clap::ArgAction::SetTrue),
713        )
714        .arg(
715            clap::arg!(--"api-eth" <IP_AND_PORT> "Serving address for Ethereum API.")
716                .required(false)
717                .default_value("localhost:10087"),
718        )
719        .arg(
720            clap::arg!(--"upc-addr" <IP_AND_PORT> "Serving address for UPC.")
721                .required(false)
722                .default_value("localhost:10080"),
723        )
724        .arg(
725            clap::arg!(--network <NETWORK> "Select the network to run. (Available: devnet/testnet/mainnet)")
726                .required(false)
727                .default_value("devnet"),
728        )
729        .arg(
730            clap::arg!(--"force-seq-eth" <WS_URL> "Override the WebSocket url for Ethereum API Sequencer.")
731                .required(false),
732        )
733        .arg(
734            clap::arg!(--"flush-interval" <SECONDS> "Interval in seconds for database flushing (0 to disable).")
735                .required(false)
736                .value_parser(clap::value_parser!(u64)),
737        )
738        .get_matches();
739
740    // TODO: configure these parameters properly
741    let (_, tls_config) = lyquor_tls::generator::single_node_config();
742    let addr = matches.get_one::<String>("upc-addr").unwrap();
743    let mut network = lyquor_net::hub::HubBuilder::new(tls_config)
744        .listen_addr(addr.clone())
745        .build()
746        .context("Failed to setup network.")?;
747    network.start().await.context("Failed to start network.")?;
748
749    let mut _anvil: Option<AnvilInstance> = None;
750    let network_name = matches.get_one::<String>("network").map(|s| s.as_str());
751    let profile: utils::LyquorProfile = match network_name {
752        Some("devnet") => {
753            let ws_url = match matches.get_one::<String>("force-seq-eth") {
754                None => {
755                    let anvil = start_devnet_anvil()?;
756                    // Store anvil PID for panic hook
757                    ANVIL_PID.set(anvil.child().id()).ok();
758                    let ep = anvil.ws_endpoint();
759                    _anvil = Some(anvil);
760                    ep
761                }
762                Some(url) => url.to_string(),
763            };
764
765            utils::Devnet::new(ws_url).into()
766        }
767        _ => return Err(anyhow::anyhow!("Network is not supported.")),
768    };
769    let seq_eth_url = matches
770        .get_one::<String>("force-seq-eth")
771        .cloned()
772        .unwrap_or_else(|| profile.sequencer().to_string());
773    let api_url = matches.get_one::<String>("api-eth").cloned().unwrap();
774    let mem_db = matches.get_flag("mem-db");
775    let flush_interval_secs = matches
776        .get_one::<u64>("flush-interval")
777        .copied()
778        .unwrap_or_else(|| match network_name {
779            Some("devnet") => 0,
780            _ => 10,
781        });
782
783    // Signal handlers should be registered before starting any actors, so they'll be stopped
784    // gracefully.
785    let mut sigint = signal(SignalKind::interrupt()).context("Failed to register SIGINT handler")?;
786    let mut sigterm = signal(SignalKind::terminate()).context("Failed to register SIGTERM handler")?;
787
788    let node = Node {
789        sys: Subsystems::default(),
790        config: Arc::new(NodeConfig {
791            mem_db,
792            seq_eth_url,
793            api_url,
794            profile,
795            started_anvil: _anvil.is_some(),
796            flush_interval_secs,
797        }),
798        network: Some(network),
799    }
800    .start();
801
802    tokio::select! {
803        _ = sigint.recv() => {
804            tracing::info!("received SIGINT");
805        }
806        _ = sigterm.recv() => {
807            tracing::info!("received SIGTERM");
808        }
809    }
810
811    Ok(node.send(Stop).await?)
812}