• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

dgraph-io / dgraph / 5228265546

10 Jun 2023 05:08AM UTC coverage: 67.303% (+0.07%) from 67.23%
5228265546

push

web-flow
Merge dba1461bb into ab3769797

16 of 16 new or added lines in 2 files covered. (100.0%)

58428 of 86814 relevant lines covered (67.3%)

2257715.12 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.16
/worker/server_state.go
1
/*
2
 * Copyright 2017-2023 Dgraph Labs, Inc. and Contributors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package worker
18

19
import (
20
        "context"
21
        "math"
22
        "os"
23
        "time"
24

25
        "github.com/golang/glog"
26

27
        "github.com/dgraph-io/badger/v4"
28
        "github.com/dgraph-io/dgraph/protos/pb"
29
        "github.com/dgraph-io/dgraph/raftwal"
30
        "github.com/dgraph-io/dgraph/x"
31
        "github.com/dgraph-io/ristretto/z"
32
)
33

34
// Default option strings for the SuperFlags.
//
// NOTE: SuperFlag defaults must include every possible option that can be used. That way a typo
// made by a user while defining a SuperFlag is caught right away, instead of failing at runtime
// when an option that isn't there is looked up.
//
// For readability, keep options without default values (if any) at the end of each *Defaults
// string. These strings are printed in --help text, so they must not contain line breaks.
const (
	// AuditDefaults holds the defaults for the audit SuperFlag options.
	AuditDefaults = `compress=false; days=10; size=100; dir=; output=; encrypt-file=;`

	// BadgerDefaults holds the defaults for the Badger SuperFlag options.
	BadgerDefaults = `compression=snappy; numgoroutines=8;`

	// RaftDefaults holds the defaults for the Raft SuperFlag options.
	RaftDefaults = `learner=false; snapshot-after-entries=10000; snapshot-after-duration=30m; ` +
		`pending-proposals=256; idx=; group=;`

	// SecurityDefaults holds the defaults for the security SuperFlag options.
	SecurityDefaults = `token=; whitelist=;`

	// CDCDefaults holds the defaults for the CDC SuperFlag options.
	CDCDefaults = `file=; kafka=; sasl_user=; sasl_password=; ` +
		`ca_cert=; client_cert=; client_key=; ` +
		`sasl-mechanism=PLAIN; tls=false;`

	// LimitDefaults holds the defaults for the limit SuperFlag options.
	// The irregular spacing in the last segment is part of the existing value and is kept as is.
	LimitDefaults = `mutations=allow; query-edge=1000000; normalize-node=10000; ` +
		`mutations-nquad=1000000; disallow-drop=false; ` +
		`query-timeout=0ms; txn-abort-after=5m; ` +
		` max-retries=10;max-pending-queries=10000;shared-instance=false`

	// ZeroLimitsDefaults holds the defaults for the Zero limits SuperFlag options.
	ZeroLimitsDefaults = `uid-lease=0; refill-interval=30s; disable-admin-http=false;`

	// GraphQLDefaults holds the defaults for the GraphQL SuperFlag options.
	GraphQLDefaults = `introspection=true; debug=false; extensions=true; ` +
		`poll-interval=1s; lambda-url=;`

	// CacheDefaults holds the defaults for the cache SuperFlag options.
	CacheDefaults = `size-mb=1024; percentage=0,65,35;`

	// FeatureFlagsDefaults holds the defaults for the feature flags SuperFlag options.
	FeatureFlagsDefaults = `list-in-normalize=true`
)
58

59
// ServerState holds the state of the Dgraph server.
60
type ServerState struct {
61
        FinishCh chan struct{} // channel to wait for all pending reqs to finish.
62

63
        Pstore   *badger.DB
64
        WALstore *raftwal.DiskStorage
65
        gcCloser *z.Closer // closer for valueLogGC
66

67
        needTs chan tsReq
68
}
69

70
// State is the instance of ServerState used by the current server.
71
var State ServerState
72

73
// InitServerState initializes this server's state.
74
func InitServerState() {
91✔
75
        Config.validate()
91✔
76

91✔
77
        State.FinishCh = make(chan struct{})
91✔
78
        State.needTs = make(chan tsReq, 100)
91✔
79

91✔
80
        State.initStorage()
91✔
81
        go State.fillTimestampRequests()
91✔
82

91✔
83
        groupId, err := x.ReadGroupIdFile(Config.PostingDir)
91✔
84
        if err != nil {
91✔
85
                glog.Warningf("Could not read %s file inside posting directory %s.", x.GroupIdFileName,
×
86
                        Config.PostingDir)
×
87
        }
×
88
        x.WorkerConfig.ProposedGroupId = groupId
91✔
89
}
90

91
func setBadgerOptions(opt badger.Options) badger.Options {
91✔
92
        opt = opt.WithSyncWrites(false).
91✔
93
                WithLogger(&x.ToGlog{}).
91✔
94
                WithEncryptionKey(x.WorkerConfig.EncryptionKey)
91✔
95

91✔
96
        // Disable conflict detection in badger. Alpha runs in managed mode and
91✔
97
        // perform its own conflict detection so we don't need badger's conflict
91✔
98
        // detection. Using badger's conflict detection uses memory which can be
91✔
99
        // saved by disabling it.
91✔
100
        opt.DetectConflicts = false
91✔
101

91✔
102
        // Settings for the data directory.
91✔
103
        return opt
91✔
104
}
91✔
105

106
func (s *ServerState) initStorage() {
91✔
107
        var err error
91✔
108

91✔
109
        if x.WorkerConfig.EncryptionKey != nil {
113✔
110
                // non-nil key file
22✔
111
                if !EnterpriseEnabled() {
22✔
112
                        // not licensed --> crash.
×
113
                        glog.Fatal("Valid Enterprise License needed for the Encryption feature.")
×
114
                } else {
22✔
115
                        // licensed --> OK.
22✔
116
                        glog.Infof("Encryption feature enabled.")
22✔
117
                }
22✔
118
        }
119

120
        {
91✔
121
                // Write Ahead Log directory
91✔
122
                x.Checkf(os.MkdirAll(Config.WALDir, 0700), "Error while creating WAL dir.")
91✔
123
                s.WALstore, err = raftwal.InitEncrypted(Config.WALDir, x.WorkerConfig.EncryptionKey)
91✔
124
                x.Check(err)
91✔
125
        }
91✔
126
        {
91✔
127
                // Postings directory
91✔
128
                // All the writes to posting store should be synchronous. We use batched writers
91✔
129
                // for posting lists, so the cost of sync writes is amortized.
91✔
130
                x.Check(os.MkdirAll(Config.PostingDir, 0700))
91✔
131
                opt := x.WorkerConfig.Badger.
91✔
132
                        WithDir(Config.PostingDir).WithValueDir(Config.PostingDir).
91✔
133
                        WithNumVersionsToKeep(math.MaxInt32).
91✔
134
                        WithNamespaceOffset(x.NamespaceOffset)
91✔
135
                opt = setBadgerOptions(opt)
91✔
136

91✔
137
                // Print the options w/o exposing key.
91✔
138
                // TODO: Build a stringify interface in Badger options, which is used to print nicely here.
91✔
139
                key := opt.EncryptionKey
91✔
140
                opt.EncryptionKey = nil
91✔
141
                glog.Infof("Opening postings BadgerDB with options: %+v\n", opt)
91✔
142
                opt.EncryptionKey = key
91✔
143

91✔
144
                s.Pstore, err = badger.OpenManaged(opt)
91✔
145
                x.Checkf(err, "Error while creating badger KV posting store")
91✔
146

91✔
147
                // zero out from memory
91✔
148
                opt.EncryptionKey = nil
91✔
149
        }
91✔
150
        // Temp directory
151
        x.Check(os.MkdirAll(x.WorkerConfig.TmpDir, 0700))
91✔
152

91✔
153
        s.gcCloser = z.NewCloser(3)
91✔
154
        go x.RunVlogGC(s.Pstore, s.gcCloser)
91✔
155
        // Commenting this out because Badger is doing its own cache checks.
91✔
156
        go x.MonitorCacheHealth(s.Pstore, s.gcCloser)
91✔
157
        go x.MonitorDiskMetrics("postings_fs", Config.PostingDir, s.gcCloser)
91✔
158
}
159

160
// Dispose stops and closes all the resources inside the server state.
161
func (s *ServerState) Dispose() {
91✔
162
        s.gcCloser.SignalAndWait()
91✔
163
        if err := s.Pstore.Close(); err != nil {
91✔
164
                glog.Errorf("Error while closing postings store: %v", err)
×
165
        }
×
166
        if err := s.WALstore.Close(); err != nil {
91✔
167
                glog.Errorf("Error while closing WAL store: %v", err)
×
168
        }
×
169
}
170

171
func (s *ServerState) GetTimestamp(readOnly bool) uint64 {
65,230✔
172
        tr := tsReq{readOnly: readOnly, ch: make(chan uint64)}
65,230✔
173
        s.needTs <- tr
65,230✔
174
        return <-tr.ch
65,230✔
175
}
65,230✔
176

177
func (s *ServerState) fillTimestampRequests() {
91✔
178
        const (
91✔
179
                initDelay = 10 * time.Millisecond
91✔
180
                maxDelay  = time.Second
91✔
181
        )
91✔
182

91✔
183
        defer func() {
182✔
184
                glog.Infoln("Exiting fillTimestampRequests")
91✔
185
        }()
91✔
186

187
        var reqs []tsReq
91✔
188
        for {
64,425✔
189
                // Reset variables.
64,334✔
190
                reqs = reqs[:0]
64,334✔
191
                delay := initDelay
64,334✔
192

64,334✔
193
                select {
64,334✔
194
                case <-s.gcCloser.HasBeenClosed():
91✔
195
                        return
91✔
196
                case req := <-s.needTs:
64,243✔
197
                slurpLoop:
64,243✔
198
                        for {
129,472✔
199
                                reqs = append(reqs, req)
65,229✔
200
                                select {
65,229✔
201
                                case req = <-s.needTs:
986✔
202
                                default:
64,243✔
203
                                        break slurpLoop
64,243✔
204
                                }
205
                        }
206
                }
207

208
                // Generate the request.
209
                num := &pb.Num{}
64,243✔
210
                for _, r := range reqs {
129,472✔
211
                        if r.readOnly {
67,630✔
212
                                num.ReadOnly = true
2,401✔
213
                        } else {
65,229✔
214
                                num.Val++
62,828✔
215
                        }
62,828✔
216
                }
217

218
                // Execute the request with infinite retries.
219
        retry:
220
                if s.gcCloser.Ctx().Err() != nil {
64,243✔
221
                        return
×
222
                }
×
223
                ctx, cancel := context.WithTimeout(s.gcCloser.Ctx(), 10*time.Second)
64,243✔
224
                ts, err := Timestamps(ctx, num)
64,243✔
225
                cancel()
64,243✔
226
                if err != nil {
64,243✔
227
                        glog.Warningf("Error while retrieving timestamps: %v with delay: %v."+
×
228
                                " Will retry...\n", err, delay)
×
229
                        time.Sleep(delay)
×
230
                        delay *= 2
×
231
                        if delay > maxDelay {
×
232
                                delay = maxDelay
×
233
                        }
×
234
                        goto retry
×
235
                }
236
                var offset uint64
64,243✔
237
                for _, req := range reqs {
129,472✔
238
                        if req.readOnly {
67,630✔
239
                                req.ch <- ts.ReadOnly
2,401✔
240
                        } else {
65,229✔
241
                                req.ch <- ts.StartId + offset
62,828✔
242
                                offset++
62,828✔
243
                        }
62,828✔
244
                }
245
                x.AssertTrue(ts.StartId == 0 || ts.StartId+offset-1 == ts.EndId)
64,243✔
246
        }
247
}
248

249
// tsReq is one pending timestamp request: the kind of timestamp wanted and the
// channel on which to deliver it.
type tsReq struct {
	// readOnly is true when the requester wants the read-only timestamp.
	readOnly bool
	// ch is a one-shot channel on which the txn timestamp is sent back.
	ch chan uint64
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc