lyquor/
lyquor.rs

1use std::path::Path;
2use std::str::FromStr;
3use std::sync::Arc;
4
5use actix::prelude::*;
6use actix::{Actor, Addr};
7use alloy_node_bindings::{Anvil, AnvilInstance};
8use lyquor_primitives::debug_struct_name;
9use tokio::signal::unix::{SignalKind, signal};
10use tokio::sync::Notify;
11
12use lyquor_api::{
13    actor::Stop,
14    anyhow::{self, Context as _Context},
15    kvstore::{KVStore, Key, PrefixedKVStore, ShadowKVStore},
16    subkey_builder,
17};
18use lyquor_db::{MemDB, RocksDB};
19use lyquor_jsonrpc::client::{ClientConfig, ClientHandle};
20use lyquor_lib::{api, lyquid, utils};
21use lyquor_primitives::{Address, Bytes, LyquidID, LyteLog, RegisterEvent};
22use lyquor_seq::{SequenceBackend, eth, fco::FCO, repo};
23use lyquor_vm::log::Log;
24
25subkey_builder!(RootSubKey(
26    ([0x01])-lyquid() => Key,
27    ([0x02])-fco() => Key,
28    ([0x03])-log() => Key,
29    ([0x04])-event_store() => Key,
30));
31
32fn subkey() -> &'static RootSubKey {
33    static KEY: std::sync::OnceLock<RootSubKey> = std::sync::OnceLock::new();
34    KEY.get_or_init(|| RootSubKey::new(Bytes::new().into()))
35}
36
37const LYQUOR_DB_ROOT: &str = "./lyquor_db";
38const ANVIL_STATE_FILENAME: &str = "anvil.state";
39
40static ANVIL_PID: std::sync::OnceLock<u32> = std::sync::OnceLock::new();
41
42#[derive(Debug, Message)]
43#[rtype(result = "()")]
44struct LoadLyquid {
45    id: LyquidID,
46    deps: Vec<LyquidID>,
47}
48
49#[derive(Debug, Message)]
50#[rtype(result = "()")]
51struct SetPool {
52    pool: Addr<lyquid::LyquidPool>,
53}
54
55struct LyquidDiscovery {
56    pool: Option<Addr<lyquid::LyquidPool>>,
57    register: Arc<Notify>,
58    bartender_id: LyquidID,
59    bartender_addr: Address,
60}
61
62lyquor_primitives::debug_struct_name!(LyquidDiscovery);
63
64impl Handler<lyquor_seq::eth::GetContract> for LyquidDiscovery {
65    type Result = ResponseFuture<Address>;
66    #[tracing::instrument(level = "trace", skip(self, msg, _ctx))]
67    fn handle(&mut self, msg: lyquor_seq::eth::GetContract, _ctx: &mut Context<Self>) -> Self::Result {
68        let lyquor_seq::eth::GetContract { id, idx } = msg;
69        if id == self.bartender_id && idx == 0 {
70            return Box::pin(std::future::ready(self.bartender_addr));
71        }
72        let pool = self.pool.clone().expect("LyquidDiscovery: Pool not set.");
73        let reg = self.register.clone();
74        Box::pin(async move {
75            loop {
76                // FIXME: improve the efficiency of this by filtering the reigster event for
77                // the specific lyquid in bartender
78                match pool.send(crate::lyquid::GetLyquidDeploymentInfo { id, idx }).await {
79                    Ok(response) => match response {
80                        Some(info) => return info.contract,
81                        None => reg.notified().await,
82                    },
83                    Err(e) => {
84                        tracing::error!("Failed to get lyquid deployment info: {}", e);
85                        // Stall forever for safety
86                        std::future::pending::<()>().await;
87                    }
88                }
89            }
90        })
91    }
92}
93
94impl Handler<lyquor_vm::log::LogItem> for LyquidDiscovery {
95    type Result = ();
96
97    #[tracing::instrument(level = "trace", skip(msg))]
98    fn handle(&mut self, msg: lyquor_vm::log::LogItem, ctx: &mut Context<Self>) -> Self::Result {
99        let event: Option<RegisterEvent> = lyquor_primitives::decode_object(&msg.0.data);
100        if let Some(event) = event {
101            ctx.spawn(self.load(event.id, event.deps).into_actor(self));
102        }
103    }
104}
105
106impl Handler<LoadLyquid> for LyquidDiscovery {
107    type Result = ResponseFuture<()>;
108
109    #[tracing::instrument(level = "trace")]
110    fn handle(&mut self, msg: LoadLyquid, _ctx: &mut Context<Self>) -> Self::Result {
111        Box::pin(self.load(msg.id, msg.deps))
112    }
113}
114
115impl Handler<SetPool> for LyquidDiscovery {
116    type Result = ();
117
118    #[tracing::instrument(level = "trace")]
119    fn handle(&mut self, msg: SetPool, ctx: &mut Context<Self>) -> Self::Result {
120        if self.pool.is_some() {
121            return
122        }
123        let pool = msg.pool;
124        let me = ctx.address();
125        self.pool = Some(pool.clone());
126        ctx.spawn(
127            async move {
128                // TODO: right now we just subscribe to every lyquid known in history, need to change this
129                // to user-configured set (or groups)
130                let lyquids_with_deps = match pool.send(crate::lyquid::GetRegisteredLyquidList).await.ok().flatten() {
131                    None => {
132                        // FIXME: this could be triggered when bartender has only executed one slot
133                        // upon recovery. There is a race that before bartender repeat the
134                        // execution of the first slot, its state is without image so this call
135                        // will fail.
136                        tracing::error!("Failed to obtain the registered Lyquid list.");
137                        return
138                    }
139                    Some(lyquids_with_deps) => lyquids_with_deps,
140                };
141                // Now we have the actual dependency list from the bartender
142                for (id, deps) in lyquids_with_deps.into_iter() {
143                    if let Err(e) = me.send(LoadLyquid { id, deps }).await {
144                        tracing::error!("Failed to send LoadLyquid message: {}", e);
145                    }
146                }
147            }
148            .into_actor(self),
149        );
150    }
151}
152
153impl LyquidDiscovery {
154    fn load(&mut self, id: LyquidID, deps: Vec<LyquidID>) -> impl Future<Output = ()> + 'static {
155        let pool = self.pool.clone().expect("LyquidDiscovery: Pool not set.");
156        let register = self.register.clone();
157        async move {
158            match pool.send(crate::lyquid::LoadLyquid { id, deps }).await {
159                Ok(result) => match result {
160                    Ok(already_exists) => {
161                        if !already_exists {
162                            tracing::info!("Discovered {}", id);
163                        }
164                    }
165                    Err(e) => tracing::error!("Failed to register discovered Lyquid: {e}"),
166                },
167                Err(e) => tracing::error!("Failed to send LoadLyquid message: {}", e),
168            }
169            register.notify_waiters();
170        }
171    }
172}
173
174impl Handler<Stop> for LyquidDiscovery {
175    type Result = ();
176
177    #[tracing::instrument(level = "trace")]
178    fn handle(&mut self, _: Stop, ctx: &mut Context<Self>) -> Self::Result {
179        ctx.stop();
180    }
181}
182
183impl Actor for LyquidDiscovery {
184    type Context = actix::Context<Self>;
185
186    #[tracing::instrument(level = "trace", skip(_ctx))]
187    fn started(&mut self, _ctx: &mut Self::Context) {}
188
189    #[tracing::instrument(level = "trace", skip(_ctx))]
190    fn stopped(&mut self, _ctx: &mut Self::Context) {}
191}
192
193struct DBFlusher {
194    last_bn: u64,
195    store: Arc<ShadowKVStore<Box<dyn KVStore>>>,
196    fco: Addr<FCO>,
197    anvil_chain: Option<ClientHandle>,
198    flush_interval_secs: u64,
199}
200
201lyquor_primitives::debug_struct_name!(DBFlusher);
202
203impl DBFlusher {
204    fn prepare_flush(&self) -> impl Future<Output = anyhow::Result<()>> + 'static {
205        let anvil_chain = self.anvil_chain.clone();
206        let fco = self.fco.clone();
207        async move {
208            fco.send(lyquor_seq::fco::Commit)
209                .await
210                .map_err(anyhow::Error::from)
211                .and_then(|e| e.map_err(anyhow::Error::from))?;
212
213            if let Some(client) = &anvil_chain {
214                // If running devnet (anvil), dump chain state.
215                //
216                // We dump the anvil chain state *after* the commit of FCO, so FCO's progress will
217                // not outpace the dumped chain state (i.e., includes execution caused by the
218                // blocks later). Of course, this somehow assume Anvil's API handling has an
219                // ordering (the dump chain state is more recent than that handled previous API
220                // responses).
221                use lyquor_jsonrpc::types::{AnvilDumpState, AnvilDumpStateResp};
222                let dump = client
223                    .request::<AnvilDumpState, AnvilDumpStateResp>(AnvilDumpState)
224                    .await
225                    .map(|r| r.0);
226
227                match dump {
228                    Ok(state) => {
229                        // Write the anvil state to file
230                        let devnet_path = format!("{}/devnet", LYQUOR_DB_ROOT);
231                        std::fs::create_dir_all(&devnet_path)?;
232                        std::fs::write(&format!("{}/{}", devnet_path, ANVIL_STATE_FILENAME), &state)?;
233                    }
234                    Err(_) => {
235                        tracing::error!(
236                            "DBFlusher: Failed to dump state from anvil. This could cause issue due to the loss of local chain state upon next startup."
237                        );
238                    }
239                }
240            }
241
242            Ok(())
243        }
244    }
245
246    fn flush(&mut self, res: anyhow::Result<()>) {
247        if let Err(e) = res.and_then(|_| self.store.flush().map_err(|e| e.into())) {
248            tracing::error!("Failed to write: {e}");
249        } else {
250            tracing::info!("Written to the storage backend (block={}).", self.last_bn);
251        }
252    }
253}
254
255#[derive(Message, Debug)]
256#[rtype(result = "()")]
257struct FlushDB;
258
259impl Handler<FlushDB> for DBFlusher {
260    type Result = ResponseActFuture<Self, ()>;
261
262    #[tracing::instrument(level = "trace", skip(self, _ctx))]
263    fn handle(&mut self, _: FlushDB, _ctx: &mut actix::Context<Self>) -> Self::Result {
264        Box::pin(self.prepare_flush().into_actor(self).map(move |res, actor, _ctx| {
265            actor.flush(res);
266        }))
267    }
268}
269
270impl Handler<Stop> for DBFlusher {
271    type Result = ResponseActFuture<Self, ()>;
272
273    #[tracing::instrument(level = "trace")]
274    fn handle(&mut self, _: Stop, _ctx: &mut actix::Context<Self>) -> Self::Result {
275        Box::pin(self.prepare_flush().into_actor(self).map(move |res, actor, _ctx| {
276            actor.flush(res);
277        }))
278    }
279}
280
281impl Actor for DBFlusher {
282    type Context = actix::Context<Self>;
283
284    #[tracing::instrument(level = "trace", skip_all)]
285    fn started(&mut self, ctx: &mut Self::Context) {
286        if self.flush_interval_secs > 0 {
287            let periodic_write = tokio::time::Duration::from_secs(self.flush_interval_secs);
288            ctx.run_interval(periodic_write, |_act, ctx| {
289                ctx.address().do_send(FlushDB);
290            });
291        }
292    }
293
294    #[tracing::instrument(level = "trace", skip_all)]
295    fn stopped(&mut self, _ctx: &mut Self::Context) {}
296}
297
298async fn setup_eth_backend(
299    profile: &utils::LyquorProfile, reg: lyquor_seq::eth::ContractRegistry, jsonrpc_client: ClientHandle,
300) -> anyhow::Result<eth::Backend> {
301    let finality_tag: lyquor_jsonrpc::types::BlockNumber =
302        serde_json::from_str(profile.finality()).context("Invalid finality tag.")?;
303
304    Ok(eth::Backend::new(
305        eth::Config::builder()
306            .finality_tag(finality_tag)
307            .registry(reg)
308            .blocks_per_request(std::num::NonZeroUsize::new(10).context("Block per request should be >0.")?)
309            .build(),
310        jsonrpc_client,
311    ))
312}
313
314#[derive(Default)]
315struct Subsystems {
316    fco: Option<Addr<FCO>>,
317    log: Option<Addr<Log>>,
318    pool: Option<Addr<lyquid::LyquidPool>>,
319    api: Option<Addr<api::EthAPIServer>>,
320    db: Option<Addr<DBFlusher>>,
321    dis: Option<Addr<LyquidDiscovery>>,
322    client: Option<ClientHandle>,
323    eth_backend: Option<lyquor_seq::eth::Backend>,
324}
325debug_struct_name!(Subsystems);
326
327impl Subsystems {
328    #[tracing::instrument(level = "trace")]
329    async fn stop(&mut self) {
330        if let Some(api) = self.api.take() {
331            api.send(Stop).await.ok();
332        }
333        if let Some(eth_backend) = self.eth_backend.take() {
334            eth_backend.stop().await;
335        }
336        if let Some(log) = self.log.take() {
337            log.send(Stop).await.ok();
338        }
339        if let Some(dis) = self.dis.take() {
340            dis.send(Stop).await.ok();
341        }
342        if let Some(pool) = self.pool.take() {
343            pool.send(Stop).await.ok();
344        }
345        if let Some(db) = self.db.take() {
346            db.send(Stop).await.ok();
347        }
348        if let Some(dis) = self.dis.take() {
349            dis.send(Stop).await.ok();
350        }
351        if let Some(fco) = self.fco.take() {
352            fco.send(Stop).await.ok();
353        }
354    }
355}
356
357struct NodeConfig {
358    mem_db: bool,
359    seq_eth_url: String,
360    api_url: String,
361    profile: utils::LyquorProfile,
362    started_anvil: bool,
363    flush_interval_secs: u64,
364}
365
366/// TODO: refactor this with actix's Actor model.
367struct Node {
368    sys: Subsystems,
369    config: Arc<NodeConfig>,
370    network: Option<lyquor_net::hub::Hub>,
371}
372
373lyquor_primitives::debug_struct_name!(Node);
374
375impl Handler<Stop> for Node {
376    type Result = AtomicResponse<Self, ()>;
377
378    #[tracing::instrument(level = "trace")]
379    fn handle(&mut self, _: Stop, _ctx: &mut Context<Self>) -> Self::Result {
380        let mut sys = std::mem::replace(&mut self.sys, Subsystems::default());
381        AtomicResponse::new(Box::pin(
382            async move {
383                sys.stop().await;
384            }
385            .into_actor(self)
386            .map(|_, _act, ctx| {
387                ctx.stop();
388            }),
389        ))
390    }
391}
392
393impl Actor for Node {
394    type Context = Context<Self>;
395
396    #[tracing::instrument(level = "trace", skip(ctx))]
397    fn started(&mut self, ctx: &mut Self::Context) {
398        // NOTE: use spawn here instead of wait so Stop can be handled
399        ctx.spawn(
400            Self::init(
401                self.config.clone(),
402                self.network.take().expect("Node: Network not found."),
403            )
404            .into_actor(self)
405            .map(|sys, act, ctx| {
406                match sys {
407                    Ok(sys) => {
408                        act.sys = sys;
409                    }
410                    Err(e) => {
411                        tracing::error!("Failed to initialize node: {e:?}");
412                        ctx.stop();
413                    }
414                };
415            }),
416        );
417    }
418
419    #[tracing::instrument(level = "trace", skip(_ctx))]
420    fn stopped(&mut self, _ctx: &mut Self::Context) {}
421}
422
423impl Node {
424    #[tracing::instrument(level = "trace", skip_all)]
425    async fn init(config: Arc<NodeConfig>, network: lyquor_net::hub::Hub) -> anyhow::Result<Subsystems> {
426        let mut sys = Subsystems::default();
427
428        //// 1. DB & Storage
429        let store: Arc<ShadowKVStore<Box<dyn KVStore>>>;
430        let lyq_store: Arc<dyn utils::LVMStoreFactory> = if config.mem_db {
431            tracing::warn!("using MemDB, all data will be lost upon exit");
432            store = Arc::new(ShadowKVStore::new(Box::new(MemDB::new())));
433            Arc::new(utils::LVMMemStore)
434        } else {
435            store = Arc::new(ShadowKVStore::new(Box::new(RocksDB::new(
436                &Path::new(LYQUOR_DB_ROOT).join(config.profile.id()),
437            )?)));
438            Arc::new(utils::LVMDBStore(Arc::new(PrefixedKVStore::new(
439                store.clone(),
440                subkey().lyquid(),
441            ))))
442        };
443        ////
444
445        //// 2. Fate-Constrainted Ordering Subsystem.
446        // Start discovering Lyquid and sequencing contracts.
447        let dis = LyquidDiscovery {
448            pool: None,
449            register: Arc::new(Notify::new()),
450            bartender_id: *config.profile.bartender_id(),
451            bartender_addr: Address::from_str(config.profile.bartender_addr())
452                .context("Invalid bartender address in profile.")?
453                .into(),
454        }
455        .start();
456        sys.dis = Some(dis.clone());
457
458        // Set up Ethereum sequence backend.
459        let jsonrpc_client = ClientConfig::builder()
460            .url(config.seq_eth_url.clone().parse()?)
461            .build()
462            .into_client();
463        sys.client = Some(jsonrpc_client.clone());
464
465        // Load anvil state if we started our own anvil and state file exists
466        let anvil_chain = if config.started_anvil && !config.mem_db {
467            let state_file = format!("{}/{}/{}", LYQUOR_DB_ROOT, config.profile.id(), ANVIL_STATE_FILENAME);
468            if std::path::Path::new(&state_file).exists() {
469                use lyquor_jsonrpc::types::{AnvilLoadState, AnvilLoadStateResp};
470                let state_data = std::fs::read(&state_file)?;
471                let load_result = jsonrpc_client
472                    .request::<AnvilLoadState, AnvilLoadStateResp>(AnvilLoadState(state_data.into()))
473                    .await?;
474                tracing::info!("Loaded anvil state from {}: {}.", state_file, load_result.0);
475            } else {
476                tracing::info!(
477                    "No existing anvil state file found at {}, starting with fresh state.",
478                    state_file
479                );
480            }
481            Some(jsonrpc_client.clone())
482        } else {
483            None
484        };
485
486        let eth_backend = setup_eth_backend(&config.profile, dis.clone().recipient(), jsonrpc_client.clone()).await?;
487
488        let fco_store = PrefixedKVStore::new(store.clone(), subkey().fco());
489        let fco = FCO::new(
490            eth_backend.clone(),
491            Arc::new(fco_store),
492            Some(config.profile.init_chain_position()),
493        )?
494        .start();
495        sys.fco = Some(fco.clone());
496        // Keep a separate reference for shutdown management
497        sys.eth_backend = Some(eth_backend);
498        ////
499
500        //// 3. Lyquor Virtual Machine Subsystem.
501        // Configure Log subsystem.
502        let log_store = PrefixedKVStore::new(store.clone(), subkey().log());
503        let log = Log::new(Arc::new(lyquor_state::DBSimpleStateStore::new(log_store))).start();
504        sys.log = Some(log.clone());
505
506        // Create the top-level LVM object
507        let lvm = lyquid::LVM::new(log.clone(), network, 10, Some(fco.clone().recipient()));
508        let api_subs = api::Subscriptions::new().start();
509        let event_store = Arc::new(lyquor_seq::event_store::EventStore::new(
510            Arc::new(PrefixedKVStore::new(store.clone(), subkey().event_store())),
511            1000,
512        ));
513        ////
514
515        //// 4. Pool of all Lyquids
516        let image_repo_store = RocksDB::new(Path::new("./lyquid_image_db"))?;
517        let image_repo: Arc<dyn repo::LyquidImageRepo> = Arc::new(repo::KVStoreRepo::new(image_repo_store));
518        let bartender_id = *config.profile.bartender_id();
519        let pool = lyquid::LyquidPool::new(
520            bartender_id,
521            lvm,
522            lyquid::LyquidPoolSetup {
523                fco: fco.clone(),
524                store_factory: lyq_store,
525                image_repo,
526                event_store,
527                api_subs: api_subs.clone(),
528                config: lyquid::LyquidConfig::builder().build(),
529            },
530        )
531        .await?
532        .start();
533        sys.pool = Some(pool.clone());
534        ////
535
536        // Configure bartender.
537        let bartender_setup = {
538            let register_topic = LyteLog::tagged_value_topic("Register");
539            match log
540                .send(lyquor_vm::log::GetNumberOfRecords {
541                    id: bartender_id,
542                    topic: register_topic,
543                })
544                .await
545            {
546                Ok(count) if count > 0 => None,
547                Ok(_) => Some(log.send(lyquor_vm::log::WaitNext {
548                    id: bartender_id,
549                    topic: register_topic,
550                })),
551                Err(e) => return Err(e).context("Failed to get number of records during bartender setup."),
552            }
553        };
554
555        // Start the API server - use the same jsonrpc client as sequencer
556        let api = lyquor_lib::api::EthAPIServer::new(
557            &config.api_url,
558            jsonrpc_client.clone(),
559            api_subs,
560            fco.clone(),
561            pool.clone(),
562        )
563        .await?;
564        sys.api = Some(api.start());
565
566        // Wait for bartender to be registered.
567        if let Some(reg) = bartender_setup {
568            tracing::info!("Waiting for bartender to be registered.");
569            reg.await.context("Failed to wait for bartender registration.")?;
570        }
571
572        // Finish up LyquidDiscovery.
573        log.send(lyquor_vm::log::Subscribe {
574            id: bartender_id,
575            topic: LyteLog::tagged_value_topic("Register"),
576            subscriber: dis.clone().recipient(),
577        })
578        .await
579        .context("Failed to subscribe to log system for LyquidDiscovery.")?;
580        dis.send(SetPool { pool: pool.clone() })
581            .await
582            .context("Failed to set pool in LyquidDiscovery.")?;
583
584        tracing::info!("INIT COMPLETE (bartender={bartender_id})");
585
586        // Start a DB writer that persist when node is idle.
587        let db = DBFlusher {
588            last_bn: 0,
589            store,
590            fco,
591            anvil_chain,
592            flush_interval_secs: config.flush_interval_secs,
593        }
594        .start();
595        sys.db = Some(db);
596
597        Ok(sys)
598    }
599}
600
601fn generate_unused_port() -> anyhow::Result<u16> {
602    let listener = std::net::TcpListener::bind("127.0.0.1:0").context("Failed to bind to local address.")?;
603    let port = listener.local_addr().context("Failed to get local address.")?.port();
604    Ok(port)
605}
606
607fn start_devnet_anvil() -> anyhow::Result<AnvilInstance> {
608    let anvil_port = generate_unused_port()?;
609    // Mask the SIGINT signal so anvil process won't terminate on SIGINT (it'll only
610    // terminate when dropped)
611    use nix::sys::signal::{self, SigSet, SigmaskHow, Signal};
612    let mut sigset = SigSet::empty();
613    sigset.add(Signal::SIGINT);
614    let mut old_mask = SigSet::empty();
615    signal::sigprocmask(SigmaskHow::SIG_BLOCK, Some(&sigset), Some(&mut old_mask))
616        .context("Failed to mask SIGINT for anvil.")?;
617
618    let mut anvil = Anvil::new().port(anvil_port).keep_stdout().try_spawn()?;
619
620    let stdout_reader = std::io::BufReader::new(
621        anvil
622            .child_mut()
623            .stdout
624            .take()
625            .context("Failed to read from anvil stdout.")?,
626    );
627
628    tokio::task::spawn_blocking(move || {
629        use std::io::BufRead;
630        let mut stdout_lines = stdout_reader.lines();
631        while let Some(line) = stdout_lines.next() {
632            tracing::debug!(target: "lyquor_anvil", "{}", line.unwrap_or_else(|_| "".to_string()));
633        }
634    });
635
636    tracing::info!("Anvil started at port {}.", anvil_port);
637    signal::sigprocmask(SigmaskHow::SIG_SETMASK, Some(&old_mask), None)
638        .context("Failed to restore old signal mask for anvil.")?;
639
640    Ok(anvil)
641}
642
643#[actix::main]
644async fn main() -> anyhow::Result<()> {
645    // Install panic hook to cleanup anvil on panic
646    std::panic::set_hook(Box::new(|_| {
647        if let Some(&pid) = ANVIL_PID.get() {
648            tracing::warn!("Panic detected, killing anvil process {}", pid);
649            use nix::sys::signal::{Signal, kill};
650            let _ = kill(nix::unistd::Pid::from_raw(pid as i32), Signal::SIGTERM);
651        }
652    }));
653
654    lyquor_cli::setup_tracing()?;
655
656    println!("{}", lyquor_cli::format_logo_banner());
657
658    let matches = clap::command!()
659        .propagate_version(true)
660        .arg(
661            clap::arg!(--"mem-db" "Use in-memory DB simulation (for debugging or some benchmarks).")
662                .required(false)
663                .action(clap::ArgAction::SetTrue),
664        )
665        .arg(
666            clap::arg!(--"api-eth" <IP_AND_PORT> "Serving address for Ethereum API.")
667                .required(false)
668                .default_value("localhost:10087"),
669        )
670        .arg(
671            clap::arg!(--"upc-addr" <IP_AND_PORT> "Serving address for UPC.")
672                .required(false)
673                .default_value("localhost:10080"),
674        )
675        .arg(
676            clap::arg!(--network <NETWORK> "Select the network to run. (Available: devnet/testnet/mainnet)")
677                .required(false)
678                .default_value("devnet"),
679        )
680        .arg(
681            clap::arg!(--"force-seq-eth" <WS_URL> "Override the WebSocket url for Ethereum API Sequencer.")
682                .required(false),
683        )
684        .arg(
685            clap::arg!(--"flush-interval" <SECONDS> "Interval in seconds for database flushing (0 to disable).")
686                .required(false)
687                .value_parser(clap::value_parser!(u64)),
688        )
689        .get_matches();
690
691    // TODO: configure these parameters properly
692    let (_, tls_config) = lyquor_tls::generator::single_node_config();
693    let addr = matches.get_one::<String>("upc-addr").unwrap();
694    let mut network = lyquor_net::hub::HubBuilder::new(tls_config)
695        .listen_addr(addr.clone())
696        .build()
697        .context("Failed to setup network.")?;
698    network.start().await.context("Failed to start network.")?;
699
700    let mut _anvil: Option<AnvilInstance> = None;
701    let network_name = matches.get_one::<String>("network").map(|s| s.as_str());
702    let profile: utils::LyquorProfile = match network_name {
703        Some("devnet") => {
704            let ws_url = match matches.get_one::<String>("force-seq-eth") {
705                None => {
706                    let anvil = start_devnet_anvil()?;
707                    // Store anvil PID for panic hook
708                    ANVIL_PID.set(anvil.child().id()).ok();
709                    let ep = anvil.ws_endpoint();
710                    _anvil = Some(anvil);
711                    ep
712                }
713                Some(url) => url.to_string(),
714            };
715
716            utils::Devnet::new(ws_url).into()
717        }
718        _ => return Err(anyhow::anyhow!("Network is not supported.")),
719    };
720    let seq_eth_url = matches
721        .get_one::<String>("force-seq-eth")
722        .cloned()
723        .unwrap_or_else(|| profile.sequencer().to_string());
724    let api_url = matches.get_one::<String>("api-eth").cloned().unwrap();
725    let mem_db = matches.get_flag("mem-db");
726    let flush_interval_secs = matches
727        .get_one::<u64>("flush-interval")
728        .copied()
729        .unwrap_or_else(|| match network_name {
730            Some("devnet") => 0,
731            _ => 10,
732        });
733
734    // Signal handlers should be registered before starting any actors, so they'll be stopped
735    // gracefully.
736    let mut sigint = signal(SignalKind::interrupt()).context("Failed to register SIGINT handler")?;
737    let mut sigterm = signal(SignalKind::terminate()).context("Failed to register SIGTERM handler")?;
738
739    let node = Node {
740        sys: Subsystems::default(),
741        config: Arc::new(NodeConfig {
742            mem_db,
743            seq_eth_url,
744            api_url,
745            profile,
746            started_anvil: _anvil.is_some(),
747            flush_interval_secs,
748        }),
749        network: Some(network),
750    }
751    .start();
752
753    tokio::select! {
754        _ = sigint.recv() => {
755            tracing::info!("received SIGINT");
756        }
757        _ = sigterm.recv() => {
758            tracing::info!("received SIGTERM");
759        }
760    }
761
762    Ok(node.send(Stop).await?)
763}