• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3768

28 Mar 2025 10:15AM UTC coverage: 33.726% (-0.3%) from 33.993%
#3768

push

travis-ci

happyguoxy
test:alter lcov result

144891 of 592084 branches covered (24.47%)

Branch coverage included in aggregate %.

218795 of 486283 relevant lines covered (44.99%)

765715.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

52.05
/source/client/src/clientTmq.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "cJSON.h"
17
#include "clientInt.h"
18
#include "clientLog.h"
19
#include "parser.h"
20
#include "tdatablock.h"
21
#include "tdef.h"
22
#include "tglobal.h"
23
#include "tqueue.h"
24
#include "tref.h"
25
#include "ttimer.h"
26

27
#define tqErrorC(...) do { if (cDebugFlag & DEBUG_ERROR || tqClientDebugFlag & DEBUG_ERROR) { taosPrintLog("TQ  ERROR ", DEBUG_ERROR, tqClientDebugFlag|cDebugFlag, __VA_ARGS__); }} while(0)
28
#define tqInfoC(...)  do { if (cDebugFlag & DEBUG_INFO  || tqClientDebugFlag & DEBUG_INFO)  { taosPrintLog("TQ  INFO  ", DEBUG_INFO,  tqClientDebugFlag|cDebugFlag, __VA_ARGS__); }} while(0)
29
#define tqDebugC(...) do { if (cDebugFlag & DEBUG_DEBUG || tqClientDebugFlag & DEBUG_DEBUG) { taosPrintLog("TQ  DEBUG ", DEBUG_DEBUG, tqClientDebugFlag|cDebugFlag, __VA_ARGS__); }} while(0)
30

31
#define DEFAULT_AUTO_COMMIT_INTERVAL   5000
32
#define DEFAULT_HEARTBEAT_INTERVAL     3000
33
#define DEFAULT_ASKEP_INTERVAL         1000
34
#define DEFAULT_COMMIT_CNT             1
35
#define SUBSCRIBE_RETRY_MAX_COUNT      240
36
#define SUBSCRIBE_RETRY_INTERVAL       500
37

38

39
#define SET_ERROR_MSG_TMQ(MSG) \
40
  if (errstr != NULL && errstrLen > 0) (void)snprintf(errstr, errstrLen, MSG);
41

42
#define PROCESS_POLL_RSP(FUNC,DATA) \
43
  SDecoder decoder = {0}; \
44
  tDecoderInit(&decoder, POINTER_SHIFT(pRspWrapper->pollRsp.data, sizeof(SMqRspHead)), pRspWrapper->pollRsp.len - sizeof(SMqRspHead)); \
45
  if (FUNC(&decoder, DATA) < 0) { \
46
    tDecoderClear(&decoder); \
47
    code = terrno; \
48
    goto END;\
49
  }\
50
  tDecoderClear(&decoder);\
51
  (void)memcpy(DATA, pRspWrapper->pollRsp.data, sizeof(SMqRspHead));
52

53
#define DELETE_POLL_RSP(FUNC,DATA) \
54
  SMqPollRspWrapper* pRsp = &rspWrapper->pollRsp;\
55
  taosMemoryFreeClear(pRsp->pEpset);             \
56
  taosMemoryFreeClear(pRsp->data);               \
57
  FUNC(DATA);
58

59
enum {
60
  TMQ_VG_STATUS__IDLE = 0,
61
  TMQ_VG_STATUS__WAIT,
62
};
63

64
enum {
65
  TMQ_CONSUMER_STATUS__INIT = 0,
66
  TMQ_CONSUMER_STATUS__READY,
67
  TMQ_CONSUMER_STATUS__LOST,
68
  TMQ_CONSUMER_STATUS__CLOSED,
69
};
70

71
enum {
72
  TMQ_DELAYED_TASK__ASK_EP = 1,
73
  TMQ_DELAYED_TASK__COMMIT,
74
};
75

76
typedef struct {
77
  tmr_h               timer;
78
  int32_t             rsetId;
79
  TdThreadMutex       lock;
80
} SMqMgmt;
81

82
struct tmq_list_t {
83
  SArray container;
84
};
85

86
struct tmq_conf_t {
87
  char           clientId[TSDB_CLIENT_ID_LEN];
88
  char           groupId[TSDB_CGROUP_LEN];
89
  int8_t         autoCommit;
90
  int8_t         resetOffset;
91
  int8_t         withTbName;
92
  int8_t         snapEnable;
93
  int8_t         replayEnable;
94
  int8_t         sourceExcluded;  // do not consume, bit
95
  int8_t         rawData;         // fetch raw data
96
  int32_t        maxPollWaitTime;
97
  int32_t        minPollRows;
98
  uint16_t       port;
99
  int32_t        autoCommitInterval;
100
  int32_t        sessionTimeoutMs;
101
  int32_t        heartBeatIntervalMs;
102
  int32_t        maxPollIntervalMs;
103
  char*          ip;
104
  char*          user;
105
  char*          pass;
106
  tmq_commit_cb* commitCb;
107
  void*          commitCbUserParam;
108
  int8_t         enableBatchMeta;
109
};
110

111
struct tmq_t {
112
  int64_t        refId;
113
  char           groupId[TSDB_CGROUP_LEN];
114
  char           clientId[TSDB_CLIENT_ID_LEN];
115
  char           user[TSDB_USER_LEN];
116
  char           fqdn[TSDB_FQDN_LEN];
117
  int8_t         withTbName;
118
  int8_t         useSnapshot;
119
  int8_t         autoCommit;
120
  int32_t        autoCommitInterval;
121
  int32_t        sessionTimeoutMs;
122
  int32_t        heartBeatIntervalMs;
123
  int32_t        maxPollIntervalMs;
124
  int8_t         resetOffsetCfg;
125
  int8_t         replayEnable;
126
  int8_t         sourceExcluded;  // do not consume, bit
127
  int8_t         rawData;         // fetch raw data
128
  int32_t        maxPollWaitTime;
129
  int32_t        minPollRows;
130
  int64_t        consumerId;
131
  tmq_commit_cb* commitCb;
132
  void*          commitCbUserParam;
133
  int8_t         enableBatchMeta;
134

135
  // status
136
  SRWLatch lock;
137
  int8_t   status;
138
  int32_t  epoch;
139
  // poll info
140
  int64_t pollCnt;
141
  int64_t totalRows;
142
  int8_t  pollFlag;
143

144
  // timer
145
  tmr_h       hbLiveTimer;
146
  tmr_h       epTimer;
147
  tmr_h       commitTimer;
148
  STscObj*    pTscObj;       // connection
149
  SArray*     clientTopics;  // SArray<SMqClientTopic>
150
  STaosQueue* mqueue;        // queue of rsp
151
  STaosQueue* delayedTask;  // delayed task queue for heartbeat and auto commit
152
  tsem2_t     rspSem;
153
};
154

155
typedef struct {
156
  int32_t code;
157
  tsem2_t sem;
158
} SAskEpInfo;
159

160
typedef struct {
161
  STqOffsetVal committedOffset;
162
  STqOffsetVal endOffset;    // the last version in TAOS_RES + 1
163
  STqOffsetVal beginOffset;  // the first version in TAOS_RES
164
  int64_t      walVerBegin;
165
  int64_t      walVerEnd;
166
} SVgOffsetInfo;
167

168
typedef struct {
169
  int64_t       pollCnt;
170
  int64_t       numOfRows;
171
  SVgOffsetInfo offsetInfo;
172
  int32_t       vgId;
173
  int32_t       vgStatus;
174
  int32_t       vgSkipCnt;            // here used to mark the slow vgroups
175
  int64_t       blockReceiveTs;       // once empty block is received, idle for ignoreCnt then start to poll data
176
  int64_t       blockSleepForReplay;  // once empty block is received, idle for ignoreCnt then start to poll data
177
  bool          seekUpdated;          // offset is updated by seek operator, therefore, not update by vnode rsp.
178
  SEpSet        epSet;
179
} SMqClientVg;
180

181
typedef struct {
182
  char           topicName[TSDB_TOPIC_FNAME_LEN];
183
  char           db[TSDB_DB_FNAME_LEN];
184
  SArray*        vgs;  // SArray<SMqClientVg>
185
  SSchemaWrapper schema;
186
  int8_t         noPrivilege;
187
} SMqClientTopic;
188

189
typedef struct {
190
  int32_t         vgId;
191
  char            topicName[TSDB_TOPIC_FNAME_LEN];
192
  SMqClientTopic* topicHandle;
193
  uint64_t        reqId;
194
  SEpSet*         pEpset;
195
  void*           data;
196
  uint32_t        len;
197
  union {
198
    struct{
199
      SMqRspHead   head;
200
      STqOffsetVal rspOffset;
201
    };
202
    SMqDataRsp      dataRsp;
203
    SMqMetaRsp      metaRsp;
204
    SMqBatchMetaRsp batchMetaRsp;
205
  };
206
} SMqPollRspWrapper;
207

208
typedef struct {
209
  int32_t code;
210
  int8_t  tmqRspType;
211
  int32_t epoch;
212
  union{
213
    SMqPollRspWrapper pollRsp;
214
    SMqAskEpRsp       epRsp;
215
  };
216
} SMqRspWrapper;
217

218
typedef struct {
219
  tsem2_t rspSem;
220
  int32_t rspErr;
221
} SMqSubscribeCbParam;
222

223
typedef struct {
224
  int64_t refId;
225
  bool    sync;
226
  void*   pParam;
227
} SMqAskEpCbParam;
228

229
typedef struct {
230
  int64_t  refId;
231
  char     topicName[TSDB_TOPIC_FNAME_LEN];
232
  int32_t  vgId;
233
  uint64_t requestId;  // request id for debug purpose
234
} SMqPollCbParam;
235

236
typedef struct {
237
  tsem2_t       rsp;
238
  int32_t       numOfRsp;
239
  SArray*       pList;
240
  TdThreadMutex mutex;
241
  int64_t       consumerId;
242
  char*         pTopicName;
243
  int32_t       code;
244
} SMqVgCommon;
245

246
typedef struct {
247
  tsem2_t sem;
248
  int32_t code;
249
} SMqSeekParam;
250

251
typedef struct {
252
  tsem2_t     sem;
253
  int32_t     code;
254
  SMqVgOffset vgOffset;
255
} SMqCommittedParam;
256

257
typedef struct {
258
  int32_t      vgId;
259
  int32_t      epoch;
260
  int32_t      totalReq;
261
  SMqVgCommon* pCommon;
262
} SMqVgWalInfoParam;
263

264
typedef struct {
265
  int64_t        refId;
266
  int32_t        epoch;
267
  int32_t        waitingRspNum;
268
  int32_t        code;
269
  tmq_commit_cb* callbackFn;
270
  void*          userParam;
271
} SMqCommitCbParamSet;
272

273
typedef struct {
274
  SMqCommitCbParamSet* params;
275
  char                 topicName[TSDB_TOPIC_FNAME_LEN];
276
  int32_t              vgId;
277
  int64_t              consumerId;
278
} SMqCommitCbParam;
279

280
typedef struct {
281
  tsem2_t sem;
282
  int32_t code;
283
} SSyncCommitInfo;
284

285
typedef struct {
286
  STqOffsetVal currentOffset;
287
  STqOffsetVal commitOffset;
288
  STqOffsetVal seekOffset;
289
  int64_t      numOfRows;
290
  int32_t      vgStatus;
291
} SVgroupSaveInfo;
292

293
static   TdThreadOnce   tmqInit = PTHREAD_ONCE_INIT;  // initialize only once
294
volatile int32_t        tmqInitRes = 0;               // initialize rsp code
295
static   SMqMgmt        tmqMgmt = {0};
296

297
tmq_conf_t* tmq_conf_new() {
51✔
298
  tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
51!
299
  if (conf == NULL) {
51!
300
    return conf;
×
301
  }
302

303
  conf->withTbName = false;
51✔
304
  conf->autoCommit = true;
51✔
305
  conf->autoCommitInterval = DEFAULT_AUTO_COMMIT_INTERVAL;
51✔
306
  conf->resetOffset = TMQ_OFFSET__RESET_LATEST;
51✔
307
  conf->enableBatchMeta = false;
51✔
308
  conf->heartBeatIntervalMs = DEFAULT_HEARTBEAT_INTERVAL;
51✔
309
  conf->maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL;
51✔
310
  conf->sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT;
51✔
311
  conf->maxPollWaitTime = DEFAULT_MAX_POLL_WAIT_TIME;
51✔
312
  conf->minPollRows = DEFAULT_MIN_POLL_ROWS;
51✔
313

314
  return conf;
51✔
315
}
316

317
void tmq_conf_destroy(tmq_conf_t* conf) {
51✔
318
  if (conf) {
51!
319
    if (conf->ip) {
51✔
320
      taosMemoryFree(conf->ip);
8!
321
    }
322
    if (conf->user) {
51✔
323
      taosMemoryFree(conf->user);
48!
324
    }
325
    if (conf->pass) {
51✔
326
      taosMemoryFree(conf->pass);
48!
327
    }
328
    taosMemoryFree(conf);
51!
329
  }
330
}
51✔
331

332
tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
295✔
333
  int32_t code = 0;
295✔
334
  if (conf == NULL || key == NULL || value == NULL) {
295!
335
    tqErrorC("tmq_conf_set null, conf:%p key:%p value:%p", conf, key, value);
×
336
    return TMQ_CONF_INVALID;
×
337
  }
338
  if (strcasecmp(key, "group.id") == 0) {
295✔
339
    tstrncpy(conf->groupId, value, TSDB_CGROUP_LEN);
51✔
340
    return TMQ_CONF_OK;
51✔
341
  }
342

343
  if (strcasecmp(key, "client.id") == 0) {
244✔
344
    tstrncpy(conf->clientId, value, TSDB_CLIENT_ID_LEN);
8✔
345
    return TMQ_CONF_OK;
8✔
346
  }
347

348
  if (strcasecmp(key, "enable.auto.commit") == 0) {
236✔
349
    if (strcasecmp(value, "true") == 0) {
52✔
350
      conf->autoCommit = true;
43✔
351
      return TMQ_CONF_OK;
43✔
352
    } else if (strcasecmp(value, "false") == 0) {
9!
353
      conf->autoCommit = false;
9✔
354
      return TMQ_CONF_OK;
9✔
355
    } else {
356
      tqErrorC("invalid value for enable.auto.commit:%s", value);
×
357
      return TMQ_CONF_INVALID;
×
358
    }
359
  }
360

361
  if (strcasecmp(key, "auto.commit.interval.ms") == 0) {
184✔
362
    int64_t tmp;
363
    code = taosStr2int64(value, &tmp);
21✔
364
    if (tmp < 0 || code != 0) {
21!
365
      tqErrorC("invalid value for auto.commit.interval.ms:%s", value);
×
366
      return TMQ_CONF_INVALID;
×
367
    }
368
    conf->autoCommitInterval = (tmp > INT32_MAX ? INT32_MAX : tmp);
21✔
369
    return TMQ_CONF_OK;
21✔
370
  }
371

372
  if (strcasecmp(key, "session.timeout.ms") == 0) {
163✔
373
    int64_t tmp;
374
    code = taosStr2int64(value, &tmp);
2✔
375
    if (tmp < 6000 || tmp > 1800000 || code != 0) {
2!
376
      tqErrorC("invalid value for session.timeout.ms:%s", value);
×
377
      return TMQ_CONF_INVALID;
×
378
    }
379
    conf->sessionTimeoutMs = tmp;
2✔
380
    return TMQ_CONF_OK;
2✔
381
  }
382

383
  if (strcasecmp(key, "heartbeat.interval.ms") == 0) {
161!
384
    int64_t tmp;
385
    code = taosStr2int64(value, &tmp);
×
386
    if (tmp < 1000 || tmp >= conf->sessionTimeoutMs || code != 0) {
×
387
      tqErrorC("invalid value for heartbeat.interval.ms:%s", value);
×
388
      return TMQ_CONF_INVALID;
×
389
    }
390
    conf->heartBeatIntervalMs = tmp;
×
391
    return TMQ_CONF_OK;
×
392
  }
393

394
  if (strcasecmp(key, "max.poll.interval.ms") == 0) {
161✔
395
    int32_t tmp;
396
    code = taosStr2int32(value, &tmp);
2✔
397
    if (tmp < 1000 || code != 0) {
2!
398
      tqErrorC("invalid value for max.poll.interval.ms:%s", value);
×
399
      return TMQ_CONF_INVALID;
×
400
    }
401
    conf->maxPollIntervalMs = tmp;
2✔
402
    return TMQ_CONF_OK;
2✔
403
  }
404

405
  if (strcasecmp(key, "auto.offset.reset") == 0) {
159✔
406
    if (strcasecmp(value, "none") == 0) {
21!
407
      conf->resetOffset = TMQ_OFFSET__RESET_NONE;
×
408
      return TMQ_CONF_OK;
×
409
    } else if (strcasecmp(value, "earliest") == 0) {
21✔
410
      conf->resetOffset = TMQ_OFFSET__RESET_EARLIEST;
16✔
411
      return TMQ_CONF_OK;
16✔
412
    } else if (strcasecmp(value, "latest") == 0) {
5!
413
      conf->resetOffset = TMQ_OFFSET__RESET_LATEST;
5✔
414
      return TMQ_CONF_OK;
5✔
415
    } else {
416
      tqErrorC("invalid value for auto.offset.reset:%s", value);
×
417
      return TMQ_CONF_INVALID;
×
418
    }
419
  }
420

421
  if (strcasecmp(key, "msg.with.table.name") == 0) {
138✔
422
    if (strcasecmp(value, "true") == 0) {
20✔
423
      conf->withTbName = true;
19✔
424
      return TMQ_CONF_OK;
19✔
425
    } else if (strcasecmp(value, "false") == 0) {
1!
426
      conf->withTbName = false;
1✔
427
      return TMQ_CONF_OK;
1✔
428
    } else {
429
      tqErrorC("invalid value for msg.with.table.name:%s", value);
×
430
      return TMQ_CONF_INVALID;
×
431
    }
432
  }
433

434
  if (strcasecmp(key, "experimental.snapshot.enable") == 0) {
118✔
435
    if (strcasecmp(value, "true") == 0) {
7✔
436
      conf->snapEnable = true;
2✔
437
      return TMQ_CONF_OK;
2✔
438
    } else if (strcasecmp(value, "false") == 0) {
5!
439
      conf->snapEnable = false;
5✔
440
      return TMQ_CONF_OK;
5✔
441
    } else {
442
      tqErrorC("invalid value for experimental.snapshot.enable:%s", value);
×
443
      return TMQ_CONF_INVALID;
×
444
    }
445
  }
446

447
  if (strcasecmp(key, "td.connect.ip") == 0) {
111✔
448
    void *tmp = taosStrdup(value);
8!
449
    if (tmp == NULL) {
8!
450
      tqErrorC("tmq_conf_set out of memory:%d", terrno);
×
451
      return TMQ_CONF_INVALID;
×
452
    }
453
    conf->ip = tmp;
8✔
454
    return TMQ_CONF_OK;
8✔
455
  }
456

457
  if (strcasecmp(key, "td.connect.user") == 0) {
103✔
458
    void *tmp = taosStrdup(value);
48!
459
    if (tmp == NULL) {
48!
460
      tqErrorC("tmq_conf_set out of memory:%d", terrno);
×
461
      return TMQ_CONF_INVALID;
×
462
    }
463
    conf->user = tmp;
48✔
464
    return TMQ_CONF_OK;
48✔
465
  }
466

467
  if (strcasecmp(key, "td.connect.pass") == 0) {
55✔
468
    void *tmp = taosStrdup(value);
48!
469
    if (tmp == NULL) {
48!
470
      tqErrorC("tmq_conf_set out of memory:%d", terrno);
×
471
      return TMQ_CONF_INVALID;
×
472
    }
473
    conf->pass = tmp;
48✔
474
    return TMQ_CONF_OK;
48✔
475
  }
476

477
  if (strcasecmp(key, "td.connect.port") == 0) {
7✔
478
    int64_t tmp;
479
    code = taosStr2int64(value, &tmp);
6✔
480
    if (tmp <= 0 || tmp > 65535 || code != 0) {
6!
481
      tqErrorC("invalid value for td.connect.port:%s", value);
×
482
      return TMQ_CONF_INVALID;
×
483
    }
484

485
    conf->port = tmp;
6✔
486
    return TMQ_CONF_OK;
6✔
487
  }
488

489
  if (strcasecmp(key, "enable.replay") == 0) {
1!
490
    if (strcasecmp(value, "true") == 0) {
×
491
      conf->replayEnable = true;
×
492
      return TMQ_CONF_OK;
×
493
    } else if (strcasecmp(value, "false") == 0) {
×
494
      conf->replayEnable = false;
×
495
      return TMQ_CONF_OK;
×
496
    } else {
497
      tqErrorC("invalid value for enable.replay:%s", value);
×
498
      return TMQ_CONF_INVALID;
×
499
    }
500
  }
501
  if (strcasecmp(key, "msg.consume.excluded") == 0) {
1!
502
    int64_t tmp = 0;
×
503
    code = taosStr2int64(value, &tmp);
×
504
    conf->sourceExcluded = (0 == code && tmp != 0) ? TD_REQ_FROM_TAOX : 0;
×
505
    return TMQ_CONF_OK;
×
506
  }
507
  if (strcasecmp(key, "msg.consume.rawdata") == 0) {
1!
508
    int64_t tmp = 0;
×
509
    code = taosStr2int64(value, &tmp);
×
510
    conf->rawData = (0 == code && tmp != 0) ? 1 : 0;
×
511
    return TMQ_CONF_OK;
×
512
  }
513

514
  if (strcasecmp(key, "fetch.max.wait.ms") == 0) {
1!
515
    int64_t tmp = 0;
×
516
    code = taosStr2int64(value, &tmp);
×
517
    if (tmp <= 0 || tmp > INT32_MAX || code != 0) {
×
518
      tqErrorC("invalid value for fetch.max.wait.ms:%s", value);
×
519
      return TMQ_CONF_INVALID;
×
520
    }
521
    conf->maxPollWaitTime = tmp;
×
522
    return TMQ_CONF_OK;
×
523
  }
524

525
  if (strcasecmp(key, "min.poll.rows") == 0) {
1!
526
    int64_t tmp = 0;
×
527
    code = taosStr2int64(value, &tmp);
×
528
    if (tmp <= 0 || tmp > INT32_MAX || code != 0) {
×
529
      tqErrorC("invalid value for min.poll.rows:%s", value);
×
530
      return TMQ_CONF_INVALID;
×
531
    }
532
    conf->minPollRows = tmp;
×
533
    return TMQ_CONF_OK;
×
534
  }
535

536
  if (strcasecmp(key, "td.connect.db") == 0) {
1!
537
    return TMQ_CONF_OK;
×
538
  }
539

540
  if (strcasecmp(key, "msg.enable.batchmeta") == 0) {
1!
541
    int64_t tmp;
542
    code = taosStr2int64(value, &tmp);
×
543
    conf->enableBatchMeta = (0 == code && tmp != 0) ? true : false;
×
544
    return TMQ_CONF_OK;
×
545
  }
546

547
  tqErrorC("unknown key:%s", key);
1!
548
  return TMQ_CONF_UNKNOWN;
1✔
549
}
550

551
tmq_list_t* tmq_list_new() {
149✔
552
  return (tmq_list_t*)taosArrayInit(0, sizeof(void*));
149✔
553
}
554

555
int32_t tmq_list_append(tmq_list_t* list, const char* src) {
63✔
556
  if (list == NULL) {
63!
557
    return TSDB_CODE_INVALID_PARA;
×
558
  }
559
  SArray* container = &list->container;
63✔
560
  if (src == NULL || src[0] == 0) {
63!
561
    return TSDB_CODE_INVALID_PARA;
×
562
  }
563
  char* topic = taosStrdup(src);
63!
564
  if (topic == NULL) return terrno;
63!
565
  if (taosArrayPush(container, &topic) == NULL) {
63!
566
    taosMemoryFree(topic);
×
567
    return terrno;
×
568
  }
569
  return 0;
63✔
570
}
571

572
void tmq_list_destroy(tmq_list_t* list) {
149✔
573
  if (list == NULL) return;
149!
574
  SArray* container = &list->container;
149✔
575
  taosArrayDestroyP(container, NULL);
149✔
576
}
577

578
int32_t tmq_list_get_size(const tmq_list_t* list) {
5✔
579
  if (list == NULL) {
5!
580
    return TSDB_CODE_INVALID_PARA;
×
581
  }
582
  const SArray* container = &list->container;
5✔
583
  return taosArrayGetSize(container);
5✔
584
}
585

586
char** tmq_list_to_c_array(const tmq_list_t* list) {
5✔
587
  if (list == NULL) {
5!
588
    return NULL;
×
589
  }
590
  const SArray* container = &list->container;
5✔
591
  return container->pData;
5✔
592
}
593

594
static int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) {
495✔
595
  if (pParamSet == NULL) {
495!
596
    return TSDB_CODE_INVALID_PARA;
×
597
  }
598
  int64_t refId = pParamSet->refId;
495✔
599
  int32_t code = 0;
495✔
600
  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
495✔
601
  if (tmq == NULL) {
495!
602
    code = TSDB_CODE_TMQ_CONSUMER_CLOSED;
×
603
  }
604

605
  // if no more waiting rsp
606
  if (pParamSet->callbackFn != NULL) {
495!
607
    pParamSet->callbackFn(tmq, pParamSet->code, pParamSet->userParam);
495✔
608
  }
609

610
  taosMemoryFree(pParamSet);
495!
611
  if (tmq != NULL) {
495!
612
    code = taosReleaseRef(tmqMgmt.rsetId, refId);
495✔
613
  }
614

615
  return code;
495✔
616
}
617

618
static int32_t commitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId) {
917✔
619
  if (pParamSet == NULL) {
917!
620
    return TSDB_CODE_INVALID_PARA;
×
621
  }
622
  int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
917✔
623
  if (waitingRspNum == 0) {
917✔
624
    tqDebugC("consumer:0x%" PRIx64 " topic:%s vgId:%d all commit-rsp received, commit completed", consumerId, pTopic,
495!
625
             vgId);
626
    return tmqCommitDone(pParamSet);
495✔
627
  } else {
628
    tqDebugC("consumer:0x%" PRIx64 " topic:%s vgId:%d commit-rsp received, remain:%d", consumerId, pTopic, vgId,
422!
629
             waitingRspNum);
630
  }
631
  return 0;
422✔
632
}
633

634
static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
477✔
635
  if (pBuf){
477!
636
    taosMemoryFreeClear(pBuf->pData);
477!
637
    taosMemoryFreeClear(pBuf->pEpSet);
477!
638
  }
639
  if(param == NULL){
477!
640
    return TSDB_CODE_INVALID_PARA;
×
641
  }
642
  SMqCommitCbParam*    pParam = (SMqCommitCbParam*)param;
477✔
643
  SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params;
477✔
644

645
  return commitRspCountDown(pParamSet, pParam->consumerId, pParam->topicName, pParam->vgId);
477✔
646
}
647

648
static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffsetVal* offset, const char* pTopicName,
477✔
649
                               SMqCommitCbParamSet* pParamSet) {
650
  if (tmq == NULL || epSet == NULL || offset == NULL || pTopicName == NULL || pParamSet == NULL) {
477!
651
    return TSDB_CODE_INVALID_PARA;
×
652
  }
653
  SMqVgOffset pOffset = {0};
477✔
654

655
  pOffset.consumerId = tmq->consumerId;
477✔
656
  pOffset.offset.val = *offset;
477✔
657
  (void)snprintf(pOffset.offset.subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", tmq->groupId, TMQ_SEPARATOR, pTopicName);
477✔
658
  int32_t len = 0;
477✔
659
  int32_t code = 0;
477✔
660
  tEncodeSize(tEncodeMqVgOffset, &pOffset, len, code);
477!
661
  if (code < 0) {
477!
662
    return TSDB_CODE_INVALID_PARA;
×
663
  }
664

665
  void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
477!
666
  if (buf == NULL) {
477!
667
    return terrno;
×
668
  }
669

670
  ((SMsgHead*)buf)->vgId = htonl(vgId);
477✔
671

672
  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
477✔
673

674
  SEncoder encoder = {0};
477✔
675
  tEncoderInit(&encoder, abuf, len);
477✔
676
  if (tEncodeMqVgOffset(&encoder, &pOffset) < 0) {
477!
677
    tEncoderClear(&encoder);
×
678
    taosMemoryFree(buf);
×
679
    return TSDB_CODE_INVALID_PARA;
×
680
  }
681
  tEncoderClear(&encoder);
477✔
682

683
  // build param
684
  SMqCommitCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam));
477!
685
  if (pParam == NULL) {
477!
686
    taosMemoryFree(buf);
×
687
    return terrno;
×
688
  }
689

690
  pParam->params = pParamSet;
477✔
691
  pParam->vgId = vgId;
477✔
692
  pParam->consumerId = tmq->consumerId;
477✔
693

694
  tstrncpy(pParam->topicName, pTopicName, tListLen(pParam->topicName));
477✔
695

696
  // build send info
697
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
477!
698
  if (pMsgSendInfo == NULL) {
477!
699
    taosMemoryFree(buf);
×
700
    taosMemoryFree(pParam);
×
701
    return terrno;
×
702
  }
703

704
  pMsgSendInfo->msgInfo = (SDataBuf){.pData = buf, .len = sizeof(SMsgHead) + len, .handle = NULL};
477✔
705

706
  pMsgSendInfo->requestId = generateRequestId();
477✔
707
  pMsgSendInfo->requestObjRefId = 0;
477✔
708
  pMsgSendInfo->param = pParam;
477✔
709
  pMsgSendInfo->paramFreeFp = taosAutoMemoryFree;
477✔
710
  pMsgSendInfo->fp = tmqCommitCb;
477✔
711
  pMsgSendInfo->msgType = TDMT_VND_TMQ_COMMIT_OFFSET;
477✔
712

713
  // int64_t transporterId = 0;
714
  (void)atomic_add_fetch_32(&pParamSet->waitingRspNum, 1);
477✔
715
  code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, NULL, pMsgSendInfo);
477✔
716
  if (code != 0) {
477!
717
    (void)atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
×
718
  }
719
  return code;
477✔
720
}
721

722
static int32_t getTopicByName(tmq_t* tmq, const char* pTopicName, SMqClientTopic** topic) {
80✔
723
  if (tmq == NULL || pTopicName == NULL || topic == NULL) {
80!
724
    return TSDB_CODE_INVALID_PARA;
×
725
  }
726
  int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
80✔
727
  for (int32_t i = 0; i < numOfTopics; ++i) {
80✔
728
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
77✔
729
    if (pTopic == NULL || strcmp(pTopic->topicName, pTopicName) != 0) {
77!
730
      continue;
×
731
    }
732
    *topic = pTopic;
77✔
733
    return 0;
77✔
734
  }
735

736
  tqErrorC("consumer:0x%" PRIx64 ", total:%d, failed to find topic:%s", tmq->consumerId, numOfTopics, pTopicName);
3!
737
  return TSDB_CODE_TMQ_INVALID_TOPIC;
3✔
738
}
739

740
static int32_t prepareCommitCbParamSet(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam, int32_t rspNum,
495✔
741
                                       SMqCommitCbParamSet** ppParamSet) {
742
  if (tmq == NULL || ppParamSet == NULL) {
495!
743
    return TSDB_CODE_INVALID_PARA;
×
744
  }
745
  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
495!
746
  if (pParamSet == NULL) {
495!
747
    return terrno;
×
748
  }
749

750
  pParamSet->refId = tmq->refId;
495✔
751
  pParamSet->epoch = tmq->epoch;
495✔
752
  pParamSet->callbackFn = pCommitFp;
495✔
753
  pParamSet->userParam = userParam;
495✔
754
  pParamSet->waitingRspNum = rspNum;
495✔
755
  *ppParamSet = pParamSet;
495✔
756
  return 0;
495✔
757
}
758

759
static int32_t getClientVg(tmq_t* tmq, char* pTopicName, int32_t vgId, SMqClientVg** pVg) {
68✔
760
  if (tmq == NULL || pTopicName == NULL || pVg == NULL) {
68!
761
    return TSDB_CODE_INVALID_PARA;
×
762
  }
763
  SMqClientTopic* pTopic = NULL;
68✔
764
  int32_t         code = getTopicByName(tmq, pTopicName, &pTopic);
68✔
765
  if (code != 0) {
68!
766
    tqErrorC("consumer:0x%" PRIx64 " invalid topic name:%s", tmq->consumerId, pTopicName);
×
767
    return code;
×
768
  }
769

770
  int32_t numOfVgs = taosArrayGetSize(pTopic->vgs);
68✔
771
  for (int32_t i = 0; i < numOfVgs; ++i) {
129!
772
    SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i);
129✔
773
    if (pClientVg && pClientVg->vgId == vgId) {
129!
774
      *pVg = pClientVg;
68✔
775
      break;
68✔
776
    }
777
  }
778

779
  return *pVg == NULL ? TSDB_CODE_TMQ_INVALID_VGID : TSDB_CODE_SUCCESS;
68!
780
}
781

782
static int32_t innerCommit(tmq_t* tmq, char* pTopicName, STqOffsetVal* offsetVal, SMqClientVg* pVg, SMqCommitCbParamSet* pParamSet){
1,114✔
783
  if (tmq == NULL || pTopicName == NULL || offsetVal == NULL || pVg == NULL || pParamSet == NULL) {
1,114!
784
    return TSDB_CODE_INVALID_PARA;
×
785
  }
786
  int32_t code = 0;
1,114✔
787
  if (offsetVal->type <= 0) {
1,114✔
788
    code = TSDB_CODE_TMQ_INVALID_MSG;
33✔
789
    return code;
33✔
790
  }
791
  if (tOffsetEqual(offsetVal, &pVg->offsetInfo.committedOffset)) {
1,081✔
792
    code = TSDB_CODE_TMQ_SAME_COMMITTED_VALUE;
604✔
793
    return code;
604✔
794
  }
795
  char offsetBuf[TSDB_OFFSET_LEN] = {0};
477✔
796
  tFormatOffset(offsetBuf, tListLen(offsetBuf), offsetVal);
477✔
797

798
  char commitBuf[TSDB_OFFSET_LEN] = {0};
477✔
799
  tFormatOffset(commitBuf, tListLen(commitBuf), &pVg->offsetInfo.committedOffset);
477✔
800

801
  code = doSendCommitMsg(tmq, pVg->vgId, &pVg->epSet, offsetVal, pTopicName, pParamSet);
477✔
802
  if (code != TSDB_CODE_SUCCESS) {
477!
803
    tqErrorC("consumer:0x%" PRIx64 " topic:%s on vgId:%d end commit msg failed, send offset:%s committed:%s, code:%s",
×
804
             tmq->consumerId, pTopicName, pVg->vgId, offsetBuf, commitBuf, tstrerror(terrno));
805
    return code;
×
806
  }
807

808
  tqDebugC("consumer:0x%" PRIx64 " topic:%s on vgId:%d send commit msg success, send offset:%s committed:%s",
477!
809
           tmq->consumerId, pTopicName, pVg->vgId, offsetBuf, commitBuf);
810
  tOffsetCopy(&pVg->offsetInfo.committedOffset, offsetVal);
477✔
811
  return code;
477✔
812
}
813

814
static int32_t asyncCommitOffset(tmq_t* tmq, char* pTopicName, int32_t vgId, STqOffsetVal* offsetVal,
55✔
815
                                 tmq_commit_cb* pCommitFp, void* userParam) {
816
  if (tmq == NULL || pTopicName == NULL || offsetVal == NULL) {
55!
817
    return TSDB_CODE_INVALID_PARA;
×
818
  }
819
  tqInfoC("consumer:0x%" PRIx64 " do manual commit offset for %s, vgId:%d", tmq->consumerId, pTopicName, vgId);
55!
820
  SMqCommitCbParamSet* pParamSet = NULL;
55✔
821
  int32_t code = prepareCommitCbParamSet(tmq, pCommitFp, userParam, 0, &pParamSet);
55✔
822
  if (code != 0){
55!
823
    return code;
×
824
  }
825

826
  taosRLockLatch(&tmq->lock);
55✔
827
  SMqClientVg* pVg = NULL;
55✔
828
  code = getClientVg(tmq, pTopicName, vgId, &pVg);
55✔
829
  if (code == 0) {
55!
830
    code = innerCommit(tmq, pTopicName, offsetVal, pVg, pParamSet);
55✔
831
  }
832
  taosRUnLockLatch(&tmq->lock);
55✔
833

834
  if (code != 0){
55!
835
    taosMemoryFree(pParamSet);
×
836
  }
837
  return code;
55✔
838
}
839

840
static void asyncCommitFromResult(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* pCommitFp, void* userParam) {
53✔
841
  char*        pTopicName = NULL;
53✔
842
  int32_t      vgId = 0;
53✔
843
  STqOffsetVal offsetVal = {0};
53✔
844
  int32_t      code = 0;
53✔
845

846
  if (pRes == NULL || tmq == NULL) {
53!
847
    code = TSDB_CODE_INVALID_PARA;
×
848
    goto end;
×
849
  }
850

851
  if (TD_RES_TMQ(pRes) || TD_RES_TMQ_RAW(pRes) || TD_RES_TMQ_META(pRes) ||
53!
852
      TD_RES_TMQ_METADATA(pRes) || TD_RES_TMQ_BATCH_META(pRes)) {
×
853
    SMqRspObj* pRspObj = (SMqRspObj*)pRes;
53✔
854
    pTopicName = pRspObj->topic;
53✔
855
    vgId = pRspObj->vgId;
53✔
856
    offsetVal = pRspObj->rspOffset;
53✔
857
  } else {
858
    code = TSDB_CODE_TMQ_INVALID_MSG;
×
859
    goto end;
×
860
  }
861

862
  code = asyncCommitOffset(tmq, pTopicName, vgId, &offsetVal, pCommitFp, userParam);
53✔
863

864
  end:
53✔
865
  if (code != TSDB_CODE_SUCCESS && pCommitFp != NULL) {
53!
866
    if (code == TSDB_CODE_TMQ_SAME_COMMITTED_VALUE) code = TSDB_CODE_SUCCESS;
×
867
    pCommitFp(tmq, code, userParam);
×
868
  }
869
}
53✔
870

871
static int32_t innerCommitAll(tmq_t* tmq, SMqCommitCbParamSet* pParamSet){
440✔
872
  if (tmq == NULL || pParamSet == NULL) {
440!
873
    return TSDB_CODE_INVALID_PARA;
×
874
  }
875
  int32_t code = 0;
440✔
876
  taosRLockLatch(&tmq->lock);
440✔
877
  int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
440✔
878
  tqDebugC("consumer:0x%" PRIx64 " start to commit offset for %d topics", tmq->consumerId, numOfTopics);
440!
879

880
  for (int32_t i = 0; i < numOfTopics; i++) {
876✔
881
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
436✔
882
    if (pTopic == NULL) {
436!
883
      code = TSDB_CODE_TMQ_INVALID_TOPIC;
×
884
      goto END;
×
885
    }
886
    int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs);
436✔
887
    tqDebugC("consumer:0x%" PRIx64 " commit offset for topics:%s, numOfVgs:%d", tmq->consumerId, pTopic->topicName, numOfVgroups);
436!
888
    for (int32_t j = 0; j < numOfVgroups; j++) {
1,495✔
889
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
1,059✔
890
      if (pVg == NULL) {
1,059!
891
        code = terrno;
×
892
        goto END;
×
893
      }
894

895
      code = innerCommit(tmq, pTopic->topicName, &pVg->offsetInfo.endOffset, pVg, pParamSet);
1,059✔
896
      if (code != 0 && code != TSDB_CODE_TMQ_SAME_COMMITTED_VALUE){
1,059✔
897
        tqErrorC("consumer:0x%" PRIx64 " topic:%s vgId:%d, no commit, code:%s, current offset version:%" PRId64 ", ordinal:%d/%d",
33!
898
                 tmq->consumerId, pTopic->topicName, pVg->vgId, tstrerror(code), pVg->offsetInfo.endOffset.version, j + 1, numOfVgroups);
899
      }
900
    }
901
  }
902
  tqDebugC("consumer:0x%" PRIx64 " total commit:%d for %d topics", tmq->consumerId, pParamSet->waitingRspNum - DEFAULT_COMMIT_CNT,
440!
903
           numOfTopics);
904
  END:
345✔
905
  taosRUnLockLatch(&tmq->lock);
440✔
906
  return code;
440✔
907
}
908

909
static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam) {
440✔
910
  if (tmq == NULL) {
440!
911
    return;
×
912
  }
913
  int32_t code = 0;
440✔
914
  SMqCommitCbParamSet* pParamSet = NULL;
440✔
915
  // init waitingRspNum as DEFAULT_COMMIT_CNT to prevent concurrency issue
916
  code = prepareCommitCbParamSet(tmq, pCommitFp, userParam, DEFAULT_COMMIT_CNT, &pParamSet);
440✔
917
  if (code != 0) {
440!
918
    tqErrorC("consumer:0x%" PRIx64 " prepareCommitCbParamSet failed, code:%s", tmq->consumerId, tstrerror(code));
×
919
    if (pCommitFp != NULL) {
×
920
      pCommitFp(tmq, code, userParam);
×
921
    }
922
    return;
×
923
  }
924
  code = innerCommitAll(tmq, pParamSet);
440✔
925
  if (code != 0 && code != TSDB_CODE_TMQ_SAME_COMMITTED_VALUE){
440✔
926
    tqErrorC("consumer:0x%" PRIx64 " innerCommitAll failed, code:%s", tmq->consumerId, tstrerror(code));
3!
927
  }
928

929
  code = commitRspCountDown(pParamSet, tmq->consumerId, "init", -1);
440✔
930
  if (code != 0) {
440!
931
    tqErrorC("consumer:0x%" PRIx64 " commit rsp count down failed, code:%s", tmq->consumerId, tstrerror(code));
×
932
  }
933
  return;
440✔
934
}
935

936
static void generateTimedTask(int64_t refId, int32_t type) {
868✔
937
  tmq_t*  tmq = NULL;
868✔
938
  int8_t* pTaskType = NULL;
868✔
939
  int32_t code = 0;
868✔
940

941
  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
868✔
942
  if (tmq == NULL) return;
868!
943

944
  code = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0, (void**)&pTaskType);
868✔
945
  if (code == TSDB_CODE_SUCCESS) {
868!
946
    *pTaskType = type;
868✔
947
    if (taosWriteQitem(tmq->delayedTask, pTaskType) == 0) {
868!
948
      if (tsem2_post(&tmq->rspSem) != 0){
868!
949
        tqErrorC("consumer:0x%" PRIx64 " failed to post sem, type:%d", tmq->consumerId, type);
×
950
      }
951
    }else{
952
      taosFreeQitem(pTaskType);
×
953
    }
954
  }
955

956
  code = taosReleaseRef(tmqMgmt.rsetId, refId);
868✔
957
  if (code != 0){
868!
958
    tqErrorC("failed to release ref:%"PRId64 ", type:%d, code:%d", refId, type, code);
×
959
  }
960
}
961

962
void tmqAssignAskEpTask(void* param, void* tmrId) {
557✔
963
  int64_t refId = (int64_t)param;
557✔
964
  generateTimedTask(refId, TMQ_DELAYED_TASK__ASK_EP);
557✔
965
}
557✔
966

967
void tmqReplayTask(void* param, void* tmrId) {
×
968
  int64_t refId = (int64_t)param;
×
969
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
×
970
  if (tmq == NULL) return;
×
971

972
  if (tsem2_post(&tmq->rspSem) != 0){
×
973
    tqErrorC("consumer:0x%" PRIx64 " failed to post sem, replay", tmq->consumerId);
×
974
  }
975
  int32_t code = taosReleaseRef(tmqMgmt.rsetId, refId);
×
976
  if (code != 0){
×
977
    tqErrorC("failed to release ref:%"PRId64 ", code:%d", refId, code);
×
978
  }
979
}
980

981
void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
311✔
982
  int64_t refId = (int64_t)param;
311✔
983
  generateTimedTask(refId, TMQ_DELAYED_TASK__COMMIT);
311✔
984
}
311✔
985

986
int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
320✔
987
  if (pMsg == NULL) {
320!
988
    return TSDB_CODE_INVALID_PARA;
×
989
  }
990

991
  if (param == NULL || code != 0){
320!
992
    goto END;
4✔
993
  }
994

995
  SMqHbRsp rsp = {0};
316✔
996
  code = tDeserializeSMqHbRsp(pMsg->pData, pMsg->len, &rsp);
316✔
997
  if (code != 0) {
316!
998
    goto END;
×
999
  }
1000

1001
  int64_t refId = (int64_t)param;
316✔
1002
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
316✔
1003
  if (tmq != NULL) {
316!
1004
    taosWLockLatch(&tmq->lock);
316✔
1005
    for (int32_t i = 0; i < taosArrayGetSize(rsp.topicPrivileges); i++) {
618✔
1006
      STopicPrivilege* privilege = taosArrayGet(rsp.topicPrivileges, i);
302✔
1007
      if (privilege && privilege->noPrivilege == 1) {
302!
1008
        int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
×
1009
        for (int32_t j = 0; j < topicNumCur; j++) {
×
1010
          SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, j);
×
1011
          if (pTopicCur && strcmp(pTopicCur->topicName, privilege->topic) == 0) {
×
1012
            tqInfoC("consumer:0x%" PRIx64 ", has no privilege, topic:%s", tmq->consumerId, privilege->topic);
×
1013
            pTopicCur->noPrivilege = 1;
×
1014
          }
1015
        }
1016
      }
1017
    }
1018
    taosWUnLockLatch(&tmq->lock);
316✔
1019
    code = taosReleaseRef(tmqMgmt.rsetId, refId);
316✔
1020
    if (code != 0){
316!
1021
      tqErrorC("failed to release ref:%"PRId64 ", code:%d", refId, code);
×
1022
    }
1023
  }
1024

1025
  tqClientDebugFlag = rsp.debugFlag;
316✔
1026

1027
  tDestroySMqHbRsp(&rsp);
316✔
1028

1029
  END:
320✔
1030
  taosMemoryFree(pMsg->pData);
320!
1031
  taosMemoryFree(pMsg->pEpSet);
320!
1032
  return code;
320✔
1033
}
1034

1035
void tmqSendHbReq(void* param, void* tmrId) {
320✔
1036
  int64_t refId = (int64_t)param;
320✔
1037

1038
  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
320✔
1039
  if (tmq == NULL) {
320!
1040
    return;
×
1041
  }
1042

1043
  SMqHbReq req = {0};
320✔
1044
  req.consumerId = tmq->consumerId;
320✔
1045
  req.epoch = tmq->epoch;
320✔
1046
  req.pollFlag = atomic_load_8(&tmq->pollFlag);
320✔
1047
  tqDebugC("consumer:0x%" PRIx64 " send heartbeat, pollFlag:%d", tmq->consumerId, req.pollFlag);
320!
1048
  req.topics = taosArrayInit(taosArrayGetSize(tmq->clientTopics), sizeof(TopicOffsetRows));
320✔
1049
  if (req.topics == NULL) {
320!
1050
    goto END;
×
1051
  }
1052
  taosRLockLatch(&tmq->lock);
320✔
1053
  for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
630✔
1054
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
310✔
1055
    if (pTopic == NULL) {
310!
1056
      continue;
×
1057
    }
1058
    int32_t          numOfVgroups = taosArrayGetSize(pTopic->vgs);
310✔
1059
    TopicOffsetRows* data = taosArrayReserve(req.topics, 1);
310✔
1060
    if (data == NULL) {
310!
1061
      continue;
×
1062
    }
1063
    tstrncpy(data->topicName, pTopic->topicName, TSDB_TOPIC_FNAME_LEN);
310✔
1064
    data->offsetRows = taosArrayInit(numOfVgroups, sizeof(OffsetRows));
310✔
1065
    if (data->offsetRows == NULL) {
310!
1066
      continue;
×
1067
    }
1068
    for (int j = 0; j < numOfVgroups; j++) {
1,371✔
1069
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
1,061✔
1070
      if (pVg == NULL) {
1,061!
1071
        continue;
×
1072
      }
1073
      OffsetRows* offRows = taosArrayReserve(data->offsetRows, 1);
1,061✔
1074
      if (offRows == NULL) {
1,061!
1075
        continue;
×
1076
      }
1077
      offRows->vgId = pVg->vgId;
1,061✔
1078
      offRows->rows = pVg->numOfRows;
1,061✔
1079
      offRows->offset = pVg->offsetInfo.endOffset;
1,061✔
1080
      offRows->ever = pVg->offsetInfo.walVerEnd == -1 ? 0 : pVg->offsetInfo.walVerEnd;
1,061✔
1081
      char buf[TSDB_OFFSET_LEN] = {0};
1,061✔
1082
      tFormatOffset(buf, TSDB_OFFSET_LEN, &offRows->offset);
1,061✔
1083
      tqDebugC("consumer:0x%" PRIx64 ",report offset, group:%s vgId:%d, offset:%s/%" PRId64 ", rows:%" PRId64,
1,061!
1084
               tmq->consumerId, tmq->groupId, offRows->vgId, buf, offRows->ever, offRows->rows);
1085
    }
1086
  }
1087
  taosRUnLockLatch(&tmq->lock);
320✔
1088

1089
  int32_t tlen = tSerializeSMqHbReq(NULL, 0, &req);
320✔
1090
  if (tlen < 0) {
320!
1091
    tqErrorC("tSerializeSMqHbReq failed, size:%d", tlen);
×
1092
    goto END;
×
1093
  }
1094

1095
  void* pReq = taosMemoryCalloc(1, tlen);
320!
1096
  if (pReq == NULL) {
320!
1097
    tqErrorC("failed to malloc MqHbReq msg, code:%d", terrno);
×
1098
    goto END;
×
1099
  }
1100

1101
  if (tSerializeSMqHbReq(pReq, tlen, &req) < 0) {
320!
1102
    tqErrorC("tSerializeSMqHbReq %d failed", tlen);
×
1103
    taosMemoryFree(pReq);
×
1104
    goto END;
×
1105
  }
1106

1107
  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
320!
1108
  if (sendInfo == NULL) {
320!
1109
    taosMemoryFree(pReq);
×
1110
    goto END;
×
1111
  }
1112

1113
  sendInfo->msgInfo = (SDataBuf){.pData = pReq, .len = tlen, .handle = NULL};
320✔
1114

1115
  sendInfo->requestId = generateRequestId();
320✔
1116
  sendInfo->requestObjRefId = 0;
320✔
1117
  sendInfo->param = (void*)refId;
320✔
1118
  sendInfo->fp = tmqHbCb;
320✔
1119
  sendInfo->msgType = TDMT_MND_TMQ_HB;
320✔
1120

1121

1122
  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
320✔
1123

1124
  int32_t code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo);
320✔
1125
  if (code != 0) {
320!
1126
    tqErrorC("tmqSendHbReq asyncSendMsgToServer failed");
×
1127
  }
1128
  (void)atomic_val_compare_exchange_8(&tmq->pollFlag, 1, 0);
320✔
1129

1130
  END:
320✔
1131
  tDestroySMqHbReq(&req);
320✔
1132
  if (tmrId != NULL) {
320✔
1133
    bool ret = taosTmrReset(tmqSendHbReq, tmq->heartBeatIntervalMs, param, tmqMgmt.timer, &tmq->hbLiveTimer);
230✔
1134
    tqDebugC("consumer:0x%" PRIx64 " reset timer for tmq heartbeat ret:%d, interval:%d, pollFlag:%d", tmq->consumerId, ret, tmq->heartBeatIntervalMs, tmq->pollFlag);
230!
1135
  }
1136
  int32_t ret = taosReleaseRef(tmqMgmt.rsetId, refId);
320✔
1137
  if (ret != 0){
320!
1138
    tqErrorC("failed to release ref:%"PRId64 ", code:%d", refId, ret);
×
1139
  }
1140
}
1141

1142
static void defaultCommitCbFn(tmq_t* pTmq, int32_t code, void* param) {
126✔
1143
  if (code != 0 && pTmq != NULL) {
126!
1144
    tqErrorC("consumer:0x%" PRIx64 ", failed to commit offset, code:%s", pTmq->consumerId, tstrerror(code));
×
1145
  }
1146
}
126✔
1147

1148
static void tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) {
7,632✔
1149
  if (rspWrapper == NULL) {
7,632!
1150
    return;
×
1151
  }
1152
  if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) {
7,632✔
1153
    tDeleteSMqAskEpRsp(&rspWrapper->epRsp);
515✔
1154
  } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_RSP) {
7,117!
1155
    DELETE_POLL_RSP(tDeleteMqDataRsp, &pRsp->dataRsp)
7,117!
1156
  } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP){
×
1157
    DELETE_POLL_RSP(tDeleteSTaosxRsp, &pRsp->dataRsp)
×
1158
  } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
×
1159
    DELETE_POLL_RSP(tDeleteMqMetaRsp,&pRsp->metaRsp)
×
1160
  } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_BATCH_META_RSP) {
×
1161
    DELETE_POLL_RSP(tDeleteMqBatchMetaRsp,&pRsp->batchMetaRsp)
×
1162
  } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) {
×
1163
    DELETE_POLL_RSP(tDeleteMqRawDataRsp, &pRsp->dataRsp)
×
1164
  }
1165
}
1166

1167
static void freeClientVg(void* param) {
269✔
1168
  if (param == NULL) {
269!
1169
    return;
×
1170
  }
1171
  SMqClientVg* pVg = param;
269✔
1172
  tOffsetDestroy(&pVg->offsetInfo.endOffset);
269✔
1173
  tOffsetDestroy(&pVg->offsetInfo.beginOffset);
269✔
1174
  tOffsetDestroy(&pVg->offsetInfo.committedOffset);
269✔
1175
}
1176
static void freeClientTopic(void* param) {
64✔
1177
  if (param == NULL) {
64!
1178
    return;
×
1179
  }
1180
  SMqClientTopic* pTopic = param;
64✔
1181
  taosMemoryFreeClear(pTopic->schema.pSchema);
64!
1182
  taosArrayDestroyEx(pTopic->vgs, freeClientVg);
64✔
1183
}
1184

1185
static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopicEp, SHashObj* pVgOffsetHashMap,
65✔
1186
                                   tmq_t* tmq) {
1187
  if (pTopic == NULL || pTopicEp == NULL || pVgOffsetHashMap == NULL || tmq == NULL) {
65!
1188
    return;
×
1189
  }
1190
  pTopic->schema = pTopicEp->schema;
65✔
1191
  pTopicEp->schema.nCols = 0;
65✔
1192
  pTopicEp->schema.pSchema = NULL;
65✔
1193

1194
  char    vgKey[TSDB_TOPIC_FNAME_LEN + 22] = {0};
65✔
1195
  int32_t vgNumGet = taosArrayGetSize(pTopicEp->vgs);
65✔
1196

1197
  tstrncpy(pTopic->topicName, pTopicEp->topic, TSDB_TOPIC_FNAME_LEN);
65✔
1198
  tstrncpy(pTopic->db, pTopicEp->db, TSDB_DB_FNAME_LEN);
65✔
1199

1200
  tqInfoC("consumer:0x%" PRIx64 ", update topic:%s, new numOfVgs:%d", tmq->consumerId, pTopic->topicName, vgNumGet);
65!
1201
  pTopic->vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg));
65✔
1202
  if (pTopic->vgs == NULL) {
65!
1203
    tqErrorC("consumer:0x%" PRIx64 ", failed to init vgs for topic:%s", tmq->consumerId, pTopic->topicName);
×
1204
    return;
×
1205
  }
1206
  for (int32_t j = 0; j < vgNumGet; j++) {
335✔
1207
    SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);
270✔
1208
    if (pVgEp == NULL) {
270!
1209
      continue;
×
1210
    }
1211
    (void)snprintf(vgKey, sizeof(vgKey), "%s:%d", pTopic->topicName, pVgEp->vgId);
270✔
1212
    SVgroupSaveInfo* pInfo = taosHashGet(pVgOffsetHashMap, vgKey, strlen(vgKey));
270✔
1213

1214
    STqOffsetVal offsetNew = {0};
270✔
1215
    offsetNew.type = tmq->resetOffsetCfg;
270✔
1216

1217
    tqInfoC("consumer:0x%" PRIx64 ", update topic:%s, new numOfVgs:%d, num:%d, port:%d", tmq->consumerId,
270!
1218
            pTopic->topicName, vgNumGet, pVgEp->epSet.numOfEps, pVgEp->epSet.eps[pVgEp->epSet.inUse].port);
1219

1220
    SMqClientVg clientVg = {
810✔
1221
        .pollCnt = 0,
1222
        .vgId = pVgEp->vgId,
270✔
1223
        .epSet = pVgEp->epSet,
1224
        .vgStatus = pInfo ? pInfo->vgStatus : TMQ_VG_STATUS__IDLE,
270✔
1225
        .vgSkipCnt = 0,
1226
        .blockReceiveTs = 0,
1227
        .blockSleepForReplay = 0,
1228
        .numOfRows = pInfo ? pInfo->numOfRows : 0,
270✔
1229
    };
1230

1231
    clientVg.offsetInfo.walVerBegin = -1;
270✔
1232
    clientVg.offsetInfo.walVerEnd = -1;
270✔
1233
    clientVg.seekUpdated = false;
270✔
1234
    if (pInfo) {
270✔
1235
      tOffsetCopy(&clientVg.offsetInfo.endOffset, &pInfo->currentOffset);
28✔
1236
      tOffsetCopy(&clientVg.offsetInfo.committedOffset, &pInfo->commitOffset);
28✔
1237
      tOffsetCopy(&clientVg.offsetInfo.beginOffset, &pInfo->seekOffset);
28✔
1238
    } else {
1239
      clientVg.offsetInfo.endOffset = offsetNew;
242✔
1240
      clientVg.offsetInfo.committedOffset = offsetNew;
242✔
1241
      clientVg.offsetInfo.beginOffset = offsetNew;
242✔
1242
    }
1243
    if (taosArrayPush(pTopic->vgs, &clientVg) == NULL) {
540!
1244
      tqErrorC("consumer:0x%" PRIx64 ", failed to push vg:%d into topic:%s", tmq->consumerId, pVgEp->vgId,
×
1245
               pTopic->topicName);
1246
      freeClientVg(&clientVg);
×
1247
    }
1248
  }
1249
}
1250

1251
static void buildNewTopicList(tmq_t* tmq, SArray* newTopics, const SMqAskEpRsp* pRsp){
55✔
1252
  if (tmq == NULL || newTopics == NULL || pRsp == NULL) {
55!
1253
    return;
×
1254
  }
1255
  SHashObj* pVgOffsetHashMap = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
55✔
1256
  if (pVgOffsetHashMap == NULL) {
55!
1257
    tqErrorC("consumer:0x%" PRIx64 " taos hash init null, code:%d", tmq->consumerId, terrno);
×
1258
    return;
×
1259
  }
1260

1261
  int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
55✔
1262
  for (int32_t i = 0; i < topicNumCur; i++) {
67✔
1263
    // find old topic
1264
    SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i);
12✔
1265
    if (pTopicCur && pTopicCur->vgs) {
12!
1266
      int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs);
12✔
1267
      tqInfoC("consumer:0x%" PRIx64 ", current vg num:%d", tmq->consumerId, vgNumCur);
12!
1268
      for (int32_t j = 0; j < vgNumCur; j++) {
48✔
1269
        SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j);
36✔
1270
        if (pVgCur == NULL) {
36!
1271
          continue;
×
1272
        }
1273
        char vgKey[TSDB_TOPIC_FNAME_LEN + 22] = {0};
36✔
1274
        (void)snprintf(vgKey, sizeof(vgKey), "%s:%d", pTopicCur->topicName, pVgCur->vgId);
36✔
1275

1276
        char buf[TSDB_OFFSET_LEN] = {0};
36✔
1277
        tFormatOffset(buf, TSDB_OFFSET_LEN, &pVgCur->offsetInfo.endOffset);
36✔
1278
        tqInfoC("consumer:0x%" PRIx64 ", vgId:%d vgKey:%s, offset:%s", tmq->consumerId, pVgCur->vgId, vgKey, buf);
36!
1279

1280
        SVgroupSaveInfo info = {.currentOffset = pVgCur->offsetInfo.endOffset,
36✔
1281
            .seekOffset = pVgCur->offsetInfo.beginOffset,
1282
            .commitOffset = pVgCur->offsetInfo.committedOffset,
1283
            .numOfRows = pVgCur->numOfRows,
36✔
1284
            .vgStatus = pVgCur->vgStatus};
36✔
1285
        if (taosHashPut(pVgOffsetHashMap, vgKey, strlen(vgKey), &info, sizeof(SVgroupSaveInfo)) != 0) {
36!
1286
          tqErrorC("consumer:0x%" PRIx64 ", failed to put vg:%d into hashmap", tmq->consumerId, pVgCur->vgId);
×
1287
        }
1288
      }
1289
    }
1290
  }
1291

1292
  for (int32_t i = 0; i < taosArrayGetSize(pRsp->topics); i++) {
120✔
1293
    SMqClientTopic topic = {0};
65✔
1294
    SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i);
65✔
1295
    if (pTopicEp == NULL) {
65!
1296
      continue;
×
1297
    }
1298
    initClientTopicFromRsp(&topic, pTopicEp, pVgOffsetHashMap, tmq);
65✔
1299
    if (taosArrayPush(newTopics, &topic) == NULL) {
65!
1300
      tqErrorC("consumer:0x%" PRIx64 ", failed to push topic:%s into new topics", tmq->consumerId, topic.topicName);
×
1301
      freeClientTopic(&topic);
×
1302
    }
1303
  }
1304

1305
  taosHashCleanup(pVgOffsetHashMap);
55✔
1306
}
1307

1308
static void doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
668✔
1309
  if (tmq == NULL || pRsp == NULL) {
668!
1310
    return;
×
1311
  }
1312
  int32_t topicNumGet = taosArrayGetSize(pRsp->topics);
668✔
1313
  // vnode transform (epoch == tmq->epoch && topicNumGet != 0)
1314
  // ask ep rsp (epoch == tmq->epoch && topicNumGet == 0)
1315
  if (epoch < tmq->epoch || (epoch == tmq->epoch && topicNumGet == 0)) {
668!
1316
    tqDebugC("consumer:0x%" PRIx64 " no update ep epoch from %d to epoch %d, incoming topics:%d", tmq->consumerId,
567!
1317
             tmq->epoch, epoch, topicNumGet);
1318
    return;
567✔
1319
  }
1320

1321
  SArray* newTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic));
101✔
1322
  if (newTopics == NULL) {
101!
1323
    tqErrorC("consumer:0x%" PRIx64 " taos array init null, code:%d", tmq->consumerId, terrno);
×
1324
    return;
×
1325
  }
1326
  tqInfoC("consumer:0x%" PRIx64 " update ep epoch from %d to epoch %d, incoming topics:%d, existed topics:%d",
101!
1327
          tmq->consumerId, tmq->epoch, epoch, topicNumGet, (int)taosArrayGetSize(tmq->clientTopics));
1328

1329
  taosWLockLatch(&tmq->lock);
101✔
1330
  if (topicNumGet > 0){
101✔
1331
    buildNewTopicList(tmq, newTopics, pRsp);
55✔
1332
  }
1333
  // destroy current buffered existed topics info
1334
  if (tmq->clientTopics) {
101!
1335
    taosArrayDestroyEx(tmq->clientTopics, freeClientTopic);
101✔
1336
  }
1337
  tmq->clientTopics = newTopics;
101✔
1338
  taosWUnLockLatch(&tmq->lock);
101✔
1339

1340
  atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);
101✔
1341
  atomic_store_32(&tmq->epoch, epoch);
101✔
1342

1343
  tqInfoC("consumer:0x%" PRIx64 " update topic info completed", tmq->consumerId);
101!
1344
}
1345

1346
static int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) {
928✔
1347
  SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
928✔
1348
  if (pParam == NULL) {
928!
1349
    goto _ERR;
×
1350
  }
1351

1352
  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, pParam->refId);
928✔
1353
  if (tmq == NULL) {
928!
1354
    code = TSDB_CODE_TMQ_CONSUMER_CLOSED;
×
1355
    goto _ERR;
×
1356
  }
1357

1358
  if (code != TSDB_CODE_SUCCESS) {
928✔
1359
    if (code != TSDB_CODE_MND_CONSUMER_NOT_READY){
259✔
1360
      tqErrorC("consumer:0x%" PRIx64 ", get topic endpoint error, code:%s", tmq->consumerId, tstrerror(code));
2!
1361
      if (code == TSDB_CODE_MND_CONSUMER_NOT_EXIST){
2!
1362
        atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__LOST);
2✔
1363
      }
1364
    }
1365
    goto END;
259✔
1366
  }
1367

1368
  if (pMsg == NULL) {
669!
1369
    goto END;
×
1370
  }
1371
  SMqRspHead* head = pMsg->pData;
669✔
1372
  int32_t     epoch = atomic_load_32(&tmq->epoch);
669✔
1373
  tqDebugC("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d", tmq->consumerId, head->epoch, epoch);
669!
1374
  if (pParam->sync) {
669✔
1375
    SMqAskEpRsp rsp = {0};
154✔
1376
    if (tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp) != NULL) {
308!
1377
      doUpdateLocalEp(tmq, head->epoch, &rsp);
154✔
1378
    }
1379
    tDeleteSMqAskEpRsp(&rsp);
1380
  } else {
1381
    SMqRspWrapper* pWrapper = NULL;
515✔
1382
    code = taosAllocateQitem(sizeof(SMqRspWrapper), DEF_QITEM, 0, (void**)&pWrapper);
515✔
1383
    if (code) {
515!
1384
      goto END;
×
1385
    }
1386

1387
    pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP;
515✔
1388
    pWrapper->epoch = head->epoch;
515✔
1389
    (void)memcpy(&pWrapper->epRsp, pMsg->pData, sizeof(SMqRspHead));
515✔
1390
    if (tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pWrapper->epRsp) == NULL) {
1,030!
1391
      tmqFreeRspWrapper((SMqRspWrapper*)pWrapper);
×
1392
      taosFreeQitem(pWrapper);
×
1393
    } else {
1394
      code = taosWriteQitem(tmq->mqueue, pWrapper);
515✔
1395
      if (code != 0) {
515!
1396
        tmqFreeRspWrapper((SMqRspWrapper*)pWrapper);
×
1397
        taosFreeQitem(pWrapper);
×
1398
        tqErrorC("consumer:0x%" PRIx64 " put ep res into mqueue failed, code:%d", tmq->consumerId, code);
×
1399
      }
1400
    }
1401
  }
1402

1403
  END:
928✔
1404
  {
1405
    int32_t ret = taosReleaseRef(tmqMgmt.rsetId, pParam->refId);
928✔
1406
    if (ret != 0){
928!
1407
      tqErrorC("failed to release ref:%"PRId64 ", code:%d", pParam->refId, ret);
×
1408
    }
1409
  }
1410

1411
  _ERR:
928✔
1412
  if (pParam && pParam->sync) {
928!
1413
    SAskEpInfo* pInfo = pParam->pParam;
413✔
1414
    if (pInfo) {
413!
1415
      pInfo->code = code;
413✔
1416
      if (tsem2_post(&pInfo->sem) != 0){
413!
1417
        tqErrorC("failed to post rsp sem askep cb");
×
1418
      }
1419
    }
1420
  }
1421

1422
  if (pMsg) {
928!
1423
    taosMemoryFree(pMsg->pEpSet);
928!
1424
    taosMemoryFree(pMsg->pData);
928!
1425
  }
1426

1427
  return code;
928✔
1428
}
1429

1430
static int32_t askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpSet) {
928✔
1431
  if (pTmq == NULL) {
928!
1432
    return TSDB_CODE_INVALID_PARA;
×
1433
  }
1434
  int32_t code = 0;
928✔
1435
  int32_t lino = 0;
928✔
1436
  SMqAskEpReq req = {0};
928✔
1437
  req.consumerId = pTmq->consumerId;
928✔
1438
  req.epoch = updateEpSet ? -1 : pTmq->epoch;
928!
1439
  tstrncpy(req.cgroup, pTmq->groupId, TSDB_CGROUP_LEN);
928✔
1440
  SMqAskEpCbParam* pParam = NULL;
928✔
1441
  void*            pReq = NULL;
928✔
1442

1443
  int32_t tlen = tSerializeSMqAskEpReq(NULL, 0, &req);
928✔
1444
  TSDB_CHECK_CONDITION(tlen >= 0, code, lino, END, TSDB_CODE_INVALID_PARA);
928!
1445
  pReq = taosMemoryCalloc(1, tlen);
928!
1446
  TSDB_CHECK_NULL(pReq, code, lino, END, terrno);
928!
1447

1448
  code = tSerializeSMqAskEpReq(pReq, tlen, &req);
928✔
1449
  TSDB_CHECK_CONDITION(code >= 0, code, lino, END, TSDB_CODE_INVALID_PARA);
928!
1450

1451
  pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam));
928!
1452
  TSDB_CHECK_NULL(pParam, code, lino, END, terrno);
928!
1453

1454
  pParam->refId = pTmq->refId;
928✔
1455
  pParam->sync = sync;
928✔
1456
  pParam->pParam = param;
928✔
1457

1458
  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
928!
1459
  TSDB_CHECK_NULL(sendInfo, code, lino, END, terrno);
928!
1460

1461
  sendInfo->msgInfo = (SDataBuf){.pData = pReq, .len = tlen, .handle = NULL};
928✔
1462
  sendInfo->requestId = generateRequestId();
928✔
1463
  sendInfo->requestObjRefId = 0;
928✔
1464
  sendInfo->param = pParam;
928✔
1465
  sendInfo->paramFreeFp = taosAutoMemoryFree;
928✔
1466
  sendInfo->fp = askEpCb;
928✔
1467
  sendInfo->msgType = TDMT_MND_TMQ_ASK_EP;
928✔
1468

1469
  pReq = NULL;
928✔
1470
  pParam = NULL;
928✔
1471

1472
  SEpSet epSet = getEpSet_s(&pTmq->pTscObj->pAppInfo->mgmtEp);
928✔
1473
  tqDebugC("consumer:0x%" PRIx64 " ask ep from mnode, QID:0x%" PRIx64, pTmq->consumerId, sendInfo->requestId);
928!
1474
  code = asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo);
928✔
1475

1476
END:
928✔
1477
  if (code != 0) {
928!
1478
    tqErrorC("%s failed at %d, msg:%s", __func__, lino, tstrerror(code));
×
1479
  }
1480
  taosMemoryFree(pReq);
928!
1481
  taosMemoryFree(pParam);
928!
1482
  return code;
928✔
1483
}
1484

1485
static int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) {
13,028✔
1486
  tqDebugC("consumer:0x%" PRIx64 " handle delayed %d tasks before poll data", pTmq->consumerId, taosQueueItemSize(pTmq->delayedTask));
13,028!
1487
  while (1) {
819✔
1488
    int8_t* pTaskType = NULL;
13,847✔
1489
    taosReadQitem(pTmq->delayedTask, (void**)&pTaskType);
13,847✔
1490
    if (pTaskType == NULL) {break;}
13,847✔
1491
    if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) {
819✔
1492
      tqDebugC("consumer:0x%" PRIx64 " retrieve ask ep timer", pTmq->consumerId);
515!
1493
      int32_t code = askEp(pTmq, NULL, false, false);
515✔
1494
      if (code != 0) {
515!
1495
        tqErrorC("consumer:0x%" PRIx64 " failed to ask ep, code:%s", pTmq->consumerId, tstrerror(code));
×
1496
      }
1497
      tqDebugC("consumer:0x%" PRIx64 " retrieve ep from mnode in 1s", pTmq->consumerId);
515!
1498
      bool ret = taosTmrReset(tmqAssignAskEpTask, DEFAULT_ASKEP_INTERVAL, (void*)(pTmq->refId), tmqMgmt.timer,
515✔
1499
                              &pTmq->epTimer);
1500
      tqDebugC("reset timer for tmq ask ep:%d", ret);
515!
1501
    } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
304!
1502
      tmq_commit_cb* pCallbackFn = (pTmq->commitCb != NULL) ? pTmq->commitCb : defaultCommitCbFn;
304✔
1503
      asyncCommitAllOffsets(pTmq, pCallbackFn, pTmq->commitCbUserParam);
304✔
1504
      tqDebugC("consumer:0x%" PRIx64 " next commit to vnode(s) in %.2fs", pTmq->consumerId,
304!
1505
               pTmq->autoCommitInterval / 1000.0);
1506
      bool ret = taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, (void*)(pTmq->refId), tmqMgmt.timer,
304✔
1507
                              &pTmq->commitTimer);
1508
      tqDebugC("reset timer for commit:%d", ret);
304!
1509
    } else {
1510
      tqErrorC("consumer:0x%" PRIx64 " invalid task type:%d", pTmq->consumerId, *pTaskType);
×
1511
    }
1512

1513
    taosFreeQitem(pTaskType);
819✔
1514
  }
1515

1516
  return 0;
13,028✔
1517
}
1518

1519
void tmqClearUnhandleMsg(tmq_t* tmq) {
139✔
1520
  if (tmq == NULL) return;
139!
1521
  while (1) {
85✔
1522
    SMqRspWrapper* rspWrapper = NULL;
224✔
1523
    taosReadQitem(tmq->mqueue, (void**)&rspWrapper);
224✔
1524
    if (rspWrapper == NULL) break;
224✔
1525
    tmqFreeRspWrapper(rspWrapper);
85✔
1526
    taosFreeQitem(rspWrapper);
85✔
1527
  }
1528
}
1529

1530
int32_t tmqSubscribeCb(void* param, SDataBuf* pMsg, int32_t code) {
144✔
1531
  if (pMsg) {
144!
1532
    taosMemoryFreeClear(pMsg->pEpSet);
144!
1533
  }
1534

1535
  if (param == NULL) {
144!
1536
    return code;
×
1537
  }
1538

1539
  SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param;
144✔
1540
  pParam->rspErr = code;
144✔
1541

1542
  if (tsem2_post(&pParam->rspSem) != 0){
144!
1543
    tqErrorC("failed to post sem, subscribe cb");
×
1544
  }
1545
  return 0;
144✔
1546
}
1547

1548
int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
5✔
1549
  if (tmq == NULL) return TSDB_CODE_INVALID_PARA;
5!
1550
  if (*topics == NULL) {
5✔
1551
    *topics = tmq_list_new();
1✔
1552
    if (*topics == NULL) {
1!
1553
      return terrno;
×
1554
    }
1555
  }
1556
  taosRLockLatch(&tmq->lock);
5✔
1557
  for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
10✔
1558
    SMqClientTopic* topic = taosArrayGet(tmq->clientTopics, i);
5✔
1559
    if (topic == NULL) {
5!
1560
      tqErrorC("topic is null");
×
1561
      continue;
×
1562
    }
1563
    char* tmp = strchr(topic->topicName, '.');
5✔
1564
    if (tmp == NULL) {
5!
1565
      tqErrorC("topic name is invalid:%s", topic->topicName);
×
1566
      continue;
×
1567
    }
1568
    if (tmq_list_append(*topics, tmp + 1) != 0) {
5!
1569
      tqErrorC("failed to append topic:%s", tmp + 1);
×
1570
      continue;
×
1571
    }
1572
  }
1573
  taosRUnLockLatch(&tmq->lock);
5✔
1574
  return 0;
5✔
1575
}
1576

1577
void tmqFreeImpl(void* handle) {
49✔
1578
  if (handle == NULL) return;
49!
1579
  tmq_t*  tmq = (tmq_t*)handle;
49✔
1580
  int64_t id = tmq->consumerId;
49✔
1581

1582
  if (tmq->mqueue) {
49!
1583
    tmqClearUnhandleMsg(tmq);
49✔
1584
    taosCloseQueue(tmq->mqueue);
49✔
1585
  }
1586

1587
  if (tmq->delayedTask) {
49!
1588
    taosCloseQueue(tmq->delayedTask);
49✔
1589
  }
1590

1591
  if(tsem2_destroy(&tmq->rspSem) != 0) {
49!
1592
    tqErrorC("failed to destroy sem in free tmq");
×
1593
  }
1594

1595
  taosArrayDestroyEx(tmq->clientTopics, freeClientTopic);
49✔
1596
  taos_close_internal(tmq->pTscObj);
49✔
1597

1598
  if (tmq->commitTimer) {
49✔
1599
    if (!taosTmrStopA(&tmq->commitTimer)) {
40✔
1600
      tqErrorC("failed to stop commit timer");
7!
1601
    }
1602
  }
1603
  if (tmq->epTimer) {
49✔
1604
    if (!taosTmrStopA(&tmq->epTimer)) {
45✔
1605
      tqErrorC("failed to stop ep timer");
41!
1606
    }
1607
  }
1608
  if (tmq->hbLiveTimer) {
49!
1609
    if (!taosTmrStopA(&tmq->hbLiveTimer)) {
49!
1610
      tqErrorC("failed to stop hb timer");
×
1611
    }
1612
  }
1613
  taosMemoryFree(tmq);
49!
1614

1615
  tqInfoC("consumer:0x%" PRIx64 " closed", id);
49!
1616
}
1617

1618
static void tmqMgmtInit(void) {
18✔
1619
  tmqInitRes = 0;
18✔
1620

1621
  if (taosThreadMutexInit(&tmqMgmt.lock, NULL) != 0){
18!
1622
    goto END;
×
1623
  }
1624

1625
  tmqMgmt.timer = taosTmrInit(1000, 100, 360000, "TMQ");
18✔
1626

1627
  if (tmqMgmt.timer == NULL) {
18!
1628
    goto END;
×
1629
  }
1630

1631
  tmqMgmt.rsetId = taosOpenRef(10000, tmqFreeImpl);
18✔
1632
  if (tmqMgmt.rsetId < 0) {
18!
1633
    goto END;
×
1634
  }
1635

1636
  return;
18✔
1637
END:
×
1638
  tmqInitRes = terrno;
×
1639
}
1640

1641
void tmqMgmtClose(void) {
394✔
1642
  if (tmqMgmt.timer) {
394✔
1643
    taosTmrCleanUp(tmqMgmt.timer);
18✔
1644
    tmqMgmt.timer = NULL;
18✔
1645
  }
1646

1647
  if (tmqMgmt.rsetId > 0) {
394✔
1648
    (void) taosThreadMutexLock(&tmqMgmt.lock);
18✔
1649
    tmq_t *tmq = taosIterateRef(tmqMgmt.rsetId, 0);
18✔
1650
    int64_t  refId = 0;
18✔
1651

1652
    while (tmq) {
20✔
1653
      refId = tmq->refId;
2✔
1654
      if (refId == 0) {
2!
1655
        break;
×
1656
      }
1657
      atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__CLOSED);
2✔
1658
      tmq = taosIterateRef(tmqMgmt.rsetId, refId);
2✔
1659
    }
1660
    taosCloseRef(tmqMgmt.rsetId);
18✔
1661
    tmqMgmt.rsetId = -1;
18✔
1662
    (void)taosThreadMutexUnlock(&tmqMgmt.lock);
18✔
1663
  }
1664
  (void)taosThreadMutexUnlock(&tmqMgmt.lock);
394✔
1665
}
394✔
1666

1667
tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
51✔
1668
  int32_t code = 0;
51✔
1669

1670
  if (conf == NULL) {
51!
1671
    SET_ERROR_MSG_TMQ("configure is null")
×
1672
    return NULL;
×
1673
  }
1674
  code = taosThreadOnce(&tmqInit, tmqMgmtInit);
51✔
1675
  if (code != 0) {
51!
1676
    SET_ERROR_MSG_TMQ("tmq init error")
×
1677
    return NULL;
×
1678
  }
1679
  if (tmqInitRes != 0) {
51!
1680
    SET_ERROR_MSG_TMQ("tmq timer init error")
×
1681
    return NULL;
×
1682
  }
1683

1684
  tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t));
51!
1685
  if (pTmq == NULL) {
51!
1686
    tqErrorC("failed to create consumer, groupId:%s", conf->groupId);
×
1687
    SET_ERROR_MSG_TMQ("malloc tmq failed")
×
1688
    return NULL;
×
1689
  }
1690

1691
  const char* user = conf->user == NULL ? TSDB_DEFAULT_USER : conf->user;
51✔
1692
  const char* pass = conf->pass == NULL ? TSDB_DEFAULT_PASS : conf->pass;
51✔
1693

1694
  pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
51✔
1695
  if (pTmq->clientTopics == NULL) {
51!
1696
    tqErrorC("failed to create consumer, groupId:%s", conf->groupId);
×
1697
    SET_ERROR_MSG_TMQ("malloc client topics failed")
×
1698
    goto _failed;
×
1699
  }
1700
  code = taosOpenQueue(&pTmq->mqueue);
51✔
1701
  if (code) {
51!
1702
    tqErrorC("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, tstrerror(code),
×
1703
             pTmq->groupId);
1704
    SET_ERROR_MSG_TMQ("open queue failed")
×
1705
    goto _failed;
×
1706
  }
1707

1708
  code = taosOpenQueue(&pTmq->delayedTask);
51✔
1709
  if (code) {
51!
1710
    tqErrorC("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, tstrerror(code),
×
1711
             pTmq->groupId);
1712
    SET_ERROR_MSG_TMQ("open delayed task queue failed")
×
1713
    goto _failed;
×
1714
  }
1715

1716
  if (conf->groupId[0] == 0) {
51!
1717
    tqErrorC("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, tstrerror(code),
×
1718
             pTmq->groupId);
1719
    SET_ERROR_MSG_TMQ("malloc tmq element failed or group is empty")
×
1720
    goto _failed;
×
1721
  }
1722

1723
  // init status
1724
  pTmq->status = TMQ_CONSUMER_STATUS__INIT;
51✔
1725
  pTmq->pollCnt = 0;
51✔
1726
  pTmq->epoch = 0;
51✔
1727
  pTmq->pollFlag = 0;
51✔
1728

1729
  // set conf
1730
  tstrncpy(pTmq->clientId, conf->clientId, TSDB_CLIENT_ID_LEN);
51✔
1731
  tstrncpy(pTmq->groupId, conf->groupId, TSDB_CGROUP_LEN);
51✔
1732
  pTmq->withTbName = conf->withTbName;
51✔
1733
  pTmq->useSnapshot = conf->snapEnable;
51✔
1734
  pTmq->autoCommit = conf->autoCommit;
51✔
1735
  pTmq->autoCommitInterval = conf->autoCommitInterval;
51✔
1736
  pTmq->sessionTimeoutMs = conf->sessionTimeoutMs;
51✔
1737
  pTmq->heartBeatIntervalMs = conf->heartBeatIntervalMs;
51✔
1738
  pTmq->maxPollIntervalMs = conf->maxPollIntervalMs;
51✔
1739
  pTmq->commitCb = conf->commitCb;
51✔
1740
  pTmq->commitCbUserParam = conf->commitCbUserParam;
51✔
1741
  pTmq->resetOffsetCfg = conf->resetOffset;
51✔
1742
  pTmq->replayEnable = conf->replayEnable;
51✔
1743
  pTmq->sourceExcluded = conf->sourceExcluded;
51✔
1744
  pTmq->rawData = conf->rawData;
51✔
1745
  pTmq->maxPollWaitTime = conf->maxPollWaitTime;
51✔
1746
  pTmq->minPollRows = conf->minPollRows;
51✔
1747
  pTmq->enableBatchMeta = conf->enableBatchMeta;
51✔
1748
  tstrncpy(pTmq->user, user, TSDB_USER_LEN);
51✔
1749
  if (taosGetFqdn(pTmq->fqdn) != 0) {
51!
1750
    tstrncpy(pTmq->fqdn, "localhost", TSDB_FQDN_LEN);
×
1751
  }
1752
  if (conf->replayEnable) {
51!
1753
    pTmq->autoCommit = false;
×
1754
  }
1755
  taosInitRWLatch(&pTmq->lock);
51✔
1756

1757
  // assign consumerId
1758
  pTmq->consumerId = tGenIdPI64();
51✔
1759

1760
  // init semaphore
1761
  if (tsem2_init(&pTmq->rspSem, 0, 0) != 0) {
51!
1762
    tqErrorC("consumer:0x %" PRIx64 " setup failed since %s, consumer group %s", pTmq->consumerId,
×
1763
             tstrerror(TAOS_SYSTEM_ERROR(ERRNO)), pTmq->groupId);
1764
    SET_ERROR_MSG_TMQ("init t_sem failed")
×
1765
    goto _failed;
×
1766
  }
1767

1768
  // init connection
1769
  code = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ, &pTmq->pTscObj);
51✔
1770
  if (code) {
51!
1771
    terrno = code;
×
1772
    tqErrorC("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, terrstr(), pTmq->groupId);
×
1773
    SET_ERROR_MSG_TMQ("init tscObj failed")
×
1774
    goto _failed;
×
1775
  }
1776

1777
  pTmq->refId = taosAddRef(tmqMgmt.rsetId, pTmq);
51✔
1778
  if (pTmq->refId < 0) {
51!
1779
    SET_ERROR_MSG_TMQ("add tscObj ref failed")
×
1780
    goto _failed;
×
1781
  }
1782

1783
  pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, pTmq->heartBeatIntervalMs, (void*)pTmq->refId, tmqMgmt.timer);
51✔
1784
  if (pTmq->hbLiveTimer == NULL) {
51!
1785
    SET_ERROR_MSG_TMQ("start heartbeat timer failed")
×
1786
    goto _failed;
×
1787
  }
1788
  char         buf[TSDB_OFFSET_LEN] = {0};
51✔
1789
  STqOffsetVal offset = {.type = pTmq->resetOffsetCfg};
51✔
1790
  tFormatOffset(buf, tListLen(buf), &offset);
51✔
1791
  tqInfoC("consumer:0x%" PRIx64 " is setup, refId:%" PRId64
51!
1792
              ", groupId:%s, snapshot:%d, autoCommit:%d, commitInterval:%dms, offset:%s",
1793
          pTmq->consumerId, pTmq->refId, pTmq->groupId, pTmq->useSnapshot, pTmq->autoCommit, pTmq->autoCommitInterval,
1794
          buf);
1795

1796
  return pTmq;
51✔
1797

1798
  _failed:
×
1799
  tmqFreeImpl(pTmq);
×
1800
  return NULL;
×
1801
}
1802

1803
static int32_t syncAskEp(tmq_t* pTmq) {
413✔
1804
  if (pTmq == NULL) return TSDB_CODE_INVALID_PARA;
413!
1805
  SAskEpInfo* pInfo = taosMemoryMalloc(sizeof(SAskEpInfo));
413!
1806
  if (pInfo == NULL) return terrno;
413!
1807
  if (tsem2_init(&pInfo->sem, 0, 0) != 0) {
413!
1808
    taosMemoryFree(pInfo);
×
1809
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
1810
  }
1811

1812
  int32_t code = askEp(pTmq, pInfo, true, false);
413✔
1813
  if (code == 0) {
413!
1814
    if (tsem2_wait(&pInfo->sem) != 0){
413!
1815
      tqErrorC("consumer:0x%" PRIx64 ", failed to wait for sem", pTmq->consumerId);
×
1816
    }
1817
    code = pInfo->code;
413✔
1818
  }
1819

1820
  if(tsem2_destroy(&pInfo->sem) != 0) {
413!
1821
    tqErrorC("failed to destroy sem sync ask ep");
×
1822
  }
1823
  taosMemoryFree(pInfo);
413!
1824
  return code;
413✔
1825
}
1826

1827
int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
144✔
1828
  if (tmq == NULL || topic_list == NULL) return TSDB_CODE_INVALID_PARA;
144!
1829
  const SArray*   container = &topic_list->container;
144✔
1830
  int32_t         sz = taosArrayGetSize(container);
144✔
1831
  void*           buf = NULL;
144✔
1832
  SMsgSendInfo*   sendInfo = NULL;
144✔
1833
  SCMSubscribeReq req = {0};
144✔
1834
  int32_t         code = 0;
144✔
1835

1836
  tqInfoC("consumer:0x%" PRIx64 " cgroup:%s, subscribe %d topics", tmq->consumerId, tmq->groupId, sz);
144!
1837

1838
  req.consumerId = tmq->consumerId;
144✔
1839
  tstrncpy(req.clientId, tmq->clientId, TSDB_CLIENT_ID_LEN);
144✔
1840
  tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
144✔
1841
  tstrncpy(req.user, tmq->user, TSDB_USER_LEN);
144✔
1842
  tstrncpy(req.fqdn, tmq->fqdn, TSDB_FQDN_LEN);
144✔
1843

1844
  req.topicNames = taosArrayInit(sz, sizeof(void*));
144✔
1845
  if (req.topicNames == NULL) {
144!
1846
    code = terrno;
×
1847
    goto END;
×
1848
  }
1849

1850
  req.withTbName = tmq->withTbName;
144✔
1851
  req.autoCommit = tmq->autoCommit;
144✔
1852
  req.autoCommitInterval = tmq->autoCommitInterval;
144✔
1853
  req.sessionTimeoutMs = tmq->sessionTimeoutMs;
144✔
1854
  req.maxPollIntervalMs = tmq->maxPollIntervalMs;
144✔
1855
  req.resetOffsetCfg = tmq->resetOffsetCfg;
144✔
1856
  req.enableReplay = tmq->replayEnable;
144✔
1857
  req.enableBatchMeta = tmq->enableBatchMeta;
144✔
1858

1859
  for (int32_t i = 0; i < sz; i++) {
202✔
1860
    char* topic = taosArrayGetP(container, i);
58✔
1861
    if (topic == NULL) {
58!
1862
      code = terrno;
×
1863
      goto END;
×
1864
    }
1865
    SName name = {0};
58✔
1866
    code = tNameSetDbName(&name, tmq->pTscObj->acctId, topic, strlen(topic));
58✔
1867
    if (code) {
58!
1868
      tqErrorC("consumer:0x%" PRIx64 " cgroup:%s, failed to set topic name, code:%d", tmq->consumerId, tmq->groupId,
×
1869
               code);
1870
      goto END;
×
1871
    }
1872
    char* topicFName = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
58!
1873
    if (topicFName == NULL) {
58!
1874
      code = terrno;
×
1875
      goto END;
×
1876
    }
1877

1878
    code = tNameExtractFullName(&name, topicFName);
58✔
1879
    if (code) {
58!
1880
      tqErrorC("consumer:0x%" PRIx64 " cgroup:%s, failed to extract topic name, code:%d", tmq->consumerId, tmq->groupId,
×
1881
               code);
1882
      taosMemoryFree(topicFName);
×
1883
      goto END;
×
1884
    }
1885

1886
    if (taosArrayPush(req.topicNames, &topicFName) == NULL) {
116!
1887
      code = terrno;
×
1888
      taosMemoryFree(topicFName);
×
1889
      goto END;
×
1890
    }
1891
    tqInfoC("consumer:0x%" PRIx64 " subscribe topic:%s", tmq->consumerId, topicFName);
58!
1892
  }
1893

1894
  int32_t tlen = tSerializeSCMSubscribeReq(NULL, &req);
144✔
1895
  buf = taosMemoryMalloc(tlen);
144!
1896
  if (buf == NULL) {
144!
1897
    code = terrno;
×
1898
    goto END;
×
1899
  }
1900

1901
  void* abuf = buf;
144!
1902
  tlen = tSerializeSCMSubscribeReq(&abuf, &req);
144✔
1903

1904
  sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
144!
1905
  if (sendInfo == NULL) {
144!
1906
    code = terrno;
×
1907
    taosMemoryFree(buf);
×
1908
    goto END;
×
1909
  }
1910

1911
  SMqSubscribeCbParam param = {.rspErr = 0};
144✔
1912
  if (tsem2_init(&param.rspSem, 0, 0) != 0) {
144!
1913
    code = TSDB_CODE_TSC_INTERNAL_ERROR;
×
1914
    taosMemoryFree(buf);
×
1915
    taosMemoryFree(sendInfo);
×
1916
    goto END;
×
1917
  }
1918

1919
  sendInfo->msgInfo = (SDataBuf){.pData = buf, .len = tlen, .handle = NULL};
144✔
1920
  sendInfo->requestId = generateRequestId();
144✔
1921
  sendInfo->requestObjRefId = 0;
144✔
1922
  sendInfo->param = &param;
144✔
1923
  sendInfo->fp = tmqSubscribeCb;
144✔
1924
  sendInfo->msgType = TDMT_MND_TMQ_SUBSCRIBE;
144✔
1925

1926
  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
144✔
1927

1928
  code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo);
144✔
1929
  if (code != 0) {
144!
1930
    goto END;
×
1931
  }
1932

1933
  if (tsem2_wait(&param.rspSem) != 0){
144!
1934
    tqErrorC("consumer:0x%" PRIx64 ", failed to wait semaphore in subscribe", tmq->consumerId);
×
1935
  }
1936
  if(tsem2_destroy(&param.rspSem) != 0) {
144!
1937
    tqErrorC("consumer:0x%" PRIx64 ", failed to destroy semaphore in subscribe", tmq->consumerId);
×
1938
  }
1939

1940
  if (param.rspErr != 0) {
144✔
1941
    code = param.rspErr;
5✔
1942
    goto END;
5✔
1943
  }
1944

1945
  int32_t retryCnt = 0;
139✔
1946
  while ((code = syncAskEp(tmq)) != 0) {
396✔
1947
    if (retryCnt++ > SUBSCRIBE_RETRY_MAX_COUNT || code == TSDB_CODE_MND_CONSUMER_NOT_EXIST) {
259!
1948
      tqErrorC("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry more than 2 minutes, code:%s",
2!
1949
               tmq->consumerId, tstrerror(code));
1950
      if (code == TSDB_CODE_MND_CONSUMER_NOT_EXIST) {
2!
1951
        code = 0;
2✔
1952
      }
1953
      goto END;
2✔
1954
    }
1955

1956
    tqInfoC("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry:%d in 500ms", tmq->consumerId, retryCnt);
257!
1957
    taosMsleep(SUBSCRIBE_RETRY_INTERVAL);
257✔
1958
  }
1959

1960
  if (tmq->epTimer == NULL){
137✔
1961
    tmq->epTimer = taosTmrStart(tmqAssignAskEpTask, DEFAULT_ASKEP_INTERVAL, (void*)(tmq->refId), tmqMgmt.timer);
46✔
1962
    if (tmq->epTimer == NULL) {
46!
1963
      code = TSDB_CODE_TSC_INTERNAL_ERROR;
×
1964
      goto END;
×
1965
    }
1966
  }
1967
  if (tmq->autoCommit && tmq->commitTimer == NULL){
137✔
1968
    tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, (void*)(tmq->refId), tmqMgmt.timer);
40✔
1969
    if (tmq->commitTimer == NULL) {
40!
1970
      code = TSDB_CODE_TSC_INTERNAL_ERROR;
×
1971
      goto END;
×
1972
    }
1973
  }
1974

1975
  END:
137✔
1976
  taosArrayDestroyP(req.topicNames, NULL);
144✔
1977
  return code;
144✔
1978
}
1979

1980
void tmq_conf_set_auto_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* param) {
11✔
1981
  if (conf == NULL) return;
11!
1982
  conf->commitCb = cb;
11✔
1983
  conf->commitCbUserParam = param;
11✔
1984
}
1985

1986
static void getVgInfo(tmq_t* tmq, char* topicName, int32_t vgId, SMqClientVg** pVg) {
7,033✔
1987
  if (tmq == NULL || topicName == NULL || pVg == NULL) {
7,033!
1988
    return;
×
1989
  }
1990
  int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
7,033✔
1991
  for (int i = 0; i < topicNumCur; i++) {
10,963✔
1992
    SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i);
10,955✔
1993
    if (pTopicCur && strcmp(pTopicCur->topicName, topicName) == 0) {
10,955!
1994
      int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs);
7,033✔
1995
      for (int32_t j = 0; j < vgNumCur; j++) {
14,820✔
1996
        SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j);
14,812✔
1997
        if (pVgCur && pVgCur->vgId == vgId) {
14,812!
1998
          *pVg = pVgCur;
7,025✔
1999
          return;
7,025✔
2000
        }
2001
      }
2002
    }
2003
  }
2004
}
2005

2006
static SMqClientTopic* getTopicInfo(tmq_t* tmq, char* topicName) {
7,016✔
2007
  if (tmq == NULL || topicName == NULL) {
7,016!
2008
    return NULL;
×
2009
  }
2010
  int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
7,016✔
2011
  for (int i = 0; i < topicNumCur; i++) {
10,928!
2012
    SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i);
10,928✔
2013
    if (strcmp(pTopicCur->topicName, topicName) == 0) {
10,928✔
2014
      return pTopicCur;
7,016✔
2015
    }
2016
  }
2017
  return NULL;
×
2018
}
2019

2020
int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
7,118✔
2021
  tmq_t*             tmq = NULL;
7,118✔
2022
  SMqRspWrapper*     pRspWrapper = NULL;
7,118✔
2023
  int8_t             rspType = 0;
7,118✔
2024
  int32_t            vgId = 0;
7,118✔
2025
  uint64_t           requestId = 0;
7,118✔
2026
  SMqPollCbParam*    pParam = (SMqPollCbParam*)param;
7,118✔
2027
  if (pMsg == NULL) {
7,118!
2028
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2029
  }
2030
  if (pParam == NULL) {
7,118!
2031
    code = TSDB_CODE_TSC_INTERNAL_ERROR;
×
2032
    goto EXIT;
×
2033
  }
2034
  int64_t refId = pParam->refId;
7,118✔
2035
  vgId = pParam->vgId;
7,118✔
2036
  requestId = pParam->requestId;
7,118✔
2037
  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
7,118✔
2038
  if (tmq == NULL) {
7,118!
2039
    code = TSDB_CODE_TMQ_CONSUMER_CLOSED;
×
2040
    goto EXIT;
×
2041
  }
2042

2043
  int32_t ret = taosAllocateQitem(sizeof(SMqRspWrapper), DEF_QITEM, 0, (void**)&pRspWrapper);
7,118✔
2044
  if (ret) {
7,118!
2045
    code = ret;
×
2046
    tscWarn("consumer:0x%" PRIx64 " msg discard from vgId:%d, since out of memory", tmq->consumerId, vgId);
×
2047
    goto END;
×
2048
  }
2049

2050
  if (code != 0) {
7,118✔
2051
    goto END;
10✔
2052
  }
2053

2054
  if (pMsg->pData == NULL) {
7,108!
2055
    tqErrorC("consumer:0x%" PRIx64 " msg discard from vgId:%d, since msg is NULL", tmq->consumerId, vgId);
×
2056
    code = TSDB_CODE_TSC_INTERNAL_ERROR;
×
2057
    goto END;
×
2058
  }
2059

2060
  int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch;
7,108✔
2061
  int32_t clientEpoch = atomic_load_32(&tmq->epoch);
7,108✔
2062

2063
  if (msgEpoch != clientEpoch) {
7,108✔
2064
    tqErrorC("consumer:0x%" PRIx64
15!
2065
                 " msg discard from vgId:%d since epoch not equal, rsp epoch %d, current epoch %d, reqId:0x%" PRIx64,
2066
             tmq->consumerId, vgId, msgEpoch, clientEpoch, requestId);
2067
    code = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
15✔
2068
    goto END;
15✔
2069
  }
2070
  rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType;
7,093✔
2071
  tqDebugC("consumer:0x%" PRIx64 " recv poll rsp, vgId:%d, type %d(%s), QID:0x%" PRIx64, tmq->consumerId, vgId, rspType, tmqMsgTypeStr[rspType], requestId);
7,093!
2072

2073
  pRspWrapper->tmqRspType = rspType;
7,093✔
2074
  pRspWrapper->pollRsp.reqId = requestId;
7,093✔
2075
  pRspWrapper->pollRsp.pEpset = pMsg->pEpSet;
7,093✔
2076
  pRspWrapper->pollRsp.data = pMsg->pData;
7,093✔
2077
  pRspWrapper->pollRsp.len = pMsg->len;
7,093✔
2078
  pMsg->pData = NULL;
7,093✔
2079
  pMsg->pEpSet = NULL;
7,093✔
2080

2081
  END:
7,118✔
2082
  if (pRspWrapper) {
7,118!
2083
    pRspWrapper->code = code;
7,118✔
2084
    pRspWrapper->pollRsp.vgId = vgId;
7,118✔
2085
    tstrncpy(pRspWrapper->pollRsp.topicName, pParam->topicName, TSDB_TOPIC_FNAME_LEN);
7,118✔
2086
    code = taosWriteQitem(tmq->mqueue, pRspWrapper);
7,118✔
2087
    if (code != 0) {
7,118!
2088
      tmqFreeRspWrapper(pRspWrapper);
×
2089
      taosFreeQitem(pRspWrapper);
×
2090
      tqErrorC("consumer:0x%" PRIx64 " put poll res into mqueue failed, code:%d", tmq->consumerId, code);
×
2091
    } else {
2092
      tqDebugC("consumer:0x%" PRIx64 " put poll res into mqueue, type:%d(%s), vgId:%d, total in queue:%d, QID:0x%" PRIx64,
7,118!
2093
               tmq ? tmq->consumerId : 0, rspType, tmqMsgTypeStr[rspType], vgId, taosQueueItemSize(tmq->mqueue), requestId);
2094
    }
2095
  }
2096

2097
  if (tsem2_post(&tmq->rspSem) != 0){
7,118!
2098
    tqErrorC("failed to post rsp sem, consumer:0x%" PRIx64, tmq->consumerId);
×
2099
  }
2100
  ret = taosReleaseRef(tmqMgmt.rsetId, refId);
7,118✔
2101
  if (ret != 0){
7,118!
2102
    tqErrorC("failed to release ref:%"PRId64 ", code:%d", refId, ret);
×
2103
  }
2104

2105
  EXIT:
7,118✔
2106
  taosMemoryFreeClear(pMsg->pData);
7,118!
2107
  taosMemoryFreeClear(pMsg->pEpSet);
7,118!
2108
  return code;
7,118✔
2109
}
2110

2111
void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, SMqClientTopic* pTopic, SMqClientVg* pVg) {
7,130✔
2112
  if (pReq == NULL || tmq == NULL || pTopic == NULL || pVg == NULL) {
7,130!
2113
    return;
×
2114
  }
2115
  (void)snprintf(pReq->subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", tmq->groupId, TMQ_SEPARATOR, pTopic->topicName);
7,130✔
2116
  pReq->withTbName = tmq->withTbName;
7,130✔
2117
  pReq->consumerId = tmq->consumerId;
7,130✔
2118
  pReq->timeout = tmq->maxPollWaitTime;
7,130✔
2119
  pReq->minPollRows = tmq->minPollRows;
7,130✔
2120
  pReq->epoch = tmq->epoch;
7,130✔
2121
  pReq->reqOffset = pVg->offsetInfo.endOffset;
7,130✔
2122
  pReq->head.vgId = pVg->vgId;
7,130✔
2123
  pReq->useSnapshot = tmq->useSnapshot;
7,130✔
2124
  pReq->reqId = generateRequestId();
7,130✔
2125
  pReq->enableReplay = tmq->replayEnable;
7,130✔
2126
  pReq->sourceExcluded = tmq->sourceExcluded;
7,130✔
2127
  pReq->rawData = tmq->rawData;
7,130✔
2128
  pReq->enableBatchMeta = tmq->enableBatchMeta;
7,130✔
2129
}
2130

2131
void changeByteEndian(char* pData) {
42,022✔
2132
  if (pData == NULL) {
42,022!
2133
    return;
×
2134
  }
2135
  char* p = pData;
42,022✔
2136

2137
  // | version | total length | total rows | total columns | flag seg| block group id | column schema | each column
2138
  // length | version:
2139
  int32_t blockVersion = *(int32_t*)p;
42,022✔
2140
  if (blockVersion != BLOCK_VERSION_1) {
42,022!
2141
    tqErrorC("invalid block version:%d", blockVersion);
×
2142
    return;
×
2143
  }
2144
  *(int32_t*)p = BLOCK_VERSION_2;
42,022✔
2145

2146
  p += sizeof(int32_t);
42,022✔
2147
  p += sizeof(int32_t);
42,022✔
2148
  p += sizeof(int32_t);
42,022✔
2149
  int32_t cols = *(int32_t*)p;
42,022✔
2150
  p += sizeof(int32_t);
42,022✔
2151
  p += sizeof(int32_t);
42,022✔
2152
  p += sizeof(uint64_t);
42,022✔
2153
  // check fields
2154
  p += cols * (sizeof(int8_t) + sizeof(int32_t));
42,022✔
2155

2156
  int32_t* colLength = (int32_t*)p;
42,022✔
2157
  for (int32_t i = 0; i < cols; ++i) {
188,543✔
2158
    colLength[i] = htonl(colLength[i]);
146,521✔
2159
  }
2160
}
2161

2162
static void tmqGetRawDataRowsPrecisionFromRes(void* pRetrieve, void** rawData, int64_t* rows, int32_t* precision) {
83,982✔
2163
  if (pRetrieve == NULL || rawData == NULL || rows == NULL) {
83,982!
2164
    return;
×
2165
  }
2166
  if (*(int64_t*)pRetrieve == 0) {
83,982!
2167
    *rawData = ((SRetrieveTableRsp*)pRetrieve)->data;
×
2168
    *rows = htobe64(((SRetrieveTableRsp*)pRetrieve)->numOfRows);
×
2169
    if (precision != NULL) {
×
2170
      *precision = ((SRetrieveTableRsp*)pRetrieve)->precision;
×
2171
    }
2172
  } else if (*(int64_t*)pRetrieve == 1) {
83,982!
2173
    *rawData = ((SRetrieveTableRspForTmq*)pRetrieve)->data;
83,982✔
2174
    *rows = htobe64(((SRetrieveTableRspForTmq*)pRetrieve)->numOfRows);
83,982✔
2175
    if (precision != NULL) {
83,982✔
2176
      *precision = ((SRetrieveTableRspForTmq*)pRetrieve)->precision;
41,960✔
2177
    }
2178
  }
2179
}
2180

2181
static void tmqBuildRspFromWrapperInner(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows,
4,747✔
2182
                                        SMqRspObj* pRspObj) {
2183
  if (pWrapper == NULL || pVg == NULL || numOfRows == NULL || pRspObj == NULL) {
4,747!
2184
    return;
×
2185
  }
2186
  pRspObj->resIter = -1;
4,747✔
2187
  pRspObj->resInfo.totalRows = 0;
4,747✔
2188
  pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
4,747✔
2189

2190
  SMqDataRsp* pDataRsp = &pRspObj->dataRsp;
4,747✔
2191
  bool needTransformSchema = !pDataRsp->withSchema;
4,747✔
2192
  if (!pDataRsp->withSchema) {  // withSchema is false if subscribe subquery, true if subscribe db or stable
4,747✔
2193
    pDataRsp->withSchema = true;
4,746✔
2194
    pDataRsp->blockSchema = taosArrayInit(pDataRsp->blockNum, sizeof(void*));
4,746✔
2195
    if (pDataRsp->blockSchema == NULL) {
4,746!
2196
      tqErrorC("failed to allocate memory for blockSchema");
×
2197
      return;
×
2198
    }
2199
  }
2200
  // extract the rows in this data packet
2201
  for (int32_t i = 0; i < pDataRsp->blockNum; ++i) {
46,769✔
2202
    void*   pRetrieve = taosArrayGetP(pDataRsp->blockData, i);
42,022✔
2203
    void*   rawData = NULL;
42,022✔
2204
    int64_t rows = 0;
42,022✔
2205
    // deal with compatibility
2206
    tmqGetRawDataRowsPrecisionFromRes(pRetrieve, &rawData, &rows, NULL);
42,022✔
2207

2208
    pVg->numOfRows += rows;
42,022✔
2209
    (*numOfRows) += rows;
42,022✔
2210
    changeByteEndian(rawData);
42,022✔
2211
    if (needTransformSchema) {  // withSchema is false if subscribe subquery, true if subscribe db or stable
42,022✔
2212
      SSchemaWrapper* schema = tCloneSSchemaWrapper(&pWrapper->topicHandle->schema);
42,021!
2213
      if (schema) {
42,021!
2214
        if (taosArrayPush(pDataRsp->blockSchema, &schema) == NULL) {
84,042!
2215
          tqErrorC("failed to push schema into blockSchema");
×
2216
          continue;
×
2217
        }
2218
      }
2219
    }
2220
  }
2221
}
2222

2223
static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* pVg) {
7,118✔
2224
  SMqPollReq      req = {0};
7,118✔
2225
  char*           msg = NULL;
7,118✔
2226
  SMqPollCbParam* pParam = NULL;
7,118✔
2227
  SMsgSendInfo*   sendInfo = NULL;
7,118✔
2228
  int             code = 0;
7,118✔
2229
  int             lino = 0;
7,118✔
2230
  tmqBuildConsumeReqImpl(&req, pTmq, pTopic, pVg);
7,118✔
2231

2232
  int32_t msgSize = tSerializeSMqPollReq(NULL, 0, &req);
7,118✔
2233
  TSDB_CHECK_CONDITION(msgSize >= 0, code, lino, END, TSDB_CODE_INVALID_MSG);
7,118!
2234

2235
  msg = taosMemoryCalloc(1, msgSize);
7,118!
2236
  TSDB_CHECK_NULL(msg, code, lino, END, terrno);
7,118!
2237

2238
  TSDB_CHECK_CONDITION(tSerializeSMqPollReq(msg, msgSize, &req) >= 0, code, lino, END, TSDB_CODE_INVALID_MSG);
7,118!
2239

2240
  pParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
7,118!
2241
  TSDB_CHECK_NULL(pParam, code, lino, END, terrno);
7,118!
2242

2243
  pParam->refId = pTmq->refId;
7,118✔
2244
  tstrncpy(pParam->topicName, pTopic->topicName, TSDB_TOPIC_FNAME_LEN);
7,118✔
2245
  pParam->vgId = pVg->vgId;
7,118✔
2246
  pParam->requestId = req.reqId;
7,118✔
2247

2248
  sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
7,118!
2249
  TSDB_CHECK_NULL(sendInfo, code, lino, END, terrno);
7,118!
2250

2251
  sendInfo->msgInfo = (SDataBuf){.pData = msg, .len = msgSize, .handle = NULL};
7,118✔
2252
  sendInfo->requestId = req.reqId;
7,118✔
2253
  sendInfo->requestObjRefId = 0;
7,118✔
2254
  sendInfo->param = pParam;
7,118✔
2255
  sendInfo->paramFreeFp = taosAutoMemoryFree;
7,118✔
2256
  sendInfo->fp = tmqPollCb;
7,118✔
2257
  sendInfo->msgType = TDMT_VND_TMQ_CONSUME;
7,118✔
2258

2259
  msg = NULL;
7,118✔
2260
  pParam = NULL;
7,118✔
2261

2262
  char offsetFormatBuf[TSDB_OFFSET_LEN] = {0};
7,118✔
2263
  tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pVg->offsetInfo.endOffset);
7,118✔
2264
  code = asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, NULL, sendInfo);
7,118✔
2265
  tqDebugC("consumer:0x%" PRIx64 " send poll to %s vgId:%d, code:%d, epoch %d, req:%s, QID:0x%" PRIx64, pTmq->consumerId,
7,118!
2266
           pTopic->topicName, pVg->vgId, code, pTmq->epoch, offsetFormatBuf, req.reqId);
2267
  TSDB_CHECK_CODE(code, lino, END);
7,118!
2268

2269
  pVg->pollCnt++;
7,118✔
2270
  pVg->seekUpdated = false;  // reset this flag.
7,118✔
2271
  pTmq->pollCnt++;
7,118✔
2272

2273
END:
7,118✔
2274
  if (code != 0){
7,118!
2275
    tqErrorC("%s failed at %d msg:%s", __func__, lino, tstrerror(code));
×
2276
  }
2277
  taosMemoryFreeClear(pParam);
7,118!
2278
  taosMemoryFreeClear(msg);
7,118!
2279
  return code;
7,118✔
2280
}
2281

2282
static int32_t tmqPollImpl(tmq_t* tmq) {
8,281✔
2283
  if (tmq == NULL) {
8,281!
2284
    return TSDB_CODE_INVALID_MSG;
×
2285
  }
2286
  int32_t code = 0;
8,281✔
2287
  taosWLockLatch(&tmq->lock);
8,281✔
2288

2289
  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__LOST){
8,281!
2290
    code = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
×
2291
    goto end;
×
2292
  }
2293

2294
  int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
8,281✔
2295
  tqDebugC("consumer:0x%" PRIx64 " start to poll data, numOfTopics:%d", tmq->consumerId, numOfTopics);
8,281!
2296

2297
  for (int i = 0; i < numOfTopics; i++) {
20,674✔
2298
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
12,393✔
2299
    if (pTopic == NULL) {
12,393!
2300
      continue;
×
2301
    }
2302
    int32_t numOfVg = taosArrayGetSize(pTopic->vgs);
12,393✔
2303
    if (pTopic->noPrivilege) {
12,393!
2304
      tqDebugC("consumer:0x%" PRIx64 " has no privilegr for topic:%s", tmq->consumerId, pTopic->topicName);
×
2305
      continue;
×
2306
    }
2307
    for (int j = 0; j < numOfVg; j++) {
50,943✔
2308
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
38,550✔
2309
      if (pVg == NULL) {
38,550!
2310
        continue;
×
2311
      }
2312
      int64_t elapsed = taosGetTimestampMs() - pVg->blockReceiveTs;
38,550✔
2313
      if (tmq->replayEnable && elapsed < pVg->blockSleepForReplay && elapsed >= 0) {
38,550!
2314
        tqDebugC("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for %" PRId64 "ms before start next poll when replay",
×
2315
                 tmq->consumerId, tmq->epoch, pVg->vgId, pVg->blockSleepForReplay);
2316
        continue;
×
2317
      }
2318

2319
      int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
38,550✔
2320
      if (vgStatus == TMQ_VG_STATUS__WAIT) {
38,550✔
2321
        int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
31,432✔
2322
        tqDebugC("consumer:0x%" PRIx64 " epoch %d wait poll-rsp, skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch,
31,432!
2323
                 pVg->vgId, vgSkipCnt);
2324
        continue;
31,432✔
2325
      }
2326

2327
      atomic_store_32(&pVg->vgSkipCnt, 0);
7,118✔
2328
      code = doTmqPollImpl(tmq, pTopic, pVg);
7,118✔
2329
      if (code != TSDB_CODE_SUCCESS) {
7,118!
2330
        goto end;
×
2331
      }
2332
    }
2333
  }
2334

2335
  end:
8,281✔
2336
  taosWUnLockLatch(&tmq->lock);
8,281✔
2337
  tqDebugC("consumer:0x%" PRIx64 " end to poll data, code:%d", tmq->consumerId, code);
8,281!
2338
  return code;
8,281✔
2339
}
2340

2341
static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* reqOffset, STqOffsetVal* rspOffset, int64_t sver, int64_t ever,
7,016✔
2342
                         int64_t consumerId, bool hasData) {
2343
  if (pVg == NULL || reqOffset == NULL || rspOffset == NULL) {
7,016!
2344
    return;
×
2345
  }
2346
  if (!pVg->seekUpdated) {
7,016✔
2347
    tqDebugC("consumer:0x%" PRIx64 " local offset is update, since seekupdate not set", consumerId);
7,009!
2348
    if (hasData) {
7,009✔
2349
      tOffsetCopy(&pVg->offsetInfo.beginOffset, reqOffset);
4,747✔
2350
    }
2351
    tOffsetCopy(&pVg->offsetInfo.endOffset, rspOffset);
7,009✔
2352
  } else {
2353
    tqDebugC("consumer:0x%" PRIx64 " local offset is NOT update, since seekupdate is set", consumerId);
7!
2354
  }
2355

2356
  // update the status
2357
  atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
7,016✔
2358

2359
  // update the valid wal version range
2360
  pVg->offsetInfo.walVerBegin = sver;
7,016✔
2361
  pVg->offsetInfo.walVerEnd = ever + 1;
7,016✔
2362
}
2363

2364
static SMqRspObj* buildRsp(SMqPollRspWrapper* pollRspWrapper){
4,747✔
2365
  typedef union {
2366
    SMqDataRsp      dataRsp;
2367
    SMqMetaRsp      metaRsp;
2368
    SMqBatchMetaRsp batchMetaRsp;
2369
  } MEMSIZE;
2370

2371
  SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
4,747!
2372
  if (pRspObj == NULL) {
4,747!
2373
    tqErrorC("buildRsp:failed to allocate memory");
×
2374
    return NULL;
×
2375
  }
2376
  (void)memcpy(&pRspObj->dataRsp, &pollRspWrapper->dataRsp, sizeof(MEMSIZE));
4,747✔
2377
  tstrncpy(pRspObj->topic, pollRspWrapper->topicName, TSDB_TOPIC_FNAME_LEN);
4,747✔
2378
  tstrncpy(pRspObj->db, pollRspWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
4,747✔
2379
  pRspObj->vgId = pollRspWrapper->vgId;
4,747✔
2380
  (void)memset(&pollRspWrapper->dataRsp, 0, sizeof(MEMSIZE));
4,747✔
2381
  return pRspObj;
4,747✔
2382
}
2383

2384
static int32_t processMqRspError(tmq_t* tmq, SMqRspWrapper* pRspWrapper){
17✔
2385
  int32_t code = 0;
17✔
2386
  SMqPollRspWrapper* pollRspWrapper = &pRspWrapper->pollRsp;
17✔
2387

2388
  if (pRspWrapper->code == TSDB_CODE_VND_INVALID_VGROUP_ID) {  // for vnode transform
17!
2389
    code = askEp(tmq, NULL, false, true);
×
2390
    if (code != 0) {
×
2391
      tqErrorC("consumer:0x%" PRIx64 " failed to ask ep wher vnode transform, code:%s", tmq->consumerId, tstrerror(code));
×
2392
    }
2393
  } else if (pRspWrapper->code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) {
17!
2394
    code = syncAskEp(tmq);
17✔
2395
    if (code != 0) {
17!
2396
      tqErrorC("consumer:0x%" PRIx64 " failed to ask ep when consumer mismatch, code:%s", tmq->consumerId, tstrerror(code));
×
2397
    }
2398
  } else if (pRspWrapper->code == TSDB_CODE_TMQ_NO_TABLE_QUALIFIED){
×
2399
    code = 0;
×
2400
  }
2401
  tqInfoC("consumer:0x%" PRIx64 " msg from vgId:%d discarded, since %s", tmq->consumerId, pollRspWrapper->vgId,
17!
2402
          tstrerror(pRspWrapper->code));
2403
  taosWLockLatch(&tmq->lock);
17✔
2404
  SMqClientVg* pVg = NULL;
17✔
2405
  getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId, &pVg);
17✔
2406
  if (pVg) {
17✔
2407
    atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
9✔
2408
  }
2409
  taosWUnLockLatch(&tmq->lock);
17✔
2410

2411
  return code;
17✔
2412
}
2413

2414
static int32_t processWrapperData(SMqRspWrapper* pRspWrapper){
7,016✔
2415
  int32_t code = 0;
7,016✔
2416
  if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_RSP) {
7,016!
2417
    PROCESS_POLL_RSP(tDecodeMqDataRsp, &pRspWrapper->pollRsp.dataRsp)
7,016!
2418
    pRspWrapper->pollRsp.dataRsp.data = pRspWrapper->pollRsp.data;
7,016✔
2419
    pRspWrapper->pollRsp.data = NULL;
7,016✔
2420
  } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
×
2421
    PROCESS_POLL_RSP(tDecodeMqMetaRsp, &pRspWrapper->pollRsp.metaRsp)
×
2422
  } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
×
2423
    PROCESS_POLL_RSP(tDecodeSTaosxRsp, &pRspWrapper->pollRsp.dataRsp)
×
2424
    pRspWrapper->pollRsp.dataRsp.data = pRspWrapper->pollRsp.data;
×
2425
    pRspWrapper->pollRsp.data = NULL;
×
2426
  } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_BATCH_META_RSP) {
×
2427
    PROCESS_POLL_RSP(tSemiDecodeMqBatchMetaRsp, &pRspWrapper->pollRsp.batchMetaRsp)
×
2428
  } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) {
×
2429
    PROCESS_POLL_RSP(tDecodeMqRawDataRsp, &pRspWrapper->pollRsp.dataRsp)
×
2430
    pRspWrapper->pollRsp.dataRsp.len = pRspWrapper->pollRsp.len - sizeof(SMqRspHead);
×
2431
    pRspWrapper->pollRsp.dataRsp.rawData = POINTER_SHIFT(pRspWrapper->pollRsp.data, sizeof(SMqRspHead));
×
2432
    pRspWrapper->pollRsp.data = NULL;
×
2433
  } else {
2434
    tqErrorC("invalid rsp msg received, type:%d ignored", pRspWrapper->tmqRspType);
×
2435
    code = TSDB_CODE_TSC_INTERNAL_ERROR;
×
2436
    goto END;
×
2437
  }
2438
  END:
7,016✔
2439
  return code;
7,016✔
2440
}
2441

2442
static SMqRspObj* processMqRsp(tmq_t* tmq, SMqRspWrapper* pRspWrapper){
7,530✔
2443
  int32_t    code = 0;
7,530✔
2444
  SMqRspObj* pRspObj = NULL;
7,530✔
2445

2446
  if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) {
7,530✔
2447
    tqDebugC("consumer:0x%" PRIx64 " ep msg received", tmq->consumerId);
514!
2448
    SMqAskEpRsp*        rspMsg = &pRspWrapper->epRsp;
514✔
2449
    doUpdateLocalEp(tmq, pRspWrapper->epoch, rspMsg);
514✔
2450
    terrno = code;
514✔
2451
    return pRspObj;
514✔
2452
  }
2453

2454
  code = processWrapperData(pRspWrapper);
7,016✔
2455
  if (code != 0) {
7,016!
2456
    goto END;
×
2457
  }
2458
  SMqPollRspWrapper* pollRspWrapper = &pRspWrapper->pollRsp;
7,016✔
2459
  taosWLockLatch(&tmq->lock);
7,016✔
2460
  SMqClientVg* pVg = NULL;
7,016✔
2461
  getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId, &pVg);
7,016✔
2462
  if(pVg == NULL) {
7,016!
2463
    tqErrorC("consumer:0x%" PRIx64 " get vg or topic error, topic:%s vgId:%d", tmq->consumerId,
×
2464
             pollRspWrapper->topicName, pollRspWrapper->vgId);
2465
    code = TSDB_CODE_TMQ_INVALID_VGID;
×
2466
    goto END;
×
2467
  }
2468
  pollRspWrapper->topicHandle = getTopicInfo(tmq, pollRspWrapper->topicName);
7,016✔
2469
  if (pollRspWrapper->pEpset != NULL) {
7,016✔
2470
    pVg->epSet = *pollRspWrapper->pEpset;
16✔
2471
  }
2472

2473
  if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_RSP ||
7,016!
2474
      pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP ||
×
2475
      pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) {
7,016!
2476
    updateVgInfo(pVg, &pollRspWrapper->dataRsp.reqOffset, &pollRspWrapper->dataRsp.rspOffset, pollRspWrapper->head.walsver, pollRspWrapper->head.walever,
7,016✔
2477
                 tmq->consumerId, pollRspWrapper->dataRsp.blockNum != 0);
7,016✔
2478

2479
    char buf[TSDB_OFFSET_LEN] = {0};
7,016✔
2480
    tFormatOffset(buf, TSDB_OFFSET_LEN, &pollRspWrapper->rspOffset);
7,016✔
2481
    if (pollRspWrapper->dataRsp.blockNum == 0) {
7,016✔
2482
      tqDebugC("consumer:0x%" PRIx64 " empty block received, vgId:%d, offset:%s, vg total:%" PRId64
2,269!
2483
                   ", total:%" PRId64 ", QID:0x%" PRIx64,
2484
               tmq->consumerId, pVg->vgId, buf, pVg->numOfRows, tmq->totalRows, pollRspWrapper->reqId);
2485
    } else {
2486
      pRspObj = buildRsp(pollRspWrapper);
4,747✔
2487
      if (pRspObj == NULL) {
4,747!
2488
        tqErrorC("consumer:0x%" PRIx64 " failed to allocate memory for meta rsp", tmq->consumerId);
×
2489
        goto END;
×
2490
      }
2491
      pRspObj->resType = pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP ? RES_TYPE__TMQ_RAWDATA :
9,494!
2492
                         (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_RSP ? RES_TYPE__TMQ : RES_TYPE__TMQ_METADATA);
4,747!
2493
      int64_t numOfRows = 0;
4,747✔
2494
      if (pRspWrapper->tmqRspType != TMQ_MSG_TYPE__POLL_RAW_DATA_RSP){
4,747!
2495
        tmqBuildRspFromWrapperInner(pollRspWrapper, pVg, &numOfRows, pRspObj);
4,747✔
2496
        tmq->totalRows += numOfRows;
4,747✔
2497
      }
2498
      if (tmq->replayEnable && pRspWrapper->tmqRspType != TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) {
4,747!
2499
        pVg->blockReceiveTs = taosGetTimestampMs();
×
2500
        pVg->blockSleepForReplay = pRspObj->dataRsp.sleepTime;
×
2501
        if (pVg->blockSleepForReplay > 0) {
×
2502
          if (taosTmrStart(tmqReplayTask, pVg->blockSleepForReplay, (void*)(tmq->refId), tmqMgmt.timer) == NULL) {
×
2503
            tqErrorC("consumer:0x%" PRIx64 " failed to start replay timer, vgId:%d, sleep:%" PRId64,
×
2504
                     tmq->consumerId, pVg->vgId, pVg->blockSleepForReplay);
2505
          }
2506
        }
2507
      }
2508
      tqDebugC("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64
4,747!
2509
                   ", vg total:%" PRId64 ", total:%" PRId64 ", QID:0x%" PRIx64,
2510
               tmq->consumerId, pVg->vgId, buf, pRspObj->dataRsp.blockNum, numOfRows, pVg->numOfRows, tmq->totalRows,
2511
               pollRspWrapper->reqId);
2512
    }
2513
  } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP || pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_BATCH_META_RSP) {
×
2514
    updateVgInfo(pVg, &pollRspWrapper->rspOffset, &pollRspWrapper->rspOffset,
×
2515
                 pollRspWrapper->head.walsver, pollRspWrapper->head.walever, tmq->consumerId, true);
2516

2517

2518
    pRspObj = buildRsp(pollRspWrapper);
×
2519
    if (pRspObj == NULL) {
×
2520
      tqErrorC("consumer:0x%" PRIx64 " failed to allocate memory for meta rsp", tmq->consumerId);
×
2521
      goto END;
×
2522
    }
2523
    pRspObj->resType = pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP ? RES_TYPE__TMQ_META : RES_TYPE__TMQ_BATCH_META;
×
2524
  }
2525

2526
END:
×
2527
  terrno = code;
7,016✔
2528
  taosWUnLockLatch(&tmq->lock);
7,016✔
2529
  return pRspObj;
7,016✔
2530
}
2531

2532
static void* tmqHandleAllRsp(tmq_t* tmq) {
13,028✔
2533
  tqDebugC("consumer:0x%" PRIx64 " start to handle the rsp, total:%d", tmq->consumerId, taosQueueItemSize(tmq->mqueue));
13,028!
2534

2535
  int32_t code = 0;
13,028✔
2536
  void* returnVal = NULL;
13,028✔
2537
  while (1) {
2,800✔
2538
    SMqRspWrapper* pRspWrapper = NULL;
15,828✔
2539
    taosReadQitem(tmq->mqueue, (void**)&pRspWrapper);
15,828✔
2540
    if (pRspWrapper == NULL) {break;}
20,575✔
2541

2542
    tqDebugC("consumer:0x%" PRIx64 " handle rsp, type:%s", tmq->consumerId, tmqMsgTypeStr[pRspWrapper->tmqRspType]);
7,547!
2543
    if (pRspWrapper->code != 0) {
7,547✔
2544
      code = processMqRspError(tmq, pRspWrapper);
17✔
2545
    }else{
2546
      returnVal = processMqRsp(tmq, pRspWrapper);
7,530✔
2547
      code = terrno;
7,530✔
2548
    }
2549

2550
    tmqFreeRspWrapper(pRspWrapper);
7,547✔
2551
    taosFreeQitem(pRspWrapper);
7,547✔
2552
    if(returnVal != NULL || code != 0){
7,547!
2553
      break;
2554
    }
2555
  }
2556

2557
END:
13,028✔
2558
  terrno = code;
13,028✔
2559
  return returnVal;
13,028✔
2560
}
2561

2562
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
5,020✔
2563
  int32_t lino = 0;
5,020✔
2564
  int32_t code = 0;
5,020✔
2565
  TSDB_CHECK_NULL(tmq, code, lino, END, TSDB_CODE_INVALID_PARA);
5,020!
2566

2567
  void*   rspObj = NULL;
5,020✔
2568
  int64_t startTime = taosGetTimestampMs();
5,020✔
2569

2570
  tqDebugC("consumer:0x%" PRIx64 " start to poll at %" PRId64 ", timeout:%" PRId64, tmq->consumerId, startTime, timeout);
5,020!
2571
  TSDB_CHECK_CONDITION(atomic_load_8(&tmq->status) != TMQ_CONSUMER_STATUS__INIT, code, lino, END, TSDB_CODE_TMQ_INVALID_STATUS);
5,020✔
2572

2573
  (void)atomic_val_compare_exchange_8(&tmq->pollFlag, 0, 1);
5,018✔
2574

2575
  while (1) {
2576
    code = tmqHandleAllDelayedTask(tmq);
13,028✔
2577
    TSDB_CHECK_CODE(code, lino, END);
13,028!
2578

2579
    rspObj = tmqHandleAllRsp(tmq);
13,028✔
2580
    if (rspObj) {
13,028✔
2581
      tqDebugC("consumer:0x%" PRIx64 " return rsp %p", tmq->consumerId, rspObj);
4,747!
2582
      return (TAOS_RES*)rspObj;
4,747✔
2583
    }
2584
    code = terrno;
8,281✔
2585
    TSDB_CHECK_CODE(code, lino, END);
8,281!
2586

2587
    code = tmqPollImpl(tmq);
8,281✔
2588
    TSDB_CHECK_CODE(code, lino, END);
8,281!
2589

2590
    if (timeout >= 0) {
8,281!
2591
      int64_t currentTime = taosGetTimestampMs();
8,281✔
2592
      int64_t elapsedTime = currentTime - startTime;
8,281✔
2593
      (void)tsem2_timewait(&tmq->rspSem, (timeout - elapsedTime));
8,281✔
2594
      TSDB_CHECK_CONDITION(elapsedTime < timeout && elapsedTime >= 0, code, lino, END, 0);
8,281!
2595
    } else {
2596
      (void)tsem2_timewait(&tmq->rspSem, 1000);
×
2597
    }
2598
  }
2599

2600
END:
273✔
2601
  terrno = code;
273✔
2602
  if (tmq != NULL && terrno != 0) {
273!
2603
    tqErrorC("consumer:0x%" PRIx64 " poll error at line:%d, msg:%s", tmq->consumerId, lino, tstrerror(terrno));
2!
2604
  }
2605
  return NULL;
273✔
2606
}
2607

2608
static void displayConsumeStatistics(tmq_t* pTmq) {
94✔
2609
  if (pTmq == NULL) return;
94!
2610
  taosRLockLatch(&pTmq->lock);
94✔
2611
  int32_t numOfTopics = taosArrayGetSize(pTmq->clientTopics);
94✔
2612
  tqInfoC("consumer:0x%" PRIx64 " closing poll:%" PRId64 " rows:%" PRId64 " topics:%d, final epoch:%d",
94!
2613
          pTmq->consumerId, pTmq->pollCnt, pTmq->totalRows, numOfTopics, pTmq->epoch);
2614

2615
  tqInfoC("consumer:0x%" PRIx64 " rows dist begin: ", pTmq->consumerId);
94!
2616
  for (int32_t i = 0; i < numOfTopics; ++i) {
146✔
2617
    SMqClientTopic* pTopics = taosArrayGet(pTmq->clientTopics, i);
52✔
2618
    if (pTopics == NULL) continue;
52!
2619
    tqInfoC("consumer:0x%" PRIx64 " topic:%d", pTmq->consumerId, i);
52!
2620
    int32_t numOfVgs = taosArrayGetSize(pTopics->vgs);
52✔
2621
    for (int32_t j = 0; j < numOfVgs; ++j) {
285✔
2622
      SMqClientVg* pVg = taosArrayGet(pTopics->vgs, j);
233✔
2623
      if (pVg == NULL) continue;
233!
2624
      tqInfoC("topic:%s, %d. vgId:%d rows:%" PRId64, pTopics->topicName, j, pVg->vgId, pVg->numOfRows);
233!
2625
    }
2626
  }
2627
  taosRUnLockLatch(&pTmq->lock);
94✔
2628
  tqInfoC("consumer:0x%" PRIx64 " rows dist end", pTmq->consumerId);
94!
2629
}
2630

2631
int32_t tmq_unsubscribe(tmq_t* tmq) {
94✔
2632
  if (tmq == NULL) return TSDB_CODE_INVALID_PARA;
94!
2633
  int32_t code = 0;
94✔
2634
  int8_t status = atomic_load_8(&tmq->status);
94✔
2635
  tqInfoC("consumer:0x%" PRIx64 " start to unsubscribe consumer, status:%d", tmq->consumerId, status);
94!
2636

2637
  displayConsumeStatistics(tmq);
94✔
2638
  if (status != TMQ_CONSUMER_STATUS__READY && status != TMQ_CONSUMER_STATUS__LOST) {
94!
2639
    tqInfoC("consumer:0x%" PRIx64 " status:%d, already closed or not in ready state, no need unsubscribe", tmq->consumerId, status);
4!
2640
    goto END;
4✔
2641
  }
2642
  if (tmq->autoCommit) {
90✔
2643
    code = tmq_commit_sync(tmq, NULL);
82✔
2644
    if (code != 0) {
82!
2645
      goto END;
×
2646
    }
2647
  }
2648
  tmqSendHbReq((void*)(tmq->refId), NULL);
90✔
2649

2650
  tmq_list_t* lst = tmq_list_new();
90✔
2651
  if (lst == NULL) {
90!
2652
    code = terrno;
×
2653
    goto END;
×
2654
  }
2655
  code = tmq_subscribe(tmq, lst);
90✔
2656
  tmq_list_destroy(lst);
90✔
2657
  tmqClearUnhandleMsg(tmq);
90✔
2658
  if(code != 0){
90!
2659
    goto END;
×
2660
  }
2661

2662
  END:
90✔
2663
  return code;
94✔
2664
}
2665

2666
int32_t tmq_consumer_close(tmq_t* tmq) {
49✔
2667
  if (tmq == NULL) return TSDB_CODE_INVALID_PARA;
49!
2668
  int32_t code = 0;
49✔
2669
  (void) taosThreadMutexLock(&tmqMgmt.lock);
49✔
2670
  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__CLOSED){
49!
2671
    goto end;
×
2672
  }
2673
  tqInfoC("consumer:0x%" PRIx64 " start to close consumer, status:%d", tmq->consumerId, tmq->status);
49!
2674
  code = tmq_unsubscribe(tmq);
49✔
2675
  if (code == 0) {
49!
2676
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__CLOSED);
49✔
2677
    code = taosRemoveRef(tmqMgmt.rsetId, tmq->refId);
49✔
2678
    if (code != 0){
49!
2679
      tqErrorC("tmq close failed to remove ref:%" PRId64 ", code:%d", tmq->refId, code);
×
2680
    }
2681
  }
2682

2683
end:
49✔
2684
  (void)taosThreadMutexUnlock(&tmqMgmt.lock);
49✔
2685
  return code;
49✔
2686
}
2687

2688
const char* tmq_err2str(int32_t err) {
5✔
2689
  if (err == 0) {
5✔
2690
    return "success";
2✔
2691
  } else if (err == -1) {
3!
2692
    return "fail";
×
2693
  } else {
2694
    if (*(taosGetErrMsg()) == 0) {
3!
2695
      return tstrerror(err);
3✔
2696
    } else {
2697
      (void)snprintf(taosGetErrMsgReturn(), ERR_MSG_LEN, "%s,detail:%s", tstrerror(err), taosGetErrMsg());
×
2698
      return (const char*)taosGetErrMsgReturn();
×
2699
    }
2700
  }
2701
}
2702

2703
tmq_res_t tmq_get_res_type(TAOS_RES* res) {
4,489✔
2704
  if (res == NULL) {
4,489!
2705
    return TMQ_RES_INVALID;
×
2706
  }
2707
  if (TD_RES_TMQ(res)) {
4,489!
2708
    return TMQ_RES_DATA;
4,489✔
2709
  } else if (TD_RES_TMQ_META(res)) {
×
2710
    return TMQ_RES_TABLE_META;
×
2711
  } else if (TD_RES_TMQ_METADATA(res)) {
×
2712
    return TMQ_RES_METADATA;
×
2713
  } else if (TD_RES_TMQ_BATCH_META(res)) {
×
2714
    return TMQ_RES_TABLE_META;
×
2715
  } else if (TD_RES_TMQ_RAW(res)) {
×
2716
    return TMQ_RES_RAWDATA;
×
2717
  } else {
2718
    return TMQ_RES_INVALID;
×
2719
  }
2720
}
2721

2722
const char* tmq_get_topic_name(TAOS_RES* res) {
4,187✔
2723
  if (res == NULL) {
4,187!
2724
    return NULL;
×
2725
  }
2726
  if (TD_RES_TMQ_RAW(res) || TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) ||
4,187!
2727
      TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
×
2728
    char* tmp = strchr(((SMqRspObj*)res)->topic, '.');
4,187✔
2729
    if (tmp == NULL) {
4,187!
2730
      return NULL;
×
2731
    }
2732
    return tmp + 1;
4,187✔
2733
  } else {
2734
    return NULL;
×
2735
  }
2736
}
2737

2738
const char* tmq_get_db_name(TAOS_RES* res) {
4,131✔
2739
  if (res == NULL) {
4,131!
2740
    return NULL;
×
2741
  }
2742

2743
  if (TD_RES_TMQ_RAW(res) || TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) ||
4,131!
2744
      TD_RES_TMQ_BATCH_META(res) || TD_RES_TMQ_META(res)) {
×
2745
    char* tmp = strchr(((SMqRspObj*)res)->db, '.');
4,131✔
2746
    if (tmp == NULL) {
4,131!
2747
      return NULL;
×
2748
    }
2749
    return tmp + 1;
4,131✔
2750
  } else {
2751
    return NULL;
×
2752
  }
2753
}
2754

2755
int32_t tmq_get_vgroup_id(TAOS_RES* res) {
4,187✔
2756
  if (res == NULL) {
4,187!
2757
    return TSDB_CODE_INVALID_PARA;
×
2758
  }
2759
  if (TD_RES_TMQ_RAW(res) || TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) ||
4,187!
2760
      TD_RES_TMQ_BATCH_META(res) || TD_RES_TMQ_META(res)) {
×
2761
    return ((SMqRspObj*)res)->vgId;
4,187✔
2762
  } else {
2763
    return TSDB_CODE_INVALID_PARA;
×
2764
  }
2765
}
2766

2767
int64_t tmq_get_vgroup_offset(TAOS_RES* res) {
511✔
2768
  if (res == NULL) {
511!
2769
    return TSDB_CODE_INVALID_PARA;
×
2770
  }
2771
  if (TD_RES_TMQ_RAW(res) || TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) {
511!
2772
    SMqRspObj* pRspObj = (SMqRspObj*)res;
511✔
2773
    STqOffsetVal*     pOffset = &pRspObj->dataRsp.reqOffset;
511✔
2774
    if (pOffset->type == TMQ_OFFSET__LOG) {
511!
2775
      return pOffset->version;
511✔
2776
    } else {
2777
      tqErrorC("invalid offset type:%d", pOffset->type);
×
2778
    }
2779
  } else if (TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
×
2780
    SMqRspObj* pRspObj = (SMqRspObj*)res;
×
2781
    if (pRspObj->rspOffset.type == TMQ_OFFSET__LOG) {
×
2782
      return pRspObj->rspOffset.version;
×
2783
    }
2784
  } else {
2785
    tqErrorC("invalid tmq type:%d", *(int8_t*)res);
×
2786
  }
2787

2788
  // data from tsdb, no valid offset info
2789
  return TSDB_CODE_TMQ_SNAPSHOT_ERROR;
×
2790
}
2791

2792
const char* tmq_get_table_name(TAOS_RES* res) {
293,856✔
2793
  if (res == NULL) {
293,856!
2794
    return NULL;
×
2795
  }
2796
  if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) ) {
293,856!
2797
    SMqRspObj* pRspObj = (SMqRspObj*)res;
293,856✔
2798
    SMqDataRsp* data = &pRspObj->dataRsp;
293,856✔
2799
    if (!data->withTbName || data->blockTbName == NULL || pRspObj->resIter < 0 ||
293,856!
2800
        pRspObj->resIter >= data->blockNum) {
×
2801
      return NULL;
293,856✔
2802
    }
2803
    return (const char*)taosArrayGetP(data->blockTbName, pRspObj->resIter);
×
2804
  }
2805
  return NULL;
×
2806
}
2807

2808
void tmq_commit_async(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* cb, void* param) {
×
2809
  if (tmq == NULL) {
×
2810
    tqErrorC("invalid tmq handle, null");
×
2811
    if (cb != NULL) {
×
2812
      cb(tmq, TSDB_CODE_INVALID_PARA, param);
×
2813
    }
2814
    return;
×
2815
  }
2816
  if (pRes == NULL) {  // here needs to commit all offsets.
×
2817
    asyncCommitAllOffsets(tmq, cb, param);
×
2818
  } else {  // only commit one offset
2819
    asyncCommitFromResult(tmq, pRes, cb, param);
×
2820
  }
2821
}
2822

2823
static void commitCallBackFn(tmq_t* tmq, int32_t code, void* param) {
191✔
2824
  if (param == NULL) {
191!
2825
    tqErrorC("invalid param in commit cb");
×
2826
    return;
×
2827
  }
2828
  SSyncCommitInfo* pInfo = (SSyncCommitInfo*)param;
191✔
2829
  pInfo->code = code;
191✔
2830
  if (tsem2_post(&pInfo->sem) != 0){
191!
2831
    tqErrorC("failed to post rsp sem in commit cb");
×
2832
  }
2833
}
2834

2835
int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* pRes) {
189✔
2836
  if (tmq == NULL) {
189!
2837
    tqErrorC("invalid tmq handle, null");
×
2838
    return TSDB_CODE_INVALID_PARA;
×
2839
  }
2840

2841
  int32_t code = 0;
189✔
2842

2843
  SSyncCommitInfo* pInfo = taosMemoryMalloc(sizeof(SSyncCommitInfo));
189!
2844
  if (pInfo == NULL) {
189!
2845
    tqErrorC("failed to allocate memory for sync commit");
×
2846
    return terrno;
×
2847
  }
2848

2849
  code = tsem2_init(&pInfo->sem, 0, 0);
189✔
2850
  if (code != 0) {
189!
2851
    tqErrorC("failed to init sem for sync commit");
×
2852
    taosMemoryFree(pInfo);
×
2853
    return code;
×
2854
  }
2855
  pInfo->code = 0;
189✔
2856

2857
  if (pRes == NULL) {
189✔
2858
    asyncCommitAllOffsets(tmq, commitCallBackFn, pInfo);
136✔
2859
  } else {
2860
    asyncCommitFromResult(tmq, pRes, commitCallBackFn, pInfo);
53✔
2861
  }
2862

2863
  if (tsem2_wait(&pInfo->sem) != 0){
189!
2864
    tqErrorC("failed to wait sem for sync commit");
×
2865
  }
2866
  code = pInfo->code;
189✔
2867

2868
  if(tsem2_destroy(&pInfo->sem) != 0) {
189!
2869
    tqErrorC("failed to destroy sem for sync commit");
×
2870
  }
2871
  taosMemoryFree(pInfo);
189!
2872

2873
  tqInfoC("consumer:0x%" PRIx64 " sync res commit done, code:%s", tmq->consumerId, tstrerror(code));
189!
2874
  return code;
189✔
2875
}
2876

2877
// wal range will be ok after calling tmq_get_topic_assignment or poll interface
2878
static int32_t checkWalRange(SVgOffsetInfo* offset, int64_t value) {
12✔
2879
  if (offset == NULL) {
12!
2880
    tqErrorC("invalid offset, null");
×
2881
    return TSDB_CODE_INVALID_PARA;
×
2882
  }
2883
  if (offset->walVerBegin == -1 || offset->walVerEnd == -1) {
12!
2884
    tqErrorC("Assignment or poll interface need to be called first");
×
2885
    return TSDB_CODE_TMQ_NEED_INITIALIZED;
×
2886
  }
2887

2888
  if (value != -1 && (value < offset->walVerBegin || value > offset->walVerEnd)) {
12!
2889
    tqErrorC("invalid seek params, offset:%" PRId64 ", valid range:[%" PRId64 ", %" PRId64 "]", value,
×
2890
             offset->walVerBegin, offset->walVerEnd);
2891
    return TSDB_CODE_TMQ_VERSION_OUT_OF_RANGE;
×
2892
  }
2893

2894
  return 0;
12✔
2895
}
2896

2897
int32_t tmq_commit_offset_sync(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_t offset) {
2✔
2898
  if (tmq == NULL || pTopicName == NULL) {
2!
2899
    tqErrorC("invalid tmq handle, null");
×
2900
    return TSDB_CODE_INVALID_PARA;
×
2901
  }
2902

2903
  int32_t accId = tmq->pTscObj->acctId;
2✔
2904
  char    tname[TSDB_TOPIC_FNAME_LEN] = {0};
2✔
2905
  (void)snprintf(tname, TSDB_TOPIC_FNAME_LEN, "%d.%s", accId, pTopicName);
2✔
2906

2907
  taosWLockLatch(&tmq->lock);
2✔
2908
  SMqClientVg* pVg = NULL;
2✔
2909
  int32_t      code = getClientVg(tmq, tname, vgId, &pVg);
2✔
2910
  if (code != 0) {
2!
2911
    taosWUnLockLatch(&tmq->lock);
×
2912
    return code;
×
2913
  }
2914

2915
  SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo;
2✔
2916
  code = checkWalRange(pOffsetInfo, offset);
2✔
2917
  if (code != 0) {
2!
2918
    taosWUnLockLatch(&tmq->lock);
×
2919
    return code;
×
2920
  }
2921
  taosWUnLockLatch(&tmq->lock);
2✔
2922

2923
  STqOffsetVal offsetVal = {.type = TMQ_OFFSET__LOG, .version = offset};
2✔
2924

2925
  SSyncCommitInfo* pInfo = taosMemoryMalloc(sizeof(SSyncCommitInfo));
2!
2926
  if (pInfo == NULL) {
2!
2927
    tqErrorC("consumer:0x%" PRIx64 " failed to prepare seek operation", tmq->consumerId);
×
2928
    return terrno;
×
2929
  }
2930

2931
  code = tsem2_init(&pInfo->sem, 0, 0);
2✔
2932
  if (code != 0) {
2!
2933
    taosMemoryFree(pInfo);
×
2934
    return code;
×
2935
  }
2936
  pInfo->code = 0;
2✔
2937

2938
  code = asyncCommitOffset(tmq, tname, vgId, &offsetVal, commitCallBackFn, pInfo);
2✔
2939
  if (code == 0) {
2!
2940
    if (tsem2_wait(&pInfo->sem) != 0){
2!
2941
      tqErrorC("failed to wait sem for sync commit offset");
×
2942
    }
2943
    code = pInfo->code;
2✔
2944
  }
2945

2946
  if (code == TSDB_CODE_TMQ_SAME_COMMITTED_VALUE) code = TSDB_CODE_SUCCESS;
2!
2947
  if(tsem2_destroy(&pInfo->sem) != 0) {
2!
2948
    tqErrorC("failed to destroy sem for sync commit offset");
×
2949
  }
2950
  taosMemoryFree(pInfo);
2!
2951

2952
  tqInfoC("consumer:0x%" PRIx64 " sync send commit to vgId:%d, offset:%" PRId64 " code:%s", tmq->consumerId, vgId,
2!
2953
          offset, tstrerror(code));
2954

2955
  return code;
2✔
2956
}
2957

2958
void tmq_commit_offset_async(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb* cb,
×
2959
                             void* param) {
2960
  int32_t code = 0;
×
2961
  if (tmq == NULL || pTopicName == NULL) {
×
2962
    tqErrorC("invalid tmq handle, null");
×
2963
    code = TSDB_CODE_INVALID_PARA;
×
2964
    goto end;
×
2965
  }
2966

2967
  int32_t accId = tmq->pTscObj->acctId;
×
2968
  char    tname[TSDB_TOPIC_FNAME_LEN] = {0};
×
2969
  (void)snprintf(tname, TSDB_TOPIC_FNAME_LEN, "%d.%s", accId, pTopicName);
×
2970

2971
  taosWLockLatch(&tmq->lock);
×
2972
  SMqClientVg* pVg = NULL;
×
2973
  code = getClientVg(tmq, tname, vgId, &pVg);
×
2974
  if (code != 0) {
×
2975
    taosWUnLockLatch(&tmq->lock);
×
2976
    goto end;
×
2977
  }
2978

2979
  SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo;
×
2980
  code = checkWalRange(pOffsetInfo, offset);
×
2981
  if (code != 0) {
×
2982
    taosWUnLockLatch(&tmq->lock);
×
2983
    goto end;
×
2984
  }
2985
  taosWUnLockLatch(&tmq->lock);
×
2986

2987
  STqOffsetVal offsetVal = {.type = TMQ_OFFSET__LOG, .version = offset};
×
2988

2989
  code = asyncCommitOffset(tmq, tname, vgId, &offsetVal, cb, param);
×
2990

2991
  tqInfoC("consumer:0x%" PRIx64 " async send commit to vgId:%d, offset:%" PRId64 " code:%s", tmq->consumerId, vgId,
×
2992
          offset, tstrerror(code));
2993

2994
  end:
×
2995
  if (code != 0 && cb != NULL) {
×
2996
    if (code == TSDB_CODE_TMQ_SAME_COMMITTED_VALUE) code = TSDB_CODE_SUCCESS;
×
2997
    cb(tmq, code, param);
×
2998
  }
2999
}
×
3000

3001

3002
int32_t tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4, SReqResultInfo** pResInfo) {
46,699✔
3003
  if (res == NULL || pResInfo == NULL) {
46,699!
3004
    return TSDB_CODE_INVALID_PARA;
×
3005
  }
3006
  SMqRspObj*  pRspObj = (SMqRspObj*)res;
46,699✔
3007
  SMqDataRsp* data = &pRspObj->dataRsp;
46,699✔
3008

3009
  pRspObj->resIter++;
46,699✔
3010
  if (pRspObj->resIter < data->blockNum) {
46,699✔
3011
    if (data->withSchema) {
41,960!
3012
      doFreeReqResultInfo(&pRspObj->resInfo);
41,960✔
3013
      SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(data->blockSchema, pRspObj->resIter);
41,960✔
3014
      if (pSW) {
41,960!
3015
        TAOS_CHECK_RETURN(setResSchemaInfo(&pRspObj->resInfo, pSW->pSchema, pSW->nCols, NULL, false));
41,960!
3016
      }
3017
    }
3018

3019
    void*   pRetrieve = taosArrayGetP(data->blockData, pRspObj->resIter);
41,960✔
3020
    void*   rawData = NULL;
41,960✔
3021
    int64_t rows = 0;
41,960✔
3022
    int32_t precision = 0;
41,960✔
3023
    tmqGetRawDataRowsPrecisionFromRes(pRetrieve, &rawData, &rows, &precision);
41,960✔
3024

3025
    pRspObj->resInfo.pData = rawData;
41,960✔
3026
    pRspObj->resInfo.numOfRows = rows;
41,960✔
3027
    pRspObj->resInfo.current = 0;
41,960✔
3028
    pRspObj->resInfo.precision = precision;
41,960✔
3029

3030
    pRspObj->resInfo.totalRows += pRspObj->resInfo.numOfRows;
41,960✔
3031
    int32_t code = setResultDataPtr(&pRspObj->resInfo, convertUcs4, false);
41,960✔
3032
    if (code != 0) {
41,960!
3033
      return code;
×
3034
    }
3035
    *pResInfo = &pRspObj->resInfo;
41,960✔
3036
    return code;
41,960✔
3037
  }
3038

3039
  return TSDB_CODE_TSC_INTERNAL_ERROR;
4,739✔
3040
}
3041

3042
static int32_t tmqGetWalInfoCb(void* param, SDataBuf* pMsg, int32_t code) {
12✔
3043
  if (param == NULL || pMsg == NULL) {
12!
3044
    return code;
×
3045
  }
3046
  SMqVgWalInfoParam* pParam = param;
12✔
3047
  SMqVgCommon*       pCommon = pParam->pCommon;
12✔
3048

3049
  int32_t total = atomic_add_fetch_32(&pCommon->numOfRsp, 1);
12✔
3050
  if (code != TSDB_CODE_SUCCESS) {
12!
3051
    tqErrorC("consumer:0x%" PRIx64 " failed to get the wal info from vgId:%d for topic:%s", pCommon->consumerId,
×
3052
             pParam->vgId, pCommon->pTopicName);
3053

3054
  } else {
3055
    SMqDataRsp rsp = {0};
12✔
3056
    SDecoder   decoder = {0};
12✔
3057
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
12✔
3058
    code = tDecodeMqDataRsp(&decoder, &rsp);
12✔
3059
    tDecoderClear(&decoder);
12✔
3060
    if (code != 0) {
12!
3061
      goto END;
×
3062
    }
3063

3064
    SMqRspHead*          pHead = pMsg->pData;
12✔
3065
    tmq_topic_assignment assignment = {.begin = pHead->walsver,
12✔
3066
        .end = pHead->walever + 1,
12✔
3067
        .currentOffset = rsp.rspOffset.version,
12✔
3068
        .vgId = pParam->vgId};
12✔
3069

3070
    (void)taosThreadMutexLock(&pCommon->mutex);
12✔
3071
    if (taosArrayPush(pCommon->pList, &assignment) == NULL) {
24!
3072
      tqErrorC("consumer:0x%" PRIx64 " failed to push the wal info from vgId:%d for topic:%s", pCommon->consumerId,
×
3073
               pParam->vgId, pCommon->pTopicName);
3074
      code = TSDB_CODE_TSC_INTERNAL_ERROR;
×
3075
    }
3076
    (void)taosThreadMutexUnlock(&pCommon->mutex);
12✔
3077
  }
3078

3079
  END:
12✔
3080
  pCommon->code = code;
12✔
3081
  if (total == pParam->totalReq) {
12✔
3082
    if (tsem2_post(&pCommon->rsp) != 0) {
6!
3083
      tqErrorC("failed to post semaphore in get wal cb");
×
3084
    }
3085
  }
3086

3087
  if (pMsg) {
12!
3088
    taosMemoryFree(pMsg->pData);
12!
3089
    taosMemoryFree(pMsg->pEpSet);
12!
3090
  }
3091

3092
  return code;
12✔
3093
}
3094

3095
static void destroyCommonInfo(SMqVgCommon* pCommon) {
12✔
3096
  if (pCommon == NULL) {
12✔
3097
    return;
6✔
3098
  }
3099
  taosArrayDestroy(pCommon->pList);
6✔
3100
  if(tsem2_destroy(&pCommon->rsp) != 0) {
6!
3101
    tqErrorC("failed to destroy semaphore for topic:%s", pCommon->pTopicName);
×
3102
  }
3103
  (void)taosThreadMutexDestroy(&pCommon->mutex);
6✔
3104
  taosMemoryFree(pCommon->pTopicName);
6!
3105
  taosMemoryFree(pCommon);
6!
3106
}
3107

3108
static bool isInSnapshotMode(int8_t type, bool useSnapshot) {
29✔
3109
  if ((type < TMQ_OFFSET__LOG && useSnapshot) || type > TMQ_OFFSET__LOG) {
29!
3110
    return true;
×
3111
  }
3112
  return false;
29✔
3113
}
3114

3115
static int32_t tmCommittedCb(void* param, SDataBuf* pMsg, int32_t code) {
1✔
3116
  if (param == NULL) {
1!
3117
    return code;
×
3118
  }
3119
  SMqCommittedParam* pParam = param;
1✔
3120

3121
  if (code != 0) {
1!
3122
    goto end;
1✔
3123
  }
3124
  if (pMsg) {
×
3125
    SDecoder decoder = {0};
×
3126
    tDecoderInit(&decoder, (uint8_t*)pMsg->pData, pMsg->len);
×
3127
    int32_t err = tDecodeMqVgOffset(&decoder, &pParam->vgOffset);
×
3128
    if (err < 0) {
×
3129
      tOffsetDestroy(&pParam->vgOffset.offset);
×
3130
      code = err;
×
3131
      goto end;
×
3132
    }
3133
    tDecoderClear(&decoder);
×
3134
  }
3135

3136
  end:
×
3137
  if (pMsg) {
1!
3138
    taosMemoryFree(pMsg->pData);
1!
3139
    taosMemoryFree(pMsg->pEpSet);
1!
3140
  }
3141
  pParam->code = code;
1✔
3142
  if (tsem2_post(&pParam->sem) != 0){
1!
3143
    tqErrorC("failed to post semaphore in tmCommittedCb");
×
3144
  }
3145
  return code;
1✔
3146
}
3147

3148
int64_t getCommittedFromServer(tmq_t* tmq, char* tname, int32_t vgId, SEpSet* epSet) {
1✔
3149
  if (tmq == NULL || tname == NULL || epSet == NULL) {
1!
3150
    return TSDB_CODE_INVALID_PARA;
×
3151
  }
3152
  int32_t     code = 0;
1✔
3153
  SMqVgOffset pOffset = {0};
1✔
3154

3155
  pOffset.consumerId = tmq->consumerId;
1✔
3156
  (void)snprintf(pOffset.offset.subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", tmq->groupId, TMQ_SEPARATOR, tname);
1✔
3157

3158
  int32_t len = 0;
1✔
3159
  tEncodeSize(tEncodeMqVgOffset, &pOffset, len, code);
1!
3160
  if (code < 0) {
1!
3161
    return TSDB_CODE_INVALID_PARA;
×
3162
  }
3163

3164
  void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
1!
3165
  if (buf == NULL) {
1!
3166
    return terrno;
×
3167
  }
3168

3169
  ((SMsgHead*)buf)->vgId = htonl(vgId);
1✔
3170

3171
  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
1✔
3172

3173
  SEncoder encoder = {0};
1✔
3174
  tEncoderInit(&encoder, abuf, len);
1✔
3175
  code = tEncodeMqVgOffset(&encoder, &pOffset);
1✔
3176
  if (code < 0) {
1!
3177
    taosMemoryFree(buf);
×
3178
    tEncoderClear(&encoder);
×
3179
    return code;
×
3180
  }
3181
  tEncoderClear(&encoder);
1✔
3182

3183
  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
1!
3184
  if (sendInfo == NULL) {
1!
3185
    taosMemoryFree(buf);
×
3186
    return terrno;
×
3187
  }
3188

3189
  SMqCommittedParam* pParam = taosMemoryMalloc(sizeof(SMqCommittedParam));
1!
3190
  if (pParam == NULL) {
1!
3191
    taosMemoryFree(buf);
×
3192
    taosMemoryFree(sendInfo);
×
3193
    return terrno;
×
3194
  }
3195
  if (tsem2_init(&pParam->sem, 0, 0) != 0) {
1!
3196
    taosMemoryFree(buf);
×
3197
    taosMemoryFree(sendInfo);
×
3198
    taosMemoryFree(pParam);
×
3199
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
3200
  }
3201

3202
  sendInfo->msgInfo = (SDataBuf){.pData = buf, .len = sizeof(SMsgHead) + len, .handle = NULL};
1✔
3203
  sendInfo->requestId = generateRequestId();
1✔
3204
  sendInfo->requestObjRefId = 0;
1✔
3205
  sendInfo->param = pParam;
1✔
3206
  sendInfo->fp = tmCommittedCb;
1✔
3207
  sendInfo->msgType = TDMT_VND_TMQ_VG_COMMITTEDINFO;
1✔
3208

3209
  code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, NULL, sendInfo);
1✔
3210
  if (code != 0) {
1!
3211
    if(tsem2_destroy(&pParam->sem) != 0) {
×
3212
      tqErrorC("failed to destroy semaphore in get committed from server1");
×
3213
    }
3214
    taosMemoryFree(pParam);
×
3215
    return code;
×
3216
  }
3217

3218
  if (tsem2_wait(&pParam->sem) != 0){
1!
3219
    tqErrorC("failed to wait semaphore in get committed from server");
×
3220
  }
3221
  code = pParam->code;
1✔
3222
  if (code == TSDB_CODE_SUCCESS) {
1!
3223
    if (pParam->vgOffset.offset.val.type == TMQ_OFFSET__LOG) {
×
3224
      code = pParam->vgOffset.offset.val.version;
×
3225
    } else {
3226
      tOffsetDestroy(&pParam->vgOffset.offset);
×
3227
      code = TSDB_CODE_TMQ_SNAPSHOT_ERROR;
×
3228
    }
3229
  }
3230
  if(tsem2_destroy(&pParam->sem) != 0) {
1!
3231
    tqErrorC("failed to destroy semaphore in get committed from server2");
×
3232
  }
3233
  taosMemoryFree(pParam);
1!
3234

3235
  return code;
1✔
3236
}
3237

3238
int64_t tmq_position(tmq_t* tmq, const char* pTopicName, int32_t vgId) {
×
3239
  if (tmq == NULL || pTopicName == NULL) {
×
3240
    tqErrorC("invalid tmq handle, null");
×
3241
    return TSDB_CODE_INVALID_PARA;
×
3242
  }
3243

3244
  int32_t accId = tmq->pTscObj->acctId;
×
3245
  char    tname[TSDB_TOPIC_FNAME_LEN] = {0};
×
3246
  (void)snprintf(tname, TSDB_TOPIC_FNAME_LEN, "%d.%s", accId, pTopicName);
×
3247

3248
  taosWLockLatch(&tmq->lock);
×
3249

3250
  SMqClientVg* pVg = NULL;
×
3251
  int32_t      code = getClientVg(tmq, tname, vgId, &pVg);
×
3252
  if (code != 0) {
×
3253
    taosWUnLockLatch(&tmq->lock);
×
3254
    return code;
×
3255
  }
3256

3257
  SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo;
×
3258
  int32_t        type = pOffsetInfo->endOffset.type;
×
3259
  if (isInSnapshotMode(type, tmq->useSnapshot)) {
×
3260
    tqErrorC("consumer:0x%" PRIx64 " offset type:%d not wal version, position error", tmq->consumerId, type);
×
3261
    taosWUnLockLatch(&tmq->lock);
×
3262
    return TSDB_CODE_TMQ_SNAPSHOT_ERROR;
×
3263
  }
3264

3265
  code = checkWalRange(pOffsetInfo, -1);
×
3266
  if (code != 0) {
×
3267
    taosWUnLockLatch(&tmq->lock);
×
3268
    return code;
×
3269
  }
3270
  SEpSet  epSet = pVg->epSet;
×
3271
  int64_t begin = pVg->offsetInfo.walVerBegin;
×
3272
  int64_t end = pVg->offsetInfo.walVerEnd;
×
3273
  taosWUnLockLatch(&tmq->lock);
×
3274

3275
  int64_t position = 0;
×
3276
  if (type == TMQ_OFFSET__LOG) {
×
3277
    position = pOffsetInfo->endOffset.version;
×
3278
  } else if (type == TMQ_OFFSET__RESET_EARLIEST || type == TMQ_OFFSET__RESET_LATEST) {
×
3279
    code = getCommittedFromServer(tmq, tname, vgId, &epSet);
×
3280
    if (code == TSDB_CODE_TMQ_NO_COMMITTED) {
×
3281
      if (type == TMQ_OFFSET__RESET_EARLIEST) {
×
3282
        position = begin;
×
3283
      } else if (type == TMQ_OFFSET__RESET_LATEST) {
×
3284
        position = end;
×
3285
      }
3286
    } else {
3287
      position = code;
×
3288
    }
3289
  } else {
3290
    tqErrorC("consumer:0x%" PRIx64 " offset type:%d can not be reach here", tmq->consumerId, type);
×
3291
  }
3292

3293
  tqInfoC("consumer:0x%" PRIx64 " tmq_position vgId:%d position:%" PRId64, tmq->consumerId, vgId, position);
×
3294
  return position;
×
3295
}
3296

3297
int64_t tmq_committed(tmq_t* tmq, const char* pTopicName, int32_t vgId) {
1✔
3298
  if (tmq == NULL || pTopicName == NULL) {
1!
3299
    tqErrorC("invalid tmq handle, null");
×
3300
    return TSDB_CODE_INVALID_PARA;
×
3301
  }
3302

3303
  int32_t accId = tmq->pTscObj->acctId;
1✔
3304
  char    tname[TSDB_TOPIC_FNAME_LEN] = {0};
1✔
3305
  (void)snprintf(tname, TSDB_TOPIC_FNAME_LEN, "%d.%s", accId, pTopicName);
1✔
3306

3307
  taosWLockLatch(&tmq->lock);
1✔
3308

3309
  SMqClientVg* pVg = NULL;
1✔
3310
  int32_t      code = getClientVg(tmq, tname, vgId, &pVg);
1✔
3311
  if (code != 0) {
1!
3312
    taosWUnLockLatch(&tmq->lock);
×
3313
    return code;
×
3314
  }
3315

3316
  SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo;
1✔
3317
  if (isInSnapshotMode(pOffsetInfo->endOffset.type, tmq->useSnapshot)) {
1!
3318
    tqErrorC("consumer:0x%" PRIx64 " offset type:%d not wal version, committed error", tmq->consumerId,
×
3319
             pOffsetInfo->endOffset.type);
3320
    taosWUnLockLatch(&tmq->lock);
×
3321
    return TSDB_CODE_TMQ_SNAPSHOT_ERROR;
×
3322
  }
3323

3324
  if (isInSnapshotMode(pOffsetInfo->committedOffset.type, tmq->useSnapshot)) {
1!
3325
    tqErrorC("consumer:0x%" PRIx64 " offset type:%d not wal version, committed error", tmq->consumerId,
×
3326
             pOffsetInfo->committedOffset.type);
3327
    taosWUnLockLatch(&tmq->lock);
×
3328
    return TSDB_CODE_TMQ_SNAPSHOT_ERROR;
×
3329
  }
3330

3331
  int64_t committed = 0;
1✔
3332
  if (pOffsetInfo->committedOffset.type == TMQ_OFFSET__LOG) {
1!
3333
    committed = pOffsetInfo->committedOffset.version;
×
3334
    taosWUnLockLatch(&tmq->lock);
×
3335
    goto end;
×
3336
  }
3337
  SEpSet epSet = pVg->epSet;
1✔
3338
  taosWUnLockLatch(&tmq->lock);
1✔
3339

3340
  committed = getCommittedFromServer(tmq, tname, vgId, &epSet);
1✔
3341

3342
  end:
1✔
3343
  tqInfoC("consumer:0x%" PRIx64 " tmq_committed vgId:%d committed:%" PRId64, tmq->consumerId, vgId, committed);
1!
3344
  return committed;
1✔
3345
}
3346

3347
int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_assignment** assignment,
12✔
3348
                                 int32_t* numOfAssignment) {
3349
  if (tmq == NULL || pTopicName == NULL || assignment == NULL || numOfAssignment == NULL) {
12!
3350
    tqErrorC("invalid tmq handle, null");
×
3351
    return TSDB_CODE_INVALID_PARA;
×
3352
  }
3353
  *numOfAssignment = 0;
12✔
3354
  *assignment = NULL;
12✔
3355
  SMqVgCommon* pCommon = NULL;
12✔
3356

3357
  int32_t accId = tmq->pTscObj->acctId;
12✔
3358
  char    tname[TSDB_TOPIC_FNAME_LEN] = {0};
12✔
3359
  (void)snprintf(tname, TSDB_TOPIC_FNAME_LEN, "%d.%s", accId, pTopicName);
12✔
3360

3361
  taosWLockLatch(&tmq->lock);
12✔
3362

3363
  SMqClientTopic* pTopic = NULL;
12✔
3364
  int32_t         code = getTopicByName(tmq, tname, &pTopic);
12✔
3365
  if (code != 0) {
12✔
3366
    tqErrorC("consumer:0x%" PRIx64 " invalid topic name:%s", tmq->consumerId, pTopicName);
3!
3367
    goto end;
3✔
3368
  }
3369

3370
  // in case of snapshot is opened, no valid offset will return
3371
  *numOfAssignment = taosArrayGetSize(pTopic->vgs);
9✔
3372
  for (int32_t j = 0; j < (*numOfAssignment); ++j) {
26✔
3373
    SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j);
17✔
3374
    if (pClientVg == NULL) {
17!
3375
      continue;
×
3376
    }
3377
    int32_t type = pClientVg->offsetInfo.beginOffset.type;
17✔
3378
    if (isInSnapshotMode(type, tmq->useSnapshot)) {
17!
3379
      tqErrorC("consumer:0x%" PRIx64 " offset type:%d not wal version, assignment not allowed", tmq->consumerId, type);
×
3380
      code = TSDB_CODE_TMQ_SNAPSHOT_ERROR;
×
3381
      goto end;
×
3382
    }
3383
  }
3384

3385
  *assignment = taosMemoryCalloc(*numOfAssignment, sizeof(tmq_topic_assignment));
9!
3386
  if (*assignment == NULL) {
9!
3387
    tqErrorC("consumer:0x%" PRIx64 " failed to malloc buffer, size:%" PRIzu, tmq->consumerId,
×
3388
             (*numOfAssignment) * sizeof(tmq_topic_assignment));
3389
    code = terrno;
×
3390
    goto end;
×
3391
  }
3392

3393
  bool needFetch = false;
9✔
3394

3395
  for (int32_t j = 0; j < (*numOfAssignment); ++j) {
14✔
3396
    SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j);
11✔
3397
    if (pClientVg == NULL) {
11!
3398
      continue;
×
3399
    }
3400
    if (pClientVg->offsetInfo.beginOffset.type != TMQ_OFFSET__LOG) {
11✔
3401
      needFetch = true;
6✔
3402
      break;
6✔
3403
    }
3404

3405
    tmq_topic_assignment* pAssignment = &(*assignment)[j];
5✔
3406
    pAssignment->currentOffset = pClientVg->offsetInfo.beginOffset.version;
5✔
3407
    pAssignment->begin = pClientVg->offsetInfo.walVerBegin;
5✔
3408
    pAssignment->end = pClientVg->offsetInfo.walVerEnd;
5✔
3409
    pAssignment->vgId = pClientVg->vgId;
5✔
3410
    tqInfoC("consumer:0x%" PRIx64 " get assignment from local:%d->%" PRId64, tmq->consumerId, pAssignment->vgId,
5!
3411
            pAssignment->currentOffset);
3412
  }
3413

3414
  if (needFetch) {
9✔
3415
    pCommon = taosMemoryCalloc(1, sizeof(SMqVgCommon));
6!
3416
    if (pCommon == NULL) {
6!
3417
      code = terrno;
×
3418
      goto end;
×
3419
    }
3420

3421
    pCommon->pList = taosArrayInit(4, sizeof(tmq_topic_assignment));
6✔
3422
    if (pCommon->pList == NULL) {
6!
3423
      code = terrno;
×
3424
      goto end;
×
3425
    }
3426

3427
    code = tsem2_init(&pCommon->rsp, 0, 0);
6✔
3428
    if (code != 0) {
6!
3429
      goto end;
×
3430
    }
3431
    (void)taosThreadMutexInit(&pCommon->mutex, 0);
6✔
3432
    pCommon->pTopicName = taosStrdup(pTopic->topicName);
6!
3433
    if (pCommon->pTopicName == NULL) {
6!
3434
      code = terrno;
×
3435
      goto end;
×
3436
    }
3437
    pCommon->consumerId = tmq->consumerId;
6✔
3438

3439
    for (int32_t i = 0; i < (*numOfAssignment); ++i) {
18✔
3440
      SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i);
12✔
3441
      if (pClientVg == NULL) {
12!
3442
        continue;
×
3443
      }
3444
      SMqVgWalInfoParam* pParam = taosMemoryMalloc(sizeof(SMqVgWalInfoParam));
12!
3445
      if (pParam == NULL) {
12!
3446
        code = terrno;
×
3447
        goto end;
×
3448
      }
3449

3450
      pParam->epoch = tmq->epoch;
12✔
3451
      pParam->vgId = pClientVg->vgId;
12✔
3452
      pParam->totalReq = *numOfAssignment;
12✔
3453
      pParam->pCommon = pCommon;
12✔
3454

3455
      SMqPollReq req = {0};
12✔
3456
      tmqBuildConsumeReqImpl(&req, tmq, pTopic, pClientVg);
12✔
3457
      req.reqOffset = pClientVg->offsetInfo.beginOffset;
12✔
3458

3459
      int32_t msgSize = tSerializeSMqPollReq(NULL, 0, &req);
12✔
3460
      if (msgSize < 0) {
12!
3461
        taosMemoryFree(pParam);
×
3462
        code = msgSize;
×
3463
        goto end;
×
3464
      }
3465

3466
      char* msg = taosMemoryCalloc(1, msgSize);
12!
3467
      if (NULL == msg) {
12!
3468
        taosMemoryFree(pParam);
×
3469
        code = terrno;
×
3470
        goto end;
×
3471
      }
3472

3473
      msgSize = tSerializeSMqPollReq(msg, msgSize, &req);
12✔
3474
      if (msgSize < 0) {
12!
3475
        taosMemoryFree(msg);
×
3476
        taosMemoryFree(pParam);
×
3477
        code = msgSize;
×
3478
        goto end;
×
3479
      }
3480

3481
      SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
12!
3482
      if (sendInfo == NULL) {
12!
3483
        taosMemoryFree(pParam);
×
3484
        taosMemoryFree(msg);
×
3485
        code = terrno;
×
3486
        goto end;
×
3487
      }
3488

3489
      sendInfo->msgInfo = (SDataBuf){.pData = msg, .len = msgSize, .handle = NULL};
12✔
3490
      sendInfo->requestId = req.reqId;
12✔
3491
      sendInfo->requestObjRefId = 0;
12✔
3492
      sendInfo->param = pParam;
12✔
3493
      sendInfo->paramFreeFp = taosAutoMemoryFree;
12✔
3494
      sendInfo->fp = tmqGetWalInfoCb;
12✔
3495
      sendInfo->msgType = TDMT_VND_TMQ_VG_WALINFO;
12✔
3496

3497
      // int64_t transporterId = 0;
3498
      char offsetFormatBuf[TSDB_OFFSET_LEN] = {0};
12✔
3499
      tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pClientVg->offsetInfo.beginOffset);
12✔
3500

3501
      tqInfoC("consumer:0x%" PRIx64 " %s retrieve wal info vgId:%d, epoch %d, req:%s, QID:0x%" PRIx64, tmq->consumerId,
12!
3502
              pTopic->topicName, pClientVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId);
3503
      code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pClientVg->epSet, NULL, sendInfo);
12✔
3504
      if (code != 0) {
12!
3505
        goto end;
×
3506
      }
3507
    }
3508

3509
    if (tsem2_wait(&pCommon->rsp) != 0){
6!
3510
      tqErrorC("consumer:0x%" PRIx64 " failed to wait sem in get assignment", tmq->consumerId);
×
3511
    }
3512
    code = pCommon->code;
6✔
3513

3514
    if (code != TSDB_CODE_SUCCESS) {
6!
3515
      goto end;
×
3516
    }
3517
    int32_t num = taosArrayGetSize(pCommon->pList);
6✔
3518
    for (int32_t i = 0; i < num; ++i) {
18✔
3519
      (*assignment)[i] = *(tmq_topic_assignment*)taosArrayGet(pCommon->pList, i);
12✔
3520
    }
3521
    *numOfAssignment = num;
6✔
3522

3523
    for (int32_t j = 0; j < (*numOfAssignment); ++j) {
18✔
3524
      tmq_topic_assignment* p = &(*assignment)[j];
12✔
3525

3526
      for (int32_t i = 0; i < taosArrayGetSize(pTopic->vgs); ++i) {
36✔
3527
        SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i);
24✔
3528
        if (pClientVg == NULL) {
24!
3529
          continue;
×
3530
        }
3531
        if (pClientVg->vgId != p->vgId) {
24✔
3532
          continue;
12✔
3533
        }
3534

3535
        SVgOffsetInfo* pOffsetInfo = &pClientVg->offsetInfo;
12✔
3536
        tqInfoC("consumer:0x%" PRIx64 " %s vgId:%d offset is update to:%" PRId64, tmq->consumerId, pTopic->topicName,
12!
3537
                p->vgId, p->currentOffset);
3538

3539
        pOffsetInfo->walVerBegin = p->begin;
12✔
3540
        pOffsetInfo->walVerEnd = p->end;
12✔
3541
      }
3542
    }
3543
  }
3544

3545
  end:
9✔
3546
  if (code != TSDB_CODE_SUCCESS) {
12✔
3547
    taosMemoryFree(*assignment);
3!
3548
    *assignment = NULL;
3✔
3549
    *numOfAssignment = 0;
3✔
3550
  }
3551
  destroyCommonInfo(pCommon);
12✔
3552
  taosWUnLockLatch(&tmq->lock);
12✔
3553
  return code;
12✔
3554
}
3555

3556
void tmq_free_assignment(tmq_topic_assignment* pAssignment) {
7✔
3557
  if (pAssignment == NULL) {
7✔
3558
    return;
2✔
3559
  }
3560

3561
  taosMemoryFree(pAssignment);
5!
3562
}
3563

3564
static int32_t tmqSeekCb(void* param, SDataBuf* pMsg, int32_t code) {
10✔
3565
  if (pMsg) {
10!
3566
    taosMemoryFree(pMsg->pData);
10!
3567
    taosMemoryFree(pMsg->pEpSet);
10!
3568
  }
3569
  if (param == NULL) {
10!
3570
    return code;
×
3571
  }
3572
  SMqSeekParam* pParam = param;
10✔
3573
  pParam->code = code;
10✔
3574
  if (tsem2_post(&pParam->sem) != 0){
10!
3575
    tqErrorC("failed to post sem in tmqSeekCb");
×
3576
  }
3577
  return 0;
10✔
3578
}
3579

3580
// seek interface have to send msg to server to cancel push handle if needed, because consumer may be in wait status if
3581
// there is no data to poll
3582
int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_t offset) {
10✔
3583
  if (tmq == NULL || pTopicName == NULL) {
10!
3584
    tqErrorC("invalid tmq handle, null");
×
3585
    return TSDB_CODE_INVALID_PARA;
×
3586
  }
3587

3588
  int32_t accId = tmq->pTscObj->acctId;
10✔
3589
  char    tname[TSDB_TOPIC_FNAME_LEN] = {0};
10✔
3590
  (void)snprintf(tname, TSDB_TOPIC_FNAME_LEN, "%d.%s", accId, pTopicName);
10✔
3591

3592
  taosWLockLatch(&tmq->lock);
10✔
3593

3594
  SMqClientVg* pVg = NULL;
10✔
3595
  int32_t      code = getClientVg(tmq, tname, vgId, &pVg);
10✔
3596
  if (code != 0) {
10!
3597
    taosWUnLockLatch(&tmq->lock);
×
3598
    return code;
×
3599
  }
3600

3601
  SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo;
10✔
3602

3603
  int32_t type = pOffsetInfo->endOffset.type;
10✔
3604
  if (isInSnapshotMode(type, tmq->useSnapshot)) {
10!
3605
    tqErrorC("consumer:0x%" PRIx64 " offset type:%d not wal version, seek not allowed", tmq->consumerId, type);
×
3606
    taosWUnLockLatch(&tmq->lock);
×
3607
    return TSDB_CODE_TMQ_SNAPSHOT_ERROR;
×
3608
  }
3609

3610
  code = checkWalRange(pOffsetInfo, offset);
10✔
3611
  if (code != 0) {
10!
3612
    taosWUnLockLatch(&tmq->lock);
×
3613
    return code;
×
3614
  }
3615

3616
  tqInfoC("consumer:0x%" PRIx64 " seek to %" PRId64 " on vgId:%d", tmq->consumerId, offset, vgId);
10!
3617
  // update the offset, and then commit to vnode
3618
  pOffsetInfo->endOffset.type = TMQ_OFFSET__LOG;
10✔
3619
  pOffsetInfo->endOffset.version = offset;
10✔
3620
  pOffsetInfo->beginOffset = pOffsetInfo->endOffset;
10✔
3621
  pVg->seekUpdated = true;
10✔
3622
  SEpSet epSet = pVg->epSet;
10✔
3623
  taosWUnLockLatch(&tmq->lock);
10✔
3624

3625
  SMqSeekReq req = {0};
10✔
3626
  (void)snprintf(req.subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s:%s", tmq->groupId, tname);
10✔
3627
  req.head.vgId = vgId;
10✔
3628
  req.consumerId = tmq->consumerId;
10✔
3629

3630
  int32_t msgSize = tSerializeSMqSeekReq(NULL, 0, &req);
10✔
3631
  if (msgSize < 0) {
10!
3632
    return TSDB_CODE_PAR_INTERNAL_ERROR;
×
3633
  }
3634

3635
  char* msg = taosMemoryCalloc(1, msgSize);
10!
3636
  if (NULL == msg) {
10!
3637
    return terrno;
×
3638
  }
3639

3640
  if (tSerializeSMqSeekReq(msg, msgSize, &req) < 0) {
10!
3641
    taosMemoryFree(msg);
×
3642
    return TSDB_CODE_PAR_INTERNAL_ERROR;
×
3643
  }
3644

3645
  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
10!
3646
  if (sendInfo == NULL) {
10!
3647
    taosMemoryFree(msg);
×
3648
    return terrno;
×
3649
  }
3650

3651
  SMqSeekParam* pParam = taosMemoryMalloc(sizeof(SMqSeekParam));
10!
3652
  if (pParam == NULL) {
10!
3653
    taosMemoryFree(msg);
×
3654
    taosMemoryFree(sendInfo);
×
3655
    return terrno;
×
3656
  }
3657
  if (tsem2_init(&pParam->sem, 0, 0) != 0) {
10!
3658
    taosMemoryFree(msg);
×
3659
    taosMemoryFree(sendInfo);
×
3660
    taosMemoryFree(pParam);
×
3661
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
3662
  }
3663

3664
  sendInfo->msgInfo = (SDataBuf){.pData = msg, .len = msgSize, .handle = NULL};
10✔
3665
  sendInfo->requestId = generateRequestId();
10✔
3666
  sendInfo->requestObjRefId = 0;
10✔
3667
  sendInfo->param = pParam;
10✔
3668
  sendInfo->fp = tmqSeekCb;
10✔
3669
  sendInfo->msgType = TDMT_VND_TMQ_SEEK;
10✔
3670

3671
  code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo);
10✔
3672
  if (code != 0) {
10!
3673
    if(tsem2_destroy(&pParam->sem) != 0) {
×
3674
      tqErrorC("consumer:0x%" PRIx64 "destroy rsp sem failed in seek offset", tmq->consumerId);
×
3675
    }
3676
    taosMemoryFree(pParam);
×
3677
    return code;
×
3678
  }
3679

3680
  if (tsem2_wait(&pParam->sem) != 0){
10!
3681
    tqErrorC("consumer:0x%" PRIx64 "wait rsp sem failed in seek offset", tmq->consumerId);
×
3682
  }
3683
  code = pParam->code;
10✔
3684
  if(tsem2_destroy(&pParam->sem) != 0) {
10!
3685
    tqErrorC("consumer:0x%" PRIx64 "destroy rsp sem failed in seek offset", tmq->consumerId);
×
3686
  }
3687
  taosMemoryFree(pParam);
10!
3688

3689
  tqInfoC("consumer:0x%" PRIx64 "send seek to vgId:%d, return code:%s", tmq->consumerId, vgId, tstrerror(code));
10!
3690

3691
  return code;
10✔
3692
}
3693

3694
TAOS* tmq_get_connect(tmq_t* tmq) {
×
3695
  if (tmq && tmq->pTscObj) {
×
3696
    return (TAOS*)(&(tmq->pTscObj->id));
×
3697
  }
3698
  return NULL;
×
3699
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc