• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zeromq / cppzmq / 5659739096

pending completion
5659739096

Pull #606

github

web-flow
Merge 2533a7e7a into f9f6b79e1
Pull Request #606: Add file descriptor support for poller

43 of 43 new or added lines in 2 files covered. (100.0%)

845 of 979 relevant lines covered (86.31%)

23.28 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.02
/zmq_addon.hpp
1
/*
2
    Copyright (c) 2016-2017 ZeroMQ community
3
    Copyright (c) 2016 VOCA AS / Harald Nøkland
4

5
    Permission is hereby granted, free of charge, to any person obtaining a copy
6
    of this software and associated documentation files (the "Software"), to
7
    deal in the Software without restriction, including without limitation the
8
    rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
9
    sell copies of the Software, and to permit persons to whom the Software is
10
    furnished to do so, subject to the following conditions:
11

12
    The above copyright notice and this permission notice shall be included in
13
    all copies or substantial portions of the Software.
14

15
    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16
    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17
    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18
    AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19
    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20
    FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
21
    IN THE SOFTWARE.
22
*/
23

24
#ifndef __ZMQ_ADDON_HPP_INCLUDED__
25
#define __ZMQ_ADDON_HPP_INCLUDED__
26

27
#include "zmq.hpp"
28

29
#include <deque>
30
#include <iomanip>
31
#include <sstream>
32
#include <stdexcept>
33
#ifdef ZMQ_CPP11
34
#include <limits>
35
#include <functional>
36
#include <unordered_map>
37

38
namespace zmq
39
{
40
        // socket ref or native file descriptor for poller
41
        class poller_ref_t
42
        {
43
        public:
44
                enum RefType
45
                {
46
                        RT_SOCKET,
47
                        RT_FD
48
                };
49

50
                poller_ref_t() : poller_ref_t(socket_ref{})
51
                {}
52

53
                poller_ref_t(const zmq::socket_ref& socket) : data{RT_SOCKET, socket, {}}
46✔
54
                {}
46✔
55

56
                poller_ref_t(zmq::fd_t fd) : data{RT_FD, {}, fd}
4✔
57
                {}
4✔
58

59
                size_t hash() const ZMQ_NOTHROW        
74✔
60
                {
61
                        std::size_t h = 0;
74✔
62
                        hash_combine(h, std::get<0>(data));
74✔
63
                hash_combine(h, std::get<1>(data));
74✔
64
                hash_combine(h, std::get<2>(data));
74✔
65
                        return h;
74✔
66
                }
67

68
                bool operator == (const poller_ref_t& o) const ZMQ_NOTHROW
20✔
69
                {
70
                        return data == o.data;
20✔
71
                }
72

73
        private:
74
                template <class T>
75
                static void hash_combine(std::size_t& seed, const T& v) ZMQ_NOTHROW
222✔
76
                {
77
                    std::hash<T> hasher;
78
                    seed ^= hasher(v) + 0x9e3779b9 + (seed<<6) + (seed>>2);
222✔
79
                }
222✔
80

81
                std::tuple<int, zmq::socket_ref, zmq::fd_t> data;
82

83
        }; // class poller_ref_t
84

85
} // namespace zmq
86

87
// std::hash<> specialization for std::unordered_map
88
namespace std
{
// Hash support so zmq::poller_ref_t can be used as an unordered container key.
// The specialization is declared inside a reopened namespace std (rather than
// via the qualified name std::hash at global scope) because compilers that
// predate C++17 reject an explicit specialization written outside the
// template's own namespace.
template<> struct hash<zmq::poller_ref_t>
{
        // Forwards to poller_ref_t::hash().
        size_t operator()(const zmq::poller_ref_t& ref) const ZMQ_NOTHROW
        {
                return ref.hash();
        }
};
} // namespace std
95
#endif //  ZMQ_CPP11
96

97
namespace zmq
98
{
99
#ifdef ZMQ_CPP11
100

101
namespace detail
102
{
103
template<bool CheckN, class OutputIt>
104
recv_result_t
105
recv_multipart_n(socket_ref s, OutputIt out, size_t n, recv_flags flags)
17✔
106
{
107
    size_t msg_count = 0;
17✔
108
    message_t msg;
34✔
109
    while (true) {
6✔
110
        if ZMQ_CONSTEXPR_IF (CheckN) {
111
            if (msg_count >= n)
10✔
112
                throw std::runtime_error(
113
                  "Too many message parts in recv_multipart_n");
2✔
114
        }
115
        if (!s.recv(msg, flags)) {
21✔
116
            // zmq ensures atomic delivery of messages
117
            assert(msg_count == 0);
4✔
118
            return {};
4✔
119
        }
120
        ++msg_count;
15✔
121
        const bool more = msg.more();
15✔
122
        *out++ = std::move(msg);
15✔
123
        if (!more)
15✔
124
            break;
9✔
125
    }
126
    return msg_count;
9✔
127
}
128

129
// Returns true when the lowest-addressed byte of a 16-bit integer holding 1
// is 0x01, i.e. the host stores the least significant byte first.
inline bool is_little_endian()
{
    const uint16_t probe = 0x01;
    unsigned char lowest_address_byte = 0;
    std::memcpy(&lowest_address_byte, &probe, sizeof(lowest_address_byte));
    return lowest_address_byte == 0x01;
}
134

135
// Writes value to buf[0..3] in network (big-endian) byte order.
//
// buf must point to at least four writable bytes.
//
// Shifts select bytes by numeric significance, not by memory position, so
// this single code path produces the same output on any host byte order and
// no runtime endianness probe is required.
inline void write_network_order(unsigned char *buf, const uint32_t value)
{
    buf[0] = static_cast<unsigned char>((value >> 24) & 0xFFu);
    buf[1] = static_cast<unsigned char>((value >> 16) & 0xFFu);
    buf[2] = static_cast<unsigned char>((value >> 8) & 0xFFu);
    buf[3] = static_cast<unsigned char>(value & 0xFFu);
}
1✔
147

148
// Reads a 32-bit unsigned integer stored at buf[0..3] in network (big-endian)
// byte order and returns it as a host value.
//
// buf must point to at least four readable bytes.
//
// The value is assembled arithmetically from the individual bytes, so the
// result does not depend on the host byte order and no runtime endianness
// probe is required.
inline uint32_t read_u32_network_order(const unsigned char *buf)
{
    return (static_cast<uint32_t>(buf[0]) << 24)
           | (static_cast<uint32_t>(buf[1]) << 16)
           | (static_cast<uint32_t>(buf[2]) << 8)
           | static_cast<uint32_t>(buf[3]);
}
161
} // namespace detail
162

163
/*  Receive a multipart message.
164
    
165
    Writes the zmq::message_t objects to OutputIterator out.
166
    The out iterator must handle an unspecified number of writes,
167
    e.g. by using std::back_inserter.
168
    
169
    Returns: the number of messages received or nullopt (on EAGAIN).
170
    Throws: if recv throws. Any exceptions thrown
171
    by the out iterator will be propagated and the message
172
    may have been only partially received with pending
173
    message parts. It is adviced to close this socket in that event.
174
*/
175
template<class OutputIt>
176
ZMQ_NODISCARD recv_result_t recv_multipart(socket_ref s,
9✔
177
                                           OutputIt out,
178
                                           recv_flags flags = recv_flags::none)
179
{
180
    return detail::recv_multipart_n<false>(s, std::move(out), 0, flags);
9✔
181
}
182

183
/*  Receive a multipart message.
184
    
185
    Writes at most n zmq::message_t objects to OutputIterator out.
186
    If the number of message parts of the incoming message exceeds n
187
    then an exception will be thrown.
188
    
189
    Returns: the number of messages received or nullopt (on EAGAIN).
190
    Throws: if recv throws. Throws std::runtime_error if the number
191
    of message parts exceeds n (exactly n messages will have been written
192
    to out). Any exceptions thrown
193
    by the out iterator will be propagated and the message
194
    may have been only partially received with pending
195
    message parts. It is adviced to close this socket in that event.
196
*/
197
template<class OutputIt>
198
ZMQ_NODISCARD recv_result_t recv_multipart_n(socket_ref s,
8✔
199
                                             OutputIt out,
200
                                             size_t n,
201
                                             recv_flags flags = recv_flags::none)
202
{
203
    return detail::recv_multipart_n<true>(s, std::move(out), n, flags);
8✔
204
}
205

206
/*  Send a multipart message.
207
    
208
    The range must be a ForwardRange of zmq::message_t,
209
    zmq::const_buffer or zmq::mutable_buffer.
210
    The flags may be zmq::send_flags::sndmore if there are 
211
    more message parts to be sent after the call to this function.
212
    
213
    Returns: the number of messages sent (exactly msgs.size()) or nullopt (on EAGAIN).
214
    Throws: if send throws. Any exceptions thrown
215
    by the msgs range will be propagated and the message
216
    may have been only partially sent. It is adviced to close this socket in that event.
217
*/
218
template<class Range
219
#ifndef ZMQ_CPP11_PARTIAL
220
         ,
221
         typename = typename std::enable_if<
222
           detail::is_range<Range>::value
223
           && (std::is_same<detail::range_value_t<Range>, message_t>::value
224
               || detail::is_buffer<detail::range_value_t<Range>>::value)>::type
225
#endif
226
         >
227
send_result_t
228
send_multipart(socket_ref s, Range &&msgs, send_flags flags = send_flags::none)
11✔
229
{
230
    using std::begin;
231
    using std::end;
232
    auto it = begin(msgs);
11✔
233
    const auto end_it = end(msgs);
11✔
234
    size_t msg_count = 0;
11✔
235
    while (it != end_it) {
26✔
236
        const auto next = std::next(it);
17✔
237
        const auto msg_flags =
238
          flags | (next == end_it ? send_flags::none : send_flags::sndmore);
17✔
239
        if (!s.send(*it, msg_flags)) {
17✔
240
            // zmq ensures atomic delivery of messages
241
            assert(it == begin(msgs));
1✔
242
            return {};
1✔
243
        }
244
        ++msg_count;
15✔
245
        it = next;
15✔
246
    }
247
    return msg_count;
9✔
248
}
249

250
/* Encode a multipart message.
251

252
   The range must be a ForwardRange of zmq::message_t.  A
253
   zmq::multipart_t or STL container may be passed for encoding.
254

255
   Returns: a zmq::message_t holding the encoded multipart data.
256

257
   Throws: std::range_error is thrown if the size of any single part
258
   can not fit in an unsigned 32 bit integer.
259

260
   The encoding is compatible with that used by the CZMQ function
261
   zmsg_encode(), see https://rfc.zeromq.org/spec/50/.
262
   Each part consists of a size followed by the data.
263
   These are placed contiguously into the output message.  A part of
264
   size less than 255 bytes will have a single byte size value.
265
   Larger parts will have a five byte size value with the first byte
266
   set to 0xFF and the remaining four bytes holding the size of the
267
   part's data.
268
*/
269
template<class Range
270
#ifndef ZMQ_CPP11_PARTIAL
271
         ,
272
         typename = typename std::enable_if<
273
           detail::is_range<Range>::value
274
           && (std::is_same<detail::range_value_t<Range>, message_t>::value
275
               || detail::is_buffer<detail::range_value_t<Range>>::value)>::type
276
#endif
277
         >
278
message_t encode(const Range &parts)
9✔
279
{
280
    size_t mmsg_size = 0;
9✔
281

282
    // First pass check sizes
283
    for (const auto &part : parts) {
23✔
284
        const size_t part_size = part.size();
14✔
285
        if (part_size > (std::numeric_limits<std::uint32_t>::max)()) {
14✔
286
            // Size value must fit into uint32_t.
287
            throw std::range_error("Invalid size, message part too large");
×
288
        }
289
        const size_t count_size =
14✔
290
          part_size < (std::numeric_limits<std::uint8_t>::max)() ? 1 : 5;
14✔
291
        mmsg_size += part_size + count_size;
14✔
292
    }
293

294
    message_t encoded(mmsg_size);
9✔
295
    unsigned char *buf = encoded.data<unsigned char>();
9✔
296
    for (const auto &part : parts) {
23✔
297
        const uint32_t part_size = static_cast<uint32_t>(part.size());
14✔
298
        const unsigned char *part_data =
299
          static_cast<const unsigned char *>(part.data());
14✔
300

301
        if (part_size < (std::numeric_limits<std::uint8_t>::max)()) {
14✔
302
            // small part
303
            *buf++ = (unsigned char) part_size;
13✔
304
        } else {
305
            // big part
306
            *buf++ = (std::numeric_limits<uint8_t>::max)();
1✔
307
            detail::write_network_order(buf, part_size);
1✔
308
            buf += sizeof(part_size);
1✔
309
        }
310
        std::memcpy(buf, part_data, part_size);
14✔
311
        buf += part_size;
14✔
312
    }
313

314
    assert(static_cast<size_t>(buf - encoded.data<unsigned char>()) == mmsg_size);
9✔
315
    return encoded;
9✔
316
}
317

318
/*  Decode an encoded message to multiple parts.
319

320
    The given output iterator must be a ForwardIterator to a container
321
    holding zmq::message_t such as a zmq::multipart_t or various STL
322
    containers.
323

324
    Returns the ForwardIterator advanced once past the last decoded
325
    part.
326

327
    Throws: a std::out_of_range is thrown if the encoded part sizes
328
    lead to exceeding the message data bounds.
329

330
    The decoding assumes the message is encoded in the manner
331
    performed by zmq::encode(), see https://rfc.zeromq.org/spec/50/.
332
 */
333
template<class OutputIt> OutputIt decode(const message_t &encoded, OutputIt out)
10✔
334
{
335
    const unsigned char *source = encoded.data<unsigned char>();
10✔
336
    const unsigned char *const limit = source + encoded.size();
10✔
337

338
    while (source < limit) {
24✔
339
        size_t part_size = *source++;
16✔
340
        if (part_size == (std::numeric_limits<std::uint8_t>::max)()) {
16✔
341
            if (static_cast<size_t>(limit - source) < sizeof(uint32_t)) {
1✔
342
                throw std::out_of_range(
343
                  "Malformed encoding, overflow in reading size");
×
344
            }
345
            part_size = detail::read_u32_network_order(source);
1✔
346
            // the part size is allowed to be less than 0xFF
347
            source += sizeof(uint32_t);
1✔
348
        }
349

350
        if (static_cast<size_t>(limit - source) < part_size) {
16✔
351
            throw std::out_of_range("Malformed encoding, overflow in reading part");
2✔
352
        }
353
        *out = message_t(source, part_size);
14✔
354
        ++out;
14✔
355
        source += part_size;
14✔
356
    }
357

358
    assert(source == limit);
8✔
359
    return out;
8✔
360
}
361

362
#endif
363

364

365
#ifdef ZMQ_HAS_RVALUE_REFS
366

367
/*
368
    This class handles multipart messaging. It is the C++ equivalent of zmsg.h,
369
    which is part of CZMQ (the high-level C binding). Furthermore, it is a major
370
    improvement compared to zmsg.hpp, which is part of the examples in the ØMQ
371
    Guide. Unnecessary copying is avoided by using move semantics to efficiently
372
    add/remove parts.
373
*/
374
class multipart_t
375
{
376
  private:
377
    std::deque<message_t> m_parts;
378

379
  public:
380
    typedef std::deque<message_t>::value_type value_type;
381

382
    typedef std::deque<message_t>::iterator iterator;
383
    typedef std::deque<message_t>::const_iterator const_iterator;
384

385
    typedef std::deque<message_t>::reverse_iterator reverse_iterator;
386
    typedef std::deque<message_t>::const_reverse_iterator const_reverse_iterator;
387

388
    // Default constructor
389
    multipart_t() {}
16✔
390

391
    // Construct from socket receive
392
    multipart_t(socket_ref socket) { recv(socket); }
1✔
393

394
    // Construct from memory block
395
    multipart_t(const void *src, size_t size) { addmem(src, size); }
1✔
396

397
    // Construct from string
398
    multipart_t(const std::string &string) { addstr(string); }
1✔
399

400
    // Construct from message part
401
    multipart_t(message_t &&message) { add(std::move(message)); }
1✔
402

403
    // Move constructor
404
    multipart_t(multipart_t &&other) ZMQ_NOTHROW { m_parts = std::move(other.m_parts); }
1✔
405

406
    // Move assignment operator
407
    multipart_t &operator=(multipart_t &&other) ZMQ_NOTHROW
408
    {
409
        m_parts = std::move(other.m_parts);
410
        return *this;
411
    }
412

413
    // Destructor
414
    virtual ~multipart_t() { clear(); }
21✔
415

416
    message_t &operator[](size_t n) { return m_parts[n]; }
7✔
417

418
    const message_t &operator[](size_t n) const { return m_parts[n]; }
419

420
    message_t &at(size_t n) { return m_parts.at(n); }
421

422
    const message_t &at(size_t n) const { return m_parts.at(n); }
18✔
423

424
    iterator begin() { return m_parts.begin(); }
425

426
    const_iterator begin() const { return m_parts.begin(); }
12✔
427

428
    const_iterator cbegin() const { return m_parts.cbegin(); }
429

430
    reverse_iterator rbegin() { return m_parts.rbegin(); }
431

432
    const_reverse_iterator rbegin() const { return m_parts.rbegin(); }
433

434
    iterator end() { return m_parts.end(); }
435

436
    const_iterator end() const { return m_parts.end(); }
12✔
437

438
    const_iterator cend() const { return m_parts.cend(); }
439

440
    reverse_iterator rend() { return m_parts.rend(); }
441

442
    const_reverse_iterator rend() const { return m_parts.rend(); }
443

444
    // Delete all parts
445
    void clear() { m_parts.clear(); }
28✔
446

447
    // Get number of parts
448
    size_t size() const { return m_parts.size(); }
79✔
449

450
    // Check if number of parts is zero
451
    bool empty() const { return m_parts.empty(); }
19✔
452

453
    // Receive multipart message from socket
454
    bool recv(socket_ref socket, int flags = 0)
3✔
455
    {
456
        clear();
3✔
457
        bool more = true;
3✔
458
        while (more) {
13✔
459
            message_t message;
10✔
460
#ifdef ZMQ_CPP11
461
            if (!socket.recv(message, static_cast<recv_flags>(flags)))
10✔
462
                return false;
×
463
#else
464
            if (!socket.recv(&message, flags))
465
                return false;
466
#endif
467
            more = message.more();
10✔
468
            add(std::move(message));
10✔
469
        }
470
        return true;
3✔
471
    }
472

473
    // Send multipart message to socket
474
    bool send(socket_ref socket, int flags = 0)
3✔
475
    {
476
        flags &= ~(ZMQ_SNDMORE);
3✔
477
        bool more = size() > 0;
3✔
478
        while (more) {
13✔
479
            message_t message = pop();
10✔
480
            more = size() > 0;
10✔
481
#ifdef ZMQ_CPP11
482
            if (!socket.send(message, static_cast<send_flags>(
10✔
483
                                        (more ? ZMQ_SNDMORE : 0) | flags)))
10✔
484
                return false;
×
485
#else
486
            if (!socket.send(message, (more ? ZMQ_SNDMORE : 0) | flags))
487
                return false;
488
#endif
489
        }
490
        clear();
3✔
491
        return true;
3✔
492
    }
493

494
    // Concatenate other multipart to front
495
    void prepend(multipart_t &&other)
3✔
496
    {
497
        while (!other.empty())
3✔
498
            push(other.remove());
2✔
499
    }
1✔
500

501
    // Concatenate other multipart to back
502
    void append(multipart_t &&other)
5✔
503
    {
504
        while (!other.empty())
5✔
505
            add(other.pop());
3✔
506
    }
2✔
507

508
    // Push memory block to front
509
    void pushmem(const void *src, size_t size)
2✔
510
    {
511
        m_parts.push_front(message_t(src, size));
2✔
512
    }
2✔
513

514
    // Push memory block to back
515
    void addmem(const void *src, size_t size)
5✔
516
    {
517
        m_parts.push_back(message_t(src, size));
5✔
518
    }
5✔
519

520
    // Push string to front
521
    void pushstr(const std::string &string)
3✔
522
    {
523
        m_parts.push_front(message_t(string.data(), string.size()));
3✔
524
    }
3✔
525

526
    // Push string to back
527
    void addstr(const std::string &string)
13✔
528
    {
529
        m_parts.push_back(message_t(string.data(), string.size()));
13✔
530
    }
13✔
531

532
    // Push type (fixed-size) to front
533
    template<typename T> void pushtyp(const T &type)
1✔
534
    {
535
        static_assert(!std::is_same<T, std::string>::value,
536
                      "Use pushstr() instead of pushtyp<std::string>()");
537
        m_parts.push_front(message_t(&type, sizeof(type)));
1✔
538
    }
1✔
539

540
    // Push type (fixed-size) to back
541
    template<typename T> void addtyp(const T &type)
2✔
542
    {
543
        static_assert(!std::is_same<T, std::string>::value,
544
                      "Use addstr() instead of addtyp<std::string>()");
545
        m_parts.push_back(message_t(&type, sizeof(type)));
2✔
546
    }
2✔
547

548
    // Push message part to front
549
    void push(message_t &&message) { m_parts.push_front(std::move(message)); }
4✔
550

551
    // Push message part to back
552
    void add(message_t &&message) { m_parts.push_back(std::move(message)); }
15✔
553

554
    // Alias to allow std::back_inserter()
555
    void push_back(message_t &&message) { m_parts.push_back(std::move(message)); }
8✔
556

557
    // Pop string from front
558
    std::string popstr()
11✔
559
    {
560
        std::string string(m_parts.front().data<char>(), m_parts.front().size());
11✔
561
        m_parts.pop_front();
11✔
562
        return string;
11✔
563
    }
564

565
    // Pop type (fixed-size) from front
566
    template<typename T> T poptyp()
3✔
567
    {
568
        static_assert(!std::is_same<T, std::string>::value,
569
                      "Use popstr() instead of poptyp<std::string>()");
570
        if (sizeof(T) != m_parts.front().size())
3✔
571
            throw std::runtime_error(
572
              "Invalid type, size does not match the message size");
×
573
        T type = *m_parts.front().data<T>();
3✔
574
        m_parts.pop_front();
3✔
575
        return type;
3✔
576
    }
577

578
    // Pop message part from front
579
    message_t pop()
16✔
580
    {
581
        message_t message = std::move(m_parts.front());
16✔
582
        m_parts.pop_front();
16✔
583
        return message;
16✔
584
    }
585

586
    // Pop message part from back
587
    message_t remove()
3✔
588
    {
589
        message_t message = std::move(m_parts.back());
3✔
590
        m_parts.pop_back();
3✔
591
        return message;
3✔
592
    }
593

594
    // get message part from front
595
    const message_t &front() { return m_parts.front(); }
1✔
596

597
    // get message part from back
598
    const message_t &back() { return m_parts.back(); }
1✔
599

600
    // Get pointer to a specific message part
601
    const message_t *peek(size_t index) const { return &m_parts[index]; }
602

603
    // Get a string copy of a specific message part
604
    std::string peekstr(size_t index) const
605
    {
606
        std::string string(m_parts[index].data<char>(), m_parts[index].size());
607
        return string;
608
    }
609

610
    // Peek type (fixed-size) from front
611
    template<typename T> T peektyp(size_t index) const
612
    {
613
        static_assert(!std::is_same<T, std::string>::value,
614
                      "Use peekstr() instead of peektyp<std::string>()");
615
        if (sizeof(T) != m_parts[index].size())
616
            throw std::runtime_error(
617
              "Invalid type, size does not match the message size");
618
        T type = *m_parts[index].data<T>();
619
        return type;
620
    }
621

622
    // Create multipart from type (fixed-size)
623
    template<typename T> static multipart_t create(const T &type)
1✔
624
    {
625
        multipart_t multipart;
1✔
626
        multipart.addtyp(type);
1✔
627
        return multipart;
1✔
628
    }
629

630
    // Copy multipart
631
    multipart_t clone() const
1✔
632
    {
633
        multipart_t multipart;
1✔
634
        for (size_t i = 0; i < size(); i++)
4✔
635
            multipart.addmem(m_parts[i].data(), m_parts[i].size());
3✔
636
        return multipart;
1✔
637
    }
638

639
    // Dump content to string
640
    std::string str() const
641
    {
642
        std::stringstream ss;
643
        for (size_t i = 0; i < m_parts.size(); i++) {
644
            const unsigned char *data = m_parts[i].data<unsigned char>();
645
            size_t size = m_parts[i].size();
646

647
            // Dump the message as text or binary
648
            bool isText = true;
649
            for (size_t j = 0; j < size; j++) {
650
                if (data[j] < 32 || data[j] > 127) {
651
                    isText = false;
652
                    break;
653
                }
654
            }
655
            ss << "\n[" << std::dec << std::setw(3) << std::setfill('0') << size
656
               << "] ";
657
            if (size >= 1000) {
658
                ss << "... (too big to print)";
659
                continue;
660
            }
661
            for (size_t j = 0; j < size; j++) {
662
                if (isText)
663
                    ss << static_cast<char>(data[j]);
664
                else
665
                    ss << std::hex << std::setw(2) << std::setfill('0')
666
                       << static_cast<short>(data[j]);
667
            }
668
        }
669
        return ss.str();
670
    }
671

672
    // Check if equal to other multipart
673
    bool equal(const multipart_t *other) const ZMQ_NOTHROW
1✔
674
    {
675
        return *this == *other;
1✔
676
    }
677

678
    bool operator==(const multipart_t &other) const ZMQ_NOTHROW
7✔
679
    {
680
        if (size() != other.size())
7✔
681
            return false;
2✔
682
        for (size_t i = 0; i < size(); i++)
14✔
683
            if (at(i) != other.at(i))
9✔
684
                return false;
×
685
        return true;
5✔
686
    }
687

688
    bool operator!=(const multipart_t &other) const ZMQ_NOTHROW
2✔
689
    {
690
        return !(*this == other);
2✔
691
    }
692

693
#ifdef ZMQ_CPP11
694

695
    // Return single part message_t encoded from this multipart_t.
696
    message_t encode() const { return zmq::encode(*this); }
4✔
697

698
    // Decode encoded message into multiple parts and append to self.
699
    void decode_append(const message_t &encoded)
3✔
700
    {
701
        zmq::decode(encoded, std::back_inserter(*this));
3✔
702
    }
3✔
703

704
    // Return a new multipart_t containing the decoded message_t.
705
    static multipart_t decode(const message_t &encoded)
3✔
706
    {
707
        multipart_t tmp;
3✔
708
        zmq::decode(encoded, std::back_inserter(tmp));
3✔
709
        return tmp;
1✔
710
    }
711

712
#endif
713

714
  private:
715
    // Disable implicit copying (moving is more efficient)
716
    multipart_t(const multipart_t &other) ZMQ_DELETED_FUNCTION;
717
    void operator=(const multipart_t &other) ZMQ_DELETED_FUNCTION;
718
}; // class multipart_t
719

720
inline std::ostream &operator<<(std::ostream &os, const multipart_t &msg)
721
{
722
    return os << msg.str();
723
}
724

725
#endif // ZMQ_HAS_RVALUE_REFS
726

727
#if defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11) && defined(ZMQ_HAVE_POLLER)
728
class active_poller_t
3✔
729
{
730
  public:
731
    active_poller_t() = default;
34✔
732
    ~active_poller_t() = default;
38✔
733

734
    active_poller_t(const active_poller_t &) = delete;
735
    active_poller_t &operator=(const active_poller_t &) = delete;
736

737
    active_poller_t(active_poller_t &&src) = default;
4✔
738
    active_poller_t &operator=(active_poller_t &&src) = default;
739

740
    using handler_type = std::function<void(event_flags)>;
741

742
    void add(zmq::socket_ref socket, event_flags events, handler_type handler)
33✔
743
    {
744
        const poller_ref_t ref{socket};
33✔
745

746
        if (!handler)
33✔
747
            throw std::invalid_argument("null handler in active_poller_t::add (socket)");
1✔
748
        auto ret = handlers.emplace(
749
          ref, std::make_shared<handler_type>(std::move(handler)));
32✔
750
        if (!ret.second)
32✔
751
            throw error_t(EINVAL); // already added
1✔
752
        try {
753
            base_poller.add(socket, events, ret.first->second.get());
31✔
754
            need_rebuild = true;
29✔
755
        }
756
        catch (...) {
4✔
757
            // rollback
758
            handlers.erase(ref);
2✔
759
            throw;
2✔
760
        }
761
    }
29✔
762

763
    void add(fd_t fd, event_flags events, handler_type handler)
3✔
764
    {
765
        const poller_ref_t ref{fd};
3✔
766

767
        if (!handler)
3✔
768
            throw std::invalid_argument("null handler in active_poller_t::add (fd)");
×
769
        auto ret = handlers.emplace(
770
          ref, std::make_shared<handler_type>(std::move(handler)));
3✔
771
        if (!ret.second)
3✔
772
            throw error_t(EINVAL); // already added
×
773
        try {
774
            base_poller.add(fd, events, ret.first->second.get());
3✔
775
            need_rebuild = true;
3✔
776
        }
777
        catch (...) {
×
778
            // rollback
779
            handlers.erase(ref);
×
780
            throw;
×
781
        }
782
    }
3✔
783

784
    void remove(zmq::socket_ref socket)
15✔
785
    {
786
        base_poller.remove(socket);
15✔
787
        handlers.erase(socket);
13✔
788
        need_rebuild = true;
13✔
789
    }
13✔
790

791
    void remove(fd_t fd)
2✔
792
    {
793
        base_poller.remove(fd);
2✔
794
        handlers.erase(fd);
1✔
795
        need_rebuild = true;
1✔
796
    }
1✔
797

798
    void modify(zmq::socket_ref socket, event_flags events)
5✔
799
    {
800
        base_poller.modify(socket, events);
5✔
801
    }
2✔
802

803
    void modify(fd_t fd, event_flags events)
804
    {
805
        base_poller.modify(fd, events);
806
    }
807

808
    size_t wait(std::chrono::milliseconds timeout)
18✔
809
    {
810
        if (need_rebuild) {
18✔
811
            poller_events.resize(handlers.size());
14✔
812
            poller_handlers.clear();
14✔
813
            poller_handlers.reserve(handlers.size());
14✔
814
            for (const auto &handler : handlers) {
36✔
815
                poller_handlers.push_back(handler.second);
22✔
816
            }
817
            need_rebuild = false;
14✔
818
        }
819
        const auto count = base_poller.wait_all(poller_events, timeout);
18✔
820
        std::for_each(poller_events.begin(),
821
                      poller_events.begin() + static_cast<ptrdiff_t>(count),
30✔
822
                      [](decltype(base_poller)::event_type &event) {
24✔
823
                          assert(event.user_data != nullptr);
24✔
824
                          (*event.user_data)(event.events);
24✔
825
                      });
15✔
826
        return count;
15✔
827
    }
828

829
    ZMQ_NODISCARD bool empty() const noexcept { return handlers.empty(); }
14✔
830

831
    size_t size() const noexcept { return handlers.size(); }
28✔
832

833
  private:
834
    bool need_rebuild{false};
835

836
    poller_t<handler_type> base_poller{};
837

838
    std::unordered_map<zmq::poller_ref_t, std::shared_ptr<handler_type>> handlers{};
839

840
    std::vector<decltype(base_poller)::event_type> poller_events{};
841
    std::vector<std::shared_ptr<handler_type>> poller_handlers{};
842
};     // class active_poller_t
843
#endif //  defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11) && defined(ZMQ_HAVE_POLLER)
844

845

846
} // namespace zmq
847

848
#endif // __ZMQ_ADDON_HPP_INCLUDED__
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc