• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3559

18 Dec 2024 12:59AM UTC coverage: 59.805% (+0.03%) from 59.778%
#3559

push

travis-ci

web-flow
Merge pull request #29187 from taosdata/merge/mainto3.0

merge: main to 3.0 branch

132705 of 287544 branches covered (46.15%)

Branch coverage included in aggregate %.

87 of 95 new or added lines in 19 files covered. (91.58%)

1132 existing lines in 133 files now uncovered.

209591 of 284807 relevant lines covered (73.59%)

8125235.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

58.7
/source/client/src/clientTmq.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "cJSON.h"
17
#include "clientInt.h"
18
#include "clientLog.h"
19
#include "parser.h"
20
#include "tdatablock.h"
21
#include "tdef.h"
22
#include "tglobal.h"
23
#include "tqueue.h"
24
#include "tref.h"
25
#include "ttimer.h"
26

27
// Client-side TMQ logging helpers. A message is emitted when either the general
// client debug flag or the TMQ-client debug flag has the matching level bit set.
#define tqErrorC(...)                                                                       \
  do {                                                                                      \
    if ((cDebugFlag & DEBUG_ERROR) || (tqClientDebugFlag & DEBUG_ERROR)) {                  \
      taosPrintLog("TQ  ERROR ", DEBUG_ERROR, tqClientDebugFlag | cDebugFlag, __VA_ARGS__); \
    }                                                                                       \
  } while (0)
#define tqInfoC(...)                                                                 \
  do {                                                                               \
    if ((cDebugFlag & DEBUG_INFO) || (tqClientDebugFlag & DEBUG_INFO)) {             \
      taosPrintLog("TQ  ", DEBUG_INFO, tqClientDebugFlag | cDebugFlag, __VA_ARGS__); \
    }                                                                                \
  } while (0)
#define tqDebugC(...)                                                                 \
  do {                                                                                \
    if ((cDebugFlag & DEBUG_DEBUG) || (tqClientDebugFlag & DEBUG_DEBUG)) {            \
      taosPrintLog("TQ  ", DEBUG_DEBUG, tqClientDebugFlag | cDebugFlag, __VA_ARGS__); \
    }                                                                                 \
  } while (0)
30

31
#define EMPTY_BLOCK_POLL_IDLE_DURATION 10
32
#define DEFAULT_AUTO_COMMIT_INTERVAL   5000
33
#define DEFAULT_HEARTBEAT_INTERVAL     3000
34
#define DEFAULT_ASKEP_INTERVAL         1000
35
#define DEFAULT_COMMIT_CNT             1
36
#define SUBSCRIBE_RETRY_MAX_COUNT      240
37
#define SUBSCRIBE_RETRY_INTERVAL       500
38

39

40
#define SET_ERROR_MSG_TMQ(MSG) \
41
  if (errstr != NULL && errstrLen > 0) (void)snprintf(errstr, errstrLen, MSG);
42

43
#define PROCESS_POLL_RSP(FUNC,DATA) \
44
  SDecoder decoder = {0}; \
45
  tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead)); \
46
  if (FUNC(&decoder, DATA) < 0) { \
47
    tDecoderClear(&decoder); \
48
    code = terrno; \
49
    goto END;\
50
  }\
51
  tDecoderClear(&decoder);\
52
  (void)memcpy(DATA, pMsg->pData, sizeof(SMqRspHead));
53

54
#define DELETE_POLL_RSP(FUNC,DATA) \
55
  SMqPollRspWrapper* pRsp = &rspWrapper->pollRsp;\
56
  taosMemoryFreeClear(pRsp->pEpset);\
57
  FUNC(DATA);
58

59
enum {
60
  TMQ_VG_STATUS__IDLE = 0,
61
  TMQ_VG_STATUS__WAIT,
62
};
63

64
enum {
65
  TMQ_CONSUMER_STATUS__INIT = 0,
66
  TMQ_CONSUMER_STATUS__READY,
67
  TMQ_CONSUMER_STATUS__CLOSED,
68
};
69

70
enum {
71
  TMQ_DELAYED_TASK__ASK_EP = 1,
72
  TMQ_DELAYED_TASK__COMMIT,
73
};
74

75
typedef struct {
76
  tmr_h   timer;
77
  int32_t rsetId;
78
} SMqMgmt;
79

80
struct tmq_list_t {
81
  SArray container;
82
};
83

84
struct tmq_conf_t {
85
  char           clientId[TSDB_CLIENT_ID_LEN];
86
  char           groupId[TSDB_CGROUP_LEN];
87
  int8_t         autoCommit;
88
  int8_t         resetOffset;
89
  int8_t         withTbName;
90
  int8_t         snapEnable;
91
  int8_t         replayEnable;
92
  int8_t         sourceExcluded;  // do not consume, bit
93
  uint16_t       port;
94
  int32_t        autoCommitInterval;
95
  int32_t        sessionTimeoutMs;
96
  int32_t        heartBeatIntervalMs;
97
  int32_t        maxPollIntervalMs;
98
  char*          ip;
99
  char*          user;
100
  char*          pass;
101
  tmq_commit_cb* commitCb;
102
  void*          commitCbUserParam;
103
  int8_t         enableBatchMeta;
104
};
105

106
struct tmq_t {
107
  int64_t        refId;
108
  char           groupId[TSDB_CGROUP_LEN];
109
  char           clientId[TSDB_CLIENT_ID_LEN];
110
  char           user[TSDB_USER_LEN];
111
  char           fqdn[TSDB_FQDN_LEN];
112
  int8_t         withTbName;
113
  int8_t         useSnapshot;
114
  int8_t         autoCommit;
115
  int32_t        autoCommitInterval;
116
  int32_t        sessionTimeoutMs;
117
  int32_t        heartBeatIntervalMs;
118
  int32_t        maxPollIntervalMs;
119
  int8_t         resetOffsetCfg;
120
  int8_t         replayEnable;
121
  int8_t         sourceExcluded;  // do not consume, bit
122
  int64_t        consumerId;
123
  tmq_commit_cb* commitCb;
124
  void*          commitCbUserParam;
125
  int8_t         enableBatchMeta;
126

127
  // status
128
  SRWLatch lock;
129
  int8_t   status;
130
  int32_t  epoch;
131
  // poll info
132
  int64_t pollCnt;
133
  int64_t totalRows;
134
  int8_t  pollFlag;
135

136
  // timer
137
  tmr_h       hbLiveTimer;
138
  tmr_h       epTimer;
139
  tmr_h       commitTimer;
140
  STscObj*    pTscObj;       // connection
141
  SArray*     clientTopics;  // SArray<SMqClientTopic>
142
  STaosQueue* mqueue;        // queue of rsp
143
  STaosQall*  qall;
144
  STaosQueue* delayedTask;  // delayed task queue for heartbeat and auto commit
145
  tsem2_t     rspSem;
146
};
147

148
typedef struct {
149
  int32_t code;
150
  tsem2_t sem;
151
} SAskEpInfo;
152

153
typedef struct {
154
  STqOffsetVal committedOffset;
155
  STqOffsetVal endOffset;    // the last version in TAOS_RES + 1
156
  STqOffsetVal beginOffset;  // the first version in TAOS_RES
157
  int64_t      walVerBegin;
158
  int64_t      walVerEnd;
159
} SVgOffsetInfo;
160

161
// Client-side state kept for one vgroup of a subscribed topic
// (stored in SMqClientTopic.vgs).
typedef struct {
162
  int64_t       pollCnt;
163
  int64_t       numOfRows;
164
  SVgOffsetInfo offsetInfo;
165
  int32_t       vgId;
166
  int32_t       vgStatus;
167
  int32_t       vgSkipCnt;            // here used to mark the slow vgroups
168
  int64_t       emptyBlockReceiveTs;  // once empty block is received, idle for ignoreCnt then start to poll data
169
  int64_t       blockReceiveTs;       // NOTE(review): previous comment was copy-pasted from emptyBlockReceiveTs; presumably the time a block was last received - confirm
170
  int64_t       blockSleepForReplay;  // NOTE(review): previous comment was copy-pasted from emptyBlockReceiveTs; presumably a sleep interval used in replay mode - confirm
171
  bool          seekUpdated;          // offset is updated by seek operator, therefore, not update by vnode rsp.
172
  SEpSet        epSet;
173
} SMqClientVg;
174

175
typedef struct {
176
  char           topicName[TSDB_TOPIC_FNAME_LEN];
177
  char           db[TSDB_DB_FNAME_LEN];
178
  SArray*        vgs;  // SArray<SMqClientVg>
179
  SSchemaWrapper schema;
180
  int8_t         noPrivilege;
181
} SMqClientTopic;
182

183
typedef struct {
184
  int32_t         vgId;
185
  char            topicName[TSDB_TOPIC_FNAME_LEN];
186
  SMqClientTopic* topicHandle;
187
  uint64_t        reqId;
188
  SEpSet*         pEpset;
189
  union {
190
    struct{
191
      SMqRspHead   head;
192
      STqOffsetVal rspOffset;
193
    };
194
    SMqDataRsp      dataRsp;
195
    SMqMetaRsp      metaRsp;
196
    SMqBatchMetaRsp batchMetaRsp;
197
  };
198
} SMqPollRspWrapper;
199

200
typedef struct {
201
  int32_t code;
202
  int8_t  tmqRspType;
203
  int32_t epoch;
204
  union{
205
    SMqPollRspWrapper pollRsp;
206
    SMqAskEpRsp       epRsp;
207
  };
208
} SMqRspWrapper;
209

210
typedef struct {
211
  tsem2_t rspSem;
212
  int32_t rspErr;
213
} SMqSubscribeCbParam;
214

215
typedef struct {
216
  int64_t refId;
217
  bool    sync;
218
  void*   pParam;
219
} SMqAskEpCbParam;
220

221
typedef struct {
222
  int64_t  refId;
223
  char     topicName[TSDB_TOPIC_FNAME_LEN];
224
  int32_t  vgId;
225
  uint64_t requestId;  // request id for debug purpose
226
} SMqPollCbParam;
227

228
typedef struct {
229
  tsem2_t       rsp;
230
  int32_t       numOfRsp;
231
  SArray*       pList;
232
  TdThreadMutex mutex;
233
  int64_t       consumerId;
234
  char*         pTopicName;
235
  int32_t       code;
236
} SMqVgCommon;
237

238
typedef struct {
239
  tsem2_t sem;
240
  int32_t code;
241
} SMqSeekParam;
242

243
typedef struct {
244
  tsem2_t     sem;
245
  int32_t     code;
246
  SMqVgOffset vgOffset;
247
} SMqCommittedParam;
248

249
typedef struct {
250
  int32_t      vgId;
251
  int32_t      epoch;
252
  int32_t      totalReq;
253
  SMqVgCommon* pCommon;
254
} SMqVgWalInfoParam;
255

256
typedef struct {
257
  int64_t        refId;
258
  int32_t        epoch;
259
  int32_t        waitingRspNum;
260
  int32_t        code;
261
  tmq_commit_cb* callbackFn;
262
  void*          userParam;
263
} SMqCommitCbParamSet;
264

265
typedef struct {
266
  SMqCommitCbParamSet* params;
267
  char                 topicName[TSDB_TOPIC_FNAME_LEN];
268
  int32_t              vgId;
269
  int64_t              consumerId;
270
} SMqCommitCbParam;
271

272
typedef struct {
273
  tsem2_t sem;
274
  int32_t code;
275
} SSyncCommitInfo;
276

277
typedef struct {
278
  STqOffsetVal currentOffset;
279
  STqOffsetVal commitOffset;
280
  STqOffsetVal seekOffset;
281
  int64_t      numOfRows;
282
  int32_t      vgStatus;
283
} SVgroupSaveInfo;
284

285
static   TdThreadOnce   tmqInit = PTHREAD_ONCE_INIT;  // initialize only once
286
volatile int32_t        tmqInitRes = 0;               // initialize rsp code
287
static   SMqMgmt        tmqMgmt = {0};
288

289
tmq_conf_t* tmq_conf_new() {
408✔
290
  tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
408!
291
  if (conf == NULL) {
411!
292
    return conf;
×
293
  }
294

295
  conf->withTbName = false;
411✔
296
  conf->autoCommit = true;
411✔
297
  conf->autoCommitInterval = DEFAULT_AUTO_COMMIT_INTERVAL;
411✔
298
  conf->resetOffset = TMQ_OFFSET__RESET_LATEST;
411✔
299
  conf->enableBatchMeta = false;
411✔
300
  conf->heartBeatIntervalMs = DEFAULT_HEARTBEAT_INTERVAL;
411✔
301
  conf->maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL;
411✔
302
  conf->sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT;
411✔
303

304
  return conf;
411✔
305
}
306

307
/*
 * Release a configuration object together with the ip/user/pass strings it
 * owns. Passing NULL is a no-op.
 */
void tmq_conf_destroy(tmq_conf_t* conf) {
  if (conf == NULL) {
    return;
  }

  if (conf->ip != NULL) {
    taosMemoryFree(conf->ip);
  }
  if (conf->user != NULL) {
    taosMemoryFree(conf->user);
  }
  if (conf->pass != NULL) {
    taosMemoryFree(conf->pass);
  }

  taosMemoryFree(conf);
}
321

322
tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
2,930✔
323
  int32_t code = 0;
2,930✔
324
  if (conf == NULL || key == NULL || value == NULL) {
2,930!
325
    tqErrorC("tmq_conf_set null, conf:%p key:%p value:%p", conf, key, value);
1!
326
    return TMQ_CONF_INVALID;
×
327
  }
328
  if (strcasecmp(key, "group.id") == 0) {
2,929✔
329
    tstrncpy(conf->groupId, value, TSDB_CGROUP_LEN);
411✔
330
    return TMQ_CONF_OK;
411✔
331
  }
332

333
  if (strcasecmp(key, "client.id") == 0) {
2,518✔
334
    tstrncpy(conf->clientId, value, TSDB_CLIENT_ID_LEN);
56✔
335
    return TMQ_CONF_OK;
56✔
336
  }
337

338
  if (strcasecmp(key, "enable.auto.commit") == 0) {
2,462✔
339
    if (strcasecmp(value, "true") == 0) {
399✔
340
      conf->autoCommit = true;
182✔
341
      return TMQ_CONF_OK;
182✔
342
    } else if (strcasecmp(value, "false") == 0) {
217!
343
      conf->autoCommit = false;
217✔
344
      return TMQ_CONF_OK;
217✔
345
    } else {
346
      tqErrorC("invalid value for enable.auto.commit: %s", value);
×
347
      return TMQ_CONF_INVALID;
×
348
    }
349
  }
350

351
  if (strcasecmp(key, "auto.commit.interval.ms") == 0) {
2,063✔
352
    int64_t tmp;
353
    code = taosStr2int64(value, &tmp);
249✔
354
    if (tmp < 0 || code != 0) {
249!
355
      tqErrorC("invalid value for auto.commit.interval.ms: %s", value);
×
356
      return TMQ_CONF_INVALID;
×
357
    }
358
    conf->autoCommitInterval = (tmp > INT32_MAX ? INT32_MAX : tmp);
249✔
359
    return TMQ_CONF_OK;
249✔
360
  }
361

362
  if (strcasecmp(key, "session.timeout.ms") == 0) {
1,814✔
363
    int64_t tmp;
364
    code = taosStr2int64(value, &tmp);
2✔
365
    if (tmp < 6000 || tmp > 1800000 || code != 0) {
2!
366
      tqErrorC("invalid value for session.timeout.ms: %s", value);
×
367
      return TMQ_CONF_INVALID;
×
368
    }
369
    conf->sessionTimeoutMs = tmp;
2✔
370
    return TMQ_CONF_OK;
2✔
371
  }
372

373
  if (strcasecmp(key, "heartbeat.interval.ms") == 0) {
1,812!
374
    int64_t tmp;
375
    code = taosStr2int64(value, &tmp);
×
376
    if (tmp < 1000 || tmp >= conf->sessionTimeoutMs || code != 0) {
×
377
      tqErrorC("invalid value for heartbeat.interval.ms: %s", value);
×
378
      return TMQ_CONF_INVALID;
×
379
    }
380
    conf->heartBeatIntervalMs = tmp;
×
381
    return TMQ_CONF_OK;
×
382
  }
383

384
  if (strcasecmp(key, "max.poll.interval.ms") == 0) {
1,812✔
385
    int32_t tmp;
386
    code = taosStr2int32(value, &tmp);
2✔
387
    if (tmp < 1000 || code != 0) {
2!
388
      tqErrorC("invalid value for max.poll.interval.ms: %s", value);
×
389
      return TMQ_CONF_INVALID;
×
390
    }
391
    conf->maxPollIntervalMs = tmp;
2✔
392
    return TMQ_CONF_OK;
2✔
393
  }
394

395
  if (strcasecmp(key, "auto.offset.reset") == 0) {
1,810✔
396
    if (strcasecmp(value, "none") == 0) {
411✔
397
      conf->resetOffset = TMQ_OFFSET__RESET_NONE;
11✔
398
      return TMQ_CONF_OK;
11✔
399
    } else if (strcasecmp(value, "earliest") == 0) {
400✔
400
      conf->resetOffset = TMQ_OFFSET__RESET_EARLIEST;
378✔
401
      return TMQ_CONF_OK;
378✔
402
    } else if (strcasecmp(value, "latest") == 0) {
22!
403
      conf->resetOffset = TMQ_OFFSET__RESET_LATEST;
22✔
404
      return TMQ_CONF_OK;
22✔
405
    } else {
406
      tqErrorC("invalid value for auto.offset.reset: %s", value);
×
407
      return TMQ_CONF_INVALID;
×
408
    }
409
  }
410

411
  if (strcasecmp(key, "msg.with.table.name") == 0) {
1,399✔
412
    if (strcasecmp(value, "true") == 0) {
389✔
413
      conf->withTbName = true;
369✔
414
      return TMQ_CONF_OK;
369✔
415
    } else if (strcasecmp(value, "false") == 0) {
20!
416
      conf->withTbName = false;
20✔
417
      return TMQ_CONF_OK;
20✔
418
    } else {
419
      tqErrorC("invalid value for msg.with.table.name: %s", value);
×
420
      return TMQ_CONF_INVALID;
×
421
    }
422
  }
423

424
  if (strcasecmp(key, "experimental.snapshot.enable") == 0) {
1,010✔
425
    if (strcasecmp(value, "true") == 0) {
144✔
426
      conf->snapEnable = true;
111✔
427
      return TMQ_CONF_OK;
111✔
428
    } else if (strcasecmp(value, "false") == 0) {
33!
429
      conf->snapEnable = false;
33✔
430
      return TMQ_CONF_OK;
33✔
431
    } else {
432
      tqErrorC("invalid value for experimental.snapshot.enable: %s", value);
×
433
      return TMQ_CONF_INVALID;
×
434
    }
435
  }
436

437
  if (strcasecmp(key, "td.connect.ip") == 0) {
866✔
438
    void *tmp = taosStrdup(value);
2!
439
    if (tmp == NULL) {
2!
440
      tqErrorC("tmq_conf_set out of memory:%d", terrno);
×
441
      return TMQ_CONF_INVALID;
×
442
    }
443
    conf->ip = tmp;
2✔
444
    return TMQ_CONF_OK;
2✔
445
  }
446

447
  if (strcasecmp(key, "td.connect.user") == 0) {
864✔
448
    void *tmp = taosStrdup(value);
408!
449
    if (tmp == NULL) {
407!
450
      tqErrorC("tmq_conf_set out of memory:%d", terrno);
×
451
      return TMQ_CONF_INVALID;
×
452
    }
453
    conf->user = tmp;
407✔
454
    return TMQ_CONF_OK;
407✔
455
  }
456

457
  if (strcasecmp(key, "td.connect.pass") == 0) {
456✔
458
    void *tmp = taosStrdup(value);
408!
459
    if (tmp == NULL) {
408!
460
      tqErrorC("tmq_conf_set out of memory:%d", terrno);
×
461
      return TMQ_CONF_INVALID;
×
462
    }
463
    conf->pass = tmp;
408✔
464
    return TMQ_CONF_OK;
408✔
465
  }
466

467
  if (strcasecmp(key, "td.connect.port") == 0) {
48!
468
    int64_t tmp;
469
    code = taosStr2int64(value, &tmp);
×
470
    if (tmp <= 0 || tmp > 65535 || code != 0) {
×
471
      tqErrorC("invalid value for td.connect.port: %s", value);
×
472
      return TMQ_CONF_INVALID;
×
473
    }
474

475
    conf->port = tmp;
×
476
    return TMQ_CONF_OK;
×
477
  }
478

479
  if (strcasecmp(key, "enable.replay") == 0) {
48✔
480
    if (strcasecmp(value, "true") == 0) {
7!
481
      conf->replayEnable = true;
7✔
482
      return TMQ_CONF_OK;
7✔
483
    } else if (strcasecmp(value, "false") == 0) {
×
484
      conf->replayEnable = false;
×
485
      return TMQ_CONF_OK;
×
486
    } else {
487
      tqErrorC("invalid value for enable.replay: %s", value);
×
488
      return TMQ_CONF_INVALID;
×
489
    }
490
  }
491
  if (strcasecmp(key, "msg.consume.excluded") == 0) {
41✔
492
    int64_t tmp;
493
    code = taosStr2int64(value, &tmp);
40✔
494
    conf->sourceExcluded = (0 == code && tmp != 0) ? TD_REQ_FROM_TAOX : 0;
40!
495
    return TMQ_CONF_OK;
40✔
496
  }
497

498
  if (strcasecmp(key, "td.connect.db") == 0) {
1!
499
    return TMQ_CONF_OK;
×
500
  }
501

502
  if (strcasecmp(key, "msg.enable.batchmeta") == 0) {
1!
503
    int64_t tmp;
504
    code = taosStr2int64(value, &tmp);
4✔
505
    conf->enableBatchMeta = (0 == code && tmp != 0) ? true : false;
4!
506
    return TMQ_CONF_OK;
4✔
507
  }
508

UNCOV
509
  tqErrorC("unknown key: %s", key);
×
510
  return TMQ_CONF_UNKNOWN;
×
511
}
512

513
tmq_list_t* tmq_list_new() {
1,204✔
514
  return (tmq_list_t*)taosArrayInit(0, sizeof(void*));
1,204✔
515
}
516

517
/*
 * Append a copy of topic name `src` to `list`.
 * Returns 0 on success, TSDB_CODE_INVALID_PARA for a NULL list or a NULL/empty
 * name, and terrno when duplicating or pushing the string fails.
 */
int32_t tmq_list_append(tmq_list_t* list, const char* src) {
  if (list == NULL || src == NULL || src[0] == 0) {
    return TSDB_CODE_INVALID_PARA;
  }

  char* pCopy = taosStrdup(src);
  if (pCopy == NULL) {
    return terrno;
  }

  if (taosArrayPush(&list->container, &pCopy) != NULL) {
    return 0;
  }

  // push failed: the list did not take ownership of the copy
  taosMemoryFree(pCopy);
  return terrno;
}
533

534
/* Destroy a topic list via taosArrayDestroyP; NULL is ignored. */
void tmq_list_destroy(tmq_list_t* list) {
  if (list != NULL) {
    taosArrayDestroyP(&list->container, NULL);
  }
}
539

540
int32_t tmq_list_get_size(const tmq_list_t* list) {
7✔
541
  if (list == NULL) {
7!
542
    return TSDB_CODE_INVALID_PARA;
×
543
  }
544
  const SArray* container = &list->container;
7✔
545
  return taosArrayGetSize(container);
7✔
546
}
547

548
char** tmq_list_to_c_array(const tmq_list_t* list) {
7✔
549
  if (list == NULL) {
7!
550
    return NULL;
×
551
  }
552
  const SArray* container = &list->container;
7✔
553
  return container->pData;
7✔
554
}
555

556
/*
 * Finish a commit round: invoke the user callback (if any) with the code
 * stored in the param set, free the param set, and drop the consumer ref.
 *
 * The callback receives NULL as the consumer when the ref can no longer be
 * acquired; in that case TSDB_CODE_TMQ_CONSUMER_CLOSED is returned. Otherwise
 * the result of taosReleaseRef is returned.
 */
static int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) {
  const int64_t consumerRef = pParamSet->refId;
  tmq_t*        pTmq = taosAcquireRef(tmqMgmt.rsetId, consumerRef);

  if (pParamSet->callbackFn != NULL) {
    pParamSet->callbackFn(pTmq, pParamSet->code, pParamSet->userParam);
  }
  taosMemoryFree(pParamSet);

  if (pTmq == NULL) {
    return TSDB_CODE_TMQ_CONSUMER_CLOSED;
  }
  return taosReleaseRef(tmqMgmt.rsetId, consumerRef);
}
576

577
/*
 * Atomically decrement the number of outstanding commit responses. When the
 * counter reaches zero the commit round is finished via tmqCommitDone (which
 * frees pParamSet); otherwise 0 is returned.
 */
static int32_t commitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId) {
  int32_t remaining = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
  if (remaining != 0) {
    tqDebugC("consumer:0x%" PRIx64 " topic:%s vgId:%d commit-rsp received, remain:%d", consumerId, pTopic, vgId,
             remaining);
    return 0;
  }

  tqDebugC("consumer:0x%" PRIx64 " topic:%s vgId:%d all commit-rsp received, commit completed", consumerId, pTopic,
           vgId);
  return tmqCommitDone(pParamSet);
}
589

590
/*
 * Response callback for a commit-offset request. Releases the response
 * buffers, then counts the response down on the shared param set.
 * Returns TSDB_CODE_INVALID_PARA when `param` is NULL. The `code` argument is
 * not consulted.
 */
static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
  if (pBuf != NULL) {
    taosMemoryFreeClear(pBuf->pData);
    taosMemoryFreeClear(pBuf->pEpSet);
  }

  if (param == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  SMqCommitCbParam* pCbParam = (SMqCommitCbParam*)param;
  return commitRspCountDown((SMqCommitCbParamSet*)pCbParam->params, pCbParam->consumerId, pCbParam->topicName,
                            pCbParam->vgId);
}
603

604
static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffsetVal* offset, const char* pTopicName,
3,456✔
605
                               SMqCommitCbParamSet* pParamSet) {
606
  SMqVgOffset pOffset = {0};
3,456✔
607

608
  pOffset.consumerId = tmq->consumerId;
3,456✔
609
  pOffset.offset.val = *offset;
3,456✔
610
  (void)snprintf(pOffset.offset.subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", tmq->groupId, TMQ_SEPARATOR, pTopicName);
3,456✔
611
  int32_t len = 0;
3,456✔
612
  int32_t code = 0;
3,456✔
613
  tEncodeSize(tEncodeMqVgOffset, &pOffset, len, code);
3,456!
614
  if (code < 0) {
3,456!
615
    return TSDB_CODE_INVALID_PARA;
×
616
  }
617

618
  void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
3,456!
619
  if (buf == NULL) {
3,456!
620
    return terrno;
×
621
  }
622

623
  ((SMsgHead*)buf)->vgId = htonl(vgId);
3,456✔
624

625
  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
3,456✔
626

627
  SEncoder encoder = {0};
3,456✔
628
  tEncoderInit(&encoder, abuf, len);
3,456✔
629
  if (tEncodeMqVgOffset(&encoder, &pOffset) < 0) {
3,456!
630
    tEncoderClear(&encoder);
×
631
    taosMemoryFree(buf);
×
632
    return TSDB_CODE_INVALID_PARA;
×
633
  }
634
  tEncoderClear(&encoder);
3,456✔
635

636
  // build param
637
  SMqCommitCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam));
3,456!
638
  if (pParam == NULL) {
3,456!
639
    taosMemoryFree(buf);
×
640
    return terrno;
×
641
  }
642

643
  pParam->params = pParamSet;
3,456✔
644
  pParam->vgId = vgId;
3,456✔
645
  pParam->consumerId = tmq->consumerId;
3,456✔
646

647
  tstrncpy(pParam->topicName, pTopicName, tListLen(pParam->topicName));
3,456✔
648

649
  // build send info
650
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
3,456!
651
  if (pMsgSendInfo == NULL) {
3,456!
652
    taosMemoryFree(buf);
×
653
    taosMemoryFree(pParam);
×
654
    return terrno;
×
655
  }
656

657
  pMsgSendInfo->msgInfo = (SDataBuf){.pData = buf, .len = sizeof(SMsgHead) + len, .handle = NULL};
3,456✔
658

659
  pMsgSendInfo->requestId = generateRequestId();
3,456✔
660
  pMsgSendInfo->requestObjRefId = 0;
3,456✔
661
  pMsgSendInfo->param = pParam;
3,456✔
662
  pMsgSendInfo->paramFreeFp = taosAutoMemoryFree;
3,456✔
663
  pMsgSendInfo->fp = tmqCommitCb;
3,456✔
664
  pMsgSendInfo->msgType = TDMT_VND_TMQ_COMMIT_OFFSET;
3,456✔
665

666
  // int64_t transporterId = 0;
667
  (void)atomic_add_fetch_32(&pParamSet->waitingRspNum, 1);
3,456✔
668
  code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, NULL, pMsgSendInfo);
3,456✔
669
  if (code != 0) {
3,456!
670
    (void)atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
×
671
  }
672
  return code;
3,456✔
673
}
674

675
/*
 * Look up a subscribed topic by exact name in tmq->clientTopics.
 * On a match *topic is set and 0 is returned; otherwise an error is logged and
 * TSDB_CODE_TMQ_INVALID_TOPIC is returned (*topic is left untouched).
 */
static int32_t getTopicByName(tmq_t* tmq, const char* pTopicName, SMqClientTopic** topic) {
  int32_t topicCnt = taosArrayGetSize(tmq->clientTopics);

  for (int32_t idx = 0; idx < topicCnt; ++idx) {
    SMqClientTopic* pCandidate = taosArrayGet(tmq->clientTopics, idx);
    if (pCandidate != NULL && strcmp(pCandidate->topicName, pTopicName) == 0) {
      *topic = pCandidate;
      return 0;
    }
  }

  tqErrorC("consumer:0x%" PRIx64 ", total:%d, failed to find topic:%s", tmq->consumerId, topicCnt, pTopicName);
  return TSDB_CODE_TMQ_INVALID_TOPIC;
}
689

690
static int32_t prepareCommitCbParamSet(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam, int32_t rspNum,
10,648✔
691
                                       SMqCommitCbParamSet** ppParamSet) {
692
  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
10,648!
693
  if (pParamSet == NULL) {
10,648!
694
    return terrno;
×
695
  }
696

697
  pParamSet->refId = tmq->refId;
10,648✔
698
  pParamSet->epoch = tmq->epoch;
10,648✔
699
  pParamSet->callbackFn = pCommitFp;
10,648✔
700
  pParamSet->userParam = userParam;
10,648✔
701
  pParamSet->waitingRspNum = rspNum;
10,648✔
702
  *ppParamSet = pParamSet;
10,648✔
703
  return 0;
10,648✔
704
}
705

706
/*
 * Find the client-side vgroup state for (pTopicName, vgId).
 *
 * @param tmq         consumer whose clientTopics are searched
 * @param pTopicName  topic name, matched exactly by getTopicByName
 * @param vgId        vgroup id to look for within that topic
 * @param pVg         out: set to the matching entry on success only
 * @return TSDB_CODE_SUCCESS when found, the getTopicByName error when the
 *         topic is unknown, TSDB_CODE_TMQ_INVALID_VGID when the topic has no
 *         such vgroup.
 *
 * The result is decided by whether a match was found in this call, not by the
 * incoming value of *pVg, so a caller that did not pre-set *pVg to NULL can no
 * longer get SUCCESS with a stale pointer.
 */
static int32_t getClientVg(tmq_t* tmq, char* pTopicName, int32_t vgId, SMqClientVg** pVg) {
  SMqClientTopic* pTopic = NULL;
  int32_t         code = getTopicByName(tmq, pTopicName, &pTopic);
  if (code != 0) {
    tqErrorC("consumer:0x%" PRIx64 " invalid topic name:%s", tmq->consumerId, pTopicName);
    return code;
  }

  int32_t numOfVgs = taosArrayGetSize(pTopic->vgs);
  for (int32_t i = 0; i < numOfVgs; ++i) {
    SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i);
    if (pClientVg != NULL && pClientVg->vgId == vgId) {
      *pVg = pClientVg;
      return TSDB_CODE_SUCCESS;
    }
  }

  return TSDB_CODE_TMQ_INVALID_VGID;
}
725

726
/*
 * Send a commit request for one vgroup if the offset is valid and differs
 * from the last committed one.
 *
 * @return TSDB_CODE_TMQ_INVALID_MSG when offsetVal->type <= 0,
 *         TSDB_CODE_TMQ_SAME_COMMITTED_VALUE when nothing needs committing,
 *         the doSendCommitMsg error on send failure, otherwise success.
 *
 * On success the vgroup's committedOffset is updated to offsetVal right away
 * (before the server response arrives).
 */
static int32_t innerCommit(tmq_t* tmq, char* pTopicName, STqOffsetVal* offsetVal, SMqClientVg* pVg, SMqCommitCbParamSet* pParamSet){
  if (offsetVal->type <= 0) {
    return TSDB_CODE_TMQ_INVALID_MSG;
  }
  if (tOffsetEqual(offsetVal, &pVg->offsetInfo.committedOffset)) {
    return TSDB_CODE_TMQ_SAME_COMMITTED_VALUE;
  }

  // format both offsets up front: committedOffset is overwritten below
  char offsetBuf[TSDB_OFFSET_LEN] = {0};
  tFormatOffset(offsetBuf, tListLen(offsetBuf), offsetVal);

  char commitBuf[TSDB_OFFSET_LEN] = {0};
  tFormatOffset(commitBuf, tListLen(commitBuf), &pVg->offsetInfo.committedOffset);

  int32_t code = doSendCommitMsg(tmq, pVg->vgId, &pVg->epSet, offsetVal, pTopicName, pParamSet);
  if (code != TSDB_CODE_SUCCESS) {
    // report the error returned by the send path rather than the thread-local terrno
    tqErrorC("consumer:0x%" PRIx64 " topic:%s on vgId:%d end commit msg failed, send offset:%s committed:%s, code:%s",
             tmq->consumerId, pTopicName, pVg->vgId, offsetBuf, commitBuf, tstrerror(code));
    return code;
  }

  tqDebugC("consumer:0x%" PRIx64 " topic:%s on vgId:%d send commit msg success, send offset:%s committed:%s",
          tmq->consumerId, pTopicName, pVg->vgId, offsetBuf, commitBuf);
  tOffsetCopy(&pVg->offsetInfo.committedOffset, offsetVal);
  return code;
}
754

755
static int32_t asyncCommitOffset(tmq_t* tmq, char* pTopicName, int32_t vgId, STqOffsetVal* offsetVal,
45✔
756
                                 tmq_commit_cb* pCommitFp, void* userParam) {
757
  tqInfoC("consumer:0x%" PRIx64 " do manual commit offset for %s, vgId:%d", tmq->consumerId, pTopicName, vgId);
45!
758
  SMqCommitCbParamSet* pParamSet = NULL;
45✔
759
  int32_t code = prepareCommitCbParamSet(tmq, pCommitFp, userParam, 0, &pParamSet);
45✔
760
  if (code != 0){
45!
761
    return code;
×
762
  }
763

764
  taosRLockLatch(&tmq->lock);
45✔
765
  SMqClientVg* pVg = NULL;
45✔
766
  code = getClientVg(tmq, pTopicName, vgId, &pVg);
45✔
767
  if (code == 0) {
45!
768
    code = innerCommit(tmq, pTopicName, offsetVal, pVg, pParamSet);
45✔
769
  }
770
  taosRUnLockLatch(&tmq->lock);
45✔
771

772
  if (code != 0){
45✔
773
    taosMemoryFree(pParamSet);
10!
774
  }
775
  return code;
45✔
776
}
777

778
/*
 * Commit the offset carried by a poll result, asynchronously.
 *
 * If the commit could not be started (bad arguments, unsupported result type,
 * or an error from asyncCommitOffset) the callback, when provided, is invoked
 * immediately; TSDB_CODE_TMQ_SAME_COMMITTED_VALUE is reported to it as
 * success. When the commit was started nothing is called here.
 */
static void asyncCommitFromResult(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* pCommitFp, void* userParam) {
  int32_t code = 0;

  if (pRes == NULL || tmq == NULL) {
    code = TSDB_CODE_INVALID_PARA;
  } else if (TD_RES_TMQ(pRes) || TD_RES_TMQ_META(pRes) ||
             TD_RES_TMQ_METADATA(pRes) || TD_RES_TMQ_BATCH_META(pRes)) {
    SMqRspObj*   pRspObj = (SMqRspObj*)pRes;
    STqOffsetVal offsetVal = pRspObj->rspOffset;
    code = asyncCommitOffset(tmq, pRspObj->topic, pRspObj->vgId, &offsetVal, pCommitFp, userParam);
  } else {
    code = TSDB_CODE_TMQ_INVALID_MSG;
  }

  if (code == TSDB_CODE_SUCCESS || pCommitFp == NULL) {
    return;
  }
  if (code == TSDB_CODE_TMQ_SAME_COMMITTED_VALUE) {
    code = TSDB_CODE_SUCCESS;
  }
  pCommitFp(tmq, code, userParam);
}
808

809
/*
 * Under the consumer's read latch, try to commit the end offset of every
 * vgroup of every subscribed topic. Per-vgroup failures other than
 * "same committed value" are logged and the walk continues; a missing topic
 * or vgroup entry aborts the walk. The returned code is the last one
 * assigned during the walk.
 */
static int32_t innerCommitAll(tmq_t* tmq, SMqCommitCbParamSet* pParamSet){
  int32_t ret = 0;
  bool    aborted = false;

  taosRLockLatch(&tmq->lock);
  int32_t topicCnt = taosArrayGetSize(tmq->clientTopics);
  tqDebugC("consumer:0x%" PRIx64 " start to commit offset for %d topics", tmq->consumerId, topicCnt);

  for (int32_t t = 0; t < topicCnt && !aborted; t++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, t);
    if (pTopic == NULL) {
      ret = TSDB_CODE_TMQ_INVALID_TOPIC;
      aborted = true;
      break;
    }

    int32_t vgCnt = taosArrayGetSize(pTopic->vgs);
    tqDebugC("consumer:0x%" PRIx64 " commit offset for topics:%s, numOfVgs:%d", tmq->consumerId, pTopic->topicName, vgCnt);

    for (int32_t v = 0; v < vgCnt; v++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, v);
      if (pVg == NULL) {
        ret = terrno;
        aborted = true;
        break;
      }

      ret = innerCommit(tmq, pTopic->topicName, &pVg->offsetInfo.endOffset, pVg, pParamSet);
      if (ret != 0 && ret != TSDB_CODE_TMQ_SAME_COMMITTED_VALUE){
        tqErrorC("consumer:0x%" PRIx64 " topic:%s vgId:%d, no commit, code:%s, current offset version:%" PRId64 ", ordinal:%d/%d",
                 tmq->consumerId, pTopic->topicName, pVg->vgId, tstrerror(ret), pVg->offsetInfo.endOffset.version, v + 1, vgCnt);
      }
    }
  }

  // the summary line is only logged when the walk was not aborted
  if (!aborted) {
    tqDebugC("consumer:0x%" PRIx64 " total commit:%d for %d topics", tmq->consumerId, pParamSet->waitingRspNum - DEFAULT_COMMIT_CNT,
             topicCnt);
  }

  taosRUnLockLatch(&tmq->lock);
  return ret;
}
843

844
/*
 * Start an asynchronous commit of all vgroup offsets.
 *
 * The response counter starts at DEFAULT_COMMIT_CNT so that responses arriving
 * while requests are still being sent cannot finish the round early; the
 * final commitRspCountDown call here releases that initial count.
 * If the param set cannot be allocated the callback (when given) is invoked
 * right away with the error.
 */
static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam) {
  SMqCommitCbParamSet* pSet = NULL;

  int32_t ret = prepareCommitCbParamSet(tmq, pCommitFp, userParam, DEFAULT_COMMIT_CNT, &pSet);
  if (ret != 0) {
    tqErrorC("consumer:0x%" PRIx64 " prepareCommitCbParamSet failed, code:%s", tmq->consumerId, tstrerror(ret));
    if (pCommitFp != NULL) {
      pCommitFp(tmq, ret, userParam);
    }
    return;
  }

  ret = innerCommitAll(tmq, pSet);
  if (!(ret == 0 || ret == TSDB_CODE_TMQ_SAME_COMMITTED_VALUE)) {
    tqErrorC("consumer:0x%" PRIx64 " innerCommitAll failed, code:%s", tmq->consumerId, tstrerror(ret));
  }

  ret = commitRspCountDown(pSet, tmq->consumerId, "init", -1);
  if (ret != 0) {
    tqErrorC("consumer:0x%" PRIx64 " commit rsp count down failed, code:%s", tmq->consumerId, tstrerror(ret));
  }
}
867

868
static void generateTimedTask(int64_t refId, int32_t type) {
14,592✔
869
  tmq_t*  tmq = NULL;
14,592✔
870
  int8_t* pTaskType = NULL;
14,592✔
871
  int32_t code = 0;
14,592✔
872

873
  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
14,592✔
874
  if (tmq == NULL) return;
14,592!
875

876
  code = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0, (void**)&pTaskType);
14,592✔
877
  if (code == TSDB_CODE_SUCCESS) {
14,592!
878
    *pTaskType = type;
14,592✔
879
    if (taosWriteQitem(tmq->delayedTask, pTaskType) == 0) {
14,592!
880
      if (tsem2_post(&tmq->rspSem) != 0){
14,592!
881
        tqErrorC("consumer:0x%" PRIx64 " failed to post sem, type:%d", tmq->consumerId, type);
×
882
      }
883
    }else{
884
      taosFreeQitem(pTaskType);
×
885
    }
886
  }
887

888
  code = taosReleaseRef(tmqMgmt.rsetId, refId);
14,592✔
889
  if (code != 0){
14,592!
890
    tqErrorC("failed to release ref:%"PRId64 ", type:%d, code:%d", refId, type, code);
×
891
  }
892
}
893

894
void tmqAssignAskEpTask(void* param, void* tmrId) {
4,513✔
895
  int64_t refId = (int64_t)param;
4,513✔
896
  generateTimedTask(refId, TMQ_DELAYED_TASK__ASK_EP);
4,513✔
897
}
4,513✔
898

899
void tmqReplayTask(void* param, void* tmrId) {
12✔
900
  int64_t refId = (int64_t)param;
12✔
901
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
12✔
902
  if (tmq == NULL) return;
12!
903

904
  if (tsem2_post(&tmq->rspSem) != 0){
12!
905
    tqErrorC("consumer:0x%" PRIx64 " failed to post sem, replay", tmq->consumerId);
×
906
  }
907
  int32_t code = taosReleaseRef(tmqMgmt.rsetId, refId);
12✔
908
  if (code != 0){
12!
909
    tqErrorC("failed to release ref:%"PRId64 ", code:%d", refId, code);
×
910
  }
911
}
912

913
void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
10,079✔
914
  int64_t refId = (int64_t)param;
10,079✔
915
  generateTimedTask(refId, TMQ_DELAYED_TASK__COMMIT);
10,079✔
916
}
10,079✔
917

918
// Response callback for the heartbeat request sent by tmqSendHbReq.
//   param  consumer ref id (cast to a pointer by the sender)
//   pMsg   response buffer; pData and pEpSet are freed here on every path that
//          gets past the NULL check
//   code   transport/server result code
// Marks topics the server reports as "no privilege" on the matching local topics
// and copies the server-provided debug flag into tqClientDebugFlag.
// Returns TSDB_CODE_INVALID_PARA when pMsg is NULL, otherwise the last error code seen.
int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
  if (pMsg == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  if (param == NULL || code != 0){
    goto END;
  }

  SMqHbRsp rsp = {0};
  code = tDeserializeSMqHbRsp(pMsg->pData, pMsg->len, &rsp);
  if (code != 0) {
    // A failed decode may already have allocated part of the response (e.g. the
    // privilege array); release it instead of leaking it.
    // NOTE(review): assumes tDestroySMqHbRsp tolerates a partially filled response - confirm
    tDestroySMqHbRsp(&rsp);
    goto END;
  }

  int64_t refId = (int64_t)param;
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq != NULL) {
    // clientTopics is modified below, so the write latch is required
    taosWLockLatch(&tmq->lock);
    for (int32_t i = 0; i < taosArrayGetSize(rsp.topicPrivileges); i++) {
      STopicPrivilege* privilege = taosArrayGet(rsp.topicPrivileges, i);
      if (privilege && privilege->noPrivilege == 1) {
        int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
        for (int32_t j = 0; j < topicNumCur; j++) {
          SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, j);
          if (pTopicCur && strcmp(pTopicCur->topicName, privilege->topic) == 0) {
            tqInfoC("consumer:0x%" PRIx64 ", has no privilege, topic:%s", tmq->consumerId, privilege->topic);
            pTopicCur->noPrivilege = 1;
          }
        }
      }
    }
    taosWUnLockLatch(&tmq->lock);
    code = taosReleaseRef(tmqMgmt.rsetId, refId);
    if (code != 0){
      tqErrorC("failed to release ref:%"PRId64 ", code:%d", refId, code);
    }
  }

  tqClientDebugFlag = rsp.debugFlag;

  tDestroySMqHbRsp(&rsp);

END:
  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);
  return code;
}
966

967
void tmqSendHbReq(void* param, void* tmrId) {
2,474✔
968
  int64_t refId = (int64_t)param;
2,474✔
969

970
  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
2,474✔
971
  if (tmq == NULL) {
2,474!
972
    return;
×
973
  }
974

975
  SMqHbReq req = {0};
2,474✔
976
  req.consumerId = tmq->consumerId;
2,474✔
977
  req.epoch = tmq->epoch;
2,474✔
978
  req.pollFlag = atomic_load_8(&tmq->pollFlag);
2,474✔
979
  tqDebugC("consumer:0x%" PRIx64 " send heartbeat, pollFlag:%d", tmq->consumerId, req.pollFlag);
2,474!
980
  req.topics = taosArrayInit(taosArrayGetSize(tmq->clientTopics), sizeof(TopicOffsetRows));
2,474✔
981
  if (req.topics == NULL) {
2,474!
982
    goto END;
×
983
  }
984
  taosRLockLatch(&tmq->lock);
2,474✔
985
  for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
4,763✔
986
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
2,289✔
987
    if (pTopic == NULL) {
2,289!
988
      continue;
×
989
    }
990
    int32_t          numOfVgroups = taosArrayGetSize(pTopic->vgs);
2,289✔
991
    TopicOffsetRows* data = taosArrayReserve(req.topics, 1);
2,289✔
992
    if (data == NULL) {
2,289!
993
      continue;
×
994
    }
995
    tstrncpy(data->topicName, pTopic->topicName, TSDB_TOPIC_FNAME_LEN);
2,289✔
996
    data->offsetRows = taosArrayInit(numOfVgroups, sizeof(OffsetRows));
2,289✔
997
    if (data->offsetRows == NULL) {
2,289!
998
      continue;
×
999
    }
1000
    for (int j = 0; j < numOfVgroups; j++) {
7,795✔
1001
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
5,506✔
1002
      if (pVg == NULL) {
5,506!
1003
        continue;
×
1004
      }
1005
      OffsetRows* offRows = taosArrayReserve(data->offsetRows, 1);
5,506✔
1006
      if (offRows == NULL) {
5,506!
1007
        continue;
×
1008
      }
1009
      offRows->vgId = pVg->vgId;
5,506✔
1010
      offRows->rows = pVg->numOfRows;
5,506✔
1011
      offRows->offset = pVg->offsetInfo.endOffset;
5,506✔
1012
      offRows->ever = pVg->offsetInfo.walVerEnd == -1 ? 0 : pVg->offsetInfo.walVerEnd;
5,506✔
1013
      char buf[TSDB_OFFSET_LEN] = {0};
5,506✔
1014
      tFormatOffset(buf, TSDB_OFFSET_LEN, &offRows->offset);
5,506✔
1015
      tqDebugC("consumer:0x%" PRIx64 ",report offset, group:%s vgId:%d, offset:%s/%" PRId64 ", rows:%" PRId64,
5,506!
1016
               tmq->consumerId, tmq->groupId, offRows->vgId, buf, offRows->ever, offRows->rows);
1017
    }
1018
  }
1019
  taosRUnLockLatch(&tmq->lock);
2,474✔
1020

1021
  int32_t tlen = tSerializeSMqHbReq(NULL, 0, &req);
2,474✔
1022
  if (tlen < 0) {
2,474!
1023
    tqErrorC("tSerializeSMqHbReq failed, size:%d", tlen);
×
1024
    goto END;
×
1025
  }
1026

1027
  void* pReq = taosMemoryCalloc(1, tlen);
2,474!
1028
  if (pReq == NULL) {
2,474!
1029
    tqErrorC("failed to malloc MqHbReq msg, code:%d", terrno);
×
1030
    goto END;
×
1031
  }
1032

1033
  if (tSerializeSMqHbReq(pReq, tlen, &req) < 0) {
2,474!
1034
    tqErrorC("tSerializeSMqHbReq %d failed", tlen);
×
1035
    taosMemoryFree(pReq);
×
1036
    goto END;
×
1037
  }
1038

1039
  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
2,474!
1040
  if (sendInfo == NULL) {
2,474!
1041
    taosMemoryFree(pReq);
×
1042
    goto END;
×
1043
  }
1044

1045
  sendInfo->msgInfo = (SDataBuf){.pData = pReq, .len = tlen, .handle = NULL};
2,474✔
1046

1047
  sendInfo->requestId = generateRequestId();
2,474✔
1048
  sendInfo->requestObjRefId = 0;
2,474✔
1049
  sendInfo->param = (void*)refId;
2,474✔
1050
  sendInfo->fp = tmqHbCb;
2,474✔
1051
  sendInfo->msgType = TDMT_MND_TMQ_HB;
2,474✔
1052

1053
  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
2,474✔
1054

1055
  int32_t code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo);
2,474✔
1056
  if (code != 0) {
2,474!
1057
    tqErrorC("tmqSendHbReq asyncSendMsgToServer failed");
×
1058
  }
1059
  (void)atomic_val_compare_exchange_8(&tmq->pollFlag, 1, 0);
2,474✔
1060

1061
END:
2,474✔
1062
  tDestroySMqHbReq(&req);
2,474✔
1063
  if (tmrId != NULL) {
2,474✔
1064
    bool ret = taosTmrReset(tmqSendHbReq, tmq->heartBeatIntervalMs, param, tmqMgmt.timer, &tmq->hbLiveTimer);
1,706✔
1065
    tqDebugC("consumer:0x%" PRIx64 " reset timer for tmq heartbeat:%d, pollFlag:%d", tmq->consumerId, ret, tmq->pollFlag);
1,706!
1066
  }
1067
  int32_t ret = taosReleaseRef(tmqMgmt.rsetId, refId);
2,474✔
1068
  if (ret != 0){
2,474!
1069
    tqErrorC("failed to release ref:%"PRId64 ", code:%d", refId, ret);
×
1070
  }
1071
}
1072

1073
// Commit callback used when the application did not register one: it only logs failures.
static void defaultCommitCbFn(tmq_t* pTmq, int32_t code, void* param) {
  if (code == 0) {
    return;
  }
  tqErrorC("consumer:0x%" PRIx64 ", failed to commit offset, code:%s", pTmq->consumerId, tstrerror(code));
}
581✔
1078

1079
// Release the payload held by a response wrapper according to its tmqRspType.
// The wrapper itself is not freed here; callers follow up with taosFreeQitem.
// Unknown response types are left untouched.
static void tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) {
  switch (rspWrapper->tmqRspType) {
    case TMQ_MSG_TYPE__EP_RSP: {
      tDeleteSMqAskEpRsp(&rspWrapper->epRsp);
      break;
    }
    // DELETE_POLL_RSP is a file-level macro that supplies pRsp itself; each use
    // stays inside its own braces, as in the original if-chain.
    case TMQ_MSG_TYPE__POLL_DATA_RSP: {
      DELETE_POLL_RSP(tDeleteMqDataRsp, &pRsp->dataRsp)
      break;
    }
    case TMQ_MSG_TYPE__POLL_DATA_META_RSP: {
      DELETE_POLL_RSP(tDeleteSTaosxRsp, &pRsp->dataRsp)
      break;
    }
    case TMQ_MSG_TYPE__POLL_META_RSP: {
      DELETE_POLL_RSP(tDeleteMqMetaRsp,&pRsp->metaRsp)
      break;
    }
    case TMQ_MSG_TYPE__POLL_BATCH_META_RSP: {
      DELETE_POLL_RSP(tDeleteMqBatchMetaRsp,&pRsp->batchMetaRsp)
      break;
    }
    default:
      break;
  }
}
51,794✔
1092

1093
static void freeClientVg(void* param) {
4,088✔
1094
  SMqClientVg* pVg = param;
4,088✔
1095
  tOffsetDestroy(&pVg->offsetInfo.endOffset);
4,088✔
1096
  tOffsetDestroy(&pVg->offsetInfo.beginOffset);
4,088✔
1097
  tOffsetDestroy(&pVg->offsetInfo.committedOffset);
4,088✔
1098
}
4,088✔
1099
static void freeClientTopic(void* param) {
3,286✔
1100
  SMqClientTopic* pTopic = param;
3,286✔
1101
  taosMemoryFreeClear(pTopic->schema.pSchema);
3,286!
1102
  taosArrayDestroyEx(pTopic->vgs, freeClientVg);
3,286✔
1103
}
3,286✔
1104

1105
static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopicEp, SHashObj* pVgOffsetHashMap,
3,287✔
1106
                                   tmq_t* tmq) {
1107
  pTopic->schema = pTopicEp->schema;
3,287✔
1108
  pTopicEp->schema.nCols = 0;
3,287✔
1109
  pTopicEp->schema.pSchema = NULL;
3,287✔
1110

1111
  char    vgKey[TSDB_TOPIC_FNAME_LEN + 22] = {0};
3,287✔
1112
  int32_t vgNumGet = taosArrayGetSize(pTopicEp->vgs);
3,287✔
1113

1114
  tstrncpy(pTopic->topicName, pTopicEp->topic, TSDB_TOPIC_FNAME_LEN);
3,287✔
1115
  tstrncpy(pTopic->db, pTopicEp->db, TSDB_DB_FNAME_LEN);
3,287✔
1116

1117
  tqInfoC("consumer:0x%" PRIx64 ", update topic:%s, new numOfVgs:%d", tmq->consumerId, pTopic->topicName, vgNumGet);
3,287!
1118
  pTopic->vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg));
3,287✔
1119
  if (pTopic->vgs == NULL) {
3,287!
1120
    tqErrorC("consumer:0x%" PRIx64 ", failed to init vgs for topic:%s", tmq->consumerId, pTopic->topicName);
×
1121
    return;
×
1122
  }
1123
  for (int32_t j = 0; j < vgNumGet; j++) {
7,376✔
1124
    SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);
4,089✔
1125
    if (pVgEp == NULL) {
4,089!
1126
      continue;
×
1127
    }
1128
    (void)snprintf(vgKey, sizeof(vgKey), "%s:%d", pTopic->topicName, pVgEp->vgId);
4,089✔
1129
    SVgroupSaveInfo* pInfo = taosHashGet(pVgOffsetHashMap, vgKey, strlen(vgKey));
4,089✔
1130

1131
    STqOffsetVal offsetNew = {0};
4,089✔
1132
    offsetNew.type = tmq->resetOffsetCfg;
4,089✔
1133

1134
    tqInfoC("consumer:0x%" PRIx64 ", update topic:%s, new numOfVgs:%d, num:%d, port:%d", tmq->consumerId,
4,089!
1135
            pTopic->topicName, vgNumGet, pVgEp->epSet.numOfEps, pVgEp->epSet.eps[pVgEp->epSet.inUse].port);
1136

1137
    SMqClientVg clientVg = {
12,267✔
1138
        .pollCnt = 0,
1139
        .vgId = pVgEp->vgId,
4,089✔
1140
        .epSet = pVgEp->epSet,
1141
        .vgStatus = pInfo ? pInfo->vgStatus : TMQ_VG_STATUS__IDLE,
4,089✔
1142
        .vgSkipCnt = 0,
1143
        .emptyBlockReceiveTs = 0,
1144
        .blockReceiveTs = 0,
1145
        .blockSleepForReplay = 0,
1146
        .numOfRows = pInfo ? pInfo->numOfRows : 0,
4,089✔
1147
    };
1148

1149
    clientVg.offsetInfo.walVerBegin = -1;
4,089✔
1150
    clientVg.offsetInfo.walVerEnd = -1;
4,089✔
1151
    clientVg.seekUpdated = false;
4,089✔
1152
    if (pInfo) {
4,089✔
1153
      tOffsetCopy(&clientVg.offsetInfo.endOffset, &pInfo->currentOffset);
2,859✔
1154
      tOffsetCopy(&clientVg.offsetInfo.committedOffset, &pInfo->commitOffset);
2,859✔
1155
      tOffsetCopy(&clientVg.offsetInfo.beginOffset, &pInfo->seekOffset);
2,859✔
1156
    } else {
1157
      clientVg.offsetInfo.endOffset = offsetNew;
1,230✔
1158
      clientVg.offsetInfo.committedOffset = offsetNew;
1,230✔
1159
      clientVg.offsetInfo.beginOffset = offsetNew;
1,230✔
1160
    }
1161
    if (taosArrayPush(pTopic->vgs, &clientVg) == NULL) {
8,178!
1162
      tqErrorC("consumer:0x%" PRIx64 ", failed to push vg:%d into topic:%s", tmq->consumerId, pVgEp->vgId,
×
1163
               pTopic->topicName);
1164
      freeClientVg(&clientVg);
×
1165
    }
1166
  }
1167
}
1168

1169
// Build the new client topic list from an ask-ep response.
// Step 1 snapshots offsets, row count and status of every current vgroup into a
// temporary hash keyed by "<topic>:<vgId>". Step 2 creates one SMqClientTopic per
// response topic (restoring state through that hash) and appends it to newTopics.
// If the hash cannot be created, newTopics is left untouched.
static void buildNewTopicList(tmq_t* tmq, SArray* newTopics, const SMqAskEpRsp* pRsp){
  SHashObj* pSavedVgs = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
  if (pSavedVgs == NULL) {
    tqErrorC("consumer:0x%" PRIx64 " taos hash init null, code:%d", tmq->consumerId, terrno);
    return;
  }

  // step 1: remember the state of the vgroups we currently know about
  int32_t numCurTopics = taosArrayGetSize(tmq->clientTopics);
  for (int32_t t = 0; t < numCurTopics; t++) {
    SMqClientTopic* pOldTopic = taosArrayGet(tmq->clientTopics, t);
    if (pOldTopic == NULL || pOldTopic->vgs == NULL) {
      continue;
    }

    int32_t numCurVgs = taosArrayGetSize(pOldTopic->vgs);
    tqInfoC("consumer:0x%" PRIx64 ", current vg num: %d", tmq->consumerId, numCurVgs);

    for (int32_t v = 0; v < numCurVgs; v++) {
      SMqClientVg* pOldVg = taosArrayGet(pOldTopic->vgs, v);
      if (pOldVg == NULL) {
        continue;
      }

      char key[TSDB_TOPIC_FNAME_LEN + 22] = {0};
      (void)snprintf(key, sizeof(key), "%s:%d", pOldTopic->topicName, pOldVg->vgId);

      char offsetStr[TSDB_OFFSET_LEN] = {0};
      tFormatOffset(offsetStr, TSDB_OFFSET_LEN, &pOldVg->offsetInfo.endOffset);
      tqInfoC("consumer:0x%" PRIx64 ", vgId:%d vgKey:%s, offset:%s", tmq->consumerId, pOldVg->vgId, key, offsetStr);

      // plain struct copies of the offsets; the old topic list still owns them
      SVgroupSaveInfo saved = {.currentOffset = pOldVg->offsetInfo.endOffset,
                               .seekOffset = pOldVg->offsetInfo.beginOffset,
                               .commitOffset = pOldVg->offsetInfo.committedOffset,
                               .numOfRows = pOldVg->numOfRows,
                               .vgStatus = pOldVg->vgStatus};
      if (taosHashPut(pSavedVgs, key, strlen(key), &saved, sizeof(SVgroupSaveInfo)) != 0) {
        tqErrorC("consumer:0x%" PRIx64 ", failed to put vg:%d into hashmap", tmq->consumerId, pOldVg->vgId);
      }
    }
  }

  // step 2: create the new topics from the response
  int32_t numRspTopics = taosArrayGetSize(pRsp->topics);
  for (int32_t t = 0; t < numRspTopics; t++) {
    SMqClientTopic fresh = {0};
    SMqSubTopicEp* pRspTopic = taosArrayGet(pRsp->topics, t);
    if (pRspTopic == NULL) {
      continue;
    }

    initClientTopicFromRsp(&fresh, pRspTopic, pSavedVgs, tmq);
    if (taosArrayPush(newTopics, &fresh) == NULL) {
      tqErrorC("consumer:0x%" PRIx64 ", failed to push topic:%s into new topics", tmq->consumerId, fresh.topicName);
      freeClientTopic(&fresh);
    }
  }

  taosHashCleanup(pSavedVgs);
}
1222

1223
// Replace the consumer's topic list with the one described by an ask-ep response.
//   tmq    consumer handle
//   epoch  epoch carried by the response
//   pRsp   decoded response
// A response with an older epoch, or with the current epoch and no topics, is
// ignored. Otherwise the list is swapped under the write latch, the status is set
// to READY and the new epoch is stored.
static void doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
  int32_t numIncoming = taosArrayGetSize(pRsp->topics);

  // vnode transform (epoch == tmq->epoch && topicNumGet != 0)
  // ask ep rsp (epoch == tmq->epoch && topicNumGet == 0)
  if (epoch < tmq->epoch || (epoch == tmq->epoch && numIncoming == 0)) {
    tqDebugC("consumer:0x%" PRIx64 " no update ep epoch from %d to epoch %d, incoming topics:%d", tmq->consumerId,
             tmq->epoch, epoch, numIncoming);
    return;
  }

  SArray* pFresh = taosArrayInit(numIncoming, sizeof(SMqClientTopic));
  if (pFresh == NULL) {
    tqErrorC("consumer:0x%" PRIx64 " taos array init null, code:%d", tmq->consumerId, terrno);
    return;
  }
  tqInfoC("consumer:0x%" PRIx64 " update ep epoch from %d to epoch %d, incoming topics:%d, existed topics:%d",
          tmq->consumerId, tmq->epoch, epoch, numIncoming, (int)taosArrayGetSize(tmq->clientTopics));

  taosWLockLatch(&tmq->lock);
  if (numIncoming > 0) {
    // reads the current list to carry offsets over, so it must run before the swap
    buildNewTopicList(tmq, pFresh, pRsp);
  }
  SArray* pStale = tmq->clientTopics;
  tmq->clientTopics = pFresh;
  if (pStale != NULL) {
    // destroy current buffered existed topics info
    taosArrayDestroyEx(pStale, freeClientTopic);
  }
  taosWUnLockLatch(&tmq->lock);

  atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);
  atomic_store_32(&tmq->epoch, epoch);

  tqInfoC("consumer:0x%" PRIx64 " update topic info completed", tmq->consumerId);
}
1257

1258
// Response callback for the ask-ep request sent by askEp.
//   param  SMqAskEpCbParam* (consumer ref id, sync flag, optional SAskEpInfo*)
//   pMsg   response buffer; pData and pEpSet are freed here
//   code   transport/server result code
// Sync mode decodes the response and applies it right away via doUpdateLocalEp,
// then reports the result through SAskEpInfo and posts its semaphore.
// Async mode wraps the decoded response and queues it on tmq->mqueue.
// A response that cannot be decoded is reported as TSDB_CODE_INVALID_MSG instead
// of being dropped silently with a success code.
static int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
  if (pParam == NULL) {
    goto FAIL;
  }

  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, pParam->refId);
  if (tmq == NULL) {
    code = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    goto FAIL;
  }

  if (code != TSDB_CODE_SUCCESS) {
    // "consumer not ready" is not logged as an error
    if (code != TSDB_CODE_MND_CONSUMER_NOT_READY){
      tqErrorC("consumer:0x%" PRIx64 ", get topic endpoint error, code:%s", tmq->consumerId, tstrerror(code));
    }
    goto END;
  }

  if (pMsg == NULL) {
    goto END;
  }
  SMqRspHead* head = pMsg->pData;
  int32_t     epoch = atomic_load_32(&tmq->epoch);
  tqDebugC("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d", tmq->consumerId, head->epoch, epoch);
  if (pParam->sync) {
    SMqAskEpRsp rsp = {0};
    if (tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp) != NULL) {
      doUpdateLocalEp(tmq, head->epoch, &rsp);
    } else {
      // tell the waiting caller that nothing was applied
      code = TSDB_CODE_INVALID_MSG;
      tqErrorC("consumer:0x%" PRIx64 " failed to decode ask ep rsp", tmq->consumerId);
    }
    tDeleteSMqAskEpRsp(&rsp);
  } else {
    SMqRspWrapper* pWrapper = NULL;
    code = taosAllocateQitem(sizeof(SMqRspWrapper), DEF_QITEM, 0, (void**)&pWrapper);
    if (code) {
      goto END;
    }

    pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP;
    pWrapper->epoch = head->epoch;
    (void)memcpy(&pWrapper->epRsp, pMsg->pData, sizeof(SMqRspHead));
    if (tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pWrapper->epRsp) == NULL) {
      code = TSDB_CODE_INVALID_MSG;
      tqErrorC("consumer:0x%" PRIx64 " failed to decode ask ep rsp", tmq->consumerId);
      tmqFreeRspWrapper((SMqRspWrapper*)pWrapper);
      taosFreeQitem(pWrapper);
    } else {
      code = taosWriteQitem(tmq->mqueue, pWrapper);
      if (code != 0) {
        tmqFreeRspWrapper((SMqRspWrapper*)pWrapper);
        taosFreeQitem(pWrapper);
        tqErrorC("consumer:0x%" PRIx64 " put ep res into mqueue failed, code:%d", tmq->consumerId, code);
      }
    }
  }

  END:
  {
    int32_t ret = taosReleaseRef(tmqMgmt.rsetId, pParam->refId);
    if (ret != 0){
      tqErrorC("failed to release ref:%"PRId64 ", code:%d", pParam->refId, ret);
    }
  }

  FAIL:
  // wake the sync caller on every path, including failures
  if (pParam && pParam->sync) {
    SAskEpInfo* pInfo = pParam->pParam;
    if (pInfo) {
      pInfo->code = code;
      if (tsem2_post(&pInfo->sem) != 0){
        tqErrorC("failed to post rsp sem askep cb");
      }
    }
  }

  if (pMsg) {
    taosMemoryFree(pMsg->pEpSet);
    taosMemoryFree(pMsg->pData);
  }

  return code;
}
1338

1339
// Send an ask-ep request for this consumer to the mnode.
//   pTmq         consumer handle
//   param        opaque pointer stored in the callback parameter (pParam->pParam)
//   sync         stored in the callback parameter; selects askEpCb's sync path
//   updateEpSet  when true the request carries epoch -1 instead of the current epoch
// Returns the result of asyncSendMsgToServer, TSDB_CODE_INVALID_PARA when the
// request cannot be serialized, or terrno when an allocation fails.
static int32_t askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpSet) {
  SMqAskEpReq req = {0};
  req.consumerId = pTmq->consumerId;
  if (updateEpSet) {
    req.epoch = -1;
  } else {
    req.epoch = pTmq->epoch;
  }
  tstrncpy(req.cgroup, pTmq->groupId, TSDB_CGROUP_LEN);

  // first pass only computes the encoded size
  int32_t msgLen = tSerializeSMqAskEpReq(NULL, 0, &req);
  if (msgLen < 0) {
    tqErrorC("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq failed", pTmq->consumerId);
    return TSDB_CODE_INVALID_PARA;
  }

  void* pBuf = taosMemoryCalloc(1, msgLen);
  if (pBuf == NULL) {
    tqErrorC("consumer:0x%" PRIx64 ", failed to malloc askEpReq msg, size:%d", pTmq->consumerId, msgLen);
    return terrno;
  }

  if (tSerializeSMqAskEpReq(pBuf, msgLen, &req) < 0) {
    tqErrorC("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq %d failed", pTmq->consumerId, msgLen);
    taosMemoryFree(pBuf);
    return TSDB_CODE_INVALID_PARA;
  }

  SMqAskEpCbParam* pCbParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam));
  if (pCbParam == NULL) {
    tqErrorC("consumer:0x%" PRIx64 ", failed to malloc subscribe param", pTmq->consumerId);
    taosMemoryFree(pBuf);
    return terrno;
  }
  pCbParam->refId = pTmq->refId;
  pCbParam->sync = sync;
  pCbParam->pParam = param;

  SMsgSendInfo* pSend = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (pSend == NULL) {
    taosMemoryFree(pBuf);
    taosMemoryFree(pCbParam);
    return terrno;
  }

  pSend->msgInfo = (SDataBuf){.pData = pBuf, .len = msgLen, .handle = NULL};
  pSend->requestId = generateRequestId();
  pSend->requestObjRefId = 0;
  pSend->param = pCbParam;
  pSend->paramFreeFp = taosAutoMemoryFree;
  pSend->fp = askEpCb;
  pSend->msgType = TDMT_MND_TMQ_ASK_EP;

  SEpSet mgmtEpSet = getEpSet_s(&pTmq->pTscObj->pAppInfo->mgmtEp);
  tqDebugC("consumer:0x%" PRIx64 " ask ep from mnode,QID:0x%" PRIx64, pTmq->consumerId, pSend->requestId);
  return asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &mgmtEpSet, NULL, pSend);
}
1396

1397
// Drain the consumer's delayed-task queue and run every queued task.
//   ASK_EP  sends an ask-ep request and re-arms the ask-ep timer
//   COMMIT  commits all offsets asynchronously and re-arms the auto-commit timer
// Every item taken from the queue is freed, whatever the outcome of its task.
void tmqHandleAllDelayedTask(tmq_t* pTmq) {
  STaosQall* qall = NULL;
  int32_t    code = 0;

  code = taosAllocateQall(&qall);
  if (code) {
    tqErrorC("consumer:0x%" PRIx64 ", failed to allocate qall, code:%s", pTmq->consumerId, tstrerror(code));
    return;
  }

  int32_t numOfItems = taosReadAllQitems(pTmq->delayedTask, qall);
  if (numOfItems == 0) {
    taosFreeQall(qall);
    return;
  }

  tqDebugC("consumer:0x%" PRIx64 " handle delayed %d tasks before poll data", pTmq->consumerId, numOfItems);
  int8_t* pTaskType = NULL;
  while (taosGetQitem(qall, (void**)&pTaskType) != 0) {
    if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) {
      tqDebugC("consumer:0x%" PRIx64 " retrieve ask ep timer", pTmq->consumerId);
      code = askEp(pTmq, NULL, false, false);
      if (code != 0) {
        // Do not bail out of the iteration here: the timer must still be re-armed,
        // otherwise one failed request would stop endpoint polling for good, and
        // the queue item must still be freed below.
        tqErrorC("consumer:0x%" PRIx64 " failed to ask ep, code:%s", pTmq->consumerId, tstrerror(code));
      }
      tqDebugC("consumer:0x%" PRIx64 " retrieve ep from mnode in 1s", pTmq->consumerId);
      bool ret = taosTmrReset(tmqAssignAskEpTask, DEFAULT_ASKEP_INTERVAL, (void*)(pTmq->refId), tmqMgmt.timer,
                              &pTmq->epTimer);
      tqDebugC("reset timer for tmq ask ep:%d", ret);
    } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
      // fall back to the logging-only callback when the application set none
      tmq_commit_cb* pCallbackFn = (pTmq->commitCb != NULL) ? pTmq->commitCb : defaultCommitCbFn;
      asyncCommitAllOffsets(pTmq, pCallbackFn, pTmq->commitCbUserParam);
      tqDebugC("consumer:0x%" PRIx64 " next commit to vnode(s) in %.2fs", pTmq->consumerId,
               pTmq->autoCommitInterval / 1000.0);
      bool ret = taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, (void*)(pTmq->refId), tmqMgmt.timer,
                              &pTmq->commitTimer);
      tqDebugC("reset timer for commit:%d", ret);
    } else {
      tqErrorC("consumer:0x%" PRIx64 " invalid task type:%d", pTmq->consumerId, *pTaskType);
    }

    taosFreeQitem(pTaskType);
  }

  taosFreeQall(qall);
}
1444

1445
// Discard every response the consumer has not processed yet: first what is already
// in tmq->qall, then whatever is still waiting in tmq->mqueue.
void tmqClearUnhandleMsg(tmq_t* tmq) {
  SMqRspWrapper* pPending = NULL;

  // pass 0 drains qall as it is; pass 1 moves mqueue into qall and drains again
  for (int32_t pass = 0; pass < 2; pass++) {
    if (pass == 1) {
      pPending = NULL;
      if (taosReadAllQitems(tmq->mqueue, tmq->qall) == 0){
        return;
      }
    }
    while (taosGetQitem(tmq->qall, (void**)&pPending) != 0) {
      tmqFreeRspWrapper(pPending);
      taosFreeQitem(pPending);
    }
  }
}
1461

1462
// Response callback for a subscribe request.
//   param  SMqSubscribeCbParam* of the waiting caller; may be NULL
//   pMsg   response buffer; both pEpSet and pData are released here, matching the
//          other response callbacks in this file
//   code   transport/server result code
// Stores the result in pParam->rspErr and posts pParam->rspSem.
// Returns code when param is NULL, otherwise 0.
int32_t tmqSubscribeCb(void* param, SDataBuf* pMsg, int32_t code) {
  if (pMsg) {
    taosMemoryFreeClear(pMsg->pEpSet);
    // the payload is not used by this callback; free it so it does not leak
    taosMemoryFreeClear(pMsg->pData);
  }

  if (param == NULL) {
    return code;
  }

  SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param;
  pParam->rspErr = code;

  if (tsem2_post(&pParam->rspSem) != 0){
    tqErrorC("failed to post sem, subscribe cb");
  }
  return 0;
}
1479

1480
// Report the topics the consumer is currently subscribed to.
//   tmq     consumer handle
//   topics  in/out list pointer; when *topics is NULL a new list is created,
//           otherwise topic names are appended to the existing list
// Names are appended without the prefix up to the first '.' of the stored name.
// Topics with a malformed name or a failed append are logged and skipped.
// Returns TSDB_CODE_INVALID_PARA when tmq or topics is NULL, terrno when the list
// cannot be created, otherwise 0.
int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
  // topics is dereferenced below, so it must be validated as well
  if (tmq == NULL || topics == NULL) return TSDB_CODE_INVALID_PARA;
  if (*topics == NULL) {
    *topics = tmq_list_new();
    if (*topics == NULL) {
      return terrno;
    }
  }
  taosRLockLatch(&tmq->lock);
  for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* topic = taosArrayGet(tmq->clientTopics, i);
    if (topic == NULL) {
      tqErrorC("topic is null");
      continue;
    }
    char* tmp = strchr(topic->topicName, '.');
    if (tmp == NULL) {
      tqErrorC("topic name is invalid:%s", topic->topicName);
      continue;
    }
    if (tmq_list_append(*topics, tmp + 1) != 0) {
      tqErrorC("failed to append topic:%s", tmp + 1);
      continue;
    }
  }
  taosRUnLockLatch(&tmq->lock);
  return 0;
}
1508

1509
void tmqFreeImpl(void* handle) {
410✔
1510
  tmq_t*  tmq = (tmq_t*)handle;
410✔
1511
  int64_t id = tmq->consumerId;
410✔
1512

1513
  if (tmq->mqueue) {
410!
1514
    tmqClearUnhandleMsg(tmq);
410✔
1515
    taosCloseQueue(tmq->mqueue);
410✔
1516
  }
1517

1518
  if (tmq->delayedTask) {
410!
1519
    taosCloseQueue(tmq->delayedTask);
410✔
1520
  }
1521

1522
  taosFreeQall(tmq->qall);
410✔
1523
  if(tsem2_destroy(&tmq->rspSem) != 0) {
410!
1524
    tqErrorC("failed to destroy sem in free tmq");
×
1525
  }
1526

1527
  taosArrayDestroyEx(tmq->clientTopics, freeClientTopic);
410✔
1528
  taos_close_internal(tmq->pTscObj);
410✔
1529

1530
  if (tmq->commitTimer) {
410✔
1531
    if (!taosTmrStopA(&tmq->commitTimer)) {
185✔
1532
      tqErrorC("failed to stop commit timer");
120!
1533
    }
1534
  }
1535
  if (tmq->epTimer) {
410✔
1536
    if (!taosTmrStopA(&tmq->epTimer)) {
405✔
1537
      tqErrorC("failed to stop ep timer");
375!
1538
    }
1539
  }
1540
  if (tmq->hbLiveTimer) {
410✔
1541
    if (!taosTmrStopA(&tmq->hbLiveTimer)) {
409!
1542
      tqErrorC("failed to stop hb timer");
×
1543
    }
1544
  }
1545
  taosMemoryFree(tmq);
410!
1546

1547
  tqInfoC("consumer:0x%" PRIx64 " closed", id);
410!
1548
}
410✔
1549

1550
static void tmqMgmtInit(void) {
296✔
1551
  tmqInitRes = 0;
296✔
1552
  tmqMgmt.timer = taosTmrInit(1000, 100, 360000, "TMQ");
296✔
1553

1554
  if (tmqMgmt.timer == NULL) {
296!
1555
    tmqInitRes = terrno;
×
1556
  }
1557

1558
  tmqMgmt.rsetId = taosOpenRef(10000, tmqFreeImpl);
296✔
1559
  if (tmqMgmt.rsetId < 0) {
296!
1560
    tmqInitRes = terrno;
×
1561
  }
1562
}
296✔
1563

1564
void tmqMgmtClose(void) {
2,014✔
1565
  if (tmqMgmt.timer) {
2,014✔
1566
    taosTmrCleanUp(tmqMgmt.timer);
296✔
1567
    tmqMgmt.timer = NULL;
296✔
1568
  }
1569

1570
  if (tmqMgmt.rsetId >= 0) {
2,014!
1571
    taosCloseRef(tmqMgmt.rsetId);
2,014✔
1572
    tmqMgmt.rsetId = -1;
2,014✔
1573
  }
1574
}
2,014✔
1575

1576
// Create a consumer from `conf`.
//
// Steps: run the one-time TMQ initialisation, allocate the consumer, create its
// queues, copy the configuration, initialise the response semaphore, open the
// connection, register the consumer in the TMQ reference set and start the
// heartbeat timer.
//
// Returns the new consumer, or NULL on any failure. On failure a short reason
// is reported through SET_ERROR_MSG_TMQ (macro defined outside this block;
// presumably it writes into errstr/errstrLen).
tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
  int32_t code = 0;

  if (conf == NULL) {
    SET_ERROR_MSG_TMQ("configure is null")
    return NULL;
  }
  code = taosThreadOnce(&tmqInit, tmqMgmtInit);
  if (code != 0) {
    SET_ERROR_MSG_TMQ("tmq init error")
    return NULL;
  }
  if (tmqInitRes != 0) {
    SET_ERROR_MSG_TMQ("tmq timer init error")
    return NULL;
  }

  tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t));
  if (pTmq == NULL) {
    tqErrorC("failed to create consumer, groupId:%s", conf->groupId);
    SET_ERROR_MSG_TMQ("malloc tmq failed")
    return NULL;
  }

  // assign the consumer id first so that every error log below carries a real id
  pTmq->consumerId = tGenIdPI64();

  const char* user = conf->user == NULL ? TSDB_DEFAULT_USER : conf->user;
  const char* pass = conf->pass == NULL ? TSDB_DEFAULT_PASS : conf->pass;

  pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
  if (pTmq->clientTopics == NULL) {
    tqErrorC("failed to create consumer, groupId:%s", conf->groupId);
    SET_ERROR_MSG_TMQ("malloc client topics failed")
    goto _failed;
  }

  // pTmq->groupId is not filled in yet at this point, so log conf->groupId
  code = taosOpenQueue(&pTmq->mqueue);
  if (code) {
    tqErrorC("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, tstrerror(code),
             conf->groupId);
    SET_ERROR_MSG_TMQ("open queue failed")
    goto _failed;
  }

  code = taosOpenQueue(&pTmq->delayedTask);
  if (code) {
    tqErrorC("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, tstrerror(code),
             conf->groupId);
    SET_ERROR_MSG_TMQ("open delayed task queue failed")
    goto _failed;
  }

  code = taosAllocateQall(&pTmq->qall);
  if (code) {
    tqErrorC("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, tstrerror(code),
             conf->groupId);
    SET_ERROR_MSG_TMQ("allocate qall failed")
    goto _failed;
  }

  if (conf->groupId[0] == 0) {
    // give the log a real error code instead of printing the text for code 0
    code = TSDB_CODE_INVALID_PARA;
    tqErrorC("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, tstrerror(code),
             conf->groupId);
    SET_ERROR_MSG_TMQ("malloc tmq element failed or group is empty")
    goto _failed;
  }

  // init status
  pTmq->status = TMQ_CONSUMER_STATUS__INIT;
  pTmq->pollCnt = 0;
  pTmq->epoch = 0;
  pTmq->pollFlag = 0;

  // set conf
  tstrncpy(pTmq->clientId, conf->clientId, TSDB_CLIENT_ID_LEN);
  tstrncpy(pTmq->groupId, conf->groupId, TSDB_CGROUP_LEN);
  pTmq->withTbName = conf->withTbName;
  pTmq->useSnapshot = conf->snapEnable;
  pTmq->autoCommit = conf->autoCommit;
  pTmq->autoCommitInterval = conf->autoCommitInterval;
  pTmq->sessionTimeoutMs = conf->sessionTimeoutMs;
  pTmq->heartBeatIntervalMs = conf->heartBeatIntervalMs;
  pTmq->maxPollIntervalMs = conf->maxPollIntervalMs;
  pTmq->commitCb = conf->commitCb;
  pTmq->commitCbUserParam = conf->commitCbUserParam;
  pTmq->resetOffsetCfg = conf->resetOffset;
  pTmq->replayEnable = conf->replayEnable;
  pTmq->sourceExcluded = conf->sourceExcluded;
  pTmq->enableBatchMeta = conf->enableBatchMeta;
  tstrncpy(pTmq->user, user, TSDB_USER_LEN);
  if (taosGetFqdn(pTmq->fqdn) != 0) {
    tstrncpy(pTmq->fqdn, "localhost", TSDB_FQDN_LEN);
  }
  if (conf->replayEnable) {
    // replay mode overrides the configured auto-commit setting
    pTmq->autoCommit = false;
  }
  taosInitRWLatch(&pTmq->lock);

  // init semaphore
  if (tsem2_init(&pTmq->rspSem, 0, 0) != 0) {
    tqErrorC("consumer:0x%" PRIx64 " setup failed since %s, consumer group %s", pTmq->consumerId,
             tstrerror(TAOS_SYSTEM_ERROR(errno)), pTmq->groupId);
    SET_ERROR_MSG_TMQ("init t_sem failed")
    goto _failed;
  }

  // init connection
  code = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ, &pTmq->pTscObj);
  if (code) {
    terrno = code;
    tqErrorC("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, terrstr(), pTmq->groupId);
    SET_ERROR_MSG_TMQ("init tscObj failed")
    goto _failed;
  }

  pTmq->refId = taosAddRef(tmqMgmt.rsetId, pTmq);
  if (pTmq->refId < 0) {
    // not registered: the object is still exclusively ours, free it directly
    SET_ERROR_MSG_TMQ("add tscObj ref failed")
    goto _failed;
  }

  pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, pTmq->heartBeatIntervalMs, (void*)pTmq->refId, tmqMgmt.timer);
  if (pTmq->hbLiveTimer == NULL) {
    SET_ERROR_MSG_TMQ("start heartbeat timer failed")
    // The consumer is already registered in the reference set, whose free
    // callback is tmqFreeImpl. Freeing it directly would leave a dangling
    // pointer in the set (and a second free later), so drop the reference
    // instead and let the set destroy the object.
    // NOTE(review): relies on taosRemoveRef invoking the set's free callback
    // once the count reaches zero -- confirm against tref.h.
    int64_t refId = pTmq->refId;
    if (taosRemoveRef(tmqMgmt.rsetId, refId) != 0) {
      tqErrorC("failed to remove ref:%" PRId64 " after heartbeat timer start failure", refId);
    }
    return NULL;
  }

  char         buf[TSDB_OFFSET_LEN] = {0};
  STqOffsetVal offset = {.type = pTmq->resetOffsetCfg};
  tFormatOffset(buf, tListLen(buf), &offset);
  tqInfoC("consumer:0x%" PRIx64 " is setup, refId:%" PRId64
          ", groupId:%s, snapshot:%d, autoCommit:%d, commitInterval:%dms, offset:%s",
          pTmq->consumerId, pTmq->refId, pTmq->groupId, pTmq->useSnapshot, pTmq->autoCommit, pTmq->autoCommitInterval,
          buf);

  return pTmq;

_failed:
  tmqFreeImpl(pTmq);
  return NULL;
}
1716

1717
// Issue an askEp request and block until its result is available.
//
// Allocates a temporary SAskEpInfo carrying a semaphore, passes it to askEp and
// waits on that semaphore when the request was sent successfully.
//
// Returns terrno if the info block cannot be allocated,
// TSDB_CODE_TSC_INTERNAL_ERROR if the semaphore cannot be initialised, the
// askEp error if sending failed, otherwise the code stored in the info block.
static int32_t syncAskEp(tmq_t* pTmq) {
  SAskEpInfo* epInfo = taosMemoryMalloc(sizeof(SAskEpInfo));
  if (NULL == epInfo) {
    return terrno;
  }

  if (tsem2_init(&epInfo->sem, 0, 0) != 0) {
    taosMemoryFree(epInfo);
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  int32_t result = askEp(pTmq, epInfo, true, false);
  if (0 == result) {
    // a failed wait is only logged; the stored code is still read afterwards
    if (tsem2_wait(&epInfo->sem) != 0) {
      tqErrorC("consumer:0x%" PRIx64 ", failed to wait for sem", pTmq->consumerId);
    }
    result = epInfo->code;
  }

  if (tsem2_destroy(&epInfo->sem) != 0) {
    tqErrorC("failed to destroy sem sync ask ep");
  }
  taosMemoryFree(epInfo);

  return result;
}
1739

1740
// Subscribe the consumer to the topics in `topic_list`.
//
// Builds an SCMSubscribeReq from the consumer settings and the fully qualified
// topic names, sends it (TDMT_MND_TMQ_SUBSCRIBE) and waits for the reply. It then
// polls syncAskEp until it succeeds, starting the ask-ep timer and -- when
// auto commit is enabled -- the commit timer if they are not running yet.
//
// Returns TSDB_CODE_INVALID_PARA when `tmq` or `topic_list` is NULL, 0 on
// success (also when syncAskEp reports TSDB_CODE_MND_CONSUMER_NOT_EXIST),
// otherwise the error of the step that failed.
int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
  if (tmq == NULL || topic_list == NULL) return TSDB_CODE_INVALID_PARA;
  const SArray*   container = &topic_list->container;
  int32_t         sz = taosArrayGetSize(container);
  void*           buf = NULL;
  SMsgSendInfo*   sendInfo = NULL;
  SCMSubscribeReq req = {0};
  int32_t         code = 0;

  tqInfoC("consumer:0x%" PRIx64 " cgroup:%s, subscribe %d topics", tmq->consumerId, tmq->groupId, sz);

  req.consumerId = tmq->consumerId;
  tstrncpy(req.clientId, tmq->clientId, TSDB_CLIENT_ID_LEN);
  tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
  tstrncpy(req.user, tmq->user, TSDB_USER_LEN);
  tstrncpy(req.fqdn, tmq->fqdn, TSDB_FQDN_LEN);

  req.topicNames = taosArrayInit(sz, sizeof(void*));
  if (req.topicNames == NULL) {
    code = terrno;
    goto END;
  }

  req.withTbName = tmq->withTbName;
  req.autoCommit = tmq->autoCommit;
  req.autoCommitInterval = tmq->autoCommitInterval;
  req.sessionTimeoutMs = tmq->sessionTimeoutMs;
  req.maxPollIntervalMs = tmq->maxPollIntervalMs;
  req.resetOffsetCfg = tmq->resetOffsetCfg;
  req.enableReplay = tmq->replayEnable;
  req.enableBatchMeta = tmq->enableBatchMeta;

  // turn every topic name into its full name; req.topicNames owns the copies
  for (int32_t i = 0; i < sz; i++) {
    char* topic = taosArrayGetP(container, i);
    if (topic == NULL) {
      code = terrno;
      goto END;
    }
    SName name = {0};
    code = tNameSetDbName(&name, tmq->pTscObj->acctId, topic, strlen(topic));
    if (code) {
      tqErrorC("consumer:0x%" PRIx64 " cgroup:%s, failed to set topic name, code:%d", tmq->consumerId, tmq->groupId,
               code);
      goto END;
    }
    char* topicFName = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
    if (topicFName == NULL) {
      code = terrno;
      goto END;
    }

    code = tNameExtractFullName(&name, topicFName);
    if (code) {
      tqErrorC("consumer:0x%" PRIx64 " cgroup:%s, failed to extract topic name, code:%d", tmq->consumerId, tmq->groupId,
               code);
      taosMemoryFree(topicFName);
      goto END;
    }

    if (taosArrayPush(req.topicNames, &topicFName) == NULL) {
      code = terrno;
      taosMemoryFree(topicFName);
      goto END;
    }
    tqInfoC("consumer:0x%" PRIx64 " subscribe topic:%s", tmq->consumerId, topicFName);
  }

  // first call computes the encoded size, second call fills the buffer
  int32_t tlen = tSerializeSCMSubscribeReq(NULL, &req);
  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) {
    code = terrno;
    goto END;
  }

  void* abuf = buf;
  tlen = tSerializeSCMSubscribeReq(&abuf, &req);

  sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    code = terrno;
    taosMemoryFree(buf);
    goto END;
  }

  SMqSubscribeCbParam param = {.rspErr = 0};
  if (tsem2_init(&param.rspSem, 0, 0) != 0) {
    code = TSDB_CODE_TSC_INTERNAL_ERROR;
    taosMemoryFree(buf);
    taosMemoryFree(sendInfo);
    goto END;
  }

  sendInfo->msgInfo = (SDataBuf){.pData = buf, .len = tlen, .handle = NULL};
  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = &param;
  sendInfo->fp = tmqSubscribeCb;
  sendInfo->msgType = TDMT_MND_TMQ_SUBSCRIBE;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo);
  if (code != 0) {
    // The semaphore was initialised above and nobody will wait on it any more,
    // so it must be destroyed here as well (it used to be leaked on this path).
    // NOTE(review): assumes the callback is never invoked after a failed send.
    if (tsem2_destroy(&param.rspSem) != 0) {
      tqErrorC("consumer:0x%" PRIx64 ", failed to destroy semaphore in subscribe", tmq->consumerId);
    }
    goto END;
  }

  if (tsem2_wait(&param.rspSem) != 0) {
    tqErrorC("consumer:0x%" PRIx64 ", failed to wait semaphore in subscribe", tmq->consumerId);
  }
  if (tsem2_destroy(&param.rspSem) != 0) {
    tqErrorC("consumer:0x%" PRIx64 ", failed to destroy semaphore in subscribe", tmq->consumerId);
  }

  if (param.rspErr != 0) {
    code = param.rspErr;
    goto END;
  }

  int32_t retryCnt = 0;
  while ((code = syncAskEp(tmq)) != 0) {
    if (retryCnt++ > SUBSCRIBE_RETRY_MAX_COUNT || code == TSDB_CODE_MND_CONSUMER_NOT_EXIST) {
      tqErrorC("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry more than 2 minutes, code:%s",
               tmq->consumerId, tstrerror(code));
      // "consumer not exist" ends the retry loop but is reported as success
      if (code == TSDB_CODE_MND_CONSUMER_NOT_EXIST) {
        code = 0;
      }
      goto END;
    }

    tqInfoC("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry:%d in 500ms", tmq->consumerId, retryCnt);
    taosMsleep(SUBSCRIBE_RETRY_INTERVAL);
  }

  if (tmq->epTimer == NULL) {
    tmq->epTimer = taosTmrStart(tmqAssignAskEpTask, DEFAULT_ASKEP_INTERVAL, (void*)(tmq->refId), tmqMgmt.timer);
    if (tmq->epTimer == NULL) {
      code = TSDB_CODE_TSC_INTERNAL_ERROR;
      goto END;
    }
  }
  if (tmq->autoCommit && tmq->commitTimer == NULL) {
    tmq->commitTimer =
        taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, (void*)(tmq->refId), tmqMgmt.timer);
    if (tmq->commitTimer == NULL) {
      code = TSDB_CODE_TSC_INTERNAL_ERROR;
      goto END;
    }
  }

END:
  taosArrayDestroyP(req.topicNames, NULL);
  return code;
}
1892

1893
// Store the commit callback `cb` and its user argument `param` in `conf`.
// Does nothing when `conf` is NULL.
void tmq_conf_set_auto_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* param) {
  if (conf != NULL) {
    conf->commitCbUserParam = param;
    conf->commitCb = cb;
  }
}
1898

1899
// Look up the vgroup entry `vgId` of topic `topicName` in tmq->clientTopics.
//
// On a match `*pVg` is set to the entry; when nothing matches `*pVg` is left
// untouched, so callers should initialise it (e.g. to NULL) beforehand.
static void getVgInfo(tmq_t* tmq, char* topicName, int32_t vgId, SMqClientVg** pVg) {
  int32_t topicCnt = taosArrayGetSize(tmq->clientTopics);
  for (int32_t t = 0; t < topicCnt; ++t) {
    SMqClientTopic* topic = taosArrayGet(tmq->clientTopics, t);
    if (topic == NULL || strcmp(topic->topicName, topicName) != 0) {
      continue;
    }

    int32_t vgCnt = taosArrayGetSize(topic->vgs);
    for (int32_t v = 0; v < vgCnt; ++v) {
      SMqClientVg* vg = taosArrayGet(topic->vgs, v);
      if (vg == NULL || vg->vgId != vgId) {
        continue;
      }
      *pVg = vg;
      return;
    }
  }
}
1915

1916
// Return the entry of tmq->clientTopics whose name equals `topicName`, or NULL
// when no such topic exists.
static SMqClientTopic* getTopicInfo(tmq_t* tmq, char* topicName) {
  int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
  for (int i = 0; i < topicNumCur; i++) {
    SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i);
    // guard against a NULL element before dereferencing it, as getVgInfo does
    if (pTopicCur != NULL && strcmp(pTopicCur->topicName, topicName) == 0) {
      return pTopicCur;
    }
  }
  return NULL;
}
1926

1927
// Callback for a poll request (installed as sendInfo->fp by doTmqPollImpl).
//
// Wraps the response -- or the error `code` -- into an SMqRspWrapper, pushes it
// onto tmq->mqueue, posts tmq->rspSem and releases the consumer reference.
// pMsg->pData and pMsg->pEpSet are always freed here (the ep set is handed over
// to the wrapper first when the response was decoded successfully).
//
// Returns TSDB_CODE_TSC_INTERNAL_ERROR when `pMsg` or `param` is NULL,
// TSDB_CODE_TMQ_CONSUMER_CLOSED when the consumer reference cannot be acquired,
// the allocation error when no wrapper could be allocated, otherwise the result
// of taosWriteQitem.
//
// PROCESS_POLL_RSP is a macro defined outside this block; presumably it decodes
// the payload and jumps to END on failure -- the names `pMsg`, `code` and `END`
// are kept as they were because the macro may refer to them.
int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
  tmq_t*          tmq = NULL;
  SMqRspWrapper*  pRspWrapper = NULL;
  SMqPollCbParam* pParam = (SMqPollCbParam*)param;
  int8_t          rspType = 0;
  int32_t         vgId = 0;
  uint64_t        requestId = 0;
  int64_t         refId = 0;
  int32_t         rc = 0;

  if (pMsg == NULL) {
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }
  if (pParam == NULL) {
    code = TSDB_CODE_TSC_INTERNAL_ERROR;
    goto EXIT;
  }

  refId = pParam->refId;
  vgId = pParam->vgId;
  requestId = pParam->requestId;

  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq == NULL) {
    code = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    goto EXIT;
  }

  rc = taosAllocateQitem(sizeof(SMqRspWrapper), DEF_QITEM, 0, (void**)&pRspWrapper);
  if (rc) {
    code = rc;
    tscWarn("consumer:0x%" PRIx64 " msg discard from vgId:%d, since out of memory", tmq->consumerId, vgId);
    goto END;
  }

  // an error reported by the transport is forwarded to the poller as it is
  if (code != 0) {
    goto END;
  }

  if (pMsg->pData == NULL) {
    tqErrorC("consumer:0x%" PRIx64 " msg discard from vgId:%d, since msg is NULL", tmq->consumerId, vgId);
    code = TSDB_CODE_TSC_INTERNAL_ERROR;
    goto END;
  }

  int32_t rspEpoch = ((SMqRspHead*)pMsg->pData)->epoch;
  int32_t curEpoch = atomic_load_32(&tmq->epoch);
  if (rspEpoch != curEpoch) {
    tqErrorC("consumer:0x%" PRIx64
             " msg discard from vgId:%d since epoch not equal, rsp epoch %d, current epoch %d, reqId:0x%" PRIx64,
             tmq->consumerId, vgId, rspEpoch, curEpoch, requestId);
    code = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
    goto END;
  }

  rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType;
  tqDebugC("consumer:0x%" PRIx64 " recv poll rsp, vgId:%d, type %d,QID:0x%" PRIx64, tmq->consumerId, vgId, rspType, requestId);

  // decode the payload according to the response type
  switch (rspType) {
    case TMQ_MSG_TYPE__POLL_DATA_RSP: {
      PROCESS_POLL_RSP(tDecodeMqDataRsp, &pRspWrapper->pollRsp.dataRsp)
      break;
    }
    case TMQ_MSG_TYPE__POLL_META_RSP: {
      PROCESS_POLL_RSP(tDecodeMqMetaRsp, &pRspWrapper->pollRsp.metaRsp)
      break;
    }
    case TMQ_MSG_TYPE__POLL_DATA_META_RSP: {
      PROCESS_POLL_RSP(tDecodeSTaosxRsp, &pRspWrapper->pollRsp.dataRsp)
      break;
    }
    case TMQ_MSG_TYPE__POLL_BATCH_META_RSP: {
      PROCESS_POLL_RSP(tSemiDecodeMqBatchMetaRsp, &pRspWrapper->pollRsp.batchMetaRsp)
      break;
    }
    default: {  // invalid rspType
      tqErrorC("consumer:0x%" PRIx64 " invalid rsp msg received, type:%d ignored", tmq->consumerId, rspType);
      code = TSDB_CODE_TSC_INTERNAL_ERROR;
      goto END;
    }
  }

  pRspWrapper->tmqRspType = rspType;
  pRspWrapper->pollRsp.reqId = requestId;
  // the wrapper takes over the ep set; clearing it keeps EXIT from freeing it
  pRspWrapper->pollRsp.pEpset = pMsg->pEpSet;
  pMsg->pEpSet = NULL;

END:
  if (pRspWrapper) {
    pRspWrapper->code = code;
    pRspWrapper->pollRsp.vgId = vgId;
    tstrncpy(pRspWrapper->pollRsp.topicName, pParam->topicName, TSDB_TOPIC_FNAME_LEN);

    code = taosWriteQitem(tmq->mqueue, pRspWrapper);
    if (code == 0) {
      tqDebugC("consumer:0x%" PRIx64 " put poll res into mqueue, type:%d, vgId:%d, total in queue:%d,QID:0x%" PRIx64,
               tmq->consumerId, rspType, vgId, taosQueueItemSize(tmq->mqueue), requestId);
    } else {
      tmqFreeRspWrapper(pRspWrapper);
      taosFreeQitem(pRspWrapper);
      tqErrorC("consumer:0x%" PRIx64 " put poll res into mqueue failed, code:%d", tmq->consumerId, code);
    }
  }

  if (tsem2_post(&tmq->rspSem) != 0) {
    tqErrorC("failed to post rsp sem, consumer:0x%" PRIx64, tmq->consumerId);
  }

  rc = taosReleaseRef(tmqMgmt.rsetId, refId);
  if (rc != 0) {
    tqErrorC("failed to release ref:%"PRId64 ", code:%d", refId, rc);
  }

EXIT:
  taosMemoryFreeClear(pMsg->pData);
  taosMemoryFreeClear(pMsg->pEpSet);
  return code;
}
2027

2028
// Fill the poll request `pReq` for vgroup `pVg` of topic `pTopic`.
//
// The subscribe key is "<groupId><TMQ_SEPARATOR><topicName>", the request
// offset is the vgroup's current end offset, and a fresh request id is
// generated; the remaining fields are copied from the consumer.
void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) {
  (void)snprintf(pReq->subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", tmq->groupId, TMQ_SEPARATOR, pTopic->topicName);

  // per-request values
  pReq->reqId = generateRequestId();
  pReq->timeout = timeout;

  // target vgroup and the position to read from
  pReq->head.vgId = pVg->vgId;
  pReq->reqOffset = pVg->offsetInfo.endOffset;

  // consumer identity and settings
  pReq->consumerId = tmq->consumerId;
  pReq->epoch = tmq->epoch;
  pReq->withTbName = tmq->withTbName;
  pReq->useSnapshot = tmq->useSnapshot;
  pReq->enableReplay = tmq->replayEnable;
  pReq->sourceExcluded = tmq->sourceExcluded;
  pReq->enableBatchMeta = tmq->enableBatchMeta;
}
44,951✔
2042

2043
void changeByteEndian(char* pData) {
349,628✔
2044
  if (pData == NULL) {
349,628!
2045
    return;
×
2046
  }
2047
  char* p = pData;
349,628✔
2048

2049
  // | version | total length | total rows | total columns | flag seg| block group id | column schema | each column
2050
  // length | version:
2051
  int32_t blockVersion = *(int32_t*)p;
349,628✔
2052
  if (blockVersion != BLOCK_VERSION_1) {
349,628!
2053
    tqErrorC("invalid block version:%d", blockVersion);
×
2054
    return;
×
2055
  }
2056
  *(int32_t*)p = BLOCK_VERSION_2;
349,628✔
2057

2058
  p += sizeof(int32_t);
349,628✔
2059
  p += sizeof(int32_t);
349,628✔
2060
  p += sizeof(int32_t);
349,628✔
2061
  int32_t cols = *(int32_t*)p;
349,628✔
2062
  p += sizeof(int32_t);
349,628✔
2063
  p += sizeof(int32_t);
349,628✔
2064
  p += sizeof(uint64_t);
349,628✔
2065
  // check fields
2066
  p += cols * (sizeof(int8_t) + sizeof(int32_t));
349,628✔
2067

2068
  int32_t* colLength = (int32_t*)p;
349,628✔
2069
  for (int32_t i = 0; i < cols; ++i) {
2,244,238✔
2070
    colLength[i] = htonl(colLength[i]);
1,894,610✔
2071
  }
2072
}
2073

2074
static void tmqGetRawDataRowsPrecisionFromRes(void* pRetrieve, void** rawData, int64_t* rows, int32_t* precision) {
683,121✔
2075
  if (pRetrieve == NULL) {
683,121!
2076
    return;
×
2077
  }
2078
  if (*(int64_t*)pRetrieve == 0) {
683,121!
2079
    *rawData = ((SRetrieveTableRsp*)pRetrieve)->data;
×
2080
    *rows = htobe64(((SRetrieveTableRsp*)pRetrieve)->numOfRows);
×
2081
    if (precision != NULL) {
×
2082
      *precision = ((SRetrieveTableRsp*)pRetrieve)->precision;
×
2083
    }
2084
  } else if (*(int64_t*)pRetrieve == 1) {
683,121!
2085
    *rawData = ((SRetrieveTableRspForTmq*)pRetrieve)->data;
683,123✔
2086
    *rows = htobe64(((SRetrieveTableRspForTmq*)pRetrieve)->numOfRows);
683,123✔
2087
    if (precision != NULL) {
683,123✔
2088
      *precision = ((SRetrieveTableRspForTmq*)pRetrieve)->precision;
333,531✔
2089
    }
2090
  }
2091
}
2092

2093
static void tmqBuildRspFromWrapperInner(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows,
31,154✔
2094
                                        SMqRspObj* pRspObj) {
2095
  pRspObj->resIter = -1;
31,154✔
2096
  pRspObj->resInfo.totalRows = 0;
31,154✔
2097
  pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
31,154✔
2098

2099
  SMqDataRsp* pDataRsp = &pRspObj->dataRsp;
31,154✔
2100
  bool needTransformSchema = !pDataRsp->withSchema;
31,154✔
2101
  if (!pDataRsp->withSchema) {  // withSchema is false if subscribe subquery, true if subscribe db or stable
31,154✔
2102
    pDataRsp->withSchema = true;
28,915✔
2103
    pDataRsp->blockSchema = taosArrayInit(pDataRsp->blockNum, sizeof(void*));
28,915✔
2104
    if (pDataRsp->blockSchema == NULL) {
28,915!
2105
      tqErrorC("failed to allocate memory for blockSchema");
×
2106
      return;
×
2107
    }
2108
  }
2109
  // extract the rows in this data packet
2110
  for (int32_t i = 0; i < pDataRsp->blockNum; ++i) {
380,785✔
2111
    void*   pRetrieve = taosArrayGetP(pDataRsp->blockData, i);
349,631✔
2112
    void*   rawData = NULL;
349,637✔
2113
    int64_t rows = 0;
349,637✔
2114
    // deal with compatibility
2115
    tmqGetRawDataRowsPrecisionFromRes(pRetrieve, &rawData, &rows, NULL);
349,637✔
2116

2117
    pVg->numOfRows += rows;
349,633✔
2118
    (*numOfRows) += rows;
349,633✔
2119
    changeByteEndian(rawData);
349,633✔
2120
    if (needTransformSchema) {  // withSchema is false if subscribe subquery, true if subscribe db or stable
349,622✔
2121
      SSchemaWrapper* schema = tCloneSSchemaWrapper(&pWrapper->topicHandle->schema);
230,412!
2122
      if (schema) {
230,425✔
2123
        if (taosArrayPush(pDataRsp->blockSchema, &schema) == NULL) {
460,832!
2124
          tqErrorC("failed to push schema into blockSchema");
×
2125
          continue;
×
2126
        }
2127
      }
2128
    }
2129
  }
2130
}
2131

2132
// Send one poll request for vgroup `pVg` of topic `pTopic`.
//
// Builds and serializes an SMqPollReq, attaches an SMqPollCbParam (refId, topic
// name, vgId, request id) for tmqPollCb and sends the message
// (TDMT_VND_TMQ_CONSUME) to the vgroup's ep set. On success the poll counters
// of the vgroup and the consumer are incremented and pVg->seekUpdated is reset.
//
// Returns 0 on success, TSDB_CODE_INVALID_MSG when serialization fails, terrno
// when an allocation fails, or the error of asyncSendMsgToServer.
static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* pVg, int64_t timeout) {
  SMqPollReq      req = {0};
  char*           msg = NULL;
  SMqPollCbParam* pParam = NULL;
  SMsgSendInfo*   sendInfo = NULL;
  int             code = 0;
  tmqBuildConsumeReqImpl(&req, pTmq, timeout, pTopic, pVg);

  // first call computes the encoded size, second call fills the buffer
  int32_t msgSize = tSerializeSMqPollReq(NULL, 0, &req);
  if (msgSize < 0) {
    code = TSDB_CODE_INVALID_MSG;
    return code;
  }

  msg = taosMemoryCalloc(1, msgSize);
  if (NULL == msg) {
    return terrno;
  }

  if (tSerializeSMqPollReq(msg, msgSize, &req) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    taosMemoryFreeClear(msg);
    return code;
  }

  pParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
  if (pParam == NULL) {
    code = terrno;
    taosMemoryFreeClear(msg);
    return code;
  }

  pParam->refId = pTmq->refId;
  tstrncpy(pParam->topicName, pTopic->topicName, TSDB_TOPIC_FNAME_LEN);
  pParam->vgId = pVg->vgId;
  pParam->requestId = req.reqId;

  sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    // capture terrno before the frees, as the other failure paths do
    code = terrno;
    taosMemoryFreeClear(pParam);
    taosMemoryFreeClear(msg);
    return code;
  }

  sendInfo->msgInfo = (SDataBuf){.pData = msg, .len = msgSize, .handle = NULL};
  sendInfo->requestId = req.reqId;
  sendInfo->requestObjRefId = 0;
  sendInfo->param = pParam;
  sendInfo->paramFreeFp = taosAutoMemoryFree;
  sendInfo->fp = tmqPollCb;
  sendInfo->msgType = TDMT_VND_TMQ_CONSUME;

  char offsetFormatBuf[TSDB_OFFSET_LEN] = {0};
  tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pVg->offsetInfo.endOffset);
  code = asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, NULL, sendInfo);
  tqDebugC("consumer:0x%" PRIx64 " send poll to %s vgId:%d, code:%d, epoch %d, req:%s,QID:0x%" PRIx64, pTmq->consumerId,
           pTopic->topicName, pVg->vgId, code, pTmq->epoch, offsetFormatBuf, req.reqId);
  if (code != 0) {
    // NOTE(review): msg/pParam/sendInfo are not freed here; presumably
    // asyncSendMsgToServer releases sendInfo on failure -- confirm.
    return code;
  }

  pVg->pollCnt++;
  pVg->seekUpdated = false;  // reset this flag.
  pTmq->pollCnt++;

  return 0;
}
2199

2200
// broadcast the poll request to all related vnodes
2201
static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
90,274✔
2202
  int32_t code = 0;
90,274✔
2203

2204
  taosWLockLatch(&tmq->lock);
90,274✔
2205
  int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
90,274✔
2206
  tqDebugC("consumer:0x%" PRIx64 " start to poll data, numOfTopics:%d", tmq->consumerId, numOfTopics);
90,274!
2207

2208
  for (int i = 0; i < numOfTopics; i++) {
184,183✔
2209
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
93,909✔
2210
    if (pTopic == NULL) {
93,908!
2211
      continue;
×
2212
    }
2213
    int32_t numOfVg = taosArrayGetSize(pTopic->vgs);
93,908✔
2214
    if (pTopic->noPrivilege) {
93,908!
2215
      tqDebugC("consumer:0x%" PRIx64 " has no privilegr for topic:%s", tmq->consumerId, pTopic->topicName);
×
2216
      continue;
×
2217
    }
2218
    for (int j = 0; j < numOfVg; j++) {
356,666✔
2219
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
262,757✔
2220
      if (pVg == NULL) {
262,757!
2221
        continue;
×
2222
      }
2223
      int64_t elapsed = taosGetTimestampMs() - pVg->emptyBlockReceiveTs;
262,757✔
2224
      if (elapsed < EMPTY_BLOCK_POLL_IDLE_DURATION && elapsed >= 0) {  // less than 10ms
262,757!
2225
        tqDebugC("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 10ms before start next poll", tmq->consumerId,
23,256!
2226
                 tmq->epoch, pVg->vgId);
2227
        continue;
23,256✔
2228
      }
2229

2230
      elapsed = taosGetTimestampMs() - pVg->blockReceiveTs;
239,501✔
2231
      if (tmq->replayEnable && elapsed < pVg->blockSleepForReplay && elapsed >= 0) {
239,501!
2232
        tqDebugC("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for %" PRId64 "ms before start next poll when replay",
22!
2233
                 tmq->consumerId, tmq->epoch, pVg->vgId, pVg->blockSleepForReplay);
2234
        continue;
22✔
2235
      }
2236

2237
      int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
239,479✔
2238
      if (vgStatus == TMQ_VG_STATUS__WAIT) {
239,479✔
2239
        int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
194,532✔
2240
        tqDebugC("consumer:0x%" PRIx64 " epoch %d wait poll-rsp, skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch,
194,532!
2241
                 pVg->vgId, vgSkipCnt);
2242
        continue;
194,532✔
2243
      }
2244

2245
      atomic_store_32(&pVg->vgSkipCnt, 0);
44,947✔
2246
      code = doTmqPollImpl(tmq, pTopic, pVg, timeout);
44,948✔
2247
      if (code != TSDB_CODE_SUCCESS) {
44,948!
2248
        goto end;
×
2249
      }
2250
    }
2251
  }
2252

2253
end:
90,274✔
2254
  taosWUnLockLatch(&tmq->lock);
90,274✔
2255
  tqDebugC("consumer:0x%" PRIx64 " end to poll data, code:%d", tmq->consumerId, code);
90,274!
2256
  return code;
90,274✔
2257
}
2258

2259
// Apply the result of a poll response to the vgroup bookkeeping.
//
// Unless pVg->seekUpdated is set, the end offset becomes `rspOffset` and -- when
// the response carried data -- the begin offset becomes `reqOffset`. In every
// case the vgroup status returns to IDLE and the WAL version range is set to
// [sver, ever + 1].
static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* reqOffset, STqOffsetVal* rspOffset, int64_t sver, int64_t ever,
                         int64_t consumerId, bool hasData) {
  if (pVg->seekUpdated) {
    tqDebugC("consumer:0x%" PRIx64 " local offset is NOT update, since seekupdate is set", consumerId);
  } else {
    tqDebugC("consumer:0x%" PRIx64 " local offset is update, since seekupdate not set", consumerId);
    if (hasData) {
      tOffsetCopy(&pVg->offsetInfo.beginOffset, reqOffset);
    }
    tOffsetCopy(&pVg->offsetInfo.endOffset, rspOffset);
  }

  // update the status
  atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);

  // update the valid wal version range
  pVg->offsetInfo.walVerBegin = sver;
  pVg->offsetInfo.walVerEnd = ever + 1;
}
40,080✔
2278

2279
// Allocate an SMqRspObj and move the payload of `pollRspWrapper` into it.
//
// The payload is copied bytewise, starting at the dataRsp member on both sides
// and sized as the largest of the data/meta/batch-meta response structs, and is
// then zeroed in the wrapper. Topic name, db name and vgId are copied as well.
// Returns NULL when the allocation fails.
static SMqRspObj* buildRsp(SMqPollRspWrapper* pollRspWrapper){
  // only used for its size: large enough for whichever response kind is stored
  typedef union {
    SMqDataRsp      dataRsp;
    SMqMetaRsp      metaRsp;
    SMqBatchMetaRsp batchMetaRsp;
  } RspPayload;

  SMqRspObj* rspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
  if (NULL == rspObj) {
    tqErrorC("buildRsp:failed to allocate memory");
    return NULL;
  }

  (void)memcpy(&rspObj->dataRsp, &pollRspWrapper->dataRsp, sizeof(RspPayload));
  rspObj->vgId = pollRspWrapper->vgId;
  tstrncpy(rspObj->topic, pollRspWrapper->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(rspObj->db, pollRspWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);

  // clear the source so the payload is referenced from one place only
  (void)memset(&pollRspWrapper->dataRsp, 0, sizeof(RspPayload));
  return rspObj;
}
2298

2299
// Handle a queue item whose code is non-zero: the message is discarded.
// For TSDB_CODE_VND_INVALID_VGROUP_ID and TSDB_CODE_TMQ_CONSUMER_MISMATCH the
// endpoints are asked for again (askEp; its last argument is true only for the
// invalid-vgroup case). Afterwards the affected vgroup, if still known, gets its
// emptyBlockReceiveTs stamped and is set back to idle under tmq->lock.
static void processMqRspError(tmq_t* tmq, SMqRspWrapper* pRspWrapper){
  SMqPollRspWrapper* pPollRsp = &pRspWrapper->pollRsp;

  const bool vgInvalid = (pRspWrapper->code == TSDB_CODE_VND_INVALID_VGROUP_ID);  // for vnode transform
  const bool consumerMismatch = (pRspWrapper->code == TSDB_CODE_TMQ_CONSUMER_MISMATCH);
  if (vgInvalid || consumerMismatch) {
    int32_t code = askEp(tmq, NULL, false, vgInvalid);
    if (code != 0) {
      tqErrorC("consumer:0x%" PRIx64 " failed to ask ep, code:%s", tmq->consumerId, tstrerror(code));
    }
  }

  tqInfoC("consumer:0x%" PRIx64 " msg from vgId:%d discarded, since %s", tmq->consumerId, pPollRsp->vgId,
          tstrerror(pRspWrapper->code));

  SMqClientVg* pVg = NULL;
  taosWLockLatch(&tmq->lock);
  getVgInfo(tmq, pPollRsp->topicName, pPollRsp->vgId, &pVg);
  if (pVg != NULL) {
    pVg->emptyBlockReceiveTs = taosGetTimestampMs();
    atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
  }
  taosWUnLockLatch(&tmq->lock);
}
3,955✔
2324
// Turn one successful queue item into a result object for the caller.
//   - EP responses only refresh the local endpoint info (doUpdateLocalEp) and yield NULL.
//   - Poll responses update the owning vgroup while tmq->lock is write-held. A
//     data / data+meta response without blocks yields NULL; every other handled poll
//     response is moved into a newly built SMqRspObj (see buildRsp).
// Returns the new SMqRspObj, or NULL when there is nothing to hand out (vgroup/topic
// not found, empty block, allocation failure, or a response type not handled here).
static SMqRspObj* processMqRsp(tmq_t* tmq, SMqRspWrapper* pRspWrapper){
  if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) {
    tqDebugC("consumer:0x%" PRIx64 " ep msg received", tmq->consumerId);
    SMqAskEpRsp* pEpRsp = &pRspWrapper->epRsp;
    doUpdateLocalEp(tmq, pRspWrapper->epoch, pEpRsp);
    return NULL;
  }

  SMqRspObj*         pRspObj = NULL;
  SMqPollRspWrapper* pollRspWrapper = &pRspWrapper->pollRsp;
  SMqClientVg*       pVg = NULL;

  taosWLockLatch(&tmq->lock);
  getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId, &pVg);
  if (pVg == NULL) {
    tqErrorC("consumer:0x%" PRIx64 " get vg or topic error, topic:%s vgId:%d", tmq->consumerId,
             pollRspWrapper->topicName, pollRspWrapper->vgId);
    goto END;
  }

  pollRspWrapper->topicHandle = getTopicInfo(tmq, pollRspWrapper->topicName);
  // the response may carry a new endpoint set for this vgroup
  if (pollRspWrapper->pEpset != NULL) {
    pVg->epSet = *pollRspWrapper->pEpset;
  }

  const bool isDataRsp = pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_RSP ||
                         pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP;
  const bool isMetaRsp = pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP ||
                         pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_BATCH_META_RSP;

  if (isDataRsp) {
    const bool hasBlocks = (pollRspWrapper->dataRsp.blockNum != 0);
    updateVgInfo(pVg, &pollRspWrapper->dataRsp.reqOffset, &pollRspWrapper->dataRsp.rspOffset,
                 pollRspWrapper->head.walsver, pollRspWrapper->head.walever, tmq->consumerId, hasBlocks);

    // printable offset, used by the debug logs below
    char buf[TSDB_OFFSET_LEN] = {0};
    tFormatOffset(buf, TSDB_OFFSET_LEN, &pollRspWrapper->rspOffset);

    if (!hasBlocks) {
      tqDebugC("consumer:0x%" PRIx64 " empty block received, vgId:%d, offset:%s, vg total:%" PRId64
                   ", total:%" PRId64 ",QID:0x%" PRIx64,
               tmq->consumerId, pVg->vgId, buf, pVg->numOfRows, tmq->totalRows, pollRspWrapper->reqId);
      pVg->emptyBlockReceiveTs = taosGetTimestampMs();
      goto END;
    }

    pRspObj = buildRsp(pollRspWrapper);
    if (pRspObj == NULL) {
      tqErrorC("consumer:0x%" PRIx64 " failed to allocate memory for meta rsp", tmq->consumerId);
      goto END;
    }
    if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_RSP) {
      pRspObj->resType = RES_TYPE__TMQ;
    } else {
      pRspObj->resType = RES_TYPE__TMQ_METADATA;
    }

    int64_t numOfRows = 0;
    tmqBuildRspFromWrapperInner(pollRspWrapper, pVg, &numOfRows, pRspObj);
    tmq->totalRows += numOfRows;
    pVg->emptyBlockReceiveTs = 0;

    if (tmq->replayEnable) {
      pVg->blockReceiveTs = taosGetTimestampMs();
      pVg->blockSleepForReplay = pRspObj->dataRsp.sleepTime;
      // a positive sleep time arms the replay timer
      if (pVg->blockSleepForReplay > 0 &&
          taosTmrStart(tmqReplayTask, pVg->blockSleepForReplay, (void*)(tmq->refId), tmqMgmt.timer) == NULL) {
        tqErrorC("consumer:0x%" PRIx64 " failed to start replay timer, vgId:%d, sleep:%" PRId64,
                 tmq->consumerId, pVg->vgId, pVg->blockSleepForReplay);
      }
    }

    tqDebugC("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64
                 ", vg total:%" PRId64 ", total:%" PRId64 ",QID:0x%" PRIx64,
             tmq->consumerId, pVg->vgId, buf, pRspObj->dataRsp.blockNum, numOfRows, pVg->numOfRows, tmq->totalRows,
             pollRspWrapper->reqId);
  } else if (isMetaRsp) {
    // meta responses use rspOffset for both the begin and the end offset
    updateVgInfo(pVg, &pollRspWrapper->rspOffset, &pollRspWrapper->rspOffset,
                 pollRspWrapper->head.walsver, pollRspWrapper->head.walever, tmq->consumerId, true);

    pRspObj = buildRsp(pollRspWrapper);
    if (pRspObj == NULL) {
      tqErrorC("consumer:0x%" PRIx64 " failed to allocate memory for meta rsp", tmq->consumerId);
      goto END;
    }
    if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
      pRspObj->resType = RES_TYPE__TMQ_META;
    } else {
      pRspObj->resType = RES_TYPE__TMQ_BATCH_META;
    }
  }

END:
  taosWUnLockLatch(&tmq->lock);
  return pRspObj;
}
2402

2403
// Consume queued responses until one of them produces a result object.
// Items are taken from tmq->qall; when that yields nothing, tmq->mqueue is read
// into tmq->qall once and the take is retried. Every item is released after it has
// been processed. Returns the first non-NULL result of processMqRsp, or NULL when
// no more items are available. `timeout` is not used by this function.
static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
  tqDebugC("consumer:0x%" PRIx64 " start to handle the rsp, total:%d", tmq->consumerId, taosQallItemSize(tmq->qall));

  for (;;) {
    SMqRspWrapper* pRspWrapper = NULL;
    if (taosGetQitem(tmq->qall, (void**)&pRspWrapper) == 0) {
      // nothing left locally: refill from the message queue and retry once
      if (taosReadAllQitems(tmq->mqueue, tmq->qall) == 0) {
        return NULL;
      }
      if (taosGetQitem(tmq->qall, (void**)&pRspWrapper) == 0) {
        return NULL;
      }
    }

    tqDebugC("consumer:0x%" PRIx64 " handle rsp, type:%d", tmq->consumerId, pRspWrapper->tmqRspType);

    void* result = NULL;
    if (pRspWrapper->code == 0) {
      result = processMqRsp(tmq, pRspWrapper);
    } else {
      processMqRspError(tmq, pRspWrapper);
    }

    tmqFreeRspWrapper(pRspWrapper);
    taosFreeQitem(pRspWrapper);

    if (result != NULL) {
      return result;
    }
  }
}
2433

2434
// Poll the consumer for the next result.
//   tmq      consumer handle; NULL yields NULL
//   timeout  maximum wait in milliseconds; a negative value keeps polling,
//            waiting on the response semaphore 1000 ms at a time
// Each round runs the delayed tasks, issues the poll requests and drains the
// response queue. Returns the first result object produced, or NULL when the
// consumer is still in INIT state (after sleeping 500 ms) or the timeout elapsed.
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
  if (tmq == NULL) return NULL;

  const int64_t startTime = taosGetTimestampMs();

  tqDebugC("consumer:0x%" PRIx64 " start to poll at %" PRId64 ", timeout:%" PRId64, tmq->consumerId, startTime,
           timeout);

  // in no topic status, delayed task also need to be processed
  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) {
    tqInfoC("consumer:0x%" PRIx64 " poll return since consumer is init", tmq->consumerId);
    taosMsleep(500);  //     sleep for a while
    return NULL;
  }

  (void)atomic_val_compare_exchange_8(&tmq->pollFlag, 0, 1);

  for (;;) {
    tmqHandleAllDelayedTask(tmq);

    // a poll error is only logged; the responses already queued are still handled
    if (tmqPollImpl(tmq, timeout) < 0) {
      tqErrorC("consumer:0x%" PRIx64 " return due to poll error", tmq->consumerId);
    }

    void* rspObj = tmqHandleAllRsp(tmq, timeout);
    if (rspObj != NULL) {
      tqDebugC("consumer:0x%" PRIx64 " return rsp %p", tmq->consumerId, rspObj);
      return (TAOS_RES*)rspObj;
    }

    if (timeout < 0) {
      (void)tsem2_timewait(&tmq->rspSem, 1000);
      continue;
    }

    const int64_t currentTime = taosGetTimestampMs();
    const int64_t elapsedTime = currentTime - startTime;
    // a negative elapsed time (clock moved backwards) also ends the poll
    if (elapsedTime > timeout || elapsedTime < 0) {
      tqDebugC("consumer:0x%" PRIx64 " (epoch %d) timeout, no rsp, start time %" PRId64 ", current time %" PRId64,
               tmq->consumerId, tmq->epoch, startTime, currentTime);
      return NULL;
    }
    (void)tsem2_timewait(&tmq->rspSem, (timeout - elapsedTime));
  }
}
2479

2480
// Log the consumer's counters (poll count, total rows, topic count, epoch) and,
// for every topic, the row count of each of its vgroups. The topic list is walked
// under the read lock of pTmq->lock.
static void displayConsumeStatistics(tmq_t* pTmq) {
  taosRLockLatch(&pTmq->lock);

  const int32_t topicCnt = taosArrayGetSize(pTmq->clientTopics);
  tqInfoC("consumer:0x%" PRIx64 " closing poll:%" PRId64 " rows:%" PRId64 " topics:%d, final epoch:%d",
           pTmq->consumerId, pTmq->pollCnt, pTmq->totalRows, topicCnt, pTmq->epoch);

  tqInfoC("consumer:0x%" PRIx64 " rows dist begin: ", pTmq->consumerId);
  for (int32_t topicIdx = 0; topicIdx < topicCnt; ++topicIdx) {
    SMqClientTopic* pTopic = taosArrayGet(pTmq->clientTopics, topicIdx);
    if (pTopic != NULL) {
      tqInfoC("consumer:0x%" PRIx64 " topic:%d", pTmq->consumerId, topicIdx);

      const int32_t vgCnt = taosArrayGetSize(pTopic->vgs);
      for (int32_t vgIdx = 0; vgIdx < vgCnt; ++vgIdx) {
        SMqClientVg* pVg = taosArrayGet(pTopic->vgs, vgIdx);
        if (pVg != NULL) {
          tqInfoC("topic:%s, %d. vgId:%d rows:%" PRId64, pTopic->topicName, vgIdx, pVg->vgId, pVg->numOfRows);
        }
      }
    }
  }

  taosRUnLockLatch(&pTmq->lock);
  tqInfoC("consumer:0x%" PRIx64 " rows dist end", pTmq->consumerId);
}
774✔
2501

2502
// Drop all subscriptions of the consumer.
// Logs the consume statistics first. When the consumer is not in READY state
// nothing else is done and 0 is returned. Otherwise: commit synchronously if
// auto-commit is on, send a heartbeat request, then subscribe to an empty topic list.
// Returns TSDB_CODE_INVALID_PARA for a NULL handle, otherwise 0 or the code of the
// first failing step (terrno when the empty list cannot be created).
int32_t tmq_unsubscribe(tmq_t* tmq) {
  if (tmq == NULL) return TSDB_CODE_INVALID_PARA;

  const int8_t status = atomic_load_8(&tmq->status);
  tqInfoC("consumer:0x%" PRIx64 " start to unsubscribe consumer, status:%d", tmq->consumerId, status);

  displayConsumeStatistics(tmq);
  if (status != TMQ_CONSUMER_STATUS__READY) {
    tqInfoC("consumer:0x%" PRIx64 " status:%d, already closed or not in ready state, no need unsubscribe", tmq->consumerId, status);
    return 0;
  }

  if (tmq->autoCommit) {
    int32_t commitCode = tmq_commit_sync(tmq, NULL);
    if (commitCode != 0) {
      return commitCode;
    }
  }
  tmqSendHbReq((void*)(tmq->refId), NULL);

  // subscribing to an empty list removes every subscription
  tmq_list_t* emptyList = tmq_list_new();
  if (emptyList == NULL) {
    return terrno;
  }
  int32_t code = tmq_subscribe(tmq, emptyList);
  tmq_list_destroy(emptyList);
  return code;
}
2535

2536
// Close the consumer: unsubscribe, and if that succeeded mark the consumer CLOSED
// and remove its reference from tmqMgmt.rsetId.
// Returns TSDB_CODE_INVALID_PARA for a NULL handle, the unsubscribe error if
// unsubscribing failed, otherwise the result of taosRemoveRef.
int32_t tmq_consumer_close(tmq_t* tmq) {
  if (tmq == NULL) return TSDB_CODE_INVALID_PARA;
  tqInfoC("consumer:0x%" PRIx64 " start to close consumer, status:%d", tmq->consumerId, tmq->status);

  int32_t code = tmq_unsubscribe(tmq);
  if (code != 0) {
    return code;
  }

  atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__CLOSED);
  code = taosRemoveRef(tmqMgmt.rsetId, tmq->refId);
  if (code != 0) {
    tqErrorC("tmq close failed to remove ref:%" PRId64 ", code:%d", tmq->refId, code);
  }
  return code;
}
2549

2550
const char* tmq_err2str(int32_t err) {
238✔
2551
  if (err == 0) {
238✔
2552
    return "success";
221✔
2553
  } else if (err == -1) {
17!
2554
    return "fail";
×
2555
  } else {
2556
    if (*(taosGetErrMsg()) == 0) {
17✔
2557
      return tstrerror(err);
4✔
2558
    } else {
2559
      (void)snprintf(taosGetErrMsgReturn(), ERR_MSG_LEN, "%s,detail:%s", tstrerror(err), taosGetErrMsg());
13✔
2560
      return (const char*)taosGetErrMsgReturn();
13✔
2561
    }
2562
  }
2563
}
2564

2565
// Classify a poll result.
// Data results map to TMQ_RES_DATA, meta and batch-meta results to
// TMQ_RES_TABLE_META, data+meta results to TMQ_RES_METADATA. NULL or any other
// kind of result maps to TMQ_RES_INVALID.
tmq_res_t tmq_get_res_type(TAOS_RES* res) {
  if (res == NULL) {
    return TMQ_RES_INVALID;
  }

  if (TD_RES_TMQ(res)) {
    return TMQ_RES_DATA;
  }
  if (TD_RES_TMQ_META(res)) {
    return TMQ_RES_TABLE_META;
  }
  if (TD_RES_TMQ_METADATA(res)) {
    return TMQ_RES_METADATA;
  }
  if (TD_RES_TMQ_BATCH_META(res)) {
    // batch meta is reported with the same public type as single meta
    return TMQ_RES_TABLE_META;
  }
  return TMQ_RES_INVALID;
}
2581

2582
// Return the topic name of a poll result: the part of the stored topic string
// that follows its first '.'. Returns NULL for a NULL or non-TMQ result, or when
// the stored string contains no '.'. The returned pointer points into `res`.
const char* tmq_get_topic_name(TAOS_RES* res) {
  if (res == NULL) {
    return NULL;
  }

  const bool isTmqRes = TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) ||
                        TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res);
  if (!isTmqRes) {
    return NULL;
  }

  char* dot = strchr(((SMqRspObj*)res)->topic, '.');
  if (dot == NULL) {
    return NULL;
  }
  return dot + 1;
}
2597

2598
// Return the database name of a poll result: the part of the stored db string
// that follows its first '.'. Returns NULL for a NULL or non-TMQ result, or when
// the stored string contains no '.'. The returned pointer points into `res`.
const char* tmq_get_db_name(TAOS_RES* res) {
  if (res == NULL) {
    return NULL;
  }

  const bool isTmqRes = TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) ||
                        TD_RES_TMQ_BATCH_META(res) || TD_RES_TMQ_META(res);
  if (!isTmqRes) {
    return NULL;
  }

  char* dot = strchr(((SMqRspObj*)res)->db, '.');
  if (dot == NULL) {
    return NULL;
  }
  return dot + 1;
}
2614

2615
// Return the vgroup id stored in a poll result, or TSDB_CODE_INVALID_PARA when
// `res` is NULL or is not one of the TMQ result kinds.
int32_t tmq_get_vgroup_id(TAOS_RES* res) {
  if (res == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  const bool isTmqRes = TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) ||
                        TD_RES_TMQ_BATCH_META(res) || TD_RES_TMQ_META(res);
  return isTmqRes ? ((SMqRspObj*)res)->vgId : TSDB_CODE_INVALID_PARA;
}
2626

2627
// Return the WAL version a poll result refers to.
// Data and data+meta results use dataRsp.reqOffset, meta and batch-meta results use
// rspOffset; the version is returned only when that offset is of type
// TMQ_OFFSET__LOG. Returns TSDB_CODE_INVALID_PARA for a NULL result and
// TSDB_CODE_TMQ_SNAPSHOT_ERROR in every other case.
int64_t tmq_get_vgroup_offset(TAOS_RES* res) {
  if (res == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) {
    STqOffsetVal* pReqOffset = &((SMqRspObj*)res)->dataRsp.reqOffset;
    if (pReqOffset->type == TMQ_OFFSET__LOG) {
      return pReqOffset->version;
    }
    tqErrorC("invalid offset type:%d", pReqOffset->type);
  } else if (TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
    SMqRspObj* pMetaObj = (SMqRspObj*)res;
    if (pMetaObj->rspOffset.type == TMQ_OFFSET__LOG) {
      return pMetaObj->rspOffset.version;
    }
  } else {
    tqErrorC("invalid tmq type:%d", *(int8_t*)res);
  }

  // data from tsdb, no valid offset info
  return TSDB_CODE_TMQ_SNAPSHOT_ERROR;
}
2651

2652
// Return the table name of the block the result iterator currently points at.
// Only data and data+meta results carry table names. Returns NULL when `res` is
// NULL or of another kind, when the response was sent without table names, or when
// resIter is outside [0, blockNum).
const char* tmq_get_table_name(TAOS_RES* res) {
  if (res == NULL) {
    return NULL;
  }
  if (!(TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res))) {
    return NULL;
  }

  SMqRspObj*  pObj = (SMqRspObj*)res;
  SMqDataRsp* pData = &pObj->dataRsp;

  if (!pData->withTbName || pData->blockTbName == NULL) {
    return NULL;
  }
  if (pObj->resIter < 0 || pObj->resIter >= pData->blockNum) {
    return NULL;
  }
  return (const char*)taosArrayGetP(pData->blockTbName, pObj->resIter);
}
2667

2668
// Start an asynchronous commit.
//   pRes == NULL  commit the offsets of all vgroups
//   pRes != NULL  commit only the offset belonging to that result
// `cb`, if given, is invoked with `param`; for a NULL consumer handle it is called
// right away with TSDB_CODE_INVALID_PARA.
void tmq_commit_async(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* cb, void* param) {
  if (tmq == NULL) {
    tqErrorC("invalid tmq handle, null");
    if (cb != NULL) {
      cb(tmq, TSDB_CODE_INVALID_PARA, param);
    }
    return;
  }

  if (pRes != NULL) {
    // only commit one offset
    asyncCommitFromResult(tmq, pRes, cb, param);
  } else {
    // here needs to commit all offsets.
    asyncCommitAllOffsets(tmq, cb, param);
  }
}
2682

2683
// Commit callback used by the synchronous commit functions: stores the commit
// result in the SSyncCommitInfo passed as `param` and posts its semaphore so the
// waiting caller can continue.
static void commitCallBackFn(tmq_t* UNUSED_PARAM(tmq), int32_t code, void* param) {
  SSyncCommitInfo* pSync = (SSyncCommitInfo*)param;

  pSync->code = code;
  const int32_t postRet = tsem2_post(&pSync->sem);
  if (postRet != 0) {
    tqErrorC("failed to post rsp sem in commit cb");
  }
}
660✔
2690

2691
// Commit offsets and wait for the outcome.
//   pRes == NULL  commit the offsets of all vgroups
//   pRes != NULL  commit only the offset belonging to that result
// The commit itself runs asynchronously; this function blocks on a semaphore that
// commitCallBackFn posts. Returns TSDB_CODE_INVALID_PARA for a NULL handle, terrno
// when the helper structure cannot be allocated, the tsem2_init error, or the code
// delivered to the callback.
int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* pRes) {
  if (tmq == NULL) {
    tqErrorC("invalid tmq handle, null");
    return TSDB_CODE_INVALID_PARA;
  }

  SSyncCommitInfo* pSync = taosMemoryMalloc(sizeof(SSyncCommitInfo));
  if (pSync == NULL) {
    tqErrorC("failed to allocate memory for sync commit");
    return terrno;
  }

  int32_t code = tsem2_init(&pSync->sem, 0, 0);
  if (code != 0) {
    tqErrorC("failed to init sem for sync commit");
    taosMemoryFree(pSync);
    return code;
  }
  pSync->code = 0;

  if (pRes != NULL) {
    asyncCommitFromResult(tmq, pRes, commitCallBackFn, pSync);
  } else {
    asyncCommitAllOffsets(tmq, commitCallBackFn, pSync);
  }

  // block until the callback has stored the result
  if (tsem2_wait(&pSync->sem) != 0) {
    tqErrorC("failed to wait sem for sync commit");
  }
  code = pSync->code;

  if (tsem2_destroy(&pSync->sem) != 0) {
    tqErrorC("failed to destroy sem for sync commit");
  }
  taosMemoryFree(pSync);

  tqInfoC("consumer:0x%" PRIx64 " sync res commit done, code:%s", tmq->consumerId, tstrerror(code));
  return code;
}
2732

2733
// wal range will be ok after calling tmq_get_topic_assignment or poll interface
2734
// Validate `value` against the WAL version range cached in `offset`.
// Returns TSDB_CODE_TMQ_NEED_INITIALIZED while the range is still unset (either
// bound is -1), TSDB_CODE_TMQ_VERSION_OUT_OF_RANGE when value lies outside
// [walVerBegin, walVerEnd], and 0 otherwise. A value of -1 is not range-checked.
static int32_t checkWalRange(SVgOffsetInfo* offset, int64_t value) {
  const bool rangeUnset = (offset->walVerBegin == -1) || (offset->walVerEnd == -1);
  if (rangeUnset) {
    tqErrorC("Assignment or poll interface need to be called first");
    return TSDB_CODE_TMQ_NEED_INITIALIZED;
  }

  if (value == -1) {
    return 0;
  }
  if (value >= offset->walVerBegin && value <= offset->walVerEnd) {
    return 0;
  }

  tqErrorC("invalid seek params, offset:%" PRId64 ", valid range:[%" PRId64 ", %" PRId64 "]", value,
           offset->walVerBegin, offset->walVerEnd);
  return TSDB_CODE_TMQ_VERSION_OUT_OF_RANGE;
}
2748

2749
// Commit an explicit WAL offset for one vgroup of a topic and wait for the result.
//   pTopicName  topic name without account prefix; "<acctId>." is prepended here
//   vgId        target vgroup
//   offset      WAL version to commit; validated with checkWalRange
// Returns TSDB_CODE_INVALID_PARA for NULL arguments, the lookup or range-check error,
// terrno on allocation failure, the tsem2_init error, or the commit result.
// TSDB_CODE_TMQ_SAME_COMMITTED_VALUE is reported as success.
int32_t tmq_commit_offset_sync(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_t offset) {
  if (tmq == NULL || pTopicName == NULL) {
    tqErrorC("invalid tmq handle, null");
    return TSDB_CODE_INVALID_PARA;
  }

  // full topic name: "<account id>.<topic>"
  char tname[TSDB_TOPIC_FNAME_LEN] = {0};
  (void)snprintf(tname, TSDB_TOPIC_FNAME_LEN, "%d.%s", tmq->pTscObj->acctId, pTopicName);

  // look the vgroup up and validate the offset under the lock
  SMqClientVg* pVg = NULL;
  taosWLockLatch(&tmq->lock);
  int32_t code = getClientVg(tmq, tname, vgId, &pVg);
  if (code == 0) {
    code = checkWalRange(&pVg->offsetInfo, offset);
  }
  taosWUnLockLatch(&tmq->lock);
  if (code != 0) {
    return code;
  }

  STqOffsetVal offsetVal = {.type = TMQ_OFFSET__LOG, .version = offset};

  SSyncCommitInfo* pSync = taosMemoryMalloc(sizeof(SSyncCommitInfo));
  if (pSync == NULL) {
    tqErrorC("consumer:0x%" PRIx64 " failed to prepare seek operation", tmq->consumerId);
    return terrno;
  }

  code = tsem2_init(&pSync->sem, 0, 0);
  if (code != 0) {
    taosMemoryFree(pSync);
    return code;
  }
  pSync->code = 0;

  code = asyncCommitOffset(tmq, tname, vgId, &offsetVal, commitCallBackFn, pSync);
  if (code == 0) {
    // the request went out: wait for the callback to deliver the result
    if (tsem2_wait(&pSync->sem) != 0) {
      tqErrorC("failed to wait sem for sync commit offset");
    }
    code = pSync->code;
  }

  if (code == TSDB_CODE_TMQ_SAME_COMMITTED_VALUE) {
    code = TSDB_CODE_SUCCESS;
  }

  if (tsem2_destroy(&pSync->sem) != 0) {
    tqErrorC("failed to destroy sem for sync commit offset");
  }
  taosMemoryFree(pSync);

  tqInfoC("consumer:0x%" PRIx64 " sync send commit to vgId:%d, offset:%" PRId64 " code:%s", tmq->consumerId, vgId,
          offset, tstrerror(code));

  return code;
}
2809

2810
// Asynchronously commit an explicit WAL offset for one vgroup of a topic.
//   pTopicName  topic name without account prefix; "<acctId>." is prepended here
//   vgId        target vgroup
//   offset      WAL version to commit; validated with checkWalRange
//   cb/param    completion callback and its argument
// When a step in this function fails, `cb` (if not NULL) is invoked here with the
// failure code; TSDB_CODE_TMQ_SAME_COMMITTED_VALUE is passed on as success.
void tmq_commit_offset_async(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb* cb,
                             void* param) {
  int32_t code = 0;
  if (tmq == NULL || pTopicName == NULL) {
    tqErrorC("invalid tmq handle, null");
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  // full topic name: "<account id>.<topic>"
  char tname[TSDB_TOPIC_FNAME_LEN] = {0};
  (void)snprintf(tname, TSDB_TOPIC_FNAME_LEN, "%d.%s", tmq->pTscObj->acctId, pTopicName);

  // look the vgroup up and validate the offset under the lock
  SMqClientVg* pVg = NULL;
  taosWLockLatch(&tmq->lock);
  code = getClientVg(tmq, tname, vgId, &pVg);
  if (code == 0) {
    code = checkWalRange(&pVg->offsetInfo, offset);
  }
  taosWUnLockLatch(&tmq->lock);
  if (code != 0) {
    goto end;
  }

  STqOffsetVal offsetVal = {.type = TMQ_OFFSET__LOG, .version = offset};

  code = asyncCommitOffset(tmq, tname, vgId, &offsetVal, cb, param);

  tqInfoC("consumer:0x%" PRIx64 " async send commit to vgId:%d, offset:%" PRId64 " code:%s", tmq->consumerId, vgId,
          offset, tstrerror(code));

end:
  if (code != 0 && cb != NULL) {
    if (code == TSDB_CODE_TMQ_SAME_COMMITTED_VALUE) {
      code = TSDB_CODE_SUCCESS;
    }
    cb(tmq, code, param);
  }
}
20✔
2852

2853

2854
// Advance the result iterator to the next data block and prepare resInfo for it.
//   res          a TMQ result object (SMqRspObj)
//   convertUcs4  forwarded to setResultDataPtr
//   pResInfo     receives &pRspObj->resInfo on success; not written otherwise
// Returns 0 on success, TSDB_CODE_TSC_INTERNAL_ERROR when the iterator has moved
// past the last block, or the error from setResSchemaInfo / setResultDataPtr.
int32_t tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4, SReqResultInfo** pResInfo) {
  SMqRspObj*  pRspObj = (SMqRspObj*)res;
  SMqDataRsp* data = &pRspObj->dataRsp;

  pRspObj->resIter++;
  if (pRspObj->resIter >= data->blockNum) {
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  if (data->withSchema) {
    // the block brings its own schema: rebuild the schema part of resInfo
    doFreeReqResultInfo(&pRspObj->resInfo);
    SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(data->blockSchema, pRspObj->resIter);
    if (pSW) {
      TAOS_CHECK_RETURN(setResSchemaInfo(&pRspObj->resInfo, pSW->pSchema, pSW->nCols));
    }
  }

  void*   pRetrieve = taosArrayGetP(data->blockData, pRspObj->resIter);
  void*   rawData = NULL;
  int64_t rows = 0;
  int32_t precision = 0;
  tmqGetRawDataRowsPrecisionFromRes(pRetrieve, &rawData, &rows, &precision);

  SReqResultInfo* pInfo = &pRspObj->resInfo;
  pInfo->pData = rawData;
  pInfo->numOfRows = rows;
  pInfo->current = 0;
  pInfo->precision = precision;
  pInfo->totalRows += pInfo->numOfRows;

  int32_t code = setResultDataPtr(pInfo, convertUcs4);
  if (code == 0) {
    *pResInfo = pInfo;
  }
  return code;
}
2890

2891
// Response callback for one per-vgroup WAL-info request.
//   param  SMqVgWalInfoParam of the request; its pCommon is shared by all requests
//   pMsg   response buffer; pData and pEpSet are released here
//   code   transport/server result of the request
// On success the response is decoded and a tmq_topic_assignment
// {begin = walsver, end = walever + 1, currentOffset, vgId} is appended to
// pCommon->pList under pCommon->mutex. A failure is recorded in pCommon->code.
// The callback that completes the last outstanding request posts pCommon->rsp.
// Returns the resulting code.
//
// Changes compared with the previous version:
//   * pCommon->numOfRsp is bumped only after this callback has finished using
//     pCommon. Before, it was bumped first, so the callback counted last could
//     post the semaphore while an earlier one was still appending to pList or
//     writing pCommon->code; the thread woken by that post may release pCommon
//     (see destroyCommonInfo).
//   * pCommon->code is written only on failure (and under the mutex), so a later
//     successful response no longer erases an earlier error.
//     NOTE(review): relies on pCommon->code starting at 0 - confirm at the
//     allocation site.
//   * a missing or too-short response buffer is rejected instead of dereferenced.
//   * the response buffers are also released when `param` is NULL.
static int32_t tmqGetWalInfoCb(void* param, SDataBuf* pMsg, int32_t code) {
  if (param == NULL) {
    // nobody to report to; release the buffers the same way the normal path does
    if (pMsg) {
      taosMemoryFree(pMsg->pData);
      taosMemoryFree(pMsg->pEpSet);
    }
    return code;
  }
  SMqVgWalInfoParam* pParam = param;
  SMqVgCommon*       pCommon = pParam->pCommon;
  const int32_t      totalReq = pParam->totalReq;

  if (code != TSDB_CODE_SUCCESS) {
    tqErrorC("consumer:0x%" PRIx64 " failed to get the wal info from vgId:%d for topic:%s", pCommon->consumerId,
             pParam->vgId, pCommon->pTopicName);
  } else if (pMsg == NULL || pMsg->pData == NULL || (size_t)pMsg->len < sizeof(SMqRspHead)) {
    tqErrorC("consumer:0x%" PRIx64 " invalid wal info rsp from vgId:%d for topic:%s", pCommon->consumerId,
             pParam->vgId, pCommon->pTopicName);
    code = TSDB_CODE_TSC_INTERNAL_ERROR;
  } else {
    SMqDataRsp rsp = {0};
    SDecoder   decoder = {0};
    // the payload follows the SMqRspHead at the start of the buffer
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    code = tDecodeMqDataRsp(&decoder, &rsp);
    tDecoderClear(&decoder);
    if (code != 0) {
      goto END;
    }

    SMqRspHead*          pHead = pMsg->pData;
    tmq_topic_assignment assignment = {.begin = pHead->walsver,
                                       .end = pHead->walever + 1,
                                       .currentOffset = rsp.rspOffset.version,
                                       .vgId = pParam->vgId};

    (void)taosThreadMutexLock(&pCommon->mutex);
    if (taosArrayPush(pCommon->pList, &assignment) == NULL) {
      tqErrorC("consumer:0x%" PRIx64 " failed to push the wal info from vgId:%d for topic:%s", pCommon->consumerId,
               pParam->vgId, pCommon->pTopicName);
      code = TSDB_CODE_TSC_INTERNAL_ERROR;
    }
    (void)taosThreadMutexUnlock(&pCommon->mutex);
  }

END:
  if (code != 0) {
    // keep failures sticky: a success must not overwrite an earlier error
    (void)taosThreadMutexLock(&pCommon->mutex);
    pCommon->code = code;
    (void)taosThreadMutexUnlock(&pCommon->mutex);
  }

  // count this response last; pCommon must not be touched afterwards unless this
  // callback is the one that posts the semaphore
  int32_t total = atomic_add_fetch_32(&pCommon->numOfRsp, 1);
  if (total == totalReq) {
    if (tsem2_post(&pCommon->rsp) != 0) {
      tqErrorC("failed to post semaphore in get wal cb");
    }
  }

  if (pMsg) {
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
  }

  return code;
}
2943

2944
// Release everything owned by a SMqVgCommon: the assignment list, the response
// semaphore, the mutex, the topic name, and finally the structure itself.
// A NULL pointer is ignored.
static void destroyCommonInfo(SMqVgCommon* pCommon) {
  if (pCommon != NULL) {
    taosArrayDestroy(pCommon->pList);
    if (tsem2_destroy(&pCommon->rsp) != 0) {
      tqErrorC("failed to destroy semaphore for topic:%s", pCommon->pTopicName);
    }
    (void)taosThreadMutexDestroy(&pCommon->mutex);
    taosMemoryFree(pCommon->pTopicName);
    taosMemoryFree(pCommon);
  }
}
2956

2957
static bool isInSnapshotMode(int8_t type, bool useSnapshot) {
67✔
2958
  if ((type < TMQ_OFFSET__LOG && useSnapshot) || type > TMQ_OFFSET__LOG) {
67!
2959
    return true;
×
2960
  }
2961
  return false;
67✔
2962
}
2963

2964
/*
 * Response callback for the "committed offset" request sent by
 * getCommittedFromServer().
 *
 * param : the SMqCommittedParam shared with the waiting caller
 * pMsg  : response buffer; its pData and pEpSet are freed here (may be NULL)
 * code  : result code handed in by the sender
 *
 * On success the response is decoded into pParam->vgOffset. The final code is
 * stored in pParam->code, the waiter's semaphore is posted, and the same code
 * is returned.
 */
static int32_t tmCommittedCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqCommittedParam* pParam = param;

  if (code == 0 && pMsg != NULL) {
    SDecoder decoder = {0};
    tDecoderInit(&decoder, (uint8_t*)pMsg->pData, pMsg->len);
    int32_t err = tDecodeMqVgOffset(&decoder, &pParam->vgOffset);
    // clear the decoder on every path, including decode failure, so that the
    // error path no longer skips it
    tDecoderClear(&decoder);
    if (err < 0) {
      tOffsetDestroy(&pParam->vgOffset.offset);
      code = err;
    }
  }

  if (pMsg) {
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
  }

  // publish the result before waking the waiter
  pParam->code = code;
  if (tsem2_post(&pParam->sem) != 0) {
    tqErrorC("failed to post semaphore in tmCommittedCb");
  }
  return code;
}
2993

2994
/*
 * Ask the vnode identified by vgId/epSet for the committed offset of this
 * consumer group on topic `tname`, and block until the response arrives.
 *
 * tmq   : consumer handle (consumerId, groupId and transporter are read)
 * tname : full topic name used to build the subscribe key
 * vgId  : target vgroup id
 * epSet : endpoint set the request is sent to
 *
 * Returns the committed WAL version when the server reports an offset of type
 * TMQ_OFFSET__LOG, TSDB_CODE_TMQ_SNAPSHOT_ERROR for any other offset type, or
 * the error code produced while building, sending or answering the request.
 */
int64_t getCommittedFromServer(tmq_t* tmq, char* tname, int32_t vgId, SEpSet* epSet) {
  int32_t     code = 0;
  SMqVgOffset pOffset = {0};

  pOffset.consumerId = tmq->consumerId;
  (void)snprintf(pOffset.offset.subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", tmq->groupId, TMQ_SEPARATOR, tname);

  int32_t len = 0;
  tEncodeSize(tEncodeMqVgOffset, &pOffset, len, code);
  if (code < 0) {
    return TSDB_CODE_INVALID_PARA;
  }

  void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
  if (buf == NULL) {
    return terrno;
  }

  ((SMsgHead*)buf)->vgId = htonl(vgId);

  // the encoded payload follows the message head
  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));

  SEncoder encoder = {0};
  tEncoderInit(&encoder, abuf, len);
  code = tEncodeMqVgOffset(&encoder, &pOffset);
  tEncoderClear(&encoder);
  if (code < 0) {
    taosMemoryFree(buf);
    return code;
  }

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    taosMemoryFree(buf);
    return terrno;
  }

  // zero-initialised so that vgOffset holds defined values even when the
  // callback finishes without decoding a response
  SMqCommittedParam* pParam = taosMemoryCalloc(1, sizeof(SMqCommittedParam));
  if (pParam == NULL) {
    taosMemoryFree(buf);
    taosMemoryFree(sendInfo);
    return terrno;
  }
  if (tsem2_init(&pParam->sem, 0, 0) != 0) {
    taosMemoryFree(buf);
    taosMemoryFree(sendInfo);
    taosMemoryFree(pParam);
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  sendInfo->msgInfo = (SDataBuf){.pData = buf, .len = sizeof(SMsgHead) + len, .handle = NULL};
  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = pParam;
  sendInfo->fp = tmCommittedCb;
  sendInfo->msgType = TDMT_VND_TMQ_VG_COMMITTEDINFO;

  code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, NULL, sendInfo);
  if (code != 0) {
    // NOTE(review): sendInfo/buf are not released here, as before; presumably
    // asyncSendMsgToServer owns them once called - confirm
    if (tsem2_destroy(&pParam->sem) != 0) {
      tqErrorC("failed to destroy semaphore in get committed from server1");
    }
    taosMemoryFree(pParam);
    return code;
  }

  if (tsem2_wait(&pParam->sem) != 0) {
    tqErrorC("failed to wait semaphore in get committed from server");
  }

  // keep the result in 64 bits: a WAL version must not be squeezed through the
  // 32-bit status code
  int64_t committed = pParam->code;
  if (pParam->code == TSDB_CODE_SUCCESS) {
    if (pParam->vgOffset.offset.val.type == TMQ_OFFSET__LOG) {
      committed = pParam->vgOffset.offset.val.version;
    } else {
      tOffsetDestroy(&pParam->vgOffset.offset);
      committed = TSDB_CODE_TMQ_SNAPSHOT_ERROR;
    }
  }
  if (tsem2_destroy(&pParam->sem) != 0) {
    tqErrorC("failed to destroy semaphore in get committed from server2");
  }
  taosMemoryFree(pParam);

  return committed;
}
3080

3081
/*
 * Report the current consume position of `tmq` on vgroup `vgId` of topic
 * `pTopicName`.
 *
 * - When the local end offset is a WAL version (TMQ_OFFSET__LOG) that version
 *   is returned.
 * - When it is still a reset marker (earliest/latest) the committed offset is
 *   fetched from the server; if nothing has been committed
 *   (TSDB_CODE_TMQ_NO_COMMITTED) the local WAL begin or end is returned
 *   instead, otherwise the server's answer is returned as is.
 *
 * Returns TSDB_CODE_INVALID_PARA for NULL arguments,
 * TSDB_CODE_TMQ_SNAPSHOT_ERROR in snapshot mode, or the error code of the
 * vgroup lookup / WAL range check.
 */
int64_t tmq_position(tmq_t* tmq, const char* pTopicName, int32_t vgId) {
  if (tmq == NULL || pTopicName == NULL) {
    tqErrorC("invalid tmq handle, null");
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t accId = tmq->pTscObj->acctId;
  char    tname[TSDB_TOPIC_FNAME_LEN] = {0};
  (void)snprintf(tname, TSDB_TOPIC_FNAME_LEN, "%d.%s", accId, pTopicName);

  taosWLockLatch(&tmq->lock);

  SMqClientVg* pVg = NULL;
  int32_t      code = getClientVg(tmq, tname, vgId, &pVg);
  if (code != 0) {
    taosWUnLockLatch(&tmq->lock);
    return code;
  }

  SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo;
  int32_t        type = pOffsetInfo->endOffset.type;
  if (isInSnapshotMode(type, tmq->useSnapshot)) {
    tqErrorC("consumer:0x%" PRIx64 " offset type:%d not wal version, position error", tmq->consumerId, type);
    taosWUnLockLatch(&tmq->lock);
    return TSDB_CODE_TMQ_SNAPSHOT_ERROR;
  }

  code = checkWalRange(pOffsetInfo, -1);
  if (code != 0) {
    taosWUnLockLatch(&tmq->lock);
    return code;
  }

  // snapshot everything needed from the vgroup while the latch is still held;
  // pVg/pOffsetInfo are not touched after the unlock below
  SEpSet  epSet = pVg->epSet;
  int64_t begin = pVg->offsetInfo.walVerBegin;
  int64_t end = pVg->offsetInfo.walVerEnd;
  int64_t logVersion = (type == TMQ_OFFSET__LOG) ? pOffsetInfo->endOffset.version : 0;
  taosWUnLockLatch(&tmq->lock);

  int64_t position = 0;
  if (type == TMQ_OFFSET__LOG) {
    position = logVersion;
  } else if (type == TMQ_OFFSET__RESET_EARLIEST || type == TMQ_OFFSET__RESET_LATEST) {
    // 64-bit on purpose: the server answer may be a WAL version
    int64_t committed = getCommittedFromServer(tmq, tname, vgId, &epSet);
    if (committed == TSDB_CODE_TMQ_NO_COMMITTED) {
      if (type == TMQ_OFFSET__RESET_EARLIEST) {
        position = begin;
      } else if (type == TMQ_OFFSET__RESET_LATEST) {
        position = end;
      }
    } else {
      position = committed;
    }
  } else {
    tqErrorC("consumer:0x%" PRIx64 " offset type:%d can not be reach here", tmq->consumerId, type);
  }

  tqInfoC("consumer:0x%" PRIx64 " tmq_position vgId:%d position:%" PRId64, tmq->consumerId, vgId, position);
  return position;
}
3139

3140
/*
 * Report the committed offset of `tmq` on vgroup `vgId` of topic `pTopicName`.
 *
 * The locally cached committed offset is returned when it is a WAL version
 * (TMQ_OFFSET__LOG); otherwise the value is requested from the server.
 *
 * Returns TSDB_CODE_INVALID_PARA for NULL arguments,
 * TSDB_CODE_TMQ_SNAPSHOT_ERROR when either the end offset or the committed
 * offset is in snapshot mode, or the error code of the vgroup lookup.
 */
int64_t tmq_committed(tmq_t* tmq, const char* pTopicName, int32_t vgId) {
  if (tmq == NULL || pTopicName == NULL) {
    tqErrorC("invalid tmq handle, null");
    return TSDB_CODE_INVALID_PARA;
  }

  // full topic name is "<account id>.<topic>"
  char fullName[TSDB_TOPIC_FNAME_LEN] = {0};
  (void)snprintf(fullName, TSDB_TOPIC_FNAME_LEN, "%d.%s", tmq->pTscObj->acctId, pTopicName);

  taosWLockLatch(&tmq->lock);

  SMqClientVg* pClientVg = NULL;
  int32_t      rc = getClientVg(tmq, fullName, vgId, &pClientVg);
  if (rc != 0) {
    taosWUnLockLatch(&tmq->lock);
    return rc;
  }

  SVgOffsetInfo* pInfo = &pClientVg->offsetInfo;

  if (isInSnapshotMode(pInfo->endOffset.type, tmq->useSnapshot)) {
    tqErrorC("consumer:0x%" PRIx64 " offset type:%d not wal version, committed error", tmq->consumerId,
             pInfo->endOffset.type);
    taosWUnLockLatch(&tmq->lock);
    return TSDB_CODE_TMQ_SNAPSHOT_ERROR;
  }

  if (isInSnapshotMode(pInfo->committedOffset.type, tmq->useSnapshot)) {
    tqErrorC("consumer:0x%" PRIx64 " offset type:%d not wal version, committed error", tmq->consumerId,
             pInfo->committedOffset.type);
    taosWUnLockLatch(&tmq->lock);
    return TSDB_CODE_TMQ_SNAPSHOT_ERROR;
  }

  int64_t committed = 0;
  if (pInfo->committedOffset.type == TMQ_OFFSET__LOG) {
    // cached locally, no round trip needed
    committed = pInfo->committedOffset.version;
    taosWUnLockLatch(&tmq->lock);
  } else {
    // copy the endpoint set under the latch, then query without holding it
    SEpSet target = pClientVg->epSet;
    taosWUnLockLatch(&tmq->lock);
    committed = getCommittedFromServer(tmq, fullName, vgId, &target);
  }

  tqInfoC("consumer:0x%" PRIx64 " tmq_committed vgId:%d committed:%" PRId64, tmq->consumerId, vgId, committed);
  return committed;
}
3189

3190
/*
 * Build the list of vgroup assignments (vgId, WAL begin/end, current offset)
 * of topic `pTopicName` for consumer `tmq`.
 *
 * tmq             : consumer handle
 * pTopicName      : topic name without the account prefix
 * assignment      : out; array allocated here, released by the caller with
 *                   tmq_free_assignment(); NULL on failure
 * numOfAssignment : out; number of entries in *assignment; 0 on failure
 *
 * When every vgroup already has a WAL-version begin offset, the answer is
 * assembled from local state. Otherwise a WAL-info request is sent to every
 * vgroup, the call blocks until the callbacks have posted the shared
 * semaphore, and the returned begin/end versions are also written back into
 * the local vgroup state.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments,
 * TSDB_CODE_TMQ_SNAPSHOT_ERROR when a vgroup is in snapshot mode, or the
 * error code of the failing step. The consumer latch is held for the whole
 * operation after argument validation.
 */
int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_assignment** assignment,
                                 int32_t* numOfAssignment) {
  if (tmq == NULL || pTopicName == NULL || assignment == NULL || numOfAssignment == NULL) {
    tqErrorC("invalid tmq handle, null");
    return TSDB_CODE_INVALID_PARA;
  }
  *numOfAssignment = 0;
  *assignment = NULL;
  SMqVgCommon* pCommon = NULL;

  int32_t accId = tmq->pTscObj->acctId;
  char    tname[TSDB_TOPIC_FNAME_LEN] = {0};
  (void)snprintf(tname, TSDB_TOPIC_FNAME_LEN, "%d.%s", accId, pTopicName);

  taosWLockLatch(&tmq->lock);

  SMqClientTopic* pTopic = NULL;
  int32_t         code = getTopicByName(tmq, tname, &pTopic);
  if (code != 0) {
    tqErrorC("consumer:0x%" PRIx64 " invalid topic name:%s", tmq->consumerId, pTopicName);
    goto end;
  }

  // in case of snapshot is opened, no valid offset will return
  *numOfAssignment = taosArrayGetSize(pTopic->vgs);
  for (int32_t j = 0; j < (*numOfAssignment); ++j) {
    SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j);
    if (pClientVg == NULL) {
      continue;
    }
    int32_t type = pClientVg->offsetInfo.beginOffset.type;
    if (isInSnapshotMode(type, tmq->useSnapshot)) {
      tqErrorC("consumer:0x%" PRIx64 " offset type:%d not wal version, assignment not allowed", tmq->consumerId, type);
      code = TSDB_CODE_TMQ_SNAPSHOT_ERROR;
      goto end;
    }
  }

  *assignment = taosMemoryCalloc(*numOfAssignment, sizeof(tmq_topic_assignment));
  if (*assignment == NULL) {
    tqErrorC("consumer:0x%" PRIx64 " failed to malloc buffer, size:%" PRIzu, tmq->consumerId,
             (*numOfAssignment) * sizeof(tmq_topic_assignment));
    code = terrno;
    goto end;
  }

  bool needFetch = false;

  // fast path: fill from local state; give up as soon as one vgroup has no
  // WAL-version begin offset yet
  for (int32_t j = 0; j < (*numOfAssignment); ++j) {
    SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j);
    if (pClientVg == NULL) {
      continue;
    }
    if (pClientVg->offsetInfo.beginOffset.type != TMQ_OFFSET__LOG) {
      needFetch = true;
      break;
    }

    tmq_topic_assignment* pAssignment = &(*assignment)[j];
    pAssignment->currentOffset = pClientVg->offsetInfo.beginOffset.version;
    pAssignment->begin = pClientVg->offsetInfo.walVerBegin;
    pAssignment->end = pClientVg->offsetInfo.walVerEnd;
    pAssignment->vgId = pClientVg->vgId;
    tqInfoC("consumer:0x%" PRIx64 " get assignment from local:%d->%" PRId64, tmq->consumerId, pAssignment->vgId,
            pAssignment->currentOffset);
  }

  if (needFetch) {
    pCommon = taosMemoryCalloc(1, sizeof(SMqVgCommon));
    if (pCommon == NULL) {
      code = terrno;
      goto end;
    }

    // Until the semaphore and mutex exist, failures are cleaned up right here
    // and pCommon is reset to NULL, so destroyCommonInfo() at `end` never
    // destroys synchronisation objects that were not initialised.
    pCommon->pList = taosArrayInit(4, sizeof(tmq_topic_assignment));
    if (pCommon->pList == NULL) {
      code = terrno;
      taosMemoryFree(pCommon);
      pCommon = NULL;
      goto end;
    }

    code = tsem2_init(&pCommon->rsp, 0, 0);
    if (code != 0) {
      taosArrayDestroy(pCommon->pList);
      taosMemoryFree(pCommon);
      pCommon = NULL;
      goto end;
    }
    (void)taosThreadMutexInit(&pCommon->mutex, 0);

    // from here on pCommon is fully initialised and may go through
    // destroyCommonInfo()
    pCommon->pTopicName = taosStrdup(pTopic->topicName);
    if (pCommon->pTopicName == NULL) {
      code = terrno;
      goto end;
    }
    pCommon->consumerId = tmq->consumerId;

    // NOTE(review): if a failure below happens after earlier requests were
    // sent, pCommon is destroyed at `end` while those requests may still
    // reference it - lifetime should be confirmed against the callback.
    // NOTE(review): totalReq counts every vgroup; a skipped NULL entry would
    // leave the wait below short of responses - confirm it cannot occur.
    for (int32_t i = 0; i < (*numOfAssignment); ++i) {
      SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i);
      if (pClientVg == NULL) {
        continue;
      }
      SMqVgWalInfoParam* pParam = taosMemoryMalloc(sizeof(SMqVgWalInfoParam));
      if (pParam == NULL) {
        code = terrno;
        goto end;
      }

      pParam->epoch = tmq->epoch;
      pParam->vgId = pClientVg->vgId;
      pParam->totalReq = *numOfAssignment;
      pParam->pCommon = pCommon;

      SMqPollReq req = {0};
      tmqBuildConsumeReqImpl(&req, tmq, 10, pTopic, pClientVg);
      req.reqOffset = pClientVg->offsetInfo.beginOffset;

      // first pass computes the serialized size, second pass fills the buffer
      int32_t msgSize = tSerializeSMqPollReq(NULL, 0, &req);
      if (msgSize < 0) {
        taosMemoryFree(pParam);
        code = msgSize;
        goto end;
      }

      char* msg = taosMemoryCalloc(1, msgSize);
      if (NULL == msg) {
        taosMemoryFree(pParam);
        code = terrno;
        goto end;
      }

      msgSize = tSerializeSMqPollReq(msg, msgSize, &req);
      if (msgSize < 0) {
        taosMemoryFree(msg);
        taosMemoryFree(pParam);
        code = msgSize;
        goto end;
      }

      SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
      if (sendInfo == NULL) {
        taosMemoryFree(pParam);
        taosMemoryFree(msg);
        code = terrno;
        goto end;
      }

      sendInfo->msgInfo = (SDataBuf){.pData = msg, .len = msgSize, .handle = NULL};
      sendInfo->requestId = req.reqId;
      sendInfo->requestObjRefId = 0;
      sendInfo->param = pParam;
      sendInfo->paramFreeFp = taosAutoMemoryFree;
      sendInfo->fp = tmqGetWalInfoCb;
      sendInfo->msgType = TDMT_VND_TMQ_VG_WALINFO;

      char offsetFormatBuf[TSDB_OFFSET_LEN] = {0};
      tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pClientVg->offsetInfo.beginOffset);

      tqInfoC("consumer:0x%" PRIx64 " %s retrieve wal info vgId:%d, epoch %d, req:%s,QID:0x%" PRIx64, tmq->consumerId,
              pTopic->topicName, pClientVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId);
      code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pClientVg->epSet, NULL, sendInfo);
      if (code != 0) {
        goto end;
      }
    }

    // block until the callbacks post the shared semaphore
    if (tsem2_wait(&pCommon->rsp) != 0) {
      tqErrorC("consumer:0x%" PRIx64 " failed to wait sem in get assignment", tmq->consumerId);
    }
    code = pCommon->code;

    if (code != TSDB_CODE_SUCCESS) {
      goto end;
    }

    // hand the collected results to the caller
    int32_t num = taosArrayGetSize(pCommon->pList);
    for (int32_t i = 0; i < num; ++i) {
      (*assignment)[i] = *(tmq_topic_assignment*)taosArrayGet(pCommon->pList, i);
    }
    *numOfAssignment = num;

    // write the fetched WAL range back into the matching local vgroup
    for (int32_t j = 0; j < (*numOfAssignment); ++j) {
      tmq_topic_assignment* p = &(*assignment)[j];

      for (int32_t i = 0; i < taosArrayGetSize(pTopic->vgs); ++i) {
        SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i);
        if (pClientVg == NULL) {
          continue;
        }
        if (pClientVg->vgId != p->vgId) {
          continue;
        }

        SVgOffsetInfo* pOffsetInfo = &pClientVg->offsetInfo;
        tqInfoC("consumer:0x%" PRIx64 " %s vgId:%d offset is update to:%" PRId64, tmq->consumerId, pTopic->topicName,
                p->vgId, p->currentOffset);

        pOffsetInfo->walVerBegin = p->begin;
        pOffsetInfo->walVerEnd = p->end;
      }
    }
  }

end:
  if (code != TSDB_CODE_SUCCESS) {
    // never hand a partially filled array back to the caller
    taosMemoryFree(*assignment);
    *assignment = NULL;
    *numOfAssignment = 0;
  }
  destroyCommonInfo(pCommon);
  taosWUnLockLatch(&tmq->lock);
  return code;
}
3398

3399
/*
 * Release an assignment array obtained from tmq_get_topic_assignment().
 * A NULL pointer is accepted and ignored.
 */
void tmq_free_assignment(tmq_topic_assignment* pAssignment) {
  if (pAssignment != NULL) {
    taosMemoryFree(pAssignment);
  }
}
3406

3407
/*
 * Response callback for the seek request sent by tmq_offset_seek().
 *
 * The response buffers (pMsg->pData, pMsg->pEpSet) are always freed. Without
 * a param the incoming code is returned unchanged; otherwise the code is
 * stored in the SMqSeekParam, its semaphore is posted and 0 is returned.
 */
static int32_t tmqSeekCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqSeekParam* pSeek = param;

  if (pMsg != NULL) {
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
  }

  if (pSeek == NULL) {
    return code;
  }

  // record the outcome first, then wake the thread blocked on the semaphore
  pSeek->code = code;
  if (0 != tsem2_post(&pSeek->sem)) {
    tqErrorC("failed to post sem in tmqSeekCb");
  }
  return 0;
}
3422

3423
// seek interface have to send msg to server to cancel push handle if needed, because consumer may be in wait status if
3424
// there is no data to poll
3425
/*
 * Move the consume position of `tmq` on vgroup `vgId` of topic `pTopicName`
 * to WAL version `offset`.
 *
 * The local begin/end offsets are updated under the consumer latch, then a
 * seek request is sent to the vnode and the call blocks until the response
 * callback reports the result.
 *
 * Returns the code reported by the seek response, TSDB_CODE_INVALID_PARA for
 * NULL arguments, TSDB_CODE_TMQ_SNAPSHOT_ERROR in snapshot mode, or the error
 * code of the failing step (vgroup lookup, WAL range check, serialization,
 * allocation, send).
 */
int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_t offset) {
  if (tmq == NULL || pTopicName == NULL) {
    tqErrorC("invalid tmq handle, null");
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t accId = tmq->pTscObj->acctId;
  char    tname[TSDB_TOPIC_FNAME_LEN] = {0};
  (void)snprintf(tname, TSDB_TOPIC_FNAME_LEN, "%d.%s", accId, pTopicName);

  taosWLockLatch(&tmq->lock);

  SMqClientVg* pVg = NULL;
  int32_t      code = getClientVg(tmq, tname, vgId, &pVg);
  if (code != 0) {
    taosWUnLockLatch(&tmq->lock);
    return code;
  }

  SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo;

  int32_t type = pOffsetInfo->endOffset.type;
  if (isInSnapshotMode(type, tmq->useSnapshot)) {
    tqErrorC("consumer:0x%" PRIx64 " offset type:%d not wal version, seek not allowed", tmq->consumerId, type);
    taosWUnLockLatch(&tmq->lock);
    return TSDB_CODE_TMQ_SNAPSHOT_ERROR;
  }

  code = checkWalRange(pOffsetInfo, offset);
  if (code != 0) {
    taosWUnLockLatch(&tmq->lock);
    return code;
  }

  tqInfoC("consumer:0x%" PRIx64 " seek to %" PRId64 " on vgId:%d", tmq->consumerId, offset, vgId);
  // update the offset, and then commit to vnode
  pOffsetInfo->endOffset.type = TMQ_OFFSET__LOG;
  pOffsetInfo->endOffset.version = offset;
  pOffsetInfo->beginOffset = pOffsetInfo->endOffset;
  pVg->seekUpdated = true;
  // copy the endpoint set while the latch is held; pVg is not used afterwards
  SEpSet epSet = pVg->epSet;
  taosWUnLockLatch(&tmq->lock);

  SMqSeekReq req = {0};
  (void)snprintf(req.subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s:%s", tmq->groupId, tname);
  req.head.vgId = vgId;
  req.consumerId = tmq->consumerId;

  // first pass computes the serialized size, second pass fills the buffer
  int32_t msgSize = tSerializeSMqSeekReq(NULL, 0, &req);
  if (msgSize < 0) {
    return TSDB_CODE_PAR_INTERNAL_ERROR;
  }

  char* msg = taosMemoryCalloc(1, msgSize);
  if (NULL == msg) {
    return terrno;
  }

  if (tSerializeSMqSeekReq(msg, msgSize, &req) < 0) {
    taosMemoryFree(msg);
    return TSDB_CODE_PAR_INTERNAL_ERROR;
  }

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    taosMemoryFree(msg);
    return terrno;
  }

  SMqSeekParam* pParam = taosMemoryMalloc(sizeof(SMqSeekParam));
  if (pParam == NULL) {
    taosMemoryFree(msg);
    taosMemoryFree(sendInfo);
    return terrno;
  }
  if (tsem2_init(&pParam->sem, 0, 0) != 0) {
    taosMemoryFree(msg);
    taosMemoryFree(sendInfo);
    taosMemoryFree(pParam);
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  sendInfo->msgInfo = (SDataBuf){.pData = msg, .len = msgSize, .handle = NULL};
  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = pParam;
  sendInfo->fp = tmqSeekCb;
  sendInfo->msgType = TDMT_VND_TMQ_SEEK;

  code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo);
  if (code != 0) {
    // NOTE(review): sendInfo/msg are not released here, as before; presumably
    // asyncSendMsgToServer owns them once called - confirm
    if (tsem2_destroy(&pParam->sem) != 0) {
      // the space after the id keeps the hex value from running into the text
      tqErrorC("consumer:0x%" PRIx64 " destroy rsp sem failed in seek offset", tmq->consumerId);
    }
    taosMemoryFree(pParam);
    return code;
  }

  if (tsem2_wait(&pParam->sem) != 0) {
    tqErrorC("consumer:0x%" PRIx64 " wait rsp sem failed in seek offset", tmq->consumerId);
  }
  code = pParam->code;
  if (tsem2_destroy(&pParam->sem) != 0) {
    tqErrorC("consumer:0x%" PRIx64 " destroy rsp sem failed in seek offset", tmq->consumerId);
  }
  taosMemoryFree(pParam);

  tqInfoC("consumer:0x%" PRIx64 " send seek to vgId:%d, return code:%s", tmq->consumerId, vgId, tstrerror(code));

  return code;
}
3536

3537
/*
 * Return the connection handle behind a consumer: the address of the `id`
 * field of tmq->pTscObj, cast to TAOS*. Returns NULL when `tmq` or its
 * pTscObj is NULL.
 */
TAOS* tmq_get_connect(tmq_t* tmq) {
  if (tmq == NULL || tmq->pTscObj == NULL) {
    return NULL;
  }
  return (TAOS*)(&(tmq->pTscObj->id));
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc