Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove old actions with no listeners #778

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

zbirenbaum
Copy link
Contributor

@zbirenbaum zbirenbaum commented Mar 19, 2024

Description

Implement scheduler side removal of actions with no listeners. Adds disconnect_timeout_s configuration field with default of 60s. If the client waiting on a given action is disconnected for longer than this duration without reconnecting the scheduler will stop tracking it. This does not remove it from the worker if the job has already been dispatched.

Fixes #338

Type of change

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)

How Has This Been Tested?

A test has been added to ensure that requested actions are removed if a client is found to be disconnected for disconnect_timeout_s

Checklist

  • Tests added/amended
  • bazel test //... passes locally
  • PR is contained in a single commit, using git amend see some docs

This change is Reviewable

Copy link
Contributor Author

@zbirenbaum zbirenbaum left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: 0 of 1 LGTMs obtained, and pending CI: Vercel, pre-commit-checks


nativelink-scheduler/src/simple_scheduler.rs line 77 at r1 (raw file):

    /// Updated on every client connect and periodically while it has listeners.
    last_update_timestamp: Mutex<u64>

Mutex added due to borrow checker difficulties with modifying this field from active_actions.


nativelink-scheduler/src/simple_scheduler.rs line 80 at r1 (raw file):

}

impl AwaitedAction {

These helpers are added to avoid in-lining long variable references and mutex locking but might not be necessary

Copy link
Contributor

@adam-singer adam-singer left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we be using struct.Instant over struct.SystemTime in this case?

Reviewed 2 of 3 files at r1, 1 of 1 files at r2, all commit messages.
Reviewable status: 0 of 1 LGTMs obtained, and pending CI: Bazel Dev / ubuntu-22.04, asan / ubuntu-22.04, docker-compose-compiles-nativelink (20.04), docker-compose-compiles-nativelink (22.04), macos-13, ubuntu-22.04, zig-cc ubuntu-20.04, zig-cc ubuntu-22.04


nativelink-scheduler/src/simple_scheduler.rs line 475 at r2 (raw file):

            // add update to queued action update timestamp here
            let action = self.queued_actions.get_mut(&action_info).unwrap();
            let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();

Would it make a difference if we captured system time now before the loop?


nativelink-scheduler/src/simple_scheduler.rs line 548 at r2 (raw file):

        for running_action in running_actions {
            if running_action.action.notify_channel.receiver_count() > 0 {
                running_action.set_last_update_timestamp(SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs());

grab now before the else/if and having it recalled

let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();

Copy link
Collaborator

@allada allada left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: 0 of 1 LGTMs obtained, and pending CI: Bazel Dev / ubuntu-22.04, asan / ubuntu-22.04, docker-compose-compiles-nativelink (20.04), docker-compose-compiles-nativelink (22.04), macos-13, ubuntu-22.04, zig-cc ubuntu-20.04, zig-cc ubuntu-22.04


nativelink-scheduler/src/simple_scheduler.rs line 471 at r2 (raw file):

        // unstable feature [see: https://github.com/rust-lang/rust/issues/70530]).
        let action_infos: Vec<Arc<ActionInfo>> =
            self.queued_actions.keys().rev().cloned().collect();

Make this let queued_actions = self.queued_actions.iter().rev().cloned().collect(). This will allow you to then use:

for (action_info, action) in queued_actions {

And you won't need to deal with unwrap and such.


nativelink-scheduler/src/simple_scheduler.rs line 477 at r2 (raw file):

            let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
            if action.notify_channel.receiver_count() > 0 {
                action.set_last_update_timestamp(now)

nit: semi-colon.


nativelink-scheduler/src/simple_scheduler.rs line 477 at r2 (raw file):

            let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
            if action.notify_channel.receiver_count() > 0 {
                action.set_last_update_timestamp(now)

I really feel that we should be able to detect when clients disconnect and set it then.


nativelink-scheduler/src/simple_scheduler.rs line 480 at r2 (raw file):

            } else if action.get_last_update_timestamp() + self.disconnect_timeout_s < now {
                self.queued_actions_set.remove(&action_info);
                self.queued_actions.remove(&action_info);

nit: Add a warn message here saying it is being removed.


nativelink-scheduler/src/simple_scheduler.rs line 545 at r2 (raw file):

        let mut remove_actions = Vec::new();
        let running_actions = &mut self.active_actions.values().collect::<Vec<_>>();

I'm kinda confused on how this ever worked because I believe this is grabbing a reference to a temporary.

In any case you should be beable to just use:

for running_action in self.active_Actions.values() {

since it is not modifying it in the loop.


nativelink-scheduler/tests/simple_scheduler_test.rs line 1634 at r2 (raw file):

        .await?;

        // Drop our receiver

nit: period.


nativelink-scheduler/tests/simple_scheduler_test.rs line 1641 at r2 (raw file):

        tokio::task::yield_now().await;

        // Sleep for longer than disconnect_timeout_s

nit: period.


nativelink-scheduler/tests/simple_scheduler_test.rs line 1642 at r2 (raw file):

        // Sleep for longer than disconnect_timeout_s
        let _ = sleep(Duration::from_secs(DISCONNECT_TIMEOUT_S + 1)).await;

We should never call native sleep() in tests. It's generally considered bad practice. In this case you probably want to hijack the time function inside the scheduler.


nativelink-scheduler/tests/simple_scheduler_test.rs line 1652 at r2 (raw file):

        }

        // Setup a second action so matching engine is scheduled to rerun

nit: period.


nativelink-scheduler/tests/simple_scheduler_test.rs line 1665 at r2 (raw file):

        tokio::task::yield_now().await;

        // Check to make sure that the action was removed

nit: period.


nativelink-scheduler/tests/simple_scheduler_test.rs line 1669 at r2 (raw file):

            .find_existing_action(&unique_qualifier)
            .await
            .is_none(),);

nit: missing message.

Copy link
Collaborator

@allada allada left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: 0 of 1 LGTMs obtained, and pending CI: Bazel Dev / ubuntu-22.04, asan / ubuntu-22.04, docker-compose-compiles-nativelink (20.04), docker-compose-compiles-nativelink (22.04), macos-13, ubuntu-22.04, zig-cc ubuntu-20.04, zig-cc ubuntu-22.04


nativelink-scheduler/src/simple_scheduler.rs line 77 at r2 (raw file):

    /// Updated on every client connect and periodically while it has listeners.
    last_update_timestamp: Mutex<u64>

I'll help you with this when this PR is closer to being done.

However, Mutex<(any primitive)> is silly, since Atomic{Primitive} is always better (I can't think of any reason it wouldn't be faster/better, since it's non-blocking and sometimes has hardware support).

@zbirenbaum zbirenbaum force-pushed the action-listeners branch 6 times, most recently from 307cbb4 to 2e287e5 Compare April 4, 2024 23:40
Copy link
Contributor Author

@zbirenbaum zbirenbaum left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: 0 of 1 LGTMs obtained, and pending CI: Analyze (javascript-typescript), Publish image, Vercel, docker-compose-compiles-nativelink (20.04), pre-commit-checks, ubuntu-20.04 / stable, ubuntu-22.04, vale, and 7 discussions need to be resolved


nativelink-scheduler/src/simple_scheduler.rs line 77 at r2 (raw file):

Previously, allada (Nathan (Blaise) Bruer) wrote…

I'll help you with this when this PR is closer to being done.

However, Mutex<(any primitive)> is silly, since Atomic{Primitive} is always better (I can't think of any reason it wouldn't be faster/better, since it's non-blocking and sometimes has hardware support).

I changed it to be AtomicU64 and it works great


nativelink-scheduler/src/simple_scheduler.rs line 471 at r2 (raw file):

Previously, allada (Nathan (Blaise) Bruer) wrote…

Make this let queued_actions = self.queued_actions.iter().rev().cloned().collect(). This will allow you to then use:

for (action_info, action) in queued_actions {

And you won't need to deal with unwrap and such.

let queued_actions = self.queued_actions.iter().rev().cloned().collect() gives a type error. I tried a bunch of different things to get it to work but couldn't figure it out.


nativelink-scheduler/src/simple_scheduler.rs line 475 at r2 (raw file):

Previously, adam-singer (Adam Singer) wrote…

Would it make a difference if we captured system time now before the loop?

Do_try_match is the slowest part of the scheduler. If we capture it before the loop and it takes a substantial amount of time to complete, it might fail to time out some actions until the next pass.

Whether or not that it acceptable is something I'm not sure about


nativelink-scheduler/src/simple_scheduler.rs line 477 at r2 (raw file):

Previously, allada (Nathan (Blaise) Bruer) wrote…

nit: semi-colon.

Done.


nativelink-scheduler/src/simple_scheduler.rs line 477 at r2 (raw file):

Previously, allada (Nathan (Blaise) Bruer) wrote…

I really feel that we should be able to detect when clients disconnect and set it then.

I think we discussed this previously and decided that putting this in do_try_match was preferable to creating a new spawn which iterates to check and sleeps. Is there another method of checking when the client drops?


nativelink-scheduler/src/simple_scheduler.rs line 480 at r2 (raw file):

Previously, allada (Nathan (Blaise) Bruer) wrote…

nit: Add a warn message here saying it is being removed.

Done.


nativelink-scheduler/src/simple_scheduler.rs line 545 at r2 (raw file):

Previously, allada (Nathan (Blaise) Bruer) wrote…

I'm kinda confused on how this ever worked because I believe this is grabbing a reference to a temporary.

In any case you should be beable to just use:

for running_action in self.active_Actions.values() {

since it is not modifying it in the loop.

Done.


nativelink-scheduler/src/simple_scheduler.rs line 548 at r2 (raw file):

Previously, adam-singer (Adam Singer) wrote…

grab now before the else/if and having it recalled

let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();

Done.


nativelink-scheduler/tests/simple_scheduler_test.rs line 1634 at r2 (raw file):

Previously, allada (Nathan (Blaise) Bruer) wrote…

nit: period.

Done.


nativelink-scheduler/tests/simple_scheduler_test.rs line 1641 at r2 (raw file):

Previously, allada (Nathan (Blaise) Bruer) wrote…

nit: period.

Done.


nativelink-scheduler/tests/simple_scheduler_test.rs line 1642 at r2 (raw file):

Previously, allada (Nathan (Blaise) Bruer) wrote…

We should never call native sleep() in tests. It's generally considered bad practice. In this case you probably want to hijack the time function inside the scheduler.

I changed it so that it calls a function which sets the last_update_timestamp instead of sleeping.


nativelink-scheduler/tests/simple_scheduler_test.rs line 1652 at r2 (raw file):

Previously, allada (Nathan (Blaise) Bruer) wrote…

nit: period.

Done.


nativelink-scheduler/tests/simple_scheduler_test.rs line 1665 at r2 (raw file):

Previously, allada (Nathan (Blaise) Bruer) wrote…

nit: period.

Done.


nativelink-scheduler/tests/simple_scheduler_test.rs line 1669 at r2 (raw file):

Previously, allada (Nathan (Blaise) Bruer) wrote…

nit: missing message.

Done.

Copy link
Contributor

@adam-singer adam-singer left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed 2 of 2 files at r4, all commit messages.
Reviewable status: 0 of 1 LGTMs obtained, and pending CI: Bazel Dev / ubuntu-22.04, Cargo Dev / macos-13, Remote / large-ubuntu-22.04, docker-compose-compiles-nativelink (20.04), docker-compose-compiles-nativelink (22.04), macos-13, windows-2022 / stable, zig-cc ubuntu-20.04, zig-cc ubuntu-22.04, and 7 discussions need to be resolved

Copy link
Collaborator

@allada allada left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: 0 of 1 LGTMs obtained, and 9 discussions need to be resolved


nativelink-scheduler/src/simple_scheduler.rs line 77 at r2 (raw file):

Previously, zbirenbaum (Zach Birenbaum) wrote…

I changed it to be AtomicU64 and it works great

Why Arc?


nativelink-scheduler/src/simple_scheduler.rs line 471 at r2 (raw file):

Previously, zbirenbaum (Zach Birenbaum) wrote…

let queued_actions = self.queued_actions.iter().rev().cloned().collect() gives a type error. I tried a bunch of different things to get it to work but couldn't figure it out.

Did you instruct .collect() what the output container type was? Remember .collect() does not know what the type of the output will be, it just implements a trait that many containers implement. In this case you can use something like:

let queued_actions: Vec<(ActionInfo, Action)> = self.queued_actions.iter().rev().cloned().collect()

nativelink-scheduler/src/simple_scheduler.rs line 475 at r2 (raw file):

Previously, zbirenbaum (Zach Birenbaum) wrote…

Do_try_match is the slowest part of the scheduler. If we capture it before the loop and it takes a substantial amount of time to complete, it might fail to time out some actions until the next pass.

Whether or not that it acceptable is something I'm not sure about

I would not over optimize that. if .do_try_match is taking more than 1 second we are in serious trouble.


nativelink-scheduler/src/simple_scheduler.rs line 477 at r2 (raw file):

Previously, zbirenbaum (Zach Birenbaum) wrote…

I think we discussed this previously and decided that putting this in do_try_match was preferable to creating a new spawn which iterates to check and sleeps. Is there another method of checking when the client drops?

Yeah, the cleanup happens here, but flagging when clients disconnect can/should happen when the event happens.


nativelink-scheduler/src/simple_scheduler.rs line 60 at r4 (raw file):

const DEFAULT_MAX_JOB_RETRIES: usize = 3;

/// Default timeout for actions without any listeners

nit: period.


nativelink-scheduler/src/simple_scheduler.rs line 363 at r4 (raw file):

    }

    fn set_action_last_update_for_test(

nit: Inline this below, we want to limit the number of these floating around. These functions are generally allowed to be more ugly, since they are not supposed to be public facing.


nativelink-scheduler/src/simple_scheduler.rs line 473 at r4 (raw file):

            // add update to queued action update timestamp here
            let action = self.queued_actions.get_mut(&action_info).unwrap();
            let now = SystemTime::now()

Using SystemTime inlined like this will make testing really difficult. I suggest making a constructor that you can pass in the time function.


nativelink-scheduler/tests/simple_scheduler_test.rs line 1642 at r2 (raw file):

Previously, zbirenbaum (Zach Birenbaum) wrote…

I changed it so that it calls a function which sets the last_update_timestamp instead of sleeping.

But we are back to the original problem since we are relying on wall time because we are now using SystemTime.

Copy link
Contributor Author

@zbirenbaum zbirenbaum left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: 0 of 1 LGTMs obtained, and 9 discussions need to be resolved


nativelink-scheduler/src/simple_scheduler.rs line 77 at r2 (raw file):

Previously, allada (Nathan (Blaise) Bruer) wrote…

Why Arc?

I wasn't sure where we landed on when to update this. I can take it out for now but it will need to be an Arc if we are updating this from another thread that detects when the user disconnects.


nativelink-scheduler/src/simple_scheduler.rs line 471 at r2 (raw file):

Previously, allada (Nathan (Blaise) Bruer) wrote…

Did you instruct .collect() what the output container type was? Remember .collect() does not know what the type of the output will be, it just implements a trait that many containers implement. In this case you can use something like:

let queued_actions: Vec<(ActionInfo, Action)> = self.queued_actions.iter().rev().cloned().collect()

I did, I tried all the following and more but none of them worked:
let queued_actions: Vec<(ActionInfo, AwaitedAction)> = self.queued_actions.iter().rev().cloned().collect();
let queued_actions: Vec<(&ActionInfo, AwaitedAction)> = self.queued_actions.iter().rev().cloned().collect();
let queued_actions: Vec<(ActionInfo, &AwaitedAction)> = self.queued_actions.iter().rev().cloned().collect();
let queued_actions: Vec<(&ActionInfo, &AwaitedAction)> = self.queued_actions.iter().rev().cloned().collect();
let queued_actions: Vec<(Arc<ActionInfo>, AwaitedAction)> = self.queued_actions.iter().rev().cloned().collect();
let queued_actions: Vec<(Arc<ActionInfo>, &AwaitedAction)> = self.queued_actions.iter().rev().cloned().collect(); let queued_actions: Vec<(&Arc, AwaitedAction)> = self.queued_actions.iter().rev().cloned().collect(); let queued_actions: Vec<(&Arc, &AwaitedAction)> = self.queued_actions.iter().rev().cloned().collect();`

Here's an example of the full error:
│  the method collect exists for struct Cloned<Rev<Iter<'_, Arc<ActionInfo>, AwaitedAction>>>, but its trait bounds were not satisfied rustc (E0599) [456, 106]
│ the full type name has been written to '/Users/zach/Development/nativelink/target/debug/deps/nativelink_scheduler-1259087a177e1a06.long-type-1362048726443463511.txt'
│ the following trait bounds were not satisfied:
<std::iter::Rev<std::collections::btree_map::Iter<'_, std::sync::Arc<nativelink_util::action_messages::ActionInfo>, simple_scheduler::AwaitedAction>> as std::iter::Iterator>::Item = &_
│ which is required by std::iter::Cloned<std::iter::Rev<std::collections::btree_map::Iter<'_, std::sync::Arc<nativelink_util::action_messages::ActionInfo>, simple_scheduler::AwaitedAction>>>: std::iter::Iterator
std::iter::Cloned<std::iter::Rev<std::collections::btree_map::Iter<'_, std::sync::Arc<nativelink_util::action_messages::ActionInfo>, simple_scheduler::AwaitedAction>>>: std::iter::Iterator
│ which is required by &mut std::iter::Cloned<std::iter::Rev<std::collections::btree_map::Iter<'_, std::sync::Arc<nativelink_util::action_messages::ActionInfo>, simple_scheduler::AwaitedAction>>>: std::iter::Iterator
│  the method collect exists for struct Cloned<Rev<Iter<'_, Arc<ActionInfo>, AwaitedAction>>>, but its trait bounds were not satisfied rustc (E0599) [456, 106]
│ the full type name has been written to '/Users/zach/Development/nativelink/target/debug/deps/nativelink_scheduler-d0f178a39f61bd0a.long-type-2575323896278722321.txt'
│ the following trait bounds were not satisfied:
<std::iter::Rev<std::collections::btree_map::Iter<'_, std::sync::Arc<nativelink_util::action_messages::ActionInfo>, simple_scheduler::AwaitedAction>> as std::iter::Iterator>::Item = &_
│ which is required by std::iter::Cloned<std::iter::Rev<std::collections::btree_map::Iter<'_, std::sync::Arc<nativelink_util::action_messages::ActionInfo>, simple_scheduler::AwaitedAction>>>: std::iter::Iterator
std::iter::Cloned<std::iter::Rev<std::collections::btree_map::Iter<'_, std::sync::Arc<nativelink_util::action_messages::ActionInfo>, simple_scheduler::AwaitedAction>>>: std::iter::Iterator
│ which is required by &mut std::iter::Cloned<std::iter::Rev<std::collections::btree_map::Iter<'_, std::sync::Arc<nativelink_util::action_messages::ActionInfo>, simple_scheduler::AwaitedAction>>>: std::iter::Iterator
│  the method call chain might not have had the expected associated types rustc (E0271) [456, 84]


nativelink-scheduler/src/simple_scheduler.rs line 477 at r2 (raw file):

Previously, allada (Nathan (Blaise) Bruer) wrote…

Yeah, the cleanup happens here, but flagging when clients disconnect can/should happen when the event happens.

How/where would we detect that the client disconnect has occurred? I was under the impression the only way to do this was to have a spawn which loops over the actions to check their receiver count.

I thought with disconnects it isn't possible to know until you request or try to send something to the other side


nativelink-scheduler/tests/simple_scheduler_test.rs line 1642 at r2 (raw file):

Previously, allada (Nathan (Blaise) Bruer) wrote…

But we are back to the original problem since we are relying on wall time because we are now using SystemTime.

I'm a bit confused on a couple things here.

Where is the time function set/used in the scheduler? I see SystemTime used other places inside of it but not a now_fn like RunningActionsManagerImpl has.

I don't mind changing it to use that structure, but I just want to make sure that's something we want to include in this PR.

I'm also very confused about why we are relying on wall time here, the actual time doesn't matter at all unless I'm missing something huge.

The comparison done in the code to determine whether or not to time out is:

// If this is true the action is marked as old
running_action.get_last_update_timestamp() + self.disconnect_timeout_s < now

where:
running_action.get_last_update_timestamp() returns some uint
now is the current time since unix epoch in seconds
self.disconnect_timeout_s is a value like 1

if we set running_action.get_last_update_timestamp() to be SystemTime::now() - (self.disconnect_timeout_s+1) then even if no time at all elapses between the first call to SystemTime in the test and the second one in do_try_match we end up with:

now - (disconnect_timeout_s-1) + disconnect_timeout_s < now = -1 < 0 which will always return true

If the thought is that something like daylight savings or some system setting change could mess up the system time between the two calls to this test, we could just set the last_update_timestamp to 0 which would make it

January 1st, 1970 + disconnect_timeout_s < now

Copy link
Contributor Author

@zbirenbaum zbirenbaum left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: 0 of 1 LGTMs obtained, and pending CI: Analyze (javascript-typescript), Bazel Dev / ubuntu-22.04, asan / ubuntu-22.04, docker-compose-compiles-nativelink (20.04), pre-commit-checks, ubuntu-20.04 / stable, vale, and 8 discussions need to be resolved


nativelink-scheduler/src/simple_scheduler.rs line 363 at r4 (raw file):

Previously, allada (Nathan (Blaise) Bruer) wrote…

nit: Inline this below, we want to limit the number of these floating around. These functions are generally allowed to be more ugly, since they are not supposed to be public facing.

Done.

Copy link
Collaborator

@allada allada left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: 0 of 1 LGTMs obtained, and pending CI: Bazel Dev / ubuntu-22.04, asan / ubuntu-22.04, docker-compose-compiles-nativelink (20.04), docker-compose-compiles-nativelink (22.04), macos-13, ubuntu-22.04, zig-cc ubuntu-20.04, zig-cc ubuntu-22.04, and 8 discussions need to be resolved


nativelink-scheduler/src/simple_scheduler.rs line 77 at r2 (raw file):

Previously, zbirenbaum (Zach Birenbaum) wrote…

I wasn't sure where we landed on when to update this. I can take it out for now but it will need to be an Arc if we are updating this from another thread that detects when the user disconnects.

If we do that we will change the type. In this case we are not even sure how the future will end up, so keep it as simple as possible for now (no Arc).


nativelink-scheduler/src/simple_scheduler.rs line 477 at r2 (raw file):

Previously, zbirenbaum (Zach Birenbaum) wrote…

How/where would we detect that the client disconnect has occurred? I was under the impression the only way to do this was to have a spawn which loops over the actions to check their receiver count.

I thought with disconnects it isn't possible to know until you request or try to send something to the other side

Use scope_guard to detect when the stream is dropped in execution_server::to_execute_stream() that then calls a function on the scheduler to let the scheduler know that a client disconnected. The function would then take check if it's the last one subscribed and then the time.

Btw, this will make testing significantly easier with these public functions.

@zbirenbaum zbirenbaum force-pushed the action-listeners branch 2 times, most recently from 57bd35a to 624eda2 Compare April 10, 2024 01:15
Copy link
Contributor

@adam-singer adam-singer left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed 10 of 11 files at r6, all commit messages.
Reviewable status: 0 of 1 LGTMs obtained, and 12 discussions need to be resolved


nativelink-scheduler/src/simple_scheduler.rs line 837 at r6 (raw file):

#[async_trait]
impl ActionScheduler for SimpleScheduler {
    fn notify_client_disconnected(&self, unique_qualifier: ActionInfoHashKey) {

When short returning in some of the let/else expressions, should we debug/warn log, it seems in some of the cases it might be useful to know that we are hitting what seems unexpected state or racing states?


nativelink-scheduler/src/simple_scheduler.rs line 837 at r6 (raw file):

#[async_trait]
impl ActionScheduler for SimpleScheduler {
    fn notify_client_disconnected(&self, unique_qualifier: ActionInfoHashKey) {

We should be incrementing counters on different states here, not sure if a counter on notify_client_disconnected is useful (could be to match if we count successful removals from queues), we should counter on inner.queued_actions* removals and inner.active_actions.remove.


nativelink-scheduler/src/simple_scheduler.rs line 859 at r6 (raw file):

        let weak_inner = Arc::downgrade(&self.inner);

        tokio::spawn(async move {

I think we should explain (comments) why we are spawning at this point and what the next stage of removing from queues means.


nativelink-scheduler/src/simple_scheduler.rs line 899 at r6 (raw file):

                        return;
                    };
                    // TODO: Send kill on worker signal - PR: #842.

Couldn't we just call inner.active_actions.remove(&unique_qualifier); since the match would fall through and return?

Copy link
Collaborator

@allada allada left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think Instant is the proper approach. It also solves a bit of other issues with importing SysemTime.

Reviewed all commit messages.
Reviewable status: 0 of 1 LGTMs obtained, and 12 discussions need to be resolved

Copy link
Contributor Author

@zbirenbaum zbirenbaum left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dismissed @adam-singer and @allada from 4 discussions.
Reviewable status: 0 of 1 LGTMs obtained, and 7 discussions need to be resolved


nativelink-scheduler/src/simple_scheduler.rs line 77 at r2 (raw file):

Previously, allada (Nathan (Blaise) Bruer) wrote…

If we do that we will change the type. In this case we are not even sure how the future will end up, so keep it as simple as possible for now (no Arc).

Done. The var no longer exists so dismissing this.


nativelink-scheduler/src/simple_scheduler.rs line 471 at r2 (raw file):

Previously, zbirenbaum (Zach Birenbaum) wrote…

I did, I tried all the following and more but none of them worked:
let queued_actions: Vec<(ActionInfo, AwaitedAction)> = self.queued_actions.iter().rev().cloned().collect();
let queued_actions: Vec<(&ActionInfo, AwaitedAction)> = self.queued_actions.iter().rev().cloned().collect();
let queued_actions: Vec<(ActionInfo, &AwaitedAction)> = self.queued_actions.iter().rev().cloned().collect();
let queued_actions: Vec<(&ActionInfo, &AwaitedAction)> = self.queued_actions.iter().rev().cloned().collect();
let queued_actions: Vec<(Arc<ActionInfo>, AwaitedAction)> = self.queued_actions.iter().rev().cloned().collect();
let queued_actions: Vec<(Arc<ActionInfo>, &AwaitedAction)> = self.queued_actions.iter().rev().cloned().collect(); let queued_actions: Vec<(&Arc, AwaitedAction)> = self.queued_actions.iter().rev().cloned().collect(); let queued_actions: Vec<(&Arc, &AwaitedAction)> = self.queued_actions.iter().rev().cloned().collect();`

Here's an example of the full error:
│  the method collect exists for struct Cloned<Rev<Iter<'_, Arc<ActionInfo>, AwaitedAction>>>, but its trait bounds were not satisfied rustc (E0599) [456, 106]
│ the full type name has been written to '/Users/zach/Development/nativelink/target/debug/deps/nativelink_scheduler-1259087a177e1a06.long-type-1362048726443463511.txt'
│ the following trait bounds were not satisfied:
<std::iter::Rev<std::collections::btree_map::Iter<'_, std::sync::Arc<nativelink_util::action_messages::ActionInfo>, simple_scheduler::AwaitedAction>> as std::iter::Iterator>::Item = &_
│ which is required by std::iter::Cloned<std::iter::Rev<std::collections::btree_map::Iter<'_, std::sync::Arc<nativelink_util::action_messages::ActionInfo>, simple_scheduler::AwaitedAction>>>: std::iter::Iterator
std::iter::Cloned<std::iter::Rev<std::collections::btree_map::Iter<'_, std::sync::Arc<nativelink_util::action_messages::ActionInfo>, simple_scheduler::AwaitedAction>>>: std::iter::Iterator
│ which is required by &mut std::iter::Cloned<std::iter::Rev<std::collections::btree_map::Iter<'_, std::sync::Arc<nativelink_util::action_messages::ActionInfo>, simple_scheduler::AwaitedAction>>>: std::iter::Iterator
│  the method collect exists for struct Cloned<Rev<Iter<'_, Arc<ActionInfo>, AwaitedAction>>>, but its trait bounds were not satisfied rustc (E0599) [456, 106]
│ the full type name has been written to '/Users/zach/Development/nativelink/target/debug/deps/nativelink_scheduler-d0f178a39f61bd0a.long-type-2575323896278722321.txt'
│ the following trait bounds were not satisfied:
<std::iter::Rev<std::collections::btree_map::Iter<'_, std::sync::Arc<nativelink_util::action_messages::ActionInfo>, simple_scheduler::AwaitedAction>> as std::iter::Iterator>::Item = &_
│ which is required by std::iter::Cloned<std::iter::Rev<std::collections::btree_map::Iter<'_, std::sync::Arc<nativelink_util::action_messages::ActionInfo>, simple_scheduler::AwaitedAction>>>: std::iter::Iterator
std::iter::Cloned<std::iter::Rev<std::collections::btree_map::Iter<'_, std::sync::Arc<nativelink_util::action_messages::ActionInfo>, simple_scheduler::AwaitedAction>>>: std::iter::Iterator
│ which is required by &mut std::iter::Cloned<std::iter::Rev<std::collections::btree_map::Iter<'_, std::sync::Arc<nativelink_util::action_messages::ActionInfo>, simple_scheduler::AwaitedAction>>>: std::iter::Iterator
│  the method call chain might not have had the expected associated types rustc (E0271) [456, 84]

Done. do_try_match no longer has any edits and leaving this how it works on main so dismissing this.


nativelink-scheduler/src/simple_scheduler.rs line 475 at r2 (raw file):

Previously, allada (Nathan (Blaise) Bruer) wrote…

I would not over optimize that. if .do_try_match is taking more than 1 second we are in serious trouble.

Done. do_try_match no longer has any edits and leaving this how it works on main so dismissing this.


nativelink-scheduler/src/simple_scheduler.rs line 477 at r2 (raw file):

Previously, allada (Nathan (Blaise) Bruer) wrote…

Use scope_guard to detect when the stream is dropped in execution_server::to_execute_stream() that then calls a function on the scheduler to let the scheduler know that a client disconnected. The function would then take check if it's the last one subscribed and then the time.

Btw, this will make testing significantly easier with these public functions.

Done.


nativelink-scheduler/src/simple_scheduler.rs line 548 at r2 (raw file):

Previously, zbirenbaum (Zach Birenbaum) wrote…

Done.

Done. do_try_match no longer has any edits and leaving this how it works on main so dismissing this.


nativelink-scheduler/src/simple_scheduler.rs line 60 at r4 (raw file):

Previously, allada (Nathan (Blaise) Bruer) wrote…

nit: period.

Done.


nativelink-scheduler/src/simple_scheduler.rs line 473 at r4 (raw file):

Previously, allada (Nathan (Blaise) Bruer) wrote…

Using SystemTime inlined like this will make testing really difficult. I suggest making a constructor that you can pass in the time function.

Done.

@zbirenbaum zbirenbaum force-pushed the action-listeners branch 2 times, most recently from 1bbc496 to 825aa79 Compare April 10, 2024 20:16
Copy link
Contributor

@adam-singer adam-singer left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:lgtm:

Reviewed 1 of 1 files at r7, all commit messages.
Reviewable status: 1 of 1 LGTMs obtained, and 4 discussions need to be resolved

Copy link
Collaborator

@allada allada left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: 1 of 1 LGTMs obtained, and 3 discussions need to be resolved


nativelink-scheduler/src/simple_scheduler.rs line 485 at r7 (raw file):

            self.queued_actions.keys().rev().cloned().collect();
        for action_info in action_infos {
            // add update to queued action update timestamp here

don't forget this comment.


nativelink-scheduler/tests/simple_scheduler_test.rs line 1642 at r2 (raw file):

Previously, zbirenbaum (Zach Birenbaum) wrote…

I'm a bit confused on a couple things here.

Where is the time function set/used in the scheduler? I see SystemTime used other places inside of it but not a now_fn like RunningActionsManagerImpl has.

I don't mind changing it to use that structure, but I just want to make sure that's something we want to include in this PR.

I'm also very confused about why we are relying on wall time here, the actual time doesn't matter at all unless I'm missing something huge.

The comparison done in the code to determine whether or not to time out is:

// If this is true the action is marked as old
running_action.get_last_update_timestamp() + self.disconnect_timeout_s < now

where:
running_action.get_last_update_timestamp() returns some uint
now is the current time since unix epoch in seconds
self.disconnect_timeout_s is a value like 1

if we set running_action.get_last_update_timestamp() to be SystemTime::now() - (self.disconnect_timeout_s+1) then even if no time at all elapses between the first call to SystemTime in the test and the second one in do_try_match we end up with:

now - (disconnect_timeout_s-1) + disconnect_timeout_s < now = -1 < 0 which will always return true

If the thought is that something like daylight savings or some system setting change could mess up the system time between the two calls to this test, we could just set the last_update_timestamp to 0 which would make it

January 1st, 1970 + disconnect_timeout_s < now

If we move everything to Instant it will likely resolve all of these issues.

Implement scheduler side removal of actions with no listeners. Adds
disconnect_timeout_s configuration field with default of 60s. If the
client waiting on a given action is disconnected for longer than this
duration without reconnecting the scheduler will stop tracking it. This
does not remove it from the worker if the job has already been
dispatched.

fixes TraceMachina#338
Copy link
Contributor

@adam-singer adam-singer left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed 6 of 8 files at r8, all commit messages.
Reviewable status: 1 of 1 LGTMs obtained, and 3 discussions need to be resolved

@CLAassistant
Copy link

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.


Zach Birenbaum seems not to be a GitHub user. You need a GitHub account to be able to sign the CLA. If you have already a GitHub account, please add the email address used for this commit to your account.
You have signed the CLA already but the status is still pending? Let us recheck it.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Actions with no listeners that are old should be removed from queue
4 participants