• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 01904bcc-3b56-4d05-8b48-d9bec7a9b7ea

24 Jun 2024 07:49PM UTC coverage: 71.549% (+0.02%) from 71.533%
01904bcc-3b56-4d05-8b48-d9bec7a9b7ea

push

buildkite

web-flow
Introducing MAPQ: Multi-tenant, Auto-partitioned, Persistent Queue (#6132)

377 of 442 new or added lines in 8 files covered. (85.29%)

67 existing lines in 12 files now uncovered.

107095 of 149680 relevant lines covered (71.55%)

2566.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.78
/common/mapq/tree/queue_tree.go
1
// The MIT License (MIT)
2

3
// Copyright (c) 2017-2020 Uber Technologies Inc.
4

5
// Permission is hereby granted, free of charge, to any person obtaining a copy
6
// of this software and associated documentation files (the "Software"), to deal
7
// in the Software without restriction, including without limitation the rights
8
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9
// copies of the Software, and to permit persons to whom the Software is
10
// furnished to do so, subject to the following conditions:
11
//
12
// The above copyright notice and this permission notice shall be included in all
13
// copies or substantial portions of the Software.
14
//
15
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21
// SOFTWARE.
22

23
package tree
24

25
import (
26
        "context"
27
        "fmt"
28
        "strings"
29

30
        "github.com/uber/cadence/common/log"
31
        "github.com/uber/cadence/common/log/tag"
32
        "github.com/uber/cadence/common/mapq/types"
33
        "github.com/uber/cadence/common/metrics"
34
)
35

36
// QueueTree is a tree structure that represents the queue structure for MAPQ
37
type QueueTree struct {
38
        originalLogger  log.Logger
39
        logger          log.Logger
40
        scope           metrics.Scope
41
        partitions      []string
42
        policyCol       types.NodePolicyCollection
43
        persister       types.Persister
44
        consumerFactory types.ConsumerFactory
45
        root            *QueueTreeNode
46
}
47

48
func New(
49
        logger log.Logger,
50
        scope metrics.Scope,
51
        partitions []string,
52
        policies []types.NodePolicy,
53
        persister types.Persister,
54
        consumerFactory types.ConsumerFactory,
55
) (*QueueTree, error) {
2✔
56
        t := &QueueTree{
2✔
57
                originalLogger:  logger,
2✔
58
                logger:          logger.WithTags(tag.ComponentMapQTree),
2✔
59
                scope:           scope,
2✔
60
                partitions:      partitions,
2✔
61
                policyCol:       types.NewNodePolicyCollection(policies),
2✔
62
                persister:       persister,
2✔
63
                consumerFactory: consumerFactory,
2✔
64
        }
2✔
65

2✔
66
        return t, t.init()
2✔
67
}
2✔
68

69
// Start the dispatchers for all leaf nodes
70
func (t *QueueTree) Start(ctx context.Context) error {
2✔
71
        t.logger.Info("Starting MAPQ tree", tag.Dynamic("tree", t.String()))
2✔
72
        err := t.root.Start(ctx, t.consumerFactory, nil, map[string]any{})
2✔
73
        if err != nil {
2✔
NEW
74
                return fmt.Errorf("failed to start root node: %w", err)
×
NEW
75
        }
×
76

77
        t.logger.Info("Started MAPQ tree")
2✔
78
        return nil
2✔
79
}
80

81
// Stop the dispatchers for all leaf nodes
82
func (t *QueueTree) Stop(ctx context.Context) error {
2✔
83
        t.logger.Info("Stopping MAPQ tree", tag.Dynamic("tree", t.String()))
2✔
84

2✔
85
        err := t.root.Stop(ctx)
2✔
86
        if err != nil {
2✔
NEW
87
                return fmt.Errorf("failed to stop nodes: %w", err)
×
NEW
88
        }
×
89

90
        t.logger.Info("Stopped MAPQ tree")
2✔
91
        return nil
2✔
92
}
93

94
func (t *QueueTree) String() string {
4✔
95
        var sb strings.Builder
4✔
96
        var nodes []*QueueTreeNode
4✔
97
        nodes = append(nodes, t.root)
4✔
98
        for len(nodes) > 0 {
64✔
99
                node := nodes[0]
60✔
100
                nodes = nodes[1:]
60✔
101
                sb.WriteString(node.String())
60✔
102
                sb.WriteString("\n")
60✔
103
                for _, child := range node.Children {
116✔
104
                        nodes = append(nodes, child)
56✔
105
                }
56✔
106
        }
107

108
        return sb.String()
4✔
109
}
110

111
func (t *QueueTree) Enqueue(ctx context.Context, items []types.Item) ([]types.ItemToPersist, error) {
1✔
112
        if t.root == nil {
1✔
NEW
113
                return nil, fmt.Errorf("root node is nil")
×
NEW
114
        }
×
115

116
        var itemsToPersist []types.ItemToPersist
1✔
117
        for _, item := range items {
10✔
118
                itemToPersist, err := t.root.Enqueue(ctx, item, nil, map[string]any{})
9✔
119
                if err != nil {
9✔
NEW
120
                        return nil, err
×
NEW
121
                }
×
122
                itemsToPersist = append(itemsToPersist, itemToPersist)
9✔
123
        }
124

125
        return itemsToPersist, t.persister.Persist(ctx, itemsToPersist)
1✔
126
}
127

128
func (t *QueueTree) init() error {
2✔
129
        t.root = &QueueTreeNode{
2✔
130
                Path:     "*", // Root node
2✔
131
                Children: map[any]*QueueTreeNode{},
2✔
132
        }
2✔
133

2✔
134
        if err := t.root.Init(t.originalLogger, t.scope, t.policyCol, t.partitions); err != nil {
2✔
NEW
135
                return fmt.Errorf("failed to initialize node: %w", err)
×
NEW
136
        }
×
137

138
        // Create tree nodes with catch-all nodes at all levels and predefined splits.
139
        // There will be len(partitions) levels in the tree.
140
        err := t.constructInitialNodes(t.root)
2✔
141
        if err != nil {
2✔
NEW
142
                return fmt.Errorf("failed to construct initial tree: %w", err)
×
NEW
143
        }
×
144

145
        return nil
2✔
146
}
147

148
func (t *QueueTree) constructInitialNodes(n *QueueTreeNode) error {
30✔
149
        nodeLevel := nodeLevel(n.Path)
30✔
150
        if nodeLevel == len(t.partitions) { // reached the leaf level
44✔
151
                return nil
14✔
152
        }
14✔
153

154
        if n.Children["*"] != nil { // catch-all node already exists
16✔
NEW
155
                return nil
×
NEW
156
        }
×
157

158
        _, err := n.addChild("*", t.policyCol, t.partitions)
16✔
159
        if err != nil {
16✔
NEW
160
                return err
×
NEW
161
        }
×
162

163
        for _, child := range n.Children {
44✔
164
                if err := t.constructInitialNodes(child); err != nil {
28✔
NEW
165
                        return err
×
NEW
166
                }
×
167
        }
168

169
        return nil
16✔
170
}
171

172
func nodeLevel(path string) int {
70✔
173
        return len(strings.Split(path, "/")) - 1
70✔
174
}
70✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc