Skip to content

Commit

Permalink
Include UUID in ActionState
Browse files Browse the repository at this point in the history
Changes the field used for identifying which action an ActionState
corresponds to include a uuid as well as the ActionInfoHashKey. All
existing functionality is kept the same by making use of the nested
ActionInfoHashKey contained within the Id. These changes will provide
the basis for all usage of Id in followup changes to the scheduler.
This breaks compatibility for forwarding an operation from one remote
execution system to another that does not use our operation name
format (ie: very unlikely, but possible).
  • Loading branch information
Zach Birenbaum committed May 29, 2024
1 parent 08e2f2e commit 8104c00
Show file tree
Hide file tree
Showing 9 changed files with 202 additions and 99 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions nativelink-scheduler/src/cache_lookup_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use nativelink_proto::build::bazel::remote::execution::v2::{
use nativelink_store::ac_utils::get_and_decode_digest;
use nativelink_store::grpc_store::GrpcStore;
use nativelink_util::action_messages::{
ActionInfo, ActionInfoHashKey, ActionResult, ActionStage, ActionState,
ActionInfo, ActionInfoHashKey, ActionResult, ActionStage, ActionState, OperationId,
};
use nativelink_util::background_spawn;
use nativelink_util::common::DigestInfo;
Expand Down Expand Up @@ -130,12 +130,13 @@ impl ActionScheduler for CacheLookupScheduler {
&self,
action_info: ActionInfo,
) -> Result<watch::Receiver<Arc<ActionState>>, Error> {
let id = OperationId::new(action_info.unique_qualifier.clone());
if action_info.skip_cache_lookup {
// Cache lookup skipped, forward to the upstream.
return self.action_scheduler.add_action(action_info).await;
}
let mut current_state = Arc::new(ActionState {
unique_qualifier: action_info.unique_qualifier.clone(),
id,
stage: ActionStage::CacheCheck,
});
let (tx, rx) = watch::channel(current_state.clone());
Expand Down Expand Up @@ -172,7 +173,7 @@ impl ActionScheduler for CacheLookupScheduler {
Pin::new(ac_store.as_ref()),
*action_digest,
instance_name,
current_state.unique_qualifier.digest_function,
current_state.id.unique_qualifier.digest_function,
)
.await
{
Expand Down
18 changes: 13 additions & 5 deletions nativelink-scheduler/src/simple_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use lru::LruCache;
use nativelink_config::schedulers::WorkerAllocationStrategy;
use nativelink_error::{error_if, make_err, make_input_err, Code, Error, ResultExt};
use nativelink_util::action_messages::{
ActionInfo, ActionInfoHashKey, ActionResult, ActionStage, ActionState, ExecutionMetadata,
ActionInfo, ActionInfoHashKey, ActionResult, ActionStage, ActionState, ExecutionMetadata, OperationId
};
use nativelink_util::metrics_utils::{
AsyncCounterWrapper, Collector, CollectorState, CounterWithTime, FuncCounterWrapper,
Expand Down Expand Up @@ -182,22 +182,29 @@ struct CompletedAction {

impl Hash for CompletedAction {
fn hash<H: Hasher>(&self, state: &mut H) {
ActionInfoHashKey::hash(&self.state.unique_qualifier, state);
OperationId::hash(&self.state.id, state);
}
}

impl PartialEq for CompletedAction {
fn eq(&self, other: &Self) -> bool {
ActionInfoHashKey::eq(&self.state.unique_qualifier, &other.state.unique_qualifier)
OperationId::eq(&self.state.id, &other.state.id)
}
}

impl Eq for CompletedAction {}

impl Borrow<OperationId> for CompletedAction {
#[inline]
fn borrow(&self) -> &OperationId {
&self.state.id
}
}

impl Borrow<ActionInfoHashKey> for CompletedAction {
#[inline]
fn borrow(&self) -> &ActionInfoHashKey {
&self.state.unique_qualifier
&self.state.id.unique_qualifier
}
}

Expand Down Expand Up @@ -291,9 +298,10 @@ impl SimpleSchedulerImpl {
self.metrics.add_action_new_action_created.inc();
// Action needs to be added to queue or is not cacheable.
let action_info = Arc::new(action_info);
let id = OperationId::new(action_info.unique_qualifier.clone());

let current_state = Arc::new(ActionState {
unique_qualifier: action_info.unique_qualifier.clone(),
id,
stage: ActionStage::Queued,
});

Expand Down
15 changes: 9 additions & 6 deletions nativelink-scheduler/tests/action_messages_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use nativelink_proto::google::longrunning::{operation, Operation};
use nativelink_proto::google::rpc::Status;
use nativelink_util::action_messages::{
ActionInfo, ActionInfoHashKey, ActionResult, ActionStage, ActionState, ExecutionMetadata,
OperationId,
};
use nativelink_util::common::DigestInfo;
use nativelink_util::digest_hasher::DigestHasherFunc;
Expand All @@ -44,13 +45,15 @@ mod action_messages_tests {

#[nativelink_test]
async fn action_state_any_url_test() -> Result<(), Error> {
let unique_qualifier = ActionInfoHashKey {
instance_name: "foo_instance".to_string(),
digest_function: DigestHasherFunc::Sha256,
digest: DigestInfo::new([1u8; 32], 5),
salt: 0,
};
let id = OperationId::new(unique_qualifier);
let action_state = ActionState {
unique_qualifier: ActionInfoHashKey {
instance_name: "foo_instance".to_string(),
digest_function: DigestHasherFunc::Sha256,
digest: DigestInfo::new([1u8; 32], 5),
salt: 0,
},
id,
// Result is only populated if has_action_result.
stage: ActionStage::Completed(ActionResult::default()),
};
Expand Down
6 changes: 4 additions & 2 deletions nativelink-scheduler/tests/cache_lookup_scheduler_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ use nativelink_scheduler::action_scheduler::ActionScheduler;
use nativelink_scheduler::cache_lookup_scheduler::CacheLookupScheduler;
use nativelink_scheduler::platform_property_manager::PlatformPropertyManager;
use nativelink_store::memory_store::MemoryStore;
use nativelink_util::action_messages::{ActionInfoHashKey, ActionResult, ActionStage, ActionState};
use nativelink_util::action_messages::{
ActionInfoHashKey, ActionResult, ActionStage, ActionState, OperationId,
};
use nativelink_util::common::DigestInfo;
use nativelink_util::digest_hasher::DigestHasherFunc;
use nativelink_util::store_trait::Store;
Expand Down Expand Up @@ -97,7 +99,7 @@ mod cache_lookup_scheduler_tests {
.await?;
let (_forward_watch_channel_tx, forward_watch_channel_rx) =
watch::channel(Arc::new(ActionState {
unique_qualifier: action_info.unique_qualifier.clone(),
id: OperationId::new(action_info.unique_qualifier.clone()),
stage: ActionStage::Queued,
}));
let mut skip_cache_action = action_info.clone();
Expand Down
12 changes: 6 additions & 6 deletions nativelink-scheduler/tests/property_modifier_scheduler_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use nativelink_macro::nativelink_test;
use nativelink_scheduler::action_scheduler::ActionScheduler;
use nativelink_scheduler::platform_property_manager::PlatformPropertyManager;
use nativelink_scheduler::property_modifier_scheduler::PropertyModifierScheduler;
use nativelink_util::action_messages::{ActionInfoHashKey, ActionStage, ActionState};
use nativelink_util::action_messages::{ActionInfoHashKey, ActionStage, ActionState, OperationId};
use nativelink_util::common::DigestInfo;
use nativelink_util::platform_properties::PlatformPropertyValue;
use tokio::sync::watch;
Expand Down Expand Up @@ -74,7 +74,7 @@ mod property_modifier_scheduler_tests {
let action_info = make_base_action_info(UNIX_EPOCH);
let (_forward_watch_channel_tx, forward_watch_channel_rx) =
watch::channel(Arc::new(ActionState {
unique_qualifier: action_info.unique_qualifier.clone(),
id: OperationId::new(action_info.unique_qualifier.clone()),
stage: ActionStage::Queued,
}));
let platform_property_manager = Arc::new(PlatformPropertyManager::new(HashMap::from([(
Expand Down Expand Up @@ -114,7 +114,7 @@ mod property_modifier_scheduler_tests {
.insert(name.clone(), PlatformPropertyValue::Unknown(original_value));
let (_forward_watch_channel_tx, forward_watch_channel_rx) =
watch::channel(Arc::new(ActionState {
unique_qualifier: action_info.unique_qualifier.clone(),
id: OperationId::new(action_info.unique_qualifier.clone()),
stage: ActionStage::Queued,
}));
let platform_property_manager = Arc::new(PlatformPropertyManager::new(HashMap::from([(
Expand Down Expand Up @@ -151,7 +151,7 @@ mod property_modifier_scheduler_tests {
let action_info = make_base_action_info(UNIX_EPOCH);
let (_forward_watch_channel_tx, forward_watch_channel_rx) =
watch::channel(Arc::new(ActionState {
unique_qualifier: action_info.unique_qualifier.clone(),
id: OperationId::new(action_info.unique_qualifier.clone()),
stage: ActionStage::Queued,
}));
let platform_property_manager = Arc::new(PlatformPropertyManager::new(HashMap::from([(
Expand Down Expand Up @@ -188,7 +188,7 @@ mod property_modifier_scheduler_tests {
let action_info = make_base_action_info(UNIX_EPOCH);
let (_forward_watch_channel_tx, forward_watch_channel_rx) =
watch::channel(Arc::new(ActionState {
unique_qualifier: action_info.unique_qualifier.clone(),
id: OperationId::new(action_info.unique_qualifier.clone()),
stage: ActionStage::Queued,
}));
let platform_property_manager = Arc::new(PlatformPropertyManager::new(HashMap::from([(
Expand Down Expand Up @@ -223,7 +223,7 @@ mod property_modifier_scheduler_tests {
.insert(name, PlatformPropertyValue::Unknown(value));
let (_forward_watch_channel_tx, forward_watch_channel_rx) =
watch::channel(Arc::new(ActionState {
unique_qualifier: action_info.unique_qualifier.clone(),
id: OperationId::new(action_info.unique_qualifier.clone()),
stage: ActionStage::Queued,
}));
let platform_property_manager = Arc::new(PlatformPropertyManager::new(HashMap::new()));
Expand Down
Loading

0 comments on commit 8104c00

Please sign in to comment.