• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

renaissanceorg / renaissance / #80

26 Nov 2023 12:24PM UTC coverage: 9.201% (-0.07%) from 9.268%
#80

push

coveralls-ruby

web-flow
Merge 44c130c55 into 24aaf05dd

0 of 5 new or added lines in 1 file covered. (0.0%)

1 existing line in 1 file now uncovered.

38 of 413 relevant lines covered (9.2%)

0.15 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/renaissance/connection/connection.d
1
module renaissance.connection.connection;
2

3
import davinci;
4
import core.thread : Thread;
5
import renaissance.server;
6
import river.core;
7
import tristanable;
8
import renaissance.logging;
9
import renaissance.server.messagemanager : MessageManager, Message;
10

11

12
import davinci.base.components : Validatable;
13

14
import davinci.c2s.auth : AuthMessage, AuthResponse;
15
import davinci.c2s.generic : UnknownCommandReply;
16

17
import davinci.c2s.channels : ChannelEnumerateRequest, ChannelEnumerateReply, ChannelMembership, ChannelMessage;
18
import davinci.c2s.test : NopMessage;
19
import renaissance.server.channelmanager : ChannelManager, Channel;
20

21
import std.conv : to;
22

23
/**
 * The kind of peer on the other end of a `Connection`.
 *
 * `UNSET` is the first member and therefore the value a
 * default-initialised `LinkType` holds.
 */
public enum LinkType
{
    /// Not determined (the default-initialised value)
    UNSET,

    /// The peer is a user (client) connection
    USER,

    /// The peer is a server link
    SERVER
}
29

30
/**
 * A single connection to the server, serviced on its own thread.
 *
 * The thread starts a tristanable `Manager` over the peer's stream,
 * dequeues incoming `TaggedMessage`s one at a time, decodes each as a
 * davinci `BaseMessage`, handles the contained command and sends the
 * reply back re-using the incoming message's tag.
 */
public class Connection : Thread
{
    /** 
     * Associated server instance
     */
    private Server associatedServer;

    /** 
     * Underlying stream connecting us to
     * the client
     */
    private Stream clientStream;

    /**
     * Tristanable manager constructed over `clientStream`
     */
    private Manager tManager;

    /**
     * Queue we dequeue incoming messages from; it is registered
     * as the manager's default queue (no other queue is registered
     * by this class)
     */
    private Queue incomingQueue;

    /** 
     * Whether this is a user connection
     * or a server link
     */
    private LinkType linkType;

    /**
     * Constructs a new connection (without starting its thread)
     *
     * Params:
     *   associatedServer = the server to associate with
     *   clientStream = the stream backing the client
     */
    private this(Server associatedServer, Stream clientStream)
    {
        this.associatedServer = associatedServer;
        this.clientStream = clientStream;

        /* Attach a tristanable manager to the client's stream */
        this.tManager = new Manager(clientStream);

        /* Register the default queue we will dequeue from */
        initTManager();

        /* Set the worker function for the thread */
        super(&worker);
    }

    /**
     * Creates the incoming queue and registers it
     * as the tristanable manager's default queue
     */
    private void initTManager()
    {
        /* Create a Queue (doesn't matter its ID) */
        this.incomingQueue = new Queue(0);

        /* Set this Queue as the default Queue */
        this.tManager.setDefaultQueue(this.incomingQueue);
    }

    /**
     * Returns the kind of link this connection represents
     *
     * Nothing in this class assigns the underlying field, so
     * unless it is set elsewhere this is `LinkType.UNSET`.
     *
     * Returns: the `LinkType`
     */
    public LinkType getLinkType()
    {
        return this.linkType;
    }

    /**
     * Thread entry point: starts the tristanable manager and then
     * loops dequeuing, handling and replying to messages until a
     * `TristanableException` is thrown by the dequeue
     */
    private void worker()
    {
        /*
         * However this thread ends - an orderly break out of the
         * loop below or an exception escaping from handling/sending -
         * the server must be told that this connection is gone,
         * otherwise it would stay registered forever.
         */
        scope(exit)
        {
            // Clean up (TODO: Shutdown the TManager)

            // Clean up - notify disconnection
            this.associatedServer.onConnectionDisconnect(this);
        }

        this.tManager.start();

        logger.info("Connection thread '"~this.toString()~"' started");

        // TODO: Add ourselves to the server's queue, we might need to
        // ... figure out, first, what kind of connection we are

        queue_loop: while(true)
        {
            // FIXME: If the connection dies, something seems to spin
            // ... inside tristanable causing a high load average. It
            // ... MIGHT be that on error it keeps going back to ask
            // ... for a recv (a case that was never tested for)

            TaggedMessage incomingMessage;

            try
            {
                // Dequeue a message from the incoming queue
                incomingMessage = incomingQueue.dequeue();
            }
            catch(TristanableException e)
            {
                logger.error("We had a fatal tristanable exception whilst dequeue()'ing: "~e.toString());
                break queue_loop;
            }

            logger.dbg("Awoken? after dequeue()");

            // Process the message
            TaggedMessage response = handle(incomingMessage);
            if(response is null)
            {
                logger.dbg("There was no response, not sending anything.");
                continue;
            }

            // Send exactly what was checked and logged
            logger.dbg("There was a response, sending: ", response);
            this.tManager.sendMessage(response);
        }
    }

    // FIXME: These should be part of the auth details
    // ... associated with this user
    string myUsername = "bababooey";

    /**
     * Whether a username is associated with this connection
     *
     * NOTE: `myUsername` starts out as a non-empty placeholder,
     * so this is `true` even before any successful authentication.
     *
     * Returns: `true` if the username is non-empty
     */
    private bool isAuthd()
    {
        return myUsername.length != 0;
    }

    /**
     * Wraps the given reply command in a `BaseMessage`, encodes it
     * and stores it as the new payload of the incoming message so
     * that the reply carries the same tag (for matching)
     *
     * Params:
     *   incomingMessage = the message being replied to
     *   responseType = the command type of the reply
     *   responseCommand = the reply command itself
     * Returns: `incomingMessage`, now carrying the encoded reply
     */
    private TaggedMessage encodeReply(TaggedMessage incomingMessage, CommandType responseType, Command responseCommand)
    {
        // Every reply produced by this class uses this message type
        BaseMessage response = new BaseMessage(MessageType.CLIENT_TO_SERVER, responseType, responseCommand);

        incomingMessage.setPayload(response.encode());

        return incomingMessage;
    }

    /** 
     * Given a `TaggedMessage` this method will decode
     * it into a Davinci `BaseMessage`, determine the
     * payload type via this header and then handle
     * the message/command accordingly
     *
     * Commands implementing `Validatable` are validated
     * first; on failure an `UnknownCommandReply` carrying
     * the reason is sent back and the command is not run.
     *
     * Params:
     *   incomingMessage = the `TaggedMessage`
     * Returns: the response `TaggedMessage` (the same
     * object as `incomingMessage` with its payload replaced)
     */
    private TaggedMessage handle(TaggedMessage incomingMessage)
    {
        // TODO: In future this decoder, surely, should be idk
        // ... in davinci as in stateful encoder/decoder
        // ... reply-generator
        logger.dbg("Examining message '"~incomingMessage.toString()~"' ...");

        byte[] payload = incomingMessage.getPayload();
        BaseMessage baseMessage = BaseMessage.decode(payload);
        logger.dbg("Incoming message: "~baseMessage.getCommand().toString());

        logger.dbg("BaseMessage type: ", baseMessage.getMessageType());
        Command incomingCommand = baseMessage.getCommand();
        CommandType incomingCommandType = baseMessage.getCommandType();
        logger.dbg("Incoming CommandType: ", incomingCommandType);

        /** 
         * Perform validation before continuing
         */
        if(auto validatableCommand = cast(Validatable)incomingCommand)
        {
            string reason;
            if(!validatableCommand.validate(reason))
            {
                logger.error("Validation failed with reason: '", reason, "'");

                UnknownCommandReply unknownCmdReply = new UnknownCommandReply(reason);
                return encodeReply(incomingMessage, CommandType.UNKNOWN_COMMAND, unknownCmdReply);
            }
        }

        CommandType responseType;
        Command responseCommand;

        /** 
         * Handle the different types of commands
         */
        switch(incomingCommandType)
        {
            /** 
             * Handle NOP commands (echoed straight back)
             */
            case CommandType.NOP_COMMAND:
            {
                logger.dbg("We got a NOP");
                NopMessage nopMessage = cast(NopMessage)incomingCommand;

                responseType = CommandType.NOP_COMMAND;
                responseCommand = nopMessage;

                break;
            }
            /**
             * Handle authentication request
             */
            case CommandType.AUTH_COMMAND:
            {
                AuthMessage authMessage = cast(AuthMessage)incomingCommand;
                bool status = this.associatedServer.attemptAuth(authMessage.getUsername(), authMessage.getPassword());

                // TODO: This is just for testing now - i intend to have a nice auth manager

                AuthResponse authResp = new AuthResponse();
                if(status)
                {
                    authResp.good();

                    // Save username
                    this.myUsername = authMessage.getUsername();
                }
                else
                {
                    authResp.bad();
                }

                responseType = CommandType.AUTH_RESPONSE;
                responseCommand = authResp;

                break;
            }
            /**
             * Handle channel list requests
             */
            case CommandType.CHANNELS_ENUMERATE_REQ:
            {
                // FIXME: No auth check is enforced here yet (see
                // ... `isAuthd()`); figure out how we want to do them

                ChannelEnumerateRequest chanEnumReq = cast(ChannelEnumerateRequest)incomingCommand;
                ubyte limit = chanEnumReq.getLimit();
                ulong offset = chanEnumReq.getOffset();

                string[] channelNames = this.associatedServer.getChannelNames(offset, limit);
                ChannelEnumerateReply chanEnumRep = new ChannelEnumerateReply(channelNames);

                responseType = CommandType.CHANNELS_ENUMERATE_REP;
                responseCommand = chanEnumRep;

                break;
            }
            /**
             * Handle channel joins
             */
            case CommandType.MEMBERSHIP_JOIN:
            {
                ChannelMembership chanMemReq = cast(ChannelMembership)incomingCommand;
                string channel = chanMemReq.getChannel();

                // Join the channel
                ChannelManager chanMan = this.associatedServer.getChannelManager();
                bool status = chanMan.membershipJoin(channel, this.myUsername); // TODO: Handle return value
                chanMemReq.replyGood();

                responseType = CommandType.MEMBERSHIP_JOIN_REP;
                responseCommand = chanMemReq;

                break;
            }
            /**
             * Handle channel membership requests
             */
            case CommandType.MEMBERSHIP_LIST:
            {
                ChannelMembership chanMemReq = cast(ChannelMembership)incomingCommand;
                string channel = chanMemReq.getChannel();

                // Obtain the current members
                ChannelManager chanMan = this.associatedServer.getChannelManager();
                string[] currentMembers;

                // TODO: Handle return value
                bool status = chanMan.membershipList(channel, currentMembers);
                logger.dbg("Current members of '"~channel~"': ", currentMembers);
                chanMemReq.listReplyGood(currentMembers);

                responseType = CommandType.MEMBERSHIP_LIST_REP;
                responseCommand = chanMemReq;

                break;
            }
            /**
             * Handle channel leaves
             */
            case CommandType.MEMBERSHIP_LEAVE:
            {
                ChannelMembership chanMemReq = cast(ChannelMembership)incomingCommand;
                string channel = chanMemReq.getChannel();

                // Leave the channel
                ChannelManager chanMan = this.associatedServer.getChannelManager();
                bool status = chanMan.membershipLeave(channel, this.myUsername); // TODO: Handle return value
                chanMemReq.replyGood();

                responseType = CommandType.MEMBERSHIP_LEAVE_REP;
                responseCommand = chanMemReq;

                break;
            }
            /**
             * Handle message sending
             */
            case CommandType.CHANNEL_SEND_MESSAGE:
            {
                ChannelMessage chanMesg = cast(ChannelMessage)incomingCommand;

                // TODO: Get channel, lookup and do permission checks

                // TODO: Use a messagemanager thing here
                MessageManager mesgMan = this.associatedServer.getMessageManager();

                // TODO: Check multiple recipients
                string[] recipients = chanMesg.getRecipients();
                foreach(string recipient; recipients)
                {
                    // NOTE(review): used without `new`, so this assumes
                    // `Message` is a value type (struct) - confirm
                    Message message;
                    message.setBody(chanMesg.getMessage());
                    message.setFrom(this.myUsername);
                    message.setDestination(recipient);

                    logger.dbg("Sending message: ", message);
                    mesgMan.sendq(message);
                }

                // TODO: Set this ONLY if we succeeeded in delivery
                chanMesg.messageDelivered();

                responseType = CommandType.SEND_CHANNEL_MESG_REP;
                responseCommand = chanMesg;

                break;
            }
            /** 
             * Anything else is an unknown
             * command, therefore generate
             * an error reply
             */
            default:
            {
                logger.warn("Received unsupported message type", baseMessage);

                UnknownCommandReply unknownCmdReply = new UnknownCommandReply("Command with type number: "~to!(string)(cast(ulong)incomingCommandType));

                responseType = CommandType.UNKNOWN_COMMAND;
                responseCommand = unknownCmdReply;

                logger.warn("We have generated err: ", responseCommand);
                break;
            }
        }

        // Construct a response using the same tag
        // (for matching) but a new payload (the
        // response message)
        return encodeReply(incomingMessage, responseType, responseCommand);
    }

    /** 
     * Creates a new connection by associating a newly created
     * Connection instance with the provided Server and Stream
     * after which it will be added to the server's connection
     * queue, finally starting the thread that manages this connection
     *
     * TODO: Change this, the returning is goofy ah, I think perhaps
     * we should only construct it and then let `Server.addConnection()`
     * call start etc. - seeing that a Listener will call this
     *
     * Params:
     *   associatedServer = the server to associate with
     *   clientStream = the associated stream backing the client
     *
     * Returns: the newly created Connection object
     */
    public static Connection newConnection(Server associatedServer, Stream clientStream)
    {
        Connection conn = new Connection(associatedServer, clientStream);

        /* Associate this connection with the provided server */
        associatedServer.addConnection(conn);

        /* Start the worker on a separate thread */
        conn.start();

        return conn;
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc