diff --git a/pilot/pkg/model/push_context.go b/pilot/pkg/model/push_context.go index cc755e63e7df..2c6ffb4b3c83 100644 --- a/pilot/pkg/model/push_context.go +++ b/pilot/pkg/model/push_context.go @@ -1081,7 +1081,7 @@ func (ps *PushContext) IsClusterLocal(service *Service) bool { // InitContext will initialize the data structures used for code generation. // This should be called before starting the push, from the thread creating // the push context. -func (ps *PushContext) InitContext(env *Environment, oldPushContext *PushContext, pushReq *PushRequest) error { +func (ps *PushContext) InitContext(env *Environment, wp *PushContextWorkerPool, oldPushContext *PushContext, pushReq *PushRequest) error { // Acquire a lock to ensure we don't concurrently initialize the same PushContext. // If this does happen, one thread will block then exit early from InitDone=true ps.initializeMutex.Lock() @@ -1100,11 +1100,11 @@ func (ps *PushContext) InitContext(env *Environment, oldPushContext *PushContext // create new or incremental update if pushReq == nil || oldPushContext == nil || !oldPushContext.InitDone.Load() || len(pushReq.ConfigsUpdated) == 0 { - if err := ps.createNewContext(env); err != nil { + if err := ps.createNewContext(env, wp); err != nil { return err } } else { - if err := ps.updateContext(env, oldPushContext, pushReq); err != nil { + if err := ps.updateContext(env, wp, oldPushContext, pushReq); err != nil { return err } } @@ -1117,7 +1117,7 @@ func (ps *PushContext) InitContext(env *Environment, oldPushContext *PushContext return nil } -func (ps *PushContext) createNewContext(env *Environment) error { +func (ps *PushContext) createNewContext(env *Environment, wp *PushContextWorkerPool) error { if err := ps.initServiceRegistry(env); err != nil { return err } @@ -1164,7 +1164,7 @@ func (ps *PushContext) createNewContext(env *Environment) error { } // Must be initialized in the end - if err := ps.initSidecarScopes(env); err != nil { + if err := ps.initSidecarScopes(env, wp); err != nil { return err } return nil @@ -1172,6 +1172,7 @@ func (ps *PushContext) createNewContext(env *Environment) error { func (ps *PushContext) updateContext( env *Environment, + wp *PushContextWorkerPool, oldPushContext *PushContext, pushReq *PushRequest) error { var servicesChanged, virtualServicesChanged, destinationRulesChanged, gatewayChanged, @@ -1352,7 +1353,7 @@ func (ps *PushContext) updateContext( // Sidecars need to be updated if services, virtual services, destination rules, or the sidecar configs change if servicesChanged || virtualServicesChanged || destinationRulesChanged || sidecarsChanged { log.Infof("[Ying] sidecar scope changed") - if err := ps.initSidecarScopes(env); err != nil { + if err := ps.initSidecarScopes(env, wp); err != nil { return err } } else { @@ -1618,7 +1619,7 @@ func (ps *PushContext) initDefaultExportMaps() { // When proxies connect to Pilot, we identify the sidecar scope associated // with the proxy and derive listeners/routes/clusters based on the sidecar // scope. -func (ps *PushContext) initSidecarScopes(env *Environment) error { +func (ps *PushContext) initSidecarScopes(env *Environment, wp *PushContextWorkerPool) error { t := time.Now() sidecarConfigs, err := env.List(gvk.Sidecar, NamespaceAll) if err != nil { @@ -1664,40 +1665,34 @@ func (ps *PushContext) initSidecarScopes(env *Environment) error { } ps.sidecarIndex.rootConfig = rootNSConfig - numWorkers := 100 - configsPerWorker := len(sidecarConfigs) / 100 - if configsPerWorker == 0 { - configsPerWorker = 1 - } - - ch := make(chan *SidecarScope) - - var wg sync.WaitGroup - wg.Add(numWorkers) - for i := 0; i < numWorkers; i++ { - go func(i int) { - defer wg.Done() - // [start,end) - start := i * configsPerWorker - end := (i + 1) * configsPerWorker - if len(sidecarConfigs) < end { - end = len(sidecarConfigs) - } + if wp != nil { + ch := make(chan *SidecarScope) - for j := start; j < end; j++ { - c := sidecarConfigs[j] + n := len(sidecarConfigs) + var wg sync.WaitGroup + wg.Add(n) + for i := 0; i < n; i++ { + work := func() { + defer wg.Done() + c := sidecarConfigs[i] ch <- ConvertToSidecarScope(ps, &c, c.Namespace) } - }(i) - } + wp.PushWork(work) + } - go func() { - wg.Wait() - close(ch) - }() + go func() { + wg.Wait() + close(ch) + }() - for sidecarScope := range ch { - ps.sidecarIndex.sidecarsByNamespace[sidecarScope.Namespace] = append(ps.sidecarIndex.sidecarsByNamespace[sidecarScope.Namespace], sidecarScope) + for sidecarScope := range ch { + ps.sidecarIndex.sidecarsByNamespace[sidecarScope.Namespace] = append(ps.sidecarIndex.sidecarsByNamespace[sidecarScope.Namespace], sidecarScope) + } + } else { + for _, sidecarConfig := range sidecarConfigs { + ps.sidecarIndex.sidecarsByNamespace[sidecarConfig.Namespace] = append(ps.sidecarIndex.sidecarsByNamespace[sidecarConfig.Namespace], + ConvertToSidecarScope(ps, &sidecarConfig, sidecarConfig.Namespace)) + } } log.Infof("[Ying] populate sidecarIndex took %v seconds", time.Since(t)) @@ -2293,3 +2288,45 @@ func (ps *PushContext) ReferenceAllowed(kind config.GroupVersionKind, resourceNa } return false } + +type PushContextWorkerPool struct { + workerCount int + + workQueue chan func() +} + +func NewPushContextWorkerPool(workerCount int) *PushContextWorkerPool { + // There must be at least one worker. + if workerCount < 1 { + workerCount = 1 + } + + pool := &PushContextWorkerPool{ + workerCount: workerCount, + workQueue: make(chan func(), workerCount), + } + return pool +} + +func (p *PushContextWorkerPool) GetWorkerCount() int { + return p.workerCount +} + +func (p *PushContextWorkerPool) PushWork(w func()) { + p.workQueue <- w +} + +func (p *PushContextWorkerPool) Run(stop <-chan struct{}) { + for i := 0; i < p.workerCount; i++ { + go func() { + for { + select { + case f := <-p.workQueue: + f() + case <-stop: + return + } + } + }() + } +} diff --git a/pilot/pkg/model/push_context_test.go b/pilot/pkg/model/push_context_test.go index 99a4efbaad21..be557a7eced7 100644 --- a/pilot/pkg/model/push_context_test.go +++ b/pilot/pkg/model/push_context_test.go @@ -696,7 +696,7 @@ func TestServiceIndex(t *testing.T) { // Init a new push context pc := NewPushContext() - if err := pc.InitContext(env, nil, nil); err != nil { + if err := pc.InitContext(env, nil, nil, nil); err != nil { t.Fatal(err) } si := pc.ServiceIndex @@ -917,14 +917,14 @@ func TestInitPushContext(t *testing.T) { // Init a new push context old := NewPushContext() - if err := old.InitContext(env, nil, nil); err != nil { + if err := old.InitContext(env, nil, nil, nil); err != nil { t.Fatal(err) } // Create a new one, copying from the old one // Pass a ConfigsUpdated otherwise we would just copy it directly newPush := NewPushContext() - if err := newPush.InitContext(env, old, &PushRequest{ + if err := newPush.InitContext(env, nil, old, &PushRequest{ ConfigsUpdated: map[ConfigKey]struct{}{ {Kind: gvk.Secret}: {}, }, diff --git a/pilot/pkg/networking/core/v1alpha3/envoyfilter/cluster_patch_test.go b/pilot/pkg/networking/core/v1alpha3/envoyfilter/cluster_patch_test.go index 59d74a2aa1a0..312262d7c722 100644 --- a/pilot/pkg/networking/core/v1alpha3/envoyfilter/cluster_patch_test.go +++ b/pilot/pkg/networking/core/v1alpha3/envoyfilter/cluster_patch_test.go @@ -608,7 +608,7 @@ func TestClusterPatching(t *testing.T) { serviceDiscovery := memory.NewServiceDiscovery(nil) env := newTestEnvironment(serviceDiscovery, testMesh, buildEnvoyFilterConfigStore(configPatches)) push := model.NewPushContext() - push.InitContext(env, nil, nil) + push.InitContext(env, nil, nil, nil) for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { efw := push.EnvoyFilters(tc.proxy) diff --git a/pilot/pkg/networking/core/v1alpha3/envoyfilter/extension_configuration_patch_test.go b/pilot/pkg/networking/core/v1alpha3/envoyfilter/extension_configuration_patch_test.go index fba5d9c51817..c8df3c3f8693 100644 --- a/pilot/pkg/networking/core/v1alpha3/envoyfilter/extension_configuration_patch_test.go +++ b/pilot/pkg/networking/core/v1alpha3/envoyfilter/extension_configuration_patch_test.go @@ -91,7 +91,7 @@ func TestInsertedExtensionConfig(t *testing.T) { serviceDiscovery := memory.NewServiceDiscovery(nil) env := newTestEnvironment(serviceDiscovery, testMesh, buildEnvoyFilterConfigStore(configPatches)) push := model.NewPushContext() - push.InitContext(env, nil, nil) + push.InitContext(env, nil, nil, nil) for _, c := range testCases { t.Run(c.name, func(t *testing.T) { diff --git a/pilot/pkg/networking/core/v1alpha3/envoyfilter/listener_patch_test.go b/pilot/pkg/networking/core/v1alpha3/envoyfilter/listener_patch_test.go index 747d2ccb1a07..91000a76966a 100644 --- a/pilot/pkg/networking/core/v1alpha3/envoyfilter/listener_patch_test.go +++ b/pilot/pkg/networking/core/v1alpha3/envoyfilter/listener_patch_test.go @@ -104,7 +104,7 @@ func newTestEnvironment(serviceDiscovery model.ServiceDiscovery, meshConfig mesh e.PushContext = model.NewPushContext() e.Init() - _ = e.PushContext.InitContext(e, nil, nil) + _ = e.PushContext.InitContext(e, nil, nil, nil) return e } @@ -1699,7 +1699,7 @@ func TestApplyListenerPatches(t *testing.T) { serviceDiscovery := memregistry.NewServiceDiscovery(nil) e := newTestEnvironment(serviceDiscovery, testMesh, buildEnvoyFilterConfigStore(configPatches)) push := model.NewPushContext() - _ = push.InitContext(e, nil, nil) + _ = push.InitContext(e, nil, nil, nil) type args struct { patchContext networking.EnvoyFilter_PatchContext @@ -1841,7 +1841,7 @@ func BenchmarkTelemetryV2Filters(b *testing.B) { serviceDiscovery := memregistry.NewServiceDiscovery(nil) e := newTestEnvironment(serviceDiscovery, testMesh, buildEnvoyFilterConfigStore(configPatches)) push := model.NewPushContext() - _ = push.InitContext(e, nil, nil) + _ = push.InitContext(e, nil, nil, nil) var got interface{} b.ResetTimer() diff --git a/pilot/pkg/networking/core/v1alpha3/envoyfilter/rc_patch_test.go b/pilot/pkg/networking/core/v1alpha3/envoyfilter/rc_patch_test.go index b60086f1eb3e..7fa21ce2ada7 100644 --- a/pilot/pkg/networking/core/v1alpha3/envoyfilter/rc_patch_test.go +++ b/pilot/pkg/networking/core/v1alpha3/envoyfilter/rc_patch_test.go @@ -656,7 +656,7 @@ func TestApplyRouteConfigurationPatches(t *testing.T) { serviceDiscovery := memory.NewServiceDiscovery(nil) env := newTestEnvironment(serviceDiscovery, testMesh, buildEnvoyFilterConfigStore(configPatches)) push := model.NewPushContext() - push.InitContext(env, nil, nil) + push.InitContext(env, nil, nil, nil) sidecarNode := &model.Proxy{Type: model.SidecarProxy, ConfigNamespace: "not-default"} gatewayNode := &model.Proxy{Type: model.Router, ConfigNamespace: "not-default"} diff --git a/pilot/pkg/networking/core/v1alpha3/fake.go b/pilot/pkg/networking/core/v1alpha3/fake.go index 783f71d40fc9..d6f83fc6a98a 100644 --- a/pilot/pkg/networking/core/v1alpha3/fake.go +++ b/pilot/pkg/networking/core/v1alpha3/fake.go @@ -178,7 +178,7 @@ func NewConfigGenTest(t test.Failer, opts TestOptions) *ConfigGenTest { if err := env.InitNetworksManager(&FakeXdsUpdater{}); err != nil { t.Fatal(err) } - if err := env.PushContext.InitContext(env, nil, nil); err != nil { + if err := env.PushContext.InitContext(env, nil, nil, nil); err != nil { t.Fatalf("Failed to initialize push context: %v", err) } } diff --git a/pilot/pkg/networking/core/v1alpha3/httproute_test.go b/pilot/pkg/networking/core/v1alpha3/httproute_test.go index 4ab925269c45..db8f355979b9 100644 --- a/pilot/pkg/networking/core/v1alpha3/httproute_test.go +++ b/pilot/pkg/networking/core/v1alpha3/httproute_test.go @@ -1512,7 +1512,7 @@ func testSidecarRDSVHosts(t *testing.T, services []*model.Service, env := buildListenerEnvWithAdditionalConfig(services, virtualServices, nil) - if err := env.PushContext.InitContext(env, nil, nil); err != nil { + if err := env.PushContext.InitContext(env, nil, nil, nil); err != nil { t.Fatalf("failed to initialize push context") } if registryOnly { diff --git a/pilot/pkg/networking/core/v1alpha3/listener_builder_test.go b/pilot/pkg/networking/core/v1alpha3/listener_builder_test.go index f44014f62fe1..82cf211e2f9b 100644 --- a/pilot/pkg/networking/core/v1alpha3/listener_builder_test.go +++ b/pilot/pkg/networking/core/v1alpha3/listener_builder_test.go @@ -77,7 +77,7 @@ func TestVirtualListenerBuilder(t *testing.T) { services := []*model.Service{service} env := buildListenerEnv(services) - if err := env.PushContext.InitContext(env, nil, nil); err != nil { + if err := env.PushContext.InitContext(env, nil, nil, nil); err != nil { t.Fatalf("init push context error: %s", err.Error()) } instances := make([]*model.ServiceInstance, len(services)) @@ -128,7 +128,7 @@ func prepareListeners(t *testing.T, services []*model.Service, mode model.Traffi ldsEnv := getDefaultLdsEnv() env := buildListenerEnv(services) - if err := env.PushContext.InitContext(env, nil, nil); err != nil { + if err := env.PushContext.InitContext(env, nil, nil, nil); err != nil { t.Fatalf("init push context error: %s", err.Error()) } instances := make([]*model.ServiceInstance, len(services)) diff --git a/pilot/pkg/networking/core/v1alpha3/listener_test.go b/pilot/pkg/networking/core/v1alpha3/listener_test.go index 7556de22a2d9..834c3a6ee3db 100644 --- a/pilot/pkg/networking/core/v1alpha3/listener_test.go +++ b/pilot/pkg/networking/core/v1alpha3/listener_test.go @@ -1714,7 +1714,7 @@ func TestHttpProxyListener(t *testing.T) { configgen := NewConfigGenerator([]plugin.Plugin{p}, &model.DisabledCache{}) env := buildListenerEnv(nil) - if err := env.PushContext.InitContext(env, nil, nil); err != nil { + if err := env.PushContext.InitContext(env, nil, nil, nil); err != nil { t.Fatalf("error in initializing push context: %s", err) } @@ -1740,7 +1740,7 @@ func TestHttpProxyListenerPerWorkload(t *testing.T) { configgen := NewConfigGenerator([]plugin.Plugin{p}, &model.DisabledCache{}) env := buildListenerEnv(nil) - if err := env.PushContext.InitContext(env, nil, nil); err != nil { + if err := env.PushContext.InitContext(env, nil, nil, nil); err != nil { t.Fatalf("error in initializing push context: %s", err) } @@ -2166,7 +2166,7 @@ func TestHttpProxyListener_Tracing(t *testing.T) { features.EnableIstioTags = !tc.disableIstioTags env := buildListenerEnv(nil) - if err := env.PushContext.InitContext(env, nil, nil); err != nil { + if err := env.PushContext.InitContext(env, nil, nil, nil); err != nil { t.Fatalf("error in initializing push context: %s", err) } @@ -2440,7 +2440,7 @@ func getOldestService(services ...*model.Service) *model.Service { func buildAllListeners(p plugin.Plugin, env *model.Environment) []*listener.Listener { configgen := NewConfigGenerator([]plugin.Plugin{p}, &model.DisabledCache{}) - if err := env.PushContext.InitContext(env, nil, nil); err != nil { + if err := env.PushContext.InitContext(env, nil, nil, nil); err != nil { return nil } @@ -2478,7 +2478,7 @@ func buildInboundListeners(t *testing.T, p plugin.Plugin, proxy *model.Proxy, si t.Helper() configgen := NewConfigGenerator([]plugin.Plugin{p}, &model.DisabledCache{}) env := buildListenerEnv(services) - if err := env.PushContext.InitContext(env, nil, nil); err != nil { + if err := env.PushContext.InitContext(env, nil, nil, nil); err != nil { return nil } proxy.SetServiceInstances(env) @@ -2694,7 +2694,7 @@ func buildListenerEnvWithAdditionalConfig(services []*model.Service, virtualServ func TestAppendListenerFallthroughRouteForCompleteListener(t *testing.T) { env := buildListenerEnv(nil) push := model.NewPushContext() - _ = push.InitContext(env, nil, nil) + _ = push.InitContext(env, nil, nil, nil) tests := []struct { name string @@ -2766,7 +2766,7 @@ func TestAppendListenerFallthroughRouteForCompleteListener(t *testing.T) { func TestMergeTCPFilterChains(t *testing.T) { env := buildListenerEnv(nil) push := model.NewPushContext() - _ = push.InitContext(env, nil, nil) + _ = push.InitContext(env, nil, nil, nil) node := &model.Proxy{ ID: "foo.bar", diff --git a/pilot/pkg/networking/core/v1alpha3/loadbalancer/loadbalancer_test.go b/pilot/pkg/networking/core/v1alpha3/loadbalancer/loadbalancer_test.go index 32676edacedf..194d1660daed 100644 --- a/pilot/pkg/networking/core/v1alpha3/loadbalancer/loadbalancer_test.go +++ b/pilot/pkg/networking/core/v1alpha3/loadbalancer/loadbalancer_test.go @@ -600,7 +600,7 @@ func buildEnvForClustersWithDistribute(distribute []*networking.LocalityLoadBala env.PushContext = model.NewPushContext() env.Init() - _ = env.PushContext.InitContext(env, nil, nil) + _ = env.PushContext.InitContext(env, nil, nil, nil) env.PushContext.SetDestinationRules([]config.Config{ { Meta: config.Meta{ @@ -659,7 +659,7 @@ func buildEnvForClustersWithFailover() *model.Environment { env.PushContext = model.NewPushContext() env.Init() - _ = env.PushContext.InitContext(env, nil, nil) + _ = env.PushContext.InitContext(env, nil, nil, nil) env.PushContext.SetDestinationRules([]config.Config{ { Meta: config.Meta{ @@ -713,7 +713,7 @@ func buildEnvForClustersWithFailoverPriority(failoverPriority []string) *model.E env.PushContext = model.NewPushContext() env.Init() - _ = env.PushContext.InitContext(env, nil, nil) + _ = env.PushContext.InitContext(env, nil, nil, nil) env.PushContext.SetDestinationRules([]config.Config{ { Meta: config.Meta{ diff --git a/pilot/pkg/networking/core/v1alpha3/networkfilter_test.go b/pilot/pkg/networking/core/v1alpha3/networkfilter_test.go index 4a93f3fc65ac..8f902292349f 100644 --- a/pilot/pkg/networking/core/v1alpha3/networkfilter_test.go +++ b/pilot/pkg/networking/core/v1alpha3/networkfilter_test.go @@ -81,7 +81,7 @@ func TestInboundNetworkFilterStatPrefix(t *testing.T) { for _, tt := range cases { t.Run(tt.name, func(t *testing.T) { env := buildListenerEnv(services) - env.PushContext.InitContext(env, nil, nil) + env.PushContext.InitContext(env, nil, nil, nil) env.PushContext.Mesh.InboundClusterStatName = tt.statPattern instance := &model.ServiceInstance{ @@ -143,7 +143,7 @@ func TestInboundNetworkFilterIdleTimeout(t *testing.T) { for _, tt := range cases { t.Run(tt.name, func(t *testing.T) { env := buildListenerEnv(services) - env.PushContext.InitContext(env, nil, nil) + env.PushContext.InitContext(env, nil, nil, nil) instance := &model.ServiceInstance{ Service: &model.Service{ @@ -270,7 +270,7 @@ func TestOutboundNetworkFilterStatPrefix(t *testing.T) { for _, tt := range cases { t.Run(tt.name, func(t *testing.T) { env := buildListenerEnv(services) - env.PushContext.InitContext(env, nil, nil) + env.PushContext.InitContext(env, nil, nil, nil) env.PushContext.Mesh.OutboundClusterStatName = tt.statPattern proxy := getProxy() @@ -394,7 +394,7 @@ func TestOutboundNetworkFilterWithSourceIPHashing(t *testing.T) { destinationRules := []*config.Config{&destinationRule, &simpleDestinationRule, &subsetdestinationRule, &subsetDifferentdestinationRule} env := buildListenerEnvWithAdditionalConfig(services, nil, destinationRules) - env.PushContext.InitContext(env, nil, nil) + env.PushContext.InitContext(env, nil, nil, nil) proxy := getProxy() proxy.IstioVersion = model.ParseIstioVersion(proxy.Metadata.IstioVersion) diff --git a/pilot/pkg/networking/grpcgen/grpcgen_test.go b/pilot/pkg/networking/grpcgen/grpcgen_test.go index 074a5a6bf163..d2dce8310511 100644 --- a/pilot/pkg/networking/grpcgen/grpcgen_test.go +++ b/pilot/pkg/networking/grpcgen/grpcgen_test.go @@ -151,7 +151,7 @@ func TestGRPC(t *testing.T) { env := ds.DiscoveryServer.Env env.Init() - if err := env.PushContext.InitContext(env, env.PushContext, nil); err != nil { + if err := env.PushContext.InitContext(env, nil, env.PushContext, nil); err != nil { t.Fatal(err) } ds.DiscoveryServer.UpdateServiceShards(env.PushContext) diff --git a/pilot/pkg/xds/ads.go b/pilot/pkg/xds/ads.go index e0cfa8ccff22..b79178b8f585 100644 --- a/pilot/pkg/xds/ads.go +++ b/pilot/pkg/xds/ads.go @@ -290,8 +290,9 @@ func (s *DiscoveryServer) Stream(stream DiscoveryStream) error { log.Debugf("Unauthenticated XDS: %s", peerAddr) } + log.Info("[Ying] Init context called") // InitContext returns immediately if the context was already initialized. - if err = s.globalPushContext().InitContext(s.Env, nil, nil); err != nil { + if err = s.globalPushContext().InitContext(s.Env, s.pushContextWorkerPool, nil, nil); err != nil { // Error accessing the data - log and close, maybe a different pilot replica // has more luck log.Warnf("Error reading config %v", err) diff --git a/pilot/pkg/xds/bench_test.go b/pilot/pkg/xds/bench_test.go index 4a04054368c1..990197a853a8 100644 --- a/pilot/pkg/xds/bench_test.go +++ b/pilot/pkg/xds/bench_test.go @@ -453,7 +453,7 @@ func setupAndInitializeTest(t testing.TB, config ConfigInput) (*FakeDiscoverySer } func initPushContext(env *model.Environment, proxy *model.Proxy) { - env.PushContext.InitContext(env, nil, nil) + env.PushContext.InitContext(env, nil, nil, nil) proxy.SetSidecarScope(env.PushContext) proxy.SetGatewaysForProxy(env.PushContext) proxy.SetServiceInstances(env.ServiceDiscovery) diff --git a/pilot/pkg/xds/delta.go b/pilot/pkg/xds/delta.go index bfa59cb41b54..243a6b135133 100644 --- a/pilot/pkg/xds/delta.go +++ b/pilot/pkg/xds/delta.go @@ -76,7 +76,7 @@ func (s *DiscoveryServer) StreamDeltas(stream DeltaDiscoveryStream) error { } // InitContext returns immediately if the context was already initialized. - if err = s.globalPushContext().InitContext(s.Env, nil, nil); err != nil { + if err = s.globalPushContext().InitContext(s.Env, s.pushContextWorkerPool, nil, nil); err != nil { // Error accessing the data - log and close, maybe a different pilot replica // has more luck log.Warnf("Error reading config %v", err) diff --git a/pilot/pkg/xds/discovery.go b/pilot/pkg/xds/discovery.go index 8aa246071909..e6421882f9c9 100644 --- a/pilot/pkg/xds/discovery.go +++ b/pilot/pkg/xds/discovery.go @@ -122,6 +122,10 @@ type DiscoveryServer struct { // mutex used for protecting Environment.PushContext updateMutex sync.RWMutex + // pushContextWorkerPool is used for calculating push context. Right now + // only sidecar scope calculation uses it. + pushContextWorkerPool *model.PushContextWorkerPool + // pushQueue is the buffer that used after debounce and before the real xds push. pushQueue *PushQueue @@ -195,6 +199,7 @@ func NewDiscoveryServer(env *model.Environment, plugins []string, instanceID str InboundUpdates: atomic.NewInt64(0), CommittedUpdates: atomic.NewInt64(0), pushChannel: make(chan *model.PushRequest, 10), + pushContextWorkerPool: model.NewPushContextWorkerPool(10), pushQueue: NewPushQueue(), debugHandlers: map[string]string{}, adsClients: map[string]*Connection{}, @@ -396,6 +401,8 @@ func (s *DiscoveryServer) ConfigUpdate(req *model.PushRequest) { // It ensures that at minimum minQuiet time has elapsed since the last event before processing it. // It also ensures that at most maxDelay is elapsed between receiving an event and processing it. func (s *DiscoveryServer) handleUpdates(stopCh <-chan struct{}) { + s.pushContextWorkerPool.Run(stopCh) + log.Info("[Ying] push context worker pool started") debounce(s.pushChannel, stopCh, s.debounceOptions, s.Push, s.CommittedUpdates) } @@ -552,7 +559,7 @@ func (s *DiscoveryServer) initPushContext(req *model.PushRequest, oldPushContext push := model.NewPushContext() push.PushVersion = version push.JwtKeyResolver = s.JwtKeyResolver - if err := push.InitContext(s.Env, oldPushContext, req); err != nil { + if err := push.InitContext(s.Env, s.pushContextWorkerPool, oldPushContext, req); err != nil { log.Errorf("XDS: failed to init push context: %v", err) // We can't push if we can't read the data - stick with previous version. pushContextErrors.Increment() diff --git a/pilot/pkg/xds/ep_filters_test.go b/pilot/pkg/xds/ep_filters_test.go index bfe20a159da5..107d4e5576cc 100644 --- a/pilot/pkg/xds/ep_filters_test.go +++ b/pilot/pkg/xds/ep_filters_test.go @@ -618,7 +618,7 @@ func runNetworkFilterTest(t *testing.T, env *model.Environment, tests []networkF for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { push := model.NewPushContext() - _ = push.InitContext(env, nil, nil) + _ = push.InitContext(env, nil, nil, nil) b := NewEndpointBuilder("outbound|80||example.ns.svc.cluster.local", tt.conn.proxy, push) testEndpoints := b.buildLocalityLbEndpointsFromShards(testShards(), &model.Port{Name: "http", Port: 80, Protocol: protocol.HTTP}) filtered := b.EndpointsByNetworkFilter(testEndpoints) diff --git a/tests/fuzz/networking_core_v1alpha3_envoyfilter_fuzzer.go b/tests/fuzz/networking_core_v1alpha3_envoyfilter_fuzzer.go index c580c35dc2e7..e2e01de8ea28 100644 --- a/tests/fuzz/networking_core_v1alpha3_envoyfilter_fuzzer.go +++ b/tests/fuzz/networking_core_v1alpha3_envoyfilter_fuzzer.go @@ -77,7 +77,7 @@ func InternalFuzzApplyClusterMerge(data []byte) int { serviceDiscovery := memory.NewServiceDiscovery(nil) env := newTestEnvironment(serviceDiscovery, testMesh, buildEnvoyFilterConfigStore(configPatches)) push := model.NewPushContext() - push.InitContext(env, nil, nil) + push.InitContext(env, nil, nil, nil) efw := push.EnvoyFilters(proxy) _ = ApplyClusterMerge(networking.EnvoyFilter_GATEWAY, efw, c, []host.Name{host.Name(fuzz_host)}) return 1 diff --git a/tests/fuzz/networking_core_v1alpha3_fuzzer.go b/tests/fuzz/networking_core_v1alpha3_fuzzer.go index 2ec0804368a5..9dd8bf770304 100644 --- a/tests/fuzz/networking_core_v1alpha3_fuzzer.go +++ b/tests/fuzz/networking_core_v1alpha3_fuzzer.go @@ -166,7 +166,7 @@ func InternalFuzzbuildSidecarInboundListeners(data []byte) int { return 0 } env := buildListenerEnv(allServices) - if err := env.PushContext.InitContext(env, nil, nil); err != nil { + if err := env.PushContext.InitContext(env, nil, nil, nil); err != nil { return 0 } proxy.SetServiceInstances(env) diff --git a/tests/fuzz/pilot_model_fuzzer.go b/tests/fuzz/pilot_model_fuzzer.go index b7cf60a7426f..6489503185bc 100644 --- a/tests/fuzz/pilot_model_fuzzer.go +++ b/tests/fuzz/pilot_model_fuzzer.go @@ -202,7 +202,7 @@ func FuzzInitContext(data []byte) int { env.Watcher = mesh.NewFixedWatcher(m) env.Init() pc := model.NewPushContext() - _ = pc.InitContext(env, nil, nil) + _ = pc.InitContext(env, nil, nil, nil) return 1 }