• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3531

19 Nov 2024 10:42AM UTC coverage: 60.213% (-0.006%) from 60.219%
#3531

push

travis-ci

web-flow
Merge pull request #28777 from taosdata/fix/3.0/TD-32366

fix:TD-32366/stmt add geometry datatype check

118529 of 252344 branches covered (46.97%)

Branch coverage included in aggregate %.

7 of 48 new or added lines in 3 files covered. (14.58%)

2282 existing lines in 115 files now uncovered.

199096 of 275161 relevant lines covered (72.36%)

6067577.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.38
/source/client/src/clientTmq.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "cJSON.h"
17
#include "clientInt.h"
18
#include "clientLog.h"
19
#include "parser.h"
20
#include "tdatablock.h"
21
#include "tdef.h"
22
#include "tglobal.h"
23
#include "tqueue.h"
24
#include "tref.h"
25
#include "ttimer.h"
26

27
#define tqErrorC(...) do { if (cDebugFlag & DEBUG_ERROR || tqClientDebugFlag & DEBUG_ERROR) { taosPrintLog("TQ  ERROR ", DEBUG_ERROR, tqClientDebugFlag|cDebugFlag, __VA_ARGS__); }}     while(0)
28
#define tqInfoC(...)  do { if (cDebugFlag & DEBUG_INFO || tqClientDebugFlag & DEBUG_INFO)  { taosPrintLog("TQ  ", DEBUG_INFO, tqClientDebugFlag|cDebugFlag, __VA_ARGS__); }}            while(0)
29
#define tqDebugC(...) do { if (cDebugFlag & DEBUG_DEBUG || tqClientDebugFlag & DEBUG_DEBUG) { taosPrintLog("TQ  ", DEBUG_DEBUG, tqClientDebugFlag|cDebugFlag, __VA_ARGS__); }} while(0)
30

31
#define EMPTY_BLOCK_POLL_IDLE_DURATION 10
32
#define DEFAULT_AUTO_COMMIT_INTERVAL   5000
33
#define DEFAULT_HEARTBEAT_INTERVAL     3000
34
#define DEFAULT_ASKEP_INTERVAL         1000
35
#define DEFAULT_COMMIT_CNT             1
36
#define SUBSCRIBE_RETRY_MAX_COUNT      240
37
#define SUBSCRIBE_RETRY_INTERVAL       500
38

39

40
// Copy MSG into the error buffer visible at the expansion site.
// Expects `errstr` (char buffer pointer) and `errstrLen` (its size) to be in scope;
// does nothing when no buffer was supplied.
// MSG is passed as the argument of "%s" rather than as the format string itself, so a
// '%' inside the message is copied verbatim instead of being parsed as a conversion.
// NOTE(review): the expansion is a bare `if` that already ends in ';' (kept from the
// original definition in case call sites omit the semicolon) - avoid using it directly
// before an `else`.
#define SET_ERROR_MSG_TMQ(MSG) \
  if (errstr != NULL && errstrLen > 0) (void)snprintf(errstr, errstrLen, "%s", MSG);
42

43
#define PROCESS_POLL_RSP(FUNC,DATA) \
44
  SDecoder decoder = {0}; \
45
  tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead)); \
46
  if (FUNC(&decoder, DATA) < 0) { \
47
    tDecoderClear(&decoder); \
48
    code = TSDB_CODE_OUT_OF_MEMORY; \
49
    goto END;\
50
  }\
51
  tDecoderClear(&decoder);\
52
  (void)memcpy(DATA, pMsg->pData, sizeof(SMqRspHead));
53

54
#define DELETE_POLL_RSP(FUNC,DATA) \
55
  SMqPollRspWrapper* pRsp = &rspWrapper->pollRsp;\
56
  taosMemoryFreeClear(pRsp->pEpset);\
57
  FUNC(DATA);
58

59
enum {
60
  TMQ_VG_STATUS__IDLE = 0,
61
  TMQ_VG_STATUS__WAIT,
62
};
63

64
enum {
65
  TMQ_CONSUMER_STATUS__INIT = 0,
66
  TMQ_CONSUMER_STATUS__READY,
67
  TMQ_CONSUMER_STATUS__CLOSED,
68
};
69

70
enum {
71
  TMQ_DELAYED_TASK__ASK_EP = 1,
72
  TMQ_DELAYED_TASK__COMMIT,
73
};
74

75
typedef struct {
76
  tmr_h   timer;
77
  int32_t rsetId;
78
} SMqMgmt;
79

80
struct tmq_list_t {
81
  SArray container;
82
};
83

84
struct tmq_conf_t {
85
  char           clientId[TSDB_CLIENT_ID_LEN];
86
  char           groupId[TSDB_CGROUP_LEN];
87
  int8_t         autoCommit;
88
  int8_t         resetOffset;
89
  int8_t         withTbName;
90
  int8_t         snapEnable;
91
  int8_t         replayEnable;
92
  int8_t         sourceExcluded;  // do not consume, bit
93
  uint16_t       port;
94
  int32_t        autoCommitInterval;
95
  int32_t        sessionTimeoutMs;
96
  int32_t        heartBeatIntervalMs;
97
  int32_t        maxPollIntervalMs;
98
  char*          ip;
99
  char*          user;
100
  char*          pass;
101
  tmq_commit_cb* commitCb;
102
  void*          commitCbUserParam;
103
  int8_t         enableBatchMeta;
104
};
105

106
struct tmq_t {
107
  int64_t        refId;
108
  char           groupId[TSDB_CGROUP_LEN];
109
  char           clientId[TSDB_CLIENT_ID_LEN];
110
  char           user[TSDB_USER_LEN];
111
  char           fqdn[TSDB_FQDN_LEN];
112
  int8_t         withTbName;
113
  int8_t         useSnapshot;
114
  int8_t         autoCommit;
115
  int32_t        autoCommitInterval;
116
  int32_t        sessionTimeoutMs;
117
  int32_t        heartBeatIntervalMs;
118
  int32_t        maxPollIntervalMs;
119
  int8_t         resetOffsetCfg;
120
  int8_t         replayEnable;
121
  int8_t         sourceExcluded;  // do not consume, bit
122
  int64_t        consumerId;
123
  tmq_commit_cb* commitCb;
124
  void*          commitCbUserParam;
125
  int8_t         enableBatchMeta;
126

127
  // status
128
  SRWLatch lock;
129
  int8_t   status;
130
  int32_t  epoch;
131
  // poll info
132
  int64_t pollCnt;
133
  int64_t totalRows;
134
  int8_t  pollFlag;
135

136
  // timer
137
  tmr_h       hbLiveTimer;
138
  tmr_h       epTimer;
139
  tmr_h       commitTimer;
140
  STscObj*    pTscObj;       // connection
141
  SArray*     clientTopics;  // SArray<SMqClientTopic>
142
  STaosQueue* mqueue;        // queue of rsp
143
  STaosQall*  qall;
144
  STaosQueue* delayedTask;  // delayed task queue for heartbeat and auto commit
145
  tsem2_t     rspSem;
146
};
147

148
typedef struct {
149
  int32_t code;
150
  tsem2_t sem;
151
} SAskEpInfo;
152

153
typedef struct {
154
  STqOffsetVal committedOffset;
155
  STqOffsetVal endOffset;    // the last version in TAOS_RES + 1
156
  STqOffsetVal beginOffset;  // the first version in TAOS_RES
157
  int64_t      walVerBegin;
158
  int64_t      walVerEnd;
159
} SVgOffsetInfo;
160

161
typedef struct {
162
  int64_t       pollCnt;
163
  int64_t       numOfRows;
164
  SVgOffsetInfo offsetInfo;
165
  int32_t       vgId;
166
  int32_t       vgStatus;
167
  int32_t       vgSkipCnt;            // here used to mark the slow vgroups
168
  int64_t       emptyBlockReceiveTs;  // once empty block is received, idle for ignoreCnt then start to poll data
169
  int64_t       blockReceiveTs;       // once empty block is received, idle for ignoreCnt then start to poll data
170
  int64_t       blockSleepForReplay;  // once empty block is received, idle for ignoreCnt then start to poll data
171
  bool          seekUpdated;          // offset is updated by seek operator, therefore, not update by vnode rsp.
172
  SEpSet        epSet;
173
} SMqClientVg;
174

175
typedef struct {
176
  char           topicName[TSDB_TOPIC_FNAME_LEN];
177
  char           db[TSDB_DB_FNAME_LEN];
178
  SArray*        vgs;  // SArray<SMqClientVg>
179
  SSchemaWrapper schema;
180
  int8_t         noPrivilege;
181
} SMqClientTopic;
182

183
typedef struct {
184
  int32_t         vgId;
185
  char            topicName[TSDB_TOPIC_FNAME_LEN];
186
  SMqClientTopic* topicHandle;
187
  uint64_t        reqId;
188
  SEpSet*         pEpset;
189
  union {
190
    struct{
191
      SMqRspHead   head;
192
      STqOffsetVal rspOffset;
193
    };
194
    SMqDataRsp      dataRsp;
195
    SMqMetaRsp      metaRsp;
196
    SMqBatchMetaRsp batchMetaRsp;
197
  };
198
} SMqPollRspWrapper;
199

200
typedef struct {
201
  int32_t code;
202
  int8_t  tmqRspType;
203
  int32_t epoch;
204
  union{
205
    SMqPollRspWrapper pollRsp;
206
    SMqAskEpRsp       epRsp;
207
  };
208
} SMqRspWrapper;
209

210
typedef struct {
211
  tsem2_t rspSem;
212
  int32_t rspErr;
213
} SMqSubscribeCbParam;
214

215
typedef struct {
216
  int64_t refId;
217
  bool    sync;
218
  void*   pParam;
219
} SMqAskEpCbParam;
220

221
typedef struct {
222
  int64_t  refId;
223
  char     topicName[TSDB_TOPIC_FNAME_LEN];
224
  int32_t  vgId;
225
  uint64_t requestId;  // request id for debug purpose
226
} SMqPollCbParam;
227

228
typedef struct {
229
  tsem2_t       rsp;
230
  int32_t       numOfRsp;
231
  SArray*       pList;
232
  TdThreadMutex mutex;
233
  int64_t       consumerId;
234
  char*         pTopicName;
235
  int32_t       code;
236
} SMqVgCommon;
237

238
typedef struct {
239
  tsem2_t sem;
240
  int32_t code;
241
} SMqSeekParam;
242

243
typedef struct {
244
  tsem2_t     sem;
245
  int32_t     code;
246
  SMqVgOffset vgOffset;
247
} SMqCommittedParam;
248

249
typedef struct {
250
  int32_t      vgId;
251
  int32_t      epoch;
252
  int32_t      totalReq;
253
  SMqVgCommon* pCommon;
254
} SMqVgWalInfoParam;
255

256
typedef struct {
257
  int64_t        refId;
258
  int32_t        epoch;
259
  int32_t        waitingRspNum;
260
  int32_t        code;
261
  tmq_commit_cb* callbackFn;
262
  void*          userParam;
263
} SMqCommitCbParamSet;
264

265
typedef struct {
266
  SMqCommitCbParamSet* params;
267
  char                 topicName[TSDB_TOPIC_FNAME_LEN];
268
  int32_t              vgId;
269
  int64_t              consumerId;
270
} SMqCommitCbParam;
271

272
typedef struct {
273
  tsem2_t sem;
274
  int32_t code;
275
} SSyncCommitInfo;
276

277
typedef struct {
278
  STqOffsetVal currentOffset;
279
  STqOffsetVal commitOffset;
280
  STqOffsetVal seekOffset;
281
  int64_t      numOfRows;
282
  int32_t      vgStatus;
283
} SVgroupSaveInfo;
284

285
static   TdThreadOnce   tmqInit = PTHREAD_ONCE_INIT;  // initialize only once
286
volatile int32_t        tmqInitRes = 0;               // initialize rsp code
287
static   SMqMgmt        tmqMgmt = {0};
288

289
tmq_conf_t* tmq_conf_new() {
444✔
290
  tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
444✔
291
  if (conf == NULL) {
446!
292
    return conf;
×
293
  }
294

295
  conf->withTbName = false;
446✔
296
  conf->autoCommit = true;
446✔
297
  conf->autoCommitInterval = DEFAULT_AUTO_COMMIT_INTERVAL;
446✔
298
  conf->resetOffset = TMQ_OFFSET__RESET_LATEST;
446✔
299
  conf->enableBatchMeta = false;
446✔
300
  conf->heartBeatIntervalMs = DEFAULT_HEARTBEAT_INTERVAL;
446✔
301
  conf->maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL;
446✔
302
  conf->sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT;
446✔
303

304
  return conf;
446✔
305
}
306

307
// Release a configuration object together with the ip/user/pass strings it holds.
// A NULL `conf` is accepted and ignored.
void tmq_conf_destroy(tmq_conf_t* conf) {
  if (conf == NULL) {
    return;
  }

  if (conf->ip != NULL) taosMemoryFree(conf->ip);
  if (conf->user != NULL) taosMemoryFree(conf->user);
  if (conf->pass != NULL) taosMemoryFree(conf->pass);

  taosMemoryFree(conf);
}
446✔
321

322
tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
3,071✔
323
  int32_t code = 0;
3,071✔
324
  if (conf == NULL || key == NULL || value == NULL) {
3,071!
325
    tqErrorC("tmq_conf_set null, conf:%p key:%p value:%p", conf, key, value);
×
326
    return TMQ_CONF_INVALID;
×
327
  }
328
  if (strcasecmp(key, "group.id") == 0) {
3,081✔
329
    tstrncpy(conf->groupId, value, TSDB_CGROUP_LEN);
446✔
330
    return TMQ_CONF_OK;
446✔
331
  }
332

333
  if (strcasecmp(key, "client.id") == 0) {
2,635✔
334
    tstrncpy(conf->clientId, value, TSDB_CLIENT_ID_LEN);
57✔
335
    return TMQ_CONF_OK;
57✔
336
  }
337

338
  if (strcasecmp(key, "enable.auto.commit") == 0) {
2,578✔
339
    if (strcasecmp(value, "true") == 0) {
433✔
340
      conf->autoCommit = true;
216✔
341
      return TMQ_CONF_OK;
216✔
342
    } else if (strcasecmp(value, "false") == 0) {
217!
343
      conf->autoCommit = false;
217✔
344
      return TMQ_CONF_OK;
217✔
345
    } else {
346
      tqErrorC("invalid value for enable.auto.commit: %s", value);
×
347
      return TMQ_CONF_INVALID;
×
348
    }
349
  }
350

351
  if (strcasecmp(key, "auto.commit.interval.ms") == 0) {
2,145✔
352
    int64_t tmp;
353
    code = taosStr2int64(value, &tmp);
251✔
354
    if (tmp < 0 || code != 0) {
250!
355
      tqErrorC("invalid value for auto.commit.interval.ms: %s", value);
1!
356
      return TMQ_CONF_INVALID;
×
357
    }
358
    conf->autoCommitInterval = (tmp > INT32_MAX ? INT32_MAX : tmp);
249✔
359
    return TMQ_CONF_OK;
249✔
360
  }
361

362
  if (strcasecmp(key, "session.timeout.ms") == 0) {
1,894✔
363
    int64_t tmp;
364
    code = taosStr2int64(value, &tmp);
2✔
365
    if (tmp < 6000 || tmp > 1800000 || code != 0) {
2!
366
      tqErrorC("invalid value for session.timeout.ms: %s", value);
×
367
      return TMQ_CONF_INVALID;
×
368
    }
369
    conf->sessionTimeoutMs = tmp;
2✔
370
    return TMQ_CONF_OK;
2✔
371
  }
372

373
  if (strcasecmp(key, "heartbeat.interval.ms") == 0) {
1,892!
374
    int64_t tmp;
375
    code = taosStr2int64(value, &tmp);
×
376
    if (tmp < 1000 || tmp >= conf->sessionTimeoutMs || code != 0) {
×
377
      tqErrorC("invalid value for heartbeat.interval.ms: %s", value);
×
378
      return TMQ_CONF_INVALID;
×
379
    }
380
    conf->heartBeatIntervalMs = tmp;
×
381
    return TMQ_CONF_OK;
×
382
  }
383

384
  if (strcasecmp(key, "max.poll.interval.ms") == 0) {
1,892✔
385
    int32_t tmp;
386
    code = taosStr2int32(value, &tmp);
2✔
387
    if (tmp < 1000 || code != 0) {
2!
388
      tqErrorC("invalid value for max.poll.interval.ms: %s", value);
×
389
      return TMQ_CONF_INVALID;
×
390
    }
391
    conf->maxPollIntervalMs = tmp;
2✔
392
    return TMQ_CONF_OK;
2✔
393
  }
394

395
  if (strcasecmp(key, "auto.offset.reset") == 0) {
1,890✔
396
    if (strcasecmp(value, "none") == 0) {
414✔
397
      conf->resetOffset = TMQ_OFFSET__RESET_NONE;
11✔
398
      return TMQ_CONF_OK;
11✔
399
    } else if (strcasecmp(value, "earliest") == 0) {
403✔
400
      conf->resetOffset = TMQ_OFFSET__RESET_EARLIEST;
381✔
401
      return TMQ_CONF_OK;
381✔
402
    } else if (strcasecmp(value, "latest") == 0) {
22!
403
      conf->resetOffset = TMQ_OFFSET__RESET_LATEST;
22✔
404
      return TMQ_CONF_OK;
22✔
405
    } else {
UNCOV
406
      tqErrorC("invalid value for auto.offset.reset: %s", value);
×
407
      return TMQ_CONF_INVALID;
×
408
    }
409
  }
410

411
  if (strcasecmp(key, "msg.with.table.name") == 0) {
1,476✔
412
    if (strcasecmp(value, "true") == 0) {
391✔
413
      conf->withTbName = true;
373✔
414
      return TMQ_CONF_OK;
373✔
415
    } else if (strcasecmp(value, "false") == 0) {
18!
416
      conf->withTbName = false;
19✔
417
      return TMQ_CONF_OK;
19✔
418
    } else {
419
      tqErrorC("invalid value for msg.with.table.name: %s", value);
×
420
      return TMQ_CONF_INVALID;
×
421
    }
422
  }
423

424
  if (strcasecmp(key, "experimental.snapshot.enable") == 0) {
1,085✔
425
    if (strcasecmp(value, "true") == 0) {
146✔
426
      conf->snapEnable = true;
113✔
427
      return TMQ_CONF_OK;
113✔
428
    } else if (strcasecmp(value, "false") == 0) {
33!
429
      conf->snapEnable = false;
33✔
430
      return TMQ_CONF_OK;
33✔
431
    } else {
432
      tqErrorC("invalid value for experimental.snapshot.enable: %s", value);
×
433
      return TMQ_CONF_INVALID;
×
434
    }
435
  }
436

437
  if (strcasecmp(key, "td.connect.ip") == 0) {
939✔
438
    void *tmp = taosStrdup(value);
2✔
439
    if (tmp == NULL) {
2!
440
      tqErrorC("tmq_conf_set out of memory:%d", terrno);
×
441
      return TMQ_CONF_INVALID;
×
442
    }
443
    conf->ip = tmp;
2✔
444
    return TMQ_CONF_OK;
2✔
445
  }
446

447
  if (strcasecmp(key, "td.connect.user") == 0) {
937✔
448
    void *tmp = taosStrdup(value);
440✔
449
    if (tmp == NULL) {
443!
450
      tqErrorC("tmq_conf_set out of memory:%d", terrno);
×
451
      return TMQ_CONF_INVALID;
×
452
    }
453
    conf->user = tmp;
443✔
454
    return TMQ_CONF_OK;
443✔
455
  }
456

457
  if (strcasecmp(key, "td.connect.pass") == 0) {
497✔
458
    void *tmp = taosStrdup(value);
442✔
459
    if (tmp == NULL) {
442!
460
      tqErrorC("tmq_conf_set out of memory:%d", terrno);
×
461
      return TMQ_CONF_INVALID;
×
462
    }
463
    conf->pass = tmp;
442✔
464
    return TMQ_CONF_OK;
442✔
465
  }
466

467
  if (strcasecmp(key, "td.connect.port") == 0) {
55!
468
    int64_t tmp;
469
    code = taosStr2int64(value, &tmp);
×
470
    if (tmp <= 0 || tmp > 65535 || code != 0) {
×
471
      tqErrorC("invalid value for td.connect.port: %s", value);
×
472
      return TMQ_CONF_INVALID;
×
473
    }
474

475
    conf->port = tmp;
×
476
    return TMQ_CONF_OK;
×
477
  }
478

479
  if (strcasecmp(key, "enable.replay") == 0) {
55✔
480
    if (strcasecmp(value, "true") == 0) {
7!
481
      conf->replayEnable = true;
7✔
482
      return TMQ_CONF_OK;
7✔
483
    } else if (strcasecmp(value, "false") == 0) {
×
484
      conf->replayEnable = false;
×
485
      return TMQ_CONF_OK;
×
486
    } else {
487
      tqErrorC("invalid value for enable.replay: %s", value);
×
488
      return TMQ_CONF_INVALID;
×
489
    }
490
  }
491
  if (strcasecmp(key, "msg.consume.excluded") == 0) {
48✔
492
    int64_t tmp;
493
    code = taosStr2int64(value, &tmp);
40✔
494
    conf->sourceExcluded = (0 == code && tmp != 0) ? TD_REQ_FROM_TAOX : 0;
40!
495
    return TMQ_CONF_OK;
40✔
496
  }
497

498
  if (strcasecmp(key, "td.connect.db") == 0) {
8!
499
    return TMQ_CONF_OK;
×
500
  }
501

502
  if (strcasecmp(key, "msg.enable.batchmeta") == 0) {
8✔
503
    int64_t tmp;
504
    code = taosStr2int64(value, &tmp);
4✔
505
    conf->enableBatchMeta = (0 == code && tmp != 0) ? true : false;
4!
506
    return TMQ_CONF_OK;
4✔
507
  }
508

509
  tqErrorC("unknown key: %s", key);
4!
510
  return TMQ_CONF_UNKNOWN;
×
511
}
512

513
tmq_list_t* tmq_list_new() {
1,298✔
514
  return (tmq_list_t*)taosArrayInit(0, sizeof(void*));
1,298✔
515
}
516

517
// Append a heap copy of `src` to `list`.
// Returns 0 on success, TSDB_CODE_INVALID_PARA when `list` is NULL or `src` is
// NULL/empty, and terrno when duplicating the string or pushing it fails.
int32_t tmq_list_append(tmq_list_t* list, const char* src) {
  if (list == NULL || src == NULL || src[0] == 0) {
    return TSDB_CODE_INVALID_PARA;
  }

  char* pCopy = taosStrdup(src);
  if (pCopy == NULL) {
    return terrno;
  }

  // the array stores the pointer itself, hence &pCopy
  if (taosArrayPush(&list->container, &pCopy) != NULL) {
    return 0;
  }

  taosMemoryFree(pCopy);
  return terrno;
}
533

534
// Destroy a topic list, passing taosMemoryFree as the per-element destructor.
// A NULL list is ignored.
void tmq_list_destroy(tmq_list_t* list) {
  if (list != NULL) {
    taosArrayDestroyP(&list->container, taosMemoryFree);
  }
}
539

540
int32_t tmq_list_get_size(const tmq_list_t* list) {
7✔
541
  if (list == NULL) {
7!
542
    return TSDB_CODE_INVALID_PARA;
×
543
  }
544
  const SArray* container = &list->container;
7✔
545
  return taosArrayGetSize(container);
7✔
546
}
547

548
char** tmq_list_to_c_array(const tmq_list_t* list) {
7✔
549
  if (list == NULL) {
7!
550
    return NULL;
×
551
  }
552
  const SArray* container = &list->container;
7✔
553
  return container->pData;
7✔
554
}
555

556
// Finish a commit round: invoke the user callback (if any) with the accumulated code,
// then free the parameter set.
// The consumer is looked up by refId; when it can no longer be acquired the callback
// still runs, with a NULL tmq, and TSDB_CODE_TMQ_CONSUMER_CLOSED is returned.
// Otherwise the result of releasing the reference is returned.
static int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) {
  const int64_t refId = pParamSet->refId;
  tmq_t*        tmq = taosAcquireRef(tmqMgmt.rsetId, refId);

  if (pParamSet->callbackFn != NULL) {
    pParamSet->callbackFn(tmq, pParamSet->code, pParamSet->userParam);
  }
  taosMemoryFree(pParamSet);

  if (tmq == NULL) {
    return TSDB_CODE_TMQ_CONSUMER_CLOSED;
  }
  return taosReleaseRef(tmqMgmt.rsetId, refId);
}
576

577
// Atomically decrement the outstanding-response counter of a commit round.
// When it reaches zero the round is completed via tmqCommitDone (which frees
// pParamSet) and that call's result is returned; otherwise 0 is returned.
// consumerId / pTopic / vgId are used for logging only.
static int32_t commitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId) {
  int32_t remaining = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
  if (remaining != 0) {
    tqDebugC("consumer:0x%" PRIx64 " topic:%s vgId:%d commit-rsp received, remain:%d", consumerId, pTopic, vgId,
             remaining);
    return 0;
  }

  tqDebugC("consumer:0x%" PRIx64 " topic:%s vgId:%d all commit-rsp received, commit completed", consumerId, pTopic,
           vgId);
  return tmqCommitDone(pParamSet);
}
589

590
// Response callback installed for a commit-offset request (see doSendCommitMsg).
// Frees the response buffers, then counts the response down on the shared parameter
// set. The `code` argument is not inspected here.
// Returns TSDB_CODE_INVALID_PARA when `param` is NULL.
static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
  if (pBuf != NULL) {
    taosMemoryFreeClear(pBuf->pData);
    taosMemoryFreeClear(pBuf->pEpSet);
  }

  SMqCommitCbParam* pCbParam = (SMqCommitCbParam*)param;
  if (pCbParam == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  return commitRspCountDown((SMqCommitCbParamSet*)pCbParam->params, pCbParam->consumerId, pCbParam->topicName,
                            pCbParam->vgId);
}
603

604
static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffsetVal* offset, const char* pTopicName,
6,039✔
605
                               SMqCommitCbParamSet* pParamSet) {
606
  SMqVgOffset pOffset = {0};
6,039✔
607

608
  pOffset.consumerId = tmq->consumerId;
6,039✔
609
  pOffset.offset.val = *offset;
6,039✔
610
  (void)snprintf(pOffset.offset.subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", tmq->groupId, TMQ_SEPARATOR, pTopicName);
6,039✔
611
  int32_t len = 0;
6,039✔
612
  int32_t code = 0;
6,039✔
613
  tEncodeSize(tEncodeMqVgOffset, &pOffset, len, code);
6,039!
614
  if (code < 0) {
6,039!
615
    return TSDB_CODE_INVALID_PARA;
×
616
  }
617

618
  void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
6,039✔
619
  if (buf == NULL) {
6,039!
620
    return terrno;
×
621
  }
622

623
  ((SMsgHead*)buf)->vgId = htonl(vgId);
6,039✔
624

625
  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
6,039✔
626

627
  SEncoder encoder = {0};
6,039✔
628
  tEncoderInit(&encoder, abuf, len);
6,039✔
629
  if (tEncodeMqVgOffset(&encoder, &pOffset) < 0) {
6,039!
630
    tEncoderClear(&encoder);
×
631
    taosMemoryFree(buf);
×
632
    return TSDB_CODE_INVALID_PARA;
×
633
  }
634
  tEncoderClear(&encoder);
6,038✔
635

636
  // build param
637
  SMqCommitCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam));
6,038✔
638
  if (pParam == NULL) {
6,038!
639
    taosMemoryFree(buf);
×
640
    return terrno;
×
641
  }
642

643
  pParam->params = pParamSet;
6,038✔
644
  pParam->vgId = vgId;
6,038✔
645
  pParam->consumerId = tmq->consumerId;
6,038✔
646

647
  tstrncpy(pParam->topicName, pTopicName, tListLen(pParam->topicName));
6,038✔
648

649
  // build send info
650
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
6,038✔
651
  if (pMsgSendInfo == NULL) {
6,039!
652
    taosMemoryFree(buf);
×
653
    taosMemoryFree(pParam);
×
654
    return terrno;
×
655
  }
656

657
  pMsgSendInfo->msgInfo = (SDataBuf){.pData = buf, .len = sizeof(SMsgHead) + len, .handle = NULL};
6,039✔
658

659
  pMsgSendInfo->requestId = generateRequestId();
6,039✔
660
  pMsgSendInfo->requestObjRefId = 0;
6,039✔
661
  pMsgSendInfo->param = pParam;
6,039✔
662
  pMsgSendInfo->paramFreeFp = taosMemoryFree;
6,039✔
663
  pMsgSendInfo->fp = tmqCommitCb;
6,039✔
664
  pMsgSendInfo->msgType = TDMT_VND_TMQ_COMMIT_OFFSET;
6,039✔
665

666
  // int64_t transporterId = 0;
667
  (void)atomic_add_fetch_32(&pParamSet->waitingRspNum, 1);
6,039✔
668
  code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, NULL, pMsgSendInfo);
6,039✔
669
  if (code != 0) {
6,039!
670
    (void)atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
×
671
  }
672
  return code;
6,039✔
673
}
674

675
// Find the entry of tmq->clientTopics whose topicName equals pTopicName exactly.
// On a match stores it in *topic and returns 0; otherwise logs an error and returns
// TSDB_CODE_TMQ_INVALID_TOPIC, leaving *topic untouched.
static int32_t getTopicByName(tmq_t* tmq, const char* pTopicName, SMqClientTopic** topic) {
  int32_t total = taosArrayGetSize(tmq->clientTopics);

  for (int32_t idx = 0; idx < total; ++idx) {
    SMqClientTopic* pCandidate = taosArrayGet(tmq->clientTopics, idx);
    if (pCandidate != NULL && strcmp(pCandidate->topicName, pTopicName) == 0) {
      *topic = pCandidate;
      return 0;
    }
  }

  tqErrorC("consumer:0x%" PRIx64 ", total:%d, failed to find topic:%s", tmq->consumerId, total, pTopicName);
  return TSDB_CODE_TMQ_INVALID_TOPIC;
}
689

690
static int32_t prepareCommitCbParamSet(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam, int32_t rspNum,
11,688✔
691
                                       SMqCommitCbParamSet** ppParamSet) {
692
  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
11,688✔
693
  if (pParamSet == NULL) {
11,688!
694
    return terrno;
×
695
  }
696

697
  pParamSet->refId = tmq->refId;
11,688✔
698
  pParamSet->epoch = tmq->epoch;
11,688✔
699
  pParamSet->callbackFn = pCommitFp;
11,688✔
700
  pParamSet->userParam = userParam;
11,688✔
701
  pParamSet->waitingRspNum = rspNum;
11,688✔
702
  *ppParamSet = pParamSet;
11,688✔
703
  return 0;
11,688✔
704
}
705

706
// Look up the client-side vgroup entry `vgId` of topic `pTopicName`.
// Returns TSDB_CODE_SUCCESS with *pVg pointing at the entry, the error from
// getTopicByName when the topic is unknown, or TSDB_CODE_TMQ_INVALID_VGID when the
// topic has no such vgroup.
// *pVg is reset to NULL on entry so the result never depends on what the caller left
// in the out-parameter; on every failure path *pVg is NULL.
static int32_t getClientVg(tmq_t* tmq, char* pTopicName, int32_t vgId, SMqClientVg** pVg) {
  *pVg = NULL;

  SMqClientTopic* pTopic = NULL;
  int32_t         code = getTopicByName(tmq, pTopicName, &pTopic);
  if (code != 0) {
    tqErrorC("consumer:0x%" PRIx64 " invalid topic name:%s", tmq->consumerId, pTopicName);
    return code;
  }

  int32_t numOfVgs = taosArrayGetSize(pTopic->vgs);
  for (int32_t i = 0; i < numOfVgs; ++i) {
    SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i);
    if (pClientVg != NULL && pClientVg->vgId == vgId) {
      *pVg = pClientVg;
      return TSDB_CODE_SUCCESS;
    }
  }

  return TSDB_CODE_TMQ_INVALID_VGID;
}
725

726
// Send a commit request for `offsetVal` on vgroup `pVg` of topic `pTopicName`.
// Returns:
//   TSDB_CODE_TMQ_INVALID_MSG           - offsetVal->type <= 0, nothing is sent
//   TSDB_CODE_TMQ_SAME_COMMITTED_VALUE  - offset equals the recorded committed offset
//   error from doSendCommitMsg          - request could not be sent
//   TSDB_CODE_SUCCESS                   - request sent; the committed offset recorded
//                                         in pVg is updated immediately, without
//                                         waiting for the response
// The failure log reports the code returned by doSendCommitMsg; the previous version
// printed terrno, which may be unrelated to the error that is actually returned.
static int32_t innerCommit(tmq_t* tmq, char* pTopicName, STqOffsetVal* offsetVal, SMqClientVg* pVg, SMqCommitCbParamSet* pParamSet){
  if (offsetVal->type <= 0) {
    return TSDB_CODE_TMQ_INVALID_MSG;
  }
  if (tOffsetEqual(offsetVal, &pVg->offsetInfo.committedOffset)) {
    return TSDB_CODE_TMQ_SAME_COMMITTED_VALUE;
  }

  // printable forms of both offsets, used only by the log lines below
  char offsetBuf[TSDB_OFFSET_LEN] = {0};
  tFormatOffset(offsetBuf, tListLen(offsetBuf), offsetVal);

  char commitBuf[TSDB_OFFSET_LEN] = {0};
  tFormatOffset(commitBuf, tListLen(commitBuf), &pVg->offsetInfo.committedOffset);

  int32_t code = doSendCommitMsg(tmq, pVg->vgId, &pVg->epSet, offsetVal, pTopicName, pParamSet);
  if (code != TSDB_CODE_SUCCESS) {
    tqErrorC("consumer:0x%" PRIx64 " topic:%s on vgId:%d end commit msg failed, send offset:%s committed:%s, code:%s",
             tmq->consumerId, pTopicName, pVg->vgId, offsetBuf, commitBuf, tstrerror(code));
    return code;
  }

  tqDebugC("consumer:0x%" PRIx64 " topic:%s on vgId:%d send commit msg success, send offset:%s committed:%s",
          tmq->consumerId, pTopicName, pVg->vgId, offsetBuf, commitBuf);
  tOffsetCopy(&pVg->offsetInfo.committedOffset, offsetVal);
  return code;
}
754

755
static int32_t asyncCommitOffset(tmq_t* tmq, char* pTopicName, int32_t vgId, STqOffsetVal* offsetVal,
41✔
756
                                 tmq_commit_cb* pCommitFp, void* userParam) {
757
  tqInfoC("consumer:0x%" PRIx64 " do manual commit offset for %s, vgId:%d", tmq->consumerId, pTopicName, vgId);
41!
758
  SMqCommitCbParamSet* pParamSet = NULL;
41✔
759
  int32_t code = prepareCommitCbParamSet(tmq, pCommitFp, userParam, 0, &pParamSet);
41✔
760
  if (code != 0){
41!
761
    return code;
×
762
  }
763

764
  taosRLockLatch(&tmq->lock);
41✔
765
  SMqClientVg* pVg = NULL;
41✔
766
  code = getClientVg(tmq, pTopicName, vgId, &pVg);
41✔
767
  if (code == 0) {
41!
768
    code = innerCommit(tmq, pTopicName, offsetVal, pVg, pParamSet);
41✔
769
  }
770
  taosRUnLockLatch(&tmq->lock);
41✔
771

772
  if (code != 0){
41✔
773
    taosMemoryFree(pParamSet);
8✔
774
  }
775
  return code;
41✔
776
}
777

778
// Commit the offset carried by a TMQ result object (topic, vgId and rspOffset are
// taken from it).
// When nothing could be sent and a callback was supplied, the callback is invoked
// here directly; TSDB_CODE_TMQ_SAME_COMMITTED_VALUE is reported to it as success.
// NULL arguments yield TSDB_CODE_INVALID_PARA, a non-TMQ result
// TSDB_CODE_TMQ_INVALID_MSG.
static void asyncCommitFromResult(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* pCommitFp, void* userParam) {
  int32_t code = 0;

  if (pRes == NULL || tmq == NULL) {
    code = TSDB_CODE_INVALID_PARA;
  } else if (!(TD_RES_TMQ(pRes) || TD_RES_TMQ_META(pRes) ||
               TD_RES_TMQ_METADATA(pRes) || TD_RES_TMQ_BATCH_META(pRes))) {
    code = TSDB_CODE_TMQ_INVALID_MSG;
  } else {
    SMqRspObj*   pRspObj = (SMqRspObj*)pRes;
    STqOffsetVal offsetVal = pRspObj->rspOffset;  // local copy handed to the commit path
    char*        pTopicName = pRspObj->topic;
    int32_t      vgId = pRspObj->vgId;

    code = asyncCommitOffset(tmq, pTopicName, vgId, &offsetVal, pCommitFp, userParam);
  }

  if (code == TSDB_CODE_SUCCESS || pCommitFp == NULL) {
    return;
  }
  if (code == TSDB_CODE_TMQ_SAME_COMMITTED_VALUE) {
    code = TSDB_CODE_SUCCESS;
  }
  pCommitFp(tmq, code, userParam);
}
29✔
808

809
// Under the consumer's read latch, try to commit the end offset of every vgroup of
// every subscribed topic.
// A failing vgroup is logged (except for TSDB_CODE_TMQ_SAME_COMMITTED_VALUE) and the
// loop continues; the value returned is the code of the last innerCommit call, or the
// error that aborted the iteration when an array entry could not be fetched.
static int32_t innerCommitAll(tmq_t* tmq, SMqCommitCbParamSet* pParamSet){
  int32_t code = 0;

  taosRLockLatch(&tmq->lock);
  int32_t topicCnt = taosArrayGetSize(tmq->clientTopics);
  tqDebugC("consumer:0x%" PRIx64 " start to commit offset for %d topics", tmq->consumerId, topicCnt);

  for (int32_t t = 0; t < topicCnt; t++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, t);
    if (pTopic == NULL) {
      code = TSDB_CODE_TMQ_INVALID_TOPIC;
      goto END;
    }

    int32_t vgCnt = taosArrayGetSize(pTopic->vgs);
    tqDebugC("consumer:0x%" PRIx64 " commit offset for topics:%s, numOfVgs:%d", tmq->consumerId, pTopic->topicName, vgCnt);

    for (int32_t v = 0; v < vgCnt; v++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, v);
      if (pVg == NULL) {
        code = terrno;
        goto END;
      }

      code = innerCommit(tmq, pTopic->topicName, &pVg->offsetInfo.endOffset, pVg, pParamSet);
      if (code == 0 || code == TSDB_CODE_TMQ_SAME_COMMITTED_VALUE) {
        continue;
      }
      tqErrorC("consumer:0x%" PRIx64 " topic:%s vgId:%d, no commit, code:%s, current offset version:%" PRId64 ", ordinal:%d/%d",
               tmq->consumerId, pTopic->topicName, pVg->vgId, tstrerror(code), pVg->offsetInfo.endOffset.version, v + 1, vgCnt);
    }
  }

  // the counter still includes the caller's initial DEFAULT_COMMIT_CNT guard
  tqDebugC("consumer:0x%" PRIx64 " total commit:%d for %d topics", tmq->consumerId, pParamSet->waitingRspNum - DEFAULT_COMMIT_CNT,
           topicCnt);

END:
  taosRUnLockLatch(&tmq->lock);
  return code;
}
843

844
// Commit the offsets of all topics/vgroups of the consumer.
// The parameter set starts with waitingRspNum = DEFAULT_COMMIT_CNT, which acts as a
// guard so that responses arriving while requests are still being sent cannot finish
// the round early; the guard is removed by the final commitRspCountDown call.
// If the parameter set cannot be allocated the callback (if any) is invoked directly
// with the error.
static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam) {
  SMqCommitCbParamSet* pSet = NULL;

  // init waitingRspNum as DEFAULT_COMMIT_CNT to prevent concurrency issue
  int32_t code = prepareCommitCbParamSet(tmq, pCommitFp, userParam, DEFAULT_COMMIT_CNT, &pSet);
  if (code != 0) {
    tqErrorC("consumer:0x%" PRIx64 " prepareCommitCbParamSet failed, code:%s", tmq->consumerId, tstrerror(code));
    if (pCommitFp != NULL) {
      pCommitFp(tmq, code, userParam);
    }
    return;
  }

  code = innerCommitAll(tmq, pSet);
  if (code != 0 && code != TSDB_CODE_TMQ_SAME_COMMITTED_VALUE) {
    tqErrorC("consumer:0x%" PRIx64 " innerCommitAll failed, code:%s", tmq->consumerId, tstrerror(code));
  }

  // drop the initial guard count; this completes the round if no response is pending
  code = commitRspCountDown(pSet, tmq->consumerId, "init", -1);
  if (code != 0) {
    tqErrorC("consumer:0x%" PRIx64 " commit rsp count down failed, code:%s", tmq->consumerId, tstrerror(code));
  }
}
867

868
static void generateTimedTask(int64_t refId, int32_t type) {
15,803✔
869
  tmq_t*  tmq = NULL;
15,803✔
870
  int8_t* pTaskType = NULL;
15,803✔
871
  int32_t code = 0;
15,803✔
872

873
  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
15,803✔
874
  if (tmq == NULL) return;
15,803!
875

876
  code = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0, (void**)&pTaskType);
15,803✔
877
  if (code == TSDB_CODE_SUCCESS) {
15,803!
878
    *pTaskType = type;
15,803✔
879
    if (taosWriteQitem(tmq->delayedTask, pTaskType) == 0) {
15,803!
880
      if (tsem2_post(&tmq->rspSem) != 0){
15,803!
881
        tqErrorC("consumer:0x%" PRIx64 " failed to post sem, type:%d", tmq->consumerId, type);
×
882
      }
883
    }else{
884
      taosFreeQitem(pTaskType);
×
885
    }
886
  }
887

888
  code = taosReleaseRef(tmqMgmt.rsetId, refId);
15,803✔
889
  if (code != 0){
15,803!
890
    tqErrorC("failed to release ref:%"PRId64 ", type:%d, code:%d", refId, type, code);
×
891
  }
892
}
893

894
void tmqAssignAskEpTask(void* param, void* tmrId) {
4,675✔
895
  int64_t refId = (int64_t)param;
4,675✔
896
  generateTimedTask(refId, TMQ_DELAYED_TASK__ASK_EP);
4,675✔
897
}
4,675✔
898

899
void tmqReplayTask(void* param, void* tmrId) {
12✔
900
  int64_t refId = (int64_t)param;
12✔
901
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
12✔
902
  if (tmq == NULL) return;
12!
903

904
  if (tsem2_post(&tmq->rspSem) != 0){
12!
905
    tqErrorC("consumer:0x%" PRIx64 " failed to post sem, replay", tmq->consumerId);
×
906
  }
907
  int32_t code = taosReleaseRef(tmqMgmt.rsetId, refId);
12✔
908
  if (code != 0){
12!
909
    tqErrorC("failed to release ref:%"PRId64 ", code:%d", refId, code);
×
910
  }
911
}
912

913
void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
11,128✔
914
  int64_t refId = (int64_t)param;
11,128✔
915
  generateTimedTask(refId, TMQ_DELAYED_TASK__COMMIT);
11,128✔
916
}
11,128✔
917

918
// Heartbeat response callback.
//   param - consumer reference id, cast to a pointer by tmqSendHbReq
//   pMsg  - response buffer; pData and pEpSet are freed here on every path
//           where pMsg is not NULL
//   code  - status of the request
// Returns TSDB_CODE_INVALID_PARA when pMsg is NULL. Otherwise returns the
// incoming code, the decode error, or the result of taosReleaseRef.
// Topics that the response reports as noPrivilege are marked on the consumer,
// and tqClientDebugFlag is refreshed from the response.
int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
  if (pMsg == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  if (param == NULL || code != 0){
    goto END;
  }

  SMqHbRsp rsp = {0};
  code = tDeserializeSMqHbRsp(pMsg->pData, pMsg->len, &rsp);
  if (code != 0) {
    // a failed decode may have filled rsp partially; release it before leaving
    tDestroySMqHbRsp(&rsp);
    goto END;
  }

  int64_t refId = (int64_t)param;
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq != NULL) {
    taosWLockLatch(&tmq->lock);
    for (int32_t i = 0; i < taosArrayGetSize(rsp.topicPrivileges); i++) {
      STopicPrivilege* privilege = taosArrayGet(rsp.topicPrivileges, i);
      if (privilege && privilege->noPrivilege == 1) {
        int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
        for (int32_t j = 0; j < topicNumCur; j++) {
          SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, j);
          if (pTopicCur && strcmp(pTopicCur->topicName, privilege->topic) == 0) {
            tqInfoC("consumer:0x%" PRIx64 ", has no privilege, topic:%s", tmq->consumerId, privilege->topic);
            pTopicCur->noPrivilege = 1;
          }
        }
      }
    }
    taosWUnLockLatch(&tmq->lock);
    // note: the release result becomes the callback's return code
    code = taosReleaseRef(tmqMgmt.rsetId, refId);
    if (code != 0){
      tqErrorC("failed to release ref:%"PRId64 ", code:%d", refId, code);
    }
  }

  tqClientDebugFlag = rsp.debugFlag;

  tDestroySMqHbRsp(&rsp);

END:
  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);
  return code;
}
966

967
void tmqSendHbReq(void* param, void* tmrId) {
2,575✔
968
  int64_t refId = (int64_t)param;
2,575✔
969

970
  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
2,575✔
971
  if (tmq == NULL) {
2,575!
972
    return;
×
973
  }
974

975
  SMqHbReq req = {0};
2,575✔
976
  req.consumerId = tmq->consumerId;
2,575✔
977
  req.epoch = tmq->epoch;
2,575✔
978
  req.pollFlag = atomic_load_8(&tmq->pollFlag);
2,575✔
979
  tqDebugC("consumer:0x%" PRIx64 " send heartbeat, pollFlag:%d", tmq->consumerId, req.pollFlag);
2,575!
980
  req.topics = taosArrayInit(taosArrayGetSize(tmq->clientTopics), sizeof(TopicOffsetRows));
2,575✔
981
  if (req.topics == NULL) {
2,575!
982
    goto END;
×
983
  }
984
  taosRLockLatch(&tmq->lock);
2,575✔
985
  for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
4,932✔
986
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
2,357✔
987
    if (pTopic == NULL) {
2,357!
988
      continue;
×
989
    }
990
    int32_t          numOfVgroups = taosArrayGetSize(pTopic->vgs);
2,357✔
991
    TopicOffsetRows* data = taosArrayReserve(req.topics, 1);
2,357✔
992
    if (data == NULL) {
2,357!
993
      continue;
×
994
    }
995
    tstrncpy(data->topicName, pTopic->topicName, TSDB_TOPIC_FNAME_LEN);
2,357✔
996
    data->offsetRows = taosArrayInit(numOfVgroups, sizeof(OffsetRows));
2,357✔
997
    if (data->offsetRows == NULL) {
2,357!
998
      continue;
×
999
    }
1000
    for (int j = 0; j < numOfVgroups; j++) {
8,240✔
1001
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
5,883✔
1002
      if (pVg == NULL) {
5,883!
1003
        continue;
×
1004
      }
1005
      OffsetRows* offRows = taosArrayReserve(data->offsetRows, 1);
5,883✔
1006
      if (offRows == NULL) {
5,883!
1007
        continue;
×
1008
      }
1009
      offRows->vgId = pVg->vgId;
5,883✔
1010
      offRows->rows = pVg->numOfRows;
5,883✔
1011
      offRows->offset = pVg->offsetInfo.endOffset;
5,883✔
1012
      offRows->ever = pVg->offsetInfo.walVerEnd == -1 ? 0 : pVg->offsetInfo.walVerEnd;
5,883✔
1013
      char buf[TSDB_OFFSET_LEN] = {0};
5,883✔
1014
      tFormatOffset(buf, TSDB_OFFSET_LEN, &offRows->offset);
5,883✔
1015
      tqDebugC("consumer:0x%" PRIx64 ",report offset, group:%s vgId:%d, offset:%s/%" PRId64 ", rows:%" PRId64,
5,883!
1016
               tmq->consumerId, tmq->groupId, offRows->vgId, buf, offRows->ever, offRows->rows);
1017
    }
1018
  }
1019
  taosRUnLockLatch(&tmq->lock);
2,575✔
1020

1021
  int32_t tlen = tSerializeSMqHbReq(NULL, 0, &req);
2,575✔
1022
  if (tlen < 0) {
2,575!
1023
    tqErrorC("tSerializeSMqHbReq failed, size:%d", tlen);
×
1024
    goto END;
×
1025
  }
1026

1027
  void* pReq = taosMemoryCalloc(1, tlen);
2,575✔
1028
  if (pReq == NULL) {
2,575!
1029
    tqErrorC("failed to malloc MqHbReq msg, code:%d", terrno);
×
1030
    goto END;
×
1031
  }
1032

1033
  if (tSerializeSMqHbReq(pReq, tlen, &req) < 0) {
2,575!
1034
    tqErrorC("tSerializeSMqHbReq %d failed", tlen);
×
1035
    taosMemoryFree(pReq);
×
1036
    goto END;
×
1037
  }
1038

1039
  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
2,575✔
1040
  if (sendInfo == NULL) {
2,575!
1041
    taosMemoryFree(pReq);
×
1042
    goto END;
×
1043
  }
1044

1045
  sendInfo->msgInfo = (SDataBuf){.pData = pReq, .len = tlen, .handle = NULL};
2,575✔
1046

1047
  sendInfo->requestId = generateRequestId();
2,575✔
1048
  sendInfo->requestObjRefId = 0;
2,575✔
1049
  sendInfo->param = (void*)refId;
2,575✔
1050
  sendInfo->fp = tmqHbCb;
2,575✔
1051
  sendInfo->msgType = TDMT_MND_TMQ_HB;
2,575✔
1052

1053
  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
2,575✔
1054

1055
  int32_t code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo);
2,575✔
1056
  if (code != 0) {
2,575!
1057
    tqErrorC("tmqSendHbReq asyncSendMsgToServer failed");
×
1058
  }
1059
  (void)atomic_val_compare_exchange_8(&tmq->pollFlag, 1, 0);
2,575✔
1060

1061
END:
2,575✔
1062
  tDestroySMqHbReq(&req);
2,575✔
1063
  if (tmrId != NULL) {
2,575✔
1064
    bool ret = taosTmrReset(tmqSendHbReq, tmq->heartBeatIntervalMs, param, tmqMgmt.timer, &tmq->hbLiveTimer);
1,746✔
1065
    tqDebugC("consumer:0x%" PRIx64 " reset timer for tmq heartbeat:%d, pollFlag:%d", tmq->consumerId, ret, tmq->pollFlag);
1,746!
1066
  }
1067
  int32_t ret = taosReleaseRef(tmqMgmt.rsetId, refId);
2,575✔
1068
  if (ret != 0){
2,575!
1069
    tqErrorC("failed to release ref:%"PRId64 ", code:%d", refId, ret);
×
1070
  }
1071
}
1072

1073
// Commit callback used when the consumer has none configured: only logs a
// failed commit. param is unused.
static void defaultCommitCbFn(tmq_t* pTmq, int32_t code, void* param) {
  if (code == 0) {
    return;
  }
  tqErrorC("consumer:0x%" PRIx64 ", failed to commit offset, code:%s", pTmq->consumerId, tstrerror(code));
}
1,175✔
1078

1079
// Release the payload held by a response wrapper, chosen by its tmqRspType.
// The wrapper itself is not freed here; callers in this file follow up with
// taosFreeQitem. Unknown types are left untouched.
static void tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) {
  if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) {
    tDeleteSMqAskEpRsp(&rspWrapper->epRsp);
    return;
  }

  if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_RSP) {
    DELETE_POLL_RSP(tDeleteMqDataRsp, &pRsp->dataRsp)
    return;
  }

  if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
    DELETE_POLL_RSP(tDeleteSTaosxRsp, &pRsp->dataRsp)
    return;
  }

  if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
    DELETE_POLL_RSP(tDeleteMqMetaRsp,&pRsp->metaRsp)
    return;
  }

  if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_BATCH_META_RSP) {
    DELETE_POLL_RSP(tDeleteMqBatchMetaRsp,&pRsp->batchMetaRsp)
    return;
  }
}
58,434✔
1092

1093
static void freeClientVg(void* param) {
3,774✔
1094
  SMqClientVg* pVg = param;
3,774✔
1095
  tOffsetDestroy(&pVg->offsetInfo.endOffset);
3,774✔
1096
  tOffsetDestroy(&pVg->offsetInfo.beginOffset);
3,774✔
1097
  tOffsetDestroy(&pVg->offsetInfo.committedOffset);
3,774✔
1098
}
3,774✔
1099
static void freeClientTopic(void* param) {
2,793✔
1100
  SMqClientTopic* pTopic = param;
2,793✔
1101
  taosMemoryFreeClear(pTopic->schema.pSchema);
2,793✔
1102
  taosArrayDestroyEx(pTopic->vgs, freeClientVg);
2,793✔
1103
}
2,793✔
1104

1105
static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopicEp, SHashObj* pVgOffsetHashMap,
2,793✔
1106
                                   tmq_t* tmq) {
1107
  pTopic->schema = pTopicEp->schema;
2,793✔
1108
  pTopicEp->schema.nCols = 0;
2,793✔
1109
  pTopicEp->schema.pSchema = NULL;
2,793✔
1110

1111
  char    vgKey[TSDB_TOPIC_FNAME_LEN + 22] = {0};
2,793✔
1112
  int32_t vgNumGet = taosArrayGetSize(pTopicEp->vgs);
2,793✔
1113

1114
  tstrncpy(pTopic->topicName, pTopicEp->topic, TSDB_TOPIC_FNAME_LEN);
2,793✔
1115
  tstrncpy(pTopic->db, pTopicEp->db, TSDB_DB_FNAME_LEN);
2,793✔
1116

1117
  tqInfoC("consumer:0x%" PRIx64 ", update topic:%s, new numOfVgs:%d", tmq->consumerId, pTopic->topicName, vgNumGet);
2,793!
1118
  pTopic->vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg));
2,793✔
1119
  if (pTopic->vgs == NULL) {
2,793!
1120
    tqErrorC("consumer:0x%" PRIx64 ", failed to init vgs for topic:%s", tmq->consumerId, pTopic->topicName);
×
1121
    return;
×
1122
  }
1123
  for (int32_t j = 0; j < vgNumGet; j++) {
6,567✔
1124
    SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);
3,774✔
1125
    if (pVgEp == NULL) {
3,774!
1126
      continue;
×
1127
    }
1128
    (void)snprintf(vgKey, sizeof(vgKey), "%s:%d", pTopic->topicName, pVgEp->vgId);
3,774✔
1129
    SVgroupSaveInfo* pInfo = taosHashGet(pVgOffsetHashMap, vgKey, strlen(vgKey));
3,774✔
1130

1131
    STqOffsetVal offsetNew = {0};
3,774✔
1132
    offsetNew.type = tmq->resetOffsetCfg;
3,774✔
1133

1134
    tqInfoC("consumer:0x%" PRIx64 ", update topic:%s, new numOfVgs:%d, num:%d, port:%d", tmq->consumerId,
3,774!
1135
            pTopic->topicName, vgNumGet, pVgEp->epSet.numOfEps, pVgEp->epSet.eps[pVgEp->epSet.inUse].port);
1136

1137
    SMqClientVg clientVg = {
11,322✔
1138
        .pollCnt = 0,
1139
        .vgId = pVgEp->vgId,
3,774✔
1140
        .epSet = pVgEp->epSet,
1141
        .vgStatus = pInfo ? pInfo->vgStatus : TMQ_VG_STATUS__IDLE,
3,774✔
1142
        .vgSkipCnt = 0,
1143
        .emptyBlockReceiveTs = 0,
1144
        .blockReceiveTs = 0,
1145
        .blockSleepForReplay = 0,
1146
        .numOfRows = pInfo ? pInfo->numOfRows : 0,
3,774✔
1147
    };
1148

1149
    clientVg.offsetInfo.walVerBegin = -1;
3,774✔
1150
    clientVg.offsetInfo.walVerEnd = -1;
3,774✔
1151
    clientVg.seekUpdated = false;
3,774✔
1152
    if (pInfo) {
3,774✔
1153
      tOffsetCopy(&clientVg.offsetInfo.endOffset, &pInfo->currentOffset);
2,341✔
1154
      tOffsetCopy(&clientVg.offsetInfo.committedOffset, &pInfo->commitOffset);
2,341✔
1155
      tOffsetCopy(&clientVg.offsetInfo.beginOffset, &pInfo->seekOffset);
2,341✔
1156
    } else {
1157
      clientVg.offsetInfo.endOffset = offsetNew;
1,433✔
1158
      clientVg.offsetInfo.committedOffset = offsetNew;
1,433✔
1159
      clientVg.offsetInfo.beginOffset = offsetNew;
1,433✔
1160
    }
1161
    if (taosArrayPush(pTopic->vgs, &clientVg) == NULL) {
7,548!
1162
      tqErrorC("consumer:0x%" PRIx64 ", failed to push vg:%d into topic:%s", tmq->consumerId, pVgEp->vgId,
×
1163
               pTopic->topicName);
1164
      freeClientVg(&clientVg);
×
1165
    }
1166
  }
1167
}
1168

1169
// Build the new client topic list from an ask-ep response.
//   tmq       - consumer whose current clientTopics supply the saved state
//   newTopics - output array receiving one SMqClientTopic per response topic
//   pRsp      - ask-ep response
// Step 1 stores the state of every current vgroup in a temporary hash keyed by
// "<topicName>:<vgId>"; step 2 creates the new topics, reusing that state.
// When the hash cannot be created, newTopics is left untouched.
static void buildNewTopicList(tmq_t* tmq, SArray* newTopics, const SMqAskEpRsp* pRsp){
  SHashObj* pSavedVgInfo = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
  if (pSavedVgInfo == NULL) {
    tqErrorC("consumer:0x%" PRIx64 " taos hash init null, code:%d", tmq->consumerId, terrno);
    return;
  }

  // step 1: remember the state of the vgroups we currently know
  int32_t numOfCurTopics = taosArrayGetSize(tmq->clientTopics);
  for (int32_t t = 0; t < numOfCurTopics; ++t) {
    SMqClientTopic* pOldTopic = taosArrayGet(tmq->clientTopics, t);
    if (pOldTopic == NULL || pOldTopic->vgs == NULL) {
      continue;
    }

    int32_t numOfCurVgs = taosArrayGetSize(pOldTopic->vgs);
    tqInfoC("consumer:0x%" PRIx64 ", current vg num: %d", tmq->consumerId, numOfCurVgs);

    for (int32_t v = 0; v < numOfCurVgs; ++v) {
      SMqClientVg* pOldVg = taosArrayGet(pOldTopic->vgs, v);
      if (pOldVg == NULL) {
        continue;
      }

      char vgKey[TSDB_TOPIC_FNAME_LEN + 22] = {0};
      (void)snprintf(vgKey, sizeof(vgKey), "%s:%d", pOldTopic->topicName, pOldVg->vgId);

      char offsetStr[TSDB_OFFSET_LEN] = {0};
      tFormatOffset(offsetStr, TSDB_OFFSET_LEN, &pOldVg->offsetInfo.endOffset);
      tqInfoC("consumer:0x%" PRIx64 ", vgId:%d vgKey:%s, offset:%s", tmq->consumerId, pOldVg->vgId, vgKey, offsetStr);

      SVgroupSaveInfo saved = {0};
      saved.currentOffset = pOldVg->offsetInfo.endOffset;
      saved.seekOffset = pOldVg->offsetInfo.beginOffset;
      saved.commitOffset = pOldVg->offsetInfo.committedOffset;
      saved.numOfRows = pOldVg->numOfRows;
      saved.vgStatus = pOldVg->vgStatus;

      if (taosHashPut(pSavedVgInfo, vgKey, strlen(vgKey), &saved, sizeof(SVgroupSaveInfo)) != 0) {
        tqErrorC("consumer:0x%" PRIx64 ", failed to put vg:%d into hashmap", tmq->consumerId, pOldVg->vgId);
      }
    }
  }

  // step 2: create one client topic per topic in the response
  for (int32_t t = 0; t < taosArrayGetSize(pRsp->topics); ++t) {
    SMqClientTopic newTopic = {0};
    SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, t);
    if (pTopicEp == NULL) {
      continue;
    }

    initClientTopicFromRsp(&newTopic, pTopicEp, pSavedVgInfo, tmq);
    if (taosArrayPush(newTopics, &newTopic) == NULL) {
      tqErrorC("consumer:0x%" PRIx64 ", failed to push topic:%s into new topics", tmq->consumerId, newTopic.topicName);
      freeClientTopic(&newTopic);
    }
  }

  taosHashCleanup(pSavedVgInfo);
}
1222

1223
// Replace the consumer's topic list with the one from an ask-ep response.
//   tmq   - consumer to update
//   epoch - epoch carried by the response
//   pRsp  - decoded ask-ep response
// Ignored when the response is older than the consumer's epoch, or has the same
// epoch and no topics. Otherwise the topic list is swapped under the write
// latch, then status becomes READY and the new epoch is stored.
static void doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
  int32_t numOfIncoming = taosArrayGetSize(pRsp->topics);

  // vnode transform (epoch == tmq->epoch && topicNumGet != 0)
  // ask ep rsp (epoch == tmq->epoch && topicNumGet == 0)
  if (epoch < tmq->epoch || (epoch == tmq->epoch && numOfIncoming == 0)) {
    tqDebugC("consumer:0x%" PRIx64 " no update ep epoch from %d to epoch %d, incoming topics:%d", tmq->consumerId,
             tmq->epoch, epoch, numOfIncoming);
    return;
  }

  SArray* pNewTopics = taosArrayInit(numOfIncoming, sizeof(SMqClientTopic));
  if (pNewTopics == NULL) {
    tqErrorC("consumer:0x%" PRIx64 " taos array init null, code:%d", tmq->consumerId, terrno);
    return;
  }
  tqInfoC("consumer:0x%" PRIx64 " update ep epoch from %d to epoch %d, incoming topics:%d, existed topics:%d",
          tmq->consumerId, tmq->epoch, epoch, numOfIncoming, (int)taosArrayGetSize(tmq->clientTopics));

  taosWLockLatch(&tmq->lock);
  if (numOfIncoming > 0) {
    buildNewTopicList(tmq, pNewTopics, pRsp);
  }
  // the old list is dropped only after the new one has been built from it
  if (tmq->clientTopics != NULL) {
    taosArrayDestroyEx(tmq->clientTopics, freeClientTopic);
  }
  tmq->clientTopics = pNewTopics;
  taosWUnLockLatch(&tmq->lock);

  atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);
  atomic_store_32(&tmq->epoch, epoch);

  tqInfoC("consumer:0x%" PRIx64 " update topic info completed", tmq->consumerId);
}
1257

1258
// Callback for the ask-ep request sent by askEp.
//   param - SMqAskEpCbParam built in askEp (refId, sync flag, caller pointer)
//   pMsg  - response buffer; pEpSet and pData are freed here when pMsg is not NULL
//   code  - status of the request
// Sync mode: the response is decoded and applied immediately through
// doUpdateLocalEp, then the waiter's semaphore is posted with the final code.
// Async mode: the decoded response is wrapped and written to tmq->mqueue.
// Returns the final code.
static int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqAskEpCbParam* pCbParam = (SMqAskEpCbParam*)param;
  if (pCbParam == NULL) {
    goto FAIL;
  }

  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, pCbParam->refId);
  if (tmq == NULL) {
    code = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    goto FAIL;
  }

  if (code != TSDB_CODE_SUCCESS) {
    // "consumer not ready" is not logged as an error
    if (code != TSDB_CODE_MND_CONSUMER_NOT_READY){
      tqErrorC("consumer:0x%" PRIx64 ", get topic endpoint error, code:%s", tmq->consumerId, tstrerror(code));
    }
    goto END;
  }

  if (pMsg == NULL) {
    goto END;
  }

  SMqRspHead* pHead = pMsg->pData;
  int32_t     curEpoch = atomic_load_32(&tmq->epoch);
  tqDebugC("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d", tmq->consumerId, pHead->epoch, curEpoch);

  if (!pCbParam->sync) {
    // async: hand the decoded response over through mqueue
    SMqRspWrapper* pWrapper = NULL;
    code = taosAllocateQitem(sizeof(SMqRspWrapper), DEF_QITEM, 0, (void**)&pWrapper);
    if (code) {
      goto END;
    }

    pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP;
    pWrapper->epoch = pHead->epoch;
    (void)memcpy(&pWrapper->epRsp, pMsg->pData, sizeof(SMqRspHead));

    if (tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pWrapper->epRsp) != NULL) {
      code = taosWriteQitem(tmq->mqueue, pWrapper);
      if (code != 0) {
        tmqFreeRspWrapper((SMqRspWrapper*)pWrapper);
        taosFreeQitem(pWrapper);
        tqErrorC("consumer:0x%" PRIx64 " put ep res into mqueue failed, code:%d", tmq->consumerId, code);
      }
    } else {
      tmqFreeRspWrapper((SMqRspWrapper*)pWrapper);
      taosFreeQitem(pWrapper);
    }
  } else {
    // sync: apply the response right away
    SMqAskEpRsp epRsp = {0};
    if (tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &epRsp) != NULL) {
      doUpdateLocalEp(tmq, pHead->epoch, &epRsp);
    }
    tDeleteSMqAskEpRsp(&epRsp);
  }

  END:
  {
    int32_t rc = taosReleaseRef(tmqMgmt.rsetId, pCbParam->refId);
    if (rc != 0){
      tqErrorC("failed to release ref:%"PRId64 ", code:%d", pCbParam->refId, rc);
    }
  }

  FAIL:
  if (pCbParam != NULL && pCbParam->sync) {
    // report the result to the waiter and post its semaphore
    SAskEpInfo* pWaiter = pCbParam->pParam;
    if (pWaiter != NULL) {
      pWaiter->code = code;
      if (tsem2_post(&pWaiter->sem) != 0){
        tqErrorC("failed to post rsp sem askep cb");
      }
    }
  }

  if (pMsg != NULL) {
    taosMemoryFree(pMsg->pEpSet);
    taosMemoryFree(pMsg->pData);
  }

  return code;
}
1338

1339
// Send an ask-ep request for this consumer to the mnode.
//   pTmq        - consumer
//   param       - caller pointer stored in the callback parameter (read by
//                 askEpCb in sync mode)
//   sync        - stored in the callback parameter; selects askEpCb's mode
//   updateEpSet - when true the request carries epoch -1 instead of pTmq->epoch
// Returns TSDB_CODE_INVALID_PARA when serialization fails, the terrno recorded
// at the moment an allocation failed, or the result of asyncSendMsgToServer.
static int32_t askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpSet) {
  SMqAskEpReq req = {0};
  req.consumerId = pTmq->consumerId;
  req.epoch = updateEpSet ? -1 : pTmq->epoch;
  tstrncpy(req.cgroup, pTmq->groupId, TSDB_CGROUP_LEN);
  int32_t          code = 0;
  SMqAskEpCbParam* pParam = NULL;
  void*            pReq = NULL;

  // first pass with a NULL buffer yields the serialized length
  int32_t tlen = tSerializeSMqAskEpReq(NULL, 0, &req);
  if (tlen < 0) {
    tqErrorC("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq failed", pTmq->consumerId);
    return TSDB_CODE_INVALID_PARA;
  }

  pReq = taosMemoryCalloc(1, tlen);
  if (pReq == NULL) {
    // capture terrno first so the logging call below cannot overwrite it
    code = terrno;
    tqErrorC("consumer:0x%" PRIx64 ", failed to malloc askEpReq msg, size:%d", pTmq->consumerId, tlen);
    return code;
  }

  if (tSerializeSMqAskEpReq(pReq, tlen, &req) < 0) {
    tqErrorC("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq %d failed", pTmq->consumerId, tlen);
    taosMemoryFree(pReq);
    return TSDB_CODE_INVALID_PARA;
  }

  pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam));
  if (pParam == NULL) {
    code = terrno;
    tqErrorC("consumer:0x%" PRIx64 ", failed to malloc subscribe param", pTmq->consumerId);
    taosMemoryFree(pReq);
    return code;
  }

  pParam->refId = pTmq->refId;
  pParam->sync = sync;
  pParam->pParam = param;

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    code = terrno;
    taosMemoryFree(pReq);
    taosMemoryFree(pParam);
    return code;
  }

  sendInfo->msgInfo = (SDataBuf){.pData = pReq, .len = tlen, .handle = NULL};
  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = pParam;
  sendInfo->paramFreeFp = taosMemoryFree;
  sendInfo->fp = askEpCb;
  sendInfo->msgType = TDMT_MND_TMQ_ASK_EP;

  SEpSet epSet = getEpSet_s(&pTmq->pTscObj->pAppInfo->mgmtEp);
  tqDebugC("consumer:0x%" PRIx64 " ask ep from mnode,QID:0x%" PRIx64, pTmq->consumerId, sendInfo->requestId);
  return asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo);
}
1396

1397
// Drain the consumer's delayedTask queue and run every queued task.
//   TMQ_DELAYED_TASK__ASK_EP - send an async ask-ep request and re-arm the ep timer
//   TMQ_DELAYED_TASK__COMMIT - commit all offsets asynchronously and re-arm the
//                              commit timer
// Every dequeued item is freed, including when askEp fails.
void tmqHandleAllDelayedTask(tmq_t* pTmq) {
  STaosQall* qall = NULL;
  int32_t    code = 0;

  code = taosAllocateQall(&qall);
  if (code) {
    tqErrorC("consumer:0x%" PRIx64 ", failed to allocate qall, code:%s", pTmq->consumerId, tstrerror(code));
    return;
  }

  int32_t numOfItems = taosReadAllQitems(pTmq->delayedTask, qall);
  if (numOfItems == 0) {
    taosFreeQall(qall);
    return;
  }

  tqDebugC("consumer:0x%" PRIx64 " handle delayed %d tasks before poll data", pTmq->consumerId, numOfItems);
  int8_t* pTaskType = NULL;
  while (taosGetQitem(qall, (void**)&pTaskType) != 0) {
    if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) {
      tqDebugC("consumer:0x%" PRIx64 " retrieve ask ep timer", pTmq->consumerId);
      code = askEp(pTmq, NULL, false, false);
      if (code != 0) {
        tqErrorC("consumer:0x%" PRIx64 " failed to ask ep, code:%s", pTmq->consumerId, tstrerror(code));
        // free the item before skipping to the next one; otherwise it leaks.
        // The ep timer is not re-armed on this path (unchanged behavior).
        taosFreeQitem(pTaskType);
        continue;
      }
      tqDebugC("consumer:0x%" PRIx64 " retrieve ep from mnode in 1s", pTmq->consumerId);
      bool ret = taosTmrReset(tmqAssignAskEpTask, DEFAULT_ASKEP_INTERVAL, (void*)(pTmq->refId), tmqMgmt.timer,
                              &pTmq->epTimer);
      tqDebugC("reset timer for tmq ask ep:%d", ret);
    } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
      // fall back to the logging-only callback when none is configured
      tmq_commit_cb* pCallbackFn = (pTmq->commitCb != NULL) ? pTmq->commitCb : defaultCommitCbFn;
      asyncCommitAllOffsets(pTmq, pCallbackFn, pTmq->commitCbUserParam);
      tqDebugC("consumer:0x%" PRIx64 " next commit to vnode(s) in %.2fs", pTmq->consumerId,
               pTmq->autoCommitInterval / 1000.0);
      bool ret = taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, (void*)(pTmq->refId), tmqMgmt.timer,
                              &pTmq->commitTimer);
      tqDebugC("reset timer for commit:%d", ret);
    } else {
      tqErrorC("consumer:0x%" PRIx64 " invalid task type:%d", pTmq->consumerId, *pTaskType);
    }

    taosFreeQitem(pTaskType);
  }

  taosFreeQall(qall);
}
1444

1445
// Discard every response still pending for the consumer: first the items
// already in tmq->qall, then whatever is moved over from tmq->mqueue.
void tmqClearUnhandleMsg(tmq_t* tmq) {
  SMqRspWrapper* pPending = NULL;

  // items already taken out of mqueue
  while (taosGetQitem(tmq->qall, (void**)&pPending) != 0) {
    tmqFreeRspWrapper(pPending);
    taosFreeQitem(pPending);
  }

  pPending = NULL;
  if (taosReadAllQitems(tmq->mqueue, tmq->qall) != 0) {
    // items that were still waiting in mqueue
    while (taosGetQitem(tmq->qall, (void**)&pPending) != 0) {
      tmqFreeRspWrapper(pPending);
      taosFreeQitem(pPending);
    }
  }
}
1461

1462
// Callback for the subscribe request.
//   param - SMqSubscribeCbParam of the waiting caller; may be NULL
//   pMsg  - response buffer; pEpSet and pData are released here, as the other
//           response callbacks in this file do
//   code  - status of the request
// Stores code in the waiter's rspErr and posts its semaphore. Returns code
// when param is NULL, 0 otherwise.
int32_t tmqSubscribeCb(void* param, SDataBuf* pMsg, int32_t code) {
  if (pMsg) {
    taosMemoryFreeClear(pMsg->pEpSet);
    // NOTE(review): assumes the callback owns pData, as in tmqHbCb and askEpCb - confirm
    taosMemoryFreeClear(pMsg->pData);
  }

  if (param == NULL) {
    return code;
  }

  SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param;
  pParam->rspErr = code;

  if (tsem2_post(&pParam->rspSem) != 0){
    tqErrorC("failed to post sem, subscribe cb");
  }
  return 0;
}
1479

1480
// Append the names of the consumer's current topics to *topics.
//   tmq    - consumer
//   topics - in/out list pointer; when *topics is NULL a new list is created
//            and stored there
// The text after the first '.' of each stored topic name is what gets
// appended. Topics with no '.' in the name, or that cannot be appended, are
// logged and skipped.
// Returns TSDB_CODE_INVALID_PARA when tmq or topics is NULL, terrno when the
// list cannot be created, 0 otherwise.
int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
  // topics is dereferenced below, so it has to be validated as well
  if (tmq == NULL || topics == NULL) return TSDB_CODE_INVALID_PARA;
  if (*topics == NULL) {
    *topics = tmq_list_new();
    if (*topics == NULL) {
      return terrno;
    }
  }
  taosRLockLatch(&tmq->lock);
  for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* topic = taosArrayGet(tmq->clientTopics, i);
    if (topic == NULL) {
      tqErrorC("topic is null");
      continue;
    }
    char* tmp = strchr(topic->topicName, '.');
    if (tmp == NULL) {
      tqErrorC("topic name is invalid:%s", topic->topicName);
      continue;
    }
    if (tmq_list_append(*topics, tmp + 1) != 0) {
      tqErrorC("failed to append topic:%s", tmp + 1);
      continue;
    }
  }
  taosRUnLockLatch(&tmq->lock);
  return 0;
}
1508

1509
void tmqFreeImpl(void* handle) {
446✔
1510
  tmq_t*  tmq = (tmq_t*)handle;
446✔
1511
  int64_t id = tmq->consumerId;
446✔
1512

1513
  if (tmq->mqueue) {
446!
1514
    tmqClearUnhandleMsg(tmq);
446✔
1515
    taosCloseQueue(tmq->mqueue);
446✔
1516
  }
1517

1518
  if (tmq->delayedTask) {
446!
1519
    taosCloseQueue(tmq->delayedTask);
446✔
1520
  }
1521

1522
  taosFreeQall(tmq->qall);
446✔
1523
  if(tsem2_destroy(&tmq->rspSem) != 0) {
446!
1524
    tqErrorC("failed to destroy sem in free tmq");
×
1525
  }
1526

1527
  taosArrayDestroyEx(tmq->clientTopics, freeClientTopic);
446✔
1528
  taos_close_internal(tmq->pTscObj);
446✔
1529

1530
  if (tmq->commitTimer) {
446✔
1531
    if (!taosTmrStopA(&tmq->commitTimer)) {
442✔
1532
      tqErrorC("failed to stop commit timer");
182!
1533
    }
1534
  }
1535
  if (tmq->epTimer) {
446✔
1536
    if (!taosTmrStopA(&tmq->epTimer)) {
442✔
1537
      tqErrorC("failed to stop ep timer");
407!
1538
    }
1539
  }
1540
  if (tmq->hbLiveTimer) {
446!
1541
    if (!taosTmrStopA(&tmq->hbLiveTimer)) {
446!
1542
      tqErrorC("failed to stop hb timer");
×
1543
    }
1544
  }
1545
  taosMemoryFree(tmq);
446✔
1546

1547
  tqInfoC("consumer:0x%" PRIx64 " closed", id);
446!
1548
}
446✔
1549

1550
static void tmqMgmtInit(void) {
302✔
1551
  tmqInitRes = 0;
302✔
1552
  tmqMgmt.timer = taosTmrInit(1000, 100, 360000, "TMQ");
302✔
1553

1554
  if (tmqMgmt.timer == NULL) {
302!
1555
    tmqInitRes = TSDB_CODE_OUT_OF_MEMORY;
×
1556
  }
1557

1558
  tmqMgmt.rsetId = taosOpenRef(10000, tmqFreeImpl);
302✔
1559
  if (tmqMgmt.rsetId < 0) {
302!
1560
    tmqInitRes = terrno;
×
1561
  }
1562
}
302✔
1563

1564
void tmqMgmtClose(void) {
3,289✔
1565
  if (tmqMgmt.timer) {
3,289✔
1566
    taosTmrCleanUp(tmqMgmt.timer);
302✔
1567
    tmqMgmt.timer = NULL;
302✔
1568
  }
1569

1570
  if (tmqMgmt.rsetId >= 0) {
3,289!
1571
    taosCloseRef(tmqMgmt.rsetId);
3,289✔
1572
    tmqMgmt.rsetId = -1;
3,289✔
1573
  }
1574
}
3,289✔
1575

1576
// Creates a consumer from `conf`.
//   conf      : consumer configuration; must not be NULL and must carry a non-empty groupId.
//   errstr    : optional buffer that receives a short failure description (via SET_ERROR_MSG_TMQ).
//   errstrLen : size of errstr.
// Returns the new consumer, or NULL on failure. On failure every resource acquired so far is
// released before returning.
tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
  int32_t code = 0;

  if (conf == NULL) {
    SET_ERROR_MSG_TMQ("configure is null")
    return NULL;
  }
  code = taosThreadOnce(&tmqInit, tmqMgmtInit);
  if (code != 0) {
    SET_ERROR_MSG_TMQ("tmq init error")
    return NULL;
  }
  if (tmqInitRes != 0) {
    SET_ERROR_MSG_TMQ("tmq timer init error")
    return NULL;
  }

  tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t));
  if (pTmq == NULL) {
    tqErrorC("failed to create consumer, groupId:%s", conf->groupId);
    SET_ERROR_MSG_TMQ("malloc tmq failed")
    return NULL;
  }

  const char* user = conf->user == NULL ? TSDB_DEFAULT_USER : conf->user;
  const char* pass = conf->pass == NULL ? TSDB_DEFAULT_PASS : conf->pass;

  pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
  if (pTmq->clientTopics == NULL) {
    tqErrorC("failed to create consumer, groupId:%s", conf->groupId);
    SET_ERROR_MSG_TMQ("malloc client topics failed")
    goto _failed;
  }

  // pTmq->groupId is not filled in yet at this stage, so the early failure logs report the
  // group id straight from the configuration.
  code = taosOpenQueue(&pTmq->mqueue);
  if (code) {
    tqErrorC("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, tstrerror(code),
             conf->groupId);
    SET_ERROR_MSG_TMQ("open queue failed")
    goto _failed;
  }

  code = taosOpenQueue(&pTmq->delayedTask);
  if (code) {
    tqErrorC("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, tstrerror(code),
             conf->groupId);
    SET_ERROR_MSG_TMQ("open delayed task queue failed")
    goto _failed;
  }

  code = taosAllocateQall(&pTmq->qall);
  if (code) {
    tqErrorC("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, tstrerror(code),
             conf->groupId);
    SET_ERROR_MSG_TMQ("allocate qall failed")
    goto _failed;
  }

  if (conf->groupId[0] == 0) {
    // `code` is 0 here, so report an explicit invalid-parameter reason instead of "success"
    tqErrorC("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId,
             tstrerror(TSDB_CODE_INVALID_PARA), conf->groupId);
    SET_ERROR_MSG_TMQ("malloc tmq element failed or group is empty")
    goto _failed;
  }

  // init status
  pTmq->status = TMQ_CONSUMER_STATUS__INIT;
  pTmq->pollCnt = 0;
  pTmq->epoch = 0;
  pTmq->pollFlag = 0;

  // set conf
  tstrncpy(pTmq->clientId, conf->clientId, TSDB_CLIENT_ID_LEN);
  tstrncpy(pTmq->groupId, conf->groupId, TSDB_CGROUP_LEN);
  pTmq->withTbName = conf->withTbName;
  pTmq->useSnapshot = conf->snapEnable;
  pTmq->autoCommit = conf->autoCommit;
  pTmq->autoCommitInterval = conf->autoCommitInterval;
  pTmq->sessionTimeoutMs = conf->sessionTimeoutMs;
  pTmq->heartBeatIntervalMs = conf->heartBeatIntervalMs;
  pTmq->maxPollIntervalMs = conf->maxPollIntervalMs;
  pTmq->commitCb = conf->commitCb;
  pTmq->commitCbUserParam = conf->commitCbUserParam;
  pTmq->resetOffsetCfg = conf->resetOffset;
  pTmq->replayEnable = conf->replayEnable;
  pTmq->sourceExcluded = conf->sourceExcluded;
  pTmq->enableBatchMeta = conf->enableBatchMeta;
  tstrncpy(pTmq->user, user, TSDB_USER_LEN);
  if (taosGetFqdn(pTmq->fqdn) != 0) {
    tstrncpy(pTmq->fqdn, "localhost", TSDB_FQDN_LEN);
  }
  if (conf->replayEnable) {
    // replay mode always runs with auto commit switched off
    pTmq->autoCommit = false;
  }
  taosInitRWLatch(&pTmq->lock);

  // assign consumerId
  pTmq->consumerId = tGenIdPI64();

  // init semaphore
  if (tsem2_init(&pTmq->rspSem, 0, 0) != 0) {
    tqErrorC("consumer:0x%" PRIx64 " setup failed since %s, consumer group %s", pTmq->consumerId,
             tstrerror(TAOS_SYSTEM_ERROR(errno)), pTmq->groupId);
    SET_ERROR_MSG_TMQ("init t_sem failed")
    goto _failed;
  }

  // init connection
  code = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ, &pTmq->pTscObj);
  if (code) {
    terrno = code;
    tqErrorC("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, terrstr(), pTmq->groupId);
    SET_ERROR_MSG_TMQ("init tscObj failed")
    goto _failed;
  }

  pTmq->refId = taosAddRef(tmqMgmt.rsetId, pTmq);
  if (pTmq->refId < 0) {
    SET_ERROR_MSG_TMQ("add tscObj ref failed")
    goto _failed;
  }

  pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, pTmq->heartBeatIntervalMs, (void*)pTmq->refId, tmqMgmt.timer);
  if (pTmq->hbLiveTimer == NULL) {
    SET_ERROR_MSG_TMQ("start heartbeat timer failed")
    // pTmq is registered in the ref set from here on. Freeing it directly would leave a dangling
    // pointer behind refId, so drop the reference instead and let the set's destructor
    // (tmqFreeImpl, registered in tmqMgmtInit) release it.
    if (taosRemoveRef(tmqMgmt.rsetId, pTmq->refId) != 0) {
      tqErrorC("failed to remove ref:%" PRId64 " after heartbeat timer start failure", pTmq->refId);
    }
    return NULL;
  }
  char         buf[TSDB_OFFSET_LEN] = {0};
  STqOffsetVal offset = {.type = pTmq->resetOffsetCfg};
  tFormatOffset(buf, tListLen(buf), &offset);
  tqInfoC("consumer:0x%" PRIx64 " is setup, refId:%" PRId64
          ", groupId:%s, snapshot:%d, autoCommit:%d, commitInterval:%dms, offset:%s",
          pTmq->consumerId, pTmq->refId, pTmq->groupId, pTmq->useSnapshot, pTmq->autoCommit, pTmq->autoCommitInterval,
          buf);

  return pTmq;

_failed:
  // only reached while pTmq is not (or could not be) registered in the ref set
  tmqFreeImpl(pTmq);
  return NULL;
}
1716

1717
// Issues an endpoint query through askEp and blocks until its result is posted on a private
// semaphore. Returns 0 on success; otherwise the allocation / semaphore / askEp error, or the
// code stored in the SAskEpInfo once the wait finishes.
static int32_t syncAskEp(tmq_t* pTmq) {
  SAskEpInfo* epInfo = taosMemoryMalloc(sizeof(SAskEpInfo));
  if (NULL == epInfo) {
    return terrno;
  }

  if (0 != tsem2_init(&epInfo->sem, 0, 0)) {
    taosMemoryFree(epInfo);
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  int32_t result = askEp(pTmq, epInfo, true, false);
  if (0 == result) {
    // the request was dispatched: wait for completion, then pick up the reported code
    if (0 != tsem2_wait(&epInfo->sem)) {
      tqErrorC("consumer:0x%" PRIx64 ", failed to wait for sem", pTmq->consumerId);
    }
    result = epInfo->code;
  }

  if (0 != tsem2_destroy(&epInfo->sem)) {
    tqErrorC("failed to destroy sem sync ask ep");
  }
  taosMemoryFree(epInfo);
  return result;
}
1739

1740
// Subscribes the consumer to the topics in topic_list.
// Sends a TDMT_MND_TMQ_SUBSCRIBE request, waits for its response, then polls syncAskEp until
// the endpoint information is available (or the retry budget is used up), and finally makes
// sure the ask-ep and commit timers are running.
// Returns 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments, or the failing step's code.
int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
  if (tmq == NULL || topic_list == NULL) return TSDB_CODE_INVALID_PARA;
  const SArray*   container = &topic_list->container;
  int32_t         sz = taosArrayGetSize(container);
  void*           buf = NULL;
  SMsgSendInfo*   sendInfo = NULL;
  SCMSubscribeReq req = {0};
  int32_t         code = 0;

  tqInfoC("consumer:0x%" PRIx64 " cgroup:%s, subscribe %d topics", tmq->consumerId, tmq->groupId, sz);

  req.consumerId = tmq->consumerId;
  tstrncpy(req.clientId, tmq->clientId, TSDB_CLIENT_ID_LEN);
  tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
  tstrncpy(req.user, tmq->user, TSDB_USER_LEN);
  tstrncpy(req.fqdn, tmq->fqdn, TSDB_FQDN_LEN);

  req.topicNames = taosArrayInit(sz, sizeof(void*));
  if (req.topicNames == NULL) {
    code = terrno;
    goto END;
  }

  req.withTbName = tmq->withTbName;
  req.autoCommit = tmq->autoCommit;
  req.autoCommitInterval = tmq->autoCommitInterval;
  req.sessionTimeoutMs = tmq->sessionTimeoutMs;
  req.maxPollIntervalMs = tmq->maxPollIntervalMs;
  req.resetOffsetCfg = tmq->resetOffsetCfg;
  req.enableReplay = tmq->replayEnable;
  req.enableBatchMeta = tmq->enableBatchMeta;

  // turn every short topic name into its full name; req.topicNames owns the allocated strings
  for (int32_t i = 0; i < sz; i++) {
    char* topic = taosArrayGetP(container, i);
    if (topic == NULL) {
      code = terrno;
      goto END;
    }
    SName name = {0};
    code = tNameSetDbName(&name, tmq->pTscObj->acctId, topic, strlen(topic));
    if (code) {
      tqErrorC("consumer:0x%" PRIx64 " cgroup:%s, failed to set topic name, code:%d", tmq->consumerId, tmq->groupId,
               code);
      goto END;
    }
    char* topicFName = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
    if (topicFName == NULL) {
      code = terrno;
      goto END;
    }

    code = tNameExtractFullName(&name, topicFName);
    if (code) {
      tqErrorC("consumer:0x%" PRIx64 " cgroup:%s, failed to extract topic name, code:%d", tmq->consumerId, tmq->groupId,
               code);
      taosMemoryFree(topicFName);
      goto END;
    }

    if (taosArrayPush(req.topicNames, &topicFName) == NULL) {
      code = terrno;
      taosMemoryFree(topicFName);
      goto END;
    }
    tqInfoC("consumer:0x%" PRIx64 " subscribe topic:%s", tmq->consumerId, topicFName);
  }

  // first call measures the serialized size, second call writes into the buffer
  int32_t tlen = tSerializeSCMSubscribeReq(NULL, &req);
  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) {
    code = terrno;
    goto END;
  }

  void* abuf = buf;
  tlen = tSerializeSCMSubscribeReq(&abuf, &req);

  sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    code = terrno;
    taosMemoryFree(buf);
    goto END;
  }

  SMqSubscribeCbParam param = {.rspErr = 0};
  if (tsem2_init(&param.rspSem, 0, 0) != 0) {
    code = TSDB_CODE_TSC_INTERNAL_ERROR;
    taosMemoryFree(buf);
    taosMemoryFree(sendInfo);
    goto END;
  }

  sendInfo->msgInfo = (SDataBuf){.pData = buf, .len = tlen, .handle = NULL};
  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = &param;
  sendInfo->fp = tmqSubscribeCb;
  sendInfo->msgType = TDMT_MND_TMQ_SUBSCRIBE;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo);
  if (code != 0) {
    // param lives on this stack frame and is not waited on in this path, so the semaphore
    // has to be released here instead of being leaked
    if (tsem2_destroy(&param.rspSem) != 0) {
      tqErrorC("consumer:0x%" PRIx64 ", failed to destroy semaphore in subscribe", tmq->consumerId);
    }
    goto END;
  }

  if (tsem2_wait(&param.rspSem) != 0) {
    tqErrorC("consumer:0x%" PRIx64 ", failed to wait semaphore in subscribe", tmq->consumerId);
  }
  if (tsem2_destroy(&param.rspSem) != 0) {
    tqErrorC("consumer:0x%" PRIx64 ", failed to destroy semaphore in subscribe", tmq->consumerId);
  }

  if (param.rspErr != 0) {
    code = param.rspErr;
    goto END;
  }

  int32_t retryCnt = 0;
  while ((code = syncAskEp(tmq)) != 0) {
    if (retryCnt++ > SUBSCRIBE_RETRY_MAX_COUNT || code == TSDB_CODE_MND_CONSUMER_NOT_EXIST) {
      tqErrorC("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry more than 2 minutes, code:%s",
               tmq->consumerId, tstrerror(code));
      // a "consumer not exist" answer ends the retry loop but is reported to the caller as success
      if (code == TSDB_CODE_MND_CONSUMER_NOT_EXIST) {
        code = 0;
      }
      goto END;
    }

    tqInfoC("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry:%d in 500ms", tmq->consumerId, retryCnt);
    taosMsleep(SUBSCRIBE_RETRY_INTERVAL);
  }

  // timers are created once and reused by later subscribe calls
  if (tmq->epTimer == NULL) {
    tmq->epTimer = taosTmrStart(tmqAssignAskEpTask, DEFAULT_ASKEP_INTERVAL, (void*)(tmq->refId), tmqMgmt.timer);
  }
  if (tmq->commitTimer == NULL) {
    tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, (void*)(tmq->refId), tmqMgmt.timer);
  }
  if (tmq->epTimer == NULL || tmq->commitTimer == NULL) {
    code = TSDB_CODE_TSC_INTERNAL_ERROR;
    goto END;
  }

END:
  taosArrayDestroyP(req.topicNames, taosMemoryFree);
  return code;
}
1888

1889
// Stores the commit callback and its user parameter in the configuration.
// A NULL conf is ignored.
void tmq_conf_set_auto_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* param) {
  if (conf != NULL) {
    conf->commitCbUserParam = param;
    conf->commitCb = cb;
  }
}
1894

1895
// Looks up the vgroup entry with id `vgId` under the topic named `topicName` in
// tmq->clientTopics. On a match *pVg is set to that entry; when nothing matches *pVg is left
// untouched, so callers initialize it to NULL beforehand.
static void getVgInfo(tmq_t* tmq, char* topicName, int32_t vgId, SMqClientVg** pVg) {
  int32_t topicCnt = taosArrayGetSize(tmq->clientTopics);
  for (int t = 0; t < topicCnt; t++) {
    SMqClientTopic* topic = taosArrayGet(tmq->clientTopics, t);
    if (topic == NULL || strcmp(topic->topicName, topicName) != 0) {
      continue;
    }

    int32_t vgCnt = taosArrayGetSize(topic->vgs);
    for (int32_t v = 0; v < vgCnt; v++) {
      SMqClientVg* vg = taosArrayGet(topic->vgs, v);
      if (vg == NULL || vg->vgId != vgId) {
        continue;
      }
      *pVg = vg;
      return;
    }
  }
}
1911

1912
// Returns the entry of tmq->clientTopics whose topicName equals `topicName`, or NULL when no
// entry matches. Entries that cannot be fetched from the array are skipped, the same way
// getVgInfo handles them.
static SMqClientTopic* getTopicInfo(tmq_t* tmq, char* topicName) {
  int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
  for (int i = 0; i < topicNumCur; i++) {
    SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i);
    if (pTopicCur != NULL && strcmp(pTopicCur->topicName, topicName) == 0) {
      return pTopicCur;
    }
  }
  return NULL;
}
1922

1923
// Response callback for a poll request (installed as sendInfo->fp by doTmqPollImpl).
//   param : the SMqPollCbParam built for the request (refId, vgId, requestId, topic name).
//   pMsg  : response buffer; pData and pEpSet are released here unless ownership of pEpSet is
//           handed over to the queued wrapper.
//   code  : transport-level result of the request.
// Every outcome, including errors, is wrapped in an SMqRspWrapper and written to tmq->mqueue,
// then rspSem is posted. Returns the final value of `code`.
// NOTE(review): PROCESS_POLL_RSP is a macro defined outside this block; local names and the
// END label are deliberately left as they are in case it refers to them.
int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqPollCbParam* pParam = (SMqPollCbParam*)param;
  SMqRspWrapper*  pRspWrapper = NULL;
  tmq_t*          tmq = NULL;
  uint64_t        requestId = 0;
  int32_t         vgId = 0;
  int8_t          rspType = 0;

  if (NULL == pMsg) {
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }
  if (NULL == pParam) {
    code = TSDB_CODE_TSC_INTERNAL_ERROR;
    goto EXIT;
  }

  int64_t refId = pParam->refId;
  vgId = pParam->vgId;
  requestId = pParam->requestId;

  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (NULL == tmq) {
    code = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    goto EXIT;
  }

  int32_t ret = taosAllocateQitem(sizeof(SMqRspWrapper), DEF_QITEM, 0, (void**)&pRspWrapper);
  if (ret != 0) {
    code = ret;
    tscWarn("consumer:0x%" PRIx64 " msg discard from vgId:%d, since out of memory", tmq->consumerId, vgId);
    goto END;
  }

  // a failed request is still queued below so the poller gets to see the error code
  if (code != 0) {
    goto END;
  }

  if (NULL == pMsg->pData) {
    tqErrorC("consumer:0x%" PRIx64 " msg discard from vgId:%d, since msg is NULL", tmq->consumerId, vgId);
    code = TSDB_CODE_TSC_INTERNAL_ERROR;
    goto END;
  }

  SMqRspHead* rspHead = (SMqRspHead*)pMsg->pData;
  int32_t     msgEpoch = rspHead->epoch;
  int32_t     clientEpoch = atomic_load_32(&tmq->epoch);

  if (msgEpoch != clientEpoch) {
    tqErrorC("consumer:0x%" PRIx64
             " msg discard from vgId:%d since epoch not equal, rsp epoch %d, current epoch %d, reqId:0x%" PRIx64,
             tmq->consumerId, vgId, msgEpoch, clientEpoch, requestId);
    code = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
    goto END;
  }

  rspType = rspHead->mqMsgType;
  tqDebugC("consumer:0x%" PRIx64 " recv poll rsp, vgId:%d, type %d,QID:0x%" PRIx64, tmq->consumerId, vgId, rspType, requestId);
  if (rspType == TMQ_MSG_TYPE__POLL_DATA_RSP) {
    PROCESS_POLL_RSP(tDecodeMqDataRsp, &pRspWrapper->pollRsp.dataRsp)
  } else if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) {
    PROCESS_POLL_RSP(tDecodeMqMetaRsp, &pRspWrapper->pollRsp.metaRsp)
  } else if (rspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
    PROCESS_POLL_RSP(tDecodeSTaosxRsp, &pRspWrapper->pollRsp.dataRsp)
  } else if (rspType == TMQ_MSG_TYPE__POLL_BATCH_META_RSP) {
    PROCESS_POLL_RSP(tSemiDecodeMqBatchMetaRsp, &pRspWrapper->pollRsp.batchMetaRsp)
  } else {  // invalid rspType
    tqErrorC("consumer:0x%" PRIx64 " invalid rsp msg received, type:%d ignored", tmq->consumerId, rspType);
    code = TSDB_CODE_TSC_INTERNAL_ERROR;
    goto END;
  }

  pRspWrapper->tmqRspType = rspType;
  pRspWrapper->pollRsp.reqId = requestId;
  // the wrapper takes over the epset, so clear it to keep EXIT from freeing it
  pRspWrapper->pollRsp.pEpset = pMsg->pEpSet;
  pMsg->pEpSet = NULL;

END:
  if (pRspWrapper != NULL) {
    pRspWrapper->code = code;
    pRspWrapper->pollRsp.vgId = vgId;
    tstrncpy(pRspWrapper->pollRsp.topicName, pParam->topicName, TSDB_TOPIC_FNAME_LEN);
    code = taosWriteQitem(tmq->mqueue, pRspWrapper);
    if (code == 0) {
      tqDebugC("consumer:0x%" PRIx64 " put poll res into mqueue, type:%d, vgId:%d, total in queue:%d,QID:0x%" PRIx64,
               tmq->consumerId, rspType, vgId, taosQueueItemSize(tmq->mqueue), requestId);
    } else {
      tmqFreeRspWrapper(pRspWrapper);
      taosFreeQitem(pRspWrapper);
      tqErrorC("consumer:0x%" PRIx64 " put poll res into mqueue failed, code:%d", tmq->consumerId, code);
    }
  }

  if (tsem2_post(&tmq->rspSem) != 0) {
    tqErrorC("failed to post rsp sem, consumer:0x%" PRIx64, tmq->consumerId);
  }
  ret = taosReleaseRef(tmqMgmt.rsetId, refId);
  if (ret != 0) {
    tqErrorC("failed to release ref:%"PRId64 ", code:%d", refId, ret);
  }

EXIT:
  taosMemoryFreeClear(pMsg->pData);
  taosMemoryFreeClear(pMsg->pEpSet);
  return code;
}
2023

2024
// Fills a poll request for one vgroup of one topic.
// The subscription key is "<groupId><TMQ_SEPARATOR><topicName>"; the requested offset is the
// vgroup's current end offset; a fresh request id is generated for every call.
void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) {
  // routing / identity
  (void)snprintf(pReq->subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", tmq->groupId, TMQ_SEPARATOR, pTopic->topicName);
  pReq->head.vgId = pVg->vgId;
  pReq->consumerId = tmq->consumerId;
  pReq->epoch = tmq->epoch;
  pReq->reqId = generateRequestId();

  // where to read from and how long to wait
  pReq->reqOffset = pVg->offsetInfo.endOffset;
  pReq->timeout = timeout;

  // consumer options copied as-is
  pReq->withTbName = tmq->withTbName;
  pReq->useSnapshot = tmq->useSnapshot;
  pReq->enableReplay = tmq->replayEnable;
  pReq->sourceExcluded = tmq->sourceExcluded;
  pReq->enableBatchMeta = tmq->enableBatchMeta;
}
52,024✔
2038

2039
void changeByteEndian(char* pData) {
367,994✔
2040
  if (pData == NULL) {
367,994!
2041
    return;
×
2042
  }
2043
  char* p = pData;
367,994✔
2044

2045
  // | version | total length | total rows | total columns | flag seg| block group id | column schema | each column
2046
  // length | version:
2047
  int32_t blockVersion = *(int32_t*)p;
367,994✔
2048
  if (blockVersion != BLOCK_VERSION_1) {
367,994!
2049
    tqErrorC("invalid block version:%d", blockVersion);
×
2050
    return;
×
2051
  }
2052
  *(int32_t*)p = BLOCK_VERSION_2;
367,994✔
2053

2054
  p += sizeof(int32_t);
367,994✔
2055
  p += sizeof(int32_t);
367,994✔
2056
  p += sizeof(int32_t);
367,994✔
2057
  int32_t cols = *(int32_t*)p;
367,994✔
2058
  p += sizeof(int32_t);
367,994✔
2059
  p += sizeof(int32_t);
367,994✔
2060
  p += sizeof(uint64_t);
367,994✔
2061
  // check fields
2062
  p += cols * (sizeof(int8_t) + sizeof(int32_t));
367,994✔
2063

2064
  int32_t* colLength = (int32_t*)p;
367,994✔
2065
  for (int32_t i = 0; i < cols; ++i) {
2,311,721✔
2066
    colLength[i] = htonl(colLength[i]);
1,943,727✔
2067
  }
2068
}
2069

2070
static void tmqGetRawDataRowsPrecisionFromRes(void* pRetrieve, void** rawData, int64_t* rows, int32_t* precision) {
719,786✔
2071
  if (pRetrieve == NULL) {
719,786!
2072
    return;
×
2073
  }
2074
  if (*(int64_t*)pRetrieve == 0) {
719,786!
2075
    *rawData = ((SRetrieveTableRsp*)pRetrieve)->data;
×
2076
    *rows = htobe64(((SRetrieveTableRsp*)pRetrieve)->numOfRows);
×
2077
    if (precision != NULL) {
×
2078
      *precision = ((SRetrieveTableRsp*)pRetrieve)->precision;
×
2079
    }
2080
  } else if (*(int64_t*)pRetrieve == 1) {
719,786!
2081
    *rawData = ((SRetrieveTableRspForTmq*)pRetrieve)->data;
719,792✔
2082
    *rows = htobe64(((SRetrieveTableRspForTmq*)pRetrieve)->numOfRows);
719,792✔
2083
    if (precision != NULL) {
719,784✔
2084
      *precision = ((SRetrieveTableRspForTmq*)pRetrieve)->precision;
351,835✔
2085
    }
2086
  }
2087
}
2088

2089
static void tmqBuildRspFromWrapperInner(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows,
35,179✔
2090
                                        SMqRspObj* pRspObj) {
2091
  pRspObj->resIter = -1;
35,179✔
2092
  pRspObj->resInfo.totalRows = 0;
35,179✔
2093
  pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
35,179✔
2094

2095
  SMqDataRsp* pDataRsp = &pRspObj->dataRsp;
35,179✔
2096
  bool needTransformSchema = !pDataRsp->withSchema;
35,179✔
2097
  if (!pDataRsp->withSchema) {  // withSchema is false if subscribe subquery, true if subscribe db or stable
35,179✔
2098
    pDataRsp->withSchema = true;
33,044✔
2099
    pDataRsp->blockSchema = taosArrayInit(pDataRsp->blockNum, sizeof(void*));
33,044✔
2100
    if (pDataRsp->blockSchema == NULL) {
33,045!
2101
      tqErrorC("failed to allocate memory for blockSchema");
×
2102
      return;
×
2103
    }
2104
  }
2105
  // extract the rows in this data packet
2106
  for (int32_t i = 0; i < pDataRsp->blockNum; ++i) {
403,179✔
2107
    void*   pRetrieve = taosArrayGetP(pDataRsp->blockData, i);
367,996✔
2108
    void*   rawData = NULL;
367,993✔
2109
    int64_t rows = 0;
367,993✔
2110
    // deal with compatibility
2111
    tmqGetRawDataRowsPrecisionFromRes(pRetrieve, &rawData, &rows, NULL);
367,993✔
2112

2113
    pVg->numOfRows += rows;
367,995✔
2114
    (*numOfRows) += rows;
367,995✔
2115
    changeByteEndian(rawData);
367,995✔
2116
    if (needTransformSchema) {  // withSchema is false if subscribe subquery, true if subscribe db or stable
367,996✔
2117
      SSchemaWrapper* schema = tCloneSSchemaWrapper(&pWrapper->topicHandle->schema);
249,175!
2118
      if (schema) {
249,176!
2119
        if (taosArrayPush(pDataRsp->blockSchema, &schema) == NULL) {
498,355!
2120
          tqErrorC("failed to push schema into blockSchema");
×
2121
          continue;
×
2122
        }
2123
      }
2124
    }
2125
  }
2126
}
2127

2128
// Builds, serializes and asynchronously sends one poll request for vgroup pVg of topic pTopic.
// The response is delivered to tmqPollCb together with a heap-allocated SMqPollCbParam; that
// parameter is released through sendInfo->paramFreeFp (taosMemoryFree).
// Returns 0 once the request has been handed to the transport, otherwise an error code. The
// poll counters are advanced only on success.
static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* pVg, int64_t timeout) {
  SMqPollReq req = {0};
  tmqBuildConsumeReqImpl(&req, pTmq, timeout, pTopic, pVg);

  // a NULL/0 buffer asks the serializer for the required size only
  int32_t payloadLen = tSerializeSMqPollReq(NULL, 0, &req);
  if (payloadLen < 0) {
    return TSDB_CODE_INVALID_MSG;
  }

  char* payload = taosMemoryCalloc(1, payloadLen);
  if (payload == NULL) {
    return terrno;
  }

  if (tSerializeSMqPollReq(payload, payloadLen, &req) < 0) {
    taosMemoryFreeClear(payload);
    return TSDB_CODE_INVALID_MSG;
  }

  SMqPollCbParam* cbParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
  if (cbParam == NULL) {
    int allocErr = terrno;
    taosMemoryFreeClear(payload);
    return allocErr;
  }
  cbParam->requestId = req.reqId;
  cbParam->vgId = pVg->vgId;
  cbParam->refId = pTmq->refId;
  tstrncpy(cbParam->topicName, pTopic->topicName, TSDB_TOPIC_FNAME_LEN);

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    taosMemoryFreeClear(cbParam);
    taosMemoryFreeClear(payload);
    return terrno;
  }
  sendInfo->msgInfo = (SDataBuf){.pData = payload, .len = payloadLen, .handle = NULL};
  sendInfo->msgType = TDMT_VND_TMQ_CONSUME;
  sendInfo->requestId = req.reqId;
  sendInfo->requestObjRefId = 0;
  sendInfo->fp = tmqPollCb;
  sendInfo->param = cbParam;
  sendInfo->paramFreeFp = taosMemoryFree;

  // format the offset before sending; it is only used by the debug log below
  char offsetStr[TSDB_OFFSET_LEN] = {0};
  tFormatOffset(offsetStr, tListLen(offsetStr), &pVg->offsetInfo.endOffset);

  int sendCode = asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, NULL, sendInfo);
  tqDebugC("consumer:0x%" PRIx64 " send poll to %s vgId:%d, code:%d, epoch %d, req:%s,QID:0x%" PRIx64, pTmq->consumerId,
           pTopic->topicName, pVg->vgId, sendCode, pTmq->epoch, offsetStr, req.reqId);
  if (sendCode != 0) {
    return sendCode;
  }

  pVg->seekUpdated = false;  // reset this flag.
  pVg->pollCnt++;
  pTmq->pollCnt++;

  return 0;
}
2195

2196
// broadcast the poll request to all related vnodes
2197
static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
102,524✔
2198
  int32_t code = 0;
102,524✔
2199

2200
  taosWLockLatch(&tmq->lock);
102,524✔
2201
  int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
102,524✔
2202
  tqDebugC("consumer:0x%" PRIx64 " start to poll data, numOfTopics:%d", tmq->consumerId, numOfTopics);
102,526!
2203

2204
  for (int i = 0; i < numOfTopics; i++) {
208,450✔
2205
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
105,918✔
2206
    if (pTopic == NULL) {
105,919!
2207
      continue;
×
2208
    }
2209
    int32_t numOfVg = taosArrayGetSize(pTopic->vgs);
105,919✔
2210
    if (pTopic->noPrivilege) {
105,918✔
2211
      tqDebugC("consumer:0x%" PRIx64 " has no privilegr for topic:%s", tmq->consumerId, pTopic->topicName);
11!
2212
      continue;
11✔
2213
    }
2214
    for (int j = 0; j < numOfVg; j++) {
408,136✔
2215
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
302,223✔
2216
      if (pVg == NULL) {
302,222!
2217
        continue;
×
2218
      }
2219
      int64_t elapsed = taosGetTimestampMs() - pVg->emptyBlockReceiveTs;
302,212✔
2220
      if (elapsed < EMPTY_BLOCK_POLL_IDLE_DURATION && elapsed >= 0) {  // less than 10ms
302,212!
2221
        tqDebugC("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 10ms before start next poll", tmq->consumerId,
22,916!
2222
                 tmq->epoch, pVg->vgId);
2223
        continue;
22,916✔
2224
      }
2225

2226
      elapsed = taosGetTimestampMs() - pVg->blockReceiveTs;
279,301✔
2227
      if (tmq->replayEnable && elapsed < pVg->blockSleepForReplay && elapsed >= 0) {
279,301!
2228
        tqDebugC("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for %" PRId64 "ms before start next poll when replay",
24!
2229
                 tmq->consumerId, tmq->epoch, pVg->vgId, pVg->blockSleepForReplay);
2230
        continue;
24✔
2231
      }
2232

2233
      int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
279,277✔
2234
      if (vgStatus == TMQ_VG_STATUS__WAIT) {
279,290✔
2235
        int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
227,270✔
2236
        tqDebugC("consumer:0x%" PRIx64 " epoch %d wait poll-rsp, skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch,
227,274!
2237
                 pVg->vgId, vgSkipCnt);
2238
        continue;
227,274✔
2239
      }
2240

2241
      atomic_store_32(&pVg->vgSkipCnt, 0);
52,020✔
2242
      code = doTmqPollImpl(tmq, pTopic, pVg, timeout);
52,021✔
2243
      if (code != TSDB_CODE_SUCCESS) {
52,015!
2244
        goto end;
×
2245
      }
2246
    }
2247
  }
2248

2249
end:
102,532✔
2250
  taosWUnLockLatch(&tmq->lock);
102,532✔
2251
  tqDebugC("consumer:0x%" PRIx64 " end to poll data, code:%d", tmq->consumerId, code);
102,526!
2252
  return code;
102,527✔
2253
}
2254

2255
// Applies a poll response to the local vgroup state.
// Unless a seek has marked the vgroup (seekUpdated), the end offset moves to rspOffset and,
// when the response carried data, the begin offset moves to reqOffset. In all cases the vgroup
// returns to IDLE and its WAL version range becomes [sver, ever + 1].
static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* reqOffset, STqOffsetVal* rspOffset, int64_t sver, int64_t ever,
                         int64_t consumerId, bool hasData) {
  if (pVg->seekUpdated) {
    tqDebugC("consumer:0x%" PRIx64 " local offset is NOT update, since seekupdate is set", consumerId);
  } else {
    tqDebugC("consumer:0x%" PRIx64 " local offset is update, since seekupdate not set", consumerId);
    if (hasData) {
      tOffsetCopy(&pVg->offsetInfo.beginOffset, reqOffset);
    }
    tOffsetCopy(&pVg->offsetInfo.endOffset, rspOffset);
  }

  // update the status
  atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);

  // update the valid wal version range
  pVg->offsetInfo.walVerBegin = sver;
  pVg->offsetInfo.walVerEnd = ever + 1;
}
47,055✔
2274

2275
// Allocates an SMqRspObj and moves the response payload out of the poll wrapper into it.
// The payload is copied bytewise, sized as the largest of the three response kinds, and the
// wrapper's copy is then zeroed so only the new object refers to what the payload points at.
// Returns NULL when the allocation fails.
static SMqRspObj* buildRsp(SMqPollRspWrapper* pollRspWrapper){
  // only used to obtain the size of the largest response variant
  typedef union {
    SMqDataRsp      dataRsp;
    SMqMetaRsp      metaRsp;
    SMqBatchMetaRsp batchMetaRsp;
  } RspPayload;

  SMqRspObj* rspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
  if (NULL == rspObj) {
    tqErrorC("buildRsp:failed to allocate memory");
    return NULL;
  }

  (void)memcpy(&rspObj->dataRsp, &pollRspWrapper->dataRsp, sizeof(RspPayload));
  tstrncpy(rspObj->topic, pollRspWrapper->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(rspObj->db, pollRspWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  rspObj->vgId = pollRspWrapper->vgId;
  (void)memset(&pollRspWrapper->dataRsp, 0, sizeof(RspPayload));

  return rspObj;
}
2294

2295
// Handles a poll response that carries an error code.
// For TSDB_CODE_VND_INVALID_VGROUP_ID and TSDB_CODE_TMQ_CONSUMER_MISMATCH a new endpoint query
// is issued through askEp; the last askEp argument is true only for the invalid-vgroup case.
// The matching vgroup, if it can be found, is then put back to IDLE and its
// emptyBlockReceiveTs is refreshed.
static void processMqRspError(tmq_t* tmq, SMqRspWrapper* pRspWrapper){
  SMqPollRspWrapper* pollRsp = &pRspWrapper->pollRsp;

  const bool vgInvalid = (pRspWrapper->code == TSDB_CODE_VND_INVALID_VGROUP_ID);  // for vnode transform
  const bool epochMismatch = (pRspWrapper->code == TSDB_CODE_TMQ_CONSUMER_MISMATCH);
  if (vgInvalid || epochMismatch) {
    int32_t askCode = askEp(tmq, NULL, false, vgInvalid);
    if (askCode != 0) {
      tqErrorC("consumer:0x%" PRIx64 " failed to ask ep, code:%s", tmq->consumerId, tstrerror(askCode));
    }
  }

  tqInfoC("consumer:0x%" PRIx64 " msg from vgId:%d discarded, since %s", tmq->consumerId, pollRsp->vgId,
          tstrerror(pRspWrapper->code));

  taosWLockLatch(&tmq->lock);
  SMqClientVg* vg = NULL;
  getVgInfo(tmq, pollRsp->topicName, pollRsp->vgId, &vg);
  if (vg != NULL) {
    vg->emptyBlockReceiveTs = taosGetTimestampMs();
    atomic_store_32(&vg->vgStatus, TMQ_VG_STATUS__IDLE);
  }
  taosWUnLockLatch(&tmq->lock);
}
3,885✔
2320
// Process one successful response taken from the consumer queue.
// - EP responses only refresh the local endpoint table; NULL is returned.
// - Data / data+meta poll responses update the vgroup bookkeeping; a response object is built
//   only when the response contains at least one block.
// - Meta / batch-meta poll responses update the vgroup bookkeeping and always build an object.
// Returns the response object (resType set from the response type), or NULL when nothing is to
// be handed to the caller. Everything after the EP branch runs under tmq->lock (write).
static SMqRspObj* processMqRsp(tmq_t* tmq, SMqRspWrapper* pRspWrapper){
  if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) {
    tqDebugC("consumer:0x%" PRIx64 " ep msg received", tmq->consumerId);
    SMqAskEpRsp* pEpRsp = &pRspWrapper->epRsp;
    doUpdateLocalEp(tmq, pRspWrapper->epoch, pEpRsp);
    return NULL;
  }

  SMqRspObj*         pResult = NULL;
  SMqPollRspWrapper* pPoll = &pRspWrapper->pollRsp;

  const bool isData = (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_RSP);
  const bool isDataMeta = (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP);
  const bool isMeta = (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP);
  const bool isBatchMeta = (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_BATCH_META_RSP);

  taosWLockLatch(&tmq->lock);
  SMqClientVg* pClientVg = NULL;
  getVgInfo(tmq, pPoll->topicName, pPoll->vgId, &pClientVg);
  if (pClientVg == NULL) {
    tqErrorC("consumer:0x%" PRIx64 " get vg or topic error, topic:%s vgId:%d", tmq->consumerId,
             pPoll->topicName, pPoll->vgId);
    goto END;
  }

  pPoll->topicHandle = getTopicInfo(tmq, pPoll->topicName);
  if (pPoll->pEpset != NULL) {
    // the response carried a newer endpoint set for this vgroup
    pClientVg->epSet = *pPoll->pEpset;
  }

  if (isData || isDataMeta) {
    updateVgInfo(pClientVg, &pPoll->dataRsp.reqOffset, &pPoll->dataRsp.rspOffset, pPoll->head.walsver,
                 pPoll->head.walever, tmq->consumerId, pPoll->dataRsp.blockNum != 0);

    char offsetStr[TSDB_OFFSET_LEN] = {0};
    tFormatOffset(offsetStr, TSDB_OFFSET_LEN, &pPoll->rspOffset);

    if (pPoll->dataRsp.blockNum == 0) {
      tqDebugC("consumer:0x%" PRIx64 " empty block received, vgId:%d, offset:%s, vg total:%" PRId64
                   ", total:%" PRId64 ",QID:0x%" PRIx64,
               tmq->consumerId, pClientVg->vgId, offsetStr, pClientVg->numOfRows, tmq->totalRows, pPoll->reqId);
      pClientVg->emptyBlockReceiveTs = taosGetTimestampMs();
      goto END;
    }

    pResult = buildRsp(pPoll);
    if (pResult == NULL) {
      tqErrorC("consumer:0x%" PRIx64 " failed to allocate memory for meta rsp", tmq->consumerId);
      goto END;
    }
    pResult->resType = isData ? RES_TYPE__TMQ : RES_TYPE__TMQ_METADATA;

    int64_t rowsInRsp = 0;
    tmqBuildRspFromWrapperInner(pPoll, pClientVg, &rowsInRsp, pResult);
    tmq->totalRows += rowsInRsp;
    pClientVg->emptyBlockReceiveTs = 0;

    if (tmq->replayEnable) {
      pClientVg->blockReceiveTs = taosGetTimestampMs();
      pClientVg->blockSleepForReplay = pResult->dataRsp.sleepTime;
      if (pClientVg->blockSleepForReplay > 0 &&
          taosTmrStart(tmqReplayTask, pClientVg->blockSleepForReplay, (void*)(tmq->refId), tmqMgmt.timer) == NULL) {
        tqErrorC("consumer:0x%" PRIx64 " failed to start replay timer, vgId:%d, sleep:%" PRId64,
                 tmq->consumerId, pClientVg->vgId, pClientVg->blockSleepForReplay);
      }
    }

    tqDebugC("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64
                 ", vg total:%" PRId64 ", total:%" PRId64 ",QID:0x%" PRIx64,
             tmq->consumerId, pClientVg->vgId, offsetStr, pResult->dataRsp.blockNum, rowsInRsp, pClientVg->numOfRows,
             tmq->totalRows, pPoll->reqId);
  } else if (isMeta || isBatchMeta) {
    updateVgInfo(pClientVg, &pPoll->rspOffset, &pPoll->rspOffset, pPoll->head.walsver, pPoll->head.walever,
                 tmq->consumerId, true);

    pResult = buildRsp(pPoll);
    if (pResult == NULL) {
      tqErrorC("consumer:0x%" PRIx64 " failed to allocate memory for meta rsp", tmq->consumerId);
      goto END;
    }
    pResult->resType = isMeta ? RES_TYPE__TMQ_META : RES_TYPE__TMQ_BATCH_META;
  }

  END:
  taosWUnLockLatch(&tmq->lock);
  return pResult;
}
2398

2399
// Drain queued responses until one of them yields a result object.
// Items are taken from tmq->qall; when it is empty it is refilled once from tmq->mqueue.
// Returns the first non-NULL object produced by processMqRsp, or NULL when no more items are
// available. Each wrapper is released after it has been processed. `timeout` is not used here.
static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
  tqDebugC("consumer:0x%" PRIx64 " start to handle the rsp, total:%d", tmq->consumerId, taosQallItemSize(tmq->qall));

  void* pResult = NULL;
  do {
    SMqRspWrapper* pItem = NULL;
    if (taosGetQitem(tmq->qall, (void**)&pItem) == 0) {
      // local batch exhausted: pull everything pending from the shared queue and retry once
      if (taosReadAllQitems(tmq->mqueue, tmq->qall) == 0){
        return NULL;
      }
      if (taosGetQitem(tmq->qall, (void**)&pItem) == 0) {
        return NULL;
      }
    }

    tqDebugC("consumer:0x%" PRIx64 " handle rsp, type:%d", tmq->consumerId, pItem->tmqRspType);
    if (pItem->code == 0) {
      pResult = processMqRsp(tmq, pItem);
    } else {
      processMqRspError(tmq, pItem);
    }

    tmqFreeRspWrapper(pItem);
    taosFreeQitem(pItem);
  } while (pResult == NULL);

  return pResult;
}
2429

2430
// Poll for the next result.
// Returns NULL when tmq is NULL, when the consumer is still in INIT status (after a 500 ms
// sleep), or when a non-negative timeout (ms) elapses without a result. With a negative timeout
// the loop keeps waiting on the response semaphore in 1000 ms slices.
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
  if (tmq == NULL) return NULL;

  void*   pRsp = NULL;
  int64_t beginMs = taosGetTimestampMs();

  tqDebugC("consumer:0x%" PRIx64 " start to poll at %" PRId64 ", timeout:%" PRId64, tmq->consumerId, beginMs,
           timeout);

  // in no topic status, delayed task also need to be processed
  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) {
    tqInfoC("consumer:0x%" PRIx64 " poll return since consumer is init", tmq->consumerId);
    taosMsleep(500);  //     sleep for a while
    return NULL;
  }

  (void)atomic_val_compare_exchange_8(&tmq->pollFlag, 0, 1);

  for (;;) {
    tmqHandleAllDelayedTask(tmq);

    // a poll error is only logged; already queued responses are still handled below
    if (tmqPollImpl(tmq, timeout) < 0) {
      tqErrorC("consumer:0x%" PRIx64 " return due to poll error", tmq->consumerId);
    }

    pRsp = tmqHandleAllRsp(tmq, timeout);
    if (pRsp) {
      tqDebugC("consumer:0x%" PRIx64 " return rsp %p", tmq->consumerId, pRsp);
      return (TAOS_RES*)pRsp;
    }

    if (timeout < 0) {
      (void)tsem2_timewait(&tmq->rspSem, 1000);
      continue;
    }

    int64_t nowMs = taosGetTimestampMs();
    int64_t spentMs = nowMs - beginMs;
    if (spentMs > timeout || spentMs < 0) {
      tqDebugC("consumer:0x%" PRIx64 " (epoch %d) timeout, no rsp, start time %" PRId64 ", current time %" PRId64,
               tmq->consumerId, tmq->epoch, beginMs, nowMs);
      return NULL;
    }
    (void)tsem2_timewait(&tmq->rspSem, (timeout - spentMs));
  }
}
2475

2476
// Log overall counters of the consumer and, per topic and vgroup, the number of rows received.
// The topic list is walked under the read lock.
static void displayConsumeStatistics(tmq_t* pTmq) {
  taosRLockLatch(&pTmq->lock);

  int32_t topicCnt = taosArrayGetSize(pTmq->clientTopics);
  tqInfoC("consumer:0x%" PRIx64 " closing poll:%" PRId64 " rows:%" PRId64 " topics:%d, final epoch:%d",
           pTmq->consumerId, pTmq->pollCnt, pTmq->totalRows, topicCnt, pTmq->epoch);

  tqInfoC("consumer:0x%" PRIx64 " rows dist begin: ", pTmq->consumerId);
  for (int32_t t = 0; t < topicCnt; ++t) {
    SMqClientTopic* pTopic = taosArrayGet(pTmq->clientTopics, t);
    if (pTopic != NULL) {
      tqInfoC("consumer:0x%" PRIx64 " topic:%d", pTmq->consumerId, t);

      int32_t vgCnt = taosArrayGetSize(pTopic->vgs);
      for (int32_t v = 0; v < vgCnt; ++v) {
        SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, v);
        if (pClientVg != NULL) {
          tqInfoC("topic:%s, %d. vgId:%d rows:%" PRId64, pTopic->topicName, v, pClientVg->vgId, pClientVg->numOfRows);
        }
      }
    }
  }

  taosRUnLockLatch(&pTmq->lock);
  tqInfoC("consumer:0x%" PRIx64 " rows dist end", pTmq->consumerId);
}
835✔
2497

2498
// Unsubscribe the consumer from all topics.
// Statistics are always logged. When the consumer is not in READY status nothing else is done
// and 0 is returned. Otherwise offsets are committed first if auto-commit is on, a heartbeat
// is sent, and an empty topic list is subscribed.
// Returns TSDB_CODE_INVALID_PARA for a NULL handle, otherwise 0 or the first failing step's code.
int32_t tmq_unsubscribe(tmq_t* tmq) {
  if (tmq == NULL) return TSDB_CODE_INVALID_PARA;

  int32_t code = 0;
  int8_t  curStatus = atomic_load_8(&tmq->status);
  tqInfoC("consumer:0x%" PRIx64 " start to unsubscribe consumer, status:%d", tmq->consumerId, curStatus);

  displayConsumeStatistics(tmq);
  if (curStatus != TMQ_CONSUMER_STATUS__READY) {
    tqInfoC("consumer:0x%" PRIx64 " status:%d, already closed or not in ready state, no need unsubscribe", tmq->consumerId, curStatus);
    return code;
  }

  if (tmq->autoCommit) {
    code = tmq_commit_sync(tmq, NULL);
    if (code != 0) {
      return code;
    }
  }
  tmqSendHbReq((void*)(tmq->refId), NULL);

  // subscribing an empty list is how the unsubscribe is expressed
  tmq_list_t* emptyTopics = tmq_list_new();
  if (emptyTopics == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  code = tmq_subscribe(tmq, emptyTopics);
  tmq_list_destroy(emptyTopics);
  return code;
}
2531

2532
// Close the consumer: unsubscribe, mark it CLOSED and drop its reference from tmqMgmt.rsetId.
// Returns TSDB_CODE_INVALID_PARA for a NULL handle, the unsubscribe error if that step fails
// (the consumer is then left untouched), otherwise the result of taosRemoveRef.
int32_t tmq_consumer_close(tmq_t* tmq) {
  if (tmq == NULL) return TSDB_CODE_INVALID_PARA;
  tqInfoC("consumer:0x%" PRIx64 " start to close consumer, status:%d", tmq->consumerId, tmq->status);

  int32_t code = tmq_unsubscribe(tmq);
  if (code != 0) {
    return code;
  }

  atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__CLOSED);
  code = taosRemoveRef(tmqMgmt.rsetId, tmq->refId);
  if (code != 0){
    tqErrorC("tmq close failed to remove ref:%" PRId64 ", code:%d", tmq->refId, code);
  }
  return code;
}
2545

2546
const char* tmq_err2str(int32_t err) {
239✔
2547
  if (err == 0) {
239✔
2548
    return "success";
221✔
2549
  } else if (err == -1) {
18!
2550
    return "fail";
×
2551
  } else {
2552
    if (*(taosGetErrMsg()) == 0) {
18✔
2553
      return tstrerror(err);
5✔
2554
    } else {
2555
      (void)snprintf(taosGetErrMsgReturn(), ERR_MSG_LEN, "%s,detail:%s", tstrerror(err), taosGetErrMsg());
13✔
2556
      return (const char*)taosGetErrMsgReturn();
13✔
2557
    }
2558
  }
2559
}
2560

2561
// Classify a result handle. Both META and BATCH_META results are reported as
// TMQ_RES_TABLE_META; NULL or an unrecognised handle yields TMQ_RES_INVALID.
tmq_res_t tmq_get_res_type(TAOS_RES* res) {
  if (res == NULL) return TMQ_RES_INVALID;

  if (TD_RES_TMQ(res)) return TMQ_RES_DATA;
  if (TD_RES_TMQ_META(res)) return TMQ_RES_TABLE_META;
  if (TD_RES_TMQ_METADATA(res)) return TMQ_RES_METADATA;
  if (TD_RES_TMQ_BATCH_META(res)) return TMQ_RES_TABLE_META;

  return TMQ_RES_INVALID;
}
2577

2578
// Return the topic name of a TMQ result, i.e. the part of the stored topic string that follows
// the first '.'. Returns NULL for a NULL / non-TMQ handle or when the stored string has no '.'.
// The returned pointer refers into the result object itself.
const char* tmq_get_topic_name(TAOS_RES* res) {
  if (res == NULL) return NULL;

  const bool isTmqRes = TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) ||
                        TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res);
  if (!isTmqRes) return NULL;

  char* dot = strchr(((SMqRspObj*)res)->topic, '.');
  return (dot == NULL) ? NULL : dot + 1;
}
2593

2594
// Return the database name of a TMQ result, i.e. the part of the stored db string that follows
// the first '.'. Returns NULL for a NULL / non-TMQ handle or when the stored string has no '.'.
// The returned pointer refers into the result object itself.
const char* tmq_get_db_name(TAOS_RES* res) {
  if (res == NULL) return NULL;

  const bool isTmqRes = TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) ||
                        TD_RES_TMQ_BATCH_META(res) || TD_RES_TMQ_META(res);
  if (!isTmqRes) return NULL;

  char* dot = strchr(((SMqRspObj*)res)->db, '.');
  return (dot == NULL) ? NULL : dot + 1;
}
2610

2611
// Return the vgroup id stored in a TMQ result, or TSDB_CODE_INVALID_PARA when the handle is
// NULL or is not one of the four TMQ result kinds.
int32_t tmq_get_vgroup_id(TAOS_RES* res) {
  if (res == NULL) return TSDB_CODE_INVALID_PARA;

  const bool isTmqRes = TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) ||
                        TD_RES_TMQ_BATCH_META(res) || TD_RES_TMQ_META(res);
  return isTmqRes ? ((SMqRspObj*)res)->vgId : TSDB_CODE_INVALID_PARA;
}
2622

2623
// Return the WAL version associated with a result.
// Data / data+meta results use dataRsp.reqOffset; meta / batch-meta results use rspOffset.
// Returns TSDB_CODE_INVALID_PARA for a NULL handle and TSDB_CODE_TMQ_SNAPSHOT_ERROR whenever the
// offset is not of type TMQ_OFFSET__LOG or the handle is not a TMQ result.
int64_t tmq_get_vgroup_offset(TAOS_RES* res) {
  if (res == NULL) return TSDB_CODE_INVALID_PARA;

  SMqRspObj* pRspObj = (SMqRspObj*)res;
  if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) {
    STqOffsetVal* pReqOffset = &pRspObj->dataRsp.reqOffset;
    if (pReqOffset->type == TMQ_OFFSET__LOG) {
      return pReqOffset->version;
    }
    tqErrorC("invalid offset type:%d", pReqOffset->type);
  } else if (TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
    if (pRspObj->rspOffset.type == TMQ_OFFSET__LOG) {
      return pRspObj->rspOffset.version;
    }
  } else {
    tqErrorC("invalid tmq type:%d", *(int8_t*)res);
  }

  // data from tsdb, no valid offset info
  return TSDB_CODE_TMQ_SNAPSHOT_ERROR;
}
2647

2648
// Return the table name of the block the result iterator currently points at.
// NULL is returned for a NULL handle, for results that are neither data nor data+meta, when
// the response carries no table names, or when the iterator is outside [0, blockNum).
const char* tmq_get_table_name(TAOS_RES* res) {
  if (res == NULL) return NULL;
  if (!(TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res))) return NULL;

  SMqRspObj*  pRspObj = (SMqRspObj*)res;
  SMqDataRsp* pData = &pRspObj->dataRsp;

  const bool hasNames = pData->withTbName && pData->blockTbName != NULL;
  const bool iterInRange = pRspObj->resIter >= 0 && pRspObj->resIter < pData->blockNum;
  if (!hasNames || !iterInRange) {
    return NULL;
  }
  return (const char*)taosArrayGetP(pData->blockTbName, pRspObj->resIter);
}
2663

2664
// Asynchronously commit offsets. With pRes == NULL all offsets are committed, otherwise only
// the offset belonging to pRes. For a NULL handle the callback (if any) is invoked right away
// with TSDB_CODE_INVALID_PARA.
void tmq_commit_async(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* cb, void* param) {
  if (tmq == NULL) {
    tqErrorC("invalid tmq handle, null");
    if (cb != NULL) {
      cb(tmq, TSDB_CODE_INVALID_PARA, param);
    }
    return;
  }

  if (pRes != NULL) {  // only commit one offset
    asyncCommitFromResult(tmq, pRes, cb, param);
  } else {  // here needs to commit all offsets.
    asyncCommitAllOffsets(tmq, cb, param);
  }
}
2678

2679
// Commit callback used by the synchronous wrappers: stores the result code in the
// SSyncCommitInfo passed as `param` and posts its semaphore.
static void commitCallBackFn(tmq_t* UNUSED_PARAM(tmq), int32_t code, void* param) {
  SSyncCommitInfo* pSync = (SSyncCommitInfo*)param;

  pSync->code = code;
  int32_t postRet = tsem2_post(&pSync->sem);
  if (postRet != 0){
    tqErrorC("failed to post rsp sem in commit cb");
  }
}
720✔
2686

2687
// Synchronous commit: starts the asynchronous commit (all offsets when pRes is NULL, otherwise
// the offset of pRes) and waits on a semaphore that commitCallBackFn posts.
// Returns TSDB_CODE_INVALID_PARA for a NULL handle, terrno if the helper struct cannot be
// allocated, TSDB_CODE_OUT_OF_MEMORY if the semaphore cannot be initialised, otherwise the
// code delivered to the callback.
int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* pRes) {
  if (tmq == NULL) {
    tqErrorC("invalid tmq handle, null");
    return TSDB_CODE_INVALID_PARA;
  }

  SSyncCommitInfo* pSync = taosMemoryMalloc(sizeof(SSyncCommitInfo));
  if (pSync == NULL) {
    tqErrorC("failed to allocate memory for sync commit");
    return terrno;
  }
  if (tsem2_init(&pSync->sem, 0, 0) != 0) {
    tqErrorC("failed to init sem for sync commit");
    taosMemoryFree(pSync);
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  pSync->code = 0;

  if (pRes != NULL) {
    asyncCommitFromResult(tmq, pRes, commitCallBackFn, pSync);
  } else {
    asyncCommitAllOffsets(tmq, commitCallBackFn, pSync);
  }

  // block until commitCallBackFn reports the outcome
  if (tsem2_wait(&pSync->sem) != 0){
    tqErrorC("failed to wait sem for sync commit");
  }
  int32_t result = pSync->code;

  if(tsem2_destroy(&pSync->sem) != 0) {
    tqErrorC("failed to destroy sem for sync commit");
  }
  taosMemoryFree(pSync);

  tqInfoC("consumer:0x%" PRIx64 " sync res commit done, code:%s", tmq->consumerId, tstrerror(result));
  return result;
}
2726

2727
// wal range will be ok after calling tmq_get_topic_assignment or poll interface
2728
// Validate `value` against the WAL range recorded in `offset`.
// Returns TSDB_CODE_TMQ_NEED_INITIALIZED while either bound is still -1,
// TSDB_CODE_TMQ_VERSION_OUT_OF_RANGE when value lies outside [walVerBegin, walVerEnd],
// and 0 otherwise. value == -1 skips the range comparison.
static int32_t checkWalRange(SVgOffsetInfo* offset, int64_t value) {
  const bool rangeUnknown = (offset->walVerBegin == -1) || (offset->walVerEnd == -1);
  if (rangeUnknown) {
    tqErrorC("Assignment or poll interface need to be called first");
    return TSDB_CODE_TMQ_NEED_INITIALIZED;
  }

  if (value == -1) {
    return 0;
  }

  if (value < offset->walVerBegin || value > offset->walVerEnd) {
    tqErrorC("invalid seek params, offset:%" PRId64 ", valid range:[%" PRId64 ", %" PRId64 "]", value,
             offset->walVerBegin, offset->walVerEnd);
    return TSDB_CODE_TMQ_VERSION_OUT_OF_RANGE;
  }
  return 0;
}
2742

2743
// Synchronously commit `offset` (a WAL version) for one vgroup of a topic.
// The vgroup lookup and the WAL range check run under tmq->lock; the commit itself is sent via
// asyncCommitOffset and awaited on a semaphore posted by commitCallBackFn.
// TSDB_CODE_TMQ_SAME_COMMITTED_VALUE is reported to the caller as success.
int32_t tmq_commit_offset_sync(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_t offset) {
  if (tmq == NULL || pTopicName == NULL) {
    tqErrorC("invalid tmq handle, null");
    return TSDB_CODE_INVALID_PARA;
  }

  // full topic name is "<acctId>.<topic>"
  int32_t accId = tmq->pTscObj->acctId;
  char    fullName[TSDB_TOPIC_FNAME_LEN] = {0};
  (void)snprintf(fullName, TSDB_TOPIC_FNAME_LEN, "%d.%s", accId, pTopicName);

  SMqClientVg* pClientVg = NULL;
  taosWLockLatch(&tmq->lock);
  int32_t code = getClientVg(tmq, fullName, vgId, &pClientVg);
  if (code == 0) {
    code = checkWalRange(&pClientVg->offsetInfo, offset);
  }
  taosWUnLockLatch(&tmq->lock);
  if (code != 0) {
    return code;
  }

  STqOffsetVal offsetVal = {.type = TMQ_OFFSET__LOG, .version = offset};

  SSyncCommitInfo* pSync = taosMemoryMalloc(sizeof(SSyncCommitInfo));
  if (pSync == NULL) {
    tqErrorC("consumer:0x%" PRIx64 " failed to prepare seek operation", tmq->consumerId);
    return terrno;
  }
  if (tsem2_init(&pSync->sem, 0, 0) != 0) {
    taosMemoryFree(pSync);
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  pSync->code = 0;

  code = asyncCommitOffset(tmq, fullName, vgId, &offsetVal, commitCallBackFn, pSync);
  if (code == 0) {
    // only wait when the request was actually handed off
    if (tsem2_wait(&pSync->sem) != 0){
      tqErrorC("failed to wait sem for sync commit offset");
    }
    code = pSync->code;
  }
  if (code == TSDB_CODE_TMQ_SAME_COMMITTED_VALUE) {
    code = TSDB_CODE_SUCCESS;
  }

  if(tsem2_destroy(&pSync->sem) != 0) {
    tqErrorC("failed to destroy sem for sync commit offset");
  }
  taosMemoryFree(pSync);

  tqInfoC("consumer:0x%" PRIx64 " sync send commit to vgId:%d, offset:%" PRId64 " code:%s", tmq->consumerId, vgId,
          offset, tstrerror(code));

  return code;
}
2802

2803
// Asynchronously commit `offset` (a WAL version) for one vgroup of a topic.
// When any step ends with a non-zero code and a callback was supplied, the callback is invoked
// from here with that code (TSDB_CODE_TMQ_SAME_COMMITTED_VALUE is mapped to success first).
void tmq_commit_offset_async(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb* cb,
                             void* param) {
  int32_t code = 0;

  if (tmq == NULL || pTopicName == NULL) {
    tqErrorC("invalid tmq handle, null");
    code = TSDB_CODE_INVALID_PARA;
  } else {
    // full topic name is "<acctId>.<topic>"
    int32_t accId = tmq->pTscObj->acctId;
    char    fullName[TSDB_TOPIC_FNAME_LEN] = {0};
    (void)snprintf(fullName, TSDB_TOPIC_FNAME_LEN, "%d.%s", accId, pTopicName);

    SMqClientVg* pClientVg = NULL;
    taosWLockLatch(&tmq->lock);
    code = getClientVg(tmq, fullName, vgId, &pClientVg);
    if (code == 0) {
      code = checkWalRange(&pClientVg->offsetInfo, offset);
    }
    taosWUnLockLatch(&tmq->lock);

    if (code == 0) {
      STqOffsetVal offsetVal = {.type = TMQ_OFFSET__LOG, .version = offset};

      code = asyncCommitOffset(tmq, fullName, vgId, &offsetVal, cb, param);

      tqInfoC("consumer:0x%" PRIx64 " async send commit to vgId:%d, offset:%" PRId64 " code:%s", tmq->consumerId, vgId,
              offset, tstrerror(code));
    }
  }

  if (code != 0 && cb != NULL) {
    if (code == TSDB_CODE_TMQ_SAME_COMMITTED_VALUE) code = TSDB_CODE_SUCCESS;
    cb(tmq, code, param);
  }
}
18✔
2845

2846
// Advance the result iterator to the next data block and load it into pRspObj->resInfo.
// On success *pResInfo points at that resInfo and 0 is returned. When the iterator has moved
// past the last block TSDB_CODE_TSC_INTERNAL_ERROR is returned; a schema or data-pointer setup
// failure is returned as-is.
int32_t tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4, SReqResultInfo** pResInfo) {
  SMqRspObj*  pRspObj = (SMqRspObj*)res;
  SMqDataRsp* pData = &pRspObj->dataRsp;

  pRspObj->resIter++;
  if (pRspObj->resIter >= pData->blockNum) {
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  if (pData->withSchema) {
    // each block carries its own schema: drop the previous one before installing the new one
    doFreeReqResultInfo(&pRspObj->resInfo);
    SSchemaWrapper* pSchemaWrapper = (SSchemaWrapper*)taosArrayGetP(pData->blockSchema, pRspObj->resIter);
    if (pSchemaWrapper) {
      TAOS_CHECK_RETURN(setResSchemaInfo(&pRspObj->resInfo, pSchemaWrapper->pSchema, pSchemaWrapper->nCols));
    }
  }

  void*   pBlock = taosArrayGetP(pData->blockData, pRspObj->resIter);
  void*   pRaw = NULL;
  int64_t rowCnt = 0;
  int32_t prec = 0;
  tmqGetRawDataRowsPrecisionFromRes(pBlock, &pRaw, &rowCnt, &prec);

  pRspObj->resInfo.pData = pRaw;
  pRspObj->resInfo.numOfRows = rowCnt;
  pRspObj->resInfo.current = 0;
  pRspObj->resInfo.precision = prec;
  pRspObj->resInfo.totalRows += pRspObj->resInfo.numOfRows;

  int32_t code = setResultDataPtr(&pRspObj->resInfo, convertUcs4);
  if (code == 0) {
    *pResInfo = &pRspObj->resInfo;
  }
  return code;
}
2882

2883
// Response callback for a per-vgroup WAL info request.
// On success the response is decoded and a tmq_topic_assignment {begin = walsver,
// end = walever + 1, currentOffset = rspOffset.version, vgId} is appended to pCommon->pList
// under pCommon->mutex. The result code is stored in pCommon->code and the callback that
// completes the last outstanding request posts pCommon->rsp.
// The message buffers (pMsg->pData / pMsg->pEpSet) are always released here.
//
// Changes compared with the previous version:
//  - the response counter is bumped only AFTER this callback has finished touching pCommon, so
//    the callback that posts the semaphore is really the last one still using pCommon;
//  - the message buffers are also released when param is NULL;
//  - a NULL message on the success path is reported as TSDB_CODE_TSC_INTERNAL_ERROR instead of
//    being dereferenced.
// NOTE(review): pCommon->code is overwritten by every callback, so a later success can hide an
// earlier failure; keeping the first error would need the initial value of pCommon->code to be
// confirmed at the allocation site.
static int32_t tmqGetWalInfoCb(void* param, SDataBuf* pMsg, int32_t code) {
  if (param == NULL) {
    if (pMsg) {
      taosMemoryFree(pMsg->pData);
      taosMemoryFree(pMsg->pEpSet);
    }
    return code;
  }
  SMqVgWalInfoParam* pParam = param;
  SMqVgCommon*       pCommon = pParam->pCommon;
  const int32_t      totalReq = pParam->totalReq;

  if (code != TSDB_CODE_SUCCESS) {
    tqErrorC("consumer:0x%" PRIx64 " failed to get the wal info from vgId:%d for topic:%s", pCommon->consumerId,
             pParam->vgId, pCommon->pTopicName);
  } else if (pMsg == NULL || pMsg->pData == NULL) {
    code = TSDB_CODE_TSC_INTERNAL_ERROR;
  } else {
    SMqDataRsp rsp = {0};
    SDecoder   decoder = {0};
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    code = tDecodeMqDataRsp(&decoder, &rsp);
    tDecoderClear(&decoder);

    if (code == 0) {
      SMqRspHead*          pHead = pMsg->pData;
      tmq_topic_assignment assignment = {.begin = pHead->walsver,
                                         .end = pHead->walever + 1,
                                         .currentOffset = rsp.rspOffset.version,
                                         .vgId = pParam->vgId};

      (void)taosThreadMutexLock(&pCommon->mutex);
      if (taosArrayPush(pCommon->pList, &assignment) == NULL) {
        tqErrorC("consumer:0x%" PRIx64 " failed to push the wal info from vgId:%d for topic:%s", pCommon->consumerId,
                 pParam->vgId, pCommon->pTopicName);
        code = TSDB_CODE_TSC_INTERNAL_ERROR;
      }
      (void)taosThreadMutexUnlock(&pCommon->mutex);
    }
  }

  pCommon->code = code;

  // Count this response only now: once the counter reaches totalReq the waiter may be woken,
  // so pCommon must not be touched after the increment except to post the semaphore.
  int32_t total = atomic_add_fetch_32(&pCommon->numOfRsp, 1);
  if (total == totalReq) {
    if (tsem2_post(&pCommon->rsp) != 0) {
      tqErrorC("failed to post semaphore in get wal cb");
    }
  }

  if (pMsg) {
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
  }

  return code;
}
2935

2936
// Release an SMqVgCommon: its list, semaphore, mutex, topic name and the struct itself.
// A NULL pointer is ignored.
static void destroyCommonInfo(SMqVgCommon* pCommon) {
  if (pCommon != NULL) {
    taosArrayDestroy(pCommon->pList);

    int32_t semRet = tsem2_destroy(&pCommon->rsp);
    if (semRet != 0) {
      tqErrorC("failed to destroy semaphore for topic:%s", pCommon->pTopicName);
    }

    (void)taosThreadMutexDestroy(&pCommon->mutex);
    taosMemoryFree(pCommon->pTopicName);
    taosMemoryFree(pCommon);
  }
}
2948

2949
static bool isInSnapshotMode(int8_t type, bool useSnapshot) {
72✔
2950
  if ((type < TMQ_OFFSET__LOG && useSnapshot) || type > TMQ_OFFSET__LOG) {
72!
2951
    return true;
×
2952
  }
2953
  return false;
72✔
2954
}
2955

2956
// Response callback for the "committed offset" request issued by getCommittedFromServer.
// On success the payload is decoded into pParam->vgOffset. The result code is stored in
// pParam->code and pParam->sem is posted so the waiting caller can continue. The message
// buffers (pMsg->pData / pMsg->pEpSet) are always released here.
//
// Change compared with the previous version: the decoder is now cleared on the decode-failure
// path as well (it used to be skipped by the early jump to the cleanup label).
static int32_t tmCommittedCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqCommittedParam* pParam = param;

  if (code == 0 && pMsg != NULL) {
    SDecoder decoder = {0};
    tDecoderInit(&decoder, (uint8_t*)pMsg->pData, pMsg->len);
    if (tDecodeMqVgOffset(&decoder, &pParam->vgOffset) < 0) {
      tOffsetDestroy(&pParam->vgOffset.offset);
      code = TSDB_CODE_OUT_OF_MEMORY;
    }
    tDecoderClear(&decoder);
  }

  if (pMsg) {
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
  }

  pParam->code = code;
  if (tsem2_post(&pParam->sem) != 0){
    tqErrorC("failed to post semaphore in tmCommittedCb");
  }
  return code;
}
2984

2985
// Ask the vnode given by (vgId, epSet) for the committed offset of this consumer group on
// topic `tname` and wait for the answer.
// Returns the committed WAL version when the server reports an offset of type TMQ_OFFSET__LOG,
// TSDB_CODE_TMQ_SNAPSHOT_ERROR for any other offset type, or the error code of the failing step.
//
// Changes compared with the previous version:
//  - the result is carried in an int64_t; it used to be stored in the int32_t `code`, which
//    truncated WAL versions that do not fit in 32 bits;
//  - the callback parameter is zero-initialised, so vgOffset is never read uninitialised when
//    the callback completes without decoding a payload.
int64_t getCommittedFromServer(tmq_t* tmq, char* tname, int32_t vgId, SEpSet* epSet) {
  int32_t     code = 0;
  SMqVgOffset pOffset = {0};

  pOffset.consumerId = tmq->consumerId;
  (void)snprintf(pOffset.offset.subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", tmq->groupId, TMQ_SEPARATOR, tname);

  // first pass: compute the encoded size
  int32_t len = 0;
  tEncodeSize(tEncodeMqVgOffset, &pOffset, len, code);
  if (code < 0) {
    return TSDB_CODE_INVALID_PARA;
  }

  void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
  if (buf == NULL) {
    return terrno;
  }

  ((SMsgHead*)buf)->vgId = htonl(vgId);

  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));

  // second pass: encode the request body right after the message head
  SEncoder encoder = {0};
  tEncoderInit(&encoder, abuf, len);
  code = tEncodeMqVgOffset(&encoder, &pOffset);
  if (code < 0) {
    taosMemoryFree(buf);
    tEncoderClear(&encoder);
    return code;
  }
  tEncoderClear(&encoder);

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    taosMemoryFree(buf);
    return terrno;
  }

  SMqCommittedParam* pParam = taosMemoryCalloc(1, sizeof(SMqCommittedParam));
  if (pParam == NULL) {
    taosMemoryFree(buf);
    taosMemoryFree(sendInfo);
    return terrno;
  }
  if (tsem2_init(&pParam->sem, 0, 0) != 0) {
    taosMemoryFree(buf);
    taosMemoryFree(sendInfo);
    taosMemoryFree(pParam);
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  sendInfo->msgInfo = (SDataBuf){.pData = buf, .len = sizeof(SMsgHead) + len, .handle = NULL};
  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = pParam;
  sendInfo->fp = tmCommittedCb;
  sendInfo->msgType = TDMT_VND_TMQ_VG_COMMITTEDINFO;

  code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, NULL, sendInfo);
  if (code != 0) {
    // NOTE(review): buf/sendInfo are not released here, as before — presumably the send path
    // owns them once called; confirm against asyncSendMsgToServer.
    if(tsem2_destroy(&pParam->sem) != 0) {
      tqErrorC("failed to destroy semaphore in get committed from server1");
    }
    taosMemoryFree(pParam);
    return code;
  }

  // tmCommittedCb fills pParam and posts the semaphore
  if (tsem2_wait(&pParam->sem) != 0){
    tqErrorC("failed to wait semaphore in get committed from server");
  }

  int64_t committed = pParam->code;
  if (pParam->code == TSDB_CODE_SUCCESS) {
    if (pParam->vgOffset.offset.val.type == TMQ_OFFSET__LOG) {
      committed = pParam->vgOffset.offset.val.version;  // full 64-bit version, no narrowing
    } else {
      tOffsetDestroy(&pParam->vgOffset.offset);
      committed = TSDB_CODE_TMQ_SNAPSHOT_ERROR;
    }
  }
  if(tsem2_destroy(&pParam->sem) != 0) {
    tqErrorC("failed to destroy semaphore in get committed from server2");
  }
  taosMemoryFree(pParam);

  return committed;
}
3071

3072
// Return the current consume position (WAL version) of one vgroup of a topic.
// - offset type TMQ_OFFSET__LOG: the locally recorded end offset version;
// - reset-earliest / reset-latest: the committed offset reported by the server, or the WAL
//   begin / end recorded locally when the server answers TSDB_CODE_TMQ_NO_COMMITTED.
// Error codes are returned for a NULL argument, an unknown vgroup, snapshot mode, or a WAL
// range that has not been initialised yet.
//
// Changes compared with the previous version:
//  - the server answer is kept in an int64_t; it used to go through the int32_t `code`, which
//    truncated positions that do not fit in 32 bits;
//  - the end offset version is copied while tmq->lock is held instead of being read through
//    the vgroup pointer after the lock has been released.
int64_t tmq_position(tmq_t* tmq, const char* pTopicName, int32_t vgId) {
  if (tmq == NULL || pTopicName == NULL) {
    tqErrorC("invalid tmq handle, null");
    return TSDB_CODE_INVALID_PARA;
  }

  // full topic name is "<acctId>.<topic>"
  int32_t accId = tmq->pTscObj->acctId;
  char    tname[TSDB_TOPIC_FNAME_LEN] = {0};
  (void)snprintf(tname, TSDB_TOPIC_FNAME_LEN, "%d.%s", accId, pTopicName);

  taosWLockLatch(&tmq->lock);

  SMqClientVg* pVg = NULL;
  int32_t      code = getClientVg(tmq, tname, vgId, &pVg);
  if (code != 0) {
    taosWUnLockLatch(&tmq->lock);
    return code;
  }

  SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo;
  int32_t        type = pOffsetInfo->endOffset.type;
  if (isInSnapshotMode(type, tmq->useSnapshot)) {
    tqErrorC("consumer:0x%" PRIx64 " offset type:%d not wal version, position error", tmq->consumerId, type);
    taosWUnLockLatch(&tmq->lock);
    return TSDB_CODE_TMQ_SNAPSHOT_ERROR;
  }

  code = checkWalRange(pOffsetInfo, -1);
  if (code != 0) {
    taosWUnLockLatch(&tmq->lock);
    return code;
  }

  // copy everything needed from the vgroup while the lock is still held
  SEpSet  epSet = pVg->epSet;
  int64_t begin = pVg->offsetInfo.walVerBegin;
  int64_t end = pVg->offsetInfo.walVerEnd;
  int64_t localVersion = pOffsetInfo->endOffset.version;
  taosWUnLockLatch(&tmq->lock);

  int64_t position = 0;
  if (type == TMQ_OFFSET__LOG) {
    position = localVersion;
  } else if (type == TMQ_OFFSET__RESET_EARLIEST || type == TMQ_OFFSET__RESET_LATEST) {
    int64_t committed = getCommittedFromServer(tmq, tname, vgId, &epSet);
    if (committed == TSDB_CODE_TMQ_NO_COMMITTED) {
      if (type == TMQ_OFFSET__RESET_EARLIEST) {
        position = begin;
      } else if (type == TMQ_OFFSET__RESET_LATEST) {
        position = end;
      }
    } else {
      position = committed;
    }
  } else {
    tqErrorC("consumer:0x%" PRIx64 " offset type:%d can not be reach here", tmq->consumerId, type);
  }

  tqInfoC("consumer:0x%" PRIx64 " tmq_position vgId:%d position:%" PRId64, tmq->consumerId, vgId, position);
  return position;
}
3130

3131
/**
 * Return the committed offset of one vgroup of a subscribed topic.
 *
 * The locally cached committed offset is returned when it is a WAL (log) version; otherwise the value is
 * requested from the server through getCommittedFromServer().
 *
 * @param tmq         consumer handle, must not be NULL
 * @param pTopicName  topic name without the account prefix, must not be NULL
 * @param vgId        id of the vgroup to query
 * @return the committed offset, or an error code: TSDB_CODE_INVALID_PARA for NULL arguments, the code
 *         returned by getClientVg(), TSDB_CODE_TMQ_SNAPSHOT_ERROR when either the end offset or the
 *         committed offset is in snapshot mode, or whatever getCommittedFromServer() returns.
 */
int64_t tmq_committed(tmq_t* tmq, const char* pTopicName, int32_t vgId) {
  if (NULL == tmq || NULL == pTopicName) {
    tqErrorC("invalid tmq handle, null");
    return TSDB_CODE_INVALID_PARA;
  }

  // The lookup key is "<acctId>.<topic>".
  char    fullName[TSDB_TOPIC_FNAME_LEN] = {0};
  int32_t acct = tmq->pTscObj->acctId;
  (void)snprintf(fullName, TSDB_TOPIC_FNAME_LEN, "%d.%s", acct, pTopicName);

  taosWLockLatch(&tmq->lock);

  SMqClientVg* pClientVg = NULL;
  int32_t      rc = getClientVg(tmq, fullName, vgId, &pClientVg);
  if (rc != 0) {
    taosWUnLockLatch(&tmq->lock);
    return rc;
  }

  SVgOffsetInfo* pInfo = &pClientVg->offsetInfo;

  // Check the end offset first, then the committed offset; the first one found in snapshot mode is reported.
  int32_t offsetType = pInfo->endOffset.type;
  bool    inSnapshot = isInSnapshotMode(offsetType, tmq->useSnapshot);
  if (!inSnapshot) {
    offsetType = pInfo->committedOffset.type;
    inSnapshot = isInSnapshotMode(offsetType, tmq->useSnapshot);
  }
  if (inSnapshot) {
    tqErrorC("consumer:0x%" PRIx64 " offset type:%d not wal version, committed error", tmq->consumerId,
             offsetType);
    taosWUnLockLatch(&tmq->lock);
    return TSDB_CODE_TMQ_SNAPSHOT_ERROR;
  }

  int64_t result = 0;
  if (pInfo->committedOffset.type == TMQ_OFFSET__LOG) {
    // The cached value is usable as is.
    result = pInfo->committedOffset.version;
    taosWUnLockLatch(&tmq->lock);
  } else {
    // Copy the endpoint set while the latch is still held, then query the server without the latch.
    SEpSet vgEpSet = pClientVg->epSet;
    taosWUnLockLatch(&tmq->lock);
    result = getCommittedFromServer(tmq, fullName, vgId, &vgEpSet);
  }

  tqInfoC("consumer:0x%" PRIx64 " tmq_committed vgId:%d committed:%" PRId64, tmq->consumerId, vgId, result);
  return result;
}
3180

3181
/**
 * Build the list of vgroup assignments (vgId, current offset, WAL begin/end) for a subscribed topic.
 *
 * The result array is allocated here with taosMemoryCalloc(); on success the caller owns it (this file
 * provides tmq_free_assignment(), which releases it with taosMemoryFree()). On any failure the array is
 * freed and the outputs are reset to NULL / 0.
 *
 * tmq->lock is taken as a write latch right after the argument check and is only released at the `end`
 * label, i.e. it is held across the blocking tsem2_wait() below.
 *
 * @param tmq              consumer handle, must not be NULL
 * @param pTopicName       topic name without the account prefix, must not be NULL
 * @param assignment       out: allocated array of assignments, must not be NULL
 * @param numOfAssignment  out: number of entries in *assignment, must not be NULL
 * @return 0 on success; TSDB_CODE_INVALID_PARA for NULL arguments; the code from getTopicByName();
 *         TSDB_CODE_TMQ_SNAPSHOT_ERROR when a vgroup's begin offset is in snapshot mode; terrno /
 *         TSDB_CODE_OUT_OF_MEMORY on allocation or serialization failure; the code from
 *         asyncSendMsgToServer(); or pCommon->code after the WAL info round trip.
 */
int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_assignment** assignment,
32✔
3182
                                 int32_t* numOfAssignment) {
3183
  // All four pointers are required; the same log message is used whichever one is NULL.
  if (tmq == NULL || pTopicName == NULL || assignment == NULL || numOfAssignment == NULL) {
32!
3184
    tqErrorC("invalid tmq handle, null");
25!
3185
    return TSDB_CODE_INVALID_PARA;
25✔
3186
  }
3187
  // Start from an empty result so that every `goto end` path leaves well-defined outputs.
  *numOfAssignment = 0;
7✔
3188
  *assignment = NULL;
7✔
3189
  SMqVgCommon* pCommon = NULL;
7✔
3190

3191
  // The topic is looked up under the name "<acctId>.<topic>".
  int32_t accId = tmq->pTscObj->acctId;
7✔
3192
  char    tname[TSDB_TOPIC_FNAME_LEN] = {0};
7✔
3193
  (void)snprintf(tname, TSDB_TOPIC_FNAME_LEN, "%d.%s", accId, pTopicName);
7✔
3194

3195
  // Held until the `end` label; every exit below this point must go through `goto end`.
  taosWLockLatch(&tmq->lock);
7✔
3196

3197
  SMqClientTopic* pTopic = NULL;
7✔
3198
  int32_t         code = getTopicByName(tmq, tname, &pTopic);
7✔
3199
  if (code != 0) {
7!
3200
    tqErrorC("consumer:0x%" PRIx64 " invalid topic name:%s", tmq->consumerId, pTopicName);
×
3201
    goto end;
×
3202
  }
3203

3204
  // Pass 1: if any vgroup's begin offset is in snapshot mode, no valid WAL offset can be returned, so
  // the whole call is rejected.
3205
  *numOfAssignment = taosArrayGetSize(pTopic->vgs);
7✔
3206
  for (int32_t j = 0; j < (*numOfAssignment); ++j) {
20✔
3207
    SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j);
13✔
3208
    if (pClientVg == NULL) {
13!
3209
      continue;
×
3210
    }
3211
    int32_t type = pClientVg->offsetInfo.beginOffset.type;
13✔
3212
    if (isInSnapshotMode(type, tmq->useSnapshot)) {
13!
3213
      tqErrorC("consumer:0x%" PRIx64 " offset type:%d not wal version, assignment not allowed", tmq->consumerId, type);
×
3214
      code = TSDB_CODE_TMQ_SNAPSHOT_ERROR;
×
3215
      goto end;
×
3216
    }
3217
  }
3218

3219
  // One zero-initialized slot per vgroup of the topic.
  *assignment = taosMemoryCalloc(*numOfAssignment, sizeof(tmq_topic_assignment));
7✔
3220
  if (*assignment == NULL) {
7!
3221
    tqErrorC("consumer:0x%" PRIx64 " failed to malloc buffer, size:%" PRIzu, tmq->consumerId,
×
3222
             (*numOfAssignment) * sizeof(tmq_topic_assignment));
3223
    code = terrno;
×
3224
    goto end;
×
3225
  }
3226

3227
  bool needFetch = false;
7✔
3228

3229
  // Pass 2: fill the result from the locally cached offsets. As soon as one vgroup's begin offset is not
  // a WAL (log) version, stop and fetch the WAL info of all vgroups from the server instead.
  for (int32_t j = 0; j < (*numOfAssignment); ++j) {
18✔
3230
    SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j);
13✔
3231
    if (pClientVg == NULL) {
13!
3232
      // The slot for this index stays zeroed.
      continue;
×
3233
    }
3234
    if (pClientVg->offsetInfo.beginOffset.type != TMQ_OFFSET__LOG) {
13✔
3235
      needFetch = true;
2✔
3236
      break;
2✔
3237
    }
3238

3239
    tmq_topic_assignment* pAssignment = &(*assignment)[j];
11✔
3240
    pAssignment->currentOffset = pClientVg->offsetInfo.beginOffset.version;
11✔
3241
    pAssignment->begin = pClientVg->offsetInfo.walVerBegin;
11✔
3242
    pAssignment->end = pClientVg->offsetInfo.walVerEnd;
11✔
3243
    pAssignment->vgId = pClientVg->vgId;
11✔
3244
    tqInfoC("consumer:0x%" PRIx64 " get assignment from local:%d->%" PRId64, tmq->consumerId, pAssignment->vgId,
11!
3245
            pAssignment->currentOffset);
3246
  }
3247

3248
  if (needFetch) {
7✔
3249
    // Shared state for all WAL info requests: result list, response semaphore, mutex and topic name.
    // It is released by destroyCommonInfo() at the `end` label.
    pCommon = taosMemoryCalloc(1, sizeof(SMqVgCommon));
2✔
3250
    if (pCommon == NULL) {
2!
3251
      code = terrno;
×
3252
      goto end;
×
3253
    }
3254

3255
    pCommon->pList = taosArrayInit(4, sizeof(tmq_topic_assignment));
2✔
3256
    if (pCommon->pList == NULL) {
2!
3257
      code = terrno;
×
3258
      goto end;
×
3259
    }
3260
    if (tsem2_init(&pCommon->rsp, 0, 0) != 0) {
2!
3261
      code = TSDB_CODE_OUT_OF_MEMORY;
×
3262
      goto end;
×
3263
    }
3264
    (void)taosThreadMutexInit(&pCommon->mutex, 0);
2✔
3265
    pCommon->pTopicName = taosStrdup(pTopic->topicName);
2✔
3266
    if (pCommon->pTopicName == NULL) {
2!
3267
      code = terrno;
×
3268
      goto end;
×
3269
    }
3270
    pCommon->consumerId = tmq->consumerId;
2✔
3271

3272
    // Send one TDMT_VND_TMQ_VG_WALINFO request per vgroup; responses are handled by tmqGetWalInfoCb.
    // NOTE(review): totalReq is set to *numOfAssignment, but NULL vgroup entries are skipped without
    // sending - confirm the callback's completion condition cannot leave tsem2_wait() blocked.
    // NOTE(review): a `goto end` after some requests were already sent destroys pCommon while those
    // requests may still be in flight - confirm tmqGetWalInfoCb/destroyCommonInfo tolerate that.
    for (int32_t i = 0; i < (*numOfAssignment); ++i) {
5✔
3273
      SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i);
3✔
3274
      if (pClientVg == NULL) {
3!
3275
        continue;
×
3276
      }
3277
      SMqVgWalInfoParam* pParam = taosMemoryMalloc(sizeof(SMqVgWalInfoParam));
3✔
3278
      if (pParam == NULL) {
3!
3279
        code = terrno;
×
3280
        goto end;
×
3281
      }
3282

3283
      pParam->epoch = tmq->epoch;
3✔
3284
      pParam->vgId = pClientVg->vgId;
3✔
3285
      pParam->totalReq = *numOfAssignment;
3✔
3286
      pParam->pCommon = pCommon;
3✔
3287

3288
      // The request is a poll request whose offset is the vgroup's cached begin offset.
      SMqPollReq req = {0};
3✔
3289
      tmqBuildConsumeReqImpl(&req, tmq, 10, pTopic, pClientVg);
3✔
3290
      req.reqOffset = pClientVg->offsetInfo.beginOffset;
3✔
3291

3292
      // First call with a NULL buffer only computes the serialized size.
      int32_t msgSize = tSerializeSMqPollReq(NULL, 0, &req);
3✔
3293
      if (msgSize < 0) {
3!
3294
        taosMemoryFree(pParam);
×
3295
        code = TSDB_CODE_OUT_OF_MEMORY;
×
3296
        goto end;
×
3297
      }
3298

3299
      char* msg = taosMemoryCalloc(1, msgSize);
3✔
3300
      if (NULL == msg) {
3!
3301
        taosMemoryFree(pParam);
×
3302
        code = terrno;
×
3303
        goto end;
×
3304
      }
3305

3306
      if (tSerializeSMqPollReq(msg, msgSize, &req) < 0) {
3!
3307
        taosMemoryFree(msg);
×
3308
        taosMemoryFree(pParam);
×
3309
        code = TSDB_CODE_OUT_OF_MEMORY;
×
3310
        goto end;
×
3311
      }
3312

3313
      SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
3✔
3314
      if (sendInfo == NULL) {
3!
3315
        taosMemoryFree(pParam);
×
3316
        taosMemoryFree(msg);
×
3317
        code = terrno;
×
3318
        goto end;
×
3319
      }
3320

3321
      // msg and pParam are attached to sendInfo here and are not freed by this function afterwards;
      // presumably the transport layer releases them (pParam via paramFreeFp) - TODO confirm.
      sendInfo->msgInfo = (SDataBuf){.pData = msg, .len = msgSize, .handle = NULL};
3✔
3322
      sendInfo->requestId = req.reqId;
3✔
3323
      sendInfo->requestObjRefId = 0;
3✔
3324
      sendInfo->param = pParam;
3✔
3325
      sendInfo->paramFreeFp = taosMemoryFree;
3✔
3326
      sendInfo->fp = tmqGetWalInfoCb;
3✔
3327
      sendInfo->msgType = TDMT_VND_TMQ_VG_WALINFO;
3✔
3328

3329
      // int64_t transporterId = 0;
3330
      // Human-readable form of the request offset, used only for the log line below.
      char offsetFormatBuf[TSDB_OFFSET_LEN] = {0};
3✔
3331
      tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pClientVg->offsetInfo.beginOffset);
3✔
3332

3333
      tqInfoC("consumer:0x%" PRIx64 " %s retrieve wal info vgId:%d, epoch %d, req:%s,QID:0x%" PRIx64, tmq->consumerId,
3!
3334
              pTopic->topicName, pClientVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId);
3335
      code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pClientVg->epSet, NULL, sendInfo);
3✔
3336
      if (code != 0) {
3!
3337
        goto end;
×
3338
      }
3339
    }
3340

3341
    // Block until the response semaphore is posted (presumably by tmqGetWalInfoCb once all responses
    // have arrived - TODO confirm). A failed wait is only logged; pCommon->code is still consulted.
    if (tsem2_wait(&pCommon->rsp) != 0){
2!
3342
      tqErrorC("consumer:0x%" PRIx64 " failed to wait sem in get assignment", tmq->consumerId);
×
3343
    }
3344
    code = pCommon->code;
2✔
3345

3346
    if (code != TSDB_CODE_SUCCESS) {
2!
3347
      goto end;
×
3348
    }
3349
    // Replace the locally built entries with the entries collected in pCommon->pList.
    // NOTE(review): assumes the list never holds more entries than were allocated above - confirm.
    int32_t num = taosArrayGetSize(pCommon->pList);
2✔
3350
    for (int32_t i = 0; i < num; ++i) {
5✔
3351
      (*assignment)[i] = *(tmq_topic_assignment*)taosArrayGet(pCommon->pList, i);
3✔
3352
    }
3353
    *numOfAssignment = num;
2✔
3354

3355
    // Write the fetched WAL begin/end versions back into the matching cached vgroup entries.
    for (int32_t j = 0; j < (*numOfAssignment); ++j) {
5✔
3356
      tmq_topic_assignment* p = &(*assignment)[j];
3✔
3357

3358
      for (int32_t i = 0; i < taosArrayGetSize(pTopic->vgs); ++i) {
8✔
3359
        SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i);
5✔
3360
        if (pClientVg == NULL) {
5!
3361
          continue;
×
3362
        }
3363
        if (pClientVg->vgId != p->vgId) {
5✔
3364
          continue;
2✔
3365
        }
3366

3367
        SVgOffsetInfo* pOffsetInfo = &pClientVg->offsetInfo;
3✔
3368
        tqInfoC("consumer:0x%" PRIx64 " %s vgId:%d offset is update to:%" PRId64, tmq->consumerId, pTopic->topicName,
3!
3369
                p->vgId, p->currentOffset);
3370

3371
        // Only the WAL range is updated here; the cached begin/end offsets are left untouched.
        pOffsetInfo->walVerBegin = p->begin;
3✔
3372
        pOffsetInfo->walVerEnd = p->end;
3✔
3373
      }
3374
    }
3375
  }
3376

3377
// Common exit: drop the result on failure, release the shared fetch state and the latch.
end:
7✔
3378
  if (code != TSDB_CODE_SUCCESS) {
7!
3379
    taosMemoryFree(*assignment);
×
3380
    *assignment = NULL;
×
3381
    *numOfAssignment = 0;
×
3382
  }
3383
  // Called with NULL when no server fetch was needed.
  destroyCommonInfo(pCommon);
7✔
3384
  taosWUnLockLatch(&tmq->lock);
7✔
3385
  return code;
7✔
3386
}
3387

3388
/**
 * Release an assignment array with taosMemoryFree().
 *
 * @param pAssignment  array to free; NULL is accepted and ignored
 */
void tmq_free_assignment(tmq_topic_assignment* pAssignment) {
  if (pAssignment != NULL) {
    taosMemoryFree(pAssignment);
  }
}
3395

3396
/**
 * Response callback for the seek request sent by tmq_offset_seek().
 *
 * Frees the response payload and endpoint set, stores the response code in the seek parameter and posts
 * its semaphore.
 *
 * @param param  SMqSeekParam of the pending seek, may be NULL
 * @param pMsg   response buffer, may be NULL
 * @param code   response code
 * @return `code` when `param` is NULL, otherwise 0 (a failed semaphore post is only logged)
 */
static int32_t tmqSeekCb(void* param, SDataBuf* pMsg, int32_t code) {
  if (pMsg != NULL) {
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
  }

  SMqSeekParam* pSeek = param;
  if (pSeek == NULL) {
    // No pending seek to notify: hand the code straight back.
    return code;
  }

  // Publish the result before posting the semaphore.
  pSeek->code = code;
  if (tsem2_post(&pSeek->sem) != 0) {
    tqErrorC("failed to post sem in tmqSeekCb");
  }
  return 0;
}
3411

3412
// seek interface have to send msg to server to cancel push handle if needed, because consumer may be in wait status if
3413
// there is no data to poll
3414
int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_t offset) {
29✔
3415
  if (tmq == NULL || pTopicName == NULL) {
29!
3416
    tqErrorC("invalid tmq handle, null");
×
3417
    return TSDB_CODE_INVALID_PARA;
×
3418
  }
3419

3420
  int32_t accId = tmq->pTscObj->acctId;
29✔
3421
  char    tname[TSDB_TOPIC_FNAME_LEN] = {0};
29✔
3422
  (void)snprintf(tname, TSDB_TOPIC_FNAME_LEN, "%d.%s", accId, pTopicName);
29✔
3423

3424
  taosWLockLatch(&tmq->lock);
29✔
3425

3426
  SMqClientVg* pVg = NULL;
29✔
3427
  int32_t      code = getClientVg(tmq, tname, vgId, &pVg);
29✔
3428
  if (code != 0) {
29✔
3429
    taosWUnLockLatch(&tmq->lock);
20✔
3430
    return code;
20✔
3431
  }
3432

3433
  SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo;
9✔
3434

3435
  int32_t type = pOffsetInfo->endOffset.type;
9✔
3436
  if (isInSnapshotMode(type, tmq->useSnapshot)) {
9!
3437
    tqErrorC("consumer:0x%" PRIx64 " offset type:%d not wal version, seek not allowed", tmq->consumerId, type);
×
3438
    taosWUnLockLatch(&tmq->lock);
×
3439
    return TSDB_CODE_TMQ_SNAPSHOT_ERROR;
×
3440
  }
3441

3442
  code = checkWalRange(pOffsetInfo, offset);
9✔
3443
  if (code != 0) {
9✔
3444
    taosWUnLockLatch(&tmq->lock);
3✔
3445
    return code;
3✔
3446
  }
3447

3448
  tqInfoC("consumer:0x%" PRIx64 " seek to %" PRId64 " on vgId:%d", tmq->consumerId, offset, vgId);
6!
3449
  // update the offset, and then commit to vnode
3450
  pOffsetInfo->endOffset.type = TMQ_OFFSET__LOG;
6✔
3451
  pOffsetInfo->endOffset.version = offset;
6✔
3452
  pOffsetInfo->beginOffset = pOffsetInfo->endOffset;
6✔
3453
  pVg->seekUpdated = true;
6✔
3454
  SEpSet epSet = pVg->epSet;
6✔
3455
  taosWUnLockLatch(&tmq->lock);
6✔
3456

3457
  SMqSeekReq req = {0};
6✔
3458
  (void)snprintf(req.subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s:%s", tmq->groupId, tname);
6✔
3459
  req.head.vgId = vgId;
6✔
3460
  req.consumerId = tmq->consumerId;
6✔
3461

3462
  int32_t msgSize = tSerializeSMqSeekReq(NULL, 0, &req);
6✔
3463
  if (msgSize < 0) {
6!
3464
    return TSDB_CODE_PAR_INTERNAL_ERROR;
×
3465
  }
3466

3467
  char* msg = taosMemoryCalloc(1, msgSize);
6✔
3468
  if (NULL == msg) {
6!
3469
    return terrno;
×
3470
  }
3471

3472
  if (tSerializeSMqSeekReq(msg, msgSize, &req) < 0) {
6!
3473
    taosMemoryFree(msg);
×
3474
    return TSDB_CODE_PAR_INTERNAL_ERROR;
×
3475
  }
3476

3477
  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
6✔
3478
  if (sendInfo == NULL) {
6!
3479
    taosMemoryFree(msg);
×
3480
    return terrno;
×
3481
  }
3482

3483
  SMqSeekParam* pParam = taosMemoryMalloc(sizeof(SMqSeekParam));
6✔
3484
  if (pParam == NULL) {
6!
3485
    taosMemoryFree(msg);
×
3486
    taosMemoryFree(sendInfo);
×
3487
    return terrno;
×
3488
  }
3489
  if (tsem2_init(&pParam->sem, 0, 0) != 0) {
6!
3490
    taosMemoryFree(msg);
×
3491
    taosMemoryFree(sendInfo);
×
3492
    taosMemoryFree(pParam);
×
3493
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
3494
  }
3495

3496
  sendInfo->msgInfo = (SDataBuf){.pData = msg, .len = msgSize, .handle = NULL};
6✔
3497
  sendInfo->requestId = generateRequestId();
6✔
3498
  sendInfo->requestObjRefId = 0;
6✔
3499
  sendInfo->param = pParam;
6✔
3500
  sendInfo->fp = tmqSeekCb;
6✔
3501
  sendInfo->msgType = TDMT_VND_TMQ_SEEK;
6✔
3502

3503
  code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo);
6✔
3504
  if (code != 0) {
6!
3505
    if(tsem2_destroy(&pParam->sem) != 0) {
×
3506
      tqErrorC("consumer:0x%" PRIx64 "destroy rsp sem failed in seek offset", tmq->consumerId);
×
3507
    }
3508
    taosMemoryFree(pParam);
×
3509
    return code;
×
3510
  }
3511

3512
  if (tsem2_wait(&pParam->sem) != 0){
6!
3513
    tqErrorC("consumer:0x%" PRIx64 "wait rsp sem failed in seek offset", tmq->consumerId);
×
3514
  }
3515
  code = pParam->code;
6✔
3516
  if(tsem2_destroy(&pParam->sem) != 0) {
6!
3517
    tqErrorC("consumer:0x%" PRIx64 "destroy rsp sem failed in seek offset", tmq->consumerId);
×
3518
  }
3519
  taosMemoryFree(pParam);
6✔
3520

3521
  tqInfoC("consumer:0x%" PRIx64 "send seek to vgId:%d, return code:%s", tmq->consumerId, vgId, tstrerror(code));
6!
3522

3523
  return code;
6✔
3524
}
3525

3526
/**
 * Return the connection handle behind a consumer.
 *
 * @param tmq  consumer handle, may be NULL
 * @return the address of tmq->pTscObj->id cast to TAOS*, or NULL when `tmq` or its pTscObj is NULL
 */
TAOS* tmq_get_connect(tmq_t* tmq) {
  if (tmq == NULL || tmq->pTscObj == NULL) {
    return NULL;
  }
  return (TAOS*)(&(tmq->pTscObj->id));
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc