Skip to content
Open
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ All notable changes to this project will be documented in this file.

- Client
- Add a `--no-wait` flag to `doublezero disconnect` that skips waiting for the daemon to tear down the tunnel(s), exiting once the onchain user deletion is confirmed. (#3911)
- Add periodic kernel route reconciliation to `doublezerod` that detects and reinstalls missing routes, with a metric tracking install failures ([#3669](https://github.com/malbeclabs/doublezero/issues/3669))
- CLI
- Add hidden `migrate flex-algo` (RFC-18 link-topology and Vpnv4 loopback FlexAlgoNodeSegment backfill); the prior `migrate` command is now `migrate user-pda`. Moved from `doublezero-admin`.
- Add hidden `device migrate-multicast-counts` and `device migrate-unicast-counts` to reconcile stale per-device subscriber, publisher, and unicast-user counts. Moved from `doublezero-admin`.
Expand Down
30 changes: 17 additions & 13 deletions client/doublezerod/cmd/doublezerod/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,14 @@ var (
stateDir = flag.String("state-dir", "/var/lib/doublezerod", "directory for persistent state files")

// Route liveness configuration flags.
routeLivenessTxMin = flag.Duration("route-liveness-tx-min", defaultRouteLivenessTxMin, "route liveness tx min")
routeLivenessRxMin = flag.Duration("route-liveness-rx-min", defaultRouteLivenessRxMin, "route liveness rx min")
routeLivenessDetectMult = flag.Uint("route-liveness-detect-mult", defaultRouteLivenessDetectMult, "route liveness detect mult")
routeLivenessMinTxFloor = flag.Duration("route-liveness-min-tx-floor", defaultRouteLivenessMinTxFloor, "route liveness min tx floor")
routeLivenessMaxTxCeil = flag.Duration("route-liveness-max-tx-ceil", defaultRouteLivenessMaxTxCeil, "route liveness max tx ceil")
routeLivenessPeerMetrics = flag.Bool("route-liveness-peer-metrics", false, "enables per peer metrics for route liveness (high cardinality)")
routeLivenessDebug = flag.Bool("route-liveness-debug", false, "enables debug logging for route liveness")
routeLivenessTxMin = flag.Duration("route-liveness-tx-min", defaultRouteLivenessTxMin, "route liveness tx min")
routeLivenessRxMin = flag.Duration("route-liveness-rx-min", defaultRouteLivenessRxMin, "route liveness rx min")
routeLivenessDetectMult = flag.Uint("route-liveness-detect-mult", defaultRouteLivenessDetectMult, "route liveness detect mult")
routeLivenessMinTxFloor = flag.Duration("route-liveness-min-tx-floor", defaultRouteLivenessMinTxFloor, "route liveness min tx floor")
routeLivenessMaxTxCeil = flag.Duration("route-liveness-max-tx-ceil", defaultRouteLivenessMaxTxCeil, "route liveness max tx ceil")
routeLivenessReconcileInterval = flag.Duration("route-liveness-reconcile-interval", defaultRouteLivenessReconcileInterval, "interval for periodic kernel route reconciliation; 0 disables")
routeLivenessPeerMetrics = flag.Bool("route-liveness-peer-metrics", false, "enables per peer metrics for route liveness (high cardinality)")
routeLivenessDebug = flag.Bool("route-liveness-debug", false, "enables debug logging for route liveness")

// TODO(snormore): These flags are temporary for initial rollout testing.
// They will be superceded by a single `route-liveness-enable` flag, where false means
Expand All @@ -66,12 +67,13 @@ var (
)

const (
defaultOnchainRPCTimeout = 30 * time.Second
defaultRouteLivenessTxMin = 1 * time.Second
defaultRouteLivenessRxMin = 1 * time.Second
defaultRouteLivenessDetectMult = 3
defaultRouteLivenessMinTxFloor = 50 * time.Millisecond
defaultRouteLivenessMaxTxCeil = 3 * time.Second
defaultOnchainRPCTimeout = 30 * time.Second
defaultRouteLivenessTxMin = 1 * time.Second
defaultRouteLivenessRxMin = 1 * time.Second
defaultRouteLivenessDetectMult = 3
defaultRouteLivenessMinTxFloor = 50 * time.Millisecond
defaultRouteLivenessMaxTxCeil = 3 * time.Second
defaultRouteLivenessReconcileInterval = 30 * time.Second

defaultRouteLivenessBindIP = "0.0.0.0"
)
Expand Down Expand Up @@ -179,6 +181,8 @@ func main() {

EnablePeerMetrics: *routeLivenessPeerMetrics,

RouteReconcileInterval: *routeLivenessReconcileInterval,

// Default to treating peers that advertise passive mode as passive. That is, we will
// install their routes immediately and never uninstall them on down events.
HonorPeerAdvertisedPassive: true,
Expand Down
6 changes: 5 additions & 1 deletion client/doublezerod/internal/liveness/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,11 @@ func wait[T any](t *testing.T, ch <-chan T, d time.Duration, name string) T {

func newTestRoute(mutate func(*Route)) *Route {
r := &Route{Route: routing.Route{
Table: 100,
// Use the main routing table: liveness only attaches in IBRL mode, whose
// routes live in RT_TABLE_MAIN, and RouteByProtocol only returns
// main-table routes. A non-main table would never come back from the real
// backend, so reconcile tests must mirror production here.
Table: unix.RT_TABLE_MAIN,
Src: net.IPv4(10, 4, 0, 1),
Dst: &net.IPNet{IP: net.IPv4(10, 4, 0, 11), Mask: net.CIDRMask(32, 32)},
NextHop: net.IPv4(10, 5, 0, 1),
Expand Down
167 changes: 151 additions & 16 deletions client/doublezerod/internal/liveness/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/malbeclabs/doublezero/client/doublezerod/internal/routing"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/sys/unix"
)

const (
Expand Down Expand Up @@ -92,6 +93,10 @@ type ManagerConfig struct {

// Client version to advertise to peers in control packets.
ClientVersion string

// RouteReconcileInterval controls how often the manager scans the kernel
// routing table for missing routes and reinstalls them. Zero disables.
RouteReconcileInterval time.Duration
}

// Validate fills defaults and enforces constraints for ManagerConfig.
Expand Down Expand Up @@ -151,6 +156,12 @@ func (c *ManagerConfig) Validate() error {
if c.ClientVersion == "" {
return errors.New("clientVersion is required")
}
if c.RouteReconcileInterval < 0 {
return errors.New("routeReconcileInterval must be non-negative")
}
// Note: RouteReconcileInterval == 0 is left as-is to disable reconciliation
// (see the `> 0` guard in NewManager). The operational default comes from
// the flag default in main.go, not from here.
return nil
}

Expand Down Expand Up @@ -291,6 +302,26 @@ func NewManager(ctx context.Context, cfg *ManagerConfig, cr *routing.ConfiguredR
}
}()

// Route reconciliation goroutine: periodically scans the kernel routing
// table for missing routes and reinstalls them.
if cfg.RouteReconcileInterval > 0 {
log.Info("liveness: route reconciliation enabled", "interval", cfg.RouteReconcileInterval.String())
m.wg.Add(1)
go func() {
defer m.wg.Done()
ticker := time.NewTicker(cfg.RouteReconcileInterval)
defer ticker.Stop()
for {
select {
case <-m.ctx.Done():
return
case <-ticker.C:
m.reconcileRoutes()
}
}
}()
}

// If any routes are configured to be excluded, mark then as AdminDown immediately.
if m.cr != nil {
for ip := range m.cr.GetExcluded() {
Expand Down Expand Up @@ -415,15 +446,14 @@ func (m *manager) WithdrawRoute(r *Route, iface string) error {

m.log.Info("liveness: withdrawing route", "route", r.String(), "iface", iface)

if m.cfg.PassiveMode && !r.NoUninstall {
// Passive-mode: caller wants immediate kernel update independent of liveness.
if err := m.cfg.Netlinker.RouteDelete(&r.Route); err != nil {
m.metrics.RouteUninstallFailures.WithLabelValues(iface, srcIP).Inc()
return fmt.Errorf("error withdrawing route: %v", err)
}
m.metrics.routeWithdraw(iface, srcIP)
}

// Clear desired/installed under the lock *before* issuing any kernel
// RouteDelete, mirroring onSessionDown. reconcileRoutes re-checks
// installed[rk] under m.mu before its RouteAdd, so clearing the flag first
// closes the resurrection race: reconcile either observes the withdrawal
// and skips, or its add lands before our delete and the end state is the
// route gone. (The previous passive-mode ordering deleted the kernel route
// first, leaving a window where reconcile could resurrect a route the
// manager believed was withdrawn.)
rk := routeKeyFor(iface, r)
m.mu.Lock()
delete(m.desired, rk)
Expand All @@ -448,12 +478,13 @@ func (m *manager) WithdrawRoute(r *Route, iface string) error {
m.metrics.SessionsMapSize.Set(float64(len(m.sessions)))
m.mu.Unlock()

// If we previously installed the route (and not in PassiveMode), remove it now.
if wasInstalled && !m.cfg.PassiveMode && !r.NoUninstall {
err := m.cfg.Netlinker.RouteDelete(&r.Route)
if err != nil {
// Remove the kernel route. In passive mode the caller wants an immediate
// kernel update independent of liveness, so we always delete; otherwise we
// only delete a route we previously installed.
if !r.NoUninstall && (m.cfg.PassiveMode || wasInstalled) {
if err := m.cfg.Netlinker.RouteDelete(&r.Route); err != nil {
m.metrics.RouteUninstallFailures.WithLabelValues(iface, srcIP).Inc()
return err
return fmt.Errorf("error withdrawing route: %v", err)
}
m.metrics.routeWithdraw(iface, srcIP)
}
Expand Down Expand Up @@ -834,7 +865,7 @@ func (m *manager) onSessionDown(sess *Session) {
}

if m.cfg.PassiveMode {
m.log.Debug("liveness: session down (global passive; keeping route)",
m.log.Info("liveness: session down (global passive; keeping route)",
"peer", peer.String(),
"route", snap.Route.String(),
"downSince", snap.DownSince.UTC().String(),
Expand All @@ -845,7 +876,7 @@ func (m *manager) onSessionDown(sess *Session) {
}

if effectivelyPassive {
m.log.Debug("liveness: session down (peer passive; keeping route)",
m.log.Info("liveness: session down (peer passive; keeping route)",
"peer", peer.String(),
"route", snap.Route.String(),
"downSince", snap.DownSince.UTC().String(),
Expand Down Expand Up @@ -875,6 +906,110 @@ func (m *manager) onSessionDown(sess *Session) {
)
}

// reconcileRoutes scans the kernel routing table for routes that should be
// installed but are missing, and reinstalls them. This mitigates routes being
// removed by external processes.
func (m *manager) reconcileRoutes() {
// Snapshot installed and desired under lock.
type installedRoute struct {
rk RouteKey
route *Route
}
m.mu.Lock()
toCheck := make([]installedRoute, 0, len(m.installed))
for rk, ok := range m.installed {
if !ok {
continue
}
r, exists := m.desired[rk]
if !exists {
continue
}
// Skip excluded destinations: the manager's Netlinker is a
// ConfiguredRouteReaderWriter whose RouteAdd is a silent no-op for
// these, so they are never actually present in the kernel. Without
// this guard every excluded route would be flagged "missing" and
// "reinstalled" (a no-op) on every tick, inflating the reinstall
// counter and spamming logs forever.
if m.cr != nil && r.Dst != nil && r.Dst.IP != nil && m.cr.IsExcluded(r.Dst.IP.String()) {
continue
}
toCheck = append(toCheck, installedRoute{rk: rk, route: r})
}
m.mu.Unlock()

if len(toCheck) == 0 {
return
}

kernelRoutes, err := m.cfg.Netlinker.RouteByProtocol(unix.RTPROT_BGP)
Comment thread
nikw9944 marked this conversation as resolved.
if err != nil {
m.log.Error("liveness: error fetching kernel routes for reconciliation", "error", err)
return
}

// Build a lookup set keyed by (table, dst, nexthop, src) for fast matching.
// Dst uses the full prefix (IP + mask) via *net.IPNet.String() so a kernel
// route with a different mask does not satisfy a desired route at the same
// IP (e.g. 10.0.0.0/16 must not match a desired 10.0.0.0/24).
type kernelKey struct {
Table int
Dst string
NextHop string
SrcIP string
}
dstString := func(ipnet *net.IPNet) string {
if ipnet == nil || ipnet.IP == nil {
return ""
}
return ipnet.String()
}
kernelSet := make(map[kernelKey]struct{}, len(kernelRoutes))
for _, kr := range kernelRoutes {
var nhIP, srcIP string
if kr.NextHop != nil && kr.NextHop.To4() != nil {
nhIP = kr.NextHop.To4().String()
}
if kr.Src != nil && kr.Src.To4() != nil {
srcIP = kr.Src.To4().String()
}
kernelSet[kernelKey{Table: kr.Table, Dst: dstString(kr.Dst), NextHop: nhIP, SrcIP: srcIP}] = struct{}{}
}

for _, ir := range toCheck {
Comment thread
nikw9944 marked this conversation as resolved.
kk := kernelKey{Table: ir.route.Table, Dst: dstString(ir.route.Dst), NextHop: ir.rk.NextHop, SrcIP: ir.rk.SrcIP}
if _, present := kernelSet[kk]; present {
continue
}
// Re-check and reinstall under the lock. onSessionDown flips
// installed[rk] to false under m.mu *before* issuing RouteDelete, so
// holding the lock across the re-check and RouteAdd closes the race:
// either we observe the withdrawal and skip, or our add completes
// before the delete lands and the end state stays consistent. The
// netlink call under the lock only happens for genuinely-missing
// routes, which are rare by definition.
m.mu.Lock()
if !m.installed[ir.rk] {
m.mu.Unlock()
continue
}
err := m.cfg.Netlinker.RouteAdd(&ir.route.Route)
m.mu.Unlock()

if err != nil {
m.log.Error("liveness: error reinstalling route",
"error", err, "route", ir.route.String())
m.metrics.RouteInstallFailures.WithLabelValues(ir.rk.Interface, ir.rk.SrcIP).Inc()
continue
}
m.log.Warn("liveness: reinstalled missing route",
"route", ir.route.String(),
"iface", ir.rk.Interface,
)
m.metrics.routeReinstall(ir.rk.Interface, ir.rk.SrcIP)
}
}

// isPeerEffectivelyPassive returns true when this session should not have its
// dataplane (kernel route) managed due to peer-advertised passive mode.
//
Expand Down
Loading
Loading