• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

smallnest / exp / 12340184181

15 Dec 2024 03:49PM UTC coverage: 74.626% (+0.5%) from 74.114%
12340184181

push

github

smallnest
add Broadcaster

7 of 7 new or added lines in 1 file covered. (100.0%)

18 existing lines in 2 files now uncovered.

2594 of 3476 relevant lines covered (74.63%)

3013519.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

48.36
/sync/phaser.go
1
package sync
2

3
import (
4
        "sync"
5
        "sync/atomic"
6
)
7

8
// Phaser is a reusable synchronization barrier, modelled on
// java.util.concurrent.Phaser. Parties register with the phaser, arrive at
// it, and may block until every registered party has arrived, at which point
// the phase number advances and the barrier can be used again.
type Phaser struct {
	// parties is the number of currently registered parties.
	parties atomic.Int32
	// arrived counts the parties that have arrived in the current phase; it
	// is reset to zero whenever the phase advances.
	arrived atomic.Int32
	// phase is the current phase number.
	phase atomic.Int32
	// barrier is the condition variable that waiting parties block on; the
	// methods that change the counters hold its lock while doing so.
	barrier *sync.Cond
	// arriveAction, when non-nil, is called with the party count each time
	// the phase advances. Its error result is not inspected by the phaser.
	arriveAction func(parties int32) error
	// terminated is 1 once the phaser has been terminated and 0 otherwise.
	terminated atomic.Int32
}
17

18
// NewPhaser creates a new Phaser instance.
19
func NewPhaser(parties int32) *Phaser {
2✔
20
        var p Phaser
2✔
21
        p.parties.Store(parties)
2✔
22
        p.barrier = sync.NewCond(&sync.Mutex{})
2✔
23
        return &p
2✔
24
}
2✔
25

26
func NewPhaserWithAction(parties int32, arriveAction func(parties int32) error) *Phaser {
×
27
        var p Phaser
×
28
        p.parties.Store(parties)
×
29
        p.barrier = sync.NewCond(&sync.Mutex{})
×
30
        p.arriveAction = arriveAction
×
31

×
32
        return &p
×
33
}
×
34

35
// Join adds a new party to this phaser.
36
// Just like java.util.concurrent.Phaser's register() method.
37
func (p *Phaser) Join() int32 {
6✔
38
        p.barrier.L.Lock()
6✔
39
        defer p.barrier.L.Unlock()
6✔
40

6✔
41
        return p.parties.Add(1)
6✔
42
}
6✔
43

44
// BulkJoin adds a number of new parties to this phaser.
45
// Just like java.util.concurrent.Phaser's bulkRegister(int parties) method.
46
func (p *Phaser) BulkJoin(parties int32) int32 {
×
47
        p.barrier.L.Lock()
×
48
        defer p.barrier.L.Unlock()
×
49

×
50
        return p.parties.Add(parties)
×
51
}
×
52

53
// Arrive arrives at this phaser, without waiting for others to arrive.
54
func (p *Phaser) Arrive() int32 {
×
55
        p.barrier.L.Lock()
×
56
        defer p.barrier.L.Unlock()
×
57

×
58
        currentArrived := p.arrived.Add(1)
×
59
        if currentArrived == p.parties.Load() { // all arrived
×
60
                p.phase.Add(1)
×
61
                p.arrived.Store(0)
×
62
                // call arrive action if it is nil and broadcast
×
63
                if p.arriveAction != nil {
×
64
                        p.arriveAction(p.parties.Load())
×
65
                }
×
66
                p.barrier.Broadcast()
×
67
        }
68

69
        return p.phase.Load()
×
70
}
71

72
// Wait awaits the phase of this phaser to advance from the given phase value,
73
// returning immediately if the current phase is not equal to the given phase value or this phaser is terminated.
74
// Just like java.util.concurrent.Phaser's awaitAdvance(int phase) method.
75
func (p *Phaser) Wait(phase int) int32 {
×
76
        p.barrier.L.Lock()
×
77
        defer p.barrier.L.Unlock()
×
78

×
79
        for int32(phase) == p.phase.Load() && p.terminated.Load() == 0 {
×
80
                p.barrier.Wait()
×
81
        }
×
82

83
        return p.phase.Load()
×
84
}
85

86
// ArriveAndWait arrives at this phaser and waits others.
87
// Just like java.util.concurrent.Phaser's arriveAndAwaitAdvance() method.
88
func (p *Phaser) ArriveAndWait() int32 {
14✔
89
        p.barrier.L.Lock()
14✔
90
        defer p.barrier.L.Unlock()
14✔
91

14✔
92
        phase := p.phase.Load()
14✔
93
        currentArrived := p.arrived.Add(1)
14✔
94
        if currentArrived == p.parties.Load() { // all arrived
19✔
95
                p.phase.Add(1)
5✔
96
                p.arrived.Store(0)
5✔
97
                // call arrive action if it is nil and broadcast
5✔
98
                if p.arriveAction != nil {
5✔
99
                        p.arriveAction(p.parties.Load())
×
100
                }
×
101
                p.barrier.Broadcast()
5✔
102
        } else {
9✔
103
                // wait for phase to change in current phase
9✔
104
                for phase == p.phase.Load() && p.terminated.Load() == 0 {
18✔
105
                        p.barrier.Wait()
9✔
106
                }
9✔
107
        }
108

109
        return p.phase.Load()
14✔
110
}
111

112
// ArriveAndLeave arrives at this phaser and leaves from it without waiting for others to arrive.
113
// Just like java.util.concurrent.Phaser's arriveAndDeregister() method.
114
func (p *Phaser) ArriveAndLeave() int32 {
1✔
115
        p.barrier.L.Lock()
1✔
116
        defer p.barrier.L.Unlock()
1✔
117

1✔
118
        phase := p.phase.Load()
1✔
119
        currentArrived := p.arrived.Add(1)
1✔
120
        if currentArrived == p.parties.Load() { // all arrived
2✔
121
                p.phase.Add(1)
1✔
122
                p.arrived.Store(0)
1✔
123
                // call arrive action if it is nil and broadcast
1✔
124
                if p.arriveAction != nil {
1✔
125
                        p.arriveAction(p.parties.Load())
×
126
                }
×
127
                p.barrier.Broadcast()
1✔
UNCOV
128
        } else {
×
UNCOV
129
                // wait for phase to change in current phase
×
UNCOV
130
                for phase == p.phase.Load() && p.terminated.Load() == 0 {
×
UNCOV
131
                        p.barrier.Wait()
×
UNCOV
132
                }
×
133
        }
134

135
        p.leave()
1✔
136

1✔
137
        return p.phase.Load()
1✔
138
}
139

140
func (p *Phaser) leave() int32 {
2✔
141
        // leave this phaser
2✔
142
        parties := p.parties.Add(-1)
2✔
143
        if parties == 0 { // is the last one, terminate this phaser
2✔
144
                p.parties.Store(0)
×
145
                p.phase.Store(0)
×
146
                p.arrived.Store(0)
×
147
                p.terminated.Store(1)
×
148
                p.barrier.Broadcast()
×
149
        }
×
150

151
        return parties
2✔
152
}
153

154
// Leave leaves from this phaser without waiting for others to arrive.
155
// Just like java.util.concurrent.Phaser's deregister() method.
156
func (p *Phaser) Leave() int32 {
1✔
157
        p.barrier.L.Lock()
1✔
158
        defer p.barrier.L.Unlock()
1✔
159

1✔
160
        return p.leave()
1✔
161
}
1✔
162

163
// Phase returns the current phase number.
164
func (p *Phaser) Phase() int32 {
5✔
165
        return p.phase.Load()
5✔
166
}
5✔
167

168
// Phase returns the current phase number.
169
func (p *Phaser) Arrived() int32 {
×
170
        return p.arrived.Load()
×
171
}
×
172

173
// Parties returns the number of parties joined in this phaser.
174
func (p *Phaser) Parties() int32 {
×
175
        return p.parties.Load()
×
176
}
×
177

178
// ForceTermination forces this phaser to enter termination state.
179
func (p *Phaser) ForceTermination() {
×
180
        p.terminated.Store(1)
×
181
}
×
182

183
// IsTerminated returns true if this phaser has been terminated.
184
func (p *Phaser) IsTerminated() bool {
×
185
        return p.terminated.Load() == 1
×
186
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc