refactor(profile): make switch driver fully async and handle panics safely
This commit is contained in:
@@ -20,6 +20,7 @@ use once_cell::sync::OnceCell;
|
||||
use serde_yaml_ng as serde_yaml;
|
||||
use smartstring::alias::String;
|
||||
use std::{
|
||||
any::Any,
|
||||
collections::VecDeque,
|
||||
fs,
|
||||
panic::AssertUnwindSafe,
|
||||
@@ -106,6 +107,8 @@ const SWITCH_QUEUE_CAPACITY: usize = 32;
|
||||
static CURRENT_REQUEST_SEQUENCE: AtomicU64 = AtomicU64::new(0);
|
||||
static CURRENT_SWITCHING_PROFILE: AtomicBool = AtomicBool::new(false);
|
||||
static SWITCH_TASK_SEQUENCE: AtomicU64 = AtomicU64::new(0);
|
||||
const SWITCH_JOB_TIMEOUT: Duration = Duration::from_secs(30);
|
||||
const SWITCH_CLEANUP_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
struct SwitchRequest {
|
||||
@@ -230,166 +233,190 @@ fn start_switch_job(
|
||||
mutex: &'static Mutex<()>,
|
||||
request: SwitchRequest,
|
||||
) {
|
||||
tokio::spawn(async move {
|
||||
AsyncHandler::spawn(move || async move {
|
||||
let success = run_switch_job(mutex, &request).await;
|
||||
let task_id = request.task_id;
|
||||
let profile_id = request.profile_id.clone();
|
||||
let notify = request.notify;
|
||||
if let Err(err) = validate_switch_request(request.task_id, &profile_id).await {
|
||||
if let Err(err) = driver_tx
|
||||
.send(SwitchDriverMessage::Completion { request, success })
|
||||
.await
|
||||
{
|
||||
logging!(
|
||||
warn,
|
||||
error,
|
||||
Type::Cmd,
|
||||
"Validation failed for switch task {} -> {}: {}",
|
||||
request.task_id,
|
||||
"Failed to push switch completion to driver (task={} profile={}): {}",
|
||||
task_id,
|
||||
profile_id,
|
||||
err
|
||||
);
|
||||
handle::Handle::notice_message("config_validate::error", err);
|
||||
handle::Handle::notify_profile_switch_finished(
|
||||
profile_id.clone(),
|
||||
false,
|
||||
notify,
|
||||
request.task_id,
|
||||
);
|
||||
let _ = driver_tx
|
||||
.send(SwitchDriverMessage::Completion {
|
||||
request,
|
||||
success: false,
|
||||
})
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
|
||||
logging!(
|
||||
info,
|
||||
Type::Cmd,
|
||||
"Starting switch task {} for profile {} (notify={})",
|
||||
request.task_id,
|
||||
profile_id,
|
||||
notify
|
||||
);
|
||||
let success = match process_switch_task_with_guard(
|
||||
mutex,
|
||||
request.task_id,
|
||||
profile_id.clone(),
|
||||
notify,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(result) => result,
|
||||
Err(err) => {
|
||||
logging!(
|
||||
error,
|
||||
Type::Cmd,
|
||||
"Switch task execution failed ({}): {}",
|
||||
profile_id,
|
||||
err
|
||||
);
|
||||
false
|
||||
}
|
||||
};
|
||||
|
||||
let _ = driver_tx
|
||||
.send(SwitchDriverMessage::Completion { request, success })
|
||||
.await;
|
||||
});
|
||||
}
|
||||
|
||||
async fn process_switch_task_with_guard(
|
||||
mutex: &'static Mutex<()>,
|
||||
task_id: u64,
|
||||
profile_index: String,
|
||||
notify_success: bool,
|
||||
) -> CmdResult<bool> {
|
||||
async fn run_switch_job(mutex: &'static Mutex<()>, request: &SwitchRequest) -> bool {
|
||||
let profile_id = request.profile_id.clone();
|
||||
let task_id = request.task_id;
|
||||
let notify = request.notify;
|
||||
|
||||
if let Err(err) = validate_switch_request(task_id, &profile_id).await {
|
||||
logging!(
|
||||
warn,
|
||||
Type::Cmd,
|
||||
"Validation failed for switch task {} -> {}: {}",
|
||||
task_id,
|
||||
profile_id,
|
||||
err
|
||||
);
|
||||
handle::Handle::notice_message("config_validate::error", err.clone());
|
||||
handle::Handle::notify_profile_switch_finished(profile_id.clone(), false, notify, task_id);
|
||||
return false;
|
||||
}
|
||||
|
||||
logging!(
|
||||
info,
|
||||
Type::Cmd,
|
||||
"Processing queued profile switch: {} (notify={})",
|
||||
profile_index,
|
||||
notify_success
|
||||
"Starting switch task {} for profile {} (notify={})",
|
||||
task_id,
|
||||
profile_id,
|
||||
notify
|
||||
);
|
||||
|
||||
let _guard = mutex.lock().await;
|
||||
let profile_for_patch = profile_id.clone();
|
||||
let pipeline = async move {
|
||||
let _guard = mutex.lock().await;
|
||||
patch_profiles_config_internal(IProfiles {
|
||||
current: Some(profile_for_patch),
|
||||
items: None,
|
||||
})
|
||||
.await
|
||||
};
|
||||
|
||||
let switch_result = AssertUnwindSafe(patch_profiles_config_internal(IProfiles {
|
||||
current: Some(profile_index.clone()),
|
||||
items: None,
|
||||
}))
|
||||
.catch_unwind()
|
||||
.await;
|
||||
|
||||
let switch_result = match switch_result {
|
||||
Ok(inner) => inner,
|
||||
match tokio::time::timeout(
|
||||
SWITCH_JOB_TIMEOUT,
|
||||
AssertUnwindSafe(pipeline).catch_unwind(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Err(_) => {
|
||||
logging!(
|
||||
error,
|
||||
Type::Cmd,
|
||||
"Panic occurred during profile switch: {}",
|
||||
profile_index
|
||||
"Profile switch task {} timed out after {:?}",
|
||||
task_id,
|
||||
SWITCH_JOB_TIMEOUT
|
||||
);
|
||||
handle::Handle::notice_message(
|
||||
"config_validate::panic",
|
||||
format!("profile switch panic: {}", profile_index),
|
||||
"config_validate::error",
|
||||
format!("profile switch timed out: {}", profile_id),
|
||||
);
|
||||
handle::Handle::notify_profile_switch_finished(
|
||||
profile_index.clone(),
|
||||
profile_id.clone(),
|
||||
false,
|
||||
notify_success,
|
||||
notify,
|
||||
task_id,
|
||||
);
|
||||
return Ok(false);
|
||||
false
|
||||
}
|
||||
};
|
||||
|
||||
let success = match switch_result {
|
||||
Ok(val) => val,
|
||||
Err(err) => {
|
||||
Ok(Err(panic_payload)) => {
|
||||
let panic_message = describe_panic_payload(panic_payload.as_ref());
|
||||
logging!(
|
||||
error,
|
||||
Type::Cmd,
|
||||
"Profile switch failed ({}): {}",
|
||||
profile_index,
|
||||
err
|
||||
"Panic captured during profile switch task {} ({}): {}",
|
||||
task_id,
|
||||
profile_id,
|
||||
panic_message
|
||||
);
|
||||
handle::Handle::notice_message(
|
||||
"config_validate::panic",
|
||||
format!("profile switch panic: {}", profile_id),
|
||||
);
|
||||
handle::Handle::notice_message("config_validate::error", err.clone());
|
||||
handle::Handle::notify_profile_switch_finished(
|
||||
profile_index.clone(),
|
||||
profile_id.clone(),
|
||||
false,
|
||||
notify_success,
|
||||
notify,
|
||||
task_id,
|
||||
);
|
||||
return Ok(false);
|
||||
false
|
||||
}
|
||||
};
|
||||
|
||||
handle::Handle::notify_profile_switch_finished(
|
||||
profile_index.clone(),
|
||||
success,
|
||||
notify_success,
|
||||
task_id,
|
||||
);
|
||||
|
||||
if let Err(err) = handle::Handle::mihomo().await.close_all_connections().await {
|
||||
logging!(
|
||||
warn,
|
||||
Type::Cmd,
|
||||
"Failed to close connections after profile switch ({}): {}",
|
||||
profile_index,
|
||||
err
|
||||
);
|
||||
Ok(Ok(result)) => match result {
|
||||
Ok(success) => {
|
||||
handle::Handle::notify_profile_switch_finished(
|
||||
profile_id.clone(),
|
||||
success,
|
||||
notify,
|
||||
task_id,
|
||||
);
|
||||
close_connections_after_switch(&profile_id).await;
|
||||
if notify && success {
|
||||
handle::Handle::notice_message("info", "Profile Switched");
|
||||
}
|
||||
logging!(
|
||||
info,
|
||||
Type::Cmd,
|
||||
"Profile switch task finished: {} (success={})",
|
||||
profile_id,
|
||||
success
|
||||
);
|
||||
success
|
||||
}
|
||||
Err(err) => {
|
||||
logging!(
|
||||
error,
|
||||
Type::Cmd,
|
||||
"Profile switch failed ({}): {}",
|
||||
profile_id,
|
||||
err
|
||||
);
|
||||
handle::Handle::notice_message("config_validate::error", err.clone());
|
||||
handle::Handle::notify_profile_switch_finished(
|
||||
profile_id.clone(),
|
||||
false,
|
||||
notify,
|
||||
task_id,
|
||||
);
|
||||
false
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
if notify_success && success {
|
||||
handle::Handle::notice_message("info", "Profile Switched");
|
||||
async fn close_connections_after_switch(profile_id: &str) {
|
||||
match tokio::time::timeout(SWITCH_CLEANUP_TIMEOUT, async {
|
||||
handle::Handle::mihomo().await.close_all_connections().await
|
||||
})
|
||||
.await
|
||||
{
|
||||
Ok(Ok(())) => {}
|
||||
Ok(Err(err)) => {
|
||||
logging!(
|
||||
warn,
|
||||
Type::Cmd,
|
||||
"Failed to close connections after profile switch ({}): {}",
|
||||
profile_id,
|
||||
err
|
||||
);
|
||||
}
|
||||
Err(_) => {
|
||||
logging!(
|
||||
warn,
|
||||
Type::Cmd,
|
||||
"Closing connections after profile switch ({}) timed out after {:?}",
|
||||
profile_id,
|
||||
SWITCH_CLEANUP_TIMEOUT
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
logging!(
|
||||
info,
|
||||
Type::Cmd,
|
||||
"Profile switch task finished: {} (success={})",
|
||||
profile_index,
|
||||
success
|
||||
);
|
||||
|
||||
Ok(success)
|
||||
fn describe_panic_payload(payload: &(dyn Any + Send)) -> String {
|
||||
if let Some(message) = payload.downcast_ref::<&str>() {
|
||||
(*message).to_string().into()
|
||||
} else if let Some(message) = payload.downcast_ref::<std::string::String>() {
|
||||
message.clone().into()
|
||||
} else {
|
||||
"unknown panic".into()
|
||||
}
|
||||
}
|
||||
|
||||
#[tauri::command]
|
||||
|
||||
Reference in New Issue
Block a user