refactor(profile-switch): serialize switches with async queue and enrich frontend events

This commit is contained in:
Slinetrac
2025-10-25 18:04:32 +08:00
Unverified
parent 49926b4823
commit a889d0b1e5
4 changed files with 363 additions and 160 deletions

View File

@@ -23,9 +23,14 @@ use std::{
sync::atomic::{AtomicBool, AtomicU64, Ordering},
time::Duration,
};
use tokio::sync::Mutex;
use tokio::sync::{
Mutex,
mpsc::{self, error::TrySendError},
};
static SWITCH_MUTEX: OnceCell<Mutex<()>> = OnceCell::new();
static SWITCH_QUEUE: OnceCell<mpsc::Sender<(String, bool)>> = OnceCell::new();
const SWITCH_QUEUE_CAPACITY: usize = 32;
// 全局请求序列号跟踪,用于避免队列化执行
static CURRENT_REQUEST_SEQUENCE: AtomicU64 = AtomicU64::new(0);
static CURRENT_SWITCHING_PROFILE: AtomicBool = AtomicBool::new(false);
@@ -45,6 +50,113 @@ impl Drop for SwitchScope {
}
}
fn switch_queue_sender() -> &'static mpsc::Sender<(String, bool)> {
SWITCH_QUEUE.get_or_init(|| {
let (tx, mut rx) = mpsc::channel::<(String, bool)>(SWITCH_QUEUE_CAPACITY);
tokio::spawn(async move {
let mutex = SWITCH_MUTEX.get_or_init(|| Mutex::new(()));
while let Some((profile, notify)) = rx.recv().await {
let _guard = mutex.lock().await;
if let Err(err) = process_switch_task(profile.clone(), notify).await {
logging!(
error,
Type::Cmd,
"Failed to process profile switch task ({}): {}",
profile,
err
);
}
}
});
tx
})
}
async fn process_switch_task(profile_index: String, notify_success: bool) -> CmdResult<()> {
logging!(
info,
Type::Cmd,
"Processing queued profile switch: {} (notify={})",
profile_index,
notify_success
);
let switch_result = AssertUnwindSafe(patch_profiles_config_internal(IProfiles {
current: Some(profile_index.clone()),
items: None,
}))
.catch_unwind()
.await;
let switch_result = match switch_result {
Ok(inner) => inner,
Err(_) => {
logging!(
error,
Type::Cmd,
"Panic occurred during profile switch: {}",
profile_index
);
handle::Handle::notice_message(
"config_validate::panic",
format!("profile switch panic: {}", profile_index),
);
handle::Handle::notify_profile_switch_finished(
profile_index.clone(),
false,
notify_success,
);
return Ok(());
}
};
let success = match switch_result {
Ok(val) => val,
Err(err) => {
logging!(
error,
Type::Cmd,
"Profile switch failed ({}): {}",
profile_index,
err
);
handle::Handle::notice_message("config_validate::error", err.clone());
handle::Handle::notify_profile_switch_finished(
profile_index.clone(),
false,
notify_success,
);
return Ok(());
}
};
handle::Handle::notify_profile_switch_finished(profile_index.clone(), success, notify_success);
if let Err(err) = handle::Handle::mihomo().await.close_all_connections().await {
logging!(
warn,
Type::Cmd,
"Failed to close connections after profile switch ({}): {}",
profile_index,
err
);
}
if notify_success && success {
handle::Handle::notice_message("info", "Profile Switched");
}
logging!(
info,
Type::Cmd,
"Profile switch task finished: {} (success={})",
profile_index,
success
);
Ok(())
}
#[tauri::command]
pub async fn get_profiles() -> CmdResult<IProfiles> {
// 策略1: 尝试快速获取latest数据
@@ -593,51 +705,46 @@ pub async fn patch_profiles_config_by_profile_index(profile_index: String) -> Cm
#[tauri::command]
pub async fn switch_profile(profile_index: String, notify_success: bool) -> CmdResult<bool> {
logging!(info, Type::Cmd, "请求切换配置到: {}", &profile_index);
let mutex = SWITCH_MUTEX.get_or_init(|| Mutex::new(()));
let _guard = mutex.lock().await;
let switch_result = AssertUnwindSafe(patch_profiles_config_internal(IProfiles {
current: Some(profile_index.clone()),
items: None,
}))
.catch_unwind()
.await;
let result = match switch_result {
Ok(inner) => inner?,
Err(_) => {
logging!(
info,
Type::Cmd,
"Queue profile switch to: {}",
&profile_index
);
let sender = switch_queue_sender();
match sender.try_send((profile_index.clone(), notify_success)) {
Ok(_) => Ok(true),
Err(TrySendError::Full(task)) => {
logging!(
warn,
Type::Cmd,
"Profile switch queue is full, waiting for space: {}",
&profile_index
);
match sender.send(task).await {
Ok(_) => Ok(true),
Err(err) => {
logging!(
error,
Type::Cmd,
"Profile switch queue closed while waiting ({}): {}",
&profile_index,
err
);
Err("switch profile queue unavailable".into())
}
}
}
Err(TrySendError::Closed(_)) => {
logging!(
error,
Type::Cmd,
"切换配置过程中发生panic目标: {}",
profile_index
"Profile switch queue is closed, cannot enqueue: {}",
&profile_index
);
handle::Handle::notice_message(
"config_validate::panic",
format!("profile switch panic: {}", profile_index),
);
handle::Handle::notify_profile_switch_finished(profile_index.clone(), false);
return Ok(false);
Err("switch profile queue unavailable".into())
}
};
if result {
handle::Handle::notify_profile_switch_finished(profile_index.clone(), true);
} else {
handle::Handle::notify_profile_switch_finished(profile_index.clone(), false);
}
if let Err(err) = handle::Handle::mihomo().await.close_all_connections().await {
logging!(warn, Type::Cmd, "切换配置后关闭连接失败: {}", err);
}
if notify_success && result {
handle::Handle::notice_message("info", "Profile Switched");
}
Ok(result)
}
/// 修改某个profile item的

View File

@@ -100,10 +100,11 @@ impl Handle {
});
}
pub fn notify_profile_switch_finished(profile_id: String, success: bool) {
pub fn notify_profile_switch_finished(profile_id: String, success: bool, notify: bool) {
Self::send_event(FrontendEvent::ProfileSwitchFinished {
profile_id,
success,
notify,
});
}

View File

@@ -20,14 +20,34 @@ pub enum FrontendEvent {
RefreshClash,
RefreshVerge,
RefreshProxy,
ProxiesUpdated { payload: serde_json::Value },
NoticeMessage { status: String, message: String },
ProfileChanged { current_profile_id: String },
ProfileSwitchFinished { profile_id: String, success: bool },
TimerUpdated { profile_index: String },
ProfileUpdateStarted { uid: String },
ProfileUpdateCompleted { uid: String },
RustPanic { message: String, location: String },
ProxiesUpdated {
payload: serde_json::Value,
},
NoticeMessage {
status: String,
message: String,
},
ProfileChanged {
current_profile_id: String,
},
ProfileSwitchFinished {
profile_id: String,
success: bool,
notify: bool,
},
TimerUpdated {
profile_index: String,
},
ProfileUpdateStarted {
uid: String,
},
ProfileUpdateCompleted {
uid: String,
},
RustPanic {
message: String,
location: String,
},
}
#[derive(Debug, Default)]
@@ -172,9 +192,10 @@ impl NotificationSystem {
FrontendEvent::ProfileSwitchFinished {
profile_id,
success,
notify,
} => (
"profile-switch-finished",
Ok(json!({ "profileId": profile_id, "success": success })),
Ok(json!({ "profileId": profile_id, "success": success, "notify": notify })),
),
FrontendEvent::TimerUpdated { profile_index } => {
("verge://timer-updated", Ok(json!(profile_index)))