• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

codenotary / immudb / 9198351832

22 May 2024 09:03PM UTC coverage: 89.498% (+0.02%) from 89.479%
9198351832

push

gh-ci

ostafen
Fix iteration issue in ServerInfo()

2 of 4 new or added lines in 1 file covered. (50.0%)

6 existing lines in 2 files now uncovered.

34760 of 38839 relevant lines covered (89.5%)

162874.36 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.27
/pkg/server/server.go
1
/*
2
Copyright 2024 Codenotary Inc. All rights reserved.
3

4
SPDX-License-Identifier: BUSL-1.1
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    https://mariadb.com/bsl11/
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package server
18

19
import (
20
        "context"
21
        "fmt"
22
        "io/ioutil"
23
        "log"
24
        "net"
25
        "os"
26
        "os/signal"
27
        "path/filepath"
28
        "strings"
29
        "syscall"
30
        "time"
31
        "unicode"
32

33
        "github.com/codenotary/immudb/pkg/server/sessions"
34
        "github.com/codenotary/immudb/pkg/truncator"
35

36
        "github.com/codenotary/immudb/embedded/remotestorage"
37
        "github.com/codenotary/immudb/embedded/store"
38
        "github.com/codenotary/immudb/pkg/errors"
39
        "github.com/codenotary/immudb/pkg/replication"
40

41
        pgsqlsrv "github.com/codenotary/immudb/pkg/pgsql/server"
42

43
        "github.com/codenotary/immudb/pkg/stream"
44

45
        "github.com/codenotary/immudb/pkg/database"
46

47
        "github.com/codenotary/immudb/embedded/logger"
48
        "github.com/codenotary/immudb/pkg/signer"
49

50
        "github.com/codenotary/immudb/cmd/helper"
51
        "github.com/codenotary/immudb/cmd/version"
52
        "github.com/codenotary/immudb/pkg/api/protomodel"
53
        "github.com/codenotary/immudb/pkg/api/schema"
54
        "github.com/codenotary/immudb/pkg/auth"
55
        "github.com/golang/protobuf/ptypes/empty"
56
        grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
57
        grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
58
        "google.golang.org/grpc"
59
        "google.golang.org/grpc/codes"
60
        "google.golang.org/grpc/credentials"
61
        "google.golang.org/grpc/reflection"
62
        "google.golang.org/grpc/status"
63
)
64

65
const (
66
        //KeyPrefixUser All user keys in the key/value store are prefixed by this keys to distinguish them from keys that have other purposes
67
        KeyPrefixUser = iota + 1
68
        //KeyPrefixDBSettings is used for entries related to database settings
69
        KeyPrefixDBSettings
70
)
71

72
var startedAt time.Time
73

74
var immudbTextLogo = " _                               _ _     \n" +
75
        "(_)                             | | |    \n" +
76
        " _ _ __ ___  _ __ ___  _   _  __| | |__  \n" +
77
        "| | '_ ` _ \\| '_ ` _ \\| | | |/ _` | '_ \\ \n" +
78
        "| | | | | | | | | | | | |_| | (_| | |_) |\n" +
79
        "|_|_| |_| |_|_| |_| |_|\\__,_|\\__,_|_.__/ \n"
80

81
// Initialize initializes dependencies, set up multi database capabilities and stats
82
func (s *ImmuServer) Initialize() error {
224✔
83
        // Print to stdout in case of text logger, or in case logs are being written to file
224✔
84
        // This is to avoid mixing text output with json in case the log output is piped
224✔
85
        if (s.Options.IsJSONLogger() && s.Options.IsFileLogger()) || !s.Options.IsJSONLogger() {
448✔
86
                fmt.Fprintf(os.Stdout, "\n%s\n%s\n%s\n\n", immudbTextLogo, version.VersionStr(), s.Options)
224✔
87
        }
224✔
88

89
        // Print the logo to the file in case of a text output only
90
        if s.Options.IsJSONLogger() {
224✔
91
                s.Logger.Infof("\n%s\n%s\n\n", version.VersionStr(), s.Options)
×
92
        } else if s.Options.IsFileLogger() {
224✔
93
                s.Logger.Infof("\n%s\n%s\n%s\n\n", immudbTextLogo, version.VersionStr(), s.Options)
×
94
        }
×
95

96
        if s.Options.GetMaintenance() && s.Options.GetAuth() {
225✔
97
                return ErrAuthMustBeDisabled
1✔
98
        }
1✔
99

100
        adminPassword, err := auth.DecodeBase64Password(s.Options.AdminPassword)
223✔
101
        if err != nil {
224✔
102
                return logErr(s.Logger, "%v", err)
1✔
103
        }
1✔
104

105
        if len(adminPassword) == 0 {
223✔
106
                s.Logger.Errorf(ErrEmptyAdminPassword.Error())
1✔
107
                return ErrEmptyAdminPassword
1✔
108
        }
1✔
109

110
        dataDir := s.Options.Dir
221✔
111
        err = os.MkdirAll(dataDir, store.DefaultFileMode)
221✔
112
        if err != nil {
221✔
113
                return logErr(s.Logger, "Unable to create data dir: %v", err)
×
114
        }
×
115

116
        systemDbRootDir := s.OS.Join(dataDir, s.Options.GetDefaultDBName())
221✔
117

221✔
118
        if s.UUID, err = getOrSetUUID(dataDir, systemDbRootDir, s.Options.RemoteStorageOptions.S3ExternalIdentifier); err != nil {
221✔
119
                return logErr(s.Logger, "Unable to get or set uuid: %v", err)
×
120
        }
×
121

122
        s.remoteStorage, err = s.createRemoteStorageInstance()
221✔
123
        if err != nil {
221✔
124
                return logErr(s.Logger, "Unable to open remote storage: %v", err)
×
125
        }
×
126

127
        err = s.initializeRemoteStorage(s.remoteStorage)
221✔
128
        if err != nil {
222✔
129
                return logErr(s.Logger, "unable to initialize remote storage: %v", err)
1✔
130
        }
1✔
131

132
        if err = s.loadSystemDatabase(dataDir, s.remoteStorage, adminPassword, s.Options.ForceAdminPassword); err != nil {
220✔
133
                return logErr(s.Logger, "unable to load system database: %v", err)
×
134
        }
×
135

136
        if err = s.loadDefaultDatabase(dataDir, s.remoteStorage); err != nil {
220✔
137
                return logErr(s.Logger, "unable to load default database: %v", err)
×
138
        }
×
139

140
        defaultDB, _ := s.dbList.GetByIndex(defaultDbIndex)
220✔
141

220✔
142
        dbSize, _ := defaultDB.TxCount()
220✔
143
        if dbSize <= 1 {
436✔
144
                s.Logger.Infof("started with an empty default database")
216✔
145
        }
216✔
146

147
        if s.sysDB.IsReplica() {
222✔
148
                s.Logger.Infof("recovery mode. Only '%s' and '%s' databases are loaded", SystemDBName, DefaultDBName)
2✔
149
        } else {
220✔
150
                if err = s.loadUserDatabases(dataDir, s.remoteStorage); err != nil {
218✔
151
                        return logErr(s.Logger, "unable load databases: %v", err)
×
152
                }
×
153
        }
154

155
        s.multidbmode = s.mandatoryAuth()
220✔
156
        if !s.Options.GetAuth() && s.multidbmode {
220✔
157
                return ErrAuthMustBeEnabled
×
158
        }
×
159

160
        s.SessManager, err = sessions.NewManager(s.Options.SessionsOptions)
220✔
161
        if err != nil {
220✔
162
                return err
×
163
        }
×
164

165
        grpcSrvOpts := []grpc.ServerOption{}
220✔
166
        if s.Options.TLSConfig != nil {
221✔
167
                grpcSrvOpts = []grpc.ServerOption{grpc.Creds(credentials.NewTLS(s.Options.TLSConfig))}
1✔
168
        }
1✔
169

170
        if s.Options.SigningKey != "" {
273✔
171
                if signer, err := signer.NewSigner(s.Options.SigningKey); err != nil {
53✔
172
                        return logErr(s.Logger, "unable to configure the cryptographic signer: %v", err)
×
173
                } else {
53✔
174
                        s.StateSigner = NewStateSigner(signer)
53✔
175
                }
53✔
176
        }
177

178
        if s.Options.usingCustomListener {
222✔
179
                s.Logger.Infof("using custom listener")
2✔
180
                s.Listener = s.Options.listener
2✔
181
        } else {
220✔
182
                s.Listener, err = net.Listen(s.Options.Network, s.Options.Bind())
218✔
183
                if err != nil {
231✔
184
                        return logErr(s.Logger, "immudb unable to listen: %v", err)
13✔
185
                }
13✔
186
        }
187

188
        if s.remoteStorage != nil {
207✔
189
                err := s.updateRemoteUUID(s.remoteStorage)
×
190
                if err != nil {
×
191
                        return logErr(s.Logger, "unable to persist uuid on the remote storage: %v", err)
×
192
                }
×
193
        }
194

195
        auth.AuthEnabled = s.Options.GetAuth()
207✔
196
        auth.DevMode = s.Options.DevMode
207✔
197
        auth.UpdateMetrics = func(ctx context.Context) { Metrics.UpdateClientMetrics(ctx) }
8,042✔
198

199
        if err = s.setupPidFile(); err != nil {
207✔
200
                return err
×
201
        }
×
202

203
        if s.Options.StreamChunkSize < stream.MinChunkSize {
208✔
204
                return errors.New(stream.ErrChunkTooSmall).WithCode(errors.CodInvalidParameterValue)
1✔
205
        }
1✔
206

207
        //===> !NOTE: See Histograms section here:
208
        // https://github.com/grpc-ecosystem/go-grpc-prometheus
209
        // TL;DR:
210
        // Prometheus histograms are a great way to measure latency distributions of
211
        // your RPCs. However, since it is bad practice to have metrics of high
212
        // cardinality the latency monitoring metrics are disabled by default. To
213
        // enable them the following has to be called during initialization code:
214
        if !s.Options.NoHistograms {
412✔
215
                grpc_prometheus.EnableHandlingTimeHistogram()
206✔
216
        }
206✔
217
        //<===
218

219
        uuidContext := NewUUIDContext(s.UUID)
206✔
220

206✔
221
        uis := []grpc.UnaryServerInterceptor{
206✔
222
                ErrorMapper, // converts errors in gRPC ones. Need to be the first
206✔
223
                s.KeepAliveSessionInterceptor,
206✔
224
                uuidContext.UUIDContextSetter,
206✔
225
                grpc_prometheus.UnaryServerInterceptor,
206✔
226
                auth.ServerUnaryInterceptor,
206✔
227
                s.SessionAuthInterceptor,
206✔
228
        }
206✔
229
        sss := []grpc.StreamServerInterceptor{
206✔
230
                ErrorMapperStream, // converts errors in gRPC ones. Need to be the first
206✔
231
                s.KeepALiveSessionStreamInterceptor,
206✔
232
                uuidContext.UUIDStreamContextSetter,
206✔
233
                grpc_prometheus.StreamServerInterceptor,
206✔
234
                auth.ServerStreamInterceptor,
206✔
235
        }
206✔
236
        grpcSrvOpts = append(
206✔
237
                grpcSrvOpts,
206✔
238
                grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(uis...)),
206✔
239
                grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(sss...)),
206✔
240
                grpc.MaxRecvMsgSize(s.Options.MaxRecvMsgSize),
206✔
241
        )
206✔
242

206✔
243
        s.GrpcServer = grpc.NewServer(grpcSrvOpts...)
206✔
244
        if s.Options.GRPCReflectionServerEnabled {
412✔
245
                reflection.Register(s.GrpcServer)
206✔
246
        }
206✔
247

248
        schema.RegisterImmuServiceServer(s.GrpcServer, s)
206✔
249
        protomodel.RegisterDocumentServiceServer(s.GrpcServer, s)
206✔
250
        protomodel.RegisterAuthorizationServiceServer(s.GrpcServer, &authenticationServiceImp{server: s})
206✔
251
        grpc_prometheus.Register(s.GrpcServer)
206✔
252

206✔
253
        if s.Options.PgsqlServer {
233✔
254
                s.PgsqlSrv = pgsqlsrv.New(
27✔
255
                        pgsqlsrv.Host(s.Options.Address),
27✔
256
                        pgsqlsrv.Port(s.Options.PgsqlServerPort),
27✔
257
                        pgsqlsrv.ImmudbPort(s.Listener.Addr().(*net.TCPAddr).Port),
27✔
258
                        pgsqlsrv.TLSConfig(s.Options.TLSConfig),
27✔
259
                        pgsqlsrv.Logger(s.Logger),
27✔
260
                        pgsqlsrv.DatabaseList(s.dbList),
27✔
261
                )
27✔
262

27✔
263
                if err = s.PgsqlSrv.Initialize(); err != nil {
27✔
264
                        return err
×
265
                }
×
266
        }
267

268
        return err
206✔
269
}
270

271
// Start starts the immudb server
272
// Loads and starts the System DB, default db and user db
273
func (s *ImmuServer) Start() (err error) {
77✔
274
        s.mux.Lock()
77✔
275
        s.pgsqlMux.Lock()
77✔
276

77✔
277
        startedAt = time.Now()
77✔
278

77✔
279
        if s.Options.MetricsServer {
78✔
280
                s.metricsServer = StartMetrics(1*time.Minute, s.Options.MetricsBind(), s.Logger, s.metricFuncServerUptimeCounter,
1✔
281
                        s.metricFuncComputeDBSizes, s.metricFuncComputeDBEntries, s.metricFuncComputeLoadedDBSize, s.metricFuncComputeSessionCount,
1✔
282
                        s.Options.PProf)
1✔
283
                defer func() {
2✔
284
                        if err := s.metricsServer.Close(); err != nil {
1✔
285
                                s.Logger.Errorf("failed to shutdown metric server: %s", err)
×
286
                        }
×
287
                }()
288
        }
289

290
        s.installShutdownHandler()
77✔
291

77✔
292
        go func() {
154✔
293
                if err := s.GrpcServer.Serve(s.Listener); err != nil {
77✔
294
                        s.mux.Unlock()
×
295
                        log.Fatal(err)
×
296
                }
×
297
        }()
298

299
        if err = s.SessManager.StartSessionsGuard(); err != nil {
77✔
300
                log.Fatal(err)
×
301
        }
×
302
        s.Logger.Infof("sessions guard started")
77✔
303

77✔
304
        if s.Options.PgsqlServer {
104✔
305
                go func() {
54✔
306
                        s.Logger.Infof("pgsql server is running at port %d", s.Options.PgsqlServerPort)
27✔
307
                        if err := s.PgsqlSrv.Serve(); err != nil {
27✔
308
                                s.pgsqlMux.Unlock()
×
309
                                log.Fatal(err)
×
310
                        }
×
311
                }()
312
        }
313

314
        if s.Options.WebServer {
79✔
315
                if err := s.setUpWebServer(context.Background()); err != nil {
2✔
316
                        log.Fatal(fmt.Sprintf("failed to setup web API/console server: %v", err))
×
317
                }
×
318
                defer func() {
4✔
319
                        if err := s.webServer.Close(); err != nil {
2✔
320
                                s.Logger.Errorf("failed to shutdown web API/console server: %s", err)
×
321
                        }
×
322
                }()
323
        }
324

325
        go s.printUsageCallToAction()
77✔
326

77✔
327
        s.mux.Unlock()
77✔
328
        s.pgsqlMux.Unlock()
77✔
329
        <-s.quit
77✔
330

77✔
331
        return err
77✔
332
}
333

334
func logErr(log logger.Logger, formattedMessage string, err error) error {
297✔
335
        if err != nil {
315✔
336
                log.Errorf(formattedMessage, err)
18✔
337
        }
18✔
338
        return err
297✔
339
}
340

341
func (s *ImmuServer) setupPidFile() error {
209✔
342
        var err error
209✔
343
        if s.Options.Pidfile != "" {
211✔
344
                if s.Pid, err = NewPid(s.Options.Pidfile, s.OS); err != nil {
3✔
345
                        return logErr(s.Logger, "failed to write pidfile: %s", err)
1✔
346
                }
1✔
347
        }
348
        return err
208✔
349
}
350

351
func (s *ImmuServer) setUpWebServer(ctx context.Context) error {
2✔
352
        server, err := startWebServer(
2✔
353
                ctx,
2✔
354
                s.Options.Bind(),
2✔
355
                s.Options.WebBind(),
2✔
356
                s.Options.TLSConfig,
2✔
357
                s,
2✔
358
                s.Logger,
2✔
359
        )
2✔
360
        if err != nil {
2✔
361
                return err
×
362
        }
×
363
        s.webServer = server
2✔
364
        return nil
2✔
365
}
366

367
func (s *ImmuServer) printUsageCallToAction() {
78✔
368
        time.Sleep(200 * time.Millisecond)
78✔
369
        immuadminCLI := helper.Blue + "immuadmin" + helper.Green
78✔
370
        immuclientCLI := helper.Blue + "immuclient" + helper.Green
78✔
371
        defaultUsername := helper.Blue + auth.SysAdminUsername + helper.Green
78✔
372

78✔
373
        // Print to stdout in case of text logger, or in case logs are being written to file
78✔
374
        // This is to avoid mixing text output with json in case the log output is piped
78✔
375
        if (s.Options.IsJSONLogger() && s.Options.IsFileLogger()) || !s.Options.IsJSONLogger() {
155✔
376
                fmt.Fprintf(os.Stdout,
77✔
377
                        "%syou can now use %s and %s CLIs to login with the %s superadmin user and start using immudb.%s\n",
77✔
378
                        helper.Green, immuadminCLI, immuclientCLI, defaultUsername, helper.Reset)
77✔
379
        }
77✔
380

381
        if s.Options.IsFileLogger() {
78✔
382
                s.Logger.Infof(
1✔
383
                        "you can now use immuadmin and immuclient CLIs to login with the %s superadmin user and start using immudb.\n",
1✔
384
                        auth.SysAdminUsername)
1✔
385
        }
1✔
386
}
387

388
func (s *ImmuServer) resetAdminPassword(ctx context.Context, adminPassword string) (bool, error) {
7✔
389
        if s.sysDB.IsReplica() {
8✔
390
                return false, errors.New("database is running as a replica")
1✔
391
        }
1✔
392

393
        adminUser, err := s.getUser(ctx, []byte(auth.SysAdminUsername))
6✔
394
        if err != nil {
8✔
395
                return false, fmt.Errorf("could not read sysadmin user data: %v", err)
2✔
396
        }
2✔
397

398
        err = adminUser.ComparePasswords([]byte(adminPassword))
4✔
399
        if err == nil {
5✔
400
                // Password is as expected, do not overwrite it to avoid unnecessary
1✔
401
                // transactions in systemdb
1✔
402
                return false, nil
1✔
403
        }
1✔
404

405
        _, err = adminUser.SetPassword([]byte(adminPassword))
3✔
406
        if err != nil {
4✔
407
                return false, err
1✔
408
        }
1✔
409

410
        err = s.saveUser(ctx, adminUser)
2✔
411
        if err != nil {
3✔
412
                return false, err
1✔
413
        }
1✔
414

415
        return true, nil
1✔
416
}
417

418
func (s *ImmuServer) loadSystemDatabase(
419
        dataDir string,
420
        remoteStorage remotestorage.Storage,
421
        adminPassword string,
422
        forceAdminPasswordReset bool,
423
) error {
230✔
424
        if s.dbList.Length() != 0 {
230✔
425
                panic("loadSystemDatabase should be called before any other database loading")
×
426
        }
427

428
        dbOpts, err := s.loadDBOptions(s.Options.GetSystemAdminDBName(), false)
230✔
429
        if err != nil {
230✔
430
                return fmt.Errorf("%w: while loading '%s' database settings", err, s.Options.GetSystemAdminDBName())
×
431
        }
×
432

433
        systemDBRootDir := s.OS.Join(dataDir, s.Options.GetSystemAdminDBName())
230✔
434
        _, err = s.OS.Stat(systemDBRootDir)
230✔
435
        if err == nil {
243✔
436
                s.sysDB, err = database.OpenDB(dbOpts.Database, s.multidbHandler(), s.databaseOptionsFrom(dbOpts), s.Logger)
13✔
437
                if err != nil {
13✔
438
                        s.Logger.Errorf("database '%s' was not correctly initialized.\n"+"Use replication to recover from external source or start without data folder.", dbOpts.Database)
×
439
                        return err
×
440
                }
×
441

442
                if forceAdminPasswordReset {
15✔
443
                        changed, err := s.resetAdminPassword(context.Background(), adminPassword)
2✔
444
                        if err != nil {
2✔
445
                                s.Logger.Errorf("can not reset admin password, %v", err)
×
446
                                return ErrCantUpdateAdminPassword
×
447
                        }
×
448

449
                        if changed {
3✔
450
                                s.Logger.Warningf("admin password was reset to the value specified in options")
1✔
451
                        } else {
2✔
452
                                s.Logger.Infof("admin password update was not needed")
1✔
453
                        }
1✔
454

455
                } else if adminPassword != auth.SysAdminPassword {
12✔
456
                        // Add warning that the password is not changed even though manually specified
1✔
457
                        user, err := s.getUser(context.Background(), []byte(auth.SysAdminUsername))
1✔
458
                        if err != nil {
1✔
459
                                s.Logger.Errorf("can not validate admin user: %v", err)
×
460
                                return err
×
461
                        }
×
462
                        err = user.ComparePasswords([]byte(adminPassword))
1✔
463
                        if err != nil {
2✔
464
                                s.Logger.Warningf(
1✔
465
                                        "admin password was not updated, " +
1✔
466
                                                "use the force-admin-password option to forcibly reset it",
1✔
467
                                )
1✔
468
                        }
1✔
469
                }
470

471
                if dbOpts.isReplicatorRequired() {
13✔
472
                        err = s.startReplicationFor(s.sysDB, dbOpts)
×
473
                        if err != nil {
×
474
                                s.Logger.Errorf("error starting replication for database '%s'. Reason: %v", s.sysDB.GetName(), err)
×
475
                        }
×
476
                }
477

478
                return nil
13✔
479
        }
480

481
        if !s.OS.IsNotExist(err) {
217✔
482
                return err
×
483
        }
×
484

485
        s.sysDB, err = database.NewDB(dbOpts.Database, s.multidbHandler(), s.databaseOptionsFrom(dbOpts), s.Logger)
217✔
486
        if err != nil {
217✔
487
                return err
×
488
        }
×
489

490
        //sys admin can have an empty array of databases as it has full access
491
        if !s.sysDB.IsReplica() {
432✔
492
                s.sysDB.SetSyncReplication(false)
215✔
493

215✔
494
                adminUsername, _, err := s.insertNewUser(context.Background(), []byte(auth.SysAdminUsername), []byte(adminPassword), auth.PermissionSysAdmin, "*", "")
215✔
495
                if err != nil {
215✔
496
                        return logErr(s.Logger, "%v", err)
×
497
                }
×
498

499
                if s.Options.ReplicationOptions.SyncReplication {
215✔
500
                        s.sysDB.SetSyncReplication(true)
×
501
                }
×
502

503
                s.Logger.Infof("admin user '%s' successfully created", adminUsername)
215✔
504
        }
505

506
        if dbOpts.isReplicatorRequired() {
218✔
507
                err = s.startReplicationFor(s.sysDB, dbOpts)
1✔
508
                if err != nil {
1✔
509
                        s.Logger.Errorf("error starting replication for database '%s'. Reason: %v", s.sysDB.GetName(), err)
×
510
                }
×
511
        }
512

513
        return nil
217✔
514
}
515

516
// loadDefaultDatabase
517
func (s *ImmuServer) loadDefaultDatabase(dataDir string, remoteStorage remotestorage.Storage) error {
227✔
518
        if s.dbList.Length() != 0 {
227✔
519
                panic("loadDefaultDatabase should be called right after loading systemDatabase")
×
520
        }
521

522
        dbOpts, err := s.loadDBOptions(s.Options.GetDefaultDBName(), false)
227✔
523
        if err != nil {
227✔
524
                return fmt.Errorf("%w: while loading '%s' database settings", err, s.Options.GetDefaultDBName())
×
525
        }
×
526

527
        defaultDbRootDir := s.OS.Join(dataDir, s.Options.GetDefaultDBName())
227✔
528

227✔
529
        _, err = s.OS.Stat(defaultDbRootDir)
227✔
530
        if err == nil {
238✔
531
                db, err := database.OpenDB(dbOpts.Database, s.multidbHandler(), s.databaseOptionsFrom(dbOpts), s.Logger)
11✔
532
                if err != nil {
11✔
533
                        s.Logger.Errorf("database '%s' was not correctly initialized.\n"+"Use replication to recover from external source or start without data folder.", dbOpts.Database)
×
534
                        return err
×
535
                }
×
536

537
                if dbOpts.isReplicatorRequired() {
11✔
538
                        err = s.startReplicationFor(db, dbOpts)
×
539
                        if err != nil {
×
540
                                s.Logger.Errorf("error starting replication for database '%s'. Reason: %v", db.GetName(), err)
×
541
                        }
×
542
                }
543

544
                s.dbList.Put(db)
11✔
545

11✔
546
                return nil
11✔
547
        }
548

549
        if !s.OS.IsNotExist(err) {
216✔
550
                return err
×
551
        }
×
552

553
        db, err := database.NewDB(dbOpts.Database, s.multidbHandler(), s.databaseOptionsFrom(dbOpts), s.Logger)
216✔
554
        if err != nil {
216✔
555
                return err
×
556
        }
×
557

558
        if dbOpts.isReplicatorRequired() {
217✔
559
                err = s.startReplicationFor(db, dbOpts)
1✔
560
                if err != nil {
1✔
561
                        s.Logger.Errorf("error starting replication for database '%s'. Reason: %v", db.GetName(), err)
×
562
                }
×
563
        }
564

565
        s.dbList.Put(db)
216✔
566

216✔
567
        return nil
216✔
568
}
569

570
func (s *ImmuServer) loadUserDatabases(dataDir string, remoteStorage remotestorage.Storage) error {
221✔
571
        var dirs []string
221✔
572

221✔
573
        //get first level sub directories of data dir
221✔
574
        files, err := ioutil.ReadDir(s.Options.Dir)
221✔
575
        if err != nil {
221✔
576
                return err
×
577
        }
×
578

579
        for _, f := range files {
890✔
580
                if !f.IsDir() ||
669✔
581
                        f.Name() == s.Options.GetSystemAdminDBName() ||
669✔
582
                        f.Name() == s.Options.GetDefaultDBName() {
1,332✔
583
                        continue
663✔
584
                }
585

586
                dirs = append(dirs, f.Name())
6✔
587
        }
588

589
        //load databases that are inside each directory
590
        for _, val := range dirs {
227✔
591
                //dbname is the directory name where it is stored
6✔
592
                //path iteration above stores the directories as data/db_name
6✔
593
                pathparts := strings.Split(val, string(filepath.Separator))
6✔
594
                dbname := pathparts[len(pathparts)-1]
6✔
595

6✔
596
                dbOpts, err := s.loadDBOptions(dbname, true)
6✔
597
                if err != nil {
6✔
598
                        return err
×
599
                }
×
600

601
                if !dbOpts.Autoload.isEnabled() {
6✔
602
                        s.Logger.Infof("database '%s' is closed (autoload is disabled)", dbname)
×
603
                        s.dbList.Put(&closedDB{name: dbname, opts: s.databaseOptionsFrom(dbOpts)})
×
604
                        continue
×
605
                }
606

607
                s.logDBOptions(dbname, dbOpts)
6✔
608

6✔
609
                db, err := database.OpenDB(dbname, s.multidbHandler(), s.databaseOptionsFrom(dbOpts), s.Logger)
6✔
610
                if err != nil {
6✔
611
                        s.Logger.Errorf("database '%s' could not be loaded. Reason: %v", dbname, err)
×
612
                        s.dbList.Put(&closedDB{name: dbname, opts: s.databaseOptionsFrom(dbOpts)})
×
613
                        continue
×
614
                }
615

616
                if dbOpts.isReplicatorRequired() {
9✔
617
                        err = s.startReplicationFor(db, dbOpts)
3✔
618
                        if err != nil {
3✔
619
                                s.Logger.Errorf("error starting replication for database '%s'. Reason: %v", db.GetName(), err)
×
620
                        }
×
621
                }
622

623
                if dbOpts.isDataRetentionEnabled() {
7✔
624
                        err = s.startTruncatorFor(db, dbOpts)
1✔
625
                        if err != nil {
2✔
626
                                s.Logger.Errorf("error starting truncation for database '%s'. Reason: %v", db.GetName(), err)
1✔
627
                        }
1✔
628
                }
629

630
                s.dbList.Put(db)
6✔
631
        }
632

633
        return nil
221✔
634
}
635

636
func (s *ImmuServer) replicationInProgressFor(db string) bool {
2✔
637
        s.replicationMutex.Lock()
2✔
638
        defer s.replicationMutex.Unlock()
2✔
639

2✔
640
        _, ok := s.replicators[db]
2✔
641
        return ok
2✔
642
}
2✔
643

644
func (s *ImmuServer) startReplicationFor(db database.DB, dbOpts *dbOptions) error {
149✔
645
        if !dbOpts.isReplicatorRequired() {
265✔
646
                s.Logger.Infof("replication for database '%s' is not required.", db.GetName())
116✔
647
                return ErrReplicatorNotNeeded
116✔
648
        }
116✔
649

650
        s.replicationMutex.Lock()
33✔
651
        defer s.replicationMutex.Unlock()
33✔
652

33✔
653
        replicatorOpts := replication.DefaultOptions().
33✔
654
                WithPrimaryDatabase(dbOpts.PrimaryDatabase).
33✔
655
                WithPrimaryHost(dbOpts.PrimaryHost).
33✔
656
                WithPrimaryPort(dbOpts.PrimaryPort).
33✔
657
                WithPrimaryUsername(dbOpts.PrimaryUsername).
33✔
658
                WithPrimaryPassword(dbOpts.PrimaryPassword).
33✔
659
                WithPrefetchTxBufferSize(dbOpts.PrefetchTxBufferSize).
33✔
660
                WithReplicationCommitConcurrency(dbOpts.ReplicationCommitConcurrency).
33✔
661
                WithAllowTxDiscarding(dbOpts.AllowTxDiscarding).
33✔
662
                WithSkipIntegrityCheck(dbOpts.SkipIntegrityCheck).
33✔
663
                WithWaitForIndexing(dbOpts.WaitForIndexing).
33✔
664
                WithStreamChunkSize(s.Options.StreamChunkSize)
33✔
665

33✔
666
        f, err := replication.NewTxReplicator(s.UUID, db, replicatorOpts, s.Logger)
33✔
667
        if err != nil {
33✔
668
                return err
×
669
        }
×
670

671
        err = f.Start()
33✔
672
        if err != nil {
33✔
673
                return err
×
674
        }
×
675

676
        s.replicators[db.GetName()] = f
33✔
677

33✔
678
        return nil
33✔
679
}
680

681
func (s *ImmuServer) stopReplicationFor(db string) error {
11✔
682
        s.replicationMutex.Lock()
11✔
683
        defer s.replicationMutex.Unlock()
11✔
684

11✔
685
        replicator, ok := s.replicators[db]
11✔
686
        if !ok {
18✔
687
                return ErrReplicationNotInProgress
7✔
688
        }
7✔
689

690
        err := replicator.Stop()
4✔
691
        if err == replication.ErrAlreadyStopped {
5✔
692
                return nil
1✔
693
        }
1✔
694
        if err != nil {
3✔
695
                return err
×
696
        }
×
697

698
        delete(s.replicators, db)
3✔
699

3✔
700
        return nil
3✔
701
}
702

703
func (s *ImmuServer) stopReplication() {
77✔
704
        s.replicationMutex.Lock()
77✔
705
        defer s.replicationMutex.Unlock()
77✔
706

77✔
707
        for db, f := range s.replicators {
106✔
708
                err := f.Stop()
29✔
709
                if err != nil {
29✔
710
                        s.Logger.Warningf("error stopping replication for '%s'. Reason: %v", db, err)
×
711
                }
×
712
        }
713
}
714

715
// Stop stops the immudb server
716
func (s *ImmuServer) Stop() error {
77✔
717
        s.mux.Lock()
77✔
718
        defer s.mux.Unlock()
77✔
719

77✔
720
        s.Logger.Infof("stopping immudb:\n%v", s.Options)
77✔
721

77✔
722
        defer func() { s.quit <- struct{}{} }()
154✔
723

724
        if !s.Options.usingCustomListener {
153✔
725
                s.GrpcServer.Stop()
76✔
726
                defer func() { s.GrpcServer = nil }()
152✔
727
        }
728

729
        s.SessManager.StopSessionsGuard()
77✔
730

77✔
731
        s.stopReplication()
77✔
732

77✔
733
        s.stopTruncation()
77✔
734

77✔
735
        return s.CloseDatabases()
77✔
736
}
737

738
// CloseDatabases closes all opened databases including the consinstency checker
739
func (s *ImmuServer) CloseDatabases() error {
223✔
740
        for i := 0; i < s.dbList.Length(); i++ {
632✔
741
                val, err := s.dbList.GetByIndex(i)
409✔
742
                if err == nil {
817✔
743
                        val.Close()
408✔
744
                }
408✔
745
        }
746

747
        if s.sysDB != nil {
438✔
748
                s.sysDB.Close()
215✔
749
        }
215✔
750

751
        return nil
223✔
752
}
753

754
func (s *ImmuServer) updateConfigItem(key string, newOrUpdatedLine string, unchanged func(string) bool) error {
5✔
755
        configFilepath := s.Options.Config
5✔
756

5✔
757
        if strings.TrimSpace(configFilepath) == "" {
6✔
758
                return fmt.Errorf("config file does not exist")
1✔
759
        }
1✔
760

761
        configBytes, err := s.OS.ReadFile(configFilepath)
4✔
762
        if err != nil {
5✔
763
                return fmt.Errorf("error reading config file '%s'. Reason: %v", configFilepath, err)
1✔
764
        }
1✔
765

766
        configLines := strings.Split(string(configBytes), "\n")
3✔
767

3✔
768
        write := false
3✔
769
        for i, l := range configLines {
7✔
770
                l = strings.TrimSpace(l)
4✔
771
                if strings.HasPrefix(l, key+"=") || strings.HasPrefix(l, key+" =") {
5✔
772
                        kv := strings.Split(l, "=")
1✔
773
                        if unchanged(kv[1]) {
2✔
774
                                return fmt.Errorf("Server config already has '%s'", newOrUpdatedLine)
1✔
775
                        }
1✔
776
                        configLines[i] = newOrUpdatedLine
×
777
                        write = true
×
778
                        break
×
779
                }
780
        }
781

782
        if !write {
4✔
783
                configLines = append(configLines, newOrUpdatedLine)
2✔
784
        }
2✔
785

786
        if err := s.OS.WriteFile(configFilepath, []byte(strings.Join(configLines, "\n")), 0644); err != nil {
3✔
787
                return err
1✔
788
        }
1✔
789

790
        return nil
1✔
791
}
792

793
// UpdateAuthConfig is DEPRECATED
794
func (s *ImmuServer) UpdateAuthConfig(ctx context.Context, req *schema.AuthConfig) (*empty.Empty, error) {
2✔
795
        return nil, ErrNotSupported
2✔
796
}
2✔
797

798
// UpdateMTLSConfig is DEPRECATED
799
func (s *ImmuServer) UpdateMTLSConfig(ctx context.Context, req *schema.MTLSConfig) (*empty.Empty, error) {
2✔
800
        return nil, ErrNotSupported
2✔
801
}
2✔
802

803
// ServerInfo returns information about the server instance.
804
func (s *ImmuServer) ServerInfo(ctx context.Context, req *schema.ServerInfoRequest) (*schema.ServerInfoResponse, error) {
37✔
805
        dbSize, err := s.totalDBSize()
37✔
806
        if err != nil {
37✔
807
                return nil, err
×
808
        }
×
809

810
        numTransactions, err := s.numTransactions()
37✔
811
        if err != nil {
37✔
812
                return nil, err
×
813
        }
×
814

815
        return &schema.ServerInfoResponse{
37✔
816
                Version:           version.Version,
37✔
817
                StartedAt:         startedAt.Unix(),
37✔
818
                NumTransactions:   int64(numTransactions),
37✔
819
                NumDatabases:      int32(s.dbList.Length()),
37✔
820
                DatabasesDiskSize: dbSize,
37✔
821
        }, err
37✔
822
}
823

824
func (s *ImmuServer) numTransactions() (uint64, error) {
37✔
825
        s.dbListMutex.Lock()
37✔
826
        defer s.dbListMutex.Unlock()
37✔
827

37✔
828
        var count uint64
37✔
829
        for i := 0; i < s.dbList.Length(); i++ {
108✔
830
                db, err := s.dbList.GetByIndex(i)
71✔
831
                if err == database.ErrDatabaseNotExists {
71✔
NEW
832
                        continue
×
833
                }
834
                if err != nil {
71✔
835
                        return 0, err
×
836
                }
×
837

838
                dbTxCount, err := db.TxCount()
71✔
839
                if err != nil {
71✔
840
                        return 0, err
×
841
                }
×
842
                count += dbTxCount
71✔
843
        }
844
        return count, nil
37✔
845
}
846

847
func (s *ImmuServer) totalDBSize() (int64, error) {
37✔
848
        s.dbListMutex.Lock()
37✔
849
        defer s.dbListMutex.Unlock()
37✔
850

37✔
851
        var size int64
37✔
852
        for i := 0; i < s.dbList.Length(); i++ {
108✔
853
                db, err := s.dbList.GetByIndex(i)
71✔
854
                if err == database.ErrDatabaseNotExists {
71✔
NEW
855
                        continue
×
856
                }
857
                if err != nil {
71✔
858
                        return -1, err
×
859
                }
×
860

861
                dbSize, err := db.Size()
71✔
862
                if err != nil {
71✔
863
                        return -1, err
×
864
                }
×
865
                size += int64(dbSize)
71✔
866
        }
867
        return size, nil
37✔
868
}
869

870
// Health ...
871
func (s *ImmuServer) Health(ctx context.Context, _ *empty.Empty) (*schema.HealthResponse, error) {
59✔
872
        return &schema.HealthResponse{Status: true, Version: Version.Version}, nil
59✔
873
}
59✔
874

875
func (s *ImmuServer) installShutdownHandler() {
77✔
876
        c := make(chan os.Signal, 1)
77✔
877
        signal.Notify(c, os.Interrupt, syscall.SIGTERM)
77✔
878

77✔
879
        go func() {
154✔
880
                <-c
77✔
881
                s.Logger.Infof("caught SIGTERM")
77✔
882
                if err := s.Stop(); err != nil {
77✔
883
                        s.Logger.Errorf("shutdown error: %v", err)
×
884
                }
×
885
                s.Logger.Infof("shutdown completed")
×
886
        }()
887
}
888

889
// CreateDatabase Create a new database instance
890
func (s *ImmuServer) CreateDatabase(ctx context.Context, req *schema.Database) (*empty.Empty, error) {
2✔
891
        if req == nil {
3✔
892
                return nil, ErrIllegalArguments
1✔
893
        }
1✔
894

895
        _, err := s.CreateDatabaseV2(ctx, &schema.CreateDatabaseRequest{Name: req.DatabaseName})
1✔
896
        if err != nil {
1✔
897
                return nil, err
×
898
        }
×
899

900
        return &empty.Empty{}, nil
1✔
901
}
902

903
// CreateDatabaseWith Create a new database instance
904
func (s *ImmuServer) CreateDatabaseWith(ctx context.Context, req *schema.DatabaseSettings) (*empty.Empty, error) {
29✔
905
        if req == nil {
30✔
906
                return nil, ErrIllegalArguments
1✔
907
        }
1✔
908

909
        _, err := s.CreateDatabaseV2(ctx, &schema.CreateDatabaseRequest{
28✔
910
                Name:     req.DatabaseName,
28✔
911
                Settings: dbSettingsToDBNullableSettings(req),
28✔
912
        })
28✔
913
        if err != nil {
37✔
914
                return nil, err
9✔
915
        }
9✔
916

917
        return &empty.Empty{}, nil
19✔
918
}
919

920
// CreateDatabaseV2 Create a new database instance
921
func (s *ImmuServer) CreateDatabaseV2(ctx context.Context, req *schema.CreateDatabaseRequest) (res *schema.CreateDatabaseResponse, err error) {
146✔
922
        if req == nil {
146✔
923
                return nil, ErrIllegalArguments
×
924
        }
×
925

926
        s.Logger.Infof("creating database '%s'...", req.Name)
146✔
927

146✔
928
        defer func() {
292✔
929
                if err == nil {
281✔
930
                        s.Logger.Infof("database '%s' successfully created", req.Name)
135✔
931
                } else {
146✔
932
                        s.Logger.Infof("database '%s' could not be created. Reason: %v", req.Name, err)
11✔
933
                }
11✔
934
        }()
935

936
        if s.Options.GetMaintenance() {
147✔
937
                return nil, ErrNotAllowedInMaintenanceMode
1✔
938
        }
1✔
939

940
        if !s.Options.GetAuth() {
146✔
941
                return nil, ErrAuthMustBeEnabled
1✔
942
        }
1✔
943

944
        _, user, err := s.getLoggedInUserdataFromCtx(ctx)
144✔
945
        if err != nil {
145✔
946
                return nil, fmt.Errorf("could not get loggedin user data")
1✔
947
        }
1✔
948

949
        if !user.IsSysAdmin {
144✔
950
                return nil, fmt.Errorf("loggedin user does not have permissions for this operation")
1✔
951
        }
1✔
952

953
        if req.Name == s.Options.defaultDBName || req.Name == s.Options.systemAdminDBName {
143✔
954
                return nil, ErrReservedDatabase
1✔
955
        }
1✔
956

957
        req.Name = strings.ToLower(req.Name)
141✔
958
        if err = isValidDBName(req.Name); err != nil {
142✔
959
                return nil, err
1✔
960
        }
1✔
961

962
        s.dbListMutex.Lock()
140✔
963
        defer s.dbListMutex.Unlock()
140✔
964

140✔
965
        //check if database exists
140✔
966
        if s.dbList.GetId(req.Name) >= 0 {
146✔
967
                if !req.IfNotExists {
10✔
968
                        return nil, database.ErrDatabaseAlreadyExists
4✔
969
                }
4✔
970

971
                dbOpts, err := s.loadDBOptions(req.Name, false)
2✔
972
                if err != nil {
2✔
973
                        return nil, fmt.Errorf("%w: while loading database settings", err)
×
974
                }
×
975

976
                return &schema.CreateDatabaseResponse{
2✔
977
                        Name:           req.Name,
2✔
978
                        Settings:       dbOpts.databaseNullableSettings(),
2✔
979
                        AlreadyExisted: true,
2✔
980
                }, nil
2✔
981
        }
982

983
        dbOpts := s.defaultDBOptions(req.Name)
134✔
984

134✔
985
        if req.Settings != nil {
258✔
986
                err = s.overwriteWith(dbOpts, req.Settings, false)
124✔
987
                if err != nil {
125✔
988
                        return nil, err
1✔
989
                }
1✔
990
        }
991

992
        err = s.saveDBOptions(dbOpts)
133✔
993
        if err != nil {
133✔
994
                return nil, err
×
995
        }
×
996

997
        db, err := database.NewDB(dbOpts.Database, s.multidbHandler(), s.databaseOptionsFrom(dbOpts), s.Logger)
133✔
998
        if err != nil {
133✔
999
                return nil, err
×
1000
        }
×
1001

1002
        s.dbList.Put(db)
133✔
1003
        s.multidbmode = true
133✔
1004

133✔
1005
        s.logDBOptions(db.GetName(), dbOpts)
133✔
1006

133✔
1007
        err = s.startReplicationFor(db, dbOpts)
133✔
1008
        if err != nil && err != ErrReplicatorNotNeeded {
133✔
1009
                return nil, fmt.Errorf("%w: while starting replication", err)
×
1010
        }
×
1011

1012
        err = s.startTruncatorFor(db, dbOpts)
133✔
1013
        if err != nil && err != ErrTruncatorNotNeeded {
133✔
1014
                return nil, fmt.Errorf("%w: while starting truncation", err)
×
1015
        }
×
1016

1017
        return &schema.CreateDatabaseResponse{
133✔
1018
                Name:     req.Name,
133✔
1019
                Settings: dbOpts.databaseNullableSettings(),
133✔
1020
        }, nil
133✔
1021
}
1022

1023
func (s *ImmuServer) LoadDatabase(ctx context.Context, req *schema.LoadDatabaseRequest) (res *schema.LoadDatabaseResponse, err error) {
9✔
1024
        if req == nil {
10✔
1025
                return nil, ErrIllegalArguments
1✔
1026
        }
1✔
1027

1028
        s.Logger.Infof("loadinig database '%s'...", req.Database)
8✔
1029

8✔
1030
        defer func() {
16✔
1031
                if err == nil {
9✔
1032
                        s.Logger.Infof("database '%s' successfully loaded", req.Database)
1✔
1033
                } else {
8✔
1034
                        s.Logger.Infof("database '%s' could not be loaded. Reason: %v", req.Database, err)
7✔
1035
                }
7✔
1036
        }()
1037

1038
        if req.Database == s.Options.defaultDBName || req.Database == s.Options.systemAdminDBName {
10✔
1039
                return nil, ErrReservedDatabase
2✔
1040
        }
2✔
1041

1042
        if !s.Options.GetAuth() {
6✔
1043
                return nil, ErrAuthMustBeEnabled
×
1044
        }
×
1045

1046
        _, user, err := s.getLoggedInUserdataFromCtx(ctx)
6✔
1047
        if err != nil {
6✔
1048
                return nil, fmt.Errorf("could not get loggedin user data")
×
1049
        }
×
1050

1051
        //if the requesting user has admin permission on this database
1052
        if (!user.IsSysAdmin) &&
6✔
1053
                (!user.HasPermission(req.Database, auth.PermissionAdmin)) {
6✔
1054
                return nil, fmt.Errorf("the database '%s' does not exist or you do not have admin permission on this database", req.Database)
×
1055
        }
×
1056

1057
        s.dbListMutex.Lock()
6✔
1058
        defer s.dbListMutex.Unlock()
6✔
1059

6✔
1060
        db, err := s.dbList.GetByName(req.Database)
6✔
1061
        if err != nil {
9✔
1062
                return nil, err
3✔
1063
        }
3✔
1064

1065
        if !db.IsClosed() {
5✔
1066
                return nil, ErrDatabaseAlreadyLoaded
2✔
1067
        }
2✔
1068

1069
        dbOpts, err := s.loadDBOptions(req.Database, false)
1✔
1070
        if err == store.ErrKeyNotFound {
1✔
1071
                return nil, fmt.Errorf("%w: while opening database '%s'", database.ErrDatabaseNotExists, req.Database)
×
1072
        }
×
1073
        if err != nil {
1✔
1074
                return nil, fmt.Errorf("%w: while loading database settings", err)
×
1075
        }
×
1076

1077
        db, err = database.OpenDB(req.Database, s.multidbHandler(), s.databaseOptionsFrom(dbOpts), s.Logger)
1✔
1078
        if err != nil {
1✔
1079
                return nil, fmt.Errorf("%w: while opening database", err)
×
1080
        }
×
1081

1082
        s.dbList.Put(db)
1✔
1083

1✔
1084
        if dbOpts.isReplicatorRequired() {
1✔
1085
                err = s.startReplicationFor(db, dbOpts)
×
1086
                if err != nil && err != ErrReplicatorNotNeeded {
×
1087
                        return nil, fmt.Errorf("%w: while starting replication", err)
×
1088
                }
×
1089
        }
1090

1091
        err = s.startTruncatorFor(db, dbOpts)
1✔
1092
        if err != nil && err != ErrTruncatorNotNeeded {
1✔
1093
                return nil, fmt.Errorf("%w: while starting truncation", err)
×
1094
        }
×
1095

1096
        return &schema.LoadDatabaseResponse{
1✔
1097
                Database: req.Database,
1✔
1098
        }, nil
1✔
1099
}
1100

1101
func (s *ImmuServer) UnloadDatabase(ctx context.Context, req *schema.UnloadDatabaseRequest) (res *schema.UnloadDatabaseResponse, err error) {
10✔
1102
        if req == nil {
11✔
1103
                return nil, ErrIllegalArguments
1✔
1104
        }
1✔
1105

1106
        s.Logger.Infof("unloading database '%s'...", req.Database)
9✔
1107

9✔
1108
        defer func() {
18✔
1109
                if err == nil {
13✔
1110
                        s.Logger.Infof("database '%s' successfully unloaded", req.Database)
4✔
1111
                } else {
9✔
1112
                        s.Logger.Infof("database '%s' could not be unloaded. Reason: %v", req.Database, err)
5✔
1113
                }
5✔
1114
        }()
1115

1116
        if req.Database == s.Options.defaultDBName || req.Database == s.Options.systemAdminDBName {
11✔
1117
                return nil, ErrReservedDatabase
2✔
1118
        }
2✔
1119

1120
        if !s.Options.GetAuth() {
7✔
1121
                return nil, ErrAuthMustBeEnabled
×
1122
        }
×
1123

1124
        _, user, err := s.getLoggedInUserdataFromCtx(ctx)
7✔
1125
        if err != nil {
7✔
1126
                return nil, fmt.Errorf("could not get loggedin user data")
×
1127
        }
×
1128

1129
        //if the requesting user has admin permission on this database
1130
        if (!user.IsSysAdmin) &&
7✔
1131
                (!user.HasPermission(req.Database, auth.PermissionAdmin)) {
7✔
1132
                return nil, fmt.Errorf("the database '%s' does not exist or you do not have admin permission on this database", req.Database)
×
1133
        }
×
1134

1135
        s.dbListMutex.Lock()
7✔
1136
        defer s.dbListMutex.Unlock()
7✔
1137

7✔
1138
        db, err := s.dbList.GetByName(req.Database)
7✔
1139
        if err != nil {
9✔
1140
                return nil, err
2✔
1141
        }
2✔
1142

1143
        if db.IsClosed() {
6✔
1144
                return nil, store.ErrAlreadyClosed
1✔
1145
        }
1✔
1146

1147
        dbOpts, err := s.loadDBOptions(req.Database, false)
4✔
1148
        if err != nil {
4✔
1149
                return nil, fmt.Errorf("%w: while reading database settings", err)
×
1150
        }
×
1151

1152
        if dbOpts.isReplicatorRequired() {
4✔
1153
                err = s.stopReplicationFor(req.Database)
×
1154
                if err != nil && err != ErrReplicationNotInProgress {
×
1155
                        return nil, fmt.Errorf("%w: while stopping replication", err)
×
1156
                }
×
1157
        }
1158

1159
        if dbOpts.isDataRetentionEnabled() {
5✔
1160
                err = s.stopTruncatorFor(req.Database)
1✔
1161
                if err != nil && err != ErrTruncatorNotInProgress {
1✔
1162
                        return nil, fmt.Errorf("%w: while stopping truncation", err)
×
1163
                }
×
1164
        }
1165

1166
        err = db.Close()
4✔
1167
        if err != nil {
4✔
1168
                return nil, err
×
1169
        }
×
1170

1171
        return &schema.UnloadDatabaseResponse{
4✔
1172
                Database: req.Database,
4✔
1173
        }, nil
4✔
1174
}
1175

1176
func (s *ImmuServer) DeleteDatabase(ctx context.Context, req *schema.DeleteDatabaseRequest) (res *schema.DeleteDatabaseResponse, err error) {
8✔
1177
        if req == nil {
9✔
1178
                return nil, ErrIllegalArguments
1✔
1179
        }
1✔
1180

1181
        s.Logger.Infof("deleting database '%s'...", req.Database)
7✔
1182

7✔
1183
        defer func() {
14✔
1184
                if err == nil {
9✔
1185
                        s.Logger.Infof("database '%s' successfully deleted", req.Database)
2✔
1186
                } else {
7✔
1187
                        s.Logger.Infof("database '%s' could not be deleted. Reason: %v", req.Database, err)
5✔
1188
                }
5✔
1189
        }()
1190

1191
        if !s.Options.GetAuth() {
7✔
1192
                return nil, ErrAuthMustBeEnabled
×
1193
        }
×
1194

1195
        if req.Database == s.Options.defaultDBName || req.Database == s.Options.systemAdminDBName {
9✔
1196
                return nil, ErrReservedDatabase
2✔
1197
        }
2✔
1198

1199
        _, user, err := s.getLoggedInUserdataFromCtx(ctx)
5✔
1200
        if err != nil {
5✔
1201
                return nil, fmt.Errorf("could not get loggedin user data")
×
1202
        }
×
1203

1204
        //if the requesting user has admin permission on this database
1205
        if (!user.IsSysAdmin) &&
5✔
1206
                (!user.HasPermission(req.Database, auth.PermissionAdmin)) {
5✔
1207
                return nil, fmt.Errorf("the database '%s' does not exist or you do not have admin permission on this database", req.Database)
×
1208
        }
×
1209

1210
        s.dbListMutex.Lock()
5✔
1211
        defer s.dbListMutex.Unlock()
5✔
1212

5✔
1213
        db, err := s.dbList.Delete(req.Database)
5✔
1214
        if err != nil {
8✔
1215
                return nil, err
3✔
1216
        }
3✔
1217

1218
        err = s.deleteDBOptionsFor(req.Database)
2✔
1219
        if err != nil {
2✔
1220
                return nil, err
×
1221
        }
×
1222

1223
        err = os.RemoveAll(db.Path())
2✔
1224
        if err != nil {
2✔
1225
                return nil, err
×
1226
        }
×
1227

1228
        return &schema.DeleteDatabaseResponse{
2✔
1229
                Database: req.Database,
2✔
1230
        }, nil
2✔
1231
}
1232

1233
// UpdateDatabase Updates database settings
1234
func (s *ImmuServer) UpdateDatabase(ctx context.Context, req *schema.DatabaseSettings) (*empty.Empty, error) {
7✔
1235
        if req == nil {
8✔
1236
                return nil, ErrIllegalArguments
1✔
1237
        }
1✔
1238

1239
        _, err := s.UpdateDatabaseV2(ctx, &schema.UpdateDatabaseRequest{
6✔
1240
                Database: req.DatabaseName,
6✔
1241
                Settings: dbSettingsToDBNullableSettings(req),
6✔
1242
        })
6✔
1243
        if err != nil {
10✔
1244
                return nil, err
4✔
1245
        }
4✔
1246

1247
        return &empty.Empty{}, nil
2✔
1248
}
1249

1250
// UpdateDatabaseV2 Updates database settings
1251
func (s *ImmuServer) UpdateDatabaseV2(ctx context.Context, req *schema.UpdateDatabaseRequest) (res *schema.UpdateDatabaseResponse, err error) {
28✔
1252
        if req == nil {
30✔
1253
                return nil, ErrIllegalArguments
2✔
1254
        }
2✔
1255

1256
        s.Logger.Infof("updating database settings for '%s'...", req.Database)
26✔
1257

26✔
1258
        defer func() {
52✔
1259
                if err == nil {
39✔
1260
                        s.Logger.Infof("database '%s' successfully updated", req.Database)
13✔
1261
                } else {
26✔
1262
                        s.Logger.Infof("database '%s' could not be updated. Reason: %v", req.Database, err)
13✔
1263
                }
13✔
1264
        }()
1265

1266
        if s.Options.GetMaintenance() {
27✔
1267
                return nil, ErrNotAllowedInMaintenanceMode
1✔
1268
        }
1✔
1269

1270
        if !s.Options.GetAuth() {
27✔
1271
                return nil, ErrAuthMustBeEnabled
2✔
1272
        }
2✔
1273

1274
        if req.Database == s.Options.defaultDBName || req.Database == s.Options.systemAdminDBName {
29✔
1275
                return nil, ErrReservedDatabase
6✔
1276
        }
6✔
1277

1278
        _, user, err := s.getLoggedInUserdataFromCtx(ctx)
17✔
1279
        if err != nil {
17✔
1280
                return nil, fmt.Errorf("could not get loggedin user data")
×
1281
        }
×
1282

1283
        //if the requesting user has admin permission on this database
1284
        if (!user.IsSysAdmin) &&
17✔
1285
                (!user.HasPermission(req.Database, auth.PermissionAdmin)) {
17✔
1286
                return nil, fmt.Errorf("the database '%s' does not exist or you do not have admin permission on this database", req.Database)
×
1287
        }
×
1288

1289
        s.dbListMutex.Lock()
17✔
1290
        defer s.dbListMutex.Unlock()
17✔
1291

17✔
1292
        dbOpts, err := s.loadDBOptions(req.Database, false)
17✔
1293
        if err == store.ErrKeyNotFound {
20✔
1294
                return nil, database.ErrDatabaseNotExists
3✔
1295
        }
3✔
1296
        if err != nil {
14✔
1297
                return nil, fmt.Errorf("%w: while loading database settings", err)
×
1298
        }
×
1299

1300
        db, err := s.dbList.GetByName(req.Database)
14✔
1301
        if err != nil {
14✔
1302
                return nil, err
×
1303
        }
×
1304

1305
        if req.Settings.ReplicationSettings != nil && !db.IsClosed() {
25✔
1306
                err = s.stopReplicationFor(req.Database)
11✔
1307
                if err != nil && err != ErrReplicationNotInProgress {
11✔
1308
                        return nil, fmt.Errorf("%w: while stopping replication", err)
×
1309
                }
×
1310
        }
1311

1312
        if req.Settings.TruncationSettings != nil && !db.IsClosed() {
14✔
1313
                err = s.stopTruncatorFor(req.Database)
×
1314
                if err != nil && err != ErrTruncatorNotInProgress {
×
1315
                        return nil, fmt.Errorf("%w: while stopping truncation", err)
×
1316
                }
×
1317
        }
1318

1319
        err = s.overwriteWith(dbOpts, req.Settings, true)
14✔
1320
        if err != nil {
15✔
1321
                return nil, err
1✔
1322
        }
1✔
1323

1324
        dbOpts.UpdatedBy = user.Username
13✔
1325

13✔
1326
        err = s.saveDBOptions(dbOpts)
13✔
1327
        if err != nil {
13✔
1328
                return nil, fmt.Errorf("%w: while saving updated settings", err)
×
1329
        }
×
1330

1331
        s.logDBOptions(db.GetName(), dbOpts)
13✔
1332

13✔
1333
        if !db.IsClosed() {
26✔
1334
                db.AsReplica(dbOpts.Replica, dbOpts.SyncReplication, dbOpts.SyncAcks)
13✔
1335
        }
13✔
1336

1337
        if req.Settings.ReplicationSettings != nil && !db.IsClosed() {
24✔
1338
                err = s.startReplicationFor(db, dbOpts)
11✔
1339
                if err != nil && err != ErrReplicatorNotNeeded {
11✔
1340
                        return nil, fmt.Errorf("%w: while staring replication", err)
×
1341
                }
×
1342
        }
1343

1344
        if req.Settings.TruncationSettings != nil && !db.IsClosed() {
13✔
1345
                err = s.startTruncatorFor(db, dbOpts)
×
1346
                if err != nil && err != ErrTruncatorNotNeeded {
×
1347
                        return nil, fmt.Errorf("%w: while starting truncation", err)
×
1348
                }
×
1349
        }
1350

1351
        return &schema.UpdateDatabaseResponse{
13✔
1352
                Database: req.Database,
13✔
1353
                Settings: dbOpts.databaseNullableSettings(),
13✔
1354
        }, nil
13✔
1355
}
1356

1357
func (s *ImmuServer) GetDatabaseSettings(ctx context.Context, _ *empty.Empty) (*schema.DatabaseSettings, error) {
1✔
1358
        res, err := s.GetDatabaseSettingsV2(ctx, &schema.DatabaseSettingsRequest{})
1✔
1359
        if err != nil {
1✔
1360
                return nil, err
×
1361
        }
×
1362

1363
        ret := &schema.DatabaseSettings{
1✔
1364
                DatabaseName: res.Database,
1✔
1365
        }
1✔
1366

1✔
1367
        if res.Settings.ReplicationSettings != nil {
2✔
1368
                if res.Settings.ReplicationSettings.Replica != nil {
2✔
1369
                        ret.Replica = res.Settings.ReplicationSettings.Replica.Value
1✔
1370
                }
1✔
1371
                if res.Settings.ReplicationSettings.PrimaryDatabase != nil {
2✔
1372
                        ret.PrimaryDatabase = res.Settings.ReplicationSettings.PrimaryDatabase.Value
1✔
1373
                }
1✔
1374
                if res.Settings.ReplicationSettings.PrimaryHost != nil {
2✔
1375
                        ret.PrimaryHost = res.Settings.ReplicationSettings.PrimaryHost.Value
1✔
1376
                }
1✔
1377
                if res.Settings.ReplicationSettings.PrimaryPort != nil {
2✔
1378
                        ret.PrimaryPort = res.Settings.ReplicationSettings.PrimaryPort.Value
1✔
1379
                }
1✔
1380
                if res.Settings.ReplicationSettings.PrimaryUsername != nil {
2✔
1381
                        ret.PrimaryUsername = res.Settings.ReplicationSettings.PrimaryUsername.Value
1✔
1382
                }
1✔
1383
                if res.Settings.ReplicationSettings.PrimaryPassword != nil {
2✔
1384
                        ret.PrimaryPassword = res.Settings.ReplicationSettings.PrimaryPassword.Value
1✔
1385
                }
1✔
1386
        }
1387

1388
        if res.Settings.FileSize != nil {
2✔
1389
                ret.FileSize = res.Settings.FileSize.Value
1✔
1390
        }
1✔
1391
        if res.Settings.MaxKeyLen != nil {
2✔
1392
                ret.MaxKeyLen = res.Settings.MaxKeyLen.Value
1✔
1393
        }
1✔
1394
        if res.Settings.MaxValueLen != nil {
2✔
1395
                ret.MaxValueLen = res.Settings.MaxValueLen.Value
1✔
1396
        }
1✔
1397
        if res.Settings.MaxTxEntries != nil {
2✔
1398
                ret.MaxTxEntries = res.Settings.MaxTxEntries.Value
1✔
1399
        }
1✔
1400
        if res.Settings.ExcludeCommitTime != nil {
2✔
1401
                ret.ExcludeCommitTime = res.Settings.ExcludeCommitTime.Value
1✔
1402
        }
1✔
1403

1404
        return ret, nil
1✔
1405
}
1406

1407
func (s *ImmuServer) GetDatabaseSettingsV2(ctx context.Context, _ *schema.DatabaseSettingsRequest) (*schema.DatabaseSettingsResponse, error) {
4✔
1408
        db, err := s.getDBFromCtx(ctx, "DatabaseSettings")
4✔
1409
        if err != nil {
4✔
1410
                return nil, err
×
1411
        }
×
1412

1413
        dbOpts, err := s.loadDBOptions(db.GetName(), false)
4✔
1414
        if err != nil {
4✔
1415
                return nil, err
×
1416
        }
×
1417

1418
        return &schema.DatabaseSettingsResponse{
4✔
1419
                Database: db.GetName(),
4✔
1420
                Settings: dbOpts.databaseNullableSettings(),
4✔
1421
        }, nil
4✔
1422
}
1423

1424
// DatabaseList returns a list of databases based on the requesting user permissions
1425
func (s *ImmuServer) DatabaseList(ctx context.Context, _ *empty.Empty) (*schema.DatabaseListResponse, error) {
35✔
1426
        dbsWithSettings, err := s.DatabaseListV2(ctx, &schema.DatabaseListRequestV2{})
35✔
1427
        if err != nil {
38✔
1428
                return nil, err
3✔
1429
        }
3✔
1430

1431
        resp := &schema.DatabaseListResponse{}
32✔
1432

32✔
1433
        for _, db := range dbsWithSettings.Databases {
67✔
1434
                resp.Databases = append(resp.Databases, &schema.Database{DatabaseName: db.Name})
35✔
1435
        }
35✔
1436

1437
        return resp, nil
32✔
1438
}
1439

1440
// DatabaseList returns a list of databases based on the requesting user permissions
1441
func (s *ImmuServer) DatabaseListV2(ctx context.Context, req *schema.DatabaseListRequestV2) (*schema.DatabaseListResponseV2, error) {
36✔
1442
        if !s.Options.GetAuth() {
37✔
1443
                return nil, fmt.Errorf("this command is available only with authentication on")
1✔
1444
        }
1✔
1445

1446
        resp := &schema.DatabaseListResponseV2{}
35✔
1447

35✔
1448
        databases, err := s.listLoggedInUserDatabases(ctx)
35✔
1449
        if err != nil {
37✔
1450
                return nil, err
2✔
1451
        }
2✔
1452

1453
        for _, db := range databases {
70✔
1454
                dbOpts, err := s.loadDBOptions(db.GetName(), false)
37✔
1455
                if err != nil {
37✔
1456
                        return nil, err
×
1457
                }
×
1458

1459
                size, err := db.Size()
37✔
1460
                if err != nil {
37✔
1461
                        return nil, err
×
1462
                }
×
1463

1464
                txCount, err := db.TxCount()
37✔
1465
                if err != nil {
37✔
1466
                        return nil, err
×
1467
                }
×
1468

1469
                info := &schema.DatabaseInfo{
37✔
1470
                        Name:            db.GetName(),
37✔
1471
                        Settings:        dbOpts.databaseNullableSettings(),
37✔
1472
                        Loaded:          !db.IsClosed(),
37✔
1473
                        DiskSize:        size,
37✔
1474
                        NumTransactions: txCount,
37✔
1475
                }
37✔
1476
                resp.Databases = append(resp.Databases, info)
37✔
1477
        }
1478
        return resp, nil
33✔
1479
}
1480

1481
func (s *ImmuServer) listLoggedInUserDatabases(ctx context.Context) ([]database.DB, error) {
35✔
1482
        _, loggedInuser, err := s.getLoggedInUserdataFromCtx(ctx)
35✔
1483
        if err != nil {
37✔
1484
                return nil, fmt.Errorf("please login")
2✔
1485
        }
2✔
1486

1487
        databases := make([]database.DB, 0, s.dbList.Length())
33✔
1488
        if loggedInuser.IsSysAdmin || s.Options.GetMaintenance() {
65✔
1489
                for i := 0; i < s.dbList.Length(); i++ {
68✔
1490
                        db, err := s.dbList.GetByIndex(i)
36✔
1491
                        if err == database.ErrDatabaseNotExists {
36✔
1492
                                continue
×
1493
                        }
1494
                        if err != nil {
36✔
1495
                                return nil, err
×
1496
                        }
×
1497
                        databases = append(databases, db)
36✔
1498
                }
1499
        } else {
1✔
1500
                for _, perm := range loggedInuser.Permissions {
2✔
1501
                        db, err := s.dbList.GetByName(perm.Database)
1✔
1502
                        if err == database.ErrDatabaseNotExists {
1✔
1503
                                continue
×
1504
                        }
1505
                        if err != nil {
1✔
1506
                                return nil, err
×
1507
                        }
×
1508
                        databases = append(databases, db)
1✔
1509
                }
1510
        }
1511
        return databases, nil
33✔
1512
}
1513

1514
// UseDatabase ...
1515
func (s *ImmuServer) UseDatabase(ctx context.Context, req *schema.Database) (*schema.UseDatabaseReply, error) {
128✔
1516
        if req == nil {
128✔
1517
                return nil, ErrIllegalArguments
×
1518
        }
×
1519

1520
        user := &auth.User{}
128✔
1521
        var err error
128✔
1522

128✔
1523
        if s.Options.GetAuth() {
255✔
1524
                _, user, err = s.getLoggedInUserdataFromCtx(ctx)
127✔
1525
                if err != nil {
128✔
1526
                        if strings.HasPrefix(fmt.Sprintf("%s", err), "token has expired") {
1✔
1527
                                return nil, status.Error(codes.PermissionDenied, err.Error())
×
1528
                        }
×
1529
                        return nil, status.Errorf(codes.Unauthenticated, "Please login")
1✔
1530
                }
1531
        } else {
1✔
1532
                if !s.Options.GetMaintenance() {
2✔
1533
                        return nil, fmt.Errorf("this command is available only with authentication on")
1✔
1534
                }
1✔
1535

1536
                user.IsSysAdmin = true
×
1537
                user.Username = ""
×
1538
                s.addUserToLoginList(user)
×
1539
        }
1540

1541
        dbid := sysDBIndex
126✔
1542
        db := s.sysDB
126✔
1543

126✔
1544
        if req.DatabaseName != SystemDBName {
250✔
1545
                //check if database exists
124✔
1546
                dbid = s.dbList.GetId(req.DatabaseName)
124✔
1547
                if dbid < 0 {
125✔
1548
                        return nil, errors.New(fmt.Sprintf("'%s' does not exist", req.DatabaseName)).WithCode(errors.CodInvalidDatabaseName)
1✔
1549
                }
1✔
1550

1551
                db, err = s.dbList.GetByIndex(dbid)
123✔
1552
                if err != nil {
123✔
1553
                        return nil, err
×
1554
                }
×
1555

1556
                if db.IsClosed() {
123✔
1557
                        return nil, store.ErrAlreadyClosed
×
1558
                }
×
1559
        }
1560

1561
        //check if this user has permission on this database
1562
        //if sysadmin allow to continue
1563
        if (!user.IsSysAdmin) &&
125✔
1564
                (!user.HasPermission(req.DatabaseName, auth.PermissionAdmin)) &&
125✔
1565
                (!user.HasPermission(req.DatabaseName, auth.PermissionR)) &&
125✔
1566
                (!user.HasPermission(req.DatabaseName, auth.PermissionRW)) {
128✔
1567

3✔
1568
                return nil, status.Errorf(codes.PermissionDenied, "Logged in user does not have permission on this database")
3✔
1569
        }
3✔
1570

1571
        token, err := auth.GenerateToken(*user, int64(dbid), s.Options.TokenExpiryTimeMin)
122✔
1572
        if err != nil {
122✔
1573
                return nil, err
×
1574
        }
×
1575

1576
        if auth.GetAuthTypeFromContext(ctx) == auth.SessionAuth {
147✔
1577
                sessionID, err := sessions.GetSessionIDFromContext(ctx)
25✔
1578
                if err != nil {
25✔
1579
                        return nil, err
×
1580
                }
×
1581
                sess, err := s.SessManager.GetSession(sessionID)
25✔
1582
                if err != nil {
25✔
1583
                        return nil, err
×
1584
                }
×
1585
                sess.SetDatabase(db)
25✔
1586
        }
1587

1588
        return &schema.UseDatabaseReply{
122✔
1589
                Token: token,
122✔
1590
        }, nil
122✔
1591
}
1592

1593
// getDBFromCtx checks if user (loggedin from context) has access to methodName.
1594
// returns selected database
1595
func (s *ImmuServer) getDBFromCtx(ctx context.Context, methodName string) (database.DB, error) {
16,894✔
1596
        //if auth is disabled and there is not user created databases returns defaultdb
16,894✔
1597
        if !s.Options.auth && !s.multidbmode && !s.Options.GetMaintenance() {
16,896✔
1598
                db, _ := s.dbList.GetByIndex(defaultDbIndex)
2✔
1599
                return db, nil
2✔
1600
        }
2✔
1601

1602
        if s.Options.GetMaintenance() && !auth.IsMaintenanceMethod(methodName) {
16,893✔
1603
                return nil, ErrNotAllowedInMaintenanceMode
1✔
1604
        }
1✔
1605

1606
        ind, usr, err := s.getLoggedInUserdataFromCtx(ctx)
16,891✔
1607
        if err != nil {
16,943✔
1608
                if strings.HasPrefix(fmt.Sprintf("%s", err), "token has expired") {
52✔
1609
                        return nil, status.Error(codes.PermissionDenied, err.Error())
×
1610
                }
×
1611
                if s.Options.GetMaintenance() && !s.Options.auth {
52✔
1612
                        return nil, fmt.Errorf("please select database first")
×
1613
                }
×
1614
                return nil, err
52✔
1615
        }
1616

1617
        if ind < 0 {
16,839✔
1618
                return nil, fmt.Errorf("please select a database first")
×
1619
        }
×
1620

1621
        // systemdb is always read-only from external access
1622
        if ind == sysDBIndex && !auth.IsMaintenanceMethod(methodName) {
16,839✔
1623
                return nil, ErrPermissionDenied
×
1624
        }
×
1625

1626
        var db database.DB
16,839✔
1627

16,839✔
1628
        if ind == sysDBIndex {
16,841✔
1629
                db = s.sysDB
2✔
1630
        } else {
16,839✔
1631
                db, err = s.dbList.GetByIndex(ind)
16,837✔
1632
                if err != nil {
16,837✔
1633
                        return nil, err
×
1634
                }
×
1635
        }
1636

1637
        if usr.IsSysAdmin {
23,563✔
1638
                return db, nil
6,724✔
1639
        }
6,724✔
1640

1641
        if ok := auth.HasPermissionForMethod(usr.WhichPermission(db.GetName()), methodName); !ok {
10,115✔
1642
                return nil, ErrPermissionDenied
×
1643
        }
×
1644

1645
        return db, nil
10,115✔
1646
}
1647

1648
// isValidDBName checks if the provided database name meets the requirements
1649
func isValidDBName(dbName string) error {
148✔
1650
        if len(dbName) < 1 || len(dbName) > 128 {
151✔
1651
                return fmt.Errorf("database name length outside of limits")
3✔
1652
        }
3✔
1653

1654
        var hasSpecial bool
145✔
1655

145✔
1656
        for _, ch := range dbName {
947✔
1657
                switch {
802✔
1658
                case unicode.IsLower(ch):
656✔
1659
                case unicode.IsDigit(ch):
140✔
1660
                case ch == '_':
4✔
1661
                case unicode.IsPunct(ch) || unicode.IsSymbol(ch):
1✔
1662
                        hasSpecial = true
1✔
1663
                default:
1✔
1664
                        return fmt.Errorf("unrecognized character in database name")
1✔
1665
                }
1666
        }
1667

1668
        if hasSpecial {
145✔
1669
                return fmt.Errorf("punctuation marks and symbols are not allowed in database name")
1✔
1670
        }
1✔
1671

1672
        return nil
143✔
1673
}
1674

1675
// checkMandatoryAuth checks if auth should be madatory for immudb to start
1676
func (s *ImmuServer) mandatoryAuth() bool {
222✔
1677
        if s.Options.GetMaintenance() {
224✔
1678
                return false
2✔
1679
        }
2✔
1680

1681
        //check if there are user created databases, should be zero for auth to be off
1682
        if s.dbList.Length() > 1 {
225✔
1683
                return true
5✔
1684
        }
5✔
1685

1686
        //check if there is only sysadmin on systemdb and no other user
1687
        itemList, err := s.sysDB.Scan(context.Background(), &schema.ScanRequest{
215✔
1688
                Prefix: []byte{KeyPrefixUser},
215✔
1689
        })
215✔
1690

215✔
1691
        if err != nil {
215✔
1692
                s.Logger.Errorf("error getting users: %v", err)
×
1693
                return true
×
1694
        }
×
1695

1696
        for _, val := range itemList.Entries {
429✔
1697
                if len(val.Key) > 2 {
428✔
1698
                        if auth.SysAdminUsername != string(val.Key[1:]) {
215✔
1699
                                //another user detected
1✔
1700
                                return true
1✔
1701
                        }
1✔
1702
                }
1703
        }
1704

1705
        //systemdb exists but there are no other users created
1706
        return false
214✔
1707
}
1708

1709
func (s *ImmuServer) TruncateDatabase(ctx context.Context, req *schema.TruncateDatabaseRequest) (res *schema.TruncateDatabaseResponse, err error) {
6✔
1710
        if req == nil {
6✔
1711
                return nil, ErrIllegalArguments
×
1712
        }
×
1713

1714
        s.Logger.Infof("truncating database '%s'...", req.Database)
6✔
1715

6✔
1716
        defer func() {
12✔
1717
                if err == nil {
6✔
1718
                        s.Logger.Infof("database '%s' successfully truncated", req.Database)
×
1719
                } else {
6✔
1720
                        s.Logger.Infof("database '%s' could not be truncated. Reason: %v", req.Database, err)
6✔
1721
                }
6✔
1722
        }()
1723

1724
        if !s.Options.GetAuth() {
6✔
1725
                return nil, ErrAuthMustBeEnabled
×
1726
        }
×
1727

1728
        if req.Database == s.Options.defaultDBName || req.Database == s.Options.systemAdminDBName {
7✔
1729
                return nil, ErrReservedDatabase
1✔
1730
        }
1✔
1731

1732
        _, user, err := s.getLoggedInUserdataFromCtx(ctx)
5✔
1733
        if err != nil {
5✔
1734
                return nil, fmt.Errorf("could not get loggedin user data")
×
1735
        }
×
1736

1737
        //if the requesting user has admin permission on this database
1738
        if (!user.IsSysAdmin) &&
5✔
1739
                (!user.HasPermission(req.Database, auth.PermissionAdmin)) {
5✔
1740
                return nil, fmt.Errorf("the database '%s' does not exist or you do not have admin permission on this database", req.Database)
×
1741
        }
×
1742

1743
        if req.RetentionPeriod < 0 || (req.RetentionPeriod > 0 && req.RetentionPeriod < store.MinimumRetentionPeriod.Milliseconds()) {
7✔
1744
                return nil, fmt.Errorf(
2✔
1745
                        "%w: invalid retention period for database '%s'. RetentionPeriod should at least '%v' hours",
2✔
1746
                        ErrIllegalArguments, req.Database, store.MinimumRetentionPeriod)
2✔
1747
        }
2✔
1748

1749
        s.dbListMutex.Lock()
3✔
1750
        defer s.dbListMutex.Unlock()
3✔
1751

3✔
1752
        db, err := s.dbList.GetByName(req.Database)
3✔
1753
        if err != nil {
5✔
1754
                return nil, err
2✔
1755
        }
2✔
1756

1757
        rp := time.Duration(req.RetentionPeriod) * time.Millisecond
1✔
1758

1✔
1759
        // check if truncator already exists for the database
1✔
1760
        var t *truncator.Truncator
1✔
1761

1✔
1762
        t, err = s.getTruncatorFor(db.GetName())
1✔
1763
        if err == ErrTruncatorDoesNotExist {
2✔
1764
                t = truncator.NewTruncator(db, rp, 0, s.Logger)
1✔
1765
        }
1✔
1766

1767
        err = t.Truncate(ctx, rp)
1✔
1768
        if err != nil {
2✔
1769
                return nil, err
1✔
1770
        }
1✔
1771

1772
        return &schema.TruncateDatabaseResponse{
×
1773
                Database: req.Database,
×
1774
        }, nil
×
1775
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc