• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 018e43c7-e893-4c40-a63b-442d9dceb207

15 Mar 2024 08:22PM UTC coverage: 64.882% (-0.01%) from 64.892%
018e43c7-e893-4c40-a63b-442d9dceb207

push

buildkite

web-flow
Global ratelimiter, part 1: core algorithm for computing weights (#5689)

A high level overview is covered in `common/quotas/global/doc.go`, and this specific piece is further covered in the `./algorithm/requestweighted.go` file.

At a very high level though, this is an isolated piece of a "deployment-wide load-balance-aware ratelimiter", intended to solve problems that our internal clusters have with our current high-level frontend ratelimiters around client-triggered domain actions (start workflow, poll, etc.).

This is the "logical core" of the whole system, with as few dependencies and concerns as I can manage.  All frontend hosts that will be imposing limits eventually send the update arguments to an aggregating host, and the aggregating host largely just delegates to a single instance of this algorithm.  Essentially all the aggregating host needs to do to respond is multiply the weight by each configured ratelimit (from dynamic config).

I've left the concurrency fairly coarse, both for simplicity and for computational speed. I don't believe this will be a bottleneck in even our largest cluster, but even if it is, it should be fairly trivial to split each batch's ratelimit keys into N different sharded instances, which can be completely independent.

# The problem

All of our current ratelimiters fall into two conceptual buckets:
1. they enforce N rps on that in-memory instance
2. they enforce N rps on that in-memory instance _divided by the number of hosts in a ring_

While these are very straightforward and have served us well, they're proving inadequate in our internal clusters due to imbalanced load on our frontend instances.  Sometimes _wildly_ imbalanced, e.g. 10% of hosts receiving _all_ traffic for a domain.

What this leads to is a lower _effective_ ratelimit than is intended, e.g.:
- 100 rps is allowed
- 10 hosts exist (so each host allows 10 rps)
- 1 host receives 100 rps, the others receive 0
- the requests are... (continued)

191 of 191 new or added lines in 1 file covered. (100.0%)

102 existing lines in 20 files now uncovered.

94707 of 145968 relevant lines covered (64.88%)

2377.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.49
/common/persistence/sql/sqlplugin/mysql/db.go
1
// Copyright (c) 2017 Uber Technologies, Inc.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19
// THE SOFTWARE.
20

21
package mysql
22

23
import (
24
        "context"
25
        "database/sql"
26
        "time"
27

28
        "github.com/VividCortex/mysqlerr"
29
        "github.com/go-sql-driver/mysql"
30
        "github.com/jmoiron/sqlx"
31

32
        "github.com/uber/cadence/common/persistence/sql/sqldriver"
33
        "github.com/uber/cadence/common/persistence/sql/sqlplugin"
34
)
35

36
type (
37
        db struct {
38
                converter   DataConverter
39
                driver      sqldriver.Driver
40
                originalDBs []*sqlx.DB
41
                numDBShards int
42
        }
43
)
44

45
func (mdb *db) GetTotalNumDBShards() int {
32,079✔
46
        return mdb.numDBShards
32,079✔
47
}
32,079✔
48

49
var _ sqlplugin.AdminDB = (*db)(nil)
50
var _ sqlplugin.DB = (*db)(nil)
51
var _ sqlplugin.Tx = (*db)(nil)
52

53
func (mdb *db) IsDupEntryError(err error) bool {
12✔
54
        sqlErr, ok := err.(*mysql.MySQLError)
12✔
55
        // ErrDupEntry MySQL Error 1062 indicates a duplicate primary key i.e. the row already exists,
12✔
56
        // so we don't do the insert and return a ConditionalUpdate error.
12✔
57
        return ok && sqlErr.Number == mysqlerr.ER_DUP_ENTRY
12✔
58
}
12✔
59

60
func (mdb *db) IsNotFoundError(err error) bool {
777✔
61
        return err == sql.ErrNoRows
777✔
62
}
777✔
63

64
func (mdb *db) IsTimeoutError(err error) bool {
597✔
65
        if err == context.DeadlineExceeded {
597✔
66
                return true
×
67
        }
×
68
        sqlErr, ok := err.(*mysql.MySQLError)
597✔
69
        if ok {
662✔
70
                if sqlErr.Number == mysqlerr.ER_NET_READ_INTERRUPTED ||
65✔
71
                        sqlErr.Number == mysqlerr.ER_NET_WRITE_INTERRUPTED ||
65✔
72
                        sqlErr.Number == mysqlerr.ER_LOCK_WAIT_TIMEOUT ||
65✔
73
                        sqlErr.Number == mysqlerr.ER_XA_RBTIMEOUT ||
65✔
74
                        sqlErr.Number == mysqlerr.ER_QUERY_TIMEOUT ||
65✔
75
                        sqlErr.Number == mysqlerr.ER_LOCKING_SERVICE_TIMEOUT ||
65✔
76
                        sqlErr.Number == mysqlerr.ER_REGEXP_TIME_OUT {
65✔
77
                        return true
×
78
                }
×
79
        }
80
        return false
597✔
81
}
82

83
func (mdb *db) IsThrottlingError(err error) bool {
597✔
84
        sqlErr, ok := err.(*mysql.MySQLError)
597✔
85
        if ok {
662✔
86
                if sqlErr.Number == mysqlerr.ER_CON_COUNT_ERROR ||
65✔
87
                        sqlErr.Number == mysqlerr.ER_TOO_MANY_USER_CONNECTIONS ||
65✔
88
                        sqlErr.Number == mysqlerr.ER_TOO_MANY_CONCURRENT_TRXS ||
65✔
89
                        sqlErr.Number == mysqlerr.ER_CLONE_TOO_MANY_CONCURRENT_CLONES {
65✔
90
                        return true
×
91
                }
×
92
        }
93
        return false
597✔
94
}
95

96
// newDB returns an instance of DB, which is a logical
97
// connection to the underlying mysql database
98
// dbShardID is needed when tx is not nil
99
func newDB(xdbs []*sqlx.DB, tx *sqlx.Tx, dbShardID int, numDBShards int) (*db, error) {
6,103✔
100
        driver, err := sqldriver.NewDriver(xdbs, tx, dbShardID)
6,103✔
101
        if err != nil {
6,103✔
102
                return nil, err
×
103
        }
×
104

105
        db := &db{
6,103✔
106
                converter:   &converter{},
6,103✔
107
                originalDBs: xdbs, // this is kept because newDB will be called again when starting a transaction
6,103✔
108
                driver:      driver,
6,103✔
109
                numDBShards: numDBShards,
6,103✔
110
        }
6,103✔
111

6,103✔
112
        return db, nil
6,103✔
113
}
114

115
// BeginTx starts a new transaction and returns a reference to the Tx object
116
func (mdb *db) BeginTx(ctx context.Context, dbShardID int) (sqlplugin.Tx, error) {
6,005✔
117
        xtx, err := mdb.driver.BeginTxx(ctx, dbShardID, nil)
6,005✔
118
        if err != nil {
6,005✔
UNCOV
119
                return nil, err
×
UNCOV
120
        }
×
121
        return newDB(mdb.originalDBs, xtx, dbShardID, mdb.numDBShards)
6,005✔
122
}
123

124
// Commit commits a previously started transaction
125
func (mdb *db) Commit() error {
5,838✔
126
        return mdb.driver.Commit()
5,838✔
127
}
5,838✔
128

129
// Rollback triggers rollback of a previously started transaction
130
func (mdb *db) Rollback() error {
168✔
131
        return mdb.driver.Rollback()
168✔
132
}
168✔
133

134
// Close closes the connection to the mysql db
135
func (mdb *db) Close() error {
99✔
136
        return mdb.driver.Close()
99✔
137
}
99✔
138

139
// PluginName returns the name of the mysql plugin
140
func (mdb *db) PluginName() string {
1,922✔
141
        return PluginName
1,922✔
142
}
1,922✔
143

144
// SupportsTTL returns whether MySQL supports TTL
145
func (mdb *db) SupportsTTL() bool {
3,389✔
146
        return false
3,389✔
147
}
3,389✔
148

149
// MaxAllowedTTL returns the max allowed ttl MySQL supports
150
func (mdb *db) MaxAllowedTTL() (*time.Duration, error) {
×
151
        return nil, sqlplugin.ErrTTLNotSupported
×
152
}
×
153

154
// SupportsAsyncTransaction returns whether MySQL supports asynchronous transactions
155
func (mdb *db) SupportsAsyncTransaction() bool {
×
156
        return false
×
157
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc