• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4308

14 Jun 2025 02:06PM UTC coverage: 62.454% (-0.3%) from 62.777%
#4308

push

travis-ci

web-flow
fix: taosdump windows pthread_mutex_unlock crash(3.0) (#31357)

* fix: windows pthread_mutex_unlock crash

* enh: sync from main fix taosdump crash windows

* fix: restore .github action branch to main

153985 of 315105 branches covered (48.87%)

Branch coverage included in aggregate %.

238120 of 312727 relevant lines covered (76.14%)

6462519.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.14
/source/client/src/clientTmq.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "cJSON.h"
17
#include "clientInt.h"
18
#include "clientLog.h"
19
#include "parser.h"
20
#include "tdatablock.h"
21
#include "tdef.h"
22
#include "tglobal.h"
23
#include "tqueue.h"
24
#include "tref.h"
25
#include "ttimer.h"
26

27
#define tqErrorC(...) do { if (cDebugFlag & DEBUG_ERROR || tqClientDebugFlag & DEBUG_ERROR) { taosPrintLog("TQ  ERROR ", DEBUG_ERROR, tqClientDebugFlag|cDebugFlag, __VA_ARGS__); }} while(0)
28
#define tqInfoC(...)  do { if (cDebugFlag & DEBUG_INFO  || tqClientDebugFlag & DEBUG_INFO)  { taosPrintLog("TQ  INFO  ", DEBUG_INFO,  tqClientDebugFlag|cDebugFlag, __VA_ARGS__); }} while(0)
29
#define tqDebugC(...) do { if (cDebugFlag & DEBUG_DEBUG || tqClientDebugFlag & DEBUG_DEBUG) { taosPrintLog("TQ  DEBUG ", DEBUG_DEBUG, tqClientDebugFlag|cDebugFlag, __VA_ARGS__); }} while(0)
30

31
#define EMPTY_BLOCK_POLL_IDLE_DURATION 10
32
#define DEFAULT_AUTO_COMMIT_INTERVAL   5000
33
#define DEFAULT_HEARTBEAT_INTERVAL     3000
34
#define DEFAULT_ASKEP_INTERVAL         1000
35
#define DEFAULT_COMMIT_CNT             1
36
#define SUBSCRIBE_RETRY_MAX_COUNT      240
37
#define SUBSCRIBE_RETRY_INTERVAL       500
38

39

40
#define SET_ERROR_MSG_TMQ(MSG) \
41
  if (errstr != NULL && errstrLen > 0) (void)snprintf(errstr, errstrLen, MSG);
42

43
#define PROCESS_POLL_RSP(FUNC,DATA) \
44
  SDecoder decoder = {0}; \
45
  tDecoderInit(&decoder, POINTER_SHIFT(pRspWrapper->pollRsp.data, sizeof(SMqRspHead)), pRspWrapper->pollRsp.len - sizeof(SMqRspHead)); \
46
  if (FUNC(&decoder, DATA) < 0) { \
47
    tDecoderClear(&decoder); \
48
    code = terrno; \
49
    goto END;\
50
  }\
51
  tDecoderClear(&decoder);\
52
  (void)memcpy(DATA, pRspWrapper->pollRsp.data, sizeof(SMqRspHead));
53

54
#define DELETE_POLL_RSP(FUNC,DATA) \
55
  SMqPollRspWrapper* pRsp = &rspWrapper->pollRsp;\
56
  taosMemoryFreeClear(pRsp->pEpset);             \
57
  taosMemoryFreeClear(pRsp->data);               \
58
  FUNC(DATA);
59

60
enum {
61
  TMQ_VG_STATUS__IDLE = 0,
62
  TMQ_VG_STATUS__WAIT,
63
};
64

65
enum {
66
  TMQ_CONSUMER_STATUS__INIT = 0,
67
  TMQ_CONSUMER_STATUS__READY,
68
  TMQ_CONSUMER_STATUS__LOST,
69
  TMQ_CONSUMER_STATUS__CLOSED,
70
};
71

72
enum {
73
  TMQ_DELAYED_TASK__ASK_EP = 1,
74
  TMQ_DELAYED_TASK__COMMIT,
75
};
76

77
typedef struct {
78
  tmr_h               timer;
79
  int32_t             rsetId;
80
  TdThreadMutex       lock;
81
} SMqMgmt;
82

83
struct tmq_list_t {
84
  SArray container;
85
};
86

87
struct tmq_conf_t {
88
  char           clientId[TSDB_CLIENT_ID_LEN];
89
  char           groupId[TSDB_CGROUP_LEN];
90
  int8_t         autoCommit;
91
  int8_t         resetOffset;
92
  int8_t         withTbName;
93
  int8_t         snapEnable;
94
  int8_t         replayEnable;
95
  int8_t         sourceExcluded;  // do not consume, bit
96
  int8_t         rawData;         // fetch raw data
97
  int32_t        maxPollWaitTime;
98
  int32_t        minPollRows;
99
  uint16_t       port;
100
  int32_t        autoCommitInterval;
101
  int32_t        sessionTimeoutMs;
102
  int32_t        heartBeatIntervalMs;
103
  int32_t        maxPollIntervalMs;
104
  char*          ip;
105
  char*          user;
106
  char*          pass;
107
  tmq_commit_cb* commitCb;
108
  void*          commitCbUserParam;
109
  int8_t         enableBatchMeta;
110
};
111

112
struct tmq_t {
113
  int64_t        refId;
114
  char           groupId[TSDB_CGROUP_LEN];
115
  char           clientId[TSDB_CLIENT_ID_LEN];
116
  char           user[TSDB_USER_LEN];
117
  char           fqdn[TSDB_FQDN_LEN];
118
  int8_t         withTbName;
119
  int8_t         useSnapshot;
120
  int8_t         autoCommit;
121
  int32_t        autoCommitInterval;
122
  int32_t        sessionTimeoutMs;
123
  int32_t        heartBeatIntervalMs;
124
  int32_t        maxPollIntervalMs;
125
  int8_t         resetOffsetCfg;
126
  int8_t         replayEnable;
127
  int8_t         sourceExcluded;  // do not consume, bit
128
  int8_t         rawData;         // fetch raw data
129
  int32_t        maxPollWaitTime;
130
  int32_t        minPollRows;
131
  int64_t        consumerId;
132
  tmq_commit_cb* commitCb;
133
  void*          commitCbUserParam;
134
  int8_t         enableBatchMeta;
135

136
  // status
137
  SRWLatch lock;
138
  int8_t   status;
139
  int32_t  epoch;
140
  // poll info
141
  int64_t pollCnt;
142
  int64_t totalRows;
143
  int8_t  pollFlag;
144

145
  // timer
146
  tmr_h       hbLiveTimer;
147
  tmr_h       epTimer;
148
  tmr_h       commitTimer;
149
  STscObj*    pTscObj;       // connection
150
  SArray*     clientTopics;  // SArray<SMqClientTopic>
151
  STaosQueue* mqueue;        // queue of rsp
152
  STaosQueue* delayedTask;  // delayed task queue for heartbeat and auto commit
153
  tsem2_t     rspSem;
154
};
155

156
typedef struct {
157
  int32_t code;
158
  tsem2_t sem;
159
} SAskEpInfo;
160

161
typedef struct {
162
  STqOffsetVal committedOffset;
163
  STqOffsetVal endOffset;    // the last version in TAOS_RES + 1
164
  STqOffsetVal beginOffset;  // the first version in TAOS_RES
165
  int64_t      walVerBegin;
166
  int64_t      walVerEnd;
167
} SVgOffsetInfo;
168

169
typedef struct {
170
  int64_t       pollCnt;
171
  int64_t       numOfRows;
172
  SVgOffsetInfo offsetInfo;
173
  int32_t       vgId;
174
  int32_t       vgStatus;
175
  int32_t       vgSkipCnt;            // here used to mark the slow vgroups
176
  int64_t       emptyBlockReceiveTs;  // once empty block is received, idle for ignoreCnt then start to poll data
177
  int64_t       blockReceiveTs;       // once empty block is received, idle for ignoreCnt then start to poll data
178
  int64_t       blockSleepForReplay;  // once empty block is received, idle for ignoreCnt then start to poll data
179
  bool          seekUpdated;          // offset is updated by seek operator, therefore, not update by vnode rsp.
180
  SEpSet        epSet;
181
} SMqClientVg;
182

183
typedef struct {
184
  char           topicName[TSDB_TOPIC_FNAME_LEN];
185
  char           db[TSDB_DB_FNAME_LEN];
186
  SArray*        vgs;  // SArray<SMqClientVg>
187
  SSchemaWrapper schema;
188
  int8_t         noPrivilege;
189
} SMqClientTopic;
190

191
typedef struct {
192
  int32_t         vgId;
193
  char            topicName[TSDB_TOPIC_FNAME_LEN];
194
  SMqClientTopic* topicHandle;
195
  uint64_t        reqId;
196
  SEpSet*         pEpset;
197
  void*           data;
198
  uint32_t        len;
199
  union {
200
    struct{
201
      SMqRspHead   head;
202
      STqOffsetVal rspOffset;
203
    };
204
    SMqDataRsp      dataRsp;
205
    SMqMetaRsp      metaRsp;
206
    SMqBatchMetaRsp batchMetaRsp;
207
  };
208
} SMqPollRspWrapper;
209

210
typedef struct {
211
  int32_t code;
212
  int8_t  tmqRspType;
213
  int32_t epoch;
214
  union{
215
    SMqPollRspWrapper pollRsp;
216
    SMqAskEpRsp       epRsp;
217
  };
218
} SMqRspWrapper;
219

220
typedef struct {
221
  tsem2_t rspSem;
222
  int32_t rspErr;
223
} SMqSubscribeCbParam;
224

225
typedef struct {
226
  int64_t refId;
227
  bool    sync;
228
  void*   pParam;
229
} SMqAskEpCbParam;
230

231
typedef struct {
232
  int64_t  refId;
233
  char     topicName[TSDB_TOPIC_FNAME_LEN];
234
  int32_t  vgId;
235
  uint64_t requestId;  // request id for debug purpose
236
} SMqPollCbParam;
237

238
typedef struct {
239
  tsem2_t       rsp;
240
  int32_t       numOfRsp;
241
  SArray*       pList;
242
  TdThreadMutex mutex;
243
  int64_t       consumerId;
244
  char*         pTopicName;
245
  int32_t       code;
246
} SMqVgCommon;
247

248
typedef struct {
249
  tsem2_t sem;
250
  int32_t code;
251
} SMqSeekParam;
252

253
typedef struct {
254
  tsem2_t     sem;
255
  int32_t     code;
256
  SMqVgOffset vgOffset;
257
} SMqCommittedParam;
258

259
typedef struct {
260
  int32_t      vgId;
261
  int32_t      epoch;
262
  int32_t      totalReq;
263
  SMqVgCommon* pCommon;
264
} SMqVgWalInfoParam;
265

266
typedef struct {
267
  int64_t        refId;
268
  int32_t        epoch;
269
  int32_t        waitingRspNum;
270
  int32_t        code;
271
  tmq_commit_cb* callbackFn;
272
  void*          userParam;
273
} SMqCommitCbParamSet;
274

275
typedef struct {
276
  SMqCommitCbParamSet* params;
277
  char                 topicName[TSDB_TOPIC_FNAME_LEN];
278
  int32_t              vgId;
279
  int64_t              consumerId;
280
} SMqCommitCbParam;
281

282
typedef struct {
283
  tsem2_t sem;
284
  int32_t code;
285
} SSyncCommitInfo;
286

287
typedef struct {
288
  STqOffsetVal currentOffset;
289
  STqOffsetVal commitOffset;
290
  STqOffsetVal seekOffset;
291
  int64_t      numOfRows;
292
  int32_t      vgStatus;
293
} SVgroupSaveInfo;
294

295
static   TdThreadOnce   tmqInit = PTHREAD_ONCE_INIT;  // initialize only once
296
volatile int32_t        tmqInitRes = 0;               // initialize rsp code
297
static   SMqMgmt        tmqMgmt = {0};
298

299
tmq_conf_t* tmq_conf_new() {
451✔
300
  tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
451!
301
  if (conf == NULL) {
454!
302
    return conf;
×
303
  }
304

305
  conf->withTbName = false;
454✔
306
  conf->autoCommit = true;
454✔
307
  conf->autoCommitInterval = DEFAULT_AUTO_COMMIT_INTERVAL;
454✔
308
  conf->resetOffset = TMQ_OFFSET__RESET_LATEST;
454✔
309
  conf->enableBatchMeta = false;
454✔
310
  conf->heartBeatIntervalMs = DEFAULT_HEARTBEAT_INTERVAL;
454✔
311
  conf->maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL;
454✔
312
  conf->sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT;
454✔
313
  conf->maxPollWaitTime = DEFAULT_MAX_POLL_WAIT_TIME;
454✔
314
  conf->minPollRows = DEFAULT_MIN_POLL_ROWS;
454✔
315

316
  return conf;
454✔
317
}
318

319
void tmq_conf_destroy(tmq_conf_t* conf) {
455✔
320
  if (conf) {
455!
321
    if (conf->ip) {
455✔
322
      taosMemoryFree(conf->ip);
2!
323
    }
324
    if (conf->user) {
455✔
325
      taosMemoryFree(conf->user);
453!
326
    }
327
    if (conf->pass) {
455✔
328
      taosMemoryFree(conf->pass);
453!
329
    }
330
    taosMemoryFree(conf);
455!
331
  }
332
}
455✔
333

334
tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
3,136✔
335
  int32_t code = 0;
3,136✔
336
  if (conf == NULL || key == NULL || value == NULL) {
3,136!
337
    tqErrorC("tmq_conf_set null, conf:%p key:%p value:%p", conf, key, value);
×
338
    return TMQ_CONF_INVALID;
×
339
  }
340
  if (strcasecmp(key, "group.id") == 0) {
3,147✔
341
    if (strchr(value, TMQ_SEPARATOR_CHAR) != NULL) {
455!
342
      tqErrorC("invalid group.id:%s, can not contains ':'", value);
×
343
      return TMQ_CONF_INVALID;
×
344
    }
345
    tstrncpy(conf->groupId, value, TSDB_CGROUP_LEN);
455✔
346
    return TMQ_CONF_OK;
455✔
347
  }
348

349
  if (strcasecmp(key, "client.id") == 0) {
2,692✔
350
    tstrncpy(conf->clientId, value, TSDB_CLIENT_ID_LEN);
60✔
351
    return TMQ_CONF_OK;
60✔
352
  }
353

354
  if (strcasecmp(key, "enable.auto.commit") == 0) {
2,632✔
355
    if (strcasecmp(value, "true") == 0) {
436✔
356
      conf->autoCommit = true;
223✔
357
      return TMQ_CONF_OK;
223✔
358
    } else if (strcasecmp(value, "false") == 0) {
213!
359
      conf->autoCommit = false;
213✔
360
      return TMQ_CONF_OK;
213✔
361
    } else {
362
      tqErrorC("invalid value for enable.auto.commit:%s", value);
×
363
      return TMQ_CONF_INVALID;
×
364
    }
365
  }
366

367
  if (strcasecmp(key, "auto.commit.interval.ms") == 0) {
2,196✔
368
    int64_t tmp;
369
    code = taosStr2int64(value, &tmp);
261✔
370
    if (tmp < 0 || code != 0) {
261!
371
      tqErrorC("invalid value for auto.commit.interval.ms:%s", value);
×
372
      return TMQ_CONF_INVALID;
×
373
    }
374
    conf->autoCommitInterval = (tmp > INT32_MAX ? INT32_MAX : tmp);
261✔
375
    return TMQ_CONF_OK;
261✔
376
  }
377

378
  if (strcasecmp(key, "session.timeout.ms") == 0) {
1,935✔
379
    int64_t tmp;
380
    code = taosStr2int64(value, &tmp);
4✔
381
    if (tmp < 6000 || tmp > 1800000 || code != 0) {
4!
382
      tqErrorC("invalid value for session.timeout.ms:%s", value);
×
383
      return TMQ_CONF_INVALID;
×
384
    }
385
    conf->sessionTimeoutMs = tmp;
4✔
386
    return TMQ_CONF_OK;
4✔
387
  }
388

389
  if (strcasecmp(key, "heartbeat.interval.ms") == 0) {
1,931✔
390
    int64_t tmp;
391
    code = taosStr2int64(value, &tmp);
1✔
392
    if (tmp < 1000 || tmp >= conf->sessionTimeoutMs || code != 0) {
1!
393
      tqErrorC("invalid value for heartbeat.interval.ms:%s", value);
1!
394
      return TMQ_CONF_INVALID;
1✔
395
    }
396
    conf->heartBeatIntervalMs = tmp;
×
397
    return TMQ_CONF_OK;
×
398
  }
399

400
  if (strcasecmp(key, "max.poll.interval.ms") == 0) {
1,930✔
401
    int32_t tmp;
402
    code = taosStr2int32(value, &tmp);
3✔
403
    if (tmp < 1000 || code != 0) {
3!
404
      tqErrorC("invalid value for max.poll.interval.ms:%s", value);
×
405
      return TMQ_CONF_INVALID;
×
406
    }
407
    conf->maxPollIntervalMs = tmp;
3✔
408
    return TMQ_CONF_OK;
3✔
409
  }
410

411
  if (strcasecmp(key, "auto.offset.reset") == 0) {
1,927✔
412
    if (strcasecmp(value, "none") == 0) {
421✔
413
      conf->resetOffset = TMQ_OFFSET__RESET_NONE;
11✔
414
      return TMQ_CONF_OK;
11✔
415
    } else if (strcasecmp(value, "earliest") == 0) {
410✔
416
      conf->resetOffset = TMQ_OFFSET__RESET_EARLIEST;
390✔
417
      return TMQ_CONF_OK;
390✔
418
    } else if (strcasecmp(value, "latest") == 0) {
20!
419
      conf->resetOffset = TMQ_OFFSET__RESET_LATEST;
22✔
420
      return TMQ_CONF_OK;
22✔
421
    } else {
422
      tqErrorC("invalid value for auto.offset.reset:%s", value);
×
423
      return TMQ_CONF_INVALID;
×
424
    }
425
  }
426

427
  if (strcasecmp(key, "msg.with.table.name") == 0) {
1,506✔
428
    if (strcasecmp(value, "true") == 0) {
403✔
429
      conf->withTbName = true;
381✔
430
      return TMQ_CONF_OK;
381✔
431
    } else if (strcasecmp(value, "false") == 0) {
22!
432
      conf->withTbName = false;
22✔
433
      return TMQ_CONF_OK;
22✔
434
    } else {
435
      tqErrorC("invalid value for msg.with.table.name:%s", value);
×
436
      return TMQ_CONF_INVALID;
×
437
    }
438
  }
439

440
  if (strcasecmp(key, "experimental.snapshot.enable") == 0) {
1,103✔
441
    if (strcasecmp(value, "true") == 0) {
140✔
442
      conf->snapEnable = true;
105✔
443
      return TMQ_CONF_OK;
105✔
444
    } else if (strcasecmp(value, "false") == 0) {
35!
445
      conf->snapEnable = false;
35✔
446
      return TMQ_CONF_OK;
35✔
447
    } else {
448
      tqErrorC("invalid value for experimental.snapshot.enable:%s", value);
×
449
      return TMQ_CONF_INVALID;
×
450
    }
451
  }
452

453
  if (strcasecmp(key, "td.connect.ip") == 0) {
963✔
454
    void *tmp = taosStrdup(value);
2!
455
    if (tmp == NULL) {
2!
456
      tqErrorC("tmq_conf_set out of memory:%d", terrno);
×
457
      return TMQ_CONF_INVALID;
×
458
    }
459
    conf->ip = tmp;
2✔
460
    return TMQ_CONF_OK;
2✔
461
  }
462

463
  if (strcasecmp(key, "td.connect.user") == 0) {
961✔
464
    void *tmp = taosStrdup(value);
448!
465
    if (tmp == NULL) {
448!
466
      tqErrorC("tmq_conf_set out of memory:%d", terrno);
×
467
      return TMQ_CONF_INVALID;
×
468
    }
469
    conf->user = tmp;
448✔
470
    return TMQ_CONF_OK;
448✔
471
  }
472

473
  if (strcasecmp(key, "td.connect.pass") == 0) {
513✔
474
    void *tmp = taosStrdup(value);
453!
475
    if (tmp == NULL) {
452!
476
      tqErrorC("tmq_conf_set out of memory:%d", terrno);
×
477
      return TMQ_CONF_INVALID;
×
478
    }
479
    conf->pass = tmp;
452✔
480
    return TMQ_CONF_OK;
452✔
481
  }
482

483
  if (strcasecmp(key, "td.connect.port") == 0) {
60!
484
    int64_t tmp;
485
    code = taosStr2int64(value, &tmp);
×
486
    if (tmp <= 0 || tmp > 65535 || code != 0) {
×
487
      tqErrorC("invalid value for td.connect.port:%s", value);
×
488
      return TMQ_CONF_INVALID;
×
489
    }
490

491
    conf->port = tmp;
×
492
    return TMQ_CONF_OK;
×
493
  }
494

495
  if (strcasecmp(key, "enable.replay") == 0) {
60✔
496
    if (strcasecmp(value, "true") == 0) {
7!
497
      conf->replayEnable = true;
7✔
498
      return TMQ_CONF_OK;
7✔
499
    } else if (strcasecmp(value, "false") == 0) {
×
500
      conf->replayEnable = false;
×
501
      return TMQ_CONF_OK;
×
502
    } else {
503
      tqErrorC("invalid value for enable.replay:%s", value);
×
504
      return TMQ_CONF_INVALID;
×
505
    }
506
  }
507
  if (strcasecmp(key, "msg.consume.excluded") == 0) {
53✔
508
    int64_t tmp = 0;
43✔
509
    code = taosStr2int64(value, &tmp);
43✔
510
    conf->sourceExcluded = (0 == code && tmp != 0) ? TD_REQ_FROM_TAOX : 0;
43!
511
    return TMQ_CONF_OK;
43✔
512
  }
513
  if (strcasecmp(key, "msg.consume.rawdata") == 0) {
10✔
514
    int64_t tmp = 0;
1✔
515
    code = taosStr2int64(value, &tmp);
1✔
516
    conf->rawData = (0 == code && tmp != 0) ? 1 : 0;
1!
517
    return TMQ_CONF_OK;
1✔
518
  }
519

520
  if (strcasecmp(key, "fetch.max.wait.ms") == 0) {
9!
521
    int64_t tmp = 0;
×
522
    code = taosStr2int64(value, &tmp);
×
523
    if (tmp <= 0 || tmp > INT32_MAX || code != 0) {
×
524
      tqErrorC("invalid value for fetch.max.wait.ms:%s", value);
×
525
      return TMQ_CONF_INVALID;
×
526
    }
527
    conf->maxPollWaitTime = tmp;
×
528
    return TMQ_CONF_OK;
×
529
  }
530

531
  if (strcasecmp(key, "min.poll.rows") == 0) {
9!
532
    int64_t tmp = 0;
×
533
    code = taosStr2int64(value, &tmp);
×
534
    if (tmp <= 0 || tmp > INT32_MAX || code != 0) {
×
535
      tqErrorC("invalid value for min.poll.rows:%s", value);
×
536
      return TMQ_CONF_INVALID;
×
537
    }
538
    conf->minPollRows = tmp;
×
539
    return TMQ_CONF_OK;
×
540
  }
541

542
  if (strcasecmp(key, "td.connect.db") == 0) {
9!
543
    return TMQ_CONF_OK;
×
544
  }
545

546
  if (strcasecmp(key, "msg.enable.batchmeta") == 0) {
9✔
547
    int64_t tmp;
548
    code = taosStr2int64(value, &tmp);
4✔
549
    conf->enableBatchMeta = (0 == code && tmp != 0) ? true : false;
4!
550
    return TMQ_CONF_OK;
4✔
551
  }
552

553
  tqErrorC("unknown key:%s", key);
5!
554
  return TMQ_CONF_UNKNOWN;
×
555
}
556

557
tmq_list_t* tmq_list_new() {
1,315✔
558
  return (tmq_list_t*)taosArrayInit(0, sizeof(void*));
1,315✔
559
}
560

561
int32_t tmq_list_append(tmq_list_t* list, const char* src) {
542✔
562
  if (list == NULL) {
542!
563
    return TSDB_CODE_INVALID_PARA;
×
564
  }
565
  SArray* container = &list->container;
542✔
566
  if (src == NULL || src[0] == 0) {
542!
567
    return TSDB_CODE_INVALID_PARA;
×
568
  }
569
  char* topic = taosStrdup(src);
542!
570
  if (topic == NULL) return terrno;
542!
571
  if (taosArrayPush(container, &topic) == NULL) {
542!
572
    taosMemoryFree(topic);
×
573
    return terrno;
×
574
  }
575
  return 0;
542✔
576
}
577

578
void tmq_list_destroy(tmq_list_t* list) {
1,315✔
579
  if (list == NULL) return;
1,315!
580
  SArray* container = &list->container;
1,315✔
581
  taosArrayDestroyP(container, NULL);
1,315✔
582
}
583

584
int32_t tmq_list_get_size(const tmq_list_t* list) {
7✔
585
  if (list == NULL) {
7!
586
    return TSDB_CODE_INVALID_PARA;
×
587
  }
588
  const SArray* container = &list->container;
7✔
589
  return taosArrayGetSize(container);
7✔
590
}
591

592
char** tmq_list_to_c_array(const tmq_list_t* list) {
7✔
593
  if (list == NULL) {
7!
594
    return NULL;
×
595
  }
596
  const SArray* container = &list->container;
7✔
597
  return container->pData;
7✔
598
}
599

600
static int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) {
9,092✔
601
  if (pParamSet == NULL) {
9,092!
602
    return TSDB_CODE_INVALID_PARA;
×
603
  }
604
  int64_t refId = pParamSet->refId;
9,092✔
605
  int32_t code = 0;
9,092✔
606
  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
9,092✔
607
  if (tmq == NULL) {
9,092!
608
    code = TSDB_CODE_TMQ_CONSUMER_CLOSED;
×
609
  }
610

611
  // if no more waiting rsp
612
  if (pParamSet->callbackFn != NULL) {
9,092✔
613
    pParamSet->callbackFn(tmq, pParamSet->code, pParamSet->userParam);
9,073✔
614
  }
615

616
  taosMemoryFree(pParamSet);
9,092!
617
  if (tmq != NULL) {
9,092!
618
    code = taosReleaseRef(tmqMgmt.rsetId, refId);
9,092✔
619
  }
620

621
  return code;
9,092✔
622
}
623

624
static int32_t commitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId) {
13,216✔
625
  if (pParamSet == NULL) {
13,216!
626
    return TSDB_CODE_INVALID_PARA;
×
627
  }
628
  int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
13,216✔
629
  if (waitingRspNum == 0) {
13,216✔
630
    tqDebugC("consumer:0x%" PRIx64 " topic:%s vgId:%d all commit-rsp received, commit completed", consumerId, pTopic,
9,092!
631
             vgId);
632
    return tmqCommitDone(pParamSet);
9,092✔
633
  } else {
634
    tqDebugC("consumer:0x%" PRIx64 " topic:%s vgId:%d commit-rsp received, remain:%d", consumerId, pTopic, vgId,
4,124!
635
             waitingRspNum);
636
  }
637
  return 0;
4,125✔
638
}
639

640
static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
4,128✔
641
  if (pBuf){
4,128!
642
    taosMemoryFreeClear(pBuf->pData);
4,128!
643
    taosMemoryFreeClear(pBuf->pEpSet);
4,128!
644
  }
645
  if(param == NULL){
4,128!
646
    return TSDB_CODE_INVALID_PARA;
×
647
  }
648
  SMqCommitCbParam*    pParam = (SMqCommitCbParam*)param;
4,128✔
649
  SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params;
4,128✔
650

651
  return commitRspCountDown(pParamSet, pParam->consumerId, pParam->topicName, pParam->vgId);
4,128✔
652
}
653

654
static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffsetVal* offset, const char* pTopicName,
4,129✔
655
                               SMqCommitCbParamSet* pParamSet) {
656
  if (tmq == NULL || epSet == NULL || offset == NULL || pTopicName == NULL || pParamSet == NULL) {
4,129!
657
    return TSDB_CODE_INVALID_PARA;
×
658
  }
659
  SMqVgOffset pOffset = {0};
4,129✔
660

661
  pOffset.consumerId = tmq->consumerId;
4,129✔
662
  pOffset.offset.val = *offset;
4,129✔
663
  (void)snprintf(pOffset.offset.subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", tmq->groupId, TMQ_SEPARATOR, pTopicName);
4,129✔
664
  int32_t len = 0;
4,129✔
665
  int32_t code = 0;
4,129✔
666
  tEncodeSize(tEncodeMqVgOffset, &pOffset, len, code);
4,129!
667
  if (code < 0) {
4,129!
668
    return TSDB_CODE_INVALID_PARA;
×
669
  }
670

671
  void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
4,129!
672
  if (buf == NULL) {
4,129!
673
    return terrno;
×
674
  }
675

676
  ((SMsgHead*)buf)->vgId = htonl(vgId);
4,129✔
677

678
  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
4,129✔
679

680
  SEncoder encoder = {0};
4,129✔
681
  tEncoderInit(&encoder, abuf, len);
4,129✔
682
  if (tEncodeMqVgOffset(&encoder, &pOffset) < 0) {
4,129!
683
    tEncoderClear(&encoder);
×
684
    taosMemoryFree(buf);
×
685
    return TSDB_CODE_INVALID_PARA;
×
686
  }
687
  tEncoderClear(&encoder);
4,129✔
688

689
  // build param
690
  SMqCommitCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam));
4,129!
691
  if (pParam == NULL) {
4,129!
692
    taosMemoryFree(buf);
×
693
    return terrno;
×
694
  }
695

696
  pParam->params = pParamSet;
4,129✔
697
  pParam->vgId = vgId;
4,129✔
698
  pParam->consumerId = tmq->consumerId;
4,129✔
699

700
  tstrncpy(pParam->topicName, pTopicName, tListLen(pParam->topicName));
4,129✔
701

702
  // build send info
703
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
4,129!
704
  if (pMsgSendInfo == NULL) {
4,129!
705
    taosMemoryFree(buf);
×
706
    taosMemoryFree(pParam);
×
707
    return terrno;
×
708
  }
709

710
  pMsgSendInfo->msgInfo = (SDataBuf){.pData = buf, .len = sizeof(SMsgHead) + len, .handle = NULL};
4,129✔
711

712
  pMsgSendInfo->requestId = generateRequestId();
4,129✔
713
  pMsgSendInfo->requestObjRefId = 0;
4,129✔
714
  pMsgSendInfo->param = pParam;
4,129✔
715
  pMsgSendInfo->paramFreeFp = taosAutoMemoryFree;
4,129✔
716
  pMsgSendInfo->fp = tmqCommitCb;
4,129✔
717
  pMsgSendInfo->msgType = TDMT_VND_TMQ_COMMIT_OFFSET;
4,129✔
718

719
  // int64_t transporterId = 0;
720
  (void)atomic_add_fetch_32(&pParamSet->waitingRspNum, 1);
4,129✔
721
  code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, NULL, pMsgSendInfo);
4,129✔
722
  if (code != 0) {
4,129!
723
    (void)atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
×
724
  }
725
  return code;
4,129✔
726
}
727

728
static int32_t getTopicByName(tmq_t* tmq, const char* pTopicName, SMqClientTopic** topic) {
147✔
729
  if (tmq == NULL || pTopicName == NULL || topic == NULL) {
147!
730
    return TSDB_CODE_INVALID_PARA;
×
731
  }
732
  int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
147✔
733
  for (int32_t i = 0; i < numOfTopics; ++i) {
148✔
734
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
118✔
735
    if (pTopic == NULL || strcmp(pTopic->topicName, pTopicName) != 0) {
118!
736
      continue;
1✔
737
    }
738
    *topic = pTopic;
117✔
739
    return 0;
117✔
740
  }
741

742
  tqErrorC("consumer:0x%" PRIx64 ", total:%d, failed to find topic:%s", tmq->consumerId, numOfTopics, pTopicName);
30!
743
  return TSDB_CODE_TMQ_INVALID_TOPIC;
30✔
744
}
745

746
static int32_t prepareCommitCbParamSet(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam, int32_t rspNum,
9,096✔
747
                                       SMqCommitCbParamSet** ppParamSet) {
748
  if (tmq == NULL || ppParamSet == NULL) {
9,096!
749
    return TSDB_CODE_INVALID_PARA;
×
750
  }
751
  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
9,096!
752
  if (pParamSet == NULL) {
9,096!
753
    return terrno;
×
754
  }
755

756
  pParamSet->refId = tmq->refId;
9,096✔
757
  pParamSet->epoch = tmq->epoch;
9,096✔
758
  pParamSet->callbackFn = pCommitFp;
9,096✔
759
  pParamSet->userParam = userParam;
9,096✔
760
  pParamSet->waitingRspNum = rspNum;
9,096✔
761
  *ppParamSet = pParamSet;
9,096✔
762
  return 0;
9,096✔
763
}
764

765
static int32_t getClientVg(tmq_t* tmq, char* pTopicName, int32_t vgId, SMqClientVg** pVg) {
136✔
766
  if (tmq == NULL || pTopicName == NULL || pVg == NULL) {
136!
767
    return TSDB_CODE_INVALID_PARA;
×
768
  }
769
  SMqClientTopic* pTopic = NULL;
136✔
770
  int32_t         code = getTopicByName(tmq, pTopicName, &pTopic);
136✔
771
  if (code != 0) {
136✔
772
    tqErrorC("consumer:0x%" PRIx64 " invalid topic name:%s", tmq->consumerId, pTopicName);
27!
773
    return code;
27✔
774
  }
775

776
  int32_t numOfVgs = taosArrayGetSize(pTopic->vgs);
109✔
777
  for (int32_t i = 0; i < numOfVgs; ++i) {
253✔
778
    SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i);
192✔
779
    if (pClientVg && pClientVg->vgId == vgId) {
192!
780
      *pVg = pClientVg;
48✔
781
      break;
48✔
782
    }
783
  }
784

785
  return *pVg == NULL ? TSDB_CODE_TMQ_INVALID_VGID : TSDB_CODE_SUCCESS;
109✔
786
}
787

788
static int32_t innerCommit(tmq_t* tmq, char* pTopicName, STqOffsetVal* offsetVal, SMqClientVg* pVg, SMqCommitCbParamSet* pParamSet){
15,735✔
789
  if (tmq == NULL || pTopicName == NULL || offsetVal == NULL || pVg == NULL || pParamSet == NULL) {
15,735!
790
    return TSDB_CODE_INVALID_PARA;
×
791
  }
792
  int32_t code = 0;
15,735✔
793
  if (offsetVal->type <= 0) {
15,735✔
794
    code = TSDB_CODE_TMQ_INVALID_MSG;
1,006✔
795
    return code;
1,006✔
796
  }
797
  if (tOffsetEqual(offsetVal, &pVg->offsetInfo.committedOffset)) {
14,729✔
798
    code = TSDB_CODE_TMQ_SAME_COMMITTED_VALUE;
10,600✔
799
    return code;
10,600✔
800
  }
801
  char offsetBuf[TSDB_OFFSET_LEN] = {0};
4,129✔
802
  tFormatOffset(offsetBuf, tListLen(offsetBuf), offsetVal);
4,129✔
803

804
  char commitBuf[TSDB_OFFSET_LEN] = {0};
4,129✔
805
  tFormatOffset(commitBuf, tListLen(commitBuf), &pVg->offsetInfo.committedOffset);
4,129✔
806

807
  code = doSendCommitMsg(tmq, pVg->vgId, &pVg->epSet, offsetVal, pTopicName, pParamSet);
4,129✔
808
  if (code != TSDB_CODE_SUCCESS) {
4,129!
809
    tqErrorC("consumer:0x%" PRIx64 " topic:%s on vgId:%d end commit msg failed, send offset:%s committed:%s, code:%s",
×
810
             tmq->consumerId, pTopicName, pVg->vgId, offsetBuf, commitBuf, tstrerror(terrno));
811
    return code;
×
812
  }
813

814
  tqDebugC("consumer:0x%" PRIx64 " topic:%s on vgId:%d send commit msg success, send offset:%s committed:%s",
4,129!
815
           tmq->consumerId, pTopicName, pVg->vgId, offsetBuf, commitBuf);
816
  tOffsetCopy(&pVg->offsetInfo.committedOffset, offsetVal);
4,129✔
817
  return code;
4,129✔
818
}
819

820
static int32_t asyncCommitOffset(tmq_t* tmq, char* pTopicName, int32_t vgId, STqOffsetVal* offsetVal,
8✔
821
                                 tmq_commit_cb* pCommitFp, void* userParam) {
822
  if (tmq == NULL || pTopicName == NULL || offsetVal == NULL) {
8!
823
    return TSDB_CODE_INVALID_PARA;
×
824
  }
825
  tqDebugC("consumer:0x%" PRIx64 " do manual commit offset for %s, vgId:%d", tmq->consumerId, pTopicName, vgId);
8!
826
  SMqCommitCbParamSet* pParamSet = NULL;
8✔
827
  int32_t code = prepareCommitCbParamSet(tmq, pCommitFp, userParam, 0, &pParamSet);
8✔
828
  if (code != 0){
8!
829
    return code;
×
830
  }
831

832
  taosRLockLatch(&tmq->lock);
8✔
833
  SMqClientVg* pVg = NULL;
8✔
834
  code = getClientVg(tmq, pTopicName, vgId, &pVg);
8✔
835
  if (code == 0) {
8!
836
    code = innerCommit(tmq, pTopicName, offsetVal, pVg, pParamSet);
8✔
837
  }
838
  taosRUnLockLatch(&tmq->lock);
8✔
839

840
  if (code != 0){
8✔
841
    taosMemoryFree(pParamSet);
4!
842
  }
843
  return code;
8✔
844
}
845

846
static void asyncCommitFromResult(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* pCommitFp, void* userParam) {
×
847
  char*        pTopicName = NULL;
×
848
  int32_t      vgId = 0;
×
849
  STqOffsetVal offsetVal = {0};
×
850
  int32_t      code = 0;
×
851

852
  if (pRes == NULL || tmq == NULL) {
×
853
    code = TSDB_CODE_INVALID_PARA;
×
854
    goto end;
×
855
  }
856

857
  if (TD_RES_TMQ(pRes) || TD_RES_TMQ_RAW(pRes) || TD_RES_TMQ_META(pRes) ||
×
858
      TD_RES_TMQ_METADATA(pRes) || TD_RES_TMQ_BATCH_META(pRes)) {
×
859
    SMqRspObj* pRspObj = (SMqRspObj*)pRes;
×
860
    pTopicName = pRspObj->topic;
×
861
    vgId = pRspObj->vgId;
×
862
    offsetVal = pRspObj->rspOffset;
×
863
  } else {
864
    code = TSDB_CODE_TMQ_INVALID_MSG;
×
865
    goto end;
×
866
  }
867

868
  code = asyncCommitOffset(tmq, pTopicName, vgId, &offsetVal, pCommitFp, userParam);
×
869

870
  end:
×
871
  if (code != TSDB_CODE_SUCCESS && pCommitFp != NULL) {
×
872
    if (code == TSDB_CODE_TMQ_SAME_COMMITTED_VALUE) code = TSDB_CODE_SUCCESS;
×
873
    pCommitFp(tmq, code, userParam);
×
874
  }
875
}
×
876

877
static int32_t innerCommitAll(tmq_t* tmq, SMqCommitCbParamSet* pParamSet){
9,088✔
878
  if (tmq == NULL || pParamSet == NULL) {
9,088!
879
    return TSDB_CODE_INVALID_PARA;
×
880
  }
881
  int32_t code = 0;
9,088✔
882
  taosRLockLatch(&tmq->lock);
9,088✔
883
  int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
9,088✔
884
  tqDebugC("consumer:0x%" PRIx64 " start to commit offset for %d topics", tmq->consumerId, numOfTopics);
9,088!
885

886
  for (int32_t i = 0; i < numOfTopics; i++) {
18,069✔
887
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
8,981✔
888
    if (pTopic == NULL) {
8,981!
889
      code = TSDB_CODE_TMQ_INVALID_TOPIC;
×
890
      goto END;
×
891
    }
892
    int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs);
8,981✔
893
    tqDebugC("consumer:0x%" PRIx64 " commit offset for topics:%s, numOfVgs:%d", tmq->consumerId, pTopic->topicName, numOfVgroups);
8,981!
894
    for (int32_t j = 0; j < numOfVgroups; j++) {
24,708✔
895
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
15,727✔
896
      if (pVg == NULL) {
15,727!
897
        code = terrno;
×
898
        goto END;
×
899
      }
900

901
      code = innerCommit(tmq, pTopic->topicName, &pVg->offsetInfo.endOffset, pVg, pParamSet);
15,727✔
902
      if (code != 0 && code != TSDB_CODE_TMQ_SAME_COMMITTED_VALUE){
15,727✔
903
        tqErrorC("consumer:0x%" PRIx64 " topic:%s vgId:%d, no commit, code:%s, current offset version:%" PRId64 ", ordinal:%d/%d",
1,006!
904
                 tmq->consumerId, pTopic->topicName, pVg->vgId, tstrerror(code), pVg->offsetInfo.endOffset.version, j + 1, numOfVgroups);
905
      }
906
    }
907
  }
908
  tqDebugC("consumer:0x%" PRIx64 " total commit:%d for %d topics", tmq->consumerId, pParamSet->waitingRspNum - DEFAULT_COMMIT_CNT,
9,088!
909
           numOfTopics);
910
  END:
369✔
911
  taosRUnLockLatch(&tmq->lock);
9,088✔
912
  return code;
9,088✔
913
}
914

915
static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam) {
9,088✔
916
  if (tmq == NULL) {
9,088!
917
    return;
×
918
  }
919
  int32_t code = 0;
9,088✔
920
  SMqCommitCbParamSet* pParamSet = NULL;
9,088✔
921
  // init waitingRspNum as DEFAULT_COMMIT_CNT to prevent concurrency issue
922
  code = prepareCommitCbParamSet(tmq, pCommitFp, userParam, DEFAULT_COMMIT_CNT, &pParamSet);
9,088✔
923
  if (code != 0) {
9,088!
924
    tqErrorC("consumer:0x%" PRIx64 " prepareCommitCbParamSet failed, code:%s", tmq->consumerId, tstrerror(code));
×
925
    if (pCommitFp != NULL) {
×
926
      pCommitFp(tmq, code, userParam);
×
927
    }
928
    return;
×
929
  }
930
  code = innerCommitAll(tmq, pParamSet);
9,088✔
931
  if (code != 0 && code != TSDB_CODE_TMQ_SAME_COMMITTED_VALUE){
9,088✔
932
    tqErrorC("consumer:0x%" PRIx64 " innerCommitAll failed, code:%s", tmq->consumerId, tstrerror(code));
148!
933
  }
934

935
  code = commitRspCountDown(pParamSet, tmq->consumerId, "init", -1);
9,088✔
936
  if (code != 0) {
9,088!
937
    tqErrorC("consumer:0x%" PRIx64 " commit rsp count down failed, code:%s", tmq->consumerId, tstrerror(code));
×
938
  }
939
  return;
9,088✔
940
}
941

942
static void generateTimedTask(int64_t refId, int32_t type) {
12,755✔
943
  tmq_t*  tmq = NULL;
12,755✔
944
  int8_t* pTaskType = NULL;
12,755✔
945
  int32_t code = 0;
12,755✔
946

947
  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
12,755✔
948
  if (tmq == NULL) return;
12,755!
949

950
  code = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0, (void**)&pTaskType);
12,755✔
951
  if (code == TSDB_CODE_SUCCESS) {
12,755!
952
    *pTaskType = type;
12,755✔
953
    if (taosWriteQitem(tmq->delayedTask, pTaskType) == 0) {
12,755!
954
      if (tsem2_post(&tmq->rspSem) != 0){
12,755!
955
        tqErrorC("consumer:0x%" PRIx64 " failed to post sem, type:%d", tmq->consumerId, type);
×
956
      }
957
    }else{
958
      taosFreeQitem(pTaskType);
×
959
    }
960
  }
961

962
  code = taosReleaseRef(tmqMgmt.rsetId, refId);
12,755✔
963
  if (code != 0){
12,755!
964
    tqErrorC("failed to release ref:%"PRId64 ", type:%d, code:%d", refId, type, code);
×
965
  }
966
}
967

968
void tmqAssignAskEpTask(void* param, void* tmrId) {
4,255✔
969
  int64_t refId = (int64_t)param;
4,255✔
970
  generateTimedTask(refId, TMQ_DELAYED_TASK__ASK_EP);
4,255✔
971
}
4,255✔
972

973
void tmqReplayTask(void* param, void* tmrId) {
12✔
974
  int64_t refId = (int64_t)param;
12✔
975
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
12✔
976
  if (tmq == NULL) return;
12!
977

978
  if (tsem2_post(&tmq->rspSem) != 0){
12!
979
    tqErrorC("consumer:0x%" PRIx64 " failed to post sem, replay", tmq->consumerId);
×
980
  }
981
  int32_t code = taosReleaseRef(tmqMgmt.rsetId, refId);
12✔
982
  if (code != 0){
12!
983
    tqErrorC("failed to release ref:%"PRId64 ", code:%d", refId, code);
×
984
  }
985
}
986

987
void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
8,500✔
988
  int64_t refId = (int64_t)param;
8,500✔
989
  generateTimedTask(refId, TMQ_DELAYED_TASK__COMMIT);
8,500✔
990
}
8,500✔
991

992
int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
2,525✔
993
  if (pMsg == NULL) {
2,525!
994
    return TSDB_CODE_INVALID_PARA;
×
995
  }
996

997
  if (param == NULL || code != 0){
2,525!
998
    goto END;
18✔
999
  }
1000

1001
  SMqHbRsp rsp = {0};
2,507✔
1002
  code = tDeserializeSMqHbRsp(pMsg->pData, pMsg->len, &rsp);
2,507✔
1003
  if (code != 0) {
2,507!
1004
    goto END;
×
1005
  }
1006

1007
  int64_t refId = (int64_t)param;
2,507✔
1008
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
2,507✔
1009
  if (tmq != NULL) {
2,507!
1010
    taosWLockLatch(&tmq->lock);
2,507✔
1011
    for (int32_t i = 0; i < taosArrayGetSize(rsp.topicPrivileges); i++) {
4,807✔
1012
      STopicPrivilege* privilege = taosArrayGet(rsp.topicPrivileges, i);
2,300✔
1013
      if (privilege == NULL) {
2,300!
1014
        continue;
×
1015
      }
1016
      int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
2,300✔
1017
      for (int32_t j = 0; j < topicNumCur; j++) {
5,126✔
1018
        SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, j);
2,826✔
1019
        if (pTopicCur && strcmp(pTopicCur->topicName, privilege->topic) == 0) {
2,826!
1020
          tqInfoC("consumer:0x%" PRIx64 ", update noPrivilege:%d, topic:%s", tmq->consumerId, privilege->noPrivilege, privilege->topic);
2,294!
1021
          pTopicCur->noPrivilege = privilege->noPrivilege;
2,294✔
1022
        }
1023
      }
1024
    }
1025
    taosWUnLockLatch(&tmq->lock);
2,507✔
1026
    code = taosReleaseRef(tmqMgmt.rsetId, refId);
2,507✔
1027
    if (code != 0){
2,507!
1028
      tqErrorC("failed to release ref:%"PRId64 ", code:%d", refId, code);
×
1029
    }
1030
  }
1031

1032
  tqClientDebugFlag = rsp.debugFlag;
2,507✔
1033

1034
  tDestroySMqHbRsp(&rsp);
2,507✔
1035

1036
  END:
2,525✔
1037
  taosMemoryFree(pMsg->pData);
2,525!
1038
  taosMemoryFree(pMsg->pEpSet);
2,525!
1039
  return code;
2,525✔
1040
}
1041

1042
void tmqSendHbReq(void* param, void* tmrId) {
2,525✔
1043
  int64_t refId = (int64_t)param;
2,525✔
1044

1045
  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
2,525✔
1046
  if (tmq == NULL) {
2,525!
1047
    return;
×
1048
  }
1049

1050
  SMqHbReq req = {0};
2,525✔
1051
  req.consumerId = tmq->consumerId;
2,525✔
1052
  req.epoch = tmq->epoch;
2,525✔
1053
  req.pollFlag = atomic_load_8(&tmq->pollFlag);
2,525✔
1054
  tqDebugC("consumer:0x%" PRIx64 " send heartbeat, pollFlag:%d", tmq->consumerId, req.pollFlag);
2,525!
1055
  req.topics = taosArrayInit(taosArrayGetSize(tmq->clientTopics), sizeof(TopicOffsetRows));
2,525✔
1056
  if (req.topics == NULL) {
2,525!
1057
    goto END;
×
1058
  }
1059
  taosRLockLatch(&tmq->lock);
2,525✔
1060
  for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
4,888✔
1061
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
2,363✔
1062
    if (pTopic == NULL) {
2,363!
1063
      continue;
×
1064
    }
1065
    int32_t          numOfVgroups = taosArrayGetSize(pTopic->vgs);
2,363✔
1066
    TopicOffsetRows* data = taosArrayReserve(req.topics, 1);
2,363✔
1067
    if (data == NULL) {
2,363!
1068
      continue;
×
1069
    }
1070
    tstrncpy(data->topicName, pTopic->topicName, TSDB_TOPIC_FNAME_LEN);
2,363✔
1071
    data->offsetRows = taosArrayInit(numOfVgroups, sizeof(OffsetRows));
2,363✔
1072
    if (data->offsetRows == NULL) {
2,363!
1073
      continue;
×
1074
    }
1075
    for (int j = 0; j < numOfVgroups; j++) {
8,218✔
1076
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
5,855✔
1077
      if (pVg == NULL) {
5,855!
1078
        continue;
×
1079
      }
1080
      OffsetRows* offRows = taosArrayReserve(data->offsetRows, 1);
5,855✔
1081
      if (offRows == NULL) {
5,855!
1082
        continue;
×
1083
      }
1084
      offRows->vgId = pVg->vgId;
5,855✔
1085
      offRows->rows = pVg->numOfRows;
5,855✔
1086
      offRows->offset = pVg->offsetInfo.endOffset;
5,855✔
1087
      offRows->ever = pVg->offsetInfo.walVerEnd == -1 ? 0 : pVg->offsetInfo.walVerEnd;
5,855✔
1088
      char buf[TSDB_OFFSET_LEN] = {0};
5,855✔
1089
      tFormatOffset(buf, TSDB_OFFSET_LEN, &offRows->offset);
5,855✔
1090
      tqDebugC("consumer:0x%" PRIx64 ",report offset, group:%s vgId:%d, offset:%s/%" PRId64 ", rows:%" PRId64,
5,855!
1091
               tmq->consumerId, tmq->groupId, offRows->vgId, buf, offRows->ever, offRows->rows);
1092
    }
1093
  }
1094
  taosRUnLockLatch(&tmq->lock);
2,525✔
1095

1096
  int32_t tlen = tSerializeSMqHbReq(NULL, 0, &req);
2,525✔
1097
  if (tlen < 0) {
2,525!
1098
    tqErrorC("tSerializeSMqHbReq failed, size:%d", tlen);
×
1099
    goto END;
×
1100
  }
1101

1102
  void* pReq = taosMemoryCalloc(1, tlen);
2,525!
1103
  if (pReq == NULL) {
2,525!
1104
    tqErrorC("failed to malloc MqHbReq msg, code:%d", terrno);
×
1105
    goto END;
×
1106
  }
1107

1108
  if (tSerializeSMqHbReq(pReq, tlen, &req) < 0) {
2,525!
1109
    tqErrorC("tSerializeSMqHbReq %d failed", tlen);
×
1110
    taosMemoryFree(pReq);
×
1111
    goto END;
×
1112
  }
1113

1114
  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
2,525!
1115
  if (sendInfo == NULL) {
2,525!
1116
    taosMemoryFree(pReq);
×
1117
    goto END;
×
1118
  }
1119

1120
  sendInfo->msgInfo = (SDataBuf){.pData = pReq, .len = tlen, .handle = NULL};
2,525✔
1121

1122
  sendInfo->requestId = generateRequestId();
2,525✔
1123
  sendInfo->requestObjRefId = 0;
2,525✔
1124
  sendInfo->param = (void*)refId;
2,525✔
1125
  sendInfo->fp = tmqHbCb;
2,525✔
1126
  sendInfo->msgType = TDMT_MND_TMQ_HB;
2,525✔
1127

1128

1129
  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
2,525✔
1130

1131
  int32_t code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo);
2,525✔
1132
  if (code != 0) {
2,525!
1133
    tqErrorC("tmqSendHbReq asyncSendMsgToServer failed");
×
1134
  }
1135
  (void)atomic_val_compare_exchange_8(&tmq->pollFlag, 1, 0);
2,525✔
1136

1137
  END:
2,525✔
1138
  tDestroySMqHbReq(&req);
2,525✔
1139
  if (tmrId != NULL) {
2,525✔
1140
    bool ret = taosTmrReset(tmqSendHbReq, tmq->heartBeatIntervalMs, param, tmqMgmt.timer, &tmq->hbLiveTimer);
1,698✔
1141
    tqDebugC("consumer:0x%" PRIx64 " reset timer for tmq heartbeat ret:%d, interval:%d, pollFlag:%d", tmq->consumerId, ret, tmq->heartBeatIntervalMs, tmq->pollFlag);
1,698!
1142
  }
1143
  int32_t ret = taosReleaseRef(tmqMgmt.rsetId, refId);
2,525✔
1144
  if (ret != 0){
2,525!
1145
    tqErrorC("failed to release ref:%"PRId64 ", code:%d", refId, ret);
×
1146
  }
1147
}
1148

1149
static void defaultCommitCbFn(tmq_t* pTmq, int32_t code, void* param) {
831✔
1150
  if (code != 0 && pTmq != NULL) {
831!
1151
    tqErrorC("consumer:0x%" PRIx64 ", failed to commit offset, code:%s", pTmq->consumerId, tstrerror(code));
×
1152
  }
1153
}
831✔
1154

1155
static void tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) {
46,545✔
1156
  if (rspWrapper == NULL) {
46,545!
1157
    return;
×
1158
  }
1159
  if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) {
46,545✔
1160
    tDeleteSMqAskEpRsp(&rspWrapper->epRsp);
6,440✔
1161
  } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_RSP) {
40,105✔
1162
    DELETE_POLL_RSP(tDeleteMqDataRsp, &pRsp->dataRsp)
39,790!
1163
  } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP){
315✔
1164
    DELETE_POLL_RSP(tDeleteSTaosxRsp, &pRsp->dataRsp)
25!
1165
  } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
290✔
1166
    DELETE_POLL_RSP(tDeleteMqMetaRsp,&pRsp->metaRsp)
227!
1167
  } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_BATCH_META_RSP) {
63✔
1168
    DELETE_POLL_RSP(tDeleteMqBatchMetaRsp,&pRsp->batchMetaRsp)
19!
1169
  } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) {
44!
1170
    DELETE_POLL_RSP(tDeleteMqRawDataRsp, &pRsp->dataRsp)
44!
1171
  }
1172
}
1173

1174
static void freeClientVg(void* param) {
4,244✔
1175
  if (param == NULL) {
4,244!
1176
    return;
×
1177
  }
1178
  SMqClientVg* pVg = param;
4,244✔
1179
  tOffsetDestroy(&pVg->offsetInfo.endOffset);
4,244✔
1180
  tOffsetDestroy(&pVg->offsetInfo.beginOffset);
4,244✔
1181
  tOffsetDestroy(&pVg->offsetInfo.committedOffset);
4,244✔
1182
}
1183
static void freeClientTopic(void* param) {
3,233✔
1184
  if (param == NULL) {
3,233!
1185
    return;
×
1186
  }
1187
  SMqClientTopic* pTopic = param;
3,233✔
1188
  taosMemoryFreeClear(pTopic->schema.pSchema);
3,233!
1189
  taosArrayDestroyEx(pTopic->vgs, freeClientVg);
3,233✔
1190
}
1191

1192
static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopicEp, SHashObj* pVgOffsetHashMap,
3,234✔
1193
                                   tmq_t* tmq) {
1194
  if (pTopic == NULL || pTopicEp == NULL || pVgOffsetHashMap == NULL || tmq == NULL) {
3,234!
1195
    return;
×
1196
  }
1197
  pTopic->schema = pTopicEp->schema;
3,234✔
1198
  pTopicEp->schema.nCols = 0;
3,234✔
1199
  pTopicEp->schema.pSchema = NULL;
3,234✔
1200

1201
  char    vgKey[TSDB_TOPIC_FNAME_LEN + 22] = {0};
3,234✔
1202
  int32_t vgNumGet = taosArrayGetSize(pTopicEp->vgs);
3,234✔
1203

1204
  tstrncpy(pTopic->topicName, pTopicEp->topic, TSDB_TOPIC_FNAME_LEN);
3,234✔
1205
  tstrncpy(pTopic->db, pTopicEp->db, TSDB_DB_FNAME_LEN);
3,234✔
1206

1207
  tqInfoC("consumer:0x%" PRIx64 ", update topic:%s, new numOfVgs:%d", tmq->consumerId, pTopic->topicName, vgNumGet);
3,234!
1208
  pTopic->vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg));
3,234✔
1209
  if (pTopic->vgs == NULL) {
3,234!
1210
    tqErrorC("consumer:0x%" PRIx64 ", failed to init vgs for topic:%s", tmq->consumerId, pTopic->topicName);
×
1211
    return;
×
1212
  }
1213
  for (int32_t j = 0; j < vgNumGet; j++) {
7,479✔
1214
    SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);
4,245✔
1215
    if (pVgEp == NULL) {
4,245!
1216
      continue;
×
1217
    }
1218
    (void)snprintf(vgKey, sizeof(vgKey), "%s:%d", pTopic->topicName, pVgEp->vgId);
4,245✔
1219
    SVgroupSaveInfo* pInfo = taosHashGet(pVgOffsetHashMap, vgKey, strlen(vgKey));
4,245✔
1220

1221
    STqOffsetVal offsetNew = {0};
4,245✔
1222
    offsetNew.type = tmq->resetOffsetCfg;
4,245✔
1223

1224
    tqInfoC("consumer:0x%" PRIx64 ", update topic:%s, new numOfVgs:%d, num:%d, port:%d", tmq->consumerId,
4,245!
1225
            pTopic->topicName, vgNumGet, pVgEp->epSet.numOfEps, pVgEp->epSet.eps[pVgEp->epSet.inUse].port);
1226

1227
    SMqClientVg clientVg = {
12,735✔
1228
        .pollCnt = 0,
1229
        .vgId = pVgEp->vgId,
4,245✔
1230
        .epSet = pVgEp->epSet,
1231
        .vgStatus = pInfo ? pInfo->vgStatus : TMQ_VG_STATUS__IDLE,
4,245✔
1232
        .vgSkipCnt = 0,
1233
        .emptyBlockReceiveTs = 0,
1234
        .blockReceiveTs = 0,
1235
        .blockSleepForReplay = 0,
1236
        .numOfRows = pInfo ? pInfo->numOfRows : 0,
4,245✔
1237
    };
1238

1239
    clientVg.offsetInfo.walVerBegin = -1;
4,245✔
1240
    clientVg.offsetInfo.walVerEnd = -1;
4,245✔
1241
    clientVg.seekUpdated = false;
4,245✔
1242
    if (pInfo) {
4,245✔
1243
      tOffsetCopy(&clientVg.offsetInfo.endOffset, &pInfo->currentOffset);
2,758✔
1244
      tOffsetCopy(&clientVg.offsetInfo.committedOffset, &pInfo->commitOffset);
2,758✔
1245
      tOffsetCopy(&clientVg.offsetInfo.beginOffset, &pInfo->seekOffset);
2,758✔
1246
    } else {
1247
      clientVg.offsetInfo.endOffset = offsetNew;
1,487✔
1248
      clientVg.offsetInfo.committedOffset = offsetNew;
1,487✔
1249
      clientVg.offsetInfo.beginOffset = offsetNew;
1,487✔
1250
    }
1251
    if (taosArrayPush(pTopic->vgs, &clientVg) == NULL) {
8,490!
1252
      tqErrorC("consumer:0x%" PRIx64 ", failed to push vg:%d into topic:%s", tmq->consumerId, pVgEp->vgId,
×
1253
               pTopic->topicName);
1254
      freeClientVg(&clientVg);
×
1255
    }
1256
  }
1257
}
1258

1259
static void buildNewTopicList(tmq_t* tmq, SArray* newTopics, const SMqAskEpRsp* pRsp){
3,143✔
1260
  if (tmq == NULL || newTopics == NULL || pRsp == NULL) {
3,143!
1261
    return;
×
1262
  }
1263
  SHashObj* pVgOffsetHashMap = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
3,143✔
1264
  if (pVgOffsetHashMap == NULL) {
3,143!
1265
    tqErrorC("consumer:0x%" PRIx64 " taos hash init null, code:%d", tmq->consumerId, terrno);
×
1266
    return;
×
1267
  }
1268

1269
  int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
3,143✔
1270
  for (int32_t i = 0; i < topicNumCur; i++) {
5,865✔
1271
    // find old topic
1272
    SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i);
2,722✔
1273
    if (pTopicCur && pTopicCur->vgs) {
2,722!
1274
      int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs);
2,722✔
1275
      tqInfoC("consumer:0x%" PRIx64 ", current vg num:%d", tmq->consumerId, vgNumCur);
2,722!
1276
      for (int32_t j = 0; j < vgNumCur; j++) {
5,499✔
1277
        SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j);
2,777✔
1278
        if (pVgCur == NULL) {
2,777!
1279
          continue;
×
1280
        }
1281
        char vgKey[TSDB_TOPIC_FNAME_LEN + 22] = {0};
2,777✔
1282
        (void)snprintf(vgKey, sizeof(vgKey), "%s:%d", pTopicCur->topicName, pVgCur->vgId);
2,777✔
1283

1284
        char buf[TSDB_OFFSET_LEN] = {0};
2,777✔
1285
        tFormatOffset(buf, TSDB_OFFSET_LEN, &pVgCur->offsetInfo.endOffset);
2,777✔
1286
        tqInfoC("consumer:0x%" PRIx64 ", vgId:%d vgKey:%s, offset:%s", tmq->consumerId, pVgCur->vgId, vgKey, buf);
2,777!
1287

1288
        SVgroupSaveInfo info = {.currentOffset = pVgCur->offsetInfo.endOffset,
2,777✔
1289
            .seekOffset = pVgCur->offsetInfo.beginOffset,
1290
            .commitOffset = pVgCur->offsetInfo.committedOffset,
1291
            .numOfRows = pVgCur->numOfRows,
2,777✔
1292
            .vgStatus = pVgCur->vgStatus};
2,777✔
1293
        if (taosHashPut(pVgOffsetHashMap, vgKey, strlen(vgKey), &info, sizeof(SVgroupSaveInfo)) != 0) {
2,777!
1294
          tqErrorC("consumer:0x%" PRIx64 ", failed to put vg:%d into hashmap", tmq->consumerId, pVgCur->vgId);
×
1295
        }
1296
      }
1297
    }
1298
  }
1299

1300
  for (int32_t i = 0; i < taosArrayGetSize(pRsp->topics); i++) {
6,377✔
1301
    SMqClientTopic topic = {0};
3,234✔
1302
    SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i);
3,234✔
1303
    if (pTopicEp == NULL) {
3,234!
1304
      continue;
×
1305
    }
1306
    initClientTopicFromRsp(&topic, pTopicEp, pVgOffsetHashMap, tmq);
3,234✔
1307
    if (taosArrayPush(newTopics, &topic) == NULL) {
3,234!
1308
      tqErrorC("consumer:0x%" PRIx64 ", failed to push topic:%s into new topics", tmq->consumerId, topic.topicName);
×
1309
      freeClientTopic(&topic);
×
1310
    }
1311
  }
1312

1313
  taosHashCleanup(pVgOffsetHashMap);
3,143✔
1314
}
1315

1316
static void doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
7,735✔
1317
  if (tmq == NULL || pRsp == NULL) {
7,735!
1318
    return;
×
1319
  }
1320
  int32_t topicNumGet = taosArrayGetSize(pRsp->topics);
7,735✔
1321
  // vnode transform (epoch == tmq->epoch && topicNumGet != 0)
1322
  // ask ep rsp (epoch == tmq->epoch && topicNumGet == 0)
1323
  if (epoch < tmq->epoch || (epoch == tmq->epoch && topicNumGet == 0)) {
7,735✔
1324
    tqDebugC("consumer:0x%" PRIx64 " no update ep epoch from %d to epoch %d, incoming topics:%d", tmq->consumerId,
4,147!
1325
             tmq->epoch, epoch, topicNumGet);
1326
    return;
4,147✔
1327
  }
1328

1329
  SArray* newTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic));
3,588✔
1330
  if (newTopics == NULL) {
3,588!
1331
    tqErrorC("consumer:0x%" PRIx64 " taos array init null, code:%d", tmq->consumerId, terrno);
×
1332
    return;
×
1333
  }
1334
  tqInfoC("consumer:0x%" PRIx64 " update ep epoch from %d to epoch %d, incoming topics:%d, existed topics:%d",
3,588!
1335
          tmq->consumerId, tmq->epoch, epoch, topicNumGet, (int)taosArrayGetSize(tmq->clientTopics));
1336

1337
  taosWLockLatch(&tmq->lock);
3,588✔
1338
  if (topicNumGet > 0){
3,588✔
1339
    buildNewTopicList(tmq, newTopics, pRsp);
3,143✔
1340
  }
1341
  // destroy current buffered existed topics info
1342
  if (tmq->clientTopics) {
3,588!
1343
    taosArrayDestroyEx(tmq->clientTopics, freeClientTopic);
3,588✔
1344
  }
1345
  tmq->clientTopics = newTopics;
3,588✔
1346
  taosWUnLockLatch(&tmq->lock);
3,588✔
1347

1348
  atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);
3,588✔
1349
  atomic_store_32(&tmq->epoch, epoch);
3,588✔
1350

1351
  tqInfoC("consumer:0x%" PRIx64 " update topic info completed", tmq->consumerId);
3,588!
1352
}
1353

1354
static int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) {
10,520✔
1355
  SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
10,520✔
1356
  if (pParam == NULL) {
10,520!
1357
    goto _ERR;
×
1358
  }
1359

1360
  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, pParam->refId);
10,520✔
1361
  if (tmq == NULL) {
10,520✔
1362
    code = TSDB_CODE_TMQ_CONSUMER_CLOSED;
9✔
1363
    goto _ERR;
9✔
1364
  }
1365

1366
  if (code != TSDB_CODE_SUCCESS) {
10,511✔
1367
    if (code != TSDB_CODE_MND_CONSUMER_NOT_READY){
2,764✔
1368
      tqErrorC("consumer:0x%" PRIx64 ", get topic endpoint error, code:%s", tmq->consumerId, tstrerror(code));
20!
1369
      if (code == TSDB_CODE_MND_CONSUMER_NOT_EXIST){
20✔
1370
        atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__LOST);
15✔
1371
      }
1372
    }
1373
    goto END;
2,764✔
1374
  }
1375

1376
  if (pMsg == NULL) {
7,747!
1377
    goto END;
×
1378
  }
1379
  SMqRspHead* head = pMsg->pData;
7,747✔
1380
  int32_t     epoch = atomic_load_32(&tmq->epoch);
7,747✔
1381
  tqDebugC("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d", tmq->consumerId, head->epoch, epoch);
7,747!
1382
  if (pParam->sync) {
7,747✔
1383
    SMqAskEpRsp rsp = {0};
1,307✔
1384
    if (tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp) != NULL) {
2,614!
1385
      doUpdateLocalEp(tmq, head->epoch, &rsp);
1,307✔
1386
    }
1387
    tDeleteSMqAskEpRsp(&rsp);
1388
  } else {
1389
    SMqRspWrapper* pWrapper = NULL;
6,440✔
1390
    code = taosAllocateQitem(sizeof(SMqRspWrapper), DEF_QITEM, 0, (void**)&pWrapper);
6,440✔
1391
    if (code) {
6,440!
1392
      goto END;
×
1393
    }
1394

1395
    pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP;
6,440✔
1396
    pWrapper->epoch = head->epoch;
6,440✔
1397
    (void)memcpy(&pWrapper->epRsp, pMsg->pData, sizeof(SMqRspHead));
6,440✔
1398
    if (tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pWrapper->epRsp) == NULL) {
12,880!
1399
      tmqFreeRspWrapper((SMqRspWrapper*)pWrapper);
×
1400
      taosFreeQitem(pWrapper);
×
1401
    } else {
1402
      code = taosWriteQitem(tmq->mqueue, pWrapper);
6,440✔
1403
      if (code != 0) {
6,440!
1404
        tmqFreeRspWrapper((SMqRspWrapper*)pWrapper);
×
1405
        taosFreeQitem(pWrapper);
×
1406
        tqErrorC("consumer:0x%" PRIx64 " put ep res into mqueue failed, code:%d", tmq->consumerId, code);
×
1407
      }
1408
    }
1409
  }
1410

1411
  END:
10,511✔
1412
  {
1413
    int32_t ret = taosReleaseRef(tmqMgmt.rsetId, pParam->refId);
10,511✔
1414
    if (ret != 0){
10,511!
1415
      tqErrorC("failed to release ref:%"PRId64 ", code:%d", pParam->refId, ret);
×
1416
    }
1417
  }
1418

1419
  _ERR:
10,511✔
1420
  if (pParam && pParam->sync) {
10,520!
1421
    SAskEpInfo* pInfo = pParam->pParam;
4,058✔
1422
    if (pInfo) {
4,058!
1423
      pInfo->code = code;
4,058✔
1424
      if (tsem2_post(&pInfo->sem) != 0){
4,058!
1425
        tqErrorC("failed to post rsp sem askep cb");
×
1426
      }
1427
    }
1428
  }
1429

1430
  if (pMsg) {
10,520!
1431
    taosMemoryFree(pMsg->pEpSet);
10,520!
1432
    taosMemoryFree(pMsg->pData);
10,520!
1433
  }
1434

1435
  return code;
10,520✔
1436
}
1437

1438
static int32_t askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpSet) {
10,520✔
1439
  if (pTmq == NULL) {
10,520!
1440
    return TSDB_CODE_INVALID_PARA;
×
1441
  }
1442
  int32_t code = 0;
10,520✔
1443
  int32_t lino = 0;
10,520✔
1444
  SMqAskEpReq req = {0};
10,520✔
1445
  req.consumerId = pTmq->consumerId;
10,520✔
1446
  req.epoch = updateEpSet ? -1 : pTmq->epoch;
10,520✔
1447
  tstrncpy(req.cgroup, pTmq->groupId, TSDB_CGROUP_LEN);
10,520✔
1448
  SMqAskEpCbParam* pParam = NULL;
10,520✔
1449
  void*            pReq = NULL;
10,520✔
1450

1451
  int32_t tlen = tSerializeSMqAskEpReq(NULL, 0, &req);
10,520✔
1452
  TSDB_CHECK_CONDITION(tlen >= 0, code, lino, END, TSDB_CODE_INVALID_PARA);
10,520!
1453
  pReq = taosMemoryCalloc(1, tlen);
10,520!
1454
  TSDB_CHECK_NULL(pReq, code, lino, END, terrno);
10,520!
1455

1456
  code = tSerializeSMqAskEpReq(pReq, tlen, &req);
10,520✔
1457
  TSDB_CHECK_CONDITION(code >= 0, code, lino, END, TSDB_CODE_INVALID_PARA);
10,520!
1458

1459
  pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam));
10,520!
1460
  TSDB_CHECK_NULL(pParam, code, lino, END, terrno);
10,520!
1461

1462
  pParam->refId = pTmq->refId;
10,520✔
1463
  pParam->sync = sync;
10,520✔
1464
  pParam->pParam = param;
10,520✔
1465

1466
  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
10,520!
1467
  TSDB_CHECK_NULL(sendInfo, code, lino, END, terrno);
10,520!
1468

1469
  sendInfo->msgInfo = (SDataBuf){.pData = pReq, .len = tlen, .handle = NULL};
10,520✔
1470
  sendInfo->requestId = generateRequestId();
10,520✔
1471
  sendInfo->requestObjRefId = 0;
10,520✔
1472
  sendInfo->param = pParam;
10,520✔
1473
  sendInfo->paramFreeFp = taosAutoMemoryFree;
10,520✔
1474
  sendInfo->fp = askEpCb;
10,520✔
1475
  sendInfo->msgType = TDMT_MND_TMQ_ASK_EP;
10,520✔
1476

1477
  pReq = NULL;
10,520✔
1478
  pParam = NULL;
10,520✔
1479

1480
  SEpSet epSet = getEpSet_s(&pTmq->pTscObj->pAppInfo->mgmtEp);
10,520✔
1481
  tqDebugC("consumer:0x%" PRIx64 " ask ep from mnode, QID:0x%" PRIx64, pTmq->consumerId, sendInfo->requestId);
10,520!
1482
  code = asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo);
10,520✔
1483

1484
END:
10,520✔
1485
  if (code != 0) {
10,520!
1486
    tqErrorC("%s failed at %d, msg:%s", __func__, lino, tstrerror(code));
×
1487
  }
1488
  taosMemoryFree(pReq);
10,520!
1489
  taosMemoryFree(pParam);
10,520!
1490
  return code;
10,520✔
1491
}
1492

1493
static int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) {
78,520✔
1494
  tqDebugC("consumer:0x%" PRIx64 " handle delayed %d tasks before poll data", pTmq->consumerId, taosQueueItemSize(pTmq->delayedTask));
78,520!
1495
  while (1) {
12,203✔
1496
    int8_t* pTaskType = NULL;
90,726✔
1497
    taosReadQitem(pTmq->delayedTask, (void**)&pTaskType);
90,726✔
1498
    if (pTaskType == NULL) {break;}
90,726✔
1499
    if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) {
12,203✔
1500
      tqDebugC("consumer:0x%" PRIx64 " retrieve ask ep timer", pTmq->consumerId);
3,835!
1501
      int32_t code = askEp(pTmq, NULL, false, false);
3,835✔
1502
      if (code != 0) {
3,835!
1503
        tqErrorC("consumer:0x%" PRIx64 " failed to ask ep, code:%s", pTmq->consumerId, tstrerror(code));
×
1504
      }
1505
      tqDebugC("consumer:0x%" PRIx64 " retrieve ep from mnode in 1s", pTmq->consumerId);
3,835!
1506
      bool ret = taosTmrReset(tmqAssignAskEpTask, DEFAULT_ASKEP_INTERVAL, (void*)(pTmq->refId), tmqMgmt.timer,
3,835✔
1507
                              &pTmq->epTimer);
1508
      tqDebugC("reset timer for tmq ask ep:%d", ret);
3,835!
1509
    } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
8,368!
1510
      tmq_commit_cb* pCallbackFn = (pTmq->commitCb != NULL) ? pTmq->commitCb : defaultCommitCbFn;
8,368✔
1511
      asyncCommitAllOffsets(pTmq, pCallbackFn, pTmq->commitCbUserParam);
8,368✔
1512
      tqDebugC("consumer:0x%" PRIx64 " next commit to vnode(s) in %.2fs", pTmq->consumerId,
8,368!
1513
               pTmq->autoCommitInterval / 1000.0);
1514
      bool ret = taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, (void*)(pTmq->refId), tmqMgmt.timer,
8,368✔
1515
                              &pTmq->commitTimer);
1516
      tqDebugC("reset timer for commit:%d", ret);
8,368!
1517
    } else {
1518
      tqErrorC("consumer:0x%" PRIx64 " invalid task type:%d", pTmq->consumerId, *pTaskType);
×
1519
    }
1520

1521
    taosFreeQitem(pTaskType);
12,203✔
1522
  }
1523

1524
  return 0;
78,523✔
1525
}
1526

1527
void tmqClearUnhandleMsg(tmq_t* tmq) {
1,280✔
1528
  if (tmq == NULL) return;
1,280!
1529
  while (1) {
894✔
1530
    SMqRspWrapper* rspWrapper = NULL;
2,174✔
1531
    taosReadQitem(tmq->mqueue, (void**)&rspWrapper);
2,174✔
1532
    if (rspWrapper == NULL) break;
2,174✔
1533
    tmqFreeRspWrapper(rspWrapper);
894✔
1534
    taosFreeQitem(rspWrapper);
894✔
1535
  }
1536
}
1537

1538
int32_t tmqSubscribeCb(void* param, SDataBuf* pMsg, int32_t code) {
1,311✔
1539
  if (pMsg) {
1,311!
1540
    taosMemoryFreeClear(pMsg->pEpSet);
1,311!
1541
  }
1542

1543
  if (param == NULL) {
1,311!
1544
    return code;
×
1545
  }
1546

1547
  SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param;
1,311✔
1548
  pParam->rspErr = code;
1,311✔
1549

1550
  if (tsem2_post(&pParam->rspSem) != 0){
1,311!
1551
    tqErrorC("failed to post sem, subscribe cb");
×
1552
  }
1553
  return 0;
1,311✔
1554
}
1555

1556
int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
30✔
1557
  if (tmq == NULL) return TSDB_CODE_INVALID_PARA;
30!
1558
  if (*topics == NULL) {
30✔
1559
    *topics = tmq_list_new();
23✔
1560
    if (*topics == NULL) {
23!
1561
      return terrno;
×
1562
    }
1563
  }
1564
  taosRLockLatch(&tmq->lock);
30✔
1565
  for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
53✔
1566
    SMqClientTopic* topic = taosArrayGet(tmq->clientTopics, i);
23✔
1567
    if (topic == NULL) {
23!
1568
      tqErrorC("topic is null");
×
1569
      continue;
×
1570
    }
1571
    char* tmp = strchr(topic->topicName, '.');
23✔
1572
    if (tmp == NULL) {
23!
1573
      tqErrorC("topic name is invalid:%s", topic->topicName);
×
1574
      continue;
×
1575
    }
1576
    if (tmq_list_append(*topics, tmp + 1) != 0) {
23!
1577
      tqErrorC("failed to append topic:%s", tmp + 1);
×
1578
      continue;
×
1579
    }
1580
  }
1581
  taosRUnLockLatch(&tmq->lock);
30✔
1582
  return 0;
30✔
1583
}
1584

1585
void tmqFreeImpl(void* handle) {
453✔
1586
  if (handle == NULL) return;
453!
1587
  tmq_t*  tmq = (tmq_t*)handle;
453✔
1588
  int64_t id = tmq->consumerId;
453✔
1589

1590
  if (tmq->mqueue) {
453!
1591
    tmqClearUnhandleMsg(tmq);
453✔
1592
    taosCloseQueue(tmq->mqueue);
453✔
1593
  }
1594

1595
  if (tmq->delayedTask) {
453!
1596
    taosCloseQueue(tmq->delayedTask);
453✔
1597
  }
1598

1599
  if(tsem2_destroy(&tmq->rspSem) != 0) {
453!
1600
    tqErrorC("failed to destroy sem in free tmq");
×
1601
  }
1602

1603
  taosArrayDestroyEx(tmq->clientTopics, freeClientTopic);
453✔
1604
  taos_close_internal(tmq->pTscObj);
453✔
1605

1606
  if (tmq->commitTimer) {
453✔
1607
    if (!taosTmrStopA(&tmq->commitTimer)) {
231✔
1608
      tqErrorC("failed to stop commit timer");
132!
1609
    }
1610
  }
1611
  if (tmq->epTimer) {
453✔
1612
    if (!taosTmrStopA(&tmq->epTimer)) {
445✔
1613
      tqErrorC("failed to stop ep timer");
419!
1614
    }
1615
  }
1616
  if (tmq->hbLiveTimer) {
453!
1617
    if (!taosTmrStopA(&tmq->hbLiveTimer)) {
453!
1618
      tqErrorC("failed to stop hb timer");
×
1619
    }
1620
  }
1621
  taosMemoryFree(tmq);
453!
1622

1623
  tqInfoC("consumer:0x%" PRIx64 " closed", id);
453!
1624
}
1625

1626
static void tmqMgmtInit(void) {
311✔
1627
  tmqInitRes = 0;
311✔
1628

1629
  if (taosThreadMutexInit(&tmqMgmt.lock, NULL) != 0){
311!
1630
    goto END;
×
1631
  }
1632

1633
  tmqMgmt.timer = taosTmrInit(1000, 100, 360000, "TMQ");
311✔
1634

1635
  if (tmqMgmt.timer == NULL) {
311!
1636
    goto END;
×
1637
  }
1638

1639
  tmqMgmt.rsetId = taosOpenRef(10000, tmqFreeImpl);
311✔
1640
  if (tmqMgmt.rsetId < 0) {
311!
1641
    goto END;
×
1642
  }
1643

1644
  return;
311✔
1645
END:
×
1646
  tmqInitRes = terrno;
×
1647
}
1648

1649
void tmqMgmtClose(void) {
17,037✔
1650
  if (tmqMgmt.timer) {
17,037✔
1651
    taosTmrCleanUp(tmqMgmt.timer);
311✔
1652
    tmqMgmt.timer = NULL;
311✔
1653
  }
1654

1655
  if (tmqMgmt.rsetId > 0) {
17,037✔
1656
    (void) taosThreadMutexLock(&tmqMgmt.lock);
311✔
1657
    tmq_t *tmq = taosIterateRef(tmqMgmt.rsetId, 0);
311✔
1658
    int64_t  refId = 0;
311✔
1659

1660
    while (tmq) {
313✔
1661
      refId = tmq->refId;
2✔
1662
      if (refId == 0) {
2!
1663
        break;
×
1664
      }
1665
      atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__CLOSED);
2✔
1666
      tmq = taosIterateRef(tmqMgmt.rsetId, refId);
2✔
1667
    }
1668
    taosCloseRef(tmqMgmt.rsetId);
311✔
1669
    tmqMgmt.rsetId = -1;
311✔
1670
    (void)taosThreadMutexUnlock(&tmqMgmt.lock);
311✔
1671
  }
1672
  (void)taosThreadMutexUnlock(&tmqMgmt.lock);
17,037✔
1673
}
17,037✔
1674

1675
tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
452✔
1676
  int32_t code = 0;
452✔
1677

1678
  if (conf == NULL) {
452!
1679
    SET_ERROR_MSG_TMQ("configure is null")
×
1680
    return NULL;
×
1681
  }
1682
  code = taosThreadOnce(&tmqInit, tmqMgmtInit);
452✔
1683
  if (code != 0) {
455!
1684
    SET_ERROR_MSG_TMQ("tmq init error")
×
1685
    return NULL;
×
1686
  }
1687
  if (tmqInitRes != 0) {
455!
1688
    SET_ERROR_MSG_TMQ("tmq timer init error")
×
1689
    return NULL;
×
1690
  }
1691

1692
  tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t));
455!
1693
  if (pTmq == NULL) {
455!
1694
    tqErrorC("failed to create consumer, groupId:%s", conf->groupId);
×
1695
    SET_ERROR_MSG_TMQ("malloc tmq failed")
×
1696
    return NULL;
×
1697
  }
1698

1699
  const char* user = conf->user == NULL ? TSDB_DEFAULT_USER : conf->user;
455✔
1700
  const char* pass = conf->pass == NULL ? TSDB_DEFAULT_PASS : conf->pass;
455✔
1701

1702
  pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
455✔
1703
  if (pTmq->clientTopics == NULL) {
455!
1704
    tqErrorC("failed to create consumer, groupId:%s", conf->groupId);
×
1705
    SET_ERROR_MSG_TMQ("malloc client topics failed")
×
1706
    goto _failed;
×
1707
  }
1708
  code = taosOpenQueue(&pTmq->mqueue);
455✔
1709
  if (code) {
455!
1710
    tqErrorC("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, tstrerror(code),
×
1711
             pTmq->groupId);
1712
    SET_ERROR_MSG_TMQ("open queue failed")
×
1713
    goto _failed;
×
1714
  }
1715

1716
  code = taosOpenQueue(&pTmq->delayedTask);
455✔
1717
  if (code) {
455!
1718
    tqErrorC("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, tstrerror(code),
×
1719
             pTmq->groupId);
1720
    SET_ERROR_MSG_TMQ("open delayed task queue failed")
×
1721
    goto _failed;
×
1722
  }
1723

1724
  if (conf->groupId[0] == 0) {
455!
1725
    tqErrorC("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, tstrerror(code),
×
1726
             pTmq->groupId);
1727
    SET_ERROR_MSG_TMQ("malloc tmq element failed or group is empty")
×
1728
    goto _failed;
×
1729
  }
1730

1731
  // init status
1732
  pTmq->status = TMQ_CONSUMER_STATUS__INIT;
455✔
1733
  pTmq->pollCnt = 0;
455✔
1734
  pTmq->epoch = 0;
455✔
1735
  pTmq->pollFlag = 0;
455✔
1736

1737
  // set conf
1738
  tstrncpy(pTmq->clientId, conf->clientId, TSDB_CLIENT_ID_LEN);
455✔
1739
  tstrncpy(pTmq->groupId, conf->groupId, TSDB_CGROUP_LEN);
455✔
1740
  pTmq->withTbName = conf->withTbName;
455✔
1741
  pTmq->useSnapshot = conf->snapEnable;
455✔
1742
  pTmq->autoCommit = conf->autoCommit;
455✔
1743
  pTmq->autoCommitInterval = conf->autoCommitInterval;
455✔
1744
  pTmq->sessionTimeoutMs = conf->sessionTimeoutMs;
455✔
1745
  pTmq->heartBeatIntervalMs = conf->heartBeatIntervalMs;
455✔
1746
  pTmq->maxPollIntervalMs = conf->maxPollIntervalMs;
455✔
1747
  pTmq->commitCb = conf->commitCb;
455✔
1748
  pTmq->commitCbUserParam = conf->commitCbUserParam;
455✔
1749
  pTmq->resetOffsetCfg = conf->resetOffset;
455✔
1750
  pTmq->replayEnable = conf->replayEnable;
455✔
1751
  pTmq->sourceExcluded = conf->sourceExcluded;
455✔
1752
  pTmq->rawData = conf->rawData;
455✔
1753
  pTmq->maxPollWaitTime = conf->maxPollWaitTime;
455✔
1754
  pTmq->minPollRows = conf->minPollRows;
455✔
1755
  pTmq->enableBatchMeta = conf->enableBatchMeta;
455✔
1756
  tstrncpy(pTmq->user, user, TSDB_USER_LEN);
455✔
1757
  if (taosGetFqdn(pTmq->fqdn) != 0) {
455!
1758
    tstrncpy(pTmq->fqdn, "localhost", TSDB_FQDN_LEN);
×
1759
  }
1760
  if (conf->replayEnable) {
455✔
1761
    pTmq->autoCommit = false;
7✔
1762
  }
1763
  taosInitRWLatch(&pTmq->lock);
455✔
1764

1765
  // assign consumerId
1766
  pTmq->consumerId = tGenIdPI64();
454✔
1767

1768
  // init semaphore
1769
  if (tsem2_init(&pTmq->rspSem, 0, 0) != 0) {
455!
1770
    tqErrorC("consumer:0x %" PRIx64 " setup failed since %s, consumer group %s", pTmq->consumerId,
×
1771
             tstrerror(TAOS_SYSTEM_ERROR(ERRNO)), pTmq->groupId);
1772
    SET_ERROR_MSG_TMQ("init t_sem failed")
×
1773
    goto _failed;
×
1774
  }
1775

1776
  // init connection
1777
  code = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ, &pTmq->pTscObj);
451✔
1778
  if (code) {
455!
1779
    terrno = code;
×
1780
    tqErrorC("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, terrstr(), pTmq->groupId);
×
1781
    SET_ERROR_MSG_TMQ("init tscObj failed")
×
1782
    goto _failed;
×
1783
  }
1784

1785
  pTmq->refId = taosAddRef(tmqMgmt.rsetId, pTmq);
455✔
1786
  if (pTmq->refId < 0) {
455!
1787
    SET_ERROR_MSG_TMQ("add tscObj ref failed")
×
1788
    goto _failed;
×
1789
  }
1790

1791
  pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, pTmq->heartBeatIntervalMs, (void*)pTmq->refId, tmqMgmt.timer);
455✔
1792
  if (pTmq->hbLiveTimer == NULL) {
455!
1793
    SET_ERROR_MSG_TMQ("start heartbeat timer failed")
×
1794
    goto _failed;
×
1795
  }
1796
  char         buf[TSDB_OFFSET_LEN] = {0};
455✔
1797
  STqOffsetVal offset = {.type = pTmq->resetOffsetCfg};
455✔
1798
  tFormatOffset(buf, tListLen(buf), &offset);
455✔
1799
  tqInfoC("consumer:0x%" PRIx64 " is setup, refId:%" PRId64
455!
1800
              ", groupId:%s, snapshot:%d, autoCommit:%d, commitInterval:%dms, offset:%s",
1801
          pTmq->consumerId, pTmq->refId, pTmq->groupId, pTmq->useSnapshot, pTmq->autoCommit, pTmq->autoCommitInterval,
1802
          buf);
1803

1804
  return pTmq;
455✔
1805

1806
  _failed:
×
1807
  tmqFreeImpl(pTmq);
×
1808
  return NULL;
×
1809
}
1810

1811
static int32_t syncAskEp(tmq_t* pTmq) {
4,058✔
1812
  if (pTmq == NULL) return TSDB_CODE_INVALID_PARA;
4,058!
1813
  SAskEpInfo* pInfo = taosMemoryMalloc(sizeof(SAskEpInfo));
4,058!
1814
  if (pInfo == NULL) return terrno;
4,058!
1815
  if (tsem2_init(&pInfo->sem, 0, 0) != 0) {
4,058!
1816
    taosMemoryFree(pInfo);
×
1817
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
1818
  }
1819

1820
  int32_t code = askEp(pTmq, pInfo, true, false);
4,058✔
1821
  if (code == 0) {
4,058!
1822
    if (tsem2_wait(&pInfo->sem) != 0){
4,058!
1823
      tqErrorC("consumer:0x%" PRIx64 ", failed to wait for sem", pTmq->consumerId);
×
1824
    }
1825
    code = pInfo->code;
4,058✔
1826
  }
1827

1828
  if(tsem2_destroy(&pInfo->sem) != 0) {
4,058!
1829
    tqErrorC("failed to destroy sem sync ask ep");
×
1830
  }
1831
  taosMemoryFree(pInfo);
4,058!
1832
  return code;
4,058✔
1833
}
1834

1835
int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
1,311✔
1836
  if (tmq == NULL || topic_list == NULL) return TSDB_CODE_INVALID_PARA;
1,311!
1837
  const SArray*   container = &topic_list->container;
1,311✔
1838
  int32_t         sz = taosArrayGetSize(container);
1,311✔
1839
  void*           buf = NULL;
1,311✔
1840
  SMsgSendInfo*   sendInfo = NULL;
1,311✔
1841
  SCMSubscribeReq req = {0};
1,311✔
1842
  int32_t         code = 0;
1,311✔
1843

1844
  tqInfoC("consumer:0x%" PRIx64 " cgroup:%s, subscribe %d topics", tmq->consumerId, tmq->groupId, sz);
1,311!
1845

1846
  req.consumerId = tmq->consumerId;
1,311✔
1847
  tstrncpy(req.clientId, tmq->clientId, TSDB_CLIENT_ID_LEN);
1,311✔
1848
  tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
1,311✔
1849
  tstrncpy(req.user, tmq->user, TSDB_USER_LEN);
1,311✔
1850
  tstrncpy(req.fqdn, tmq->fqdn, TSDB_FQDN_LEN);
1,311✔
1851

1852
  req.topicNames = taosArrayInit(sz, sizeof(void*));
1,311✔
1853
  if (req.topicNames == NULL) {
1,311!
1854
    code = terrno;
×
1855
    goto END;
×
1856
  }
1857

1858
  req.withTbName = tmq->withTbName;
1,311✔
1859
  req.autoCommit = tmq->autoCommit;
1,311✔
1860
  req.autoCommitInterval = tmq->autoCommitInterval;
1,311✔
1861
  req.sessionTimeoutMs = tmq->sessionTimeoutMs;
1,311✔
1862
  req.maxPollIntervalMs = tmq->maxPollIntervalMs;
1,311✔
1863
  req.resetOffsetCfg = tmq->resetOffsetCfg;
1,311✔
1864
  req.enableReplay = tmq->replayEnable;
1,311✔
1865
  req.enableBatchMeta = tmq->enableBatchMeta;
1,311✔
1866

1867
  for (int32_t i = 0; i < sz; i++) {
1,856✔
1868
    char* topic = taosArrayGetP(container, i);
545✔
1869
    if (topic == NULL) {
545!
1870
      code = terrno;
×
1871
      goto END;
×
1872
    }
1873
    SName name = {0};
545✔
1874
    code = tNameSetDbName(&name, tmq->pTscObj->acctId, topic, strlen(topic));
545✔
1875
    if (code) {
545!
1876
      tqErrorC("consumer:0x%" PRIx64 " cgroup:%s, failed to set topic name, code:%d", tmq->consumerId, tmq->groupId,
×
1877
               code);
1878
      goto END;
×
1879
    }
1880
    char* topicFName = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
545!
1881
    if (topicFName == NULL) {
545!
1882
      code = terrno;
×
1883
      goto END;
×
1884
    }
1885

1886
    code = tNameExtractFullName(&name, topicFName);
545✔
1887
    if (code) {
545!
1888
      tqErrorC("consumer:0x%" PRIx64 " cgroup:%s, failed to extract topic name, code:%d", tmq->consumerId, tmq->groupId,
×
1889
               code);
1890
      taosMemoryFree(topicFName);
×
1891
      goto END;
×
1892
    }
1893

1894
    if (taosArrayPush(req.topicNames, &topicFName) == NULL) {
1,090!
1895
      code = terrno;
×
1896
      taosMemoryFree(topicFName);
×
1897
      goto END;
×
1898
    }
1899
    tqInfoC("consumer:0x%" PRIx64 " subscribe topic:%s", tmq->consumerId, topicFName);
545!
1900
  }
1901

1902
  int32_t tlen = tSerializeSCMSubscribeReq(NULL, &req);
1,311✔
1903
  buf = taosMemoryMalloc(tlen);
1,311!
1904
  if (buf == NULL) {
1,311!
1905
    code = terrno;
×
1906
    goto END;
×
1907
  }
1908

1909
  void* abuf = buf;
1,311!
1910
  tlen = tSerializeSCMSubscribeReq(&abuf, &req);
1,311✔
1911

1912
  sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
1,311!
1913
  if (sendInfo == NULL) {
1,311!
1914
    code = terrno;
×
1915
    taosMemoryFree(buf);
×
1916
    goto END;
×
1917
  }
1918

1919
  SMqSubscribeCbParam param = {.rspErr = 0};
1,311✔
1920
  if (tsem2_init(&param.rspSem, 0, 0) != 0) {
1,311!
1921
    code = TSDB_CODE_TSC_INTERNAL_ERROR;
×
1922
    taosMemoryFree(buf);
×
1923
    taosMemoryFree(sendInfo);
×
1924
    goto END;
×
1925
  }
1926

1927
  sendInfo->msgInfo = (SDataBuf){.pData = buf, .len = tlen, .handle = NULL};
1,311✔
1928
  sendInfo->requestId = generateRequestId();
1,311✔
1929
  sendInfo->requestObjRefId = 0;
1,311✔
1930
  sendInfo->param = &param;
1,311✔
1931
  sendInfo->fp = tmqSubscribeCb;
1,311✔
1932
  sendInfo->msgType = TDMT_MND_TMQ_SUBSCRIBE;
1,311✔
1933

1934
  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
1,311✔
1935

1936
  code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo);
1,311✔
1937
  if (code != 0) {
1,311!
1938
    goto END;
×
1939
  }
1940

1941
  if (tsem2_wait(&param.rspSem) != 0){
1,311!
1942
    tqErrorC("consumer:0x%" PRIx64 ", failed to wait semaphore in subscribe", tmq->consumerId);
×
1943
  }
1944
  if(tsem2_destroy(&param.rspSem) != 0) {
1,311!
1945
    tqErrorC("consumer:0x%" PRIx64 ", failed to destroy semaphore in subscribe", tmq->consumerId);
×
1946
  }
1947

1948
  if (param.rspErr != 0) {
1,311✔
1949
    code = param.rspErr;
10✔
1950
    goto END;
10✔
1951
  }
1952

1953
  int32_t retryCnt = 0;
1,301✔
1954
  while ((code = syncAskEp(tmq)) != 0) {
4,045✔
1955
    if (retryCnt++ > SUBSCRIBE_RETRY_MAX_COUNT || code == TSDB_CODE_MND_CONSUMER_NOT_EXIST) {
2,751!
1956
      tqErrorC("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry more than 2 minutes, code:%s",
7!
1957
               tmq->consumerId, tstrerror(code));
1958
      if (code == TSDB_CODE_MND_CONSUMER_NOT_EXIST) {
7!
1959
        code = 0;
7✔
1960
      }
1961
      goto END;
7✔
1962
    }
1963

1964
    tqInfoC("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry:%d in 500ms", tmq->consumerId, retryCnt);
2,744!
1965
    taosMsleep(SUBSCRIBE_RETRY_INTERVAL);
2,744✔
1966
  }
1967

1968
  if (tmq->epTimer == NULL){
1,294✔
1969
    tmq->epTimer = taosTmrStart(tmqAssignAskEpTask, DEFAULT_ASKEP_INTERVAL, (void*)(tmq->refId), tmqMgmt.timer);
446✔
1970
    if (tmq->epTimer == NULL) {
446!
1971
      code = TSDB_CODE_TSC_INTERNAL_ERROR;
×
1972
      goto END;
×
1973
    }
1974
  }
1975
  if (tmq->autoCommit && tmq->commitTimer == NULL){
1,294✔
1976
    tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, (void*)(tmq->refId), tmqMgmt.timer);
231✔
1977
    if (tmq->commitTimer == NULL) {
231!
1978
      code = TSDB_CODE_TSC_INTERNAL_ERROR;
×
1979
      goto END;
×
1980
    }
1981
  }
1982

1983
  END:
1,294✔
1984
  taosArrayDestroyP(req.topicNames, NULL);
1,311✔
1985
  return code;
1,311✔
1986
}
1987

1988
void tmq_conf_set_auto_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* param) {
345✔
1989
  if (conf == NULL) return;
345!
1990
  conf->commitCb = cb;
345✔
1991
  conf->commitCbUserParam = param;
345✔
1992
}
1993

1994
static void getVgInfo(tmq_t* tmq, char* topicName, int32_t vgId, SMqClientVg** pVg) {
39,223✔
1995
  if (tmq == NULL || topicName == NULL || pVg == NULL) {
39,223!
1996
    return;
×
1997
  }
1998
  int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
39,223✔
1999
  for (int i = 0; i < topicNumCur; i++) {
40,508✔
2000
    SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i);
40,500✔
2001
    if (pTopicCur && strcmp(pTopicCur->topicName, topicName) == 0) {
40,500!
2002
      int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs);
39,223✔
2003
      for (int32_t j = 0; j < vgNumCur; j++) {
72,023✔
2004
        SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j);
72,015✔
2005
        if (pVgCur && pVgCur->vgId == vgId) {
72,015!
2006
          *pVg = pVgCur;
39,215✔
2007
          return;
39,215✔
2008
        }
2009
      }
2010
    }
2011
  }
2012
}
2013

2014
static SMqClientTopic* getTopicInfo(tmq_t* tmq, char* topicName) {
35,350✔
2015
  if (tmq == NULL || topicName == NULL) {
35,350!
2016
    return NULL;
×
2017
  }
2018
  int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
35,350✔
2019
  for (int i = 0; i < topicNumCur; i++) {
36,616!
2020
    SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i);
36,616✔
2021
    if (strcmp(pTopicCur->topicName, topicName) == 0) {
36,616✔
2022
      return pTopicCur;
35,349✔
2023
    }
2024
  }
2025
  return NULL;
×
2026
}
2027

2028
int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
40,106✔
2029
  tmq_t*             tmq = NULL;
40,106✔
2030
  SMqRspWrapper*     pRspWrapper = NULL;
40,106✔
2031
  int8_t             rspType = 0;
40,106✔
2032
  int32_t            vgId = 0;
40,106✔
2033
  uint64_t           requestId = 0;
40,106✔
2034
  SMqPollCbParam*    pParam = (SMqPollCbParam*)param;
40,106✔
2035
  if (pMsg == NULL) {
40,106!
2036
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2037
  }
2038
  if (pParam == NULL) {
40,106!
2039
    code = TSDB_CODE_TSC_INTERNAL_ERROR;
×
2040
    goto EXIT;
×
2041
  }
2042
  int64_t refId = pParam->refId;
40,106✔
2043
  vgId = pParam->vgId;
40,106✔
2044
  requestId = pParam->requestId;
40,106✔
2045
  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
40,106✔
2046
  if (tmq == NULL) {
40,110✔
2047
    code = TSDB_CODE_TMQ_CONSUMER_CLOSED;
4✔
2048
    goto EXIT;
4✔
2049
  }
2050

2051
  int32_t ret = taosAllocateQitem(sizeof(SMqRspWrapper), DEF_QITEM, 0, (void**)&pRspWrapper);
40,106✔
2052
  if (ret) {
40,102!
2053
    code = ret;
×
2054
    tscWarn("consumer:0x%" PRIx64 " msg discard from vgId:%d, since out of memory", tmq->consumerId, vgId);
×
2055
    goto END;
×
2056
  }
2057

2058
  if (code != 0) {
40,105✔
2059
    goto END;
3,933✔
2060
  }
2061

2062
  if (pMsg->pData == NULL) {
36,172!
2063
    tqErrorC("consumer:0x%" PRIx64 " msg discard from vgId:%d, since msg is NULL", tmq->consumerId, vgId);
×
2064
    code = TSDB_CODE_TSC_INTERNAL_ERROR;
×
2065
    goto END;
×
2066
  }
2067

2068
  int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch;
36,172✔
2069
  int32_t clientEpoch = atomic_load_32(&tmq->epoch);
36,172✔
2070

2071
  if (msgEpoch != clientEpoch) {
36,168✔
2072
    tqErrorC("consumer:0x%" PRIx64
45!
2073
                 " msg discard from vgId:%d since epoch not equal, rsp epoch %d, current epoch %d, reqId:0x%" PRIx64,
2074
             tmq->consumerId, vgId, msgEpoch, clientEpoch, requestId);
2075
    code = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
46✔
2076
    goto END;
46✔
2077
  }
2078
  rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType;
36,123✔
2079
  tqDebugC("consumer:0x%" PRIx64 " recv poll rsp, vgId:%d, type %d(%s), QID:0x%" PRIx64, tmq->consumerId, vgId, rspType, tmqMsgTypeStr[rspType], requestId);
36,123!
2080

2081
  pRspWrapper->tmqRspType = rspType;
36,126✔
2082
  pRspWrapper->pollRsp.reqId = requestId;
36,126✔
2083
  pRspWrapper->pollRsp.pEpset = pMsg->pEpSet;
36,126✔
2084
  pRspWrapper->pollRsp.data = pMsg->pData;
36,126✔
2085
  pRspWrapper->pollRsp.len = pMsg->len;
36,126✔
2086
  pMsg->pData = NULL;
36,126✔
2087
  pMsg->pEpSet = NULL;
36,126✔
2088

2089
  END:
40,105✔
2090
  if (pRspWrapper) {
40,105✔
2091
    pRspWrapper->code = code;
40,104✔
2092
    pRspWrapper->pollRsp.vgId = vgId;
40,104✔
2093
    tstrncpy(pRspWrapper->pollRsp.topicName, pParam->topicName, TSDB_TOPIC_FNAME_LEN);
40,104✔
2094
    code = taosWriteQitem(tmq->mqueue, pRspWrapper);
40,104✔
2095
    if (code != 0) {
40,106!
2096
      tmqFreeRspWrapper(pRspWrapper);
×
2097
      taosFreeQitem(pRspWrapper);
×
2098
      tqErrorC("consumer:0x%" PRIx64 " put poll res into mqueue failed, code:%d", tmq->consumerId, code);
×
2099
    } else {
2100
      tqDebugC("consumer:0x%" PRIx64 " put poll res into mqueue, type:%d(%s), vgId:%d, total in queue:%d, QID:0x%" PRIx64,
40,106!
2101
               tmq ? tmq->consumerId : 0, rspType, tmqMsgTypeStr[rspType], vgId, taosQueueItemSize(tmq->mqueue), requestId);
2102
    }
2103
  }
2104

2105
  if (tsem2_post(&tmq->rspSem) != 0){
40,107!
2106
    tqErrorC("failed to post rsp sem, consumer:0x%" PRIx64, tmq->consumerId);
×
2107
  }
2108
  ret = taosReleaseRef(tmqMgmt.rsetId, refId);
40,106✔
2109
  if (ret != 0){
40,106!
2110
    tqErrorC("failed to release ref:%"PRId64 ", code:%d", refId, ret);
×
2111
  }
2112

2113
  EXIT:
40,106✔
2114
  taosMemoryFreeClear(pMsg->pData);
40,110!
2115
  taosMemoryFreeClear(pMsg->pEpSet);
40,110!
2116
  return code;
40,110✔
2117
}
2118

2119
void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, SMqClientTopic* pTopic, SMqClientVg* pVg) {
40,113✔
2120
  if (pReq == NULL || tmq == NULL || pTopic == NULL || pVg == NULL) {
40,113!
2121
    return;
×
2122
  }
2123
  (void)snprintf(pReq->subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", tmq->groupId, TMQ_SEPARATOR, pTopic->topicName);
40,113✔
2124
  pReq->withTbName = tmq->withTbName;
40,113✔
2125
  pReq->consumerId = tmq->consumerId;
40,113✔
2126
  pReq->timeout = tmq->maxPollWaitTime;
40,113✔
2127
  pReq->minPollRows = tmq->minPollRows;
40,113✔
2128
  pReq->epoch = tmq->epoch;
40,113✔
2129
  pReq->reqOffset = pVg->offsetInfo.endOffset;
40,113✔
2130
  pReq->head.vgId = pVg->vgId;
40,113✔
2131
  pReq->useSnapshot = tmq->useSnapshot;
40,113✔
2132
  pReq->reqId = generateRequestId();
40,113✔
2133
  pReq->enableReplay = tmq->replayEnable;
40,113✔
2134
  pReq->sourceExcluded = tmq->sourceExcluded;
40,113✔
2135
  pReq->rawData = tmq->rawData;
40,113✔
2136
  pReq->enableBatchMeta = tmq->enableBatchMeta;
40,113✔
2137
}
2138

2139
void changeByteEndian(char* pData) {
369,242✔
2140
  if (pData == NULL) {
369,242!
2141
    return;
×
2142
  }
2143
  char* p = pData;
369,242✔
2144

2145
  // | version | total length | total rows | total columns | flag seg| block group id | column schema | each column
2146
  // length | version:
2147
  int32_t blockVersion = *(int32_t*)p;
369,242✔
2148
  if (blockVersion != BLOCK_VERSION_1) {
369,242!
2149
    tqErrorC("invalid block version:%d", blockVersion);
×
2150
    return;
×
2151
  }
2152
  *(int32_t*)p = BLOCK_VERSION_2;
369,242✔
2153

2154
  p += sizeof(int32_t);
369,242✔
2155
  p += sizeof(int32_t);
369,242✔
2156
  p += sizeof(int32_t);
369,242✔
2157
  int32_t cols = *(int32_t*)p;
369,242✔
2158
  p += sizeof(int32_t);
369,242✔
2159
  p += sizeof(int32_t);
369,242✔
2160
  p += sizeof(uint64_t);
369,242✔
2161
  // check fields
2162
  p += cols * (sizeof(int8_t) + sizeof(int32_t));
369,242✔
2163

2164
  int32_t* colLength = (int32_t*)p;
369,242✔
2165
  for (int32_t i = 0; i < cols; ++i) {
2,323,343✔
2166
    colLength[i] = htonl(colLength[i]);
1,954,101✔
2167
  }
2168
}
2169

2170
static void tmqGetRawDataRowsPrecisionFromRes(void* pRetrieve, void** rawData, int64_t* rows, int32_t* precision) {
722,343✔
2171
  if (pRetrieve == NULL || rawData == NULL || rows == NULL) {
722,343!
2172
    return;
×
2173
  }
2174
  if (*(int64_t*)pRetrieve == 0) {
722,346!
2175
    *rawData = ((SRetrieveTableRsp*)pRetrieve)->data;
×
2176
    *rows = htobe64(((SRetrieveTableRsp*)pRetrieve)->numOfRows);
×
2177
    if (precision != NULL) {
×
2178
      *precision = ((SRetrieveTableRsp*)pRetrieve)->precision;
×
2179
    }
2180
  } else if (*(int64_t*)pRetrieve == 1) {
722,346!
2181
    *rawData = ((SRetrieveTableRspForTmq*)pRetrieve)->data;
722,351✔
2182
    *rows = htobe64(((SRetrieveTableRspForTmq*)pRetrieve)->numOfRows);
722,351✔
2183
    if (precision != NULL) {
722,343✔
2184
      *precision = ((SRetrieveTableRspForTmq*)pRetrieve)->precision;
353,127✔
2185
    }
2186
  }
2187
}
2188

2189
static void tmqBuildRspFromWrapperInner(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows,
26,468✔
2190
                                        SMqRspObj* pRspObj) {
2191
  if (pWrapper == NULL || pVg == NULL || numOfRows == NULL || pRspObj == NULL) {
26,468!
2192
    return;
×
2193
  }
2194
  pRspObj->resIter = -1;
26,468✔
2195
  pRspObj->resInfo.totalRows = 0;
26,468✔
2196
  pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
26,468✔
2197

2198
  SMqDataRsp* pDataRsp = &pRspObj->dataRsp;
26,468✔
2199
  bool needTransformSchema = !pDataRsp->withSchema;
26,468✔
2200
  if (!pDataRsp->withSchema) {  // withSchema is false if subscribe subquery, true if subscribe db or stable
26,468✔
2201
    pDataRsp->withSchema = true;
23,339✔
2202
    pDataRsp->blockSchema = taosArrayInit(pDataRsp->blockNum, sizeof(void*));
23,339✔
2203
    if (pDataRsp->blockSchema == NULL) {
23,339!
2204
      tqErrorC("failed to allocate memory for blockSchema");
×
2205
      return;
×
2206
    }
2207
  }
2208
  // extract the rows in this data packet
2209
  for (int32_t i = 0; i < pDataRsp->blockNum; ++i) {
395,631✔
2210
    void*   pRetrieve = taosArrayGetP(pDataRsp->blockData, i);
369,191✔
2211
    void*   rawData = NULL;
369,278✔
2212
    int64_t rows = 0;
369,278✔
2213
    // deal with compatibility
2214
    tmqGetRawDataRowsPrecisionFromRes(pRetrieve, &rawData, &rows, NULL);
369,278✔
2215

2216
    pVg->numOfRows += rows;
369,252✔
2217
    (*numOfRows) += rows;
369,252✔
2218
    changeByteEndian(rawData);
369,252✔
2219
    if (needTransformSchema) {  // withSchema is false if subscribe subquery, true if subscribe db or stable
369,164✔
2220
      SSchemaWrapper* schema = tCloneSSchemaWrapper(&pWrapper->topicHandle->schema);
249,754!
2221
      if (schema) {
249,756!
2222
        if (taosArrayPush(pDataRsp->blockSchema, &schema) == NULL) {
499,509!
2223
          tqErrorC("failed to push schema into blockSchema");
×
2224
          continue;
×
2225
        }
2226
      }
2227
    }
2228
  }
2229
}
2230

2231
static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* pVg) {
40,110✔
2232
  SMqPollReq      req = {0};
40,110✔
2233
  char*           msg = NULL;
40,110✔
2234
  SMqPollCbParam* pParam = NULL;
40,110✔
2235
  SMsgSendInfo*   sendInfo = NULL;
40,110✔
2236
  int             code = 0;
40,110✔
2237
  int             lino = 0;
40,110✔
2238
  tmqBuildConsumeReqImpl(&req, pTmq, pTopic, pVg);
40,110✔
2239

2240
  int32_t msgSize = tSerializeSMqPollReq(NULL, 0, &req);
40,110✔
2241
  TSDB_CHECK_CONDITION(msgSize >= 0, code, lino, END, TSDB_CODE_INVALID_MSG);
40,110!
2242

2243
  msg = taosMemoryCalloc(1, msgSize);
40,110!
2244
  TSDB_CHECK_NULL(msg, code, lino, END, terrno);
40,110!
2245

2246
  TSDB_CHECK_CONDITION(tSerializeSMqPollReq(msg, msgSize, &req) >= 0, code, lino, END, TSDB_CODE_INVALID_MSG);
40,110!
2247

2248
  pParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
40,110!
2249
  TSDB_CHECK_NULL(pParam, code, lino, END, terrno);
40,110!
2250

2251
  pParam->refId = pTmq->refId;
40,110✔
2252
  tstrncpy(pParam->topicName, pTopic->topicName, TSDB_TOPIC_FNAME_LEN);
40,110✔
2253
  pParam->vgId = pVg->vgId;
40,110✔
2254
  pParam->requestId = req.reqId;
40,110✔
2255

2256
  sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
40,110!
2257
  TSDB_CHECK_NULL(sendInfo, code, lino, END, terrno);
40,110!
2258

2259
  sendInfo->msgInfo = (SDataBuf){.pData = msg, .len = msgSize, .handle = NULL};
40,110✔
2260
  sendInfo->requestId = req.reqId;
40,110✔
2261
  sendInfo->requestObjRefId = 0;
40,110✔
2262
  sendInfo->param = pParam;
40,110✔
2263
  sendInfo->paramFreeFp = taosAutoMemoryFree;
40,110✔
2264
  sendInfo->fp = tmqPollCb;
40,110✔
2265
  sendInfo->msgType = TDMT_VND_TMQ_CONSUME;
40,110✔
2266

2267
  msg = NULL;
40,110✔
2268
  pParam = NULL;
40,110✔
2269

2270
  char offsetFormatBuf[TSDB_OFFSET_LEN] = {0};
40,110✔
2271
  tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pVg->offsetInfo.endOffset);
40,110✔
2272
  code = asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, NULL, sendInfo);
40,110✔
2273
  tqDebugC("consumer:0x%" PRIx64 " send poll to %s vgId:%d, code:%d, epoch %d, req:%s, QID:0x%" PRIx64, pTmq->consumerId,
40,110!
2274
           pTopic->topicName, pVg->vgId, code, pTmq->epoch, offsetFormatBuf, req.reqId);
2275
  TSDB_CHECK_CODE(code, lino, END);
40,110!
2276

2277
  pVg->pollCnt++;
40,110✔
2278
  pVg->seekUpdated = false;  // reset this flag.
40,110✔
2279
  pTmq->pollCnt++;
40,110✔
2280

2281
END:
40,110✔
2282
  if (code != 0){
40,110!
2283
    tqErrorC("%s failed at %d msg:%s", __func__, lino, tstrerror(code));
×
2284
  }
2285
  taosMemoryFreeClear(pParam);
40,110!
2286
  taosMemoryFreeClear(msg);
40,110!
2287
  return code;
40,110✔
2288
}
2289

2290
static int32_t tmqPollImpl(tmq_t* tmq) {
51,792✔
2291
  if (tmq == NULL) {
51,792!
2292
    return TSDB_CODE_INVALID_MSG;
×
2293
  }
2294
  int32_t code = 0;
51,792✔
2295
  taosWLockLatch(&tmq->lock);
51,792✔
2296

2297
  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__LOST){
51,792✔
2298
    code = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
7✔
2299
    goto end;
7✔
2300
  }
2301

2302
  int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
51,784✔
2303
  tqDebugC("consumer:0x%" PRIx64 " start to poll data, numOfTopics:%d", tmq->consumerId, numOfTopics);
51,785!
2304

2305
  for (int i = 0; i < numOfTopics; i++) {
105,915✔
2306
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
54,130✔
2307
    if (pTopic == NULL) {
54,130!
2308
      continue;
×
2309
    }
2310
    int32_t numOfVg = taosArrayGetSize(pTopic->vgs);
54,130✔
2311
    if (pTopic->noPrivilege) {
54,130✔
2312
      tqDebugC("consumer:0x%" PRIx64 " has no privilegr for topic:%s", tmq->consumerId, pTopic->topicName);
4!
2313
      continue;
4✔
2314
    }
2315
    for (int j = 0; j < numOfVg; j++) {
183,602✔
2316
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
129,476✔
2317
      if (pVg == NULL) {
129,476!
2318
        continue;
×
2319
      }
2320

2321
      int64_t elapsed = taosGetTimestampMs() - pVg->emptyBlockReceiveTs;
129,476✔
2322
      if (elapsed < EMPTY_BLOCK_POLL_IDLE_DURATION && elapsed >= 0) {  // less than 10ms
129,476!
2323
        tqDebugC("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 10ms before start next poll", tmq->consumerId,
31,324!
2324
                 tmq->epoch, pVg->vgId);
2325
        continue;
31,324✔
2326
      }
2327

2328
      elapsed = taosGetTimestampMs() - pVg->blockReceiveTs;
98,152✔
2329
      if (tmq->replayEnable && elapsed < pVg->blockSleepForReplay && elapsed >= 0) {
98,152!
2330
        tqDebugC("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for %" PRId64 "ms before start next poll when replay",
22!
2331
                 tmq->consumerId, tmq->epoch, pVg->vgId, pVg->blockSleepForReplay);
2332
        continue;
22✔
2333
      }
2334

2335
      int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
98,130✔
2336
      if (vgStatus == TMQ_VG_STATUS__WAIT) {
98,130✔
2337
        int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
58,020✔
2338
        tqDebugC("consumer:0x%" PRIx64 " epoch %d wait poll-rsp, skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch,
58,020!
2339
                 pVg->vgId, vgSkipCnt);
2340
        continue;
58,020✔
2341
      }
2342

2343
      atomic_store_32(&pVg->vgSkipCnt, 0);
40,110✔
2344
      code = doTmqPollImpl(tmq, pTopic, pVg);
40,110✔
2345
      if (code != TSDB_CODE_SUCCESS) {
40,110!
2346
        goto end;
×
2347
      }
2348
    }
2349
  }
2350

2351
  end:
51,785✔
2352
  taosWUnLockLatch(&tmq->lock);
51,792✔
2353
  tqDebugC("consumer:0x%" PRIx64 " end to poll data, code:%d", tmq->consumerId, code);
51,792!
2354
  return code;
51,792✔
2355
}
2356

2357
static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* reqOffset, STqOffsetVal* rspOffset, int64_t sver, int64_t ever,
35,349✔
2358
                         int64_t consumerId, bool hasData) {
2359
  if (pVg == NULL || reqOffset == NULL || rspOffset == NULL) {
35,349!
2360
    return;
×
2361
  }
2362
  if (!pVg->seekUpdated) {
35,349✔
2363
    tqDebugC("consumer:0x%" PRIx64 " local offset is update, since seekupdate not set", consumerId);
35,347!
2364
    if (hasData) {
35,348✔
2365
      tOffsetCopy(&pVg->offsetInfo.beginOffset, reqOffset);
26,731✔
2366
    }
2367
    tOffsetCopy(&pVg->offsetInfo.endOffset, rspOffset);
35,348✔
2368
  } else {
2369
    tqDebugC("consumer:0x%" PRIx64 " local offset is NOT update, since seekupdate is set", consumerId);
2!
2370
  }
2371

2372
  // update the status
2373
  atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
35,350✔
2374

2375
  // update the valid wal version range
2376
  pVg->offsetInfo.walVerBegin = sver;
35,350✔
2377
  pVg->offsetInfo.walVerEnd = ever + 1;
35,350✔
2378
}
2379

2380
static SMqRspObj* buildRsp(SMqPollRspWrapper* pollRspWrapper){
26,731✔
2381
  typedef union {
2382
    SMqDataRsp      dataRsp;
2383
    SMqMetaRsp      metaRsp;
2384
    SMqBatchMetaRsp batchMetaRsp;
2385
  } MEMSIZE;
2386

2387
  SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
26,731!
2388
  if (pRspObj == NULL) {
26,731!
2389
    tqErrorC("buildRsp:failed to allocate memory");
×
2390
    return NULL;
×
2391
  }
2392
  (void)memcpy(&pRspObj->dataRsp, &pollRspWrapper->dataRsp, sizeof(MEMSIZE));
26,731✔
2393
  tstrncpy(pRspObj->topic, pollRspWrapper->topicName, TSDB_TOPIC_FNAME_LEN);
26,731✔
2394
  tstrncpy(pRspObj->db, pollRspWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
26,731✔
2395
  pRspObj->vgId = pollRspWrapper->vgId;
26,731✔
2396
  (void)memset(&pollRspWrapper->dataRsp, 0, sizeof(MEMSIZE));
26,731✔
2397
  return pRspObj;
26,731✔
2398
}
2399

2400
static int32_t processMqRspError(tmq_t* tmq, SMqRspWrapper* pRspWrapper){
3,873✔
2401
  int32_t code = 0;
3,873✔
2402
  SMqPollRspWrapper* pollRspWrapper = &pRspWrapper->pollRsp;
3,873✔
2403

2404
  tqErrorC("consumer:0x%" PRIx64 " msg from vgId:%d discarded, since %s", tmq->consumerId, pollRspWrapper->vgId,
3,873!
2405
    tstrerror(pRspWrapper->code));
2406
  if (pRspWrapper->code == TSDB_CODE_VND_INVALID_VGROUP_ID) {  // for vnode transform
3,873✔
2407
    code = askEp(tmq, NULL, false, true);
2,627✔
2408
    if (code != 0) {
2,627!
2409
      tqErrorC("consumer:0x%" PRIx64 " failed to ask ep wher vnode transform, code:%s", tmq->consumerId, tstrerror(code));
×
2410
    }
2411
  } else if (pRspWrapper->code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) {
1,246✔
2412
    code = syncAskEp(tmq);
13✔
2413
    if (code != 0) {
13!
2414
      tqErrorC("consumer:0x%" PRIx64 " failed to ask ep when consumer mismatch, code:%s", tmq->consumerId, tstrerror(code));
×
2415
    }
2416
  } else if (pRspWrapper->code == TSDB_CODE_TMQ_NO_TABLE_QUALIFIED){
1,233✔
2417
    code = 0;
604✔
2418
  }
2419
  
2420
  taosWLockLatch(&tmq->lock);
3,873✔
2421
  SMqClientVg* pVg = NULL;
3,873✔
2422
  getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId, &pVg);
3,873✔
2423
  if (pVg) {
3,873✔
2424
    pVg->emptyBlockReceiveTs = taosGetTimestampMs();
3,865✔
2425
    atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
3,865✔
2426
  }
2427
  taosWUnLockLatch(&tmq->lock);
3,873✔
2428

2429
  return code;
3,873✔
2430
}
2431

2432
static int32_t processWrapperData(SMqRspWrapper* pRspWrapper){
35,350✔
2433
  int32_t code = 0;
35,350✔
2434
  if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_RSP) {
35,350✔
2435
    PROCESS_POLL_RSP(tDecodeMqDataRsp, &pRspWrapper->pollRsp.dataRsp)
35,038!
2436
    pRspWrapper->pollRsp.dataRsp.data = pRspWrapper->pollRsp.data;
35,038✔
2437
    pRspWrapper->pollRsp.data = NULL;
35,038✔
2438
  } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
312✔
2439
    PROCESS_POLL_RSP(tDecodeMqMetaRsp, &pRspWrapper->pollRsp.metaRsp)
227!
2440
  } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
85✔
2441
    PROCESS_POLL_RSP(tDecodeSTaosxRsp, &pRspWrapper->pollRsp.dataRsp)
25!
2442
    pRspWrapper->pollRsp.dataRsp.data = pRspWrapper->pollRsp.data;
25✔
2443
    pRspWrapper->pollRsp.data = NULL;
25✔
2444
  } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_BATCH_META_RSP) {
60✔
2445
    PROCESS_POLL_RSP(tSemiDecodeMqBatchMetaRsp, &pRspWrapper->pollRsp.batchMetaRsp)
19!
2446
  } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) {
41!
2447
    PROCESS_POLL_RSP(tDecodeMqRawDataRsp, &pRspWrapper->pollRsp.dataRsp)
41!
2448
    pRspWrapper->pollRsp.dataRsp.len = pRspWrapper->pollRsp.len - sizeof(SMqRspHead);
41✔
2449
    pRspWrapper->pollRsp.dataRsp.rawData = POINTER_SHIFT(pRspWrapper->pollRsp.data, sizeof(SMqRspHead));
41✔
2450
    pRspWrapper->pollRsp.data = NULL;
41✔
2451
  } else {
2452
    tqErrorC("invalid rsp msg received, type:%d ignored", pRspWrapper->tmqRspType);
×
2453
    code = TSDB_CODE_TSC_INTERNAL_ERROR;
×
2454
    goto END;
×
2455
  }
2456
  END:
35,350✔
2457
  return code;
35,350✔
2458
}
2459

2460
static SMqRspObj* processMqRsp(tmq_t* tmq, SMqRspWrapper* pRspWrapper){
41,778✔
2461
  int32_t    code = 0;
41,778✔
2462
  SMqRspObj* pRspObj = NULL;
41,778✔
2463

2464
  if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) {
41,778✔
2465
    tqDebugC("consumer:0x%" PRIx64 " ep msg received", tmq->consumerId);
6,428!
2466
    SMqAskEpRsp*        rspMsg = &pRspWrapper->epRsp;
6,428✔
2467
    doUpdateLocalEp(tmq, pRspWrapper->epoch, rspMsg);
6,428✔
2468
    terrno = code;
6,428✔
2469
    return pRspObj;
6,428✔
2470
  }
2471

2472
  code = processWrapperData(pRspWrapper);
35,350✔
2473
  if (code != 0) {
35,350!
2474
    goto END;
×
2475
  }
2476
  SMqPollRspWrapper* pollRspWrapper = &pRspWrapper->pollRsp;
35,350✔
2477
  taosWLockLatch(&tmq->lock);
35,350✔
2478
  SMqClientVg* pVg = NULL;
35,350✔
2479
  getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId, &pVg);
35,350✔
2480
  if(pVg == NULL) {
35,350!
2481
    tqErrorC("consumer:0x%" PRIx64 " get vg or topic error, topic:%s vgId:%d", tmq->consumerId,
×
2482
             pollRspWrapper->topicName, pollRspWrapper->vgId);
2483
    code = TSDB_CODE_TMQ_INVALID_VGID;
×
2484
    goto END;
×
2485
  }
2486
  pollRspWrapper->topicHandle = getTopicInfo(tmq, pollRspWrapper->topicName);
35,350✔
2487
  if (pollRspWrapper->pEpset != NULL) {
35,349✔
2488
    pVg->epSet = *pollRspWrapper->pEpset;
35✔
2489
  }
2490

2491
  if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_RSP ||
35,349✔
2492
      pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP ||
312✔
2493
      pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) {
35,391✔
2494
    updateVgInfo(pVg, &pollRspWrapper->dataRsp.reqOffset, &pollRspWrapper->dataRsp.rspOffset, pollRspWrapper->head.walsver, pollRspWrapper->head.walever,
35,103✔
2495
                 tmq->consumerId, pollRspWrapper->dataRsp.blockNum != 0);
35,103✔
2496

2497
    char buf[TSDB_OFFSET_LEN] = {0};
35,104✔
2498
    tFormatOffset(buf, TSDB_OFFSET_LEN, &pollRspWrapper->rspOffset);
35,104✔
2499
    if (pollRspWrapper->dataRsp.blockNum == 0) {
35,104✔
2500
      pVg->emptyBlockReceiveTs = taosGetTimestampMs();
8,619✔
2501
      tqDebugC("consumer:0x%" PRIx64 " empty block received, vgId:%d, offset:%s, vg total:%" PRId64
8,619!
2502
                   ", total:%" PRId64 ", QID:0x%" PRIx64,
2503
               tmq->consumerId, pVg->vgId, buf, pVg->numOfRows, tmq->totalRows, pollRspWrapper->reqId);
2504
    } else {
2505
      pRspObj = buildRsp(pollRspWrapper);
26,485✔
2506
      if (pRspObj == NULL) {
26,485!
2507
        tqErrorC("consumer:0x%" PRIx64 " failed to allocate memory for meta rsp", tmq->consumerId);
×
2508
        goto END;
×
2509
      }
2510
      pRspObj->resType = pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP ? RES_TYPE__TMQ_RAWDATA :
52,953✔
2511
                         (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_RSP ? RES_TYPE__TMQ : RES_TYPE__TMQ_METADATA);
26,468✔
2512
      int64_t numOfRows = 0;
26,485✔
2513
      if (pRspWrapper->tmqRspType != TMQ_MSG_TYPE__POLL_RAW_DATA_RSP){
26,485✔
2514
        tmqBuildRspFromWrapperInner(pollRspWrapper, pVg, &numOfRows, pRspObj);
26,468✔
2515
        tmq->totalRows += numOfRows;
26,468✔
2516
      }
2517
      if (tmq->replayEnable && pRspWrapper->tmqRspType != TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) {
26,485!
2518
        pVg->blockReceiveTs = taosGetTimestampMs();
27✔
2519
        pVg->blockSleepForReplay = pRspObj->dataRsp.sleepTime;
27✔
2520
        if (pVg->blockSleepForReplay > 0) {
27✔
2521
          if (taosTmrStart(tmqReplayTask, pVg->blockSleepForReplay, (void*)(tmq->refId), tmqMgmt.timer) == NULL) {
12!
2522
            tqErrorC("consumer:0x%" PRIx64 " failed to start replay timer, vgId:%d, sleep:%" PRId64,
×
2523
                     tmq->consumerId, pVg->vgId, pVg->blockSleepForReplay);
2524
          }
2525
        }
2526
      }
2527
      pVg->emptyBlockReceiveTs = 0;
26,485✔
2528
      tqDebugC("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64
26,485!
2529
                   ", vg total:%" PRId64 ", total:%" PRId64 ", QID:0x%" PRIx64,
2530
               tmq->consumerId, pVg->vgId, buf, pRspObj->dataRsp.blockNum, numOfRows, pVg->numOfRows, tmq->totalRows,
2531
               pollRspWrapper->reqId);
2532
    }
2533
  } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP || pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_BATCH_META_RSP) {
246!
2534
    updateVgInfo(pVg, &pollRspWrapper->rspOffset, &pollRspWrapper->rspOffset,
246✔
2535
                 pollRspWrapper->head.walsver, pollRspWrapper->head.walever, tmq->consumerId, true);
2536

2537

2538
    pRspObj = buildRsp(pollRspWrapper);
246✔
2539
    if (pRspObj == NULL) {
246!
2540
      tqErrorC("consumer:0x%" PRIx64 " failed to allocate memory for meta rsp", tmq->consumerId);
×
2541
      goto END;
×
2542
    }
2543
    pRspObj->resType = pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP ? RES_TYPE__TMQ_META : RES_TYPE__TMQ_BATCH_META;
246✔
2544
  }
2545

2546
END:
×
2547
  terrno = code;
35,350✔
2548
  taosWUnLockLatch(&tmq->lock);
35,350✔
2549
  return pRspObj;
35,350✔
2550
}
2551

2552
static void* tmqHandleAllRsp(tmq_t* tmq) {
78,522✔
2553
  tqDebugC("consumer:0x%" PRIx64 " start to handle the rsp, total:%d", tmq->consumerId, taosQueueItemSize(tmq->mqueue));
78,522!
2554

2555
  int32_t code = 0;
78,523✔
2556
  void* returnVal = NULL;
78,523✔
2557
  while (1) {
18,920✔
2558
    SMqRspWrapper* pRspWrapper = NULL;
97,443✔
2559
    taosReadQitem(tmq->mqueue, (void**)&pRspWrapper);
97,443✔
2560
    if (pRspWrapper == NULL) {break;}
124,174✔
2561

2562
    tqDebugC("consumer:0x%" PRIx64 " handle rsp, type:%s", tmq->consumerId, tmqMsgTypeStr[pRspWrapper->tmqRspType]);
45,651!
2563
    if (pRspWrapper->code != 0) {
45,651✔
2564
      code = processMqRspError(tmq, pRspWrapper);
3,873✔
2565
    }else{
2566
      returnVal = processMqRsp(tmq, pRspWrapper);
41,778✔
2567
      code = terrno;
41,778✔
2568
    }
2569

2570
    tmqFreeRspWrapper(pRspWrapper);
45,651✔
2571
    taosFreeQitem(pRspWrapper);
45,651✔
2572
    if(returnVal != NULL || code != 0){
45,651!
2573
      break;
2574
    }
2575
  }
2576

2577
END:
78,523✔
2578
  terrno = code;
78,523✔
2579
  return returnVal;
78,523✔
2580
}
2581

2582
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
27,005✔
2583
  int32_t lino = 0;
27,005✔
2584
  int32_t code = 0;
27,005✔
2585
  TSDB_CHECK_NULL(tmq, code, lino, END, TSDB_CODE_INVALID_PARA);
27,005!
2586

2587
  void*   rspObj = NULL;
27,005✔
2588
  int64_t startTime = taosGetTimestampMs();
27,005✔
2589

2590
  tqDebugC("consumer:0x%" PRIx64 " start to poll at %" PRId64 ", timeout:%" PRId64, tmq->consumerId, startTime, timeout);
27,005!
2591
  TSDB_CHECK_CONDITION(atomic_load_8(&tmq->status) != TMQ_CONSUMER_STATUS__INIT, code, lino, END, TSDB_CODE_TMQ_INVALID_STATUS);
27,006✔
2592

2593
  (void)atomic_val_compare_exchange_8(&tmq->pollFlag, 0, 1);
27,004✔
2594

2595
  while (1) {
2596
    code = tmqHandleAllDelayedTask(tmq);
78,522✔
2597
    TSDB_CHECK_CODE(code, lino, END);
78,522!
2598

2599
    rspObj = tmqHandleAllRsp(tmq);
78,522✔
2600
    if (rspObj) {
78,523✔
2601
      tqDebugC("consumer:0x%" PRIx64 "end to poll, return rsp:%p", tmq->consumerId, rspObj);
26,731!
2602
      return (TAOS_RES*)rspObj;
26,731✔
2603
    }
2604
    code = terrno;
51,792✔
2605
    TSDB_CHECK_CODE(code, lino, END);
51,792!
2606

2607
    code = tmqPollImpl(tmq);
51,792✔
2608
    TSDB_CHECK_CODE(code, lino, END);
51,792✔
2609

2610
    if (timeout >= 0) {
51,785!
2611
      int64_t currentTime = taosGetTimestampMs();
51,785✔
2612
      int64_t elapsedTime = currentTime - startTime;
51,785✔
2613
      (void)tsem2_timewait(&tmq->rspSem, (timeout - elapsedTime));
51,785✔
2614
      TSDB_CHECK_CONDITION(elapsedTime < timeout && elapsedTime >= 0, code, lino, END, 0);
51,781!
2615
    } else {
2616
      (void)tsem2_timewait(&tmq->rspSem, 1000);
×
2617
    }
2618
  }
2619

2620
END:
275✔
2621
  terrno = code;
275✔
2622
  if (tmq != NULL && terrno != 0) {
275!
2623
    tqErrorC("consumer:0x%" PRIx64 " poll error at line:%d, msg:%s", tmq->consumerId, lino, tstrerror(terrno));
9!
2624
  }
2625
  return NULL;
275✔
2626
}
2627

2628
static void displayConsumeStatistics(tmq_t* pTmq) {
837✔
2629
  if (pTmq == NULL) return;
837!
2630
  taosRLockLatch(&pTmq->lock);
837✔
2631
  int32_t numOfTopics = taosArrayGetSize(pTmq->clientTopics);
837✔
2632
  tqInfoC("consumer:0x%" PRIx64 " closing poll:%" PRId64 " rows:%" PRId64 " topics:%d, final epoch:%d",
837!
2633
          pTmq->consumerId, pTmq->pollCnt, pTmq->totalRows, numOfTopics, pTmq->epoch);
2634

2635
  tqInfoC("consumer:0x%" PRIx64 " rows dist begin: ", pTmq->consumerId);
837!
2636
  for (int32_t i = 0; i < numOfTopics; ++i) {
1,349✔
2637
    SMqClientTopic* pTopics = taosArrayGet(pTmq->clientTopics, i);
512✔
2638
    if (pTopics == NULL) continue;
512!
2639
    tqInfoC("consumer:0x%" PRIx64 " topic:%d", pTmq->consumerId, i);
512!
2640
    int32_t numOfVgs = taosArrayGetSize(pTopics->vgs);
512✔
2641
    for (int32_t j = 0; j < numOfVgs; ++j) {
1,983✔
2642
      SMqClientVg* pVg = taosArrayGet(pTopics->vgs, j);
1,471✔
2643
      if (pVg == NULL) continue;
1,471!
2644
      tqInfoC("topic:%s, %d. vgId:%d rows:%" PRId64, pTopics->topicName, j, pVg->vgId, pVg->numOfRows);
1,471!
2645
    }
2646
  }
2647
  taosRUnLockLatch(&pTmq->lock);
837✔
2648
  tqInfoC("consumer:0x%" PRIx64 " rows dist end", pTmq->consumerId);
837!
2649
}
2650

2651
int32_t tmq_unsubscribe(tmq_t* tmq) {
837✔
2652
  if (tmq == NULL) return TSDB_CODE_INVALID_PARA;
837!
2653
  int32_t code = 0;
837✔
2654
  int8_t status = atomic_load_8(&tmq->status);
837✔
2655
  tqInfoC("consumer:0x%" PRIx64 " start to unsubscribe consumer, status:%d", tmq->consumerId, status);
837!
2656

2657
  displayConsumeStatistics(tmq);
837✔
2658
  if (status != TMQ_CONSUMER_STATUS__READY && status != TMQ_CONSUMER_STATUS__LOST) {
837✔
2659
    tqInfoC("consumer:0x%" PRIx64 " status:%d, already closed or not in ready state, no need unsubscribe", tmq->consumerId, status);
10!
2660
    goto END;
10✔
2661
  }
2662
  if (tmq->autoCommit) {
827✔
2663
    code = tmq_commit_sync(tmq, NULL);
407✔
2664
    if (code != 0) {
407!
2665
      goto END;
×
2666
    }
2667
  }
2668
  tmqSendHbReq((void*)(tmq->refId), NULL);
827✔
2669

2670
  tmq_list_t* lst = tmq_list_new();
827✔
2671
  if (lst == NULL) {
827!
2672
    code = terrno;
×
2673
    goto END;
×
2674
  }
2675
  code = tmq_subscribe(tmq, lst);
827✔
2676
  tmq_list_destroy(lst);
827✔
2677
  tmqClearUnhandleMsg(tmq);
827✔
2678
  if(code != 0){
827!
2679
    goto END;
×
2680
  }
2681

2682
  END:
827✔
2683
  return code;
837✔
2684
}
2685

2686
int32_t tmq_consumer_close(tmq_t* tmq) {
453✔
2687
  if (tmq == NULL) return TSDB_CODE_INVALID_PARA;
453!
2688
  int32_t code = 0;
453✔
2689
  (void) taosThreadMutexLock(&tmqMgmt.lock);
453✔
2690
  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__CLOSED){
453!
2691
    goto end;
×
2692
  }
2693
  tqInfoC("consumer:0x%" PRIx64 " start to close consumer, status:%d", tmq->consumerId, tmq->status);
453!
2694
  code = tmq_unsubscribe(tmq);
453✔
2695
  if (code == 0) {
453!
2696
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__CLOSED);
453✔
2697
    code = taosRemoveRef(tmqMgmt.rsetId, tmq->refId);
453✔
2698
    if (code != 0){
453!
2699
      tqErrorC("tmq close failed to remove ref:%" PRId64 ", code:%d", tmq->refId, code);
×
2700
    }
2701
  }
2702

2703
end:
453✔
2704
  (void)taosThreadMutexUnlock(&tmqMgmt.lock);
453✔
2705
  return code;
453✔
2706
}
2707

2708
const char* tmq_err2str(int32_t err) {
298✔
2709
  if (err == 0) {
298✔
2710
    return "success";
276✔
2711
  } else if (err == -1) {
22!
2712
    return "fail";
×
2713
  } else {
2714
    if (*(taosGetErrMsg()) == 0) {
22✔
2715
      return tstrerror(err);
8✔
2716
    } else {
2717
      (void)snprintf(taosGetErrMsgReturn(), ERR_MSG_LEN, "%s,detail:%s", tstrerror(err), taosGetErrMsg());
14✔
2718
      return (const char*)taosGetErrMsgReturn();
14✔
2719
    }
2720
  }
2721
}
2722

2723
tmq_res_t tmq_get_res_type(TAOS_RES* res) {
26,691✔
2724
  if (res == NULL) {
26,691✔
2725
    return TMQ_RES_INVALID;
21✔
2726
  }
2727
  if (TD_RES_TMQ(res)) {
26,670✔
2728
    return TMQ_RES_DATA;
26,398✔
2729
  } else if (TD_RES_TMQ_META(res)) {
272✔
2730
    return TMQ_RES_TABLE_META;
200✔
2731
  } else if (TD_RES_TMQ_METADATA(res)) {
72✔
2732
    return TMQ_RES_METADATA;
21✔
2733
  } else if (TD_RES_TMQ_BATCH_META(res)) {
51✔
2734
    return TMQ_RES_TABLE_META;
17✔
2735
  } else if (TD_RES_TMQ_RAW(res)) {
34!
2736
    return TMQ_RES_RAWDATA;
34✔
2737
  } else {
2738
    return TMQ_RES_INVALID;
×
2739
  }
2740
}
2741

2742
const char* tmq_get_topic_name(TAOS_RES* res) {
25,310✔
2743
  if (res == NULL) {
25,310✔
2744
    return NULL;
17✔
2745
  }
2746
  if (TD_RES_TMQ_RAW(res) || TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) ||
25,293✔
2747
      TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
214!
2748
    char* tmp = strchr(((SMqRspObj*)res)->topic, '.');
25,293✔
2749
    if (tmp == NULL) {
25,293!
2750
      return NULL;
×
2751
    }
2752
    return tmp + 1;
25,293✔
2753
  } else {
2754
    return NULL;
×
2755
  }
2756
}
2757

2758
const char* tmq_get_db_name(TAOS_RES* res) {
25,308✔
2759
  if (res == NULL) {
25,308✔
2760
    return NULL;
15✔
2761
  }
2762

2763
  if (TD_RES_TMQ_RAW(res) || TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) ||
25,293✔
2764
      TD_RES_TMQ_BATCH_META(res) || TD_RES_TMQ_META(res)) {
214!
2765
    char* tmp = strchr(((SMqRspObj*)res)->db, '.');
25,293✔
2766
    if (tmp == NULL) {
25,293!
2767
      return NULL;
×
2768
    }
2769
    return tmp + 1;
25,293✔
2770
  } else {
2771
    return NULL;
×
2772
  }
2773
}
2774

2775
int32_t tmq_get_vgroup_id(TAOS_RES* res) {
25,309✔
2776
  if (res == NULL) {
25,309✔
2777
    return TSDB_CODE_INVALID_PARA;
16✔
2778
  }
2779
  if (TD_RES_TMQ_RAW(res) || TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) ||
25,293✔
2780
      TD_RES_TMQ_BATCH_META(res) || TD_RES_TMQ_META(res)) {
214!
2781
    return ((SMqRspObj*)res)->vgId;
25,293✔
2782
  } else {
2783
    return TSDB_CODE_INVALID_PARA;
×
2784
  }
2785
}
2786

2787
int64_t tmq_get_vgroup_offset(TAOS_RES* res) {
421✔
2788
  if (res == NULL) {
421✔
2789
    return TSDB_CODE_INVALID_PARA;
19✔
2790
  }
2791
  if (TD_RES_TMQ_RAW(res) || TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) {
402!
2792
    SMqRspObj* pRspObj = (SMqRspObj*)res;
402✔
2793
    STqOffsetVal*     pOffset = &pRspObj->dataRsp.reqOffset;
402✔
2794
    if (pOffset->type == TMQ_OFFSET__LOG) {
402!
2795
      return pOffset->version;
402✔
2796
    } else {
2797
      tqErrorC("invalid offset type:%d", pOffset->type);
×
2798
    }
2799
  } else if (TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
×
2800
    SMqRspObj* pRspObj = (SMqRspObj*)res;
×
2801
    if (pRspObj->rspOffset.type == TMQ_OFFSET__LOG) {
×
2802
      return pRspObj->rspOffset.version;
×
2803
    }
2804
  } else {
2805
    tqErrorC("invalid tmq type:%d", *(int8_t*)res);
×
2806
  }
2807

2808
  // data from tsdb, no valid offset info
2809
  return TSDB_CODE_TMQ_SNAPSHOT_ERROR;
×
2810
}
2811

2812
const char* tmq_get_table_name(TAOS_RES* res) {
15,353,953✔
2813
  if (res == NULL) {
15,353,953✔
2814
    return NULL;
24✔
2815
  }
2816
  if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) ) {
15,353,929!
2817
    SMqRspObj* pRspObj = (SMqRspObj*)res;
15,353,929✔
2818
    SMqDataRsp* data = &pRspObj->dataRsp;
15,353,929✔
2819
    if (!data->withTbName || data->blockTbName == NULL || pRspObj->resIter < 0 ||
15,353,929!
2820
        pRspObj->resIter >= data->blockNum) {
4,193,050!
2821
      return NULL;
11,160,875✔
2822
    }
2823
    return (const char*)taosArrayGetP(data->blockTbName, pRspObj->resIter);
4,193,054✔
2824
  }
2825
  return NULL;
×
2826
}
2827

2828
void tmq_commit_async(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* cb, void* param) {
19✔
2829
  if (tmq == NULL) {
19!
2830
    tqErrorC("invalid tmq handle, null");
×
2831
    if (cb != NULL) {
×
2832
      cb(tmq, TSDB_CODE_INVALID_PARA, param);
×
2833
    }
2834
    return;
×
2835
  }
2836
  if (pRes == NULL) {  // here needs to commit all offsets.
19!
2837
    asyncCommitAllOffsets(tmq, cb, param);
19✔
2838
  } else {  // only commit one offset
2839
    asyncCommitFromResult(tmq, pRes, cb, param);
×
2840
  }
2841
}
2842

2843
static void commitCallBackFn(tmq_t* tmq, int32_t code, void* param) {
705✔
2844
  if (param == NULL) {
705!
2845
    tqErrorC("invalid param in commit cb");
×
2846
    return;
×
2847
  }
2848
  SSyncCommitInfo* pInfo = (SSyncCommitInfo*)param;
705✔
2849
  pInfo->code = code;
705✔
2850
  if (tsem2_post(&pInfo->sem) != 0){
705!
2851
    tqErrorC("failed to post rsp sem in commit cb");
×
2852
  }
2853
}
2854

2855
int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* pRes) {
701✔
2856
  if (tmq == NULL) {
701!
2857
    tqErrorC("invalid tmq handle, null");
×
2858
    return TSDB_CODE_INVALID_PARA;
×
2859
  }
2860

2861
  int32_t code = 0;
701✔
2862

2863
  SSyncCommitInfo* pInfo = taosMemoryMalloc(sizeof(SSyncCommitInfo));
701!
2864
  if (pInfo == NULL) {
701!
2865
    tqErrorC("failed to allocate memory for sync commit");
×
2866
    return terrno;
×
2867
  }
2868

2869
  code = tsem2_init(&pInfo->sem, 0, 0);
701✔
2870
  if (code != 0) {
701!
2871
    tqErrorC("failed to init sem for sync commit");
×
2872
    taosMemoryFree(pInfo);
×
2873
    return code;
×
2874
  }
2875
  pInfo->code = 0;
701✔
2876

2877
  if (pRes == NULL) {
701!
2878
    asyncCommitAllOffsets(tmq, commitCallBackFn, pInfo);
701✔
2879
  } else {
2880
    asyncCommitFromResult(tmq, pRes, commitCallBackFn, pInfo);
×
2881
  }
2882

2883
  if (tsem2_wait(&pInfo->sem) != 0){
701!
2884
    tqErrorC("failed to wait sem for sync commit");
×
2885
  }
2886
  code = pInfo->code;
701✔
2887

2888
  if(tsem2_destroy(&pInfo->sem) != 0) {
701!
2889
    tqErrorC("failed to destroy sem for sync commit");
×
2890
  }
2891
  taosMemoryFree(pInfo);
701!
2892

2893
  tqDebugC("consumer:0x%" PRIx64 " sync res commit done, code:%s", tmq->consumerId, tstrerror(code));
701!
2894
  return code;
701✔
2895
}
2896

2897
// wal range will be ok after calling tmq_get_topic_assignment or poll interface
2898
static int32_t checkWalRange(SVgOffsetInfo* offset, int64_t value) {
25✔
2899
  if (offset == NULL) {
25!
2900
    tqErrorC("invalid offset, null");
×
2901
    return TSDB_CODE_INVALID_PARA;
×
2902
  }
2903
  if (offset->walVerBegin == -1 || offset->walVerEnd == -1) {
25!
2904
    tqErrorC("Assignment or poll interface need to be called first");
×
2905
    return TSDB_CODE_TMQ_NEED_INITIALIZED;
×
2906
  }
2907

2908
  if (value != -1 && (value < offset->walVerBegin || value > offset->walVerEnd)) {
25!
2909
    tqErrorC("invalid seek params, offset:%" PRId64 ", valid range:[%" PRId64 ", %" PRId64 "]", value,
1!
2910
             offset->walVerBegin, offset->walVerEnd);
2911
    return TSDB_CODE_TMQ_VERSION_OUT_OF_RANGE;
1✔
2912
  }
2913

2914
  return 0;
24✔
2915
}
2916

2917
int32_t tmq_commit_offset_sync(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_t offset) {
30✔
2918
  if (tmq == NULL || pTopicName == NULL) {
30!
2919
    tqErrorC("invalid tmq handle, null");
×
2920
    return TSDB_CODE_INVALID_PARA;
×
2921
  }
2922

2923
  int32_t accId = tmq->pTscObj->acctId;
30✔
2924
  char    tname[TSDB_TOPIC_FNAME_LEN] = {0};
30✔
2925
  (void)snprintf(tname, TSDB_TOPIC_FNAME_LEN, "%d.%s", accId, pTopicName);
30✔
2926

2927
  taosWLockLatch(&tmq->lock);
30✔
2928
  SMqClientVg* pVg = NULL;
30✔
2929
  int32_t      code = getClientVg(tmq, tname, vgId, &pVg);
30✔
2930
  if (code != 0) {
30✔
2931
    taosWUnLockLatch(&tmq->lock);
22✔
2932
    return code;
22✔
2933
  }
2934

2935
  SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo;
8✔
2936
  code = checkWalRange(pOffsetInfo, offset);
8✔
2937
  if (code != 0) {
8!
2938
    taosWUnLockLatch(&tmq->lock);
×
2939
    return code;
×
2940
  }
2941
  taosWUnLockLatch(&tmq->lock);
8✔
2942

2943
  STqOffsetVal offsetVal = {.type = TMQ_OFFSET__LOG, .version = offset};
8✔
2944

2945
  SSyncCommitInfo* pInfo = taosMemoryMalloc(sizeof(SSyncCommitInfo));
8!
2946
  if (pInfo == NULL) {
8!
2947
    tqErrorC("consumer:0x%" PRIx64 " failed to prepare seek operation", tmq->consumerId);
×
2948
    return terrno;
×
2949
  }
2950

2951
  code = tsem2_init(&pInfo->sem, 0, 0);
8✔
2952
  if (code != 0) {
8!
2953
    taosMemoryFree(pInfo);
×
2954
    return code;
×
2955
  }
2956
  pInfo->code = 0;
8✔
2957

2958
  code = asyncCommitOffset(tmq, tname, vgId, &offsetVal, commitCallBackFn, pInfo);
8✔
2959
  if (code == 0) {
8✔
2960
    if (tsem2_wait(&pInfo->sem) != 0){
4!
2961
      tqErrorC("failed to wait sem for sync commit offset");
×
2962
    }
2963
    code = pInfo->code;
4✔
2964
  }
2965

2966
  if (code == TSDB_CODE_TMQ_SAME_COMMITTED_VALUE) code = TSDB_CODE_SUCCESS;
8✔
2967
  if(tsem2_destroy(&pInfo->sem) != 0) {
8!
2968
    tqErrorC("failed to destroy sem for sync commit offset");
×
2969
  }
2970
  taosMemoryFree(pInfo);
8!
2971

2972
  tqDebugC("consumer:0x%" PRIx64 " sync send commit to vgId:%d, offset:%" PRId64 " code:%s", tmq->consumerId, vgId,
8!
2973
          offset, tstrerror(code));
2974

2975
  return code;
8✔
2976
}
2977

2978
void tmq_commit_offset_async(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb* cb,
12✔
2979
                             void* param) {
2980
  int32_t code = 0;
12✔
2981
  if (tmq == NULL || pTopicName == NULL) {
12!
2982
    tqErrorC("invalid tmq handle, null");
×
2983
    code = TSDB_CODE_INVALID_PARA;
×
2984
    goto end;
×
2985
  }
2986

2987
  int32_t accId = tmq->pTscObj->acctId;
12✔
2988
  char    tname[TSDB_TOPIC_FNAME_LEN] = {0};
12✔
2989
  (void)snprintf(tname, TSDB_TOPIC_FNAME_LEN, "%d.%s", accId, pTopicName);
12✔
2990

2991
  taosWLockLatch(&tmq->lock);
12✔
2992
  SMqClientVg* pVg = NULL;
12✔
2993
  code = getClientVg(tmq, tname, vgId, &pVg);
12✔
2994
  if (code != 0) {
12!
2995
    taosWUnLockLatch(&tmq->lock);
12✔
2996
    goto end;
12✔
2997
  }
2998

2999
  SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo;
×
3000
  code = checkWalRange(pOffsetInfo, offset);
×
3001
  if (code != 0) {
×
3002
    taosWUnLockLatch(&tmq->lock);
×
3003
    goto end;
×
3004
  }
3005
  taosWUnLockLatch(&tmq->lock);
×
3006

3007
  STqOffsetVal offsetVal = {.type = TMQ_OFFSET__LOG, .version = offset};
×
3008

3009
  code = asyncCommitOffset(tmq, tname, vgId, &offsetVal, cb, param);
×
3010

3011
  tqDebugC("consumer:0x%" PRIx64 " async send commit to vgId:%d, offset:%" PRId64 " code:%s", tmq->consumerId, vgId,
×
3012
          offset, tstrerror(code));
3013

3014
  end:
×
3015
  if (code != 0 && cb != NULL) {
12!
3016
    if (code == TSDB_CODE_TMQ_SAME_COMMITTED_VALUE) code = TSDB_CODE_SUCCESS;
×
3017
    cb(tmq, code, param);
×
3018
  }
3019
}
12✔
3020

3021

3022
int32_t tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4, SReqResultInfo** pResInfo) {
379,489✔
3023
  if (res == NULL || pResInfo == NULL) {
379,489!
3024
    return TSDB_CODE_INVALID_PARA;
×
3025
  }
3026
  SMqRspObj*  pRspObj = (SMqRspObj*)res;
379,489✔
3027
  SMqDataRsp* data = &pRspObj->dataRsp;
379,489✔
3028

3029
  pRspObj->resIter++;
379,489✔
3030
  if (pRspObj->resIter < data->blockNum) {
379,489✔
3031
    if (data->withSchema) {
353,138✔
3032
      doFreeReqResultInfo(&pRspObj->resInfo);
353,137✔
3033
      SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(data->blockSchema, pRspObj->resIter);
353,137✔
3034
      if (pSW) {
353,136!
3035
        TAOS_CHECK_RETURN(setResSchemaInfo(&pRspObj->resInfo, pSW->pSchema, pSW->nCols, NULL, false));
353,136!
3036
      }
3037
    }
3038

3039
    void*   pRetrieve = taosArrayGetP(data->blockData, pRspObj->resIter);
353,123✔
3040
    void*   rawData = NULL;
353,124✔
3041
    int64_t rows = 0;
353,124✔
3042
    int32_t precision = 0;
353,124✔
3043
    tmqGetRawDataRowsPrecisionFromRes(pRetrieve, &rawData, &rows, &precision);
353,124✔
3044

3045
    pRspObj->resInfo.pData = rawData;
353,126✔
3046
    pRspObj->resInfo.numOfRows = rows;
353,126✔
3047
    pRspObj->resInfo.current = 0;
353,126✔
3048
    pRspObj->resInfo.precision = precision;
353,126✔
3049

3050
    pRspObj->resInfo.totalRows += pRspObj->resInfo.numOfRows;
353,126✔
3051
    int32_t code = setResultDataPtr(&pRspObj->resInfo, convertUcs4, false);
353,126✔
3052
    if (code != 0) {
353,124!
3053
      return code;
×
3054
    }
3055
    *pResInfo = &pRspObj->resInfo;
353,124✔
3056
    return code;
353,124✔
3057
  }
3058

3059
  return TSDB_CODE_TSC_INTERNAL_ERROR;
26,351✔
3060
}
3061

3062
static int32_t tmqGetWalInfoCb(void* param, SDataBuf* pMsg, int32_t code) {
3✔
3063
  if (param == NULL || pMsg == NULL) {
3!
3064
    return code;
×
3065
  }
3066
  SMqVgWalInfoParam* pParam = param;
3✔
3067
  SMqVgCommon*       pCommon = pParam->pCommon;
3✔
3068

3069
  if (code != TSDB_CODE_SUCCESS) {
3!
3070
    tqErrorC("consumer:0x%" PRIx64 " failed to get the wal info from vgId:%d for topic:%s", pCommon->consumerId,
×
3071
             pParam->vgId, pCommon->pTopicName);
3072

3073
  } else {
3074
    SMqDataRsp rsp = {0};
3✔
3075
    SDecoder   decoder = {0};
3✔
3076
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
3✔
3077
    code = tDecodeMqDataRsp(&decoder, &rsp);
3✔
3078
    tDecoderClear(&decoder);
3✔
3079
    if (code != 0) {
3!
3080
      goto END;
×
3081
    }
3082

3083
    SMqRspHead*          pHead = pMsg->pData;
3✔
3084
    tmq_topic_assignment assignment = {.begin = pHead->walsver,
3✔
3085
        .end = pHead->walever + 1,
3✔
3086
        .currentOffset = rsp.rspOffset.version,
3✔
3087
        .vgId = pParam->vgId};
3✔
3088

3089
    (void)taosThreadMutexLock(&pCommon->mutex);
3✔
3090
    if (taosArrayPush(pCommon->pList, &assignment) == NULL) {
6!
3091
      tqErrorC("consumer:0x%" PRIx64 " failed to push the wal info from vgId:%d for topic:%s", pCommon->consumerId,
×
3092
               pParam->vgId, pCommon->pTopicName);
3093
      code = TSDB_CODE_TSC_INTERNAL_ERROR;
×
3094
    }
3095
    (void)taosThreadMutexUnlock(&pCommon->mutex);
3✔
3096
  }
3097

3098
  END:
3✔
3099
  pCommon->code = code;
3✔
3100
  int32_t total = atomic_add_fetch_32(&pCommon->numOfRsp, 1);
3✔
3101
  if (total == pParam->totalReq) {
3✔
3102
    if (tsem2_post(&pCommon->rsp) != 0) {
2!
3103
      tqErrorC("failed to post semaphore in get wal cb");
×
3104
    }
3105
  }
3106

3107
  if (pMsg) {
3!
3108
    taosMemoryFree(pMsg->pData);
3!
3109
    taosMemoryFree(pMsg->pEpSet);
3!
3110
  }
3111

3112
  return code;
3✔
3113
}
3114

3115
static void destroyCommonInfo(SMqVgCommon* pCommon) {
11✔
3116
  if (pCommon == NULL) {
11✔
3117
    return;
9✔
3118
  }
3119
  taosArrayDestroy(pCommon->pList);
2✔
3120
  pCommon->pList = NULL;
2✔
3121
  if(tsem2_destroy(&pCommon->rsp) != 0) {
2!
3122
    tqErrorC("failed to destroy semaphore for topic:%s", pCommon->pTopicName);
×
3123
  }
3124
  taosMemoryFreeClear(pCommon->pTopicName);
2!
3125
  (void)taosThreadMutexDestroy(&pCommon->mutex);
2✔
3126
  taosMemoryFree(pCommon);
2!
3127
}
3128

3129
static bool isInSnapshotMode(int8_t type, bool useSnapshot) {
61✔
3130
  if ((type < TMQ_OFFSET__LOG && useSnapshot) || type > TMQ_OFFSET__LOG) {
61!
3131
    return true;
×
3132
  }
3133
  return false;
61✔
3134
}
3135

3136
static int32_t tmCommittedCb(void* param, SDataBuf* pMsg, int32_t code) {
2✔
3137
  if (param == NULL) {
2!
3138
    return code;
×
3139
  }
3140
  SMqCommittedParam* pParam = param;
2✔
3141

3142
  if (code != 0) {
2✔
3143
    goto end;
1✔
3144
  }
3145
  if (pMsg) {
1!
3146
    SDecoder decoder = {0};
1✔
3147
    tDecoderInit(&decoder, (uint8_t*)pMsg->pData, pMsg->len);
1✔
3148
    int32_t err = tDecodeMqVgOffset(&decoder, &pParam->vgOffset);
1✔
3149
    if (err < 0) {
1!
3150
      tOffsetDestroy(&pParam->vgOffset.offset);
×
3151
      code = err;
×
3152
      goto end;
×
3153
    }
3154
    tDecoderClear(&decoder);
1✔
3155
  }
3156

3157
  end:
×
3158
  if (pMsg) {
2!
3159
    taosMemoryFree(pMsg->pData);
2!
3160
    taosMemoryFree(pMsg->pEpSet);
2!
3161
  }
3162
  pParam->code = code;
2✔
3163
  if (tsem2_post(&pParam->sem) != 0){
2!
3164
    tqErrorC("failed to post semaphore in tmCommittedCb");
×
3165
  }
3166
  return code;
2✔
3167
}
3168

3169
int64_t getCommittedFromServer(tmq_t* tmq, char* tname, int32_t vgId, SEpSet* epSet) {
2✔
3170
  if (tmq == NULL || tname == NULL || epSet == NULL) {
2!
3171
    return TSDB_CODE_INVALID_PARA;
×
3172
  }
3173
  int32_t     code = 0;
2✔
3174
  SMqVgOffset pOffset = {0};
2✔
3175

3176
  pOffset.consumerId = tmq->consumerId;
2✔
3177
  (void)snprintf(pOffset.offset.subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", tmq->groupId, TMQ_SEPARATOR, tname);
2✔
3178

3179
  int32_t len = 0;
2✔
3180
  tEncodeSize(tEncodeMqVgOffset, &pOffset, len, code);
2!
3181
  if (code < 0) {
2!
3182
    return TSDB_CODE_INVALID_PARA;
×
3183
  }
3184

3185
  void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
2!
3186
  if (buf == NULL) {
2!
3187
    return terrno;
×
3188
  }
3189

3190
  ((SMsgHead*)buf)->vgId = htonl(vgId);
2✔
3191

3192
  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
2✔
3193

3194
  SEncoder encoder = {0};
2✔
3195
  tEncoderInit(&encoder, abuf, len);
2✔
3196
  code = tEncodeMqVgOffset(&encoder, &pOffset);
2✔
3197
  if (code < 0) {
2!
3198
    taosMemoryFree(buf);
×
3199
    tEncoderClear(&encoder);
×
3200
    return code;
×
3201
  }
3202
  tEncoderClear(&encoder);
2✔
3203

3204
  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
2!
3205
  if (sendInfo == NULL) {
2!
3206
    taosMemoryFree(buf);
×
3207
    return terrno;
×
3208
  }
3209

3210
  SMqCommittedParam* pParam = taosMemoryMalloc(sizeof(SMqCommittedParam));
2!
3211
  if (pParam == NULL) {
2!
3212
    taosMemoryFree(buf);
×
3213
    taosMemoryFree(sendInfo);
×
3214
    return terrno;
×
3215
  }
3216
  if (tsem2_init(&pParam->sem, 0, 0) != 0) {
2!
3217
    taosMemoryFree(buf);
×
3218
    taosMemoryFree(sendInfo);
×
3219
    taosMemoryFree(pParam);
×
3220
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
3221
  }
3222

3223
  sendInfo->msgInfo = (SDataBuf){.pData = buf, .len = sizeof(SMsgHead) + len, .handle = NULL};
2✔
3224
  sendInfo->requestId = generateRequestId();
2✔
3225
  sendInfo->requestObjRefId = 0;
2✔
3226
  sendInfo->param = pParam;
2✔
3227
  sendInfo->fp = tmCommittedCb;
2✔
3228
  sendInfo->msgType = TDMT_VND_TMQ_VG_COMMITTEDINFO;
2✔
3229

3230
  code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, NULL, sendInfo);
2✔
3231
  if (code != 0) {
2!
3232
    if(tsem2_destroy(&pParam->sem) != 0) {
×
3233
      tqErrorC("failed to destroy semaphore in get committed from server1");
×
3234
    }
3235
    taosMemoryFree(pParam);
×
3236
    return code;
×
3237
  }
3238

3239
  if (tsem2_wait(&pParam->sem) != 0){
2!
3240
    tqErrorC("failed to wait semaphore in get committed from server");
×
3241
  }
3242
  code = pParam->code;
2✔
3243
  if (code == TSDB_CODE_SUCCESS) {
2✔
3244
    if (pParam->vgOffset.offset.val.type == TMQ_OFFSET__LOG) {
1!
3245
      code = pParam->vgOffset.offset.val.version;
1✔
3246
    } else {
3247
      tOffsetDestroy(&pParam->vgOffset.offset);
×
3248
      code = TSDB_CODE_TMQ_SNAPSHOT_ERROR;
×
3249
    }
3250
  }
3251
  if(tsem2_destroy(&pParam->sem) != 0) {
2!
3252
    tqErrorC("failed to destroy semaphore in get committed from server2");
×
3253
  }
3254
  taosMemoryFree(pParam);
2!
3255

3256
  return code;
2✔
3257
}
3258

3259
int64_t tmq_position(tmq_t* tmq, const char* pTopicName, int32_t vgId) {
34✔
3260
  if (tmq == NULL || pTopicName == NULL) {
34!
3261
    tqErrorC("invalid tmq handle, null");
×
3262
    return TSDB_CODE_INVALID_PARA;
×
3263
  }
3264

3265
  int32_t accId = tmq->pTscObj->acctId;
34✔
3266
  char    tname[TSDB_TOPIC_FNAME_LEN] = {0};
34✔
3267
  (void)snprintf(tname, TSDB_TOPIC_FNAME_LEN, "%d.%s", accId, pTopicName);
34✔
3268

3269
  taosWLockLatch(&tmq->lock);
34✔
3270

3271
  SMqClientVg* pVg = NULL;
34✔
3272
  int32_t      code = getClientVg(tmq, tname, vgId, &pVg);
34✔
3273
  if (code != 0) {
34✔
3274
    taosWUnLockLatch(&tmq->lock);
22✔
3275
    return code;
22✔
3276
  }
3277

3278
  SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo;
12✔
3279
  int32_t        type = pOffsetInfo->endOffset.type;
12✔
3280
  if (isInSnapshotMode(type, tmq->useSnapshot)) {
12!
3281
    tqErrorC("consumer:0x%" PRIx64 " offset type:%d not wal version, position error", tmq->consumerId, type);
×
3282
    taosWUnLockLatch(&tmq->lock);
×
3283
    return TSDB_CODE_TMQ_SNAPSHOT_ERROR;
×
3284
  }
3285

3286
  code = checkWalRange(pOffsetInfo, -1);
12✔
3287
  if (code != 0) {
12!
3288
    taosWUnLockLatch(&tmq->lock);
×
3289
    return code;
×
3290
  }
3291
  SEpSet  epSet = pVg->epSet;
12✔
3292
  int64_t begin = pVg->offsetInfo.walVerBegin;
12✔
3293
  int64_t end = pVg->offsetInfo.walVerEnd;
12✔
3294
  taosWUnLockLatch(&tmq->lock);
12✔
3295

3296
  int64_t position = 0;
12✔
3297
  if (type == TMQ_OFFSET__LOG) {
12✔
3298
    position = pOffsetInfo->endOffset.version;
11✔
3299
  } else if (type == TMQ_OFFSET__RESET_EARLIEST || type == TMQ_OFFSET__RESET_LATEST) {
1!
3300
    code = getCommittedFromServer(tmq, tname, vgId, &epSet);
1✔
3301
    if (code == TSDB_CODE_TMQ_NO_COMMITTED) {
1!
3302
      if (type == TMQ_OFFSET__RESET_EARLIEST) {
×
3303
        position = begin;
×
3304
      } else if (type == TMQ_OFFSET__RESET_LATEST) {
×
3305
        position = end;
×
3306
      }
3307
    } else {
3308
      position = code;
1✔
3309
    }
3310
  } else {
3311
    tqErrorC("consumer:0x%" PRIx64 " offset type:%d can not be reach here", tmq->consumerId, type);
×
3312
  }
3313

3314
  tqDebugC("consumer:0x%" PRIx64 " tmq_position vgId:%d position:%" PRId64, tmq->consumerId, vgId, position);
12!
3315
  return position;
12✔
3316
}
3317

3318
int64_t tmq_committed(tmq_t* tmq, const char* pTopicName, int32_t vgId) {
31✔
3319
  if (tmq == NULL || pTopicName == NULL) {
31!
3320
    tqErrorC("invalid tmq handle, null");
×
3321
    return TSDB_CODE_INVALID_PARA;
×
3322
  }
3323

3324
  int32_t accId = tmq->pTscObj->acctId;
31✔
3325
  char    tname[TSDB_TOPIC_FNAME_LEN] = {0};
31✔
3326
  (void)snprintf(tname, TSDB_TOPIC_FNAME_LEN, "%d.%s", accId, pTopicName);
31✔
3327

3328
  taosWLockLatch(&tmq->lock);
31✔
3329

3330
  SMqClientVg* pVg = NULL;
31✔
3331
  int32_t      code = getClientVg(tmq, tname, vgId, &pVg);
31✔
3332
  if (code != 0) {
31✔
3333
    taosWUnLockLatch(&tmq->lock);
16✔
3334
    return code;
16✔
3335
  }
3336

3337
  SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo;
15✔
3338
  if (isInSnapshotMode(pOffsetInfo->endOffset.type, tmq->useSnapshot)) {
15!
3339
    tqErrorC("consumer:0x%" PRIx64 " offset type:%d not wal version, committed error", tmq->consumerId,
×
3340
             pOffsetInfo->endOffset.type);
3341
    taosWUnLockLatch(&tmq->lock);
×
3342
    return TSDB_CODE_TMQ_SNAPSHOT_ERROR;
×
3343
  }
3344

3345
  if (isInSnapshotMode(pOffsetInfo->committedOffset.type, tmq->useSnapshot)) {
15!
3346
    tqErrorC("consumer:0x%" PRIx64 " offset type:%d not wal version, committed error", tmq->consumerId,
×
3347
             pOffsetInfo->committedOffset.type);
3348
    taosWUnLockLatch(&tmq->lock);
×
3349
    return TSDB_CODE_TMQ_SNAPSHOT_ERROR;
×
3350
  }
3351

3352
  int64_t committed = 0;
15✔
3353
  if (pOffsetInfo->committedOffset.type == TMQ_OFFSET__LOG) {
15✔
3354
    committed = pOffsetInfo->committedOffset.version;
14✔
3355
    taosWUnLockLatch(&tmq->lock);
14✔
3356
    goto end;
14✔
3357
  }
3358
  SEpSet epSet = pVg->epSet;
1✔
3359
  taosWUnLockLatch(&tmq->lock);
1✔
3360

3361
  committed = getCommittedFromServer(tmq, tname, vgId, &epSet);
1✔
3362

3363
  end:
15✔
3364
  tqDebugC("consumer:0x%" PRIx64 " tmq_committed vgId:%d committed:%" PRId64, tmq->consumerId, vgId, committed);
15!
3365
  return committed;
15✔
3366
}
3367

3368
int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_assignment** assignment,
24✔
3369
                                 int32_t* numOfAssignment) {
3370
  if (tmq == NULL || pTopicName == NULL || assignment == NULL || numOfAssignment == NULL) {
24!
3371
    tqErrorC("invalid tmq handle, null");
13!
3372
    return TSDB_CODE_INVALID_PARA;
13✔
3373
  }
3374
  *numOfAssignment = 0;
11✔
3375
  *assignment = NULL;
11✔
3376
  SMqVgCommon* pCommon = NULL;
11✔
3377

3378
  int32_t accId = tmq->pTscObj->acctId;
11✔
3379
  char    tname[TSDB_TOPIC_FNAME_LEN] = {0};
11✔
3380
  (void)snprintf(tname, TSDB_TOPIC_FNAME_LEN, "%d.%s", accId, pTopicName);
11✔
3381

3382
  taosWLockLatch(&tmq->lock);
11✔
3383

3384
  SMqClientTopic* pTopic = NULL;
11✔
3385
  int32_t         code = getTopicByName(tmq, tname, &pTopic);
11✔
3386
  if (code != 0) {
11✔
3387
    tqErrorC("consumer:0x%" PRIx64 " invalid topic name:%s", tmq->consumerId, pTopicName);
3!
3388
    goto end;
3✔
3389
  }
3390

3391
  // in case of snapshot is opened, no valid offset will return
3392
  *numOfAssignment = taosArrayGetSize(pTopic->vgs);
8✔
3393
  for (int32_t j = 0; j < (*numOfAssignment); ++j) {
22✔
3394
    SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j);
14✔
3395
    if (pClientVg == NULL) {
14!
3396
      continue;
×
3397
    }
3398
    int32_t type = pClientVg->offsetInfo.beginOffset.type;
14✔
3399
    if (isInSnapshotMode(type, tmq->useSnapshot)) {
14!
3400
      tqErrorC("consumer:0x%" PRIx64 " offset type:%d not wal version, assignment not allowed", tmq->consumerId, type);
×
3401
      code = TSDB_CODE_TMQ_SNAPSHOT_ERROR;
×
3402
      goto end;
×
3403
    }
3404
  }
3405

3406
  *assignment = taosMemoryCalloc(*numOfAssignment, sizeof(tmq_topic_assignment));
8!
3407
  if (*assignment == NULL) {
8!
3408
    tqErrorC("consumer:0x%" PRIx64 " failed to malloc buffer, size:%" PRIzu, tmq->consumerId,
×
3409
             (*numOfAssignment) * sizeof(tmq_topic_assignment));
3410
    code = terrno;
×
3411
    goto end;
×
3412
  }
3413

3414
  bool needFetch = false;
8✔
3415

3416
  for (int32_t j = 0; j < (*numOfAssignment); ++j) {
20✔
3417
    SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j);
14✔
3418
    if (pClientVg == NULL) {
14!
3419
      continue;
×
3420
    }
3421
    if (pClientVg->offsetInfo.beginOffset.type != TMQ_OFFSET__LOG) {
14✔
3422
      needFetch = true;
2✔
3423
      break;
2✔
3424
    }
3425

3426
    tmq_topic_assignment* pAssignment = &(*assignment)[j];
12✔
3427
    pAssignment->currentOffset = pClientVg->offsetInfo.beginOffset.version;
12✔
3428
    pAssignment->begin = pClientVg->offsetInfo.walVerBegin;
12✔
3429
    pAssignment->end = pClientVg->offsetInfo.walVerEnd;
12✔
3430
    pAssignment->vgId = pClientVg->vgId;
12✔
3431
    tqDebugC("consumer:0x%" PRIx64 " get assignment from local:%d->%" PRId64, tmq->consumerId, pAssignment->vgId,
12!
3432
            pAssignment->currentOffset);
3433
  }
3434

3435
  if (needFetch) {
8✔
3436
    pCommon = taosMemoryCalloc(1, sizeof(SMqVgCommon));
2!
3437
    if (pCommon == NULL) {
2!
3438
      code = terrno;
×
3439
      goto end;
×
3440
    }
3441

3442
    pCommon->pList = taosArrayInit(4, sizeof(tmq_topic_assignment));
2✔
3443
    if (pCommon->pList == NULL) {
2!
3444
      code = terrno;
×
3445
      goto end;
×
3446
    }
3447

3448
    code = tsem2_init(&pCommon->rsp, 0, 0);
2✔
3449
    if (code != 0) {
2!
3450
      goto end;
×
3451
    }
3452
    (void)taosThreadMutexInit(&pCommon->mutex, 0);
2✔
3453
    pCommon->pTopicName = taosStrdup(pTopic->topicName);
2!
3454
    if (pCommon->pTopicName == NULL) {
2!
3455
      code = terrno;
×
3456
      goto end;
×
3457
    }
3458
    pCommon->consumerId = tmq->consumerId;
2✔
3459
    for (int32_t i = 0; i < (*numOfAssignment); ++i) {
5✔
3460
      SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i);
3✔
3461
      if (pClientVg == NULL) {
3!
3462
        continue;
×
3463
      }
3464
      SMqVgWalInfoParam* pParam = taosMemoryMalloc(sizeof(SMqVgWalInfoParam));
3!
3465
      if (pParam == NULL) {
3!
3466
        code = terrno;
×
3467
        goto end;
×
3468
      }
3469

3470
      pParam->epoch = tmq->epoch;
3✔
3471
      pParam->vgId = pClientVg->vgId;
3✔
3472
      pParam->totalReq = *numOfAssignment;
3✔
3473
      pParam->pCommon = pCommon;
3✔
3474

3475
      SMqPollReq req = {0};
3✔
3476
      tmqBuildConsumeReqImpl(&req, tmq, pTopic, pClientVg);
3✔
3477
      req.reqOffset = pClientVg->offsetInfo.beginOffset;
3✔
3478

3479
      int32_t msgSize = tSerializeSMqPollReq(NULL, 0, &req);
3✔
3480
      if (msgSize < 0) {
3!
3481
        taosMemoryFree(pParam);
×
3482
        code = msgSize;
×
3483
        goto end;
×
3484
      }
3485

3486
      char* msg = taosMemoryCalloc(1, msgSize);
3!
3487
      if (NULL == msg) {
3!
3488
        taosMemoryFree(pParam);
×
3489
        code = terrno;
×
3490
        goto end;
×
3491
      }
3492

3493
      msgSize = tSerializeSMqPollReq(msg, msgSize, &req);
3✔
3494
      if (msgSize < 0) {
3!
3495
        taosMemoryFree(msg);
×
3496
        taosMemoryFree(pParam);
×
3497
        code = msgSize;
×
3498
        goto end;
×
3499
      }
3500

3501
      SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
3!
3502
      if (sendInfo == NULL) {
3!
3503
        taosMemoryFree(pParam);
×
3504
        taosMemoryFree(msg);
×
3505
        code = terrno;
×
3506
        goto end;
×
3507
      }
3508

3509
      sendInfo->msgInfo = (SDataBuf){.pData = msg, .len = msgSize, .handle = NULL};
3✔
3510
      sendInfo->requestId = req.reqId;
3✔
3511
      sendInfo->requestObjRefId = 0;
3✔
3512
      sendInfo->param = pParam;
3✔
3513
      sendInfo->paramFreeFp = taosAutoMemoryFree;
3✔
3514
      sendInfo->fp = tmqGetWalInfoCb;
3✔
3515
      sendInfo->msgType = TDMT_VND_TMQ_VG_WALINFO;
3✔
3516

3517
      // int64_t transporterId = 0;
3518
      char offsetFormatBuf[TSDB_OFFSET_LEN] = {0};
3✔
3519
      tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pClientVg->offsetInfo.beginOffset);
3✔
3520

3521
      tqDebugC("consumer:0x%" PRIx64 " %s retrieve wal info vgId:%d, epoch %d, req:%s, QID:0x%" PRIx64, tmq->consumerId,
3!
3522
              pTopic->topicName, pClientVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId);
3523
      code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pClientVg->epSet, NULL, sendInfo);
3✔
3524
      if (code != 0) {
3!
3525
        goto end;
×
3526
      }
3527
    }
3528

3529
    if (tsem2_wait(&pCommon->rsp) != 0){
2!
3530
      tqErrorC("consumer:0x%" PRIx64 " failed to wait sem in get assignment", tmq->consumerId);
×
3531
    }
3532
    code = pCommon->code;
2✔
3533

3534
    if (code != TSDB_CODE_SUCCESS) {
2!
3535
      goto end;
×
3536
    }
3537
    int32_t num = taosArrayGetSize(pCommon->pList);
2✔
3538
    for (int32_t i = 0; i < num; ++i) {
5✔
3539
      (*assignment)[i] = *(tmq_topic_assignment*)taosArrayGet(pCommon->pList, i);
3✔
3540
    }
3541
    *numOfAssignment = num;
2✔
3542

3543
    for (int32_t j = 0; j < (*numOfAssignment); ++j) {
5✔
3544
      tmq_topic_assignment* p = &(*assignment)[j];
3✔
3545

3546
      for (int32_t i = 0; i < taosArrayGetSize(pTopic->vgs); ++i) {
8✔
3547
        SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i);
5✔
3548
        if (pClientVg == NULL) {
5!
3549
          continue;
×
3550
        }
3551
        if (pClientVg->vgId != p->vgId) {
5✔
3552
          continue;
2✔
3553
        }
3554

3555
        SVgOffsetInfo* pOffsetInfo = &pClientVg->offsetInfo;
3✔
3556
        tqDebugC("consumer:0x%" PRIx64 " %s vgId:%d offset is update to:%" PRId64, tmq->consumerId, pTopic->topicName,
3!
3557
                p->vgId, p->currentOffset);
3558

3559
        pOffsetInfo->walVerBegin = p->begin;
3✔
3560
        pOffsetInfo->walVerEnd = p->end;
3✔
3561
      }
3562
    }
3563
  }
3564

3565
  end:
8✔
3566
  if (code != TSDB_CODE_SUCCESS) {
11✔
3567
    taosMemoryFree(*assignment);
3!
3568
    *assignment = NULL;
3✔
3569
    *numOfAssignment = 0;
3✔
3570
  }
3571
  destroyCommonInfo(pCommon);
11✔
3572
  taosWUnLockLatch(&tmq->lock);
11✔
3573
  return code;
11✔
3574
}
3575

3576
void tmq_free_assignment(tmq_topic_assignment* pAssignment) {
31✔
3577
  if (pAssignment == NULL) {
31✔
3578
    return;
24✔
3579
  }
3580

3581
  taosMemoryFree(pAssignment);
7!
3582
}
3583

3584
static int32_t tmqSeekCb(void* param, SDataBuf* pMsg, int32_t code) {
4✔
3585
  if (pMsg) {
4!
3586
    taosMemoryFree(pMsg->pData);
4!
3587
    taosMemoryFree(pMsg->pEpSet);
4!
3588
  }
3589
  if (param == NULL) {
4!
3590
    return code;
×
3591
  }
3592
  SMqSeekParam* pParam = param;
4✔
3593
  pParam->code = code;
4✔
3594
  if (tsem2_post(&pParam->sem) != 0){
4!
3595
    tqErrorC("failed to post sem in tmqSeekCb");
×
3596
  }
3597
  return 0;
4✔
3598
}
3599

3600
// seek interface have to send msg to server to cancel push handle if needed, because consumer may be in wait status if
3601
// there is no data to poll
3602
int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_t offset) {
21✔
3603
  if (tmq == NULL || pTopicName == NULL) {
21!
3604
    tqErrorC("invalid tmq handle, null");
×
3605
    return TSDB_CODE_INVALID_PARA;
×
3606
  }
3607

3608
  int32_t accId = tmq->pTscObj->acctId;
21✔
3609
  char    tname[TSDB_TOPIC_FNAME_LEN] = {0};
21✔
3610
  (void)snprintf(tname, TSDB_TOPIC_FNAME_LEN, "%d.%s", accId, pTopicName);
21✔
3611

3612
  taosWLockLatch(&tmq->lock);
21✔
3613

3614
  SMqClientVg* pVg = NULL;
21✔
3615
  int32_t      code = getClientVg(tmq, tname, vgId, &pVg);
21✔
3616
  if (code != 0) {
21✔
3617
    taosWUnLockLatch(&tmq->lock);
16✔
3618
    return code;
16✔
3619
  }
3620

3621
  SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo;
5✔
3622

3623
  int32_t type = pOffsetInfo->endOffset.type;
5✔
3624
  if (isInSnapshotMode(type, tmq->useSnapshot)) {
5!
3625
    tqErrorC("consumer:0x%" PRIx64 " offset type:%d not wal version, seek not allowed", tmq->consumerId, type);
×
3626
    taosWUnLockLatch(&tmq->lock);
×
3627
    return TSDB_CODE_TMQ_SNAPSHOT_ERROR;
×
3628
  }
3629

3630
  code = checkWalRange(pOffsetInfo, offset);
5✔
3631
  if (code != 0) {
5✔
3632
    taosWUnLockLatch(&tmq->lock);
1✔
3633
    return code;
1✔
3634
  }
3635

3636
  tqDebugC("consumer:0x%" PRIx64 " seek to %" PRId64 " on vgId:%d", tmq->consumerId, offset, vgId);
4!
3637
  // update the offset, and then commit to vnode
3638
  pOffsetInfo->endOffset.type = TMQ_OFFSET__LOG;
4✔
3639
  pOffsetInfo->endOffset.version = offset;
4✔
3640
  pOffsetInfo->beginOffset = pOffsetInfo->endOffset;
4✔
3641
  pVg->seekUpdated = true;
4✔
3642
  SEpSet epSet = pVg->epSet;
4✔
3643
  taosWUnLockLatch(&tmq->lock);
4✔
3644

3645
  SMqSeekReq req = {0};
4✔
3646
  (void)snprintf(req.subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s:%s", tmq->groupId, tname);
4✔
3647
  req.head.vgId = vgId;
4✔
3648
  req.consumerId = tmq->consumerId;
4✔
3649

3650
  int32_t msgSize = tSerializeSMqSeekReq(NULL, 0, &req);
4✔
3651
  if (msgSize < 0) {
4!
3652
    return TSDB_CODE_PAR_INTERNAL_ERROR;
×
3653
  }
3654

3655
  char* msg = taosMemoryCalloc(1, msgSize);
4!
3656
  if (NULL == msg) {
4!
3657
    return terrno;
×
3658
  }
3659

3660
  if (tSerializeSMqSeekReq(msg, msgSize, &req) < 0) {
4!
3661
    taosMemoryFree(msg);
×
3662
    return TSDB_CODE_PAR_INTERNAL_ERROR;
×
3663
  }
3664

3665
  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
4!
3666
  if (sendInfo == NULL) {
4!
3667
    taosMemoryFree(msg);
×
3668
    return terrno;
×
3669
  }
3670

3671
  SMqSeekParam* pParam = taosMemoryMalloc(sizeof(SMqSeekParam));
4!
3672
  if (pParam == NULL) {
4!
3673
    taosMemoryFree(msg);
×
3674
    taosMemoryFree(sendInfo);
×
3675
    return terrno;
×
3676
  }
3677
  if (tsem2_init(&pParam->sem, 0, 0) != 0) {
4!
3678
    taosMemoryFree(msg);
×
3679
    taosMemoryFree(sendInfo);
×
3680
    taosMemoryFree(pParam);
×
3681
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
3682
  }
3683

3684
  sendInfo->msgInfo = (SDataBuf){.pData = msg, .len = msgSize, .handle = NULL};
4✔
3685
  sendInfo->requestId = generateRequestId();
4✔
3686
  sendInfo->requestObjRefId = 0;
4✔
3687
  sendInfo->param = pParam;
4✔
3688
  sendInfo->fp = tmqSeekCb;
4✔
3689
  sendInfo->msgType = TDMT_VND_TMQ_SEEK;
4✔
3690

3691
  code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo);
4✔
3692
  if (code != 0) {
4!
3693
    if(tsem2_destroy(&pParam->sem) != 0) {
×
3694
      tqErrorC("consumer:0x%" PRIx64 "destroy rsp sem failed in seek offset", tmq->consumerId);
×
3695
    }
3696
    taosMemoryFree(pParam);
×
3697
    return code;
×
3698
  }
3699

3700
  if (tsem2_wait(&pParam->sem) != 0){
4!
3701
    tqErrorC("consumer:0x%" PRIx64 "wait rsp sem failed in seek offset", tmq->consumerId);
×
3702
  }
3703
  code = pParam->code;
4✔
3704
  if(tsem2_destroy(&pParam->sem) != 0) {
4!
3705
    tqErrorC("consumer:0x%" PRIx64 "destroy rsp sem failed in seek offset", tmq->consumerId);
×
3706
  }
3707
  taosMemoryFree(pParam);
4!
3708

3709
  tqDebugC("consumer:0x%" PRIx64 "send seek to vgId:%d, return code:%s", tmq->consumerId, vgId, tstrerror(code));
4!
3710

3711
  return code;
4✔
3712
}
3713

3714
TAOS* tmq_get_connect(tmq_t* tmq) {
28✔
3715
  if (tmq && tmq->pTscObj) {
28!
3716
    return (TAOS*)(&(tmq->pTscObj->id));
28✔
3717
  }
3718
  return NULL;
×
3719
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc