• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pybricks / pybricks-micropython / 20654832194

02 Jan 2026 09:20AM UTC coverage: 50.678% (-0.4%) from 51.097%
20654832194

Pull #435

github

web-flow
Merge f2786c234 into 10af94a3e
Pull Request #435: virtualhub: Implement BTStack bluetooth.

4596 of 9069 relevant lines covered (50.68%)

14840074.35 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/pybricks/tools/pb_type_app_data.c
1
// SPDX-License-Identifier: MIT
2
// Copyright (c) 2024 The Pybricks Authors
3

4
#include "py/mpconfig.h"
5

6
#if PYBRICKS_PY_TOOLS_APP_DATA
7

8
#include <string.h>
9

10
#include <pbsys/command.h>
11
#include <pbsys/host.h>
12

13
#include "py/mphal.h"
14
#include "py/objstr.h"
15

16
#include <pybricks/tools.h>
17
#include <pybricks/tools/pb_type_async.h>
18

19
#include <pybricks/util_mp/pb_kwarg_helper.h>
20
#include <pybricks/util_mp/pb_obj_helper.h>
21
#include <pybricks/util_pb/pb_error.h>
22

23
typedef struct _pb_type_app_data_obj_t {
24
    mp_obj_base_t base;
25
    pb_type_async_t *tx_iter;
26
    mp_obj_t rx_format;
27
    mp_obj_str_t rx_bytes_obj;
28
    uint8_t rx_buffer[] __attribute__((aligned(4)));
29
} pb_type_app_data_obj_t;
30

31
// pointer to dynamically allocated app_data singleton for driver callback.
32
static pb_type_app_data_obj_t *app_data_instance;
33

34
static pbio_error_t handle_incoming_app_data(uint16_t offset, uint32_t size, const uint8_t *data) {
×
35
    // Can't write if rx_buffer does not exist or isn't big enough.
36
    if (!app_data_instance || offset + size > app_data_instance->rx_bytes_obj.len) {
×
37
        return PBIO_ERROR_INVALID_ARG;
×
38
    }
39
    memcpy(app_data_instance->rx_buffer + offset, data, size);
×
40
    return PBIO_SUCCESS;
×
41
}
42

43
// AppData.get_bytes(): returns the current contents of the receive buffer
// as a new bytes object.
static mp_obj_t pb_type_app_data_get_bytes(mp_obj_t self_in) {
    pb_type_app_data_obj_t *app_data = MP_OBJ_TO_PTR(self_in);
    const mp_obj_str_t *rx = &app_data->rx_bytes_obj;

    // The internal bytes object shares its storage with rx_buffer, so hand
    // out a copy: the user then gets a bytes object that stays constant, as
    // would be expected. Revisit: enable and return a memoryview instead,
    // especially if using large buffers.
    return mp_obj_new_bytes(rx->data, rx->len);
}
static MP_DEFINE_CONST_FUN_OBJ_1(pb_type_app_data_get_bytes_obj, pb_type_app_data_get_bytes);
51

52
// AppData.get_values(): unpacks the receive buffer using the rx_format that
// was given when the object was created, and returns the result of
// ustruct.unpack.
static mp_obj_t pb_type_app_data_get_values(mp_obj_t self_in) {
    // The MicroPython implementation of unpack is static, so it is looked up
    // through an import of ustruct.unpack.
    mp_obj_t unpack_fn = pb_function_import_helper(MP_QSTR_ustruct, MP_QSTR_unpack);

    pb_type_app_data_obj_t *app_data = MP_OBJ_TO_PTR(self_in);
    mp_obj_t rx_bytes = MP_OBJ_FROM_PTR(&app_data->rx_bytes_obj);

    // Unpacking may allocate, so the buffer can be updated between unpacking
    // individual values. The host (sender) is responsible for writing each
    // individual value in a single chunk so that it remains valid.
    return mp_call_function_2(unpack_fn, app_data->rx_format, rx_bytes);
}
static MP_DEFINE_CONST_FUN_OBJ_1(pb_type_app_data_get_values_obj, pb_type_app_data_get_values);
64

65
// Single iteration step of an ongoing write_bytes() operation.
//
// The payload was already copied on the initial call made by write_bytes(),
// so no data is passed here; this is simply called again until the send
// operation reports completion.
static pbio_error_t app_data_write_bytes_iterate_once(pbio_os_state_t *state, mp_obj_t parent_obj) {
    const uint8_t *no_data = NULL;
    return pbsys_host_send_event(state, PBIO_PYBRICKS_EVENT_WRITE_APP_DATA, no_data, 0);
}
70

71
// AppData.write_bytes(data): starts sending the given bytes to the host and
// returns the result of pb_type_async_wait_or_await for the operation.
//
// Raises (through pb_assert) if the initial send call reports an error, or
// if it unexpectedly completes right away instead of yielding.
static mp_obj_t pb_type_app_data_write_bytes(mp_obj_t self_in, mp_obj_t data_in) {
    pb_type_app_data_obj_t *self = MP_OBJ_TO_PTR(self_in);

    // The first call copies the given data (or fails), so there is no need
    // to keep our own buffered copy of the payload.
    size_t payload_len;
    const uint8_t *payload = (const uint8_t *)mp_obj_str_get_data(data_in, &payload_len);

    pbio_os_state_t state = 0;
    pbio_error_t err = pbsys_host_send_event(&state, PBIO_PYBRICKS_EVENT_WRITE_APP_DATA, payload, payload_len);

    // The initial call is expected to yield. Immediate success is treated as
    // a failure; any other error is raised as is.
    if (err != PBIO_ERROR_AGAIN) {
        pb_assert(err == PBIO_SUCCESS ? PBIO_ERROR_FAILED : err);
    }

    // Continue from the state reached by the initial call.
    pb_type_async_t config = {
        .state = state,
        .iter_once = app_data_write_bytes_iterate_once,
        .parent_obj = self_in,
    };

    return pb_type_async_wait_or_await(&config, &self->tx_iter, true);
}
static MP_DEFINE_CONST_FUN_OBJ_2(pb_type_app_data_write_bytes_obj, pb_type_app_data_write_bytes);
97

98
static const mp_obj_str_t pb_const_empty_str_obj = {{&mp_type_str}, 0, 0, (const byte *)""};
99

100
static mp_obj_t pb_type_app_data_make_new(const mp_obj_type_t *type, size_t n_args, size_t n_kw, const mp_obj_t *args) {
×
101

102
    PB_PARSE_ARGS_CLASS(n_args, n_kw, args,
×
103
        PB_ARG_DEFAULT_OBJ(rx_format, pb_const_empty_str_obj));
104

105
    // Use ustruct.calcsize to parse user rx_format for size.
106
    mp_obj_t ustruct_calcsize = pb_function_import_helper(MP_QSTR_ustruct, MP_QSTR_calcsize);
×
107
    size_t size = mp_obj_get_int(mp_call_function_1(ustruct_calcsize, rx_format_in));
×
108

109
    // Can only create one instance for now.
110
    if (app_data_instance) {
×
111
        mp_raise_msg(&mp_type_RuntimeError, MP_ERROR_TEXT("host rx_buffer already allocated"));
×
112
    }
113

114
    // Use finalizer so we can deactivate the data callback when rx_buffer is garbage collected.
115
    app_data_instance = mp_obj_malloc_var_with_finaliser(pb_type_app_data_obj_t, uint8_t, size, type);
×
116
    app_data_instance->rx_format = rx_format_in;
×
117

118
    // Keep rx_buffer in bytes object rx_format for compatibility with unpack.
119
    app_data_instance->rx_bytes_obj.base.type = &mp_type_bytes;
×
120
    app_data_instance->rx_bytes_obj.len = size;
×
121
    app_data_instance->rx_bytes_obj.data = app_data_instance->rx_buffer;
×
122

123
    // Activate callback now that we have allocated the rx_buffer.
124
    pbsys_command_set_write_app_data_callback(handle_incoming_app_data);
×
125

126
    app_data_instance->tx_iter = NULL;
×
127

128
    return MP_OBJ_FROM_PTR(app_data_instance);
×
129
}
130

131
// AppData.close() and AppData.__del__(): deactivates the incoming data
// callback and releases the singleton slot so a new AppData can be created.
//
// stream: The AppData object being closed or finalised.
//
// Only the currently active instance tears anything down. Without this
// check, closing or finalising a stale object (one that was already closed
// and is collected after a new AppData was created) would deactivate the
// callback of the new, live instance.
//
// NOTE(review): assumes every caller passes the AppData object itself, as
// the close() and __del__ bindings in this file do; confirm no external
// caller relies on passing something else to force a close.
//
// Always returns None.
mp_obj_t pb_type_app_data_close(mp_obj_t stream) {
    pb_type_app_data_obj_t *closing = (pb_type_app_data_obj_t *)MP_OBJ_TO_PTR(stream);

    if (app_data_instance && closing == app_data_instance) {
        pbsys_command_set_write_app_data_callback(NULL);
        app_data_instance = NULL;
    }
    return mp_const_none;
}
MP_DEFINE_CONST_FUN_OBJ_1(pb_type_app_data_close_obj, pb_type_app_data_close);
139

140
static const mp_rom_map_elem_t pb_type_app_data_locals_dict_table[] = {
141
    { MP_ROM_QSTR(MP_QSTR___del__),      MP_ROM_PTR(&pb_type_app_data_close_obj) },
142
    { MP_ROM_QSTR(MP_QSTR_close),        MP_ROM_PTR(&pb_type_app_data_close_obj) },
143
    { MP_ROM_QSTR(MP_QSTR_get_bytes),    MP_ROM_PTR(&pb_type_app_data_get_bytes_obj) },
144
    { MP_ROM_QSTR(MP_QSTR_get_values),   MP_ROM_PTR(&pb_type_app_data_get_values_obj) },
145
    { MP_ROM_QSTR(MP_QSTR_write_bytes),    MP_ROM_PTR(&pb_type_app_data_write_bytes_obj) },
146
};
147
static MP_DEFINE_CONST_DICT(pb_type_app_data_locals_dict, pb_type_app_data_locals_dict_table);
148

149
MP_DEFINE_CONST_OBJ_TYPE(pb_type_app_data,
150
    MP_QSTR_AppData,
151
    MP_TYPE_FLAG_NONE,
152
    make_new, pb_type_app_data_make_new,
153
    locals_dict, &pb_type_app_data_locals_dict);
154

155
#endif // PYBRICKS_PY_TOOLS_APP_DATA
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc