lyquor/
lyquor.rs

1use std::path::PathBuf;
2use std::str::FromStr;
3use std::sync::Arc;
4
5use actix::prelude::*;
6use actix::{Actor, Addr};
7use alloy_node_bindings::{Anvil, AnvilInstance};
8use lyquor_lib::lyquid::LyquidPool;
9use lyquor_oci::oci::{LYQUID_PACK_ASSET_TYPE_VALUE_LYQUID, OCIReference, OCIRegistry, OCIRegistryAuth};
10use lyquor_oci::pack::LyquidPackDigest;
11use lyquor_oci::registry::{PulledContent, Registry};
12use lyquor_primitives::{LyquidNumber, debug_struct_name};
13use serde_json::{Map, Value};
14use tokio::signal::unix::{SignalKind, signal};
15use tokio::sync::Notify;
16
17use lyquor_api::{
18    actor::Stop,
19    anyhow::{self, Context as _Context},
20    config::{DB, NodeConfig},
21    kvstore::{KVStore, Key, PrefixedKVStore, ShadowKVStore},
22    profile::{Devnet, LyquorProfile, NetworkType},
23    subkey_builder,
24};
25use lyquor_db::{MemDB, RocksDB};
26use lyquor_jsonrpc::client::{ClientConfig, ClientHandle};
27use lyquor_lib::{api, lyquid, utils};
28use lyquor_primitives::{Address, Bytes, LyquidID, LyteLog, RegisterEvent};
29use lyquor_seq::{SequenceBackend, eth, fco::FCO, repo};
30use lyquor_vm::log::Log;
31
// Root-level key namespace of the node's KV store. Each accessor is declared with
// its own one-byte prefix; the accessors are used in `Node::init` to carve a
// `PrefixedKVStore` out of the shared store for each subsystem.
// NOTE(review): the exact key layout is generated by `subkey_builder!`, which is
// defined elsewhere — confirm before changing any prefix byte.
subkey_builder!(RootSubKey(
    // Prefix handed to the Lyquid (LVM) store factory.
    ([0x01])-lyquid() => Key,
    // Prefix handed to the FCO store.
    ([0x02])-fco() => Key,
    // Prefix handed to the Log subsystem's state store.
    ([0x03])-log() => Key,
    // Prefix handed to the event store.
    ([0x04])-event_store() => Key,
));
38
39fn subkey() -> &'static RootSubKey {
40    static KEY: std::sync::OnceLock<RootSubKey> = std::sync::OnceLock::new();
41    KEY.get_or_init(|| RootSubKey::new(Bytes::new().into()))
42}
43
/// File name used for the dumped devnet (anvil) chain state, both when
/// `DBFlusher` writes it and when `Node::init` loads it back.
const ANVIL_STATE_FILENAME: &str = "anvil.state";

/// PID of the anvil child process spawned by `main` (unset when no anvil was
/// started). Read by the panic hook installed in `main`.
static ANVIL_PID: std::sync::OnceLock<u32> = std::sync::OnceLock::new();
47
/// Message asking `LyquidDiscovery` to load one Lyquid together with the
/// dependency list that was registered for it.
#[derive(Debug, Message)]
#[rtype(result = "()")]
struct LoadLyquid {
    /// Lyquid to load.
    id: LyquidID,
    /// Dependencies forwarded to the pool along with `id`.
    deps: Vec<LyquidID>,
}
54
/// Message that hands the Lyquid pool address to `LyquidDiscovery`. Only the
/// first such message has an effect; the handler ignores it once a pool is set.
#[derive(Debug, Message)]
#[rtype(result = "()")]
struct SetPool {
    /// Address of the pool actor that discovery should talk to.
    pool: Addr<lyquid::LyquidPool>,
}
60
/// Actor that loads Lyquids into the pool as they are registered and answers
/// contract-address lookups (`GetContract`) for the sequencing backend.
struct LyquidDiscovery {
    /// Pool address; `None` until a `SetPool` message is handled.
    pool: Option<Addr<lyquid::LyquidPool>>,
    /// Signalled (via `notify_waiters`) at the end of every load attempt so that
    /// pending `GetContract` lookups re-query the pool.
    register: Arc<Notify>,
    /// ID of the bartender Lyquid, taken from the profile.
    bartender_id: LyquidID,
    /// Contract address returned directly for `(bartender_id, idx = Some(0))`.
    bartender_addr: Address,
}

lyquor_primitives::debug_struct_name!(LyquidDiscovery);
69
70impl Handler<lyquor_seq::eth::GetContract> for LyquidDiscovery {
71    type Result = ResponseFuture<Address>;
72    #[tracing::instrument(level = "trace", skip(self, msg, _ctx))]
73    fn handle(&mut self, msg: lyquor_seq::eth::GetContract, _ctx: &mut Context<Self>) -> Self::Result {
74        let lyquor_seq::eth::GetContract { id, idx } = msg;
75        if id == self.bartender_id && idx == Some(0) {
76            return Box::pin(std::future::ready(self.bartender_addr));
77        }
78        let pool = self.pool.clone().expect("LyquidDiscovery: Pool not set.");
79        let reg = self.register.clone();
80        Box::pin(async move {
81            loop {
82                // FIXME: improve the efficiency of this by filtering the reigster event for
83                // the specific lyquid in bartender
84                // idx=None means latest; else nth deployment
85                if let Some(n) = idx {
86                    match pool.send(crate::lyquid::GetLyquidDeploymentInfo { id, idx: n }).await {
87                        Ok(Some(info)) => return info.contract,
88                        Ok(None) => reg.notified().await,
89                        Err(e) => {
90                            tracing::error!("Failed to get lyquid deployment info: {}", e);
91                            // Stall forever for safety
92                            std::future::pending::<()>().await;
93                        }
94                    }
95                } else {
96                    match pool.send(crate::lyquid::GetLatestLyquidDeploymentInfo { id }).await {
97                        Ok(Some(info)) => return info.contract,
98                        Ok(None) => reg.notified().await,
99                        Err(e) => {
100                            tracing::error!("Failed to get latest lyquid deployment info: {}", e);
101                            std::future::pending::<()>().await;
102                        }
103                    }
104                }
105            }
106        })
107    }
108}
109
110// latest variant handled in GetContract via idx=None
111
112impl Handler<lyquor_vm::log::LogItem> for LyquidDiscovery {
113    type Result = ();
114
115    #[tracing::instrument(level = "trace", skip(msg))]
116    fn handle(&mut self, msg: lyquor_vm::log::LogItem, ctx: &mut Context<Self>) -> Self::Result {
117        let event: Option<RegisterEvent> = lyquor_primitives::decode_object(&msg.0.data);
118        if let Some(event) = event {
119            ctx.spawn(self.load(event.id, event.deps).into_actor(self));
120        }
121    }
122}
123
124impl Handler<LoadLyquid> for LyquidDiscovery {
125    type Result = ResponseFuture<()>;
126
127    #[tracing::instrument(level = "trace")]
128    fn handle(&mut self, msg: LoadLyquid, _ctx: &mut Context<Self>) -> Self::Result {
129        Box::pin(self.load(msg.id, msg.deps))
130    }
131}
132
133impl Handler<SetPool> for LyquidDiscovery {
134    type Result = ();
135
136    #[tracing::instrument(level = "trace")]
137    fn handle(&mut self, msg: SetPool, ctx: &mut Context<Self>) -> Self::Result {
138        if self.pool.is_some() {
139            return;
140        }
141        let pool = msg.pool;
142        let me = ctx.address();
143        self.pool = Some(pool.clone());
144        ctx.spawn(
145            async move {
146                // TODO: right now we just subscribe to every lyquid known in history, need to change this
147                // to user-configured set (or groups)
148                let lyquids_with_deps = match pool.send(crate::lyquid::GetRegisteredLyquidList).await.ok().flatten() {
149                    None => {
150                        // FIXME: this could be triggered when bartender has only executed one slot
151                        // upon recovery. There is a race that before bartender repeat the
152                        // execution of the first slot, its state is without image so this call
153                        // will fail.
154                        tracing::error!("Failed to obtain the registered Lyquid list.");
155                        return;
156                    }
157                    Some(lyquids_with_deps) => lyquids_with_deps,
158                };
159                // Now we have the actual dependency list from the bartender
160                for (id, deps) in lyquids_with_deps.into_iter() {
161                    //
162                    //  --- NB: We assume we already have all dependency binaries. ---
163                    //
164                    if let Err(e) = me.send(LoadLyquid { id, deps }).await {
165                        tracing::error!("Failed to send LoadLyquid message: {}", e);
166                    }
167                }
168            }
169            .into_actor(self),
170        );
171    }
172}
173
174impl LyquidDiscovery {
175    fn load(&mut self, id: LyquidID, deps: Vec<LyquidID>) -> impl Future<Output = ()> + 'static {
176        let pool = self.pool.clone().expect("LyquidDiscovery: Pool not set.");
177        let register = self.register.clone();
178        async move {
179            // NOTE: locator is only a retrieval hint; image_hash in slots remains authoritative.
180            let image_locator = match pool
181                .send(crate::lyquid::GetLatestLyquidDeploymentInfo { id })
182                .await
183                .ok()
184                .flatten()
185            {
186                Some(info) => info.image_locator,
187                None => None,
188            };
189
190            if let Some(image_locator) = image_locator {
191                // FIXME: only pull by locator when the slot image hash is not available in the local ImageRepo.
192                tracing::info!("LyquidDiscovery: downloading image from locator: {}", image_locator);
193                Self::pull_image_from_locator(pool.clone(), image_locator)
194                    .await
195                    .unwrap_or_else(|e| {
196                        tracing::error!("Failed to pull lyquid image from locator: {e}");
197                    });
198            }
199
200            match pool.send(crate::lyquid::LoadLyquid { id, deps }).await {
201                Ok(result) => match result {
202                    Ok(already_exists) => {
203                        if !already_exists {
204                            tracing::info!("Discovered {}", id);
205                        }
206                    }
207                    Err(e) => tracing::error!("Failed to register discovered Lyquid: {e}"),
208                },
209                Err(e) => tracing::error!("Failed to send LoadLyquid message: {}", e),
210            }
211            register.notify_waiters();
212        }
213    }
214
215    async fn pull_image_from_locator(pool: Addr<LyquidPool>, locator: String) -> Result<(), String> {
216        let oci_reference = OCIReference::from_str(&locator)
217            .map_err(|e| format!("Unsupported image locator format (expected OCI reference): {e}"))?;
218        let host = match oci_reference.registry().split_once('/').map(|(host, _)| host) {
219            Some(h) => h,
220            None => oci_reference.registry(),
221        };
222        let auth = match OCIRegistry::get_docker_credential(host) {
223            Ok(auth) => auth,
224            Err(e) => {
225                tracing::trace!(
226                    "No docker credential found (reason: {}), falling back to anonymous auth.",
227                    e
228                );
229                OCIRegistryAuth::Anonymous
230            }
231        };
232        let oci = OCIRegistry::new(auth, oci_reference.registry());
233        let digest = oci_reference
234            .digest()
235            .ok_or_else(|| "OCI reference must include a digest.".to_string())
236            .and_then(|d| LyquidPackDigest::from_oci_digest(d).map_err(|e| e.to_string()))?;
237
238        // Get image
239        let PulledContent::Layer(wasm) = oci
240            .pull(
241                oci_reference.repository(),
242                None,
243                Some(&digest),
244                Some(LYQUID_PACK_ASSET_TYPE_VALUE_LYQUID),
245                None,
246                None,
247            )
248            .await
249            .map_err(|e| e.to_string())?
250        else {
251            return Err("Failed to pull wasm image layer from oci registry.".to_string());
252        };
253
254        // Push image into repo
255        let repo_hash = match pool
256            .send(crate::lyquid::ImageRepo)
257            .await
258            .unwrap()
259            .0
260            .push(wasm.into())
261            .await
262        {
263            Ok(hash) => hash,
264            Err(e) => return Err(e.to_string()),
265        };
266
267        tracing::info!("Image hash: {:?}", repo_hash);
268
269        // Is this necessary to check the hash here?
270
271        Ok(())
272    }
273}
274
impl Handler<Stop> for LyquidDiscovery {
    type Result = ();

    /// Stops this actor's context when a `Stop` message arrives.
    #[tracing::instrument(level = "trace")]
    fn handle(&mut self, _: Stop, ctx: &mut Context<Self>) -> Self::Result {
        ctx.stop();
    }
}
283
impl Actor for LyquidDiscovery {
    type Context = actix::Context<Self>;

    // Lifecycle hooks do no work of their own; they exist only so that the
    // `tracing::instrument` attribute emits a trace span on start and stop.
    #[tracing::instrument(level = "trace", skip(_ctx))]
    fn started(&mut self, _ctx: &mut Self::Context) {}

    #[tracing::instrument(level = "trace", skip(_ctx))]
    fn stopped(&mut self, _ctx: &mut Self::Context) {}
}
293
/// Actor that persists node state: it commits the FCO, optionally dumps the
/// devnet (anvil) chain state to disk, and flushes the shadow KV store.
struct DBFlusher {
    /// Block number reported in the "written" log line.
    /// NOTE(review): nothing in this file updates it after construction — confirm
    /// whether it is meant to track progress.
    last_bn: u64,
    /// Base directory under which `devnet/<ANVIL_STATE_FILENAME>` is written.
    db_dir: PathBuf,
    /// Store whose pending writes are flushed to the backend.
    store: Arc<ShadowKVStore<Box<dyn KVStore>>>,
    /// FCO actor that is asked to commit before every flush.
    fco: Addr<FCO>,
    /// JSON-RPC client of the self-started anvil; `None` disables state dumps.
    anvil_chain: Option<ClientHandle>,
    /// Period of the automatic flush in seconds; `0` disables it.
    flush_interval_secs: u64,
}

lyquor_primitives::debug_struct_name!(DBFlusher);
304
305impl DBFlusher {
306    fn prepare_flush(&self) -> impl Future<Output = anyhow::Result<()>> + 'static {
307        let anvil_chain = self.anvil_chain.clone();
308        let fco = self.fco.clone();
309        let db_dir = self.db_dir.clone();
310        async move {
311            fco.send(lyquor_seq::fco::Commit)
312                .await
313                .map_err(anyhow::Error::from)
314                .and_then(|e| e.map_err(anyhow::Error::from))?;
315
316            if let Some(client) = &anvil_chain {
317                // If running devnet (anvil), dump chain state.
318                //
319                // We dump the anvil chain state *after* the commit of FCO, so FCO's progress will
320                // not outpace the dumped chain state (i.e., includes execution caused by the
321                // blocks later). Of course, this somehow assume Anvil's API handling has an
322                // ordering (the dump chain state is more recent than that handled previous API
323                // responses).
324                use lyquor_jsonrpc::types::{AnvilDumpState, AnvilDumpStateResp};
325                let dump = client
326                    .request::<AnvilDumpState, AnvilDumpStateResp>(AnvilDumpState)
327                    .await
328                    .map(|r| r.0);
329
330                match dump {
331                    Ok(state) => {
332                        // Write the anvil state to file
333                        let devnet_path = db_dir.join("devnet");
334                        std::fs::create_dir_all(&devnet_path)?;
335                        std::fs::write(devnet_path.join(ANVIL_STATE_FILENAME), &state)?;
336                    }
337                    Err(_) => {
338                        tracing::error!(
339                            "DBFlusher: Failed to dump state from anvil. This could cause issue due to the loss of local chain state upon next startup."
340                        );
341                    }
342                }
343            }
344
345            Ok(())
346        }
347    }
348
349    fn flush(&mut self, res: anyhow::Result<()>) {
350        if let Err(e) = res.and_then(|_| self.store.flush().map_err(|e| e.into())) {
351            tracing::error!("Failed to write: {e}");
352        } else {
353            tracing::info!("Written to the storage backend (block={}).", self.last_bn);
354        }
355    }
356}
357
/// Message asking `DBFlusher` to run one prepare-and-flush cycle; it is sent
/// by the actor's own interval timer.
#[derive(Message, Debug)]
#[rtype(result = "()")]
struct FlushDB;
361
362impl Handler<FlushDB> for DBFlusher {
363    type Result = ResponseActFuture<Self, ()>;
364
365    #[tracing::instrument(level = "trace", skip(self, _ctx))]
366    fn handle(&mut self, _: FlushDB, _ctx: &mut actix::Context<Self>) -> Self::Result {
367        Box::pin(self.prepare_flush().into_actor(self).map(move |res, actor, _ctx| {
368            actor.flush(res);
369        }))
370    }
371}
372
373impl Handler<Stop> for DBFlusher {
374    type Result = ResponseActFuture<Self, ()>;
375
376    #[tracing::instrument(level = "trace")]
377    fn handle(&mut self, _: Stop, _ctx: &mut actix::Context<Self>) -> Self::Result {
378        Box::pin(self.prepare_flush().into_actor(self).map(move |res, actor, _ctx| {
379            actor.flush(res);
380        }))
381    }
382}
383
384impl Actor for DBFlusher {
385    type Context = actix::Context<Self>;
386
387    #[tracing::instrument(level = "trace", skip_all)]
388    fn started(&mut self, ctx: &mut Self::Context) {
389        if self.flush_interval_secs > 0 {
390            let periodic_write = tokio::time::Duration::from_secs(self.flush_interval_secs);
391            ctx.run_interval(periodic_write, |_act, ctx| {
392                ctx.address().do_send(FlushDB);
393            });
394        }
395    }
396
397    #[tracing::instrument(level = "trace", skip_all)]
398    fn stopped(&mut self, _ctx: &mut Self::Context) {}
399}
400
401async fn setup_eth_backend(
402    profile: &LyquorProfile, reg: lyquor_seq::eth::ContractRegistry, jsonrpc_client: ClientHandle,
403) -> anyhow::Result<eth::Backend> {
404    let finality_tag: lyquor_jsonrpc::types::BlockNumber =
405        serde_json::from_str(profile.finality()).context("Invalid finality tag.")?;
406
407    Ok(eth::Backend::new(
408        eth::Config::builder()
409            .finality_tag(finality_tag)
410            .registry(reg)
411            .blocks_per_request(std::num::NonZeroUsize::new(10).context("Block per request should be >0.")?)
412            .build(),
413        jsonrpc_client,
414    ))
415}
416
/// Handles to everything `Node::init` starts, kept so that `stop` can shut the
/// subsystems down. Every field is `None` until the corresponding subsystem
/// has been started.
#[derive(Default)]
struct Subsystems {
    /// Fate-constrained ordering actor.
    fco: Option<Addr<FCO>>,
    /// Log subsystem actor.
    log: Option<Addr<Log>>,
    /// Pool of all Lyquids.
    pool: Option<Addr<lyquid::LyquidPool>>,
    /// Eth API server actor.
    api: Option<Addr<api::EthAPIServer>>,
    /// Periodic DB flusher.
    db: Option<Addr<DBFlusher>>,
    /// Lyquid discovery actor.
    dis: Option<Addr<LyquidDiscovery>>,
    /// JSON-RPC client used to talk to the sequencing chain.
    client: Option<ClientHandle>,
    /// Sequencing backend, kept separately so it can be stopped explicitly.
    eth_backend: Option<lyquor_seq::eth::Backend>,
}
debug_struct_name!(Subsystems);
429
430impl Subsystems {
431    #[tracing::instrument(level = "trace")]
432    async fn stop(&mut self) {
433        if let Some(api) = self.api.take() {
434            api.send(Stop).await.ok();
435        }
436        if let Some(eth_backend) = self.eth_backend.take() {
437            eth_backend.stop().await;
438        }
439        if let Some(log) = self.log.take() {
440            log.send(Stop).await.ok();
441        }
442        if let Some(dis) = self.dis.take() {
443            dis.send(Stop).await.ok();
444        }
445        if let Some(pool) = self.pool.take() {
446            pool.send(Stop).await.ok();
447        }
448        if let Some(db) = self.db.take() {
449            db.send(Stop).await.ok();
450        }
451        if let Some(dis) = self.dis.take() {
452            dis.send(Stop).await.ok();
453        }
454        if let Some(fco) = self.fco.take() {
455            fco.send(Stop).await.ok();
456        }
457    }
458}
459
/// Top-level actor that owns the node: it initializes all subsystems when it
/// starts and shuts them down when it receives `Stop`.
///
/// TODO: refactor this with actix's Actor model.
struct Node {
    /// Running subsystems; empty until initialization has completed.
    sys: Subsystems,
    /// Node configuration.
    config: Arc<NodeConfig>,
    /// Network profile the node runs with.
    profile: Arc<LyquorProfile>,
    /// URL of the sequencing chain's JSON-RPC endpoint.
    seq_eth_url: String,
    /// Whether this process spawned its own anvil instance.
    started_anvil: bool,
    /// Network hub; taken (left as `None`) when the actor starts.
    network: Option<lyquor_net::hub::Hub>,
}

