Skip to content

Commit

Permalink
Introducing portprober as successor to uplinkprober
Browse files Browse the repository at this point in the history
As we generalize the concept of NI port probing, which is currently
limited to uplink & freeuplink shared labels and one port selected
for all routes, into multipath routing with probing-based port
selection with user-defined shared labels, it is necessary to replace
uplinkprober with a more advanced component
(and preferably avoid using "uplink" for the name).
See: lf-edge/eve-api#53

This commit introduces portprober - a successor to uplinkprober.
It will be used by zedrouter to probe port connectivity for multipath
IP routes. These are routes that select multiple possible output ports
using a shared label. This can be an EVE-defined label, such as "uplink"
matching all mgmt ports, or a user-defined label selecting a custom
subset of ports.
For now we will not support load-balancing and zedrouter will have to
pick one output port at a time for each multipath route. This will be
done by portprober. It will probe connectivity of each port (with
a possibly user-defined probing endpoint and probing method) and based
on the results and some other criteria such as cost, wwan signal
strength, etc., it will pick the best port. The probing algorithm is
pretty much the same as implemented in uplinkprober, just extended to
support user-defined shared labels, user-defined probing method, etc.

Note that in this commit we just add the portprober package and make
only minimal changes in the pillar/types package (e.g. introducing
SharedLabels into DPC & DNS).
In the follow-up commit(s), uplinkprober will be removed and zedrouter
will be updated to support multipath routes and to use portprober
instead.

Signed-off-by: Milan Lenco <milan@zededa.com>
  • Loading branch information
milan-zededa committed Jul 30, 2024
1 parent 61281e5 commit 695453d
Show file tree
Hide file tree
Showing 24 changed files with 2,960 additions and 171 deletions.
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
/pkg/pillar/nistate/ @milan-zededa
/pkg/pillar/sriov/ @uncleDecart
/pkg/pillar/uplinkprober/ @milan-zededa
/pkg/pillar/portprober/ @milan-zededa
/pkg/pillar/utils/ @milan-zededa
/pkg/pillar/volumehandlers/ @OhmSpectator
/pkg/pillar/zedcloud/ @christoph-zededa @rouming
Expand Down
31 changes: 26 additions & 5 deletions pkg/pillar/cmd/nim/nim.go
Original file line number Diff line number Diff line change
Expand Up @@ -729,7 +729,14 @@ func (n *nim) handleDPCFileModify(_ interface{}, key string, configArg, _ interf

func (n *nim) handleDPCImpl(key string, configArg interface{}, fromFile bool) {
dpc := configArg.(types.DevicePortConfig)
dpc.DoSanitize(n.Log, true, true, key, true, true)
dpc.DoSanitize(n.Log, types.DPCSanitizeArgs{
SanitizeTimePriority: true,
SanitizeKey: true,
KeyToUseIfEmpty: key,
SanitizeName: true,
SanitizeL3Port: true,
SanitizeSharedLabels: true,
})
if fromFile {
// Use sha to determine if file has already been ingested
filename := filepath.Join(types.TmpDirname, "DevicePortConfig",
Expand Down Expand Up @@ -759,7 +766,14 @@ func (n *nim) handleDPCImpl(key string, configArg interface{}, fromFile bool) {

func (n *nim) handleDPCDelete(_ interface{}, key string, configArg interface{}) {
dpc := configArg.(types.DevicePortConfig)
dpc.DoSanitize(n.Log, false, true, key, true, true)
dpc.DoSanitize(n.Log, types.DPCSanitizeArgs{
SanitizeTimePriority: false,
SanitizeKey: true,
KeyToUseIfEmpty: key,
SanitizeName: true,
SanitizeL3Port: true,
SanitizeSharedLabels: true,
})
n.dpcManager.DelDPC(dpc)
}

Expand Down Expand Up @@ -908,7 +922,14 @@ func (n *nim) ingestDevicePortConfigFile(oldDirname string, newDirname string, n
return
}
key := strings.TrimSuffix(name, ".json")
dpc.DoSanitize(n.Log, true, true, key, true, true)
dpc.DoSanitize(n.Log, types.DPCSanitizeArgs{
SanitizeTimePriority: true,
SanitizeKey: true,
KeyToUseIfEmpty: key,
SanitizeName: true,
SanitizeL3Port: true,
SanitizeSharedLabels: true,
})

// Use sha to determine if file has already been ingested
basename := filepath.Base(filename)
Expand Down Expand Up @@ -1030,7 +1051,7 @@ func (n *nim) makeLastResortDPC() (types.DevicePortConfig, error) {
},
}
dns := n.dpcManager.GetDNS()
portStatus := dns.GetPortByIfName(ifName)
portStatus := dns.LookupPortByIfName(ifName)
if portStatus != nil {
port.WirelessCfg = portStatus.WirelessCfg
}
Expand Down Expand Up @@ -1089,7 +1110,7 @@ func (n *nim) processInterfaceChange(ifChange netmonitor.IfChange) {
return
}
includePort := n.includeLastResortPort(ifChange.Attrs)
port := n.lastResort.GetPortByIfName(ifChange.Attrs.IfName)
port := n.lastResort.LookupPortByIfName(ifChange.Attrs.IfName)
if port == nil && includePort {
n.publishLastResortDPC(fmt.Sprintf("interface %s should be included",
ifChange.Attrs.IfName))
Expand Down
4 changes: 2 additions & 2 deletions pkg/pillar/cmd/zedagent/handlemetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -1681,11 +1681,11 @@ func protoEncodeNetworkInstanceMetricProto(status types.NetworkInstanceMetrics)

func protoEncodeProbeMetrics(probeMetrics types.ProbeMetrics) *metrics.ZProbeNIMetrics {
protoMetrics := &metrics.ZProbeNIMetrics{
CurrentIntf: probeMetrics.SelectedUplinkIntf,
CurrentIntf: probeMetrics.SelectedPortIfName,
RemoteEndpoint: strings.Join(probeMetrics.RemoteEndpoints, ", "),
PingIntv: probeMetrics.LocalPingIntvl,
RemotePingIntv: probeMetrics.RemotePingIntvl,
UplinkCnt: probeMetrics.UplinkCount,
UplinkCnt: probeMetrics.PortCount,
}
for _, intfStats := range probeMetrics.IntfProbeStats {
var nextHops []string
Expand Down
2 changes: 1 addition & 1 deletion pkg/pillar/cmd/zedagent/parseconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ func parseStaticRoute(route *zconfig.IPRoute, config *types.NetworkInstanceConfi
if gatewayIP.IsUnspecified() {
return errors.New("gateway IP address is all-zeroes")
}
config.StaticRoutes = append(config.StaticRoutes, types.IPRoute{
config.StaticRoutes = append(config.StaticRoutes, types.IPRouteConfig{
DstNetwork: dstNetwork,
Gateway: gatewayIP,
})
Expand Down
6 changes: 3 additions & 3 deletions pkg/pillar/cmd/zedagent/reportinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -1005,9 +1005,9 @@ func encodeSystemAdapterInfo(ctx *zedagentContext) *info.SystemAdapterInfo {
}
if i == dpcl.CurrentIndex {
// For the currently used DPC we publish the status (DeviceNetworkStatus).
portStatus := deviceNetworkStatus.GetPortsByLogicallabel(p.Logicallabel)
if len(portStatus) == 1 {
dps.Ports[j] = encodeNetworkPortStatus(ctx, portStatus[0], p.NetworkUUID)
portStatus := deviceNetworkStatus.LookupPortByLogicallabel(p.Logicallabel)
if portStatus != nil {
dps.Ports[j] = encodeNetworkPortStatus(ctx, portStatus, p.NetworkUUID)
continue
}
}
Expand Down
28 changes: 10 additions & 18 deletions pkg/pillar/cmd/zedrouter/networkinstance.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (z *zedrouter) getNIPortConfig(
if ifName == "" {
return nil
}
port := z.deviceNetworkStatus.GetPortByIfName(ifName)
port := z.deviceNetworkStatus.LookupPortByIfName(ifName)
if port == nil {
return nil
}
Expand Down Expand Up @@ -124,26 +124,18 @@ func (z *zedrouter) setSelectedUplink(uplinkLogicalLabel string,
// and uplink probing eventually finding a suitable uplink port.
return fmt.Errorf("no selected uplink port")
}
ports := z.deviceNetworkStatus.GetPortsByLogicallabel(uplinkLogicalLabel)
switch len(ports) {
case 0:
err := fmt.Errorf("label of selected uplink (%s) does not match any port (%v)",
uplinkLogicalLabel, ports)
port := z.deviceNetworkStatus.LookupPortByLogicallabel(uplinkLogicalLabel)
if port == nil {
err := fmt.Errorf("label of selected uplink (%s) does not match any port",
uplinkLogicalLabel)
// Wait for DPC update
return err
case 1:
if ports[0].InvalidConfig {
return fmt.Errorf("port %s has invalid config: %s", ports[0].Logicallabel,
ports[0].LastError)
}
// Selected port is OK
break
default:
// Note: soon we will support NI with multiple ports.
err := fmt.Errorf("label of selected uplink matches multiple ports (%v)", ports)
return err
}
ifName := ports[0].IfName
if port.InvalidConfig {
return fmt.Errorf("port %s has invalid config: %s", port.Logicallabel,
port.LastError)
}
ifName := port.IfName
status.SelectedUplinkIntfName = ifName
ifIndex, exists, _ := z.networkMonitor.GetInterfaceIndex(ifName)
if !exists {
Expand Down
2 changes: 1 addition & 1 deletion pkg/pillar/conntester/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (t *MockConnectivityTester) TestConnectivity(dns types.DeviceNetworkStatus,
intfStatusMap.RecordFailure(ifName, err.Error())
continue
}
port := dns.GetPortByIfName(ifName)
port := dns.LookupPortByIfName(ifName)
if !port.IsMgmt {
continue
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/pillar/conntester/zedcloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,13 +156,13 @@ func (t *ZedcloudConnectivityTester) getPortsNotReady(
for _, attempt := range sendErr.Attempts {
var dnsErr *types.DNSNotAvailError
if errors.As(attempt.Err, &dnsErr) {
if port := dns.GetPortByIfName(dnsErr.IfName); port != nil {
if port := dns.LookupPortByIfName(dnsErr.IfName); port != nil {
portMap[port.Logicallabel] = struct{}{}
}
}
var ipErr *types.IPAddrNotAvailError
if errors.As(attempt.Err, &ipErr) {
if port := dns.GetPortByIfName(ipErr.IfName); port != nil {
if port := dns.LookupPortByIfName(ipErr.IfName); port != nil {
portMap[port.Logicallabel] = struct{}{}
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/pillar/devicenetwork/wpad.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
func CheckAndGetNetworkProxy(log *base.LogObject, dns *types.DeviceNetworkStatus,
ifname string, metrics *zedcloud.AgentMetrics) error {

portStatus := dns.GetPortByIfName(ifname)
portStatus := dns.LookupPortByIfName(ifname)
if portStatus == nil {
errStr := fmt.Sprintf("Missing port status for interface %s", ifname)
log.Errorln(errStr)
Expand Down
1 change: 1 addition & 0 deletions pkg/pillar/dpcmanager/dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func (m *DpcManager) updateDNS() {
m.deviceNetStatus.Ports[ix].IfName = port.IfName
m.deviceNetStatus.Ports[ix].Phylabel = port.Phylabel
m.deviceNetStatus.Ports[ix].Logicallabel = port.Logicallabel
m.deviceNetStatus.Ports[ix].SharedLabels = port.SharedLabels
m.deviceNetStatus.Ports[ix].Alias = port.Alias
m.deviceNetStatus.Ports[ix].IsMgmt = port.IsMgmt
m.deviceNetStatus.Ports[ix].IsL3Port = port.IsL3Port
Expand Down
8 changes: 7 additions & 1 deletion pkg/pillar/dpcmanager/dpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,13 @@ func (m *DpcManager) ingestDPCList() (dpclPresentAtBoot bool) {
var dpcl types.DevicePortConfigList
for _, portConfig := range storedDpcl.PortConfigList {
// Sanitize port labels and IsL3Port flag.
portConfig.DoSanitize(m.Log, false, false, "", true, true)
portConfig.DoSanitize(m.Log, types.DPCSanitizeArgs{
SanitizeTimePriority: false,
SanitizeKey: false,
SanitizeName: true,
SanitizeL3Port: true,
SanitizeSharedLabels: true,
})
// Clear runtime errors (not config validation errors) from before reboot
// and start fresh.
for i := 0; i < len(portConfig.Ports); i++ {
Expand Down
16 changes: 11 additions & 5 deletions pkg/pillar/dpcmanager/dpcmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1772,12 +1772,12 @@ func TestTransientDNSError(test *testing.T) {
t.Consistently(testingInProgressCb(), 8*time.Second).Should(BeTrue())
t.Expect(getDPC(0).State).To(Equal(types.DPCStateIPDNSWait))
dpc = getDPC(0)
dpcEth0 := dpc.GetPortByIfName("eth0")
dpcEth0 := dpc.LookupPortByIfName("eth0")
t.Expect(dpcEth0).ToNot(BeNil())
t.Expect(dpcEth0.HasError()).To(BeTrue())
t.Expect(dpcEth0.LastError).To(Equal("interface eth0: no DNS server available"))
dns := getDNS()
dnsEth0 := dns.GetPortByIfName("eth0")
dnsEth0 := dns.LookupPortByIfName("eth0")
t.Expect(dnsEth0).ToNot(BeNil())
t.Expect(dnsEth0.HasError()).To(BeTrue())
t.Expect(dnsEth0.LastError).To(Equal("interface eth0: no DNS server available"))
Expand All @@ -1787,12 +1787,12 @@ func TestTransientDNSError(test *testing.T) {
t.Eventually(testingInProgressCb()).Should(BeFalse())
t.Expect(getDPC(0).State).To(Equal(types.DPCStateSuccess))
dpc = getDPC(0)
dpcEth0 = dpc.GetPortByIfName("eth0")
dpcEth0 = dpc.LookupPortByIfName("eth0")
t.Expect(dpcEth0).ToNot(BeNil())
t.Expect(dpcEth0.HasError()).To(BeFalse())
t.Expect(dpcEth0.LastError).To(BeEmpty())
dns = getDNS()
dnsEth0 = dns.GetPortByIfName("eth0")
dnsEth0 = dns.LookupPortByIfName("eth0")
t.Expect(dnsEth0).ToNot(BeNil())
t.Expect(dnsEth0.HasError()).To(BeFalse())
t.Expect(dnsEth0.LastError).To(BeEmpty())
Expand Down Expand Up @@ -1821,7 +1821,13 @@ func TestOldDPC(test *testing.T) {

// This is run by nim for any input DPC to make sure that it is compliant
// with the latest EVE version.
dpc.DoSanitize(logObj, true, false, "", true, true)
dpc.DoSanitize(logObj, types.DPCSanitizeArgs{
SanitizeTimePriority: true,
SanitizeKey: false,
SanitizeName: true,
SanitizeL3Port: true,
SanitizeSharedLabels: true,
})

dpcManager.AddDPC(dpc)

Expand Down
8 changes: 4 additions & 4 deletions pkg/pillar/dpcmanager/verify.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,13 +538,13 @@ func (m *DpcManager) isInterfaceCrucial(ifName string) bool {
return false
}
// Is part of DPC at CurrentIndex in DPCL?
portStatus := portConfigList[currentIndex].GetPortByIfName(ifName)
if portStatus != nil {
portConfig := portConfigList[currentIndex].LookupPortByIfName(ifName)
if portConfig != nil {
return true
}
// Is part of DPC at index 0 in DPCL?
portStatus = portConfigList[0].GetPortByIfName(ifName)
if portStatus != nil {
portConfig = portConfigList[0].LookupPortByIfName(ifName)
if portConfig != nil {
return true
}
return false
Expand Down
Loading

0 comments on commit 695453d

Please sign in to comment.