From f1bc704e16eb01bb437aeb503318efd8dbfcae70 Mon Sep 17 00:00:00 2001 From: Slinetrac Date: Mon, 27 Oct 2025 13:41:23 +0800 Subject: [PATCH] feat(profile-switch): track cleanup and coordinate pipeline MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add explicit cleanup tracking in the driver (`cleanup_profiles` map + `CleanupDone` messages) to know when background post-switch work is still running before starting a new workflow. (driver.rs:29-50) - Update `handle_enqueue` to detect “cleanup in progress”: same-profile retries are short-circuited; other requests collapse the pending queue, cancelling old tokens so only the latest intent survives. (driver.rs:176-247) - Rework scheduling helpers: `start_next_job` refuses to start while cleanup is outstanding; discarded requests release cancellation tokens; cleanup completion explicitly restarts the pipeline. (driver.rs:258-442) --- src-tauri/src/cmd/profile_switch/driver.rs | 135 ++++++++++++++++--- src-tauri/src/cmd/profile_switch/workflow.rs | 40 ++---- 2 files changed, 132 insertions(+), 43 deletions(-) diff --git a/src-tauri/src/cmd/profile_switch/driver.rs b/src-tauri/src/cmd/profile_switch/driver.rs index d25c6f51..9cb3f9fe 100644 --- a/src-tauri/src/cmd/profile_switch/driver.rs +++ b/src-tauri/src/cmd/profile_switch/driver.rs @@ -31,6 +31,7 @@ struct SwitchDriverState { active: Option, queue: VecDeque, latest_tokens: HashMap, + cleanup_profiles: HashMap>, } #[derive(Debug)] @@ -43,6 +44,9 @@ enum SwitchDriverMessage { request: SwitchRequest, outcome: SwitchJobOutcome, }, + CleanupDone { + profile: SmartString, + }, } #[derive(Debug)] @@ -159,6 +163,9 @@ fn switch_driver_sender() -> &'static mpsc::Sender { SwitchDriverMessage::Completion { request, outcome } => { handle_completion(&mut state, request, outcome, driver_tx.clone(), manager); } + SwitchDriverMessage::CleanupDone { profile } => { + handle_cleanup_done(&mut state, profile, driver_tx.clone(), manager); + } } } }); @@ -176,6 +183,33 @@ fn handle_enqueue( let mut responder = Some(respond_to); let accepted = true; let profile_key = request.profile_id().clone(); + let cleanup_pending = state.active.is_none() && !state.cleanup_profiles.is_empty(); + + if cleanup_pending && state.cleanup_profiles.contains_key(&profile_key) { + logging!( + debug, + Type::Cmd, + "Switch task {} -> {} ignored because cleanup is still running", + request.task_id(), + profile_key + ); + if let Some(sender) = responder.take() { + let _ = sender.send(accepted); + } + return; + } + + if cleanup_pending { + logging!( + debug, + Type::Cmd, + "Cleanup running for {} profile(s); collapsing pending requests before enqueuing task {} -> {}", + state.cleanup_profiles.len(), + request.task_id(), + profile_key + ); + drop_pending_requests(state); + } if let Some(previous) = state .latest_tokens @@ -203,18 +237,12 @@ fn handle_enqueue( .queue .retain(|queued| queued.profile_id() != &profile_key); - if state.active.is_none() { - state.active = Some(request.clone()); - if let Some(sender) = responder.take() { - let _ = sender.send(accepted); - } - start_switch_job(driver_tx, manager, request); - } else { - state.queue.push_back(request.clone()); - if let Some(sender) = responder.take() { - let _ = sender.send(accepted); - } + state.queue.push_back(request.clone()); + if let Some(sender) = responder.take() { + let _ = sender.send(accepted); } + + start_next_job(state, driver_tx, manager); } fn handle_completion( @@ -258,11 +286,32 @@ fn handle_completion( state.latest_tokens.remove(request.profile_id()); } - if state.active.is_none() - && let Some(next) = state.queue.pop_front() - { - state.active = Some(next.clone()); - start_switch_job(driver_tx, manager, next); + // Schedule cleanup tracking removal once background task finishes. + track_cleanup(state, driver_tx.clone(), request.profile_id().clone()); + + start_next_job(state, driver_tx, manager); +} + +fn drop_pending_requests(state: &mut SwitchDriverState) { + while let Some(request) = state.queue.pop_front() { + discard_request(state, request); + } +} + +fn discard_request(state: &mut SwitchDriverState, request: SwitchRequest) { + let key = request.profile_id().clone(); + let should_remove = state + .latest_tokens + .get(&key) + .map(|latest| latest.same_token(request.cancel_token())) + .unwrap_or(false); + + if should_remove { + state.latest_tokens.remove(&key); + } + + if !request.cancel_token().is_cancelled() { + request.cancel_token().cancel(); } } @@ -337,3 +386,57 @@ fn start_switch_job( } }); } + +fn track_cleanup( + state: &mut SwitchDriverState, + driver_tx: mpsc::Sender, + profile: SmartString, +) { + if state.cleanup_profiles.contains_key(&profile) { + return; + } + + let profile_clone = profile.clone(); + let handle = tokio::spawn(async move { + time::sleep(Duration::from_millis(10)).await; + let _ = driver_tx + .send(SwitchDriverMessage::CleanupDone { + profile: profile_clone, + }) + .await; + }); + state.cleanup_profiles.insert(profile, handle); +} + +fn handle_cleanup_done( + state: &mut SwitchDriverState, + profile: SmartString, + driver_tx: mpsc::Sender, + manager: &'static SwitchManager, +) { + if let Some(handle) = state.cleanup_profiles.remove(&profile) { + handle.abort(); + } + start_next_job(state, driver_tx, manager); +} + +fn start_next_job( + state: &mut SwitchDriverState, + driver_tx: mpsc::Sender, + manager: &'static SwitchManager, +) { + if state.active.is_some() || !state.cleanup_profiles.is_empty() { + return; + } + + while let Some(request) = state.queue.pop_front() { + if request.cancel_token().is_cancelled() { + discard_request(state, request); + continue; + } + + state.active = Some(request.clone()); + start_switch_job(driver_tx, manager, request); + break; + } +} diff --git a/src-tauri/src/cmd/profile_switch/workflow.rs b/src-tauri/src/cmd/profile_switch/workflow.rs index 8726edf7..75b3bfdb 100644 --- a/src-tauri/src/cmd/profile_switch/workflow.rs +++ b/src-tauri/src/cmd/profile_switch/workflow.rs @@ -33,9 +33,8 @@ pub(super) async fn run_switch_job( "Switch task {} cancelled before validation", request.task_id() ); - handle::Handle::notify_profile_switch_finished( + schedule_post_switch_failure( request.profile_id().clone(), - false, request.notify(), request.task_id(), ); @@ -56,7 +55,7 @@ pub(super) async fn run_switch_job( err ); handle::Handle::notice_message("config_validate::error", err.clone()); - handle::Handle::notify_profile_switch_finished(profile_id.clone(), false, notify, task_id); + schedule_post_switch_failure(profile_id.clone(), notify, task_id); return Ok(false); } @@ -102,12 +101,7 @@ pub(super) async fn run_switch_job( "config_validate::error", format!("profile switch timed out: {}", profile_id), ); - handle::Handle::notify_profile_switch_finished( - profile_id.clone(), - false, - notify, - task_id, - ); + schedule_post_switch_failure(profile_id.clone(), notify, task_id); Ok(false) } Ok(Err(panic_payload)) => { @@ -124,12 +118,7 @@ pub(super) async fn run_switch_job( "config_validate::panic", format!("profile switch panic: {}", profile_id), ); - handle::Handle::notify_profile_switch_finished( - profile_id.clone(), - false, - notify, - task_id, - ); + schedule_post_switch_failure(profile_id.clone(), notify, task_id); Err(SwitchPanicInfo::workflow_root(panic_message)) } Ok(Ok(machine_result)) => match machine_result { @@ -147,12 +136,7 @@ pub(super) async fn run_switch_job( err ); handle::Handle::notice_message("config_validate::error", err.clone()); - handle::Handle::notify_profile_switch_finished( - profile_id.clone(), - false, - notify, - task_id, - ); + schedule_post_switch_failure(profile_id.clone(), notify, task_id); Ok(false) } }, @@ -170,12 +154,7 @@ pub(super) async fn run_switch_job( "config_validate::panic", format!("profile switch panic: {}", profile_id), ); - handle::Handle::notify_profile_switch_finished( - profile_id.clone(), - false, - notify, - task_id, - ); + schedule_post_switch_failure(profile_id.clone(), notify, task_id); Err(panic_info) } }, @@ -411,6 +390,13 @@ fn schedule_post_switch_success( }); } +fn schedule_post_switch_failure(profile_id: SmartString, notify: bool, task_id: u64) { + AsyncHandler::spawn(move || async move { + handle::Handle::notify_profile_switch_finished(profile_id.clone(), false, notify, task_id); + schedule_close_connections(profile_id); + }); +} + pub(super) fn describe_panic_payload(payload: &(dyn Any + Send)) -> String { if let Some(message) = payload.downcast_ref::<&str>() { (*message).to_string()