You’re building a Kubernetes controller. You could poll the API server every few seconds, but that doesn’t scale. You could set up a watch, but then you need to handle disconnections, resyncs, and state reconciliation yourself. Or you could use client-go’s battle-tested primitives that handle all of this for you.
This post covers the core building blocks: Informers for efficient API watching, work queues for reliable event processing, and rate limiters for backoff and retries.
Why Not Just Use the REST API? ¶
The naive approach to watching Kubernetes resources:
// Don't do this
for {
pods, err := clientset.CoreV1().Pods("").List(ctx, metav1.ListOptions{})
if err != nil {
log.Error(err)
continue
}
for _, pod := range pods.Items {
processPod(pod)
}
time.Sleep(5 * time.Second)
}
Problems:
- Expensive: Every LIST fetches all objects, even unchanged ones
- Delayed: 5-second polling means up to 5 seconds of staleness
- Scales poorly: More objects = more data transferred each poll
- Missed changes: Objects that change, or come and go, between polls are never observed
Kubernetes provides WATCH for efficient change streaming:
// Better, but still manual
watcher, err := clientset.CoreV1().Pods("").Watch(ctx, metav1.ListOptions{})
if err != nil {
return err
}
for event := range watcher.ResultChan() {
pod := event.Object.(*v1.Pod)
switch event.Type {
case watch.Added:
handleAdd(pod)
case watch.Modified:
handleUpdate(pod)
case watch.Deleted:
handleDelete(pod)
}
}
Better, but you still need to handle:
- Watch disconnections (network blips, API server restarts)
- Bookmark events and resource versions
- Initial LIST to populate state before watching
- Resyncs when the watch falls behind
client-go’s Informers handle all of this.
Informers: The Foundation ¶
An Informer combines LIST and WATCH into a single abstraction that maintains a local cache of objects and notifies you of changes.
Basic Informer Usage ¶
import (
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
)
func main() {
// Create clientset
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err)
}
// Create shared informer factory
factory := informers.NewSharedInformerFactory(clientset, 30*time.Minute)
// Get pod informer
podInformer := factory.Core().V1().Pods().Informer()
// Add event handlers
podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
pod := obj.(*v1.Pod)
fmt.Printf("Pod added: %s/%s\n", pod.Namespace, pod.Name)
},
UpdateFunc: func(oldObj, newObj interface{}) {
oldPod := oldObj.(*v1.Pod)
newPod := newObj.(*v1.Pod)
fmt.Printf("Pod updated: %s/%s\n", newPod.Namespace, newPod.Name)
},
DeleteFunc: func(obj interface{}) {
pod := obj.(*v1.Pod)
fmt.Printf("Pod deleted: %s/%s\n", pod.Namespace, pod.Name)
},
})
// Start informer
stopCh := make(chan struct{})
factory.Start(stopCh)
// Wait for cache sync
if !cache.WaitForCacheSync(stopCh, podInformer.HasSynced) {
panic("failed to sync cache")
}
// Now the cache is populated and handlers are receiving events
<-stopCh
}
What Happens Under the Hood ¶
1. Initial LIST
Informer calls LIST to get all current objects
Stores them in local cache (thread-safe store)
Triggers AddFunc for each existing object
2. Start WATCH
Informer opens WATCH from the LIST's resourceVersion
Receives streaming events (ADDED, MODIFIED, DELETED)
3. Event Processing
Each event updates the local cache
Triggers appropriate handler (Add/Update/Delete)
4. Resync (periodic)
Every resyncPeriod, re-delivers every object in the local cache via UpdateFunc
Gives reconciliation a periodic chance to correct anything it missed (no re-fetch from the API server)
5. Watch Reconnection
If the watch drops, the Informer re-establishes it from the last seen resourceVersion
If that version has expired, it falls back to a fresh LIST and reconciles the cache against it
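Under the hood, the factory wires up roughly the following: a ListerWatcher that knows how to LIST and WATCH pods, fed into a shared index informer. This is a simplified sketch, not the factory's exact internals (it assumes the usual metav1, apimachinery runtime, watch, and cache imports):
lw := &cache.ListWatch{
    ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
        return clientset.CoreV1().Pods(metav1.NamespaceAll).List(context.TODO(), options)
    },
    WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
        return clientset.CoreV1().Pods(metav1.NamespaceAll).Watch(context.TODO(), options)
    },
}
// The informer drives LIST+WATCH, maintains the local store, and fans out events to handlers.
podInformer := cache.NewSharedIndexInformer(lw, &v1.Pod{}, 30*time.Minute, cache.Indexers{})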
The Local Cache ¶
Informers maintain an in-memory cache (called a Store) that you can query directly:
// Get the Lister (read-only cache interface)
podLister := factory.Core().V1().Pods().Lister()
// List all pods (from cache, not API server)
pods, err := podLister.List(labels.Everything())
// List pods in a namespace
pods, err := podLister.Pods("default").List(labels.Everything())
// Get a specific pod
pod, err := podLister.Pods("default").Get("my-pod")
Cache reads are:
- Fast: No network call, just memory access
- Eventually consistent: May be slightly behind API server
- Thread-safe: Safe to call from any goroutine
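One caveat: objects returned by a Lister point into the informer's cache, so treat them as read-only. If you need to change one, deep-copy it first and send the copy to the API server. A minimal sketch, assuming the ctx and clientset from earlier:
pod, err := podLister.Pods("default").Get("my-pod")
if err != nil {
    return err
}
// Never mutate the cached object directly; work on a copy.
podCopy := pod.DeepCopy()
if podCopy.Labels == nil {
    podCopy.Labels = map[string]string{}
}
podCopy.Labels["processed"] = "true"
_, err = clientset.CoreV1().Pods(podCopy.Namespace).Update(ctx, podCopy, metav1.UpdateOptions{})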
SharedInformerFactory ¶
You rarely create Informers directly. Instead, use SharedInformerFactory:
// Without factory: each informer has its own watch
podInformer1 := cache.NewSharedInformer(podListWatcher, &v1.Pod{}, resyncPeriod)
podInformer2 := cache.NewSharedInformer(podListWatcher, &v1.Pod{}, resyncPeriod)
// Two watches to API server for the same resource!
// With factory: informers are shared
factory := informers.NewSharedInformerFactory(clientset, resyncPeriod)
podInformer1 := factory.Core().V1().Pods().Informer()
podInformer2 := factory.Core().V1().Pods().Informer()
// Same informer, one watch, multiple handlers
The factory ensures only one watch per resource type, regardless of how many controllers need it.
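The factory also has a convenience method for syncing everything it has started, so you don't need to collect each informer's HasSynced yourself. A sketch, using the stopCh pattern from earlier:
factory.Start(stopCh)
// WaitForCacheSync blocks until every started informer has synced and
// reports the result per informer type.
for informerType, synced := range factory.WaitForCacheSync(stopCh) {
    if !synced {
        panic(fmt.Sprintf("failed to sync %v", informerType))
    }
}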
Filtered Informers ¶
Watch only what you need:
// Only watch pods in "production" namespace
factory := informers.NewSharedInformerFactoryWithOptions(
clientset,
resyncPeriod,
informers.WithNamespace("production"),
)
// Only watch pods with specific labels
factory := informers.NewSharedInformerFactoryWithOptions(
clientset,
resyncPeriod,
informers.WithTweakListOptions(func(opts *metav1.ListOptions) {
opts.LabelSelector = "app=myapp"
}),
)
Filtering at the API server level reduces memory usage and network traffic.
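Field selectors work the same way. For example, a node-local agent might watch only pods scheduled onto its own node (the node name here is illustrative):
factory := informers.NewSharedInformerFactoryWithOptions(
    clientset,
    resyncPeriod,
    informers.WithTweakListOptions(func(opts *metav1.ListOptions) {
        // Only pods bound to this node ever reach the informer.
        opts.FieldSelector = "spec.nodeName=worker-1"
    }),
)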
Work Queues: Reliable Event Processing ¶
Event handlers are invoked sequentially by the informer's event dispatch. If a handler is slow or blocks, subsequent events pile up behind it. The solution: keep handlers trivial and queue the real work for asynchronous processing.
The Problem with Direct Processing ¶
// Bad: slow handler blocks all events
podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
pod := obj.(*v1.Pod)
// This takes 5 seconds...
doExpensiveReconciliation(pod)
// All other events are blocked!
},
})
Work Queue Pattern ¶
import "k8s.io/client-go/util/workqueue"
// Create a rate-limited work queue
queue := workqueue.NewRateLimitingQueue(
workqueue.DefaultControllerRateLimiter(),
)
// Handler just enqueues keys
podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err == nil {
queue.Add(key) // Fast: just adds string to queue
}
},
UpdateFunc: func(old, new interface{}) {
key, err := cache.MetaNamespaceKeyFunc(new)
if err == nil {
queue.Add(key) // Deduplicates: same key won't queue twice
}
},
DeleteFunc: func(obj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err == nil {
queue.Add(key)
}
},
})
// Worker goroutines process the queue
for i := 0; i < workerCount; i++ {
go func() {
for processNextItem(queue, podLister) {
}
}()
}
Processing Items ¶
func processNextItem(queue workqueue.RateLimitingInterface, lister v1lister.PodLister) bool {
// Get next item (blocks if queue is empty)
key, shutdown := queue.Get()
if shutdown {
return false
}
// Tell queue we're done with this item when function returns
defer queue.Done(key)
// Parse the key
namespace, name, err := cache.SplitMetaNamespaceKey(key.(string))
if err != nil {
// Invalid key, don't requeue
queue.Forget(key)
return true
}
// Get object from cache
pod, err := lister.Pods(namespace).Get(name)
if errors.IsNotFound(err) {
// Object deleted, nothing to do
queue.Forget(key)
return true
}
if err != nil {
// Transient error, requeue with rate limiting
queue.AddRateLimited(key)
return true
}
// Do actual reconciliation
if err := reconcile(pod); err != nil {
// Reconciliation failed, requeue with rate limiting
queue.AddRateLimited(key)
return true
}
// Success! Remove from rate limiter tracking
queue.Forget(key)
return true
}
Key Concepts ¶
Keys, not objects: We queue string keys (namespace/name), not full objects. This means:
- Deduplication: Multiple rapid events for same object = one queue item
- Freshness: Worker reads latest state from cache, not stale event data
- Memory efficiency: Queue holds strings, not large objects
Done(): Always call queue.Done(key) when finished processing. This marks the item as no longer being processed.
Forget(): Call queue.Forget(key) on success to reset rate limiter state for this key.
AddRateLimited(): Call on failure to requeue with exponential backoff.
Queue Types ¶
client-go provides several queue implementations:
// Basic FIFO queue
queue := workqueue.New()
// Delayed queue (items become ready after delay)
queue := workqueue.NewDelayingQueue()
// Rate-limited queue (exponential backoff on failures)
queue := workqueue.NewRateLimitingQueue(rateLimiter)
// Named queue (for metrics)
queue := workqueue.NewNamedRateLimitingQueue(rateLimiter, "my-controller")
For controllers, always use RateLimitingQueue.
Rate Limiting: Backoff and Retry ¶
When reconciliation fails, you want to retry—but not immediately in a tight loop. Rate limiters control retry timing.
Default Rate Limiter ¶
// DefaultControllerRateLimiter combines two strategies:
rateLimiter := workqueue.DefaultControllerRateLimiter()
// Equivalent to:
rateLimiter := workqueue.NewMaxOfRateLimiter(
// Exponential backoff: 5ms, 10ms, 20ms... up to 1000s
workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
// Overall rate limit: 10 qps, burst of 100
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
)
This means:
- First failure: retry after 5ms
- Second failure: retry after 10ms
- Third failure: retry after 20ms
- …exponentially increasing up to 1000 seconds
- Plus: overall queue is limited to 10 items/second with burst of 100
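You can watch the per-item backoff grow by asking the limiter when the same key may run again. A quick sketch (the bucket limiter adds no delay until its burst is exhausted, so the per-item exponential delay dominates here):
rl := workqueue.DefaultControllerRateLimiter()
for i := 0; i < 5; i++ {
    fmt.Println(rl.When("some-key")) // 5ms, 10ms, 20ms, 40ms, 80ms
}
rl.Forget("some-key") // resets the per-item backoff to the 5ms base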
Custom Rate Limiters ¶
// Faster retries, shorter max
rateLimiter := workqueue.NewItemExponentialFailureRateLimiter(
1*time.Millisecond, // base delay
30*time.Second, // max delay
)
// Fixed delay (no exponential backoff)
rateLimiter := workqueue.NewItemFastSlowRateLimiter(
5*time.Millisecond, // fast delay (first N attempts)
10*time.Second, // slow delay (after N attempts)
5, // N = number of fast attempts
)
// Per-item rate limiting with different delays
rateLimiter := workqueue.NewMaxOfRateLimiter(
workqueue.NewItemExponentialFailureRateLimiter(100*time.Millisecond, 60*time.Second),
workqueue.NewItemFastSlowRateLimiter(500*time.Millisecond, 5*time.Second, 4),
)
Rate Limiter Methods ¶
// How long to wait before retrying this item?
// (for the per-item limiters, each call to When also records a failure)
delay := rateLimiter.When(key)
// How many times has this item failed so far?
retries := rateLimiter.NumRequeues(key)
// Reset failure tracking for this item (call on success)
rateLimiter.Forget(key)
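A common pattern built on these methods is to cap retries instead of backing off forever. A sketch that slots into the worker loop from earlier (maxRetries is an arbitrary choice; runtime is k8s.io/apimachinery/pkg/util/runtime):
const maxRetries = 5

if err := reconcile(pod); err != nil {
    if queue.NumRequeues(key) < maxRetries {
        // Still within budget: retry with exponential backoff.
        queue.AddRateLimited(key)
        return true
    }
    // Too many failures: give up, reset backoff state, and surface the error.
    queue.Forget(key)
    runtime.HandleError(fmt.Errorf("dropping %q out of the queue: %v", key, err))
    return true
}
queue.Forget(key)
return true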
Client-Side Rate Limiting ¶
Separate from work queue rate limiting, you can rate limit API server requests:
import "k8s.io/client-go/rest"
config := &rest.Config{
Host: "https://kubernetes.default.svc",
// Rate limit API requests
QPS: 20, // Queries per second
Burst: 30, // Burst allowance
}
clientset, err := kubernetes.NewForConfig(config)
Default is 5 QPS with burst of 10. Increase for high-throughput controllers, but be mindful of API server load.
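In practice you usually load the config from a kubeconfig or the in-cluster environment and then raise the limits on it, rather than constructing a rest.Config by hand. A sketch (the kubeconfig path is illustrative):
config, err := clientcmd.BuildConfigFromFlags("", "/path/to/kubeconfig")
if err != nil {
    panic(err)
}
// Loosen the client-side throttle for a busier controller.
config.QPS = 50
config.Burst = 100
clientset, err := kubernetes.NewForConfig(config)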
The Complete Controller Pattern ¶
Putting it all together:
package main
import (
"context"
"fmt"
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
v1lister "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
)
type Controller struct {
clientset kubernetes.Interface
podLister v1lister.PodLister
podsSynced cache.InformerSynced
workqueue workqueue.RateLimitingInterface
}
func NewController(
clientset kubernetes.Interface,
podInformer cache.SharedIndexInformer,
podLister v1lister.PodLister,
) *Controller {
controller := &Controller{
clientset: clientset,
podLister: podLister,
podsSynced: podInformer.HasSynced,
workqueue: workqueue.NewNamedRateLimitingQueue(
workqueue.DefaultControllerRateLimiter(),
"Pods",
),
}
// Set up event handlers
podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.enqueuePod,
UpdateFunc: func(old, new interface{}) {
controller.enqueuePod(new)
},
DeleteFunc: controller.enqueuePod,
})
return controller
}
func (c *Controller) enqueuePod(obj interface{}) {
    // DeletionHandlingMetaNamespaceKeyFunc also unwraps the DeletedFinalStateUnknown
    // tombstones that delete events can carry (see "Handling Deleted Objects" below).
    key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
    if err != nil {
        runtime.HandleError(err)
        return
    }
    c.workqueue.Add(key)
}
func (c *Controller) Run(ctx context.Context, workers int) error {
defer runtime.HandleCrash()
defer c.workqueue.ShutDown()
klog.Info("Starting controller")
// Wait for caches to sync
klog.Info("Waiting for informer caches to sync")
if !cache.WaitForCacheSync(ctx.Done(), c.podsSynced) {
return fmt.Errorf("failed to wait for caches to sync")
}
klog.Info("Starting workers")
for i := 0; i < workers; i++ {
go wait.UntilWithContext(ctx, c.runWorker, time.Second)
}
klog.Info("Started workers")
<-ctx.Done()
klog.Info("Shutting down workers")
return nil
}
func (c *Controller) runWorker(ctx context.Context) {
for c.processNextWorkItem(ctx) {
}
}
func (c *Controller) processNextWorkItem(ctx context.Context) bool {
obj, shutdown := c.workqueue.Get()
if shutdown {
return false
}
defer c.workqueue.Done(obj)
key := obj.(string)
if err := c.syncHandler(ctx, key); err != nil {
// Requeue with rate limiting
c.workqueue.AddRateLimited(key)
runtime.HandleError(fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error()))
return true
}
// Success - forget this item so rate limiter resets
c.workqueue.Forget(obj)
klog.Infof("Successfully synced '%s'", key)
return true
}
func (c *Controller) syncHandler(ctx context.Context, key string) error {
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return nil // Invalid key, don't requeue
}
// Get pod from cache
pod, err := c.podLister.Pods(namespace).Get(name)
if errors.IsNotFound(err) {
// Pod deleted, nothing to do
klog.Infof("Pod %s deleted", key)
return nil
}
if err != nil {
return err // Requeue
}
// Your reconciliation logic here
klog.Infof("Reconciling pod %s/%s (phase: %s)",
pod.Namespace, pod.Name, pod.Status.Phase)
return nil
}
func main() {
// Build config
config, err := clientcmd.BuildConfigFromFlags("",
clientcmd.RecommendedHomeFile)
if err != nil {
klog.Fatalf("Error building config: %s", err)
}
// Create clientset
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
klog.Fatalf("Error creating clientset: %s", err)
}
// Create informer factory
factory := informers.NewSharedInformerFactory(clientset, 30*time.Minute)
podInformer := factory.Core().V1().Pods()
// Create controller
controller := NewController(
clientset,
podInformer.Informer(),
podInformer.Lister(),
)
// Start informers
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
factory.Start(ctx.Done())
// Run controller with 2 workers
if err := controller.Run(ctx, 2); err != nil {
klog.Fatalf("Error running controller: %s", err)
}
}
Advanced Patterns ¶
Watching Multiple Resources ¶
Controllers often need to watch multiple resource types:
type Controller struct {
podLister v1lister.PodLister
podsSynced cache.InformerSynced
serviceLister v1lister.ServiceLister
servicesSynced cache.InformerSynced
workqueue workqueue.RateLimitingInterface
}
func NewController(
podInformer coreinformers.PodInformer,
serviceInformer coreinformers.ServiceInformer,
) *Controller {
c := &Controller{
podLister: podInformer.Lister(),
podsSynced: podInformer.Informer().HasSynced,
serviceLister: serviceInformer.Lister(),
servicesSynced: serviceInformer.Informer().HasSynced,
workqueue: workqueue.NewRateLimitingQueue(...),
}
// Watch pods
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.handlePod,
UpdateFunc: func(old, new interface{}) { c.handlePod(new) },
DeleteFunc: c.handlePod,
})
// Watch services - enqueue related pods
serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.handleService,
UpdateFunc: func(old, new interface{}) { c.handleService(new) },
DeleteFunc: c.handleService,
})
return c
}
func (c *Controller) handleService(obj interface{}) {
svc := obj.(*v1.Service)
// Find pods selected by this service
selector := labels.SelectorFromSet(svc.Spec.Selector)
pods, _ := c.podLister.Pods(svc.Namespace).List(selector)
// Enqueue each related pod
for _, pod := range pods {
c.enqueuePod(pod)
}
}
// Wait for ALL caches
func (c *Controller) Run(ctx context.Context, workers int) error {
if !cache.WaitForCacheSync(ctx.Done(), c.podsSynced, c.servicesSynced) {
return fmt.Errorf("caches failed to sync")
}
// ...
}
Owner Reference Lookups ¶
When a child resource changes, find and enqueue the parent:
func (c *Controller) handlePod(obj interface{}) {
pod := obj.(*v1.Pod)
// Find owner ReplicaSet
ownerRef := metav1.GetControllerOf(pod)
if ownerRef == nil || ownerRef.Kind != "ReplicaSet" {
return
}
// Enqueue the ReplicaSet (not the pod)
c.workqueue.Add(pod.Namespace + "/" + ownerRef.Name)
}
Resync Period Considerations ¶
The resync period triggers UpdateFunc for all cached objects periodically:
// 30 minute resync
factory := informers.NewSharedInformerFactory(clientset, 30*time.Minute)
Why resync?
- Recover if a handler missed or mishandled an event (the cache itself stays in sync via the watch)
- Re-drive reconciliation toward the desired state even when nothing changed
- Periodic health check of reconciliation
Trade-offs:
- Shorter: More consistent, more CPU/API load
- Longer: Less overhead, higher staleness risk
- Zero (0): No resync; purely event-driven
For most controllers, 10-30 minutes is reasonable.
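If controllers sharing a factory need different resync behaviour, each handler can request its own period when it registers; the factory's period is just the default. A sketch, assuming the podInformer from earlier (client-go may round the requested value relative to the informer's own resync period):
podInformer.AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{
    UpdateFunc: func(old, new interface{}) {
        // Fires for every cached pod at least roughly every 10 minutes.
    },
}, 10*time.Minute)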
Event Filtering ¶
Don’t enqueue if nothing meaningful changed:
podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: func(old, new interface{}) {
oldPod := old.(*v1.Pod)
newPod := new.(*v1.Pod)
// Skip if resource version unchanged (resync)
if oldPod.ResourceVersion == newPod.ResourceVersion {
return
}
// Skip if only status changed (we only care about spec)
if reflect.DeepEqual(oldPod.Spec, newPod.Spec) {
return
}
c.enqueuePod(newPod)
},
})
Handling Deleted Objects ¶
Delete events can be tricky — sometimes you get a DeletedFinalStateUnknown:
DeleteFunc: func(obj interface{}) {
// Handle DeletedFinalStateUnknown wrapper
if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
obj = tombstone.Obj
}
pod, ok := obj.(*v1.Pod)
if !ok {
runtime.HandleError(fmt.Errorf("unexpected object type: %T", obj))
return
}
c.enqueuePod(pod)
},
DeletedFinalStateUnknown occurs when the informer missed the actual delete event (typically because its watch was disconnected) and only discovers the deletion on the next re-list; the tombstone wraps the last state the informer saw.
Index Functions ¶
Add custom indexes for efficient lookups:
// Add index by node name
podInformer.Informer().AddIndexers(cache.Indexers{
"byNode": func(obj interface{}) ([]string, error) {
pod := obj.(*v1.Pod)
if pod.Spec.NodeName == "" {
return nil, nil
}
return []string{pod.Spec.NodeName}, nil
},
})
// Query by index (ByIndex returns []interface{}; type-assert each element to *v1.Pod)
indexer := podInformer.Informer().GetIndexer()
pods, err := indexer.ByIndex("byNode", "worker-1")
Common Mistakes ¶
1. Blocking in Event Handlers ¶
// Wrong: blocks all events
AddFunc: func(obj interface{}) {
pod := obj.(*v1.Pod)
time.Sleep(5 * time.Second) // Blocks!
processExpensiveThing(pod)
}
// Right: just enqueue
AddFunc: func(obj interface{}) {
key, _ := cache.MetaNamespaceKeyFunc(obj)
queue.Add(key)
}
2. Using Stale Object from Event ¶
// Wrong: uses potentially stale object from event
func (c *Controller) sync(key string) error {
// pod from event might be outdated
return c.reconcile(c.eventPod)
}
// Right: fetch fresh from cache
func (c *Controller) sync(key string) error {
ns, name, _ := cache.SplitMetaNamespaceKey(key)
pod, _ := c.podLister.Pods(ns).Get(name)
return c.reconcile(pod)
}
3. Forgetting cache.WaitForCacheSync ¶
// Wrong: processing before cache is populated
factory.Start(stopCh)
// Immediately start processing... but cache is empty!
// Right: wait for sync
factory.Start(stopCh)
if !cache.WaitForCacheSync(stopCh, informer.HasSynced) {
return fmt.Errorf("cache sync failed")
}
// Now cache is populated
4. Not Calling Done() ¶
// Wrong: item stays "processing" forever
func (c *Controller) processNext() bool {
obj, _ := c.queue.Get()
// forgot queue.Done(obj)!
return true
}
// Right: always call Done
func (c *Controller) processNext() bool {
obj, _ := c.queue.Get()
defer c.queue.Done(obj)
// ...
return true
}
5. Forgetting to Forget ¶
// Wrong: rate limiter keeps growing delay forever
if err := c.sync(key); err != nil {
c.queue.AddRateLimited(key)
return
}
// Success but forgot to reset rate limiter
// Right: reset on success
if err := c.sync(key); err != nil {
c.queue.AddRateLimited(key)
return
}
c.queue.Forget(key) // Reset rate limiter state
Testing Controllers ¶
Unit Testing with Fake Client ¶
import (
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/informers"
)
func TestController(t *testing.T) {
// Create fake clientset with initial objects
clientset := fake.NewSimpleClientset(
&v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test-pod",
Namespace: "default",
},
},
)
// Create informer factory
factory := informers.NewSharedInformerFactory(clientset, 0)
podInformer := factory.Core().V1().Pods()
// Create controller (same constructor as the complete example above)
controller := NewController(clientset, podInformer.Informer(), podInformer.Lister())
// Start informers, then run the controller in the background
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
factory.Start(ctx.Done())
cache.WaitForCacheSync(ctx.Done(), podInformer.Informer().HasSynced)
go controller.Run(ctx, 1)
// Test: add a pod
clientset.CoreV1().Pods("default").Create(ctx, &v1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "new-pod"},
}, metav1.CreateOptions{})
// Wait for controller to process
time.Sleep(100 * time.Millisecond)
// Assert expected behavior
// ...
}
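The fixed time.Sleep is the flaky part of that test; polling for the expected outcome is more robust. One way to do it with apimachinery's wait helpers (the condition body is a placeholder for whatever side effect your controller produces):
// Poll every 10ms, for up to 2s, until the controller has done its work.
err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 2*time.Second, true,
    func(ctx context.Context) (bool, error) {
        // e.g. check for an annotation the controller adds, or inspect
        // the fake clientset's recorded actions.
        return controllerDidItsThing(), nil
    })
if err != nil {
    t.Fatalf("controller never reconciled: %v", err)
}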
Integration Testing ¶
For integration tests, use envtest from controller-runtime, which spins up a real API server:
import (
"sigs.k8s.io/controller-runtime/pkg/envtest"
)
func TestIntegration(t *testing.T) {
testEnv := &envtest.Environment{}
cfg, err := testEnv.Start()
if err != nil {
t.Fatal(err)
}
defer testEnv.Stop()
// cfg is a real *rest.Config pointing to the test API server
clientset, _ := kubernetes.NewForConfig(cfg)
// ... run tests against real API server
}
Summary ¶
client-go provides battle-tested primitives for building Kubernetes controllers:
| Component | Purpose | Key Points |
|---|---|---|
| Informer | Watch + cache | LIST once, WATCH forever, local cache |
| SharedInformerFactory | Share informers | One watch per resource type |
| Lister | Read cache | Fast, no API calls |
| Work Queue | Async processing | Deduplication, rate limiting |
| Rate Limiter | Backoff/retry | Exponential backoff on failures |
The pattern:
- Informer watches resources, maintains cache
- Event handlers enqueue keys (fast, non-blocking)
- Workers dequeue and reconcile (using fresh cache data)
- Rate limiter controls retry timing on failures
This is the same pattern used by every built-in Kubernetes controller. Master it, and you can build controllers that scale.