• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

TEN-framework / ten-framework / 20358357829

19 Dec 2025 03:00AM UTC coverage: 57.587% (+0.2%) from 57.34%
20358357829

push

github

web-flow
fix: coveralls remove container for all test jobs (#1878)

* chore: calculate coverage in debug mode

* Revert "chore: calculate coverage in debug mode"

This reverts commit f68a031af.

* Reapply "chore: calculate coverage in debug mode"

This reverts commit bc27f2c1c.

* fix: remove container for test jobs

* fix: add C++ toolchain, go builder, sanitizer support, nodejs env

* fix: compiler version incompatible

* Revert "fix: compiler version incompatible"

This reverts commit 23f5c3f7d.

* fix: set PATH so that clang-21 is find earlier than older versions

* fix: uninstall older versions of clang

* Revert "fix: set PATH so that clang-21 is find earlier than older versions"

This reverts commit b66e56c08.

* fix: uninstall old clang first then install clang21

* fix: supplement test jobs deps according to tools Dockerfile

* fix: install libasan5 for all jobs and remove unnecessary deps

* chore: refine codes and output clang version before every test

* fix: go back to clang 18 to detect __lsan_do_leak_check

* Revert "fix: go back to clang 18 to detect __lsan_do_leak_check"

This reverts commit d247818a8.

* fix: upgrade libasan5 to libasan8 to match clang 21

* fix: install libclang-rt-21-dev

* Revert "fix: install libclang-rt-21-dev"

This reverts commit 39b96e030.

* fix: ten_enable_go_app_leak_check

* chore: uninstall useless dep libasan8

* chore: refine codes and uninstall unnecessary dep clang-tool-21

54480 of 94605 relevant lines covered (57.59%)

682622.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.66
/core/src/ten_runtime/extension_thread/msg_interface/common.c
1
//
2
// Copyright © 2025 Agora
3
// This file is part of TEN Framework, an open source project.
4
// Licensed under the Apache License, Version 2.0, with certain conditions.
5
// Refer to the "LICENSE" file in the root directory for more information.
6
//
7
#include "include_internal/ten_runtime/extension_thread/msg_interface/common.h"
8

9
#include "include_internal/ten_runtime/app/app.h"
10
#include "include_internal/ten_runtime/app/msg_interface/common.h"
11
#include "include_internal/ten_runtime/common/constant_str.h"
12
#include "include_internal/ten_runtime/common/loc.h"
13
#include "include_internal/ten_runtime/engine/engine.h"
14
#include "include_internal/ten_runtime/engine/internal/extension_interface.h"
15
#include "include_internal/ten_runtime/engine/msg_interface/common.h"
16
#include "include_internal/ten_runtime/extension/extension.h"
17
#include "include_internal/ten_runtime/extension/msg_handling.h"
18
#include "include_internal/ten_runtime/extension_context/extension_context.h"
19
#include "include_internal/ten_runtime/extension_group/extension_group.h"
20
#include "include_internal/ten_runtime/extension_group/metadata.h"
21
#include "include_internal/ten_runtime/extension_store/extension_store.h"
22
#include "include_internal/ten_runtime/extension_thread/extension_thread.h"
23
#include "include_internal/ten_runtime/extension_thread/telemetry.h"
24
#include "include_internal/ten_runtime/msg/msg.h"
25
#include "ten_runtime/app/app.h"
26
#include "ten_runtime/msg/cmd_result/cmd_result.h"
27
#include "ten_runtime/ten_env/ten_env.h"
28
#include "ten_utils/io/runloop.h"
29
#include "ten_utils/lib/event.h"
30
#include "ten_utils/lib/smart_ptr.h"
31
#include "ten_utils/lib/string.h"
32
#include "ten_utils/macro/check.h"
33
#include "ten_utils/macro/mark.h"
34

35
void ten_extension_thread_handle_start_msg_task(void *self_,
36
                                                TEN_UNUSED void *arg) {
2,011✔
37
  ten_extension_thread_t *self = (ten_extension_thread_t *)self_;
2,011✔
38
  TEN_ASSERT(self, "Invalid argument.");
2,011✔
39
  TEN_ASSERT(ten_extension_thread_check_integrity(self, true),
2,011✔
40
             "Invalid use of extension_thread %p.", self);
2,011✔
41

42
  TEN_ASSERT(self->extension_group, "Should not happen.");
2,011✔
43

44
  ten_extension_group_load_metadata(self->extension_group);
2,011✔
45
}
2,011✔
46

47
static void ten_extension_thread_handle_in_msg_sync(
48
    ten_extension_thread_t *self, ten_shared_ptr_t *msg) {
36,368✔
49
  TEN_ASSERT(self, "Invalid argument.");
36,368✔
50
  TEN_ASSERT(ten_extension_thread_check_integrity(self, true),
36,368✔
51
             "Invalid use of extension_thread %p.", self);
36,368✔
52
  TEN_ASSERT(ten_msg_get_dest_cnt(msg) == 1,
36,368✔
53
             "When this function is executed, there should be only one "
36,368✔
54
             "destination remaining in the message's dest.");
36,368✔
55

56
  // Find the extension according to 'loc'.
57
  ten_loc_t *dest_loc = ten_msg_get_first_dest_loc(msg);
36,368✔
58

59
  ten_extension_t *extension = ten_extension_store_find_extension(
36,368✔
60
      self->extension_store, ten_string_get_raw_str(&dest_loc->extension_name),
36,368✔
61
      self->in_lock_mode ? false : true);
36,368✔
62

63
  if (!extension) {
36,368✔
64
    // ten_msg_dump(msg, NULL,
65
    //              "Failed to find destination extension %s for msg ^m in %s",
66
    //              ten_string_get_raw_str(&dest_loc->extension_name),
67
    //              ten_string_get_raw_str(&self->extension_group->name));
68

69
    // Return a result, so that the sender can know what's going on.
70
    if (ten_msg_get_type(msg) == TEN_MSG_TYPE_CMD) {
19✔
71
      ten_string_t detail;
2✔
72
      ten_string_init_formatted(
2✔
73
          &detail, "The extension[%s] is invalid.",
2✔
74
          ten_string_get_raw_str(&dest_loc->extension_name));
2✔
75

76
      ten_extension_thread_create_cmd_result_and_dispatch(
2✔
77
          self, msg, TEN_STATUS_CODE_ERROR, ten_string_get_raw_str(&detail));
2✔
78

79
      ten_string_deinit(&detail);
2✔
80
    } else {
17✔
81
      // The reason for the disappearance of the extension might be that the
82
      // extension's termination process is kind of _smooth_, allowing it to end
83
      // directly without waiting for anything to happen. In such a case, it is
84
      // possible that the already terminated extension instance cannot be
85
      // found.
86
      TEN_LOGW(
17✔
87
          "Unable to send the message %s to the absent destination extension "
17✔
88
          "%s.",
17✔
89
          ten_msg_get_name(msg),
17✔
90
          ten_string_get_raw_str(&dest_loc->extension_name));
17✔
91
    }
17✔
92
  } else {
36,349✔
93
    if (extension->extension_thread != self) {
36,349✔
94
      // ten_msg_dump(msg, NULL, "Unexpected msg ^m for extension %s",
95
      //              ten_string_get_raw_str(&extension->name));
96

97
      TEN_ASSERT(0, "Should not happen.");
×
98
    }
×
99

100
    ten_extension_handle_in_msg(extension, msg);
36,349✔
101
  }
36,349✔
102
}
36,368✔
103

104
void ten_extension_thread_handle_in_msg_task(void *self_, void *arg) {
8,180✔
105
  ten_extension_thread_t *self = (ten_extension_thread_t *)self_;
8,180✔
106
  TEN_ASSERT(self, "Invalid argument.");
8,180✔
107
  TEN_ASSERT(ten_extension_thread_check_integrity(self, true),
8,180✔
108
             "Invalid use of extension_thread %p.", self);
8,180✔
109

110
  ten_shared_ptr_t *msg = (ten_shared_ptr_t *)arg;
8,180✔
111
  TEN_ASSERT(msg, "Invalid argument.");
8,180✔
112
  TEN_ASSERT(ten_msg_check_integrity(msg), "Invalid argument.");
8,180✔
113
  TEN_ASSERT(ten_msg_get_dest_cnt(msg) == 1, "Should not happen.");
8,180✔
114

115
#if defined(TEN_ENABLE_TEN_RUST_APIS)
8,180✔
116
  int64_t timestamp = ten_msg_get_timestamp(msg);
8,180✔
117
  ten_extension_thread_record_extension_thread_msg_queue_stay_time(self,
8,180✔
118
                                                                   timestamp);
8,180✔
119
#endif
8,180✔
120

121
  switch (ten_extension_thread_get_state(self)) {
8,180✔
122
  case TEN_EXTENSION_THREAD_STATE_INIT:
×
123
  case TEN_EXTENSION_THREAD_STATE_CREATING_EXTENSIONS:
×
124
#if defined(_DEBUG)
×
125
    // ten_msg_dump(msg, NULL,
126
    //              "A message (^m) comes when extension thread (%p) is in "
127
    //              "state (%d)",
128
    //              self, ten_extension_thread_get_state(self));
129
#endif
×
130

131
    // At this stage, the extensions have not been created yet, so any
132
    // received messages are placed into a `pending_msgs` list. Once the
133
    // extensions are created, the messages will be delivered to the
134
    // corresponding extensions.
135
    ten_list_push_smart_ptr_back(&self->pending_msgs_received_in_init_stage,
×
136
                                 msg);
×
137
    break;
×
138

139
  case TEN_EXTENSION_THREAD_STATE_NORMAL:
8,048✔
140
  case TEN_EXTENSION_THREAD_STATE_PREPARE_TO_CLOSE:
8,180✔
141
    ten_extension_thread_handle_in_msg_sync(self, msg);
8,180✔
142
    break;
8,180✔
143

144
  case TEN_EXTENSION_THREAD_STATE_CLOSED:
×
145
    // All extensions are removed from this extension thread, so the only
146
    // thing we can do is to discard this cmd result.
147
    break;
×
148

149
  default:
×
150
    TEN_ASSERT(0, "Should not happen.");
×
151
    break;
×
152
  }
8,180✔
153

154
  ten_shared_ptr_destroy(msg);
8,180✔
155
}
8,180✔
156

157
// Task callback that takes the extension thread out of lock mode: it clears
// `in_lock_mode` and unlocks `lock_mode_lock`.
//
// @param self_ The extension thread, passed as an opaque pointer.
// @param arg   Unused.
static void ten_extension_thread_process_release_lock_mode_task(
    void *self_, TEN_UNUSED void *arg) {
  ten_extension_thread_t *self = self_;
  TEN_ASSERT(self, "Invalid argument.");
  TEN_ASSERT(ten_extension_thread_check_integrity(self, true),
             "Invalid use of extension_thread %p.", self);

  // Clear the flag first, mirroring the fact that the `ten_mutex_unlock`
  // below ends the block on the extension thread.
  self->in_lock_mode = false;

  int unlock_rc = ten_mutex_unlock(self->lock_mode_lock);
  TEN_ASSERT(unlock_rc == 0, "Should not happen.");
}
556✔
171

172
// This task is used to allow the outer thread to wait for the extension thread
173
// to reach a certain point in time. Subsequently, the extension thread will
174
// be blocked in this function.
175
void ten_extension_thread_process_acquire_lock_mode_task(void *self_,
176
                                                         void *arg) {
556✔
177
  ten_extension_thread_t *self = (ten_extension_thread_t *)self_;
556✔
178
  TEN_ASSERT(self, "Invalid argument.");
556✔
179
  TEN_ASSERT(ten_extension_thread_check_integrity(self, true),
556✔
180
             "Invalid use of extension_thread %p.", self);
556✔
181

182
  ten_acquire_lock_mode_result_t *acquire_result =
556✔
183
      (ten_acquire_lock_mode_result_t *)arg;
556✔
184
  TEN_ASSERT(acquire_result, "Invalid argument.");
556✔
185

186
  // Because the extension thread is about to acquire the lock mode lock to
187
  // prevent the outer thread from directly using the TEN world, a task to
188
  // release the lock mode is inserted, allowing the extension thread to exit
189
  // this mode and giving the outer thread a chance to acquire the lock mode
190
  // lock.
191
  int rc = ten_runloop_post_task_tail(
556✔
192
      self->runloop, ten_extension_thread_process_release_lock_mode_task, self,
556✔
193
      NULL);
556✔
194
  if (rc) {
556✔
195
    TEN_LOGW("Failed to post task to extension thread's runloop: %d", rc);
×
196
    TEN_ASSERT(0, "Should not happen.");
×
197
  }
×
198

199
  // Set `in_lock_mode` to reflect the effect of the below `ten_mutex_lock`
200
  // blocking the extension thread.
201
  self->in_lock_mode = true;
556✔
202

203
  // Inform the outer thread that the extension thread has also entered the
204
  // lock mode.
205
  ten_event_set(acquire_result->completed);
556✔
206

207
  // Extension thread will block here until the outer thread releases this lock.
208
  rc = ten_mutex_lock(self->lock_mode_lock);
556✔
209
  TEN_ASSERT(!rc, "Should not happen.");
556✔
210
}
556✔
211

212
void ten_extension_thread_dispatch_msg(ten_extension_thread_t *self,
213
                                       ten_shared_ptr_t *msg) {
36,434✔
214
  TEN_ASSERT(self, "Invalid argument.");
36,434✔
215
  TEN_ASSERT(ten_extension_thread_check_integrity(self, true),
36,434✔
216
             "Invalid use of extension_thread %p.", self);
36,434✔
217
  TEN_ASSERT(msg && (ten_msg_get_dest_cnt(msg) == 1),
36,434✔
218
             "When this function is executed, there should be only one "
36,434✔
219
             "destination remaining in the message's dest.");
36,434✔
220

221
  ten_loc_t *dest_loc = ten_msg_get_first_dest_loc(msg);
36,434✔
222
  TEN_ASSERT(dest_loc, "Should not happen.");
36,434✔
223
  TEN_ASSERT(ten_loc_check_integrity(dest_loc), "Should not happen.");
36,434✔
224

225
  ten_extension_group_t *extension_group = self->extension_group;
36,434✔
226
  TEN_ASSERT(extension_group, "Should not happen.");
36,434✔
227
  TEN_ASSERT(ten_extension_group_check_integrity(extension_group, true),
36,434✔
228
             "Should not happen.");
36,434✔
229

230
  ten_engine_t *engine = self->extension_context->engine;
36,434✔
231
  TEN_ASSERT(engine, "Should not happen.");
36,434✔
232
  TEN_ASSERT(ten_engine_check_integrity(engine, false), "Should not happen.");
36,434✔
233

234
  ten_app_t *app = engine->app;
36,434✔
235
  TEN_ASSERT(app, "Should not happen.");
36,434✔
236
  TEN_ASSERT(ten_app_check_integrity(app, false), "Should not happen.");
36,434✔
237

238
  if (!ten_string_is_equal_c_str(&dest_loc->app_uri, ten_app_get_uri(app))) {
36,434✔
239
    // Other TEN apps.
240
    TEN_ASSERT(!ten_string_is_empty(&dest_loc->app_uri), "Should not happen.");
4,670✔
241

242
    // Because the remote might be added or deleted at runtime, so ask the
243
    // engine to route the message to the specified remote to keep thread
244
    // safety.
245
    ten_engine_append_to_in_msgs_queue(engine, msg);
4,670✔
246
  } else {
31,764✔
247
    if (
31,764✔
248
        // It means asking the current app to do something.
249
        ten_string_is_empty(&dest_loc->graph_id) ||
31,764✔
250
        // It means asking another engine in the same app to do something.
251
        !ten_string_is_equal(&dest_loc->graph_id, &engine->graph_id)) {
31,764✔
252
      // The message should not be handled in this engine, so ask the app to
253
      // handle this message.
254

255
      ten_app_push_to_in_msgs_queue(app, msg);
102✔
256
    } else {
31,662✔
257
      if (ten_string_is_empty(&dest_loc->extension_name)) {
31,662✔
258
        // Because the destination is the current engine, so ask the engine to
259
        // handle this message.
260

261
        ten_engine_append_to_in_msgs_queue(engine, msg);
39✔
262
      } else {
31,623✔
263
        const char *extension_group_name =
31,623✔
264
            ten_extension_context_get_extension_group_name(
31,623✔
265
                self->extension_context,
31,623✔
266
                ten_string_get_raw_str(&dest_loc->app_uri),
31,623✔
267
                ten_string_get_raw_str(&dest_loc->graph_id),
31,623✔
268
                ten_string_get_raw_str(&dest_loc->extension_name), false);
31,623✔
269

270
        if (extension_group_name) {
31,623✔
271
          if (!ten_string_is_equal_c_str(&extension_group->name,
31,622✔
272
                                         extension_group_name)) {
31,622✔
273
            // Find the correct extension thread to handle this message.
274
            ten_engine_append_to_in_msgs_queue(engine, msg);
3,434✔
275
          } else {
28,188✔
276
            // The message should be handled in the current extension thread, so
277
            // dispatch the message to the current extension thread.
278
            ten_extension_thread_handle_in_msg_sync(self, msg);
28,188✔
279
          }
28,188✔
280
        } else {
31,622✔
281
          if (ten_msg_get_type(msg) == TEN_MSG_TYPE_CMD) {
1✔
282
            ten_string_t detail;
1✔
283
            ten_string_init_formatted(&detail, "Failed to find destination.");
1✔
284

285
            ten_extension_thread_create_cmd_result_and_dispatch(
1✔
286
                self, msg, TEN_STATUS_CODE_ERROR,
1✔
287
                ten_string_get_raw_str(&detail));
1✔
288

289
            ten_string_deinit(&detail);
1✔
290
          }
1✔
291
        }
1✔
292
      }
31,623✔
293
    }
31,662✔
294
  }
31,764✔
295
}
36,434✔
296

297
void ten_extension_thread_create_cmd_result_and_dispatch(
298
    ten_extension_thread_t *self, ten_shared_ptr_t *origin_cmd,
299
    TEN_STATUS_CODE status_code, const char *detail) {
7✔
300
  TEN_ASSERT(self, "Invalid argument.");
7✔
301
  TEN_ASSERT(ten_extension_thread_check_integrity(self, true),
7✔
302
             "Invalid argument.");
7✔
303

304
  TEN_ASSERT(origin_cmd, "Invalid argument.");
7✔
305
  TEN_ASSERT(ten_msg_is_cmd(origin_cmd), "Invalid argument.");
7✔
306

307
  ten_shared_ptr_t *cmd_result =
7✔
308
      ten_cmd_result_create_from_cmd(status_code, origin_cmd);
7✔
309

310
  if (detail) {
7✔
311
    ten_msg_set_property(cmd_result, TEN_STR_DETAIL,
7✔
312
                         ten_value_create_string(detail), NULL);
7✔
313
  }
7✔
314

315
  // TODO(Wei): Here, an optimization can be made: check whether
316
  // cmd_result.dest_loc is the current extension_thread (i.e., `self`), and
317
  // avoid post the `cmd_result` to the engine msg queue.
318
  //
319
  // - If it is, `cmd_result` can be directly placed into `self`'s message
320
  //   queue.
321
  // - Alternatively, the specific `extension` within this extension thread can
322
  //   be identified, and its corresponding `on_xxx` function can be called
323
  //   directly.
324

325
  ten_engine_t *engine = self->extension_context->engine;
7✔
326
  TEN_ASSERT(engine, "Should not happen.");
7✔
327
  TEN_ASSERT(ten_engine_check_integrity(engine, false), "Should not happen.");
7✔
328

329
  ten_engine_append_to_in_msgs_queue(engine, cmd_result);
7✔
330

331
  ten_shared_ptr_destroy(cmd_result);
7✔
332
}
7✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc