1use std::path::{Path, PathBuf};
2use std::str::FromStr;
3use std::sync::Arc;
4use std::time::Duration;
5
6use alloy_node_bindings::{Anvil, AnvilInstance};
7use barstrainer_client::SigningIdentity;
8use lyquor_primitives::{LyquidID, LyquidNumber, debug_struct_name};
9use serde_json::{Map, Value};
10use tokio::signal::unix::{SignalKind, signal};
11use tokio_util::sync::CancellationToken;
12
13use lyquor_api::{
14 anyhow::{self, Context as _Context},
15 config::{DB, NodeConfig},
16 profile::{LyquorProfile, NetworkType},
17 store::{KVStore, Key, PrefixedKVStore, ShadowKVStore},
18 subkey_builder,
19};
20use lyquor_db::{MemDB, RocksDB};
21use lyquor_hosting::{
22 ImageResolver, LyquidConfig, LyquidLog, LyquidRegistry, ProcessFactory, ProcessFactoryDeps, ProcessManager,
23 ProcessStoreFactory, ProcessStores, Sequencer, VmRuntime,
24};
25use lyquor_image_store::{DirectoryOciStore, DirectoryRepo, LyquidImageRepo, OciDistributionStore};
26use lyquor_jsonrpc::client::{ClientConfig, ClientHandle};
27use lyquor_jsonrpc::types::{EthGetChainId, EthGetChainIdResp};
28use lyquor_lib::db::{DBFlusher, DBFlusherHandle};
29use lyquor_lib::dns_updater::publish_external_host_dns;
30use lyquor_lib::http;
31use lyquor_lib::side_effects::{SideEffectRuntime, build_side_effect_runtime};
32use lyquor_primitives::{Address, Bytes, LyteLog};
33use lyquor_seq::{SequenceBackend, eth, fco::FCO};
34
// Top-level key layout of the node's store. Each entry pairs a one-byte tag with an
// accessor returning a `Key`; `Node::init` uses these keys to give per-Lyquid process
// state, the FCO, the log and the side-effect store their own region of the shared store.
// (How tags are encoded into the final key is defined by `subkey_builder!`.)
subkey_builder!(RootSubKey(
    ([0x01])-lyquid() => Key,
    ([0x02])-fco() => Key,
    ([0x03])-log() => Key,
    ([0x04])-side_effect_store() => Key,
));
41
42fn subkey() -> &'static RootSubKey {
43 static KEY: std::sync::OnceLock<RootSubKey> = std::sync::OnceLock::new();
44 KEY.get_or_init(|| RootSubKey::new(Bytes::new().into()))
45}
46
/// File name of the Anvil state file; `anvil_state_path` places it under
/// `<data dir>/<profile id>/`.
const ANVIL_STATE_FILENAME: &str = "anvil-state.json";

/// Tag used for the `progress` sub-key inside a Lyquid's process-store region.
const PROGRESS_KEY: [u8; 1] = [0x00];

// Root of the process-store key space: `lyquid(&id)` yields the sub-key of one Lyquid.
subkey_builder!(ProcessStoreRootSubkey(
    ([])-lyquid(&LyquidID) => ProcessLyquidSubkey,
));

// Sub-keys inside a single Lyquid's region; `progress()` is tagged with `PROGRESS_KEY`.
subkey_builder!(ProcessLyquidSubkey(
    (PROGRESS_KEY)-progress() => Key
));

/// PID of the Anvil child started by this process, if any. Written once by `run_node`
/// after spawning Anvil and read by the panic hook installed in `main`.
static ANVIL_PID: std::sync::OnceLock<u32> = std::sync::OnceLock::new();
60
61async fn setup_eth_backend(
62 profile: &LyquorProfile, reg: lyquor_seq::eth::ContractRegistry, jsonrpc_client: ClientHandle,
63 shutdown: CancellationToken,
64) -> anyhow::Result<eth::Backend> {
65 let finality_tag: lyquor_jsonrpc::types::BlockNumber =
66 serde_json::from_str(profile.finality()).context("Invalid finality tag.")?;
67
68 Ok(eth::Backend::new(
69 eth::Config::builder()
70 .finality_tag(finality_tag)
71 .registry(reg)
72 .blocks_per_request(std::num::NonZeroUsize::new(10).context("Block per request should be >0.")?)
73 .contract_map_factory(Arc::new(|store, cache_size| {
74 Box::pin(async move {
75 let store = lyquor_db::SortedMappingStore::new(store, cache_size).await?;
76 Ok(Arc::new(store) as eth::ContractMap)
77 })
78 }))
79 .build(),
80 jsonrpc_client,
81 shutdown,
82 ))
83}
84
85fn lyquid_number_map_factory() -> lyquor_hosting::LyquidNumberMapFactory {
86 Arc::new(|store, cache_size| {
87 Box::pin(async move {
88 let store = lyquor_db::SortedMappingStore::new(store, cache_size).await?;
89 Ok(Arc::new(store) as lyquor_hosting::LyquidNumberMap)
90 })
91 })
92}
93
94fn kv_store_backed_process_store_factory<V: KVStore + 'static>(store: Arc<V>) -> ProcessStoreFactory {
95 Arc::new(move |id| {
96 let store = store.clone();
97 Box::pin(async move {
98 let subkey = ProcessStoreRootSubkey::new([].into()).lyquid(&id);
99 let vm_root: Arc<dyn KVStore> = Arc::new(PrefixedKVStore::new(store.clone(), subkey.prefix().clone()));
100 let vm = lyquor_vm::InstanceStores::kv(vm_root).await?;
101 let progress: Arc<dyn KVStore> = Arc::new(PrefixedKVStore::new(store.clone(), subkey.progress()));
102 Ok(ProcessStores { vm, progress })
103 })
104 })
105}
106
/// Handles to the node's long-running subsystems.
///
/// Every field is optional so the struct can start out empty (`Default`) and be filled in
/// step by step by `Node::init`; `Subsystems::stop` takes the handles back out to shut
/// them down.
#[derive(Default)]
struct Subsystems {
    /// Ethereum sequence backend.
    eth_backend: Option<lyquor_seq::eth::Backend>,
    /// FCO instance built on top of the sequence backend.
    fco: Option<FCO>,
    /// Lyquid log.
    log: Option<LyquidLog>,
    /// Manager of hosted Lyquid processes.
    process_manager: Option<ProcessManager>,
    /// HTTP API server.
    api: Option<http::HttpServer>,
    /// Handle of the running DB flusher.
    db: Option<DBFlusherHandle>,
    /// Contract resolver; stored here but not explicitly shut down by `stop`.
    contract_resolver: Option<lyquor_lib::discovery::ContractResolver>,
    /// Watcher subscribed to the bartender's "Register" log topic.
    registration_watcher: Option<lyquor_lib::discovery::RegistrationWatcher>,
    /// JSON-RPC client connected to the sequencer endpoint.
    client: Option<ClientHandle>,
}
// NOTE(review): presumably supplies the `Debug` impl required by `#[tracing::instrument]`
// on `stop` — confirm against `debug_struct_name!`.
debug_struct_name!(Subsystems);
120
121impl Subsystems {
122 #[tracing::instrument(level = "trace")]
123 async fn stop(&mut self, shutdown: CancellationToken) {
124 if let Some(api) = self.api.take() {
125 api.stop();
126 }
127 if let Some(eth_backend) = self.eth_backend.take() {
128 eth_backend.shutdown().await;
129 }
130 if let Some(registration_watcher) = self.registration_watcher.take() {
131 registration_watcher.shutdown().await;
132 }
133 if let Some(process_manager) = self.process_manager.take() {
134 process_manager.shutdown().await;
135 }
136 if let Some(db) = self.db.take() {
137 db.stop().await;
138 }
139 shutdown.cancel();
140 if let Some(client) = self.client.take() {
141 client.wait_for_shutdown().await;
142 }
143 if let Some(fco) = self.fco.take() {
144 fco.shutdown().await;
145 }
146 if let Some(log) = self.log.take() {
147 log.shutdown().await;
148 }
149 }
150}
151
/// A node together with everything needed to start it and shut it down.
struct Node {
    /// Running subsystems; empty until `start` has completed.
    sys: Subsystems,
    /// Loaded node configuration.
    config: Arc<NodeConfig>,
    /// Resolved network profile.
    profile: Arc<LyquorProfile>,
    /// URL of the sequencer's Ethereum JSON-RPC endpoint.
    seq_eth_url: String,
    /// TLS runtime for the HTTP API, when one could be set up.
    http_tls: Option<http::httptls::HttpsTlsRuntime>,
    /// Whether this process spawned its own Anvil instance.
    started_anvil: bool,
    /// Network hub; taken (left as `None`) by `start`, which panics if it is missing.
    network: Option<Arc<lyquor_net::hub::Hub>>,
    /// Handle of the Tokio runtime handed to the VM runtime.
    vm_runtime: tokio::runtime::Handle,
    /// Root cancellation token; subsystems receive child tokens of it.
    shutdown: CancellationToken,
}

// NOTE(review): presumably generates a name-only `Debug` impl for `Node` — confirm.
lyquor_primitives::debug_struct_name!(Node);
165
impl Node {
    /// Brings up all subsystems via [`Node::init`] and returns the node with `sys` populated.
    ///
    /// # Panics
    /// Panics with "Node: Network not found." if `network` is `None` (e.g. it was already
    /// taken by an earlier call).
    #[tracing::instrument(level = "trace", skip_all)]
    async fn start(mut self) -> anyhow::Result<Self> {
        self.sys = Self::init(
            self.config.clone(),
            self.profile.clone(),
            self.seq_eth_url.clone(),
            self.http_tls.clone(),
            self.started_anvil,
            self.network.take().expect("Node: Network not found."),
            self.vm_runtime.clone(),
            self.shutdown.clone(),
        )
        .await?;
        Ok(self)
    }

    /// Stops all running subsystems; see `Subsystems::stop` for the order.
    #[tracing::instrument(level = "trace", skip_all)]
    async fn shutdown(&mut self) {
        self.sys.stop(self.shutdown.clone()).await;
    }

    /// Creates and wires every subsystem, storing each handle in the returned `Subsystems`
    /// as soon as it exists.
    ///
    /// The steps run strictly in the order written: JSON-RPC client, storage, contract
    /// resolver, Ethereum backend, side-effect runtime, FCO, log, VM runtime and peers,
    /// image store, process manager and registry, HTTP API, waiting for the bartender,
    /// registration watcher, and finally the DB flusher.
    ///
    /// NOTE(review): when a step fails, the partially filled `sys` is dropped by the early
    /// return without `Subsystems::stop` being called.
    #[tracing::instrument(level = "trace", skip_all)]
    async fn init(
        config: Arc<NodeConfig>, profile: Arc<LyquorProfile>, seq_eth_url: String,
        http_tls: Option<http::httptls::HttpsTlsRuntime>, started_anvil: bool, network: Arc<lyquor_net::hub::Hub>,
        vm_runtime: tokio::runtime::Handle, shutdown: CancellationToken,
    ) -> anyhow::Result<Subsystems> {
        let mut sys = Subsystems::default();

        // JSON-RPC client for the sequencer endpoint, tied to a child of the shutdown token.
        let jsonrpc_client = ClientConfig::builder()
            .url(seq_eth_url.parse()?)
            .build()
            .into_client(shutdown.child_token());
        sys.client = Some(jsonrpc_client.clone());
        // The chain id and the bartender address together identify the sequence backend
        // (see `sequence_backend_id` below).
        let sequence_backend_chain_id: u64 = jsonrpc_client
            .request::<EthGetChainId, EthGetChainIdResp>(EthGetChainId)
            .await
            .context("Failed to query sequence backend chain ID.")?
            .0
            .to();
        let sequence_backend_bartender_addr =
            Address::from_str(profile.bartender_addr()).context("Invalid bartender address in profile.")?;
        let resolved_db_dir = config.resolved_db_dir();
        let resolved_image_repo_dir = config.resolved_image_repo_dir();

        // Backing store: in-memory, or RocksDB under `<db dir>/<profile id>`; either way
        // wrapped in a `ShadowKVStore`.
        let store: Arc<ShadowKVStore<Box<dyn KVStore>>> = match &config.storage.db {
            DB::MemDb => {
                tracing::warn!("using MemDB, all data will be lost upon exit");
                Arc::new(ShadowKVStore::new(Box::new(MemDB::new()) as Box<dyn KVStore>))
            }
            DB::RocksDb { .. } => {
                let base_dir = resolved_db_dir.clone();
                let store = Arc::new(ShadowKVStore::new(
                    Box::new(RocksDB::new(&base_dir.join(profile.id()))?) as Box<dyn KVStore>,
                ));

                // Purely informational: report whether an Anvil state file is present.
                // NOTE(review): `run_node` derives the state path from
                // `config.profile.base.as_str()` while this uses `profile.id()`; confirm
                // both name the same directory.
                if started_anvil {
                    let state_file = anvil_state_path(&base_dir, profile.id());
                    if state_file.exists() {
                        tracing::info!("Using Anvil state file {}.", state_file.display());
                    } else {
                        tracing::info!(
                            "No existing Anvil state file found at {}, starting with fresh state.",
                            state_file.display()
                        );
                    }
                }

                store
            }
        };
        // Per-Lyquid process stores live under the `lyquid` prefix of the root store.
        let lyq_store: ProcessStoreFactory =
            kv_store_backed_process_store_factory(Arc::new(PrefixedKVStore::new(store.clone(), subkey().lyquid())));
        let contract_resolver =
            lyquor_lib::discovery::ContractResolver::new(*profile.bartender_id(), sequence_backend_bartender_addr);
        let contract_resolver_handle = contract_resolver.handle();
        sys.contract_resolver = Some(contract_resolver);

        // The Ethereum backend gets its contract registry from the resolver created above.
        let eth_backend = setup_eth_backend(
            &profile,
            contract_resolver_handle.contract_registry_service(),
            jsonrpc_client.clone(),
            shutdown.child_token(),
        )
        .await?;
        sys.eth_backend = Some(eth_backend.clone());
        let SideEffectRuntime {
            store: side_effect_store,
            provider: side_effect_provider,
        } = build_side_effect_runtime(config.as_ref(), store.clone(), subkey().side_effect_store())?;
        // The side-effect store is handed to the FCO as inbound archive only when the
        // provider is not in selective-hosting mode.
        let archive_inbound_store = (!side_effect_provider.is_selective_hosting()).then(|| side_effect_store.clone());

        let fco_store = PrefixedKVStore::new(store.clone(), subkey().fco());
        let fco = FCO::new(
            eth_backend.clone(),
            Arc::new(fco_store),
            Some(profile.init_chain_position()),
            side_effect_provider,
            archive_inbound_store,
        )
        .await?;
        let fco_handle = fco.handle();
        sys.fco = Some(fco);
        let log_store = PrefixedKVStore::new(store.clone(), subkey().log());
        let log_store: Arc<dyn KVStore> = Arc::new(log_store);
        let log = LyquidLog::new(log_store);
        let log_hub = log.handle();
        sys.log = Some(log);

        // VM runtime; its sequencer services all go through the FCO handle.
        let mut lvm = VmRuntime::new(
            vm_runtime,
            network.clone(),
            Some(Sequencer {
                inter: lyquor_api::interface::InterCallService::new(fco_handle.clone()),
                submit: lyquor_api::interface::SubmitService::new(fco_handle.clone()),
                fetch_oracle_info: lyquor_api::interface::OracleInfoService::new(fco_handle.clone()),
            }),
        )
        .context("Failed to initialize shared wasmtime engine.")?;
        // Register every configured peer with the VM runtime.
        for peer in &config.peers {
            lvm.add_remote(peer.node_id, Some(peer.upc_addr.clone()))
                .await
                .with_context(|| format!("Failed to add UPC peer {}", peer.node_id))?;
        }
        // One directory-backed image repo serves both the image resolver and the HTTP
        // server (the latter through `DirectoryOciStore`).
        let image_repo = Arc::new(
            DirectoryRepo::new(&resolved_image_repo_dir)
                .await
                .context("Failed to initialize image store.")?,
        );
        let image_repo_for_resolver: Arc<dyn LyquidImageRepo> = image_repo.clone();
        let image_repo_for_http: Arc<dyn OciDistributionStore> = Arc::new(DirectoryOciStore::new(image_repo.clone()));
        let image_resolver = Arc::new(ImageResolver::new(
            image_repo_for_resolver,
            config.image.fallback_repos.clone(),
        ));
        let bartender_id = *profile.bartender_id();
        let process_factory = ProcessFactory::new(
            lvm,
            ProcessFactoryDeps {
                fco: fco_handle.clone(),
                store_factory: lyq_store,
                lyquid_number_map_factory: lyquid_number_map_factory(),
                image_resolver,
                side_effect_store,
                log_hub: log_hub.clone(),
                config: LyquidConfig::builder().build(),
            },
        );
        let process_manager = ProcessManager::new(bartender_id, process_factory).await?;
        let process_manager_handle = process_manager.handle();
        sys.process_manager = Some(process_manager);
        // Number of "Register" records the bartender has logged so far; checked further
        // down (after the HTTP API is up) to decide whether to wait for a registration.
        let register_topic = LyteLog::tagged_value_topic("Register");
        let register_count = log_hub.get_number_of_records(bartender_id, register_topic).await;

        let bartender_ins = process_manager_handle
            .get_bartender()
            .await
            .context("Failed to obtain bartender.")?;
        let sequence_backend_id =
            lyquor_primitives::sequence_backend_id(sequence_backend_chain_id, sequence_backend_bartender_addr);
        // The registry and the process manager are bound to each other in both directions.
        let lyquid_registry = LyquidRegistry::new(bartender_ins.clone(), sequence_backend_id);
        lyquid_registry
            .bind_process_manager(process_manager_handle.clone())
            .await
            .context("Failed to bind process manager into LyquidRegistry.")?;
        process_manager_handle
            .bind_registry(lyquid_registry.clone())
            .await
            .context("Failed to bind LyquidRegistry into ProcessManager.")?;

        let node_service_context = http::NodeServiceContext::new(config.clone(), network.clone());
        let api = lyquor_lib::http::HttpServer::new(
            &config.network.api_addr,
            jsonrpc_client.clone(),
            fco_handle.clone(),
            process_manager_handle.clone(),
            lyquid_registry.clone(),
            node_service_context,
            image_repo_for_http,
            shutdown.child_token(),
            http_tls,
        )
        .await?;
        sys.api = Some(api);

        // No registration recorded yet: block until the first "Register" record arrives.
        if register_count == 0 {
            tracing::info!("Waiting for bartender to be registered.");
            log_hub
                .wait_next(bartender_id, register_topic)
                .await
                .context("Failed to wait for bartender registration.")?;
        }
        // Wait for the bartender instance to reach its highest known number (zero when
        // `max_number` returns `None`).
        let bartender_ln = bartender_ins.max_number().await.unwrap_or(LyquidNumber::ZERO);
        tracing::debug!("Waiting for bartender to reach {bartender_ln}.");
        bartender_ins
            .wait_until(bartender_ln)
            .await
            .context("Failed to wait for the initial re-execution of bartender.")?;

        contract_resolver_handle
            .bind_registry(lyquid_registry.clone())
            .await
            .context("Failed to bind LyquidRegistry into ContractResolver.")?;

        // The watcher is fed by a subscription to the bartender's "Register" topic.
        let registration_watcher = lyquor_lib::discovery::RegistrationWatcher::start(
            lyquid_registry.clone(),
            process_manager_handle.clone(),
            contract_resolver_handle,
            log_hub
                .subscribe(bartender_id, register_topic)
                .await
                .context("Failed to subscribe to log system for RegistrationWatcher.")?,
        );
        sys.registration_watcher = Some(registration_watcher);

        tracing::info!("INIT COMPLETE (bartender={bartender_id})");

        // The DB flusher is started last and uses `config.flush_interval_secs` as its interval.
        let db = DBFlusher::new(0, store, fco_handle, Duration::from_secs(config.flush_interval_secs)).start();
        sys.db = Some(db);

        Ok(sys)
    }
}
413
414fn generate_unused_port() -> anyhow::Result<u16> {
415 let listener = std::net::TcpListener::bind("0.0.0.0:0").context("Failed to bind to local address.")?;
416 let port = listener.local_addr().context("Failed to get local address.")?.port();
417 Ok(port)
418}
419
420fn anvil_state_path(data_dir: &Path, profile_id: &str) -> PathBuf {
421 data_dir.join(profile_id).join(ANVIL_STATE_FILENAME)
422}
423
424fn start_devnet_anvil(state_file: Option<&Path>, state_interval_secs: u64) -> anyhow::Result<AnvilInstance> {
425 let anvil_port = generate_unused_port()?;
426 use nix::sys::signal::{self, SigSet, SigmaskHow, Signal};
429 let mut sigset = SigSet::empty();
430 sigset.add(Signal::SIGINT);
431 let mut old_mask = SigSet::empty();
432 signal::sigprocmask(SigmaskHow::SIG_BLOCK, Some(&sigset), Some(&mut old_mask))
433 .context("Failed to mask SIGINT for anvil.")?;
434
435 let mut anvil_builder = Anvil::new().port(anvil_port).keep_stdout();
436 if let Some(state_file) = state_file {
437 if let Some(state_dir) = state_file.parent() {
438 std::fs::create_dir_all(state_dir)?;
439 }
440 anvil_builder = anvil_builder.arg("--state").arg(state_file.as_os_str());
441 if state_interval_secs > 0 {
442 anvil_builder = anvil_builder
443 .arg("--state-interval")
444 .arg(state_interval_secs.to_string());
445 }
446 }
447 let mut anvil = anvil_builder.try_spawn()?;
448
449 let stdout_reader = std::io::BufReader::new(
450 anvil
451 .child_mut()
452 .stdout
453 .take()
454 .context("Failed to read from anvil stdout.")?,
455 );
456
457 tokio::task::spawn_blocking(move || {
458 use std::io::BufRead;
459 let mut stdout_lines = stdout_reader.lines();
460 while let Some(line) = stdout_lines.next() {
461 tracing::debug!(target: "lyquor_anvil", "{}", line.unwrap_or_else(|_| "".to_string()));
462 }
463 });
464
465 tracing::info!("Anvil started at port {}.", anvil_port);
466 signal::sigprocmask(SigmaskHow::SIG_SETMASK, Some(&old_mask), None)
467 .context("Failed to restore old signal mask for anvil.")?;
468
469 Ok(anvil)
470}
471
472fn parse_override_value(raw: &str) -> Value {
473 if let Ok(v) = raw.parse::<bool>() {
474 return Value::from(v);
475 }
476 if let Ok(v) = raw.parse::<u64>() {
477 return Value::from(v);
478 }
479 if let Ok(v) = raw.parse::<i64>() {
480 return Value::from(v);
481 }
482 if let Ok(v) = raw.parse::<f64>() {
483 return Value::from(v);
484 }
485 Value::from(raw.to_string())
486}
487
488fn insert_override_path(overrides: &mut Map<String, Value>, key: &str, value: Value) -> anyhow::Result<()> {
489 let mut segments = key.split('.').peekable();
490 let mut current = overrides;
491
492 while let Some(segment) = segments.next() {
493 if segments.peek().is_none() {
494 current.insert(segment.to_string(), value);
495 return Ok(());
496 }
497
498 let entry = current
499 .entry(segment.to_string())
500 .or_insert_with(|| Value::Object(Map::<String, Value>::new()));
501
502 match entry {
503 Value::Object(dict) => {
504 current = dict;
505 }
506 _ => anyhow::bail!("Override path '{key}' conflicts with non-table value."),
507 }
508 }
509
510 Ok(())
511}
512
513fn build_config_overrides(matches: &clap::ArgMatches) -> anyhow::Result<Map<String, Value>> {
514 let mut overrides: Map<String, Value> = Map::new();
515
516 if let Some(entries) = matches.get_many::<String>("config-override") {
517 for entry in entries {
518 let (key, value) = entry
519 .split_once('=')
520 .ok_or_else(|| anyhow::anyhow!("Invalid --config-override '{entry}', expected key=value"))?;
521 insert_override_path(&mut overrides, key, parse_override_value(value))?;
522 }
523 }
524
525 Ok(overrides)
526}
527
528const NODE_RUNTIME_THREAD_NAME: &str = "lyquor-node";
529const VM_RUNTIME_THREAD_NAME: &str = "lyquor-vm";
530
531fn build_node_runtime(worker_threads: usize) -> std::io::Result<tokio::runtime::Runtime> {
532 tokio::runtime::Builder::new_multi_thread()
533 .enable_all()
534 .worker_threads(worker_threads)
535 .thread_name(NODE_RUNTIME_THREAD_NAME)
536 .build()
537}
538
539fn build_vm_runtime(worker_threads: usize) -> std::io::Result<tokio::runtime::Runtime> {
540 tokio::runtime::Builder::new_multi_thread()
541 .enable_all()
542 .worker_threads(worker_threads)
543 .thread_name(VM_RUNTIME_THREAD_NAME)
544 .build()
545}
546
547fn load_config_from_args() -> anyhow::Result<NodeConfig> {
548 let matches = clap::command!()
549 .version(lyquor_cli::build_version!())
550 .propagate_version(true)
551 .arg(clap::arg!(--config <PATH> "Path to the Lyquor config file (toml, yaml, json).").required(false))
552 .arg(
553 clap::arg!(--"config-override" <KEY_VALUE> "Override config with key=value (repeatable).")
554 .required(false)
555 .action(clap::ArgAction::Append)
556 .value_parser(clap::builder::NonEmptyStringValueParser::new()),
557 )
558 .get_matches();
559
560 let config_path = matches.get_one::<String>("config").map(PathBuf::from);
561 let overrides = build_config_overrides(&matches)?;
562 NodeConfig::load(config_path, overrides)
563}
564
565fn main() -> anyhow::Result<()> {
566 std::panic::set_hook(Box::new(|_| {
568 if let Some(&pid) = ANVIL_PID.get() {
569 tracing::warn!("Panic detected, killing anvil process {}", pid);
570 use nix::sys::signal::{Signal, kill};
571 let _ = kill(nix::unistd::Pid::from_raw(pid as i32), Signal::SIGTERM);
572 }
573 }));
574
575 lyquor_cli::setup_tracing()?;
576
577 println!("{}", lyquor_cli::format_logo_banner(lyquor_cli::build_version!()));
578
579 let config = load_config_from_args()?;
580 let runtime = build_node_runtime(config.runtime.node_threads).context("Failed to build node Tokio runtime")?;
581 let vm_runtime = build_vm_runtime(config.runtime.execution_threads).context("Failed to build VM Tokio runtime")?;
582 let vm_runtime_handle = vm_runtime.handle().clone();
583
584 let result = runtime.block_on(run_node(config, vm_runtime_handle));
585 vm_runtime.shutdown_background();
586 result
587}
588
589async fn run_node(config: NodeConfig, vm_runtime: tokio::runtime::Handle) -> anyhow::Result<()> {
590 let mut _anvil: Option<AnvilInstance> = None;
591 let seq_eth_url = match config.profile.base {
592 NetworkType::Devnet => {
593 match config.profile.sequencer.as_deref() {
594 None => {
595 let state_file = match &config.storage.db {
596 DB::RocksDb { .. } => Some(anvil_state_path(
597 &config.resolved_db_dir(),
598 config.profile.base.as_str(),
599 )),
600 DB::MemDb => None,
601 };
602 let anvil = start_devnet_anvil(state_file.as_deref(), config.flush_interval_secs)?;
603 ANVIL_PID.set(anvil.child().id()).ok();
605 let ep = anvil.ws_endpoint();
606 _anvil = Some(anvil);
607 ep
608 }
609 Some(url) => url.to_string(),
610 }
611 }
612 _ => return Err(anyhow::anyhow!("Network is not supported.")),
613 };
614 let profile: Arc<LyquorProfile> = Arc::new(config.profile.resolve(seq_eth_url.clone())?);
615 let node_seed = config.node_key.seed_bytes()?;
616 let (_, tls_config) = lyquor_tls::generator::single_node_config_with_seed(&node_seed, profile.dns_suffix());
617
618 if let (Some(external_host), Some(barstrainer_addr)) = (
619 config.network.external_host.as_deref(),
620 config.network.barstrainer_addr.as_deref(),
621 ) {
622 match SigningIdentity::from_tls_config(&tls_config) {
623 Ok(barstrainer_identity) => {
624 let dns_suffix = profile.dns_suffix().to_string();
625 let external_host = external_host.to_string();
626 let barstrainer_addr = barstrainer_addr.to_string();
627 tokio::spawn(async move {
628 if let Err(err) =
629 publish_external_host_dns(&barstrainer_identity, &dns_suffix, &external_host, &barstrainer_addr)
630 .await
631 {
632 tracing::warn!(
633 error = ?err,
634 external_host = %external_host,
635 barstrainer_addr = %barstrainer_addr,
636 "Failed to publish node DNS record"
637 );
638 }
639 });
640 }
641 Err(err) => {
642 tracing::warn!(
643 error = ?err,
644 "Skipping external DNS publication because signing identity could not be built"
645 );
646 }
647 }
648 } else {
649 if config.network.external_host.is_some() {
650 tracing::warn!("Skipping external DNS publication because network.barstrainer_addr is not configured");
651 }
652 }
653
654 let shutdown = CancellationToken::new();
655
656 let http_tls = if let Some(mut acme_config) = config.network.acme.clone() {
659 acme_config.storage_path = config
660 .resolved_acme_storage_path()
661 .expect("resolved ACME storage path should exist when ACME is configured");
662 let acme = lyquor_lib::acme::Acme::new(
663 acme_config,
664 config.network.barstrainer_addr.clone().unwrap_or_default(),
665 profile.dns_suffix().to_owned(),
666 &tls_config,
667 );
668
669 let http_tls = if let Ok(acme) = acme {
670 let domain = barstrainer_client::fqdn_for_node(&tls_config.node_id(), profile.dns_suffix(), None);
671 let wildcard_domain =
672 barstrainer_client::fqdn_for_node(&tls_config.node_id(), profile.dns_suffix(), Some("*"));
673 let domains = vec![domain, wildcard_domain];
674
675 http::httptls::HttpsTlsRuntime::new(acme, domains, &tls_config, shutdown.child_token()).await
676 } else {
677 Err(anyhow::anyhow!("Failed to construct ACME config"))
678 };
679
680 match http_tls {
681 Ok(t) => Some(t),
682 Err(err) => {
683 tracing::warn!("Failed to initialize HTTP TLS for api server: {:?}", err);
684 None
685 }
686 }
687 } else {
688 None
689 };
690
691 let config = Arc::new(config);
693 let network = lyquor_net::hub::Hub::new(tls_config, config.network.upc_addr.clone(), shutdown.child_token())
694 .await
695 .context("Failed to setup network.")?;
696 let network = Arc::new(network);
697
698 let mut sigint = signal(SignalKind::interrupt()).context("Failed to register SIGINT handler")?;
701 let mut sigterm = signal(SignalKind::terminate()).context("Failed to register SIGTERM handler")?;
702
703 let mut node = Node {
704 sys: Subsystems::default(),
705 config,
706 profile: profile.clone(),
707 seq_eth_url,
708 http_tls,
709 started_anvil: _anvil.is_some(),
710 network: Some(network.clone()),
711 vm_runtime,
712 shutdown,
713 }
714 .start()
715 .await?;
716
717 tokio::select! {
718 _ = sigint.recv() => {
719 tracing::info!("received SIGINT");
720 }
721 _ = sigterm.recv() => {
722 tracing::info!("received SIGTERM");
723 }
724 }
725
726 node.shutdown().await;
727 network.wait_for_shutdown().await;
728 Ok(())
729}