1use std::path::PathBuf;
2use std::str::FromStr;
3use std::sync::Arc;
4
5use actix::prelude::*;
6use actix::{Actor, Addr};
7use alloy_node_bindings::{Anvil, AnvilInstance};
8use lyquor_lib::lyquid::LyquidPool;
9use lyquor_oci::oci::{LYQUID_PACK_ASSET_TYPE_VALUE_LYQUID, OCIReference, OCIRegistry, OCIRegistryAuth};
10use lyquor_oci::pack::LyquidPackDigest;
11use lyquor_oci::registry::{PulledContent, Registry};
12use lyquor_primitives::{LyquidNumber, debug_struct_name};
13use serde_json::{Map, Value};
14use tokio::signal::unix::{SignalKind, signal};
15use tokio::sync::Notify;
16
17use lyquor_api::{
18 actor::Stop,
19 anyhow::{self, Context as _Context},
20 config::{DB, NodeConfig},
21 kvstore::{KVStore, Key, PrefixedKVStore, ShadowKVStore},
22 profile::{Devnet, LyquorProfile, NetworkType},
23 subkey_builder,
24};
25use lyquor_db::{MemDB, RocksDB};
26use lyquor_jsonrpc::client::{ClientConfig, ClientHandle};
27use lyquor_lib::{api, lyquid, utils};
28use lyquor_primitives::{Address, Bytes, LyquidID, LyteLog, RegisterEvent};
29use lyquor_seq::{SequenceBackend, eth, fco::FCO, repo};
30use lyquor_vm::log::Log;
31
32subkey_builder!(RootSubKey(
33 ([0x01])-lyquid() => Key,
34 ([0x02])-fco() => Key,
35 ([0x03])-log() => Key,
36 ([0x04])-event_store() => Key,
37));
38
39fn subkey() -> &'static RootSubKey {
40 static KEY: std::sync::OnceLock<RootSubKey> = std::sync::OnceLock::new();
41 KEY.get_or_init(|| RootSubKey::new(Bytes::new().into()))
42}
43
44const ANVIL_STATE_FILENAME: &str = "anvil.state";
45
46static ANVIL_PID: std::sync::OnceLock<u32> = std::sync::OnceLock::new();
47
48#[derive(Debug, Message)]
49#[rtype(result = "()")]
50struct LoadLyquid {
51 id: LyquidID,
52 deps: Vec<LyquidID>,
53}
54
55#[derive(Debug, Message)]
56#[rtype(result = "()")]
57struct SetPool {
58 pool: Addr<lyquid::LyquidPool>,
59}
60
61struct LyquidDiscovery {
62 pool: Option<Addr<lyquid::LyquidPool>>,
63 register: Arc<Notify>,
64 bartender_id: LyquidID,
65 bartender_addr: Address,
66}
67
68lyquor_primitives::debug_struct_name!(LyquidDiscovery);
69
70impl Handler<lyquor_seq::eth::GetContract> for LyquidDiscovery {
71 type Result = ResponseFuture<Address>;
72 #[tracing::instrument(level = "trace", skip(self, msg, _ctx))]
73 fn handle(&mut self, msg: lyquor_seq::eth::GetContract, _ctx: &mut Context<Self>) -> Self::Result {
74 let lyquor_seq::eth::GetContract { id, idx } = msg;
75 if id == self.bartender_id && idx == Some(0) {
76 return Box::pin(std::future::ready(self.bartender_addr));
77 }
78 let pool = self.pool.clone().expect("LyquidDiscovery: Pool not set.");
79 let reg = self.register.clone();
80 Box::pin(async move {
81 loop {
82 if let Some(n) = idx {
86 match pool.send(crate::lyquid::GetLyquidDeploymentInfo { id, idx: n }).await {
87 Ok(Some(info)) => return info.contract,
88 Ok(None) => reg.notified().await,
89 Err(e) => {
90 tracing::error!("Failed to get lyquid deployment info: {}", e);
91 std::future::pending::<()>().await;
93 }
94 }
95 } else {
96 match pool.send(crate::lyquid::GetLatestLyquidDeploymentInfo { id }).await {
97 Ok(Some(info)) => return info.contract,
98 Ok(None) => reg.notified().await,
99 Err(e) => {
100 tracing::error!("Failed to get latest lyquid deployment info: {}", e);
101 std::future::pending::<()>().await;
102 }
103 }
104 }
105 }
106 })
107 }
108}
109
110impl Handler<lyquor_vm::log::LogItem> for LyquidDiscovery {
113 type Result = ();
114
115 #[tracing::instrument(level = "trace", skip(msg))]
116 fn handle(&mut self, msg: lyquor_vm::log::LogItem, ctx: &mut Context<Self>) -> Self::Result {
117 let event: Option<RegisterEvent> = lyquor_primitives::decode_object(&msg.0.data);
118 if let Some(event) = event {
119 ctx.spawn(self.load(event.id, event.deps).into_actor(self));
120 }
121 }
122}
123
124impl Handler<LoadLyquid> for LyquidDiscovery {
125 type Result = ResponseFuture<()>;
126
127 #[tracing::instrument(level = "trace")]
128 fn handle(&mut self, msg: LoadLyquid, _ctx: &mut Context<Self>) -> Self::Result {
129 Box::pin(self.load(msg.id, msg.deps))
130 }
131}
132
133impl Handler<SetPool> for LyquidDiscovery {
134 type Result = ();
135
136 #[tracing::instrument(level = "trace")]
137 fn handle(&mut self, msg: SetPool, ctx: &mut Context<Self>) -> Self::Result {
138 if self.pool.is_some() {
139 return;
140 }
141 let pool = msg.pool;
142 let me = ctx.address();
143 self.pool = Some(pool.clone());
144 ctx.spawn(
145 async move {
146 let lyquids_with_deps = match pool.send(crate::lyquid::GetRegisteredLyquidList).await.ok().flatten() {
149 None => {
150 tracing::error!("Failed to obtain the registered Lyquid list.");
155 return;
156 }
157 Some(lyquids_with_deps) => lyquids_with_deps,
158 };
159 for (id, deps) in lyquids_with_deps.into_iter() {
161 if let Err(e) = me.send(LoadLyquid { id, deps }).await {
165 tracing::error!("Failed to send LoadLyquid message: {}", e);
166 }
167 }
168 }
169 .into_actor(self),
170 );
171 }
172}
173
174impl LyquidDiscovery {
175 fn load(&mut self, id: LyquidID, deps: Vec<LyquidID>) -> impl Future<Output = ()> + 'static {
176 let pool = self.pool.clone().expect("LyquidDiscovery: Pool not set.");
177 let register = self.register.clone();
178 async move {
179 let image_locator = match pool
181 .send(crate::lyquid::GetLatestLyquidDeploymentInfo { id })
182 .await
183 .ok()
184 .flatten()
185 {
186 Some(info) => info.image_locator,
187 None => None,
188 };
189
190 if let Some(image_locator) = image_locator {
191 tracing::info!("LyquidDiscovery: downloading image from locator: {}", image_locator);
193 Self::pull_image_from_locator(pool.clone(), image_locator)
194 .await
195 .unwrap_or_else(|e| {
196 tracing::error!("Failed to pull lyquid image from locator: {e}");
197 });
198 }
199
200 match pool.send(crate::lyquid::LoadLyquid { id, deps }).await {
201 Ok(result) => match result {
202 Ok(already_exists) => {
203 if !already_exists {
204 tracing::info!("Discovered {}", id);
205 }
206 }
207 Err(e) => tracing::error!("Failed to register discovered Lyquid: {e}"),
208 },
209 Err(e) => tracing::error!("Failed to send LoadLyquid message: {}", e),
210 }
211 register.notify_waiters();
212 }
213 }
214
215 async fn pull_image_from_locator(pool: Addr<LyquidPool>, locator: String) -> Result<(), String> {
216 let oci_reference = OCIReference::from_str(&locator)
217 .map_err(|e| format!("Unsupported image locator format (expected OCI reference): {e}"))?;
218 let host = match oci_reference.registry().split_once('/').map(|(host, _)| host) {
219 Some(h) => h,
220 None => oci_reference.registry(),
221 };
222 let auth = match OCIRegistry::get_docker_credential(host) {
223 Ok(auth) => auth,
224 Err(e) => {
225 tracing::trace!(
226 "No docker credential found (reason: {}), falling back to anonymous auth.",
227 e
228 );
229 OCIRegistryAuth::Anonymous
230 }
231 };
232 let oci = OCIRegistry::new(auth, oci_reference.registry());
233 let digest = oci_reference
234 .digest()
235 .ok_or_else(|| "OCI reference must include a digest.".to_string())
236 .and_then(|d| LyquidPackDigest::from_oci_digest(d).map_err(|e| e.to_string()))?;
237
238 let PulledContent::Layer(wasm) = oci
240 .pull(
241 oci_reference.repository(),
242 None,
243 Some(&digest),
244 Some(LYQUID_PACK_ASSET_TYPE_VALUE_LYQUID),
245 None,
246 None,
247 )
248 .await
249 .map_err(|e| e.to_string())?
250 else {
251 return Err("Failed to pull wasm image layer from oci registry.".to_string());
252 };
253
254 let repo_hash = match pool
256 .send(crate::lyquid::ImageRepo)
257 .await
258 .unwrap()
259 .0
260 .push(wasm.into())
261 .await
262 {
263 Ok(hash) => hash,
264 Err(e) => return Err(e.to_string()),
265 };
266
267 tracing::info!("Image hash: {:?}", repo_hash);
268
269 Ok(())
272 }
273}
274
275impl Handler<Stop> for LyquidDiscovery {
276 type Result = ();
277
278 #[tracing::instrument(level = "trace")]
279 fn handle(&mut self, _: Stop, ctx: &mut Context<Self>) -> Self::Result {
280 ctx.stop();
281 }
282}
283
284impl Actor for LyquidDiscovery {
285 type Context = actix::Context<Self>;
286
287 #[tracing::instrument(level = "trace", skip(_ctx))]
288 fn started(&mut self, _ctx: &mut Self::Context) {}
289
290 #[tracing::instrument(level = "trace", skip(_ctx))]
291 fn stopped(&mut self, _ctx: &mut Self::Context) {}
292}
293
294struct DBFlusher {
295 last_bn: u64,
296 db_dir: PathBuf,
297 store: Arc<ShadowKVStore<Box<dyn KVStore>>>,
298 fco: Addr<FCO>,
299 anvil_chain: Option<ClientHandle>,
300 flush_interval_secs: u64,
301}
302
303lyquor_primitives::debug_struct_name!(DBFlusher);
304
305impl DBFlusher {
306 fn prepare_flush(&self) -> impl Future<Output = anyhow::Result<()>> + 'static {
307 let anvil_chain = self.anvil_chain.clone();
308 let fco = self.fco.clone();
309 let db_dir = self.db_dir.clone();
310 async move {
311 fco.send(lyquor_seq::fco::Commit)
312 .await
313 .map_err(anyhow::Error::from)
314 .and_then(|e| e.map_err(anyhow::Error::from))?;
315
316 if let Some(client) = &anvil_chain {
317 use lyquor_jsonrpc::types::{AnvilDumpState, AnvilDumpStateResp};
325 let dump = client
326 .request::<AnvilDumpState, AnvilDumpStateResp>(AnvilDumpState)
327 .await
328 .map(|r| r.0);
329
330 match dump {
331 Ok(state) => {
332 let devnet_path = db_dir.join("devnet");
334 std::fs::create_dir_all(&devnet_path)?;
335 std::fs::write(devnet_path.join(ANVIL_STATE_FILENAME), &state)?;
336 }
337 Err(_) => {
338 tracing::error!(
339 "DBFlusher: Failed to dump state from anvil. This could cause issue due to the loss of local chain state upon next startup."
340 );
341 }
342 }
343 }
344
345 Ok(())
346 }
347 }
348
349 fn flush(&mut self, res: anyhow::Result<()>) {
350 if let Err(e) = res.and_then(|_| self.store.flush().map_err(|e| e.into())) {
351 tracing::error!("Failed to write: {e}");
352 } else {
353 tracing::info!("Written to the storage backend (block={}).", self.last_bn);
354 }
355 }
356}
357
358#[derive(Message, Debug)]
359#[rtype(result = "()")]
360struct FlushDB;
361
362impl Handler<FlushDB> for DBFlusher {
363 type Result = ResponseActFuture<Self, ()>;
364
365 #[tracing::instrument(level = "trace", skip(self, _ctx))]
366 fn handle(&mut self, _: FlushDB, _ctx: &mut actix::Context<Self>) -> Self::Result {
367 Box::pin(self.prepare_flush().into_actor(self).map(move |res, actor, _ctx| {
368 actor.flush(res);
369 }))
370 }
371}
372
373impl Handler<Stop> for DBFlusher {
374 type Result = ResponseActFuture<Self, ()>;
375
376 #[tracing::instrument(level = "trace")]
377 fn handle(&mut self, _: Stop, _ctx: &mut actix::Context<Self>) -> Self::Result {
378 Box::pin(self.prepare_flush().into_actor(self).map(move |res, actor, _ctx| {
379 actor.flush(res);
380 }))
381 }
382}
383
384impl Actor for DBFlusher {
385 type Context = actix::Context<Self>;
386
387 #[tracing::instrument(level = "trace", skip_all)]
388 fn started(&mut self, ctx: &mut Self::Context) {
389 if self.flush_interval_secs > 0 {
390 let periodic_write = tokio::time::Duration::from_secs(self.flush_interval_secs);
391 ctx.run_interval(periodic_write, |_act, ctx| {
392 ctx.address().do_send(FlushDB);
393 });
394 }
395 }
396
397 #[tracing::instrument(level = "trace", skip_all)]
398 fn stopped(&mut self, _ctx: &mut Self::Context) {}
399}
400
401async fn setup_eth_backend(
402 profile: &LyquorProfile, reg: lyquor_seq::eth::ContractRegistry, jsonrpc_client: ClientHandle,
403) -> anyhow::Result<eth::Backend> {
404 let finality_tag: lyquor_jsonrpc::types::BlockNumber =
405 serde_json::from_str(profile.finality()).context("Invalid finality tag.")?;
406
407 Ok(eth::Backend::new(
408 eth::Config::builder()
409 .finality_tag(finality_tag)
410 .registry(reg)
411 .blocks_per_request(std::num::NonZeroUsize::new(10).context("Block per request should be >0.")?)
412 .build(),
413 jsonrpc_client,
414 ))
415}
416
417#[derive(Default)]
418struct Subsystems {
419 fco: Option<Addr<FCO>>,
420 log: Option<Addr<Log>>,
421 pool: Option<Addr<lyquid::LyquidPool>>,
422 api: Option<Addr<api::EthAPIServer>>,
423 db: Option<Addr<DBFlusher>>,
424 dis: Option<Addr<LyquidDiscovery>>,
425 client: Option<ClientHandle>,
426 eth_backend: Option<lyquor_seq::eth::Backend>,
427}
428debug_struct_name!(Subsystems);
429
430impl Subsystems {
431 #[tracing::instrument(level = "trace")]
432 async fn stop(&mut self) {
433 if let Some(api) = self.api.take() {
434 api.send(Stop).await.ok();
435 }
436 if let Some(eth_backend) = self.eth_backend.take() {
437 eth_backend.stop().await;
438 }
439 if let Some(log) = self.log.take() {
440 log.send(Stop).await.ok();
441 }
442 if let Some(dis) = self.dis.take() {
443 dis.send(Stop).await.ok();
444 }
445 if let Some(pool) = self.pool.take() {
446 pool.send(Stop).await.ok();
447 }
448 if let Some(db) = self.db.take() {
449 db.send(Stop).await.ok();
450 }
451 if let Some(dis) = self.dis.take() {
452 dis.send(Stop).await.ok();
453 }
454 if let Some(fco) = self.fco.take() {
455 fco.send(Stop).await.ok();
456 }
457 }
458}
459
460struct Node {
462 sys: Subsystems,
463 config: Arc<NodeConfig>,
464 profile: Arc<LyquorProfile>,
465 seq_eth_url: String,
466 started_anvil: bool,
467 network: Option<lyquor_net::hub::Hub>,
468}
469
470lyquor_primitives::debug_struct_name!(Node);
471
472impl Handler<Stop> for Node {
473 type Result = AtomicResponse<Self, ()>;
474
475 #[tracing::instrument(level = "trace")]
476 fn handle(&mut self, _: Stop, _ctx: &mut Context<Self>) -> Self::Result {
477 let mut sys = std::mem::replace(&mut self.sys, Subsystems::default());
478 AtomicResponse::new(Box::pin(
479 async move {
480 sys.stop().await;
481 }
482 .into_actor(self)
483 .map(|_, _act, ctx| {
484 ctx.stop();
485 }),
486 ))
487 }
488}
489
490impl Actor for Node {
491 type Context = Context<Self>;
492
493 #[tracing::instrument(level = "trace", skip(ctx))]
494 fn started(&mut self, ctx: &mut Self::Context) {
495 ctx.spawn(
497 Self::init(
498 self.config.clone(),
499 self.profile.clone(),
500 self.seq_eth_url.clone(),
501 self.started_anvil,
502 self.network.take().expect("Node: Network not found."),
503 )
504 .into_actor(self)
505 .map(|sys, act, ctx| {
506 match sys {
507 Ok(sys) => {
508 act.sys = sys;
509 }
510 Err(e) => {
511 tracing::error!("Failed to initialize node: {e:?}");
512 ctx.stop();
513 }
514 };
515 }),
516 );
517 }
518
519 #[tracing::instrument(level = "trace", skip(_ctx))]
520 fn stopped(&mut self, _ctx: &mut Self::Context) {}
521}
522
523impl Node {
524 #[tracing::instrument(level = "trace", skip_all)]
525 async fn init(
526 config: Arc<NodeConfig>, profile: Arc<LyquorProfile>, seq_eth_url: String, started_anvil: bool,
527 network: lyquor_net::hub::Hub,
528 ) -> anyhow::Result<Subsystems> {
529 let mut sys = Subsystems::default();
530
531 let jsonrpc_client = ClientConfig::builder().url(seq_eth_url.parse()?).build().into_client();
533 sys.client = Some(jsonrpc_client.clone());
534
535 let (store, lyq_store, anvil_chain): (
537 Arc<ShadowKVStore<Box<dyn KVStore>>>,
538 Arc<dyn utils::LVMStoreFactory>,
539 Option<ClientHandle>,
540 ) = match &config.storage.db {
541 DB::MemDb => {
542 tracing::warn!("using MemDB, all data will be lost upon exit");
543 let store = Arc::new(ShadowKVStore::new(Box::new(MemDB::new()) as Box<dyn KVStore>));
544 (store, Arc::new(utils::LVMMemStore), None)
545 }
546 DB::RocksDb { db_dir } => {
547 let base_dir = db_dir.clone();
548 let store = Arc::new(ShadowKVStore::new(
549 Box::new(RocksDB::new(&base_dir.join(profile.id()))?) as Box<dyn KVStore>,
550 ));
551 let lyq_store = Arc::new(utils::LVMDBStore(Arc::new(PrefixedKVStore::new(
552 store.clone(),
553 subkey().lyquid(),
554 ))));
555
556 let anvil_chain = if started_anvil {
558 let state_file = db_dir.join(profile.id()).join(ANVIL_STATE_FILENAME);
559 if state_file.exists() {
560 use lyquor_jsonrpc::types::{AnvilLoadState, AnvilLoadStateResp};
561 let state_data = std::fs::read(&state_file)?;
562 let load_result = jsonrpc_client
563 .request::<AnvilLoadState, AnvilLoadStateResp>(AnvilLoadState(state_data.into()))
564 .await?;
565 tracing::info!("Loaded anvil state from {}: {}.", state_file.display(), load_result.0);
566 } else {
567 tracing::info!(
568 "No existing anvil state file found at {}, starting with fresh state.",
569 state_file.display()
570 );
571 }
572 Some(jsonrpc_client.clone())
573 } else {
574 None
575 };
576
577 (store, lyq_store, anvil_chain)
578 }
579 };
580 let dis = LyquidDiscovery {
585 pool: None,
586 register: Arc::new(Notify::new()),
587 bartender_id: *profile.bartender_id(),
588 bartender_addr: Address::from_str(profile.bartender_addr())
589 .context("Invalid bartender address in profile.")?
590 .into(),
591 }
592 .start();
593 sys.dis = Some(dis.clone());
594
595 let eth_backend = setup_eth_backend(&profile, dis.clone().recipient(), jsonrpc_client.clone()).await?;
596
597 let fco_store = PrefixedKVStore::new(store.clone(), subkey().fco());
598 let fco = FCO::new(
599 eth_backend.clone(),
600 Arc::new(fco_store),
601 Some(profile.init_chain_position()),
602 )?
603 .start();
604 sys.fco = Some(fco.clone());
605 sys.eth_backend = Some(eth_backend);
607 let log_store = PrefixedKVStore::new(store.clone(), subkey().log());
612 let log = Log::new(Arc::new(lyquor_state::DBSimpleStateStore::new(log_store))).start();
613 sys.log = Some(log.clone());
614
615 let mut lvm = lyquid::LVM::new(
616 log.clone(),
617 network,
618 10,
619 Some(lyquor_vm::InterOps {
620 inter: fco.clone().recipient(),
621 submit: fco.clone().recipient(),
622 }),
623 );
624 let api_subs = api::Subscriptions::new().start();
625 let event_store = Arc::new(lyquor_seq::event_store::EventStore::new(
626 Arc::new(PrefixedKVStore::new(store.clone(), subkey().event_store())),
627 1000,
628 ));
629 for peer in &config.peers {
633 lvm.upc_mut()
634 .add_remote(peer.node_id, Some(peer.upc_addr.clone()))
635 .await
636 .with_context(|| format!("Failed to add UPC peer {}", peer.node_id))?;
637 }
638 let image_repo_store = RocksDB::new(&config.storage.image_repo_dir)?;
642 let image_repo: Arc<dyn repo::LyquidImageRepo> = Arc::new(repo::KVStoreRepo::new(image_repo_store));
643 let bartender_id = *profile.bartender_id();
644 let pool = lyquid::LyquidPool::new(
645 bartender_id,
646 lvm,
647 lyquid::LyquidPoolSetup {
648 fco: fco.clone(),
649 store_factory: lyq_store,
650 image_repo,
651 event_store,
652 api_subs: api_subs.clone(),
653 config: lyquid::LyquidConfig::builder().build(),
654 },
655 )
656 .await?
657 .start();
658 sys.pool = Some(pool.clone());
659 let bartender_setup = {
663 let register_topic = LyteLog::tagged_value_topic("Register");
664 match log
665 .send(lyquor_vm::log::GetNumberOfRecords {
666 id: bartender_id,
667 topic: register_topic,
668 })
669 .await
670 {
671 Ok(count) if count > 0 => None,
672 Ok(_) => Some(log.send(lyquor_vm::log::WaitNext {
673 id: bartender_id,
674 topic: register_topic,
675 })),
676 Err(e) => return Err(e).context("Failed to get number of records during bartender setup."),
677 }
678 };
679
680 let api = lyquor_lib::api::EthAPIServer::new(
682 &config.network.api_addr,
683 jsonrpc_client.clone(),
684 api_subs,
685 fco.clone(),
686 pool.clone(),
687 )
688 .await?;
689 sys.api = Some(api.start());
690
691 if let Some(reg) = bartender_setup {
693 tracing::info!("Waiting for bartender to be registered.");
694 reg.await.context("Failed to wait for bartender registration.")?;
695 }
696 let bartender_ins = pool
698 .send(lyquid::GetBartender)
699 .await
700 .expect("Failed to obtain bartender.");
701 let bartender_ln = bartender_ins.instance().max_number().unwrap_or(LyquidNumber::ZERO);
702 tracing::debug!("Waiting for bartender to reach {bartender_ln}.");
703 bartender_ins
704 .wait_until(bartender_ln)
705 .await
706 .context("Failed to wait for the initial re-execution of bartender.")?;
707
708 log.send(lyquor_vm::log::Subscribe {
710 id: bartender_id,
711 topic: LyteLog::tagged_value_topic("Register"),
712 subscriber: dis.clone().recipient(),
713 })
714 .await
715 .context("Failed to subscribe to log system for LyquidDiscovery.")?;
716 dis.send(SetPool { pool: pool.clone() })
717 .await
718 .context("Failed to set pool in LyquidDiscovery.")?;
719
720 tracing::info!("INIT COMPLETE (bartender={bartender_id})");
721
722 let db = DBFlusher {
724 last_bn: 0,
725 db_dir: config.resolved_db_dir(),
726 store,
727 fco,
728 anvil_chain,
729 flush_interval_secs: config.flush_interval_secs,
730 }
731 .start();
732 sys.db = Some(db);
733
734 Ok(sys)
735 }
736}
737
738fn generate_unused_port() -> anyhow::Result<u16> {
739 let listener = std::net::TcpListener::bind("0.0.0.0:0").context("Failed to bind to local address.")?;
740 let port = listener.local_addr().context("Failed to get local address.")?.port();
741 Ok(port)
742}
743
744fn start_devnet_anvil() -> anyhow::Result<AnvilInstance> {
745 let anvil_port = generate_unused_port()?;
746 use nix::sys::signal::{self, SigSet, SigmaskHow, Signal};
749 let mut sigset = SigSet::empty();
750 sigset.add(Signal::SIGINT);
751 let mut old_mask = SigSet::empty();
752 signal::sigprocmask(SigmaskHow::SIG_BLOCK, Some(&sigset), Some(&mut old_mask))
753 .context("Failed to mask SIGINT for anvil.")?;
754
755 let mut anvil = Anvil::new().port(anvil_port).keep_stdout().try_spawn()?;
756
757 let stdout_reader = std::io::BufReader::new(
758 anvil
759 .child_mut()
760 .stdout
761 .take()
762 .context("Failed to read from anvil stdout.")?,
763 );
764
765 tokio::task::spawn_blocking(move || {
766 use std::io::BufRead;
767 let mut stdout_lines = stdout_reader.lines();
768 while let Some(line) = stdout_lines.next() {
769 tracing::debug!(target: "lyquor_anvil", "{}", line.unwrap_or_else(|_| "".to_string()));
770 }
771 });
772
773 tracing::info!("Anvil started at port {}.", anvil_port);
774 signal::sigprocmask(SigmaskHow::SIG_SETMASK, Some(&old_mask), None)
775 .context("Failed to restore old signal mask for anvil.")?;
776
777 Ok(anvil)
778}
779
780fn parse_override_value(raw: &str) -> Value {
781 if let Ok(v) = raw.parse::<bool>() {
782 return Value::from(v);
783 }
784 if let Ok(v) = raw.parse::<u64>() {
785 return Value::from(v);
786 }
787 if let Ok(v) = raw.parse::<i64>() {
788 return Value::from(v);
789 }
790 if let Ok(v) = raw.parse::<f64>() {
791 return Value::from(v);
792 }
793 Value::from(raw.to_string())
794}
795
796fn insert_override_path(overrides: &mut Map<String, Value>, key: &str, value: Value) -> anyhow::Result<()> {
797 let mut segments = key.split('.').peekable();
798 let mut current = overrides;
799
800 while let Some(segment) = segments.next() {
801 if segments.peek().is_none() {
802 current.insert(segment.to_string(), value);
803 return Ok(());
804 }
805
806 let entry = current
807 .entry(segment.to_string())
808 .or_insert_with(|| Value::Object(Map::<String, Value>::new()));
809
810 match entry {
811 Value::Object(dict) => {
812 current = dict;
813 }
814 _ => anyhow::bail!("Override path '{key}' conflicts with non-table value."),
815 }
816 }
817
818 Ok(())
819}
820
821fn build_config_overrides(matches: &clap::ArgMatches) -> anyhow::Result<Map<String, Value>> {
822 let mut overrides: Map<String, Value> = Map::new();
823
824 if let Some(entries) = matches.get_many::<String>("config-override") {
825 for entry in entries {
826 let (key, value) = entry
827 .split_once('=')
828 .ok_or_else(|| anyhow::anyhow!("Invalid --config-override '{entry}', expected key=value"))?;
829 insert_override_path(&mut overrides, key, parse_override_value(value))?;
830 }
831 }
832
833 Ok(overrides)
834}
835
836#[actix::main]
837async fn main() -> anyhow::Result<()> {
838 std::panic::set_hook(Box::new(|_| {
840 if let Some(&pid) = ANVIL_PID.get() {
841 tracing::warn!("Panic detected, killing anvil process {}", pid);
842 use nix::sys::signal::{Signal, kill};
843 let _ = kill(nix::unistd::Pid::from_raw(pid as i32), Signal::SIGTERM);
844 }
845 }));
846
847 lyquor_cli::setup_tracing()?;
848
849 println!("{}", lyquor_cli::format_logo_banner());
850
851 let matches = clap::command!()
852 .propagate_version(true)
853 .arg(clap::arg!(--config <PATH> "Path to the Lyquor config file (toml, yaml, json).").required(false))
854 .arg(
855 clap::arg!(--"config-override" <KEY_VALUE> "Override config with key=value (repeatable).")
856 .required(false)
857 .action(clap::ArgAction::Append)
858 .value_parser(clap::builder::NonEmptyStringValueParser::new()),
859 )
860 .get_matches();
861
862 let config_path = matches.get_one::<String>("config").map(PathBuf::from);
863 let overrides = build_config_overrides(&matches)?;
864 let config = NodeConfig::load(config_path, overrides)?;
865
866 let node_seed = config.node_key.seed_bytes()?;
867 let (_, tls_config) = lyquor_tls::generator::single_node_config_with_seed(&node_seed);
868 let mut network = lyquor_net::hub::HubBuilder::new(tls_config)
869 .listen_addr(config.network.upc_addr.clone())
870 .build()
871 .context("Failed to setup network.")?;
872 network.start().await.context("Failed to start network.")?;
873
874 let mut _anvil: Option<AnvilInstance> = None;
875 let profile: Arc<LyquorProfile> = Arc::new(match config.profile {
876 NetworkType::Devnet => {
877 let ws_url = match &config.force_seq_eth {
878 None => {
879 let anvil = start_devnet_anvil()?;
880 ANVIL_PID.set(anvil.child().id()).ok();
882 let ep = anvil.ws_endpoint();
883 _anvil = Some(anvil);
884 ep
885 }
886 Some(url) => url.to_string(),
887 };
888
889 Devnet::new(ws_url).into()
890 }
891 _ => return Err(anyhow::anyhow!("Network is not supported.")),
892 });
893 let seq_eth_url = config
894 .force_seq_eth
895 .clone()
896 .unwrap_or_else(|| profile.sequencer().to_string());
897
898 let mut sigint = signal(SignalKind::interrupt()).context("Failed to register SIGINT handler")?;
901 let mut sigterm = signal(SignalKind::terminate()).context("Failed to register SIGTERM handler")?;
902
903 let node = Node {
904 sys: Subsystems::default(),
905 config: Arc::new(config),
906 profile: profile.clone(),
907 seq_eth_url,
908 started_anvil: _anvil.is_some(),
909 network: Some(network),
910 }
911 .start();
912
913 tokio::select! {
914 _ = sigint.recv() => {
915 tracing::info!("received SIGINT");
916 }
917 _ = sigterm.recv() => {
918 tracing::info!("received SIGTERM");
919 }
920 }
921
922 Ok(node.send(Stop).await?)
923}