• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

OpenLightingProject / ola / 6370547479

01 Oct 2023 12:26PM UTC coverage: 45.025% (-0.02%) from 45.048%
6370547479

push

github

web-flow
Merge pull request #1898 from peternewman/mac-type-tests

Add the ability to set a universe to blackout instead via ola_set_dmx

7603 of 17646 branches covered (0.0%)

13 of 13 new or added lines in 1 file covered. (100.0%)

21415 of 47562 relevant lines covered (45.03%)

55.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

5.65
/libs/acn/DMPE131Inflator.cpp
1
/*
2
 * This program is free software; you can redistribute it and/or modify
3
 * it under the terms of the GNU General Public License as published by
4
 * the Free Software Foundation; either version 2 of the License, or
5
 * (at your option) any later version.
6
 *
7
 * This program is distributed in the hope that it will be useful,
8
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10
 * GNU Library General Public License for more details.
11
 *
12
 * You should have received a copy of the GNU General Public License
13
 * along with this program; if not, write to the Free Software
14
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
15
 *
16
 * DMPE131Inflator.cpp
17
 * The Inflator for the DMP PDUs
18
 * Copyright (C) 2007 Simon Newton
19
 */
20

21
#include <sys/time.h>
22
#include <algorithm>
23
#include <map>
24
#include <memory>
25
#include <vector>
26
#include "ola/Logging.h"
27
#include "libs/acn/DMPE131Inflator.h"
28
#include "libs/acn/DMPHeader.h"
29
#include "libs/acn/DMPPDU.h"
30

31
namespace ola {
32
namespace acn {
33

34
using ola::Callback0;
35
using ola::acn::CID;
36
using ola::io::OutputStream;
37
using std::map;
38
using std::pair;
39
using std::vector;
40

41
// How long a source may go unheard before TrackSourceIfRequired() treats it
// as expired and stops tracking it.
// NOTE(review): presumably microseconds (i.e. 2.5 seconds) - confirm against
// the TimeInterval constructor.
const TimeInterval DMPE131Inflator::EXPIRY_INTERVAL(2500000);
42

43

44
/*
 * Clean up: delete the closure held by every registered universe handler and
 * then empty the handler map.
 */
DMPE131Inflator::~DMPE131Inflator() {
  UniverseHandlers::iterator handler = m_handlers.begin();
  while (handler != m_handlers.end()) {
    delete handler->second.closure;
    ++handler;
  }
  m_handlers.clear();
}
2✔
51

52

53
/*
54
 * Handle a DMP PDU for E1.31.
55
 */
56
bool DMPE131Inflator::HandlePDUData(uint32_t vector,
×
57
                                    const HeaderSet &headers,
58
                                    const uint8_t *data,
59
                                    unsigned int pdu_len) {
60
  if (vector != ola::acn::DMP_SET_PROPERTY_VECTOR) {
×
61
    OLA_INFO << "not a set property msg: " << vector;
×
62
    return true;
×
63
  }
64

65
  E131Header e131_header = headers.GetE131Header();
×
66
  UniverseHandlers::iterator universe_iter =
×
67
      m_handlers.find(e131_header.Universe());
×
68

69
  if (e131_header.PreviewData() && m_ignore_preview) {
×
70
    OLA_DEBUG << "Ignoring preview data";
×
71
    return true;
×
72
  }
73

74
  if (universe_iter == m_handlers.end()) {
×
75
    return true;
76
  }
77

78
  DMPHeader dmp_header = headers.GetDMPHeader();
×
79

80
  if (!dmp_header.IsVirtual() || dmp_header.IsRelative() ||
×
81
      dmp_header.Size() != TWO_BYTES ||
×
82
      dmp_header.Type() != RANGE_EQUAL) {
×
83
    OLA_INFO << "malformed E1.31 dmp header " << dmp_header.Header();
×
84
    return true;
×
85
  }
86

87
  if (e131_header.Priority() > MAX_E131_PRIORITY) {
×
88
    OLA_INFO << "Priority " << static_cast<int>(e131_header.Priority())
×
89
             << " is greater than the max priority ("
×
90
             << static_cast<int>(MAX_E131_PRIORITY) << "), ignoring data";
×
91
    return true;
×
92
  }
93

94
  unsigned int available_length = pdu_len;
×
95
  std::auto_ptr<const BaseDMPAddress> address(
×
96
      DecodeAddress(dmp_header.Size(),
97
                    dmp_header.Type(),
98
                    data,
99
                    &available_length));
×
100

101
  if (!address.get()) {
×
102
    OLA_INFO << "DMP address parsing failed, the length is probably too small";
×
103
    return true;
×
104
  }
105

106
  if (address->Increment() != 1) {
×
107
    OLA_INFO << "E1.31 DMP packet with increment " << address->Increment()
×
108
             << ", disarding";
×
109
    return true;
×
110
  }
111

112
  unsigned int length_remaining = pdu_len - available_length;
×
113
  int start_code = -1;
×
114
  if (e131_header.UsingRev2()) {
×
115
    start_code = static_cast<int>(address->Start());
×
116
  } else if (length_remaining && address->Number()) {
×
117
    start_code = *(data + available_length);
×
118
  }
119

120
  // The only time we want to continue processing a non-0 start code is if it
121
  // contains a Terminate message.
122
  if (start_code && !e131_header.StreamTerminated()) {
×
123
    OLA_INFO << "Skipping packet with non-0 start code: " << start_code;
×
124
    return true;
×
125
  }
126

127
  DmxBuffer *target_buffer;
×
128
  if (!TrackSourceIfRequired(&universe_iter->second, headers,
×
129
                             &target_buffer)) {
130
    // no need to continue processing
131
    return true;
132
  }
133

134
  // Reaching here means that we actually have new data and we should merge.
135
  if (target_buffer && start_code == 0) {
×
136
    unsigned int channels = std::min(length_remaining, address->Number());
×
137
    if (e131_header.UsingRev2()) {
×
138
      target_buffer->Set(data + available_length, channels);
×
139
    } else {
140
      target_buffer->Set(data + available_length + 1, channels - 1);
×
141
    }
142
  }
143

144
  if (universe_iter->second.priority) {
×
145
    *universe_iter->second.priority = universe_iter->second.active_priority;
×
146
  }
147

148
  // merge the sources
149
  switch (universe_iter->second.sources.size()) {
×
150
    case 0:
×
151
      universe_iter->second.buffer->Reset();
×
152
      break;
153
    case 1:
×
154
      universe_iter->second.buffer->Set(
×
155
          universe_iter->second.sources[0].buffer);
×
156
      universe_iter->second.closure->Run();
×
157
      break;
158
    default:
×
159
      // HTP Merge
160
      universe_iter->second.buffer->Reset();
×
161
      std::vector<dmx_source>::const_iterator source_iter =
×
162
          universe_iter->second.sources.begin();
×
163
      for (; source_iter != universe_iter->second.sources.end();
×
164
           ++source_iter) {
×
165
        universe_iter->second.buffer->HTPMerge(source_iter->buffer);
×
166
      }
167
      universe_iter->second.closure->Run();
×
168
  }
169
  return true;
170
}
×
171

172

173
/*
174
 * Set the closure to be called when we receive data for this universe.
175
 * @param universe the universe to register the handler for
176
 * @param buffer the DmxBuffer to update with the data
177
 * @param handler the Callback0 to call when there is data for this universe.
178
 * Ownership of the closure is transferred to the node.
179
 */
180
bool DMPE131Inflator::SetHandler(uint16_t universe,
×
181
                                 ola::DmxBuffer *buffer,
182
                                 uint8_t *priority,
183
                                 ola::Callback0<void> *closure) {
184
  if (!closure || !buffer) {
×
185
    return false;
186
  }
187

188
  UniverseHandlers::iterator iter = m_handlers.find(universe);
×
189

190
  if (iter == m_handlers.end()) {
×
191
    universe_handler handler;
×
192
    handler.buffer = buffer;
×
193
    handler.closure = closure;
×
194
    handler.active_priority = 0;
×
195
    handler.priority = priority;
×
196
    m_handlers[universe] = handler;
×
197
  } else {
×
198
    Callback0<void> *old_closure = iter->second.closure;
×
199
    iter->second.closure = closure;
×
200
    iter->second.buffer = buffer;
×
201
    iter->second.priority = priority;
×
202
    delete old_closure;
×
203
  }
204
  return true;
205
}
206

207

208
/*
209
 * Remove the handler for this universe
210
 * @param universe the universe handler to remove
211
 * @param true if removed, false if it didn't exist
212
 */
213
bool DMPE131Inflator::RemoveHandler(uint16_t universe) {
×
214
  UniverseHandlers::iterator iter = m_handlers.find(universe);
×
215

216
  if (iter != m_handlers.end()) {
×
217
    Callback0<void> *old_closure = iter->second.closure;
×
218
    m_handlers.erase(iter);
×
219
    delete old_closure;
×
220
    return true;
×
221
  }
222
  return false;
223
}
224

225

226
/**
227
 * Get the list of registered universes
228
 * @param universes a pointer to a vector which is populated with the list of
229
 *   universes that have handlers installed.
230
 */
231
void DMPE131Inflator::RegisteredUniverses(vector<uint16_t> *universes) {
1✔
232
  universes->clear();
1✔
233
  UniverseHandlers::iterator iter;
1✔
234
  for (iter = m_handlers.begin(); iter != m_handlers.end(); ++iter) {
1✔
235
    universes->push_back(iter->first);
×
236
  }
237
}
1✔
238

239

240
/*
241
 * Check if this source is operating at the highest priority for this universe.
242
 * This takes care of tracking all sources for a universe at the active
243
 * priority.
244
 * @param universe_data the universe_handler struct for this universe,
245
 * @param HeaderSet the set of headers in this packet
246
 * @param buffer, if set to a non-NULL pointer, the caller should copy the data
247
 * in the buffer.
248
 * @returns true if we should remerge the data, false otherwise.
249
 */
250
bool DMPE131Inflator::TrackSourceIfRequired(
×
251
    universe_handler *universe_data,
252
    const HeaderSet &headers,
253
    DmxBuffer **buffer) {
254

255
  *buffer = NULL;  // default the buffer to NULL
×
256
  ola::TimeStamp now;
×
257
  m_clock.CurrentMonotonicTime(&now);
×
258
  const E131Header &e131_header = headers.GetE131Header();
×
259
  uint8_t priority = e131_header.Priority();
×
260
  vector<dmx_source> &sources = universe_data->sources;
×
261
  vector<dmx_source>::iterator iter = sources.begin();
×
262

263
  while (iter != sources.end()) {
×
264
    if (iter->cid != headers.GetRootHeader().GetCid()) {
×
265
      TimeStamp expiry_time = iter->last_heard_from + EXPIRY_INTERVAL;
×
266
      if (now > expiry_time) {
×
267
        OLA_INFO << "source " << iter->cid.ToString() << " has expired";
×
268
        iter = sources.erase(iter);
×
269
        continue;
×
270
      }
271
    }
272
    iter++;
×
273
  }
274

275
  if (sources.empty()) {
×
276
    universe_data->active_priority = 0;
×
277
  }
278

279
  for (iter = sources.begin(); iter != sources.end(); ++iter) {
×
280
    if (iter->cid == headers.GetRootHeader().GetCid()) {
×
281
      break;
282
    }
283
  }
284

285
  if (iter == sources.end()) {
×
286
    // This is an untracked source
287
    if (e131_header.StreamTerminated() ||
×
288
        priority < universe_data->active_priority) {
×
289
      return false;
290
    }
291

292
    if (priority > universe_data->active_priority) {
×
293
      OLA_INFO << "Raising priority for universe " << e131_header.Universe()
×
294
               << " from " << static_cast<int>(universe_data->active_priority)
×
295
               << " to " << static_cast<int>(priority);
×
296
      sources.clear();
×
297
      universe_data->active_priority = priority;
×
298
    }
299

300
    if (sources.size() == MAX_MERGE_SOURCES) {
×
301
      // TODO(simon): flag this in the export map
302
      OLA_WARN << "Max merge sources reached for universe "
×
303
               << e131_header.Universe() << ", "
×
304
               << headers.GetRootHeader().GetCid().ToString()
×
305
               << " won't be tracked";
×
306
        return false;
×
307
    } else {
308
      OLA_INFO << "Added new E1.31 source: "
×
309
               << headers.GetRootHeader().GetCid().ToString();
×
310
      dmx_source new_source;
×
311
      new_source.cid = headers.GetRootHeader().GetCid();
×
312
      new_source.sequence = e131_header.Sequence();
×
313
      new_source.last_heard_from = now;
×
314
      iter = sources.insert(sources.end(), new_source);
×
315
      *buffer = &iter->buffer;
×
316
      return true;
×
317
    }
×
318

319
  } else {
320
    // We already know about this one, check the seq #
321
    int8_t seq_diff = static_cast<int8_t>(e131_header.Sequence() -
×
322
                                          iter->sequence);
×
323
    if (seq_diff <= 0 && seq_diff > SEQUENCE_DIFF_THRESHOLD) {
×
324
      OLA_INFO << "Old packet received, ignoring, this # "
×
325
               << static_cast<int>(e131_header.Sequence()) << ", last "
×
326
               << static_cast<int>(iter->sequence);
×
327
      return false;
×
328
    }
329
    iter->sequence = e131_header.Sequence();
×
330

331
    if (e131_header.StreamTerminated()) {
×
332
      OLA_INFO << "CID " << headers.GetRootHeader().GetCid().ToString()
×
333
               << " sent a termination for universe "
×
334
               << e131_header.Universe();
×
335
      sources.erase(iter);
×
336
      if (sources.empty()) {
×
337
        universe_data->active_priority = 0;
×
338
      }
339
      // We need to trigger a merge here else the buffer will be stale, we keep
340
      // the buffer as NULL though so we don't use the data.
341
      return true;
×
342
    }
343

344
    iter->last_heard_from = now;
×
345
    if (priority < universe_data->active_priority) {
×
346
      if (sources.size() == 1) {
×
347
        universe_data->active_priority = priority;
×
348
      } else {
349
        sources.erase(iter);
×
350
        return true;
×
351
      }
352
    } else if (priority > universe_data->active_priority) {
×
353
      // new active priority
354
      universe_data->active_priority = priority;
×
355
      if (sources.size() != 1) {
×
356
        // clear all sources other than this one
357
        dmx_source this_source = *iter;
×
358
        sources.clear();
×
359
        iter = sources.insert(sources.end(), this_source);
×
360
      }
×
361
    }
362
    *buffer = &iter->buffer;
×
363
    return true;
×
364
  }
365
}
366
}  // namespace acn
367
}  // namespace ola
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc