[add] detect and recover from kernel auto restarts #5558

Open
wants to merge 2 commits into
base: main
2 changes: 2 additions & 0 deletions changelogs/current_changelog.md
@@ -87,6 +87,8 @@ Provide a bulleted list of breaking changes and a reference to the PR(s) contain

Provide a bulleted list of new features or improvements and a reference to the PR(s) containing these changes.

- Added support to detect and recover from Kernel auto-restarts for Jupyter. ([PR5558](https://github.com/nteract/nteract/pull/5558))

#### Bug Fixes

Provide a bulleted list of bug fixes and a reference to the PR(s) containing the changes.
3 changes: 3 additions & 0 deletions packages/actions/src/actionTypes/kernel_lifecycle.ts
@@ -30,6 +30,7 @@ export const LAUNCH_KERNEL_FAILED = "LAUNCH_KERNEL_FAILED";
export const SHUTDOWN_REPLY_SUCCEEDED = "SHUTDOWN_REPLY_SUCCEEDED";
export const SHUTDOWN_REPLY_TIMED_OUT = "SHUTDOWN_REPLY_TIMED_OUT";
export const DISPOSE_KERNEL = "DISPOSE_KERNEL";
export const KERNEL_AUTO_RESTARTED = "KERNEL_AUTO_RESTARTED";

export type InterruptKernel = Action <typeof INTERRUPT_KERNEL, MaybeHasContent & MaybeHasKernel>;
export type InterruptKernelSuccessful = Action <typeof INTERRUPT_KERNEL_SUCCESSFUL, MaybeHasContent & MaybeHasKernel>;
@@ -48,6 +49,7 @@ export type LaunchKernelFailed = ErrorAction<typeof LAUNCH_KERNEL_FAI
export type ShutdownReplySucceeded = Action <typeof SHUTDOWN_REPLY_SUCCEEDED, HasKernel & { content: { restart: boolean } }>;
export type ShutdownReplyTimedOut = Action <typeof SHUTDOWN_REPLY_TIMED_OUT, HasKernel>;
export type DisposeKernel = Action <typeof DISPOSE_KERNEL, HasKernel>;
export type KernelAutoRestarted = Action <typeof KERNEL_AUTO_RESTARTED, HasKernel>;

export const interruptKernel = makeActionFunction <InterruptKernel> (INTERRUPT_KERNEL);
export const interruptKernelSuccessful = makeActionFunction <InterruptKernelSuccessful> (INTERRUPT_KERNEL_SUCCESSFUL);
@@ -66,3 +68,4 @@ export const launchKernelFailed = makeErrorActionFunction <LaunchKerne
export const shutdownReplySucceeded = makeActionFunction <ShutdownReplySucceeded> (SHUTDOWN_REPLY_SUCCEEDED);
export const shutdownReplyTimedOut = makeActionFunction <ShutdownReplyTimedOut> (SHUTDOWN_REPLY_TIMED_OUT);
export const disposeKernel = makeActionFunction <DisposeKernel> (DISPOSE_KERNEL);
export const kernelAutoRestarted = makeActionFunction <KernelAutoRestarted> (KERNEL_AUTO_RESTARTED);
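
The three additions in this file (a type constant, an action type, and an action creator) follow one pattern, which can be sketched in isolation. This is a minimal, self-contained sketch: the `Action`, `HasKernel`, and `makeActionFunction` definitions below are simplified stand-ins written for illustration, and are assumptions rather than the actual nteract implementations.

```typescript
// Simplified stand-ins (assumptions, not the real @nteract/actions definitions).
interface Action<T extends string, P> {
  type: T;
  payload: P;
}

interface HasKernel {
  kernelRef: string;
}

// Hypothetical helper: returns a creator that pairs a fixed type with a payload.
const makeActionFunction =
  <A extends Action<string, unknown>>(type: A["type"]) =>
  (payload: A["payload"]): Action<A["type"], A["payload"]> => ({
    type,
    payload
  });

const KERNEL_AUTO_RESTARTED = "KERNEL_AUTO_RESTARTED";
type KernelAutoRestarted = Action<typeof KERNEL_AUTO_RESTARTED, HasKernel>;

const kernelAutoRestarted =
  makeActionFunction<KernelAutoRestarted>(KERNEL_AUTO_RESTARTED);

// The creator only needs the payload; the type string is filled in for it.
const example = kernelAutoRestarted({ kernelRef: "fakeKernelRef" });
```

Under these assumptions, `example` is a plain object carrying the `KERNEL_AUTO_RESTARTED` type and the `kernelRef` it was given, which is the shape the new epic dispatches.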
194 changes: 193 additions & 1 deletion packages/epics/__tests__/kernel-lifecycle.spec.ts
@@ -12,7 +12,8 @@ import {
acquireKernelInfo,
launchKernelWhenNotebookSetEpic,
restartKernelEpic,
watchExecutionStateEpic
watchExecutionStateEpic,
watchForKernelAutoRestartEpic
} from "../src/kernel-lifecycle";

const buildScheduler = () =>
@@ -466,6 +467,197 @@ describe("watchExecutionStateEpic", () => {
});
});


describe("watchForKernelAutoRestartEpic", () => {
test("returns an empty Observable when not a jupyter host", done => {
const action$ = of({
type: actionsModule.LAUNCH_KERNEL_SUCCESSFUL,
payload: {
kernel: {
channels: of({
header: { msg_type: "status" },
content: { execution_state: "restarting" }
},
{
header: { msg_type: "status" },
content: { execution_state: "starting" }
}) as Subject<any>,
Review comment (Member): Heh, this works nicely for tests. 😎

cwd: "/home/tester",
type: "websocket"
},
kernelRef: "fakeKernelRef",
contentRef: "fakeContentRef",
selectNextKernel: false
}
});
const state = {
...mockAppState({})
};
const state$ = new StateObservable<stateModule.AppState>(
new Subject(),
state
);

const obs = watchForKernelAutoRestartEpic(action$, state$);
obs.pipe(toArray()).subscribe(
// Every action that goes through should get stuck on an array
actions => {
const types = actions.map(({ type }) => type);
expect(types).toEqual([]);
},
err => done.fail(err), // It should not error in the stream
() => done()
);
});

test("returns an Observable detecting auto restarted with valid states", done => {

const action$ = of({
type: actionsModule.LAUNCH_KERNEL_SUCCESSFUL,
payload: {
kernel: {
channels: of({
header: { msg_type: "status" },
content: { execution_state: "restarting" }
},
{
header: { msg_type: "status" },
content: { execution_state: "starting" }
}) as Subject<any>,
cwd: "/home/tester",
type: "websocket"
},
kernelRef: "fakeKernelRef",
contentRef: "fakeContentRef",
selectNextKernel: false
}
});
const state = {
...mockAppState({}),
app: stateModule.makeAppRecord({
host: stateModule.makeJupyterHostRecord({})
})
};
const state$ = new StateObservable<stateModule.AppState>(
new Subject(),
state
);

const obs = watchForKernelAutoRestartEpic(action$, state$);
obs.pipe(toArray()).subscribe(
// Every action that goes through should get stuck on an array
actions => {
const types = actions.map(({ type }) => type);
expect(types).toEqual([actionsModule.KERNEL_AUTO_RESTARTED]);
},
err => done.fail(err), // It should not error in the stream
() => done()
);
});

test("returns an empty Observable when states are not valid", done => {
const action$ = of({
type: actionsModule.LAUNCH_KERNEL_SUCCESSFUL,
payload: {
kernel: {
channels: of({
header: { msg_type: "status" },
content: { execution_state: "restarting" }
},
{
header: { msg_type: "status" },
content: { execution_state: "dead" }
},
{
header: { msg_type: "status" },
content: { execution_state: "starting" }
}) as Subject<any>,
cwd: "/home/tester",
type: "websocket"
},
kernelRef: "fakeKernelRef",
contentRef: "fakeContentRef",
selectNextKernel: false
}
});
const state = {
...mockAppState({}),
app: stateModule.makeAppRecord({
host: stateModule.makeJupyterHostRecord({})
})
};
const state$ = new StateObservable<stateModule.AppState>(
new Subject(),
state
);

const obs = watchForKernelAutoRestartEpic(action$, state$);
obs.pipe(toArray()).subscribe(
// Every action that goes through should get stuck on an array
actions => {
const types = actions.map(({ type }) => type);
expect(types).toEqual([]);
},
err => done.fail(err), // It should not error in the stream
() => done()
);
});

test("on kernel error returns executeFailed action", done => {
const sent = new Subject();
const received = new Subject();
received.hasError = true;

const mockSocket = Subject.create(sent, received);

const action$ = of({
type: actionsModule.LAUNCH_KERNEL_SUCCESSFUL,
payload: {
kernel: {
channels: mockSocket,
cwd: "/home/tester",
type: "websocket"
},
kernelRef: "fakeKernelRef",
contentRef: "fakeContentRef",
selectNextKernel: false
}
});
const state = {
...mockAppState({}),
app: stateModule.makeAppRecord({
host: stateModule.makeJupyterHostRecord({})
})
};
const state$ = new StateObservable<stateModule.AppState>(
new Subject(),
state
);

const obs = watchForKernelAutoRestartEpic(action$, state$);
obs.pipe(toArray()).subscribe(
// Every action that goes through should get stuck on an array
actions => {
expect(actions).toEqual([
{
type: actionsModule.EXECUTE_FAILED,
error: true,
payload: {
code: "EXEC_WEBSOCKET_ERROR",
contentRef: "fakeContentRef",
error: new Error(
"The WebSocket connection has unexpectedly disconnected."
)
}
}
]);
},
err => done.fail(err), // It should not error in the stream
() => done()
);
});
});
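
Several of these tests pass a plain observable cast to `Subject` as `channels`, so a `next` method may be missing at runtime even though the type says otherwise. One way to make that check explicit is a type guard. The sketch below uses minimal stand-in interfaces instead of the RxJS types so that it runs without dependencies; `ReadOnlyChannel`, `WritableChannel`, and `isWritable` are hypothetical names introduced for illustration only.

```typescript
// Minimal stand-ins for illustration (assumptions, not the RxJS types).
interface ReadOnlyChannel {
  subscribe(observer: (message: unknown) => void): void;
}

interface WritableChannel extends ReadOnlyChannel {
  next(message: unknown): void;
}

// Hypothetical type guard: true only when the channel can accept messages.
function isWritable(channel: ReadOnlyChannel): channel is WritableChannel {
  return typeof (channel as Partial<WritableChannel>).next === "function";
}

const sent: unknown[] = [];

// Comparable to a plain observable used as a test double: no `next`.
const readOnly: ReadOnlyChannel = { subscribe: () => undefined };

// Comparable to a subject: messages can be pushed into it.
const writable: WritableChannel = {
  subscribe: () => undefined,
  next: message => {
    sent.push(message);
  }
};

for (const channel of [readOnly, writable]) {
  if (isWritable(channel)) {
    channel.next({ msg_type: "kernel_info_request" });
  }
}
```

Only the writable channel receives the message; the read-only one is skipped without throwing.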

describe("restartKernelEpic", () => {
test("work for outputHandling None", () => {
const contentRef = "contentRef";
5 changes: 4 additions & 1 deletion packages/epics/src/index.ts
@@ -22,7 +22,8 @@ import {
acquireKernelInfoEpic,
launchKernelWhenNotebookSetEpic,
restartKernelEpic,
watchExecutionStateEpic
watchExecutionStateEpic,
watchForKernelAutoRestartEpic
} from "./kernel-lifecycle";
import { fetchKernelspecsEpic } from "./kernelspecs";
import {
@@ -51,6 +52,7 @@ const allEpics = [
killKernelEpic,
acquireKernelInfoEpic,
watchExecutionStateEpic,
watchForKernelAutoRestartEpic,
restartKernelEpic,
fetchKernelspecsEpic,
fetchContentEpic,
@@ -81,6 +83,7 @@ export {
killKernelEpic,
acquireKernelInfoEpic,
watchExecutionStateEpic,
watchForKernelAutoRestartEpic,
launchKernelWhenNotebookSetEpic,
restartKernelEpic,
fetchKernelspecsEpic,
73 changes: 72 additions & 1 deletion packages/epics/src/kernel-lifecycle.ts
@@ -4,7 +4,9 @@ import {
childOf,
createMessage,
JupyterMessage,
ofMessageType
ofMessageType,
kernelStatuses,
kernelInfoRequest
} from "@nteract/messaging";
import { sendNotification } from "@nteract/mythic-notifications";
import { AnyAction } from "redux";
@@ -17,9 +19,11 @@ import {
first,
map,
mergeMap,
pairwise,
switchMap,
take,
takeUntil,
tap,
timeout
} from "rxjs/operators";

@@ -79,6 +83,73 @@ export const watchExecutionStateEpic = (
)
);

/**
* Jupyter can automatically restart a crashed kernel, retrying up to 5 times.
* Watch for a successful restart, which is reported as a "restarting" status followed by a "starting" status.
* If all 5 retries fail, the kernel status is reported as "dead".
Review comment (Member): 👍

*
* @oaram {ActionObservable} action$ ActionObservable for LAUNCH_KERNEL_SUCCESSFUL action
Review comment (Member): @oaram -> @param

*/
export const watchForKernelAutoRestartEpic = (
action$: Observable<
actions.NewKernelAction | actions.KillKernelSuccessful
>,
state$: StateObservable<AppState>
) =>
action$.pipe(
ofType(actions.LAUNCH_KERNEL_SUCCESSFUL),
// Only accept jupyter servers for the host with this epic
filter(() => selectors.isCurrentHostJupyter(state$.value)),
switchMap(
(action: actions.NewKernelAction | actions.KillKernelSuccessful) => {
const { kernel, kernelRef, contentRef } = (action as actions.NewKernelAction).payload;

return kernel.channels.pipe(
kernelStatuses(),
pairwise(),
Review comment (Member): Wow this one is new to me, very cool.

filter(
([previousStatus, currentStatus]: [KernelStatus, KernelStatus]) =>
previousStatus === KernelStatus.Restarting && currentStatus === KernelStatus.Starting
),
tap(() => {
// to avoid getting stuck in the "starting" state, nudge kernel with kernel_info_request to bring the status to Idle.
// TODO: test can't seem to identify next on subject. For now, check before calling
Review comment (Member, @rgbkrk, Jun 21, 2021): Since channels is a plain observable in some of the tests rather than a subject, it doesn't have .next.

if (kernel.channels.next) {
kernel.channels.next(kernelInfoRequest());
}
}),
map(() =>
actions.kernelAutoRestarted({
kernelRef
})
),
takeUntil(
action$.pipe(
ofType(actions.KILL_KERNEL_SUCCESSFUL),
filter(
(
killAction:
| actions.KillKernelSuccessful
| actions.NewKernelAction
) => killAction.payload.kernelRef === action.payload.kernelRef
)
)
),
catchError((error: Error) => {
return of(
actions.executeFailed({
error: new Error(
"The WebSocket connection has unexpectedly disconnected."
),
code: errors.EXEC_WEBSOCKET_ERROR,
contentRef
})
);
})
);
})
);
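
The detection step in `watchForKernelAutoRestartEpic` comes down to comparing each kernel status with the one immediately before it. The sketch below shows that idea over a plain array rather than an RxJS stream, so it runs without dependencies. The `Status` union and the `pairwise` and `countAutoRestarts` helpers are hypothetical, written for illustration; they are not the RxJS operator or anything exported by nteract.

```typescript
// Illustrative set of statuses (an assumption, not the nteract KernelStatus enum).
type Status = "starting" | "idle" | "busy" | "restarting" | "dead";

// Hypothetical array analogue of the RxJS `pairwise` operator:
// yields [previous, current] for every adjacent pair of items.
function pairwise<T>(items: readonly T[]): Array<[T, T]> {
  const pairs: Array<[T, T]> = [];
  for (let i = 1; i < items.length; i++) {
    pairs.push([items[i - 1], items[i]]);
  }
  return pairs;
}

// Counts transitions where "restarting" is immediately followed by "starting".
function countAutoRestarts(statuses: readonly Status[]): number {
  return pairwise(statuses).filter(
    ([previous, current]) =>
      previous === "restarting" && current === "starting"
  ).length;
}

// Mirrors the two status sequences used in the tests for this epic.
const recovered = countAutoRestarts(["restarting", "starting"]);
const died = countAutoRestarts(["restarting", "dead", "starting"]);
```

With these inputs, the first sequence contains one matching transition and the second contains none, because "dead" sits between "restarting" and "starting". That matches the expectations in the spec file, where only the first case emits `KERNEL_AUTO_RESTARTED`.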

/**
* Send a kernel_info_request to the kernel.
*