Skip to content

Commit 1b8198f

Browse files
authored
Merge pull request #9356 from abdelrahman882/buffers_metric
Add capacity buffer oldest reconciliation timestamp metric
2 parents 2180a5a + 457155b commit 1b8198f

File tree

9 files changed

+735
-32
lines changed

9 files changed

+735
-32
lines changed

cluster-autoscaler/builder/autoscaler.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -199,8 +199,7 @@ func (b *AutoscalerBuilder) Build(ctx context.Context) (core.Autoscaler, *loop.L
199199
} else {
200200
fakePodsResolver = fakepods.NewDefaultingResolver()
201201
}
202-
nodeBufferController := cbctrl.NewDefaultBufferController(capacitybufferClient, fakePodsResolver)
203-
go nodeBufferController.Run(ctx.Done())
202+
cbctrl.InitializeAndRunDefaultBufferController(ctx, capacitybufferClient, fakePodsResolver)
204203
}
205204
}
206205

cluster-autoscaler/capacitybuffer/controller/controller.go

Lines changed: 53 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package controller
1818

1919
import (
20+
"context"
2021
"errors"
2122
"fmt"
2223
"sort"
@@ -36,8 +37,10 @@ import (
3637
cbclient "k8s.io/autoscaler/cluster-autoscaler/capacitybuffer/client"
3738
"k8s.io/autoscaler/cluster-autoscaler/capacitybuffer/fakepods"
3839
filters "k8s.io/autoscaler/cluster-autoscaler/capacitybuffer/filters"
40+
cbmetrics "k8s.io/autoscaler/cluster-autoscaler/capacitybuffer/metrics"
3941
translators "k8s.io/autoscaler/cluster-autoscaler/capacitybuffer/translators"
4042
updater "k8s.io/autoscaler/cluster-autoscaler/capacitybuffer/updater"
43+
"k8s.io/utils/clock"
4144
)
4245

4346
// BufferController performs updates on Buffers and converts them to pods to be injected
@@ -47,12 +50,14 @@ type BufferController interface {
4750
}
4851

4952
type bufferController struct {
50-
client *cbclient.CapacityBufferClient
51-
strategyFilter filters.Filter
52-
translator translators.Translator
53-
quotaAllocator *resourceQuotaAllocator
54-
updater updater.StatusUpdater
55-
queue workqueue.TypedRateLimitingInterface[string]
53+
client *cbclient.CapacityBufferClient
54+
strategyFilter filters.Filter
55+
translator translators.Translator
56+
quotaAllocator *resourceQuotaAllocator
57+
updater updater.StatusUpdater
58+
queue workqueue.TypedRateLimitingInterface[string]
59+
clock clock.Clock
60+
reconciliationTimeCache *cbmetrics.ReconciliationCache
5661
}
5762

5863
// NewBufferController creates a new bufferController object
@@ -61,6 +66,8 @@ func NewBufferController(
6166
strategyFilter filters.Filter,
6267
translator translators.Translator,
6368
updater updater.StatusUpdater,
69+
clock clock.Clock,
70+
reconciliationTimeCache *cbmetrics.ReconciliationCache,
6471
) BufferController {
6572
bc := &bufferController{
6673
client: client,
@@ -71,20 +78,42 @@ func NewBufferController(
7178
queue: workqueue.NewTypedRateLimitingQueueWithConfig(
7279
workqueue.DefaultTypedControllerRateLimiter[string](), workqueue.TypedRateLimitingQueueConfig[string]{Name: "CapacityBuffers"},
7380
),
81+
clock: clock,
82+
reconciliationTimeCache: reconciliationTimeCache,
7483
}
7584
bc.configureEventHandlers()
7685
return bc
7786
}
7887

88+
// InitializeAndRunDefaultBufferController creates the default Capacity buffer controller and processing interval metric collector
89+
// and runs each of them asynchronously
90+
func InitializeAndRunDefaultBufferController(
91+
ctx context.Context,
92+
client *cbclient.CapacityBufferClient,
93+
resolver fakepods.Resolver,
94+
95+
) {
96+
realClock := clock.RealClock{}
97+
reconciledBuffersCache := cbmetrics.NewReconciliationCache()
98+
// Accepting empty string as it represents nil value for ProvisioningStrategy
99+
defaultStrategies := []string{capacitybuffer.ActiveProvisioningStrategy, ""}
100+
controller := NewDefaultBufferController(client, resolver, defaultStrategies, reconciledBuffersCache, realClock)
101+
go controller.Run(ctx.Done())
102+
103+
cbmetrics.RegisterReconciliationTimestampCollector(client, defaultStrategies, reconciledBuffersCache, realClock)
104+
}
105+
79106
// NewDefaultBufferController creates bufferController with default configs
80107
func NewDefaultBufferController(
81108
client *cbclient.CapacityBufferClient,
82109
resolver fakepods.Resolver,
110+
strategies []string,
111+
reconciliationTimeCache *cbmetrics.ReconciliationCache,
112+
clock clock.Clock,
83113
) BufferController {
84114
bc := &bufferController{
85-
client: client,
86-
// Accepting empty string as it represents nil value for ProvisioningStrategy
87-
strategyFilter: filters.NewStrategyFilter([]string{capacitybuffer.ActiveProvisioningStrategy, ""}),
115+
client: client,
116+
strategyFilter: filters.NewStrategyFilter(strategies),
88117
translator: translators.NewCombinedTranslator(
89118
[]translators.Translator{
90119
translators.NewPodTemplateBufferTranslator(client, resolver),
@@ -96,6 +125,8 @@ func NewDefaultBufferController(
96125
queue: workqueue.NewTypedRateLimitingQueueWithConfig(
97126
workqueue.DefaultTypedControllerRateLimiter[string](), workqueue.TypedRateLimitingQueueConfig[string]{Name: "CapacityBuffers"},
98127
),
128+
clock: clock,
129+
reconciliationTimeCache: reconciliationTimeCache,
99130
}
100131
bc.configureEventHandlers()
101132
return bc
@@ -278,7 +309,10 @@ func (c *bufferController) reconcileNamespace(namespace string) error {
278309

279310
// Filter the desired provisioning strategy
280311
// Note: We process ALL buffers in the namespace that match the strategy.
281-
filteredBuffers, _ := c.strategyFilter.Filter(buffers)
312+
filteredBuffers, filteredOutBuffers := c.strategyFilter.Filter(buffers)
313+
314+
// Update reconciliation time for filtered out buffers
315+
c.updateReconciliationTimeCache(filteredOutBuffers)
282316

283317
if len(filteredBuffers) == 0 {
284318
return nil
@@ -306,7 +340,8 @@ func (c *bufferController) reconcileNamespace(namespace string) error {
306340
}
307341

308342
// Update buffer status by calling API server
309-
updateErrors := c.updater.Update(filteredBuffers)
343+
updatedBuffers, updateErrors := c.updater.Update(filteredBuffers)
344+
c.updateReconciliationTimeCache(updatedBuffers)
310345
for _, err := range updateErrors {
311346
runtime.HandleError(fmt.Errorf("capacity buffer controller error: %w", err))
312347
}
@@ -318,3 +353,10 @@ func (c *bufferController) reconcileNamespace(namespace string) error {
318353

319354
return nil
320355
}
356+
357+
func (c *bufferController) updateReconciliationTimeCache(buffers []*v1.CapacityBuffer) {
358+
if c.reconciliationTimeCache == nil || len(buffers) == 0 {
359+
return
360+
}
361+
c.reconciliationTimeCache.Update(buffers, c.clock.Now())
362+
}

cluster-autoscaler/capacitybuffer/controller/controller_test.go

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,18 +29,74 @@ import (
2929
"k8s.io/apimachinery/pkg/api/resource"
3030
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3131
"k8s.io/apimachinery/pkg/runtime"
32+
"k8s.io/apimachinery/pkg/types"
3233
"k8s.io/apimachinery/pkg/util/wait"
3334
v1 "k8s.io/autoscaler/cluster-autoscaler/apis/capacitybuffer/autoscaling.x-k8s.io/v1beta1"
3435
fakebuffers "k8s.io/autoscaler/cluster-autoscaler/apis/capacitybuffer/client/clientset/versioned/fake"
3536
"k8s.io/autoscaler/cluster-autoscaler/capacitybuffer"
3637
cbclient "k8s.io/autoscaler/cluster-autoscaler/capacitybuffer/client"
3738
"k8s.io/autoscaler/cluster-autoscaler/capacitybuffer/fakepods"
39+
"k8s.io/autoscaler/cluster-autoscaler/capacitybuffer/metrics"
3840
"k8s.io/autoscaler/cluster-autoscaler/capacitybuffer/testutil"
3941
fakek8s "k8s.io/client-go/kubernetes/fake"
4042
k8stesting "k8s.io/client-go/testing"
43+
"k8s.io/utils/clock"
44+
testclock "k8s.io/utils/clock/testing"
4145
"k8s.io/utils/ptr"
4246
)
4347

48+
func TestController_CacheUpdates(t *testing.T) {
49+
now := metav1.Now().Time
50+
fakeClock := testclock.NewFakeClock(now)
51+
reconciliationCache := metrics.NewReconciliationCache()
52+
53+
bSupported := &v1.CapacityBuffer{
54+
ObjectMeta: metav1.ObjectMeta{
55+
Name: "supported",
56+
Namespace: "default",
57+
UID: types.UID("uid-supported"),
58+
},
59+
Spec: v1.CapacityBufferSpec{
60+
ProvisioningStrategy: ptr.To(capacitybuffer.ActiveProvisioningStrategy),
61+
},
62+
}
63+
bUnsupported := &v1.CapacityBuffer{
64+
ObjectMeta: metav1.ObjectMeta{
65+
Name: "unsupported",
66+
Namespace: "default",
67+
UID: types.UID("uid-unsupported"),
68+
},
69+
Spec: v1.CapacityBufferSpec{
70+
ProvisioningStrategy: ptr.To("unsupported-strategy"),
71+
},
72+
}
73+
74+
buffersClient := fakebuffers.NewSimpleClientset(bSupported, bUnsupported)
75+
k8sClient := fakek8s.NewSimpleClientset()
76+
client, _ := cbclient.NewCapacityBufferClientFromClients(buffersClient, k8sClient, nil, nil)
77+
78+
resolver := fakepods.NewDefaultingResolver()
79+
controller := NewDefaultBufferController(
80+
client,
81+
resolver,
82+
[]string{capacitybuffer.ActiveProvisioningStrategy},
83+
reconciliationCache,
84+
fakeClock,
85+
).(*bufferController)
86+
87+
// Reconcile
88+
err := controller.reconcileNamespace("default")
89+
assert.NoError(t, err)
90+
91+
// Verify Cache
92+
snapshot := reconciliationCache.Snapshot()
93+
assert.Len(t, snapshot, 2)
94+
assert.Contains(t, snapshot, types.UID("uid-supported"))
95+
assert.Contains(t, snapshot, types.UID("uid-unsupported"))
96+
assert.Equal(t, now, snapshot[types.UID("uid-supported")])
97+
assert.Equal(t, now, snapshot[types.UID("uid-unsupported")])
98+
}
99+
44100
func TestControllerIntegration_ResourceQuotas(t *testing.T) {
45101
// TODO: refactor to ginkgo and envtest
46102
// Setup
@@ -91,7 +147,7 @@ func TestControllerIntegration_ResourceQuotas(t *testing.T) {
91147

92148
// TODO: use DryRunResolver once migrated to envtest
93149
resolver := fakepods.NewDefaultingResolver()
94-
controller := NewDefaultBufferController(client, resolver).(*bufferController)
150+
controller := NewDefaultBufferController(client, resolver, []string{capacitybuffer.ActiveProvisioningStrategy}, metrics.NewReconciliationCache(), clock.RealClock{}).(*bufferController)
95151

96152
ctx, cancel := context.WithCancel(context.Background())
97153
defer cancel()
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
Copyright The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package metrics
18+
19+
import (
20+
"sync"
21+
"time"
22+
23+
"k8s.io/apimachinery/pkg/types"
24+
"k8s.io/apimachinery/pkg/util/sets"
25+
"k8s.io/autoscaler/cluster-autoscaler/apis/capacitybuffer/autoscaling.x-k8s.io/v1beta1"
26+
)
27+
28+
// ReconciliationCache is a thread-safe map of each buffer's last reconciliation time
29+
type ReconciliationCache struct {
30+
reconciledBuffers map[types.UID]time.Time
31+
mutex sync.RWMutex
32+
}
33+
34+
// NewReconciliationCache creates a new instance of ReconciliationCache
35+
func NewReconciliationCache() *ReconciliationCache {
36+
return &ReconciliationCache{
37+
reconciledBuffers: make(map[types.UID]time.Time),
38+
}
39+
}
40+
41+
// Update updates the underlying map with the provided buffers and time.
42+
func (r *ReconciliationCache) Update(buffers []*v1beta1.CapacityBuffer, t time.Time) {
43+
if len(buffers) == 0 {
44+
return
45+
}
46+
r.mutex.Lock()
47+
defer r.mutex.Unlock()
48+
49+
for _, buffer := range buffers {
50+
r.reconciledBuffers[buffer.UID] = t
51+
}
52+
}
53+
54+
// Prune removes entries from the cache that are not present in the provided buffers list.
55+
func (r *ReconciliationCache) Prune(buffers []*v1beta1.CapacityBuffer) {
56+
existingBuffers := sets.New[types.UID]()
57+
for _, buffer := range buffers {
58+
existingBuffers.Insert(buffer.UID)
59+
}
60+
61+
r.mutex.Lock()
62+
defer r.mutex.Unlock()
63+
64+
for uid := range r.reconciledBuffers {
65+
if !existingBuffers.Has(uid) {
66+
delete(r.reconciledBuffers, uid)
67+
}
68+
}
69+
}
70+
71+
// Snapshot creates and returns a copy of the current map.
72+
func (r *ReconciliationCache) Snapshot() map[types.UID]time.Time {
73+
r.mutex.RLock()
74+
defer r.mutex.RUnlock()
75+
76+
snapshot := make(map[types.UID]time.Time, len(r.reconciledBuffers))
77+
for key, value := range r.reconciledBuffers {
78+
snapshot[key] = value
79+
}
80+
81+
return snapshot
82+
}

0 commit comments

Comments
 (0)