
GODRIVER-2348 Make CSOT feature-gated behavior the default #1515

Open · wants to merge 73 commits into base: master

Conversation

@prestonvasquez (Collaborator) commented Jan 11, 2024

GODRIVER-2348

Summary

This PR makes CSOT feature-gated behavior the default by dropping support for the legacy timeout behavior.

Workflow Changes

Remove Contexts From Structs

Putting contexts on structs is an antipattern that makes unifying and creating context timeouts difficult, because the context must be maintained as state (e.g., how do you ensure you never overwrite the context while it is in use by different methods?). This PR proposes replacing this practice with the following pattern:

func (s someStruct) do(ctx context.Context) {
	ctx, cancel := context.WithCancel(ctx)

	done := make(chan struct{}) 
	defer close(done) // cancel the context if the check completes without a cancellation signal.

	go func() {
		defer cancel()

		select {
		case <-s.cancel:
		case <-done:
		}
	}()

	...
}

func (s someStruct) close() {
	s.cancelOnce.Do(func() {
		s.cancel <- struct{}{}
	})

	...
}

We still need the ability to cancel contexts passed to blocking operations from outside the goroutine they are initiated in, so some state must be maintained on the initiating object: in this case, a cancellation channel and a sync.Once.
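
For illustration, the retained state might look like the following (field names match the sketch above; this is an assumption, not the driver's actual definition):

type someStruct struct {
	cancel     chan struct{} // signals in-flight operations to cancel their contexts
	cancelOnce sync.Once     // ensures the cancel signal is sent at most once
}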

Unify Timeout Logic

This PR proposes renaming csot.MakeTimeoutContext to csot.WithTimeout. Additionally, this update removes the need for blocks like the following in operation execution, change stream watching, and GridFS CRUD operations:

if _, deadlineSet := ctx.Deadline(); !deadlineSet && b.db.Client().Timeout() != nil && !csot.IsTimeoutContext(ctx) {
	newCtx, cancelFunc := csot.MakeTimeoutContext(ctx, *b.db.Client().Timeout())
	// Redefine ctx to be the new timeout-derived context.
	ctx = newCtx
	// Cancel the timeout-derived context at the end of Execute to avoid a context leak.
	defer cancelFunc()
}
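
Under the proposal, that block collapses to a single call. This is a sketch, assuming the renamed helper accepts the client's *time.Duration timeout and is a no-op when the context already carries a deadline:

// Apply the client-level timeout unless the caller already set a deadline.
ctx, cancel := csot.WithTimeout(ctx, b.db.Client().Timeout())
defer cancel()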

Rename maxTimeMS for cursor to maxAwaitTimeMS

There is still a need to maintain the maxTimeMS logic for tailable awaitData cursors. This PR proposes renaming the code symbol from MaxTime to MaxAwaitTime to match the operation options (e.g. FindOptions.MaxAwaitTime) and the CSOT specifications. This logic has been deferred to the operation level via Operation.DefaultWTimeout.
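
For context, the matching options-level setter is used like this on a tailable awaitData cursor (a usage sketch against the public options API):

// maxAwaitTimeMS bounds how long each getMore waits for new documents.
opts := options.Find().
	SetCursorType(options.TailableAwait).
	SetMaxAwaitTime(5 * time.Second)

cursor, err := coll.Find(ctx, bson.D{}, opts)
if err != nil {
	panic(err)
}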

CSOT-Specific Changes

Write Concern

This PR proposes removing the WTimeout field from the WriteConcern object. It is worth noting that there is still a use case for sending wire messages with the wtimeout option set. Specifically, while committing a transaction:

if the modified write concern does not include a wtimeout value, drivers MUST also apply wtimeout: 10000 to the write concern in order to avoid waiting forever (or until a socket timeout) if the majority write concern cannot be satisfied.

The Go Driver organizes this logic by maintaining a copy of the transaction write concern on the client session. In this case, given a CSOT, the remainder of that time is used as the wtimeout value rather than the 10000 ms default.
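
A minimal sketch of that computation (illustrative names; not the driver's actual internals):

// Prefer the remaining CSOT budget over the spec-mandated 10s default.
wTimeout := 10 * time.Second
if deadline, ok := ctx.Deadline(); ok {
	wTimeout = time.Until(deadline)
}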

Server Selection

client-side-operations-timeout/client-side-operations-timeout.md#server-selection

The server selection portion of the specification requires a comparison between timeoutMS and serverSelectionTimeoutMS. It also requires that the remaining timeout be passed to any follow-up operation establishing or checking out a connection:

ctx, cancel := csot.WithServerSelectionTimeout(ctx, serverSelectionTimeout)
defer cancel()

server := SelectServer(ctx)
conn := server.Connection(ctx)

This workflow proposes building the context in a way that is decoupled from SelectServer and server.Connection.

Because server selection is made through a topology.Deployment interface, there is no obvious way to extract the serverSelectionTimeoutMS value for use by WithServerSelectionTimeout. Practically, the only implementation of topology.Deployment that accesses the SSTO data from options.ClientOptions is topology.Topology, so we could assert that implementation before calculating the timeout. However, for extensibility the current proposal is to use an interface:

type ServerSelectionTimeoutGetter interface {
	GetServerSelectionTimeout() time.Duration
}

func WithServerSelectionTimeout(
	parent context.Context, 
	serverSelectionTimeout time.Duration,
) (context.Context, context.CancelFunc) {
	...
}

func main() {
	dep := newDeployment()

	ctx, cancel := WithServerSelectionTimeout(context.Background(), dep.GetServerSelectionTimeout())
	defer cancel()

	server := SelectServer(ctx)
	conn := server.Connection(ctx)
	
	...
}

This would extend the topology.Deployment interface with the additional method GetServerSelectionTimeout().

Command Execution

client-side-operations-timeout/client-side-operations-timeout.md#command-execution

The original implementation of CSOT did not apply a context deadline to the maxTimeMS value. See here for the original logic. This PR proposes the following workflow to be spec compliant:

[Screenshot: proposed command-execution workflow diagram, 2024-02-20]
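
In code terms, the rule is roughly the following (a sketch of the spec requirement; calcMaxTimeMS and minRTT are illustrative names, not this PR's actual implementation):

// Derive maxTimeMS from the context deadline, reserving the minimum observed
// RTT so the server-side timeout fires before the client-side one.
func calcMaxTimeMS(ctx context.Context, minRTT time.Duration) (int64, error) {
	deadline, ok := ctx.Deadline()
	if !ok {
		return 0, nil // no CSOT deadline: omit maxTimeMS entirely
	}
	remaining := time.Until(deadline) - minRTT
	if remaining <= 0 {
		// Not enough budget left to reach the server; fail client-side.
		return 0, context.DeadlineExceeded
	}
	return int64(remaining / time.Millisecond), nil
}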

Change Streams

client-side-operations-timeout/client-side-operations-timeout.md#change-streams

GridFS API

client-side-operations-timeout/client-side-operations-timeout.md#gridfs-api

The specifications note that all methods in the GridFS bucket API must support the timeoutMS option. It's worth pointing out that all blocking operations within the scope of the database used to construct a GridFS bucket will inherit the client-level timeout value.

Additionally, the specifications require that the timeoutMS option cap the lifetime of the entire stream. The Go Driver currently only ensures this when a deadline is set on the context; if relying on the client-level timeout, the value is "refreshed" with every new blocking operation.
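
For example, a single context deadline caps the entire download regardless of how many blocking reads it takes (a usage sketch assuming a GridFS API whose OpenDownloadStream accepts a Context; bucket and fileID are placeholders):

// One deadline for the whole stream lifetime, not per blocking call.
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
defer cancel()

stream, err := bucket.OpenDownloadStream(ctx, fileID)
if err != nil {
	panic(err)
}
defer stream.Close()

if _, err := io.Copy(io.Discard, stream); err != nil {
	panic(err)
}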

Sessions

client-side-operations-timeout/client-side-operations-timeout.md#convenient-transaction-api

Propose implementing the convenient transaction API requirements in GODRIVER-2367, since the context behavior for convenient transactions is still being debated.

Background & Motivation

v1 required deprecating a set of legacy timeout options, gating their use behind the CSOT feature gate to avoid a backward-breaking change. For v2, we can remove these legacy timeouts altogether.

@prestonvasquez changed the base branch from v1 to master on January 11, 2024

mongodb-drivers-pr-bot bot commented Jan 16, 2024

API Change Report

./mongo

incompatible changes

(*Cursor).SetMaxTime: removed

compatible changes

(*Cursor).SetMaxAwaitTime: added

./mongo/options

incompatible changes

(*AggregateOptions).SetMaxTime: removed
(*ClientOptions).SetSocketTimeout: removed
(*CountOptions).SetMaxTime: removed
(*CreateIndexesOptions).SetMaxTime: removed
(*DistinctOptions).SetMaxTime: removed
(*DropIndexesOptions).SetMaxTime: removed
(*EstimatedDocumentCountOptions).SetMaxTime: removed
(*FindOneAndDeleteOptions).SetMaxTime: removed
(*FindOneAndReplaceOptions).SetMaxTime: removed
(*FindOneAndUpdateOptions).SetMaxTime: removed
(*FindOneOptions).SetMaxTime: removed
(*FindOptions).SetMaxTime: removed
(*GridFSFindOptions).SetMaxTime: removed
(*ListIndexesOptions).SetMaxTime: removed
(*SessionOptions).SetDefaultMaxCommitTime: removed
(*TransactionOptions).SetMaxCommitTime: removed
AggregateOptions.MaxTime: removed
ClientOptions.SocketTimeout: removed
CountOptions.MaxTime: removed
CreateIndexesOptions.MaxTime: removed
DistinctOptions.MaxTime: removed
DropIndexesOptions.MaxTime: removed
EstimatedDocumentCountOptions.MaxTime: removed
FindOneAndDeleteOptions.MaxTime: removed
FindOneAndReplaceOptions.MaxTime: removed
FindOneAndUpdateOptions.MaxTime: removed
FindOneOptions.MaxTime: removed
FindOptions.MaxTime: removed
GridFSFindOptions.MaxTime: removed
ListIndexesOptions.MaxTime: removed
SessionOptions.DefaultMaxCommitTime: removed
TransactionOptions.MaxCommitTime: removed

./mongo/writeconcern

incompatible changes

WriteConcern.WTimeout: removed

./x/mongo/driver

incompatible changes

(*BatchCursor).SetMaxTime: removed
CursorOptions.MaxTimeMS: removed
Deployment.GetServerSelectionTimeout: added
ErrNegativeMaxTime: removed
Operation.MaxTime: removed

compatible changes

(*BatchCursor).SetMaxAwaitTime: added
(*CursorOptions).SetMaxAwaitTime: added
CursorOptions.MaxAwaitTime: added
Operation.OmitMaxTimeMS: added
SingleConnectionDeployment.GetServerSelectionTimeout: added
SingleServerDeployment.GetServerSelectionTimeout: added

./x/mongo/driver/connstring

incompatible changes

ConnString.WTimeout: removed
ConnString.WTimeoutSet: removed
ConnString.WTimeoutSetFromOption: removed

./x/mongo/driver/operation

incompatible changes

(*Aggregate).MaxTime: removed
(*CommitTransaction).MaxTime: removed
(*Count).MaxTime: removed
(*CreateIndexes).MaxTime: removed
(*Distinct).MaxTime: removed
(*DropIndexes).MaxTime: removed
(*Find).MaxTime: removed
(*FindAndModify).MaxTime: removed
(*ListIndexes).MaxTime: removed

compatible changes

(*Hello).OmitMaxTimeMS: added

./x/mongo/driver/session

incompatible changes

Client.CurrentMct: removed
ClientOptions.DefaultMaxCommitTime: removed
TransactionOptions.MaxCommitTime: removed

compatible changes

Client.CurrentWTimeout: added

./x/mongo/driver/topology

incompatible changes

ConnectServer: changed from func(./mongo/address.Address, updateTopologyCallback, ./bson.ObjectID, ...ServerOption) (*Server, error) to func(./mongo/address.Address, updateTopologyCallback, ./bson.ObjectID, time.Duration, ...ServerOption) (*Server, error)
ErrServerSelectionTimeout: removed
NewServer: changed from func(./mongo/address.Address, ./bson.ObjectID, ...ServerOption) *Server to func(./mongo/address.Address, ./bson.ObjectID, time.Duration, ...ServerOption) *Server
ServerAPIFromServerOptions: changed from func([]ServerOption) *./x/mongo/driver.ServerAPIOptions to func(time.Duration, []ServerOption) *./x/mongo/driver.ServerAPIOptions
WithConnectTimeout: removed
WithHeartbeatTimeout: removed
WithReadTimeout: removed
WithWriteTimeout: removed

compatible changes

(*Topology).GetServerSelectionTimeout: added
Config.ConnectTimeout: added
Config.Timeout: added

Comment on lines 154 to 157
// validChangeStreamTimeouts will return "false" if maxAwaitTimeMS is set,
// timeoutMS is set to a non-zero value, and maxAwaitTimeMS is greater than or
// equal to timeoutMS. Otherwise, the timeouts are valid.
func validChangeStreamTimeouts(ctx context.Context, maxAwaitTime, timeout *time.Duration) bool {
Collaborator commented:

This validation does not make sense in the Go Driver. I believe it's based on this requirement from the CSOT spec:

Drivers MUST also apply the original timeoutMS value to each next call on the resulting cursor

However, the Go Driver requires passing a Context to each call to Next or TryNext, so the timeout passed to the initial Watch has no bearing on those future getMore call timeouts.

We should remove this validation. However, we may want to apply an analogous validation when calling Next or TryNext if the Context timeouts for those are less than maxAwaitTimeMS.
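
An analogous check at iteration time might look like the following (purely illustrative; not part of this PR):

// Hypothetical validation when Next/TryNext is called with its own Context:
if deadline, ok := ctx.Deadline(); ok && maxAwaitTime != nil &&
	*maxAwaitTime >= time.Until(deadline) {
	return errors.New("maxAwaitTimeMS must be less than the remaining context timeout")
}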

Collaborator Author commented:

This validation is based on this requirement from the CSOT specs:

Drivers MUST error if maxAwaitTimeMS is set, timeoutMS is set to a non-zero value, and maxAwaitTimeMS is greater than or equal to timeoutMS.

Collaborator commented:

Correct, that's the timeout validation that's implemented here. However, since the Go Driver doesn't do this:

Drivers MUST also apply the original timeoutMS value to each next call on the resulting cursor

we also shouldn't do this:

Drivers MUST error if maxAwaitTimeMS is set, timeoutMS is set to a non-zero value, and maxAwaitTimeMS is greater than or equal to timeoutMS.

because that validation only makes sense in drivers that use the timeout passed to Find as the timeout for Next. In the Go Driver, users pass separate timeouts to Watch and Next.

For example, the following code would cause a validation error even though there is no client-side timeout on Next (i.e. no conflict with maxAwaitTimeMS):

opts := options.ChangeStream().SetMaxAwaitTime(10 * time.Second)

ctx, _ := context.WithTimeout(context.Background(), 1 * time.Second)
cs, err := coll.Watch(ctx, mongo.Pipeline{}, opts)
if err != nil {
	panic(err) // panics with "maxAwaitTimeMS cannot be greater than or equal to timeoutMS"
}

for cs.Next(context.Background()) {
	// ...
}

Additionally, the following code would not cause a validation error, even though the Next timeout is much lower than maxAwaitTimeMS:

opts := options.ChangeStream().SetMaxAwaitTime(10 * time.Second)

cs, err := coll.Watch(context.Background(), mongo.Pipeline{}, opts)
if err != nil {
	panic(err)
}

ctx, _ := context.WithTimeout(context.Background(), 1 * time.Second)
for cs.Next(ctx) {
	// Next possibly times out before the server responds.
}

Collaborator Author commented:

Drivers MUST also apply the original timeoutMS value to each next call on the resulting cursor

This is part of the proposal for this PR, see L712.

Collaborator commented:

Based on additional discussion, I believe we decided to apply the Watch and Next/TryNext timeouts independently (both still using the client-level timeout when no operation-level timeout is set). As long as that's accurate, we can consider this comment resolved.

Comment on lines 119 to 120
heartbeatLock sync.Mutex
conn *connection
Collaborator commented:

The use of heartbeatLock to guard conn is very difficult to reason about and seems inconsistent. I realize that many of the uses of heartbeatLock were not added in this PR, but the updates here highlight the existing problems.

Is there a way to change the heartbeat logic to guarantee that conn is only accessed by a single goroutine? The only concurrent access seems to come from cancelCheck. Can we use checkCanceled as an indicator that we need to cancel the check and create a new conn instead of accessing conn directly? If not, is there a way to simplify the access pattern so it's easier to understand?

@@ -204,7 +233,8 @@ func newChangeStream(ctx context.Context, config changeStreamConfig, pipeline in
cs.cursorOptions.BatchSize = *cs.options.BatchSize
}
if cs.options.MaxAwaitTime != nil {
cs.cursorOptions.MaxTimeMS = int64(*cs.options.MaxAwaitTime / time.Millisecond)
fmt.Println("max await time: ", cs.options.MaxAwaitTime)
Collaborator commented:

Remove debugging statement.

Suggested change (remove the line):
fmt.Println("max await time: ", cs.options.MaxAwaitTime)

mongo/client.go
Comment on lines 183 to 185
if to := clientOpt.Timeout; to != nil && *to < 0 {
return nil, fmt.Errorf(`invalid value %q for "Timeout": value must be positive`, *to)
}
Collaborator commented:

Should this check be in ClientOptions.validate instead?

var handshakeCtx context.Context
handshakeCtx, c.cancelConnectContext = context.WithCancel(ctx)
c.connectContextMutex.Unlock()
ctx, cancel := contextWithNewConnTimeout(ctx, c)
Collaborator commented:

I'm confused about the need for this. It seems like all callers of connect already apply connectTimeout to the context passed to connect. Is there another reason we need to add the connectTimeout here, too?

@matthewdale (Collaborator) left a comment:

There are a few small fixes required, but the overall CSOT behavior here looks good!

Comment on lines -609 to 661
desc, err := s.check()
desc, err := checkServerWithSignal(s, s.conn, s.heartbeatListener)
if errors.Is(err, errCheckCancelled) {
Collaborator commented:

We should do something with this error if it's not errCheckCancelled. What should we do if there's some other network or server error?

Collaborator Author commented:

We never did anything with it beforehand. Do you have any suggestions?

if cancelFn != nil {
cancelFn()
}
<-c.cancelConnSig
Collaborator commented:

cancelConnSig is never initialized, so this receive will block forever. Also, if closeConnectContext is never called, this will leak a goroutine. I suggest two changes:

  • Initialize cancelConnSig with make(chan struct{}, 1) in newConnection. Using a buffered channel resolves race conditions between starting this goroutine and calling closeConnectContext.
  • Add defer conn.closeConnectContext() to conn.connect so we don't leak this goroutine.

Optional: Consider moving the following code into the beginning of the connection.close method to reduce the chances for bugs when closing a connection:

func (c *connection) close() error {
	c.closeConnectContext()
	c.wait()

	// ...
}

In the case the connection is already connected, those calls are idempotent and shouldn't block. That would allow us to make all usage of the cancelConnSig internal to the connection type.
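
For instance, the suggested initialization might look like this (a sketch; the constructor shape is assumed):

// Sketch of the suggested fix in newConnection.
func newConnection( /* ... */ ) *connection {
	return &connection{
		// Buffered so the cancellation goroutine's send never blocks, avoiding
		// a race between starting it and calling closeConnectContext.
		cancelConnSig: make(chan struct{}, 1),
	}
}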

Collaborator Author commented:

Good catch! This pattern seems fine, except that we won't be able to wait for the connection to be done before closing it; otherwise close() will block indefinitely whenever there is a timeout while performing a handshake with the server.

Labels: priority-2-medium (Medium Priority PR for Review)
3 participants