• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

OpenLightingProject / ola / 16900099157

12 Aug 2025 05:43AM UTC coverage: 45.72% (-0.02%) from 45.742%
16900099157

Pull #2016

github

web-flow
Merge c368ef6ae into eaf937e80
Pull Request #2016: Bump actions/checkout from 4 to 5

7586 of 17462 branches covered (43.44%)

22424 of 49046 relevant lines covered (45.72%)

51.77 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/tools/e133/DeviceManagerImpl.cpp
1
/*
2
 * This program is free software; you can redistribute it and/or modify
3
 * it under the terms of the GNU General Public License as published by
4
 * the Free Software Foundation; either version 2 of the License, or
5
 * (at your option) any later version.
6
 *
7
 * This program is distributed in the hope that it will be useful,
8
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10
 * GNU Library General Public License for more details.
11
 *
12
 * You should have received a copy of the GNU General Public License
13
 * along with this program; if not, write to the Free Software
14
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
15
 *
16
 * DeviceManagerImpl.cpp
17
 * Copyright (C) 2013 Simon Newton
18
 */
19

20
#include <ola/Callback.h>
21
#include <ola/Clock.h>
22
#include <ola/Logging.h>
23
#include <ola/acn/ACNPort.h>
24
#include <ola/acn/CID.h>
25
#include <ola/base/Macro.h>
26
#include <ola/e133/E133Enums.h>
27
#include <ola/io/NonBlockingSender.h>
28
#include <ola/io/SelectServer.h>
29
#include <ola/network/AdvancedTCPConnector.h>
30
#include <ola/network/IPV4Address.h>
31
#include <ola/network/Socket.h>
32
#include <ola/network/TCPSocketFactory.h>
33
#include <ola/stl/STLUtils.h>
34

35
#include <memory>
36
#include <string>
37
#include <vector>
38

39
#include "libs/acn/E133HealthCheckedConnection.h"
40
#include "libs/acn/E133Inflator.h"
41
#include "libs/acn/E133StatusPDU.h"
42
#include "libs/acn/TCPTransport.h"
43

44
#include "tools/e133/DeviceManagerImpl.h"
45
#include "tools/e133/E133Endpoint.h"
46

47
namespace ola {
48
namespace e133 {
49

50
using ola::NewCallback;
51
using ola::NewSingleCallback;
52
using ola::STLContains;
53
using ola::STLFindOrNull;
54
using ola::TimeInterval;
55
using ola::acn::CID;
56
using ola::io::NonBlockingSender;
57
using ola::network::GenericSocketAddress;
58
using ola::network::IPV4Address;
59
using ola::network::IPV4SocketAddress;
60
using ola::network::TCPSocket;
61
using ola::acn::IncomingTCPTransport;
62

63
using std::auto_ptr;
64
using std::string;
65

66

67
/**
68
 * Holds everything we need to manage a TCP connection to a E1.33 device.
69
 */
70
class DeviceState {
71
 public:
72
    DeviceState()
×
73
      : socket(NULL),
×
74
        message_queue(NULL),
×
75
        health_checked_connection(NULL),
×
76
        in_transport(NULL),
×
77
        am_designated_controller(false) {
×
78
    }
79

80
    // The following may be NULL.
81
    // The socket connected to the E1.33 device
82
    auto_ptr<TCPSocket> socket;
83
    auto_ptr<NonBlockingSender> message_queue;
84
    // The Health Checked connection
85
    auto_ptr<E133HealthCheckedConnection> health_checked_connection;
86
    auto_ptr<IncomingTCPTransport> in_transport;
87

88
    // True if we're the designated controller.
89
    bool am_designated_controller;
90

91
 private:
92
    DISALLOW_COPY_AND_ASSIGN(DeviceState);
93
};
94

95

96
// 5 second connect() timeout
97
const TimeInterval DeviceManagerImpl::TCP_CONNECT_TIMEOUT(5, 0);
98
// retry TCP connects after 5 seconds
99
const TimeInterval DeviceManagerImpl::INITIAL_TCP_RETRY_DELAY(5, 0);
100
// we grow the retry interval to a max of 30 seconds
101
const TimeInterval DeviceManagerImpl::MAX_TCP_RETRY_DELAY(30, 0);
102

103

104
/**
105
 * Construct a new DeviceManagerImpl
106
 * @param ss a pointer to a SelectServerInterface to use
107
 * @param cid the CID of this controller.
108
 */
109
DeviceManagerImpl::DeviceManagerImpl(ola::io::SelectServerInterface *ss,
×
110
                             ola::e133::MessageBuilder *message_builder)
×
111
    : m_ss(ss),
×
112
      m_tcp_socket_factory(NewCallback(this, &DeviceManagerImpl::OnTCPConnect)),
×
113
      m_connector(m_ss, &m_tcp_socket_factory, TCP_CONNECT_TIMEOUT),
×
114
      m_backoff_policy(INITIAL_TCP_RETRY_DELAY, MAX_TCP_RETRY_DELAY),
×
115
      m_message_builder(message_builder),
×
116
      m_root_inflator(NewCallback(this, &DeviceManagerImpl::RLPDataReceived)) {
×
117
  m_root_inflator.AddInflator(&m_e133_inflator);
×
118
  m_e133_inflator.AddInflator(&m_rdm_inflator);
×
119
  m_rdm_inflator.SetRDMHandler(
×
120
      NewCallback(this, &DeviceManagerImpl::EndpointRequest));
121
}
×
122

123

124
/**
125
 * Clean up
126
 */
127
DeviceManagerImpl::~DeviceManagerImpl() {
×
128
  // close out all tcp sockets and free state
129
  ola::STLDeleteValues(&m_device_map);
×
130
}
×
131

132

133
/**
134
 * Set the callback to be run when RDMNet data is received from a device.
135
 * @param callback the RDMMessageCallback to run when data is received.
136
 */
137
void DeviceManagerImpl::SetRDMMessageCallback(RDMMessageCallback *callback) {
×
138
  m_rdm_callback.reset(callback);
×
139
}
×
140

141

142
/**
143
 * Set the callback to be run when we become the designated controller for a
144
 * device.
145
 */
146
void DeviceManagerImpl::SetAcquireDeviceCallback(
×
147
    AcquireDeviceCallback *callback) {
148
  m_acquire_device_cb_.reset(callback);
×
149
}
×
150

151

152
/*
153
 * Set the callback to be run when we lose the designated controller status for
154
 * a device.
155
 */
156
void DeviceManagerImpl::SetReleaseDeviceCallback(
×
157
    ReleaseDeviceCallback *callback) {
158
  m_release_device_cb_.reset(callback);
×
159
}
×
160

161

162
/**
163
 * Start maintaining a connection to this device.
164
 */
165
void DeviceManagerImpl::AddDevice(const IPV4Address &ip_address) {
×
166
  if (STLContains(m_device_map, ip_address.AsInt())) {
×
167
    return;
168
  }
169

170
  DeviceState *device_state = new DeviceState();
×
171
  m_device_map[ip_address.AsInt()] = device_state;
×
172

173
  OLA_INFO << "Adding " << ip_address << ":" << ola::acn::E133_PORT;
×
174
  // start the non-blocking connect
175
  m_connector.AddEndpoint(
×
176
      IPV4SocketAddress(ip_address, ola::acn::E133_PORT),
×
177
      &m_backoff_policy);
178
}
179

180

181
/**
182
 * Remove a device, closing the connection if we have one.
183
 */
184
void DeviceManagerImpl::RemoveDevice(const IPV4Address &ip_address) {
×
185
  DeviceMap::iterator iter = m_device_map.find(ip_address.AsInt());
×
186
  if (iter == m_device_map.end())
×
187
    return;
×
188

189
  // TODO(simon): implement this
190
  OLA_WARN << "RemoveDevice not implemented";
×
191
}
192

193

194
/**
195
 * Remove a device if there is no open connection.
196
 */
197
void DeviceManagerImpl::RemoveDeviceIfNotConnected(
×
198
    const IPV4Address &ip_address) {
199
  DeviceMap::iterator iter = m_device_map.find(ip_address.AsInt());
×
200
  if (iter == m_device_map.end())
×
201
    return;
×
202

203
  // TODO(simon): implement this
204
  OLA_WARN << "RemoveDevice not implemented";
×
205
}
206

207

208
/**
209
 * Populate the vector with the devices that we are the designated controller
210
 * for.
211
 */
212
void DeviceManagerImpl::ListManagedDevices(vector<IPV4Address> *devices) const {
×
213
  DeviceMap::const_iterator iter = m_device_map.begin();
×
214
  for (; iter != m_device_map.end(); ++iter) {
×
215
    if (iter->second->am_designated_controller)
×
216
      devices->push_back(IPV4Address(iter->first));
×
217
  }
218
}
×
219

220

221
/**
222
 * Called when a TCP socket is connected. Note that we're not the designated
223
 * controller at this point. That only happens if we receive data on the
224
 * connection.
225
 */
226
void DeviceManagerImpl::OnTCPConnect(TCPSocket *socket_ptr) {
×
227
  auto_ptr<TCPSocket> socket(socket_ptr);
×
228
  GenericSocketAddress address = socket->GetPeerAddress();
×
229
  if (address.Family() != AF_INET) {
×
230
    OLA_WARN << "Non IPv4 socket " << address;
×
231
    return;
×
232
  }
233
  IPV4SocketAddress v4_address = address.V4Addr();
×
234
  DeviceState *device_state = STLFindOrNull(
×
235
      m_device_map, v4_address.Host().AsInt());
×
236
  if (!device_state) {
×
237
    OLA_FATAL << "Unable to locate socket for " << v4_address;
×
238
    return;
×
239
  }
240

241
  // setup the incoming transport, we don't need to setup the outgoing one
242
  // until we've got confirmation that we're the designated controller.
243
  device_state->socket.reset(socket.release());
×
244
  device_state->in_transport.reset(new IncomingTCPTransport(&m_root_inflator,
×
245
                                                            socket_ptr));
×
246

247
  device_state->socket->SetOnData(
×
248
      NewCallback(this, &DeviceManagerImpl::ReceiveTCPData, v4_address.Host(),
×
249
                  device_state->in_transport.get()));
250
  device_state->socket->SetOnClose(
×
251
      NewSingleCallback(this, &DeviceManagerImpl::SocketClosed,
×
252
                        v4_address.Host()));
253
  m_ss->AddReadDescriptor(socket_ptr);
×
254

255
  // TODO(simon): Setup a timeout that closes this connect if we don't receive
256
  // anything.
257
}
×
258

259

260
/**
261
 * Receive data on a TCP connection
262
 */
263
void DeviceManagerImpl::ReceiveTCPData(IPV4Address ip_address,
×
264
                                   IncomingTCPTransport *transport) {
265
  if (!transport->Receive()) {
×
266
    OLA_WARN << "TCP STREAM IS BAD!!!";
×
267
    SocketClosed(ip_address);
×
268
  }
269
}
×
270

271

272
/**
273
 * Called when a connection is deemed unhealthy.
274
 */
275
void DeviceManagerImpl::SocketUnhealthy(IPV4Address ip_address) {
×
276
  OLA_INFO << "connection to " << ip_address << " went unhealthy";
×
277
  SocketClosed(ip_address);
×
278
}
×
279

280

281
/**
282
 * Called when a socket is closed.
283
 * This can mean one of two things:
284
 *  if we weren't the designated controller, then we lost the race.
285
 *  if we were the designated controller, the TCP connection was closed, or
286
 *  went unhealthy.
287
 */
288
void DeviceManagerImpl::SocketClosed(IPV4Address ip_address) {
×
289
  OLA_INFO << "connection to " << ip_address << " was closed";
×
290

291
  DeviceState *device_state = STLFindOrNull(m_device_map, ip_address.AsInt());
×
292
  if (!device_state) {
×
293
    OLA_FATAL << "Unable to locate socket for " << ip_address;
×
294
    return;
×
295
  }
296

297
  if (device_state->am_designated_controller) {
×
298
    device_state->am_designated_controller = false;
×
299
    if (m_release_device_cb_.get())
×
300
      m_release_device_cb_->Run(ip_address);
×
301

302
    m_connector.Disconnect(
×
303
        IPV4SocketAddress(ip_address, ola::acn::E133_PORT));
×
304
  } else {
305
    // we lost the race, so don't try to reconnect
306
    m_connector.Disconnect(
×
307
        IPV4SocketAddress(ip_address, ola::acn::E133_PORT), true);
×
308
  }
309

310
  device_state->health_checked_connection.reset();
×
311
  device_state->message_queue.reset();
×
312
  device_state->in_transport.reset();
×
313
  m_ss->RemoveReadDescriptor(device_state->socket.get());
×
314
  device_state->socket.reset();
×
315
}
316

317

318
/**
319
 * Called when we receive E1.33 data. If this arrived over TCP we notify the
320
 * health checked connection.
321
 */
322
void DeviceManagerImpl::RLPDataReceived(
×
323
    const ola::acn::TransportHeader &header) {
324
  if (header.Transport() != ola::acn::TransportHeader::TCP)
×
325
    return;
×
326
  IPV4Address src_ip = header.Source().Host();
×
327

328
  DeviceState *device_state = STLFindOrNull(m_device_map, src_ip.AsInt());
×
329
  if (!device_state) {
×
330
    OLA_FATAL << "Received data but unable to lookup socket for " <<
×
331
      src_ip;
×
332
    return;
×
333
  }
334

335
  // If we're already the designated controller, we just need to notify the
336
  // HealthChecker.
337
  if (device_state->am_designated_controller) {
×
338
    device_state->health_checked_connection->HeartbeatReceived();
×
339
    return;
×
340
  }
341

342
  // This is the first packet received on this connection, which is a sign
343
  // we're now the designated controller. Setup the HealthChecker & outgoing
344
  // transports.
345
  device_state->am_designated_controller = true;
×
346
  OLA_INFO << "Now the designated controller for " << header.Source();
×
347
  if (m_acquire_device_cb_.get())
×
348
    m_acquire_device_cb_->Run(src_ip);
×
349

350
  device_state->message_queue.reset(
×
351
      new NonBlockingSender(device_state->socket.get(), m_ss,
×
352
                            m_message_builder->pool()));
×
353

354
  E133HealthCheckedConnection *health_checked_connection =
×
355
      new E133HealthCheckedConnection(
356
          m_message_builder,
357
          device_state->message_queue.get(),
358
          NewSingleCallback(this, &DeviceManagerImpl::SocketUnhealthy, src_ip),
×
359
          m_ss);
×
360

361
  if (!health_checked_connection->Setup()) {
×
362
    OLA_WARN << "Failed to setup heartbeat controller for " << src_ip;
×
363
    SocketClosed(src_ip);
×
364
    return;
×
365
  }
366

367
  if (device_state->health_checked_connection.get())
×
368
    OLA_WARN << "pre-existing health_checked_connection for " << src_ip;
×
369
  device_state->health_checked_connection.reset(health_checked_connection);
×
370
}
371

372

373
/**
374
 * Handle a message on the TCP connection.
375
 */
376
void DeviceManagerImpl::EndpointRequest(
×
377
    const ola::acn::TransportHeader *transport_header,
378
    const ola::acn::E133Header *e133_header,
379
    const string &raw_request) {
380
  if (!m_rdm_callback.get())
×
381
    return;
×
382

383
  if (e133_header->Endpoint()) {
×
384
    OLA_WARN << "TCP message for non-0 endpoint. Endpoint = "
×
385
             << e133_header->Endpoint();
×
386
    return;
×
387
  }
388

389
  if (!m_rdm_callback->Run(transport_header->Source().Host(),
×
390
                           e133_header->Endpoint(), raw_request)) {
×
391
    // Don't send an ack
392
    return;
393
  }
394

395
  DeviceState *device_state = STLFindOrNull(
×
396
      m_device_map, transport_header->Source().Host().AsInt());
×
397
  if (!device_state) {
×
398
    OLA_WARN << "Unable to find DeviceState for " << transport_header->Source();
×
399
    return;
×
400
  }
401

402
  ola::io::IOStack packet(m_message_builder->pool());
×
403
  m_message_builder->BuildTCPE133StatusPDU(
×
404
      &packet, e133_header->Sequence(), e133_header->Endpoint(),
×
405
      ola::e133::SC_E133_ACK, "OK");
406
  device_state->message_queue->SendMessage(&packet);
×
407
}
×
408
}  // namespace e133
409
}  // namespace ola
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc