From 9de5866efa8931b55d983778f0972444ed7f2277 Mon Sep 17 00:00:00 2001 From: Slinetrac Date: Sun, 26 Oct 2025 23:58:43 +0800 Subject: [PATCH] feat(profile_switch): integrate explicit state machine for profile switching - workflow.rs:24 now delegates each switch to SwitchStateMachine, passing an owned SwitchRequest. Queue cancellation and state-sequence checks are centralized inside the machine instead of scattered guards. - workflow.rs:176 replaces the old helper with `SwitchStateMachine::new(manager(), None, profiles).run().await`, ensuring manual profile patches follow the same workflow (locking, validation, rollback) as queued switches. - workflow.rs:180 & 275 expose `validate_profile_yaml` and `restore_previous_profile` for reuse inside the state machine. - workflow/state_machine.rs:1 introduces a dedicated state machine module. It manages global mutex acquisition, request/cancellation state, YAML validation, draft patching, `CoreManager::update_config`, failure rollback, and tray/notification side-effects. Transitions check for cancellations and stale sequences; completions release guards via `SwitchScope` drop. --- src-tauri/src/cmd/profile_switch/workflow.rs | 248 +--------- .../profile_switch/workflow/state_machine.rs | 448 ++++++++++++++++++ 2 files changed, 464 insertions(+), 232 deletions(-) create mode 100644 src-tauri/src/cmd/profile_switch/workflow/state_machine.rs diff --git a/src-tauri/src/cmd/profile_switch/workflow.rs b/src-tauri/src/cmd/profile_switch/workflow.rs index dc311bcc..859401e6 100644 --- a/src-tauri/src/cmd/profile_switch/workflow.rs +++ b/src-tauri/src/cmd/profile_switch/workflow.rs @@ -6,7 +6,7 @@ use super::{ use crate::cmd::StringifyErr; use crate::{ config::{Config, IProfiles, profiles::profiles_save_file_safe}, - core::{CoreManager, handle, tray::Tray}, + core::handle, logging, process::AsyncHandler, utils::{dirs, logging::Type}, @@ -17,6 +17,10 @@ use smartstring::alias::String as SmartString; use std::{any::Any, panic::AssertUnwindSafe, time::Duration}; use tokio::{fs as tokio_fs, time}; +mod state_machine; + +use state_machine::SwitchStateMachine; + pub(super) async fn run_switch_job( manager: &'static SwitchManager, request: SwitchRequest, @@ -64,17 +68,18 @@ pub(super) async fn run_switch_job( notify ); - let request_clone = request.clone(); + let pipeline_request = request; let pipeline = async move { - let _core_guard = manager.core_mutex().lock().await; - patch_profiles_config_internal( + let target_profile = pipeline_request.profile_id().clone(); + SwitchStateMachine::new( manager, - Some(&request_clone), + Some(pipeline_request), IProfiles { - current: Some(request_clone.profile_id().clone()), + current: Some(target_profile), items: None, }, ) + .run() .await }; @@ -169,233 +174,12 @@ pub(super) async fn run_switch_job( } pub(super) async fn patch_profiles_config(profiles: IProfiles) -> CmdResult { - let manager = manager(); - let _core_guard = manager.core_mutex().lock().await; - patch_profiles_config_internal(manager, None, profiles).await + SwitchStateMachine::new(manager(), None, profiles) + .run() + .await } -async fn patch_profiles_config_internal( - manager: &'static SwitchManager, - request_ctx: Option<&SwitchRequest>, - profiles: IProfiles, -) -> CmdResult { - if manager.is_switching() { - logging!( - info, - Type::Cmd, - "Profile switch already in progress; skipping request" - ); - return Ok(false); - } - - if let Some(req) = request_ctx - && req.cancel_token().is_cancelled() - { - return Ok(false); - } - - let _switch_guard = manager.begin_switch(); - - let current_sequence = manager.next_request_sequence(); - let mut target_profile = profiles.current.clone(); - if target_profile.is_none() - && let Some(req) = request_ctx - { - target_profile = Some(req.profile_id().clone()); - } - - logging!( - info, - Type::Cmd, - "Begin modifying configuration; sequence: {}, target profile: {:?}", - current_sequence, - target_profile - ); - - if current_sequence < manager.latest_request_sequence() { - logging!( - info, - Type::Cmd, - "Detected a newer request after acquiring the lock (sequence: {} < {}), abandoning current request", - current_sequence, - manager.latest_request_sequence() - ); - return Ok(false); - } - - let current_profile = { - let profiles_guard = Config::profiles().await; - profiles_guard.latest_ref().current.clone() - }; - logging!(info, Type::Cmd, "Current profile: {:?}", current_profile); - - if let Some(new_profile) = target_profile.as_ref() - && current_profile.as_ref() != Some(new_profile) - { - logging!(info, Type::Cmd, "Switching to new profile: {}", new_profile); - - if let Some(req) = request_ctx - && req.cancel_token().is_cancelled() - { - return Ok(false); - } - - if !validate_profile_yaml(new_profile).await? { - return Ok(false); - } - } - - if current_sequence < manager.latest_request_sequence() { - logging!( - info, - Type::Cmd, - "Detected a newer request before core operation (sequence: {} < {}), abandoning current request", - current_sequence, - manager.latest_request_sequence() - ); - return Ok(false); - } - - logging!( - info, - Type::Cmd, - "Updating configuration draft, sequence: {}", - current_sequence - ); - - let current_value = profiles.current.clone(); - let _ = Config::profiles().await.draft_mut().patch_config(profiles); - - if current_sequence < manager.latest_request_sequence() { - logging!( - info, - Type::Cmd, - "Detected a newer request before core interaction (sequence: {} < {}), abandoning current request", - current_sequence, - manager.latest_request_sequence() - ); - Config::profiles().await.discard(); - return Ok(false); - } - - logging!( - info, - Type::Cmd, - "Starting core configuration update, sequence: {}", - current_sequence - ); - let update_result = time::timeout( - Duration::from_secs(30), - CoreManager::global().update_config(), - ) - .await; - - match update_result { - Ok(Ok((true, _))) => { - if current_sequence < manager.latest_request_sequence() { - logging!( - info, - Type::Cmd, - "Detected a newer request after core operation (sequence: {} < {}), ignoring current result", - current_sequence, - manager.latest_request_sequence() - ); - Config::profiles().await.discard(); - return Ok(false); - } - - logging!( - info, - Type::Cmd, - "Configuration update succeeded, sequence: {}", - current_sequence - ); - Config::profiles().await.apply(); - handle::Handle::refresh_clash(); - - if let Err(e) = Tray::global().update_tooltip().await { - logging!( - warn, - Type::Cmd, - "Failed to update tray tooltip asynchronously: {}", - e - ); - } - - if let Err(e) = Tray::global().update_menu().await { - logging!( - warn, - Type::Cmd, - "Failed to update tray menu asynchronously: {}", - e - ); - } - - if let Err(e) = profiles_save_file_safe().await { - logging!( - warn, - Type::Cmd, - "Failed to persist configuration file asynchronously: {}", - e - ); - } - - if let Some(current) = current_value { - logging!( - info, - Type::Cmd, - "Emitting configuration change event to frontend: {}, sequence: {}", - current, - current_sequence - ); - handle::Handle::notify_profile_changed(current); - } - - Ok(true) - } - Ok(Ok((false, error_msg))) => { - logging!( - warn, - Type::Cmd, - "Configuration validation failed: {}", - error_msg - ); - Config::profiles().await.discard(); - restore_previous_profile(current_profile).await?; - handle::Handle::notice_message("config_validate::error", error_msg.to_string()); - Ok(false) - } - Ok(Err(e)) => { - logging!( - warn, - Type::Cmd, - "Error occurred during update: {}, sequence: {}", - e, - current_sequence - ); - Config::profiles().await.discard(); - handle::Handle::notice_message("config_validate::boot_error", e.to_string()); - - Ok(false) - } - Err(_) => { - let timeout_msg = "Configuration update timed out (30s); possible validation or core communication stall"; - logging!( - error, - Type::Cmd, - "{}, sequence: {}", - timeout_msg, - current_sequence - ); - Config::profiles().await.discard(); - restore_previous_profile(current_profile).await?; - handle::Handle::notice_message("config_validate::timeout", timeout_msg); - Ok(false) - } - } -} - -async fn validate_profile_yaml(profile: &SmartString) -> CmdResult { +pub(super) async fn validate_profile_yaml(profile: &SmartString) -> CmdResult { let file_path = { let profiles_guard = Config::profiles().await; let profiles_data = profiles_guard.latest_ref(); @@ -490,7 +274,7 @@ async fn validate_profile_yaml(profile: &SmartString) -> CmdResult { } } -async fn restore_previous_profile(previous: Option) -> CmdResult<()> { +pub(super) async fn restore_previous_profile(previous: Option) -> CmdResult<()> { if let Some(prev_profile) = previous { logging!( info, diff --git a/src-tauri/src/cmd/profile_switch/workflow/state_machine.rs b/src-tauri/src/cmd/profile_switch/workflow/state_machine.rs new file mode 100644 index 00000000..7283a468 --- /dev/null +++ b/src-tauri/src/cmd/profile_switch/workflow/state_machine.rs @@ -0,0 +1,448 @@ +use super::{CmdResult, restore_previous_profile, validate_profile_yaml}; +use crate::{ + cmd::profile_switch::state::{SwitchManager, SwitchRequest, SwitchScope}, + config::{Config, IProfiles, profiles::profiles_save_file_safe}, + core::{CoreManager, handle, tray::Tray}, + logging, + utils::logging::Type, +}; +use smartstring::alias::String as SmartString; +use std::{mem, time::Duration}; +use tokio::{sync::MutexGuard, time}; + +/// Explicit state machine for profile switching so we can reason about +/// cancellation, stale requests, and side-effects at each stage. +pub(super) struct SwitchStateMachine { + ctx: SwitchContext, + state: SwitchState, +} + +impl SwitchStateMachine { + pub(super) fn new( + manager: &'static SwitchManager, + request: Option, + profiles: IProfiles, + ) -> Self { + Self { + ctx: SwitchContext::new(manager, request, profiles), + state: SwitchState::Start, + } + } + + pub(super) async fn run(mut self) -> CmdResult { + loop { + let current_state = mem::replace(&mut self.state, SwitchState::Complete(false)); + let next = match current_state { + SwitchState::Start => self.handle_start(), + SwitchState::AcquireCore => self.handle_acquire_core().await, + SwitchState::Prepare => self.handle_prepare().await, + SwitchState::ValidateTarget => self.handle_validate_target().await, + SwitchState::PatchDraft => self.handle_patch_draft().await, + SwitchState::UpdateCore => self.handle_update_core().await, + SwitchState::Finalize(outcome) => self.handle_finalize(outcome).await, + SwitchState::Complete(result) => return Ok(result), + }?; + self.state = next; + } + } + + fn handle_start(&mut self) -> CmdResult { + if self.ctx.manager.is_switching() { + logging!( + info, + Type::Cmd, + "Profile switch already in progress; skipping request" + ); + return Ok(SwitchState::Complete(false)); + } + Ok(SwitchState::AcquireCore) + } + + async fn handle_acquire_core(&mut self) -> CmdResult { + self.ctx.core_guard = Some(self.ctx.manager.core_mutex().lock().await); + self.ctx.switch_scope = Some(self.ctx.manager.begin_switch()); + self.ctx.sequence = Some(self.ctx.manager.next_request_sequence()); + self.ctx.ensure_target_profile(); + + logging!( + info, + Type::Cmd, + "Begin modifying configuration; sequence: {}, target profile: {:?}", + self.ctx.sequence(), + self.ctx.target_profile + ); + + if self.ctx.cancelled() { + self.ctx.log_cancelled("after acquiring core lock"); + return Ok(SwitchState::Complete(false)); + } + + if self.ctx.stale() { + StaleStage::AfterLock.log(&self.ctx); + return Ok(SwitchState::Complete(false)); + } + + Ok(SwitchState::Prepare) + } + + async fn handle_prepare(&mut self) -> CmdResult { + let current_profile = { + let profiles_guard = Config::profiles().await; + profiles_guard.latest_ref().current.clone() + }; + + logging!(info, Type::Cmd, "Current profile: {:?}", current_profile); + self.ctx.previous_profile = current_profile; + Ok(SwitchState::ValidateTarget) + } + + async fn handle_validate_target(&mut self) -> CmdResult { + if self.ctx.cancelled() { + self.ctx.log_cancelled("before validation"); + return Ok(SwitchState::Complete(false)); + } + + if self.ctx.should_validate_target() { + let Some(target) = self.ctx.target_profile.clone() else { + logging!( + error, + Type::Cmd, + "Missing target profile while validation was requested; aborting switch" + ); + return Err("missing target profile at validation".into()); + }; + if !validate_profile_yaml(&target).await? { + return Ok(SwitchState::Complete(false)); + } + } + + if self.ctx.stale() { + StaleStage::BeforeCoreOperation.log(&self.ctx); + return Ok(SwitchState::Complete(false)); + } + + Ok(SwitchState::PatchDraft) + } + + async fn handle_patch_draft(&mut self) -> CmdResult { + if self.ctx.cancelled() { + self.ctx.log_cancelled("before patching configuration"); + return Ok(SwitchState::Complete(false)); + } + + logging!( + info, + Type::Cmd, + "Updating configuration draft, sequence: {}", + self.ctx.sequence() + ); + + let patch = self + .ctx + .take_profiles_patch() + .ok_or_else(|| "profiles patch already consumed".to_string())?; + self.ctx.new_profile_for_event = patch.current.clone(); + let _ = Config::profiles().await.draft_mut().patch_config(patch); + + if self.ctx.stale() { + StaleStage::BeforeCoreInteraction.log(&self.ctx); + Config::profiles().await.discard(); + return Ok(SwitchState::Complete(false)); + } + + Ok(SwitchState::UpdateCore) + } + + async fn handle_update_core(&mut self) -> CmdResult { + logging!( + info, + Type::Cmd, + "Starting core configuration update, sequence: {}", + self.ctx.sequence() + ); + + let update_result = time::timeout( + Duration::from_secs(30), + CoreManager::global().update_config(), + ) + .await; + + let outcome = match update_result { + Ok(Ok((true, _))) => CoreUpdateOutcome::Success, + Ok(Ok((false, msg))) => CoreUpdateOutcome::ValidationFailed { + message: msg.to_string(), + }, + Ok(Err(err)) => CoreUpdateOutcome::CoreError { + message: err.to_string(), + }, + Err(_) => CoreUpdateOutcome::Timeout, + }; + + Ok(SwitchState::Finalize(outcome)) + } + + async fn handle_finalize(&mut self, outcome: CoreUpdateOutcome) -> CmdResult { + match outcome { + CoreUpdateOutcome::Success => { + if self.ctx.stale() { + StaleStage::AfterCoreOperation.log(&self.ctx); + Config::profiles().await.discard(); + return Ok(SwitchState::Complete(false)); + } + + logging!( + info, + Type::Cmd, + "Configuration update succeeded, sequence: {}", + self.ctx.sequence() + ); + Config::profiles().await.apply(); + handle::Handle::refresh_clash(); + + if let Err(err) = Tray::global().update_tooltip().await { + logging!( + warn, + Type::Cmd, + "Failed to update tray tooltip asynchronously: {}", + err + ); + } + + if let Err(err) = Tray::global().update_menu().await { + logging!( + warn, + Type::Cmd, + "Failed to update tray menu asynchronously: {}", + err + ); + } + + if let Err(err) = profiles_save_file_safe().await { + logging!( + warn, + Type::Cmd, + "Failed to persist configuration file asynchronously: {}", + err + ); + } + + if let Some(current) = self.ctx.new_profile_for_event.clone() { + logging!( + info, + Type::Cmd, + "Emitting configuration change event to frontend: {}, sequence: {}", + current, + self.ctx.sequence() + ); + handle::Handle::notify_profile_changed(current); + } + + Ok(SwitchState::Complete(true)) + } + CoreUpdateOutcome::ValidationFailed { message } => { + logging!( + warn, + Type::Cmd, + "Configuration validation failed: {}", + message + ); + Config::profiles().await.discard(); + restore_previous_profile(self.ctx.previous_profile.clone()).await?; + handle::Handle::notice_message("config_validate::error", message); + Ok(SwitchState::Complete(false)) + } + CoreUpdateOutcome::CoreError { message } => { + logging!( + warn, + Type::Cmd, + "Error occurred during update: {}, sequence: {}", + message, + self.ctx.sequence() + ); + Config::profiles().await.discard(); + handle::Handle::notice_message("config_validate::boot_error", message); + Ok(SwitchState::Complete(false)) + } + CoreUpdateOutcome::Timeout => { + let timeout_msg = "Configuration update timed out (30s); possible validation or core communication stall"; + logging!( + error, + Type::Cmd, + "{}, sequence: {}", + timeout_msg, + self.ctx.sequence() + ); + Config::profiles().await.discard(); + restore_previous_profile(self.ctx.previous_profile.clone()).await?; + handle::Handle::notice_message("config_validate::timeout", timeout_msg); + Ok(SwitchState::Complete(false)) + } + } + } +} + +struct SwitchContext { + manager: &'static SwitchManager, + request: Option, + profiles_patch: Option, + sequence: Option, + target_profile: Option, + previous_profile: Option, + new_profile_for_event: Option, + switch_scope: Option>, + core_guard: Option>, +} + +impl SwitchContext { + fn new( + manager: &'static SwitchManager, + request: Option, + profiles: IProfiles, + ) -> Self { + Self { + manager, + request, + profiles_patch: Some(profiles), + sequence: None, + target_profile: None, + previous_profile: None, + new_profile_for_event: None, + switch_scope: None, + core_guard: None, + } + } + + fn ensure_target_profile(&mut self) { + if let Some(patch) = self.profiles_patch.as_mut() { + if patch.current.is_none() + && let Some(request) = self.request.as_ref() + { + patch.current = Some(request.profile_id().clone()); + } + self.target_profile = patch.current.clone(); + } + } + + fn take_profiles_patch(&mut self) -> Option { + self.profiles_patch.take() + } + + fn cancelled(&self) -> bool { + self.request + .as_ref() + .map(|req| req.cancel_token().is_cancelled()) + .unwrap_or(false) + } + + fn log_cancelled(&self, stage: &str) { + if let Some(request) = self.request.as_ref() { + logging!( + info, + Type::Cmd, + "Switch task {} cancelled {}; profile={}", + request.task_id(), + stage, + request.profile_id() + ); + } else { + logging!(info, Type::Cmd, "Profile switch cancelled {}", stage); + } + } + + fn should_validate_target(&self) -> bool { + match (&self.target_profile, &self.previous_profile) { + (Some(target), Some(current)) => current != target, + (Some(_), None) => true, + _ => false, + } + } + + fn stale(&self) -> bool { + self.sequence + .map(|seq| seq < self.manager.latest_request_sequence()) + .unwrap_or(false) + } + + fn sequence(&self) -> u64 { + match self.sequence { + Some(sequence) => sequence, + None => { + logging!( + warn, + Type::Cmd, + "Sequence unexpectedly missing in switch context; defaulting to 0" + ); + 0 + } + } + } +} + +enum SwitchState { + Start, + AcquireCore, + Prepare, + ValidateTarget, + PatchDraft, + UpdateCore, + Finalize(CoreUpdateOutcome), + Complete(bool), +} + +enum CoreUpdateOutcome { + Success, + ValidationFailed { message: String }, + CoreError { message: String }, + Timeout, +} + +enum StaleStage { + AfterLock, + BeforeCoreOperation, + BeforeCoreInteraction, + AfterCoreOperation, +} + +impl StaleStage { + fn log(&self, ctx: &SwitchContext) { + let sequence = ctx.sequence(); + let latest = ctx.manager.latest_request_sequence(); + match self { + StaleStage::AfterLock => logging!( + info, + Type::Cmd, + "Detected a newer request after acquiring the lock (sequence: {} < {}), abandoning current request", + sequence, + latest + ), + StaleStage::BeforeCoreOperation => logging!( + info, + Type::Cmd, + "Detected a newer request before core operation (sequence: {} < {}), abandoning current request", + sequence, + latest + ), + StaleStage::BeforeCoreInteraction => logging!( + info, + Type::Cmd, + "Detected a newer request before core interaction (sequence: {} < {}), abandoning current request", + sequence, + latest + ), + StaleStage::AfterCoreOperation => logging!( + info, + Type::Cmd, + "Detected a newer request after core operation (sequence: {} < {}), ignoring current result", + sequence, + latest + ), + } + } +} + +impl Drop for SwitchContext { + fn drop(&mut self) { + self.core_guard.take(); + self.switch_scope.take(); + } +}