• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3558

17 Dec 2024 06:05AM UTC coverage: 59.778% (+1.6%) from 58.204%
#3558

push

travis-ci

web-flow
Merge pull request #29179 from taosdata/merge/mainto3.0

merge: form main to 3.0 branch

132787 of 287595 branches covered (46.17%)

Branch coverage included in aggregate %.

104 of 191 new or added lines in 5 files covered. (54.45%)

6085 existing lines in 168 files now uncovered.

209348 of 284746 relevant lines covered (73.52%)

8164844.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

57.44
/source/client/src/clientTmq.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "cJSON.h"
17
#include "clientInt.h"
18
#include "clientLog.h"
19
#include "parser.h"
20
#include "tdatablock.h"
21
#include "tdef.h"
22
#include "tglobal.h"
23
#include "tqueue.h"
24
#include "tref.h"
25
#include "ttimer.h"
26

27
#define tqErrorC(...) do { if (cDebugFlag & DEBUG_ERROR || tqClientDebugFlag & DEBUG_ERROR) { taosPrintLog("TQ  ERROR ", DEBUG_ERROR, tqClientDebugFlag|cDebugFlag, __VA_ARGS__); }}     while(0)
28
#define tqInfoC(...)  do { if (cDebugFlag & DEBUG_INFO || tqClientDebugFlag & DEBUG_INFO)  { taosPrintLog("TQ  ", DEBUG_INFO, tqClientDebugFlag|cDebugFlag, __VA_ARGS__); }}            while(0)
29
#define tqDebugC(...) do { if (cDebugFlag & DEBUG_DEBUG || tqClientDebugFlag & DEBUG_DEBUG) { taosPrintLog("TQ  ", DEBUG_DEBUG, tqClientDebugFlag|cDebugFlag, __VA_ARGS__); }} while(0)
30

31
#define EMPTY_BLOCK_POLL_IDLE_DURATION 10
32
#define DEFAULT_AUTO_COMMIT_INTERVAL   5000
33
#define DEFAULT_HEARTBEAT_INTERVAL     3000
34
#define DEFAULT_ASKEP_INTERVAL         1000
35
#define DEFAULT_COMMIT_CNT             1
36
#define SUBSCRIBE_RETRY_MAX_COUNT      240
37
#define SUBSCRIBE_RETRY_INTERVAL       500
38

39

40
#define SET_ERROR_MSG_TMQ(MSG) \
41
  if (errstr != NULL && errstrLen > 0) (void)snprintf(errstr, errstrLen, MSG);
42

43
#define PROCESS_POLL_RSP(FUNC,DATA) \
44
  SDecoder decoder = {0}; \
45
  tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead)); \
46
  if (FUNC(&decoder, DATA) < 0) { \
47
    tDecoderClear(&decoder); \
48
    code = terrno; \
49
    goto END;\
50
  }\
51
  tDecoderClear(&decoder);\
52
  (void)memcpy(DATA, pMsg->pData, sizeof(SMqRspHead));
53

54
#define DELETE_POLL_RSP(FUNC,DATA) \
55
  SMqPollRspWrapper* pRsp = &rspWrapper->pollRsp;\
56
  taosMemoryFreeClear(pRsp->pEpset);\
57
  FUNC(DATA);
58

59
enum {
60
  TMQ_VG_STATUS__IDLE = 0,
61
  TMQ_VG_STATUS__WAIT,
62
};
63

64
enum {
65
  TMQ_CONSUMER_STATUS__INIT = 0,
66
  TMQ_CONSUMER_STATUS__READY,
67
  TMQ_CONSUMER_STATUS__CLOSED,
68
};
69

70
enum {
71
  TMQ_DELAYED_TASK__ASK_EP = 1,
72
  TMQ_DELAYED_TASK__COMMIT,
73
};
74

75
typedef struct {
76
  tmr_h   timer;
77
  int32_t rsetId;
78
} SMqMgmt;
79

80
struct tmq_list_t {
81
  SArray container;
82
};
83

84
struct tmq_conf_t {
85
  char           clientId[TSDB_CLIENT_ID_LEN];
86
  char           groupId[TSDB_CGROUP_LEN];
87
  int8_t         autoCommit;
88
  int8_t         resetOffset;
89
  int8_t         withTbName;
90
  int8_t         snapEnable;
91
  int8_t         replayEnable;
92
  int8_t         sourceExcluded;  // do not consume, bit
93
  uint16_t       port;
94
  int32_t        autoCommitInterval;
95
  int32_t        sessionTimeoutMs;
96
  int32_t        heartBeatIntervalMs;
97
  int32_t        maxPollIntervalMs;
98
  char*          ip;
99
  char*          user;
100
  char*          pass;
101
  tmq_commit_cb* commitCb;
102
  void*          commitCbUserParam;
103
  int8_t         enableBatchMeta;
104
};
105

106
struct tmq_t {
107
  int64_t        refId;
108
  char           groupId[TSDB_CGROUP_LEN];
109
  char           clientId[TSDB_CLIENT_ID_LEN];
110
  char           user[TSDB_USER_LEN];
111
  char           fqdn[TSDB_FQDN_LEN];
112
  int8_t         withTbName;
113
  int8_t         useSnapshot;
114
  int8_t         autoCommit;
115
  int32_t        autoCommitInterval;
116
  int32_t        sessionTimeoutMs;
117
  int32_t        heartBeatIntervalMs;
118
  int32_t        maxPollIntervalMs;
119
  int8_t         resetOffsetCfg;
120
  int8_t         replayEnable;
121
  int8_t         sourceExcluded;  // do not consume, bit
122
  int64_t        consumerId;
123
  tmq_commit_cb* commitCb;
124
  void*          commitCbUserParam;
125
  int8_t         enableBatchMeta;
126

127
  // status
128
  SRWLatch lock;
129
  int8_t   status;
130
  int32_t  epoch;
131
  // poll info
132
  int64_t pollCnt;
133
  int64_t totalRows;
134
  int8_t  pollFlag;
135

136
  // timer
137
  tmr_h       hbLiveTimer;
138
  tmr_h       epTimer;
139
  tmr_h       commitTimer;
140
  STscObj*    pTscObj;       // connection
141
  SArray*     clientTopics;  // SArray<SMqClientTopic>
142
  STaosQueue* mqueue;        // queue of rsp
143
  STaosQall*  qall;
144
  STaosQueue* delayedTask;  // delayed task queue for heartbeat and auto commit
145
  tsem2_t     rspSem;
146
};
147

148
typedef struct {
149
  int32_t code;
150
  tsem2_t sem;
151
} SAskEpInfo;
152

153
typedef struct {
154
  STqOffsetVal committedOffset;
155
  STqOffsetVal endOffset;    // the last version in TAOS_RES + 1
156
  STqOffsetVal beginOffset;  // the first version in TAOS_RES
157
  int64_t      walVerBegin;
158
  int64_t      walVerEnd;
159
} SVgOffsetInfo;
160

161
// Client-side state for a single vgroup; SMqClientTopic stores these in its `vgs` array.
typedef struct {
162
  int64_t       pollCnt;
163
  int64_t       numOfRows;
164
  SVgOffsetInfo offsetInfo;
165
  int32_t       vgId;
166
  int32_t       vgStatus;
167
  int32_t       vgSkipCnt;            // here used to mark the slow vgroups
168
  int64_t       emptyBlockReceiveTs;  // once empty block is received, idle for ignoreCnt then start to poll data
169
  int64_t       blockReceiveTs;       // NOTE(review): presumably the time a block was last received; the earlier text duplicated the emptyBlockReceiveTs comment - confirm
170
  int64_t       blockSleepForReplay;  // NOTE(review): presumably a delay used in replay mode; the earlier text duplicated the emptyBlockReceiveTs comment - confirm
171
  bool          seekUpdated;          // offset is updated by seek operator, therefore, not update by vnode rsp.
172
  SEpSet        epSet;
173
} SMqClientVg;
174

175
typedef struct {
176
  char           topicName[TSDB_TOPIC_FNAME_LEN];
177
  char           db[TSDB_DB_FNAME_LEN];
178
  SArray*        vgs;  // SArray<SMqClientVg>
179
  SSchemaWrapper schema;
180
  int8_t         noPrivilege;
181
} SMqClientTopic;
182

183
typedef struct {
184
  int32_t         vgId;
185
  char            topicName[TSDB_TOPIC_FNAME_LEN];
186
  SMqClientTopic* topicHandle;
187
  uint64_t        reqId;
188
  SEpSet*         pEpset;
189
  union {
190
    struct{
191
      SMqRspHead   head;
192
      STqOffsetVal rspOffset;
193
    };
194
    SMqDataRsp      dataRsp;
195
    SMqMetaRsp      metaRsp;
196
    SMqBatchMetaRsp batchMetaRsp;
197
  };
198
} SMqPollRspWrapper;
199

200
typedef struct {
201
  int32_t code;
202
  int8_t  tmqRspType;
203
  int32_t epoch;
204
  union{
205
    SMqPollRspWrapper pollRsp;
206
    SMqAskEpRsp       epRsp;
207
  };
208
} SMqRspWrapper;
209

210
typedef struct {
211
  tsem2_t rspSem;
212
  int32_t rspErr;
213
} SMqSubscribeCbParam;
214

215
typedef struct {
216
  int64_t refId;
217
  bool    sync;
218
  void*   pParam;
219
} SMqAskEpCbParam;
220

221
typedef struct {
222
  int64_t  refId;
223
  char     topicName[TSDB_TOPIC_FNAME_LEN];
224
  int32_t  vgId;
225
  uint64_t requestId;  // request id for debug purpose
226
} SMqPollCbParam;
227

228
typedef struct {
229
  tsem2_t       rsp;
230
  int32_t       numOfRsp;
231
  SArray*       pList;
232
  TdThreadMutex mutex;
233
  int64_t       consumerId;
234
  char*         pTopicName;
235
  int32_t       code;
236
} SMqVgCommon;
237

238
typedef struct {
239
  tsem2_t sem;
240
  int32_t code;
241
} SMqSeekParam;
242

243
typedef struct {
244
  tsem2_t     sem;
245
  int32_t     code;
246
  SMqVgOffset vgOffset;
247
} SMqCommittedParam;
248

249
typedef struct {
250
  int32_t      vgId;
251
  int32_t      epoch;
252
  int32_t      totalReq;
253
  SMqVgCommon* pCommon;
254
} SMqVgWalInfoParam;
255

256
typedef struct {
257
  int64_t        refId;
258
  int32_t        epoch;
259
  int32_t        waitingRspNum;
260
  int32_t        code;
261
  tmq_commit_cb* callbackFn;
262
  void*          userParam;
263
} SMqCommitCbParamSet;
264

265
typedef struct {
266
  SMqCommitCbParamSet* params;
267
  char                 topicName[TSDB_TOPIC_FNAME_LEN];
268
  int32_t              vgId;
269
  int64_t              consumerId;
270
} SMqCommitCbParam;
271

272
typedef struct {
273
  tsem2_t sem;
274
  int32_t code;
275
} SSyncCommitInfo;
276

277
typedef struct {
278
  STqOffsetVal currentOffset;
279
  STqOffsetVal commitOffset;
280
  STqOffsetVal seekOffset;
281
  int64_t      numOfRows;
282
  int32_t      vgStatus;
283
} SVgroupSaveInfo;
284

285
static   TdThreadOnce   tmqInit = PTHREAD_ONCE_INIT;  // initialize only once
286
volatile int32_t        tmqInitRes = 0;               // initialize rsp code
287
static   SMqMgmt        tmqMgmt = {0};
288

289
tmq_conf_t* tmq_conf_new() {
410✔
290
  tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
410!
291
  if (conf == NULL) {
412!
292
    return conf;
×
293
  }
294

295
  conf->withTbName = false;
412✔
296
  conf->autoCommit = true;
412✔
297
  conf->autoCommitInterval = DEFAULT_AUTO_COMMIT_INTERVAL;
412✔
298
  conf->resetOffset = TMQ_OFFSET__RESET_LATEST;
412✔
299
  conf->enableBatchMeta = false;
412✔
300
  conf->heartBeatIntervalMs = DEFAULT_HEARTBEAT_INTERVAL;
412✔
301
  conf->maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL;
412✔
302
  conf->sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT;
412✔
303

304
  return conf;
412✔
305
}
306

307
// Frees a configuration object together with the ip/user/pass strings it owns.
// A NULL conf is ignored.
void tmq_conf_destroy(tmq_conf_t* conf) {
  if (conf == NULL) {
    return;
  }

  if (conf->ip != NULL) {
    taosMemoryFree(conf->ip);
  }
  if (conf->user != NULL) {
    taosMemoryFree(conf->user);
  }
  if (conf->pass != NULL) {
    taosMemoryFree(conf->pass);
  }

  taosMemoryFree(conf);
}
412✔
321

322
tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
2,937✔
323
  int32_t code = 0;
2,937✔
324
  if (conf == NULL || key == NULL || value == NULL) {
2,937!
325
    tqErrorC("tmq_conf_set null, conf:%p key:%p value:%p", conf, key, value);
×
326
    return TMQ_CONF_INVALID;
×
327
  }
328
  if (strcasecmp(key, "group.id") == 0) {
2,937✔
329
    tstrncpy(conf->groupId, value, TSDB_CGROUP_LEN);
412✔
330
    return TMQ_CONF_OK;
412✔
331
  }
332

333
  if (strcasecmp(key, "client.id") == 0) {
2,525✔
334
    tstrncpy(conf->clientId, value, TSDB_CLIENT_ID_LEN);
54✔
335
    return TMQ_CONF_OK;
54✔
336
  }
337

338
  if (strcasecmp(key, "enable.auto.commit") == 0) {
2,471✔
339
    if (strcasecmp(value, "true") == 0) {
400✔
340
      conf->autoCommit = true;
185✔
341
      return TMQ_CONF_OK;
185✔
342
    } else if (strcasecmp(value, "false") == 0) {
215!
343
      conf->autoCommit = false;
215✔
344
      return TMQ_CONF_OK;
215✔
345
    } else {
346
      tqErrorC("invalid value for enable.auto.commit: %s", value);
×
347
      return TMQ_CONF_INVALID;
×
348
    }
349
  }
350

351
  if (strcasecmp(key, "auto.commit.interval.ms") == 0) {
2,071✔
352
    int64_t tmp;
353
    code = taosStr2int64(value, &tmp);
250✔
354
    if (tmp < 0 || code != 0) {
250!
355
      tqErrorC("invalid value for auto.commit.interval.ms: %s", value);
×
356
      return TMQ_CONF_INVALID;
×
357
    }
358
    conf->autoCommitInterval = (tmp > INT32_MAX ? INT32_MAX : tmp);
250✔
359
    return TMQ_CONF_OK;
250✔
360
  }
361

362
  if (strcasecmp(key, "session.timeout.ms") == 0) {
1,821!
363
    int64_t tmp;
UNCOV
364
    code = taosStr2int64(value, &tmp);
×
UNCOV
365
    if (tmp < 6000 || tmp > 1800000 || code != 0) {
×
366
      tqErrorC("invalid value for session.timeout.ms: %s", value);
×
367
      return TMQ_CONF_INVALID;
×
368
    }
UNCOV
369
    conf->sessionTimeoutMs = tmp;
×
UNCOV
370
    return TMQ_CONF_OK;
×
371
  }
372

373
  if (strcasecmp(key, "heartbeat.interval.ms") == 0) {
1,821!
374
    int64_t tmp;
375
    code = taosStr2int64(value, &tmp);
×
376
    if (tmp < 1000 || tmp >= conf->sessionTimeoutMs || code != 0) {
×
377
      tqErrorC("invalid value for heartbeat.interval.ms: %s", value);
×
378
      return TMQ_CONF_INVALID;
×
379
    }
380
    conf->heartBeatIntervalMs = tmp;
×
381
    return TMQ_CONF_OK;
×
382
  }
383

384
  if (strcasecmp(key, "max.poll.interval.ms") == 0) {
1,821!
385
    int32_t tmp;
UNCOV
386
    code = taosStr2int32(value, &tmp);
×
UNCOV
387
    if (tmp < 1000 || code != 0) {
×
388
      tqErrorC("invalid value for max.poll.interval.ms: %s", value);
×
389
      return TMQ_CONF_INVALID;
×
390
    }
UNCOV
391
    conf->maxPollIntervalMs = tmp;
×
UNCOV
392
    return TMQ_CONF_OK;
×
393
  }
394

395
  if (strcasecmp(key, "auto.offset.reset") == 0) {
1,821✔
396
    if (strcasecmp(value, "none") == 0) {
412✔
397
      conf->resetOffset = TMQ_OFFSET__RESET_NONE;
11✔
398
      return TMQ_CONF_OK;
11✔
399
    } else if (strcasecmp(value, "earliest") == 0) {
401✔
400
      conf->resetOffset = TMQ_OFFSET__RESET_EARLIEST;
379✔
401
      return TMQ_CONF_OK;
379✔
402
    } else if (strcasecmp(value, "latest") == 0) {
22!
403
      conf->resetOffset = TMQ_OFFSET__RESET_LATEST;
22✔
404
      return TMQ_CONF_OK;
22✔
405
    } else {
406
      tqErrorC("invalid value for auto.offset.reset: %s", value);
×
407
      return TMQ_CONF_INVALID;
×
408
    }
409
  }
410

411
  if (strcasecmp(key, "msg.with.table.name") == 0) {
1,409✔
412
    if (strcasecmp(value, "true") == 0) {
390✔
413
      conf->withTbName = true;
370✔
414
      return TMQ_CONF_OK;
370✔
415
    } else if (strcasecmp(value, "false") == 0) {
20!
416
      conf->withTbName = false;
20✔
417
      return TMQ_CONF_OK;
20✔
418
    } else {
419
      tqErrorC("invalid value for msg.with.table.name: %s", value);
×
420
      return TMQ_CONF_INVALID;
×
421
    }
422
  }
423

424
  if (strcasecmp(key, "experimental.snapshot.enable") == 0) {
1,019✔
425
    if (strcasecmp(value, "true") == 0) {
145✔
426
      conf->snapEnable = true;
114✔
427
      return TMQ_CONF_OK;
114✔
428
    } else if (strcasecmp(value, "false") == 0) {
31!
429
      conf->snapEnable = false;
31✔
430
      return TMQ_CONF_OK;
31✔
431
    } else {
432
      tqErrorC("invalid value for experimental.snapshot.enable: %s", value);
×
433
      return TMQ_CONF_INVALID;
×
434
    }
435
  }
436

437
  if (strcasecmp(key, "td.connect.ip") == 0) {
874!
UNCOV
438
    void *tmp = taosStrdup(value);
×
UNCOV
439
    if (tmp == NULL) {
×
440
      tqErrorC("tmq_conf_set out of memory:%d", terrno);
×
441
      return TMQ_CONF_INVALID;
×
442
    }
UNCOV
443
    conf->ip = tmp;
×
UNCOV
444
    return TMQ_CONF_OK;
×
445
  }
446

447
  if (strcasecmp(key, "td.connect.user") == 0) {
874✔
448
    void *tmp = taosStrdup(value);
410!
449
    if (tmp == NULL) {
411!
450
      tqErrorC("tmq_conf_set out of memory:%d", terrno);
×
451
      return TMQ_CONF_INVALID;
×
452
    }
453
    conf->user = tmp;
411✔
454
    return TMQ_CONF_OK;
411✔
455
  }
456

457
  if (strcasecmp(key, "td.connect.pass") == 0) {
464✔
458
    void *tmp = taosStrdup(value);
411!
459
    if (tmp == NULL) {
412!
460
      tqErrorC("tmq_conf_set out of memory:%d", terrno);
×
461
      return TMQ_CONF_INVALID;
×
462
    }
463
    conf->pass = tmp;
412✔
464
    return TMQ_CONF_OK;
412✔
465
  }
466

467
  if (strcasecmp(key, "td.connect.port") == 0) {
53!
468
    int64_t tmp;
469
    code = taosStr2int64(value, &tmp);
×
470
    if (tmp <= 0 || tmp > 65535 || code != 0) {
×
471
      tqErrorC("invalid value for td.connect.port: %s", value);
×
472
      return TMQ_CONF_INVALID;
×
473
    }
474

475
    conf->port = tmp;
×
476
    return TMQ_CONF_OK;
×
477
  }
478

479
  if (strcasecmp(key, "enable.replay") == 0) {
53✔
480
    if (strcasecmp(value, "true") == 0) {
7!
481
      conf->replayEnable = true;
7✔
482
      return TMQ_CONF_OK;
7✔
483
    } else if (strcasecmp(value, "false") == 0) {
×
484
      conf->replayEnable = false;
×
485
      return TMQ_CONF_OK;
×
486
    } else {
487
      tqErrorC("invalid value for enable.replay: %s", value);
×
488
      return TMQ_CONF_INVALID;
×
489
    }
490
  }
491
  if (strcasecmp(key, "msg.consume.excluded") == 0) {
46✔
492
    int64_t tmp;
493
    code = taosStr2int64(value, &tmp);
40✔
494
    conf->sourceExcluded = (0 == code && tmp != 0) ? TD_REQ_FROM_TAOX : 0;
40!
495
    return TMQ_CONF_OK;
40✔
496
  }
497

498
  if (strcasecmp(key, "td.connect.db") == 0) {
6!
499
    return TMQ_CONF_OK;
×
500
  }
501

502
  if (strcasecmp(key, "msg.enable.batchmeta") == 0) {
6✔
503
    int64_t tmp;
504
    code = taosStr2int64(value, &tmp);
4✔
505
    conf->enableBatchMeta = (0 == code && tmp != 0) ? true : false;
4!
506
    return TMQ_CONF_OK;
4✔
507
  }
508

509
  tqErrorC("unknown key: %s", key);
2!
510
  return TMQ_CONF_UNKNOWN;
×
511
}
512

513
tmq_list_t* tmq_list_new() {
1,187✔
514
  return (tmq_list_t*)taosArrayInit(0, sizeof(void*));
1,187✔
515
}
516

517
// Appends a heap copy of `src` to the list.
// Returns 0 on success, TSDB_CODE_INVALID_PARA for a NULL list or a NULL/empty
// string, and terrno when duplicating the string or growing the array fails.
int32_t tmq_list_append(tmq_list_t* list, const char* src) {
  if (list == NULL || src == NULL || src[0] == 0) {
    return TSDB_CODE_INVALID_PARA;
  }

  char* pCopy = taosStrdup(src);
  if (pCopy == NULL) {
    return terrno;
  }

  if (taosArrayPush(&list->container, &pCopy) != NULL) {
    return 0;
  }

  // the array did not take the pointer, so the copy is still ours to release
  taosMemoryFree(pCopy);
  return terrno;
}
533

534
// Destroys a topic list via taosArrayDestroyP (called with a NULL element
// destructor). A NULL list is ignored.
void tmq_list_destroy(tmq_list_t* list) {
  if (list != NULL) {
    taosArrayDestroyP(&list->container, NULL);
  }
}
539

540
int32_t tmq_list_get_size(const tmq_list_t* list) {
7✔
541
  if (list == NULL) {
7!
542
    return TSDB_CODE_INVALID_PARA;
×
543
  }
544
  const SArray* container = &list->container;
7✔
545
  return taosArrayGetSize(container);
7✔
546
}
547

548
char** tmq_list_to_c_array(const tmq_list_t* list) {
7✔
549
  if (list == NULL) {
7!
550
    return NULL;
×
551
  }
552
  const SArray* container = &list->container;
7✔
553
  return container->pData;
7✔
554
}
555

556
// Finishes a commit round: invokes the user callback (if one was registered)
// with the code accumulated in pParamSet, then frees pParamSet.
// The callback still runs when the consumer reference can no longer be
// acquired; it then receives a NULL tmq and this function returns
// TSDB_CODE_TMQ_CONSUMER_CLOSED. Otherwise the result of taosReleaseRef is returned.
static int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) {
  const int64_t consumerRef = pParamSet->refId;
  tmq_t*        pConsumer = taosAcquireRef(tmqMgmt.rsetId, consumerRef);
  int32_t       result = (pConsumer == NULL) ? TSDB_CODE_TMQ_CONSUMER_CLOSED : 0;

  if (pParamSet->callbackFn != NULL) {
    pParamSet->callbackFn(pConsumer, pParamSet->code, pParamSet->userParam);
  }
  taosMemoryFree(pParamSet);

  if (pConsumer != NULL) {
    result = taosReleaseRef(tmqMgmt.rsetId, consumerRef);
  }
  return result;
}
576

577
// Atomically decrements the number of outstanding commit responses.
// When the counter reaches zero the commit round is finalised through
// tmqCommitDone (which frees pParamSet) and its result is returned; otherwise 0.
static int32_t commitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId) {
  int32_t remaining = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);

  if (remaining != 0) {
    tqDebugC("consumer:0x%" PRIx64 " topic:%s vgId:%d commit-rsp received, remain:%d", consumerId, pTopic, vgId,
             remaining);
    return 0;
  }

  tqDebugC("consumer:0x%" PRIx64 " topic:%s vgId:%d all commit-rsp received, commit completed", consumerId, pTopic,
           vgId);
  return tmqCommitDone(pParamSet);
}
589

590
// Response callback for a commit-offset request.
// Releases the response buffers, then counts this response against the shared
// parameter set. The incoming `code` is not inspected here.
// Returns TSDB_CODE_INVALID_PARA when param is NULL.
static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
  if (pBuf != NULL) {
    taosMemoryFreeClear(pBuf->pData);
    taosMemoryFreeClear(pBuf->pEpSet);
  }

  if (param == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  SMqCommitCbParam* pCbParam = (SMqCommitCbParam*)param;
  return commitRspCountDown((SMqCommitCbParamSet*)pCbParam->params, pCbParam->consumerId, pCbParam->topicName,
                            pCbParam->vgId);
}
603

604
static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffsetVal* offset, const char* pTopicName,
3,553✔
605
                               SMqCommitCbParamSet* pParamSet) {
606
  SMqVgOffset pOffset = {0};
3,553✔
607

608
  pOffset.consumerId = tmq->consumerId;
3,553✔
609
  pOffset.offset.val = *offset;
3,553✔
610
  (void)snprintf(pOffset.offset.subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", tmq->groupId, TMQ_SEPARATOR, pTopicName);
3,553✔
611
  int32_t len = 0;
3,553✔
612
  int32_t code = 0;
3,553✔
613
  tEncodeSize(tEncodeMqVgOffset, &pOffset, len, code);
3,553!
614
  if (code < 0) {
3,553!
615
    return TSDB_CODE_INVALID_PARA;
×
616
  }
617

618
  void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
3,553!
619
  if (buf == NULL) {
3,553!
620
    return terrno;
×
621
  }
622

623
  ((SMsgHead*)buf)->vgId = htonl(vgId);
3,553✔
624

625
  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
3,553✔
626

627
  SEncoder encoder = {0};
3,553✔
628
  tEncoderInit(&encoder, abuf, len);
3,553✔
629
  if (tEncodeMqVgOffset(&encoder, &pOffset) < 0) {
3,553!
630
    tEncoderClear(&encoder);
×
631
    taosMemoryFree(buf);
×
632
    return TSDB_CODE_INVALID_PARA;
×
633
  }
634
  tEncoderClear(&encoder);
3,553✔
635

636
  // build param
637
  SMqCommitCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam));
3,553!
638
  if (pParam == NULL) {
3,553!
639
    taosMemoryFree(buf);
×
640
    return terrno;
×
641
  }
642

643
  pParam->params = pParamSet;
3,553✔
644
  pParam->vgId = vgId;
3,553✔
645
  pParam->consumerId = tmq->consumerId;
3,553✔
646

647
  tstrncpy(pParam->topicName, pTopicName, tListLen(pParam->topicName));
3,553✔
648

649
  // build send info
650
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
3,553!
651
  if (pMsgSendInfo == NULL) {
3,553!
652
    taosMemoryFree(buf);
×
653
    taosMemoryFree(pParam);
×
654
    return terrno;
×
655
  }
656

657
  pMsgSendInfo->msgInfo = (SDataBuf){.pData = buf, .len = sizeof(SMsgHead) + len, .handle = NULL};
3,553✔
658

659
  pMsgSendInfo->requestId = generateRequestId();
3,553✔
660
  pMsgSendInfo->requestObjRefId = 0;
3,553✔
661
  pMsgSendInfo->param = pParam;
3,553✔
662
  pMsgSendInfo->paramFreeFp = taosAutoMemoryFree;
3,553✔
663
  pMsgSendInfo->fp = tmqCommitCb;
3,553✔
664
  pMsgSendInfo->msgType = TDMT_VND_TMQ_COMMIT_OFFSET;
3,553✔
665

666
  // int64_t transporterId = 0;
667
  (void)atomic_add_fetch_32(&pParamSet->waitingRspNum, 1);
3,553✔
668
  code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, NULL, pMsgSendInfo);
3,553✔
669
  if (code != 0) {
3,553!
670
    (void)atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
×
671
  }
672
  return code;
3,553✔
673
}
674

675
// Looks up a subscribed topic by exact (case-sensitive) name.
// On a match stores the entry in *topic and returns 0; otherwise logs an error
// and returns TSDB_CODE_TMQ_INVALID_TOPIC, leaving *topic untouched.
static int32_t getTopicByName(tmq_t* tmq, const char* pTopicName, SMqClientTopic** topic) {
  int32_t topicCnt = taosArrayGetSize(tmq->clientTopics);

  for (int32_t idx = 0; idx < topicCnt; ++idx) {
    SMqClientTopic* pCandidate = taosArrayGet(tmq->clientTopics, idx);
    if (pCandidate != NULL && strcmp(pCandidate->topicName, pTopicName) == 0) {
      *topic = pCandidate;
      return 0;
    }
  }

  tqErrorC("consumer:0x%" PRIx64 ", total:%d, failed to find topic:%s", tmq->consumerId, topicCnt, pTopicName);
  return TSDB_CODE_TMQ_INVALID_TOPIC;
}
689

690
static int32_t prepareCommitCbParamSet(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam, int32_t rspNum,
10,126✔
691
                                       SMqCommitCbParamSet** ppParamSet) {
692
  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
10,126!
693
  if (pParamSet == NULL) {
10,126!
694
    return terrno;
×
695
  }
696

697
  pParamSet->refId = tmq->refId;
10,126✔
698
  pParamSet->epoch = tmq->epoch;
10,126✔
699
  pParamSet->callbackFn = pCommitFp;
10,126✔
700
  pParamSet->userParam = userParam;
10,126✔
701
  pParamSet->waitingRspNum = rspNum;
10,126✔
702
  *ppParamSet = pParamSet;
10,126✔
703
  return 0;
10,126✔
704
}
705

706
// Finds the vgroup entry `vgId` of topic `pTopicName`.
//
// On success stores the entry in *pVg and returns TSDB_CODE_SUCCESS.
// Returns the error from getTopicByName when the topic is unknown (in that case
// *pVg is left untouched), or TSDB_CODE_TMQ_INVALID_VGID when the topic has no
// such vgroup (in that case *pVg is NULL).
static int32_t getClientVg(tmq_t* tmq, char* pTopicName, int32_t vgId, SMqClientVg** pVg) {
  SMqClientTopic* pTopic = NULL;
  int32_t         code = getTopicByName(tmq, pTopicName, &pTopic);
  if (code != 0) {
    tqErrorC("consumer:0x%" PRIx64 " invalid topic name:%s", tmq->consumerId, pTopicName);
    return code;
  }

  // Do not depend on the caller having cleared *pVg: a stale non-NULL value
  // must not be mistaken for a successful lookup.
  *pVg = NULL;

  int32_t numOfVgs = taosArrayGetSize(pTopic->vgs);
  for (int32_t i = 0; i < numOfVgs; ++i) {
    SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i);
    if (pClientVg != NULL && pClientVg->vgId == vgId) {
      *pVg = pClientVg;
      return TSDB_CODE_SUCCESS;
    }
  }

  return TSDB_CODE_TMQ_INVALID_VGID;
}
725

726
// Sends a commit request for one vgroup if there is something new to commit.
//
// Returns TSDB_CODE_TMQ_INVALID_MSG when offsetVal->type <= 0,
// TSDB_CODE_TMQ_SAME_COMMITTED_VALUE when the offset equals the one already
// recorded as committed, the error from doSendCommitMsg when sending fails, and
// 0 after the request was sent. On success the vgroup's committedOffset is
// updated right away, before the server has answered.
static int32_t innerCommit(tmq_t* tmq, char* pTopicName, STqOffsetVal* offsetVal, SMqClientVg* pVg, SMqCommitCbParamSet* pParamSet){
  if (offsetVal->type <= 0) {
    return TSDB_CODE_TMQ_INVALID_MSG;
  }
  if (tOffsetEqual(offsetVal, &pVg->offsetInfo.committedOffset)) {
    return TSDB_CODE_TMQ_SAME_COMMITTED_VALUE;
  }

  // render both offsets up front; they are only used for logging
  char offsetBuf[TSDB_OFFSET_LEN] = {0};
  tFormatOffset(offsetBuf, tListLen(offsetBuf), offsetVal);

  char commitBuf[TSDB_OFFSET_LEN] = {0};
  tFormatOffset(commitBuf, tListLen(commitBuf), &pVg->offsetInfo.committedOffset);

  int32_t code = doSendCommitMsg(tmq, pVg->vgId, &pVg->epSet, offsetVal, pTopicName, pParamSet);
  if (code != TSDB_CODE_SUCCESS) {
    // report the code that is actually returned; terrno may describe an unrelated earlier failure
    tqErrorC("consumer:0x%" PRIx64 " topic:%s on vgId:%d end commit msg failed, send offset:%s committed:%s, code:%s",
             tmq->consumerId, pTopicName, pVg->vgId, offsetBuf, commitBuf, tstrerror(code));
    return code;
  }

  tqDebugC("consumer:0x%" PRIx64 " topic:%s on vgId:%d send commit msg success, send offset:%s committed:%s",
          tmq->consumerId, pTopicName, pVg->vgId, offsetBuf, commitBuf);
  tOffsetCopy(&pVg->offsetInfo.committedOffset, offsetVal);
  return code;
}
754

755
static int32_t asyncCommitOffset(tmq_t* tmq, char* pTopicName, int32_t vgId, STqOffsetVal* offsetVal,
37✔
756
                                 tmq_commit_cb* pCommitFp, void* userParam) {
757
  tqInfoC("consumer:0x%" PRIx64 " do manual commit offset for %s, vgId:%d", tmq->consumerId, pTopicName, vgId);
37!
758
  SMqCommitCbParamSet* pParamSet = NULL;
37✔
759
  int32_t code = prepareCommitCbParamSet(tmq, pCommitFp, userParam, 0, &pParamSet);
37✔
760
  if (code != 0){
37!
761
    return code;
×
762
  }
763

764
  taosRLockLatch(&tmq->lock);
37✔
765
  SMqClientVg* pVg = NULL;
37✔
766
  code = getClientVg(tmq, pTopicName, vgId, &pVg);
37✔
767
  if (code == 0) {
37!
768
    code = innerCommit(tmq, pTopicName, offsetVal, pVg, pParamSet);
37✔
769
  }
770
  taosRUnLockLatch(&tmq->lock);
37✔
771

772
  if (code != 0){
37✔
773
    taosMemoryFree(pParamSet);
4!
774
  }
775
  return code;
37✔
776
}
777

778
// Commits the offset carried by a polled result object.
// The callback is invoked directly only when the request could not be sent;
// TSDB_CODE_TMQ_SAME_COMMITTED_VALUE is reported to it as success.
// After a successful send the callback is left to the response path.
static void asyncCommitFromResult(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* pCommitFp, void* userParam) {
  int32_t code = 0;

  if (pRes == NULL || tmq == NULL) {
    code = TSDB_CODE_INVALID_PARA;
  } else if (!(TD_RES_TMQ(pRes) || TD_RES_TMQ_META(pRes) ||
               TD_RES_TMQ_METADATA(pRes) || TD_RES_TMQ_BATCH_META(pRes))) {
    code = TSDB_CODE_TMQ_INVALID_MSG;
  } else {
    SMqRspObj*   pRspObj = (SMqRspObj*)pRes;
    char*        pTopicName = pRspObj->topic;
    int32_t      vgId = pRspObj->vgId;
    STqOffsetVal offsetVal = pRspObj->rspOffset;

    code = asyncCommitOffset(tmq, pTopicName, vgId, &offsetVal, pCommitFp, userParam);
  }

  if (code == TSDB_CODE_SUCCESS || pCommitFp == NULL) {
    return;
  }

  if (code == TSDB_CODE_TMQ_SAME_COMMITTED_VALUE) {
    code = TSDB_CODE_SUCCESS;
  }
  pCommitFp(tmq, code, userParam);
}
29✔
808

809
// Walks every vgroup of every subscribed topic under the consumer's read latch
// and tries to commit its endOffset.
// A per-vgroup failure is logged and the walk continues; a missing topic or
// vgroup entry aborts the walk. The returned code is that of the abort, or
// otherwise the result of the last innerCommit call (0 when nothing was visited).
static int32_t innerCommitAll(tmq_t* tmq, SMqCommitCbParamSet* pParamSet){
  int32_t code = 0;

  taosRLockLatch(&tmq->lock);

  int32_t topicCnt = taosArrayGetSize(tmq->clientTopics);
  tqDebugC("consumer:0x%" PRIx64 " start to commit offset for %d topics", tmq->consumerId, topicCnt);

  for (int32_t t = 0; t < topicCnt; t++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, t);
    if (pTopic == NULL) {
      code = TSDB_CODE_TMQ_INVALID_TOPIC;
      goto END;
    }

    int32_t vgCnt = taosArrayGetSize(pTopic->vgs);
    tqDebugC("consumer:0x%" PRIx64 " commit offset for topics:%s, numOfVgs:%d", tmq->consumerId, pTopic->topicName, vgCnt);

    for (int32_t v = 0; v < vgCnt; v++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, v);
      if (pVg == NULL) {
        code = terrno;
        goto END;
      }

      code = innerCommit(tmq, pTopic->topicName, &pVg->offsetInfo.endOffset, pVg, pParamSet);
      if (code == 0 || code == TSDB_CODE_TMQ_SAME_COMMITTED_VALUE) {
        continue;
      }
      tqErrorC("consumer:0x%" PRIx64 " topic:%s vgId:%d, no commit, code:%s, current offset version:%" PRId64 ", ordinal:%d/%d",
               tmq->consumerId, pTopic->topicName, pVg->vgId, tstrerror(code), pVg->offsetInfo.endOffset.version, v + 1, vgCnt);
    }
  }

  tqDebugC("consumer:0x%" PRIx64 " total commit:%d for %d topics", tmq->consumerId, pParamSet->waitingRspNum - DEFAULT_COMMIT_CNT,
           topicCnt);

END:
  taosRUnLockLatch(&tmq->lock);
  return code;
}
843

844
// Commits the current end offsets of all vgroups without waiting for responses.
// The response counter starts at DEFAULT_COMMIT_CNT so that responses arriving
// while requests are still being sent cannot complete the round early; the
// final count-down below removes that placeholder.
// If the parameter set cannot be allocated, the callback (when given) is
// invoked directly with the error.
static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam) {
  SMqCommitCbParamSet* pCbSet = NULL;

  int32_t code = prepareCommitCbParamSet(tmq, pCommitFp, userParam, DEFAULT_COMMIT_CNT, &pCbSet);
  if (code != 0) {
    tqErrorC("consumer:0x%" PRIx64 " prepareCommitCbParamSet failed, code:%s", tmq->consumerId, tstrerror(code));
    if (pCommitFp != NULL) {
      pCommitFp(tmq, code, userParam);
    }
    return;
  }

  code = innerCommitAll(tmq, pCbSet);
  if (!(code == 0 || code == TSDB_CODE_TMQ_SAME_COMMITTED_VALUE)) {
    tqErrorC("consumer:0x%" PRIx64 " innerCommitAll failed, code:%s", tmq->consumerId, tstrerror(code));
  }

  // drop the placeholder count; this may finish the round and free pCbSet
  code = commitRspCountDown(pCbSet, tmq->consumerId, "init", -1);
  if (code != 0) {
    tqErrorC("consumer:0x%" PRIx64 " commit rsp count down failed, code:%s", tmq->consumerId, tstrerror(code));
  }
}
867

868
static void generateTimedTask(int64_t refId, int32_t type) {
13,902✔
869
  tmq_t*  tmq = NULL;
13,902✔
870
  int8_t* pTaskType = NULL;
13,902✔
871
  int32_t code = 0;
13,902✔
872

873
  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
13,902✔
874
  if (tmq == NULL) return;
13,902!
875

876
  code = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0, (void**)&pTaskType);
13,902✔
877
  if (code == TSDB_CODE_SUCCESS) {
13,902!
878
    *pTaskType = type;
13,902✔
879
    if (taosWriteQitem(tmq->delayedTask, pTaskType) == 0) {
13,902!
880
      if (tsem2_post(&tmq->rspSem) != 0){
13,902!
881
        tqErrorC("consumer:0x%" PRIx64 " failed to post sem, type:%d", tmq->consumerId, type);
×
882
      }
883
    }else{
884
      taosFreeQitem(pTaskType);
×
885
    }
886
  }
887

888
  code = taosReleaseRef(tmqMgmt.rsetId, refId);
13,902✔
889
  if (code != 0){
13,902!
890
    tqErrorC("failed to release ref:%"PRId64 ", type:%d, code:%d", refId, type, code);
×
891
  }
892
}
893

894
void tmqAssignAskEpTask(void* param, void* tmrId) {
4,328✔
895
  int64_t refId = (int64_t)param;
4,328✔
896
  generateTimedTask(refId, TMQ_DELAYED_TASK__ASK_EP);
4,328✔
897
}
4,328✔
898

899
void tmqReplayTask(void* param, void* tmrId) {
12✔
900
  int64_t refId = (int64_t)param;
12✔
901
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
12✔
902
  if (tmq == NULL) return;
12!
903

904
  if (tsem2_post(&tmq->rspSem) != 0){
12!
905
    tqErrorC("consumer:0x%" PRIx64 " failed to post sem, replay", tmq->consumerId);
×
906
  }
907
  int32_t code = taosReleaseRef(tmqMgmt.rsetId, refId);
12✔
908
  if (code != 0){
12!
909
    tqErrorC("failed to release ref:%"PRId64 ", code:%d", refId, code);
×
910
  }
911
}
912

913
void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
9,574✔
914
  int64_t refId = (int64_t)param;
9,574✔
915
  generateTimedTask(refId, TMQ_DELAYED_TASK__COMMIT);
9,574✔
916
}
9,574✔
917

918
/*
 * Response callback for the heartbeat request sent by tmqSendHbReq().
 *
 * param  the consumer's reference id passed as an integer
 * pMsg   response buffer; pData and pEpSet are freed here
 * code   transport/server result for the request
 *
 * Marks every client topic the response reports as "no privilege" and copies
 * the response's debugFlag into tqClientDebugFlag.
 *
 * Returns TSDB_CODE_INVALID_PARA when pMsg is NULL, otherwise the incoming
 * code or the deserialization error.
 */
int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
  if (pMsg == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  // Declared before the first goto so END can always destroy it, including
  // after a failed (possibly partial) deserialization.
  // NOTE(review): relies on tDestroySMqHbRsp() accepting a zero-initialized rsp — confirm.
  SMqHbRsp rsp = {0};

  if (param == NULL || code != 0) {
    goto END;
  }

  code = tDeserializeSMqHbRsp(pMsg->pData, pMsg->len, &rsp);
  if (code != 0) {
    goto END;
  }

  int64_t refId = (int64_t)param;
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq != NULL) {
    taosWLockLatch(&tmq->lock);
    for (int32_t i = 0; i < taosArrayGetSize(rsp.topicPrivileges); i++) {
      STopicPrivilege* privilege = taosArrayGet(rsp.topicPrivileges, i);
      if (privilege && privilege->noPrivilege == 1) {
        int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
        for (int32_t j = 0; j < topicNumCur; j++) {
          SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, j);
          if (pTopicCur && strcmp(pTopicCur->topicName, privilege->topic) == 0) {
            tqInfoC("consumer:0x%" PRIx64 ", has no privilege, topic:%s", tmq->consumerId, privilege->topic);
            pTopicCur->noPrivilege = 1;
          }
        }
      }
    }
    taosWUnLockLatch(&tmq->lock);

    // keep the release result separate so it does not replace the callback's
    // own result (same pattern as the other callbacks in this file)
    int32_t ret = taosReleaseRef(tmqMgmt.rsetId, refId);
    if (ret != 0) {
      tqErrorC("failed to release ref:%" PRId64 ", code:%d", refId, ret);
    }
  }

  tqClientDebugFlag = rsp.debugFlag;

END:
  tDestroySMqHbRsp(&rsp);
  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);
  return code;
}
966

967
void tmqSendHbReq(void* param, void* tmrId) {
2,355✔
968
  int64_t refId = (int64_t)param;
2,355✔
969

970
  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
2,355✔
971
  if (tmq == NULL) {
2,355!
972
    return;
×
973
  }
974

975
  SMqHbReq req = {0};
2,355✔
976
  req.consumerId = tmq->consumerId;
2,355✔
977
  req.epoch = tmq->epoch;
2,355✔
978
  req.pollFlag = atomic_load_8(&tmq->pollFlag);
2,355✔
979
  tqDebugC("consumer:0x%" PRIx64 " send heartbeat, pollFlag:%d", tmq->consumerId, req.pollFlag);
2,355!
980
  req.topics = taosArrayInit(taosArrayGetSize(tmq->clientTopics), sizeof(TopicOffsetRows));
2,355✔
981
  if (req.topics == NULL) {
2,355!
982
    goto END;
×
983
  }
984
  taosRLockLatch(&tmq->lock);
2,355✔
985
  for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
4,540✔
986
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
2,185✔
987
    if (pTopic == NULL) {
2,185!
988
      continue;
×
989
    }
990
    int32_t          numOfVgroups = taosArrayGetSize(pTopic->vgs);
2,185✔
991
    TopicOffsetRows* data = taosArrayReserve(req.topics, 1);
2,185✔
992
    if (data == NULL) {
2,185!
993
      continue;
×
994
    }
995
    tstrncpy(data->topicName, pTopic->topicName, TSDB_TOPIC_FNAME_LEN);
2,185✔
996
    data->offsetRows = taosArrayInit(numOfVgroups, sizeof(OffsetRows));
2,185✔
997
    if (data->offsetRows == NULL) {
2,185!
998
      continue;
×
999
    }
1000
    for (int j = 0; j < numOfVgroups; j++) {
7,348✔
1001
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
5,163✔
1002
      if (pVg == NULL) {
5,163!
1003
        continue;
×
1004
      }
1005
      OffsetRows* offRows = taosArrayReserve(data->offsetRows, 1);
5,163✔
1006
      if (offRows == NULL) {
5,163!
1007
        continue;
×
1008
      }
1009
      offRows->vgId = pVg->vgId;
5,163✔
1010
      offRows->rows = pVg->numOfRows;
5,163✔
1011
      offRows->offset = pVg->offsetInfo.endOffset;
5,163✔
1012
      offRows->ever = pVg->offsetInfo.walVerEnd == -1 ? 0 : pVg->offsetInfo.walVerEnd;
5,163✔
1013
      char buf[TSDB_OFFSET_LEN] = {0};
5,163✔
1014
      tFormatOffset(buf, TSDB_OFFSET_LEN, &offRows->offset);
5,163✔
1015
      tqDebugC("consumer:0x%" PRIx64 ",report offset, group:%s vgId:%d, offset:%s/%" PRId64 ", rows:%" PRId64,
5,163!
1016
               tmq->consumerId, tmq->groupId, offRows->vgId, buf, offRows->ever, offRows->rows);
1017
    }
1018
  }
1019
  taosRUnLockLatch(&tmq->lock);
2,355✔
1020

1021
  int32_t tlen = tSerializeSMqHbReq(NULL, 0, &req);
2,355✔
1022
  if (tlen < 0) {
2,355!
1023
    tqErrorC("tSerializeSMqHbReq failed, size:%d", tlen);
×
1024
    goto END;
×
1025
  }
1026

1027
  void* pReq = taosMemoryCalloc(1, tlen);
2,355!
1028
  if (pReq == NULL) {
2,355!
1029
    tqErrorC("failed to malloc MqHbReq msg, code:%d", terrno);
×
1030
    goto END;
×
1031
  }
1032

1033
  if (tSerializeSMqHbReq(pReq, tlen, &req) < 0) {
2,355!
1034
    tqErrorC("tSerializeSMqHbReq %d failed", tlen);
×
1035
    taosMemoryFree(pReq);
×
1036
    goto END;
×
1037
  }
1038

1039
  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
2,355!
1040
  if (sendInfo == NULL) {
2,355!
1041
    taosMemoryFree(pReq);
×
1042
    goto END;
×
1043
  }
1044

1045
  sendInfo->msgInfo = (SDataBuf){.pData = pReq, .len = tlen, .handle = NULL};
2,355✔
1046

1047
  sendInfo->requestId = generateRequestId();
2,355✔
1048
  sendInfo->requestObjRefId = 0;
2,355✔
1049
  sendInfo->param = (void*)refId;
2,355✔
1050
  sendInfo->fp = tmqHbCb;
2,355✔
1051
  sendInfo->msgType = TDMT_MND_TMQ_HB;
2,355✔
1052

1053
  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
2,355✔
1054

1055
  int32_t code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo);
2,355✔
1056
  if (code != 0) {
2,355!
1057
    tqErrorC("tmqSendHbReq asyncSendMsgToServer failed");
×
1058
  }
1059
  (void)atomic_val_compare_exchange_8(&tmq->pollFlag, 1, 0);
2,355✔
1060

1061
END:
2,355✔
1062
  tDestroySMqHbReq(&req);
2,355✔
1063
  if (tmrId != NULL) {
2,355✔
1064
    bool ret = taosTmrReset(tmqSendHbReq, tmq->heartBeatIntervalMs, param, tmqMgmt.timer, &tmq->hbLiveTimer);
1,597✔
1065
    tqDebugC("consumer:0x%" PRIx64 " reset timer for tmq heartbeat:%d, pollFlag:%d", tmq->consumerId, ret, tmq->pollFlag);
1,597!
1066
  }
1067
  int32_t ret = taosReleaseRef(tmqMgmt.rsetId, refId);
2,355✔
1068
  if (ret != 0){
2,355!
1069
    tqErrorC("failed to release ref:%"PRId64 ", code:%d", refId, ret);
×
1070
  }
1071
}
1072

1073
/*
 * Commit callback used when the consumer has none configured: it only logs
 * a failed commit. param is unused.
 */
static void defaultCommitCbFn(tmq_t* pTmq, int32_t code, void* param) {
  if (code == 0) {
    return;
  }
  tqErrorC("consumer:0x%" PRIx64 ", failed to commit offset, code:%s", pTmq->consumerId, tstrerror(code));
}
577✔
1078

1079
/*
 * Release the payload held by a response wrapper according to its
 * tmqRspType. The wrapper itself is not freed here; unknown types are ignored.
 */
static void tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) {
  switch (rspWrapper->tmqRspType) {
    case TMQ_MSG_TYPE__EP_RSP: {
      tDeleteSMqAskEpRsp(&rspWrapper->epRsp);
      break;
    }
    case TMQ_MSG_TYPE__POLL_DATA_RSP: {
      DELETE_POLL_RSP(tDeleteMqDataRsp, &pRsp->dataRsp)
      break;
    }
    case TMQ_MSG_TYPE__POLL_DATA_META_RSP: {
      DELETE_POLL_RSP(tDeleteSTaosxRsp, &pRsp->dataRsp)
      break;
    }
    case TMQ_MSG_TYPE__POLL_META_RSP: {
      DELETE_POLL_RSP(tDeleteMqMetaRsp, &pRsp->metaRsp)
      break;
    }
    case TMQ_MSG_TYPE__POLL_BATCH_META_RSP: {
      DELETE_POLL_RSP(tDeleteMqBatchMetaRsp, &pRsp->batchMetaRsp)
      break;
    }
    default:
      break;
  }
}
52,777✔
1092

1093
static void freeClientVg(void* param) {
4,010✔
1094
  SMqClientVg* pVg = param;
4,010✔
1095
  tOffsetDestroy(&pVg->offsetInfo.endOffset);
4,010✔
1096
  tOffsetDestroy(&pVg->offsetInfo.beginOffset);
4,010✔
1097
  tOffsetDestroy(&pVg->offsetInfo.committedOffset);
4,010✔
1098
}
4,010✔
1099
static void freeClientTopic(void* param) {
3,203✔
1100
  SMqClientTopic* pTopic = param;
3,203✔
1101
  taosMemoryFreeClear(pTopic->schema.pSchema);
3,203!
1102
  taosArrayDestroyEx(pTopic->vgs, freeClientVg);
3,203✔
1103
}
3,203✔
1104

1105
static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopicEp, SHashObj* pVgOffsetHashMap,
3,204✔
1106
                                   tmq_t* tmq) {
1107
  pTopic->schema = pTopicEp->schema;
3,204✔
1108
  pTopicEp->schema.nCols = 0;
3,204✔
1109
  pTopicEp->schema.pSchema = NULL;
3,204✔
1110

1111
  char    vgKey[TSDB_TOPIC_FNAME_LEN + 22] = {0};
3,204✔
1112
  int32_t vgNumGet = taosArrayGetSize(pTopicEp->vgs);
3,204✔
1113

1114
  tstrncpy(pTopic->topicName, pTopicEp->topic, TSDB_TOPIC_FNAME_LEN);
3,204✔
1115
  tstrncpy(pTopic->db, pTopicEp->db, TSDB_DB_FNAME_LEN);
3,204✔
1116

1117
  tqInfoC("consumer:0x%" PRIx64 ", update topic:%s, new numOfVgs:%d", tmq->consumerId, pTopic->topicName, vgNumGet);
3,204!
1118
  pTopic->vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg));
3,204✔
1119
  if (pTopic->vgs == NULL) {
3,204!
1120
    tqErrorC("consumer:0x%" PRIx64 ", failed to init vgs for topic:%s", tmq->consumerId, pTopic->topicName);
×
1121
    return;
×
1122
  }
1123
  for (int32_t j = 0; j < vgNumGet; j++) {
7,215✔
1124
    SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);
4,011✔
1125
    if (pVgEp == NULL) {
4,011!
1126
      continue;
×
1127
    }
1128
    (void)snprintf(vgKey, sizeof(vgKey), "%s:%d", pTopic->topicName, pVgEp->vgId);
4,011✔
1129
    SVgroupSaveInfo* pInfo = taosHashGet(pVgOffsetHashMap, vgKey, strlen(vgKey));
4,011✔
1130

1131
    STqOffsetVal offsetNew = {0};
4,011✔
1132
    offsetNew.type = tmq->resetOffsetCfg;
4,011✔
1133

1134
    tqInfoC("consumer:0x%" PRIx64 ", update topic:%s, new numOfVgs:%d, num:%d, port:%d", tmq->consumerId,
4,011!
1135
            pTopic->topicName, vgNumGet, pVgEp->epSet.numOfEps, pVgEp->epSet.eps[pVgEp->epSet.inUse].port);
1136

1137
    SMqClientVg clientVg = {
12,033✔
1138
        .pollCnt = 0,
1139
        .vgId = pVgEp->vgId,
4,011✔
1140
        .epSet = pVgEp->epSet,
1141
        .vgStatus = pInfo ? pInfo->vgStatus : TMQ_VG_STATUS__IDLE,
4,011✔
1142
        .vgSkipCnt = 0,
1143
        .emptyBlockReceiveTs = 0,
1144
        .blockReceiveTs = 0,
1145
        .blockSleepForReplay = 0,
1146
        .numOfRows = pInfo ? pInfo->numOfRows : 0,
4,011✔
1147
    };
1148

1149
    clientVg.offsetInfo.walVerBegin = -1;
4,011✔
1150
    clientVg.offsetInfo.walVerEnd = -1;
4,011✔
1151
    clientVg.seekUpdated = false;
4,011✔
1152
    if (pInfo) {
4,011✔
1153
      tOffsetCopy(&clientVg.offsetInfo.endOffset, &pInfo->currentOffset);
2,772✔
1154
      tOffsetCopy(&clientVg.offsetInfo.committedOffset, &pInfo->commitOffset);
2,772✔
1155
      tOffsetCopy(&clientVg.offsetInfo.beginOffset, &pInfo->seekOffset);
2,772✔
1156
    } else {
1157
      clientVg.offsetInfo.endOffset = offsetNew;
1,239✔
1158
      clientVg.offsetInfo.committedOffset = offsetNew;
1,239✔
1159
      clientVg.offsetInfo.beginOffset = offsetNew;
1,239✔
1160
    }
1161
    if (taosArrayPush(pTopic->vgs, &clientVg) == NULL) {
8,022!
1162
      tqErrorC("consumer:0x%" PRIx64 ", failed to push vg:%d into topic:%s", tmq->consumerId, pVgEp->vgId,
×
1163
               pTopic->topicName);
1164
      freeClientVg(&clientVg);
×
1165
    }
1166
  }
1167
}
1168

1169
/*
 * Build the new client topic list from an ask-ep response.
 *
 * The state of every current vgroup (offsets, row count, status) is first
 * saved in a temporary hash keyed by "<topicName>:<vgId>", so that vgroups
 * which still exist in the response keep their progress. Each response topic
 * is then converted with initClientTopicFromRsp() and pushed to newTopics.
 *
 * If the temporary hash cannot be created the function returns with
 * newTopics untouched.
 */
static void buildNewTopicList(tmq_t* tmq, SArray* newTopics, const SMqAskEpRsp* pRsp){
  SHashObj* pVgOffsetHashMap = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
  if (pVgOffsetHashMap == NULL) {
    tqErrorC("consumer:0x%" PRIx64 " taos hash init null, code:%d", tmq->consumerId, terrno);
    return;
  }

  // step 1: remember the state of all vgroups of the current topics
  int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
  for (int32_t t = 0; t < topicNumCur; t++) {
    SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, t);
    if (pTopicCur == NULL || pTopicCur->vgs == NULL) {
      continue;
    }

    int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs);
    tqInfoC("consumer:0x%" PRIx64 ", current vg num: %d", tmq->consumerId, vgNumCur);

    for (int32_t v = 0; v < vgNumCur; v++) {
      SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, v);
      if (pVgCur == NULL) {
        continue;
      }

      char vgKey[TSDB_TOPIC_FNAME_LEN + 22] = {0};
      (void)snprintf(vgKey, sizeof(vgKey), "%s:%d", pTopicCur->topicName, pVgCur->vgId);

      char buf[TSDB_OFFSET_LEN] = {0};
      tFormatOffset(buf, TSDB_OFFSET_LEN, &pVgCur->offsetInfo.endOffset);
      tqInfoC("consumer:0x%" PRIx64 ", vgId:%d vgKey:%s, offset:%s", tmq->consumerId, pVgCur->vgId, vgKey, buf);

      SVgroupSaveInfo saved = {
          .currentOffset = pVgCur->offsetInfo.endOffset,
          .seekOffset = pVgCur->offsetInfo.beginOffset,
          .commitOffset = pVgCur->offsetInfo.committedOffset,
          .numOfRows = pVgCur->numOfRows,
          .vgStatus = pVgCur->vgStatus,
      };
      int32_t putRet = taosHashPut(pVgOffsetHashMap, vgKey, strlen(vgKey), &saved, sizeof(SVgroupSaveInfo));
      if (putRet != 0) {
        tqErrorC("consumer:0x%" PRIx64 ", failed to put vg:%d into hashmap", tmq->consumerId, pVgCur->vgId);
      }
    }
  }

  // step 2: convert every topic of the response
  for (int32_t t = 0; t < taosArrayGetSize(pRsp->topics); t++) {
    SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, t);
    if (pTopicEp == NULL) {
      continue;
    }

    SMqClientTopic topic = {0};
    initClientTopicFromRsp(&topic, pTopicEp, pVgOffsetHashMap, tmq);
    if (taosArrayPush(newTopics, &topic) == NULL) {
      tqErrorC("consumer:0x%" PRIx64 ", failed to push topic:%s into new topics", tmq->consumerId, topic.topicName);
      freeClientTopic(&topic);
    }
  }

  taosHashCleanup(pVgOffsetHashMap);
}
1222

1223
/*
 * Replace the consumer's topic list with the one described by an ask-ep
 * response and move the consumer to epoch `epoch`.
 *
 * Nothing is changed when the response is older than the current epoch, or
 * has the same epoch and carries no topics:
 *   - vnode transform:  epoch == tmq->epoch && topicNumGet != 0  -> applied
 *   - plain ask ep rsp: epoch == tmq->epoch && topicNumGet == 0  -> ignored
 *
 * On an applied update the consumer status becomes
 * TMQ_CONSUMER_STATUS__READY.
 */
static void doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
  int32_t topicNumGet = taosArrayGetSize(pRsp->topics);

  bool olderEpoch = epoch < tmq->epoch;
  bool sameEpochNoTopics = (epoch == tmq->epoch) && (topicNumGet == 0);
  if (olderEpoch || sameEpochNoTopics) {
    tqDebugC("consumer:0x%" PRIx64 " no update ep epoch from %d to epoch %d, incoming topics:%d", tmq->consumerId,
             tmq->epoch, epoch, topicNumGet);
    return;
  }

  SArray* newTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic));
  if (newTopics == NULL) {
    tqErrorC("consumer:0x%" PRIx64 " taos array init null, code:%d", tmq->consumerId, terrno);
    return;
  }
  tqInfoC("consumer:0x%" PRIx64 " update ep epoch from %d to epoch %d, incoming topics:%d, existed topics:%d",
          tmq->consumerId, tmq->epoch, epoch, topicNumGet, (int)taosArrayGetSize(tmq->clientTopics));

  // swap the topic list under the write latch
  taosWLockLatch(&tmq->lock);
  if (topicNumGet > 0){
    buildNewTopicList(tmq, newTopics, pRsp);
  }
  if (tmq->clientTopics != NULL) {
    // the previous list is no longer needed once the new one is built
    taosArrayDestroyEx(tmq->clientTopics, freeClientTopic);
  }
  tmq->clientTopics = newTopics;
  taosWUnLockLatch(&tmq->lock);

  atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);
  atomic_store_32(&tmq->epoch, epoch);

  tqInfoC("consumer:0x%" PRIx64 " update topic info completed", tmq->consumerId);
}
1257

1258
/*
 * Response callback for the ask-ep request issued by askEp().
 *
 * param  SMqAskEpCbParam describing the consumer (refId) and the call mode
 * pMsg   response buffer; pEpSet and pData are freed here
 * code   transport/server result
 *
 * Sync mode:  the response is decoded and applied right away with
 *             doUpdateLocalEp(); the final code is stored in the waiter's
 *             SAskEpInfo and its semaphore is posted.
 * Async mode: the decoded response is wrapped as TMQ_MSG_TYPE__EP_RSP and
 *             written to the consumer's mqueue.
 *
 * Returns the resulting code (TSDB_CODE_TMQ_CONSUMER_CLOSED when the consumer
 * reference can no longer be acquired).
 */
static int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
  tmq_t*           tmq = NULL;

  if (pParam == NULL) {
    goto FAIL;
  }

  tmq = taosAcquireRef(tmqMgmt.rsetId, pParam->refId);
  if (tmq == NULL) {
    code = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    goto FAIL;
  }

  if (code != TSDB_CODE_SUCCESS) {
    // "consumer not ready" is not logged as an error
    if (code != TSDB_CODE_MND_CONSUMER_NOT_READY){
      tqErrorC("consumer:0x%" PRIx64 ", get topic endpoint error, code:%s", tmq->consumerId, tstrerror(code));
    }
    goto END;
  }

  if (pMsg == NULL) {
    goto END;
  }

  SMqRspHead* pHead = pMsg->pData;
  int32_t     curEpoch = atomic_load_32(&tmq->epoch);
  tqDebugC("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d", tmq->consumerId, pHead->epoch, curEpoch);

  if (!pParam->sync) {
    // async: hand the decoded response to the poll thread through mqueue
    SMqRspWrapper* pWrapper = NULL;
    code = taosAllocateQitem(sizeof(SMqRspWrapper), DEF_QITEM, 0, (void**)&pWrapper);
    if (code) {
      goto END;
    }

    pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP;
    pWrapper->epoch = pHead->epoch;
    (void)memcpy(&pWrapper->epRsp, pMsg->pData, sizeof(SMqRspHead));
    if (tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pWrapper->epRsp) != NULL) {
      code = taosWriteQitem(tmq->mqueue, pWrapper);
      if (code != 0) {
        tmqFreeRspWrapper(pWrapper);
        taosFreeQitem(pWrapper);
        tqErrorC("consumer:0x%" PRIx64 " put ep res into mqueue failed, code:%d", tmq->consumerId, code);
      }
    } else {
      tmqFreeRspWrapper(pWrapper);
      taosFreeQitem(pWrapper);
    }
  } else {
    // sync: apply the response in place
    SMqAskEpRsp rsp = {0};
    if (tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp) != NULL) {
      doUpdateLocalEp(tmq, pHead->epoch, &rsp);
    }
    tDeleteSMqAskEpRsp(&rsp);
  }

  END:
  {
    int32_t ret = taosReleaseRef(tmqMgmt.rsetId, pParam->refId);
    if (ret != 0){
      tqErrorC("failed to release ref:%"PRId64 ", code:%d", pParam->refId, ret);
    }
  }

  FAIL:
  if (pParam != NULL && pParam->sync) {
    // wake up the synchronous caller with the final result
    SAskEpInfo* pInfo = pParam->pParam;
    if (pInfo != NULL) {
      pInfo->code = code;
      if (tsem2_post(&pInfo->sem) != 0){
        tqErrorC("failed to post rsp sem askep cb");
      }
    }
  }

  if (pMsg != NULL) {
    taosMemoryFree(pMsg->pEpSet);
    taosMemoryFree(pMsg->pData);
  }

  return code;
}
1338

1339
/*
 * Send an ask-ep request (TDMT_MND_TMQ_ASK_EP) to the mnode; the response is
 * handled by askEpCb().
 *
 * pTmq         consumer
 * param        stored in the callback parameter and read by askEpCb() in
 *              sync mode
 * sync         selects the sync or async handling in askEpCb()
 * updateEpSet  when true the request carries epoch -1 instead of the
 *              consumer's current epoch
 *
 * Returns 0 when the request was handed to the transport, otherwise
 * TSDB_CODE_INVALID_PARA (serialization failure), terrno (allocation failure)
 * or the error of asyncSendMsgToServer().
 */
static int32_t askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpSet) {
  SMqAskEpReq req = {0};
  req.consumerId = pTmq->consumerId;
  // epoch is published with atomic_store_32() in doUpdateLocalEp(); read it
  // atomically here as askEpCb() does
  req.epoch = updateEpSet ? -1 : atomic_load_32(&pTmq->epoch);
  tstrncpy(req.cgroup, pTmq->groupId, TSDB_CGROUP_LEN);
  int              code = 0;
  SMqAskEpCbParam* pParam = NULL;
  void*            pReq = NULL;

  // first pass with a NULL buffer only computes the encoded length
  int32_t tlen = tSerializeSMqAskEpReq(NULL, 0, &req);
  if (tlen < 0) {
    tqErrorC("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq failed", pTmq->consumerId);
    return TSDB_CODE_INVALID_PARA;
  }

  pReq = taosMemoryCalloc(1, tlen);
  if (pReq == NULL) {
    tqErrorC("consumer:0x%" PRIx64 ", failed to malloc askEpReq msg, size:%d", pTmq->consumerId, tlen);
    return terrno;
  }

  if (tSerializeSMqAskEpReq(pReq, tlen, &req) < 0) {
    tqErrorC("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq %d failed", pTmq->consumerId, tlen);
    taosMemoryFree(pReq);
    return TSDB_CODE_INVALID_PARA;
  }

  pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam));
  if (pParam == NULL) {
    tqErrorC("consumer:0x%" PRIx64 ", failed to malloc subscribe param", pTmq->consumerId);
    taosMemoryFree(pReq);
    return terrno;
  }

  pParam->refId = pTmq->refId;
  pParam->sync = sync;
  pParam->pParam = param;

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    tqErrorC("consumer:0x%" PRIx64 ", failed to malloc askEp send info", pTmq->consumerId);
    taosMemoryFree(pReq);
    taosMemoryFree(pParam);
    return terrno;
  }

  sendInfo->msgInfo = (SDataBuf){.pData = pReq, .len = tlen, .handle = NULL};
  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = pParam;
  sendInfo->paramFreeFp = taosAutoMemoryFree;
  sendInfo->fp = askEpCb;
  sendInfo->msgType = TDMT_MND_TMQ_ASK_EP;

  SEpSet epSet = getEpSet_s(&pTmq->pTscObj->pAppInfo->mgmtEp);
  tqDebugC("consumer:0x%" PRIx64 " ask ep from mnode,QID:0x%" PRIx64, pTmq->consumerId, sendInfo->requestId);
  return asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo);
}
1396

1397
/*
 * Drain the consumer's delayedTask queue and execute every task in it:
 *   TMQ_DELAYED_TASK__ASK_EP  -> send an async ask-ep request and re-arm the
 *                                ask-ep timer
 *   TMQ_DELAYED_TASK__COMMIT  -> commit all offsets asynchronously (with the
 *                                consumer's callback or the default one) and
 *                                re-arm the auto-commit timer
 * Every dequeued item is freed, including when the ask-ep request could not
 * be sent; in that case the timer is still re-armed so the request is retried
 * on the next tick.
 */
void tmqHandleAllDelayedTask(tmq_t* pTmq) {
  STaosQall* qall = NULL;
  int32_t    code = 0;

  code = taosAllocateQall(&qall);
  if (code) {
    tqErrorC("consumer:0x%" PRIx64 ", failed to allocate qall, code:%s", pTmq->consumerId, tstrerror(code));
    return;
  }

  int32_t numOfItems = taosReadAllQitems(pTmq->delayedTask, qall);
  if (numOfItems == 0) {
    taosFreeQall(qall);
    return;
  }

  tqDebugC("consumer:0x%" PRIx64 " handle delayed %d tasks before poll data", pTmq->consumerId, numOfItems);
  int8_t* pTaskType = NULL;
  while (taosGetQitem(qall, (void**)&pTaskType) != 0) {
    if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) {
      tqDebugC("consumer:0x%" PRIx64 " retrieve ask ep timer", pTmq->consumerId);
      code = askEp(pTmq, NULL, false, false);
      if (code != 0) {
        // do not skip the rest of the iteration: the item below must still be
        // freed and the timer re-armed, otherwise ask-ep never runs again
        tqErrorC("consumer:0x%" PRIx64 " failed to ask ep, code:%s", pTmq->consumerId, tstrerror(code));
      }
      tqDebugC("consumer:0x%" PRIx64 " retrieve ep from mnode in 1s", pTmq->consumerId);
      bool ret = taosTmrReset(tmqAssignAskEpTask, DEFAULT_ASKEP_INTERVAL, (void*)(pTmq->refId), tmqMgmt.timer,
                              &pTmq->epTimer);
      tqDebugC("reset timer for tmq ask ep:%d", ret);
    } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
      tmq_commit_cb* pCallbackFn = (pTmq->commitCb != NULL) ? pTmq->commitCb : defaultCommitCbFn;
      asyncCommitAllOffsets(pTmq, pCallbackFn, pTmq->commitCbUserParam);
      tqDebugC("consumer:0x%" PRIx64 " next commit to vnode(s) in %.2fs", pTmq->consumerId,
               pTmq->autoCommitInterval / 1000.0);
      bool ret = taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, (void*)(pTmq->refId), tmqMgmt.timer,
                              &pTmq->commitTimer);
      tqDebugC("reset timer for commit:%d", ret);
    } else {
      tqErrorC("consumer:0x%" PRIx64 " invalid task type:%d", pTmq->consumerId, *pTaskType);
    }

    taosFreeQitem(pTaskType);
  }

  taosFreeQall(qall);
}
1444

1445
/*
 * Discard every response the consumer has not processed yet: first the items
 * already fetched into tmq->qall, then everything still waiting in
 * tmq->mqueue.
 */
void tmqClearUnhandleMsg(tmq_t* tmq) {
  SMqRspWrapper* pItem = NULL;

  // responses already moved into qall
  while (taosGetQitem(tmq->qall, (void**)&pItem) != 0) {
    tmqFreeRspWrapper(pItem);
    taosFreeQitem(pItem);
  }

  // responses still queued in mqueue
  pItem = NULL;
  if (taosReadAllQitems(tmq->mqueue, tmq->qall) != 0) {
    while (taosGetQitem(tmq->qall, (void**)&pItem) != 0) {
      tmqFreeRspWrapper(pItem);
      taosFreeQitem(pItem);
    }
  }
}
1461

1462
/*
 * Response callback for a subscribe request.
 *
 * param  SMqSubscribeCbParam of the waiting caller; receives the result in
 *        rspErr and has its rspSem posted
 * pMsg   response buffer; pEpSet and pData are freed here, as in the other
 *        response callbacks of this file
 * code   transport/server result
 *
 * Returns code when param is NULL, otherwise 0.
 */
int32_t tmqSubscribeCb(void* param, SDataBuf* pMsg, int32_t code) {
  if (pMsg) {
    taosMemoryFreeClear(pMsg->pEpSet);
    // the response body is not used; release it so it does not leak
    taosMemoryFreeClear(pMsg->pData);
  }

  if (param == NULL) {
    return code;
  }

  SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param;
  pParam->rspErr = code;

  if (tsem2_post(&pParam->rspSem) != 0){
    tqErrorC("failed to post sem, subscribe cb");
  }
  return 0;
}
1479

1480
/*
 * Append the names of the topics the consumer is currently subscribed to
 * onto *topics.
 *
 * tmq     consumer
 * topics  in/out list pointer; when *topics is NULL a new list is created
 *         with tmq_list_new() and stored there
 *
 * Each internal topic name is reported without its prefix: only the part
 * after the first '.' is appended. Topics without a '.' and append failures
 * are logged and skipped.
 *
 * Returns TSDB_CODE_INVALID_PARA when tmq or topics is NULL, terrno when the
 * list cannot be created, otherwise 0.
 */
int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
  if (tmq == NULL || topics == NULL) return TSDB_CODE_INVALID_PARA;
  if (*topics == NULL) {
    *topics = tmq_list_new();
    if (*topics == NULL) {
      return terrno;
    }
  }
  taosRLockLatch(&tmq->lock);
  for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* topic = taosArrayGet(tmq->clientTopics, i);
    if (topic == NULL) {
      tqErrorC("topic is null");
      continue;
    }
    char* tmp = strchr(topic->topicName, '.');
    if (tmp == NULL) {
      tqErrorC("topic name is invalid:%s", topic->topicName);
      continue;
    }
    if (tmq_list_append(*topics, tmp + 1) != 0) {
      tqErrorC("failed to append topic:%s", tmp + 1);
      continue;
    }
  }
  taosRUnLockLatch(&tmq->lock);
  return 0;
}
1508

1509
void tmqFreeImpl(void* handle) {
411✔
1510
  tmq_t*  tmq = (tmq_t*)handle;
411✔
1511
  int64_t id = tmq->consumerId;
411✔
1512

1513
  if (tmq->mqueue) {
411!
1514
    tmqClearUnhandleMsg(tmq);
411✔
1515
    taosCloseQueue(tmq->mqueue);
411✔
1516
  }
1517

1518
  if (tmq->delayedTask) {
411!
1519
    taosCloseQueue(tmq->delayedTask);
411✔
1520
  }
1521

1522
  taosFreeQall(tmq->qall);
411✔
1523
  if(tsem2_destroy(&tmq->rspSem) != 0) {
411!
1524
    tqErrorC("failed to destroy sem in free tmq");
×
1525
  }
1526

1527
  taosArrayDestroyEx(tmq->clientTopics, freeClientTopic);
411✔
1528
  taos_close_internal(tmq->pTscObj);
411✔
1529

1530
  if (tmq->commitTimer) {
411✔
1531
    if (!taosTmrStopA(&tmq->commitTimer)) {
188✔
1532
      tqErrorC("failed to stop commit timer");
129!
1533
    }
1534
  }
1535
  if (tmq->epTimer) {
411✔
1536
    if (!taosTmrStopA(&tmq->epTimer)) {
406✔
1537
      tqErrorC("failed to stop ep timer");
377!
1538
    }
1539
  }
1540
  if (tmq->hbLiveTimer) {
411✔
1541
    if (!taosTmrStopA(&tmq->hbLiveTimer)) {
410!
1542
      tqErrorC("failed to stop hb timer");
×
1543
    }
1544
  }
1545
  taosMemoryFree(tmq);
411!
1546

1547
  tqInfoC("consumer:0x%" PRIx64 " closed", id);
411!
1548
}
411✔
1549

1550
static void tmqMgmtInit(void) {
298✔
1551
  tmqInitRes = 0;
298✔
1552
  tmqMgmt.timer = taosTmrInit(1000, 100, 360000, "TMQ");
298✔
1553

1554
  if (tmqMgmt.timer == NULL) {
298!
1555
    tmqInitRes = terrno;
×
1556
  }
1557

1558
  tmqMgmt.rsetId = taosOpenRef(10000, tmqFreeImpl);
298✔
1559
  if (tmqMgmt.rsetId < 0) {
298!
1560
    tmqInitRes = terrno;
×
1561
  }
1562
}
298✔
1563

1564
void tmqMgmtClose(void) {
1,975✔
1565
  if (tmqMgmt.timer) {
1,975✔
1566
    taosTmrCleanUp(tmqMgmt.timer);
298✔
1567
    tmqMgmt.timer = NULL;
298✔
1568
  }
1569

1570
  if (tmqMgmt.rsetId >= 0) {
1,975!
1571
    taosCloseRef(tmqMgmt.rsetId);
1,975✔
1572
    tmqMgmt.rsetId = -1;
1,975✔
1573
  }
1574
}
1,975✔
1575

1576
/*
 * Create a consumer from a configuration object.
 *
 * conf      : consumer configuration; must not be NULL.
 * errstr    : caller buffer that receives a short failure description through
 *             SET_ERROR_MSG_TMQ (the macro is defined elsewhere in this file).
 * errstrLen : size of errstr.
 *
 * Returns the new consumer, or NULL on failure. On success the consumer is
 * registered in the tmq reference set and its heartbeat timer is running.
 *
 * Ownership note: until taosAddRef succeeds the half-built object is released
 * directly with tmqFreeImpl. Once it is registered, the reference set holds
 * the pointer and tmqFreeImpl is that set's free callback, so the object must
 * be released by dropping the reference instead.
 */
tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
  int32_t code = 0;

  if (conf == NULL) {
    SET_ERROR_MSG_TMQ("configure is null")
    return NULL;
  }
  code = taosThreadOnce(&tmqInit, tmqMgmtInit);
  if (code != 0) {
    SET_ERROR_MSG_TMQ("tmq init error")
    return NULL;
  }
  if (tmqInitRes != 0) {
    SET_ERROR_MSG_TMQ("tmq timer init error")
    return NULL;
  }

  tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t));
  if (pTmq == NULL) {
    tqErrorC("failed to create consumer, groupId:%s", conf->groupId);
    SET_ERROR_MSG_TMQ("malloc tmq failed")
    return NULL;
  }

  const char* user = conf->user == NULL ? TSDB_DEFAULT_USER : conf->user;
  const char* pass = conf->pass == NULL ? TSDB_DEFAULT_PASS : conf->pass;

  pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
  if (pTmq->clientTopics == NULL) {
    tqErrorC("failed to create consumer, groupId:%s", conf->groupId);
    SET_ERROR_MSG_TMQ("malloc client topics failed")
    goto _failed;
  }
  code = taosOpenQueue(&pTmq->mqueue);
  if (code) {
    tqErrorC("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, tstrerror(code),
             pTmq->groupId);
    SET_ERROR_MSG_TMQ("open queue failed")
    goto _failed;
  }

  code = taosOpenQueue(&pTmq->delayedTask);
  if (code) {
    tqErrorC("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, tstrerror(code),
             pTmq->groupId);
    SET_ERROR_MSG_TMQ("open delayed task queue failed")
    goto _failed;
  }

  code = taosAllocateQall(&pTmq->qall);
  if (code) {
    tqErrorC("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, tstrerror(code),
             pTmq->groupId);
    SET_ERROR_MSG_TMQ("allocate qall failed")
    goto _failed;
  }

  // a consumer cannot be created without a group id
  if (conf->groupId[0] == 0) {
    tqErrorC("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, tstrerror(code),
             pTmq->groupId);
    SET_ERROR_MSG_TMQ("malloc tmq element failed or group is empty")
    goto _failed;
  }

  // init status
  pTmq->status = TMQ_CONSUMER_STATUS__INIT;
  pTmq->pollCnt = 0;
  pTmq->epoch = 0;
  pTmq->pollFlag = 0;

  // set conf
  tstrncpy(pTmq->clientId, conf->clientId, TSDB_CLIENT_ID_LEN);
  tstrncpy(pTmq->groupId, conf->groupId, TSDB_CGROUP_LEN);
  pTmq->withTbName = conf->withTbName;
  pTmq->useSnapshot = conf->snapEnable;
  pTmq->autoCommit = conf->autoCommit;
  pTmq->autoCommitInterval = conf->autoCommitInterval;
  pTmq->sessionTimeoutMs = conf->sessionTimeoutMs;
  pTmq->heartBeatIntervalMs = conf->heartBeatIntervalMs;
  pTmq->maxPollIntervalMs = conf->maxPollIntervalMs;
  pTmq->commitCb = conf->commitCb;
  pTmq->commitCbUserParam = conf->commitCbUserParam;
  pTmq->resetOffsetCfg = conf->resetOffset;
  pTmq->replayEnable = conf->replayEnable;
  pTmq->sourceExcluded = conf->sourceExcluded;
  pTmq->enableBatchMeta = conf->enableBatchMeta;
  tstrncpy(pTmq->user, user, TSDB_USER_LEN);
  if (taosGetFqdn(pTmq->fqdn) != 0) {
    tstrncpy(pTmq->fqdn, "localhost", TSDB_FQDN_LEN);
  }
  // replay mode overrides the configured auto-commit setting
  if (conf->replayEnable) {
    pTmq->autoCommit = false;
  }
  taosInitRWLatch(&pTmq->lock);

  // assign consumerId
  pTmq->consumerId = tGenIdPI64();

  // init semaphore
  if (tsem2_init(&pTmq->rspSem, 0, 0) != 0) {
    tqErrorC("consumer:0x %" PRIx64 " setup failed since %s, consumer group %s", pTmq->consumerId,
             tstrerror(TAOS_SYSTEM_ERROR(errno)), pTmq->groupId);
    SET_ERROR_MSG_TMQ("init t_sem failed")
    goto _failed;
  }

  // init connection
  code = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ, &pTmq->pTscObj);
  if (code) {
    terrno = code;
    tqErrorC("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, terrstr(), pTmq->groupId);
    SET_ERROR_MSG_TMQ("init tscObj failed")
    goto _failed;
  }

  pTmq->refId = taosAddRef(tmqMgmt.rsetId, pTmq);
  if (pTmq->refId < 0) {
    SET_ERROR_MSG_TMQ("add tscObj ref failed")
    goto _failed;
  }

  // the timer callback receives the refId (not the pointer) as its parameter
  pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, pTmq->heartBeatIntervalMs, (void*)pTmq->refId, tmqMgmt.timer);
  if (pTmq->hbLiveTimer == NULL) {
    SET_ERROR_MSG_TMQ("start heartbeat timer failed")
    // pTmq is registered in the ref set at this point. Freeing it directly
    // would leave a dangling entry behind (and a second free when the set is
    // closed), so drop the reference and let the set's free callback
    // (tmqFreeImpl) release the object.
    int64_t refId = pTmq->refId;
    int32_t ret = taosRemoveRef(tmqMgmt.rsetId, refId);
    if (ret != 0) {
      tqErrorC("failed to remove ref:%" PRId64 ", code:%d", refId, ret);
    }
    return NULL;
  }

  char         buf[TSDB_OFFSET_LEN] = {0};
  STqOffsetVal offset = {.type = pTmq->resetOffsetCfg};
  tFormatOffset(buf, tListLen(buf), &offset);
  tqInfoC("consumer:0x%" PRIx64 " is setup, refId:%" PRId64
          ", groupId:%s, snapshot:%d, autoCommit:%d, commitInterval:%dms, offset:%s",
          pTmq->consumerId, pTmq->refId, pTmq->groupId, pTmq->useSnapshot, pTmq->autoCommit, pTmq->autoCommitInterval,
          buf);

  return pTmq;

_failed:
  // only reached while pTmq is NOT registered in the ref set
  tmqFreeImpl(pTmq);
  return NULL;
}
1716

1717
/*
 * Issue an ask-ep request and block until its result is available.
 *
 * Returns terrno when the request context cannot be allocated,
 * TSDB_CODE_TSC_INTERNAL_ERROR when its semaphore cannot be initialised, the
 * askEp() return value when that is non-zero, and otherwise the code stored in
 * the request context after the semaphore wait.
 */
static int32_t syncAskEp(tmq_t* pTmq) {
  SAskEpInfo* pAsk = taosMemoryMalloc(sizeof(SAskEpInfo));
  if (NULL == pAsk) {
    return terrno;
  }

  if (tsem2_init(&pAsk->sem, 0, 0) != 0) {
    taosMemoryFree(pAsk);
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  int32_t rc = askEp(pTmq, pAsk, true, false);
  if (0 == rc) {
    // the request was accepted: wait for the semaphore, then pick up the code
    if (tsem2_wait(&pAsk->sem) != 0) {
      tqErrorC("consumer:0x%" PRIx64 ", failed to wait for sem", pTmq->consumerId);
    }
    rc = pAsk->code;
  }

  if (tsem2_destroy(&pAsk->sem) != 0) {
    tqErrorC("failed to destroy sem sync ask ep");
  }
  taosMemoryFree(pAsk);
  return rc;
}
1739

1740
/*
 * Subscribe the consumer to the topics in topic_list.
 *
 * Builds an SCMSubscribeReq from the consumer settings and the fully qualified
 * topic names, sends it as TDMT_MND_TMQ_SUBSCRIBE and waits for the response.
 * It then polls syncAskEp() until that succeeds or the retry budget is used
 * up, and finally starts the ask-ep timer and, when auto commit is on, the
 * commit timer, if they are not already running.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA when an argument is NULL, or
 * the code of the step that failed. TSDB_CODE_MND_CONSUMER_NOT_EXIST from
 * syncAskEp() ends the retry loop and is reported as success (0).
 */
int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
  if (tmq == NULL || topic_list == NULL) return TSDB_CODE_INVALID_PARA;
  const SArray*   container = &topic_list->container;
  int32_t         sz = taosArrayGetSize(container);
  void*           buf = NULL;
  SMsgSendInfo*   sendInfo = NULL;
  SCMSubscribeReq req = {0};
  int32_t         code = 0;

  tqInfoC("consumer:0x%" PRIx64 " cgroup:%s, subscribe %d topics", tmq->consumerId, tmq->groupId, sz);

  req.consumerId = tmq->consumerId;
  tstrncpy(req.clientId, tmq->clientId, TSDB_CLIENT_ID_LEN);
  tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
  tstrncpy(req.user, tmq->user, TSDB_USER_LEN);
  tstrncpy(req.fqdn, tmq->fqdn, TSDB_FQDN_LEN);

  req.topicNames = taosArrayInit(sz, sizeof(void*));
  if (req.topicNames == NULL) {
    code = terrno;
    goto END;
  }

  req.withTbName = tmq->withTbName;
  req.autoCommit = tmq->autoCommit;
  req.autoCommitInterval = tmq->autoCommitInterval;
  req.sessionTimeoutMs = tmq->sessionTimeoutMs;
  req.maxPollIntervalMs = tmq->maxPollIntervalMs;
  req.resetOffsetCfg = tmq->resetOffsetCfg;
  req.enableReplay = tmq->replayEnable;
  req.enableBatchMeta = tmq->enableBatchMeta;

  // expand every topic to its full name and collect the heap copies
  for (int32_t i = 0; i < sz; i++) {
    char* topic = taosArrayGetP(container, i);
    if (topic == NULL) {
      code = terrno;
      goto END;
    }
    SName name = {0};
    code = tNameSetDbName(&name, tmq->pTscObj->acctId, topic, strlen(topic));
    if (code) {
      tqErrorC("consumer:0x%" PRIx64 " cgroup:%s, failed to set topic name, code:%d", tmq->consumerId, tmq->groupId,
               code);
      goto END;
    }
    char* topicFName = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
    if (topicFName == NULL) {
      code = terrno;
      goto END;
    }

    code = tNameExtractFullName(&name, topicFName);
    if (code) {
      tqErrorC("consumer:0x%" PRIx64 " cgroup:%s, failed to extract topic name, code:%d", tmq->consumerId, tmq->groupId,
               code);
      taosMemoryFree(topicFName);
      goto END;
    }

    if (taosArrayPush(req.topicNames, &topicFName) == NULL) {
      code = terrno;
      taosMemoryFree(topicFName);
      goto END;
    }
    tqInfoC("consumer:0x%" PRIx64 " subscribe topic:%s", tmq->consumerId, topicFName);
  }

  // first call with a NULL buffer yields the length, second call serializes
  int32_t tlen = tSerializeSCMSubscribeReq(NULL, &req);
  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) {
    code = terrno;
    goto END;
  }

  void* abuf = buf;
  tlen = tSerializeSCMSubscribeReq(&abuf, &req);

  sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    code = terrno;
    taosMemoryFree(buf);
    goto END;
  }

  SMqSubscribeCbParam param = {.rspErr = 0};
  if (tsem2_init(&param.rspSem, 0, 0) != 0) {
    code = TSDB_CODE_TSC_INTERNAL_ERROR;
    taosMemoryFree(buf);
    taosMemoryFree(sendInfo);
    goto END;
  }

  sendInfo->msgInfo = (SDataBuf){.pData = buf, .len = tlen, .handle = NULL};
  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = &param;
  sendInfo->fp = tmqSubscribeCb;
  sendInfo->msgType = TDMT_MND_TMQ_SUBSCRIBE;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo);
  if (code != 0) {
    // The request never went out, so the semaphore initialised above will not
    // be waited on; destroy it here instead of leaking it.
    // NOTE(review): sendInfo/buf are intentionally left alone on this path,
    // as before - presumably asyncSendMsgToServer releases them on failure;
    // confirm against its implementation.
    if (tsem2_destroy(&param.rspSem) != 0) {
      tqErrorC("consumer:0x%" PRIx64 ", failed to destroy semaphore in subscribe", tmq->consumerId);
    }
    goto END;
  }

  if (tsem2_wait(&param.rspSem) != 0){
    tqErrorC("consumer:0x%" PRIx64 ", failed to wait semaphore in subscribe", tmq->consumerId);
  }
  if(tsem2_destroy(&param.rspSem) != 0) {
    tqErrorC("consumer:0x%" PRIx64 ", failed to destroy semaphore in subscribe", tmq->consumerId);
  }

  if (param.rspErr != 0) {
    code = param.rspErr;
    goto END;
  }

  int32_t retryCnt = 0;
  while ((code = syncAskEp(tmq)) != 0) {
    if (retryCnt++ > SUBSCRIBE_RETRY_MAX_COUNT || code == TSDB_CODE_MND_CONSUMER_NOT_EXIST) {
      tqErrorC("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry more than 2 minutes, code:%s",
               tmq->consumerId, tstrerror(code));
      // "consumer not exist" stops the retries and is not reported as an error
      if (code == TSDB_CODE_MND_CONSUMER_NOT_EXIST) {
        code = 0;
      }
      goto END;
    }

    tqInfoC("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry:%d in 500ms", tmq->consumerId, retryCnt);
    taosMsleep(SUBSCRIBE_RETRY_INTERVAL);
  }

  if (tmq->epTimer == NULL){
    tmq->epTimer = taosTmrStart(tmqAssignAskEpTask, DEFAULT_ASKEP_INTERVAL, (void*)(tmq->refId), tmqMgmt.timer);
    if (tmq->epTimer == NULL) {
      code = TSDB_CODE_TSC_INTERNAL_ERROR;
      goto END;
    }
  }
  if (tmq->autoCommit && tmq->commitTimer == NULL){
    tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, (void*)(tmq->refId), tmqMgmt.timer);
    if (tmq->commitTimer == NULL) {
      code = TSDB_CODE_TSC_INTERNAL_ERROR;
      goto END;
    }
  }

END:
  taosArrayDestroyP(req.topicNames, NULL);
  return code;
}
1892

1893
/*
 * Store the commit callback and its user parameter in the configuration.
 * A NULL conf is ignored.
 */
void tmq_conf_set_auto_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* param) {
  if (conf != NULL) {
    conf->commitCbUserParam = param;
    conf->commitCb = cb;
  }
}
1898

1899
/*
 * Find the vgroup entry with id vgId under the topic named topicName.
 *
 * On a match *pVg is set to the entry inside tmq->clientTopics; when nothing
 * matches *pVg is left untouched, so callers initialise it before the call.
 * The function takes no lock itself.
 */
static void getVgInfo(tmq_t* tmq, char* topicName, int32_t vgId, SMqClientVg** pVg) {
  int32_t nTopics = taosArrayGetSize(tmq->clientTopics);
  for (int t = 0; t < nTopics; t++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, t);
    if (pTopic == NULL || strcmp(pTopic->topicName, topicName) != 0) {
      continue;
    }

    int32_t nVgs = taosArrayGetSize(pTopic->vgs);
    for (int32_t v = 0; v < nVgs; v++) {
      SMqClientVg* pCandidate = taosArrayGet(pTopic->vgs, v);
      if (pCandidate == NULL || pCandidate->vgId != vgId) {
        continue;
      }
      *pVg = pCandidate;
      return;
    }
  }
}
1915

1916
/*
 * Look up a subscribed topic by name.
 *
 * Returns the entry inside tmq->clientTopics whose topicName equals topicName,
 * or NULL when there is none. Entries that taosArrayGet reports as NULL are
 * skipped, the same way getVgInfo treats them.
 */
static SMqClientTopic* getTopicInfo(tmq_t* tmq, char* topicName) {
  int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
  for (int i = 0; i < topicNumCur; i++) {
    SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i);
    if (pTopicCur == NULL) {
      continue;
    }
    if (strcmp(pTopicCur->topicName, topicName) == 0) {
      return pTopicCur;
    }
  }
  return NULL;
}
1926

1927
/*
 * Response callback for a poll request (installed by doTmqPollImpl).
 *
 * param : the SMqPollCbParam built for the request (refId, vgId, topic name,
 *         request id).
 * pMsg  : response buffer; pData and pEpSet are released here unless pEpSet
 *         is handed over to the queued wrapper.
 * code  : transport/server result of the request.
 *
 * The response is decoded into an SMqRspWrapper which is pushed onto the
 * consumer's mqueue together with the final code, then rspSem is posted.
 * A wrapper is queued for failed responses as well, so the code reaches the
 * queue consumer.
 *
 * Local names are kept as they are because PROCESS_POLL_RSP is a macro
 * defined elsewhere and may refer to them.
 */
int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
  tmq_t*          tmq = NULL;
  SMqRspWrapper*  pRspWrapper = NULL;
  int8_t          rspType = 0;
  int32_t         vgId = 0;
  uint64_t        requestId = 0;
  SMqPollCbParam* pParam = (SMqPollCbParam*)param;

  if (NULL == pMsg) {
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }
  if (NULL == pParam) {
    code = TSDB_CODE_TSC_INTERNAL_ERROR;
    goto EXIT;
  }

  int64_t refId = pParam->refId;
  vgId = pParam->vgId;
  requestId = pParam->requestId;

  // the consumer may have been closed while the request was in flight
  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (NULL == tmq) {
    code = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    goto EXIT;
  }

  int32_t ret = taosAllocateQitem(sizeof(SMqRspWrapper), DEF_QITEM, 0, (void**)&pRspWrapper);
  if (ret != 0) {
    code = ret;
    tscWarn("consumer:0x%" PRIx64 " msg discard from vgId:%d, since out of memory", tmq->consumerId, vgId);
    goto END;
  }

  // a failed request still goes through END so the code is queued
  if (code != 0) {
    goto END;
  }

  if (NULL == pMsg->pData) {
    tqErrorC("consumer:0x%" PRIx64 " msg discard from vgId:%d, since msg is NULL", tmq->consumerId, vgId);
    code = TSDB_CODE_TSC_INTERNAL_ERROR;
    goto END;
  }

  const SMqRspHead* pHead = (const SMqRspHead*)pMsg->pData;
  int32_t           msgEpoch = pHead->epoch;
  int32_t           clientEpoch = atomic_load_32(&tmq->epoch);

  // responses whose epoch differs from the consumer's current one are dropped
  if (msgEpoch != clientEpoch) {
    tqErrorC("consumer:0x%" PRIx64
             " msg discard from vgId:%d since epoch not equal, rsp epoch %d, current epoch %d, reqId:0x%" PRIx64,
             tmq->consumerId, vgId, msgEpoch, clientEpoch, requestId);
    code = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
    goto END;
  }

  rspType = pHead->mqMsgType;
  tqDebugC("consumer:0x%" PRIx64 " recv poll rsp, vgId:%d, type %d,QID:0x%" PRIx64, tmq->consumerId, vgId, rspType, requestId);
  if (rspType == TMQ_MSG_TYPE__POLL_DATA_RSP) {
    PROCESS_POLL_RSP(tDecodeMqDataRsp, &pRspWrapper->pollRsp.dataRsp)
  } else if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) {
    PROCESS_POLL_RSP(tDecodeMqMetaRsp, &pRspWrapper->pollRsp.metaRsp)
  } else if (rspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
    PROCESS_POLL_RSP(tDecodeSTaosxRsp, &pRspWrapper->pollRsp.dataRsp)
  } else if (rspType == TMQ_MSG_TYPE__POLL_BATCH_META_RSP) {
    PROCESS_POLL_RSP(tSemiDecodeMqBatchMetaRsp, &pRspWrapper->pollRsp.batchMetaRsp)
  } else {  // invalid rspType
    tqErrorC("consumer:0x%" PRIx64 " invalid rsp msg received, type:%d ignored", tmq->consumerId, rspType);
    code = TSDB_CODE_TSC_INTERNAL_ERROR;
    goto END;
  }

  pRspWrapper->tmqRspType = rspType;
  pRspWrapper->pollRsp.reqId = requestId;
  // hand the ep set over to the wrapper so EXIT does not free it
  pRspWrapper->pollRsp.pEpset = pMsg->pEpSet;
  pMsg->pEpSet = NULL;

END:
  if (pRspWrapper != NULL) {
    pRspWrapper->code = code;
    pRspWrapper->pollRsp.vgId = vgId;
    tstrncpy(pRspWrapper->pollRsp.topicName, pParam->topicName, TSDB_TOPIC_FNAME_LEN);
    code = taosWriteQitem(tmq->mqueue, pRspWrapper);
    if (code == 0) {
      // tmq is never NULL at END: every jump here happens after the acquire check
      tqDebugC("consumer:0x%" PRIx64 " put poll res into mqueue, type:%d, vgId:%d, total in queue:%d,QID:0x%" PRIx64,
               tmq->consumerId, rspType, vgId, taosQueueItemSize(tmq->mqueue), requestId);
    } else {
      tmqFreeRspWrapper(pRspWrapper);
      taosFreeQitem(pRspWrapper);
      tqErrorC("consumer:0x%" PRIx64 " put poll res into mqueue failed, code:%d", tmq->consumerId, code);
    }
  }

  if (tsem2_post(&tmq->rspSem) != 0) {
    tqErrorC("failed to post rsp sem, consumer:0x%" PRIx64, tmq->consumerId);
  }
  ret = taosReleaseRef(tmqMgmt.rsetId, refId);
  if (ret != 0) {
    tqErrorC("failed to release ref:%"PRId64 ", code:%d", refId, ret);
  }

EXIT:
  taosMemoryFreeClear(pMsg->pData);
  taosMemoryFreeClear(pMsg->pEpSet);
  return code;
}
2027

2028
/*
 * Fill a poll request for one vgroup of one topic.
 *
 * The subscription key is "<groupId><TMQ_SEPARATOR><topicName>". The request
 * offset is the vgroup's current end offset and a fresh request id is
 * generated on every call.
 */
void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) {
  (void)snprintf(pReq->subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", tmq->groupId, TMQ_SEPARATOR, pTopic->topicName);

  // per-request values
  pReq->reqId = generateRequestId();
  pReq->timeout = timeout;

  // target vgroup and where to resume reading
  pReq->head.vgId = pVg->vgId;
  pReq->reqOffset = pVg->offsetInfo.endOffset;

  // consumer identity and settings
  pReq->consumerId = tmq->consumerId;
  pReq->epoch = tmq->epoch;
  pReq->withTbName = tmq->withTbName;
  pReq->useSnapshot = tmq->useSnapshot;
  pReq->enableReplay = tmq->replayEnable;
  pReq->sourceExcluded = tmq->sourceExcluded;
  pReq->enableBatchMeta = tmq->enableBatchMeta;
}
46,195✔
2042

2043
void changeByteEndian(char* pData) {
344,944✔
2044
  if (pData == NULL) {
344,944!
2045
    return;
×
2046
  }
2047
  char* p = pData;
344,944✔
2048

2049
  // | version | total length | total rows | total columns | flag seg| block group id | column schema | each column
2050
  // length | version:
2051
  int32_t blockVersion = *(int32_t*)p;
344,944✔
2052
  if (blockVersion != BLOCK_VERSION_1) {
344,944!
2053
    tqErrorC("invalid block version:%d", blockVersion);
×
2054
    return;
×
2055
  }
2056
  *(int32_t*)p = BLOCK_VERSION_2;
344,944✔
2057

2058
  p += sizeof(int32_t);
344,944✔
2059
  p += sizeof(int32_t);
344,944✔
2060
  p += sizeof(int32_t);
344,944✔
2061
  int32_t cols = *(int32_t*)p;
344,944✔
2062
  p += sizeof(int32_t);
344,944✔
2063
  p += sizeof(int32_t);
344,944✔
2064
  p += sizeof(uint64_t);
344,944✔
2065
  // check fields
2066
  p += cols * (sizeof(int8_t) + sizeof(int32_t));
344,944✔
2067

2068
  int32_t* colLength = (int32_t*)p;
344,944✔
2069
  for (int32_t i = 0; i < cols; ++i) {
2,204,460✔
2070
    colLength[i] = htonl(colLength[i]);
1,859,516✔
2071
  }
2072
}
2073

2074
static void tmqGetRawDataRowsPrecisionFromRes(void* pRetrieve, void** rawData, int64_t* rows, int32_t* precision) {
673,711✔
2075
  if (pRetrieve == NULL) {
673,711!
2076
    return;
×
2077
  }
2078
  if (*(int64_t*)pRetrieve == 0) {
673,711!
2079
    *rawData = ((SRetrieveTableRsp*)pRetrieve)->data;
×
2080
    *rows = htobe64(((SRetrieveTableRsp*)pRetrieve)->numOfRows);
×
2081
    if (precision != NULL) {
×
2082
      *precision = ((SRetrieveTableRsp*)pRetrieve)->precision;
×
2083
    }
2084
  } else if (*(int64_t*)pRetrieve == 1) {
673,711✔
2085
    *rawData = ((SRetrieveTableRspForTmq*)pRetrieve)->data;
673,710✔
2086
    *rows = htobe64(((SRetrieveTableRspForTmq*)pRetrieve)->numOfRows);
673,710✔
2087
    if (precision != NULL) {
673,695✔
2088
      *precision = ((SRetrieveTableRspForTmq*)pRetrieve)->precision;
328,792✔
2089
    }
2090
  }
2091
}
2092

2093
static void tmqBuildRspFromWrapperInner(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows,
32,357✔
2094
                                        SMqRspObj* pRspObj) {
2095
  pRspObj->resIter = -1;
32,357✔
2096
  pRspObj->resInfo.totalRows = 0;
32,357✔
2097
  pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
32,357✔
2098

2099
  SMqDataRsp* pDataRsp = &pRspObj->dataRsp;
32,357✔
2100
  bool needTransformSchema = !pDataRsp->withSchema;
32,357✔
2101
  if (!pDataRsp->withSchema) {  // withSchema is false if subscribe subquery, true if subscribe db or stable
32,357✔
2102
    pDataRsp->withSchema = true;
30,134✔
2103
    pDataRsp->blockSchema = taosArrayInit(pDataRsp->blockNum, sizeof(void*));
30,134✔
2104
    if (pDataRsp->blockSchema == NULL) {
30,134!
2105
      tqErrorC("failed to allocate memory for blockSchema");
×
2106
      return;
×
2107
    }
2108
  }
2109
  // extract the rows in this data packet
2110
  for (int32_t i = 0; i < pDataRsp->blockNum; ++i) {
377,302✔
2111
    void*   pRetrieve = taosArrayGetP(pDataRsp->blockData, i);
344,945✔
2112
    void*   rawData = NULL;
344,946✔
2113
    int64_t rows = 0;
344,946✔
2114
    // deal with compatibility
2115
    tmqGetRawDataRowsPrecisionFromRes(pRetrieve, &rawData, &rows, NULL);
344,946✔
2116

2117
    pVg->numOfRows += rows;
344,944✔
2118
    (*numOfRows) += rows;
344,944✔
2119
    changeByteEndian(rawData);
344,944✔
2120
    if (needTransformSchema) {  // withSchema is false if subscribe subquery, true if subscribe db or stable
344,945✔
2121
      SSchemaWrapper* schema = tCloneSSchemaWrapper(&pWrapper->topicHandle->schema);
226,313!
2122
      if (schema) {
226,314!
2123
        if (taosArrayPush(pDataRsp->blockSchema, &schema) == NULL) {
452,627!
2124
          tqErrorC("failed to push schema into blockSchema");
×
2125
          continue;
×
2126
        }
2127
      }
2128
    }
2129
  }
2130
}
2131

2132
/*
 * Build and send one poll request for a single vgroup.
 *
 * The request is serialized into a heap buffer and sent asynchronously with
 * tmqPollCb as the response callback; the callback parameter carries the
 * consumer refId, topic name, vgroup id and request id. On success the poll
 * counters of the vgroup and the consumer are incremented and the vgroup's
 * seekUpdated flag is cleared.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_MSG when serialization fails, the
 * terrno of a failed allocation, or the asyncSendMsgToServer error code.
 */
static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* pVg, int64_t timeout) {
  SMqPollReq      req = {0};
  char*           msg = NULL;
  SMqPollCbParam* pParam = NULL;
  SMsgSendInfo*   sendInfo = NULL;
  int             code = 0;
  tmqBuildConsumeReqImpl(&req, pTmq, timeout, pTopic, pVg);

  // first pass with a NULL buffer only computes the encoded size
  int32_t msgSize = tSerializeSMqPollReq(NULL, 0, &req);
  if (msgSize < 0) {
    code = TSDB_CODE_INVALID_MSG;
    return code;
  }

  msg = taosMemoryCalloc(1, msgSize);
  if (NULL == msg) {
    return terrno;
  }

  if (tSerializeSMqPollReq(msg, msgSize, &req) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    taosMemoryFreeClear(msg);
    return code;
  }

  pParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
  if (pParam == NULL) {
    code = terrno;
    taosMemoryFreeClear(msg);
    return code;
  }

  pParam->refId = pTmq->refId;
  tstrncpy(pParam->topicName, pTopic->topicName, TSDB_TOPIC_FNAME_LEN);
  pParam->vgId = pVg->vgId;
  pParam->requestId = req.reqId;

  sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    // capture the allocation error before releasing anything, like the
    // pParam failure path above does
    code = terrno;
    taosMemoryFreeClear(pParam);
    taosMemoryFreeClear(msg);
    return code;
  }

  sendInfo->msgInfo = (SDataBuf){.pData = msg, .len = msgSize, .handle = NULL};
  sendInfo->requestId = req.reqId;
  sendInfo->requestObjRefId = 0;
  sendInfo->param = pParam;
  sendInfo->paramFreeFp = taosAutoMemoryFree;
  sendInfo->fp = tmqPollCb;
  sendInfo->msgType = TDMT_VND_TMQ_CONSUME;

  // format the offset before sending; it is only used by the debug log below
  char offsetFormatBuf[TSDB_OFFSET_LEN] = {0};
  tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pVg->offsetInfo.endOffset);
  code = asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, NULL, sendInfo);
  tqDebugC("consumer:0x%" PRIx64 " send poll to %s vgId:%d, code:%d, epoch %d, req:%s,QID:0x%" PRIx64, pTmq->consumerId,
           pTopic->topicName, pVg->vgId, code, pTmq->epoch, offsetFormatBuf, req.reqId);
  if (code != 0) {
    return code;
  }

  pVg->pollCnt++;
  pVg->seekUpdated = false;  // reset this flag.
  pTmq->pollCnt++;

  return 0;
}
2199

2200
// broadcast the poll request to all related vnodes
2201
static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
92,420✔
2202
  int32_t code = 0;
92,420✔
2203

2204
  taosWLockLatch(&tmq->lock);
92,420✔
2205
  int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
92,421✔
2206
  tqDebugC("consumer:0x%" PRIx64 " start to poll data, numOfTopics:%d", tmq->consumerId, numOfTopics);
92,421!
2207

2208
  for (int i = 0; i < numOfTopics; i++) {
189,610✔
2209
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
97,187✔
2210
    if (pTopic == NULL) {
97,186!
2211
      continue;
×
2212
    }
2213
    int32_t numOfVg = taosArrayGetSize(pTopic->vgs);
97,186✔
2214
    if (pTopic->noPrivilege) {
97,186!
2215
      tqDebugC("consumer:0x%" PRIx64 " has no privilegr for topic:%s", tmq->consumerId, pTopic->topicName);
×
2216
      continue;
×
2217
    }
2218
    for (int j = 0; j < numOfVg; j++) {
372,723✔
2219
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
275,535✔
2220
      if (pVg == NULL) {
275,536!
2221
        continue;
×
2222
      }
2223
      int64_t elapsed = taosGetTimestampMs() - pVg->emptyBlockReceiveTs;
275,535✔
2224
      if (elapsed < EMPTY_BLOCK_POLL_IDLE_DURATION && elapsed >= 0) {  // less than 10ms
275,535!
2225
        tqDebugC("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 10ms before start next poll", tmq->consumerId,
22,357!
2226
                 tmq->epoch, pVg->vgId);
2227
        continue;
22,357✔
2228
      }
2229

2230
      elapsed = taosGetTimestampMs() - pVg->blockReceiveTs;
253,178✔
2231
      if (tmq->replayEnable && elapsed < pVg->blockSleepForReplay && elapsed >= 0) {
253,178!
2232
        tqDebugC("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for %" PRId64 "ms before start next poll when replay",
23!
2233
                 tmq->consumerId, tmq->epoch, pVg->vgId, pVg->blockSleepForReplay);
2234
        continue;
23✔
2235
      }
2236

2237
      int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
253,155✔
2238
      if (vgStatus == TMQ_VG_STATUS__WAIT) {
253,157✔
2239
        int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
206,965✔
2240
        tqDebugC("consumer:0x%" PRIx64 " epoch %d wait poll-rsp, skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch,
206,964!
2241
                 pVg->vgId, vgSkipCnt);
2242
        continue;
206,965✔
2243
      }
2244

2245
      atomic_store_32(&pVg->vgSkipCnt, 0);
46,192✔
2246
      code = doTmqPollImpl(tmq, pTopic, pVg, timeout);
46,192✔
2247
      if (code != TSDB_CODE_SUCCESS) {
46,192!
2248
        goto end;
×
2249
      }
2250
    }
2251
  }
2252

2253
end:
92,423✔
2254
  taosWUnLockLatch(&tmq->lock);
92,423✔
2255
  tqDebugC("consumer:0x%" PRIx64 " end to poll data, code:%d", tmq->consumerId, code);
92,422!
2256
  return code;
92,422✔
2257
}
2258

2259
/*
 * Fold a poll response into the vgroup's bookkeeping.
 *
 * Unless a seek has updated the vgroup since the request was sent
 * (seekUpdated), the end offset becomes rspOffset and, when the response
 * carried data, the begin offset becomes reqOffset. In every case the vgroup
 * is returned to TMQ_VG_STATUS__IDLE and the WAL version range is set to
 * [sver, ever + 1].
 */
static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* reqOffset, STqOffsetVal* rspOffset, int64_t sver, int64_t ever,
                         int64_t consumerId, bool hasData) {
  if (pVg->seekUpdated) {
    tqDebugC("consumer:0x%" PRIx64 " local offset is NOT update, since seekupdate is set", consumerId);
  } else {
    tqDebugC("consumer:0x%" PRIx64 " local offset is update, since seekupdate not set", consumerId);
    if (hasData) {
      tOffsetCopy(&pVg->offsetInfo.beginOffset, reqOffset);
    }
    tOffsetCopy(&pVg->offsetInfo.endOffset, rspOffset);
  }

  // update the status
  atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);

  // update the valid wal version range
  pVg->offsetInfo.walVerBegin = sver;
  pVg->offsetInfo.walVerEnd = ever + 1;
}
41,375✔
2278

2279
/*
 * Move the decoded payload of a poll response wrapper into a newly allocated
 * SMqRspObj.
 *
 * The payload is copied bytewise and the wrapper's copy is then zeroed, so
 * whatever the payload points to is referenced only by the returned object.
 * Topic name, db name and vgroup id are copied alongside.
 *
 * Returns NULL when the allocation fails; the wrapper is left untouched then.
 */
static SMqRspObj* buildRsp(SMqPollRspWrapper* pollRspWrapper){
  // size of the largest payload variant; used for the copy and the wipe
  typedef union {
    SMqDataRsp      dataRsp;
    SMqMetaRsp      metaRsp;
    SMqBatchMetaRsp batchMetaRsp;
  } MEMSIZE;

  SMqRspObj* pObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
  if (NULL == pObj) {
    tqErrorC("buildRsp:failed to allocate memory");
    return NULL;
  }

  (void)memcpy(&pObj->dataRsp, &pollRspWrapper->dataRsp, sizeof(MEMSIZE));
  pObj->vgId = pollRspWrapper->vgId;
  tstrncpy(pObj->topic, pollRspWrapper->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pObj->db, pollRspWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);

  // the wrapper no longer refers to the payload
  (void)memset(&pollRspWrapper->dataRsp, 0, sizeof(MEMSIZE));
  return pObj;
}
2298

2299
// Handle a queue item that carries a non-zero error code.
//
// For TSDB_CODE_VND_INVALID_VGROUP_ID and TSDB_CODE_TMQ_CONSUMER_MISMATCH an endpoint refresh is requested
// through askEp (the last argument is true only for the invalid-vgroup case). The message is then discarded,
// and the matching vgroup, if found, is stamped with the current time and set back to IDLE.
static void processMqRspError(tmq_t* tmq, SMqRspWrapper* pRspWrapper){
  SMqPollRspWrapper* pPoll = &pRspWrapper->pollRsp;

  const bool vgInvalid = (pRspWrapper->code == TSDB_CODE_VND_INVALID_VGROUP_ID);  // for vnode transform
  const bool consumerMismatch = (pRspWrapper->code == TSDB_CODE_TMQ_CONSUMER_MISMATCH);
  if (vgInvalid || consumerMismatch) {
    int32_t code = askEp(tmq, NULL, false, vgInvalid);
    if (code != 0) {
      tqErrorC("consumer:0x%" PRIx64 " failed to ask ep, code:%s", tmq->consumerId, tstrerror(code));
    }
  }

  tqInfoC("consumer:0x%" PRIx64 " msg from vgId:%d discarded, since %s", tmq->consumerId, pPoll->vgId,
          tstrerror(pRspWrapper->code));

  SMqClientVg* pVg = NULL;
  taosWLockLatch(&tmq->lock);
  getVgInfo(tmq, pPoll->topicName, pPoll->vgId, &pVg);
  if (pVg != NULL) {
    pVg->emptyBlockReceiveTs = taosGetTimestampMs();
    atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
  }
  taosWUnLockLatch(&tmq->lock);
}
3,871✔
2324
// Process one successful queue item and, when it carries something to hand to the caller, return it as a
// newly built SMqRspObj.
//
// - EP responses only refresh the local endpoint information and yield NULL.
// - Data / data+meta responses update the vgroup offsets; an empty block yields NULL, otherwise a result
//   object is built and row counters (and, in replay mode, the replay timer) are updated.
// - Meta / batch-meta responses update the vgroup offsets and always build a result object.
//
// Everything after the EP case runs with tmq->lock write-locked; the lock is released at END.
// Returns NULL when the vgroup cannot be found or when building the result object fails.
static SMqRspObj* processMqRsp(tmq_t* tmq, SMqRspWrapper* pRspWrapper){
  if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) {
    tqDebugC("consumer:0x%" PRIx64 " ep msg received", tmq->consumerId);
    doUpdateLocalEp(tmq, pRspWrapper->epoch, &pRspWrapper->epRsp);
    return NULL;
  }

  const bool isDataRsp = (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_RSP);
  const bool isDataMetaRsp = (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP);
  const bool isMetaRsp = (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP);
  const bool isBatchMetaRsp = (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_BATCH_META_RSP);

  SMqRspObj*         pResult = NULL;
  SMqPollRspWrapper* pPoll = &pRspWrapper->pollRsp;
  SMqClientVg*       pVg = NULL;

  taosWLockLatch(&tmq->lock);
  getVgInfo(tmq, pPoll->topicName, pPoll->vgId, &pVg);
  if (pVg == NULL) {
    tqErrorC("consumer:0x%" PRIx64 " get vg or topic error, topic:%s vgId:%d", tmq->consumerId,
             pPoll->topicName, pPoll->vgId);
    goto END;
  }

  pPoll->topicHandle = getTopicInfo(tmq, pPoll->topicName);
  if (pPoll->pEpset != NULL) {
    pVg->epSet = *pPoll->pEpset;
  }

  if (isDataRsp || isDataMetaRsp) {
    updateVgInfo(pVg, &pPoll->dataRsp.reqOffset, &pPoll->dataRsp.rspOffset, pPoll->head.walsver, pPoll->head.walever,
                 tmq->consumerId, pPoll->dataRsp.blockNum != 0);

    char offsetStr[TSDB_OFFSET_LEN] = {0};
    tFormatOffset(offsetStr, TSDB_OFFSET_LEN, &pPoll->rspOffset);

    if (pPoll->dataRsp.blockNum != 0) {
      // buildRsp clears the wrapper's payload, so the result object is used from here on
      pResult = buildRsp(pPoll);
      if (pResult == NULL) {
        tqErrorC("consumer:0x%" PRIx64 " failed to allocate memory for meta rsp", tmq->consumerId);
        goto END;
      }
      pResult->resType = isDataRsp ? RES_TYPE__TMQ : RES_TYPE__TMQ_METADATA;

      int64_t numOfRows = 0;
      tmqBuildRspFromWrapperInner(pPoll, pVg, &numOfRows, pResult);
      tmq->totalRows += numOfRows;
      pVg->emptyBlockReceiveTs = 0;

      if (tmq->replayEnable) {
        pVg->blockReceiveTs = taosGetTimestampMs();
        pVg->blockSleepForReplay = pResult->dataRsp.sleepTime;
        if (pVg->blockSleepForReplay > 0) {
          if (taosTmrStart(tmqReplayTask, pVg->blockSleepForReplay, (void*)(tmq->refId), tmqMgmt.timer) == NULL) {
            tqErrorC("consumer:0x%" PRIx64 " failed to start replay timer, vgId:%d, sleep:%" PRId64,
                     tmq->consumerId, pVg->vgId, pVg->blockSleepForReplay);
          }
        }
      }

      tqDebugC("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64
                   ", vg total:%" PRId64 ", total:%" PRId64 ",QID:0x%" PRIx64,
               tmq->consumerId, pVg->vgId, offsetStr, pResult->dataRsp.blockNum, numOfRows, pVg->numOfRows,
               tmq->totalRows, pPoll->reqId);
    } else {
      tqDebugC("consumer:0x%" PRIx64 " empty block received, vgId:%d, offset:%s, vg total:%" PRId64
                   ", total:%" PRId64 ",QID:0x%" PRIx64,
               tmq->consumerId, pVg->vgId, offsetStr, pVg->numOfRows, tmq->totalRows, pPoll->reqId);
      pVg->emptyBlockReceiveTs = taosGetTimestampMs();
    }
  } else if (isMetaRsp || isBatchMetaRsp) {
    updateVgInfo(pVg, &pPoll->rspOffset, &pPoll->rspOffset,
                 pPoll->head.walsver, pPoll->head.walever, tmq->consumerId, true);

    pResult = buildRsp(pPoll);
    if (pResult == NULL) {
      tqErrorC("consumer:0x%" PRIx64 " failed to allocate memory for meta rsp", tmq->consumerId);
      goto END;
    }
    pResult->resType = isMetaRsp ? RES_TYPE__TMQ_META : RES_TYPE__TMQ_BATCH_META;
  }

END:
  taosWUnLockLatch(&tmq->lock);
  return pResult;
}
2402

2403
// Drain queued responses until one of them produces a result object.
//
// Items are taken from tmq->qall; when that is empty it is refilled from tmq->mqueue. Each item is handled
// by processMqRspError (non-zero code) or processMqRsp, then released. Returns the first non-NULL result,
// or NULL once no more items are available. The timeout argument is not used here.
static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
  tqDebugC("consumer:0x%" PRIx64 " start to handle the rsp, total:%d", tmq->consumerId, taosQallItemSize(tmq->qall));

  void* pRes = NULL;
  do {
    SMqRspWrapper* pItem = NULL;
    if (taosGetQitem(tmq->qall, (void**)&pItem) == 0) {
      // local batch exhausted: refill it, and give up if there is still nothing to take
      if (taosReadAllQitems(tmq->mqueue, tmq->qall) == 0 || taosGetQitem(tmq->qall, (void**)&pItem) == 0) {
        return NULL;
      }
    }

    tqDebugC("consumer:0x%" PRIx64 " handle rsp, type:%d", tmq->consumerId, pItem->tmqRspType);
    if (pItem->code == 0) {
      pRes = processMqRsp(tmq, pItem);
    } else {
      processMqRspError(tmq, pItem);
    }

    tmqFreeRspWrapper(pItem);
    taosFreeQitem(pItem);
  } while (pRes == NULL);

  return pRes;
}
2433

2434
// Poll for the next result.
//
// Returns NULL when tmq is NULL, when the consumer is still in INIT status (after a 500 ms sleep), or when a
// non-negative timeout (milliseconds, as compared against taosGetTimestampMs) has elapsed without a result.
// Each round runs the delayed tasks, issues poll requests, and processes whatever responses have arrived;
// between rounds it waits on tmq->rspSem for the remaining time (or 1000 ms when timeout is negative).
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
  if (tmq == NULL) return NULL;

  const int64_t pollStart = taosGetTimestampMs();

  tqDebugC("consumer:0x%" PRIx64 " start to poll at %" PRId64 ", timeout:%" PRId64, tmq->consumerId, pollStart,
           timeout);

  // in no topic status, delayed task also need to be processed
  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) {
    tqInfoC("consumer:0x%" PRIx64 " poll return since consumer is init", tmq->consumerId);
    taosMsleep(500);  //     sleep for a while
    return NULL;
  }

  (void)atomic_val_compare_exchange_8(&tmq->pollFlag, 0, 1);

  for (;;) {
    tmqHandleAllDelayedTask(tmq);

    // a poll failure is only logged; the responses already queued are still processed below
    if (tmqPollImpl(tmq, timeout) < 0) {
      tqErrorC("consumer:0x%" PRIx64 " return due to poll error", tmq->consumerId);
    }

    void* pRes = tmqHandleAllRsp(tmq, timeout);
    if (pRes != NULL) {
      tqDebugC("consumer:0x%" PRIx64 " return rsp %p", tmq->consumerId, pRes);
      return (TAOS_RES*)pRes;
    }

    if (timeout < 0) {
      (void)tsem2_timewait(&tmq->rspSem, 1000);
      continue;
    }

    int64_t now = taosGetTimestampMs();
    int64_t elapsed = now - pollStart;
    if (elapsed > timeout || elapsed < 0) {
      tqDebugC("consumer:0x%" PRIx64 " (epoch %d) timeout, no rsp, start time %" PRId64 ", current time %" PRId64,
               tmq->consumerId, tmq->epoch, pollStart, now);
      return NULL;
    }
    (void)tsem2_timewait(&tmq->rspSem, (timeout - elapsed));
  }
}
2479

2480
// Log a summary of what the consumer has read: overall counters first, then the row count of every vgroup
// of every subscribed topic. The topic list is walked under the read lock of pTmq->lock.
static void displayConsumeStatistics(tmq_t* pTmq) {
  taosRLockLatch(&pTmq->lock);

  int32_t topicCnt = taosArrayGetSize(pTmq->clientTopics);
  tqInfoC("consumer:0x%" PRIx64 " closing poll:%" PRId64 " rows:%" PRId64 " topics:%d, final epoch:%d",
           pTmq->consumerId, pTmq->pollCnt, pTmq->totalRows, topicCnt, pTmq->epoch);

  tqInfoC("consumer:0x%" PRIx64 " rows dist begin: ", pTmq->consumerId);
  for (int32_t t = 0; t < topicCnt; ++t) {
    SMqClientTopic* pTopic = taosArrayGet(pTmq->clientTopics, t);
    if (pTopic == NULL) {
      continue;
    }
    tqInfoC("consumer:0x%" PRIx64 " topic:%d", pTmq->consumerId, t);

    int32_t vgCnt = taosArrayGetSize(pTopic->vgs);
    for (int32_t v = 0; v < vgCnt; ++v) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, v);
      if (pVg == NULL) {
        continue;
      }
      tqInfoC("topic:%s, %d. vgId:%d rows:%" PRId64, pTopic->topicName, v, pVg->vgId, pVg->numOfRows);
    }
  }

  taosRUnLockLatch(&pTmq->lock);
  tqInfoC("consumer:0x%" PRIx64 " rows dist end", pTmq->consumerId);
}
764✔
2501

2502
// Drop all subscriptions of the consumer.
//
// Logs the consume statistics, then, only if the consumer is in READY status: commits synchronously when
// auto-commit is on, sends a heartbeat, and subscribes to an empty topic list.
// Returns TSDB_CODE_INVALID_PARA for a NULL handle, 0 when the consumer is not READY, otherwise the code of
// the first failing step (or 0).
int32_t tmq_unsubscribe(tmq_t* tmq) {
  if (tmq == NULL) return TSDB_CODE_INVALID_PARA;

  int8_t status = atomic_load_8(&tmq->status);
  tqInfoC("consumer:0x%" PRIx64 " start to unsubscribe consumer, status:%d", tmq->consumerId, status);

  displayConsumeStatistics(tmq);
  if (status != TMQ_CONSUMER_STATUS__READY) {
    tqInfoC("consumer:0x%" PRIx64 " status:%d, already closed or not in ready state, no need unsubscribe", tmq->consumerId, status);
    return 0;
  }

  if (tmq->autoCommit) {
    int32_t commitCode = tmq_commit_sync(tmq, NULL);
    if (commitCode != 0) {
      return commitCode;
    }
  }
  tmqSendHbReq((void*)(tmq->refId), NULL);

  // subscribing to an empty list is how the subscriptions are removed
  tmq_list_t* pEmpty = tmq_list_new();
  if (pEmpty == NULL) {
    return terrno;
  }
  int32_t code = tmq_subscribe(tmq, pEmpty);
  tmq_list_destroy(pEmpty);
  return code;
}
2535

2536
// Close the consumer: unsubscribe first and, only if that succeeds, mark the consumer CLOSED and remove its
// reference from tmqMgmt.rsetId.
// Returns TSDB_CODE_INVALID_PARA for a NULL handle, the unsubscribe error if any, otherwise the result of
// taosRemoveRef.
int32_t tmq_consumer_close(tmq_t* tmq) {
  if (tmq == NULL) return TSDB_CODE_INVALID_PARA;
  tqInfoC("consumer:0x%" PRIx64 " start to close consumer, status:%d", tmq->consumerId, tmq->status);

  int32_t code = tmq_unsubscribe(tmq);
  if (code != 0) {
    return code;
  }

  atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__CLOSED);
  code = taosRemoveRef(tmqMgmt.rsetId, tmq->refId);
  if (code != 0){
    tqErrorC("tmq close failed to remove ref:%" PRId64 ", code:%d", tmq->refId, code);
  }
  return code;
}
2549

2550
const char* tmq_err2str(int32_t err) {
238✔
2551
  if (err == 0) {
238✔
2552
    return "success";
221✔
2553
  } else if (err == -1) {
17!
2554
    return "fail";
×
2555
  } else {
2556
    if (*(taosGetErrMsg()) == 0) {
17✔
2557
      return tstrerror(err);
4✔
2558
    } else {
2559
      (void)snprintf(taosGetErrMsgReturn(), ERR_MSG_LEN, "%s,detail:%s", tstrerror(err), taosGetErrMsg());
13✔
2560
      return (const char*)taosGetErrMsgReturn();
13✔
2561
    }
2562
  }
2563
}
2564

2565
// Classify a result handle. Both single and batch meta results are reported as TMQ_RES_TABLE_META;
// a NULL or unrecognised handle yields TMQ_RES_INVALID.
tmq_res_t tmq_get_res_type(TAOS_RES* res) {
  if (res == NULL) {
    return TMQ_RES_INVALID;
  }

  if (TD_RES_TMQ(res)) {
    return TMQ_RES_DATA;
  }
  if (TD_RES_TMQ_META(res)) {
    return TMQ_RES_TABLE_META;
  }
  if (TD_RES_TMQ_METADATA(res)) {
    return TMQ_RES_METADATA;
  }
  if (TD_RES_TMQ_BATCH_META(res)) {
    return TMQ_RES_TABLE_META;
  }
  return TMQ_RES_INVALID;
}
2581

2582
// Return the topic name of a TMQ result: the text that follows the first '.' of the stored topic string.
// Returns NULL for a NULL handle, a handle that is not one of the four TMQ result kinds, or a stored
// string without a '.'.
const char* tmq_get_topic_name(TAOS_RES* res) {
  if (res == NULL) {
    return NULL;
  }

  const bool isTmqRes = TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) ||
                        TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res);
  if (!isTmqRes) {
    return NULL;
  }

  char* pDot = strchr(((SMqRspObj*)res)->topic, '.');
  return (pDot == NULL) ? NULL : pDot + 1;
}
2597

2598
// Return the database name of a TMQ result: the text that follows the first '.' of the stored db string.
// Returns NULL for a NULL handle, a handle that is not one of the four TMQ result kinds, or a stored
// string without a '.'.
const char* tmq_get_db_name(TAOS_RES* res) {
  if (res == NULL) {
    return NULL;
  }

  const bool isTmqRes = TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) ||
                        TD_RES_TMQ_BATCH_META(res) || TD_RES_TMQ_META(res);
  if (!isTmqRes) {
    return NULL;
  }

  char* pDot = strchr(((SMqRspObj*)res)->db, '.');
  return (pDot == NULL) ? NULL : pDot + 1;
}
2614

2615
// Return the vgroup id stored in a TMQ result, or TSDB_CODE_INVALID_PARA when the handle is NULL or is not
// one of the four TMQ result kinds.
int32_t tmq_get_vgroup_id(TAOS_RES* res) {
  if (res == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  const bool isTmqRes = TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) ||
                        TD_RES_TMQ_BATCH_META(res) || TD_RES_TMQ_META(res);
  if (!isTmqRes) {
    return TSDB_CODE_INVALID_PARA;
  }
  return ((SMqRspObj*)res)->vgId;
}
2626

2627
// Return the WAL version associated with a TMQ result.
//
// Data and data+meta results use the request offset; meta and batch-meta results use the response offset.
// The version is returned only when that offset is of type TMQ_OFFSET__LOG. A NULL handle yields
// TSDB_CODE_INVALID_PARA; every other case yields TSDB_CODE_TMQ_SNAPSHOT_ERROR.
int64_t tmq_get_vgroup_offset(TAOS_RES* res) {
  if (res == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  SMqRspObj* pObj = (SMqRspObj*)res;
  if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) {
    STqOffsetVal* pReqOffset = &pObj->dataRsp.reqOffset;
    if (pReqOffset->type == TMQ_OFFSET__LOG) {
      return pReqOffset->version;
    }
    tqErrorC("invalid offset type:%d", pReqOffset->type);
  } else if (TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
    if (pObj->rspOffset.type == TMQ_OFFSET__LOG) {
      return pObj->rspOffset.version;
    }
  } else {
    tqErrorC("invalid tmq type:%d", *(int8_t*)res);
  }

  // data from tsdb, no valid offset info
  return TSDB_CODE_TMQ_SNAPSHOT_ERROR;
}
2651

2652
// Return the table name of the block the result iterator currently points at.
// Returns NULL when the handle is NULL or not a data / data+meta result, when the response carries no
// table names, or when the iterator is outside [0, blockNum).
const char* tmq_get_table_name(TAOS_RES* res) {
  if (res == NULL) {
    return NULL;
  }
  if (!(TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res))) {
    return NULL;
  }

  SMqRspObj*  pObj = (SMqRspObj*)res;
  SMqDataRsp* pData = &pObj->dataRsp;

  const bool hasName = pData->withTbName && pData->blockTbName != NULL && pObj->resIter >= 0 &&
                       pObj->resIter < pData->blockNum;
  if (!hasName) {
    return NULL;
  }
  return (const char*)taosArrayGetP(pData->blockTbName, pObj->resIter);
}
2667

2668
// Start an asynchronous commit. With pRes == NULL all offsets are committed, otherwise only the offset of
// that result. A NULL handle is reported through cb (if given) with TSDB_CODE_INVALID_PARA.
void tmq_commit_async(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* cb, void* param) {
  if (tmq == NULL) {
    tqErrorC("invalid tmq handle, null");
    if (cb != NULL) {
      cb(tmq, TSDB_CODE_INVALID_PARA, param);
    }
    return;
  }

  if (pRes != NULL) {
    // only commit one offset
    asyncCommitFromResult(tmq, pRes, cb, param);
  } else {
    // here needs to commit all offsets.
    asyncCommitAllOffsets(tmq, cb, param);
  }
}
2682

2683
// Commit callback used by the synchronous commit paths: stores the result code in the SSyncCommitInfo
// passed as param and posts its semaphore.
static void commitCallBackFn(tmq_t* UNUSED_PARAM(tmq), int32_t code, void* param) {
  SSyncCommitInfo* pSync = (SSyncCommitInfo*)param;

  pSync->code = code;
  if (tsem2_post(&pSync->sem) != 0){
    tqErrorC("failed to post rsp sem in commit cb");
  }
}
664✔
2690

2691
// Commit offsets and wait for the outcome.
//
// With pRes == NULL all offsets are committed, otherwise only the offset of that result. The asynchronous
// commit is started with commitCallBackFn and this function blocks on the semaphore that callback posts.
// Returns TSDB_CODE_INVALID_PARA for a NULL handle, terrno on allocation failure, the tsem2_init error if
// the semaphore cannot be created, otherwise the code delivered to the callback.
int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* pRes) {
  if (tmq == NULL) {
    tqErrorC("invalid tmq handle, null");
    return TSDB_CODE_INVALID_PARA;
  }

  SSyncCommitInfo* pSync = taosMemoryMalloc(sizeof(SSyncCommitInfo));
  if (pSync == NULL) {
    tqErrorC("failed to allocate memory for sync commit");
    return terrno;
  }

  int32_t code = tsem2_init(&pSync->sem, 0, 0);
  if (code != 0) {
    tqErrorC("failed to init sem for sync commit");
    taosMemoryFree(pSync);
    return code;
  }
  pSync->code = 0;

  if (pRes != NULL) {
    asyncCommitFromResult(tmq, pRes, commitCallBackFn, pSync);
  } else {
    asyncCommitAllOffsets(tmq, commitCallBackFn, pSync);
  }

  // block until commitCallBackFn reports the result
  if (tsem2_wait(&pSync->sem) != 0){
    tqErrorC("failed to wait sem for sync commit");
  }
  code = pSync->code;

  if (tsem2_destroy(&pSync->sem) != 0) {
    tqErrorC("failed to destroy sem for sync commit");
  }
  taosMemoryFree(pSync);

  tqInfoC("consumer:0x%" PRIx64 " sync res commit done, code:%s", tmq->consumerId, tstrerror(code));
  return code;
}
2732

2733
// wal range will be ok after calling tmq_get_topic_assignment or poll interface
2734
static int32_t checkWalRange(SVgOffsetInfo* offset, int64_t value) {
25✔
2735
  if (offset->walVerBegin == -1 || offset->walVerEnd == -1) {
25!
2736
    tqErrorC("Assignment or poll interface need to be called first");
×
2737
    return TSDB_CODE_TMQ_NEED_INITIALIZED;
×
2738
  }
2739

2740
  if (value != -1 && (value < offset->walVerBegin || value > offset->walVerEnd)) {
25!
2741
    tqErrorC("invalid seek params, offset:%" PRId64 ", valid range:[%" PRId64 ", %" PRId64 "]", value,
1!
2742
             offset->walVerBegin, offset->walVerEnd);
2743
    return TSDB_CODE_TMQ_VERSION_OUT_OF_RANGE;
1✔
2744
  }
2745

2746
  return 0;
24✔
2747
}
2748

2749
// Commit a specific WAL offset for one vgroup of a topic and wait for the outcome.
//
// The topic name is prefixed with the account id ("<acctId>.<topic>") to look up the vgroup, and the offset
// is validated with checkWalRange under the write lock. The commit itself is sent asynchronously with
// commitCallBackFn; the function waits on the callback's semaphore only when the send succeeded.
// TSDB_CODE_TMQ_SAME_COMMITTED_VALUE is reported as success.
int32_t tmq_commit_offset_sync(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_t offset) {
  if (tmq == NULL || pTopicName == NULL) {
    tqErrorC("invalid tmq handle, null");
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t acct = tmq->pTscObj->acctId;
  char    tname[TSDB_TOPIC_FNAME_LEN] = {0};
  (void)snprintf(tname, TSDB_TOPIC_FNAME_LEN, "%d.%s", acct, pTopicName);

  // look up the vgroup and validate the offset while holding the lock
  SMqClientVg* pVg = NULL;
  taosWLockLatch(&tmq->lock);
  int32_t code = getClientVg(tmq, tname, vgId, &pVg);
  if (code == 0) {
    code = checkWalRange(&pVg->offsetInfo, offset);
  }
  taosWUnLockLatch(&tmq->lock);
  if (code != 0) {
    return code;
  }

  SSyncCommitInfo* pSync = taosMemoryMalloc(sizeof(SSyncCommitInfo));
  if (pSync == NULL) {
    tqErrorC("consumer:0x%" PRIx64 " failed to prepare seek operation", tmq->consumerId);
    return terrno;
  }

  code = tsem2_init(&pSync->sem, 0, 0);
  if (code != 0) {
    taosMemoryFree(pSync);
    return code;
  }
  pSync->code = 0;

  STqOffsetVal offsetVal = {.type = TMQ_OFFSET__LOG, .version = offset};
  code = asyncCommitOffset(tmq, tname, vgId, &offsetVal, commitCallBackFn, pSync);
  if (code == 0) {
    // the callback only fires when the request was actually sent
    if (tsem2_wait(&pSync->sem) != 0){
      tqErrorC("failed to wait sem for sync commit offset");
    }
    code = pSync->code;
  }

  if (code == TSDB_CODE_TMQ_SAME_COMMITTED_VALUE) {
    code = TSDB_CODE_SUCCESS;
  }
  if (tsem2_destroy(&pSync->sem) != 0) {
    tqErrorC("failed to destroy sem for sync commit offset");
  }
  taosMemoryFree(pSync);

  tqInfoC("consumer:0x%" PRIx64 " sync send commit to vgId:%d, offset:%" PRId64 " code:%s", tmq->consumerId, vgId,
          offset, tstrerror(code));

  return code;
}
2809

2810
// Commit a specific WAL offset for one vgroup of a topic without waiting.
//
// Argument checking, vgroup lookup and offset validation mirror the synchronous variant. When any step up
// to and including asyncCommitOffset returns a non-zero code, cb (if given) is invoked directly with that
// code, with TSDB_CODE_TMQ_SAME_COMMITTED_VALUE mapped to success.
void tmq_commit_offset_async(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb* cb,
                             void* param) {
  int32_t code = 0;

  if (tmq == NULL || pTopicName == NULL) {
    tqErrorC("invalid tmq handle, null");
    code = TSDB_CODE_INVALID_PARA;
  } else {
    int32_t acct = tmq->pTscObj->acctId;
    char    tname[TSDB_TOPIC_FNAME_LEN] = {0};
    (void)snprintf(tname, TSDB_TOPIC_FNAME_LEN, "%d.%s", acct, pTopicName);

    // look up the vgroup and validate the offset while holding the lock
    SMqClientVg* pVg = NULL;
    taosWLockLatch(&tmq->lock);
    code = getClientVg(tmq, tname, vgId, &pVg);
    if (code == 0) {
      code = checkWalRange(&pVg->offsetInfo, offset);
    }
    taosWUnLockLatch(&tmq->lock);

    if (code == 0) {
      STqOffsetVal offsetVal = {.type = TMQ_OFFSET__LOG, .version = offset};

      code = asyncCommitOffset(tmq, tname, vgId, &offsetVal, cb, param);

      tqInfoC("consumer:0x%" PRIx64 " async send commit to vgId:%d, offset:%" PRId64 " code:%s", tmq->consumerId, vgId,
              offset, tstrerror(code));
    }
  }

  if (code != 0 && cb != NULL) {
    if (code == TSDB_CODE_TMQ_SAME_COMMITTED_VALUE) {
      code = TSDB_CODE_SUCCESS;
    }
    cb(tmq, code, param);
  }
}
23✔
2852

2853

2854
// Advance the result iterator to the next data block and prepare its result info.
//
// When the response carries schemas, the previous result info is released and the schema of the new block
// is installed. The raw data, row count and precision of the block are then loaded into resInfo and the
// column pointers are set up via setResultDataPtr. On success *pResInfo points at the object's resInfo.
// Returns TSDB_CODE_TSC_INTERNAL_ERROR once the iterator has moved past the last block, or the error of the
// failing helper.
int32_t tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4, SReqResultInfo** pResInfo) {
  SMqRspObj*  pObj = (SMqRspObj*)res;
  SMqDataRsp* pData = &pObj->dataRsp;

  pObj->resIter++;
  if (pObj->resIter >= pData->blockNum) {
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  if (pData->withSchema) {
    doFreeReqResultInfo(&pObj->resInfo);
    SSchemaWrapper* pSchema = (SSchemaWrapper*)taosArrayGetP(pData->blockSchema, pObj->resIter);
    if (pSchema) {
      TAOS_CHECK_RETURN(setResSchemaInfo(&pObj->resInfo, pSchema->pSchema, pSchema->nCols));
    }
  }

  void*   pBlock = taosArrayGetP(pData->blockData, pObj->resIter);
  void*   pRaw = NULL;
  int64_t rowCnt = 0;
  int32_t prec = 0;
  tmqGetRawDataRowsPrecisionFromRes(pBlock, &pRaw, &rowCnt, &prec);

  pObj->resInfo.pData = pRaw;
  pObj->resInfo.numOfRows = rowCnt;
  pObj->resInfo.current = 0;
  pObj->resInfo.precision = prec;
  pObj->resInfo.totalRows += pObj->resInfo.numOfRows;

  int32_t code = setResultDataPtr(&pObj->resInfo, convertUcs4);
  if (code == 0) {
    *pResInfo = &pObj->resInfo;
  }
  return code;
}
2890

2891
// Response callback for a per-vgroup WAL-info request.
//
// On success the response is decoded and a tmq_topic_assignment (begin = walsver, end = walever + 1,
// currentOffset = decoded rspOffset.version, vgId) is appended to pCommon->pList under pCommon->mutex.
// A failure (transport, decode or push) is recorded in pCommon->code. When this is the last of
// pParam->totalReq responses, pCommon->rsp is posted to wake the waiter.
//
// Changes from the previous version:
//  - The response counter is bumped only after this callback has finished touching pCommon. Previously it
//    was bumped on entry, so the callback that reached totalReq could post the semaphore while an earlier
//    callback was still writing to pCommon.
//  - pCommon->code is written only on failure, so a later successful response no longer erases an earlier
//    error. NOTE(review): this relies on pCommon->code starting at 0 - confirm the allocation site
//    zero-initialises SMqVgCommon.
//  - The message buffers are released even when param is NULL.
//
// Returns the (possibly updated) code.
static int32_t tmqGetWalInfoCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqVgWalInfoParam* pParam = param;
  if (pParam == NULL) {
    // nobody to report to, but the buffers are still released here
    if (pMsg) {
      taosMemoryFree(pMsg->pData);
      taosMemoryFree(pMsg->pEpSet);
    }
    return code;
  }
  SMqVgCommon* pCommon = pParam->pCommon;

  if (code != TSDB_CODE_SUCCESS) {
    tqErrorC("consumer:0x%" PRIx64 " failed to get the wal info from vgId:%d for topic:%s", pCommon->consumerId,
             pParam->vgId, pCommon->pTopicName);
  } else {
    SMqDataRsp rsp = {0};
    SDecoder   decoder = {0};
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    code = tDecodeMqDataRsp(&decoder, &rsp);
    tDecoderClear(&decoder);

    if (code == 0) {
      SMqRspHead*          pHead = pMsg->pData;
      tmq_topic_assignment assignment = {.begin = pHead->walsver,
                                         .end = pHead->walever + 1,
                                         .currentOffset = rsp.rspOffset.version,
                                         .vgId = pParam->vgId};

      (void)taosThreadMutexLock(&pCommon->mutex);
      if (taosArrayPush(pCommon->pList, &assignment) == NULL) {
        tqErrorC("consumer:0x%" PRIx64 " failed to push the wal info from vgId:%d for topic:%s", pCommon->consumerId,
                 pParam->vgId, pCommon->pTopicName);
        code = TSDB_CODE_TSC_INTERNAL_ERROR;
      }
      (void)taosThreadMutexUnlock(&pCommon->mutex);
    }
  }

  if (code != 0) {
    // keep the failure sticky; callbacks for other vgroups may run concurrently
    (void)taosThreadMutexLock(&pCommon->mutex);
    pCommon->code = code;
    (void)taosThreadMutexUnlock(&pCommon->mutex);
  }

  if (pMsg) {
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
  }

  // Count this response last: pCommon must not be touched after the final post, since the waiter is
  // then free to proceed.
  int32_t total = atomic_add_fetch_32(&pCommon->numOfRsp, 1);
  if (total == pParam->totalReq) {
    if (tsem2_post(&pCommon->rsp) != 0) {
      tqErrorC("failed to post semaphore in get wal cb");
    }
  }

  return code;
}
2943

2944
// Release everything owned by an SMqVgCommon: the assignment list, the response semaphore, the mutex,
// the topic name and the struct itself. A NULL pointer is ignored.
static void destroyCommonInfo(SMqVgCommon* pCommon) {
  if (pCommon != NULL) {
    taosArrayDestroy(pCommon->pList);
    if (tsem2_destroy(&pCommon->rsp) != 0) {
      tqErrorC("failed to destroy semaphore for topic:%s", pCommon->pTopicName);
    }
    (void)taosThreadMutexDestroy(&pCommon->mutex);
    taosMemoryFree(pCommon->pTopicName);
    taosMemoryFree(pCommon);
  }
}
2956

2957
static bool isInSnapshotMode(int8_t type, bool useSnapshot) {
61✔
2958
  if ((type < TMQ_OFFSET__LOG && useSnapshot) || type > TMQ_OFFSET__LOG) {
61!
2959
    return true;
×
2960
  }
2961
  return false;
61✔
2962
}
2963

2964
// Response callback for TDMT_VND_TMQ_VG_COMMITTEDINFO requests.
//
// param: the SMqCommittedParam installed by getCommittedFromServer.
// pMsg:  response buffer; pData and pEpSet are released here when present.
// code:  transport/server status for the request.
//
// On success with a payload, the reply is decoded into pParam->vgOffset.
// The final status is stored in pParam->code and the waiter is woken through
// pParam->sem. Returns the final status.
static int32_t tmCommittedCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqCommittedParam* pParam = param;

  if (code == 0 && pMsg != NULL) {
    SDecoder decoder = {0};
    tDecoderInit(&decoder, (uint8_t*)pMsg->pData, pMsg->len);
    int32_t err = tDecodeMqVgOffset(&decoder, &pParam->vgOffset);
    if (err < 0) {
      // drop whatever was partially decoded and report the decode error
      tOffsetDestroy(&pParam->vgOffset.offset);
      code = err;
    }
    // clear the decoder on both outcomes; the failure path used to skip this
    tDecoderClear(&decoder);
  }

  if (pMsg != NULL) {
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
  }

  pParam->code = code;
  if (tsem2_post(&pParam->sem) != 0) {
    tqErrorC("failed to post semaphore in tmCommittedCb");
  }
  return code;
}
2993

2994
// Ask the vnode identified by vgId/epSet for the committed offset of this
// consumer group on topic tname, blocking until tmCommittedCb signals
// completion.
//
// Returns the committed WAL version (>= 0 expected) when the server reports an
// offset of type TMQ_OFFSET__LOG, TSDB_CODE_TMQ_SNAPSHOT_ERROR when the offset
// is of any other type, or the error code of the failing step otherwise.
//
// The result is carried in a 64-bit variable all the way to the return
// statement so that WAL versions above INT32_MAX are not truncated.
int64_t getCommittedFromServer(tmq_t* tmq, char* tname, int32_t vgId, SEpSet* epSet) {
  int32_t     code = 0;
  SMqVgOffset pOffset = {0};

  pOffset.consumerId = tmq->consumerId;
  (void)snprintf(pOffset.offset.subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", tmq->groupId, TMQ_SEPARATOR, tname);

  // first pass: compute the encoded size of the request body
  int32_t len = 0;
  tEncodeSize(tEncodeMqVgOffset, &pOffset, len, code);
  if (code < 0) {
    return TSDB_CODE_INVALID_PARA;
  }

  void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
  if (buf == NULL) {
    return terrno;
  }

  ((SMsgHead*)buf)->vgId = htonl(vgId);

  // the encoded body is placed right behind the message head
  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));

  SEncoder encoder = {0};
  tEncoderInit(&encoder, abuf, len);
  code = tEncodeMqVgOffset(&encoder, &pOffset);
  tEncoderClear(&encoder);
  if (code < 0) {
    taosMemoryFree(buf);
    return code;
  }

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    taosMemoryFree(buf);
    return terrno;
  }

  // zero-initialised so vgOffset has a defined value even if the callback
  // reports success without delivering a payload
  SMqCommittedParam* pParam = taosMemoryCalloc(1, sizeof(SMqCommittedParam));
  if (pParam == NULL) {
    taosMemoryFree(buf);
    taosMemoryFree(sendInfo);
    return terrno;
  }
  if (tsem2_init(&pParam->sem, 0, 0) != 0) {
    taosMemoryFree(buf);
    taosMemoryFree(sendInfo);
    taosMemoryFree(pParam);
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  sendInfo->msgInfo = (SDataBuf){.pData = buf, .len = sizeof(SMsgHead) + len, .handle = NULL};
  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = pParam;
  sendInfo->fp = tmCommittedCb;
  sendInfo->msgType = TDMT_VND_TMQ_VG_COMMITTEDINFO;

  code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, NULL, sendInfo);
  if (code != 0) {
    // NOTE(review): buf/sendInfo are not released here; presumably
    // asyncSendMsgToServer disposes of sendInfo on failure - confirm.
    if (tsem2_destroy(&pParam->sem) != 0) {
      tqErrorC("failed to destroy semaphore in get committed from server1");
    }
    taosMemoryFree(pParam);
    return code;
  }

  if (tsem2_wait(&pParam->sem) != 0) {
    tqErrorC("failed to wait semaphore in get committed from server");
  }

  int64_t committed = pParam->code;
  if (pParam->code == TSDB_CODE_SUCCESS) {
    if (pParam->vgOffset.offset.val.type == TMQ_OFFSET__LOG) {
      committed = pParam->vgOffset.offset.val.version;
    } else {
      tOffsetDestroy(&pParam->vgOffset.offset);
      committed = TSDB_CODE_TMQ_SNAPSHOT_ERROR;
    }
  }

  if (tsem2_destroy(&pParam->sem) != 0) {
    tqErrorC("failed to destroy semaphore in get committed from server2");
  }
  taosMemoryFree(pParam);

  return committed;
}
3080

3081
// Report the current consume position of this consumer for (pTopicName, vgId).
//
// Returns:
//   - the local end offset version when the vgroup offset is of type
//     TMQ_OFFSET__LOG;
//   - for TMQ_OFFSET__RESET_EARLIEST / TMQ_OFFSET__RESET_LATEST, the value
//     returned by getCommittedFromServer, or the WAL begin / end version when
//     that call yields TSDB_CODE_TMQ_NO_COMMITTED;
//   - TSDB_CODE_INVALID_PARA for NULL arguments, TSDB_CODE_TMQ_SNAPSHOT_ERROR
//     in snapshot mode, or the error from getClientVg / checkWalRange.
//
// Everything needed from the vgroup entry is copied while tmq->lock is held;
// nothing reachable through pVg is touched after the latch is released.
int64_t tmq_position(tmq_t* tmq, const char* pTopicName, int32_t vgId) {
  if (tmq == NULL || pTopicName == NULL) {
    tqErrorC("invalid tmq handle, null");
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t accId = tmq->pTscObj->acctId;
  char    tname[TSDB_TOPIC_FNAME_LEN] = {0};
  (void)snprintf(tname, TSDB_TOPIC_FNAME_LEN, "%d.%s", accId, pTopicName);

  taosWLockLatch(&tmq->lock);

  SMqClientVg* pVg = NULL;
  int32_t      code = getClientVg(tmq, tname, vgId, &pVg);
  if (code != 0) {
    taosWUnLockLatch(&tmq->lock);
    return code;
  }

  SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo;
  int32_t        type = pOffsetInfo->endOffset.type;
  if (isInSnapshotMode(type, tmq->useSnapshot)) {
    tqErrorC("consumer:0x%" PRIx64 " offset type:%d not wal version, position error", tmq->consumerId, type);
    taosWUnLockLatch(&tmq->lock);
    return TSDB_CODE_TMQ_SNAPSHOT_ERROR;
  }

  code = checkWalRange(pOffsetInfo, -1);
  if (code != 0) {
    taosWUnLockLatch(&tmq->lock);
    return code;
  }

  // snapshot all fields used below before releasing the latch
  SEpSet  epSet = pVg->epSet;
  int64_t begin = pVg->offsetInfo.walVerBegin;
  int64_t end = pVg->offsetInfo.walVerEnd;
  int64_t logVersion = (type == TMQ_OFFSET__LOG) ? pOffsetInfo->endOffset.version : 0;
  taosWUnLockLatch(&tmq->lock);

  int64_t position = 0;
  if (type == TMQ_OFFSET__LOG) {
    position = logVersion;
  } else if (type == TMQ_OFFSET__RESET_EARLIEST || type == TMQ_OFFSET__RESET_LATEST) {
    // keep the server answer in 64 bits: it is either a WAL version or an error code
    int64_t committed = getCommittedFromServer(tmq, tname, vgId, &epSet);
    if (committed == TSDB_CODE_TMQ_NO_COMMITTED) {
      // nothing committed yet: fall back to the WAL boundary selected by the reset policy
      position = (type == TMQ_OFFSET__RESET_EARLIEST) ? begin : end;
    } else {
      position = committed;
    }
  } else {
    tqErrorC("consumer:0x%" PRIx64 " offset type:%d can not be reach here", tmq->consumerId, type);
  }

  tqInfoC("consumer:0x%" PRIx64 " tmq_position vgId:%d position:%" PRId64, tmq->consumerId, vgId, position);
  return position;
}
3139

3140
// Report the committed offset of this consumer for (pTopicName, vgId).
//
// When the locally cached committed offset is of type TMQ_OFFSET__LOG its
// version is returned directly; otherwise the value is obtained through
// getCommittedFromServer. Error codes are returned for NULL arguments, for an
// unknown topic/vgroup (getClientVg) and when either the end offset or the
// committed offset is in snapshot mode.
int64_t tmq_committed(tmq_t* tmq, const char* pTopicName, int32_t vgId) {
  if (tmq == NULL || pTopicName == NULL) {
    tqErrorC("invalid tmq handle, null");
    return TSDB_CODE_INVALID_PARA;
  }

  // fully qualified topic name: "<acctId>.<topic>"
  char    tname[TSDB_TOPIC_FNAME_LEN] = {0};
  int32_t accId = tmq->pTscObj->acctId;
  (void)snprintf(tname, TSDB_TOPIC_FNAME_LEN, "%d.%s", accId, pTopicName);

  taosWLockLatch(&tmq->lock);

  SMqClientVg* pVg = NULL;
  int32_t      code = getClientVg(tmq, tname, vgId, &pVg);
  if (code != 0) {
    taosWUnLockLatch(&tmq->lock);
    return code;
  }

  SVgOffsetInfo* pInfo = &pVg->offsetInfo;

  if (isInSnapshotMode(pInfo->endOffset.type, tmq->useSnapshot)) {
    tqErrorC("consumer:0x%" PRIx64 " offset type:%d not wal version, committed error", tmq->consumerId,
             pInfo->endOffset.type);
    taosWUnLockLatch(&tmq->lock);
    return TSDB_CODE_TMQ_SNAPSHOT_ERROR;
  }

  if (isInSnapshotMode(pInfo->committedOffset.type, tmq->useSnapshot)) {
    tqErrorC("consumer:0x%" PRIx64 " offset type:%d not wal version, committed error", tmq->consumerId,
             pInfo->committedOffset.type);
    taosWUnLockLatch(&tmq->lock);
    return TSDB_CODE_TMQ_SNAPSHOT_ERROR;
  }

  int64_t committed = 0;
  if (pInfo->committedOffset.type == TMQ_OFFSET__LOG) {
    // answer is available locally
    committed = pInfo->committedOffset.version;
    taosWUnLockLatch(&tmq->lock);
  } else {
    // copy the endpoint set under the latch, then query the server unlocked
    SEpSet epSet = pVg->epSet;
    taosWUnLockLatch(&tmq->lock);
    committed = getCommittedFromServer(tmq, tname, vgId, &epSet);
  }

  tqInfoC("consumer:0x%" PRIx64 " tmq_committed vgId:%d committed:%" PRId64, tmq->consumerId, vgId, committed);
  return committed;
}
3189

3190
// Return the per-vgroup assignment (vgId, WAL begin/end, current offset) of
// topic pTopicName for this consumer.
//
// On success *assignment points to an array of *numOfAssignment entries
// allocated with taosMemoryCalloc; the caller releases it with
// tmq_free_assignment. On failure *assignment is NULL, *numOfAssignment is 0
// and an error code is returned.
//
// If every vgroup already has a begin offset of type TMQ_OFFSET__LOG the
// result is built from local state. Otherwise a TDMT_VND_TMQ_VG_WALINFO
// request is sent to each vgroup and the function blocks until tmqGetWalInfoCb
// posts pCommon->rsp; the returned WAL ranges are also written back into the
// local vgroup entries. tmq->lock is held (write) for the whole call.
int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_assignment** assignment,
                                 int32_t* numOfAssignment) {
  if (tmq == NULL || pTopicName == NULL || assignment == NULL || numOfAssignment == NULL) {
    tqErrorC("invalid tmq handle, null");
    return TSDB_CODE_INVALID_PARA;
  }
  *numOfAssignment = 0;
  *assignment = NULL;
  SMqVgCommon* pCommon = NULL;

  int32_t accId = tmq->pTscObj->acctId;
  char    tname[TSDB_TOPIC_FNAME_LEN] = {0};
  (void)snprintf(tname, TSDB_TOPIC_FNAME_LEN, "%d.%s", accId, pTopicName);

  taosWLockLatch(&tmq->lock);

  SMqClientTopic* pTopic = NULL;
  int32_t         code = getTopicByName(tmq, tname, &pTopic);
  if (code != 0) {
    tqErrorC("consumer:0x%" PRIx64 " invalid topic name:%s", tmq->consumerId, pTopicName);
    goto end;
  }

  // in case of snapshot is opened, no valid offset will return
  *numOfAssignment = taosArrayGetSize(pTopic->vgs);
  for (int32_t j = 0; j < (*numOfAssignment); ++j) {
    SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j);
    if (pClientVg == NULL) {
      continue;
    }
    int32_t type = pClientVg->offsetInfo.beginOffset.type;
    if (isInSnapshotMode(type, tmq->useSnapshot)) {
      tqErrorC("consumer:0x%" PRIx64 " offset type:%d not wal version, assignment not allowed", tmq->consumerId, type);
      code = TSDB_CODE_TMQ_SNAPSHOT_ERROR;
      goto end;
    }
  }

  *assignment = taosMemoryCalloc(*numOfAssignment, sizeof(tmq_topic_assignment));
  if (*assignment == NULL) {
    tqErrorC("consumer:0x%" PRIx64 " failed to malloc buffer, size:%" PRIzu, tmq->consumerId,
             (*numOfAssignment) * sizeof(tmq_topic_assignment));
    code = terrno;
    goto end;
  }

  // Fill the result from local state; stop as soon as one vgroup has no WAL
  // (LOG) begin offset, because then every vgroup is queried remotely below.
  bool needFetch = false;

  for (int32_t j = 0; j < (*numOfAssignment); ++j) {
    SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j);
    if (pClientVg == NULL) {
      continue;
    }
    if (pClientVg->offsetInfo.beginOffset.type != TMQ_OFFSET__LOG) {
      needFetch = true;
      break;
    }

    tmq_topic_assignment* pAssignment = &(*assignment)[j];
    pAssignment->currentOffset = pClientVg->offsetInfo.beginOffset.version;
    pAssignment->begin = pClientVg->offsetInfo.walVerBegin;
    pAssignment->end = pClientVg->offsetInfo.walVerEnd;
    pAssignment->vgId = pClientVg->vgId;
    tqInfoC("consumer:0x%" PRIx64 " get assignment from local:%d->%" PRId64, tmq->consumerId, pAssignment->vgId,
            pAssignment->currentOffset);
  }

  if (needFetch) {
    pCommon = taosMemoryCalloc(1, sizeof(SMqVgCommon));
    if (pCommon == NULL) {
      code = terrno;
      goto end;
    }

    pCommon->pList = taosArrayInit(4, sizeof(tmq_topic_assignment));
    if (pCommon->pList == NULL) {
      code = terrno;
      // The semaphore and mutex are not initialised yet, so destroyCommonInfo
      // (which destroys both) must not run on this object: free it directly.
      taosMemoryFree(pCommon);
      pCommon = NULL;
      goto end;
    }

    code = tsem2_init(&pCommon->rsp, 0, 0);
    if (code != 0) {
      // same reasoning as above: nothing to destroy except the list
      taosArrayDestroy(pCommon->pList);
      taosMemoryFree(pCommon);
      pCommon = NULL;
      goto end;
    }
    (void)taosThreadMutexInit(&pCommon->mutex, 0);
    pCommon->pTopicName = taosStrdup(pTopic->topicName);
    if (pCommon->pTopicName == NULL) {
      code = terrno;
      goto end;
    }
    pCommon->consumerId = tmq->consumerId;

    // NOTE(review): every `goto end` inside this loop destroys pCommon while
    // requests sent by earlier iterations may still be in flight and holding
    // a pointer to it - confirm how tmqGetWalInfoCb is protected in that case.
    // NOTE(review): a NULL vgroup entry is skipped but totalReq still counts
    // it; the wait below presumably relies on entries never being NULL.
    for (int32_t i = 0; i < (*numOfAssignment); ++i) {
      SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i);
      if (pClientVg == NULL) {
        continue;
      }
      SMqVgWalInfoParam* pParam = taosMemoryMalloc(sizeof(SMqVgWalInfoParam));
      if (pParam == NULL) {
        code = terrno;
        goto end;
      }

      pParam->epoch = tmq->epoch;
      pParam->vgId = pClientVg->vgId;
      pParam->totalReq = *numOfAssignment;
      pParam->pCommon = pCommon;

      SMqPollReq req = {0};
      tmqBuildConsumeReqImpl(&req, tmq, 10, pTopic, pClientVg);
      req.reqOffset = pClientVg->offsetInfo.beginOffset;

      // first call computes the serialized size, second call fills the buffer
      int32_t msgSize = tSerializeSMqPollReq(NULL, 0, &req);
      if (msgSize < 0) {
        taosMemoryFree(pParam);
        code = msgSize;
        goto end;
      }

      char* msg = taosMemoryCalloc(1, msgSize);
      if (NULL == msg) {
        taosMemoryFree(pParam);
        code = terrno;
        goto end;
      }

      msgSize = tSerializeSMqPollReq(msg, msgSize, &req);
      if (msgSize < 0) {
        taosMemoryFree(msg);
        taosMemoryFree(pParam);
        code = msgSize;
        goto end;
      }

      SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
      if (sendInfo == NULL) {
        taosMemoryFree(pParam);
        taosMemoryFree(msg);
        code = terrno;
        goto end;
      }

      sendInfo->msgInfo = (SDataBuf){.pData = msg, .len = msgSize, .handle = NULL};
      sendInfo->requestId = req.reqId;
      sendInfo->requestObjRefId = 0;
      sendInfo->param = pParam;
      sendInfo->paramFreeFp = taosAutoMemoryFree;
      sendInfo->fp = tmqGetWalInfoCb;
      sendInfo->msgType = TDMT_VND_TMQ_VG_WALINFO;

      char offsetFormatBuf[TSDB_OFFSET_LEN] = {0};
      tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pClientVg->offsetInfo.beginOffset);

      tqInfoC("consumer:0x%" PRIx64 " %s retrieve wal info vgId:%d, epoch %d, req:%s,QID:0x%" PRIx64, tmq->consumerId,
              pTopic->topicName, pClientVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId);
      code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pClientVg->epSet, NULL, sendInfo);
      if (code != 0) {
        goto end;
      }
    }

    if (tsem2_wait(&pCommon->rsp) != 0) {
      tqErrorC("consumer:0x%" PRIx64 " failed to wait sem in get assignment", tmq->consumerId);
    }
    code = pCommon->code;

    if (code != TSDB_CODE_SUCCESS) {
      goto end;
    }

    int32_t num = taosArrayGetSize(pCommon->pList);
    if (num > *numOfAssignment) {
      // *assignment was sized for *numOfAssignment entries; never write past it
      num = *numOfAssignment;
    }
    for (int32_t i = 0; i < num; ++i) {
      (*assignment)[i] = *(tmq_topic_assignment*)taosArrayGet(pCommon->pList, i);
    }
    *numOfAssignment = num;

    // write the WAL ranges reported by the vnodes back into the local vgroup entries
    for (int32_t j = 0; j < (*numOfAssignment); ++j) {
      tmq_topic_assignment* p = &(*assignment)[j];

      for (int32_t i = 0; i < taosArrayGetSize(pTopic->vgs); ++i) {
        SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i);
        if (pClientVg == NULL) {
          continue;
        }
        if (pClientVg->vgId != p->vgId) {
          continue;
        }

        SVgOffsetInfo* pOffsetInfo = &pClientVg->offsetInfo;
        tqInfoC("consumer:0x%" PRIx64 " %s vgId:%d offset is update to:%" PRId64, tmq->consumerId, pTopic->topicName,
                p->vgId, p->currentOffset);

        pOffsetInfo->walVerBegin = p->begin;
        pOffsetInfo->walVerEnd = p->end;
      }
    }
  }

end:
  if (code != TSDB_CODE_SUCCESS) {
    taosMemoryFree(*assignment);
    *assignment = NULL;
    *numOfAssignment = 0;
  }
  destroyCommonInfo(pCommon);
  taosWUnLockLatch(&tmq->lock);
  return code;
}
3398

3399
// Release an assignment array through taosMemoryFree. A NULL pointer is
// accepted and ignored.
void tmq_free_assignment(tmq_topic_assignment* pAssignment) {
  if (pAssignment != NULL) {
    taosMemoryFree(pAssignment);
  }
}
3406

3407
// Response callback for TDMT_VND_TMQ_SEEK requests.
//
// Frees the response buffers, records the status in the SMqSeekParam passed
// as param and posts its semaphore so the waiting caller can continue.
// Returns code unchanged when param is NULL, otherwise 0.
static int32_t tmqSeekCb(void* param, SDataBuf* pMsg, int32_t code) {
  if (pMsg != NULL) {
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
  }

  SMqSeekParam* pSeek = param;
  if (pSeek == NULL) {
    return code;
  }

  pSeek->code = code;
  if (tsem2_post(&pSeek->sem) != 0) {
    tqErrorC("failed to post sem in tmqSeekCb");
  }
  return 0;
}
3422

3423
// seek interface have to send msg to server to cancel push handle if needed, because consumer may be in wait status if
3424
// there is no data to poll
3425
int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_t offset) {
27✔
3426
  if (tmq == NULL || pTopicName == NULL) {
27!
3427
    tqErrorC("invalid tmq handle, null");
×
3428
    return TSDB_CODE_INVALID_PARA;
×
3429
  }
3430

3431
  int32_t accId = tmq->pTscObj->acctId;
27✔
3432
  char    tname[TSDB_TOPIC_FNAME_LEN] = {0};
27✔
3433
  (void)snprintf(tname, TSDB_TOPIC_FNAME_LEN, "%d.%s", accId, pTopicName);
27✔
3434

3435
  taosWLockLatch(&tmq->lock);
27✔
3436

3437
  SMqClientVg* pVg = NULL;
27✔
3438
  int32_t      code = getClientVg(tmq, tname, vgId, &pVg);
27✔
3439
  if (code != 0) {
27✔
3440
    taosWUnLockLatch(&tmq->lock);
22✔
3441
    return code;
22✔
3442
  }
3443

3444
  SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo;
5✔
3445

3446
  int32_t type = pOffsetInfo->endOffset.type;
5✔
3447
  if (isInSnapshotMode(type, tmq->useSnapshot)) {
5!
3448
    tqErrorC("consumer:0x%" PRIx64 " offset type:%d not wal version, seek not allowed", tmq->consumerId, type);
×
3449
    taosWUnLockLatch(&tmq->lock);
×
3450
    return TSDB_CODE_TMQ_SNAPSHOT_ERROR;
×
3451
  }
3452

3453
  code = checkWalRange(pOffsetInfo, offset);
5✔
3454
  if (code != 0) {
5✔
3455
    taosWUnLockLatch(&tmq->lock);
1✔
3456
    return code;
1✔
3457
  }
3458

3459
  tqInfoC("consumer:0x%" PRIx64 " seek to %" PRId64 " on vgId:%d", tmq->consumerId, offset, vgId);
4!
3460
  // update the offset, and then commit to vnode
3461
  pOffsetInfo->endOffset.type = TMQ_OFFSET__LOG;
4✔
3462
  pOffsetInfo->endOffset.version = offset;
4✔
3463
  pOffsetInfo->beginOffset = pOffsetInfo->endOffset;
4✔
3464
  pVg->seekUpdated = true;
4✔
3465
  SEpSet epSet = pVg->epSet;
4✔
3466
  taosWUnLockLatch(&tmq->lock);
4✔
3467

3468
  SMqSeekReq req = {0};
4✔
3469
  (void)snprintf(req.subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s:%s", tmq->groupId, tname);
4✔
3470
  req.head.vgId = vgId;
4✔
3471
  req.consumerId = tmq->consumerId;
4✔
3472

3473
  int32_t msgSize = tSerializeSMqSeekReq(NULL, 0, &req);
4✔
3474
  if (msgSize < 0) {
4!
3475
    return TSDB_CODE_PAR_INTERNAL_ERROR;
×
3476
  }
3477

3478
  char* msg = taosMemoryCalloc(1, msgSize);
4!
3479
  if (NULL == msg) {
4!
3480
    return terrno;
×
3481
  }
3482

3483
  if (tSerializeSMqSeekReq(msg, msgSize, &req) < 0) {
4!
3484
    taosMemoryFree(msg);
×
3485
    return TSDB_CODE_PAR_INTERNAL_ERROR;
×
3486
  }
3487

3488
  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
4!
3489
  if (sendInfo == NULL) {
4!
3490
    taosMemoryFree(msg);
×
3491
    return terrno;
×
3492
  }
3493

3494
  SMqSeekParam* pParam = taosMemoryMalloc(sizeof(SMqSeekParam));
4!
3495
  if (pParam == NULL) {
4!
3496
    taosMemoryFree(msg);
×
3497
    taosMemoryFree(sendInfo);
×
3498
    return terrno;
×
3499
  }
3500
  if (tsem2_init(&pParam->sem, 0, 0) != 0) {
4!
3501
    taosMemoryFree(msg);
×
3502
    taosMemoryFree(sendInfo);
×
3503
    taosMemoryFree(pParam);
×
3504
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
3505
  }
3506

3507
  sendInfo->msgInfo = (SDataBuf){.pData = msg, .len = msgSize, .handle = NULL};
4✔
3508
  sendInfo->requestId = generateRequestId();
4✔
3509
  sendInfo->requestObjRefId = 0;
4✔
3510
  sendInfo->param = pParam;
4✔
3511
  sendInfo->fp = tmqSeekCb;
4✔
3512
  sendInfo->msgType = TDMT_VND_TMQ_SEEK;
4✔
3513

3514
  code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo);
4✔
3515
  if (code != 0) {
4!
3516
    if(tsem2_destroy(&pParam->sem) != 0) {
×
3517
      tqErrorC("consumer:0x%" PRIx64 "destroy rsp sem failed in seek offset", tmq->consumerId);
×
3518
    }
3519
    taosMemoryFree(pParam);
×
3520
    return code;
×
3521
  }
3522

3523
  if (tsem2_wait(&pParam->sem) != 0){
4!
3524
    tqErrorC("consumer:0x%" PRIx64 "wait rsp sem failed in seek offset", tmq->consumerId);
×
3525
  }
3526
  code = pParam->code;
4✔
3527
  if(tsem2_destroy(&pParam->sem) != 0) {
4!
3528
    tqErrorC("consumer:0x%" PRIx64 "destroy rsp sem failed in seek offset", tmq->consumerId);
×
3529
  }
3530
  taosMemoryFree(pParam);
4!
3531

3532
  tqInfoC("consumer:0x%" PRIx64 "send seek to vgId:%d, return code:%s", tmq->consumerId, vgId, tstrerror(code));
4!
3533

3534
  return code;
4✔
3535
}
3536

3537
// Return the connection handle behind a consumer: the address of the `id`
// field of tmq->pTscObj, cast to TAOS*. Returns NULL when tmq or its pTscObj
// is NULL.
TAOS* tmq_get_connect(tmq_t* tmq) {
  if (tmq == NULL || tmq->pTscObj == NULL) {
    return NULL;
  }
  return (TAOS*)(&(tmq->pTscObj->id));
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc