Operator Life Cycle Overhaul #568

Open
wants to merge 1 commit into master
Conversation

jacksonrnewhouse (Contributor)

This reworks how our operators shut down, both in response to EndOfData messages and for stopping checkpoints. It affects SourceOperators, ArrowOperators, and the controller.

SourceOperator changes

There are two main changes here. First, rather than every source having its own near-identical match block on the control receiver, they all call handle_control_message(). That method calls flush_before_checkpoint(), which each source must implement. This did require moving some of the tracking structs into the main source struct.
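Roughly, the shared handling looks like the sketch below. Apart from the two method names above, everything here (the enums, fields, and DummySource) is an illustrative placeholder, not Arroyo's actual types:

// Sketch only: placeholder types standing in for Arroyo's real ones.
struct CheckpointBarrier { epoch: u32, then_stop: bool }

enum ControlMessage {
    Checkpoint(CheckpointBarrier),
    Stop { immediate: bool },
}

enum SourceFinishType { Continue, Graceful, Immediate }

struct DummySource { buffered: Vec<String> }

impl DummySource {
    // The per-source hook: flush anything in flight before the barrier.
    async fn flush_before_checkpoint(&mut self, _barrier: &CheckpointBarrier) {
        self.buffered.clear();
    }

    // The shared replacement for each source's hand-rolled match block.
    async fn handle_control_message(&mut self, msg: ControlMessage) -> SourceFinishType {
        match msg {
            ControlMessage::Checkpoint(barrier) => {
                self.flush_before_checkpoint(&barrier).await;
                if barrier.then_stop {
                    SourceFinishType::Graceful // stopping checkpoint
                } else {
                    SourceFinishType::Continue
                }
            }
            ControlMessage::Stop { immediate: true } => SourceFinishType::Immediate,
            ControlMessage::Stop { immediate: false } => SourceFinishType::Graceful,
        }
    }
}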

Second, once a source finishes producing data, it now waits for either a stopping checkpoint or an immediate stop message. This lets checkpoints still be taken when some, but not all, sources have finished; previously they would have hung.
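For example (reusing the placeholder types from the sketch above), the tail of a source's run loop might look roughly like this once it runs out of data:

// Sketch: a finished source keeps servicing control messages instead of
// returning, so checkpoints can still complete across the whole job.
async fn wait_for_shutdown(
    source: &mut DummySource,
    control_rx: &mut tokio::sync::mpsc::Receiver<ControlMessage>,
) {
    while let Some(msg) = control_rx.recv().await {
        match source.handle_control_message(msg).await {
            // A stopping checkpoint or an immediate stop ends the task.
            SourceFinishType::Graceful | SourceFinishType::Immediate => return,
            // Ordinary checkpoints still flow through even though this
            // source has no more data to emit.
            SourceFinishType::Continue => {}
        }
    }
}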

ArrowOperators

The previous on_close() is largely replaced with an on_end_of_data() method, which is called when the operator is finishing. This is mainly relevant for the WatermarkGenerator, which inserts the watermark that flushes downstream nodes. on_close() is still there, but is now only used by the PreviewSink.

Like sources, operators now wait for an explicit signal to shut down, in this case a SignalMessage::Shutdown.

Each operator also tracks whether its inputs finished because they were stopped or because they ran out of data.
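Put together, the shutdown path looks roughly like the sketch below (placeholder plumbing throughout; only on_end_of_data() and SignalMessage::Shutdown come from this PR):

// Sketch only: placeholder types around the hooks named above.
enum SignalMessage { EndOfData, Stop, Shutdown }

enum FinishReason { Stopped, NoMoreData }

async fn finish_operator(
    finish_reason: FinishReason,
    signal_rx: &mut tokio::sync::mpsc::Receiver<SignalMessage>,
) {
    // 1. Give the operator a chance to flush, e.g. the WatermarkGenerator's
    //    on_end_of_data() emits the watermark that drains downstream nodes.
    // operator.on_end_of_data(&mut ctx).await;

    // 2. Remember *why* the inputs finished; a stop is not the same as
    //    genuinely running out of data.
    let _finished_by_stop = matches!(finish_reason, FinishReason::Stopped);

    // 3. Don't exit yet: wait for an explicit Shutdown signal so stopping
    //    checkpoints can still pass through this operator.
    while let Some(signal) = signal_rx.recv().await {
        if matches!(signal, SignalMessage::Shutdown) {
            return;
        }
    }
}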

Controller

The Finishing state has been overhauled. First, it is only transitioned to once all of the operators have finished processing their data. Then, it takes a stopping checkpoint, which commits any outstanding sinks.

Because we have several states that take stopping checkpoints, I've factored that out into the take_stopping_checkpoint() method on the JobConfig. I'd appreciate a check that I translated all of the states correctly.
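The shared helper reduces each of those states to roughly the sketch below; only take_stopping_checkpoint() and StoppingCheckpointOutcome::SuccessfullyStopped appear in the actual diff, and the rest is illustrative:

// Illustrative sketch; the context type and any outcome variants other
// than SuccessfullyStopped are placeholders.
enum StoppingCheckpointOutcome {
    SuccessfullyStopped,
    // failure / force-stop outcomes elided
}

struct JobContextSketch;

impl JobContextSketch {
    // Shared by every state that stops via a final checkpoint: trigger a
    // stopping checkpoint, drive it to completion (committing any
    // outstanding sinks), and report how it ended.
    async fn take_stopping_checkpoint(&mut self) -> StoppingCheckpointOutcome {
        StoppingCheckpointOutcome::SuccessfullyStopped
    }
}

A state like Finishing can then just match on ctx.take_stopping_checkpoint().await instead of re-implementing the checkpoint-and-wait loop.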

StopAndSendStop,
Finish,
FinishedStoppingCheckpoint,
NoMoreData { end_of_data: bool },
Member:

This reads as a bit confusing to me — NoMoreData and end_of_data seem like synonyms, but we can have NoMoreData { end_of_data: false }. Is there a better name for one of them?

Stop,
StopAndSendStop,
Finish,
FinishedStoppingCheckpoint,
Member:

These names don't make sense to me for ControlOutcome, which is supposed to tell the operator what to do ("continue", "stop", "finish", etc.).

.await;
}
SourceFinishType::Immediate => {
ctx.broadcast(ArrowMessage::Signal(SignalMessage::Shutdown))
Member:

For immediate shutdown, I think the source should just exit or use try_send, instead of blocking on sending a message through the dataflow. (I believe this isn't new behavior in this PR, but the changes have made it more obvious what's happening.)

The point of immediate shutdown is that there may be problems processing messages, or a ton of backpressure, which prevents messages from flowing (and may even block sending this message).

Immediate shutdown is a message from the user that they would like to shut down immediately, without waiting on the dataflow. That's accomplished fastest by exiting and letting the queues close.
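Roughly (placeholder channel and message types, not the actual operator context API):

// Sketch: prefer a non-blocking, best-effort signal on immediate shutdown.
use tokio::sync::mpsc;

enum StopKind { Graceful, Immediate }

async fn signal_shutdown(kind: StopKind, out_tx: &mpsc::Sender<&'static str>) {
    match kind {
        // Graceful: fine to wait for the signal to flow downstream.
        StopKind::Graceful => {
            let _ = out_tx.send("shutdown").await;
        }
        // Immediate: never block on a possibly backpressured queue.
        // Best-effort try_send, then return and let the closed channels
        // propagate shutdown.
        StopKind::Immediate => {
            let _ = out_tx.try_send("shutdown");
        }
    }
}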

@@ -172,6 +172,7 @@ service ControllerGrpc {
rpc TaskStarted(TaskStartedReq) returns (TaskStartedResp);
rpc TaskCheckpointEvent(TaskCheckpointEventReq) returns (TaskCheckpointEventResp);
rpc TaskCheckpointCompleted(TaskCheckpointCompletedReq) returns (TaskCheckpointCompletedResp);
rpc TaskDataFinished(TaskFinishedReq) returns (TaskFinishedResp);
Member:

Is TaskDataFinished the right name for this? It seems to be sent whenever operators exit.

}
}
None => {
warn!("source {}-{} received None from control channel, indicating sender has been dropped",
Member:

Should this be a panic?

if job_controller.finished() {
return Ok(StoppingCheckpointOutcome::SuccessfullyStopped);
} else if !shutdown_started {
info!("Starting shutdown");
Member:

In the controller, logs should use structured logging like https://github.com/ArroyoSystems/arroyo/blob/master/crates/arroyo-controller/src/states/recovering.rs#L24; this preserves context such as the job_id and makes it easier to build alerts and monitoring around the log lines.
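That is, something along these lines (field names are illustrative):

// Sketch: job_id as a structured field rather than interpolated text.
use tracing::info;

fn log_shutdown_start(job_id: &str) {
    // The field is attached as structured context on the event, so
    // dashboards and alerts can filter on job_id instead of parsing
    // the message text.
    info!(job_id = %job_id, "starting shutdown");
}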

.rx
.recv()
.await
.expect("channel closed while receiving")
Member:

We shouldn't panic in the controller.
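For example, roughly (placeholder event type):

// Sketch: surface a closed channel to the caller instead of panicking
// inside the controller.
use tokio::sync::mpsc;

async fn next_event(rx: &mut mpsc::Receiver<String>) -> Result<String, String> {
    match rx.recv().await {
        Some(event) => Ok(event),
        // Sender dropped: let the state machine move to a failure or
        // recovery state rather than crash the controller process.
        None => Err("channel closed while waiting for events".to_string()),
    }
}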

_ => {
// ignore other messages
}
match ctx.take_stopping_checkpoint().await {
Member:

It looks like there might be some problems with rescaling. I got into a state where the operators didn't shut down cleanly, getting the controller stuck indefinitely in rescaling:
rescaling.txt.

I also wasn't able to force stop out of rescaling; the pipeline was just wedged until I restarted the controller, and then on recovery it went into CheckpointStopping, where it also got stuck.

Member:

I'm able to consistently reproduce this by doing two rescales in a row.

break;
ControlOutcome::NoMoreData { end_of_data } => {
if operator_state != OperatorState::Running {
warn!("received no more data update in operator {}-{} while in state {:?}",
Member:

I consistently see this warning when stopping pipelines. Is it an actual issue, or should we make it a debug?

Change how Finishing pipelines behave so that they take a final stopping checkpoint.
Unify how sources handle control messages.
Factor out stopping checkpoint control.