lyquor_primitives::debug_struct_name!(Node);
471
472impl Handler<Stop> for Node {
473    type Result = AtomicResponse<Self, ()>;
474
475    #[tracing::instrument(level = "trace")]
476    fn handle(&mut self, _: Stop, _ctx: &mut Context<Self>) -> Self::Result {
477        let mut sys = std::mem::replace(&mut self.sys, Subsystems::default());
478        AtomicResponse::new(Box::pin(
479            async move {
480                sys.stop().await;
481            }
482            .into_actor(self)
483            .map(|_, _act, ctx| {
484                ctx.stop();
485            }),
486        ))
487    }
488}
489
490impl Actor for Node {
491    type Context = Context<Self>;
492
493    #[tracing::instrument(level = "trace", skip(ctx))]
494    fn started(&mut self, ctx: &mut Self::Context) {
495        // NOTE: use spawn here instead of wait so Stop can be handled
496        ctx.spawn(
497            Self::init(
498                self.config.clone(),
499                self.profile.clone(),
500                self.seq_eth_url.clone(),
501                self.started_anvil,
502                self.network.take().expect("Node: Network not found."),
503            )
504            .into_actor(self)
505            .map(|sys, act, ctx| {
506                match sys {
507                    Ok(sys) => {
508                        act.sys = sys;
509                    }
510                    Err(e) => {
511                        tracing::error!("Failed to initialize node: {e:?}");
512                        ctx.stop();
513                    }
514                };
515            }),
516        );
517    }
518
519    #[tracing::instrument(level = "trace", skip(_ctx))]
520    fn stopped(&mut self, _ctx: &mut Self::Context) {}
521}
522
523impl Node {
524    #[tracing::instrument(level = "trace", skip_all)]
525    async fn init(
526        config: Arc<NodeConfig>, profile: Arc<LyquorProfile>, seq_eth_url: String, started_anvil: bool,
527        network: lyquor_net::hub::Hub,
528    ) -> anyhow::Result<Subsystems> {
529        let mut sys = Subsystems::default();
530
531        // Set up Ethereum sequence backend.
532        let jsonrpc_client = ClientConfig::builder().url(seq_eth_url.parse()?).build().into_client();
533        sys.client = Some(jsonrpc_client.clone());
534
535        //// 1. DB & Storage
536        let (store, lyq_store, anvil_chain): (
537            Arc<ShadowKVStore<Box<dyn KVStore>>>,
538            Arc<dyn utils::LVMStoreFactory>,
539            Option<ClientHandle>,
540        ) = match &config.storage.db {
541            DB::MemDb => {
542                tracing::warn!("using MemDB, all data will be lost upon exit");
543                let store = Arc::new(ShadowKVStore::new(Box::new(MemDB::new()) as Box<dyn KVStore>));
544                (store, Arc::new(utils::LVMMemStore), None)
545            }
546            DB::RocksDb { db_dir } => {
547                let base_dir = db_dir.clone();
548                let store = Arc::new(ShadowKVStore::new(
549                    Box::new(RocksDB::new(&base_dir.join(profile.id()))?) as Box<dyn KVStore>,
550                ));
551                let lyq_store = Arc::new(utils::LVMDBStore(Arc::new(PrefixedKVStore::new(
552                    store.clone(),
553                    subkey().lyquid(),
554                ))));
555
556                // Load anvil state if we started our own anvil and state file exists
557                let anvil_chain = if started_anvil {
558                    let state_file = db_dir.join(profile.id()).join(ANVIL_STATE_FILENAME);
559                    if state_file.exists() {
560                        use lyquor_jsonrpc::types::{AnvilLoadState, AnvilLoadStateResp};
561                        let state_data = std::fs::read(&state_file)?;
562                        let load_result = jsonrpc_client
563                            .request::<AnvilLoadState, AnvilLoadStateResp>(AnvilLoadState(state_data.into()))
564                            .await?;
565                        tracing::info!("Loaded anvil state from {}: {}.", state_file.display(), load_result.0);
566                    } else {
567                        tracing::info!(
568                            "No existing anvil state file found at {}, starting with fresh state.",
569                            state_file.display()
570                        );
571                    }
572                    Some(jsonrpc_client.clone())
573                } else {
574                    None
575                };
576
577                (store, lyq_store, anvil_chain)
578            }
579        };
580        ////
581
582        //// 2. Fate-Constrainted Ordering Subsystem.
583        // Start discovering Lyquid and sequencing contracts.
584        let dis = LyquidDiscovery {
585            pool: None,
586            register: Arc::new(Notify::new()),
587            bartender_id: *profile.bartender_id(),
588            bartender_addr: Address::from_str(profile.bartender_addr())
589                .context("Invalid bartender address in profile.")?
590                .into(),
591        }
592        .start();
593        sys.dis = Some(dis.clone());
594
595        let eth_backend = setup_eth_backend(&profile, dis.clone().recipient(), jsonrpc_client.clone()).await?;
596
597        let fco_store = PrefixedKVStore::new(store.clone(), subkey().fco());
598        let fco = FCO::new(
599            eth_backend.clone(),
600            Arc::new(fco_store),
601            Some(profile.init_chain_position()),
602        )?
603        .start();
604        sys.fco = Some(fco.clone());
605        // Keep a separate reference for shutdown management
606        sys.eth_backend = Some(eth_backend);
607        ////
608
609        //// 3. Lyquor Virtual Machine Subsystem.
610        // Configure Log subsystem.
611        let log_store = PrefixedKVStore::new(store.clone(), subkey().log());
612        let log = Log::new(Arc::new(lyquor_state::DBSimpleStateStore::new(log_store))).start();
613        sys.log = Some(log.clone());
614
615        let mut lvm = lyquid::LVM::new(
616            log.clone(),
617            network,
618            10,
619            Some(lyquor_vm::InterOps {
620                inter: fco.clone().recipient(),
621                submit: fco.clone().recipient(),
622            }),
623        );
624        let api_subs = api::Subscriptions::new().start();
625        let event_store = Arc::new(lyquor_seq::event_store::EventStore::new(
626            Arc::new(PrefixedKVStore::new(store.clone(), subkey().event_store())),
627            1000,
628        ));
629        ////
630
631        //// 4. UPC peers
632        for peer in &config.peers {
633            lvm.upc_mut()
634                .add_remote(peer.node_id, Some(peer.upc_addr.clone()))
635                .await
636                .with_context(|| format!("Failed to add UPC peer {}", peer.node_id))?;
637        }
638        ////
639
640        //// 5. Pool of all Lyquids
641        let image_repo_store = RocksDB::new(&config.storage.image_repo_dir)?;
642        let image_repo: Arc<dyn repo::LyquidImageRepo> = Arc::new(repo::KVStoreRepo::new(image_repo_store));
643        let bartender_id = *profile.bartender_id();
644        let pool = lyquid::LyquidPool::new(
645            bartender_id,
646            lvm,
647            lyquid::LyquidPoolSetup {
648                fco: fco.clone(),
649                store_factory: lyq_store,
650                image_repo,
651                event_store,
652                api_subs: api_subs.clone(),
653                config: lyquid::LyquidConfig::builder().build(),
654            },
655        )
656        .await?
657        .start();
658        sys.pool = Some(pool.clone());
659        ////
660
661        // Configure bartender.
662        let bartender_setup = {
663            let register_topic = LyteLog::tagged_value_topic("Register");
664            match log
665                .send(lyquor_vm::log::GetNumberOfRecords {
666                    id: bartender_id,
667                    topic: register_topic,
668                })
669                .await
670            {
671                Ok(count) if count > 0 => None,
672                Ok(_) => Some(log.send(lyquor_vm::log::WaitNext {
673                    id: bartender_id,
674                    topic: register_topic,
675                })),
676                Err(e) => return Err(e).context("Failed to get number of records during bartender setup."),
677            }
678        };
679
680        // Start the API server - use the same jsonrpc client as sequencer
681        let api = lyquor_lib::api::EthAPIServer::new(
682            &config.network.api_addr,
683            jsonrpc_client.clone(),
684            api_subs,
685            fco.clone(),
686            pool.clone(),
687        )
688        .await?;
689        sys.api = Some(api.start());
690
691        // Wait for bartender to be registered.
692        if let Some(reg) = bartender_setup {
693            tracing::info!("Waiting for bartender to be registered.");
694            reg.await.context("Failed to wait for bartender registration.")?;
695        }
696        // Bartender, as a normal Lyquid, may have some re-execution, let's make sure it is done.
697        let bartender_ins = pool
698            .send(lyquid::GetBartender)
699            .await
700            .expect("Failed to obtain bartender.");
701        let bartender_ln = bartender_ins.instance().max_number().unwrap_or(LyquidNumber::ZERO);
702        tracing::debug!("Waiting for bartender to reach {bartender_ln}.");
703        bartender_ins
704            .wait_until(bartender_ln)
705            .await
706            .context("Failed to wait for the initial re-execution of bartender.")?;
707
708        // Finish up LyquidDiscovery.
709        log.send(lyquor_vm::log::Subscribe {
710            id: bartender_id,
711            topic: LyteLog::tagged_value_topic("Register"),
712            subscriber: dis.clone().recipient(),
713        })
714        .await
715        .context("Failed to subscribe to log system for LyquidDiscovery.")?;
716        dis.send(SetPool { pool: pool.clone() })
717            .await
718            .context("Failed to set pool in LyquidDiscovery.")?;
719
720        tracing::info!("INIT COMPLETE (bartender={bartender_id})");
721
722        // Start a DB writer that persist when node is idle.
723        let db = DBFlusher {
724            last_bn: 0,
725            db_dir: config.resolved_db_dir(),
726            store,
727            fco,
728            anvil_chain,
729            flush_interval_secs: config.flush_interval_secs,
730        }
731        .start();
732        sys.db = Some(db);
733
734        Ok(sys)
735    }
736}
737
738fn generate_unused_port() -> anyhow::Result<u16> {
739    let listener = std::net::TcpListener::bind("0.0.0.0:0").context("Failed to bind to local address.")?;
740    let port = listener.local_addr().context("Failed to get local address.")?.port();
741    Ok(port)
742}
743
744fn start_devnet_anvil() -> anyhow::Result<AnvilInstance> {
745    let anvil_port = generate_unused_port()?;
746    // Mask the SIGINT signal so anvil process won't terminate on SIGINT (it'll only
747    // terminate when dropped)
748    use nix::sys::signal::{self, SigSet, SigmaskHow, Signal};
749    let mut sigset = SigSet::empty();
750    sigset.add(Signal::SIGINT);
751    let mut old_mask = SigSet::empty();
752    signal::sigprocmask(SigmaskHow::SIG_BLOCK, Some(&sigset), Some(&mut old_mask))
753        .context("Failed to mask SIGINT for anvil.")?;
754
755    let mut anvil = Anvil::new().port(anvil_port).keep_stdout().try_spawn()?;
756
757    let stdout_reader = std::io::BufReader::new(
758        anvil
759            .child_mut()
760            .stdout
761            .take()
762            .context("Failed to read from anvil stdout.")?,
763    );
764
765    tokio::task::spawn_blocking(move || {
766        use std::io::BufRead;
767        let mut stdout_lines = stdout_reader.lines();
768        while let Some(line) = stdout_lines.next() {
769            tracing::debug!(target: "lyquor_anvil", "{}", line.unwrap_or_else(|_| "".to_string()));
770        }
771    });
772
773    tracing::info!("Anvil started at port {}.", anvil_port);
774    signal::sigprocmask(SigmaskHow::SIG_SETMASK, Some(&old_mask), None)
775        .context("Failed to restore old signal mask for anvil.")?;
776
777    Ok(anvil)
778}
779
780fn parse_override_value(raw: &str) -> Value {
781    if let Ok(v) = raw.parse::<bool>() {
782        return Value::from(v);
783    }
784    if let Ok(v) = raw.parse::<u64>() {
785        return Value::from(v);
786    }
787    if let Ok(v) = raw.parse::<i64>() {
788        return Value::from(v);
789    }
790    if let Ok(v) = raw.parse::<f64>() {
791        return Value::from(v);
792    }
793    Value::from(raw.to_string())
794}
795
796fn insert_override_path(overrides: &mut Map<String, Value>, key: &str, value: Value) -> anyhow::Result<()> {
797    let mut segments = key.split('.').peekable();
798    let mut current = overrides;
799
800    while let Some(segment) = segments.next() {
801        if segments.peek().is_none() {
802            current.insert(segment.to_string(), value);
803            return Ok(());
804        }
805
806        let entry = current
807            .entry(segment.to_string())
808            .or_insert_with(|| Value::Object(Map::<String, Value>::new()));
809
810        match entry {
811            Value::Object(dict) => {
812                current = dict;
813            }
814            _ => anyhow::bail!("Override path '{key}' conflicts with non-table value."),
815        }
816    }
817
818    Ok(())
819}
820
821fn build_config_overrides(matches: &clap::ArgMatches) -> anyhow::Result<Map<String, Value>> {
822    let mut overrides: Map<String, Value> = Map::new();
823
824    if let Some(entries) = matches.get_many::<String>("config-override") {
825        for entry in entries {
826            let (key, value) = entry
827                .split_once('=')
828                .ok_or_else(|| anyhow::anyhow!("Invalid --config-override '{entry}', expected key=value"))?;
829            insert_override_path(&mut overrides, key, parse_override_value(value))?;
830        }
831    }
832
833    Ok(overrides)
834}
835
836#[actix::main]
837async fn main() -> anyhow::Result<()> {
838    // Install panic hook to cleanup anvil on panic
839    std::panic::set_hook(Box::new(|_| {
840        if let Some(&pid) = ANVIL_PID.get() {
841            tracing::warn!("Panic detected, killing anvil process {}", pid);
842            use nix::sys::signal::{Signal, kill};
843            let _ = kill(nix::unistd::Pid::from_raw(pid as i32), Signal::SIGTERM);
844        }
845    }));
846
847    lyquor_cli::setup_tracing()?;
848
849    println!("{}", lyquor_cli::format_logo_banner());
850
851    let matches = clap::command!()
852        .propagate_version(true)
853        .arg(clap::arg!(--config <PATH> "Path to the Lyquor config file (toml, yaml, json).").required(false))
854        .arg(
855            clap::arg!(--"config-override" <KEY_VALUE> "Override config with key=value (repeatable).")
856                .required(false)
857                .action(clap::ArgAction::Append)
858                .value_parser(clap::builder::NonEmptyStringValueParser::new()),
859        )
860        .get_matches();
861
862    let config_path = matches.get_one::<String>("config").map(PathBuf::from);
863    let overrides = build_config_overrides(&matches)?;
864    let config = NodeConfig::load(config_path, overrides)?;
865
866    let node_seed = config.node_key.seed_bytes()?;
867    let (_, tls_config) = lyquor_tls::generator::single_node_config_with_seed(&node_seed);
868    let mut network = lyquor_net::hub::HubBuilder::new(tls_config)
869        .listen_addr(config.network.upc_addr.clone())
870        .build()
871        .context("Failed to setup network.")?;
872    network.start().await.context("Failed to start network.")?;
873
874    let mut _anvil: Option<AnvilInstance> = None;
875    let profile: Arc<LyquorProfile> = Arc::new(match config.profile {
876        NetworkType::Devnet => {
877            let ws_url = match &config.force_seq_eth {
878                None => {
879                    let anvil = start_devnet_anvil()?;
880                    // Store anvil PID for panic hook
881                    ANVIL_PID.set(anvil.child().id()).ok();
882                    let ep = anvil.ws_endpoint();
883                    _anvil = Some(anvil);
884                    ep
885                }
886                Some(url) => url.to_string(),
887            };
888
889            Devnet::new(ws_url).into()
890        }
891        _ => return Err(anyhow::anyhow!("Network is not supported.")),
892    });
893    let seq_eth_url = config
894        .force_seq_eth
895        .clone()
896        .unwrap_or_else(|| profile.sequencer().to_string());
897
898    // Signal handlers should be registered before starting any actors, so they'll be stopped
899    // gracefully.
900    let mut sigint = signal(SignalKind::interrupt()).context("Failed to register SIGINT handler")?;
901    let mut sigterm = signal(SignalKind::terminate()).context("Failed to register SIGTERM handler")?;
902
903    let node = Node {
904        sys: Subsystems::default(),
905        config: Arc::new(config),
906        profile: profile.clone(),
907        seq_eth_url,
908        started_anvil: _anvil.is_some(),
909        network: Some(network),
910    }
911    .start();
912
913    tokio::select! {
914        _ = sigint.recv() => {
915            tracing::info!("received SIGINT");
916        }
917        _ = sigterm.recv() => {
918            tracing::info!("received SIGTERM");
919        }
920    }
921
922    Ok(node.send(Stop).await?)
923}