• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / servicecomb-kie / 214

28 Dec 2019 08:06AM UTC coverage: 53.976% (+7.3%) from 46.679%
214

Pull #58

travis-ci

web-flow
SCB-1549 peer to peer event notification
Pull Request #58: SCB-1549 peer to peer event notification

174 of 307 new or added lines in 10 files covered. (56.68%)

5 existing lines in 4 files now uncovered.

733 of 1358 relevant lines covered (53.98%)

1.8 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

53.73
/server/pubsub/bus.go
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one or more
3
 * contributor license agreements.  See the NOTICE file distributed with
4
 * this work for additional information regarding copyright ownership.
5
 * The ASF licenses this file to You under the Apache License, Version 2.0
6
 * (the "License"); you may not use this file except in compliance with
7
 * the License.  You may obtain a copy of the License at
8
 *
9
 *     http://www.apache.org/licenses/LICENSE-2.0
10
 *
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 */
17

18
package pubsub
19

20
import (
21
        "encoding/json"
22
        "github.com/apache/servicecomb-kie/server/config"
23
        "github.com/go-mesh/openlogging"
24
        "github.com/hashicorp/serf/cmd/serf/command/agent"
25
        "github.com/hashicorp/serf/serf"
26
        "sync"
27
)
28

29
var once sync.Once
30
var bus *Bus
31

32
//const
33
const (
34
        EventKVChange = "kv-changed"
35
)
36

37
var mutexObservers sync.RWMutex
38
var topics sync.Map
39

40
//Bus is message bug
41
type Bus struct {
42
        agent *agent.Agent
43
}
44

45
//Init create serf agent
46
func Init() {
1✔
47
        once.Do(func() {
2✔
48
                ac := agent.DefaultConfig()
1✔
49
                if config.Configurations.ListenPeerAddr != "" {
1✔
NEW
50
                        ac.BindAddr = config.Configurations.ListenPeerAddr
×
NEW
51
                }
×
52
                if config.Configurations.AdvertiseAddr != "" {
1✔
NEW
53
                        ac.AdvertiseAddr = config.Configurations.AdvertiseAddr
×
NEW
54
                }
×
55
                sc := serf.DefaultConfig()
1✔
56
                if config.Configurations.NodeName != "" {
1✔
NEW
57
                        sc.NodeName = config.Configurations.NodeName
×
NEW
58
                }
×
59
                ac.UserEventSizeLimit = 512
1✔
60
                a, err := agent.Create(ac, sc, nil)
1✔
61
                if err != nil {
1✔
NEW
62
                        openlogging.Fatal("can not sync key value change events to other kie nodes:" + err.Error())
×
NEW
63
                }
×
64
                bus = &Bus{
1✔
65
                        agent: a,
1✔
66
                }
1✔
67
                if config.Configurations.PeerAddr != "" {
1✔
NEW
68
                        err := join([]string{config.Configurations.PeerAddr})
×
NEW
69
                        if err != nil {
×
NEW
70
                                openlogging.Fatal("lost event message")
×
NEW
71
                        } else {
×
NEW
72
                                openlogging.Info("join kie node:" + config.Configurations.PeerAddr)
×
NEW
73
                        }
×
74
                }
75
        })
76
}
77

78
//Start start serf agent
79
func Start() {
1✔
80
        err := bus.agent.Start()
1✔
81
        if err != nil {
1✔
NEW
82
                openlogging.Fatal("can not sync key value change events to other kie nodes" + err.Error())
×
NEW
83
        }
×
84
        openlogging.Info("kie message bus started")
1✔
85
        bus.agent.RegisterEventHandler(&EventHandler{})
1✔
86
}
NEW
87
func join(addresses []string) error {
×
NEW
88
        _, err := bus.agent.Join(addresses, false)
×
NEW
89
        if err != nil {
×
NEW
90
                return err
×
NEW
91
        }
×
NEW
92
        return nil
×
93
}
94

95
//Publish send event
96
func Publish(event *KVChangeEvent) error {
1✔
97
        b, err := json.Marshal(event)
1✔
98
        if err != nil {
1✔
NEW
99
                return err
×
NEW
100
        }
×
101
        return bus.agent.UserEvent(EventKVChange, b, true)
1✔
102

103
}
104

105
//ObserveOnce observe key changes by (key or labels) or (key and labels)
106
func ObserveOnce(o *Observer, topic *Topic) error {
1✔
107
        topic.Format()
1✔
108
        b, err := json.Marshal(topic)
1✔
109
        if err != nil {
1✔
NEW
110
                return err
×
NEW
111
        }
×
112
        t := string(b)
1✔
113
        observers, ok := topics.Load(t)
1✔
114
        if !ok {
2✔
115
                topics.Store(t, map[string]*Observer{
1✔
116
                        o.UUID: o,
1✔
117
                })
1✔
118
                openlogging.Info("new topic:" + t)
1✔
119
                return nil
1✔
120
        }
1✔
NEW
121
        mutexObservers.Lock()
×
NEW
122
        observers.(map[string]*Observer)[o.UUID] = o
×
NEW
123
        mutexObservers.Unlock()
×
NEW
124
        openlogging.Debug("add new observer for topic:" + t)
×
NEW
125
        return nil
×
126
}
Troubleshooting · Open an Issue · Sales · Support · ENTERPRISE · CAREERS · STATUS
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2023 Coveralls, Inc