pkg/endpoint: make state synchronization atomic #32439

Merged · 2 commits · May 13, 2024
75 changes: 36 additions & 39 deletions pkg/endpoint/bpf.go
@@ -514,11 +514,9 @@ func (e *Endpoint) removeOldRedirects(desiredRedirects, realizedRedirects map[st
// Returns the policy revision number when the regeneration has called,
// Whether the new state dir is populated with all new BPF state files,
// and an error if something failed.
func (e *Endpoint) regenerateBPF(regenContext *regenerationContext) (revnum uint64, stateDirComplete bool, reterr error) {
func (e *Endpoint) regenerateBPF(regenContext *regenerationContext) (revnum uint64, reterr error) {
var (
err error
compilationExecuted bool
headerfileChanged bool
err error
)

stats := &regenContext.Stats
@@ -539,7 +537,7 @@ func (e *Endpoint) regenerateBPF(regenContext *regenerationContext) (revnum uint
// below (runPreCompilationSteps()), but this caused a deadlock with the
// IPCache. Therefore, we obtain the DNSRules outside the critical section.
rules := e.owner.GetDNSRules(e.ID)
headerfileChanged, err = e.runPreCompilationSteps(regenContext, rules)
err = e.runPreCompilationSteps(regenContext, rules)

// Keep track of the side-effects of the regeneration that need to be
// reverted in case of failure.
@@ -553,12 +551,12 @@ func (e *Endpoint) regenerateBPF(regenContext *regenerationContext) (revnum uint
}()

if err != nil {
return 0, false, err
return 0, err
}

// No need to compile BPF in dry mode.
if e.isProperty(PropertyFakeEndpoint) {
return e.nextPolicyRevision, false, nil
return e.nextPolicyRevision, nil
}

// Skip BPF if the endpoint has no policy map
@@ -572,20 +570,20 @@ func (e *Endpoint) regenerateBPF(regenContext *regenerationContext) (revnum uint
err = e.waitForProxyCompletions(datapathRegenCtxt.proxyWaitGroup)
stats.proxyWaitForAck.End(err == nil)
if err != nil {
return 0, false, fmt.Errorf("Error while updating network policy: %w", err)
return 0, fmt.Errorf("Error while updating network policy: %w", err)
}

return e.nextPolicyRevision, false, nil
return e.nextPolicyRevision, nil
}

// Wait for connection tracking cleaning to complete
stats.waitingForCTClean.Start()
<-datapathRegenCtxt.ctCleaned
stats.waitingForCTClean.End(true)

compilationExecuted, err = e.realizeBPFState(regenContext)
err = e.realizeBPFState(regenContext)
if err != nil {
return datapathRegenCtxt.epInfoCache.revision, compilationExecuted, err
return datapathRegenCtxt.epInfoCache.revision, err
}

if !datapathRegenCtxt.epInfoCache.IsHost() || option.Config.EnableHostFirewall {
@@ -594,7 +592,7 @@ func (e *Endpoint) regenerateBPF(regenContext *regenerationContext) (revnum uint
err = lxcmap.WriteEndpoint(datapathRegenCtxt.epInfoCache)
stats.mapSync.End(err == nil)
if err != nil {
return 0, compilationExecuted, fmt.Errorf("Exposing new BPF failed: %w", err)
return 0, fmt.Errorf("Exposing new BPF failed: %w", err)
}
}

@@ -611,14 +609,14 @@ func (e *Endpoint) regenerateBPF(regenContext *regenerationContext) (revnum uint
err = e.waitForProxyCompletions(datapathRegenCtxt.proxyWaitGroup)
stats.proxyWaitForAck.End(err == nil)
if err != nil {
return 0, compilationExecuted, fmt.Errorf("error while configuring proxy redirects: %w", err)
return 0, fmt.Errorf("error while configuring proxy redirects: %w", err)
}

stats.waitingForLock.Start()
err = e.lockAlive()
stats.waitingForLock.End(err == nil)
if err != nil {
return 0, compilationExecuted, err
return 0, err
}
defer e.unlock()

@@ -637,14 +635,13 @@ func (e *Endpoint) regenerateBPF(regenContext *regenerationContext) (revnum uint
err = e.syncPolicyMap()
stats.mapSync.End(err == nil)
if err != nil {
return 0, compilationExecuted, fmt.Errorf("unable to regenerate policy because PolicyMap synchronization failed: %w", err)
return 0, fmt.Errorf("unable to regenerate policy because PolicyMap synchronization failed: %w", err)
}

stateDirComplete = headerfileChanged && compilationExecuted
return datapathRegenCtxt.epInfoCache.revision, stateDirComplete, err
return datapathRegenCtxt.epInfoCache.revision, err
}

func (e *Endpoint) realizeBPFState(regenContext *regenerationContext) (compilationExecuted bool, err error) {
func (e *Endpoint) realizeBPFState(regenContext *regenerationContext) (err error) {
stats := &regenContext.Stats
datapathRegenCtxt := regenContext.datapathRegenerationContext
debugEnabled := logging.CanLogAt(e.getLogger().Logger, logrus.DebugLevel)
@@ -669,7 +666,6 @@ func (e *Endpoint) realizeBPFState(regenContext *regenerationContext) (compilati
} else if !errors.Is(err, context.Canceled) {
e.getLogger().WithError(err).Error("Error while rewriting endpoint BPF program")
}
compilationExecuted = true
} else { // RegenerateWithDatapathLoad
err = e.owner.Datapath().Loader().ReloadDatapath(datapathRegenCtxt.completionCtx, datapathRegenCtxt.epInfoCache, &stats.datapathRealization)
if err == nil {
@@ -680,23 +676,23 @@ func (e *Endpoint) realizeBPFState(regenContext *regenerationContext) (compilati
}

if err != nil {
return compilationExecuted, err
return err
}
e.bpfHeaderfileHash = datapathRegenCtxt.bpfHeaderfilesHash
} else if debugEnabled {
e.getLogger().WithField(logfields.BPFHeaderfileHash, datapathRegenCtxt.bpfHeaderfilesHash).
Debug("BPF header file unchanged, skipping BPF compilation and installation")
}

return compilationExecuted, nil
return nil
}

// runPreCompilationSteps runs all of the regeneration steps that are necessary
// right before compiling the BPF for the given endpoint.
// The endpoint mutex must not be held.
//
// Returns whether the headerfile changed and/or an error.
func (e *Endpoint) runPreCompilationSteps(regenContext *regenerationContext, rules restore.DNSRules) (headerfileChanged bool, preCompilationError error) {
func (e *Endpoint) runPreCompilationSteps(regenContext *regenerationContext, rules restore.DNSRules) (preCompilationError error) {
stats := &regenContext.Stats
datapathRegenCtxt := regenContext.datapathRegenerationContext

@@ -707,14 +703,14 @@ func (e *Endpoint) runPreCompilationSteps(regenContext *regenerationContext, rul
policyResult, err := e.regeneratePolicy()
stats.policyCalculation.End(err == nil)
if err != nil {
return false, fmt.Errorf("unable to regenerate policy for '%s': %w", e.StringID(), err)
return fmt.Errorf("unable to regenerate policy for '%s': %w", e.StringID(), err)
}

stats.waitingForLock.Start()
err = e.lockAlive()
stats.waitingForLock.End(err == nil)
if err != nil {
return false, err
return err
}

defer e.unlock()
@@ -750,7 +746,7 @@ func (e *Endpoint) runPreCompilationSteps(regenContext *regenerationContext, rul
// where an unnecessary policy computation was skipped. In that case
// e.desiredPolicy == e.realizedPolicy also after this call.
if err := e.setDesiredPolicy(policyResult); err != nil {
return false, err
return err
}

// We cannot obtain the rules while e.mutex is held, because obtaining
@@ -766,17 +762,17 @@ func (e *Endpoint) runPreCompilationSteps(regenContext *regenerationContext, rul
// Dry mode needs Network Policy Updates, but the proxy wait group must
// not be initialized, as there is no proxy ACKing the changes.
if err, _ = e.updateNetworkPolicy(nil); err != nil {
return false, err
return err
}

if err = e.writeHeaderfile(nextDir); err != nil {
return false, fmt.Errorf("Unable to write header file: %w", err)
return fmt.Errorf("Unable to write header file: %w", err)
}

if logging.CanLogAt(log.Logger, logrus.DebugLevel) {
log.WithField(logfields.EndpointID, e.ID).Debug("Skipping bpf updates due to dry mode")
}
return false, nil
return nil
}

// Endpoints without policy maps only need Network Policy Updates
@@ -796,25 +792,25 @@ func (e *Endpoint) runPreCompilationSteps(regenContext *regenerationContext, rul
err, networkPolicyRevertFunc := e.updateNetworkPolicy(datapathRegenCtxt.proxyWaitGroup)
stats.proxyPolicyCalculation.End(err == nil)
if err != nil {
return false, err
return err
}
datapathRegenCtxt.revertStack.Push(networkPolicyRevertFunc)
}
return false, nil
return nil
}

if e.policyMap == nil {
e.policyMap, err = policymap.OpenOrCreate(e.policyMapPath())
if err != nil {
return false, err
return err
}

// Synchronize the in-memory realized state with BPF map entries,
// so that any potential discrepancy between desired and realized
// state would be dealt with by the following e.syncPolicyMap.
pm, err := e.dumpPolicyMapToMapState()
if err != nil {
return false, err
return err
}
e.realizedPolicy.SetPolicyMap(pm)
e.updatePolicyMapPressureMetric()
@@ -843,7 +839,7 @@ func (e *Endpoint) runPreCompilationSteps(regenContext *regenerationContext, rul
desiredRedirects, err, finalizeFunc, revertFunc = e.addNewRedirects(datapathRegenCtxt.proxyWaitGroup)
stats.proxyConfiguration.End(err == nil)
if err != nil {
return false, err
return err
}
datapathRegenCtxt.finalizeList.Append(finalizeFunc)
datapathRegenCtxt.revertStack.Push(revertFunc)
@@ -873,7 +869,7 @@ func (e *Endpoint) runPreCompilationSteps(regenContext *regenerationContext, rul
err, networkPolicyRevertFunc := e.updateNetworkPolicy(datapathRegenCtxt.proxyWaitGroup)
stats.proxyPolicyCalculation.End(err == nil)
if err != nil {
return false, err
return err
}
datapathRegenCtxt.revertStack.Push(networkPolicyRevertFunc)

@@ -887,7 +883,7 @@ func (e *Endpoint) runPreCompilationSteps(regenContext *regenerationContext, rul
err = e.syncPolicyMap()
stats.mapSync.End(err == nil)
if err != nil {
return false, fmt.Errorf("unable to regenerate policy because PolicyMap synchronization failed: %w", err)
return fmt.Errorf("unable to regenerate policy because PolicyMap synchronization failed: %w", err)
}

// At this point, traffic is no longer redirected to the proxy for
@@ -901,7 +897,7 @@ func (e *Endpoint) runPreCompilationSteps(regenContext *regenerationContext, rul
}

if e.isProperty(PropertySkipBPFRegeneration) {
return false, nil
return nil
}

stats.prepareBuild.Start()
@@ -911,6 +907,7 @@ func (e *Endpoint) runPreCompilationSteps(regenContext *regenerationContext, rul

// Avoid BPF program compilation and installation if the headerfile for the endpoint
// or the node have not changed.
var headerfileChanged bool
datapathRegenCtxt.bpfHeaderfilesHash, err = e.owner.Datapath().Loader().EndpointHash(e)
if err != nil {
e.getLogger().WithError(err).Warn("Unable to hash header file")
@@ -929,7 +926,7 @@ func (e *Endpoint) runPreCompilationSteps(regenContext *regenerationContext, rul
}
if datapathRegenCtxt.regenerationLevel >= regeneration.RegenerateWithDatapathRewrite {
if err := e.writeHeaderfile(nextDir); err != nil {
return false, fmt.Errorf("unable to write header file: %w", err)
return fmt.Errorf("unable to write header file: %w", err)
}
}

@@ -940,10 +937,10 @@ func (e *Endpoint) runPreCompilationSteps(regenContext *regenerationContext, rul
datapathRegenCtxt.epInfoCache = e.createEpInfoCache(currentDir)
}
if datapathRegenCtxt.epInfoCache == nil {
return headerfileChanged, fmt.Errorf("Unable to cache endpoint information")
return fmt.Errorf("Unable to cache endpoint information")
}

return headerfileChanged, nil
return nil
}

func (e *Endpoint) finalizeProxyState(regenContext *regenerationContext, err error) {
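Taken together, the bpf.go changes drop the compilationExecuted, headerfileChanged, and stateDirComplete bookkeeping from the regeneration path. A hedged sketch of how the calling side simplifies as a result; this is an illustrative paraphrase, not the actual Cilium regeneration code, which is not part of this diff:

```go
// Hypothetical, simplified caller shape. regenerateBPF now reports only the
// policy revision and an error, and synchronizeDirectories no longer needs to
// be told whether the new state directory is complete.
func (e *Endpoint) regenerateAndSync(regenContext *regenerationContext, origDir string) (uint64, error) {
	revision, err := e.regenerateBPF(regenContext)
	if err != nil {
		return 0, err
	}
	// Directory synchronization always hard-links any missing state files
	// into the new directory and swaps the directories atomically, so no
	// completeness flag has to be threaded through.
	if err := e.synchronizeDirectories(origDir); err != nil {
		return 0, err
	}
	return revision, nil
}
```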
68 changes: 11 additions & 57 deletions pkg/endpoint/directory.go
@@ -9,9 +9,9 @@ import (
"path/filepath"

"github.com/sirupsen/logrus"
"golang.org/x/sys/unix"

"github.com/cilium/cilium/pkg/logging"
"github.com/cilium/cilium/pkg/logging/logfields"
"github.com/cilium/cilium/pkg/option"
)

Expand Down Expand Up @@ -43,16 +43,10 @@ func (e *Endpoint) NextDirectoryPath() string {
return filepath.Join(".", fmt.Sprintf("%d%s", e.ID, nextDirectorySuffix))
}

func (e *Endpoint) backupDirectoryPath() string {
return e.DirectoryPath() + backupDirectorySuffix
}

// moveNewFilesTo copies all files, that do not exist in newDir, from oldDir.
// copyExistingState copies all files, that do not exist in newDir, from oldDir.
// It assumes that oldDir and newDir are an endpoint's old and new state
// directories (see synchronizeDirectories below).
func moveNewFilesTo(oldDir, newDir string) error {
var err error

func copyExistingState(oldDir, newDir string) error {
oldDirFile, err := os.Open(oldDir)
if err != nil {
return fmt.Errorf("failed to open old endpoint state dir: %w", err)
@@ -84,7 +78,7 @@ func moveNewFilesTo(oldDir, newDir string) error {

for _, oldFile := range oldFiles {
if _, ok = newFilesHash[oldFile]; !ok {
if err := os.Rename(filepath.Join(oldDir, oldFile), filepath.Join(newDir, oldFile)); err != nil {
if err := os.Link(filepath.Join(oldDir, oldFile), filepath.Join(newDir, oldFile)); err != nil {
return fmt.Errorf("failed to move endpoint state file: %w", err)
}
}
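The switch from os.Rename to os.Link matters for atomicity: a hard link leaves the file in place in the old directory, so the old state stays intact until the two directories are exchanged, and no data is copied or moved. A minimal sketch of hard-link-based copying under that assumption (the package and function names are illustrative, not the actual Cilium helper, and hard links require both directories to be on the same filesystem):

```go
package endpointdir

import (
	"fmt"
	"os"
	"path/filepath"
)

// linkMissing hard-links every file from oldDir that is absent in newDir.
// The source files stay in place; both names end up referring to the same
// inode, so the "copy" is cheap and the old directory remains usable.
func linkMissing(oldDir, newDir string) error {
	entries, err := os.ReadDir(oldDir)
	if err != nil {
		return err
	}
	for _, entry := range entries {
		dst := filepath.Join(newDir, entry.Name())
		if _, err := os.Stat(dst); err == nil {
			continue // already present in the new directory
		}
		if err := os.Link(filepath.Join(oldDir, entry.Name()), dst); err != nil {
			return fmt.Errorf("failed to link %s: %w", entry.Name(), err)
		}
	}
	return nil
}
```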
@@ -99,7 +93,7 @@ func moveNewFilesTo(oldDir, newDir string) error {
// Returns the original regenerationError if regenerationError was non-nil,
// or if any updates to directories for the endpoint's directories fails.
// Must be called with endpoint.mutex Lock()ed.
func (e *Endpoint) synchronizeDirectories(origDir string, stateDirComplete bool) error {
func (e *Endpoint) synchronizeDirectories(origDir string) error {
scopedLog := e.getLogger()
debugLogEnabled := logging.CanLogAt(scopedLog.Logger, logrus.DebugLevel)

@@ -115,53 +109,14 @@ func (e *Endpoint) synchronizeDirectories(origDir string, stateDirComplete bool)
// An endpoint directory already exists. We need to back it up before attempting
// to move the new directory in its place so we can attempt recovery.
case !os.IsNotExist(err):
scopedLog.Debug("endpoint directory exists; backing it up")
backupDir := e.backupDirectoryPath()

// Remove any eventual old backup directory. This may fail if
// the directory does not exist. The error is deliberately
// ignored.
e.removeDirectory(backupDir)

// Move the current endpoint directory to a backup location
if debugLogEnabled {
scopedLog.WithFields(logrus.Fields{
"originalDirectory": origDir,
"backupDirectory": backupDir,
}).Debug("moving current directory to backup location")
}

if err := os.Rename(origDir, backupDir); err != nil {
return fmt.Errorf("unable to rename current endpoint directory: %w", err)
if err := copyExistingState(origDir, tmpDir); err != nil {
scopedLog.WithError(err).Debugf("unable to copy state "+
"from %s into the new directory %s.", tmpDir, origDir)
}

// Regarldess of whether the atomic replace succeeds or not,
// ensure that the backup directory is removed when the
// function returns.
defer e.removeDirectory(backupDir)

// Make temporary directory the new endpoint directory
if err := os.Rename(tmpDir, origDir); err != nil {
if err2 := os.Rename(backupDir, origDir); err2 != nil {
scopedLog.WithFields(logrus.Fields{
logfields.Path: backupDir,
}).Warn("restoring directory for endpoint failed, endpoint " +
"is in inconsistent state. Keeping stale directory.")
return err2
}

return fmt.Errorf("restored original endpoint directory, atomic directory move failed: %w", err)
}

// If the compilation was skipped then we need to copy the old
// bpf objects into the new directory
if !stateDirComplete {
scopedLog.Debug("some BPF state files were not recreated; moving old BPF objects into new directory")
err := moveNewFilesTo(backupDir, origDir)
if err != nil {
log.WithError(err).Debugf("unable to copy old bpf object "+
"files from %s into the new directory %s.", backupDir, origDir)
}
// Atomically exchange the two directories.
if err := unix.Renameat2(unix.AT_FDCWD, tmpDir, unix.AT_FDCWD, origDir, unix.RENAME_EXCHANGE); err != nil {
return fmt.Errorf("unable to exchange %s with %s: %w", origDir, tmpDir, err)
}

// No existing endpoint directory, synchronizing the directory is a
@@ -198,5 +153,4 @@ func (e *Endpoint) removeDirectories() {
e.removeDirectory(e.DirectoryPath())
e.removeDirectory(e.FailedDirectoryPath())
e.removeDirectory(e.NextDirectoryPath())
e.removeDirectory(e.backupDirectoryPath())
}
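The core of the directory.go change is replacing the backup-and-rename dance with a single atomic exchange: unix.Renameat2 with RENAME_EXCHANGE swaps the temporary "next" directory and the live endpoint directory in one Linux syscall, so there is no window in which the endpoint directory is missing or half-populated. A minimal sketch of that primitive in isolation (Linux-only; the directory names are illustrative, not the real endpoint paths):

```go
package main

import (
	"fmt"

	"golang.org/x/sys/unix"
)

// exchangeDirs atomically swaps two paths on the same filesystem. After the
// call, the new state is live under the original name and the previous
// contents sit under the temporary name, ready to be cleaned up.
func exchangeDirs(tmpDir, liveDir string) error {
	// AT_FDCWD resolves both paths relative to the current working directory;
	// RENAME_EXCHANGE requires both paths to exist and swaps them atomically.
	if err := unix.Renameat2(unix.AT_FDCWD, tmpDir, unix.AT_FDCWD, liveDir, unix.RENAME_EXCHANGE); err != nil {
		return fmt.Errorf("unable to exchange %s with %s: %w", liveDir, tmpDir, err)
	}
	return nil
}

func main() {
	if err := exchangeDirs("1234_next", "1234"); err != nil {
		fmt.Println(err)
	}
}
```

Because the old directory survives the swap under the temporary name, any BPF object files that were hard-linked into the new directory remain valid, and the leftover directory can simply be removed afterwards, which is why the dedicated backup directory and its recovery path disappear from this file.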