1use std::path::PathBuf;
2use std::str::FromStr;
3use std::sync::Arc;
4
5use actix::prelude::*;
6use actix::{Actor, Addr};
7use alloy_node_bindings::{Anvil, AnvilInstance};
8use lyquor_oci::pack::{DigestAlgorithm, LyquidPackDigest};
9use lyquor_oci::registry::{OCIReference, OCIRegistry, OCIRegistryAuth, Registry};
10use lyquor_primitives::{LyquidNumber, debug_struct_name};
11use serde_json::{Map, Value};
12use tokio::signal::unix::{SignalKind, signal};
13use tokio::sync::Notify;
14
15use lyquor_api::{
16 actor::Stop,
17 anyhow::{self, Context as _Context},
18 config::{DB, NodeConfig},
19 kvstore::{KVStore, Key, PrefixedKVStore, ShadowKVStore},
20 profile::{Devnet, LyquorProfile, NetworkType},
21 subkey_builder,
22};
23use lyquor_db::{MemDB, RocksDB};
24use lyquor_image_store::{DirectoryRepo, LyquidImageRepo};
25use lyquor_jsonrpc::client::{ClientConfig, ClientHandle};
26use lyquor_jsonrpc::types::{EthGetChainId, EthGetChainIdResp};
27use lyquor_lib::{http, lyquid, utils};
28use lyquor_primitives::B256;
29use lyquor_primitives::{Address, Bytes, LyquidID, LyteLog, RegisterEvent};
30use lyquor_seq::{
31 SequenceBackend, eth,
32 fco::FCO,
33 side_effects::{SideEffectProviderHandle, SideEffectStore},
34};
35use lyquor_vm::log::Log;
36
37subkey_builder!(RootSubKey(
38 ([0x01])-lyquid() => Key,
39 ([0x02])-fco() => Key,
40 ([0x03])-log() => Key,
41 ([0x04])-side_effect_store() => Key,
42));
43
44fn subkey() -> &'static RootSubKey {
45 static KEY: std::sync::OnceLock<RootSubKey> = std::sync::OnceLock::new();
46 KEY.get_or_init(|| RootSubKey::new(Bytes::new().into()))
47}
48
49const ANVIL_STATE_FILENAME: &str = "anvil.state";
50
51static ANVIL_PID: std::sync::OnceLock<u32> = std::sync::OnceLock::new();
52
53#[derive(Debug, Message)]
54#[rtype(result = "()")]
55struct LoadLyquid {
56 id: LyquidID,
57 deps: Vec<LyquidID>,
58}
59
60#[derive(Debug, Message)]
61#[rtype(result = "()")]
62struct SetPool {
63 pool: Addr<lyquid::LyquidPool>,
64}
65
66struct LyquidDiscovery {
67 pool: Option<Addr<lyquid::LyquidPool>>,
68 register: Arc<Notify>,
69 bartender_id: LyquidID,
70 bartender_addr: Address,
71}
72
73lyquor_primitives::debug_struct_name!(LyquidDiscovery);
74
75impl Handler<lyquor_seq::eth::GetContract> for LyquidDiscovery {
76 type Result = ResponseFuture<Address>;
77 #[tracing::instrument(level = "trace", skip(self, msg, _ctx))]
78 fn handle(&mut self, msg: lyquor_seq::eth::GetContract, _ctx: &mut Context<Self>) -> Self::Result {
79 let lyquor_seq::eth::GetContract { id, idx } = msg;
80 if id == self.bartender_id && idx == Some(0) {
81 return Box::pin(std::future::ready(self.bartender_addr));
82 }
83 let pool = self.pool.clone().expect("LyquidDiscovery: Pool not set.");
84 let reg = self.register.clone();
85 Box::pin(async move {
86 loop {
87 if let Some(n) = idx {
91 match pool.send(crate::lyquid::GetLyquidDeploymentInfo { id, idx: n }).await {
92 Ok(Some(info)) => return info.contract,
93 Ok(None) => reg.notified().await,
94 Err(e) => {
95 tracing::error!("Failed to get lyquid deployment info: {}", e);
96 std::future::pending::<()>().await;
98 }
99 }
100 } else {
101 match pool.send(crate::lyquid::GetLatestLyquidDeploymentInfo { id }).await {
102 Ok(Some(info)) => return info.contract,
103 Ok(None) => reg.notified().await,
104 Err(e) => {
105 tracing::error!("Failed to get latest lyquid deployment info: {}", e);
106 std::future::pending::<()>().await;
107 }
108 }
109 }
110 }
111 })
112 }
113}
114
115impl Handler<lyquor_vm::log::LogItem> for LyquidDiscovery {
118 type Result = ();
119
120 #[tracing::instrument(level = "trace", skip(msg))]
121 fn handle(&mut self, msg: lyquor_vm::log::LogItem, ctx: &mut Context<Self>) -> Self::Result {
122 let event: Option<RegisterEvent> = lyquor_primitives::decode_object(&msg.0.data);
123 if let Some(event) = event {
124 ctx.spawn(self.load(event.id, event.deps).into_actor(self));
125 }
126 }
127}
128
129impl Handler<LoadLyquid> for LyquidDiscovery {
130 type Result = ResponseFuture<()>;
131
132 #[tracing::instrument(level = "trace")]
133 fn handle(&mut self, msg: LoadLyquid, _ctx: &mut Context<Self>) -> Self::Result {
134 Box::pin(self.load(msg.id, msg.deps))
135 }
136}
137
138impl Handler<SetPool> for LyquidDiscovery {
139 type Result = ();
140
141 #[tracing::instrument(level = "trace")]
142 fn handle(&mut self, msg: SetPool, ctx: &mut Context<Self>) -> Self::Result {
143 if self.pool.is_some() {
144 return;
145 }
146 let pool = msg.pool;
147 let me = ctx.address();
148 self.pool = Some(pool.clone());
149 ctx.spawn(
150 async move {
151 let lyquids_with_deps = match pool.send(crate::lyquid::GetRegisteredLyquidList).await.ok().flatten() {
154 None => {
155 tracing::error!("Failed to obtain the registered Lyquid list.");
160 return;
161 }
162 Some(lyquids_with_deps) => lyquids_with_deps,
163 };
164 for (id, deps) in lyquids_with_deps.into_iter() {
166 if let Err(e) = me.send(LoadLyquid { id, deps }).await {
170 tracing::error!("Failed to send LoadLyquid message: {}", e);
171 }
172 }
173 }
174 .into_actor(self),
175 );
176 }
177}
178
179impl LyquidDiscovery {
180 fn load(&mut self, id: LyquidID, deps: Vec<LyquidID>) -> impl Future<Output = ()> + 'static {
181 let pool = self.pool.clone().expect("LyquidDiscovery: Pool not set.");
182 let register = self.register.clone();
183 async move {
184 let (repo_hint, image_digest) = match pool
186 .send(crate::lyquid::GetLatestLyquidDeploymentInfo { id })
187 .await
188 .ok()
189 .flatten()
190 {
191 Some(info) => (info.repo_hint, info.image_digest),
192 None => (None, B256::ZERO),
193 };
194
195 if let Some(repo_hint) = repo_hint {
196 match pool.send(crate::lyquid::ImageRepo).await {
197 Ok(repo) => {
198 Self::pull_image_from_repo_hint(repo.0, repo_hint, image_digest)
199 .await
200 .unwrap_or_else(|e| {
201 tracing::error!("Failed to pull lyquid image from repo hint: {e}");
202 });
203 }
204 Err(e) => tracing::error!("LyquidDiscovery: failed to get ImageRepo: {e}"),
205 }
206 }
207
208 match pool.send(crate::lyquid::LoadLyquid { id, deps }).await {
209 Ok(result) => match result {
210 Ok(already_exists) => {
211 if !already_exists {
212 tracing::info!("Discovered {}", id);
213 }
214 }
215 Err(e) => tracing::error!("Failed to register discovered Lyquid: {e}"),
216 },
217 Err(e) => tracing::error!("Failed to send LoadLyquid message: {}", e),
218 }
219 register.notify_waiters();
220 }
221 }
222
223 async fn pull_image_from_repo_hint(
224 image_repo: Arc<dyn LyquidImageRepo>, repo_hint: String, image_digest: B256,
225 ) -> Result<(), String> {
226 let oci_reference = match OCIReference::from_str(&repo_hint) {
227 Ok(reference) => reference,
228 Err(primary_err) => {
229 let with_latest = format!("{repo_hint}:latest");
231 OCIReference::from_str(&with_latest)
232 .map_err(|_| format!("Unsupported repo hint format (expected OCI reference): {primary_err}"))?
233 }
234 };
235 let host = match oci_reference.registry().split_once('/').map(|(host, _)| host) {
236 Some(h) => h,
237 None => oci_reference.registry(),
238 };
239 let auth = match OCIRegistry::get_docker_credential(host) {
240 Ok(auth) => auth,
241 Err(e) => {
242 tracing::trace!(
243 "No docker credential found (reason: {}), falling back to anonymous auth.",
244 e
245 );
246 OCIRegistryAuth::Anonymous
247 }
248 };
249 let oci = OCIRegistry::new(auth, oci_reference.registry());
250 let digest = LyquidPackDigest::new(image_digest, DigestAlgorithm::Sha256);
251
252 let has_image = image_repo
253 .contains(image_digest)
254 .await
255 .map_err(|e| format!("Failed to check local ImageRepo: {e}"))?;
256 if has_image {
257 tracing::debug!("LyquidDiscovery: image already exists in local ImageRepo: {image_digest:?}");
258 return Ok(());
259 }
260
261 tracing::info!("LyquidDiscovery: downloading image from repo hint: {}", repo_hint);
262
263 let pack = oci
265 .pull_full(oci_reference.repository(), None, Some(&digest))
266 .await
267 .map_err(|e| e.to_string())?;
268
269 if *pack.digest().digest() != image_digest {
270 return Err(format!(
271 "Pulled image digest mismatch. expected={}, got={}",
272 digest.to_oci_digest(),
273 pack.digest().to_oci_digest()
274 ));
275 }
276
277 match image_repo.put(pack).await {
279 Ok(()) => (),
280 Err(e) => return Err(e.to_string()),
281 }
282
283 tracing::info!("Image digest: {:?}", image_digest);
284
285 Ok(())
288 }
289}
290
291impl Handler<Stop> for LyquidDiscovery {
292 type Result = ();
293
294 #[tracing::instrument(level = "trace")]
295 fn handle(&mut self, _: Stop, ctx: &mut Context<Self>) -> Self::Result {
296 ctx.stop();
297 }
298}
299
300impl Actor for LyquidDiscovery {
301 type Context = actix::Context<Self>;
302
303 #[tracing::instrument(level = "trace", skip(_ctx))]
304 fn started(&mut self, _ctx: &mut Self::Context) {}
305
306 #[tracing::instrument(level = "trace", skip(_ctx))]
307 fn stopped(&mut self, _ctx: &mut Self::Context) {}
308}
309
310struct DBFlusher {
311 last_bn: u64,
312 db_dir: PathBuf,
313 store: Arc<ShadowKVStore<Box<dyn KVStore>>>,
314 fco: Addr<FCO>,
315 anvil_chain: Option<ClientHandle>,
316 flush_interval_secs: u64,
317}
318
319lyquor_primitives::debug_struct_name!(DBFlusher);
320
321impl DBFlusher {
322 fn prepare_flush(&self) -> impl Future<Output = anyhow::Result<()>> + 'static {
323 let anvil_chain = self.anvil_chain.clone();
324 let fco = self.fco.clone();
325 let db_dir = self.db_dir.clone();
326 async move {
327 fco.send(lyquor_seq::fco::Commit)
328 .await
329 .map_err(anyhow::Error::from)
330 .and_then(|e| e.map_err(anyhow::Error::from))?;
331
332 if let Some(client) = &anvil_chain {
333 use lyquor_jsonrpc::types::{AnvilDumpState, AnvilDumpStateResp};
341 let dump = client
342 .request::<AnvilDumpState, AnvilDumpStateResp>(AnvilDumpState)
343 .await
344 .map(|r| r.0);
345
346 match dump {
347 Ok(state) => {
348 let devnet_path = db_dir.join("devnet");
350 std::fs::create_dir_all(&devnet_path)?;
351 std::fs::write(devnet_path.join(ANVIL_STATE_FILENAME), &state)?;
352 }
353 Err(_) => {
354 tracing::error!(
355 "DBFlusher: Failed to dump state from anvil. This could cause issue due to the loss of local chain state upon next startup."
356 );
357 }
358 }
359 }
360
361 Ok(())
362 }
363 }
364
365 fn flush(&mut self, res: anyhow::Result<()>) {
366 if let Err(e) = res.and_then(|_| self.store.flush().map_err(|e| e.into())) {
367 tracing::error!("Failed to write: {e}");
368 } else {
369 tracing::info!("Written to the storage backend (block={}).", self.last_bn);
370 }
371 }
372}
373
374#[derive(Message, Debug)]
375#[rtype(result = "()")]
376struct FlushDB;
377
378impl Handler<FlushDB> for DBFlusher {
379 type Result = ResponseActFuture<Self, ()>;
380
381 #[tracing::instrument(level = "trace", skip(self, _ctx))]
382 fn handle(&mut self, _: FlushDB, _ctx: &mut actix::Context<Self>) -> Self::Result {
383 Box::pin(self.prepare_flush().into_actor(self).map(move |res, actor, _ctx| {
384 actor.flush(res);
385 }))
386 }
387}
388
389impl Handler<Stop> for DBFlusher {
390 type Result = ResponseActFuture<Self, ()>;
391
392 #[tracing::instrument(level = "trace")]
393 fn handle(&mut self, _: Stop, _ctx: &mut actix::Context<Self>) -> Self::Result {
394 Box::pin(self.prepare_flush().into_actor(self).map(move |res, actor, _ctx| {
395 actor.flush(res);
396 }))
397 }
398}
399
400impl Actor for DBFlusher {
401 type Context = actix::Context<Self>;
402
403 #[tracing::instrument(level = "trace", skip_all)]
404 fn started(&mut self, ctx: &mut Self::Context) {
405 if self.flush_interval_secs > 0 {
406 let periodic_write = tokio::time::Duration::from_secs(self.flush_interval_secs);
407 ctx.run_interval(periodic_write, |_act, ctx| {
408 ctx.address().do_send(FlushDB);
409 });
410 }
411 }
412
413 #[tracing::instrument(level = "trace", skip_all)]
414 fn stopped(&mut self, _ctx: &mut Self::Context) {}
415}
416
417async fn setup_eth_backend(
418 profile: &LyquorProfile, reg: lyquor_seq::eth::ContractRegistry, jsonrpc_client: ClientHandle,
419) -> anyhow::Result<eth::Backend> {
420 let finality_tag: lyquor_jsonrpc::types::BlockNumber =
421 serde_json::from_str(profile.finality()).context("Invalid finality tag.")?;
422
423 Ok(eth::Backend::new(
424 eth::Config::builder()
425 .finality_tag(finality_tag)
426 .registry(reg)
427 .blocks_per_request(std::num::NonZeroUsize::new(10).context("Block per request should be >0.")?)
428 .build(),
429 jsonrpc_client,
430 ))
431}
432
433#[derive(Default)]
434struct Subsystems {
435 fco: Option<Addr<FCO>>,
436 log: Option<Addr<Log>>,
437 pool: Option<Addr<lyquid::LyquidPool>>,
438 api: Option<http::HttpServer>,
439 db: Option<Addr<DBFlusher>>,
440 dis: Option<Addr<LyquidDiscovery>>,
441 client: Option<ClientHandle>,
442 eth_backend: Option<lyquor_seq::eth::Backend>,
443}
444debug_struct_name!(Subsystems);
445
446impl Subsystems {
447 #[tracing::instrument(level = "trace")]
448 async fn stop(&mut self) {
449 if let Some(api) = self.api.take() {
450 api.stop();
451 }
452 if let Some(eth_backend) = self.eth_backend.take() {
453 eth_backend.stop().await;
454 }
455 if let Some(log) = self.log.take() {
456 log.send(Stop).await.ok();
457 }
458 if let Some(dis) = self.dis.take() {
459 dis.send(Stop).await.ok();
460 }
461 if let Some(pool) = self.pool.take() {
462 pool.send(Stop).await.ok();
463 }
464 if let Some(db) = self.db.take() {
465 db.send(Stop).await.ok();
466 }
467 if let Some(dis) = self.dis.take() {
468 dis.send(Stop).await.ok();
469 }
470 if let Some(fco) = self.fco.take() {
471 fco.send(Stop).await.ok();
472 }
473 }
474}
475
476struct Node {
478 sys: Subsystems,
479 config: Arc<NodeConfig>,
480 profile: Arc<LyquorProfile>,
481 seq_eth_url: String,
482 started_anvil: bool,
483 network: Option<lyquor_net::hub::Hub>,
484}
485
486lyquor_primitives::debug_struct_name!(Node);
487
488impl Handler<Stop> for Node {
489 type Result = AtomicResponse<Self, ()>;
490
491 #[tracing::instrument(level = "trace")]
492 fn handle(&mut self, _: Stop, _ctx: &mut Context<Self>) -> Self::Result {
493 let mut sys = std::mem::replace(&mut self.sys, Subsystems::default());
494 AtomicResponse::new(Box::pin(
495 async move {
496 sys.stop().await;
497 }
498 .into_actor(self)
499 .map(|_, _act, ctx| {
500 ctx.stop();
501 }),
502 ))
503 }
504}
505
506impl Actor for Node {
507 type Context = Context<Self>;
508
509 #[tracing::instrument(level = "trace", skip(ctx))]
510 fn started(&mut self, ctx: &mut Self::Context) {
511 ctx.spawn(
513 Self::init(
514 self.config.clone(),
515 self.profile.clone(),
516 self.seq_eth_url.clone(),
517 self.started_anvil,
518 self.network.take().expect("Node: Network not found."),
519 )
520 .into_actor(self)
521 .map(|sys, act, ctx| {
522 match sys {
523 Ok(sys) => {
524 act.sys = sys;
525 }
526 Err(e) => {
527 tracing::error!("Failed to initialize node: {e:?}");
528 ctx.stop();
529 }
530 };
531 }),
532 );
533 }
534
535 #[tracing::instrument(level = "trace", skip(_ctx))]
536 fn stopped(&mut self, _ctx: &mut Self::Context) {}
537}
538
539impl Node {
540 #[tracing::instrument(level = "trace", skip_all)]
541 async fn init(
542 config: Arc<NodeConfig>, profile: Arc<LyquorProfile>, seq_eth_url: String, started_anvil: bool,
543 network: lyquor_net::hub::Hub,
544 ) -> anyhow::Result<Subsystems> {
545 let mut sys = Subsystems::default();
546
547 let jsonrpc_client = ClientConfig::builder().url(seq_eth_url.parse()?).build().into_client();
549 sys.client = Some(jsonrpc_client.clone());
550 let sequence_backend_chain_id: u64 = jsonrpc_client
551 .request::<EthGetChainId, EthGetChainIdResp>(EthGetChainId)
552 .await
553 .context("Failed to query sequence backend chain ID.")?
554 .0
555 .to();
556 let sequence_backend_bartender_addr =
557 Address::from_str(profile.bartender_addr()).context("Invalid bartender address in profile.")?;
558
559 let (store, lyq_store, anvil_chain): (
561 Arc<ShadowKVStore<Box<dyn KVStore>>>,
562 Arc<dyn utils::LVMStoreFactory>,
563 Option<ClientHandle>,
564 ) = match &config.storage.db {
565 DB::MemDb => {
566 tracing::warn!("using MemDB, all data will be lost upon exit");
567 let store = Arc::new(ShadowKVStore::new(Box::new(MemDB::new()) as Box<dyn KVStore>));
568 (store, Arc::new(utils::LVMMemStore), None)
569 }
570 DB::RocksDb { db_dir } => {
571 let base_dir = db_dir.clone();
572 let store = Arc::new(ShadowKVStore::new(
573 Box::new(RocksDB::new(&base_dir.join(profile.id()))?) as Box<dyn KVStore>,
574 ));
575 let lyq_store = Arc::new(utils::LVMDBStore(Arc::new(PrefixedKVStore::new(
576 store.clone(),
577 subkey().lyquid(),
578 ))));
579
580 let anvil_chain = if started_anvil {
582 let state_file = db_dir.join(profile.id()).join(ANVIL_STATE_FILENAME);
583 if state_file.exists() {
584 use lyquor_jsonrpc::types::{AnvilLoadState, AnvilLoadStateResp};
585 let state_data = std::fs::read(&state_file)?;
586 let load_result = jsonrpc_client
587 .request::<AnvilLoadState, AnvilLoadStateResp>(AnvilLoadState(state_data.into()))
588 .await?;
589 tracing::info!("Loaded anvil state from {}: {}.", state_file.display(), load_result.0);
590 } else {
591 tracing::info!(
592 "No existing anvil state file found at {}, starting with fresh state.",
593 state_file.display()
594 );
595 }
596 Some(jsonrpc_client.clone())
597 } else {
598 None
599 };
600
601 (store, lyq_store, anvil_chain)
602 }
603 };
604 let dis = LyquidDiscovery {
609 pool: None,
610 register: Arc::new(Notify::new()),
611 bartender_id: *profile.bartender_id(),
612 bartender_addr: sequence_backend_bartender_addr,
613 }
614 .start();
615 sys.dis = Some(dis.clone());
616
617 let eth_backend = setup_eth_backend(&profile, dis.clone().recipient(), jsonrpc_client.clone()).await?;
618 let side_effect_store = Arc::new(SideEffectStore::new(
619 Arc::new(PrefixedKVStore::new(store.clone(), subkey().side_effect_store())),
620 1000,
621 ));
622 let side_effect_provider = SideEffectProviderHandle::local(side_effect_store.clone());
623
624 let fco_store = PrefixedKVStore::new(store.clone(), subkey().fco());
625 let fco = FCO::new(
626 eth_backend.clone(),
627 Arc::new(fco_store),
628 Some(profile.init_chain_position()),
629 Some(side_effect_provider),
630 )?
631 .start();
632 sys.fco = Some(fco.clone());
633 sys.eth_backend = Some(eth_backend);
635 let log_store = PrefixedKVStore::new(store.clone(), subkey().log());
640 let log = Log::new(Arc::new(lyquor_state::DBSimpleStateStore::new(log_store))).start();
641 sys.log = Some(log.clone());
642
643 let mut lvm = lyquid::LVM::new(
644 log.clone(),
645 network,
646 10,
647 Some(lyquor_vm::Sequencer {
648 inter: fco.clone().recipient(),
649 submit: fco.clone().recipient(),
650 get_oracle_epoch: fco.clone().recipient(),
651 }),
652 );
653 let api_subs = http::Subscriptions::new().start();
654 for peer in &config.peers {
658 lvm.upc_mut()
659 .add_remote(peer.node_id, Some(peer.upc_addr.clone()))
660 .await
661 .with_context(|| format!("Failed to add UPC peer {}", peer.node_id))?;
662 }
663 let image_repo: Arc<dyn LyquidImageRepo> = Arc::new(
667 DirectoryRepo::new(&config.storage.image_repo_dir)
668 .await
669 .context("Failed to initialize image store.")?,
670 );
671 let bartender_id = *profile.bartender_id();
672 let pool = lyquid::LyquidPool::new(
673 bartender_id,
674 lvm,
675 lyquid::LyquidPoolSetup {
676 fco: fco.clone(),
677 store_factory: lyq_store,
678 image_repo,
679 side_effect_store,
680 api_subs: api_subs.clone(),
681 sequence_backend_chain_id,
682 sequence_backend_bartender_addr,
683 config: lyquid::LyquidConfig::builder().build(),
684 },
685 )
686 .await?
687 .start();
688 sys.pool = Some(pool.clone());
689 let bartender_setup = {
693 let register_topic = LyteLog::tagged_value_topic("Register");
694 match log
695 .send(lyquor_vm::log::GetNumberOfRecords {
696 id: bartender_id,
697 topic: register_topic,
698 })
699 .await
700 {
701 Ok(count) if count > 0 => None,
702 Ok(_) => Some(log.send(lyquor_vm::log::WaitNext {
703 id: bartender_id,
704 topic: register_topic,
705 })),
706 Err(e) => return Err(e).context("Failed to get number of records during bartender setup."),
707 }
708 };
709
710 let mut api = lyquor_lib::http::HttpServer::new(
712 &config.network.api_addr,
713 jsonrpc_client.clone(),
714 api_subs,
715 fco.clone(),
716 pool.clone(),
717 )
718 .await?;
719 api.start();
720 sys.api = Some(api);
721
722 if let Some(reg) = bartender_setup {
724 tracing::info!("Waiting for bartender to be registered.");
725 reg.await.context("Failed to wait for bartender registration.")?;
726 }
727 let bartender_ins = pool
729 .send(lyquid::GetBartender)
730 .await
731 .expect("Failed to obtain bartender.");
732 let bartender_ln = bartender_ins.instance().max_number().unwrap_or(LyquidNumber::ZERO);
733 tracing::debug!("Waiting for bartender to reach {bartender_ln}.");
734 bartender_ins
735 .wait_until(bartender_ln)
736 .await
737 .context("Failed to wait for the initial re-execution of bartender.")?;
738
739 log.send(lyquor_vm::log::Subscribe {
741 id: bartender_id,
742 topic: LyteLog::tagged_value_topic("Register"),
743 subscriber: dis.clone().recipient(),
744 })
745 .await
746 .context("Failed to subscribe to log system for LyquidDiscovery.")?;
747 dis.send(SetPool { pool: pool.clone() })
748 .await
749 .context("Failed to set pool in LyquidDiscovery.")?;
750
751 tracing::info!("INIT COMPLETE (bartender={bartender_id})");
752
753 let db = DBFlusher {
755 last_bn: 0,
756 db_dir: config.resolved_db_dir(),
757 store,
758 fco,
759 anvil_chain,
760 flush_interval_secs: config.flush_interval_secs,
761 }
762 .start();
763 sys.db = Some(db);
764
765 Ok(sys)
766 }
767}
768
769fn generate_unused_port() -> anyhow::Result<u16> {
770 let listener = std::net::TcpListener::bind("0.0.0.0:0").context("Failed to bind to local address.")?;
771 let port = listener.local_addr().context("Failed to get local address.")?.port();
772 Ok(port)
773}
774
775fn start_devnet_anvil() -> anyhow::Result<AnvilInstance> {
776 let anvil_port = generate_unused_port()?;
777 use nix::sys::signal::{self, SigSet, SigmaskHow, Signal};
780 let mut sigset = SigSet::empty();
781 sigset.add(Signal::SIGINT);
782 let mut old_mask = SigSet::empty();
783 signal::sigprocmask(SigmaskHow::SIG_BLOCK, Some(&sigset), Some(&mut old_mask))
784 .context("Failed to mask SIGINT for anvil.")?;
785
786 let mut anvil = Anvil::new().port(anvil_port).keep_stdout().try_spawn()?;
787
788 let stdout_reader = std::io::BufReader::new(
789 anvil
790 .child_mut()
791 .stdout
792 .take()
793 .context("Failed to read from anvil stdout.")?,
794 );
795
796 tokio::task::spawn_blocking(move || {
797 use std::io::BufRead;
798 let mut stdout_lines = stdout_reader.lines();
799 while let Some(line) = stdout_lines.next() {
800 tracing::debug!(target: "lyquor_anvil", "{}", line.unwrap_or_else(|_| "".to_string()));
801 }
802 });
803
804 tracing::info!("Anvil started at port {}.", anvil_port);
805 signal::sigprocmask(SigmaskHow::SIG_SETMASK, Some(&old_mask), None)
806 .context("Failed to restore old signal mask for anvil.")?;
807
808 Ok(anvil)
809}
810
811fn parse_override_value(raw: &str) -> Value {
812 if let Ok(v) = raw.parse::<bool>() {
813 return Value::from(v);
814 }
815 if let Ok(v) = raw.parse::<u64>() {
816 return Value::from(v);
817 }
818 if let Ok(v) = raw.parse::<i64>() {
819 return Value::from(v);
820 }
821 if let Ok(v) = raw.parse::<f64>() {
822 return Value::from(v);
823 }
824 Value::from(raw.to_string())
825}
826
827fn insert_override_path(overrides: &mut Map<String, Value>, key: &str, value: Value) -> anyhow::Result<()> {
828 let mut segments = key.split('.').peekable();
829 let mut current = overrides;
830
831 while let Some(segment) = segments.next() {
832 if segments.peek().is_none() {
833 current.insert(segment.to_string(), value);
834 return Ok(());
835 }
836
837 let entry = current
838 .entry(segment.to_string())
839 .or_insert_with(|| Value::Object(Map::<String, Value>::new()));
840
841 match entry {
842 Value::Object(dict) => {
843 current = dict;
844 }
845 _ => anyhow::bail!("Override path '{key}' conflicts with non-table value."),
846 }
847 }
848
849 Ok(())
850}
851
852fn build_config_overrides(matches: &clap::ArgMatches) -> anyhow::Result<Map<String, Value>> {
853 let mut overrides: Map<String, Value> = Map::new();
854
855 if let Some(entries) = matches.get_many::<String>("config-override") {
856 for entry in entries {
857 let (key, value) = entry
858 .split_once('=')
859 .ok_or_else(|| anyhow::anyhow!("Invalid --config-override '{entry}', expected key=value"))?;
860 insert_override_path(&mut overrides, key, parse_override_value(value))?;
861 }
862 }
863
864 Ok(overrides)
865}
866
867#[actix::main]
868async fn main() -> anyhow::Result<()> {
869 std::panic::set_hook(Box::new(|_| {
871 if let Some(&pid) = ANVIL_PID.get() {
872 tracing::warn!("Panic detected, killing anvil process {}", pid);
873 use nix::sys::signal::{Signal, kill};
874 let _ = kill(nix::unistd::Pid::from_raw(pid as i32), Signal::SIGTERM);
875 }
876 }));
877
878 lyquor_cli::setup_tracing()?;
879
880 println!("{}", lyquor_cli::format_logo_banner());
881
882 let matches = clap::command!()
883 .propagate_version(true)
884 .arg(clap::arg!(--config <PATH> "Path to the Lyquor config file (toml, yaml, json).").required(false))
885 .arg(
886 clap::arg!(--"config-override" <KEY_VALUE> "Override config with key=value (repeatable).")
887 .required(false)
888 .action(clap::ArgAction::Append)
889 .value_parser(clap::builder::NonEmptyStringValueParser::new()),
890 )
891 .get_matches();
892
893 let config_path = matches.get_one::<String>("config").map(PathBuf::from);
894 let overrides = build_config_overrides(&matches)?;
895 let config = NodeConfig::load(config_path, overrides)?;
896
897 let node_seed = config.node_key.seed_bytes()?;
898 let (_, tls_config) = lyquor_tls::generator::single_node_config_with_seed(&node_seed);
899 let mut network = lyquor_net::hub::HubBuilder::new(tls_config)
900 .listen_addr(config.network.upc_addr.clone())
901 .build()
902 .context("Failed to setup network.")?;
903 network.start().await.context("Failed to start network.")?;
904
905 let mut _anvil: Option<AnvilInstance> = None;
906 let profile: Arc<LyquorProfile> = Arc::new(match config.profile {
907 NetworkType::Devnet => {
908 let ws_url = match &config.force_seq_eth {
909 None => {
910 let anvil = start_devnet_anvil()?;
911 ANVIL_PID.set(anvil.child().id()).ok();
913 let ep = anvil.ws_endpoint();
914 _anvil = Some(anvil);
915 ep
916 }
917 Some(url) => url.to_string(),
918 };
919
920 Devnet::new(ws_url).into()
921 }
922 _ => return Err(anyhow::anyhow!("Network is not supported.")),
923 });
924 let seq_eth_url = config
925 .force_seq_eth
926 .clone()
927 .unwrap_or_else(|| profile.sequencer().to_string());
928
929 let mut sigint = signal(SignalKind::interrupt()).context("Failed to register SIGINT handler")?;
932 let mut sigterm = signal(SignalKind::terminate()).context("Failed to register SIGTERM handler")?;
933
934 let node = Node {
935 sys: Subsystems::default(),
936 config: Arc::new(config),
937 profile: profile.clone(),
938 seq_eth_url,
939 started_anvil: _anvil.is_some(),
940 network: Some(network),
941 }
942 .start();
943
944 tokio::select! {
945 _ = sigint.recv() => {
946 tracing::info!("received SIGINT");
947 }
948 _ = sigterm.recv() => {
949 tracing::info!("received SIGTERM");
950 }
951 }
952
953 Ok(node.send(Stop).await?)
954}