1use std::path::Path;
2use std::str::FromStr;
3use std::sync::Arc;
4
5use actix::prelude::*;
6use actix::{Actor, Addr};
7use alloy_node_bindings::{Anvil, AnvilInstance};
8use lyquor_primitives::{LyquidNumber, debug_struct_name};
9use tokio::signal::unix::{SignalKind, signal};
10use tokio::sync::Notify;
11
12use lyquor_api::{
13 actor::Stop,
14 anyhow::{self, Context as _Context},
15 kvstore::{KVStore, Key, PrefixedKVStore, ShadowKVStore},
16 subkey_builder,
17};
18use lyquor_crypto::{EvmSigner, LocalEvmSigner, ProdEvmSigner};
19use lyquor_db::{MemDB, RocksDB};
20use lyquor_jsonrpc::client::{ClientConfig, ClientHandle};
21use lyquor_lib::{api, lyquid, utils};
22use lyquor_primitives::{Address, Bytes, LyquidID, LyteLog, Network as OracleNetwork, RegisterEvent};
23use lyquor_seq::{SequenceBackend, eth, fco::FCO, repo};
24use lyquor_vm::log::Log;
25
26subkey_builder!(RootSubKey(
27 ([0x01])-lyquid() => Key,
28 ([0x02])-fco() => Key,
29 ([0x03])-log() => Key,
30 ([0x04])-event_store() => Key,
31));
32
33fn subkey() -> &'static RootSubKey {
34 static KEY: std::sync::OnceLock<RootSubKey> = std::sync::OnceLock::new();
35 KEY.get_or_init(|| RootSubKey::new(Bytes::new().into()))
36}
37
38const LYQUOR_DB_ROOT: &str = "./lyquor_db";
39const ANVIL_STATE_FILENAME: &str = "anvil.state";
40
41static ANVIL_PID: std::sync::OnceLock<u32> = std::sync::OnceLock::new();
42
43#[derive(Debug, Message)]
44#[rtype(result = "()")]
45struct LoadLyquid {
46 id: LyquidID,
47 deps: Vec<LyquidID>,
48}
49
50#[derive(Debug, Message)]
51#[rtype(result = "()")]
52struct SetPool {
53 pool: Addr<lyquid::LyquidPool>,
54}
55
56struct LyquidDiscovery {
57 pool: Option<Addr<lyquid::LyquidPool>>,
58 register: Arc<Notify>,
59 bartender_id: LyquidID,
60 bartender_addr: Address,
61}
62
63lyquor_primitives::debug_struct_name!(LyquidDiscovery);
64
65impl Handler<lyquor_seq::eth::GetContract> for LyquidDiscovery {
66 type Result = ResponseFuture<Address>;
67 #[tracing::instrument(level = "trace", skip(self, msg, _ctx))]
68 fn handle(&mut self, msg: lyquor_seq::eth::GetContract, _ctx: &mut Context<Self>) -> Self::Result {
69 let lyquor_seq::eth::GetContract { id, idx } = msg;
70 if id == self.bartender_id && idx == Some(0) {
71 return Box::pin(std::future::ready(self.bartender_addr));
72 }
73 let pool = self.pool.clone().expect("LyquidDiscovery: Pool not set.");
74 let reg = self.register.clone();
75 Box::pin(async move {
76 loop {
77 if let Some(n) = idx {
81 match pool.send(crate::lyquid::GetLyquidDeploymentInfo { id, idx: n }).await {
82 Ok(Some(info)) => return info.contract,
83 Ok(None) => reg.notified().await,
84 Err(e) => {
85 tracing::error!("Failed to get lyquid deployment info: {}", e);
86 std::future::pending::<()>().await;
88 }
89 }
90 } else {
91 match pool.send(crate::lyquid::GetLatestLyquidInfo { id }).await {
92 Ok(Some(info)) => return info.contract,
93 Ok(None) => reg.notified().await,
94 Err(e) => {
95 tracing::error!("Failed to get latest lyquid info: {}", e);
96 std::future::pending::<()>().await;
97 }
98 }
99 }
100 }
101 })
102 }
103}
104
105impl Handler<lyquor_vm::log::LogItem> for LyquidDiscovery {
108 type Result = ();
109
110 #[tracing::instrument(level = "trace", skip(msg))]
111 fn handle(&mut self, msg: lyquor_vm::log::LogItem, ctx: &mut Context<Self>) -> Self::Result {
112 let event: Option<RegisterEvent> = lyquor_primitives::decode_object(&msg.0.data);
113 if let Some(event) = event {
114 ctx.spawn(self.load(event.id, event.deps).into_actor(self));
115 }
116 }
117}
118
119impl Handler<LoadLyquid> for LyquidDiscovery {
120 type Result = ResponseFuture<()>;
121
122 #[tracing::instrument(level = "trace")]
123 fn handle(&mut self, msg: LoadLyquid, _ctx: &mut Context<Self>) -> Self::Result {
124 Box::pin(self.load(msg.id, msg.deps))
125 }
126}
127
128impl Handler<SetPool> for LyquidDiscovery {
129 type Result = ();
130
131 #[tracing::instrument(level = "trace")]
132 fn handle(&mut self, msg: SetPool, ctx: &mut Context<Self>) -> Self::Result {
133 if self.pool.is_some() {
134 return
135 }
136 let pool = msg.pool;
137 let me = ctx.address();
138 self.pool = Some(pool.clone());
139 ctx.spawn(
140 async move {
141 let lyquids_with_deps = match pool.send(crate::lyquid::GetRegisteredLyquidList).await.ok().flatten() {
144 None => {
145 tracing::error!("Failed to obtain the registered Lyquid list.");
150 return
151 }
152 Some(lyquids_with_deps) => lyquids_with_deps,
153 };
154 for (id, deps) in lyquids_with_deps.into_iter() {
156 if let Err(e) = me.send(LoadLyquid { id, deps }).await {
157 tracing::error!("Failed to send LoadLyquid message: {}", e);
158 }
159 }
160 }
161 .into_actor(self),
162 );
163 }
164}
165
166impl LyquidDiscovery {
167 fn load(&mut self, id: LyquidID, deps: Vec<LyquidID>) -> impl Future<Output = ()> + 'static {
168 let pool = self.pool.clone().expect("LyquidDiscovery: Pool not set.");
169 let register = self.register.clone();
170 async move {
171 match pool.send(crate::lyquid::LoadLyquid { id, deps }).await {
172 Ok(result) => match result {
173 Ok(already_exists) => {
174 if !already_exists {
175 tracing::info!("Discovered {}", id);
176 }
177 }
178 Err(e) => tracing::error!("Failed to register discovered Lyquid: {e}"),
179 },
180 Err(e) => tracing::error!("Failed to send LoadLyquid message: {}", e),
181 }
182 register.notify_waiters();
183 }
184 }
185}
186
187impl Handler<Stop> for LyquidDiscovery {
188 type Result = ();
189
190 #[tracing::instrument(level = "trace")]
191 fn handle(&mut self, _: Stop, ctx: &mut Context<Self>) -> Self::Result {
192 ctx.stop();
193 }
194}
195
196impl Actor for LyquidDiscovery {
197 type Context = actix::Context<Self>;
198
199 #[tracing::instrument(level = "trace", skip(_ctx))]
200 fn started(&mut self, _ctx: &mut Self::Context) {}
201
202 #[tracing::instrument(level = "trace", skip(_ctx))]
203 fn stopped(&mut self, _ctx: &mut Self::Context) {}
204}
205
206struct DBFlusher {
207 last_bn: u64,
208 store: Arc<ShadowKVStore<Box<dyn KVStore>>>,
209 fco: Addr<FCO>,
210 anvil_chain: Option<ClientHandle>,
211 flush_interval_secs: u64,
212}
213
214lyquor_primitives::debug_struct_name!(DBFlusher);
215
216impl DBFlusher {
217 fn prepare_flush(&self) -> impl Future<Output = anyhow::Result<()>> + 'static {
218 let anvil_chain = self.anvil_chain.clone();
219 let fco = self.fco.clone();
220 async move {
221 fco.send(lyquor_seq::fco::Commit)
222 .await
223 .map_err(anyhow::Error::from)
224 .and_then(|e| e.map_err(anyhow::Error::from))?;
225
226 if let Some(client) = &anvil_chain {
227 use lyquor_jsonrpc::types::{AnvilDumpState, AnvilDumpStateResp};
235 let dump = client
236 .request::<AnvilDumpState, AnvilDumpStateResp>(AnvilDumpState)
237 .await
238 .map(|r| r.0);
239
240 match dump {
241 Ok(state) => {
242 let devnet_path = format!("{}/devnet", LYQUOR_DB_ROOT);
244 std::fs::create_dir_all(&devnet_path)?;
245 std::fs::write(&format!("{}/{}", devnet_path, ANVIL_STATE_FILENAME), &state)?;
246 }
247 Err(_) => {
248 tracing::error!(
249 "DBFlusher: Failed to dump state from anvil. This could cause issue due to the loss of local chain state upon next startup."
250 );
251 }
252 }
253 }
254
255 Ok(())
256 }
257 }
258
259 fn flush(&mut self, res: anyhow::Result<()>) {
260 if let Err(e) = res.and_then(|_| self.store.flush().map_err(|e| e.into())) {
261 tracing::error!("Failed to write: {e}");
262 } else {
263 tracing::info!("Written to the storage backend (block={}).", self.last_bn);
264 }
265 }
266}
267
268#[derive(Message, Debug)]
269#[rtype(result = "()")]
270struct FlushDB;
271
272impl Handler<FlushDB> for DBFlusher {
273 type Result = ResponseActFuture<Self, ()>;
274
275 #[tracing::instrument(level = "trace", skip(self, _ctx))]
276 fn handle(&mut self, _: FlushDB, _ctx: &mut actix::Context<Self>) -> Self::Result {
277 Box::pin(self.prepare_flush().into_actor(self).map(move |res, actor, _ctx| {
278 actor.flush(res);
279 }))
280 }
281}
282
283impl Handler<Stop> for DBFlusher {
284 type Result = ResponseActFuture<Self, ()>;
285
286 #[tracing::instrument(level = "trace")]
287 fn handle(&mut self, _: Stop, _ctx: &mut actix::Context<Self>) -> Self::Result {
288 Box::pin(self.prepare_flush().into_actor(self).map(move |res, actor, _ctx| {
289 actor.flush(res);
290 }))
291 }
292}
293
294impl Actor for DBFlusher {
295 type Context = actix::Context<Self>;
296
297 #[tracing::instrument(level = "trace", skip_all)]
298 fn started(&mut self, ctx: &mut Self::Context) {
299 if self.flush_interval_secs > 0 {
300 let periodic_write = tokio::time::Duration::from_secs(self.flush_interval_secs);
301 ctx.run_interval(periodic_write, |_act, ctx| {
302 ctx.address().do_send(FlushDB);
303 });
304 }
305 }
306
307 #[tracing::instrument(level = "trace", skip_all)]
308 fn stopped(&mut self, _ctx: &mut Self::Context) {}
309}
310
311async fn setup_eth_backend(
312 profile: &utils::LyquorProfile, reg: lyquor_seq::eth::ContractRegistry, jsonrpc_client: ClientHandle,
313) -> anyhow::Result<eth::Backend> {
314 let finality_tag: lyquor_jsonrpc::types::BlockNumber =
315 serde_json::from_str(profile.finality()).context("Invalid finality tag.")?;
316
317 Ok(eth::Backend::new(
318 eth::Config::builder()
319 .finality_tag(finality_tag)
320 .registry(reg)
321 .blocks_per_request(std::num::NonZeroUsize::new(10).context("Block per request should be >0.")?)
322 .build(),
323 jsonrpc_client,
324 ))
325}
326
327#[derive(Default)]
328struct Subsystems {
329 fco: Option<Addr<FCO>>,
330 log: Option<Addr<Log>>,
331 pool: Option<Addr<lyquid::LyquidPool>>,
332 api: Option<Addr<api::EthAPIServer>>,
333 db: Option<Addr<DBFlusher>>,
334 dis: Option<Addr<LyquidDiscovery>>,
335 client: Option<ClientHandle>,
336 eth_backend: Option<lyquor_seq::eth::Backend>,
337}
338debug_struct_name!(Subsystems);
339
340impl Subsystems {
341 #[tracing::instrument(level = "trace")]
342 async fn stop(&mut self) {
343 if let Some(api) = self.api.take() {
344 api.send(Stop).await.ok();
345 }
346 if let Some(eth_backend) = self.eth_backend.take() {
347 eth_backend.stop().await;
348 }
349 if let Some(log) = self.log.take() {
350 log.send(Stop).await.ok();
351 }
352 if let Some(dis) = self.dis.take() {
353 dis.send(Stop).await.ok();
354 }
355 if let Some(pool) = self.pool.take() {
356 pool.send(Stop).await.ok();
357 }
358 if let Some(db) = self.db.take() {
359 db.send(Stop).await.ok();
360 }
361 if let Some(dis) = self.dis.take() {
362 dis.send(Stop).await.ok();
363 }
364 if let Some(fco) = self.fco.take() {
365 fco.send(Stop).await.ok();
366 }
367 }
368}
369
370struct NodeConfig {
371 mem_db: bool,
372 seq_eth_url: String,
373 api_url: String,
374 profile: utils::LyquorProfile,
375 started_anvil: bool,
376 flush_interval_secs: u64,
377}
378
379struct Node {
381 sys: Subsystems,
382 config: Arc<NodeConfig>,
383 network: Option<lyquor_net::hub::Hub>,
384}
385
386lyquor_primitives::debug_struct_name!(Node);
387
388impl Handler<Stop> for Node {
389 type Result = AtomicResponse<Self, ()>;
390
391 #[tracing::instrument(level = "trace")]
392 fn handle(&mut self, _: Stop, _ctx: &mut Context<Self>) -> Self::Result {
393 let mut sys = std::mem::replace(&mut self.sys, Subsystems::default());
394 AtomicResponse::new(Box::pin(
395 async move {
396 sys.stop().await;
397 }
398 .into_actor(self)
399 .map(|_, _act, ctx| {
400 ctx.stop();
401 }),
402 ))
403 }
404}
405
406impl Actor for Node {
407 type Context = Context<Self>;
408
409 #[tracing::instrument(level = "trace", skip(ctx))]
410 fn started(&mut self, ctx: &mut Self::Context) {
411 ctx.spawn(
413 Self::init(
414 self.config.clone(),
415 self.network.take().expect("Node: Network not found."),
416 )
417 .into_actor(self)
418 .map(|sys, act, ctx| {
419 match sys {
420 Ok(sys) => {
421 act.sys = sys;
422 }
423 Err(e) => {
424 tracing::error!("Failed to initialize node: {e:?}");
425 ctx.stop();
426 }
427 };
428 }),
429 );
430 }
431
432 #[tracing::instrument(level = "trace", skip(_ctx))]
433 fn stopped(&mut self, _ctx: &mut Self::Context) {}
434}
435
436impl Node {
437 #[tracing::instrument(level = "trace", skip_all)]
438 async fn init(config: Arc<NodeConfig>, network: lyquor_net::hub::Hub) -> anyhow::Result<Subsystems> {
439 let mut sys = Subsystems::default();
440
441 let store: Arc<ShadowKVStore<Box<dyn KVStore>>>;
443 let lyq_store: Arc<dyn utils::LVMStoreFactory> = if config.mem_db {
444 tracing::warn!("using MemDB, all data will be lost upon exit");
445 store = Arc::new(ShadowKVStore::new(Box::new(MemDB::new())));
446 Arc::new(utils::LVMMemStore)
447 } else {
448 store = Arc::new(ShadowKVStore::new(Box::new(RocksDB::new(
449 &Path::new(LYQUOR_DB_ROOT).join(config.profile.id()),
450 )?)));
451 Arc::new(utils::LVMDBStore(Arc::new(PrefixedKVStore::new(
452 store.clone(),
453 subkey().lyquid(),
454 ))))
455 };
456 let dis = LyquidDiscovery {
461 pool: None,
462 register: Arc::new(Notify::new()),
463 bartender_id: *config.profile.bartender_id(),
464 bartender_addr: Address::from_str(config.profile.bartender_addr())
465 .context("Invalid bartender address in profile.")?
466 .into(),
467 }
468 .start();
469 sys.dis = Some(dis.clone());
470
471 let jsonrpc_client = ClientConfig::builder()
473 .url(config.seq_eth_url.clone().parse()?)
474 .build()
475 .into_client();
476 sys.client = Some(jsonrpc_client.clone());
477
478 let anvil_chain = if config.started_anvil && !config.mem_db {
480 let state_file = format!("{}/{}/{}", LYQUOR_DB_ROOT, config.profile.id(), ANVIL_STATE_FILENAME);
481 if std::path::Path::new(&state_file).exists() {
482 use lyquor_jsonrpc::types::{AnvilLoadState, AnvilLoadStateResp};
483 let state_data = std::fs::read(&state_file)?;
484 let load_result = jsonrpc_client
485 .request::<AnvilLoadState, AnvilLoadStateResp>(AnvilLoadState(state_data.into()))
486 .await?;
487 tracing::info!("Loaded anvil state from {}: {}.", state_file, load_result.0);
488 } else {
489 tracing::info!(
490 "No existing anvil state file found at {}, starting with fresh state.",
491 state_file
492 );
493 }
494 Some(jsonrpc_client.clone())
495 } else {
496 None
497 };
498
499 let eth_backend = setup_eth_backend(&config.profile, dis.clone().recipient(), jsonrpc_client.clone()).await?;
500
501 let fco_store = PrefixedKVStore::new(store.clone(), subkey().fco());
502 let fco = FCO::new(
503 eth_backend.clone(),
504 Arc::new(fco_store),
505 Some(config.profile.init_chain_position()),
506 )?
507 .start();
508 sys.fco = Some(fco.clone());
509 sys.eth_backend = Some(eth_backend);
511 let log_store = PrefixedKVStore::new(store.clone(), subkey().log());
516 let log = Log::new(Arc::new(lyquor_state::DBSimpleStateStore::new(log_store))).start();
517 sys.log = Some(log.clone());
518
519 let oracle_network = match config.profile.id() {
521 "devnet" => OracleNetwork::Devnet,
522 "testnet" => OracleNetwork::Testnet,
523 "mainnet" => OracleNetwork::Mainnet,
524 _ => OracleNetwork::Devnet,
525 };
526
527 let evm_signer: Arc<dyn EvmSigner> = match oracle_network {
528 OracleNetwork::Devnet => Arc::new(LocalEvmSigner::new(std::iter::once(network.get_id()))),
529 OracleNetwork::Testnet | OracleNetwork::Mainnet => Arc::new(ProdEvmSigner),
530 };
531
532 let lvm = lyquid::LVM::new(
533 log.clone(),
534 network,
535 10,
536 Some(lyquor_vm::InterOps {
537 inter: fco.clone().recipient(),
538 submit: fco.clone().recipient(),
539 }),
540 evm_signer,
541 );
542 let api_subs = api::Subscriptions::new().start();
543 let event_store = Arc::new(lyquor_seq::event_store::EventStore::new(
544 Arc::new(PrefixedKVStore::new(store.clone(), subkey().event_store())),
545 1000,
546 ));
547 let image_repo_store = RocksDB::new(Path::new("./lyquid_image_db"))?;
551 let image_repo: Arc<dyn repo::LyquidImageRepo> = Arc::new(repo::KVStoreRepo::new(image_repo_store));
552 let bartender_id = *config.profile.bartender_id();
553 let pool = lyquid::LyquidPool::new(
554 bartender_id,
555 lvm,
556 lyquid::LyquidPoolSetup {
557 fco: fco.clone(),
558 store_factory: lyq_store,
559 image_repo,
560 event_store,
561 api_subs: api_subs.clone(),
562 config: lyquid::LyquidConfig::builder().build(),
563 },
564 )
565 .await?
566 .start();
567 sys.pool = Some(pool.clone());
568 let bartender_setup = {
572 let register_topic = LyteLog::tagged_value_topic("Register");
573 match log
574 .send(lyquor_vm::log::GetNumberOfRecords {
575 id: bartender_id,
576 topic: register_topic,
577 })
578 .await
579 {
580 Ok(count) if count > 0 => None,
581 Ok(_) => Some(log.send(lyquor_vm::log::WaitNext {
582 id: bartender_id,
583 topic: register_topic,
584 })),
585 Err(e) => return Err(e).context("Failed to get number of records during bartender setup."),
586 }
587 };
588
589 let api = lyquor_lib::api::EthAPIServer::new(
591 &config.api_url,
592 jsonrpc_client.clone(),
593 api_subs,
594 fco.clone(),
595 pool.clone(),
596 )
597 .await?;
598 sys.api = Some(api.start());
599
600 if let Some(reg) = bartender_setup {
602 tracing::info!("Waiting for bartender to be registered.");
603 reg.await.context("Failed to wait for bartender registration.")?;
604 }
605 let bartender_ins = pool
607 .send(lyquid::GetBartender)
608 .await
609 .expect("Failed to obtain bartender.");
610 let bartender_ln = bartender_ins
611 .instance()
612 .read()
613 .max_number()
614 .unwrap_or(LyquidNumber::ZERO);
615 tracing::debug!("Waiting for bartender to reach {bartender_ln}.");
616 bartender_ins
617 .wait_until(bartender_ln)
618 .await
619 .context("Failed to wait for the initial re-execution of bartender.")?;
620
621 log.send(lyquor_vm::log::Subscribe {
623 id: bartender_id,
624 topic: LyteLog::tagged_value_topic("Register"),
625 subscriber: dis.clone().recipient(),
626 })
627 .await
628 .context("Failed to subscribe to log system for LyquidDiscovery.")?;
629 dis.send(SetPool { pool: pool.clone() })
630 .await
631 .context("Failed to set pool in LyquidDiscovery.")?;
632
633 tracing::info!("INIT COMPLETE (bartender={bartender_id})");
634
635 let db = DBFlusher {
637 last_bn: 0,
638 store,
639 fco,
640 anvil_chain,
641 flush_interval_secs: config.flush_interval_secs,
642 }
643 .start();
644 sys.db = Some(db);
645
646 Ok(sys)
647 }
648}
649
650fn generate_unused_port() -> anyhow::Result<u16> {
651 let listener = std::net::TcpListener::bind("127.0.0.1:0").context("Failed to bind to local address.")?;
652 let port = listener.local_addr().context("Failed to get local address.")?.port();
653 Ok(port)
654}
655
656fn start_devnet_anvil() -> anyhow::Result<AnvilInstance> {
657 let anvil_port = generate_unused_port()?;
658 use nix::sys::signal::{self, SigSet, SigmaskHow, Signal};
661 let mut sigset = SigSet::empty();
662 sigset.add(Signal::SIGINT);
663 let mut old_mask = SigSet::empty();
664 signal::sigprocmask(SigmaskHow::SIG_BLOCK, Some(&sigset), Some(&mut old_mask))
665 .context("Failed to mask SIGINT for anvil.")?;
666
667 let mut anvil = Anvil::new().port(anvil_port).keep_stdout().try_spawn()?;
668
669 let stdout_reader = std::io::BufReader::new(
670 anvil
671 .child_mut()
672 .stdout
673 .take()
674 .context("Failed to read from anvil stdout.")?,
675 );
676
677 tokio::task::spawn_blocking(move || {
678 use std::io::BufRead;
679 let mut stdout_lines = stdout_reader.lines();
680 while let Some(line) = stdout_lines.next() {
681 tracing::debug!(target: "lyquor_anvil", "{}", line.unwrap_or_else(|_| "".to_string()));
682 }
683 });
684
685 tracing::info!("Anvil started at port {}.", anvil_port);
686 signal::sigprocmask(SigmaskHow::SIG_SETMASK, Some(&old_mask), None)
687 .context("Failed to restore old signal mask for anvil.")?;
688
689 Ok(anvil)
690}
691
692#[actix::main]
693async fn main() -> anyhow::Result<()> {
694 std::panic::set_hook(Box::new(|_| {
696 if let Some(&pid) = ANVIL_PID.get() {
697 tracing::warn!("Panic detected, killing anvil process {}", pid);
698 use nix::sys::signal::{Signal, kill};
699 let _ = kill(nix::unistd::Pid::from_raw(pid as i32), Signal::SIGTERM);
700 }
701 }));
702
703 lyquor_cli::setup_tracing()?;
704
705 println!("{}", lyquor_cli::format_logo_banner());
706
707 let matches = clap::command!()
708 .propagate_version(true)
709 .arg(
710 clap::arg!(--"mem-db" "Use in-memory DB simulation (for debugging or some benchmarks).")
711 .required(false)
712 .action(clap::ArgAction::SetTrue),
713 )
714 .arg(
715 clap::arg!(--"api-eth" <IP_AND_PORT> "Serving address for Ethereum API.")
716 .required(false)
717 .default_value("localhost:10087"),
718 )
719 .arg(
720 clap::arg!(--"upc-addr" <IP_AND_PORT> "Serving address for UPC.")
721 .required(false)
722 .default_value("localhost:10080"),
723 )
724 .arg(
725 clap::arg!(--network <NETWORK> "Select the network to run. (Available: devnet/testnet/mainnet)")
726 .required(false)
727 .default_value("devnet"),
728 )
729 .arg(
730 clap::arg!(--"force-seq-eth" <WS_URL> "Override the WebSocket url for Ethereum API Sequencer.")
731 .required(false),
732 )
733 .arg(
734 clap::arg!(--"flush-interval" <SECONDS> "Interval in seconds for database flushing (0 to disable).")
735 .required(false)
736 .value_parser(clap::value_parser!(u64)),
737 )
738 .get_matches();
739
740 let (_, tls_config) = lyquor_tls::generator::single_node_config();
742 let addr = matches.get_one::<String>("upc-addr").unwrap();
743 let mut network = lyquor_net::hub::HubBuilder::new(tls_config)
744 .listen_addr(addr.clone())
745 .build()
746 .context("Failed to setup network.")?;
747 network.start().await.context("Failed to start network.")?;
748
749 let mut _anvil: Option<AnvilInstance> = None;
750 let network_name = matches.get_one::<String>("network").map(|s| s.as_str());
751 let profile: utils::LyquorProfile = match network_name {
752 Some("devnet") => {
753 let ws_url = match matches.get_one::<String>("force-seq-eth") {
754 None => {
755 let anvil = start_devnet_anvil()?;
756 ANVIL_PID.set(anvil.child().id()).ok();
758 let ep = anvil.ws_endpoint();
759 _anvil = Some(anvil);
760 ep
761 }
762 Some(url) => url.to_string(),
763 };
764
765 utils::Devnet::new(ws_url).into()
766 }
767 _ => return Err(anyhow::anyhow!("Network is not supported.")),
768 };
769 let seq_eth_url = matches
770 .get_one::<String>("force-seq-eth")
771 .cloned()
772 .unwrap_or_else(|| profile.sequencer().to_string());
773 let api_url = matches.get_one::<String>("api-eth").cloned().unwrap();
774 let mem_db = matches.get_flag("mem-db");
775 let flush_interval_secs = matches
776 .get_one::<u64>("flush-interval")
777 .copied()
778 .unwrap_or_else(|| match network_name {
779 Some("devnet") => 0,
780 _ => 10,
781 });
782
783 let mut sigint = signal(SignalKind::interrupt()).context("Failed to register SIGINT handler")?;
786 let mut sigterm = signal(SignalKind::terminate()).context("Failed to register SIGTERM handler")?;
787
788 let node = Node {
789 sys: Subsystems::default(),
790 config: Arc::new(NodeConfig {
791 mem_db,
792 seq_eth_url,
793 api_url,
794 profile,
795 started_anvil: _anvil.is_some(),
796 flush_interval_secs,
797 }),
798 network: Some(network),
799 }
800 .start();
801
802 tokio::select! {
803 _ = sigint.recv() => {
804 tracing::info!("received SIGINT");
805 }
806 _ = sigterm.recv() => {
807 tracing::info!("received SIGTERM");
808 }
809 }
810
811 Ok(node.send(Stop).await?)
812}