• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

codenotary / immudb / 6562442758

18 Oct 2023 02:33PM UTC coverage: 89.595% (-0.003%) from 89.598%
6562442758

push

gh-ci

jeroiraz
feat(embedded/sql): table renaming

Signed-off-by: Jeronimo Irazabal <jeronimo.irazabal@gmail.com>

175 of 175 new or added lines in 3 files covered. (100.0%)

33546 of 37442 relevant lines covered (89.59%)

145176.5 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.69
/pkg/server/stream_replication.go
1
/*
2
Copyright 2022 Codenotary Inc. All rights reserved.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
        http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package server
18

19
import (
20
        "bytes"
21
        "encoding/binary"
22
        "strconv"
23

24
        "github.com/codenotary/immudb/pkg/api/schema"
25
        "github.com/codenotary/immudb/pkg/stream"
26
        "google.golang.org/grpc/metadata"
27
)
28

29
func (s *ImmuServer) ExportTx(req *schema.ExportTxRequest, txsServer schema.ImmuService_ExportTxServer) error {
6✔
30
        return s.exportTx(req, txsServer, true, make([]byte, s.Options.StreamChunkSize))
6✔
31
}
6✔
32

33
// StreamExportTx implements the bidirectional streaming endpoint used to export transactions
34
func (s *ImmuServer) StreamExportTx(stream schema.ImmuService_StreamExportTxServer) error {
35✔
35
        buf := make([]byte, s.Options.StreamChunkSize)
35✔
36

35✔
37
        for {
8,476✔
38
                req, err := stream.Recv()
8,441✔
39
                if err != nil {
8,441✔
40
                        return err
×
41
                }
×
42

43
                err = s.exportTx(req, stream, false, buf)
8,441✔
44
                if err != nil {
8,476✔
45
                        return err
35✔
46
                }
35✔
47
        }
48
}
49

50
func (s *ImmuServer) exportTx(req *schema.ExportTxRequest, txsServer schema.ImmuService_ExportTxServer, setTrailer bool, buf []byte) error {
8,447✔
51
        if req == nil || req.Tx == 0 || txsServer == nil {
8,449✔
52
                return ErrIllegalArguments
2✔
53
        }
2✔
54

55
        db, err := s.getDBFromCtx(txsServer.Context(), "ExportTx")
8,445✔
56
        if err != nil {
8,447✔
57
                return err
2✔
58
        }
2✔
59

60
        txbs, mayCommitUpToTxID, mayCommitUpToAlh, err := db.ExportTxByID(txsServer.Context(), req)
8,443✔
61
        if err != nil {
8,445✔
62
                return err
2✔
63
        }
2✔
64

65
        var streamMetadata map[string][]byte
8,441✔
66

8,441✔
67
        if req.ReplicaState != nil {
15,202✔
68
                var bMayCommitUpToTxID [8]byte
6,761✔
69
                binary.BigEndian.PutUint64(bMayCommitUpToTxID[:], mayCommitUpToTxID)
6,761✔
70

6,761✔
71
                var bCommittedTxID [8]byte
6,761✔
72
                state, err := db.CurrentState()
6,761✔
73
                if err == nil {
13,522✔
74
                        binary.BigEndian.PutUint64(bCommittedTxID[:], state.TxId)
6,761✔
75
                }
6,761✔
76

77
                streamMetadata = map[string][]byte{
6,761✔
78
                        "may-commit-up-to-txid-bin": bMayCommitUpToTxID[:],
6,761✔
79
                        "may-commit-up-to-alh-bin":  mayCommitUpToAlh[:],
6,761✔
80
                        "committed-txid-bin":        bCommittedTxID[:],
6,761✔
81
                }
6,761✔
82

6,761✔
83
                if setTrailer {
6,762✔
84
                        // trailer metadata is kept for backward compatibility
1✔
85
                        // it should not be sent when replication is done with bidirectional streaming
1✔
86
                        // otherwise metadata will get accumulated over time
1✔
87
                        md := metadata.Pairs(
1✔
88
                                "may-commit-up-to-txid-bin", string(bMayCommitUpToTxID[:]),
1✔
89
                                "may-commit-up-to-alh-bin", string(mayCommitUpToAlh[:]),
1✔
90
                                "committed-txid-bin", string(bCommittedTxID[:]),
1✔
91
                        )
1✔
92
                        txsServer.SetTrailer(md)
1✔
93
                }
1✔
94
        }
95

96
        sender := stream.NewMsgSender(txsServer, buf)
8,441✔
97

8,441✔
98
        return sender.Send(bytes.NewReader(txbs), len(txbs), streamMetadata)
8,441✔
99
}
100

101
func (s *ImmuServer) ReplicateTx(replicateTxServer schema.ImmuService_ReplicateTxServer) error {
4✔
102
        if replicateTxServer == nil {
5✔
103
                return ErrIllegalArguments
1✔
104
        }
1✔
105

106
        ctx := replicateTxServer.Context()
3✔
107

3✔
108
        db, err := s.getDBFromCtx(ctx, "ReplicateTx")
3✔
109
        if err != nil {
4✔
110
                return err
1✔
111
        }
1✔
112

113
        if s.replicationInProgressFor(db.GetName()) {
2✔
114
                return ErrReplicationInProgress
×
115
        }
×
116

117
        var skipIntegrityCheck bool
2✔
118
        var waitForIndexing bool
2✔
119

2✔
120
        md, ok := metadata.FromIncomingContext(ctx)
2✔
121
        if ok {
4✔
122
                if len(md.Get("skip-integrity-check")) > 0 {
3✔
123
                        skipIntegrityCheck, err = strconv.ParseBool(md.Get("skip-integrity-check")[0])
1✔
124
                        if err != nil {
1✔
125
                                return err
×
126
                        }
×
127
                }
128

129
                if len(md.Get("wait-for-indexing")) > 0 {
3✔
130
                        waitForIndexing, err = strconv.ParseBool(md.Get("wait-for-indexing")[0])
1✔
131
                        if err != nil {
1✔
132
                                return err
×
133
                        }
×
134
                }
135
        }
136

137
        receiver := s.StreamServiceFactory.NewMsgReceiver(replicateTxServer)
2✔
138

2✔
139
        bs, _, err := receiver.ReadFully()
2✔
140
        if err != nil {
3✔
141
                return err
1✔
142
        }
1✔
143

144
        hdr, err := db.ReplicateTx(replicateTxServer.Context(), bs, skipIntegrityCheck, waitForIndexing)
1✔
145
        if err != nil {
1✔
146
                return err
×
147
        }
×
148

149
        return replicateTxServer.SendAndClose(hdr)
1✔
150
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2024 Coveralls, Inc