Skip to content

Commit

Permalink
loader: refactor replaceDatapath to loadDatapath
Browse files Browse the repository at this point in the history
This unblocks #29333. See #32468 for more context.

This commit refactors replaceDatapath() to loadDatapath() by factoring device attachment
out of the function into the caller. The main reasons are flexibility and transparency.
replaceDatapath() was called from many places and needed to do a lot. This change is the
first step to handing individual callers an object representing actual bpf object handles,
so they can correctly manage its lifecycle. In the future, ebpf.LoadAndAssign will be used
for better readability.

Some callers attach the same program to multiple interfaces, some attach multiple programs
(ingress/egress) to the same interface, and some use a mixture of both. This has caused
loops to creep into replaceDatapath, giving it many arguments and many overall
responsibilities, making it hard to form intuition around.

Major changes made in this commit:
- lifted attach{SKB,XDP}Program out of the function, into all callers, making them
  call attach* methods explicitly
- removed `replaceDatapathOptions`
- reduced the window during which a 'revert' can happen (see code comments) due to
  the risks involved and its questionable effectiveness
- removed a few points where context cancellations are obeyed, to be continued in a
  subsequent commit

Fixes: #32468

Signed-off-by: Timo Beckers <[email protected]>
  • Loading branch information
ti-mo committed May 22, 2024
1 parent 62117e7 commit cfc31c4
Show file tree
Hide file tree
Showing 7 changed files with 199 additions and 285 deletions.
43 changes: 24 additions & 19 deletions pkg/datapath/loader/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,38 +199,43 @@ func (l *loader) reinitializeIPSec(ctx context.Context) error {
return nil
}

progs := []progDefinition{{progName: symbolFromNetwork, direction: dirIngress}}
spec, err := bpf.LoadCollectionSpec(networkObj)
if err != nil {
return fmt.Errorf("loading eBPF ELF %s: %w", networkObj, err)
}

coll, finalize, err := loadDatapath(ctx, spec, nil, nil)
if err != nil {
return fmt.Errorf("loading %s: %w", networkObj, err)
}
defer coll.Close()

var errs error
for _, iface := range interfaces {
device, err := netlink.LinkByName(iface)
if err != nil {
return fmt.Errorf("retrieving device %s: %w", iface, err)
errs = errors.Join(errs, fmt.Errorf("retrieving device %s: %w", iface, err))
continue
}

finalize, err := replaceDatapath(ctx,
replaceDatapathOptions{
device: iface,
elf: networkObj,
programs: progs,
linkDir: bpffsDeviceLinksDir(bpf.CiliumPath(), device),
tcx: option.Config.EnableTCX,
},
)
if err != nil {
log.WithField(logfields.Interface, iface).WithError(err).Error("Load encryption network failed")
// collect errors, but keep trying replacing other interfaces.
errs = errors.Join(errs, err)
} else {
log.WithField(logfields.Interface, iface).Info("Encryption network program (re)loaded")
// Defer map removal until all interfaces' progs have been replaced.
defer finalize()
if err := attachSKBProgram(device, coll.Programs[symbolFromNetwork], symbolFromNetwork,
bpffsDeviceLinksDir(bpf.CiliumPath(), device), netlink.HANDLE_MIN_INGRESS, option.Config.EnableTCX); err != nil {

// Collect errors, keep attaching to other interfaces.
errs = errors.Join(errs, fmt.Errorf("interface %s: %w", iface, err))
continue
}

log.WithField(logfields.Interface, iface).Info("Encryption network program (re)loaded")
}

if errs != nil {
return fmt.Errorf("failed to load encryption program: %w", errs)
}

// Defer map removal until all interfaces' progs have been replaced.
finalize()

return nil
}

Expand Down
210 changes: 85 additions & 125 deletions pkg/datapath/loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,96 +336,68 @@ func removeObsoleteNetdevPrograms(devices []string) error {
return nil
}

// reloadHostDatapath (re)attaches BPF programs to:
// - cilium_host: ingress and egress
// - cilium_net: ingress
// - native devices: ingress and (optionally) egress if certain features require it
// reloadHostDatapath (re)attaches programs from bpf_host.c to:
// - cilium_host: cil_to_host ingress and cil_from_host to egress
// - cilium_net: cil_to_host to ingress
// - native devices: cil_from_netdev to ingress and (optionally) cil_to_netdev to egress if certain features require it
func (l *loader) reloadHostDatapath(ctx context.Context, ep datapath.Endpoint, spec *ebpf.CollectionSpec, devices []string) error {
// Warning: here be dragons. There used to be a single loop over
// interfaces+objs+progs here from the iproute2 days, but this was never
// correct to begin with. Tail call maps were always reused when possible,
// causing control flow to transition through invalid states as new tail calls
// were sequentially upserted into the array.
//
// Take care not to call replaceDatapath() twice for a single ELF/interface.
// Map migration should only be run once per ELF, otherwise cilium_calls_*
// created by prior loads will be unpinned, causing them to be emptied,
// missing all tail calls.

// Replace programs on cilium_host.
host, err := netlink.LinkByName(ep.InterfaceName())
if err != nil {
return fmt.Errorf("retrieving device %s: %w", ep.InterfaceName(), err)
}

progs := []progDefinition{
{progName: symbolToHostEp, direction: dirIngress},
{progName: symbolFromHostEp, direction: dirEgress},
}

finalize, err := replaceDatapathFromSpec(ctx, spec,
replaceDatapathOptions{
device: ep.InterfaceName(),
programs: progs,
linkDir: bpffsDeviceLinksDir(bpf.CiliumPath(), host),
tcx: option.Config.EnableTCX,
mapRenames: ELFMapSubstitutions(ep),
constants: ELFVariableSubstitutions(ep),
},
)
coll, finalize, err := loadDatapath(ctx, spec, ELFMapSubstitutions(ep), ELFVariableSubstitutions(ep))
if err != nil {
scopedLog := ep.Logger(subsystem).WithFields(logrus.Fields{
logfields.Veth: ep.InterfaceName(),
})
// Don't log an error here if the context was canceled or timed out;
// this log message should only represent failures with respect to
// loading the program.
if ctx.Err() == nil {
scopedLog.WithError(err).Warningf("JoinEP: Failed to load program for %s", ep.InterfaceName())
}
return err
}
// Defer map removal until all interfaces' progs have been replaced.
defer finalize()
defer coll.Close()

// Attach cil_to_host to cilium_host ingress.
if err := attachSKBProgram(host, coll.Programs[symbolToHostEp], symbolToHostEp,
bpffsDeviceLinksDir(bpf.CiliumPath(), host), netlink.HANDLE_MIN_INGRESS, option.Config.EnableTCX); err != nil {
return fmt.Errorf("interface %s ingress: %w", ep.InterfaceName(), err)
}
// Attach cil_from_host to cilium_host egress.
if err := attachSKBProgram(host, coll.Programs[symbolFromHostEp], symbolFromHostEp,
bpffsDeviceLinksDir(bpf.CiliumPath(), host), netlink.HANDLE_MIN_EGRESS, option.Config.EnableTCX); err != nil {
return fmt.Errorf("interface %s egress: %w", ep.InterfaceName(), err)
}

finalize()

// Replace program on cilium_net.
net, err := netlink.LinkByName(defaults.SecondHostDevice)
if err != nil {
log.WithError(err).WithField("device", defaults.SecondHostDevice).Error("Link does not exist")
return fmt.Errorf("device '%s' not found: %w", defaults.SecondHostDevice, err)
return fmt.Errorf("retrieving device %s: %w", defaults.SecondHostDevice, err)
}

secondConsts, secondRenames, err := l.patchHostNetdevDatapath(ep, defaults.SecondHostDevice)
if err != nil {
return err
}

progs = []progDefinition{
{progName: symbolToHostEp, direction: dirIngress},
}

finalize, err = replaceDatapathFromSpec(ctx, spec,
replaceDatapathOptions{
device: defaults.SecondHostDevice,
programs: progs,
linkDir: bpffsDeviceLinksDir(bpf.CiliumPath(), net),
tcx: option.Config.EnableTCX,
mapRenames: secondRenames,
constants: secondConsts,
},
)
coll, finalize, err = loadDatapath(ctx, spec, secondRenames, secondConsts)
if err != nil {
scopedLog := ep.Logger(subsystem).WithFields(logrus.Fields{
logfields.Veth: defaults.SecondHostDevice,
})
if ctx.Err() == nil {
scopedLog.WithError(err).Warningf("JoinEP: Failed to load program for %s", defaults.SecondHostDevice)
}
return err
}
defer finalize()
defer coll.Close()

// Attach cil_to_host to cilium_net.
if err := attachSKBProgram(net, coll.Programs[symbolToHostEp], symbolToHostEp,
bpffsDeviceLinksDir(bpf.CiliumPath(), net), netlink.HANDLE_MIN_INGRESS, option.Config.EnableTCX); err != nil {
return fmt.Errorf("interface %s ingress: %w", defaults.SecondHostDevice, err)
}

finalize()

// Replace programs on physical devices.
// Replace programs on physical devices, ignoring devices that don't exist.
for _, device := range devices {
iface, err := netlink.LinkByName(device)
if err != nil {
Expand All @@ -440,8 +412,16 @@ func (l *loader) reloadHostDatapath(ctx context.Context, ep datapath.Endpoint, s
return err
}

progs := []progDefinition{
{progName: symbolFromHostNetdevEp, direction: dirIngress},
coll, finalize, err := loadDatapath(ctx, spec, netdevRenames, netdevConsts)
if err != nil {
return err
}
defer coll.Close()

// Attach cil_from_netdev to ingress.
if err := attachSKBProgram(iface, coll.Programs[symbolFromHostNetdevEp], symbolFromHostNetdevEp,
linkDir, netlink.HANDLE_MIN_INGRESS, option.Config.EnableTCX); err != nil {
return fmt.Errorf("interface %s ingress: %w", device, err)
}

if option.Config.AreDevicesRequired() &&
Expand All @@ -450,7 +430,11 @@ func (l *loader) reloadHostDatapath(ctx context.Context, ep datapath.Endpoint, s
// the rev-NAT xlations.
device != wgTypes.IfaceName {

progs = append(progs, progDefinition{symbolToHostNetdevEp, dirEgress})
// Attach cil_to_netdev to egress.
if err := attachSKBProgram(iface, coll.Programs[symbolToHostNetdevEp], symbolToHostNetdevEp,
linkDir, netlink.HANDLE_MIN_EGRESS, option.Config.EnableTCX); err != nil {
return fmt.Errorf("interface %s egress: %w", device, err)
}
} else {
// Remove any previously attached device from egress path if BPF
// NodePort and host firewall are disabled.
Expand All @@ -459,26 +443,7 @@ func (l *loader) reloadHostDatapath(ctx context.Context, ep datapath.Endpoint, s
}
}

finalize, err := replaceDatapathFromSpec(ctx, spec,
replaceDatapathOptions{
device: device,
programs: progs,
linkDir: linkDir,
tcx: option.Config.EnableTCX,
mapRenames: netdevRenames,
constants: netdevConsts,
},
)
if err != nil {
scopedLog := ep.Logger(subsystem).WithFields(logrus.Fields{
logfields.Veth: device,
})
if ctx.Err() == nil {
scopedLog.WithError(err).Warningf("JoinEP: Failed to load program for physical device %s", device)
}
return err
}
defer finalize()
finalize()
}

// call at the end of the function so that we can easily detect if this removes necessary
Expand Down Expand Up @@ -532,44 +497,35 @@ func (l *loader) reloadDatapath(ctx context.Context, ep datapath.Endpoint, spec
return err
}
} else {
progs := []progDefinition{{progName: symbolFromEndpoint, direction: dirIngress}}
coll, finalize, err := loadDatapath(ctx, spec, ELFMapSubstitutions(ep), ELFVariableSubstitutions(ep))
if err != nil {
return err
}
defer coll.Close()

iface, err := netlink.LinkByName(device)
if err != nil {
return fmt.Errorf("retrieving device %s: %w", device, err)
}

linkDir := bpffsEndpointLinksDir(bpf.CiliumPath(), ep)
if err := attachSKBProgram(iface, coll.Programs[symbolFromEndpoint], symbolFromEndpoint,
linkDir, netlink.HANDLE_MIN_INGRESS, option.Config.EnableTCX); err != nil {
return fmt.Errorf("interface %s ingress: %w", device, err)
}

if ep.RequireEgressProg() {
progs = append(progs, progDefinition{progName: symbolToEndpoint, direction: dirEgress})
} else {
iface, err := netlink.LinkByName(device)
if err != nil {
log.WithError(err).WithField("device", device).Warn("Link does not exist")
if err := attachSKBProgram(iface, coll.Programs[symbolToEndpoint], symbolToEndpoint,
linkDir, netlink.HANDLE_MIN_EGRESS, option.Config.EnableTCX); err != nil {
return fmt.Errorf("interface %s egress: %w", device, err)
}
} else {
if err := detachSKBProgram(iface, symbolToEndpoint, linkDir, netlink.HANDLE_MIN_EGRESS); err != nil {
log.WithField("device", device).Error(err)
}
}

finalize, err := replaceDatapathFromSpec(ctx, spec,
replaceDatapathOptions{
device: device,
programs: progs,
linkDir: linkDir,
tcx: option.Config.EnableTCX,
mapRenames: ELFMapSubstitutions(ep),
constants: ELFVariableSubstitutions(ep),
},
)
if err != nil {
scopedLog := ep.Logger(subsystem).WithFields(logrus.Fields{
logfields.Veth: device,
})
// Don't log an error here if the context was canceled or timed out;
// this log message should only represent failures with respect to
// loading the program.
if ctx.Err() == nil {
scopedLog.WithError(err).Warn("JoinEP: Failed to attach program(s)")
}
return err
}
defer finalize()
finalize()
}

if ep.RequireEndpointRoute() {
Expand All @@ -593,31 +549,35 @@ func (l *loader) reloadDatapath(ctx context.Context, ep datapath.Endpoint, spec

func (l *loader) replaceOverlayDatapath(ctx context.Context, cArgs []string, iface string) error {
if err := compileOverlay(ctx, cArgs); err != nil {
log.WithError(err).Fatal("failed to compile overlay programs")
return fmt.Errorf("compiling overlay program: %w", err)
}

device, err := netlink.LinkByName(iface)
if err != nil {
return fmt.Errorf("retrieving device %s: %w", iface, err)
}

progs := []progDefinition{
{progName: symbolFromOverlay, direction: dirIngress},
{progName: symbolToOverlay, direction: dirEgress},
spec, err := bpf.LoadCollectionSpec(overlayObj)
if err != nil {
return fmt.Errorf("loading eBPF ELF %s: %w", overlayObj, err)
}

finalize, err := replaceDatapath(ctx,
replaceDatapathOptions{
device: iface,
elf: overlayObj,
programs: progs,
linkDir: bpffsDeviceLinksDir(bpf.CiliumPath(), device),
tcx: option.Config.EnableTCX,
},
)
coll, finalize, err := loadDatapath(ctx, spec, nil, nil)
if err != nil {
log.WithField(logfields.Interface, iface).WithError(err).Fatal("Load overlay network failed")
return err
}
defer coll.Close()

linkDir := bpffsDeviceLinksDir(bpf.CiliumPath(), device)
if err := attachSKBProgram(device, coll.Programs[symbolFromOverlay], symbolFromOverlay,
linkDir, netlink.HANDLE_MIN_INGRESS, option.Config.EnableTCX); err != nil {
return fmt.Errorf("interface %s ingress: %w", device, err)
}
if err := attachSKBProgram(device, coll.Programs[symbolToOverlay], symbolToOverlay,
linkDir, netlink.HANDLE_MIN_EGRESS, option.Config.EnableTCX); err != nil {
return fmt.Errorf("interface %s egress: %w", device, err)
}

finalize()

return nil
Expand Down

0 comments on commit cfc31c4

Please sign in to comment.