1use std::path::Path;
2use std::str::FromStr;
3use std::sync::Arc;
4
5use actix::prelude::*;
6use actix::{Actor, Addr};
7use alloy_node_bindings::{Anvil, AnvilInstance};
8use lyquor_primitives::debug_struct_name;
9use tokio::signal::unix::{SignalKind, signal};
10use tokio::sync::Notify;
11
12use lyquor_api::{
13 actor::Stop,
14 anyhow::{self, Context as _Context},
15 kvstore::{KVStore, Key, PrefixedKVStore, ShadowKVStore},
16 subkey_builder,
17};
18use lyquor_db::{MemDB, RocksDB};
19use lyquor_jsonrpc::client::{ClientConfig, ClientHandle};
20use lyquor_lib::{api, lyquid, utils};
21use lyquor_primitives::{Address, Bytes, LyquidID, LyteLog, RegisterEvent};
22use lyquor_seq::{SequenceBackend, eth, fco::FCO, repo};
23use lyquor_vm::log::Log;
24
25subkey_builder!(RootSubKey(
26 ([0x01])-lyquid() => Key,
27 ([0x02])-fco() => Key,
28 ([0x03])-log() => Key,
29 ([0x04])-event_store() => Key,
30));
31
32fn subkey() -> &'static RootSubKey {
33 static KEY: std::sync::OnceLock<RootSubKey> = std::sync::OnceLock::new();
34 KEY.get_or_init(|| RootSubKey::new(Bytes::new().into()))
35}
36
37const LYQUOR_DB_ROOT: &str = "./lyquor_db";
38const ANVIL_STATE_FILENAME: &str = "anvil.state";
39
40static ANVIL_PID: std::sync::OnceLock<u32> = std::sync::OnceLock::new();
41
42#[derive(Debug, Message)]
43#[rtype(result = "()")]
44struct LoadLyquid {
45 id: LyquidID,
46 deps: Vec<LyquidID>,
47}
48
49#[derive(Debug, Message)]
50#[rtype(result = "()")]
51struct SetPool {
52 pool: Addr<lyquid::LyquidPool>,
53}
54
55struct LyquidDiscovery {
56 pool: Option<Addr<lyquid::LyquidPool>>,
57 register: Arc<Notify>,
58 bartender_id: LyquidID,
59 bartender_addr: Address,
60}
61
62lyquor_primitives::debug_struct_name!(LyquidDiscovery);
63
64impl Handler<lyquor_seq::eth::GetContract> for LyquidDiscovery {
65 type Result = ResponseFuture<Address>;
66 #[tracing::instrument(level = "trace", skip(self, msg, _ctx))]
67 fn handle(&mut self, msg: lyquor_seq::eth::GetContract, _ctx: &mut Context<Self>) -> Self::Result {
68 let lyquor_seq::eth::GetContract { id, idx } = msg;
69 if id == self.bartender_id && idx == 0 {
70 return Box::pin(std::future::ready(self.bartender_addr));
71 }
72 let pool = self.pool.clone().expect("LyquidDiscovery: Pool not set.");
73 let reg = self.register.clone();
74 Box::pin(async move {
75 loop {
76 match pool.send(crate::lyquid::GetLyquidDeploymentInfo { id, idx }).await {
79 Ok(response) => match response {
80 Some(info) => return info.contract,
81 None => reg.notified().await,
82 },
83 Err(e) => {
84 tracing::error!("Failed to get lyquid deployment info: {}", e);
85 std::future::pending::<()>().await;
87 }
88 }
89 }
90 })
91 }
92}
93
94impl Handler<lyquor_vm::log::LogItem> for LyquidDiscovery {
95 type Result = ();
96
97 #[tracing::instrument(level = "trace", skip(msg))]
98 fn handle(&mut self, msg: lyquor_vm::log::LogItem, ctx: &mut Context<Self>) -> Self::Result {
99 let event: Option<RegisterEvent> = lyquor_primitives::decode_object(&msg.0.data);
100 if let Some(event) = event {
101 ctx.spawn(self.load(event.id, event.deps).into_actor(self));
102 }
103 }
104}
105
106impl Handler<LoadLyquid> for LyquidDiscovery {
107 type Result = ResponseFuture<()>;
108
109 #[tracing::instrument(level = "trace")]
110 fn handle(&mut self, msg: LoadLyquid, _ctx: &mut Context<Self>) -> Self::Result {
111 Box::pin(self.load(msg.id, msg.deps))
112 }
113}
114
115impl Handler<SetPool> for LyquidDiscovery {
116 type Result = ();
117
118 #[tracing::instrument(level = "trace")]
119 fn handle(&mut self, msg: SetPool, ctx: &mut Context<Self>) -> Self::Result {
120 if self.pool.is_some() {
121 return
122 }
123 let pool = msg.pool;
124 let me = ctx.address();
125 self.pool = Some(pool.clone());
126 ctx.spawn(
127 async move {
128 let lyquids_with_deps = match pool.send(crate::lyquid::GetRegisteredLyquidList).await.ok().flatten() {
131 None => {
132 tracing::error!("Failed to obtain the registered Lyquid list.");
137 return
138 }
139 Some(lyquids_with_deps) => lyquids_with_deps,
140 };
141 for (id, deps) in lyquids_with_deps.into_iter() {
143 if let Err(e) = me.send(LoadLyquid { id, deps }).await {
144 tracing::error!("Failed to send LoadLyquid message: {}", e);
145 }
146 }
147 }
148 .into_actor(self),
149 );
150 }
151}
152
153impl LyquidDiscovery {
154 fn load(&mut self, id: LyquidID, deps: Vec<LyquidID>) -> impl Future<Output = ()> + 'static {
155 let pool = self.pool.clone().expect("LyquidDiscovery: Pool not set.");
156 let register = self.register.clone();
157 async move {
158 match pool.send(crate::lyquid::LoadLyquid { id, deps }).await {
159 Ok(result) => match result {
160 Ok(already_exists) => {
161 if !already_exists {
162 tracing::info!("Discovered {}", id);
163 }
164 }
165 Err(e) => tracing::error!("Failed to register discovered Lyquid: {e}"),
166 },
167 Err(e) => tracing::error!("Failed to send LoadLyquid message: {}", e),
168 }
169 register.notify_waiters();
170 }
171 }
172}
173
174impl Handler<Stop> for LyquidDiscovery {
175 type Result = ();
176
177 #[tracing::instrument(level = "trace")]
178 fn handle(&mut self, _: Stop, ctx: &mut Context<Self>) -> Self::Result {
179 ctx.stop();
180 }
181}
182
183impl Actor for LyquidDiscovery {
184 type Context = actix::Context<Self>;
185
186 #[tracing::instrument(level = "trace", skip(_ctx))]
187 fn started(&mut self, _ctx: &mut Self::Context) {}
188
189 #[tracing::instrument(level = "trace", skip(_ctx))]
190 fn stopped(&mut self, _ctx: &mut Self::Context) {}
191}
192
193struct DBFlusher {
194 last_bn: u64,
195 store: Arc<ShadowKVStore<Box<dyn KVStore>>>,
196 fco: Addr<FCO>,
197 anvil_chain: Option<ClientHandle>,
198 flush_interval_secs: u64,
199}
200
201lyquor_primitives::debug_struct_name!(DBFlusher);
202
203impl DBFlusher {
204 fn prepare_flush(&self) -> impl Future<Output = anyhow::Result<()>> + 'static {
205 let anvil_chain = self.anvil_chain.clone();
206 let fco = self.fco.clone();
207 async move {
208 fco.send(lyquor_seq::fco::Commit)
209 .await
210 .map_err(anyhow::Error::from)
211 .and_then(|e| e.map_err(anyhow::Error::from))?;
212
213 if let Some(client) = &anvil_chain {
214 use lyquor_jsonrpc::types::{AnvilDumpState, AnvilDumpStateResp};
222 let dump = client
223 .request::<AnvilDumpState, AnvilDumpStateResp>(AnvilDumpState)
224 .await
225 .map(|r| r.0);
226
227 match dump {
228 Ok(state) => {
229 let devnet_path = format!("{}/devnet", LYQUOR_DB_ROOT);
231 std::fs::create_dir_all(&devnet_path)?;
232 std::fs::write(&format!("{}/{}", devnet_path, ANVIL_STATE_FILENAME), &state)?;
233 }
234 Err(_) => {
235 tracing::error!(
236 "DBFlusher: Failed to dump state from anvil. This could cause issue due to the loss of local chain state upon next startup."
237 );
238 }
239 }
240 }
241
242 Ok(())
243 }
244 }
245
246 fn flush(&mut self, res: anyhow::Result<()>) {
247 if let Err(e) = res.and_then(|_| self.store.flush().map_err(|e| e.into())) {
248 tracing::error!("Failed to write: {e}");
249 } else {
250 tracing::info!("Written to the storage backend (block={}).", self.last_bn);
251 }
252 }
253}
254
255#[derive(Message, Debug)]
256#[rtype(result = "()")]
257struct FlushDB;
258
259impl Handler<FlushDB> for DBFlusher {
260 type Result = ResponseActFuture<Self, ()>;
261
262 #[tracing::instrument(level = "trace", skip(self, _ctx))]
263 fn handle(&mut self, _: FlushDB, _ctx: &mut actix::Context<Self>) -> Self::Result {
264 Box::pin(self.prepare_flush().into_actor(self).map(move |res, actor, _ctx| {
265 actor.flush(res);
266 }))
267 }
268}
269
270impl Handler<Stop> for DBFlusher {
271 type Result = ResponseActFuture<Self, ()>;
272
273 #[tracing::instrument(level = "trace")]
274 fn handle(&mut self, _: Stop, _ctx: &mut actix::Context<Self>) -> Self::Result {
275 Box::pin(self.prepare_flush().into_actor(self).map(move |res, actor, _ctx| {
276 actor.flush(res);
277 }))
278 }
279}
280
281impl Actor for DBFlusher {
282 type Context = actix::Context<Self>;
283
284 #[tracing::instrument(level = "trace", skip_all)]
285 fn started(&mut self, ctx: &mut Self::Context) {
286 if self.flush_interval_secs > 0 {
287 let periodic_write = tokio::time::Duration::from_secs(self.flush_interval_secs);
288 ctx.run_interval(periodic_write, |_act, ctx| {
289 ctx.address().do_send(FlushDB);
290 });
291 }
292 }
293
294 #[tracing::instrument(level = "trace", skip_all)]
295 fn stopped(&mut self, _ctx: &mut Self::Context) {}
296}
297
298async fn setup_eth_backend(
299 profile: &utils::LyquorProfile, reg: lyquor_seq::eth::ContractRegistry, jsonrpc_client: ClientHandle,
300) -> anyhow::Result<eth::Backend> {
301 let finality_tag: lyquor_jsonrpc::types::BlockNumber =
302 serde_json::from_str(profile.finality()).context("Invalid finality tag.")?;
303
304 Ok(eth::Backend::new(
305 eth::Config::builder()
306 .finality_tag(finality_tag)
307 .registry(reg)
308 .blocks_per_request(std::num::NonZeroUsize::new(10).context("Block per request should be >0.")?)
309 .build(),
310 jsonrpc_client,
311 ))
312}
313
314#[derive(Default)]
315struct Subsystems {
316 fco: Option<Addr<FCO>>,
317 log: Option<Addr<Log>>,
318 pool: Option<Addr<lyquid::LyquidPool>>,
319 api: Option<Addr<api::EthAPIServer>>,
320 db: Option<Addr<DBFlusher>>,
321 dis: Option<Addr<LyquidDiscovery>>,
322 client: Option<ClientHandle>,
323 eth_backend: Option<lyquor_seq::eth::Backend>,
324}
325debug_struct_name!(Subsystems);
326
327impl Subsystems {
328 #[tracing::instrument(level = "trace")]
329 async fn stop(&mut self) {
330 if let Some(api) = self.api.take() {
331 api.send(Stop).await.ok();
332 }
333 if let Some(eth_backend) = self.eth_backend.take() {
334 eth_backend.stop().await;
335 }
336 if let Some(log) = self.log.take() {
337 log.send(Stop).await.ok();
338 }
339 if let Some(dis) = self.dis.take() {
340 dis.send(Stop).await.ok();
341 }
342 if let Some(pool) = self.pool.take() {
343 pool.send(Stop).await.ok();
344 }
345 if let Some(db) = self.db.take() {
346 db.send(Stop).await.ok();
347 }
348 if let Some(dis) = self.dis.take() {
349 dis.send(Stop).await.ok();
350 }
351 if let Some(fco) = self.fco.take() {
352 fco.send(Stop).await.ok();
353 }
354 }
355}
356
357struct NodeConfig {
358 mem_db: bool,
359 seq_eth_url: String,
360 api_url: String,
361 profile: utils::LyquorProfile,
362 started_anvil: bool,
363 flush_interval_secs: u64,
364}
365
366struct Node {
368 sys: Subsystems,
369 config: Arc<NodeConfig>,
370 network: Option<lyquor_net::hub::Hub>,
371}
372
373lyquor_primitives::debug_struct_name!(Node);
374
375impl Handler<Stop> for Node {
376 type Result = AtomicResponse<Self, ()>;
377
378 #[tracing::instrument(level = "trace")]
379 fn handle(&mut self, _: Stop, _ctx: &mut Context<Self>) -> Self::Result {
380 let mut sys = std::mem::replace(&mut self.sys, Subsystems::default());
381 AtomicResponse::new(Box::pin(
382 async move {
383 sys.stop().await;
384 }
385 .into_actor(self)
386 .map(|_, _act, ctx| {
387 ctx.stop();
388 }),
389 ))
390 }
391}
392
393impl Actor for Node {
394 type Context = Context<Self>;
395
396 #[tracing::instrument(level = "trace", skip(ctx))]
397 fn started(&mut self, ctx: &mut Self::Context) {
398 ctx.spawn(
400 Self::init(
401 self.config.clone(),
402 self.network.take().expect("Node: Network not found."),
403 )
404 .into_actor(self)
405 .map(|sys, act, ctx| {
406 match sys {
407 Ok(sys) => {
408 act.sys = sys;
409 }
410 Err(e) => {
411 tracing::error!("Failed to initialize node: {e:?}");
412 ctx.stop();
413 }
414 };
415 }),
416 );
417 }
418
419 #[tracing::instrument(level = "trace", skip(_ctx))]
420 fn stopped(&mut self, _ctx: &mut Self::Context) {}
421}
422
423impl Node {
424 #[tracing::instrument(level = "trace", skip_all)]
425 async fn init(config: Arc<NodeConfig>, network: lyquor_net::hub::Hub) -> anyhow::Result<Subsystems> {
426 let mut sys = Subsystems::default();
427
428 let store: Arc<ShadowKVStore<Box<dyn KVStore>>>;
430 let lyq_store: Arc<dyn utils::LVMStoreFactory> = if config.mem_db {
431 tracing::warn!("using MemDB, all data will be lost upon exit");
432 store = Arc::new(ShadowKVStore::new(Box::new(MemDB::new())));
433 Arc::new(utils::LVMMemStore)
434 } else {
435 store = Arc::new(ShadowKVStore::new(Box::new(RocksDB::new(
436 &Path::new(LYQUOR_DB_ROOT).join(config.profile.id()),
437 )?)));
438 Arc::new(utils::LVMDBStore(Arc::new(PrefixedKVStore::new(
439 store.clone(),
440 subkey().lyquid(),
441 ))))
442 };
443 let dis = LyquidDiscovery {
448 pool: None,
449 register: Arc::new(Notify::new()),
450 bartender_id: *config.profile.bartender_id(),
451 bartender_addr: Address::from_str(config.profile.bartender_addr())
452 .context("Invalid bartender address in profile.")?
453 .into(),
454 }
455 .start();
456 sys.dis = Some(dis.clone());
457
458 let jsonrpc_client = ClientConfig::builder()
460 .url(config.seq_eth_url.clone().parse()?)
461 .build()
462 .into_client();
463 sys.client = Some(jsonrpc_client.clone());
464
465 let anvil_chain = if config.started_anvil && !config.mem_db {
467 let state_file = format!("{}/{}/{}", LYQUOR_DB_ROOT, config.profile.id(), ANVIL_STATE_FILENAME);
468 if std::path::Path::new(&state_file).exists() {
469 use lyquor_jsonrpc::types::{AnvilLoadState, AnvilLoadStateResp};
470 let state_data = std::fs::read(&state_file)?;
471 let load_result = jsonrpc_client
472 .request::<AnvilLoadState, AnvilLoadStateResp>(AnvilLoadState(state_data.into()))
473 .await?;
474 tracing::info!("Loaded anvil state from {}: {}.", state_file, load_result.0);
475 } else {
476 tracing::info!(
477 "No existing anvil state file found at {}, starting with fresh state.",
478 state_file
479 );
480 }
481 Some(jsonrpc_client.clone())
482 } else {
483 None
484 };
485
486 let eth_backend = setup_eth_backend(&config.profile, dis.clone().recipient(), jsonrpc_client.clone()).await?;
487
488 let fco_store = PrefixedKVStore::new(store.clone(), subkey().fco());
489 let fco = FCO::new(
490 eth_backend.clone(),
491 Arc::new(fco_store),
492 Some(config.profile.init_chain_position()),
493 )?
494 .start();
495 sys.fco = Some(fco.clone());
496 sys.eth_backend = Some(eth_backend);
498 let log_store = PrefixedKVStore::new(store.clone(), subkey().log());
503 let log = Log::new(Arc::new(lyquor_state::DBSimpleStateStore::new(log_store))).start();
504 sys.log = Some(log.clone());
505
506 let lvm = lyquid::LVM::new(log.clone(), network, 10, Some(fco.clone().recipient()));
508 let api_subs = api::Subscriptions::new().start();
509 let event_store = Arc::new(lyquor_seq::event_store::EventStore::new(
510 Arc::new(PrefixedKVStore::new(store.clone(), subkey().event_store())),
511 1000,
512 ));
513 let image_repo_store = RocksDB::new(Path::new("./lyquid_image_db"))?;
517 let image_repo: Arc<dyn repo::LyquidImageRepo> = Arc::new(repo::KVStoreRepo::new(image_repo_store));
518 let bartender_id = *config.profile.bartender_id();
519 let pool = lyquid::LyquidPool::new(
520 bartender_id,
521 lvm,
522 lyquid::LyquidPoolSetup {
523 fco: fco.clone(),
524 store_factory: lyq_store,
525 image_repo,
526 event_store,
527 api_subs: api_subs.clone(),
528 config: lyquid::LyquidConfig::builder().build(),
529 },
530 )
531 .await?
532 .start();
533 sys.pool = Some(pool.clone());
534 let bartender_setup = {
538 let register_topic = LyteLog::tagged_value_topic("Register");
539 match log
540 .send(lyquor_vm::log::GetNumberOfRecords {
541 id: bartender_id,
542 topic: register_topic,
543 })
544 .await
545 {
546 Ok(count) if count > 0 => None,
547 Ok(_) => Some(log.send(lyquor_vm::log::WaitNext {
548 id: bartender_id,
549 topic: register_topic,
550 })),
551 Err(e) => return Err(e).context("Failed to get number of records during bartender setup."),
552 }
553 };
554
555 let api = lyquor_lib::api::EthAPIServer::new(
557 &config.api_url,
558 jsonrpc_client.clone(),
559 api_subs,
560 fco.clone(),
561 pool.clone(),
562 )
563 .await?;
564 sys.api = Some(api.start());
565
566 if let Some(reg) = bartender_setup {
568 tracing::info!("Waiting for bartender to be registered.");
569 reg.await.context("Failed to wait for bartender registration.")?;
570 }
571
572 log.send(lyquor_vm::log::Subscribe {
574 id: bartender_id,
575 topic: LyteLog::tagged_value_topic("Register"),
576 subscriber: dis.clone().recipient(),
577 })
578 .await
579 .context("Failed to subscribe to log system for LyquidDiscovery.")?;
580 dis.send(SetPool { pool: pool.clone() })
581 .await
582 .context("Failed to set pool in LyquidDiscovery.")?;
583
584 tracing::info!("INIT COMPLETE (bartender={bartender_id})");
585
586 let db = DBFlusher {
588 last_bn: 0,
589 store,
590 fco,
591 anvil_chain,
592 flush_interval_secs: config.flush_interval_secs,
593 }
594 .start();
595 sys.db = Some(db);
596
597 Ok(sys)
598 }
599}
600
601fn generate_unused_port() -> anyhow::Result<u16> {
602 let listener = std::net::TcpListener::bind("127.0.0.1:0").context("Failed to bind to local address.")?;
603 let port = listener.local_addr().context("Failed to get local address.")?.port();
604 Ok(port)
605}
606
607fn start_devnet_anvil() -> anyhow::Result<AnvilInstance> {
608 let anvil_port = generate_unused_port()?;
609 use nix::sys::signal::{self, SigSet, SigmaskHow, Signal};
612 let mut sigset = SigSet::empty();
613 sigset.add(Signal::SIGINT);
614 let mut old_mask = SigSet::empty();
615 signal::sigprocmask(SigmaskHow::SIG_BLOCK, Some(&sigset), Some(&mut old_mask))
616 .context("Failed to mask SIGINT for anvil.")?;
617
618 let mut anvil = Anvil::new().port(anvil_port).keep_stdout().try_spawn()?;
619
620 let stdout_reader = std::io::BufReader::new(
621 anvil
622 .child_mut()
623 .stdout
624 .take()
625 .context("Failed to read from anvil stdout.")?,
626 );
627
628 tokio::task::spawn_blocking(move || {
629 use std::io::BufRead;
630 let mut stdout_lines = stdout_reader.lines();
631 while let Some(line) = stdout_lines.next() {
632 tracing::debug!(target: "lyquor_anvil", "{}", line.unwrap_or_else(|_| "".to_string()));
633 }
634 });
635
636 tracing::info!("Anvil started at port {}.", anvil_port);
637 signal::sigprocmask(SigmaskHow::SIG_SETMASK, Some(&old_mask), None)
638 .context("Failed to restore old signal mask for anvil.")?;
639
640 Ok(anvil)
641}
642
643#[actix::main]
644async fn main() -> anyhow::Result<()> {
645 std::panic::set_hook(Box::new(|_| {
647 if let Some(&pid) = ANVIL_PID.get() {
648 tracing::warn!("Panic detected, killing anvil process {}", pid);
649 use nix::sys::signal::{Signal, kill};
650 let _ = kill(nix::unistd::Pid::from_raw(pid as i32), Signal::SIGTERM);
651 }
652 }));
653
654 lyquor_cli::setup_tracing()?;
655
656 println!("{}", lyquor_cli::format_logo_banner());
657
658 let matches = clap::command!()
659 .propagate_version(true)
660 .arg(
661 clap::arg!(--"mem-db" "Use in-memory DB simulation (for debugging or some benchmarks).")
662 .required(false)
663 .action(clap::ArgAction::SetTrue),
664 )
665 .arg(
666 clap::arg!(--"api-eth" <IP_AND_PORT> "Serving address for Ethereum API.")
667 .required(false)
668 .default_value("localhost:10087"),
669 )
670 .arg(
671 clap::arg!(--"upc-addr" <IP_AND_PORT> "Serving address for UPC.")
672 .required(false)
673 .default_value("localhost:10080"),
674 )
675 .arg(
676 clap::arg!(--network <NETWORK> "Select the network to run. (Available: devnet/testnet/mainnet)")
677 .required(false)
678 .default_value("devnet"),
679 )
680 .arg(
681 clap::arg!(--"force-seq-eth" <WS_URL> "Override the WebSocket url for Ethereum API Sequencer.")
682 .required(false),
683 )
684 .arg(
685 clap::arg!(--"flush-interval" <SECONDS> "Interval in seconds for database flushing (0 to disable).")
686 .required(false)
687 .value_parser(clap::value_parser!(u64)),
688 )
689 .get_matches();
690
691 let (_, tls_config) = lyquor_tls::generator::single_node_config();
693 let addr = matches.get_one::<String>("upc-addr").unwrap();
694 let mut network = lyquor_net::hub::HubBuilder::new(tls_config)
695 .listen_addr(addr.clone())
696 .build()
697 .context("Failed to setup network.")?;
698 network.start().await.context("Failed to start network.")?;
699
700 let mut _anvil: Option<AnvilInstance> = None;
701 let network_name = matches.get_one::<String>("network").map(|s| s.as_str());
702 let profile: utils::LyquorProfile = match network_name {
703 Some("devnet") => {
704 let ws_url = match matches.get_one::<String>("force-seq-eth") {
705 None => {
706 let anvil = start_devnet_anvil()?;
707 ANVIL_PID.set(anvil.child().id()).ok();
709 let ep = anvil.ws_endpoint();
710 _anvil = Some(anvil);
711 ep
712 }
713 Some(url) => url.to_string(),
714 };
715
716 utils::Devnet::new(ws_url).into()
717 }
718 _ => return Err(anyhow::anyhow!("Network is not supported.")),
719 };
720 let seq_eth_url = matches
721 .get_one::<String>("force-seq-eth")
722 .cloned()
723 .unwrap_or_else(|| profile.sequencer().to_string());
724 let api_url = matches.get_one::<String>("api-eth").cloned().unwrap();
725 let mem_db = matches.get_flag("mem-db");
726 let flush_interval_secs = matches
727 .get_one::<u64>("flush-interval")
728 .copied()
729 .unwrap_or_else(|| match network_name {
730 Some("devnet") => 0,
731 _ => 10,
732 });
733
734 let mut sigint = signal(SignalKind::interrupt()).context("Failed to register SIGINT handler")?;
737 let mut sigterm = signal(SignalKind::terminate()).context("Failed to register SIGTERM handler")?;
738
739 let node = Node {
740 sys: Subsystems::default(),
741 config: Arc::new(NodeConfig {
742 mem_db,
743 seq_eth_url,
744 api_url,
745 profile,
746 started_anvil: _anvil.is_some(),
747 flush_interval_secs,
748 }),
749 network: Some(network),
750 }
751 .start();
752
753 tokio::select! {
754 _ = sigint.recv() => {
755 tracing::info!("received SIGINT");
756 }
757 _ = sigterm.recv() => {
758 tracing::info!("received SIGTERM");
759 }
760 }
761
762 Ok(node.send(Stop).await?)
763}