• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

eclipse-bluechi / bluechi / 12314336845

13 Dec 2024 10:55AM UTC coverage: 80.956% (-0.06%) from 81.013%
12314336845

Pull #1012

github

web-flow
Merge bb0a7370e into c31e3e539
Pull Request #1012: Fix peer ip error log

7 of 8 new or added lines in 3 files covered. (87.5%)

4 existing lines in 3 files now uncovered.

5386 of 6653 relevant lines covered (80.96%)

1102.06 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.9
/src/controller/node.c
1
/*
2
 * Copyright Contributors to the Eclipse BlueChi project
3
 *
4
 * SPDX-License-Identifier: LGPL-2.1-or-later
5
 */
6
#include <systemd/sd-bus.h>
7

8
#include "libbluechi/bus/bus.h"
9
#include "libbluechi/bus/utils.h"
10
#include "libbluechi/common/parse-util.h"
11
#include "libbluechi/common/time-util.h"
12
#include "libbluechi/log/log.h"
13

14
#include "controller.h"
15
#include "job.h"
16
#include "metrics.h"
17
#include "monitor.h"
18
#include "node.h"
19
#include "proxy_monitor.h"
20

21
#define DEBUG_AGENT_MESSAGES 0
22

23
static void node_send_agent_subscribe_all(Node *node);
24
static void node_start_proxy_dependency_all(Node *node);
25
static int node_run_unit_lifecycle_method(sd_bus_message *m, Node *node, const char *job_type, const char *method);
26

27
static int node_method_register(sd_bus_message *m, void *userdata, sd_bus_error *ret_error);
28
static int node_disconnected(sd_bus_message *message, void *userdata, sd_bus_error *error);
29
static int node_method_list_units(sd_bus_message *m, void *userdata, UNUSED sd_bus_error *ret_error);
30
static int node_method_list_unit_files(sd_bus_message *m, void *userdata, UNUSED sd_bus_error *ret_error);
31
static int node_method_set_unit_properties(sd_bus_message *m, void *userdata, UNUSED sd_bus_error *ret_error);
32
static int node_method_start_unit(sd_bus_message *m, void *userdata, UNUSED sd_bus_error *ret_error);
33
static int node_method_stop_unit(sd_bus_message *m, void *userdata, UNUSED sd_bus_error *ret_error);
34
static int node_method_restart_unit(sd_bus_message *m, void *userdata, UNUSED sd_bus_error *ret_error);
35
static int node_method_reload_unit(sd_bus_message *m, void *userdata, UNUSED sd_bus_error *ret_error);
36
static int node_method_passthrough_to_agent(sd_bus_message *m, void *userdata, UNUSED sd_bus_error *ret_error);
37
static int node_method_set_log_level(sd_bus_message *m, void *userdata, UNUSED sd_bus_error *ret_error);
38
static int node_property_get_status(
39
                sd_bus *bus,
40
                const char *path,
41
                const char *interface,
42
                const char *property,
43
                sd_bus_message *reply,
44
                void *userdata,
45
                sd_bus_error *ret_error);
46
static int node_property_get_peer_ip(
47
                sd_bus *bus,
48
                const char *path,
49
                const char *interface,
50
                const char *property,
51
                sd_bus_message *reply,
52
                void *userdata,
53
                sd_bus_error *ret_error);
54

55
static const sd_bus_vtable internal_controller_controller_vtable[] = {
56
        SD_BUS_VTABLE_START(0),
57
        SD_BUS_METHOD("Register", "s", "", node_method_register, 0),
58
        SD_BUS_SIGNAL("Heartbeat", "", 0),
59
        SD_BUS_VTABLE_END
60
};
61

62
static const sd_bus_vtable node_vtable[] = {
63
        SD_BUS_VTABLE_START(0),
64
        SD_BUS_METHOD("ListUnits", "", UNIT_INFO_STRUCT_ARRAY_TYPESTRING, node_method_list_units, 0),
65
        SD_BUS_METHOD("ListUnitFiles", "", UNIT_FILE_INFO_STRUCT_ARRAY_TYPESTRING, node_method_list_unit_files, 0),
66
        SD_BUS_METHOD("GetUnitFileState", "s", "s", node_method_passthrough_to_agent, 0),
67
        SD_BUS_METHOD("StartUnit", "ss", "o", node_method_start_unit, 0),
68
        SD_BUS_METHOD("StopUnit", "ss", "o", node_method_stop_unit, 0),
69
        SD_BUS_METHOD("FreezeUnit", "s", "", node_method_passthrough_to_agent, 0),
70
        SD_BUS_METHOD("ThawUnit", "s", "", node_method_passthrough_to_agent, 0),
71
        SD_BUS_METHOD("RestartUnit", "ss", "o", node_method_restart_unit, 0),
72
        SD_BUS_METHOD("ReloadUnit", "ss", "o", node_method_reload_unit, 0),
73
        SD_BUS_METHOD("ResetFailed", "", "", node_method_passthrough_to_agent, 0),
74
        SD_BUS_METHOD("ResetFailedUnit", "s", "", node_method_passthrough_to_agent, 0),
75
        SD_BUS_METHOD("GetUnitProperties", "ss", "a{sv}", node_method_passthrough_to_agent, 0),
76
        SD_BUS_METHOD("GetUnitProperty", "sss", "v", node_method_passthrough_to_agent, 0),
77
        SD_BUS_METHOD("SetUnitProperties", "sba(sv)", "", node_method_set_unit_properties, 0),
78
        SD_BUS_METHOD("EnableUnitFiles", "asbb", "ba(sss)", node_method_passthrough_to_agent, 0),
79
        SD_BUS_METHOD("DisableUnitFiles", "asb", "a(sss)", node_method_passthrough_to_agent, 0),
80
        SD_BUS_METHOD("Reload", "", "", node_method_passthrough_to_agent, 0),
81
        SD_BUS_METHOD("KillUnit", "ssi", "", node_method_passthrough_to_agent, 0),
82
        SD_BUS_METHOD("SetLogLevel", "s", "", node_method_set_log_level, 0),
83
        SD_BUS_METHOD("GetDefaultTarget", "", "s", node_method_passthrough_to_agent, 0),
84
        SD_BUS_METHOD("SetDefaultTarget", "sb", "a(sss)", node_method_passthrough_to_agent, 0),
85
        SD_BUS_PROPERTY("Name", "s", NULL, offsetof(Node, name), SD_BUS_VTABLE_PROPERTY_CONST),
86
        SD_BUS_PROPERTY("Status", "s", node_property_get_status, 0, SD_BUS_VTABLE_PROPERTY_EMITS_CHANGE),
87
        SD_BUS_PROPERTY("PeerIp", "s", node_property_get_peer_ip, 0, SD_BUS_VTABLE_PROPERTY_EXPLICIT),
88
        SD_BUS_PROPERTY("LastSeenTimestamp", "t", NULL, offsetof(Node, last_seen), SD_BUS_VTABLE_PROPERTY_EXPLICIT),
89
        SD_BUS_PROPERTY("LastSeenTimestampMonotonic",
90
                        "t",
91
                        NULL,
92
                        offsetof(Node, last_seen_monotonic),
93
                        SD_BUS_VTABLE_PROPERTY_EXPLICIT),
94
        SD_BUS_VTABLE_END
95
};
96

97
struct ProxyDependency {
98
        char *unit_name;
99
        int n_deps;
100
        LIST_FIELDS(ProxyDependency, deps);
101
};
102

103
static void proxy_dependency_free(struct ProxyDependency *dep) {
11✔
104
        free_and_null(dep->unit_name);
11✔
105
        free_and_null(dep);
11✔
106
}
11✔
107

108
typedef struct UnitSubscription UnitSubscription;
109

110
struct UnitSubscription {
111
        Subscription *sub;
112
        LIST_FIELDS(UnitSubscription, subs);
113
};
114

115
typedef struct {
116
        char *unit;
117
        LIST_HEAD(UnitSubscription, subs);
118
        bool loaded;
119
        UnitActiveState active_state;
120
        char *substate;
121
} UnitSubscriptions;
122

123
typedef struct {
124
        char *unit;
125
} UnitSubscriptionsKey;
126

127
static void unit_subscriptions_clear(void *item) {
28✔
128
        UnitSubscriptions *usubs = item;
28✔
129
        free_and_null(usubs->unit);
28✔
130
        free_and_null(usubs->substate);
28✔
131
        assert(LIST_IS_EMPTY(usubs->subs));
28✔
132
}
28✔
133

134
static uint64_t unit_subscriptions_hash(const void *item, uint64_t seed0, uint64_t seed1) {
865✔
135
        const UnitSubscriptions *usubs = item;
865✔
136
        return hashmap_sip(usubs->unit, strlen(usubs->unit), seed0, seed1);
865✔
137
}
138

139
static int unit_subscriptions_compare(const void *a, const void *b, UNUSED void *udata) {
489✔
140
        const UnitSubscriptions *usubs_a = a;
489✔
141
        const UnitSubscriptions *usubs_b = b;
489✔
142

143
        return strcmp(usubs_a->unit, usubs_b->unit);
489✔
144
}
145

146

147
Node *node_new(Controller *controller, const char *name) {
414✔
148
        _cleanup_node_ Node *node = malloc0(sizeof(Node));
414✔
149
        if (node == NULL) {
414✔
150
                return NULL;
151
        }
152

153
        node->ref_count = 1;
414✔
154
        node->controller = controller;
414✔
155
        LIST_INIT(nodes, node);
414✔
156
        LIST_HEAD_INIT(node->outstanding_requests);
414✔
157
        LIST_HEAD_INIT(node->proxy_monitors);
414✔
158
        LIST_HEAD_INIT(node->proxy_dependencies);
414✔
159

160
        node->unit_subscriptions = hashmap_new(
414✔
161
                        sizeof(UnitSubscriptions),
162
                        0,
163
                        0,
164
                        0,
165
                        unit_subscriptions_hash,
166
                        unit_subscriptions_compare,
167
                        unit_subscriptions_clear,
168
                        NULL);
169
        if (node->unit_subscriptions == NULL) {
414✔
170
                return NULL;
171
        }
172

173
        node->last_seen = 0;
414✔
174
        node->last_seen_monotonic = 0;
414✔
175

176
        node->name = NULL;
414✔
177
        if (name) {
414✔
178
                node->name = strdup(name);
280✔
179
                if (node->name == NULL) {
280✔
180
                        return NULL;
181
                }
182

183
                int r = assemble_object_path_string(NODE_OBJECT_PATH_PREFIX, name, &node->object_path);
280✔
184
                if (r < 0) {
280✔
185
                        return NULL;
186
                }
187
        }
188
        node->peer_ip = NULL;
414✔
189

190
        node->is_shutdown = false;
414✔
191

192
        return steal_pointer(&node);
414✔
193
}
194

195
Node *node_ref(Node *node) {
131✔
196
        node->ref_count++;
131✔
197
        return node;
131✔
198
}
199

200
void node_unref(Node *node) {
545✔
201
        node->ref_count--;
545✔
202
        if (node->ref_count != 0) {
545✔
203
                return;
204
        }
205

206
        ProxyMonitor *proxy_monitor = NULL;
414✔
207
        ProxyMonitor *next_proxy_monitor = NULL;
414✔
208
        LIST_FOREACH_SAFE(monitors, proxy_monitor, next_proxy_monitor, node->proxy_monitors) {
414✔
209
                node_remove_proxy_monitor(node, proxy_monitor);
×
210
        }
211

212

213
        ProxyDependency *dep = NULL;
414✔
214
        ProxyDependency *next_dep = NULL;
414✔
215
        LIST_FOREACH_SAFE(deps, dep, next_dep, node->proxy_dependencies) {
414✔
216
                proxy_dependency_free(dep);
×
217
        }
218

219

220
        node_unset_agent_bus(node);
414✔
221
        sd_bus_slot_unrefp(&node->export_slot);
414✔
222

223
        hashmap_free(node->unit_subscriptions);
414✔
224

225
        free_and_null(node->name);
414✔
226
        free_and_null(node->object_path);
414✔
227
        free_and_null(node->peer_ip);
414✔
228
        free(node);
414✔
229
}
230

231
void node_shutdown(Node *node) {
10✔
232
        AgentRequest *req = NULL;
10✔
233
        AgentRequest *next_req = NULL;
10✔
234
        node->is_shutdown = true;
10✔
235
        LIST_FOREACH_SAFE(outstanding_requests, req, next_req, node->outstanding_requests) {
10✔
236
                agent_request_cancel(req);
×
237
        }
238
}
10✔
239

240
bool node_export(Node *node) {
280✔
241
        Controller *controller = node->controller;
280✔
242

243
        assert(node->name != NULL);
280✔
244

245
        int r = sd_bus_add_object_vtable(
560✔
246
                        controller->api_bus,
247
                        &node->export_slot,
248
                        node->object_path,
280✔
249
                        NODE_INTERFACE,
250
                        node_vtable,
251
                        node);
252
        if (r < 0) {
280✔
253
                bc_log_errorf("Failed to add node vtable: %s", strerror(-r));
×
254
                return false;
×
255
        }
256

257
        return true;
258
}
259

260
static int debug_messages_handler(sd_bus_message *m, void *userdata, UNUSED sd_bus_error *ret_error) {
261
        Node *node = userdata;
262
        if (node->name) {
263
                bc_log_infof("Incoming message from node '%s' (fd %d): path: %s, iface: %s, member: %s, signature: '%s'",
264
                             node->name,
265
                             sd_bus_get_fd(node->agent_bus),
266
                             sd_bus_message_get_path(m),
267
                             sd_bus_message_get_interface(m),
268
                             sd_bus_message_get_member(m),
269
                             sd_bus_message_get_signature(m, true));
270
        } else {
271
                bc_log_infof("Incoming message from node fd %d: path: %s, iface: %s, member: %s, signature: '%s'",
272
                             sd_bus_get_fd(node->agent_bus),
273
                             sd_bus_message_get_path(m),
274
                             sd_bus_message_get_interface(m),
275
                             sd_bus_message_get_member(m),
276
                             sd_bus_message_get_signature(m, true));
277
        }
278
        return 0;
279
}
280

281
bool node_has_agent(Node *node) {
1,331✔
282
        return node->agent_bus != NULL;
1,331✔
283
}
284

285
bool node_is_online(Node *node) {
417✔
286
        return node && node->name && node_has_agent(node);
417✔
287
}
288

289

290
static uint64_t subscription_hashmap_hash(const void *item, UNUSED uint64_t seed0, UNUSED uint64_t seed1) {
311✔
291
        const Subscription * const *subscriptionp = item;
311✔
292
        return (uint64_t) ((uintptr_t) *subscriptionp);
311✔
293
}
294

295
static int subscription_hashmap_compare(const void *a, const void *b, UNUSED void *udata) {
×
296
        const Subscription * const *subscription_a_p = a;
×
297
        const Subscription * const *subscription_b_p = b;
×
298
        if ((*subscription_a_p)->monitor == (*subscription_b_p)->monitor) {
×
299
                return 0;
×
300
        }
301
        return 1;
302
}
303

304
static struct hashmap *node_compute_unique_monitor_subscriptions(Node *node, const char *unit) {
315✔
305
        struct hashmap *unique_subs = hashmap_new(
315✔
306
                        sizeof(void *), 0, 0, 0, subscription_hashmap_hash, subscription_hashmap_compare, NULL, NULL);
307
        if (unique_subs == NULL) {
315✔
308
                return NULL;
315✔
309
        }
310

311
        const UnitSubscriptionsKey key = { (char *) unit };
315✔
312
        const UnitSubscriptions *usubs = hashmap_get(node->unit_subscriptions, &key);
315✔
313
        if (usubs != NULL) {
315✔
314
                UnitSubscription *usub = NULL;
273✔
315
                UnitSubscription *next_usub = NULL;
273✔
316
                LIST_FOREACH_SAFE(subs, usub, next_usub, usubs->subs) {
546✔
317
                        Subscription *sub = usub->sub;
273✔
318
                        hashmap_set(unique_subs, &sub);
273✔
319
                        if (hashmap_oom(unique_subs)) {
273✔
320
                                bc_log_error("Failed to compute vector of unique monitors, OOM");
×
321

322
                                hashmap_free(unique_subs);
×
323
                                unique_subs = NULL;
×
324
                                return NULL;
×
325
                        }
326
                }
327
        }
328

329
        /* Only check for wildcards if the unit itself is not one. */
330
        if (!streq(unit, SYMBOL_WILDCARD)) {
315✔
331
                const UnitSubscriptionsKey wildcard_key = { (char *) SYMBOL_WILDCARD };
307✔
332
                const UnitSubscriptions *usubs_wildcard = hashmap_get(node->unit_subscriptions, &wildcard_key);
307✔
333
                if (usubs_wildcard != NULL) {
307✔
334
                        UnitSubscription *usub = NULL;
38✔
335
                        UnitSubscription *next_usub = NULL;
38✔
336
                        LIST_FOREACH_SAFE(subs, usub, next_usub, usubs_wildcard->subs) {
76✔
337
                                Subscription *sub = usub->sub;
38✔
338
                                hashmap_set(unique_subs, &sub);
38✔
339
                                if (hashmap_oom(unique_subs)) {
38✔
340
                                        bc_log_error("Failed to compute vector of unique monitors, OOM");
×
341

342
                                        hashmap_free(unique_subs);
×
343
                                        unique_subs = NULL;
×
344
                                        return NULL;
×
345
                                }
346
                        }
347
                }
348
        }
349

350
        return unique_subs;
351
}
352

353

354
static int node_match_job_state_changed(sd_bus_message *m, void *userdata, UNUSED sd_bus_error *error) {
4✔
355
        Node *node = userdata;
4✔
356
        Controller *controller = node->controller;
4✔
357
        uint32_t bc_job_id = 0;
4✔
358
        const char *state = NULL;
4✔
359

360
        int r = sd_bus_message_read(m, "us", &bc_job_id, &state);
4✔
361
        if (r < 0) {
4✔
362
                bc_log_errorf("Invalid JobStateChange signal: %s", strerror(-r));
×
363
                return 0;
×
364
        }
365

366
        controller_job_state_changed(controller, bc_job_id, state);
4✔
367
        return 1;
368
}
369

370
static int node_match_unit_properties_changed(sd_bus_message *m, void *userdata, UNUSED sd_bus_error *error) {
212✔
371
        Node *node = userdata;
212✔
372
        const char *unit = NULL;
212✔
373
        const char *interface = NULL;
212✔
374

375
        int r = sd_bus_message_read(m, "ss", &unit, &interface);
212✔
376
        if (r >= 0) {
212✔
377
                r = sd_bus_message_rewind(m, false);
212✔
378
        }
379
        if (r < 0) {
212✔
380
                bc_log_error("Invalid UnitPropertiesChanged signal");
×
381
                return 0;
×
382
        }
383

384
        struct hashmap *unique_subs = node_compute_unique_monitor_subscriptions(node, unit);
212✔
385
        if (unique_subs != NULL) {
212✔
386
                Subscription **subp = NULL;
212✔
387
                size_t i = 0;
212✔
388
                while (hashmap_iter(unique_subs, &i, (void **) &subp)) {
633✔
389
                        Subscription *sub = *subp;
209✔
390
                        int r = sub->handle_unit_property_changed(sub->monitor, node->name, unit, interface, m);
209✔
391
                        if (r < 0) {
209✔
392
                                bc_log_error("Failed to emit UnitPropertyChanged signal");
×
393
                        }
394
                }
395
                hashmap_free(unique_subs);
212✔
396
        }
397

398
        return 1;
399
}
400

401

402
static int node_match_unit_new(sd_bus_message *m, void *userdata, UNUSED sd_bus_error *error) {
34✔
403
        Node *node = userdata;
34✔
404
        const char *unit = NULL;
34✔
405
        const char *reason = NULL;
34✔
406

407
        int r = sd_bus_message_read(m, "ss", &unit, &reason);
34✔
408
        if (r < 0) {
34✔
409
                bc_log_errorf("Invalid UnitNew signal: %s", strerror(-r));
×
410
                return 0;
×
411
        }
412

413
        const UnitSubscriptionsKey key = { (char *) unit };
34✔
414
        UnitSubscriptions *usubs = (UnitSubscriptions *) hashmap_get(node->unit_subscriptions, &key);
34✔
415
        if (usubs != NULL) {
34✔
416
                usubs->loaded = true;
32✔
417
                if (is_wildcard(unit)) {
32✔
418
                        usubs->active_state = UNIT_ACTIVE;
6✔
419
                        free(usubs->substate);
6✔
420
                        usubs->substate = strdup("running");
6✔
421
                }
422
        }
423

424
        struct hashmap *unique_subs = node_compute_unique_monitor_subscriptions(node, unit);
34✔
425
        if (unique_subs != NULL) {
34✔
426
                Subscription **subp = NULL;
34✔
427
                size_t i = 0;
34✔
428
                while (hashmap_iter(unique_subs, &i, (void **) &subp)) {
102✔
429
                        Subscription *sub = *subp;
34✔
430
                        int r = sub->handle_unit_new(sub->monitor, node->name, unit, reason);
34✔
431
                        if (r < 0) {
34✔
432
                                bc_log_errorf("Failed to emit UnitNew signal: %s", strerror(-r));
×
433
                        }
434
                }
435
                hashmap_free(unique_subs);
34✔
436
        }
437

438
        return 1;
439
}
440

441
static int node_match_unit_state_changed(sd_bus_message *m, void *userdata, UNUSED sd_bus_error *error) {
49✔
442
        Node *node = userdata;
49✔
443
        const char *unit = NULL;
49✔
444
        const char *active_state = NULL;
49✔
445
        const char *substate = NULL;
49✔
446
        const char *reason = NULL;
49✔
447

448
        int r = sd_bus_message_read(m, "ssss", &unit, &active_state, &substate, &reason);
49✔
449
        if (r < 0) {
49✔
450
                bc_log_errorf("Invalid UnitStateChanged signal: %s", strerror(-r));
×
451
                return 0;
×
452
        }
453

454
        const UnitSubscriptionsKey key = { (char *) unit };
49✔
455
        UnitSubscriptions *usubs = (UnitSubscriptions *) hashmap_get(node->unit_subscriptions, &key);
49✔
456
        if (usubs != NULL) {
49✔
457
                usubs->loaded = true;
45✔
458
                usubs->active_state = active_state_from_string(active_state);
45✔
459
                free(usubs->substate);
45✔
460
                usubs->substate = strdup(substate);
45✔
461
        }
462

463
        struct hashmap *unique_subs = node_compute_unique_monitor_subscriptions(node, unit);
49✔
464
        if (unique_subs != NULL) {
49✔
465
                Subscription **subp = NULL;
49✔
466
                size_t i = 0;
49✔
467
                while (hashmap_iter(unique_subs, &i, (void **) &subp)) {
147✔
468
                        Subscription *sub = *subp;
49✔
469
                        int r = sub->handle_unit_state_changed(
98✔
470
                                        sub->monitor, node->name, unit, active_state, substate, reason);
49✔
471
                        if (r < 0) {
49✔
472
                                bc_log_errorf("Failed to emit UnitStateChanged signal: %s", strerror(-r));
×
473
                        }
474
                }
475
                hashmap_free(unique_subs);
49✔
476
        }
477

478
        return 1;
479
}
480

481
static int node_match_unit_removed(sd_bus_message *m, void *userdata, UNUSED sd_bus_error *error) {
18✔
482
        Node *node = userdata;
18✔
483
        const char *unit = NULL;
18✔
484

485
        int r = sd_bus_message_read(m, "s", &unit);
18✔
486
        if (r < 0) {
18✔
487
                bc_log_errorf("Invalid UnitRemoved signal: %s", strerror(-r));
×
488
                return 0;
×
489
        }
490

491
        const UnitSubscriptionsKey key = { (char *) unit };
18✔
492
        UnitSubscriptions *usubs = (UnitSubscriptions *) hashmap_get(node->unit_subscriptions, &key);
18✔
493
        if (usubs != NULL) {
18✔
494
                usubs->loaded = false;
15✔
495
        }
496

497
        struct hashmap *unique_subs = node_compute_unique_monitor_subscriptions(node, unit);
18✔
498
        if (unique_subs != NULL) {
18✔
499
                Subscription **subp = NULL;
18✔
500
                size_t i = 0;
18✔
501
                while (hashmap_iter(unique_subs, &i, (void **) &subp)) {
53✔
502
                        Subscription *sub = *subp;
17✔
503
                        int r = sub->handle_unit_removed(sub->monitor, node->name, unit, "real");
17✔
504
                        if (r < 0) {
17✔
505
                                bc_log_errorf("Failed to emit UnitRemoved signal: %s", strerror(-r));
×
506
                        }
507
                }
508
                hashmap_free(unique_subs);
18✔
509
        }
510

511
        return 1;
512
}
513

514
static int node_match_job_done(UNUSED sd_bus_message *m, UNUSED void *userdata, UNUSED sd_bus_error *error) {
62✔
515
        Node *node = userdata;
62✔
516
        Controller *controller = node->controller;
62✔
517
        uint32_t bc_job_id = 0;
62✔
518
        const char *result = NULL;
62✔
519

520
        int r = sd_bus_message_read(m, "us", &bc_job_id, &result);
62✔
521
        if (r < 0) {
62✔
522
                bc_log_errorf("Invalid JobDone signal: %s", strerror(-r));
×
523
                return 0;
×
524
        }
525

526
        controller_finish_job(controller, bc_job_id, result);
62✔
527
        return 1;
528
}
529

530
static int node_match_heartbeat(UNUSED sd_bus_message *m, void *userdata, UNUSED sd_bus_error *error) {
33✔
531
        Node *node = userdata;
33✔
532
        uint64_t now = 0;
33✔
533
        uint64_t now_monotonic = 0;
33✔
534

535
        now = get_time_micros();
33✔
536
        if (now == USEC_INFINITY) {
33✔
537
                bc_log_error("Failed to get current time on heartbeat");
×
538
                return 0;
×
539
        }
540

541
        now_monotonic = get_time_micros_monotonic();
33✔
542
        if (now_monotonic == USEC_INFINITY) {
33✔
543
                bc_log_error("Failed to get current monotonic time on heartbeat");
×
544
                return 0;
×
545
        }
546

547
        node->last_seen = now;
33✔
548
        node->last_seen_monotonic = now_monotonic;
33✔
549
        return 1;
33✔
550
}
551

552
static ProxyMonitor *node_find_proxy_monitor(Node *node, const char *target_node_name, const char *unit_name) {
23✔
553
        ProxyMonitor *proxy_monitor = NULL;
23✔
554
        LIST_FOREACH(monitors, proxy_monitor, node->proxy_monitors) {
23✔
555
                if (streq(proxy_monitor->target_node->name, target_node_name) &&
10✔
556
                    streq(proxy_monitor->unit_name, unit_name)) {
10✔
557
                        return proxy_monitor;
558
                }
559
        }
560

561
        return NULL;
562
}
563

564
static int node_on_match_proxy_new(sd_bus_message *m, void *userdata, UNUSED sd_bus_error *error) {
13✔
565
        Node *node = userdata;
13✔
566
        Controller *controller = node->controller;
13✔
567

568
        const char *target_node_name = NULL;
13✔
569
        const char *unit_name = NULL;
13✔
570
        const char *proxy_object_path = NULL;
13✔
571

572
        int r = sd_bus_message_read(m, "sso", &target_node_name, &unit_name, &proxy_object_path);
13✔
573
        if (r < 0) {
13✔
574
                bc_log_errorf("Invalid arguments in ProxyNew signal: %s", strerror(-r));
×
575
                return r;
×
576
        }
577
        bc_log_infof("Node '%s' registered new proxy for unit '%s' on node '%s'",
13✔
578
                     node->name,
579
                     unit_name,
580
                     target_node_name);
581

582
        _cleanup_proxy_monitor_ ProxyMonitor *monitor = proxy_monitor_new(
26✔
583
                        node, target_node_name, unit_name, proxy_object_path);
584
        if (monitor == NULL) {
13✔
585
                bc_log_error("Failed to create proxy monitor, OOM");
×
586
                return -ENOMEM;
587
        }
588

589
        Node *target_node = controller_find_node(controller, target_node_name);
13✔
590
        if (target_node == NULL) {
13✔
591
                bc_log_error("Proxy requested for non-existing node");
1✔
592
                proxy_monitor_send_error(monitor, "No such node");
1✔
593
                return 0;
594
        }
595

596
        ProxyMonitor *old_monitor = node_find_proxy_monitor(node, target_node_name, unit_name);
12✔
597
        if (old_monitor != NULL) {
12✔
598
                bc_log_warnf("Proxy for '%s' (on '%s') requested, but old proxy already exists, removing it",
×
599
                             unit_name,
600
                             target_node_name);
601
                node_remove_proxy_monitor(node, old_monitor);
×
602
        }
603

604
        r = proxy_monitor_set_target_node(monitor, target_node);
12✔
605
        if (r < 0) {
12✔
606
                bc_log_errorf("Failed to add proxy dependency: %s", strerror(-r));
×
607
                proxy_monitor_send_error(monitor, "Failed to add proxy dependency");
×
608
                return 0;
609
        }
610

611
        /* We now have a valid monitor, add it to the list and enable monitor.
612
           From this point we should not send errors. */
613
        controller_add_subscription(controller, monitor->subscription);
12✔
614
        LIST_APPEND(monitors, node->proxy_monitors, proxy_monitor_ref(monitor));
12✔
615

616
        /* TODO: What about !!node_is_online(target_node) ? Tell now or wait for it to connect? */
617

618
        return 0;
619
}
620

621
void node_remove_proxy_monitor(Node *node, ProxyMonitor *monitor) {
12✔
622
        Controller *controller = node->controller;
12✔
623

624
        proxy_monitor_close(monitor);
12✔
625

626
        controller_remove_subscription(controller, monitor->subscription);
12✔
627
        LIST_REMOVE(monitors, node->proxy_monitors, monitor);
12✔
628

629
        proxy_monitor_unref(monitor);
12✔
630
}
12✔
631

632
static int node_on_match_proxy_removed(sd_bus_message *m, void *userdata, UNUSED sd_bus_error *error) {
11✔
633
        Node *node = userdata;
11✔
634
        const char *target_node_name = NULL;
11✔
635
        const char *unit_name = NULL;
11✔
636

637
        int r = sd_bus_message_read(m, "ss", &target_node_name, &unit_name);
11✔
638
        if (r < 0) {
11✔
639
                bc_log_errorf("Invalid arguments in ProxyRemoved signal: %s", strerror(-r));
×
640
                return r;
×
641
        }
642
        bc_log_infof("Node '%s' unregistered proxy for unit '%s' on node '%s'",
11✔
643
                     node->name,
644
                     unit_name,
645
                     target_node_name);
646

647
        ProxyMonitor *proxy_monitor = node_find_proxy_monitor(node, target_node_name, unit_name);
11✔
648
        if (proxy_monitor == NULL) {
11✔
649
                bc_log_error("Got ProxyRemoved for unknown monitor");
1✔
650
                return 0;
1✔
651
        }
652

653
        node_remove_proxy_monitor(node, proxy_monitor);
10✔
654
        return 0;
655
}
656

657
bool node_set_agent_bus(Node *node, sd_bus *bus) {
266✔
658
        int r = 0;
266✔
659

660
        if (node->agent_bus != NULL) {
266✔
661
                bc_log_error("Error: Trying to add two agents for a node");
×
662
                return false;
×
663
        }
664

665
        node->agent_bus = sd_bus_ref(bus);
266✔
666

667
        // If getting peer IP fails, only log and proceed as normal.
668
        _cleanup_free_ char *peer_ip = NULL;
266✔
669
        uint16_t peer_port = 0;
266✔
670
        r = get_peer_ip_address(node->agent_bus, &peer_ip, &peer_port);
266✔
671
        if (r < 0 && r != -EINVAL) {
266✔
UNCOV
672
                bc_log_errorf("Failed to get peer IP: %s", strerror(-r));
×
673
        } else {
674
                node->peer_ip = steal_pointer(&peer_ip);
266✔
675
        }
676

677
        if (node->name == NULL) {
266✔
678
                // We only connect to this on the unnamed nodes so register
679
                // can be called. We can't reconnect it during migration.
680
                r = sd_bus_add_object_vtable(
134✔
681
                                bus,
682
                                &node->internal_controller_slot,
683
                                INTERNAL_CONTROLLER_OBJECT_PATH,
684
                                INTERNAL_CONTROLLER_INTERFACE,
685
                                internal_controller_controller_vtable,
686
                                node);
687
                if (r < 0) {
134✔
688
                        node_unset_agent_bus(node);
×
689
                        bc_log_errorf("Failed to add peer bus vtable: %s", strerror(-r));
×
690
                        return false;
691
                }
692
        } else {
693
                // Only listen to signals on named nodes
694
                r = sd_bus_match_signal(
132✔
695
                                bus,
696
                                NULL,
697
                                NULL,
698
                                INTERNAL_AGENT_OBJECT_PATH,
699
                                INTERNAL_AGENT_INTERFACE,
700
                                "JobDone",
701
                                node_match_job_done,
702
                                node);
703
                if (r < 0) {
132✔
704
                        bc_log_errorf("Failed to add JobDone peer bus match: %s", strerror(-r));
×
705
                        return false;
706
                }
707

708
                r = sd_bus_match_signal(
132✔
709
                                bus,
710
                                NULL,
711
                                NULL,
712
                                INTERNAL_AGENT_OBJECT_PATH,
713
                                INTERNAL_AGENT_INTERFACE,
714
                                "JobStateChanged",
715
                                node_match_job_state_changed,
716
                                node);
717
                if (r < 0) {
132✔
718
                        bc_log_errorf("Failed to add JobStateChanged peer bus match: %s", strerror(-r));
×
719
                        return false;
720
                }
721

722
                r = sd_bus_match_signal(
132✔
723
                                bus,
724
                                NULL,
725
                                NULL,
726
                                INTERNAL_AGENT_OBJECT_PATH,
727
                                INTERNAL_AGENT_INTERFACE,
728
                                "UnitPropertiesChanged",
729
                                node_match_unit_properties_changed,
730
                                node);
731
                if (r < 0) {
132✔
732
                        bc_log_errorf("Failed to add UnitPropertiesChanged peer bus match: %s", strerror(-r));
×
733
                        return false;
734
                }
735

736
                r = sd_bus_match_signal(
132✔
737
                                bus,
738
                                NULL,
739
                                NULL,
740
                                INTERNAL_AGENT_OBJECT_PATH,
741
                                INTERNAL_AGENT_INTERFACE,
742
                                "UnitNew",
743
                                node_match_unit_new,
744
                                node);
745
                if (r < 0) {
132✔
746
                        bc_log_errorf("Failed to add UnitNew peer bus match: %s", strerror(-r));
×
747
                        return false;
748
                }
749

750
                r = sd_bus_match_signal(
132✔
751
                                bus,
752
                                NULL,
753
                                NULL,
754
                                INTERNAL_AGENT_OBJECT_PATH,
755
                                INTERNAL_AGENT_INTERFACE,
756
                                "UnitStateChanged",
757
                                node_match_unit_state_changed,
758
                                node);
759
                if (r < 0) {
132✔
760
                        bc_log_errorf("Failed to add UnitStateChanged peer bus match: %s", strerror(-r));
×
761
                        return false;
762
                }
763

764
                r = sd_bus_match_signal(
132✔
765
                                bus,
766
                                NULL,
767
                                NULL,
768
                                INTERNAL_AGENT_OBJECT_PATH,
769
                                INTERNAL_AGENT_INTERFACE,
770
                                "UnitRemoved",
771
                                node_match_unit_removed,
772
                                node);
773
                if (r < 0) {
132✔
774
                        bc_log_errorf("Failed to add UnitRemoved peer bus match: %s", strerror(-r));
×
775
                        return false;
776
                }
777

778
                r = sd_bus_match_signal(
132✔
779
                                bus,
780
                                NULL,
781
                                NULL,
782
                                INTERNAL_AGENT_OBJECT_PATH,
783
                                INTERNAL_AGENT_INTERFACE,
784
                                "ProxyNew",
785
                                node_on_match_proxy_new,
786
                                node);
787
                if (r < 0) {
132✔
788
                        bc_log_errorf("Failed to add ProxyNew peer bus match: %s", strerror(-r));
×
789
                        return false;
790
                }
791

792
                r = sd_bus_match_signal(
132✔
793
                                bus,
794
                                NULL,
795
                                NULL,
796
                                INTERNAL_AGENT_OBJECT_PATH,
797
                                INTERNAL_AGENT_INTERFACE,
798
                                "ProxyRemoved",
799
                                node_on_match_proxy_removed,
800
                                node);
801
                if (r < 0) {
132✔
802
                        bc_log_errorf("Failed to add ProxyNew peer bus match: %s", strerror(-r));
×
803
                        return false;
804
                }
805

806
                r = sd_bus_emit_properties_changed(
264✔
807
                                node->controller->api_bus, node->object_path, NODE_INTERFACE, "Status", NULL);
132✔
808
                if (r < 0) {
132✔
809
                        bc_log_errorf("Failed to emit status property changed: %s", strerror(-r));
×
810
                }
811

812
                r = sd_bus_match_signal(
132✔
813
                                bus,
814
                                NULL,
815
                                NULL,
816
                                INTERNAL_AGENT_OBJECT_PATH,
817
                                INTERNAL_AGENT_INTERFACE,
818
                                AGENT_HEARTBEAT_SIGNAL_NAME,
819
                                node_match_heartbeat,
820
                                node);
821
                if (r < 0) {
132✔
822
                        bc_log_errorf("Failed to add heartbeat signal match: %s", strerror(-r));
×
823
                        return false;
824
                }
825
        }
826

827
        r = sd_bus_match_signal_async(
266✔
828
                        bus,
829
                        &node->disconnect_slot,
830
                        "org.freedesktop.DBus.Local",
831
                        "/org/freedesktop/DBus/Local",
832
                        "org.freedesktop.DBus.Local",
833
                        "Disconnected",
834
                        node_disconnected,
835
                        NULL,
836
                        node);
837
        if (r < 0) {
266✔
838
                node_unset_agent_bus(node);
×
839
                bc_log_errorf("Failed to request match for Disconnected message: %s", strerror(-r));
×
840
                return false;
841
        }
842

843
        if (DEBUG_AGENT_MESSAGES) {
266✔
844
                sd_bus_add_filter(bus, NULL, debug_messages_handler, node);
845
        }
846

847

848
        /* Register any active subscriptions with new agent */
849
        node_send_agent_subscribe_all(node);
266✔
850

851
        /* Register any active dependencies with new agent */
852
        node_start_proxy_dependency_all(node);
266✔
853

854
        return true;
855
}
856

857
void node_unset_agent_bus(Node *node) {
668✔
858
        bool was_online = node->name && node_has_agent(node);
668✔
859

860
        sd_bus_slot_unrefp(&node->disconnect_slot);
668✔
861
        node->disconnect_slot = NULL;
668✔
862

863
        sd_bus_slot_unrefp(&node->internal_controller_slot);
668✔
864
        node->internal_controller_slot = NULL;
668✔
865

866
        sd_bus_slot_unrefp(&node->metrics_matching_slot);
668✔
867
        node->metrics_matching_slot = NULL;
668✔
868

869
        sd_bus_unrefp(&node->agent_bus);
668✔
870
        node->agent_bus = NULL;
668✔
871

872
        free_and_null(node->peer_ip);
668✔
873

874
        if (was_online) {
668✔
875
                int r = sd_bus_emit_properties_changed(
264✔
876
                                node->controller->api_bus, node->object_path, NODE_INTERFACE, "Status", NULL);
132✔
877
                if (r < 0) {
132✔
878
                        bc_log_errorf("Failed to emit status property changed: %s", strerror(-r));
×
879
                }
880
        }
881
}
668✔
882

883
/* org.eclipse.bluechi.internal.Controller.Register(in s name)) */
884
static int node_method_register(sd_bus_message *m, void *userdata, UNUSED sd_bus_error *ret_error) {
133✔
885
        Node *node = userdata;
133✔
886
        Controller *controller = node->controller;
133✔
887
        char *name = NULL;
133✔
888
        _cleanup_free_ char *description = NULL;
133✔
889

890
        /* Once we're not anonymous, don't allow register calls */
891
        if (node->name != NULL) {
133✔
892
                return sd_bus_reply_method_errorf(m, SD_BUS_ERROR_ADDRESS_IN_USE, "Can't register twice");
×
893
        }
894

895
        /* Read the parameters */
896
        int r = sd_bus_message_read(m, "s", &name);
133✔
897
        if (r < 0) {
133✔
898
                bc_log_errorf("Failed to parse parameters: %s", strerror(-r));
×
899
                return r;
900
        }
901

902
        Node *named_node = controller_find_node(controller, name);
133✔
903
        if (named_node == NULL) {
133✔
904
                return sd_bus_reply_method_errorf(m, SD_BUS_ERROR_SERVICE_UNKNOWN, "Unexpected node name");
1✔
905
        }
906

907
        if (node_has_agent(named_node)) {
132✔
908
                return sd_bus_reply_method_errorf(
×
909
                                m, SD_BUS_ERROR_ADDRESS_IN_USE, "The node is already connected");
910
        }
911

912
        named_node->last_seen = get_time_micros();
132✔
913
        named_node->last_seen_monotonic = get_time_micros_monotonic();
132✔
914

915
        r = asprintf(&description, "node-%s", name);
132✔
916
        if (r >= 0) {
132✔
917
                (void) sd_bus_set_description(node->agent_bus, description);
132✔
918
        }
919

920
        /* Migrate the agent connection to the named node */
921
        _cleanup_sd_bus_ sd_bus *agent_bus = sd_bus_ref(node->agent_bus);
265✔
922
        if (!node_set_agent_bus(named_node, agent_bus)) {
132✔
923
                return sd_bus_reply_method_errorf(
×
924
                                m, SD_BUS_ERROR_FAILED, "Internal error: Couldn't set agent bus");
925
        }
926

927
        if (controller->metrics_enabled) {
132✔
928
                node_enable_metrics(named_node);
×
929
        }
930

931
        node_unset_agent_bus(node);
132✔
932

933
        /* update number of online nodes and check the new system state */
934
        controller_check_system_status(controller, controller->number_of_nodes_online++);
132✔
935

936
        bc_log_infof("Registered managed node from fd %d as '%s'", sd_bus_get_fd(agent_bus), name);
132✔
937

938
        return sd_bus_reply_method_return(m, "");
132✔
939
}
940

941
static int node_disconnected(UNUSED sd_bus_message *message, void *userdata, UNUSED sd_bus_error *error) {
123✔
942
        Node *node = userdata;
123✔
943

944
        node_disconnect(node);
123✔
945

946
        return 0;
123✔
947
}
948

949
void node_disconnect(Node *node) {
124✔
950
        Controller *controller = node->controller;
124✔
951
        void *item = NULL;
124✔
952
        size_t i = 0;
124✔
953

954
        /* Send virtual unit remove and state change for any reported loaded units */
955
        while (hashmap_iter(node->unit_subscriptions, &i, &item)) {
125✔
956
                UnitSubscriptions *usubs = item;
1✔
957
                bool send_state_change = false;
1✔
958

959
                if (!usubs->loaded) {
1✔
960
                        continue;
×
961
                }
962

963
                if (usubs->active_state >= 0 && usubs->active_state != UNIT_INACTIVE) {
1✔
964
                        /* We previously reported an not-inactive valid state, send a virtual inactive state */
965
                        usubs->active_state = UNIT_INACTIVE;
1✔
966
                        free(usubs->substate);
1✔
967
                        usubs->substate = strdup("agent-offline");
1✔
968
                        send_state_change = true;
1✔
969
                }
970

971
                usubs->loaded = false;
1✔
972

973
                int r = 0;
1✔
974
                if (send_state_change) {
×
975
                        struct hashmap *unique_subs = node_compute_unique_monitor_subscriptions(
2✔
976
                                        node, usubs->unit);
1✔
977
                        if (unique_subs != NULL) {
1✔
978

979
                                Subscription **subp = NULL;
1✔
980
                                size_t s = 0;
1✔
981
                                while (hashmap_iter(unique_subs, &s, (void **) &subp)) {
3✔
982
                                        Subscription *sub = *subp;
1✔
983
                                        r = sub->handle_unit_state_changed(
3✔
984
                                                        sub->monitor,
985
                                                        node->name,
1✔
986
                                                        usubs->unit,
1✔
987
                                                        active_state_to_string(usubs->active_state),
988
                                                        usubs->substate,
1✔
989
                                                        "virtual");
990
                                        if (r < 0) {
1✔
991
                                                bc_log_error("Failed to emit UnitStateChanged signal");
×
992
                                        }
993
                                }
994
                                hashmap_free(unique_subs);
1✔
995
                        }
996
                }
997

998

999
                struct hashmap *unique_subs = node_compute_unique_monitor_subscriptions(node, usubs->unit);
1✔
1000
                if (unique_subs != NULL) {
1✔
1001

1002
                        Subscription **subp = NULL;
1✔
1003
                        size_t s = 0;
1✔
1004
                        while (hashmap_iter(unique_subs, &s, (void **) &subp)) {
3✔
1005
                                Subscription *sub = *subp;
1✔
1006
                                r = sub->handle_unit_removed(sub->monitor, node->name, usubs->unit, "virtual");
1✔
1007
                                if (r < 0) {
1✔
1008
                                        bc_log_error("Failed to emit UnitRemoved signal");
×
1009
                                }
1010
                        }
1011
                        hashmap_free(unique_subs);
1✔
1012
                }
1013
        }
1014

1015
        ProxyMonitor *proxy_monitor = NULL;
124✔
1016
        ProxyMonitor *next_proxy_monitor = NULL;
124✔
1017
        LIST_FOREACH_SAFE(monitors, proxy_monitor, next_proxy_monitor, node->proxy_monitors) {
126✔
1018
                node_remove_proxy_monitor(node, proxy_monitor);
2✔
1019
        }
1020

1021
        /* Remove anonymous nodes when they disconnect */
1022
        if (node->name == NULL) {
124✔
1023
                bc_log_info("Anonymous node disconnected");
2✔
1024
                controller_remove_node(controller, node);
2✔
1025
        } else {
1026
                bc_log_infof("Node '%s' disconnected", node->name);
122✔
1027
                /* Remove all jobs associated with the registered node that got disconnected. */
1028
                if (!LIST_IS_EMPTY(controller->jobs)) {
122✔
1029
                        Job *job = NULL;
1030
                        Job *next_job = NULL;
4✔
1031
                        LIST_FOREACH_SAFE(jobs, job, next_job, controller->jobs) {
4✔
1032
                                if (strcmp(job->node->name, node->name) == 0) {
2✔
1033
                                        bc_log_debugf("Removing job %d from node %s", job->id, job->node->name);
2✔
1034
                                        LIST_REMOVE(jobs, controller->jobs, job);
2✔
1035
                                        job_unref(job);
2✔
1036
                                }
1037
                        }
1038
                }
1039
                node_unset_agent_bus(node);
122✔
1040

1041
                /* update number of online nodes and check the new system state */
1042
                controller_check_system_status(controller, controller->number_of_nodes_online--);
122✔
1043
        }
1044
}
124✔
1045

1046
const char *node_get_status(Node *node) {
309✔
1047
        if (node_has_agent(node)) {
309✔
1048
                return "online";
164✔
1049
        }
1050
        return "offline";
1051
}
1052

1053
static int node_property_get_status(
283✔
1054
                UNUSED sd_bus *bus,
1055
                UNUSED const char *path,
1056
                UNUSED const char *interface,
1057
                UNUSED const char *property,
1058
                sd_bus_message *reply,
1059
                void *userdata,
1060
                UNUSED sd_bus_error *ret_error) {
1061
        Node *node = userdata;
283✔
1062
        return sd_bus_message_append(reply, "s", node_get_status(node));
283✔
1063
}
1064

1065
static int node_property_get_peer_ip(
3✔
1066
                UNUSED sd_bus *bus,
1067
                UNUSED const char *path,
1068
                UNUSED const char *interface,
1069
                UNUSED const char *property,
1070
                sd_bus_message *reply,
1071
                void *userdata,
1072
                UNUSED sd_bus_error *ret_error) {
1073
        Node *node = userdata;
3✔
1074
        return sd_bus_message_append(reply, "s", node->peer_ip);
3✔
1075
}
1076

1077
AgentRequest *agent_request_ref(AgentRequest *req) {
115✔
1078
        req->ref_count++;
115✔
1079
        return req;
115✔
1080
}
1081

1082
void agent_request_unref(AgentRequest *req) {
230✔
1083
        req->ref_count--;
230✔
1084
        if (req->ref_count != 0) {
230✔
1085
                return;
1086
        }
1087

1088
        if (req->userdata && req->free_userdata) {
115✔
1089
                req->free_userdata(req->userdata);
111✔
1090
        }
1091
        sd_bus_slot_unrefp(&req->slot);
115✔
1092
        sd_bus_message_unrefp(&req->message);
115✔
1093

1094
        Node *node = req->node;
115✔
1095
        LIST_REMOVE(outstanding_requests, node->outstanding_requests, req);
115✔
1096
        node_unref(req->node);
115✔
1097
        free(req);
115✔
1098
}
1099

1100
int node_create_request(
115✔
1101
                AgentRequest **ret,
1102
                Node *node,
1103
                const char *method,
1104
                agent_request_response_t cb,
1105
                void *userdata,
1106
                free_func_t free_userdata) {
1107
        AgentRequest *req = malloc0(sizeof(AgentRequest));
115✔
1108
        if (req == NULL) {
115✔
1109
                return -ENOMEM;
1110
        }
1111

1112
        int r = sd_bus_message_new_method_call(
115✔
1113
                        node->agent_bus,
1114
                        &req->message,
1115
                        BC_AGENT_DBUS_NAME,
1116
                        INTERNAL_AGENT_OBJECT_PATH,
1117
                        INTERNAL_AGENT_INTERFACE,
1118
                        method);
1119
        if (r < 0) {
115✔
1120
                free(req);
×
1121
                req = NULL;
×
1122
                return r;
×
1123
        }
1124

1125
        req->ref_count = 1;
115✔
1126
        req->node = node_ref(node);
115✔
1127
        LIST_INIT(outstanding_requests, req);
115✔
1128
        req->cb = cb;
115✔
1129
        req->userdata = userdata;
115✔
1130
        req->free_userdata = free_userdata;
115✔
1131
        req->is_cancelled = false;
115✔
1132
        LIST_APPEND(outstanding_requests, node->outstanding_requests, req);
115✔
1133

1134
        *ret = req;
115✔
1135
        return 0;
115✔
1136
}
1137

1138
static int agent_request_callback(sd_bus_message *m, void *userdata, UNUSED sd_bus_error *ret_error) {
115✔
1139
        _cleanup_agent_request_ AgentRequest *req = userdata;
115✔
1140
        if (req->is_cancelled) {
115✔
1141
                bc_log_debugf("Response received to a cancelled request for node %s. Dropping message.",
×
1142
                              req->node->name);
1143
                return 0;
1144
        }
1145

1146
        return req->cb(req, m, ret_error);
115✔
1147
}
1148

1149
int agent_request_cancel(AgentRequest *r) {
×
1150
        _cleanup_agent_request_ AgentRequest *req = r;
×
1151
        req->is_cancelled = true;
×
1152
        _cleanup_sd_bus_message_ sd_bus_message *m = NULL;
×
1153
        sd_bus_message_new_method_errorf(req->message, &m, SD_BUS_ERROR_FAILED, "Request cancelled");
×
1154

1155
        return req->cb(req, m, NULL);
×
1156
}
1157

1158
int agent_request_start(AgentRequest *req) {
115✔
1159
        Node *node = req->node;
115✔
1160

1161
        int r = sd_bus_call_async(
115✔
1162
                        node->agent_bus,
1163
                        &req->slot,
1164
                        req->message,
1165
                        agent_request_callback,
1166
                        req,
1167
                        BC_DEFAULT_DBUS_TIMEOUT);
1168
        if (r < 0) {
115✔
1169
                return r;
1170
        }
1171

1172
        agent_request_ref(req); /* Keep alive while operation is outstanding */
115✔
1173
        return 1;
115✔
1174
}
1175

1176
AgentRequest *node_request_list_units(
4✔
1177
                Node *node, agent_request_response_t cb, void *userdata, free_func_t free_userdata) {
1178
        if (!node_has_agent(node)) {
4✔
1179
                return NULL;
4✔
1180
        }
1181

1182
        _cleanup_agent_request_ AgentRequest *req = NULL;
4✔
1183
        node_create_request(&req, node, "ListUnits", cb, userdata, free_userdata);
4✔
1184
        if (req == NULL) {
4✔
1185
                return NULL;
1186
        }
1187

1188
        if (agent_request_start(req) < 0) {
4✔
1189
                return NULL;
1190
        }
1191

1192
        return steal_pointer(&req);
4✔
1193
}
1194

1195
AgentRequest *node_request_list_unit_files(
4✔
1196
                Node *node, agent_request_response_t cb, void *userdata, free_func_t free_userdata) {
1197
        if (!node_has_agent(node)) {
4✔
1198
                return NULL;
4✔
1199
        }
1200

1201
        _cleanup_agent_request_ AgentRequest *req = NULL;
4✔
1202
        node_create_request(&req, node, "ListUnitFiles", cb, userdata, free_userdata);
4✔
1203
        if (req == NULL) {
4✔
1204
                return NULL;
1205
        }
1206

1207
        if (agent_request_start(req) < 0) {
4✔
1208
                return NULL;
1209
        }
1210

1211
        return steal_pointer(&req);
4✔
1212
}
1213

1214
/*************************************************************************
1215
 ********** org.eclipse.bluechi.Node.ListUnits **************************
1216
 ************************************************************************/
1217

1218
static int method_list_units_callback(AgentRequest *req, sd_bus_message *m, UNUSED sd_bus_error *ret_error) {
2✔
1219
        sd_bus_message *request_message = req->userdata;
2✔
1220

1221
        if (sd_bus_message_is_method_error(m, NULL)) {
2✔
1222
                /* Forward error */
1223
                return sd_bus_reply_method_error(request_message, sd_bus_message_get_error(m));
×
1224
        }
1225

1226
        _cleanup_sd_bus_message_ sd_bus_message *reply = NULL;
2✔
1227
        int r = sd_bus_message_new_method_return(request_message, &reply);
2✔
1228
        if (r < 0) {
2✔
1229
                return sd_bus_reply_method_errorf(
×
1230
                                request_message,
1231
                                SD_BUS_ERROR_FAILED,
1232
                                "Failed to create a reply message for ListUnits request: %s",
1233
                                strerror(-r));
1234
        }
1235

1236
        r = sd_bus_message_copy(reply, m, true);
2✔
1237
        if (r < 0) {
2✔
1238
                return sd_bus_reply_method_errorf(
×
1239
                                request_message,
1240
                                SD_BUS_ERROR_FAILED,
1241
                                "Failed to copy the bus message for ListUnits request: %s",
1242
                                strerror(-r));
1243
        }
1244

1245
        return sd_bus_message_send(reply);
2✔
1246
}
1247

1248

1249
static int node_method_list_units(sd_bus_message *m, void *userdata, UNUSED sd_bus_error *ret_error) {
2✔
1250
        Node *node = userdata;
2✔
1251

1252
        if (node->is_shutdown) {
2✔
1253
                return sd_bus_reply_method_errorf(
×
1254
                                m, SD_BUS_ERROR_FAILED, "Request not allowed: node is in shutdown state");
1255
        }
1256

1257
        _cleanup_agent_request_ AgentRequest *agent_req = node_request_list_units(
2✔
1258
                        node,
1259
                        method_list_units_callback,
1260
                        sd_bus_message_ref(m),
2✔
1261
                        (free_func_t) sd_bus_message_unref);
1262
        if (agent_req == NULL) {
2✔
1263
                return sd_bus_reply_method_errorf(m, SD_BUS_ERROR_FAILED, "List units not found");
×
1264
        }
1265

1266
        return 1;
1267
}
1268

1269
/*************************************************************************
1270
 ********** org.eclipse.bluechi.Node.ListUnitFiles ***********************
1271
 ************************************************************************/
1272

1273
static int method_list_unit_files_callback(AgentRequest *req, sd_bus_message *m, UNUSED sd_bus_error *ret_error) {
2✔
1274
        sd_bus_message *request_message = req->userdata;
2✔
1275

1276
        if (sd_bus_message_is_method_error(m, NULL)) {
2✔
1277
                /* Forward error */
1278
                return sd_bus_reply_method_error(request_message, sd_bus_message_get_error(m));
×
1279
        }
1280

1281
        _cleanup_sd_bus_message_ sd_bus_message *reply = NULL;
2✔
1282
        int r = sd_bus_message_new_method_return(request_message, &reply);
2✔
1283
        if (r < 0) {
2✔
1284
                return sd_bus_reply_method_errorf(
×
1285
                                request_message,
1286
                                SD_BUS_ERROR_FAILED,
1287
                                "Failed to create a reply message for ListUnitFiles request: %s",
1288
                                strerror(-r));
1289
        }
1290

1291
        r = sd_bus_message_copy(reply, m, true);
2✔
1292
        if (r < 0) {
2✔
1293
                return sd_bus_reply_method_errorf(
×
1294
                                request_message,
1295
                                SD_BUS_ERROR_FAILED,
1296
                                "Failed to copy the bus message for ListUnitFiles request: %s",
1297
                                strerror(-r));
1298
        }
1299

1300
        return sd_bus_message_send(reply);
2✔
1301
}
1302

1303
static int node_method_list_unit_files(sd_bus_message *m, void *userdata, UNUSED sd_bus_error *ret_error) {
2✔
1304
        Node *node = userdata;
2✔
1305

1306
        if (node->is_shutdown) {
2✔
1307
                return sd_bus_reply_method_errorf(
×
1308
                                m, SD_BUS_ERROR_FAILED, "Request not allowed: node is in shutdown state");
1309
        }
1310

1311
        _cleanup_agent_request_ AgentRequest *agent_req = node_request_list_unit_files(
2✔
1312
                        node,
1313
                        method_list_unit_files_callback,
1314
                        sd_bus_message_ref(m),
2✔
1315
                        (free_func_t) sd_bus_message_unref);
1316
        if (agent_req == NULL) {
2✔
1317
                return sd_bus_reply_method_errorf(m, SD_BUS_ERROR_FAILED, "List unit files not found");
×
1318
        }
1319
        return 1;
1320
}
1321

1322
/*************************************************************************
1323
 ********** org.eclipse.bluechi.Node.SetUnitProperty ******************
1324
 ************************************************************************/
1325

1326
static int node_method_set_unit_properties_callback(
1✔
1327
                AgentRequest *req, sd_bus_message *m, UNUSED sd_bus_error *ret_error) {
1328
        sd_bus_message *request_message = req->userdata;
1✔
1329

1330
        if (sd_bus_message_is_method_error(m, NULL)) {
1✔
1331
                /* Forward error */
1332
                return sd_bus_reply_method_error(request_message, sd_bus_message_get_error(m));
×
1333
        }
1334

1335
        _cleanup_sd_bus_message_ sd_bus_message *reply = NULL;
1✔
1336
        int r = sd_bus_message_new_method_return(request_message, &reply);
1✔
1337
        if (r < 0) {
1✔
1338
                return sd_bus_reply_method_errorf(
×
1339
                                request_message,
1340
                                SD_BUS_ERROR_FAILED,
1341
                                "Failed to create a reply message: %s",
1342
                                strerror(-r));
1343
        }
1344

1345
        return sd_bus_message_send(reply);
1✔
1346
}
1347

1348
static int node_method_set_unit_properties(sd_bus_message *m, void *userdata, UNUSED sd_bus_error *ret_error) {
1✔
1349
        Node *node = userdata;
1✔
1350
        const char *unit = NULL;
1✔
1351
        int runtime = 0;
1✔
1352

1353
        if (node->is_shutdown) {
1✔
1354
                return sd_bus_reply_method_errorf(
×
1355
                                m, SD_BUS_ERROR_FAILED, "Request not allowed: node is in shutdown state");
1356
        }
1357

1358
        int r = sd_bus_message_read(m, "sb", &unit, &runtime);
1✔
1359
        if (r < 0) {
1✔
1360
                return sd_bus_reply_method_errorf(
×
1361
                                m,
1362
                                SD_BUS_ERROR_INVALID_ARGS,
1363
                                "Invalid argument for unit or runtime: %s",
1364
                                strerror(-r));
1365
        }
1366

1367
        _cleanup_agent_request_ AgentRequest *req = NULL;
1✔
1368
        r = node_create_request(
1✔
1369
                        &req,
1370
                        node,
1371
                        "SetUnitProperties",
1372
                        node_method_set_unit_properties_callback,
1373
                        sd_bus_message_ref(m),
1✔
1374
                        (free_func_t) sd_bus_message_unref);
1375
        if (req == NULL) {
1✔
1376
                sd_bus_message_unref(m);
×
1377

1378
                return sd_bus_reply_method_errorf(
×
1379
                                m, SD_BUS_ERROR_FAILED, "Failed to create an agent request: %s", strerror(-r));
1380
        }
1381

1382
        r = sd_bus_message_append(req->message, "sb", unit, runtime);
1✔
1383
        if (r < 0) {
1✔
1384
                return sd_bus_reply_method_errorf(
×
1385
                                m,
1386
                                SD_BUS_ERROR_FAILED,
1387
                                "Failed to append unit and runtime to the message: %s",
1388
                                strerror(-r));
1389
        }
1390

1391
        r = sd_bus_message_copy(req->message, m, false);
1✔
1392
        if (r < 0) {
1✔
1393
                return sd_bus_reply_method_errorf(
×
1394
                                m, SD_BUS_ERROR_FAILED, "Failed to copy a message: %s", strerror(-r));
1395
        }
1396

1397
        r = agent_request_start(req);
1✔
1398
        if (r < 0) {
1✔
1399
                return sd_bus_reply_method_errorf(
×
1400
                                m,
1401
                                SD_BUS_ERROR_FAILED,
1402
                                "Failed to call the method to start the node: %s",
1403
                                strerror(-r));
1404
        }
1405

1406
        return 1;
1407
}
1408

1409
static int node_method_passthrough_to_agent_callback(
38✔
1410
                AgentRequest *req, sd_bus_message *m, UNUSED sd_bus_error *ret_error) {
1411
        sd_bus_message *request_message = req->userdata;
38✔
1412

1413
        if (sd_bus_message_is_method_error(m, NULL)) {
38✔
1414
                return sd_bus_reply_method_error(request_message, sd_bus_message_get_error(m));
2✔
1415
        }
1416

1417
        _cleanup_sd_bus_message_ sd_bus_message *reply = NULL;
38✔
1418
        int r = sd_bus_message_new_method_return(request_message, &reply);
36✔
1419
        if (r < 0) {
36✔
1420
                return sd_bus_reply_method_errorf(
×
1421
                                request_message,
1422
                                SD_BUS_ERROR_FAILED,
1423
                                "Failed to create a reply message: %s",
1424
                                strerror(-r));
1425
        }
1426

1427
        r = sd_bus_message_copy(reply, m, true);
36✔
1428
        if (r < 0) {
36✔
1429
                return sd_bus_reply_method_errorf(
×
1430
                                request_message,
1431
                                SD_BUS_ERROR_FAILED,
1432
                                "Failed to copy a reply message: %s",
1433
                                strerror(-r));
1434
        }
1435

1436
        return sd_bus_message_send(reply);
36✔
1437
}
1438

1439
static int node_method_passthrough_to_agent(sd_bus_message *m, void *userdata, UNUSED sd_bus_error *ret_error) {
38✔
1440
        Node *node = userdata;
38✔
1441

1442
        if (node->is_shutdown) {
38✔
1443
                return sd_bus_reply_method_errorf(
×
1444
                                m, SD_BUS_ERROR_FAILED, "Request not allowed: node is in shutdown state");
1445
        }
1446

1447
        _cleanup_agent_request_ AgentRequest *req = NULL;
38✔
1448
        int r = node_create_request(
38✔
1449
                        &req,
1450
                        node,
1451
                        sd_bus_message_get_member(m),
1452
                        node_method_passthrough_to_agent_callback,
1453
                        sd_bus_message_ref(m),
38✔
1454
                        (free_func_t) sd_bus_message_unref);
1455
        if (req == NULL) {
38✔
1456
                sd_bus_message_unref(m);
×
1457

1458
                return sd_bus_reply_method_errorf(
×
1459
                                m, SD_BUS_ERROR_FAILED, "Failed to create an agent request: %s", strerror(-r));
1460
        }
1461

1462
        r = sd_bus_message_copy(req->message, m, true);
38✔
1463
        if (r < 0) {
38✔
1464
                return sd_bus_reply_method_errorf(
×
1465
                                m, SD_BUS_ERROR_FAILED, "Failed to copy a reply message: %s", strerror(-r));
1466
        }
1467

1468
        r = agent_request_start(req);
38✔
1469
        if (r < 0) {
38✔
1470
                return sd_bus_reply_method_errorf(
×
1471
                                m,
1472
                                SD_BUS_ERROR_FAILED,
1473
                                "Failed to call the method to start the node: %s",
1474
                                strerror(-r));
1475
        }
1476

1477
        return 1;
1478
}
1479

1480
/* Keep track of data related to setting up a job. For example calling
1481
   the initial agent request before we know the job is actually going to
1482
   happen. */
1483
typedef struct {
1484
        int ref_count;
1485
        sd_bus_message *request_message;
1486
        Job *job;
1487
} JobSetup;
1488

1489
static JobSetup *job_setup_ref(JobSetup *setup) {
67✔
1490
        setup->ref_count++;
67✔
1491
        return setup;
67✔
1492
}
1493

1494
static void job_setup_unref(JobSetup *setup) {
134✔
1495
        setup->ref_count--;
134✔
1496
        if (setup->ref_count != 0) {
134✔
1497
                return;
1498
        }
1499

1500
        job_unrefp(&setup->job);
67✔
1501
        sd_bus_message_unrefp(&setup->request_message);
67✔
1502
        free(setup);
67✔
1503
}
1504

1505
DEFINE_CLEANUP_FUNC(JobSetup, job_setup_unref)
67✔
1506
#define _cleanup_job_setup_ _cleanup_(job_setup_unrefp)
1507

1508
static JobSetup *job_setup_new(sd_bus_message *request_message, Node *node, const char *unit, const char *type) {
67✔
1509
        _cleanup_job_setup_ JobSetup *setup = malloc0(sizeof(JobSetup));
67✔
1510
        if (setup == NULL) {
67✔
1511
                return NULL;
1512
        }
1513

1514
        setup->ref_count = 1;
67✔
1515
        setup->request_message = sd_bus_message_ref(request_message);
67✔
1516
        setup->job = job_new(node, unit, type);
67✔
1517
        if (setup->job == NULL) {
67✔
1518
                NULL;
67✔
1519
        }
1520

1521
        return steal_pointer(&setup);
67✔
1522
}
1523

1524
static int unit_lifecycle_method_callback(AgentRequest *req, sd_bus_message *m, UNUSED sd_bus_error *ret_error) {
67✔
1525
        Node *node = req->node;
67✔
1526
        Controller *controller = node->controller;
67✔
1527
        JobSetup *setup = req->userdata;
67✔
1528

1529
        if (sd_bus_message_is_method_error(m, NULL)) {
67✔
1530
                /* Forward error */
1531
                return sd_bus_reply_method_error(setup->request_message, sd_bus_message_get_error(m));
1✔
1532
        }
1533

1534
        if (!controller_add_job(controller, setup->job)) {
66✔
1535
                return sd_bus_reply_method_errorf(m, SD_BUS_ERROR_FAILED, "Failed to add a job");
×
1536
        }
1537

1538
        return sd_bus_reply_method_return(setup->request_message, "o", setup->job->object_path);
66✔
1539
}
1540

1541
static int node_run_unit_lifecycle_method(
67✔
1542
                sd_bus_message *m, Node *node, const char *job_type, const char *method) {
1543
        const char *unit = NULL;
67✔
1544
        const char *mode = NULL;
67✔
1545
        uint64_t start_time = get_time_micros();
67✔
1546

1547
        if (node->is_shutdown) {
67✔
1548
                return sd_bus_reply_method_errorf(
×
1549
                                m, SD_BUS_ERROR_FAILED, "Request not allowed: node is in shutdown state");
1550
        }
1551

1552
        int r = sd_bus_message_read(m, "ss", &unit, &mode);
67✔
1553
        if (r < 0) {
67✔
1554
                return sd_bus_reply_method_errorf(
×
1555
                                m,
1556
                                SD_BUS_ERROR_INVALID_ARGS,
1557
                                "Invalid argument for unit or mode: %s",
1558
                                strerror(-r));
1559
        }
1560

1561
        _cleanup_job_setup_ JobSetup *setup = job_setup_new(m, node, unit, job_type);
134✔
1562
        if (setup == NULL) {
67✔
1563
                return sd_bus_reply_method_errorf(m, SD_BUS_ERROR_NO_MEMORY, "Out of memory");
×
1564
        }
1565

1566
        if (node->controller->metrics_enabled) {
67✔
1567
                setup->job->job_start_micros = start_time;
5✔
1568
        }
1569

1570
        _cleanup_agent_request_ AgentRequest *req = NULL;
67✔
1571
        r = node_create_request(
67✔
1572
                        &req,
1573
                        node,
1574
                        method,
1575
                        unit_lifecycle_method_callback,
1576
                        job_setup_ref(setup),
67✔
1577
                        (free_func_t) job_setup_unref);
1578
        if (req == NULL) {
67✔
1579
                job_setup_unref(setup);
×
1580

1581
                return sd_bus_reply_method_errorf(
×
1582
                                m, SD_BUS_ERROR_FAILED, "Failed to create an agent request: %s", strerror(-r));
1583
        }
1584

1585
        r = sd_bus_message_append(req->message, "ssu", unit, mode, setup->job->id);
67✔
1586
        if (r < 0) {
67✔
1587
                return sd_bus_reply_method_errorf(
×
1588
                                m,
1589
                                SD_BUS_ERROR_FAILED,
1590
                                "Failed to append unit, mode, and job ID to the message: %s",
1591
                                strerror(-r));
1592
        }
1593

1594
        r = agent_request_start(req);
67✔
1595
        if (r < 0) {
67✔
1596
                return sd_bus_reply_method_errorf(
×
1597
                                m,
1598
                                SD_BUS_ERROR_FAILED,
1599
                                "Failed to call the method to start the node: %s",
1600
                                strerror(-r));
1601
        }
1602

1603
        return 1;
1604
}
1605

1606

1607
/*************************************************************************
1608
 ********** org.eclipse.bluechi.Node.StartUnit **************************
1609
 ************************************************************************/
1610

1611
static int node_method_start_unit(sd_bus_message *m, void *userdata, UNUSED sd_bus_error *ret_error) {
48✔
1612
        return node_run_unit_lifecycle_method(m, (Node *) userdata, "start", "StartUnit");
48✔
1613
}
1614

1615
/*************************************************************************
1616
 ********** org.eclipse.bluechi.Node.StopUnit ***************************
1617
 ************************************************************************/
1618

1619

1620
static int node_method_stop_unit(sd_bus_message *m, void *userdata, UNUSED sd_bus_error *ret_error) {
16✔
1621
        return node_run_unit_lifecycle_method(m, (Node *) userdata, "stop", "StopUnit");
16✔
1622
}
1623

1624
/*************************************************************************
1625
 ********** org.eclipse.bluechi.Node.RestartUnit ************************
1626
 ************************************************************************/
1627

1628
static int node_method_restart_unit(sd_bus_message *m, void *userdata, UNUSED sd_bus_error *ret_error) {
2✔
1629
        return node_run_unit_lifecycle_method(m, (Node *) userdata, "restart", "RestartUnit");
2✔
1630
}
1631

1632
/*************************************************************************
1633
 ********** org.eclipse.bluechi.Node.ReloadUnit **************************
1634
 ************************************************************************/
1635

1636
static int node_method_reload_unit(sd_bus_message *m, void *userdata, UNUSED sd_bus_error *ret_error) {
1✔
1637
        return node_run_unit_lifecycle_method(m, (Node *) userdata, "reload", "ReloadUnit");
1✔
1638
}
1639

1640
/*************************************************************************
1641
 ********** org.eclipse.bluechi.Node.SetLogLevel *******************
1642
 ************************************************************************/
1643

1644
static int node_method_set_log_level(sd_bus_message *m, UNUSED void *userdata, UNUSED sd_bus_error *ret_error) {
3✔
1645
        const char *level = NULL;
3✔
1646
        Node *node = (Node *) userdata;
3✔
1647
        sd_bus_error error = SD_BUS_ERROR_NULL;
3✔
1648
        _cleanup_sd_bus_message_ sd_bus_message *sub_m = NULL;
3✔
1649

1650
        int r = sd_bus_message_read(m, "s", &level);
3✔
1651
        if (r < 0) {
3✔
1652
                return sd_bus_reply_method_errorf(
×
1653
                                m,
1654
                                SD_BUS_ERROR_INVALID_ARGS,
1655
                                "Invalid argument for the log-level: %s",
1656
                                strerror(-r));
1657
        }
1658
        LogLevel loglevel = string_to_log_level(level);
3✔
1659
        if (loglevel == LOG_LEVEL_INVALID) {
3✔
1660
                r = sd_bus_reply_method_return(m, "");
1✔
1661
                if (r < 0) {
1✔
1662
                        return sd_bus_reply_method_errorf(
×
1663
                                        m,
1664
                                        SD_BUS_ERROR_INVALID_ARGS,
1665
                                        "Invalid argument for the log level invalid");
1666
                }
1667
        }
1668
        r = sd_bus_call_method(
3✔
1669
                        node->agent_bus,
1670
                        BC_AGENT_DBUS_NAME,
1671
                        INTERNAL_AGENT_OBJECT_PATH,
1672
                        INTERNAL_AGENT_INTERFACE,
1673
                        "SetLogLevel",
1674
                        &error,
1675
                        &sub_m,
1676
                        "s",
1677
                        level);
1678
        if (r < 0) {
3✔
1679
                bc_log_errorf("Failed to set log level call: %s", error.message);
1✔
1680
                sd_bus_error_free(&error);
1✔
1681
                return sd_bus_reply_method_errorf(
1✔
1682
                                m,
1683
                                SD_BUS_ERROR_FAILED,
1684
                                "Failed to call a method to set the log level: %s",
1685
                                strerror(-r));
1686
        }
1687
        return sd_bus_reply_method_return(m, "");
2✔
1688
}
1689

1690
static int send_agent_simple_message(Node *node, const char *method, const char *arg) {
84✔
1691
        _cleanup_sd_bus_message_ sd_bus_message *m = NULL;
84✔
1692
        int r = sd_bus_message_new_method_call(
84✔
1693
                        node->agent_bus,
1694
                        &m,
1695
                        BC_AGENT_DBUS_NAME,
1696
                        INTERNAL_AGENT_OBJECT_PATH,
1697
                        INTERNAL_AGENT_INTERFACE,
1698
                        method);
1699
        if (r < 0) {
84✔
1700
                return r;
1701
        }
1702

1703
        if (arg != NULL) {
84✔
1704
                r = sd_bus_message_append(m, "s", arg);
80✔
1705
                if (r < 0) {
80✔
1706
                        return r;
1707
                }
1708
        }
1709

1710
        return sd_bus_send(node->agent_bus, m, NULL);
84✔
1711
}
1712

1713
static void node_send_agent_subscribe(Node *node, const char *unit) {
29✔
1714
        if (!node_has_agent(node)) {
29✔
1715
                return;
1716
        }
1717

1718
        int r = send_agent_simple_message(node, "Subscribe", unit);
29✔
1719
        if (r < 0) {
29✔
1720
                bc_log_error("Failed to subscribe w/ agent");
×
1721
        }
1722
}
1723

1724

1725
static void node_send_agent_unsubscribe(Node *node, const char *unit) {
28✔
1726
        if (!node_has_agent(node)) {
28✔
1727
                return;
1728
        }
1729

1730
        int r = send_agent_simple_message(node, "Unsubscribe", unit);
28✔
1731
        if (r < 0) {
28✔
1732
                bc_log_error("Failed to unsubscribe w/ agent");
×
1733
        }
1734
}
1735

1736
/* Resubscribe to all subscriptions */
1737
static void node_send_agent_subscribe_all(Node *node) {
266✔
1738
        void *item = NULL;
266✔
1739
        size_t i = 0;
266✔
1740

1741
        while (hashmap_iter(node->unit_subscriptions, &i, &item)) {
267✔
1742
                UnitSubscriptions *usubs = item;
1✔
1743
                node_send_agent_subscribe(node, usubs->unit);
1✔
1744
        }
1745
}
266✔
1746

1747
void node_subscribe(Node *node, Subscription *sub) {
28✔
1748
        SubscribedUnit *sub_unit = NULL;
28✔
1749
        SubscribedUnit *next_sub_unit = NULL;
28✔
1750
        LIST_FOREACH_SAFE(units, sub_unit, next_sub_unit, sub->subscribed_units) {
57✔
1751
                const UnitSubscriptionsKey key = { sub_unit->name };
29✔
1752
                UnitSubscriptions *usubs = NULL;
29✔
1753

1754
                _cleanup_free_ UnitSubscription *usub = malloc0(sizeof(UnitSubscription));
29✔
1755
                if (usub == NULL) {
29✔
1756
                        bc_log_error("Failed to subscribe to unit, OOM");
×
1757
                        return;
1758
                }
1759
                usub->sub = sub;
29✔
1760

1761
                usubs = (UnitSubscriptions *) hashmap_get(node->unit_subscriptions, &key);
29✔
1762
                if (usubs == NULL) {
29✔
1763
                        UnitSubscriptions v = { NULL, NULL, false, _UNIT_ACTIVE_STATE_INVALID, NULL };
28✔
1764
                        v.unit = strdup(key.unit);
28✔
1765
                        if (v.unit == NULL) {
28✔
1766
                                bc_log_error("Failed to subscribe to unit, OOM");
×
1767
                                return;
×
1768
                        }
1769

1770
                        usubs = (UnitSubscriptions *) hashmap_set(node->unit_subscriptions, &v);
28✔
1771
                        if (usubs == NULL && hashmap_oom(node->unit_subscriptions)) {
28✔
1772
                                free(v.unit);
×
1773
                                bc_log_error("Failed to subscribe to unit, OOM");
×
1774
                                return;
1775
                        }
1776

1777
                        /* First sub to this unit, pass to agent */
1778
                        node_send_agent_subscribe(node, sub_unit->name);
28✔
1779

1780
                        usubs = (UnitSubscriptions *) hashmap_get(node->unit_subscriptions, &key);
28✔
1781
                }
1782

1783
                LIST_APPEND(subs, usubs->subs, steal_pointer(&usub));
29✔
1784

1785
                /* We know this is loaded, so we won't get notified from
1786
                   the agent, instead send a virtual event here. */
1787
                if (usubs->loaded) {
29✔
1788
                        int r = sub->handle_unit_new(sub->monitor, node->name, sub_unit->name, "virtual");
1✔
1789
                        if (r < 0) {
1✔
1790
                                bc_log_error("Failed to emit UnitNew signal");
×
1791
                        }
1792

1793
                        if (usubs->active_state >= 0) {
1✔
1794
                                r = sub->handle_unit_state_changed(
1✔
1795
                                                sub->monitor,
1796
                                                node->name,
1✔
1797
                                                sub_unit->name,
1✔
1798
                                                active_state_to_string(usubs->active_state),
1799
                                                usubs->substate ? usubs->substate : "invalid",
1✔
1800
                                                "virtual");
1801
                                if (r < 0) {
1✔
1802
                                        bc_log_error("Failed to emit UnitNew signal");
×
1803
                                }
1804
                        }
1805
                }
1806
        }
1807
}
1808

1809
void node_unsubscribe(Node *node, Subscription *sub) {
28✔
1810
        SubscribedUnit *sub_unit = NULL;
28✔
1811
        SubscribedUnit *next_sub_unit = NULL;
28✔
1812
        LIST_FOREACH_SAFE(units, sub_unit, next_sub_unit, sub->subscribed_units) {
57✔
1813
                UnitSubscriptionsKey key = { sub_unit->name };
29✔
1814
                UnitSubscriptions *usubs = NULL;
29✔
1815
                UnitSubscription *usub = NULL;
29✔
1816
                UnitSubscription *found = NULL;
29✔
1817
                UnitSubscriptions *deleted = NULL;
29✔
1818

1819
                /* NOTE: If there are errors during subscribe we may still
1820
                   call unsubscribe, so this must silently handle the
1821
                   case of too many unsubscribes. */
1822

1823
                usubs = (UnitSubscriptions *) hashmap_get(node->unit_subscriptions, &key);
29✔
1824
                if (usubs == NULL) {
29✔
1825
                        continue;
×
1826
                }
1827

1828
                LIST_FOREACH(subs, usub, usubs->subs) {
29✔
1829
                        if (usub->sub == sub) {
29✔
1830
                                found = usub;
1831
                                break;
1832
                        }
1833
                }
1834

1835
                if (found == NULL) {
29✔
1836
                        continue;
×
1837
                }
1838

1839
                LIST_REMOVE(subs, usubs->subs, found);
29✔
1840
                free_and_null(found);
29✔
1841

1842
                if (LIST_IS_EMPTY(usubs->subs)) {
29✔
1843
                        /* Last subscription for this unit, tell agent */
1844
                        node_send_agent_unsubscribe(node, sub_unit->name);
28✔
1845
                        deleted = (UnitSubscriptions *) hashmap_delete(node->unit_subscriptions, &key);
28✔
1846
                        if (deleted) {
28✔
1847
                                unit_subscriptions_clear(deleted);
28✔
1848
                        }
1849
                }
1850
        }
1851
}
28✔
1852

1853
static void node_start_proxy_dependency(Node *node, ProxyDependency *dep) {
12✔
1854
        if (!node_has_agent(node)) {
12✔
1855
                return;
1856
        }
1857

1858
        bc_log_infof("Starting dependency %s on node %s", dep->unit_name, node->name);
12✔
1859

1860
        int r = send_agent_simple_message(node, "StartDep", dep->unit_name);
12✔
1861
        if (r < 0) {
12✔
1862
                bc_log_error("Failed to send StartDep to agent");
×
1863
        }
1864
}
1865

1866
static void node_start_proxy_dependency_all(Node *node) {
266✔
1867
        ProxyDependency *dep = NULL;
266✔
1868
        LIST_FOREACH(deps, dep, node->proxy_dependencies) {
266✔
1869
                node_start_proxy_dependency(node, dep);
×
1870
        }
1871
}
266✔
1872

1873
static void node_stop_proxy_dependency(Node *node, ProxyDependency *dep) {
11✔
1874
        if (!node_has_agent(node)) {
11✔
1875
                return;
1876
        }
1877

1878
        bc_log_infof("Stopping dependency %s on node %s", dep->unit_name, node->name);
11✔
1879

1880
        int r = send_agent_simple_message(node, "StopDep", dep->unit_name);
11✔
1881
        if (r < 0) {
11✔
1882
                bc_log_error("Failed to send StopDep to agent");
×
1883
        }
1884
}
1885

1886
static struct ProxyDependency *node_find_proxy_dependency(Node *node, const char *unit_name) {
24✔
1887
        ProxyDependency *dep = NULL;
24✔
1888
        LIST_FOREACH(deps, dep, node->proxy_dependencies) {
24✔
1889
                if (streq(dep->unit_name, unit_name)) {
13✔
1890
                        return dep;
1891
                }
1892
        }
1893

1894
        return NULL;
1895
}
1896

1897
int node_add_proxy_dependency(Node *node, const char *unit_name) {
12✔
1898
        ProxyDependency *dep = NULL;
12✔
1899

1900
        dep = node_find_proxy_dependency(node, unit_name);
12✔
1901
        if (dep) {
12✔
1902
                dep->n_deps++;
1✔
1903
                /* Always start, if the dep service was stopped by
1904
                   the target service stopping */
1905
                node_start_proxy_dependency(node, dep);
1✔
1906
                return 0;
1✔
1907
        }
1908

1909
        _cleanup_free_ char *unit_name_copy = strdup(unit_name);
23✔
1910
        if (unit_name_copy == NULL) {
11✔
1911
                return -ENOMEM;
1912
        }
1913

1914
        dep = malloc0(sizeof(ProxyDependency));
11✔
1915
        if (dep == NULL) {
11✔
1916
                return -ENOMEM;
1917
        }
1918

1919
        dep->unit_name = steal_pointer(&unit_name_copy);
11✔
1920
        dep->n_deps = 1;
11✔
1921
        LIST_APPEND(deps, node->proxy_dependencies, dep);
11✔
1922

1923
        node_start_proxy_dependency(node, dep);
11✔
1924

1925
        return 0;
1926
}
1927

1928
int node_remove_proxy_dependency(Node *node, const char *unit_name) {
12✔
1929
        ProxyDependency *dep = NULL;
12✔
1930
        dep = node_find_proxy_dependency(node, unit_name);
12✔
1931
        if (!dep) {
12✔
1932
                return -ENOENT;
1933
        }
1934

1935
        dep->n_deps--;
12✔
1936

1937
        if (dep->n_deps == 0) {
12✔
1938
                /* Only stop on the last dep */
1939
                node_stop_proxy_dependency(node, dep);
11✔
1940

1941
                LIST_REMOVE(deps, node->proxy_dependencies, dep);
11✔
1942
                proxy_dependency_free(dep);
11✔
1943
        }
1944

1945
        return 0;
1946
}
1947

1948
int node_method_get_unit_uint64_property_sync(Node *node, char *unit, char *property, uint64_t *value) {
6✔
1949
        int r = 0;
6✔
1950
        _cleanup_sd_bus_message_ sd_bus_message *message = NULL;
6✔
1951
        sd_bus_error error = SD_BUS_ERROR_NULL;
6✔
1952
        r = sd_bus_call_method(
6✔
1953
                        node->agent_bus,
1954
                        BC_AGENT_DBUS_NAME,
1955
                        INTERNAL_AGENT_OBJECT_PATH,
1956
                        INTERNAL_AGENT_INTERFACE,
1957
                        "GetUnitProperty",
1958
                        &error,
1959
                        &message,
1960
                        "sss",
1961
                        unit,
1962
                        "org.freedesktop.systemd1.Unit",
1963
                        property);
1964
        if (r < 0) {
6✔
1965
                bc_log_errorf("Failed to issue GetUnitProperty call: %s", error.message);
×
1966
                sd_bus_error_free(&error);
×
1967
                return r;
1968
        }
1969

1970
        r = sd_bus_message_enter_container(message, SD_BUS_TYPE_VARIANT, "t");
6✔
1971
        if (r < 0) {
6✔
1972
                bc_log_errorf("Failed to parse response message: %s", strerror(-r));
×
1973
                return r;
1974
        }
1975

1976
        r = sd_bus_message_read_basic(message, SD_BUS_TYPE_UINT64, value);
6✔
1977
        if (r < 0) {
6✔
1978
                bc_log_errorf("Failed to parse response message: %s", strerror(-r));
×
1979
                return r;
1980
        }
1981

1982
        r = sd_bus_message_exit_container(message);
6✔
1983
        if (r < 0) {
6✔
1984
                bc_log_errorf("Failed to parse response message: %s", strerror(-r));
×
1985
                return r;
1986
        }
1987

1988
        return 0;
1989
}
1990

1991
void node_enable_metrics(Node *node) {
3✔
1992
        if (!node_has_agent(node)) {
3✔
1993
                return;
1994
        }
1995

1996
        int r = send_agent_simple_message(node, "EnableMetrics", NULL);
3✔
1997
        if (r < 0) {
3✔
1998
                bc_log_error("Failed to enable metrics on agent");
×
1999
        }
2000

2001
        if (!metrics_node_signal_matching_register(node)) {
3✔
2002
                bc_log_error("Failed to enable metrics on agent");
×
2003
        }
2004
}
2005

2006
void node_disable_metrics(Node *node) {
1✔
2007
        if (!node_has_agent(node)) {
1✔
2008
                return;
2009
        }
2010

2011
        int r = send_agent_simple_message(node, "DisableMetrics", NULL);
1✔
2012
        if (r < 0) {
1✔
2013
                bc_log_error("Failed to disable metrics on agent");
×
2014
        }
2015

2016
        sd_bus_slot_unrefp(&node->metrics_matching_slot);
1✔
2017
        node->metrics_matching_slot = NULL;
1✔
2018
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc