From 9ab80cd7a4b5a9ad6c15cb47ec56226fea2ad0a5 Mon Sep 17 00:00:00 2001 From: Slinetrac Date: Mon, 27 Oct 2025 10:58:07 +0800 Subject: [PATCH] refactor(profile-switch): add watchdog, heartbeat, and async timeout guards - Introduce SwitchHeartbeat for stage tracking and timing; log stage transitions with elapsed durations. - Add watchdog in driver to cancel stalled switches (5s heartbeat timeout). - Wrap blocking ops (Config::apply, tray updates, profiles_save_file_safe, etc.) with time::timeout to prevent async stalls. - Improve logs for stage transitions and watchdog timeouts to clarify cancellation points. --- src-tauri/src/cmd/profile_switch/driver.rs | 72 ++++-- src-tauri/src/cmd/profile_switch/state.rs | 54 ++++- src-tauri/src/cmd/profile_switch/workflow.rs | 44 +++- .../profile_switch/workflow/state_machine.rs | 208 +++++++++++++++--- 4 files changed, 320 insertions(+), 58 deletions(-) diff --git a/src-tauri/src/cmd/profile_switch/driver.rs b/src-tauri/src/cmd/profile_switch/driver.rs index d8eecc01..d25c6f51 100644 --- a/src-tauri/src/cmd/profile_switch/driver.rs +++ b/src-tauri/src/cmd/profile_switch/driver.rs @@ -1,7 +1,7 @@ use super::{ CmdResult, state::{SwitchCancellation, SwitchManager, SwitchRequest, manager}, - workflow::{self, SwitchPanicInfo}, + workflow::{self, SwitchPanicInfo, SwitchStage}, }; use crate::{logging, utils::logging::Type}; use futures::FutureExt; @@ -10,15 +10,22 @@ use smartstring::alias::String as SmartString; use std::{ collections::{HashMap, VecDeque}, panic::AssertUnwindSafe, + time::Duration, }; -use tokio::sync::{ - mpsc::{self, error::TrySendError}, - oneshot, +use tokio::{ + sync::{ + mpsc::{self, error::TrySendError}, + oneshot, + }, + time::{self, MissedTickBehavior}, }; const SWITCH_QUEUE_CAPACITY: usize = 32; static SWITCH_QUEUE: OnceCell> = OnceCell::new(); +const WATCHDOG_TIMEOUT: Duration = Duration::from_secs(5); +const WATCHDOG_TICK: Duration = Duration::from_millis(500); + #[derive(Debug, Default)] struct SwitchDriverState { active: Option, @@ -265,18 +272,53 @@ fn start_switch_job( request: SwitchRequest, ) { let completion_request = request.clone(); + let heartbeat = request.heartbeat().clone(); + let cancel_token = request.cancel_token().clone(); + let task_id = request.task_id(); + let profile_label = request.profile_id().clone(); + tokio::spawn(async move { - let job_result = match AssertUnwindSafe(workflow::run_switch_job(manager, request)) - .catch_unwind() - .await - { - Ok(Ok(success)) => SwitchJobOutcome::Completed { success }, - Ok(Err(info)) => SwitchJobOutcome::Panicked { info }, - Err(payload) => SwitchJobOutcome::Panicked { - info: SwitchPanicInfo::driver_task(workflow::describe_panic_payload( - payload.as_ref(), - )), - }, + let mut watchdog_interval = time::interval(WATCHDOG_TICK); + watchdog_interval.set_missed_tick_behavior(MissedTickBehavior::Skip); + + let workflow_fut = + AssertUnwindSafe(workflow::run_switch_job(manager, request)).catch_unwind(); + tokio::pin!(workflow_fut); + + let job_result = loop { + tokio::select! { + res = workflow_fut.as_mut() => { + break match res { + Ok(Ok(success)) => SwitchJobOutcome::Completed { success }, + Ok(Err(info)) => SwitchJobOutcome::Panicked { info }, + Err(payload) => SwitchJobOutcome::Panicked { + info: SwitchPanicInfo::driver_task( + workflow::describe_panic_payload(payload.as_ref()), + ), + }, + }; + } + _ = watchdog_interval.tick() => { + if cancel_token.is_cancelled() { + continue; + } + let elapsed = heartbeat.elapsed(); + if elapsed > WATCHDOG_TIMEOUT { + let stage = SwitchStage::from_code(heartbeat.stage_code()) + .unwrap_or(SwitchStage::Workflow); + logging!( + warn, + Type::Cmd, + "Switch task {} watchdog timeout (profile={} stage={:?}, elapsed={:?}); cancelling", + task_id, + profile_label.as_str(), + stage, + elapsed + ); + cancel_token.cancel(); + } + } + } }; if let Err(err) = driver_tx diff --git a/src-tauri/src/cmd/profile_switch/state.rs b/src-tauri/src/cmd/profile_switch/state.rs index 9ccf328c..8fc930af 100644 --- a/src-tauri/src/cmd/profile_switch/state.rs +++ b/src-tauri/src/cmd/profile_switch/state.rs @@ -1,8 +1,8 @@ use once_cell::sync::OnceCell; use smartstring::alias::String as SmartString; use std::sync::Arc; -use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; -use std::time::Duration; +use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering}; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; use tokio::sync::Mutex; pub(super) const SWITCH_JOB_TIMEOUT: Duration = Duration::from_secs(30); @@ -97,6 +97,7 @@ pub(super) struct SwitchRequest { profile_id: SmartString, notify: bool, cancel_token: SwitchCancellation, + heartbeat: SwitchHeartbeat, } impl SwitchRequest { @@ -106,6 +107,7 @@ impl SwitchRequest { profile_id, notify, cancel_token: SwitchCancellation::new(), + heartbeat: SwitchHeartbeat::new(), } } @@ -130,4 +132,52 @@ impl SwitchRequest { pub(super) fn cancel_token(&self) -> &SwitchCancellation { &self.cancel_token } + + pub(super) fn heartbeat(&self) -> &SwitchHeartbeat { + &self.heartbeat + } +} + +#[derive(Debug, Clone)] +pub(super) struct SwitchHeartbeat { + last_tick_millis: Arc, + stage_code: Arc, +} + +impl SwitchHeartbeat { + fn now_millis() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or(Duration::ZERO) + .as_millis() as u64 + } + + pub(super) fn new() -> Self { + let heartbeat = Self { + last_tick_millis: Arc::new(AtomicU64::new(Self::now_millis())), + stage_code: Arc::new(AtomicU32::new(0)), + }; + heartbeat.touch(); + heartbeat + } + + pub(super) fn touch(&self) { + self.last_tick_millis + .store(Self::now_millis(), Ordering::SeqCst); + } + + pub(super) fn elapsed(&self) -> Duration { + let last = self.last_tick_millis.load(Ordering::SeqCst); + let now = Self::now_millis(); + Duration::from_millis(now.saturating_sub(last)) + } + + pub(super) fn set_stage(&self, stage: u32) { + self.stage_code.store(stage, Ordering::SeqCst); + self.touch(); + } + + pub(super) fn stage_code(&self) -> u32 { + self.stage_code.load(Ordering::SeqCst) + } } diff --git a/src-tauri/src/cmd/profile_switch/workflow.rs b/src-tauri/src/cmd/profile_switch/workflow.rs index 7b577dad..071ec4a8 100644 --- a/src-tauri/src/cmd/profile_switch/workflow.rs +++ b/src-tauri/src/cmd/profile_switch/workflow.rs @@ -19,8 +19,8 @@ use tokio::{fs as tokio_fs, time}; mod state_machine; -pub(super) use state_machine::SwitchPanicInfo; -use state_machine::SwitchStateMachine; +use state_machine::{CONFIG_APPLY_TIMEOUT, SAVE_PROFILES_TIMEOUT, SwitchStateMachine}; +pub(super) use state_machine::{SwitchPanicInfo, SwitchStage}; pub(super) async fn run_switch_job( manager: &'static SwitchManager, @@ -324,16 +324,40 @@ pub(super) async fn restore_previous_profile(previous: Option) -> C .draft_mut() .patch_config(restore_profiles) .stringify_err()?; - Config::profiles().await.apply(); + if time::timeout(CONFIG_APPLY_TIMEOUT, async { + Config::profiles().await.apply(); + }) + .await + .is_err() + { + logging!( + warn, + Type::Cmd, + "Restoring previous configuration timed out after {:?}", + CONFIG_APPLY_TIMEOUT + ); + return Ok(()); + } AsyncHandler::spawn(|| async move { - if let Err(e) = profiles_save_file_safe().await { - logging!( - warn, - Type::Cmd, - "Failed to persist restored configuration asynchronously: {}", - e - ); + match time::timeout(SAVE_PROFILES_TIMEOUT, profiles_save_file_safe()).await { + Ok(Ok(())) => {} + Ok(Err(e)) => { + logging!( + warn, + Type::Cmd, + "Failed to persist restored configuration asynchronously: {}", + e + ); + } + Err(_) => { + logging!( + warn, + Type::Cmd, + "Persisting restored configuration timed out after {:?}", + SAVE_PROFILES_TIMEOUT + ); + } } }); } diff --git a/src-tauri/src/cmd/profile_switch/workflow/state_machine.rs b/src-tauri/src/cmd/profile_switch/workflow/state_machine.rs index ee12ff31..5a177fea 100644 --- a/src-tauri/src/cmd/profile_switch/workflow/state_machine.rs +++ b/src-tauri/src/cmd/profile_switch/workflow/state_machine.rs @@ -1,6 +1,6 @@ use super::{CmdResult, describe_panic_payload, restore_previous_profile, validate_profile_yaml}; use crate::{ - cmd::profile_switch::state::{SwitchManager, SwitchRequest, SwitchScope}, + cmd::profile_switch::state::{SwitchHeartbeat, SwitchManager, SwitchRequest, SwitchScope}, config::{Config, IProfiles, profiles::profiles_save_file_safe}, core::{CoreManager, handle, tray::Tray}, logging, @@ -11,6 +11,11 @@ use smartstring::alias::String as SmartString; use std::{mem, panic::AssertUnwindSafe, time::Duration}; use tokio::{sync::MutexGuard, time}; +pub(super) const CONFIG_APPLY_TIMEOUT: Duration = Duration::from_secs(5); +const TRAY_UPDATE_TIMEOUT: Duration = Duration::from_secs(3); +const REFRESH_TIMEOUT: Duration = Duration::from_secs(3); +pub(super) const SAVE_PROFILES_TIMEOUT: Duration = Duration::from_secs(5); + /// Explicit state machine for profile switching so we can reason about /// cancellation, stale requests, and side-effects at each stage. pub(super) struct SwitchStateMachine { @@ -31,6 +36,37 @@ pub(crate) enum SwitchStage { DriverTask, } +impl SwitchStage { + pub(crate) fn as_code(self) -> u32 { + match self { + SwitchStage::Start => 0, + SwitchStage::AcquireCore => 1, + SwitchStage::Prepare => 2, + SwitchStage::ValidateTarget => 3, + SwitchStage::PatchDraft => 4, + SwitchStage::UpdateCore => 5, + SwitchStage::Finalize => 6, + SwitchStage::Workflow => 7, + SwitchStage::DriverTask => 8, + } + } + + pub(crate) fn from_code(code: u32) -> Option { + Some(match code { + 0 => SwitchStage::Start, + 1 => SwitchStage::AcquireCore, + 2 => SwitchStage::Prepare, + 3 => SwitchStage::ValidateTarget, + 4 => SwitchStage::PatchDraft, + 5 => SwitchStage::UpdateCore, + 6 => SwitchStage::Finalize, + 7 => SwitchStage::Workflow, + 8 => SwitchStage::DriverTask, + _ => return None, + }) + } +} + #[derive(Debug, Clone)] pub(crate) struct SwitchPanicInfo { pub(crate) stage: SwitchStage, @@ -57,8 +93,13 @@ impl SwitchStateMachine { request: Option, profiles: IProfiles, ) -> Self { + let heartbeat = request + .as_ref() + .map(|req| req.heartbeat().clone()) + .unwrap_or_else(SwitchHeartbeat::new); + Self { - ctx: SwitchContext::new(manager, request, profiles), + ctx: SwitchContext::new(manager, request, profiles, heartbeat), state: SwitchState::Start, } } @@ -137,6 +178,7 @@ impl SwitchStateMachine { F: FnOnce(&'a mut Self) -> Fut, Fut: std::future::Future> + 'a, { + self.ctx.record_stage(stage); AssertUnwindSafe(f(self)) .catch_unwind() .await @@ -292,34 +334,96 @@ impl SwitchStateMachine { "Configuration update succeeded, sequence: {}", self.ctx.sequence() ); - Config::profiles().await.apply(); - handle::Handle::refresh_clash(); + match time::timeout(CONFIG_APPLY_TIMEOUT, async { + Config::profiles().await.apply(); + }) + .await + { + Ok(()) => {} + Err(_) => { + logging!( + warn, + Type::Cmd, + "Applying profile configuration timed out after {:?}", + CONFIG_APPLY_TIMEOUT + ); + Config::profiles().await.discard(); + return Ok(SwitchState::Complete(false)); + } + } - if let Err(err) = Tray::global().update_tooltip().await { + if time::timeout(REFRESH_TIMEOUT, async { + handle::Handle::refresh_clash(); + }) + .await + .is_err() + { logging!( warn, Type::Cmd, - "Failed to update tray tooltip asynchronously: {}", - err + "Refreshing Clash state timed out after {:?}", + REFRESH_TIMEOUT ); } - if let Err(err) = Tray::global().update_menu().await { - logging!( - warn, - Type::Cmd, - "Failed to update tray menu asynchronously: {}", - err - ); + match time::timeout(TRAY_UPDATE_TIMEOUT, Tray::global().update_tooltip()).await { + Ok(Ok(())) => {} + Ok(Err(err)) => { + logging!( + warn, + Type::Cmd, + "Failed to update tray tooltip asynchronously: {}", + err + ); + } + Err(_) => { + logging!( + warn, + Type::Cmd, + "Updating tray tooltip timed out after {:?}", + TRAY_UPDATE_TIMEOUT + ); + } } - if let Err(err) = profiles_save_file_safe().await { - logging!( - warn, - Type::Cmd, - "Failed to persist configuration file asynchronously: {}", - err - ); + match time::timeout(TRAY_UPDATE_TIMEOUT, Tray::global().update_menu()).await { + Ok(Ok(())) => {} + Ok(Err(err)) => { + logging!( + warn, + Type::Cmd, + "Failed to update tray menu asynchronously: {}", + err + ); + } + Err(_) => { + logging!( + warn, + Type::Cmd, + "Updating tray menu timed out after {:?}", + TRAY_UPDATE_TIMEOUT + ); + } + } + + match time::timeout(SAVE_PROFILES_TIMEOUT, profiles_save_file_safe()).await { + Ok(Ok(())) => {} + Ok(Err(err)) => { + logging!( + warn, + Type::Cmd, + "Failed to persist configuration file asynchronously: {}", + err + ); + } + Err(_) => { + logging!( + warn, + Type::Cmd, + "Persisting configuration file timed out after {:?}", + SAVE_PROFILES_TIMEOUT + ); + } } if let Some(current) = self.ctx.new_profile_for_event.clone() { @@ -387,6 +491,10 @@ struct SwitchContext { new_profile_for_event: Option, switch_scope: Option>, core_guard: Option>, + heartbeat: SwitchHeartbeat, + task_id: Option, + profile_label: SmartString, + active_stage: SwitchStage, } impl SwitchContext { @@ -394,7 +502,15 @@ impl SwitchContext { manager: &'static SwitchManager, request: Option, profiles: IProfiles, + heartbeat: SwitchHeartbeat, ) -> Self { + let task_id = request.as_ref().map(|req| req.task_id()); + let profile_label = request + .as_ref() + .map(|req| req.profile_id().clone()) + .or_else(|| profiles.current.clone()) + .unwrap_or_else(|| SmartString::from("unknown")); + heartbeat.touch(); Self { manager, request, @@ -405,6 +521,10 @@ impl SwitchContext { new_profile_for_event: None, switch_scope: None, core_guard: None, + heartbeat, + task_id, + profile_label, + active_stage: SwitchStage::Start, } } @@ -462,16 +582,42 @@ impl SwitchContext { } fn sequence(&self) -> u64 { - match self.sequence { - Some(sequence) => sequence, - None => { - logging!( - warn, - Type::Cmd, - "Sequence unexpectedly missing in switch context; defaulting to 0" - ); - 0 - } + self.sequence.unwrap_or_else(|| { + logging!( + warn, + Type::Cmd, + "Sequence unexpectedly missing in switch context; defaulting to 0" + ); + 0 + }) + } + + fn record_stage(&mut self, stage: SwitchStage) { + let since_last = self.heartbeat.elapsed(); + let previous = self.active_stage; + self.active_stage = stage; + self.heartbeat.set_stage(stage.as_code()); + + match self.task_id { + Some(task_id) => logging!( + debug, + Type::Cmd, + "Switch task {} (profile={}) transitioned {:?} -> {:?} after {:?}", + task_id, + self.profile_label, + previous, + stage, + since_last + ), + None => logging!( + debug, + Type::Cmd, + "Profile patch {} transitioned {:?} -> {:?} after {:?}", + self.profile_label, + previous, + stage, + since_last + ), } } }