• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

trento-project / agent / 14442149348

14 Apr 2025 09:24AM UTC coverage: 72.673% (+0.4%) from 72.224%
14442149348

push

github

web-flow
Run operations (#422)

* Add operation messages mapping

* Implement operation requests policy

* Listen for operation requests

* Wrap error messages

* Add tests to policy

* Use lowercase to wrap errors

174 of 201 new or added lines in 4 files covered. (86.57%)

4662 of 6415 relevant lines covered (72.67%)

19.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.19
/internal/operations/engine.go
1
package operations
2

3
import (
4
        "context"
5
        "fmt"
6

7
        log "github.com/sirupsen/logrus"
8
        "github.com/trento-project/agent/internal/messaging"
9
        "github.com/trento-project/workbench/pkg/operator"
10
)
11

12
const (
13
        exchange               string = "trento.operations"
14
        agentsQueue            string = "trento.operations.agents.%s"
15
        agentsEventsRoutingKey string = "agents"
16
        operationsRoutingKey   string = "requests"
17
)
18

19
type Engine struct {
20
        agentID          string
21
        amqpServiceURL   string
22
        amqpAdapter      messaging.Adapter
23
        operatorRegistry operator.Registry
24
}
25

26
func NewOperationsEngine(agentID, amqpServiceURL string, registry operator.Registry) *Engine {
1✔
27
        return &Engine{
1✔
28
                agentID:          agentID,
1✔
29
                amqpServiceURL:   amqpServiceURL,
1✔
30
                amqpAdapter:      nil,
1✔
31
                operatorRegistry: registry,
1✔
32
        }
1✔
33
}
1✔
34

35
func (e *Engine) Subscribe() error {
1✔
36
        log.Infof("Subscribing agent %s to the operations reception service on %s", e.agentID, e.amqpServiceURL)
1✔
37
        queue := fmt.Sprintf(agentsQueue, e.agentID)
1✔
38
        amqpAdapter, err := messaging.NewRabbitMQAdapter(
1✔
39
                e.amqpServiceURL,
1✔
40
                queue,
1✔
41
                exchange,
1✔
42
                agentsEventsRoutingKey,
1✔
43
        )
1✔
44
        if err != nil {
1✔
NEW
45
                return err
×
NEW
46
        }
×
47

48
        e.amqpAdapter = amqpAdapter
1✔
49
        log.Infof("Subscription to the operations engine by agent %s in %s done", e.agentID, e.amqpServiceURL)
1✔
50

1✔
51
        return nil
1✔
52
}
53

54
func (e *Engine) Unsubscribe() error {
1✔
55
        log.Infof("Unsubscribing agent %s from the operations engine service", e.agentID)
1✔
56
        if err := e.amqpAdapter.Unsubscribe(); err != nil {
1✔
NEW
57
                return err
×
NEW
58
        }
×
59

60
        log.Infof("Unsubscribed properly")
1✔
61

1✔
62
        return nil
1✔
63
}
64

65
func (e *Engine) Listen(ctx context.Context) error {
1✔
66
        var err error
1✔
67

1✔
68
        log.Infof("Listening for operation events...")
1✔
69
        defer func() {
2✔
70
                err = e.Unsubscribe()
1✔
71
                if err != nil {
1✔
NEW
72
                        log.Errorf("Error during unsubscription: %s", err)
×
NEW
73
                }
×
74
        }()
75
        eventHandler := messaging.MakeEventHandler(
1✔
76
                ctx,
1✔
77
                e.agentID,
1✔
78
                e.amqpAdapter,
1✔
79
                e.operatorRegistry,
1✔
80
                HandleEvent,
1✔
81
        )
1✔
82
        if err := e.amqpAdapter.Listen(eventHandler); err != nil {
1✔
NEW
83
                return err
×
NEW
84
        }
×
85

86
        <-ctx.Done()
1✔
87

1✔
88
        return err
1✔
89
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc