• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

BehaviorTree / BehaviorTree.CPP / 21626879657

03 Feb 2026 10:37AM UTC coverage: 69.604%. Remained the same
21626879657

push

github

facontidavide
Merge branch 'master' of github.com:BehaviorTree/BehaviorTree.CPP

5 of 10 new or added lines in 1 file covered. (50.0%)

172 existing lines in 3 files now uncovered.

3760 of 5402 relevant lines covered (69.6%)

24260.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

28.08
/src/loggers/groot2_publisher.cpp
1
#include "behaviortree_cpp/loggers/groot2_publisher.h"
2

3
#include "zmq_addon.hpp"
4

5
#include "behaviortree_cpp/loggers/groot2_protocol.h"
6
#include "behaviortree_cpp/xml_parsing.h"
7

8
#include <tuple>
9

10
namespace BT
11
{
12
//------------------------------------------------------
13
std::mutex Groot2Publisher::used_ports_mutex;
14
std::set<unsigned> Groot2Publisher::used_ports;
15

16
enum
17
{
18
  IDLE_FROM_SUCCESS = 10 + static_cast<int>(NodeStatus::SUCCESS),
19
  IDLE_FROM_FAILURE = 10 + static_cast<int>(NodeStatus::FAILURE),
20
  IDLE_FROM_RUNNING = 10 + static_cast<int>(NodeStatus::RUNNING)
21
};
22

23
struct Transition
24
{
25
  // when serializing, we will remove the initial time and serialize only
26
  // 6 bytes, instead of 8
27
  uint64_t timestamp_usec;
28
  // if you have more than 64.000 nodes, you are doing something wrong :)
29
  uint16_t node_uid;
30
  // enough bits to contain NodeStatus
31
  uint8_t status;
32

33
  uint8_t padding[5];
34
};
35

36
namespace
37
{
38
std::array<char, 16> CreateRandomUUID()
10✔
39
{
40
  std::random_device rd;
10✔
41
  std::mt19937 gen(rd());
10✔
42
  std::uniform_int_distribution<uint32_t> dist;
10✔
43
  std::array<char, 16> out{};
10✔
44
  char* bytes = out.data();
10✔
45
  for(int i = 0; i < 16; i += 4)
50✔
46
  {
47
    // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
48
    *reinterpret_cast<uint32_t*>(bytes + i) = dist(gen);
40✔
49
  }
50
  // variant must be 10xxxxxx
51
  bytes[8] &= static_cast<char>(0xBF);
10✔
52
  bytes[8] |= static_cast<char>(0x80);
10✔
53

54
  // version must be 0100xxxx
55
  bytes[6] &= 0x4F;
10✔
56
  bytes[6] |= 0x40;
10✔
57

58
  return out;
10✔
59
}
10✔
60
}  // namespace
61

62
struct Groot2Publisher::PImpl
63
{
64
  PImpl() : context(), server(context, ZMQ_REP), publisher(context, ZMQ_PUB)
20✔
65
  {
66
    server.set(zmq::sockopt::linger, 0);
10✔
67
    publisher.set(zmq::sockopt::linger, 0);
10✔
68

69
    const int timeout_rcv = 100;
10✔
70
    server.set(zmq::sockopt::rcvtimeo, timeout_rcv);
10✔
71
    publisher.set(zmq::sockopt::rcvtimeo, timeout_rcv);
10✔
72

73
    const int timeout_ms = 1000;
10✔
74
    server.set(zmq::sockopt::sndtimeo, timeout_ms);
10✔
75
    publisher.set(zmq::sockopt::sndtimeo, timeout_ms);
10✔
76
  }
10✔
77

78
  unsigned server_port = 0;
79
  std::string server_address;
80
  std::string publisher_address;
81

82
  std::string tree_xml;
83

84
  std::atomic_bool active_server = true;
85
  std::thread server_thread;
86

87
  std::mutex status_mutex;
88

89
  std::string status_buffer;
90
  // each element of this map points to a character in _p->status_buffer
91
  std::unordered_map<uint16_t, char*> status_buffermap;
92

93
  // weak reference to the tree.
94
  std::unordered_map<std::string, std::weak_ptr<BT::Tree::Subtree>> subtrees;
95
  std::unordered_map<uint16_t, std::weak_ptr<BT::TreeNode>> nodes_by_uid;
96

97
  std::mutex hooks_map_mutex;
98
  std::unordered_map<uint16_t, Monitor::Hook::Ptr> pre_hooks;
99
  std::unordered_map<uint16_t, Monitor::Hook::Ptr> post_hooks;
100

101
  std::mutex last_heartbeat_mutex;
102
  std::chrono::steady_clock::time_point last_heartbeat;
103
  std::chrono::milliseconds max_heartbeat_delay = std::chrono::milliseconds(5000);
10✔
104

105
  std::atomic_bool recording = false;
106
  std::deque<Transition> transitions_buffer;
107
  std::chrono::microseconds recording_fist_time{};
108

109
  std::thread heartbeat_thread;
110

111
  zmq::context_t context;
112
  zmq::socket_t server;
113
  zmq::socket_t publisher;
114
};
115

116
Groot2Publisher::Groot2Publisher(const BT::Tree& tree, unsigned server_port)
10✔
117
  : StatusChangeLogger(tree.rootNode()), _p(new PImpl())
10✔
118
{
119
  _p->server_port = server_port;
10✔
120

121
  {
122
    const std::unique_lock<std::mutex> lk(Groot2Publisher::used_ports_mutex);
10✔
123
    if(Groot2Publisher::used_ports.count(server_port) != 0 ||
20✔
124
       Groot2Publisher::used_ports.count(server_port + 1) != 0)
20✔
125
    {
126
      auto msg = StrCat("Another instance of Groot2Publisher is using port ",
×
127
                        std::to_string(server_port));
×
128
      throw LogicError(msg);
×
129
    }
×
130
    Groot2Publisher::used_ports.insert(server_port);
10✔
131
    Groot2Publisher::used_ports.insert(server_port + 1);
10✔
132
  }
10✔
133

134
  _p->tree_xml = WriteTreeToXML(tree, true, true);
10✔
135

136
  //-------------------------------
137
  // Prepare the status buffer
138
  size_t node_count = 0;
10✔
139
  for(const auto& subtree : tree.subtrees)
20✔
140
  {
141
    node_count += subtree->nodes.size();
10✔
142
  }
143
  _p->status_buffer.resize(3 * node_count);
10✔
144

145
  unsigned ptr_offset = 0;
10✔
146
  char* buffer_ptr = _p->status_buffer.data();
10✔
147

148
  for(const auto& subtree : tree.subtrees)
20✔
149
  {
150
    auto name =
151
        subtree->instance_name.empty() ? subtree->tree_ID : subtree->instance_name;
10✔
152
    _p->subtrees.insert({ name, subtree });
10✔
153

154
    for(const auto& node : subtree->nodes)
22✔
155
    {
156
      _p->nodes_by_uid.insert({ node->UID(), node });
12✔
157

158
      ptr_offset += Monitor::Serialize(buffer_ptr, ptr_offset, node->UID());
12✔
159
      _p->status_buffermap.insert({ node->UID(), buffer_ptr + ptr_offset });
12✔
160
      ptr_offset += Monitor::Serialize(buffer_ptr, ptr_offset, uint8_t(NodeStatus::IDLE));
12✔
161
    }
162
  }
10✔
163
  //-------------------------------
164
  _p->server_address = StrCat("tcp://*:", std::to_string(server_port));
10✔
165
  _p->publisher_address = StrCat("tcp://*:", std::to_string(server_port + 1));
10✔
166

167
  _p->server.bind(_p->server_address.c_str());
10✔
168
  _p->publisher.bind(_p->publisher_address.c_str());
10✔
169

170
  _p->server_thread = std::thread(&Groot2Publisher::serverLoop, this);
10✔
171
  _p->heartbeat_thread = std::thread(&Groot2Publisher::heartbeatLoop, this);
10✔
172
}
10✔
173

174
void Groot2Publisher::setMaxHeartbeatDelay(std::chrono::milliseconds delay)
×
175
{
176
  _p->max_heartbeat_delay = delay;
×
177
}
×
178

179
std::chrono::milliseconds Groot2Publisher::maxHeartbeatDelay() const
×
180
{
181
  return _p->max_heartbeat_delay;
×
182
}
183

184
Groot2Publisher::~Groot2Publisher()
10✔
185
{
186
  // First, signal threads to stop
187
  _p->active_server = false;
10✔
188

189
  // Shutdown the ZMQ context to unblock any recv() calls immediately.
190
  // This prevents waiting for the recv timeout (100ms) before threads can exit.
191
  // Context shutdown will cause all blocking operations to return with ETERM error.
192
  _p->context.shutdown();
10✔
193

194
  // Now join the threads - they should exit quickly
195
  if(_p->server_thread.joinable())
10✔
196
  {
197
    _p->server_thread.join();
10✔
198
  }
199

200
  if(_p->heartbeat_thread.joinable())
10✔
201
  {
202
    _p->heartbeat_thread.join();
10✔
203
  }
204

205
  // Remove hooks after threads are stopped to avoid race conditions
206
  removeAllHooks();
10✔
207

208
  flush();
10✔
209

210
  {
211
    const std::unique_lock<std::mutex> lk(Groot2Publisher::used_ports_mutex);
10✔
212
    Groot2Publisher::used_ports.erase(_p->server_port);
10✔
213
    Groot2Publisher::used_ports.erase(_p->server_port + 1);
10✔
214
  }
10✔
215
}
10✔
216

217
void Groot2Publisher::callback(Duration ts, const TreeNode& node, NodeStatus prev_status,
12✔
218
                               NodeStatus new_status)
219
{
220
  const std::unique_lock<std::mutex> lk(_p->status_mutex);
12✔
221
  auto status = static_cast<char>(new_status);
12✔
222

223
  if(new_status == NodeStatus::IDLE)
12✔
224
  {
UNCOV
225
    status = static_cast<char>(10 + static_cast<int>(prev_status));
×
226
  }
227
  *(_p->status_buffermap.at(node.UID())) = status;
12✔
228

229
  if(_p->recording)
12✔
230
  {
UNCOV
231
    Transition trans{};
×
UNCOV
232
    trans.node_uid = node.UID();
×
UNCOV
233
    trans.status = static_cast<uint8_t>(new_status);
×
234
    auto timestamp = ts - _p->recording_fist_time;
×
UNCOV
235
    trans.timestamp_usec =
×
UNCOV
236
        std::chrono::duration_cast<std::chrono::microseconds>(timestamp).count();
×
UNCOV
237
    _p->transitions_buffer.push_back(trans);
×
UNCOV
238
    while(_p->transitions_buffer.size() > 1000)
×
239
    {
240
      _p->transitions_buffer.pop_front();
×
241
    }
242
  }
243
}
12✔
244

245
void Groot2Publisher::flush()
10✔
246
{
247
  // nothing to do here...
248
}
10✔
249

250
void Groot2Publisher::serverLoop()
10✔
251
{
252
  auto const serialized_uuid = CreateRandomUUID();
10✔
253

254
  auto& socket = _p->server;
10✔
255

UNCOV
256
  auto sendErrorReply = [&socket](const std::string& msg) {
×
UNCOV
257
    zmq::multipart_t error_msg;
×
UNCOV
258
    error_msg.addstr("error");
×
UNCOV
259
    error_msg.addstr(msg);
×
UNCOV
260
    error_msg.send(socket);
×
UNCOV
261
  };
×
262

263
  // initialize _p->last_heartbeat
264
  {
265
    const std::unique_lock lk(_p->last_heartbeat_mutex);
10✔
266
    _p->last_heartbeat = std::chrono::steady_clock::now();
10✔
267
  }
10✔
268

269
  while(_p->active_server)
10✔
270
  {
271
    zmq::multipart_t requestMsg;
10✔
272
    try
273
    {
274
      if(!requestMsg.recv(socket) || requestMsg.size() == 0)
10✔
275
      {
NEW
276
        continue;
×
277
      }
278
    }
279
    catch(const zmq::error_t&)
10✔
280
    {
281
      // Context was terminated or socket error - exit the loop
282
      break;
10✔
283
    }
10✔
284

285
    // this heartbeat will help establishing if Groot is connected or not
286
    {
UNCOV
287
      const std::unique_lock lk(_p->last_heartbeat_mutex);
×
UNCOV
288
      _p->last_heartbeat = std::chrono::steady_clock::now();
×
UNCOV
289
    }
×
290

UNCOV
291
    std::string const request_str = requestMsg[0].to_string();
×
UNCOV
292
    if(request_str.size() != Monitor::RequestHeader::size())
×
293
    {
UNCOV
294
      sendErrorReply("wrong request header");
×
UNCOV
295
      continue;
×
296
    }
297

UNCOV
298
    auto request_header = Monitor::DeserializeRequestHeader(request_str);
×
299

UNCOV
300
    Monitor::ReplyHeader reply_header;
×
UNCOV
301
    reply_header.request = request_header;
×
UNCOV
302
    reply_header.request.protocol = Monitor::kProtocolID;
×
UNCOV
303
    reply_header.tree_id = serialized_uuid;
×
304

305
    zmq::multipart_t reply_msg;
×
306
    reply_msg.addstr(Monitor::SerializeHeader(reply_header));
×
307

308
    switch(request_header.type)
×
309
    {
UNCOV
310
      case Monitor::RequestType::FULLTREE: {
×
311
        reply_msg.addstr(_p->tree_xml);
×
312
      }
UNCOV
313
      break;
×
314

315
      case Monitor::RequestType::STATUS: {
×
UNCOV
316
        const std::unique_lock<std::mutex> lk(_p->status_mutex);
×
317
        reply_msg.addstr(_p->status_buffer);
×
318
      }
×
319
      break;
×
320

UNCOV
321
      case Monitor::RequestType::BLACKBOARD: {
×
322
        if(requestMsg.size() != 2)
×
323
        {
UNCOV
324
          sendErrorReply("must be 2 parts message");
×
325
          continue;
×
326
        }
327
        std::string const bb_names_str = requestMsg[1].to_string();
×
328
        auto msg = generateBlackboardsDump(bb_names_str);
×
UNCOV
329
        reply_msg.addmem(msg.data(), msg.size());
×
330
      }
×
UNCOV
331
      break;
×
332

333
      case Monitor::RequestType::HOOK_INSERT: {
×
334
        if(requestMsg.size() != 2)
×
335
        {
336
          sendErrorReply("must be 2 parts message");
×
UNCOV
337
          continue;
×
338
        }
339

UNCOV
340
        auto InsertHook = [this](nlohmann::json const& json) {
×
341
          uint16_t const node_uid = json["uid"].get<uint16_t>();
×
342
          Position const pos = static_cast<Position>(json["position"].get<int>());
×
343

344
          if(auto hook = getHook(pos, node_uid))
×
345
          {
346
            std::unique_lock<std::mutex> lk(hook->mutex);
×
347
            const bool was_interactive = (hook->mode == Monitor::Hook::Mode::BREAKPOINT);
×
348
            BT::Monitor::from_json(json, *hook);
×
349

350
            // if it WAS interactive and it is not anymore, unlock it
351
            if(was_interactive && (hook->mode == Monitor::Hook::Mode::REPLACE))
×
352
            {
353
              hook->ready = true;
×
354
              lk.unlock();
×
UNCOV
355
              hook->wakeup.notify_all();
×
356
            }
357
          }
×
358
          else  // if not found, create a new one
359
          {
UNCOV
360
            auto new_hook = std::make_shared<Monitor::Hook>();
×
361
            BT::Monitor::from_json(json, *new_hook);
×
UNCOV
362
            insertHook(new_hook);
×
363
          }
×
364
        };
×
365

UNCOV
366
        auto const received_json = nlohmann::json::parse(requestMsg[1].to_string());
×
367

368
        // the json may contain a Hook or an array of Hooks
UNCOV
369
        if(received_json.is_array())
×
370
        {
371
          for(auto const& json : received_json)
×
372
          {
UNCOV
373
            InsertHook(json);
×
374
          }
375
        }
376
        else
377
        {
378
          InsertHook(received_json);
×
379
        }
380
      }
×
381
      break;
×
382

383
      case Monitor::RequestType::BREAKPOINT_UNLOCK: {
×
UNCOV
384
        if(requestMsg.size() != 2)
×
385
        {
386
          sendErrorReply("must be 2 parts message");
×
UNCOV
387
          continue;
×
388
        }
389

390
        auto json = nlohmann::json::parse(requestMsg[1].to_string());
×
UNCOV
391
        const uint16_t node_uid = json.at("uid").get<uint16_t>();
×
UNCOV
392
        const std::string status_str = json.at("desired_status").get<std::string>();
×
UNCOV
393
        auto position = static_cast<Position>(json.at("position").get<int>());
×
UNCOV
394
        const bool remove = json.at("remove_when_done").get<bool>();
×
395

UNCOV
396
        NodeStatus desired_status = NodeStatus::SKIPPED;
×
397
        if(status_str == "SUCCESS")
×
398
        {
UNCOV
399
          desired_status = NodeStatus::SUCCESS;
×
400
        }
401
        else if(status_str == "FAILURE")
×
402
        {
403
          desired_status = NodeStatus::FAILURE;
×
404
        }
405

UNCOV
406
        if(!unlockBreakpoint(position, node_uid, desired_status, remove))
×
407
        {
408
          sendErrorReply("Node ID not found");
×
409
          continue;
×
410
        }
411
      }
×
UNCOV
412
      break;
×
413

414
      case Monitor::RequestType::REMOVE_ALL_HOOKS: {
×
UNCOV
415
        removeAllHooks();
×
416
      }
UNCOV
417
      break;
×
418

UNCOV
419
      case Monitor::RequestType::DISABLE_ALL_HOOKS: {
×
420
        enableAllHooks(false);
×
421
      }
UNCOV
422
      break;
×
423

UNCOV
424
      case Monitor::RequestType::HOOK_REMOVE: {
×
425
        if(requestMsg.size() != 2)
×
426
        {
UNCOV
427
          sendErrorReply("must be 2 parts message");
×
428
          continue;
×
429
        }
430

431
        auto json = nlohmann::json::parse(requestMsg[1].to_string());
×
432
        const uint16_t node_uid = json.at("uid").get<uint16_t>();
×
UNCOV
433
        auto position = static_cast<Position>(json.at("position").get<int>());
×
434

UNCOV
435
        if(!removeHook(position, node_uid))
×
436
        {
437
          sendErrorReply("Node ID not found");
×
UNCOV
438
          continue;
×
439
        }
UNCOV
440
      }
×
441
      break;
×
442

UNCOV
443
      case Monitor::RequestType::HOOKS_DUMP: {
×
444
        const std::unique_lock<std::mutex> lk(_p->hooks_map_mutex);
×
445
        auto json_out = nlohmann::json::array();
×
UNCOV
446
        for(const auto& [node_uid, breakpoint] : _p->pre_hooks)
×
447
        {
448
          std::ignore = node_uid;  // unused in this loop
×
449
          json_out.push_back(*breakpoint);
×
450
        }
UNCOV
451
        reply_msg.addstr(json_out.dump());
×
452
      }
×
UNCOV
453
      break;
×
454

455
      case Monitor::RequestType::TOGGLE_RECORDING: {
×
UNCOV
456
        if(requestMsg.size() != 2)
×
457
        {
458
          sendErrorReply("must be 2 parts message");
×
UNCOV
459
          continue;
×
460
        }
461

462
        auto const cmd = (requestMsg[1].to_string());
×
463
        if(cmd == "start")
×
464
        {
465
          _p->recording = true;
×
466
          // to keep the first time for callback
UNCOV
467
          _p->recording_fist_time = std::chrono::duration_cast<std::chrono::microseconds>(
×
468
              std::chrono::high_resolution_clock::now().time_since_epoch());
×
469
          // to send consistent time for client
470
          auto now = std::chrono::duration_cast<std::chrono::microseconds>(
×
UNCOV
471
              std::chrono::system_clock::now().time_since_epoch());
×
472
          reply_msg.addstr(std::to_string(now.count()));
×
473
          const std::unique_lock<std::mutex> lk(_p->status_mutex);
×
UNCOV
474
          _p->transitions_buffer.clear();
×
475
        }
×
476
        else if(cmd == "stop")
×
477
        {
UNCOV
478
          _p->recording = false;
×
479
        }
480
      }
×
UNCOV
481
      break;
×
482

UNCOV
483
      case Monitor::RequestType::GET_TRANSITIONS: {
×
484
        thread_local std::string trans_buffer;
×
485
        trans_buffer.resize(9 * _p->transitions_buffer.size());
×
486

487
        const std::unique_lock<std::mutex> lk(_p->status_mutex);
×
488
        size_t offset = 0;
×
489
        for(const auto& trans : _p->transitions_buffer)
×
490
        {
491
          std::memcpy(&trans_buffer[offset], &trans.timestamp_usec, 6);
×
492
          offset += 6;
×
493
          std::memcpy(&trans_buffer[offset], &trans.node_uid, 2);
×
UNCOV
494
          offset += 2;
×
495
          std::memcpy(&trans_buffer[offset], &trans.status, 1);
×
UNCOV
496
          offset += 1;
×
497
        }
498
        _p->transitions_buffer.clear();
×
UNCOV
499
        trans_buffer.resize(offset);
×
500
        reply_msg.addstr(trans_buffer);
×
501
      }
×
502
      break;
×
503

504
      default: {
×
505
        sendErrorReply("Request not recognized");
×
506
        continue;
×
507
      }
508
    }
×
509
    // send the reply
510
    try
511
    {
NEW
512
      reply_msg.send(socket);
×
513
    }
NEW
514
    catch(const zmq::error_t&)
×
515
    {
516
      // Context was terminated or socket error - exit the loop
NEW
517
      break;
×
NEW
518
    }
×
519
  }
10✔
520
}
10✔
521

UNCOV
522
void BT::Groot2Publisher::enableAllHooks(bool enable)
×
523
{
524
  const std::unique_lock<std::mutex> lk(_p->hooks_map_mutex);
×
525
  for(const auto& [node_uid, hook] : _p->pre_hooks)
×
526
  {
527
    std::ignore = node_uid;  // unused in this loop
×
UNCOV
528
    std::unique_lock<std::mutex> lk(hook->mutex);
×
529
    hook->enabled = enable;
×
530
    // when disabling, remember to wake up blocked ones
531
    if(!hook->enabled && hook->mode == Monitor::Hook::Mode::BREAKPOINT)
×
532
    {
533
      lk.unlock();
×
UNCOV
534
      hook->wakeup.notify_all();
×
535
    }
UNCOV
536
  }
×
537
}
×
538

539
void Groot2Publisher::heartbeatLoop()
10✔
540
{
541
  bool has_heartbeat = true;
10✔
542

543
  while(_p->active_server)
20✔
544
  {
545
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
10✔
546

547
    auto now = std::chrono::steady_clock::now();
10✔
548
    const bool prev_heartbeat = has_heartbeat;
10✔
549

550
    {
551
      const std::unique_lock lk(_p->last_heartbeat_mutex);
10✔
552
      has_heartbeat = (now - _p->last_heartbeat < _p->max_heartbeat_delay);
10✔
553
    }
10✔
554

555
    // if we loose or gain heartbeat, disable/enable all breakpoints
556
    if(has_heartbeat != prev_heartbeat)
10✔
557
    {
558
      enableAllHooks(has_heartbeat);
×
559
    }
560
  }
561
}
10✔
562

UNCOV
563
std::vector<uint8_t> Groot2Publisher::generateBlackboardsDump(const std::string& bb_list)
×
564
{
UNCOV
565
  auto json = nlohmann::json();
×
UNCOV
566
  auto const bb_names = BT::splitString(bb_list, ';');
×
UNCOV
567
  for(auto name : bb_names)
×
568
  {
UNCOV
569
    std::string const bb_name(name);
×
UNCOV
570
    auto it = _p->subtrees.find(bb_name);
×
571

UNCOV
572
    if(it != _p->subtrees.end())
×
573
    {
574
      // lock the weak pointer
UNCOV
575
      if(auto subtree = it->second.lock())
×
576
      {
UNCOV
577
        json[bb_name] = ExportBlackboardToJSON(*subtree->blackboard);
×
UNCOV
578
      }
×
579
    }
UNCOV
580
  }
×
UNCOV
581
  return nlohmann::json::to_msgpack(json);
×
UNCOV
582
}
×
583

UNCOV
584
bool Groot2Publisher::insertHook(std::shared_ptr<Monitor::Hook> hook)
×
585
{
UNCOV
586
  auto const node_uid = hook->node_uid;
×
UNCOV
587
  auto it = _p->nodes_by_uid.find(node_uid);
×
588
  if(it == _p->nodes_by_uid.end())
×
589
  {
590
    return false;
×
591
  }
592
  const TreeNode::Ptr node = it->second.lock();
×
UNCOV
593
  if(!node)
×
594
  {
595
    return false;
×
596
  }
597

UNCOV
598
  auto injectedCallback = [hook, this](TreeNode& node) -> NodeStatus {
×
UNCOV
599
    std::unique_lock<std::mutex> lk(hook->mutex);
×
600
    if(!hook->enabled)
×
601
    {
602
      return NodeStatus::SKIPPED;
×
603
    }
604

605
    // Notify that a breakpoint was reached, using the _p->publisher
606
    const Monitor::RequestHeader breakpoint_request(Monitor::BREAKPOINT_REACHED);
×
607
    zmq::multipart_t request_msg;
×
UNCOV
608
    request_msg.addstr(Monitor::SerializeHeader(breakpoint_request));
×
609
    request_msg.addstr(std::to_string(hook->node_uid));
×
UNCOV
610
    request_msg.send(_p->publisher);
×
611

612
    // wait until someone wake us up
613
    if(hook->mode == Monitor::Hook::Mode::BREAKPOINT)
×
614
    {
615
      hook->wakeup.wait(lk, [hook]() { return hook->ready || !hook->enabled; });
×
616

617
      hook->ready = false;
×
618
      // wait was unblocked but it could be the breakpoint becoming disabled.
619
      // in this case, just skip
620
      if(!hook->enabled)
×
621
      {
UNCOV
622
        return NodeStatus::SKIPPED;
×
623
      }
624
    }
625

UNCOV
626
    if(hook->remove_when_done)
×
627
    {
628
      // self-destruction at the end of this lambda function
UNCOV
629
      const std::unique_lock<std::mutex> lk(_p->hooks_map_mutex);
×
UNCOV
630
      _p->pre_hooks.erase(hook->node_uid);
×
631
      node.setPreTickFunction({});
×
632
    }
×
633
    return hook->desired_status;
×
634
  };
×
635

UNCOV
636
  const std::unique_lock<std::mutex> lk(_p->hooks_map_mutex);
×
UNCOV
637
  _p->pre_hooks[node_uid] = hook;
×
638
  node->setPreTickFunction(injectedCallback);
×
639

640
  return true;
×
UNCOV
641
}
×
642

UNCOV
643
bool Groot2Publisher::unlockBreakpoint(Position pos, uint16_t node_uid, NodeStatus result,
×
644
                                       bool remove)
645
{
UNCOV
646
  auto it = _p->nodes_by_uid.find(node_uid);
×
647
  if(it == _p->nodes_by_uid.end())
×
648
  {
UNCOV
649
    return false;
×
650
  }
651
  const TreeNode::Ptr node = it->second.lock();
×
UNCOV
652
  if(!node)
×
653
  {
654
    return false;
×
655
  }
656

657
  auto hook = getHook(pos, node_uid);
×
658
  if(!hook)
×
659
  {
UNCOV
660
    return false;
×
661
  }
662

663
  {
UNCOV
664
    std::unique_lock<std::mutex> lk(hook->mutex);
×
665
    hook->desired_status = result;
×
666
    hook->remove_when_done |= remove;
×
UNCOV
667
    if(hook->mode == Monitor::Hook::Mode::BREAKPOINT)
×
668
    {
UNCOV
669
      hook->ready = true;
×
UNCOV
670
      lk.unlock();
×
671
      hook->wakeup.notify_all();
×
672
    }
UNCOV
673
  }
×
674
  return true;
×
UNCOV
675
}
×
676

677
bool Groot2Publisher::removeHook(Position pos, uint16_t node_uid)
×
678
{
679
  auto it = _p->nodes_by_uid.find(node_uid);
×
UNCOV
680
  if(it == _p->nodes_by_uid.end())
×
681
  {
682
    return false;
×
683
  }
UNCOV
684
  const TreeNode::Ptr node = it->second.lock();
×
685
  if(!node)
×
686
  {
UNCOV
687
    return false;
×
688
  }
689

690
  auto hook = getHook(pos, node_uid);
×
691
  if(!hook)
×
692
  {
UNCOV
693
    return false;
×
694
  }
695

696
  {
UNCOV
697
    const std::unique_lock<std::mutex> lk(_p->hooks_map_mutex);
×
698
    _p->pre_hooks.erase(node_uid);
×
699
  }
×
700
  node->setPreTickFunction({});
×
701

702
  // Disable breakpoint, if it was interactive and blocked
703
  {
704
    std::unique_lock<std::mutex> lk(hook->mutex);
×
705
    if(hook->mode == Monitor::Hook::Mode::BREAKPOINT)
×
706
    {
707
      hook->enabled = false;
×
UNCOV
708
      lk.unlock();
×
709
      hook->wakeup.notify_all();
×
710
    }
UNCOV
711
  }
×
712
  return true;
×
UNCOV
713
}
×
714

715
void Groot2Publisher::removeAllHooks()
10✔
716
{
717
  std::vector<uint16_t> uids;
10✔
718

719
  for(auto pos : { Position::PRE, Position::POST })
30✔
720
  {
721
    uids.clear();
20✔
722
    auto* hooks = pos == Position::PRE ? &_p->pre_hooks : &_p->post_hooks;
20✔
723
    std::unique_lock<std::mutex> lk(_p->hooks_map_mutex);
20✔
724
    if(!hooks->empty())
20✔
725
    {
UNCOV
726
      uids.reserve(hooks->size());
×
UNCOV
727
      for(const auto& [node_uid, hook_ptr] : *hooks)
×
728
      {
729
        std::ignore = hook_ptr;  // unused in this loop
×
730
        uids.push_back(node_uid);
×
731
      }
732

733
      lk.unlock();
×
734
      for(auto node_uid : uids)
×
735
      {
736
        removeHook(pos, node_uid);
×
737
      }
738
    }
739
  }
20✔
740
}
10✔
741

UNCOV
742
Monitor::Hook::Ptr Groot2Publisher::getHook(Position pos, uint16_t node_uid)
×
743
{
UNCOV
744
  auto* hooks = pos == Position::PRE ? &_p->pre_hooks : &_p->post_hooks;
×
UNCOV
745
  const std::unique_lock<std::mutex> lk(_p->hooks_map_mutex);
×
UNCOV
746
  auto bk_it = hooks->find(node_uid);
×
UNCOV
747
  if(bk_it == hooks->end())
×
748
  {
UNCOV
749
    return {};
×
750
  }
751
  return bk_it->second;
×
752
}
×
753

754
}  // namespace BT
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc