refactor(profile_switch): centralize state management and add cancellation flow

- Introduced SwitchManager in state.rs to unify mutex, sequencing, and SwitchScope handling.
- Added SwitchCancellation and SwitchRequest wrappers to encapsulate cancel tokens and notifications.
- Updated driver to allocate task IDs via SwitchManager, cancel old tokens, and queue next jobs in order.
- Updated workflow to check cancellation and sequence at each phase, replacing global flags with manager APIs.
This commit is contained in:
Slinetrac
2025-10-26 23:02:26 +08:00
Unverified
parent 32ff1b759f
commit f90a8a4534
3 changed files with 449 additions and 298 deletions

View File

@@ -1,15 +1,13 @@
use super::{
CmdResult,
state::{SWITCH_MUTEX, SWITCH_TASK_SEQUENCE, SwitchRequest},
state::{SwitchCancellation, SwitchManager, SwitchRequest, manager},
workflow,
};
use crate::{logging, process::AsyncHandler, utils::logging::Type};
use crate::{logging, utils::logging::Type};
use once_cell::sync::OnceCell;
use smartstring::alias::String as SmartString;
use std::collections::VecDeque;
use std::sync::atomic::Ordering;
use std::collections::{HashMap, VecDeque};
use tokio::sync::{
Mutex,
mpsc::{self, error::TrySendError},
oneshot,
};
@@ -21,6 +19,7 @@ static SWITCH_QUEUE: OnceCell<mpsc::Sender<SwitchDriverMessage>> = OnceCell::new
struct SwitchDriverState {
    // Request currently being executed by a spawned switch job, if any.
    active: Option<SwitchRequest>,
    // Pending requests, promoted FIFO once `active` completes.
    queue: VecDeque<SwitchRequest>,
    // Most recent cancellation token handed out per profile id; when a new
    // request for the same profile arrives, the superseded token is cancelled.
    latest_tokens: HashMap<SmartString, SwitchCancellation>,
}
#[derive(Debug)]
@@ -39,23 +38,28 @@ pub(super) async fn switch_profile(
profile_index: impl Into<SmartString>,
notify_success: bool,
) -> CmdResult<bool> {
let profile_index: SmartString = profile_index.into();
switch_profile_impl(profile_index.into(), notify_success).await
}
async fn switch_profile_impl(profile_index: SmartString, notify_success: bool) -> CmdResult<bool> {
let manager = manager();
let sender = switch_driver_sender();
let task_id = SWITCH_TASK_SEQUENCE.fetch_add(1, Ordering::SeqCst) + 1;
let request = SwitchRequest::new(
manager.next_task_id(),
profile_index.clone(),
notify_success,
);
logging!(
info,
Type::Cmd,
"Queue profile switch task {} -> {} (notify={})",
task_id,
request.task_id(),
profile_index,
notify_success
);
let request = SwitchRequest {
task_id,
profile_id: profile_index.clone(),
notify: notify_success,
};
let (tx, rx) = oneshot::channel();
match sender.try_send(SwitchDriverMessage::Request {
@@ -125,7 +129,7 @@ fn switch_driver_sender() -> &'static mpsc::Sender<SwitchDriverMessage> {
let (tx, mut rx) = mpsc::channel::<SwitchDriverMessage>(SWITCH_QUEUE_CAPACITY);
let driver_tx = tx.clone();
tokio::spawn(async move {
let mutex = SWITCH_MUTEX.get_or_init(|| Mutex::new(()));
let manager = manager();
let mut state = SwitchDriverState::default();
while let Some(message) = rx.recv().await {
match message {
@@ -133,63 +137,10 @@ fn switch_driver_sender() -> &'static mpsc::Sender<SwitchDriverMessage> {
request,
respond_to,
} => {
let accepted = true;
let mut responder = Some(respond_to);
if let Some(active) = &mut state.active
&& active.profile_id == request.profile_id
{
active.notify |= request.notify;
if let Some(sender) = responder.take() {
let _ = sender.send(accepted);
}
continue;
}
if let Some(existing) = state
.queue
.iter_mut()
.find(|queued| queued.profile_id == request.profile_id)
{
existing.notify |= request.notify;
if let Some(sender) = responder.take() {
let _ = sender.send(accepted);
}
continue;
}
if state.active.is_none() {
state.active = Some(request.clone());
if let Some(sender) = responder.take() {
let _ = sender.send(accepted);
}
start_switch_job(driver_tx.clone(), mutex, request);
} else {
state.queue.push_back(request.clone());
if let Some(sender) = responder.take() {
let _ = sender.send(accepted);
}
}
handle_enqueue(&mut state, request, respond_to, driver_tx.clone(), manager);
}
SwitchDriverMessage::Completion { request, success } => {
logging!(
info,
Type::Cmd,
"Switch task {} completed (success={})",
request.task_id,
success
);
if let Some(active) = &state.active
&& active.task_id == request.task_id
{
state.active = None;
}
if state.active.is_none()
&& let Some(next) = state.queue.pop_front()
{
state.active = Some(next.clone());
start_switch_job(driver_tx.clone(), mutex, next);
}
handle_completion(&mut state, request, success, driver_tx.clone(), manager);
}
}
}
@@ -198,25 +149,111 @@ fn switch_driver_sender() -> &'static mpsc::Sender<SwitchDriverMessage> {
})
}
/// Admits a new switch request into the driver state.
///
/// Every request is accepted; the oneshot always receives `true`. A request
/// for the profile currently being switched cancels the in-flight job and is
/// scheduled to run next; a request for a queued profile replaces the stale
/// queued entry.
fn handle_enqueue(
    state: &mut SwitchDriverState,
    request: SwitchRequest,
    respond_to: oneshot::Sender<bool>,
    driver_tx: mpsc::Sender<SwitchDriverMessage>,
    manager: &'static SwitchManager,
) {
    let profile = request.profile_id().clone();

    // Register this request's token as the latest for the profile and cancel
    // whichever token it supersedes.
    if let Some(stale) = state
        .latest_tokens
        .insert(profile.clone(), request.cancel_token().clone())
    {
        stale.cancel();
    }

    // Same profile as the active job: cancel it, fold the notify flag into it,
    // and put this request at the head of the queue so it runs next.
    let matches_active = state
        .active
        .as_ref()
        .is_some_and(|active| active.profile_id() == &profile);
    if matches_active {
        if let Some(active) = state.active.as_mut() {
            active.cancel_token().cancel();
            active.merge_notify(request.notify());
        }
        state
            .queue
            .retain(|pending| pending.profile_id() != &profile);
        state.queue.push_front(request);
        let _ = respond_to.send(true);
        return;
    }

    // Any queued request for the same profile is stale: this one replaces it.
    state
        .queue
        .retain(|pending| pending.profile_id() != &profile);

    if state.active.is_none() {
        // Driver is idle — start the job immediately.
        state.active = Some(request.clone());
        let _ = respond_to.send(true);
        start_switch_job(driver_tx, manager, request);
    } else {
        // Driver is busy with a different profile — wait in line.
        state.queue.push_back(request);
        let _ = respond_to.send(true);
    }
}
/// Processes a finished switch job: clears the active slot, drops token
/// bookkeeping when still current, and promotes the next queued request.
fn handle_completion(
    state: &mut SwitchDriverState,
    request: SwitchRequest,
    success: bool,
    driver_tx: mpsc::Sender<SwitchDriverMessage>,
    manager: &'static SwitchManager,
) {
    logging!(
        info,
        Type::Cmd,
        "Switch task {} completed (success={})",
        request.task_id(),
        success
    );

    // Clear the active slot only when this completion belongs to it; a stale
    // completion for a superseded task must not evict a newer active job.
    let finished_active = state
        .active
        .as_ref()
        .map(|active| active.task_id() == request.task_id())
        .unwrap_or(false);
    if finished_active {
        state.active = None;
    }

    // Remove the token entry unless a newer request already replaced it.
    let token_still_latest = state
        .latest_tokens
        .get(request.profile_id())
        .map(|latest| latest.same_token(request.cancel_token()))
        .unwrap_or(false);
    if token_still_latest {
        state.latest_tokens.remove(request.profile_id());
    }

    // Driver went idle: start the next queued job, if any.
    if state.active.is_none() {
        if let Some(next) = state.queue.pop_front() {
            state.active = Some(next.clone());
            start_switch_job(driver_tx, manager, next);
        }
    }
}
fn start_switch_job(
driver_tx: mpsc::Sender<SwitchDriverMessage>,
mutex: &'static Mutex<()>,
manager: &'static SwitchManager,
request: SwitchRequest,
) {
AsyncHandler::spawn(move || async move {
let success = workflow::run_switch_job(mutex, &request).await;
let task_id = request.task_id;
let profile_id = request.profile_id.clone();
let completion_request = request.clone();
tokio::spawn(async move {
let success = workflow::run_switch_job(manager, request).await;
if let Err(err) = driver_tx
.send(SwitchDriverMessage::Completion { request, success })
.send(SwitchDriverMessage::Completion {
request: completion_request,
success,
})
.await
{
logging!(
error,
Type::Cmd,
"Failed to push switch completion to driver (task={} profile={}): {}",
task_id,
profile_id,
"Failed to push switch completion to driver: {}",
err
);
}

View File

@@ -1,35 +1,133 @@
use once_cell::sync::OnceCell;
use smartstring::alias::String;
use smartstring::alias::String as SmartString;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::time::Duration;
use tokio::sync::Mutex;
pub(super) static SWITCH_MUTEX: OnceCell<Mutex<()>> = OnceCell::new();
pub(super) static CURRENT_REQUEST_SEQUENCE: AtomicU64 = AtomicU64::new(0);
pub(super) static CURRENT_SWITCHING_PROFILE: AtomicBool = AtomicBool::new(false);
pub(super) static SWITCH_TASK_SEQUENCE: AtomicU64 = AtomicU64::new(0);
pub(super) const SWITCH_JOB_TIMEOUT: Duration = Duration::from_secs(30);
pub(super) const SWITCH_CLEANUP_TIMEOUT: Duration = Duration::from_secs(5);
static SWITCH_MANAGER: OnceCell<SwitchManager> = OnceCell::new();
/// Returns the process-wide [`SwitchManager`], creating it on first use.
pub(super) fn manager() -> &'static SwitchManager {
    SWITCH_MANAGER.get_or_init(Default::default)
}
// Central coordination point for profile switching: owns the core mutex,
// the task/request sequence counters, and the in-progress flag.
#[derive(Debug)]
pub(super) struct SwitchManager {
    // Serializes the core config-patch section across switch jobs.
    core_mutex: Mutex<()>,
    // Monotonic sequence used to detect that a newer request superseded one
    // already in flight (compared via `latest_request_sequence`).
    request_sequence: AtomicU64,
    // True while a switch is in progress; set by `begin_switch` and cleared
    // when the returned SwitchScope drops.
    switching: AtomicBool,
    // Monotonic source of unique switch-task ids.
    task_sequence: AtomicU64,
}
impl Default for SwitchManager {
fn default() -> Self {
Self {
core_mutex: Mutex::new(()),
request_sequence: AtomicU64::new(0),
switching: AtomicBool::new(false),
task_sequence: AtomicU64::new(0),
}
}
}
impl SwitchManager {
    /// Mutex guarding the core configuration-patch path.
    pub(super) fn core_mutex(&self) -> &Mutex<()> {
        &self.core_mutex
    }

    /// Allocates the next unique switch-task id (first id is 1).
    pub(super) fn next_task_id(&self) -> u64 {
        let previous = self.task_sequence.fetch_add(1, Ordering::SeqCst);
        previous + 1
    }

    /// Allocates the next request sequence number (first value is 1).
    pub(super) fn next_request_sequence(&self) -> u64 {
        let previous = self.request_sequence.fetch_add(1, Ordering::SeqCst);
        previous + 1
    }

    /// Highest request sequence number handed out so far; a request whose own
    /// sequence is below this value has been superseded.
    pub(super) fn latest_request_sequence(&self) -> u64 {
        self.request_sequence.load(Ordering::SeqCst)
    }

    /// Marks a switch as in progress; the flag clears when the scope drops.
    pub(super) fn begin_switch(&'static self) -> SwitchScope<'static> {
        self.switching.store(true, Ordering::SeqCst);
        SwitchScope { manager: self }
    }

    /// Whether a switch is currently in progress.
    pub(super) fn is_switching(&self) -> bool {
        self.switching.load(Ordering::SeqCst)
    }
}
// RAII guard returned by `SwitchManager::begin_switch`: while it is alive the
// manager reports `is_switching() == true`.
pub(super) struct SwitchScope<'a> {
    manager: &'a SwitchManager,
}

impl Drop for SwitchScope<'_> {
    fn drop(&mut self) {
        // Clear the in-progress flag on every exit path, including unwinds.
        self.manager.switching.store(false, Ordering::SeqCst);
    }
}
/// Cloneable cancellation token shared between a switch request and the driver.
///
/// All clones observe the same underlying flag; `same_token` compares flag
/// identity rather than cancelled state.
#[derive(Debug, Clone)]
pub(super) struct SwitchCancellation(Arc<AtomicBool>);

impl SwitchCancellation {
    /// Creates a fresh, not-yet-cancelled token.
    pub(super) fn new() -> Self {
        SwitchCancellation(Arc::new(AtomicBool::new(false)))
    }

    /// Flags this token — and every clone of it — as cancelled.
    pub(super) fn cancel(&self) {
        let SwitchCancellation(flag) = self;
        flag.store(true, Ordering::SeqCst);
    }

    /// True once `cancel` has been called on this token or any clone.
    pub(super) fn is_cancelled(&self) -> bool {
        let SwitchCancellation(flag) = self;
        flag.load(Ordering::SeqCst)
    }

    /// Identity check: do both handles share the same underlying flag?
    pub(super) fn same_token(&self, other: &SwitchCancellation) -> bool {
        Arc::ptr_eq(&self.0, &other.0)
    }
}
#[derive(Debug, Clone)]
pub(super) struct SwitchRequest {
pub(super) task_id: u64,
pub(super) profile_id: String,
pub(super) notify: bool,
task_id: u64,
profile_id: SmartString,
notify: bool,
cancel_token: SwitchCancellation,
}
pub(super) struct SwitchScope;
impl SwitchRequest {
pub(super) fn new(task_id: u64, profile_id: SmartString, notify: bool) -> Self {
Self {
task_id,
profile_id,
notify,
cancel_token: SwitchCancellation::new(),
}
}
impl SwitchScope {
pub(super) fn begin() -> Self {
CURRENT_SWITCHING_PROFILE.store(true, Ordering::SeqCst);
Self
}
}
impl Drop for SwitchScope {
fn drop(&mut self) {
CURRENT_SWITCHING_PROFILE.store(false, Ordering::SeqCst);
pub(super) fn task_id(&self) -> u64 {
self.task_id
}
pub(super) fn profile_id(&self) -> &SmartString {
&self.profile_id
}
pub(super) fn notify(&self) -> bool {
self.notify
}
pub(super) fn merge_notify(&mut self, notify: bool) {
if notify {
self.notify = true;
}
}
pub(super) fn cancel_token(&self) -> &SwitchCancellation {
&self.cancel_token
}
}

View File

@@ -1,9 +1,6 @@
use super::{
CmdResult,
state::{
CURRENT_REQUEST_SEQUENCE, CURRENT_SWITCHING_PROFILE, SWITCH_CLEANUP_TIMEOUT,
SWITCH_JOB_TIMEOUT, SWITCH_MUTEX, SwitchRequest, SwitchScope,
},
state::{SWITCH_CLEANUP_TIMEOUT, SWITCH_JOB_TIMEOUT, SwitchManager, SwitchRequest, manager},
validation::validate_switch_request,
};
use crate::cmd::StringifyErr;
@@ -16,15 +13,35 @@ use crate::{
};
use futures::FutureExt;
use serde_yaml_ng as serde_yaml;
use std::{any::Any, panic::AssertUnwindSafe, sync::atomic::Ordering, time::Duration};
use tokio::{fs as tokio_fs, sync::Mutex, time};
use smartstring::alias::String as SmartString;
use std::{any::Any, panic::AssertUnwindSafe, time::Duration};
use tokio::{fs as tokio_fs, time};
pub(super) async fn run_switch_job(mutex: &'static Mutex<()>, request: &SwitchRequest) -> bool {
let profile_id = request.profile_id.clone();
let task_id = request.task_id;
let notify = request.notify;
pub(super) async fn run_switch_job(
manager: &'static SwitchManager,
request: SwitchRequest,
) -> bool {
if request.cancel_token().is_cancelled() {
logging!(
info,
Type::Cmd,
"Switch task {} cancelled before validation",
request.task_id()
);
handle::Handle::notify_profile_switch_finished(
request.profile_id().clone(),
false,
request.notify(),
request.task_id(),
);
return false;
}
if let Err(err) = validate_switch_request(task_id, &profile_id).await {
let profile_id = request.profile_id().clone();
let task_id = request.task_id();
let notify = request.notify();
if let Err(err) = validate_switch_request(task_id, profile_id.as_str()).await {
logging!(
warn,
Type::Cmd,
@@ -47,13 +64,17 @@ pub(super) async fn run_switch_job(mutex: &'static Mutex<()>, request: &SwitchRe
notify
);
let profile_for_patch = profile_id.clone();
let request_clone = request.clone();
let pipeline = async move {
let _guard = mutex.lock().await;
patch_profiles_config_internal(IProfiles {
current: Some(profile_for_patch),
items: None,
})
let _core_guard = manager.core_mutex().lock().await;
patch_profiles_config_internal(
manager,
Some(&request_clone),
IProfiles {
current: Some(request_clone.profile_id().clone()),
items: None,
},
)
.await
};
@@ -148,13 +169,17 @@ pub(super) async fn run_switch_job(mutex: &'static Mutex<()>, request: &SwitchRe
}
pub(super) async fn patch_profiles_config(profiles: IProfiles) -> CmdResult<bool> {
let mutex = SWITCH_MUTEX.get_or_init(|| Mutex::new(()));
let _guard = mutex.lock().await;
patch_profiles_config_internal(profiles).await
let manager = manager();
let _core_guard = manager.core_mutex().lock().await;
patch_profiles_config_internal(manager, None, profiles).await
}
async fn patch_profiles_config_internal(profiles: IProfiles) -> CmdResult<bool> {
if CURRENT_SWITCHING_PROFILE.load(Ordering::SeqCst) {
async fn patch_profiles_config_internal(
manager: &'static SwitchManager,
request_ctx: Option<&SwitchRequest>,
profiles: IProfiles,
) -> CmdResult<bool> {
if manager.is_switching() {
logging!(
info,
Type::Cmd,
@@ -162,10 +187,22 @@ async fn patch_profiles_config_internal(profiles: IProfiles) -> CmdResult<bool>
);
return Ok(false);
}
let _switch_guard = SwitchScope::begin();
let current_sequence = CURRENT_REQUEST_SEQUENCE.fetch_add(1, Ordering::SeqCst) + 1;
let target_profile = profiles.current.clone();
if let Some(req) = request_ctx
&& req.cancel_token().is_cancelled()
{
return Ok(false);
}
let _switch_guard = manager.begin_switch();
let current_sequence = manager.next_request_sequence();
let mut target_profile = profiles.current.clone();
if target_profile.is_none()
&& let Some(req) = request_ctx
{
target_profile = Some(req.profile_id().clone());
}
logging!(
info,
@@ -175,134 +212,46 @@ async fn patch_profiles_config_internal(profiles: IProfiles) -> CmdResult<bool>
target_profile
);
let latest_sequence = CURRENT_REQUEST_SEQUENCE.load(Ordering::SeqCst);
if current_sequence < latest_sequence {
if current_sequence < manager.latest_request_sequence() {
logging!(
info,
Type::Cmd,
"Detected a newer request after acquiring the lock (sequence: {} < {}), abandoning current request",
current_sequence,
latest_sequence
manager.latest_request_sequence()
);
return Ok(false);
}
let current_profile = Config::profiles().await.latest_ref().current.clone();
let current_profile = {
let profiles_guard = Config::profiles().await;
profiles_guard.latest_ref().current.clone()
};
logging!(info, Type::Cmd, "Current profile: {:?}", current_profile);
if let Some(new_profile) = profiles.current.as_ref()
if let Some(new_profile) = target_profile.as_ref()
&& current_profile.as_ref() != Some(new_profile)
{
logging!(info, Type::Cmd, "Switching to new profile: {}", new_profile);
let config_file_result = {
let profiles_config = Config::profiles().await;
let profiles_data = profiles_config.latest_ref();
match profiles_data.get_item(new_profile) {
Ok(item) => {
if let Some(file) = &item.file {
let path = dirs::app_profiles_dir().map(|dir| dir.join(file.as_str()));
path.ok()
} else {
None
}
}
Err(e) => {
logging!(
error,
Type::Cmd,
"Failed to load target profile metadata: {}",
e
);
None
}
}
};
if let Some(req) = request_ctx
&& req.cancel_token().is_cancelled()
{
return Ok(false);
}
if let Some(file_path) = config_file_result {
if !file_path.exists() {
logging!(
error,
Type::Cmd,
"Target profile file does not exist: {}",
file_path.display()
);
handle::Handle::notice_message(
"config_validate::file_not_found",
format!("{}", file_path.display()),
);
return Ok(false);
}
let file_read_result =
time::timeout(Duration::from_secs(5), tokio_fs::read_to_string(&file_path)).await;
match file_read_result {
Ok(Ok(content)) => {
let yaml_parse_result = AsyncHandler::spawn_blocking(move || {
serde_yaml::from_str::<serde_yaml::Value>(&content)
})
.await;
match yaml_parse_result {
Ok(Ok(_)) => {
logging!(info, Type::Cmd, "Target profile YAML syntax is valid");
}
Ok(Err(err)) => {
let error_msg = format!(" {err}");
logging!(
error,
Type::Cmd,
"Target profile contains YAML syntax errors: {}",
error_msg
);
handle::Handle::notice_message(
"config_validate::yaml_syntax_error",
error_msg.clone(),
);
return Ok(false);
}
Err(join_err) => {
let error_msg = format!("YAML parsing task failed: {join_err}");
logging!(error, Type::Cmd, "{}", error_msg);
handle::Handle::notice_message(
"config_validate::yaml_parse_error",
error_msg.clone(),
);
return Ok(false);
}
}
}
Ok(Err(err)) => {
let error_msg = format!("Failed to read target profile file: {err}");
logging!(error, Type::Cmd, "{}", error_msg);
handle::Handle::notice_message(
"config_validate::file_read_error",
error_msg.clone(),
);
return Ok(false);
}
Err(_) => {
let error_msg = "Timed out reading profile file (5s)".to_string();
logging!(error, Type::Cmd, "{}", error_msg);
handle::Handle::notice_message(
"config_validate::file_read_timeout",
error_msg.clone(),
);
return Ok(false);
}
}
if !validate_profile_yaml(new_profile).await? {
return Ok(false);
}
}
let latest_sequence = CURRENT_REQUEST_SEQUENCE.load(Ordering::SeqCst);
if current_sequence < latest_sequence {
if current_sequence < manager.latest_request_sequence() {
logging!(
info,
Type::Cmd,
"Detected a newer request before core operation (sequence: {} < {}), abandoning current request",
current_sequence,
latest_sequence
manager.latest_request_sequence()
);
return Ok(false);
}
@@ -315,17 +264,15 @@ async fn patch_profiles_config_internal(profiles: IProfiles) -> CmdResult<bool>
);
let current_value = profiles.current.clone();
let _ = Config::profiles().await.draft_mut().patch_config(profiles);
let latest_sequence = CURRENT_REQUEST_SEQUENCE.load(Ordering::SeqCst);
if current_sequence < latest_sequence {
if current_sequence < manager.latest_request_sequence() {
logging!(
info,
Type::Cmd,
"Detected a newer request before core interaction (sequence: {} < {}), abandoning current request",
current_sequence,
latest_sequence
manager.latest_request_sequence()
);
Config::profiles().await.discard();
return Ok(false);
@@ -345,14 +292,13 @@ async fn patch_profiles_config_internal(profiles: IProfiles) -> CmdResult<bool>
match update_result {
Ok(Ok((true, _))) => {
let latest_sequence = CURRENT_REQUEST_SEQUENCE.load(Ordering::SeqCst);
if current_sequence < latest_sequence {
if current_sequence < manager.latest_request_sequence() {
logging!(
info,
Type::Cmd,
"Detected a newer request after core operation (sequence: {} < {}), ignoring current result",
current_sequence,
latest_sequence
manager.latest_request_sequence()
);
Config::profiles().await.discard();
return Ok(false);
@@ -394,7 +340,7 @@ async fn patch_profiles_config_internal(profiles: IProfiles) -> CmdResult<bool>
);
}
if let Some(current) = &current_value {
if let Some(current) = current_value {
logging!(
info,
Type::Cmd,
@@ -402,10 +348,9 @@ async fn patch_profiles_config_internal(profiles: IProfiles) -> CmdResult<bool>
current,
current_sequence
);
handle::Handle::notify_profile_changed(current.clone());
handle::Handle::notify_profile_changed(current);
}
CURRENT_SWITCHING_PROFILE.store(false, Ordering::SeqCst);
Ok(true)
}
Ok(Ok((false, error_msg))) => {
@@ -416,44 +361,8 @@ async fn patch_profiles_config_internal(profiles: IProfiles) -> CmdResult<bool>
error_msg
);
Config::profiles().await.discard();
if let Some(prev_profile) = current_profile {
logging!(
info,
Type::Cmd,
"Attempting to restore previous configuration: {}",
prev_profile
);
let restore_profiles = IProfiles {
current: Some(prev_profile),
items: None,
};
Config::profiles()
.await
.draft_mut()
.patch_config(restore_profiles)
.stringify_err()?;
Config::profiles().await.apply();
AsyncHandler::spawn(|| async move {
if let Err(e) = profiles_save_file_safe().await {
logging!(
warn,
Type::Cmd,
"Failed to persist restored configuration asynchronously: {}",
e
);
}
});
logging!(
info,
Type::Cmd,
"Successfully restored previous configuration"
);
}
restore_previous_profile(current_profile).await?;
handle::Handle::notice_message("config_validate::error", error_msg.to_string());
CURRENT_SWITCHING_PROFILE.store(false, Ordering::SeqCst);
Ok(false)
}
Ok(Err(e)) => {
@@ -467,7 +376,6 @@ async fn patch_profiles_config_internal(profiles: IProfiles) -> CmdResult<bool>
Config::profiles().await.discard();
handle::Handle::notice_message("config_validate::boot_error", e.to_string());
CURRENT_SWITCHING_PROFILE.store(false, Ordering::SeqCst);
Ok(false)
}
Err(_) => {
@@ -480,35 +388,143 @@ async fn patch_profiles_config_internal(profiles: IProfiles) -> CmdResult<bool>
current_sequence
);
Config::profiles().await.discard();
if let Some(prev_profile) = current_profile {
logging!(
info,
Type::Cmd,
"Attempting to restore previous configuration after timeout: {}, sequence: {}",
prev_profile,
current_sequence
);
let restore_profiles = IProfiles {
current: Some(prev_profile),
items: None,
};
Config::profiles()
.await
.draft_mut()
.patch_config(restore_profiles)
.stringify_err()?;
Config::profiles().await.apply();
}
restore_previous_profile(current_profile).await?;
handle::Handle::notice_message("config_validate::timeout", timeout_msg);
CURRENT_SWITCHING_PROFILE.store(false, Ordering::SeqCst);
Ok(false)
}
}
}
async fn close_connections_after_switch(profile_id: &str) {
/// Validates that `profile`'s backing YAML file exists, is readable, and
/// parses as YAML.
///
/// Returns `Ok(true)` when the profile has no backing file or the file is
/// valid; `Ok(false)` on any validation failure, after emitting a user-facing
/// notice. Every failure path — including the read timeout — returns
/// `Ok(false)` so the caller aborts the switch gracefully instead of
/// propagating an error through `?`.
async fn validate_profile_yaml(profile: &SmartString) -> CmdResult<bool> {
    // Resolve the on-disk path from profile metadata, without holding the
    // profiles guard across the file I/O below.
    let file_path = {
        let profiles_guard = Config::profiles().await;
        let profiles_data = profiles_guard.latest_ref();
        match profiles_data.get_item(profile) {
            Ok(item) => item.file.as_ref().and_then(|file| {
                dirs::app_profiles_dir()
                    .ok()
                    .map(|dir| dir.join(file.as_str()))
            }),
            Err(e) => {
                logging!(
                    error,
                    Type::Cmd,
                    "Failed to load target profile metadata: {}",
                    e
                );
                return Ok(false);
            }
        }
    };
    // Profiles without a backing file have nothing to validate.
    let Some(path) = file_path else {
        return Ok(true);
    };
    if !path.exists() {
        logging!(
            error,
            Type::Cmd,
            "Target profile file does not exist: {}",
            path.display()
        );
        handle::Handle::notice_message(
            "config_validate::file_not_found",
            format!("{}", path.display()),
        );
        return Ok(false);
    }
    // Bound the read so a hung filesystem cannot stall the switch pipeline.
    let file_read_result =
        time::timeout(Duration::from_secs(5), tokio_fs::read_to_string(&path)).await;
    match file_read_result {
        Ok(Ok(content)) => {
            // Parse off the async runtime; YAML parsing is CPU-bound.
            let yaml_parse_result = AsyncHandler::spawn_blocking(move || {
                serde_yaml::from_str::<serde_yaml::Value>(&content)
            })
            .await;
            match yaml_parse_result {
                Ok(Ok(_)) => {
                    logging!(info, Type::Cmd, "Target profile YAML syntax is valid");
                    Ok(true)
                }
                Ok(Err(err)) => {
                    let error_msg = format!(" {err}");
                    logging!(
                        error,
                        Type::Cmd,
                        "Target profile contains YAML syntax errors: {}",
                        error_msg
                    );
                    handle::Handle::notice_message(
                        "config_validate::yaml_syntax_error",
                        error_msg,
                    );
                    Ok(false)
                }
                Err(join_err) => {
                    let error_msg = format!("YAML parsing task failed: {join_err}");
                    logging!(error, Type::Cmd, "{}", error_msg);
                    handle::Handle::notice_message(
                        "config_validate::yaml_parse_error",
                        error_msg,
                    );
                    Ok(false)
                }
            }
        }
        Ok(Err(err)) => {
            let error_msg = format!("Failed to read target profile file: {err}");
            logging!(error, Type::Cmd, "{}", error_msg);
            handle::Handle::notice_message("config_validate::file_read_error", error_msg);
            Ok(false)
        }
        Err(_) => {
            let error_msg = "Timed out reading profile file (5s)".to_string();
            logging!(error, Type::Cmd, "{}", error_msg);
            // Fix: treat the timeout like every other validation failure —
            // notify and return Ok(false). Returning Err here made the caller's
            // `?` propagate an error where all sibling paths abort gracefully
            // (the pre-refactor code also returned Ok(false) on timeout).
            handle::Handle::notice_message("config_validate::file_read_timeout", error_msg);
            Ok(false)
        }
    }
}
/// Re-applies `previous` as the current profile after a failed switch.
///
/// A `None` previous profile makes this a no-op. Persisting the restored
/// configuration to disk happens asynchronously on a best-effort basis.
async fn restore_previous_profile(previous: Option<SmartString>) -> CmdResult<()> {
    let Some(prev_profile) = previous else {
        return Ok(());
    };
    logging!(
        info,
        Type::Cmd,
        "Attempting to restore previous configuration: {}",
        prev_profile
    );
    let restore_profiles = IProfiles {
        current: Some(prev_profile),
        items: None,
    };
    Config::profiles()
        .await
        .draft_mut()
        .patch_config(restore_profiles)
        .stringify_err()?;
    Config::profiles().await.apply();
    // Persist off the hot path; a save failure is logged but does not fail
    // the restore itself.
    AsyncHandler::spawn(|| async move {
        if let Err(e) = profiles_save_file_safe().await {
            logging!(
                warn,
                Type::Cmd,
                "Failed to persist restored configuration asynchronously: {}",
                e
            );
        }
    });
    Ok(())
}
async fn close_connections_after_switch(profile_id: &SmartString) {
match time::timeout(SWITCH_CLEANUP_TIMEOUT, async {
handle::Handle::mihomo().await.close_all_connections().await
})