Skip to main content

lyquor/
lyquor.rs

1use std::path::{Path, PathBuf};
2use std::str::FromStr;
3use std::sync::Arc;
4use std::time::Duration;
5
6use alloy_node_bindings::{Anvil, AnvilInstance};
7use barstrainer_client::SigningIdentity;
8use lyquor_primitives::{LyquidID, LyquidNumber, debug_struct_name};
9use serde_json::{Map, Value};
10use tokio::signal::unix::{SignalKind, signal};
11use tokio_util::sync::CancellationToken;
12
13use lyquor_api::{
14    anyhow::{self, Context as _Context},
15    config::{DB, NodeConfig},
16    profile::{LyquorProfile, NetworkType},
17    store::{KVStore, Key, PrefixedKVStore, ShadowKVStore},
18    subkey_builder,
19};
20use lyquor_db::{MemDB, RocksDB};
21use lyquor_hosting::{
22    ImageResolver, LyquidConfig, LyquidLog, LyquidRegistry, ProcessFactory, ProcessFactoryDeps, ProcessManager,
23    ProcessStoreFactory, ProcessStores, Sequencer, VmRuntime,
24};
25use lyquor_image_store::{DirectoryOciStore, DirectoryRepo, LyquidImageRepo, OciDistributionStore};
26use lyquor_jsonrpc::client::{ClientConfig, ClientHandle};
27use lyquor_jsonrpc::types::{EthGetChainId, EthGetChainIdResp};
28use lyquor_lib::db::{DBFlusher, DBFlusherHandle};
29use lyquor_lib::dns_updater::publish_external_host_dns;
30use lyquor_lib::http;
31use lyquor_lib::side_effects::{SideEffectRuntime, build_side_effect_runtime};
32use lyquor_primitives::{Address, Bytes, LyteLog};
33use lyquor_seq::{SequenceBackend, eth, fco::FCO};
34
// Key namespace of the node's root store. Each accessor is used in `Node::init` to carve
// out a dedicated prefixed store for one subsystem:
//   lyquid()            -> per-Lyquid process stores
//   fco()               -> FCO (ordering subsystem) state
//   log()               -> Lyquid log persistence
//   side_effect_store() -> side-effect runtime store
// NOTE(review): the byte lists (0x01..0x04) look like the key prefixes written to the
// database; presumably they must stay stable once data exists — confirm against
// `subkey_builder!`.
subkey_builder!(RootSubKey(
    ([0x01])-lyquid() => Key,
    ([0x02])-fco() => Key,
    ([0x03])-log() => Key,
    ([0x04])-side_effect_store() => Key,
));
41
42fn subkey() -> &'static RootSubKey {
43    static KEY: std::sync::OnceLock<RootSubKey> = std::sync::OnceLock::new();
44    KEY.get_or_init(|| RootSubKey::new(Bytes::new().into()))
45}
46
/// File name of the Anvil state file; `anvil_state_path` places it under
/// `<data dir>/<profile id>/`.
const ANVIL_STATE_FILENAME: &str = "anvil-state.json";
48
/// Key segment used for a Lyquid's progress store inside that Lyquid's own namespace.
const PROGRESS_KEY: [u8; 1] = [0x00];

// Root of the process-store key space: `lyquid(&id)` yields the namespace of one Lyquid.
subkey_builder!(ProcessStoreRootSubkey(
    ([])-lyquid(&LyquidID) => ProcessLyquidSubkey,
));

// Keys inside a single Lyquid's namespace; `progress()` is the prefix handed to the
// progress store in `kv_store_backed_process_store_factory`.
subkey_builder!(ProcessLyquidSubkey(
    (PROGRESS_KEY)-progress() => Key
));
58
/// PID of the Anvil child process spawned by this node, if any. It is set once in `run_node`
/// and read by the panic hook installed in `main`, which sends SIGTERM to that process.
static ANVIL_PID: std::sync::OnceLock<u32> = std::sync::OnceLock::new();
60
61async fn setup_eth_backend(
62    profile: &LyquorProfile, reg: lyquor_seq::eth::ContractRegistry, jsonrpc_client: ClientHandle,
63    shutdown: CancellationToken,
64) -> anyhow::Result<eth::Backend> {
65    let finality_tag: lyquor_jsonrpc::types::BlockNumber =
66        serde_json::from_str(profile.finality()).context("Invalid finality tag.")?;
67
68    Ok(eth::Backend::new(
69        eth::Config::builder()
70            .finality_tag(finality_tag)
71            .registry(reg)
72            .blocks_per_request(std::num::NonZeroUsize::new(10).context("Block per request should be >0.")?)
73            .contract_map_factory(Arc::new(|store, cache_size| {
74                Box::pin(async move {
75                    let store = lyquor_db::SortedMappingStore::new(store, cache_size).await?;
76                    Ok(Arc::new(store) as eth::ContractMap)
77                })
78            }))
79            .build(),
80        jsonrpc_client,
81        shutdown,
82    ))
83}
84
85fn lyquid_number_map_factory() -> lyquor_hosting::LyquidNumberMapFactory {
86    Arc::new(|store, cache_size| {
87        Box::pin(async move {
88            let store = lyquor_db::SortedMappingStore::new(store, cache_size).await?;
89            Ok(Arc::new(store) as lyquor_hosting::LyquidNumberMap)
90        })
91    })
92}
93
94fn kv_store_backed_process_store_factory<V: KVStore + 'static>(store: Arc<V>) -> ProcessStoreFactory {
95    Arc::new(move |id| {
96        let store = store.clone();
97        Box::pin(async move {
98            let subkey = ProcessStoreRootSubkey::new([].into()).lyquid(&id);
99            let vm_root: Arc<dyn KVStore> = Arc::new(PrefixedKVStore::new(store.clone(), subkey.prefix().clone()));
100            let vm = lyquor_vm::InstanceStores::kv(vm_root).await?;
101            let progress: Arc<dyn KVStore> = Arc::new(PrefixedKVStore::new(store.clone(), subkey.progress()));
102            Ok(ProcessStores { vm, progress })
103        })
104    })
105}
106
/// The long-lived components of a running node. Every slot starts out as `None`
/// (`Default`) and is filled in by `Node::init`; `stop` empties the slots again.
#[derive(Default)]
struct Subsystems {
    /// Ethereum sequence backend created by `setup_eth_backend`.
    eth_backend: Option<lyquor_seq::eth::Backend>,
    /// Ordering subsystem (FCO).
    fco: Option<FCO>,
    /// Lyquid log backed by the `log()` prefix of the root store.
    log: Option<LyquidLog>,
    /// Manager of the hosted Lyquid processes.
    process_manager: Option<ProcessManager>,
    /// HTTP API server.
    api: Option<http::HttpServer>,
    /// Handle of the background DB flusher.
    db: Option<DBFlusherHandle>,
    /// Contract resolver; stored here but not explicitly shut down by `stop`.
    contract_resolver: Option<lyquor_lib::discovery::ContractResolver>,
    /// Watcher subscribed to the bartender's "Register" log topic.
    registration_watcher: Option<lyquor_lib::discovery::RegistrationWatcher>,
    /// JSON-RPC client talking to the sequence backend endpoint.
    client: Option<ClientHandle>,
}
// NOTE(review): presumably generates the `Debug` impl that `#[tracing::instrument]` on
// `stop` needs for `self` — confirm against `debug_struct_name!`.
debug_struct_name!(Subsystems);
120
impl Subsystems {
    /// Shuts the subsystems down one after another.
    ///
    /// Each subsystem is taken out of its slot before it is stopped, so a second call
    /// skips everything that was already handled. The order is: API server, Ethereum
    /// backend, registration watcher, process manager, DB flusher; then `shutdown` is
    /// cancelled and the JSON-RPC client is awaited, followed by the FCO and the log.
    /// `contract_resolver` is left untouched.
    #[tracing::instrument(level = "trace")]
    async fn stop(&mut self, shutdown: CancellationToken) {
        if let Some(api) = self.api.take() {
            api.stop();
        }
        if let Some(eth_backend) = self.eth_backend.take() {
            eth_backend.shutdown().await;
        }
        if let Some(registration_watcher) = self.registration_watcher.take() {
            registration_watcher.shutdown().await;
        }
        if let Some(process_manager) = self.process_manager.take() {
            process_manager.shutdown().await;
        }
        if let Some(db) = self.db.take() {
            db.stop().await;
        }
        // NOTE(review): the client is created from a child of the node's shutdown token in
        // `Node::init`, so presumably it only finishes once the token is cancelled, which
        // is why the cancel comes before the wait — confirm.
        shutdown.cancel();
        if let Some(client) = self.client.take() {
            client.wait_for_shutdown().await;
        }
        if let Some(fco) = self.fco.take() {
            fco.shutdown().await;
        }
        if let Some(log) = self.log.take() {
            log.shutdown().await;
        }
    }
}
151
/// A Lyquor node: the inputs needed to bring it up, plus its subsystems once started.
struct Node {
    /// Running subsystems; all slots are empty until `start` succeeds.
    sys: Subsystems,
    /// Node configuration.
    config: Arc<NodeConfig>,
    /// Resolved network profile.
    profile: Arc<LyquorProfile>,
    /// URL of the JSON-RPC endpoint used as the sequence backend.
    seq_eth_url: String,
    /// Optional TLS runtime handed to the HTTP API server.
    http_tls: Option<http::httptls::HttpsTlsRuntime>,
    /// Whether this process spawned its own Anvil instance.
    started_anvil: bool,
    /// Network hub; `start` takes it out and leaves `None` behind.
    network: Option<Arc<lyquor_net::hub::Hub>>,
    /// Handle of the Tokio runtime passed to `VmRuntime::new`.
    vm_runtime: tokio::runtime::Handle,
    /// Shutdown token of the node; subsystems receive child tokens of it.
    shutdown: CancellationToken,
}

// NOTE(review): presumably generates a `Debug` impl for `Node` — confirm against
// `debug_struct_name!`.
lyquor_primitives::debug_struct_name!(Node);
165
impl Node {
    /// Initializes all subsystems and returns the node with `sys` populated.
    ///
    /// # Errors
    /// Propagates any error from `init`.
    ///
    /// # Panics
    /// Panics if `network` has already been taken (i.e. it is `None`).
    #[tracing::instrument(level = "trace", skip_all)]
    async fn start(mut self) -> anyhow::Result<Self> {
        self.sys = Self::init(
            self.config.clone(),
            self.profile.clone(),
            self.seq_eth_url.clone(),
            self.http_tls.clone(),
            self.started_anvil,
            self.network.take().expect("Node: Network not found."),
            self.vm_runtime.clone(),
            self.shutdown.clone(),
        )
        .await?;
        Ok(self)
    }

    /// Stops all subsystems (see `Subsystems::stop`), cancelling the node's shutdown token
    /// along the way.
    #[tracing::instrument(level = "trace", skip_all)]
    async fn shutdown(&mut self) {
        self.sys.stop(self.shutdown.clone()).await;
    }

    /// Brings up every subsystem in dependency order and returns them.
    ///
    /// Steps: JSON-RPC client and chain-id query, storage, Ethereum backend and FCO, log
    /// and VM runtime, UPC peers, process manager, Lyquid registry, HTTP API, waiting for
    /// the bartender, registration watcher, and finally the DB flusher.
    ///
    /// # Errors
    /// Returns the first error encountered.
    /// NOTE(review): on an error the partially filled `Subsystems` value is simply
    /// dropped; `Subsystems::stop` is not called for subsystems that were already
    /// started — confirm that dropping them is sufficient.
    #[tracing::instrument(level = "trace", skip_all)]
    async fn init(
        config: Arc<NodeConfig>, profile: Arc<LyquorProfile>, seq_eth_url: String,
        http_tls: Option<http::httptls::HttpsTlsRuntime>, started_anvil: bool, network: Arc<lyquor_net::hub::Hub>,
        vm_runtime: tokio::runtime::Handle, shutdown: CancellationToken,
    ) -> anyhow::Result<Subsystems> {
        let mut sys = Subsystems::default();

        // Set up Ethereum sequence backend.
        let jsonrpc_client = ClientConfig::builder()
            .url(seq_eth_url.parse()?)
            .build()
            .into_client(shutdown.child_token());
        sys.client = Some(jsonrpc_client.clone());
        // The chain id is combined with the bartender address further down to derive the
        // sequence backend id.
        let sequence_backend_chain_id: u64 = jsonrpc_client
            .request::<EthGetChainId, EthGetChainIdResp>(EthGetChainId)
            .await
            .context("Failed to query sequence backend chain ID.")?
            .0
            .to();
        let sequence_backend_bartender_addr =
            Address::from_str(profile.bartender_addr()).context("Invalid bartender address in profile.")?;
        let resolved_db_dir = config.resolved_db_dir();
        let resolved_image_repo_dir = config.resolved_image_repo_dir();

        //// 1. DB & Storage
        let store: Arc<ShadowKVStore<Box<dyn KVStore>>> = match &config.storage.db {
            DB::MemDb => {
                tracing::warn!("using MemDB, all data will be lost upon exit");
                Arc::new(ShadowKVStore::new(Box::new(MemDB::new()) as Box<dyn KVStore>))
            }
            DB::RocksDb { .. } => {
                let base_dir = resolved_db_dir.clone();
                // The database lives in a per-profile subdirectory of the DB directory.
                let store = Arc::new(ShadowKVStore::new(
                    Box::new(RocksDB::new(&base_dir.join(profile.id()))?) as Box<dyn KVStore>,
                ));

                // Informational only: report whether an Anvil state file is present.
                // NOTE(review): this path is built from `profile.id()`, while `run_node`
                // builds the path it passes to Anvil from `config.profile.base.as_str()`
                // — confirm the two always agree.
                if started_anvil {
                    let state_file = anvil_state_path(&base_dir, profile.id());
                    if state_file.exists() {
                        tracing::info!("Using Anvil state file {}.", state_file.display());
                    } else {
                        tracing::info!(
                            "No existing Anvil state file found at {}, starting with fresh state.",
                            state_file.display()
                        );
                    }
                }

                store
            }
        };
        let lyq_store: ProcessStoreFactory =
            kv_store_backed_process_store_factory(Arc::new(PrefixedKVStore::new(store.clone(), subkey().lyquid())));
        ////

        //// 2. Fate-Constrained Ordering Subsystem.
        // Start discovering Lyquid and sequencing contracts.
        let contract_resolver =
            lyquor_lib::discovery::ContractResolver::new(*profile.bartender_id(), sequence_backend_bartender_addr);
        let contract_resolver_handle = contract_resolver.handle();
        sys.contract_resolver = Some(contract_resolver);

        let eth_backend = setup_eth_backend(
            &profile,
            contract_resolver_handle.contract_registry_service(),
            jsonrpc_client.clone(),
            shutdown.child_token(),
        )
        .await?;
        sys.eth_backend = Some(eth_backend.clone());
        let SideEffectRuntime {
            store: side_effect_store,
            provider: side_effect_provider,
        } = build_side_effect_runtime(config.as_ref(), store.clone(), subkey().side_effect_store())?;
        // The side-effect store is only passed to the FCO as an archive for inbound data
        // when the provider is not in selective-hosting mode.
        let archive_inbound_store = (!side_effect_provider.is_selective_hosting()).then(|| side_effect_store.clone());

        let fco_store = PrefixedKVStore::new(store.clone(), subkey().fco());
        let fco = FCO::new(
            eth_backend.clone(),
            Arc::new(fco_store),
            Some(profile.init_chain_position()),
            side_effect_provider,
            archive_inbound_store,
        )
        .await?;
        let fco_handle = fco.handle();
        sys.fco = Some(fco);
        ////

        //// 3. Lyquor Virtual Machine Subsystem.
        // Configure VM log persistence (store) and runtime log hub (delivery).
        let log_store = PrefixedKVStore::new(store.clone(), subkey().log());
        let log_store: Arc<dyn KVStore> = Arc::new(log_store);
        let log = LyquidLog::new(log_store);
        let log_hub = log.handle();
        sys.log = Some(log);

        // All three sequencer services are backed by the same FCO handle.
        let mut lvm = VmRuntime::new(
            vm_runtime,
            network.clone(),
            Some(Sequencer {
                inter: lyquor_api::interface::InterCallService::new(fco_handle.clone()),
                submit: lyquor_api::interface::SubmitService::new(fco_handle.clone()),
                fetch_oracle_info: lyquor_api::interface::OracleInfoService::new(fco_handle.clone()),
            }),
        )
        .context("Failed to initialize shared wasmtime engine.")?;
        ////

        //// 4. UPC peers
        for peer in &config.peers {
            lvm.add_remote(peer.node_id, Some(peer.upc_addr.clone()))
                .await
                .with_context(|| format!("Failed to add UPC peer {}", peer.node_id))?;
        }
        ////

        //// 5. Hosted Lyquid processes
        let image_repo = Arc::new(
            DirectoryRepo::new(&resolved_image_repo_dir)
                .await
                .context("Failed to initialize image store.")?,
        );
        // The same directory repo is exposed twice: to the image resolver and, wrapped as
        // an OCI distribution store, to the HTTP server.
        let image_repo_for_resolver: Arc<dyn LyquidImageRepo> = image_repo.clone();
        let image_repo_for_http: Arc<dyn OciDistributionStore> = Arc::new(DirectoryOciStore::new(image_repo.clone()));
        let image_resolver = Arc::new(ImageResolver::new(
            image_repo_for_resolver,
            config.image.fallback_repos.clone(),
        ));
        let bartender_id = *profile.bartender_id();
        let process_factory = ProcessFactory::new(
            lvm,
            ProcessFactoryDeps {
                fco: fco_handle.clone(),
                store_factory: lyq_store,
                lyquid_number_map_factory: lyquid_number_map_factory(),
                image_resolver,
                side_effect_store,
                log_hub: log_hub.clone(),
                config: LyquidConfig::builder().build(),
            },
        );
        let process_manager = ProcessManager::new(bartender_id, process_factory).await?;
        let process_manager_handle = process_manager.handle();
        sys.process_manager = Some(process_manager);
        ////

        // Configure bartender.
        // The number of "Register" records is sampled here, before the API server is up;
        // it decides further down whether to wait for a registration event.
        let register_topic = LyteLog::tagged_value_topic("Register");
        let register_count = log_hub.get_number_of_records(bartender_id, register_topic).await;

        let bartender_ins = process_manager_handle
            .get_bartender()
            .await
            .context("Failed to obtain bartender.")?;
        let sequence_backend_id =
            lyquor_primitives::sequence_backend_id(sequence_backend_chain_id, sequence_backend_bartender_addr);
        // The registry and the process manager are bound to each other in both directions.
        let lyquid_registry = LyquidRegistry::new(bartender_ins.clone(), sequence_backend_id);
        lyquid_registry
            .bind_process_manager(process_manager_handle.clone())
            .await
            .context("Failed to bind process manager into LyquidRegistry.")?;
        process_manager_handle
            .bind_registry(lyquid_registry.clone())
            .await
            .context("Failed to bind LyquidRegistry into ProcessManager.")?;

        let node_service_context = http::NodeServiceContext::new(config.clone(), network.clone());
        // Start the API server - use the same jsonrpc client as sequencer
        let api = lyquor_lib::http::HttpServer::new(
            &config.network.api_addr,
            jsonrpc_client.clone(),
            fco_handle.clone(),
            process_manager_handle.clone(),
            lyquid_registry.clone(),
            node_service_context,
            image_repo_for_http,
            shutdown.child_token(),
            http_tls,
        )
        .await?;
        sys.api = Some(api);

        // Wait for bartender to be registered.
        if register_count == 0 {
            tracing::info!("Waiting for bartender to be registered.");
            log_hub
                .wait_next(bartender_id, register_topic)
                .await
                .context("Failed to wait for bartender registration.")?;
        }
        // Bartender, as a normal Lyquid, may have some re-execution, let's make sure it is done.
        let bartender_ln = bartender_ins.max_number().await.unwrap_or(LyquidNumber::ZERO);
        tracing::debug!("Waiting for bartender to reach {bartender_ln}.");
        bartender_ins
            .wait_until(bartender_ln)
            .await
            .context("Failed to wait for the initial re-execution of bartender.")?;

        contract_resolver_handle
            .bind_registry(lyquid_registry.clone())
            .await
            .context("Failed to bind LyquidRegistry into ContractResolver.")?;

        let registration_watcher = lyquor_lib::discovery::RegistrationWatcher::start(
            lyquid_registry.clone(),
            process_manager_handle.clone(),
            contract_resolver_handle,
            log_hub
                .subscribe(bartender_id, register_topic)
                .await
                .context("Failed to subscribe to log system for RegistrationWatcher.")?,
        );
        sys.registration_watcher = Some(registration_watcher);

        tracing::info!("INIT COMPLETE (bartender={bartender_id})");

        // Start a DB writer that persists data when the node is idle.
        // NOTE(review): the meaning of the leading `0` argument is not visible here —
        // confirm against `DBFlusher::new`.
        let db = DBFlusher::new(0, store, fco_handle, Duration::from_secs(config.flush_interval_secs)).start();
        sys.db = Some(db);

        Ok(sys)
    }
}
413
414fn generate_unused_port() -> anyhow::Result<u16> {
415    let listener = std::net::TcpListener::bind("0.0.0.0:0").context("Failed to bind to local address.")?;
416    let port = listener.local_addr().context("Failed to get local address.")?.port();
417    Ok(port)
418}
419
420fn anvil_state_path(data_dir: &Path, profile_id: &str) -> PathBuf {
421    data_dir.join(profile_id).join(ANVIL_STATE_FILENAME)
422}
423
424fn start_devnet_anvil(state_file: Option<&Path>, state_interval_secs: u64) -> anyhow::Result<AnvilInstance> {
425    let anvil_port = generate_unused_port()?;
426    // Mask the SIGINT signal so anvil process won't terminate on SIGINT (it'll only
427    // terminate when dropped)
428    use nix::sys::signal::{self, SigSet, SigmaskHow, Signal};
429    let mut sigset = SigSet::empty();
430    sigset.add(Signal::SIGINT);
431    let mut old_mask = SigSet::empty();
432    signal::sigprocmask(SigmaskHow::SIG_BLOCK, Some(&sigset), Some(&mut old_mask))
433        .context("Failed to mask SIGINT for anvil.")?;
434
435    let mut anvil_builder = Anvil::new().port(anvil_port).keep_stdout();
436    if let Some(state_file) = state_file {
437        if let Some(state_dir) = state_file.parent() {
438            std::fs::create_dir_all(state_dir)?;
439        }
440        anvil_builder = anvil_builder.arg("--state").arg(state_file.as_os_str());
441        if state_interval_secs > 0 {
442            anvil_builder = anvil_builder
443                .arg("--state-interval")
444                .arg(state_interval_secs.to_string());
445        }
446    }
447    let mut anvil = anvil_builder.try_spawn()?;
448
449    let stdout_reader = std::io::BufReader::new(
450        anvil
451            .child_mut()
452            .stdout
453            .take()
454            .context("Failed to read from anvil stdout.")?,
455    );
456
457    tokio::task::spawn_blocking(move || {
458        use std::io::BufRead;
459        let mut stdout_lines = stdout_reader.lines();
460        while let Some(line) = stdout_lines.next() {
461            tracing::debug!(target: "lyquor_anvil", "{}", line.unwrap_or_else(|_| "".to_string()));
462        }
463    });
464
465    tracing::info!("Anvil started at port {}.", anvil_port);
466    signal::sigprocmask(SigmaskHow::SIG_SETMASK, Some(&old_mask), None)
467        .context("Failed to restore old signal mask for anvil.")?;
468
469    Ok(anvil)
470}
471
472fn parse_override_value(raw: &str) -> Value {
473    if let Ok(v) = raw.parse::<bool>() {
474        return Value::from(v);
475    }
476    if let Ok(v) = raw.parse::<u64>() {
477        return Value::from(v);
478    }
479    if let Ok(v) = raw.parse::<i64>() {
480        return Value::from(v);
481    }
482    if let Ok(v) = raw.parse::<f64>() {
483        return Value::from(v);
484    }
485    Value::from(raw.to_string())
486}
487
488fn insert_override_path(overrides: &mut Map<String, Value>, key: &str, value: Value) -> anyhow::Result<()> {
489    let mut segments = key.split('.').peekable();
490    let mut current = overrides;
491
492    while let Some(segment) = segments.next() {
493        if segments.peek().is_none() {
494            current.insert(segment.to_string(), value);
495            return Ok(());
496        }
497
498        let entry = current
499            .entry(segment.to_string())
500            .or_insert_with(|| Value::Object(Map::<String, Value>::new()));
501
502        match entry {
503            Value::Object(dict) => {
504                current = dict;
505            }
506            _ => anyhow::bail!("Override path '{key}' conflicts with non-table value."),
507        }
508    }
509
510    Ok(())
511}
512
513fn build_config_overrides(matches: &clap::ArgMatches) -> anyhow::Result<Map<String, Value>> {
514    let mut overrides: Map<String, Value> = Map::new();
515
516    if let Some(entries) = matches.get_many::<String>("config-override") {
517        for entry in entries {
518            let (key, value) = entry
519                .split_once('=')
520                .ok_or_else(|| anyhow::anyhow!("Invalid --config-override '{entry}', expected key=value"))?;
521            insert_override_path(&mut overrides, key, parse_override_value(value))?;
522        }
523    }
524
525    Ok(overrides)
526}
527
528const NODE_RUNTIME_THREAD_NAME: &str = "lyquor-node";
529const VM_RUNTIME_THREAD_NAME: &str = "lyquor-vm";
530
531fn build_node_runtime(worker_threads: usize) -> std::io::Result<tokio::runtime::Runtime> {
532    tokio::runtime::Builder::new_multi_thread()
533        .enable_all()
534        .worker_threads(worker_threads)
535        .thread_name(NODE_RUNTIME_THREAD_NAME)
536        .build()
537}
538
539fn build_vm_runtime(worker_threads: usize) -> std::io::Result<tokio::runtime::Runtime> {
540    tokio::runtime::Builder::new_multi_thread()
541        .enable_all()
542        .worker_threads(worker_threads)
543        .thread_name(VM_RUNTIME_THREAD_NAME)
544        .build()
545}
546
547fn load_config_from_args() -> anyhow::Result<NodeConfig> {
548    let matches = clap::command!()
549        .version(lyquor_cli::build_version!())
550        .propagate_version(true)
551        .arg(clap::arg!(--config <PATH> "Path to the Lyquor config file (toml, yaml, json).").required(false))
552        .arg(
553            clap::arg!(--"config-override" <KEY_VALUE> "Override config with key=value (repeatable).")
554                .required(false)
555                .action(clap::ArgAction::Append)
556                .value_parser(clap::builder::NonEmptyStringValueParser::new()),
557        )
558        .get_matches();
559
560    let config_path = matches.get_one::<String>("config").map(PathBuf::from);
561    let overrides = build_config_overrides(&matches)?;
562    NodeConfig::load(config_path, overrides)
563}
564
565fn main() -> anyhow::Result<()> {
566    // Install panic hook to cleanup anvil on panic
567    std::panic::set_hook(Box::new(|_| {
568        if let Some(&pid) = ANVIL_PID.get() {
569            tracing::warn!("Panic detected, killing anvil process {}", pid);
570            use nix::sys::signal::{Signal, kill};
571            let _ = kill(nix::unistd::Pid::from_raw(pid as i32), Signal::SIGTERM);
572        }
573    }));
574
575    lyquor_cli::setup_tracing()?;
576
577    println!("{}", lyquor_cli::format_logo_banner(lyquor_cli::build_version!()));
578
579    let config = load_config_from_args()?;
580    let runtime = build_node_runtime(config.runtime.node_threads).context("Failed to build node Tokio runtime")?;
581    let vm_runtime = build_vm_runtime(config.runtime.execution_threads).context("Failed to build VM Tokio runtime")?;
582    let vm_runtime_handle = vm_runtime.handle().clone();
583
584    let result = runtime.block_on(run_node(config, vm_runtime_handle));
585    vm_runtime.shutdown_background();
586    result
587}
588
/// Runs a node until SIGINT or SIGTERM is received, then shuts it down.
///
/// Steps: pick the sequence backend endpoint (spawning a local Anvil when the devnet
/// profile names no sequencer), resolve the profile, derive the TLS configuration from
/// the node key, optionally publish the external host's DNS record in the background,
/// optionally set up HTTPS via ACME, start the network hub, register signal handlers,
/// start the node, wait for a signal, and shut everything down.
///
/// # Errors
/// Fails for any network type other than devnet and for any error while starting Anvil,
/// resolving the profile, reading the node key, setting up the network, registering the
/// signal handlers, or starting the node. DNS publication and HTTPS setup failures are
/// only logged.
async fn run_node(config: NodeConfig, vm_runtime: tokio::runtime::Handle) -> anyhow::Result<()> {
    // Holds the Anvil child (if one is spawned) for the whole lifetime of this function.
    let mut _anvil: Option<AnvilInstance> = None;
    let seq_eth_url = match config.profile.base {
        NetworkType::Devnet => {
            match config.profile.sequencer.as_deref() {
                // No sequencer configured: spawn a private Anvil and use its endpoint.
                None => {
                    // Anvil state is only persisted alongside an on-disk database.
                    // NOTE(review): the path uses `config.profile.base.as_str()` whereas
                    // `Node::init` logs a path built from `profile.id()` — confirm the
                    // two always agree.
                    let state_file = match &config.storage.db {
                        DB::RocksDb { .. } => Some(anvil_state_path(
                            &config.resolved_db_dir(),
                            config.profile.base.as_str(),
                        )),
                        DB::MemDb => None,
                    };
                    let anvil = start_devnet_anvil(state_file.as_deref(), config.flush_interval_secs)?;
                    // Store anvil PID for panic hook
                    ANVIL_PID.set(anvil.child().id()).ok();
                    let ep = anvil.ws_endpoint();
                    _anvil = Some(anvil);
                    ep
                }
                Some(url) => url.to_string(),
            }
        }
        _ => return Err(anyhow::anyhow!("Network is not supported.")),
    };
    let profile: Arc<LyquorProfile> = Arc::new(config.profile.resolve(seq_eth_url.clone())?);
    let node_seed = config.node_key.seed_bytes()?;
    let (_, tls_config) = lyquor_tls::generator::single_node_config_with_seed(&node_seed, profile.dns_suffix());

    // Publish the node's DNS record only when both an external host and a barstrainer
    // address are configured. Publication runs in a detached task; failures are logged.
    if let (Some(external_host), Some(barstrainer_addr)) = (
        config.network.external_host.as_deref(),
        config.network.barstrainer_addr.as_deref(),
    ) {
        match SigningIdentity::from_tls_config(&tls_config) {
            Ok(barstrainer_identity) => {
                let dns_suffix = profile.dns_suffix().to_string();
                let external_host = external_host.to_string();
                let barstrainer_addr = barstrainer_addr.to_string();
                tokio::spawn(async move {
                    if let Err(err) =
                        publish_external_host_dns(&barstrainer_identity, &dns_suffix, &external_host, &barstrainer_addr)
                            .await
                    {
                        tracing::warn!(
                            error = ?err,
                            external_host = %external_host,
                            barstrainer_addr = %barstrainer_addr,
                            "Failed to publish node DNS record"
                        );
                    }
                });
            }
            Err(err) => {
                tracing::warn!(
                    error = ?err,
                    "Skipping external DNS publication because signing identity could not be built"
                );
            }
        }
    } else {
        if config.network.external_host.is_some() {
            tracing::warn!("Skipping external DNS publication because network.barstrainer_addr is not configured");
        }
    }

    let shutdown = CancellationToken::new();

    // Grab TLS cert via ACME DNS-01 and barstrainer
    // This still works without barstrainer_addr because the user may supply local certs that are valid
    let http_tls = if let Some(mut acme_config) = config.network.acme.clone() {
        acme_config.storage_path = config
            .resolved_acme_storage_path()
            .expect("resolved ACME storage path should exist when ACME is configured");
        let acme = lyquor_lib::acme::Acme::new(
            acme_config,
            config.network.barstrainer_addr.clone().unwrap_or_default(),
            profile.dns_suffix().to_owned(),
            &tls_config,
        );

        // Note that the error returned by `Acme::new` is discarded and replaced by a
        // generic message.
        let http_tls = if let Ok(acme) = acme {
            // Request a certificate covering the node's own name and its wildcard.
            let domain = barstrainer_client::fqdn_for_node(&tls_config.node_id(), profile.dns_suffix(), None);
            let wildcard_domain =
                barstrainer_client::fqdn_for_node(&tls_config.node_id(), profile.dns_suffix(), Some("*"));
            let domains = vec![domain, wildcard_domain];

            http::httptls::HttpsTlsRuntime::new(acme, domains, &tls_config, shutdown.child_token()).await
        } else {
            Err(anyhow::anyhow!("Failed to construct ACME config"))
        };

        // A TLS failure is not fatal: the node continues without an HTTPS runtime.
        match http_tls {
            Ok(t) => Some(t),
            Err(err) => {
                tracing::warn!("Failed to initialize HTTP TLS for api server: {:?}", err);
                None
            }
        }
    } else {
        None
    };

    // Start network hub
    let config = Arc::new(config);
    let network = lyquor_net::hub::Hub::new(tls_config, config.network.upc_addr.clone(), shutdown.child_token())
        .await
        .context("Failed to setup network.")?;
    let network = Arc::new(network);

    // Signal handlers should be registered before starting any actors, so they'll be stopped
    // gracefully.
    let mut sigint = signal(SignalKind::interrupt()).context("Failed to register SIGINT handler")?;
    let mut sigterm = signal(SignalKind::terminate()).context("Failed to register SIGTERM handler")?;

    // The signal streams are first polled in the `select!` below, so a signal that
    // arrives while `start()` is still running is not acted on until it returns.
    let mut node = Node {
        sys: Subsystems::default(),
        config,
        profile: profile.clone(),
        seq_eth_url,
        http_tls,
        started_anvil: _anvil.is_some(),
        network: Some(network.clone()),
        vm_runtime,
        shutdown,
    }
    .start()
    .await?;

    tokio::select! {
        _ = sigint.recv() => {
            tracing::info!("received SIGINT");
        }
        _ = sigterm.recv() => {
            tracing::info!("received SIGTERM");
        }
    }

    node.shutdown().await;
    network.wait_for_shutdown().await;
    Ok(())
}