• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

beatlabs / harvester / 9155019498

20 May 2024 07:16AM UTC coverage: 92.938% (-0.8%) from 93.785%
9155019498

push

github

web-flow
Bump github.com/hashicorp/consul/api from 1.28.2 to 1.28.3 (#176)

Bumps [github.com/hashicorp/consul/api](https://github.com/hashicorp/consul) from 1.28.2 to 1.28.3.
- [Release notes](https://github.com/hashicorp/consul/releases)
- [Changelog](https://github.com/hashicorp/consul/blob/main/CHANGELOG.md)
- [Commits](https://github.com/hashicorp/consul/compare/api/v1.28.2...api/v1.28.3)

---
updated-dependencies:
- dependency-name: github.com/hashicorp/consul/api
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

987 of 1062 relevant lines covered (92.94%)

26.8 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.77
/monitor/consul/watcher.go
1
// Package consul handles the monitor capabilities of harvester using Consul.
2
package consul
3

4
import (
5
        "context"
6
        "errors"
7
        "log/slog"
8
        "path"
9
        "time"
10

11
        "github.com/beatlabs/harvester/change"
12
        "github.com/beatlabs/harvester/config"
13
        "github.com/hashicorp/consul/api"
14
        "github.com/hashicorp/consul/api/watch"
15
)
16

17
// Item describes one entry the Watcher should observe.
//
// tp is the watch type ("key" or "keyprefix" as set by the constructors in
// this file), key is the key or key prefix to watch, and prefix is an optional
// path segment that is joined in front of key for "key" items.
type Item struct {
	tp, key, prefix string
}
23

24
// NewKeyItem creates a new key watch item for the watcher.
25
func NewKeyItem(key string) Item {
×
26
        return Item{tp: "key", key: key}
×
27
}
×
28

29
// NewKeyItemWithPrefix creates a new key item for a given key and prefix.
30
func NewKeyItemWithPrefix(key, prefix string) Item {
2✔
31
        return Item{tp: "key", key: key, prefix: prefix}
2✔
32
}
2✔
33

34
// NewPrefixItem creates a prefix key watch item for the watcher.
35
func NewPrefixItem(key string) Item {
1✔
36
        return Item{tp: "keyprefix", key: key}
1✔
37
}
1✔
38

39
// Watcher of ConsulLogger changes.
40
type Watcher struct {
41
        cl    *api.Client
42
        dc    string
43
        token string
44
        pp    []*watch.Plan
45
        ii    []Item
46
}
47

48
// New creates a new watcher.
49
func New(addr, dc, token string, timeout time.Duration, ii ...Item) (*Watcher, error) {
6✔
50
        if addr == "" {
7✔
51
                return nil, errors.New("address is empty")
1✔
52
        }
1✔
53
        if len(ii) == 0 {
6✔
54
                return nil, errors.New("items are empty")
1✔
55
        }
1✔
56
        cfg := api.DefaultConfig()
4✔
57
        cfg.Address = addr
4✔
58
        if timeout > 0 {
5✔
59
                cfg.WaitTime = timeout
1✔
60
        }
1✔
61

62
        cl, err := api.NewClient(cfg)
4✔
63
        if err != nil {
4✔
64
                return nil, err
×
65
        }
×
66
        return &Watcher{cl: cl, dc: dc, token: token, ii: ii}, nil
4✔
67
}
68

69
// Watch key and prefixes for changes.
70
func (w *Watcher) Watch(ctx context.Context, ch chan<- []*change.Change) error {
3✔
71
        if ctx == nil {
4✔
72
                return errors.New("context is nil")
1✔
73
        }
1✔
74
        if ch == nil {
3✔
75
                return errors.New("change channel is nil")
1✔
76
        }
1✔
77
        for _, i := range w.ii {
4✔
78
                var pl *watch.Plan
3✔
79
                var err error
3✔
80
                switch i.tp {
3✔
81
                case "key":
2✔
82
                        pl, err = w.createKeyPlanWithPrefix(i.key, i.prefix, ch)
2✔
83
                case "keyprefix":
1✔
84
                        pl, err = w.createKeyPrefixPlan(i.key, ch)
1✔
85
                }
86
                if err != nil {
3✔
87
                        return err
×
88
                }
×
89
                w.pp = append(w.pp, pl)
3✔
90
                go func(tp, key string) {
6✔
91
                        err := pl.RunWithClientAndHclog(w.cl, logger)
3✔
92
                        if err != nil {
3✔
93
                                slog.Error("plan failed", "plan", tp, "type", key, "err", err)
×
94
                        } else {
3✔
95
                                slog.Debug("plan running", "plan", tp, "type", key)
3✔
96
                        }
3✔
97
                }(i.tp, i.key)
98
        }
99
        go func() {
2✔
100
                <-ctx.Done()
1✔
101
                for _, pl := range w.pp {
4✔
102
                        pl.Stop()
3✔
103
                }
3✔
104
                slog.Debug("all watch plans have been stopped")
1✔
105
        }()
106

107
        return nil
1✔
108
}
109

110
func (w *Watcher) createKeyPlanWithPrefix(key, prefix string, ch chan<- []*change.Change) (*watch.Plan, error) {
2✔
111
        pl, err := w.getPlan("key", path.Join(prefix, key))
2✔
112
        if err != nil {
2✔
113
                return nil, err
×
114
        }
×
115
        pl.Handler = func(idx uint64, data interface{}) {
4✔
116
                if data == nil {
2✔
117
                        return
×
118
                }
×
119
                pair, ok := data.(*api.KVPair)
2✔
120
                if !ok {
2✔
121
                        slog.Error("data is not a kv pair", "data", data)
×
122
                } else {
2✔
123
                        ch <- []*change.Change{change.New(config.SourceConsul, key, string(pair.Value), pair.ModifyIndex)}
2✔
124
                }
2✔
125
        }
126
        slog.Debug("plan created", "key", key)
2✔
127
        return pl, nil
2✔
128
}
129

130
func (w *Watcher) createKeyPrefixPlan(keyPrefix string, ch chan<- []*change.Change) (*watch.Plan, error) {
1✔
131
        pl, err := w.getPlan("keyprefix", keyPrefix)
1✔
132
        if err != nil {
1✔
133
                return nil, err
×
134
        }
×
135
        pl.Handler = func(idx uint64, data interface{}) {
1✔
136
                if data == nil {
×
137
                        return
×
138
                }
×
139
                pp, ok := data.(api.KVPairs)
×
140
                if !ok {
×
141
                        slog.Error("data is not a kv pairs", "data", data)
×
142
                } else {
×
143
                        cc := make([]*change.Change, len(pp))
×
144
                        for i := 0; i < len(pp); i++ {
×
145
                                cc[i] = change.New(config.SourceConsul, pp[i].Key, string(pp[i].Value), pp[i].ModifyIndex)
×
146
                        }
×
147
                        ch <- cc
×
148
                }
149
        }
150
        slog.Debug("plan created", "keyPrefix", keyPrefix)
1✔
151
        return pl, nil
1✔
152
}
153

154
func (w *Watcher) getPlan(tp, key string) (*watch.Plan, error) {
3✔
155
        params := map[string]interface{}{}
3✔
156
        params["datacenter"] = w.dc
3✔
157
        params["token"] = w.token
3✔
158
        if tp == "key" {
5✔
159
                params["key"] = key
2✔
160
        } else {
3✔
161
                params["prefix"] = key
1✔
162
        }
1✔
163
        params["type"] = tp
3✔
164
        return watch.Parse(params)
3✔
165
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc