• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

OpenLightingProject / ola / 6370547479

01 Oct 2023 12:26PM UTC coverage: 45.025% (-0.02%) from 45.048%
6370547479

push

github

web-flow
Merge pull request #1898 from peternewman/mac-type-tests

Add the ability to set a universe to blackout instead via ola_set_dmx

7603 of 17646 branches covered (0.0%)

13 of 13 new or added lines in 1 file covered. (100.0%)

21415 of 47562 relevant lines covered (45.03%)

55.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.38
/common/rpc/RpcChannel.cpp
1
/*
2
 * This library is free software; you can redistribute it and/or
3
 * modify it under the terms of the GNU Lesser General Public
4
 * License as published by the Free Software Foundation; either
5
 * version 2.1 of the License, or (at your option) any later version.
6
 *
7
 * This library is distributed in the hope that it will be useful,
8
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
10
 * Lesser General Public License for more details.
11
 *
12
 * You should have received a copy of the GNU Lesser General Public
13
 * License along with this library; if not, write to the Free Software
14
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
15
 *
16
 * RpcChannel.cpp
17
 * Interface for the UDP RPC Channel
18
 * Copyright (C) 2005 Simon Newton
19
 */
20

21
#include "common/rpc/RpcChannel.h"
22

23
#include <errno.h>
24
#include <google/protobuf/service.h>
25
#include <google/protobuf/message.h>
26
#include <google/protobuf/descriptor.h>
27
#include <google/protobuf/dynamic_message.h>
28
#include <string>
29

30
#include "common/rpc/Rpc.pb.h"
31
#include "common/rpc/RpcSession.h"
32
#include "common/rpc/RpcController.h"
33
#include "common/rpc/RpcHeader.h"
34
#include "common/rpc/RpcService.h"
35
#include "ola/Callback.h"
36
#include "ola/Logging.h"
37
#include "ola/base/Array.h"
38
#include "ola/stl/STLUtils.h"
39

40
namespace ola {
41
namespace rpc {
42

43
using google::protobuf::Message;
44
using google::protobuf::MethodDescriptor;
45
using google::protobuf::ServiceDescriptor;
46
using std::auto_ptr;
47
using std::string;
48

49
// Names of the ExportMap variables this channel updates.
const char RpcChannel::K_RPC_RECEIVED_TYPE_VAR[] = "rpc-received-type";
const char RpcChannel::K_RPC_RECEIVED_VAR[] = "rpc-received";
const char RpcChannel::K_RPC_SENT_ERROR_VAR[] = "rpc-send-errors";
const char RpcChannel::K_RPC_SENT_VAR[] = "rpc-sent";

// Methods whose output type has this name are treated as streaming methods,
// see CallMethod() and HandleStreamRequest().
const char RpcChannel::STREAMING_NO_RESPONSE[] = "STREAMING_NO_RESPONSE";

// The counter variables the constructor looks up through GetCounterVar().
// K_RPC_RECEIVED_TYPE_VAR is absent because the constructor fetches it
// separately with GetUIntMapVar().
const char *RpcChannel::K_RPC_VARIABLES[] = {
  K_RPC_RECEIVED_VAR,
  K_RPC_SENT_ERROR_VAR,
  K_RPC_SENT_VAR,
};
60

61
class OutstandingRequest {
62
  /*
63
   * These are requests on the server end that haven't completed yet.
64
   */
65
 public:
66
  OutstandingRequest(int id,
4✔
67
                     RpcSession *session,
68
                     google::protobuf::Message *response)
69
      : id(id),
4✔
70
        controller(new RpcController(session)),
4✔
71
        response(response) {
4✔
72
  }
4✔
73
  ~OutstandingRequest() {
4✔
74
    if (controller) {
4✔
75
      delete controller;
4✔
76
    }
77
    if (response) {
4✔
78
      delete response;
4✔
79
    }
80
  }
4✔
81

82
  int id;
83
  RpcController *controller;
84
  google::protobuf::Message *response;
85
};
86

87

88
class OutstandingResponse {
89
  /*
90
   * These are Requests on the client end that haven't completed yet.
91
   */
92
 public:
93
  OutstandingResponse(int id,
4✔
94
                      RpcController *controller,
95
                      SingleUseCallback0<void> *callback,
96
                      Message *reply)
97
      : id(id),
4✔
98
        controller(controller),
4✔
99
        callback(callback),
4✔
100
        reply(reply) {
4✔
101
  }
102

103
  int id;
104
  RpcController *controller;
105
  SingleUseCallback0<void> *callback;
106
  Message *reply;
107
};
108

109
/*
 * Create a new channel.
 * @param service the service that incoming requests are dispatched to, may be
 *   NULL in which case incoming requests are logged and dropped.
 * @param descriptor the descriptor to read from and write to. If non-NULL the
 *   on-data and on-close callbacks of the descriptor are pointed at this
 *   channel.
 * @param export_map optional, if supplied the RPC counters are registered
 *   with it.
 */
RpcChannel::RpcChannel(RpcService *service,
                       ola::io::ConnectedDescriptor *descriptor,
                       ExportMap *export_map)
    : m_session(new RpcSession(this)),
      m_service(service),
      m_descriptor(descriptor),
      m_buffer(NULL),
      m_buffer_size(0),
      m_expected_size(0),
      m_current_size(0),
      m_export_map(export_map),
      m_recv_type_map(NULL) {
  if (descriptor) {
    descriptor->SetOnData(
        ola::NewCallback(this, &RpcChannel::DescriptorReady));
    descriptor->SetOnClose(
        ola::NewSingleCallback(this, &RpcChannel::HandleChannelClose));
  }

  if (!m_export_map) {
    return;
  }

  // Look up each counter once so they all exist in the export map.
  const unsigned int variable_count = arraysize(K_RPC_VARIABLES);
  for (unsigned int index = 0; index < variable_count; ++index) {
    m_export_map->GetCounterVar(string(K_RPC_VARIABLES[index]));
  }
  m_recv_type_map = m_export_map->GetUIntMapVar(K_RPC_RECEIVED_TYPE_VAR,
                                                "type");
}
137

138
/*
 * Release the receive buffer. The buffer is obtained with realloc() in
 * AllocateMsgBuffer(), so it has to be released with free().
 */
RpcChannel::~RpcChannel() {
  if (m_buffer) {
    free(m_buffer);
  }
}
141

142
void RpcChannel::DescriptorReady() {
10✔
143
  if (!m_expected_size) {
10✔
144
    // this is a new msg
145
    unsigned int version;
10✔
146
    if (ReadHeader(&version, &m_expected_size) < 0)
10✔
147
      return;
×
148

149
    if (!m_expected_size)
10✔
150
      return;
151

152
    if (version != PROTOCOL_VERSION) {
10✔
153
      OLA_WARN << "protocol mismatch " << version << " != " <<
×
154
        PROTOCOL_VERSION;
×
155
      return;
×
156
    }
157

158
    if (m_expected_size > MAX_BUFFER_SIZE) {
10✔
159
      OLA_WARN << "Incoming message size " << m_expected_size
×
160
                << " is larger than MAX_BUFFER_SIZE: " << MAX_BUFFER_SIZE;
×
161
      m_descriptor->Close();
×
162
      return;
×
163
    }
164

165
    m_current_size = 0;
10✔
166
    m_buffer_size = AllocateMsgBuffer(m_expected_size);
10✔
167

168
    if (m_buffer_size < m_expected_size) {
10✔
169
      OLA_WARN << "buffer size to small " << m_buffer_size << " < " <<
×
170
        m_expected_size;
×
171
      return;
×
172
    }
173
  }
174

175
  if (!m_descriptor) {
10✔
176
    return;
177
  }
178

179
  unsigned int data_read;
10✔
180
  if (m_descriptor->Receive(m_buffer + m_current_size,
10✔
181
                            m_expected_size - m_current_size,
10✔
182
                            data_read) < 0) {
183
    OLA_WARN << "something went wrong in descriptor recv\n";
×
184
    return;
×
185
  }
186

187
  m_current_size += data_read;
10✔
188

189
  if (m_current_size == m_expected_size) {
10✔
190
    // we've got all of this message so parse it.
191
    if (!HandleNewMsg(m_buffer, m_expected_size)) {
10✔
192
      // this probably means we've messed the framing up, close the channel
193
      OLA_WARN << "Errors detected on RPC channel, closing";
×
194
      m_descriptor->Close();
×
195
    }
196
    m_expected_size = 0;
10✔
197
  }
198
  return;
199
}
200

201
/*
 * Set the callback to run when the channel is closed.
 * @param callback the new close handler. It replaces any handler that was
 *   set previously and is run, at most once, by HandleChannelClose().
 */
void RpcChannel::SetChannelCloseHandler(CloseCallback *callback) {
  CloseCallback *new_handler = callback;
  m_on_close.reset(new_handler);
}
204

205
void RpcChannel::CallMethod(const MethodDescriptor *method,
9✔
206
                            RpcController *controller,
207
                            const Message *request,
208
                            Message *reply,
209
                            SingleUseCallback0<void> *done) {
210
  // TODO(simonn): reduce the number of copies here
211
  string output;
9✔
212
  RpcMessage message;
9✔
213
  bool is_streaming = false;
9✔
214

215
  // Streaming methods are those with a reply set to STREAMING_NO_RESPONSE and
216
  // no controller, request or closure provided
217
  if (method->output_type()->name() == STREAMING_NO_RESPONSE) {
9✔
218
    if (controller || reply || done) {
5✔
219
      OLA_FATAL << "Calling streaming method " << method->name() <<
×
220
        " but a controller, reply or closure in non-NULL";
×
221
      return;
×
222
    }
223
    is_streaming = true;
224
  }
225

226
  message.set_type(is_streaming ? STREAM_REQUEST : REQUEST);
9✔
227
  message.set_id(m_sequence.Next());
9✔
228
  message.set_name(method->name());
9✔
229

230
  request->SerializeToString(&output);
9✔
231
  message.set_buffer(output);
9✔
232
  bool r = SendMsg(&message);
9✔
233

234
  if (is_streaming)
9✔
235
    return;
236

237
  if (!r) {
4✔
238
    // Send failed, call the handler now.
239
    controller->SetFailed("Failed to send request");
×
240
    done->Run();
×
241
    return;
242
  }
243

244
  OutstandingResponse *response = new OutstandingResponse(
4✔
245
      message.id(), controller, done, reply);
4✔
246

247
  auto_ptr<OutstandingResponse> old_response(
4✔
248
      STLReplacePtr(&m_responses, message.id(), response));
4✔
249

250
  if (old_response.get()) {
4✔
251
    // fail any outstanding response with the same id
252
    OLA_WARN << "response " << old_response->id << " already pending, failing "
×
253
             << "now";
×
254
    response->controller->SetFailed("Duplicate request found");
×
255
    response->callback->Run();
×
256
  }
257
}
12✔
258

259
/*
 * Called when the service has finished handling a request.
 *
 * Sends a RESPONSE_FAILED message if the controller was marked as failed,
 * otherwise sends the serialized response. In both cases the outstanding
 * request is removed and deleted afterwards.
 */
void RpcChannel::RequestComplete(OutstandingRequest *request) {
  if (request->controller->Failed()) {
    SendRequestFailed(request);
    return;
  }

  string serialized_response;
  request->response->SerializeToString(&serialized_response);

  RpcMessage reply;
  reply.set_type(RESPONSE);
  reply.set_id(request->id);
  reply.set_buffer(serialized_response);
  SendMsg(&reply);

  DeleteOutstandingRequest(request);
}
275

276
/*
 * @returns the RpcSession that was created for this channel in the
 *   constructor. The channel keeps ownership of it.
 */
RpcSession *RpcChannel::Session() {
  RpcSession *session = m_session.get();
  return session;
}
279

280
// private
281
//-----------------------------------------------------------------------------
282

283
/*
284
 * Write an RpcMessage to the write descriptor.
285
 */
286
bool RpcChannel::SendMsg(RpcMessage *msg) {
13✔
287
  if (!(m_descriptor && m_descriptor->ValidReadDescriptor())) {
13✔
288
    OLA_WARN << "RPC descriptor closed, not sending messages";
×
289
    return false;
×
290
  }
291

292
  uint32_t header;
13✔
293
  // reserve the first 4 bytes for the header
294
  string output(sizeof(header), 0);
13✔
295
  msg->AppendToString(&output);
13✔
296
  int length = output.size();
13✔
297

298
  RpcHeader::EncodeHeader(&header, PROTOCOL_VERSION,
13✔
299
                                length - sizeof(header));
300
  output.replace(
13✔
301
      0, sizeof(header),
302
      reinterpret_cast<const char*>(&header), sizeof(header));
303

304
  ssize_t ret = m_descriptor->Send(
13✔
305
      reinterpret_cast<const uint8_t*>(output.data()), length);
13✔
306

307
  if (ret != length) {
13✔
308
    OLA_WARN << "Failed to send full RPC message, closing channel";
×
309

310
    if (m_export_map) {
×
311
      (*m_export_map->GetCounterVar(K_RPC_SENT_ERROR_VAR))++;
×
312
    }
313

314
    // At this point there is no point using the descriptor since framing has
315
    // probably been messed up.
316
    // TODO(simon): consider if it's worth leaving the descriptor open for
317
    // reading.
318
    m_descriptor = NULL;
×
319

320
    HandleChannelClose();
×
321
    return false;
322
  }
323

324
  if (m_export_map) {
13✔
325
    (*m_export_map->GetCounterVar(K_RPC_SENT_VAR))++;
×
326
  }
327
  return true;
328
}
13✔
329

330

331
/*
332
 * Allocate an incoming message buffer
333
 * @param size the size of the new buffer to allocate
334
 * @returns the size of the new buffer
335
 */
336
int RpcChannel::AllocateMsgBuffer(unsigned int size) {
10✔
337
  unsigned int requested_size = size;
10✔
338
  uint8_t *new_buffer;
10✔
339

340
  if (size < m_buffer_size)
10✔
341
    return size;
2✔
342

343
  if (m_buffer_size == 0 && size < INITIAL_BUFFER_SIZE)
8✔
344
    requested_size = INITIAL_BUFFER_SIZE;
345

346
  if (requested_size > MAX_BUFFER_SIZE) {
×
347
    OLA_WARN << "Incoming message size " << requested_size
×
348
              << " is larger than MAX_BUFFER_SIZE: " << MAX_BUFFER_SIZE;
×
349
    return m_buffer_size;
×
350
  }
351

352
  new_buffer = static_cast<uint8_t*>(realloc(m_buffer, requested_size));
8✔
353
  if (!new_buffer)
8✔
354
    return m_buffer_size;
×
355

356
  m_buffer = new_buffer;
8✔
357
  m_buffer_size = requested_size;
8✔
358
  return requested_size;
8✔
359
}
360

361

362
/*
363
 * Read 4 bytes and decode the header fields.
364
 * @returns: -1 if there is no data is available, version and size are 0
365
 */
366
int RpcChannel::ReadHeader(unsigned int *version,
10✔
367
                                 unsigned int *size) const {
368
  uint32_t header;
10✔
369
  unsigned int data_read = 0;
10✔
370
  *version = *size = 0;
10✔
371

372
  if (m_descriptor->Receive(reinterpret_cast<uint8_t*>(&header),
10✔
373
                            sizeof(header), data_read)) {
374
    OLA_WARN << "read header error: " << strerror(errno);
×
375
    return -1;
×
376
  }
377

378
  if (!data_read)
10✔
379
    return 0;
380

381
  RpcHeader::DecodeHeader(header, version, size);
10✔
382
  return 0;
10✔
383
}
384

385

386
/*
387
 * Parse a new message and handle it.
388
 */
389
bool RpcChannel::HandleNewMsg(uint8_t *data, unsigned int size) {
10✔
390
  RpcMessage msg;
10✔
391
  if (!msg.ParseFromArray(data, size)) {
10✔
392
    OLA_WARN << "Failed to parse RPC";
×
393
    return false;
×
394
  }
395

396
  if (m_export_map)
10✔
397
    (*m_export_map->GetCounterVar(K_RPC_RECEIVED_VAR))++;
×
398

399
  switch (msg.type()) {
10✔
400
    case REQUEST:
4✔
401
      if (m_recv_type_map)
4✔
402
        (*m_recv_type_map)["request"]++;
×
403
      HandleRequest(&msg);
4✔
404
      break;
405
    case RESPONSE:
2✔
406
      if (m_recv_type_map)
2✔
407
        (*m_recv_type_map)["response"]++;
×
408
      HandleResponse(&msg);
2✔
409
      break;
410
    case RESPONSE_CANCEL:
×
411
      if (m_recv_type_map)
×
412
        (*m_recv_type_map)["cancelled"]++;
×
413
      HandleCanceledResponse(&msg);
×
414
      break;
415
    case RESPONSE_FAILED:
2✔
416
      if (m_recv_type_map)
2✔
417
        (*m_recv_type_map)["failed"]++;
×
418
      HandleFailedResponse(&msg);
2✔
419
      break;
420
    case RESPONSE_NOT_IMPLEMENTED:
×
421
      if (m_recv_type_map)
×
422
        (*m_recv_type_map)["not-implemented"]++;
×
423
      HandleNotImplemented(&msg);
×
424
      break;
425
    case STREAM_REQUEST:
2✔
426
      if (m_recv_type_map)
2✔
427
        (*m_recv_type_map)["stream_request"]++;
×
428
      HandleStreamRequest(&msg);
2✔
429
      break;
430
    default:
×
431
      OLA_WARN << "not sure of msg type " << msg.type();
×
432
      break;
×
433
  }
434
  return true;
435
}
10✔
436

437

438
/*
439
 * Handle a new RPC method call.
440
 */
441
void RpcChannel::HandleRequest(RpcMessage *msg) {
4✔
442
  if (!m_service) {
4✔
443
    OLA_WARN << "no service registered";
×
444
    return;
×
445
  }
446

447
  const ServiceDescriptor *service = m_service->GetDescriptor();
4✔
448
  if (!service) {
4✔
449
    OLA_WARN << "failed to get service descriptor";
×
450
    return;
×
451
  }
452
  const MethodDescriptor *method = service->FindMethodByName(msg->name());
4✔
453
  if (!method) {
4✔
454
    OLA_WARN << "failed to get method descriptor";
×
455
    SendNotImplemented(msg->id());
×
456
    return;
×
457
  }
458

459
  Message* request_pb = m_service->GetRequestPrototype(method).New();
4✔
460
  Message* response_pb = m_service->GetResponsePrototype(method).New();
4✔
461

462
  if (!request_pb || !response_pb) {
4✔
463
    OLA_WARN << "failed to get request or response objects";
×
464
    return;
×
465
  }
466

467
  if (!request_pb->ParseFromString(msg->buffer())) {
4✔
468
    OLA_WARN << "parsing of request pb failed";
×
469
    return;
×
470
  }
471

472
  OutstandingRequest *request = new OutstandingRequest(
4✔
473
      msg->id(), m_session.get(), response_pb);
4✔
474

475
  if (m_requests.find(msg->id()) != m_requests.end()) {
4✔
476
    OLA_WARN << "dup sequence number for request " << msg->id();
×
477
    SendRequestFailed(m_requests[msg->id()]);
×
478
  }
479

480
  m_requests[msg->id()] = request;
4✔
481
  SingleUseCallback0<void> *callback = NewSingleCallback(
4✔
482
      this, &RpcChannel::RequestComplete, request);
483
  m_service->CallMethod(method, request->controller, request_pb, response_pb,
4✔
484
                        callback);
485
  delete request_pb;
4✔
486
}
487

488

489
/*
490
 * Handle a streaming RPC call. This doesn't return any response to the client.
491
 */
492
void RpcChannel::HandleStreamRequest(RpcMessage *msg) {
2✔
493
  if (!m_service) {
2✔
494
    OLA_WARN << "no service registered";
×
495
    return;
×
496
  }
497

498
  const ServiceDescriptor *service = m_service->GetDescriptor();
2✔
499
  if (!service) {
2✔
500
    OLA_WARN << "failed to get service descriptor";
×
501
    return;
×
502
  }
503
  const MethodDescriptor *method = service->FindMethodByName(msg->name());
2✔
504
  if (!method) {
2✔
505
    OLA_WARN << "failed to get method descriptor";
×
506
    SendNotImplemented(msg->id());
×
507
    return;
×
508
  }
509

510
  if (method->output_type()->name() != STREAMING_NO_RESPONSE) {
2✔
511
    OLA_WARN << "Streaming request received for " << method->name() <<
×
512
      ", but the output type isn't STREAMING_NO_RESPONSE";
×
513
    return;
×
514
  }
515

516
  Message* request_pb = m_service->GetRequestPrototype(method).New();
2✔
517

518
  if (!request_pb) {
2✔
519
    OLA_WARN << "failed to get request or response objects";
×
520
    return;
×
521
  }
522

523
  if (!request_pb->ParseFromString(msg->buffer())) {
2✔
524
    OLA_WARN << "parsing of request pb failed";
×
525
    return;
×
526
  }
527

528
  RpcController controller(m_session.get());
2✔
529
  m_service->CallMethod(method, &controller, request_pb, NULL, NULL);
2✔
530
  delete request_pb;
2✔
531
}
2✔
532

533

534
// server side
535
/*
536
 * Notify the caller that the request failed.
537
 */
538
void RpcChannel::SendRequestFailed(OutstandingRequest *request) {
2✔
539
  RpcMessage message;
2✔
540
  message.set_type(RESPONSE_FAILED);
2✔
541
  message.set_id(request->id);
2✔
542
  message.set_buffer(request->controller->ErrorText());
4✔
543
  SendMsg(&message);
2✔
544
  DeleteOutstandingRequest(request);
2✔
545
}
2✔
546

547

548
/*
549
 * Sent if we get a request for a non-existent method.
550
 */
551
void RpcChannel::SendNotImplemented(int msg_id) {
×
552
  RpcMessage message;
×
553
  message.set_type(RESPONSE_NOT_IMPLEMENTED);
×
554
  message.set_id(msg_id);
×
555
  SendMsg(&message);
×
556
}
×
557

558

559
/*
560
 * Cleanup an outstanding request after the response has been returned
561
 */
562
void RpcChannel::DeleteOutstandingRequest(OutstandingRequest *request) {
4✔
563
  STLRemoveAndDelete(&m_requests, request->id);
4✔
564
}
4✔
565

566

567
// client side methods
568
/*
569
 * Handle a RPC response by invoking the callback.
570
 */
571
void RpcChannel::HandleResponse(RpcMessage *msg) {
2✔
572
  auto_ptr<OutstandingResponse> response(
2✔
573
      STLLookupAndRemovePtr(&m_responses, msg->id()));
2✔
574
  if (response.get()) {
2✔
575
    if (!response->reply->ParseFromString(msg->buffer())) {
2✔
576
      OLA_WARN << "Failed to parse response proto for "
×
577
               << response->reply->GetTypeName();
×
578
    }
579
    response->callback->Run();
2✔
580
  }
581
}
2✔
582

583

584
/*
585
 * Handle a RPC response by invoking the callback.
586
 */
587
void RpcChannel::HandleFailedResponse(RpcMessage *msg) {
2✔
588
  auto_ptr<OutstandingResponse> response(
2✔
589
      STLLookupAndRemovePtr(&m_responses, msg->id()));
2✔
590
  if (response.get()) {
2✔
591
    response->controller->SetFailed(msg->buffer());
2✔
592
    response->callback->Run();
2✔
593
  }
594
}
2✔
595

596

597
/*
598
 * Handle a RPC response by invoking the callback.
599
 */
600
void RpcChannel::HandleCanceledResponse(RpcMessage *msg) {
×
601
  OLA_INFO << "Received a canceled response";
×
602
  auto_ptr<OutstandingResponse> response(
×
603
      STLLookupAndRemovePtr(&m_responses, msg->id()));
×
604
  if (response.get()) {
×
605
    response->controller->SetFailed(msg->buffer());
×
606
    response->callback->Run();
×
607
  }
608
}
×
609

610

611
/*
612
 * Handle a NOT_IMPLEMENTED by invoking the callback.
613
 */
614
void RpcChannel::HandleNotImplemented(RpcMessage *msg) {
×
615
  OLA_INFO << "Received a non-implemented response";
×
616
  auto_ptr<OutstandingResponse> response(
×
617
      STLLookupAndRemovePtr(&m_responses, msg->id()));
×
618
  if (response.get()) {
×
619
    response->controller->SetFailed("Not Implemented");
×
620
    response->callback->Run();
×
621
  }
622
}
×
623

624
/*
625
 * Invoke the Channel close handler/
626
 */
627
void RpcChannel::HandleChannelClose() {
7✔
628
  if (m_on_close.get()) {
7✔
629
    m_on_close.release()->Run(m_session.get());
7✔
630
  }
631
}
7✔
632
}  // namespace rpc
633
}  // namespace ola
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc