• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4928

15 Jan 2026 04:09PM UTC coverage: 66.708% (+0.6%) from 66.12%
#4928

push

travis-ci

web-flow
merge: from main to 3.0 branch #34317

840 of 1255 new or added lines in 23 files covered. (66.93%)

826 existing lines in 119 files now uncovered.

203035 of 304362 relevant lines covered (66.71%)

130886757.17 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.79
/source/client/src/clientHb.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "catalog.h"
17
#include "clientInt.h"
18
#include "clientLog.h"
19
#include "clientMonitor.h"
20
#include "clientSession.h"
21
#include "scheduler.h"
22
#include "tglobal.h"
23
#include "trpc.h"
24

25
typedef struct {
26
  union {
27
    struct {
28
      SAppHbMgr *pAppHbMgr;
29
      int64_t    clusterId;
30
      int32_t    reqCnt;
31
      int8_t     connHbFlag;
32
    };
33
  };
34
} SHbParam;
35

36
SClientHbMgr clientHbMgr = {0};
37

38
static int32_t hbCreateThread();
39
static void    hbStopThread();
40
static int32_t hbUpdateUserSessMertric(const char *user, SUserSessCfg *pCfg);
41
static int32_t hbUpdateUserAuthInfo(SAppHbMgr *pAppHbMgr, SUserAuthBatchRsp *batchRsp);
42

43
static int32_t hbProcessUserAuthInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog,
22,482,897✔
44
                                        SAppHbMgr *pAppHbMgr) {
45
  int32_t code = TSDB_CODE_SUCCESS;
22,482,897✔
46

47
  SUserAuthBatchRsp batchRsp = {0};
22,482,897✔
48
  if (tDeserializeSUserAuthBatchRsp(value, valueLen, &batchRsp) != 0) {
22,482,897✔
49
    return TSDB_CODE_INVALID_MSG;
×
50
  }
51

52
  int32_t numOfBatchs = taosArrayGetSize(batchRsp.pArray);
22,482,897✔
53
  for (int32_t i = 0; i < numOfBatchs; ++i) {
45,194,381✔
54
    SGetUserAuthRsp *rsp = taosArrayGet(batchRsp.pArray, i);
22,711,484✔
55
    if (NULL == rsp) {
22,711,484✔
56
      code = terrno;
×
57
      goto _return;
×
58
    }
59
    tscDebug("hb to update user auth, user:%s, version:%d", rsp->user, rsp->version);
22,711,484✔
60

61
    TSC_ERR_JRET(catalogUpdateUserAuthInfo(pCatalog, rsp));
22,711,484✔
62

63
  }
64

65
  if (numOfBatchs > 0) {
22,482,897✔
66
    TSC_ERR_JRET(hbUpdateUserAuthInfo(pAppHbMgr, &batchRsp));
22,482,897✔
67
  }
68

69
  (void)atomic_val_compare_exchange_8(&pAppHbMgr->connHbFlag, 1, 2);
22,482,897✔
70

71
_return:
22,482,897✔
72
  taosArrayDestroy(batchRsp.pArray);
22,482,897✔
73
  return code;
22,482,897✔
74
}
75

76
/**
 * Push the per-user session limits carried by a heartbeat response into the
 * session manager.
 *
 * Only non-zero fields of pCfg are forwarded to sessMgtUpdataLimit(); a NULL
 * argument or an all-zero config is a no-op that reports success.
 *
 * @param user  user name the limits apply to
 * @param pCfg  session limits received from the server
 * @return 0 on success, otherwise the first error from sessMgtUpdataLimit().
 */
static int32_t hbUpdateUserSessMertric(const char *user, SUserSessCfg *pCfg) {
  int32_t code = 0;
  int32_t lino = 0;

  if (NULL == user || NULL == pCfg) {
    return code;
  }

  // Nothing to apply when every limit is zero.
  SUserSessCfg emptyCfg = {0, 0, 0, 0, 0};
  if (0 == memcmp(pCfg, &emptyCfg, sizeof(SUserSessCfg))) {
    return TSDB_CODE_SUCCESS;
  }

  tscInfo(
      "update session metric for user:%s, sessPerUser:%d, sessConnTime:%d, sessConnIdleTime:%d, sessMaxConcurrency:%d, "
      "sessMaxCallVnodeNum:%d",
      user, pCfg->sessPerUser, pCfg->sessConnTime, pCfg->sessConnIdleTime, pCfg->sessMaxConcurrency,
      pCfg->sessMaxCallVnodeNum);

  // Each limit is applied independently; the first failure aborts the rest.
  if (0 != pCfg->sessPerUser) {
    code = sessMgtUpdataLimit((char *)user, SESSION_PER_USER, pCfg->sessPerUser);
    TAOS_CHECK_GOTO(code, &lino, _error);
  }

  if (0 != pCfg->sessConnTime) {
    code = sessMgtUpdataLimit((char *)user, SESSION_CONN_TIME, pCfg->sessConnTime);
    TAOS_CHECK_GOTO(code, &lino, _error);
  }

  if (0 != pCfg->sessConnIdleTime) {
    code = sessMgtUpdataLimit((char *)user, SESSION_CONN_IDLE_TIME, pCfg->sessConnIdleTime);
    TAOS_CHECK_GOTO(code, &lino, _error);
  }

  if (0 != pCfg->sessMaxConcurrency) {
    code = sessMgtUpdataLimit((char *)user, SESSION_MAX_CONCURRENCY, pCfg->sessMaxConcurrency);
    TAOS_CHECK_GOTO(code, &lino, _error);
  }

  if (0 != pCfg->sessMaxCallVnodeNum) {
    code = sessMgtUpdataLimit((char *)user, SESSION_MAX_CALL_VNODE_NUM, pCfg->sessMaxCallVnodeNum);
    TAOS_CHECK_GOTO(code, &lino, _error);
  }

_error:
  return code;
}
121
/**
 * Apply a decoded user-auth batch to the live connections of the same cluster.
 *
 * Walks every heartbeat manager whose clusterId matches pAppHbMgr and, for
 * each active connection that can still be acquired:
 *   - fires the user-dropped callback once if the user was dropped;
 *   - for token logins, classifies the token as modified / dropped / disabled /
 *     expired and notifies through tokenNotifyInfo;
 *   - refreshes authVer, session limits and sysInfo;
 *   - bumps the password, IP white list and date-time white list versions
 *     (monotonically, via CAS) and fires the matching callbacks.
 *
 * The matching SGetUserAuthRsp is looked up once per heartbeat manager, using
 * the user of the first acquired connection.
 * NOTE(review): this presumes all connections of one manager share a user - confirm.
 *
 * @param pAppHbMgr heartbeat manager the response arrived on
 * @param batchRsp  decoded batch of per-user auth responses
 * @return always 0.
 */
static int32_t hbUpdateUserAuthInfo(SAppHbMgr *pAppHbMgr, SUserAuthBatchRsp *batchRsp) {
  int64_t clusterId = pAppHbMgr->pAppInstInfo->clusterId;
  for (int i = 0; i < TARRAY_SIZE(clientHbMgr.appHbMgrs); ++i) {
    SAppHbMgr *hbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i);
    if (!hbMgr || hbMgr->pAppInstInfo->clusterId != clusterId) {
      continue;
    }

    SClientHbReq    *pReq = NULL;
    SGetUserAuthRsp *pRsp = NULL;
    while ((pReq = taosHashIterate(hbMgr->activeInfo, pReq))) {
      STscObj *pTscObj = (STscObj *)acquireTscObj(pReq->connKey.tscRid);
      if (!pTscObj) {
        continue;
      }

      if (!pRsp) {
        for (int32_t j = 0; j < TARRAY_SIZE(batchRsp->pArray); ++j) {
          SGetUserAuthRsp *rsp = TARRAY_GET_ELEM(batchRsp->pArray, j);
          if (0 == strncmp(rsp->user, pTscObj->user, TSDB_USER_LEN)) {
            pRsp = rsp;
            break;
          }
        }
        if (!pRsp) {
          // The batch has no entry for this user: stop scanning this manager.
          releaseTscObj(pReq->connKey.tscRid);
          taosHashCancelIterate(hbMgr->activeInfo, pReq);
          break;
        }
      }

      if (pRsp->dropped == 1) {
        // The CAS guarantees the drop notification fires once per connection.
        if (atomic_val_compare_exchange_8(&pTscObj->dropped, 0, 1) == 0) {
          SPassInfo *dropInfo = &pTscObj->userDroppedInfo;
          if (dropInfo->fp) {
            (*dropInfo->fp)(dropInfo->param, NULL, TAOS_NOTIFY_USER_DROPPED);
          }
        }
        releaseTscObj(pReq->connKey.tscRid);
        continue;
      }

      // update token status
      if (pTscObj->tokenName[0] != 0) {
        STokenStatus *status = NULL;
        if (pRsp->tokens != NULL) {
          status = taosHashGet(pRsp->tokens, pTscObj->tokenName, strlen(pTscObj->tokenName) + 1);
        }

        // `status` is NULL when the token is absent from the response, so its
        // fields must only be read after the NULL check. expireTime stays 0
        // for the dropped case.
        STokenEvent event = {.type = TSDB_TOKEN_EVENT_MODIFIED};
        tstrncpy(event.tokenName, pTscObj->tokenName, sizeof(event.tokenName));
        if (status == NULL) {
          event.type = TSDB_TOKEN_EVENT_DROPPED;
        } else {
          event.expireTime = status->expireTime;
          if (status->enabled == 0) {
            event.type = TSDB_TOKEN_EVENT_DISABLED;
          } else if (status->expireTime > 0 && status->expireTime < taosGetTimestampSec()) {
            event.type = TSDB_TOKEN_EVENT_EXPIRED;
          }
        }

        STokenNotifyInfo *tni = &pTscObj->tokenNotifyInfo;
        if (event.type == TSDB_TOKEN_EVENT_MODIFIED) {
          // status is non-NULL here: every NULL status was classified as DROPPED.
          int32_t oldExpireTime;
          do {
            oldExpireTime = atomic_load_32(&pTscObj->tokenExpireTime);
            if (oldExpireTime == status->expireTime) {
              break;
            }
          } while (atomic_val_compare_exchange_32(&pTscObj->tokenExpireTime, oldExpireTime, status->expireTime) !=
                   oldExpireTime);

          if (oldExpireTime != status->expireTime && tni->fp) {
            (*tni->fp)(tni->param, &event, TAOS_NOTIFY_TOKEN);
          }
          // Log the value captured before the swap; reading tokenExpireTime
          // again would print the new value twice.
          tscDebug("update token %s of user %s: expire time from %d to %d, conn:%" PRIi64, pTscObj->tokenName,
                   pRsp->user, oldExpireTime, status->expireTime, pTscObj->id);
        } else {
          // Token is unusable: mark the connection dropped and notify once.
          if (atomic_val_compare_exchange_8(&pTscObj->dropped, 0, 1) == 0) {
            if (tni->fp) {
              (*tni->fp)(tni->param, &event, TAOS_NOTIFY_TOKEN);
            }
          }

          releaseTscObj(pReq->connKey.tscRid);
          continue;
        }
      }

      pTscObj->authVer = pRsp->version;
      if (hbUpdateUserSessMertric(pTscObj->user, &pRsp->sessCfg) != 0) {
        tscError("failed to update user session metric, user:%s", pTscObj->user);
      }

      if (pTscObj->sysInfo != pRsp->sysInfo) {
        tscDebug("update sysInfo of user %s from %" PRIi8 " to %" PRIi8 ", conn:%" PRIi64, pRsp->user,
                 pTscObj->sysInfo, pRsp->sysInfo, pTscObj->id);
        pTscObj->sysInfo = pRsp->sysInfo;
      }

      // update password version
      if (pTscObj->passInfo.fp) {
        SPassInfo *passInfo = &pTscObj->passInfo;
        int32_t    oldVer = 0;
        do {
          oldVer = atomic_load_32(&passInfo->ver);
          if (oldVer >= pRsp->passVer) {
            break;
          }
        } while (atomic_val_compare_exchange_32(&passInfo->ver, oldVer, pRsp->passVer) != oldVer);
        if (oldVer < pRsp->passVer) {
          (*passInfo->fp)(passInfo->param, &pRsp->passVer, TAOS_NOTIFY_PASSVER);
          tscDebug("update passVer of user %s from %d to %d, conn:%" PRIi64, pRsp->user, oldVer,
                   atomic_load_32(&passInfo->ver), pTscObj->id);
        }
      }

      // update ip white list version
      {
        SWhiteListInfo *whiteListInfo = &pTscObj->whiteListInfo;
        int64_t         oldVer = 0, newVer = pRsp->whiteListVer;
        do {
          oldVer = atomic_load_64(&whiteListInfo->ver);
          if (oldVer >= newVer) {
            break;
          }
        } while (atomic_val_compare_exchange_64(&whiteListInfo->ver, oldVer, newVer) != oldVer);

        if (oldVer < newVer) {
          if (whiteListInfo->fp) {
            (*whiteListInfo->fp)(whiteListInfo->param, &newVer, TAOS_NOTIFY_WHITELIST_VER);
          }
          tscDebug("update whitelist version of user %s from %" PRId64 " to %" PRId64 ", conn:%" PRIi64, pRsp->user,
                   oldVer, newVer, pTscObj->id);
        }
      }

      // update date time whitelist version
      {
        SWhiteListInfo *whiteListInfo = &pTscObj->dateTimeWhiteListInfo;

        int64_t oldVer = 0, newVer = pRsp->timeWhiteListVer;
        do {
          oldVer = atomic_load_64(&whiteListInfo->ver);
          if (oldVer >= newVer) {
            break;
          }
        } while (atomic_val_compare_exchange_64(&whiteListInfo->ver, oldVer, newVer) != oldVer);

        if (oldVer < newVer) {
          if (whiteListInfo->fp) {
            (*whiteListInfo->fp)(whiteListInfo->param, &newVer, TAOS_NOTIFY_DATETIME_WHITELIST_VER);
          }
          tscDebug("update date time whitelist version of user %s from %" PRId64 " to %" PRId64 ", conn:%" PRIi64, pRsp->user,
                   oldVer, newVer, pTscObj->id);
        }
      }

      releaseTscObj(pReq->connKey.tscRid);
    }
  }
  return 0;
}
284

285
/**
 * Build a heap-allocated SDBVgInfo from a use-db response.
 *
 * Copies the version/hash settings and inserts every vgroup of the response
 * into a fresh hash keyed by vgId.
 *
 * @param pInfo  out: the new SDBVgInfo on success, NULL on failure
 * @param rsp    use-db response to convert
 * @return 0 on success, otherwise an error code; nothing is left allocated on
 *         failure.
 */
static int32_t hbGenerateVgInfoFromRsp(SDBVgInfo **pInfo, SUseDbRsp *rsp) {
  int32_t    code = 0;
  SDBVgInfo *vgInfo = taosMemoryCalloc(1, sizeof(SDBVgInfo));
  if (NULL == vgInfo) {
    *pInfo = NULL;
    return terrno;
  }

  vgInfo->vgVersion = rsp->vgVersion;
  vgInfo->stateTs = rsp->stateTs;
  vgInfo->flags = rsp->flags;
  vgInfo->hashMethod = rsp->hashMethod;
  vgInfo->hashPrefix = rsp->hashPrefix;
  vgInfo->hashSuffix = rsp->hashSuffix;
  vgInfo->vgHash = taosHashInit(rsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
  if (NULL == vgInfo->vgHash) {
    // Capture terrno before logging so the log call cannot disturb it.
    code = terrno;
    tscError("hash init[%d] failed", rsp->vgNum);
    goto _return;
  }

  for (int32_t j = 0; j < rsp->vgNum; ++j) {
    SVgroupInfo *pVgroup = taosArrayGet(rsp->pVgroupInfos, j);
    if (NULL == pVgroup) {
      // vgNum disagrees with the array content; never report success here.
      code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_RANGE;
      goto _return;
    }
    if (taosHashPut(vgInfo->vgHash, &pVgroup->vgId, sizeof(int32_t), pVgroup, sizeof(SVgroupInfo)) != 0) {
      code = terrno;
      tscError("hash push failed, terrno:%d", code);
      goto _return;
    }
  }

_return:
  if (code) {
    taosHashCleanup(vgInfo->vgHash);
    taosMemoryFreeClear(vgInfo);  // also resets vgInfo to NULL
  }

  *pInfo = vgInfo;
  return code;
}
323

324
static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
7,144,759✔
325
  int32_t code = 0;
7,144,759✔
326

327
  SDbHbBatchRsp batchRsp = {0};
7,144,759✔
328
  if (tDeserializeSDbHbBatchRsp(value, valueLen, &batchRsp) != 0) {
7,144,759✔
329
    terrno = TSDB_CODE_INVALID_MSG;
×
330
    code = terrno;
×
331
    goto _return;
×
332
  }
333

334
  int32_t numOfBatchs = taosArrayGetSize(batchRsp.pArray);
7,144,759✔
335
  for (int32_t i = 0; i < numOfBatchs; ++i) {
8,585,817✔
336
    SDbHbRsp *rsp = taosArrayGet(batchRsp.pArray, i);
1,441,058✔
337
    if (NULL == rsp) {
1,441,058✔
338
      code = terrno;
×
339
      goto _return;
×
340
    }
341
    if (rsp->useDbRsp) {
1,441,058✔
342
      tscDebug("hb use db rsp, db:%s, vgVersion:%d, stateTs:%" PRId64 ", uid:%" PRIx64, rsp->useDbRsp->db,
1,101,738✔
343
               rsp->useDbRsp->vgVersion, rsp->useDbRsp->stateTs, rsp->useDbRsp->uid);
344

345
      if (rsp->useDbRsp->vgVersion < 0) {
1,101,738✔
346
        tscDebug("hb to remove db, db:%s", rsp->useDbRsp->db);
36,932✔
347
        code = catalogRemoveDB(pCatalog, rsp->useDbRsp->db, rsp->useDbRsp->uid);
36,932✔
348
      } else {
349
        SDBVgInfo *vgInfo = NULL;
1,064,806✔
350
        code = hbGenerateVgInfoFromRsp(&vgInfo, rsp->useDbRsp);
1,064,806✔
351
        if (TSDB_CODE_SUCCESS != code) {
1,064,806✔
352
          goto _return;
×
353
        }
354

355
        tscDebug("hb to update db vgInfo, db:%s", rsp->useDbRsp->db);
1,064,806✔
356

357
        TSC_ERR_JRET(catalogUpdateDBVgInfo(pCatalog, rsp->useDbRsp->db, rsp->useDbRsp->uid, vgInfo));
1,064,806✔
358

359
        if (IS_SYS_DBNAME(rsp->useDbRsp->db)) {
1,064,806✔
360
          code = hbGenerateVgInfoFromRsp(&vgInfo, rsp->useDbRsp);
234,054✔
361
          if (TSDB_CODE_SUCCESS != code) {
234,054✔
362
            goto _return;
×
363
          }
364

365
          TSC_ERR_JRET(catalogUpdateDBVgInfo(
234,054✔
366
              pCatalog, (rsp->useDbRsp->db[0] == 'i') ? TSDB_PERFORMANCE_SCHEMA_DB : TSDB_INFORMATION_SCHEMA_DB,
367
              rsp->useDbRsp->uid, vgInfo));
368
        }
369
      }
370
    }
371

372
    if (rsp->cfgRsp) {
1,441,058✔
373
      tscDebug("hb db cfg rsp, db:%s, cfgVersion:%d", rsp->cfgRsp->db, rsp->cfgRsp->cfgVersion);
50,413✔
374
      code = catalogUpdateDbCfg(pCatalog, rsp->cfgRsp->db, rsp->cfgRsp->dbId, rsp->cfgRsp);
50,413✔
375
      rsp->cfgRsp = NULL;
50,413✔
376
    }
377
    if (rsp->pTsmaRsp) {
1,441,058✔
378
      if (rsp->pTsmaRsp->pTsmas) {
746,880✔
379
        for (int32_t i = 0; i < rsp->pTsmaRsp->pTsmas->size; ++i) {
1,310✔
380
          STableTSMAInfo *pTsma = taosArrayGetP(rsp->pTsmaRsp->pTsmas, i);
655✔
381
          if (NULL == pTsma) {
655✔
382
            TSC_ERR_JRET(TSDB_CODE_OUT_OF_RANGE);
×
383
          }
384
          TSC_ERR_JRET(catalogAsyncUpdateTSMA(pCatalog, &pTsma, rsp->dbTsmaVersion));
655✔
385
        }
386
        taosArrayClear(rsp->pTsmaRsp->pTsmas);
655✔
387
      } else {
388
        TSC_ERR_JRET(catalogAsyncUpdateDbTsmaVersion(pCatalog, rsp->dbTsmaVersion, rsp->db, rsp->dbId));
746,225✔
389
      }
390
    }
391
  }
392

393
_return:
7,144,759✔
394

395
  tFreeSDbHbBatchRsp(&batchRsp);
7,144,759✔
396
  return code;
7,144,759✔
397
}
398

399
static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
5,065,126✔
400
  int32_t code = TSDB_CODE_SUCCESS;
5,065,126✔
401

402
  SSTbHbRsp hbRsp = {0};
5,065,126✔
403
  if (tDeserializeSSTbHbRsp(value, valueLen, &hbRsp) != 0) {
5,065,126✔
404
    terrno = TSDB_CODE_INVALID_MSG;
×
405
    return -1;
×
406
  }
407

408
  int32_t numOfMeta = taosArrayGetSize(hbRsp.pMetaRsp);
5,065,126✔
409
  for (int32_t i = 0; i < numOfMeta; ++i) {
5,114,563✔
410
    STableMetaRsp *rsp = taosArrayGet(hbRsp.pMetaRsp, i);
49,437✔
411
    if (NULL == rsp) {
49,437✔
412
      code = terrno;
×
413
      goto _return;
×
414
    }
415
    if (rsp->numOfColumns < 0) {
49,437✔
416
      tscDebug("hb to remove stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName);
34,349✔
417
      TSC_ERR_JRET(catalogRemoveStbMeta(pCatalog, rsp->dbFName, rsp->dbId, rsp->stbName, rsp->suid));
34,349✔
418
    } else {
419
      tscDebug("hb to update stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName);
15,088✔
420
      if (rsp->pSchemas[0].colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
15,088✔
421
        tscError("invalid colId[%" PRIi16 "] for the first column in table meta rsp msg", rsp->pSchemas[0].colId);
×
422
        tFreeSSTbHbRsp(&hbRsp);
×
423
        return TSDB_CODE_TSC_INVALID_VALUE;
×
424
      }
425

426
      TSC_ERR_JRET(catalogAsyncUpdateTableMeta(pCatalog, rsp));
15,088✔
427
    }
428
  }
429

430
  int32_t numOfIndex = taosArrayGetSize(hbRsp.pIndexRsp);
5,065,126✔
431
  for (int32_t i = 0; i < numOfIndex; ++i) {
5,065,126✔
432
    STableIndexRsp *rsp = taosArrayGet(hbRsp.pIndexRsp, i);
×
433
    if (NULL == rsp) {
×
434
      code = terrno;
×
435
      goto _return;
×
436
    }
437
    TSC_ERR_JRET(catalogUpdateTableIndex(pCatalog, rsp));
×
438
  }
439

440
_return:
5,065,126✔
441
  taosArrayDestroy(hbRsp.pIndexRsp);
5,065,126✔
442
  hbRsp.pIndexRsp = NULL;
5,065,126✔
443

444
  tFreeSSTbHbRsp(&hbRsp);
5,065,126✔
445
  return code;
5,065,126✔
446
}
447

448
static int32_t hbProcessDynViewRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
949✔
449
  return catalogUpdateDynViewVer(pCatalog, (SDynViewVersion *)value);
949✔
450
}
451

452
static void hbFreeSViewMetaInRsp(void *p) {
×
453
  if (NULL == p || NULL == *(void **)p) {
×
454
    return;
×
455
  }
456
  SViewMetaRsp *pRsp = *(SViewMetaRsp **)p;
×
457
  tFreeSViewMetaRsp(pRsp);
×
458
  taosMemoryFreeClear(pRsp);
×
459
}
460

461
static int32_t hbProcessViewInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
949✔
462
  int32_t code = TSDB_CODE_SUCCESS;
949✔
463

464
  SViewHbRsp hbRsp = {0};
949✔
465
  if (tDeserializeSViewHbRsp(value, valueLen, &hbRsp) != 0) {
949✔
466
    taosArrayDestroyEx(hbRsp.pViewRsp, hbFreeSViewMetaInRsp);
×
467
    terrno = TSDB_CODE_INVALID_MSG;
×
468
    return -1;
×
469
  }
470

471
  int32_t numOfMeta = taosArrayGetSize(hbRsp.pViewRsp);
949✔
472
  for (int32_t i = 0; i < numOfMeta; ++i) {
949✔
473
    SViewMetaRsp *rsp = taosArrayGetP(hbRsp.pViewRsp, i);
×
474
    if (NULL == rsp) {
×
475
      code = terrno;
×
476
      goto _return;
×
477
    }
478
    if (rsp->numOfCols < 0) {
×
479
      tscDebug("hb to remove view, db:%s, view:%s", rsp->dbFName, rsp->name);
×
480
      code = catalogRemoveViewMeta(pCatalog, rsp->dbFName, rsp->dbId, rsp->name, rsp->viewId);
×
481
      tFreeSViewMetaRsp(rsp);
×
482
      taosMemoryFreeClear(rsp);
×
483
    } else {
484
      tscDebug("hb to update view, db:%s, view:%s", rsp->dbFName, rsp->name);
×
485
      code = catalogUpdateViewMeta(pCatalog, rsp);
×
486
    }
487
    TSC_ERR_JRET(code);
×
488
  }
489

490
_return:
949✔
491
  taosArrayDestroy(hbRsp.pViewRsp);
949✔
492
  return code;
949✔
493
}
494

495
static int32_t hbprocessTSMARsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
42,575✔
496
  int32_t code = 0;
42,575✔
497

498
  STSMAHbRsp hbRsp = {0};
42,575✔
499
  if (tDeserializeTSMAHbRsp(value, valueLen, &hbRsp)) {
42,575✔
500
    terrno = TSDB_CODE_INVALID_MSG;
×
501
    return -1;
×
502
  }
503

504
  int32_t numOfTsma = taosArrayGetSize(hbRsp.pTsmas);
42,575✔
505
  for (int32_t i = 0; i < numOfTsma; ++i) {
42,575✔
506
    STableTSMAInfo *pTsmaInfo = taosArrayGetP(hbRsp.pTsmas, i);
×
507

508
    if (!pTsmaInfo->pFuncs) {
×
509
      tscDebug("hb to remove tsma:%s.%s", pTsmaInfo->dbFName, pTsmaInfo->name);
×
510
      code = catalogRemoveTSMA(pCatalog, pTsmaInfo);
×
511
      tFreeAndClearTableTSMAInfo(pTsmaInfo);
×
512
    } else {
513
      tscDebug("hb to update tsma:%s.%s", pTsmaInfo->dbFName, pTsmaInfo->name);
×
514
      code = catalogUpdateTSMA(pCatalog, &pTsmaInfo);
×
515
      tFreeAndClearTableTSMAInfo(pTsmaInfo);
×
516
    }
517
    TSC_ERR_JRET(code);
×
518
  }
519

520
_return:
42,575✔
521
  taosArrayDestroy(hbRsp.pTsmas);
42,575✔
522
  return code;
42,575✔
523
}
524

525
/**
 * Dispatch every key/value entry of a heartbeat response to its handler.
 *
 * Entries with a non-positive length or a NULL value are logged and skipped;
 * a failing handler is logged and processing continues with the next entry.
 *
 * @param kvNum     number of entries to process
 * @param pKvs      array of SKv entries
 * @param pCatalog  catalog handle passed to the handlers
 * @param pAppHbMgr heartbeat manager, used by the user-auth handler
 */
static void hbProcessQueryRspKvs(int32_t kvNum, SArray *pKvs, struct SCatalog *pCatalog, SAppHbMgr *pAppHbMgr) {
  for (int32_t i = 0; i < kvNum; ++i) {
    SKv *kv = taosArrayGet(pKvs, i);
    if (NULL == kv) {
      tscError("invalid hb kv, idx:%d", i);
      continue;
    }
    switch (kv->key) {
      case HEARTBEAT_KEY_USER_AUTHINFO: {
        if (kv->valueLen <= 0 || NULL == kv->value) {
          tscError("invalid hb user auth info, len:%d, value:%p", kv->valueLen, kv->value);
          break;
        }
        if (TSDB_CODE_SUCCESS != hbProcessUserAuthInfoRsp(kv->value, kv->valueLen, pCatalog, pAppHbMgr)) {
          tscError("process user auth info response faild, len:%d, value:%p", kv->valueLen, kv->value);
        }
        break;
      }
      case HEARTBEAT_KEY_DBINFO: {
        if (kv->valueLen <= 0 || NULL == kv->value) {
          tscError("invalid hb db info, len:%d, value:%p", kv->valueLen, kv->value);
          break;
        }
        if (TSDB_CODE_SUCCESS != hbProcessDBInfoRsp(kv->value, kv->valueLen, pCatalog)) {
          tscError("process db info response faild, len:%d, value:%p", kv->valueLen, kv->value);
        }
        break;
      }
      case HEARTBEAT_KEY_STBINFO: {
        if (kv->valueLen <= 0 || NULL == kv->value) {
          tscError("invalid hb stb info, len:%d, value:%p", kv->valueLen, kv->value);
          break;
        }
        if (TSDB_CODE_SUCCESS != hbProcessStbInfoRsp(kv->value, kv->valueLen, pCatalog)) {
          tscError("process stb info response faild, len:%d, value:%p", kv->valueLen, kv->value);
        }
        break;
      }
#ifdef TD_ENTERPRISE
      case HEARTBEAT_KEY_DYN_VIEW: {
        if (kv->valueLen <= 0 || NULL == kv->value) {
          tscError("invalid dyn view info, len:%d, value:%p", kv->valueLen, kv->value);
          break;
        }
        if (TSDB_CODE_SUCCESS != hbProcessDynViewRsp(kv->value, kv->valueLen, pCatalog)) {
          tscError("Process dyn view response failed, len:%d, value:%p", kv->valueLen, kv->value);
        }
        break;
      }
      case HEARTBEAT_KEY_VIEWINFO: {
        if (kv->valueLen <= 0 || NULL == kv->value) {
          tscError("invalid view info, len:%d, value:%p", kv->valueLen, kv->value);
          break;
        }
        if (TSDB_CODE_SUCCESS != hbProcessViewInfoRsp(kv->value, kv->valueLen, pCatalog)) {
          tscError("Process view info response failed, len:%d, value:%p", kv->valueLen, kv->value);
        }
        break;
      }
#endif
      case HEARTBEAT_KEY_TSMA: {
        if (kv->valueLen <= 0 || !kv->value) {
          tscError("Invalid tsma info, len:%d, value:%p", kv->valueLen, kv->value);
          // Skip the handler like every other case does; the payload is unusable.
          break;
        }
        if (TSDB_CODE_SUCCESS != hbprocessTSMARsp(kv->value, kv->valueLen, pCatalog)) {
          tscError("Process tsma info response failed, len:%d, value:%p", kv->valueLen, kv->value);
        }
        break;
      }
      default:
        tscError("invalid hb key type:%d", kv->key);
        break;
    }
  }
}
22,482,957✔
605

606
/**
 * Process the heartbeat response of a single connection.
 *
 * When the response carries a query section and the connection object can
 * still be acquired, this refreshes the mnode ep set (multi-dnode clusters
 * only), the dnode counters and the connection id, stops a query the server
 * asked to kill, closes the connection if requested, and updates the qnode
 * list. Afterwards any key/value entries are dispatched through
 * hbProcessQueryRspKvs().
 *
 * @param pAppHbMgr heartbeat manager owning the connection
 * @param pRsp      response for one connection
 * @return always TSDB_CODE_SUCCESS; failures are only logged.
 */
static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
  SClientHbReq *pReq = taosHashAcquire(pAppHbMgr->activeInfo, &pRsp->connKey, sizeof(SClientHbKey));
  if (NULL == pReq) {
    tscWarn("pReq to get activeInfo, may be dropped, refId:%" PRIx64 ", type:%d", pRsp->connKey.tscRid,
            pRsp->connKey.connType);
    return TSDB_CODE_SUCCESS;
  }

  if (pRsp->query) {
    STscObj *pTscObj = (STscObj *)acquireTscObj(pRsp->connKey.tscRid);
    if (NULL != pTscObj) {
      if (pRsp->query->totalDnodes > 1) {
        SEpSet currentEpset = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
        if (!isEpsetEqual(&currentEpset, &pRsp->query->epSet)) {
          SEpSet *pCurrent = &currentEpset;
          SEp    *pCurrentEp = &pCurrent->eps[pCurrent->inUse];
          SEp    *pIncomingEp = &pRsp->query->epSet.eps[pRsp->query->epSet.inUse];
          tscDebug("mnode epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d in hb", pCurrent->inUse, pCurrent->numOfEps,
                   pCurrentEp->fqdn, pCurrentEp->port, pRsp->query->epSet.inUse, pRsp->query->epSet.numOfEps,
                   pIncomingEp->fqdn, pIncomingEp->port);

          updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &pRsp->query->epSet);
        }
      }

      pTscObj->pAppInfo->totalDnodes = pRsp->query->totalDnodes;
      pTscObj->pAppInfo->onlineDnodes = pRsp->query->onlineDnodes;
      pTscObj->connId = pRsp->query->connId;
      tscTrace("connId:%u, hb rsp, dnodes %d/%d", pTscObj->connId, pTscObj->pAppInfo->onlineDnodes,
               pTscObj->pAppInfo->totalDnodes);

      // Server asked for a running query to be stopped.
      if (pRsp->query->killRid) {
        tscDebug("QID:0x%" PRIx64 ", need to be killed now", pRsp->query->killRid);
        SRequestObj *pRequest = acquireRequest(pRsp->query->killRid);
        if (NULL != pRequest) {
          taos_stop_query((TAOS_RES *)pRequest);
          (void)releaseRequest(pRsp->query->killRid);
        } else {
          tscDebug("QID:0x%" PRIx64 ", not exist to kill", pRsp->query->killRid);
        }
      }

      // Server asked for the whole connection to be closed.
      if (pRsp->query->killConnection) {
        taos_close_internal(pTscObj);
      }

      if (pRsp->query->pQnodeList) {
        if (TSDB_CODE_SUCCESS != updateQnodeList(pTscObj->pAppInfo, pRsp->query->pQnodeList)) {
          tscWarn("update qnode list failed");
        }
      }

      releaseTscObj(pRsp->connKey.tscRid);
    } else {
      tscDebug("tscObj rid %" PRIx64 " not exist", pRsp->connKey.tscRid);
    }
  }

  int32_t kvNum = 0;
  if (pRsp->info) {
    kvNum = taosArrayGetSize(pRsp->info);
  }

  tscDebug("hb got %d rsp kv", kvNum);

  if (kvNum > 0) {
    struct SCatalog *pCatalog = NULL;
    int32_t          code = catalogGetHandle(pReq->clusterId, &pCatalog);
    if (TSDB_CODE_SUCCESS == code) {
      hbProcessQueryRspKvs(kvNum, pRsp->info, pCatalog, pAppHbMgr);
    } else {
      tscWarn("catalogGetHandle failed, clusterId:0x%" PRIx64 ", error:%s", pReq->clusterId, tstrerror(code));
    }
  }

  taosHashRelease(pAppHbMgr->activeInfo, pReq);

  return TSDB_CODE_SUCCESS;
}
682

683
// RPC completion callback for one heartbeat batch request.
//
// param: points to an int32_t holding the index of the target SAppHbMgr inside clientHbMgr.appHbMgrs.
// pMsg:  response buffer; pMsg->pData and pMsg->pEpSet are released here on every path.
// code:  result code delivered with the response.
//
// Returns the incoming code, the deserialization error, TSDB_CODE_TIME_UNSYNCED when the server timestamp
// differs from the local clock by more than tsTimestampDeltaLimit, TSDB_CODE_OUT_OF_RANGE when the indexed
// SAppHbMgr is gone, or the first non-zero code reported by a per-connection response handler.
static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) {
  static int32_t    emptyRspNum = 0;
  SClientHbBatchRsp pRsp = {0};
  bool              locked = false;

  if (0 == atomic_load_8(&clientHbMgr.inited)) {
    goto _return;
  }

  int32_t idx = *(int32_t *)param;
  if (TSDB_CODE_SUCCESS == code) {
    code = tDeserializeSClientHbBatchRsp(pMsg->pData, pMsg->len, &pRsp);
    if (TSDB_CODE_SUCCESS != code) {
      // Keep the real failure: svrTimestamp is not trustworthy here, so running the clock-skew check would
      // replace the deserialization error with a misleading TSDB_CODE_TIME_UNSYNCED.
      tscError("deserialize hb rsp failed");
    } else {
      int32_t now = taosGetTimestampSec();
      int32_t delta = abs(now - pRsp.svrTimestamp);
      if (delta > tsTimestampDeltaLimit) {
        code = TSDB_CODE_TIME_UNSYNCED;
        tscError("time diff:%ds is too big", delta);
      }
    }
  }

  int32_t rspNum = taosArrayGetSize(pRsp.rsps);

  (void)taosThreadMutexLock(&clientHbMgr.lock);
  locked = true;

  SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, idx);
  if (pAppHbMgr == NULL) {
    tscError("appHbMgr not exist, idx:%d", idx);
    code = TSDB_CODE_OUT_OF_RANGE;
    goto _return;
  }

  SAppInstInfo *pInst = pAppHbMgr->pAppInstInfo;

  if (code != 0) {
    // On any heartbeat failure mark the cluster as having no online dnodes (-1 when the total is unknown).
    pInst->onlineDnodes = pInst->totalDnodes ? 0 : -1;
    tscDebug("hb rsp error %s, update server status %d/%d", tstrerror(code), pInst->onlineDnodes, pInst->totalDnodes);
    goto _return;
  }

  // Mirror the server-side configuration carried by the heartbeat into the app instance.
  pInst->serverCfg.monitorParas = pRsp.monitorParas;
  pInst->serverCfg.enableAuditDelete = pRsp.enableAuditDelete;
  pInst->serverCfg.enableAuditSelect = pRsp.enableAuditSelect;
  pInst->serverCfg.enableAuditInsert = pRsp.enableAuditInsert;
  pInst->serverCfg.auditLevel = pRsp.auditLevel;
  pInst->serverCfg.enableStrongPass = pRsp.enableStrongPass;
  tsEnableStrongPassword = pInst->serverCfg.enableStrongPass;
  tscDebug("monitor paras from hb, clusterId:0x%" PRIx64 ", threshold:%d scope:%d", pInst->clusterId,
           pRsp.monitorParas.tsSlowLogThreshold, pRsp.monitorParas.tsSlowLogScope);

  if (rspNum) {
    tscDebug("hb got %d rsp, %d empty rsp received before", rspNum,
             atomic_val_compare_exchange_32(&emptyRspNum, emptyRspNum, 0));
  } else {
    (void)atomic_add_fetch_32(&emptyRspNum, 1);
  }

  // Dispatch each per-connection response to the handler registered for its connection type;
  // stop at the first handler that reports an error.
  for (int32_t i = 0; i < rspNum; ++i) {
    SClientHbRsp *rsp = taosArrayGet(pRsp.rsps, i);
    code = (*clientHbMgr.rspHandle[rsp->connKey.connType])(pAppHbMgr, rsp);
    if (code) {
      break;
    }
  }

_return:
  if (locked) {
    (void)taosThreadMutexUnlock(&clientHbMgr.lock);
  }
  tFreeClientHbBatchRsp(&pRsp);
  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);
  return code;
}
764

765
// Appends one SQueryDesc to hbBasic->queryDesc for every request of the connection that is not killed and
// has a query job attached.
//
// hbBasic: heartbeat payload whose queryDesc array receives the descriptors.
// pObj:    connection object whose pRequests hash (request ids) is walked.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_FAILED when the local FQDN cannot be obtained, or terrno when an
// allocation/push fails. Descriptors pushed before a failure stay in hbBasic->queryDesc for the caller to free.
int32_t hbBuildQueryDesc(SQueryHbReqBasic *hbBasic, STscObj *pObj) {
  int64_t    now = taosGetTimestampUs();
  SQueryDesc desc = {0};
  int32_t    code = 0;

  void *pIter = taosHashIterate(pObj->pRequests, NULL);
  while (pIter != NULL) {
    int64_t     *rid = pIter;
    SRequestObj *pRequest = acquireRequest(*rid);
    if (NULL == pRequest) {
      pIter = taosHashIterate(pObj->pRequests, pIter);
      continue;
    }

    if (pRequest->killed || 0 == pRequest->body.queryJob) {
      (void)releaseRequest(*rid);
      pIter = taosHashIterate(pObj->pRequests, pIter);
      continue;
    }

    tstrncpy(desc.sql, pRequest->sqlstr, sizeof(desc.sql));
    desc.stime = pRequest->metric.start / 1000;
    desc.queryId = pRequest->requestId;
    desc.useconds = now - pRequest->metric.start;
    desc.reqRid = pRequest->self;
    desc.stableQuery = pRequest->stableQuery;
    desc.isSubQuery = pRequest->isSubReq;
    code = taosGetFqdn(desc.fqdn);
    if (TSDB_CODE_SUCCESS != code) {
      (void)releaseRequest(*rid);
      tscError("get fqdn failed");
      code = TSDB_CODE_FAILED;
      goto _abort;
    }
    desc.subPlanNum = pRequest->body.subplanNum;

    if (desc.subPlanNum) {
      desc.subDesc = taosArrayInit(desc.subPlanNum, sizeof(SQuerySubDesc));
      if (NULL == desc.subDesc) {
        code = terrno;  // read before releaseRequest gets a chance to overwrite it
        (void)releaseRequest(*rid);
        goto _abort;
      }

      code = schedulerGetTasksStatus(pRequest->body.queryJob, desc.subDesc);
      if (code) {
        // Task status is best effort: report the query without sub-plan details.
        taosArrayDestroy(desc.subDesc);
        desc.subDesc = NULL;
        code = TSDB_CODE_SUCCESS;
      }
      desc.subPlanNum = taosArrayGetSize(desc.subDesc);
    } else {
      desc.subDesc = NULL;
    }

    (void)releaseRequest(*rid);
    if (NULL == taosArrayPush(hbBasic->queryDesc, &desc)) {
      code = terrno;
      taosArrayDestroy(desc.subDesc);
      goto _abort;
    }

    pIter = taosHashIterate(pObj->pRequests, pIter);
  }

  return code;

_abort:
  // Leaving the loop early: release the iterator so the hash entry it pins is not left referenced.
  taosHashCancelIterate(pObj->pRequests, pIter);
  return code;
}
829

830
// Builds the basic query section of a heartbeat request for one connection and stores it in req->query.
//
// connKey: identifies the connection (tscRid) to describe.
// req:     heartbeat request that takes ownership of the allocated SQueryHbReqBasic on success.
//
// Returns TSDB_CODE_SUCCESS, terrno when the connection cannot be acquired or an allocation fails, or the
// error reported by hbBuildQueryDesc.
int32_t hbGetQueryBasicInfo(SClientHbKey *connKey, SClientHbReq *req) {
  STscObj *pConn = (STscObj *)acquireTscObj(connKey->tscRid);
  if (pConn == NULL) {
    tscWarn("tscObj rid 0x%" PRIx64 " not exist", connKey->tscRid);
    return terrno;
  }

  SQueryHbReqBasic *pBasic = (SQueryHbReqBasic *)taosMemoryCalloc(1, sizeof(SQueryHbReqBasic));
  if (pBasic == NULL) {
    tscError("calloc %d failed", (int32_t)sizeof(SQueryHbReqBasic));
    releaseTscObj(connKey->tscRid);
    return terrno;
  }

  pBasic->connId = pConn->connId;

  int32_t queryCnt = 0;
  if (pConn->pRequests) {
    queryCnt = taosHashGetSize(pConn->pRequests);
  }

  if (queryCnt > 0) {
    pBasic->queryDesc = taosArrayInit(queryCnt, sizeof(SQueryDesc));
    if (pBasic->queryDesc == NULL) {
      tscWarn("taosArrayInit %d queryDesc failed", queryCnt);
      releaseTscObj(connKey->tscRid);
      taosMemoryFree(pBasic);
      return terrno;
    }

    int32_t code = hbBuildQueryDesc(pBasic, pConn);
    if (code != 0) {
      // Drop whatever descriptors were collected before the failure.
      releaseTscObj(connKey->tscRid);
      if (pBasic->queryDesc) {
        taosArrayDestroyEx(pBasic->queryDesc, tFreeClientHbQueryDesc);
      }
      taosMemoryFree(pBasic);
      return code;
    }

    req->query = pBasic;
    releaseTscObj(connKey->tscRid);
    return TSDB_CODE_SUCCESS;
  }

  // Idle connection: report only the connection id.
  req->query = pBasic;
  releaseTscObj(connKey->tscRid);
  tscDebug("no queries on connection");
  return TSDB_CODE_SUCCESS;
}
877

878
// Makes sure the heartbeat request asks the server for the auth info of the connection's user by storing a
// SUserAuthVersion with version -1 (network byte order) under HEARTBEAT_KEY_USER_AUTHINFO in req->info.
//
// connKey: identifies the connection whose user is looked up.
// param:   heartbeat parameters (not read here).
// req:     heartbeat request whose info hash is created or updated.
//
// Returns TSDB_CODE_SUCCESS, or terrno / TSDB_CODE_APP_ERROR when the connection is gone, an allocation
// fails, or the key cannot be stored.
static int32_t hbGetUserAuthInfo(SClientHbKey *connKey, SHbParam *param, SClientHbReq *req) {
  STscObj *pTscObj = (STscObj *)acquireTscObj(connKey->tscRid);
  if (!pTscObj) {
    tscWarn("tscObj rid 0x%" PRIx64 " not exist", connKey->tscRid);
    return terrno;
  }

  int32_t code = 0;

  SKv  kv = {.key = HEARTBEAT_KEY_USER_AUTHINFO};
  SKv *pKv = NULL;
  if ((pKv = taosHashGet(req->info, &kv.key, sizeof(kv.key)))) {
    int32_t           userNum = pKv->valueLen / sizeof(SUserAuthVersion);
    SUserAuthVersion *userAuths = (SUserAuthVersion *)pKv->value;
    for (int32_t i = 0; i < userNum; ++i) {
      SUserAuthVersion *pUserAuth = userAuths + i;
      // both key and user exist, update version
      if (strncmp(pUserAuth->user, pTscObj->user, TSDB_USER_LEN) == 0) {
        pUserAuth->version = htonl(-1);  // force get userAuthInfo
        goto _return;
      }
    }
    // key exists, user not exist, append user
    SUserAuthVersion *qUserAuth =
        (SUserAuthVersion *)taosMemoryRealloc(pKv->value, (userNum + 1) * sizeof(SUserAuthVersion));
    if (qUserAuth) {
      tstrncpy((qUserAuth + userNum)->user, pTscObj->user, TSDB_USER_LEN);
      (qUserAuth + userNum)->version = htonl(-1);  // force get userAuthInfo
      pKv->value = qUserAuth;
      pKv->valueLen += sizeof(SUserAuthVersion);
    } else {
      code = terrno;
    }
    goto _return;
  }

  // key/user not exist, add user
  SUserAuthVersion *user = taosMemoryMalloc(sizeof(SUserAuthVersion));
  if (!user) {
    code = terrno;
    goto _return;
  }
  tstrncpy(user->user, pTscObj->user, TSDB_USER_LEN);
  user->version = htonl(-1);  // force get userAuthInfo
  kv.valueLen = sizeof(SUserAuthVersion);
  kv.value = user;

  tscDebug("hb got user auth info, valueLen:%d, user:%s, authVer:%d, tscRid:%" PRIi64, kv.valueLen, user->user,
           pTscObj->authVer, connKey->tscRid);

  if (!req->info) {
    req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
    if (NULL == req->info) {
      code = terrno;
      taosMemoryFree(user);  // not stored anywhere yet, so it must be released here
      goto _return;
    }
  }

  if (taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)) != 0) {
    taosMemoryFree(user);
    code = terrno ? terrno : TSDB_CODE_APP_ERROR;
    goto _return;
  }

_return:
  releaseTscObj(connKey->tscRid);
  if (code) {
    tscError("hb got user auth info failed since %s", tstrerror(code));
  }

  return code;
}
950

951
// Collects the expired user auth versions from the catalog and attaches them to the heartbeat request under
// HEARTBEAT_KEY_USER_AUTHINFO, with each version converted to network byte order.
//
// connKey:  connection key (not read here).
// pCatalog: catalog handle queried for expired users.
// req:      heartbeat request whose info hash receives the entry.
//
// Returns TSDB_CODE_SUCCESS (also when nothing is expired), or the catalog / hash error.
int32_t hbGetExpiredUserInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) {
  SUserAuthVersion *pExpired = NULL;
  uint32_t          expiredCnt = 0;

  int32_t code = catalogGetExpiredUsers(pCatalog, &pExpired, &expiredCnt);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  if (expiredCnt <= 0) {
    taosMemoryFree(pExpired);
    return TSDB_CODE_SUCCESS;
  }

  // The server expects the version field in network byte order.
  for (SUserAuthVersion *pUser = pExpired; pUser < pExpired + expiredCnt; ++pUser) {
    pUser->version = htonl(pUser->version);
  }

  SKv kv = {
      .key = HEARTBEAT_KEY_USER_AUTHINFO,
      .valueLen = sizeof(SUserAuthVersion) * expiredCnt,
      .value = pExpired,
  };

  tscDebug("hb got %d expired users, valueLen:%d", expiredCnt, kv.valueLen);

  // Create the info hash on first use.
  if (req->info == NULL) {
    req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
    if (req->info == NULL) {
      taosMemoryFree(pExpired);
      return terrno;
    }
  }

  code = taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv));
  if (code == TSDB_CODE_SUCCESS) {
    return TSDB_CODE_SUCCESS;
  }

  // The entry was not stored, so the array is still ours to free.
  taosMemoryFree(pExpired);
  return code;
}
995

996
// Collects the expired database cache entries from the catalog and attaches them to the heartbeat request
// under HEARTBEAT_KEY_DBINFO, with the numeric fields converted to network byte order.
//
// connKey:  connection key (not read here).
// pCatalog: catalog handle queried for expired databases.
// req:      heartbeat request whose info hash receives the entry.
//
// Returns TSDB_CODE_SUCCESS (also when nothing is expired), or the catalog / hash error.
int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) {
  SDbCacheInfo *pExpired = NULL;
  uint32_t      expiredCnt = 0;

  int32_t code = catalogGetExpiredDBs(pCatalog, &pExpired, &expiredCnt);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  if (expiredCnt <= 0) {
    taosMemoryFree(pExpired);
    return TSDB_CODE_SUCCESS;
  }

  for (int32_t i = 0; i < expiredCnt; ++i) {
    SDbCacheInfo *pDb = pExpired + i;
    // Log in host order before the fields are swapped below.
    tscDebug("the %dth expired db:%s, dbId:%" PRId64
             ", vgVersion:%d, cfgVersion:%d, numOfTable:%d, startTs:%" PRId64,
             i, pDb->dbFName, pDb->dbId, pDb->vgVersion, pDb->cfgVersion, pDb->numOfTable, pDb->stateTs);

    pDb->dbId = htobe64(pDb->dbId);
    pDb->vgVersion = htonl(pDb->vgVersion);
    pDb->cfgVersion = htonl(pDb->cfgVersion);
    pDb->numOfTable = htonl(pDb->numOfTable);
    pDb->stateTs = htobe64(pDb->stateTs);
    pDb->tsmaVersion = htonl(pDb->tsmaVersion);
  }

  SKv kv = {
      .key = HEARTBEAT_KEY_DBINFO,
      .valueLen = sizeof(SDbCacheInfo) * expiredCnt,
      .value = pExpired,
  };

  tscDebug("hb got %d expired db, valueLen:%d", expiredCnt, kv.valueLen);

  // Create the info hash on first use.
  if (req->info == NULL) {
    req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
    if (req->info == NULL) {
      taosMemoryFree(pExpired);
      return terrno;
    }
  }

  code = taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv));
  if (code == TSDB_CODE_SUCCESS) {
    return TSDB_CODE_SUCCESS;
  }

  // The entry was not stored, so the array is still ours to free.
  taosMemoryFree(pExpired);
  return code;
}
1049

1050
// Collects the expired super-table versions from the catalog and attaches them to the heartbeat request
// under HEARTBEAT_KEY_STBINFO, with the numeric fields converted to network byte order.
//
// connKey:  connection key (not read here).
// pCatalog: catalog handle queried for expired super tables.
// req:      heartbeat request whose info hash receives the entry.
//
// Returns TSDB_CODE_SUCCESS (also when nothing is expired), or the catalog / hash error.
int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) {
  SSTableVersion *pExpired = NULL;
  uint32_t        expiredCnt = 0;

  int32_t code = catalogGetExpiredSTables(pCatalog, &pExpired, &expiredCnt);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  if (expiredCnt <= 0) {
    taosMemoryFree(pExpired);
    return TSDB_CODE_SUCCESS;
  }

  for (SSTableVersion *pStb = pExpired; pStb < pExpired + expiredCnt; ++pStb) {
    pStb->suid = htobe64(pStb->suid);
    pStb->sversion = htonl(pStb->sversion);
    pStb->tversion = htonl(pStb->tversion);
    pStb->smaVer = htonl(pStb->smaVer);
  }

  SKv kv = {
      .key = HEARTBEAT_KEY_STBINFO,
      .valueLen = sizeof(SSTableVersion) * expiredCnt,
      .value = pExpired,
  };

  tscDebug("hb got %d expired stb, valueLen:%d", expiredCnt, kv.valueLen);

  // Create the info hash on first use.
  if (req->info == NULL) {
    req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
    if (req->info == NULL) {
      taosMemoryFree(pExpired);
      return terrno;
    }
  }

  code = taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv));
  if (code == TSDB_CODE_SUCCESS) {
    return TSDB_CODE_SUCCESS;
  }

  // The entry was not stored, so the array is still ours to free.
  taosMemoryFree(pExpired);
  return code;
}
1097

1098
// Collects the expired view versions (plus the dynamic view version) from the catalog and attaches them to
// the heartbeat request under HEARTBEAT_KEY_DYN_VIEW and HEARTBEAT_KEY_VIEWINFO, with the numeric view
// fields converted to network byte order.
//
// connKey:  connection key (not read here).
// pCatalog: catalog handle queried for expired views.
// req:      heartbeat request whose info hash receives the entries.
//
// Returns TSDB_CODE_SUCCESS (also when nothing is expired), or the catalog / hash error. On failure every
// buffer that has not been handed over to req->info is freed here.
int32_t hbGetExpiredViewInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) {
  SViewVersion    *views = NULL;
  uint32_t         viewNum = 0;
  int32_t          code = 0;
  SDynViewVersion *pDynViewVer = NULL;

  TSC_ERR_JRET(catalogGetExpiredViews(pCatalog, &views, &viewNum, &pDynViewVer));

  if (viewNum <= 0) {
    taosMemoryFree(views);
    taosMemoryFree(pDynViewVer);
    return TSDB_CODE_SUCCESS;
  }

  for (int32_t i = 0; i < viewNum; ++i) {
    SViewVersion *view = &views[i];
    view->dbId = htobe64(view->dbId);
    view->viewId = htobe64(view->viewId);
    view->version = htonl(view->version);
  }

  tscDebug("hb got %u expired view, valueLen:%lu", viewNum, sizeof(SViewVersion) * viewNum);

  if (NULL == req->info) {
    req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
    if (NULL == req->info) {
      TSC_ERR_JRET(terrno);
    }
  }

  SKv kv = {
      .key = HEARTBEAT_KEY_DYN_VIEW,
      .valueLen = sizeof(SDynViewVersion),
      .value = pDynViewVer,
  };

  TSC_ERR_JRET(taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)));
  // The hash entry now references this buffer; forget it locally so a later failure does not free memory
  // that req->info still points at.
  pDynViewVer = NULL;

  kv.key = HEARTBEAT_KEY_VIEWINFO;
  kv.valueLen = sizeof(SViewVersion) * viewNum;
  kv.value = views;

  TSC_ERR_JRET(taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)));
  return TSDB_CODE_SUCCESS;
_return:
  taosMemoryFree(views);
  taosMemoryFree(pDynViewVer);
  return code;
}
1147

1148
// Collects the expired TSMA versions from the catalog and attaches them to the heartbeat request under
// HEARTBEAT_KEY_TSMA, with the numeric fields converted to network byte order.
//
// connKey:  connection key (not read here).
// pCatalog: catalog handle queried for expired TSMAs.
// pReq:     heartbeat request whose info hash receives the entry.
//
// Returns TSDB_CODE_SUCCESS (also when nothing is expired), or the catalog / hash error.
int32_t hbGetExpiredTSMAInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *pReq) {
  STSMAVersion *pExpired = NULL;
  uint32_t      expiredCnt = 0;

  int32_t code = catalogGetExpiredTsmas(pCatalog, &pExpired, &expiredCnt);
  if (code != 0) {
    taosMemoryFree(pExpired);
    return code;
  }

  if (expiredCnt <= 0) {
    taosMemoryFree(pExpired);
    return TSDB_CODE_SUCCESS;
  }

  for (STSMAVersion *pTsma = pExpired; pTsma < pExpired + expiredCnt; ++pTsma) {
    pTsma->dbId = htobe64(pTsma->dbId);
    pTsma->tsmaId = htobe64(pTsma->tsmaId);
    pTsma->version = htonl(pTsma->version);
  }

  tscDebug("hb got %d expred tsmas, valueLen:%lu", expiredCnt, sizeof(STSMAVersion) * expiredCnt);

  // Create the info hash on first use.
  if (pReq->info == NULL) {
    pReq->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
    if (pReq->info == NULL) {
      taosMemoryFree(pExpired);
      return terrno;
    }
  }

  SKv kv = {
      .key = HEARTBEAT_KEY_TSMA,
      .valueLen = sizeof(STSMAVersion) * expiredCnt,
      .value = pExpired,
  };
  code = taosHashPut(pReq->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv));
  if (code == TSDB_CODE_SUCCESS) {
    return TSDB_CODE_SUCCESS;
  }

  // The entry was not stored, so the array is still ours to free.
  taosMemoryFree(pExpired);
  return code;
}
1189

1190
// Fills req->app for the given cluster: copies the summary gathered in clientHbMgr.appSummary when one
// exists, otherwise writes a zeroed summary together with the current pid, app id and app name.
//
// Returns TSDB_CODE_SUCCESS, or the error from taosGetAppName.
int32_t hbGetAppInfo(int64_t clusterId, SClientHbReq *req) {
  SAppHbReq *pCached = taosHashGet(clientHbMgr.appSummary, &clusterId, sizeof(clusterId));

  if (pCached == NULL) {
    // No summary for this cluster: send an empty one identified by this process.
    (void)memset(&req->app.summary, 0, sizeof(req->app.summary));
    req->app.pid = taosGetPId();
    req->app.appId = clientHbMgr.appId;
    TSC_ERR_RET(taosGetAppName(req->app.name, NULL));
    return TSDB_CODE_SUCCESS;
  }

  (void)memcpy(&req->app, pCached, sizeof(*pCached));
  return TSDB_CODE_SUCCESS;
}
1203

1204
// Request handler for query/TMQ connections: fills one SClientHbReq with the connection's query info and
// the app info. For the first request of a batch (hbParam->reqCnt == 0) it also adds the expired catalog
// information (users, user auth, dbs, stables, views in enterprise builds, tsmas).
//
// connKey: connection being reported.
// param:   SHbParam shared by all requests of the batch; reqCnt is incremented on success.
// req:     heartbeat request to fill.
//
// Returns TSDB_CODE_SUCCESS or the first error reported by a step.
int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) {
  SHbParam *hbParam = (SHbParam *)param;
  SCatalog *pCatalog = NULL;

  int32_t code = hbGetQueryBasicInfo(connKey, req);
  if (code != TSDB_CODE_SUCCESS) {
    tscWarn("hbGetQueryBasicInfo failed, clusterId:0x%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code));
    return code;
  }

  if (hbParam->reqCnt != 0) {
    // Catalog info already travels with the first request of this batch; only app info is needed here.
    code = hbGetAppInfo(hbParam->clusterId, req);
    if (code != TSDB_CODE_SUCCESS) {
      tscWarn("hbGetAppInfo failed, clusterId:0x%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code));
      return code;
    }

    ++hbParam->reqCnt;  // success to get catalog info
    return TSDB_CODE_SUCCESS;
  }

  code = catalogGetHandle(hbParam->clusterId, &pCatalog);
  if (code != TSDB_CODE_SUCCESS) {
    tscWarn("catalogGetHandle failed, clusterId:0x%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code));
    return code;
  }

  code = hbGetAppInfo(hbParam->clusterId, req);
  if (code != TSDB_CODE_SUCCESS) {
    tscWarn("getAppInfo failed, clusterId:0x%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code));
    return code;
  }

  // Expired users are gathered once per cluster: appHbHash remembers the clusters already handled.
  if (!taosHashGet(clientHbMgr.appHbHash, &hbParam->clusterId, sizeof(hbParam->clusterId))) {
    code = hbGetExpiredUserInfo(connKey, pCatalog, req);
    if (code != TSDB_CODE_SUCCESS) {
      tscWarn("hbGetExpiredUserInfo failed, clusterId:0x%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code));
      return code;
    }
    if (clientHbMgr.appHbHash) {
      code = taosHashPut(clientHbMgr.appHbHash, &hbParam->clusterId, sizeof(uint64_t), NULL, 0);
      if (code != TSDB_CODE_SUCCESS) {
        tscWarn("hbQueryHbReqHandle put clusterId failed, clusterId:0x%" PRIx64 ", error:%s", hbParam->clusterId,
                tstrerror(code));
        return code;
      }
    }
  }

  // invoke after hbGetExpiredUserInfo
  if (atomic_load_8(&hbParam->pAppHbMgr->connHbFlag) != 2) {
    code = hbGetUserAuthInfo(connKey, hbParam, req);
    if (code != TSDB_CODE_SUCCESS) {
      tscWarn("hbGetUserAuthInfo failed, clusterId:0x%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code));
      return code;
    }
    atomic_store_8(&hbParam->pAppHbMgr->connHbFlag, 1);
  }

  code = hbGetExpiredDBInfo(connKey, pCatalog, req);
  if (code != TSDB_CODE_SUCCESS) {
    tscWarn("hbGetExpiredDBInfo failed, clusterId:0x%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code));
    return code;
  }

  code = hbGetExpiredStbInfo(connKey, pCatalog, req);
  if (code != TSDB_CODE_SUCCESS) {
    tscWarn("hbGetExpiredStbInfo failed, clusterId:0x%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code));
    return code;
  }

#ifdef TD_ENTERPRISE
  code = hbGetExpiredViewInfo(connKey, pCatalog, req);
  if (code != TSDB_CODE_SUCCESS) {
    tscWarn("hbGetExpiredViewInfo failed, clusterId:0x%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code));
    return code;
  }
#endif
  code = hbGetExpiredTSMAInfo(connKey, pCatalog, req);
  if (code != TSDB_CODE_SUCCESS) {
    tscWarn("hbGetExpiredTSMAInfo failed, clusterId:0x%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code));
    return code;
  }

  ++hbParam->reqCnt;  // success to get catalog info

  return TSDB_CODE_SUCCESS;
}
1290

1291
static FORCE_INLINE void hbMgrInitHandle() {
1292
  // init all handle
1293
  clientHbMgr.reqHandle[CONN_TYPE__QUERY] = hbQueryHbReqHandle;
1,232,446✔
1294
  clientHbMgr.reqHandle[CONN_TYPE__TMQ] = hbQueryHbReqHandle;
1,232,446✔
1295

1296
  clientHbMgr.rspHandle[CONN_TYPE__QUERY] = hbQueryHbRspHandle;
1,232,446✔
1297
  clientHbMgr.rspHandle[CONN_TYPE__TMQ] = hbQueryHbRspHandle;
1,232,446✔
1298
}
1,232,446✔
1299

1300
// Builds the heartbeat batch request for one app heartbeat manager: one SClientHbReq per active, non-dropped
// connection, each filled by the request handler registered for its connection type.
//
// pAppHbMgr: manager whose activeInfo hash is walked.
// pBatchReq: out parameter; receives the newly allocated batch request on success and is set to NULL when
//            the allocation of the batch or of its reqs array fails.
//
// Returns TSDB_CODE_SUCCESS, or terrno when an allocation fails. Failures of individual request handlers are
// only logged.
int32_t hbGatherAllInfo(SAppHbMgr *pAppHbMgr, SClientHbBatchReq **pBatchReq) {
  *pBatchReq = taosMemoryCalloc(1, sizeof(SClientHbBatchReq));
  if (*pBatchReq == NULL) {  // check the allocation itself, not the out-parameter slot
    return terrno;
  }
  int32_t connKeyCnt = atomic_load_32(&pAppHbMgr->connKeyCnt);
  (*pBatchReq)->reqs = taosArrayInit(connKeyCnt, sizeof(SClientHbReq));
  if (!(*pBatchReq)->reqs) {
    int32_t code = terrno;  // capture before the cleanup below can overwrite it
    tFreeClientHbBatchReq(*pBatchReq);
    *pBatchReq = NULL;  // do not hand a freed pointer back to the caller
    return code;
  }

  int64_t  maxIpWhiteVer = 0;
  void    *pIter = NULL;
  SHbParam param = {0};
  while ((pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter))) {
    SClientHbReq *pOneReq = pIter;
    SClientHbKey *connKey = &pOneReq->connKey;
    STscObj      *pTscObj = (STscObj *)acquireTscObj(connKey->tscRid);

    // Skip connections that are gone or already marked as dropped.
    if (!pTscObj || atomic_load_8(&pTscObj->dropped) == 1) {
      if (pTscObj) releaseTscObj(connKey->tscRid);
      continue;
    }

    tstrncpy(pOneReq->userApp, pTscObj->optionInfo.userApp, sizeof(pOneReq->userApp));
    tstrncpy(pOneReq->cInfo, pTscObj->optionInfo.cInfo, sizeof(pOneReq->cInfo));
    pOneReq->userIp = pTscObj->optionInfo.userIp;
    pOneReq->userDualIp = pTscObj->optionInfo.userDualIp;
    tstrncpy(pOneReq->sVer, td_version, TSDB_VERSION_LEN);

    // From here on pOneReq refers to the copy stored in the batch array.
    pOneReq = taosArrayPush((*pBatchReq)->reqs, pOneReq);
    if (NULL == pOneReq) {
      releaseTscObj(connKey->tscRid);
      continue;
    }

    switch (connKey->connType) {
      case CONN_TYPE__QUERY:
      case CONN_TYPE__TMQ: {
        if (param.clusterId == 0) {
          // init
          param.clusterId = pOneReq->clusterId;
          param.pAppHbMgr = pAppHbMgr;
          param.connHbFlag = atomic_load_8(&pAppHbMgr->connHbFlag);
        }
        break;
      }
      default:
        break;
    }
    if (clientHbMgr.reqHandle[connKey->connType]) {
      int32_t code = (*clientHbMgr.reqHandle[connKey->connType])(connKey, &param, pOneReq);
      if (code) {
        tscWarn("hbGatherAllInfo failed since %s, tscRid:%" PRIi64 ", connType:%" PRIi8, tstrerror(code),
                connKey->tscRid, connKey->connType);
      }
    }

    // The batch carries the newest IP white list version seen on any connection.
    int64_t ver = atomic_load_64(&pTscObj->whiteListInfo.ver);
    maxIpWhiteVer = TMAX(maxIpWhiteVer, ver);
    releaseTscObj(connKey->tscRid);
  }
  (*pBatchReq)->ipWhiteListVer = maxIpWhiteVer;

  return TSDB_CODE_SUCCESS;
}
1367

1368
// Stores 2 into clientHbMgr.threadStop to record that the heartbeat thread stopped unexpectedly.
void hbThreadFuncUnexpectedStopped(void) {
  atomic_store_8(&clientHbMgr.threadStop, 2);
}
×
1369

1370
// Adds every counter of src onto the matching counter of dst; src is left untouched.
void hbMergeSummary(SAppClusterSummary *dst, SAppClusterSummary *src) {
  // insert statistics
  dst->numOfInsertsReq += src->numOfInsertsReq;
  dst->numOfInsertRows += src->numOfInsertRows;
  dst->insertElapsedTime += src->insertElapsedTime;
  dst->insertBytes += src->insertBytes;

  // query statistics
  dst->fetchBytes += src->fetchBytes;
  dst->queryElapsedTime += src->queryElapsedTime;
  dst->numOfSlowQueries += src->numOfSlowQueries;

  // request counters
  dst->totalRequests += src->totalRequests;
  dst->currentRequests += src->currentRequests;
}
3,230,872✔
1381

1382
int32_t hbGatherAppInfo(void) {
23,911,443✔
1383
  SAppHbReq req = {0};
23,911,443✔
1384
  int32_t   code = TSDB_CODE_SUCCESS;
23,911,443✔
1385
  int       sz = taosArrayGetSize(clientHbMgr.appHbMgrs);
23,911,443✔
1386
  if (sz > 0) {
23,911,443✔
1387
    req.pid = taosGetPId();
23,911,443✔
1388
    req.appId = clientHbMgr.appId;
23,911,443✔
1389
    TSC_ERR_RET(taosGetAppName(req.name, NULL));
23,911,443✔
1390
  }
1391

1392
  taosHashClear(clientHbMgr.appSummary);
23,911,443✔
1393

1394
  for (int32_t i = 0; i < sz; ++i) {
51,063,001✔
1395
    SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i);
27,151,558✔
1396
    if (pAppHbMgr == NULL) continue;
27,151,558✔
1397

1398
    int64_t    clusterId = pAppHbMgr->pAppInstInfo->clusterId;
27,151,558✔
1399
    SAppHbReq *pApp = taosHashGet(clientHbMgr.appSummary, &clusterId, sizeof(clusterId));
27,151,558✔
1400
    if (NULL == pApp) {
27,151,558✔
1401
      (void)memcpy(&req.summary, &pAppHbMgr->pAppInstInfo->summary, sizeof(req.summary));
23,920,686✔
1402
      req.startTime = pAppHbMgr->startTime;
23,920,686✔
1403
      TSC_ERR_RET(taosHashPut(clientHbMgr.appSummary, &clusterId, sizeof(clusterId), &req, sizeof(req)));
23,920,686✔
1404
    } else {
1405
      if (pAppHbMgr->startTime < pApp->startTime) {
3,230,872✔
1406
        pApp->startTime = pAppHbMgr->startTime;
×
1407
      }
1408

1409
      hbMergeSummary(&pApp->summary, &pAppHbMgr->pAppInstInfo->summary);
3,230,872✔
1410
    }
1411
  }
1412

1413
  return TSDB_CODE_SUCCESS;
23,911,443✔
1414
}
1415

1416
static void *hbThreadFunc(void *param) {
1,232,446✔
1417
  setThreadName("hb");
1,232,446✔
1418
#ifdef WINDOWS
1419
  if (taosCheckCurrentInDll()) {
1420
    atexit(hbThreadFuncUnexpectedStopped);
1421
  }
1422
#endif
1423
  while (1) {
23,153,312✔
1424
    if (1 == clientHbMgr.threadStop) {
24,385,758✔
1425
      break;
467,932✔
1426
    }
1427

1428
    if (TSDB_CODE_SUCCESS != taosThreadMutexLock(&clientHbMgr.lock)) {
23,917,826✔
1429
      tscError("taosThreadMutexLock failed");
×
1430
      return NULL;
×
1431
    }
1432

1433
    int sz = taosArrayGetSize(clientHbMgr.appHbMgrs);
23,917,826✔
1434
    if (sz > 0) {
23,917,826✔
1435
      if (TSDB_CODE_SUCCESS != hbGatherAppInfo()) {
23,911,443✔
1436
        tscError("hbGatherAppInfo failed");
×
1437
        return NULL;
×
1438
      }
1439
      if (sz > 1 && !clientHbMgr.appHbHash) {
23,911,443✔
1440
        clientHbMgr.appHbHash = taosHashInit(0, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_NO_LOCK);
42,187✔
1441
        if (NULL == clientHbMgr.appHbHash) {
42,187✔
1442
          tscError("taosHashInit failed");
×
1443
          return NULL;
×
1444
        }
1445
      }
1446
      taosHashClear(clientHbMgr.appHbHash);
23,911,443✔
1447
    }
1448

1449
    for (int i = 0; i < sz; i++) {
51,069,384✔
1450
      SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i);
27,151,558✔
1451
      if (pAppHbMgr == NULL) {
27,151,558✔
1452
        continue;
17,396✔
1453
      }
1454

1455
      int32_t connCnt = atomic_load_32(&pAppHbMgr->connKeyCnt);
27,151,558✔
1456
      if (connCnt == 0) {
27,151,558✔
1457
        continue;
2,460,673✔
1458
      }
1459
      SClientHbBatchReq *pReq = NULL;
24,690,885✔
1460
      int32_t            code = hbGatherAllInfo(pAppHbMgr, &pReq);
24,690,885✔
1461
      if (TSDB_CODE_SUCCESS != code || taosArrayGetP(clientHbMgr.appHbMgrs, i) == NULL) {
24,690,885✔
1462
        terrno = code ? code : TSDB_CODE_OUT_OF_RANGE;
×
1463
        tFreeClientHbBatchReq(pReq);
×
1464
        continue;
×
1465
      }
1466
      int tlen = tSerializeSClientHbBatchReq(NULL, 0, pReq);
24,690,885✔
1467
      if (tlen == -1) {
24,690,885✔
1468
        tFreeClientHbBatchReq(pReq);
×
1469
        break;
×
1470
      }
1471
      void *buf = taosMemoryMalloc(tlen);
24,690,885✔
1472
      if (buf == NULL) {
24,690,885✔
1473
        tFreeClientHbBatchReq(pReq);
×
1474
        // hbClearReqInfo(pAppHbMgr);
1475
        break;
×
1476
      }
1477

1478
      if (tSerializeSClientHbBatchReq(buf, tlen, pReq) == -1) {
24,690,885✔
1479
        tFreeClientHbBatchReq(pReq);
×
1480
        taosMemoryFree(buf);
×
1481
        break;
×
1482
      }
1483
      SMsgSendInfo *pInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
24,690,885✔
1484

1485
      if (pInfo == NULL) {
24,690,885✔
1486
        tFreeClientHbBatchReq(pReq);
×
1487
        // hbClearReqInfo(pAppHbMgr);
1488
        taosMemoryFree(buf);
×
1489
        break;
×
1490
      }
1491
      pInfo->fp = hbAsyncCallBack;
24,690,885✔
1492
      pInfo->msgInfo.pData = buf;
24,690,885✔
1493
      pInfo->msgInfo.len = tlen;
24,690,885✔
1494
      pInfo->msgType = TDMT_MND_HEARTBEAT;
24,690,885✔
1495
      pInfo->param = taosMemoryMalloc(sizeof(int32_t));
24,690,885✔
1496
      if (pInfo->param  == NULL) {
24,690,885✔
1497
        tFreeClientHbBatchReq(pReq);
×
1498
        // hbClearReqInfo(pAppHbMgr);
1499
        taosMemoryFree(buf);
×
1500
        taosMemoryFree(pInfo);
×
1501
        break;
×
1502
      }
1503
      *(int32_t *)pInfo->param = i;
24,690,885✔
1504
      pInfo->paramFreeFp = taosAutoMemoryFree;
24,690,885✔
1505
      pInfo->requestId = generateRequestId();
24,690,885✔
1506
      pInfo->requestObjRefId = 0;
24,690,885✔
1507

1508
      SAppInstInfo *pAppInstInfo = pAppHbMgr->pAppInstInfo;
24,690,885✔
1509
      SEpSet        epSet = getEpSet_s(&pAppInstInfo->mgmtEp);
24,690,885✔
1510
      if (TSDB_CODE_SUCCESS != asyncSendMsgToServer(pAppInstInfo->pTransporter, &epSet, NULL, pInfo)) {
24,690,885✔
1511
        tscWarn("failed to async send msg to server");
×
1512
      }
1513
      tFreeClientHbBatchReq(pReq);
24,690,885✔
1514
      // hbClearReqInfo(pAppHbMgr);
1515
      (void)atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1);
24,690,885✔
1516
    }
1517

1518
    if (TSDB_CODE_SUCCESS != taosThreadMutexUnlock(&clientHbMgr.lock)) {
23,917,826✔
1519
      tscError("taosThreadMutexLock failed");
×
1520
      return NULL;
×
1521
    }
1522
    taosMsleep(HEARTBEAT_INTERVAL);
23,917,826✔
1523
  }
1524
  taosHashCleanup(clientHbMgr.appHbHash);
467,932✔
1525
  return NULL;
467,932✔
1526
}
1527

1528
static int32_t hbCreateThread() {
1,232,446✔
1529
  int32_t      code = TSDB_CODE_SUCCESS;
1,232,446✔
1530
  TdThreadAttr thAttr;
1,223,809✔
1531
  TSC_ERR_JRET(taosThreadAttrInit(&thAttr));
1,232,446✔
1532
  TSC_ERR_JRET(taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE));
1,232,446✔
1533
#ifdef TD_COMPACT_OS
1534
  TSC_ERR_JRET(taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL));
1535
#endif
1536

1537
  if (taosThreadCreate(&clientHbMgr.thread, &thAttr, hbThreadFunc, NULL) != 0) {
1,232,446✔
1538
    terrno = TAOS_SYSTEM_ERROR(ERRNO);
×
1539
    TSC_ERR_RET(terrno);
×
1540
  }
1541
  (void)taosThreadAttrDestroy(&thAttr);
1,232,446✔
1542
_return:
1,232,446✔
1543

1544
  if (code) {
1,232,446✔
1545
    terrno = TAOS_SYSTEM_ERROR(ERRNO);
×
1546
    TSC_ERR_RET(terrno);
×
1547
  }
1548

1549
  return code;
1,232,446✔
1550
}
1551

1552
static void hbStopThread() {
1,233,521✔
1553
  if (0 == atomic_load_8(&clientHbMgr.inited)) {
1,233,521✔
1554
    return;
1,075✔
1555
  }
1556
  if (atomic_val_compare_exchange_8(&clientHbMgr.threadStop, 0, 1)) {
1,232,446✔
1557
    tscDebug("hb thread already stopped");
×
1558
    return;
×
1559
  }
1560

1561
  int32_t code = TSDB_CODE_SUCCESS;
1,232,446✔
1562
  // thread quit mode kill or inner exit from self-thread
1563
  if (clientHbMgr.quitByKill) {
1,232,446✔
1564
    code = taosThreadKill(clientHbMgr.thread, 0);
839,026✔
1565
    if (TSDB_CODE_SUCCESS != code) {
839,026✔
1566
      tscError("taosThreadKill failed since %s", tstrerror(code));
×
1567
    }
1568
  } else {
1569
    code = taosThreadJoin(clientHbMgr.thread, NULL);
393,420✔
1570
    if (TSDB_CODE_SUCCESS != code) {
393,420✔
1571
      tscError("taosThreadJoin failed since %s", tstrerror(code));
×
1572
    }
1573
  }
1574

1575
  tscDebug("hb thread stopped");
1,232,446✔
1576
}
1577

1578
/*
 * Create a heartbeat manager for one application instance and register it in
 * clientHbMgr.appHbMgrs.
 *
 * pAppInstInfo  instance the manager reports for (stored, not copied)
 * key           string duplicated into the manager
 * pAppHbMgr     out: the new manager on success, NULL on failure
 *
 * Makes sure the global heartbeat manager is initialized first (hbMgrInit).
 * On failure everything allocated here (manager, key copy, activeInfo hash)
 * is released and *pAppHbMgr is reset to NULL.
 *
 * Returns TSDB_CODE_SUCCESS or the error code of the failing step.
 */
int32_t appHbMgrInit(SAppInstInfo *pAppInstInfo, char *key, SAppHbMgr **pAppHbMgr) {
  int32_t code = TSDB_CODE_SUCCESS;
  TSC_ERR_RET(hbMgrInit());

  // Zero-filled so that the error path can release key/activeInfo no matter
  // how far initialization got.
  *pAppHbMgr = taosMemoryCalloc(1, sizeof(SAppHbMgr));
  if (*pAppHbMgr == NULL) {
    TSC_ERR_JRET(terrno);
  }

  // init stat
  (*pAppHbMgr)->startTime = taosGetTimestampMs();
  (*pAppHbMgr)->connKeyCnt = 0;
  (*pAppHbMgr)->connHbFlag = 0;
  (*pAppHbMgr)->reportCnt = 0;
  (*pAppHbMgr)->reportBytes = 0;
  (*pAppHbMgr)->key = taosStrdup(key);
  if ((*pAppHbMgr)->key == NULL) {
    TSC_ERR_JRET(terrno);
  }

  // init app info
  (*pAppHbMgr)->pAppInstInfo = pAppInstInfo;

  // init hash info
  (*pAppHbMgr)->activeInfo = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
  if ((*pAppHbMgr)->activeInfo == NULL) {
    TSC_ERR_JRET(terrno);
  }

  TSC_ERR_JRET(taosThreadMutexLock(&clientHbMgr.lock));
  if (taosArrayPush(clientHbMgr.appHbMgrs, &(*pAppHbMgr)) == NULL) {
    code = terrno;
    (void)taosThreadMutexUnlock(&clientHbMgr.lock);
    goto _return;
  }
  (*pAppHbMgr)->idx = taosArrayGetSize(clientHbMgr.appHbMgrs) - 1;

  // From here on the global array references the manager, so it must not be
  // freed any more; an unlock failure is therefore only logged.
  code = taosThreadMutexUnlock(&clientHbMgr.lock);
  if (TSDB_CODE_SUCCESS != code) {
    tscError("failed to unlock clientHbMgr, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
  }

  return TSDB_CODE_SUCCESS;

_return:
  if (*pAppHbMgr != NULL) {
    taosHashCleanup((*pAppHbMgr)->activeInfo);
    taosMemoryFree((*pAppHbMgr)->key);
    taosMemoryFree(*pAppHbMgr);
    *pAppHbMgr = NULL;
  }
  return code;
}
1622

1623
/*
 * Release a heartbeat manager: free every SClientHbReq stored in its
 * activeInfo hash, destroy the hash, then free the key string and the
 * manager itself. pTarget must not be used afterwards.
 */
void hbFreeAppHbMgr(SAppHbMgr *pTarget) {
  for (void *pIter = taosHashIterate(pTarget->activeInfo, NULL); pIter != NULL;
       pIter = taosHashIterate(pTarget->activeInfo, pIter)) {
    SClientHbReq *pOneReq = pIter;
    tFreeClientHbReq(pOneReq);
  }

  taosHashCleanup(pTarget->activeInfo);
  pTarget->activeInfo = NULL;

  taosMemoryFree(pTarget->key);
  taosMemoryFree(pTarget);
}
1,292,381✔
1636

1637
/*
 * Unregister and free the heartbeat manager referenced by *pAppHbMgr.
 *
 * Under clientHbMgr.lock the manager is looked up in clientHbMgr.appHbMgrs;
 * when found it is freed, *pAppHbMgr is set to NULL and the array slot is
 * overwritten with NULL. A manager that is not in the array is left
 * untouched.
 *
 * A NULL handle (or a handle already cleared by a previous call) is ignored.
 * Without this guard a NULL *pAppHbMgr would compare equal to a vacated slot
 * and hbFreeAppHbMgr() would dereference NULL.
 *
 * Lock/unlock failures are logged and otherwise ignored.
 */
void hbRemoveAppHbMrg(SAppHbMgr **pAppHbMgr) {
  if (pAppHbMgr == NULL || *pAppHbMgr == NULL) {
    return;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  code = taosThreadMutexLock(&clientHbMgr.lock);
  if (TSDB_CODE_SUCCESS != code) {
    tscError("failed to lock clientHbMgr, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
  }

  int32_t mgrSize = taosArrayGetSize(clientHbMgr.appHbMgrs);
  for (int32_t i = 0; i < mgrSize; ++i) {
    SAppHbMgr *pItem = taosArrayGetP(clientHbMgr.appHbMgrs, i);
    if (pItem == *pAppHbMgr) {
      hbFreeAppHbMgr(*pAppHbMgr);
      *pAppHbMgr = NULL;
      // Store the now-NULL pointer value into the slot to mark it vacated.
      taosArraySet(clientHbMgr.appHbMgrs, i, pAppHbMgr);
      break;
    }
  }

  code = taosThreadMutexUnlock(&clientHbMgr.lock);
  if (TSDB_CODE_SUCCESS != code) {
    tscError("failed to unlock clientHbMgr, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
  }
}
1,292,381✔
1658

1659
void appHbMgrCleanup(void) {
1,232,446✔
1660
  int sz = taosArrayGetSize(clientHbMgr.appHbMgrs);
1,232,446✔
1661
  for (int i = 0; i < sz; i++) {
2,524,827✔
1662
    SAppHbMgr *pTarget = taosArrayGetP(clientHbMgr.appHbMgrs, i);
1,292,381✔
1663
    if (pTarget == NULL) continue;
1,292,381✔
1664
    hbFreeAppHbMgr(pTarget);
1,292,381✔
1665
  }
1666
}
1,232,446✔
1667

1668
int32_t hbMgrInit() {
1,292,381✔
1669
  // init once
1670
  int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 0, 1);
1,292,381✔
1671
  if (old == 1) return 0;
1,292,381✔
1672

1673
  clientHbMgr.appId = tGenIdPI64();
1,232,446✔
1674
  tscInfo("app initialized, appId:0x%" PRIx64, clientHbMgr.appId);
1,232,446✔
1675

1676
  clientHbMgr.appSummary = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
1,232,446✔
1677
  if (NULL == clientHbMgr.appSummary) {
1,232,446✔
1678
    uError("hbMgrInit:taosHashInit error") return terrno;
×
1679
  }
1680
  clientHbMgr.appHbMgrs = taosArrayInit(0, sizeof(void *));
1,232,446✔
1681
  if (NULL == clientHbMgr.appHbMgrs) {
1,232,446✔
1682
    uError("hbMgrInit:taosArrayInit error") return terrno;
×
1683
  }
1684
  TdThreadMutexAttr attr = {0};
1,232,446✔
1685

1686
  int ret = taosThreadMutexAttrInit(&attr);
1,232,446✔
1687
  if (ret != 0) {
1,232,446✔
1688
    uError("hbMgrInit:taosThreadMutexAttrInit error") return ret;
×
1689
  }
1690

1691
  ret = taosThreadMutexAttrSetType(&attr, PTHREAD_MUTEX_RECURSIVE);
1,232,446✔
1692
  if (ret != 0) {
1,232,446✔
1693
    uError("hbMgrInit:taosThreadMutexAttrSetType error") return ret;
×
1694
  }
1695

1696
  ret = taosThreadMutexInit(&clientHbMgr.lock, &attr);
1,232,446✔
1697
  if (ret != 0) {
1,232,446✔
1698
    uError("hbMgrInit:taosThreadMutexInit error") return ret;
×
1699
  }
1700

1701
  ret = taosThreadMutexAttrDestroy(&attr);
1,232,446✔
1702
  if (ret != 0) {
1,232,446✔
1703
    uError("hbMgrInit:taosThreadMutexAttrDestroy error") return ret;
×
1704
  }
1705

1706
  // init handle funcs
1707
  hbMgrInitHandle();
1708

1709
  // init backgroud thread
1710
  ret = hbCreateThread();
1,232,446✔
1711
  if (ret != 0) {
1,232,446✔
1712
    uError("hbMgrInit:hbCreateThread error") return ret;
×
1713
  }
1714

1715
  return 0;
1,232,446✔
1716
}
1717

1718
void hbMgrCleanUp() {
1,233,521✔
1719
  hbStopThread();
1,233,521✔
1720

1721
  // destroy all appHbMgr
1722
  int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 1, 0);
1,233,521✔
1723
  if (old == 0) return;
1,233,521✔
1724

1725
  int32_t code = taosThreadMutexLock(&clientHbMgr.lock);
1,232,446✔
1726
  if (TSDB_CODE_SUCCESS != code) {
1,232,446✔
1727
    tscError("failed to lock clientHbMgr, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
×
1728
  }
1729
  appHbMgrCleanup();
1,232,446✔
1730
  taosArrayDestroy(clientHbMgr.appHbMgrs);
1,232,446✔
1731
  clientHbMgr.appHbMgrs = NULL;
1,232,446✔
1732
  code = taosThreadMutexUnlock(&clientHbMgr.lock);
1,232,446✔
1733
  if (TSDB_CODE_SUCCESS != code) {
1,232,446✔
1734
    tscError("failed to unlock clientHbMgr, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
×
1735
  }
1736
}
1737

1738
/*
 * Add a connection to a manager's activeInfo hash.
 *
 * If connKey is already present nothing happens and 0 is returned.
 * Otherwise a zero-initialized SClientHbReq carrying the key, cluster id,
 * user and token name is inserted and the manager's connKeyCnt is
 * incremented.
 *
 * Returns 0 on success, or the taosHashPut() error (via TSC_ERR_RET).
 */
int32_t hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, const char* user, const char* tokenName, int64_t clusterId) {
  // init hash in activeinfo
  if (taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)) != NULL) {
    return 0;
  }

  SClientHbReq newReq = {0};
  tstrncpy(newReq.user, user, sizeof(newReq.user));
  tstrncpy(newReq.tokenName, tokenName, sizeof(newReq.tokenName));
  newReq.clusterId = clusterId;
  newReq.connKey = connKey;

  TSC_ERR_RET(taosHashPut(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey), &newReq, sizeof(SClientHbReq)));

  (void)atomic_add_fetch_32(&pAppHbMgr->connKeyCnt, 1);
  return 0;
}
1756

1757
/*
 * Register a connection for heartbeat reporting.
 *
 * Only CONN_TYPE__QUERY and CONN_TYPE__TMQ connections are registered (via
 * hbRegisterConnImpl); any other connType is ignored and 0 is returned.
 */
int32_t hbRegisterConn(SAppHbMgr *pAppHbMgr, int64_t tscRefId, const char* user, const char* tokenName, int64_t clusterId, int8_t connType) {
  SClientHbKey connKey = {
      .tscRid = tscRefId,
      .connType = connType,
  };

  if (connType != CONN_TYPE__QUERY && connType != CONN_TYPE__TMQ) {
    return 0;
  }

  return hbRegisterConnImpl(pAppHbMgr, connKey, user, tokenName, clusterId);
}
1772

1773
/*
 * Remove a connection from the heartbeat manager it was registered with.
 *
 * Under clientHbMgr.lock the manager is looked up by pTscObj->appHbMgrIdx.
 * If both the manager and the connection entry exist, the entry's contents
 * are freed, the entry is removed from activeInfo and the manager's
 * connKeyCnt is decremented. Lock, unlock and removal failures are only
 * logged.
 */
void hbDeregisterConn(STscObj *pTscObj, SClientHbKey connKey) {
  int32_t code = taosThreadMutexLock(&clientHbMgr.lock);
  if (code != TSDB_CODE_SUCCESS) {
    tscError("failed to lock clientHbMgr, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
  }

  SAppHbMgr    *pMgr = taosArrayGetP(clientHbMgr.appHbMgrs, pTscObj->appHbMgrIdx);
  SClientHbReq *pEntry = NULL;
  if (pMgr != NULL) {
    pEntry = taosHashAcquire(pMgr->activeInfo, &connKey, sizeof(SClientHbKey));
  }

  if (pEntry != NULL) {
    tFreeClientHbReq(pEntry);
    code = taosHashRemove(pMgr->activeInfo, &connKey, sizeof(SClientHbKey));
    if (code != TSDB_CODE_SUCCESS) {
      tscError("hbDeregisterConn taosHashRemove error, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
    }
    // Pairs with the taosHashAcquire above.
    taosHashRelease(pMgr->activeInfo, pEntry);
    (void)atomic_sub_fetch_32(&pMgr->connKeyCnt, 1);
  }

  code = taosThreadMutexUnlock(&clientHbMgr.lock);
  if (code != TSDB_CODE_SUCCESS) {
    tscError("failed to unlock clientHbMgr, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
  }
}
2,167,287✔
1796

1797
// set heart beat thread quit mode , if quicByKill 1 then kill thread else quit from inner
1798
void taos_set_hb_quit(int8_t quitByKill) { clientHbMgr.quitByKill = quitByKill; }
840,031✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc