Skip to content

Commit

Permalink
worker pool
Browse files Browse the repository at this point in the history
  • Loading branch information
ying-zhu committed Oct 20, 2022
1 parent d02b7d5 commit 2e1c2a0
Show file tree
Hide file tree
Showing 21 changed files with 117 additions and 72 deletions.
109 changes: 73 additions & 36 deletions pilot/pkg/model/push_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -1081,7 +1081,7 @@ func (ps *PushContext) IsClusterLocal(service *Service) bool {
// InitContext will initialize the data structures used for code generation.
// This should be called before starting the push, from the thread creating
// the push context.
func (ps *PushContext) InitContext(env *Environment, oldPushContext *PushContext, pushReq *PushRequest) error {
func (ps *PushContext) InitContext(env *Environment, wp *PushContextWorkerPool, oldPushContext *PushContext, pushReq *PushRequest) error {
// Acquire a lock to ensure we don't concurrently initialize the same PushContext.
// If this does happen, one thread will block then exit early from InitDone=true
ps.initializeMutex.Lock()
Expand All @@ -1100,11 +1100,11 @@ func (ps *PushContext) InitContext(env *Environment, oldPushContext *PushContext

// create new or incremental update
if pushReq == nil || oldPushContext == nil || !oldPushContext.InitDone.Load() || len(pushReq.ConfigsUpdated) == 0 {
if err := ps.createNewContext(env); err != nil {
if err := ps.createNewContext(env, wp); err != nil {
return err
}
} else {
if err := ps.updateContext(env, oldPushContext, pushReq); err != nil {
if err := ps.updateContext(env, wp, oldPushContext, pushReq); err != nil {
return err
}
}
Expand All @@ -1117,7 +1117,7 @@ func (ps *PushContext) InitContext(env *Environment, oldPushContext *PushContext
return nil
}

func (ps *PushContext) createNewContext(env *Environment) error {
func (ps *PushContext) createNewContext(env *Environment, wp *PushContextWorkerPool) error {
if err := ps.initServiceRegistry(env); err != nil {
return err
}
Expand Down Expand Up @@ -1164,14 +1164,15 @@ func (ps *PushContext) createNewContext(env *Environment) error {
}

// Must be initialized in the end
if err := ps.initSidecarScopes(env); err != nil {
if err := ps.initSidecarScopes(env, wp); err != nil {
return err
}
return nil
}

func (ps *PushContext) updateContext(
env *Environment,
wp *PushContextWorkerPool,
oldPushContext *PushContext,
pushReq *PushRequest) error {
var servicesChanged, virtualServicesChanged, destinationRulesChanged, gatewayChanged,
Expand Down Expand Up @@ -1352,7 +1353,7 @@ func (ps *PushContext) updateContext(
// Sidecars need to be updated if services, virtual services, destination rules, or the sidecar configs change
if servicesChanged || virtualServicesChanged || destinationRulesChanged || sidecarsChanged {
log.Infof("[Ying] sidecar scope changed")
if err := ps.initSidecarScopes(env); err != nil {
if err := ps.initSidecarScopes(env, wp); err != nil {
return err
}
} else {
Expand Down Expand Up @@ -1618,7 +1619,7 @@ func (ps *PushContext) initDefaultExportMaps() {
// When proxies connect to Pilot, we identify the sidecar scope associated
// with the proxy and derive listeners/routes/clusters based on the sidecar
// scope.
func (ps *PushContext) initSidecarScopes(env *Environment) error {
func (ps *PushContext) initSidecarScopes(env *Environment, wp *PushContextWorkerPool) error {
t := time.Now()
sidecarConfigs, err := env.List(gvk.Sidecar, NamespaceAll)
if err != nil {
Expand Down Expand Up @@ -1664,40 +1665,34 @@ func (ps *PushContext) initSidecarScopes(env *Environment) error {
}
ps.sidecarIndex.rootConfig = rootNSConfig

numWorkers := 100
configsPerWorker := len(sidecarConfigs) / 100
if configsPerWorker == 0 {
configsPerWorker = 1
}

ch := make(chan *SidecarScope)

var wg sync.WaitGroup
wg.Add(numWorkers)
for i := 0; i < numWorkers; i++ {
go func(i int) {
defer wg.Done()
// [start,end)
start := i * configsPerWorker
end := (i + 1) * configsPerWorker
if len(sidecarConfigs) < end {
end = len(sidecarConfigs)
}
if wp != nil {
ch := make(chan *SidecarScope)

for j := start; j < end; j++ {
c := sidecarConfigs[j]
n := len(sidecarConfigs)
var wg sync.WaitGroup
wg.Add(n)
for i := 0; i < n; i++ {
work := func() {
defer wg.Done()
c := sidecarConfigs[i]
ch <- ConvertToSidecarScope(ps, &c, c.Namespace)
}
}(i)
}
wp.PushWork(work)
}

go func() {
wg.Wait()
close(ch)
}()
go func() {
wg.Wait()
close(ch)
}()

for sidecarScope := range ch {
ps.sidecarIndex.sidecarsByNamespace[sidecarScope.Namespace] = append(ps.sidecarIndex.sidecarsByNamespace[sidecarScope.Namespace], sidecarScope)
for sidecarScope := range ch {
ps.sidecarIndex.sidecarsByNamespace[sidecarScope.Namespace] = append(ps.sidecarIndex.sidecarsByNamespace[sidecarScope.Namespace], sidecarScope)
}
} else {
for _, sidecarConfig := range sidecarConfigs {
ps.sidecarIndex.sidecarsByNamespace[sidecarConfig.Namespace] = append(ps.sidecarIndex.sidecarsByNamespace[sidecarConfig.Namespace],
ConvertToSidecarScope(ps, &sidecarConfig, sidecarConfig.Namespace))
}
}

log.Infof("[Ying] populate sidecarIndex took %v seconds", time.Since(t))
Expand Down Expand Up @@ -2293,3 +2288,45 @@ func (ps *PushContext) ReferenceAllowed(kind config.GroupVersionKind, resourceNa
}
return false
}

type PushContextWorkerPool struct {
workerCount int

workQueue chan func()
}

func NewPushContextWorkerPool(workerCount int) *PushContextWorkerPool {
// There must be at least one worker.
if workerCount < 1 {
workerCount = 1
}

pool := &PushContextWorkerPool{
workerCount: workerCount,
workQueue: make(chan func(), workerCount),
}
return pool
}

func (p *PushContextWorkerPool) GetWorkerCount() int {
return p.workerCount
}

func (p *PushContextWorkerPool) PushWork(w func()) {
p.workQueue <- w
}

func (p *PushContextWorkerPool) Run(stop <-chan struct{}) {
for i := 0; i < p.workerCount; i++ {
go func() {
for {
select {
case f := <-p.workQueue:
f()
case <-stop:
return
}
}
}()
}
}
6 changes: 3 additions & 3 deletions pilot/pkg/model/push_context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -696,7 +696,7 @@ func TestServiceIndex(t *testing.T) {

// Init a new push context
pc := NewPushContext()
if err := pc.InitContext(env, nil, nil); err != nil {
if err := pc.InitContext(env, nil, nil, nil); err != nil {
t.Fatal(err)
}
si := pc.ServiceIndex
Expand Down Expand Up @@ -917,14 +917,14 @@ func TestInitPushContext(t *testing.T) {

// Init a new push context
old := NewPushContext()
if err := old.InitContext(env, nil, nil); err != nil {
if err := old.InitContext(env, nil, nil, nil); err != nil {
t.Fatal(err)
}

// Create a new one, copying from the old one
// Pass a ConfigsUpdated otherwise we would just copy it directly
newPush := NewPushContext()
if err := newPush.InitContext(env, old, &PushRequest{
if err := newPush.InitContext(env, nil, old, &PushRequest{
ConfigsUpdated: map[ConfigKey]struct{}{
{Kind: gvk.Secret}: {},
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,7 @@ func TestClusterPatching(t *testing.T) {
serviceDiscovery := memory.NewServiceDiscovery(nil)
env := newTestEnvironment(serviceDiscovery, testMesh, buildEnvoyFilterConfigStore(configPatches))
push := model.NewPushContext()
push.InitContext(env, nil, nil)
push.InitContext(env, nil, nil, nil)
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
efw := push.EnvoyFilters(tc.proxy)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func TestInsertedExtensionConfig(t *testing.T) {
serviceDiscovery := memory.NewServiceDiscovery(nil)
env := newTestEnvironment(serviceDiscovery, testMesh, buildEnvoyFilterConfigStore(configPatches))
push := model.NewPushContext()
push.InitContext(env, nil, nil)
push.InitContext(env, nil, nil, nil)

for _, c := range testCases {
t.Run(c.name, func(t *testing.T) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func newTestEnvironment(serviceDiscovery model.ServiceDiscovery, meshConfig mesh

e.PushContext = model.NewPushContext()
e.Init()
_ = e.PushContext.InitContext(e, nil, nil)
_ = e.PushContext.InitContext(e, nil, nil, nil)

return e
}
Expand Down Expand Up @@ -1699,7 +1699,7 @@ func TestApplyListenerPatches(t *testing.T) {
serviceDiscovery := memregistry.NewServiceDiscovery(nil)
e := newTestEnvironment(serviceDiscovery, testMesh, buildEnvoyFilterConfigStore(configPatches))
push := model.NewPushContext()
_ = push.InitContext(e, nil, nil)
_ = push.InitContext(e, nil, nil, nil)

type args struct {
patchContext networking.EnvoyFilter_PatchContext
Expand Down Expand Up @@ -1841,7 +1841,7 @@ func BenchmarkTelemetryV2Filters(b *testing.B) {
serviceDiscovery := memregistry.NewServiceDiscovery(nil)
e := newTestEnvironment(serviceDiscovery, testMesh, buildEnvoyFilterConfigStore(configPatches))
push := model.NewPushContext()
_ = push.InitContext(e, nil, nil)
_ = push.InitContext(e, nil, nil, nil)

var got interface{}
b.ResetTimer()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,7 @@ func TestApplyRouteConfigurationPatches(t *testing.T) {
serviceDiscovery := memory.NewServiceDiscovery(nil)
env := newTestEnvironment(serviceDiscovery, testMesh, buildEnvoyFilterConfigStore(configPatches))
push := model.NewPushContext()
push.InitContext(env, nil, nil)
push.InitContext(env, nil, nil, nil)

sidecarNode := &model.Proxy{Type: model.SidecarProxy, ConfigNamespace: "not-default"}
gatewayNode := &model.Proxy{Type: model.Router, ConfigNamespace: "not-default"}
Expand Down
2 changes: 1 addition & 1 deletion pilot/pkg/networking/core/v1alpha3/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func NewConfigGenTest(t test.Failer, opts TestOptions) *ConfigGenTest {
if err := env.InitNetworksManager(&FakeXdsUpdater{}); err != nil {
t.Fatal(err)
}
if err := env.PushContext.InitContext(env, nil, nil); err != nil {
if err := env.PushContext.InitContext(env, nil, nil, nil); err != nil {
t.Fatalf("Failed to initialize push context: %v", err)
}
}
Expand Down
2 changes: 1 addition & 1 deletion pilot/pkg/networking/core/v1alpha3/httproute_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1512,7 +1512,7 @@ func testSidecarRDSVHosts(t *testing.T, services []*model.Service,

env := buildListenerEnvWithAdditionalConfig(services, virtualServices, nil)

if err := env.PushContext.InitContext(env, nil, nil); err != nil {
if err := env.PushContext.InitContext(env, nil, nil, nil); err != nil {
t.Fatalf("failed to initialize push context")
}
if registryOnly {
Expand Down
4 changes: 2 additions & 2 deletions pilot/pkg/networking/core/v1alpha3/listener_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func TestVirtualListenerBuilder(t *testing.T) {
services := []*model.Service{service}

env := buildListenerEnv(services)
if err := env.PushContext.InitContext(env, nil, nil); err != nil {
if err := env.PushContext.InitContext(env, nil, nil, nil); err != nil {
t.Fatalf("init push context error: %s", err.Error())
}
instances := make([]*model.ServiceInstance, len(services))
Expand Down Expand Up @@ -128,7 +128,7 @@ func prepareListeners(t *testing.T, services []*model.Service, mode model.Traffi
ldsEnv := getDefaultLdsEnv()

env := buildListenerEnv(services)
if err := env.PushContext.InitContext(env, nil, nil); err != nil {
if err := env.PushContext.InitContext(env, nil, nil, nil); err != nil {
t.Fatalf("init push context error: %s", err.Error())
}
instances := make([]*model.ServiceInstance, len(services))
Expand Down
14 changes: 7 additions & 7 deletions pilot/pkg/networking/core/v1alpha3/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1714,7 +1714,7 @@ func TestHttpProxyListener(t *testing.T) {
configgen := NewConfigGenerator([]plugin.Plugin{p}, &model.DisabledCache{})

env := buildListenerEnv(nil)
if err := env.PushContext.InitContext(env, nil, nil); err != nil {
if err := env.PushContext.InitContext(env, nil, nil, nil); err != nil {
t.Fatalf("error in initializing push context: %s", err)
}

Expand All @@ -1740,7 +1740,7 @@ func TestHttpProxyListenerPerWorkload(t *testing.T) {
configgen := NewConfigGenerator([]plugin.Plugin{p}, &model.DisabledCache{})

env := buildListenerEnv(nil)
if err := env.PushContext.InitContext(env, nil, nil); err != nil {
if err := env.PushContext.InitContext(env, nil, nil, nil); err != nil {
t.Fatalf("error in initializing push context: %s", err)
}

Expand Down Expand Up @@ -2166,7 +2166,7 @@ func TestHttpProxyListener_Tracing(t *testing.T) {
features.EnableIstioTags = !tc.disableIstioTags

env := buildListenerEnv(nil)
if err := env.PushContext.InitContext(env, nil, nil); err != nil {
if err := env.PushContext.InitContext(env, nil, nil, nil); err != nil {
t.Fatalf("error in initializing push context: %s", err)
}

Expand Down Expand Up @@ -2440,7 +2440,7 @@ func getOldestService(services ...*model.Service) *model.Service {
func buildAllListeners(p plugin.Plugin, env *model.Environment) []*listener.Listener {
configgen := NewConfigGenerator([]plugin.Plugin{p}, &model.DisabledCache{})

if err := env.PushContext.InitContext(env, nil, nil); err != nil {
if err := env.PushContext.InitContext(env, nil, nil, nil); err != nil {
return nil
}

Expand Down Expand Up @@ -2478,7 +2478,7 @@ func buildInboundListeners(t *testing.T, p plugin.Plugin, proxy *model.Proxy, si
t.Helper()
configgen := NewConfigGenerator([]plugin.Plugin{p}, &model.DisabledCache{})
env := buildListenerEnv(services)
if err := env.PushContext.InitContext(env, nil, nil); err != nil {
if err := env.PushContext.InitContext(env, nil, nil, nil); err != nil {
return nil
}
proxy.SetServiceInstances(env)
Expand Down Expand Up @@ -2694,7 +2694,7 @@ func buildListenerEnvWithAdditionalConfig(services []*model.Service, virtualServ
func TestAppendListenerFallthroughRouteForCompleteListener(t *testing.T) {
env := buildListenerEnv(nil)
push := model.NewPushContext()
_ = push.InitContext(env, nil, nil)
_ = push.InitContext(env, nil, nil, nil)

tests := []struct {
name string
Expand Down Expand Up @@ -2766,7 +2766,7 @@ func TestAppendListenerFallthroughRouteForCompleteListener(t *testing.T) {
func TestMergeTCPFilterChains(t *testing.T) {
env := buildListenerEnv(nil)
push := model.NewPushContext()
_ = push.InitContext(env, nil, nil)
_ = push.InitContext(env, nil, nil, nil)

node := &model.Proxy{
ID: "foo.bar",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -600,7 +600,7 @@ func buildEnvForClustersWithDistribute(distribute []*networking.LocalityLoadBala

env.PushContext = model.NewPushContext()
env.Init()
_ = env.PushContext.InitContext(env, nil, nil)
_ = env.PushContext.InitContext(env, nil, nil, nil)
env.PushContext.SetDestinationRules([]config.Config{
{
Meta: config.Meta{
Expand Down Expand Up @@ -659,7 +659,7 @@ func buildEnvForClustersWithFailover() *model.Environment {

env.PushContext = model.NewPushContext()
env.Init()
_ = env.PushContext.InitContext(env, nil, nil)
_ = env.PushContext.InitContext(env, nil, nil, nil)
env.PushContext.SetDestinationRules([]config.Config{
{
Meta: config.Meta{
Expand Down Expand Up @@ -713,7 +713,7 @@ func buildEnvForClustersWithFailoverPriority(failoverPriority []string) *model.E

env.PushContext = model.NewPushContext()
env.Init()
_ = env.PushContext.InitContext(env, nil, nil)
_ = env.PushContext.InitContext(env, nil, nil, nil)
env.PushContext.SetDestinationRules([]config.Config{
{
Meta: config.Meta{
Expand Down
Loading

0 comments on commit 2e1c2a0

Please sign in to comment.