@@ -24,11 +24,14 @@ import (
2424 "os"
2525 "path/filepath"
2626 "strings"
27+ "time"
2728
29+ apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
2830 "k8s.io/apimachinery/pkg/api/errors"
2931 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3032 "k8s.io/apimachinery/pkg/types"
3133 k8sclient "k8s.io/client-go/kubernetes"
34+ restclient "k8s.io/client-go/rest"
3235 "k8s.io/klog/v2"
3336 kubeletconfigv1beta1 "k8s.io/kubelet/config/v1beta1"
3437
@@ -48,10 +51,14 @@ import (
4851)
4952
const (
	// TopologyManagerPolicyAttributeName represents an attribute which defines
	// Topology Manager Policy.
	TopologyManagerPolicyAttributeName = "topologyManagerPolicy"
	// TopologyManagerScopeAttributeName represents an attribute which defines
	// Topology Manager Policy Scope.
	TopologyManagerScopeAttributeName = "topologyManagerScope"
	// NodeResourceTopologyCRDName is the name of the NodeResourceTopology
	// CustomResourceDefinition object, in the "<plural>.<group>" form used
	// when looking the CRD up through the apiextensions API.
	NodeResourceTopologyCRDName = "noderesourcetopologies.topology.node.k8s.io"
)
5663
5764// Args are the command line arguments
@@ -141,6 +148,26 @@ func (w *nfdTopologyUpdater) Healthz(writer http.ResponseWriter, _ *http.Request
141148func (w * nfdTopologyUpdater ) Run () error {
142149 klog .InfoS ("Node Feature Discovery Topology Updater" , "version" , version .Get (), "nodeName" , w .nodeName )
143150
151+ // Start HTTP server early so health probes work during initialization.
152+ // This is important because we may wait for the CRD to be available.
153+ httpMux := http .NewServeMux ()
154+ httpMux .HandleFunc ("/healthz" , w .Healthz )
155+
156+ // Register to metrics server
157+ promRegistry := prometheus .NewRegistry ()
158+ promRegistry .MustRegister (
159+ buildInfo ,
160+ scanErrors )
161+ httpMux .Handle ("/metrics" , promhttp .HandlerFor (promRegistry , promhttp.HandlerOpts {}))
162+ registerVersion (version .Get ())
163+
164+ httpServer := http.Server {Addr : fmt .Sprintf (":%d" , w .args .Port ), Handler : httpMux }
165+ go func () {
166+ klog .InfoS ("http server starting" , "port" , httpServer .Addr )
167+ klog .InfoS ("http server stopped" , "exitCode" , httpServer .ListenAndServe ())
168+ }()
169+ defer httpServer .Close () // nolint: errcheck
170+
144171 podResClient , err := podres .GetPodResClient (w .resourcemonitorArgs .PodResourceSocketPath )
145172 if err != nil {
146173 return fmt .Errorf ("failed to get PodResource Client: %w" , err )
@@ -152,7 +179,7 @@ func (w *nfdTopologyUpdater) Run() error {
152179 }
153180 topoClient , err := topologyclientset .NewForConfig (kubeconfig )
154181 if err != nil {
155- return nil
182+ return fmt . Errorf ( "failed to create topology client: %w" , err )
156183 }
157184 w .topoClient = topoClient
158185
@@ -162,20 +189,17 @@ func (w *nfdTopologyUpdater) Run() error {
162189 }
163190 w .k8sClient = k8sClient
164191
192+ // Wait for the NodeResourceTopology CRD to be available before proceeding.
193+ // This handles race conditions during deployment and scenarios where the
194+ // CRD is installed after the topology-updater.
195+ if err := waitForNodeResourceTopologyCRD (kubeconfig , w .stop ); err != nil {
196+ return err
197+ }
198+
165199 if err := w .configure (); err != nil {
166200 return fmt .Errorf ("faild to configure Node Feature Discovery Topology Updater: %w" , err )
167201 }
168202
169- httpMux := http .NewServeMux ()
170-
171- // Register to metrics server
172- promRegistry := prometheus .NewRegistry ()
173- promRegistry .MustRegister (
174- buildInfo ,
175- scanErrors )
176- httpMux .Handle ("/metrics" , promhttp .HandlerFor (promRegistry , promhttp.HandlerOpts {}))
177- registerVersion (version .Get ())
178-
179203 var resScan resourcemonitor.ResourcesScanner
180204
181205 resScan , err = resourcemonitor .NewPodResourcesScanner (w .resourcemonitorArgs .Namespace , podResClient , k8sClient , w .resourcemonitorArgs .PodSetFingerprint )
@@ -194,17 +218,6 @@ func (w *nfdTopologyUpdater) Run() error {
194218 return fmt .Errorf ("failed to obtain node resource information: %w" , err )
195219 }
196220
197- // Register health probe (at this point we're "ready and live")
198- httpMux .HandleFunc ("/healthz" , w .Healthz )
199-
200- // Start HTTP server
201- httpServer := http.Server {Addr : fmt .Sprintf (":%d" , w .args .Port ), Handler : httpMux }
202- go func () {
203- klog .InfoS ("http server starting" , "port" , httpServer .Addr )
204- klog .InfoS ("http server stopped" , "exitCode" , httpServer .ListenAndServe ())
205- }()
206- defer httpServer .Close () // nolint: errcheck
207-
208221 for {
209222 select {
210223 case info := <- w .eventSource :
@@ -282,6 +295,12 @@ func (w *nfdTopologyUpdater) updateNodeResourceTopology(zoneInfo v1alpha2.ZoneLi
282295 updateAttributes (& nrtNew .Attributes , scanResponse .Attributes )
283296
284297 if _ , err := w .topoClient .TopologyV1alpha2 ().NodeResourceTopologies ().Create (context .TODO (), & nrtNew , metav1.CreateOptions {}); err != nil {
298+ if errors .IsNotFound (err ) {
299+ return fmt .Errorf ("failed to create NodeResourceTopology: %w. " +
300+ "The NodeResourceTopology CRD may not be installed. " +
301+ "If using Helm, ensure 'topologyUpdater.createCRDs=true' is set" ,
302+ err )
303+ }
285304 return fmt .Errorf ("failed to create NodeResourceTopology: %w" , err )
286305 }
287306 return nil
@@ -380,6 +399,66 @@ func (w *nfdTopologyUpdater) configure() error {
380399 return nil
381400}
382401
402+ // waitForNodeResourceTopologyCRD waits for the NodeResourceTopology CRD to be
403+ // available in the cluster. This handles race conditions during deployment and
404+ // scenarios where the CRD is installed after the topology-updater pods start.
405+ // The function will retry indefinitely until the CRD is found or the stop
406+ // channel is closed.
407+ func waitForNodeResourceTopologyCRD (config * restclient.Config , stop <- chan struct {}) error {
408+ const (
409+ initialBackoff = 5 * time .Second
410+ maxBackoff = 60 * time .Second
411+ )
412+
413+ apiextClient , err := apiextensionsclient .NewForConfig (config )
414+ if err != nil {
415+ klog .V (2 ).InfoS ("unable to create apiextensions client for CRD check, skipping wait" ,
416+ "error" , err )
417+ // Don't block startup, the error will be caught later when creating NRT
418+ return nil
419+ }
420+
421+ backoff := initialBackoff
422+ for {
423+ _ , err = apiextClient .ApiextensionsV1 ().CustomResourceDefinitions ().Get (
424+ context .TODO (), NodeResourceTopologyCRDName , metav1.GetOptions {})
425+ if err == nil {
426+ klog .InfoS ("NodeResourceTopology CRD is available" ,
427+ "crd" , NodeResourceTopologyCRDName )
428+ return nil
429+ }
430+
431+ // If we don't have permission to check CRDs, skip waiting and let the
432+ // actual NRT creation fail with a more descriptive error
433+ if errors .IsForbidden (err ) {
434+ klog .V (2 ).InfoS ("no permission to check CRD existence, skipping wait" ,
435+ "crd" , NodeResourceTopologyCRDName , "error" , err )
436+ return nil
437+ }
438+
439+ if errors .IsNotFound (err ) {
440+ klog .InfoS ("waiting for NodeResourceTopology CRD to be created. " +
441+ "If using Helm, ensure 'topologyUpdater.createCRDs=true' is set" ,
442+ "crd" , NodeResourceTopologyCRDName , "retryIn" , backoff )
443+ } else {
444+ klog .V (2 ).InfoS ("error checking for CRD, will retry" ,
445+ "crd" , NodeResourceTopologyCRDName , "error" , err , "retryIn" , backoff )
446+ }
447+
448+ select {
449+ case <- stop :
450+ return fmt .Errorf ("stopped while waiting for CRD %q" ,
451+ NodeResourceTopologyCRDName )
452+ case <- time .After (backoff ):
453+ // Exponential backoff with max cap
454+ backoff *= 2
455+ if backoff > maxBackoff {
456+ backoff = maxBackoff
457+ }
458+ }
459+ }
460+ }
461+
383462func createTopologyAttributes (policy string , scope string ) v1alpha2.AttributeList {
384463 return v1alpha2.AttributeList {
385464 {
0 commit comments