refactor(profile-switch): add watchdog, heartbeat, and async timeout guards

- Introduce SwitchHeartbeat for stage tracking and timing; log stage transitions with elapsed durations.
- Add watchdog in driver to cancel stalled switches (5s heartbeat timeout).
- Wrap blocking ops (Config::apply, tray updates, profiles_save_file_safe, etc.) with time::timeout to prevent async stalls.
- Improve logs for stage transitions and watchdog timeouts to clarify cancellation points.
This commit is contained in:
Slinetrac
2025-10-27 10:58:07 +08:00
Unverified
parent b3e234e9cc
commit 9ab80cd7a4
4 changed files with 320 additions and 58 deletions

View File

@@ -1,7 +1,7 @@
use super::{
CmdResult,
state::{SwitchCancellation, SwitchManager, SwitchRequest, manager},
workflow::{self, SwitchPanicInfo},
workflow::{self, SwitchPanicInfo, SwitchStage},
};
use crate::{logging, utils::logging::Type};
use futures::FutureExt;
@@ -10,15 +10,22 @@ use smartstring::alias::String as SmartString;
use std::{
collections::{HashMap, VecDeque},
panic::AssertUnwindSafe,
time::Duration,
};
use tokio::sync::{
mpsc::{self, error::TrySendError},
oneshot,
use tokio::{
sync::{
mpsc::{self, error::TrySendError},
oneshot,
},
time::{self, MissedTickBehavior},
};
const SWITCH_QUEUE_CAPACITY: usize = 32;
static SWITCH_QUEUE: OnceCell<mpsc::Sender<SwitchDriverMessage>> = OnceCell::new();
const WATCHDOG_TIMEOUT: Duration = Duration::from_secs(5);
const WATCHDOG_TICK: Duration = Duration::from_millis(500);
#[derive(Debug, Default)]
struct SwitchDriverState {
active: Option<SwitchRequest>,
@@ -265,18 +272,53 @@ fn start_switch_job(
request: SwitchRequest,
) {
let completion_request = request.clone();
let heartbeat = request.heartbeat().clone();
let cancel_token = request.cancel_token().clone();
let task_id = request.task_id();
let profile_label = request.profile_id().clone();
tokio::spawn(async move {
let job_result = match AssertUnwindSafe(workflow::run_switch_job(manager, request))
.catch_unwind()
.await
{
Ok(Ok(success)) => SwitchJobOutcome::Completed { success },
Ok(Err(info)) => SwitchJobOutcome::Panicked { info },
Err(payload) => SwitchJobOutcome::Panicked {
info: SwitchPanicInfo::driver_task(workflow::describe_panic_payload(
payload.as_ref(),
)),
},
let mut watchdog_interval = time::interval(WATCHDOG_TICK);
watchdog_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
let workflow_fut =
AssertUnwindSafe(workflow::run_switch_job(manager, request)).catch_unwind();
tokio::pin!(workflow_fut);
let job_result = loop {
tokio::select! {
res = workflow_fut.as_mut() => {
break match res {
Ok(Ok(success)) => SwitchJobOutcome::Completed { success },
Ok(Err(info)) => SwitchJobOutcome::Panicked { info },
Err(payload) => SwitchJobOutcome::Panicked {
info: SwitchPanicInfo::driver_task(
workflow::describe_panic_payload(payload.as_ref()),
),
},
};
}
_ = watchdog_interval.tick() => {
if cancel_token.is_cancelled() {
continue;
}
let elapsed = heartbeat.elapsed();
if elapsed > WATCHDOG_TIMEOUT {
let stage = SwitchStage::from_code(heartbeat.stage_code())
.unwrap_or(SwitchStage::Workflow);
logging!(
warn,
Type::Cmd,
"Switch task {} watchdog timeout (profile={} stage={:?}, elapsed={:?}); cancelling",
task_id,
profile_label.as_str(),
stage,
elapsed
);
cancel_token.cancel();
}
}
}
};
if let Err(err) = driver_tx

View File

@@ -1,8 +1,8 @@
use once_cell::sync::OnceCell;
use smartstring::alias::String as SmartString;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::time::Duration;
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::sync::Mutex;
pub(super) const SWITCH_JOB_TIMEOUT: Duration = Duration::from_secs(30);
@@ -97,6 +97,7 @@ pub(super) struct SwitchRequest {
profile_id: SmartString,
notify: bool,
cancel_token: SwitchCancellation,
heartbeat: SwitchHeartbeat,
}
impl SwitchRequest {
@@ -106,6 +107,7 @@ impl SwitchRequest {
profile_id,
notify,
cancel_token: SwitchCancellation::new(),
heartbeat: SwitchHeartbeat::new(),
}
}
@@ -130,4 +132,52 @@ impl SwitchRequest {
pub(super) fn cancel_token(&self) -> &SwitchCancellation {
&self.cancel_token
}
pub(super) fn heartbeat(&self) -> &SwitchHeartbeat {
&self.heartbeat
}
}
#[derive(Debug, Clone)]
pub(super) struct SwitchHeartbeat {
last_tick_millis: Arc<AtomicU64>,
stage_code: Arc<AtomicU32>,
}
impl SwitchHeartbeat {
fn now_millis() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or(Duration::ZERO)
.as_millis() as u64
}
pub(super) fn new() -> Self {
let heartbeat = Self {
last_tick_millis: Arc::new(AtomicU64::new(Self::now_millis())),
stage_code: Arc::new(AtomicU32::new(0)),
};
heartbeat.touch();
heartbeat
}
pub(super) fn touch(&self) {
self.last_tick_millis
.store(Self::now_millis(), Ordering::SeqCst);
}
pub(super) fn elapsed(&self) -> Duration {
let last = self.last_tick_millis.load(Ordering::SeqCst);
let now = Self::now_millis();
Duration::from_millis(now.saturating_sub(last))
}
pub(super) fn set_stage(&self, stage: u32) {
self.stage_code.store(stage, Ordering::SeqCst);
self.touch();
}
pub(super) fn stage_code(&self) -> u32 {
self.stage_code.load(Ordering::SeqCst)
}
}

View File

@@ -19,8 +19,8 @@ use tokio::{fs as tokio_fs, time};
mod state_machine;
pub(super) use state_machine::SwitchPanicInfo;
use state_machine::SwitchStateMachine;
use state_machine::{CONFIG_APPLY_TIMEOUT, SAVE_PROFILES_TIMEOUT, SwitchStateMachine};
pub(super) use state_machine::{SwitchPanicInfo, SwitchStage};
pub(super) async fn run_switch_job(
manager: &'static SwitchManager,
@@ -324,16 +324,40 @@ pub(super) async fn restore_previous_profile(previous: Option<SmartString>) -> C
.draft_mut()
.patch_config(restore_profiles)
.stringify_err()?;
Config::profiles().await.apply();
if time::timeout(CONFIG_APPLY_TIMEOUT, async {
Config::profiles().await.apply();
})
.await
.is_err()
{
logging!(
warn,
Type::Cmd,
"Restoring previous configuration timed out after {:?}",
CONFIG_APPLY_TIMEOUT
);
return Ok(());
}
AsyncHandler::spawn(|| async move {
if let Err(e) = profiles_save_file_safe().await {
logging!(
warn,
Type::Cmd,
"Failed to persist restored configuration asynchronously: {}",
e
);
match time::timeout(SAVE_PROFILES_TIMEOUT, profiles_save_file_safe()).await {
Ok(Ok(())) => {}
Ok(Err(e)) => {
logging!(
warn,
Type::Cmd,
"Failed to persist restored configuration asynchronously: {}",
e
);
}
Err(_) => {
logging!(
warn,
Type::Cmd,
"Persisting restored configuration timed out after {:?}",
SAVE_PROFILES_TIMEOUT
);
}
}
});
}

View File

@@ -1,6 +1,6 @@
use super::{CmdResult, describe_panic_payload, restore_previous_profile, validate_profile_yaml};
use crate::{
cmd::profile_switch::state::{SwitchManager, SwitchRequest, SwitchScope},
cmd::profile_switch::state::{SwitchHeartbeat, SwitchManager, SwitchRequest, SwitchScope},
config::{Config, IProfiles, profiles::profiles_save_file_safe},
core::{CoreManager, handle, tray::Tray},
logging,
@@ -11,6 +11,11 @@ use smartstring::alias::String as SmartString;
use std::{mem, panic::AssertUnwindSafe, time::Duration};
use tokio::{sync::MutexGuard, time};
pub(super) const CONFIG_APPLY_TIMEOUT: Duration = Duration::from_secs(5);
const TRAY_UPDATE_TIMEOUT: Duration = Duration::from_secs(3);
const REFRESH_TIMEOUT: Duration = Duration::from_secs(3);
pub(super) const SAVE_PROFILES_TIMEOUT: Duration = Duration::from_secs(5);
/// Explicit state machine for profile switching so we can reason about
/// cancellation, stale requests, and side-effects at each stage.
pub(super) struct SwitchStateMachine {
@@ -31,6 +36,37 @@ pub(crate) enum SwitchStage {
DriverTask,
}
impl SwitchStage {
pub(crate) fn as_code(self) -> u32 {
match self {
SwitchStage::Start => 0,
SwitchStage::AcquireCore => 1,
SwitchStage::Prepare => 2,
SwitchStage::ValidateTarget => 3,
SwitchStage::PatchDraft => 4,
SwitchStage::UpdateCore => 5,
SwitchStage::Finalize => 6,
SwitchStage::Workflow => 7,
SwitchStage::DriverTask => 8,
}
}
pub(crate) fn from_code(code: u32) -> Option<Self> {
Some(match code {
0 => SwitchStage::Start,
1 => SwitchStage::AcquireCore,
2 => SwitchStage::Prepare,
3 => SwitchStage::ValidateTarget,
4 => SwitchStage::PatchDraft,
5 => SwitchStage::UpdateCore,
6 => SwitchStage::Finalize,
7 => SwitchStage::Workflow,
8 => SwitchStage::DriverTask,
_ => return None,
})
}
}
#[derive(Debug, Clone)]
pub(crate) struct SwitchPanicInfo {
pub(crate) stage: SwitchStage,
@@ -57,8 +93,13 @@ impl SwitchStateMachine {
request: Option<SwitchRequest>,
profiles: IProfiles,
) -> Self {
let heartbeat = request
.as_ref()
.map(|req| req.heartbeat().clone())
.unwrap_or_else(SwitchHeartbeat::new);
Self {
ctx: SwitchContext::new(manager, request, profiles),
ctx: SwitchContext::new(manager, request, profiles, heartbeat),
state: SwitchState::Start,
}
}
@@ -137,6 +178,7 @@ impl SwitchStateMachine {
F: FnOnce(&'a mut Self) -> Fut,
Fut: std::future::Future<Output = CmdResult<SwitchState>> + 'a,
{
self.ctx.record_stage(stage);
AssertUnwindSafe(f(self))
.catch_unwind()
.await
@@ -292,34 +334,96 @@ impl SwitchStateMachine {
"Configuration update succeeded, sequence: {}",
self.ctx.sequence()
);
Config::profiles().await.apply();
handle::Handle::refresh_clash();
match time::timeout(CONFIG_APPLY_TIMEOUT, async {
Config::profiles().await.apply();
})
.await
{
Ok(()) => {}
Err(_) => {
logging!(
warn,
Type::Cmd,
"Applying profile configuration timed out after {:?}",
CONFIG_APPLY_TIMEOUT
);
Config::profiles().await.discard();
return Ok(SwitchState::Complete(false));
}
}
if let Err(err) = Tray::global().update_tooltip().await {
if time::timeout(REFRESH_TIMEOUT, async {
handle::Handle::refresh_clash();
})
.await
.is_err()
{
logging!(
warn,
Type::Cmd,
"Failed to update tray tooltip asynchronously: {}",
err
"Refreshing Clash state timed out after {:?}",
REFRESH_TIMEOUT
);
}
if let Err(err) = Tray::global().update_menu().await {
logging!(
warn,
Type::Cmd,
"Failed to update tray menu asynchronously: {}",
err
);
match time::timeout(TRAY_UPDATE_TIMEOUT, Tray::global().update_tooltip()).await {
Ok(Ok(())) => {}
Ok(Err(err)) => {
logging!(
warn,
Type::Cmd,
"Failed to update tray tooltip asynchronously: {}",
err
);
}
Err(_) => {
logging!(
warn,
Type::Cmd,
"Updating tray tooltip timed out after {:?}",
TRAY_UPDATE_TIMEOUT
);
}
}
if let Err(err) = profiles_save_file_safe().await {
logging!(
warn,
Type::Cmd,
"Failed to persist configuration file asynchronously: {}",
err
);
match time::timeout(TRAY_UPDATE_TIMEOUT, Tray::global().update_menu()).await {
Ok(Ok(())) => {}
Ok(Err(err)) => {
logging!(
warn,
Type::Cmd,
"Failed to update tray menu asynchronously: {}",
err
);
}
Err(_) => {
logging!(
warn,
Type::Cmd,
"Updating tray menu timed out after {:?}",
TRAY_UPDATE_TIMEOUT
);
}
}
match time::timeout(SAVE_PROFILES_TIMEOUT, profiles_save_file_safe()).await {
Ok(Ok(())) => {}
Ok(Err(err)) => {
logging!(
warn,
Type::Cmd,
"Failed to persist configuration file asynchronously: {}",
err
);
}
Err(_) => {
logging!(
warn,
Type::Cmd,
"Persisting configuration file timed out after {:?}",
SAVE_PROFILES_TIMEOUT
);
}
}
if let Some(current) = self.ctx.new_profile_for_event.clone() {
@@ -387,6 +491,10 @@ struct SwitchContext {
new_profile_for_event: Option<SmartString>,
switch_scope: Option<SwitchScope<'static>>,
core_guard: Option<MutexGuard<'static, ()>>,
heartbeat: SwitchHeartbeat,
task_id: Option<u64>,
profile_label: SmartString,
active_stage: SwitchStage,
}
impl SwitchContext {
@@ -394,7 +502,15 @@ impl SwitchContext {
manager: &'static SwitchManager,
request: Option<SwitchRequest>,
profiles: IProfiles,
heartbeat: SwitchHeartbeat,
) -> Self {
let task_id = request.as_ref().map(|req| req.task_id());
let profile_label = request
.as_ref()
.map(|req| req.profile_id().clone())
.or_else(|| profiles.current.clone())
.unwrap_or_else(|| SmartString::from("unknown"));
heartbeat.touch();
Self {
manager,
request,
@@ -405,6 +521,10 @@ impl SwitchContext {
new_profile_for_event: None,
switch_scope: None,
core_guard: None,
heartbeat,
task_id,
profile_label,
active_stage: SwitchStage::Start,
}
}
@@ -462,16 +582,42 @@ impl SwitchContext {
}
fn sequence(&self) -> u64 {
match self.sequence {
Some(sequence) => sequence,
None => {
logging!(
warn,
Type::Cmd,
"Sequence unexpectedly missing in switch context; defaulting to 0"
);
0
}
self.sequence.unwrap_or_else(|| {
logging!(
warn,
Type::Cmd,
"Sequence unexpectedly missing in switch context; defaulting to 0"
);
0
})
}
fn record_stage(&mut self, stage: SwitchStage) {
let since_last = self.heartbeat.elapsed();
let previous = self.active_stage;
self.active_stage = stage;
self.heartbeat.set_stage(stage.as_code());
match self.task_id {
Some(task_id) => logging!(
debug,
Type::Cmd,
"Switch task {} (profile={}) transitioned {:?} -> {:?} after {:?}",
task_id,
self.profile_label,
previous,
stage,
since_last
),
None => logging!(
debug,
Type::Cmd,
"Profile patch {} transitioned {:?} -> {:?} after {:?}",
self.profile_label,
previous,
stage,
since_last
),
}
}
}