-
Notifications
You must be signed in to change notification settings - Fork 3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[WIP] scheduler for running operations subsequently #1095
base: master
Are you sure you want to change the base?
Conversation
Unit Test Results26 tests - 321 23 ✔️ - 318 15s ⏱️ - 15m 22s For more details on these failures and errors, see this check. Results for commit 9559e47. ± Comparison against base commit d516052. This pull request removes 340 and adds 19 tests. Note that renamed tests count towards both.
♻️ This comment has been updated with latest results. |
@Override | ||
public OperationTask get(long id, @Nullable TransactionHandle tx) throws SQLException { | ||
return DbOperation.execute(tx, storage, c -> { | ||
try (PreparedStatement ps = c.prepareStatement(SELECT_QUERY)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldn't we add FOR UPDATE
if tx
is not null?
in common scenario we do the next:
var tx = start_tx();
var some_state = dao.get(tx);
... some business logic ...
dao.update(new_state, tx); <-- simple UPDATE, not CAS
tx.commit();
if we do simple UPDATE in tx, then we should add FOR UPDATE
to our SELECT query
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure if it's necessary because we don't have long-lasting transactions that require read-and-update. It may be useful, of course, if we want to ensure that operation_task hasn't been updated by other instance (in case of parallel execution which is not desirable). So I'll revise the code and think about this problem
|
||
public MountDynamicDiskResolver(VmDao vmDao, DynamicMountDao dynamicMountDao, AllocationContext allocationContext, | ||
OperationTaskDao operationTaskDao, OperationTaskScheduler taskScheduler, | ||
Duration leaseDuration) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
leaseDuration
is not a bean
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Correct, but it's just an example. It still requires fixes for circular dependencies and this configuration
@@ -0,0 +1,21 @@ | |||
CREATE TYPE task_status AS ENUM ('PENDING', 'RUNNING', 'FAILED', 'FINISHED', 'STALE'); | |||
|
|||
CREATE TYPE task_type AS ENUM ('UNMOUNT', 'MOUNT'); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's should be one type per one action. So this enum could be extended in future migrations to support new types of actions.
|
||
CREATE TYPE task_type AS ENUM ('UNMOUNT', 'MOUNT'); | ||
|
||
CREATE TABLE IF NOT EXISTS operation_task( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The main idea is to provide DB as a single source of truth about order of task execution.
Here's short explaination of operation_task
fields:
id
- is bigserial and thus generated on insert of task. This is the main way to present order among certain tasks (seeentity_id
).name
- for debug and readability purposesentity_id
- this is the way to group tasks by some user-generated text id. Tasks with sameentity_id
are executed subsequently according theid
field (in ascending order). Thus, task with smaller id will be executed first. Tasks with differententity_id
can be executed in parallel.type
- is necessary to match code representation of a taskstatus
- status of a task.created_at
,updated_at
- self-explainatory, for debug purposesmetadata
- JSON to keep task arguments and other useful information about the task. The content of this field is defined by user and parsed mainly depending by the type.operation_id
- an operation that is linked to a task. Contains all details about execution. There should be(0-1) <-> 1
relation between a task and an operation.worker_id
- name of the instance that captured a task. This is needed to ensure that a task is executed just once.lease_till
- deadline for scheduler instance to execute this task. Scheduler instance should updatelease_till
field. In case of instance death or any other reason that make instance impossible to finish a task, another scheduler instance can "capture" the task with expiredlease_till
deadline and replaceworker_id
field.
import java.util.Map; | ||
import java.util.stream.Collectors; | ||
|
||
public class DispatchingOperationTaskResolver implements OperationTaskResolver { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Task resolver that can accept a list of different typed resolver to choose resolver by a task type.
|
||
import static ai.lzy.model.db.DbHelper.withRetries; | ||
|
||
public abstract class OpTaskAwareAction extends OperationRunnerBase { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Type of action that is connected to a task. All inheritants of this class will be executed by task scheduler.
} | ||
|
||
@Override | ||
protected void beforeStep() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
New step in operation execution to update task lease deadline
} | ||
|
||
@Override | ||
protected void notifyFinished() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Task should be moved to a final status on operation finish
metadata, operationId, null, null); | ||
} | ||
|
||
public enum Status { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Assumed workflow:
┌─────────┐ ┌─────────┐ ┌──────────┐
│ PENDING ├─────► RUNNING ├──────► FINISHED │
└────┬────┘ └────┬────┘ └──────────┘
│ │
│ │
┌───▼───┐ ┌───▼────┐
│ STALE │ │ FAILED │
└───────┘ └────────┘
|
||
import java.sql.SQLException; | ||
|
||
public interface OperationTaskResolver { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Component that is used to match and create code representation to a task from DB.
No description provided.