• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 01904bcc-3b56-4d05-8b48-d9bec7a9b7ea

24 Jun 2024 07:49PM UTC coverage: 71.549% (+0.02%) from 71.533%
01904bcc-3b56-4d05-8b48-d9bec7a9b7ea

push

buildkite

web-flow
Introducing MAPQ: Multi-tenant, Auto-partitioned, Persistent Queue (#6132)

377 of 442 new or added lines in 8 files covered. (85.29%)

67 existing lines in 12 files now uncovered.

107095 of 149680 relevant lines covered (71.55%)

2566.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.73
/common/mapq/tree/queue_tree_node.go
1
// The MIT License (MIT)
2

3
// Copyright (c) 2017-2020 Uber Technologies Inc.
4

5
// Permission is hereby granted, free of charge, to any person obtaining a copy
6
// of this software and associated documentation files (the "Software"), to deal
7
// in the Software without restriction, including without limitation the rights
8
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9
// copies of the Software, and to permit persons to whom the Software is
10
// furnished to do so, subject to the following conditions:
11
//
12
// The above copyright notice and this permission notice shall be included in all
13
// copies or substantial portions of the Software.
14
//
15
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21
// SOFTWARE.
22

23
package tree
24

25
import (
26
        "context"
27
        "fmt"
28

29
        "github.com/uber/cadence/common/log"
30
        "github.com/uber/cadence/common/log/tag"
31
        "github.com/uber/cadence/common/mapq/dispatcher"
32
        "github.com/uber/cadence/common/mapq/types"
33
        "github.com/uber/cadence/common/metrics"
34
)
35

36
// QueueTreeNode represents a node in the queue tree
37
type QueueTreeNode struct {
38
        // originalLogger is the logger passed in during creation. No node specific tags are added to this logger and it should be passed to child nodes
39
        originalLogger log.Logger
40

41
        // logger is the logger for this node. It has the node specific tags added to it
42
        logger log.Logger
43
        scope  metrics.Scope
44

45
        // The path to the node
46
        Path string
47

48
        // The partition key used by this node to decide which child to enqueue the item.
49
        // Partition key of a node is the attribute key of child node.
50
        PartitionKey string
51

52
        // The attribute key used to create this node by parent
53
        AttributeKey string
54

55
        // The attribute value used to create this node by parent
56
        AttributeVal any
57

58
        // The policy for this node. It's merged policy from all policies that match this node
59
        NodePolicy types.NodePolicy
60

61
        // Children by attribute key
62
        // "*" is a special key that represents the default/fallback child queue
63
        // If there's no children then the node is considered leaf node
64
        Children map[any]*QueueTreeNode
65

66
        // The dispatcher for this node. Only leaf nodes have dispatcher
67
        Dispatcher *dispatcher.Dispatcher
68
}
69

70
func (n *QueueTreeNode) Start(
71
        ctx context.Context,
72
        consumerFactory types.ConsumerFactory,
73
        partitions []string,
74
        partitionMap map[string]any,
75
) error {
30✔
76
        n.logger.Info("Starting node", tag.Dynamic("node", n.String()))
30✔
77

30✔
78
        // If there are no children then this is a leaf node
30✔
79
        if len(n.Children) == 0 {
44✔
80
                n.logger.Info("Creating consumer and starting a new dispatcher for leaf node")
14✔
81
                c, err := consumerFactory.New(types.NewItemPartitions(partitions, partitionMap))
14✔
82
                if err != nil {
14✔
NEW
83
                        return err
×
NEW
84
                }
×
85
                d := dispatcher.New(c)
14✔
86
                if err := d.Start(ctx); err != nil {
14✔
NEW
87
                        return err
×
NEW
88
                }
×
89
                n.Dispatcher = d
14✔
90
                return nil
14✔
91
        }
92

93
        for _, child := range n.Children {
44✔
94
                partitionMap[n.PartitionKey] = child.AttributeVal
28✔
95
                err := child.Start(ctx, consumerFactory, partitions, partitionMap)
28✔
96
                if err != nil {
28✔
NEW
97
                        return fmt.Errorf("failed to start child %s: %w", child.Path, err)
×
NEW
98
                }
×
99
        }
100

101
        n.logger.Info("Started node")
16✔
102
        return nil
16✔
103
}
104

105
func (n *QueueTreeNode) Stop(ctx context.Context) error {
30✔
106
        n.logger.Info("Stopping node")
30✔
107

30✔
108
        if n.Dispatcher != nil { // leaf node
44✔
109
                return n.Dispatcher.Stop(ctx)
14✔
110
        }
14✔
111

112
        for _, child := range n.Children {
44✔
113
                if err := child.Stop(ctx); err != nil {
28✔
NEW
114
                        return fmt.Errorf("failed to stop child %s: %w", child.Path, err)
×
NEW
115
                }
×
116
        }
117

118
        n.logger.Info("Stopped node")
16✔
119
        return nil
16✔
120
}
121

122
func (n *QueueTreeNode) Enqueue(
123
        ctx context.Context,
124
        item types.Item,
125
        partitions []string,
126
        partitionMap map[string]any,
127
) (types.ItemToPersist, error) {
36✔
128
        // If there are no children then this is a leaf node
36✔
129
        if len(n.Children) == 0 {
45✔
130
                return types.NewItemToPersist(item, types.NewItemPartitions(partitions, partitionMap)), nil
9✔
131
        }
9✔
132

133
        // Add the attribute value to queueNodePathParts
134
        partitionVal := item.GetAttribute(n.PartitionKey)
27✔
135
        partitions = append(partitions, n.PartitionKey)
27✔
136
        partitionMap[n.PartitionKey] = partitionVal
27✔
137

27✔
138
        child, ok := n.Children[partitionVal]
27✔
139
        if !ok {
40✔
140
                // TODO: thread safety missing
13✔
141
                child, ok = n.Children["*"]
13✔
142
                partitionMap[n.PartitionKey] = "*"
13✔
143
                if !ok {
13✔
NEW
144
                        // catch-all nodes are created during initalization so this should never happen
×
NEW
145
                        return nil, fmt.Errorf("no child found for attribute %v in node %v", partitionVal, n.Path)
×
NEW
146
                }
×
147
        }
148

149
        return child.Enqueue(ctx, item, partitions, partitionMap)
27✔
150
}
151

152
func (n *QueueTreeNode) String() string {
90✔
153
        return fmt.Sprintf("QueueTreeNode{Path: %q, AttributeKey: %v, AttributeVal: %v, NodePolicy: %s, Num Children: %d}", n.Path, n.AttributeKey, n.AttributeVal, n.NodePolicy, len(n.Children))
90✔
154
}
90✔
155

156
func (n *QueueTreeNode) Init(logger log.Logger, scope metrics.Scope, policyCol types.NodePolicyCollection, partitions []string) error {
30✔
157
        n.originalLogger = logger
30✔
158
        n.logger = logger.WithTags(tag.ComponentMapQTreeNode, tag.Dynamic("path", n.Path))
30✔
159
        n.scope = scope
30✔
160

30✔
161
        // Get the merged policy for this node
30✔
162
        policy, err := policyCol.GetMergedPolicyForNode(n.Path)
30✔
163
        if err != nil {
30✔
NEW
164
                return err
×
NEW
165
        }
×
166
        n.NodePolicy = policy
30✔
167

30✔
168
        // Set partition key of the node
30✔
169
        nodeLevel := nodeLevel(n.Path)
30✔
170
        if nodeLevel < len(partitions) {
46✔
171
                n.PartitionKey = partitions[nodeLevel]
16✔
172
        }
16✔
173

174
        // Create predefined children nodes
175
        return n.addPredefinedSplits(policyCol, partitions)
30✔
176
}
177

178
func (n *QueueTreeNode) addChild(attrVal any, policyCol types.NodePolicyCollection, partitions []string) (*QueueTreeNode, error) {
28✔
179
        path := fmt.Sprintf("%s/%v", n.Path, attrVal)
28✔
180
        ch := &QueueTreeNode{
28✔
181
                Path:         path,
28✔
182
                AttributeKey: n.PartitionKey,
28✔
183
                AttributeVal: attrVal,
28✔
184
                Children:     map[any]*QueueTreeNode{},
28✔
185
        }
28✔
186

28✔
187
        if err := ch.Init(n.originalLogger, n.scope, policyCol, partitions); err != nil {
28✔
NEW
188
                return nil, err
×
NEW
189
        }
×
190

191
        n.Children[attrVal] = ch
28✔
192
        return ch, nil
28✔
193
}
194

195
func (n *QueueTreeNode) addPredefinedSplits(policyCol types.NodePolicyCollection, partitions []string) error {
30✔
196
        if n.NodePolicy.SplitPolicy == nil || len(n.NodePolicy.SplitPolicy.PredefinedSplits) == 0 {
50✔
197
                return nil
20✔
198
        }
20✔
199

200
        if nodeLevel(n.Path) >= len(partitions) {
10✔
NEW
201
                return fmt.Errorf("predefined split is defined for a leaf level node %s", n.Path)
×
NEW
202
        }
×
203

204
        for _, split := range n.NodePolicy.SplitPolicy.PredefinedSplits {
22✔
205

12✔
206
                _, err := n.addChild(split, policyCol, partitions)
12✔
207
                if err != nil {
12✔
NEW
208
                        return err
×
NEW
209
                }
×
210
        }
211

212
        return nil
10✔
213
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc