• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5011

03 Apr 2026 03:59PM UTC coverage: 72.3% (+0.008%) from 72.292%
#5011

push

travis-ci

web-flow
merge: from main to 3.0 branch #35067

4053 of 5985 new or added lines in 68 files covered. (67.72%)

732 existing lines in 143 files now uncovered.

257430 of 356056 relevant lines covered (72.3%)

131834103.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.61
/source/client/src/clientTmq.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "cJSON.h"
17
#include "clientInt.h"
18
#include "clientLog.h"
19
#include "parser.h"
20
#include "tarray.h"
21
#include "tdatablock.h"
22
#include "tdef.h"
23
#include "tglobal.h"
24
#include "tmsg.h"
25
#include "tqueue.h"
26
#include "tref.h"
27
#include "ttimer.h"
28

29
#define tqErrorC(...) do { if (cDebugFlag & DEBUG_ERROR || tqClientDebugFlag & DEBUG_ERROR) { taosPrintLog("TQ  ERROR ", DEBUG_ERROR, tqClientDebugFlag|cDebugFlag, __VA_ARGS__); }} while(0)
30
#define tqInfoC(...)  do { if (cDebugFlag & DEBUG_INFO  || tqClientDebugFlag & DEBUG_INFO)  { taosPrintLog("TQ  INFO  ", DEBUG_INFO,  tqClientDebugFlag|cDebugFlag, __VA_ARGS__); }} while(0)
31
#define tqDebugC(...) do { if (cDebugFlag & DEBUG_DEBUG || tqClientDebugFlag & DEBUG_DEBUG) { taosPrintLog("TQ  DEBUG ", DEBUG_DEBUG, tqClientDebugFlag|cDebugFlag, __VA_ARGS__); }} while(0)
32
#define tqWarnC(...)  do { if (cDebugFlag & DEBUG_WARN  || tqClientDebugFlag & DEBUG_WARN)  { taosPrintLog("TQ  WARN  ", DEBUG_WARN,  tqClientDebugFlag|cDebugFlag, __VA_ARGS__); }} while(0)
33

34
#define EMPTY_BLOCK_POLL_IDLE_DURATION 10
35
#define DEFAULT_AUTO_COMMIT_INTERVAL   5000
36
#define DEFAULT_HEARTBEAT_INTERVAL     3000
37
#define DEFAULT_ASKEP_INTERVAL         1000
38
#define DEFAULT_COMMIT_CNT             1
39
#define SUBSCRIBE_RETRY_MAX_COUNT      240
40
#define SUBSCRIBE_RETRY_INTERVAL       500
41

42

43
// Copies MSG into the caller's error buffer with snprintf (always NUL-terminated when
// errstrLen > 0). The expansion references `errstr` and `errstrLen`, so both names must be
// in scope wherever the macro is used.
// NOTE(review): the expansion is a bare `if` statement that already ends in ';' and is not
// wrapped in do { } while (0); avoid using it as the unbraced body of an if/else.
#define SET_ERROR_MSG_TMQ(MSG) \
44
  if (errstr != NULL && errstrLen > 0) (void)snprintf(errstr, errstrLen, "%s", MSG);
45

46
// Decodes a poll response body into DATA using the decode function FUNC.
// The payload that follows the leading SMqRspHead in pRspWrapper->pollRsp.data is decoded
// first; afterwards the SMqRspHead bytes themselves are copied over the start of DATA.
// On decode failure the macro sets `code = terrno` and jumps to the `END` label.
// Requirements at the expansion site: `pRspWrapper`, `code` and an `END` label must exist,
// and because a local named `decoder` is declared, the macro can be expanded only once per scope.
#define PROCESS_POLL_RSP(FUNC,DATA) \
47
  SDecoder decoder = {0}; \
48
  tDecoderInit(&decoder, POINTER_SHIFT(pRspWrapper->pollRsp.data, sizeof(SMqRspHead)), pRspWrapper->pollRsp.len - sizeof(SMqRspHead)); \
49
  if (FUNC(&decoder, DATA) < 0) { \
50
    tDecoderClear(&decoder); \
51
    code = terrno; \
52
    goto END;\
53
  }\
54
  tDecoderClear(&decoder);\
55
  (void)memcpy(DATA, pRspWrapper->pollRsp.data, sizeof(SMqRspHead));
56

57
#define DELETE_POLL_RSP(FUNC,DATA) \
58
  SMqPollRspWrapper* pRsp = &rspWrapper->pollRsp;\
59
  taosMemoryFreeClear(pRsp->pEpset);             \
60
  taosMemoryFreeClear(pRsp->data);               \
61
  FUNC(DATA);
62

63
enum {
64
  TMQ_VG_STATUS__IDLE = 0,
65
  TMQ_VG_STATUS__WAIT,
66
};
67

68
enum {
69
  TMQ_CONSUMER_STATUS__INIT = 0,
70
  TMQ_CONSUMER_STATUS__READY,
71
  TMQ_CONSUMER_STATUS__CLOSED,
72
};
73

74
enum {
75
  TMQ_DELAYED_TASK__ASK_EP = 1,
76
  TMQ_DELAYED_TASK__COMMIT,
77
};
78

79
typedef struct {
80
  tmr_h               timer;
81
  int32_t             rsetId;
82
  TdThreadMutex       lock;
83
} SMqMgmt;
84

85
// Public topic-list handle. tmq_list_new() returns the pointer produced by taosArrayInit()
// cast to tmq_list_t*, so the handle and the SArray below share the same address.
// Elements are char* topic names duplicated by tmq_list_append() and released by
// tmq_list_destroy().
struct tmq_list_t {
86
  SArray container;
87
};
88

89
// Consumer configuration: allocated with defaults by tmq_conf_new(), filled in by
// tmq_conf_set() and released by tmq_conf_destroy().
// The quoted names are the tmq_conf_set() keys that write each field.
struct tmq_conf_t {
90
  char           clientId[TSDB_CLIENT_ID_LEN];  // "client.id"
91
  char           groupId[TSDB_CGROUP_LEN];      // "group.id"; values containing TMQ_SEPARATOR_CHAR are rejected
92
  int8_t         autoCommit;                    // "enable.auto.commit"; default true
93
  int8_t         enableWalMarker;               // "enable.wal.marker"; default false
94
  int8_t         resetOffset;                   // "auto.offset.reset" (none/earliest/latest); default TMQ_OFFSET__RESET_LATEST
95
  int8_t         withTbName;                    // "msg.with.table.name"; default false
96
  int8_t         snapEnable;                    // "experimental.snapshot.enable"
97
  int8_t         replayEnable;                  // "enable.replay"
98
  int8_t         sourceExcluded;  // do not consume, bit; "msg.consume.excluded" stores TD_REQ_FROM_TAOX or 0
99
  int8_t         rawData;         // fetch raw data; "msg.consume.rawdata" stores 1 or 0
100
  int32_t        maxPollWaitTime;               // "fetch.max.wait.ms"; accepted range 1..INT32_MAX
101
  int32_t        minPollRows;                   // "min.poll.rows"; accepted range 1..INT32_MAX
102
  uint16_t       port;                          // "td.connect.port"; accepted range 1..65535
103
  int32_t        autoCommitInterval;            // "auto.commit.interval.ms"; non-negative, clamped to INT32_MAX
104
  int32_t        sessionTimeoutMs;              // "session.timeout.ms"; accepted range 6000..1800000
105
  int32_t        heartBeatIntervalMs;           // "heartbeat.interval.ms"; >= 1000 and < sessionTimeoutMs
106
  int32_t        maxPollIntervalMs;             // "max.poll.interval.ms"; >= 1000
107
  char*          ip;                            // "td.connect.ip"; heap copy owned by this struct
108
  char*          user;                          // "td.connect.user"; heap copy owned by this struct
109
  char*          pass;                          // "td.connect.pass"; heap copy owned by this struct
110
  tmq_commit_cb* commitCb;
111
  void*          commitCbUserParam;
112
  int8_t         enableBatchMeta;               // "msg.enable.batchmeta"; default false
113
  char*          token;                         // "td.connect.token"; heap copy owned by this struct
114
};
115

116
struct tmq_t {
117
  int64_t        refId;
118
  char           groupId[TSDB_CGROUP_LEN];
119
  char           clientId[TSDB_CLIENT_ID_LEN];
120
  char           user[TSDB_USER_LEN];
121
  char           fqdn[TSDB_FQDN_LEN];
122
  int8_t         withTbName;
123
  int8_t         useSnapshot;
124
  int8_t         autoCommit;
125
  int8_t         enableWalMarker;
126
  int32_t        autoCommitInterval;
127
  int32_t        sessionTimeoutMs;
128
  int32_t        heartBeatIntervalMs;
129
  int32_t        maxPollIntervalMs;
130
  int8_t         resetOffsetCfg;
131
  int8_t         replayEnable;
132
  int8_t         sourceExcluded;  // do not consume, bit
133
  int8_t         rawData;         // fetch raw data
134
  int32_t        maxPollWaitTime;
135
  int32_t        minPollRows;
136
  int64_t        consumerId;
137
  tmq_commit_cb* commitCb;
138
  void*          commitCbUserParam;
139
  int8_t         enableBatchMeta;
140

141
  // status
142
  SRWLatch lock;
143
  int8_t   status;
144
  int32_t  epoch;
145
  // poll info
146
  int64_t pollCnt;
147
  int64_t totalRows;
148
  int8_t  pollFlag;
149

150
  // timer
151
  tmr_h       hbLiveTimer;
152
  tmr_h       epTimer;
153
  tmr_h       commitTimer;
154
  STscObj*    pTscObj;       // connection
155
  SArray*     clientTopics;  // SArray<SMqClientTopic>
156
  STaosQueue* mqueue;        // queue of rsp
157
  STaosQueue* delayedTask;  // delayed task queue for heartbeat and auto commit
158
  tsem2_t     rspSem;
159

160
  // token
161
  int32_t tokenCode;
162
};
163

164
typedef struct {
165
  int32_t code;
166
  tsem2_t sem;
167
} SAskEpInfo;
168

169
typedef struct {
170
  STqOffsetVal committedOffset;
171
  STqOffsetVal endOffset;    // the last version in TAOS_RES + 1
172
  STqOffsetVal beginOffset;  // the first version in TAOS_RES
173
  int64_t      walVerBegin;
174
  int64_t      walVerEnd;
175
} SVgOffsetInfo;
176

177
// Per-vgroup client state; instances are stored in SMqClientTopic.vgs.
// innerCommit() reads vgId/epSet to address the commit message and updates
// offsetInfo.committedOffset after the commit message has been sent.
typedef struct {
178
  int64_t       pollCnt;
179
  int64_t       numOfRows;
180
  SVgOffsetInfo offsetInfo;
181
  int32_t       vgId;
182
  int32_t       vgStatus;             // presumably one of TMQ_VG_STATUS__* -- TODO confirm
183
  int32_t       vgSkipCnt;            // here used to mark the slow vgroups
184
  int64_t       emptyBlockReceiveTs;  // once empty block is received, idle for ignoreCnt then start to poll data
185
  int64_t       blockReceiveTs;       // NOTE(review): previous comment was a copy of the emptyBlockReceiveTs one; presumably the receive time of the last block -- confirm
186
  int64_t       blockSleepForReplay;  // NOTE(review): previous comment was a copy of the emptyBlockReceiveTs one; presumably a replay-mode sleep duration -- confirm
187
  bool          seekUpdated;          // offset is updated by seek operator, therefore, not update by vnode rsp.
188
  SEpSet        epSet;
189
} SMqClientVg;
190

191
typedef struct {
192
  char           topicName[TSDB_TOPIC_FNAME_LEN];
193
  char           db[TSDB_DB_FNAME_LEN];
194
  SArray*        vgs;  // SArray<SMqClientVg>
195
  int8_t         noPrivilege;
196
} SMqClientTopic;
197

198
typedef struct {
199
  int32_t         vgId;
200
  char            topicName[TSDB_TOPIC_FNAME_LEN];
201
  SMqClientTopic* topicHandle;
202
  uint64_t        reqId;
203
  SEpSet*         pEpset;
204
  void*           data;
205
  uint32_t        len;
206
  union {
207
    struct{
208
      SMqRspHead   head;
209
      STqOffsetVal rspOffset;
210
    };
211
    SMqDataRsp      dataRsp;
212
    SMqMetaRsp      metaRsp;
213
    SMqBatchMetaRsp batchMetaRsp;
214
  };
215
} SMqPollRspWrapper;
216

217
typedef struct {
218
  int32_t code;
219
  int8_t  tmqRspType;
220
  int32_t epoch;
221
  union{
222
    SMqPollRspWrapper pollRsp;
223
    SMqAskEpRsp       epRsp;
224
  };
225
} SMqRspWrapper;
226

227
typedef struct {
228
  tsem2_t rspSem;
229
  int32_t rspErr;
230
} SMqSubscribeCbParam;
231

232
typedef struct {
233
  int64_t refId;
234
  bool    sync;
235
  void*   pParam;
236
} SMqAskEpCbParam;
237

238
typedef struct {
239
  int64_t  refId;
240
  char     topicName[TSDB_TOPIC_FNAME_LEN];
241
  int32_t  vgId;
242
  uint64_t requestId;  // request id for debug purpose
243
} SMqPollCbParam;
244

245
typedef struct {
246
  tsem2_t       rsp;
247
  int32_t       numOfRsp;
248
  SArray*       pList;
249
  TdThreadMutex mutex;
250
  int64_t       consumerId;
251
  char*         pTopicName;
252
  int32_t       code;
253
} SMqVgCommon;
254

255
typedef struct {
256
  tsem2_t sem;
257
  int32_t code;
258
} SMqSeekParam;
259

260
typedef struct {
261
  tsem2_t     sem;
262
  int32_t     code;
263
  SMqVgOffset vgOffset;
264
} SMqCommittedParam;
265

266
typedef struct {
267
  int32_t      vgId;
268
  int32_t      epoch;
269
  int32_t      totalReq;
270
  SMqVgCommon* pCommon;
271
} SMqVgWalInfoParam;
272

273
// State shared by every commit request sent with the same parameter set.
// Allocated by prepareCommitCbParamSet() and freed by tmqCommitDone().
typedef struct {
274
  int64_t        refId;          // tmq ref id; tmqCommitDone() uses it to re-acquire the consumer
275
  int32_t        epoch;          // value of tmq->epoch when the set was prepared
276
  int32_t        waitingRspNum;  // outstanding responses, updated atomically; tmqCommitDone() runs when it reaches 0
277
  int32_t        code;           // passed to callbackFn as the commit result
278
  tmq_commit_cb* callbackFn;     // optional callback invoked by tmqCommitDone()
279
  void*          userParam;      // opaque argument forwarded to callbackFn
280
} SMqCommitCbParamSet;
281

282
// Per-request callback argument built by doSendCommitMsg() and consumed by tmqCommitCb().
typedef struct {
283
  SMqCommitCbParamSet* params;                           // shared set whose waitingRspNum is counted down on response
284
  char                 topicName[TSDB_TOPIC_FNAME_LEN];  // topic the commit was sent for (used in logs)
285
  int32_t              vgId;                             // target vgroup (used in logs)
286
  int64_t              consumerId;                       // copied from tmq->consumerId (used in logs)
287
} SMqCommitCbParam;
288

289
typedef struct {
290
  tsem2_t sem;
291
  int32_t code;
292
} SSyncCommitInfo;
293

294
typedef struct {
295
  STqOffsetVal currentOffset;
296
  STqOffsetVal commitOffset;
297
  STqOffsetVal seekOffset;
298
  int64_t      numOfRows;
299
  int32_t      vgStatus;
300
  int64_t      walVerBegin;
301
  int64_t      walVerEnd;
302
} SVgroupSaveInfo;
303

304
static   TdThreadOnce   tmqInit = PTHREAD_ONCE_INIT;  // initialize only once
305
volatile int32_t        tmqInitRes = 0;               // initialize rsp code
306
static   SMqMgmt        tmqMgmt = {0};
307

308
tmq_conf_t* tmq_conf_new() {
117,820✔
309
  tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
117,820✔
310
  if (conf == NULL) {
118,253✔
311
    return conf;
×
312
  }
313

314
  conf->withTbName = false;
118,253✔
315
  conf->autoCommit = true;
118,253✔
316
  conf->enableWalMarker = false;
118,253✔
317
  conf->autoCommitInterval = DEFAULT_AUTO_COMMIT_INTERVAL;
118,253✔
318
  conf->resetOffset = TMQ_OFFSET__RESET_LATEST;
118,253✔
319
  conf->enableBatchMeta = false;
118,253✔
320
  conf->heartBeatIntervalMs = DEFAULT_HEARTBEAT_INTERVAL;
118,253✔
321
  conf->maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL;
118,253✔
322
  conf->sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT;
118,253✔
323
  conf->maxPollWaitTime = DEFAULT_MAX_POLL_WAIT_TIME;
118,253✔
324
  conf->minPollRows = DEFAULT_MIN_POLL_ROWS;
118,253✔
325

326
  return conf;
118,253✔
327
}
328

329
// Releases a configuration created by tmq_conf_new(), including the heap copies of the
// connection strings stored by tmq_conf_set(). Passing NULL is a no-op.
void tmq_conf_destroy(tmq_conf_t* conf) {
  if (conf == NULL) {
    return;
  }

  // No per-field NULL guards: tmq_conf_set() already calls taosMemoryFree() on these
  // fields while they may still be NULL, so freeing NULL is an accepted no-op here.
  taosMemoryFree(conf->ip);
  taosMemoryFree(conf->user);
  taosMemoryFree(conf->pass);
  taosMemoryFree(conf->token);
  taosMemoryFree(conf);
}
118,253✔
346

347
tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
829,098✔
348
  int32_t code = 0;
829,098✔
349
  if (conf == NULL || key == NULL || value == NULL) {
829,098✔
350
    tqErrorC("tmq_conf_set null, conf:%p key:%p value:%p", conf, key, value);
×
351
    return TMQ_CONF_INVALID;
×
352
  }
353
  if (strcasecmp(key, "group.id") == 0) {
829,436✔
354
    if (strchr(value, TMQ_SEPARATOR_CHAR) != NULL) {
118,253✔
355
      tqErrorC("invalid group.id:%s, can not contains ':'", value);
×
356
      return TMQ_CONF_INVALID;
×
357
    }
358
    tstrncpy(conf->groupId, value, TSDB_CGROUP_LEN);
118,253✔
359
    return TMQ_CONF_OK;
118,253✔
360
  }
361

362
  if (strcasecmp(key, "client.id") == 0) {
711,183✔
363
    tstrncpy(conf->clientId, value, TSDB_CLIENT_ID_LEN);
15,572✔
364
    return TMQ_CONF_OK;
15,572✔
365
  }
366

367
  if (strcasecmp(key, "enable.auto.commit") == 0) {
695,611✔
368
    if (strcasecmp(value, "true") == 0) {
112,017✔
369
      conf->autoCommit = true;
62,925✔
370
      return TMQ_CONF_OK;
62,925✔
371
    } else if (strcasecmp(value, "false") == 0) {
49,092✔
372
      conf->autoCommit = false;
49,092✔
373
      return TMQ_CONF_OK;
49,092✔
374
    } else {
375
      tqErrorC("invalid value for enable.auto.commit:%s", value);
×
376
      return TMQ_CONF_INVALID;
×
377
    }
378
  }
379

380
  if (strcasecmp(key, "enable.wal.marker") == 0) {
583,594✔
381
    if (strcasecmp(value, "true") == 0) {
×
382
      conf->enableWalMarker = true;
×
383
      return TMQ_CONF_OK;
×
384
    } else if (strcasecmp(value, "false") == 0) {
×
385
      conf->enableWalMarker = false;
×
386
      return TMQ_CONF_OK;
×
387
    } else {
388
      tqErrorC("invalid value for enable.wal.marker:%s", value);
×
389
      return TMQ_CONF_INVALID;
×
390
    }
391
  }
392

393
  if (strcasecmp(key, "auto.commit.interval.ms") == 0) {
583,594✔
394
    int64_t tmp;
83,022✔
395
    code = taosStr2int64(value, &tmp);
89,143✔
396
    if (tmp < 0 || code != 0) {
89,143✔
397
      tqErrorC("invalid value for auto.commit.interval.ms:%s", value);
×
398
      return TMQ_CONF_INVALID;
×
399
    }
400
    conf->autoCommitInterval = (tmp > INT32_MAX ? INT32_MAX : tmp);
89,143✔
401
    return TMQ_CONF_OK;
89,143✔
402
  }
403

404
  if (strcasecmp(key, "session.timeout.ms") == 0) {
494,451✔
405
    int64_t tmp;
1,400✔
406
    code = taosStr2int64(value, &tmp);
1,675✔
407
    if (tmp < 6000 || tmp > 1800000 || code != 0) {
1,675✔
408
      tqErrorC("invalid value for session.timeout.ms:%s", value);
×
409
      return TMQ_CONF_INVALID;
×
410
    }
411
    conf->sessionTimeoutMs = tmp;
1,675✔
412
    return TMQ_CONF_OK;
1,675✔
413
  }
414

415
  if (strcasecmp(key, "heartbeat.interval.ms") == 0) {
492,776✔
416
    int64_t tmp;
346✔
417
    code = taosStr2int64(value, &tmp);
346✔
418
    if (tmp < 1000 || tmp >= conf->sessionTimeoutMs || code != 0) {
346✔
419
      tqErrorC("invalid value for heartbeat.interval.ms:%s", value);
346✔
420
      return TMQ_CONF_INVALID;
346✔
421
    }
422
    conf->heartBeatIntervalMs = tmp;
×
423
    return TMQ_CONF_OK;
×
424
  }
425

426
  if (strcasecmp(key, "max.poll.interval.ms") == 0) {
492,430✔
427
    int32_t tmp;
1,102✔
428
    code = taosStr2int32(value, &tmp);
1,102✔
429
    if (tmp < 1000 || code != 0) {
1,102✔
430
      tqErrorC("invalid value for max.poll.interval.ms:%s", value);
×
431
      return TMQ_CONF_INVALID;
×
432
    }
433
    conf->maxPollIntervalMs = tmp;
1,102✔
434
    return TMQ_CONF_OK;
1,102✔
435
  }
436

437
  if (strcasecmp(key, "auto.offset.reset") == 0) {
491,328✔
438
    if (strcasecmp(value, "none") == 0) {
106,948✔
439
      conf->resetOffset = TMQ_OFFSET__RESET_NONE;
2,648✔
440
      return TMQ_CONF_OK;
2,648✔
441
    } else if (strcasecmp(value, "earliest") == 0) {
104,300✔
442
      conf->resetOffset = TMQ_OFFSET__RESET_EARLIEST;
97,595✔
443
      return TMQ_CONF_OK;
97,595✔
444
    } else if (strcasecmp(value, "latest") == 0) {
6,705✔
445
      conf->resetOffset = TMQ_OFFSET__RESET_LATEST;
6,705✔
446
      return TMQ_CONF_OK;
6,705✔
447
    } else {
448
      tqErrorC("invalid value for auto.offset.reset:%s", value);
×
449
      return TMQ_CONF_INVALID;
×
450
    }
451
  }
452

453
  if (strcasecmp(key, "msg.with.table.name") == 0) {
384,380✔
454
    if (strcasecmp(value, "true") == 0) {
93,223✔
455
      conf->withTbName = true;
87,802✔
456
      return TMQ_CONF_OK;
87,802✔
457
    } else if (strcasecmp(value, "false") == 0) {
5,421✔
458
      conf->withTbName = false;
5,421✔
459
      return TMQ_CONF_OK;
5,421✔
460
    } else {
461
      tqErrorC("invalid value for msg.with.table.name:%s", value);
×
462
      return TMQ_CONF_INVALID;
×
463
    }
464
  }
465

466
  if (strcasecmp(key, "experimental.snapshot.enable") == 0) {
291,157✔
467
    if (strcasecmp(value, "true") == 0) {
38,252✔
468
      conf->snapEnable = true;
29,830✔
469
      return TMQ_CONF_OK;
29,830✔
470
    } else if (strcasecmp(value, "false") == 0) {
8,422✔
471
      conf->snapEnable = false;
8,422✔
472
      return TMQ_CONF_OK;
8,422✔
473
    } else {
474
      tqErrorC("invalid value for experimental.snapshot.enable:%s", value);
×
475
      return TMQ_CONF_INVALID;
×
476
    }
477
  }
478

479
  if (strcasecmp(key, "td.connect.ip") == 0) {
252,905✔
480
    void *tmp = taosStrdup(value);
5,537✔
481
    if (tmp == NULL) {
5,537✔
482
      tqErrorC("tmq_conf_set out of memory:%d", terrno);
×
483
      return TMQ_CONF_INVALID;
×
484
    }
485
    taosMemoryFree(conf->ip);
5,537✔
486
    conf->ip = tmp;
5,537✔
487
    return TMQ_CONF_OK;
5,537✔
488
  }
489

490
  if (strcasecmp(key, "td.connect.user") == 0) {
247,368✔
491
    void *tmp = taosStrdup(value);
117,377✔
492
    if (tmp == NULL) {
117,382✔
493
      tqErrorC("tmq_conf_set out of memory:%d", terrno);
×
494
      return TMQ_CONF_INVALID;
×
495
    }
496
    taosMemoryFree(conf->user);
117,382✔
497
    conf->user = tmp;
117,432✔
498
    return TMQ_CONF_OK;
117,432✔
499
  }
500

501
  if (strcasecmp(key, "td.connect.pass") == 0) {
129,991✔
502
    void *tmp = taosStrdup(value);
117,477✔
503
    if (tmp == NULL) {
117,427✔
504
      tqErrorC("tmq_conf_set out of memory:%d", terrno);
×
505
      return TMQ_CONF_INVALID;
×
506
    }
507
    taosMemoryFree(conf->pass);
117,427✔
508
    conf->pass = tmp;
117,477✔
509
    return TMQ_CONF_OK;
117,477✔
510
  }
511

512
  if (strcasecmp(key, "td.connect.token") == 0) {
12,514✔
513
    void *tmp = taosStrdup(value);
322✔
514
    if (tmp == NULL) {
322✔
515
      tqErrorC("tmq_conf_set out of memory:%d", terrno);
×
516
      return TMQ_CONF_INVALID;
×
517
    }
518
    taosMemoryFree(conf->token);
322✔
519
    conf->token = tmp;
322✔
520
    return TMQ_CONF_OK;
322✔
521
  }
522

523
  if (strcasecmp(key, "td.connect.port") == 0) {
12,192✔
524
    int64_t tmp;
4,423✔
525
    code = taosStr2int64(value, &tmp);
4,597✔
526
    if (tmp <= 0 || tmp > 65535 || code != 0) {
4,597✔
527
      tqErrorC("invalid value for td.connect.port:%s", value);
217✔
528
      return TMQ_CONF_INVALID;
217✔
529
    }
530

531
    conf->port = tmp;
4,380✔
532
    return TMQ_CONF_OK;
4,380✔
533
  }
534

535
  if (strcasecmp(key, "enable.replay") == 0) {
7,595✔
536
    if (strcasecmp(value, "true") == 0) {
2,044✔
537
      conf->replayEnable = true;
2,044✔
538
      return TMQ_CONF_OK;
2,044✔
539
    } else if (strcasecmp(value, "false") == 0) {
×
540
      conf->replayEnable = false;
×
541
      return TMQ_CONF_OK;
×
542
    } else {
543
      tqErrorC("invalid value for enable.replay:%s", value);
×
544
      return TMQ_CONF_INVALID;
×
545
    }
546
  }
547
  if (strcasecmp(key, "msg.consume.excluded") == 0) {
5,551✔
548
    int64_t tmp = 0;
1,380✔
549
    code = taosStr2int64(value, &tmp);
1,380✔
550
    conf->sourceExcluded = (0 == code && tmp != 0) ? TD_REQ_FROM_TAOX : 0;
1,380✔
551
    return TMQ_CONF_OK;
1,380✔
552
  }
553
  if (strcasecmp(key, "msg.consume.rawdata") == 0) {
4,171✔
554
    int64_t tmp = 0;
×
555
    code = taosStr2int64(value, &tmp);
×
556
    conf->rawData = (0 == code && tmp != 0) ? 1 : 0;
×
557
    return TMQ_CONF_OK;
×
558
  }
559

560
  if (strcasecmp(key, "fetch.max.wait.ms") == 0) {
4,171✔
561
    int64_t tmp = 0;
756✔
562
    code = taosStr2int64(value, &tmp);
756✔
563
    if (tmp <= 0 || tmp > INT32_MAX || code != 0) {
756✔
564
      tqErrorC("invalid value for fetch.max.wait.ms:%s", value);
×
565
      return TMQ_CONF_INVALID;
×
566
    }
567
    conf->maxPollWaitTime = tmp;
756✔
568
    return TMQ_CONF_OK;
756✔
569
  }
570

571
  if (strcasecmp(key, "min.poll.rows") == 0) {
3,415✔
572
    int64_t tmp = 0;
3,345✔
573
    code = taosStr2int64(value, &tmp);
3,345✔
574
    if (tmp <= 0 || tmp > INT32_MAX || code != 0) {
3,345✔
575
      tqErrorC("invalid value for min.poll.rows:%s", value);
×
576
      return TMQ_CONF_INVALID;
×
577
    }
578
    conf->minPollRows = tmp;
3,345✔
579
    return TMQ_CONF_OK;
3,345✔
580
  }
581

582
  if (strcasecmp(key, "td.connect.db") == 0) {
70✔
583
    return TMQ_CONF_OK;
×
584
  }
585

586
  if (strcasecmp(key, "msg.enable.batchmeta") == 0) {
70✔
587
    int64_t tmp;
×
588
    code = taosStr2int64(value, &tmp);
×
589
    conf->enableBatchMeta = (0 == code && tmp != 0) ? true : false;
×
590
    return TMQ_CONF_OK;
×
591
  }
592

593
  tqErrorC("unknown key:%s", key);
70✔
594
  return TMQ_CONF_UNKNOWN;
20✔
595
}
596

597
tmq_list_t* tmq_list_new() {
342,648✔
598
  return (tmq_list_t*)taosArrayInit(0, sizeof(void*));
342,648✔
599
}
600

601
// Appends a heap copy of the topic name `src` to `list`.
// Returns 0 on success, TSDB_CODE_INVALID_PARA for a NULL list or a NULL/empty name,
// and terrno when duplicating the name or growing the array fails.
int32_t tmq_list_append(tmq_list_t* list, const char* src) {
  if (list == NULL || src == NULL || src[0] == 0) {
    return TSDB_CODE_INVALID_PARA;
  }

  char* pDup = taosStrdup(src);
  if (pDup == NULL) {
    return terrno;
  }

  // The array stores the pointer itself, so ownership of pDup moves to the list on success.
  if (taosArrayPush(&list->container, &pDup) != NULL) {
    return 0;
  }

  taosMemoryFree(pDup);
  return terrno;
}
617

618
// Destroys a topic list, passing taosMemFree as the per-element release function.
// A NULL list is ignored.
void tmq_list_destroy(tmq_list_t* list) {
  if (list != NULL) {
    taosArrayDestroyP(&list->container, taosMemFree);
  }
}
623

624
int32_t tmq_list_get_size(const tmq_list_t* list) {
9,166✔
625
  if (list == NULL) {
9,166✔
626
    return TSDB_CODE_INVALID_PARA;
×
627
  }
628
  const SArray* container = &list->container;
9,166✔
629
  return taosArrayGetSize(container);
9,166✔
630
}
631

632
char** tmq_list_to_c_array(const tmq_list_t* list) {
5,583✔
633
  if (list == NULL) {
5,583✔
634
    return NULL;
×
635
  }
636
  const SArray* container = &list->container;
5,583✔
637
  return container->pData;
5,583✔
638
}
639

640
// Finishes a commit: invokes the optional user callback, frees the parameter set and
// drops the consumer reference taken here.
// The callback is still invoked when the consumer can no longer be acquired; in that case
// it receives a NULL tmq and this function returns TSDB_CODE_TMQ_CONSUMER_CLOSED.
// Otherwise the result of taosReleaseRef() is returned.
static int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) {
  if (pParamSet == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  // Read the ref id up front: pParamSet is freed before the reference is released.
  const int64_t consumerRef = pParamSet->refId;
  tmq_t*        pTmq = taosAcquireRef(tmqMgmt.rsetId, consumerRef);
  int32_t       ret = (pTmq != NULL) ? 0 : TSDB_CODE_TMQ_CONSUMER_CLOSED;

  if (pParamSet->callbackFn != NULL) {
    pParamSet->callbackFn(pTmq, pParamSet->code, pParamSet->userParam);
  }
  taosMemoryFree(pParamSet);

  if (pTmq != NULL) {
    ret = taosReleaseRef(tmqMgmt.rsetId, consumerRef);
  }
  return ret;
}
663

664
// Atomically decrements the number of outstanding commit responses in `pParamSet`.
// When the counter reaches zero the commit is completed through tmqCommitDone() (which frees
// pParamSet) and its result is returned; otherwise 0 is returned.
// consumerId, pTopic and vgId are used for logging only.
static int32_t commitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId) {
  if (pParamSet == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  const int32_t remaining = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
  if (remaining != 0) {
    tqDebugC("consumer:0x%" PRIx64 " topic:%s vgId:%d commit-rsp received, remain:%d", consumerId, pTopic, vgId,
             remaining);
    return 0;
  }

  tqDebugC("consumer:0x%" PRIx64 " topic:%s vgId:%d all commit-rsp received, commit completed", consumerId, pTopic,
           vgId);
  return tmqCommitDone(pParamSet);
}
679

680
// Response callback installed by doSendCommitMsg() for a commit-offset request.
// Releases the response buffers, then counts down the shared parameter set referenced by
// `param` (an SMqCommitCbParam). The `code` argument is not inspected here.
// Returns TSDB_CODE_INVALID_PARA when `param` is NULL, otherwise the result of commitRspCountDown().
static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
  if (pBuf != NULL) {
    taosMemoryFreeClear(pBuf->pData);
    taosMemoryFreeClear(pBuf->pEpSet);
  }

  if (param == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  SMqCommitCbParam* pCbParam = (SMqCommitCbParam*)param;
  return commitRspCountDown((SMqCommitCbParamSet*)pCbParam->params, pCbParam->consumerId, pCbParam->topicName,
                            pCbParam->vgId);
}
693

694
static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffsetVal* offset, const char* pTopicName,
3,565,053✔
695
                               SMqCommitCbParamSet* pParamSet) {
696
  if (tmq == NULL || epSet == NULL || offset == NULL || pTopicName == NULL || pParamSet == NULL) {
3,565,053✔
697
    return TSDB_CODE_INVALID_PARA;
×
698
  }
699
  SMqVgOffset pOffset = {0};
3,565,391✔
700

701
  pOffset.consumerId = tmq->consumerId;
3,565,391✔
702
  pOffset.offset.val = *offset;
3,565,053✔
703
  (void)snprintf(pOffset.offset.subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", tmq->groupId, TMQ_SEPARATOR, pTopicName);
3,565,053✔
704
  int32_t len = 0;
3,565,053✔
705
  int32_t code = 0;
3,565,053✔
706
  tEncodeSize(tEncodeMqVgOffset, &pOffset, len, code);
3,565,053✔
707
  if (code < 0) {
3,565,053✔
708
    return TSDB_CODE_INVALID_PARA;
×
709
  }
710

711
  void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
3,565,053✔
712
  if (buf == NULL) {
3,565,053✔
713
    return terrno;
×
714
  }
715

716
  ((SMsgHead*)buf)->vgId = htonl(vgId);
3,565,053✔
717

718
  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
3,564,715✔
719

720
  SEncoder encoder = {0};
3,564,715✔
721
  tEncoderInit(&encoder, abuf, len);
3,564,039✔
722
  if (tEncodeMqVgOffset(&encoder, &pOffset) < 0) {
3,565,053✔
723
    tEncoderClear(&encoder);
×
724
    taosMemoryFree(buf);
×
725
    return TSDB_CODE_INVALID_PARA;
×
726
  }
727
  tEncoderClear(&encoder);
3,565,053✔
728

729
  // build param
730
  SMqCommitCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam));
3,565,053✔
731
  if (pParam == NULL) {
3,564,377✔
732
    taosMemoryFree(buf);
×
733
    return terrno;
×
734
  }
735

736
  pParam->params = pParamSet;
3,564,377✔
737
  pParam->vgId = vgId;
3,564,715✔
738
  pParam->consumerId = tmq->consumerId;
3,564,715✔
739

740
  tstrncpy(pParam->topicName, pTopicName, tListLen(pParam->topicName));
3,564,377✔
741

742
  // build send info
743
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
3,564,377✔
744
  if (pMsgSendInfo == NULL) {
3,565,053✔
745
    taosMemoryFree(buf);
×
746
    taosMemoryFree(pParam);
×
747
    return terrno;
×
748
  }
749

750
  pMsgSendInfo->msgInfo = (SDataBuf){.pData = buf, .len = sizeof(SMsgHead) + len, .handle = NULL};
3,565,053✔
751

752
  pMsgSendInfo->requestId = generateRequestId();
3,564,377✔
753
  pMsgSendInfo->requestObjRefId = 0;
3,565,391✔
754
  pMsgSendInfo->param = pParam;
3,565,053✔
755
  pMsgSendInfo->paramFreeFp = taosAutoMemoryFree;
3,565,391✔
756
  pMsgSendInfo->fp = tmqCommitCb;
3,565,053✔
757
  pMsgSendInfo->msgType = TDMT_VND_TMQ_COMMIT_OFFSET;
3,564,715✔
758

759
  // int64_t transporterId = 0;
760
  (void)atomic_add_fetch_32(&pParamSet->waitingRspNum, 1);
3,565,391✔
761
  code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, NULL, pMsgSendInfo);
3,565,391✔
762
  if (code != 0) {
3,565,391✔
763
    (void)atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
×
764
  }
765
  return code;
3,565,391✔
766
}
767

768
// Looks up the subscribed topic whose name equals `pTopicName` (exact, case-sensitive match)
// in tmq->clientTopics and stores it in *topic.
// Returns 0 when found, TSDB_CODE_INVALID_PARA for NULL arguments and
// TSDB_CODE_TMQ_INVALID_TOPIC when no topic matches (*topic is left untouched in that case).
static int32_t getTopicByName(tmq_t* tmq, const char* pTopicName, SMqClientTopic** topic) {
  if (tmq == NULL || pTopicName == NULL || topic == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  const int32_t topicCnt = taosArrayGetSize(tmq->clientTopics);
  for (int32_t idx = 0; idx < topicCnt; ++idx) {
    SMqClientTopic* pCandidate = taosArrayGet(tmq->clientTopics, idx);
    if (pCandidate != NULL && strcmp(pCandidate->topicName, pTopicName) == 0) {
      *topic = pCandidate;
      return 0;
    }
  }

  tqErrorC("consumer:0x%" PRIx64 ", total:%d, failed to find topic:%s", tmq->consumerId, topicCnt, pTopicName);
  return TSDB_CODE_TMQ_INVALID_TOPIC;
}
785

786
static int32_t prepareCommitCbParamSet(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam, int32_t rspNum,
3,436,790✔
787
                                       SMqCommitCbParamSet** ppParamSet) {
788
  if (tmq == NULL || ppParamSet == NULL) {
3,436,790✔
789
    return TSDB_CODE_INVALID_PARA;
×
790
  }
791
  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
3,436,790✔
792
  if (pParamSet == NULL) {
3,436,790✔
793
    return terrno;
×
794
  }
795

796
  pParamSet->refId = tmq->refId;
3,436,790✔
797
  pParamSet->epoch = atomic_load_32(&tmq->epoch);
3,436,790✔
798
  pParamSet->callbackFn = pCommitFp;
3,436,790✔
799
  pParamSet->userParam = userParam;
3,436,790✔
800
  pParamSet->waitingRspNum = rspNum;
3,436,790✔
801
  *ppParamSet = pParamSet;
3,436,790✔
802
  return 0;
3,436,790✔
803
}
804

805
// Finds the client-side vgroup entry with id `vgId` inside topic `pTopicName`.
//
// *pVg is reset to NULL before the search, so the result never depends on what the caller
// left in it: on success *pVg points at the matching entry, on any lookup failure it is NULL.
//
// Returns TSDB_CODE_SUCCESS when found, TSDB_CODE_INVALID_PARA for NULL arguments, the error
// from getTopicByName() when the topic is unknown, or TSDB_CODE_TMQ_INVALID_VGID when the
// topic has no vgroup with that id.
static int32_t getClientVg(tmq_t* tmq, char* pTopicName, int32_t vgId, SMqClientVg** pVg) {
  if (tmq == NULL || pTopicName == NULL || pVg == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  *pVg = NULL;

  SMqClientTopic* pTopic = NULL;
  int32_t         code = getTopicByName(tmq, pTopicName, &pTopic);
  if (code != 0) {
    tqErrorC("consumer:0x%" PRIx64 " invalid topic name:%s", tmq->consumerId, pTopicName);
    return code;
  }

  int32_t numOfVgs = taosArrayGetSize(pTopic->vgs);
  for (int32_t i = 0; i < numOfVgs; ++i) {
    SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i);
    if (pClientVg != NULL && pClientVg->vgId == vgId) {
      *pVg = pClientVg;
      return TSDB_CODE_SUCCESS;
    }
  }

  return TSDB_CODE_TMQ_INVALID_VGID;
}
827

828
// Response callback for the keep-version request sent by asyncSendWalMarkMsgToMnode().
// Releases the response buffers and logs the result code; `param` is unused.
// Always returns 0.
static int32_t sendWalMarkMsgToMnodeCb(void* param, SDataBuf* pMsg, int32_t code) {
  if (pMsg != NULL) {
    taosMemoryFreeClear(pMsg->pEpSet);
    taosMemoryFreeClear(pMsg->pData);
  }

  tqDebugC("sendWalMarkMsgToMnodeCb code:%d", code);
  return 0;
}
836

837
// Sends a TDMT_MND_SET_VGROUP_KEEP_VERSION request (vgId + keepVersion) to the mnode.
// This is fire-and-forget: the function returns nothing and every failure is only logged.
// The response is handled by sendWalMarkMsgToMnodeCb().
static void asyncSendWalMarkMsgToMnode(tmq_t* tmq, int32_t vgId, int64_t keepVersion) {
  if (tmq == NULL) return;

  SMndSetVgroupKeepVersionReq req = {0};
  req.vgId = vgId;
  req.keepVersion = keepVersion;
  tqDebugC("consumer:0x%" PRIx64 " send vgId:%d keepVersion:%"PRId64, tmq->consumerId, vgId, keepVersion);

  // First pass with a NULL buffer only computes the serialized size.
  // A negative result must not reach the allocator.
  int32_t tlen = tSerializeSMndSetVgroupKeepVersionReq(NULL, 0, &req);
  if (tlen < 0) {
    tqErrorC("consumer:0x%" PRIx64 " failed to size wal mark msg for vgId:%d, ret:%d", tmq->consumerId, vgId, tlen);
    return;
  }

  void* buf = taosMemoryMalloc(tlen);
  if (buf == NULL) {
    tqErrorC("consumer:0x%" PRIx64 " failed to alloc wal mark msg for vgId:%d, code:%s", tmq->consumerId, vgId,
             tstrerror(terrno));
    return;
  }

  tlen = tSerializeSMndSetVgroupKeepVersionReq(buf, tlen, &req);
  if (tlen < 0) {
    tqErrorC("consumer:0x%" PRIx64 " failed to serialize wal mark msg for vgId:%d, ret:%d", tmq->consumerId, vgId,
             tlen);
    taosMemoryFree(buf);
    return;
  }

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    tqErrorC("consumer:0x%" PRIx64 " failed to alloc send info for vgId:%d, code:%s", tmq->consumerId, vgId,
             tstrerror(terrno));
    taosMemoryFree(buf);
    return;
  }

  sendInfo->msgInfo = (SDataBuf){.pData = buf, .len = tlen, .handle = NULL};
  sendInfo->requestId = generateRequestId();
  sendInfo->fp = sendWalMarkMsgToMnodeCb;
  sendInfo->msgType = TDMT_MND_SET_VGROUP_KEEP_VERSION;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int32_t code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo);
  if (code != 0) {
    // report the code actually returned by the send call, not whatever terrno holds
    tqErrorC("consumer:0x%" PRIx64 " send wal mark msg to mnode failed, code:%s", tmq->consumerId,
             tstrerror(code));
  }
}
873

874
// Send a commit request for one vgroup of a topic and, once the request has been handed to
// doSendCommitMsg() successfully, record offsetVal as that vgroup's committed offset.
//
// Returns:
//   TSDB_CODE_INVALID_PARA             any argument is NULL
//   TSDB_CODE_TMQ_INVALID_MSG          offsetVal->type <= 0
//   TSDB_CODE_TMQ_SAME_COMMITTED_VALUE offsetVal equals the committed offset already recorded
//   otherwise the result of doSendCommitMsg()
static int32_t innerCommit(tmq_t* tmq, char* pTopicName, STqOffsetVal* offsetVal, SMqClientVg* pVg, SMqCommitCbParamSet* pParamSet){
  if (tmq == NULL || pTopicName == NULL || offsetVal == NULL || pVg == NULL || pParamSet == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  if (offsetVal->type <= 0) {
    return TSDB_CODE_TMQ_INVALID_MSG;
  }
  if (tOffsetEqual(offsetVal, &pVg->offsetInfo.committedOffset)) {
    return TSDB_CODE_TMQ_SAME_COMMITTED_VALUE;
  }

  // Printable forms of the new and the previously committed offset, used only for logging.
  char offsetBuf[TSDB_OFFSET_LEN] = {0};
  tFormatOffset(offsetBuf, tListLen(offsetBuf), offsetVal);

  char commitBuf[TSDB_OFFSET_LEN] = {0};
  tFormatOffset(commitBuf, tListLen(commitBuf), &pVg->offsetInfo.committedOffset);

  int32_t code = doSendCommitMsg(tmq, pVg->vgId, &pVg->epSet, offsetVal, pTopicName, pParamSet);
  if (code != TSDB_CODE_SUCCESS) {
    // Log the code returned by doSendCommitMsg(); terrno may hold an unrelated earlier error.
    tqErrorC("consumer:0x%" PRIx64 " topic:%s on vgId:%d end commit msg failed, send offset:%s committed:%s, code:%s",
             tmq->consumerId, pTopicName, pVg->vgId, offsetBuf, commitBuf, tstrerror(code));
    return code;
  }

  // For log-type offsets, optionally also send the keep-version request to the mnode.
  if (tmq->enableWalMarker && offsetVal->type == TMQ_OFFSET__LOG) {
    asyncSendWalMarkMsgToMnode(tmq, pVg->vgId, offsetVal->version);
  }
  tqDebugC("consumer:0x%" PRIx64 " topic:%s on vgId:%d send commit msg success, send offset:%s committed:%s",
           tmq->consumerId, pTopicName, pVg->vgId, offsetBuf, commitBuf);
  tOffsetCopy(&pVg->offsetInfo.committedOffset, offsetVal);
  return code;
}
908

909
static int32_t asyncCommitOffset(tmq_t* tmq, char* pTopicName, int32_t vgId, STqOffsetVal* offsetVal,
12,933✔
910
                                 tmq_commit_cb* pCommitFp, void* userParam) {
911
  if (tmq == NULL || pTopicName == NULL || offsetVal == NULL) {
12,933✔
912
    return TSDB_CODE_INVALID_PARA;
×
913
  }
914
  tqDebugC("consumer:0x%" PRIx64 " do manual commit offset for %s, vgId:%d", tmq->consumerId, pTopicName, vgId);
12,933✔
915
  SMqCommitCbParamSet* pParamSet = NULL;
12,933✔
916
  int32_t code = prepareCommitCbParamSet(tmq, pCommitFp, userParam, 0, &pParamSet);
12,933✔
917
  if (code != 0){
12,933✔
918
    return code;
×
919
  }
920

921
  taosRLockLatch(&tmq->lock);
12,933✔
922
  SMqClientVg* pVg = NULL;
12,933✔
923
  code = getClientVg(tmq, pTopicName, vgId, &pVg);
12,933✔
924
  if (code == 0) {
12,933✔
925
    code = innerCommit(tmq, pTopicName, offsetVal, pVg, pParamSet);
12,933✔
926
  }
927
  taosRUnLockLatch(&tmq->lock);
12,933✔
928

929
  if (code != 0){
12,933✔
930
    taosMemoryFree(pParamSet);
2,306✔
931
  }
932
  return code;
12,933✔
933
}
934

935
// Commit the offset carried by a poll result.
// The topic, vgId and offset are taken from the result object when it is one of the TMQ
// result kinds. pCommitFp is invoked from here only when the commit could not be started;
// TSDB_CODE_TMQ_SAME_COMMITTED_VALUE is reported to the callback as success.
static void asyncCommitFromResult(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* pCommitFp, void* userParam) {
  int32_t ret = TSDB_CODE_SUCCESS;

  if (pRes == NULL || tmq == NULL) {
    ret = TSDB_CODE_INVALID_PARA;
  } else if (TD_RES_TMQ(pRes) || TD_RES_TMQ_RAW(pRes) || TD_RES_TMQ_META(pRes) ||
             TD_RES_TMQ_METADATA(pRes) || TD_RES_TMQ_BATCH_META(pRes)) {
    SMqRspObj*   pObj = (SMqRspObj*)pRes;
    STqOffsetVal offset = pObj->rspOffset;
    ret = asyncCommitOffset(tmq, pObj->topic, pObj->vgId, &offset, pCommitFp, userParam);
  } else {
    ret = TSDB_CODE_TMQ_INVALID_MSG;
  }

  if (ret == TSDB_CODE_SUCCESS || pCommitFp == NULL) {
    return;
  }
  if (ret == TSDB_CODE_TMQ_SAME_COMMITTED_VALUE) {
    ret = TSDB_CODE_SUCCESS;
  }
  pCommitFp(tmq, ret, userParam);
}
9,007✔
965

966
// Commit the end offset of every vgroup of every subscribed topic, holding the consumer's
// read latch for the whole walk. A failing vgroup is logged and the walk continues; the
// walk stops early only when a topic or vgroup entry cannot be fetched from its array.
// The returned value is whatever `ret` last held: the code of the last innerCommit() call,
// or the error that stopped the walk.
static int32_t innerCommitAll(tmq_t* tmq, SMqCommitCbParamSet* pParamSet){
  if (!tmq || !pParamSet) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t ret = 0;
  taosRLockLatch(&tmq->lock);

  int32_t topicCnt = taosArrayGetSize(tmq->clientTopics);
  tqDebugC("consumer:0x%" PRIx64 " start to commit offset for %d topics", tmq->consumerId, topicCnt);

  for (int32_t t = 0; t < topicCnt; ++t) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, t);
    if (pTopic == NULL) {
      ret = TSDB_CODE_TMQ_INVALID_TOPIC;
      goto END;
    }

    int32_t vgCnt = taosArrayGetSize(pTopic->vgs);
    tqDebugC("consumer:0x%" PRIx64 " commit offset for topics:%s, numOfVgs:%d", tmq->consumerId, pTopic->topicName, vgCnt);

    for (int32_t v = 0; v < vgCnt; ++v) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, v);
      if (pVg == NULL) {
        ret = terrno;
        goto END;
      }

      ret = innerCommit(tmq, pTopic->topicName, &pVg->offsetInfo.endOffset, pVg, pParamSet);
      if (ret == 0 || ret == TSDB_CODE_TMQ_SAME_COMMITTED_VALUE) {
        continue;
      }
      tqErrorC("consumer:0x%" PRIx64 " topic:%s vgId:%d, no commit, code:%s, current offset version:%" PRId64 ", ordinal:%d/%d",
               tmq->consumerId, pTopic->topicName, pVg->vgId, tstrerror(ret), pVg->offsetInfo.endOffset.version, v + 1, vgCnt);
    }
  }

  tqDebugC("consumer:0x%" PRIx64 " total commit:%d for %d topics", tmq->consumerId, pParamSet->waitingRspNum - DEFAULT_COMMIT_CNT,
           topicCnt);

END:
  taosRUnLockLatch(&tmq->lock);
  return ret;
}
1003

1004
// Commit the current end offsets of all topics/vgroups of this consumer.
// When the callback parameter set cannot be prepared, pCommitFp (if given) is called
// immediately with the error. Otherwise the commits are issued through innerCommitAll()
// and the initial DEFAULT_COMMIT_CNT count is released through commitRspCountDown().
static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam) {
  if (tmq == NULL) {
    return;
  }

  SMqCommitCbParamSet* pSet = NULL;

  // init waitingRspNum as DEFAULT_COMMIT_CNT to prevent concurrency issue
  int32_t ret = prepareCommitCbParamSet(tmq, pCommitFp, userParam, DEFAULT_COMMIT_CNT, &pSet);
  if (ret != 0) {
    tqErrorC("consumer:0x%" PRIx64 " prepareCommitCbParamSet failed, code:%s", tmq->consumerId, tstrerror(ret));
    if (pCommitFp != NULL) {
      pCommitFp(tmq, ret, userParam);
    }
    return;
  }

  ret = innerCommitAll(tmq, pSet);
  if (ret != 0 && ret != TSDB_CODE_TMQ_SAME_COMMITTED_VALUE) {
    tqErrorC("consumer:0x%" PRIx64 " innerCommitAll failed, code:%s", tmq->consumerId, tstrerror(ret));
  }

  ret = commitRspCountDown(pSet, tmq->consumerId, "init", -1);
  if (ret != 0) {
    tqErrorC("consumer:0x%" PRIx64 " commit rsp count down failed, code:%s", tmq->consumerId, tstrerror(ret));
  }
}
1030

1031
static void generateTimedTask(int64_t refId, int32_t type) {
5,014,482✔
1032
  tmq_t*  tmq = NULL;
5,014,482✔
1033
  int8_t* pTaskType = NULL;
5,014,482✔
1034
  int32_t code = 0;
5,014,482✔
1035

1036
  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
5,014,482✔
1037
  if (tmq == NULL) return;
5,014,482✔
1038

1039
  code = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0, (void**)&pTaskType);
5,013,853✔
1040
  if (code == TSDB_CODE_SUCCESS) {
5,013,853✔
1041
    *pTaskType = type;
5,013,853✔
1042
    if (taosWriteQitem(tmq->delayedTask, pTaskType) == 0) {
5,013,853✔
1043
      if (tsem2_post(&tmq->rspSem) != 0){
5,013,853✔
1044
        tqErrorC("consumer:0x%" PRIx64 " failed to post sem, type:%d", tmq->consumerId, type);
×
1045
      }
1046
    }else{
1047
      taosFreeQitem(pTaskType);
×
1048
    }
1049
  }
1050

1051
  code = taosReleaseRef(tmqMgmt.rsetId, refId);
5,013,853✔
1052
  if (code != 0){
5,013,853✔
1053
    tqErrorC("failed to release ref:%"PRId64 ", type:%d, code:%d", refId, type, code);
×
1054
  }
1055
}
1056

1057
void tmqAssignAskEpTask(void* param, void* tmrId) {
1,751,118✔
1058
  int64_t refId = (int64_t)param;
1,751,118✔
1059
  generateTimedTask(refId, TMQ_DELAYED_TASK__ASK_EP);
1,751,118✔
1060
}
1,751,118✔
1061

1062
void tmqReplayTask(void* param, void* tmrId) {
3,796✔
1063
  int64_t refId = (int64_t)param;
3,796✔
1064
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
3,796✔
1065
  if (tmq == NULL) return;
3,796✔
1066

1067
  if (tsem2_post(&tmq->rspSem) != 0){
3,796✔
1068
    tqErrorC("consumer:0x%" PRIx64 " failed to post sem, replay", tmq->consumerId);
×
1069
  }
1070
  int32_t code = taosReleaseRef(tmqMgmt.rsetId, refId);
3,796✔
1071
  if (code != 0){
3,796✔
1072
    tqErrorC("failed to release ref:%"PRId64 ", code:%d", refId, code);
×
1073
  }
1074
}
1075

1076
void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
3,263,364✔
1077
  int64_t refId = (int64_t)param;
3,263,364✔
1078
  generateTimedTask(refId, TMQ_DELAYED_TASK__COMMIT);
3,263,364✔
1079
}
3,263,364✔
1080

1081
// Response callback for the heartbeat request sent by tmqSendHbReq().
//
// param  consumer refId cast to a pointer
// pMsg   response buffers; pData and pEpSet are freed here on every path that received them
// code   transport/server result of the request
//
// The result code is stored in tmq->tokenCode. On success the response is decoded, the
// noPrivilege flag of each matching local topic is refreshed under the write latch, and
// tqClientDebugFlag is updated from the response.
// Returns TSDB_CODE_INVALID_PARA for a NULL pMsg/param, otherwise the (possibly updated) code.
int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
  if (pMsg == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int64_t  refId = (int64_t)param;
  tmq_t*   tmq = NULL;
  SMqHbRsp rsp = {0};  // declared before any goto so END can always destroy it

  if (param == NULL) {
    // Still go through END so the message buffers are released.
    code = TSDB_CODE_INVALID_PARA;
    goto END;
  }

  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq == NULL) {
    goto END;
  }

  atomic_store_32(&tmq->tokenCode, code);
  if (code != 0){
    goto END;
  }

  code = tDeserializeSMqHbRsp(pMsg->pData, pMsg->len, &rsp);
  if (code != 0) {
    // rsp may be partially filled; it is destroyed at END.
    goto END;
  }

  taosWLockLatch(&tmq->lock);
  int32_t privilegeNum = taosArrayGetSize(rsp.topicPrivileges);
  for (int32_t i = 0; i < privilegeNum; i++) {
    STopicPrivilege* privilege = taosArrayGet(rsp.topicPrivileges, i);
    if (privilege == NULL) {
      continue;
    }
    int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
    for (int32_t j = 0; j < topicNumCur; j++) {
      SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, j);
      if (pTopicCur && strcmp(pTopicCur->topicName, privilege->topic) == 0 && pTopicCur->noPrivilege != privilege->noPrivilege) {
        tqInfoC("consumer:0x%" PRIx64 ", update privilege:%s, topic:%s", tmq->consumerId, privilege->noPrivilege ? "false" : "true", privilege->topic);
        pTopicCur->noPrivilege = privilege->noPrivilege;
      }
    }
  }
  taosWUnLockLatch(&tmq->lock);

  tqClientDebugFlag = rsp.debugFlag;

END:
  tDestroySMqHbRsp(&rsp);
  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);
  if (tmq != NULL) {
    int32_t ret = taosReleaseRef(tmqMgmt.rsetId, refId);
    if (ret != 0){
      tqErrorC("failed to release ref:%"PRId64 ", code:%d", refId, ret);
    }
  }
  if (code != 0){
    tqErrorC("failed to process heartbeat, refId:%"PRId64 ", code:%d", refId, code);
  }
  return code;
}
1139

1140
void tmqSendHbReq(void* param, void* tmrId) {
859,253✔
1141
  int64_t refId = (int64_t)param;
859,253✔
1142

1143
  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
859,253✔
1144
  if (tmq == NULL) {
859,253✔
1145
    return;
338✔
1146
  }
1147

1148
  SMqHbReq req = {0};
858,915✔
1149
  req.consumerId = tmq->consumerId;
858,915✔
1150
  req.epoch = atomic_load_32(&tmq->epoch);
858,915✔
1151
  req.pollFlag = atomic_load_8(&tmq->pollFlag);
858,915✔
1152
  tqDebugC("consumer:0x%" PRIx64 " send heartbeat, pollFlag:%d", tmq->consumerId, req.pollFlag);
858,915✔
1153
  req.topics = taosArrayInit(taosArrayGetSize(tmq->clientTopics), sizeof(TopicOffsetRows));
858,915✔
1154
  if (req.topics == NULL) {
858,915✔
1155
    goto END;
×
1156
  }
1157
  taosRLockLatch(&tmq->lock);
858,915✔
1158
  for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
1,653,272✔
1159
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
794,357✔
1160
    if (pTopic == NULL) {
794,357✔
1161
      continue;
×
1162
    }
1163
    int32_t          numOfVgroups = taosArrayGetSize(pTopic->vgs);
794,357✔
1164
    TopicOffsetRows* data = taosArrayReserve(req.topics, 1);
794,357✔
1165
    if (data == NULL) {
794,357✔
1166
      continue;
×
1167
    }
1168
    tstrncpy(data->topicName, pTopic->topicName, TSDB_TOPIC_FNAME_LEN);
794,357✔
1169
    data->offsetRows = taosArrayInit(numOfVgroups, sizeof(OffsetRows));
794,357✔
1170
    if (data->offsetRows == NULL) {
794,357✔
1171
      continue;
×
1172
    }
1173
    for (int j = 0; j < numOfVgroups; j++) {
2,889,890✔
1174
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
2,095,533✔
1175
      if (pVg == NULL) {
2,095,533✔
1176
        continue;
×
1177
      }
1178
      OffsetRows* offRows = taosArrayReserve(data->offsetRows, 1);
2,095,533✔
1179
      if (offRows == NULL) {
2,095,533✔
1180
        continue;
×
1181
      }
1182
      offRows->vgId = pVg->vgId;
2,095,533✔
1183
      offRows->rows = pVg->numOfRows;
2,095,533✔
1184
      offRows->offset = pVg->offsetInfo.endOffset;
2,095,533✔
1185
      offRows->ever = pVg->offsetInfo.walVerEnd == -1 ? 0 : pVg->offsetInfo.walVerEnd;
2,095,533✔
1186
      char buf[TSDB_OFFSET_LEN] = {0};
2,095,533✔
1187
      tFormatOffset(buf, TSDB_OFFSET_LEN, &offRows->offset);
2,095,533✔
1188
      tqDebugC("consumer:0x%" PRIx64 ",report offset, group:%s vgId:%d, offset:%s/%" PRId64 ", rows:%" PRId64,
2,095,533✔
1189
               tmq->consumerId, tmq->groupId, offRows->vgId, buf, offRows->ever, offRows->rows);
1190
    }
1191
  }
1192
  taosRUnLockLatch(&tmq->lock);
858,915✔
1193

1194
  int32_t tlen = tSerializeSMqHbReq(NULL, 0, &req);
858,915✔
1195
  if (tlen < 0) {
858,915✔
1196
    tqErrorC("tSerializeSMqHbReq failed, size:%d", tlen);
×
1197
    goto END;
×
1198
  }
1199

1200
  void* pReq = taosMemoryCalloc(1, tlen);
858,915✔
1201
  if (pReq == NULL) {
858,915✔
1202
    tqErrorC("failed to malloc MqHbReq msg, code:%d", terrno);
×
1203
    goto END;
×
1204
  }
1205

1206
  if (tSerializeSMqHbReq(pReq, tlen, &req) < 0) {
858,915✔
1207
    tqErrorC("tSerializeSMqHbReq %d failed", tlen);
×
1208
    taosMemoryFree(pReq);
×
1209
    goto END;
×
1210
  }
1211

1212
  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
858,915✔
1213
  if (sendInfo == NULL) {
858,915✔
1214
    taosMemoryFree(pReq);
×
1215
    goto END;
×
1216
  }
1217

1218
  sendInfo->msgInfo = (SDataBuf){.pData = pReq, .len = tlen, .handle = NULL};
858,915✔
1219

1220
  sendInfo->requestId = generateRequestId();
858,915✔
1221
  sendInfo->requestObjRefId = 0;
858,915✔
1222
  sendInfo->param = (void*)refId;
858,915✔
1223
  sendInfo->fp = tmqHbCb;
858,915✔
1224
  sendInfo->msgType = TDMT_MND_TMQ_HB;
858,915✔
1225

1226

1227
  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
858,915✔
1228

1229
  int32_t code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo);
858,915✔
1230
  if (code != 0) {
858,915✔
1231
    tqErrorC("tmqSendHbReq asyncSendMsgToServer failed");
×
1232
  }
1233
  (void)atomic_val_compare_exchange_8(&tmq->pollFlag, 1, 0);
858,915✔
1234

1235
  END:
858,915✔
1236
  tDestroySMqHbReq(&req);
858,915✔
1237
  if (tmrId != NULL) {
858,915✔
1238
    bool ret = taosTmrReset(tmqSendHbReq, tmq->heartBeatIntervalMs, param, tmqMgmt.timer, &tmq->hbLiveTimer);
647,854✔
1239
    tqDebugC("consumer:0x%" PRIx64 " reset timer for tmq heartbeat ret:%d, interval:%d, pollFlag:%d", tmq->consumerId, ret, tmq->heartBeatIntervalMs, tmq->pollFlag);
647,854✔
1240
  }
1241
  int32_t ret = taosReleaseRef(tmqMgmt.rsetId, refId);
858,915✔
1242
  if (ret != 0){
858,915✔
1243
    tqErrorC("failed to release ref:%"PRId64 ", code:%d", refId, ret);
×
1244
  }
1245
}
1246

1247
// Commit callback used when the consumer has none of its own: logs a failed commit.
static void defaultCommitCbFn(tmq_t* pTmq, int32_t code, void* param) {
  if (code == 0 || pTmq == NULL) {
    return;
  }
  tqErrorC("consumer:0x%" PRIx64 ", failed to commit offset, code:%s", pTmq->consumerId, tstrerror(code));
}
118,101✔
1252

1253
// Release the payload held by a response wrapper, dispatching on its tmqRspType.
// The wrapper itself is not freed here. Unknown types are ignored.
static void tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) {
  if (rspWrapper == NULL) {
    return;
  }

  switch (rspWrapper->tmqRspType) {
    case TMQ_MSG_TYPE__EP_RSP: {
      tDeleteSMqAskEpRsp(&rspWrapper->epRsp);
      break;
    }
    case TMQ_MSG_TYPE__POLL_DATA_RSP: {
      DELETE_POLL_RSP(tDeleteMqDataRsp, &pRsp->dataRsp)
      break;
    }
    case TMQ_MSG_TYPE__POLL_DATA_META_RSP: {
      DELETE_POLL_RSP(tDeleteSTaosxRsp, &pRsp->dataRsp)
      break;
    }
    case TMQ_MSG_TYPE__POLL_META_RSP: {
      DELETE_POLL_RSP(tDeleteMqMetaRsp,&pRsp->metaRsp)
      break;
    }
    case TMQ_MSG_TYPE__POLL_BATCH_META_RSP: {
      DELETE_POLL_RSP(tDeleteMqBatchMetaRsp,&pRsp->batchMetaRsp)
      break;
    }
    case TMQ_MSG_TYPE__POLL_RAW_DATA_RSP: {
      DELETE_POLL_RSP(tDeleteMqRawDataRsp, &pRsp->dataRsp)
      break;
    }
    default:
      break;
  }
}
1271

1272
static void freeClientVg(void* param) {
1,092,257✔
1273
  if (param == NULL) {
1,092,257✔
1274
    return;
×
1275
  }
1276
  SMqClientVg* pVg = param;
1,092,257✔
1277
  tOffsetDestroy(&pVg->offsetInfo.endOffset);
1,092,257✔
1278
  tOffsetDestroy(&pVg->offsetInfo.beginOffset);
1,092,257✔
1279
  tOffsetDestroy(&pVg->offsetInfo.committedOffset);
1,092,257✔
1280
}
1281
static void freeClientTopic(void* param) {
816,670✔
1282
  if (param == NULL) {
816,670✔
1283
    return;
×
1284
  }
1285
  SMqClientTopic* pTopic = param;
816,670✔
1286
  taosArrayDestroyEx(pTopic->vgs, freeClientVg);
816,670✔
1287
}
1288

1289
static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopicEp, SHashObj* pVgOffsetHashMap,
817,389✔
1290
                                   tmq_t* tmq) {
1291
  if (pTopic == NULL || pTopicEp == NULL || pVgOffsetHashMap == NULL || tmq == NULL) {
817,389✔
1292
    return;
×
1293
  }
1294

1295
  char    vgKey[TSDB_TOPIC_FNAME_LEN + 22] = {0};
817,389✔
1296
  int32_t vgNumGet = taosArrayGetSize(pTopicEp->vgs);
817,389✔
1297

1298
  tstrncpy(pTopic->topicName, pTopicEp->topic, TSDB_TOPIC_FNAME_LEN);
817,389✔
1299
  tstrncpy(pTopic->db, pTopicEp->db, TSDB_DB_FNAME_LEN);
817,389✔
1300

1301
  tqInfoC("consumer:0x%" PRIx64 ", update topic:%s, new numOfVgs:%d", tmq->consumerId, pTopic->topicName, vgNumGet);
817,389✔
1302
  pTopic->vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg));
817,389✔
1303
  if (pTopic->vgs == NULL) {
817,389✔
1304
    tqErrorC("consumer:0x%" PRIx64 ", failed to init vgs for topic:%s", tmq->consumerId, pTopic->topicName);
×
1305
    return;
×
1306
  }
1307
  for (int32_t j = 0; j < vgNumGet; j++) {
1,916,305✔
1308
    SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);
1,098,916✔
1309
    if (pVgEp == NULL) {
1,098,916✔
1310
      continue;
×
1311
    }
1312
    (void)snprintf(vgKey, sizeof(vgKey), "%s:%d", pTopic->topicName, pVgEp->vgId);
1,098,916✔
1313
    SVgroupSaveInfo* pInfo = taosHashGet(pVgOffsetHashMap, vgKey, strlen(vgKey));
1,098,916✔
1314

1315
    STqOffsetVal offsetNew = {0};
1,098,916✔
1316
    offsetNew.type = tmq->resetOffsetCfg;
1,098,916✔
1317

1318
    tqInfoC("consumer:0x%" PRIx64 ", update topic:%s, new numOfVgs:%d, num:%d, port:%d", tmq->consumerId,
1,098,916✔
1319
            pTopic->topicName, vgNumGet, pVgEp->epSet.numOfEps, pVgEp->epSet.eps[pVgEp->epSet.inUse].port);
1320

1321
    SMqClientVg clientVg = {
2,219,406✔
1322
        .pollCnt = 0,
1323
        .vgId = pVgEp->vgId,
1,098,916✔
1324
        .epSet = pVgEp->epSet,
1325
        .vgStatus = pInfo ? pInfo->vgStatus : TMQ_VG_STATUS__IDLE,
1,098,916✔
1326
        .vgSkipCnt = 0,
1327
        .emptyBlockReceiveTs = 0,
1328
        .blockReceiveTs = 0,
1329
        .blockSleepForReplay = 0,
1330
        .numOfRows = pInfo ? pInfo->numOfRows : 0,
1,098,916✔
1331
    };
1332

1333
    clientVg.offsetInfo.walVerBegin = -1;
1,098,916✔
1334
    clientVg.offsetInfo.walVerEnd = -1;
1,098,916✔
1335
    clientVg.seekUpdated = false;
1,098,916✔
1336
    if (pInfo) {
1,098,916✔
1337
      tOffsetCopy(&clientVg.offsetInfo.endOffset, &pInfo->currentOffset);
696,438✔
1338
      tOffsetCopy(&clientVg.offsetInfo.committedOffset, &pInfo->commitOffset);
696,438✔
1339
      tOffsetCopy(&clientVg.offsetInfo.beginOffset, &pInfo->seekOffset);
696,438✔
1340
      clientVg.offsetInfo.walVerBegin = pInfo->walVerBegin;
696,438✔
1341
      clientVg.offsetInfo.walVerEnd = pInfo->walVerEnd;
696,438✔
1342
    } else {
1343
      clientVg.offsetInfo.endOffset = offsetNew;
402,478✔
1344
      clientVg.offsetInfo.committedOffset = offsetNew;
402,478✔
1345
      clientVg.offsetInfo.beginOffset = offsetNew;
402,478✔
1346
    }
1347
    if (taosArrayPush(pTopic->vgs, &clientVg) == NULL) {
2,197,832✔
1348
      tqErrorC("consumer:0x%" PRIx64 ", failed to push vg:%d into topic:%s", tmq->consumerId, pVgEp->vgId,
×
1349
               pTopic->topicName);
1350
      freeClientVg(&clientVg);
×
1351
    }
1352
  }
1353
}
1354

1355
// Build the topic list described by an ask-ep response into `newTopics`.
// Step 1: snapshot the per-vgroup state (offsets, row count, status, WAL range) of the
//         consumer's current topics into a temporary hash keyed by "<topic>:<vgId>".
// Step 2: create one SMqClientTopic per topic in the response, restoring saved state
//         through initClientTopicFromRsp().
// Nothing is added when the temporary hash cannot be created.
static void buildNewTopicList(tmq_t* tmq, SArray* newTopics, const SMqAskEpRsp* pRsp){
  if (!tmq || !newTopics || !pRsp) {
    return;
  }

  SHashObj* pSavedMap = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
  if (pSavedMap == NULL) {
    tqErrorC("consumer:0x%" PRIx64 " taos hash init null, code:%d", tmq->consumerId, terrno);
    return;
  }

  int32_t oldTopicCnt = taosArrayGetSize(tmq->clientTopics);
  for (int32_t t = 0; t < oldTopicCnt; ++t) {
    // find old topic
    SMqClientTopic* pOld = taosArrayGet(tmq->clientTopics, t);
    if (pOld == NULL || pOld->vgs == NULL) {
      continue;
    }

    int32_t oldVgCnt = taosArrayGetSize(pOld->vgs);
    tqInfoC("consumer:0x%" PRIx64 ", current vg num:%d", tmq->consumerId, oldVgCnt);

    for (int32_t v = 0; v < oldVgCnt; ++v) {
      SMqClientVg* pOldVg = taosArrayGet(pOld->vgs, v);
      if (pOldVg == NULL) {
        continue;
      }

      char key[TSDB_TOPIC_FNAME_LEN + 22] = {0};
      (void)snprintf(key, sizeof(key), "%s:%d", pOld->topicName, pOldVg->vgId);

      char offsetStr[TSDB_OFFSET_LEN] = {0};
      tFormatOffset(offsetStr, TSDB_OFFSET_LEN, &pOldVg->offsetInfo.endOffset);
      tqInfoC("consumer:0x%" PRIx64 ", vgId:%d vgKey:%s, offset:%s", tmq->consumerId, pOldVg->vgId, key, offsetStr);

      SVgroupSaveInfo saved = {0};
      saved.currentOffset = pOldVg->offsetInfo.endOffset;
      saved.seekOffset = pOldVg->offsetInfo.beginOffset;
      saved.commitOffset = pOldVg->offsetInfo.committedOffset;
      saved.numOfRows = pOldVg->numOfRows;
      saved.vgStatus = pOldVg->vgStatus;
      saved.walVerBegin = pOldVg->offsetInfo.walVerBegin;
      saved.walVerEnd = pOldVg->offsetInfo.walVerEnd;

      if (taosHashPut(pSavedMap, key, strlen(key), &saved, sizeof(SVgroupSaveInfo)) != 0) {
        tqErrorC("consumer:0x%" PRIx64 ", failed to put vg:%d into hashmap", tmq->consumerId, pOldVg->vgId);
      }
    }
  }

  int32_t newTopicCnt = taosArrayGetSize(pRsp->topics);
  for (int32_t t = 0; t < newTopicCnt; ++t) {
    SMqSubTopicEp* pEp = taosArrayGet(pRsp->topics, t);
    if (pEp == NULL) {
      continue;
    }

    SMqClientTopic fresh = {0};
    initClientTopicFromRsp(&fresh, pEp, pSavedMap, tmq);
    if (taosArrayPush(newTopics, &fresh) == NULL) {
      tqErrorC("consumer:0x%" PRIx64 ", failed to push topic:%s into new topics", tmq->consumerId, fresh.topicName);
      freeClientTopic(&fresh);
    }
  }

  taosHashCleanup(pSavedMap);
}
1414

1415
// Apply an ask-ep response to the consumer's local topic/vgroup view.
// The response is ignored when its epoch is older than the consumer's, or equal to it
// while carrying no topics. Otherwise a new topic list is built and swapped in under the
// write latch, the consumer is marked READY and its epoch is set to `epoch`.
static void doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
  if (!tmq || !pRsp) {
    return;
  }

  int32_t incoming = taosArrayGetSize(pRsp->topics);

  // vnode transform (epoch == tmq->epoch && topicNumGet != 0)
  // ask ep rsp (epoch == tmq->epoch && topicNumGet == 0)
  if (epoch < atomic_load_32(&tmq->epoch) || (epoch == atomic_load_32(&tmq->epoch) && incoming == 0)) {
    tqDebugC("consumer:0x%" PRIx64 " no update ep epoch from %d to epoch %d, incoming topics:%d", tmq->consumerId,
             tmq->epoch, epoch, incoming);
    return;
  }

  SArray* pFresh = taosArrayInit(incoming, sizeof(SMqClientTopic));
  if (pFresh == NULL) {
    tqErrorC("consumer:0x%" PRIx64 " taos array init null, code:%d", tmq->consumerId, terrno);
    return;
  }
  tqInfoC("consumer:0x%" PRIx64 " update ep epoch from %d to epoch %d, incoming topics:%d, existed topics:%d",
          tmq->consumerId, tmq->epoch, epoch, incoming, (int)taosArrayGetSize(tmq->clientTopics));

  taosWLockLatch(&tmq->lock);
  if (incoming > 0) {
    buildNewTopicList(tmq, pFresh, pRsp);
  }
  // destroy current buffered existed topics info
  if (tmq->clientTopics != NULL) {
    taosArrayDestroyEx(tmq->clientTopics, freeClientTopic);
  }
  tmq->clientTopics = pFresh;
  taosWUnLockLatch(&tmq->lock);

  atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);
  atomic_store_32(&tmq->epoch, epoch);

  tqInfoC("consumer:0x%" PRIx64 " update topic info completed", tmq->consumerId);
}
1452

1453
// Response callback for the ask-ep request sent by askEp().
//
// param  SMqAskEpCbParam carrying the consumer refId, the sync flag and the waiter info
// pMsg   response buffers; pData and pEpSet are freed here when pMsg is non-NULL
// code   transport/server result of the request
//
// On success the response is decoded. For a sync request the local endpoint view is
// updated directly; otherwise the decoded response is moved into a wrapper and written
// to the consumer's mqueue. For a sync request with waiter info, the final code is stored
// in it and its semaphore is posted.
// Returns the final code.
static int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqAskEpRsp      rsp = {0};
  SMqAskEpCbParam* pCbParam = (SMqAskEpCbParam*)param;

  if (pCbParam == NULL) {
    goto _ERR;
  }

  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, pCbParam->refId);
  if (tmq == NULL) {
    code = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    goto _ERR;
  }

  if (code != TSDB_CODE_SUCCESS) {
    tqErrorC("consumer:0x%" PRIx64 ", get topic endpoint error, code:%s", tmq->consumerId, tstrerror(code));
    if (code == TSDB_CODE_MND_CONSUMER_NOT_EXIST) {
      atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__CLOSED);
    }
    goto END;
  }

  if (pMsg == NULL) {
    goto END;
  }

  SMqRspHead* pHead = pMsg->pData;
  int32_t     curEpoch = atomic_load_32(&tmq->epoch);
  tqDebugC("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d", tmq->consumerId, pHead->epoch, curEpoch);

  // The encoded response follows the SMqRspHead at the start of the payload.
  if (tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp) == NULL) {
    code = TSDB_CODE_TMQ_INVALID_MSG;
    tqErrorC("consumer:0x%" PRIx64 ", decode ep rsp failed", tmq->consumerId);
    goto END;
  }

  if (rsp.code != TSDB_CODE_SUCCESS) {
    code = rsp.code;
    goto END;
  }

  if (!pCbParam->sync) {
    SMqRspWrapper* pItem = NULL;
    code = taosAllocateQitem(sizeof(SMqRspWrapper), DEF_QITEM, 0, (void**)&pItem);
    if (code) {
      goto END;
    }

    pItem->tmqRspType = TMQ_MSG_TYPE__EP_RSP;
    pItem->epoch = pHead->epoch;
    // Move the decoded response into the queue item; `rsp` keeps the item's old content.
    TSWAP(pItem->epRsp, rsp);
    code = taosWriteQitem(tmq->mqueue, pItem);
    if (code != 0) {
      tmqFreeRspWrapper((SMqRspWrapper*)pItem);
      taosFreeQitem(pItem);
      tqErrorC("consumer:0x%" PRIx64 " put ep res into mqueue failed, code:%d", tmq->consumerId, code);
    }
  } else {
    doUpdateLocalEp(tmq, pHead->epoch, &rsp);
  }

END:
  tDeleteSMqAskEpRsp(&rsp);
  int32_t rc = taosReleaseRef(tmqMgmt.rsetId, pCbParam->refId);
  if (rc != 0) {
    tqErrorC("failed to release ref:%"PRId64 ", code:%d", pCbParam->refId, rc);
  }

_ERR:
  if (pCbParam && pCbParam->sync) {
    SAskEpInfo* pWaiter = pCbParam->pParam;
    if (pWaiter) {
      pWaiter->code = code;
      if (tsem2_post(&pWaiter->sem) != 0) {
        tqErrorC("failed to post rsp sem askep cb");
      }
    }
  }

  if (pMsg) {
    taosMemoryFree(pMsg->pEpSet);
    taosMemoryFree(pMsg->pData);
  }

  return code;
}
1538

1539
// Send an ask-ep request to the mnode for this consumer; the response is handled by askEpCb().
//
// pTmq        consumer handle
// param       stored in the callback parameter and handed back to askEpCb()
// sync        stored in the callback parameter
// updateEpSet when true the request carries epoch -1 instead of the consumer's current epoch
//
// Returns TSDB_CODE_INVALID_PARA for a NULL consumer or a serialization failure, terrno
// for an allocation failure, otherwise the result of asyncSendMsgToServer().
static int32_t askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpSet) {
  if (pTmq == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t          code = 0;
  int32_t          lino = 0;
  void*            pBuf = NULL;
  SMqAskEpCbParam* pCbParam = NULL;

  SMqAskEpReq req = {0};
  req.consumerId = pTmq->consumerId;
  if (updateEpSet) {
    req.epoch = -1;
  } else {
    req.epoch = atomic_load_32(&pTmq->epoch);
  }
  tstrncpy(req.cgroup, pTmq->groupId, TSDB_CGROUP_LEN);

  // First pass computes the encoded size, second pass fills the buffer.
  int32_t tlen = tSerializeSMqAskEpReq(NULL, 0, &req);
  TSDB_CHECK_CONDITION(tlen >= 0, code, lino, END, TSDB_CODE_INVALID_PARA);
  pBuf = taosMemoryCalloc(1, tlen);
  TSDB_CHECK_NULL(pBuf, code, lino, END, terrno);

  code = tSerializeSMqAskEpReq(pBuf, tlen, &req);
  TSDB_CHECK_CONDITION(code >= 0, code, lino, END, TSDB_CODE_INVALID_PARA);

  pCbParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam));
  TSDB_CHECK_NULL(pCbParam, code, lino, END, terrno);

  pCbParam->refId = pTmq->refId;
  pCbParam->sync = sync;
  pCbParam->pParam = param;

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  TSDB_CHECK_NULL(sendInfo, code, lino, END, terrno);

  sendInfo->msgInfo = (SDataBuf){.pData = pBuf, .len = tlen, .handle = NULL};
  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = pCbParam;
  sendInfo->paramFreeFp = taosAutoMemoryFree;
  sendInfo->fp = askEpCb;
  sendInfo->msgType = TDMT_MND_TMQ_ASK_EP;

  // sendInfo now references both buffers; clear the locals so END does not free them.
  pBuf = NULL;
  pCbParam = NULL;

  SEpSet epSet = getEpSet_s(&pTmq->pTscObj->pAppInfo->mgmtEp);
  tqDebugC("consumer:0x%" PRIx64 " ask ep from mnode, QID:0x%" PRIx64, pTmq->consumerId, sendInfo->requestId);
  code = asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo);

END:
  if (code != 0) {
    tqErrorC("%s failed at %d, msg:%s", __func__, lino, tstrerror(code));
  }
  taosMemoryFree(pBuf);
  taosMemoryFree(pCbParam);
  return code;
}
1593

1594
// Drains pTmq->delayedTask and executes every queued task before data is polled.
//  - TMQ_DELAYED_TASK__ASK_EP: issues an asynchronous askEp() and re-arms the ep timer.
//  - TMQ_DELAYED_TASK__COMMIT: commits all offsets asynchronously (using the user callback when one
//    is configured, defaultCommitCbFn otherwise) and re-arms the commit timer.
// Unknown task types are only logged. Each queue item is released after it has been handled.
// Always returns 0; failures of individual tasks are reported through the log only.
static int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) {
  tqDebugC("consumer:0x%" PRIx64 " handle delayed %d tasks before poll data", pTmq->consumerId, taosQueueItemSize(pTmq->delayedTask));

  for (;;) {
    int8_t* pItem = NULL;
    taosReadQitem(pTmq->delayedTask, (void**)&pItem);
    if (pItem == NULL) {
      // queue is empty
      break;
    }

    const int8_t taskType = *pItem;
    if (taskType == TMQ_DELAYED_TASK__ASK_EP) {
      tqDebugC("consumer:0x%" PRIx64 " retrieve ask ep timer", pTmq->consumerId);
      int32_t askCode = askEp(pTmq, NULL, false, false);
      if (askCode != 0) {
        tqErrorC("consumer:0x%" PRIx64 " failed to ask ep, code:%s", pTmq->consumerId, tstrerror(askCode));
      }
      tqDebugC("consumer:0x%" PRIx64 " retrieve ep from mnode in 1s", pTmq->consumerId);
      bool rearmed = taosTmrReset(tmqAssignAskEpTask, DEFAULT_ASKEP_INTERVAL, (void*)(pTmq->refId), tmqMgmt.timer,
                                  &pTmq->epTimer);
      tqDebugC("reset timer for tmq ask ep:%d", rearmed);
    } else if (taskType == TMQ_DELAYED_TASK__COMMIT) {
      tmq_commit_cb* pCommitFn = defaultCommitCbFn;
      if (pTmq->commitCb != NULL) {
        pCommitFn = pTmq->commitCb;
      }
      asyncCommitAllOffsets(pTmq, pCommitFn, pTmq->commitCbUserParam);
      tqDebugC("consumer:0x%" PRIx64 " next commit to vnode(s) in %.2fs", pTmq->consumerId,
               pTmq->autoCommitInterval / 1000.0);
      bool rearmed = taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, (void*)(pTmq->refId),
                                  tmqMgmt.timer, &pTmq->commitTimer);
      tqDebugC("reset timer for commit:%d", rearmed);
    } else {
      tqErrorC("consumer:0x%" PRIx64 " invalid task type:%d", pTmq->consumerId, taskType);
    }

    taosFreeQitem(pItem);
  }

  return 0;
}
1627

1628
// Discards every response wrapper still queued in tmq->mqueue: each item is read, its payload is
// released with tmqFreeRspWrapper() and the queue item itself with taosFreeQitem().
// A NULL consumer is ignored.
void tmqClearUnhandleMsg(tmq_t* tmq) {
  if (tmq == NULL) {
    return;
  }

  bool drained = false;
  while (!drained) {
    SMqRspWrapper* pPending = NULL;
    taosReadQitem(tmq->mqueue, (void**)&pPending);
    if (pPending == NULL) {
      drained = true;
    } else {
      tmqFreeRspWrapper(pPending);
      taosFreeQitem(pPending);
    }
  }
}
1638

1639
// Response callback for the subscribe request sent by tmq_subscribe().
// Releases the response buffers carried by pMsg, stores the response code in the caller's
// SMqSubscribeCbParam and posts its semaphore so the waiting caller can continue.
// Returns `code` unchanged when no param was supplied, otherwise 0.
int32_t tmqSubscribeCb(void* param, SDataBuf* pMsg, int32_t code) {
  if (pMsg != NULL) {
    taosMemoryFreeClear(pMsg->pEpSet);
    taosMemoryFreeClear(pMsg->pData);
  }

  SMqSubscribeCbParam* pCbParam = (SMqSubscribeCbParam*)param;
  if (pCbParam != NULL) {
    pCbParam->rspErr = code;
    if (tsem2_post(&pCbParam->rspSem) != 0) {
      tqErrorC("failed to post sem, subscribe cb");
    }
    return 0;
  }

  return code;
}
1657

1658
// Reports the topics the consumer currently holds in tmq->clientTopics.
//
// tmq    - consumer handle; must not be NULL.
// topics - in/out list pointer; must not be NULL. When *topics is NULL a new list is created with
//          tmq_list_new() and stored there; otherwise names are appended to the existing list.
//
// Each stored topic name is expected to contain a '.'; only the part after the first '.' is
// appended. Entries that are NULL, have no '.', or fail to append are logged and skipped.
//
// Returns 0 on success, TSDB_CODE_INVALID_PARA for a NULL argument, or terrno when the list
// cannot be allocated.
int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
  // `topics` itself is dereferenced below, so it has to be validated together with `tmq`.
  if (tmq == NULL || topics == NULL) return TSDB_CODE_INVALID_PARA;
  if (*topics == NULL) {
    *topics = tmq_list_new();
    if (*topics == NULL) {
      return terrno;
    }
  }
  // clientTopics is read under the consumer's read latch
  taosRLockLatch(&tmq->lock);
  for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* topic = taosArrayGet(tmq->clientTopics, i);
    if (topic == NULL) {
      tqErrorC("topic is null");
      continue;
    }
    char* tmp = strchr(topic->topicName, '.');
    if (tmp == NULL) {
      tqErrorC("topic name is invalid:%s", topic->topicName);
      continue;
    }
    if (tmq_list_append(*topics, tmp + 1) != 0) {
      tqErrorC("failed to append topic:%s", tmp + 1);
      continue;
    }
  }
  taosRUnLockLatch(&tmq->lock);
  return 0;
}
1686

1687
void tmqFreeImpl(void* handle) {
116,165✔
1688
  if (handle == NULL) return;
116,165✔
1689
  tmq_t*  tmq = (tmq_t*)handle;
116,165✔
1690
  int64_t id = tmq->consumerId;
116,165✔
1691

1692
  if (tmq->mqueue) {
116,165✔
1693
    tmqClearUnhandleMsg(tmq);
116,165✔
1694
    taosCloseQueue(tmq->mqueue);
116,165✔
1695
  }
1696

1697
  if (tmq->delayedTask) {
116,165✔
1698
    taosCloseQueue(tmq->delayedTask);
116,165✔
1699
  }
1700

1701
  if(tsem2_destroy(&tmq->rspSem) != 0) {
116,165✔
1702
    tqErrorC("failed to destroy sem in free tmq");
×
1703
  }
1704

1705
  taosArrayDestroyEx(tmq->clientTopics, freeClientTopic);
116,165✔
1706
  taos_close_internal(tmq->pTscObj);
116,165✔
1707

1708
  if (tmq->commitTimer) {
116,165✔
1709
    if (!taosTmrStopA(&tmq->commitTimer)) {
64,239✔
1710
      tqErrorC("failed to stop commit timer");
39,471✔
1711
    }
1712
  }
1713
  if (tmq->epTimer) {
116,165✔
1714
    if (!taosTmrStopA(&tmq->epTimer)) {
113,774✔
1715
      tqErrorC("failed to stop ep timer");
106,222✔
1716
    }
1717
  }
1718
  if (tmq->hbLiveTimer) {
116,165✔
1719
    if (!taosTmrStopA(&tmq->hbLiveTimer)) {
115,843✔
1720
      tqErrorC("failed to stop hb timer");
×
1721
    }
1722
  }
1723
  taosMemoryFree(tmq);
116,165✔
1724

1725
  tqInfoC("consumer:0x%" PRIx64 " closed", id);
116,165✔
1726
}
1727

1728
static void tmqMgmtInit(void) {
85,682✔
1729
  tmqInitRes = 0;
85,682✔
1730

1731
  if (taosThreadMutexInit(&tmqMgmt.lock, NULL) != 0){
85,682✔
1732
    goto END;
×
1733
  }
1734

1735
  tmqMgmt.timer = taosTmrInit(1000, 100, 360000, "TMQ");
85,682✔
1736

1737
  if (tmqMgmt.timer == NULL) {
85,682✔
1738
    goto END;
×
1739
  }
1740

1741
  tmqMgmt.rsetId = taosOpenRef(10000, tmqFreeImpl);
85,682✔
1742
  if (tmqMgmt.rsetId < 0) {
85,682✔
1743
    goto END;
×
1744
  }
1745

1746
  return;
85,682✔
1747
END:
×
1748
  tmqInitRes = terrno;
×
1749
}
1750

1751
void tmqMgmtClose(void) {
1,449,055✔
1752
  if (tmqMgmt.timer) {
1,449,055✔
1753
    taosTmrCleanUp(tmqMgmt.timer);
85,682✔
1754
    tmqMgmt.timer = NULL;
85,682✔
1755
  }
1756

1757
  if (tmqMgmt.rsetId > 0) {
1,449,055✔
1758
    (void) taosThreadMutexLock(&tmqMgmt.lock);
85,682✔
1759
    tmq_t *tmq = taosIterateRef(tmqMgmt.rsetId, 0);
85,682✔
1760
    int64_t  refId = 0;
85,682✔
1761

1762
    while (tmq) {
88,092✔
1763
      refId = tmq->refId;
2,410✔
1764
      if (refId == 0) {
2,410✔
1765
        break;
×
1766
      }
1767
      atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__CLOSED);
2,410✔
1768
      tmq = taosIterateRef(tmqMgmt.rsetId, refId);
2,410✔
1769
    }
1770
    taosCloseRef(tmqMgmt.rsetId);
85,682✔
1771
    tmqMgmt.rsetId = -1;
85,682✔
1772
    (void)taosThreadMutexUnlock(&tmqMgmt.lock);
85,682✔
1773
  }
1774
  (void)taosThreadMutexUnlock(&tmqMgmt.lock);
1,449,055✔
1775
}
1,449,055✔
1776

1777
// Creates a consumer from `conf`.
//
// conf      - consumer configuration; must not be NULL and must carry a non-empty groupId.
// errstr    - optional buffer that receives a short failure description (via SET_ERROR_MSG_TMQ).
// errstrLen - size of errstr.
//
// Steps: one-time global init, allocate the consumer, create its topic array and two queues,
// copy the configuration, create the response semaphore, connect (by token when conf->token is
// set, otherwise by user/password with defaults for missing values), register the consumer in
// the ref set and start the heartbeat timer.
//
// Returns the new consumer, or NULL on any failure.
//
// Ownership on failure: before the consumer is registered with taosAddRef() it is destroyed
// directly with tmqFreeImpl(); once registered it is owned by the ref set and must be released
// with taosRemoveRef(), otherwise the ref set would keep a dangling pointer.
tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
  int32_t code = 0;

  if (conf == NULL) {
    SET_ERROR_MSG_TMQ("configure is null")
    return NULL;
  }
  code = taosThreadOnce(&tmqInit, tmqMgmtInit);
  if (code != 0) {
    tqErrorC("failed to tmqInit, code:%s", tstrerror(code));
    SET_ERROR_MSG_TMQ("tmq init error")
    return NULL;
  }
  if (tmqInitRes != 0) {
    SET_ERROR_MSG_TMQ("tmqInitRes init error")
    return NULL;
  }

  tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t));
  if (pTmq == NULL) {
    tqErrorC("failed to create consumer, code:%s", terrstr());
    SET_ERROR_MSG_TMQ("malloc tmq failed")
    return NULL;
  }

  // fall back to the default credentials when the configuration has none
  const char* user = conf->user == NULL ? TSDB_DEFAULT_USER : conf->user;
  const char* pass = conf->pass == NULL ? TSDB_DEFAULT_PASS : conf->pass;

  pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
  if (pTmq->clientTopics == NULL) {
    tqErrorC("failed to init topic array, since:%s", terrstr());
    SET_ERROR_MSG_TMQ("malloc client topics failed")
    goto _failed;
  }
  code = taosOpenQueue(&pTmq->mqueue);
  if (code) {
    tqErrorC("open mqueue failed since %s", tstrerror(code));
    SET_ERROR_MSG_TMQ("open mqueue failed")
    goto _failed;
  }

  code = taosOpenQueue(&pTmq->delayedTask);
  if (code) {
    tqErrorC("open delayed task queue failed since %s", tstrerror(code));
    SET_ERROR_MSG_TMQ("open delayed task queue failed")
    goto _failed;
  }

  if (conf->groupId[0] == 0) {
    SET_ERROR_MSG_TMQ("group is empty")
    goto _failed;
  }

  // init status
  pTmq->status = TMQ_CONSUMER_STATUS__INIT;
  pTmq->pollCnt = 0;
  pTmq->epoch = 0;
  pTmq->pollFlag = 0;

  // set conf
  tstrncpy(pTmq->clientId, conf->clientId, TSDB_CLIENT_ID_LEN);
  tstrncpy(pTmq->groupId, conf->groupId, TSDB_CGROUP_LEN);
  pTmq->withTbName = conf->withTbName;
  pTmq->useSnapshot = conf->snapEnable;
  pTmq->enableWalMarker = conf->enableWalMarker;
  pTmq->autoCommit = conf->autoCommit;
  pTmq->autoCommitInterval = conf->autoCommitInterval;
  pTmq->sessionTimeoutMs = conf->sessionTimeoutMs;
  pTmq->heartBeatIntervalMs = conf->heartBeatIntervalMs;
  pTmq->maxPollIntervalMs = conf->maxPollIntervalMs;
  pTmq->commitCb = conf->commitCb;
  pTmq->commitCbUserParam = conf->commitCbUserParam;
  pTmq->resetOffsetCfg = conf->resetOffset;
  pTmq->replayEnable = conf->replayEnable;
  pTmq->sourceExcluded = conf->sourceExcluded;
  pTmq->rawData = conf->rawData;
  pTmq->maxPollWaitTime = conf->maxPollWaitTime;
  pTmq->minPollRows = conf->minPollRows;
  pTmq->enableBatchMeta = conf->enableBatchMeta;
  tstrncpy(pTmq->user, user, TSDB_USER_LEN);
  if (taosGetFqdn(pTmq->fqdn) != 0) {
    tstrncpy(pTmq->fqdn, "localhost", TSDB_FQDN_LEN);
  }
  // replay mode overrides the configured auto-commit setting
  if (conf->replayEnable) {
    pTmq->autoCommit = false;
  }
  taosInitRWLatch(&pTmq->lock);

  // assign consumerId
  pTmq->consumerId = tGenIdPI64();

  // init semaphore
  if (tsem2_init(&pTmq->rspSem, 0, 0) != 0) {
    tqErrorC("consumer:0x %" PRIx64 " init semaphore failed since %s, consumer group %s", pTmq->consumerId, terrstr(), pTmq->groupId);
    SET_ERROR_MSG_TMQ("init t_sem failed")
    goto _failed;
  }

  if (conf->token != NULL) {
    code = taos_connect_by_auth(conf->ip, NULL, conf->token, NULL, NULL, conf->port, CONN_TYPE__TMQ, &pTmq->pTscObj);
    if (code) {
      tqErrorC("consumer:0x%" PRIx64 " connect by token failed since %s, groupId:%s", pTmq->consumerId, terrstr(), pTmq->groupId);
      SET_ERROR_MSG_TMQ(terrstr())
      goto _failed;
    }
  } else {
    // init connection
    code = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ, &pTmq->pTscObj);
    if (code) {
      tqErrorC("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, terrstr(), pTmq->groupId);
      SET_ERROR_MSG_TMQ(terrstr())
      goto _failed;
    }
  }

  pTmq->refId = taosAddRef(tmqMgmt.rsetId, pTmq);
  if (pTmq->refId < 0) {
    SET_ERROR_MSG_TMQ("add tscObj ref failed")
    goto _failed;
  }

  pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, pTmq->heartBeatIntervalMs, (void*)pTmq->refId, tmqMgmt.timer);
  if (pTmq->hbLiveTimer == NULL) {
    SET_ERROR_MSG_TMQ("start heartbeat timer failed")
    // The consumer is already registered in the ref set: drop that reference so the set's
    // release callback (tmqFreeImpl) destroys it, instead of freeing it behind the set's back.
    int32_t rmCode = taosRemoveRef(tmqMgmt.rsetId, pTmq->refId);
    if (rmCode != 0) {
      tqErrorC("failed to remove ref:%" PRId64 ", code:%d", pTmq->refId, rmCode);
    }
    return NULL;
  }
  char         buf[TSDB_OFFSET_LEN] = {0};
  STqOffsetVal offset = {.type = pTmq->resetOffsetCfg};
  tFormatOffset(buf, tListLen(buf), &offset);
  tqInfoC("consumer:0x%" PRIx64 " is setup, refId:%" PRId64
              ", groupId:%s, snapshot:%d, autoCommit:%d, commitInterval:%dms, offset:%s, maxPollIntervalMs:%dms, sessionTimeoutMs:%dms",
          pTmq->consumerId, pTmq->refId, pTmq->groupId, pTmq->useSnapshot, pTmq->autoCommit, pTmq->autoCommitInterval,
          buf, pTmq->maxPollIntervalMs, pTmq->sessionTimeoutMs);

  return pTmq;

_failed:
  // only reached while pTmq is not (successfully) registered in the ref set
  tmqFreeImpl(pTmq);
  return NULL;
}
1917

1918
// Synchronous wrapper around askEp(): sends the request with a freshly allocated SAskEpInfo and
// blocks on its semaphore until the response callback has posted it.
// Returns TSDB_CODE_INVALID_PARA for a NULL consumer, terrno when the info block cannot be
// allocated, TSDB_CODE_TSC_INTERNAL_ERROR when the semaphore cannot be created, the askEp()
// error when sending fails, and otherwise the code stored in the info block.
static int32_t syncAskEp(tmq_t* pTmq) {
  if (pTmq == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  SAskEpInfo* pAsk = taosMemoryMalloc(sizeof(SAskEpInfo));
  if (pAsk == NULL) {
    return terrno;
  }

  if (tsem2_init(&pAsk->sem, 0, 0) != 0) {
    taosMemoryFree(pAsk);
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  int32_t result = askEp(pTmq, pAsk, true, false);
  if (result == 0) {
    // request is on its way; wait for the response before reading the result
    if (tsem2_wait(&pAsk->sem) != 0) {
      tqErrorC("consumer:0x%" PRIx64 ", failed to wait for sem", pTmq->consumerId);
    }
    result = pAsk->code;
  }

  if (tsem2_destroy(&pAsk->sem) != 0) {
    tqErrorC("failed to destroy sem sync ask ep");
  }
  taosMemoryFree(pAsk);

  return result;
}
1941

1942
// Subscribes the consumer to the topics in `topic_list` and waits for the result.
//
// Steps:
//  1. Build an SCMSubscribeReq from the consumer settings; every topic is turned into a full
//     name (tNameSetDbName + tNameExtractFullName) and collected in req.topicNames.
//  2. Serialize the request, send it with asyncSendMsgToServer() and wait on a local semaphore
//     that tmqSubscribeCb() posts when the response arrives.
//  3. Call syncAskEp() until it succeeds, sleeping SUBSCRIBE_RETRY_INTERVAL between attempts.
//     The loop gives up after SUBSCRIBE_RETRY_MAX_COUNT retries; TSDB_CODE_MND_CONSUMER_NOT_EXIST
//     ends the loop immediately and is reported as success (0).
//  4. Start the ask-ep timer and, when auto commit is enabled, the commit timer, if they are not
//     running yet.
//
// Returns 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments, otherwise the error code of
// the failing step.
//
// Ownership: after asyncSendMsgToServer() has been called, `buf` and `sendInfo` are not released
// here, including when the call fails.
int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
  if (tmq == NULL || topic_list == NULL) return TSDB_CODE_INVALID_PARA;
  const SArray*   container = &topic_list->container;
  int32_t         sz = taosArrayGetSize(container);
  void*           buf = NULL;
  SMsgSendInfo*   sendInfo = NULL;
  SCMSubscribeReq req = {0};
  int32_t         code = 0;

  tqInfoC("consumer:0x%" PRIx64 " cgroup:%s, subscribe %d topics", tmq->consumerId, tmq->groupId, sz);

  req.consumerId = tmq->consumerId;
  tstrncpy(req.clientId, tmq->clientId, TSDB_CLIENT_ID_LEN);
  tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
  tstrncpy(req.user, tmq->user, TSDB_USER_LEN);
  tstrncpy(req.fqdn, tmq->fqdn, TSDB_FQDN_LEN);

  req.topicNames = taosArrayInit(sz, sizeof(void*));
  if (req.topicNames == NULL) {
    code = terrno;
    goto END;
  }

  req.withTbName = tmq->withTbName;
  req.autoCommit = tmq->autoCommit;
  req.autoCommitInterval = tmq->autoCommitInterval;
  req.sessionTimeoutMs = tmq->sessionTimeoutMs;
  req.maxPollIntervalMs = tmq->maxPollIntervalMs;
  req.resetOffsetCfg = tmq->resetOffsetCfg;
  req.enableReplay = tmq->replayEnable;
  req.enableBatchMeta = tmq->enableBatchMeta;

  for (int32_t i = 0; i < sz; i++) {
    char* topic = taosArrayGetP(container, i);
    if (topic == NULL) {
      code = terrno;
      goto END;
    }
    SName name = {0};
    code = tNameSetDbName(&name, tmq->pTscObj->acctId, topic, strlen(topic));
    if (code) {
      tqErrorC("consumer:0x%" PRIx64 " cgroup:%s, failed to set topic name, code:%d", tmq->consumerId, tmq->groupId,
               code);
      goto END;
    }
    char* topicFName = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
    if (topicFName == NULL) {
      code = terrno;
      goto END;
    }

    code = tNameExtractFullName(&name, topicFName);
    if (code) {
      tqErrorC("consumer:0x%" PRIx64 " cgroup:%s, failed to extract topic name, code:%d", tmq->consumerId, tmq->groupId,
               code);
      taosMemoryFree(topicFName);
      goto END;
    }

    // on success the array owns topicFName; it is released together with req.topicNames at END
    if (taosArrayPush(req.topicNames, &topicFName) == NULL) {
      code = terrno;
      taosMemoryFree(topicFName);
      goto END;
    }
    tqInfoC("consumer:0x%" PRIx64 " subscribe topic:%s", tmq->consumerId, topicFName);
  }

  // first pass computes the encoded size, second pass writes into buf
  int32_t tlen = tSerializeSCMSubscribeReq(NULL, &req);
  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) {
    code = terrno;
    goto END;
  }

  void* abuf = buf;
  tlen = tSerializeSCMSubscribeReq(&abuf, &req);

  sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    code = terrno;
    taosMemoryFree(buf);
    goto END;
  }

  SMqSubscribeCbParam param = {.rspErr = 0};
  if (tsem2_init(&param.rspSem, 0, 0) != 0) {
    code = TSDB_CODE_TSC_INTERNAL_ERROR;
    taosMemoryFree(buf);
    taosMemoryFree(sendInfo);
    goto END;
  }

  sendInfo->msgInfo = (SDataBuf){.pData = buf, .len = tlen, .handle = NULL};
  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = &param;
  sendInfo->fp = tmqSubscribeCb;
  sendInfo->msgType = TDMT_MND_TMQ_SUBSCRIBE;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo);
  if (code != 0) {
    // Nothing will be waited for, so the semaphore created above has to be released here;
    // otherwise it is leaked on every failed send.
    if (tsem2_destroy(&param.rspSem) != 0) {
      tqErrorC("consumer:0x%" PRIx64 ", failed to destroy semaphore in subscribe", tmq->consumerId);
    }
    goto END;
  }

  // `param` lives on this stack frame, so the response must be awaited before returning
  if (tsem2_wait(&param.rspSem) != 0) {
    tqErrorC("consumer:0x%" PRIx64 ", failed to wait semaphore in subscribe", tmq->consumerId);
  }
  if (tsem2_destroy(&param.rspSem) != 0) {
    tqErrorC("consumer:0x%" PRIx64 ", failed to destroy semaphore in subscribe", tmq->consumerId);
  }

  if (param.rspErr != 0) {
    code = param.rspErr;
    goto END;
  }

  int32_t retryCnt = 0;
  while ((code = syncAskEp(tmq)) != 0) {
    if (retryCnt++ > SUBSCRIBE_RETRY_MAX_COUNT || code == TSDB_CODE_MND_CONSUMER_NOT_EXIST) {
      tqErrorC("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry more than 2 minutes or code:%s",
               tmq->consumerId, tstrerror(code));
      if (code == TSDB_CODE_MND_CONSUMER_NOT_EXIST) {
        code = 0;
      }
      goto END;
    }

    tqInfoC("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry:%d in 500ms", tmq->consumerId, retryCnt);
    taosMsleep(SUBSCRIBE_RETRY_INTERVAL);
  }

  if (tmq->epTimer == NULL) {
    tmq->epTimer = taosTmrStart(tmqAssignAskEpTask, DEFAULT_ASKEP_INTERVAL, (void*)(tmq->refId), tmqMgmt.timer);
    if (tmq->epTimer == NULL) {
      code = TSDB_CODE_TSC_INTERNAL_ERROR;
      goto END;
    }
  }
  if (tmq->autoCommit && tmq->commitTimer == NULL) {
    tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, (void*)(tmq->refId), tmqMgmt.timer);
    if (tmq->commitTimer == NULL) {
      code = TSDB_CODE_TSC_INTERNAL_ERROR;
      goto END;
    }
  }

END:
  taosArrayDestroyP(req.topicNames, NULL);
  return code;
}
2094

2095
// Stores the auto-commit callback and its user parameter in the configuration.
// A NULL configuration is ignored.
void tmq_conf_set_auto_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* param) {
  if (conf != NULL) {
    conf->commitCbUserParam = param;
    conf->commitCb = cb;
  }
}
2100

2101
// Looks up the vgroup entry `vgId` of topic `topicName` in tmq->clientTopics.
// On a match *pVg is set to the entry; when nothing matches (or an argument is NULL) *pVg is
// left untouched, so callers should initialise it before the call.
static void getVgInfo(tmq_t* tmq, char* topicName, int32_t vgId, SMqClientVg** pVg) {
  if (!(tmq != NULL && topicName != NULL && pVg != NULL)) {
    return;
  }

  const int32_t nTopics = taosArrayGetSize(tmq->clientTopics);
  for (int32_t t = 0; t < nTopics; t++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, t);
    if (pTopic == NULL || strcmp(pTopic->topicName, topicName) != 0) {
      continue;
    }

    const int32_t nVgs = taosArrayGetSize(pTopic->vgs);
    for (int32_t v = 0; v < nVgs; v++) {
      SMqClientVg* pCandidate = taosArrayGet(pTopic->vgs, v);
      if (pCandidate != NULL && pCandidate->vgId == vgId) {
        *pVg = pCandidate;
        return;
      }
    }
  }
}
2120

2121
// Returns the entry of tmq->clientTopics whose topicName equals `topicName`, or NULL when there
// is no such entry or an argument is NULL.
static SMqClientTopic* getTopicInfo(tmq_t* tmq, char* topicName) {
  if (tmq == NULL || topicName == NULL) {
    return NULL;
  }
  int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
  for (int i = 0; i < topicNumCur; i++) {
    SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i);
    // guard the element pointer before dereferencing it, as getVgInfo() does
    if (pTopicCur != NULL && strcmp(pTopicCur->topicName, topicName) == 0) {
      return pTopicCur;
    }
  }
  return NULL;
}
2134

2135
// Response callback for a poll request (installed as sendInfo->fp by doTmqPollImpl()).
//
// param - SMqPollCbParam with the consumer refId, vgId, topic name and request id.
// pMsg  - response buffers; whatever is not handed over to the queue is freed before returning.
// code  - transport/server result of the request.
//
// Flow:
//  - Without pMsg nothing can be done: TSDB_CODE_TSC_INTERNAL_ERROR is returned.
//  - Without param, or when the consumer can no longer be acquired from the ref set, the
//    buffers are freed and an error is returned (EXIT).
//  - Otherwise a response wrapper is always queued to tmq->mqueue, also for failed requests:
//    it carries `code`, the vgId and the topic name. For a successful response whose epoch
//    matches the consumer's current epoch, ownership of pData/pEpSet moves into the wrapper.
//  - The consumer's rspSem is posted and the ref released (END).
//
// Return value: when a wrapper was allocated, the result of taosWriteQitem(); otherwise the
// error detected before queuing.
int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
  tmq_t*          tmq = NULL;
  SMqRspWrapper*  pRspWrapper = NULL;
  int8_t          rspType = 0;
  int32_t         vgId = 0;
  uint64_t        requestId = 0;
  SMqPollCbParam* pParam = (SMqPollCbParam*)param;
  if (pMsg == NULL) {
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }
  if (pParam == NULL) {
    code = TSDB_CODE_TSC_INTERNAL_ERROR;
    goto EXIT;
  }
  int64_t refId = pParam->refId;
  vgId = pParam->vgId;
  requestId = pParam->requestId;
  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq == NULL) {
    code = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    goto EXIT;
  }

  int32_t ret = taosAllocateQitem(sizeof(SMqRspWrapper), DEF_QITEM, 0, (void**)&pRspWrapper);
  if (ret) {
    code = ret;
    tqWarnC("consumer:0x%" PRIx64 " msg discard from vgId:%d, since out of memory", tmq->consumerId, vgId);
    goto END;
  }

  // a failed request is still queued (at END) so the poller learns about the error
  if (code != 0) {
    goto END;
  }

  if (pMsg->pData == NULL) {
    tqErrorC("consumer:0x%" PRIx64 " msg discard from vgId:%d, since msg is NULL", tmq->consumerId, vgId);
    code = TSDB_CODE_TSC_INTERNAL_ERROR;
    goto END;
  }

  // the payload is read through an SMqRspHead
  SMqRspHead* pHead = (SMqRspHead*)pMsg->pData;
  int32_t     msgEpoch = pHead->epoch;
  int32_t     clientEpoch = atomic_load_32(&tmq->epoch);

  if (msgEpoch != clientEpoch) {
    tqWarnC("consumer:0x%" PRIx64" msg discard from vgId:%d since epoch not equal, rsp epoch %d, current epoch %d, reqId:0x%" PRIx64,
             tmq->consumerId, vgId, msgEpoch, clientEpoch, requestId);
    code = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
    goto END;
  }
  rspType = pHead->mqMsgType;
  tqDebugC("consumer:0x%" PRIx64 " recv poll rsp, vgId:%d, type %d(%s), QID:0x%" PRIx64, tmq->consumerId, vgId, rspType, tmqMsgTypeStr[rspType], requestId);

  pRspWrapper->tmqRspType = rspType;
  pRspWrapper->pollRsp.reqId = requestId;
  pRspWrapper->pollRsp.pEpset = pMsg->pEpSet;
  pRspWrapper->pollRsp.data = pMsg->pData;
  pRspWrapper->pollRsp.len = pMsg->len;
  // the wrapper owns the buffers now; clear them so EXIT does not free them
  pMsg->pData = NULL;
  pMsg->pEpSet = NULL;

END:
  if (pRspWrapper) {
    pRspWrapper->code = code;
    pRspWrapper->pollRsp.vgId = vgId;
    tstrncpy(pRspWrapper->pollRsp.topicName, pParam->topicName, TSDB_TOPIC_FNAME_LEN);
    code = taosWriteQitem(tmq->mqueue, pRspWrapper);
    if (code != 0) {
      tmqFreeRspWrapper(pRspWrapper);
      taosFreeQitem(pRspWrapper);
      tqErrorC("consumer:0x%" PRIx64 " put poll res into mqueue failed, code:%d", tmq->consumerId, code);
    } else {
      // tmq is known to be non-NULL here (END is only reached after a successful acquire)
      tqDebugC("consumer:0x%" PRIx64 " put poll res into mqueue, type:%d(%s), vgId:%d, total in queue:%d, QID:0x%" PRIx64,
               tmq->consumerId, rspType, tmqMsgTypeStr[rspType], vgId, taosQueueItemSize(tmq->mqueue), requestId);
    }
  }

  // wake the poller whether or not an item was queued
  if (tsem2_post(&tmq->rspSem) != 0) {
    tqErrorC("failed to post rsp sem, consumer:0x%" PRIx64, tmq->consumerId);
  }
  ret = taosReleaseRef(tmqMgmt.rsetId, refId);
  if (ret != 0) {
    tqErrorC("failed to release ref:%" PRId64 ", code:%d", refId, ret);
  }

EXIT:
  taosMemoryFreeClear(pMsg->pData);
  taosMemoryFreeClear(pMsg->pEpSet);
  return code;
}
2224

2225
// Fills a poll request for one vgroup of one topic from the consumer settings.
// The subscribe key is "<groupId><TMQ_SEPARATOR><topicName>", the requested offset is the
// vgroup's current endOffset, and a fresh request id is generated for every call.
// Nothing is written when any argument is NULL.
void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, SMqClientTopic* pTopic, SMqClientVg* pVg) {
  if (!(pReq != NULL && tmq != NULL && pTopic != NULL && pVg != NULL)) {
    return;
  }

  // identity of the subscription
  (void)snprintf(pReq->subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", tmq->groupId, TMQ_SEPARATOR, pTopic->topicName);
  pReq->consumerId = tmq->consumerId;
  pReq->epoch = atomic_load_32(&tmq->epoch);

  // target vgroup and where to continue reading
  pReq->head.vgId = pVg->vgId;
  pReq->reqOffset = pVg->offsetInfo.endOffset;

  // per-request id
  pReq->reqId = generateRequestId();

  // options copied from the consumer
  pReq->withTbName = tmq->withTbName;
  pReq->timeout = tmq->maxPollWaitTime;
  pReq->minPollRows = tmq->minPollRows;
  pReq->useSnapshot = tmq->useSnapshot;
  pReq->enableReplay = tmq->replayEnable;
  pReq->sourceExcluded = tmq->sourceExcluded;
  pReq->rawData = tmq->rawData;
  pReq->enableBatchMeta = tmq->enableBatchMeta;
}
2244

2245
void changeByteEndian(char* pData) {
46,301,691✔
2246
  if (pData == NULL) {
46,301,691✔
2247
    return;
×
2248
  }
2249
  char* p = pData;
46,301,691✔
2250

2251
  // | version | total length | total rows | total columns | flag seg| block group id | column schema | each column
2252
  // length | version:
2253
  int32_t blockVersion = *(int32_t*)p;
46,301,691✔
2254
  if (blockVersion != BLOCK_VERSION_1) {
46,301,691✔
2255
    tqErrorC("invalid block version:%d", blockVersion);
×
2256
    return;
×
2257
  }
2258
  *(int32_t*)p = BLOCK_VERSION_2;
46,301,691✔
2259

2260
  p += sizeof(int32_t);
46,301,691✔
2261
  p += sizeof(int32_t);
46,301,691✔
2262
  p += sizeof(int32_t);
46,301,691✔
2263
  int32_t cols = *(int32_t*)p;
46,301,691✔
2264
  p += sizeof(int32_t);
46,301,691✔
2265
  p += sizeof(int32_t);
46,301,691✔
2266
  p += sizeof(uint64_t);
46,301,691✔
2267
  // check fields
2268
  p += cols * (sizeof(int8_t) + sizeof(int32_t));
46,301,691✔
2269

2270
  int32_t* colLength = (int32_t*)p;
46,301,691✔
2271
  for (int32_t i = 0; i < cols; ++i) {
264,595,972✔
2272
    colLength[i] = htonl(colLength[i]);
218,294,281✔
2273
  }
2274
}
2275

2276
static void tmqGetRawDataRowsPrecisionFromRes(void* pRetrieve, void** rawData, int64_t* rows, int32_t* precision) {
86,909,452✔
2277
  if (pRetrieve == NULL || rawData == NULL || rows == NULL) {
86,909,452✔
2278
    return;
×
2279
  }
2280
  if (*(int64_t*)pRetrieve == 0) {
86,909,452✔
2281
    *rawData = ((SRetrieveTableRsp*)pRetrieve)->data;
×
2282
    *rows = htobe64(((SRetrieveTableRsp*)pRetrieve)->numOfRows);
×
2283
    if (precision != NULL) {
×
2284
      *precision = ((SRetrieveTableRsp*)pRetrieve)->precision;
×
2285
    }
2286
  } else if (*(int64_t*)pRetrieve == 1) {
86,909,452✔
2287
    *rawData = ((SRetrieveTableRspForTmq*)pRetrieve)->data;
86,909,815✔
2288
    *rows = htobe64(((SRetrieveTableRspForTmq*)pRetrieve)->numOfRows);
86,909,815✔
2289
    if (precision != NULL) {
86,909,815✔
2290
      *precision = ((SRetrieveTableRspForTmq*)pRetrieve)->precision;
40,608,124✔
2291
    }
2292
  }
2293
}
2294

2295
static void tmqBuildRspFromWrapperInner(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows,
3,765,852✔
2296
                                        SMqRspObj* pRspObj) {
2297
  if (pWrapper == NULL || pVg == NULL || numOfRows == NULL || pRspObj == NULL) {
3,765,852✔
2298
    return;
×
2299
  }
2300
  pRspObj->resIter = -1;
3,765,852✔
2301
  pRspObj->resInfo.totalRows = 0;
3,765,852✔
2302
  pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
3,765,852✔
2303

2304
  SMqDataRsp* pDataRsp = &pRspObj->dataRsp;
3,765,852✔
2305
  // extract the rows in this data packet
2306
  for (int32_t i = 0; i < pDataRsp->blockNum; ++i) {
50,067,543✔
2307
    void*   pRetrieve = taosArrayGetP(pDataRsp->blockData, i);
46,301,691✔
2308
    void*   rawData = NULL;
46,301,691✔
2309
    int64_t rows = 0;
46,301,691✔
2310
    // deal with compatibility
2311
    tmqGetRawDataRowsPrecisionFromRes(pRetrieve, &rawData, &rows, NULL);
46,301,691✔
2312

2313
    pVg->numOfRows += rows;
46,301,691✔
2314
    (*numOfRows) += rows;
46,301,691✔
2315
    changeByteEndian(rawData);
46,301,691✔
2316
  }
2317
}
2318

2319
// Build and asynchronously send one TDMT_VND_TMQ_CONSUME request for the given
// topic/vgroup pair.
//
// Returns 0 on success, otherwise the error code of the step that failed
// (serialization, allocation or the send itself). On success the poll
// counters of both the vgroup and the consumer are incremented and the
// vgroup's seekUpdated flag is cleared.
static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* pVg) {
  int             code = 0;
  int             lino = 0;
  SMqPollReq      req = {0};
  char*           reqBuf = NULL;
  SMqPollCbParam* pCbParam = NULL;
  SMsgSendInfo*   sendInfo = NULL;

  tmqBuildConsumeReqImpl(&req, pTmq, pTopic, pVg);

  // first pass computes the encoded size, second pass fills the buffer
  int32_t reqLen = tSerializeSMqPollReq(NULL, 0, &req);
  TSDB_CHECK_CONDITION(reqLen >= 0, code, lino, END, TSDB_CODE_INVALID_MSG);

  reqBuf = taosMemoryCalloc(1, reqLen);
  TSDB_CHECK_NULL(reqBuf, code, lino, END, terrno);

  TSDB_CHECK_CONDITION(tSerializeSMqPollReq(reqBuf, reqLen, &req) >= 0, code, lino, END, TSDB_CODE_INVALID_MSG);

  // context handed to tmqPollCb when the response arrives
  pCbParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
  TSDB_CHECK_NULL(pCbParam, code, lino, END, terrno);

  pCbParam->requestId = req.reqId;
  pCbParam->vgId = pVg->vgId;
  pCbParam->refId = pTmq->refId;
  tstrncpy(pCbParam->topicName, pTopic->topicName, TSDB_TOPIC_FNAME_LEN);

  sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  TSDB_CHECK_NULL(sendInfo, code, lino, END, terrno);

  sendInfo->msgType = TDMT_VND_TMQ_CONSUME;
  sendInfo->fp = tmqPollCb;
  sendInfo->param = pCbParam;
  sendInfo->paramFreeFp = taosAutoMemoryFree;
  sendInfo->requestId = req.reqId;
  sendInfo->requestObjRefId = 0;
  sendInfo->msgInfo = (SDataBuf){.pData = reqBuf, .len = reqLen, .handle = NULL};

  // sendInfo now references both buffers; clear the locals so the cleanup at
  // END does not free them
  reqBuf = NULL;
  pCbParam = NULL;

  char offsetStr[TSDB_OFFSET_LEN] = {0};
  tFormatOffset(offsetStr, tListLen(offsetStr), &pVg->offsetInfo.endOffset);
  code = asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, NULL, sendInfo);
  tqDebugC("consumer:0x%" PRIx64 " send poll to %s vgId:%d, code:%d, epoch %d, req:%s, QID:0x%" PRIx64, pTmq->consumerId,
           pTopic->topicName, pVg->vgId, code, pTmq->epoch, offsetStr, req.reqId);
  TSDB_CHECK_CODE(code, lino, END);

  pTmq->pollCnt++;
  pVg->pollCnt++;
  pVg->seekUpdated = false;  // a new poll is in flight, so the seek marker no longer applies

END:
  if (code != 0){
    tqErrorC("%s failed at %d msg:%s", __func__, lino, tstrerror(code));
  }
  // both are NULL unless a failure happened before they were attached to sendInfo
  taosMemoryFreeClear(pCbParam);
  taosMemoryFreeClear(reqBuf);
  return code;
}
2377

2378
// Issue a poll request to every vgroup of every subscribed topic that is
// currently eligible, while holding tmq->lock for writing.
//
// A vgroup is skipped when:
//   - its topic is flagged noPrivilege,
//   - it returned an empty block less than EMPTY_BLOCK_POLL_IDLE_DURATION ms ago,
//   - replay is enabled and the replay sleep interval has not elapsed yet,
//   - a previous poll is still outstanding (vgStatus is already WAIT).
//
// Returns 0 on success, the stored token error if the token is missing,
// disabled or expired, TSDB_CODE_MND_CONSUMER_NOT_EXIST if the consumer is
// closed, TSDB_CODE_INVALID_MSG for a NULL handle, or the first error returned
// by doTmqPollImpl().
static int32_t tmqPollImpl(tmq_t* tmq) {
  if (tmq == NULL) {
    return TSDB_CODE_INVALID_MSG;
  }
  taosWLockLatch(&tmq->lock);

  int32_t code = atomic_load_32(&tmq->tokenCode);
  if (code == TSDB_CODE_MND_TOKEN_NOT_EXIST || code == TSDB_CODE_MND_TOKEN_DISABLED || code == TSDB_CODE_MND_TOKEN_EXPIRED){
    goto end;
  } else {
    // any other stored token code does not block polling
    code = 0;
  }

  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__CLOSED){
    code = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
    goto end;
  }

  int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
  tqDebugC("consumer:0x%" PRIx64 " start to poll data, numOfTopics:%d", tmq->consumerId, numOfTopics);

  for (int i = 0; i < numOfTopics; i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
    if (pTopic == NULL) {
      continue;
    }
    if (pTopic->noPrivilege) {
      tqDebugC("consumer:0x%" PRIx64 " has no privilege for topic:%s", tmq->consumerId, pTopic->topicName);
      continue;
    }

    int32_t numOfVg = taosArrayGetSize(pTopic->vgs);
    for (int j = 0; j < numOfVg; j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
      if (pVg == NULL) {
        continue;
      }

      // back off for a while after an empty block (the elapsed >= 0 test
      // ignores a clock that moved backwards)
      int64_t elapsed = taosGetTimestampMs() - pVg->emptyBlockReceiveTs;
      if (elapsed < EMPTY_BLOCK_POLL_IDLE_DURATION && elapsed >= 0) {  // less than EMPTY_BLOCK_POLL_IDLE_DURATION
        tqDebugC("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for %" PRId64 "ms before start next poll",
                 tmq->consumerId, tmq->epoch, pVg->vgId, elapsed);
        continue;
      }

      // in replay mode wait until the requested sleep interval has passed
      // NOTE(review): this log prints the configured sleep, not the elapsed time — confirm intended
      elapsed = taosGetTimestampMs() - pVg->blockReceiveTs;
      if (tmq->replayEnable && elapsed < pVg->blockSleepForReplay && elapsed >= 0) {
        tqDebugC("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for %" PRId64 "ms before start next poll when replay",
                 tmq->consumerId, tmq->epoch, pVg->vgId, pVg->blockSleepForReplay);
        continue;
      }

      // claim the vgroup: IDLE -> WAIT. If it was already WAIT a poll is outstanding.
      int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
      if (vgStatus == TMQ_VG_STATUS__WAIT) {
        int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
        if (vgSkipCnt % 10000 == 0) {
          tqInfoC("consumer:0x%" PRIx64 " epoch %d, vgId:%d has skipped poll %d times in a row", tmq->consumerId,
                  tmq->epoch, pVg->vgId, vgSkipCnt);
        }
        tqDebugC("consumer:0x%" PRIx64 " epoch %d wait poll-rsp, skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch,
                 pVg->vgId, vgSkipCnt);
        continue;
      }

      atomic_store_32(&pVg->vgSkipCnt, 0);
      code = doTmqPollImpl(tmq, pTopic, pVg);
      if (code != TSDB_CODE_SUCCESS) {
        // NOTE(review): vgStatus stays WAIT here; presumably the response/error
        // path resets it to IDLE — confirm a failed send cannot strand the vgroup.
        goto end;
      }
    }
  }

  end:
  taosWUnLockLatch(&tmq->lock);
  tqDebugC("consumer:0x%" PRIx64 " end to poll data, code:%d", tmq->consumerId, code);
  return code;
}
2454

2455
// Record the outcome of a poll response on the vgroup:
//   - unless a seek happened since the request was sent (seekUpdated), copy the
//     response offset into endOffset and, when the response carried data, the
//     request offset into beginOffset;
//   - mark the vgroup IDLE;
//   - store the WAL version range as [sver, ever + 1].
// Does nothing if pVg, reqOffset or rspOffset is NULL.
static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* reqOffset, STqOffsetVal* rspOffset, int64_t sver, int64_t ever,
                         int64_t consumerId, bool hasData) {
  if (pVg == NULL || reqOffset == NULL || rspOffset == NULL) {
    return;
  }

  SVgOffsetInfo* pInfo = &pVg->offsetInfo;

  if (pVg->seekUpdated) {
    tqDebugC("consumer:0x%" PRIx64 " local offset is NOT update, since seekupdate is set", consumerId);
  } else {
    tqDebugC("consumer:0x%" PRIx64 " local offset is update, since seekupdate not set", consumerId);
    if (hasData) {
      tOffsetCopy(&pInfo->beginOffset, reqOffset);
    }
    tOffsetCopy(&pInfo->endOffset, rspOffset);
  }

  // the vgroup may be polled again from now on
  atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);

  // refresh the valid wal version range
  pInfo->walVerBegin = sver;
  pInfo->walVerEnd = ever + 1;
}
2477

2478
// Allocate a response object and move the decoded payload of pollRspWrapper
// into it. The payload bytes are copied and the source is then zeroed, so the
// wrapper no longer references whatever the payload pointed to.
// Returns NULL if the allocation fails.
static SMqRspObj* buildRsp(SMqPollRspWrapper* pollRspWrapper){
  // sized to hold whichever response variant is the largest
  typedef union {
    SMqDataRsp      dataRsp;
    SMqMetaRsp      metaRsp;
    SMqBatchMetaRsp batchMetaRsp;
  } MEMSIZE;

  SMqRspObj* pObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
  if (pObj == NULL) {
    tqErrorC("buildRsp:failed to allocate memory");
    return NULL;
  }

  // take over the payload, then identify where it came from
  (void)memcpy(&pObj->dataRsp, &pollRspWrapper->dataRsp, sizeof(MEMSIZE));
  tstrncpy(pObj->topic, pollRspWrapper->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pObj->db, pollRspWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pObj->vgId = pollRspWrapper->vgId;

  // wipe the source copy of the payload
  (void)memset(&pollRspWrapper->dataRsp, 0, sizeof(MEMSIZE));
  return pObj;
}
2497

2498
// Handle a poll response that carries an error code.
//
// Depending on the error this refreshes the endpoint information (vgroup
// moved / leader changed -> askEp, consumer mismatch -> syncAskEp) or swallows
// the error (no table qualified). Afterwards the vgroup, if it can still be
// found, is stamped with the current time as emptyBlockReceiveTs and set back
// to IDLE.
//
// Returns the original error, except: the syncAskEp() result for a consumer
// mismatch, and 0 for TSDB_CODE_TMQ_NO_TABLE_QUALIFIED.
static int32_t processMqRspError(tmq_t* tmq, SMqRspWrapper* pRspWrapper){
  SMqPollRspWrapper* pollRspWrapper = &pRspWrapper->pollRsp;
  const int32_t      rspCode = pRspWrapper->code;
  int32_t            code = rspCode;

  tqErrorC("consumer:0x%" PRIx64 " msg from vgId:%d discarded, since %s", tmq->consumerId, pollRspWrapper->vgId,
    tstrerror(rspCode));

  const bool vnodeMoved = (rspCode == TSDB_CODE_VND_INVALID_VGROUP_ID) ||  // for vnode transform
                          (rspCode == TSDB_CODE_SYN_NOT_LEADER);           // for vnode split
  if (vnodeMoved) {
    int32_t ret = askEp(tmq, NULL, false, true);
    if (ret != 0) {
      tqErrorC("consumer:0x%" PRIx64 " failed to ask ep when vnode transform, ret:%s", tmq->consumerId, tstrerror(ret));
    }
  } else if (rspCode == TSDB_CODE_TMQ_CONSUMER_MISMATCH) {
    code = syncAskEp(tmq);
    if (code != 0) {
      tqWarnC("consumer:0x%" PRIx64 " failed to ask ep when consumer mismatch, code:%s", tmq->consumerId, tstrerror(code));
    }
  } else if (rspCode == TSDB_CODE_TMQ_NO_TABLE_QUALIFIED){
    code = 0;
  }

  // release the vgroup so that it can be polled again after the idle period
  SMqClientVg* pVg = NULL;
  taosWLockLatch(&tmq->lock);
  getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId, &pVg);
  if (pVg != NULL) {
    pVg->emptyBlockReceiveTs = taosGetTimestampMs();
    atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
  }
  taosWUnLockLatch(&tmq->lock);

  return code;
}
2530

2531
// Decode the payload of a poll response according to its message type.
//
// For data, data+meta and raw-data responses the received buffer is handed to
// the decoded dataRsp (pollRsp.data is cleared afterwards). Unknown message
// types yield TSDB_CODE_TSC_INTERNAL_ERROR.
// NOTE(review): PROCESS_POLL_RSP is defined elsewhere; it is assumed to set
// `code` and jump to END on decode failure — confirm.
static int32_t processWrapperData(SMqRspWrapper* pRspWrapper){
  int32_t code = 0;

  switch (pRspWrapper->tmqRspType) {
    case TMQ_MSG_TYPE__POLL_DATA_RSP: {
      PROCESS_POLL_RSP(tDecodeMqDataRsp, &pRspWrapper->pollRsp.dataRsp)
      pRspWrapper->pollRsp.dataRsp.data = pRspWrapper->pollRsp.data;
      pRspWrapper->pollRsp.data = NULL;
    }
    break;

    case TMQ_MSG_TYPE__POLL_META_RSP: {
      PROCESS_POLL_RSP(tDecodeMqMetaRsp, &pRspWrapper->pollRsp.metaRsp)
    }
    break;

    case TMQ_MSG_TYPE__POLL_DATA_META_RSP: {
      PROCESS_POLL_RSP(tDecodeSTaosxRsp, &pRspWrapper->pollRsp.dataRsp)
      pRspWrapper->pollRsp.dataRsp.data = pRspWrapper->pollRsp.data;
      pRspWrapper->pollRsp.data = NULL;
    }
    break;

    case TMQ_MSG_TYPE__POLL_BATCH_META_RSP: {
      PROCESS_POLL_RSP(tSemiDecodeMqBatchMetaRsp, &pRspWrapper->pollRsp.batchMetaRsp)
    }
    break;

    case TMQ_MSG_TYPE__POLL_RAW_DATA_RSP: {
      PROCESS_POLL_RSP(tDecodeMqRawDataRsp, &pRspWrapper->pollRsp.dataRsp)
      // the raw payload is whatever follows the response head
      pRspWrapper->pollRsp.dataRsp.len = pRspWrapper->pollRsp.len - sizeof(SMqRspHead);
      pRspWrapper->pollRsp.dataRsp.rawData = POINTER_SHIFT(pRspWrapper->pollRsp.data, sizeof(SMqRspHead));
      pRspWrapper->pollRsp.data = NULL;
    }
    break;

    default: {
      tqErrorC("invalid rsp msg received, type:%d ignored", pRspWrapper->tmqRspType);
      code = TSDB_CODE_TSC_INTERNAL_ERROR;
      goto END;
    }
  }

  END:
  return code;
}
2558

2559
// Process one successful response taken from the consumer queue.
//
// - EP responses only update the local endpoint information.
// - Poll responses are decoded, the matching vgroup is updated (offsets, WAL
//   range, status) and, when the response carries data or meta, a new
//   SMqRspObj is built and stored in *pRspObj.
//
// Returns 0 on success. Errors: the decode error from processWrapperData(),
// TSDB_CODE_TMQ_INVALID_VGID if the vgroup is unknown,
// TSDB_CODE_TMQ_FETCH_TIMEOUT if the server flagged a timeout, or terrno if
// the response object could not be allocated.
//
// Locking: tmq->lock is taken for writing only after decoding succeeded, so
// the two exits before that point return directly instead of jumping to END
// (END releases the latch and must only be reached while it is held).
static int32_t processMqRsp(tmq_t* tmq, SMqRspWrapper* pRspWrapper, SMqRspObj** pRspObj){
  int32_t    code = 0;

  if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) {
    tqDebugC("consumer:0x%" PRIx64 " ep msg received", tmq->consumerId);
    SMqAskEpRsp*        rspMsg = &pRspWrapper->epRsp;
    doUpdateLocalEp(tmq, pRspWrapper->epoch, rspMsg);
    // NOTE(review): assumes doUpdateLocalEp releases any latch it takes — confirm.
    return code;
  }

  code = processWrapperData(pRspWrapper);
  if (code != 0) {
    return code;  // latch not taken yet
  }
  SMqPollRspWrapper* pollRspWrapper = &pRspWrapper->pollRsp;
  taosWLockLatch(&tmq->lock);
  SMqClientVg* pVg = NULL;
  getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId, &pVg);
  if(pVg == NULL) {
    tqErrorC("consumer:0x%" PRIx64 " get vg or topic error, topic:%s vgId:%d", tmq->consumerId,
             pollRspWrapper->topicName, pollRspWrapper->vgId);
    code = TSDB_CODE_TMQ_INVALID_VGID;
    goto END;
  }
  pollRspWrapper->topicHandle = getTopicInfo(tmq, pollRspWrapper->topicName);
  if (pollRspWrapper->pEpset != NULL) {
    // the response carried a new endpoint set for this vgroup
    pVg->epSet = *pollRspWrapper->pEpset;
  }

  if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_RSP ||
      pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP ||
      pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) {
    updateVgInfo(pVg, &pollRspWrapper->dataRsp.reqOffset, &pollRspWrapper->dataRsp.rspOffset, pollRspWrapper->head.walsver, pollRspWrapper->head.walever,
                 tmq->consumerId, pollRspWrapper->dataRsp.blockNum != 0);

    if (pollRspWrapper->dataRsp.timeout) {
      tqInfoC("consumer:0x%" PRIx64 " poll data timeout, vgId:%d", tmq->consumerId, pVg->vgId);
      code = TSDB_CODE_TMQ_FETCH_TIMEOUT;
      goto END;
    }
    char buf[TSDB_OFFSET_LEN] = {0};
    tFormatOffset(buf, TSDB_OFFSET_LEN, &pollRspWrapper->rspOffset);
    if (pollRspWrapper->dataRsp.blockNum == 0) {
      // remember when the empty block arrived so tmqPollImpl can back off
      pVg->emptyBlockReceiveTs = taosGetTimestampMs();
      tqDebugC("consumer:0x%" PRIx64 " empty block received, vgId:%d, offset:%s, vg total:%" PRId64
                   ", total:%" PRId64 ", QID:0x%" PRIx64,
               tmq->consumerId, pVg->vgId, buf, pVg->numOfRows, tmq->totalRows, pollRspWrapper->reqId);
    } else {
      *pRspObj = buildRsp(pollRspWrapper);
      if (*pRspObj == NULL) {
        tqErrorC("consumer:0x%" PRIx64 " failed to allocate memory for meta rsp", tmq->consumerId);
        code = terrno;
        goto END;
      }
      (*pRspObj)->resType = pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP ? RES_TYPE__TMQ_RAWDATA :
                         (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_RSP ? RES_TYPE__TMQ : RES_TYPE__TMQ_METADATA);
      int64_t numOfRows = 0;
      if (pRspWrapper->tmqRspType != TMQ_MSG_TYPE__POLL_RAW_DATA_RSP){
        // raw data is not broken down into blocks, so rows are only counted for the other types
        tmqBuildRspFromWrapperInner(pollRspWrapper, pVg, &numOfRows, *pRspObj);
        tmq->totalRows += numOfRows;
      }
      if (tmq->replayEnable && pRspWrapper->tmqRspType != TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) {
        // replay mode: note the requested sleep and arm the replay timer
        pVg->blockReceiveTs = taosGetTimestampMs();
        pVg->blockSleepForReplay = (*pRspObj)->dataRsp.sleepTime;
        if (pVg->blockSleepForReplay > 0) {
          if (taosTmrStart(tmqReplayTask, pVg->blockSleepForReplay, (void*)(tmq->refId), tmqMgmt.timer) == NULL) {
            tqErrorC("consumer:0x%" PRIx64 " failed to start replay timer, vgId:%d, sleep:%" PRId64,
                     tmq->consumerId, pVg->vgId, pVg->blockSleepForReplay);
          }
        }
      }
      pVg->emptyBlockReceiveTs = 0;
      tqDebugC("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64
                   ", vg total:%" PRId64 ", total:%" PRId64 ", QID:0x%" PRIx64,
               tmq->consumerId, pVg->vgId, buf, (*pRspObj)->dataRsp.blockNum, numOfRows, pVg->numOfRows, tmq->totalRows,
               pollRspWrapper->reqId);
    }
  } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP || pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_BATCH_META_RSP) {
    updateVgInfo(pVg, &pollRspWrapper->rspOffset, &pollRspWrapper->rspOffset,
                 pollRspWrapper->head.walsver, pollRspWrapper->head.walever, tmq->consumerId, true);

    *pRspObj = buildRsp(pollRspWrapper);
    if (*pRspObj == NULL) {
      tqErrorC("consumer:0x%" PRIx64 " failed to allocate memory for meta rsp", tmq->consumerId);
      code = terrno;
      goto END;
    }
    (*pRspObj)->resType = pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP ? RES_TYPE__TMQ_META : RES_TYPE__TMQ_BATCH_META;
  }

END:
  // only reached with tmq->lock held
  taosWUnLockLatch(&tmq->lock);
  return code;
}
2653

2654
// Drain the consumer's response queue until it is empty, a response object has
// been produced in *rspObj, or a response handler reports an error.
// Every dequeued wrapper is released before the next one is read.
// Returns the code of the last handler that ran (0 if the queue was empty).
static int32_t tmqHandleAllRsp(tmq_t* tmq, SMqRspObj** rspObj) {
  tqDebugC("consumer:0x%" PRIx64 " start to handle the rsp, total:%d", tmq->consumerId, taosQueueItemSize(tmq->mqueue));

  int32_t code = 0;
  for (;;) {
    SMqRspWrapper* pItem = NULL;
    taosReadQitem(tmq->mqueue, (void**)&pItem);
    if (pItem == NULL) {
      break;  // queue drained
    }

    tqDebugC("consumer:0x%" PRIx64 " handle rsp, type:%s", tmq->consumerId, tmqMsgTypeStr[pItem->tmqRspType]);
    if (pItem->code == 0) {
      code = processMqRsp(tmq, pItem, rspObj);
    } else {
      code = processMqRspError(tmq, pItem);
    }

    tmqFreeRspWrapper(pItem);
    taosFreeQitem(pItem);

    if (code != 0 || *rspObj != NULL) {
      break;
    }
  }

  return code;
}
2680

2681
// Poll the consumer for the next result.
//
// Each round runs the delayed tasks, handles queued responses, sends new poll
// requests and then waits on the response semaphore. With timeout >= 0 the
// wait is bounded by the time remaining and the call ends once `timeout` ms
// have elapsed; with a negative timeout it waits in 1000 ms slices.
//
// Returns the response object as soon as one is available, otherwise NULL.
// terrno is reset to 0 on entry and set to the error code when the poll ends
// because of an error.
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
  int32_t code = 0;
  int32_t lino = 0;
  terrno = 0;
  TSDB_CHECK_NULL(tmq, code, lino, END, TSDB_CODE_INVALID_PARA);

  int64_t pollStart = taosGetTimestampMs();

  tqDebugC("%s consumer:0x%" PRIx64 " start to poll at %" PRId64 ", timeout:%" PRId64, __func__, tmq->consumerId, pollStart, timeout);
  TSDB_CHECK_CONDITION(atomic_load_8(&tmq->status) != TMQ_CONSUMER_STATUS__INIT, code, lino, END, TSDB_CODE_TMQ_INVALID_STATUS);

  // flip pollFlag from 0 to 1 if it is not set yet
  (void)atomic_val_compare_exchange_8(&tmq->pollFlag, 0, 1);

  for (;;) {
    code = tmqHandleAllDelayedTask(tmq);
    TSDB_CHECK_CODE(code, lino, END);

    SMqRspObj* pResult = NULL;
    code = tmqHandleAllRsp(tmq, &pResult);
    if (pResult != NULL) {
      tqDebugC("%s consumer:0x%" PRIx64 " end to poll, return rsp:%p", __func__, tmq->consumerId, pResult);
      return (TAOS_RES*)pResult;
    }
    TSDB_CHECK_CODE(code, lino, END);

    code = tmqPollImpl(tmq);
    TSDB_CHECK_CODE(code, lino, END);

    if (timeout < 0) {
      // no deadline: wake up once per second at the latest
      (void)tsem2_timewait(&tmq->rspSem, 1000);
      continue;
    }

    int64_t now = taosGetTimestampMs();
    int64_t spent = now - pollStart;
    (void)tsem2_timewait(&tmq->rspSem, (timeout - spent));
    // leave with code 0 once the deadline passed (or the clock went backwards)
    TSDB_CHECK_CONDITION(spent < timeout && spent >= 0, code, lino, END, 0);
  }

END:
  if (code == 0) {
    tqDebugC("%s consumer:0x%" PRIx64 " poll end with timeout, msg:%s", __func__, tmq != NULL ? tmq->consumerId : 0, tstrerror(terrno));
  } else {
    terrno = code;
    tqErrorC("%s consumer:0x%" PRIx64 " poll error at line:%d, msg:%s", __func__, tmq != NULL ? tmq->consumerId : 0, lino, tstrerror(code));
  }
  return NULL;
}
2728

2729
// Log the consumer's totals (polls, rows, topics, epoch) followed by the row
// count of every vgroup of every topic. The topic list is read under the
// consumer's read latch. No-op for a NULL consumer.
static void displayConsumeStatistics(tmq_t* pTmq) {
  if (pTmq == NULL) {
    return;
  }

  taosRLockLatch(&pTmq->lock);

  int32_t topicCnt = taosArrayGetSize(pTmq->clientTopics);
  tqInfoC("consumer:0x%" PRIx64 " closing poll:%" PRId64 " rows:%" PRId64 " topics:%d, final epoch:%d",
          pTmq->consumerId, pTmq->pollCnt, pTmq->totalRows, topicCnt, pTmq->epoch);

  tqInfoC("consumer:0x%" PRIx64 " rows dist begin: ", pTmq->consumerId);
  for (int32_t t = 0; t < topicCnt; ++t) {
    SMqClientTopic* pTopic = taosArrayGet(pTmq->clientTopics, t);
    if (pTopic == NULL) {
      continue;
    }
    tqInfoC("consumer:0x%" PRIx64 " topic:%d", pTmq->consumerId, t);

    int32_t vgCnt = taosArrayGetSize(pTopic->vgs);
    for (int32_t v = 0; v < vgCnt; ++v) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, v);
      if (pVg != NULL) {
        tqInfoC("topic:%s, %d. vgId:%d rows:%" PRId64, pTopic->topicName, v, pVg->vgId, pVg->numOfRows);
      }
    }
  }

  taosRUnLockLatch(&pTmq->lock);
  tqInfoC("consumer:0x%" PRIx64 " rows dist end", pTmq->consumerId);
}
2751

2752
// Unsubscribe the consumer from all topics.
//
// Logs the consume statistics first. If the consumer is not in READY state
// nothing else happens and 0 is returned. Otherwise: commit offsets when
// auto-commit is on, send a heartbeat, subscribe to an empty topic list,
// drop unhandled messages and reset the epoch.
//
// Returns TSDB_CODE_INVALID_PARA for a NULL handle, the commit error, terrno
// if the empty list cannot be created, or the result of tmq_subscribe().
int32_t tmq_unsubscribe(tmq_t* tmq) {
  if (tmq == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = 0;
  int8_t  status = atomic_load_8(&tmq->status);
  tqInfoC("consumer:0x%" PRIx64 " start to unsubscribe consumer, status:%d", tmq->consumerId, status);

  displayConsumeStatistics(tmq);
  if (status != TMQ_CONSUMER_STATUS__READY) {
    tqInfoC("consumer:0x%" PRIx64 " status:%d, not in ready state, no need unsubscribe", tmq->consumerId, status);
    return code;
  }

  if (tmq->autoCommit) {
    code = tmq_commit_sync(tmq, NULL);
    if (code != 0) {
      return code;
    }
  }
  tmqSendHbReq((void*)(tmq->refId), NULL);

  // subscribing to an empty list is how the subscription is dropped
  tmq_list_t* emptyList = tmq_list_new();
  if (emptyList == NULL) {
    return terrno;
  }
  code = tmq_subscribe(tmq, emptyList);
  tmq_list_destroy(emptyList);

  // the cleanup below runs whether or not the subscribe call succeeded
  tmqClearUnhandleMsg(tmq);
  atomic_store_32(&tmq->epoch, 0);

  return code;
}
2787

2788
// Close the consumer: unsubscribe, mark it CLOSED and remove its reference
// from the tmq reference set. The whole operation runs under tmqMgmt.lock.
// Closing an already closed consumer is a no-op that returns 0.
//
// Returns TSDB_CODE_INVALID_PARA for a NULL handle, the unsubscribe error
// (the consumer then stays open), or the result of taosRemoveRef().
int32_t tmq_consumer_close(tmq_t* tmq) {
  if (tmq == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = 0;
  (void) taosThreadMutexLock(&tmqMgmt.lock);

  if (atomic_load_8(&tmq->status) != TMQ_CONSUMER_STATUS__CLOSED) {
    tqInfoC("consumer:0x%" PRIx64 " start to close consumer, status:%d", tmq->consumerId, tmq->status);
    code = tmq_unsubscribe(tmq);
    if (code == 0) {
      atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__CLOSED);
      code = taosRemoveRef(tmqMgmt.rsetId, tmq->refId);
      if (code != 0){
        tqErrorC("tmq close failed to remove ref:%" PRId64 ", code:%d", tmq->refId, code);
      }
    }
  }

  (void)taosThreadMutexUnlock(&tmqMgmt.lock);
  return code;
}
2809

2810
const char* tmq_err2str(int32_t err) {
26,128✔
2811
  if (err == 0) {
26,128✔
2812
    return "success";
24,137✔
2813
  } else if (err == -1) {
1,991✔
2814
    return "fail";
×
2815
  } else {
2816
    if (*(taosGetErrMsg()) == 0) {
1,991✔
2817
      return tstrerror(err);
1,991✔
2818
    } else {
2819
      (void)snprintf(taosGetErrMsgReturn(), ERR_MSG_LEN, "%s,detail:%s", tstrerror(err), taosGetErrMsg());
×
2820
      return (const char*)taosGetErrMsgReturn();
×
2821
    }
2822
  }
2823
}
2824

2825
// Map a result handle to its public result type. Both single and batched meta
// results are reported as TMQ_RES_TABLE_META. A NULL or unrecognized handle
// yields TMQ_RES_INVALID.
tmq_res_t tmq_get_res_type(TAOS_RES* res) {
  if (res == NULL) {
    return TMQ_RES_INVALID;
  }

  if (TD_RES_TMQ(res)) {
    return TMQ_RES_DATA;
  }
  if (TD_RES_TMQ_META(res)) {
    return TMQ_RES_TABLE_META;
  }
  if (TD_RES_TMQ_METADATA(res)) {
    return TMQ_RES_METADATA;
  }
  if (TD_RES_TMQ_BATCH_META(res)) {
    return TMQ_RES_TABLE_META;
  }
  if (TD_RES_TMQ_RAW(res)) {
    return TMQ_RES_RAWDATA;
  }
  return TMQ_RES_INVALID;
}
2843

2844
// Return the topic name of a tmq result: the part of the stored topic string
// that follows its first '.'. Returns NULL for a NULL handle, a handle that is
// not a tmq result, or a stored name without a '.'.
// The returned pointer refers to memory inside the result object.
const char* tmq_get_topic_name(TAOS_RES* res) {
  if (res == NULL) {
    return NULL;
  }

  const bool isTmqRes = TD_RES_TMQ_RAW(res) || TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) ||
                        TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res);
  if (!isTmqRes) {
    return NULL;
  }

  char* pDot = strchr(((SMqRspObj*)res)->topic, '.');
  return (pDot != NULL) ? pDot + 1 : NULL;
}
2859

2860
// Return the database name of a tmq result: the part of the stored db string
// that follows its first '.'. Returns NULL for a NULL handle, a handle that is
// not a tmq result, or a stored name without a '.'.
// The returned pointer refers to memory inside the result object.
const char* tmq_get_db_name(TAOS_RES* res) {
  if (res == NULL) {
    return NULL;
  }

  const bool isTmqRes = TD_RES_TMQ_RAW(res) || TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) ||
                        TD_RES_TMQ_BATCH_META(res) || TD_RES_TMQ_META(res);
  if (!isTmqRes) {
    return NULL;
  }

  char* pDot = strchr(((SMqRspObj*)res)->db, '.');
  return (pDot != NULL) ? pDot + 1 : NULL;
}
2876

2877
// Return the vgroup id stored in a tmq result, or TSDB_CODE_INVALID_PARA when
// the handle is NULL or is not a tmq result.
int32_t tmq_get_vgroup_id(TAOS_RES* res) {
  if (res == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  const bool isTmqRes = TD_RES_TMQ_RAW(res) || TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) ||
                        TD_RES_TMQ_BATCH_META(res) || TD_RES_TMQ_META(res);
  if (!isTmqRes) {
    return TSDB_CODE_INVALID_PARA;
  }

  return ((SMqRspObj*)res)->vgId;
}
2888

2889
// Return the WAL version recorded in a tmq result.
//   - data / metadata / raw results: the version of the request offset
//   - meta / batch-meta results:     the version of the response offset
// Only offsets of type TMQ_OFFSET__LOG carry a version; everything else ends
// in TSDB_CODE_TMQ_SNAPSHOT_ERROR. A NULL handle yields TSDB_CODE_INVALID_PARA.
int64_t tmq_get_vgroup_offset(TAOS_RES* res) {
  if (res == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  SMqRspObj* pObj = (SMqRspObj*)res;

  if (TD_RES_TMQ_RAW(res) || TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) {
    STqOffsetVal* pReqOffset = &pObj->dataRsp.reqOffset;
    if (pReqOffset->type != TMQ_OFFSET__LOG) {
      tqErrorC("invalid offset type:%d", pReqOffset->type);
    } else {
      return pReqOffset->version;
    }
  } else if (TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
    if (pObj->rspOffset.type == TMQ_OFFSET__LOG) {
      return pObj->rspOffset.version;
    }
  } else {
    tqErrorC("invalid tmq type:%d", *(int8_t*)res);
  }

  // data from tsdb, no valid offset info
  return TSDB_CODE_TMQ_SNAPSHOT_ERROR;
}
2913

2914
// Return the table name of the block the result iterator currently points at.
// Only data and metadata results carry table names. NULL is returned when the
// handle is NULL or of another kind, when the response was produced without
// table names, or when the iterator is outside [0, blockNum).
const char* tmq_get_table_name(TAOS_RES* res) {
  if (res == NULL) {
    return NULL;
  }
  if (!(TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res))) {
    return NULL;
  }

  SMqRspObj*  pObj = (SMqRspObj*)res;
  SMqDataRsp* pData = &pObj->dataRsp;

  const bool hasNames = pData->withTbName && pData->blockTbName != NULL;
  const bool iterValid = pObj->resIter >= 0 && pObj->resIter < pData->blockNum;
  if (hasNames && iterValid) {
    return (const char*)taosArrayGetP(pData->blockTbName, pObj->resIter);
  }
  return NULL;
}
2929

2930
// Commit offsets asynchronously; `cb` is passed on together with `param`.
// With pRes == NULL all offsets of the consumer are committed, otherwise only
// the offset belonging to that result. A NULL consumer is reported through the
// callback (if one was supplied) with TSDB_CODE_INVALID_PARA.
void tmq_commit_async(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* cb, void* param) {
  if (tmq == NULL) {
    tqErrorC("invalid tmq handle, null");
    if (cb != NULL) {
      cb(tmq, TSDB_CODE_INVALID_PARA, param);
    }
    return;
  }

  if (pRes != NULL) {
    // only commit one offset
    asyncCommitFromResult(tmq, pRes, cb, param);
  } else {
    // here needs to commit all offsets.
    asyncCommitAllOffsets(tmq, cb, param);
  }
}
2944

2945
// Commit callback used by the synchronous commit path: stores the result code
// in the SSyncCommitInfo passed as `param` and posts its semaphore.
static void commitCallBackFn(tmq_t* tmq, int32_t code, void* param) {
  SSyncCommitInfo* pSync = (SSyncCommitInfo*)param;
  if (pSync == NULL) {
    tqErrorC("invalid param in commit cb");
    return;
  }

  pSync->code = code;
  if (tsem2_post(&pSync->sem) != 0){
    tqErrorC("failed to post rsp sem in commit cb");
  }
}
2956

2957
// Commit offsets and block until the commit has finished.
// With pRes == NULL all offsets are committed, otherwise only the offset of
// that result. Implemented on top of the asynchronous commit: the call waits
// on a semaphore that commitCallBackFn posts.
//
// Returns TSDB_CODE_INVALID_PARA for a NULL consumer, terrno if the helper
// structure cannot be allocated, the tsem2_init() error, or the code delivered
// to the commit callback.
int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* pRes) {
  if (tmq == NULL) {
    tqErrorC("invalid tmq handle, null");
    return TSDB_CODE_INVALID_PARA;
  }

  SSyncCommitInfo* pSync = taosMemoryMalloc(sizeof(SSyncCommitInfo));
  if (pSync == NULL) {
    tqErrorC("failed to allocate memory for sync commit");
    return terrno;
  }

  int32_t code = tsem2_init(&pSync->sem, 0, 0);
  if (code != 0) {
    tqErrorC("failed to init sem for sync commit");
    taosMemoryFree(pSync);
    return code;
  }
  pSync->code = 0;

  // kick off the asynchronous commit
  if (pRes != NULL) {
    asyncCommitFromResult(tmq, pRes, commitCallBackFn, pSync);
  } else {
    asyncCommitAllOffsets(tmq, commitCallBackFn, pSync);
  }

  // wait for the callback, then collect its result
  if (tsem2_wait(&pSync->sem) != 0){
    tqErrorC("failed to wait sem for sync commit");
  }
  code = pSync->code;

  if(tsem2_destroy(&pSync->sem) != 0) {
    tqErrorC("failed to destroy sem for sync commit");
  }
  taosMemoryFree(pSync);

  tqDebugC("consumer:0x%" PRIx64 " sync res commit done, code:%s", tmq->consumerId, tstrerror(code));
  return code;
}
2998

2999
// wal range will be ok after calling tmq_get_topic_assignment or poll interface
3000
// Validates `value` against the WAL version range recorded in `offset`.
//   - TSDB_CODE_INVALID_PARA when `offset` is NULL
//   - TSDB_CODE_TMQ_NEED_INITIALIZED when either range bound is still -1
//   - TSDB_CODE_TMQ_VERSION_OUT_OF_RANGE when `value` is not -1 and lies outside
//     [walVerBegin, walVerEnd]
//   - 0 otherwise; passing -1 as `value` skips the range comparison
static int32_t checkWalRange(SVgOffsetInfo* offset, int64_t value) {
  if (offset == NULL) {
    tqErrorC("invalid offset, null");
    return TSDB_CODE_INVALID_PARA;
  }

  bool rangeUnknown = (offset->walVerBegin == -1) || (offset->walVerEnd == -1);
  if (rangeUnknown) {
    tqErrorC("Assignment or poll interface need to be called first");
    return TSDB_CODE_TMQ_NEED_INITIALIZED;
  }

  if (value != -1) {
    bool outside = (value < offset->walVerBegin) || (value > offset->walVerEnd);
    if (outside) {
      tqErrorC("invalid seek params, offset:%" PRId64 ", valid range:[%" PRId64 ", %" PRId64 "]", value,
               offset->walVerBegin, offset->walVerEnd);
      return TSDB_CODE_TMQ_VERSION_OUT_OF_RANGE;
    }
  }

  return 0;
}
3018

3019
// Synchronously commits `offset` (a WAL version) for vgroup `vgId` of topic `pTopicName`.
// The vgroup lookup and the WAL range check run under tmq->lock; the commit itself is issued
// through asyncCommitOffset and awaited on a semaphore posted by commitCallBackFn.
// Returns TSDB_CODE_INVALID_PARA for NULL arguments, the lookup / range-check / setup error if
// any, otherwise the commit result, with TSDB_CODE_TMQ_SAME_COMMITTED_VALUE mapped to success.
int32_t tmq_commit_offset_sync(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_t offset) {
  if (tmq == NULL || pTopicName == NULL) {
    tqErrorC("invalid tmq handle, null");
    return TSDB_CODE_INVALID_PARA;
  }

  // full topic name is "<acctId>.<topic>"
  int32_t accId = tmq->pTscObj->acctId;
  char    tname[TSDB_TOPIC_FNAME_LEN] = {0};
  (void)snprintf(tname, TSDB_TOPIC_FNAME_LEN, "%d.%s", accId, pTopicName);

  SMqClientVg* pVg = NULL;
  taosWLockLatch(&tmq->lock);
  int32_t code = getClientVg(tmq, tname, vgId, &pVg);
  if (code == 0) {
    code = checkWalRange(&pVg->offsetInfo, offset);
  }
  taosWUnLockLatch(&tmq->lock);
  if (code != 0) {
    return code;
  }

  SSyncCommitInfo* pSync = taosMemoryMalloc(sizeof(SSyncCommitInfo));
  if (pSync == NULL) {
    tqErrorC("consumer:0x%" PRIx64 " failed to prepare seek operation", tmq->consumerId);
    return terrno;
  }

  code = tsem2_init(&pSync->sem, 0, 0);
  if (code != 0) {
    taosMemoryFree(pSync);
    return code;
  }
  pSync->code = 0;

  STqOffsetVal offsetVal = {.type = TMQ_OFFSET__LOG, .version = offset};
  code = asyncCommitOffset(tmq, tname, vgId, &offsetVal, commitCallBackFn, pSync);
  if (code == 0) {
    // only wait when the request was actually issued
    if (tsem2_wait(&pSync->sem) != 0) {
      tqErrorC("failed to wait sem for sync commit offset");
    }
    code = pSync->code;
  }

  if (code == TSDB_CODE_TMQ_SAME_COMMITTED_VALUE) {
    code = TSDB_CODE_SUCCESS;
  }

  if (tsem2_destroy(&pSync->sem) != 0) {
    tqErrorC("failed to destroy sem for sync commit offset");
  }
  taosMemoryFree(pSync);

  tqDebugC("consumer:0x%" PRIx64 " sync send commit to vgId:%d, offset:%" PRId64 " code:%s", tmq->consumerId, vgId,
          offset, tstrerror(code));

  return code;
}
3079

3080
// Asynchronously commits `offset` (a WAL version) for vgroup `vgId` of topic `pTopicName`.
// The vgroup lookup and the WAL range check run under tmq->lock, then the request is handed to
// asyncCommitOffset together with `cb` / `param`.
// When any step returns a non-zero code and `cb` is not NULL, `cb` is invoked directly from this
// function with that code (TSDB_CODE_TMQ_SAME_COMMITTED_VALUE is reported as success).
void tmq_commit_offset_async(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb* cb,
                             void* param) {
  int32_t code = 0;

  if (tmq == NULL || pTopicName == NULL) {
    tqErrorC("invalid tmq handle, null");
    code = TSDB_CODE_INVALID_PARA;
  } else {
    // full topic name is "<acctId>.<topic>"
    int32_t accId = tmq->pTscObj->acctId;
    char    tname[TSDB_TOPIC_FNAME_LEN] = {0};
    (void)snprintf(tname, TSDB_TOPIC_FNAME_LEN, "%d.%s", accId, pTopicName);

    SMqClientVg* pVg = NULL;
    taosWLockLatch(&tmq->lock);
    code = getClientVg(tmq, tname, vgId, &pVg);
    if (code == 0) {
      code = checkWalRange(&pVg->offsetInfo, offset);
    }
    taosWUnLockLatch(&tmq->lock);

    if (code == 0) {
      STqOffsetVal offsetVal = {.type = TMQ_OFFSET__LOG, .version = offset};

      code = asyncCommitOffset(tmq, tname, vgId, &offsetVal, cb, param);

      tqDebugC("consumer:0x%" PRIx64 " async send commit to vgId:%d, offset:%" PRId64 " code:%s", tmq->consumerId, vgId,
              offset, tstrerror(code));
    }
  }

  if (code != 0 && cb != NULL) {
    int32_t cbCode = (code == TSDB_CODE_TMQ_SAME_COMMITTED_VALUE) ? TSDB_CODE_SUCCESS : code;
    cb(tmq, cbCode, param);
  }
}
9,342✔
3122

3123

3124
// Advances the block iterator of a TMQ response object and prepares its result info for the
// next data block.
//   res         - response object, treated as SMqRspObj
//   convertUcs4 - forwarded to setResultDataPtr
//   pResInfo    - receives a pointer to the response's embedded result info on success
// Returns TSDB_CODE_INVALID_PARA for NULL arguments, TSDB_CODE_TSC_INTERNAL_ERROR when the
// iterator has moved past the last block, the error of setResSchemaInfo / setResultDataPtr if
// either fails, and 0 on success.
int32_t tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4, SReqResultInfo** pResInfo) {
  if (res == NULL || pResInfo == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  SMqRspObj*  pRspObj = (SMqRspObj*)res;
  SMqDataRsp* pDataRsp = &pRspObj->dataRsp;

  pRspObj->resIter++;
  if (!(pRspObj->resIter < pDataRsp->blockNum)) {
    // no more blocks in this response
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  SReqResultInfo* pInfo = &pRspObj->resInfo;
  doFreeReqResultInfo(pInfo);

  SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(pDataRsp->blockSchema, pRspObj->resIter);
  if (pSW != NULL) {
    TAOS_CHECK_RETURN(setResSchemaInfo(pInfo, pSW->pSchema, pSW->nCols, NULL, false));
  }

  void*   pRetrieve = taosArrayGetP(pDataRsp->blockData, pRspObj->resIter);
  void*   rawData = NULL;
  int64_t rows = 0;
  int32_t precision = 0;
  tmqGetRawDataRowsPrecisionFromRes(pRetrieve, &rawData, &rows, &precision);

  pInfo->pData = rawData;
  pInfo->numOfRows = rows;
  pInfo->current = 0;
  pInfo->precision = precision;
  pInfo->totalRows += pInfo->numOfRows;

  int32_t code = setResultDataPtr(pInfo, convertUcs4, false);
  if (code == 0) {
    *pResInfo = pInfo;
  }
  return code;
}
3160

3161
// Response callback for the per-vgroup WAL-info requests sent by tmq_get_topic_assignment.
//   param - SMqVgWalInfoParam of this request (carries the shared SMqVgCommon)
//   pMsg  - response buffer; pData and pEpSet are released here
//   code  - transport / server result for this request
// On success the response is decoded into a tmq_topic_assignment and appended to the shared
// list under pCommon->mutex. After every response the shared counter is incremented, and the
// callback that observes the last expected response posts pCommon->rsp.
//
// The shared pCommon->code only ever records failures: a later successful response must not
// erase the error of an earlier vgroup, otherwise the waiter would report success with an
// incomplete assignment list. (pCommon is zero-allocated by its creator, so it starts at 0.)
static int32_t tmqGetWalInfoCb(void* param, SDataBuf* pMsg, int32_t code) {
  if (param == NULL || pMsg == NULL) {
    // nothing to report to, but the message buffers still belong to this callback
    if (pMsg != NULL) {
      taosMemoryFree(pMsg->pData);
      taosMemoryFree(pMsg->pEpSet);
    }
    return code;
  }
  SMqVgWalInfoParam* pParam = param;
  SMqVgCommon*       pCommon = pParam->pCommon;

  if (code != TSDB_CODE_SUCCESS) {
    tqErrorC("consumer:0x%" PRIx64 " failed to get the wal info from vgId:%d for topic:%s", pCommon->consumerId,
             pParam->vgId, pCommon->pTopicName);

  } else {
    SMqDataRsp rsp = {0};
    SDecoder   decoder = {0};
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    code = tDecodeMqDataRsp(&decoder, &rsp);
    tDecoderClear(&decoder);
    if (code != 0) {
      goto END;
    }

    SMqRspHead*          pHead = pMsg->pData;
    tmq_topic_assignment assignment = {.begin = pHead->walsver,
        .end = pHead->walever + 1,
        .currentOffset = rsp.rspOffset.version,
        .vgId = pParam->vgId};

    (void)taosThreadMutexLock(&pCommon->mutex);
    if (taosArrayPush(pCommon->pList, &assignment) == NULL) {
      tqErrorC("consumer:0x%" PRIx64 " failed to push the wal info from vgId:%d for topic:%s", pCommon->consumerId,
               pParam->vgId, pCommon->pTopicName);
      code = TSDB_CODE_TSC_INTERNAL_ERROR;
    }
    (void)taosThreadMutexUnlock(&pCommon->mutex);
  }

  END:
  if (code != 0) {
    // sticky error: written under the list mutex because several callbacks may fail concurrently
    (void)taosThreadMutexLock(&pCommon->mutex);
    pCommon->code = code;
    (void)taosThreadMutexUnlock(&pCommon->mutex);
  }

  // pCommon must not be touched after the semaphore is posted: the waiter may destroy it
  int32_t total = atomic_add_fetch_32(&pCommon->numOfRsp, 1);
  if (total == pParam->totalReq) {
    if (tsem2_post(&pCommon->rsp) != 0) {
      tqErrorC("failed to post semaphore in get wal cb");
    }
  }

  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);

  return code;
}
3213

3214
// Releases an SMqVgCommon and everything it owns: the assignment list, the response semaphore,
// the duplicated topic name and the mutex. A NULL argument is ignored.
static void destroyCommonInfo(SMqVgCommon* pCommon) {
  if (pCommon != NULL) {
    taosArrayDestroy(pCommon->pList);
    pCommon->pList = NULL;

    if (tsem2_destroy(&pCommon->rsp) != 0) {
      // logged before the topic name is released below
      tqErrorC("failed to destroy semaphore for topic:%s", pCommon->pTopicName);
    }

    taosMemoryFreeClear(pCommon->pTopicName);
    (void)taosThreadMutexDestroy(&pCommon->mutex);
    taosMemoryFree(pCommon);
  }
}
3227

3228
static bool isInSnapshotMode(int8_t type, bool useSnapshot) {
20,299✔
3229
  if ((type < TMQ_OFFSET__LOG && useSnapshot) || type > TMQ_OFFSET__LOG) {
20,299✔
3230
    return true;
×
3231
  }
3232
  return false;
20,299✔
3233
}
3234

3235
// Response callback for the "committed offset" request issued by getCommittedFromServer.
//   param - SMqCommittedParam of the waiting caller
//   pMsg  - response buffer; pData and pEpSet are released here
//   code  - transport / server result
// On success the response is decoded into pParam->vgOffset. The final code is stored in
// pParam->code and pParam->sem is posted so the waiting caller can continue.
//
// Compared with the previous version, the decoder is always cleared (the decode-failure path
// used to skip tDecoderClear), and the message buffers are released even when `param` is NULL.
static int32_t tmCommittedCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqCommittedParam* pParam = param;

  if (pParam != NULL && code == 0 && pMsg != NULL) {
    SDecoder decoder = {0};
    tDecoderInit(&decoder, (uint8_t*)pMsg->pData, pMsg->len);
    int32_t err = tDecodeMqVgOffset(&decoder, &pParam->vgOffset);
    tDecoderClear(&decoder);
    if (err < 0) {
      // drop whatever was partially decoded and report the decode error
      tOffsetDestroy(&pParam->vgOffset.offset);
      code = err;
    }
  }

  if (pMsg != NULL) {
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
  }

  if (pParam == NULL) {
    return code;
  }

  pParam->code = code;
  if (tsem2_post(&pParam->sem) != 0) {
    tqErrorC("failed to post semaphore in tmCommittedCb");
  }
  return code;
}
3267

3268
// Asks the vnode behind `epSet` for the committed offset of this consumer group on `tname`
// and blocks until tmCommittedCb delivers the response.
//   tmq       - consumer handle
//   tname     - full topic name
//   vgId      - target vgroup id (sent in network byte order in the message head)
//   epSet     - endpoint set of the vgroup
//   committed - receives the committed WAL version on success
// Returns TSDB_CODE_INVALID_PARA for NULL arguments or when the request size cannot be computed,
// terrno on allocation failure, the encode / send error if any, the code reported by the
// callback, or TSDB_CODE_TMQ_SNAPSHOT_ERROR when the committed offset is not a WAL version.
//
// The callback parameter is zero-allocated so that vgOffset is never read or destroyed while
// holding indeterminate bytes (e.g. when the response carries no payload or decoding stops early).
int32_t getCommittedFromServer(tmq_t* tmq, char* tname, int32_t vgId, SEpSet* epSet, int64_t* committed) {
  if (tmq == NULL || tname == NULL || epSet == NULL || committed == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t     code = 0;
  SMqVgOffset pOffset = {0};

  pOffset.consumerId = tmq->consumerId;
  (void)snprintf(pOffset.offset.subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", tmq->groupId, TMQ_SEPARATOR, tname);

  int32_t len = 0;
  tEncodeSize(tEncodeMqVgOffset, &pOffset, len, code);
  if (code < 0) {
    return TSDB_CODE_INVALID_PARA;
  }

  // request layout: SMsgHead followed by the encoded SMqVgOffset
  void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
  if (buf == NULL) {
    return terrno;
  }

  ((SMsgHead*)buf)->vgId = htonl(vgId);

  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));

  SEncoder encoder = {0};
  tEncoderInit(&encoder, abuf, len);
  code = tEncodeMqVgOffset(&encoder, &pOffset);
  tEncoderClear(&encoder);
  if (code < 0) {
    taosMemoryFree(buf);
    return code;
  }

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    taosMemoryFree(buf);
    return terrno;
  }

  SMqCommittedParam* pParam = taosMemoryCalloc(1, sizeof(SMqCommittedParam));
  if (pParam == NULL) {
    taosMemoryFree(buf);
    taosMemoryFree(sendInfo);
    return terrno;
  }
  if (tsem2_init(&pParam->sem, 0, 0) != 0) {
    taosMemoryFree(buf);
    taosMemoryFree(sendInfo);
    taosMemoryFree(pParam);
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  sendInfo->msgInfo = (SDataBuf){.pData = buf, .len = sizeof(SMsgHead) + len, .handle = NULL};
  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = pParam;
  sendInfo->fp = tmCommittedCb;
  sendInfo->msgType = TDMT_VND_TMQ_VG_COMMITTEDINFO;

  code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, NULL, sendInfo);
  if (code != 0) {
    // NOTE(review): buf / sendInfo are not released here; presumably asyncSendMsgToServer
    // takes ownership of them even on failure - confirm.
    if (tsem2_destroy(&pParam->sem) != 0) {
      tqErrorC("failed to destroy semaphore in get committed from server1");
    }
    taosMemoryFree(pParam);
    return code;
  }

  // wait for tmCommittedCb to fill in pParam->code / pParam->vgOffset
  if (tsem2_wait(&pParam->sem) != 0) {
    tqErrorC("failed to wait semaphore in get committed from server");
  }
  code = pParam->code;
  if (code == TSDB_CODE_SUCCESS) {
    if (pParam->vgOffset.offset.val.type == TMQ_OFFSET__LOG) {
      *committed = pParam->vgOffset.offset.val.version;
    } else {
      tOffsetDestroy(&pParam->vgOffset.offset);
      code = TSDB_CODE_TMQ_SNAPSHOT_ERROR;
    }
  }
  if (tsem2_destroy(&pParam->sem) != 0) {
    tqErrorC("failed to destroy semaphore in get committed from server2");
  }
  taosMemoryFree(pParam);

  return code;
}
3357

3358
// Returns the current consume position (a WAL version) of vgroup `vgId` for topic `pTopicName`.
//   - If the local end offset is a WAL version, that version is returned.
//   - If the local end offset is still a reset marker (earliest / latest), the committed offset
//     is fetched from the server; when the server reports TSDB_CODE_TMQ_NO_COMMITTED the
//     beginning (earliest) or end (latest) of the known WAL range is returned instead.
// On failure an error code is returned in place of the position: TSDB_CODE_INVALID_PARA for NULL
// arguments, the vgroup lookup / WAL range check error, TSDB_CODE_TMQ_SNAPSHOT_ERROR in snapshot
// mode, the getCommittedFromServer error, or TSDB_CODE_INTERNAL_ERROR for an unexpected type.
//
// Everything read from the vgroup entry (type, version, endpoint set, WAL range) is copied while
// tmq->lock is held; the entry is not dereferenced again after the lock is released.
int64_t tmq_position(tmq_t* tmq, const char* pTopicName, int32_t vgId) {
  if (tmq == NULL || pTopicName == NULL) {
    tqErrorC("invalid tmq handle, null");
    return TSDB_CODE_INVALID_PARA;
  }

  // full topic name is "<acctId>.<topic>"
  int32_t accId = tmq->pTscObj->acctId;
  char    tname[TSDB_TOPIC_FNAME_LEN] = {0};
  (void)snprintf(tname, TSDB_TOPIC_FNAME_LEN, "%d.%s", accId, pTopicName);

  taosWLockLatch(&tmq->lock);

  SMqClientVg* pVg = NULL;
  int32_t      code = getClientVg(tmq, tname, vgId, &pVg);
  if (code != 0) {
    taosWUnLockLatch(&tmq->lock);
    return code;
  }

  SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo;
  int32_t        type = pOffsetInfo->endOffset.type;
  if (isInSnapshotMode(type, tmq->useSnapshot)) {
    tqErrorC("consumer:0x%" PRIx64 " offset type:%d not wal version, position error", tmq->consumerId, type);
    taosWUnLockLatch(&tmq->lock);
    return TSDB_CODE_TMQ_SNAPSHOT_ERROR;
  }

  code = checkWalRange(pOffsetInfo, -1);
  if (code != 0) {
    taosWUnLockLatch(&tmq->lock);
    return code;
  }

  // snapshot all needed fields before releasing the lock
  SEpSet  epSet = pVg->epSet;
  int64_t begin = pOffsetInfo->walVerBegin;
  int64_t end = pOffsetInfo->walVerEnd;
  int64_t position = 0;
  if (type == TMQ_OFFSET__LOG) {
    position = pOffsetInfo->endOffset.version;
  }
  taosWUnLockLatch(&tmq->lock);

  if (type == TMQ_OFFSET__RESET_EARLIEST || type == TMQ_OFFSET__RESET_LATEST) {
    code = getCommittedFromServer(tmq, tname, vgId, &epSet, &position);
    if (code == TSDB_CODE_TMQ_NO_COMMITTED) {
      // nothing committed yet: fall back to the edge of the WAL range selected by the reset type
      position = (type == TMQ_OFFSET__RESET_EARLIEST) ? begin : end;
    } else if (code != 0) {
      tqErrorC("consumer:0x%" PRIx64 " getCommittedFromServer error,%d", tmq->consumerId, code);
      return code;
    }
  } else if (type != TMQ_OFFSET__LOG) {
    tqErrorC("consumer:0x%" PRIx64 " offset type:%d can not be reach here", tmq->consumerId, type);
    return TSDB_CODE_INTERNAL_ERROR;
  }

  tqDebugC("consumer:0x%" PRIx64 " tmq_position vgId:%d position:%" PRId64, tmq->consumerId, vgId, position);
  return position;
}
3421

3422
// Returns the committed offset (a WAL version) of vgroup `vgId` for topic `pTopicName`.
// If the locally cached committed offset is a WAL version it is returned directly; otherwise the
// value is fetched with getCommittedFromServer.
// On failure an error code is returned in place of the offset: TSDB_CODE_INVALID_PARA for NULL
// arguments, the vgroup lookup error, TSDB_CODE_TMQ_SNAPSHOT_ERROR when either the end offset or
// the committed offset is in snapshot mode, or the getCommittedFromServer error.
int64_t tmq_committed(tmq_t* tmq, const char* pTopicName, int32_t vgId) {
  if (tmq == NULL || pTopicName == NULL) {
    tqErrorC("invalid tmq handle, null");
    return TSDB_CODE_INVALID_PARA;
  }

  // full topic name is "<acctId>.<topic>"
  int32_t accId = tmq->pTscObj->acctId;
  char    tname[TSDB_TOPIC_FNAME_LEN] = {0};
  (void)snprintf(tname, TSDB_TOPIC_FNAME_LEN, "%d.%s", accId, pTopicName);

  taosWLockLatch(&tmq->lock);

  SMqClientVg* pVg = NULL;
  int32_t      code = getClientVg(tmq, tname, vgId, &pVg);
  if (code != 0) {
    taosWUnLockLatch(&tmq->lock);
    return code;
  }

  SVgOffsetInfo* pInfo = &pVg->offsetInfo;

  if (isInSnapshotMode(pInfo->endOffset.type, tmq->useSnapshot)) {
    tqErrorC("consumer:0x%" PRIx64 " offset type:%d not wal version, committed error", tmq->consumerId,
             pInfo->endOffset.type);
    taosWUnLockLatch(&tmq->lock);
    return TSDB_CODE_TMQ_SNAPSHOT_ERROR;
  }

  if (isInSnapshotMode(pInfo->committedOffset.type, tmq->useSnapshot)) {
    tqErrorC("consumer:0x%" PRIx64 " offset type:%d not wal version, committed error", tmq->consumerId,
             pInfo->committedOffset.type);
    taosWUnLockLatch(&tmq->lock);
    return TSDB_CODE_TMQ_SNAPSHOT_ERROR;
  }

  int64_t committed = 0;
  if (pInfo->committedOffset.type == TMQ_OFFSET__LOG) {
    // cached value is usable as is
    committed = pInfo->committedOffset.version;
    taosWUnLockLatch(&tmq->lock);
  } else {
    // copy the endpoint set under the lock, then query the server without holding it
    SEpSet epSet = pVg->epSet;
    taosWUnLockLatch(&tmq->lock);

    code = getCommittedFromServer(tmq, tname, vgId, &epSet, &committed);
    if (code != 0) {
      tqErrorC("consumer:0x%" PRIx64 " getCommittedFromServer error,%d", tmq->consumerId, code);
      return code;
    }
  }

  tqDebugC("consumer:0x%" PRIx64 " tmq_committed vgId:%d committed:%" PRId64, tmq->consumerId, vgId, committed);
  return committed;
}
3475

3476
// Returns the vgroup assignment (vgId, current offset, WAL begin/end) of topic `pTopicName`.
//   assignment      - receives a newly allocated array (release with tmq_free_assignment);
//                     set to NULL on failure
//   numOfAssignment - receives the number of entries; set to 0 on failure
// If every vgroup already has a WAL-version begin offset, the answer is built from local state.
// Otherwise a WAL-info request is sent to every vgroup, the function waits for all responses
// (tmqGetWalInfoCb), copies them into the result and refreshes the local WAL ranges.
// tmq->lock is held for the whole call, including the wait for the responses.
// Returns TSDB_CODE_INVALID_PARA for NULL arguments, TSDB_CODE_TMQ_SNAPSHOT_ERROR when a vgroup
// is in snapshot mode, terrno on allocation failure, or the lookup / serialize / send /
// response error; 0 on success.
//
// Setup failures that happen before the shared semaphore and mutex exist are cleaned up locally,
// so destroyCommonInfo never destroys synchronization objects that were not initialized.
int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_assignment** assignment,
                                 int32_t* numOfAssignment) {
  if (tmq == NULL || pTopicName == NULL || assignment == NULL || numOfAssignment == NULL) {
    tqErrorC("invalid tmq handle, null");
    return TSDB_CODE_INVALID_PARA;
  }
  *numOfAssignment = 0;
  *assignment = NULL;
  SMqVgCommon* pCommon = NULL;

  // full topic name is "<acctId>.<topic>"
  int32_t accId = tmq->pTscObj->acctId;
  char    tname[TSDB_TOPIC_FNAME_LEN] = {0};
  (void)snprintf(tname, TSDB_TOPIC_FNAME_LEN, "%d.%s", accId, pTopicName);

  taosWLockLatch(&tmq->lock);

  SMqClientTopic* pTopic = NULL;
  int32_t         code = getTopicByName(tmq, tname, &pTopic);
  if (code != 0) {
    tqErrorC("consumer:0x%" PRIx64 " invalid topic name:%s", tmq->consumerId, pTopicName);
    goto end;
  }

  // in case of snapshot is opened, no valid offset will return
  *numOfAssignment = taosArrayGetSize(pTopic->vgs);
  for (int32_t j = 0; j < (*numOfAssignment); ++j) {
    SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j);
    if (pClientVg == NULL) {
      continue;
    }
    int32_t type = pClientVg->offsetInfo.beginOffset.type;
    if (isInSnapshotMode(type, tmq->useSnapshot)) {
      tqErrorC("consumer:0x%" PRIx64 " offset type:%d not wal version, assignment not allowed", tmq->consumerId, type);
      code = TSDB_CODE_TMQ_SNAPSHOT_ERROR;
      goto end;
    }
  }

  *assignment = taosMemoryCalloc(*numOfAssignment, sizeof(tmq_topic_assignment));
  if (*assignment == NULL) {
    tqErrorC("consumer:0x%" PRIx64 " failed to malloc buffer, size:%" PRIzu, tmq->consumerId,
             (*numOfAssignment) * sizeof(tmq_topic_assignment));
    code = terrno;
    goto end;
  }

  bool needFetch = false;

  // first try to answer from local state; stop at the first vgroup without a WAL-version offset
  for (int32_t j = 0; j < (*numOfAssignment); ++j) {
    SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j);
    if (pClientVg == NULL) {
      continue;
    }
    if (pClientVg->offsetInfo.beginOffset.type != TMQ_OFFSET__LOG) {
      needFetch = true;
      break;
    }

    tmq_topic_assignment* pAssignment = &(*assignment)[j];
    pAssignment->currentOffset = pClientVg->offsetInfo.beginOffset.version;
    pAssignment->begin = pClientVg->offsetInfo.walVerBegin;
    pAssignment->end = pClientVg->offsetInfo.walVerEnd;
    pAssignment->vgId = pClientVg->vgId;
    tqDebugC("consumer:0x%" PRIx64 " get assignment from local:%d->%" PRId64, tmq->consumerId, pAssignment->vgId,
            pAssignment->currentOffset);
  }

  if (needFetch) {
    pCommon = taosMemoryCalloc(1, sizeof(SMqVgCommon));
    if (pCommon == NULL) {
      code = terrno;
      goto end;
    }

    pCommon->pList = taosArrayInit(4, sizeof(tmq_topic_assignment));
    if (pCommon->pList == NULL) {
      code = terrno;
      // semaphore and mutex do not exist yet: release by hand instead of destroyCommonInfo
      taosMemoryFree(pCommon);
      pCommon = NULL;
      goto end;
    }

    code = tsem2_init(&pCommon->rsp, 0, 0);
    if (code != 0) {
      // same as above: only the list and the struct itself have been set up so far
      taosArrayDestroy(pCommon->pList);
      taosMemoryFree(pCommon);
      pCommon = NULL;
      goto end;
    }
    (void)taosThreadMutexInit(&pCommon->mutex, 0);
    pCommon->pTopicName = taosStrdup(pTopic->topicName);
    if (pCommon->pTopicName == NULL) {
      code = terrno;
      goto end;
    }
    pCommon->consumerId = tmq->consumerId;

    // NOTE(review): if this loop bails out (or skips a NULL vgroup entry) after some requests
    // were already sent, fewer than totalReq responses will arrive and pCommon is destroyed at
    // `end` while callbacks may still reference it - looks like a pre-existing hazard, confirm.
    for (int32_t i = 0; i < (*numOfAssignment); ++i) {
      SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i);
      if (pClientVg == NULL) {
        continue;
      }
      SMqVgWalInfoParam* pParam = taosMemoryMalloc(sizeof(SMqVgWalInfoParam));
      if (pParam == NULL) {
        code = terrno;
        goto end;
      }

      pParam->epoch = atomic_load_32(&tmq->epoch);
      pParam->vgId = pClientVg->vgId;
      pParam->totalReq = *numOfAssignment;
      pParam->pCommon = pCommon;

      SMqPollReq req = {0};
      tmqBuildConsumeReqImpl(&req, tmq, pTopic, pClientVg);
      req.reqOffset = pClientVg->offsetInfo.beginOffset;

      // first pass computes the serialized size, second pass fills the buffer
      int32_t msgSize = tSerializeSMqPollReq(NULL, 0, &req);
      if (msgSize < 0) {
        taosMemoryFree(pParam);
        code = msgSize;
        goto end;
      }

      char* msg = taosMemoryCalloc(1, msgSize);
      if (NULL == msg) {
        taosMemoryFree(pParam);
        code = terrno;
        goto end;
      }

      msgSize = tSerializeSMqPollReq(msg, msgSize, &req);
      if (msgSize < 0) {
        taosMemoryFree(msg);
        taosMemoryFree(pParam);
        code = msgSize;
        goto end;
      }

      SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
      if (sendInfo == NULL) {
        taosMemoryFree(pParam);
        taosMemoryFree(msg);
        code = terrno;
        goto end;
      }

      sendInfo->msgInfo = (SDataBuf){.pData = msg, .len = msgSize, .handle = NULL};
      sendInfo->requestId = req.reqId;
      sendInfo->requestObjRefId = 0;
      sendInfo->param = pParam;
      sendInfo->paramFreeFp = taosAutoMemoryFree;
      sendInfo->fp = tmqGetWalInfoCb;
      sendInfo->msgType = TDMT_VND_TMQ_VG_WALINFO;

      // int64_t transporterId = 0;
      char offsetFormatBuf[TSDB_OFFSET_LEN] = {0};
      tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pClientVg->offsetInfo.beginOffset);

      tqDebugC("consumer:0x%" PRIx64 " %s retrieve wal info vgId:%d, epoch %d, req:%s, QID:0x%" PRIx64, tmq->consumerId,
              pTopic->topicName, pClientVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId);
      code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pClientVg->epSet, NULL, sendInfo);
      if (code != 0) {
        goto end;
      }
    }

    // the callback that sees the last response posts pCommon->rsp
    if (tsem2_wait(&pCommon->rsp) != 0) {
      tqErrorC("consumer:0x%" PRIx64 " failed to wait sem in get assignment", tmq->consumerId);
    }
    code = pCommon->code;

    if (code != TSDB_CODE_SUCCESS) {
      goto end;
    }

    // responses replace whatever was filled in from local state
    int32_t num = taosArrayGetSize(pCommon->pList);
    for (int32_t i = 0; i < num; ++i) {
      (*assignment)[i] = *(tmq_topic_assignment*)taosArrayGet(pCommon->pList, i);
    }
    *numOfAssignment = num;

    // refresh the cached WAL range of every vgroup that answered
    int32_t numOfVgs = taosArrayGetSize(pTopic->vgs);
    for (int32_t j = 0; j < (*numOfAssignment); ++j) {
      tmq_topic_assignment* p = &(*assignment)[j];

      for (int32_t i = 0; i < numOfVgs; ++i) {
        SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i);
        if (pClientVg == NULL) {
          continue;
        }
        if (pClientVg->vgId != p->vgId) {
          continue;
        }

        SVgOffsetInfo* pOffsetInfo = &pClientVg->offsetInfo;
        tqDebugC("consumer:0x%" PRIx64 " %s vgId:%d offset is update to:%" PRId64, tmq->consumerId, pTopic->topicName,
                p->vgId, p->currentOffset);

        pOffsetInfo->walVerBegin = p->begin;
        pOffsetInfo->walVerEnd = p->end;
      }
    }
  }

  end:
  if (code != TSDB_CODE_SUCCESS) {
    taosMemoryFree(*assignment);
    *assignment = NULL;
    *numOfAssignment = 0;
  }
  destroyCommonInfo(pCommon);
  taosWUnLockLatch(&tmq->lock);
  return code;
}
3683

3684
// Releases an assignment array with taosMemoryFree. A NULL pointer is ignored.
void tmq_free_assignment(tmq_topic_assignment* pAssignment) {
  if (pAssignment != NULL) {
    taosMemoryFree(pAssignment);
  }
}
3691

3692
// Response callback for a seek request.
// The response buffers (pData / pEpSet) are always released. When `param` is NULL the incoming
// code is returned unchanged; otherwise the code is stored in the SMqSeekParam, its semaphore is
// posted and 0 is returned.
static int32_t tmqSeekCb(void* param, SDataBuf* pMsg, int32_t code) {
  if (pMsg != NULL) {
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
  }

  SMqSeekParam* pSeek = param;
  if (pSeek == NULL) {
    return code;
  }

  // store the result first, then release the waiter
  pSeek->code = code;
  if (tsem2_post(&pSeek->sem) != 0) {
    tqErrorC("failed to post sem in tmqSeekCb");
  }
  return 0;
}
3707

3708
// seek interface have to send msg to server to cancel push handle if needed, because consumer may be in wait status if
3709
// there is no data to poll
3710
int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_t offset) {
5,653✔
3711
  if (tmq == NULL || pTopicName == NULL) {
5,653✔
3712
    tqErrorC("invalid tmq handle, null");
×
3713
    return TSDB_CODE_INVALID_PARA;
×
3714
  }
3715

3716
  int32_t accId = tmq->pTscObj->acctId;
5,653✔
3717
  char    tname[TSDB_TOPIC_FNAME_LEN] = {0};
5,653✔
3718
  (void)snprintf(tname, TSDB_TOPIC_FNAME_LEN, "%d.%s", accId, pTopicName);
5,653✔
3719

3720
  taosWLockLatch(&tmq->lock);
5,653✔
3721

3722
  SMqClientVg* pVg = NULL;
5,653✔
3723
  int32_t      code = getClientVg(tmq, tname, vgId, &pVg);
5,653✔
3724
  if (code != 0) {
5,653✔
3725
    taosWUnLockLatch(&tmq->lock);
3,578✔
3726
    return code;
3,578✔
3727
  }
3728

3729
  SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo;
2,075✔
3730

3731
  int32_t type = pOffsetInfo->endOffset.type;
2,075✔
3732
  if (isInSnapshotMode(type, tmq->useSnapshot)) {
2,075✔
3733
    tqErrorC("consumer:0x%" PRIx64 " offset type:%d not wal version, seek not allowed", tmq->consumerId, type);
×
3734
    taosWUnLockLatch(&tmq->lock);
×
3735
    return TSDB_CODE_TMQ_SNAPSHOT_ERROR;
×
3736
  }
3737

3738
  code = checkWalRange(pOffsetInfo, offset);
2,075✔
3739
  if (code != 0) {
2,075✔
3740
    taosWUnLockLatch(&tmq->lock);
59✔
3741
    return code;
59✔
3742
  }
3743

3744
  tqDebugC("consumer:0x%" PRIx64 " seek to %" PRId64 " on vgId:%d", tmq->consumerId, offset, vgId);
2,016✔
3745
  // update the offset, and then commit to vnode
3746
  pOffsetInfo->endOffset.type = TMQ_OFFSET__LOG;
2,016✔
3747
  pOffsetInfo->endOffset.version = offset;
2,016✔
3748
  pOffsetInfo->beginOffset = pOffsetInfo->endOffset;
2,016✔
3749
  pVg->seekUpdated = true;
2,016✔
3750
  SEpSet epSet = pVg->epSet;
2,016✔
3751
  taosWUnLockLatch(&tmq->lock);
2,016✔
3752

3753
  SMqSeekReq req = {0};
2,016✔
3754
  (void)snprintf(req.subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s:%s", tmq->groupId, tname);
2,016✔
3755
  req.head.vgId = vgId;
2,016✔
3756
  req.consumerId = tmq->consumerId;
2,016✔
3757

3758
  int32_t msgSize = tSerializeSMqSeekReq(NULL, 0, &req);
2,016✔
3759
  if (msgSize < 0) {
2,016✔
3760
    tqErrorC("%s get invalid msg at line %d", __func__, __LINE__);
×
3761
    return TSDB_CODE_TMQ_INVALID_MSG;
×
3762
  }
3763

3764
  char* msg = taosMemoryCalloc(1, msgSize);
2,016✔
3765
  if (NULL == msg) {
2,016✔
3766
    return terrno;
×
3767
  }
3768

3769
  if (tSerializeSMqSeekReq(msg, msgSize, &req) < 0) {
2,016✔
3770
    tqErrorC("%s serialize seek req failed at line %d", __func__, __LINE__);
×
3771
    taosMemoryFree(msg);
×
3772
    return TSDB_CODE_TMQ_INVALID_MSG;
×
3773
  }
3774

3775
  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
2,016✔
3776
  if (sendInfo == NULL) {
2,016✔
3777
    taosMemoryFree(msg);
×
3778
    return terrno;
×
3779
  }
3780

3781
  SMqSeekParam* pParam = taosMemoryMalloc(sizeof(SMqSeekParam));
2,016✔
3782
  if (pParam == NULL) {
2,016✔
3783
    taosMemoryFree(msg);
×
3784
    taosMemoryFree(sendInfo);
×
3785
    return terrno;
×
3786
  }
3787
  if (tsem2_init(&pParam->sem, 0, 0) != 0) {
2,016✔
3788
    taosMemoryFree(msg);
×
3789
    taosMemoryFree(sendInfo);
×
3790
    taosMemoryFree(pParam);
×
3791
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
3792
  }
3793

3794
  sendInfo->msgInfo = (SDataBuf){.pData = msg, .len = msgSize, .handle = NULL};
2,016✔
3795
  sendInfo->requestId = generateRequestId();
2,016✔
3796
  sendInfo->requestObjRefId = 0;
2,016✔
3797
  sendInfo->param = pParam;
2,016✔
3798
  sendInfo->fp = tmqSeekCb;
2,016✔
3799
  sendInfo->msgType = TDMT_VND_TMQ_SEEK;
2,016✔
3800

3801
  code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo);
2,016✔
3802
  if (code != 0) {
2,016✔
3803
    if(tsem2_destroy(&pParam->sem) != 0) {
×
3804
      tqErrorC("consumer:0x%" PRIx64 "destroy rsp sem failed in seek offset", tmq->consumerId);
×
3805
    }
3806
    taosMemoryFree(pParam);
×
3807
    return code;
×
3808
  }
3809

3810
  if (tsem2_wait(&pParam->sem) != 0){
2,016✔
3811
    tqErrorC("consumer:0x%" PRIx64 "wait rsp sem failed in seek offset", tmq->consumerId);
×
3812
  }
3813
  code = pParam->code;
2,016✔
3814
  if(tsem2_destroy(&pParam->sem) != 0) {
2,016✔
3815
    tqErrorC("consumer:0x%" PRIx64 "destroy rsp sem failed in seek offset", tmq->consumerId);
×
3816
  }
3817
  taosMemoryFree(pParam);
2,016✔
3818

3819
  tqDebugC("consumer:0x%" PRIx64 "send seek to vgId:%d, return code:%s", tmq->consumerId, vgId, tstrerror(code));
2,016✔
3820

3821
  return code;
2,016✔
3822
}
3823

3824
/*
 * Return the connection handle behind a consumer: the address of the `id`
 * field of tmq->pTscObj, cast to TAOS*.
 * Returns NULL when tmq or tmq->pTscObj is NULL.
 */
TAOS* tmq_get_connect(tmq_t* tmq) {
  if (tmq == NULL || tmq->pTscObj == NULL) {
    return NULL;
  }
  return (TAOS*)(&(tmq->pTscObj->id));
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc