• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3911

24 Apr 2025 11:36PM UTC coverage: 53.735% (-1.6%) from 55.295%
#3911

push

travis-ci

happyguoxy
Sync branches at 2025-04-25 07:35

170049 of 316459 relevant lines covered (53.73%)

1192430.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.25
/source/client/src/clientTmq.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "cJSON.h"
17
#include "clientInt.h"
18
#include "clientLog.h"
19
#include "parser.h"
20
#include "tdatablock.h"
21
#include "tdef.h"
22
#include "tglobal.h"
23
#include "tqueue.h"
24
#include "tref.h"
25
#include "ttimer.h"
26

27
#define tqErrorC(...) do { if (cDebugFlag & DEBUG_ERROR || tqClientDebugFlag & DEBUG_ERROR) { taosPrintLog("TQ  ERROR ", DEBUG_ERROR, tqClientDebugFlag|cDebugFlag, __VA_ARGS__); }} while(0)
28
#define tqInfoC(...)  do { if (cDebugFlag & DEBUG_INFO  || tqClientDebugFlag & DEBUG_INFO)  { taosPrintLog("TQ  INFO  ", DEBUG_INFO,  tqClientDebugFlag|cDebugFlag, __VA_ARGS__); }} while(0)
29
#define tqDebugC(...) do { if (cDebugFlag & DEBUG_DEBUG || tqClientDebugFlag & DEBUG_DEBUG) { taosPrintLog("TQ  DEBUG ", DEBUG_DEBUG, tqClientDebugFlag|cDebugFlag, __VA_ARGS__); }} while(0)
30

31
#define DEFAULT_AUTO_COMMIT_INTERVAL   5000
32
#define DEFAULT_HEARTBEAT_INTERVAL     3000
33
#define DEFAULT_ASKEP_INTERVAL         1000
34
#define DEFAULT_COMMIT_CNT             1
35
#define SUBSCRIBE_RETRY_MAX_COUNT      240
36
#define SUBSCRIBE_RETRY_INTERVAL       500
37

38

39
#define SET_ERROR_MSG_TMQ(MSG) \
40
  if (errstr != NULL && errstrLen > 0) (void)snprintf(errstr, errstrLen, MSG);
41

42
#define PROCESS_POLL_RSP(FUNC,DATA) \
43
  SDecoder decoder = {0}; \
44
  tDecoderInit(&decoder, POINTER_SHIFT(pRspWrapper->pollRsp.data, sizeof(SMqRspHead)), pRspWrapper->pollRsp.len - sizeof(SMqRspHead)); \
45
  if (FUNC(&decoder, DATA) < 0) { \
46
    tDecoderClear(&decoder); \
47
    code = terrno; \
48
    goto END;\
49
  }\
50
  tDecoderClear(&decoder);\
51
  (void)memcpy(DATA, pRspWrapper->pollRsp.data, sizeof(SMqRspHead));
52

53
#define DELETE_POLL_RSP(FUNC,DATA) \
54
  SMqPollRspWrapper* pRsp = &rspWrapper->pollRsp;\
55
  taosMemoryFreeClear(pRsp->pEpset);             \
56
  taosMemoryFreeClear(pRsp->data);               \
57
  FUNC(DATA);
58

59
enum {
60
  TMQ_VG_STATUS__IDLE = 0,
61
  TMQ_VG_STATUS__WAIT,
62
};
63

64
enum {
65
  TMQ_CONSUMER_STATUS__INIT = 0,
66
  TMQ_CONSUMER_STATUS__READY,
67
  TMQ_CONSUMER_STATUS__LOST,
68
  TMQ_CONSUMER_STATUS__CLOSED,
69
};
70

71
enum {
72
  TMQ_DELAYED_TASK__ASK_EP = 1,
73
  TMQ_DELAYED_TASK__COMMIT,
74
};
75

76
typedef struct {
77
  tmr_h               timer;
78
  int32_t             rsetId;
79
  TdThreadMutex       lock;
80
} SMqMgmt;
81

82
struct tmq_list_t {
83
  SArray container;
84
};
85

86
struct tmq_conf_t {
87
  char           clientId[TSDB_CLIENT_ID_LEN];
88
  char           groupId[TSDB_CGROUP_LEN];
89
  int8_t         autoCommit;
90
  int8_t         resetOffset;
91
  int8_t         withTbName;
92
  int8_t         snapEnable;
93
  int8_t         replayEnable;
94
  int8_t         sourceExcluded;  // do not consume, bit
95
  int8_t         rawData;         // fetch raw data
96
  int32_t        maxPollWaitTime;
97
  int32_t        minPollRows;
98
  uint16_t       port;
99
  int32_t        autoCommitInterval;
100
  int32_t        sessionTimeoutMs;
101
  int32_t        heartBeatIntervalMs;
102
  int32_t        maxPollIntervalMs;
103
  char*          ip;
104
  char*          user;
105
  char*          pass;
106
  tmq_commit_cb* commitCb;
107
  void*          commitCbUserParam;
108
  int8_t         enableBatchMeta;
109
};
110

111
struct tmq_t {
112
  int64_t        refId;
113
  char           groupId[TSDB_CGROUP_LEN];
114
  char           clientId[TSDB_CLIENT_ID_LEN];
115
  char           user[TSDB_USER_LEN];
116
  char           fqdn[TSDB_FQDN_LEN];
117
  int8_t         withTbName;
118
  int8_t         useSnapshot;
119
  int8_t         autoCommit;
120
  int32_t        autoCommitInterval;
121
  int32_t        sessionTimeoutMs;
122
  int32_t        heartBeatIntervalMs;
123
  int32_t        maxPollIntervalMs;
124
  int8_t         resetOffsetCfg;
125
  int8_t         replayEnable;
126
  int8_t         sourceExcluded;  // do not consume, bit
127
  int8_t         rawData;         // fetch raw data
128
  int32_t        maxPollWaitTime;
129
  int32_t        minPollRows;
130
  int64_t        consumerId;
131
  tmq_commit_cb* commitCb;
132
  void*          commitCbUserParam;
133
  int8_t         enableBatchMeta;
134

135
  // status
136
  SRWLatch lock;
137
  int8_t   status;
138
  int32_t  epoch;
139
  // poll info
140
  int64_t pollCnt;
141
  int64_t totalRows;
142
  int8_t  pollFlag;
143

144
  // timer
145
  tmr_h       hbLiveTimer;
146
  tmr_h       epTimer;
147
  tmr_h       commitTimer;
148
  STscObj*    pTscObj;       // connection
149
  SArray*     clientTopics;  // SArray<SMqClientTopic>
150
  STaosQueue* mqueue;        // queue of rsp
151
  STaosQueue* delayedTask;  // delayed task queue for heartbeat and auto commit
152
  tsem2_t     rspSem;
153
};
154

155
typedef struct {
156
  int32_t code;
157
  tsem2_t sem;
158
} SAskEpInfo;
159

160
typedef struct {
161
  STqOffsetVal committedOffset;
162
  STqOffsetVal endOffset;    // the last version in TAOS_RES + 1
163
  STqOffsetVal beginOffset;  // the first version in TAOS_RES
164
  int64_t      walVerBegin;
165
  int64_t      walVerEnd;
166
} SVgOffsetInfo;
167

168
typedef struct {
169
  int64_t       pollCnt;
170
  int64_t       numOfRows;
171
  SVgOffsetInfo offsetInfo;
172
  int32_t       vgId;
173
  int32_t       vgStatus;
174
  int32_t       vgSkipCnt;            // here used to mark the slow vgroups
175
  int64_t       blockReceiveTs;       // once empty block is received, idle for ignoreCnt then start to poll data
176
  int64_t       blockSleepForReplay;  // once empty block is received, idle for ignoreCnt then start to poll data
177
  bool          seekUpdated;          // offset is updated by seek operator, therefore, not update by vnode rsp.
178
  SEpSet        epSet;
179
} SMqClientVg;
180

181
typedef struct {
182
  char           topicName[TSDB_TOPIC_FNAME_LEN];
183
  char           db[TSDB_DB_FNAME_LEN];
184
  SArray*        vgs;  // SArray<SMqClientVg>
185
  SSchemaWrapper schema;
186
  int8_t         noPrivilege;
187
} SMqClientTopic;
188

189
typedef struct {
190
  int32_t         vgId;
191
  char            topicName[TSDB_TOPIC_FNAME_LEN];
192
  SMqClientTopic* topicHandle;
193
  uint64_t        reqId;
194
  SEpSet*         pEpset;
195
  void*           data;
196
  uint32_t        len;
197
  union {
198
    struct{
199
      SMqRspHead   head;
200
      STqOffsetVal rspOffset;
201
    };
202
    SMqDataRsp      dataRsp;
203
    SMqMetaRsp      metaRsp;
204
    SMqBatchMetaRsp batchMetaRsp;
205
  };
206
} SMqPollRspWrapper;
207

208
typedef struct {
209
  int32_t code;
210
  int8_t  tmqRspType;
211
  int32_t epoch;
212
  union{
213
    SMqPollRspWrapper pollRsp;
214
    SMqAskEpRsp       epRsp;
215
  };
216
} SMqRspWrapper;
217

218
typedef struct {
219
  tsem2_t rspSem;
220
  int32_t rspErr;
221
} SMqSubscribeCbParam;
222

223
typedef struct {
224
  int64_t refId;
225
  bool    sync;
226
  void*   pParam;
227
} SMqAskEpCbParam;
228

229
typedef struct {
230
  int64_t  refId;
231
  char     topicName[TSDB_TOPIC_FNAME_LEN];
232
  int32_t  vgId;
233
  uint64_t requestId;  // request id for debug purpose
234
} SMqPollCbParam;
235

236
typedef struct {
237
  tsem2_t       rsp;
238
  int32_t       numOfRsp;
239
  SArray*       pList;
240
  TdThreadMutex mutex;
241
  int64_t       consumerId;
242
  char*         pTopicName;
243
  int32_t       code;
244
} SMqVgCommon;
245

246
typedef struct {
247
  tsem2_t sem;
248
  int32_t code;
249
} SMqSeekParam;
250

251
typedef struct {
252
  tsem2_t     sem;
253
  int32_t     code;
254
  SMqVgOffset vgOffset;
255
} SMqCommittedParam;
256

257
typedef struct {
258
  int32_t      vgId;
259
  int32_t      epoch;
260
  int32_t      totalReq;
261
  SMqVgCommon* pCommon;
262
} SMqVgWalInfoParam;
263

264
typedef struct {
265
  int64_t        refId;
266
  int32_t        epoch;
267
  int32_t        waitingRspNum;
268
  int32_t        code;
269
  tmq_commit_cb* callbackFn;
270
  void*          userParam;
271
} SMqCommitCbParamSet;
272

273
typedef struct {
274
  SMqCommitCbParamSet* params;
275
  char                 topicName[TSDB_TOPIC_FNAME_LEN];
276
  int32_t              vgId;
277
  int64_t              consumerId;
278
} SMqCommitCbParam;
279

280
typedef struct {
281
  tsem2_t sem;
282
  int32_t code;
283
} SSyncCommitInfo;
284

285
typedef struct {
286
  STqOffsetVal currentOffset;
287
  STqOffsetVal commitOffset;
288
  STqOffsetVal seekOffset;
289
  int64_t      numOfRows;
290
  int32_t      vgStatus;
291
} SVgroupSaveInfo;
292

293
static   TdThreadOnce   tmqInit = PTHREAD_ONCE_INIT;  // initialize only once
294
volatile int32_t        tmqInitRes = 0;               // initialize rsp code
295
static   SMqMgmt        tmqMgmt = {0};
296

297
tmq_conf_t* tmq_conf_new() {
58✔
298
  tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
58✔
299
  if (conf == NULL) {
58✔
300
    return conf;
×
301
  }
302

303
  conf->withTbName = false;
58✔
304
  conf->autoCommit = true;
58✔
305
  conf->autoCommitInterval = DEFAULT_AUTO_COMMIT_INTERVAL;
58✔
306
  conf->resetOffset = TMQ_OFFSET__RESET_LATEST;
58✔
307
  conf->enableBatchMeta = false;
58✔
308
  conf->heartBeatIntervalMs = DEFAULT_HEARTBEAT_INTERVAL;
58✔
309
  conf->maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL;
58✔
310
  conf->sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT;
58✔
311
  conf->maxPollWaitTime = DEFAULT_MAX_POLL_WAIT_TIME;
58✔
312
  conf->minPollRows = DEFAULT_MIN_POLL_ROWS;
58✔
313

314
  return conf;
58✔
315
}
316

317
void tmq_conf_destroy(tmq_conf_t* conf) {
58✔
318
  if (conf) {
58✔
319
    if (conf->ip) {
58✔
320
      taosMemoryFree(conf->ip);
13✔
321
    }
322
    if (conf->user) {
58✔
323
      taosMemoryFree(conf->user);
55✔
324
    }
325
    if (conf->pass) {
58✔
326
      taosMemoryFree(conf->pass);
55✔
327
    }
328
    taosMemoryFree(conf);
58✔
329
  }
330
}
58✔
331

332
tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
358✔
333
  int32_t code = 0;
358✔
334
  if (conf == NULL || key == NULL || value == NULL) {
358✔
335
    tqErrorC("tmq_conf_set null, conf:%p key:%p value:%p", conf, key, value);
×
336
    return TMQ_CONF_INVALID;
×
337
  }
338
  if (strcasecmp(key, "group.id") == 0) {
358✔
339
    tstrncpy(conf->groupId, value, TSDB_CGROUP_LEN);
58✔
340
    return TMQ_CONF_OK;
58✔
341
  }
342

343
  if (strcasecmp(key, "client.id") == 0) {
300✔
344
    tstrncpy(conf->clientId, value, TSDB_CLIENT_ID_LEN);
13✔
345
    return TMQ_CONF_OK;
13✔
346
  }
347

348
  if (strcasecmp(key, "enable.auto.commit") == 0) {
287✔
349
    if (strcasecmp(value, "true") == 0) {
59✔
350
      conf->autoCommit = true;
50✔
351
      return TMQ_CONF_OK;
50✔
352
    } else if (strcasecmp(value, "false") == 0) {
9✔
353
      conf->autoCommit = false;
9✔
354
      return TMQ_CONF_OK;
9✔
355
    } else {
356
      tqErrorC("invalid value for enable.auto.commit:%s", value);
×
357
      return TMQ_CONF_INVALID;
×
358
    }
359
  }
360

361
  if (strcasecmp(key, "auto.commit.interval.ms") == 0) {
228✔
362
    int64_t tmp;
363
    code = taosStr2int64(value, &tmp);
26✔
364
    if (tmp < 0 || code != 0) {
26✔
365
      tqErrorC("invalid value for auto.commit.interval.ms:%s", value);
×
366
      return TMQ_CONF_INVALID;
×
367
    }
368
    conf->autoCommitInterval = (tmp > INT32_MAX ? INT32_MAX : tmp);
26✔
369
    return TMQ_CONF_OK;
26✔
370
  }
371

372
  if (strcasecmp(key, "session.timeout.ms") == 0) {
202✔
373
    int64_t tmp;
374
    code = taosStr2int64(value, &tmp);
2✔
375
    if (tmp < 6000 || tmp > 1800000 || code != 0) {
2✔
376
      tqErrorC("invalid value for session.timeout.ms:%s", value);
×
377
      return TMQ_CONF_INVALID;
×
378
    }
379
    conf->sessionTimeoutMs = tmp;
2✔
380
    return TMQ_CONF_OK;
2✔
381
  }
382

383
  if (strcasecmp(key, "heartbeat.interval.ms") == 0) {
200✔
384
    int64_t tmp;
385
    code = taosStr2int64(value, &tmp);
×
386
    if (tmp < 1000 || tmp >= conf->sessionTimeoutMs || code != 0) {
×
387
      tqErrorC("invalid value for heartbeat.interval.ms:%s", value);
×
388
      return TMQ_CONF_INVALID;
×
389
    }
390
    conf->heartBeatIntervalMs = tmp;
×
391
    return TMQ_CONF_OK;
×
392
  }
393

394
  if (strcasecmp(key, "max.poll.interval.ms") == 0) {
200✔
395
    int32_t tmp;
396
    code = taosStr2int32(value, &tmp);
2✔
397
    if (tmp < 1000 || code != 0) {
2✔
398
      tqErrorC("invalid value for max.poll.interval.ms:%s", value);
×
399
      return TMQ_CONF_INVALID;
×
400
    }
401
    conf->maxPollIntervalMs = tmp;
2✔
402
    return TMQ_CONF_OK;
2✔
403
  }
404

405
  if (strcasecmp(key, "auto.offset.reset") == 0) {
198✔
406
    if (strcasecmp(value, "none") == 0) {
26✔
407
      conf->resetOffset = TMQ_OFFSET__RESET_NONE;
×
408
      return TMQ_CONF_OK;
×
409
    } else if (strcasecmp(value, "earliest") == 0) {
26✔
410
      conf->resetOffset = TMQ_OFFSET__RESET_EARLIEST;
21✔
411
      return TMQ_CONF_OK;
21✔
412
    } else if (strcasecmp(value, "latest") == 0) {
5✔
413
      conf->resetOffset = TMQ_OFFSET__RESET_LATEST;
5✔
414
      return TMQ_CONF_OK;
5✔
415
    } else {
416
      tqErrorC("invalid value for auto.offset.reset:%s", value);
×
417
      return TMQ_CONF_INVALID;
×
418
    }
419
  }
420

421
  if (strcasecmp(key, "msg.with.table.name") == 0) {
172✔
422
    if (strcasecmp(value, "true") == 0) {
25✔
423
      conf->withTbName = true;
19✔
424
      return TMQ_CONF_OK;
19✔
425
    } else if (strcasecmp(value, "false") == 0) {
6✔
426
      conf->withTbName = false;
6✔
427
      return TMQ_CONF_OK;
6✔
428
    } else {
429
      tqErrorC("invalid value for msg.with.table.name:%s", value);
×
430
      return TMQ_CONF_INVALID;
×
431
    }
432
  }
433

434
  if (strcasecmp(key, "experimental.snapshot.enable") == 0) {
147✔
435
    if (strcasecmp(value, "true") == 0) {
12✔
436
      conf->snapEnable = true;
7✔
437
      return TMQ_CONF_OK;
7✔
438
    } else if (strcasecmp(value, "false") == 0) {
5✔
439
      conf->snapEnable = false;
5✔
440
      return TMQ_CONF_OK;
5✔
441
    } else {
442
      tqErrorC("invalid value for experimental.snapshot.enable:%s", value);
×
443
      return TMQ_CONF_INVALID;
×
444
    }
445
  }
446

447
  if (strcasecmp(key, "td.connect.ip") == 0) {
135✔
448
    void *tmp = taosStrdup(value);
13✔
449
    if (tmp == NULL) {
13✔
450
      tqErrorC("tmq_conf_set out of memory:%d", terrno);
×
451
      return TMQ_CONF_INVALID;
×
452
    }
453
    conf->ip = tmp;
13✔
454
    return TMQ_CONF_OK;
13✔
455
  }
456

457
  if (strcasecmp(key, "td.connect.user") == 0) {
122✔
458
    void *tmp = taosStrdup(value);
55✔
459
    if (tmp == NULL) {
55✔
460
      tqErrorC("tmq_conf_set out of memory:%d", terrno);
×
461
      return TMQ_CONF_INVALID;
×
462
    }
463
    conf->user = tmp;
55✔
464
    return TMQ_CONF_OK;
55✔
465
  }
466

467
  if (strcasecmp(key, "td.connect.pass") == 0) {
67✔
468
    void *tmp = taosStrdup(value);
55✔
469
    if (tmp == NULL) {
55✔
470
      tqErrorC("tmq_conf_set out of memory:%d", terrno);
×
471
      return TMQ_CONF_INVALID;
×
472
    }
473
    conf->pass = tmp;
55✔
474
    return TMQ_CONF_OK;
55✔
475
  }
476

477
  if (strcasecmp(key, "td.connect.port") == 0) {
12✔
478
    int64_t tmp;
479
    code = taosStr2int64(value, &tmp);
11✔
480
    if (tmp <= 0 || tmp > 65535 || code != 0) {
11✔
481
      tqErrorC("invalid value for td.connect.port:%s", value);
5✔
482
      return TMQ_CONF_INVALID;
5✔
483
    }
484

485
    conf->port = tmp;
6✔
486
    return TMQ_CONF_OK;
6✔
487
  }
488

489
  if (strcasecmp(key, "enable.replay") == 0) {
1✔
490
    if (strcasecmp(value, "true") == 0) {
×
491
      conf->replayEnable = true;
×
492
      return TMQ_CONF_OK;
×
493
    } else if (strcasecmp(value, "false") == 0) {
×
494
      conf->replayEnable = false;
×
495
      return TMQ_CONF_OK;
×
496
    } else {
497
      tqErrorC("invalid value for enable.replay:%s", value);
×
498
      return TMQ_CONF_INVALID;
×
499
    }
500
  }
501
  if (strcasecmp(key, "msg.consume.excluded") == 0) {
1✔
502
    int64_t tmp = 0;
×
503
    code = taosStr2int64(value, &tmp);
×
504
    conf->sourceExcluded = (0 == code && tmp != 0) ? TD_REQ_FROM_TAOX : 0;
×
505
    return TMQ_CONF_OK;
×
506
  }
507
  if (strcasecmp(key, "msg.consume.rawdata") == 0) {
1✔
508
    int64_t tmp = 0;
×
509
    code = taosStr2int64(value, &tmp);
×
510
    conf->rawData = (0 == code && tmp != 0) ? 1 : 0;
×
511
    return TMQ_CONF_OK;
×
512
  }
513

514
  if (strcasecmp(key, "fetch.max.wait.ms") == 0) {
1✔
515
    int64_t tmp = 0;
×
516
    code = taosStr2int64(value, &tmp);
×
517
    if (tmp <= 0 || tmp > INT32_MAX || code != 0) {
×
518
      tqErrorC("invalid value for fetch.max.wait.ms:%s", value);
×
519
      return TMQ_CONF_INVALID;
×
520
    }
521
    conf->maxPollWaitTime = tmp;
×
522
    return TMQ_CONF_OK;
×
523
  }
524

525
  if (strcasecmp(key, "min.poll.rows") == 0) {
1✔
526
    int64_t tmp = 0;
×
527
    code = taosStr2int64(value, &tmp);
×
528
    if (tmp <= 0 || tmp > INT32_MAX || code != 0) {
×
529
      tqErrorC("invalid value for min.poll.rows:%s", value);
×
530
      return TMQ_CONF_INVALID;
×
531
    }
532
    conf->minPollRows = tmp;
×
533
    return TMQ_CONF_OK;
×
534
  }
535

536
  if (strcasecmp(key, "td.connect.db") == 0) {
1✔
537
    return TMQ_CONF_OK;
×
538
  }
539

540
  if (strcasecmp(key, "msg.enable.batchmeta") == 0) {
1✔
541
    int64_t tmp;
542
    code = taosStr2int64(value, &tmp);
×
543
    conf->enableBatchMeta = (0 == code && tmp != 0) ? true : false;
×
544
    return TMQ_CONF_OK;
×
545
  }
546

547
  tqErrorC("unknown key:%s", key);
1✔
548
  return TMQ_CONF_UNKNOWN;
1✔
549
}
550

551
tmq_list_t* tmq_list_new() {
165✔
552
  return (tmq_list_t*)taosArrayInit(0, sizeof(void*));
165✔
553
}
554

555
int32_t tmq_list_append(tmq_list_t* list, const char* src) {
70✔
556
  if (list == NULL) {
70✔
557
    return TSDB_CODE_INVALID_PARA;
×
558
  }
559
  SArray* container = &list->container;
70✔
560
  if (src == NULL || src[0] == 0) {
70✔
561
    return TSDB_CODE_INVALID_PARA;
×
562
  }
563
  char* topic = taosStrdup(src);
70✔
564
  if (topic == NULL) return terrno;
70✔
565
  if (taosArrayPush(container, &topic) == NULL) {
70✔
566
    taosMemoryFree(topic);
×
567
    return terrno;
×
568
  }
569
  return 0;
70✔
570
}
571

572
void tmq_list_destroy(tmq_list_t* list) {
165✔
573
  if (list == NULL) return;
165✔
574
  SArray* container = &list->container;
165✔
575
  taosArrayDestroyP(container, NULL);
165✔
576
}
577

578
int32_t tmq_list_get_size(const tmq_list_t* list) {
5✔
579
  if (list == NULL) {
5✔
580
    return TSDB_CODE_INVALID_PARA;
×
581
  }
582
  const SArray* container = &list->container;
5✔
583
  return taosArrayGetSize(container);
5✔
584
}
585

586
char** tmq_list_to_c_array(const tmq_list_t* list) {
5✔
587
  if (list == NULL) {
5✔
588
    return NULL;
×
589
  }
590
  const SArray* container = &list->container;
5✔
591
  return container->pData;
5✔
592
}
593

594
static int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) {
513✔
595
  if (pParamSet == NULL) {
513✔
596
    return TSDB_CODE_INVALID_PARA;
×
597
  }
598
  int64_t refId = pParamSet->refId;
513✔
599
  int32_t code = 0;
513✔
600
  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
513✔
601
  if (tmq == NULL) {
513✔
602
    code = TSDB_CODE_TMQ_CONSUMER_CLOSED;
×
603
  }
604

605
  // if no more waiting rsp
606
  if (pParamSet->callbackFn != NULL) {
513✔
607
    pParamSet->callbackFn(tmq, pParamSet->code, pParamSet->userParam);
513✔
608
  }
609

610
  taosMemoryFree(pParamSet);
513✔
611
  if (tmq != NULL) {
513✔
612
    code = taosReleaseRef(tmqMgmt.rsetId, refId);
513✔
613
  }
614

615
  return code;
513✔
616
}
617

618
static int32_t commitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId) {
929✔
619
  if (pParamSet == NULL) {
929✔
620
    return TSDB_CODE_INVALID_PARA;
×
621
  }
622
  int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
929✔
623
  if (waitingRspNum == 0) {
929✔
624
    tqDebugC("consumer:0x%" PRIx64 " topic:%s vgId:%d all commit-rsp received, commit completed", consumerId, pTopic,
513✔
625
             vgId);
626
    return tmqCommitDone(pParamSet);
513✔
627
  } else {
628
    tqDebugC("consumer:0x%" PRIx64 " topic:%s vgId:%d commit-rsp received, remain:%d", consumerId, pTopic, vgId,
416✔
629
             waitingRspNum);
630
  }
631
  return 0;
416✔
632
}
633

634
static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
470✔
635
  if (pBuf){
470✔
636
    taosMemoryFreeClear(pBuf->pData);
470✔
637
    taosMemoryFreeClear(pBuf->pEpSet);
470✔
638
  }
639
  if(param == NULL){
470✔
640
    return TSDB_CODE_INVALID_PARA;
×
641
  }
642
  SMqCommitCbParam*    pParam = (SMqCommitCbParam*)param;
470✔
643
  SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params;
470✔
644

645
  return commitRspCountDown(pParamSet, pParam->consumerId, pParam->topicName, pParam->vgId);
470✔
646
}
647

648
static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffsetVal* offset, const char* pTopicName,
470✔
649
                               SMqCommitCbParamSet* pParamSet) {
650
  if (tmq == NULL || epSet == NULL || offset == NULL || pTopicName == NULL || pParamSet == NULL) {
470✔
651
    return TSDB_CODE_INVALID_PARA;
×
652
  }
653
  SMqVgOffset pOffset = {0};
470✔
654

655
  pOffset.consumerId = tmq->consumerId;
470✔
656
  pOffset.offset.val = *offset;
470✔
657
  (void)snprintf(pOffset.offset.subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", tmq->groupId, TMQ_SEPARATOR, pTopicName);
470✔
658
  int32_t len = 0;
470✔
659
  int32_t code = 0;
470✔
660
  tEncodeSize(tEncodeMqVgOffset, &pOffset, len, code);
470✔
661
  if (code < 0) {
470✔
662
    return TSDB_CODE_INVALID_PARA;
×
663
  }
664

665
  void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
470✔
666
  if (buf == NULL) {
470✔
667
    return terrno;
×
668
  }
669

670
  ((SMsgHead*)buf)->vgId = htonl(vgId);
470✔
671

672
  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
470✔
673

674
  SEncoder encoder = {0};
470✔
675
  tEncoderInit(&encoder, abuf, len);
470✔
676
  if (tEncodeMqVgOffset(&encoder, &pOffset) < 0) {
470✔
677
    tEncoderClear(&encoder);
×
678
    taosMemoryFree(buf);
×
679
    return TSDB_CODE_INVALID_PARA;
×
680
  }
681
  tEncoderClear(&encoder);
470✔
682

683
  // build param
684
  SMqCommitCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam));
470✔
685
  if (pParam == NULL) {
470✔
686
    taosMemoryFree(buf);
×
687
    return terrno;
×
688
  }
689

690
  pParam->params = pParamSet;
470✔
691
  pParam->vgId = vgId;
470✔
692
  pParam->consumerId = tmq->consumerId;
470✔
693

694
  tstrncpy(pParam->topicName, pTopicName, tListLen(pParam->topicName));
470✔
695

696
  // build send info
697
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
470✔
698
  if (pMsgSendInfo == NULL) {
470✔
699
    taosMemoryFree(buf);
×
700
    taosMemoryFree(pParam);
×
701
    return terrno;
×
702
  }
703

704
  pMsgSendInfo->msgInfo = (SDataBuf){.pData = buf, .len = sizeof(SMsgHead) + len, .handle = NULL};
470✔
705

706
  pMsgSendInfo->requestId = generateRequestId();
470✔
707
  pMsgSendInfo->requestObjRefId = 0;
470✔
708
  pMsgSendInfo->param = pParam;
470✔
709
  pMsgSendInfo->paramFreeFp = taosAutoMemoryFree;
470✔
710
  pMsgSendInfo->fp = tmqCommitCb;
470✔
711
  pMsgSendInfo->msgType = TDMT_VND_TMQ_COMMIT_OFFSET;
470✔
712

713
  // int64_t transporterId = 0;
714
  (void)atomic_add_fetch_32(&pParamSet->waitingRspNum, 1);
470✔
715
  code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, NULL, pMsgSendInfo);
470✔
716
  if (code != 0) {
470✔
717
    (void)atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
×
718
  }
719
  return code;
470✔
720
}
721

722
static int32_t getTopicByName(tmq_t* tmq, const char* pTopicName, SMqClientTopic** topic) {
80✔
723
  if (tmq == NULL || pTopicName == NULL || topic == NULL) {
80✔
724
    return TSDB_CODE_INVALID_PARA;
×
725
  }
726
  int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
80✔
727
  for (int32_t i = 0; i < numOfTopics; ++i) {
80✔
728
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
77✔
729
    if (pTopic == NULL || strcmp(pTopic->topicName, pTopicName) != 0) {
77✔
730
      continue;
×
731
    }
732
    *topic = pTopic;
77✔
733
    return 0;
77✔
734
  }
735

736
  tqErrorC("consumer:0x%" PRIx64 ", total:%d, failed to find topic:%s", tmq->consumerId, numOfTopics, pTopicName);
3✔
737
  return TSDB_CODE_TMQ_INVALID_TOPIC;
3✔
738
}
739

740
static int32_t prepareCommitCbParamSet(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam, int32_t rspNum,
514✔
741
                                       SMqCommitCbParamSet** ppParamSet) {
742
  if (tmq == NULL || ppParamSet == NULL) {
514✔
743
    return TSDB_CODE_INVALID_PARA;
×
744
  }
745
  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
514✔
746
  if (pParamSet == NULL) {
514✔
747
    return terrno;
×
748
  }
749

750
  pParamSet->refId = tmq->refId;
514✔
751
  pParamSet->epoch = tmq->epoch;
514✔
752
  pParamSet->callbackFn = pCommitFp;
514✔
753
  pParamSet->userParam = userParam;
514✔
754
  pParamSet->waitingRspNum = rspNum;
514✔
755
  *ppParamSet = pParamSet;
514✔
756
  return 0;
514✔
757
}
758

759
static int32_t getClientVg(tmq_t* tmq, char* pTopicName, int32_t vgId, SMqClientVg** pVg) {
68✔
760
  if (tmq == NULL || pTopicName == NULL || pVg == NULL) {
68✔
761
    return TSDB_CODE_INVALID_PARA;
×
762
  }
763
  SMqClientTopic* pTopic = NULL;
68✔
764
  int32_t         code = getTopicByName(tmq, pTopicName, &pTopic);
68✔
765
  if (code != 0) {
68✔
766
    tqErrorC("consumer:0x%" PRIx64 " invalid topic name:%s", tmq->consumerId, pTopicName);
×
767
    return code;
×
768
  }
769

770
  int32_t numOfVgs = taosArrayGetSize(pTopic->vgs);
68✔
771
  for (int32_t i = 0; i < numOfVgs; ++i) {
129✔
772
    SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i);
129✔
773
    if (pClientVg && pClientVg->vgId == vgId) {
129✔
774
      *pVg = pClientVg;
68✔
775
      break;
68✔
776
    }
777
  }
778

779
  return *pVg == NULL ? TSDB_CODE_TMQ_INVALID_VGID : TSDB_CODE_SUCCESS;
68✔
780
}
781

782
static int32_t innerCommit(tmq_t* tmq, char* pTopicName, STqOffsetVal* offsetVal, SMqClientVg* pVg, SMqCommitCbParamSet* pParamSet){
1,076✔
783
  if (tmq == NULL || pTopicName == NULL || offsetVal == NULL || pVg == NULL || pParamSet == NULL) {
1,076✔
784
    return TSDB_CODE_INVALID_PARA;
×
785
  }
786
  int32_t code = 0;
1,076✔
787
  if (offsetVal->type <= 0) {
1,076✔
788
    code = TSDB_CODE_TMQ_INVALID_MSG;
35✔
789
    return code;
35✔
790
  }
791
  if (tOffsetEqual(offsetVal, &pVg->offsetInfo.committedOffset)) {
1,041✔
792
    code = TSDB_CODE_TMQ_SAME_COMMITTED_VALUE;
571✔
793
    return code;
571✔
794
  }
795
  char offsetBuf[TSDB_OFFSET_LEN] = {0};
470✔
796
  tFormatOffset(offsetBuf, tListLen(offsetBuf), offsetVal);
470✔
797

798
  char commitBuf[TSDB_OFFSET_LEN] = {0};
470✔
799
  tFormatOffset(commitBuf, tListLen(commitBuf), &pVg->offsetInfo.committedOffset);
470✔
800

801
  code = doSendCommitMsg(tmq, pVg->vgId, &pVg->epSet, offsetVal, pTopicName, pParamSet);
470✔
802
  if (code != TSDB_CODE_SUCCESS) {
470✔
803
    tqErrorC("consumer:0x%" PRIx64 " topic:%s on vgId:%d end commit msg failed, send offset:%s committed:%s, code:%s",
×
804
             tmq->consumerId, pTopicName, pVg->vgId, offsetBuf, commitBuf, tstrerror(terrno));
805
    return code;
×
806
  }
807

808
  tqDebugC("consumer:0x%" PRIx64 " topic:%s on vgId:%d send commit msg success, send offset:%s committed:%s",
470✔
809
           tmq->consumerId, pTopicName, pVg->vgId, offsetBuf, commitBuf);
810
  tOffsetCopy(&pVg->offsetInfo.committedOffset, offsetVal);
470✔
811
  return code;
470✔
812
}
813

814
static int32_t asyncCommitOffset(tmq_t* tmq, char* pTopicName, int32_t vgId, STqOffsetVal* offsetVal,
55✔
815
                                 tmq_commit_cb* pCommitFp, void* userParam) {
816
  if (tmq == NULL || pTopicName == NULL || offsetVal == NULL) {
55✔
817
    return TSDB_CODE_INVALID_PARA;
×
818
  }
819
  tqInfoC("consumer:0x%" PRIx64 " do manual commit offset for %s, vgId:%d", tmq->consumerId, pTopicName, vgId);
55✔
820
  SMqCommitCbParamSet* pParamSet = NULL;
55✔
821
  int32_t code = prepareCommitCbParamSet(tmq, pCommitFp, userParam, 0, &pParamSet);
55✔
822
  if (code != 0){
55✔
823
    return code;
×
824
  }
825

826
  taosRLockLatch(&tmq->lock);
55✔
827
  SMqClientVg* pVg = NULL;
55✔
828
  code = getClientVg(tmq, pTopicName, vgId, &pVg);
55✔
829
  if (code == 0) {
55✔
830
    code = innerCommit(tmq, pTopicName, offsetVal, pVg, pParamSet);
55✔
831
  }
832
  taosRUnLockLatch(&tmq->lock);
55✔
833

834
  if (code != 0){
55✔
835
    taosMemoryFree(pParamSet);
1✔
836
  }
837
  return code;
55✔
838
}
839

840
static void asyncCommitFromResult(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* pCommitFp, void* userParam) {
53✔
841
  char*        pTopicName = NULL;
53✔
842
  int32_t      vgId = 0;
53✔
843
  STqOffsetVal offsetVal = {0};
53✔
844
  int32_t      code = 0;
53✔
845

846
  if (pRes == NULL || tmq == NULL) {
53✔
847
    code = TSDB_CODE_INVALID_PARA;
×
848
    goto end;
×
849
  }
850

851
  if (TD_RES_TMQ(pRes) || TD_RES_TMQ_RAW(pRes) || TD_RES_TMQ_META(pRes) ||
53✔
852
      TD_RES_TMQ_METADATA(pRes) || TD_RES_TMQ_BATCH_META(pRes)) {
×
853
    SMqRspObj* pRspObj = (SMqRspObj*)pRes;
53✔
854
    pTopicName = pRspObj->topic;
53✔
855
    vgId = pRspObj->vgId;
53✔
856
    offsetVal = pRspObj->rspOffset;
53✔
857
  } else {
858
    code = TSDB_CODE_TMQ_INVALID_MSG;
×
859
    goto end;
×
860
  }
861

862
  code = asyncCommitOffset(tmq, pTopicName, vgId, &offsetVal, pCommitFp, userParam);
53✔
863

864
  end:
53✔
865
  if (code != TSDB_CODE_SUCCESS && pCommitFp != NULL) {
53✔
866
    if (code == TSDB_CODE_TMQ_SAME_COMMITTED_VALUE) code = TSDB_CODE_SUCCESS;
×
867
    pCommitFp(tmq, code, userParam);
×
868
  }
869
}
53✔
870

871
static int32_t innerCommitAll(tmq_t* tmq, SMqCommitCbParamSet* pParamSet){
459✔
872
  if (tmq == NULL || pParamSet == NULL) {
459✔
873
    return TSDB_CODE_INVALID_PARA;
×
874
  }
875
  int32_t code = 0;
459✔
876
  taosRLockLatch(&tmq->lock);
459✔
877
  int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
459✔
878
  tqDebugC("consumer:0x%" PRIx64 " start to commit offset for %d topics", tmq->consumerId, numOfTopics);
459✔
879

880
  for (int32_t i = 0; i < numOfTopics; i++) {
906✔
881
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
447✔
882
    if (pTopic == NULL) {
447✔
883
      code = TSDB_CODE_TMQ_INVALID_TOPIC;
×
884
      goto END;
×
885
    }
886
    int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs);
447✔
887
    tqDebugC("consumer:0x%" PRIx64 " commit offset for topics:%s, numOfVgs:%d", tmq->consumerId, pTopic->topicName, numOfVgroups);
447✔
888
    for (int32_t j = 0; j < numOfVgroups; j++) {
1,468✔
889
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
1,021✔
890
      if (pVg == NULL) {
1,021✔
891
        code = terrno;
×
892
        goto END;
×
893
      }
894

895
      code = innerCommit(tmq, pTopic->topicName, &pVg->offsetInfo.endOffset, pVg, pParamSet);
1,021✔
896
      if (code != 0 && code != TSDB_CODE_TMQ_SAME_COMMITTED_VALUE){
1,021✔
897
        tqErrorC("consumer:0x%" PRIx64 " topic:%s vgId:%d, no commit, code:%s, current offset version:%" PRId64 ", ordinal:%d/%d",
35✔
898
                 tmq->consumerId, pTopic->topicName, pVg->vgId, tstrerror(code), pVg->offsetInfo.endOffset.version, j + 1, numOfVgroups);
899
      }
900
    }
901
  }
902
  tqDebugC("consumer:0x%" PRIx64 " total commit:%d for %d topics", tmq->consumerId, pParamSet->waitingRspNum - DEFAULT_COMMIT_CNT,
459✔
903
           numOfTopics);
904
  END:
366✔
905
  taosRUnLockLatch(&tmq->lock);
459✔
906
  return code;
459✔
907
}
908

909
static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam) {
459✔
910
  if (tmq == NULL) {
459✔
911
    return;
×
912
  }
913
  int32_t code = 0;
459✔
914
  SMqCommitCbParamSet* pParamSet = NULL;
459✔
915
  // init waitingRspNum as DEFAULT_COMMIT_CNT to prevent concurrency issue
916
  code = prepareCommitCbParamSet(tmq, pCommitFp, userParam, DEFAULT_COMMIT_CNT, &pParamSet);
459✔
917
  if (code != 0) {
459✔
918
    tqErrorC("consumer:0x%" PRIx64 " prepareCommitCbParamSet failed, code:%s", tmq->consumerId, tstrerror(code));
×
919
    if (pCommitFp != NULL) {
×
920
      pCommitFp(tmq, code, userParam);
×
921
    }
922
    return;
×
923
  }
924
  code = innerCommitAll(tmq, pParamSet);
459✔
925
  if (code != 0 && code != TSDB_CODE_TMQ_SAME_COMMITTED_VALUE){
459✔
926
    tqErrorC("consumer:0x%" PRIx64 " innerCommitAll failed, code:%s", tmq->consumerId, tstrerror(code));
3✔
927
  }
928

929
  code = commitRspCountDown(pParamSet, tmq->consumerId, "init", -1);
459✔
930
  if (code != 0) {
459✔
931
    tqErrorC("consumer:0x%" PRIx64 " commit rsp count down failed, code:%s", tmq->consumerId, tstrerror(code));
×
932
  }
933
  return;
459✔
934
}
935

936
static void generateTimedTask(int64_t refId, int32_t type) {
883✔
937
  tmq_t*  tmq = NULL;
883✔
938
  int8_t* pTaskType = NULL;
883✔
939
  int32_t code = 0;
883✔
940

941
  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
883✔
942
  if (tmq == NULL) return;
883✔
943

944
  code = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0, (void**)&pTaskType);
883✔
945
  if (code == TSDB_CODE_SUCCESS) {
883✔
946
    *pTaskType = type;
883✔
947
    if (taosWriteQitem(tmq->delayedTask, pTaskType) == 0) {
883✔
948
      if (tsem2_post(&tmq->rspSem) != 0){
883✔
949
        tqErrorC("consumer:0x%" PRIx64 " failed to post sem, type:%d", tmq->consumerId, type);
×
950
      }
951
    }else{
952
      taosFreeQitem(pTaskType);
×
953
    }
954
  }
955

956
  code = taosReleaseRef(tmqMgmt.rsetId, refId);
883✔
957
  if (code != 0){
883✔
958
    tqErrorC("failed to release ref:%"PRId64 ", type:%d, code:%d", refId, type, code);
×
959
  }
960
}
961

962
void tmqAssignAskEpTask(void* param, void* tmrId) {
555✔
963
  int64_t refId = (int64_t)param;
555✔
964
  generateTimedTask(refId, TMQ_DELAYED_TASK__ASK_EP);
555✔
965
}
555✔
966

967
void tmqReplayTask(void* param, void* tmrId) {
×
968
  int64_t refId = (int64_t)param;
×
969
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
×
970
  if (tmq == NULL) return;
×
971

972
  if (tsem2_post(&tmq->rspSem) != 0){
×
973
    tqErrorC("consumer:0x%" PRIx64 " failed to post sem, replay", tmq->consumerId);
×
974
  }
975
  int32_t code = taosReleaseRef(tmqMgmt.rsetId, refId);
×
976
  if (code != 0){
×
977
    tqErrorC("failed to release ref:%"PRId64 ", code:%d", refId, code);
×
978
  }
979
}
980

981
void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
328✔
982
  int64_t refId = (int64_t)param;
328✔
983
  generateTimedTask(refId, TMQ_DELAYED_TASK__COMMIT);
328✔
984
}
328✔
985

986
int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
330✔
987
  if (pMsg == NULL) {
330✔
988
    return TSDB_CODE_INVALID_PARA;
×
989
  }
990

991
  if (param == NULL || code != 0){
330✔
992
    goto END;
8✔
993
  }
994

995
  SMqHbRsp rsp = {0};
322✔
996
  code = tDeserializeSMqHbRsp(pMsg->pData, pMsg->len, &rsp);
322✔
997
  if (code != 0) {
321✔
998
    goto END;
×
999
  }
1000

1001
  int64_t refId = (int64_t)param;
321✔
1002
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
321✔
1003
  if (tmq != NULL) {
321✔
1004
    taosWLockLatch(&tmq->lock);
320✔
1005
    for (int32_t i = 0; i < taosArrayGetSize(rsp.topicPrivileges); i++) {
625✔
1006
      STopicPrivilege* privilege = taosArrayGet(rsp.topicPrivileges, i);
302✔
1007
      if (privilege == NULL) {
303✔
1008
        continue;
×
1009
      }
1010
      int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
303✔
1011
      for (int32_t j = 0; j < topicNumCur; j++) {
667✔
1012
        SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, j);
363✔
1013
        if (pTopicCur && strcmp(pTopicCur->topicName, privilege->topic) == 0) {
363✔
1014
          tqInfoC("consumer:0x%" PRIx64 ", update noPrivilege:%d, topic:%s", tmq->consumerId, privilege->noPrivilege, privilege->topic);
302✔
1015
          pTopicCur->noPrivilege = privilege->noPrivilege;
303✔
1016
        }
1017
      }
1018
    }
1019
    taosWUnLockLatch(&tmq->lock);
321✔
1020
    code = taosReleaseRef(tmqMgmt.rsetId, refId);
321✔
1021
    if (code != 0){
321✔
1022
      tqErrorC("failed to release ref:%"PRId64 ", code:%d", refId, code);
×
1023
    }
1024
  }
1025

1026
  tqClientDebugFlag = rsp.debugFlag;
322✔
1027

1028
  tDestroySMqHbRsp(&rsp);
322✔
1029

1030
  END:
330✔
1031
  taosMemoryFree(pMsg->pData);
330✔
1032
  taosMemoryFree(pMsg->pEpSet);
330✔
1033
  return code;
330✔
1034
}
1035

1036
void tmqSendHbReq(void* param, void* tmrId) {
330✔
1037
  int64_t refId = (int64_t)param;
330✔
1038

1039
  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
330✔
1040
  if (tmq == NULL) {
330✔
1041
    return;
×
1042
  }
1043

1044
  SMqHbReq req = {0};
330✔
1045
  req.consumerId = tmq->consumerId;
330✔
1046
  req.epoch = tmq->epoch;
330✔
1047
  req.pollFlag = atomic_load_8(&tmq->pollFlag);
330✔
1048
  tqDebugC("consumer:0x%" PRIx64 " send heartbeat, pollFlag:%d", tmq->consumerId, req.pollFlag);
330✔
1049
  req.topics = taosArrayInit(taosArrayGetSize(tmq->clientTopics), sizeof(TopicOffsetRows));
330✔
1050
  if (req.topics == NULL) {
330✔
1051
    goto END;
×
1052
  }
1053
  taosRLockLatch(&tmq->lock);
330✔
1054
  for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
644✔
1055
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
314✔
1056
    if (pTopic == NULL) {
314✔
1057
      continue;
×
1058
    }
1059
    int32_t          numOfVgroups = taosArrayGetSize(pTopic->vgs);
314✔
1060
    TopicOffsetRows* data = taosArrayReserve(req.topics, 1);
314✔
1061
    if (data == NULL) {
314✔
1062
      continue;
×
1063
    }
1064
    tstrncpy(data->topicName, pTopic->topicName, TSDB_TOPIC_FNAME_LEN);
314✔
1065
    data->offsetRows = taosArrayInit(numOfVgroups, sizeof(OffsetRows));
314✔
1066
    if (data->offsetRows == NULL) {
314✔
1067
      continue;
×
1068
    }
1069
    for (int j = 0; j < numOfVgroups; j++) {
1,289✔
1070
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
975✔
1071
      if (pVg == NULL) {
975✔
1072
        continue;
×
1073
      }
1074
      OffsetRows* offRows = taosArrayReserve(data->offsetRows, 1);
975✔
1075
      if (offRows == NULL) {
975✔
1076
        continue;
×
1077
      }
1078
      offRows->vgId = pVg->vgId;
975✔
1079
      offRows->rows = pVg->numOfRows;
975✔
1080
      offRows->offset = pVg->offsetInfo.endOffset;
975✔
1081
      offRows->ever = pVg->offsetInfo.walVerEnd == -1 ? 0 : pVg->offsetInfo.walVerEnd;
975✔
1082
      char buf[TSDB_OFFSET_LEN] = {0};
975✔
1083
      tFormatOffset(buf, TSDB_OFFSET_LEN, &offRows->offset);
975✔
1084
      tqDebugC("consumer:0x%" PRIx64 ",report offset, group:%s vgId:%d, offset:%s/%" PRId64 ", rows:%" PRId64,
975✔
1085
               tmq->consumerId, tmq->groupId, offRows->vgId, buf, offRows->ever, offRows->rows);
1086
    }
1087
  }
1088
  taosRUnLockLatch(&tmq->lock);
330✔
1089

1090
  int32_t tlen = tSerializeSMqHbReq(NULL, 0, &req);
330✔
1091
  if (tlen < 0) {
330✔
1092
    tqErrorC("tSerializeSMqHbReq failed, size:%d", tlen);
×
1093
    goto END;
×
1094
  }
1095

1096
  void* pReq = taosMemoryCalloc(1, tlen);
330✔
1097
  if (pReq == NULL) {
330✔
1098
    tqErrorC("failed to malloc MqHbReq msg, code:%d", terrno);
×
1099
    goto END;
×
1100
  }
1101

1102
  if (tSerializeSMqHbReq(pReq, tlen, &req) < 0) {
330✔
1103
    tqErrorC("tSerializeSMqHbReq %d failed", tlen);
×
1104
    taosMemoryFree(pReq);
×
1105
    goto END;
×
1106
  }
1107

1108
  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
330✔
1109
  if (sendInfo == NULL) {
330✔
1110
    taosMemoryFree(pReq);
×
1111
    goto END;
×
1112
  }
1113

1114
  sendInfo->msgInfo = (SDataBuf){.pData = pReq, .len = tlen, .handle = NULL};
330✔
1115

1116
  sendInfo->requestId = generateRequestId();
330✔
1117
  sendInfo->requestObjRefId = 0;
330✔
1118
  sendInfo->param = (void*)refId;
330✔
1119
  sendInfo->fp = tmqHbCb;
330✔
1120
  sendInfo->msgType = TDMT_MND_TMQ_HB;
330✔
1121

1122

1123
  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
330✔
1124

1125
  int32_t code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo);
330✔
1126
  if (code != 0) {
330✔
1127
    tqErrorC("tmqSendHbReq asyncSendMsgToServer failed");
×
1128
  }
1129
  (void)atomic_val_compare_exchange_8(&tmq->pollFlag, 1, 0);
330✔
1130

1131
  END:
330✔
1132
  tDestroySMqHbReq(&req);
330✔
1133
  if (tmrId != NULL) {
330✔
1134
    bool ret = taosTmrReset(tmqSendHbReq, tmq->heartBeatIntervalMs, param, tmqMgmt.timer, &tmq->hbLiveTimer);
231✔
1135
    tqDebugC("consumer:0x%" PRIx64 " reset timer for tmq heartbeat ret:%d, interval:%d, pollFlag:%d", tmq->consumerId, ret, tmq->heartBeatIntervalMs, tmq->pollFlag);
231✔
1136
  }
1137
  int32_t ret = taosReleaseRef(tmqMgmt.rsetId, refId);
330✔
1138
  if (ret != 0){
330✔
1139
    tqErrorC("failed to release ref:%"PRId64 ", code:%d", refId, ret);
×
1140
  }
1141
}
1142

1143
static void defaultCommitCbFn(tmq_t* pTmq, int32_t code, void* param) {
143✔
1144
  if (code != 0 && pTmq != NULL) {
143✔
1145
    tqErrorC("consumer:0x%" PRIx64 ", failed to commit offset, code:%s", pTmq->consumerId, tstrerror(code));
×
1146
  }
1147
}
143✔
1148

1149
static void tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) {
6,567✔
1150
  if (rspWrapper == NULL) {
6,567✔
1151
    return;
×
1152
  }
1153
  if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) {
6,567✔
1154
    tDeleteSMqAskEpRsp(&rspWrapper->epRsp);
504✔
1155
  } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_RSP) {
6,063✔
1156
    DELETE_POLL_RSP(tDeleteMqDataRsp, &pRsp->dataRsp)
6,063✔
1157
  } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP){
×
1158
    DELETE_POLL_RSP(tDeleteSTaosxRsp, &pRsp->dataRsp)
×
1159
  } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
×
1160
    DELETE_POLL_RSP(tDeleteMqMetaRsp,&pRsp->metaRsp)
×
1161
  } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_BATCH_META_RSP) {
×
1162
    DELETE_POLL_RSP(tDeleteMqBatchMetaRsp,&pRsp->batchMetaRsp)
×
1163
  } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) {
×
1164
    DELETE_POLL_RSP(tDeleteMqRawDataRsp, &pRsp->dataRsp)
×
1165
  }
1166
}
1167

1168
static void freeClientVg(void* param) {
253✔
1169
  if (param == NULL) {
253✔
1170
    return;
×
1171
  }
1172
  SMqClientVg* pVg = param;
253✔
1173
  tOffsetDestroy(&pVg->offsetInfo.endOffset);
253✔
1174
  tOffsetDestroy(&pVg->offsetInfo.beginOffset);
253✔
1175
  tOffsetDestroy(&pVg->offsetInfo.committedOffset);
253✔
1176
}
1177
static void freeClientTopic(void* param) {
67✔
1178
  if (param == NULL) {
67✔
1179
    return;
×
1180
  }
1181
  SMqClientTopic* pTopic = param;
67✔
1182
  taosMemoryFreeClear(pTopic->schema.pSchema);
67✔
1183
  taosArrayDestroyEx(pTopic->vgs, freeClientVg);
67✔
1184
}
1185

1186
static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopicEp, SHashObj* pVgOffsetHashMap,
68✔
1187
                                   tmq_t* tmq) {
1188
  if (pTopic == NULL || pTopicEp == NULL || pVgOffsetHashMap == NULL || tmq == NULL) {
68✔
1189
    return;
×
1190
  }
1191
  pTopic->schema = pTopicEp->schema;
68✔
1192
  pTopicEp->schema.nCols = 0;
68✔
1193
  pTopicEp->schema.pSchema = NULL;
68✔
1194

1195
  char    vgKey[TSDB_TOPIC_FNAME_LEN + 22] = {0};
68✔
1196
  int32_t vgNumGet = taosArrayGetSize(pTopicEp->vgs);
68✔
1197

1198
  tstrncpy(pTopic->topicName, pTopicEp->topic, TSDB_TOPIC_FNAME_LEN);
68✔
1199
  tstrncpy(pTopic->db, pTopicEp->db, TSDB_DB_FNAME_LEN);
68✔
1200

1201
  tqInfoC("consumer:0x%" PRIx64 ", update topic:%s, new numOfVgs:%d", tmq->consumerId, pTopic->topicName, vgNumGet);
68✔
1202
  pTopic->vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg));
68✔
1203
  if (pTopic->vgs == NULL) {
68✔
1204
    tqErrorC("consumer:0x%" PRIx64 ", failed to init vgs for topic:%s", tmq->consumerId, pTopic->topicName);
×
1205
    return;
×
1206
  }
1207
  for (int32_t j = 0; j < vgNumGet; j++) {
322✔
1208
    SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);
254✔
1209
    if (pVgEp == NULL) {
254✔
1210
      continue;
×
1211
    }
1212
    (void)snprintf(vgKey, sizeof(vgKey), "%s:%d", pTopic->topicName, pVgEp->vgId);
254✔
1213
    SVgroupSaveInfo* pInfo = taosHashGet(pVgOffsetHashMap, vgKey, strlen(vgKey));
254✔
1214

1215
    STqOffsetVal offsetNew = {0};
254✔
1216
    offsetNew.type = tmq->resetOffsetCfg;
254✔
1217

1218
    tqInfoC("consumer:0x%" PRIx64 ", update topic:%s, new numOfVgs:%d, num:%d, port:%d", tmq->consumerId,
254✔
1219
            pTopic->topicName, vgNumGet, pVgEp->epSet.numOfEps, pVgEp->epSet.eps[pVgEp->epSet.inUse].port);
1220

1221
    SMqClientVg clientVg = {
762✔
1222
        .pollCnt = 0,
1223
        .vgId = pVgEp->vgId,
254✔
1224
        .epSet = pVgEp->epSet,
1225
        .vgStatus = pInfo ? pInfo->vgStatus : TMQ_VG_STATUS__IDLE,
254✔
1226
        .vgSkipCnt = 0,
1227
        .blockReceiveTs = 0,
1228
        .blockSleepForReplay = 0,
1229
        .numOfRows = pInfo ? pInfo->numOfRows : 0,
254✔
1230
    };
1231

1232
    clientVg.offsetInfo.walVerBegin = -1;
254✔
1233
    clientVg.offsetInfo.walVerEnd = -1;
254✔
1234
    clientVg.seekUpdated = false;
254✔
1235
    if (pInfo) {
254✔
1236
      tOffsetCopy(&clientVg.offsetInfo.endOffset, &pInfo->currentOffset);
10✔
1237
      tOffsetCopy(&clientVg.offsetInfo.committedOffset, &pInfo->commitOffset);
10✔
1238
      tOffsetCopy(&clientVg.offsetInfo.beginOffset, &pInfo->seekOffset);
10✔
1239
    } else {
1240
      clientVg.offsetInfo.endOffset = offsetNew;
244✔
1241
      clientVg.offsetInfo.committedOffset = offsetNew;
244✔
1242
      clientVg.offsetInfo.beginOffset = offsetNew;
244✔
1243
    }
1244
    if (taosArrayPush(pTopic->vgs, &clientVg) == NULL) {
508✔
1245
      tqErrorC("consumer:0x%" PRIx64 ", failed to push vg:%d into topic:%s", tmq->consumerId, pVgEp->vgId,
×
1246
               pTopic->topicName);
1247
      freeClientVg(&clientVg);
×
1248
    }
1249
  }
1250
}
1251

1252
static void buildNewTopicList(tmq_t* tmq, SArray* newTopics, const SMqAskEpRsp* pRsp){
63✔
1253
  if (tmq == NULL || newTopics == NULL || pRsp == NULL) {
63✔
1254
    return;
×
1255
  }
1256
  SHashObj* pVgOffsetHashMap = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
63✔
1257
  if (pVgOffsetHashMap == NULL) {
63✔
1258
    tqErrorC("consumer:0x%" PRIx64 " taos hash init null, code:%d", tmq->consumerId, terrno);
×
1259
    return;
×
1260
  }
1261

1262
  int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
63✔
1263
  for (int32_t i = 0; i < topicNumCur; i++) {
71✔
1264
    // find old topic
1265
    SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i);
8✔
1266
    if (pTopicCur && pTopicCur->vgs) {
8✔
1267
      int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs);
8✔
1268
      tqInfoC("consumer:0x%" PRIx64 ", current vg num:%d", tmq->consumerId, vgNumCur);
8✔
1269
      for (int32_t j = 0; j < vgNumCur; j++) {
18✔
1270
        SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j);
10✔
1271
        if (pVgCur == NULL) {
10✔
1272
          continue;
×
1273
        }
1274
        char vgKey[TSDB_TOPIC_FNAME_LEN + 22] = {0};
10✔
1275
        (void)snprintf(vgKey, sizeof(vgKey), "%s:%d", pTopicCur->topicName, pVgCur->vgId);
10✔
1276

1277
        char buf[TSDB_OFFSET_LEN] = {0};
10✔
1278
        tFormatOffset(buf, TSDB_OFFSET_LEN, &pVgCur->offsetInfo.endOffset);
10✔
1279
        tqInfoC("consumer:0x%" PRIx64 ", vgId:%d vgKey:%s, offset:%s", tmq->consumerId, pVgCur->vgId, vgKey, buf);
10✔
1280

1281
        SVgroupSaveInfo info = {.currentOffset = pVgCur->offsetInfo.endOffset,
10✔
1282
            .seekOffset = pVgCur->offsetInfo.beginOffset,
1283
            .commitOffset = pVgCur->offsetInfo.committedOffset,
1284
            .numOfRows = pVgCur->numOfRows,
10✔
1285
            .vgStatus = pVgCur->vgStatus};
10✔
1286
        if (taosHashPut(pVgOffsetHashMap, vgKey, strlen(vgKey), &info, sizeof(SVgroupSaveInfo)) != 0) {
10✔
1287
          tqErrorC("consumer:0x%" PRIx64 ", failed to put vg:%d into hashmap", tmq->consumerId, pVgCur->vgId);
×
1288
        }
1289
      }
1290
    }
1291
  }
1292

1293
  for (int32_t i = 0; i < taosArrayGetSize(pRsp->topics); i++) {
131✔
1294
    SMqClientTopic topic = {0};
68✔
1295
    SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i);
68✔
1296
    if (pTopicEp == NULL) {
68✔
1297
      continue;
×
1298
    }
1299
    initClientTopicFromRsp(&topic, pTopicEp, pVgOffsetHashMap, tmq);
68✔
1300
    if (taosArrayPush(newTopics, &topic) == NULL) {
68✔
1301
      tqErrorC("consumer:0x%" PRIx64 ", failed to push topic:%s into new topics", tmq->consumerId, topic.topicName);
×
1302
      freeClientTopic(&topic);
×
1303
    }
1304
  }
1305

1306
  taosHashCleanup(pVgOffsetHashMap);
63✔
1307
}
1308

1309
static void doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
658✔
1310
  if (tmq == NULL || pRsp == NULL) {
658✔
1311
    return;
×
1312
  }
1313
  int32_t topicNumGet = taosArrayGetSize(pRsp->topics);
658✔
1314
  // vnode transform (epoch == tmq->epoch && topicNumGet != 0)
1315
  // ask ep rsp (epoch == tmq->epoch && topicNumGet == 0)
1316
  if (epoch < tmq->epoch || (epoch == tmq->epoch && topicNumGet == 0)) {
658✔
1317
    tqDebugC("consumer:0x%" PRIx64 " no update ep epoch from %d to epoch %d, incoming topics:%d", tmq->consumerId,
542✔
1318
             tmq->epoch, epoch, topicNumGet);
1319
    return;
542✔
1320
  }
1321

1322
  SArray* newTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic));
116✔
1323
  if (newTopics == NULL) {
116✔
1324
    tqErrorC("consumer:0x%" PRIx64 " taos array init null, code:%d", tmq->consumerId, terrno);
×
1325
    return;
×
1326
  }
1327
  tqInfoC("consumer:0x%" PRIx64 " update ep epoch from %d to epoch %d, incoming topics:%d, existed topics:%d",
116✔
1328
          tmq->consumerId, tmq->epoch, epoch, topicNumGet, (int)taosArrayGetSize(tmq->clientTopics));
1329

1330
  taosWLockLatch(&tmq->lock);
116✔
1331
  if (topicNumGet > 0){
116✔
1332
    buildNewTopicList(tmq, newTopics, pRsp);
63✔
1333
  }
1334
  // destroy current buffered existed topics info
1335
  if (tmq->clientTopics) {
116✔
1336
    taosArrayDestroyEx(tmq->clientTopics, freeClientTopic);
116✔
1337
  }
1338
  tmq->clientTopics = newTopics;
116✔
1339
  taosWUnLockLatch(&tmq->lock);
116✔
1340

1341
  atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);
116✔
1342
  atomic_store_32(&tmq->epoch, epoch);
116✔
1343

1344
  tqInfoC("consumer:0x%" PRIx64 " update topic info completed", tmq->consumerId);
116✔
1345
}
1346

1347
static int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) {
967✔
1348
  SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
967✔
1349
  if (pParam == NULL) {
967✔
1350
    goto _ERR;
×
1351
  }
1352

1353
  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, pParam->refId);
967✔
1354
  if (tmq == NULL) {
967✔
1355
    code = TSDB_CODE_TMQ_CONSUMER_CLOSED;
×
1356
    goto _ERR;
×
1357
  }
1358

1359
  if (code != TSDB_CODE_SUCCESS) {
967✔
1360
    if (code != TSDB_CODE_MND_CONSUMER_NOT_READY){
307✔
1361
      tqErrorC("consumer:0x%" PRIx64 ", get topic endpoint error, code:%s", tmq->consumerId, tstrerror(code));
2✔
1362
      if (code == TSDB_CODE_MND_CONSUMER_NOT_EXIST){
2✔
1363
        atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__LOST);
2✔
1364
      }
1365
    }
1366
    goto END;
307✔
1367
  }
1368

1369
  if (pMsg == NULL) {
660✔
1370
    goto END;
×
1371
  }
1372
  SMqRspHead* head = pMsg->pData;
660✔
1373
  int32_t     epoch = atomic_load_32(&tmq->epoch);
660✔
1374
  tqDebugC("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d", tmq->consumerId, head->epoch, epoch);
660✔
1375
  if (pParam->sync) {
660✔
1376
    SMqAskEpRsp rsp = {0};
156✔
1377
    if (tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp) != NULL) {
312✔
1378
      doUpdateLocalEp(tmq, head->epoch, &rsp);
156✔
1379
    }
1380
    tDeleteSMqAskEpRsp(&rsp);
1381
  } else {
1382
    SMqRspWrapper* pWrapper = NULL;
504✔
1383
    code = taosAllocateQitem(sizeof(SMqRspWrapper), DEF_QITEM, 0, (void**)&pWrapper);
504✔
1384
    if (code) {
504✔
1385
      goto END;
×
1386
    }
1387

1388
    pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP;
504✔
1389
    pWrapper->epoch = head->epoch;
504✔
1390
    (void)memcpy(&pWrapper->epRsp, pMsg->pData, sizeof(SMqRspHead));
504✔
1391
    if (tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pWrapper->epRsp) == NULL) {
1,008✔
1392
      tmqFreeRspWrapper((SMqRspWrapper*)pWrapper);
×
1393
      taosFreeQitem(pWrapper);
×
1394
    } else {
1395
      code = taosWriteQitem(tmq->mqueue, pWrapper);
504✔
1396
      if (code != 0) {
504✔
1397
        tmqFreeRspWrapper((SMqRspWrapper*)pWrapper);
×
1398
        taosFreeQitem(pWrapper);
×
1399
        tqErrorC("consumer:0x%" PRIx64 " put ep res into mqueue failed, code:%d", tmq->consumerId, code);
×
1400
      }
1401
    }
1402
  }
1403

1404
  END:
967✔
1405
  {
1406
    int32_t ret = taosReleaseRef(tmqMgmt.rsetId, pParam->refId);
967✔
1407
    if (ret != 0){
967✔
1408
      tqErrorC("failed to release ref:%"PRId64 ", code:%d", pParam->refId, ret);
×
1409
    }
1410
  }
1411

1412
  _ERR:
967✔
1413
  if (pParam && pParam->sync) {
967✔
1414
    SAskEpInfo* pInfo = pParam->pParam;
463✔
1415
    if (pInfo) {
463✔
1416
      pInfo->code = code;
463✔
1417
      if (tsem2_post(&pInfo->sem) != 0){
463✔
1418
        tqErrorC("failed to post rsp sem askep cb");
×
1419
      }
1420
    }
1421
  }
1422

1423
  if (pMsg) {
967✔
1424
    taosMemoryFree(pMsg->pEpSet);
967✔
1425
    taosMemoryFree(pMsg->pData);
967✔
1426
  }
1427

1428
  return code;
967✔
1429
}
1430

1431
static int32_t askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpSet) {
967✔
1432
  if (pTmq == NULL) {
967✔
1433
    return TSDB_CODE_INVALID_PARA;
×
1434
  }
1435
  int32_t code = 0;
967✔
1436
  int32_t lino = 0;
967✔
1437
  SMqAskEpReq req = {0};
967✔
1438
  req.consumerId = pTmq->consumerId;
967✔
1439
  req.epoch = updateEpSet ? -1 : pTmq->epoch;
967✔
1440
  tstrncpy(req.cgroup, pTmq->groupId, TSDB_CGROUP_LEN);
967✔
1441
  SMqAskEpCbParam* pParam = NULL;
967✔
1442
  void*            pReq = NULL;
967✔
1443

1444
  int32_t tlen = tSerializeSMqAskEpReq(NULL, 0, &req);
967✔
1445
  TSDB_CHECK_CONDITION(tlen >= 0, code, lino, END, TSDB_CODE_INVALID_PARA);
967✔
1446
  pReq = taosMemoryCalloc(1, tlen);
967✔
1447
  TSDB_CHECK_NULL(pReq, code, lino, END, terrno);
967✔
1448

1449
  code = tSerializeSMqAskEpReq(pReq, tlen, &req);
967✔
1450
  TSDB_CHECK_CONDITION(code >= 0, code, lino, END, TSDB_CODE_INVALID_PARA);
967✔
1451

1452
  pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam));
967✔
1453
  TSDB_CHECK_NULL(pParam, code, lino, END, terrno);
967✔
1454

1455
  pParam->refId = pTmq->refId;
967✔
1456
  pParam->sync = sync;
967✔
1457
  pParam->pParam = param;
967✔
1458

1459
  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
967✔
1460
  TSDB_CHECK_NULL(sendInfo, code, lino, END, terrno);
967✔
1461

1462
  sendInfo->msgInfo = (SDataBuf){.pData = pReq, .len = tlen, .handle = NULL};
967✔
1463
  sendInfo->requestId = generateRequestId();
967✔
1464
  sendInfo->requestObjRefId = 0;
967✔
1465
  sendInfo->param = pParam;
967✔
1466
  sendInfo->paramFreeFp = taosAutoMemoryFree;
967✔
1467
  sendInfo->fp = askEpCb;
967✔
1468
  sendInfo->msgType = TDMT_MND_TMQ_ASK_EP;
967✔
1469

1470
  pReq = NULL;
967✔
1471
  pParam = NULL;
967✔
1472

1473
  SEpSet epSet = getEpSet_s(&pTmq->pTscObj->pAppInfo->mgmtEp);
967✔
1474
  tqDebugC("consumer:0x%" PRIx64 " ask ep from mnode, QID:0x%" PRIx64, pTmq->consumerId, sendInfo->requestId);
967✔
1475
  code = asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo);
967✔
1476

1477
END:
967✔
1478
  if (code != 0) {
967✔
1479
    tqErrorC("%s failed at %d, msg:%s", __func__, lino, tstrerror(code));
×
1480
  }
1481
  taosMemoryFree(pReq);
967✔
1482
  taosMemoryFree(pParam);
967✔
1483
  return code;
967✔
1484
}
1485

1486
static int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) {
11,004✔
1487
  tqDebugC("consumer:0x%" PRIx64 " handle delayed %d tasks before poll data", pTmq->consumerId, taosQueueItemSize(pTmq->delayedTask));
11,004✔
1488
  while (1) {
818✔
1489
    int8_t* pTaskType = NULL;
11,822✔
1490
    taosReadQitem(pTmq->delayedTask, (void**)&pTaskType);
11,822✔
1491
    if (pTaskType == NULL) {break;}
11,822✔
1492
    if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) {
818✔
1493
      tqDebugC("consumer:0x%" PRIx64 " retrieve ask ep timer", pTmq->consumerId);
504✔
1494
      int32_t code = askEp(pTmq, NULL, false, false);
504✔
1495
      if (code != 0) {
504✔
1496
        tqErrorC("consumer:0x%" PRIx64 " failed to ask ep, code:%s", pTmq->consumerId, tstrerror(code));
×
1497
      }
1498
      tqDebugC("consumer:0x%" PRIx64 " retrieve ep from mnode in 1s", pTmq->consumerId);
504✔
1499
      bool ret = taosTmrReset(tmqAssignAskEpTask, DEFAULT_ASKEP_INTERVAL, (void*)(pTmq->refId), tmqMgmt.timer,
504✔
1500
                              &pTmq->epTimer);
1501
      tqDebugC("reset timer for tmq ask ep:%d", ret);
504✔
1502
    } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
314✔
1503
      tmq_commit_cb* pCallbackFn = (pTmq->commitCb != NULL) ? pTmq->commitCb : defaultCommitCbFn;
314✔
1504
      asyncCommitAllOffsets(pTmq, pCallbackFn, pTmq->commitCbUserParam);
314✔
1505
      tqDebugC("consumer:0x%" PRIx64 " next commit to vnode(s) in %.2fs", pTmq->consumerId,
314✔
1506
               pTmq->autoCommitInterval / 1000.0);
1507
      bool ret = taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, (void*)(pTmq->refId), tmqMgmt.timer,
314✔
1508
                              &pTmq->commitTimer);
1509
      tqDebugC("reset timer for commit:%d", ret);
314✔
1510
    } else {
1511
      tqErrorC("consumer:0x%" PRIx64 " invalid task type:%d", pTmq->consumerId, *pTaskType);
×
1512
    }
1513

1514
    taosFreeQitem(pTaskType);
818✔
1515
  }
1516

1517
  return 0;
11,004✔
1518
}
1519

1520
void tmqClearUnhandleMsg(tmq_t* tmq) {
155✔
1521
  if (tmq == NULL) return;
155✔
1522
  while (1) {
86✔
1523
    SMqRspWrapper* rspWrapper = NULL;
241✔
1524
    taosReadQitem(tmq->mqueue, (void**)&rspWrapper);
241✔
1525
    if (rspWrapper == NULL) break;
241✔
1526
    tmqFreeRspWrapper(rspWrapper);
86✔
1527
    taosFreeQitem(rspWrapper);
86✔
1528
  }
1529
}
1530

1531
int32_t tmqSubscribeCb(void* param, SDataBuf* pMsg, int32_t code) {
160✔
1532
  if (pMsg) {
160✔
1533
    taosMemoryFreeClear(pMsg->pEpSet);
160✔
1534
  }
1535

1536
  if (param == NULL) {
160✔
1537
    return code;
×
1538
  }
1539

1540
  SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param;
160✔
1541
  pParam->rspErr = code;
160✔
1542

1543
  if (tsem2_post(&pParam->rspSem) != 0){
160✔
1544
    tqErrorC("failed to post sem, subscribe cb");
×
1545
  }
1546
  return 0;
160✔
1547
}
1548

1549
int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
5✔
1550
  if (tmq == NULL) return TSDB_CODE_INVALID_PARA;
5✔
1551
  if (*topics == NULL) {
5✔
1552
    *topics = tmq_list_new();
1✔
1553
    if (*topics == NULL) {
1✔
1554
      return terrno;
×
1555
    }
1556
  }
1557
  taosRLockLatch(&tmq->lock);
5✔
1558
  for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
10✔
1559
    SMqClientTopic* topic = taosArrayGet(tmq->clientTopics, i);
5✔
1560
    if (topic == NULL) {
5✔
1561
      tqErrorC("topic is null");
×
1562
      continue;
×
1563
    }
1564
    char* tmp = strchr(topic->topicName, '.');
5✔
1565
    if (tmp == NULL) {
5✔
1566
      tqErrorC("topic name is invalid:%s", topic->topicName);
×
1567
      continue;
×
1568
    }
1569
    if (tmq_list_append(*topics, tmp + 1) != 0) {
5✔
1570
      tqErrorC("failed to append topic:%s", tmp + 1);
×
1571
      continue;
×
1572
    }
1573
  }
1574
  taosRUnLockLatch(&tmq->lock);
5✔
1575
  return 0;
5✔
1576
}
1577

1578
void tmqFreeImpl(void* handle) {
56✔
1579
  if (handle == NULL) return;
56✔
1580
  tmq_t*  tmq = (tmq_t*)handle;
56✔
1581
  int64_t id = tmq->consumerId;
56✔
1582

1583
  if (tmq->mqueue) {
56✔
1584
    tmqClearUnhandleMsg(tmq);
56✔
1585
    taosCloseQueue(tmq->mqueue);
56✔
1586
  }
1587

1588
  if (tmq->delayedTask) {
56✔
1589
    taosCloseQueue(tmq->delayedTask);
56✔
1590
  }
1591

1592
  if(tsem2_destroy(&tmq->rspSem) != 0) {
56✔
1593
    tqErrorC("failed to destroy sem in free tmq");
×
1594
  }
1595

1596
  taosArrayDestroyEx(tmq->clientTopics, freeClientTopic);
56✔
1597
  taos_close_internal(tmq->pTscObj);
56✔
1598

1599
  if (tmq->commitTimer) {
56✔
1600
    if (!taosTmrStopA(&tmq->commitTimer)) {
47✔
1601
      tqErrorC("failed to stop commit timer");
14✔
1602
    }
1603
  }
1604
  if (tmq->epTimer) {
56✔
1605
    if (!taosTmrStopA(&tmq->epTimer)) {
52✔
1606
      tqErrorC("failed to stop ep timer");
50✔
1607
    }
1608
  }
1609
  if (tmq->hbLiveTimer) {
56✔
1610
    if (!taosTmrStopA(&tmq->hbLiveTimer)) {
56✔
1611
      tqErrorC("failed to stop hb timer");
×
1612
    }
1613
  }
1614
  taosMemoryFree(tmq);
56✔
1615

1616
  tqInfoC("consumer:0x%" PRIx64 " closed", id);
56✔
1617
}
1618

1619
static void tmqMgmtInit(void) {
19✔
1620
  tmqInitRes = 0;
19✔
1621

1622
  if (taosThreadMutexInit(&tmqMgmt.lock, NULL) != 0){
19✔
1623
    goto END;
×
1624
  }
1625

1626
  tmqMgmt.timer = taosTmrInit(1000, 100, 360000, "TMQ");
19✔
1627

1628
  if (tmqMgmt.timer == NULL) {
19✔
1629
    goto END;
×
1630
  }
1631

1632
  tmqMgmt.rsetId = taosOpenRef(10000, tmqFreeImpl);
19✔
1633
  if (tmqMgmt.rsetId < 0) {
19✔
1634
    goto END;
×
1635
  }
1636

1637
  return;
19✔
1638
END:
×
1639
  tmqInitRes = terrno;
×
1640
}
1641

1642
void tmqMgmtClose(void) {
414✔
1643
  if (tmqMgmt.timer) {
414✔
1644
    taosTmrCleanUp(tmqMgmt.timer);
19✔
1645
    tmqMgmt.timer = NULL;
19✔
1646
  }
1647

1648
  if (tmqMgmt.rsetId > 0) {
414✔
1649
    (void) taosThreadMutexLock(&tmqMgmt.lock);
19✔
1650
    tmq_t *tmq = taosIterateRef(tmqMgmt.rsetId, 0);
19✔
1651
    int64_t  refId = 0;
19✔
1652

1653
    while (tmq) {
21✔
1654
      refId = tmq->refId;
2✔
1655
      if (refId == 0) {
2✔
1656
        break;
×
1657
      }
1658
      atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__CLOSED);
2✔
1659
      tmq = taosIterateRef(tmqMgmt.rsetId, refId);
2✔
1660
    }
1661
    taosCloseRef(tmqMgmt.rsetId);
19✔
1662
    tmqMgmt.rsetId = -1;
19✔
1663
    (void)taosThreadMutexUnlock(&tmqMgmt.lock);
19✔
1664
  }
1665
  (void)taosThreadMutexUnlock(&tmqMgmt.lock);
414✔
1666
}
414✔
1667

1668
tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
58✔
1669
  int32_t code = 0;
58✔
1670

1671
  if (conf == NULL) {
58✔
1672
    SET_ERROR_MSG_TMQ("configure is null")
×
1673
    return NULL;
×
1674
  }
1675
  code = taosThreadOnce(&tmqInit, tmqMgmtInit);
58✔
1676
  if (code != 0) {
58✔
1677
    SET_ERROR_MSG_TMQ("tmq init error")
×
1678
    return NULL;
×
1679
  }
1680
  if (tmqInitRes != 0) {
58✔
1681
    SET_ERROR_MSG_TMQ("tmq timer init error")
×
1682
    return NULL;
×
1683
  }
1684

1685
  tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t));
58✔
1686
  if (pTmq == NULL) {
58✔
1687
    tqErrorC("failed to create consumer, groupId:%s", conf->groupId);
×
1688
    SET_ERROR_MSG_TMQ("malloc tmq failed")
×
1689
    return NULL;
×
1690
  }
1691

1692
  const char* user = conf->user == NULL ? TSDB_DEFAULT_USER : conf->user;
58✔
1693
  const char* pass = conf->pass == NULL ? TSDB_DEFAULT_PASS : conf->pass;
58✔
1694

1695
  pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
58✔
1696
  if (pTmq->clientTopics == NULL) {
58✔
1697
    tqErrorC("failed to create consumer, groupId:%s", conf->groupId);
×
1698
    SET_ERROR_MSG_TMQ("malloc client topics failed")
×
1699
    goto _failed;
×
1700
  }
1701
  code = taosOpenQueue(&pTmq->mqueue);
58✔
1702
  if (code) {
58✔
1703
    tqErrorC("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, tstrerror(code),
×
1704
             pTmq->groupId);
1705
    SET_ERROR_MSG_TMQ("open queue failed")
×
1706
    goto _failed;
×
1707
  }
1708

1709
  code = taosOpenQueue(&pTmq->delayedTask);
58✔
1710
  if (code) {
58✔
1711
    tqErrorC("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, tstrerror(code),
×
1712
             pTmq->groupId);
1713
    SET_ERROR_MSG_TMQ("open delayed task queue failed")
×
1714
    goto _failed;
×
1715
  }
1716

1717
  if (conf->groupId[0] == 0) {
58✔
1718
    tqErrorC("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, tstrerror(code),
×
1719
             pTmq->groupId);
1720
    SET_ERROR_MSG_TMQ("malloc tmq element failed or group is empty")
×
1721
    goto _failed;
×
1722
  }
1723

1724
  // init status
1725
  pTmq->status = TMQ_CONSUMER_STATUS__INIT;
58✔
1726
  pTmq->pollCnt = 0;
58✔
1727
  pTmq->epoch = 0;
58✔
1728
  pTmq->pollFlag = 0;
58✔
1729

1730
  // set conf
1731
  tstrncpy(pTmq->clientId, conf->clientId, TSDB_CLIENT_ID_LEN);
58✔
1732
  tstrncpy(pTmq->groupId, conf->groupId, TSDB_CGROUP_LEN);
58✔
1733
  pTmq->withTbName = conf->withTbName;
58✔
1734
  pTmq->useSnapshot = conf->snapEnable;
58✔
1735
  pTmq->autoCommit = conf->autoCommit;
58✔
1736
  pTmq->autoCommitInterval = conf->autoCommitInterval;
58✔
1737
  pTmq->sessionTimeoutMs = conf->sessionTimeoutMs;
58✔
1738
  pTmq->heartBeatIntervalMs = conf->heartBeatIntervalMs;
58✔
1739
  pTmq->maxPollIntervalMs = conf->maxPollIntervalMs;
58✔
1740
  pTmq->commitCb = conf->commitCb;
58✔
1741
  pTmq->commitCbUserParam = conf->commitCbUserParam;
58✔
1742
  pTmq->resetOffsetCfg = conf->resetOffset;
58✔
1743
  pTmq->replayEnable = conf->replayEnable;
58✔
1744
  pTmq->sourceExcluded = conf->sourceExcluded;
58✔
1745
  pTmq->rawData = conf->rawData;
58✔
1746
  pTmq->maxPollWaitTime = conf->maxPollWaitTime;
58✔
1747
  pTmq->minPollRows = conf->minPollRows;
58✔
1748
  pTmq->enableBatchMeta = conf->enableBatchMeta;
58✔
1749
  tstrncpy(pTmq->user, user, TSDB_USER_LEN);
58✔
1750
  if (taosGetFqdn(pTmq->fqdn) != 0) {
58✔
1751
    tstrncpy(pTmq->fqdn, "localhost", TSDB_FQDN_LEN);
×
1752
  }
1753
  if (conf->replayEnable) {
58✔
1754
    pTmq->autoCommit = false;
×
1755
  }
1756
  taosInitRWLatch(&pTmq->lock);
58✔
1757

1758
  // assign consumerId
1759
  pTmq->consumerId = tGenIdPI64();
58✔
1760

1761
  // init semaphore
1762
  if (tsem2_init(&pTmq->rspSem, 0, 0) != 0) {
58✔
1763
    tqErrorC("consumer:0x %" PRIx64 " setup failed since %s, consumer group %s", pTmq->consumerId,
×
1764
             tstrerror(TAOS_SYSTEM_ERROR(ERRNO)), pTmq->groupId);
1765
    SET_ERROR_MSG_TMQ("init t_sem failed")
×
1766
    goto _failed;
×
1767
  }
1768

1769
  // init connection
1770
  code = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ, &pTmq->pTscObj);
58✔
1771
  if (code) {
58✔
1772
    terrno = code;
×
1773
    tqErrorC("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, terrstr(), pTmq->groupId);
×
1774
    SET_ERROR_MSG_TMQ("init tscObj failed")
×
1775
    goto _failed;
×
1776
  }
1777

1778
  pTmq->refId = taosAddRef(tmqMgmt.rsetId, pTmq);
58✔
1779
  if (pTmq->refId < 0) {
58✔
1780
    SET_ERROR_MSG_TMQ("add tscObj ref failed")
×
1781
    goto _failed;
×
1782
  }
1783

1784
  pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, pTmq->heartBeatIntervalMs, (void*)pTmq->refId, tmqMgmt.timer);
58✔
1785
  if (pTmq->hbLiveTimer == NULL) {
58✔
1786
    SET_ERROR_MSG_TMQ("start heartbeat timer failed")
×
1787
    goto _failed;
×
1788
  }
1789
  char         buf[TSDB_OFFSET_LEN] = {0};
58✔
1790
  STqOffsetVal offset = {.type = pTmq->resetOffsetCfg};
58✔
1791
  tFormatOffset(buf, tListLen(buf), &offset);
58✔
1792
  tqInfoC("consumer:0x%" PRIx64 " is setup, refId:%" PRId64
58✔
1793
              ", groupId:%s, snapshot:%d, autoCommit:%d, commitInterval:%dms, offset:%s",
1794
          pTmq->consumerId, pTmq->refId, pTmq->groupId, pTmq->useSnapshot, pTmq->autoCommit, pTmq->autoCommitInterval,
1795
          buf);
1796

1797
  return pTmq;
58✔
1798

1799
  _failed:
×
1800
  tmqFreeImpl(pTmq);
×
1801
  return NULL;
×
1802
}
1803

1804
static int32_t syncAskEp(tmq_t* pTmq) {
463✔
1805
  if (pTmq == NULL) return TSDB_CODE_INVALID_PARA;
463✔
1806
  SAskEpInfo* pInfo = taosMemoryMalloc(sizeof(SAskEpInfo));
463✔
1807
  if (pInfo == NULL) return terrno;
463✔
1808
  if (tsem2_init(&pInfo->sem, 0, 0) != 0) {
463✔
1809
    taosMemoryFree(pInfo);
×
1810
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
1811
  }
1812

1813
  int32_t code = askEp(pTmq, pInfo, true, false);
463✔
1814
  if (code == 0) {
463✔
1815
    if (tsem2_wait(&pInfo->sem) != 0){
463✔
1816
      tqErrorC("consumer:0x%" PRIx64 ", failed to wait for sem", pTmq->consumerId);
×
1817
    }
1818
    code = pInfo->code;
463✔
1819
  }
1820

1821
  if(tsem2_destroy(&pInfo->sem) != 0) {
463✔
1822
    tqErrorC("failed to destroy sem sync ask ep");
×
1823
  }
1824
  taosMemoryFree(pInfo);
463✔
1825
  return code;
463✔
1826
}
1827

1828
int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
160✔
1829
  if (tmq == NULL || topic_list == NULL) return TSDB_CODE_INVALID_PARA;
160✔
1830
  const SArray*   container = &topic_list->container;
160✔
1831
  int32_t         sz = taosArrayGetSize(container);
160✔
1832
  void*           buf = NULL;
160✔
1833
  SMsgSendInfo*   sendInfo = NULL;
160✔
1834
  SCMSubscribeReq req = {0};
160✔
1835
  int32_t         code = 0;
160✔
1836

1837
  tqInfoC("consumer:0x%" PRIx64 " cgroup:%s, subscribe %d topics", tmq->consumerId, tmq->groupId, sz);
160✔
1838

1839
  req.consumerId = tmq->consumerId;
160✔
1840
  tstrncpy(req.clientId, tmq->clientId, TSDB_CLIENT_ID_LEN);
160✔
1841
  tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
160✔
1842
  tstrncpy(req.user, tmq->user, TSDB_USER_LEN);
160✔
1843
  tstrncpy(req.fqdn, tmq->fqdn, TSDB_FQDN_LEN);
160✔
1844

1845
  req.topicNames = taosArrayInit(sz, sizeof(void*));
160✔
1846
  if (req.topicNames == NULL) {
160✔
1847
    code = terrno;
×
1848
    goto END;
×
1849
  }
1850

1851
  req.withTbName = tmq->withTbName;
160✔
1852
  req.autoCommit = tmq->autoCommit;
160✔
1853
  req.autoCommitInterval = tmq->autoCommitInterval;
160✔
1854
  req.sessionTimeoutMs = tmq->sessionTimeoutMs;
160✔
1855
  req.maxPollIntervalMs = tmq->maxPollIntervalMs;
160✔
1856
  req.resetOffsetCfg = tmq->resetOffsetCfg;
160✔
1857
  req.enableReplay = tmq->replayEnable;
160✔
1858
  req.enableBatchMeta = tmq->enableBatchMeta;
160✔
1859

1860
  for (int32_t i = 0; i < sz; i++) {
225✔
1861
    char* topic = taosArrayGetP(container, i);
65✔
1862
    if (topic == NULL) {
65✔
1863
      code = terrno;
×
1864
      goto END;
×
1865
    }
1866
    SName name = {0};
65✔
1867
    code = tNameSetDbName(&name, tmq->pTscObj->acctId, topic, strlen(topic));
65✔
1868
    if (code) {
65✔
1869
      tqErrorC("consumer:0x%" PRIx64 " cgroup:%s, failed to set topic name, code:%d", tmq->consumerId, tmq->groupId,
×
1870
               code);
1871
      goto END;
×
1872
    }
1873
    char* topicFName = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
65✔
1874
    if (topicFName == NULL) {
65✔
1875
      code = terrno;
×
1876
      goto END;
×
1877
    }
1878

1879
    code = tNameExtractFullName(&name, topicFName);
65✔
1880
    if (code) {
65✔
1881
      tqErrorC("consumer:0x%" PRIx64 " cgroup:%s, failed to extract topic name, code:%d", tmq->consumerId, tmq->groupId,
×
1882
               code);
1883
      taosMemoryFree(topicFName);
×
1884
      goto END;
×
1885
    }
1886

1887
    if (taosArrayPush(req.topicNames, &topicFName) == NULL) {
130✔
1888
      code = terrno;
×
1889
      taosMemoryFree(topicFName);
×
1890
      goto END;
×
1891
    }
1892
    tqInfoC("consumer:0x%" PRIx64 " subscribe topic:%s", tmq->consumerId, topicFName);
65✔
1893
  }
1894

1895
  int32_t tlen = tSerializeSCMSubscribeReq(NULL, &req);
160✔
1896
  buf = taosMemoryMalloc(tlen);
160✔
1897
  if (buf == NULL) {
160✔
1898
    code = terrno;
×
1899
    goto END;
×
1900
  }
1901

1902
  void* abuf = buf;
160✔
1903
  tlen = tSerializeSCMSubscribeReq(&abuf, &req);
160✔
1904

1905
  sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
160✔
1906
  if (sendInfo == NULL) {
160✔
1907
    code = terrno;
×
1908
    taosMemoryFree(buf);
×
1909
    goto END;
×
1910
  }
1911

1912
  SMqSubscribeCbParam param = {.rspErr = 0};
160✔
1913
  if (tsem2_init(&param.rspSem, 0, 0) != 0) {
160✔
1914
    code = TSDB_CODE_TSC_INTERNAL_ERROR;
×
1915
    taosMemoryFree(buf);
×
1916
    taosMemoryFree(sendInfo);
×
1917
    goto END;
×
1918
  }
1919

1920
  sendInfo->msgInfo = (SDataBuf){.pData = buf, .len = tlen, .handle = NULL};
160✔
1921
  sendInfo->requestId = generateRequestId();
160✔
1922
  sendInfo->requestObjRefId = 0;
160✔
1923
  sendInfo->param = &param;
160✔
1924
  sendInfo->fp = tmqSubscribeCb;
160✔
1925
  sendInfo->msgType = TDMT_MND_TMQ_SUBSCRIBE;
160✔
1926

1927
  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
160✔
1928

1929
  code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo);
160✔
1930
  if (code != 0) {
160✔
1931
    goto END;
×
1932
  }
1933

1934
  if (tsem2_wait(&param.rspSem) != 0){
160✔
1935
    tqErrorC("consumer:0x%" PRIx64 ", failed to wait semaphore in subscribe", tmq->consumerId);
×
1936
  }
1937
  if(tsem2_destroy(&param.rspSem) != 0) {
160✔
1938
    tqErrorC("consumer:0x%" PRIx64 ", failed to destroy semaphore in subscribe", tmq->consumerId);
×
1939
  }
1940

1941
  if (param.rspErr != 0) {
160✔
1942
    code = param.rspErr;
5✔
1943
    goto END;
5✔
1944
  }
1945

1946
  int32_t retryCnt = 0;
155✔
1947
  while ((code = syncAskEp(tmq)) != 0) {
460✔
1948
    if (retryCnt++ > SUBSCRIBE_RETRY_MAX_COUNT || code == TSDB_CODE_MND_CONSUMER_NOT_EXIST) {
307✔
1949
      tqErrorC("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry more than 2 minutes, code:%s",
2✔
1950
               tmq->consumerId, tstrerror(code));
1951
      if (code == TSDB_CODE_MND_CONSUMER_NOT_EXIST) {
2✔
1952
        code = 0;
2✔
1953
      }
1954
      goto END;
2✔
1955
    }
1956

1957
    tqInfoC("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry:%d in 500ms", tmq->consumerId, retryCnt);
305✔
1958
    taosMsleep(SUBSCRIBE_RETRY_INTERVAL);
305✔
1959
  }
1960

1961
  if (tmq->epTimer == NULL){
153✔
1962
    tmq->epTimer = taosTmrStart(tmqAssignAskEpTask, DEFAULT_ASKEP_INTERVAL, (void*)(tmq->refId), tmqMgmt.timer);
53✔
1963
    if (tmq->epTimer == NULL) {
53✔
1964
      code = TSDB_CODE_TSC_INTERNAL_ERROR;
×
1965
      goto END;
×
1966
    }
1967
  }
1968
  if (tmq->autoCommit && tmq->commitTimer == NULL){
153✔
1969
    tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, (void*)(tmq->refId), tmqMgmt.timer);
47✔
1970
    if (tmq->commitTimer == NULL) {
47✔
1971
      code = TSDB_CODE_TSC_INTERNAL_ERROR;
×
1972
      goto END;
×
1973
    }
1974
  }
1975

1976
  END:
153✔
1977
  taosArrayDestroyP(req.topicNames, NULL);
160✔
1978
  return code;
160✔
1979
}
1980

1981
void tmq_conf_set_auto_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* param) {
11✔
1982
  if (conf == NULL) return;
11✔
1983
  conf->commitCb = cb;
11✔
1984
  conf->commitCbUserParam = param;
11✔
1985
}
1986

1987
static void getVgInfo(tmq_t* tmq, char* topicName, int32_t vgId, SMqClientVg** pVg) {
5,979✔
1988
  if (tmq == NULL || topicName == NULL || pVg == NULL) {
5,979✔
1989
    return;
×
1990
  }
1991
  int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
5,979✔
1992
  for (int i = 0; i < topicNumCur; i++) {
8,752✔
1993
    SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i);
8,752✔
1994
    if (pTopicCur && strcmp(pTopicCur->topicName, topicName) == 0) {
8,752✔
1995
      int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs);
5,979✔
1996
      for (int32_t j = 0; j < vgNumCur; j++) {
10,944✔
1997
        SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j);
10,944✔
1998
        if (pVgCur && pVgCur->vgId == vgId) {
10,944✔
1999
          *pVg = pVgCur;
5,979✔
2000
          return;
5,979✔
2001
        }
2002
      }
2003
    }
2004
  }
2005
}
2006

2007
static SMqClientTopic* getTopicInfo(tmq_t* tmq, char* topicName) {
5,976✔
2008
  if (tmq == NULL || topicName == NULL) {
5,976✔
2009
    return NULL;
×
2010
  }
2011
  int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
5,976✔
2012
  for (int i = 0; i < topicNumCur; i++) {
8,748✔
2013
    SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i);
8,748✔
2014
    if (strcmp(pTopicCur->topicName, topicName) == 0) {
8,748✔
2015
      return pTopicCur;
5,976✔
2016
    }
2017
  }
2018
  return NULL;
×
2019
}
2020

2021
int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
6,064✔
2022
  tmq_t*             tmq = NULL;
6,064✔
2023
  SMqRspWrapper*     pRspWrapper = NULL;
6,064✔
2024
  int8_t             rspType = 0;
6,064✔
2025
  int32_t            vgId = 0;
6,064✔
2026
  uint64_t           requestId = 0;
6,064✔
2027
  SMqPollCbParam*    pParam = (SMqPollCbParam*)param;
6,064✔
2028
  if (pMsg == NULL) {
6,064✔
2029
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2030
  }
2031
  if (pParam == NULL) {
6,064✔
2032
    code = TSDB_CODE_TSC_INTERNAL_ERROR;
×
2033
    goto EXIT;
×
2034
  }
2035
  int64_t refId = pParam->refId;
6,064✔
2036
  vgId = pParam->vgId;
6,064✔
2037
  requestId = pParam->requestId;
6,064✔
2038
  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
6,064✔
2039
  if (tmq == NULL) {
6,064✔
2040
    code = TSDB_CODE_TMQ_CONSUMER_CLOSED;
×
2041
    goto EXIT;
×
2042
  }
2043

2044
  int32_t ret = taosAllocateQitem(sizeof(SMqRspWrapper), DEF_QITEM, 0, (void**)&pRspWrapper);
6,064✔
2045
  if (ret) {
6,064✔
2046
    code = ret;
×
2047
    tscWarn("consumer:0x%" PRIx64 " msg discard from vgId:%d, since out of memory", tmq->consumerId, vgId);
×
2048
    goto END;
×
2049
  }
2050

2051
  if (code != 0) {
6,064✔
2052
    goto END;
×
2053
  }
2054

2055
  if (pMsg->pData == NULL) {
6,064✔
2056
    tqErrorC("consumer:0x%" PRIx64 " msg discard from vgId:%d, since msg is NULL", tmq->consumerId, vgId);
×
2057
    code = TSDB_CODE_TSC_INTERNAL_ERROR;
×
2058
    goto END;
×
2059
  }
2060

2061
  int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch;
6,064✔
2062
  int32_t clientEpoch = atomic_load_32(&tmq->epoch);
6,064✔
2063

2064
  if (msgEpoch != clientEpoch) {
6,064✔
2065
    tqErrorC("consumer:0x%" PRIx64
5✔
2066
                 " msg discard from vgId:%d since epoch not equal, rsp epoch %d, current epoch %d, reqId:0x%" PRIx64,
2067
             tmq->consumerId, vgId, msgEpoch, clientEpoch, requestId);
2068
    code = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
5✔
2069
    goto END;
5✔
2070
  }
2071
  rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType;
6,059✔
2072
  tqDebugC("consumer:0x%" PRIx64 " recv poll rsp, vgId:%d, type %d(%s), QID:0x%" PRIx64, tmq->consumerId, vgId, rspType, tmqMsgTypeStr[rspType], requestId);
6,059✔
2073

2074
  pRspWrapper->tmqRspType = rspType;
6,059✔
2075
  pRspWrapper->pollRsp.reqId = requestId;
6,059✔
2076
  pRspWrapper->pollRsp.pEpset = pMsg->pEpSet;
6,059✔
2077
  pRspWrapper->pollRsp.data = pMsg->pData;
6,059✔
2078
  pRspWrapper->pollRsp.len = pMsg->len;
6,059✔
2079
  pMsg->pData = NULL;
6,059✔
2080
  pMsg->pEpSet = NULL;
6,059✔
2081

2082
  END:
6,064✔
2083
  if (pRspWrapper) {
6,064✔
2084
    pRspWrapper->code = code;
6,064✔
2085
    pRspWrapper->pollRsp.vgId = vgId;
6,064✔
2086
    tstrncpy(pRspWrapper->pollRsp.topicName, pParam->topicName, TSDB_TOPIC_FNAME_LEN);
6,064✔
2087
    code = taosWriteQitem(tmq->mqueue, pRspWrapper);
6,064✔
2088
    if (code != 0) {
6,064✔
2089
      tmqFreeRspWrapper(pRspWrapper);
×
2090
      taosFreeQitem(pRspWrapper);
×
2091
      tqErrorC("consumer:0x%" PRIx64 " put poll res into mqueue failed, code:%d", tmq->consumerId, code);
×
2092
    } else {
2093
      tqDebugC("consumer:0x%" PRIx64 " put poll res into mqueue, type:%d(%s), vgId:%d, total in queue:%d, QID:0x%" PRIx64,
6,064✔
2094
               tmq ? tmq->consumerId : 0, rspType, tmqMsgTypeStr[rspType], vgId, taosQueueItemSize(tmq->mqueue), requestId);
2095
    }
2096
  }
2097

2098
  if (tsem2_post(&tmq->rspSem) != 0){
6,064✔
2099
    tqErrorC("failed to post rsp sem, consumer:0x%" PRIx64, tmq->consumerId);
×
2100
  }
2101
  ret = taosReleaseRef(tmqMgmt.rsetId, refId);
6,064✔
2102
  if (ret != 0){
6,064✔
2103
    tqErrorC("failed to release ref:%"PRId64 ", code:%d", refId, ret);
×
2104
  }
2105

2106
  EXIT:
6,064✔
2107
  taosMemoryFreeClear(pMsg->pData);
6,064✔
2108
  taosMemoryFreeClear(pMsg->pEpSet);
6,064✔
2109
  return code;
6,064✔
2110
}
2111

2112
void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, SMqClientTopic* pTopic, SMqClientVg* pVg) {
6,076✔
2113
  if (pReq == NULL || tmq == NULL || pTopic == NULL || pVg == NULL) {
6,076✔
2114
    return;
×
2115
  }
2116
  (void)snprintf(pReq->subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", tmq->groupId, TMQ_SEPARATOR, pTopic->topicName);
6,076✔
2117
  pReq->withTbName = tmq->withTbName;
6,076✔
2118
  pReq->consumerId = tmq->consumerId;
6,076✔
2119
  pReq->timeout = tmq->maxPollWaitTime;
6,076✔
2120
  pReq->minPollRows = tmq->minPollRows;
6,076✔
2121
  pReq->epoch = tmq->epoch;
6,076✔
2122
  pReq->reqOffset = pVg->offsetInfo.endOffset;
6,076✔
2123
  pReq->head.vgId = pVg->vgId;
6,076✔
2124
  pReq->useSnapshot = tmq->useSnapshot;
6,076✔
2125
  pReq->reqId = generateRequestId();
6,076✔
2126
  pReq->enableReplay = tmq->replayEnable;
6,076✔
2127
  pReq->sourceExcluded = tmq->sourceExcluded;
6,076✔
2128
  pReq->rawData = tmq->rawData;
6,076✔
2129
  pReq->enableBatchMeta = tmq->enableBatchMeta;
6,076✔
2130
}
2131

2132
void changeByteEndian(char* pData) {
39,906✔
2133
  if (pData == NULL) {
39,906✔
2134
    return;
×
2135
  }
2136
  char* p = pData;
39,906✔
2137

2138
  // | version | total length | total rows | total columns | flag seg| block group id | column schema | each column
2139
  // length | version:
2140
  int32_t blockVersion = *(int32_t*)p;
39,906✔
2141
  if (blockVersion != BLOCK_VERSION_1) {
39,906✔
2142
    tqErrorC("invalid block version:%d", blockVersion);
×
2143
    return;
×
2144
  }
2145
  *(int32_t*)p = BLOCK_VERSION_2;
39,906✔
2146

2147
  p += sizeof(int32_t);
39,906✔
2148
  p += sizeof(int32_t);
39,906✔
2149
  p += sizeof(int32_t);
39,906✔
2150
  int32_t cols = *(int32_t*)p;
39,906✔
2151
  p += sizeof(int32_t);
39,906✔
2152
  p += sizeof(int32_t);
39,906✔
2153
  p += sizeof(uint64_t);
39,906✔
2154
  // check fields
2155
  p += cols * (sizeof(int8_t) + sizeof(int32_t));
39,906✔
2156

2157
  int32_t* colLength = (int32_t*)p;
39,906✔
2158
  for (int32_t i = 0; i < cols; ++i) {
186,331✔
2159
    colLength[i] = htonl(colLength[i]);
146,425✔
2160
  }
2161
}
2162

2163
static void tmqGetRawDataRowsPrecisionFromRes(void* pRetrieve, void** rawData, int64_t* rows, int32_t* precision) {
79,749✔
2164
  if (pRetrieve == NULL || rawData == NULL || rows == NULL) {
79,749✔
2165
    return;
×
2166
  }
2167
  if (*(int64_t*)pRetrieve == 0) {
79,749✔
2168
    *rawData = ((SRetrieveTableRsp*)pRetrieve)->data;
×
2169
    *rows = htobe64(((SRetrieveTableRsp*)pRetrieve)->numOfRows);
×
2170
    if (precision != NULL) {
×
2171
      *precision = ((SRetrieveTableRsp*)pRetrieve)->precision;
×
2172
    }
2173
  } else if (*(int64_t*)pRetrieve == 1) {
79,749✔
2174
    *rawData = ((SRetrieveTableRspForTmq*)pRetrieve)->data;
79,749✔
2175
    *rows = htobe64(((SRetrieveTableRspForTmq*)pRetrieve)->numOfRows);
79,749✔
2176
    if (precision != NULL) {
79,749✔
2177
      *precision = ((SRetrieveTableRspForTmq*)pRetrieve)->precision;
39,843✔
2178
    }
2179
  }
2180
}
2181

2182
static void tmqBuildRspFromWrapperInner(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows,
3,784✔
2183
                                        SMqRspObj* pRspObj) {
2184
  if (pWrapper == NULL || pVg == NULL || numOfRows == NULL || pRspObj == NULL) {
3,784✔
2185
    return;
×
2186
  }
2187
  pRspObj->resIter = -1;
3,784✔
2188
  pRspObj->resInfo.totalRows = 0;
3,784✔
2189
  pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
3,784✔
2190

2191
  SMqDataRsp* pDataRsp = &pRspObj->dataRsp;
3,784✔
2192
  bool needTransformSchema = !pDataRsp->withSchema;
3,784✔
2193
  if (!pDataRsp->withSchema) {  // withSchema is false if subscribe subquery, true if subscribe db or stable
3,784✔
2194
    pDataRsp->withSchema = true;
3,783✔
2195
    pDataRsp->blockSchema = taosArrayInit(pDataRsp->blockNum, sizeof(void*));
3,783✔
2196
    if (pDataRsp->blockSchema == NULL) {
3,783✔
2197
      tqErrorC("failed to allocate memory for blockSchema");
×
2198
      return;
×
2199
    }
2200
  }
2201
  // extract the rows in this data packet
2202
  for (int32_t i = 0; i < pDataRsp->blockNum; ++i) {
43,690✔
2203
    void*   pRetrieve = taosArrayGetP(pDataRsp->blockData, i);
39,906✔
2204
    void*   rawData = NULL;
39,906✔
2205
    int64_t rows = 0;
39,906✔
2206
    // deal with compatibility
2207
    tmqGetRawDataRowsPrecisionFromRes(pRetrieve, &rawData, &rows, NULL);
39,906✔
2208

2209
    pVg->numOfRows += rows;
39,906✔
2210
    (*numOfRows) += rows;
39,906✔
2211
    changeByteEndian(rawData);
39,906✔
2212
    if (needTransformSchema) {  // withSchema is false if subscribe subquery, true if subscribe db or stable
39,906✔
2213
      SSchemaWrapper* schema = tCloneSSchemaWrapper(&pWrapper->topicHandle->schema);
39,905✔
2214
      if (schema) {
39,905✔
2215
        if (taosArrayPush(pDataRsp->blockSchema, &schema) == NULL) {
79,810✔
2216
          tqErrorC("failed to push schema into blockSchema");
×
2217
          continue;
×
2218
        }
2219
      }
2220
    }
2221
  }
2222
}
2223

2224
static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* pVg) {
6,064✔
2225
  SMqPollReq      req = {0};
6,064✔
2226
  char*           msg = NULL;
6,064✔
2227
  SMqPollCbParam* pParam = NULL;
6,064✔
2228
  SMsgSendInfo*   sendInfo = NULL;
6,064✔
2229
  int             code = 0;
6,064✔
2230
  int             lino = 0;
6,064✔
2231
  tmqBuildConsumeReqImpl(&req, pTmq, pTopic, pVg);
6,064✔
2232

2233
  int32_t msgSize = tSerializeSMqPollReq(NULL, 0, &req);
6,064✔
2234
  TSDB_CHECK_CONDITION(msgSize >= 0, code, lino, END, TSDB_CODE_INVALID_MSG);
6,064✔
2235

2236
  msg = taosMemoryCalloc(1, msgSize);
6,064✔
2237
  TSDB_CHECK_NULL(msg, code, lino, END, terrno);
6,064✔
2238

2239
  TSDB_CHECK_CONDITION(tSerializeSMqPollReq(msg, msgSize, &req) >= 0, code, lino, END, TSDB_CODE_INVALID_MSG);
6,064✔
2240

2241
  pParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
6,064✔
2242
  TSDB_CHECK_NULL(pParam, code, lino, END, terrno);
6,064✔
2243

2244
  pParam->refId = pTmq->refId;
6,064✔
2245
  tstrncpy(pParam->topicName, pTopic->topicName, TSDB_TOPIC_FNAME_LEN);
6,064✔
2246
  pParam->vgId = pVg->vgId;
6,064✔
2247
  pParam->requestId = req.reqId;
6,064✔
2248

2249
  sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
6,064✔
2250
  TSDB_CHECK_NULL(sendInfo, code, lino, END, terrno);
6,064✔
2251

2252
  sendInfo->msgInfo = (SDataBuf){.pData = msg, .len = msgSize, .handle = NULL};
6,064✔
2253
  sendInfo->requestId = req.reqId;
6,064✔
2254
  sendInfo->requestObjRefId = 0;
6,064✔
2255
  sendInfo->param = pParam;
6,064✔
2256
  sendInfo->paramFreeFp = taosAutoMemoryFree;
6,064✔
2257
  sendInfo->fp = tmqPollCb;
6,064✔
2258
  sendInfo->msgType = TDMT_VND_TMQ_CONSUME;
6,064✔
2259

2260
  msg = NULL;
6,064✔
2261
  pParam = NULL;
6,064✔
2262

2263
  char offsetFormatBuf[TSDB_OFFSET_LEN] = {0};
6,064✔
2264
  tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pVg->offsetInfo.endOffset);
6,064✔
2265
  code = asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, NULL, sendInfo);
6,064✔
2266
  tqDebugC("consumer:0x%" PRIx64 " send poll to %s vgId:%d, code:%d, epoch %d, req:%s, QID:0x%" PRIx64, pTmq->consumerId,
6,064✔
2267
           pTopic->topicName, pVg->vgId, code, pTmq->epoch, offsetFormatBuf, req.reqId);
2268
  TSDB_CHECK_CODE(code, lino, END);
6,064✔
2269

2270
  pVg->pollCnt++;
6,064✔
2271
  pVg->seekUpdated = false;  // reset this flag.
6,064✔
2272
  pTmq->pollCnt++;
6,064✔
2273

2274
END:
6,064✔
2275
  if (code != 0){
6,064✔
2276
    tqErrorC("%s failed at %d msg:%s", __func__, lino, tstrerror(code));
×
2277
  }
2278
  taosMemoryFreeClear(pParam);
6,064✔
2279
  taosMemoryFreeClear(msg);
6,064✔
2280
  return code;
6,064✔
2281
}
2282

2283
static int32_t tmqPollImpl(tmq_t* tmq) {
7,220✔
2284
  if (tmq == NULL) {
7,220✔
2285
    return TSDB_CODE_INVALID_MSG;
×
2286
  }
2287
  int32_t code = 0;
7,220✔
2288
  taosWLockLatch(&tmq->lock);
7,220✔
2289

2290
  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__LOST){
7,220✔
2291
    code = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
×
2292
    goto end;
×
2293
  }
2294

2295
  int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
7,220✔
2296
  tqDebugC("consumer:0x%" PRIx64 " start to poll data, numOfTopics:%d", tmq->consumerId, numOfTopics);
7,220✔
2297

2298
  for (int i = 0; i < numOfTopics; i++) {
17,355✔
2299
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
10,135✔
2300
    if (pTopic == NULL) {
10,135✔
2301
      continue;
×
2302
    }
2303
    int32_t numOfVg = taosArrayGetSize(pTopic->vgs);
10,135✔
2304
    if (pTopic->noPrivilege) {
10,135✔
2305
      tqDebugC("consumer:0x%" PRIx64 " has no privilegr for topic:%s", tmq->consumerId, pTopic->topicName);
×
2306
      continue;
×
2307
    }
2308
    for (int j = 0; j < numOfVg; j++) {
37,765✔
2309
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
27,630✔
2310
      if (pVg == NULL) {
27,630✔
2311
        continue;
×
2312
      }
2313
      int64_t elapsed = taosGetTimestampMs() - pVg->blockReceiveTs;
27,630✔
2314
      if (tmq->replayEnable && elapsed < pVg->blockSleepForReplay && elapsed >= 0) {
27,630✔
2315
        tqDebugC("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for %" PRId64 "ms before start next poll when replay",
×
2316
                 tmq->consumerId, tmq->epoch, pVg->vgId, pVg->blockSleepForReplay);
2317
        continue;
×
2318
      }
2319

2320
      int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
27,630✔
2321
      if (vgStatus == TMQ_VG_STATUS__WAIT) {
27,630✔
2322
        int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
21,566✔
2323
        tqDebugC("consumer:0x%" PRIx64 " epoch %d wait poll-rsp, skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch,
21,566✔
2324
                 pVg->vgId, vgSkipCnt);
2325
        continue;
21,566✔
2326
      }
2327

2328
      atomic_store_32(&pVg->vgSkipCnt, 0);
6,064✔
2329
      code = doTmqPollImpl(tmq, pTopic, pVg);
6,064✔
2330
      if (code != TSDB_CODE_SUCCESS) {
6,064✔
2331
        goto end;
×
2332
      }
2333
    }
2334
  }
2335

2336
  end:
7,220✔
2337
  taosWUnLockLatch(&tmq->lock);
7,220✔
2338
  tqDebugC("consumer:0x%" PRIx64 " end to poll data, code:%d", tmq->consumerId, code);
7,220✔
2339
  return code;
7,220✔
2340
}
2341

2342
static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* reqOffset, STqOffsetVal* rspOffset, int64_t sver, int64_t ever,
5,976✔
2343
                         int64_t consumerId, bool hasData) {
2344
  if (pVg == NULL || reqOffset == NULL || rspOffset == NULL) {
5,976✔
2345
    return;
×
2346
  }
2347
  if (!pVg->seekUpdated) {
5,976✔
2348
    tqDebugC("consumer:0x%" PRIx64 " local offset is update, since seekupdate not set", consumerId);
5,969✔
2349
    if (hasData) {
5,969✔
2350
      tOffsetCopy(&pVg->offsetInfo.beginOffset, reqOffset);
3,784✔
2351
    }
2352
    tOffsetCopy(&pVg->offsetInfo.endOffset, rspOffset);
5,969✔
2353
  } else {
2354
    tqDebugC("consumer:0x%" PRIx64 " local offset is NOT update, since seekupdate is set", consumerId);
7✔
2355
  }
2356

2357
  // update the status
2358
  atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
5,976✔
2359

2360
  // update the valid wal version range
2361
  pVg->offsetInfo.walVerBegin = sver;
5,976✔
2362
  pVg->offsetInfo.walVerEnd = ever + 1;
5,976✔
2363
}
2364

2365
static SMqRspObj* buildRsp(SMqPollRspWrapper* pollRspWrapper){
3,784✔
2366
  typedef union {
2367
    SMqDataRsp      dataRsp;
2368
    SMqMetaRsp      metaRsp;
2369
    SMqBatchMetaRsp batchMetaRsp;
2370
  } MEMSIZE;
2371

2372
  SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
3,784✔
2373
  if (pRspObj == NULL) {
3,784✔
2374
    tqErrorC("buildRsp:failed to allocate memory");
×
2375
    return NULL;
×
2376
  }
2377
  (void)memcpy(&pRspObj->dataRsp, &pollRspWrapper->dataRsp, sizeof(MEMSIZE));
3,784✔
2378
  tstrncpy(pRspObj->topic, pollRspWrapper->topicName, TSDB_TOPIC_FNAME_LEN);
3,784✔
2379
  tstrncpy(pRspObj->db, pollRspWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
3,784✔
2380
  pRspObj->vgId = pollRspWrapper->vgId;
3,784✔
2381
  (void)memset(&pollRspWrapper->dataRsp, 0, sizeof(MEMSIZE));
3,784✔
2382
  return pRspObj;
3,784✔
2383
}
2384

2385
static int32_t processMqRspError(tmq_t* tmq, SMqRspWrapper* pRspWrapper){
3✔
2386
  int32_t code = 0;
3✔
2387
  SMqPollRspWrapper* pollRspWrapper = &pRspWrapper->pollRsp;
3✔
2388

2389
  if (pRspWrapper->code == TSDB_CODE_VND_INVALID_VGROUP_ID) {  // for vnode transform
3✔
2390
    code = askEp(tmq, NULL, false, true);
×
2391
    if (code != 0) {
×
2392
      tqErrorC("consumer:0x%" PRIx64 " failed to ask ep wher vnode transform, code:%s", tmq->consumerId, tstrerror(code));
×
2393
    }
2394
  } else if (pRspWrapper->code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) {
3✔
2395
    code = syncAskEp(tmq);
3✔
2396
    if (code != 0) {
3✔
2397
      tqErrorC("consumer:0x%" PRIx64 " failed to ask ep when consumer mismatch, code:%s", tmq->consumerId, tstrerror(code));
×
2398
    }
2399
  } else if (pRspWrapper->code == TSDB_CODE_TMQ_NO_TABLE_QUALIFIED){
×
2400
    code = 0;
×
2401
  }
2402
  tqInfoC("consumer:0x%" PRIx64 " msg from vgId:%d discarded, since %s", tmq->consumerId, pollRspWrapper->vgId,
3✔
2403
          tstrerror(pRspWrapper->code));
2404
  taosWLockLatch(&tmq->lock);
3✔
2405
  SMqClientVg* pVg = NULL;
3✔
2406
  getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId, &pVg);
3✔
2407
  if (pVg) {
3✔
2408
    atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
3✔
2409
  }
2410
  taosWUnLockLatch(&tmq->lock);
3✔
2411

2412
  return code;
3✔
2413
}
2414

2415
static int32_t processWrapperData(SMqRspWrapper* pRspWrapper){
5,976✔
2416
  int32_t code = 0;
5,976✔
2417
  if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_RSP) {
5,976✔
2418
    PROCESS_POLL_RSP(tDecodeMqDataRsp, &pRspWrapper->pollRsp.dataRsp)
5,976✔
2419
    pRspWrapper->pollRsp.dataRsp.data = pRspWrapper->pollRsp.data;
5,976✔
2420
    pRspWrapper->pollRsp.data = NULL;
5,976✔
2421
  } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
×
2422
    PROCESS_POLL_RSP(tDecodeMqMetaRsp, &pRspWrapper->pollRsp.metaRsp)
×
2423
  } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
×
2424
    PROCESS_POLL_RSP(tDecodeSTaosxRsp, &pRspWrapper->pollRsp.dataRsp)
×
2425
    pRspWrapper->pollRsp.dataRsp.data = pRspWrapper->pollRsp.data;
×
2426
    pRspWrapper->pollRsp.data = NULL;
×
2427
  } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_BATCH_META_RSP) {
×
2428
    PROCESS_POLL_RSP(tSemiDecodeMqBatchMetaRsp, &pRspWrapper->pollRsp.batchMetaRsp)
×
2429
  } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) {
×
2430
    PROCESS_POLL_RSP(tDecodeMqRawDataRsp, &pRspWrapper->pollRsp.dataRsp)
×
2431
    pRspWrapper->pollRsp.dataRsp.len = pRspWrapper->pollRsp.len - sizeof(SMqRspHead);
×
2432
    pRspWrapper->pollRsp.dataRsp.rawData = POINTER_SHIFT(pRspWrapper->pollRsp.data, sizeof(SMqRspHead));
×
2433
    pRspWrapper->pollRsp.data = NULL;
×
2434
  } else {
2435
    tqErrorC("invalid rsp msg received, type:%d ignored", pRspWrapper->tmqRspType);
×
2436
    code = TSDB_CODE_TSC_INTERNAL_ERROR;
×
2437
    goto END;
×
2438
  }
2439
  END:
5,976✔
2440
  return code;
5,976✔
2441
}
2442

2443
static SMqRspObj* processMqRsp(tmq_t* tmq, SMqRspWrapper* pRspWrapper){
6,478✔
2444
  int32_t    code = 0;
6,478✔
2445
  SMqRspObj* pRspObj = NULL;
6,478✔
2446

2447
  if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) {
6,478✔
2448
    tqDebugC("consumer:0x%" PRIx64 " ep msg received", tmq->consumerId);
502✔
2449
    SMqAskEpRsp*        rspMsg = &pRspWrapper->epRsp;
502✔
2450
    doUpdateLocalEp(tmq, pRspWrapper->epoch, rspMsg);
502✔
2451
    terrno = code;
502✔
2452
    return pRspObj;
502✔
2453
  }
2454

2455
  code = processWrapperData(pRspWrapper);
5,976✔
2456
  if (code != 0) {
5,976✔
2457
    goto END;
×
2458
  }
2459
  SMqPollRspWrapper* pollRspWrapper = &pRspWrapper->pollRsp;
5,976✔
2460
  taosWLockLatch(&tmq->lock);
5,976✔
2461
  SMqClientVg* pVg = NULL;
5,976✔
2462
  getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId, &pVg);
5,976✔
2463
  if(pVg == NULL) {
5,976✔
2464
    tqErrorC("consumer:0x%" PRIx64 " get vg or topic error, topic:%s vgId:%d", tmq->consumerId,
×
2465
             pollRspWrapper->topicName, pollRspWrapper->vgId);
2466
    code = TSDB_CODE_TMQ_INVALID_VGID;
×
2467
    goto END;
×
2468
  }
2469
  pollRspWrapper->topicHandle = getTopicInfo(tmq, pollRspWrapper->topicName);
5,976✔
2470
  if (pollRspWrapper->pEpset != NULL) {
5,976✔
2471
    pVg->epSet = *pollRspWrapper->pEpset;
18✔
2472
  }
2473

2474
  if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_RSP ||
5,976✔
2475
      pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP ||
×
2476
      pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) {
5,976✔
2477
    updateVgInfo(pVg, &pollRspWrapper->dataRsp.reqOffset, &pollRspWrapper->dataRsp.rspOffset, pollRspWrapper->head.walsver, pollRspWrapper->head.walever,
5,976✔
2478
                 tmq->consumerId, pollRspWrapper->dataRsp.blockNum != 0);
5,976✔
2479

2480
    char buf[TSDB_OFFSET_LEN] = {0};
5,976✔
2481
    tFormatOffset(buf, TSDB_OFFSET_LEN, &pollRspWrapper->rspOffset);
5,976✔
2482
    if (pollRspWrapper->dataRsp.blockNum == 0) {
5,976✔
2483
      tqDebugC("consumer:0x%" PRIx64 " empty block received, vgId:%d, offset:%s, vg total:%" PRId64
2,192✔
2484
                   ", total:%" PRId64 ", QID:0x%" PRIx64,
2485
               tmq->consumerId, pVg->vgId, buf, pVg->numOfRows, tmq->totalRows, pollRspWrapper->reqId);
2486
    } else {
2487
      pRspObj = buildRsp(pollRspWrapper);
3,784✔
2488
      if (pRspObj == NULL) {
3,784✔
2489
        tqErrorC("consumer:0x%" PRIx64 " failed to allocate memory for meta rsp", tmq->consumerId);
×
2490
        goto END;
×
2491
      }
2492
      pRspObj->resType = pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP ? RES_TYPE__TMQ_RAWDATA :
7,568✔
2493
                         (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_RSP ? RES_TYPE__TMQ : RES_TYPE__TMQ_METADATA);
3,784✔
2494
      int64_t numOfRows = 0;
3,784✔
2495
      if (pRspWrapper->tmqRspType != TMQ_MSG_TYPE__POLL_RAW_DATA_RSP){
3,784✔
2496
        tmqBuildRspFromWrapperInner(pollRspWrapper, pVg, &numOfRows, pRspObj);
3,784✔
2497
        tmq->totalRows += numOfRows;
3,784✔
2498
      }
2499
      if (tmq->replayEnable && pRspWrapper->tmqRspType != TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) {
3,784✔
2500
        pVg->blockReceiveTs = taosGetTimestampMs();
×
2501
        pVg->blockSleepForReplay = pRspObj->dataRsp.sleepTime;
×
2502
        if (pVg->blockSleepForReplay > 0) {
×
2503
          if (taosTmrStart(tmqReplayTask, pVg->blockSleepForReplay, (void*)(tmq->refId), tmqMgmt.timer) == NULL) {
×
2504
            tqErrorC("consumer:0x%" PRIx64 " failed to start replay timer, vgId:%d, sleep:%" PRId64,
×
2505
                     tmq->consumerId, pVg->vgId, pVg->blockSleepForReplay);
2506
          }
2507
        }
2508
      }
2509
      tqDebugC("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64
3,784✔
2510
                   ", vg total:%" PRId64 ", total:%" PRId64 ", QID:0x%" PRIx64,
2511
               tmq->consumerId, pVg->vgId, buf, pRspObj->dataRsp.blockNum, numOfRows, pVg->numOfRows, tmq->totalRows,
2512
               pollRspWrapper->reqId);
2513
    }
2514
  } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP || pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_BATCH_META_RSP) {
×
2515
    updateVgInfo(pVg, &pollRspWrapper->rspOffset, &pollRspWrapper->rspOffset,
×
2516
                 pollRspWrapper->head.walsver, pollRspWrapper->head.walever, tmq->consumerId, true);
2517

2518

2519
    pRspObj = buildRsp(pollRspWrapper);
×
2520
    if (pRspObj == NULL) {
×
2521
      tqErrorC("consumer:0x%" PRIx64 " failed to allocate memory for meta rsp", tmq->consumerId);
×
2522
      goto END;
×
2523
    }
2524
    pRspObj->resType = pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP ? RES_TYPE__TMQ_META : RES_TYPE__TMQ_BATCH_META;
×
2525
  }
2526

2527
END:
×
2528
  terrno = code;
5,976✔
2529
  taosWUnLockLatch(&tmq->lock);
5,976✔
2530
  return pRspObj;
5,976✔
2531
}
2532

2533
static void* tmqHandleAllRsp(tmq_t* tmq) {
11,004✔
2534
  tqDebugC("consumer:0x%" PRIx64 " start to handle the rsp, total:%d", tmq->consumerId, taosQueueItemSize(tmq->mqueue));
11,004✔
2535

2536
  int32_t code = 0;
11,004✔
2537
  void* returnVal = NULL;
11,004✔
2538
  while (1) {
2,697✔
2539
    SMqRspWrapper* pRspWrapper = NULL;
13,701✔
2540
    taosReadQitem(tmq->mqueue, (void**)&pRspWrapper);
13,701✔
2541
    if (pRspWrapper == NULL) {break;}
17,485✔
2542

2543
    tqDebugC("consumer:0x%" PRIx64 " handle rsp, type:%s", tmq->consumerId, tmqMsgTypeStr[pRspWrapper->tmqRspType]);
6,481✔
2544
    if (pRspWrapper->code != 0) {
6,481✔
2545
      code = processMqRspError(tmq, pRspWrapper);
3✔
2546
    }else{
2547
      returnVal = processMqRsp(tmq, pRspWrapper);
6,478✔
2548
      code = terrno;
6,478✔
2549
    }
2550

2551
    tmqFreeRspWrapper(pRspWrapper);
6,481✔
2552
    taosFreeQitem(pRspWrapper);
6,481✔
2553
    if(returnVal != NULL || code != 0){
6,481✔
2554
      break;
2555
    }
2556
  }
2557

2558
END:
11,004✔
2559
  terrno = code;
11,004✔
2560
  return returnVal;
11,004✔
2561
}
2562

2563
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
4,058✔
2564
  int32_t lino = 0;
4,058✔
2565
  int32_t code = 0;
4,058✔
2566
  TSDB_CHECK_NULL(tmq, code, lino, END, TSDB_CODE_INVALID_PARA);
4,058✔
2567

2568
  void*   rspObj = NULL;
4,058✔
2569
  int64_t startTime = taosGetTimestampMs();
4,058✔
2570

2571
  tqDebugC("consumer:0x%" PRIx64 " start to poll at %" PRId64 ", timeout:%" PRId64, tmq->consumerId, startTime, timeout);
4,058✔
2572
  TSDB_CHECK_CONDITION(atomic_load_8(&tmq->status) != TMQ_CONSUMER_STATUS__INIT, code, lino, END, TSDB_CODE_TMQ_INVALID_STATUS);
4,058✔
2573

2574
  (void)atomic_val_compare_exchange_8(&tmq->pollFlag, 0, 1);
4,056✔
2575

2576
  while (1) {
2577
    code = tmqHandleAllDelayedTask(tmq);
11,004✔
2578
    TSDB_CHECK_CODE(code, lino, END);
11,004✔
2579

2580
    rspObj = tmqHandleAllRsp(tmq);
11,004✔
2581
    if (rspObj) {
11,004✔
2582
      tqDebugC("consumer:0x%" PRIx64 " return rsp %p", tmq->consumerId, rspObj);
3,784✔
2583
      return (TAOS_RES*)rspObj;
3,784✔
2584
    }
2585
    code = terrno;
7,220✔
2586
    TSDB_CHECK_CODE(code, lino, END);
7,220✔
2587

2588
    code = tmqPollImpl(tmq);
7,220✔
2589
    TSDB_CHECK_CODE(code, lino, END);
7,220✔
2590

2591
    if (timeout >= 0) {
7,220✔
2592
      int64_t currentTime = taosGetTimestampMs();
7,220✔
2593
      int64_t elapsedTime = currentTime - startTime;
7,220✔
2594
      (void)tsem2_timewait(&tmq->rspSem, (timeout - elapsedTime));
7,220✔
2595
      TSDB_CHECK_CONDITION(elapsedTime < timeout && elapsedTime >= 0, code, lino, END, 0);
7,220✔
2596
    } else {
2597
      (void)tsem2_timewait(&tmq->rspSem, 1000);
×
2598
    }
2599
  }
2600

2601
END:
274✔
2602
  terrno = code;
274✔
2603
  if (tmq != NULL && terrno != 0) {
274✔
2604
    tqErrorC("consumer:0x%" PRIx64 " poll error at line:%d, msg:%s", tmq->consumerId, lino, tstrerror(terrno));
2✔
2605
  }
2606
  return NULL;
274✔
2607
}
2608

2609
static void displayConsumeStatistics(tmq_t* pTmq) {
103✔
2610
  if (pTmq == NULL) return;
103✔
2611
  taosRLockLatch(&pTmq->lock);
103✔
2612
  int32_t numOfTopics = taosArrayGetSize(pTmq->clientTopics);
103✔
2613
  tqInfoC("consumer:0x%" PRIx64 " closing poll:%" PRId64 " rows:%" PRId64 " topics:%d, final epoch:%d",
103✔
2614
          pTmq->consumerId, pTmq->pollCnt, pTmq->totalRows, numOfTopics, pTmq->epoch);
2615

2616
  tqInfoC("consumer:0x%" PRIx64 " rows dist begin: ", pTmq->consumerId);
103✔
2617
  for (int32_t i = 0; i < numOfTopics; ++i) {
162✔
2618
    SMqClientTopic* pTopics = taosArrayGet(pTmq->clientTopics, i);
59✔
2619
    if (pTopics == NULL) continue;
59✔
2620
    tqInfoC("consumer:0x%" PRIx64 " topic:%d", pTmq->consumerId, i);
59✔
2621
    int32_t numOfVgs = taosArrayGetSize(pTopics->vgs);
59✔
2622
    for (int32_t j = 0; j < numOfVgs; ++j) {
302✔
2623
      SMqClientVg* pVg = taosArrayGet(pTopics->vgs, j);
243✔
2624
      if (pVg == NULL) continue;
243✔
2625
      tqInfoC("topic:%s, %d. vgId:%d rows:%" PRId64, pTopics->topicName, j, pVg->vgId, pVg->numOfRows);
243✔
2626
    }
2627
  }
2628
  taosRUnLockLatch(&pTmq->lock);
103✔
2629
  tqInfoC("consumer:0x%" PRIx64 " rows dist end", pTmq->consumerId);
103✔
2630
}
2631

2632
int32_t tmq_unsubscribe(tmq_t* tmq) {
103✔
2633
  if (tmq == NULL) return TSDB_CODE_INVALID_PARA;
103✔
2634
  int32_t code = 0;
103✔
2635
  int8_t status = atomic_load_8(&tmq->status);
103✔
2636
  tqInfoC("consumer:0x%" PRIx64 " start to unsubscribe consumer, status:%d", tmq->consumerId, status);
103✔
2637

2638
  displayConsumeStatistics(tmq);
103✔
2639
  if (status != TMQ_CONSUMER_STATUS__READY && status != TMQ_CONSUMER_STATUS__LOST) {
103✔
2640
    tqInfoC("consumer:0x%" PRIx64 " status:%d, already closed or not in ready state, no need unsubscribe", tmq->consumerId, status);
4✔
2641
    goto END;
4✔
2642
  }
2643
  if (tmq->autoCommit) {
99✔
2644
    code = tmq_commit_sync(tmq, NULL);
91✔
2645
    if (code != 0) {
91✔
2646
      goto END;
×
2647
    }
2648
  }
2649
  tmqSendHbReq((void*)(tmq->refId), NULL);
99✔
2650

2651
  tmq_list_t* lst = tmq_list_new();
99✔
2652
  if (lst == NULL) {
99✔
2653
    code = terrno;
×
2654
    goto END;
×
2655
  }
2656
  code = tmq_subscribe(tmq, lst);
99✔
2657
  tmq_list_destroy(lst);
99✔
2658
  tmqClearUnhandleMsg(tmq);
99✔
2659
  if(code != 0){
99✔
2660
    goto END;
×
2661
  }
2662

2663
  END:
99✔
2664
  return code;
103✔
2665
}
2666

2667
int32_t tmq_consumer_close(tmq_t* tmq) {
56✔
2668
  if (tmq == NULL) return TSDB_CODE_INVALID_PARA;
56✔
2669
  int32_t code = 0;
56✔
2670
  (void) taosThreadMutexLock(&tmqMgmt.lock);
56✔
2671
  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__CLOSED){
56✔
2672
    goto end;
×
2673
  }
2674
  tqInfoC("consumer:0x%" PRIx64 " start to close consumer, status:%d", tmq->consumerId, tmq->status);
56✔
2675
  code = tmq_unsubscribe(tmq);
56✔
2676
  if (code == 0) {
56✔
2677
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__CLOSED);
56✔
2678
    code = taosRemoveRef(tmqMgmt.rsetId, tmq->refId);
56✔
2679
    if (code != 0){
56✔
2680
      tqErrorC("tmq close failed to remove ref:%" PRId64 ", code:%d", tmq->refId, code);
×
2681
    }
2682
  }
2683

2684
end:
56✔
2685
  (void)taosThreadMutexUnlock(&tmqMgmt.lock);
56✔
2686
  return code;
56✔
2687
}
2688

2689
const char* tmq_err2str(int32_t err) {
5✔
2690
  if (err == 0) {
5✔
2691
    return "success";
2✔
2692
  } else if (err == -1) {
3✔
2693
    return "fail";
×
2694
  } else {
2695
    if (*(taosGetErrMsg()) == 0) {
3✔
2696
      return tstrerror(err);
3✔
2697
    } else {
2698
      (void)snprintf(taosGetErrMsgReturn(), ERR_MSG_LEN, "%s,detail:%s", tstrerror(err), taosGetErrMsg());
×
2699
      return (const char*)taosGetErrMsgReturn();
×
2700
    }
2701
  }
2702
}
2703

2704
tmq_res_t tmq_get_res_type(TAOS_RES* res) {
3,526✔
2705
  if (res == NULL) {
3,526✔
2706
    return TMQ_RES_INVALID;
×
2707
  }
2708
  if (TD_RES_TMQ(res)) {
3,526✔
2709
    return TMQ_RES_DATA;
3,526✔
2710
  } else if (TD_RES_TMQ_META(res)) {
×
2711
    return TMQ_RES_TABLE_META;
×
2712
  } else if (TD_RES_TMQ_METADATA(res)) {
×
2713
    return TMQ_RES_METADATA;
×
2714
  } else if (TD_RES_TMQ_BATCH_META(res)) {
×
2715
    return TMQ_RES_TABLE_META;
×
2716
  } else if (TD_RES_TMQ_RAW(res)) {
×
2717
    return TMQ_RES_RAWDATA;
×
2718
  } else {
2719
    return TMQ_RES_INVALID;
×
2720
  }
2721
}
2722

2723
const char* tmq_get_topic_name(TAOS_RES* res) {
3,214✔
2724
  if (res == NULL) {
3,214✔
2725
    return NULL;
×
2726
  }
2727
  if (TD_RES_TMQ_RAW(res) || TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) ||
3,214✔
2728
      TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
×
2729
    char* tmp = strchr(((SMqRspObj*)res)->topic, '.');
3,214✔
2730
    if (tmp == NULL) {
3,214✔
2731
      return NULL;
×
2732
    }
2733
    return tmp + 1;
3,214✔
2734
  } else {
2735
    return NULL;
×
2736
  }
2737
}
2738

2739
const char* tmq_get_db_name(TAOS_RES* res) {
3,207✔
2740
  if (res == NULL) {
3,207✔
2741
    return NULL;
×
2742
  }
2743

2744
  if (TD_RES_TMQ_RAW(res) || TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) ||
3,207✔
2745
      TD_RES_TMQ_BATCH_META(res) || TD_RES_TMQ_META(res)) {
×
2746
    char* tmp = strchr(((SMqRspObj*)res)->db, '.');
3,207✔
2747
    if (tmp == NULL) {
3,207✔
2748
      return NULL;
×
2749
    }
2750
    return tmp + 1;
3,207✔
2751
  } else {
2752
    return NULL;
×
2753
  }
2754
}
2755

2756
int32_t tmq_get_vgroup_id(TAOS_RES* res) {
3,214✔
2757
  if (res == NULL) {
3,214✔
2758
    return TSDB_CODE_INVALID_PARA;
×
2759
  }
2760
  if (TD_RES_TMQ_RAW(res) || TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) ||
3,214✔
2761
      TD_RES_TMQ_BATCH_META(res) || TD_RES_TMQ_META(res)) {
×
2762
    return ((SMqRspObj*)res)->vgId;
3,214✔
2763
  } else {
2764
    return TSDB_CODE_INVALID_PARA;
×
2765
  }
2766
}
2767

2768
int64_t tmq_get_vgroup_offset(TAOS_RES* res) {
512✔
2769
  if (res == NULL) {
512✔
2770
    return TSDB_CODE_INVALID_PARA;
×
2771
  }
2772
  if (TD_RES_TMQ_RAW(res) || TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) {
512✔
2773
    SMqRspObj* pRspObj = (SMqRspObj*)res;
512✔
2774
    STqOffsetVal*     pOffset = &pRspObj->dataRsp.reqOffset;
512✔
2775
    if (pOffset->type == TMQ_OFFSET__LOG) {
512✔
2776
      return pOffset->version;
512✔
2777
    } else {
2778
      tqErrorC("invalid offset type:%d", pOffset->type);
×
2779
    }
2780
  } else if (TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
×
2781
    SMqRspObj* pRspObj = (SMqRspObj*)res;
×
2782
    if (pRspObj->rspOffset.type == TMQ_OFFSET__LOG) {
×
2783
      return pRspObj->rspOffset.version;
×
2784
    }
2785
  } else {
2786
    tqErrorC("invalid tmq type:%d", *(int8_t*)res);
×
2787
  }
2788

2789
  // data from tsdb, no valid offset info
2790
  return TSDB_CODE_TMQ_SNAPSHOT_ERROR;
×
2791
}
2792

2793
const char* tmq_get_table_name(TAOS_RES* res) {
1,280,958✔
2794
  if (res == NULL) {
1,280,958✔
2795
    return NULL;
×
2796
  }
2797
  if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) ) {
1,280,958✔
2798
    SMqRspObj* pRspObj = (SMqRspObj*)res;
1,280,958✔
2799
    SMqDataRsp* data = &pRspObj->dataRsp;
1,280,958✔
2800
    if (!data->withTbName || data->blockTbName == NULL || pRspObj->resIter < 0 ||
1,280,958✔
2801
        pRspObj->resIter >= data->blockNum) {
×
2802
      return NULL;
1,280,958✔
2803
    }
2804
    return (const char*)taosArrayGetP(data->blockTbName, pRspObj->resIter);
×
2805
  }
2806
  return NULL;
×
2807
}
2808

2809
void tmq_commit_async(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* cb, void* param) {
×
2810
  if (tmq == NULL) {
×
2811
    tqErrorC("invalid tmq handle, null");
×
2812
    if (cb != NULL) {
×
2813
      cb(tmq, TSDB_CODE_INVALID_PARA, param);
×
2814
    }
2815
    return;
×
2816
  }
2817
  if (pRes == NULL) {  // here needs to commit all offsets.
×
2818
    asyncCommitAllOffsets(tmq, cb, param);
×
2819
  } else {  // only commit one offset
2820
    asyncCommitFromResult(tmq, pRes, cb, param);
×
2821
  }
2822
}
2823

2824
static void commitCallBackFn(tmq_t* tmq, int32_t code, void* param) {
199✔
2825
  if (param == NULL) {
199✔
2826
    tqErrorC("invalid param in commit cb");
×
2827
    return;
×
2828
  }
2829
  SSyncCommitInfo* pInfo = (SSyncCommitInfo*)param;
199✔
2830
  pInfo->code = code;
199✔
2831
  if (tsem2_post(&pInfo->sem) != 0){
199✔
2832
    tqErrorC("failed to post rsp sem in commit cb");
×
2833
  }
2834
}
2835

2836
int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* pRes) {
198✔
2837
  if (tmq == NULL) {
198✔
2838
    tqErrorC("invalid tmq handle, null");
×
2839
    return TSDB_CODE_INVALID_PARA;
×
2840
  }
2841

2842
  int32_t code = 0;
198✔
2843

2844
  SSyncCommitInfo* pInfo = taosMemoryMalloc(sizeof(SSyncCommitInfo));
198✔
2845
  if (pInfo == NULL) {
198✔
2846
    tqErrorC("failed to allocate memory for sync commit");
×
2847
    return terrno;
×
2848
  }
2849

2850
  code = tsem2_init(&pInfo->sem, 0, 0);
198✔
2851
  if (code != 0) {
198✔
2852
    tqErrorC("failed to init sem for sync commit");
×
2853
    taosMemoryFree(pInfo);
×
2854
    return code;
×
2855
  }
2856
  pInfo->code = 0;
198✔
2857

2858
  if (pRes == NULL) {
198✔
2859
    asyncCommitAllOffsets(tmq, commitCallBackFn, pInfo);
145✔
2860
  } else {
2861
    asyncCommitFromResult(tmq, pRes, commitCallBackFn, pInfo);
53✔
2862
  }
2863

2864
  if (tsem2_wait(&pInfo->sem) != 0){
198✔
2865
    tqErrorC("failed to wait sem for sync commit");
×
2866
  }
2867
  code = pInfo->code;
198✔
2868

2869
  if(tsem2_destroy(&pInfo->sem) != 0) {
198✔
2870
    tqErrorC("failed to destroy sem for sync commit");
×
2871
  }
2872
  taosMemoryFree(pInfo);
198✔
2873

2874
  tqInfoC("consumer:0x%" PRIx64 " sync res commit done, code:%s", tmq->consumerId, tstrerror(code));
198✔
2875
  return code;
198✔
2876
}
2877

2878
// wal range will be ok after calling tmq_get_topic_assignment or poll interface
2879
static int32_t checkWalRange(SVgOffsetInfo* offset, int64_t value) {
12✔
2880
  if (offset == NULL) {
12✔
2881
    tqErrorC("invalid offset, null");
×
2882
    return TSDB_CODE_INVALID_PARA;
×
2883
  }
2884
  if (offset->walVerBegin == -1 || offset->walVerEnd == -1) {
12✔
2885
    tqErrorC("Assignment or poll interface need to be called first");
×
2886
    return TSDB_CODE_TMQ_NEED_INITIALIZED;
×
2887
  }
2888

2889
  if (value != -1 && (value < offset->walVerBegin || value > offset->walVerEnd)) {
12✔
2890
    tqErrorC("invalid seek params, offset:%" PRId64 ", valid range:[%" PRId64 ", %" PRId64 "]", value,
×
2891
             offset->walVerBegin, offset->walVerEnd);
2892
    return TSDB_CODE_TMQ_VERSION_OUT_OF_RANGE;
×
2893
  }
2894

2895
  return 0;
12✔
2896
}
2897

2898
int32_t tmq_commit_offset_sync(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_t offset) {
2✔
2899
  if (tmq == NULL || pTopicName == NULL) {
2✔
2900
    tqErrorC("invalid tmq handle, null");
×
2901
    return TSDB_CODE_INVALID_PARA;
×
2902
  }
2903

2904
  int32_t accId = tmq->pTscObj->acctId;
2✔
2905
  char    tname[TSDB_TOPIC_FNAME_LEN] = {0};
2✔
2906
  (void)snprintf(tname, TSDB_TOPIC_FNAME_LEN, "%d.%s", accId, pTopicName);
2✔
2907

2908
  taosWLockLatch(&tmq->lock);
2✔
2909
  SMqClientVg* pVg = NULL;
2✔
2910
  int32_t      code = getClientVg(tmq, tname, vgId, &pVg);
2✔
2911
  if (code != 0) {
2✔
2912
    taosWUnLockLatch(&tmq->lock);
×
2913
    return code;
×
2914
  }
2915

2916
  SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo;
2✔
2917
  code = checkWalRange(pOffsetInfo, offset);
2✔
2918
  if (code != 0) {
2✔
2919
    taosWUnLockLatch(&tmq->lock);
×
2920
    return code;
×
2921
  }
2922
  taosWUnLockLatch(&tmq->lock);
2✔
2923

2924
  STqOffsetVal offsetVal = {.type = TMQ_OFFSET__LOG, .version = offset};
2✔
2925

2926
  SSyncCommitInfo* pInfo = taosMemoryMalloc(sizeof(SSyncCommitInfo));
2✔
2927
  if (pInfo == NULL) {
2✔
2928
    tqErrorC("consumer:0x%" PRIx64 " failed to prepare seek operation", tmq->consumerId);
×
2929
    return terrno;
×
2930
  }
2931

2932
  code = tsem2_init(&pInfo->sem, 0, 0);
2✔
2933
  if (code != 0) {
2✔
2934
    taosMemoryFree(pInfo);
×
2935
    return code;
×
2936
  }
2937
  pInfo->code = 0;
2✔
2938

2939
  code = asyncCommitOffset(tmq, tname, vgId, &offsetVal, commitCallBackFn, pInfo);
2✔
2940
  if (code == 0) {
2✔
2941
    if (tsem2_wait(&pInfo->sem) != 0){
1✔
2942
      tqErrorC("failed to wait sem for sync commit offset");
×
2943
    }
2944
    code = pInfo->code;
1✔
2945
  }
2946

2947
  if (code == TSDB_CODE_TMQ_SAME_COMMITTED_VALUE) code = TSDB_CODE_SUCCESS;
2✔
2948
  if(tsem2_destroy(&pInfo->sem) != 0) {
2✔
2949
    tqErrorC("failed to destroy sem for sync commit offset");
×
2950
  }
2951
  taosMemoryFree(pInfo);
2✔
2952

2953
  tqInfoC("consumer:0x%" PRIx64 " sync send commit to vgId:%d, offset:%" PRId64 " code:%s", tmq->consumerId, vgId,
2✔
2954
          offset, tstrerror(code));
2955

2956
  return code;
2✔
2957
}
2958

2959
void tmq_commit_offset_async(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb* cb,
×
2960
                             void* param) {
2961
  int32_t code = 0;
×
2962
  if (tmq == NULL || pTopicName == NULL) {
×
2963
    tqErrorC("invalid tmq handle, null");
×
2964
    code = TSDB_CODE_INVALID_PARA;
×
2965
    goto end;
×
2966
  }
2967

2968
  int32_t accId = tmq->pTscObj->acctId;
×
2969
  char    tname[TSDB_TOPIC_FNAME_LEN] = {0};
×
2970
  (void)snprintf(tname, TSDB_TOPIC_FNAME_LEN, "%d.%s", accId, pTopicName);
×
2971

2972
  taosWLockLatch(&tmq->lock);
×
2973
  SMqClientVg* pVg = NULL;
×
2974
  code = getClientVg(tmq, tname, vgId, &pVg);
×
2975
  if (code != 0) {
×
2976
    taosWUnLockLatch(&tmq->lock);
×
2977
    goto end;
×
2978
  }
2979

2980
  SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo;
×
2981
  code = checkWalRange(pOffsetInfo, offset);
×
2982
  if (code != 0) {
×
2983
    taosWUnLockLatch(&tmq->lock);
×
2984
    goto end;
×
2985
  }
2986
  taosWUnLockLatch(&tmq->lock);
×
2987

2988
  STqOffsetVal offsetVal = {.type = TMQ_OFFSET__LOG, .version = offset};
×
2989

2990
  code = asyncCommitOffset(tmq, tname, vgId, &offsetVal, cb, param);
×
2991

2992
  tqInfoC("consumer:0x%" PRIx64 " async send commit to vgId:%d, offset:%" PRId64 " code:%s", tmq->consumerId, vgId,
×
2993
          offset, tstrerror(code));
2994

2995
  end:
×
2996
  if (code != 0 && cb != NULL) {
×
2997
    if (code == TSDB_CODE_TMQ_SAME_COMMITTED_VALUE) code = TSDB_CODE_SUCCESS;
×
2998
    cb(tmq, code, param);
×
2999
  }
3000
}
×
3001

3002

3003
int32_t tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4, SReqResultInfo** pResInfo) {
43,618✔
3004
  if (res == NULL || pResInfo == NULL) {
43,618✔
3005
    return TSDB_CODE_INVALID_PARA;
×
3006
  }
3007
  SMqRspObj*  pRspObj = (SMqRspObj*)res;
43,618✔
3008
  SMqDataRsp* data = &pRspObj->dataRsp;
43,618✔
3009

3010
  pRspObj->resIter++;
43,618✔
3011
  if (pRspObj->resIter < data->blockNum) {
43,618✔
3012
    if (data->withSchema) {
39,843✔
3013
      doFreeReqResultInfo(&pRspObj->resInfo);
39,843✔
3014
      SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(data->blockSchema, pRspObj->resIter);
39,843✔
3015
      if (pSW) {
39,843✔
3016
        TAOS_CHECK_RETURN(setResSchemaInfo(&pRspObj->resInfo, pSW->pSchema, pSW->nCols, NULL, false));
39,843✔
3017
      }
3018
    }
3019

3020
    void*   pRetrieve = taosArrayGetP(data->blockData, pRspObj->resIter);
39,843✔
3021
    void*   rawData = NULL;
39,843✔
3022
    int64_t rows = 0;
39,843✔
3023
    int32_t precision = 0;
39,843✔
3024
    tmqGetRawDataRowsPrecisionFromRes(pRetrieve, &rawData, &rows, &precision);
39,843✔
3025

3026
    pRspObj->resInfo.pData = rawData;
39,843✔
3027
    pRspObj->resInfo.numOfRows = rows;
39,843✔
3028
    pRspObj->resInfo.current = 0;
39,843✔
3029
    pRspObj->resInfo.precision = precision;
39,843✔
3030

3031
    pRspObj->resInfo.totalRows += pRspObj->resInfo.numOfRows;
39,843✔
3032
    int32_t code = setResultDataPtr(&pRspObj->resInfo, convertUcs4, false);
39,843✔
3033
    if (code != 0) {
39,843✔
3034
      return code;
×
3035
    }
3036
    *pResInfo = &pRspObj->resInfo;
39,843✔
3037
    return code;
39,843✔
3038
  }
3039

3040
  return TSDB_CODE_TSC_INTERNAL_ERROR;
3,775✔
3041
}
3042

3043
static int32_t tmqGetWalInfoCb(void* param, SDataBuf* pMsg, int32_t code) {
12✔
3044
  if (param == NULL || pMsg == NULL) {
12✔
3045
    return code;
×
3046
  }
3047
  SMqVgWalInfoParam* pParam = param;
12✔
3048
  SMqVgCommon*       pCommon = pParam->pCommon;
12✔
3049

3050
  if (code != TSDB_CODE_SUCCESS) {
12✔
3051
    tqErrorC("consumer:0x%" PRIx64 " failed to get the wal info from vgId:%d for topic:%s", pCommon->consumerId,
×
3052
             pParam->vgId, pCommon->pTopicName);
3053

3054
  } else {
3055
    SMqDataRsp rsp = {0};
12✔
3056
    SDecoder   decoder = {0};
12✔
3057
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
12✔
3058
    code = tDecodeMqDataRsp(&decoder, &rsp);
12✔
3059
    tDecoderClear(&decoder);
12✔
3060
    if (code != 0) {
12✔
3061
      goto END;
×
3062
    }
3063

3064
    SMqRspHead*          pHead = pMsg->pData;
12✔
3065
    tmq_topic_assignment assignment = {.begin = pHead->walsver,
12✔
3066
        .end = pHead->walever + 1,
12✔
3067
        .currentOffset = rsp.rspOffset.version,
12✔
3068
        .vgId = pParam->vgId};
12✔
3069

3070
    (void)taosThreadMutexLock(&pCommon->mutex);
12✔
3071
    if (taosArrayPush(pCommon->pList, &assignment) == NULL) {
24✔
3072
      tqErrorC("consumer:0x%" PRIx64 " failed to push the wal info from vgId:%d for topic:%s", pCommon->consumerId,
×
3073
               pParam->vgId, pCommon->pTopicName);
3074
      code = TSDB_CODE_TSC_INTERNAL_ERROR;
×
3075
    }
3076
    (void)taosThreadMutexUnlock(&pCommon->mutex);
12✔
3077
  }
3078

3079
  END:
12✔
3080
  pCommon->code = code;
12✔
3081
  int32_t total = atomic_add_fetch_32(&pCommon->numOfRsp, 1);
12✔
3082
  if (total == pParam->totalReq) {
12✔
3083
    if (tsem2_post(&pCommon->rsp) != 0) {
6✔
3084
      tqErrorC("failed to post semaphore in get wal cb");
×
3085
    }
3086
  }
3087

3088
  if (pMsg) {
12✔
3089
    taosMemoryFree(pMsg->pData);
12✔
3090
    taosMemoryFree(pMsg->pEpSet);
12✔
3091
  }
3092

3093
  return code;
12✔
3094
}
3095

3096
static void destroyCommonInfo(SMqVgCommon* pCommon) {
12✔
3097
  if (pCommon == NULL) {
12✔
3098
    return;
6✔
3099
  }
3100
  taosArrayDestroy(pCommon->pList);
6✔
3101
  pCommon->pList = NULL;
6✔
3102
  if(tsem2_destroy(&pCommon->rsp) != 0) {
6✔
3103
    tqErrorC("failed to destroy semaphore for topic:%s", pCommon->pTopicName);
×
3104
  }
3105
  taosMemoryFreeClear(pCommon->pTopicName);
6✔
3106
  (void)taosThreadMutexDestroy(&pCommon->mutex);
6✔
3107
  taosMemoryFree(pCommon);
6✔
3108
}
3109

3110
static bool isInSnapshotMode(int8_t type, bool useSnapshot) {
29✔
3111
  if ((type < TMQ_OFFSET__LOG && useSnapshot) || type > TMQ_OFFSET__LOG) {
29✔
3112
    return true;
×
3113
  }
3114
  return false;
29✔
3115
}
3116

3117
static int32_t tmCommittedCb(void* param, SDataBuf* pMsg, int32_t code) {
1✔
3118
  if (param == NULL) {
1✔
3119
    return code;
×
3120
  }
3121
  SMqCommittedParam* pParam = param;
1✔
3122

3123
  if (code != 0) {
1✔
3124
    goto end;
1✔
3125
  }
3126
  if (pMsg) {
×
3127
    SDecoder decoder = {0};
×
3128
    tDecoderInit(&decoder, (uint8_t*)pMsg->pData, pMsg->len);
×
3129
    int32_t err = tDecodeMqVgOffset(&decoder, &pParam->vgOffset);
×
3130
    if (err < 0) {
×
3131
      tOffsetDestroy(&pParam->vgOffset.offset);
×
3132
      code = err;
×
3133
      goto end;
×
3134
    }
3135
    tDecoderClear(&decoder);
×
3136
  }
3137

3138
  end:
×
3139
  if (pMsg) {
1✔
3140
    taosMemoryFree(pMsg->pData);
1✔
3141
    taosMemoryFree(pMsg->pEpSet);
1✔
3142
  }
3143
  pParam->code = code;
1✔
3144
  if (tsem2_post(&pParam->sem) != 0){
1✔
3145
    tqErrorC("failed to post semaphore in tmCommittedCb");
×
3146
  }
3147
  return code;
1✔
3148
}
3149

3150
int64_t getCommittedFromServer(tmq_t* tmq, char* tname, int32_t vgId, SEpSet* epSet) {
1✔
3151
  if (tmq == NULL || tname == NULL || epSet == NULL) {
1✔
3152
    return TSDB_CODE_INVALID_PARA;
×
3153
  }
3154
  int32_t     code = 0;
1✔
3155
  SMqVgOffset pOffset = {0};
1✔
3156

3157
  pOffset.consumerId = tmq->consumerId;
1✔
3158
  (void)snprintf(pOffset.offset.subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", tmq->groupId, TMQ_SEPARATOR, tname);
1✔
3159

3160
  int32_t len = 0;
1✔
3161
  tEncodeSize(tEncodeMqVgOffset, &pOffset, len, code);
1✔
3162
  if (code < 0) {
1✔
3163
    return TSDB_CODE_INVALID_PARA;
×
3164
  }
3165

3166
  void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
1✔
3167
  if (buf == NULL) {
1✔
3168
    return terrno;
×
3169
  }
3170

3171
  ((SMsgHead*)buf)->vgId = htonl(vgId);
1✔
3172

3173
  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
1✔
3174

3175
  SEncoder encoder = {0};
1✔
3176
  tEncoderInit(&encoder, abuf, len);
1✔
3177
  code = tEncodeMqVgOffset(&encoder, &pOffset);
1✔
3178
  if (code < 0) {
1✔
3179
    taosMemoryFree(buf);
×
3180
    tEncoderClear(&encoder);
×
3181
    return code;
×
3182
  }
3183
  tEncoderClear(&encoder);
1✔
3184

3185
  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
1✔
3186
  if (sendInfo == NULL) {
1✔
3187
    taosMemoryFree(buf);
×
3188
    return terrno;
×
3189
  }
3190

3191
  SMqCommittedParam* pParam = taosMemoryMalloc(sizeof(SMqCommittedParam));
1✔
3192
  if (pParam == NULL) {
1✔
3193
    taosMemoryFree(buf);
×
3194
    taosMemoryFree(sendInfo);
×
3195
    return terrno;
×
3196
  }
3197
  if (tsem2_init(&pParam->sem, 0, 0) != 0) {
1✔
3198
    taosMemoryFree(buf);
×
3199
    taosMemoryFree(sendInfo);
×
3200
    taosMemoryFree(pParam);
×
3201
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
3202
  }
3203

3204
  sendInfo->msgInfo = (SDataBuf){.pData = buf, .len = sizeof(SMsgHead) + len, .handle = NULL};
1✔
3205
  sendInfo->requestId = generateRequestId();
1✔
3206
  sendInfo->requestObjRefId = 0;
1✔
3207
  sendInfo->param = pParam;
1✔
3208
  sendInfo->fp = tmCommittedCb;
1✔
3209
  sendInfo->msgType = TDMT_VND_TMQ_VG_COMMITTEDINFO;
1✔
3210

3211
  code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, NULL, sendInfo);
1✔
3212
  if (code != 0) {
1✔
3213
    if(tsem2_destroy(&pParam->sem) != 0) {
×
3214
      tqErrorC("failed to destroy semaphore in get committed from server1");
×
3215
    }
3216
    taosMemoryFree(pParam);
×
3217
    return code;
×
3218
  }
3219

3220
  if (tsem2_wait(&pParam->sem) != 0){
1✔
3221
    tqErrorC("failed to wait semaphore in get committed from server");
×
3222
  }
3223
  code = pParam->code;
1✔
3224
  if (code == TSDB_CODE_SUCCESS) {
1✔
3225
    if (pParam->vgOffset.offset.val.type == TMQ_OFFSET__LOG) {
×
3226
      code = pParam->vgOffset.offset.val.version;
×
3227
    } else {
3228
      tOffsetDestroy(&pParam->vgOffset.offset);
×
3229
      code = TSDB_CODE_TMQ_SNAPSHOT_ERROR;
×
3230
    }
3231
  }
3232
  if(tsem2_destroy(&pParam->sem) != 0) {
1✔
3233
    tqErrorC("failed to destroy semaphore in get committed from server2");
×
3234
  }
3235
  taosMemoryFree(pParam);
1✔
3236

3237
  return code;
1✔
3238
}
3239

3240
int64_t tmq_position(tmq_t* tmq, const char* pTopicName, int32_t vgId) {
×
3241
  if (tmq == NULL || pTopicName == NULL) {
×
3242
    tqErrorC("invalid tmq handle, null");
×
3243
    return TSDB_CODE_INVALID_PARA;
×
3244
  }
3245

3246
  int32_t accId = tmq->pTscObj->acctId;
×
3247
  char    tname[TSDB_TOPIC_FNAME_LEN] = {0};
×
3248
  (void)snprintf(tname, TSDB_TOPIC_FNAME_LEN, "%d.%s", accId, pTopicName);
×
3249

3250
  taosWLockLatch(&tmq->lock);
×
3251

3252
  SMqClientVg* pVg = NULL;
×
3253
  int32_t      code = getClientVg(tmq, tname, vgId, &pVg);
×
3254
  if (code != 0) {
×
3255
    taosWUnLockLatch(&tmq->lock);
×
3256
    return code;
×
3257
  }
3258

3259
  SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo;
×
3260
  int32_t        type = pOffsetInfo->endOffset.type;
×
3261
  if (isInSnapshotMode(type, tmq->useSnapshot)) {
×
3262
    tqErrorC("consumer:0x%" PRIx64 " offset type:%d not wal version, position error", tmq->consumerId, type);
×
3263
    taosWUnLockLatch(&tmq->lock);
×
3264
    return TSDB_CODE_TMQ_SNAPSHOT_ERROR;
×
3265
  }
3266

3267
  code = checkWalRange(pOffsetInfo, -1);
×
3268
  if (code != 0) {
×
3269
    taosWUnLockLatch(&tmq->lock);
×
3270
    return code;
×
3271
  }
3272
  SEpSet  epSet = pVg->epSet;
×
3273
  int64_t begin = pVg->offsetInfo.walVerBegin;
×
3274
  int64_t end = pVg->offsetInfo.walVerEnd;
×
3275
  taosWUnLockLatch(&tmq->lock);
×
3276

3277
  int64_t position = 0;
×
3278
  if (type == TMQ_OFFSET__LOG) {
×
3279
    position = pOffsetInfo->endOffset.version;
×
3280
  } else if (type == TMQ_OFFSET__RESET_EARLIEST || type == TMQ_OFFSET__RESET_LATEST) {
×
3281
    code = getCommittedFromServer(tmq, tname, vgId, &epSet);
×
3282
    if (code == TSDB_CODE_TMQ_NO_COMMITTED) {
×
3283
      if (type == TMQ_OFFSET__RESET_EARLIEST) {
×
3284
        position = begin;
×
3285
      } else if (type == TMQ_OFFSET__RESET_LATEST) {
×
3286
        position = end;
×
3287
      }
3288
    } else {
3289
      position = code;
×
3290
    }
3291
  } else {
3292
    tqErrorC("consumer:0x%" PRIx64 " offset type:%d can not be reach here", tmq->consumerId, type);
×
3293
  }
3294

3295
  tqInfoC("consumer:0x%" PRIx64 " tmq_position vgId:%d position:%" PRId64, tmq->consumerId, vgId, position);
×
3296
  return position;
×
3297
}
3298

3299
int64_t tmq_committed(tmq_t* tmq, const char* pTopicName, int32_t vgId) {
1✔
3300
  if (tmq == NULL || pTopicName == NULL) {
1✔
3301
    tqErrorC("invalid tmq handle, null");
×
3302
    return TSDB_CODE_INVALID_PARA;
×
3303
  }
3304

3305
  int32_t accId = tmq->pTscObj->acctId;
1✔
3306
  char    tname[TSDB_TOPIC_FNAME_LEN] = {0};
1✔
3307
  (void)snprintf(tname, TSDB_TOPIC_FNAME_LEN, "%d.%s", accId, pTopicName);
1✔
3308

3309
  taosWLockLatch(&tmq->lock);
1✔
3310

3311
  SMqClientVg* pVg = NULL;
1✔
3312
  int32_t      code = getClientVg(tmq, tname, vgId, &pVg);
1✔
3313
  if (code != 0) {
1✔
3314
    taosWUnLockLatch(&tmq->lock);
×
3315
    return code;
×
3316
  }
3317

3318
  SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo;
1✔
3319
  if (isInSnapshotMode(pOffsetInfo->endOffset.type, tmq->useSnapshot)) {
1✔
3320
    tqErrorC("consumer:0x%" PRIx64 " offset type:%d not wal version, committed error", tmq->consumerId,
×
3321
             pOffsetInfo->endOffset.type);
3322
    taosWUnLockLatch(&tmq->lock);
×
3323
    return TSDB_CODE_TMQ_SNAPSHOT_ERROR;
×
3324
  }
3325

3326
  if (isInSnapshotMode(pOffsetInfo->committedOffset.type, tmq->useSnapshot)) {
1✔
3327
    tqErrorC("consumer:0x%" PRIx64 " offset type:%d not wal version, committed error", tmq->consumerId,
×
3328
             pOffsetInfo->committedOffset.type);
3329
    taosWUnLockLatch(&tmq->lock);
×
3330
    return TSDB_CODE_TMQ_SNAPSHOT_ERROR;
×
3331
  }
3332

3333
  int64_t committed = 0;
1✔
3334
  if (pOffsetInfo->committedOffset.type == TMQ_OFFSET__LOG) {
1✔
3335
    committed = pOffsetInfo->committedOffset.version;
×
3336
    taosWUnLockLatch(&tmq->lock);
×
3337
    goto end;
×
3338
  }
3339
  SEpSet epSet = pVg->epSet;
1✔
3340
  taosWUnLockLatch(&tmq->lock);
1✔
3341

3342
  committed = getCommittedFromServer(tmq, tname, vgId, &epSet);
1✔
3343

3344
  end:
1✔
3345
  tqInfoC("consumer:0x%" PRIx64 " tmq_committed vgId:%d committed:%" PRId64, tmq->consumerId, vgId, committed);
1✔
3346
  return committed;
1✔
3347
}
3348

3349
int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_assignment** assignment,
12✔
3350
                                 int32_t* numOfAssignment) {
3351
  if (tmq == NULL || pTopicName == NULL || assignment == NULL || numOfAssignment == NULL) {
12✔
3352
    tqErrorC("invalid tmq handle, null");
×
3353
    return TSDB_CODE_INVALID_PARA;
×
3354
  }
3355
  *numOfAssignment = 0;
12✔
3356
  *assignment = NULL;
12✔
3357
  SMqVgCommon* pCommon = NULL;
12✔
3358

3359
  int32_t accId = tmq->pTscObj->acctId;
12✔
3360
  char    tname[TSDB_TOPIC_FNAME_LEN] = {0};
12✔
3361
  (void)snprintf(tname, TSDB_TOPIC_FNAME_LEN, "%d.%s", accId, pTopicName);
12✔
3362

3363
  taosWLockLatch(&tmq->lock);
12✔
3364

3365
  SMqClientTopic* pTopic = NULL;
12✔
3366
  int32_t         code = getTopicByName(tmq, tname, &pTopic);
12✔
3367
  if (code != 0) {
12✔
3368
    tqErrorC("consumer:0x%" PRIx64 " invalid topic name:%s", tmq->consumerId, pTopicName);
3✔
3369
    goto end;
3✔
3370
  }
3371

3372
  // in case of snapshot is opened, no valid offset will return
3373
  *numOfAssignment = taosArrayGetSize(pTopic->vgs);
9✔
3374
  for (int32_t j = 0; j < (*numOfAssignment); ++j) {
26✔
3375
    SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j);
17✔
3376
    if (pClientVg == NULL) {
17✔
3377
      continue;
×
3378
    }
3379
    int32_t type = pClientVg->offsetInfo.beginOffset.type;
17✔
3380
    if (isInSnapshotMode(type, tmq->useSnapshot)) {
17✔
3381
      tqErrorC("consumer:0x%" PRIx64 " offset type:%d not wal version, assignment not allowed", tmq->consumerId, type);
×
3382
      code = TSDB_CODE_TMQ_SNAPSHOT_ERROR;
×
3383
      goto end;
×
3384
    }
3385
  }
3386

3387
  *assignment = taosMemoryCalloc(*numOfAssignment, sizeof(tmq_topic_assignment));
9✔
3388
  if (*assignment == NULL) {
9✔
3389
    tqErrorC("consumer:0x%" PRIx64 " failed to malloc buffer, size:%" PRIzu, tmq->consumerId,
×
3390
             (*numOfAssignment) * sizeof(tmq_topic_assignment));
3391
    code = terrno;
×
3392
    goto end;
×
3393
  }
3394

3395
  bool needFetch = false;
9✔
3396

3397
  for (int32_t j = 0; j < (*numOfAssignment); ++j) {
14✔
3398
    SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j);
11✔
3399
    if (pClientVg == NULL) {
11✔
3400
      continue;
×
3401
    }
3402
    if (pClientVg->offsetInfo.beginOffset.type != TMQ_OFFSET__LOG) {
11✔
3403
      needFetch = true;
6✔
3404
      break;
6✔
3405
    }
3406

3407
    tmq_topic_assignment* pAssignment = &(*assignment)[j];
5✔
3408
    pAssignment->currentOffset = pClientVg->offsetInfo.beginOffset.version;
5✔
3409
    pAssignment->begin = pClientVg->offsetInfo.walVerBegin;
5✔
3410
    pAssignment->end = pClientVg->offsetInfo.walVerEnd;
5✔
3411
    pAssignment->vgId = pClientVg->vgId;
5✔
3412
    tqInfoC("consumer:0x%" PRIx64 " get assignment from local:%d->%" PRId64, tmq->consumerId, pAssignment->vgId,
5✔
3413
            pAssignment->currentOffset);
3414
  }
3415

3416
  if (needFetch) {
9✔
3417
    pCommon = taosMemoryCalloc(1, sizeof(SMqVgCommon));
6✔
3418
    if (pCommon == NULL) {
6✔
3419
      code = terrno;
×
3420
      goto end;
×
3421
    }
3422

3423
    pCommon->pList = taosArrayInit(4, sizeof(tmq_topic_assignment));
6✔
3424
    if (pCommon->pList == NULL) {
6✔
3425
      code = terrno;
×
3426
      goto end;
×
3427
    }
3428

3429
    code = tsem2_init(&pCommon->rsp, 0, 0);
6✔
3430
    if (code != 0) {
6✔
3431
      goto end;
×
3432
    }
3433
    (void)taosThreadMutexInit(&pCommon->mutex, 0);
6✔
3434
    pCommon->pTopicName = taosStrdup(pTopic->topicName);
6✔
3435
    if (pCommon->pTopicName == NULL) {
6✔
3436
      code = terrno;
×
3437
      goto end;
×
3438
    }
3439
    pCommon->consumerId = tmq->consumerId;
6✔
3440
    for (int32_t i = 0; i < (*numOfAssignment); ++i) {
18✔
3441
      SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i);
12✔
3442
      if (pClientVg == NULL) {
12✔
3443
        continue;
×
3444
      }
3445
      SMqVgWalInfoParam* pParam = taosMemoryMalloc(sizeof(SMqVgWalInfoParam));
12✔
3446
      if (pParam == NULL) {
12✔
3447
        code = terrno;
×
3448
        goto end;
×
3449
      }
3450

3451
      pParam->epoch = tmq->epoch;
12✔
3452
      pParam->vgId = pClientVg->vgId;
12✔
3453
      pParam->totalReq = *numOfAssignment;
12✔
3454
      pParam->pCommon = pCommon;
12✔
3455

3456
      SMqPollReq req = {0};
12✔
3457
      tmqBuildConsumeReqImpl(&req, tmq, pTopic, pClientVg);
12✔
3458
      req.reqOffset = pClientVg->offsetInfo.beginOffset;
12✔
3459

3460
      int32_t msgSize = tSerializeSMqPollReq(NULL, 0, &req);
12✔
3461
      if (msgSize < 0) {
12✔
3462
        taosMemoryFree(pParam);
×
3463
        code = msgSize;
×
3464
        goto end;
×
3465
      }
3466

3467
      char* msg = taosMemoryCalloc(1, msgSize);
12✔
3468
      if (NULL == msg) {
12✔
3469
        taosMemoryFree(pParam);
×
3470
        code = terrno;
×
3471
        goto end;
×
3472
      }
3473

3474
      msgSize = tSerializeSMqPollReq(msg, msgSize, &req);
12✔
3475
      if (msgSize < 0) {
12✔
3476
        taosMemoryFree(msg);
×
3477
        taosMemoryFree(pParam);
×
3478
        code = msgSize;
×
3479
        goto end;
×
3480
      }
3481

3482
      SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
12✔
3483
      if (sendInfo == NULL) {
12✔
3484
        taosMemoryFree(pParam);
×
3485
        taosMemoryFree(msg);
×
3486
        code = terrno;
×
3487
        goto end;
×
3488
      }
3489

3490
      sendInfo->msgInfo = (SDataBuf){.pData = msg, .len = msgSize, .handle = NULL};
12✔
3491
      sendInfo->requestId = req.reqId;
12✔
3492
      sendInfo->requestObjRefId = 0;
12✔
3493
      sendInfo->param = pParam;
12✔
3494
      sendInfo->paramFreeFp = taosAutoMemoryFree;
12✔
3495
      sendInfo->fp = tmqGetWalInfoCb;
12✔
3496
      sendInfo->msgType = TDMT_VND_TMQ_VG_WALINFO;
12✔
3497

3498
      // int64_t transporterId = 0;
3499
      char offsetFormatBuf[TSDB_OFFSET_LEN] = {0};
12✔
3500
      tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pClientVg->offsetInfo.beginOffset);
12✔
3501

3502
      tqInfoC("consumer:0x%" PRIx64 " %s retrieve wal info vgId:%d, epoch %d, req:%s, QID:0x%" PRIx64, tmq->consumerId,
12✔
3503
              pTopic->topicName, pClientVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId);
3504
      code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pClientVg->epSet, NULL, sendInfo);
12✔
3505
      if (code != 0) {
12✔
3506
        goto end;
×
3507
      }
3508
    }
3509

3510
    if (tsem2_wait(&pCommon->rsp) != 0){
6✔
3511
      tqErrorC("consumer:0x%" PRIx64 " failed to wait sem in get assignment", tmq->consumerId);
×
3512
    }
3513
    code = pCommon->code;
6✔
3514

3515
    if (code != TSDB_CODE_SUCCESS) {
6✔
3516
      goto end;
×
3517
    }
3518
    int32_t num = taosArrayGetSize(pCommon->pList);
6✔
3519
    for (int32_t i = 0; i < num; ++i) {
18✔
3520
      (*assignment)[i] = *(tmq_topic_assignment*)taosArrayGet(pCommon->pList, i);
12✔
3521
    }
3522
    *numOfAssignment = num;
6✔
3523

3524
    for (int32_t j = 0; j < (*numOfAssignment); ++j) {
18✔
3525
      tmq_topic_assignment* p = &(*assignment)[j];
12✔
3526

3527
      for (int32_t i = 0; i < taosArrayGetSize(pTopic->vgs); ++i) {
36✔
3528
        SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i);
24✔
3529
        if (pClientVg == NULL) {
24✔
3530
          continue;
×
3531
        }
3532
        if (pClientVg->vgId != p->vgId) {
24✔
3533
          continue;
12✔
3534
        }
3535

3536
        SVgOffsetInfo* pOffsetInfo = &pClientVg->offsetInfo;
12✔
3537
        tqInfoC("consumer:0x%" PRIx64 " %s vgId:%d offset is update to:%" PRId64, tmq->consumerId, pTopic->topicName,
12✔
3538
                p->vgId, p->currentOffset);
3539

3540
        pOffsetInfo->walVerBegin = p->begin;
12✔
3541
        pOffsetInfo->walVerEnd = p->end;
12✔
3542
      }
3543
    }
3544
  }
3545

3546
  end:
9✔
3547
  if (code != TSDB_CODE_SUCCESS) {
12✔
3548
    taosMemoryFree(*assignment);
3✔
3549
    *assignment = NULL;
3✔
3550
    *numOfAssignment = 0;
3✔
3551
  }
3552
  destroyCommonInfo(pCommon);
12✔
3553
  taosWUnLockLatch(&tmq->lock);
12✔
3554
  return code;
12✔
3555
}
3556

3557
void tmq_free_assignment(tmq_topic_assignment* pAssignment) {
7✔
3558
  if (pAssignment == NULL) {
7✔
3559
    return;
2✔
3560
  }
3561

3562
  taosMemoryFree(pAssignment);
5✔
3563
}
3564

3565
static int32_t tmqSeekCb(void* param, SDataBuf* pMsg, int32_t code) {
10✔
3566
  if (pMsg) {
10✔
3567
    taosMemoryFree(pMsg->pData);
10✔
3568
    taosMemoryFree(pMsg->pEpSet);
10✔
3569
  }
3570
  if (param == NULL) {
10✔
3571
    return code;
×
3572
  }
3573
  SMqSeekParam* pParam = param;
10✔
3574
  pParam->code = code;
10✔
3575
  if (tsem2_post(&pParam->sem) != 0){
10✔
3576
    tqErrorC("failed to post sem in tmqSeekCb");
×
3577
  }
3578
  return 0;
10✔
3579
}
3580

3581
// seek interface have to send msg to server to cancel push handle if needed, because consumer may be in wait status if
3582
// there is no data to poll
3583
int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_t offset) {
10✔
3584
  if (tmq == NULL || pTopicName == NULL) {
10✔
3585
    tqErrorC("invalid tmq handle, null");
×
3586
    return TSDB_CODE_INVALID_PARA;
×
3587
  }
3588

3589
  int32_t accId = tmq->pTscObj->acctId;
10✔
3590
  char    tname[TSDB_TOPIC_FNAME_LEN] = {0};
10✔
3591
  (void)snprintf(tname, TSDB_TOPIC_FNAME_LEN, "%d.%s", accId, pTopicName);
10✔
3592

3593
  taosWLockLatch(&tmq->lock);
10✔
3594

3595
  SMqClientVg* pVg = NULL;
10✔
3596
  int32_t      code = getClientVg(tmq, tname, vgId, &pVg);
10✔
3597
  if (code != 0) {
10✔
3598
    taosWUnLockLatch(&tmq->lock);
×
3599
    return code;
×
3600
  }
3601

3602
  SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo;
10✔
3603

3604
  int32_t type = pOffsetInfo->endOffset.type;
10✔
3605
  if (isInSnapshotMode(type, tmq->useSnapshot)) {
10✔
3606
    tqErrorC("consumer:0x%" PRIx64 " offset type:%d not wal version, seek not allowed", tmq->consumerId, type);
×
3607
    taosWUnLockLatch(&tmq->lock);
×
3608
    return TSDB_CODE_TMQ_SNAPSHOT_ERROR;
×
3609
  }
3610

3611
  code = checkWalRange(pOffsetInfo, offset);
10✔
3612
  if (code != 0) {
10✔
3613
    taosWUnLockLatch(&tmq->lock);
×
3614
    return code;
×
3615
  }
3616

3617
  tqInfoC("consumer:0x%" PRIx64 " seek to %" PRId64 " on vgId:%d", tmq->consumerId, offset, vgId);
10✔
3618
  // update the offset, and then commit to vnode
3619
  pOffsetInfo->endOffset.type = TMQ_OFFSET__LOG;
10✔
3620
  pOffsetInfo->endOffset.version = offset;
10✔
3621
  pOffsetInfo->beginOffset = pOffsetInfo->endOffset;
10✔
3622
  pVg->seekUpdated = true;
10✔
3623
  SEpSet epSet = pVg->epSet;
10✔
3624
  taosWUnLockLatch(&tmq->lock);
10✔
3625

3626
  SMqSeekReq req = {0};
10✔
3627
  (void)snprintf(req.subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s:%s", tmq->groupId, tname);
10✔
3628
  req.head.vgId = vgId;
10✔
3629
  req.consumerId = tmq->consumerId;
10✔
3630

3631
  int32_t msgSize = tSerializeSMqSeekReq(NULL, 0, &req);
10✔
3632
  if (msgSize < 0) {
10✔
3633
    return TSDB_CODE_PAR_INTERNAL_ERROR;
×
3634
  }
3635

3636
  char* msg = taosMemoryCalloc(1, msgSize);
10✔
3637
  if (NULL == msg) {
10✔
3638
    return terrno;
×
3639
  }
3640

3641
  if (tSerializeSMqSeekReq(msg, msgSize, &req) < 0) {
10✔
3642
    taosMemoryFree(msg);
×
3643
    return TSDB_CODE_PAR_INTERNAL_ERROR;
×
3644
  }
3645

3646
  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
10✔
3647
  if (sendInfo == NULL) {
10✔
3648
    taosMemoryFree(msg);
×
3649
    return terrno;
×
3650
  }
3651

3652
  SMqSeekParam* pParam = taosMemoryMalloc(sizeof(SMqSeekParam));
10✔
3653
  if (pParam == NULL) {
10✔
3654
    taosMemoryFree(msg);
×
3655
    taosMemoryFree(sendInfo);
×
3656
    return terrno;
×
3657
  }
3658
  if (tsem2_init(&pParam->sem, 0, 0) != 0) {
10✔
3659
    taosMemoryFree(msg);
×
3660
    taosMemoryFree(sendInfo);
×
3661
    taosMemoryFree(pParam);
×
3662
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
3663
  }
3664

3665
  sendInfo->msgInfo = (SDataBuf){.pData = msg, .len = msgSize, .handle = NULL};
10✔
3666
  sendInfo->requestId = generateRequestId();
10✔
3667
  sendInfo->requestObjRefId = 0;
10✔
3668
  sendInfo->param = pParam;
10✔
3669
  sendInfo->fp = tmqSeekCb;
10✔
3670
  sendInfo->msgType = TDMT_VND_TMQ_SEEK;
10✔
3671

3672
  code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo);
10✔
3673
  if (code != 0) {
10✔
3674
    if(tsem2_destroy(&pParam->sem) != 0) {
×
3675
      tqErrorC("consumer:0x%" PRIx64 "destroy rsp sem failed in seek offset", tmq->consumerId);
×
3676
    }
3677
    taosMemoryFree(pParam);
×
3678
    return code;
×
3679
  }
3680

3681
  if (tsem2_wait(&pParam->sem) != 0){
10✔
3682
    tqErrorC("consumer:0x%" PRIx64 "wait rsp sem failed in seek offset", tmq->consumerId);
×
3683
  }
3684
  code = pParam->code;
10✔
3685
  if(tsem2_destroy(&pParam->sem) != 0) {
10✔
3686
    tqErrorC("consumer:0x%" PRIx64 "destroy rsp sem failed in seek offset", tmq->consumerId);
×
3687
  }
3688
  taosMemoryFree(pParam);
10✔
3689

3690
  tqInfoC("consumer:0x%" PRIx64 "send seek to vgId:%d, return code:%s", tmq->consumerId, vgId, tstrerror(code));
10✔
3691

3692
  return code;
10✔
3693
}
3694

3695
TAOS* tmq_get_connect(tmq_t* tmq) {
×
3696
  if (tmq && tmq->pTscObj) {
×
3697
    return (TAOS*)(&(tmq->pTscObj->id));
×
3698
  }
3699
  return NULL;
×
3700
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc