From c29e7ad51ac4dd12af142d0cbb2b2ade48e52cef Mon Sep 17 00:00:00 2001 From: nikw9944 Date: Tue, 5 May 2026 20:14:33 +0000 Subject: [PATCH 1/9] doublezerod: add periodic kernel route reconciliation (#3669) Add a reconciliation loop to the liveness manager that periodically scans the kernel routing table for missing BGP routes and reinstalls them, mitigating connectivity loss caused by external processes removing routes. Also promote liveness session down logs from DEBUG to INFO for passive/peer-passive modes so operators can see the full up/down lifecycle. --- client/doublezerod/cmd/doublezerod/main.go | 17 ++- .../doublezerod/internal/liveness/manager.go | 101 +++++++++++++- .../internal/liveness/manager_test.go | 124 ++++++++++++++++++ .../doublezerod/internal/liveness/metrics.go | 13 ++ 4 files changed, 246 insertions(+), 9 deletions(-) diff --git a/client/doublezerod/cmd/doublezerod/main.go b/client/doublezerod/cmd/doublezerod/main.go index c0453969cb..f44f1b8949 100644 --- a/client/doublezerod/cmd/doublezerod/main.go +++ b/client/doublezerod/cmd/doublezerod/main.go @@ -45,13 +45,14 @@ var ( stateDir = flag.String("state-dir", "/var/lib/doublezerod", "directory for persistent state files") // Route liveness configuration flags. - routeLivenessTxMin = flag.Duration("route-liveness-tx-min", defaultRouteLivenessTxMin, "route liveness tx min") - routeLivenessRxMin = flag.Duration("route-liveness-rx-min", defaultRouteLivenessRxMin, "route liveness rx min") - routeLivenessDetectMult = flag.Uint("route-liveness-detect-mult", defaultRouteLivenessDetectMult, "route liveness detect mult") - routeLivenessMinTxFloor = flag.Duration("route-liveness-min-tx-floor", defaultRouteLivenessMinTxFloor, "route liveness min tx floor") - routeLivenessMaxTxCeil = flag.Duration("route-liveness-max-tx-ceil", defaultRouteLivenessMaxTxCeil, "route liveness max tx ceil") - routeLivenessPeerMetrics = flag.Bool("route-liveness-peer-metrics", false, "enables per peer metrics for route liveness (high cardinality)") - routeLivenessDebug = flag.Bool("route-liveness-debug", false, "enables debug logging for route liveness") + routeLivenessTxMin = flag.Duration("route-liveness-tx-min", defaultRouteLivenessTxMin, "route liveness tx min") + routeLivenessRxMin = flag.Duration("route-liveness-rx-min", defaultRouteLivenessRxMin, "route liveness rx min") + routeLivenessDetectMult = flag.Uint("route-liveness-detect-mult", defaultRouteLivenessDetectMult, "route liveness detect mult") + routeLivenessMinTxFloor = flag.Duration("route-liveness-min-tx-floor", defaultRouteLivenessMinTxFloor, "route liveness min tx floor") + routeLivenessMaxTxCeil = flag.Duration("route-liveness-max-tx-ceil", defaultRouteLivenessMaxTxCeil, "route liveness max tx ceil") + routeLivenessReconcileInterval = flag.Duration("route-liveness-reconcile-interval", 30*time.Second, "interval for periodic kernel route reconciliation; 0 disables") + routeLivenessPeerMetrics = flag.Bool("route-liveness-peer-metrics", false, "enables per peer metrics for route liveness (high cardinality)") + routeLivenessDebug = flag.Bool("route-liveness-debug", false, "enables debug logging for route liveness") // TODO(snormore): These flags are temporary for initial rollout testing. // They will be superceded by a single `route-liveness-enable` flag, where false means @@ -179,6 +180,8 @@ func main() { EnablePeerMetrics: *routeLivenessPeerMetrics, + RouteReconcileInterval: *routeLivenessReconcileInterval, + // Default to treating peers that advertise passive mode as passive. That is, we will // install their routes immediately and never uninstall them on down events. HonorPeerAdvertisedPassive: true, diff --git a/client/doublezerod/internal/liveness/manager.go b/client/doublezerod/internal/liveness/manager.go index 32556c282b..135494beb3 100644 --- a/client/doublezerod/internal/liveness/manager.go +++ b/client/doublezerod/internal/liveness/manager.go @@ -13,6 +13,7 @@ import ( "github.com/malbeclabs/doublezero/client/doublezerod/internal/routing" "github.com/prometheus/client_golang/prometheus" + "golang.org/x/sys/unix" ) const ( @@ -25,6 +26,9 @@ const ( defaultBackoffMax = 1 * time.Minute defaultMaxEvents = 10240 + + // Default interval for periodic kernel route reconciliation. + defaultRouteReconcileInterval = 30 * time.Second ) // Peer identifies a remote endpoint and the local interface context used to reach it. @@ -92,6 +96,10 @@ type ManagerConfig struct { // Client version to advertise to peers in control packets. ClientVersion string + + // RouteReconcileInterval controls how often the manager scans the kernel + // routing table for missing routes and reinstalls them. Zero disables. + RouteReconcileInterval time.Duration } // Validate fills defaults and enforces constraints for ManagerConfig. @@ -151,6 +159,9 @@ func (c *ManagerConfig) Validate() error { if c.ClientVersion == "" { return errors.New("clientVersion is required") } + if c.RouteReconcileInterval == 0 { + c.RouteReconcileInterval = defaultRouteReconcileInterval + } return nil } @@ -291,6 +302,25 @@ func NewManager(ctx context.Context, cfg *ManagerConfig, cr *routing.ConfiguredR } }() + // Route reconciliation goroutine: periodically scans the kernel routing + // table for missing routes and reinstalls them. + if cfg.RouteReconcileInterval > 0 { + m.wg.Add(1) + go func() { + defer m.wg.Done() + ticker := time.NewTicker(cfg.RouteReconcileInterval) + defer ticker.Stop() + for { + select { + case <-m.ctx.Done(): + return + case <-ticker.C: + m.reconcileRoutes() + } + } + }() + } + // If any routes are configured to be excluded, mark then as AdminDown immediately. if m.cr != nil { for ip := range m.cr.GetExcluded() { @@ -834,7 +864,7 @@ func (m *manager) onSessionDown(sess *Session) { } if m.cfg.PassiveMode { - m.log.Debug("liveness: session down (global passive; keeping route)", + m.log.Info("liveness: session down (global passive; keeping route)", "peer", peer.String(), "route", snap.Route.String(), "downSince", snap.DownSince.UTC().String(), @@ -845,7 +875,7 @@ func (m *manager) onSessionDown(sess *Session) { } if effectivelyPassive { - m.log.Debug("liveness: session down (peer passive; keeping route)", + m.log.Info("liveness: session down (peer passive; keeping route)", "peer", peer.String(), "route", snap.Route.String(), "downSince", snap.DownSince.UTC().String(), @@ -875,6 +905,73 @@ func (m *manager) onSessionDown(sess *Session) { ) } +// reconcileRoutes scans the kernel routing table for routes that should be +// installed but are missing, and reinstalls them. This mitigates routes being +// removed by external processes. +func (m *manager) reconcileRoutes() { + // Snapshot installed and desired under lock. + type installedRoute struct { + rk RouteKey + route *Route + } + m.mu.Lock() + var toCheck []installedRoute + for rk, ok := range m.installed { + if !ok { + continue + } + if r, exists := m.desired[rk]; exists { + toCheck = append(toCheck, installedRoute{rk: rk, route: r}) + } + } + m.mu.Unlock() + + if len(toCheck) == 0 { + return + } + + kernelRoutes, err := m.cfg.Netlinker.RouteByProtocol(unix.RTPROT_BGP) + if err != nil { + m.log.Error("liveness: error fetching kernel routes for reconciliation", "error", err) + return + } + + // Build a lookup set keyed by (table, dst, nexthop) for fast matching. + type kernelKey struct { + Table int + DstIP string + NextHop string + } + kernelSet := make(map[kernelKey]struct{}, len(kernelRoutes)) + for _, kr := range kernelRoutes { + var dstIP, nhIP string + if kr.Dst != nil && kr.Dst.IP != nil && kr.Dst.IP.To4() != nil { + dstIP = kr.Dst.IP.To4().String() + } + if kr.NextHop != nil && kr.NextHop.To4() != nil { + nhIP = kr.NextHop.To4().String() + } + kernelSet[kernelKey{Table: kr.Table, DstIP: dstIP, NextHop: nhIP}] = struct{}{} + } + + for _, ir := range toCheck { + kk := kernelKey{Table: ir.route.Table, DstIP: ir.rk.DstPrefix, NextHop: ir.rk.NextHop} + if _, present := kernelSet[kk]; present { + continue + } + m.log.Warn("liveness: reinstalling missing route", + "route", ir.route.String(), + "iface", ir.rk.Interface, + ) + if err := m.cfg.Netlinker.RouteAdd(&ir.route.Route); err != nil { + m.log.Error("liveness: error reinstalling route", + "error", err, "route", ir.route.String()) + } else { + m.metrics.routeReinstall(ir.rk.Interface, ir.rk.SrcIP) + } + } +} + // isPeerEffectivelyPassive returns true when this session should not have its // dataplane (kernel route) managed due to peer-advertised passive mode. // diff --git a/client/doublezerod/internal/liveness/manager_test.go b/client/doublezerod/internal/liveness/manager_test.go index c015ef4a1d..eac35a9195 100644 --- a/client/doublezerod/internal/liveness/manager_test.go +++ b/client/doublezerod/internal/liveness/manager_test.go @@ -1650,6 +1650,130 @@ func metricHasLabels(m *prom.Metric, labels prometheus.Labels) bool { return true } +func TestClient_Liveness_Manager_ReconcileRoutes_ReinstallsMissing(t *testing.T) { + t.Parallel() + + addCalls := 0 + mock := &MockRouteReaderWriter{ + RouteAddFunc: func(r *routing.Route) error { + addCalls++ + return nil + }, + RouteByProtocolFunc: func(int) ([]*routing.Route, error) { + // Return empty — no routes in kernel. + return nil, nil + }, + } + + m, reg, err := newTestManagerWithMetrics(t, func(cfg *ManagerConfig) { + cfg.Netlinker = mock + cfg.PassiveMode = true + cfg.RouteReconcileInterval = -1 // disable ticker; we call manually + }) + require.NoError(t, err) + t.Cleanup(func() { _ = m.Close() }) + + r := newTestRoute(nil) + err = m.RegisterRoute(r, "lo", m.LocalAddr().Port) + require.NoError(t, err) + + // RegisterRoute in PassiveMode calls RouteAdd once. + mock.mu.Lock() + addCalls = 0 + mock.mu.Unlock() + + m.reconcileRoutes() + + mock.mu.Lock() + require.Equal(t, 1, addCalls, "expected one RouteAdd call to reinstall the missing route") + mock.mu.Unlock() + + reinstalls := getCounterValue(t, reg, "doublezero_liveness_route_reinstalls_total", + prometheus.Labels{LabelIface: "lo", LabelLocalIP: r.Src.To4().String()}) + require.Equal(t, float64(1), reinstalls) +} + +func TestClient_Liveness_Manager_ReconcileRoutes_SkipsPresent(t *testing.T) { + t.Parallel() + + r := newTestRoute(nil) + addCalls := 0 + mock := &MockRouteReaderWriter{ + RouteAddFunc: func(rr *routing.Route) error { + addCalls++ + return nil + }, + RouteByProtocolFunc: func(int) ([]*routing.Route, error) { + // Return the route as present in kernel. + return []*routing.Route{&r.Route}, nil + }, + } + + m, _, err := newTestManagerWithMetrics(t, func(cfg *ManagerConfig) { + cfg.Netlinker = mock + cfg.PassiveMode = true + cfg.RouteReconcileInterval = -1 + }) + require.NoError(t, err) + t.Cleanup(func() { _ = m.Close() }) + + err = m.RegisterRoute(r, "lo", m.LocalAddr().Port) + require.NoError(t, err) + + // Reset after RegisterRoute's install. + mock.mu.Lock() + addCalls = 0 + mock.mu.Unlock() + + m.reconcileRoutes() + + mock.mu.Lock() + require.Equal(t, 0, addCalls, "should not reinstall a route that is present in the kernel") + mock.mu.Unlock() +} + +func TestClient_Liveness_Manager_ReconcileRoutes_SkipsUninstalled(t *testing.T) { + t.Parallel() + + addCalls := 0 + mock := &MockRouteReaderWriter{ + RouteAddFunc: func(r *routing.Route) error { + addCalls++ + return nil + }, + RouteByProtocolFunc: func(int) ([]*routing.Route, error) { + return nil, nil + }, + } + + // Active mode: route is registered but not installed until session goes Up. + m, _, err := newTestManagerWithMetrics(t, func(cfg *ManagerConfig) { + cfg.Netlinker = mock + cfg.PassiveMode = false + cfg.RouteReconcileInterval = -1 + }) + require.NoError(t, err) + t.Cleanup(func() { _ = m.Close() }) + + r := newTestRoute(func(r *Route) { + r.Src = net.IPv4(127, 0, 0, 1) + r.Dst = &net.IPNet{IP: net.IPv4(127, 0, 0, 2), Mask: net.CIDRMask(32, 32)} + }) + err = m.RegisterRoute(r, "lo", m.LocalAddr().Port) + require.NoError(t, err) + + // In active mode, installed[rk] is false until session goes Up. + mock.mu.Lock() + addCalls = 0 + mock.mu.Unlock() + + m.reconcileRoutes() + + mock.mu.Lock() + require.Equal(t, 0, addCalls, "should not reinstall a route that was never installed") + mock.mu.Unlock() +} + func getHistogramCount(t *testing.T, reg *prometheus.Registry, name string, labels prometheus.Labels) float64 { t.Helper() diff --git a/client/doublezerod/internal/liveness/metrics.go b/client/doublezerod/internal/liveness/metrics.go index d1e9e31842..4aff9a6a1e 100644 --- a/client/doublezerod/internal/liveness/metrics.go +++ b/client/doublezerod/internal/liveness/metrics.go @@ -42,6 +42,7 @@ type Metrics struct { DesiredMapSize prometheus.Gauge PeerSessions *prometheus.GaugeVec PeerDetectTime *prometheus.GaugeVec + RouteReinstalls *prometheus.CounterVec } var ( @@ -212,6 +213,13 @@ func newMetrics() *Metrics { Help: "Size of the desired map", }, ), + RouteReinstalls: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "doublezero_liveness_route_reinstalls_total", + Help: "Count of routes reinstalled after being removed from the kernel by an external process", + }, + serviceLabels, + ), } } @@ -241,6 +249,7 @@ func (m *Metrics) Register(r prometheus.Registerer) { m.DesiredMapSize, m.PeerSessions, m.PeerDetectTime, + m.RouteReinstalls, ) } @@ -296,6 +305,10 @@ func (m *Metrics) convergenceToUp(peer Peer, convergence time.Duration) { m.ConvergenceToUp.WithLabelValues(peer.Interface, peer.LocalIP).Observe(convergence.Seconds()) } +func (m *Metrics) routeReinstall(iface, localIP string) { + m.RouteReinstalls.WithLabelValues(iface, localIP).Inc() +} + func (m *Metrics) convergenceToDown(peer Peer, convergence time.Duration) { m.ConvergenceToDown.WithLabelValues(peer.Interface, peer.LocalIP).Observe(convergence.Seconds()) } From 7c33d53c6d6afcc607cedf6799fcec1097bf785b Mon Sep 17 00:00:00 2001 From: nikw9944 Date: Tue, 5 May 2026 20:17:45 +0000 Subject: [PATCH 2/9] doublezerod: add install failure metric to route reconciliation Increment RouteInstallFailures counter when a reconciliation reinstall fails, matching the observability pattern in onSessionUp. Also pre-allocate the toCheck slice. --- client/doublezerod/internal/liveness/manager.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/client/doublezerod/internal/liveness/manager.go b/client/doublezerod/internal/liveness/manager.go index 135494beb3..d3b5cf2dd3 100644 --- a/client/doublezerod/internal/liveness/manager.go +++ b/client/doublezerod/internal/liveness/manager.go @@ -915,7 +915,7 @@ func (m *manager) reconcileRoutes() { route *Route } m.mu.Lock() - var toCheck []installedRoute + toCheck := make([]installedRoute, 0, len(m.installed)) for rk, ok := range m.installed { if !ok { continue @@ -966,6 +966,7 @@ func (m *manager) reconcileRoutes() { if err := m.cfg.Netlinker.RouteAdd(&ir.route.Route); err != nil { m.log.Error("liveness: error reinstalling route", "error", err, "route", ir.route.String()) + m.metrics.RouteInstallFailures.WithLabelValues(ir.rk.Interface, ir.rk.SrcIP).Inc() } else { m.metrics.routeReinstall(ir.rk.Interface, ir.rk.SrcIP) } From 44bf3bb132e5f42b488d2a3671903f25de5f1de2 Mon Sep 17 00:00:00 2001 From: nikw9944 Date: Tue, 5 May 2026 20:23:56 +0000 Subject: [PATCH 3/9] doublezerod: add changelog entry for route reconciliation --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3d77743a1c..c0e6572390 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ All notable changes to this project will be documented in this file. - Client - Add a `--no-wait` flag to `doublezero disconnect` that skips waiting for the daemon to tear down the tunnel(s), exiting once the onchain user deletion is confirmed. (#3911) + - Add periodic kernel route reconciliation to `doublezerod` that detects and reinstalls missing routes, with a metric tracking install failures ([#3669](https://github.com/malbeclabs/doublezero/issues/3669)) - CLI - Add hidden `migrate flex-algo` (RFC-18 link-topology and Vpnv4 loopback FlexAlgoNodeSegment backfill); the prior `migrate` command is now `migrate user-pda`. Moved from `doublezero-admin`. - Add hidden `device migrate-multicast-counts` and `device migrate-unicast-counts` to reconcile stale per-device subscriber, publisher, and unicast-user counts. Moved from `doublezero-admin`. From 8e5b15886093fd21daca438eaa0f690cb25e5305 Mon Sep 17 00:00:00 2001 From: nikw9944 Date: Tue, 5 May 2026 20:45:17 +0000 Subject: [PATCH 4/9] doublezerod: fix route reconciliation concurrency and correctness issues - Re-check installed state under lock before RouteAdd to prevent resurrecting routes intentionally withdrawn by onSessionDown - Add SrcIP to kernel route lookup key for tighter matching in multi-interface setups - Reject negative RouteReconcileInterval in Validate() - Use named const for reconcile interval flag default - Log when route reconciliation is enabled at startup --- client/doublezerod/cmd/doublezerod/main.go | 15 ++++++------ .../doublezerod/internal/liveness/manager.go | 24 +++++++++++++++---- .../internal/liveness/manager_test.go | 6 ++--- 3 files changed, 31 insertions(+), 14 deletions(-) diff --git a/client/doublezerod/cmd/doublezerod/main.go b/client/doublezerod/cmd/doublezerod/main.go index f44f1b8949..f2d4b8dde3 100644 --- a/client/doublezerod/cmd/doublezerod/main.go +++ b/client/doublezerod/cmd/doublezerod/main.go @@ -50,7 +50,7 @@ var ( routeLivenessDetectMult = flag.Uint("route-liveness-detect-mult", defaultRouteLivenessDetectMult, "route liveness detect mult") routeLivenessMinTxFloor = flag.Duration("route-liveness-min-tx-floor", defaultRouteLivenessMinTxFloor, "route liveness min tx floor") routeLivenessMaxTxCeil = flag.Duration("route-liveness-max-tx-ceil", defaultRouteLivenessMaxTxCeil, "route liveness max tx ceil") - routeLivenessReconcileInterval = flag.Duration("route-liveness-reconcile-interval", 30*time.Second, "interval for periodic kernel route reconciliation; 0 disables") + routeLivenessReconcileInterval = flag.Duration("route-liveness-reconcile-interval", defaultRouteLivenessReconcileInterval, "interval for periodic kernel route reconciliation; 0 disables") routeLivenessPeerMetrics = flag.Bool("route-liveness-peer-metrics", false, "enables per peer metrics for route liveness (high cardinality)") routeLivenessDebug = flag.Bool("route-liveness-debug", false, "enables debug logging for route liveness") @@ -67,12 +67,13 @@ var ( ) const ( - defaultOnchainRPCTimeout = 30 * time.Second - defaultRouteLivenessTxMin = 1 * time.Second - defaultRouteLivenessRxMin = 1 * time.Second - defaultRouteLivenessDetectMult = 3 - defaultRouteLivenessMinTxFloor = 50 * time.Millisecond - defaultRouteLivenessMaxTxCeil = 3 * time.Second + defaultOnchainRPCTimeout = 30 * time.Second + defaultRouteLivenessTxMin = 1 * time.Second + defaultRouteLivenessRxMin = 1 * time.Second + defaultRouteLivenessDetectMult = 3 + defaultRouteLivenessMinTxFloor = 50 * time.Millisecond + defaultRouteLivenessMaxTxCeil = 3 * time.Second + defaultRouteLivenessReconcileInterval = 30 * time.Second defaultRouteLivenessBindIP = "0.0.0.0" ) diff --git a/client/doublezerod/internal/liveness/manager.go b/client/doublezerod/internal/liveness/manager.go index d3b5cf2dd3..0bfd72d3b8 100644 --- a/client/doublezerod/internal/liveness/manager.go +++ b/client/doublezerod/internal/liveness/manager.go @@ -159,6 +159,9 @@ func (c *ManagerConfig) Validate() error { if c.ClientVersion == "" { return errors.New("clientVersion is required") } + if c.RouteReconcileInterval < 0 { + return errors.New("routeReconcileInterval must be non-negative") + } if c.RouteReconcileInterval == 0 { c.RouteReconcileInterval = defaultRouteReconcileInterval } @@ -305,6 +308,7 @@ func NewManager(ctx context.Context, cfg *ManagerConfig, cr *routing.ConfiguredR // Route reconciliation goroutine: periodically scans the kernel routing // table for missing routes and reinstalls them. if cfg.RouteReconcileInterval > 0 { + log.Info("liveness: route reconciliation enabled", "interval", cfg.RouteReconcileInterval.String()) m.wg.Add(1) go func() { defer m.wg.Done() @@ -936,29 +940,41 @@ func (m *manager) reconcileRoutes() { return } - // Build a lookup set keyed by (table, dst, nexthop) for fast matching. + // Build a lookup set keyed by (table, dst, nexthop, src) for fast matching. type kernelKey struct { Table int DstIP string NextHop string + SrcIP string } kernelSet := make(map[kernelKey]struct{}, len(kernelRoutes)) for _, kr := range kernelRoutes { - var dstIP, nhIP string + var dstIP, nhIP, srcIP string if kr.Dst != nil && kr.Dst.IP != nil && kr.Dst.IP.To4() != nil { dstIP = kr.Dst.IP.To4().String() } if kr.NextHop != nil && kr.NextHop.To4() != nil { nhIP = kr.NextHop.To4().String() } - kernelSet[kernelKey{Table: kr.Table, DstIP: dstIP, NextHop: nhIP}] = struct{}{} + if kr.Src != nil && kr.Src.To4() != nil { + srcIP = kr.Src.To4().String() + } + kernelSet[kernelKey{Table: kr.Table, DstIP: dstIP, NextHop: nhIP, SrcIP: srcIP}] = struct{}{} } for _, ir := range toCheck { - kk := kernelKey{Table: ir.route.Table, DstIP: ir.rk.DstPrefix, NextHop: ir.rk.NextHop} + kk := kernelKey{Table: ir.route.Table, DstIP: ir.rk.DstPrefix, NextHop: ir.rk.NextHop, SrcIP: ir.rk.SrcIP} if _, present := kernelSet[kk]; present { continue } + // Re-check under lock: the route may have been intentionally withdrawn + // between our snapshot and now (e.g. by onSessionDown). + m.mu.Lock() + stillInstalled := m.installed[ir.rk] + m.mu.Unlock() + if !stillInstalled { + continue + } m.log.Warn("liveness: reinstalling missing route", "route", ir.route.String(), "iface", ir.rk.Interface, diff --git a/client/doublezerod/internal/liveness/manager_test.go b/client/doublezerod/internal/liveness/manager_test.go index eac35a9195..0c8a3a607e 100644 --- a/client/doublezerod/internal/liveness/manager_test.go +++ b/client/doublezerod/internal/liveness/manager_test.go @@ -1668,7 +1668,7 @@ func TestClient_Liveness_Manager_ReconcileRoutes_ReinstallsMissing(t *testing.T) m, reg, err := newTestManagerWithMetrics(t, func(cfg *ManagerConfig) { cfg.Netlinker = mock cfg.PassiveMode = true - cfg.RouteReconcileInterval = -1 // disable ticker; we call manually + cfg.RouteReconcileInterval = time.Hour // disable ticker; we call manually }) require.NoError(t, err) t.Cleanup(func() { _ = m.Close() }) @@ -1712,7 +1712,7 @@ func TestClient_Liveness_Manager_ReconcileRoutes_SkipsPresent(t *testing.T) { m, _, err := newTestManagerWithMetrics(t, func(cfg *ManagerConfig) { cfg.Netlinker = mock cfg.PassiveMode = true - cfg.RouteReconcileInterval = -1 + cfg.RouteReconcileInterval = time.Hour }) require.NoError(t, err) t.Cleanup(func() { _ = m.Close() }) @@ -1750,7 +1750,7 @@ func TestClient_Liveness_Manager_ReconcileRoutes_SkipsUninstalled(t *testing.T) m, _, err := newTestManagerWithMetrics(t, func(cfg *ManagerConfig) { cfg.Netlinker = mock cfg.PassiveMode = false - cfg.RouteReconcileInterval = -1 + cfg.RouteReconcileInterval = time.Hour }) require.NoError(t, err) t.Cleanup(func() { _ = m.Close() }) From a588e27e576f19d802c85da2f3b98e73826d0667 Mon Sep 17 00:00:00 2001 From: nikw9944 Date: Wed, 17 Jun 2026 17:35:42 +0000 Subject: [PATCH 5/9] doublezerod: address route reconciliation review feedback - Let RouteReconcileInterval=0 disable reconciliation (restore the kill switch); drop the duplicate default constant in the liveness package. - Skip excluded destinations in reconcileRoutes so they no longer churn the reinstall counter and logs every tick. - Hold m.mu across the installed re-check and RouteAdd to close the reconcile/onSessionDown TOCTOU race. - Match kernel routes by full destination prefix (Dst.String()) instead of IP only. - Document the main-table assumption in Netlink.RouteByProtocol. - Add tests for excluded-route skip, install-failure metric on reinstall error, and the 0-disables validation. --- .../doublezerod/internal/liveness/manager.go | 75 +++++++----- .../internal/liveness/manager_test.go | 112 +++++++++++++++++- .../doublezerod/internal/routing/netlink.go | 9 ++ 3 files changed, 168 insertions(+), 28 deletions(-) diff --git a/client/doublezerod/internal/liveness/manager.go b/client/doublezerod/internal/liveness/manager.go index 0bfd72d3b8..139370e39f 100644 --- a/client/doublezerod/internal/liveness/manager.go +++ b/client/doublezerod/internal/liveness/manager.go @@ -26,9 +26,6 @@ const ( defaultBackoffMax = 1 * time.Minute defaultMaxEvents = 10240 - - // Default interval for periodic kernel route reconciliation. - defaultRouteReconcileInterval = 30 * time.Second ) // Peer identifies a remote endpoint and the local interface context used to reach it. @@ -162,9 +159,9 @@ func (c *ManagerConfig) Validate() error { if c.RouteReconcileInterval < 0 { return errors.New("routeReconcileInterval must be non-negative") } - if c.RouteReconcileInterval == 0 { - c.RouteReconcileInterval = defaultRouteReconcileInterval - } + // Note: RouteReconcileInterval == 0 is left as-is to disable reconciliation + // (see the `> 0` guard in NewManager). The operational default comes from + // the flag default in main.go, not from here. return nil } @@ -924,9 +921,20 @@ func (m *manager) reconcileRoutes() { if !ok { continue } - if r, exists := m.desired[rk]; exists { - toCheck = append(toCheck, installedRoute{rk: rk, route: r}) + r, exists := m.desired[rk] + if !exists { + continue + } + // Skip excluded destinations: the manager's Netlinker is a + // ConfiguredRouteReaderWriter whose RouteAdd is a silent no-op for + // these, so they are never actually present in the kernel. Without + // this guard every excluded route would be flagged "missing" and + // "reinstalled" (a no-op) on every tick, inflating the reinstall + // counter and spamming logs forever. + if m.cr != nil && r.Dst != nil && r.Dst.IP != nil && m.cr.IsExcluded(r.Dst.IP.String()) { + continue } + toCheck = append(toCheck, installedRoute{rk: rk, route: r}) } m.mu.Unlock() @@ -941,51 +949,64 @@ func (m *manager) reconcileRoutes() { } // Build a lookup set keyed by (table, dst, nexthop, src) for fast matching. + // Dst uses the full prefix (IP + mask) via *net.IPNet.String() so a kernel + // route with a different mask does not satisfy a desired route at the same + // IP (e.g. 10.0.0.0/16 must not match a desired 10.0.0.0/24). type kernelKey struct { Table int - DstIP string + Dst string NextHop string SrcIP string } + dstString := func(ipnet *net.IPNet) string { + if ipnet == nil || ipnet.IP == nil { + return "" + } + return ipnet.String() + } kernelSet := make(map[kernelKey]struct{}, len(kernelRoutes)) for _, kr := range kernelRoutes { - var dstIP, nhIP, srcIP string - if kr.Dst != nil && kr.Dst.IP != nil && kr.Dst.IP.To4() != nil { - dstIP = kr.Dst.IP.To4().String() - } + var nhIP, srcIP string if kr.NextHop != nil && kr.NextHop.To4() != nil { nhIP = kr.NextHop.To4().String() } if kr.Src != nil && kr.Src.To4() != nil { srcIP = kr.Src.To4().String() } - kernelSet[kernelKey{Table: kr.Table, DstIP: dstIP, NextHop: nhIP, SrcIP: srcIP}] = struct{}{} + kernelSet[kernelKey{Table: kr.Table, Dst: dstString(kr.Dst), NextHop: nhIP, SrcIP: srcIP}] = struct{}{} } for _, ir := range toCheck { - kk := kernelKey{Table: ir.route.Table, DstIP: ir.rk.DstPrefix, NextHop: ir.rk.NextHop, SrcIP: ir.rk.SrcIP} + kk := kernelKey{Table: ir.route.Table, Dst: dstString(ir.route.Dst), NextHop: ir.rk.NextHop, SrcIP: ir.rk.SrcIP} if _, present := kernelSet[kk]; present { continue } - // Re-check under lock: the route may have been intentionally withdrawn - // between our snapshot and now (e.g. by onSessionDown). + // Re-check and reinstall under the lock. onSessionDown flips + // installed[rk] to false under m.mu *before* issuing RouteDelete, so + // holding the lock across the re-check and RouteAdd closes the race: + // either we observe the withdrawal and skip, or our add completes + // before the delete lands and the end state stays consistent. The + // netlink call under the lock only happens for genuinely-missing + // routes, which are rare by definition. m.mu.Lock() - stillInstalled := m.installed[ir.rk] - m.mu.Unlock() - if !stillInstalled { + if !m.installed[ir.rk] { + m.mu.Unlock() continue } - m.log.Warn("liveness: reinstalling missing route", - "route", ir.route.String(), - "iface", ir.rk.Interface, - ) - if err := m.cfg.Netlinker.RouteAdd(&ir.route.Route); err != nil { + err := m.cfg.Netlinker.RouteAdd(&ir.route.Route) + m.mu.Unlock() + + if err != nil { m.log.Error("liveness: error reinstalling route", "error", err, "route", ir.route.String()) m.metrics.RouteInstallFailures.WithLabelValues(ir.rk.Interface, ir.rk.SrcIP).Inc() - } else { - m.metrics.routeReinstall(ir.rk.Interface, ir.rk.SrcIP) + continue } + m.log.Warn("liveness: reinstalled missing route", + "route", ir.route.String(), + "iface", ir.rk.Interface, + ) + m.metrics.routeReinstall(ir.rk.Interface, ir.rk.SrcIP) } } diff --git a/client/doublezerod/internal/liveness/manager_test.go b/client/doublezerod/internal/liveness/manager_test.go index 0c8a3a607e..42c72604e6 100644 --- a/client/doublezerod/internal/liveness/manager_test.go +++ b/client/doublezerod/internal/liveness/manager_test.go @@ -68,6 +68,12 @@ func TestClient_Liveness_Manager_ConfigValidate(t *testing.T) { require.NotZero(t, cfg.BackoffMax) require.GreaterOrEqual(t, int64(cfg.MaxTxCeil), int64(cfg.MinTxFloor)) require.GreaterOrEqual(t, int64(cfg.BackoffMax), int64(cfg.MinTxFloor)) + + // RouteReconcileInterval == 0 must survive Validate() unchanged so that 0 + // disables reconciliation (the kill switch). A negative value is rejected. + require.Zero(t, cfg.RouteReconcileInterval, "Validate must not rewrite a zero RouteReconcileInterval") + cfg.RouteReconcileInterval = -1 + require.Error(t, cfg.Validate()) } func TestClient_Liveness_Manager_NewManager_BindsAndLocalAddr(t *testing.T) { @@ -1564,6 +1570,10 @@ func newTestManager(t *testing.T, mutate func(*ManagerConfig)) (*manager, error) } func newTestManagerWithMetrics(t *testing.T, mutate func(*ManagerConfig)) (*manager, *prometheus.Registry, error) { + return newTestManagerWithRoutesAndMetrics(t, nil, mutate) +} + +func newTestManagerWithRoutesAndMetrics(t *testing.T, cr *routing.ConfiguredRoutes, mutate func(*ManagerConfig)) (*manager, *prometheus.Registry, error) { reg := prometheus.NewRegistry() cfg := &ManagerConfig{ Logger: newTestLogger(t), @@ -1582,7 +1592,7 @@ func newTestManagerWithMetrics(t *testing.T, mutate func(*ManagerConfig)) (*mana if mutate != nil { mutate(cfg) } - m, err := NewManager(t.Context(), cfg, nil) + m, err := NewManager(t.Context(), cfg, cr) return m, reg, err } @@ -1774,6 +1784,106 @@ func TestClient_Liveness_Manager_ReconcileRoutes_SkipsUninstalled(t *testing.T) mock.mu.Unlock() } +func TestClient_Liveness_Manager_ReconcileRoutes_SkipsExcluded(t *testing.T) { + t.Parallel() + + r := newTestRoute(nil) + + // Configured routes that exclude this route's destination. In production the + // manager's Netlinker is a ConfiguredRouteReaderWriter whose RouteAdd is a + // silent no-op for excluded destinations, so they are never present in the + // kernel even though installed[rk] is marked true. Reconciliation must not + // treat them as missing. + dir := t.TempDir() + cfgPath := filepath.Join(dir, "routes.json") + require.NoError(t, os.WriteFile(cfgPath, []byte(`{"exclude":["`+r.Dst.IP.String()+`"]}`), 0o600)) + cfg, err := routing.NewConfiguredRoutes(cfgPath) + require.NoError(t, err) + + addCalls := 0 + mock := &MockRouteReaderWriter{ + RouteAddFunc: func(rr *routing.Route) error { + addCalls++ + return nil + }, + RouteByProtocolFunc: func(int) ([]*routing.Route, error) { + // Empty — no routes in kernel (the excluded route was never added). + return nil, nil + }, + } + + m, reg, err := newTestManagerWithRoutesAndMetrics(t, cfg, func(cfg *ManagerConfig) { + cfg.Netlinker = mock + cfg.PassiveMode = true + cfg.RouteReconcileInterval = time.Hour + }) + require.NoError(t, err) + t.Cleanup(func() { _ = m.Close() }) + + err = m.RegisterRoute(r, "lo", m.LocalAddr().Port) + require.NoError(t, err) + + // Reset after RegisterRoute's install. + mock.mu.Lock() + addCalls = 0 + mock.mu.Unlock() + + m.reconcileRoutes() + + mock.mu.Lock() + require.Equal(t, 0, addCalls, "should not reinstall an excluded route") + mock.mu.Unlock() + + reinstalls := getCounterValue(t, reg, "doublezero_liveness_route_reinstalls_total", + prometheus.Labels{LabelIface: "lo", LabelLocalIP: r.Src.To4().String()}) + require.Equal(t, float64(0), reinstalls, "excluded route must not increment the reinstall counter") +} + +func TestClient_Liveness_Manager_ReconcileRoutes_IncrementsInstallFailureMetric(t *testing.T) { + t.Parallel() + + addShouldFail := false + mock := &MockRouteReaderWriter{ + RouteAddFunc: func(r *routing.Route) error { + if addShouldFail { + return errors.New("boom") + } + return nil + }, + RouteByProtocolFunc: func(int) ([]*routing.Route, error) { + // Empty — route is missing from the kernel. + return nil, nil + }, + } + + m, reg, err := newTestManagerWithMetrics(t, func(cfg *ManagerConfig) { + cfg.Netlinker = mock + cfg.PassiveMode = true + cfg.RouteReconcileInterval = time.Hour + }) + require.NoError(t, err) + t.Cleanup(func() { _ = m.Close() }) + + r := newTestRoute(nil) + err = m.RegisterRoute(r, "lo", m.LocalAddr().Port) + require.NoError(t, err) + + // Make the reconcile-time RouteAdd fail. + mock.mu.Lock() + addShouldFail = true + mock.mu.Unlock() + + m.reconcileRoutes() + + failures := getCounterValue(t, reg, "doublezero_liveness_route_install_failures_total", + prometheus.Labels{LabelIface: "lo", LabelLocalIP: r.Src.To4().String()}) + require.Equal(t, float64(1), failures, "a failed reinstall must increment the install failure metric") + + reinstalls := getCounterValue(t, reg, "doublezero_liveness_route_reinstalls_total", + prometheus.Labels{LabelIface: "lo", LabelLocalIP: r.Src.To4().String()}) + require.Equal(t, float64(0), reinstalls, "a failed reinstall must not increment the reinstall counter") +} + func getHistogramCount(t *testing.T, reg *prometheus.Registry, name string, labels prometheus.Labels) float64 { t.Helper() diff --git a/client/doublezerod/internal/routing/netlink.go b/client/doublezerod/internal/routing/netlink.go index 5e774eabf7..197b1c8a39 100644 --- a/client/doublezerod/internal/routing/netlink.go +++ b/client/doublezerod/internal/routing/netlink.go @@ -142,6 +142,15 @@ func (n Netlink) RouteGet(ip net.IP) ([]*Route, error) { return routes, nil } +// RouteByProtocol returns IPv4 routes matching the given routing protocol. +// +// NOTE: only RT_FILTER_PROTOCOL is set, so vishvananda/netlink restricts the +// listing to the main routing table (RT_TABLE_MAIN); routes in other tables +// are skipped unless RT_FILTER_TABLE is also supplied. This is fine for the +// current callers (liveness route reconciliation runs only in IBRL mode, whose +// routes live in the main table). If this is ever used for edge-filtering +// routes in tables 100/101, add RT_FILTER_TABLE and list per desired table, +// otherwise those routes will never be returned. func (n Netlink) RouteByProtocol(protocol int) ([]*Route, error) { routeFilter := &nl.Route{ Protocol: nl.RouteProtocol(protocol), From a69d991e76b97430268213422ef864a82363188c Mon Sep 17 00:00:00 2001 From: nikw9944 Date: Wed, 17 Jun 2026 20:05:38 +0000 Subject: [PATCH 6/9] telemetry/migrations: poll for row visibility in isis latest views test The isis_global_state_latest / isis_overload_bit_latest assertions read the views immediately after inserting, and under CI load the just-inserted rows were not yet visible on the pooled read connection, returning 0 rows. Retry the read until the expected rows appear so the test is deterministic. --- .../migrations/isis_latest_views_test.go | 47 ++++++++++++++----- 1 file changed, 35 insertions(+), 12 deletions(-) diff --git a/telemetry/migrations/isis_latest_views_test.go b/telemetry/migrations/isis_latest_views_test.go index 979ddd6028..b9b7e07be4 100644 --- a/telemetry/migrations/isis_latest_views_test.go +++ b/telemetry/migrations/isis_latest_views_test.go @@ -57,7 +57,7 @@ func TestIsisLatestViews_LastSeenPerNetworkInstance(t *testing.T) { Timestamp time.Time NetworkInstance string } - rows := selectAll(t, db, ` + rows := selectAll(t, db, 2, ` SELECT timestamp, network_instance FROM isis_global_state_latest WHERE device_pubkey = ? @@ -81,7 +81,7 @@ func TestIsisLatestViews_LastSeenPerNetworkInstance(t *testing.T) { NetworkInstance string OverloadBit bool } - rows := selectAll(t, db, ` + rows := selectAll(t, db, 2, ` SELECT timestamp, network_instance, overload_bit FROM isis_overload_bit_latest WHERE device_pubkey = ? @@ -134,18 +134,41 @@ func mustExec(t *testing.T, db *sql.DB, query string, args ...any) { require.NoError(t, err, "exec failed: %s", query) } -func selectAll[T any](t *testing.T, db *sql.DB, query string, scan func(*sql.Rows) (T, error), args ...any) []T { +// selectAll runs query and returns the scanned rows, retrying until at least +// wantLen rows are visible (or a timeout elapses). ClickHouse inserts are not +// always immediately visible to a subsequent read on a different pooled +// connection, so a freshly-inserted row can be momentarily absent under load; +// polling makes the read deterministic without changing what is asserted. +func selectAll[T any](t *testing.T, db *sql.DB, wantLen int, query string, scan func(*sql.Rows) (T, error), args ...any) []T { t.Helper() - rows, err := db.QueryContext(context.Background(), query, args...) - require.NoError(t, err, "query failed: %s", query) - defer rows.Close() - var out []T - for rows.Next() { - v, err := scan(rows) - require.NoError(t, err) - out = append(out, v) + query1 := func() ([]T, error) { + rows, err := db.QueryContext(context.Background(), query, args...) + if err != nil { + return nil, err + } + defer rows.Close() + + var out []T + for rows.Next() { + v, err := scan(rows) + if err != nil { + return nil, err + } + out = append(out, v) + } + return out, rows.Err() } - require.NoError(t, rows.Err()) + + var out []T + require.Eventually(t, func() bool { + res, err := query1() + if err != nil { + return false + } + out = res + return len(out) >= wantLen + }, 10*time.Second, 100*time.Millisecond, "query did not return %d row(s): %s", wantLen, query) + return out } From 55f5e0e7d13d079c0a1451a63b38f97db5b8449c Mon Sep 17 00:00:00 2001 From: nikw9944 Date: Wed, 17 Jun 2026 20:21:14 +0000 Subject: [PATCH 7/9] telemetry/migrations: insert isis latest view rows one per statement A multi-row VALUES list with placeholders is not reliably bound by the clickhouse-go database/sql driver and can silently drop rows, leaving the isis_global_state_latest / isis_overload_bit_latest views empty so the test times out waiting for 2 rows. Insert each row in its own single-row INSERT, which the driver binds reliably. --- .../migrations/isis_latest_views_test.go | 30 +++++++++++-------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/telemetry/migrations/isis_latest_views_test.go b/telemetry/migrations/isis_latest_views_test.go index b9b7e07be4..2fc615c203 100644 --- a/telemetry/migrations/isis_latest_views_test.go +++ b/telemetry/migrations/isis_latest_views_test.go @@ -38,19 +38,23 @@ func TestIsisLatestViews_LastSeenPerNetworkInstance(t *testing.T) { scrapeA := time.Now().UTC().Add(-time.Hour).Truncate(time.Second) scrapeB := scrapeA.Add(time.Minute) - mustExec(t, db, ` - INSERT INTO isis_global_state (timestamp, device_pubkey, network_instance, instance, net, level_capability) VALUES - (?, ?, 'default', 'default', '49.0001.0000.0000.0001.00', 'LEVEL_2'), - (?, ?, 'vrf1', 'vrf1', '49.0002.0000.0000.0001.00', 'LEVEL_2'), - (?, ?, 'default', 'default', '49.0001.0000.0000.0001.00', 'LEVEL_2') - `, scrapeA, device, scrapeA, device, scrapeB, device) - - mustExec(t, db, ` - INSERT INTO isis_overload_bit (timestamp, device_pubkey, network_instance, overload_bit) VALUES - (?, ?, 'default', false), - (?, ?, 'vrf1', true), - (?, ?, 'default', false) - `, scrapeA, device, scrapeA, device, scrapeB, device) + // Insert one row per statement. The clickhouse-go database/sql driver only + // reliably binds placeholders for single-row INSERTs; a multi-row VALUES + // list with placeholders can silently drop rows, leaving the latest views + // empty. + const insertGlobalState = ` + INSERT INTO isis_global_state (timestamp, device_pubkey, network_instance, instance, net, level_capability) + VALUES (?, ?, ?, ?, ?, ?)` + mustExec(t, db, insertGlobalState, scrapeA, device, "default", "default", "49.0001.0000.0000.0001.00", "LEVEL_2") + mustExec(t, db, insertGlobalState, scrapeA, device, "vrf1", "vrf1", "49.0002.0000.0000.0001.00", "LEVEL_2") + mustExec(t, db, insertGlobalState, scrapeB, device, "default", "default", "49.0001.0000.0000.0001.00", "LEVEL_2") + + const insertOverloadBit = ` + INSERT INTO isis_overload_bit (timestamp, device_pubkey, network_instance, overload_bit) + VALUES (?, ?, ?, ?)` + mustExec(t, db, insertOverloadBit, scrapeA, device, "default", false) + mustExec(t, db, insertOverloadBit, scrapeA, device, "vrf1", true) + mustExec(t, db, insertOverloadBit, scrapeB, device, "default", false) t.Run("isis_global_state_latest", func(t *testing.T) { type row struct { From cf310f0d095bff8f7708f1375bc8495a599a6009 Mon Sep 17 00:00:00 2001 From: nikw9944 Date: Thu, 18 Jun 2026 13:44:25 +0000 Subject: [PATCH 8/9] telemetry/migrations: drop redundant view-read polling The read polling added earlier was based on a misdiagnosis: rows appeared missing not because of read-after-write visibility delay but because the multi-row placeholder INSERT silently dropped rows. With single-row inserts the acked data is immediately queryable, so revert selectAll to a direct read. --- .../migrations/isis_latest_views_test.go | 47 +++++-------------- 1 file changed, 12 insertions(+), 35 deletions(-) diff --git a/telemetry/migrations/isis_latest_views_test.go b/telemetry/migrations/isis_latest_views_test.go index 2fc615c203..31296e5777 100644 --- a/telemetry/migrations/isis_latest_views_test.go +++ b/telemetry/migrations/isis_latest_views_test.go @@ -61,7 +61,7 @@ func TestIsisLatestViews_LastSeenPerNetworkInstance(t *testing.T) { Timestamp time.Time NetworkInstance string } - rows := selectAll(t, db, 2, ` + rows := selectAll(t, db, ` SELECT timestamp, network_instance FROM isis_global_state_latest WHERE device_pubkey = ? @@ -85,7 +85,7 @@ func TestIsisLatestViews_LastSeenPerNetworkInstance(t *testing.T) { NetworkInstance string OverloadBit bool } - rows := selectAll(t, db, 2, ` + rows := selectAll(t, db, ` SELECT timestamp, network_instance, overload_bit FROM isis_overload_bit_latest WHERE device_pubkey = ? @@ -138,41 +138,18 @@ func mustExec(t *testing.T, db *sql.DB, query string, args ...any) { require.NoError(t, err, "exec failed: %s", query) } -// selectAll runs query and returns the scanned rows, retrying until at least -// wantLen rows are visible (or a timeout elapses). ClickHouse inserts are not -// always immediately visible to a subsequent read on a different pooled -// connection, so a freshly-inserted row can be momentarily absent under load; -// polling makes the read deterministic without changing what is asserted. -func selectAll[T any](t *testing.T, db *sql.DB, wantLen int, query string, scan func(*sql.Rows) (T, error), args ...any) []T { +func selectAll[T any](t *testing.T, db *sql.DB, query string, scan func(*sql.Rows) (T, error), args ...any) []T { t.Helper() - - query1 := func() ([]T, error) { - rows, err := db.QueryContext(context.Background(), query, args...) - if err != nil { - return nil, err - } - defer rows.Close() - - var out []T - for rows.Next() { - v, err := scan(rows) - if err != nil { - return nil, err - } - out = append(out, v) - } - return out, rows.Err() - } + rows, err := db.QueryContext(context.Background(), query, args...) + require.NoError(t, err, "query failed: %s", query) + defer rows.Close() var out []T - require.Eventually(t, func() bool { - res, err := query1() - if err != nil { - return false - } - out = res - return len(out) >= wantLen - }, 10*time.Second, 100*time.Millisecond, "query did not return %d row(s): %s", wantLen, query) - + for rows.Next() { + v, err := scan(rows) + require.NoError(t, err) + out = append(out, v) + } + require.NoError(t, rows.Err()) return out } From daddc70e9c1246f606d477757ae656c91abe85c2 Mon Sep 17 00:00:00 2001 From: nikw9944 Date: Wed, 24 Jun 2026 14:47:22 +0000 Subject: [PATCH 9/9] doublezerod: close passive WithdrawRoute resurrection race and harden reconcile tests Clear installed[rk]/desired[rk] under the lock before the kernel RouteDelete in passive-mode WithdrawRoute, mirroring onSessionDown. The previous ordering deleted the kernel route first, leaving a window where reconcileRoutes could observe the route missing while installed[rk] was still true and resurrect a route the manager believed was withdrawn. Make the reconcile tests reflect production: default test routes to RT_TABLE_MAIN (RouteByProtocol only returns main-table routes) and have SkipsPresent return a freshly-constructed *routing.Route instead of the identical pointer, so kernelKey matching is genuinely exercised on both sides. Add a test asserting the passive WithdrawRoute clear-before-delete ordering. --- .../internal/liveness/main_test.go | 6 +- .../doublezerod/internal/liveness/manager.go | 28 ++++----- .../internal/liveness/manager_test.go | 59 ++++++++++++++++++- 3 files changed, 76 insertions(+), 17 deletions(-) diff --git a/client/doublezerod/internal/liveness/main_test.go b/client/doublezerod/internal/liveness/main_test.go index 4736b6fe9f..1f2ee80c1f 100644 --- a/client/doublezerod/internal/liveness/main_test.go +++ b/client/doublezerod/internal/liveness/main_test.go @@ -70,7 +70,11 @@ func wait[T any](t *testing.T, ch <-chan T, d time.Duration, name string) T { func newTestRoute(mutate func(*Route)) *Route { r := &Route{Route: routing.Route{ - Table: 100, + // Use the main routing table: liveness only attaches in IBRL mode, whose + // routes live in RT_TABLE_MAIN, and RouteByProtocol only returns + // main-table routes. A non-main table would never come back from the real + // backend, so reconcile tests must mirror production here. + Table: unix.RT_TABLE_MAIN, Src: net.IPv4(10, 4, 0, 1), Dst: &net.IPNet{IP: net.IPv4(10, 4, 0, 11), Mask: net.CIDRMask(32, 32)}, NextHop: net.IPv4(10, 5, 0, 1), diff --git a/client/doublezerod/internal/liveness/manager.go b/client/doublezerod/internal/liveness/manager.go index 139370e39f..e0c0fea128 100644 --- a/client/doublezerod/internal/liveness/manager.go +++ b/client/doublezerod/internal/liveness/manager.go @@ -446,15 +446,14 @@ func (m *manager) WithdrawRoute(r *Route, iface string) error { m.log.Info("liveness: withdrawing route", "route", r.String(), "iface", iface) - if m.cfg.PassiveMode && !r.NoUninstall { - // Passive-mode: caller wants immediate kernel update independent of liveness. - if err := m.cfg.Netlinker.RouteDelete(&r.Route); err != nil { - m.metrics.RouteUninstallFailures.WithLabelValues(iface, srcIP).Inc() - return fmt.Errorf("error withdrawing route: %v", err) - } - m.metrics.routeWithdraw(iface, srcIP) - } - + // Clear desired/installed under the lock *before* issuing any kernel + // RouteDelete, mirroring onSessionDown. reconcileRoutes re-checks + // installed[rk] under m.mu before its RouteAdd, so clearing the flag first + // closes the resurrection race: reconcile either observes the withdrawal + // and skips, or its add lands before our delete and the end state is the + // route gone. (The previous passive-mode ordering deleted the kernel route + // first, leaving a window where reconcile could resurrect a route the + // manager believed was withdrawn.) rk := routeKeyFor(iface, r) m.mu.Lock() delete(m.desired, rk) @@ -479,12 +478,13 @@ func (m *manager) WithdrawRoute(r *Route, iface string) error { m.metrics.SessionsMapSize.Set(float64(len(m.sessions))) m.mu.Unlock() - // If we previously installed the route (and not in PassiveMode), remove it now. - if wasInstalled && !m.cfg.PassiveMode && !r.NoUninstall { - err := m.cfg.Netlinker.RouteDelete(&r.Route) - if err != nil { + // Remove the kernel route. In passive mode the caller wants an immediate + // kernel update independent of liveness, so we always delete; otherwise we + // only delete a route we previously installed. + if !r.NoUninstall && (m.cfg.PassiveMode || wasInstalled) { + if err := m.cfg.Netlinker.RouteDelete(&r.Route); err != nil { m.metrics.RouteUninstallFailures.WithLabelValues(iface, srcIP).Inc() - return err + return fmt.Errorf("error withdrawing route: %v", err) } m.metrics.routeWithdraw(iface, srcIP) } diff --git a/client/doublezerod/internal/liveness/manager_test.go b/client/doublezerod/internal/liveness/manager_test.go index 42c72604e6..091a35e736 100644 --- a/client/doublezerod/internal/liveness/manager_test.go +++ b/client/doublezerod/internal/liveness/manager_test.go @@ -13,6 +13,7 @@ import ( "github.com/prometheus/client_golang/prometheus" prom "github.com/prometheus/client_model/go" "github.com/stretchr/testify/require" + "golang.org/x/sys/unix" ) func TestClient_Liveness_Manager_ConfigValidate(t *testing.T) { @@ -1714,8 +1715,19 @@ func TestClient_Liveness_Manager_ReconcileRoutes_SkipsPresent(t *testing.T) { return nil }, RouteByProtocolFunc: func(int) ([]*routing.Route, error) { - // Return the route as present in kernel. - return []*routing.Route{&r.Route}, nil + // Return a freshly-constructed route with the same field values rather + // than the identical &r.Route pointer the manager installed. This is + // how the real netlink backend behaves (4-byte vs 16-byte net.IP, + // prefsrc echo, mask normalization), so it genuinely exercises + // kernelKey construction on both the kernel and desired sides instead + // of trivially matching by pointer identity. + return []*routing.Route{{ + Table: r.Table, + Src: net.IPv4(10, 4, 0, 1).To4(), + Dst: &net.IPNet{IP: net.IPv4(10, 4, 0, 11).To4(), Mask: net.CIDRMask(32, 32)}, + NextHop: net.IPv4(10, 5, 0, 1).To4(), + Protocol: unix.RTPROT_BGP, + }}, nil }, } @@ -1884,6 +1896,49 @@ func TestClient_Liveness_Manager_ReconcileRoutes_IncrementsInstallFailureMetric( require.Equal(t, float64(0), reinstalls, "a failed reinstall must not increment the reinstall counter") } +// TestClient_Liveness_Manager_WithdrawRoute_PassiveClearsInstalledBeforeDelete +// guards the ordering the reconcile resurrection-race fix depends on: in passive +// mode WithdrawRoute must clear installed[rk] under the lock *before* issuing the +// kernel RouteDelete. If it deleted first (the old ordering), reconcile could +// observe the route missing from the kernel while installed[rk] was still true +// and resurrect a route the manager believed was withdrawn. +func TestClient_Liveness_Manager_WithdrawRoute_PassiveClearsInstalledBeforeDelete(t *testing.T) { + t.Parallel() + + r := newTestRoute(nil) + rk := routeKeyFor("lo", r) + + var installedAtDelete bool + var deleteCalled bool + var mgr *manager + mock := &MockRouteReaderWriter{ + RouteDeleteFunc: func(*routing.Route) error { + deleteCalled = true + installedAtDelete = mgr.IsInstalled(rk) + return nil + }, + } + + m, err := newTestManager(t, func(cfg *ManagerConfig) { + cfg.Netlinker = mock + cfg.PassiveMode = true + }) + require.NoError(t, err) + t.Cleanup(func() { _ = m.Close() }) + mgr = m + + err = m.RegisterRoute(r, "lo", m.LocalAddr().Port) + require.NoError(t, err) + require.True(t, mgr.IsInstalled(rk), "route should be installed after RegisterRoute in passive mode") + + err = m.WithdrawRoute(r, "lo") + require.NoError(t, err) + + require.True(t, deleteCalled, "passive WithdrawRoute must issue a kernel delete") + require.False(t, installedAtDelete, "installed[rk] must be cleared before the kernel RouteDelete") + require.False(t, mgr.IsInstalled(rk), "route should not be installed after WithdrawRoute") +} + func getHistogramCount(t *testing.T, reg *prometheus.Registry, name string, labels prometheus.Labels) float64 { t.Helper()