• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4917

07 Jan 2026 03:52PM UTC coverage: 65.42% (+0.02%) from 65.402%
#4917

push

travis-ci

web-flow
merge: from main to 3.0 branch #34204

31 of 34 new or added lines in 2 files covered. (91.18%)

819 existing lines in 129 files now uncovered.

202679 of 309814 relevant lines covered (65.42%)

116724351.99 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.51
/source/client/src/clientTmq.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "cJSON.h"
17
#include "clientInt.h"
18
#include "clientLog.h"
19
#include "parser.h"
20
#include "tarray.h"
21
#include "tdatablock.h"
22
#include "tdef.h"
23
#include "tglobal.h"
24
#include "tqueue.h"
25
#include "tref.h"
26
#include "ttimer.h"
27

28
#define tqErrorC(...) do { if (cDebugFlag & DEBUG_ERROR || tqClientDebugFlag & DEBUG_ERROR) { taosPrintLog("TQ  ERROR ", DEBUG_ERROR, tqClientDebugFlag|cDebugFlag, __VA_ARGS__); }} while(0)
29
#define tqInfoC(...)  do { if (cDebugFlag & DEBUG_INFO  || tqClientDebugFlag & DEBUG_INFO)  { taosPrintLog("TQ  INFO  ", DEBUG_INFO,  tqClientDebugFlag|cDebugFlag, __VA_ARGS__); }} while(0)
30
#define tqDebugC(...) do { if (cDebugFlag & DEBUG_DEBUG || tqClientDebugFlag & DEBUG_DEBUG) { taosPrintLog("TQ  DEBUG ", DEBUG_DEBUG, tqClientDebugFlag|cDebugFlag, __VA_ARGS__); }} while(0)
31

32
#define EMPTY_BLOCK_POLL_IDLE_DURATION 10
33
#define DEFAULT_AUTO_COMMIT_INTERVAL   5000
34
#define DEFAULT_HEARTBEAT_INTERVAL     3000
35
#define DEFAULT_ASKEP_INTERVAL         1000
36
#define DEFAULT_COMMIT_CNT             1
37
#define SUBSCRIBE_RETRY_MAX_COUNT      240
38
#define SUBSCRIBE_RETRY_INTERVAL       500
39

40

41
#define SET_ERROR_MSG_TMQ(MSG) \
42
  if (errstr != NULL && errstrLen > 0) (void)snprintf(errstr, errstrLen, MSG);
43

44
#define PROCESS_POLL_RSP(FUNC,DATA) \
45
  SDecoder decoder = {0}; \
46
  tDecoderInit(&decoder, POINTER_SHIFT(pRspWrapper->pollRsp.data, sizeof(SMqRspHead)), pRspWrapper->pollRsp.len - sizeof(SMqRspHead)); \
47
  if (FUNC(&decoder, DATA) < 0) { \
48
    tDecoderClear(&decoder); \
49
    code = terrno; \
50
    goto END;\
51
  }\
52
  tDecoderClear(&decoder);\
53
  (void)memcpy(DATA, pRspWrapper->pollRsp.data, sizeof(SMqRspHead));
54

55
#define DELETE_POLL_RSP(FUNC,DATA) \
56
  SMqPollRspWrapper* pRsp = &rspWrapper->pollRsp;\
57
  taosMemoryFreeClear(pRsp->pEpset);             \
58
  taosMemoryFreeClear(pRsp->data);               \
59
  FUNC(DATA);
60

61
enum {
62
  TMQ_VG_STATUS__IDLE = 0,
63
  TMQ_VG_STATUS__WAIT,
64
};
65

66
enum {
67
  TMQ_CONSUMER_STATUS__INIT = 0,
68
  TMQ_CONSUMER_STATUS__READY,
69
  TMQ_CONSUMER_STATUS__CLOSED,
70
};
71

72
enum {
73
  TMQ_DELAYED_TASK__ASK_EP = 1,
74
  TMQ_DELAYED_TASK__COMMIT,
75
};
76

77
typedef struct {
78
  tmr_h               timer;
79
  int32_t             rsetId;
80
  TdThreadMutex       lock;
81
} SMqMgmt;
82

83
struct tmq_list_t {
84
  SArray container;
85
};
86

87
struct tmq_conf_t {
88
  char           clientId[TSDB_CLIENT_ID_LEN];
89
  char           groupId[TSDB_CGROUP_LEN];
90
  int8_t         autoCommit;
91
  int8_t         enableWalMarker;
92
  int8_t         resetOffset;
93
  int8_t         withTbName;
94
  int8_t         snapEnable;
95
  int8_t         replayEnable;
96
  int8_t         sourceExcluded;  // do not consume, bit
97
  int8_t         rawData;         // fetch raw data
98
  int32_t        maxPollWaitTime;
99
  int32_t        minPollRows;
100
  uint16_t       port;
101
  int32_t        autoCommitInterval;
102
  int32_t        sessionTimeoutMs;
103
  int32_t        heartBeatIntervalMs;
104
  int32_t        maxPollIntervalMs;
105
  char*          ip;
106
  char*          user;
107
  char*          pass;
108
  tmq_commit_cb* commitCb;
109
  void*          commitCbUserParam;
110
  int8_t         enableBatchMeta;
111
};
112

113
struct tmq_t {
114
  int64_t        refId;
115
  char           groupId[TSDB_CGROUP_LEN];
116
  char           clientId[TSDB_CLIENT_ID_LEN];
117
  char           user[TSDB_USER_LEN];
118
  char           fqdn[TSDB_FQDN_LEN];
119
  int8_t         withTbName;
120
  int8_t         useSnapshot;
121
  int8_t         autoCommit;
122
  int8_t         enableWalMarker;
123
  int32_t        autoCommitInterval;
124
  int32_t        sessionTimeoutMs;
125
  int32_t        heartBeatIntervalMs;
126
  int32_t        maxPollIntervalMs;
127
  int8_t         resetOffsetCfg;
128
  int8_t         replayEnable;
129
  int8_t         sourceExcluded;  // do not consume, bit
130
  int8_t         rawData;         // fetch raw data
131
  int32_t        maxPollWaitTime;
132
  int32_t        minPollRows;
133
  int64_t        consumerId;
134
  tmq_commit_cb* commitCb;
135
  void*          commitCbUserParam;
136
  int8_t         enableBatchMeta;
137

138
  // status
139
  SRWLatch lock;
140
  int8_t   status;
141
  int32_t  epoch;
142
  // poll info
143
  int64_t pollCnt;
144
  int64_t totalRows;
145
  int8_t  pollFlag;
146

147
  // timer
148
  tmr_h       hbLiveTimer;
149
  tmr_h       epTimer;
150
  tmr_h       commitTimer;
151
  STscObj*    pTscObj;       // connection
152
  SArray*     clientTopics;  // SArray<SMqClientTopic>
153
  STaosQueue* mqueue;        // queue of rsp
154
  STaosQueue* delayedTask;  // delayed task queue for heartbeat and auto commit
155
  tsem2_t     rspSem;
156
};
157

158
typedef struct {
159
  int32_t code;
160
  tsem2_t sem;
161
} SAskEpInfo;
162

163
typedef struct {
164
  STqOffsetVal committedOffset;
165
  STqOffsetVal endOffset;    // the last version in TAOS_RES + 1
166
  STqOffsetVal beginOffset;  // the first version in TAOS_RES
167
  int64_t      walVerBegin;
168
  int64_t      walVerEnd;
169
} SVgOffsetInfo;
170

171
typedef struct {
172
  int64_t       pollCnt;
173
  int64_t       numOfRows;
174
  SVgOffsetInfo offsetInfo;
175
  int32_t       vgId;
176
  int32_t       vgStatus;
177
  int32_t       vgSkipCnt;            // here used to mark the slow vgroups
178
  int64_t       emptyBlockReceiveTs;  // once empty block is received, idle for ignoreCnt then start to poll data
179
  int64_t       blockReceiveTs;       // once empty block is received, idle for ignoreCnt then start to poll data
180
  int64_t       blockSleepForReplay;  // once empty block is received, idle for ignoreCnt then start to poll data
181
  bool          seekUpdated;          // offset is updated by seek operator, therefore, not update by vnode rsp.
182
  SEpSet        epSet;
183
} SMqClientVg;
184

185
typedef struct {
186
  char           topicName[TSDB_TOPIC_FNAME_LEN];
187
  char           db[TSDB_DB_FNAME_LEN];
188
  SArray*        vgs;  // SArray<SMqClientVg>
189
  int8_t         noPrivilege;
190
} SMqClientTopic;
191

192
typedef struct {
193
  int32_t         vgId;
194
  char            topicName[TSDB_TOPIC_FNAME_LEN];
195
  SMqClientTopic* topicHandle;
196
  uint64_t        reqId;
197
  SEpSet*         pEpset;
198
  void*           data;
199
  uint32_t        len;
200
  union {
201
    struct{
202
      SMqRspHead   head;
203
      STqOffsetVal rspOffset;
204
    };
205
    SMqDataRsp      dataRsp;
206
    SMqMetaRsp      metaRsp;
207
    SMqBatchMetaRsp batchMetaRsp;
208
  };
209
} SMqPollRspWrapper;
210

211
typedef struct {
212
  int32_t code;
213
  int8_t  tmqRspType;
214
  int32_t epoch;
215
  union{
216
    SMqPollRspWrapper pollRsp;
217
    SMqAskEpRsp       epRsp;
218
  };
219
} SMqRspWrapper;
220

221
typedef struct {
222
  tsem2_t rspSem;
223
  int32_t rspErr;
224
} SMqSubscribeCbParam;
225

226
typedef struct {
227
  int64_t refId;
228
  bool    sync;
229
  void*   pParam;
230
} SMqAskEpCbParam;
231

232
typedef struct {
233
  int64_t  refId;
234
  char     topicName[TSDB_TOPIC_FNAME_LEN];
235
  int32_t  vgId;
236
  uint64_t requestId;  // request id for debug purpose
237
} SMqPollCbParam;
238

239
typedef struct {
240
  tsem2_t       rsp;
241
  int32_t       numOfRsp;
242
  SArray*       pList;
243
  TdThreadMutex mutex;
244
  int64_t       consumerId;
245
  char*         pTopicName;
246
  int32_t       code;
247
} SMqVgCommon;
248

249
typedef struct {
250
  tsem2_t sem;
251
  int32_t code;
252
} SMqSeekParam;
253

254
typedef struct {
255
  tsem2_t     sem;
256
  int32_t     code;
257
  SMqVgOffset vgOffset;
258
} SMqCommittedParam;
259

260
typedef struct {
261
  int32_t      vgId;
262
  int32_t      epoch;
263
  int32_t      totalReq;
264
  SMqVgCommon* pCommon;
265
} SMqVgWalInfoParam;
266

267
typedef struct {
268
  int64_t        refId;
269
  int32_t        epoch;
270
  int32_t        waitingRspNum;
271
  int32_t        code;
272
  tmq_commit_cb* callbackFn;
273
  void*          userParam;
274
} SMqCommitCbParamSet;
275

276
typedef struct {
277
  SMqCommitCbParamSet* params;
278
  char                 topicName[TSDB_TOPIC_FNAME_LEN];
279
  int32_t              vgId;
280
  int64_t              consumerId;
281
} SMqCommitCbParam;
282

283
typedef struct {
284
  tsem2_t sem;
285
  int32_t code;
286
} SSyncCommitInfo;
287

288
typedef struct {
289
  STqOffsetVal currentOffset;
290
  STqOffsetVal commitOffset;
291
  STqOffsetVal seekOffset;
292
  int64_t      numOfRows;
293
  int32_t      vgStatus;
294
} SVgroupSaveInfo;
295

296
static   TdThreadOnce   tmqInit = PTHREAD_ONCE_INIT;  // initialize only once
297
volatile int32_t        tmqInitRes = 0;               // initialize rsp code
298
static   SMqMgmt        tmqMgmt = {0};
299

300
tmq_conf_t* tmq_conf_new() {
114,185✔
301
  tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
114,185✔
302
  if (conf == NULL) {
114,185✔
303
    return conf;
×
304
  }
305

306
  conf->withTbName = false;
114,185✔
307
  conf->autoCommit = true;
114,185✔
308
  conf->enableWalMarker = false;
114,185✔
309
  conf->autoCommitInterval = DEFAULT_AUTO_COMMIT_INTERVAL;
114,185✔
310
  conf->resetOffset = TMQ_OFFSET__RESET_LATEST;
114,185✔
311
  conf->enableBatchMeta = false;
114,185✔
312
  conf->heartBeatIntervalMs = DEFAULT_HEARTBEAT_INTERVAL;
114,185✔
313
  conf->maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL;
114,185✔
314
  conf->sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT;
114,185✔
315
  conf->maxPollWaitTime = DEFAULT_MAX_POLL_WAIT_TIME;
114,185✔
316
  conf->minPollRows = DEFAULT_MIN_POLL_ROWS;
114,185✔
317

318
  return conf;
114,185✔
319
}
320

321
void tmq_conf_destroy(tmq_conf_t* conf) {
114,185✔
322
  if (conf) {
114,185✔
323
    if (conf->ip) {
114,185✔
324
      taosMemoryFree(conf->ip);
4,877✔
325
    }
326
    if (conf->user) {
114,185✔
327
      taosMemoryFree(conf->user);
113,479✔
328
    }
329
    if (conf->pass) {
114,185✔
330
      taosMemoryFree(conf->pass);
113,479✔
331
    }
332
    taosMemoryFree(conf);
114,185✔
333
  }
334
}
114,185✔
335

336
tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
809,691✔
337
  int32_t code = 0;
809,691✔
338
  if (conf == NULL || key == NULL || value == NULL) {
809,691✔
339
    tqErrorC("tmq_conf_set null, conf:%p key:%p value:%p", conf, key, value);
×
340
    return TMQ_CONF_INVALID;
×
341
  }
342
  if (strcasecmp(key, "group.id") == 0) {
809,848✔
343
    if (strchr(value, TMQ_SEPARATOR_CHAR) != NULL) {
114,132✔
344
      tqErrorC("invalid group.id:%s, can not contains ':'", value);
×
345
      return TMQ_CONF_INVALID;
×
346
    }
347
    tstrncpy(conf->groupId, value, TSDB_CGROUP_LEN);
114,132✔
348
    return TMQ_CONF_OK;
114,132✔
349
  }
350

351
  if (strcasecmp(key, "client.id") == 0) {
695,716✔
352
    tstrncpy(conf->clientId, value, TSDB_CLIENT_ID_LEN);
20,984✔
353
    return TMQ_CONF_OK;
20,984✔
354
  }
355

356
  if (strcasecmp(key, "enable.auto.commit") == 0) {
674,732✔
357
    if (strcasecmp(value, "true") == 0) {
108,751✔
358
      conf->autoCommit = true;
67,194✔
359
      return TMQ_CONF_OK;
67,194✔
360
    } else if (strcasecmp(value, "false") == 0) {
41,557✔
361
      conf->autoCommit = false;
41,557✔
362
      return TMQ_CONF_OK;
41,557✔
363
    } else {
364
      tqErrorC("invalid value for enable.auto.commit:%s", value);
×
365
      return TMQ_CONF_INVALID;
×
366
    }
367
  }
368

369
  if (strcasecmp(key, "enable.wal.marker") == 0) {
565,981✔
370
    if (strcasecmp(value, "true") == 0) {
×
371
      conf->enableWalMarker = true;
×
372
      return TMQ_CONF_OK;
×
373
    } else if (strcasecmp(value, "false") == 0) {
×
374
      conf->enableWalMarker = false;
×
375
      return TMQ_CONF_OK;
×
376
    } else {
377
      tqErrorC("invalid value for enable.wal.marker:%s", value);
×
378
      return TMQ_CONF_INVALID;
×
379
    }
380
  }
381

382
  if (strcasecmp(key, "auto.commit.interval.ms") == 0) {
565,981✔
383
    int64_t tmp;
73,809✔
384
    code = taosStr2int64(value, &tmp);
80,142✔
385
    if (tmp < 0 || code != 0) {
80,195✔
386
      tqErrorC("invalid value for auto.commit.interval.ms:%s", value);
×
387
      return TMQ_CONF_INVALID;
×
388
    }
389
    conf->autoCommitInterval = (tmp > INT32_MAX ? INT32_MAX : tmp);
80,195✔
390
    return TMQ_CONF_OK;
80,195✔
391
  }
392

393
  if (strcasecmp(key, "session.timeout.ms") == 0) {
485,839✔
394
    int64_t tmp;
1,294✔
395
    code = taosStr2int64(value, &tmp);
1,559✔
396
    if (tmp < 6000 || tmp > 1800000 || code != 0) {
1,559✔
397
      tqErrorC("invalid value for session.timeout.ms:%s", value);
×
398
      return TMQ_CONF_INVALID;
×
399
    }
400
    conf->sessionTimeoutMs = tmp;
1,559✔
401
    return TMQ_CONF_OK;
1,559✔
402
  }
403

404
  if (strcasecmp(key, "heartbeat.interval.ms") == 0) {
484,280✔
405
    int64_t tmp;
324✔
406
    code = taosStr2int64(value, &tmp);
324✔
407
    if (tmp < 1000 || tmp >= conf->sessionTimeoutMs || code != 0) {
324✔
408
      tqErrorC("invalid value for heartbeat.interval.ms:%s", value);
324✔
409
      return TMQ_CONF_INVALID;
324✔
410
    }
411
    conf->heartBeatIntervalMs = tmp;
×
412
    return TMQ_CONF_OK;
×
413
  }
414

415
  if (strcasecmp(key, "max.poll.interval.ms") == 0) {
483,956✔
416
    int32_t tmp;
1,008✔
417
    code = taosStr2int32(value, &tmp);
1,008✔
418
    if (tmp < 1000 || code != 0) {
1,008✔
419
      tqErrorC("invalid value for max.poll.interval.ms:%s", value);
×
420
      return TMQ_CONF_INVALID;
×
421
    }
422
    conf->maxPollIntervalMs = tmp;
1,008✔
423
    return TMQ_CONF_OK;
1,008✔
424
  }
425

426
  if (strcasecmp(key, "auto.offset.reset") == 0) {
482,948✔
427
    if (strcasecmp(value, "none") == 0) {
103,087✔
428
      conf->resetOffset = TMQ_OFFSET__RESET_NONE;
2,404✔
429
      return TMQ_CONF_OK;
2,404✔
430
    } else if (strcasecmp(value, "earliest") == 0) {
100,683✔
431
      conf->resetOffset = TMQ_OFFSET__RESET_EARLIEST;
94,632✔
432
      return TMQ_CONF_OK;
94,632✔
433
    } else if (strcasecmp(value, "latest") == 0) {
6,051✔
434
      conf->resetOffset = TMQ_OFFSET__RESET_LATEST;
6,051✔
435
      return TMQ_CONF_OK;
6,051✔
436
    } else {
437
      tqErrorC("invalid value for auto.offset.reset:%s", value);
×
438
      return TMQ_CONF_INVALID;
×
439
    }
440
  }
441

442
  if (strcasecmp(key, "msg.with.table.name") == 0) {
379,861✔
443
    if (strcasecmp(value, "true") == 0) {
91,061✔
444
      conf->withTbName = true;
86,381✔
445
      return TMQ_CONF_OK;
86,381✔
446
    } else if (strcasecmp(value, "false") == 0) {
4,680✔
447
      conf->withTbName = false;
4,680✔
448
      return TMQ_CONF_OK;
4,680✔
449
    } else {
450
      tqErrorC("invalid value for msg.with.table.name:%s", value);
×
451
      return TMQ_CONF_INVALID;
×
452
    }
453
  }
454

455
  if (strcasecmp(key, "experimental.snapshot.enable") == 0) {
288,800✔
456
    if (strcasecmp(value, "true") == 0) {
35,453✔
457
      conf->snapEnable = true;
28,753✔
458
      return TMQ_CONF_OK;
28,753✔
459
    } else if (strcasecmp(value, "false") == 0) {
6,700✔
460
      conf->snapEnable = false;
6,700✔
461
      return TMQ_CONF_OK;
6,700✔
462
    } else {
463
      tqErrorC("invalid value for experimental.snapshot.enable:%s", value);
×
464
      return TMQ_CONF_INVALID;
×
465
    }
466
  }
467

468
  if (strcasecmp(key, "td.connect.ip") == 0) {
253,347✔
469
    void *tmp = taosStrdup(value);
4,877✔
470
    if (tmp == NULL) {
4,877✔
471
      tqErrorC("tmq_conf_set out of memory:%d", terrno);
×
472
      return TMQ_CONF_INVALID;
×
473
    }
474
    conf->ip = tmp;
4,877✔
475
    return TMQ_CONF_OK;
4,877✔
476
  }
477

478
  if (strcasecmp(key, "td.connect.user") == 0) {
248,470✔
479
    void *tmp = taosStrdup(value);
113,428✔
480
    if (tmp == NULL) {
113,428✔
481
      tqErrorC("tmq_conf_set out of memory:%d", terrno);
×
482
      return TMQ_CONF_INVALID;
×
483
    }
484
    conf->user = tmp;
113,428✔
485
    return TMQ_CONF_OK;
113,428✔
486
  }
487

488
  if (strcasecmp(key, "td.connect.pass") == 0) {
135,042✔
489
    void *tmp = taosStrdup(value);
113,479✔
490
    if (tmp == NULL) {
113,479✔
491
      tqErrorC("tmq_conf_set out of memory:%d", terrno);
×
492
      return TMQ_CONF_INVALID;
×
493
    }
494
    conf->pass = tmp;
113,479✔
495
    return TMQ_CONF_OK;
113,479✔
496
  }
497

498
  if (strcasecmp(key, "td.connect.port") == 0) {
21,563✔
499
    int64_t tmp;
3,872✔
500
    code = taosStr2int64(value, &tmp);
4,030✔
501
    if (tmp <= 0 || tmp > 65535 || code != 0) {
4,030✔
502
      tqErrorC("invalid value for td.connect.port:%s", value);
233✔
503
      return TMQ_CONF_INVALID;
233✔
504
    }
505

506
    conf->port = tmp;
3,797✔
507
    return TMQ_CONF_OK;
3,797✔
508
  }
509

510
  if (strcasecmp(key, "enable.replay") == 0) {
17,533✔
511
    if (strcasecmp(value, "true") == 0) {
1,827✔
512
      conf->replayEnable = true;
1,827✔
513
      return TMQ_CONF_OK;
1,827✔
514
    } else if (strcasecmp(value, "false") == 0) {
×
515
      conf->replayEnable = false;
×
516
      return TMQ_CONF_OK;
×
517
    } else {
518
      tqErrorC("invalid value for enable.replay:%s", value);
×
519
      return TMQ_CONF_INVALID;
×
520
    }
521
  }
522
  if (strcasecmp(key, "msg.consume.excluded") == 0) {
15,706✔
523
    int64_t tmp = 0;
10,904✔
524
    code = taosStr2int64(value, &tmp);
10,904✔
525
    conf->sourceExcluded = (0 == code && tmp != 0) ? TD_REQ_FROM_TAOX : 0;
10,904✔
526
    return TMQ_CONF_OK;
10,904✔
527
  }
528
  if (strcasecmp(key, "msg.consume.rawdata") == 0) {
4,802✔
529
    int64_t tmp = 0;
×
530
    code = taosStr2int64(value, &tmp);
×
531
    conf->rawData = (0 == code && tmp != 0) ? 1 : 0;
×
532
    return TMQ_CONF_OK;
×
533
  }
534

535
  if (strcasecmp(key, "fetch.max.wait.ms") == 0) {
4,802✔
536
    int64_t tmp = 0;
684✔
537
    code = taosStr2int64(value, &tmp);
684✔
538
    if (tmp <= 0 || tmp > INT32_MAX || code != 0) {
684✔
539
      tqErrorC("invalid value for fetch.max.wait.ms:%s", value);
×
540
      return TMQ_CONF_INVALID;
×
541
    }
542
    conf->maxPollWaitTime = tmp;
684✔
543
    return TMQ_CONF_OK;
684✔
544
  }
545

546
  if (strcasecmp(key, "min.poll.rows") == 0) {
4,118✔
547
    int64_t tmp = 0;
3,055✔
548
    code = taosStr2int64(value, &tmp);
3,055✔
549
    if (tmp <= 0 || tmp > INT32_MAX || code != 0) {
3,055✔
550
      tqErrorC("invalid value for min.poll.rows:%s", value);
×
551
      return TMQ_CONF_INVALID;
×
552
    }
553
    conf->minPollRows = tmp;
3,055✔
554
    return TMQ_CONF_OK;
3,055✔
555
  }
556

557
  if (strcasecmp(key, "td.connect.db") == 0) {
1,065✔
558
    return TMQ_CONF_OK;
×
559
  }
560

561
  if (strcasecmp(key, "msg.enable.batchmeta") == 0) {
1,065✔
562
    int64_t tmp;
1,043✔
563
    code = taosStr2int64(value, &tmp);
1,043✔
564
    conf->enableBatchMeta = (0 == code && tmp != 0) ? true : false;
1,043✔
565
    return TMQ_CONF_OK;
1,043✔
566
  }
567

568
  tqErrorC("unknown key:%s", key);
22✔
569
  return TMQ_CONF_UNKNOWN;
22✔
570
}
571

572
tmq_list_t* tmq_list_new() {
322,643✔
573
  return (tmq_list_t*)taosArrayInit(0, sizeof(void*));
322,643✔
574
}
575

576
int32_t tmq_list_append(tmq_list_t* list, const char* src) {
123,302✔
577
  if (list == NULL) {
123,302✔
578
    return TSDB_CODE_INVALID_PARA;
×
579
  }
580
  SArray* container = &list->container;
123,302✔
581
  if (src == NULL || src[0] == 0) {
123,302✔
582
    return TSDB_CODE_INVALID_PARA;
×
583
  }
584
  char* topic = taosStrdup(src);
123,302✔
585
  if (topic == NULL) return terrno;
123,302✔
586
  if (taosArrayPush(container, &topic) == NULL) {
123,302✔
587
    taosMemoryFree(topic);
×
588
    return terrno;
×
589
  }
590
  return 0;
123,302✔
591
}
592

593
void tmq_list_destroy(tmq_list_t* list) {
322,643✔
594
  if (list == NULL) return;
322,643✔
595
  SArray* container = &list->container;
322,643✔
596
  taosArrayDestroyP(container, taosMemFree);
322,643✔
597
}
598

599
int32_t tmq_list_get_size(const tmq_list_t* list) {
7,943✔
600
  if (list == NULL) {
7,943✔
601
    return TSDB_CODE_INVALID_PARA;
×
602
  }
603
  const SArray* container = &list->container;
7,943✔
604
  return taosArrayGetSize(container);
7,943✔
605
}
606

607
char** tmq_list_to_c_array(const tmq_list_t* list) {
4,866✔
608
  if (list == NULL) {
4,866✔
609
    return NULL;
×
610
  }
611
  const SArray* container = &list->container;
4,866✔
612
  return container->pData;
4,866✔
613
}
614

615
static int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) {
3,407,174✔
616
  if (pParamSet == NULL) {
3,407,174✔
617
    return TSDB_CODE_INVALID_PARA;
×
618
  }
619
  int64_t refId = pParamSet->refId;
3,407,174✔
620
  int32_t code = 0;
3,407,174✔
621
  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
3,407,174✔
622
  if (tmq == NULL) {
3,407,174✔
623
    code = TSDB_CODE_TMQ_CONSUMER_CLOSED;
×
624
  }
625

626
  // if no more waiting rsp
627
  if (pParamSet->callbackFn != NULL) {
3,407,174✔
628
    pParamSet->callbackFn(tmq, pParamSet->code, pParamSet->userParam);
3,401,632✔
629
  }
630

631
  taosMemoryFree(pParamSet);
3,407,174✔
632
  if (tmq != NULL) {
3,407,174✔
633
    code = taosReleaseRef(tmqMgmt.rsetId, refId);
3,407,174✔
634
  }
635

636
  return code;
3,407,174✔
637
}
638

639
static int32_t commitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId) {
5,226,805✔
640
  if (pParamSet == NULL) {
5,226,805✔
641
    return TSDB_CODE_INVALID_PARA;
×
642
  }
643
  int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
5,226,805✔
644
  if (waitingRspNum == 0) {
5,226,805✔
645
    tqDebugC("consumer:0x%" PRIx64 " topic:%s vgId:%d all commit-rsp received, commit completed", consumerId, pTopic,
3,407,174✔
646
             vgId);
647
    return tmqCommitDone(pParamSet);
3,407,174✔
648
  } else {
649
    tqDebugC("consumer:0x%" PRIx64 " topic:%s vgId:%d commit-rsp received, remain:%d", consumerId, pTopic, vgId,
1,819,631✔
650
             waitingRspNum);
651
  }
652
  return 0;
1,819,631✔
653
}
654

655
static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
1,830,132✔
656
  if (pBuf){
1,830,132✔
657
    taosMemoryFreeClear(pBuf->pData);
1,830,132✔
658
    taosMemoryFreeClear(pBuf->pEpSet);
1,830,132✔
659
  }
660
  if(param == NULL){
1,830,132✔
661
    return TSDB_CODE_INVALID_PARA;
×
662
  }
663
  SMqCommitCbParam*    pParam = (SMqCommitCbParam*)param;
1,830,132✔
664
  SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params;
1,830,132✔
665

666
  return commitRspCountDown(pParamSet, pParam->consumerId, pParam->topicName, pParam->vgId);
1,830,132✔
667
}
668

669
static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffsetVal* offset, const char* pTopicName,
1,830,132✔
670
                               SMqCommitCbParamSet* pParamSet) {
671
  if (tmq == NULL || epSet == NULL || offset == NULL || pTopicName == NULL || pParamSet == NULL) {
1,830,132✔
672
    return TSDB_CODE_INVALID_PARA;
×
673
  }
674
  SMqVgOffset pOffset = {0};
1,830,132✔
675

676
  pOffset.consumerId = tmq->consumerId;
1,830,132✔
677
  pOffset.offset.val = *offset;
1,830,132✔
678
  (void)snprintf(pOffset.offset.subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", tmq->groupId, TMQ_SEPARATOR, pTopicName);
1,830,132✔
679
  int32_t len = 0;
1,830,132✔
680
  int32_t code = 0;
1,830,132✔
681
  tEncodeSize(tEncodeMqVgOffset, &pOffset, len, code);
1,830,132✔
682
  if (code < 0) {
1,830,132✔
683
    return TSDB_CODE_INVALID_PARA;
×
684
  }
685

686
  void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
1,830,132✔
687
  if (buf == NULL) {
1,830,132✔
688
    return terrno;
×
689
  }
690

691
  ((SMsgHead*)buf)->vgId = htonl(vgId);
1,830,132✔
692

693
  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
1,830,132✔
694

695
  SEncoder encoder = {0};
1,830,132✔
696
  tEncoderInit(&encoder, abuf, len);
1,830,132✔
697
  if (tEncodeMqVgOffset(&encoder, &pOffset) < 0) {
1,830,132✔
698
    tEncoderClear(&encoder);
×
699
    taosMemoryFree(buf);
×
700
    return TSDB_CODE_INVALID_PARA;
×
701
  }
702
  tEncoderClear(&encoder);
1,830,132✔
703

704
  // build param
705
  SMqCommitCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam));
1,830,132✔
706
  if (pParam == NULL) {
1,830,132✔
707
    taosMemoryFree(buf);
×
708
    return terrno;
×
709
  }
710

711
  pParam->params = pParamSet;
1,830,132✔
712
  pParam->vgId = vgId;
1,830,132✔
713
  pParam->consumerId = tmq->consumerId;
1,830,132✔
714

715
  tstrncpy(pParam->topicName, pTopicName, tListLen(pParam->topicName));
1,830,132✔
716

717
  // build send info
718
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
1,830,132✔
719
  if (pMsgSendInfo == NULL) {
1,830,132✔
720
    taosMemoryFree(buf);
×
721
    taosMemoryFree(pParam);
×
722
    return terrno;
×
723
  }
724

725
  pMsgSendInfo->msgInfo = (SDataBuf){.pData = buf, .len = sizeof(SMsgHead) + len, .handle = NULL};
1,830,132✔
726

727
  pMsgSendInfo->requestId = generateRequestId();
1,830,132✔
728
  pMsgSendInfo->requestObjRefId = 0;
1,830,132✔
729
  pMsgSendInfo->param = pParam;
1,830,132✔
730
  pMsgSendInfo->paramFreeFp = taosAutoMemoryFree;
1,830,132✔
731
  pMsgSendInfo->fp = tmqCommitCb;
1,830,132✔
732
  pMsgSendInfo->msgType = TDMT_VND_TMQ_COMMIT_OFFSET;
1,830,132✔
733

734
  // int64_t transporterId = 0;
735
  (void)atomic_add_fetch_32(&pParamSet->waitingRspNum, 1);
1,830,132✔
736
  code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, NULL, pMsgSendInfo);
1,830,132✔
737
  if (code != 0) {
1,830,132✔
738
    (void)atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
×
739
  }
740
  return code;
1,830,132✔
741
}
742

743
static int32_t getTopicByName(tmq_t* tmq, const char* pTopicName, SMqClientTopic** topic) {
53,184✔
744
  if (tmq == NULL || pTopicName == NULL || topic == NULL) {
53,184✔
745
    return TSDB_CODE_INVALID_PARA;
×
746
  }
747
  int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
53,184✔
748
  for (int32_t i = 0; i < numOfTopics; ++i) {
53,242✔
749
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
37,210✔
750
    if (pTopic == NULL || strcmp(pTopic->topicName, pTopicName) != 0) {
37,210✔
751
      continue;
58✔
752
    }
753
    *topic = pTopic;
37,152✔
754
    return 0;
37,152✔
755
  }
756

757
  tqErrorC("consumer:0x%" PRIx64 ", total:%d, failed to find topic:%s", tmq->consumerId, numOfTopics, pTopicName);
16,032✔
758
  return TSDB_CODE_TMQ_INVALID_TOPIC;
16,032✔
759
}
760

761
static int32_t prepareCommitCbParamSet(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam, int32_t rspNum,
3,409,935✔
762
                                       SMqCommitCbParamSet** ppParamSet) {
763
  if (tmq == NULL || ppParamSet == NULL) {
3,409,935✔
764
    return TSDB_CODE_INVALID_PARA;
×
765
  }
766
  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
3,409,935✔
767
  if (pParamSet == NULL) {
3,409,935✔
768
    return terrno;
×
769
  }
770

771
  pParamSet->refId = tmq->refId;
3,409,935✔
772
  pParamSet->epoch = atomic_load_32(&tmq->epoch);
3,409,935✔
773
  pParamSet->callbackFn = pCommitFp;
3,409,935✔
774
  pParamSet->userParam = userParam;
3,409,935✔
775
  pParamSet->waitingRspNum = rspNum;
3,409,935✔
776
  *ppParamSet = pParamSet;
3,409,935✔
777
  return 0;
3,409,935✔
778
}
779

780
static int32_t getClientVg(tmq_t* tmq, char* pTopicName, int32_t vgId, SMqClientVg** pVg) {
51,309✔
781
  if (tmq == NULL || pTopicName == NULL || pVg == NULL) {
51,309✔
782
    return TSDB_CODE_INVALID_PARA;
×
783
  }
784
  SMqClientTopic* pTopic = NULL;
51,309✔
785
  int32_t         code = getTopicByName(tmq, pTopicName, &pTopic);
51,309✔
786
  if (code != 0) {
51,309✔
787
    tqErrorC("consumer:0x%" PRIx64 " invalid topic name:%s", tmq->consumerId, pTopicName);
16,032✔
788
    return code;
16,032✔
789
  }
790

791
  int32_t numOfVgs = taosArrayGetSize(pTopic->vgs);
35,277✔
792
  for (int32_t i = 0; i < numOfVgs; ++i) {
50,631✔
793
    SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i);
48,617✔
794
    if (pClientVg && pClientVg->vgId == vgId) {
48,617✔
795
      *pVg = pClientVg;
33,263✔
796
      break;
33,263✔
797
    }
798
  }
799

800
  return *pVg == NULL ? TSDB_CODE_TMQ_INVALID_VGID : TSDB_CODE_SUCCESS;
35,277✔
801
}
802

803
static int32_t sendWalMarkMsgToMnodeCb(void* param, SDataBuf* pMsg, int32_t code) {
×
804
  if (pMsg) {
×
805
    taosMemoryFreeClear(pMsg->pEpSet);
×
806
    taosMemoryFreeClear(pMsg->pData);
×
807
  }
808
  tqDebugC("sendWalMarkMsgToMnodeCb code:%d", code);
×
809
  return 0;
×
810
}
811

812
static void asyncSendWalMarkMsgToMnode(tmq_t* tmq, int32_t vgId, int64_t keepVersion) {
×
813
  if (tmq == NULL) return ;
×
814
  void*           buf = NULL;
×
815
  SMsgSendInfo*   sendInfo = NULL;
×
816
  SMndSetVgroupKeepVersionReq req = {0};
×
817

818
  tqDebugC("consumer:0x%" PRIx64 " send vgId:%d keepVersion:%"PRId64, tmq->consumerId, vgId, keepVersion);
×
819
  req.vgId = vgId;
×
820
  req.keepVersion = keepVersion;
×
821

822
  int32_t tlen = tSerializeSMndSetVgroupKeepVersionReq(NULL, 0, &req);
×
823
  buf = taosMemoryMalloc(tlen);
×
824
  if (buf == NULL) {
×
825
    return;
×
826
  }
827
  tlen = tSerializeSMndSetVgroupKeepVersionReq(buf, tlen, &req);
×
828

829
  sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
×
830
  if (sendInfo == NULL) {
×
831
    taosMemoryFree(buf);
×
832
    return;
×
833
  }
834

835
  sendInfo->msgInfo = (SDataBuf){.pData = buf, .len = tlen, .handle = NULL};
×
836
  sendInfo->requestId = generateRequestId();
×
837
  sendInfo->fp = sendWalMarkMsgToMnodeCb;
×
838
  sendInfo->msgType = TDMT_MND_SET_VGROUP_KEEP_VERSION;
×
839

840
  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
×
841

842
  int32_t code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo);
×
843
  if (code != 0) {
×
844
    tqErrorC("consumer:0x%" PRIx64 " send wal mark msg to mnode failed, code:%s", tmq->consumerId,
×
845
             tstrerror(terrno));
846
  }
847
}
848

849
static int32_t innerCommit(tmq_t* tmq, char* pTopicName, STqOffsetVal* offsetVal, SMqClientVg* pVg, SMqCommitCbParamSet* pParamSet){
6,230,675✔
850
  if (tmq == NULL || pTopicName == NULL || offsetVal == NULL || pVg == NULL || pParamSet == NULL) {
6,230,675✔
851
    return TSDB_CODE_INVALID_PARA;
×
852
  }
853
  int32_t code = 0;
6,230,675✔
854
  if (offsetVal->type <= 0) {
6,230,675✔
855
    code = TSDB_CODE_TMQ_INVALID_MSG;
253,454✔
856
    return code;
253,454✔
857
  }
858
  if (tOffsetEqual(offsetVal, &pVg->offsetInfo.committedOffset)) {
5,977,221✔
859
    code = TSDB_CODE_TMQ_SAME_COMMITTED_VALUE;
4,147,089✔
860
    return code;
4,147,089✔
861
  }
862
  char offsetBuf[TSDB_OFFSET_LEN] = {0};
1,830,132✔
863
  tFormatOffset(offsetBuf, tListLen(offsetBuf), offsetVal);
1,830,132✔
864

865
  char commitBuf[TSDB_OFFSET_LEN] = {0};
1,830,132✔
866
  tFormatOffset(commitBuf, tListLen(commitBuf), &pVg->offsetInfo.committedOffset);
1,830,132✔
867

868
  code = doSendCommitMsg(tmq, pVg->vgId, &pVg->epSet, offsetVal, pTopicName, pParamSet);
1,830,132✔
869
  if (code != TSDB_CODE_SUCCESS) {
1,830,132✔
870
    tqErrorC("consumer:0x%" PRIx64 " topic:%s on vgId:%d end commit msg failed, send offset:%s committed:%s, code:%s",
×
871
             tmq->consumerId, pTopicName, pVg->vgId, offsetBuf, commitBuf, tstrerror(terrno));
872
    return code;
×
873
  }
874

875
  if (tmq->enableWalMarker && offsetVal->type == TMQ_OFFSET__LOG) {
1,830,132✔
876
    asyncSendWalMarkMsgToMnode(tmq, pVg->vgId, offsetVal->version);
×
877
  }
878
  tqDebugC("consumer:0x%" PRIx64 " topic:%s on vgId:%d send commit msg success, send offset:%s committed:%s",
1,830,132✔
879
           tmq->consumerId, pTopicName, pVg->vgId, offsetBuf, commitBuf);
880
  tOffsetCopy(&pVg->offsetInfo.committedOffset, offsetVal);
1,830,132✔
881
  return code;
1,830,132✔
882
}
883

884
static int32_t asyncCommitOffset(tmq_t* tmq, char* pTopicName, int32_t vgId, STqOffsetVal* offsetVal,
13,262✔
885
                                 tmq_commit_cb* pCommitFp, void* userParam) {
886
  if (tmq == NULL || pTopicName == NULL || offsetVal == NULL) {
13,262✔
887
    return TSDB_CODE_INVALID_PARA;
×
888
  }
889
  tqDebugC("consumer:0x%" PRIx64 " do manual commit offset for %s, vgId:%d", tmq->consumerId, pTopicName, vgId);
13,262✔
890
  SMqCommitCbParamSet* pParamSet = NULL;
13,262✔
891
  int32_t code = prepareCommitCbParamSet(tmq, pCommitFp, userParam, 0, &pParamSet);
13,262✔
892
  if (code != 0){
13,262✔
893
    return code;
×
894
  }
895

896
  taosRLockLatch(&tmq->lock);
13,262✔
897
  SMqClientVg* pVg = NULL;
13,262✔
898
  code = getClientVg(tmq, pTopicName, vgId, &pVg);
13,262✔
899
  if (code == 0) {
13,262✔
900
    code = innerCommit(tmq, pTopicName, offsetVal, pVg, pParamSet);
13,262✔
901
  }
902
  taosRUnLockLatch(&tmq->lock);
13,262✔
903

904
  if (code != 0){
13,262✔
905
    taosMemoryFree(pParamSet);
2,761✔
906
  }
907
  return code;
13,262✔
908
}
909

910
static void asyncCommitFromResult(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* pCommitFp, void* userParam) {
8,399✔
911
  char*        pTopicName = NULL;
8,399✔
912
  int32_t      vgId = 0;
8,399✔
913
  STqOffsetVal offsetVal = {0};
8,399✔
914
  int32_t      code = 0;
8,399✔
915

916
  if (pRes == NULL || tmq == NULL) {
8,399✔
917
    code = TSDB_CODE_INVALID_PARA;
×
918
    goto end;
×
919
  }
920

921
  if (TD_RES_TMQ(pRes) || TD_RES_TMQ_RAW(pRes) || TD_RES_TMQ_META(pRes) ||
8,399✔
922
      TD_RES_TMQ_METADATA(pRes) || TD_RES_TMQ_BATCH_META(pRes)) {
×
923
    SMqRspObj* pRspObj = (SMqRspObj*)pRes;
8,399✔
924
    pTopicName = pRspObj->topic;
8,399✔
925
    vgId = pRspObj->vgId;
8,399✔
926
    offsetVal = pRspObj->rspOffset;
8,399✔
927
  } else {
928
    code = TSDB_CODE_TMQ_INVALID_MSG;
×
929
    goto end;
×
930
  }
931

932
  code = asyncCommitOffset(tmq, pTopicName, vgId, &offsetVal, pCommitFp, userParam);
8,399✔
933

934
  end:
8,399✔
935
  if (code != TSDB_CODE_SUCCESS && pCommitFp != NULL) {
8,399✔
936
    if (code == TSDB_CODE_TMQ_SAME_COMMITTED_VALUE) code = TSDB_CODE_SUCCESS;
×
937
    pCommitFp(tmq, code, userParam);
×
938
  }
939
}
8,399✔
940

941
static int32_t innerCommitAll(tmq_t* tmq, SMqCommitCbParamSet* pParamSet){
3,396,673✔
942
  if (tmq == NULL || pParamSet == NULL) {
3,396,673✔
943
    return TSDB_CODE_INVALID_PARA;
×
944
  }
945
  int32_t code = 0;
3,396,673✔
946
  taosRLockLatch(&tmq->lock);
3,396,673✔
947
  int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
3,396,673✔
948
  tqDebugC("consumer:0x%" PRIx64 " start to commit offset for %d topics", tmq->consumerId, numOfTopics);
3,396,673✔
949

950
  for (int32_t i = 0; i < numOfTopics; i++) {
6,739,539✔
951
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
3,342,866✔
952
    if (pTopic == NULL) {
3,342,866✔
953
      code = TSDB_CODE_TMQ_INVALID_TOPIC;
×
954
      goto END;
×
955
    }
956
    int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs);
3,342,866✔
957
    tqDebugC("consumer:0x%" PRIx64 " commit offset for topics:%s, numOfVgs:%d", tmq->consumerId, pTopic->topicName, numOfVgroups);
3,342,866✔
958
    for (int32_t j = 0; j < numOfVgroups; j++) {
9,560,279✔
959
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
6,217,413✔
960
      if (pVg == NULL) {
6,217,413✔
961
        code = terrno;
×
962
        goto END;
×
963
      }
964

965
      code = innerCommit(tmq, pTopic->topicName, &pVg->offsetInfo.endOffset, pVg, pParamSet);
6,217,413✔
966
      if (code != 0 && code != TSDB_CODE_TMQ_SAME_COMMITTED_VALUE){
6,217,413✔
967
        tqErrorC("consumer:0x%" PRIx64 " topic:%s vgId:%d, no commit, code:%s, current offset version:%" PRId64 ", ordinal:%d/%d",
253,454✔
968
                 tmq->consumerId, pTopic->topicName, pVg->vgId, tstrerror(code), pVg->offsetInfo.endOffset.version, j + 1, numOfVgroups);
969
      }
970
    }
971
  }
972
  tqDebugC("consumer:0x%" PRIx64 " total commit:%d for %d topics", tmq->consumerId, pParamSet->waitingRspNum - DEFAULT_COMMIT_CNT,
3,396,673✔
973
           numOfTopics);
974
  END:
28,334✔
975
  taosRUnLockLatch(&tmq->lock);
3,396,673✔
976
  return code;
3,396,673✔
977
}
978

979
static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam) {
3,396,673✔
980
  if (tmq == NULL) {
3,396,673✔
981
    return;
×
982
  }
983
  int32_t code = 0;
3,396,673✔
984
  SMqCommitCbParamSet* pParamSet = NULL;
3,396,673✔
985
  // init waitingRspNum as DEFAULT_COMMIT_CNT to prevent concurrency issue
986
  code = prepareCommitCbParamSet(tmq, pCommitFp, userParam, DEFAULT_COMMIT_CNT, &pParamSet);
3,396,673✔
987
  if (code != 0) {
3,396,673✔
988
    tqErrorC("consumer:0x%" PRIx64 " prepareCommitCbParamSet failed, code:%s", tmq->consumerId, tstrerror(code));
×
989
    if (pCommitFp != NULL) {
×
990
      pCommitFp(tmq, code, userParam);
×
991
    }
992
    return;
×
993
  }
994
  code = innerCommitAll(tmq, pParamSet);
3,396,673✔
995
  if (code != 0 && code != TSDB_CODE_TMQ_SAME_COMMITTED_VALUE){
3,396,673✔
996
    tqErrorC("consumer:0x%" PRIx64 " innerCommitAll failed, code:%s", tmq->consumerId, tstrerror(code));
55,462✔
997
  }
998

999
  code = commitRspCountDown(pParamSet, tmq->consumerId, "init", -1);
3,396,673✔
1000
  if (code != 0) {
3,396,673✔
1001
    tqErrorC("consumer:0x%" PRIx64 " commit rsp count down failed, code:%s", tmq->consumerId, tstrerror(code));
×
1002
  }
1003
  return;
3,396,673✔
1004
}
1005

1006
static void generateTimedTask(int64_t refId, int32_t type) {
4,978,745✔
1007
  tmq_t*  tmq = NULL;
4,978,745✔
1008
  int8_t* pTaskType = NULL;
4,978,745✔
1009
  int32_t code = 0;
4,978,745✔
1010

1011
  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
4,978,745✔
1012
  if (tmq == NULL) return;
4,978,745✔
1013

1014
  code = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0, (void**)&pTaskType);
4,978,436✔
1015
  if (code == TSDB_CODE_SUCCESS) {
4,978,436✔
1016
    *pTaskType = type;
4,978,436✔
1017
    if (taosWriteQitem(tmq->delayedTask, pTaskType) == 0) {
4,978,436✔
1018
      if (tsem2_post(&tmq->rspSem) != 0){
4,978,436✔
1019
        tqErrorC("consumer:0x%" PRIx64 " failed to post sem, type:%d", tmq->consumerId, type);
×
1020
      }
1021
    }else{
1022
      taosFreeQitem(pTaskType);
×
1023
    }
1024
  }
1025

1026
  code = taosReleaseRef(tmqMgmt.rsetId, refId);
4,978,436✔
1027
  if (code != 0){
4,978,436✔
1028
    tqErrorC("failed to release ref:%"PRId64 ", type:%d, code:%d", refId, type, code);
×
1029
  }
1030
}
1031

1032
void tmqAssignAskEpTask(void* param, void* tmrId) {
1,733,875✔
1033
  int64_t refId = (int64_t)param;
1,733,875✔
1034
  generateTimedTask(refId, TMQ_DELAYED_TASK__ASK_EP);
1,733,875✔
1035
}
1,733,875✔
1036

1037
void tmqReplayTask(void* param, void* tmrId) {
3,132✔
1038
  int64_t refId = (int64_t)param;
3,132✔
1039
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
3,132✔
1040
  if (tmq == NULL) return;
3,132✔
1041

1042
  if (tsem2_post(&tmq->rspSem) != 0){
3,132✔
1043
    tqErrorC("consumer:0x%" PRIx64 " failed to post sem, replay", tmq->consumerId);
×
1044
  }
1045
  int32_t code = taosReleaseRef(tmqMgmt.rsetId, refId);
3,132✔
1046
  if (code != 0){
3,132✔
1047
    tqErrorC("failed to release ref:%"PRId64 ", code:%d", refId, code);
×
1048
  }
1049
}
1050

1051
void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
3,244,870✔
1052
  int64_t refId = (int64_t)param;
3,244,870✔
1053
  generateTimedTask(refId, TMQ_DELAYED_TASK__COMMIT);
3,244,870✔
1054
}
3,244,870✔
1055

1056
int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
840,643✔
1057
  if (pMsg == NULL) {
840,643✔
1058
    return TSDB_CODE_INVALID_PARA;
×
1059
  }
1060

1061
  if (param == NULL || code != 0){
840,643✔
1062
    goto END;
8,203✔
1063
  }
1064

1065
  SMqHbRsp rsp = {0};
832,440✔
1066
  code = tDeserializeSMqHbRsp(pMsg->pData, pMsg->len, &rsp);
832,440✔
1067
  if (code != 0) {
832,440✔
1068
    goto END;
×
1069
  }
1070

1071
  int64_t refId = (int64_t)param;
832,440✔
1072
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
832,440✔
1073
  if (tmq != NULL) {
832,440✔
1074
    taosWLockLatch(&tmq->lock);
830,899✔
1075
    for (int32_t i = 0; i < taosArrayGetSize(rsp.topicPrivileges); i++) {
1,593,469✔
1076
      STopicPrivilege* privilege = taosArrayGet(rsp.topicPrivileges, i);
762,570✔
1077
      if (privilege == NULL) {
762,570✔
1078
        continue;
×
1079
      }
1080
      int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
762,570✔
1081
      for (int32_t j = 0; j < topicNumCur; j++) {
1,598,041✔
1082
        SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, j);
835,471✔
1083
        if (pTopicCur && strcmp(pTopicCur->topicName, privilege->topic) == 0 && pTopicCur->noPrivilege != privilege->noPrivilege) {
835,471✔
1084
          tqInfoC("consumer:0x%" PRIx64 ", update privilege:%s, topic:%s", tmq->consumerId, privilege->noPrivilege ? "false" : "true", privilege->topic);
146✔
1085
          pTopicCur->noPrivilege = privilege->noPrivilege;
146✔
1086
        }
1087
      }
1088
    }
1089
    taosWUnLockLatch(&tmq->lock);
830,899✔
1090
    code = taosReleaseRef(tmqMgmt.rsetId, refId);
830,899✔
1091
    if (code != 0){
830,899✔
1092
      tqErrorC("failed to release ref:%"PRId64 ", code:%d", refId, code);
×
1093
    }
1094
  }
1095

1096
  tqClientDebugFlag = rsp.debugFlag;
832,440✔
1097

1098
  tDestroySMqHbRsp(&rsp);
832,440✔
1099

1100
  END:
840,643✔
1101
  taosMemoryFree(pMsg->pData);
840,643✔
1102
  taosMemoryFree(pMsg->pEpSet);
840,643✔
1103
  return code;
840,643✔
1104
}
1105

1106
void tmqSendHbReq(void* param, void* tmrId) {
840,952✔
1107
  int64_t refId = (int64_t)param;
840,952✔
1108

1109
  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
840,952✔
1110
  if (tmq == NULL) {
840,952✔
1111
    return;
309✔
1112
  }
1113

1114
  SMqHbReq req = {0};
840,643✔
1115
  req.consumerId = tmq->consumerId;
840,643✔
1116
  req.epoch = atomic_load_32(&tmq->epoch);
840,643✔
1117
  req.pollFlag = atomic_load_8(&tmq->pollFlag);
840,643✔
1118
  tqDebugC("consumer:0x%" PRIx64 " send heartbeat, pollFlag:%d", tmq->consumerId, req.pollFlag);
840,643✔
1119
  req.topics = taosArrayInit(taosArrayGetSize(tmq->clientTopics), sizeof(TopicOffsetRows));
840,643✔
1120
  if (req.topics == NULL) {
840,643✔
1121
    goto END;
×
1122
  }
1123
  taosRLockLatch(&tmq->lock);
840,643✔
1124
  for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
1,621,431✔
1125
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
780,788✔
1126
    if (pTopic == NULL) {
780,788✔
1127
      continue;
×
1128
    }
1129
    int32_t          numOfVgroups = taosArrayGetSize(pTopic->vgs);
780,788✔
1130
    TopicOffsetRows* data = taosArrayReserve(req.topics, 1);
780,788✔
1131
    if (data == NULL) {
780,788✔
1132
      continue;
×
1133
    }
1134
    tstrncpy(data->topicName, pTopic->topicName, TSDB_TOPIC_FNAME_LEN);
780,788✔
1135
    data->offsetRows = taosArrayInit(numOfVgroups, sizeof(OffsetRows));
780,788✔
1136
    if (data->offsetRows == NULL) {
780,788✔
1137
      continue;
×
1138
    }
1139
    for (int j = 0; j < numOfVgroups; j++) {
2,906,064✔
1140
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
2,125,276✔
1141
      if (pVg == NULL) {
2,125,276✔
1142
        continue;
×
1143
      }
1144
      OffsetRows* offRows = taosArrayReserve(data->offsetRows, 1);
2,125,276✔
1145
      if (offRows == NULL) {
2,125,276✔
1146
        continue;
×
1147
      }
1148
      offRows->vgId = pVg->vgId;
2,125,276✔
1149
      offRows->rows = pVg->numOfRows;
2,125,276✔
1150
      offRows->offset = pVg->offsetInfo.endOffset;
2,125,276✔
1151
      offRows->ever = pVg->offsetInfo.walVerEnd == -1 ? 0 : pVg->offsetInfo.walVerEnd;
2,125,276✔
1152
      char buf[TSDB_OFFSET_LEN] = {0};
2,125,276✔
1153
      tFormatOffset(buf, TSDB_OFFSET_LEN, &offRows->offset);
2,125,276✔
1154
      tqDebugC("consumer:0x%" PRIx64 ",report offset, group:%s vgId:%d, offset:%s/%" PRId64 ", rows:%" PRId64,
2,125,276✔
1155
               tmq->consumerId, tmq->groupId, offRows->vgId, buf, offRows->ever, offRows->rows);
1156
    }
1157
  }
1158
  taosRUnLockLatch(&tmq->lock);
840,643✔
1159

1160
  int32_t tlen = tSerializeSMqHbReq(NULL, 0, &req);
840,643✔
1161
  if (tlen < 0) {
840,643✔
1162
    tqErrorC("tSerializeSMqHbReq failed, size:%d", tlen);
×
1163
    goto END;
×
1164
  }
1165

1166
  void* pReq = taosMemoryCalloc(1, tlen);
840,643✔
1167
  if (pReq == NULL) {
840,643✔
1168
    tqErrorC("failed to malloc MqHbReq msg, code:%d", terrno);
×
1169
    goto END;
×
1170
  }
1171

1172
  if (tSerializeSMqHbReq(pReq, tlen, &req) < 0) {
840,643✔
1173
    tqErrorC("tSerializeSMqHbReq %d failed", tlen);
×
1174
    taosMemoryFree(pReq);
×
1175
    goto END;
×
1176
  }
1177

1178
  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
840,643✔
1179
  if (sendInfo == NULL) {
840,643✔
1180
    taosMemoryFree(pReq);
×
1181
    goto END;
×
1182
  }
1183

1184
  sendInfo->msgInfo = (SDataBuf){.pData = pReq, .len = tlen, .handle = NULL};
840,643✔
1185

1186
  sendInfo->requestId = generateRequestId();
840,643✔
1187
  sendInfo->requestObjRefId = 0;
840,643✔
1188
  sendInfo->param = (void*)refId;
840,643✔
1189
  sendInfo->fp = tmqHbCb;
840,643✔
1190
  sendInfo->msgType = TDMT_MND_TMQ_HB;
840,643✔
1191

1192

1193
  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
840,643✔
1194

1195
  int32_t code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo);
840,643✔
1196
  if (code != 0) {
840,643✔
1197
    tqErrorC("tmqSendHbReq asyncSendMsgToServer failed");
×
1198
  }
1199
  (void)atomic_val_compare_exchange_8(&tmq->pollFlag, 1, 0);
840,643✔
1200

1201
  END:
840,643✔
1202
  tDestroySMqHbReq(&req);
840,643✔
1203
  if (tmrId != NULL) {
840,643✔
1204
    bool ret = taosTmrReset(tmqSendHbReq, tmq->heartBeatIntervalMs, param, tmqMgmt.timer, &tmq->hbLiveTimer);
644,718✔
1205
    tqDebugC("consumer:0x%" PRIx64 " reset timer for tmq heartbeat ret:%d, interval:%d, pollFlag:%d", tmq->consumerId, ret, tmq->heartBeatIntervalMs, tmq->pollFlag);
644,718✔
1206
  }
1207
  int32_t ret = taosReleaseRef(tmqMgmt.rsetId, refId);
840,643✔
1208
  if (ret != 0){
840,643✔
1209
    tqErrorC("failed to release ref:%"PRId64 ", code:%d", refId, ret);
×
1210
  }
1211
}
1212

1213
static void defaultCommitCbFn(tmq_t* pTmq, int32_t code, void* param) {
128,525✔
1214
  if (code != 0 && pTmq != NULL) {
128,525✔
1215
    tqErrorC("consumer:0x%" PRIx64 ", failed to commit offset, code:%s", pTmq->consumerId, tstrerror(code));
×
1216
  }
1217
}
128,525✔
1218

1219
static void tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) {
20,699,472✔
1220
  if (rspWrapper == NULL) {
20,699,472✔
1221
    return;
×
1222
  }
1223
  if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) {
20,699,472✔
1224
    tDeleteSMqAskEpRsp(&rspWrapper->epRsp);
2,314,673✔
1225
  } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_RSP) {
18,384,799✔
1226
    DELETE_POLL_RSP(tDeleteMqDataRsp, &pRsp->dataRsp)
18,320,715✔
1227
  } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP){
64,084✔
1228
    DELETE_POLL_RSP(tDeleteSTaosxRsp, &pRsp->dataRsp)
5,061✔
1229
  } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
59,023✔
1230
    DELETE_POLL_RSP(tDeleteMqMetaRsp,&pRsp->metaRsp)
53,256✔
1231
  } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_BATCH_META_RSP) {
5,767✔
1232
    DELETE_POLL_RSP(tDeleteMqBatchMetaRsp,&pRsp->batchMetaRsp)
5,767✔
1233
  } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) {
×
1234
    DELETE_POLL_RSP(tDeleteMqRawDataRsp, &pRsp->dataRsp)
×
1235
  }
1236
}
1237

1238
static void freeClientVg(void* param) {
1,069,398✔
1239
  if (param == NULL) {
1,069,398✔
1240
    return;
×
1241
  }
1242
  SMqClientVg* pVg = param;
1,069,398✔
1243
  tOffsetDestroy(&pVg->offsetInfo.endOffset);
1,069,398✔
1244
  tOffsetDestroy(&pVg->offsetInfo.beginOffset);
1,069,398✔
1245
  tOffsetDestroy(&pVg->offsetInfo.committedOffset);
1,069,398✔
1246
}
1247
static void freeClientTopic(void* param) {
812,678✔
1248
  if (param == NULL) {
812,678✔
1249
    return;
×
1250
  }
1251
  SMqClientTopic* pTopic = param;
812,678✔
1252
  taosArrayDestroyEx(pTopic->vgs, freeClientVg);
812,678✔
1253
}
1254

1255
static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopicEp, SHashObj* pVgOffsetHashMap,
813,675✔
1256
                                   tmq_t* tmq) {
1257
  if (pTopic == NULL || pTopicEp == NULL || pVgOffsetHashMap == NULL || tmq == NULL) {
813,675✔
1258
    return;
×
1259
  }
1260

1261
  char    vgKey[TSDB_TOPIC_FNAME_LEN + 22] = {0};
813,675✔
1262
  int32_t vgNumGet = taosArrayGetSize(pTopicEp->vgs);
813,675✔
1263

1264
  tstrncpy(pTopic->topicName, pTopicEp->topic, TSDB_TOPIC_FNAME_LEN);
813,675✔
1265
  tstrncpy(pTopic->db, pTopicEp->db, TSDB_DB_FNAME_LEN);
813,675✔
1266

1267
  tqInfoC("consumer:0x%" PRIx64 ", update topic:%s, new numOfVgs:%d", tmq->consumerId, pTopic->topicName, vgNumGet);
813,675✔
1268
  pTopic->vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg));
813,675✔
1269
  if (pTopic->vgs == NULL) {
813,675✔
1270
    tqErrorC("consumer:0x%" PRIx64 ", failed to init vgs for topic:%s", tmq->consumerId, pTopic->topicName);
×
1271
    return;
×
1272
  }
1273
  for (int32_t j = 0; j < vgNumGet; j++) {
1,890,595✔
1274
    SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);
1,076,920✔
1275
    if (pVgEp == NULL) {
1,076,920✔
1276
      continue;
×
1277
    }
1278
    (void)snprintf(vgKey, sizeof(vgKey), "%s:%d", pTopic->topicName, pVgEp->vgId);
1,076,920✔
1279
    SVgroupSaveInfo* pInfo = taosHashGet(pVgOffsetHashMap, vgKey, strlen(vgKey));
1,076,920✔
1280

1281
    STqOffsetVal offsetNew = {0};
1,076,920✔
1282
    offsetNew.type = tmq->resetOffsetCfg;
1,076,920✔
1283

1284
    tqInfoC("consumer:0x%" PRIx64 ", update topic:%s, new numOfVgs:%d, num:%d, port:%d", tmq->consumerId,
1,076,920✔
1285
            pTopic->topicName, vgNumGet, pVgEp->epSet.numOfEps, pVgEp->epSet.eps[pVgEp->epSet.inUse].port);
1286

1287
    SMqClientVg clientVg = {
2,177,938✔
1288
        .pollCnt = 0,
1289
        .vgId = pVgEp->vgId,
1,076,920✔
1290
        .epSet = pVgEp->epSet,
1291
        .vgStatus = pInfo ? pInfo->vgStatus : TMQ_VG_STATUS__IDLE,
1,076,920✔
1292
        .vgSkipCnt = 0,
1293
        .emptyBlockReceiveTs = 0,
1294
        .blockReceiveTs = 0,
1295
        .blockSleepForReplay = 0,
1296
        .numOfRows = pInfo ? pInfo->numOfRows : 0,
1,076,920✔
1297
    };
1298

1299
    clientVg.offsetInfo.walVerBegin = -1;
1,076,920✔
1300
    clientVg.offsetInfo.walVerEnd = -1;
1,076,920✔
1301
    clientVg.seekUpdated = false;
1,076,920✔
1302
    if (pInfo) {
1,076,920✔
1303
      tOffsetCopy(&clientVg.offsetInfo.endOffset, &pInfo->currentOffset);
697,190✔
1304
      tOffsetCopy(&clientVg.offsetInfo.committedOffset, &pInfo->commitOffset);
697,190✔
1305
      tOffsetCopy(&clientVg.offsetInfo.beginOffset, &pInfo->seekOffset);
697,190✔
1306
    } else {
1307
      clientVg.offsetInfo.endOffset = offsetNew;
379,730✔
1308
      clientVg.offsetInfo.committedOffset = offsetNew;
379,730✔
1309
      clientVg.offsetInfo.beginOffset = offsetNew;
379,730✔
1310
    }
1311
    if (taosArrayPush(pTopic->vgs, &clientVg) == NULL) {
2,153,840✔
1312
      tqErrorC("consumer:0x%" PRIx64 ", failed to push vg:%d into topic:%s", tmq->consumerId, pVgEp->vgId,
×
1313
               pTopic->topicName);
1314
      freeClientVg(&clientVg);
×
1315
    }
1316
  }
1317
}
1318

1319
static void buildNewTopicList(tmq_t* tmq, SArray* newTopics, const SMqAskEpRsp* pRsp){
807,074✔
1320
  if (tmq == NULL || newTopics == NULL || pRsp == NULL) {
807,074✔
1321
    return;
×
1322
  }
1323
  SHashObj* pVgOffsetHashMap = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
807,074✔
1324
  if (pVgOffsetHashMap == NULL) {
807,074✔
1325
    tqErrorC("consumer:0x%" PRIx64 " taos hash init null, code:%d", tmq->consumerId, terrno);
×
1326
    return;
×
1327
  }
1328

1329
  int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
807,074✔
1330
  for (int32_t i = 0; i < topicNumCur; i++) {
1,503,920✔
1331
    // find old topic
1332
    SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i);
696,846✔
1333
    if (pTopicCur && pTopicCur->vgs) {
696,846✔
1334
      int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs);
696,846✔
1335
      tqInfoC("consumer:0x%" PRIx64 ", current vg num:%d", tmq->consumerId, vgNumCur);
696,846✔
1336
      for (int32_t j = 0; j < vgNumCur; j++) {
1,396,925✔
1337
        SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j);
700,079✔
1338
        if (pVgCur == NULL) {
700,079✔
1339
          continue;
×
1340
        }
1341
        char vgKey[TSDB_TOPIC_FNAME_LEN + 22] = {0};
700,079✔
1342
        (void)snprintf(vgKey, sizeof(vgKey), "%s:%d", pTopicCur->topicName, pVgCur->vgId);
700,079✔
1343

1344
        char buf[TSDB_OFFSET_LEN] = {0};
700,079✔
1345
        tFormatOffset(buf, TSDB_OFFSET_LEN, &pVgCur->offsetInfo.endOffset);
700,079✔
1346
        tqInfoC("consumer:0x%" PRIx64 ", vgId:%d vgKey:%s, offset:%s", tmq->consumerId, pVgCur->vgId, vgKey, buf);
700,079✔
1347

1348
        SVgroupSaveInfo info = {.currentOffset = pVgCur->offsetInfo.endOffset,
700,079✔
1349
            .seekOffset = pVgCur->offsetInfo.beginOffset,
1350
            .commitOffset = pVgCur->offsetInfo.committedOffset,
1351
            .numOfRows = pVgCur->numOfRows,
700,079✔
1352
            .vgStatus = pVgCur->vgStatus};
700,079✔
1353
        if (taosHashPut(pVgOffsetHashMap, vgKey, strlen(vgKey), &info, sizeof(SVgroupSaveInfo)) != 0) {
700,079✔
1354
          tqErrorC("consumer:0x%" PRIx64 ", failed to put vg:%d into hashmap", tmq->consumerId, pVgCur->vgId);
×
1355
        }
1356
      }
1357
    }
1358
  }
1359

1360
  for (int32_t i = 0; i < taosArrayGetSize(pRsp->topics); i++) {
1,620,749✔
1361
    SMqClientTopic topic = {0};
813,675✔
1362
    SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i);
813,675✔
1363
    if (pTopicEp == NULL) {
813,675✔
1364
      continue;
×
1365
    }
1366
    initClientTopicFromRsp(&topic, pTopicEp, pVgOffsetHashMap, tmq);
813,675✔
1367
    if (taosArrayPush(newTopics, &topic) == NULL) {
813,675✔
1368
      tqErrorC("consumer:0x%" PRIx64 ", failed to push topic:%s into new topics", tmq->consumerId, topic.topicName);
×
1369
      freeClientTopic(&topic);
×
1370
    }
1371
  }
1372

1373
  taosHashCleanup(pVgOffsetHashMap);
807,074✔
1374
}
1375

1376
static void doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
2,616,047✔
1377
  if (tmq == NULL || pRsp == NULL) {
2,616,047✔
1378
    return;
×
1379
  }
1380
  int32_t topicNumGet = taosArrayGetSize(pRsp->topics);
2,616,047✔
1381
  // vnode transform (epoch == tmq->epoch && topicNumGet != 0)
1382
  // ask ep rsp (epoch == tmq->epoch && topicNumGet == 0)
1383
  if (epoch < atomic_load_32(&tmq->epoch) || (epoch == atomic_load_32(&tmq->epoch) && topicNumGet == 0)) {
2,616,047✔
1384
    tqDebugC("consumer:0x%" PRIx64 " no update ep epoch from %d to epoch %d, incoming topics:%d", tmq->consumerId,
1,699,390✔
1385
             tmq->epoch, epoch, topicNumGet);
1386
    return;
1,699,390✔
1387
  }
1388

1389
  SArray* newTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic));
916,657✔
1390
  if (newTopics == NULL) {
916,657✔
1391
    tqErrorC("consumer:0x%" PRIx64 " taos array init null, code:%d", tmq->consumerId, terrno);
×
1392
    return;
×
1393
  }
1394
  tqInfoC("consumer:0x%" PRIx64 " update ep epoch from %d to epoch %d, incoming topics:%d, existed topics:%d",
916,657✔
1395
          tmq->consumerId, tmq->epoch, epoch, topicNumGet, (int)taosArrayGetSize(tmq->clientTopics));
1396

1397
  taosWLockLatch(&tmq->lock);
916,657✔
1398
  if (topicNumGet > 0){
916,657✔
1399
    buildNewTopicList(tmq, newTopics, pRsp);
807,074✔
1400
  }
1401
  // destroy current buffered existed topics info
1402
  if (tmq->clientTopics) {
916,657✔
1403
    taosArrayDestroyEx(tmq->clientTopics, freeClientTopic);
916,657✔
1404
  }
1405
  tmq->clientTopics = newTopics;
916,657✔
1406
  taosWUnLockLatch(&tmq->lock);
916,657✔
1407

1408
  atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);
916,657✔
1409
  atomic_store_32(&tmq->epoch, epoch);
916,657✔
1410

1411
  tqInfoC("consumer:0x%" PRIx64 " update topic info completed", tmq->consumerId);
916,657✔
1412
}
1413

1414
static int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) {
3,248,194✔
1415
  SMqAskEpRsp rsp = {0};
3,248,194✔
1416

1417
  SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
3,248,194✔
1418
  if (pParam == NULL) {
3,248,194✔
1419
    goto _ERR;
×
1420
  }
1421

1422
  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, pParam->refId);
3,248,194✔
1423
  if (tmq == NULL) {
3,248,194✔
1424
    code = TSDB_CODE_TMQ_CONSUMER_CLOSED;
×
1425
    goto _ERR;
×
1426
  }
1427

1428
  if (code != TSDB_CODE_SUCCESS) {
3,248,194✔
1429
    tqErrorC("consumer:0x%" PRIx64 ", get topic endpoint error, code:%s", tmq->consumerId, tstrerror(code));
7,114✔
1430
    if (code == TSDB_CODE_MND_CONSUMER_NOT_EXIST){
7,114✔
1431
      atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__CLOSED);
6,472✔
1432
    }
1433
    goto END;
7,114✔
1434
  }
1435

1436
  if (pMsg == NULL) {
3,241,080✔
1437
    goto END;
×
1438
  }
1439
  SMqRspHead* head = pMsg->pData;
3,241,080✔
1440
  int32_t     epoch = atomic_load_32(&tmq->epoch);
3,241,080✔
1441
  tqDebugC("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d", tmq->consumerId, head->epoch, epoch);
3,241,080✔
1442

1443
  if (tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp) == NULL) {
6,482,160✔
1444
    code = TSDB_CODE_TMQ_INVALID_MSG;
×
1445
    tqErrorC("consumer:0x%" PRIx64 ", decode ep rsp failed", tmq->consumerId);
×
1446
    goto END;
×
1447
  }
1448

1449
  if (rsp.code != TSDB_CODE_SUCCESS) {
3,241,080✔
1450
    code = rsp.code;
612,246✔
1451
    goto END;
612,246✔
1452
  }
1453

1454
  if (pParam->sync) {
2,628,834✔
1455
    doUpdateLocalEp(tmq, head->epoch, &rsp);
314,161✔
1456
  } else {
1457
    SMqRspWrapper* pWrapper = NULL;
2,314,673✔
1458
    code = taosAllocateQitem(sizeof(SMqRspWrapper), DEF_QITEM, 0, (void**)&pWrapper);
2,314,673✔
1459
    if (code) {
2,314,673✔
1460
      goto END;
×
1461
    }
1462

1463
    pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP;
2,314,673✔
1464
    pWrapper->epoch = head->epoch;
2,314,673✔
1465
    TSWAP(pWrapper->epRsp, rsp);
2,314,673✔
1466
    code = taosWriteQitem(tmq->mqueue, pWrapper);
2,314,673✔
1467
    if (code != 0) {
2,314,673✔
1468
      tmqFreeRspWrapper((SMqRspWrapper*)pWrapper);
×
1469
      taosFreeQitem(pWrapper);
×
1470
      tqErrorC("consumer:0x%" PRIx64 " put ep res into mqueue failed, code:%d", tmq->consumerId, code);
×
1471
    }
1472
  }
1473

1474
END:
3,248,194✔
1475
  tDeleteSMqAskEpRsp(&rsp);
1476
  int32_t ret = taosReleaseRef(tmqMgmt.rsetId, pParam->refId);
3,248,194✔
1477
  if (ret != 0){
3,248,194✔
1478
    tqErrorC("failed to release ref:%"PRId64 ", code:%d", pParam->refId, ret);
×
1479
  }
1480

1481
_ERR:
3,248,194✔
1482
  if (pParam && pParam->sync) {
3,248,194✔
1483
    SAskEpInfo* pInfo = pParam->pParam;
927,757✔
1484
    if (pInfo) {
927,757✔
1485
      pInfo->code = code;
927,757✔
1486
      if (tsem2_post(&pInfo->sem) != 0){
927,757✔
1487
        tqErrorC("failed to post rsp sem askep cb");
×
1488
      }
1489
    }
1490
  }
1491

1492
  if (pMsg) {
3,248,194✔
1493
    taosMemoryFree(pMsg->pEpSet);
3,248,194✔
1494
    taosMemoryFree(pMsg->pData);
3,248,194✔
1495
  }
1496

1497
  return code;
3,248,194✔
1498
}
1499

1500
static int32_t askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpSet) {
3,248,194✔
1501
  if (pTmq == NULL) {
3,248,194✔
1502
    return TSDB_CODE_INVALID_PARA;
×
1503
  }
1504
  int32_t code = 0;
3,248,194✔
1505
  int32_t lino = 0;
3,248,194✔
1506
  SMqAskEpReq req = {0};
3,248,194✔
1507
  req.consumerId = pTmq->consumerId;
3,248,194✔
1508
  req.epoch = updateEpSet ? -1 : atomic_load_32(&pTmq->epoch);
3,248,194✔
1509
  tstrncpy(req.cgroup, pTmq->groupId, TSDB_CGROUP_LEN);
3,248,194✔
1510
  SMqAskEpCbParam* pParam = NULL;
3,248,194✔
1511
  void*            pReq = NULL;
3,248,194✔
1512

1513
  int32_t tlen = tSerializeSMqAskEpReq(NULL, 0, &req);
3,248,194✔
1514
  TSDB_CHECK_CONDITION(tlen >= 0, code, lino, END, TSDB_CODE_INVALID_PARA);
3,248,194✔
1515
  pReq = taosMemoryCalloc(1, tlen);
3,248,194✔
1516
  TSDB_CHECK_NULL(pReq, code, lino, END, terrno);
3,248,194✔
1517

1518
  code = tSerializeSMqAskEpReq(pReq, tlen, &req);
3,248,194✔
1519
  TSDB_CHECK_CONDITION(code >= 0, code, lino, END, TSDB_CODE_INVALID_PARA);
3,248,194✔
1520

1521
  pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam));
3,248,194✔
1522
  TSDB_CHECK_NULL(pParam, code, lino, END, terrno);
3,248,194✔
1523

1524
  pParam->refId = pTmq->refId;
3,248,194✔
1525
  pParam->sync = sync;
3,248,194✔
1526
  pParam->pParam = param;
3,248,194✔
1527

1528
  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
3,248,194✔
1529
  TSDB_CHECK_NULL(sendInfo, code, lino, END, terrno);
3,248,194✔
1530

1531
  sendInfo->msgInfo = (SDataBuf){.pData = pReq, .len = tlen, .handle = NULL};
3,248,194✔
1532
  sendInfo->requestId = generateRequestId();
3,248,194✔
1533
  sendInfo->requestObjRefId = 0;
3,248,194✔
1534
  sendInfo->param = pParam;
3,248,194✔
1535
  sendInfo->paramFreeFp = taosAutoMemoryFree;
3,248,194✔
1536
  sendInfo->fp = askEpCb;
3,248,194✔
1537
  sendInfo->msgType = TDMT_MND_TMQ_ASK_EP;
3,248,194✔
1538

1539
  pReq = NULL;
3,248,194✔
1540
  pParam = NULL;
3,248,194✔
1541

1542
  SEpSet epSet = getEpSet_s(&pTmq->pTscObj->pAppInfo->mgmtEp);
3,248,194✔
1543
  tqDebugC("consumer:0x%" PRIx64 " ask ep from mnode, QID:0x%" PRIx64, pTmq->consumerId, sendInfo->requestId);
3,248,194✔
1544
  code = asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo);
3,248,194✔
1545

1546
END:
3,248,194✔
1547
  if (code != 0) {
3,248,194✔
1548
    tqErrorC("%s failed at %d, msg:%s", __func__, lino, tstrerror(code));
×
1549
  }
1550
  taosMemoryFree(pReq);
3,248,194✔
1551
  taosMemoryFree(pParam);
3,248,194✔
1552
  return code;
3,248,194✔
1553
}
1554

1555
static int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) {
35,718,155✔
1556
  tqDebugC("consumer:0x%" PRIx64 " handle delayed %d tasks before poll data", pTmq->consumerId, taosQueueItemSize(pTmq->delayedTask));
35,718,155✔
1557
  while (1) {
4,840,788✔
1558
    int8_t* pTaskType = NULL;
40,558,943✔
1559
    taosReadQitem(pTmq->delayedTask, (void**)&pTaskType);
40,558,943✔
1560
    if (pTaskType == NULL) {break;}
40,558,943✔
1561
    if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) {
4,840,788✔
1562
      tqDebugC("consumer:0x%" PRIx64 " retrieve ask ep timer", pTmq->consumerId);
1,632,428✔
1563
      int32_t code = askEp(pTmq, NULL, false, false);
1,632,428✔
1564
      if (code != 0) {
1,632,428✔
1565
        tqErrorC("consumer:0x%" PRIx64 " failed to ask ep, code:%s", pTmq->consumerId, tstrerror(code));
×
1566
      }
1567
      tqDebugC("consumer:0x%" PRIx64 " retrieve ep from mnode in 1s", pTmq->consumerId);
1,632,428✔
1568
      bool ret = taosTmrReset(tmqAssignAskEpTask, DEFAULT_ASKEP_INTERVAL, (void*)(pTmq->refId), tmqMgmt.timer,
1,632,428✔
1569
                              &pTmq->epTimer);
1570
      tqDebugC("reset timer for tmq ask ep:%d", ret);
1,632,428✔
1571
    } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
3,208,360✔
1572
      tmq_commit_cb* pCallbackFn = (pTmq->commitCb != NULL) ? pTmq->commitCb : defaultCommitCbFn;
3,208,360✔
1573
      asyncCommitAllOffsets(pTmq, pCallbackFn, pTmq->commitCbUserParam);
3,208,360✔
1574
      tqDebugC("consumer:0x%" PRIx64 " next commit to vnode(s) in %.2fs", pTmq->consumerId,
3,208,360✔
1575
               pTmq->autoCommitInterval / 1000.0);
1576
      bool ret = taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, (void*)(pTmq->refId), tmqMgmt.timer,
3,208,360✔
1577
                              &pTmq->commitTimer);
1578
      tqDebugC("reset timer for commit:%d", ret);
3,208,360✔
1579
    } else {
1580
      tqErrorC("consumer:0x%" PRIx64 " invalid task type:%d", pTmq->consumerId, *pTaskType);
×
1581
    }
1582

1583
    taosFreeQitem(pTaskType);
4,840,788✔
1584
  }
1585

1586
  return 0;
35,718,155✔
1587
}
1588

1589
void tmqClearUnhandleMsg(tmq_t* tmq) {
307,013✔
1590
  if (tmq == NULL) return;
307,013✔
1591
  while (1) {
240,181✔
1592
    SMqRspWrapper* rspWrapper = NULL;
547,194✔
1593
    taosReadQitem(tmq->mqueue, (void**)&rspWrapper);
547,194✔
1594
    if (rspWrapper == NULL) break;
547,194✔
1595
    tmqFreeRspWrapper(rspWrapper);
240,181✔
1596
    taosFreeQitem(rspWrapper);
240,181✔
1597
  }
1598
}
1599

1600
int32_t tmqSubscribeCb(void* param, SDataBuf* pMsg, int32_t code) {
318,181✔
1601
  if (pMsg) {
318,181✔
1602
    taosMemoryFreeClear(pMsg->pEpSet);
318,181✔
1603
    taosMemoryFreeClear(pMsg->pData);
318,181✔
1604
  }
1605

1606
  if (param == NULL) {
318,181✔
1607
    return code;
×
1608
  }
1609

1610
  SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param;
318,181✔
1611
  pParam->rspErr = code;
318,181✔
1612

1613
  if (tsem2_post(&pParam->rspSem) != 0){
318,181✔
1614
    tqErrorC("failed to post sem, subscribe cb");
×
1615
  }
1616
  return 0;
318,181✔
1617
}
1618

1619
int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
8,961✔
1620
  if (tmq == NULL) return TSDB_CODE_INVALID_PARA;
8,961✔
1621
  if (*topics == NULL) {
8,961✔
1622
    *topics = tmq_list_new();
7,179✔
1623
    if (*topics == NULL) {
7,179✔
1624
      return terrno;
×
1625
    }
1626
  }
1627
  taosRLockLatch(&tmq->lock);
8,961✔
1628
  for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
12,054✔
1629
    SMqClientTopic* topic = taosArrayGet(tmq->clientTopics, i);
3,093✔
1630
    if (topic == NULL) {
3,093✔
1631
      tqErrorC("topic is null");
×
1632
      continue;
×
1633
    }
1634
    char* tmp = strchr(topic->topicName, '.');
3,093✔
1635
    if (tmp == NULL) {
3,093✔
1636
      tqErrorC("topic name is invalid:%s", topic->topicName);
×
1637
      continue;
×
1638
    }
1639
    if (tmq_list_append(*topics, tmp + 1) != 0) {
3,093✔
1640
      tqErrorC("failed to append topic:%s", tmp + 1);
×
1641
      continue;
×
1642
    }
1643
  }
1644
  taosRUnLockLatch(&tmq->lock);
8,961✔
1645
  return 0;
8,961✔
1646
}
1647

1648
void tmqFreeImpl(void* handle) {
111,088✔
1649
  if (handle == NULL) return;
111,088✔
1650
  tmq_t*  tmq = (tmq_t*)handle;
111,088✔
1651
  int64_t id = tmq->consumerId;
111,088✔
1652

1653
  if (tmq->mqueue) {
111,088✔
1654
    tmqClearUnhandleMsg(tmq);
111,088✔
1655
    taosCloseQueue(tmq->mqueue);
111,088✔
1656
  }
1657

1658
  if (tmq->delayedTask) {
111,088✔
1659
    taosCloseQueue(tmq->delayedTask);
111,088✔
1660
  }
1661

1662
  if(tsem2_destroy(&tmq->rspSem) != 0) {
111,088✔
1663
    tqErrorC("failed to destroy sem in free tmq");
×
1664
  }
1665

1666
  taosArrayDestroyEx(tmq->clientTopics, freeClientTopic);
111,088✔
1667
  taos_close_internal(tmq->pTscObj);
111,088✔
1668

1669
  if (tmq->commitTimer) {
111,088✔
1670
    if (!taosTmrStopA(&tmq->commitTimer)) {
68,002✔
1671
      tqErrorC("failed to stop commit timer");
36,510✔
1672
    }
1673
  }
1674
  if (tmq->epTimer) {
111,088✔
1675
    if (!taosTmrStopA(&tmq->epTimer)) {
109,652✔
1676
      tqErrorC("failed to stop ep timer");
100,124✔
1677
    }
1678
  }
1679
  if (tmq->hbLiveTimer) {
111,088✔
1680
    if (!taosTmrStopA(&tmq->hbLiveTimer)) {
111,088✔
1681
      tqErrorC("failed to stop hb timer");
×
1682
    }
1683
  }
1684
  taosMemoryFree(tmq);
111,088✔
1685

1686
  tqInfoC("consumer:0x%" PRIx64 " closed", id);
111,088✔
1687
}
1688

1689
static void tmqMgmtInit(void) {
77,197✔
1690
  tmqInitRes = 0;
77,197✔
1691

1692
  if (taosThreadMutexInit(&tmqMgmt.lock, NULL) != 0){
77,197✔
1693
    goto END;
×
1694
  }
1695

1696
  tmqMgmt.timer = taosTmrInit(1000, 100, 360000, "TMQ");
77,197✔
1697

1698
  if (tmqMgmt.timer == NULL) {
77,197✔
1699
    goto END;
×
1700
  }
1701

1702
  tmqMgmt.rsetId = taosOpenRef(10000, tmqFreeImpl);
77,197✔
1703
  if (tmqMgmt.rsetId < 0) {
77,197✔
1704
    goto END;
×
1705
  }
1706

1707
  return;
77,197✔
1708
END:
×
1709
  tmqInitRes = terrno;
×
1710
}
1711

1712
void tmqMgmtClose(void) {
1,169,010✔
1713
  if (tmqMgmt.timer) {
1,169,010✔
1714
    taosTmrCleanUp(tmqMgmt.timer);
77,197✔
1715
    tmqMgmt.timer = NULL;
77,197✔
1716
  }
1717

1718
  if (tmqMgmt.rsetId > 0) {
1,169,010✔
1719
    (void) taosThreadMutexLock(&tmqMgmt.lock);
77,197✔
1720
    tmq_t *tmq = taosIterateRef(tmqMgmt.rsetId, 0);
77,197✔
1721
    int64_t  refId = 0;
77,197✔
1722

1723
    while (tmq) {
80,294✔
1724
      refId = tmq->refId;
3,097✔
1725
      if (refId == 0) {
3,097✔
1726
        break;
×
1727
      }
1728
      atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__CLOSED);
3,097✔
1729
      tmq = taosIterateRef(tmqMgmt.rsetId, refId);
3,097✔
1730
    }
1731
    taosCloseRef(tmqMgmt.rsetId);
77,197✔
1732
    tmqMgmt.rsetId = -1;
77,197✔
1733
    (void)taosThreadMutexUnlock(&tmqMgmt.lock);
77,197✔
1734
  }
1735
  (void)taosThreadMutexUnlock(&tmqMgmt.lock);
1,169,010✔
1736
}
1,169,010✔
1737

1738
tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
114,134✔
1739
  int32_t code = 0;
114,134✔
1740

1741
  if (conf == NULL) {
114,134✔
1742
    SET_ERROR_MSG_TMQ("configure is null")
×
1743
    return NULL;
×
1744
  }
1745
  code = taosThreadOnce(&tmqInit, tmqMgmtInit);
114,134✔
1746
  if (code != 0) {
114,132✔
1747
    SET_ERROR_MSG_TMQ("tmq init error")
×
1748
    return NULL;
×
1749
  }
1750
  if (tmqInitRes != 0) {
114,132✔
1751
    SET_ERROR_MSG_TMQ("tmq timer init error")
×
1752
    return NULL;
×
1753
  }
1754

1755
  tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t));
114,132✔
1756
  if (pTmq == NULL) {
114,185✔
1757
    tqErrorC("failed to create consumer, groupId:%s", conf->groupId);
×
1758
    SET_ERROR_MSG_TMQ("malloc tmq failed")
×
1759
    return NULL;
×
1760
  }
1761

1762
  const char* user = conf->user == NULL ? TSDB_DEFAULT_USER : conf->user;
114,185✔
1763
  const char* pass = conf->pass == NULL ? TSDB_DEFAULT_PASS : conf->pass;
114,185✔
1764

1765
  pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
114,185✔
1766
  if (pTmq->clientTopics == NULL) {
114,185✔
1767
    tqErrorC("failed to create consumer, groupId:%s", conf->groupId);
×
1768
    SET_ERROR_MSG_TMQ("malloc client topics failed")
×
1769
    goto _failed;
×
1770
  }
1771
  code = taosOpenQueue(&pTmq->mqueue);
114,185✔
1772
  if (code) {
114,185✔
1773
    tqErrorC("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, tstrerror(code),
×
1774
             pTmq->groupId);
1775
    SET_ERROR_MSG_TMQ("open queue failed")
×
1776
    goto _failed;
×
1777
  }
1778

1779
  code = taosOpenQueue(&pTmq->delayedTask);
114,185✔
1780
  if (code) {
114,185✔
1781
    tqErrorC("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, tstrerror(code),
×
1782
             pTmq->groupId);
1783
    SET_ERROR_MSG_TMQ("open delayed task queue failed")
×
1784
    goto _failed;
×
1785
  }
1786

1787
  if (conf->groupId[0] == 0) {
114,185✔
1788
    tqErrorC("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, tstrerror(code),
×
1789
             pTmq->groupId);
1790
    SET_ERROR_MSG_TMQ("malloc tmq element failed or group is empty")
×
1791
    goto _failed;
×
1792
  }
1793

1794
  // init status
1795
  pTmq->status = TMQ_CONSUMER_STATUS__INIT;
114,185✔
1796
  pTmq->pollCnt = 0;
114,185✔
1797
  pTmq->epoch = 0;
114,185✔
1798
  pTmq->pollFlag = 0;
114,185✔
1799

1800
  // set conf
1801
  tstrncpy(pTmq->clientId, conf->clientId, TSDB_CLIENT_ID_LEN);
114,185✔
1802
  tstrncpy(pTmq->groupId, conf->groupId, TSDB_CGROUP_LEN);
114,185✔
1803
  pTmq->withTbName = conf->withTbName;
114,185✔
1804
  pTmq->useSnapshot = conf->snapEnable;
114,185✔
1805
  pTmq->enableWalMarker = conf->enableWalMarker;
114,185✔
1806
  pTmq->autoCommit = conf->autoCommit;
114,185✔
1807
  pTmq->autoCommitInterval = conf->autoCommitInterval;
114,185✔
1808
  pTmq->sessionTimeoutMs = conf->sessionTimeoutMs;
114,185✔
1809
  pTmq->heartBeatIntervalMs = conf->heartBeatIntervalMs;
114,185✔
1810
  pTmq->maxPollIntervalMs = conf->maxPollIntervalMs;
114,185✔
1811
  pTmq->commitCb = conf->commitCb;
114,185✔
1812
  pTmq->commitCbUserParam = conf->commitCbUserParam;
114,185✔
1813
  pTmq->resetOffsetCfg = conf->resetOffset;
114,185✔
1814
  pTmq->replayEnable = conf->replayEnable;
114,185✔
1815
  pTmq->sourceExcluded = conf->sourceExcluded;
114,185✔
1816
  pTmq->rawData = conf->rawData;
114,185✔
1817
  pTmq->maxPollWaitTime = conf->maxPollWaitTime;
114,185✔
1818
  pTmq->minPollRows = conf->minPollRows;
114,185✔
1819
  pTmq->enableBatchMeta = conf->enableBatchMeta;
114,185✔
1820
  tstrncpy(pTmq->user, user, TSDB_USER_LEN);
114,185✔
1821
  if (taosGetFqdn(pTmq->fqdn) != 0) {
114,185✔
1822
    tstrncpy(pTmq->fqdn, "localhost", TSDB_FQDN_LEN);
×
1823
  }
1824
  if (conf->replayEnable) {
114,185✔
1825
    pTmq->autoCommit = false;
1,827✔
1826
  }
1827
  taosInitRWLatch(&pTmq->lock);
114,185✔
1828

1829
  // assign consumerId
1830
  pTmq->consumerId = tGenIdPI64();
114,185✔
1831

1832
  // init semaphore
1833
  if (tsem2_init(&pTmq->rspSem, 0, 0) != 0) {
114,185✔
1834
    tqErrorC("consumer:0x %" PRIx64 " setup failed since %s, consumer group %s", pTmq->consumerId,
×
1835
             tstrerror(TAOS_SYSTEM_ERROR(ERRNO)), pTmq->groupId);
1836
    SET_ERROR_MSG_TMQ("init t_sem failed")
×
1837
    goto _failed;
×
1838
  }
1839

1840
  // init connection
1841
  code = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ, &pTmq->pTscObj);
114,132✔
1842
  if (code) {
114,185✔
1843
    terrno = code;
×
1844
    tqErrorC("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, terrstr(), pTmq->groupId);
×
1845
    SET_ERROR_MSG_TMQ("init tscObj failed")
×
1846
    goto _failed;
×
1847
  }
1848

1849
  pTmq->refId = taosAddRef(tmqMgmt.rsetId, pTmq);
114,185✔
1850
  if (pTmq->refId < 0) {
114,185✔
1851
    SET_ERROR_MSG_TMQ("add tscObj ref failed")
×
1852
    goto _failed;
×
1853
  }
1854

1855
  pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, pTmq->heartBeatIntervalMs, (void*)pTmq->refId, tmqMgmt.timer);
114,185✔
1856
  if (pTmq->hbLiveTimer == NULL) {
114,185✔
1857
    SET_ERROR_MSG_TMQ("start heartbeat timer failed")
×
1858
    goto _failed;
×
1859
  }
1860
  char         buf[TSDB_OFFSET_LEN] = {0};
114,185✔
1861
  STqOffsetVal offset = {.type = pTmq->resetOffsetCfg};
114,185✔
1862
  tFormatOffset(buf, tListLen(buf), &offset);
114,185✔
1863
  tqInfoC("consumer:0x%" PRIx64 " is setup, refId:%" PRId64
114,185✔
1864
              ", groupId:%s, snapshot:%d, autoCommit:%d, commitInterval:%dms, offset:%s, maxPollIntervalMs:%dms, sessionTimeoutMs:%dms",
1865
          pTmq->consumerId, pTmq->refId, pTmq->groupId, pTmq->useSnapshot, pTmq->autoCommit, pTmq->autoCommitInterval,
1866
          buf, pTmq->maxPollIntervalMs, pTmq->sessionTimeoutMs);
1867

1868
  return pTmq;
114,185✔
1869

1870
  _failed:
×
1871
  tmqFreeImpl(pTmq);
×
1872
  return NULL;
×
1873
}
1874

1875
static int32_t syncAskEp(tmq_t* pTmq) {
927,757✔
1876
  if (pTmq == NULL) return TSDB_CODE_INVALID_PARA;
927,757✔
1877
  SAskEpInfo* pInfo = taosMemoryMalloc(sizeof(SAskEpInfo));
927,757✔
1878
  if (pInfo == NULL) return terrno;
927,757✔
1879
  if (tsem2_init(&pInfo->sem, 0, 0) != 0) {
927,757✔
1880
    taosMemoryFree(pInfo);
×
1881
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
1882
  }
1883

1884
  int32_t code = askEp(pTmq, pInfo, true, false);
927,757✔
1885
  if (code == 0) {
927,757✔
1886
    if (tsem2_wait(&pInfo->sem) != 0){
927,757✔
1887
      tqErrorC("consumer:0x%" PRIx64 ", failed to wait for sem", pTmq->consumerId);
×
1888
    }
1889
    code = pInfo->code;
927,757✔
1890
  }
1891

1892
  if(tsem2_destroy(&pInfo->sem) != 0) {
927,757✔
1893
    tqErrorC("failed to destroy sem sync ask ep");
×
1894
  }
1895
  taosMemoryFree(pInfo);
927,757✔
1896
  return code;
927,757✔
1897
}
1898

1899
int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
318,181✔
1900
  if (tmq == NULL || topic_list == NULL) return TSDB_CODE_INVALID_PARA;
318,181✔
1901
  const SArray*   container = &topic_list->container;
318,181✔
1902
  int32_t         sz = taosArrayGetSize(container);
318,181✔
1903
  void*           buf = NULL;
318,181✔
1904
  SMsgSendInfo*   sendInfo = NULL;
318,181✔
1905
  SCMSubscribeReq req = {0};
318,181✔
1906
  int32_t         code = 0;
318,181✔
1907

1908
  tqInfoC("consumer:0x%" PRIx64 " cgroup:%s, subscribe %d topics", tmq->consumerId, tmq->groupId, sz);
318,181✔
1909

1910
  req.consumerId = tmq->consumerId;
318,181✔
1911
  tstrncpy(req.clientId, tmq->clientId, TSDB_CLIENT_ID_LEN);
318,181✔
1912
  tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
318,181✔
1913
  tstrncpy(req.user, tmq->user, TSDB_USER_LEN);
318,181✔
1914
  tstrncpy(req.fqdn, tmq->fqdn, TSDB_FQDN_LEN);
318,181✔
1915

1916
  req.topicNames = taosArrayInit(sz, sizeof(void*));
318,181✔
1917
  if (req.topicNames == NULL) {
318,181✔
1918
    code = terrno;
×
1919
    goto END;
×
1920
  }
1921

1922
  req.withTbName = tmq->withTbName;
318,181✔
1923
  req.autoCommit = tmq->autoCommit;
318,181✔
1924
  req.autoCommitInterval = tmq->autoCommitInterval;
318,181✔
1925
  req.sessionTimeoutMs = tmq->sessionTimeoutMs;
318,181✔
1926
  req.maxPollIntervalMs = tmq->maxPollIntervalMs;
318,181✔
1927
  req.resetOffsetCfg = tmq->resetOffsetCfg;
318,181✔
1928
  req.enableReplay = tmq->replayEnable;
318,181✔
1929
  req.enableBatchMeta = tmq->enableBatchMeta;
318,181✔
1930

1931
  for (int32_t i = 0; i < sz; i++) {
442,889✔
1932
    char* topic = taosArrayGetP(container, i);
124,708✔
1933
    if (topic == NULL) {
124,708✔
1934
      code = terrno;
×
1935
      goto END;
×
1936
    }
1937
    SName name = {0};
124,708✔
1938
    code = tNameSetDbName(&name, tmq->pTscObj->acctId, topic, strlen(topic));
124,708✔
1939
    if (code) {
124,708✔
1940
      tqErrorC("consumer:0x%" PRIx64 " cgroup:%s, failed to set topic name, code:%d", tmq->consumerId, tmq->groupId,
×
1941
               code);
1942
      goto END;
×
1943
    }
1944
    char* topicFName = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
124,708✔
1945
    if (topicFName == NULL) {
124,708✔
1946
      code = terrno;
×
1947
      goto END;
×
1948
    }
1949

1950
    code = tNameExtractFullName(&name, topicFName);
124,708✔
1951
    if (code) {
124,708✔
1952
      tqErrorC("consumer:0x%" PRIx64 " cgroup:%s, failed to extract topic name, code:%d", tmq->consumerId, tmq->groupId,
×
1953
               code);
1954
      taosMemoryFree(topicFName);
×
1955
      goto END;
×
1956
    }
1957

1958
    if (taosArrayPush(req.topicNames, &topicFName) == NULL) {
249,416✔
1959
      code = terrno;
×
1960
      taosMemoryFree(topicFName);
×
1961
      goto END;
×
1962
    }
1963
    tqInfoC("consumer:0x%" PRIx64 " subscribe topic:%s", tmq->consumerId, topicFName);
124,708✔
1964
  }
1965

1966
  int32_t tlen = tSerializeSCMSubscribeReq(NULL, &req);
318,181✔
1967
  buf = taosMemoryMalloc(tlen);
318,181✔
1968
  if (buf == NULL) {
318,181✔
1969
    code = terrno;
×
1970
    goto END;
×
1971
  }
1972

1973
  void* abuf = buf;
318,181✔
1974
  tlen = tSerializeSCMSubscribeReq(&abuf, &req);
318,181✔
1975

1976
  sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
318,181✔
1977
  if (sendInfo == NULL) {
318,181✔
1978
    code = terrno;
×
1979
    taosMemoryFree(buf);
×
1980
    goto END;
×
1981
  }
1982

1983
  SMqSubscribeCbParam param = {.rspErr = 0};
318,181✔
1984
  if (tsem2_init(&param.rspSem, 0, 0) != 0) {
318,181✔
1985
    code = TSDB_CODE_TSC_INTERNAL_ERROR;
×
1986
    taosMemoryFree(buf);
×
1987
    taosMemoryFree(sendInfo);
×
1988
    goto END;
×
1989
  }
1990

1991
  sendInfo->msgInfo = (SDataBuf){.pData = buf, .len = tlen, .handle = NULL};
318,181✔
1992
  sendInfo->requestId = generateRequestId();
318,181✔
1993
  sendInfo->requestObjRefId = 0;
318,181✔
1994
  sendInfo->param = &param;
318,181✔
1995
  sendInfo->fp = tmqSubscribeCb;
318,181✔
1996
  sendInfo->msgType = TDMT_MND_TMQ_SUBSCRIBE;
318,181✔
1997

1998
  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
318,181✔
1999

2000
  code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo);
318,181✔
2001
  if (code != 0) {
318,181✔
2002
    goto END;
×
2003
  }
2004

2005
  if (tsem2_wait(&param.rspSem) != 0){
318,181✔
2006
    tqErrorC("consumer:0x%" PRIx64 ", failed to wait semaphore in subscribe", tmq->consumerId);
×
2007
  }
2008
  if(tsem2_destroy(&param.rspSem) != 0) {
318,181✔
2009
    tqErrorC("consumer:0x%" PRIx64 ", failed to destroy semaphore in subscribe", tmq->consumerId);
×
2010
  }
2011

2012
  if (param.rspErr != 0) {
318,181✔
2013
    code = param.rspErr;
2,706✔
2014
    goto END;
2,706✔
2015
  }
2016

2017
  int32_t retryCnt = 0;
315,475✔
2018
  while ((code = syncAskEp(tmq)) != 0) {
927,721✔
2019
    if (retryCnt++ > SUBSCRIBE_RETRY_MAX_COUNT || code == TSDB_CODE_MND_CONSUMER_NOT_EXIST) {
613,596✔
2020
      tqErrorC("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry more than 2 minutes, code:%s",
1,350✔
2021
               tmq->consumerId, tstrerror(code));
2022
      if (code == TSDB_CODE_MND_CONSUMER_NOT_EXIST) {
1,350✔
2023
        code = 0;
1,350✔
2024
      }
2025
      goto END;
1,350✔
2026
    }
2027

2028
    tqInfoC("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry:%d in 500ms", tmq->consumerId, retryCnt);
612,246✔
2029
    taosMsleep(SUBSCRIBE_RETRY_INTERVAL);
612,246✔
2030
  }
2031

2032
  if (tmq->epTimer == NULL){
314,125✔
2033
    tmq->epTimer = taosTmrStart(tmqAssignAskEpTask, DEFAULT_ASKEP_INTERVAL, (void*)(tmq->refId), tmqMgmt.timer);
110,975✔
2034
    if (tmq->epTimer == NULL) {
110,975✔
2035
      code = TSDB_CODE_TSC_INTERNAL_ERROR;
×
2036
      goto END;
×
2037
    }
2038
  }
2039
  if (tmq->autoCommit && tmq->commitTimer == NULL){
314,125✔
2040
    tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, (void*)(tmq->refId), tmqMgmt.timer);
68,328✔
2041
    if (tmq->commitTimer == NULL) {
68,328✔
2042
      code = TSDB_CODE_TSC_INTERNAL_ERROR;
×
2043
      goto END;
×
2044
    }
2045
  }
2046

2047
  END:
318,181✔
2048
  taosArrayDestroyP(req.topicNames, NULL);
318,181✔
2049
  return code;
318,181✔
2050
}
2051

2052
void tmq_conf_set_auto_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* param) {
84,634✔
2053
  if (conf == NULL) return;
84,634✔
2054
  conf->commitCb = cb;
84,634✔
2055
  conf->commitCbUserParam = param;
84,634✔
2056
}
2057

2058
static void getVgInfo(tmq_t* tmq, char* topicName, int32_t vgId, SMqClientVg** pVg) {
18,157,405✔
2059
  if (tmq == NULL || topicName == NULL || pVg == NULL) {
18,157,405✔
2060
    return;
×
2061
  }
2062
  int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
18,157,405✔
2063
  for (int i = 0; i < topicNumCur; i++) {
18,558,567✔
2064
    SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i);
18,558,278✔
2065
    if (pTopicCur && strcmp(pTopicCur->topicName, topicName) == 0) {
18,557,952✔
2066
      int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs);
18,157,405✔
2067
      for (int32_t j = 0; j < vgNumCur; j++) {
38,932,712✔
2068
        SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j);
38,932,097✔
2069
        if (pVgCur && pVgCur->vgId == vgId) {
38,932,749✔
2070
          *pVg = pVgCur;
18,156,790✔
2071
          return;
18,156,790✔
2072
        }
2073
      }
2074
    }
2075
  }
2076
}
2077

2078
static SMqClientTopic* getTopicInfo(tmq_t* tmq, char* topicName) {
17,193,008✔
2079
  if (tmq == NULL || topicName == NULL) {
17,193,008✔
2080
    return NULL;
×
2081
  }
2082
  int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
17,193,008✔
2083
  for (int i = 0; i < topicNumCur; i++) {
17,594,207✔
2084
    SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i);
17,594,207✔
2085
    if (strcmp(pTopicCur->topicName, topicName) == 0) {
17,593,229✔
2086
      return pTopicCur;
17,193,334✔
2087
    }
2088
  }
2089
  return NULL;
×
2090
}
2091

2092
int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
18,385,820✔
2093
  tmq_t*             tmq = NULL;
18,385,820✔
2094
  SMqRspWrapper*     pRspWrapper = NULL;
18,385,820✔
2095
  int8_t             rspType = 0;
18,385,820✔
2096
  int32_t            vgId = 0;
18,385,820✔
2097
  uint64_t           requestId = 0;
18,385,820✔
2098
  SMqPollCbParam*    pParam = (SMqPollCbParam*)param;
18,385,820✔
2099
  if (pMsg == NULL) {
18,385,820✔
2100
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2101
  }
2102
  if (pParam == NULL) {
18,385,820✔
2103
    code = TSDB_CODE_TSC_INTERNAL_ERROR;
×
2104
    goto EXIT;
×
2105
  }
2106
  int64_t refId = pParam->refId;
18,385,820✔
2107
  vgId = pParam->vgId;
18,385,820✔
2108
  requestId = pParam->requestId;
18,385,537✔
2109
  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
18,385,559✔
2110
  if (tmq == NULL) {
18,385,820✔
2111
    code = TSDB_CODE_TMQ_CONSUMER_CLOSED;
×
2112
    goto EXIT;
×
2113
  }
2114

2115
  int32_t ret = taosAllocateQitem(sizeof(SMqRspWrapper), DEF_QITEM, 0, (void**)&pRspWrapper);
18,385,820✔
2116
  if (ret) {
18,385,820✔
UNCOV
2117
    code = ret;
×
UNCOV
2118
    tscWarn("consumer:0x%" PRIx64 " msg discard from vgId:%d, since out of memory", tmq->consumerId, vgId);
×
2119
    goto END;
×
2120
  }
2121

2122
  if (code != 0) {
18,385,820✔
2123
    goto END;
980,845✔
2124
  }
2125

2126
  if (pMsg->pData == NULL) {
17,404,975✔
2127
    tqErrorC("consumer:0x%" PRIx64 " msg discard from vgId:%d, since msg is NULL", tmq->consumerId, vgId);
×
2128
    code = TSDB_CODE_TSC_INTERNAL_ERROR;
×
2129
    goto END;
×
2130
  }
2131

2132
  int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch;
17,404,408✔
2133
  int32_t clientEpoch = atomic_load_32(&tmq->epoch);
17,404,649✔
2134

2135
  if (msgEpoch != clientEpoch) {
17,404,397✔
2136
    tqErrorC("consumer:0x%" PRIx64
4,086✔
2137
                 " msg discard from vgId:%d since epoch not equal, rsp epoch %d, current epoch %d, reqId:0x%" PRIx64,
2138
             tmq->consumerId, vgId, msgEpoch, clientEpoch, requestId);
2139
    code = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
4,086✔
2140
    goto END;
4,086✔
2141
  }
2142
  rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType;
17,400,311✔
2143
  tqDebugC("consumer:0x%" PRIx64 " recv poll rsp, vgId:%d, type %d(%s), QID:0x%" PRIx64, tmq->consumerId, vgId, rspType, tmqMsgTypeStr[rspType], requestId);
17,400,527✔
2144

2145
  pRspWrapper->tmqRspType = rspType;
17,400,889✔
2146
  pRspWrapper->pollRsp.reqId = requestId;
17,400,889✔
2147
  pRspWrapper->pollRsp.pEpset = pMsg->pEpSet;
17,400,889✔
2148
  pRspWrapper->pollRsp.data = pMsg->pData;
17,400,889✔
2149
  pRspWrapper->pollRsp.len = pMsg->len;
17,400,889✔
2150
  pMsg->pData = NULL;
17,400,889✔
2151
  pMsg->pEpSet = NULL;
17,400,889✔
2152

2153
  END:
18,385,820✔
2154
  if (pRspWrapper) {
18,385,820✔
2155
    pRspWrapper->code = code;
18,385,820✔
2156
    pRspWrapper->pollRsp.vgId = vgId;
18,385,820✔
2157
    tstrncpy(pRspWrapper->pollRsp.topicName, pParam->topicName, TSDB_TOPIC_FNAME_LEN);
18,385,820✔
2158
    code = taosWriteQitem(tmq->mqueue, pRspWrapper);
18,385,820✔
2159
    if (code != 0) {
18,385,820✔
2160
      tmqFreeRspWrapper(pRspWrapper);
×
2161
      taosFreeQitem(pRspWrapper);
×
2162
      tqErrorC("consumer:0x%" PRIx64 " put poll res into mqueue failed, code:%d", tmq->consumerId, code);
×
2163
    } else {
2164
      tqDebugC("consumer:0x%" PRIx64 " put poll res into mqueue, type:%d(%s), vgId:%d, total in queue:%d, QID:0x%" PRIx64,
18,385,820✔
2165
               tmq ? tmq->consumerId : 0, rspType, tmqMsgTypeStr[rspType], vgId, taosQueueItemSize(tmq->mqueue), requestId);
2166
    }
2167
  }
2168

2169
  if (tsem2_post(&tmq->rspSem) != 0){
18,385,820✔
2170
    tqErrorC("failed to post rsp sem, consumer:0x%" PRIx64, tmq->consumerId);
×
2171
  }
2172
  ret = taosReleaseRef(tmqMgmt.rsetId, refId);
18,385,820✔
2173
  if (ret != 0){
18,385,820✔
2174
    tqErrorC("failed to release ref:%"PRId64 ", code:%d", refId, ret);
×
2175
  }
2176

2177
  EXIT:
18,385,820✔
2178
  taosMemoryFreeClear(pMsg->pData);
18,385,820✔
2179
  taosMemoryFreeClear(pMsg->pEpSet);
18,385,820✔
2180
  return code;
18,385,528✔
2181
}
2182

2183
void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, SMqClientTopic* pTopic, SMqClientVg* pVg) {
18,386,526✔
2184
  if (pReq == NULL || tmq == NULL || pTopic == NULL || pVg == NULL) {
18,386,526✔
2185
    return;
×
2186
  }
2187
  (void)snprintf(pReq->subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", tmq->groupId, TMQ_SEPARATOR, pTopic->topicName);
18,386,526✔
2188
  pReq->withTbName = tmq->withTbName;
18,386,526✔
2189
  pReq->consumerId = tmq->consumerId;
18,386,526✔
2190
  pReq->timeout = tmq->maxPollWaitTime;
18,386,526✔
2191
  pReq->minPollRows = tmq->minPollRows;
18,386,526✔
2192
  pReq->epoch = atomic_load_32(&tmq->epoch);
18,386,526✔
2193
  pReq->reqOffset = pVg->offsetInfo.endOffset;
18,386,526✔
2194
  pReq->head.vgId = pVg->vgId;
18,386,526✔
2195
  pReq->useSnapshot = tmq->useSnapshot;
18,386,526✔
2196
  pReq->reqId = generateRequestId();
18,386,526✔
2197
  pReq->enableReplay = tmq->replayEnable;
18,386,198✔
2198
  pReq->sourceExcluded = tmq->sourceExcluded;
18,385,872✔
2199
  pReq->rawData = tmq->rawData;
18,385,872✔
2200
  pReq->enableBatchMeta = tmq->enableBatchMeta;
18,385,872✔
2201
}
2202

2203
void changeByteEndian(char* pData) {
110,121,911✔
2204
  if (pData == NULL) {
110,121,911✔
2205
    return;
×
2206
  }
2207
  char* p = pData;
110,121,911✔
2208

2209
  // | version | total length | total rows | total columns | flag seg| block group id | column schema | each column
2210
  // length | version:
2211
  int32_t blockVersion = *(int32_t*)p;
110,121,911✔
2212
  if (blockVersion != BLOCK_VERSION_1) {
110,121,911✔
2213
    tqErrorC("invalid block version:%d", blockVersion);
×
2214
    return;
×
2215
  }
2216
  *(int32_t*)p = BLOCK_VERSION_2;
110,121,911✔
2217

2218
  p += sizeof(int32_t);
110,121,585✔
2219
  p += sizeof(int32_t);
110,121,911✔
2220
  p += sizeof(int32_t);
110,122,237✔
2221
  int32_t cols = *(int32_t*)p;
110,122,237✔
2222
  p += sizeof(int32_t);
110,122,237✔
2223
  p += sizeof(int32_t);
110,121,911✔
2224
  p += sizeof(uint64_t);
110,122,237✔
2225
  // check fields
2226
  p += cols * (sizeof(int8_t) + sizeof(int32_t));
110,121,911✔
2227

2228
  int32_t* colLength = (int32_t*)p;
110,121,911✔
2229
  for (int32_t i = 0; i < cols; ++i) {
678,153,395✔
2230
    colLength[i] = htonl(colLength[i]);
568,031,158✔
2231
  }
2232
}
2233

2234
static void tmqGetRawDataRowsPrecisionFromRes(void* pRetrieve, void** rawData, int64_t* rows, int32_t* precision) {
213,056,463✔
2235
  if (pRetrieve == NULL || rawData == NULL || rows == NULL) {
213,056,463✔
2236
    return;
×
2237
  }
2238
  if (*(int64_t*)pRetrieve == 0) {
213,057,115✔
2239
    *rawData = ((SRetrieveTableRsp*)pRetrieve)->data;
×
2240
    *rows = htobe64(((SRetrieveTableRsp*)pRetrieve)->numOfRows);
×
2241
    if (precision != NULL) {
×
2242
      *precision = ((SRetrieveTableRsp*)pRetrieve)->precision;
×
2243
    }
2244
  } else if (*(int64_t*)pRetrieve == 1) {
213,056,789✔
2245
    *rawData = ((SRetrieveTableRspForTmq*)pRetrieve)->data;
213,058,040✔
2246
    *rows = htobe64(((SRetrieveTableRspForTmq*)pRetrieve)->numOfRows);
213,058,040✔
2247
    if (precision != NULL) {
213,056,084✔
2248
      *precision = ((SRetrieveTableRspForTmq*)pRetrieve)->precision;
102,935,856✔
2249
    }
2250
  }
2251
}
2252

2253
static void tmqBuildRspFromWrapperInner(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows,
12,592,023✔
2254
                                        SMqRspObj* pRspObj) {
2255
  if (pWrapper == NULL || pVg == NULL || numOfRows == NULL || pRspObj == NULL) {
12,592,023✔
2256
    return;
×
2257
  }
2258
  pRspObj->resIter = -1;
12,592,023✔
2259
  pRspObj->resInfo.totalRows = 0;
12,592,023✔
2260
  pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
12,592,023✔
2261

2262
  SMqDataRsp* pDataRsp = &pRspObj->dataRsp;
12,592,023✔
2263
  // extract the rows in this data packet
2264
  for (int32_t i = 0; i < pDataRsp->blockNum; ++i) {
122,714,260✔
2265
    void*   pRetrieve = taosArrayGetP(pDataRsp->blockData, i);
110,122,237✔
2266
    void*   rawData = NULL;
110,121,911✔
2267
    int64_t rows = 0;
110,121,911✔
2268
    // deal with compatibility
2269
    tmqGetRawDataRowsPrecisionFromRes(pRetrieve, &rawData, &rows, NULL);
110,121,911✔
2270

2271
    pVg->numOfRows += rows;
110,121,911✔
2272
    (*numOfRows) += rows;
110,122,237✔
2273
    changeByteEndian(rawData);
110,122,237✔
2274
  }
2275
}
2276

2277
static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* pVg) {
18,385,820✔
2278
  SMqPollReq      req = {0};
18,385,820✔
2279
  char*           msg = NULL;
18,385,820✔
2280
  SMqPollCbParam* pParam = NULL;
18,385,820✔
2281
  SMsgSendInfo*   sendInfo = NULL;
18,385,820✔
2282
  int             code = 0;
18,385,820✔
2283
  int             lino = 0;
18,385,820✔
2284
  tmqBuildConsumeReqImpl(&req, pTmq, pTopic, pVg);
18,385,820✔
2285

2286
  int32_t msgSize = tSerializeSMqPollReq(NULL, 0, &req);
18,385,166✔
2287
  TSDB_CHECK_CONDITION(msgSize >= 0, code, lino, END, TSDB_CODE_INVALID_MSG);
18,384,514✔
2288

2289
  msg = taosMemoryCalloc(1, msgSize);
18,384,514✔
2290
  TSDB_CHECK_NULL(msg, code, lino, END, terrno);
18,384,190✔
2291

2292
  TSDB_CHECK_CONDITION(tSerializeSMqPollReq(msg, msgSize, &req) >= 0, code, lino, END, TSDB_CODE_INVALID_MSG);
18,384,190✔
2293

2294
  pParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
18,383,538✔
2295
  TSDB_CHECK_NULL(pParam, code, lino, END, terrno);
18,383,538✔
2296

2297
  pParam->refId = pTmq->refId;
18,383,538✔
2298
  tstrncpy(pParam->topicName, pTopic->topicName, TSDB_TOPIC_FNAME_LEN);
18,384,842✔
2299
  pParam->vgId = pVg->vgId;
18,385,168✔
2300
  pParam->requestId = req.reqId;
18,385,168✔
2301

2302
  sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
18,382,886✔
2303
  TSDB_CHECK_NULL(sendInfo, code, lino, END, terrno);
18,383,536✔
2304

2305
  sendInfo->msgInfo = (SDataBuf){.pData = msg, .len = msgSize, .handle = NULL};
18,383,536✔
2306
  sendInfo->requestId = req.reqId;
18,383,864✔
2307
  sendInfo->requestObjRefId = 0;
18,383,536✔
2308
  sendInfo->param = pParam;
18,383,536✔
2309
  sendInfo->paramFreeFp = taosAutoMemoryFree;
18,385,168✔
2310
  sendInfo->fp = tmqPollCb;
18,384,840✔
2311
  sendInfo->msgType = TDMT_VND_TMQ_CONSUME;
18,383,864✔
2312

2313
  msg = NULL;
18,383,538✔
2314
  pParam = NULL;
18,383,538✔
2315

2316
  char offsetFormatBuf[TSDB_OFFSET_LEN] = {0};
18,383,538✔
2317
  tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pVg->offsetInfo.endOffset);
18,382,884✔
2318
  code = asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, NULL, sendInfo);
18,384,840✔
2319
  tqDebugC("consumer:0x%" PRIx64 " send poll to %s vgId:%d, code:%d, epoch %d, req:%s, QID:0x%" PRIx64, pTmq->consumerId,
18,385,820✔
2320
           pTopic->topicName, pVg->vgId, code, pTmq->epoch, offsetFormatBuf, req.reqId);
2321
  TSDB_CHECK_CODE(code, lino, END);
18,385,820✔
2322

2323
  pVg->pollCnt++;
18,385,820✔
2324
  pVg->seekUpdated = false;  // reset this flag.
18,385,820✔
2325
  pTmq->pollCnt++;
18,385,820✔
2326

2327
END:
18,385,820✔
2328
  if (code != 0){
18,385,820✔
2329
    tqErrorC("%s failed at %d msg:%s", __func__, lino, tstrerror(code));
×
2330
  }
2331
  taosMemoryFreeClear(pParam);
18,385,820✔
2332
  taosMemoryFreeClear(msg);
18,385,820✔
2333
  return code;
18,385,820✔
2334
}
2335

2336
static int32_t tmqPollImpl(tmq_t* tmq) {
23,067,109✔
2337
  if (tmq == NULL) {
23,067,109✔
2338
    return TSDB_CODE_INVALID_MSG;
×
2339
  }
2340
  int32_t code = 0;
23,067,109✔
2341
  taosWLockLatch(&tmq->lock);
23,067,109✔
2342

2343
  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__CLOSED){
23,067,109✔
2344
    code = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
4,847✔
2345
    goto end;
4,847✔
2346
  }
2347

2348
  int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
23,062,262✔
2349
  tqDebugC("consumer:0x%" PRIx64 " start to poll data, numOfTopics:%d", tmq->consumerId, numOfTopics);
23,062,262✔
2350

2351
  for (int i = 0; i < numOfTopics; i++) {
46,839,647✔
2352
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
23,777,385✔
2353
    if (pTopic == NULL) {
23,777,385✔
2354
      continue;
×
2355
    }
2356
    int32_t numOfVg = taosArrayGetSize(pTopic->vgs);
23,777,385✔
2357
    if (pTopic->noPrivilege) {
23,777,385✔
2358
      tqDebugC("consumer:0x%" PRIx64 " has no privilegr for topic:%s", tmq->consumerId, pTopic->topicName);
584✔
2359
      continue;
584✔
2360
    }
2361
    for (int j = 0; j < numOfVg; j++) {
93,511,378✔
2362
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
69,734,577✔
2363
      if (pVg == NULL) {
69,734,577✔
2364
        continue;
×
2365
      }
2366

2367
      int64_t elapsed = taosGetTimestampMs() - pVg->emptyBlockReceiveTs;
69,734,577✔
2368
      if (elapsed < EMPTY_BLOCK_POLL_IDLE_DURATION && elapsed >= 0) {  // less than 10ms
69,734,577✔
2369
        tqDebugC("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 10ms before start next poll", tmq->consumerId,
13,220,192✔
2370
                 tmq->epoch, pVg->vgId);
2371
        continue;
13,220,192✔
2372
      }
2373

2374
      elapsed = taosGetTimestampMs() - pVg->blockReceiveTs;
56,514,385✔
2375
      if (tmq->replayEnable && elapsed < pVg->blockSleepForReplay && elapsed >= 0) {
56,514,385✔
2376
        tqDebugC("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for %" PRId64 "ms before start next poll when replay",
6,525✔
2377
                 tmq->consumerId, tmq->epoch, pVg->vgId, pVg->blockSleepForReplay);
2378
        continue;
6,525✔
2379
      }
2380

2381
      int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
56,507,860✔
2382
      if (vgStatus == TMQ_VG_STATUS__WAIT) {
56,507,860✔
2383
        int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
38,122,040✔
2384
        tqDebugC("consumer:0x%" PRIx64 " epoch %d wait poll-rsp, skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch,
38,122,040✔
2385
                 pVg->vgId, vgSkipCnt);
2386
        continue;
38,121,966✔
2387
      }
2388

2389
      atomic_store_32(&pVg->vgSkipCnt, 0);
18,385,820✔
2390
      code = doTmqPollImpl(tmq, pTopic, pVg);
18,385,820✔
2391
      if (code != TSDB_CODE_SUCCESS) {
18,385,894✔
2392
        goto end;
×
2393
      }
2394
    }
2395
  }
2396

2397
  end:
23,062,262✔
2398
  taosWUnLockLatch(&tmq->lock);
23,067,109✔
2399
  tqDebugC("consumer:0x%" PRIx64 " end to poll data, code:%d", tmq->consumerId, code);
23,067,109✔
2400
  return code;
23,067,109✔
2401
}
2402

2403
static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* reqOffset, STqOffsetVal* rspOffset, int64_t sver, int64_t ever,
17,193,008✔
2404
                         int64_t consumerId, bool hasData) {
2405
  if (pVg == NULL || reqOffset == NULL || rspOffset == NULL) {
17,193,008✔
UNCOV
2406
    return;
×
2407
  }
2408
  if (!pVg->seekUpdated) {
17,193,008✔
2409
    tqDebugC("consumer:0x%" PRIx64 " local offset is update, since seekupdate not set", consumerId);
17,191,416✔
2410
    if (hasData) {
17,192,720✔
2411
      tOffsetCopy(&pVg->offsetInfo.beginOffset, reqOffset);
12,651,046✔
2412
    }
2413
    tOffsetCopy(&pVg->offsetInfo.endOffset, rspOffset);
17,192,720✔
2414
  } else {
2415
    tqDebugC("consumer:0x%" PRIx64 " local offset is NOT update, since seekupdate is set", consumerId);
1,592✔
2416
  }
2417

2418
  // update the status
2419
  atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
17,193,334✔
2420

2421
  // update the valid wal version range
2422
  pVg->offsetInfo.walVerBegin = sver;
17,193,334✔
2423
  pVg->offsetInfo.walVerEnd = ever + 1;
17,193,334✔
2424
}
2425

2426
static SMqRspObj* buildRsp(SMqPollRspWrapper* pollRspWrapper){
12,651,046✔
2427
  typedef union {
2428
    SMqDataRsp      dataRsp;
2429
    SMqMetaRsp      metaRsp;
2430
    SMqBatchMetaRsp batchMetaRsp;
2431
  } MEMSIZE;
2432

2433
  SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
12,651,046✔
2434
  if (pRspObj == NULL) {
12,651,046✔
2435
    tqErrorC("buildRsp:failed to allocate memory");
×
2436
    return NULL;
×
2437
  }
2438
  (void)memcpy(&pRspObj->dataRsp, &pollRspWrapper->dataRsp, sizeof(MEMSIZE));
12,651,046✔
2439
  tstrncpy(pRspObj->topic, pollRspWrapper->topicName, TSDB_TOPIC_FNAME_LEN);
12,651,046✔
2440
  tstrncpy(pRspObj->db, pollRspWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
12,651,046✔
2441
  pRspObj->vgId = pollRspWrapper->vgId;
12,651,046✔
2442
  (void)memset(&pollRspWrapper->dataRsp, 0, sizeof(MEMSIZE));
12,651,046✔
2443
  return pRspObj;
12,651,046✔
2444
}
2445

2446
static int32_t processMqRspError(tmq_t* tmq, SMqRspWrapper* pRspWrapper){
964,071✔
2447
  int32_t code = 0;
964,071✔
2448
  SMqPollRspWrapper* pollRspWrapper = &pRspWrapper->pollRsp;
964,071✔
2449

2450
  tqErrorC("consumer:0x%" PRIx64 " msg from vgId:%d discarded, since %s", tmq->consumerId, pollRspWrapper->vgId,
964,071✔
2451
    tstrerror(pRspWrapper->code));
2452
  if (pRspWrapper->code == TSDB_CODE_VND_INVALID_VGROUP_ID) {  // for vnode transform
964,071✔
2453
    code = askEp(tmq, NULL, false, true);
688,009✔
2454
    if (code != 0) {
688,009✔
2455
      tqErrorC("consumer:0x%" PRIx64 " failed to ask ep when vnode transform, code:%s", tmq->consumerId, tstrerror(code));
×
2456
    }
2457
  } else if (pRspWrapper->code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) {
276,062✔
2458
    code = syncAskEp(tmq);
36✔
2459
    if (code != 0) {
36✔
2460
      tqErrorC("consumer:0x%" PRIx64 " failed to ask ep when consumer mismatch, code:%s", tmq->consumerId, tstrerror(code));
×
2461
    }
2462
  } else if (pRspWrapper->code == TSDB_CODE_TMQ_NO_TABLE_QUALIFIED){
276,026✔
2463
    code = 0;
155,980✔
2464
  }
2465
  
2466
  taosWLockLatch(&tmq->lock);
964,071✔
2467
  SMqClientVg* pVg = NULL;
964,071✔
2468
  getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId, &pVg);
964,071✔
2469
  if (pVg) {
964,071✔
2470
    pVg->emptyBlockReceiveTs = taosGetTimestampMs();
1,916,034✔
2471
    atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
963,782✔
2472
  }
2473
  taosWUnLockLatch(&tmq->lock);
964,071✔
2474

2475
  return code;
964,071✔
2476
}
2477

2478
static int32_t processWrapperData(SMqRspWrapper* pRspWrapper){
17,193,334✔
2479
  int32_t code = 0;
17,193,334✔
2480
  if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_RSP) {
17,193,334✔
2481
    PROCESS_POLL_RSP(tDecodeMqDataRsp, &pRspWrapper->pollRsp.dataRsp)
17,129,250✔
2482
    pRspWrapper->pollRsp.dataRsp.data = pRspWrapper->pollRsp.data;
17,129,250✔
2483
    pRspWrapper->pollRsp.data = NULL;
17,129,250✔
2484
  } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
64,084✔
2485
    PROCESS_POLL_RSP(tDecodeMqMetaRsp, &pRspWrapper->pollRsp.metaRsp)
53,256✔
2486
  } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
10,828✔
2487
    PROCESS_POLL_RSP(tDecodeSTaosxRsp, &pRspWrapper->pollRsp.dataRsp)
5,061✔
2488
    pRspWrapper->pollRsp.dataRsp.data = pRspWrapper->pollRsp.data;
5,061✔
2489
    pRspWrapper->pollRsp.data = NULL;
5,061✔
2490
  } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_BATCH_META_RSP) {
5,767✔
2491
    PROCESS_POLL_RSP(tSemiDecodeMqBatchMetaRsp, &pRspWrapper->pollRsp.batchMetaRsp)
5,767✔
2492
  } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) {
×
2493
    PROCESS_POLL_RSP(tDecodeMqRawDataRsp, &pRspWrapper->pollRsp.dataRsp)
×
2494
    pRspWrapper->pollRsp.dataRsp.len = pRspWrapper->pollRsp.len - sizeof(SMqRspHead);
×
2495
    pRspWrapper->pollRsp.dataRsp.rawData = POINTER_SHIFT(pRspWrapper->pollRsp.data, sizeof(SMqRspHead));
×
2496
    pRspWrapper->pollRsp.data = NULL;
×
2497
  } else {
2498
    tqErrorC("invalid rsp msg received, type:%d ignored", pRspWrapper->tmqRspType);
×
2499
    code = TSDB_CODE_TSC_INTERNAL_ERROR;
×
2500
    goto END;
×
2501
  }
2502
  END:
17,193,334✔
2503
  return code;
17,193,334✔
2504
}
2505

2506
static SMqRspObj* processMqRsp(tmq_t* tmq, SMqRspWrapper* pRspWrapper){
19,495,220✔
2507
  int32_t    code = 0;
19,495,220✔
2508
  SMqRspObj* pRspObj = NULL;
19,495,220✔
2509

2510
  if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) {
19,495,220✔
2511
    tqDebugC("consumer:0x%" PRIx64 " ep msg received", tmq->consumerId);
2,301,886✔
2512
    SMqAskEpRsp*        rspMsg = &pRspWrapper->epRsp;
2,301,886✔
2513
    doUpdateLocalEp(tmq, pRspWrapper->epoch, rspMsg);
2,301,886✔
2514
    terrno = code;
2,301,886✔
2515
    return pRspObj;
2,301,886✔
2516
  }
2517

2518
  code = processWrapperData(pRspWrapper);
17,193,334✔
2519
  if (code != 0) {
17,193,334✔
2520
    goto END;
×
2521
  }
2522
  SMqPollRspWrapper* pollRspWrapper = &pRspWrapper->pollRsp;
17,193,334✔
2523
  taosWLockLatch(&tmq->lock);
17,193,334✔
2524
  SMqClientVg* pVg = NULL;
17,193,334✔
2525
  getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId, &pVg);
17,193,334✔
2526
  if(pVg == NULL) {
17,193,008✔
2527
    tqErrorC("consumer:0x%" PRIx64 " get vg or topic error, topic:%s vgId:%d", tmq->consumerId,
×
2528
             pollRspWrapper->topicName, pollRspWrapper->vgId);
2529
    code = TSDB_CODE_TMQ_INVALID_VGID;
×
2530
    goto END;
×
2531
  }
2532
  pollRspWrapper->topicHandle = getTopicInfo(tmq, pollRspWrapper->topicName);
17,193,008✔
2533
  if (pollRspWrapper->pEpset != NULL) {
17,193,008✔
2534
    pVg->epSet = *pollRspWrapper->pEpset;
5,164✔
2535
  }
2536

2537
  if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_RSP ||
17,193,008✔
2538
      pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP ||
64,084✔
2539
      pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) {
17,193,334✔
2540
    updateVgInfo(pVg, &pollRspWrapper->dataRsp.reqOffset, &pollRspWrapper->dataRsp.rspOffset, pollRspWrapper->head.walsver, pollRspWrapper->head.walever,
17,133,985✔
2541
                 tmq->consumerId, pollRspWrapper->dataRsp.blockNum != 0);
17,134,311✔
2542

2543
    char buf[TSDB_OFFSET_LEN] = {0};
17,134,311✔
2544
    tFormatOffset(buf, TSDB_OFFSET_LEN, &pollRspWrapper->rspOffset);
17,134,311✔
2545
    if (pollRspWrapper->dataRsp.blockNum == 0) {
17,134,311✔
2546
      pVg->emptyBlockReceiveTs = taosGetTimestampMs();
9,069,004✔
2547
      tqDebugC("consumer:0x%" PRIx64 " empty block received, vgId:%d, offset:%s, vg total:%" PRId64
4,542,288✔
2548
                   ", total:%" PRId64 ", QID:0x%" PRIx64,
2549
               tmq->consumerId, pVg->vgId, buf, pVg->numOfRows, tmq->totalRows, pollRspWrapper->reqId);
2550
    } else {
2551
      pRspObj = buildRsp(pollRspWrapper);
12,592,023✔
2552
      if (pRspObj == NULL) {
12,591,697✔
2553
        tqErrorC("consumer:0x%" PRIx64 " failed to allocate memory for meta rsp", tmq->consumerId);
×
2554
        goto END;
×
2555
      }
2556
      pRspObj->resType = pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP ? RES_TYPE__TMQ_RAWDATA :
25,183,720✔
2557
                         (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_RSP ? RES_TYPE__TMQ : RES_TYPE__TMQ_METADATA);
12,592,023✔
2558
      int64_t numOfRows = 0;
12,592,023✔
2559
      if (pRspWrapper->tmqRspType != TMQ_MSG_TYPE__POLL_RAW_DATA_RSP){
12,592,023✔
2560
        tmqBuildRspFromWrapperInner(pollRspWrapper, pVg, &numOfRows, pRspObj);
12,592,023✔
2561
        tmq->totalRows += numOfRows;
12,592,023✔
2562
      }
2563
      if (tmq->replayEnable && pRspWrapper->tmqRspType != TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) {
12,592,023✔
2564
        pVg->blockReceiveTs = taosGetTimestampMs();
14,094✔
2565
        pVg->blockSleepForReplay = pRspObj->dataRsp.sleepTime;
7,047✔
2566
        if (pVg->blockSleepForReplay > 0) {
7,047✔
2567
          if (taosTmrStart(tmqReplayTask, pVg->blockSleepForReplay, (void*)(tmq->refId), tmqMgmt.timer) == NULL) {
3,132✔
2568
            tqErrorC("consumer:0x%" PRIx64 " failed to start replay timer, vgId:%d, sleep:%" PRId64,
×
2569
                     tmq->consumerId, pVg->vgId, pVg->blockSleepForReplay);
2570
          }
2571
        }
2572
      }
2573
      pVg->emptyBlockReceiveTs = 0;
12,591,697✔
2574
      tqDebugC("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64
12,591,697✔
2575
                   ", vg total:%" PRId64 ", total:%" PRId64 ", QID:0x%" PRIx64,
2576
               tmq->consumerId, pVg->vgId, buf, pRspObj->dataRsp.blockNum, numOfRows, pVg->numOfRows, tmq->totalRows,
2577
               pollRspWrapper->reqId);
2578
    }
2579
  } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP || pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_BATCH_META_RSP) {
59,023✔
2580
    updateVgInfo(pVg, &pollRspWrapper->rspOffset, &pollRspWrapper->rspOffset,
59,023✔
2581
                 pollRspWrapper->head.walsver, pollRspWrapper->head.walever, tmq->consumerId, true);
2582

2583

2584
    pRspObj = buildRsp(pollRspWrapper);
59,023✔
2585
    if (pRspObj == NULL) {
59,023✔
2586
      tqErrorC("consumer:0x%" PRIx64 " failed to allocate memory for meta rsp", tmq->consumerId);
×
2587
      goto END;
×
2588
    }
2589
    pRspObj->resType = pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP ? RES_TYPE__TMQ_META : RES_TYPE__TMQ_BATCH_META;
59,023✔
2590
  }
2591

2592
END:
17,126,212✔
2593
  terrno = code;
17,193,334✔
2594
  taosWUnLockLatch(&tmq->lock);
17,193,334✔
2595
  return pRspObj;
17,193,334✔
2596
}
2597

2598
static void* tmqHandleAllRsp(tmq_t* tmq) {
35,718,155✔
2599
  tqDebugC("consumer:0x%" PRIx64 " start to handle the rsp, total:%d", tmq->consumerId, taosQueueItemSize(tmq->mqueue));
35,718,155✔
2600

2601
  int32_t code = 0;
35,718,155✔
2602
  void* returnVal = NULL;
35,718,155✔
2603
  while (1) {
7,808,245✔
2604
    SMqRspWrapper* pRspWrapper = NULL;
43,526,400✔
2605
    taosReadQitem(tmq->mqueue, (void**)&pRspWrapper);
43,526,400✔
2606
    if (pRspWrapper == NULL) {break;}
43,577,950✔
2607

2608
    tqDebugC("consumer:0x%" PRIx64 " handle rsp, type:%s", tmq->consumerId, tmqMsgTypeStr[pRspWrapper->tmqRspType]);
20,459,291✔
2609
    if (pRspWrapper->code != 0) {
20,459,291✔
2610
      code = processMqRspError(tmq, pRspWrapper);
964,071✔
2611
    }else{
2612
      returnVal = processMqRsp(tmq, pRspWrapper);
19,495,220✔
2613
      code = terrno;
19,495,220✔
2614
    }
2615

2616
    tmqFreeRspWrapper(pRspWrapper);
20,459,291✔
2617
    taosFreeQitem(pRspWrapper);
20,459,291✔
2618
    if(returnVal != NULL || code != 0){
20,459,291✔
2619
      break;
2620
    }
2621
  }
2622

2623
END:
35,718,155✔
2624
  terrno = code;
35,718,155✔
2625
  return returnVal;
35,718,155✔
2626
}
2627

2628
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
12,720,445✔
2629
  int32_t lino = 0;
12,720,445✔
2630
  int32_t code = 0;
12,720,445✔
2631
  TSDB_CHECK_NULL(tmq, code, lino, END, TSDB_CODE_INVALID_PARA);
12,720,445✔
2632

2633
  void*   rspObj = NULL;
12,720,373✔
2634
  int64_t startTime = taosGetTimestampMs();
12,720,047✔
2635

2636
  tqDebugC("consumer:0x%" PRIx64 " start to poll at %" PRId64 ", timeout:%" PRId64, tmq->consumerId, startTime, timeout);
12,720,047✔
2637
  TSDB_CHECK_CONDITION(atomic_load_8(&tmq->status) != TMQ_CONSUMER_STATUS__INIT, code, lino, END, TSDB_CODE_TMQ_INVALID_STATUS);
12,721,025✔
2638

2639
  (void)atomic_val_compare_exchange_8(&tmq->pollFlag, 0, 1);
12,721,025✔
2640

2641
  while (1) {
2642
    code = tmqHandleAllDelayedTask(tmq);
35,718,155✔
2643
    TSDB_CHECK_CODE(code, lino, END);
35,718,155✔
2644

2645
    rspObj = tmqHandleAllRsp(tmq);
35,718,155✔
2646
    if (rspObj) {
35,718,155✔
2647
      tqDebugC("consumer:0x%" PRIx64 "end to poll, return rsp:%p", tmq->consumerId, rspObj);
12,651,046✔
2648
      return (TAOS_RES*)rspObj;
12,651,046✔
2649
    }
2650
    code = terrno;
23,067,109✔
2651
    TSDB_CHECK_CODE(code, lino, END);
23,067,109✔
2652

2653
    code = tmqPollImpl(tmq);
23,067,109✔
2654
    TSDB_CHECK_CODE(code, lino, END);
23,067,109✔
2655

2656
    if (timeout >= 0) {
23,062,262✔
2657
      int64_t currentTime = taosGetTimestampMs();
23,062,262✔
2658
      int64_t elapsedTime = currentTime - startTime;
23,062,262✔
2659
      (void)tsem2_timewait(&tmq->rspSem, (timeout - elapsedTime));
23,062,262✔
2660
      TSDB_CHECK_CONDITION(elapsedTime < timeout && elapsedTime >= 0, code, lino, END, 0);
23,062,262✔
2661
    } else {
2662
      (void)tsem2_timewait(&tmq->rspSem, 1000);
×
2663
    }
2664
  }
2665

2666
END:
70,051✔
2667
  terrno = code;
70,051✔
2668
  if (tmq != NULL && terrno != 0) {
70,051✔
2669
    tqErrorC("consumer:0x%" PRIx64 " poll error at line:%d, msg:%s", tmq->consumerId, lino, tstrerror(terrno));
4,847✔
2670
  } else {
2671
    tqDebugC("consumer:0x%" PRIx64 " poll end with no data", tmq->consumerId);
65,204✔
2672
  }
2673
  return NULL;
70,051✔
2674
}
2675

2676
static void displayConsumeStatistics(tmq_t* pTmq) {
202,204✔
2677
  if (pTmq == NULL) return;
202,204✔
2678
  taosRLockLatch(&pTmq->lock);
202,204✔
2679
  int32_t numOfTopics = taosArrayGetSize(pTmq->clientTopics);
202,204✔
2680
  tqInfoC("consumer:0x%" PRIx64 " closing poll:%" PRId64 " rows:%" PRId64 " topics:%d, final epoch:%d",
202,204✔
2681
          pTmq->consumerId, pTmq->pollCnt, pTmq->totalRows, numOfTopics, pTmq->epoch);
2682

2683
  tqInfoC("consumer:0x%" PRIx64 " rows dist begin: ", pTmq->consumerId);
202,204✔
2684
  for (int32_t i = 0; i < numOfTopics; ++i) {
315,280✔
2685
    SMqClientTopic* pTopics = taosArrayGet(pTmq->clientTopics, i);
113,076✔
2686
    if (pTopics == NULL) continue;
113,076✔
2687
    tqInfoC("consumer:0x%" PRIx64 " topic:%d", pTmq->consumerId, i);
113,076✔
2688
    int32_t numOfVgs = taosArrayGetSize(pTopics->vgs);
113,076✔
2689
    for (int32_t j = 0; j < numOfVgs; ++j) {
465,237✔
2690
      SMqClientVg* pVg = taosArrayGet(pTopics->vgs, j);
352,161✔
2691
      if (pVg == NULL) continue;
352,161✔
2692
      tqInfoC("topic:%s, %d. vgId:%d rows:%" PRId64, pTopics->topicName, j, pVg->vgId, pVg->numOfRows);
352,161✔
2693
    }
2694
  }
2695
  taosRUnLockLatch(&pTmq->lock);
202,204✔
2696
  tqInfoC("consumer:0x%" PRIx64 " rows dist end", pTmq->consumerId);
202,204✔
2697
}
2698

2699
int32_t tmq_unsubscribe(tmq_t* tmq) {
202,204✔
2700
  if (tmq == NULL) return TSDB_CODE_INVALID_PARA;
202,204✔
2701
  int32_t code = 0;
202,204✔
2702
  int8_t status = atomic_load_8(&tmq->status);
202,204✔
2703
  tqInfoC("consumer:0x%" PRIx64 " start to unsubscribe consumer, status:%d", tmq->consumerId, status);
202,204✔
2704

2705
  displayConsumeStatistics(tmq);
202,204✔
2706
  if (status != TMQ_CONSUMER_STATUS__READY) {
202,204✔
2707
    tqInfoC("consumer:0x%" PRIx64 " status:%d, not in ready state, no need unsubscribe", tmq->consumerId, status);
6,279✔
2708
    goto END;
6,279✔
2709
  }
2710
  if (tmq->autoCommit) {
195,925✔
2711
    code = tmq_commit_sync(tmq, NULL);
118,146✔
2712
    if (code != 0) {
118,146✔
2713
      goto END;
×
2714
    }
2715
  }
2716
  tmqSendHbReq((void*)(tmq->refId), NULL);
195,925✔
2717

2718
  tmq_list_t* lst = tmq_list_new();
195,925✔
2719
  if (lst == NULL) {
195,925✔
2720
    code = terrno;
×
2721
    goto END;
×
2722
  }
2723
  code = tmq_subscribe(tmq, lst);
195,925✔
2724
  tmq_list_destroy(lst);
195,925✔
2725
  tmqClearUnhandleMsg(tmq);
195,925✔
2726
  if(code != 0){
195,925✔
2727
    goto END;
×
2728
  }
2729

2730
  END:
195,925✔
2731
  return code;
202,204✔
2732
}
2733

2734
int32_t tmq_consumer_close(tmq_t* tmq) {
112,749✔
2735
  if (tmq == NULL) return TSDB_CODE_INVALID_PARA;
112,749✔
2736
  int32_t code = 0;
112,677✔
2737
  (void) taosThreadMutexLock(&tmqMgmt.lock);
112,677✔
2738
  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__CLOSED){
112,677✔
2739
    goto end;
1,589✔
2740
  }
2741
  tqInfoC("consumer:0x%" PRIx64 " start to close consumer, status:%d", tmq->consumerId, tmq->status);
111,088✔
2742
  code = tmq_unsubscribe(tmq);
111,088✔
2743
  if (code == 0) {
111,088✔
2744
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__CLOSED);
111,088✔
2745
    code = taosRemoveRef(tmqMgmt.rsetId, tmq->refId);
111,088✔
2746
    if (code != 0){
111,088✔
2747
      tqErrorC("tmq close failed to remove ref:%" PRId64 ", code:%d", tmq->refId, code);
×
2748
    }
2749
  }
2750

2751
end:
111,088✔
2752
  (void)taosThreadMutexUnlock(&tmqMgmt.lock);
112,677✔
2753
  return code;
112,677✔
2754
}
2755

2756
const char* tmq_err2str(int32_t err) {
61,543✔
2757
  if (err == 0) {
61,543✔
2758
    return "success";
55,831✔
2759
  } else if (err == -1) {
5,712✔
2760
    return "fail";
×
2761
  } else {
2762
    if (*(taosGetErrMsg()) == 0) {
5,712✔
2763
      return tstrerror(err);
2,241✔
2764
    } else {
2765
      (void)snprintf(taosGetErrMsgReturn(), ERR_MSG_LEN, "%s,detail:%s", tstrerror(err), taosGetErrMsg());
3,471✔
2766
      return (const char*)taosGetErrMsgReturn();
3,471✔
2767
    }
2768
  }
2769
}
2770

2771
tmq_res_t tmq_get_res_type(TAOS_RES* res) {
12,639,471✔
2772
  if (res == NULL) {
12,639,471✔
2773
    return TMQ_RES_INVALID;
4,564✔
2774
  }
2775
  if (TD_RES_TMQ(res)) {
12,634,907✔
2776
    return TMQ_RES_DATA;
12,576,292✔
2777
  } else if (TD_RES_TMQ_META(res)) {
58,615✔
2778
    return TMQ_RES_TABLE_META;
47,883✔
2779
  } else if (TD_RES_TMQ_METADATA(res)) {
10,732✔
2780
    return TMQ_RES_METADATA;
5,479✔
2781
  } else if (TD_RES_TMQ_BATCH_META(res)) {
5,253✔
2782
    return TMQ_RES_TABLE_META;
5,253✔
2783
  } else if (TD_RES_TMQ_RAW(res)) {
×
2784
    return TMQ_RES_RAWDATA;
×
2785
  } else {
2786
    return TMQ_RES_INVALID;
×
2787
  }
2788
}
2789

2790
const char* tmq_get_topic_name(TAOS_RES* res) {
12,408,592✔
2791
  if (res == NULL) {
12,408,592✔
2792
    return NULL;
7,172✔
2793
  }
2794
  if (TD_RES_TMQ_RAW(res) || TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) ||
12,401,420✔
2795
      TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
53,320✔
2796
    char* tmp = strchr(((SMqRspObj*)res)->topic, '.');
12,401,420✔
2797
    if (tmp == NULL) {
12,401,420✔
2798
      return NULL;
×
2799
    }
2800
    return tmp + 1;
12,401,420✔
2801
  } else {
2802
    return NULL;
×
2803
  }
2804
}
2805

2806
const char* tmq_get_db_name(TAOS_RES* res) {
12,386,413✔
2807
  if (res == NULL) {
12,386,413✔
2808
    return NULL;
5,216✔
2809
  }
2810

2811
  if (TD_RES_TMQ_RAW(res) || TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) ||
12,381,197✔
2812
      TD_RES_TMQ_BATCH_META(res) || TD_RES_TMQ_META(res)) {
50,712✔
2813
    char* tmp = strchr(((SMqRspObj*)res)->db, '.');
12,381,197✔
2814
    if (tmp == NULL) {
12,381,197✔
2815
      return NULL;
×
2816
    }
2817
    return tmp + 1;
12,381,197✔
2818
  } else {
2819
    return NULL;
×
2820
  }
2821
}
2822

2823
int32_t tmq_get_vgroup_id(TAOS_RES* res) {
12,388,744✔
2824
  if (res == NULL) {
12,388,744✔
2825
    return TSDB_CODE_INVALID_PARA;
5,542✔
2826
  }
2827
  if (TD_RES_TMQ_RAW(res) || TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) ||
12,383,202✔
2828
      TD_RES_TMQ_BATCH_META(res) || TD_RES_TMQ_META(res)) {
52,016✔
2829
    return ((SMqRspObj*)res)->vgId;
12,383,202✔
2830
  } else {
2831
    return TSDB_CODE_INVALID_PARA;
×
2832
  }
2833
}
2834

2835
int64_t tmq_get_vgroup_offset(TAOS_RES* res) {
144,676✔
2836
  if (res == NULL) {
144,676✔
2837
    return TSDB_CODE_INVALID_PARA;
5,216✔
2838
  }
2839
  if (TD_RES_TMQ_RAW(res) || TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) {
139,460✔
2840
    SMqRspObj* pRspObj = (SMqRspObj*)res;
138,808✔
2841
    STqOffsetVal*     pOffset = &pRspObj->dataRsp.reqOffset;
138,808✔
2842
    if (pOffset->type == TMQ_OFFSET__LOG) {
138,808✔
2843
      return pOffset->version;
138,808✔
2844
    } else {
2845
      tqErrorC("invalid offset type:%d", pOffset->type);
×
2846
    }
2847
  } else if (TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
652✔
2848
    SMqRspObj* pRspObj = (SMqRspObj*)res;
652✔
2849
    if (pRspObj->rspOffset.type == TMQ_OFFSET__LOG) {
652✔
2850
      return pRspObj->rspOffset.version;
652✔
2851
    }
2852
  } else {
2853
    tqErrorC("invalid tmq type:%d", *(int8_t*)res);
×
2854
  }
2855

2856
  // data from tsdb, no valid offset info
2857
  return TSDB_CODE_TMQ_SNAPSHOT_ERROR;
×
2858
}
2859

2860
const char* tmq_get_table_name(TAOS_RES* res) {
2,147,483,647✔
2861
  if (res == NULL) {
2,147,483,647✔
2862
    return NULL;
5,542✔
2863
  }
2864
  if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) ) {
2,147,483,647✔
2865
    SMqRspObj* pRspObj = (SMqRspObj*)res;
2,147,483,647✔
2866
    SMqDataRsp* data = &pRspObj->dataRsp;
2,147,483,647✔
2867
    if (!data->withTbName || data->blockTbName == NULL || pRspObj->resIter < 0 ||
2,147,483,647✔
2868
        pRspObj->resIter >= data->blockNum) {
1,287,947,538✔
2869
      return NULL;
2,147,483,647✔
2870
    }
2871
    return (const char*)taosArrayGetP(data->blockTbName, pRspObj->resIter);
1,287,951,456✔
2872
  }
2873
  return NULL;
1,304✔
2874
}
2875

2876
void tmq_commit_async(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* cb, void* param) {
5,216✔
2877
  if (tmq == NULL) {
5,216✔
2878
    tqErrorC("invalid tmq handle, null");
×
2879
    if (cb != NULL) {
×
2880
      cb(tmq, TSDB_CODE_INVALID_PARA, param);
×
2881
    }
2882
    return;
×
2883
  }
2884
  if (pRes == NULL) {  // here needs to commit all offsets.
5,216✔
2885
    asyncCommitAllOffsets(tmq, cb, param);
5,216✔
2886
  } else {  // only commit one offset
2887
    asyncCommitFromResult(tmq, pRes, cb, param);
×
2888
  }
2889
}
2890

2891
static void commitCallBackFn(tmq_t* tmq, int32_t code, void* param) {
193,272✔
2892
  if (param == NULL) {
193,272✔
2893
    tqErrorC("invalid param in commit cb");
×
2894
    return;
×
2895
  }
2896
  SSyncCommitInfo* pInfo = (SSyncCommitInfo*)param;
193,272✔
2897
  pInfo->code = code;
193,272✔
2898
  if (tsem2_post(&pInfo->sem) != 0){
193,272✔
2899
    tqErrorC("failed to post rsp sem in commit cb");
×
2900
  }
2901
}
2902

2903
int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* pRes) {
191,496✔
2904
  if (tmq == NULL) {
191,496✔
2905
    tqErrorC("invalid tmq handle, null");
×
2906
    return TSDB_CODE_INVALID_PARA;
×
2907
  }
2908

2909
  int32_t code = 0;
191,496✔
2910

2911
  SSyncCommitInfo* pInfo = taosMemoryMalloc(sizeof(SSyncCommitInfo));
191,496✔
2912
  if (pInfo == NULL) {
191,496✔
2913
    tqErrorC("failed to allocate memory for sync commit");
×
2914
    return terrno;
×
2915
  }
2916

2917
  code = tsem2_init(&pInfo->sem, 0, 0);
191,496✔
2918
  if (code != 0) {
191,496✔
2919
    tqErrorC("failed to init sem for sync commit");
×
2920
    taosMemoryFree(pInfo);
×
2921
    return code;
×
2922
  }
2923
  pInfo->code = 0;
191,496✔
2924

2925
  if (pRes == NULL) {
191,496✔
2926
    asyncCommitAllOffsets(tmq, commitCallBackFn, pInfo);
183,097✔
2927
  } else {
2928
    asyncCommitFromResult(tmq, pRes, commitCallBackFn, pInfo);
8,399✔
2929
  }
2930

2931
  if (tsem2_wait(&pInfo->sem) != 0){
191,496✔
2932
    tqErrorC("failed to wait sem for sync commit");
×
2933
  }
2934
  code = pInfo->code;
191,496✔
2935

2936
  if(tsem2_destroy(&pInfo->sem) != 0) {
191,496✔
2937
    tqErrorC("failed to destroy sem for sync commit");
×
2938
  }
2939
  taosMemoryFree(pInfo);
191,496✔
2940

2941
  tqDebugC("consumer:0x%" PRIx64 " sync res commit done, code:%s", tmq->consumerId, tstrerror(code));
191,496✔
2942
  return code;
191,496✔
2943
}
2944

2945
// wal range will be ok after calling tmq_get_topic_assignment or poll interface
2946
static int32_t checkWalRange(SVgOffsetInfo* offset, int64_t value) {
14,053✔
2947
  if (offset == NULL) {
14,053✔
2948
    tqErrorC("invalid offset, null");
×
2949
    return TSDB_CODE_INVALID_PARA;
×
2950
  }
2951
  if (offset->walVerBegin == -1 || offset->walVerEnd == -1) {
14,053✔
2952
    tqErrorC("Assignment or poll interface need to be called first");
×
2953
    return TSDB_CODE_TMQ_NEED_INITIALIZED;
×
2954
  }
2955

2956
  if (value != -1 && (value < offset->walVerBegin || value > offset->walVerEnd)) {
14,053✔
2957
    tqErrorC("invalid seek params, offset:%" PRId64 ", valid range:[%" PRId64 ", %" PRId64 "]", value,
58✔
2958
             offset->walVerBegin, offset->walVerEnd);
2959
    return TSDB_CODE_TMQ_VERSION_OUT_OF_RANGE;
58✔
2960
  }
2961

2962
  return 0;
13,995✔
2963
}
2964

2965
int32_t tmq_commit_offset_sync(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_t offset) {
10,731✔
2966
  if (tmq == NULL || pTopicName == NULL) {
10,731✔
2967
    tqErrorC("invalid tmq handle, null");
×
2968
    return TSDB_CODE_INVALID_PARA;
×
2969
  }
2970

2971
  int32_t accId = tmq->pTscObj->acctId;
10,731✔
2972
  char    tname[TSDB_TOPIC_FNAME_LEN] = {0};
10,731✔
2973
  (void)snprintf(tname, TSDB_TOPIC_FNAME_LEN, "%d.%s", accId, pTopicName);
10,731✔
2974

2975
  taosWLockLatch(&tmq->lock);
10,731✔
2976
  SMqClientVg* pVg = NULL;
10,731✔
2977
  int32_t      code = getClientVg(tmq, tname, vgId, &pVg);
10,731✔
2978
  if (code != 0) {
10,731✔
2979
    taosWUnLockLatch(&tmq->lock);
6,520✔
2980
    return code;
6,520✔
2981
  }
2982

2983
  SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo;
4,211✔
2984
  code = checkWalRange(pOffsetInfo, offset);
4,211✔
2985
  if (code != 0) {
4,211✔
2986
    taosWUnLockLatch(&tmq->lock);
×
2987
    return code;
×
2988
  }
2989
  taosWUnLockLatch(&tmq->lock);
4,211✔
2990

2991
  STqOffsetVal offsetVal = {.type = TMQ_OFFSET__LOG, .version = offset};
4,211✔
2992

2993
  SSyncCommitInfo* pInfo = taosMemoryMalloc(sizeof(SSyncCommitInfo));
4,211✔
2994
  if (pInfo == NULL) {
4,211✔
2995
    tqErrorC("consumer:0x%" PRIx64 " failed to prepare seek operation", tmq->consumerId);
×
2996
    return terrno;
×
2997
  }
2998

2999
  code = tsem2_init(&pInfo->sem, 0, 0);
4,211✔
3000
  if (code != 0) {
4,211✔
3001
    taosMemoryFree(pInfo);
×
3002
    return code;
×
3003
  }
3004
  pInfo->code = 0;
4,211✔
3005

3006
  code = asyncCommitOffset(tmq, tname, vgId, &offsetVal, commitCallBackFn, pInfo);
4,211✔
3007
  if (code == 0) {
4,211✔
3008
    if (tsem2_wait(&pInfo->sem) != 0){
1,776✔
3009
      tqErrorC("failed to wait sem for sync commit offset");
×
3010
    }
3011
    code = pInfo->code;
1,776✔
3012
  }
3013

3014
  if (code == TSDB_CODE_TMQ_SAME_COMMITTED_VALUE) code = TSDB_CODE_SUCCESS;
4,211✔
3015
  if(tsem2_destroy(&pInfo->sem) != 0) {
4,211✔
3016
    tqErrorC("failed to destroy sem for sync commit offset");
×
3017
  }
3018
  taosMemoryFree(pInfo);
4,211✔
3019

3020
  tqDebugC("consumer:0x%" PRIx64 " sync send commit to vgId:%d, offset:%" PRId64 " code:%s", tmq->consumerId, vgId,
4,211✔
3021
          offset, tstrerror(code));
3022

3023
  return code;
4,211✔
3024
}
3025

3026
void tmq_commit_offset_async(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb* cb,
3,912✔
3027
                             void* param) {
3028
  int32_t code = 0;
3,912✔
3029
  if (tmq == NULL || pTopicName == NULL) {
3,912✔
3030
    tqErrorC("invalid tmq handle, null");
×
3031
    code = TSDB_CODE_INVALID_PARA;
×
3032
    goto end;
×
3033
  }
3034

3035
  int32_t accId = tmq->pTscObj->acctId;
3,912✔
3036
  char    tname[TSDB_TOPIC_FNAME_LEN] = {0};
3,912✔
3037
  (void)snprintf(tname, TSDB_TOPIC_FNAME_LEN, "%d.%s", accId, pTopicName);
3,912✔
3038

3039
  taosWLockLatch(&tmq->lock);
3,912✔
3040
  SMqClientVg* pVg = NULL;
3,912✔
3041
  code = getClientVg(tmq, tname, vgId, &pVg);
3,912✔
3042
  if (code != 0) {
3,912✔
3043
    taosWUnLockLatch(&tmq->lock);
3,260✔
3044
    goto end;
3,260✔
3045
  }
3046

3047
  SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo;
652✔
3048
  code = checkWalRange(pOffsetInfo, offset);
652✔
3049
  if (code != 0) {
652✔
3050
    taosWUnLockLatch(&tmq->lock);
×
3051
    goto end;
×
3052
  }
3053
  taosWUnLockLatch(&tmq->lock);
652✔
3054

3055
  STqOffsetVal offsetVal = {.type = TMQ_OFFSET__LOG, .version = offset};
652✔
3056

3057
  code = asyncCommitOffset(tmq, tname, vgId, &offsetVal, cb, param);
652✔
3058

3059
  tqDebugC("consumer:0x%" PRIx64 " async send commit to vgId:%d, offset:%" PRId64 " code:%s", tmq->consumerId, vgId,
652✔
3060
          offset, tstrerror(code));
3061

3062
  end:
3,912✔
3063
  if (code != 0 && cb != NULL) {
3,912✔
3064
    if (code == TSDB_CODE_TMQ_SAME_COMMITTED_VALUE) code = TSDB_CODE_SUCCESS;
×
3065
    cb(tmq, code, param);
×
3066
  }
3067
}
3,912✔
3068

3069

3070
int32_t tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4, SReqResultInfo** pResInfo) {
115,498,856✔
3071
  if (res == NULL || pResInfo == NULL) {
115,498,856✔
3072
    return TSDB_CODE_INVALID_PARA;
×
3073
  }
3074
  SMqRspObj*  pRspObj = (SMqRspObj*)res;
115,498,909✔
3075
  SMqDataRsp* data = &pRspObj->dataRsp;
115,498,909✔
3076

3077
  pRspObj->resIter++;
115,498,909✔
3078
  if (pRspObj->resIter < data->blockNum) {
115,498,909✔
3079
    doFreeReqResultInfo(&pRspObj->resInfo);
102,936,129✔
3080
    SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(data->blockSchema, pRspObj->resIter);
102,936,182✔
3081
    if (pSW) {
102,936,129✔
3082
      TAOS_CHECK_RETURN(setResSchemaInfo(&pRspObj->resInfo, pSW->pSchema, pSW->nCols, NULL, false));
102,936,129✔
3083
    }
3084
    void*   pRetrieve = taosArrayGetP(data->blockData, pRspObj->resIter);
102,936,182✔
3085
    void*   rawData = NULL;
102,936,182✔
3086
    int64_t rows = 0;
102,936,182✔
3087
    int32_t precision = 0;
102,936,182✔
3088
    tmqGetRawDataRowsPrecisionFromRes(pRetrieve, &rawData, &rows, &precision);
102,936,182✔
3089

3090
    pRspObj->resInfo.pData = rawData;
102,935,856✔
3091
    pRspObj->resInfo.numOfRows = rows;
102,935,856✔
3092
    pRspObj->resInfo.current = 0;
102,935,856✔
3093
    pRspObj->resInfo.precision = precision;
102,935,856✔
3094

3095
    pRspObj->resInfo.totalRows += pRspObj->resInfo.numOfRows;
102,935,856✔
3096
    int32_t code = setResultDataPtr(&pRspObj->resInfo, convertUcs4, false);
102,936,182✔
3097
    if (code != 0) {
102,935,530✔
3098
      return code;
×
3099
    }
3100
    *pResInfo = &pRspObj->resInfo;
102,935,530✔
3101
    return code;
102,934,878✔
3102
  }
3103

3104
  return TSDB_CODE_TSC_INTERNAL_ERROR;
12,562,780✔
3105
}
3106

3107
static int32_t tmqGetWalInfoCb(void* param, SDataBuf* pMsg, int32_t code) {
706✔
3108
  if (param == NULL || pMsg == NULL) {
706✔
3109
    return code;
×
3110
  }
3111
  SMqVgWalInfoParam* pParam = param;
706✔
3112
  SMqVgCommon*       pCommon = pParam->pCommon;
706✔
3113

3114
  if (code != TSDB_CODE_SUCCESS) {
706✔
3115
    tqErrorC("consumer:0x%" PRIx64 " failed to get the wal info from vgId:%d for topic:%s", pCommon->consumerId,
×
3116
             pParam->vgId, pCommon->pTopicName);
3117

3118
  } else {
3119
    SMqDataRsp rsp = {0};
706✔
3120
    SDecoder   decoder = {0};
706✔
3121
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
706✔
3122
    code = tDecodeMqDataRsp(&decoder, &rsp);
706✔
3123
    tDecoderClear(&decoder);
705✔
3124
    if (code != 0) {
706✔
3125
      goto END;
×
3126
    }
3127

3128
    SMqRspHead*          pHead = pMsg->pData;
706✔
3129
    tmq_topic_assignment assignment = {.begin = pHead->walsver,
1,268✔
3130
        .end = pHead->walever + 1,
706✔
3131
        .currentOffset = rsp.rspOffset.version,
706✔
3132
        .vgId = pParam->vgId};
706✔
3133

3134
    (void)taosThreadMutexLock(&pCommon->mutex);
706✔
3135
    if (taosArrayPush(pCommon->pList, &assignment) == NULL) {
1,412✔
3136
      tqErrorC("consumer:0x%" PRIx64 " failed to push the wal info from vgId:%d for topic:%s", pCommon->consumerId,
×
3137
               pParam->vgId, pCommon->pTopicName);
3138
      code = TSDB_CODE_TSC_INTERNAL_ERROR;
×
3139
    }
3140
    (void)taosThreadMutexUnlock(&pCommon->mutex);
706✔
3141
  }
3142

3143
  END:
706✔
3144
  pCommon->code = code;
706✔
3145
  int32_t total = atomic_add_fetch_32(&pCommon->numOfRsp, 1);
706✔
3146
  if (total == pParam->totalReq) {
706✔
3147
    if (tsem2_post(&pCommon->rsp) != 0) {
382✔
3148
      tqErrorC("failed to post semaphore in get wal cb");
×
3149
    }
3150
  }
3151

3152
  if (pMsg) {
706✔
3153
    taosMemoryFree(pMsg->pData);
706✔
3154
    taosMemoryFree(pMsg->pEpSet);
706✔
3155
  }
3156

3157
  return code;
706✔
3158
}
3159

3160
static void destroyCommonInfo(SMqVgCommon* pCommon) {
1,875✔
3161
  if (pCommon == NULL) {
1,875✔
3162
    return;
1,493✔
3163
  }
3164
  taosArrayDestroy(pCommon->pList);
382✔
3165
  pCommon->pList = NULL;
382✔
3166
  if(tsem2_destroy(&pCommon->rsp) != 0) {
382✔
3167
    tqErrorC("failed to destroy semaphore for topic:%s", pCommon->pTopicName);
×
3168
  }
3169
  taosMemoryFreeClear(pCommon->pTopicName);
382✔
3170
  (void)taosThreadMutexDestroy(&pCommon->mutex);
382✔
3171
  taosMemoryFree(pCommon);
382✔
3172
}
3173

3174
static bool isInSnapshotMode(int8_t type, bool useSnapshot) {
24,720✔
3175
  if ((type < TMQ_OFFSET__LOG && useSnapshot) || type > TMQ_OFFSET__LOG) {
24,720✔
3176
    return true;
×
3177
  }
3178
  return false;
24,720✔
3179
}
3180

3181
static int32_t tmCommittedCb(void* param, SDataBuf* pMsg, int32_t code) {
991✔
3182
  if (param == NULL) {
991✔
3183
    return code;
×
3184
  }
3185
  SMqCommittedParam* pParam = param;
991✔
3186

3187
  if (code != 0) {
991✔
3188
    goto end;
710✔
3189
  }
3190
  if (pMsg) {
281✔
3191
    SDecoder decoder = {0};
281✔
3192
    tDecoderInit(&decoder, (uint8_t*)pMsg->pData, pMsg->len);
281✔
3193
    int32_t err = tDecodeMqVgOffset(&decoder, &pParam->vgOffset);
281✔
3194
    if (err < 0) {
281✔
3195
      tOffsetDestroy(&pParam->vgOffset.offset);
×
3196
      code = err;
×
3197
      goto end;
×
3198
    }
3199
    tDecoderClear(&decoder);
281✔
3200
  }
3201

3202
  end:
×
3203
  if (pMsg) {
991✔
3204
    taosMemoryFree(pMsg->pData);
991✔
3205
    taosMemoryFree(pMsg->pEpSet);
991✔
3206
  }
3207
  pParam->code = code;
991✔
3208
  if (tsem2_post(&pParam->sem) != 0){
991✔
3209
    tqErrorC("failed to post semaphore in tmCommittedCb");
×
3210
  }
3211
  return code;
991✔
3212
}
3213

3214
int32_t getCommittedFromServer(tmq_t* tmq, char* tname, int32_t vgId, SEpSet* epSet, int64_t* committed) {
991✔
3215
  if (tmq == NULL || tname == NULL || epSet == NULL) {
991✔
3216
    return TSDB_CODE_INVALID_PARA;
×
3217
  }
3218
  int32_t     code = 0;
991✔
3219
  SMqVgOffset pOffset = {0};
991✔
3220

3221
  pOffset.consumerId = tmq->consumerId;
991✔
3222
  (void)snprintf(pOffset.offset.subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", tmq->groupId, TMQ_SEPARATOR, tname);
991✔
3223

3224
  int32_t len = 0;
991✔
3225
  tEncodeSize(tEncodeMqVgOffset, &pOffset, len, code);
991✔
3226
  if (code < 0) {
991✔
3227
    return TSDB_CODE_INVALID_PARA;
×
3228
  }
3229

3230
  void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
991✔
3231
  if (buf == NULL) {
991✔
3232
    return terrno;
×
3233
  }
3234

3235
  ((SMsgHead*)buf)->vgId = htonl(vgId);
991✔
3236

3237
  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
991✔
3238

3239
  SEncoder encoder = {0};
991✔
3240
  tEncoderInit(&encoder, abuf, len);
991✔
3241
  code = tEncodeMqVgOffset(&encoder, &pOffset);
991✔
3242
  if (code < 0) {
991✔
3243
    taosMemoryFree(buf);
×
3244
    tEncoderClear(&encoder);
×
3245
    return code;
×
3246
  }
3247
  tEncoderClear(&encoder);
991✔
3248

3249
  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
991✔
3250
  if (sendInfo == NULL) {
991✔
3251
    taosMemoryFree(buf);
×
3252
    return terrno;
×
3253
  }
3254

3255
  SMqCommittedParam* pParam = taosMemoryMalloc(sizeof(SMqCommittedParam));
991✔
3256
  if (pParam == NULL) {
991✔
3257
    taosMemoryFree(buf);
×
3258
    taosMemoryFree(sendInfo);
×
3259
    return terrno;
×
3260
  }
3261
  if (tsem2_init(&pParam->sem, 0, 0) != 0) {
991✔
3262
    taosMemoryFree(buf);
×
3263
    taosMemoryFree(sendInfo);
×
3264
    taosMemoryFree(pParam);
×
3265
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
3266
  }
3267

3268
  sendInfo->msgInfo = (SDataBuf){.pData = buf, .len = sizeof(SMsgHead) + len, .handle = NULL};
991✔
3269
  sendInfo->requestId = generateRequestId();
991✔
3270
  sendInfo->requestObjRefId = 0;
991✔
3271
  sendInfo->param = pParam;
991✔
3272
  sendInfo->fp = tmCommittedCb;
991✔
3273
  sendInfo->msgType = TDMT_VND_TMQ_VG_COMMITTEDINFO;
991✔
3274

3275
  code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, NULL, sendInfo);
991✔
3276
  if (code != 0) {
991✔
3277
    if(tsem2_destroy(&pParam->sem) != 0) {
×
3278
      tqErrorC("failed to destroy semaphore in get committed from server1");
×
3279
    }
3280
    taosMemoryFree(pParam);
×
3281
    return code;
×
3282
  }
3283

3284
  if (tsem2_wait(&pParam->sem) != 0){
991✔
3285
    tqErrorC("failed to wait semaphore in get committed from server");
×
3286
  }
3287
  code = pParam->code;
991✔
3288
  if (code == TSDB_CODE_SUCCESS) {
991✔
3289
    if (pParam->vgOffset.offset.val.type == TMQ_OFFSET__LOG) {
281✔
3290
      *committed = pParam->vgOffset.offset.val.version;
281✔
3291
    } else {
3292
      tOffsetDestroy(&pParam->vgOffset.offset);
×
3293
      code = TSDB_CODE_TMQ_SNAPSHOT_ERROR;
×
3294
    }
3295
  }
3296
  if(tsem2_destroy(&pParam->sem) != 0) {
991✔
3297
    tqErrorC("failed to destroy semaphore in get committed from server2");
×
3298
  }
3299
  taosMemoryFree(pParam);
991✔
3300

3301
  return code;
991✔
3302
}
3303

3304
int64_t tmq_position(tmq_t* tmq, const char* pTopicName, int32_t vgId) {
10,218✔
3305
  if (tmq == NULL || pTopicName == NULL) {
10,218✔
3306
    tqErrorC("invalid tmq handle, null");
×
3307
    return TSDB_CODE_INVALID_PARA;
×
3308
  }
3309

3310
  int32_t accId = tmq->pTscObj->acctId;
10,218✔
3311
  char    tname[TSDB_TOPIC_FNAME_LEN] = {0};
10,218✔
3312
  (void)snprintf(tname, TSDB_TOPIC_FNAME_LEN, "%d.%s", accId, pTopicName);
10,218✔
3313

3314
  taosWLockLatch(&tmq->lock);
10,218✔
3315

3316
  SMqClientVg* pVg = NULL;
10,218✔
3317
  int32_t      code = getClientVg(tmq, tname, vgId, &pVg);
10,218✔
3318
  if (code != 0) {
10,218✔
3319
    taosWUnLockLatch(&tmq->lock);
4,238✔
3320
    return code;
4,238✔
3321
  }
3322

3323
  SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo;
5,980✔
3324
  int32_t        type = pOffsetInfo->endOffset.type;
5,980✔
3325
  if (isInSnapshotMode(type, tmq->useSnapshot)) {
5,980✔
3326
    tqErrorC("consumer:0x%" PRIx64 " offset type:%d not wal version, position error", tmq->consumerId, type);
×
3327
    taosWUnLockLatch(&tmq->lock);
×
3328
    return TSDB_CODE_TMQ_SNAPSHOT_ERROR;
×
3329
  }
3330

3331
  code = checkWalRange(pOffsetInfo, -1);
5,980✔
3332
  if (code != 0) {
5,980✔
3333
    taosWUnLockLatch(&tmq->lock);
×
3334
    return code;
×
3335
  }
3336
  SEpSet  epSet = pVg->epSet;
5,980✔
3337
  int64_t begin = pVg->offsetInfo.walVerBegin;
5,980✔
3338
  int64_t end = pVg->offsetInfo.walVerEnd;
5,980✔
3339
  taosWUnLockLatch(&tmq->lock);
5,980✔
3340

3341
  int64_t position = 0;
5,980✔
3342
  if (type == TMQ_OFFSET__LOG) {
5,980✔
3343
    position = pOffsetInfo->endOffset.version;
5,699✔
3344
  } else if (type == TMQ_OFFSET__RESET_EARLIEST || type == TMQ_OFFSET__RESET_LATEST) {
281✔
3345
    code = getCommittedFromServer(tmq, tname, vgId, &epSet, &position);
281✔
3346
    if (code == TSDB_CODE_TMQ_NO_COMMITTED) {
281✔
3347
      if (type == TMQ_OFFSET__RESET_EARLIEST) {
×
3348
        position = begin;
×
3349
      } else if (type == TMQ_OFFSET__RESET_LATEST) {
×
3350
        position = end;
×
3351
      } else {
3352
        tqErrorC("consumer:0x%" PRIx64 " invalid offset type:%d", tmq->consumerId, type);
×
3353
        return TSDB_CODE_INTERNAL_ERROR;
×
3354
      }
3355
    } else if(code != 0) {
281✔
3356
      tqErrorC("consumer:0x%" PRIx64 " getCommittedFromServer error,%d", tmq->consumerId, code);
×
3357
      return code;
×
3358
    }
3359
  } else {
3360
    tqErrorC("consumer:0x%" PRIx64 " offset type:%d can not be reach here", tmq->consumerId, type);
×
3361
    return TSDB_CODE_INTERNAL_ERROR;
×
3362
  }
3363

3364
  tqDebugC("consumer:0x%" PRIx64 " tmq_position vgId:%d position:%" PRId64, tmq->consumerId, vgId, position);
5,980✔
3365
  return position;
5,980✔
3366
}
3367

3368
int64_t tmq_committed(tmq_t* tmq, const char* pTopicName, int32_t vgId) {
7,578✔
3369
  if (tmq == NULL || pTopicName == NULL) {
7,578✔
3370
    tqErrorC("invalid tmq handle, null");
×
3371
    return TSDB_CODE_INVALID_PARA;
×
3372
  }
3373

3374
  int32_t accId = tmq->pTscObj->acctId;
7,578✔
3375
  char    tname[TSDB_TOPIC_FNAME_LEN] = {0};
7,578✔
3376
  (void)snprintf(tname, TSDB_TOPIC_FNAME_LEN, "%d.%s", accId, pTopicName);
7,578✔
3377

3378
  taosWLockLatch(&tmq->lock);
7,578✔
3379

3380
  SMqClientVg* pVg = NULL;
7,578✔
3381
  int32_t      code = getClientVg(tmq, tname, vgId, &pVg);
7,578✔
3382
  if (code != 0) {
7,578✔
3383
    taosWUnLockLatch(&tmq->lock);
1,630✔
3384
    return code;
1,630✔
3385
  }
3386

3387
  SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo;
5,948✔
3388
  if (isInSnapshotMode(pOffsetInfo->endOffset.type, tmq->useSnapshot)) {
5,948✔
3389
    tqErrorC("consumer:0x%" PRIx64 " offset type:%d not wal version, committed error", tmq->consumerId,
×
3390
             pOffsetInfo->endOffset.type);
3391
    taosWUnLockLatch(&tmq->lock);
×
3392
    return TSDB_CODE_TMQ_SNAPSHOT_ERROR;
×
3393
  }
3394

3395
  if (isInSnapshotMode(pOffsetInfo->committedOffset.type, tmq->useSnapshot)) {
5,948✔
3396
    tqErrorC("consumer:0x%" PRIx64 " offset type:%d not wal version, committed error", tmq->consumerId,
×
3397
             pOffsetInfo->committedOffset.type);
3398
    taosWUnLockLatch(&tmq->lock);
×
3399
    return TSDB_CODE_TMQ_SNAPSHOT_ERROR;
×
3400
  }
3401

3402
  int64_t committed = 0;
5,948✔
3403
  if (pOffsetInfo->committedOffset.type == TMQ_OFFSET__LOG) {
5,948✔
3404
    committed = pOffsetInfo->committedOffset.version;
5,238✔
3405
    taosWUnLockLatch(&tmq->lock);
5,238✔
3406
    goto end;
5,238✔
3407
  }
3408
  SEpSet epSet = pVg->epSet;
710✔
3409
  taosWUnLockLatch(&tmq->lock);
710✔
3410

3411
  code = getCommittedFromServer(tmq, tname, vgId, &epSet, &committed);
710✔
3412
  if (code != 0) {
710✔
3413
    tqErrorC("consumer:0x%" PRIx64 " getCommittedFromServer error,%d", tmq->consumerId, code);
710✔
3414
    return code;
710✔
3415
  }
3416

3417
  end:
5,238✔
3418
  tqDebugC("consumer:0x%" PRIx64 " tmq_committed vgId:%d committed:%" PRId64, tmq->consumerId, vgId, committed);
5,238✔
3419
  return committed;
5,238✔
3420
}
3421

3422
int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_assignment** assignment,
7,743✔
3423
                                 int32_t* numOfAssignment) {
3424
  if (tmq == NULL || pTopicName == NULL || assignment == NULL || numOfAssignment == NULL) {
7,743✔
3425
    tqErrorC("invalid tmq handle, null");
5,868✔
3426
    return TSDB_CODE_INVALID_PARA;
5,868✔
3427
  }
3428
  *numOfAssignment = 0;
1,875✔
3429
  *assignment = NULL;
1,875✔
3430
  SMqVgCommon* pCommon = NULL;
1,875✔
3431

3432
  int32_t accId = tmq->pTscObj->acctId;
1,875✔
3433
  char    tname[TSDB_TOPIC_FNAME_LEN] = {0};
1,875✔
3434
  (void)snprintf(tname, TSDB_TOPIC_FNAME_LEN, "%d.%s", accId, pTopicName);
1,875✔
3435

3436
  taosWLockLatch(&tmq->lock);
1,875✔
3437

3438
  SMqClientTopic* pTopic = NULL;
1,875✔
3439
  int32_t         code = getTopicByName(tmq, tname, &pTopic);
1,875✔
3440
  if (code != 0) {
1,875✔
3441
    tqErrorC("consumer:0x%" PRIx64 " invalid topic name:%s", tmq->consumerId, pTopicName);
×
3442
    goto end;
×
3443
  }
3444

3445
  // in case of snapshot is opened, no valid offset will return
3446
  *numOfAssignment = taosArrayGetSize(pTopic->vgs);
1,875✔
3447
  for (int32_t j = 0; j < (*numOfAssignment); ++j) {
5,509✔
3448
    SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j);
3,634✔
3449
    if (pClientVg == NULL) {
3,634✔
3450
      continue;
×
3451
    }
3452
    int32_t type = pClientVg->offsetInfo.beginOffset.type;
3,634✔
3453
    if (isInSnapshotMode(type, tmq->useSnapshot)) {
3,634✔
3454
      tqErrorC("consumer:0x%" PRIx64 " offset type:%d not wal version, assignment not allowed", tmq->consumerId, type);
×
3455
      code = TSDB_CODE_TMQ_SNAPSHOT_ERROR;
×
3456
      goto end;
×
3457
    }
3458
  }
3459

3460
  *assignment = taosMemoryCalloc(*numOfAssignment, sizeof(tmq_topic_assignment));
1,875✔
3461
  if (*assignment == NULL) {
1,875✔
3462
    tqErrorC("consumer:0x%" PRIx64 " failed to malloc buffer, size:%" PRIzu, tmq->consumerId,
×
3463
             (*numOfAssignment) * sizeof(tmq_topic_assignment));
3464
    code = terrno;
×
3465
    goto end;
×
3466
  }
3467

3468
  bool needFetch = false;
1,875✔
3469

3470
  for (int32_t j = 0; j < (*numOfAssignment); ++j) {
5,084✔
3471
    SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j);
3,591✔
3472
    if (pClientVg == NULL) {
3,591✔
3473
      continue;
×
3474
    }
3475
    if (pClientVg->offsetInfo.beginOffset.type != TMQ_OFFSET__LOG) {
3,591✔
3476
      needFetch = true;
382✔
3477
      break;
382✔
3478
    }
3479

3480
    tmq_topic_assignment* pAssignment = &(*assignment)[j];
3,209✔
3481
    pAssignment->currentOffset = pClientVg->offsetInfo.beginOffset.version;
3,209✔
3482
    pAssignment->begin = pClientVg->offsetInfo.walVerBegin;
3,209✔
3483
    pAssignment->end = pClientVg->offsetInfo.walVerEnd;
3,209✔
3484
    pAssignment->vgId = pClientVg->vgId;
3,209✔
3485
    tqDebugC("consumer:0x%" PRIx64 " get assignment from local:%d->%" PRId64, tmq->consumerId, pAssignment->vgId,
3,209✔
3486
            pAssignment->currentOffset);
3487
  }
3488

3489
  if (needFetch) {
1,875✔
3490
    pCommon = taosMemoryCalloc(1, sizeof(SMqVgCommon));
382✔
3491
    if (pCommon == NULL) {
382✔
3492
      code = terrno;
×
3493
      goto end;
×
3494
    }
3495

3496
    pCommon->pList = taosArrayInit(4, sizeof(tmq_topic_assignment));
382✔
3497
    if (pCommon->pList == NULL) {
382✔
3498
      code = terrno;
×
3499
      goto end;
×
3500
    }
3501

3502
    code = tsem2_init(&pCommon->rsp, 0, 0);
382✔
3503
    if (code != 0) {
382✔
3504
      goto end;
×
3505
    }
3506
    (void)taosThreadMutexInit(&pCommon->mutex, 0);
382✔
3507
    pCommon->pTopicName = taosStrdup(pTopic->topicName);
382✔
3508
    if (pCommon->pTopicName == NULL) {
382✔
3509
      code = terrno;
×
3510
      goto end;
×
3511
    }
3512
    pCommon->consumerId = tmq->consumerId;
382✔
3513
    for (int32_t i = 0; i < (*numOfAssignment); ++i) {
1,088✔
3514
      SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i);
706✔
3515
      if (pClientVg == NULL) {
706✔
3516
        continue;
×
3517
      }
3518
      SMqVgWalInfoParam* pParam = taosMemoryMalloc(sizeof(SMqVgWalInfoParam));
706✔
3519
      if (pParam == NULL) {
706✔
3520
        code = terrno;
×
3521
        goto end;
×
3522
      }
3523

3524
      pParam->epoch = atomic_load_32(&tmq->epoch);
706✔
3525
      pParam->vgId = pClientVg->vgId;
706✔
3526
      pParam->totalReq = *numOfAssignment;
706✔
3527
      pParam->pCommon = pCommon;
706✔
3528

3529
      SMqPollReq req = {0};
706✔
3530
      tmqBuildConsumeReqImpl(&req, tmq, pTopic, pClientVg);
706✔
3531
      req.reqOffset = pClientVg->offsetInfo.beginOffset;
706✔
3532

3533
      int32_t msgSize = tSerializeSMqPollReq(NULL, 0, &req);
706✔
3534
      if (msgSize < 0) {
706✔
3535
        taosMemoryFree(pParam);
×
3536
        code = msgSize;
×
3537
        goto end;
×
3538
      }
3539

3540
      char* msg = taosMemoryCalloc(1, msgSize);
706✔
3541
      if (NULL == msg) {
706✔
3542
        taosMemoryFree(pParam);
×
3543
        code = terrno;
×
3544
        goto end;
×
3545
      }
3546

3547
      msgSize = tSerializeSMqPollReq(msg, msgSize, &req);
706✔
3548
      if (msgSize < 0) {
706✔
3549
        taosMemoryFree(msg);
×
3550
        taosMemoryFree(pParam);
×
3551
        code = msgSize;
×
3552
        goto end;
×
3553
      }
3554

3555
      SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
706✔
3556
      if (sendInfo == NULL) {
706✔
3557
        taosMemoryFree(pParam);
×
3558
        taosMemoryFree(msg);
×
3559
        code = terrno;
×
3560
        goto end;
×
3561
      }
3562

3563
      sendInfo->msgInfo = (SDataBuf){.pData = msg, .len = msgSize, .handle = NULL};
706✔
3564
      sendInfo->requestId = req.reqId;
706✔
3565
      sendInfo->requestObjRefId = 0;
706✔
3566
      sendInfo->param = pParam;
706✔
3567
      sendInfo->paramFreeFp = taosAutoMemoryFree;
706✔
3568
      sendInfo->fp = tmqGetWalInfoCb;
706✔
3569
      sendInfo->msgType = TDMT_VND_TMQ_VG_WALINFO;
706✔
3570

3571
      // int64_t transporterId = 0;
3572
      char offsetFormatBuf[TSDB_OFFSET_LEN] = {0};
706✔
3573
      tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pClientVg->offsetInfo.beginOffset);
706✔
3574

3575
      tqDebugC("consumer:0x%" PRIx64 " %s retrieve wal info vgId:%d, epoch %d, req:%s, QID:0x%" PRIx64, tmq->consumerId,
706✔
3576
              pTopic->topicName, pClientVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId);
3577
      code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pClientVg->epSet, NULL, sendInfo);
706✔
3578
      if (code != 0) {
706✔
3579
        goto end;
×
3580
      }
3581
    }
3582

3583
    if (tsem2_wait(&pCommon->rsp) != 0){
382✔
3584
      tqErrorC("consumer:0x%" PRIx64 " failed to wait sem in get assignment", tmq->consumerId);
×
3585
    }
3586
    code = pCommon->code;
382✔
3587

3588
    if (code != TSDB_CODE_SUCCESS) {
382✔
3589
      goto end;
×
3590
    }
3591
    int32_t num = taosArrayGetSize(pCommon->pList);
382✔
3592
    for (int32_t i = 0; i < num; ++i) {
1,088✔
3593
      (*assignment)[i] = *(tmq_topic_assignment*)taosArrayGet(pCommon->pList, i);
706✔
3594
    }
3595
    *numOfAssignment = num;
382✔
3596

3597
    for (int32_t j = 0; j < (*numOfAssignment); ++j) {
1,088✔
3598
      tmq_topic_assignment* p = &(*assignment)[j];
706✔
3599

3600
      for (int32_t i = 0; i < taosArrayGetSize(pTopic->vgs); ++i) {
2,060✔
3601
        SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i);
1,354✔
3602
        if (pClientVg == NULL) {
1,354✔
3603
          continue;
×
3604
        }
3605
        if (pClientVg->vgId != p->vgId) {
1,354✔
3606
          continue;
648✔
3607
        }
3608

3609
        SVgOffsetInfo* pOffsetInfo = &pClientVg->offsetInfo;
706✔
3610
        tqDebugC("consumer:0x%" PRIx64 " %s vgId:%d offset is update to:%" PRId64, tmq->consumerId, pTopic->topicName,
706✔
3611
                p->vgId, p->currentOffset);
3612

3613
        pOffsetInfo->walVerBegin = p->begin;
706✔
3614
        pOffsetInfo->walVerEnd = p->end;
706✔
3615
      }
3616
    }
3617
  }
3618

3619
  end:
1,875✔
3620
  if (code != TSDB_CODE_SUCCESS) {
1,875✔
3621
    taosMemoryFree(*assignment);
×
3622
    *assignment = NULL;
×
3623
    *numOfAssignment = 0;
×
3624
  }
3625
  destroyCommonInfo(pCommon);
1,875✔
3626
  taosWUnLockLatch(&tmq->lock);
1,875✔
3627
  return code;
1,875✔
3628
}
3629

3630
void tmq_free_assignment(tmq_topic_assignment* pAssignment) {
10,900✔
3631
  if (pAssignment == NULL) {
10,900✔
3632
    return;
9,128✔
3633
  }
3634

3635
  taosMemoryFree(pAssignment);
1,772✔
3636
}
3637

3638
static int32_t tmqSeekCb(void* param, SDataBuf* pMsg, int32_t code) {
3,152✔
3639
  if (pMsg) {
3,152✔
3640
    taosMemoryFree(pMsg->pData);
3,152✔
3641
    taosMemoryFree(pMsg->pEpSet);
3,152✔
3642
  }
3643
  if (param == NULL) {
3,152✔
3644
    return code;
×
3645
  }
3646
  SMqSeekParam* pParam = param;
3,152✔
3647
  pParam->code = code;
3,152✔
3648
  if (tsem2_post(&pParam->sem) != 0){
3,152✔
3649
    tqErrorC("failed to post sem in tmqSeekCb");
×
3650
  }
3651
  return 0;
3,152✔
3652
}
3653

3654
// seek interface have to send msg to server to cancel push handle if needed, because consumer may be in wait status if
3655
// there is no data to poll
3656
int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_t offset) {
5,608✔
3657
  if (tmq == NULL || pTopicName == NULL) {
5,608✔
3658
    tqErrorC("invalid tmq handle, null");
×
3659
    return TSDB_CODE_INVALID_PARA;
×
3660
  }
3661

3662
  int32_t accId = tmq->pTscObj->acctId;
5,608✔
3663
  char    tname[TSDB_TOPIC_FNAME_LEN] = {0};
5,608✔
3664
  (void)snprintf(tname, TSDB_TOPIC_FNAME_LEN, "%d.%s", accId, pTopicName);
5,608✔
3665

3666
  taosWLockLatch(&tmq->lock);
5,608✔
3667

3668
  SMqClientVg* pVg = NULL;
5,608✔
3669
  int32_t      code = getClientVg(tmq, tname, vgId, &pVg);
5,608✔
3670
  if (code != 0) {
5,608✔
3671
    taosWUnLockLatch(&tmq->lock);
2,398✔
3672
    return code;
2,398✔
3673
  }
3674

3675
  SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo;
3,210✔
3676

3677
  int32_t type = pOffsetInfo->endOffset.type;
3,210✔
3678
  if (isInSnapshotMode(type, tmq->useSnapshot)) {
3,210✔
3679
    tqErrorC("consumer:0x%" PRIx64 " offset type:%d not wal version, seek not allowed", tmq->consumerId, type);
×
3680
    taosWUnLockLatch(&tmq->lock);
×
3681
    return TSDB_CODE_TMQ_SNAPSHOT_ERROR;
×
3682
  }
3683

3684
  code = checkWalRange(pOffsetInfo, offset);
3,210✔
3685
  if (code != 0) {
3,210✔
3686
    taosWUnLockLatch(&tmq->lock);
58✔
3687
    return code;
58✔
3688
  }
3689

3690
  tqDebugC("consumer:0x%" PRIx64 " seek to %" PRId64 " on vgId:%d", tmq->consumerId, offset, vgId);
3,152✔
3691
  // update the offset, and then commit to vnode
3692
  pOffsetInfo->endOffset.type = TMQ_OFFSET__LOG;
3,152✔
3693
  pOffsetInfo->endOffset.version = offset;
3,152✔
3694
  pOffsetInfo->beginOffset = pOffsetInfo->endOffset;
3,152✔
3695
  pVg->seekUpdated = true;
3,152✔
3696
  SEpSet epSet = pVg->epSet;
3,152✔
3697
  taosWUnLockLatch(&tmq->lock);
3,152✔
3698

3699
  SMqSeekReq req = {0};
3,152✔
3700
  (void)snprintf(req.subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s:%s", tmq->groupId, tname);
3,152✔
3701
  req.head.vgId = vgId;
3,152✔
3702
  req.consumerId = tmq->consumerId;
3,152✔
3703

3704
  int32_t msgSize = tSerializeSMqSeekReq(NULL, 0, &req);
3,152✔
3705
  if (msgSize < 0) {
3,152✔
3706
    return TSDB_CODE_PAR_INTERNAL_ERROR;
×
3707
  }
3708

3709
  char* msg = taosMemoryCalloc(1, msgSize);
3,152✔
3710
  if (NULL == msg) {
3,152✔
3711
    return terrno;
×
3712
  }
3713

3714
  if (tSerializeSMqSeekReq(msg, msgSize, &req) < 0) {
3,152✔
3715
    taosMemoryFree(msg);
×
3716
    return TSDB_CODE_PAR_INTERNAL_ERROR;
×
3717
  }
3718

3719
  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
3,152✔
3720
  if (sendInfo == NULL) {
3,152✔
3721
    taosMemoryFree(msg);
×
3722
    return terrno;
×
3723
  }
3724

3725
  SMqSeekParam* pParam = taosMemoryMalloc(sizeof(SMqSeekParam));
3,152✔
3726
  if (pParam == NULL) {
3,152✔
3727
    taosMemoryFree(msg);
×
3728
    taosMemoryFree(sendInfo);
×
3729
    return terrno;
×
3730
  }
3731
  if (tsem2_init(&pParam->sem, 0, 0) != 0) {
3,152✔
3732
    taosMemoryFree(msg);
×
3733
    taosMemoryFree(sendInfo);
×
3734
    taosMemoryFree(pParam);
×
3735
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
3736
  }
3737

3738
  sendInfo->msgInfo = (SDataBuf){.pData = msg, .len = msgSize, .handle = NULL};
3,152✔
3739
  sendInfo->requestId = generateRequestId();
3,152✔
3740
  sendInfo->requestObjRefId = 0;
3,152✔
3741
  sendInfo->param = pParam;
3,152✔
3742
  sendInfo->fp = tmqSeekCb;
3,152✔
3743
  sendInfo->msgType = TDMT_VND_TMQ_SEEK;
3,152✔
3744

3745
  code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo);
3,152✔
3746
  if (code != 0) {
3,152✔
3747
    if(tsem2_destroy(&pParam->sem) != 0) {
×
3748
      tqErrorC("consumer:0x%" PRIx64 "destroy rsp sem failed in seek offset", tmq->consumerId);
×
3749
    }
3750
    taosMemoryFree(pParam);
×
3751
    return code;
×
3752
  }
3753

3754
  if (tsem2_wait(&pParam->sem) != 0){
3,152✔
3755
    tqErrorC("consumer:0x%" PRIx64 "wait rsp sem failed in seek offset", tmq->consumerId);
×
3756
  }
3757
  code = pParam->code;
3,152✔
3758
  if(tsem2_destroy(&pParam->sem) != 0) {
3,152✔
3759
    tqErrorC("consumer:0x%" PRIx64 "destroy rsp sem failed in seek offset", tmq->consumerId);
×
3760
  }
3761
  taosMemoryFree(pParam);
3,152✔
3762

3763
  tqDebugC("consumer:0x%" PRIx64 "send seek to vgId:%d, return code:%s", tmq->consumerId, vgId, tstrerror(code));
3,152✔
3764

3765
  return code;
3,152✔
3766
}
3767

3768
TAOS* tmq_get_connect(tmq_t* tmq) {
6,194✔
3769
  if (tmq && tmq->pTscObj) {
6,194✔
3770
    return (TAOS*)(&(tmq->pTscObj->id));
6,194✔
3771
  }
3772
  return NULL;
×
3773
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc