diff --git a/cmd/hubagent/main.go b/cmd/hubagent/main.go index 42409c0ea..d69d1d867 100644 --- a/cmd/hubagent/main.go +++ b/cmd/hubagent/main.go @@ -46,6 +46,8 @@ import ( "github.com/kubefleet-dev/kubefleet/cmd/hubagent/options" "github.com/kubefleet-dev/kubefleet/cmd/hubagent/workload" mcv1beta1 "github.com/kubefleet-dev/kubefleet/pkg/controllers/membercluster/v1beta1" + readiness "github.com/kubefleet-dev/kubefleet/pkg/utils/informer/readiness" + "github.com/kubefleet-dev/kubefleet/pkg/utils/validator" "github.com/kubefleet-dev/kubefleet/pkg/webhook" // +kubebuilder:scaffold:imports ) @@ -165,7 +167,17 @@ func main() { ctx := ctrl.SetupSignalHandler() if err := workload.SetupControllers(ctx, &wg, mgr, config, opts); err != nil { - klog.ErrorS(err, "unable to set up ready check") + klog.ErrorS(err, "unable to set up controllers") + exitWithErrorFunc() + } + + // Register the readiness check for the dynamic informer cache AFTER the controllers are set up. + // This ensures the discovery cache is populated before the hub agent is marked ready, + // which is critical for all controllers that rely on dynamic resource discovery. + // AddReadyzCheck adds an additional readiness check rather than replacing the one registered earlier, as long as the name is different. + // Both registered checks need to pass for the manager to be considered ready. NOTE(review): validator.ResourceInformer is evaluated once, here, when the check is registered; confirm SetupControllers has already assigned it. + if err := mgr.AddReadyzCheck("informer-cache", readiness.InformerReadinessChecker(validator.ResourceInformer)); err != nil { + klog.ErrorS(err, "unable to set up informer cache readiness check") exitWithErrorFunc() } diff --git a/pkg/utils/informer/informermanager.go b/pkg/utils/informer/informermanager.go index 4ea1a5143..53da3343a 100644 --- a/pkg/utils/informer/informermanager.go +++ b/pkg/utils/informer/informermanager.go @@ -61,6 +61,9 @@ type Manager interface { // GetNameSpaceScopedResources returns the list of namespace scoped resources we are watching. 
GetNameSpaceScopedResources() []schema.GroupVersionResource + // GetAllResources returns the list of all resources (both cluster-scoped and namespace-scoped) we are watching. + GetAllResources() []schema.GroupVersionResource + // IsClusterScopedResources returns if a resource is cluster scoped. IsClusterScopedResources(resource schema.GroupVersionKind) bool @@ -224,6 +227,21 @@ func (s *informerManagerImpl) GetNameSpaceScopedResources() []schema.GroupVersio return res } +// GetAllResources returns the GroupVersionResource of every tracked resource that is currently marked present, regardless of whether it is cluster-scoped or namespace-scoped. +func (s *informerManagerImpl) GetAllResources() []schema.GroupVersionResource { + s.resourcesLock.RLock() + defer s.resourcesLock.RUnlock() + + present := make([]schema.GroupVersionResource, 0, len(s.apiResources)) + for _, meta := range s.apiResources { + if !meta.isPresent { + continue + } + present = append(present, meta.GroupVersionResource) + } + return present +} + func (s *informerManagerImpl) IsClusterScopedResources(gvk schema.GroupVersionKind) bool { s.resourcesLock.RLock() defer s.resourcesLock.RUnlock() diff --git a/pkg/utils/informer/informermanager_test.go b/pkg/utils/informer/informermanager_test.go new file mode 100644 index 000000000..53f2ce74a --- /dev/null +++ b/pkg/utils/informer/informermanager_test.go @@ -0,0 +1,289 @@ +/* +Copyright 2025 The KubeFleet Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package informer + +import ( + "testing" + + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/dynamic/fake" + "k8s.io/client-go/kubernetes/scheme" +) + +func TestGetAllResources(t *testing.T) { + tests := []struct { + name string + namespaceScopedResources []APIResourceMeta + clusterScopedResources []APIResourceMeta + staticResources []APIResourceMeta + expectedResourceCount int + expectedNamespacedCount int + }{ + { + name: "mixed cluster and namespace scoped resources", + namespaceScopedResources: []APIResourceMeta{ + { + GroupVersionKind: schema.GroupVersionKind{ + Group: "", + Version: "v1", + Kind: "ConfigMap", + }, + GroupVersionResource: schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "configmaps", + }, + IsClusterScoped: false, + }, + { + GroupVersionKind: schema.GroupVersionKind{ + Group: "", + Version: "v1", + Kind: "Secret", + }, + GroupVersionResource: schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "secrets", + }, + IsClusterScoped: false, + }, + }, + clusterScopedResources: []APIResourceMeta{ + { + GroupVersionKind: schema.GroupVersionKind{ + Group: "", + Version: "v1", + Kind: "Namespace", + }, + GroupVersionResource: schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "namespaces", + }, + IsClusterScoped: true, + }, + }, + staticResources: []APIResourceMeta{ + { + GroupVersionKind: schema.GroupVersionKind{ + Group: "", + Version: "v1", + Kind: "Node", + }, + GroupVersionResource: schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "nodes", + }, + IsClusterScoped: true, + isStaticResource: true, + }, + }, + expectedResourceCount: 4, // All resources including static + expectedNamespacedCount: 2, // Only namespace-scoped, excluding static + }, + { + name: "no resources", + expectedResourceCount: 0, + expectedNamespacedCount: 0, + }, + { + name: "only namespace scoped resources", + namespaceScopedResources: []APIResourceMeta{ + { + GroupVersionKind: 
schema.GroupVersionKind{ + Group: "apps", + Version: "v1", + Kind: "Deployment", + }, + GroupVersionResource: schema.GroupVersionResource{ + Group: "apps", + Version: "v1", + Resource: "deployments", + }, + IsClusterScoped: false, + }, + }, + expectedResourceCount: 1, + expectedNamespacedCount: 1, + }, + { + name: "only cluster scoped resources", + clusterScopedResources: []APIResourceMeta{ + { + GroupVersionKind: schema.GroupVersionKind{ + Group: "rbac.authorization.k8s.io", + Version: "v1", + Kind: "ClusterRole", + }, + GroupVersionResource: schema.GroupVersionResource{ + Group: "rbac.authorization.k8s.io", + Version: "v1", + Resource: "clusterroles", + }, + IsClusterScoped: true, + }, + }, + expectedResourceCount: 1, + expectedNamespacedCount: 0, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Create a fake dynamic client + fakeClient := fake.NewSimpleDynamicClient(scheme.Scheme) + stopCh := make(chan struct{}) + defer close(stopCh) + + mgr := NewInformerManager(fakeClient, 0, stopCh) + implMgr := mgr.(*informerManagerImpl) + + // Add namespace-scoped resources; index the slice so every map entry points at its own element instead of at the range variable + for i := range tt.namespaceScopedResources { + tt.namespaceScopedResources[i].isPresent = true + implMgr.apiResources[tt.namespaceScopedResources[i].GroupVersionKind] = &tt.namespaceScopedResources[i] + } + + // Add cluster-scoped resources (same indexing approach) + for i := range tt.clusterScopedResources { + tt.clusterScopedResources[i].isPresent = true + implMgr.apiResources[tt.clusterScopedResources[i].GroupVersionKind] = &tt.clusterScopedResources[i] + } + + // Add static resources (same indexing approach) + for i := range tt.staticResources { + tt.staticResources[i].isPresent = true + implMgr.apiResources[tt.staticResources[i].GroupVersionKind] = &tt.staticResources[i] + } + + // Test GetAllResources + allResources := mgr.GetAllResources() + if got := len(allResources); got != tt.expectedResourceCount { + t.Errorf("GetAllResources() returned %d resources, want %d", got, tt.expectedResourceCount) + } + + // Verify all expected resources are present + resourceMap := make(map[schema.GroupVersionResource]bool) + for _, gvr := range allResources { + resourceMap[gvr] = true + } + + for _, res := range 
tt.namespaceScopedResources { + if !resourceMap[res.GroupVersionResource] { + t.Errorf("namespace-scoped resource %v should be in GetAllResources", res.GroupVersionResource) + } + } + + for _, res := range tt.clusterScopedResources { + if !resourceMap[res.GroupVersionResource] { + t.Errorf("cluster-scoped resource %v should be in GetAllResources", res.GroupVersionResource) + } + } + + for _, res := range tt.staticResources { + if !resourceMap[res.GroupVersionResource] { + t.Errorf("static resource %v should be in GetAllResources", res.GroupVersionResource) + } + } + + // Test GetNameSpaceScopedResources + namespacedResources := mgr.GetNameSpaceScopedResources() + if got := len(namespacedResources); got != tt.expectedNamespacedCount { + t.Errorf("GetNameSpaceScopedResources() returned %d resources, want %d", got, tt.expectedNamespacedCount) + } + + // Verify only namespace-scoped, non-static resources are present + namespacedMap := make(map[schema.GroupVersionResource]bool) + for _, gvr := range namespacedResources { + namespacedMap[gvr] = true + } + + for _, res := range tt.namespaceScopedResources { + if !namespacedMap[res.GroupVersionResource] { + t.Errorf("namespace-scoped resource %v should be in GetNameSpaceScopedResources", res.GroupVersionResource) + } + } + + // Verify cluster-scoped and static resources are NOT in namespace-scoped list + for _, res := range tt.clusterScopedResources { + if namespacedMap[res.GroupVersionResource] { + t.Errorf("cluster-scoped resource %v should NOT be in GetNameSpaceScopedResources", res.GroupVersionResource) + } + } + + for _, res := range tt.staticResources { + if namespacedMap[res.GroupVersionResource] { + t.Errorf("static resource %v should NOT be in GetNameSpaceScopedResources", res.GroupVersionResource) + } + } + }) + } +} + +func TestGetAllResources_NotPresent(t *testing.T) { + // Test that resources marked as not present are excluded + fakeClient := fake.NewSimpleDynamicClient(scheme.Scheme) + stopCh := make(chan 
struct{}) + defer close(stopCh) + + mgr := NewInformerManager(fakeClient, 0, stopCh) + implMgr := mgr.(*informerManagerImpl) + + // Add a resource that is present + presentRes := APIResourceMeta{ + GroupVersionKind: schema.GroupVersionKind{ + Group: "", + Version: "v1", + Kind: "ConfigMap", + }, + GroupVersionResource: schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "configmaps", + }, + IsClusterScoped: false, + isPresent: true, + } + implMgr.apiResources[presentRes.GroupVersionKind] = &presentRes + + // Add a resource that is NOT present (deleted); GetAllResources must skip it + notPresentRes := APIResourceMeta{ + GroupVersionKind: schema.GroupVersionKind{ + Group: "", + Version: "v1", + Kind: "Secret", + }, + GroupVersionResource: schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "secrets", + }, + IsClusterScoped: false, + isPresent: false, + } + implMgr.apiResources[notPresentRes.GroupVersionKind] = &notPresentRes + + allResources := mgr.GetAllResources() + if got := len(allResources); got != 1 { + t.Fatalf("GetAllResources() returned %d resources, want 1 (should only return present resources)", got) + } + if got := allResources[0]; got != presentRes.GroupVersionResource { + t.Errorf("GetAllResources()[0] = %v, want %v", got, presentRes.GroupVersionResource) + } +} diff --git a/pkg/utils/informer/readiness/readiness.go b/pkg/utils/informer/readiness/readiness.go new file mode 100644 index 000000000..23e12cf86 --- /dev/null +++ b/pkg/utils/informer/readiness/readiness.go @@ -0,0 +1,60 @@ +/* +Copyright 2025 The KubeFleet Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package readiness + +import ( + "fmt" + "net/http" + + "github.com/kubefleet-dev/kubefleet/pkg/utils/informer" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/klog/v2" +) + +// InformerReadinessChecker returns a readiness probe for the given informer manager. +// The probe fails while the manager is nil, reports no resources, or still has an unsynced +// informer for any resource it lists, and passes only once every listed informer is synced. +func InformerReadinessChecker(resourceInformer informer.Manager) func(*http.Request) error { + return func(_ *http.Request) error { + if resourceInformer == nil { + return fmt.Errorf("resource informer not initialized") + } + + // An empty resource list means nothing has been registered with the manager yet; the + // original author attributes this to startup, before AddDynamicResources() has run. + // Treat it as not ready rather than passing vacuously. + tracked := resourceInformer.GetAllResources() + total := len(tracked) + if total == 0 { 
+ return fmt.Errorf("resource informer not ready: no resources registered") + } + + // Collect every resource whose informer cache has not finished syncing. + var pending []schema.GroupVersionResource + for _, res := range tracked { + if synced := resourceInformer.IsInformerSynced(res); !synced { + pending = append(pending, res) + } + } + if len(pending) > 0 { + return fmt.Errorf("resource informer not ready: %d/%d informers not synced yet", len(pending), total) + } + + klog.V(5).InfoS("All resource informers synced", "totalInformers", total) + return nil + } +} diff --git a/pkg/utils/informer/readiness/readiness_test.go b/pkg/utils/informer/readiness/readiness_test.go new file mode 100644 index 000000000..973324fdc --- /dev/null +++ b/pkg/utils/informer/readiness/readiness_test.go @@ -0,0 +1,150 @@ +/* +Copyright 2025 The KubeFleet Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package readiness + +import ( + "strings" + "testing" + + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/utils/ptr" + + "github.com/kubefleet-dev/kubefleet/pkg/utils/informer" + testinformer "github.com/kubefleet-dev/kubefleet/test/utils/informer" +) + +func TestReadinessChecker(t *testing.T) { + tests := []struct { + name string + resourceInformer informer.Manager + expectError bool + errorContains string + }{ + { + name: "nil informer", + resourceInformer: nil, + expectError: true, + errorContains: "resource informer not initialized", + }, + { + name: "no resources registered", + resourceInformer: &testinformer.FakeManager{ + APIResources: map[schema.GroupVersionKind]bool{}, + }, + expectError: true, + errorContains: "no resources registered", + }, + { + name: "all informers synced", + resourceInformer: &testinformer.FakeManager{ + APIResources: map[schema.GroupVersionKind]bool{ + {Group: "", Version: "v1", Kind: "ConfigMap"}: true, // the map value is ignored; the fake GetAllResources only ranges over the keys + {Group: "", Version: "v1", Kind: "Secret"}: true, + {Group: "", Version: "v1", Kind: "Namespace"}: true, + }, + InformerSynced: ptr.To(true), // this makes all informers synced + }, + expectError: false, + }, + { + name: "some informers not synced", + resourceInformer: &testinformer.FakeManager{ + APIResources: map[schema.GroupVersionKind]bool{ + {Group: "", Version: "v1", Kind: "ConfigMap"}: false, // the map value is ignored; the fake GetAllResources only ranges over the keys + {Group: "", Version: "v1", Kind: "Secret"}: false, + {Group: "", Version: "v1", Kind: "Namespace"}: false, + }, + IsClusterScopedResource: true, + InformerSynced: ptr.To(false), // this makes all informers not synced + }, + expectError: true, + errorContains: "informers not synced yet", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + checker := InformerReadinessChecker(tt.resourceInformer) + err := checker(nil) + + if tt.expectError { + if err == nil { + t.Errorf("ReadinessChecker() expected error, got nil") + } + if tt.errorContains != "" 
&& err != nil { + if got := err.Error(); !strings.Contains(got, tt.errorContains) { + t.Errorf("error message should contain %q, got: %s", tt.errorContains, got) + } + } + } else { + if err != nil { + t.Errorf("ReadinessChecker() unexpected error: %v", err) + } + } + }) + } +} + +func TestReadinessChecker_NoneSync(t *testing.T) { + // Test the case where we have multiple resources but none are synced + mockManager := &testinformer.FakeManager{ + APIResources: map[schema.GroupVersionKind]bool{ + {Group: "", Version: "v1", Kind: "ConfigMap"}: false, // the map value is ignored; the fake GetAllResources only ranges over the keys + {Group: "", Version: "v1", Kind: "Secret"}: false, + {Group: "apps", Version: "v1", Kind: "Deployment"}: false, + {Group: "", Version: "v1", Kind: "Namespace"}: false, + }, + InformerSynced: ptr.To(false), // this makes all informers not synced + } + + checker := InformerReadinessChecker(mockManager) + err := checker(nil) + + if err == nil { + t.Fatal("ReadinessChecker() should return error when no informers are synced") + } + if got := err.Error(); !strings.Contains(got, "informers not synced yet") { + t.Errorf("error message should contain 'informers not synced yet', got: %s", got) + } + // All 4 registered resources are unsynced, so the error must report 4/4 + if got := err.Error(); !strings.Contains(got, "4/4") { + t.Errorf("error message should contain '4/4', got: %s", got) + } +} + +func TestReadinessChecker_AllSyncedMultipleResources(t *testing.T) { + // Test with many resources all synced + mockManager := &testinformer.FakeManager{ + APIResources: map[schema.GroupVersionKind]bool{ + {Group: "", Version: "v1", Kind: "ConfigMap"}: true, // the map value is ignored; the fake GetAllResources only ranges over the keys + {Group: "", Version: "v1", Kind: "Secret"}: true, + {Group: "", Version: "v1", Kind: "Service"}: true, + {Group: "apps", Version: "v1", Kind: "Deployment"}: true, + {Group: "apps", Version: "v1", Kind: "StatefulSet"}: true, + {Group: "", Version: "v1", Kind: "Namespace"}: true, + {Group: "rbac.authorization.k8s.io", Version: "v1", Kind: "ClusterRole"}: true, + }, + 
InformerSynced: ptr.To(true), // this makes all informers synced + } + + checker := InformerReadinessChecker(mockManager) + err := checker(nil) + + if err != nil { + t.Errorf("ReadinessChecker() unexpected error when all informers are synced: %v", err) + } +} diff --git a/test/utils/informer/manager.go b/test/utils/informer/manager.go index b67c8fc6a..fde757292 100644 --- a/test/utils/informer/manager.go +++ b/test/utils/informer/manager.go @@ -160,6 +160,21 @@ func (m *FakeManager) GetNameSpaceScopedResources() []schema.GroupVersionResourc return m.NamespaceScopedResources } +func (m *FakeManager) GetAllResources() []schema.GroupVersionResource { + allResources := make([]schema.GroupVersionResource, 0, len(m.APIResources)) + for gvk := range m.APIResources { + // Build a GVR from the GVK key, reusing Kind verbatim as the Resource name (map order, so the result order is not stable) + // The actual resource name doesn't matter since, per the original author, IsInformerSynced ignores the GVR parameter (not visible in this hunk) + gvr := schema.GroupVersionResource{ + Group: gvk.Group, + Version: gvk.Version, + Resource: gvk.Kind, + } + allResources = append(allResources, gvr) + } + return allResources +} + func (m *FakeManager) IsClusterScopedResources(gvk schema.GroupVersionKind) bool { return m.APIResources[gvk] == m.IsClusterScopedResource }