• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3535

23 Nov 2024 02:07AM UTC coverage: 60.85% (+0.03%) from 60.825%
#3535

push

travis-ci

web-flow
Merge pull request #28893 from taosdata/doc/internal

refact: rename taos lib name

120252 of 252737 branches covered (47.58%)

Branch coverage included in aggregate %.

201187 of 275508 relevant lines covered (73.02%)

15886166.19 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.34
/source/client/src/clientTmq.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "cJSON.h"
17
#include "clientInt.h"
18
#include "clientLog.h"
19
#include "parser.h"
20
#include "tdatablock.h"
21
#include "tdef.h"
22
#include "tglobal.h"
23
#include "tqueue.h"
24
#include "tref.h"
25
#include "ttimer.h"
26

27
// TMQ client log helpers. Each one prints through taosPrintLog() when the
// matching level bit is set in either cDebugFlag or tqClientDebugFlag.
#define tqErrorC(...) \
  do { \
    if ((cDebugFlag & DEBUG_ERROR) || (tqClientDebugFlag & DEBUG_ERROR)) { \
      taosPrintLog("TQ  ERROR ", DEBUG_ERROR, tqClientDebugFlag | cDebugFlag, __VA_ARGS__); \
    } \
  } while (0)
#define tqInfoC(...) \
  do { \
    if ((cDebugFlag & DEBUG_INFO) || (tqClientDebugFlag & DEBUG_INFO)) { \
      taosPrintLog("TQ  ", DEBUG_INFO, tqClientDebugFlag | cDebugFlag, __VA_ARGS__); \
    } \
  } while (0)
#define tqDebugC(...) \
  do { \
    if ((cDebugFlag & DEBUG_DEBUG) || (tqClientDebugFlag & DEBUG_DEBUG)) { \
      taosPrintLog("TQ  ", DEBUG_DEBUG, tqClientDebugFlag | cDebugFlag, __VA_ARGS__); \
    } \
  } while (0)
30

31
// Tunables of the TMQ client.
#define EMPTY_BLOCK_POLL_IDLE_DURATION 10    // NOTE(review): unit not visible in this chunk
#define DEFAULT_AUTO_COMMIT_INTERVAL 5000    // default for "auto.commit.interval.ms"
#define DEFAULT_HEARTBEAT_INTERVAL 3000      // default for "heartbeat.interval.ms"
#define DEFAULT_ASKEP_INTERVAL 1000          // NOTE(review): presumably ms between ask-ep tasks - confirm
#define DEFAULT_COMMIT_CNT 1                 // initial waitingRspNum of a commit-all param set
#define SUBSCRIBE_RETRY_MAX_COUNT 240
#define SUBSCRIBE_RETRY_INTERVAL 500
38

39

40
// Copies MSG into the caller's error buffer. Expects `errstr` (char*) and
// `errstrLen` (buffer size) to be in scope at the expansion site; does nothing
// when no usable buffer was supplied.
// MSG is written through "%s" so that a '%' inside the message is never parsed
// as a conversion specifier. The expansion is a do/while block so it cannot
// capture a following `else`; the trailing ';' is kept on purpose so that
// call sites written without their own semicolon still compile.
#define SET_ERROR_MSG_TMQ(MSG) \
  do { \
    if (errstr != NULL && errstrLen > 0) (void)snprintf(errstr, errstrLen, "%s", (MSG)); \
  } while (0);
42

43
// Decodes the payload that follows the SMqRspHead in pMsg->pData into DATA
// using FUNC, then copies the raw head over the start of DATA.
// Expects `pMsg`, `code` and an `END` label at the expansion site, and declares
// a local named `decoder` there. On decode failure it sets
// code = TSDB_CODE_OUT_OF_MEMORY and jumps to END.
#define PROCESS_POLL_RSP(FUNC, DATA) \
  SDecoder decoder = {0}; \
  tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead)); \
  if (FUNC(&decoder, DATA) < 0) { \
    tDecoderClear(&decoder); \
    code = TSDB_CODE_OUT_OF_MEMORY; \
    goto END; \
  } \
  tDecoderClear(&decoder); \
  (void)memcpy(DATA, pMsg->pData, sizeof(SMqRspHead));
53

54
// Releases a poll response: frees the wrapper's pEpset and then calls
// FUNC(DATA). Expects `rspWrapper` at the expansion site and declares a local
// named `pRsp` there.
#define DELETE_POLL_RSP(FUNC, DATA) \
  SMqPollRspWrapper* pRsp = &rspWrapper->pollRsp; \
  taosMemoryFreeClear(pRsp->pEpset); \
  FUNC(DATA);
58

59
// Values of the TMQ_VG_STATUS__* family (idle / wait).
enum {
  TMQ_VG_STATUS__IDLE = 0,
  TMQ_VG_STATUS__WAIT,  // = 1
};
63

64
// Values of the TMQ_CONSUMER_STATUS__* family (init / ready / closed).
enum {
  TMQ_CONSUMER_STATUS__INIT = 0,
  TMQ_CONSUMER_STATUS__READY,   // = 1
  TMQ_CONSUMER_STATUS__CLOSED,  // = 2
};
69

70
// Task types written to tmq_t::delayedTask by generateTimedTask().
enum {
  TMQ_DELAYED_TASK__ASK_EP = 1,
  TMQ_DELAYED_TASK__COMMIT,  // = 2
};
74

75
typedef struct {
76
  tmr_h   timer;
77
  int32_t rsetId;
78
} SMqMgmt;
79

80
struct tmq_list_t {
81
  SArray container;
82
};
83

84
struct tmq_conf_t {
85
  char           clientId[TSDB_CLIENT_ID_LEN];
86
  char           groupId[TSDB_CGROUP_LEN];
87
  int8_t         autoCommit;
88
  int8_t         resetOffset;
89
  int8_t         withTbName;
90
  int8_t         snapEnable;
91
  int8_t         replayEnable;
92
  int8_t         sourceExcluded;  // do not consume, bit
93
  uint16_t       port;
94
  int32_t        autoCommitInterval;
95
  int32_t        sessionTimeoutMs;
96
  int32_t        heartBeatIntervalMs;
97
  int32_t        maxPollIntervalMs;
98
  char*          ip;
99
  char*          user;
100
  char*          pass;
101
  tmq_commit_cb* commitCb;
102
  void*          commitCbUserParam;
103
  int8_t         enableBatchMeta;
104
};
105

106
struct tmq_t {
107
  int64_t        refId;
108
  char           groupId[TSDB_CGROUP_LEN];
109
  char           clientId[TSDB_CLIENT_ID_LEN];
110
  char           user[TSDB_USER_LEN];
111
  char           fqdn[TSDB_FQDN_LEN];
112
  int8_t         withTbName;
113
  int8_t         useSnapshot;
114
  int8_t         autoCommit;
115
  int32_t        autoCommitInterval;
116
  int32_t        sessionTimeoutMs;
117
  int32_t        heartBeatIntervalMs;
118
  int32_t        maxPollIntervalMs;
119
  int8_t         resetOffsetCfg;
120
  int8_t         replayEnable;
121
  int8_t         sourceExcluded;  // do not consume, bit
122
  int64_t        consumerId;
123
  tmq_commit_cb* commitCb;
124
  void*          commitCbUserParam;
125
  int8_t         enableBatchMeta;
126

127
  // status
128
  SRWLatch lock;
129
  int8_t   status;
130
  int32_t  epoch;
131
  // poll info
132
  int64_t pollCnt;
133
  int64_t totalRows;
134
  int8_t  pollFlag;
135

136
  // timer
137
  tmr_h       hbLiveTimer;
138
  tmr_h       epTimer;
139
  tmr_h       commitTimer;
140
  STscObj*    pTscObj;       // connection
141
  SArray*     clientTopics;  // SArray<SMqClientTopic>
142
  STaosQueue* mqueue;        // queue of rsp
143
  STaosQall*  qall;
144
  STaosQueue* delayedTask;  // delayed task queue for heartbeat and auto commit
145
  tsem2_t     rspSem;
146
};
147

148
typedef struct {
149
  int32_t code;
150
  tsem2_t sem;
151
} SAskEpInfo;
152

153
typedef struct {
154
  STqOffsetVal committedOffset;
155
  STqOffsetVal endOffset;    // the last version in TAOS_RES + 1
156
  STqOffsetVal beginOffset;  // the first version in TAOS_RES
157
  int64_t      walVerBegin;
158
  int64_t      walVerEnd;
159
} SVgOffsetInfo;
160

161
typedef struct {
162
  int64_t       pollCnt;
163
  int64_t       numOfRows;
164
  SVgOffsetInfo offsetInfo;
165
  int32_t       vgId;
166
  int32_t       vgStatus;
167
  int32_t       vgSkipCnt;            // here used to mark the slow vgroups
168
  int64_t       emptyBlockReceiveTs;  // once empty block is received, idle for ignoreCnt then start to poll data
169
  int64_t       blockReceiveTs;       // once empty block is received, idle for ignoreCnt then start to poll data
170
  int64_t       blockSleepForReplay;  // once empty block is received, idle for ignoreCnt then start to poll data
171
  bool          seekUpdated;          // offset is updated by seek operator, therefore, not update by vnode rsp.
172
  SEpSet        epSet;
173
} SMqClientVg;
174

175
typedef struct {
176
  char           topicName[TSDB_TOPIC_FNAME_LEN];
177
  char           db[TSDB_DB_FNAME_LEN];
178
  SArray*        vgs;  // SArray<SMqClientVg>
179
  SSchemaWrapper schema;
180
  int8_t         noPrivilege;
181
} SMqClientTopic;
182

183
typedef struct {
184
  int32_t         vgId;
185
  char            topicName[TSDB_TOPIC_FNAME_LEN];
186
  SMqClientTopic* topicHandle;
187
  uint64_t        reqId;
188
  SEpSet*         pEpset;
189
  union {
190
    struct{
191
      SMqRspHead   head;
192
      STqOffsetVal rspOffset;
193
    };
194
    SMqDataRsp      dataRsp;
195
    SMqMetaRsp      metaRsp;
196
    SMqBatchMetaRsp batchMetaRsp;
197
  };
198
} SMqPollRspWrapper;
199

200
typedef struct {
201
  int32_t code;
202
  int8_t  tmqRspType;
203
  int32_t epoch;
204
  union{
205
    SMqPollRspWrapper pollRsp;
206
    SMqAskEpRsp       epRsp;
207
  };
208
} SMqRspWrapper;
209

210
typedef struct {
211
  tsem2_t rspSem;
212
  int32_t rspErr;
213
} SMqSubscribeCbParam;
214

215
typedef struct {
216
  int64_t refId;
217
  bool    sync;
218
  void*   pParam;
219
} SMqAskEpCbParam;
220

221
typedef struct {
222
  int64_t  refId;
223
  char     topicName[TSDB_TOPIC_FNAME_LEN];
224
  int32_t  vgId;
225
  uint64_t requestId;  // request id for debug purpose
226
} SMqPollCbParam;
227

228
typedef struct {
229
  tsem2_t       rsp;
230
  int32_t       numOfRsp;
231
  SArray*       pList;
232
  TdThreadMutex mutex;
233
  int64_t       consumerId;
234
  char*         pTopicName;
235
  int32_t       code;
236
} SMqVgCommon;
237

238
typedef struct {
239
  tsem2_t sem;
240
  int32_t code;
241
} SMqSeekParam;
242

243
typedef struct {
244
  tsem2_t     sem;
245
  int32_t     code;
246
  SMqVgOffset vgOffset;
247
} SMqCommittedParam;
248

249
typedef struct {
250
  int32_t      vgId;
251
  int32_t      epoch;
252
  int32_t      totalReq;
253
  SMqVgCommon* pCommon;
254
} SMqVgWalInfoParam;
255

256
typedef struct {
257
  int64_t        refId;
258
  int32_t        epoch;
259
  int32_t        waitingRspNum;
260
  int32_t        code;
261
  tmq_commit_cb* callbackFn;
262
  void*          userParam;
263
} SMqCommitCbParamSet;
264

265
typedef struct {
266
  SMqCommitCbParamSet* params;
267
  char                 topicName[TSDB_TOPIC_FNAME_LEN];
268
  int32_t              vgId;
269
  int64_t              consumerId;
270
} SMqCommitCbParam;
271

272
typedef struct {
273
  tsem2_t sem;
274
  int32_t code;
275
} SSyncCommitInfo;
276

277
typedef struct {
278
  STqOffsetVal currentOffset;
279
  STqOffsetVal commitOffset;
280
  STqOffsetVal seekOffset;
281
  int64_t      numOfRows;
282
  int32_t      vgStatus;
283
} SVgroupSaveInfo;
284

285
static   TdThreadOnce   tmqInit = PTHREAD_ONCE_INIT;  // initialize only once
286
volatile int32_t        tmqInitRes = 0;               // initialize rsp code
287
static   SMqMgmt        tmqMgmt = {0};
288

289
tmq_conf_t* tmq_conf_new() {
443✔
290
  tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
443✔
291
  if (conf == NULL) {
446!
292
    return conf;
×
293
  }
294

295
  conf->withTbName = false;
446✔
296
  conf->autoCommit = true;
446✔
297
  conf->autoCommitInterval = DEFAULT_AUTO_COMMIT_INTERVAL;
446✔
298
  conf->resetOffset = TMQ_OFFSET__RESET_LATEST;
446✔
299
  conf->enableBatchMeta = false;
446✔
300
  conf->heartBeatIntervalMs = DEFAULT_HEARTBEAT_INTERVAL;
446✔
301
  conf->maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL;
446✔
302
  conf->sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT;
446✔
303

304
  return conf;
446✔
305
}
306

307
/**
 * Frees a configuration together with the ip/user/pass strings it owns.
 * A NULL `conf` is ignored.
 */
void tmq_conf_destroy(tmq_conf_t* conf) {
  if (conf == NULL) {
    return;
  }

  char* ownedStrs[] = {conf->ip, conf->user, conf->pass};
  for (int32_t i = 0; i < (int32_t)(sizeof(ownedStrs) / sizeof(ownedStrs[0])); ++i) {
    if (ownedStrs[i] != NULL) {
      taosMemoryFree(ownedStrs[i]);
    }
  }
  taosMemoryFree(conf);
}
446✔
321

322
tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
3,075✔
323
  int32_t code = 0;
3,075✔
324
  if (conf == NULL || key == NULL || value == NULL) {
3,075!
325
    tqErrorC("tmq_conf_set null, conf:%p key:%p value:%p", conf, key, value);
×
326
    return TMQ_CONF_INVALID;
×
327
  }
328
  if (strcasecmp(key, "group.id") == 0) {
3,081✔
329
    tstrncpy(conf->groupId, value, TSDB_CGROUP_LEN);
446✔
330
    return TMQ_CONF_OK;
446✔
331
  }
332

333
  if (strcasecmp(key, "client.id") == 0) {
2,635✔
334
    tstrncpy(conf->clientId, value, TSDB_CLIENT_ID_LEN);
57✔
335
    return TMQ_CONF_OK;
57✔
336
  }
337

338
  if (strcasecmp(key, "enable.auto.commit") == 0) {
2,578✔
339
    if (strcasecmp(value, "true") == 0) {
433✔
340
      conf->autoCommit = true;
216✔
341
      return TMQ_CONF_OK;
216✔
342
    } else if (strcasecmp(value, "false") == 0) {
217!
343
      conf->autoCommit = false;
217✔
344
      return TMQ_CONF_OK;
217✔
345
    } else {
346
      tqErrorC("invalid value for enable.auto.commit: %s", value);
×
347
      return TMQ_CONF_INVALID;
×
348
    }
349
  }
350

351
  if (strcasecmp(key, "auto.commit.interval.ms") == 0) {
2,145✔
352
    int64_t tmp;
353
    code = taosStr2int64(value, &tmp);
251✔
354
    if (tmp < 0 || code != 0) {
250!
355
      tqErrorC("invalid value for auto.commit.interval.ms: %s", value);
×
356
      return TMQ_CONF_INVALID;
×
357
    }
358
    conf->autoCommitInterval = (tmp > INT32_MAX ? INT32_MAX : tmp);
251✔
359
    return TMQ_CONF_OK;
251✔
360
  }
361

362
  if (strcasecmp(key, "session.timeout.ms") == 0) {
1,894✔
363
    int64_t tmp;
364
    code = taosStr2int64(value, &tmp);
2✔
365
    if (tmp < 6000 || tmp > 1800000 || code != 0) {
2!
366
      tqErrorC("invalid value for session.timeout.ms: %s", value);
×
367
      return TMQ_CONF_INVALID;
×
368
    }
369
    conf->sessionTimeoutMs = tmp;
2✔
370
    return TMQ_CONF_OK;
2✔
371
  }
372

373
  if (strcasecmp(key, "heartbeat.interval.ms") == 0) {
1,892!
374
    int64_t tmp;
375
    code = taosStr2int64(value, &tmp);
×
376
    if (tmp < 1000 || tmp >= conf->sessionTimeoutMs || code != 0) {
×
377
      tqErrorC("invalid value for heartbeat.interval.ms: %s", value);
×
378
      return TMQ_CONF_INVALID;
×
379
    }
380
    conf->heartBeatIntervalMs = tmp;
×
381
    return TMQ_CONF_OK;
×
382
  }
383

384
  if (strcasecmp(key, "max.poll.interval.ms") == 0) {
1,892✔
385
    int32_t tmp;
386
    code = taosStr2int32(value, &tmp);
2✔
387
    if (tmp < 1000 || code != 0) {
2!
388
      tqErrorC("invalid value for max.poll.interval.ms: %s", value);
×
389
      return TMQ_CONF_INVALID;
×
390
    }
391
    conf->maxPollIntervalMs = tmp;
2✔
392
    return TMQ_CONF_OK;
2✔
393
  }
394

395
  if (strcasecmp(key, "auto.offset.reset") == 0) {
1,890✔
396
    if (strcasecmp(value, "none") == 0) {
414✔
397
      conf->resetOffset = TMQ_OFFSET__RESET_NONE;
11✔
398
      return TMQ_CONF_OK;
11✔
399
    } else if (strcasecmp(value, "earliest") == 0) {
403✔
400
      conf->resetOffset = TMQ_OFFSET__RESET_EARLIEST;
381✔
401
      return TMQ_CONF_OK;
381✔
402
    } else if (strcasecmp(value, "latest") == 0) {
22!
403
      conf->resetOffset = TMQ_OFFSET__RESET_LATEST;
22✔
404
      return TMQ_CONF_OK;
22✔
405
    } else {
406
      tqErrorC("invalid value for auto.offset.reset: %s", value);
×
407
      return TMQ_CONF_INVALID;
×
408
    }
409
  }
410

411
  if (strcasecmp(key, "msg.with.table.name") == 0) {
1,476✔
412
    if (strcasecmp(value, "true") == 0) {
392✔
413
      conf->withTbName = true;
373✔
414
      return TMQ_CONF_OK;
373✔
415
    } else if (strcasecmp(value, "false") == 0) {
19!
416
      conf->withTbName = false;
19✔
417
      return TMQ_CONF_OK;
19✔
418
    } else {
419
      tqErrorC("invalid value for msg.with.table.name: %s", value);
×
420
      return TMQ_CONF_INVALID;
×
421
    }
422
  }
423

424
  if (strcasecmp(key, "experimental.snapshot.enable") == 0) {
1,084✔
425
    if (strcasecmp(value, "true") == 0) {
146✔
426
      conf->snapEnable = true;
113✔
427
      return TMQ_CONF_OK;
113✔
428
    } else if (strcasecmp(value, "false") == 0) {
33!
429
      conf->snapEnable = false;
33✔
430
      return TMQ_CONF_OK;
33✔
431
    } else {
432
      tqErrorC("invalid value for experimental.snapshot.enable: %s", value);
×
433
      return TMQ_CONF_INVALID;
×
434
    }
435
  }
436

437
  if (strcasecmp(key, "td.connect.ip") == 0) {
938✔
438
    void *tmp = taosStrdup(value);
2✔
439
    if (tmp == NULL) {
2!
440
      tqErrorC("tmq_conf_set out of memory:%d", terrno);
×
441
      return TMQ_CONF_INVALID;
×
442
    }
443
    conf->ip = tmp;
2✔
444
    return TMQ_CONF_OK;
2✔
445
  }
446

447
  if (strcasecmp(key, "td.connect.user") == 0) {
936✔
448
    void *tmp = taosStrdup(value);
443✔
449
    if (tmp == NULL) {
443!
450
      tqErrorC("tmq_conf_set out of memory:%d", terrno);
×
451
      return TMQ_CONF_INVALID;
×
452
    }
453
    conf->user = tmp;
443✔
454
    return TMQ_CONF_OK;
443✔
455
  }
456

457
  if (strcasecmp(key, "td.connect.pass") == 0) {
493✔
458
    void *tmp = taosStrdup(value);
443✔
459
    if (tmp == NULL) {
444✔
460
      tqErrorC("tmq_conf_set out of memory:%d", terrno);
1!
461
      return TMQ_CONF_INVALID;
×
462
    }
463
    conf->pass = tmp;
443✔
464
    return TMQ_CONF_OK;
443✔
465
  }
466

467
  if (strcasecmp(key, "td.connect.port") == 0) {
50!
468
    int64_t tmp;
469
    code = taosStr2int64(value, &tmp);
×
470
    if (tmp <= 0 || tmp > 65535 || code != 0) {
×
471
      tqErrorC("invalid value for td.connect.port: %s", value);
×
472
      return TMQ_CONF_INVALID;
×
473
    }
474

475
    conf->port = tmp;
×
476
    return TMQ_CONF_OK;
×
477
  }
478

479
  if (strcasecmp(key, "enable.replay") == 0) {
50✔
480
    if (strcasecmp(value, "true") == 0) {
7!
481
      conf->replayEnable = true;
7✔
482
      return TMQ_CONF_OK;
7✔
483
    } else if (strcasecmp(value, "false") == 0) {
×
484
      conf->replayEnable = false;
×
485
      return TMQ_CONF_OK;
×
486
    } else {
487
      tqErrorC("invalid value for enable.replay: %s", value);
×
488
      return TMQ_CONF_INVALID;
×
489
    }
490
  }
491
  if (strcasecmp(key, "msg.consume.excluded") == 0) {
43✔
492
    int64_t tmp;
493
    code = taosStr2int64(value, &tmp);
40✔
494
    conf->sourceExcluded = (0 == code && tmp != 0) ? TD_REQ_FROM_TAOX : 0;
40!
495
    return TMQ_CONF_OK;
40✔
496
  }
497

498
  if (strcasecmp(key, "td.connect.db") == 0) {
3!
499
    return TMQ_CONF_OK;
×
500
  }
501

502
  if (strcasecmp(key, "msg.enable.batchmeta") == 0) {
3!
503
    int64_t tmp;
504
    code = taosStr2int64(value, &tmp);
4✔
505
    conf->enableBatchMeta = (0 == code && tmp != 0) ? true : false;
4!
506
    return TMQ_CONF_OK;
4✔
507
  }
508

509
  tqErrorC("unknown key: %s", key);
×
510
  return TMQ_CONF_UNKNOWN;
×
511
}
512

513
tmq_list_t* tmq_list_new() {
1,314✔
514
  return (tmq_list_t*)taosArrayInit(0, sizeof(void*));
1,314✔
515
}
516

517
/**
 * Appends a copy of `src` to the list.
 *
 * @return 0 on success, TSDB_CODE_INVALID_PARA when `list` is NULL or `src` is
 *         NULL/empty, or terrno when duplicating or pushing the string fails.
 */
int32_t tmq_list_append(tmq_list_t* list, const char* src) {
  if (list == NULL || src == NULL || src[0] == 0) {
    return TSDB_CODE_INVALID_PARA;
  }

  char* nameCopy = taosStrdup(src);
  if (nameCopy == NULL) {
    return terrno;
  }

  // the array stores the pointer itself and owns the copy once the push succeeds
  if (taosArrayPush(&list->container, &nameCopy) != NULL) {
    return 0;
  }

  taosMemoryFree(nameCopy);
  return terrno;
}
533

534
/**
 * Destroys the list, passing taosMemoryFree as the per-element destructor.
 * A NULL `list` is ignored.
 */
void tmq_list_destroy(tmq_list_t* list) {
  if (list != NULL) {
    taosArrayDestroyP(&list->container, taosMemoryFree);
  }
}
539

540
int32_t tmq_list_get_size(const tmq_list_t* list) {
7✔
541
  if (list == NULL) {
7!
542
    return TSDB_CODE_INVALID_PARA;
×
543
  }
544
  const SArray* container = &list->container;
7✔
545
  return taosArrayGetSize(container);
7✔
546
}
547

548
char** tmq_list_to_c_array(const tmq_list_t* list) {
7✔
549
  if (list == NULL) {
7!
550
    return NULL;
×
551
  }
552
  const SArray* container = &list->container;
7✔
553
  return container->pData;
7✔
554
}
555

556
/**
 * Finishes a commit: invokes the user callback (if any) and frees the param set.
 *
 * The consumer is re-acquired by ref id; the callback is still invoked when
 * that fails, and then receives a NULL tmq.
 *
 * @return TSDB_CODE_TMQ_CONSUMER_CLOSED when the consumer could not be
 *         acquired, otherwise the result of releasing the reference.
 */
static int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) {
  const int64_t refId = pParamSet->refId;
  tmq_t*        pTmq = taosAcquireRef(tmqMgmt.rsetId, refId);

  // if no more waiting rsp
  if (pParamSet->callbackFn != NULL) {
    pParamSet->callbackFn(pTmq, pParamSet->code, pParamSet->userParam);
  }
  taosMemoryFree(pParamSet);

  if (pTmq == NULL) {
    return TSDB_CODE_TMQ_CONSUMER_CLOSED;
  }
  return taosReleaseRef(tmqMgmt.rsetId, refId);
}
576

577
/**
 * Atomically decrements the outstanding-response counter of a commit and
 * completes the commit through tmqCommitDone() when it reaches zero.
 * consumerId, pTopic and vgId are used for logging only.
 *
 * @return 0 while responses are still outstanding, otherwise the result of
 *         tmqCommitDone().
 */
static int32_t commitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId) {
  int32_t remain = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
  if (remain != 0) {
    tqDebugC("consumer:0x%" PRIx64 " topic:%s vgId:%d commit-rsp received, remain:%d", consumerId, pTopic, vgId,
             remain);
    return 0;
  }

  tqDebugC("consumer:0x%" PRIx64 " topic:%s vgId:%d all commit-rsp received, commit completed", consumerId, pTopic,
           vgId);
  return tmqCommitDone(pParamSet);
}
589

590
/**
 * Response callback of a commit-offset request. Frees the response buffers
 * and counts the response down on the owning param set. The incoming `code`
 * is not inspected.
 *
 * @return TSDB_CODE_INVALID_PARA when `param` is NULL, otherwise the result of
 *         commitRspCountDown().
 */
static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
  if (pBuf != NULL) {
    taosMemoryFreeClear(pBuf->pData);
    taosMemoryFreeClear(pBuf->pEpSet);
  }
  if (param == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  SMqCommitCbParam* pCbParam = (SMqCommitCbParam*)param;
  return commitRspCountDown((SMqCommitCbParamSet*)pCbParam->params, pCbParam->consumerId, pCbParam->topicName,
                            pCbParam->vgId);
}
603

604
static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffsetVal* offset, const char* pTopicName,
5,673✔
605
                               SMqCommitCbParamSet* pParamSet) {
606
  SMqVgOffset pOffset = {0};
5,673✔
607

608
  pOffset.consumerId = tmq->consumerId;
5,673✔
609
  pOffset.offset.val = *offset;
5,673✔
610
  (void)snprintf(pOffset.offset.subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", tmq->groupId, TMQ_SEPARATOR, pTopicName);
5,673✔
611
  int32_t len = 0;
5,673✔
612
  int32_t code = 0;
5,673✔
613
  tEncodeSize(tEncodeMqVgOffset, &pOffset, len, code);
5,673!
614
  if (code < 0) {
5,673!
615
    return TSDB_CODE_INVALID_PARA;
×
616
  }
617

618
  void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
5,673✔
619
  if (buf == NULL) {
5,673!
620
    return terrno;
×
621
  }
622

623
  ((SMsgHead*)buf)->vgId = htonl(vgId);
5,673✔
624

625
  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
5,673✔
626

627
  SEncoder encoder = {0};
5,673✔
628
  tEncoderInit(&encoder, abuf, len);
5,673✔
629
  if (tEncodeMqVgOffset(&encoder, &pOffset) < 0) {
5,673!
630
    tEncoderClear(&encoder);
×
631
    taosMemoryFree(buf);
×
632
    return TSDB_CODE_INVALID_PARA;
×
633
  }
634
  tEncoderClear(&encoder);
5,671✔
635

636
  // build param
637
  SMqCommitCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam));
5,672✔
638
  if (pParam == NULL) {
5,672!
639
    taosMemoryFree(buf);
×
640
    return terrno;
×
641
  }
642

643
  pParam->params = pParamSet;
5,672✔
644
  pParam->vgId = vgId;
5,672✔
645
  pParam->consumerId = tmq->consumerId;
5,672✔
646

647
  tstrncpy(pParam->topicName, pTopicName, tListLen(pParam->topicName));
5,672✔
648

649
  // build send info
650
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
5,672✔
651
  if (pMsgSendInfo == NULL) {
5,673!
652
    taosMemoryFree(buf);
×
653
    taosMemoryFree(pParam);
×
654
    return terrno;
×
655
  }
656

657
  pMsgSendInfo->msgInfo = (SDataBuf){.pData = buf, .len = sizeof(SMsgHead) + len, .handle = NULL};
5,673✔
658

659
  pMsgSendInfo->requestId = generateRequestId();
5,673✔
660
  pMsgSendInfo->requestObjRefId = 0;
5,673✔
661
  pMsgSendInfo->param = pParam;
5,673✔
662
  pMsgSendInfo->paramFreeFp = taosMemoryFree;
5,673✔
663
  pMsgSendInfo->fp = tmqCommitCb;
5,673✔
664
  pMsgSendInfo->msgType = TDMT_VND_TMQ_COMMIT_OFFSET;
5,673✔
665

666
  // int64_t transporterId = 0;
667
  (void)atomic_add_fetch_32(&pParamSet->waitingRspNum, 1);
5,673✔
668
  code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, NULL, pMsgSendInfo);
5,672✔
669
  if (code != 0) {
5,673!
670
    (void)atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
×
671
  }
672
  return code;
5,673✔
673
}
674

675
/**
 * Looks up a subscribed topic by exact name.
 *
 * @param topic set to the matching entry on success; untouched otherwise
 * @return 0 when found, TSDB_CODE_TMQ_INVALID_TOPIC otherwise.
 */
static int32_t getTopicByName(tmq_t* tmq, const char* pTopicName, SMqClientTopic** topic) {
  int32_t total = taosArrayGetSize(tmq->clientTopics);
  for (int32_t idx = 0; idx < total; ++idx) {
    SMqClientTopic* pCandidate = taosArrayGet(tmq->clientTopics, idx);
    if (pCandidate != NULL && strcmp(pCandidate->topicName, pTopicName) == 0) {
      *topic = pCandidate;
      return 0;
    }
  }

  tqErrorC("consumer:0x%" PRIx64 ", total:%d, failed to find topic:%s", tmq->consumerId, total, pTopicName);
  return TSDB_CODE_TMQ_INVALID_TOPIC;
}
689

690
static int32_t prepareCommitCbParamSet(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam, int32_t rspNum,
12,787✔
691
                                       SMqCommitCbParamSet** ppParamSet) {
692
  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
12,787✔
693
  if (pParamSet == NULL) {
12,787!
694
    return terrno;
×
695
  }
696

697
  pParamSet->refId = tmq->refId;
12,787✔
698
  pParamSet->epoch = tmq->epoch;
12,787✔
699
  pParamSet->callbackFn = pCommitFp;
12,787✔
700
  pParamSet->userParam = userParam;
12,787✔
701
  pParamSet->waitingRspNum = rspNum;
12,787✔
702
  *ppParamSet = pParamSet;
12,787✔
703
  return 0;
12,787✔
704
}
705

706
/**
 * Finds the client-side vgroup entry for (topic, vgId).
 *
 * @param pVg out-parameter; always reset to NULL on entry, so the result no
 *            longer depends on the caller having initialised it, and set to
 *            the matching entry when one is found
 * @return TSDB_CODE_SUCCESS when found, the error from getTopicByName() when
 *         the topic is unknown, or TSDB_CODE_TMQ_INVALID_VGID when the topic
 *         has no such vgroup.
 */
static int32_t getClientVg(tmq_t* tmq, char* pTopicName, int32_t vgId, SMqClientVg** pVg) {
  *pVg = NULL;

  SMqClientTopic* pTopic = NULL;
  int32_t         code = getTopicByName(tmq, pTopicName, &pTopic);
  if (code != 0) {
    tqErrorC("consumer:0x%" PRIx64 " invalid topic name:%s", tmq->consumerId, pTopicName);
    return code;
  }

  int32_t numOfVgs = taosArrayGetSize(pTopic->vgs);
  for (int32_t i = 0; i < numOfVgs; ++i) {
    SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i);
    if (pClientVg && pClientVg->vgId == vgId) {
      *pVg = pClientVg;
      return TSDB_CODE_SUCCESS;
    }
  }

  return TSDB_CODE_TMQ_INVALID_VGID;
}
725

726
/**
 * Commits `offsetVal` for one vgroup unless it is unset or already committed.
 *
 * On success the vgroup's committedOffset is updated as soon as the request
 * has been sent, without waiting for the server response.
 *
 * @return TSDB_CODE_TMQ_INVALID_MSG when offsetVal->type <= 0,
 *         TSDB_CODE_TMQ_SAME_COMMITTED_VALUE when the offset equals the one
 *         already committed, otherwise the result of doSendCommitMsg().
 */
static int32_t innerCommit(tmq_t* tmq, char* pTopicName, STqOffsetVal* offsetVal, SMqClientVg* pVg, SMqCommitCbParamSet* pParamSet){
  if (offsetVal->type <= 0) {
    return TSDB_CODE_TMQ_INVALID_MSG;
  }
  if (tOffsetEqual(offsetVal, &pVg->offsetInfo.committedOffset)) {
    return TSDB_CODE_TMQ_SAME_COMMITTED_VALUE;
  }

  // printable forms of both offsets, used by the log lines below
  char offsetBuf[TSDB_OFFSET_LEN] = {0};
  tFormatOffset(offsetBuf, tListLen(offsetBuf), offsetVal);

  char commitBuf[TSDB_OFFSET_LEN] = {0};
  tFormatOffset(commitBuf, tListLen(commitBuf), &pVg->offsetInfo.committedOffset);

  int32_t code = doSendCommitMsg(tmq, pVg->vgId, &pVg->epSet, offsetVal, pTopicName, pParamSet);
  if (code != TSDB_CODE_SUCCESS) {
    // report the code that is returned; terrno is not set on every failure path of doSendCommitMsg
    tqErrorC("consumer:0x%" PRIx64 " topic:%s on vgId:%d send commit msg failed, send offset:%s committed:%s, code:%s",
             tmq->consumerId, pTopicName, pVg->vgId, offsetBuf, commitBuf, tstrerror(code));
    return code;
  }

  tqDebugC("consumer:0x%" PRIx64 " topic:%s on vgId:%d send commit msg success, send offset:%s committed:%s",
          tmq->consumerId, pTopicName, pVg->vgId, offsetBuf, commitBuf);
  tOffsetCopy(&pVg->offsetInfo.committedOffset, offsetVal);
  return code;
}
754

755
static int32_t asyncCommitOffset(tmq_t* tmq, char* pTopicName, int32_t vgId, STqOffsetVal* offsetVal,
43✔
756
                                 tmq_commit_cb* pCommitFp, void* userParam) {
757
  tqInfoC("consumer:0x%" PRIx64 " do manual commit offset for %s, vgId:%d", tmq->consumerId, pTopicName, vgId);
43!
758
  SMqCommitCbParamSet* pParamSet = NULL;
43✔
759
  int32_t code = prepareCommitCbParamSet(tmq, pCommitFp, userParam, 0, &pParamSet);
43✔
760
  if (code != 0){
43!
761
    return code;
×
762
  }
763

764
  taosRLockLatch(&tmq->lock);
43✔
765
  SMqClientVg* pVg = NULL;
43✔
766
  code = getClientVg(tmq, pTopicName, vgId, &pVg);
43✔
767
  if (code == 0) {
43!
768
    code = innerCommit(tmq, pTopicName, offsetVal, pVg, pParamSet);
43✔
769
  }
770
  taosRUnLockLatch(&tmq->lock);
43✔
771

772
  if (code != 0){
43✔
773
    taosMemoryFree(pParamSet);
12✔
774
  }
775
  return code;
43✔
776
}
777

778
/**
 * Commits the offset carried by a TMQ result object.
 *
 * The user callback is invoked directly from here only when the request could
 * not be issued; in that case TSDB_CODE_TMQ_SAME_COMMITTED_VALUE is reported
 * to the callback as success.
 */
static void asyncCommitFromResult(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* pCommitFp, void* userParam) {
  int32_t code = 0;

  if (pRes == NULL || tmq == NULL) {
    code = TSDB_CODE_INVALID_PARA;
  } else if (!(TD_RES_TMQ(pRes) || TD_RES_TMQ_META(pRes) ||
               TD_RES_TMQ_METADATA(pRes) || TD_RES_TMQ_BATCH_META(pRes))) {
    code = TSDB_CODE_TMQ_INVALID_MSG;
  } else {
    SMqRspObj*   pRspObj = (SMqRspObj*)pRes;
    char*        pTopicName = pRspObj->topic;
    int32_t      vgId = pRspObj->vgId;
    STqOffsetVal offsetVal = pRspObj->rspOffset;  // commit from a local copy of the offset
    code = asyncCommitOffset(tmq, pTopicName, vgId, &offsetVal, pCommitFp, userParam);
  }

  if (code == TSDB_CODE_SUCCESS || pCommitFp == NULL) {
    return;
  }
  if (code == TSDB_CODE_TMQ_SAME_COMMITTED_VALUE) {
    code = TSDB_CODE_SUCCESS;
  }
  pCommitFp(tmq, code, userParam);
}
29✔
808

809
/**
 * Commits the endOffset of every vgroup of every subscribed topic, holding the
 * consumer's read latch for the whole walk.
 *
 * A failure of an individual vgroup is logged and the walk continues; a
 * missing topic or vgroup entry aborts it.
 *
 * @return the code of the last innerCommit() call, or the code that aborted
 *         the walk; 0 when there was nothing to iterate.
 */
static int32_t innerCommitAll(tmq_t* tmq, SMqCommitCbParamSet* pParamSet){
  int32_t code = 0;

  taosRLockLatch(&tmq->lock);
  int32_t topicCnt = taosArrayGetSize(tmq->clientTopics);
  tqDebugC("consumer:0x%" PRIx64 " start to commit offset for %d topics", tmq->consumerId, topicCnt);

  for (int32_t t = 0; t < topicCnt; t++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, t);
    if (pTopic == NULL) {
      code = TSDB_CODE_TMQ_INVALID_TOPIC;
      goto UNLOCK;
    }

    int32_t vgCnt = taosArrayGetSize(pTopic->vgs);
    tqDebugC("consumer:0x%" PRIx64 " commit offset for topics:%s, numOfVgs:%d", tmq->consumerId, pTopic->topicName, vgCnt);

    for (int32_t v = 0; v < vgCnt; v++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, v);
      if (pVg == NULL) {
        code = terrno;
        goto UNLOCK;
      }

      code = innerCommit(tmq, pTopic->topicName, &pVg->offsetInfo.endOffset, pVg, pParamSet);
      if (code == 0 || code == TSDB_CODE_TMQ_SAME_COMMITTED_VALUE) {
        continue;
      }
      tqErrorC("consumer:0x%" PRIx64 " topic:%s vgId:%d, no commit, code:%s, current offset version:%" PRId64 ", ordinal:%d/%d",
               tmq->consumerId, pTopic->topicName, pVg->vgId, tstrerror(code), pVg->offsetInfo.endOffset.version, v + 1, vgCnt);
    }
  }

  // the counter still includes the DEFAULT_COMMIT_CNT guard added by the caller
  tqDebugC("consumer:0x%" PRIx64 " total commit:%d for %d topics", tmq->consumerId, pParamSet->waitingRspNum - DEFAULT_COMMIT_CNT,
           topicCnt);

UNLOCK:
  taosRUnLockLatch(&tmq->lock);
  return code;
}
843

844
/**
 * Commits the current offsets of all topics and vgroups.
 *
 * The param set starts with waitingRspNum == DEFAULT_COMMIT_CNT. That extra
 * count is released by the final commitRspCountDown() call, so the commit
 * cannot complete while requests are still being issued. When the param set
 * cannot be allocated, the callback (if any) is invoked with the error.
 */
static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam) {
  SMqCommitCbParamSet* pSet = NULL;

  // init waitingRspNum as DEFAULT_COMMIT_CNT to prevent concurrency issue
  int32_t code = prepareCommitCbParamSet(tmq, pCommitFp, userParam, DEFAULT_COMMIT_CNT, &pSet);
  if (code != 0) {
    tqErrorC("consumer:0x%" PRIx64 " prepareCommitCbParamSet failed, code:%s", tmq->consumerId, tstrerror(code));
    if (pCommitFp != NULL) {
      pCommitFp(tmq, code, userParam);
    }
    return;
  }

  code = innerCommitAll(tmq, pSet);
  if (!(code == 0 || code == TSDB_CODE_TMQ_SAME_COMMITTED_VALUE)) {
    tqErrorC("consumer:0x%" PRIx64 " innerCommitAll failed, code:%s", tmq->consumerId, tstrerror(code));
  }

  // drop the guard count; completes the commit at once if no request is in flight
  code = commitRspCountDown(pSet, tmq->consumerId, "init", -1);
  if (code != 0) {
    tqErrorC("consumer:0x%" PRIx64 " commit rsp count down failed, code:%s", tmq->consumerId, tstrerror(code));
  }
}
867

868
static void generateTimedTask(int64_t refId, int32_t type) {
17,072✔
869
  tmq_t*  tmq = NULL;
17,072✔
870
  int8_t* pTaskType = NULL;
17,072✔
871
  int32_t code = 0;
17,072✔
872

873
  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
17,072✔
874
  if (tmq == NULL) return;
17,072!
875

876
  code = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0, (void**)&pTaskType);
17,072✔
877
  if (code == TSDB_CODE_SUCCESS) {
17,072!
878
    *pTaskType = type;
17,072✔
879
    if (taosWriteQitem(tmq->delayedTask, pTaskType) == 0) {
17,072!
880
      if (tsem2_post(&tmq->rspSem) != 0){
17,072!
881
        tqErrorC("consumer:0x%" PRIx64 " failed to post sem, type:%d", tmq->consumerId, type);
×
882
      }
883
    }else{
884
      taosFreeQitem(pTaskType);
×
885
    }
886
  }
887

888
  code = taosReleaseRef(tmqMgmt.rsetId, refId);
17,072✔
889
  if (code != 0){
17,072!
890
    tqErrorC("failed to release ref:%"PRId64 ", type:%d, code:%d", refId, type, code);
×
891
  }
892
}
893

894
void tmqAssignAskEpTask(void* param, void* tmrId) {
4,876✔
895
  int64_t refId = (int64_t)param;
4,876✔
896
  generateTimedTask(refId, TMQ_DELAYED_TASK__ASK_EP);
4,876✔
897
}
4,876✔
898

899
void tmqReplayTask(void* param, void* tmrId) {
12✔
900
  int64_t refId = (int64_t)param;
12✔
901
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
12✔
902
  if (tmq == NULL) return;
12!
903

904
  if (tsem2_post(&tmq->rspSem) != 0){
12!
905
    tqErrorC("consumer:0x%" PRIx64 " failed to post sem, replay", tmq->consumerId);
×
906
  }
907
  int32_t code = taosReleaseRef(tmqMgmt.rsetId, refId);
12✔
908
  if (code != 0){
12!
909
    tqErrorC("failed to release ref:%"PRId64 ", code:%d", refId, code);
×
910
  }
911
}
912

913
void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
12,196✔
914
  int64_t refId = (int64_t)param;
12,196✔
915
  generateTimedTask(refId, TMQ_DELAYED_TASK__COMMIT);
12,196✔
916
}
12,196✔
917

918
/*
 * Response callback for the TMQ heartbeat request.
 *
 * param  consumer ref id smuggled through a void*
 * pMsg   response buffer; pData and pEpSet are freed here on every path
 *        except the pMsg == NULL early return
 * code   transport/server result for the request
 *
 * On a successful, decodable response: marks every local topic that the
 * server reports as "no privilege", then copies the server-provided debug
 * flag into tqClientDebugFlag.
 *
 * Returns TSDB_CODE_INVALID_PARA when pMsg is NULL; otherwise the incoming
 * code, the deserialization result, or (when the consumer was acquired) the
 * result of releasing the consumer reference.
 */
int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
  if (pMsg == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  if (param != NULL && code == 0) {
    SMqHbRsp hbRsp = {0};
    code = tDeserializeSMqHbRsp(pMsg->pData, pMsg->len, &hbRsp);
    if (code == 0) {
      int64_t refId = (int64_t)param;
      tmq_t*  pConsumer = taosAcquireRef(tmqMgmt.rsetId, refId);
      if (pConsumer != NULL) {
        taosWLockLatch(&pConsumer->lock);
        for (int32_t p = 0; p < taosArrayGetSize(hbRsp.topicPrivileges); p++) {
          STopicPrivilege* pPriv = taosArrayGet(hbRsp.topicPrivileges, p);
          if (pPriv == NULL || pPriv->noPrivilege != 1) {
            continue;
          }

          // flag every subscribed topic whose name matches the revoked one
          int32_t numOfTopics = taosArrayGetSize(pConsumer->clientTopics);
          for (int32_t t = 0; t < numOfTopics; t++) {
            SMqClientTopic* pLocal = taosArrayGet(pConsumer->clientTopics, t);
            if (pLocal == NULL || strcmp(pLocal->topicName, pPriv->topic) != 0) {
              continue;
            }
            tqInfoC("consumer:0x%" PRIx64 ", has no privilege, topic:%s", pConsumer->consumerId, pPriv->topic);
            pLocal->noPrivilege = 1;
          }
        }
        taosWUnLockLatch(&pConsumer->lock);

        code = taosReleaseRef(tmqMgmt.rsetId, refId);
        if (code != 0) {
          tqErrorC("failed to release ref:%"PRId64 ", code:%d", refId, code);
        }
      }

      tqClientDebugFlag = hbRsp.debugFlag;

      tDestroySMqHbRsp(&hbRsp);
    }
  }

  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);
  return code;
}
966

967
void tmqSendHbReq(void* param, void* tmrId) {
2,691✔
968
  int64_t refId = (int64_t)param;
2,691✔
969

970
  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
2,691✔
971
  if (tmq == NULL) {
2,691!
972
    return;
×
973
  }
974

975
  SMqHbReq req = {0};
2,691✔
976
  req.consumerId = tmq->consumerId;
2,691✔
977
  req.epoch = tmq->epoch;
2,691✔
978
  req.pollFlag = atomic_load_8(&tmq->pollFlag);
2,691✔
979
  tqDebugC("consumer:0x%" PRIx64 " send heartbeat, pollFlag:%d", tmq->consumerId, req.pollFlag);
2,691!
980
  req.topics = taosArrayInit(taosArrayGetSize(tmq->clientTopics), sizeof(TopicOffsetRows));
2,691✔
981
  if (req.topics == NULL) {
2,691!
982
    goto END;
×
983
  }
984
  taosRLockLatch(&tmq->lock);
2,691✔
985
  for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
5,167✔
986
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
2,476✔
987
    if (pTopic == NULL) {
2,476!
988
      continue;
×
989
    }
990
    int32_t          numOfVgroups = taosArrayGetSize(pTopic->vgs);
2,476✔
991
    TopicOffsetRows* data = taosArrayReserve(req.topics, 1);
2,476✔
992
    if (data == NULL) {
2,476!
993
      continue;
×
994
    }
995
    tstrncpy(data->topicName, pTopic->topicName, TSDB_TOPIC_FNAME_LEN);
2,476✔
996
    data->offsetRows = taosArrayInit(numOfVgroups, sizeof(OffsetRows));
2,476✔
997
    if (data->offsetRows == NULL) {
2,476!
998
      continue;
×
999
    }
1000
    for (int j = 0; j < numOfVgroups; j++) {
8,633✔
1001
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
6,157✔
1002
      if (pVg == NULL) {
6,157!
1003
        continue;
×
1004
      }
1005
      OffsetRows* offRows = taosArrayReserve(data->offsetRows, 1);
6,157✔
1006
      if (offRows == NULL) {
6,157!
1007
        continue;
×
1008
      }
1009
      offRows->vgId = pVg->vgId;
6,157✔
1010
      offRows->rows = pVg->numOfRows;
6,157✔
1011
      offRows->offset = pVg->offsetInfo.endOffset;
6,157✔
1012
      offRows->ever = pVg->offsetInfo.walVerEnd == -1 ? 0 : pVg->offsetInfo.walVerEnd;
6,157✔
1013
      char buf[TSDB_OFFSET_LEN] = {0};
6,157✔
1014
      tFormatOffset(buf, TSDB_OFFSET_LEN, &offRows->offset);
6,157✔
1015
      tqDebugC("consumer:0x%" PRIx64 ",report offset, group:%s vgId:%d, offset:%s/%" PRId64 ", rows:%" PRId64,
6,157!
1016
               tmq->consumerId, tmq->groupId, offRows->vgId, buf, offRows->ever, offRows->rows);
1017
    }
1018
  }
1019
  taosRUnLockLatch(&tmq->lock);
2,691✔
1020

1021
  int32_t tlen = tSerializeSMqHbReq(NULL, 0, &req);
2,691✔
1022
  if (tlen < 0) {
2,691!
1023
    tqErrorC("tSerializeSMqHbReq failed, size:%d", tlen);
×
1024
    goto END;
×
1025
  }
1026

1027
  void* pReq = taosMemoryCalloc(1, tlen);
2,691✔
1028
  if (pReq == NULL) {
2,691!
1029
    tqErrorC("failed to malloc MqHbReq msg, code:%d", terrno);
×
1030
    goto END;
×
1031
  }
1032

1033
  if (tSerializeSMqHbReq(pReq, tlen, &req) < 0) {
2,691!
1034
    tqErrorC("tSerializeSMqHbReq %d failed", tlen);
×
1035
    taosMemoryFree(pReq);
×
1036
    goto END;
×
1037
  }
1038

1039
  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
2,691✔
1040
  if (sendInfo == NULL) {
2,691!
1041
    taosMemoryFree(pReq);
×
1042
    goto END;
×
1043
  }
1044

1045
  sendInfo->msgInfo = (SDataBuf){.pData = pReq, .len = tlen, .handle = NULL};
2,691✔
1046

1047
  sendInfo->requestId = generateRequestId();
2,691✔
1048
  sendInfo->requestObjRefId = 0;
2,691✔
1049
  sendInfo->param = (void*)refId;
2,691✔
1050
  sendInfo->fp = tmqHbCb;
2,691✔
1051
  sendInfo->msgType = TDMT_MND_TMQ_HB;
2,691✔
1052

1053
  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
2,691✔
1054

1055
  int32_t code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo);
2,691✔
1056
  if (code != 0) {
2,691!
1057
    tqErrorC("tmqSendHbReq asyncSendMsgToServer failed");
×
1058
  }
1059
  (void)atomic_val_compare_exchange_8(&tmq->pollFlag, 1, 0);
2,691✔
1060

1061
END:
2,691✔
1062
  tDestroySMqHbReq(&req);
2,691✔
1063
  if (tmrId != NULL) {
2,691✔
1064
    bool ret = taosTmrReset(tmqSendHbReq, tmq->heartBeatIntervalMs, param, tmqMgmt.timer, &tmq->hbLiveTimer);
1,857✔
1065
    tqDebugC("consumer:0x%" PRIx64 " reset timer for tmq heartbeat:%d, pollFlag:%d", tmq->consumerId, ret, tmq->pollFlag);
1,857!
1066
  }
1067
  int32_t ret = taosReleaseRef(tmqMgmt.rsetId, refId);
2,691✔
1068
  if (ret != 0){
2,691!
1069
    tqErrorC("failed to release ref:%"PRId64 ", code:%d", refId, ret);
×
1070
  }
1071
}
1072

1073
/*
 * Commit callback that only logs: emits an error line when the commit
 * finished with a non-zero code and does nothing otherwise.
 * param is unused.
 */
static void defaultCommitCbFn(tmq_t* pTmq, int32_t code, void* param) {
  if (code == 0) {
    return;
  }
  tqErrorC("consumer:0x%" PRIx64 ", failed to commit offset, code:%s", pTmq->consumerId, tstrerror(code));
}
1,195✔
1078

1079
/*
 * Release the payload held inside a response wrapper, picking the matching
 * destructor by tmqRspType. The wrapper itself is not freed here.
 * Unknown response types are ignored.
 */
static void tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) {
  switch (rspWrapper->tmqRspType) {
    case TMQ_MSG_TYPE__EP_RSP: {
      tDeleteSMqAskEpRsp(&rspWrapper->epRsp);
      break;
    }
    case TMQ_MSG_TYPE__POLL_DATA_RSP: {
      DELETE_POLL_RSP(tDeleteMqDataRsp, &pRsp->dataRsp)
      break;
    }
    case TMQ_MSG_TYPE__POLL_DATA_META_RSP: {
      DELETE_POLL_RSP(tDeleteSTaosxRsp, &pRsp->dataRsp)
      break;
    }
    case TMQ_MSG_TYPE__POLL_META_RSP: {
      DELETE_POLL_RSP(tDeleteMqMetaRsp,&pRsp->metaRsp)
      break;
    }
    case TMQ_MSG_TYPE__POLL_BATCH_META_RSP: {
      DELETE_POLL_RSP(tDeleteMqBatchMetaRsp,&pRsp->batchMetaRsp)
      break;
    }
    default:
      break;
  }
}
55,866✔
1092

1093
static void freeClientVg(void* param) {
4,464✔
1094
  SMqClientVg* pVg = param;
4,464✔
1095
  tOffsetDestroy(&pVg->offsetInfo.endOffset);
4,464✔
1096
  tOffsetDestroy(&pVg->offsetInfo.beginOffset);
4,464✔
1097
  tOffsetDestroy(&pVg->offsetInfo.committedOffset);
4,464✔
1098
}
4,464✔
1099
static void freeClientTopic(void* param) {
3,475✔
1100
  SMqClientTopic* pTopic = param;
3,475✔
1101
  taosMemoryFreeClear(pTopic->schema.pSchema);
3,475✔
1102
  taosArrayDestroyEx(pTopic->vgs, freeClientVg);
3,475✔
1103
}
3,475✔
1104

1105
static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopicEp, SHashObj* pVgOffsetHashMap,
3,475✔
1106
                                   tmq_t* tmq) {
1107
  pTopic->schema = pTopicEp->schema;
3,475✔
1108
  pTopicEp->schema.nCols = 0;
3,475✔
1109
  pTopicEp->schema.pSchema = NULL;
3,475✔
1110

1111
  char    vgKey[TSDB_TOPIC_FNAME_LEN + 22] = {0};
3,475✔
1112
  int32_t vgNumGet = taosArrayGetSize(pTopicEp->vgs);
3,475✔
1113

1114
  tstrncpy(pTopic->topicName, pTopicEp->topic, TSDB_TOPIC_FNAME_LEN);
3,475✔
1115
  tstrncpy(pTopic->db, pTopicEp->db, TSDB_DB_FNAME_LEN);
3,475✔
1116

1117
  tqInfoC("consumer:0x%" PRIx64 ", update topic:%s, new numOfVgs:%d", tmq->consumerId, pTopic->topicName, vgNumGet);
3,475!
1118
  pTopic->vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg));
3,475✔
1119
  if (pTopic->vgs == NULL) {
3,475!
1120
    tqErrorC("consumer:0x%" PRIx64 ", failed to init vgs for topic:%s", tmq->consumerId, pTopic->topicName);
×
1121
    return;
×
1122
  }
1123
  for (int32_t j = 0; j < vgNumGet; j++) {
7,939✔
1124
    SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);
4,464✔
1125
    if (pVgEp == NULL) {
4,464!
1126
      continue;
×
1127
    }
1128
    (void)snprintf(vgKey, sizeof(vgKey), "%s:%d", pTopic->topicName, pVgEp->vgId);
4,464✔
1129
    SVgroupSaveInfo* pInfo = taosHashGet(pVgOffsetHashMap, vgKey, strlen(vgKey));
4,464✔
1130

1131
    STqOffsetVal offsetNew = {0};
4,464✔
1132
    offsetNew.type = tmq->resetOffsetCfg;
4,464✔
1133

1134
    tqInfoC("consumer:0x%" PRIx64 ", update topic:%s, new numOfVgs:%d, num:%d, port:%d", tmq->consumerId,
4,464!
1135
            pTopic->topicName, vgNumGet, pVgEp->epSet.numOfEps, pVgEp->epSet.eps[pVgEp->epSet.inUse].port);
1136

1137
    SMqClientVg clientVg = {
13,392✔
1138
        .pollCnt = 0,
1139
        .vgId = pVgEp->vgId,
4,464✔
1140
        .epSet = pVgEp->epSet,
1141
        .vgStatus = pInfo ? pInfo->vgStatus : TMQ_VG_STATUS__IDLE,
4,464✔
1142
        .vgSkipCnt = 0,
1143
        .emptyBlockReceiveTs = 0,
1144
        .blockReceiveTs = 0,
1145
        .blockSleepForReplay = 0,
1146
        .numOfRows = pInfo ? pInfo->numOfRows : 0,
4,464✔
1147
    };
1148

1149
    clientVg.offsetInfo.walVerBegin = -1;
4,464✔
1150
    clientVg.offsetInfo.walVerEnd = -1;
4,464✔
1151
    clientVg.seekUpdated = false;
4,464✔
1152
    if (pInfo) {
4,464✔
1153
      tOffsetCopy(&clientVg.offsetInfo.endOffset, &pInfo->currentOffset);
3,023✔
1154
      tOffsetCopy(&clientVg.offsetInfo.committedOffset, &pInfo->commitOffset);
3,023✔
1155
      tOffsetCopy(&clientVg.offsetInfo.beginOffset, &pInfo->seekOffset);
3,023✔
1156
    } else {
1157
      clientVg.offsetInfo.endOffset = offsetNew;
1,441✔
1158
      clientVg.offsetInfo.committedOffset = offsetNew;
1,441✔
1159
      clientVg.offsetInfo.beginOffset = offsetNew;
1,441✔
1160
    }
1161
    if (taosArrayPush(pTopic->vgs, &clientVg) == NULL) {
8,928!
1162
      tqErrorC("consumer:0x%" PRIx64 ", failed to push vg:%d into topic:%s", tmq->consumerId, pVgEp->vgId,
×
1163
               pTopic->topicName);
1164
      freeClientVg(&clientVg);
×
1165
    }
1166
  }
1167
}
1168

1169
/*
 * Fill newTopics with one SMqClientTopic per topic in pRsp, carrying over the
 * per-vgroup state (offsets, row count, status) of the consumer's current
 * topics where a vgroup with the same "<topicName>:<vgId>" key still exists.
 *
 * If the temporary hash map cannot be created the function logs and returns
 * with newTopics untouched.
 */
static void buildNewTopicList(tmq_t* tmq, SArray* newTopics, const SMqAskEpRsp* pRsp){
  SHashObj* pSavedVgs = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
  if (pSavedVgs == NULL) {
    tqErrorC("consumer:0x%" PRIx64 " taos hash init null, code:%d", tmq->consumerId, terrno);
    return;
  }

  // pass 1: snapshot the state of every vgroup the consumer currently knows
  int32_t numOfCurTopics = taosArrayGetSize(tmq->clientTopics);
  for (int32_t t = 0; t < numOfCurTopics; t++) {
    SMqClientTopic* pOldTopic = taosArrayGet(tmq->clientTopics, t);
    if (pOldTopic == NULL || pOldTopic->vgs == NULL) {
      continue;
    }

    int32_t numOfCurVgs = taosArrayGetSize(pOldTopic->vgs);
    tqInfoC("consumer:0x%" PRIx64 ", current vg num: %d", tmq->consumerId, numOfCurVgs);
    for (int32_t v = 0; v < numOfCurVgs; v++) {
      SMqClientVg* pOldVg = taosArrayGet(pOldTopic->vgs, v);
      if (pOldVg == NULL) {
        continue;
      }

      char key[TSDB_TOPIC_FNAME_LEN + 22] = {0};
      (void)snprintf(key, sizeof(key), "%s:%d", pOldTopic->topicName, pOldVg->vgId);

      char offsetStr[TSDB_OFFSET_LEN] = {0};
      tFormatOffset(offsetStr, TSDB_OFFSET_LEN, &pOldVg->offsetInfo.endOffset);
      tqInfoC("consumer:0x%" PRIx64 ", vgId:%d vgKey:%s, offset:%s", tmq->consumerId, pOldVg->vgId, key, offsetStr);

      SVgroupSaveInfo saved = {.currentOffset = pOldVg->offsetInfo.endOffset,
                               .seekOffset = pOldVg->offsetInfo.beginOffset,
                               .commitOffset = pOldVg->offsetInfo.committedOffset,
                               .numOfRows = pOldVg->numOfRows,
                               .vgStatus = pOldVg->vgStatus};
      int32_t putRc = taosHashPut(pSavedVgs, key, strlen(key), &saved, sizeof(SVgroupSaveInfo));
      if (putRc != 0) {
        tqErrorC("consumer:0x%" PRIx64 ", failed to put vg:%d into hashmap", tmq->consumerId, pOldVg->vgId);
      }
    }
  }

  // pass 2: build the new topics, restoring saved state where keys match
  for (int32_t r = 0; r < taosArrayGetSize(pRsp->topics); r++) {
    SMqClientTopic built = {0};
    SMqSubTopicEp* pEp = taosArrayGet(pRsp->topics, r);
    if (pEp == NULL) {
      continue;
    }

    initClientTopicFromRsp(&built, pEp, pSavedVgs, tmq);
    if (taosArrayPush(newTopics, &built) != NULL) {
      continue;
    }
    tqErrorC("consumer:0x%" PRIx64 ", failed to push topic:%s into new topics", tmq->consumerId, built.topicName);
    freeClientTopic(&built);
  }

  taosHashCleanup(pSavedVgs);
}
1222

1223
/*
 * Replace the consumer's topic list with the one described by pRsp and move
 * the consumer to the given epoch.
 *
 * The update is skipped when the response epoch is older than the
 * consumer's, or equal to it with no topics. An equal epoch with topics is
 * still applied (vnode transform case). On success the consumer status is
 * set to READY and tmq->epoch is stored.
 */
static void doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
  int32_t numOfIncoming = taosArrayGetSize(pRsp->topics);

  // vnode transform: epoch == tmq->epoch && numOfIncoming != 0  -> apply
  // plain ask ep rsp: epoch == tmq->epoch && numOfIncoming == 0 -> skip
  if (epoch < tmq->epoch || (epoch == tmq->epoch && numOfIncoming == 0)) {
    tqDebugC("consumer:0x%" PRIx64 " no update ep epoch from %d to epoch %d, incoming topics:%d", tmq->consumerId,
             tmq->epoch, epoch, numOfIncoming);
    return;
  }

  SArray* pFresh = taosArrayInit(numOfIncoming, sizeof(SMqClientTopic));
  if (pFresh == NULL) {
    tqErrorC("consumer:0x%" PRIx64 " taos array init null, code:%d", tmq->consumerId, terrno);
    return;
  }
  tqInfoC("consumer:0x%" PRIx64 " update ep epoch from %d to epoch %d, incoming topics:%d, existed topics:%d",
          tmq->consumerId, tmq->epoch, epoch, numOfIncoming, (int)taosArrayGetSize(tmq->clientTopics));

  taosWLockLatch(&tmq->lock);
  if (numOfIncoming > 0) {
    buildNewTopicList(tmq, pFresh, pRsp);
  }
  // drop the previously buffered topics before installing the new list
  if (tmq->clientTopics != NULL) {
    taosArrayDestroyEx(tmq->clientTopics, freeClientTopic);
  }
  tmq->clientTopics = pFresh;
  taosWUnLockLatch(&tmq->lock);

  atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);
  atomic_store_32(&tmq->epoch, epoch);

  tqInfoC("consumer:0x%" PRIx64 " update topic info completed", tmq->consumerId);
}
1257

1258
/*
 * Response callback for the ask-ep request.
 *
 * param  SMqAskEpCbParam built by askEp (ref id, sync flag, waiter info)
 * pMsg   response buffer; pEpSet and pData are freed here when pMsg != NULL
 * code   transport/server result for the request
 *
 * Sync requests: the response is decoded and applied immediately through
 * doUpdateLocalEp, then the waiter's code is set and its semaphore posted.
 * Async requests: the decoded response is wrapped and written to
 * tmq->mqueue.
 *
 * Returns the incoming code, TSDB_CODE_TMQ_CONSUMER_CLOSED when the consumer
 * reference can no longer be acquired, or the failure code of the queue-item
 * allocation / queue write on the async path.
 */
static int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqAskEpCbParam* pCbParam = (SMqAskEpCbParam*)param;
  if (pCbParam == NULL) {
    goto FAIL;
  }

  tmq_t* pConsumer = taosAcquireRef(tmqMgmt.rsetId, pCbParam->refId);
  if (pConsumer == NULL) {
    code = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    goto FAIL;
  }

  if (code != TSDB_CODE_SUCCESS) {
    // "consumer not ready" is the only failure that is not logged
    if (code != TSDB_CODE_MND_CONSUMER_NOT_READY){
      tqErrorC("consumer:0x%" PRIx64 ", get topic endpoint error, code:%s", pConsumer->consumerId, tstrerror(code));
    }
    goto END;
  }

  if (pMsg == NULL) {
    goto END;
  }

  SMqRspHead* pHead = pMsg->pData;
  int32_t     localEpoch = atomic_load_32(&pConsumer->epoch);
  tqDebugC("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d", pConsumer->consumerId, pHead->epoch, localEpoch);

  if (!pCbParam->sync) {
    SMqRspWrapper* pQueued = NULL;
    code = taosAllocateQitem(sizeof(SMqRspWrapper), DEF_QITEM, 0, (void**)&pQueued);
    if (code) {
      goto END;
    }

    pQueued->tmqRspType = TMQ_MSG_TYPE__EP_RSP;
    pQueued->epoch = pHead->epoch;
    (void)memcpy(&pQueued->epRsp, pMsg->pData, sizeof(SMqRspHead));
    if (tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pQueued->epRsp) != NULL) {
      code = taosWriteQitem(pConsumer->mqueue, pQueued);
      if (code != 0) {
        tmqFreeRspWrapper((SMqRspWrapper*)pQueued);
        taosFreeQitem(pQueued);
        tqErrorC("consumer:0x%" PRIx64 " put ep res into mqueue failed, code:%d", pConsumer->consumerId, code);
      }
    } else {
      // decode failed: drop the wrapper, code is left unchanged
      tmqFreeRspWrapper((SMqRspWrapper*)pQueued);
      taosFreeQitem(pQueued);
    }
  } else {
    SMqAskEpRsp decoded = {0};
    if (tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &decoded) != NULL) {
      doUpdateLocalEp(pConsumer, pHead->epoch, &decoded);
    }
    tDeleteSMqAskEpRsp(&decoded);
  }

  END:
  {
    int32_t releaseRc = taosReleaseRef(tmqMgmt.rsetId, pCbParam->refId);
    if (releaseRc != 0){
      tqErrorC("failed to release ref:%"PRId64 ", code:%d", pCbParam->refId, releaseRc);
    }
  }

  FAIL:
  // wake the synchronous waiter, if there is one, with the final code
  if (pCbParam != NULL && pCbParam->sync) {
    SAskEpInfo* pWaiter = pCbParam->pParam;
    if (pWaiter != NULL) {
      pWaiter->code = code;
      if (tsem2_post(&pWaiter->sem) != 0){
        tqErrorC("failed to post rsp sem askep cb");
      }
    }
  }

  if (pMsg != NULL) {
    taosMemoryFree(pMsg->pEpSet);
    taosMemoryFree(pMsg->pData);
  }

  return code;
}
1338

1339
/*
 * Send an ask-ep request for this consumer to the mnode.
 *
 * pTmq         consumer issuing the request
 * param        stored in the callback parameter as pParam (askEpCb treats it
 *              as an SAskEpInfo for sync requests)
 * sync         stored in the callback parameter; selects askEpCb's sync path
 * updateEpSet  when true the request carries epoch -1 instead of the
 *              consumer's current epoch
 *
 * Returns the result of asyncSendMsgToServer, TSDB_CODE_INVALID_PARA when
 * serialization fails, or terrno when an allocation fails.
 */
static int32_t askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpSet) {
  SMqAskEpReq req = {
      .consumerId = pTmq->consumerId,
      .epoch = updateEpSet ? -1 : pTmq->epoch,
  };
  tstrncpy(req.cgroup, pTmq->groupId, TSDB_CGROUP_LEN);

  // size the buffer first, then encode into it
  int32_t msgLen = tSerializeSMqAskEpReq(NULL, 0, &req);
  if (msgLen < 0) {
    tqErrorC("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq failed", pTmq->consumerId);
    return TSDB_CODE_INVALID_PARA;
  }

  void* pBuf = taosMemoryCalloc(1, msgLen);
  if (pBuf == NULL) {
    tqErrorC("consumer:0x%" PRIx64 ", failed to malloc askEpReq msg, size:%d", pTmq->consumerId, msgLen);
    return terrno;
  }

  if (tSerializeSMqAskEpReq(pBuf, msgLen, &req) < 0) {
    tqErrorC("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq %d failed", pTmq->consumerId, msgLen);
    taosMemoryFree(pBuf);
    return TSDB_CODE_INVALID_PARA;
  }

  SMqAskEpCbParam* pCbParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam));
  if (pCbParam == NULL) {
    tqErrorC("consumer:0x%" PRIx64 ", failed to malloc subscribe param", pTmq->consumerId);
    taosMemoryFree(pBuf);
    return terrno;
  }
  pCbParam->pParam = param;
  pCbParam->sync = sync;
  pCbParam->refId = pTmq->refId;

  SMsgSendInfo* pSend = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (pSend == NULL) {
    taosMemoryFree(pBuf);
    taosMemoryFree(pCbParam);
    return terrno;
  }

  pSend->msgType = TDMT_MND_TMQ_ASK_EP;
  pSend->fp = askEpCb;
  pSend->param = pCbParam;
  pSend->paramFreeFp = taosMemoryFree;
  pSend->requestObjRefId = 0;
  pSend->requestId = generateRequestId();
  pSend->msgInfo = (SDataBuf){.pData = pBuf, .len = msgLen, .handle = NULL};

  SEpSet mgmtEpSet = getEpSet_s(&pTmq->pTscObj->pAppInfo->mgmtEp);
  tqDebugC("consumer:0x%" PRIx64 " ask ep from mnode,QID:0x%" PRIx64, pTmq->consumerId, pSend->requestId);
  return asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &mgmtEpSet, NULL, pSend);
}
1396

1397
/*
 * Drain the consumer's delayed-task queue and run every queued task.
 *
 *  - ASK_EP: sends an asynchronous ask-ep request and re-arms the ask-ep
 *    timer (DEFAULT_ASKEP_INTERVAL).
 *  - COMMIT: starts an asynchronous commit of all offsets, using the
 *    consumer's commit callback or defaultCommitCbFn, and re-arms the
 *    auto-commit timer.
 *  - anything else is logged as an invalid task type.
 *
 * Every dequeued item is freed exactly once, whatever the outcome of its
 * task. A failed ask-ep request is logged and the timer is still re-armed,
 * so the request is retried on the next tick instead of stopping for good.
 */
void tmqHandleAllDelayedTask(tmq_t* pTmq) {
  STaosQall* qall = NULL;
  int32_t    code = 0;

  code = taosAllocateQall(&qall);
  if (code) {
    tqErrorC("consumer:0x%" PRIx64 ", failed to allocate qall, code:%s", pTmq->consumerId, tstrerror(code));
    return;
  }

  int32_t numOfItems = taosReadAllQitems(pTmq->delayedTask, qall);
  if (numOfItems == 0) {
    taosFreeQall(qall);
    return;
  }

  tqDebugC("consumer:0x%" PRIx64 " handle delayed %d tasks before poll data", pTmq->consumerId, numOfItems);
  int8_t* pTaskType = NULL;
  while (taosGetQitem(qall, (void**)&pTaskType) != 0) {
    if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) {
      tqDebugC("consumer:0x%" PRIx64 " retrieve ask ep timer", pTmq->consumerId);
      code = askEp(pTmq, NULL, false, false);
      if (code != 0) {
        // no `continue` here: the item must still be freed and the timer re-armed
        tqErrorC("consumer:0x%" PRIx64 " failed to ask ep, code:%s", pTmq->consumerId, tstrerror(code));
      }
      tqDebugC("consumer:0x%" PRIx64 " retrieve ep from mnode in 1s", pTmq->consumerId);
      bool ret = taosTmrReset(tmqAssignAskEpTask, DEFAULT_ASKEP_INTERVAL, (void*)(pTmq->refId), tmqMgmt.timer,
                              &pTmq->epTimer);
      tqDebugC("reset timer for tmq ask ep:%d", ret);
    } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
      tmq_commit_cb* pCallbackFn = (pTmq->commitCb != NULL) ? pTmq->commitCb : defaultCommitCbFn;
      asyncCommitAllOffsets(pTmq, pCallbackFn, pTmq->commitCbUserParam);
      tqDebugC("consumer:0x%" PRIx64 " next commit to vnode(s) in %.2fs", pTmq->consumerId,
               pTmq->autoCommitInterval / 1000.0);
      bool ret = taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, (void*)(pTmq->refId), tmqMgmt.timer,
                              &pTmq->commitTimer);
      tqDebugC("reset timer for commit:%d", ret);
    } else {
      tqErrorC("consumer:0x%" PRIx64 " invalid task type:%d", pTmq->consumerId, *pTaskType);
    }

    taosFreeQitem(pTaskType);
  }

  taosFreeQall(qall);
}
1444

1445
/*
 * Discard every response the consumer has not processed yet: first whatever
 * is still sitting in tmq->qall, then everything pending in tmq->mqueue
 * (moved into qall and released the same way).
 */
void tmqClearUnhandleMsg(tmq_t* tmq) {
  SMqRspWrapper* pPending = NULL;

  // leftovers from a previous read
  while (taosGetQitem(tmq->qall, (void**)&pPending) != 0) {
    tmqFreeRspWrapper(pPending);
    taosFreeQitem(pPending);
  }

  pPending = NULL;
  if (taosReadAllQitems(tmq->mqueue, tmq->qall) != 0) {
    // responses that were queued but never read
    while (taosGetQitem(tmq->qall, (void**)&pPending) != 0) {
      tmqFreeRspWrapper(pPending);
      taosFreeQitem(pPending);
    }
  }
}
1461

1462
/*
 * Response callback for the subscribe request.
 *
 * param  SMqSubscribeCbParam of the waiting caller, or NULL
 * pMsg   response buffer; both pEpSet and pData are released here, matching
 *        tmqHbCb and askEpCb (the payload is not used by this callback, and
 *        freeing only pEpSet leaked it)
 * code   transport/server result for the request
 *
 * Stores code in the waiter's rspErr and posts its semaphore.
 * Returns code when param is NULL, 0 otherwise.
 */
int32_t tmqSubscribeCb(void* param, SDataBuf* pMsg, int32_t code) {
  if (pMsg) {
    taosMemoryFreeClear(pMsg->pEpSet);
    taosMemoryFreeClear(pMsg->pData);
  }

  if (param == NULL) {
    return code;
  }

  SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param;
  pParam->rspErr = code;

  if (tsem2_post(&pParam->rspSem) != 0){
    tqErrorC("failed to post sem, subscribe cb");
  }
  return 0;
}
1479

1480
/*
 * Append the names of the consumer's currently subscribed topics to *topics.
 *
 * tmq     consumer to query
 * topics  in/out list pointer; when *topics is NULL a new list is created
 *         with tmq_list_new() and stored there (NOTE(review): the caller
 *         presumably owns and destroys it - confirm against the public API)
 *
 * Internal topic names have the form "<prefix>.<name>"; only the part after
 * the first '.' is appended. Topics without a '.' or that fail to append are
 * logged and skipped.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA when tmq or topics is NULL,
 * or terrno when the list cannot be created.
 */
int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
  // topics is dereferenced below, so it must be validated together with tmq
  if (tmq == NULL || topics == NULL) return TSDB_CODE_INVALID_PARA;
  if (*topics == NULL) {
    *topics = tmq_list_new();
    if (*topics == NULL) {
      return terrno;
    }
  }
  taosRLockLatch(&tmq->lock);
  for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* topic = taosArrayGet(tmq->clientTopics, i);
    if (topic == NULL) {
      tqErrorC("topic is null");
      continue;
    }
    char* tmp = strchr(topic->topicName, '.');
    if (tmp == NULL) {
      tqErrorC("topic name is invalid:%s", topic->topicName);
      continue;
    }
    if (tmq_list_append(*topics, tmp + 1) != 0) {
      tqErrorC("failed to append topic:%s", tmp + 1);
      continue;
    }
  }
  taosRUnLockLatch(&tmq->lock);
  return 0;
}
1508

1509
void tmqFreeImpl(void* handle) {
446✔
1510
  tmq_t*  tmq = (tmq_t*)handle;
446✔
1511
  int64_t id = tmq->consumerId;
446✔
1512

1513
  if (tmq->mqueue) {
446!
1514
    tmqClearUnhandleMsg(tmq);
446✔
1515
    taosCloseQueue(tmq->mqueue);
446✔
1516
  }
1517

1518
  if (tmq->delayedTask) {
446!
1519
    taosCloseQueue(tmq->delayedTask);
446✔
1520
  }
1521

1522
  taosFreeQall(tmq->qall);
446✔
1523
  if(tsem2_destroy(&tmq->rspSem) != 0) {
446!
1524
    tqErrorC("failed to destroy sem in free tmq");
×
1525
  }
1526

1527
  taosArrayDestroyEx(tmq->clientTopics, freeClientTopic);
446✔
1528
  taos_close_internal(tmq->pTscObj);
446✔
1529

1530
  if (tmq->commitTimer) {
446✔
1531
    if (!taosTmrStopA(&tmq->commitTimer)) {
442✔
1532
      tqErrorC("failed to stop commit timer");
184!
1533
    }
1534
  }
1535
  if (tmq->epTimer) {
446✔
1536
    if (!taosTmrStopA(&tmq->epTimer)) {
442✔
1537
      tqErrorC("failed to stop ep timer");
413!
1538
    }
1539
  }
1540
  if (tmq->hbLiveTimer) {
446!
1541
    if (!taosTmrStopA(&tmq->hbLiveTimer)) {
446!
1542
      tqErrorC("failed to stop hb timer");
×
1543
    }
1544
  }
1545
  taosMemoryFree(tmq);
446✔
1546

1547
  tqInfoC("consumer:0x%" PRIx64 " closed", id);
446!
1548
}
446✔
1549

1550
static void tmqMgmtInit(void) {
302✔
1551
  tmqInitRes = 0;
302✔
1552
  tmqMgmt.timer = taosTmrInit(1000, 100, 360000, "TMQ");
302✔
1553

1554
  if (tmqMgmt.timer == NULL) {
302!
1555
    tmqInitRes = TSDB_CODE_OUT_OF_MEMORY;
×
1556
  }
1557

1558
  tmqMgmt.rsetId = taosOpenRef(10000, tmqFreeImpl);
302✔
1559
  if (tmqMgmt.rsetId < 0) {
302!
1560
    tmqInitRes = terrno;
×
1561
  }
1562
}
302✔
1563

1564
void tmqMgmtClose(void) {
4,082✔
1565
  if (tmqMgmt.timer) {
4,082✔
1566
    taosTmrCleanUp(tmqMgmt.timer);
302✔
1567
    tmqMgmt.timer = NULL;
302✔
1568
  }
1569

1570
  if (tmqMgmt.rsetId >= 0) {
4,082!
1571
    taosCloseRef(tmqMgmt.rsetId);
4,082✔
1572
    tmqMgmt.rsetId = -1;
4,082✔
1573
  }
1574
}
4,082✔
1575

1576
// Create a consumer from `conf`.
//
// conf      - consumer configuration; must not be NULL and must carry a
//             non-empty groupId.
// errstr    - optional buffer that receives a short failure description
//             (written through SET_ERROR_MSG_TMQ).
// errstrLen - size of `errstr`.
//
// Returns the new consumer, or NULL on any failure. On failure every resource
// acquired so far is released.
tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
  int32_t code = 0;

  if (conf == NULL) {
    SET_ERROR_MSG_TMQ("configure is null")
    return NULL;
  }
  // global timer / ref-set initialisation happens exactly once per process
  code = taosThreadOnce(&tmqInit, tmqMgmtInit);
  if (code != 0) {
    SET_ERROR_MSG_TMQ("tmq init error")
    return NULL;
  }
  if (tmqInitRes != 0) {
    SET_ERROR_MSG_TMQ("tmq timer init error")
    return NULL;
  }

  tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t));
  if (pTmq == NULL) {
    tqErrorC("failed to create consumer, groupId:%s", conf->groupId);
    SET_ERROR_MSG_TMQ("malloc tmq failed")
    return NULL;
  }

  const char* user = conf->user == NULL ? TSDB_DEFAULT_USER : conf->user;
  const char* pass = conf->pass == NULL ? TSDB_DEFAULT_PASS : conf->pass;

  pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
  if (pTmq->clientTopics == NULL) {
    tqErrorC("failed to create consumer, groupId:%s", conf->groupId);
    SET_ERROR_MSG_TMQ("malloc client topics failed")
    goto _failed;
  }
  code = taosOpenQueue(&pTmq->mqueue);
  if (code) {
    tqErrorC("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, tstrerror(code),
             pTmq->groupId);
    SET_ERROR_MSG_TMQ("open queue failed")
    goto _failed;
  }

  code = taosOpenQueue(&pTmq->delayedTask);
  if (code) {
    tqErrorC("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, tstrerror(code),
             pTmq->groupId);
    SET_ERROR_MSG_TMQ("open delayed task queue failed")
    goto _failed;
  }

  code = taosAllocateQall(&pTmq->qall);
  if (code) {
    tqErrorC("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, tstrerror(code),
             pTmq->groupId);
    SET_ERROR_MSG_TMQ("allocate qall failed")
    goto _failed;
  }

  if (conf->groupId[0] == 0) {
    tqErrorC("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, tstrerror(code),
             pTmq->groupId);
    SET_ERROR_MSG_TMQ("malloc tmq element failed or group is empty")
    goto _failed;
  }

  // init status
  pTmq->status = TMQ_CONSUMER_STATUS__INIT;
  pTmq->pollCnt = 0;
  pTmq->epoch = 0;
  pTmq->pollFlag = 0;

  // set conf
  tstrncpy(pTmq->clientId, conf->clientId, TSDB_CLIENT_ID_LEN);
  tstrncpy(pTmq->groupId, conf->groupId, TSDB_CGROUP_LEN);
  pTmq->withTbName = conf->withTbName;
  pTmq->useSnapshot = conf->snapEnable;
  pTmq->autoCommit = conf->autoCommit;
  pTmq->autoCommitInterval = conf->autoCommitInterval;
  pTmq->sessionTimeoutMs = conf->sessionTimeoutMs;
  pTmq->heartBeatIntervalMs = conf->heartBeatIntervalMs;
  pTmq->maxPollIntervalMs = conf->maxPollIntervalMs;
  pTmq->commitCb = conf->commitCb;
  pTmq->commitCbUserParam = conf->commitCbUserParam;
  pTmq->resetOffsetCfg = conf->resetOffset;
  pTmq->replayEnable = conf->replayEnable;
  pTmq->sourceExcluded = conf->sourceExcluded;
  pTmq->enableBatchMeta = conf->enableBatchMeta;
  tstrncpy(pTmq->user, user, TSDB_USER_LEN);
  if (taosGetFqdn(pTmq->fqdn) != 0) {
    tstrncpy(pTmq->fqdn, "localhost", TSDB_FQDN_LEN);
  }
  if (conf->replayEnable) {
    // replay mode overrides the configured auto commit
    pTmq->autoCommit = false;
  }
  taosInitRWLatch(&pTmq->lock);

  // assign consumerId
  pTmq->consumerId = tGenIdPI64();

  // init semaphore
  if (tsem2_init(&pTmq->rspSem, 0, 0) != 0) {
    tqErrorC("consumer:0x%" PRIx64 " setup failed since %s, consumer group %s", pTmq->consumerId,
             tstrerror(TAOS_SYSTEM_ERROR(errno)), pTmq->groupId);
    SET_ERROR_MSG_TMQ("init t_sem failed")
    goto _failed;
  }

  // init connection
  code = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ, &pTmq->pTscObj);
  if (code) {
    terrno = code;
    tqErrorC("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, terrstr(), pTmq->groupId);
    SET_ERROR_MSG_TMQ("init tscObj failed")
    goto _failed;
  }

  pTmq->refId = taosAddRef(tmqMgmt.rsetId, pTmq);
  if (pTmq->refId < 0) {
    SET_ERROR_MSG_TMQ("add tscObj ref failed")
    goto _failed;
  }

  pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, pTmq->heartBeatIntervalMs, (void*)pTmq->refId, tmqMgmt.timer);
  if (pTmq->hbLiveTimer == NULL) {
    SET_ERROR_MSG_TMQ("start heartbeat timer failed")
    // pTmq is registered in the ref set from here on, so it must be released
    // through the ref set (whose free callback is tmqFreeImpl). Freeing it
    // directly would leave a stale pointer registered under refId.
    int64_t refId = pTmq->refId;
    if (taosRemoveRef(tmqMgmt.rsetId, refId) != 0) {
      tqErrorC("failed to remove ref:%" PRId64 " after heartbeat timer start failure", refId);
    }
    return NULL;
  }
  char         buf[TSDB_OFFSET_LEN] = {0};
  STqOffsetVal offset = {.type = pTmq->resetOffsetCfg};
  tFormatOffset(buf, tListLen(buf), &offset);
  tqInfoC("consumer:0x%" PRIx64 " is setup, refId:%" PRId64
          ", groupId:%s, snapshot:%d, autoCommit:%d, commitInterval:%dms, offset:%s",
          pTmq->consumerId, pTmq->refId, pTmq->groupId, pTmq->useSnapshot, pTmq->autoCommit, pTmq->autoCommitInterval,
          buf);

  return pTmq;

_failed:
  // only reached while pTmq is NOT yet registered in the ref set
  tmqFreeImpl(pTmq);
  return NULL;
}
1716

1717
// Issue an ask-ep request and block until its result is available.
// Returns 0 on success, otherwise the error from allocation, semaphore
// setup, askEp itself, or the code stored in the request info.
static int32_t syncAskEp(tmq_t* pTmq) {
  SAskEpInfo* pAsk = taosMemoryMalloc(sizeof(SAskEpInfo));
  if (NULL == pAsk) {
    return terrno;
  }

  if (0 != tsem2_init(&pAsk->sem, 0, 0)) {
    taosMemoryFree(pAsk);
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  int32_t rc = askEp(pTmq, pAsk, true, false);
  if (0 == rc) {
    // the request was accepted: wait on the semaphore, then pick up the result
    if (0 != tsem2_wait(&pAsk->sem)) {
      tqErrorC("consumer:0x%" PRIx64 ", failed to wait for sem", pTmq->consumerId);
    }
    rc = pAsk->code;
  }

  if (0 != tsem2_destroy(&pAsk->sem)) {
    tqErrorC("failed to destroy sem sync ask ep");
  }
  taosMemoryFree(pAsk);

  return rc;
}
1739

1740
// Subscribe the consumer to the topics in `topic_list`.
//
// Builds a subscribe request holding the full name of every topic, sends it
// to the mnode and waits for the reply, then polls the endpoint information
// (syncAskEp) until it is available and finally starts the ask-ep and commit
// timers if they are not running yet.
//
// Returns 0 on success, TSDB_CODE_INVALID_PARA when an argument is NULL, or
// the error code of the failing step.
int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
  if (tmq == NULL || topic_list == NULL) return TSDB_CODE_INVALID_PARA;
  const SArray*   container = &topic_list->container;
  int32_t         sz = taosArrayGetSize(container);
  void*           buf = NULL;
  SMsgSendInfo*   sendInfo = NULL;
  SCMSubscribeReq req = {0};
  int32_t         code = 0;

  tqInfoC("consumer:0x%" PRIx64 " cgroup:%s, subscribe %d topics", tmq->consumerId, tmq->groupId, sz);

  req.consumerId = tmq->consumerId;
  tstrncpy(req.clientId, tmq->clientId, TSDB_CLIENT_ID_LEN);
  tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
  tstrncpy(req.user, tmq->user, TSDB_USER_LEN);
  tstrncpy(req.fqdn, tmq->fqdn, TSDB_FQDN_LEN);

  req.topicNames = taosArrayInit(sz, sizeof(void*));
  if (req.topicNames == NULL) {
    code = terrno;
    goto END;
  }

  req.withTbName = tmq->withTbName;
  req.autoCommit = tmq->autoCommit;
  req.autoCommitInterval = tmq->autoCommitInterval;
  req.sessionTimeoutMs = tmq->sessionTimeoutMs;
  req.maxPollIntervalMs = tmq->maxPollIntervalMs;
  req.resetOffsetCfg = tmq->resetOffsetCfg;
  req.enableReplay = tmq->replayEnable;
  req.enableBatchMeta = tmq->enableBatchMeta;

  // expand every topic into its full name; req.topicNames owns the strings
  // once they have been pushed (released at END)
  for (int32_t i = 0; i < sz; i++) {
    char* topic = taosArrayGetP(container, i);
    if (topic == NULL) {
      code = terrno;
      goto END;
    }
    SName name = {0};
    code = tNameSetDbName(&name, tmq->pTscObj->acctId, topic, strlen(topic));
    if (code) {
      tqErrorC("consumer:0x%" PRIx64 " cgroup:%s, failed to set topic name, code:%d", tmq->consumerId, tmq->groupId,
               code);
      goto END;
    }
    char* topicFName = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
    if (topicFName == NULL) {
      code = terrno;
      goto END;
    }

    code = tNameExtractFullName(&name, topicFName);
    if (code) {
      tqErrorC("consumer:0x%" PRIx64 " cgroup:%s, failed to extract topic name, code:%d", tmq->consumerId, tmq->groupId,
               code);
      taosMemoryFree(topicFName);
      goto END;
    }

    if (taosArrayPush(req.topicNames, &topicFName) == NULL) {
      code = terrno;
      taosMemoryFree(topicFName);
      goto END;
    }
    tqInfoC("consumer:0x%" PRIx64 " subscribe topic:%s", tmq->consumerId, topicFName);
  }

  // first call computes the serialized size, second call fills the buffer
  int32_t tlen = tSerializeSCMSubscribeReq(NULL, &req);
  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) {
    code = terrno;
    goto END;
  }

  void* abuf = buf;
  tlen = tSerializeSCMSubscribeReq(&abuf, &req);

  sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    code = terrno;
    taosMemoryFree(buf);
    goto END;
  }

  SMqSubscribeCbParam param = {.rspErr = 0};
  if (tsem2_init(&param.rspSem, 0, 0) != 0) {
    code = TSDB_CODE_TSC_INTERNAL_ERROR;
    taosMemoryFree(buf);
    taosMemoryFree(sendInfo);
    goto END;
  }

  sendInfo->msgInfo = (SDataBuf){.pData = buf, .len = tlen, .handle = NULL};
  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = &param;
  sendInfo->fp = tmqSubscribeCb;
  sendInfo->msgType = TDMT_MND_TMQ_SUBSCRIBE;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo);
  if (code != 0) {
    // `param` lives on this stack frame and is gone after we return, so its
    // semaphore has to be destroyed here as well; otherwise it is leaked.
    if (tsem2_destroy(&param.rspSem) != 0) {
      tqErrorC("consumer:0x%" PRIx64 ", failed to destroy semaphore in subscribe", tmq->consumerId);
    }
    goto END;
  }

  if (tsem2_wait(&param.rspSem) != 0){
    tqErrorC("consumer:0x%" PRIx64 ", failed to wait semaphore in subscribe", tmq->consumerId);
  }
  if(tsem2_destroy(&param.rspSem) != 0) {
    tqErrorC("consumer:0x%" PRIx64 ", failed to destroy semaphore in subscribe", tmq->consumerId);
  }

  if (param.rspErr != 0) {
    code = param.rspErr;
    goto END;
  }

  // poll the endpoint info until it is available, giving up after
  // SUBSCRIBE_RETRY_MAX_COUNT retries; TSDB_CODE_MND_CONSUMER_NOT_EXIST ends
  // the loop immediately and is reported as success
  int32_t retryCnt = 0;
  while ((code = syncAskEp(tmq)) != 0) {
    if (retryCnt++ > SUBSCRIBE_RETRY_MAX_COUNT || code == TSDB_CODE_MND_CONSUMER_NOT_EXIST) {
      tqErrorC("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry more than 2 minutes, code:%s",
               tmq->consumerId, tstrerror(code));
      if (code == TSDB_CODE_MND_CONSUMER_NOT_EXIST) {
        code = 0;
      }
      goto END;
    }

    tqInfoC("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry:%d in 500ms", tmq->consumerId, retryCnt);
    taosMsleep(SUBSCRIBE_RETRY_INTERVAL);
  }

  // timers are created once and reused by later subscribe calls
  if (tmq->epTimer == NULL){
    tmq->epTimer = taosTmrStart(tmqAssignAskEpTask, DEFAULT_ASKEP_INTERVAL, (void*)(tmq->refId), tmqMgmt.timer);
  }
  if (tmq->commitTimer == NULL){
    tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, (void*)(tmq->refId), tmqMgmt.timer);
  }
  if (tmq->epTimer == NULL || tmq->commitTimer == NULL) {
    code = TSDB_CODE_TSC_INTERNAL_ERROR;
    goto END;
  }

END:
  taosArrayDestroyP(req.topicNames, taosMemoryFree);
  return code;
}
1888

1889
// Store the commit callback `cb` and its user argument `param` in `conf`.
// A NULL `conf` is ignored.
void tmq_conf_set_auto_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* param) {
  if (conf != NULL) {
    conf->commitCbUserParam = param;
    conf->commitCb = cb;
  }
}
1894

1895
// Look up the vgroup entry `vgId` of topic `topicName` in tmq->clientTopics.
// On a hit *pVg is set to the entry; otherwise *pVg is left untouched.
static void getVgInfo(tmq_t* tmq, char* topicName, int32_t vgId, SMqClientVg** pVg) {
  const int32_t nTopics = taosArrayGetSize(tmq->clientTopics);

  for (int t = 0; t < nTopics; t++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, t);
    if (pTopic == NULL || strcmp(pTopic->topicName, topicName) != 0) {
      continue;
    }

    const int32_t nVgs = taosArrayGetSize(pTopic->vgs);
    for (int32_t v = 0; v < nVgs; v++) {
      SMqClientVg* pCandidate = taosArrayGet(pTopic->vgs, v);
      if (pCandidate == NULL || pCandidate->vgId != vgId) {
        continue;
      }
      *pVg = pCandidate;
      return;
    }
  }
}
1911

1912
// Return the entry of tmq->clientTopics whose name equals `topicName`, or
// NULL when there is none.
static SMqClientTopic* getTopicInfo(tmq_t* tmq, char* topicName) {
  int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
  for (int i = 0; i < topicNumCur; i++) {
    SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i);
    // guard the array lookup the same way getVgInfo does before dereferencing
    if (pTopicCur != NULL && strcmp(pTopicCur->topicName, topicName) == 0) {
      return pTopicCur;
    }
  }
  return NULL;
}
1922

1923
// Response callback of a poll request.
//
// param - the SMqPollCbParam attached to the request (refId, vgId, topic
//         name, request id).
// pMsg  - response buffer; pData and pEpSet are always freed before returning.
// code  - result code of the request.
//
// A response wrapper is queued on tmq->mqueue for both good and failed
// responses (the wrapper carries the code), then tmq->rspSem is posted.
// Returns the code in effect at exit; note that it is overwritten by the
// result of taosWriteQitem whenever a wrapper was allocated.
int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
  if (pMsg == NULL) {
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  SMqPollCbParam* pParam = (SMqPollCbParam*)param;
  tmq_t*          tmq = NULL;
  SMqRspWrapper*  pRspWrapper = NULL;
  int8_t          rspType = 0;
  int32_t         vgId = 0;
  uint64_t        requestId = 0;
  int64_t         refId = 0;
  int32_t         ret = 0;

  if (pParam == NULL) {
    code = TSDB_CODE_TSC_INTERNAL_ERROR;
    goto EXIT;
  }

  refId = pParam->refId;
  vgId = pParam->vgId;
  requestId = pParam->requestId;

  // the consumer may be gone by the time the response arrives
  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq == NULL) {
    code = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    goto EXIT;
  }

  ret = taosAllocateQitem(sizeof(SMqRspWrapper), DEF_QITEM, 0, (void**)&pRspWrapper);
  if (ret) {
    code = ret;
    tscWarn("consumer:0x%" PRIx64 " msg discard from vgId:%d, since out of memory", tmq->consumerId, vgId);
    goto END;
  }

  // a failed request is still queued so the poller sees the error code
  if (code != 0) {
    goto END;
  }

  if (pMsg->pData == NULL) {
    tqErrorC("consumer:0x%" PRIx64 " msg discard from vgId:%d, since msg is NULL", tmq->consumerId, vgId);
    code = TSDB_CODE_TSC_INTERNAL_ERROR;
    goto END;
  }

  // drop responses that belong to another epoch
  int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch;
  int32_t clientEpoch = atomic_load_32(&tmq->epoch);
  if (msgEpoch != clientEpoch) {
    tqErrorC("consumer:0x%" PRIx64
             " msg discard from vgId:%d since epoch not equal, rsp epoch %d, current epoch %d, reqId:0x%" PRIx64,
             tmq->consumerId, vgId, msgEpoch, clientEpoch, requestId);
    code = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
    goto END;
  }

  rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType;
  tqDebugC("consumer:0x%" PRIx64 " recv poll rsp, vgId:%d, type %d,QID:0x%" PRIx64, tmq->consumerId, vgId, rspType, requestId);

  // decode the payload with the decoder matching the response type
  if (rspType == TMQ_MSG_TYPE__POLL_DATA_RSP) {
    PROCESS_POLL_RSP(tDecodeMqDataRsp, &pRspWrapper->pollRsp.dataRsp)
  } else if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) {
    PROCESS_POLL_RSP(tDecodeMqMetaRsp, &pRspWrapper->pollRsp.metaRsp)
  } else if (rspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
    PROCESS_POLL_RSP(tDecodeSTaosxRsp, &pRspWrapper->pollRsp.dataRsp)
  } else if (rspType == TMQ_MSG_TYPE__POLL_BATCH_META_RSP) {
    PROCESS_POLL_RSP(tSemiDecodeMqBatchMetaRsp, &pRspWrapper->pollRsp.batchMetaRsp)
  } else {  // invalid rspType
    tqErrorC("consumer:0x%" PRIx64 " invalid rsp msg received, type:%d ignored", tmq->consumerId, rspType);
    code = TSDB_CODE_TSC_INTERNAL_ERROR;
    goto END;
  }

  pRspWrapper->tmqRspType = rspType;
  pRspWrapper->pollRsp.reqId = requestId;
  // the wrapper takes over the ep set; clear it so EXIT does not free it
  pRspWrapper->pollRsp.pEpset = pMsg->pEpSet;
  pMsg->pEpSet = NULL;

END:
  if (pRspWrapper != NULL) {
    pRspWrapper->code = code;
    pRspWrapper->pollRsp.vgId = vgId;
    tstrncpy(pRspWrapper->pollRsp.topicName, pParam->topicName, TSDB_TOPIC_FNAME_LEN);

    code = taosWriteQitem(tmq->mqueue, pRspWrapper);
    if (code == 0) {
      tqDebugC("consumer:0x%" PRIx64 " put poll res into mqueue, type:%d, vgId:%d, total in queue:%d,QID:0x%" PRIx64,
               tmq->consumerId, rspType, vgId, taosQueueItemSize(tmq->mqueue), requestId);
    } else {
      tmqFreeRspWrapper(pRspWrapper);
      taosFreeQitem(pRspWrapper);
      tqErrorC("consumer:0x%" PRIx64 " put poll res into mqueue failed, code:%d", tmq->consumerId, code);
    }
  }

  if (tsem2_post(&tmq->rspSem) != 0) {
    tqErrorC("failed to post rsp sem, consumer:0x%" PRIx64, tmq->consumerId);
  }

  ret = taosReleaseRef(tmqMgmt.rsetId, refId);
  if (ret != 0) {
    tqErrorC("failed to release ref:%"PRId64 ", code:%d", refId, ret);
  }

EXIT:
  taosMemoryFreeClear(pMsg->pData);
  taosMemoryFreeClear(pMsg->pEpSet);
  return code;
}
2023

2024
// Fill the poll request `pReq` for one vgroup of one topic.
// The subscribe key is "<groupId><TMQ_SEPARATOR><topicName>"; the request
// offset is the vgroup's current end offset and a fresh request id is
// generated for every request.
void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) {
  // identity of the subscription
  (void)snprintf(pReq->subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", tmq->groupId, TMQ_SEPARATOR, pTopic->topicName);
  pReq->consumerId = tmq->consumerId;
  pReq->epoch = tmq->epoch;

  // target vgroup and position
  pReq->head.vgId = pVg->vgId;
  pReq->reqOffset = pVg->offsetInfo.endOffset;

  // per-request values
  pReq->timeout = timeout;
  pReq->reqId = generateRequestId();

  // options copied from the consumer
  pReq->withTbName = tmq->withTbName;
  pReq->useSnapshot = tmq->useSnapshot;
  pReq->enableReplay = tmq->replayEnable;
  pReq->sourceExcluded = tmq->sourceExcluded;
  pReq->enableBatchMeta = tmq->enableBatchMeta;
}
48,548✔
2038

2039
void changeByteEndian(char* pData) {
371,688✔
2040
  if (pData == NULL) {
371,688!
2041
    return;
×
2042
  }
2043
  char* p = pData;
371,688✔
2044

2045
  // | version | total length | total rows | total columns | flag seg| block group id | column schema | each column
2046
  // length | version:
2047
  int32_t blockVersion = *(int32_t*)p;
371,688✔
2048
  if (blockVersion != BLOCK_VERSION_1) {
371,688!
2049
    tqErrorC("invalid block version:%d", blockVersion);
×
2050
    return;
×
2051
  }
2052
  *(int32_t*)p = BLOCK_VERSION_2;
371,688✔
2053

2054
  p += sizeof(int32_t);
371,688✔
2055
  p += sizeof(int32_t);
371,688✔
2056
  p += sizeof(int32_t);
371,688✔
2057
  int32_t cols = *(int32_t*)p;
371,688✔
2058
  p += sizeof(int32_t);
371,688✔
2059
  p += sizeof(int32_t);
371,688✔
2060
  p += sizeof(uint64_t);
371,688✔
2061
  // check fields
2062
  p += cols * (sizeof(int8_t) + sizeof(int32_t));
371,688✔
2063

2064
  int32_t* colLength = (int32_t*)p;
371,688✔
2065
  for (int32_t i = 0; i < cols; ++i) {
2,324,135✔
2066
    colLength[i] = htonl(colLength[i]);
1,952,447✔
2067
  }
2068
}
2069

2070
static void tmqGetRawDataRowsPrecisionFromRes(void* pRetrieve, void** rawData, int64_t* rows, int32_t* precision) {
727,188✔
2071
  if (pRetrieve == NULL) {
727,188!
2072
    return;
×
2073
  }
2074
  if (*(int64_t*)pRetrieve == 0) {
727,188!
2075
    *rawData = ((SRetrieveTableRsp*)pRetrieve)->data;
×
2076
    *rows = htobe64(((SRetrieveTableRsp*)pRetrieve)->numOfRows);
×
2077
    if (precision != NULL) {
×
2078
      *precision = ((SRetrieveTableRsp*)pRetrieve)->precision;
×
2079
    }
2080
  } else if (*(int64_t*)pRetrieve == 1) {
727,188✔
2081
    *rawData = ((SRetrieveTableRspForTmq*)pRetrieve)->data;
727,182✔
2082
    *rows = htobe64(((SRetrieveTableRspForTmq*)pRetrieve)->numOfRows);
727,182✔
2083
    if (precision != NULL) {
727,170✔
2084
      *precision = ((SRetrieveTableRspForTmq*)pRetrieve)->precision;
355,525✔
2085
    }
2086
  }
2087
}
2088

2089
static void tmqBuildRspFromWrapperInner(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows,
31,999✔
2090
                                        SMqRspObj* pRspObj) {
2091
  pRspObj->resIter = -1;
31,999✔
2092
  pRspObj->resInfo.totalRows = 0;
31,999✔
2093
  pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
31,999✔
2094

2095
  SMqDataRsp* pDataRsp = &pRspObj->dataRsp;
31,999✔
2096
  bool needTransformSchema = !pDataRsp->withSchema;
31,999✔
2097
  if (!pDataRsp->withSchema) {  // withSchema is false if subscribe subquery, true if subscribe db or stable
31,999✔
2098
    pDataRsp->withSchema = true;
30,016✔
2099
    pDataRsp->blockSchema = taosArrayInit(pDataRsp->blockNum, sizeof(void*));
30,016✔
2100
    if (pDataRsp->blockSchema == NULL) {
30,018!
2101
      tqErrorC("failed to allocate memory for blockSchema");
×
2102
      return;
×
2103
    }
2104
  }
2105
  // extract the rows in this data packet
2106
  for (int32_t i = 0; i < pDataRsp->blockNum; ++i) {
403,694✔
2107
    void*   pRetrieve = taosArrayGetP(pDataRsp->blockData, i);
371,687✔
2108
    void*   rawData = NULL;
371,691✔
2109
    int64_t rows = 0;
371,691✔
2110
    // deal with compatibility
2111
    tmqGetRawDataRowsPrecisionFromRes(pRetrieve, &rawData, &rows, NULL);
371,691✔
2112

2113
    pVg->numOfRows += rows;
371,689✔
2114
    (*numOfRows) += rows;
371,689✔
2115
    changeByteEndian(rawData);
371,689✔
2116
    if (needTransformSchema) {  // withSchema is false if subscribe subquery, true if subscribe db or stable
371,692✔
2117
      SSchemaWrapper* schema = tCloneSSchemaWrapper(&pWrapper->topicHandle->schema);
251,739!
2118
      if (schema) {
251,739!
2119
        if (taosArrayPush(pDataRsp->blockSchema, &schema) == NULL) {
503,478!
2120
          tqErrorC("failed to push schema into blockSchema");
×
2121
          continue;
×
2122
        }
2123
      }
2124
    }
2125
  }
2126
}
2127

2128
// Build and send one poll request for vgroup `pVg` of topic `pTopic`.
// The response is delivered to tmqPollCb together with a heap-allocated
// SMqPollCbParam (released through sendInfo->paramFreeFp).
// Returns 0 on success; otherwise TSDB_CODE_INVALID_MSG, an allocation error
// taken from terrno, or the error returned by asyncSendMsgToServer.
static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* pVg, int64_t timeout) {
  SMqPollReq      req = {0};
  char*           msg = NULL;
  SMqPollCbParam* pParam = NULL;
  SMsgSendInfo*   sendInfo = NULL;
  int             code = 0;
  tmqBuildConsumeReqImpl(&req, pTmq, timeout, pTopic, pVg);

  // first pass computes the serialized size, second pass fills the buffer
  int32_t msgSize = tSerializeSMqPollReq(NULL, 0, &req);
  if (msgSize < 0) {
    code = TSDB_CODE_INVALID_MSG;
    return code;
  }

  msg = taosMemoryCalloc(1, msgSize);
  if (NULL == msg) {
    return terrno;
  }

  if (tSerializeSMqPollReq(msg, msgSize, &req) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    taosMemoryFreeClear(msg);
    return code;
  }

  pParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
  if (pParam == NULL) {
    code = terrno;
    taosMemoryFreeClear(msg);
    return code;
  }

  pParam->refId = pTmq->refId;
  tstrncpy(pParam->topicName, pTopic->topicName, TSDB_TOPIC_FNAME_LEN);
  pParam->vgId = pVg->vgId;
  pParam->requestId = req.reqId;

  sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    // capture terrno before the cleanup calls, as the pParam branch does
    code = terrno;
    taosMemoryFreeClear(pParam);
    taosMemoryFreeClear(msg);
    return code;
  }

  sendInfo->msgInfo = (SDataBuf){.pData = msg, .len = msgSize, .handle = NULL};
  sendInfo->requestId = req.reqId;
  sendInfo->requestObjRefId = 0;
  sendInfo->param = pParam;
  sendInfo->paramFreeFp = taosMemoryFree;
  sendInfo->fp = tmqPollCb;
  sendInfo->msgType = TDMT_VND_TMQ_CONSUME;

  char offsetFormatBuf[TSDB_OFFSET_LEN] = {0};
  tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pVg->offsetInfo.endOffset);
  code = asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, NULL, sendInfo);
  tqDebugC("consumer:0x%" PRIx64 " send poll to %s vgId:%d, code:%d, epoch %d, req:%s,QID:0x%" PRIx64, pTmq->consumerId,
           pTopic->topicName, pVg->vgId, code, pTmq->epoch, offsetFormatBuf, req.reqId);
  if (code != 0) {
    return code;
  }

  pVg->pollCnt++;
  pVg->seekUpdated = false;  // reset this flag.
  pTmq->pollCnt++;

  return 0;
}
2195

2196
// broadcast the poll request to all related vnodes
2197
static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
96,857✔
2198
  int32_t code = 0;
96,857✔
2199

2200
  taosWLockLatch(&tmq->lock);
96,857✔
2201
  int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
96,860✔
2202
  tqDebugC("consumer:0x%" PRIx64 " start to poll data, numOfTopics:%d", tmq->consumerId, numOfTopics);
96,861!
2203

2204
  for (int i = 0; i < numOfTopics; i++) {
198,079✔
2205
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
101,212✔
2206
    if (pTopic == NULL) {
101,213!
2207
      continue;
×
2208
    }
2209
    int32_t numOfVg = taosArrayGetSize(pTopic->vgs);
101,213✔
2210
    if (pTopic->noPrivilege) {
101,217✔
2211
      tqDebugC("consumer:0x%" PRIx64 " has no privilegr for topic:%s", tmq->consumerId, pTopic->topicName);
6!
2212
      continue;
6✔
2213
    }
2214
    for (int j = 0; j < numOfVg; j++) {
391,872✔
2215
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
290,660✔
2216
      if (pVg == NULL) {
290,657!
2217
        continue;
×
2218
      }
2219
      int64_t elapsed = taosGetTimestampMs() - pVg->emptyBlockReceiveTs;
290,652✔
2220
      if (elapsed < EMPTY_BLOCK_POLL_IDLE_DURATION && elapsed >= 0) {  // less than 10ms
290,652!
2221
        tqDebugC("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 10ms before start next poll", tmq->consumerId,
22,363!
2222
                 tmq->epoch, pVg->vgId);
2223
        continue;
22,363✔
2224
      }
2225

2226
      elapsed = taosGetTimestampMs() - pVg->blockReceiveTs;
268,290✔
2227
      if (tmq->replayEnable && elapsed < pVg->blockSleepForReplay && elapsed >= 0) {
268,290!
2228
        tqDebugC("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for %" PRId64 "ms before start next poll when replay",
23!
2229
                 tmq->consumerId, tmq->epoch, pVg->vgId, pVg->blockSleepForReplay);
2230
        continue;
23✔
2231
      }
2232

2233
      int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
268,267✔
2234
      if (vgStatus == TMQ_VG_STATUS__WAIT) {
268,277✔
2235
        int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
219,733✔
2236
        tqDebugC("consumer:0x%" PRIx64 " epoch %d wait poll-rsp, skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch,
219,732!
2237
                 pVg->vgId, vgSkipCnt);
2238
        continue;
219,732✔
2239
      }
2240

2241
      atomic_store_32(&pVg->vgSkipCnt, 0);
48,544✔
2242
      code = doTmqPollImpl(tmq, pTopic, pVg, timeout);
48,545✔
2243
      if (code != TSDB_CODE_SUCCESS) {
48,543!
2244
        goto end;
×
2245
      }
2246
    }
2247
  }
2248

2249
end:
96,867✔
2250
  taosWUnLockLatch(&tmq->lock);
96,867✔
2251
  tqDebugC("consumer:0x%" PRIx64 " end to poll data, code:%d", tmq->consumerId, code);
96,862!
2252
  return code;
96,862✔
2253
}
2254

2255
// Apply the outcome of a poll response to the local vgroup state.
// Unless a seek happened in the meantime (pVg->seekUpdated), the end offset
// is moved to `rspOffset` and - when the response carried data - the begin
// offset to `reqOffset`. The vgroup is then marked idle and its wal version
// range is set to [sver, ever + 1].
static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* reqOffset, STqOffsetVal* rspOffset, int64_t sver, int64_t ever,
                         int64_t consumerId, bool hasData) {
  if (pVg->seekUpdated) {
    tqDebugC("consumer:0x%" PRIx64 " local offset is NOT update, since seekupdate is set", consumerId);
  } else {
    tqDebugC("consumer:0x%" PRIx64 " local offset is update, since seekupdate not set", consumerId);
    if (hasData) {
      tOffsetCopy(&pVg->offsetInfo.beginOffset, reqOffset);
    }
    tOffsetCopy(&pVg->offsetInfo.endOffset, rspOffset);
  }

  // update the status
  atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);

  // update the valid wal version range
  pVg->offsetInfo.walVerBegin = sver;
  pVg->offsetInfo.walVerEnd = ever + 1;
}
42,931✔
2274

2275
// Allocate a response object and move the payload of `pollRspWrapper` into
// it. The payload region of the wrapper is zeroed after the copy. Topic name,
// db name and vgId are copied along. Returns NULL when the allocation fails.
static SMqRspObj* buildRsp(SMqPollRspWrapper* pollRspWrapper){
  // size of the payload area: the largest of the response variants
  typedef union {
    SMqDataRsp      dataRsp;
    SMqMetaRsp      metaRsp;
    SMqBatchMetaRsp batchMetaRsp;
  } MEMSIZE;

  SMqRspObj* pObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
  if (NULL == pObj) {
    tqErrorC("buildRsp:failed to allocate memory");
    return NULL;
  }

  // move the payload, then clear the source
  (void)memcpy(&pObj->dataRsp, &pollRspWrapper->dataRsp, sizeof(MEMSIZE));
  (void)memset(&pollRspWrapper->dataRsp, 0, sizeof(MEMSIZE));

  pObj->vgId = pollRspWrapper->vgId;
  tstrncpy(pObj->topic, pollRspWrapper->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pObj->db, pollRspWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);

  return pObj;
}
2294

2295
// Handle a poll response that carries an error code.
// TSDB_CODE_VND_INVALID_VGROUP_ID and TSDB_CODE_TMQ_CONSUMER_MISMATCH trigger
// an ask-ep request (the last askEp argument is true only for the former).
// In every case the vgroup, if still known, gets its emptyBlockReceiveTs
// refreshed and is put back to idle.
static void processMqRspError(tmq_t* tmq, SMqRspWrapper* pRspWrapper){
  SMqPollRspWrapper* pPollRsp = &pRspWrapper->pollRsp;
  const bool         invalidVgroup = (pRspWrapper->code == TSDB_CODE_VND_INVALID_VGROUP_ID);  // for vnode transform

  if (invalidVgroup || pRspWrapper->code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) {
    int32_t code = askEp(tmq, NULL, false, invalidVgroup);
    if (code != 0) {
      tqErrorC("consumer:0x%" PRIx64 " failed to ask ep, code:%s", tmq->consumerId, tstrerror(code));
    }
  }

  tqInfoC("consumer:0x%" PRIx64 " msg from vgId:%d discarded, since %s", tmq->consumerId, pPollRsp->vgId,
          tstrerror(pRspWrapper->code));

  taosWLockLatch(&tmq->lock);
  SMqClientVg* pVg = NULL;
  getVgInfo(tmq, pPollRsp->topicName, pPollRsp->vgId, &pVg);
  if (pVg != NULL) {
    pVg->emptyBlockReceiveTs = taosGetTimestampMs();
    atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
  }
  taosWUnLockLatch(&tmq->lock);
}
4,508✔
2320
// Convert one error-free response wrapper into a result object for the caller.
// Returns NULL when nothing is to be handed out: endpoint update, vgroup not
// found, empty data block, result allocation failure, or an unhandled type.
// Everything after the endpoint shortcut runs under the consumer's write latch.
static SMqRspObj* processMqRsp(tmq_t* tmq, SMqRspWrapper* pRspWrapper){
  // Endpoint responses only refresh the local endpoint view.
  if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) {
    tqDebugC("consumer:0x%" PRIx64 " ep msg received", tmq->consumerId);
    SMqAskEpRsp* pEpRsp = &pRspWrapper->epRsp;
    doUpdateLocalEp(tmq, pRspWrapper->epoch, pEpRsp);
    return NULL;
  }

  SMqRspObj*         pResult = NULL;
  SMqPollRspWrapper* pPoll = &pRspWrapper->pollRsp;
  SMqClientVg*       pVg = NULL;

  taosWLockLatch(&tmq->lock);
  getVgInfo(tmq, pPoll->topicName, pPoll->vgId, &pVg);
  if (pVg == NULL) {
    tqErrorC("consumer:0x%" PRIx64 " get vg or topic error, topic:%s vgId:%d", tmq->consumerId,
             pPoll->topicName, pPoll->vgId);
    goto END;
  }

  pPoll->topicHandle = getTopicInfo(tmq, pPoll->topicName);
  if (pPoll->pEpset != NULL) {
    // the response carried a replacement endpoint set for this vgroup
    pVg->epSet = *pPoll->pEpset;
  }

  bool isData = (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_RSP);
  bool isDataMeta = (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP);
  bool isMeta = (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP);
  bool isBatchMeta = (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_BATCH_META_RSP);

  if (isData || isDataMeta) {
    updateVgInfo(pVg, &pPoll->dataRsp.reqOffset, &pPoll->dataRsp.rspOffset, pPoll->head.walsver, pPoll->head.walever,
                 tmq->consumerId, pPoll->dataRsp.blockNum != 0);

    // printable form of the response offset, used only by the debug logs below
    char offsetStr[TSDB_OFFSET_LEN] = {0};
    tFormatOffset(offsetStr, TSDB_OFFSET_LEN, &pPoll->rspOffset);

    if (pPoll->dataRsp.blockNum == 0) {
      tqDebugC("consumer:0x%" PRIx64 " empty block received, vgId:%d, offset:%s, vg total:%" PRId64
                   ", total:%" PRId64 ",QID:0x%" PRIx64,
               tmq->consumerId, pVg->vgId, offsetStr, pVg->numOfRows, tmq->totalRows, pPoll->reqId);
      pVg->emptyBlockReceiveTs = taosGetTimestampMs();
    } else {
      pResult = buildRsp(pPoll);
      if (pResult == NULL) {
        tqErrorC("consumer:0x%" PRIx64 " failed to allocate memory for meta rsp", tmq->consumerId);
        goto END;
      }
      pResult->resType = isData ? RES_TYPE__TMQ : RES_TYPE__TMQ_METADATA;

      int64_t rowsInRsp = 0;
      tmqBuildRspFromWrapperInner(pPoll, pVg, &rowsInRsp, pResult);
      tmq->totalRows += rowsInRsp;
      pVg->emptyBlockReceiveTs = 0;

      if (tmq->replayEnable) {
        pVg->blockReceiveTs = taosGetTimestampMs();
        pVg->blockSleepForReplay = pResult->dataRsp.sleepTime;
        // a positive sleep time schedules tmqReplayTask on the shared timer
        if (pVg->blockSleepForReplay > 0 &&
            taosTmrStart(tmqReplayTask, pVg->blockSleepForReplay, (void*)(tmq->refId), tmqMgmt.timer) == NULL) {
          tqErrorC("consumer:0x%" PRIx64 " failed to start replay timer, vgId:%d, sleep:%" PRId64,
                   tmq->consumerId, pVg->vgId, pVg->blockSleepForReplay);
        }
      }
      tqDebugC("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64
                   ", vg total:%" PRId64 ", total:%" PRId64 ",QID:0x%" PRIx64,
               tmq->consumerId, pVg->vgId, offsetStr, pResult->dataRsp.blockNum, rowsInRsp, pVg->numOfRows, tmq->totalRows,
               pPoll->reqId);
    }
  } else if (isMeta || isBatchMeta) {
    updateVgInfo(pVg, &pPoll->rspOffset, &pPoll->rspOffset,
                 pPoll->head.walsver, pPoll->head.walever, tmq->consumerId, true);

    pResult = buildRsp(pPoll);
    if (pResult == NULL) {
      tqErrorC("consumer:0x%" PRIx64 " failed to allocate memory for meta rsp", tmq->consumerId);
      goto END;
    }
    pResult->resType = isMeta ? RES_TYPE__TMQ_META : RES_TYPE__TMQ_BATCH_META;
  }

END:
  taosWUnLockLatch(&tmq->lock);
  return pResult;
}
2398

2399
// Drain queued response wrappers until one yields a result object.
// Items are taken from tmq->qall; when that is empty it is refilled once from
// tmq->mqueue. Returns the first non-NULL result, or NULL when both are empty.
// Every wrapper taken is released before moving on. `timeout` is not used here.
static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
  tqDebugC("consumer:0x%" PRIx64 " start to handle the rsp, total:%d", tmq->consumerId, taosQallItemSize(tmq->qall));

  for (;;) {
    SMqRspWrapper* pWrapper = NULL;
    if (taosGetQitem(tmq->qall, (void**)&pWrapper) == 0) {
      // local batch exhausted: pull whatever is pending on the main queue and retry
      if (taosReadAllQitems(tmq->mqueue, tmq->qall) == 0 ||
          taosGetQitem(tmq->qall, (void**)&pWrapper) == 0) {
        return NULL;
      }
    }

    tqDebugC("consumer:0x%" PRIx64 " handle rsp, type:%d", tmq->consumerId, pWrapper->tmqRspType);

    void* pRes = NULL;
    if (pWrapper->code == 0) {
      pRes = processMqRsp(tmq, pWrapper);
    } else {
      processMqRspError(tmq, pWrapper);
    }

    tmqFreeRspWrapper(pWrapper);
    taosFreeQitem(pWrapper);

    if (pRes != NULL) {
      return pRes;
    }
  }
}
2429

2430
// Poll for the next result.
// Returns a result object as soon as one is available, or NULL when tmq is
// NULL, the consumer is still in INIT status, or a non-negative `timeout`
// (milliseconds, compared against taosGetTimestampMs()) has elapsed.
// A negative timeout loops without a deadline, waiting 1000 per round.
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
  if (tmq == NULL) return NULL;

  int64_t startTime = taosGetTimestampMs();

  tqDebugC("consumer:0x%" PRIx64 " start to poll at %" PRId64 ", timeout:%" PRId64, tmq->consumerId, startTime,
           timeout);

  // in no topic status, delayed task also need to be processed
  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) {
    tqInfoC("consumer:0x%" PRIx64 " poll return since consumer is init", tmq->consumerId);
    taosMsleep(500);  //     sleep for a while
    return NULL;
  }

  (void)atomic_val_compare_exchange_8(&tmq->pollFlag, 0, 1);

  for (;;) {
    tmqHandleAllDelayedTask(tmq);

    // a poll failure is only logged; already queued responses are still processed
    if (tmqPollImpl(tmq, timeout) < 0) {
      tqErrorC("consumer:0x%" PRIx64 " return due to poll error", tmq->consumerId);
    }

    void* rspObj = tmqHandleAllRsp(tmq, timeout);
    if (rspObj != NULL) {
      tqDebugC("consumer:0x%" PRIx64 " return rsp %p", tmq->consumerId, rspObj);
      return (TAOS_RES*)rspObj;
    }

    if (timeout < 0) {
      (void)tsem2_timewait(&tmq->rspSem, 1000);
      continue;
    }

    int64_t currentTime = taosGetTimestampMs();
    int64_t elapsedTime = currentTime - startTime;
    // a negative elapsed time (clock moved backwards) is treated as expiry too
    if (elapsedTime < 0 || elapsedTime > timeout) {
      tqDebugC("consumer:0x%" PRIx64 " (epoch %d) timeout, no rsp, start time %" PRId64 ", current time %" PRId64,
               tmq->consumerId, tmq->epoch, startTime, currentTime);
      return NULL;
    }
    (void)tsem2_timewait(&tmq->rspSem, (timeout - elapsedTime));
  }
}
2475

2476
// Log consumer totals and the per-vgroup row counts of every topic.
// The topic list is walked under the consumer's read latch.
static void displayConsumeStatistics(tmq_t* pTmq) {
  taosRLockLatch(&pTmq->lock);

  int32_t topicCnt = taosArrayGetSize(pTmq->clientTopics);
  tqInfoC("consumer:0x%" PRIx64 " closing poll:%" PRId64 " rows:%" PRId64 " topics:%d, final epoch:%d",
           pTmq->consumerId, pTmq->pollCnt, pTmq->totalRows, topicCnt, pTmq->epoch);

  tqInfoC("consumer:0x%" PRIx64 " rows dist begin: ", pTmq->consumerId);
  for (int32_t t = 0; t < topicCnt; ++t) {
    SMqClientTopic* pTopic = taosArrayGet(pTmq->clientTopics, t);
    if (pTopic != NULL) {
      tqInfoC("consumer:0x%" PRIx64 " topic:%d", pTmq->consumerId, t);

      int32_t vgCnt = taosArrayGetSize(pTopic->vgs);
      for (int32_t v = 0; v < vgCnt; ++v) {
        SMqClientVg* pVg = taosArrayGet(pTopic->vgs, v);
        if (pVg != NULL) {
          tqInfoC("topic:%s, %d. vgId:%d rows:%" PRId64, pTopic->topicName, v, pVg->vgId, pVg->numOfRows);
        }
      }
    }
  }

  taosRUnLockLatch(&pTmq->lock);
  tqInfoC("consumer:0x%" PRIx64 " rows dist end", pTmq->consumerId);
}
840✔
2497

2498
// Drop all subscriptions of the consumer.
// Returns TSDB_CODE_INVALID_PARA for a NULL handle, 0 when the consumer is not
// in READY status (nothing to do), otherwise the first failure among the
// optional sync commit, the list allocation and the final subscribe call.
int32_t tmq_unsubscribe(tmq_t* tmq) {
  if (tmq == NULL) return TSDB_CODE_INVALID_PARA;

  int8_t status = atomic_load_8(&tmq->status);
  tqInfoC("consumer:0x%" PRIx64 " start to unsubscribe consumer, status:%d", tmq->consumerId, status);

  displayConsumeStatistics(tmq);

  if (status != TMQ_CONSUMER_STATUS__READY) {
    tqInfoC("consumer:0x%" PRIx64 " status:%d, already closed or not in ready state, no need unsubscribe", tmq->consumerId, status);
    return 0;
  }

  // with auto commit enabled, commit everything before leaving
  if (tmq->autoCommit) {
    int32_t commitCode = tmq_commit_sync(tmq, NULL);
    if (commitCode != 0) {
      return commitCode;
    }
  }
  tmqSendHbReq((void*)(tmq->refId), NULL);

  // unsubscribing is done by subscribing with a freshly created, empty topic list
  tmq_list_t* emptyTopics = tmq_list_new();
  if (emptyTopics == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  int32_t code = tmq_subscribe(tmq, emptyTopics);
  tmq_list_destroy(emptyTopics);
  return code;
}
2531

2532
// Close the consumer: unsubscribe, mark it CLOSED and remove its reference
// from the tmq reference set. If unsubscribing fails its code is returned and
// the consumer is left untouched.
int32_t tmq_consumer_close(tmq_t* tmq) {
  if (tmq == NULL) return TSDB_CODE_INVALID_PARA;

  tqInfoC("consumer:0x%" PRIx64 " start to close consumer, status:%d", tmq->consumerId, tmq->status);

  int32_t code = tmq_unsubscribe(tmq);
  if (code != 0) {
    return code;
  }

  atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__CLOSED);
  code = taosRemoveRef(tmqMgmt.rsetId, tmq->refId);
  if (code != 0){
    tqErrorC("tmq close failed to remove ref:%" PRId64 ", code:%d", tmq->refId, code);
  }
  return code;
}
2545

2546
const char* tmq_err2str(int32_t err) {
239✔
2547
  if (err == 0) {
239✔
2548
    return "success";
221✔
2549
  } else if (err == -1) {
18!
2550
    return "fail";
×
2551
  } else {
2552
    if (*(taosGetErrMsg()) == 0) {
18✔
2553
      return tstrerror(err);
5✔
2554
    } else {
2555
      (void)snprintf(taosGetErrMsgReturn(), ERR_MSG_LEN, "%s,detail:%s", tstrerror(err), taosGetErrMsg());
13✔
2556
      return (const char*)taosGetErrMsgReturn();
13✔
2557
    }
2558
  }
2559
}
2560

2561
// Classify a result handle.
// Both single and batch meta results are reported as TMQ_RES_TABLE_META;
// NULL or an unrecognised handle yields TMQ_RES_INVALID.
tmq_res_t tmq_get_res_type(TAOS_RES* res) {
  if (res == NULL) return TMQ_RES_INVALID;

  if (TD_RES_TMQ(res)) return TMQ_RES_DATA;
  if (TD_RES_TMQ_META(res)) return TMQ_RES_TABLE_META;
  if (TD_RES_TMQ_METADATA(res)) return TMQ_RES_METADATA;
  if (TD_RES_TMQ_BATCH_META(res)) return TMQ_RES_TABLE_META;

  return TMQ_RES_INVALID;
}
2577

2578
// Return the topic name of a tmq result: the part of the stored topic string
// that follows its first '.'. Returns NULL for a NULL handle, a handle that is
// not one of the four tmq result kinds, or a stored name without a '.'.
const char* tmq_get_topic_name(TAOS_RES* res) {
  if (res == NULL) return NULL;

  bool isTmqRes = TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) ||
                  TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res);
  if (!isTmqRes) {
    return NULL;
  }

  char* dot = strchr(((SMqRspObj*)res)->topic, '.');
  return (dot == NULL) ? NULL : dot + 1;
}
2593

2594
// Return the database name of a tmq result: the part of the stored db string
// that follows its first '.'. Returns NULL for a NULL handle, a handle that is
// not one of the four tmq result kinds, or a stored name without a '.'.
const char* tmq_get_db_name(TAOS_RES* res) {
  if (res == NULL) return NULL;

  bool isTmqRes = TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) ||
                  TD_RES_TMQ_BATCH_META(res) || TD_RES_TMQ_META(res);
  if (!isTmqRes) {
    return NULL;
  }

  char* dot = strchr(((SMqRspObj*)res)->db, '.');
  return (dot == NULL) ? NULL : dot + 1;
}
2610

2611
// Return the vgroup id recorded in a tmq result, or TSDB_CODE_INVALID_PARA
// when the handle is NULL or is not one of the four tmq result kinds.
int32_t tmq_get_vgroup_id(TAOS_RES* res) {
  if (res == NULL) return TSDB_CODE_INVALID_PARA;

  bool isTmqRes = TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) ||
                  TD_RES_TMQ_BATCH_META(res) || TD_RES_TMQ_META(res);
  return isTmqRes ? ((SMqRspObj*)res)->vgId : TSDB_CODE_INVALID_PARA;
}
2622

2623
// Return the WAL version associated with a tmq result.
// Data / metadata results use the request offset, meta / batch-meta results the
// response offset; in both cases only an offset of type TMQ_OFFSET__LOG has a
// version to return. Anything else ends in TSDB_CODE_TMQ_SNAPSHOT_ERROR, and a
// NULL handle in TSDB_CODE_INVALID_PARA.
int64_t tmq_get_vgroup_offset(TAOS_RES* res) {
  if (res == NULL) return TSDB_CODE_INVALID_PARA;

  SMqRspObj* pRspObj = (SMqRspObj*)res;

  if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) {
    STqOffsetVal* pReqOffset = &pRspObj->dataRsp.reqOffset;
    if (pReqOffset->type == TMQ_OFFSET__LOG) {
      return pReqOffset->version;
    }
    tqErrorC("invalid offset type:%d", pReqOffset->type);
  } else if (TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
    if (pRspObj->rspOffset.type == TMQ_OFFSET__LOG) {
      return pRspObj->rspOffset.version;
    }
  } else {
    tqErrorC("invalid tmq type:%d", *(int8_t*)res);
  }

  // data from tsdb, no valid offset info
  return TSDB_CODE_TMQ_SNAPSHOT_ERROR;
}
2647

2648
// Return the table name of the block the result iterator currently points at.
// Only data / metadata results can carry names; NULL is returned when the
// handle is NULL or of another kind, when the response has no table names, or
// when the iterator is outside [0, blockNum).
const char* tmq_get_table_name(TAOS_RES* res) {
  if (res == NULL) return NULL;
  if (!(TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res))) return NULL;

  SMqRspObj*  pRspObj = (SMqRspObj*)res;
  SMqDataRsp* pRsp = &pRspObj->dataRsp;

  bool hasName = pRsp->withTbName && pRsp->blockTbName != NULL &&
                 pRspObj->resIter >= 0 && pRspObj->resIter < pRsp->blockNum;
  if (!hasName) {
    return NULL;
  }
  return (const char*)taosArrayGetP(pRsp->blockTbName, pRspObj->resIter);
}
2663

2664
// Start an asynchronous commit.
// With a result handle only that result's offset is committed; with pRes ==
// NULL all offsets are committed. A NULL consumer is reported through `cb`
// (when given) with TSDB_CODE_INVALID_PARA.
void tmq_commit_async(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* cb, void* param) {
  if (tmq == NULL) {
    tqErrorC("invalid tmq handle, null");
    if (cb != NULL) {
      cb(tmq, TSDB_CODE_INVALID_PARA, param);
    }
    return;
  }

  if (pRes != NULL) {
    // only commit one offset
    asyncCommitFromResult(tmq, pRes, cb, param);
  } else {
    // here needs to commit all offsets.
    asyncCommitAllOffsets(tmq, cb, param);
  }
}
2678

2679
// Commit callback used by the synchronous commit paths: record the result code
// in the SSyncCommitInfo passed as `param` and post its semaphore.
static void commitCallBackFn(tmq_t* UNUSED_PARAM(tmq), int32_t code, void* param) {
  SSyncCommitInfo* pSync = (SSyncCommitInfo*)param;

  // store the code before posting so it is in place when the semaphore is signalled
  pSync->code = code;
  if (tsem2_post(&pSync->sem) != 0) {
    tqErrorC("failed to post rsp sem in commit cb");
  }
}
739✔
2686

2687
// Commit synchronously: start the asynchronous commit (one result, or all
// offsets when pRes is NULL) with commitCallBackFn and block on a semaphore
// until that callback has delivered the result code, which is returned.
int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* pRes) {
  if (tmq == NULL) {
    tqErrorC("invalid tmq handle, null");
    return TSDB_CODE_INVALID_PARA;
  }

  SSyncCommitInfo* pSync = taosMemoryMalloc(sizeof(SSyncCommitInfo));
  if (pSync == NULL) {
    tqErrorC("failed to allocate memory for sync commit");
    return terrno;
  }
  if (tsem2_init(&pSync->sem, 0, 0) != 0) {
    tqErrorC("failed to init sem for sync commit");
    taosMemoryFree(pSync);
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  pSync->code = 0;

  if (pRes != NULL) {
    asyncCommitFromResult(tmq, pRes, commitCallBackFn, pSync);
  } else {
    asyncCommitAllOffsets(tmq, commitCallBackFn, pSync);
  }

  if (tsem2_wait(&pSync->sem) != 0) {
    tqErrorC("failed to wait sem for sync commit");
  }
  int32_t code = pSync->code;

  if (tsem2_destroy(&pSync->sem) != 0) {
    tqErrorC("failed to destroy sem for sync commit");
  }
  taosMemoryFree(pSync);

  tqInfoC("consumer:0x%" PRIx64 " sync res commit done, code:%s", tmq->consumerId, tstrerror(code));
  return code;
}
2726

2727
// wal range will be ok after calling tmq_get_topic_assignment or poll interface
2728
static int32_t checkWalRange(SVgOffsetInfo* offset, int64_t value) {
41✔
2729
  if (offset->walVerBegin == -1 || offset->walVerEnd == -1) {
41!
2730
    tqErrorC("Assignment or poll interface need to be called first");
8!
2731
    return TSDB_CODE_TMQ_NEED_INITIALIZED;
8✔
2732
  }
2733

2734
  if (value != -1 && (value < offset->walVerBegin || value > offset->walVerEnd)) {
33!
2735
    tqErrorC("invalid seek params, offset:%" PRId64 ", valid range:[%" PRId64 ", %" PRId64 "]", value,
1!
2736
             offset->walVerBegin, offset->walVerEnd);
2737
    return TSDB_CODE_TMQ_VERSION_OUT_OF_RANGE;
1✔
2738
  }
2739

2740
  return 0;
32✔
2741
}
2742

2743
// Commit an explicit WAL offset for one vgroup of a topic and wait for the
// outcome. The vgroup lookup and the WAL range check run under the consumer's
// write latch; the commit itself is sent afterwards. A result of
// TSDB_CODE_TMQ_SAME_COMMITTED_VALUE is reported as success.
int32_t tmq_commit_offset_sync(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_t offset) {
  if (tmq == NULL || pTopicName == NULL) {
    tqErrorC("invalid tmq handle, null");
    return TSDB_CODE_INVALID_PARA;
  }

  // lookups use the "<acctId>.<topic>" form of the name
  int32_t accId = tmq->pTscObj->acctId;
  char    tname[TSDB_TOPIC_FNAME_LEN] = {0};
  (void)snprintf(tname, TSDB_TOPIC_FNAME_LEN, "%d.%s", accId, pTopicName);

  SMqClientVg* pVg = NULL;
  taosWLockLatch(&tmq->lock);
  int32_t code = getClientVg(tmq, tname, vgId, &pVg);
  if (code == 0) {
    code = checkWalRange(&pVg->offsetInfo, offset);
  }
  taosWUnLockLatch(&tmq->lock);
  if (code != 0) {
    return code;
  }

  STqOffsetVal offsetVal = {.type = TMQ_OFFSET__LOG, .version = offset};

  SSyncCommitInfo* pSync = taosMemoryMalloc(sizeof(SSyncCommitInfo));
  if (pSync == NULL) {
    tqErrorC("consumer:0x%" PRIx64 " failed to prepare seek operation", tmq->consumerId);
    return terrno;
  }
  if (tsem2_init(&pSync->sem, 0, 0) != 0) {
    taosMemoryFree(pSync);
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  pSync->code = 0;

  code = asyncCommitOffset(tmq, tname, vgId, &offsetVal, commitCallBackFn, pSync);
  if (code == 0) {
    // the request went out; wait for commitCallBackFn to deliver the result
    if (tsem2_wait(&pSync->sem) != 0) {
      tqErrorC("failed to wait sem for sync commit offset");
    }
    code = pSync->code;
  }

  if (code == TSDB_CODE_TMQ_SAME_COMMITTED_VALUE) {
    code = TSDB_CODE_SUCCESS;
  }

  if (tsem2_destroy(&pSync->sem) != 0) {
    tqErrorC("failed to destroy sem for sync commit offset");
  }
  taosMemoryFree(pSync);

  tqInfoC("consumer:0x%" PRIx64 " sync send commit to vgId:%d, offset:%" PRId64 " code:%s", tmq->consumerId, vgId,
          offset, tstrerror(code));

  return code;
}
2802

2803
// Asynchronously commit an explicit WAL offset for one vgroup of a topic.
// When validation or the send fails, `cb` (if given) is invoked from here with
// the failure code; TSDB_CODE_TMQ_SAME_COMMITTED_VALUE is reported to it as
// success. When the send returns 0, `cb` is not invoked from this function.
void tmq_commit_offset_async(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb* cb,
                             void* param) {
  int32_t code = 0;

  if (tmq == NULL || pTopicName == NULL) {
    tqErrorC("invalid tmq handle, null");
    code = TSDB_CODE_INVALID_PARA;
  } else {
    // lookups use the "<acctId>.<topic>" form of the name
    int32_t accId = tmq->pTscObj->acctId;
    char    tname[TSDB_TOPIC_FNAME_LEN] = {0};
    (void)snprintf(tname, TSDB_TOPIC_FNAME_LEN, "%d.%s", accId, pTopicName);

    SMqClientVg* pVg = NULL;
    taosWLockLatch(&tmq->lock);
    code = getClientVg(tmq, tname, vgId, &pVg);
    if (code == 0) {
      code = checkWalRange(&pVg->offsetInfo, offset);
    }
    taosWUnLockLatch(&tmq->lock);

    if (code == 0) {
      STqOffsetVal offsetVal = {.type = TMQ_OFFSET__LOG, .version = offset};

      code = asyncCommitOffset(tmq, tname, vgId, &offsetVal, cb, param);

      tqInfoC("consumer:0x%" PRIx64 " async send commit to vgId:%d, offset:%" PRId64 " code:%s", tmq->consumerId, vgId,
              offset, tstrerror(code));
    }
  }

  if (code != 0 && cb != NULL) {
    if (code == TSDB_CODE_TMQ_SAME_COMMITTED_VALUE) code = TSDB_CODE_SUCCESS;
    cb(tmq, code, param);
  }
}
20✔
2845

2846
// Advance the result iterator to the next data block and load it into the
// result info of the response object.
// On success *pResInfo points at that result info and 0 is returned. Once the
// iterator has moved past the last block TSDB_CODE_TSC_INTERNAL_ERROR is
// returned; schema or data-pointer setup failures return their own code.
int32_t tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4, SReqResultInfo** pResInfo) {
  SMqRspObj*      pRspObj = (SMqRspObj*)res;
  SMqDataRsp*     pRsp = &pRspObj->dataRsp;
  SReqResultInfo* pInfo = &pRspObj->resInfo;

  pRspObj->resIter++;
  if (pRspObj->resIter >= pRsp->blockNum) {
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  if (pRsp->withSchema) {
    // each block carries its own schema: reset the result info before applying it
    doFreeReqResultInfo(pInfo);
    SSchemaWrapper* pSchemaWrapper = (SSchemaWrapper*)taosArrayGetP(pRsp->blockSchema, pRspObj->resIter);
    if (pSchemaWrapper) {
      TAOS_CHECK_RETURN(setResSchemaInfo(pInfo, pSchemaWrapper->pSchema, pSchemaWrapper->nCols));
    }
  }

  void*   pBlock = taosArrayGetP(pRsp->blockData, pRspObj->resIter);
  void*   pRaw = NULL;
  int64_t numRows = 0;
  int32_t prec = 0;
  tmqGetRawDataRowsPrecisionFromRes(pBlock, &pRaw, &numRows, &prec);

  pInfo->pData = pRaw;
  pInfo->numOfRows = numRows;
  pInfo->current = 0;
  pInfo->precision = prec;
  pInfo->totalRows += pInfo->numOfRows;

  int32_t code = setResultDataPtr(pInfo, convertUcs4);
  if (code == 0) {
    *pResInfo = pInfo;
  }
  return code;
}
2882

2883
// Response callback for one per-vgroup WAL-info request.
// On success the response is decoded and a tmq_topic_assignment entry for the
// vgroup is appended to pCommon->pList. When the number of finished callbacks
// reaches pParam->totalReq, pCommon->rsp is posted. The message buffers are
// released here. Returns the resulting code.
//
// Two ordering rules are enforced:
//  - The response counter is bumped only AFTER this callback has finished with
//    pCommon, so the callback that posts the semaphore is the last one still
//    touching it.
//  - A failure code is sticky: a callback that succeeded never overwrites a
//    failure recorded by another one.
//    NOTE(review): this relies on pCommon->code starting at 0 -- confirm the
//    allocation site zero-initialises SMqVgCommon.
static int32_t tmqGetWalInfoCb(void* param, SDataBuf* pMsg, int32_t code) {
  if (param == NULL) {
    return code;
  }
  SMqVgWalInfoParam* pParam = param;
  SMqVgCommon*       pCommon = pParam->pCommon;

  if (code != TSDB_CODE_SUCCESS) {
    tqErrorC("consumer:0x%" PRIx64 " failed to get the wal info from vgId:%d for topic:%s", pCommon->consumerId,
             pParam->vgId, pCommon->pTopicName);

  } else {
    SMqDataRsp rsp = {0};
    SDecoder   decoder = {0};
    // the payload follows the SMqRspHead at the start of the message
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    code = tDecodeMqDataRsp(&decoder, &rsp);
    tDecoderClear(&decoder);
    if (code != 0) {
      goto END;
    }

    SMqRspHead*          pHead = pMsg->pData;
    tmq_topic_assignment assignment = {.begin = pHead->walsver,
                                       .end = pHead->walever + 1,
                                       .currentOffset = rsp.rspOffset.version,
                                       .vgId = pParam->vgId};

    (void)taosThreadMutexLock(&pCommon->mutex);
    if (taosArrayPush(pCommon->pList, &assignment) == NULL) {
      tqErrorC("consumer:0x%" PRIx64 " failed to push the wal info from vgId:%d for topic:%s", pCommon->consumerId,
               pParam->vgId, pCommon->pTopicName);
      code = TSDB_CODE_TSC_INTERNAL_ERROR;
    }
    (void)taosThreadMutexUnlock(&pCommon->mutex);
  }

END:
  if (code != 0) {
    // record failures only, under the same mutex that guards the list
    (void)taosThreadMutexLock(&pCommon->mutex);
    pCommon->code = code;
    (void)taosThreadMutexUnlock(&pCommon->mutex);
  }

  // count this response last: after the post below pCommon must not be touched
  int32_t total = atomic_add_fetch_32(&pCommon->numOfRsp, 1);
  if (total == pParam->totalReq) {
    if (tsem2_post(&pCommon->rsp) != 0) {
      tqErrorC("failed to post semaphore in get wal cb");
    }
  }

  if (pMsg) {
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
  }

  return code;
}
2935

2936
// Release an SMqVgCommon and everything it holds: the result list, the
// response semaphore, the mutex and the topic name. NULL is accepted.
static void destroyCommonInfo(SMqVgCommon* pCommon) {
  if (pCommon != NULL) {
    taosArrayDestroy(pCommon->pList);
    if (tsem2_destroy(&pCommon->rsp) != 0) {
      tqErrorC("failed to destroy semaphore for topic:%s", pCommon->pTopicName);
    }
    (void)taosThreadMutexDestroy(&pCommon->mutex);
    taosMemoryFree(pCommon->pTopicName);
    taosMemoryFree(pCommon);
  }
}
2948

2949
static bool isInSnapshotMode(int8_t type, bool useSnapshot) {
76✔
2950
  if ((type < TMQ_OFFSET__LOG && useSnapshot) || type > TMQ_OFFSET__LOG) {
76!
2951
    return true;
×
2952
  }
2953
  return false;
76✔
2954
}
2955

2956
// Response callback for a committed-offset query.
// On success the vgroup offset is decoded from the message into
// pParam->vgOffset. The final code is stored in pParam->code, the message
// buffers are released and pParam->sem is posted. Returns that code.
static int32_t tmCommittedCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqCommittedParam* pParam = param;

  if (code == 0 && pMsg) {
    SDecoder decoder = {0};
    tDecoderInit(&decoder, (uint8_t*)pMsg->pData, pMsg->len);
    if (tDecodeMqVgOffset(&decoder, &pParam->vgOffset) < 0) {
      tOffsetDestroy(&pParam->vgOffset.offset);
      code = TSDB_CODE_OUT_OF_MEMORY;
    }
    // clear the decoder on the failure path as well; skipping it leaked decoder state
    tDecoderClear(&decoder);
  }

  if (pMsg) {
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
  }

  // publish the code before posting the semaphore
  pParam->code = code;
  if (tsem2_post(&pParam->sem) != 0){
    tqErrorC("failed to post semaphore in tmCommittedCb");
  }
  return code;
}
2984

2985
// Query the committed offset of <groupId, tname> on vgroup `vgId` and wait for
// the answer.
// Returns the committed WAL version when the stored offset is of type
// TMQ_OFFSET__LOG, TSDB_CODE_TMQ_SNAPSHOT_ERROR for any other offset type, or
// the error code of whichever step failed.
//
// The result is kept in an int64_t from the point it is read: passing the
// version through the int32_t status variable truncated it.
int64_t getCommittedFromServer(tmq_t* tmq, char* tname, int32_t vgId, SEpSet* epSet) {
  int32_t     code = 0;
  SMqVgOffset pOffset = {0};

  pOffset.consumerId = tmq->consumerId;
  (void)snprintf(pOffset.offset.subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", tmq->groupId, TMQ_SEPARATOR, tname);

  int32_t len = 0;
  tEncodeSize(tEncodeMqVgOffset, &pOffset, len, code);
  if (code < 0) {
    return TSDB_CODE_INVALID_PARA;
  }

  // request layout: SMsgHead followed by the encoded SMqVgOffset
  void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
  if (buf == NULL) {
    return terrno;
  }

  ((SMsgHead*)buf)->vgId = htonl(vgId);

  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));

  SEncoder encoder = {0};
  tEncoderInit(&encoder, abuf, len);
  code = tEncodeMqVgOffset(&encoder, &pOffset);
  tEncoderClear(&encoder);
  if (code < 0) {
    taosMemoryFree(buf);
    return code;
  }

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    taosMemoryFree(buf);
    return terrno;
  }

  // zero-filled so that vgOffset is in a defined state even if the callback
  // destroys or reads it without having decoded anything
  SMqCommittedParam* pParam = taosMemoryCalloc(1, sizeof(SMqCommittedParam));
  if (pParam == NULL) {
    taosMemoryFree(buf);
    taosMemoryFree(sendInfo);
    return terrno;
  }
  if (tsem2_init(&pParam->sem, 0, 0) != 0) {
    taosMemoryFree(buf);
    taosMemoryFree(sendInfo);
    taosMemoryFree(pParam);
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  sendInfo->msgInfo = (SDataBuf){.pData = buf, .len = sizeof(SMsgHead) + len, .handle = NULL};
  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = pParam;
  sendInfo->fp = tmCommittedCb;
  sendInfo->msgType = TDMT_VND_TMQ_VG_COMMITTEDINFO;

  code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, NULL, sendInfo);
  if (code != 0) {
    // NOTE(review): buf and sendInfo are not released here; presumably
    // asyncSendMsgToServer disposes of them on failure -- confirm.
    if(tsem2_destroy(&pParam->sem) != 0) {
      tqErrorC("failed to destroy semaphore in get committed from server1");
    }
    taosMemoryFree(pParam);
    return code;
  }

  if (tsem2_wait(&pParam->sem) != 0){
    tqErrorC("failed to wait semaphore in get committed from server");
  }

  int64_t committed = pParam->code;
  if (pParam->code == TSDB_CODE_SUCCESS) {
    if (pParam->vgOffset.offset.val.type == TMQ_OFFSET__LOG) {
      committed = pParam->vgOffset.offset.val.version;
    } else {
      tOffsetDestroy(&pParam->vgOffset.offset);
      committed = TSDB_CODE_TMQ_SNAPSHOT_ERROR;
    }
  }
  if(tsem2_destroy(&pParam->sem) != 0) {
    tqErrorC("failed to destroy semaphore in get committed from server2");
  }
  taosMemoryFree(pParam);

  return committed;
}
3071

3072
// Report the current consume position of one vgroup of a topic.
// Returns the position (a WAL version), or an error code: invalid parameters,
// vgroup lookup failure, snapshot-mode offset, or an unknown WAL range.
//
// Everything read from the vgroup entry is copied while the consumer's latch
// is held, so nothing reached through pVg is dereferenced after the unlock.
// The server result stays in an int64_t so a large version is not truncated.
int64_t tmq_position(tmq_t* tmq, const char* pTopicName, int32_t vgId) {
  if (tmq == NULL || pTopicName == NULL) {
    tqErrorC("invalid tmq handle, null");
    return TSDB_CODE_INVALID_PARA;
  }

  // lookups use the "<acctId>.<topic>" form of the name
  int32_t accId = tmq->pTscObj->acctId;
  char    tname[TSDB_TOPIC_FNAME_LEN] = {0};
  (void)snprintf(tname, TSDB_TOPIC_FNAME_LEN, "%d.%s", accId, pTopicName);

  taosWLockLatch(&tmq->lock);

  SMqClientVg* pVg = NULL;
  int32_t      code = getClientVg(tmq, tname, vgId, &pVg);
  if (code != 0) {
    taosWUnLockLatch(&tmq->lock);
    return code;
  }

  SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo;
  int32_t        type = pOffsetInfo->endOffset.type;
  if (isInSnapshotMode(type, tmq->useSnapshot)) {
    tqErrorC("consumer:0x%" PRIx64 " offset type:%d not wal version, position error", tmq->consumerId, type);
    taosWUnLockLatch(&tmq->lock);
    return TSDB_CODE_TMQ_SNAPSHOT_ERROR;
  }

  code = checkWalRange(pOffsetInfo, -1);
  if (code != 0) {
    taosWUnLockLatch(&tmq->lock);
    return code;
  }

  // snapshot everything needed later while the latch is still held
  SEpSet  epSet = pVg->epSet;
  int64_t begin = pVg->offsetInfo.walVerBegin;
  int64_t end = pVg->offsetInfo.walVerEnd;
  int64_t position = 0;
  if (type == TMQ_OFFSET__LOG) {
    position = pOffsetInfo->endOffset.version;
  }
  taosWUnLockLatch(&tmq->lock);

  if (type == TMQ_OFFSET__LOG) {
    // position was captured above
  } else if (type == TMQ_OFFSET__RESET_EARLIEST || type == TMQ_OFFSET__RESET_LATEST) {
    int64_t committed = getCommittedFromServer(tmq, tname, vgId, &epSet);
    if (committed == TSDB_CODE_TMQ_NO_COMMITTED) {
      // nothing committed yet: fall back to the start or end of the WAL range
      position = (type == TMQ_OFFSET__RESET_EARLIEST) ? begin : end;
    } else {
      position = committed;
    }
  } else {
    tqErrorC("consumer:0x%" PRIx64 " offset type:%d can not be reach here", tmq->consumerId, type);
  }

  tqInfoC("consumer:0x%" PRIx64 " tmq_position vgId:%d position:%" PRId64, tmq->consumerId, vgId, position);
  return position;
}
3130

3131
/*
 * Report the committed offset of one vgroup of a topic.
 *
 * When the locally cached committed offset is already a WAL (log) offset its version is
 * returned directly; otherwise the result of getCommittedFromServer() is returned.
 *
 * Returns TSDB_CODE_INVALID_PARA when tmq or pTopicName is NULL, the getClientVg() code when
 * the topic/vgroup lookup fails, and TSDB_CODE_TMQ_SNAPSHOT_ERROR when either the cached end
 * offset or the cached committed offset is in snapshot mode.
 */
int64_t tmq_committed(tmq_t* tmq, const char* pTopicName, int32_t vgId) {
  if (NULL == tmq || NULL == pTopicName) {
    tqErrorC("invalid tmq handle, null");
    return TSDB_CODE_INVALID_PARA;
  }

  // the lookup key is "<account id>.<topic name>"
  int32_t acct = tmq->pTscObj->acctId;
  char    tname[TSDB_TOPIC_FNAME_LEN] = {0};
  (void)snprintf(tname, TSDB_TOPIC_FNAME_LEN, "%d.%s", acct, pTopicName);

  taosWLockLatch(&tmq->lock);

  SMqClientVg* pClientVg = NULL;
  int32_t      ret = getClientVg(tmq, tname, vgId, &pClientVg);
  if (ret != 0) {
    taosWUnLockLatch(&tmq->lock);
    return ret;
  }

  SVgOffsetInfo* pInfo = &pClientVg->offsetInfo;

  // neither the consumed position nor the committed position may be a snapshot offset
  if (isInSnapshotMode(pInfo->endOffset.type, tmq->useSnapshot)) {
    tqErrorC("consumer:0x%" PRIx64 " offset type:%d not wal version, committed error", tmq->consumerId,
             pInfo->endOffset.type);
    taosWUnLockLatch(&tmq->lock);
    return TSDB_CODE_TMQ_SNAPSHOT_ERROR;
  }

  if (isInSnapshotMode(pInfo->committedOffset.type, tmq->useSnapshot)) {
    tqErrorC("consumer:0x%" PRIx64 " offset type:%d not wal version, committed error", tmq->consumerId,
             pInfo->committedOffset.type);
    taosWUnLockLatch(&tmq->lock);
    return TSDB_CODE_TMQ_SNAPSHOT_ERROR;
  }

  int64_t committed = 0;
  if (pInfo->committedOffset.type != TMQ_OFFSET__LOG) {
    // copy the endpoint set while the lock is held, then query the server without the lock
    SEpSet epSet = pClientVg->epSet;
    taosWUnLockLatch(&tmq->lock);
    committed = getCommittedFromServer(tmq, tname, vgId, &epSet);
  } else {
    committed = pInfo->committedOffset.version;
    taosWUnLockLatch(&tmq->lock);
  }

  tqInfoC("consumer:0x%" PRIx64 " tmq_committed vgId:%d committed:%" PRId64, tmq->consumerId, vgId, committed);
  return committed;
}
3180

3181
int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_assignment** assignment,
27✔
3182
                                 int32_t* numOfAssignment) {
3183
  if (tmq == NULL || pTopicName == NULL || assignment == NULL || numOfAssignment == NULL) {
27!
3184
    tqErrorC("invalid tmq handle, null");
20!
3185
    return TSDB_CODE_INVALID_PARA;
20✔
3186
  }
3187
  *numOfAssignment = 0;
7✔
3188
  *assignment = NULL;
7✔
3189
  SMqVgCommon* pCommon = NULL;
7✔
3190

3191
  int32_t accId = tmq->pTscObj->acctId;
7✔
3192
  char    tname[TSDB_TOPIC_FNAME_LEN] = {0};
7✔
3193
  (void)snprintf(tname, TSDB_TOPIC_FNAME_LEN, "%d.%s", accId, pTopicName);
7✔
3194

3195
  taosWLockLatch(&tmq->lock);
7✔
3196

3197
  SMqClientTopic* pTopic = NULL;
7✔
3198
  int32_t         code = getTopicByName(tmq, tname, &pTopic);
7✔
3199
  if (code != 0) {
7!
3200
    tqErrorC("consumer:0x%" PRIx64 " invalid topic name:%s", tmq->consumerId, pTopicName);
×
3201
    goto end;
×
3202
  }
3203

3204
  // in case of snapshot is opened, no valid offset will return
3205
  *numOfAssignment = taosArrayGetSize(pTopic->vgs);
7✔
3206
  for (int32_t j = 0; j < (*numOfAssignment); ++j) {
20✔
3207
    SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j);
13✔
3208
    if (pClientVg == NULL) {
13!
3209
      continue;
×
3210
    }
3211
    int32_t type = pClientVg->offsetInfo.beginOffset.type;
13✔
3212
    if (isInSnapshotMode(type, tmq->useSnapshot)) {
13!
3213
      tqErrorC("consumer:0x%" PRIx64 " offset type:%d not wal version, assignment not allowed", tmq->consumerId, type);
×
3214
      code = TSDB_CODE_TMQ_SNAPSHOT_ERROR;
×
3215
      goto end;
×
3216
    }
3217
  }
3218

3219
  *assignment = taosMemoryCalloc(*numOfAssignment, sizeof(tmq_topic_assignment));
7✔
3220
  if (*assignment == NULL) {
7!
3221
    tqErrorC("consumer:0x%" PRIx64 " failed to malloc buffer, size:%" PRIzu, tmq->consumerId,
×
3222
             (*numOfAssignment) * sizeof(tmq_topic_assignment));
3223
    code = terrno;
×
3224
    goto end;
×
3225
  }
3226

3227
  bool needFetch = false;
7✔
3228

3229
  for (int32_t j = 0; j < (*numOfAssignment); ++j) {
18✔
3230
    SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j);
13✔
3231
    if (pClientVg == NULL) {
13!
3232
      continue;
×
3233
    }
3234
    if (pClientVg->offsetInfo.beginOffset.type != TMQ_OFFSET__LOG) {
13✔
3235
      needFetch = true;
2✔
3236
      break;
2✔
3237
    }
3238

3239
    tmq_topic_assignment* pAssignment = &(*assignment)[j];
11✔
3240
    pAssignment->currentOffset = pClientVg->offsetInfo.beginOffset.version;
11✔
3241
    pAssignment->begin = pClientVg->offsetInfo.walVerBegin;
11✔
3242
    pAssignment->end = pClientVg->offsetInfo.walVerEnd;
11✔
3243
    pAssignment->vgId = pClientVg->vgId;
11✔
3244
    tqInfoC("consumer:0x%" PRIx64 " get assignment from local:%d->%" PRId64, tmq->consumerId, pAssignment->vgId,
11!
3245
            pAssignment->currentOffset);
3246
  }
3247

3248
  if (needFetch) {
7✔
3249
    pCommon = taosMemoryCalloc(1, sizeof(SMqVgCommon));
2✔
3250
    if (pCommon == NULL) {
2!
3251
      code = terrno;
×
3252
      goto end;
×
3253
    }
3254

3255
    pCommon->pList = taosArrayInit(4, sizeof(tmq_topic_assignment));
2✔
3256
    if (pCommon->pList == NULL) {
2!
3257
      code = terrno;
×
3258
      goto end;
×
3259
    }
3260
    if (tsem2_init(&pCommon->rsp, 0, 0) != 0) {
2!
3261
      code = TSDB_CODE_OUT_OF_MEMORY;
×
3262
      goto end;
×
3263
    }
3264
    (void)taosThreadMutexInit(&pCommon->mutex, 0);
2✔
3265
    pCommon->pTopicName = taosStrdup(pTopic->topicName);
2✔
3266
    if (pCommon->pTopicName == NULL) {
2!
3267
      code = terrno;
×
3268
      goto end;
×
3269
    }
3270
    pCommon->consumerId = tmq->consumerId;
2✔
3271

3272
    for (int32_t i = 0; i < (*numOfAssignment); ++i) {
5✔
3273
      SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i);
3✔
3274
      if (pClientVg == NULL) {
3!
3275
        continue;
×
3276
      }
3277
      SMqVgWalInfoParam* pParam = taosMemoryMalloc(sizeof(SMqVgWalInfoParam));
3✔
3278
      if (pParam == NULL) {
3!
3279
        code = terrno;
×
3280
        goto end;
×
3281
      }
3282

3283
      pParam->epoch = tmq->epoch;
3✔
3284
      pParam->vgId = pClientVg->vgId;
3✔
3285
      pParam->totalReq = *numOfAssignment;
3✔
3286
      pParam->pCommon = pCommon;
3✔
3287

3288
      SMqPollReq req = {0};
3✔
3289
      tmqBuildConsumeReqImpl(&req, tmq, 10, pTopic, pClientVg);
3✔
3290
      req.reqOffset = pClientVg->offsetInfo.beginOffset;
3✔
3291

3292
      int32_t msgSize = tSerializeSMqPollReq(NULL, 0, &req);
3✔
3293
      if (msgSize < 0) {
3!
3294
        taosMemoryFree(pParam);
×
3295
        code = TSDB_CODE_OUT_OF_MEMORY;
×
3296
        goto end;
×
3297
      }
3298

3299
      char* msg = taosMemoryCalloc(1, msgSize);
3✔
3300
      if (NULL == msg) {
3!
3301
        taosMemoryFree(pParam);
×
3302
        code = terrno;
×
3303
        goto end;
×
3304
      }
3305

3306
      if (tSerializeSMqPollReq(msg, msgSize, &req) < 0) {
3!
3307
        taosMemoryFree(msg);
×
3308
        taosMemoryFree(pParam);
×
3309
        code = TSDB_CODE_OUT_OF_MEMORY;
×
3310
        goto end;
×
3311
      }
3312

3313
      SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
3✔
3314
      if (sendInfo == NULL) {
3!
3315
        taosMemoryFree(pParam);
×
3316
        taosMemoryFree(msg);
×
3317
        code = terrno;
×
3318
        goto end;
×
3319
      }
3320

3321
      sendInfo->msgInfo = (SDataBuf){.pData = msg, .len = msgSize, .handle = NULL};
3✔
3322
      sendInfo->requestId = req.reqId;
3✔
3323
      sendInfo->requestObjRefId = 0;
3✔
3324
      sendInfo->param = pParam;
3✔
3325
      sendInfo->paramFreeFp = taosMemoryFree;
3✔
3326
      sendInfo->fp = tmqGetWalInfoCb;
3✔
3327
      sendInfo->msgType = TDMT_VND_TMQ_VG_WALINFO;
3✔
3328

3329
      // int64_t transporterId = 0;
3330
      char offsetFormatBuf[TSDB_OFFSET_LEN] = {0};
3✔
3331
      tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pClientVg->offsetInfo.beginOffset);
3✔
3332

3333
      tqInfoC("consumer:0x%" PRIx64 " %s retrieve wal info vgId:%d, epoch %d, req:%s,QID:0x%" PRIx64, tmq->consumerId,
3!
3334
              pTopic->topicName, pClientVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId);
3335
      code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pClientVg->epSet, NULL, sendInfo);
3✔
3336
      if (code != 0) {
3!
3337
        goto end;
×
3338
      }
3339
    }
3340

3341
    if (tsem2_wait(&pCommon->rsp) != 0){
2!
3342
      tqErrorC("consumer:0x%" PRIx64 " failed to wait sem in get assignment", tmq->consumerId);
×
3343
    }
3344
    code = pCommon->code;
2✔
3345

3346
    if (code != TSDB_CODE_SUCCESS) {
2!
3347
      goto end;
×
3348
    }
3349
    int32_t num = taosArrayGetSize(pCommon->pList);
2✔
3350
    for (int32_t i = 0; i < num; ++i) {
5✔
3351
      (*assignment)[i] = *(tmq_topic_assignment*)taosArrayGet(pCommon->pList, i);
3✔
3352
    }
3353
    *numOfAssignment = num;
2✔
3354

3355
    for (int32_t j = 0; j < (*numOfAssignment); ++j) {
5✔
3356
      tmq_topic_assignment* p = &(*assignment)[j];
3✔
3357

3358
      for (int32_t i = 0; i < taosArrayGetSize(pTopic->vgs); ++i) {
8✔
3359
        SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i);
5✔
3360
        if (pClientVg == NULL) {
5!
3361
          continue;
×
3362
        }
3363
        if (pClientVg->vgId != p->vgId) {
5✔
3364
          continue;
2✔
3365
        }
3366

3367
        SVgOffsetInfo* pOffsetInfo = &pClientVg->offsetInfo;
3✔
3368
        tqInfoC("consumer:0x%" PRIx64 " %s vgId:%d offset is update to:%" PRId64, tmq->consumerId, pTopic->topicName,
3!
3369
                p->vgId, p->currentOffset);
3370

3371
        pOffsetInfo->walVerBegin = p->begin;
3✔
3372
        pOffsetInfo->walVerEnd = p->end;
3✔
3373
      }
3374
    }
3375
  }
3376

3377
end:
7✔
3378
  if (code != TSDB_CODE_SUCCESS) {
7!
3379
    taosMemoryFree(*assignment);
×
3380
    *assignment = NULL;
×
3381
    *numOfAssignment = 0;
×
3382
  }
3383
  destroyCommonInfo(pCommon);
7✔
3384
  taosWUnLockLatch(&tmq->lock);
7✔
3385
  return code;
7✔
3386
}
3387

3388
/*
 * Release an assignment array with taosMemoryFree(), e.g. the one handed out by
 * tmq_get_topic_assignment(). A NULL pointer is accepted and ignored.
 */
void tmq_free_assignment(tmq_topic_assignment* pAssignment) {
  if (pAssignment != NULL) {
    taosMemoryFree(pAssignment);
  }
}
3395

3396
/*
 * Response callback installed by tmq_offset_seek() for TDMT_VND_TMQ_SEEK.
 *
 * Frees the response payload and endpoint set, stores the result code in the SMqSeekParam
 * passed as param and posts its semaphore. Returns code unchanged when param is NULL,
 * otherwise 0.
 */
static int32_t tmqSeekCb(void* param, SDataBuf* pMsg, int32_t code) {
  if (pMsg != NULL) {
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
  }

  SMqSeekParam* pSeek = param;
  if (pSeek == NULL) {
    return code;
  }

  // publish the result before posting the semaphore
  pSeek->code = code;
  if (tsem2_post(&pSeek->sem) != 0) {
    tqErrorC("failed to post sem in tmqSeekCb");
  }
  return 0;
}
3411

3412
// seek interface have to send msg to server to cancel push handle if needed, because consumer may be in wait status if
3413
// there is no data to poll
3414
int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_t offset) {
36✔
3415
  if (tmq == NULL || pTopicName == NULL) {
36!
3416
    tqErrorC("invalid tmq handle, null");
×
3417
    return TSDB_CODE_INVALID_PARA;
×
3418
  }
3419

3420
  int32_t accId = tmq->pTscObj->acctId;
36✔
3421
  char    tname[TSDB_TOPIC_FNAME_LEN] = {0};
36✔
3422
  (void)snprintf(tname, TSDB_TOPIC_FNAME_LEN, "%d.%s", accId, pTopicName);
36✔
3423

3424
  taosWLockLatch(&tmq->lock);
36✔
3425

3426
  SMqClientVg* pVg = NULL;
36✔
3427
  int32_t      code = getClientVg(tmq, tname, vgId, &pVg);
36✔
3428
  if (code != 0) {
36✔
3429
    taosWUnLockLatch(&tmq->lock);
27✔
3430
    return code;
27✔
3431
  }
3432

3433
  SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo;
9✔
3434

3435
  int32_t type = pOffsetInfo->endOffset.type;
9✔
3436
  if (isInSnapshotMode(type, tmq->useSnapshot)) {
9!
3437
    tqErrorC("consumer:0x%" PRIx64 " offset type:%d not wal version, seek not allowed", tmq->consumerId, type);
×
3438
    taosWUnLockLatch(&tmq->lock);
×
3439
    return TSDB_CODE_TMQ_SNAPSHOT_ERROR;
×
3440
  }
3441

3442
  code = checkWalRange(pOffsetInfo, offset);
9✔
3443
  if (code != 0) {
9✔
3444
    taosWUnLockLatch(&tmq->lock);
5✔
3445
    return code;
5✔
3446
  }
3447

3448
  tqInfoC("consumer:0x%" PRIx64 " seek to %" PRId64 " on vgId:%d", tmq->consumerId, offset, vgId);
4!
3449
  // update the offset, and then commit to vnode
3450
  pOffsetInfo->endOffset.type = TMQ_OFFSET__LOG;
4✔
3451
  pOffsetInfo->endOffset.version = offset;
4✔
3452
  pOffsetInfo->beginOffset = pOffsetInfo->endOffset;
4✔
3453
  pVg->seekUpdated = true;
4✔
3454
  SEpSet epSet = pVg->epSet;
4✔
3455
  taosWUnLockLatch(&tmq->lock);
4✔
3456

3457
  SMqSeekReq req = {0};
4✔
3458
  (void)snprintf(req.subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s:%s", tmq->groupId, tname);
4✔
3459
  req.head.vgId = vgId;
4✔
3460
  req.consumerId = tmq->consumerId;
4✔
3461

3462
  int32_t msgSize = tSerializeSMqSeekReq(NULL, 0, &req);
4✔
3463
  if (msgSize < 0) {
4!
3464
    return TSDB_CODE_PAR_INTERNAL_ERROR;
×
3465
  }
3466

3467
  char* msg = taosMemoryCalloc(1, msgSize);
4✔
3468
  if (NULL == msg) {
4!
3469
    return terrno;
×
3470
  }
3471

3472
  if (tSerializeSMqSeekReq(msg, msgSize, &req) < 0) {
4!
3473
    taosMemoryFree(msg);
×
3474
    return TSDB_CODE_PAR_INTERNAL_ERROR;
×
3475
  }
3476

3477
  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
4✔
3478
  if (sendInfo == NULL) {
4!
3479
    taosMemoryFree(msg);
×
3480
    return terrno;
×
3481
  }
3482

3483
  SMqSeekParam* pParam = taosMemoryMalloc(sizeof(SMqSeekParam));
4✔
3484
  if (pParam == NULL) {
4!
3485
    taosMemoryFree(msg);
×
3486
    taosMemoryFree(sendInfo);
×
3487
    return terrno;
×
3488
  }
3489
  if (tsem2_init(&pParam->sem, 0, 0) != 0) {
4!
3490
    taosMemoryFree(msg);
×
3491
    taosMemoryFree(sendInfo);
×
3492
    taosMemoryFree(pParam);
×
3493
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
3494
  }
3495

3496
  sendInfo->msgInfo = (SDataBuf){.pData = msg, .len = msgSize, .handle = NULL};
4✔
3497
  sendInfo->requestId = generateRequestId();
4✔
3498
  sendInfo->requestObjRefId = 0;
4✔
3499
  sendInfo->param = pParam;
4✔
3500
  sendInfo->fp = tmqSeekCb;
4✔
3501
  sendInfo->msgType = TDMT_VND_TMQ_SEEK;
4✔
3502

3503
  code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo);
4✔
3504
  if (code != 0) {
4!
3505
    if(tsem2_destroy(&pParam->sem) != 0) {
×
3506
      tqErrorC("consumer:0x%" PRIx64 "destroy rsp sem failed in seek offset", tmq->consumerId);
×
3507
    }
3508
    taosMemoryFree(pParam);
×
3509
    return code;
×
3510
  }
3511

3512
  if (tsem2_wait(&pParam->sem) != 0){
4!
3513
    tqErrorC("consumer:0x%" PRIx64 "wait rsp sem failed in seek offset", tmq->consumerId);
×
3514
  }
3515
  code = pParam->code;
4✔
3516
  if(tsem2_destroy(&pParam->sem) != 0) {
4!
3517
    tqErrorC("consumer:0x%" PRIx64 "destroy rsp sem failed in seek offset", tmq->consumerId);
×
3518
  }
3519
  taosMemoryFree(pParam);
4✔
3520

3521
  tqInfoC("consumer:0x%" PRIx64 "send seek to vgId:%d, return code:%s", tmq->consumerId, vgId, tstrerror(code));
4!
3522

3523
  return code;
4✔
3524
}
3525

3526
/*
 * Return the connection handle of a consumer: the address of the id field of its pTscObj,
 * cast to TAOS*. Returns NULL when tmq or tmq->pTscObj is NULL.
 */
TAOS* tmq_get_connect(tmq_t* tmq) {
  if (tmq == NULL || tmq->pTscObj == NULL) {
    return NULL;
  }
  return (TAOS*)(&(tmq->pTscObj->id));
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc