• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

OpenLightingProject / ola / 17141553775

21 Aug 2025 11:23PM UTC coverage: 45.72% (-0.02%) from 45.742%
17141553775

push

github

web-flow
Merge pull request #2014 from peternewman/mac-be

Tidy the Mac OS Endian behaviour

7586 of 17462 branches covered (43.44%)

22424 of 49046 relevant lines covered (45.72%)

53.43 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/tools/e133/basic-controller.cpp
1
/*
2
 * This program is free software; you can redistribute it and/or modify
3
 * it under the terms of the GNU General Public License as published by
4
 * the Free Software Foundation; either version 2 of the License, or
5
 * (at your option) any later version.
6
 *
7
 * This program is distributed in the hope that it will be useful,
8
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10
 * GNU Library General Public License for more details.
11
 *
12
 * You should have received a copy of the GNU General Public License
13
 * along with this program; if not, write to the Free Software
14
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
15
 *
16
 * basic-controller.cpp
17
 * Copyright (C) 2014 Simon Newton
18
 *
19
 * A controller which just listens for new TCP connections from devices.
20
 * I'm using this for scale testing.
21
 */
22

23
#include <errno.h>
24
#include <ola/Callback.h>
25
#include <ola/Clock.h>
26
#include <ola/Constants.h>
27
#include <ola/ExportMap.h>
28
#include <ola/Logging.h>
29
#include <ola/acn/CID.h>
30
#include <ola/base/Flags.h>
31
#include <ola/base/Init.h>
32
#include <ola/base/SysExits.h>
33
#include <ola/e133/MessageBuilder.h>
34
#include <ola/io/NonBlockingSender.h>
35
#include <ola/io/SelectServer.h>
36
#include <ola/network/TCPSocketFactory.h>
37
#include <ola/stl/STLUtils.h>
38
#include <signal.h>
39

40
#include <map>
41
#include <memory>
42
#include <string>
43
#include <utility>
44

45
#include "libs/acn/E133HealthCheckedConnection.h"
46
#include "libs/acn/RootInflator.h"
47
#include "libs/acn/TCPTransport.h"
48

49
// Command line flags. Each one is read elsewhere in this file through its
// FLAGS_<name> variable.

// Where the accepting socket listens. An empty listen_ip skips the address
// parsing in main() and leaves the IPV4Address default-constructed.
DEFINE_string(listen_ip, "", "The IP Address to listen on");
DEFINE_uint16(listen_port, 5569, "The port to listen on");

// Passed as the backlog argument when the listening socket is set up.
DEFINE_uint16(listen_backlog, 100,
              "The backlog for the listen() call. Often limited to 128");

// Scale-test controls: how many devices to wait for, and whether to stop the
// select server once that many have connected.
DEFINE_uint32(expected_devices, 1,
              "Time how long it takes until this many devices connect.");
DEFINE_default_bool(stop_after_all_devices, false,
                    "Exit once all devices connect");
57

58
using ola::NewCallback;
59
using ola::NewSingleCallback;
60
using ola::STLFindOrNull;
61
using ola::TimeInterval;
62
using ola::TimeStamp;
63
using ola::io::NonBlockingSender;
64
using ola::network::GenericSocketAddress;
65
using ola::network::IPV4Address;
66
using ola::network::IPV4SocketAddress;
67
using ola::network::TCPSocket;
68
using ola::acn::IncomingTCPTransport;
69
using std::auto_ptr;
70
using std::string;
71

72
// The one controller instance. It lives at file scope so that the SIGINT
// handler can reach it; it is NULL whenever no controller exists.
class SimpleE133Controller;
SimpleE133Controller *controller = NULL;
73

74
/**
75
 * Holds the state for each device
76
 */
77
class DeviceState {
78
 public:
79
  DeviceState()
×
80
    : socket(NULL),
×
81
      message_queue(NULL),
×
82
      health_checked_connection(NULL),
×
83
      in_transport(NULL) {
×
84
  }
85

86
  // The following may be NULL.
87
  // The socket connected to the E1.33 device
88
  auto_ptr<TCPSocket> socket;
89
  auto_ptr<NonBlockingSender> message_queue;
90
  // The Health Checked connection
91
  auto_ptr<E133HealthCheckedConnection> health_checked_connection;
92
  auto_ptr<IncomingTCPTransport> in_transport;
93

94
 private:
95
  DISALLOW_COPY_AND_ASSIGN(DeviceState);
96
};
97

98
/**
99
 * A very simple E1.33 Controller that uses the reverse-connection model.
100
 */
101
class SimpleE133Controller {
102
 public:
103
  struct Options {
×
104
    // The controller to connect to.
105
    IPV4SocketAddress controller;
106

107
    explicit Options(const IPV4SocketAddress &controller)
×
108
        : controller(controller) {
×
109
    }
110
  };
111

112
  explicit SimpleE133Controller(const Options &options);
113
  ~SimpleE133Controller();
114

115
  bool Start();
116
  void Stop() { m_ss.Terminate(); }
×
117

118
 private:
119
  typedef std::map<IPV4SocketAddress, DeviceState*> DeviceMap;
120

121
  TimeStamp m_start_time;
122
  DeviceMap m_device_map;
123

124
  const IPV4SocketAddress m_listen_address;
125
  ola::ExportMap m_export_map;
126
  ola::io::SelectServer m_ss;
127
  ola::network::TCPSocketFactory m_tcp_socket_factory;
128
  ola::network::TCPAcceptingSocket m_listen_socket;
129

130
  ola::e133::MessageBuilder m_message_builder;
131

132
  ola::acn::RootInflator m_root_inflator;
133

134
  bool PrintStats();
135

136
  void OnTCPConnect(TCPSocket *socket);
137
  void ReceiveTCPData(IPV4SocketAddress peer,
138
                      IncomingTCPTransport *transport);
139
  void RLPDataReceived(const ola::acn::TransportHeader &header);
140

141
  void SocketUnhealthy(IPV4SocketAddress peer);
142

143
  void SocketClosed(IPV4SocketAddress peer);
144

145
  DISALLOW_COPY_AND_ASSIGN(SimpleE133Controller);
146
};
147

148
/**
 * Wires the controller's members together. Nothing is opened or started
 * here; that happens in Start().
 * @param options supplies the address to listen on.
 */
SimpleE133Controller::SimpleE133Controller(const Options &options)
    : m_listen_address(options.controller),
      // The select server is handed the export map.
      m_ss(&m_export_map),
      // Each accepted TCP socket is passed to OnTCPConnect().
      m_tcp_socket_factory(
          NewCallback(this, &SimpleE133Controller::OnTCPConnect)),
      m_listen_socket(&m_tcp_socket_factory),
      // A fresh CID is generated for every controller instance.
      m_message_builder(ola::acn::CID::Generate(), "E1.33 Controller"),
      // The root inflator reports to RLPDataReceived().
      m_root_inflator(
          NewCallback(this, &SimpleE133Controller::RLPDataReceived)) {
}
158

159
/**
 * Releases every device that is still connected.
 *
 * The map stores raw DeviceState pointers that are otherwise only freed in
 * SocketClosed(), so any peer still connected at shutdown would leak along
 * with its socket. Each remaining socket is removed from the select server
 * before its state is deleted, the same way SocketClosed() does it.
 */
SimpleE133Controller::~SimpleE133Controller() {
  // Take the entries out of the member map first, so that nothing triggered
  // while tearing a device down can modify the container being iterated.
  DeviceMap remaining;
  remaining.swap(m_device_map);

  for (DeviceMap::iterator iter = remaining.begin(); iter != remaining.end();
       ++iter) {
    DeviceState *state = iter->second;
    if (!state) {
      continue;
    }
    if (state->socket.get()) {
      m_ss.RemoveReadDescriptor(state->socket.get());
    }
    delete state;
  }
}
160

161
bool SimpleE133Controller::Start() {
×
162
  ola::Clock clock;
×
163
  clock.CurrentMonotonicTime(&m_start_time);
×
164

165
  if (!m_listen_socket.Listen(m_listen_address, FLAGS_listen_backlog)) {
×
166
    return false;
167
  }
168
  OLA_INFO << "Listening on " << m_listen_address;
×
169

170
  m_ss.AddReadDescriptor(&m_listen_socket);
×
171
  m_ss.RegisterRepeatingTimeout(
×
172
      TimeInterval(0, 500000),
×
173
      NewCallback(this, &SimpleE133Controller::PrintStats));
174
  m_ss.Run();
×
175
  m_ss.RemoveReadDescriptor(&m_listen_socket);
×
176
  return true;
177
}
×
178

179
bool SimpleE133Controller::PrintStats() {
×
180
  const TimeStamp *now = m_ss.WakeUpTime();
×
181
  const TimeInterval delay = *now - m_start_time;
×
182
  ola::CounterVariable *ss_iterations = m_export_map.GetCounterVar(
×
183
      "ss-loop-count");
184
  OLA_INFO << delay << "," << m_device_map.size() << ","
×
185
      << ss_iterations->Value();
×
186
  return true;
×
187
}
188

189
/**
 * Called with each newly accepted TCP connection.
 *
 * Builds the DeviceState for the peer (incoming transport, message queue and
 * health checked connection), registers the socket with the select server
 * and stores the state in the device map. Once the expected number of
 * devices is connected the elapsed time is logged and, if requested, the
 * select server is stopped.
 *
 * @param socket_ptr the new socket; ownership is taken. It is deleted on any
 *   of the early-return paths.
 */
void SimpleE133Controller::OnTCPConnect(TCPSocket *socket_ptr) {
  auto_ptr<TCPSocket> socket(socket_ptr);

  GenericSocketAddress generic_peer = socket->GetPeerAddress();
  if (generic_peer.Family() != AF_INET) {
    OLA_WARN << "Unknown family " << generic_peer.Family();
    return;
  }
  IPV4SocketAddress peer = generic_peer.V4Addr();

  // OLA_INFO << "Received new TCP connection from: " << peer;

  auto_ptr<DeviceState> device_state(new DeviceState());
  device_state->in_transport.reset(
      new IncomingTCPTransport(&m_root_inflator, socket.get()));

  socket->SetOnData(
      NewCallback(this, &SimpleE133Controller::ReceiveTCPData, peer,
                  device_state->in_transport.get()));
  socket->SetOnClose(
      NewSingleCallback(this, &SimpleE133Controller::SocketClosed, peer));

  device_state->message_queue.reset(
      new NonBlockingSender(socket.get(), &m_ss, m_message_builder.pool()));

  auto_ptr<E133HealthCheckedConnection> health_checked_connection(
      new E133HealthCheckedConnection(
          &m_message_builder,
          device_state->message_queue.get(),
          NewSingleCallback(this, &SimpleE133Controller::SocketUnhealthy, peer),
          &m_ss));

  if (!health_checked_connection->Setup()) {
    OLA_WARN << "Failed to setup heartbeat controller for " << peer;
    return;
  }

  // Everything is set up; hand ownership over to the DeviceState.
  device_state->health_checked_connection.reset(
    health_checked_connection.release());
  device_state->socket.reset(socket.release());

  m_ss.AddReadDescriptor(socket_ptr);

  // Insert a NULL placeholder so a single lookup tells us whether the peer
  // was already present.
  std::pair<DeviceMap::iterator, bool> p = m_device_map.insert(
      std::pair<IPV4SocketAddress, DeviceState*>(peer, NULL));
  if (!p.second) {
    OLA_WARN << "Peer " << peer << " is already connected! This is a bug";
    // The old socket is still registered with the select server. Remove it
    // before it is deleted, otherwise the select server is left holding a
    // dangling descriptor pointer.
    DeviceState *stale = p.first->second;
    if (stale) {
      if (stale->socket.get()) {
        m_ss.RemoveReadDescriptor(stale->socket.get());
      }
      delete stale;
    }
  }
  p.first->second = device_state.release();

  if (m_device_map.size() == FLAGS_expected_devices) {
    ola::Clock clock;
    TimeStamp now;
    clock.CurrentMonotonicTime(&now);
    OLA_INFO << FLAGS_expected_devices << " connected in "
             << (now - m_start_time);
    if (FLAGS_stop_after_all_devices) {
      m_ss.Terminate();
    }
  }
}
251

252
void SimpleE133Controller::ReceiveTCPData(IPV4SocketAddress peer,
×
253
                                          IncomingTCPTransport *transport) {
254
  if (!transport->Receive()) {
×
255
    OLA_WARN << "TCP STREAM IS BAD!!!";
×
256
    SocketClosed(peer);
×
257
  }
258
}
×
259

260
void SimpleE133Controller::RLPDataReceived(
×
261
    const ola::acn::TransportHeader &header) {
262
  if (header.Transport() != ola::acn::TransportHeader::TCP)
×
263
    return;
264

265
  DeviceState *device_state = STLFindOrNull(m_device_map, header.Source());
×
266

267
  if (!device_state) {
×
268
    OLA_FATAL << "Received data but unable to lookup socket for "
×
269
        << header.Source();
×
270
    return;
×
271
  }
272

273
  device_state->health_checked_connection->HeartbeatReceived();
×
274
}
275

276
/**
 * The callback handed to a device's E133HealthCheckedConnection. An
 * unhealthy connection is handled exactly like a closed one.
 * @param peer the address of the device.
 *
 * NOTE(review): SocketClosed() deletes the DeviceState that owns the health
 * checked connection this callback was given to - confirm it is safe to
 * destroy that connection from here.
 */
void SimpleE133Controller::SocketUnhealthy(IPV4SocketAddress peer) {
  OLA_INFO << "connection to " << peer << " went unhealthy";

  SocketClosed(peer);
}
280

281
/**
 * Tears down the state for a device: the entry is taken out of the device
 * map, its socket is removed from the select server and the DeviceState is
 * deleted.
 * @param peer the address of the device; a warning is logged if there is no
 *   entry for it.
 */
void SimpleE133Controller::SocketClosed(IPV4SocketAddress peer) {
  OLA_INFO << "Connection to " << peer << " was closed";

  // The auto_ptr deletes the DeviceState, if any, when this method returns.
  auto_ptr<DeviceState> state(ola::STLLookupAndRemovePtr(&m_device_map, peer));

  if (state.get()) {
    m_ss.RemoveReadDescriptor(state->socket.get());
  } else {
    OLA_WARN << "Can't find device entry";
  }
}
293

294
/**
295
 * Interrupt handler
296
 */
297
static void InteruptSignal(OLA_UNUSED int signo) {
×
298
  int old_errno = errno;
×
299
  if (controller) {
×
300
    controller->Stop();
×
301
  }
302
  errno = old_errno;
×
303
}
×
304

305
int main(int argc, char *argv[]) {
×
306
  ola::SetHelpString("[options]", "Simple E1.33 Controller.");
×
307
  ola::ParseFlags(&argc, argv);
×
308
  ola::InitLoggingFromFlags();
×
309

310
  // Convert the controller's IP address
311
  IPV4Address controller_ip;
×
312
  if (!FLAGS_listen_ip.str().empty() &&
×
313
      !IPV4Address::FromString(FLAGS_listen_ip, &controller_ip)) {
×
314
    ola::DisplayUsage();
×
315
    exit(ola::EXIT_USAGE);
×
316
  }
317

318
  ola::InstallSignal(SIGINT, InteruptSignal);
×
319
  controller = new SimpleE133Controller(
×
320
      SimpleE133Controller::Options(
×
321
          IPV4SocketAddress(controller_ip, FLAGS_listen_port)));
×
322
  controller->Start();
×
323
  delete controller;
×
324
  controller = NULL;
×
325
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc