Skip to content

Commit

Permalink
feat(retry): add ctx param for customized result retry funcs (#1353)
Browse files Browse the repository at this point in the history
  • Loading branch information
YangruiEmma committed May 23, 2024
1 parent a11b1c7 commit 551e2b2
Show file tree
Hide file tree
Showing 9 changed files with 408 additions and 42 deletions.
2 changes: 1 addition & 1 deletion client/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ func WithRetryMethodPolicies(mp map[string]retry.Policy) Option {
// But if your retry policy is enabled by remote config, WithSpecifiedResultRetry is useful.
func WithSpecifiedResultRetry(rr *retry.ShouldResultRetry) Option {
return Option{F: func(o *client.Options, di *utils.Slice) {
if rr == nil || (rr.RespRetry == nil && rr.ErrorRetry == nil) {
if rr == nil || !rr.IsValid() {
panic(fmt.Errorf("WithSpecifiedResultRetry: invalid '%+v'", rr))
}
di.Push(fmt.Sprintf("WithSpecifiedResultRetry(%+v)", rr))
Expand Down
12 changes: 6 additions & 6 deletions client/option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,40 +57,40 @@ func TestRetryOptionDebugInfo(t *testing.T) {
fp.WithDDLStop()
expectPolicyStr := "WithFailureRetry({StopPolicy:{MaxRetryTimes:2 MaxDurationMS:0 DisableChainStop:false DDLStop:true " +
"CBPolicy:{ErrorRate:0.1}} BackOffPolicy:&{BackOffType:none CfgItems:map[]} RetrySameNode:false ShouldResultRetry:{ErrorRetry:false, RespRetry:false}})"
policyStr := fmt.Sprintf("WithFailureRetry(%+v)", *fp)
policyStr := fmt.Sprintf("WithFailureRetry(%+v)", fp)
test.Assert(t, policyStr == expectPolicyStr, policyStr)

fp.WithFixedBackOff(10)
expectPolicyStr = "WithFailureRetry({StopPolicy:{MaxRetryTimes:2 MaxDurationMS:0 DisableChainStop:false DDLStop:true " +
"CBPolicy:{ErrorRate:0.1}} BackOffPolicy:&{BackOffType:fixed CfgItems:map[fix_ms:10]} RetrySameNode:false ShouldResultRetry:{ErrorRetry:false, RespRetry:false}})"
policyStr = fmt.Sprintf("WithFailureRetry(%+v)", *fp)
policyStr = fmt.Sprintf("WithFailureRetry(%+v)", fp)
test.Assert(t, policyStr == expectPolicyStr, policyStr)

fp.WithRandomBackOff(10, 20)
fp.DisableChainRetryStop()
expectPolicyStr = "WithFailureRetry({StopPolicy:{MaxRetryTimes:2 MaxDurationMS:0 DisableChainStop:true DDLStop:true " +
"CBPolicy:{ErrorRate:0.1}} BackOffPolicy:&{BackOffType:random CfgItems:map[max_ms:20 min_ms:10]} RetrySameNode:false ShouldResultRetry:{ErrorRetry:false, RespRetry:false}})"
policyStr = fmt.Sprintf("WithFailureRetry(%+v)", *fp)
policyStr = fmt.Sprintf("WithFailureRetry(%+v)", fp)
test.Assert(t, policyStr == expectPolicyStr, policyStr)

fp.WithRetrySameNode()
expectPolicyStr = "WithFailureRetry({StopPolicy:{MaxRetryTimes:2 MaxDurationMS:0 DisableChainStop:true DDLStop:true " +
"CBPolicy:{ErrorRate:0.1}} BackOffPolicy:&{BackOffType:random CfgItems:map[max_ms:20 min_ms:10]} RetrySameNode:true ShouldResultRetry:{ErrorRetry:false, RespRetry:false}})"
policyStr = fmt.Sprintf("WithFailureRetry(%+v)", *fp)
policyStr = fmt.Sprintf("WithFailureRetry(%+v)", fp)
test.Assert(t, policyStr == expectPolicyStr, policyStr)

fp.WithSpecifiedResultRetry(&retry.ShouldResultRetry{ErrorRetry: func(err error, ri rpcinfo.RPCInfo) bool {
return false
}})
expectPolicyStr = "WithFailureRetry({StopPolicy:{MaxRetryTimes:2 MaxDurationMS:0 DisableChainStop:true DDLStop:true " +
"CBPolicy:{ErrorRate:0.1}} BackOffPolicy:&{BackOffType:random CfgItems:map[max_ms:20 min_ms:10]} RetrySameNode:true ShouldResultRetry:{ErrorRetry:true, RespRetry:false}})"
policyStr = fmt.Sprintf("WithFailureRetry(%+v)", *fp)
policyStr = fmt.Sprintf("WithFailureRetry(%+v)", fp)
test.Assert(t, policyStr == expectPolicyStr, policyStr)

bp := retry.NewBackupPolicy(20)
expectPolicyStr = "WithBackupRequest({RetryDelayMS:20 StopPolicy:{MaxRetryTimes:1 MaxDurationMS:0 DisableChainStop:false " +
"DDLStop:false CBPolicy:{ErrorRate:0.1}} RetrySameNode:false})"
policyStr = fmt.Sprintf("WithBackupRequest(%+v)", *bp)
policyStr = fmt.Sprintf("WithBackupRequest(%+v)", bp)
test.Assert(t, policyStr == expectPolicyStr, policyStr)
WithBackupRequest(bp)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/retry/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,6 @@ func (p *BackupPolicy) WithRetrySameNode() {
}

// String is used to print human readable debug info.
func (p BackupPolicy) String() string {
func (p *BackupPolicy) String() string {
return fmt.Sprintf("{RetryDelayMS:%+v StopPolicy:%+v RetrySameNode:%+v}", p.RetryDelayMS, p.StopPolicy, p.RetrySameNode)
}
2 changes: 1 addition & 1 deletion pkg/retry/backup_retryer.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func (r *backupRetryer) UpdatePolicy(rp Policy) (err error) {
}

// AppendErrMsgIfNeeded implements the Retryer interface.
func (r *backupRetryer) AppendErrMsgIfNeeded(err error, ri rpcinfo.RPCInfo, msg string) {
func (r *backupRetryer) AppendErrMsgIfNeeded(ctx context.Context, err error, ri rpcinfo.RPCInfo, msg string) {
if kerrors.IsTimeoutError(err) {
// Add additional reason to the error message when timeout occurs but the backup request is not sent.
appendErrMsg(err, msg)
Expand Down
5 changes: 3 additions & 2 deletions pkg/retry/failure.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package retry

import (
"context"
"fmt"
"time"

Expand Down Expand Up @@ -112,14 +113,14 @@ func (p *FailurePolicy) WithSpecifiedResultRetry(rr *ShouldResultRetry) {
}

// String prints human readable information.
func (p FailurePolicy) String() string {
func (p *FailurePolicy) String() string {
return fmt.Sprintf("{StopPolicy:%+v BackOffPolicy:%+v RetrySameNode:%+v "+
"ShouldResultRetry:{ErrorRetry:%t, RespRetry:%t}}", p.StopPolicy, p.BackOffPolicy, p.RetrySameNode, p.IsErrorRetryNonNil(), p.IsRespRetryNonNil())
}

// AllErrorRetry is common choice for ShouldResultRetry.
func AllErrorRetry() *ShouldResultRetry {
return &ShouldResultRetry{ErrorRetry: func(err error, ri rpcinfo.RPCInfo) bool {
return &ShouldResultRetry{ErrorRetryWithCtx: func(ctx context.Context, err error, ri rpcinfo.RPCInfo) bool {
return err != nil
}}
}
Expand Down
34 changes: 22 additions & 12 deletions pkg/retry/failure_retryer.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func (r *failureRetryer) Do(ctx context.Context, rpcCall RPCCallFunc, firstRI rp
circuitbreak.RecordStat(ctx, req, nil, err, cbKey, r.cbContainer.cbCtl, r.cbContainer.cbPanel)
}
if err == nil {
if r.policy.IsRespRetryNonNil() && r.policy.ShouldResultRetry.RespRetry(resp, cRI) {
if r.policy.IsRespRetry(ctx, resp, cRI) {
// user specified resp to do retry
continue
}
Expand All @@ -147,7 +147,7 @@ func (r *failureRetryer) Do(ctx context.Context, rpcCall RPCCallFunc, firstRI rp
if i == retryTimes {
// stop retry then wrap error
err = kerrors.ErrRetry.WithCause(err)
} else if !r.isRetryErr(err, cRI) {
} else if !r.isRetryErr(ctx, err, cRI) {
// not timeout or user specified error won't do retry
break
}
Expand Down Expand Up @@ -204,8 +204,8 @@ func (r *failureRetryer) UpdatePolicy(rp Policy) (err error) {
}

// AppendErrMsgIfNeeded implements the Retryer interface.
func (r *failureRetryer) AppendErrMsgIfNeeded(err error, ri rpcinfo.RPCInfo, msg string) {
if r.isRetryErr(err, ri) {
func (r *failureRetryer) AppendErrMsgIfNeeded(ctx context.Context, err error, ri rpcinfo.RPCInfo, msg string) {
if r.isRetryErr(ctx, err, ri) {
// Add additional reason when retry is not applied.
appendErrMsg(err, msg)
}
Expand All @@ -216,7 +216,7 @@ func (r *failureRetryer) Prepare(ctx context.Context, prevRI, retryRI rpcinfo.RP
handleRetryInstance(r.policy.RetrySameNode, prevRI, retryRI)
}

func (r *failureRetryer) isRetryErr(err error, ri rpcinfo.RPCInfo) bool {
func (r *failureRetryer) isRetryErr(ctx context.Context, err error, ri rpcinfo.RPCInfo) bool {
if err == nil {
return false
}
Expand All @@ -228,7 +228,7 @@ func (r *failureRetryer) isRetryErr(err error, ri rpcinfo.RPCInfo) bool {
if r.policy.IsRetryForTimeout() && kerrors.IsTimeoutError(err) {
return true
}
if r.policy.IsErrorRetryNonNil() && r.policy.ShouldResultRetry.ErrorRetry(err, ri) {
if r.policy.IsErrorRetry(ctx, err, ri) {
return true
}
return false
Expand Down Expand Up @@ -283,8 +283,11 @@ func (r *failureRetryer) Dump() map[string]interface{} {
dm["failure_retry"] = r.policy
if r.policy != nil {
dm["specified_result_retry"] = map[string]bool{
"error_retry": r.policy.IsErrorRetryNonNil(),
"resp_retry": r.policy.IsRespRetryNonNil(),
"error_retry": r.policy.IsErrorRetryWithCtxNonNil(),
"resp_retry": r.policy.IsRespRetryWithCtxNonNil(),
// keep it for some versions to confirm the correctness when troubleshooting
"old_error_retry": r.policy.IsErrorRetryNonNil(),
"old_resp_retry": r.policy.IsRespRetryNonNil(),
}
}
if r.errMsg != "" {
Expand All @@ -298,9 +301,16 @@ func (r *failureRetryer) setSpecifiedResultRetryIfNeeded(rr *ShouldResultRetry)
// save the object specified by client.WithSpecifiedResultRetry(..)
r.specifiedResultRetry = rr
}
if r.policy != nil && r.specifiedResultRetry != nil {
// The priority of client.WithSpecifiedResultRetry(..) is higher, so always update it
// NOTE: client.WithSpecifiedResultRetry(..) will always reject a nil object
r.policy.ShouldResultRetry = r.specifiedResultRetry
if r.policy != nil {
if r.specifiedResultRetry != nil {
// The priority of client.WithSpecifiedResultRetry(..) is higher, so always update it
// NOTE: client.WithSpecifiedResultRetry(..) will always reject a nil object
r.policy.ShouldResultRetry = r.specifiedResultRetry
}

// even though rr passed from this func is nil,
// the Policy may also have ShouldResultRetry from client.WithFailureRetry or callopt.WithRetryPolicy.
// convertResultRetry is used to convert 'ErrorRetry and RespRetry' to 'ErrorRetryWithCtx and RespRetryWithCtx'
r.policy.ConvertResultRetry()
}
}
89 changes: 73 additions & 16 deletions pkg/retry/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package retry

import (
"context"
"fmt"

"github.com/cloudwego/kitex/pkg/rpcinfo"
Expand Down Expand Up @@ -95,21 +96,6 @@ type FailurePolicy struct {
Extra string `json:"extra"`
}

// IsRespRetryNonNil is used to check if RespRetry is nil
func (p FailurePolicy) IsRespRetryNonNil() bool {
return p.ShouldResultRetry != nil && p.ShouldResultRetry.RespRetry != nil
}

// IsErrorRetryNonNil is used to check if ErrorRetry is nil
func (p FailurePolicy) IsErrorRetryNonNil() bool {
return p.ShouldResultRetry != nil && p.ShouldResultRetry.ErrorRetry != nil
}

// IsRetryForTimeout is used to check if timeout error need to retry
func (p FailurePolicy) IsRetryForTimeout() bool {
return p.ShouldResultRetry == nil || !p.ShouldResultRetry.NotRetryForTimeout
}

// BackupPolicy for backup request
// DON'T FORGET to update Equals() and DeepCopy() if you add new fields
type BackupPolicy struct {
Expand Down Expand Up @@ -168,8 +154,16 @@ const (

// ShouldResultRetry is used for specifying which error or resp need to be retried
type ShouldResultRetry struct {
// ErrorRetryWithCtx is added in v0.10.0, passing ctx is more convenient for user
ErrorRetryWithCtx func(ctx context.Context, err error, ri rpcinfo.RPCInfo) bool
// RespRetryWithCtx is added in v0.10.0, passing ctx is more convenient for user
RespRetryWithCtx func(ctx context.Context, resp interface{}, ri rpcinfo.RPCInfo) bool

// Deprecated: please use ErrorRetryWithCtx instead of ErrorRetry
ErrorRetry func(err error, ri rpcinfo.RPCInfo) bool
RespRetry func(resp interface{}, ri rpcinfo.RPCInfo) bool
// Deprecated: please use RespRetryWithCtx instead of RespRetry
RespRetry func(resp interface{}, ri rpcinfo.RPCInfo) bool

// disable the default timeout retry in specific scenarios (e.g. the requests are not non-idempotent)
NotRetryForTimeout bool
}
Expand Down Expand Up @@ -229,6 +223,65 @@ func (p *FailurePolicy) DeepCopy() *FailurePolicy {
}
}

// IsRespRetryWithCtxNonNil is used to check if RespRetryWithCtx is nil.
func (p *FailurePolicy) IsRespRetryWithCtxNonNil() bool {
return p.ShouldResultRetry != nil && p.ShouldResultRetry.RespRetryWithCtx != nil
}

// IsErrorRetryWithCtxNonNil is used to check if ErrorRetryWithCtx is nil
func (p *FailurePolicy) IsErrorRetryWithCtxNonNil() bool {
return p.ShouldResultRetry != nil && p.ShouldResultRetry.ErrorRetryWithCtx != nil
}

// IsRespRetryNonNil is used to check if RespRetry is nil.
// Deprecated: please use IsRespRetryWithCtxNonNil instead of IsRespRetryNonNil.
func (p *FailurePolicy) IsRespRetryNonNil() bool {
return p.ShouldResultRetry != nil && p.ShouldResultRetry.RespRetry != nil
}

// IsErrorRetryNonNil is used to check if ErrorRetry is nil.
// Deprecated: please use IsErrorRetryWithCtxNonNil instead of IsErrorRetryNonNil.
func (p *FailurePolicy) IsErrorRetryNonNil() bool {
return p.ShouldResultRetry != nil && p.ShouldResultRetry.ErrorRetry != nil
}

// IsRetryForTimeout is used to check if timeout error need to retry
func (p *FailurePolicy) IsRetryForTimeout() bool {
return p.ShouldResultRetry == nil || !p.ShouldResultRetry.NotRetryForTimeout
}

// IsRespRetry is used to check if the resp need to do retry.
func (p *FailurePolicy) IsRespRetry(ctx context.Context, resp interface{}, ri rpcinfo.RPCInfo) bool {
// note: actually, it is better to check IsRespRetry to ignore the bad cases,
// but IsRespRetry is a deprecated definition and here will be executed for every call, depends on ConvertResultRetry to ensure the compatibility
return p.IsRespRetryWithCtxNonNil() && p.ShouldResultRetry.RespRetryWithCtx(ctx, resp, ri)
}

// IsErrorRetry is used to check if the error need to do retry.
func (p *FailurePolicy) IsErrorRetry(ctx context.Context, err error, ri rpcinfo.RPCInfo) bool {
// note: actually, it is better to check IsErrorRetry to ignore the bad cases,
// but IsErrorRetry is a deprecated definition and here will be executed for every call, depends on ConvertResultRetry to ensure the compatibility
return p.IsErrorRetryWithCtxNonNil() && p.ShouldResultRetry.ErrorRetryWithCtx(ctx, err, ri)
}

// ConvertResultRetry is used to convert 'ErrorRetry and RespRetry' to 'ErrorRetryWithCtx and RespRetryWithCtx'
func (p *FailurePolicy) ConvertResultRetry() {
if p == nil || p.ShouldResultRetry == nil {
return
}
rr := p.ShouldResultRetry
if rr.ErrorRetry != nil && rr.ErrorRetryWithCtx == nil {
rr.ErrorRetryWithCtx = func(ctx context.Context, err error, ri rpcinfo.RPCInfo) bool {
return rr.ErrorRetry(err, ri)
}
}
if rr.RespRetry != nil && rr.RespRetryWithCtx == nil {
rr.RespRetryWithCtx = func(ctx context.Context, resp interface{}, ri rpcinfo.RPCInfo) bool {
return rr.RespRetry(resp, ri)
}
}
}

// Equals to check if BackupPolicy is equal
func (p *BackupPolicy) Equals(np *BackupPolicy) bool {
if p == nil {
Expand Down Expand Up @@ -305,6 +358,10 @@ func (p *BackOffPolicy) copyCfgItems() map[BackOffCfgKey]float64 {
return cfgItems
}

func (rr *ShouldResultRetry) IsValid() bool {
return rr.ErrorRetryWithCtx != nil || rr.RespRetryWithCtx != nil || rr.RespRetry != nil || rr.ErrorRetry != nil
}

func checkCBErrorRate(p *CBPolicy) error {
if p.ErrorRate <= 0 || p.ErrorRate > 0.3 {
return fmt.Errorf("invalid retry circuit breaker rate, errRate=%0.2f", p.ErrorRate)
Expand Down
4 changes: 2 additions & 2 deletions pkg/retry/retryer.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type Retryer interface {

// Retry policy execute func. recycleRI is to decide if the firstRI can be recycled.
Do(ctx context.Context, rpcCall RPCCallFunc, firstRI rpcinfo.RPCInfo, request interface{}) (lastRI rpcinfo.RPCInfo, recycleRI bool, err error)
AppendErrMsgIfNeeded(err error, ri rpcinfo.RPCInfo, msg string)
AppendErrMsgIfNeeded(ctx context.Context, err error, ri rpcinfo.RPCInfo, msg string)

// Prepare to do something needed before retry call.
Prepare(ctx context.Context, prevRI, retryRI rpcinfo.RPCInfo)
Expand Down Expand Up @@ -339,7 +339,7 @@ func (rc *Container) WithRetryIfNeeded(ctx context.Context, callOptRetry *Policy
return ri, true, err
}
if msg != "" {
retryer.AppendErrMsgIfNeeded(err, ri, msg)
retryer.AppendErrMsgIfNeeded(ctx, err, ri, msg)
}
return ri, false, err
}
Expand Down

0 comments on commit 551e2b2

Please sign in to comment.