• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

deavmi / tristanable / #23

02 Oct 2023 01:49PM UTC coverage: 95.161%. First build
#23

push

coveralls-ruby

web-flow
Nextgen: Tristanable v2 (#4)

* - Added new logo to `README.md`
- Fixed typos in `README.md`
- Added new logo (source included)

* - Added `bformat` version `3.1.13` as dependency

* - Removed executable

* - Updated `.gitignore`

* - Use an `https` link to the `bformat` homepage instead

* Package (tristanable)

- Added public imports along with comments per each

Encoding

- Added a stub class, `TaggedMessage`, for encoding and decoding the tristanable byte payload

Exceptions

- Added `TristanableException` exception type along with the `Error` enum sub-type

Manager

- Added stub code for `Manager` to manage the queues and socket

Queue

- Added stub class representing a queue with a tag (`Queue`)

QueueItem

- Added stub class `QueueItem` which represents an item that is enqueued/dequeued onto a `Queue`

Watcher

- Added stub class `Watcher` which will manage the socket reading-wise

* Manager

- Added field `watcher` of type `Watcher`

* Watcher

- Added constructor which takes in an instance of `Manager` and an instance of `Socket`

* Manager

- Added unit test TODO

* - Moved `Watcher` and `Manager` modules to their own package
- Ensured `Watcher`'s constructor is package-level accessible only

Manager

- The constructor now creates an instance of `Watcher`
- Added a `start()` method which calls `watcher.start()`

* Manager

- Added stub `sendMessage(TaggedMessage)` which will encode into the tristanable format, then wrap into bformat and send over the socket
- Added import for `TaggedMessage` from `tristanable.encoding` module

* Package (tristanable)

- Added an import for `TaggedMessage` from module `tristanable.encoding`

* Encoding

- Added stub class `TaggedMessage`
- Added constructor, static decoder (unimplemented), encoder (implemented), getters and setters
- Added module `tristanable.encoding`

* - Attempt merge

* Encoding

- Added parameter-less (default) const... (continued)

248 of 248 new or added lines in 6 files covered. (100.0%)

236 of 248 relevant lines covered (95.16%)

7.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.67
/source/tristanable/queue/queue.d
1
/**
2
 * A queue of queue items all of the same tag
3
 */
4
module tristanable.queue.queue;
5

6
import core.sync.mutex : Mutex;
7
import core.sync.condition : Condition;
8
import core.sync.exception : SyncError;
9
import std.container.slist : SList;
10
import tristanable.encoding;
11
import core.time : Duration, dur;
12
import tristanable.exceptions;
13

14
version(unittest)
15
{
16
   import std.stdio;
17
   import std.conv : to;
18
}
19

20
/** 
21
 * Represents a queue whereby messages of a certain tag/id
22
 * can be enqueued to (by the `Watcher`) and dequeued from
23
 * (by the user application)
24
 */
25
public class Queue
26
{
27
    /** 
28
     * This queue's unique ID
29
     */
30
    private ulong queueID;
31

32
    /** 
33
     * The mutex paired with the condition variable
34
     * `signal`; it is locked around each wait for
35
     * queue events
36
     */
37
    private Mutex mutex;
38

39
    /** 
40
     * The condition variable used to sleep/wake
41
     * on queue events
42
     */
43
    private Condition signal;
44

45
    /** 
46
     * The queue of messages
47
     */
48
    private SList!(TaggedMessage) queue;
49

50
    /** 
51
     * The lock for the message queue
52
     */
53
    private Mutex queueLock;
54

55
    /**
56
     * If a message is enqueued prior
57
     * to us sleeping then we won't
58
     * wake up and return for it.
59
     *
60
     * Therefore a periodic wakeup
61
     * is required.
62
     */
63
    private Duration wakeInterval;
64

65
    /** 
66
     * Constructs a new Queue with the given ID, creating the item
67
     * queue lock and the mutex/condition variable pair which
68
     * `dequeue()` waits on, and setting the wake interval to its
69
     * default of 50 milliseconds
70
     *
71
     * Params:
72
     *   queueID = the id to use for this queue
73
     */
74
    this(ulong queueID)
9✔
75
    {
76
        /* Initialize the queue lock */
77
        this.queueLock = new Mutex();
9✔
78

79
        /* Initialize the condition variable */
80
        this.mutex = new Mutex();
9✔
81
        this.signal = new Condition(this.mutex);
9✔
82

83
        /* Set the queue id */
84
        this.queueID = queueID;
9✔
85

86
        /* Set the slumber interval */
87
        this.wakeInterval = dur!("msecs")(50); // TODO: Decide on value
9✔
88
    }
89

90
    /** 
91
     * Returns the current wake interval
92
     * for the queue checker
93
     *
94
     * Returns: the `Duration`
95
     */
96
    public Duration getWakeInterval()
97
    {
98
        return this.wakeInterval;
×
99
    }
100

101
    /** 
102
     * Sets the wake up interval
103
     *
104
     * Params:
105
     *   interval = the new interval
106
     */
107
    public void setWakeInterval(Duration interval)
108
    {
109
        this.wakeInterval = interval;
×
110
    }
111

112
    /** 
113
     * Enqueues the provided tagged message onto this queue
114
     * and then wakes up any thread that has called dequeue
115
     * on this queue as well
116
     *
117
     * On error enqueueing a `TristanableException` will be
118
     * thrown.
119
     *
120
     * Params:
121
     *   message = the TaggedMessage to enqueue
122
     */
123
    public void enqueue(TaggedMessage message)
124
    {
125
        version(unittest)
126
        {
127
            writeln("queue["~to!(string)(queueID)~"]: Enqueuing '"~to!(string)(message)~"'...");
5✔
128
        }
129

130
        scope(exit)
131
        {
132
            version(unittest)
133
            {
134
                writeln("queue["~to!(string)(queueID)~"]: Enqueued '"~to!(string)(message)~"'!");
5✔
135
            }
136

137
            /* Unlock the item queue */
138
            queueLock.unlock();
5✔
139
        }
140

141
        /* Lock the item queue */
142
        queueLock.lock();
5✔
143

144
        /* Add the item to the queue */
145
        queue.insertAfter(queue[], message);
5✔
146

147
        /* Wake up anyone wanting to dequeue from us */
148
        try
149
        {
150
            // TODO: Make us wait on the event (optional with a time-out)
151
            signal.notifyAll();
5✔
152
        }
153
        catch(SyncError snozErr)
154
        {
155
            // Throw an exception on a fatal exception
156
            throw new TristanableException(ErrorType.ENQUEUE_FAILED);
×
157
        }
158
    }
159

160
    // TODO: Make a version of this which can time out
161

162
    /** 
163
     * Blocks till a message can be dequeued from this queue
164
     *
165
     * On error dequeueing a `TristanableException` will be
166
     * thrown.
167
     *
168
     * Returns: the dequeued TaggedMessage
169
     */
170
    public TaggedMessage dequeue()
171
    {
172
        version(unittest)
173
        {
174
            writeln("queue["~to!(string)(queueID)~"]: Dequeueing...");
5✔
175
        }
176

177
        /* The dequeued message */
178
        TaggedMessage dequeuedMessage;
5✔
179

180
        scope(exit)
181
        {
182
            version(unittest)
183
            {
184
                writeln("queue["~to!(string)(queueID)~"]: Dequeued '"~to!(string)(dequeuedMessage)~"'!");
5✔
185
            }
186
        }
187

188
        /* Block till we dequeue a message successfully */
189
        while(dequeuedMessage is null)
149✔
190
        {
191
            scope(exit)
192
            {
193
                // Unlock the mutex
194
                this.mutex.unlock();
144✔
195
            }
196

197
            // Lock the mutex
198
            this.mutex.lock();
144✔
199

200
            try
201
            {
202
                this.signal.wait(this.wakeInterval);
144✔
203
            }
204
            catch(SyncError e)
205
            {
206
                // Report a fatal `SyncError` as a `TristanableException`
207
                throw new TristanableException(ErrorType.DEQUEUE_FAILED);
×
208
            }
209

210

211
            /* Lock the item queue */
212
            queueLock.lock();
144✔
213

214
            /* Consume the front of the queue (if non-empty) */
215
            if(!queue.empty())
144✔
216
            {
217
                /* Read the front item (it is removed below) */
218
                dequeuedMessage = queue.front();
5✔
219

220
                /* Remove the front item from the queue */
221
                queue.linearRemoveElement(dequeuedMessage);
5✔
222
            }
223

224
            /* Unlock the item queue */
225
            queueLock.unlock();
144✔
226
        }
227

228
        return dequeuedMessage;
5✔
229
    }
230

231
    /** 
232
     * Get the id/tag of this queue
233
     *
234
     * Returns: the queue's id
235
     */
236
    public ulong getID()
237
    {
238
        return queueID;
34✔
239
    }
240
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc