• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

deavmi / tristanable / #36

26 Nov 2023 05:09PM UTC coverage: 95.019% (-0.1%) from 95.161%
#36

Pull #10

coveralls-ruby

deavmi
Manager

- Implemented `shutdownAllQueues()`
- Calling `stop()` now shuts down all queues
Pull Request #10: Unblock queues on shutdown or errors

12 of 13 new or added lines in 2 files covered. (92.31%)

8 existing lines in 1 file now uncovered.

248 of 261 relevant lines covered (95.02%)

8.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.84
/source/tristanable/queue/queue.d
1
/**
2
 * A queue of queue items all of the same tag
3
 */
4
module tristanable.queue.queue;
5

6
import core.sync.mutex : Mutex;
7
import core.sync.condition : Condition;
8
import core.sync.exception : SyncError;
9
import std.container.slist : SList;
10
import tristanable.encoding;
11
import core.time : Duration, dur;
12
import tristanable.exceptions;
13

14
version(unittest)
15
{
16
   import std.stdio;
17
   import std.conv : to;
18
}
19

20
/** 
21
 * Represents a queue whereby messages of a certain tag/id
22
 * can be enqueued to (by the `Watcher`) and dequeued from
23
 * (by the user application)
24
 */
25
public class Queue
26
{
27
    /** 
28
     * This queue's unique ID
29
     */
30
    private ulong queueID;
31

32
    /** 
33
     * The libsnooze event used to sleep/wake
34
     * on queue events
35
     * Mutex for the condition variable
36
     */
37
    private Mutex mutex;
38

39
    /** 
40
     * The condition variable used to sleep/wake
41
     * on queue of events
42
     */
43
    private Condition signal;
44

45
    /** 
46
     * The queue of messages
47
     */
48
    private SList!(TaggedMessage) queue;
49

50
    /** 
51
     * The lock for the message queue
52
     */
53
    private Mutex queueLock;
54

55
    /**
56
     * If a message is enqueued prior
57
     * to us sleeping then we won't
58
     * wake up and return for it.
59
     *
60
     * Therefore a periodic wakeup
61
     * is required.
62
     */
63
    private Duration wakeInterval;
64

65
    /** 
66
     * Reason for a `dequeue()`
67
     * to have failed
68
     */
69
    private ErrorType exitReason;
70
    private bool alive;
71

72
    /** 
73
     * Constructs a new Queue and immediately sets up the notification
74
     * sub-system for the calling thread (the thread constructing this
75
     * object) which ensures that a call to dequeue will immediately
76
     * unblock on the first message received under this tag
77
     *
78
     * Params:
79
     *   queueID = the id to use for this queue
80
     */
81
    this(ulong queueID)
9✔
82
    {
83
        /* Initialize the queue lock */
84
        this.queueLock = new Mutex();
9✔
85

86
        /* Initialize the condition variable */
87
        this.mutex = new Mutex();
9✔
88
        this.signal = new Condition(this.mutex);
9✔
89

90
        /* Set the queue id */
91
        this.queueID = queueID;
9✔
92

93
        /* Set the slumber interval */
94
        this.wakeInterval = dur!("msecs")(50); // TODO: Decide on value
9✔
95

96
        /* Set status to alive */
97
        this.alive = true;
9✔
98
    }
99

100
    /** 
101
     * Returns the current wake interval
102
     * for the queue checker
103
     *
104
     * Returns: the `Duration`
105
     */
106
    public Duration getWakeInterval()
107
    {
108
        return this.wakeInterval;
×
109
    }
110

111
    /** 
112
     * Sets the wake up interval
113
     *
114
     * Params:
115
     *   interval = the new interval
116
     */
117
    public void setWakeInterval(Duration interval)
118
    {
119
        this.wakeInterval = interval;
×
120
    }
121

122
    /** 
123
     * Enqueues the provided tagged message onto this queue
124
     * and then wakes up any thread that has called dequeue
125
     * on this queue as well
126
     *
127
     * On error enqueueing a `TristanableException` will be
128
     * thrown.
129
     *
130
     * Params:
131
     *   message = the TaggedMessage to enqueue
132
     */
133
    public void enqueue(TaggedMessage message)
134
    {
135
        version(unittest)
136
        {
137
            writeln("queue["~to!(string)(queueID)~"]: Enqueuing '"~to!(string)(message)~"'...");
5✔
138
        }
139

140
        scope(exit)
141
        {
142
            version(unittest)
143
            {
144
                writeln("queue["~to!(string)(queueID)~"]: Enqueued '"~to!(string)(message)~"'!");
5✔
145
            }
146

147
            /* Unlock the item queue */
148
            queueLock.unlock();
5✔
149
        }
150

151
        /* Lock the item queue */
152
        queueLock.lock();
5✔
153

154
        /* Add the item to the queue */
155
        queue.insertAfter(queue[], message);
5✔
156

157
        /* Wake up anyone wanting to dequeue from us */
158
        try
159
        {
160
            // TODO: Make us wait on the event (optional with a time-out)
161
            signal.notifyAll();
5✔
162
        }
163
        catch(SyncError snozErr)
164
        {
165
            // Throw an exception on a fatal exception
166
            throw new TristanableException(ErrorType.ENQUEUE_FAILED);
×
167
        }
168
    }
169

170

171
    public void shutdownQueue(ErrorType reason)
172
    {
173
        // Set running state and reason
174
        this.alive = false;
2✔
175
        this.exitReason = reason;
2✔
176

177
        // Wakeup sleeping dequeue()
178

179
        // Lock the mutex
180
        this.mutex.lock();
2✔
181

182
        // Awake all condition variable sleepers
183
        this.signal.notifyAll();
2✔
184

185
        // Unlock the mutex
186
        this.mutex.unlock();
2✔
187
    }
188

189
    // TODO: Make a version of this which can time out
190

191
    /** 
192
     * Blocks till a message can be dequeued from this queue
193
     *
194
     * On error dequeueing a `TristanableException` will be
195
     * thrown.
196
     *
197
     * Returns: the dequeued TaggedMessage
198
     */
199
    public TaggedMessage dequeue()
200
    {
201
        version(unittest)
202
        {
203
            writeln("queue["~to!(string)(queueID)~"]: Dequeueing...");
5✔
204
        }
205

206
        /* The dequeued message */
207
        TaggedMessage dequeuedMessage;
5✔
208

209
        scope(exit)
210
        {
211
            version(unittest)
212
            {
213
                writeln("queue["~to!(string)(queueID)~"]: Dequeued '"~to!(string)(dequeuedMessage)~"'!");
5✔
214
            }
215
        }
216

217
        /* Block till we dequeue a message successfully */
218
        while(dequeuedMessage is null)
149✔
219
        {
220
            /* Check if this queue is still alive */
221
            if(!this.alive)
144✔
222
            {
223
                // Throw an exception to unblock the calling `dequeue()`
NEW
224
                throw new TristanableException(this.exitReason);
×
225
            }
226

227
            scope(exit)
228
            {
229
                // Unlock the mutex
230
                this.mutex.unlock();
144✔
231
            }
232

233
            // Lock the mutex
234
            this.mutex.lock();
144✔
235

236
            try
237
            {
238
                this.signal.wait(this.wakeInterval);
144✔
239
            }
240
            catch(SyncError e)
241
            {
242
                // Throw an exception on a fatal exception
243
                throw new TristanableException(ErrorType.DEQUEUE_FAILED);
×
244
            }
245

246
            /* Lock the item queue */
247
            queueLock.lock();
144✔
248

249
            /* Consume the front of the queue (if non-empty) */
250
            if(!queue.empty())
144✔
251
            {
252
                /* Pop the front item off */
253
                dequeuedMessage = queue.front();
5✔
254

255
                /* Remove the front item from the queue */
256
                queue.linearRemoveElement(dequeuedMessage);
5✔
257
            }
258

259
            /* Unlock the item queue */
260
            queueLock.unlock();
144✔
261
        }
262

263
        return dequeuedMessage;
5✔
264
    }
265

266
    /** 
267
     * Get the id/tag of this queue
268
     *
269
     * Returns: the queue's id
270
     */
271
    public ulong getID()
272
    {
273
        return queueID;
34✔
274
    }
275
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc