Skip to content

Commit

Permalink
pkg/endpoint: make state synchronization atomic
Browse files Browse the repository at this point in the history
BPF regeneration writes state into a new temporary directory. Once it
has succeeded we need to swap the old and new directory. This is currently
achieved by "backing up" the current state by renaming the directory.
This code has a bunch of corner cases around cleaning up old directories
and so on which are necessary since the synchronization isn't truly
atomic.

Instead, use the RENAME_EXCHANGE flag to atomically exchange the two
existing directories. Also use hard links to retain existing state
so that killing the agent during a synchronization doesn't lead
to corruption.

Signed-off-by: Lorenz Bauer <[email protected]>
  • Loading branch information
lmb committed May 9, 2024
1 parent d9ea218 commit b4a4f46
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 53 deletions.
64 changes: 13 additions & 51 deletions pkg/endpoint/directory.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import (
"path/filepath"

"github.com/sirupsen/logrus"
"golang.org/x/sys/unix"

"github.com/cilium/cilium/pkg/logging"
"github.com/cilium/cilium/pkg/logging/logfields"
"github.com/cilium/cilium/pkg/option"
)

Expand Down Expand Up @@ -43,14 +43,10 @@ func (e *Endpoint) NextDirectoryPath() string {
return filepath.Join(".", fmt.Sprintf("%d%s", e.ID, nextDirectorySuffix))
}

func (e *Endpoint) backupDirectoryPath() string {
return e.DirectoryPath() + backupDirectorySuffix
}

// moveNewFilesTo copies all files, that do not exist in newDir, from oldDir.
// copyExistingState copies all files, that do not exist in newDir, from oldDir.
// It assumes that oldDir and newDir are an endpoint's old and new state
// directories (see synchronizeDirectories below).
func moveNewFilesTo(oldDir, newDir string) error {
func copyExistingState(oldDir, newDir string) error {
var err error

oldDirFile, err := os.Open(oldDir)
Expand Down Expand Up @@ -84,7 +80,7 @@ func moveNewFilesTo(oldDir, newDir string) error {

for _, oldFile := range oldFiles {
if _, ok = newFilesHash[oldFile]; !ok {
if err := os.Rename(filepath.Join(oldDir, oldFile), filepath.Join(newDir, oldFile)); err != nil {
if err := os.Link(filepath.Join(oldDir, oldFile), filepath.Join(newDir, oldFile)); err != nil {
return fmt.Errorf("failed to move endpoint state file: %w", err)
}
}
Expand Down Expand Up @@ -115,55 +111,22 @@ func (e *Endpoint) synchronizeDirectories(origDir string, stateDirComplete bool)
// An endpoint directory already exists. We need to back it up before attempting
// to move the new directory in its place so we can attempt recovery.
case !os.IsNotExist(err):
scopedLog.Debug("endpoint directory exists; backing it up")
backupDir := e.backupDirectoryPath()

// Remove any eventual old backup directory. This may fail if
// the directory does not exist. The error is deliberately
// ignored.
e.removeDirectory(backupDir)

// Move the current endpoint directory to a backup location
if debugLogEnabled {
scopedLog.WithFields(logrus.Fields{
"originalDirectory": origDir,
"backupDirectory": backupDir,
}).Debug("moving current directory to backup location")
}

if err := os.Rename(origDir, backupDir); err != nil {
return fmt.Errorf("unable to rename current endpoint directory: %w", err)
}

// Regarldess of whether the atomic replace succeeds or not,
// ensure that the backup directory is removed when the
// function returns.
defer e.removeDirectory(backupDir)

// Make temporary directory the new endpoint directory
if err := os.Rename(tmpDir, origDir); err != nil {
if err2 := os.Rename(backupDir, origDir); err2 != nil {
scopedLog.WithFields(logrus.Fields{
logfields.Path: backupDir,
}).Warn("restoring directory for endpoint failed, endpoint " +
"is in inconsistent state. Keeping stale directory.")
return err2
}

return fmt.Errorf("restored original endpoint directory, atomic directory move failed: %w", err)
}

// If the compilation was skipped then we need to copy the old
// bpf objects into the new directory
if !stateDirComplete {
scopedLog.Debug("some BPF state files were not recreated; moving old BPF objects into new directory")
err := moveNewFilesTo(backupDir, origDir)
scopedLog.Debug("retaining existing state")
err := copyExistingState(tmpDir, origDir)
if err != nil {
log.WithError(err).Debugf("unable to copy old bpf object "+
"files from %s into the new directory %s.", backupDir, origDir)
scopedLog.WithError(err).Debugf("unable to copy state "+
"from %s into the new directory %s.", tmpDir, origDir)
}
}

// Atomically exchange the two directories.
if err := unix.Renameat2(unix.AT_FDCWD, tmpDir, unix.AT_FDCWD, origDir, unix.RENAME_EXCHANGE); err != nil {
return fmt.Errorf("unable to exchange %s with %s: %w", origDir, tmpDir, err)
}

// No existing endpoint directory, synchronizing the directory is a
// simple move
default:
Expand Down Expand Up @@ -198,5 +161,4 @@ func (e *Endpoint) removeDirectories() {
e.removeDirectory(e.DirectoryPath())
e.removeDirectory(e.FailedDirectoryPath())
e.removeDirectory(e.NextDirectoryPath())
e.removeDirectory(e.backupDirectoryPath())
}
4 changes: 2 additions & 2 deletions pkg/endpoint/directory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (s *EndpointSuite) TestMoveNewFilesTo(t *testing.T) {
},
}
for _, tt := range tests {
if err := moveNewFilesTo(tt.args.oldDir, tt.args.newDir); (err != nil) != tt.wantErr {
if err := copyExistingState(tt.args.oldDir, tt.args.newDir); (err != nil) != tt.wantErr {
require.Equal(t, tt.wantErr, err != nil)
compareDir(tt.args.oldDir, tt.wantOldDir)
compareDir(tt.args.newDir, tt.wantNewDir)
Expand Down Expand Up @@ -98,7 +98,7 @@ func benchmarkMoveNewFilesTo(b *testing.B, numFiles int) {

b.ResetTimer()
for i := 0; i < b.N; i++ {
if err := moveNewFilesTo(oldDir, newDir); err != nil {
if err := copyExistingState(oldDir, newDir); err != nil {
b.Fatal(err)
}
}
Expand Down

0 comments on commit b4a4f46

Please sign in to comment.