diff --git a/cmd/main.go b/cmd/main.go index c23324c6..3139087b 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -15,16 +15,17 @@ import ( // Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.) // to ensure that exec-entrypoint and run can make use of them. _ "k8s.io/client-go/plugin/pkg/client/auth" - "k8s.io/utils/ptr" // Set runtime concurrency to match CPU limit imposed by Kubernetes _ "go.uber.org/automaxprocs" "github.com/sapcc/go-api-declarations/bininfo" "go.uber.org/zap/zapcore" + coordinationv1 "k8s.io/api/coordination/v1" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" + "k8s.io/utils/ptr" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/certwatcher" @@ -44,6 +45,7 @@ import ( nxcontroller "github.com/ironcore-dev/network-operator/internal/controller/cisco/nx" corecontroller "github.com/ironcore-dev/network-operator/internal/controller/core" "github.com/ironcore-dev/network-operator/internal/provider" + "github.com/ironcore-dev/network-operator/internal/resourcelock" webhooknxv1alpha1 "github.com/ironcore-dev/network-operator/internal/webhook/cisco/nx/v1alpha1" webhookv1alpha1 "github.com/ironcore-dev/network-operator/internal/webhook/core/v1alpha1" // +kubebuilder:scaffold:imports @@ -76,6 +78,10 @@ func main() { var watchFilterValue string var providerName string var requeueInterval time.Duration + var maxConcurrentReconciles int + var lockerNamespace string + var lockerDuration time.Duration + var lockerRenewInterval time.Duration flag.StringVar(&metricsAddr, "metrics-bind-address", "0", "The address the metrics endpoint binds to. Use :8443 for HTTPS or :8080 for HTTP, or leave as 0 to disable the metrics service.") flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.") flag.BoolVar(&enableLeaderElection, "leader-elect", false, "Enable leader election for controller manager. Enabling this will ensure there is only one active controller manager.") @@ -90,6 +96,10 @@ func main() { flag.StringVar(&watchFilterValue, "watch-filter", "", fmt.Sprintf("Label value that the controller watches to reconcile api objects. Label key is always %q. If unspecified, the controller watches for all api objects.", v1alpha1.WatchLabel)) flag.StringVar(&providerName, "provider", "openconfig", "The provider to use for the controller. If not specified, the default provider is used. Available providers: "+strings.Join(provider.Providers(), ", ")) flag.DurationVar(&requeueInterval, "requeue-interval", 30*time.Second, "The interval after which Kubernetes resources should be reconciled again regardless of whether they have changed.") + flag.IntVar(&maxConcurrentReconciles, "max-concurrent-reconciles", 1, "The maximum number of concurrent reconciles per controller. Defaults to 1.") + flag.StringVar(&lockerNamespace, "locker-namespace", "", "The namespace to use for resource locker coordination. 
If not specified, uses the namespace the manager is deployed in, or 'default' if undetectable.") + flag.DurationVar(&lockerDuration, "locker-duration", 10*time.Second, "The duration of the resource locker lease.") + flag.DurationVar(&lockerRenewInterval, "locker-renew-interval", 5*time.Second, "The interval at which the resource locker lease is renewed.") opts := zap.Options{ Development: true, TimeEncoder: zapcore.ISO8601TimeEncoder, @@ -188,7 +198,7 @@ func main() { mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{ Cache: cache.Options{ReaderFailOnMissingInformer: true}, - Controller: config.Controller{UsePriorityQueue: ptr.To(true)}, + Controller: config.Controller{UsePriorityQueue: ptr.To(true), MaxConcurrentReconciles: maxConcurrentReconciles}, Scheme: scheme, Metrics: metricsServerOptions, WebhookServer: webhookServer, @@ -200,12 +210,7 @@ func main() { // Manager is stopped, otherwise, this setting is unsafe. Setting this significantly // speeds up voluntary leader transitions as the new leader don't have to wait // LeaseDuration time first. - // - // In the default scaffold provided, the program ends immediately after - // the manager stops, so would be fine to enable this option. However, - // if you are doing or is intended to do any operation such as perform cleanups - // after the manager stops then its usage might be unsafe. - // LeaderElectionReleaseOnCancel: true, + LeaderElectionReleaseOnCancel: true, }) if err != nil { setupLog.Error(err, "unable to start manager") @@ -221,6 +226,38 @@ func main() { ctx := ctrl.SetupSignalHandler() + if lockerNamespace == "" { + if ns, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace"); err == nil { + lockerNamespace = strings.TrimSpace(string(ns)) + setupLog.Info("Detected namespace from service account", "namespace", lockerNamespace) + } else { + lockerNamespace = "default" + setupLog.Info("Using default namespace for resource locker", "namespace", lockerNamespace) + } + } + + locker, err := resourcelock.NewResourceLocker(mgr.GetClient(), lockerNamespace, lockerDuration, lockerRenewInterval) + if err != nil { + setupLog.Error(err, "unable to create resource locker") + os.Exit(1) + } + + // Set up cache informer for Lease resources used by ResourceLocker. + // This ensures the cache has an informer for coordination.k8s.io/v1 Lease resources + // before any controller tries to use the ResourceLocker, which is required when + // ReaderFailOnMissingInformer is set to true. + if _, err := mgr.GetCache().GetInformer(ctx, &coordinationv1.Lease{}); err != nil { + setupLog.Error(err, "unable to get informer for Lease resources") + os.Exit(1) + } + setupLog.Info("Lease cache informer initialized", "namespace", lockerNamespace) + + // Add the ResourceLocker to the manager so it will be properly cleaned up on shutdown. 
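+ // (Assumed contract: mgr.Add only accepts a manager.Runnable, so the locker
+ // presumably exposes Start(ctx context.Context) error, renewing held leases
+ // every lockerRenewInterval and releasing them once ctx is cancelled; the
+ // concrete shape lives in internal/resourcelock.)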
+ if err := mgr.Add(locker); err != nil { + setupLog.Error(err, "unable to add resource locker to manager") + os.Exit(1) + } + if err := (&corecontroller.DeviceReconciler{ Client: mgr.GetClient(), Scheme: mgr.GetScheme(), @@ -239,6 +276,7 @@ func main() { Recorder: mgr.GetEventRecorderFor("interface-controller"), WatchFilterValue: watchFilterValue, Provider: prov, + Locker: locker, RequeueInterval: requeueInterval, }).SetupWithManager(ctx, mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "Interface") @@ -251,6 +289,7 @@ func main() { Recorder: mgr.GetEventRecorderFor("banner-controller"), WatchFilterValue: watchFilterValue, Provider: prov, + Locker: locker, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "Banner") os.Exit(1) @@ -262,6 +301,7 @@ func main() { Recorder: mgr.GetEventRecorderFor("user-controller"), WatchFilterValue: watchFilterValue, Provider: prov, + Locker: locker, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "User") os.Exit(1) @@ -273,6 +313,7 @@ func main() { Recorder: mgr.GetEventRecorderFor("dns-controller"), WatchFilterValue: watchFilterValue, Provider: prov, + Locker: locker, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "DNS") os.Exit(1) @@ -284,6 +325,7 @@ func main() { Recorder: mgr.GetEventRecorderFor("ntp-controller"), WatchFilterValue: watchFilterValue, Provider: prov, + Locker: locker, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "NTP") os.Exit(1) @@ -295,6 +337,7 @@ func main() { Recorder: mgr.GetEventRecorderFor("acl-controller"), WatchFilterValue: watchFilterValue, Provider: prov, + Locker: locker, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "AccessControlList") os.Exit(1) @@ -306,6 +349,7 @@ func main() { Recorder: mgr.GetEventRecorderFor("certificate-controller"), WatchFilterValue: watchFilterValue, Provider: prov, + Locker: locker, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "Certificate") os.Exit(1) @@ -317,6 +361,7 @@ func main() { Recorder: mgr.GetEventRecorderFor("snmp-controller"), WatchFilterValue: watchFilterValue, Provider: prov, + Locker: locker, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "SNMP") os.Exit(1) @@ -328,6 +373,7 @@ func main() { Recorder: mgr.GetEventRecorderFor("syslog-controller"), WatchFilterValue: watchFilterValue, Provider: prov, + Locker: locker, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "Syslog") os.Exit(1) @@ -339,6 +385,7 @@ func main() { Recorder: mgr.GetEventRecorderFor("managementaccess-controller"), WatchFilterValue: watchFilterValue, Provider: prov, + Locker: locker, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "ManagementAccess") os.Exit(1) @@ -350,6 +397,7 @@ func main() { Recorder: mgr.GetEventRecorderFor("isis-controller"), WatchFilterValue: watchFilterValue, Provider: prov, + Locker: locker, RequeueInterval: requeueInterval, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "ISIS") @@ -362,6 +410,7 @@ func main() { Recorder: mgr.GetEventRecorderFor("pim-controller"), WatchFilterValue: watchFilterValue, 
Provider: prov, + Locker: locker, RequeueInterval: requeueInterval, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "PIM") @@ -374,6 +423,7 @@ func main() { Recorder: mgr.GetEventRecorderFor("bgp-controller"), WatchFilterValue: watchFilterValue, Provider: prov, + Locker: locker, RequeueInterval: requeueInterval, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "BGP") @@ -386,6 +436,7 @@ func main() { Recorder: mgr.GetEventRecorderFor("bgppeer-controller"), WatchFilterValue: watchFilterValue, Provider: prov, + Locker: locker, RequeueInterval: requeueInterval, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "BGPPeer") @@ -398,6 +449,7 @@ func main() { Recorder: mgr.GetEventRecorderFor("ospf-controller"), WatchFilterValue: watchFilterValue, Provider: prov, + Locker: locker, RequeueInterval: requeueInterval, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "OSPF") @@ -410,6 +462,7 @@ func main() { Recorder: mgr.GetEventRecorderFor("vlan-controller"), WatchFilterValue: watchFilterValue, Provider: prov, + Locker: locker, RequeueInterval: requeueInterval, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "VLAN") @@ -422,6 +475,7 @@ func main() { Recorder: mgr.GetEventRecorderFor("vrf-controller"), WatchFilterValue: watchFilterValue, Provider: prov, + Locker: locker, RequeueInterval: requeueInterval, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "VRF") @@ -431,9 +485,10 @@ func main() { if err := (&nxcontroller.VPCDomainReconciler{ Client: mgr.GetClient(), Scheme: mgr.GetScheme(), - Recorder: mgr.GetEventRecorderFor("vpcdomain-controller"), + Recorder: mgr.GetEventRecorderFor("cisco-nx-vpcdomain-controller"), WatchFilterValue: watchFilterValue, Provider: prov, + Locker: locker, RequeueInterval: requeueInterval, }).SetupWithManager(ctx, mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "VPCDomain") @@ -446,6 +501,7 @@ func main() { Recorder: mgr.GetEventRecorderFor("nve-controller"), WatchFilterValue: watchFilterValue, Provider: prov, + Locker: locker, RequeueInterval: requeueInterval, }).SetupWithManager(ctx, mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "NetworkVirtualizationEdge") @@ -458,6 +514,7 @@ func main() { Recorder: mgr.GetEventRecorderFor("cisco-nx-system-controller"), WatchFilterValue: watchFilterValue, Provider: prov, + Locker: locker, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "System") os.Exit(1) @@ -469,6 +526,7 @@ func main() { Recorder: mgr.GetEventRecorderFor("evpn-instance-controller"), WatchFilterValue: watchFilterValue, Provider: prov, + Locker: locker, }).SetupWithManager(ctx, mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "EVPNInstance") os.Exit(1) @@ -480,6 +538,7 @@ func main() { Recorder: mgr.GetEventRecorderFor("prefixset-controller"), WatchFilterValue: watchFilterValue, Provider: prov, + Locker: locker, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "PrefixSet") os.Exit(1) @@ -491,6 +550,7 @@ func main() { Recorder: mgr.GetEventRecorderFor("routingpolicy-controller"), WatchFilterValue: watchFilterValue, Provider: prov, 
+ Locker: locker, }).SetupWithManager(ctx, mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "RoutingPolicy") os.Exit(1) @@ -502,6 +562,7 @@ func main() { Recorder: mgr.GetEventRecorderFor("cisco-nx-border-gateway-controller"), WatchFilterValue: watchFilterValue, Provider: prov, + Locker: locker, }).SetupWithManager(ctx, mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "BorderGateway") os.Exit(1) diff --git a/config/develop/manager_patch.yaml b/config/develop/manager_patch.yaml index 776eaff1..1214a401 100644 --- a/config/develop/manager_patch.yaml +++ b/config/develop/manager_patch.yaml @@ -4,4 +4,5 @@ - --leader-elect=false - --health-probe-bind-address=:8081 - --provider=openconfig - - --requeue-interval=10s + - --requeue-interval=15s + - --max-concurrent-reconciles=5 diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index 3ab55f80..a84214da 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -28,6 +28,17 @@ rules: - list - update - watch +- apiGroups: + - coordination.k8s.io + resources: + - leases + verbs: + - create + - delete + - get + - list + - update + - watch - apiGroups: - networking.metal.ironcore.dev resources: diff --git a/internal/controller/cisco/nx/bordergateway_controller.go b/internal/controller/cisco/nx/bordergateway_controller.go index a70c971d..255e61a6 100644 --- a/internal/controller/cisco/nx/bordergateway_controller.go +++ b/internal/controller/cisco/nx/bordergateway_controller.go @@ -5,8 +5,10 @@ package nx import ( "context" + "errors" "fmt" "slices" + "time" "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -31,6 +33,7 @@ import ( "github.com/ironcore-dev/network-operator/internal/deviceutil" "github.com/ironcore-dev/network-operator/internal/provider" "github.com/ironcore-dev/network-operator/internal/provider/cisco/nxos" + "github.com/ironcore-dev/network-operator/internal/resourcelock" ) // BorderGatewayReconciler reconciles a BorderGateway object @@ -47,6 +50,9 @@ type BorderGatewayReconciler struct { // Provider is the driver that will be used to create & delete the bordergateway. Provider provider.ProviderFunc + + // Locker is used to synchronize operations on resources targeting the same device. 
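+ // AcquireLock is expected to surface resourcelock.ErrLockAlreadyHeld while
+ // another controller holds the device lease; Reconcile treats that as a
+ // reason to requeue after 5s rather than as a hard error (see below).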
+ Locker *resourcelock.ResourceLocker } // +kubebuilder:rbac:groups=nx.cisco.networking.metal.ironcore.dev,resources=bordergateways,verbs=get;list;watch;create;update;patch;delete @@ -97,6 +103,21 @@ func (r *BorderGatewayReconciler) Reconcile(ctx context.Context, req ctrl.Reques return ctrl.Result{}, err } + if err := r.Locker.AcquireLock(ctx, device.Name, "cisco-nx-border-gateway-controller"); err != nil { + if errors.Is(err, resourcelock.ErrLockAlreadyHeld) { + log.Info("Device is already locked, requeuing reconciliation") + return ctrl.Result{RequeueAfter: time.Second * 5}, nil + } + log.Error(err, "Failed to acquire device lock") + return ctrl.Result{}, err + } + defer func() { + if err := r.Locker.ReleaseLock(ctx, device.Name, "cisco-nx-border-gateway-controller"); err != nil { + log.Error(err, "Failed to release device lock") + reterr = kerrors.NewAggregate([]error{reterr, err}) + } + }() + conn, err := deviceutil.GetDeviceConnection(ctx, r, device) if err != nil { return ctrl.Result{}, err diff --git a/internal/controller/cisco/nx/suite_test.go b/internal/controller/cisco/nx/suite_test.go index 3e2cc4bc..9c92f18a 100644 --- a/internal/controller/cisco/nx/suite_test.go +++ b/internal/controller/cisco/nx/suite_test.go @@ -14,6 +14,7 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + coordinationv1 "k8s.io/api/coordination/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes/scheme" @@ -26,10 +27,10 @@ import ( nxv1alpha1 "github.com/ironcore-dev/network-operator/api/cisco/nx/v1alpha1" "github.com/ironcore-dev/network-operator/api/core/v1alpha1" - corecontroller "github.com/ironcore-dev/network-operator/internal/controller/core" "github.com/ironcore-dev/network-operator/internal/deviceutil" "github.com/ironcore-dev/network-operator/internal/provider" "github.com/ironcore-dev/network-operator/internal/provider/cisco/nxos" + "github.com/ironcore-dev/network-operator/internal/resourcelock" // +kubebuilder:scaffold:imports ) @@ -43,6 +44,7 @@ var ( k8sClient client.Client k8sManager ctrl.Manager testProvider = NewMockProvider() + testLocker *resourcelock.ResourceLocker ) func TestControllers(t *testing.T) { @@ -98,6 +100,16 @@ var _ = BeforeSuite(func() { Expect(err).NotTo(HaveOccurred()) Expect(k8sClient).NotTo(BeNil()) + testLocker, err = resourcelock.NewResourceLocker(k8sManager.GetClient(), metav1.NamespaceDefault, 15*time.Second, 10*time.Second) + Expect(err).NotTo(HaveOccurred()) + + err = k8sManager.Add(testLocker) + Expect(err).NotTo(HaveOccurred()) + + // Set up cache informer for Lease resources used by ResourceLocker + _, err = k8sManager.GetCache().GetInformer(ctx, &coordinationv1.Lease{}) + Expect(err).NotTo(HaveOccurred()) + prov := func() provider.Provider { return testProvider } err = (&SystemReconciler{ @@ -105,6 +117,7 @@ var _ = BeforeSuite(func() { Scheme: k8sManager.GetScheme(), Recorder: recorder, Provider: prov, + Locker: testLocker, }).SetupWithManager(k8sManager) Expect(err).NotTo(HaveOccurred()) @@ -113,6 +126,7 @@ var _ = BeforeSuite(func() { Scheme: scheme.Scheme, Recorder: recorder, Provider: prov, + Locker: testLocker, }).SetupWithManager(ctx, k8sManager) Expect(err).NotTo(HaveOccurred()) @@ -121,15 +135,7 @@ var _ = BeforeSuite(func() { Scheme: k8sManager.GetScheme(), Recorder: recorder, Provider: prov, - }).SetupWithManager(ctx, k8sManager) - Expect(err).NotTo(HaveOccurred()) - - err = (&corecontroller.NetworkVirtualizationEdgeReconciler{ - Client: k8sManager.GetClient(), - Scheme: 
k8sManager.GetScheme(), - Recorder: recorder, - Provider: prov, - RequeueInterval: time.Second, + Locker: testLocker, }).SetupWithManager(ctx, k8sManager) Expect(err).NotTo(HaveOccurred()) diff --git a/internal/controller/cisco/nx/system_controller.go b/internal/controller/cisco/nx/system_controller.go index e8ba6af3..af40a93e 100644 --- a/internal/controller/cisco/nx/system_controller.go +++ b/internal/controller/cisco/nx/system_controller.go @@ -5,7 +5,9 @@ package nx import ( "context" + "errors" "fmt" + "time" "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -24,6 +26,7 @@ import ( "github.com/ironcore-dev/network-operator/internal/conditions" "github.com/ironcore-dev/network-operator/internal/deviceutil" "github.com/ironcore-dev/network-operator/internal/provider" + "github.com/ironcore-dev/network-operator/internal/resourcelock" ) // SystemReconciler reconciles a System object @@ -40,6 +43,9 @@ type SystemReconciler struct { // Provider is the driver that will be used to create & delete the system. Provider provider.ProviderFunc + + // Locker is used to synchronize operations on resources targeting the same device. + Locker *resourcelock.ResourceLocker } // +kubebuilder:rbac:groups=nx.cisco.networking.metal.ironcore.dev,resources=systems,verbs=get;list;watch;create;update;patch;delete @@ -90,6 +96,21 @@ func (r *SystemReconciler) Reconcile(ctx context.Context, req ctrl.Request) (_ c return ctrl.Result{}, err } + if err := r.Locker.AcquireLock(ctx, device.Name, "cisco-nx-system-controller"); err != nil { + if errors.Is(err, resourcelock.ErrLockAlreadyHeld) { + log.Info("Device is already locked, requeuing reconciliation") + return ctrl.Result{RequeueAfter: time.Second * 5}, nil + } + log.Error(err, "Failed to acquire device lock") + return ctrl.Result{}, err + } + defer func() { + if err := r.Locker.ReleaseLock(ctx, device.Name, "cisco-nx-system-controller"); err != nil { + log.Error(err, "Failed to release device lock") + reterr = kerrors.NewAggregate([]error{reterr, err}) + } + }() + conn, err := deviceutil.GetDeviceConnection(ctx, r, device) if err != nil { return ctrl.Result{}, err diff --git a/internal/controller/cisco/nx/vpcdomain_controller.go b/internal/controller/cisco/nx/vpcdomain_controller.go index 97f46bc4..e5e80f5b 100644 --- a/internal/controller/cisco/nx/vpcdomain_controller.go +++ b/internal/controller/cisco/nx/vpcdomain_controller.go @@ -5,6 +5,7 @@ package nx import ( "context" + "errors" "fmt" "slices" "time" @@ -28,6 +29,7 @@ import ( "github.com/ironcore-dev/network-operator/internal/conditions" "github.com/ironcore-dev/network-operator/internal/provider" + "github.com/ironcore-dev/network-operator/internal/resourcelock" nxv1 "github.com/ironcore-dev/network-operator/api/cisco/nx/v1alpha1" corev1 "github.com/ironcore-dev/network-operator/api/core/v1alpha1" @@ -50,6 +52,9 @@ type VPCDomainReconciler struct { // Provider is the driver that will be used to create & delete the vPC Provider provider.ProviderFunc + // Locker is used to synchronize operations on resources targeting the same device. + Locker *resourcelock.ResourceLocker + // RequeueInterval is the duration after which the controller should requeue the reconciliation, // regardless of changes. 
RequeueInterval time.Duration @@ -107,6 +112,21 @@ func (r *VPCDomainReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( return ctrl.Result{}, err } + if err := r.Locker.AcquireLock(ctx, device.Name, "cisco-nx-vpcdomain-controller"); err != nil { + if errors.Is(err, resourcelock.ErrLockAlreadyHeld) { + log.Info("Device is already locked, requeuing reconciliation") + return ctrl.Result{RequeueAfter: time.Second * 5}, nil + } + log.Error(err, "Failed to acquire device lock") + return ctrl.Result{}, err + } + defer func() { + if err := r.Locker.ReleaseLock(ctx, device.Name, "cisco-nx-vpcdomain-controller"); err != nil { + log.Error(err, "Failed to release device lock") + reterr = kerrors.NewAggregate([]error{reterr, err}) + } + }() + conn, err := deviceutil.GetDeviceConnection(ctx, r, device) if err != nil { return ctrl.Result{}, err diff --git a/internal/controller/core/acl_controller.go b/internal/controller/core/acl_controller.go index d59377b3..e7b76762 100644 --- a/internal/controller/core/acl_controller.go +++ b/internal/controller/core/acl_controller.go @@ -5,7 +5,9 @@ package core import ( "context" + "errors" "fmt" + "time" "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -23,6 +25,7 @@ import ( "github.com/ironcore-dev/network-operator/internal/conditions" "github.com/ironcore-dev/network-operator/internal/deviceutil" "github.com/ironcore-dev/network-operator/internal/provider" + "github.com/ironcore-dev/network-operator/internal/resourcelock" ) // AccessControlListReconciler reconciles a AccessControlList object @@ -39,6 +42,9 @@ type AccessControlListReconciler struct { // Provider is the driver that will be used to create & delete the accesscontrollist. Provider provider.ProviderFunc + + // Locker is used to synchronize operations on resources targeting the same device. 
+ Locker *resourcelock.ResourceLocker } // +kubebuilder:rbac:groups=networking.metal.ironcore.dev,resources=accesscontrollists,verbs=get;list;watch;create;update;patch;delete @@ -89,6 +95,21 @@ func (r *AccessControlListReconciler) Reconcile(ctx context.Context, req ctrl.Re return ctrl.Result{}, err } + if err := r.Locker.AcquireLock(ctx, device.Name, "acl-controller"); err != nil { + if errors.Is(err, resourcelock.ErrLockAlreadyHeld) { + log.Info("Device is already locked, requeuing reconciliation") + return ctrl.Result{RequeueAfter: time.Second * 5}, nil + } + log.Error(err, "Failed to acquire device lock") + return ctrl.Result{}, err + } + defer func() { + if err := r.Locker.ReleaseLock(ctx, device.Name, "acl-controller"); err != nil { + log.Error(err, "Failed to release device lock") + reterr = kerrors.NewAggregate([]error{reterr, err}) + } + }() + conn, err := deviceutil.GetDeviceConnection(ctx, r, device) if err != nil { return ctrl.Result{}, err diff --git a/internal/controller/core/banner_controller.go b/internal/controller/core/banner_controller.go index 857db289..f4536ca1 100644 --- a/internal/controller/core/banner_controller.go +++ b/internal/controller/core/banner_controller.go @@ -5,7 +5,9 @@ package core import ( "context" + "errors" "fmt" + "time" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" @@ -28,6 +30,7 @@ import ( "github.com/ironcore-dev/network-operator/internal/conditions" "github.com/ironcore-dev/network-operator/internal/deviceutil" "github.com/ironcore-dev/network-operator/internal/provider" + "github.com/ironcore-dev/network-operator/internal/resourcelock" ) // BannerReconciler reconciles a Banner object @@ -44,6 +47,9 @@ type BannerReconciler struct { // Provider is the driver that will be used to create & delete the banner. Provider provider.ProviderFunc + + // Locker is used to synchronize operations on resources targeting the same device. 
+ Locker *resourcelock.ResourceLocker } // +kubebuilder:rbac:groups=networking.metal.ironcore.dev,resources=banners,verbs=get;list;watch;create;update;patch;delete @@ -96,6 +102,21 @@ func (r *BannerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (_ c return ctrl.Result{}, err } + if err := r.Locker.AcquireLock(ctx, device.Name, "banner-controller"); err != nil { + if errors.Is(err, resourcelock.ErrLockAlreadyHeld) { + log.Info("Device is already locked, requeuing reconciliation") + return ctrl.Result{RequeueAfter: time.Second * 5}, nil + } + log.Error(err, "Failed to acquire device lock") + return ctrl.Result{}, err + } + defer func() { + if err := r.Locker.ReleaseLock(ctx, device.Name, "banner-controller"); err != nil { + log.Error(err, "Failed to release device lock") + reterr = kerrors.NewAggregate([]error{reterr, err}) + } + }() + conn, err := deviceutil.GetDeviceConnection(ctx, r, device) if err != nil { return ctrl.Result{}, err diff --git a/internal/controller/core/bgp_controller.go b/internal/controller/core/bgp_controller.go index 0a0c214c..5745596f 100644 --- a/internal/controller/core/bgp_controller.go +++ b/internal/controller/core/bgp_controller.go @@ -25,6 +25,7 @@ import ( "github.com/ironcore-dev/network-operator/internal/conditions" "github.com/ironcore-dev/network-operator/internal/deviceutil" "github.com/ironcore-dev/network-operator/internal/provider" + "github.com/ironcore-dev/network-operator/internal/resourcelock" ) // BGPReconciler reconciles a BGP object @@ -42,6 +43,9 @@ type BGPReconciler struct { // Provider is the driver that will be used to create & delete the bgp. Provider provider.ProviderFunc + // Locker is used to synchronize operations on resources targeting the same device. + Locker *resourcelock.ResourceLocker + // RequeueInterval is the duration after which the controller should requeue the reconciliation, // regardless of changes. RequeueInterval time.Duration @@ -95,6 +99,21 @@ func (r *BGPReconciler) Reconcile(ctx context.Context, req ctrl.Request) (_ ctrl return ctrl.Result{}, err } + if err := r.Locker.AcquireLock(ctx, device.Name, "bgp-controller"); err != nil { + if errors.Is(err, resourcelock.ErrLockAlreadyHeld) { + log.Info("Device is already locked, requeuing reconciliation") + return ctrl.Result{RequeueAfter: time.Second * 5}, nil + } + log.Error(err, "Failed to acquire device lock") + return ctrl.Result{}, err + } + defer func() { + if err := r.Locker.ReleaseLock(ctx, device.Name, "bgp-controller"); err != nil { + log.Error(err, "Failed to release device lock") + reterr = kerrors.NewAggregate([]error{reterr, err}) + } + }() + conn, err := deviceutil.GetDeviceConnection(ctx, r, device) if err != nil { return ctrl.Result{}, err diff --git a/internal/controller/core/bgp_peer_controller.go b/internal/controller/core/bgp_peer_controller.go index bd76e6cd..d5b0ca29 100644 --- a/internal/controller/core/bgp_peer_controller.go +++ b/internal/controller/core/bgp_peer_controller.go @@ -30,6 +30,7 @@ import ( "github.com/ironcore-dev/network-operator/internal/conditions" "github.com/ironcore-dev/network-operator/internal/deviceutil" "github.com/ironcore-dev/network-operator/internal/provider" + "github.com/ironcore-dev/network-operator/internal/resourcelock" ) // BGPPeerReconciler reconciles a BGPPeer object @@ -47,6 +48,9 @@ type BGPPeerReconciler struct { // Provider is the driver that will be used to create & delete the bgppeer. 
Provider provider.ProviderFunc + // Locker is used to synchronize operations on resources targeting the same device. + Locker *resourcelock.ResourceLocker + // RequeueInterval is the duration after which the controller should requeue the reconciliation, // regardless of changes. RequeueInterval time.Duration @@ -100,6 +104,21 @@ func (r *BGPPeerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (_ return ctrl.Result{}, err } + if err := r.Locker.AcquireLock(ctx, device.Name, "bgppeer-controller"); err != nil { + if errors.Is(err, resourcelock.ErrLockAlreadyHeld) { + log.Info("Device is already locked, requeuing reconciliation") + return ctrl.Result{RequeueAfter: time.Second * 5}, nil + } + log.Error(err, "Failed to acquire device lock") + return ctrl.Result{}, err + } + defer func() { + if err := r.Locker.ReleaseLock(ctx, device.Name, "bgppeer-controller"); err != nil { + log.Error(err, "Failed to release device lock") + reterr = kerrors.NewAggregate([]error{reterr, err}) + } + }() + conn, err := deviceutil.GetDeviceConnection(ctx, r, device) if err != nil { return ctrl.Result{}, err diff --git a/internal/controller/core/certificate_controller.go b/internal/controller/core/certificate_controller.go index 2d94eb18..3d694dda 100644 --- a/internal/controller/core/certificate_controller.go +++ b/internal/controller/core/certificate_controller.go @@ -5,7 +5,9 @@ package core import ( "context" + "errors" "fmt" + "time" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" @@ -28,6 +30,7 @@ import ( "github.com/ironcore-dev/network-operator/internal/conditions" "github.com/ironcore-dev/network-operator/internal/deviceutil" "github.com/ironcore-dev/network-operator/internal/provider" + "github.com/ironcore-dev/network-operator/internal/resourcelock" ) // CertificateReconciler reconciles a Certificate object @@ -44,6 +47,9 @@ type CertificateReconciler struct { // Provider is the driver that will be used to create & delete the certificate. Provider provider.ProviderFunc + + // Locker is used to synchronize operations on resources targeting the same device. 
+ Locker *resourcelock.ResourceLocker } // +kubebuilder:rbac:groups=networking.metal.ironcore.dev,resources=certificates,verbs=get;list;watch;create;update;patch;delete @@ -95,6 +101,21 @@ func (r *CertificateReconciler) Reconcile(ctx context.Context, req ctrl.Request) return ctrl.Result{}, err } + if err := r.Locker.AcquireLock(ctx, device.Name, "certificate-controller"); err != nil { + if errors.Is(err, resourcelock.ErrLockAlreadyHeld) { + log.Info("Device is already locked, requeuing reconciliation") + return ctrl.Result{RequeueAfter: time.Second * 5}, nil + } + log.Error(err, "Failed to acquire device lock") + return ctrl.Result{}, err + } + defer func() { + if err := r.Locker.ReleaseLock(ctx, device.Name, "certificate-controller"); err != nil { + log.Error(err, "Failed to release device lock") + reterr = kerrors.NewAggregate([]error{reterr, err}) + } + }() + conn, err := deviceutil.GetDeviceConnection(ctx, r, device) if err != nil { return ctrl.Result{}, err diff --git a/internal/controller/core/dns_controller.go b/internal/controller/core/dns_controller.go index e32e1398..b28fd8c3 100644 --- a/internal/controller/core/dns_controller.go +++ b/internal/controller/core/dns_controller.go @@ -5,7 +5,9 @@ package core import ( "context" + "errors" "fmt" + "time" "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -23,6 +25,7 @@ import ( "github.com/ironcore-dev/network-operator/internal/conditions" "github.com/ironcore-dev/network-operator/internal/deviceutil" "github.com/ironcore-dev/network-operator/internal/provider" + "github.com/ironcore-dev/network-operator/internal/resourcelock" ) // DNSReconciler reconciles a DNS object @@ -39,6 +42,9 @@ type DNSReconciler struct { // Provider is the driver that will be used to create & delete the dns. Provider provider.ProviderFunc + + // Locker is used to synchronize operations on resources targeting the same device. 
+ Locker *resourcelock.ResourceLocker } // +kubebuilder:rbac:groups=networking.metal.ironcore.dev,resources=dns,verbs=get;list;watch;create;update;patch;delete @@ -89,6 +95,21 @@ func (r *DNSReconciler) Reconcile(ctx context.Context, req ctrl.Request) (_ ctrl return ctrl.Result{}, err } + if err := r.Locker.AcquireLock(ctx, device.Name, "dns-controller"); err != nil { + if errors.Is(err, resourcelock.ErrLockAlreadyHeld) { + log.Info("Device is already locked, requeuing reconciliation") + return ctrl.Result{RequeueAfter: time.Second * 5}, nil + } + log.Error(err, "Failed to acquire device lock") + return ctrl.Result{}, err + } + defer func() { + if err := r.Locker.ReleaseLock(ctx, device.Name, "dns-controller"); err != nil { + log.Error(err, "Failed to release device lock") + reterr = kerrors.NewAggregate([]error{reterr, err}) + } + }() + conn, err := deviceutil.GetDeviceConnection(ctx, r, device) if err != nil { return ctrl.Result{}, err diff --git a/internal/controller/core/evpninstance_controller.go b/internal/controller/core/evpninstance_controller.go index 1de337a5..3520e568 100644 --- a/internal/controller/core/evpninstance_controller.go +++ b/internal/controller/core/evpninstance_controller.go @@ -5,7 +5,9 @@ package core import ( "context" + "errors" "fmt" + "time" "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -28,6 +30,7 @@ import ( "github.com/ironcore-dev/network-operator/internal/conditions" "github.com/ironcore-dev/network-operator/internal/deviceutil" "github.com/ironcore-dev/network-operator/internal/provider" + "github.com/ironcore-dev/network-operator/internal/resourcelock" ) // EVPNInstanceReconciler reconciles a EVPNInstance object @@ -44,6 +47,9 @@ type EVPNInstanceReconciler struct { // Provider is the driver that will be used to create & delete the evpninstance. Provider provider.ProviderFunc + + // Locker is used to synchronize operations on resources targeting the same device. 
+ Locker *resourcelock.ResourceLocker } // +kubebuilder:rbac:groups=networking.metal.ironcore.dev,resources=evpninstances,verbs=get;list;watch;create;update;patch;delete @@ -96,6 +102,21 @@ func (r *EVPNInstanceReconciler) Reconcile(ctx context.Context, req ctrl.Request return ctrl.Result{}, err } + if err := r.Locker.AcquireLock(ctx, device.Name, "evpn-instance-controller"); err != nil { + if errors.Is(err, resourcelock.ErrLockAlreadyHeld) { + log.Info("Device is already locked, requeuing reconciliation") + return ctrl.Result{RequeueAfter: time.Second * 5}, nil + } + log.Error(err, "Failed to acquire device lock") + return ctrl.Result{}, err + } + defer func() { + if err := r.Locker.ReleaseLock(ctx, device.Name, "evpn-instance-controller"); err != nil { + log.Error(err, "Failed to release device lock") + reterr = kerrors.NewAggregate([]error{reterr, err}) + } + }() + conn, err := deviceutil.GetDeviceConnection(ctx, r, device) if err != nil { return ctrl.Result{}, err diff --git a/internal/controller/core/interface_controller.go b/internal/controller/core/interface_controller.go index e23e87e8..5388b9f9 100644 --- a/internal/controller/core/interface_controller.go +++ b/internal/controller/core/interface_controller.go @@ -32,6 +32,7 @@ import ( "github.com/ironcore-dev/network-operator/internal/conditions" "github.com/ironcore-dev/network-operator/internal/deviceutil" "github.com/ironcore-dev/network-operator/internal/provider" + "github.com/ironcore-dev/network-operator/internal/resourcelock" ) // InterfaceReconciler reconciles a Interface object @@ -49,6 +50,9 @@ type InterfaceReconciler struct { // Provider is the driver that will be used to create & delete the interface. Provider provider.ProviderFunc + // Locker is used to synchronize operations on resources targeting the same device. + Locker *resourcelock.ResourceLocker + // RequeueInterval is the duration after which the controller should requeue the reconciliation, // regardless of changes. RequeueInterval time.Duration @@ -105,6 +109,21 @@ func (r *InterfaceReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( return ctrl.Result{}, err } + if err := r.Locker.AcquireLock(ctx, device.Name, "interface-controller"); err != nil { + if errors.Is(err, resourcelock.ErrLockAlreadyHeld) { + log.Info("Device is already locked, requeuing reconciliation") + return ctrl.Result{RequeueAfter: time.Second * 5}, nil + } + log.Error(err, "Failed to acquire device lock") + return ctrl.Result{}, err + } + defer func() { + if err := r.Locker.ReleaseLock(ctx, device.Name, "interface-controller"); err != nil { + log.Error(err, "Failed to release device lock") + reterr = kerrors.NewAggregate([]error{reterr, err}) + } + }() + conn, err := deviceutil.GetDeviceConnection(ctx, r, device) if err != nil { return ctrl.Result{}, err diff --git a/internal/controller/core/isis_controller.go b/internal/controller/core/isis_controller.go index f8006407..df539351 100644 --- a/internal/controller/core/isis_controller.go +++ b/internal/controller/core/isis_controller.go @@ -25,6 +25,7 @@ import ( "github.com/ironcore-dev/network-operator/internal/conditions" "github.com/ironcore-dev/network-operator/internal/deviceutil" "github.com/ironcore-dev/network-operator/internal/provider" + "github.com/ironcore-dev/network-operator/internal/resourcelock" ) // ISISReconciler reconciles a ISIS object @@ -42,6 +43,9 @@ type ISISReconciler struct { // Provider is the driver that will be used to create & delete the isis. 
Provider provider.ProviderFunc + // Locker is used to synchronize operations on resources targeting the same device. + Locker *resourcelock.ResourceLocker + // RequeueInterval is the duration after which the controller should requeue the reconciliation, // regardless of changes. RequeueInterval time.Duration @@ -95,6 +99,21 @@ func (r *ISISReconciler) Reconcile(ctx context.Context, req ctrl.Request) (_ ctr return ctrl.Result{}, err } + if err := r.Locker.AcquireLock(ctx, device.Name, "isis-controller"); err != nil { + if errors.Is(err, resourcelock.ErrLockAlreadyHeld) { + log.Info("Device is already locked, requeuing reconciliation") + return ctrl.Result{RequeueAfter: time.Second * 5}, nil + } + log.Error(err, "Failed to acquire device lock") + return ctrl.Result{}, err + } + defer func() { + if err := r.Locker.ReleaseLock(ctx, device.Name, "isis-controller"); err != nil { + log.Error(err, "Failed to release device lock") + reterr = kerrors.NewAggregate([]error{reterr, err}) + } + }() + conn, err := deviceutil.GetDeviceConnection(ctx, r, device) if err != nil { return ctrl.Result{}, err diff --git a/internal/controller/core/managementaccess_controller.go b/internal/controller/core/managementaccess_controller.go index 6fbacf50..d90a52c7 100644 --- a/internal/controller/core/managementaccess_controller.go +++ b/internal/controller/core/managementaccess_controller.go @@ -5,7 +5,9 @@ package core import ( "context" + "errors" "fmt" + "time" "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -29,6 +31,7 @@ import ( "github.com/ironcore-dev/network-operator/internal/conditions" "github.com/ironcore-dev/network-operator/internal/deviceutil" "github.com/ironcore-dev/network-operator/internal/provider" + "github.com/ironcore-dev/network-operator/internal/resourcelock" ) // ManagementAccessReconciler reconciles a ManagementAccess object @@ -45,6 +48,9 @@ type ManagementAccessReconciler struct { // Provider is the driver that will be used to create & delete the managementaccess. Provider provider.ProviderFunc + + // Locker is used to synchronize operations on resources targeting the same device. 
+ Locker *resourcelock.ResourceLocker } // +kubebuilder:rbac:groups=networking.metal.ironcore.dev,resources=managementaccesses,verbs=get;list;watch;create;update;patch;delete @@ -95,6 +101,21 @@ func (r *ManagementAccessReconciler) Reconcile(ctx context.Context, req ctrl.Req return ctrl.Result{}, err } + if err := r.Locker.AcquireLock(ctx, device.Name, "managementaccess-controller"); err != nil { + if errors.Is(err, resourcelock.ErrLockAlreadyHeld) { + log.Info("Device is already locked, requeuing reconciliation") + return ctrl.Result{RequeueAfter: time.Second * 5}, nil + } + log.Error(err, "Failed to acquire device lock") + return ctrl.Result{}, err + } + defer func() { + if err := r.Locker.ReleaseLock(ctx, device.Name, "managementaccess-controller"); err != nil { + log.Error(err, "Failed to release device lock") + reterr = kerrors.NewAggregate([]error{reterr, err}) + } + }() + conn, err := deviceutil.GetDeviceConnection(ctx, r, device) if err != nil { return ctrl.Result{}, err diff --git a/internal/controller/core/ntp_controller.go b/internal/controller/core/ntp_controller.go index 2ec62140..e8aa08ca 100644 --- a/internal/controller/core/ntp_controller.go +++ b/internal/controller/core/ntp_controller.go @@ -5,7 +5,9 @@ package core import ( "context" + "errors" "fmt" + "time" "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -23,6 +25,7 @@ import ( "github.com/ironcore-dev/network-operator/internal/conditions" "github.com/ironcore-dev/network-operator/internal/deviceutil" "github.com/ironcore-dev/network-operator/internal/provider" + "github.com/ironcore-dev/network-operator/internal/resourcelock" ) // NTPReconciler reconciles a NTP object @@ -39,6 +42,9 @@ type NTPReconciler struct { // Provider is the driver that will be used to create & delete the ntp. Provider provider.ProviderFunc + + // Locker is used to synchronize operations on resources targeting the same device. 
+ Locker *resourcelock.ResourceLocker } // +kubebuilder:rbac:groups=networking.metal.ironcore.dev,resources=ntp,verbs=get;list;watch;create;update;patch;delete @@ -89,6 +95,21 @@ func (r *NTPReconciler) Reconcile(ctx context.Context, req ctrl.Request) (_ ctrl return ctrl.Result{}, err } + if err := r.Locker.AcquireLock(ctx, device.Name, "ntp-controller"); err != nil { + if errors.Is(err, resourcelock.ErrLockAlreadyHeld) { + log.Info("Device is already locked, requeuing reconciliation") + return ctrl.Result{RequeueAfter: time.Second * 5}, nil + } + log.Error(err, "Failed to acquire device lock") + return ctrl.Result{}, err + } + defer func() { + if err := r.Locker.ReleaseLock(ctx, device.Name, "ntp-controller"); err != nil { + log.Error(err, "Failed to release device lock") + reterr = kerrors.NewAggregate([]error{reterr, err}) + } + }() + conn, err := deviceutil.GetDeviceConnection(ctx, r, device) if err != nil { return ctrl.Result{}, err diff --git a/internal/controller/core/nve_controller.go b/internal/controller/core/nve_controller.go index 643b48cf..ff5011fe 100644 --- a/internal/controller/core/nve_controller.go +++ b/internal/controller/core/nve_controller.go @@ -34,6 +34,7 @@ import ( "github.com/ironcore-dev/network-operator/internal/conditions" "github.com/ironcore-dev/network-operator/internal/deviceutil" "github.com/ironcore-dev/network-operator/internal/provider" + "github.com/ironcore-dev/network-operator/internal/resourcelock" ) // NetworkVirtualizationEdgeReconciler reconciles a NVE object @@ -51,6 +52,9 @@ type NetworkVirtualizationEdgeReconciler struct { // Provider is the driver that will be used to create & delete the dns. Provider provider.ProviderFunc + // Locker is used to synchronize operations on resources targeting the same device. + Locker *resourcelock.ResourceLocker + // RequeueInterval is the duration after which the controller should requeue the reconciliation, // regardless of changes. RequeueInterval time.Duration @@ -99,6 +103,21 @@ func (r *NetworkVirtualizationEdgeReconciler) Reconcile(ctx context.Context, req return ctrl.Result{}, err } + if err := r.Locker.AcquireLock(ctx, device.Name, "nve-controller"); err != nil { + if errors.Is(err, resourcelock.ErrLockAlreadyHeld) { + log.Info("Device is already locked, requeuing reconciliation") + return ctrl.Result{RequeueAfter: time.Second * 5}, nil + } + log.Error(err, "Failed to acquire device lock") + return ctrl.Result{}, err + } + defer func() { + if err := r.Locker.ReleaseLock(ctx, device.Name, "nve-controller"); err != nil { + log.Error(err, "Failed to release device lock") + reterr = kerrors.NewAggregate([]error{reterr, err}) + } + }() + conn, err := deviceutil.GetDeviceConnection(ctx, r, device) if err != nil { return ctrl.Result{}, err diff --git a/internal/controller/core/nve_controller_test.go b/internal/controller/core/nve_controller_test.go index 6eddde8e..6a708b5b 100644 --- a/internal/controller/core/nve_controller_test.go +++ b/internal/controller/core/nve_controller_test.go @@ -10,19 +10,14 @@ import ( . 
"github.com/onsi/gomega" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "github.com/ironcore-dev/network-operator/api/core/v1alpha1" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -const ( - defaultTimeout = 5 * time.Second - defaultPoll = 150 * time.Millisecond - testEndpointAddr = "192.168.10.2:9339" -) +const testEndpointAddr = "192.168.10.2:9339" // Helpers func ensureDevice(deviceKey client.ObjectKey, spec v1alpha1.DeviceSpec) { @@ -86,7 +81,7 @@ func cleanupNVEResources(nveKeys, interfaceKeys, deviceKeys []client.ObjectKey) Expect(k8sClient.Delete(ctx, nve)).To(Succeed()) Eventually(func() bool { return errors.IsNotFound(k8sClient.Get(ctx, nveKey, &v1alpha1.NetworkVirtualizationEdge{})) - }, defaultTimeout, defaultPoll).Should(BeTrue(), "NVE should be fully deleted") + }).Should(BeTrue(), "NVE should be fully deleted") } for _, ifKey := range interfaceKeys { @@ -95,7 +90,7 @@ func cleanupNVEResources(nveKeys, interfaceKeys, deviceKeys []client.ObjectKey) Expect(k8sClient.Delete(ctx, ifObj)).To(Succeed()) Eventually(func() bool { return errors.IsNotFound(k8sClient.Get(ctx, ifKey, &v1alpha1.Interface{})) - }, defaultTimeout, defaultPoll).Should(BeTrue(), "Interface should be fully deleted") + }).Should(BeTrue(), "Interface should be fully deleted") } for _, deviceKey := range deviceKeys { @@ -104,12 +99,12 @@ func cleanupNVEResources(nveKeys, interfaceKeys, deviceKeys []client.ObjectKey) Expect(k8sClient.Delete(ctx, device)).To(Succeed()) Eventually(func() bool { return errors.IsNotFound(k8sClient.Get(ctx, deviceKey, &v1alpha1.Device{})) - }, defaultTimeout, defaultPoll).Should(BeTrue(), "Device should be fully deleted") + }).Should(BeTrue(), "Device should be fully deleted") } By("Ensuring the resource is deleted from the provider") Eventually(func(g Gomega) { g.Expect(testProvider.NVE).To(BeNil(), "Provider NVE should be empty") - }, defaultTimeout, defaultPoll).Should(Succeed()) + }).Should(Succeed()) } var _ = Describe("NVE Controller", func() { @@ -119,9 +114,7 @@ var _ = Describe("NVE Controller", func() { nveName = "test-nve-nve" nsName = metav1.NamespaceDefault ) - var ( - nve *v1alpha1.NetworkVirtualizationEdge - ) + var nve *v1alpha1.NetworkVirtualizationEdge interfaceNames := []string{"lo0", "lo1"} nveKey := client.ObjectKey{Name: nveName, Namespace: nsName} @@ -227,7 +220,6 @@ var _ = Describe("NVE Controller", func() { g.Expect(resource.Status.Conditions[2].Status).To(Equal(metav1.ConditionTrue)) }).Should(Succeed()) }) - }) Context("When updating referenced resources", func() { @@ -236,9 +228,7 @@ var _ = Describe("NVE Controller", func() { nveName = "test-nvewithrefupdates-nve" nsName = metav1.NamespaceDefault ) - var ( - nve *v1alpha1.NetworkVirtualizationEdge - ) + var nve *v1alpha1.NetworkVirtualizationEdge interfaceNames := []string{"lo10", "lo11", "lo12"} deviceKey := client.ObjectKey{Name: deviceName, Namespace: nsName} @@ -295,7 +285,7 @@ var _ = Describe("NVE Controller", func() { g.Expect(testProvider.NVE).ToNot(BeNil()) g.Expect(testProvider.NVE.Spec.SourceInterfaceRef.Name).To(Equal(interfaceNames[1])) g.Expect(testProvider.NVE.Status.SourceInterfaceName).To(Equal(interfaceNames[1])) - }, defaultTimeout, defaultPoll).Should(Succeed()) + }).Should(Succeed()) }) It("Should reconcile when AnycastSourceInterfaceRef is added", func() { @@ -352,7 +342,7 @@ var _ = Describe("NVE 
Controller", func() { g.Expect(cond).NotTo(BeNil()) g.Expect(cond.Status).To(Equal(metav1.ConditionFalse)) g.Expect(cond.Reason).To(Equal(v1alpha1.WaitingForDependenciesReason)) - }, defaultTimeout, defaultPoll).Should(Succeed()) + }).Should(Succeed()) }) }) @@ -391,7 +381,7 @@ var _ = Describe("NVE Controller", func() { Eventually(func(g Gomega) { g.Expect(testProvider.NVE).NotTo(BeNil()) g.Expect(testProvider.NVE.Spec.AnycastSourceInterfaceRef).To(BeNil()) - }, defaultTimeout, defaultPoll).Should(Succeed()) + }).Should(Succeed()) Eventually(func(g Gomega) { cur := &v1alpha1.NetworkVirtualizationEdge{} @@ -400,7 +390,7 @@ var _ = Describe("NVE Controller", func() { cfg := meta.FindStatusCondition(cur.Status.Conditions, v1alpha1.ConfiguredCondition) g.Expect(cfg).NotTo(BeNil()) g.Expect(cfg.Status).To(Equal(metav1.ConditionTrue)) - }, defaultTimeout, defaultPoll).Should(Succeed()) + }).Should(Succeed()) }) }) @@ -454,7 +444,7 @@ var _ = Describe("NVE Controller", func() { g.Expect(cond).NotTo(BeNil()) g.Expect(cond.Status).To(Equal(metav1.ConditionFalse)) g.Expect(cond.Reason).To(Equal(v1alpha1.NVEAlreadyExistsReason)) - }, defaultTimeout, defaultPoll).Should(Succeed()) + }).Should(Succeed()) }) }) @@ -464,9 +454,7 @@ var _ = Describe("NVE Controller", func() { nveName = "test-nvemisconfigurediftype-nve" nsName = metav1.NamespaceDefault ) - var ( - nve *v1alpha1.NetworkVirtualizationEdge - ) + var nve *v1alpha1.NetworkVirtualizationEdge interfaceNames := []string{"eth1", "eth2"} @@ -661,7 +649,7 @@ var _ = Describe("NVE Controller", func() { g.Expect(cond).NotTo(BeNil()) g.Expect(cond.Status).To(Equal(metav1.ConditionFalse)) g.Expect(cond.Reason).To(Equal(v1alpha1.IncompatibleProviderConfigRef)) - }, defaultTimeout, defaultPoll).Should(Succeed()) + }).Should(Succeed()) }) }) }) diff --git a/internal/controller/core/ospf_controller.go b/internal/controller/core/ospf_controller.go index fa000cd3..c321b97a 100644 --- a/internal/controller/core/ospf_controller.go +++ b/internal/controller/core/ospf_controller.go @@ -29,6 +29,7 @@ import ( "github.com/ironcore-dev/network-operator/internal/conditions" "github.com/ironcore-dev/network-operator/internal/deviceutil" "github.com/ironcore-dev/network-operator/internal/provider" + "github.com/ironcore-dev/network-operator/internal/resourcelock" ) // OSPFReconciler reconciles a OSPF object @@ -46,6 +47,9 @@ type OSPFReconciler struct { // Provider is the driver that will be used to create & delete the ospf. Provider provider.ProviderFunc + // Locker is used to synchronize operations on resources targeting the same device. + Locker *resourcelock.ResourceLocker + // RequeueInterval is the duration after which the controller should requeue the reconciliation, // regardless of changes. 
RequeueInterval time.Duration @@ -99,6 +103,21 @@ func (r *OSPFReconciler) Reconcile(ctx context.Context, req ctrl.Request) (_ ctr return ctrl.Result{}, err } + if err := r.Locker.AcquireLock(ctx, device.Name, "ospf-controller"); err != nil { + if errors.Is(err, resourcelock.ErrLockAlreadyHeld) { + log.Info("Device is already locked, requeuing reconciliation") + return ctrl.Result{RequeueAfter: time.Second * 5}, nil + } + log.Error(err, "Failed to acquire device lock") + return ctrl.Result{}, err + } + defer func() { + if err := r.Locker.ReleaseLock(ctx, device.Name, "ospf-controller"); err != nil { + log.Error(err, "Failed to release device lock") + reterr = kerrors.NewAggregate([]error{reterr, err}) + } + }() + conn, err := deviceutil.GetDeviceConnection(ctx, r, device) if err != nil { return ctrl.Result{}, err diff --git a/internal/controller/core/pim_controller.go b/internal/controller/core/pim_controller.go index 2d777b60..380962e2 100644 --- a/internal/controller/core/pim_controller.go +++ b/internal/controller/core/pim_controller.go @@ -25,6 +25,7 @@ import ( "github.com/ironcore-dev/network-operator/internal/conditions" "github.com/ironcore-dev/network-operator/internal/deviceutil" "github.com/ironcore-dev/network-operator/internal/provider" + "github.com/ironcore-dev/network-operator/internal/resourcelock" ) // PIMReconciler reconciles a PIM object @@ -42,6 +43,9 @@ type PIMReconciler struct { // Provider is the driver that will be used to create & delete the pim. Provider provider.ProviderFunc + // Locker is used to synchronize operations on resources targeting the same device. + Locker *resourcelock.ResourceLocker + // RequeueInterval is the duration after which the controller should requeue the reconciliation, // regardless of changes. RequeueInterval time.Duration @@ -95,6 +99,21 @@ func (r *PIMReconciler) Reconcile(ctx context.Context, req ctrl.Request) (_ ctrl return ctrl.Result{}, err } + if err := r.Locker.AcquireLock(ctx, device.Name, "pim-controller"); err != nil { + if errors.Is(err, resourcelock.ErrLockAlreadyHeld) { + log.Info("Device is already locked, requeuing reconciliation") + return ctrl.Result{RequeueAfter: time.Second * 5}, nil + } + log.Error(err, "Failed to acquire device lock") + return ctrl.Result{}, err + } + defer func() { + if err := r.Locker.ReleaseLock(ctx, device.Name, "pim-controller"); err != nil { + log.Error(err, "Failed to release device lock") + reterr = kerrors.NewAggregate([]error{reterr, err}) + } + }() + conn, err := deviceutil.GetDeviceConnection(ctx, r, device) if err != nil { return ctrl.Result{}, err diff --git a/internal/controller/core/prefixset_controller.go b/internal/controller/core/prefixset_controller.go index 75b182c2..7d4853ed 100644 --- a/internal/controller/core/prefixset_controller.go +++ b/internal/controller/core/prefixset_controller.go @@ -5,7 +5,9 @@ package core import ( "context" + "errors" "fmt" + "time" "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -23,6 +25,7 @@ import ( "github.com/ironcore-dev/network-operator/internal/conditions" "github.com/ironcore-dev/network-operator/internal/deviceutil" "github.com/ironcore-dev/network-operator/internal/provider" + "github.com/ironcore-dev/network-operator/internal/resourcelock" ) // PrefixSetReconciler reconciles a PrefixSet object @@ -39,6 +42,9 @@ type PrefixSetReconciler struct { // Provider is the driver that will be used to create & delete the prefixset. 
Provider provider.ProviderFunc + + // Locker is used to synchronize operations on resources targeting the same device. + Locker *resourcelock.ResourceLocker } // +kubebuilder:rbac:groups=networking.metal.ironcore.dev,resources=prefixsets,verbs=get;list;watch;create;update;patch;delete @@ -89,6 +95,21 @@ func (r *PrefixSetReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( return ctrl.Result{}, err } + if err := r.Locker.AcquireLock(ctx, device.Name, "prefixset-controller"); err != nil { + if errors.Is(err, resourcelock.ErrLockAlreadyHeld) { + log.Info("Device is already locked, requeuing reconciliation") + return ctrl.Result{RequeueAfter: time.Second * 5}, nil + } + log.Error(err, "Failed to acquire device lock") + return ctrl.Result{}, err + } + defer func() { + if err := r.Locker.ReleaseLock(ctx, device.Name, "prefixset-controller"); err != nil { + log.Error(err, "Failed to release device lock") + reterr = kerrors.NewAggregate([]error{reterr, err}) + } + }() + conn, err := deviceutil.GetDeviceConnection(ctx, r, device) if err != nil { return ctrl.Result{}, err diff --git a/internal/controller/core/routingpolicy_controller.go b/internal/controller/core/routingpolicy_controller.go index 02d335f0..661a1db9 100644 --- a/internal/controller/core/routingpolicy_controller.go +++ b/internal/controller/core/routingpolicy_controller.go @@ -5,7 +5,9 @@ package core import ( "context" + "errors" "fmt" + "time" "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -28,6 +30,7 @@ import ( "github.com/ironcore-dev/network-operator/internal/conditions" "github.com/ironcore-dev/network-operator/internal/deviceutil" "github.com/ironcore-dev/network-operator/internal/provider" + "github.com/ironcore-dev/network-operator/internal/resourcelock" ) // RoutingPolicyReconciler reconciles a RoutingPolicy object @@ -44,6 +47,9 @@ type RoutingPolicyReconciler struct { // Provider is the driver that will be used to create & delete the routingpolicy. Provider provider.ProviderFunc + + // Locker is used to synchronize operations on resources targeting the same device. 
+	Locker *resourcelock.ResourceLocker
 }
 
 // +kubebuilder:rbac:groups=networking.metal.ironcore.dev,resources=routingpolicies,verbs=get;list;watch;create;update;patch;delete
@@ -94,6 +100,21 @@ func (r *RoutingPolicyReconciler) Reconcile(ctx context.Context, req ctrl.Reques
 		return ctrl.Result{}, err
 	}
 
+	if err := r.Locker.AcquireLock(ctx, device.Name, "routingpolicy-controller"); err != nil {
+		if errors.Is(err, resourcelock.ErrLockAlreadyHeld) {
+			log.Info("Device is already locked, requeuing reconciliation")
+			return ctrl.Result{RequeueAfter: time.Second * 5}, nil
+		}
+		log.Error(err, "Failed to acquire device lock")
+		return ctrl.Result{}, err
+	}
+	defer func() {
+		if err := r.Locker.ReleaseLock(ctx, device.Name, "routingpolicy-controller"); err != nil {
+			log.Error(err, "Failed to release device lock")
+			reterr = kerrors.NewAggregate([]error{reterr, err})
+		}
+	}()
+
 	conn, err := deviceutil.GetDeviceConnection(ctx, r, device)
 	if err != nil {
 		return ctrl.Result{}, err
diff --git a/internal/controller/core/routingpolicy_controller_test.go b/internal/controller/core/routingpolicy_controller_test.go
index dc8cbd37..e138bbb7 100644
--- a/internal/controller/core/routingpolicy_controller_test.go
+++ b/internal/controller/core/routingpolicy_controller_test.go
@@ -210,7 +210,7 @@ var _ = Describe("RoutingPolicy Controller", func() {
 					Sequence: 10,
 					Conditions: &v1alpha1.PolicyConditions{
 						MatchPrefixSet: &v1alpha1.PrefixSetMatchCondition{
-							PrefixSetRef: v1alpha1.LocalObjectReference{Name: name},
+							PrefixSetRef: v1alpha1.LocalObjectReference{Name: "non-existing-prefixset"},
 						},
 					},
 					Actions: v1alpha1.PolicyActions{
diff --git a/internal/controller/core/snmp_controller.go b/internal/controller/core/snmp_controller.go
index 1832a04e..edf2dc40 100644
--- a/internal/controller/core/snmp_controller.go
+++ b/internal/controller/core/snmp_controller.go
@@ -5,7 +5,9 @@ package core
 
 import (
 	"context"
+	"errors"
 	"fmt"
+	"time"
 
 	"k8s.io/apimachinery/pkg/api/equality"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -23,6 +25,7 @@ import (
 	"github.com/ironcore-dev/network-operator/internal/conditions"
 	"github.com/ironcore-dev/network-operator/internal/deviceutil"
 	"github.com/ironcore-dev/network-operator/internal/provider"
+	"github.com/ironcore-dev/network-operator/internal/resourcelock"
 )
 
 // SNMPReconciler reconciles a snmp object
@@ -39,6 +42,9 @@ type SNMPReconciler struct {
 
 	// Provider is the driver that will be used to create & delete the snmp.
 	Provider provider.ProviderFunc
+
+	// Locker is used to synchronize operations on resources targeting the same device.
+	Locker *resourcelock.ResourceLocker
 }
 
 // +kubebuilder:rbac:groups=networking.metal.ironcore.dev,resources=snmp,verbs=get;list;watch;create;update;patch;delete
@@ -89,6 +95,21 @@ func (r *SNMPReconciler) Reconcile(ctx context.Context, req ctrl.Request) (_ ctr
 		return ctrl.Result{}, err
 	}
 
+	if err := r.Locker.AcquireLock(ctx, device.Name, "snmp-controller"); err != nil {
+		if errors.Is(err, resourcelock.ErrLockAlreadyHeld) {
+			log.Info("Device is already locked, requeuing reconciliation")
+			return ctrl.Result{RequeueAfter: time.Second * 5}, nil
+		}
+		log.Error(err, "Failed to acquire device lock")
+		return ctrl.Result{}, err
+	}
+	defer func() {
+		if err := r.Locker.ReleaseLock(ctx, device.Name, "snmp-controller"); err != nil {
+			log.Error(err, "Failed to release device lock")
+			reterr = kerrors.NewAggregate([]error{reterr, err})
+		}
+	}()
+
 	conn, err := deviceutil.GetDeviceConnection(ctx, r, device)
 	if err != nil {
 		return ctrl.Result{}, err
diff --git a/internal/controller/core/suite_test.go b/internal/controller/core/suite_test.go
index 5003e65c..b4723cb6 100644
--- a/internal/controller/core/suite_test.go
+++ b/internal/controller/core/suite_test.go
@@ -16,6 +16,7 @@ import (
 	. "github.com/onsi/ginkgo/v2"
 	. "github.com/onsi/gomega"
+	coordinationv1 "k8s.io/api/coordination/v1"
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/sets"
@@ -30,6 +31,7 @@ import (
 	"github.com/ironcore-dev/network-operator/api/core/v1alpha1"
 	"github.com/ironcore-dev/network-operator/internal/deviceutil"
 	"github.com/ironcore-dev/network-operator/internal/provider"
+	"github.com/ironcore-dev/network-operator/internal/resourcelock"
 	// +kubebuilder:scaffold:imports
 )
 
@@ -43,6 +45,7 @@ var (
 	k8sClient    client.Client
 	k8sManager   ctrl.Manager
 	testProvider = NewProvider()
+	testLocker   *resourcelock.ResourceLocker
 )
 
 func TestControllers(t *testing.T) {
@@ -53,6 +56,9 @@ var _ = BeforeSuite(func() {
 
 	logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true)))
 
+	SetDefaultEventuallyTimeout(time.Minute)
+	SetDefaultEventuallyPollingInterval(time.Second)
+
 	ctx, cancel = context.WithCancel(ctrl.SetupSignalHandler())
 
 	err := corev1.AddToScheme(scheme.Scheme)
@@ -95,6 +101,16 @@ var _ = BeforeSuite(func() {
 	Expect(err).NotTo(HaveOccurred())
 	Expect(k8sClient).NotTo(BeNil())
 
+	testLocker, err = resourcelock.NewResourceLocker(k8sManager.GetClient(), metav1.NamespaceDefault, 15*time.Second, 10*time.Second)
+	Expect(err).NotTo(HaveOccurred())
+
+	err = k8sManager.Add(testLocker)
+	Expect(err).NotTo(HaveOccurred())
+
+	// Set up cache informer for Lease resources used by ResourceLocker
+	_, err = k8sManager.GetCache().GetInformer(ctx, &coordinationv1.Lease{})
+	Expect(err).NotTo(HaveOccurred())
+
 	prov := func() provider.Provider { return testProvider }
 
 	err = (&DeviceReconciler{
@@ -111,6 +127,7 @@ var _ = BeforeSuite(func() {
 		Scheme:          k8sManager.GetScheme(),
 		Recorder:        recorder,
 		Provider:        prov,
+		Locker:          testLocker,
 		RequeueInterval: time.Second,
 	}).SetupWithManager(ctx, k8sManager)
 	Expect(err).NotTo(HaveOccurred())
@@ -120,6 +137,7 @@ var _ = BeforeSuite(func() {
 		Scheme:   k8sManager.GetScheme(),
 		Recorder: recorder,
 		Provider: prov,
+		Locker:   testLocker,
 	}).SetupWithManager(k8sManager)
 	Expect(err).NotTo(HaveOccurred())
 
@@ -128,6 +146,7 @@ var _ = BeforeSuite(func() {
 		Scheme:   k8sManager.GetScheme(),
 		Recorder: recorder,
 		Provider: prov,
+		Locker:   testLocker,
 	}).SetupWithManager(k8sManager)
 	Expect(err).NotTo(HaveOccurred())
 
@@ -136,6 +155,7 @@ var _ = BeforeSuite(func() {
 		Scheme:   k8sManager.GetScheme(),
 		Recorder: recorder,
 		Provider: prov,
+		Locker:   testLocker,
 	}).SetupWithManager(k8sManager)
 	Expect(err).NotTo(HaveOccurred())
 
@@ -144,6 +164,7 @@ var _ = BeforeSuite(func() {
 		Scheme:   k8sManager.GetScheme(),
 		Recorder: recorder,
 		Provider: prov,
+		Locker:   testLocker,
 	}).SetupWithManager(k8sManager)
 	Expect(err).NotTo(HaveOccurred())
 
@@ -152,6 +173,7 @@ var _ = BeforeSuite(func() {
 		Scheme:   k8sManager.GetScheme(),
 		Recorder: recorder,
 		Provider: prov,
+		Locker:   testLocker,
 	}).SetupWithManager(k8sManager)
 	Expect(err).NotTo(HaveOccurred())
 
@@ -160,6 +182,7 @@ var _ = BeforeSuite(func() {
 		Scheme:   k8sManager.GetScheme(),
 		Recorder: recorder,
 		Provider: prov,
+		Locker:   testLocker,
 	}).SetupWithManager(k8sManager)
 	Expect(err).NotTo(HaveOccurred())
 
@@ -168,6 +191,7 @@ var _ = BeforeSuite(func() {
 		Scheme:   k8sManager.GetScheme(),
 		Recorder: recorder,
 		Provider: prov,
+		Locker:   testLocker,
 	}).SetupWithManager(k8sManager)
 	Expect(err).NotTo(HaveOccurred())
 
@@ -176,6 +200,7 @@ var _ = BeforeSuite(func() {
 		Scheme:   k8sManager.GetScheme(),
 		Recorder: recorder,
 		Provider: prov,
+		Locker:   testLocker,
 	}).SetupWithManager(k8sManager)
 	Expect(err).NotTo(HaveOccurred())
 
@@ -184,6 +209,7 @@ var _ = BeforeSuite(func() {
 		Scheme:   k8sManager.GetScheme(),
 		Recorder: recorder,
 		Provider: prov,
+		Locker:   testLocker,
 	}).SetupWithManager(k8sManager)
 	Expect(err).NotTo(HaveOccurred())
 
@@ -192,6 +218,7 @@ var _ = BeforeSuite(func() {
 		Scheme:          k8sManager.GetScheme(),
 		Recorder:        recorder,
 		Provider:        prov,
+		Locker:          testLocker,
 		RequeueInterval: time.Second,
 	}).SetupWithManager(k8sManager)
 	Expect(err).NotTo(HaveOccurred())
@@ -201,6 +228,7 @@ var _ = BeforeSuite(func() {
 		Scheme:          k8sManager.GetScheme(),
 		Recorder:        recorder,
 		Provider:        prov,
+		Locker:          testLocker,
 		RequeueInterval: time.Second,
 	}).SetupWithManager(k8sManager)
 	Expect(err).NotTo(HaveOccurred())
@@ -210,6 +238,7 @@ var _ = BeforeSuite(func() {
 		Scheme:          k8sManager.GetScheme(),
 		Recorder:        recorder,
 		Provider:        prov,
+		Locker:          testLocker,
 		RequeueInterval: time.Second,
 	}).SetupWithManager(k8sManager)
 	Expect(err).NotTo(HaveOccurred())
@@ -219,6 +248,7 @@ var _ = BeforeSuite(func() {
 		Scheme:          k8sManager.GetScheme(),
 		Recorder:        recorder,
 		Provider:        prov,
+		Locker:          testLocker,
 		RequeueInterval: time.Second,
 	}).SetupWithManager(k8sManager)
 	Expect(err).NotTo(HaveOccurred())
@@ -228,6 +258,7 @@ var _ = BeforeSuite(func() {
 		Scheme:          k8sManager.GetScheme(),
 		Recorder:        recorder,
 		Provider:        prov,
+		Locker:          testLocker,
 		RequeueInterval: time.Second,
 	}).SetupWithManager(k8sManager)
 	Expect(err).NotTo(HaveOccurred())
@@ -237,6 +268,7 @@ var _ = BeforeSuite(func() {
 		Scheme:          k8sManager.GetScheme(),
 		Recorder:        recorder,
 		Provider:        prov,
+		Locker:          testLocker,
 		RequeueInterval: time.Second,
 	}).SetupWithManager(k8sManager)
 	Expect(err).NotTo(HaveOccurred())
@@ -246,6 +278,7 @@ var _ = BeforeSuite(func() {
 		Scheme:          k8sManager.GetScheme(),
 		Recorder:        recorder,
 		Provider:        prov,
+		Locker:          testLocker,
 		RequeueInterval: time.Second,
 	}).SetupWithManager(k8sManager)
 	Expect(err).NotTo(HaveOccurred())
@@ -255,6 +288,7 @@ var _ = BeforeSuite(func() {
 		Scheme:   k8sManager.GetScheme(),
 		Recorder: recorder,
 		Provider: prov,
+		Locker:   testLocker,
 	}).SetupWithManager(ctx, k8sManager)
 	Expect(err).NotTo(HaveOccurred())
 
@@ -263,6 +297,7 @@ var _ = BeforeSuite(func() {
 		Scheme:          k8sManager.GetScheme(),
 		Recorder:        recorder,
 		Provider:        prov,
+		Locker:          testLocker,
 		RequeueInterval: time.Second,
 	}).SetupWithManager(ctx, k8sManager)
 	Expect(err).NotTo(HaveOccurred())
@@ -272,6 +307,7 @@ var _ = BeforeSuite(func() {
 		Scheme:   k8sManager.GetScheme(),
 		Recorder: recorder,
 		Provider: prov,
+		Locker:   testLocker,
 	}).SetupWithManager(k8sManager)
 	Expect(err).NotTo(HaveOccurred())
 
@@ -280,6 +316,7 @@ var _ = BeforeSuite(func() {
 		Scheme:   k8sManager.GetScheme(),
 		Recorder: recorder,
 		Provider: prov,
+		Locker:   testLocker,
 	}).SetupWithManager(ctx, k8sManager)
 	Expect(err).NotTo(HaveOccurred())
diff --git a/internal/controller/core/syslog_controller.go b/internal/controller/core/syslog_controller.go
index d0d112ac..bb12b709 100644
--- a/internal/controller/core/syslog_controller.go
+++ b/internal/controller/core/syslog_controller.go
@@ -5,7 +5,9 @@ package core
 
 import (
 	"context"
+	"errors"
 	"fmt"
+	"time"
 
 	"k8s.io/apimachinery/pkg/api/equality"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -23,6 +25,7 @@ import (
 	"github.com/ironcore-dev/network-operator/internal/conditions"
 	"github.com/ironcore-dev/network-operator/internal/deviceutil"
 	"github.com/ironcore-dev/network-operator/internal/provider"
+	"github.com/ironcore-dev/network-operator/internal/resourcelock"
 )
 
 // SyslogReconciler reconciles a Syslog object
@@ -39,6 +42,9 @@ type SyslogReconciler struct {
 
 	// Provider is the driver that will be used to create & delete the syslog.
 	Provider provider.ProviderFunc
+
+	// Locker is used to synchronize operations on resources targeting the same device.
+	Locker *resourcelock.ResourceLocker
 }
 
 // +kubebuilder:rbac:groups=networking.metal.ironcore.dev,resources=syslogs,verbs=get;list;watch;create;update;patch;delete
@@ -89,6 +95,21 @@ func (r *SyslogReconciler) Reconcile(ctx context.Context, req ctrl.Request) (_ c
 		return ctrl.Result{}, err
 	}
 
+	if err := r.Locker.AcquireLock(ctx, device.Name, "syslog-controller"); err != nil {
+		if errors.Is(err, resourcelock.ErrLockAlreadyHeld) {
+			log.Info("Device is already locked, requeuing reconciliation")
+			return ctrl.Result{RequeueAfter: time.Second * 5}, nil
+		}
+		log.Error(err, "Failed to acquire device lock")
+		return ctrl.Result{}, err
+	}
+	defer func() {
+		if err := r.Locker.ReleaseLock(ctx, device.Name, "syslog-controller"); err != nil {
+			log.Error(err, "Failed to release device lock")
+			reterr = kerrors.NewAggregate([]error{reterr, err})
+		}
+	}()
+
 	conn, err := deviceutil.GetDeviceConnection(ctx, r, device)
 	if err != nil {
 		return ctrl.Result{}, err
diff --git a/internal/controller/core/user_controller.go b/internal/controller/core/user_controller.go
index e7738da2..6b1c33f5 100644
--- a/internal/controller/core/user_controller.go
+++ b/internal/controller/core/user_controller.go
@@ -5,7 +5,9 @@ package core
 
 import (
 	"context"
+	"errors"
 	"fmt"
+	"time"
 
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/equality"
@@ -28,6 +30,7 @@ import (
 	"github.com/ironcore-dev/network-operator/internal/conditions"
 	"github.com/ironcore-dev/network-operator/internal/deviceutil"
 	"github.com/ironcore-dev/network-operator/internal/provider"
+	"github.com/ironcore-dev/network-operator/internal/resourcelock"
 )
 
 // UserReconciler reconciles a User object
@@ -44,6 +47,9 @@ type UserReconciler struct {
 
 	// Provider is the driver that will be used to create & delete the user.
 	Provider provider.ProviderFunc
+
+	// Locker is used to synchronize operations on resources targeting the same device.
+	Locker *resourcelock.ResourceLocker
 }
 
 // +kubebuilder:rbac:groups=networking.metal.ironcore.dev,resources=users,verbs=get;list;watch;create;update;patch;delete
@@ -95,6 +101,21 @@ func (r *UserReconciler) Reconcile(ctx context.Context, req ctrl.Request) (_ ctr
 		return ctrl.Result{}, err
 	}
 
+	if err := r.Locker.AcquireLock(ctx, device.Name, "user-controller"); err != nil {
+		if errors.Is(err, resourcelock.ErrLockAlreadyHeld) {
+			log.Info("Device is already locked, requeuing reconciliation")
+			return ctrl.Result{RequeueAfter: time.Second * 5}, nil
+		}
+		log.Error(err, "Failed to acquire device lock")
+		return ctrl.Result{}, err
+	}
+	defer func() {
+		if err := r.Locker.ReleaseLock(ctx, device.Name, "user-controller"); err != nil {
+			log.Error(err, "Failed to release device lock")
+			reterr = kerrors.NewAggregate([]error{reterr, err})
+		}
+	}()
+
 	conn, err := deviceutil.GetDeviceConnection(ctx, r, device)
 	if err != nil {
 		return ctrl.Result{}, err
diff --git a/internal/controller/core/vlan_controller.go b/internal/controller/core/vlan_controller.go
index 8ca0d738..e5081ed0 100644
--- a/internal/controller/core/vlan_controller.go
+++ b/internal/controller/core/vlan_controller.go
@@ -25,6 +25,7 @@ import (
 	"github.com/ironcore-dev/network-operator/internal/conditions"
 	"github.com/ironcore-dev/network-operator/internal/deviceutil"
 	"github.com/ironcore-dev/network-operator/internal/provider"
+	"github.com/ironcore-dev/network-operator/internal/resourcelock"
 )
 
 // VLANReconciler reconciles a VLAN object
@@ -42,6 +43,9 @@ type VLANReconciler struct {
 	// Provider is the driver that will be used to create & delete the vlan.
 	Provider provider.ProviderFunc
 
+	// Locker is used to synchronize operations on resources targeting the same device.
+	Locker *resourcelock.ResourceLocker
+
 	// RequeueInterval is the duration after which the controller should requeue the reconciliation,
 	// regardless of changes.
 	RequeueInterval time.Duration
@@ -95,6 +99,21 @@ func (r *VLANReconciler) Reconcile(ctx context.Context, req ctrl.Request) (_ ctr
 		return ctrl.Result{}, err
 	}
 
+	if err := r.Locker.AcquireLock(ctx, device.Name, "vlan-controller"); err != nil {
+		if errors.Is(err, resourcelock.ErrLockAlreadyHeld) {
+			log.Info("Device is already locked, requeuing reconciliation")
+			return ctrl.Result{RequeueAfter: time.Second * 5}, nil
+		}
+		log.Error(err, "Failed to acquire device lock")
+		return ctrl.Result{}, err
+	}
+	defer func() {
+		if err := r.Locker.ReleaseLock(ctx, device.Name, "vlan-controller"); err != nil {
+			log.Error(err, "Failed to release device lock")
+			reterr = kerrors.NewAggregate([]error{reterr, err})
+		}
+	}()
+
 	conn, err := deviceutil.GetDeviceConnection(ctx, r, device)
 	if err != nil {
 		return ctrl.Result{}, err
diff --git a/internal/controller/core/vrf_controller.go b/internal/controller/core/vrf_controller.go
index 657974a3..8c2869a1 100644
--- a/internal/controller/core/vrf_controller.go
+++ b/internal/controller/core/vrf_controller.go
@@ -25,6 +25,7 @@ import (
 	"github.com/ironcore-dev/network-operator/internal/conditions"
 	"github.com/ironcore-dev/network-operator/internal/deviceutil"
 	"github.com/ironcore-dev/network-operator/internal/provider"
+	"github.com/ironcore-dev/network-operator/internal/resourcelock"
 )
 
 // VRFReconciler reconciles a VRF object
@@ -42,6 +43,9 @@ type VRFReconciler struct {
 	// Provider is the driver that will be used to create & delete the isis.
 	Provider provider.ProviderFunc
 
+	// Locker is used to synchronize operations on resources targeting the same device.
+	Locker *resourcelock.ResourceLocker
+
 	// RequeueInterval is the duration after which the controller should requeue the reconciliation,
 	// regardless of changes.
 	RequeueInterval time.Duration
@@ -97,6 +101,21 @@ func (r *VRFReconciler) Reconcile(ctx context.Context, req ctrl.Request) (_ ctrl
 		return ctrl.Result{}, err
 	}
 
+	if err := r.Locker.AcquireLock(ctx, device.Name, "vrf-controller"); err != nil {
+		if errors.Is(err, resourcelock.ErrLockAlreadyHeld) {
+			log.Info("Device is already locked, requeuing reconciliation")
+			return ctrl.Result{RequeueAfter: time.Second * 5}, nil
+		}
+		log.Error(err, "Failed to acquire device lock")
+		return ctrl.Result{}, err
+	}
+	defer func() {
+		if err := r.Locker.ReleaseLock(ctx, device.Name, "vrf-controller"); err != nil {
+			log.Error(err, "Failed to release device lock")
+			reterr = kerrors.NewAggregate([]error{reterr, err})
+		}
+	}()
+
 	conn, err := deviceutil.GetDeviceConnection(ctx, r, device)
 	if err != nil {
 		return ctrl.Result{}, err
diff --git a/internal/resourcelock/resourcelock.go b/internal/resourcelock/resourcelock.go
new file mode 100644
index 00000000..a2ad00c2
--- /dev/null
+++ b/internal/resourcelock/resourcelock.go
@@ -0,0 +1,250 @@
+// SPDX-FileCopyrightText: 2026 SAP SE or an SAP affiliate company and IronCore contributors
+// SPDX-License-Identifier: Apache-2.0
+
+// Package resourcelock provides utilities for locking Kubernetes resources using Leases.
+package resourcelock
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"sync"
+	"time"
+
+	coordinationv1 "k8s.io/api/coordination/v1"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+)
+
+// +kubebuilder:rbac:groups=coordination.k8s.io,resources=leases,verbs=get;list;watch;create;update;delete
+
+// ErrLockAlreadyHeld is returned when a lock is held by another locker.
+var ErrLockAlreadyHeld = errors.New("resourcelock: lock is held by another locker")
+
+// ResourceLocker provides methods to acquire and release locks on Kubernetes resources using Leases.
+// Locks are implemented using coordination.k8s.io/v1 Lease resources and are automatically
+// renewed in the background until the context is cancelled or ReleaseLock is called.
+type ResourceLocker struct {
+	client               client.Client
+	leaseDurationSeconds int32
+	renewPeriod          time.Duration
+	namespace            string
+	cancelFuncs          sync.Map // map[string]context.CancelFunc
+}
+
+// NewResourceLocker creates a new ResourceLocker with the given configuration.
+// The namespace specifies where Lease resources will be created.
+// The renewPeriod must be shorter than leaseDuration to ensure the lease is renewed before expiration.
+func NewResourceLocker(c client.Client, namespace string, leaseDuration, renewPeriod time.Duration) (*ResourceLocker, error) {
+	if renewPeriod >= leaseDuration {
+		return nil, fmt.Errorf("resourcelock: renewPeriod (%v) must be shorter than leaseDuration (%v)", renewPeriod, leaseDuration)
+	}
+	return &ResourceLocker{
+		client:               c,
+		leaseDurationSeconds: int32(leaseDuration.Seconds()),
+		renewPeriod:          renewPeriod,
+		namespace:            namespace,
+	}, nil
+}
+
+// AcquireLock tries to acquire a lock on the specified Kubernetes Lease.
+// It creates or updates a lease resource and starts a background goroutine to renew
+// the lease until the context is cancelled. The name parameter specifies the name of the lease
+// resource to use for locking, which identifies the resource being locked. Multiple callers using
+// the same name compete for the same lock. The lockerID parameter uniquely identifies the lock
+// holder (e.g., a reconciler or other caller) that is attempting to acquire or currently holds
+// the lock. Returns ErrLockAlreadyHeld if the lock is currently held by another locker.
+func (rl *ResourceLocker) AcquireLock(ctx context.Context, name, lockerID string) error {
+	log := ctrl.LoggerFrom(ctx).WithValues("namespace", rl.namespace, "lease", name, "locker", lockerID)
+
+	now := metav1.NewMicroTime(time.Now())
+
+	lease := &coordinationv1.Lease{}
+	if err := rl.client.Get(ctx, client.ObjectKey{Namespace: rl.namespace, Name: name}, lease); err != nil {
+		if !apierrors.IsNotFound(err) {
+			return fmt.Errorf("resourcelock: failed to get lease: %w", err)
+		}
+
+		// Lease doesn't exist, create it
+		lease = &coordinationv1.Lease{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      name,
+				Namespace: rl.namespace,
+			},
+			Spec: coordinationv1.LeaseSpec{
+				HolderIdentity:       &lockerID,
+				LeaseDurationSeconds: &rl.leaseDurationSeconds,
+				AcquireTime:          &now,
+				RenewTime:            &now,
+			},
+		}
+
+		if err := rl.client.Create(ctx, lease); err != nil {
+			if apierrors.IsAlreadyExists(err) {
+				log.V(2).Info("Lease was created by another locker")
+				return ErrLockAlreadyHeld
+			}
+			return fmt.Errorf("resourcelock: failed to create lease: %w", err)
+		}
+
+		log.Info("Lock acquired, lease created")
+		rl.startRenewal(ctx, name, lockerID)
+		return nil
+	}
+
+	// Lease exists - check if we already own it
+	if lease.Spec.HolderIdentity != nil && *lease.Spec.HolderIdentity == lockerID {
+		log.V(2).Info("Lock already held by this locker")
+		rl.startRenewal(ctx, name, lockerID)
+		return nil
+	}
+
+	// Check if lease is still valid (not expired)
+	if lease.Spec.RenewTime != nil && lease.Spec.LeaseDurationSeconds != nil {
+		expirationTime := lease.Spec.RenewTime.Add(time.Duration(*lease.Spec.LeaseDurationSeconds) * time.Second)
+		if time.Now().Before(expirationTime) {
+			log.V(2).Info("Lock held by another locker", "currentLocker", *lease.Spec.HolderIdentity)
+			return ErrLockAlreadyHeld
+		}
+	}
+
+	previousLocker := ""
+	if lease.Spec.HolderIdentity != nil {
+		previousLocker = *lease.Spec.HolderIdentity
+	}
+
+	lease.Spec.HolderIdentity = &lockerID
+	lease.Spec.LeaseDurationSeconds = &rl.leaseDurationSeconds
+	lease.Spec.AcquireTime = &now
+	lease.Spec.RenewTime = &now
+
+	if err := rl.client.Update(ctx, lease); err != nil {
+		if apierrors.IsConflict(err) {
+			log.V(2).Info("Lease was claimed by another locker")
+			return ErrLockAlreadyHeld
+		}
+		return fmt.Errorf("resourcelock: failed to update lease: %w", err)
+	}
+
+	log.Info("Lock acquired, claimed expired lease", "previousLocker", previousLocker)
+	rl.startRenewal(ctx, name, lockerID)
+	return nil
+}
+
+// ReleaseLock releases the lock on the specified Kubernetes Lease.
+// The name parameter specifies the name of the lease to release. The lockerID parameter is the
+// unique identifier of the lock holder attempting to release the lock. If the current holder
+// does not match the provided lockerID, the lease is not deleted.
+func (rl *ResourceLocker) ReleaseLock(ctx context.Context, name, lockerID string) error {
+	log := ctrl.LoggerFrom(ctx).WithValues("namespace", rl.namespace, "lease", name, "locker", lockerID)
+
+	lease := &coordinationv1.Lease{}
+	if err := rl.client.Get(ctx, client.ObjectKey{Namespace: rl.namespace, Name: name}, lease); err != nil {
+		if apierrors.IsNotFound(err) {
+			log.V(2).Info("Lease not found, nothing to release")
+			return nil
+		}
+		return fmt.Errorf("resourcelock: failed to get lease: %w", err)
+	}
+
+	if lease.Spec.HolderIdentity != nil && *lease.Spec.HolderIdentity != lockerID {
+		log.V(2).Info("Not the current locker, skipping release", "currentLocker", *lease.Spec.HolderIdentity)
+		return nil
+	}
+
+	if err := rl.client.Delete(ctx, lease); err != nil {
+		if apierrors.IsNotFound(err) {
+			log.V(2).Info("Lease already deleted")
+			return nil
+		}
+		return fmt.Errorf("resourcelock: failed to delete lease: %w", err)
+	}
+
+	if cancel, ok := rl.cancelFuncs.LoadAndDelete(name); ok {
+		cancel.(context.CancelFunc)()
+		log.V(2).Info("Stopped renewal goroutine")
+	}
+
+	log.Info("Lock released")
+	return nil
+}
+
+// startRenewal starts a background goroutine to renew the lease.
+// If a renewal goroutine is already running for this lock, it will be cancelled first.
+func (rl *ResourceLocker) startRenewal(ctx context.Context, name, lockerID string) {
+	if oldCancel, loaded := rl.cancelFuncs.Load(name); loaded {
+		oldCancel.(context.CancelFunc)()
+	}
+
+	renewCtx, cancel := context.WithCancel(ctx)
+	rl.cancelFuncs.Store(name, cancel)
+
+	go rl.renewUntilContextDone(renewCtx, name, lockerID)
+}
+
+// Start implements manager.Runnable and blocks until the context is cancelled.
+// When the context is cancelled, it calls Close to cancel all active renewal goroutines.
+func (rl *ResourceLocker) Start(ctx context.Context) error {
+	<-ctx.Done()
+	rl.Close()
+	return nil
+}
+
+// Close cancels all active renewal goroutines.
+// This should be called when the ResourceLocker is no longer needed to ensure proper cleanup.
+func (rl *ResourceLocker) Close() {
+	rl.cancelFuncs.Range(func(key, value any) bool {
+		if cancel, ok := value.(context.CancelFunc); ok {
+			cancel()
+		}
+		rl.cancelFuncs.Delete(key)
+		return true
+	})
+}
+
+// renewUntilContextDone periodically renews the lease until the context is cancelled.
+func (rl *ResourceLocker) renewUntilContextDone(ctx context.Context, name, lockerID string) {
+	log := ctrl.LoggerFrom(ctx).WithValues("namespace", rl.namespace, "lease", name, "locker", lockerID)
+
+	ticker := time.NewTicker(rl.renewPeriod)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ticker.C:
+			if err := rl.renewLease(ctx, name, lockerID); err != nil {
+				if apierrors.IsNotFound(err) {
+					log.V(2).Info("Lease not found during renewal, stopping renewal goroutine")
+					return
+				}
+				log.Error(err, "Failed to renew lease")
+			}
+		case <-ctx.Done():
+			return
+		}
+	}
+}
+
+// renewLease updates the RenewTime of the lease to extend its validity for the current holder.
+func (rl *ResourceLocker) renewLease(ctx context.Context, name, lockerID string) error {
+	lease := &coordinationv1.Lease{}
+	if err := rl.client.Get(ctx, client.ObjectKey{Namespace: rl.namespace, Name: name}, lease); err != nil {
+		return fmt.Errorf("resourcelock: failed to get lease for renewal: %w", err)
+	}
+
+	if lease.Spec.HolderIdentity == nil || *lease.Spec.HolderIdentity != lockerID {
+		return fmt.Errorf("resourcelock: no longer the holder of lease %s/%s", rl.namespace, name)
+	}
+
+	now := metav1.NewMicroTime(time.Now())
+	lease.Spec.RenewTime = &now
+
+	if err := rl.client.Update(ctx, lease); err != nil {
+		return fmt.Errorf("resourcelock: failed to update lease: %w", err)
+	}
+
+	return nil
+}
diff --git a/internal/resourcelock/resourcelock_test.go b/internal/resourcelock/resourcelock_test.go
new file mode 100644
index 00000000..7baca152
--- /dev/null
+++ b/internal/resourcelock/resourcelock_test.go
@@ -0,0 +1,475 @@
+// SPDX-FileCopyrightText: 2026 SAP SE or an SAP affiliate company and IronCore contributors
+// SPDX-License-Identifier: Apache-2.0
+
+package resourcelock
+
+import (
+	"context"
+	"errors"
+	"testing"
+	"time"
+
+	coordinationv1 "k8s.io/api/coordination/v1"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
+	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+	"k8s.io/client-go/kubernetes/scheme"
+	"sigs.k8s.io/controller-runtime/pkg/client/fake"
+)
+
+func init() {
+	utilruntime.Must(coordinationv1.AddToScheme(scheme.Scheme))
+}
+
+func TestNewResourceLocker(t *testing.T) {
+	tests := []struct {
+		name          string
+		namespace     string
+		leaseDuration time.Duration
+		renewPeriod   time.Duration
+		wantErr       bool
+	}{
+		{
+			name:          "valid configuration",
+			namespace:     metav1.NamespaceDefault,
+			leaseDuration: 15 * time.Second,
+			renewPeriod:   5 * time.Second,
+			wantErr:       false,
+		},
+		{
+			name:          "renewPeriod equal to leaseDuration",
+			namespace:     metav1.NamespaceDefault,
+			leaseDuration: 10 * time.Second,
+			renewPeriod:   10 * time.Second,
+			wantErr:       true,
+		},
+		{
+			name:          "renewPeriod greater than leaseDuration",
+			namespace:     metav1.NamespaceDefault,
+			leaseDuration: 5 * time.Second,
+			renewPeriod:   10 * time.Second,
+			wantErr:       true,
+		},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			client := fake.NewClientBuilder().WithScheme(scheme.Scheme).Build()
+			rl, err := NewResourceLocker(client, test.namespace, test.leaseDuration, test.renewPeriod)
+
+			if test.wantErr {
+				if err == nil {
+					t.Errorf("NewResourceLocker() expected error but got none")
+					return
+				}
+				return
+			}
+
+			if err != nil {
+				t.Errorf("NewResourceLocker() unexpected error = %v", err)
+				return
+			}
+			if rl == nil {
+				t.Errorf("NewResourceLocker() returned nil locker")
+				return
+			}
+			if rl.namespace != test.namespace {
+				t.Errorf("NewResourceLocker() namespace = %v, want %v", rl.namespace, test.namespace)
+			}
+			if rl.leaseDurationSeconds != int32(test.leaseDuration.Seconds()) {
+				t.Errorf("NewResourceLocker() leaseDurationSeconds = %v, want %v", rl.leaseDurationSeconds, int32(test.leaseDuration.Seconds()))
+			}
+			if rl.renewPeriod != test.renewPeriod {
+				t.Errorf("NewResourceLocker() renewPeriod = %v, want %v", rl.renewPeriod, test.renewPeriod)
+			}
+		})
+	}
+}
+
+func TestAcquireLock_CreateNew(t *testing.T) {
+	client := fake.NewClientBuilder().WithScheme(scheme.Scheme).Build()
+	rl, err := NewResourceLocker(client, metav1.NamespaceDefault, 15*time.Second, 5*time.Second)
+	if err != nil {
+		t.Fatalf("NewResourceLocker() error = %v", err)
+	}
+
+	ctx := context.Background()
+	leaseName := "test-lease"
+	lockerID := "locker-1"
+
+	if err := rl.AcquireLock(ctx, leaseName, lockerID); err != nil {
+		t.Errorf("AcquireLock() error = %v", err)
+		return
+	}
+
+	lease := &coordinationv1.Lease{}
+	key := types.NamespacedName{Namespace: metav1.NamespaceDefault, Name: leaseName}
+	if err := client.Get(ctx, key, lease); err != nil {
+		t.Fatalf("Get lease error = %v", err)
+	}
+
+	if lease.Spec.HolderIdentity == nil || *lease.Spec.HolderIdentity != lockerID {
+		t.Errorf("Lease holder = %v, want %v", lease.Spec.HolderIdentity, lockerID)
+	}
+
+	if lease.Spec.LeaseDurationSeconds == nil || *lease.Spec.LeaseDurationSeconds != 15 {
+		t.Errorf("Lease duration = %v, want %v", lease.Spec.LeaseDurationSeconds, 15)
+	}
+}
+
+func TestAcquireLock_AlreadyOwned(t *testing.T) {
+	lockerID := "locker-1"
+	leaseName := "test-lease"
+
+	now := metav1.NewMicroTime(time.Now())
+	leaseDuration := int32(15)
+	lease := &coordinationv1.Lease{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      leaseName,
+			Namespace: metav1.NamespaceDefault,
+		},
+		Spec: coordinationv1.LeaseSpec{
+			HolderIdentity:       &lockerID,
+			LeaseDurationSeconds: &leaseDuration,
+			AcquireTime:          &now,
+			RenewTime:            &now,
+		},
+	}
+
+	client := fake.NewClientBuilder().
+		WithScheme(scheme.Scheme).
+		WithObjects(lease).
+		Build()
+
+	rl, err := NewResourceLocker(client, metav1.NamespaceDefault, 15*time.Second, 5*time.Second)
+	if err != nil {
+		t.Fatalf("NewResourceLocker() error = %v", err)
+	}
+
+	ctx := context.Background()
+	if err := rl.AcquireLock(ctx, leaseName, lockerID); err != nil {
+		t.Errorf("AcquireLock() error = %v, expected success when already owned", err)
+	}
+}
+
+func TestAcquireLock_HeldByAnother(t *testing.T) {
+	lockerID1 := "locker-1"
+	lockerID2 := "locker-2"
+	leaseName := "test-lease"
+
+	now := metav1.NewMicroTime(time.Now())
+	leaseDuration := int32(15)
+	lease := &coordinationv1.Lease{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      leaseName,
+			Namespace: metav1.NamespaceDefault,
+		},
+		Spec: coordinationv1.LeaseSpec{
+			HolderIdentity:       &lockerID1,
+			LeaseDurationSeconds: &leaseDuration,
+			AcquireTime:          &now,
+			RenewTime:            &now,
+		},
+	}
+
+	client := fake.NewClientBuilder().
+		WithScheme(scheme.Scheme).
+		WithObjects(lease).
+		Build()
+
+	rl, err := NewResourceLocker(client, metav1.NamespaceDefault, 15*time.Second, 5*time.Second)
+	if err != nil {
+		t.Fatalf("NewResourceLocker() error = %v", err)
+	}
+
+	ctx := context.Background()
+	if err := rl.AcquireLock(ctx, leaseName, lockerID2); !errors.Is(err, ErrLockAlreadyHeld) {
+		t.Errorf("AcquireLock() error = %v, want %v", err, ErrLockAlreadyHeld)
+	}
+}
+
+func TestAcquireLock_ClaimExpired(t *testing.T) {
+	lockerID1 := "locker-1"
+	lockerID2 := "locker-2"
+	leaseName := "test-lease"
+
+	// Create an expired lease (renewed 20 seconds ago with 15 second duration)
+	expiredTime := metav1.NewMicroTime(time.Now().Add(-20 * time.Second))
+	leaseDuration := int32(15)
+	existingLease := &coordinationv1.Lease{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      leaseName,
+			Namespace: metav1.NamespaceDefault,
+		},
+		Spec: coordinationv1.LeaseSpec{
+			HolderIdentity:       &lockerID1,
+			LeaseDurationSeconds: &leaseDuration,
+			AcquireTime:          &expiredTime,
+			RenewTime:            &expiredTime,
+		},
+	}
+
+	client := fake.NewClientBuilder().
+		WithScheme(scheme.Scheme).
+		WithObjects(existingLease).
+		Build()
+
+	rl, err := NewResourceLocker(client, metav1.NamespaceDefault, 15*time.Second, 5*time.Second)
+	if err != nil {
+		t.Fatalf("NewResourceLocker() error = %v", err)
+	}
+
+	ctx := context.Background()
+	if err := rl.AcquireLock(ctx, leaseName, lockerID2); err != nil {
+		t.Errorf("AcquireLock() error = %v, expected to claim expired lease", err)
+		return
+	}
+
+	lease := &coordinationv1.Lease{}
+	key := types.NamespacedName{Namespace: metav1.NamespaceDefault, Name: leaseName}
+	if err := client.Get(ctx, key, lease); err != nil {
+		t.Fatalf("Get lease error = %v", err)
+	}
+
+	if lease.Spec.HolderIdentity == nil || *lease.Spec.HolderIdentity != lockerID2 {
+		t.Errorf("Lease holder = %v, want %v", lease.Spec.HolderIdentity, lockerID2)
+	}
+}
+
+func TestReleaseLock(t *testing.T) {
+	lockerID := "locker-1"
+	leaseName := "test-lease"
+
+	now := metav1.NewMicroTime(time.Now())
+	leaseDuration := int32(15)
+	existingLease := &coordinationv1.Lease{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      leaseName,
+			Namespace: metav1.NamespaceDefault,
+		},
+		Spec: coordinationv1.LeaseSpec{
+			HolderIdentity:       &lockerID,
+			LeaseDurationSeconds: &leaseDuration,
+			AcquireTime:          &now,
+			RenewTime:            &now,
+		},
+	}
+
+	client := fake.NewClientBuilder().
+		WithScheme(scheme.Scheme).
+		WithObjects(existingLease).
+		Build()
+
+	rl, err := NewResourceLocker(client, metav1.NamespaceDefault, 15*time.Second, 5*time.Second)
+	if err != nil {
+		t.Fatalf("NewResourceLocker() error = %v", err)
+	}
+
+	ctx := context.Background()
+	if err := rl.ReleaseLock(ctx, leaseName, lockerID); err != nil {
+		t.Errorf("ReleaseLock() error = %v", err)
+		return
+	}
+
+	lease := &coordinationv1.Lease{}
+	key := types.NamespacedName{Namespace: metav1.NamespaceDefault, Name: leaseName}
+	if err := client.Get(ctx, key, lease); !apierrors.IsNotFound(err) {
+		t.Errorf("Expected lease to be deleted, but got error = %v", err)
+	}
+}
+
+func TestReleaseLock_NotOwned(t *testing.T) {
+	lockerID1 := "locker-1"
+	lockerID2 := "locker-2"
+	leaseName := "test-lease"
+
+	now := metav1.NewMicroTime(time.Now())
+	leaseDuration := int32(15)
+	existingLease := &coordinationv1.Lease{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      leaseName,
+			Namespace: metav1.NamespaceDefault,
+		},
+		Spec: coordinationv1.LeaseSpec{
+			HolderIdentity:       &lockerID1,
+			LeaseDurationSeconds: &leaseDuration,
+			AcquireTime:          &now,
+			RenewTime:            &now,
+		},
+	}
+
+	client := fake.NewClientBuilder().
+		WithScheme(scheme.Scheme).
+		WithObjects(existingLease).
+		Build()
+
+	rl, err := NewResourceLocker(client, metav1.NamespaceDefault, 15*time.Second, 5*time.Second)
+	if err != nil {
+		t.Fatalf("NewResourceLocker() error = %v", err)
+	}
+
+	ctx := context.Background()
+	if err := rl.ReleaseLock(ctx, leaseName, lockerID2); err != nil {
+		t.Errorf("ReleaseLock() error = %v, expected success (noop) when not owned", err)
+		return
+	}
+
+	lease := &coordinationv1.Lease{}
+	key := types.NamespacedName{Namespace: metav1.NamespaceDefault, Name: leaseName}
+	if err := client.Get(ctx, key, lease); err != nil {
+		t.Fatalf("Get lease error = %v", err)
+	}
+
+	if lease.Spec.HolderIdentity == nil || *lease.Spec.HolderIdentity != lockerID1 {
+		t.Errorf("Lease holder = %v, want %v", lease.Spec.HolderIdentity, lockerID1)
+	}
+}
+
+func TestReleaseLock_NotFound(t *testing.T) {
+	client := fake.NewClientBuilder().WithScheme(scheme.Scheme).Build()
+
+	rl, err := NewResourceLocker(client, metav1.NamespaceDefault, 15*time.Second, 5*time.Second)
+	if err != nil {
+		t.Fatalf("NewResourceLocker() error = %v", err)
+	}
+
+	ctx := context.Background()
+	if err := rl.ReleaseLock(ctx, "non-existent-lease", "locker-1"); err != nil {
+		t.Errorf("ReleaseLock() error = %v, expected success (noop) when lease not found", err)
+	}
+}
+
+func TestRenewLease(t *testing.T) {
+	lockerID := "locker-1"
+	leaseName := "test-lease"
+
+	oldTime := metav1.NewMicroTime(time.Now().Add(-10 * time.Second))
+	leaseDuration := int32(15)
+	existingLease := &coordinationv1.Lease{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      leaseName,
+			Namespace: metav1.NamespaceDefault,
+		},
+		Spec: coordinationv1.LeaseSpec{
+			HolderIdentity:       &lockerID,
+			LeaseDurationSeconds: &leaseDuration,
+			AcquireTime:          &oldTime,
+			RenewTime:            &oldTime,
+		},
+	}
+
+	client := fake.NewClientBuilder().
+		WithScheme(scheme.Scheme).
+		WithObjects(existingLease).
+		Build()
+
+	rl, err := NewResourceLocker(client, metav1.NamespaceDefault, 15*time.Second, 5*time.Second)
+	if err != nil {
+		t.Fatalf("NewResourceLocker() error = %v", err)
+	}
+
+	ctx := context.Background()
+	beforeRenew := time.Now()
+	if err := rl.renewLease(ctx, leaseName, lockerID); err != nil {
+		t.Errorf("renewLease() error = %v", err)
+		return
+	}
+
+	lease := &coordinationv1.Lease{}
+	key := types.NamespacedName{Namespace: metav1.NamespaceDefault, Name: leaseName}
+	if err := client.Get(ctx, key, lease); err != nil {
+		t.Fatalf("Get lease error = %v", err)
+	}
+
+	if lease.Spec.RenewTime == nil {
+		t.Errorf("Lease RenewTime is nil")
+		return
+	}
+
+	if lease.Spec.RenewTime.Time.Before(beforeRenew) {
+		t.Errorf("Lease RenewTime = %v, want after %v", lease.Spec.RenewTime.Time, beforeRenew)
+	}
+}
+
+func TestRenewLease_NotOwned(t *testing.T) {
+	lockerID1 := "locker-1"
+	lockerID2 := "locker-2"
+	leaseName := "test-lease"
+
+	now := metav1.NewMicroTime(time.Now())
+	leaseDuration := int32(15)
+	existingLease := &coordinationv1.Lease{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      leaseName,
+			Namespace: metav1.NamespaceDefault,
+		},
+		Spec: coordinationv1.LeaseSpec{
+			HolderIdentity:       &lockerID1,
+			LeaseDurationSeconds: &leaseDuration,
+			AcquireTime:          &now,
+			RenewTime:            &now,
+		},
+	}
+
+	client := fake.NewClientBuilder().
+		WithScheme(scheme.Scheme).
+		WithObjects(existingLease).
+		Build()
+
+	rl, err := NewResourceLocker(client, metav1.NamespaceDefault, 15*time.Second, 5*time.Second)
+	if err != nil {
+		t.Fatalf("NewResourceLocker() error = %v", err)
+	}
+
+	ctx := context.Background()
+	if err := rl.renewLease(ctx, leaseName, lockerID2); err == nil {
+		t.Errorf("renewLease() expected error when not owner, got nil")
+	}
+}
+
+func TestReleaseLock_CancelsRenewalGoroutine(t *testing.T) {
+	leaseName := "test-lease"
+	lockerID := "test-locker"
+
+	leaseDuration := int32(15)
+	existingLease := &coordinationv1.Lease{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      leaseName,
+			Namespace: metav1.NamespaceDefault,
+		},
+		Spec: coordinationv1.LeaseSpec{
+			HolderIdentity:       &lockerID,
+			LeaseDurationSeconds: &leaseDuration,
+		},
+	}
+
+	client := fake.NewClientBuilder().
+		WithScheme(scheme.Scheme).
+		WithObjects(existingLease).
+		Build()
+
+	rl, err := NewResourceLocker(client, metav1.NamespaceDefault, 15*time.Second, 100*time.Millisecond)
+	if err != nil {
+		t.Fatalf("NewResourceLocker() error = %v", err)
+	}
+	defer rl.Close()
+
+	ctx := context.Background()
+
+	if err := rl.AcquireLock(ctx, leaseName, lockerID); err != nil {
+		t.Fatalf("AcquireLock() error = %v", err)
+	}
+
+	if _, exists := rl.cancelFuncs.Load(leaseName); !exists {
+		t.Error("Expected cancel func to be stored after AcquireLock")
+	}
+
+	if err := rl.ReleaseLock(ctx, leaseName, lockerID); err != nil {
+		t.Fatalf("ReleaseLock() error = %v", err)
+	}
+
+	if _, exists := rl.cancelFuncs.Load(leaseName); exists {
+		t.Error("Expected cancel func to be removed after ReleaseLock")
+	}
+}
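
To make the intended call pattern concrete, here is a minimal, self-contained sketch of driving the resourcelock API introduced above, end to end. It uses the controller-runtime fake client the way the unit tests do, so it runs without a cluster. The lease name "device-a", the two locker IDs, and placing this file inside the module (so the internal import resolves) are illustrative assumptions, not part of the change itself.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	coordinationv1 "k8s.io/api/coordination/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/client-go/kubernetes/scheme"
	"sigs.k8s.io/controller-runtime/pkg/client/fake"

	"github.com/ironcore-dev/network-operator/internal/resourcelock"
)

func main() {
	// Register coordination.k8s.io/v1 types, as the package's unit tests do,
	// and build a fake client so this sketch needs no running cluster.
	utilruntime.Must(coordinationv1.AddToScheme(scheme.Scheme))
	c := fake.NewClientBuilder().WithScheme(scheme.Scheme).Build()

	// The renew period must be shorter than the lease duration, mirroring the
	// -locker-duration and -locker-renew-interval flag defaults.
	locker, err := resourcelock.NewResourceLocker(c, metav1.NamespaceDefault, 10*time.Second, 5*time.Second)
	if err != nil {
		panic(err)
	}
	// Close cancels any background renewal goroutines on exit; under a
	// controller manager this is handled by adding the locker as a Runnable.
	defer locker.Close()

	ctx := context.Background()

	// First holder acquires the per-device lock; a Lease named "device-a"
	// is created and renewed in the background.
	if err := locker.AcquireLock(ctx, "device-a", "ospf-controller"); err != nil {
		panic(err)
	}

	// A second holder contending for the same lease gets ErrLockAlreadyHeld,
	// which the controllers above translate into a short requeue.
	if err := locker.AcquireLock(ctx, "device-a", "vlan-controller"); errors.Is(err, resourcelock.ErrLockAlreadyHeld) {
		fmt.Println("lock busy, requeue and retry later")
	}

	// Releasing with the holding ID deletes the Lease and stops its renewal;
	// releasing with any other ID would be a no-op.
	if err := locker.ReleaseLock(ctx, "device-a", "ospf-controller"); err != nil {
		panic(err)
	}
}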