Skip to content

Commit

Permalink
feat: client releasable slots (#476)
Browse files Browse the repository at this point in the history
* feat: add release slot proto

* feat: add semaphore release state and methods

* feat: go sdk and example

* docs: manual slot release

* chore: linting

* fix: broken test

* fix: unlink step run on manual release

* feat: release slot event

* fix: test

* fix: revert e2e test changes

* chore: remove debug line

* fix: place step run query in same tx

* fix: change migration release version

---------

Co-authored-by: Alexander Belanger <[email protected]>
  • Loading branch information
grutt and abelanger5 committed May 14, 2024
1 parent 9875ef8 commit 48d06b9
Show file tree
Hide file tree
Showing 24 changed files with 665 additions and 235 deletions.
9 changes: 9 additions & 0 deletions api-contracts/dispatcher/dispatcher.proto
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ service Dispatcher {
rpc PutOverridesData(OverridesData) returns (OverridesDataResponse) {}

rpc Unsubscribe(WorkerUnsubscribeRequest) returns (WorkerUnsubscribeResponse) {}

rpc ReleaseSlot(ReleaseSlotRequest) returns (ReleaseSlotResponse) {}
}

message WorkerRegisterRequest {
Expand Down Expand Up @@ -291,3 +293,10 @@ message HeartbeatRequest {
}

message HeartbeatResponse {}

// ReleaseSlotRequest is the request message for the Dispatcher.ReleaseSlot RPC.
message ReleaseSlotRequest {
// the id of the step run to release
string stepRunId = 1;
}

// ReleaseSlotResponse is the response message for the Dispatcher.ReleaseSlot RPC.
// It declares no fields.
message ReleaseSlotResponse {}
1 change: 1 addition & 0 deletions api-contracts/openapi/components/schemas/workflow_run.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ StepRunEventReason:
- FAILED
- RETRYING
- CANCELLED
- SLOT_RELEASED

StepRunEventSeverity:
type: string
Expand Down
199 changes: 100 additions & 99 deletions api/v1/server/oas/gen/openapi.gen.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions frontend/app/src/lib/api/generated/data-contracts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -683,6 +683,7 @@ export enum StepRunEventReason {
FAILED = "FAILED",
RETRYING = "RETRYING",
CANCELLED = "CANCELLED",
SLOT_RELEASED = "SLOT_RELEASED",
}

export enum StepRunEventSeverity {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ const REASON_TO_TITLE: Record<StepRunEventReason, string> = {
[StepRunEventReason.REQUEUED_NO_WORKER]: 'Requeueing (no worker available)',
[StepRunEventReason.REQUEUED_RATE_LIMIT]: 'Requeueing (rate limit)',
[StepRunEventReason.SCHEDULING_TIMED_OUT]: 'Scheduling timed out',
[StepRunEventReason.SLOT_RELEASED]: 'Slot released',
};

function getTitleFromReason(reason: StepRunEventReason, message: string) {
Expand All @@ -99,7 +100,7 @@ function renderCardFooter(event: StepRunEvent) {
if (event.data) {
const data = event.data as any;

if (event.reason == StepRunEventReason.ASSIGNED && data.worker_id) {
if (data.worker_id) {
return (
<CardFooter>
<Link to={`/workers/${data.worker_id}`}>
Expand Down
3 changes: 2 additions & 1 deletion frontend/docs/pages/home/features/_meta.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@
"streaming": "Streaming",
"triggering-runs": "Triggering Runs",
"rate-limits": "Global Rate Limits",
"additional-metadata": "Additional Metadata"
"additional-metadata": "Additional Metadata",
"advanced": "Advanced"
}
3 changes: 3 additions & 0 deletions frontend/docs/pages/home/features/advanced/_meta.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"manual-slot-release": "Manual Slot Release"
}
89 changes: 89 additions & 0 deletions frontend/docs/pages/home/features/advanced/manual-slot-release.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import { Callout, Card, Cards, Steps, Tabs } from "nextra/components";

# Manual Slot Release

The Hatchet execution model sets a number of available slots for running steps in a workflow. When a step is running, it occupies a slot, and if a worker has no available slots, it will not be able to run any more steps concurrently.

In some cases, you may have a step in your workflow that is resource-intensive and requires exclusive access to a shared resource, such as a database connection or a GPU compute instance. To ensure that other steps can run concurrently, you can manually release the slot once the resource-intensive part of the step has completed but the step still has non-resource-intensive work to do (e.g. upload or cleanup).

<Callout type="warning">
This is an advanced feature and should be used with caution. Manually
releasing the slot can have unintended side effects on system performance and
concurrency.
</Callout>

## Using Manual Slot Release

You can manually release a slot from within a running step in your Workflow using the Hatchet context param:

<Tabs items={['Python', 'Typescript', 'Go']}>
<Tabs.Tab>
```python
@hatchet.workflow(on_events=["user:create"])
@hatchet.step()
def step1(self, context: Context):
print('RESOURCE INTENSIVE PROCESS')
time.sleep(10)
# Release the slot after the resource-intensive process, so that other steps can run
context.release_slot()
print("NON RESOURCE INTENSIVE PROCESS")
return {"status": "success"}
```
</Tabs.Tab>

<Tabs.Tab>
```typescript
const workflow: Workflow = {
// ... other workflow properties
steps: [
{
name: 'step1',
run: async (ctx) => {
console.log('RESOURCE INTENSIVE PROCESS...');
await sleep(5000);
// Release the slot after the resource-intensive process, so that other steps can run
await ctx.releaseSlot();
console.log('NON RESOURCE INTENSIVE PROCESS...');
return { step1: 'step1 results!' };
},
},
],
};

```

</Tabs.Tab>
<Tabs.Tab>
```go
func StepOne(ctx worker.HatchetContext) (result *stepOneOutput, err error) {
fmt.Println("RESOURCE INTENSIVE PROCESS")
time.Sleep(10 * time.Second)
// Release the slot after the resource-intensive process, so that other steps can run
ctx.ReleaseSlot()
fmt.Println("NON RESOURCE INTENSIVE PROCESS")
return &stepOneOutput{
Message: "step1 results",
}, nil
},
```

</Tabs.Tab>
</Tabs>

In the above examples, the slot-release method (`release_slot()` in Python, `releaseSlot()` in TypeScript, `ReleaseSlot()` in Go) is called after the resource-intensive process has completed. This allows other steps in the workflow to start executing while the current step continues with non-resource-intensive tasks.

<Callout type="info">
Manually releasing the slot does not terminate the current step. The step will
continue executing until it completes or encounters an error.
</Callout>

## Use Cases

Some common use cases for Manual Slot Release include:

- Performing data processing or analysis that requires significant CPU, GPU, or memory resources
- Acquiring locks or semaphores to access shared resources
- Executing long-running tasks that don't need to block other steps after some initial work is done

By utilizing Manual Slot Release, you can optimize the concurrency and resource utilization of your workflows, allowing multiple steps to run in parallel when possible.
2 changes: 2 additions & 0 deletions internal/repository/prisma/dbsqlc/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion internal/repository/prisma/dbsqlc/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ CREATE TYPE "JobRunStatus" AS ENUM ('PENDING', 'RUNNING', 'SUCCEEDED', 'FAILED',
CREATE TYPE "LogLineLevel" AS ENUM ('DEBUG', 'INFO', 'WARN', 'ERROR');

-- CreateEnum
CREATE TYPE "StepRunEventReason" AS ENUM ('REQUEUED_NO_WORKER', 'REQUEUED_RATE_LIMIT', 'SCHEDULING_TIMED_OUT', 'ASSIGNED', 'STARTED', 'FINISHED', 'FAILED', 'RETRYING', 'CANCELLED');
CREATE TYPE "StepRunEventReason" AS ENUM ('REQUEUED_NO_WORKER', 'REQUEUED_RATE_LIMIT', 'SCHEDULING_TIMED_OUT', 'ASSIGNED', 'STARTED', 'FINISHED', 'FAILED', 'RETRYING', 'CANCELLED', 'SLOT_RELEASED');

-- CreateEnum
CREATE TYPE "StepRunEventSeverity" AS ENUM ('INFO', 'WARNING', 'CRITICAL');
Expand Down Expand Up @@ -355,6 +355,7 @@ CREATE TABLE "StepRun" (
"callerFiles" JSONB,
"gitRepoBranch" TEXT,
"retryCount" INTEGER NOT NULL DEFAULT 0,
"semaphoreReleased" BOOLEAN NOT NULL DEFAULT false,

CONSTRAINT "StepRun_pkey" PRIMARY KEY ("id")
);
Expand Down
3 changes: 2 additions & 1 deletion internal/repository/prisma/dbsqlc/step_runs.sql
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,8 @@ SET
WHEN sqlc.narg('rerun')::boolean THEN NULL
ELSE COALESCE(sqlc.narg('cancelledReason')::text, "cancelledReason")
END,
"retryCount" = COALESCE(sqlc.narg('retryCount')::int, "retryCount")
"retryCount" = COALESCE(sqlc.narg('retryCount')::int, "retryCount"),
"semaphoreReleased" = COALESCE(sqlc.narg('semaphoreReleased')::boolean, "semaphoreReleased")
WHERE
"id" = @id::uuid AND
"tenantId" = @tenantId::uuid
Expand Down
30 changes: 19 additions & 11 deletions internal/repository/prisma/dbsqlc/step_runs.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion internal/repository/prisma/dbsqlc/tickers.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 48d06b9

Please sign in to comment.