Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: periodically detect and update cluster connection status #18143

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
56 changes: 44 additions & 12 deletions controller/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ const (

// EnvClusterCacheRetryUseBackoff is the env variable to control whether to use a backoff strategy with the retry during cluster cache sync
EnvClusterCacheRetryUseBackoff = "ARGOCD_CLUSTER_CACHE_RETRY_USE_BACKOFF"

// EnvClusterConnectionMonitoringInterval is the env variable to configure the cluster status monitoring interval.
EnvClusterConnectionMonitoringInterval = "ARGOCD_CLUSTER_STATUS_MONITORING_INTERVAL"
)

// GitOps engine cluster cache tuning options
Expand Down Expand Up @@ -100,6 +103,9 @@ var (

// clusterCacheRetryUseBackoff specifies whether to use a backoff strategy on cluster cache sync, if retry is enabled
clusterCacheRetryUseBackoff bool = false

// clusterStatusMonitoringInterval specifies the interval used by Argo CD to monitor the cluster connection status.
clusterStatusMonitoringInterval = 10 * time.Second
)

func init() {
Expand All @@ -111,6 +117,7 @@ func init() {
clusterCacheListSemaphoreSize = env.ParseInt64FromEnv(EnvClusterCacheListSemaphore, clusterCacheListSemaphoreSize, 0, math.MaxInt64)
clusterCacheAttemptLimit = int32(env.ParseNumFromEnv(EnvClusterCacheAttemptLimit, int(clusterCacheAttemptLimit), 1, math.MaxInt32))
clusterCacheRetryUseBackoff = env.ParseBoolFromEnv(EnvClusterCacheRetryUseBackoff, false)
clusterStatusMonitoringInterval = env.ParseDurationFromEnv(EnvClusterConnectionMonitoringInterval, clusterStatusMonitoringInterval, 0, math.MaxInt64)
}

type LiveStateCache interface {
Expand Down Expand Up @@ -176,15 +183,16 @@ func NewLiveStateCache(
resourceTracking argo.ResourceTracking) LiveStateCache {

return &liveStateCache{
appInformer: appInformer,
db: db,
clusters: make(map[string]clustercache.ClusterCache),
onObjectUpdated: onObjectUpdated,
kubectl: kubectl,
settingsMgr: settingsMgr,
metricsServer: metricsServer,
clusterSharding: clusterSharding,
resourceTracking: resourceTracking,
appInformer: appInformer,
db: db,
clusters: make(map[string]clustercache.ClusterCache),
clusterStatusCancel: make(map[string]context.CancelFunc),
onObjectUpdated: onObjectUpdated,
kubectl: kubectl,
settingsMgr: settingsMgr,
metricsServer: metricsServer,
clusterSharding: clusterSharding,
resourceTracking: resourceTracking,
}
}

Expand All @@ -210,9 +218,10 @@ type liveStateCache struct {
resourceTracking argo.ResourceTracking
ignoreNormalizerOpts normalizers.IgnoreNormalizerOpts

clusters map[string]clustercache.ClusterCache
cacheSettings cacheSettings
lock sync.RWMutex
clusterStatusCancel map[string]context.CancelFunc
clusters map[string]clustercache.ClusterCache
cacheSettings cacheSettings
lock sync.RWMutex
}

func (c *liveStateCache) loadCacheSettings() (*cacheSettings, error) {
Expand Down Expand Up @@ -520,10 +529,21 @@ func (c *liveStateCache) getCluster(server string) (clustercache.ClusterCache, e
clustercache.SetLogr(logutils.NewLogrusLogger(log.WithField("server", cluster.Server))),
clustercache.SetRetryOptions(clusterCacheAttemptLimit, clusterCacheRetryUseBackoff, isRetryableError),
clustercache.SetRespectRBAC(respectRBAC),
clustercache.SetClusterStatusRetryFunc(isTransientNetworkErr),
clustercache.SetClusterConnectionInterval(clusterStatusMonitoringInterval),
}

clusterCache = clustercache.NewClusterCache(clusterCacheConfig, clusterCacheOpts...)

if clusterStatusMonitoringInterval != 0 {
ctx, cancel := context.WithCancel(context.Background())
if c.clusterStatusCancel == nil {
c.clusterStatusCancel = make(map[string]context.CancelFunc)
}
c.clusterStatusCancel[server] = cancel
clusterCache.StartClusterConnectionStatusMonitoring(ctx)
}

_ = clusterCache.OnResourceUpdated(func(newRes *clustercache.Resource, oldRes *clustercache.Resource, namespaceResources map[kube.ResourceKey]*clustercache.Resource) {
toNotify := make(map[string]bool)
var ref v1.ObjectReference
Expand Down Expand Up @@ -777,6 +797,12 @@ func (c *liveStateCache) handleModEvent(oldCluster *appv1.Cluster, newCluster *a
if !c.canHandleCluster(newCluster) {
cluster.Invalidate()
c.lock.Lock()
cancel, ok := c.clusterStatusCancel[newCluster.Server]
if ok {
// stop the cluster status monitoring goroutine
cancel()
delete(c.clusterStatusCancel, newCluster.Server)
}
delete(c.clusters, newCluster.Server)
c.lock.Unlock()
return
Expand Down Expand Up @@ -818,6 +844,12 @@ func (c *liveStateCache) handleDeleteEvent(clusterServer string) {
if ok {
cluster.Invalidate()
c.lock.Lock()
cancel, ok := c.clusterStatusCancel[clusterServer]
if ok {
// stop the cluster status monitoring goroutine
cancel()
delete(c.clusterStatusCancel, clusterServer)
}
delete(c.clusters, clusterServer)
c.lock.Unlock()
}
Expand Down
11 changes: 7 additions & 4 deletions controller/clusterinfoupdater.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ package controller
import (
"context"
"fmt"
"github.com/argoproj/argo-cd/v2/common"
"time"

"github.com/argoproj/argo-cd/v2/common"

"github.com/argoproj/argo-cd/v2/util/env"
"github.com/argoproj/gitops-engine/pkg/cache"
"github.com/argoproj/gitops-engine/pkg/utils/kube"
Expand Down Expand Up @@ -143,20 +144,22 @@ func (c *clusterInfoUpdater) getUpdatedClusterInfo(ctx context.Context, apps []*
ConnectionState: appv1.ConnectionState{ModifiedAt: &now},
ApplicationsCount: appCount,
}
if info != nil {
if info != nil && appCount != 0 {
clusterInfo.ServerVersion = info.K8SVersion
clusterInfo.APIVersions = argo.APIResourcesToStrings(info.APIResources, true)
if info.LastCacheSyncTime == nil {
clusterInfo.ConnectionState.Status = appv1.ConnectionStatusUnknown
} else if info.SyncError == nil {
} else if info.SyncError == nil && info.ConnectionStatus == cache.ConnectionStatusSuccessful {
clusterInfo.ConnectionState.Status = appv1.ConnectionStatusSuccessful
syncTime := metav1.NewTime(*info.LastCacheSyncTime)
clusterInfo.CacheInfo.LastCacheSyncTime = &syncTime
clusterInfo.CacheInfo.APIsCount = int64(info.APIsCount)
clusterInfo.CacheInfo.ResourcesCount = int64(info.ResourcesCount)
} else {
clusterInfo.ConnectionState.Status = appv1.ConnectionStatusFailed
clusterInfo.ConnectionState.Message = info.SyncError.Error()
if info.SyncError != nil {
clusterInfo.ConnectionState.Message = info.SyncError.Error()
}
}
} else {
clusterInfo.ConnectionState.Status = appv1.ConnectionStatusUnknown
Expand Down
23 changes: 20 additions & 3 deletions controller/clusterinfoupdater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,13 @@ func TestClusterSecretUpdater(t *testing.T) {
var tests = []struct {
LastCacheSyncTime *time.Time
SyncError error
ConnectionStatus clustercache.ConnectionStatus
ExpectedStatus v1alpha1.ConnectionStatus
}{
{nil, nil, v1alpha1.ConnectionStatusUnknown},
{&now, nil, v1alpha1.ConnectionStatusSuccessful},
{&now, fmt.Errorf("sync failed"), v1alpha1.ConnectionStatusFailed},
{nil, nil, clustercache.ConnectionStatusUnknown, v1alpha1.ConnectionStatusUnknown},
{&now, nil, clustercache.ConnectionStatusSuccessful, v1alpha1.ConnectionStatusSuccessful},
{&now, fmt.Errorf("sync failed"), clustercache.ConnectionStatusSuccessful, v1alpha1.ConnectionStatusFailed},
{&now, nil, clustercache.ConnectionStatusFailed, v1alpha1.ConnectionStatusFailed},
}

emptyArgoCDConfigMap := &v1.ConfigMap{
Expand Down Expand Up @@ -78,12 +80,27 @@ func TestClusterSecretUpdater(t *testing.T) {
cluster, err := argoDB.CreateCluster(ctx, &v1alpha1.Cluster{Server: "http://minikube"})
assert.NoError(t, err, "Test prepare test data create cluster failed")

fakeApp := &v1alpha1.Application{
ObjectMeta: metav1.ObjectMeta{
Name: "fake-app",
Namespace: fakeNamespace,
},
Spec: v1alpha1.ApplicationSpec{
Destination: v1alpha1.ApplicationDestination{
Server: cluster.Server,
},
},
}
err = appInformer.GetIndexer().Add(fakeApp)
assert.NoError(t, err)

for _, test := range tests {
info := &clustercache.ClusterInfo{
Server: cluster.Server,
K8SVersion: updatedK8sVersion,
LastCacheSyncTime: test.LastCacheSyncTime,
SyncError: test.SyncError,
ConnectionStatus: test.ConnectionStatus,
}

lister := applisters.NewApplicationLister(appInformer.GetIndexer()).Applications(fakeNamespace)
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ require (
)

replace (
github.com/argoproj/gitops-engine => github.com/chetan-rns/gitops-engine v0.1.3-0.20240509130717-b3e1c67fec67
// https://github.com/golang/go/issues/33546#issuecomment-519656923
github.com/go-check/check => github.com/go-check/check v0.0.0-20180628173108-788fd7840127

Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -695,8 +695,6 @@ github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb
github.com/apache/thrift v0.13.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/apache/thrift v0.16.0/go.mod h1:PHK3hniurgQaNMZYaCLEqXKsYK8upmhPbmdP2FXSqgU=
github.com/appscode/go v0.0.0-20191119085241-0887d8ec2ecc/go.mod h1:OawnOmAL4ZX3YaPdN+8HTNwBveT1jMsqP74moa9XUbE=
github.com/argoproj/gitops-engine v0.7.1-0.20240514190100-8a3ce6d85caa h1:RcIYoAbkaGA7yzpY1YItaTLgKYABDfkITyQ4jUl3Y6c=
github.com/argoproj/gitops-engine v0.7.1-0.20240514190100-8a3ce6d85caa/go.mod h1:Vet2xN0akQpggQJZGmThA8Lozpn26RLagZFmLXw/oSI=
github.com/argoproj/notifications-engine v0.4.1-0.20240403133627-f48567108f01 h1:/V8+HM0VPPTrdjTwUrkIj5a+SjaU//tJwfIXJ1QAOvg=
github.com/argoproj/notifications-engine v0.4.1-0.20240403133627-f48567108f01/go.mod h1:N0A4sEws2soZjEpY4hgZpQS8mRIEw6otzwfkgc3g9uQ=
github.com/argoproj/pkg v0.13.7-0.20230626144333-d56162821bd1 h1:qsHwwOJ21K2Ao0xPju1sNuqphyMnMYkyB3ZLoLtxWpo=
Expand Down Expand Up @@ -788,6 +786,8 @@ github.com/chai2010/gettext-go v1.0.2 h1:1Lwwip6Q2QGsAdl/ZKPCwTe9fe0CjlUbqj5bFNS
github.com/chai2010/gettext-go v1.0.2/go.mod h1:y+wnP2cHYaVj19NZhYKAwEMH2CI1gNHeQQ+5AjwawxA=
github.com/chainguard-dev/git-urls v1.0.2 h1:pSpT7ifrpc5X55n4aTTm7FFUE+ZQHKiqpiwNkJrVcKQ=
github.com/chainguard-dev/git-urls v1.0.2/go.mod h1:rbGgj10OS7UgZlbzdUQIQpT0k/D4+An04HJY7Ol+Y/o=
github.com/chetan-rns/gitops-engine v0.1.3-0.20240509130717-b3e1c67fec67 h1:MTqQCeBINPIXxuhynmozQA6Y3Ijv59TemYmImd3UFy8=
github.com/chetan-rns/gitops-engine v0.1.3-0.20240509130717-b3e1c67fec67/go.mod h1:Vet2xN0akQpggQJZGmThA8Lozpn26RLagZFmLXw/oSI=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
Expand Down