Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pickerwrapper: use atomic instead of locks #7214

Merged
merged 2 commits into from
May 16, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions balancer_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,10 @@ func (ccb *ccBalancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resol
func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) {
ccb.cc.mu.Lock()
defer ccb.cc.mu.Unlock()
if ccb.cc.conns == nil {
// The CC has been closed; ignore this update.
return
}

ccb.mu.Lock()
if ccb.closed {
Expand Down
78 changes: 36 additions & 42 deletions picker_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"context"
"fmt"
"io"
"sync"
"sync/atomic"

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/codes"
Expand All @@ -33,35 +33,42 @@ import (
"google.golang.org/grpc/status"
)

// pickerGeneration stores a picker and a channel used to signal that a picker
// newer than this one is available.
type pickerGeneration struct {
// picker is the picker produced by the LB policy. May be nil if a picker
// has never been produced.
picker balancer.Picker
// blockingCh is closed when the picker has been invalidated because there
// is a new one available.
blockingCh chan struct{}
}

// pickerWrapper is a wrapper of balancer.Picker. It blocks on certain pick
// actions and unblock when there's a picker update.
type pickerWrapper struct {
mu sync.Mutex
done bool
blockingCh chan struct{}
picker balancer.Picker
// If pickerGen holds a nil pointer, the pickerWrapper is closed.
pickerGen atomic.Pointer[pickerGeneration]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wohooo, generics!!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like atomic.Pointer a lot. atomic's unsafe.Pointer operations were terrible.

statsHandlers []stats.Handler // to record blocking picker calls
}

func newPickerWrapper(statsHandlers []stats.Handler) *pickerWrapper {
return &pickerWrapper{
blockingCh: make(chan struct{}),
pw := &pickerWrapper{
statsHandlers: statsHandlers,
}
pw.pickerGen.Store(&pickerGeneration{
blockingCh: make(chan struct{}),
})
return pw
}

// updatePicker is called by UpdateBalancerState. It unblocks all blocked pick.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While you are here, could you please update this comment to say UpdateState instead of UpdateBalancerState. Thanks.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clarified that it comes from the LB policy since we have 16 different UpdateStates.

func (pw *pickerWrapper) updatePicker(p balancer.Picker) {
pw.mu.Lock()
if pw.done {
pw.mu.Unlock()
return
}
pw.picker = p
// pw.blockingCh should never be nil.
close(pw.blockingCh)
pw.blockingCh = make(chan struct{})
pw.mu.Unlock()
old := pw.pickerGen.Swap(&pickerGeneration{
picker: p,
blockingCh: make(chan struct{}),
})
close(old.blockingCh)
}

// doneChannelzWrapper performs the following:
Expand Down Expand Up @@ -98,20 +105,17 @@ func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.
var lastPickErr error

for {
pw.mu.Lock()
if pw.done {
pw.mu.Unlock()
pg := pw.pickerGen.Load()
if pg == nil {
return nil, balancer.PickResult{}, ErrClientConnClosing
}

if pw.picker == nil {
ch = pw.blockingCh
if pg.picker == nil {
ch = pg.blockingCh
}
if ch == pw.blockingCh {
if ch == pg.blockingCh {
// This could happen when either:
// - pw.picker is nil (the previous if condition), or
// - has called pick on the current picker.
pw.mu.Unlock()
// - we have already called pick on the current picker.
select {
case <-ctx.Done():
var errStr string
Expand Down Expand Up @@ -145,9 +149,8 @@ func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.
}
}

ch = pw.blockingCh
p := pw.picker
pw.mu.Unlock()
ch = pg.blockingCh
p := pg.picker

pickResult, err := p.Pick(info)
if err != nil {
Expand Down Expand Up @@ -197,24 +200,15 @@ func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.
}

func (pw *pickerWrapper) close() {
pw.mu.Lock()
defer pw.mu.Unlock()
if pw.done {
return
}
pw.done = true
close(pw.blockingCh)
old := pw.pickerGen.Swap(nil)
close(old.blockingCh)
}

// reset clears the pickerWrapper and prepares it for being used again when idle
// mode is exited.
func (pw *pickerWrapper) reset() {
pw.mu.Lock()
defer pw.mu.Unlock()
if pw.done {
return
}
pw.blockingCh = make(chan struct{})
old := pw.pickerGen.Swap(&pickerGeneration{blockingCh: make(chan struct{})})
close(old.blockingCh)
}

// dropError is a wrapper error that indicates the LB policy wishes to drop the
Expand Down