Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 70 additions & 9 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,17 @@ import (
// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
// to ensure that exec-entrypoint and run can make use of them.
_ "k8s.io/client-go/plugin/pkg/client/auth"
"k8s.io/utils/ptr"

// Set runtime concurrency to match CPU limit imposed by Kubernetes
_ "go.uber.org/automaxprocs"

"github.com/sapcc/go-api-declarations/bininfo"
"go.uber.org/zap/zapcore"
coordinationv1 "k8s.io/api/coordination/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/certwatcher"
Expand All @@ -44,6 +45,7 @@ import (
nxcontroller "github.com/ironcore-dev/network-operator/internal/controller/cisco/nx"
corecontroller "github.com/ironcore-dev/network-operator/internal/controller/core"
"github.com/ironcore-dev/network-operator/internal/provider"
"github.com/ironcore-dev/network-operator/internal/resourcelock"
webhooknxv1alpha1 "github.com/ironcore-dev/network-operator/internal/webhook/cisco/nx/v1alpha1"
webhookv1alpha1 "github.com/ironcore-dev/network-operator/internal/webhook/core/v1alpha1"
// +kubebuilder:scaffold:imports
Expand Down Expand Up @@ -76,6 +78,10 @@ func main() {
var watchFilterValue string
var providerName string
var requeueInterval time.Duration
var maxConcurrentReconciles int
var lockerNamespace string
var lockerDuration time.Duration
var lockerRenewInterval time.Duration
flag.StringVar(&metricsAddr, "metrics-bind-address", "0", "The address the metrics endpoint binds to. Use :8443 for HTTPS or :8080 for HTTP, or leave as 0 to disable the metrics service.")
flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
flag.BoolVar(&enableLeaderElection, "leader-elect", false, "Enable leader election for controller manager. Enabling this will ensure there is only one active controller manager.")
Expand All @@ -90,6 +96,10 @@ func main() {
flag.StringVar(&watchFilterValue, "watch-filter", "", fmt.Sprintf("Label value that the controller watches to reconcile api objects. Label key is always %q. If unspecified, the controller watches for all api objects.", v1alpha1.WatchLabel))
flag.StringVar(&providerName, "provider", "openconfig", "The provider to use for the controller. If not specified, the default provider is used. Available providers: "+strings.Join(provider.Providers(), ", "))
flag.DurationVar(&requeueInterval, "requeue-interval", 30*time.Second, "The interval after which Kubernetes resources should be reconciled again regardless of whether they have changed.")
flag.IntVar(&maxConcurrentReconciles, "max-concurrent-reconciles", 1, "The maximum number of concurrent reconciles per controller. Defaults to 1.")
flag.StringVar(&lockerNamespace, "locker-namespace", "", "The namespace to use for resource locker coordination. If not specified, uses the namespace the manager is deployed in, or 'default' if undetectable.")
flag.DurationVar(&lockerDuration, "locker-duration", 10*time.Second, "The duration of the resource locker lease.")
flag.DurationVar(&lockerRenewInterval, "locker-renew-interval", 5*time.Second, "The interval at which the resource locker lease is renewed.")
opts := zap.Options{
Development: true,
TimeEncoder: zapcore.ISO8601TimeEncoder,
Expand Down Expand Up @@ -188,7 +198,7 @@ func main() {

mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
Cache: cache.Options{ReaderFailOnMissingInformer: true},
Controller: config.Controller{UsePriorityQueue: ptr.To(true)},
Controller: config.Controller{UsePriorityQueue: ptr.To(true), MaxConcurrentReconciles: maxConcurrentReconciles},
Scheme: scheme,
Metrics: metricsServerOptions,
WebhookServer: webhookServer,
Expand All @@ -200,12 +210,7 @@ func main() {
// Manager is stopped, otherwise, this setting is unsafe. Setting this significantly
// speeds up voluntary leader transitions as the new leader don't have to wait
// LeaseDuration time first.
//
// In the default scaffold provided, the program ends immediately after
// the manager stops, so would be fine to enable this option. However,
// if you are doing or is intended to do any operation such as perform cleanups
// after the manager stops then its usage might be unsafe.
// LeaderElectionReleaseOnCancel: true,
LeaderElectionReleaseOnCancel: true,
})
if err != nil {
setupLog.Error(err, "unable to start manager")
Expand All @@ -221,6 +226,38 @@ func main() {

ctx := ctrl.SetupSignalHandler()

if lockerNamespace == "" {
if ns, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace"); err == nil {
lockerNamespace = strings.TrimSpace(string(ns))
setupLog.Info("Detected namespace from service account", "namespace", lockerNamespace)
} else {
lockerNamespace = "default"
setupLog.Info("Using default namespace for resource locker", "namespace", lockerNamespace)
}
}

locker, err := resourcelock.NewResourceLocker(mgr.GetClient(), lockerNamespace, lockerDuration, lockerRenewInterval)
if err != nil {
setupLog.Error(err, "unable to create resource locker")
os.Exit(1)
}

// Set up cache informer for Lease resources used by ResourceLocker.
// This ensures the cache has an informer for coordination.k8s.io/v1 Lease resources
// before any controller tries to use the ResourceLocker, which is required when
// ReaderFailOnMissingInformer is set to true.
if _, err := mgr.GetCache().GetInformer(ctx, &coordinationv1.Lease{}); err != nil {
setupLog.Error(err, "unable to get informer for Lease resources")
os.Exit(1)
}
setupLog.Info("Lease cache informer initialized", "namespace", lockerNamespace)

// Add the ResourceLocker to the manager so it will be properly cleaned up on shutdown.
if err := mgr.Add(locker); err != nil {
setupLog.Error(err, "unable to add resource locker to manager")
os.Exit(1)
}

if err := (&corecontroller.DeviceReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Expand All @@ -239,6 +276,7 @@ func main() {
Recorder: mgr.GetEventRecorderFor("interface-controller"),
WatchFilterValue: watchFilterValue,
Provider: prov,
Locker: locker,
RequeueInterval: requeueInterval,
}).SetupWithManager(ctx, mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Interface")
Expand All @@ -251,6 +289,7 @@ func main() {
Recorder: mgr.GetEventRecorderFor("banner-controller"),
WatchFilterValue: watchFilterValue,
Provider: prov,
Locker: locker,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Banner")
os.Exit(1)
Expand All @@ -262,6 +301,7 @@ func main() {
Recorder: mgr.GetEventRecorderFor("user-controller"),
WatchFilterValue: watchFilterValue,
Provider: prov,
Locker: locker,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "User")
os.Exit(1)
Expand All @@ -273,6 +313,7 @@ func main() {
Recorder: mgr.GetEventRecorderFor("dns-controller"),
WatchFilterValue: watchFilterValue,
Provider: prov,
Locker: locker,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "DNS")
os.Exit(1)
Expand All @@ -284,6 +325,7 @@ func main() {
Recorder: mgr.GetEventRecorderFor("ntp-controller"),
WatchFilterValue: watchFilterValue,
Provider: prov,
Locker: locker,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "NTP")
os.Exit(1)
Expand All @@ -295,6 +337,7 @@ func main() {
Recorder: mgr.GetEventRecorderFor("acl-controller"),
WatchFilterValue: watchFilterValue,
Provider: prov,
Locker: locker,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "AccessControlList")
os.Exit(1)
Expand All @@ -306,6 +349,7 @@ func main() {
Recorder: mgr.GetEventRecorderFor("certificate-controller"),
WatchFilterValue: watchFilterValue,
Provider: prov,
Locker: locker,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Certificate")
os.Exit(1)
Expand All @@ -317,6 +361,7 @@ func main() {
Recorder: mgr.GetEventRecorderFor("snmp-controller"),
WatchFilterValue: watchFilterValue,
Provider: prov,
Locker: locker,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "SNMP")
os.Exit(1)
Expand All @@ -328,6 +373,7 @@ func main() {
Recorder: mgr.GetEventRecorderFor("syslog-controller"),
WatchFilterValue: watchFilterValue,
Provider: prov,
Locker: locker,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Syslog")
os.Exit(1)
Expand All @@ -339,6 +385,7 @@ func main() {
Recorder: mgr.GetEventRecorderFor("managementaccess-controller"),
WatchFilterValue: watchFilterValue,
Provider: prov,
Locker: locker,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "ManagementAccess")
os.Exit(1)
Expand All @@ -350,6 +397,7 @@ func main() {
Recorder: mgr.GetEventRecorderFor("isis-controller"),
WatchFilterValue: watchFilterValue,
Provider: prov,
Locker: locker,
RequeueInterval: requeueInterval,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "ISIS")
Expand All @@ -362,6 +410,7 @@ func main() {
Recorder: mgr.GetEventRecorderFor("pim-controller"),
WatchFilterValue: watchFilterValue,
Provider: prov,
Locker: locker,
RequeueInterval: requeueInterval,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "PIM")
Expand All @@ -374,6 +423,7 @@ func main() {
Recorder: mgr.GetEventRecorderFor("bgp-controller"),
WatchFilterValue: watchFilterValue,
Provider: prov,
Locker: locker,
RequeueInterval: requeueInterval,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "BGP")
Expand All @@ -386,6 +436,7 @@ func main() {
Recorder: mgr.GetEventRecorderFor("bgppeer-controller"),
WatchFilterValue: watchFilterValue,
Provider: prov,
Locker: locker,
RequeueInterval: requeueInterval,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "BGPPeer")
Expand All @@ -398,6 +449,7 @@ func main() {
Recorder: mgr.GetEventRecorderFor("ospf-controller"),
WatchFilterValue: watchFilterValue,
Provider: prov,
Locker: locker,
RequeueInterval: requeueInterval,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "OSPF")
Expand All @@ -410,6 +462,7 @@ func main() {
Recorder: mgr.GetEventRecorderFor("vlan-controller"),
WatchFilterValue: watchFilterValue,
Provider: prov,
Locker: locker,
RequeueInterval: requeueInterval,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "VLAN")
Expand All @@ -422,6 +475,7 @@ func main() {
Recorder: mgr.GetEventRecorderFor("vrf-controller"),
WatchFilterValue: watchFilterValue,
Provider: prov,
Locker: locker,
RequeueInterval: requeueInterval,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "VRF")
Expand All @@ -431,9 +485,10 @@ func main() {
if err := (&nxcontroller.VPCDomainReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("vpcdomain-controller"),
Recorder: mgr.GetEventRecorderFor("cisco-nx-vpcdomain-controller"),
WatchFilterValue: watchFilterValue,
Provider: prov,
Locker: locker,
RequeueInterval: requeueInterval,
}).SetupWithManager(ctx, mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "VPCDomain")
Expand All @@ -446,6 +501,7 @@ func main() {
Recorder: mgr.GetEventRecorderFor("nve-controller"),
WatchFilterValue: watchFilterValue,
Provider: prov,
Locker: locker,
RequeueInterval: requeueInterval,
}).SetupWithManager(ctx, mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "NetworkVirtualizationEdge")
Expand All @@ -458,6 +514,7 @@ func main() {
Recorder: mgr.GetEventRecorderFor("cisco-nx-system-controller"),
WatchFilterValue: watchFilterValue,
Provider: prov,
Locker: locker,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "System")
os.Exit(1)
Expand All @@ -469,6 +526,7 @@ func main() {
Recorder: mgr.GetEventRecorderFor("evpn-instance-controller"),
WatchFilterValue: watchFilterValue,
Provider: prov,
Locker: locker,
}).SetupWithManager(ctx, mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "EVPNInstance")
os.Exit(1)
Expand All @@ -480,6 +538,7 @@ func main() {
Recorder: mgr.GetEventRecorderFor("prefixset-controller"),
WatchFilterValue: watchFilterValue,
Provider: prov,
Locker: locker,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "PrefixSet")
os.Exit(1)
Expand All @@ -491,6 +550,7 @@ func main() {
Recorder: mgr.GetEventRecorderFor("routingpolicy-controller"),
WatchFilterValue: watchFilterValue,
Provider: prov,
Locker: locker,
}).SetupWithManager(ctx, mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "RoutingPolicy")
os.Exit(1)
Expand All @@ -502,6 +562,7 @@ func main() {
Recorder: mgr.GetEventRecorderFor("cisco-nx-border-gateway-controller"),
WatchFilterValue: watchFilterValue,
Provider: prov,
Locker: locker,
}).SetupWithManager(ctx, mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "BorderGateway")
os.Exit(1)
Expand Down
3 changes: 2 additions & 1 deletion config/develop/manager_patch.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@
- --leader-elect=false
- --health-probe-bind-address=:8081
- --provider=openconfig
- --requeue-interval=10s
- --requeue-interval=15s
- --max-concurrent-reconciles=5
11 changes: 11 additions & 0 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,17 @@ rules:
- list
- update
- watch
- apiGroups:
- coordination.k8s.io
resources:
- leases
verbs:
- create
- delete
- get
- list
- update
- watch
- apiGroups:
- networking.metal.ironcore.dev
resources:
Expand Down
21 changes: 21 additions & 0 deletions internal/controller/cisco/nx/bordergateway_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ package nx

import (
"context"
"errors"
"fmt"
"slices"
"time"

"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -31,6 +33,7 @@ import (
"github.com/ironcore-dev/network-operator/internal/deviceutil"
"github.com/ironcore-dev/network-operator/internal/provider"
"github.com/ironcore-dev/network-operator/internal/provider/cisco/nxos"
"github.com/ironcore-dev/network-operator/internal/resourcelock"
)

// BorderGatewayReconciler reconciles a BorderGateway object
Expand All @@ -47,6 +50,9 @@ type BorderGatewayReconciler struct {

// Provider is the driver that will be used to create & delete the bordergateway.
Provider provider.ProviderFunc

// Locker is used to synchronize operations on resources targeting the same device.
Locker *resourcelock.ResourceLocker
}

// +kubebuilder:rbac:groups=nx.cisco.networking.metal.ironcore.dev,resources=bordergateways,verbs=get;list;watch;create;update;patch;delete
Expand Down Expand Up @@ -97,6 +103,21 @@ func (r *BorderGatewayReconciler) Reconcile(ctx context.Context, req ctrl.Reques
return ctrl.Result{}, err
}

if err := r.Locker.AcquireLock(ctx, device.Name, "cisco-nx-border-gateway-controller"); err != nil {
if errors.Is(err, resourcelock.ErrLockAlreadyHeld) {
log.Info("Device is already locked, requeuing reconciliation")
return ctrl.Result{RequeueAfter: time.Second * 5}, nil
}
log.Error(err, "Failed to acquire device lock")
return ctrl.Result{}, err
}
defer func() {
if err := r.Locker.ReleaseLock(ctx, device.Name, "cisco-nx-border-gateway-controller"); err != nil {
log.Error(err, "Failed to release device lock")
reterr = kerrors.NewAggregate([]error{reterr, err})
}
}()

conn, err := deviceutil.GetDeviceConnection(ctx, r, device)
if err != nil {
return ctrl.Result{}, err
Expand Down
Loading
Loading