• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

dgraph-io / dgraph / 5228265546

10 Jun 2023 05:08AM UTC coverage: 67.303% (+0.07%) from 67.23%
5228265546

push

web-flow
Merge dba1461bb into ab3769797

16 of 16 new or added lines in 2 files covered. (100.0%)

58428 of 86814 relevant lines covered (67.3%)

2257715.12 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.4
/dgraph/cmd/alpha/run.go
1
/*
2
 * Copyright 2017-2023 Dgraph Labs, Inc. and Contributors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package alpha
18

19
import (
20
        "bytes"
21
        "context"
22
        "crypto/tls"
23
        "fmt"
24
        "log"
25
        "math"
26
        "net"
27
        "net/http"
28
        _ "net/http/pprof" // http profiler
29
        "net/url"
30
        "os"
31
        "os/signal"
32
        "strconv"
33
        "strings"
34
        "sync/atomic"
35
        "syscall"
36
        "time"
37

38
        "github.com/golang/glog"
39
        "github.com/pkg/errors"
40
        "github.com/spf13/cobra"
41
        "go.opencensus.io/plugin/ocgrpc"
42
        otrace "go.opencensus.io/trace"
43
        "go.opencensus.io/zpages"
44
        "golang.org/x/net/trace"
45
        "google.golang.org/grpc"
46
        "google.golang.org/grpc/credentials"
47
        _ "google.golang.org/grpc/encoding/gzip" // grpc compression
48
        "google.golang.org/grpc/health"
49
        hapi "google.golang.org/grpc/health/grpc_health_v1"
50

51
        "github.com/dgraph-io/badger/v4"
52
        "github.com/dgraph-io/dgo/v230/protos/api"
53
        "github.com/dgraph-io/dgraph/edgraph"
54
        "github.com/dgraph-io/dgraph/ee"
55
        "github.com/dgraph-io/dgraph/ee/audit"
56
        "github.com/dgraph-io/dgraph/ee/enc"
57
        "github.com/dgraph-io/dgraph/graphql/admin"
58
        "github.com/dgraph-io/dgraph/posting"
59
        "github.com/dgraph-io/dgraph/schema"
60
        "github.com/dgraph-io/dgraph/tok"
61
        "github.com/dgraph-io/dgraph/worker"
62
        "github.com/dgraph-io/dgraph/x"
63
        _ "github.com/dgraph-io/gqlparser/v2/validator/rules" // make gql validator init() all rules
64
        "github.com/dgraph-io/ristretto/z"
65
)
66

67
var (
68
        bindall bool
69

70
        // used for computing uptime
71
        startTime = time.Now()
72

73
        // Alpha is the sub-command invoked when running "dgraph alpha".
74
        Alpha x.SubCommand
75

76
        // need this here to refer it in admin_backup.go
77
        adminServer admin.IServeGraphQL
78
        initDone    uint32
79
)
80

81
func init() {
159✔
82
        Alpha.Cmd = &cobra.Command{
159✔
83
                Use:   "alpha",
159✔
84
                Short: "Run Dgraph Alpha database server",
159✔
85
                Long: `
159✔
86
A Dgraph Alpha instance stores the data. Each Dgraph Alpha is responsible for
159✔
87
storing and serving one data group. If multiple Alphas serve the same group,
159✔
88
they form a Raft group and provide synchronous replication.
159✔
89
`,
159✔
90
                Run: func(cmd *cobra.Command, args []string) {
250✔
91
                        defer x.StartProfile(Alpha.Conf).Stop()
91✔
92
                        run()
91✔
93
                },
91✔
94
                Annotations: map[string]string{"group": "core"},
95
        }
96
        Alpha.EnvPrefix = "DGRAPH_ALPHA"
159✔
97
        Alpha.Cmd.SetHelpTemplate(x.NonRootTemplate)
159✔
98

159✔
99
        // If you change any of the flags below, you must also update run() to call Alpha.Conf.Get
159✔
100
        // with the flag name so that the values are picked up by Cobra/Viper's various config inputs
159✔
101
        // (e.g, config file, env vars, cli flags, etc.)
159✔
102
        flag := Alpha.Cmd.Flags()
159✔
103

159✔
104
        // common
159✔
105
        x.FillCommonFlags(flag)
159✔
106
        // --tls SuperFlag
159✔
107
        x.RegisterServerTLSFlags(flag)
159✔
108
        // --encryption and --vault Superflag
159✔
109
        ee.RegisterAclAndEncFlags(flag)
159✔
110

159✔
111
        flag.StringP("postings", "p", "p", "Directory to store posting lists.")
159✔
112
        flag.String("tmp", "t", "Directory to store temporary buffers.")
159✔
113

159✔
114
        flag.StringP("wal", "w", "w", "Directory to store raft write-ahead logs.")
159✔
115
        flag.String("export", "export", "Folder in which to store exports.")
159✔
116
        flag.StringP("zero", "z", fmt.Sprintf("localhost:%d", x.PortZeroGrpc),
159✔
117
                "Comma separated list of Dgraph Zero addresses of the form IP_ADDRESS:PORT.")
159✔
118

159✔
119
        // Useful for running multiple servers on the same machine.
159✔
120
        flag.IntP("port_offset", "o", 0,
159✔
121
                "Value added to all listening port numbers. [Internal=7080, HTTP=8080, Grpc=9080]")
159✔
122

159✔
123
        // Custom plugins.
159✔
124
        flag.String("custom_tokenizers", "",
159✔
125
                "Comma separated list of tokenizer plugins for custom indices.")
159✔
126

159✔
127
        // By default Go GRPC traces all requests.
159✔
128
        grpc.EnableTracing = false
159✔
129

159✔
130
        flag.String("badger", worker.BadgerDefaults, z.NewSuperFlagHelp(worker.BadgerDefaults).
159✔
131
                Head("Badger options (Refer to badger documentation for all possible options)").
159✔
132
                Flag("compression",
159✔
133
                        `[none, zstd:level, snappy] Specifies the compression algorithm and
159✔
134
                        compression level (if applicable) for the postings directory."none" would disable
159✔
135
                        compression, while "zstd:1" would set zstd compression at level 1.`).
159✔
136
                Flag("numgoroutines",
159✔
137
                        "The number of goroutines to use in badger.Stream.").
159✔
138
                String())
159✔
139

159✔
140
        // Cache flags.
159✔
141
        flag.String("cache", worker.CacheDefaults, z.NewSuperFlagHelp(worker.CacheDefaults).
159✔
142
                Head("Cache options").
159✔
143
                Flag("size-mb",
159✔
144
                        "Total size of cache (in MB) to be used in Dgraph.").
159✔
145
                Flag("percentage",
159✔
146
                        "Cache percentages summing up to 100 for various caches (FORMAT: PostingListCache,"+
159✔
147
                                "PstoreBlockCache,PstoreIndexCache)").
159✔
148
                String())
159✔
149

159✔
150
        flag.String("raft", worker.RaftDefaults, z.NewSuperFlagHelp(worker.RaftDefaults).
159✔
151
                Head("Raft options").
159✔
152
                Flag("idx",
159✔
153
                        "Provides an optional Raft ID that this Alpha would use to join Raft groups.").
159✔
154
                Flag("group",
159✔
155
                        "Provides an optional Raft Group ID that this Alpha would indicate to Zero to join.").
159✔
156
                Flag("learner",
159✔
157
                        `Make this Alpha a "learner" node. In learner mode, this Alpha will not participate `+
159✔
158
                                "in Raft elections. This can be used to achieve a read-only replica.").
159✔
159
                Flag("snapshot-after-entries",
159✔
160
                        "Create a new Raft snapshot after N number of Raft entries. The lower this number, "+
159✔
161
                                "the more frequent snapshot creation will be. Snapshots are created only if both "+
159✔
162
                                "snapshot-after-duration and snapshot-after-entries threshold are crossed.").
159✔
163
                Flag("snapshot-after-duration",
159✔
164
                        "Frequency at which we should create a new raft snapshots. Set "+
159✔
165
                                "to 0 to disable duration based snapshot.").
159✔
166
                Flag("pending-proposals",
159✔
167
                        "Number of pending mutation proposals. Useful for rate limiting.").
159✔
168
                String())
159✔
169

159✔
170
        flag.String("security", worker.SecurityDefaults, z.NewSuperFlagHelp(worker.SecurityDefaults).
159✔
171
                Head("Security options").
159✔
172
                Flag("token",
159✔
173
                        "If set, all Admin requests to Dgraph will need to have this token. The token can be "+
159✔
174
                                "passed as follows: for HTTP requests, in the X-Dgraph-AuthToken header. For Grpc, "+
159✔
175
                                "in auth-token key in the context.").
159✔
176
                Flag("whitelist",
159✔
177
                        "A comma separated list of IP addresses, IP ranges, CIDR blocks, or hostnames you wish "+
159✔
178
                                "to whitelist for performing admin actions (i.e., --security "+
159✔
179
                                `"whitelist=144.142.126.254,127.0.0.1:127.0.0.3,192.168.0.0/16,host.docker.`+
159✔
180
                                `internal").`).
159✔
181
                String())
159✔
182

159✔
183
        flag.String("limit", worker.LimitDefaults, z.NewSuperFlagHelp(worker.LimitDefaults).
159✔
184
                Head("Limit options").
159✔
185
                Flag("query-edge",
159✔
186
                        "The maximum number of edges that can be returned in a query. This applies to shortest "+
159✔
187
                                "path and recursive queries.").
159✔
188
                Flag("normalize-node",
159✔
189
                        "The maximum number of nodes that can be returned in a query that uses the normalize "+
159✔
190
                                "directive.").
159✔
191
                Flag("mutations",
159✔
192
                        "[allow, disallow, strict] The mutations mode to use.").
159✔
193
                Flag("mutations-nquad",
159✔
194
                        "The maximum number of nquads that can be inserted in a mutation request.").
159✔
195
                Flag("disallow-drop",
159✔
196
                        "Set disallow-drop to true to block drop-all and drop-data operation. It still"+
159✔
197
                                " allows dropping attributes and types.").
159✔
198
                Flag("max-pending-queries",
159✔
199
                        "Number of maximum pending queries before we reject them as too many requests.").
159✔
200
                Flag("query-timeout",
159✔
201
                        "Maximum time after which a query execution will fail. If set to"+
159✔
202
                                " 0, the timeout is infinite.").
159✔
203
                Flag("max-retries",
159✔
204
                        "Commits to disk will give up after these number of retries to prevent locking the "+
159✔
205
                                "worker in a failed state. Use -1 to retry infinitely.").
159✔
206
                Flag("txn-abort-after", "Abort any pending transactions older than this duration."+
159✔
207
                        " The liveness of a transaction is determined by its last mutation.").
159✔
208
                Flag("shared-instance", "When set to true, it disables ACLs for non-galaxy users. "+
159✔
209
                        "It expects the access JWT to be constructed outside dgraph for non-galaxy users as "+
159✔
210
                        "login is denied to them. Additionally, this disables access to environment variables for minio, aws, etc.").
159✔
211
                String())
159✔
212

159✔
213
        flag.String("graphql", worker.GraphQLDefaults, z.NewSuperFlagHelp(worker.GraphQLDefaults).
159✔
214
                Head("GraphQL options").
159✔
215
                Flag("introspection",
159✔
216
                        "Enables GraphQL schema introspection.").
159✔
217
                Flag("debug",
159✔
218
                        "Enables debug mode in GraphQL. This returns auth errors to clients, and we do not "+
159✔
219
                                "recommend turning it on for production.").
159✔
220
                Flag("extensions",
159✔
221
                        "Enables extensions in GraphQL response body.").
159✔
222
                Flag("poll-interval",
159✔
223
                        "The polling interval for GraphQL subscription.").
159✔
224
                Flag("lambda-url",
159✔
225
                        "The URL of a lambda server that implements custom GraphQL Javascript resolvers.").
159✔
226
                String())
159✔
227

159✔
228
        flag.String("cdc", worker.CDCDefaults, z.NewSuperFlagHelp(worker.CDCDefaults).
159✔
229
                Head("Change Data Capture options").
159✔
230
                Flag("file",
159✔
231
                        "The path where audit logs will be stored.").
159✔
232
                Flag("kafka",
159✔
233
                        "A comma separated list of Kafka hosts.").
159✔
234
                Flag("sasl-user",
159✔
235
                        "The SASL username for Kafka.").
159✔
236
                Flag("sasl-password",
159✔
237
                        "The SASL password for Kafka.").
159✔
238
                Flag("sasl-mechanism",
159✔
239
                        "The SASL mechanism for Kafka (PLAIN, SCRAM-SHA-256 or SCRAM-SHA-512)").
159✔
240
                Flag("ca-cert",
159✔
241
                        "The path to CA cert file for TLS encryption.").
159✔
242
                Flag("client-cert",
159✔
243
                        "The path to client cert file for TLS encryption.").
159✔
244
                Flag("client-key",
159✔
245
                        "The path to client key file for TLS encryption.").
159✔
246
                String())
159✔
247

159✔
248
        flag.String("audit", worker.AuditDefaults, z.NewSuperFlagHelp(worker.AuditDefaults).
159✔
249
                Head("Audit options").
159✔
250
                Flag("output",
159✔
251
                        `[stdout, /path/to/dir] This specifies where audit logs should be output to.
159✔
252
                        "stdout" is for standard output. You can also specify the directory where audit logs
159✔
253
                        will be saved. When stdout is specified as output other fields will be ignored.`).
159✔
254
                Flag("compress",
159✔
255
                        "Enables the compression of old audit logs.").
159✔
256
                Flag("encrypt-file",
159✔
257
                        "The path to the key file to be used for audit log encryption.").
159✔
258
                Flag("days",
159✔
259
                        "The number of days audit logs will be preserved.").
159✔
260
                Flag("size",
159✔
261
                        "The audit log max size in MB after which it will be rolled over.").
159✔
262
                String())
159✔
263

159✔
264
        flag.String("feature-flags", worker.FeatureFlagsDefaults, z.NewSuperFlagHelp(worker.FeatureFlagsDefaults).
159✔
265
                Head("Feature flags to enable various experimental features").
159✔
266
                Flag("list-in-normalize", "enables returning a list when there are multiple fields with same alias, "+
159✔
267
                        "see here for more details https://github.com/dgraph-io/dgraph/pull/7639").
159✔
268
                String())
159✔
269
}
270

271
func setupCustomTokenizers() {
91✔
272
        customTokenizers := Alpha.Conf.GetString("custom_tokenizers")
91✔
273
        if customTokenizers == "" {
181✔
274
                return
90✔
275
        }
90✔
276
        for _, soFile := range strings.Split(customTokenizers, ",") {
5✔
277
                tok.LoadCustomTokenizer(soFile)
4✔
278
        }
4✔
279
}
280

281
// Parses a comma-delimited list of IP addresses, IP ranges, CIDR blocks, or hostnames
282
// and returns a slice of []IPRange.
283
//
284
// e.g. "144.142.126.222:144.142.126.244,144.142.126.254,192.168.0.0/16,host.docker.internal"
285
func getIPsFromString(str string) ([]x.IPRange, error) {
104✔
286
        if str == "" {
105✔
287
                return []x.IPRange{}, nil
1✔
288
        }
1✔
289

290
        var ipRanges []x.IPRange
103✔
291
        rangeStrings := strings.Split(str, ",")
103✔
292

103✔
293
        for _, s := range rangeStrings {
391✔
294
                isIPv6 := strings.Contains(s, "::")
288✔
295
                tuple := strings.Split(s, ":")
288✔
296
                switch {
288✔
297
                case isIPv6 || len(tuple) == 1:
282✔
298
                        if !strings.Contains(s, "/") {
287✔
299
                                // string is hostname like host.docker.internal,
5✔
300
                                // or IPv4 address like 144.124.126.254,
5✔
301
                                // or IPv6 address like fd03:b188:0f3c:9ec4::babe:face
5✔
302
                                ipAddr := net.ParseIP(s)
5✔
303
                                if ipAddr != nil {
8✔
304
                                        ipRanges = append(ipRanges, x.IPRange{Lower: ipAddr, Upper: ipAddr})
3✔
305
                                } else {
5✔
306
                                        ipAddrs, err := net.LookupIP(s)
2✔
307
                                        if err != nil {
2✔
308
                                                return nil, errors.Errorf("invalid IP address or hostname: %s", s)
×
309
                                        }
×
310

311
                                        for _, addr := range ipAddrs {
6✔
312
                                                ipRanges = append(ipRanges, x.IPRange{Lower: addr, Upper: addr})
4✔
313
                                        }
4✔
314
                                }
315
                        } else {
277✔
316
                                // string is CIDR block like 192.168.0.0/16 or fd03:b188:0f3c:9ec4::/64
277✔
317
                                rangeLo, network, err := net.ParseCIDR(s)
277✔
318
                                if err != nil {
278✔
319
                                        return nil, errors.Errorf("invalid CIDR block: %s", s)
1✔
320
                                }
1✔
321

322
                                addrLen, maskLen := len(rangeLo), len(network.Mask)
276✔
323
                                rangeHi := make(net.IP, len(rangeLo))
276✔
324
                                copy(rangeHi, rangeLo)
276✔
325
                                for i := 1; i <= maskLen; i++ {
1,392✔
326
                                        rangeHi[addrLen-i] |= ^network.Mask[maskLen-i]
1,116✔
327
                                }
1,116✔
328

329
                                ipRanges = append(ipRanges, x.IPRange{Lower: rangeLo, Upper: rangeHi})
276✔
330
                        }
331
                case len(tuple) == 2:
5✔
332
                        // string is range like a.b.c.d:w.x.y.z
5✔
333
                        rangeLo := net.ParseIP(tuple[0])
5✔
334
                        rangeHi := net.ParseIP(tuple[1])
5✔
335
                        switch {
5✔
336
                        case rangeLo == nil:
2✔
337
                                return nil, errors.Errorf("invalid IP address: %s", tuple[0])
2✔
338
                        case rangeHi == nil:
1✔
339
                                return nil, errors.Errorf("invalid IP address: %s", tuple[1])
1✔
340
                        case bytes.Compare(rangeLo, rangeHi) > 0:
×
341
                                return nil, errors.Errorf("inverted IP address range: %s", s)
×
342
                        }
343
                        ipRanges = append(ipRanges, x.IPRange{Lower: rangeLo, Upper: rangeHi})
2✔
344
                default:
1✔
345
                        return nil, errors.Errorf("invalid IP address range: %s", s)
1✔
346
                }
347
        }
348

349
        return ipRanges, nil
98✔
350
}
351

352
func httpPort() int {
273✔
353
        return x.Config.PortOffset + x.PortHTTP
273✔
354
}
273✔
355

356
func grpcPort() int {
182✔
357
        return x.Config.PortOffset + x.PortGrpc
182✔
358
}
182✔
359

360
func healthCheck(w http.ResponseWriter, r *http.Request) {
335✔
361
        x.AddCorsHeaders(w)
335✔
362
        var err error
335✔
363

335✔
364
        if _, ok := r.URL.Query()["all"]; ok {
337✔
365
                w.Header().Set("Content-Type", "application/json")
2✔
366
                w.WriteHeader(http.StatusOK)
2✔
367

2✔
368
                ctx := x.AttachAccessJwt(context.Background(), r)
2✔
369
                var resp *api.Response
2✔
370
                if resp, err = (&edgraph.Server{}).Health(ctx, true); err != nil {
2✔
371
                        x.SetStatus(w, x.Error, err.Error())
×
372
                        return
×
373
                }
×
374
                if resp == nil {
2✔
375
                        x.SetStatus(w, x.ErrorNoData, "No health information available.")
×
376
                        return
×
377
                }
×
378
                _, _ = w.Write(resp.Json)
2✔
379
                return
2✔
380
        }
381

382
        _, ok := r.URL.Query()["live"]
333✔
383
        if !ok {
666✔
384
                if err := x.HealthCheck(); err != nil {
541✔
385
                        w.WriteHeader(http.StatusServiceUnavailable)
208✔
386
                        _, err = w.Write([]byte(err.Error()))
208✔
387
                        if err != nil {
208✔
388
                                glog.V(2).Infof("Error while writing health check response: %v", err)
×
389
                        }
×
390
                        return
208✔
391
                }
392
        }
393

394
        var resp *api.Response
125✔
395
        if resp, err = (&edgraph.Server{}).Health(context.Background(), false); err != nil {
125✔
396
                x.SetStatus(w, x.Error, err.Error())
×
397
                return
×
398
        }
×
399
        if resp == nil {
125✔
400
                x.SetStatus(w, x.ErrorNoData, "No health information available.")
×
401
                return
×
402
        }
×
403
        w.Header().Set("Content-Type", "application/json")
125✔
404
        w.WriteHeader(http.StatusOK)
125✔
405
        _, _ = w.Write(resp.Json)
125✔
406
}
407

408
func stateHandler(w http.ResponseWriter, r *http.Request) {
2✔
409
        var err error
2✔
410
        x.AddCorsHeaders(w)
2✔
411
        w.Header().Set("Content-Type", "application/json")
2✔
412

2✔
413
        ctx := context.Background()
2✔
414
        ctx = x.AttachAccessJwt(ctx, r)
2✔
415

2✔
416
        var aResp *api.Response
2✔
417
        if aResp, err = (&edgraph.Server{}).State(ctx); err != nil {
2✔
418
                x.SetStatus(w, x.Error, err.Error())
×
419
                return
×
420
        }
×
421
        if aResp == nil {
2✔
422
                x.SetStatus(w, x.ErrorNoData, "No state information available.")
×
423
                return
×
424
        }
×
425

426
        if _, err = w.Write(aResp.Json); err != nil {
2✔
427
                x.SetStatus(w, x.Error, err.Error())
×
428
                return
×
429
        }
×
430
}
431

432
// storeStatsHandler outputs some basic stats for data store.
433
func storeStatsHandler(w http.ResponseWriter, r *http.Request) {
×
434
        x.AddCorsHeaders(w)
×
435
        w.Header().Set("Content-Type", "text/html")
×
436
        x.Check2(w.Write([]byte("<pre>")))
×
437
        x.Check2(w.Write([]byte(worker.StoreStats())))
×
438
        x.Check2(w.Write([]byte("</pre>")))
×
439
}
×
440

441
// setupListener opens a TCP listener on the given address and port.
//
// The address is combined with the port via net.JoinHostPort so that IPv6
// literals such as "::1" are bracketed correctly; plain "%s:%d" formatting
// would yield "::1:8080", which net.Listen rejects.
func setupListener(addr string, port int) (net.Listener, error) {
	return net.Listen("tcp", net.JoinHostPort(addr, strconv.Itoa(port)))
}
182✔
444

445
func serveGRPC(l net.Listener, tlsCfg *tls.Config, closer *z.Closer) {
91✔
446
        defer closer.Done()
91✔
447

91✔
448
        x.RegisterExporters(Alpha.Conf, "dgraph.alpha")
91✔
449

91✔
450
        opt := []grpc.ServerOption{
91✔
451
                grpc.MaxRecvMsgSize(x.GrpcMaxSize),
91✔
452
                grpc.MaxSendMsgSize(x.GrpcMaxSize),
91✔
453
                grpc.MaxConcurrentStreams(1000),
91✔
454
                grpc.StatsHandler(&ocgrpc.ServerHandler{}),
91✔
455
                grpc.UnaryInterceptor(audit.AuditRequestGRPC),
91✔
456
        }
91✔
457
        if tlsCfg != nil {
115✔
458
                opt = append(opt, grpc.Creds(credentials.NewTLS(tlsCfg)))
24✔
459
        }
24✔
460

461
        s := grpc.NewServer(opt...)
91✔
462
        api.RegisterDgraphServer(s, &edgraph.Server{})
91✔
463
        hapi.RegisterHealthServer(s, health.NewServer())
91✔
464
        worker.RegisterZeroProxyServer(s)
91✔
465

91✔
466
        err := s.Serve(l)
91✔
467
        glog.Errorf("GRPC listener canceled: %v\n", err)
91✔
468
        s.Stop()
91✔
469
}
470

471
func setupServer(closer *z.Closer) {
91✔
472
        go worker.RunServer(bindall) // For pb.communication.
91✔
473

91✔
474
        laddr := "localhost"
91✔
475
        if bindall {
182✔
476
                laddr = "0.0.0.0"
91✔
477
        }
91✔
478

479
        tlsCfg, err := x.LoadServerTLSConfig(Alpha.Conf)
91✔
480
        if err != nil {
91✔
481
                log.Fatalf("Failed to setup TLS: %v\n", err)
×
482
        }
×
483

484
        httpListener, err := setupListener(laddr, httpPort())
91✔
485
        if err != nil {
91✔
486
                log.Fatal(err)
×
487
        }
×
488

489
        grpcListener, err := setupListener(laddr, grpcPort())
91✔
490
        if err != nil {
91✔
491
                log.Fatal(err)
×
492
        }
×
493

494
        baseMux := http.NewServeMux()
91✔
495
        http.Handle("/", audit.AuditRequestHttp(baseMux))
91✔
496

91✔
497
        baseMux.HandleFunc("/query", queryHandler)
91✔
498
        baseMux.HandleFunc("/query/", queryHandler)
91✔
499
        baseMux.HandleFunc("/mutate", mutationHandler)
91✔
500
        baseMux.HandleFunc("/mutate/", mutationHandler)
91✔
501
        baseMux.HandleFunc("/commit", commitHandler)
91✔
502
        baseMux.HandleFunc("/alter", alterHandler)
91✔
503
        baseMux.HandleFunc("/health", healthCheck)
91✔
504
        baseMux.HandleFunc("/state", stateHandler)
91✔
505
        baseMux.HandleFunc("/debug/jemalloc", x.JemallocHandler)
91✔
506
        zpages.Handle(baseMux, "/debug/z")
91✔
507

91✔
508
        // TODO: Figure out what this is for?
91✔
509
        http.HandleFunc("/debug/store", storeStatsHandler)
91✔
510

91✔
511
        introspection := x.Config.GraphQL.GetBool("introspection")
91✔
512

91✔
513
        // Global Epoch is a lockless synchronization mechanism for graphql service.
91✔
514
        // It's is just an atomic counter used by the graphql subscription to update its state.
91✔
515
        // It's is used to detect the schema changes and server exit.
91✔
516
        // It is also reported by /probe/graphql endpoint as the schemaUpdateCounter.
91✔
517

91✔
518
        // Implementation for schema change:
91✔
519
        // The global epoch is incremented when there is a schema change.
91✔
520
        // Polling goroutine acquires the current epoch count as a local epoch.
91✔
521
        // The local epoch count is checked against the global epoch,
91✔
522
        // If there is change then we terminate the subscription.
91✔
523

91✔
524
        // Implementation for server exit:
91✔
525
        // The global epoch is set to maxUint64 while exiting the server.
91✔
526
        // By using this information polling goroutine terminates the subscription.
91✔
527
        globalEpoch := make(map[uint64]*uint64)
91✔
528
        e := new(uint64)
91✔
529
        atomic.StoreUint64(e, 0)
91✔
530
        globalEpoch[x.GalaxyNamespace] = e
91✔
531
        var mainServer admin.IServeGraphQL
91✔
532
        var gqlHealthStore *admin.GraphQLHealthStore
91✔
533
        // Do not use := notation here because adminServer is a global variable.
91✔
534
        mainServer, adminServer, gqlHealthStore = admin.NewServers(introspection,
91✔
535
                globalEpoch, closer)
91✔
536
        baseMux.HandleFunc("/graphql", func(w http.ResponseWriter, r *http.Request) {
3,720✔
537
                namespace := x.ExtractNamespaceHTTP(r)
3,629✔
538
                r.Header.Set("resolver", strconv.FormatUint(namespace, 10))
3,629✔
539
                if err := admin.LazyLoadSchema(namespace); err != nil {
3,629✔
540
                        admin.WriteErrorResponse(w, r, err)
×
541
                        return
×
542
                }
×
543
                mainServer.HTTPHandler().ServeHTTP(w, r)
3,629✔
544
        })
545

546
        baseMux.Handle("/probe/graphql", graphqlProbeHandler(gqlHealthStore, globalEpoch))
91✔
547

91✔
548
        baseMux.HandleFunc("/admin", func(w http.ResponseWriter, r *http.Request) {
1,513✔
549
                r.Header.Set("resolver", "0")
1,422✔
550
                // We don't need to load the schema for all the admin operations.
1,422✔
551
                // Only a few like getUser, queryGroup require this. So, this can be optimized.
1,422✔
552
                if err := admin.LazyLoadSchema(x.ExtractNamespaceHTTP(r)); err != nil {
1,422✔
553
                        admin.WriteErrorResponse(w, r, err)
×
554
                        return
×
555
                }
×
556
                allowedMethodsHandler(allowedMethods{
1,422✔
557
                        http.MethodGet:     true,
1,422✔
558
                        http.MethodPost:    true,
1,422✔
559
                        http.MethodOptions: true,
1,422✔
560
                }, adminAuthHandler(adminServer.HTTPHandler())).ServeHTTP(w, r)
1,422✔
561
        })
562
        baseMux.Handle("/admin/", getAdminMux())
91✔
563

91✔
564
        addr := fmt.Sprintf("%s:%d", laddr, httpPort())
91✔
565
        glog.Infof("Bringing up GraphQL HTTP API at %s/graphql", addr)
91✔
566
        glog.Infof("Bringing up GraphQL HTTP admin API at %s/admin", addr)
91✔
567

91✔
568
        baseMux.Handle("/", http.HandlerFunc(homeHandler))
91✔
569
        baseMux.Handle("/ui/keywords", http.HandlerFunc(keywordHandler))
91✔
570

91✔
571
        // Initialize the servers.
91✔
572
        x.ServerCloser.AddRunning(3)
91✔
573
        go serveGRPC(grpcListener, tlsCfg, x.ServerCloser)
91✔
574
        go x.StartListenHttpAndHttps(httpListener, tlsCfg, x.ServerCloser)
91✔
575

91✔
576
        go func() {
182✔
577
                defer x.ServerCloser.Done()
91✔
578

91✔
579
                <-x.ServerCloser.HasBeenClosed()
91✔
580
                // TODO - Verify why do we do this and does it have to be done for all namespaces.
91✔
581
                e = globalEpoch[x.GalaxyNamespace]
91✔
582
                atomic.StoreUint64(e, math.MaxUint64)
91✔
583

91✔
584
                // Stops grpc/http servers; Already accepted connections are not closed.
91✔
585
                if err := grpcListener.Close(); err != nil {
91✔
586
                        glog.Warningf("Error while closing gRPC listener: %s", err)
×
587
                }
×
588
                if err := httpListener.Close(); err != nil {
91✔
589
                        glog.Warningf("Error while closing HTTP listener: %s", err)
×
590
                }
×
591
        }()
592

593
        glog.Infoln("gRPC server started.  Listening on port", grpcPort())
91✔
594
        glog.Infoln("HTTP server started.  Listening on port", httpPort())
91✔
595

91✔
596
        atomic.AddUint32(&initDone, 1)
91✔
597
        // Audit needs groupId and nodeId to initialize audit files
91✔
598
        // Therefore we wait for the cluster initialization to be done.
91✔
599
        for {
1,052✔
600
                if x.HealthCheck() == nil {
1,052✔
601
                        // Audit is enterprise feature.
91✔
602
                        x.Check(audit.InitAuditorIfNecessary(worker.Config.Audit, worker.EnterpriseEnabled))
91✔
603
                        break
91✔
604
                }
605
                time.Sleep(500 * time.Millisecond)
870✔
606
        }
607
        x.ServerCloser.Wait()
91✔
608
}
609

610
func run() {
91✔
611
        var err error
91✔
612

91✔
613
        telemetry := z.NewSuperFlag(Alpha.Conf.GetString("telemetry")).MergeAndCheckDefault(
91✔
614
                x.TelemetryDefaults)
91✔
615
        if telemetry.GetBool("sentry") {
182✔
616
                x.InitSentry(enc.EeBuild)
91✔
617
                defer x.FlushSentry()
91✔
618
                x.ConfigureSentryScope("alpha")
91✔
619
                x.WrapPanics()
91✔
620
                x.SentryOptOutNote()
91✔
621
        }
91✔
622

623
        bindall = Alpha.Conf.GetBool("bindall")
91✔
624
        cache := z.NewSuperFlag(Alpha.Conf.GetString("cache")).MergeAndCheckDefault(
91✔
625
                worker.CacheDefaults)
91✔
626
        totalCache := cache.GetInt64("size-mb")
91✔
627
        x.AssertTruef(totalCache >= 0, "ERROR: Cache size must be non-negative")
91✔
628

91✔
629
        cachePercentage := cache.GetString("percentage")
91✔
630
        cachePercent, err := x.GetCachePercentages(cachePercentage, 3)
91✔
631
        x.Check(err)
91✔
632
        postingListCacheSize := (cachePercent[0] * (totalCache << 20)) / 100
91✔
633
        pstoreBlockCacheSize := (cachePercent[1] * (totalCache << 20)) / 100
91✔
634
        pstoreIndexCacheSize := (cachePercent[2] * (totalCache << 20)) / 100
91✔
635

91✔
636
        cacheOpts := fmt.Sprintf("blockcachesize=%d; indexcachesize=%d; ",
91✔
637
                pstoreBlockCacheSize, pstoreIndexCacheSize)
91✔
638
        bopts := badger.DefaultOptions("").FromSuperFlag(worker.BadgerDefaults + cacheOpts).
91✔
639
                FromSuperFlag(Alpha.Conf.GetString("badger"))
91✔
640
        security := z.NewSuperFlag(Alpha.Conf.GetString("security")).MergeAndCheckDefault(
91✔
641
                worker.SecurityDefaults)
91✔
642
        conf := audit.GetAuditConf(Alpha.Conf.GetString("audit"))
91✔
643
        opts := worker.Options{
91✔
644
                PostingDir:      Alpha.Conf.GetString("postings"),
91✔
645
                WALDir:          Alpha.Conf.GetString("wal"),
91✔
646
                CacheMb:         totalCache,
91✔
647
                CachePercentage: cachePercentage,
91✔
648

91✔
649
                MutationsMode:  worker.AllowMutations,
91✔
650
                AuthToken:      security.GetString("token"),
91✔
651
                Audit:          conf,
91✔
652
                ChangeDataConf: Alpha.Conf.GetString("cdc"),
91✔
653
        }
91✔
654

91✔
655
        keys, err := ee.GetKeys(Alpha.Conf)
91✔
656
        x.Check(err)
91✔
657

91✔
658
        if keys.AclKey != nil {
119✔
659
                opts.HmacSecret = keys.AclKey
28✔
660
                opts.AccessJwtTtl = keys.AclAccessTtl
28✔
661
                opts.RefreshJwtTtl = keys.AclRefreshTtl
28✔
662
                glog.Info("ACL secret key loaded successfully.")
28✔
663
        }
28✔
664

665
        x.Config.Limit = z.NewSuperFlag(Alpha.Conf.GetString("limit")).MergeAndCheckDefault(
91✔
666
                worker.LimitDefaults)
91✔
667
        abortDur := x.Config.Limit.GetDuration("txn-abort-after")
91✔
668
        switch strings.ToLower(x.Config.Limit.GetString("mutations")) {
91✔
669
        case "allow":
91✔
670
                opts.MutationsMode = worker.AllowMutations
91✔
671
        case "disallow":
×
672
                opts.MutationsMode = worker.DisallowMutations
×
673
        case "strict":
×
674
                opts.MutationsMode = worker.StrictMutations
×
675
        default:
×
676
                glog.Error(`--limit "mutations=<mode>;" must be one of allow, disallow, or strict`)
×
677
                os.Exit(1)
×
678
        }
679

680
        worker.SetConfiguration(&opts)
91✔
681

91✔
682
        ips, err := getIPsFromString(security.GetString("whitelist"))
91✔
683
        x.Check(err)
91✔
684

91✔
685
        tlsClientConf, err := x.LoadClientTLSConfigForInternalPort(Alpha.Conf)
91✔
686
        x.Check(err)
91✔
687
        tlsServerConf, err := x.LoadServerTLSConfigForInternalPort(Alpha.Conf)
91✔
688
        x.Check(err)
91✔
689

91✔
690
        raft := z.NewSuperFlag(Alpha.Conf.GetString("raft")).MergeAndCheckDefault(worker.RaftDefaults)
91✔
691
        x.WorkerConfig = x.WorkerOptions{
91✔
692
                TmpDir:              Alpha.Conf.GetString("tmp"),
91✔
693
                ExportPath:          Alpha.Conf.GetString("export"),
91✔
694
                ZeroAddr:            strings.Split(Alpha.Conf.GetString("zero"), ","),
91✔
695
                Raft:                raft,
91✔
696
                WhiteListedIPRanges: ips,
91✔
697
                StrictMutations:     opts.MutationsMode == worker.StrictMutations,
91✔
698
                AclEnabled:          keys.AclKey != nil,
91✔
699
                AbortOlderThan:      abortDur,
91✔
700
                StartTime:           startTime,
91✔
701
                Security:            security,
91✔
702
                TLSClientConfig:     tlsClientConf,
91✔
703
                TLSServerConfig:     tlsServerConf,
91✔
704
                HmacSecret:          opts.HmacSecret,
91✔
705
                Audit:               opts.Audit != nil,
91✔
706
                Badger:              bopts,
91✔
707
        }
91✔
708
        x.WorkerConfig.Parse(Alpha.Conf)
91✔
709

91✔
710
        if telemetry.GetBool("reports") {
182✔
711
                go edgraph.PeriodicallyPostTelemetry()
91✔
712
        }
91✔
713

714
        // Set the directory for temporary buffers.
715
        z.SetTmpDir(x.WorkerConfig.TmpDir)
91✔
716

91✔
717
        x.WorkerConfig.EncryptionKey = keys.EncKey
91✔
718

91✔
719
        setupCustomTokenizers()
91✔
720
        x.Config.PortOffset = Alpha.Conf.GetInt("port_offset")
91✔
721
        x.Config.LimitMutationsNquad = int(x.Config.Limit.GetInt64("mutations-nquad"))
91✔
722
        x.Config.LimitQueryEdge = x.Config.Limit.GetUint64("query-edge")
91✔
723
        x.Config.BlockClusterWideDrop = x.Config.Limit.GetBool("disallow-drop")
91✔
724
        x.Config.LimitNormalizeNode = int(x.Config.Limit.GetInt64("normalize-node"))
91✔
725
        x.Config.QueryTimeout = x.Config.Limit.GetDuration("query-timeout")
91✔
726
        x.Config.MaxRetries = x.Config.Limit.GetInt64("max-retries")
91✔
727
        x.Config.SharedInstance = x.Config.Limit.GetBool("shared-instance")
91✔
728

91✔
729
        x.Config.GraphQL = z.NewSuperFlag(Alpha.Conf.GetString("graphql")).MergeAndCheckDefault(
91✔
730
                worker.GraphQLDefaults)
91✔
731
        x.Config.GraphQLDebug = x.Config.GraphQL.GetBool("debug")
91✔
732
        if x.Config.GraphQL.GetString("lambda-url") != "" {
93✔
733
                graphqlLambdaUrl, err := url.Parse(x.Config.GraphQL.GetString("lambda-url"))
2✔
734
                if err != nil {
2✔
735
                        glog.Errorf("unable to parse --graphql lambda-url: %v", err)
×
736
                        return
×
737
                }
×
738
                if !graphqlLambdaUrl.IsAbs() {
2✔
739
                        glog.Errorf("expecting --graphql lambda-url to be an absolute URL, got: %s",
×
740
                                graphqlLambdaUrl.String())
×
741
                        return
×
742
                }
×
743
        }
744
        edgraph.Init()
91✔
745

91✔
746
        // feature flags
91✔
747
        featureFlagsConf := z.NewSuperFlag(Alpha.Conf.GetString("feature-flags")).MergeAndCheckDefault(
91✔
748
                worker.FeatureFlagsDefaults)
91✔
749
        x.Config.ListInNormalize = featureFlagsConf.GetBool("list-in-normalize")
91✔
750

91✔
751
        x.PrintVersion()
91✔
752
        glog.Infof("x.Config: %+v", x.Config)
91✔
753
        glog.Infof("x.WorkerConfig: %+v", x.WorkerConfig)
91✔
754
        glog.Infof("worker.Config: %+v", worker.Config)
91✔
755

91✔
756
        worker.InitServerState()
91✔
757
        worker.InitTasks()
91✔
758

91✔
759
        if Alpha.Conf.GetBool("expose_trace") {
111✔
760
                // TODO: Remove this once we get rid of event logs.
20✔
761
                trace.AuthRequest = func(req *http.Request) (any, sensitive bool) {
20✔
762
                        return true, true
×
763
                }
×
764
        }
765
        otrace.ApplyConfig(otrace.Config{
91✔
766
                DefaultSampler:             otrace.ProbabilitySampler(x.WorkerConfig.Trace.GetFloat64("ratio")),
91✔
767
                MaxAnnotationEventsPerSpan: 256,
91✔
768
        })
91✔
769

91✔
770
        // Posting will initialize index which requires schema. Hence, initialize
91✔
771
        // schema before calling posting.Init().
91✔
772
        schema.Init(worker.State.Pstore)
91✔
773
        posting.Init(worker.State.Pstore, postingListCacheSize)
91✔
774
        defer posting.Cleanup()
91✔
775
        worker.Init(worker.State.Pstore)
91✔
776

91✔
777
        // setup shutdown os signal handler
91✔
778
        sdCh := make(chan os.Signal, 3)
91✔
779

91✔
780
        defer func() {
182✔
781
                signal.Stop(sdCh)
91✔
782
                close(sdCh)
91✔
783
        }()
91✔
784
        // sigint : Ctrl-C, sigterm : kill command.
785
        signal.Notify(sdCh, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
91✔
786
        go func() {
182✔
787
                var numShutDownSig int
91✔
788
                for range sdCh {
182✔
789
                        closer := x.ServerCloser
91✔
790
                        select {
91✔
791
                        case <-closer.HasBeenClosed():
×
792
                        default:
91✔
793
                                closer.Signal()
91✔
794
                        }
795
                        numShutDownSig++
91✔
796
                        glog.Infoln("Caught Ctrl-C. Terminating now (this may take a few seconds)...")
91✔
797

91✔
798
                        switch {
91✔
799
                        case atomic.LoadUint32(&initDone) < 2:
×
800
                                // Forcefully kill alpha if we haven't finish server initialization.
×
801
                                glog.Infoln("Stopped before initialization completed")
×
802
                                os.Exit(1)
×
803
                        case numShutDownSig == 3:
×
804
                                glog.Infoln("Signaled thrice. Aborting!")
×
805
                                os.Exit(1)
×
806
                        }
807
                }
808
        }()
809

810
        updaters := z.NewCloser(2)
91✔
811
        go func() {
182✔
812
                worker.StartRaftNodes(worker.State.WALstore, bindall)
91✔
813
                atomic.AddUint32(&initDone, 1)
91✔
814

91✔
815
                // initialization of the admin account can only be done after raft nodes are running
91✔
816
                // and health check passes
91✔
817
                edgraph.InitializeAcl(updaters)
91✔
818
                edgraph.RefreshACLs(updaters.Ctx())
91✔
819
                edgraph.SubscribeForAclUpdates(updaters)
91✔
820
        }()
91✔
821

822
        // Graphql subscribes to alpha to get schema updates. We need to close that before we
823
        // close alpha. This closer is for closing and waiting that subscription.
824
        adminCloser := z.NewCloser(1)
91✔
825

91✔
826
        setupServer(adminCloser)
91✔
827
        glog.Infoln("GRPC and HTTP stopped.")
91✔
828

91✔
829
        // This might not close until group is given the signal to close. So, only signal here,
91✔
830
        // wait for it after group is closed.
91✔
831
        updaters.Signal()
91✔
832

91✔
833
        worker.BlockingStop()
91✔
834
        glog.Infoln("worker stopped.")
91✔
835

91✔
836
        adminCloser.SignalAndWait()
91✔
837
        glog.Infoln("adminCloser closed.")
91✔
838

91✔
839
        audit.Close()
91✔
840

91✔
841
        worker.State.Dispose()
91✔
842
        x.RemoveCidFile()
91✔
843
        glog.Info("worker.State disposed.")
91✔
844

91✔
845
        updaters.Wait()
91✔
846
        glog.Infoln("updaters closed.")
91✔
847

91✔
848
        glog.Infoln("Server shutdown. Bye!")
91✔
849
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc