client-go Patterns: Informers, Work Queues, and Rate Limiting


You’re building a Kubernetes controller. You could poll the API server every few seconds, but that doesn’t scale. You could set up a watch, but then you need to handle disconnections, resyncs, and state reconciliation yourself. Or you could use client-go’s battle-tested primitives that handle all of this for you.

This post covers the core building blocks: Informers for efficient API watching, work queues for reliable event processing, and rate limiters for backoff and retries.

The naive approach to watching Kubernetes resources:

// Don't do this
for {
    pods, err := clientset.CoreV1().Pods("").List(ctx, metav1.ListOptions{})
    if err != nil {
        log.Error(err)
        continue
    }
    
    for _, pod := range pods.Items {
        processPod(pod)
    }
    
    time.Sleep(5 * time.Second)
}

Problems:

  1. Expensive: Every LIST fetches all objects, even unchanged ones
  2. Delayed: 5-second polling means up to 5 seconds of staleness
  3. Scales poorly: More objects = more data transferred each poll
  4. Lossy: Rapid changes between polls can be missed entirely

Kubernetes provides WATCH for efficient change streaming:

// Better, but still manual
watcher, err := clientset.CoreV1().Pods("").Watch(ctx, metav1.ListOptions{})
if err != nil {
    return err
}

for event := range watcher.ResultChan() {
    pod := event.Object.(*v1.Pod)
    switch event.Type {
    case watch.Added:
        handleAdd(pod)
    case watch.Modified:
        handleUpdate(pod)
    case watch.Deleted:
        handleDelete(pod)
    }
}

Better, but you still need to handle:

  • Watch disconnections (network blips, API server restarts)
  • Bookmark events and resource versions
  • Initial LIST to populate state before watching
  • Resyncs when the watch falls behind

client-go’s Informers handle all of this.

An Informer combines LIST and WATCH into a single abstraction that maintains a local cache of objects and notifies you of changes.

import (
    "fmt"
    "time"

    v1 "k8s.io/api/core/v1"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
)

func main() {
    // Create clientset
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        panic(err)
    }

    // Create shared informer factory
    factory := informers.NewSharedInformerFactory(clientset, 30*time.Minute)

    // Get pod informer
    podInformer := factory.Core().V1().Pods().Informer()

    // Add event handlers
    podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            pod := obj.(*v1.Pod)
            fmt.Printf("Pod added: %s/%s\n", pod.Namespace, pod.Name)
        },
        UpdateFunc: func(oldObj, newObj interface{}) {
            newPod := newObj.(*v1.Pod)
            fmt.Printf("Pod updated: %s/%s\n", newPod.Namespace, newPod.Name)
        },
        DeleteFunc: func(obj interface{}) {
            pod := obj.(*v1.Pod)
            fmt.Printf("Pod deleted: %s/%s\n", pod.Namespace, pod.Name)
        },
    })

    // Start informer
    stopCh := make(chan struct{})
    factory.Start(stopCh)

    // Wait for cache sync
    if !cache.WaitForCacheSync(stopCh, podInformer.HasSynced) {
        panic("failed to sync cache")
    }

    // Now the cache is populated and handlers are receiving events
    <-stopCh
}

Here's what happens under the hood:

1. Initial LIST
   Informer calls LIST to get all current objects
   Stores them in local cache (thread-safe store)
   Triggers AddFunc for each existing object

2. Start WATCH
   Informer opens WATCH from the LIST's resourceVersion
   Receives streaming events (ADDED, MODIFIED, DELETED)

3. Event Processing
   Each event updates the local cache
   Triggers appropriate handler (Add/Update/Delete)

4. Resync (periodic)
   Every resyncPeriod, triggers UpdateFunc for ALL cached objects
   Ensures eventual consistency even if events were missed

5. Watch Reconnection
   If the watch drops, the Informer resumes from the last known resourceVersion
   (falling back to a fresh LIST if that version has expired) and reconciles
   the local cache with the result

Informers maintain an in-memory cache (called a Store) that you can query directly:

// Get the Lister (read-only cache interface)
podLister := factory.Core().V1().Pods().Lister()

// List all pods (from cache, not API server)
pods, err := podLister.List(labels.Everything())

// List pods in a namespace
pods, err := podLister.Pods("default").List(labels.Everything())

// Get a specific pod
pod, err := podLister.Pods("default").Get("my-pod")

Cache reads are:

  • Fast: No network call, just memory access
  • Eventually consistent: May be slightly behind API server
  • Thread-safe: Safe to call from any goroutine
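
One caveat: the objects a Lister returns are pointers into the shared informer cache, so treat them as read-only. If you need to modify one, DeepCopy it first and send the copy to the API server. A minimal sketch (the label update is illustrative):

// Objects from the cache are shared; never mutate them in place
pod, err := podLister.Pods("default").Get("my-pod")
if err != nil {
    return err
}

podCopy := pod.DeepCopy()
if podCopy.Labels == nil {
    podCopy.Labels = map[string]string{}
}
podCopy.Labels["processed"] = "true"

// Persist the change through the API server
_, err = clientset.CoreV1().Pods("default").Update(ctx, podCopy, metav1.UpdateOptions{})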

You rarely create Informers directly. Instead, use SharedInformerFactory:

// Without factory: each informer has its own watch
podInformer1 := cache.NewSharedInformer(podListWatcher, &v1.Pod{}, resyncPeriod)
podInformer2 := cache.NewSharedInformer(podListWatcher, &v1.Pod{}, resyncPeriod)
// Two watches to API server for the same resource!

// With factory: informers are shared
factory := informers.NewSharedInformerFactory(clientset, resyncPeriod)
podInformer1 := factory.Core().V1().Pods().Informer()
podInformer2 := factory.Core().V1().Pods().Informer()
// Same informer, one watch, multiple handlers

The factory ensures only one watch per resource type, regardless of how many controllers need it.

Watch only what you need:

// Only watch pods in "production" namespace
factory := informers.NewSharedInformerFactoryWithOptions(
    clientset,
    resyncPeriod,
    informers.WithNamespace("production"),
)

// Only watch pods with specific labels
factory := informers.NewSharedInformerFactoryWithOptions(
    clientset,
    resyncPeriod,
    informers.WithTweakListOptions(func(opts *metav1.ListOptions) {
        opts.LabelSelector = "app=myapp"
    }),
)

Filtering at the API server level reduces memory usage and network traffic.
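
The same WithTweakListOptions hook accepts field selectors as well. For example, a node-local agent might watch only pods scheduled to its own node (the node name here is illustrative):

// Only watch pods bound to a specific node
factory := informers.NewSharedInformerFactoryWithOptions(
    clientset,
    resyncPeriod,
    informers.WithTweakListOptions(func(opts *metav1.ListOptions) {
        opts.FieldSelector = "spec.nodeName=worker-1"
    }),
)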

Event handlers are invoked sequentially, one event at a time per handler. If your handler is slow or blocks, delivery of every subsequent event to that handler is delayed. The solution: queue events for async processing.

// Bad: slow handler blocks all events
podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc: func(obj interface{}) {
        pod := obj.(*v1.Pod)
        // This takes 5 seconds...
        doExpensiveReconciliation(pod)
        // All other events are blocked!
    },
})

Instead, have handlers do nothing but enqueue a key, and let worker goroutines do the heavy lifting:

import "k8s.io/client-go/util/workqueue"

// Create a rate-limited work queue
queue := workqueue.NewRateLimitingQueue(
    workqueue.DefaultControllerRateLimiter(),
)

// Handler just enqueues keys
podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc: func(obj interface{}) {
        key, err := cache.MetaNamespaceKeyFunc(obj)
        if err == nil {
            queue.Add(key)  // Fast: just adds string to queue
        }
    },
    UpdateFunc: func(old, new interface{}) {
        key, err := cache.MetaNamespaceKeyFunc(new)
        if err == nil {
            queue.Add(key)  // Deduplicates: same key won't queue twice
        }
    },
    DeleteFunc: func(obj interface{}) {
        key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
        if err == nil {
            queue.Add(key)
        }
    },
})

// Worker goroutines process the queue
for i := 0; i < workerCount; i++ {
    go func() {
        for processNextItem(queue, podLister) {
        }
    }()
}

Each worker pulls one key at a time and reconciles it:

func processNextItem(queue workqueue.RateLimitingInterface, lister v1lister.PodLister) bool {
    // Get next item (blocks if queue is empty)
    key, shutdown := queue.Get()
    if shutdown {
        return false
    }
    
    // Tell queue we're done with this item when function returns
    defer queue.Done(key)
    
    // Parse the key
    namespace, name, err := cache.SplitMetaNamespaceKey(key.(string))
    if err != nil {
        // Invalid key, don't requeue
        queue.Forget(key)
        return true
    }
    
    // Get object from cache
    pod, err := lister.Pods(namespace).Get(name)
    if errors.IsNotFound(err) {
        // Object deleted, nothing to do
        queue.Forget(key)
        return true
    }
    if err != nil {
        // Transient error, requeue with rate limiting
        queue.AddRateLimited(key)
        return true
    }
    
    // Do actual reconciliation
    if err := reconcile(pod); err != nil {
        // Reconciliation failed, requeue with rate limiting
        queue.AddRateLimited(key)
        return true
    }
    
    // Success! Remove from rate limiter tracking
    queue.Forget(key)
    return true
}

Keys, not objects: We queue string keys (namespace/name), not full objects. This means:

  • Deduplication: Multiple rapid events for same object = one queue item
  • Freshness: Worker reads latest state from cache, not stale event data
  • Memory efficiency: Queue holds strings, not large objects

Done(): Always call queue.Done(key) when finished processing. This marks the item as no longer being processed.

Forget(): Call queue.Forget(key) on success to reset rate limiter state for this key.

AddRateLimited(): Call on failure to requeue with exponential backoff.

client-go provides several queue implementations:

// Basic FIFO queue
queue := workqueue.New()

// Delayed queue (items become ready after delay)
queue := workqueue.NewDelayingQueue()

// Rate-limited queue (exponential backoff on failures)
queue := workqueue.NewRateLimitingQueue(rateLimiter)

// Named queue (for metrics)
queue := workqueue.NewNamedRateLimitingQueue(rateLimiter, "my-controller")

For controllers, always use RateLimitingQueue.
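
Recent client-go releases also ship generic, typed variants of these queues (the untyped constructors above are kept there as deprecated aliases). A sketch, assuming a client-go version that includes the typed workqueue API:

// Typed queue: keys are plain strings, no interface{} assertions needed
queue := workqueue.NewTypedRateLimitingQueue(
    workqueue.DefaultTypedControllerRateLimiter[string](),
)

queue.Add("default/my-pod")
key, shutdown := queue.Get()  // key is a string, shutdown a bool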

When reconciliation fails, you want to retry—but not immediately in a tight loop. Rate limiters control retry timing.

// DefaultControllerRateLimiter combines two strategies:
rateLimiter := workqueue.DefaultControllerRateLimiter()

// Equivalent to:
rateLimiter := workqueue.NewMaxOfRateLimiter(
    // Exponential backoff: 5ms, 10ms, 20ms... up to 1000s
    workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
    // Overall rate limit: 10 qps, burst of 100
    &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
)

This means:

  • First failure: retry after 5ms
  • Second failure: retry after 10ms
  • Third failure: retry after 20ms
  • …exponentially increasing up to 1000 seconds
  • Plus: overall queue is limited to 10 items/second with burst of 100
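
A common companion pattern is to cap retries: check NumRequeues before requeueing and drop the key once it has failed too many times. A sketch (maxRetries and the error handling are illustrative):

import (
    "fmt"

    "k8s.io/apimachinery/pkg/util/runtime"
    "k8s.io/client-go/util/workqueue"
)

const maxRetries = 5

// handleErr decides what to do with a key after a reconcile attempt
func handleErr(queue workqueue.RateLimitingInterface, key interface{}, err error) {
    if err == nil {
        // Success: reset backoff tracking for this key
        queue.Forget(key)
        return
    }

    if queue.NumRequeues(key) < maxRetries {
        // Retry with exponential backoff
        queue.AddRateLimited(key)
        return
    }

    // Too many failures: give up so the key doesn't churn forever
    runtime.HandleError(fmt.Errorf("dropping %v out of the queue: %v", key, err))
    queue.Forget(key)
}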

If the defaults don't fit, you can build your own rate limiter:

// Faster retries, shorter max
rateLimiter := workqueue.NewItemExponentialFailureRateLimiter(
    1*time.Millisecond,   // base delay
    30*time.Second,       // max delay
)

// Fixed delay (no exponential backoff)
rateLimiter := workqueue.NewItemFastSlowRateLimiter(
    5*time.Millisecond,   // fast delay (first N attempts)
    10*time.Second,       // slow delay (after N attempts)
    5,                    // N = number of fast attempts
)

// Per-item rate limiting with different delays
rateLimiter := workqueue.NewMaxOfRateLimiter(
    workqueue.NewItemExponentialFailureRateLimiter(100*time.Millisecond, 60*time.Second),
    workqueue.NewItemFastSlowRateLimiter(500*time.Millisecond, 5*time.Second, 4),
)

The rate limiter interface itself is small:

// How long should this item wait before being processed?
// (the per-item limiters also count this call as a failure)
delay := rateLimiter.When(key)

// How many times has this item failed so far?
failures := rateLimiter.NumRequeues(key)

// Reset failure tracking for this item (call on success)
rateLimiter.Forget(key)

Separate from work queue rate limiting, you can rate limit API server requests:

import "k8s.io/client-go/rest"

config := &rest.Config{
    Host: "https://kubernetes.default.svc",
    
    // Rate limit API requests
    QPS:   20,    // Queries per second
    Burst: 30,    // Burst allowance
}

clientset, err := kubernetes.NewForConfig(config)

Default is 5 QPS with burst of 10. Increase for high-throughput controllers, but be mindful of API server load.

Putting it all together:

package main

import (
    "context"
    "fmt"
    "time"

    v1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/errors"
    "k8s.io/apimachinery/pkg/util/runtime"
    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    v1lister "k8s.io/client-go/listers/core/v1"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/client-go/util/workqueue"
    "k8s.io/klog/v2"
)

type Controller struct {
    clientset  kubernetes.Interface
    podLister  v1lister.PodLister
    podsSynced cache.InformerSynced
    workqueue  workqueue.RateLimitingInterface
}

func NewController(
    clientset kubernetes.Interface,
    podInformer cache.SharedIndexInformer,
    podLister v1lister.PodLister,
) *Controller {
    controller := &Controller{
        clientset:  clientset,
        podLister:  podLister,
        podsSynced: podInformer.HasSynced,
        workqueue: workqueue.NewNamedRateLimitingQueue(
            workqueue.DefaultControllerRateLimiter(),
            "Pods",
        ),
    }

    // Set up event handlers
    podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: controller.enqueuePod,
        UpdateFunc: func(old, new interface{}) {
            controller.enqueuePod(new)
        },
        DeleteFunc: controller.enqueuePod,
    })

    return controller
}

func (c *Controller) enqueuePod(obj interface{}) {
    // DeletionHandlingMetaNamespaceKeyFunc also copes with the
    // DeletedFinalStateUnknown tombstones handed to DeleteFunc
    key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
    if err != nil {
        runtime.HandleError(err)
        return
    }
    c.workqueue.Add(key)
}

func (c *Controller) Run(ctx context.Context, workers int) error {
    defer runtime.HandleCrash()
    defer c.workqueue.ShutDown()

    klog.Info("Starting controller")

    // Wait for caches to sync
    klog.Info("Waiting for informer caches to sync")
    if !cache.WaitForCacheSync(ctx.Done(), c.podsSynced) {
        return fmt.Errorf("failed to wait for caches to sync")
    }

    klog.Info("Starting workers")
    for i := 0; i < workers; i++ {
        go wait.UntilWithContext(ctx, c.runWorker, time.Second)
    }

    klog.Info("Started workers")
    <-ctx.Done()
    klog.Info("Shutting down workers")

    return nil
}

func (c *Controller) runWorker(ctx context.Context) {
    for c.processNextWorkItem(ctx) {
    }
}

func (c *Controller) processNextWorkItem(ctx context.Context) bool {
    obj, shutdown := c.workqueue.Get()
    if shutdown {
        return false
    }
    defer c.workqueue.Done(obj)

    key := obj.(string)
    
    if err := c.syncHandler(ctx, key); err != nil {
        // Requeue with rate limiting
        c.workqueue.AddRateLimited(key)
        runtime.HandleError(fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error()))
        return true
    }

    // Success - forget this item so rate limiter resets
    c.workqueue.Forget(obj)
    klog.Infof("Successfully synced '%s'", key)
    return true
}

func (c *Controller) syncHandler(ctx context.Context, key string) error {
    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        return nil // Invalid key, don't requeue
    }

    // Get pod from cache
    pod, err := c.podLister.Pods(namespace).Get(name)
    if errors.IsNotFound(err) {
        // Pod deleted, nothing to do
        klog.Infof("Pod %s deleted", key)
        return nil
    }
    if err != nil {
        return err // Requeue
    }

    // Your reconciliation logic here
    klog.Infof("Reconciling pod %s/%s (phase: %s)", 
        pod.Namespace, pod.Name, pod.Status.Phase)

    return nil
}

func main() {
    // Build config
    config, err := clientcmd.BuildConfigFromFlags("", 
        clientcmd.RecommendedHomeFile)
    if err != nil {
        klog.Fatalf("Error building config: %s", err)
    }

    // Create clientset
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        klog.Fatalf("Error creating clientset: %s", err)
    }

    // Create informer factory
    factory := informers.NewSharedInformerFactory(clientset, 30*time.Minute)
    podInformer := factory.Core().V1().Pods()

    // Create controller
    controller := NewController(
        clientset,
        podInformer.Informer(),
        podInformer.Lister(),
    )

    // Start informers
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    factory.Start(ctx.Done())

    // Run controller with 2 workers
    if err := controller.Run(ctx, 2); err != nil {
        klog.Fatalf("Error running controller: %s", err)
    }
}

Controllers often need to watch multiple resource types:

type Controller struct {
    podLister       v1lister.PodLister
    podsSynced      cache.InformerSynced
    serviceLister   v1lister.ServiceLister
    servicesSynced  cache.InformerSynced
    workqueue       workqueue.RateLimitingInterface
}

func NewController(
    podInformer coreinformers.PodInformer,
    serviceInformer coreinformers.ServiceInformer,
) *Controller {
    c := &Controller{
        podLister:      podInformer.Lister(),
        podsSynced:     podInformer.Informer().HasSynced,
        serviceLister:  serviceInformer.Lister(),
        servicesSynced: serviceInformer.Informer().HasSynced,
        workqueue:      workqueue.NewRateLimitingQueue(...),
    }

    // Watch pods
    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    c.handlePod,
        UpdateFunc: func(old, new interface{}) { c.handlePod(new) },
        DeleteFunc: c.handlePod,
    })

    // Watch services - enqueue related pods
    serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    c.handleService,
        UpdateFunc: func(old, new interface{}) { c.handleService(new) },
        DeleteFunc: c.handleService,
    })

    return c
}

func (c *Controller) handleService(obj interface{}) {
    svc := obj.(*v1.Service)

    // Services without a selector (e.g. ExternalName) select no pods
    if len(svc.Spec.Selector) == 0 {
        return
    }

    // Find pods selected by this service
    selector := labels.SelectorFromSet(svc.Spec.Selector)
    pods, _ := c.podLister.Pods(svc.Namespace).List(selector)
    
    // Enqueue each related pod
    for _, pod := range pods {
        c.enqueuePod(pod)
    }
}

// Wait for ALL caches
func (c *Controller) Run(ctx context.Context, workers int) error {
    if !cache.WaitForCacheSync(ctx.Done(), c.podsSynced, c.servicesSynced) {
        return fmt.Errorf("caches failed to sync")
    }
    // ...
}

When a child resource changes, find and enqueue the parent:

func (c *Controller) handlePod(obj interface{}) {
    pod := obj.(*v1.Pod)
    
    // Find owner ReplicaSet
    ownerRef := metav1.GetControllerOf(pod)
    if ownerRef == nil || ownerRef.Kind != "ReplicaSet" {
        return
    }
    
    // Enqueue the ReplicaSet (not the pod)
    c.workqueue.Add(pod.Namespace + "/" + ownerRef.Name)
}

The resync period triggers UpdateFunc for all cached objects periodically:

// 30 minute resync
factory := informers.NewSharedInformerFactory(clientset, 30*time.Minute)

Why resync?

  • Catch missed events (network issues, bugs)
  • Ensure eventual consistency
  • Periodic health check of reconciliation

Trade-offs:

  • Shorter: More consistent, more CPU/API load
  • Longer: Less overhead, higher staleness risk
  • Zero (0): No resync (only event-driven)

For most controllers, 10-30 minutes is reasonable.
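
If one handler needs a different cadence than the factory default, SharedInformer also exposes AddEventHandlerWithResyncPeriod. A sketch (the 5-minute period is illustrative):

// This handler's UpdateFunc is also invoked on its own 5-minute resync,
// independent of other handlers on the same shared informer
podInformer.AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{
    UpdateFunc: func(old, new interface{}) {
        // periodic reconciliation hook
    },
}, 5*time.Minute)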

Don’t enqueue if nothing meaningful changed:

podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
    UpdateFunc: func(old, new interface{}) {
        oldPod := old.(*v1.Pod)
        newPod := new.(*v1.Pod)
        
        // Skip if resource version unchanged (resync)
        if oldPod.ResourceVersion == newPod.ResourceVersion {
            return
        }
        
        // Skip if only status changed (we only care about spec)
        if reflect.DeepEqual(oldPod.Spec, newPod.Spec) {
            return
        }
        
        c.enqueuePod(newPod)
    },
})

Delete events can be tricky — sometimes you get a DeletedFinalStateUnknown:

DeleteFunc: func(obj interface{}) {
    // Handle DeletedFinalStateUnknown wrapper
    if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
        obj = tombstone.Obj
    }
    
    pod, ok := obj.(*v1.Pod)
    if !ok {
        runtime.HandleError(fmt.Errorf("unexpected object type: %T", obj))
        return
    }
    
    c.enqueuePod(pod)
},

DeletedFinalStateUnknown occurs when the informer missed the actual delete event (typically after a watch disconnection) and only discovers the deletion on a subsequent re-list; the tombstone wraps the last known state of the object.
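
If all your DeleteFunc does is enqueue a key, cache.DeletionHandlingMetaNamespaceKeyFunc (used in the work queue example earlier) unwraps the tombstone for you:

DeleteFunc: func(obj interface{}) {
    // Returns the key whether obj is a *v1.Pod or a DeletedFinalStateUnknown
    key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
    if err != nil {
        runtime.HandleError(err)
        return
    }
    queue.Add(key)
},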

Add custom indexes for efficient lookups:

// Add index by node name (indexers must be registered before the informer starts)
podInformer.Informer().AddIndexers(cache.Indexers{
    "byNode": func(obj interface{}) ([]string, error) {
        pod := obj.(*v1.Pod)
        if pod.Spec.NodeName == "" {
            return nil, nil
        }
        return []string{pod.Spec.NodeName}, nil
    },
})

// Query by index
indexer := podInformer.Informer().GetIndexer()
pods, err := indexer.ByIndex("byNode", "worker-1")

Some common mistakes to avoid. First, don't do expensive work in event handlers:

// Wrong: blocks all events
AddFunc: func(obj interface{}) {
    pod := obj.(*v1.Pod)
    time.Sleep(5 * time.Second)  // Blocks!
    processExpensiveThing(pod)
}

// Right: just enqueue
AddFunc: func(obj interface{}) {
    key, _ := cache.MetaNamespaceKeyFunc(obj)
    queue.Add(key)
}

Second, reconcile against the cache, not the event object:

// Wrong: uses potentially stale object from event
func (c *Controller) sync(key string) error {
    // pod from event might be outdated
    return c.reconcile(c.eventPod)
}

// Right: fetch fresh from cache
func (c *Controller) sync(key string) error {
    ns, name, _ := cache.SplitMetaNamespaceKey(key)
    pod, _ := c.podLister.Pods(ns).Get(name)
    return c.reconcile(pod)
}

Third, wait for the cache to sync before processing:

// Wrong: processing before cache is populated
factory.Start(stopCh)
// Immediately start processing... but cache is empty!

// Right: wait for sync
factory.Start(stopCh)
if !cache.WaitForCacheSync(stopCh, informer.HasSynced) {
    return fmt.Errorf("cache sync failed")
}
// Now cache is populated

Fourth, always mark items as Done:

// Wrong: item stays "processing" forever
func (c *Controller) processNext() bool {
    obj, _ := c.queue.Get()
    // forgot queue.Done(obj)!
    return true
}

// Right: always call Done
func (c *Controller) processNext() bool {
    obj, _ := c.queue.Get()
    defer c.queue.Done(obj)
    // ...
    return true
}

Finally, call Forget on success so backoff resets:

// Wrong: rate limiter keeps growing delay forever
if err := c.sync(key); err != nil {
    c.queue.AddRateLimited(key)
    return
}
// Success but forgot to reset rate limiter

// Right: reset on success
if err := c.sync(key); err != nil {
    c.queue.AddRateLimited(key)
    return
}
c.queue.Forget(key)  // Reset rate limiter state

For unit tests, the fake clientset plugs into the same informer machinery:

import (
    "k8s.io/client-go/kubernetes/fake"
    "k8s.io/client-go/informers"
)

func TestController(t *testing.T) {
    // Create fake clientset with initial objects
    clientset := fake.NewSimpleClientset(
        &v1.Pod{
            ObjectMeta: metav1.ObjectMeta{
                Name:      "test-pod",
                Namespace: "default",
            },
        },
    )

    // Create informer factory
    factory := informers.NewSharedInformerFactory(clientset, 0)
    podInformer := factory.Core().V1().Pods()

    // Create controller
    controller := NewController(clientset, podInformer.Informer(), podInformer.Lister())

    // Start informers
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    factory.Start(ctx.Done())
    cache.WaitForCacheSync(ctx.Done(), podInformer.Informer().HasSynced)

    // Run the controller in the background so a worker drains the queue
    go controller.Run(ctx, 1)

    // Test: add a pod
    clientset.CoreV1().Pods("default").Create(ctx, &v1.Pod{
        ObjectMeta: metav1.ObjectMeta{Name: "new-pod"},
    }, metav1.CreateOptions{})

    // Wait for controller to process
    time.Sleep(100 * time.Millisecond)

    // Assert expected behavior
    // ...
}

For integration tests, use envtest from controller-runtime, which spins up a real API server:

import (
    "sigs.k8s.io/controller-runtime/pkg/envtest"
)

func TestIntegration(t *testing.T) {
    testEnv := &envtest.Environment{}
    cfg, err := testEnv.Start()
    if err != nil {
        t.Fatal(err)
    }
    defer testEnv.Stop()

    // cfg is a real *rest.Config pointing to the test API server
    clientset, _ := kubernetes.NewForConfig(cfg)
    // ... run tests against real API server
}

client-go provides battle-tested primitives for building Kubernetes controllers:

Component               Purpose             Key Points
---------------------   -----------------   -------------------------------------
Informer                Watch + cache       LIST once, WATCH forever, local cache
SharedInformerFactory   Share informers     One watch per resource type
Lister                  Read cache          Fast, no API calls
Work Queue              Async processing    Deduplication, rate limiting
Rate Limiter            Backoff/retry       Exponential backoff on failures

The pattern:

  1. Informer watches resources, maintains cache
  2. Event handlers enqueue keys (fast, non-blocking)
  3. Workers dequeue and reconcile (using fresh cache data)
  4. Rate limiter controls retry timing on failures

This is the same pattern used by every built-in Kubernetes controller. Master it, and you can build controllers that scale.