turbonomic / kubeturbo / build #607930339 (travis-ci)

15 Aug 2023 02:55AM UTC · coverage: 41.636% · first build #607930339

Pull Request #922: TRB-43347 Disable scaling for eks spot and windows instance

15 of 15 new or added lines in 1 file covered (100.0%)
7353 of 17660 relevant lines covered (41.64%)
47297.52 hits per line

Source File: /pkg/discovery/dtofactory/node_entity_dto_builder.go (52.2% covered)
package dtofactory

import (
        "fmt"
        "math"
        "strings"

        "github.com/golang/glog"

        api "k8s.io/api/core/v1"
        "k8s.io/apimachinery/pkg/util/sets"
        utilfeature "k8s.io/apiserver/pkg/util/feature"

        "github.ibm.com/turbonomic/kubeturbo/pkg/discovery/dtofactory/property"
        "github.ibm.com/turbonomic/kubeturbo/pkg/discovery/metrics"
        "github.ibm.com/turbonomic/kubeturbo/pkg/discovery/repository"
        "github.ibm.com/turbonomic/kubeturbo/pkg/discovery/stitching"
        "github.ibm.com/turbonomic/kubeturbo/pkg/discovery/util"
        "github.ibm.com/turbonomic/kubeturbo/pkg/discovery/worker/compliance/podaffinity"
        "github.ibm.com/turbonomic/kubeturbo/pkg/features"
        sdkbuilder "github.ibm.com/turbonomic/turbo-go-sdk/pkg/builder"
        "github.ibm.com/turbonomic/turbo-go-sdk/pkg/proto"
)

const (
        accessCommodityDefaultCapacity  = 1e10
        clusterCommodityDefaultCapacity = 1e10
        labelCommodityDefaultCapacity   = 1e10
)

var (
        nodeResourceCommoditiesSold = []metrics.ResourceType{
                metrics.CPU,
                metrics.Memory,
                metrics.CPURequest,
                metrics.MemoryRequest,
                metrics.NumPods,
                // TODO, add back provisioned commodity later
        }

        allocationResourceCommoditiesSold = []metrics.ResourceType{
                metrics.CPULimitQuota,
                metrics.MemoryLimitQuota,
                metrics.CPURequestQuota,
                metrics.MemoryRequestQuota,
        }

        // List of commodities and a boolean indicating if the commodity should be resized
        resizableCommodities = map[proto.CommodityDTO_CommodityType]bool{
                proto.CommodityDTO_VCPU:         false,
                proto.CommodityDTO_VMEM:         false,
                proto.CommodityDTO_VCPU_REQUEST: false,
                proto.CommodityDTO_VMEM_REQUEST: false,
        }
)

type nodeEntityDTOBuilder struct {
        generalBuilder
        stitchingManager   *stitching.StitchingManager
        clusterKeyInjected string
        affinityMapper     *podaffinity.AffinityMapper
}

func NewNodeEntityDTOBuilder(sink *metrics.EntityMetricSink, stitchingManager *stitching.StitchingManager,
        clusterSummary *repository.ClusterSummary, affinityMapper *podaffinity.AffinityMapper) *nodeEntityDTOBuilder {
        return &nodeEntityDTOBuilder{
                generalBuilder:   newGeneralBuilder(sink, clusterSummary),
                stitchingManager: stitchingManager,
                affinityMapper:   affinityMapper,
        }
}

func (builder *nodeEntityDTOBuilder) WithClusterKeyInjected(clusterKeyInjected string) *nodeEntityDTOBuilder {
        builder.clusterKeyInjected = clusterKeyInjected
        return builder
}

// BuildEntityDTOs builds entityDTOs based on the given node list.
func (builder *nodeEntityDTOBuilder) BuildEntityDTOs(nodes []*api.Node, nodesPods map[string][]string,
        hostnameSpreadPods, hostnameSpreadWorkloads, otherSpreadPods sets.String,
        podsToControllers map[string]string, node2nodegroup map[string]sets.String) ([]*proto.EntityDTO, []string) {
        var result []*proto.EntityDTO
        var notReadyNodes []string

        clusterId := builder.clusterSummary.Name
        for _, node := range nodes {
                // id.
                nodeID := string(node.UID)
                entityDTOBuilder := sdkbuilder.NewEntityDTOBuilder(proto.EntityDTO_VIRTUAL_MACHINE, nodeID)

                // display name.
                displayName := node.Name
                entityDTOBuilder.DisplayName(displayName)
                nodeActive := util.NodeIsReady(node)
                if !nodeActive {
                        glog.Warningf("Node %s is in NotReady status.", node.Name)
                }

                // compute and constraint commodities sold.
                commoditiesSold, isAvailableForPlacement, err := builder.getNodeCommoditiesSold(node, clusterId)
                if err != nil {
                        glog.Errorf("Error when creating commoditiesSold for %s: %s", node.Name, err)
                        nodeActive = false
                }
                // allocation commodities sold
                allocationCommoditiesSold, err := builder.getAllocationCommoditiesSold(node)
                if err != nil {
                        glog.Errorf("Error when creating allocation commoditiesSold for %s: %s", node.Name, err)
                        nodeActive = false
                }

                commoditiesSold = append(commoditiesSold, allocationCommoditiesSold...)
                // affinity commodities sold
                var affinityCommoditiesSold []*proto.CommodityDTO
                if utilfeature.DefaultFeatureGate.Enabled(features.NewAffinityProcessing) {
                        affinityCommoditiesSold = builder.getAffinityLabelAndSegmentationComms(node,
                                nodesPods, hostnameSpreadPods, hostnameSpreadWorkloads, podsToControllers)
                        commoditiesSold = append(commoditiesSold, affinityCommoditiesSold...)
                }
                entityDTOBuilder.SellsCommodities(commoditiesSold)

                // entities' properties.
                properties, err := builder.getNodeProperties(node)
                if err != nil {
                        glog.Errorf("Failed to get node properties: %s", err)
                        nodeActive = false
                }
                entityDTOBuilder = entityDTOBuilder.WithProperties(properties)

                // reconciliation meta data
                metaData, err := builder.stitchingManager.GenerateReconciliationMetaData()
                if err != nil {
                        glog.Errorf("Failed to build reconciling metadata for node %s: %s", displayName, err)
                        nodeActive = false
                }
                entityDTOBuilder = entityDTOBuilder.ReplacedBy(metaData)

                // Check whether we have used cache
                nodeKey := util.NodeKeyFunc(node)
                cacheUsedMetric := metrics.GenerateEntityStateMetricUID(metrics.NodeType, nodeKey, "NodeCacheUsed")
                present, _ := builder.metricsSink.GetMetric(cacheUsedMetric)
                if present != nil {
                        glog.Errorf("We have used the cached data, so the node %s appears to be flaky", displayName)
                        nodeActive = false
                }

                controllable := util.NodeIsControllable(node)
                entityDTOBuilder = entityDTOBuilder.ConsumerPolicy(&proto.EntityDTO_ConsumerPolicy{
                        Controllable: &controllable,
                })

                // Action settings for a node
                // Allow suspend for all nodes except those marked as HA via kubeturbo config
                isHANode := util.DetectHARole(node)
                entityDTOBuilder.IsSuspendable(!isHANode)

                disableSuspendProvision, nodeType := getSuspendProvisionSettingByNodeType(properties)
                if disableSuspendProvision {
                        glog.V(2).Infof("Suspend and provision is disabled for node %s, it is a %s", node.GetName(), nodeType)
                        entityDTOBuilder.IsProvisionable(false)
                        entityDTOBuilder.IsSuspendable(false)
                }

                if utilfeature.DefaultFeatureGate.Enabled(features.KwokClusterTest) {
                        // we force the node status as active, even if we did not find metrics for it
                        nodeActive = true
                }

                if !nodeActive {
                        glog.Warningf("Node %s has NotReady status or has issues accessing kubelet.", node.GetName())
                        notReadyNodes = append(notReadyNodes, nodeID)
                        entityDTOBuilder.IsSuspendable(false)
                        entityDTOBuilder.IsProvisionable(false)
                        clusterCommodityKey := fmt.Sprintf("Node-%v-NotReady", nodeID)
                        clusterComm, err := sdkbuilder.NewCommodityDTOBuilder(proto.CommodityDTO_CLUSTER).
                                Key(clusterCommodityKey).
                                Used(1).
                                Create()
                        if err == nil {
                                provider := sdkbuilder.CreateProvider(proto.EntityDTO_CONTAINER_PLATFORM_CLUSTER, clusterId)
                                entityDTOBuilder.
                                        Provider(provider).
                                        BuysCommodity(clusterComm)
                        }
                }

                // Get CPU capacity in cores.
                cpuMetricValue, err := builder.metricValue(metrics.NodeType, nodeKey, metrics.CPU, metrics.Capacity, nil)
                if err != nil {
                        glog.Errorf("Failed to get CPU capacity in cores for VM %s: %v", nodeKey, err)
                        continue
                }
                cpuCores := int32(math.Round(util.MetricMilliToUnit(cpuMetricValue.Avg)))
                vmdata := &proto.EntityDTO_VirtualMachineData{
                        IpAddress: getNodeIPs(node),
                        // Set numCPUs in cores.
                        NumCpus: &cpuCores,
                }
                entityDTOBuilder = entityDTOBuilder.VirtualMachineData(vmdata)

                // also set up the aggregatedBy relationship with the cluster
                entityDTOBuilder.AggregatedBy(clusterId)

                // build entityDTO.
                entityDto, err := entityDTOBuilder.Create()
                if err != nil {
                        glog.Errorf("Failed to build VM entityDTO: %s", err)
                        continue
                }

                if !isAvailableForPlacement {
                        glog.Warningf("Node %s has been marked unavailable for placement due to disk pressure.", node.GetName())
                }
                nodeSchedulable := nodeActive && util.NodeIsSchedulable(node)
                if !nodeSchedulable {
                        glog.Warningf("Node %s has been marked unavailable for placement because it is Unschedulable.", node.GetName())
                }
                isAvailableForPlacement = isAvailableForPlacement && nodeSchedulable
                entityDto.ProviderPolicy = &proto.EntityDTO_ProviderPolicy{AvailableForPlacement: &isAvailableForPlacement}

                // Connect node to nodegroup
                // node2nodegroup will be empty if SegmentationBasedTopologySpread is not enabled
                if nodegrps, ok := node2nodegroup[nodeID]; ok {
                        for nodegrp := range nodegrps {
                                connectedEntityID := nodegrp
                                connectedEntityType := proto.ConnectedEntity_NORMAL_CONNECTION
                                entityDto.ConnectedEntities = append(entityDto.ConnectedEntities, &proto.ConnectedEntity{
                                        ConnectedEntityId: &connectedEntityID,
                                        ConnectionType:    &connectedEntityType,
                                })
                        }
                }

                result = append(result, entityDto)
                glog.V(4).Infof("Node DTO : %+v", entityDto)
        }

        return result, notReadyNodes
}

// buildAffinityCommodity builds a PEER_TO_PEER_AFFINITY commodity sold by this node for the
// given key and usage.
func (builder *nodeEntityDTOBuilder) buildAffinityCommodity(key string, used float64) (*proto.CommodityDTO, error) {
        commodityDTO, err := sdkbuilder.NewCommodityDTOBuilder(proto.CommodityDTO_PEER_TO_PEER_AFFINITY).
                Key(key).
                Used(used).
                Capacity(affinityCommodityDefaultCapacity).
                Create()
        if err != nil {
                glog.Errorf("Failed to build commodity sold %s: %v", proto.CommodityDTO_PEER_TO_PEER_AFFINITY, err)
                return nil, err
        }
        return commodityDTO, nil
}

// buildAffinityCommodities builds the PEER_TO_PEER_AFFINITY commodities sold by this node,
// one per affinity key known to the affinity mapper.
func (builder *nodeEntityDTOBuilder) buildAffinityCommodities(node *api.Node) ([]*proto.CommodityDTO, error) {
        commoditiesSold := []*proto.CommodityDTO{}
        soldAffinityKeys := sets.Set[string]{}

        // Generate commodities for workloads for which the node is currently a provider
        if providerMapping, exists := builder.affinityMapper.GetProviderMapping(node.Name); exists {
                for key := range providerMapping.Keys.AffinityKeys {
                        used := 0.0
                        if podList, exists := providerMapping.KeyCounts[podaffinity.MappingKey{CommodityKey: key, MappingType: podaffinity.AffinitySrc}]; exists {
                                used += float64(podList.Len()) * podaffinity.NONE
                        }
                        if podList, exists := providerMapping.KeyCounts[podaffinity.MappingKey{CommodityKey: key, MappingType: podaffinity.AffinityDst}]; exists {
                                used += float64(podList.Len())
                        }
                        affinityComm, err := builder.buildAffinityCommodity(key, used)
                        if err != nil {
                                return nil, err
                        }
                        commoditiesSold = append(commoditiesSold, affinityComm)
                        soldAffinityKeys.Insert(key)
                }
        }

        // Add commodities for workloads for which this node is not a provider
        soldKeys := builder.affinityMapper.GetNodeSoldKeys()
        for key := range soldKeys.AffinityKeys {
                if soldAffinityKeys.Has(key) {
                        continue
                }
                commodity, err := builder.buildAffinityCommodity(key, 0)
                if err != nil {
                        return nil, err
                }
                commoditiesSold = append(commoditiesSold, commodity)
        }

        return commoditiesSold, nil
}

// buildAntiAffinityCommodity builds a PEER_TO_PEER_ANTI_AFFINITY commodity sold by this node
// for the given key, usage, and peak.
func (builder *nodeEntityDTOBuilder) buildAntiAffinityCommodity(key string, used float64, peak float64) (*proto.CommodityDTO, error) {
        commodityDTO, err := sdkbuilder.NewCommodityDTOBuilder(proto.CommodityDTO_PEER_TO_PEER_ANTI_AFFINITY).
                Key(key).
                Used(used).
                Peak(peak).
                Capacity(affinityCommodityDefaultCapacity).
                Create()
        if err != nil {
                glog.Errorf("Failed to build commodity sold %s: %v", proto.CommodityDTO_PEER_TO_PEER_ANTI_AFFINITY, err)
                return nil, err
        }
        return commodityDTO, nil
}

// buildAntiAffinityCommodities builds the PEER_TO_PEER_ANTI_AFFINITY commodities sold by this
// node, one per anti-affinity key known to the affinity mapper.
func (builder *nodeEntityDTOBuilder) buildAntiAffinityCommodities(node *api.Node) ([]*proto.CommodityDTO, error) {
        commoditiesSold := []*proto.CommodityDTO{}
        soldAntiAffinityKeys := sets.Set[string]{}

        // Generate commodities for workloads for which the node is currently a provider
        if providerMapping, exists := builder.affinityMapper.GetProviderMapping(node.Name); exists {
                for key := range providerMapping.Keys.AntiAffinityKeys {
                        used := 0.0
                        peak := 0.0
                        if podList, exists := providerMapping.KeyCounts[podaffinity.MappingKey{CommodityKey: key, MappingType: podaffinity.AntiAffinitySrc}]; exists {
                                used += float64(podList.Len())
                                peak += float64(podList.Len()) * podaffinity.NONE
                        }
                        if podList, exists := providerMapping.KeyCounts[podaffinity.MappingKey{CommodityKey: key, MappingType: podaffinity.AntiAffinityDst}]; exists {
                                used += float64(podList.Len()) * podaffinity.NONE
                                peak += float64(podList.Len())
                        }
                        antiAffinityComm, err := builder.buildAntiAffinityCommodity(key, used, peak)
                        if err != nil {
                                return nil, err
                        }
                        commoditiesSold = append(commoditiesSold, antiAffinityComm)
                        soldAntiAffinityKeys.Insert(key)
                }
        }

        // Add commodities for workloads for which this node is not a provider
        soldKeys := builder.affinityMapper.GetNodeSoldKeys()
        for key := range soldKeys.AntiAffinityKeys {
                if soldAntiAffinityKeys.Has(key) {
                        continue
                }
                commodity, err := builder.buildAntiAffinityCommodity(key, 0, 0)
                if err != nil {
                        return nil, err
                }
                commoditiesSold = append(commoditiesSold, commodity)
        }

        return commoditiesSold, nil
}

// getNodeCommoditiesSold builds the commodity DTOs sold by each node. They include:
// VCPU, VMem, CPURequest, MemRequest;
// VMPMAccessCommodity, ApplicationCommodity, ClusterCommodity.
func (builder *nodeEntityDTOBuilder) getNodeCommoditiesSold(node *api.Node, clusterId string) ([]*proto.CommodityDTO, bool, error) {
        var commoditiesSold []*proto.CommodityDTO
        // get cpu frequency
        key := util.NodeKeyFunc(node)
        cpuFrequency, err := builder.getNodeCPUFrequency(key)
        if err != nil {
                return nil, true, fmt.Errorf("failed to get cpu frequency from sink for node %s: %s", key, err)
        }
        // cpu and cpu request need to be converted from number of millicores to frequency.
        converter := NewConverter().Set(
                func(input float64) float64 {
                        // All cpu metrics are stored in millicores in the metrics sink for consistency,
                        // but we send the node cpu metrics in MHz.
                        return util.MetricMilliToUnit(input) * cpuFrequency
                },
                metrics.CPU)

        // Resource Commodities
        resourceCommoditiesSold := builder.getResourceCommoditiesSold(metrics.NodeType, key, nodeResourceCommoditiesSold, converter, nil)
        storageCommoditiesSold, isAvailableForPlacement := builder.getNodeStorageCommoditiesSold(node.Name)
        resourceCommoditiesSold = append(resourceCommoditiesSold, storageCommoditiesSold...)

        // Disable vertical resize of the resource commodities for all nodes
        for _, commSold := range resourceCommoditiesSold {
                if isResizeable, exists := resizableCommodities[commSold.GetCommodityType()]; exists {
                        commSold.Resizable = &isResizeable
                }
        }
        commoditiesSold = append(commoditiesSold, resourceCommoditiesSold...)

        // Label commodities
        for key, value := range node.ObjectMeta.Labels {
                label := key + "=" + value
                labelComm, err := sdkbuilder.NewCommodityDTOBuilder(proto.CommodityDTO_LABEL).
                        Key(label).
                        Capacity(labelCommodityDefaultCapacity).
                        Create()
                if err != nil {
                        return nil, isAvailableForPlacement, err
                }
                glog.V(5).Infof("Adding label commodity for Node %s with key : %s", node.Name, label)
                commoditiesSold = append(commoditiesSold, labelComm)
        }

        // Cluster commodity.
        var clusterCommKey string
        if len(strings.TrimSpace(builder.clusterKeyInjected)) != 0 {
                clusterCommKey = builder.clusterKeyInjected
                glog.V(4).Infof("Injecting cluster key for Node %s with key : %s", node.Name, clusterCommKey)
        } else {
                clusterCommKey = clusterId
                glog.V(4).Infof("Adding cluster key for Node %s with key : %s", node.Name, clusterCommKey)
        }
        clusterComm, err := sdkbuilder.NewCommodityDTOBuilder(proto.CommodityDTO_CLUSTER).
                Key(clusterCommKey).
                Capacity(clusterCommodityDefaultCapacity).
                Create()
        if err != nil {
                return nil, isAvailableForPlacement, err
        }
        commoditiesSold = append(commoditiesSold, clusterComm)

        affinityCommodities, err := builder.buildAffinityCommodities(node)
        if err != nil {
                return nil, isAvailableForPlacement, err
        }
        commoditiesSold = append(commoditiesSold, affinityCommodities...)

        antiAffinityCommodities, err := builder.buildAntiAffinityCommodities(node)
        if err != nil {
                return nil, isAvailableForPlacement, err
        }
        commoditiesSold = append(commoditiesSold, antiAffinityCommodities...)

        return commoditiesSold, isAvailableForPlacement, nil
}

// getNodeStorageCommoditiesSold builds the sold storage commodities for the node.
// Returns the built commodities and whether this node is available for placement.
// The availability for placement is evaluated based on the current usage crossing the
// configured threshold. If the usage has crossed the threshold, we mark the node
// NOT available for placement.
func (builder *nodeEntityDTOBuilder) getNodeStorageCommoditiesSold(nodeName string) ([]*proto.CommodityDTO, bool) {
        var resourceCommoditiesSold []*proto.CommodityDTO
        vstorageResources := []string{"rootfs", "imagefs"}
        var rootfsCapacityBytes, rootfsAvailableBytes float64
        entityType := metrics.NodeType
        resourceType := metrics.VStorage
        protoCommodityType := proto.CommodityDTO_VSTORAGE
        isAvailableForPlacement := true
        previousThreshold := float64(0)

        for _, resource := range vstorageResources {
                entityID := nodeName
                if resource == "imagefs" {
                        entityID = fmt.Sprintf("%s-%s", nodeName, resource)
                }
                commSoldBuilder := sdkbuilder.NewCommodityDTOBuilder(protoCommodityType)

                // set capacity value
                capacityBytes, err := builder.metricValue(metrics.NodeType, entityID,
                        metrics.VStorage, metrics.Capacity, nil)
                if err != nil || (capacityBytes.Avg == 0 && capacityBytes.Peak == 0) {
                        if utilfeature.DefaultFeatureGate.Enabled(features.KwokClusterTest) {
                                capacityBytes = metrics.MetricValue{Avg: 100000, Peak: 100000}
                        } else {
                                glog.Warningf("Missing capacity value for %v : %s for node %s.", resourceType, resource, nodeName)
                                // If we are missing capacity, it's unlikely we would have other metrics either
                                continue
                        }
                }

                if resource == "rootfs" {
                        // We iterate the vstorageResources slice in order, so the rootfs values
                        // are always preserved in the first pass of this loop.
                        rootfsCapacityBytes = capacityBytes.Avg
                }
                // Capacity metric is always a single data point. Use Avg to refer to the single point value
                commSoldBuilder.Capacity(util.Base2BytesToMegabytes(capacityBytes.Avg))

                usedBytes := float64(0)
                availableBytes, err := builder.metricValue(metrics.NodeType, entityID,
                        metrics.VStorage, metrics.Available, nil)
                if err != nil {
                        glog.Warningf("Missing used value for %v : %s for node %s.", resourceType, resource, nodeName)
                } else {
                        if resource == "rootfs" {
                                rootfsAvailableBytes = availableBytes.Avg
                        }
                        usedBytes = capacityBytes.Avg - availableBytes.Avg
                        commSoldBuilder.Used(util.Base2BytesToMegabytes(usedBytes))
                        commSoldBuilder.Peak(util.Base2BytesToMegabytes(usedBytes))
                }

                // set commodity key
                commSoldBuilder.Key(fmt.Sprintf("k8s-node-%s", resource))
                resourceCommoditySold, err := commSoldBuilder.Create()
                if err != nil {
                        glog.Warning(err.Error())
                        continue
                }

                threshold, err := builder.metricValue(entityType, entityID,
                        resourceType, metrics.Threshold, nil)
                if err != nil {
                        glog.Warningf("Missing threshold value for %v for node %s.", resourceType, nodeName)
                } else {
                        if threshold.Avg > 0 && threshold.Avg <= 100 {
                                if resource == "rootfs" {
                                        previousThreshold = threshold.Avg
                                }
                                isAvailableAboveThreshold := availableBytes.Avg > threshold.Avg*capacityBytes.Avg/100
                                isAvailableForPlacement = isAvailableAboveThreshold
                                utilizationThreshold := 100 - threshold.Avg
                                // TODO: The settable method for UtilizationThresholdPct can be added to the sdk instead.
                                resourceCommoditySold.UtilizationThresholdPct = &utilizationThreshold
                        } else {
                                glog.Warningf("Threshold value [%.2f] outside range and will not be set for %v : %s for node %s.",
                                        threshold.Avg, resourceType, resource, nodeName)
                        }
                }

                // We currently have no way of knowing the command line configuration of kubelet
                // to understand if there is a separate imagefs partition configured. We use the workaround
                // of comparing the reported capacity and available bytes, to the last byte, for both
                // rootfs and imagefs to determine if we are getting the reported values for the same partition.
                isPartitionSame := resource == "imagefs" && rootfsCapacityBytes == capacityBytes.Avg && rootfsAvailableBytes == availableBytes.Avg
                if isPartitionSame {
                        // We skip adding the imagefs commodity, however we still honor the thresholds set for imagefs,
                        // which can be different compared to rootfs, even when the partitions are the same.
                        // isAvailableForPlacement is still calculated for both above and would be set to false
                        // if either of the values crosses the threshold.
                        if threshold.Avg > previousThreshold {
                                prevResourceCommoditySold := resourceCommoditiesSold[len(resourceCommoditiesSold)-1]
                                utilThreshold := 100 - threshold.Avg
                                prevResourceCommoditySold.UtilizationThresholdPct = &utilThreshold
                        } else {
                                continue
                        }
                } else {
                        resourceCommoditiesSold = append(resourceCommoditiesSold, resourceCommoditySold)
                }
        }

        return resourceCommoditiesSold, isAvailableForPlacement
}

// getAllocationCommoditiesSold builds the allocation (quota) commodities sold by the node:
// CPULimitQuota, MemoryLimitQuota, CPURequestQuota, and MemoryRequestQuota.
func (builder *nodeEntityDTOBuilder) getAllocationCommoditiesSold(node *api.Node) ([]*proto.CommodityDTO, error) {
        var commoditiesSold []*proto.CommodityDTO
        // get cpu frequency
        key := util.NodeKeyFunc(node)
        cpuFrequency, err := builder.getNodeCPUFrequency(key)
        if err != nil {
                return nil, fmt.Errorf("failed to get cpu frequency from sink for node %s: %s", key, err)
        }
        // cpuLimitQuota and cpuRequestQuota need to be converted from number of cores to frequency.
        converter := NewConverter().Set(
                func(input float64) float64 {
                        return input * cpuFrequency
                },
                metrics.CPULimitQuota, metrics.CPURequestQuota)

        // Resource Commodities
        var resourceCommoditiesSold []*proto.CommodityDTO
        for _, resourceType := range allocationResourceCommoditiesSold {
                commSold, _ := builder.getSoldResourceCommodityWithKey(metrics.NodeType, key, resourceType, string(node.UID),
                        converter, nil)
                if commSold != nil {
                        resourceCommoditiesSold = append(resourceCommoditiesSold, commSold)
                }
        }

        commoditiesSold = append(commoditiesSold, resourceCommoditiesSold...)
        return commoditiesSold, nil
}

// getAffinityLabelAndSegmentationComms builds LABEL commodities for each pod (or its parent
// controller) that can be placed on this node, and SEGMENTATION commodities for
// hostname-spread workloads.
func (builder *nodeEntityDTOBuilder) getAffinityLabelAndSegmentationComms(node *api.Node, nodesPods map[string][]string,
        hostnameSpreadPods, hostnameSpreadWorkloads sets.String, podsToControllers map[string]string) []*proto.CommodityDTO {
        var commoditiesSold []*proto.CommodityDTO
        // Add label commodities to honor affinities
        // This adds LABEL commodities sold for each pod that can be placed on this node
        // This also adds SEGMENTATION commodities for spread workload pods
        for _, key := range nodesPods[node.Name] {
                // We rely on what shows up in nodesPods to add LABELs, because that is supposed to be
                // filled appropriately, taking care of all feature flags and relevant conditions
                controllerId, exists := podsToControllers[key]
                if exists {
                        // we use the parent controller id as the key if we have one;
                        // if not, the pod is probably a standalone pod
                        key = controllerId
                }
                commodityDTO, err := sdkbuilder.NewCommodityDTOBuilder(proto.CommodityDTO_LABEL).
                        Key(key).
                        Capacity(accessCommodityDefaultCapacity).
                        Create()
                if err != nil {
                        glog.Warningf("Error creating LABEL sold commodity for key %s on node %s", key, node.Name)
                        // We ignore a failure and continue to add the rest
                        continue
                }
                commoditiesSold = append(commoditiesSold, commodityDTO)
        }

        segmentationCommodityUsage := make(map[string]float64)
        allPodsOnThisNode := getAllPodsOnNode(node, builder.clusterSummary)
        for _, podKey := range hostnameSpreadPods.UnsortedList() {
                used := 0.0
                if allPodsOnThisNode.Has(podKey) {
                        used = 1.0
                }
                if workloadKey, exists := builder.clusterSummary.PodToControllerMap[podKey]; exists {
                        if _, exists := segmentationCommodityUsage[workloadKey]; !exists {
                                segmentationCommodityUsage[workloadKey] = 0.0
                        }
                        segmentationCommodityUsage[workloadKey] = segmentationCommodityUsage[workloadKey] + used
                }
        }

        for _, workloadKey := range hostnameSpreadWorkloads.UnsortedList() {
                used := segmentationCommodityUsage[workloadKey]
                commodityDTO, err := sdkbuilder.NewCommodityDTOBuilder(proto.CommodityDTO_SEGMENTATION).
                        Key(workloadKey).
                        Capacity(1).
                        Used(used).
                        Create()
                if err != nil {
                        glog.Warningf("Error creating SEGMENTATION sold commodity for key %s on node %s", workloadKey, node.Name)
                        // We ignore a failure and continue to add the rest
                        continue
                }
                commoditiesSold = append(commoditiesSold, commodityDTO)
        }

        return commoditiesSold
}

// Get the properties of the node. This includes properties related to the stitching process and the node cluster property.
func (builder *nodeEntityDTOBuilder) getNodeProperties(node *api.Node) ([]*proto.EntityDTO_EntityProperty, error) {
        var properties []*proto.EntityDTO_EntityProperty

        // stitching property.
        isForReconcile := true
        stitchingProperty, err := builder.stitchingManager.BuildDTOProperty(node.Name, isForReconcile)
        if err != nil {
                return nil, fmt.Errorf("failed to build properties for node %s: %s", node.Name, err)
        }
        glog.V(4).Infof("Node %s will be reconciled with VM with %s: %s", node.Name, *stitchingProperty.Name,
                *stitchingProperty.Value)
        properties = append(properties, stitchingProperty)

        // additional node info properties.
        properties = append(properties, property.BuildNodeProperties(node, builder.clusterSummary.Name)...)
        return properties, nil
}

// getNodeIPs returns all addresses listed in the node's status.
func getNodeIPs(node *api.Node) []string {
        result := []string{}

        addrs := node.Status.Addresses
        for i := range addrs {
                result = append(result, addrs[i].Address)
        }
        return result
}

// getSuspendProvisionSettingByNodeType reports whether suspend and provision actions should be
// disabled for a node based on its properties: EKS spot instances and Windows nodes are excluded.
func getSuspendProvisionSettingByNodeType(properties []*proto.EntityDTO_EntityProperty) (disableSuspendProvision bool, nodeType string) {
        disableSuspendProvision = false
        for _, property := range properties {
                spot := strings.Contains(property.GetName(), util.EKSCapacityType) && property.GetValue() == util.EKSSpot
                windows := strings.Contains(property.GetName(), util.NodeLabelOS) && property.GetValue() == util.WindowsOS
                if spot || windows {
                        if spot {
                                nodeType = "AWS EC2 spot instance"
                        } else if windows {
                                nodeType = "node with Windows OS"
                        }
                        disableSuspendProvision = true
                        return
                }
        }
        return
}

// getAllWorkloadsOnNode retrieves the controller names for all pods on a node.
// It takes a node and a ClusterSummary as input and returns a set of controller names.
func getAllWorkloadsOnNode(node *api.Node, clusterSummary *repository.ClusterSummary) sets.String {
        controllerNames := sets.NewString()
        allPods := append(clusterSummary.GetPendingPodsOnNode(node), clusterSummary.GetRunningPodsOnNode(node)...)

        for _, pod := range allPods {
                podQualifiedName := fmt.Sprintf("%s/%s", pod.Namespace, pod.Name)
                if ctrlName, exists := clusterSummary.PodToControllerMap[podQualifiedName]; exists {
                        controllerNames.Insert(ctrlName)
                }
        }
        return controllerNames
}

// getAllPodsOnNode returns the namespace-qualified names of all pending and running pods on the node.
func getAllPodsOnNode(node *api.Node, clusterSummary *repository.ClusterSummary) sets.String {
        podNames := sets.NewString()
        allPods := append(clusterSummary.GetPendingPodsOnNode(node), clusterSummary.GetRunningPodsOnNode(node)...)
        for _, pod := range allPods {
                podNames.Insert(pod.Namespace + "/" + pod.Name)
        }
        return podNames
}
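
For orientation, below is a minimal, hypothetical sketch of how a caller inside the dtofactory package might wire this builder together. The helper name buildNodeDTOsExample and the injected cluster key are illustrative assumptions, not part of the file above; all dependencies are assumed to be prepared elsewhere during discovery, and the call sequence is taken directly from the signatures in this file.

// buildNodeDTOsExample is an illustrative sketch, not part of the original source file.
func buildNodeDTOsExample(
        sink *metrics.EntityMetricSink,
        stitchingManager *stitching.StitchingManager,
        clusterSummary *repository.ClusterSummary,
        affinityMapper *podaffinity.AffinityMapper,
        nodes []*api.Node,
        nodesPods map[string][]string,
        hostnameSpreadPods, hostnameSpreadWorkloads, otherSpreadPods sets.String,
        podsToControllers map[string]string,
        node2nodegroup map[string]sets.String,
) ([]*proto.EntityDTO, []string) {
        // Construct the builder with its dependencies. WithClusterKeyInjected is optional;
        // when the injected key is empty, the cluster commodity key falls back to clusterSummary.Name.
        builder := NewNodeEntityDTOBuilder(sink, stitchingManager, clusterSummary, affinityMapper).
                WithClusterKeyInjected("example-injected-cluster-key") // hypothetical key

        // Returns one VIRTUAL_MACHINE DTO per node, plus the IDs of nodes that were
        // NotReady or failed metric/stitching lookups.
        return builder.BuildEntityDTOs(nodes, nodesPods,
                hostnameSpreadPods, hostnameSpreadWorkloads, otherSpreadPods,
                podsToControllers, node2nodegroup)
}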