• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pybricks / pybricks-micropython / 19016791193

02 Nov 2025 06:40PM UTC coverage: 57.167% (-2.6%) from 59.744%
19016791193

Pull #406

github

laurensvalk
bricks/virtualhub: Replace with embedded simulation.

Instead of using the newly introduced simhub alongside the virtualhub, we'll just replace the old one entirely now that it has reached feature parity. We can keep calling it the virtualhub.
Pull Request #406: New virtual hub for more effective debugging

41 of 48 new or added lines in 7 files covered. (85.42%)

414 existing lines in 53 files now uncovered.

4479 of 7835 relevant lines covered (57.17%)

17178392.75 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/pybricks/tools/pb_type_app_data.c
1
// SPDX-License-Identifier: MIT
2
// Copyright (c) 2024 The Pybricks Authors
3

4
#include "py/mpconfig.h"
5

6
#if PYBRICKS_PY_TOOLS_APP_DATA
7

8
#include <string.h>
9

10
#include <pbsys/command.h>
11
#include <pbsys/host.h>
12

13
#include "py/mphal.h"
14
#include "py/objstr.h"
15

16
#include <pybricks/tools.h>
17
#include <pybricks/tools/pb_type_async.h>
18

19
#include <pybricks/util_mp/pb_kwarg_helper.h>
20
#include <pybricks/util_mp/pb_obj_helper.h>
21
#include <pybricks/util_pb/pb_error.h>
22

23
typedef struct _pb_type_app_data_obj_t {
24
    mp_obj_base_t base;
25
    pb_type_async_t *tx_iter;
26
    mp_obj_t rx_format;
27
    mp_obj_str_t rx_bytes_obj;
28
    uint8_t rx_buffer[] __attribute__((aligned(4)));
29
} pb_type_app_data_obj_t;
30

31
// pointer to dynamically allocated app_data singleton for driver callback.
32
static pb_type_app_data_obj_t *app_data_instance;
33

34
static pbio_error_t handle_incoming_app_data(uint16_t offset, uint32_t size, const uint8_t *data) {
×
35
    // Can't write if rx_buffer does not exist or isn't big enough.
36
    if (!app_data_instance || offset + size > app_data_instance->rx_bytes_obj.len) {
×
UNCOV
37
        return PBIO_ERROR_INVALID_ARG;
×
38
    }
39
    memcpy(app_data_instance->rx_buffer + offset, data, size);
×
40
    return PBIO_SUCCESS;
×
41
}
42

43
static mp_obj_t pb_type_app_data_get_bytes(mp_obj_t self_in) {
×
44
    pb_type_app_data_obj_t *self = MP_OBJ_TO_PTR(self_in);
×
45
    // Don't return internal bytes object but make a copy so the user bytes
46
    // object is constant as would be expected. Revisit: enable and return
47
    // a memoryview, especially if using large buffers.
48
    return mp_obj_new_bytes(self->rx_bytes_obj.data, self->rx_bytes_obj.len);
×
49
}
50
static MP_DEFINE_CONST_FUN_OBJ_1(pb_type_app_data_get_bytes_obj, pb_type_app_data_get_bytes);
51

52
static mp_obj_t pb_type_app_data_get_values(mp_obj_t self_in) {
×
53

54
    // Implementation in MicroPython is static, so import from ustruct.unpack.
55
    mp_obj_t ustruct_unpack = pb_function_import_helper(MP_QSTR_ustruct, MP_QSTR_unpack);
×
56

57
    // Host (sender) is responsible for making sure that each individual
58
    // value remains valid, i.e. is written in a single chunk, since the
59
    // following may allocate, and thus be updated between unpacking values.
60
    pb_type_app_data_obj_t *self = MP_OBJ_TO_PTR(self_in);
×
61
    return mp_call_function_2(ustruct_unpack, self->rx_format, MP_OBJ_FROM_PTR(&self->rx_bytes_obj));
×
62
}
63
static MP_DEFINE_CONST_FUN_OBJ_1(pb_type_app_data_get_values_obj, pb_type_app_data_get_values);
64

65
static pbio_error_t app_data_write_bytes_iterate_once(pbio_os_state_t *state, mp_obj_t parent_obj) {
×
66
    // No need to pass in buffered arguments since they were copied on the
67
    // inital run. We can just keep calling this until completion.
68
    return pbsys_host_send_event(state, 0, NULL, 0);
×
69
}
70

71
static mp_obj_t pb_type_app_data_write_bytes(mp_obj_t self_in, mp_obj_t data_in) {
×
72

73
    // The first call will copy given data (or raise errors) so we don't need
74
    // to buffer things here.
75
    size_t size;
76
    const uint8_t *data = (const uint8_t *)mp_obj_str_get_data(data_in, &size);
×
77
    pbio_os_state_t state = 0;
×
78
    pbio_error_t err = pbsys_host_send_event(&state, PBIO_PYBRICKS_EVENT_WRITE_STDOUT, data, size);
×
79

80
    // Expect yield after the initial call.
81
    if (err == PBIO_SUCCESS) {
×
UNCOV
82
        pb_assert(PBIO_ERROR_FAILED);
×
83
    } else if (err != PBIO_ERROR_AGAIN) {
×
84
        pb_assert(err);
×
85
    }
86

87
    pb_type_async_t config = {
×
88
        .parent_obj = self_in,
89
        .iter_once = app_data_write_bytes_iterate_once,
90
        .state = state,
91
    };
92

93
    pb_type_app_data_obj_t *self = MP_OBJ_TO_PTR(self_in);
×
94
    return pb_type_async_wait_or_await(&config, &self->tx_iter, true);
×
95
}
96
static MP_DEFINE_CONST_FUN_OBJ_2(pb_type_app_data_write_bytes_obj, pb_type_app_data_write_bytes);
97

98
static const mp_obj_str_t pb_const_empty_str_obj = {{&mp_type_str}, 0, 0, (const byte *)""};
99

100
static mp_obj_t pb_type_app_data_make_new(const mp_obj_type_t *type, size_t n_args, size_t n_kw, const mp_obj_t *args) {
×
101

102
    PB_PARSE_ARGS_CLASS(n_args, n_kw, args,
×
103
        PB_ARG_DEFAULT_OBJ(rx_format, pb_const_empty_str_obj));
104

105
    // Use ustruct.calcsize to parse user rx_format for size.
106
    mp_obj_t ustruct_calcsize = pb_function_import_helper(MP_QSTR_ustruct, MP_QSTR_calcsize);
×
107
    size_t size = mp_obj_get_int(mp_call_function_1(ustruct_calcsize, rx_format_in));
×
108

109
    // Can only create one instance for now.
110
    if (app_data_instance) {
×
111
        mp_raise_msg(&mp_type_RuntimeError, MP_ERROR_TEXT("host rx_buffer already allocated"));
×
112
    }
113

114
    // Use finalizer so we can deactivate the data callback when rx_buffer is garbage collected.
115
    app_data_instance = mp_obj_malloc_var_with_finaliser(pb_type_app_data_obj_t, uint8_t, size, type);
×
116
    app_data_instance->rx_format = rx_format_in;
×
117

118
    // Keep rx_buffer in bytes object rx_format for compatibility with unpack.
119
    app_data_instance->rx_bytes_obj.base.type = &mp_type_bytes;
×
120
    app_data_instance->rx_bytes_obj.len = size;
×
121
    app_data_instance->rx_bytes_obj.data = app_data_instance->rx_buffer;
×
122

123
    // Activate callback now that we have allocated the rx_buffer.
124
    pbsys_command_set_write_app_data_callback(handle_incoming_app_data);
×
125

126
    app_data_instance->tx_iter = NULL;
×
127

128
    return MP_OBJ_FROM_PTR(app_data_instance);
×
129
}
130

131
mp_obj_t pb_type_app_data_close(mp_obj_t stream) {
×
132
    if (app_data_instance) {
×
133
        pbsys_command_set_write_app_data_callback(NULL);
×
134
        app_data_instance = NULL;
×
135
    }
136
    return mp_const_none;
×
137
}
138
MP_DEFINE_CONST_FUN_OBJ_1(pb_type_app_data_close_obj, pb_type_app_data_close);
139

140
static const mp_rom_map_elem_t pb_type_app_data_locals_dict_table[] = {
141
    { MP_ROM_QSTR(MP_QSTR___del__),      MP_ROM_PTR(&pb_type_app_data_close_obj) },
142
    { MP_ROM_QSTR(MP_QSTR_close),        MP_ROM_PTR(&pb_type_app_data_close_obj) },
143
    { MP_ROM_QSTR(MP_QSTR_get_bytes),    MP_ROM_PTR(&pb_type_app_data_get_bytes_obj) },
144
    { MP_ROM_QSTR(MP_QSTR_get_values),   MP_ROM_PTR(&pb_type_app_data_get_values_obj) },
145
    { MP_ROM_QSTR(MP_QSTR_write_bytes),    MP_ROM_PTR(&pb_type_app_data_write_bytes_obj) },
146
};
147
static MP_DEFINE_CONST_DICT(pb_type_app_data_locals_dict, pb_type_app_data_locals_dict_table);
148

149
MP_DEFINE_CONST_OBJ_TYPE(pb_type_app_data,
150
    MP_QSTR_AppData,
151
    MP_TYPE_FLAG_NONE,
152
    make_new, pb_type_app_data_make_new,
153
    locals_dict, &pb_type_app_data_locals_dict);
154

155
#endif // PYBRICKS_PY_TOOLS_APP_DATA
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc