• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4856

17 Nov 2025 09:53AM UTC coverage: 64.286% (+0.2%) from 64.039%
#4856

push

travis-ci

guanshengliang
Merge branch '3.0' into cover/3.0

218 of 311 new or added lines in 32 files covered. (70.1%)

4657 existing lines in 112 files now uncovered.

151658 of 235910 relevant lines covered (64.29%)

116320814.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

22.49
/utils/test/c/tmq_offset_test.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include <assert.h>
17
#include <stdio.h>
18
#include <stdlib.h>
19
#include <string.h>
20
#include <time.h>
21
#include "taos.h"
22
#include "types.h"
23

24
/*
 * Run one SQL statement on pConn and release its result set.
 *
 * On failure prints "<errPrefix>, reason:<error text>" to stdout. The result
 * set is released on both the success and the failure path, so callers never
 * own a TAOS_RES.
 *
 * Returns 0 on success, -1 when taos_errno() reports an error.
 */
static int buildDataExec(TAOS* pConn, const char* sql, const char* errPrefix) {
  TAOS_RES* pRes = taos_query(pConn, sql);
  int       rc = 0;

  if (taos_errno(pRes) != 0) {
    // The error text must be read before the result is released.
    printf("%s, reason:%s\n", errPrefix, taos_errstr(pRes));
    rc = -1;
  }

  taos_free_result(pRes);
  return rc;
}

/*
 * Prepare the fixtures used by test_offset(): drop and recreate database
 * db_ts3756 (2 vgroups), create table t1, insert two rows and create topic
 * "tp" as "select * from t1".
 *
 * The statements run in order and the first failure stops the sequence.
 *
 * Returns 0 when every statement succeeded, -1 otherwise.
 */
int buildData(TAOS* pConn){
  static const struct {
    const char* sql;
    const char* errPrefix;
  } steps[] = {
      {"drop topic if exists tp", "error in drop tp"},
      {"drop database if exists db_ts3756", "error in drop db_ts3756"},
      {"create database if not exists db_ts3756 vgroups 2 wal_retention_period 3600", "error in create db_ts3756"},
      {"use db_ts3756", "error in use db"},
      {"CREATE TABLE `t1` (`ts` TIMESTAMP, `voltage` INT)", "failed to create table t1"},
      {"insert into t1 values(now, 1)", "failed to insert"},
      {"insert into t1 values(now + 1s, 2)", "failed to insert"},
      {"create topic tp as select * from t1", "failed to create topic tp"},
  };

  for (size_t i = 0; i < sizeof(steps) / sizeof(steps[0]); i++) {
    if (buildDataExec(pConn, steps[i].sql, steps[i].errPrefix) != 0) {
      return -1;
    }
  }
  return 0;
}
82

83
/*
 * Scenario ts5679: with auto commit disabled, consuming a topic must not
 * produce a committed offset.
 *
 * Recreates database db_ts5679 (1 vgroup) with topic t_5679 on the whole
 * database, creates table t1 and inserts one row. It then subscribes, polls
 * until the consumer returns no more data, and asserts that tmq_committed()
 * reports TSDB_CODE_TMQ_NO_COMMITTED for every assigned vgroup.
 *
 * Every polled result, the assignment array and the consumer are released
 * before returning.
 */
void test_ts5679(TAOS* pConn){
  static const char* const setupSql[] = {
      "drop topic if exists t_5679",
      "drop database if exists db_ts5679",
      "create database if not exists db_ts5679 vgroups 1 wal_retention_period 3600",
      "create topic t_5679 as database db_ts5679",
      "use db_ts5679",
      "CREATE TABLE `t1` (`ts` TIMESTAMP, `voltage` INT)",
      "insert into t1 values(now, 1)",
  };

  for (size_t i = 0; i < sizeof(setupSql) / sizeof(setupSql[0]); i++) {
    TAOS_RES* pSetup = taos_query(pConn, setupSql[i]);
    ASSERT(taos_errno(pSetup) == 0);
    taos_free_result(pSetup);
  }

  tmq_conf_t* conf = tmq_conf_new();

  tmq_conf_set(conf, "enable.auto.commit", "false");
  tmq_conf_set(conf, "auto.commit.interval.ms", "2000");
  tmq_conf_set(conf, "group.id", "group_id_2");
  tmq_conf_set(conf, "td.connect.user", "root");
  tmq_conf_set(conf, "td.connect.pass", "taosdata");
  tmq_conf_set(conf, "auto.offset.reset", "earliest");
  tmq_conf_set(conf, "msg.with.table.name", "false");

  tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
  tmq_conf_destroy(conf);
  ASSERT(tmq != NULL);

  // Create the list of topics to subscribe to.
  tmq_list_t* topicList = tmq_list_new();
  tmq_list_append(topicList, "t_5679");

  // Start the subscription. The call is kept outside ASSERT so it still runs
  // if ASSERT is compiled out.
  int32_t code = tmq_subscribe(tmq, topicList);
  tmq_list_destroy(topicList);
  ASSERT(code == 0);

  // Drain the topic: keep polling until the consumer has nothing to return.
  while (1) {
    TAOS_RES* pMsg = tmq_consumer_poll(tmq, 1000);
    if (pMsg == NULL) {
      break;
    }
    // Each polled message is owned by the caller and must be released.
    taos_free_result(pMsg);
    // NOTE(review): the 3s pause presumably outlasts the 2000ms
    // auto.commit.interval.ms so an unwanted auto commit would be visible
    // below - confirm.
    taosSsleep(3);
  }

  tmq_topic_assignment* pAssign = NULL;
  int32_t numOfAssign = 0;
  code = tmq_get_topic_assignment(tmq, "t_5679", &pAssign, &numOfAssign);
  ASSERT (code == 0);

  for (int i = 0; i < numOfAssign; i++) {
    int64_t committed = tmq_committed(tmq, "t_5679", pAssign[i].vgId);
    printf("committed offset:%"PRId64"\n", committed);
    ASSERT(committed == TSDB_CODE_TMQ_NO_COMMITTED);
  }

  tmq_free_assignment(pAssign);
  tmq_consumer_close(tmq);
}
205✔
153

154
/*
 * Exercise the offset APIs of a consumer subscribed to topic "tp".
 *
 * Steps:
 *   1. Build the fixtures with buildData() and subscribe to "tp".
 *   2. Fetch the topic assignment three times in a row and assert that all
 *      three snapshots describe two vgroups with identical currentOffset,
 *      begin and end values.
 *   3. Poll up to ten times. After each successful poll, commit the current
 *      position of every vgroup and assert that tmq_committed() returns it.
 *      Then seek both vgroups back to the currentOffset just reported and
 *      assert that this offset is the same one seen after the previous poll.
 *
 * On any failure the snapshots, the pending result and the consumer are
 * released before ASSERT(0), and the function returns.
 */
void test_offset(TAOS* pConn){
  if (buildData(pConn) != 0) {
    ASSERT(0);
  }

  tmq_conf_t* conf = tmq_conf_new();

  tmq_conf_set(conf, "enable.auto.commit", "false");
  tmq_conf_set(conf, "auto.commit.interval.ms", "2000");
  tmq_conf_set(conf, "group.id", "group_id_2");
  tmq_conf_set(conf, "td.connect.user", "root");
  tmq_conf_set(conf, "td.connect.pass", "taosdata");
  tmq_conf_set(conf, "auto.offset.reset", "earliest");
  tmq_conf_set(conf, "msg.with.table.name", "false");
  tmq_conf_set(conf, "enable.wal.marker", "true");

  tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
  tmq_conf_destroy(conf);

  // Create the list of topics to subscribe to.
  tmq_list_t* topicList = tmq_list_new();
  tmq_list_append(topicList, "tp");

  // Start the subscription.
  tmq_subscribe(tmq, topicList);
  tmq_list_destroy(topicList);

  int32_t timeout = 200;
  int32_t code = 0;

  // Three consecutive snapshots of the same assignment; they must agree.
  enum { NUM_SNAPSHOTS = 3 };
  tmq_topic_assignment* pSnap[NUM_SNAPSHOTS] = {NULL, NULL, NULL};
  int32_t               numOfSnap[NUM_SNAPSHOTS] = {0, 0, 0};

  for (int s = 0; s < NUM_SNAPSHOTS; s++) {
    code = tmq_get_topic_assignment(tmq, "tp", &pSnap[s], &numOfSnap[s]);
    if (code != 0) {
      printf("error occurs:%s\n", tmq_err2str(code));
      // Release this snapshot and every earlier one.
      for (int f = 0; f <= s; f++) {
        tmq_free_assignment(pSnap[f]);
      }
      tmq_consumer_close(tmq);
      ASSERT(0);
      return;
    }
  }

  ASSERT(numOfSnap[0] == 2);
  ASSERT(numOfSnap[0] == numOfSnap[1]);
  ASSERT(numOfSnap[0] == numOfSnap[2]);

  for (int i = 0; i < numOfSnap[0]; i++) {
    // Locate the entries for the same vgroup in the second and third snapshot.
    int j = 0;
    int k = 0;
    while (j < numOfSnap[1] && pSnap[0][i].vgId != pSnap[1][j].vgId) {
      j++;
    }
    while (k < numOfSnap[2] && pSnap[0][i].vgId != pSnap[2][k].vgId) {
      k++;
    }

    // A vgroup missing from a later snapshot is a failure; never index past
    // the end of the arrays.
    int found = (j < numOfSnap[1]) && (k < numOfSnap[2]);
    ASSERT(found);
    if (!found) {
      continue;
    }

    ASSERT(pSnap[0][i].currentOffset == pSnap[1][j].currentOffset);
    ASSERT(pSnap[0][i].currentOffset == pSnap[2][k].currentOffset);

    ASSERT(pSnap[0][i].begin == pSnap[1][j].begin);
    ASSERT(pSnap[0][i].begin == pSnap[2][k].begin);

    ASSERT(pSnap[0][i].end == pSnap[1][j].end);
    ASSERT(pSnap[0][i].end == pSnap[2][k].end);
  }

  for (int s = 0; s < NUM_SNAPSHOTS; s++) {
    tmq_free_assignment(pSnap[s]);
  }

  int cnt = 0;
  // Offsets are 64-bit; -1 marks "no poll has reported an offset yet".
  int64_t offset1 = -1;
  int64_t offset2 = -1;
  while (cnt++ < 10) {
    printf("start to poll:%d\n", cnt);
    TAOS_RES* pRes = tmq_consumer_poll(tmq, timeout);
    if (pRes == NULL) {
      continue;
    }

    tmq_topic_assignment* pAssign = NULL;
    int32_t numOfAssign = 0;

    code = tmq_get_topic_assignment(tmq, "tp", &pAssign, &numOfAssign);
    // Both pAssign[0] and pAssign[1] are used below, so fewer than two
    // entries is treated like a failed fetch.
    if (code != 0 || numOfAssign < 2) {
      if (code != 0) {
        printf("error occurs:%s\n", tmq_err2str(code));
      }
      tmq_free_assignment(pAssign);
      taos_free_result(pRes);
      tmq_consumer_close(tmq);
      ASSERT(0);
      return;
    }

    for (int i = 0; i < numOfAssign; i++) {
      int64_t position = tmq_position(tmq, "tp", pAssign[i].vgId);
      if (position == 0) continue;

      printf("position = %" PRId64 "\n", position);
      tmq_commit_offset_sync(tmq, "tp", pAssign[i].vgId, position);
      int64_t committed = tmq_committed(tmq, "tp", pAssign[i].vgId);
      ASSERT(position == committed);
    }

    // Rewind both vgroups to the offsets this snapshot reported.
    tmq_offset_seek(tmq, "tp", pAssign[0].vgId, pAssign[0].currentOffset);
    tmq_offset_seek(tmq, "tp", pAssign[1].vgId, pAssign[1].currentOffset);

    // After the first successful poll, each later snapshot must report the
    // same offsets as the previous one.
    if (offset1 != -1) {
      ASSERT(offset1 == pAssign[0].currentOffset);
    }
    if (offset2 != -1) {
      ASSERT(offset2 == pAssign[1].currentOffset);
    }

    offset1 = pAssign[0].currentOffset;
    offset2 = pAssign[1].currentOffset;

    tmq_free_assignment(pAssign);

    taos_free_result(pRes);
  }

  tmq_consumer_close(tmq);
}
296

297
// run taosBenchmark first
UNCOV
298
/*
 * Scenario ts3756: while data is being consumed from topic "t1"
 * ("select * from meters" in database "test"), the assignment offsets must
 * move between two consecutive successful polls.
 *
 * After every successful poll the current assignment is compared with the
 * one kept from the previous poll. If every compared currentOffset is
 * unchanged, the test fails with ASSERT(0).
 *
 * The poll loop has no normal exit: the function only returns when fetching
 * the assignment fails. Otherwise it runs until the process is stopped.
 */
void test_ts3756(TAOS* pConn){
  TAOS_RES* pRes = taos_query(pConn, "use test");
  if (taos_errno(pRes) != 0) {
    ASSERT(0);
  }
  taos_free_result(pRes);

  pRes = taos_query(pConn, "drop topic if exists t1");
  if (taos_errno(pRes) != 0) {
    ASSERT(0);
  }
  taos_free_result(pRes);

  pRes = taos_query(pConn, "create topic t1 as select * from meters");
  if (taos_errno(pRes) != 0) {
    ASSERT(0);
  }
  taos_free_result(pRes);

  tmq_conf_t* conf = tmq_conf_new();

  tmq_conf_set(conf, "enable.auto.commit", "false");
  tmq_conf_set(conf, "auto.commit.interval.ms", "2000");
  tmq_conf_set(conf, "group.id", "group_id_2");
  tmq_conf_set(conf, "td.connect.user", "root");
  tmq_conf_set(conf, "td.connect.pass", "taosdata");
  tmq_conf_set(conf, "auto.offset.reset", "latest");
  tmq_conf_set(conf, "msg.with.table.name", "false");

  tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
  tmq_conf_destroy(conf);

  // Create the list of topics to subscribe to.
  tmq_list_t* topicList = tmq_list_new();
  tmq_list_append(topicList, "t1");

  // Start the subscription.
  tmq_subscribe(tmq, topicList);
  tmq_list_destroy(topicList);

  int32_t timeout = 200;

  // Assignment kept from the previous successful poll.
  tmq_topic_assignment* pAssign = NULL;
  int32_t numOfAssign = 0;

  while (1) {
    pRes = tmq_consumer_poll(tmq, timeout);
    if (pRes == NULL) {
      continue;
    }

    tmq_topic_assignment* pAssignTmp = NULL;
    int32_t numOfAssignTmp = 0;

    int32_t code = tmq_get_topic_assignment(tmq, "t1", &pAssignTmp, &numOfAssignTmp);
    if (code != 0) {
      printf("error occurs:%s\n", tmq_err2str(code));
      // Release the failed fetch, the previous snapshot and the pending
      // message before giving up.
      tmq_free_assignment(pAssignTmp);
      tmq_free_assignment(pAssign);
      taos_free_result(pRes);
      tmq_consumer_close(tmq);
      ASSERT(0);
      return;
    }

    if (numOfAssign != 0) {
      // Find the first vgroup whose offset changed, staying inside both arrays.
      int i = 0;
      for (; i < numOfAssign && i < numOfAssignTmp; i++) {
        if (pAssign[i].currentOffset != pAssignTmp[i].currentOffset) {
          break;
        }
      }
      // Reaching the end of the previous snapshot means nothing moved.
      if (i == numOfAssign) {
        ASSERT(0);
      }
    }

    // The new snapshot replaces the previous one.
    tmq_free_assignment(pAssign);
    numOfAssign = numOfAssignTmp;
    pAssign = pAssignTmp;
    taos_free_result(pRes);
  }
}
376

377
int main(int argc, char* argv[]) {
205✔
378
  TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
205✔
379
  if (argc == 2) {
205✔
380
    test_ts5679(pConn);
205✔
381
  }else{
UNCOV
382
    test_offset(pConn);
×
383
    test_ts3756(pConn);
×
384
  }
385

386
  taos_close(pConn);
205✔
387
  return 0;
205✔
388
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc