feat(profile-switch): track cleanup and coordinate pipeline

- Add explicit cleanup tracking in the driver (`cleanup_profiles` map + `CleanupDone` messages) to know when background post-switch work is still running before starting a new workflow. (driver.rs:29-50)
- Update `handle_enqueue` to detect “cleanup in progress”: same-profile retries are short-circuited; other requests collapse the pending queue, cancelling old tokens so only the latest intent survives. (driver.rs:176-247)
- Rework scheduling helpers: `start_next_job` refuses to start while cleanup is outstanding; discarded requests release cancellation tokens; cleanup completion explicitly restarts the pipeline. (driver.rs:258-442)
This commit is contained in:
Slinetrac
2025-10-27 13:41:23 +08:00
Unverified
parent f88e848452
commit f1bc704e16
2 changed files with 132 additions and 43 deletions

View File

@@ -31,6 +31,7 @@ struct SwitchDriverState {
active: Option<SwitchRequest>,
queue: VecDeque<SwitchRequest>,
latest_tokens: HashMap<SmartString, SwitchCancellation>,
cleanup_profiles: HashMap<SmartString, tokio::task::JoinHandle<()>>,
}
#[derive(Debug)]
@@ -43,6 +44,9 @@ enum SwitchDriverMessage {
request: SwitchRequest,
outcome: SwitchJobOutcome,
},
CleanupDone {
profile: SmartString,
},
}
#[derive(Debug)]
@@ -159,6 +163,9 @@ fn switch_driver_sender() -> &'static mpsc::Sender<SwitchDriverMessage> {
SwitchDriverMessage::Completion { request, outcome } => {
handle_completion(&mut state, request, outcome, driver_tx.clone(), manager);
}
SwitchDriverMessage::CleanupDone { profile } => {
handle_cleanup_done(&mut state, profile, driver_tx.clone(), manager);
}
}
}
});
@@ -176,6 +183,33 @@ fn handle_enqueue(
let mut responder = Some(respond_to);
let accepted = true;
let profile_key = request.profile_id().clone();
let cleanup_pending = state.active.is_none() && !state.cleanup_profiles.is_empty();
if cleanup_pending && state.cleanup_profiles.contains_key(&profile_key) {
logging!(
debug,
Type::Cmd,
"Switch task {} -> {} ignored because cleanup is still running",
request.task_id(),
profile_key
);
if let Some(sender) = responder.take() {
let _ = sender.send(accepted);
}
return;
}
if cleanup_pending {
logging!(
debug,
Type::Cmd,
"Cleanup running for {} profile(s); collapsing pending requests before enqueuing task {} -> {}",
state.cleanup_profiles.len(),
request.task_id(),
profile_key
);
drop_pending_requests(state);
}
if let Some(previous) = state
.latest_tokens
@@ -203,18 +237,12 @@ fn handle_enqueue(
.queue
.retain(|queued| queued.profile_id() != &profile_key);
if state.active.is_none() {
state.active = Some(request.clone());
if let Some(sender) = responder.take() {
let _ = sender.send(accepted);
}
start_switch_job(driver_tx, manager, request);
} else {
state.queue.push_back(request.clone());
if let Some(sender) = responder.take() {
let _ = sender.send(accepted);
}
state.queue.push_back(request.clone());
if let Some(sender) = responder.take() {
let _ = sender.send(accepted);
}
start_next_job(state, driver_tx, manager);
}
fn handle_completion(
@@ -258,11 +286,32 @@ fn handle_completion(
state.latest_tokens.remove(request.profile_id());
}
if state.active.is_none()
&& let Some(next) = state.queue.pop_front()
{
state.active = Some(next.clone());
start_switch_job(driver_tx, manager, next);
// Schedule cleanup tracking removal once background task finishes.
track_cleanup(state, driver_tx.clone(), request.profile_id().clone());
start_next_job(state, driver_tx, manager);
}
fn drop_pending_requests(state: &mut SwitchDriverState) {
    // Cancel-and-forget every queued request (FIFO order), leaving the queue
    // empty. Used when only the newest incoming intent should survive.
    let pending = std::mem::take(&mut state.queue);
    for request in pending {
        discard_request(state, request);
    }
}
fn discard_request(state: &mut SwitchDriverState, request: SwitchRequest) {
    // Drop a request that will never run: remove its `latest_tokens` entry
    // only when the stored token is this request's own (so a newer request's
    // token for the same profile is left untouched), then cancel the token so
    // anything awaiting it unblocks.
    let key = request.profile_id().clone();
    // `Option::is_some_and` replaces the manual `.map(..).unwrap_or(false)`
    // (clippy::option_map_unwrap_or); behavior is identical.
    let is_latest = state
        .latest_tokens
        .get(&key)
        .is_some_and(|latest| latest.same_token(request.cancel_token()));
    if is_latest {
        state.latest_tokens.remove(&key);
    }
    // Guarding on is_cancelled keeps this idempotent even if the token type
    // signals or logs on every cancel() call.
    if !request.cancel_token().is_cancelled() {
        request.cancel_token().cancel();
    }
}
@@ -337,3 +386,57 @@ fn start_switch_job(
}
});
}
fn track_cleanup(
state: &mut SwitchDriverState,
driver_tx: mpsc::Sender<SwitchDriverMessage>,
profile: SmartString,
) {
if state.cleanup_profiles.contains_key(&profile) {
return;
}
let profile_clone = profile.clone();
let handle = tokio::spawn(async move {
time::sleep(Duration::from_millis(10)).await;
let _ = driver_tx
.send(SwitchDriverMessage::CleanupDone {
profile: profile_clone,
})
.await;
});
state.cleanup_profiles.insert(profile, handle);
}
fn handle_cleanup_done(
    state: &mut SwitchDriverState,
    profile: SmartString,
    driver_tx: mpsc::Sender<SwitchDriverMessage>,
    manager: &'static SwitchManager,
) {
    // The cleanup task for `profile` reported completion: drop its tracking
    // entry (abort() on an already-finished tokio task is a no-op, so this
    // only guards against a stale handle), then try to resume the pipeline.
    if let Some(task) = state.cleanup_profiles.remove(&profile) {
        task.abort();
    }
    start_next_job(state, driver_tx, manager);
}
fn start_next_job(
    state: &mut SwitchDriverState,
    driver_tx: mpsc::Sender<SwitchDriverMessage>,
    manager: &'static SwitchManager,
) {
    // Scheduling is blocked while a job is active or any cleanup window is
    // still open; cleanup completion re-invokes this function.
    if state.active.is_some() || !state.cleanup_profiles.is_empty() {
        return;
    }
    // Skip over requests whose token was cancelled while queued and launch
    // the first still-live one, if any.
    loop {
        let Some(candidate) = state.queue.pop_front() else {
            return;
        };
        if candidate.cancel_token().is_cancelled() {
            discard_request(state, candidate);
        } else {
            state.active = Some(candidate.clone());
            start_switch_job(driver_tx, manager, candidate);
            return;
        }
    }
}

View File

@@ -33,9 +33,8 @@ pub(super) async fn run_switch_job(
"Switch task {} cancelled before validation",
request.task_id()
);
handle::Handle::notify_profile_switch_finished(
schedule_post_switch_failure(
request.profile_id().clone(),
false,
request.notify(),
request.task_id(),
);
@@ -56,7 +55,7 @@ pub(super) async fn run_switch_job(
err
);
handle::Handle::notice_message("config_validate::error", err.clone());
handle::Handle::notify_profile_switch_finished(profile_id.clone(), false, notify, task_id);
schedule_post_switch_failure(profile_id.clone(), notify, task_id);
return Ok(false);
}
@@ -102,12 +101,7 @@ pub(super) async fn run_switch_job(
"config_validate::error",
format!("profile switch timed out: {}", profile_id),
);
handle::Handle::notify_profile_switch_finished(
profile_id.clone(),
false,
notify,
task_id,
);
schedule_post_switch_failure(profile_id.clone(), notify, task_id);
Ok(false)
}
Ok(Err(panic_payload)) => {
@@ -124,12 +118,7 @@ pub(super) async fn run_switch_job(
"config_validate::panic",
format!("profile switch panic: {}", profile_id),
);
handle::Handle::notify_profile_switch_finished(
profile_id.clone(),
false,
notify,
task_id,
);
schedule_post_switch_failure(profile_id.clone(), notify, task_id);
Err(SwitchPanicInfo::workflow_root(panic_message))
}
Ok(Ok(machine_result)) => match machine_result {
@@ -147,12 +136,7 @@ pub(super) async fn run_switch_job(
err
);
handle::Handle::notice_message("config_validate::error", err.clone());
handle::Handle::notify_profile_switch_finished(
profile_id.clone(),
false,
notify,
task_id,
);
schedule_post_switch_failure(profile_id.clone(), notify, task_id);
Ok(false)
}
},
@@ -170,12 +154,7 @@ pub(super) async fn run_switch_job(
"config_validate::panic",
format!("profile switch panic: {}", profile_id),
);
handle::Handle::notify_profile_switch_finished(
profile_id.clone(),
false,
notify,
task_id,
);
schedule_post_switch_failure(profile_id.clone(), notify, task_id);
Err(panic_info)
}
},
@@ -411,6 +390,13 @@ fn schedule_post_switch_success(
});
}
fn schedule_post_switch_failure(profile_id: SmartString, notify: bool, task_id: u64) {
    // Run failure-side bookkeeping off the caller's path: emit the "switch
    // finished (failed)" notification, then schedule connection teardown for
    // the profile.
    AsyncHandler::spawn(move || async move {
        let for_notify = profile_id.clone();
        handle::Handle::notify_profile_switch_finished(for_notify, false, notify, task_id);
        schedule_close_connections(profile_id);
    });
}
pub(super) fn describe_panic_payload(payload: &(dyn Any + Send)) -> String {
if let Some(message) = payload.downcast_ref::<&str>() {
(*message).to_string()