• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4941

27 Jan 2026 10:23AM UTC coverage: 66.868% (+0.04%) from 66.832%
#4941

push

travis-ci

web-flow
fix: asan invalid write issue (#34400)

7 of 8 new or added lines in 2 files covered. (87.5%)

560 existing lines in 126 files now uncovered.

204401 of 305680 relevant lines covered (66.87%)

126915843.15 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.55
/source/client/src/clientHb.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "catalog.h"
17
#include "clientInt.h"
18
#include "clientLog.h"
19
#include "clientMonitor.h"
20
#include "clientSession.h"
21
#include "scheduler.h"
22
#include "tglobal.h"
23
#include "trpc.h"
24

25
typedef struct {
26
  union {
27
    struct {
28
      SAppHbMgr *pAppHbMgr;
29
      int64_t    clusterId;
30
      int32_t    reqCnt;
31
      int8_t     connHbFlag;
32
    };
33
  };
34
} SHbParam;
35

36
SClientHbMgr clientHbMgr = {0};
37

38
static int32_t hbCreateThread();
39
static void    hbStopThread();
40
static int32_t hbUpdateUserSessMertric(const char *user, SUserSessCfg *pCfg);
41
static int32_t hbUpdateUserAuthInfo(SAppHbMgr *pAppHbMgr, SUserAuthBatchRsp *batchRsp);
42

43
static int32_t hbProcessUserAuthInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog,
21,404,502✔
44
                                        SAppHbMgr *pAppHbMgr) {
45
  int32_t code = TSDB_CODE_SUCCESS;
21,404,502✔
46

47
  SUserAuthBatchRsp batchRsp = {0};
21,404,502✔
48
  if (tDeserializeSUserAuthBatchRsp(value, valueLen, &batchRsp) != 0) {
21,404,502✔
49
    return TSDB_CODE_INVALID_MSG;
×
50
  }
51

52
  int32_t numOfBatchs = taosArrayGetSize(batchRsp.pArray);
21,404,502✔
53
  for (int32_t i = 0; i < numOfBatchs; ++i) {
43,044,241✔
54
    SGetUserAuthRsp *rsp = taosArrayGet(batchRsp.pArray, i);
21,639,739✔
55
    if (NULL == rsp) {
21,639,739✔
56
      code = terrno;
×
57
      goto _return;
×
58
    }
59
    tscDebug("hb to update user auth, user:%s, version:%d", rsp->user, rsp->version);
21,639,739✔
60

61
    TSC_ERR_JRET(catalogUpdateUserAuthInfo(pCatalog, rsp));
21,639,739✔
62

63
  }
64

65
  if (numOfBatchs > 0) {
21,404,502✔
66
    TSC_ERR_JRET(hbUpdateUserAuthInfo(pAppHbMgr, &batchRsp));
21,404,502✔
67
  }
68

69
  (void)atomic_val_compare_exchange_8(&pAppHbMgr->connHbFlag, 1, 2);
21,404,502✔
70

71
_return:
21,404,502✔
72
  taosArrayDestroy(batchRsp.pArray);
21,404,502✔
73
  return code;
21,404,502✔
74
}
75

76
// Applies the non-zero session limits carried in `pCfg` to `user`.
//
// Nothing is done (and 0 is returned) when either argument is NULL or when the
// whole config is zero. Otherwise each non-zero field is forwarded to
// sessMgtUpdataLimit() in declaration order; the first failure stops the
// sequence and its code is returned.
static int32_t hbUpdateUserSessMertric(const char *user, SUserSessCfg *pCfg) {
  int32_t code = 0;
  int32_t lino = 0;

  if (NULL == user || NULL == pCfg) {
    return code;
  }

  // An all-zero config carries no limit to apply.
  const SUserSessCfg emptyCfg = {0, 0, 0, 0, 0};
  if (0 == memcmp(pCfg, &emptyCfg, sizeof(SUserSessCfg))) {
    return TSDB_CODE_SUCCESS;
  }

  tscInfo(
      "update session metric for user:%s, sessPerUser:%d, sessConnTime:%d, sessConnIdleTime:%d, sessMaxConcurrency:%d, "
      "sessMaxCallVnodeNum:%d",
      user, pCfg->sessPerUser, pCfg->sessConnTime, pCfg->sessConnIdleTime, pCfg->sessMaxConcurrency,
      pCfg->sessMaxCallVnodeNum);

  // Per-user session count.
  if (0 != pCfg->sessPerUser) {
    code = sessMgtUpdataLimit((char *)user, SESSION_PER_USER, pCfg->sessPerUser);
    TAOS_CHECK_GOTO(code, &lino, _error);
  }

  // Connection time.
  if (0 != pCfg->sessConnTime) {
    code = sessMgtUpdataLimit((char *)user, SESSION_CONN_TIME, pCfg->sessConnTime);
    TAOS_CHECK_GOTO(code, &lino, _error);
  }

  // Connection idle time.
  if (0 != pCfg->sessConnIdleTime) {
    code = sessMgtUpdataLimit((char *)user, SESSION_CONN_IDLE_TIME, pCfg->sessConnIdleTime);
    TAOS_CHECK_GOTO(code, &lino, _error);
  }

  // Maximum concurrency.
  if (0 != pCfg->sessMaxConcurrency) {
    code = sessMgtUpdataLimit((char *)user, SESSION_MAX_CONCURRENCY, pCfg->sessMaxConcurrency);
    TAOS_CHECK_GOTO(code, &lino, _error);
  }

  // Maximum vnode calls.
  if (0 != pCfg->sessMaxCallVnodeNum) {
    code = sessMgtUpdataLimit((char *)user, SESSION_MAX_CALL_VNODE_NUM, pCfg->sessMaxCallVnodeNum);
    TAOS_CHECK_GOTO(code, &lino, _error);
  }

_error:
  return code;
}
121
// Propagates a user-auth batch response to the client connections.
//
// Every app heartbeat manager registered in clientHbMgr.appHbMgrs whose
// cluster id matches pAppHbMgr's is visited, and for each of its active
// connections that can still be acquired the function:
//   - fires the user-dropped callback once if the response marks the user dropped;
//   - for token connections, derives a token event (dropped / disabled /
//     expired / modified) and notifies the token callback;
//   - stores the auth version, session limits and sysInfo flag;
//   - advances the password, IP whitelist and date-time whitelist versions
//     (monotonically, via compare-and-swap) and fires the matching callbacks.
//
// Always returns 0; failures to update session limits are only logged.
static int32_t hbUpdateUserAuthInfo(SAppHbMgr *pAppHbMgr, SUserAuthBatchRsp *batchRsp) {
  int64_t clusterId = pAppHbMgr->pAppInstInfo->clusterId;
  for (int i = 0; i < TARRAY_SIZE(clientHbMgr.appHbMgrs); ++i) {
    SAppHbMgr *hbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i);
    if (!hbMgr || hbMgr->pAppInstInfo->clusterId != clusterId) {
      continue;
    }

    SClientHbReq    *pReq = NULL;
    SGetUserAuthRsp *pRsp = NULL;
    while ((pReq = taosHashIterate(hbMgr->activeInfo, pReq))) {
      STscObj *pTscObj = (STscObj *)acquireTscObj(pReq->connKey.tscRid);
      if (!pTscObj) {
        continue;
      }

      if (!pRsp) {
        // The response entry is looked up by the user of the first connection
        // that could be acquired and is then reused for the remaining
        // connections of this manager.
        // NOTE(review): this presumes one user per app heartbeat manager -- confirm.
        for (int32_t j = 0; j < TARRAY_SIZE(batchRsp->pArray); ++j) {
          SGetUserAuthRsp *rsp = TARRAY_GET_ELEM(batchRsp->pArray, j);
          if (0 == strncmp(rsp->user, pTscObj->user, TSDB_USER_LEN)) {
            pRsp = rsp;
            break;
          }
        }
        if (!pRsp) {
          // No entry for this user: stop scanning this manager. The iteration
          // is cancelled explicitly because the loop is left early.
          releaseTscObj(pReq->connKey.tscRid);
          taosHashCancelIterate(hbMgr->activeInfo, pReq);
          break;
        }
      }

      if (pRsp->dropped == 1) {
        // The 0 -> 1 transition guarantees the callback fires at most once per connection.
        if (atomic_val_compare_exchange_8(&pTscObj->dropped, 0, 1) == 0) {
          SPassInfo *dropInfo = &pTscObj->userDroppedInfo;
          if (dropInfo->fp) {
            (*dropInfo->fp)(dropInfo->param, NULL, TAOS_NOTIFY_USER_DROPPED);
          }
        }
        releaseTscObj(pReq->connKey.tscRid);
        continue;
      }

      // update token status
      if (pTscObj->tokenName[0] != 0) {
        STokenStatus *status = NULL;
        if (pRsp->tokens != NULL) {
          // The key length includes the terminating NUL.
          status = taosHashGet(pRsp->tokens, pTscObj->tokenName, strlen(pTscObj->tokenName) + 1);
        }

        STokenEvent event = {0};
        tstrncpy(event.tokenName, pTscObj->tokenName, sizeof(event.tokenName));
        if (status == NULL) {
          event.type = TSDB_TOKEN_EVENT_DROPPED;
        } else if (status->enabled == 0) {
          event.type = TSDB_TOKEN_EVENT_DISABLED;
        } else if (status->expireTime > 0 && status->expireTime < taosGetTimestampSec()) {
          event.type = TSDB_TOKEN_EVENT_EXPIRED;
        } else {
          event.type = TSDB_TOKEN_EVENT_MODIFIED;
          event.expireTime = status->expireTime;
        }

        STokenNotifyInfo *tni = &pTscObj->tokenNotifyInfo;
        if (event.type == TSDB_TOKEN_EVENT_MODIFIED) {
          // Publish the new expire time; the loop exits with oldExpireTime
          // holding the value that was replaced (or the unchanged value).
          int32_t oldExpireTime;
          do {
            oldExpireTime = atomic_load_32(&pTscObj->tokenExpireTime);
            if (oldExpireTime == status->expireTime) {
              break;
            }
          } while (atomic_val_compare_exchange_32(&pTscObj->tokenExpireTime, oldExpireTime, status->expireTime) !=
                   oldExpireTime);

          if (oldExpireTime != status->expireTime && tni->fp) {
            (*tni->fp)(tni->param, &event, TAOS_NOTIFY_TOKEN);
          }
          // Log the previous value: reading pTscObj->tokenExpireTime here would
          // print the value that was just stored as the "from" side.
          tscDebug("update token %s of user %s: expire time from %d to %d, conn:%" PRIi64, pTscObj->tokenName,
                   pRsp->user, oldExpireTime, status->expireTime, pTscObj->id);
        } else {
          // Dropped / disabled / expired token: mark the connection dropped
          // once, notify, and skip the remaining updates for it.
          if (atomic_val_compare_exchange_8(&pTscObj->dropped, 0, 1) == 0) {
            if (tni->fp) {
              (*tni->fp)(tni->param, &event, TAOS_NOTIFY_TOKEN);
            }
          }

          releaseTscObj(pReq->connKey.tscRid);
          continue;
        }
      }

      pTscObj->authVer = pRsp->version;
      if (hbUpdateUserSessMertric(pTscObj->user, &pRsp->sessCfg) != 0) {
        tscError("failed to update user session metric, user:%s", pTscObj->user);
      }

      if (pTscObj->sysInfo != pRsp->sysInfo) {
        tscDebug("update sysInfo of user %s from %" PRIi8 " to %" PRIi8 ", conn:%" PRIi64, pRsp->user,
                 pTscObj->sysInfo, pRsp->sysInfo, pTscObj->id);
        pTscObj->sysInfo = pRsp->sysInfo;
      }

      // update password version
      if (pTscObj->passInfo.fp) {
        SPassInfo *passInfo = &pTscObj->passInfo;
        int32_t    oldVer = 0;
        // Only ever move the version forward.
        do {
          oldVer = atomic_load_32(&passInfo->ver);
          if (oldVer >= pRsp->passVer) {
            break;
          }
        } while (atomic_val_compare_exchange_32(&passInfo->ver, oldVer, pRsp->passVer) != oldVer);
        if (oldVer < pRsp->passVer) {
          (*passInfo->fp)(passInfo->param, &pRsp->passVer, TAOS_NOTIFY_PASSVER);
          tscDebug("update passVer of user %s from %d to %d, conn:%" PRIi64, pRsp->user, oldVer,
                   atomic_load_32(&passInfo->ver), pTscObj->id);
        }
      }

      // update ip white list version
      {
        SWhiteListInfo *whiteListInfo = &pTscObj->whiteListInfo;
        int64_t         oldVer = 0, newVer = pRsp->whiteListVer;
        do {
          oldVer = atomic_load_64(&whiteListInfo->ver);
          if (oldVer >= newVer) {
            break;
          }
        } while (atomic_val_compare_exchange_64(&whiteListInfo->ver, oldVer, newVer) != oldVer);

        if (oldVer < newVer) {
          if (whiteListInfo->fp) {
            (*whiteListInfo->fp)(whiteListInfo->param, &newVer, TAOS_NOTIFY_WHITELIST_VER);
          }
          tscDebug("update whitelist version of user %s from %" PRId64 " to %" PRId64 ", conn:%" PRIi64, pRsp->user,
                   oldVer, newVer, pTscObj->id);
        }
      }

      // update date time whitelist version
      {
        SWhiteListInfo *whiteListInfo = &pTscObj->dateTimeWhiteListInfo;

        int64_t oldVer = 0, newVer = pRsp->timeWhiteListVer;
        do {
          oldVer = atomic_load_64(&whiteListInfo->ver);
          if (oldVer >= newVer) {
            break;
          }
        } while (atomic_val_compare_exchange_64(&whiteListInfo->ver, oldVer, newVer) != oldVer);

        if (oldVer < newVer) {
          if (whiteListInfo->fp) {
            (*whiteListInfo->fp)(whiteListInfo->param, &newVer, TAOS_NOTIFY_DATETIME_WHITELIST_VER);
          }
          tscDebug("update date time whitelist version of user %s from %" PRId64 " to %" PRId64 ", conn:%" PRIi64,
                   pRsp->user, oldVer, newVer, pTscObj->id);
        }
      }

      releaseTscObj(pReq->connKey.tscRid);
    }
  }
  return 0;
}
287

288
// Builds a freshly allocated SDBVgInfo from a use-db response.
//
// The scalar fields are copied from `rsp` and every vgroup in
// rsp->pVgroupInfos is inserted into a new hash keyed by its vgId.
//
// On success *pInfo receives the new object and 0 is returned. On any failure
// everything allocated here is released, *pInfo is set to NULL and an error
// code is returned.
static int32_t hbGenerateVgInfoFromRsp(SDBVgInfo **pInfo, SUseDbRsp *rsp) {
  int32_t    code = 0;
  SDBVgInfo *vgInfo = taosMemoryCalloc(1, sizeof(SDBVgInfo));
  if (NULL == vgInfo) {
    // Never leave the out-parameter holding a stale pointer from the caller.
    *pInfo = NULL;
    return terrno;
  }

  vgInfo->vgVersion = rsp->vgVersion;
  vgInfo->stateTs = rsp->stateTs;
  vgInfo->flags = rsp->flags;
  vgInfo->hashMethod = rsp->hashMethod;
  vgInfo->hashPrefix = rsp->hashPrefix;
  vgInfo->hashSuffix = rsp->hashSuffix;
  vgInfo->vgHash = taosHashInit(rsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
  if (NULL == vgInfo->vgHash) {
    tscError("hash init[%d] failed", rsp->vgNum);
    code = terrno;
    goto _return;
  }

  for (int32_t j = 0; j < rsp->vgNum; ++j) {
    // Named pVgroup (not pInfo) so the out-parameter is not shadowed.
    SVgroupInfo *pVgroup = taosArrayGet(rsp->pVgroupInfos, j);
    if (NULL == pVgroup) {
      // vgNum comes from the response and may exceed the array that was
      // actually decoded; fail instead of dereferencing NULL.
      tscError("invalid vgroup idx:%d, vgNum:%d", j, rsp->vgNum);
      code = (0 != terrno) ? terrno : TSDB_CODE_OUT_OF_RANGE;
      goto _return;
    }
    if (taosHashPut(vgInfo->vgHash, &pVgroup->vgId, sizeof(int32_t), pVgroup, sizeof(SVgroupInfo)) != 0) {
      // Capture terrno before logging so the logger cannot disturb it.
      code = terrno;
      tscError("hash push failed, terrno:%d", code);
      goto _return;
    }
  }

_return:
  if (code) {
    taosHashCleanup(vgInfo->vgHash);
    taosMemoryFreeClear(vgInfo);  // also resets vgInfo to NULL
  }

  *pInfo = vgInfo;
  return code;
}
326

327
static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
6,909,788✔
328
  int32_t code = 0;
6,909,788✔
329

330
  SDbHbBatchRsp batchRsp = {0};
6,909,788✔
331
  if (tDeserializeSDbHbBatchRsp(value, valueLen, &batchRsp) != 0) {
6,909,788✔
332
    terrno = TSDB_CODE_INVALID_MSG;
×
333
    code = terrno;
×
334
    goto _return;
×
335
  }
336

337
  int32_t numOfBatchs = taosArrayGetSize(batchRsp.pArray);
6,909,788✔
338
  for (int32_t i = 0; i < numOfBatchs; ++i) {
8,330,309✔
339
    SDbHbRsp *rsp = taosArrayGet(batchRsp.pArray, i);
1,420,521✔
340
    if (NULL == rsp) {
1,420,521✔
341
      code = terrno;
×
342
      goto _return;
×
343
    }
344
    if (rsp->useDbRsp) {
1,420,521✔
345
      tscDebug("hb use db rsp, db:%s, vgVersion:%d, stateTs:%" PRId64 ", uid:%" PRIx64, rsp->useDbRsp->db,
1,113,130✔
346
               rsp->useDbRsp->vgVersion, rsp->useDbRsp->stateTs, rsp->useDbRsp->uid);
347

348
      if (rsp->useDbRsp->vgVersion < 0) {
1,113,130✔
349
        tscDebug("hb to remove db, db:%s", rsp->useDbRsp->db);
30,560✔
350
        code = catalogRemoveDB(pCatalog, rsp->useDbRsp->db, rsp->useDbRsp->uid);
30,560✔
351
      } else {
352
        SDBVgInfo *vgInfo = NULL;
1,082,570✔
353
        code = hbGenerateVgInfoFromRsp(&vgInfo, rsp->useDbRsp);
1,082,570✔
354
        if (TSDB_CODE_SUCCESS != code) {
1,082,570✔
355
          goto _return;
×
356
        }
357

358
        tscDebug("hb to update db vgInfo, db:%s", rsp->useDbRsp->db);
1,082,570✔
359

360
        TSC_ERR_JRET(catalogUpdateDBVgInfo(pCatalog, rsp->useDbRsp->db, rsp->useDbRsp->uid, vgInfo));
1,082,570✔
361

362
        if (IS_SYS_DBNAME(rsp->useDbRsp->db)) {
1,082,570✔
363
          code = hbGenerateVgInfoFromRsp(&vgInfo, rsp->useDbRsp);
239,449✔
364
          if (TSDB_CODE_SUCCESS != code) {
239,449✔
365
            goto _return;
×
366
          }
367

368
          TSC_ERR_JRET(catalogUpdateDBVgInfo(
239,449✔
369
              pCatalog, (rsp->useDbRsp->db[0] == 'i') ? TSDB_PERFORMANCE_SCHEMA_DB : TSDB_INFORMATION_SCHEMA_DB,
370
              rsp->useDbRsp->uid, vgInfo));
371
        }
372
      }
373
    }
374

375
    if (rsp->cfgRsp) {
1,420,521✔
376
      tscDebug("hb db cfg rsp, db:%s, cfgVersion:%d", rsp->cfgRsp->db, rsp->cfgRsp->cfgVersion);
52,820✔
377
      code = catalogUpdateDbCfg(pCatalog, rsp->cfgRsp->db, rsp->cfgRsp->dbId, rsp->cfgRsp);
52,820✔
378
      rsp->cfgRsp = NULL;
52,820✔
379
    }
380
    if (rsp->pTsmaRsp) {
1,420,521✔
381
      if (rsp->pTsmaRsp->pTsmas) {
720,612✔
382
        for (int32_t i = 0; i < rsp->pTsmaRsp->pTsmas->size; ++i) {
1,348✔
383
          STableTSMAInfo *pTsma = taosArrayGetP(rsp->pTsmaRsp->pTsmas, i);
674✔
384
          if (NULL == pTsma) {
674✔
385
            TSC_ERR_JRET(TSDB_CODE_OUT_OF_RANGE);
×
386
          }
387
          TSC_ERR_JRET(catalogAsyncUpdateTSMA(pCatalog, &pTsma, rsp->dbTsmaVersion));
674✔
388
        }
389
        taosArrayClear(rsp->pTsmaRsp->pTsmas);
674✔
390
      } else {
391
        TSC_ERR_JRET(catalogAsyncUpdateDbTsmaVersion(pCatalog, rsp->dbTsmaVersion, rsp->db, rsp->dbId));
719,938✔
392
      }
393
    }
394
  }
395

396
_return:
6,909,788✔
397

398
  tFreeSDbHbBatchRsp(&batchRsp);
6,909,788✔
399
  return code;
6,909,788✔
400
}
401

402
static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
4,841,564✔
403
  int32_t code = TSDB_CODE_SUCCESS;
4,841,564✔
404

405
  SSTbHbRsp hbRsp = {0};
4,841,564✔
406
  if (tDeserializeSSTbHbRsp(value, valueLen, &hbRsp) != 0) {
4,841,564✔
407
    terrno = TSDB_CODE_INVALID_MSG;
×
408
    return -1;
×
409
  }
410

411
  int32_t numOfMeta = taosArrayGetSize(hbRsp.pMetaRsp);
4,841,564✔
412
  for (int32_t i = 0; i < numOfMeta; ++i) {
4,891,105✔
413
    STableMetaRsp *rsp = taosArrayGet(hbRsp.pMetaRsp, i);
49,541✔
414
    if (NULL == rsp) {
49,541✔
415
      code = terrno;
×
416
      goto _return;
×
417
    }
418
    if (rsp->numOfColumns < 0) {
49,541✔
419
      tscDebug("hb to remove stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName);
31,599✔
420
      TSC_ERR_JRET(catalogRemoveStbMeta(pCatalog, rsp->dbFName, rsp->dbId, rsp->stbName, rsp->suid));
31,599✔
421
    } else {
422
      tscDebug("hb to update stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName);
17,942✔
423
      if (rsp->pSchemas[0].colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
17,942✔
424
        tscError("invalid colId[%" PRIi16 "] for the first column in table meta rsp msg", rsp->pSchemas[0].colId);
×
425
        tFreeSSTbHbRsp(&hbRsp);
×
426
        return TSDB_CODE_TSC_INVALID_VALUE;
×
427
      }
428

429
      TSC_ERR_JRET(catalogAsyncUpdateTableMeta(pCatalog, rsp));
17,942✔
430
    }
431
  }
432

433
  int32_t numOfIndex = taosArrayGetSize(hbRsp.pIndexRsp);
4,841,564✔
434
  for (int32_t i = 0; i < numOfIndex; ++i) {
4,841,564✔
435
    STableIndexRsp *rsp = taosArrayGet(hbRsp.pIndexRsp, i);
×
436
    if (NULL == rsp) {
×
437
      code = terrno;
×
438
      goto _return;
×
439
    }
440
    TSC_ERR_JRET(catalogUpdateTableIndex(pCatalog, rsp));
×
441
  }
442

443
_return:
4,841,564✔
444
  taosArrayDestroy(hbRsp.pIndexRsp);
4,841,564✔
445
  hbRsp.pIndexRsp = NULL;
4,841,564✔
446

447
  tFreeSSTbHbRsp(&hbRsp);
4,841,564✔
448
  return code;
4,841,564✔
449
}
450

451
static int32_t hbProcessDynViewRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
1,057✔
452
  return catalogUpdateDynViewVer(pCatalog, (SDynViewVersion *)value);
1,057✔
453
}
454

455
static void hbFreeSViewMetaInRsp(void *p) {
×
456
  if (NULL == p || NULL == *(void **)p) {
×
457
    return;
×
458
  }
459
  SViewMetaRsp *pRsp = *(SViewMetaRsp **)p;
×
460
  tFreeSViewMetaRsp(pRsp);
×
461
  taosMemoryFreeClear(pRsp);
×
462
}
463

464
static int32_t hbProcessViewInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
1,057✔
465
  int32_t code = TSDB_CODE_SUCCESS;
1,057✔
466

467
  SViewHbRsp hbRsp = {0};
1,057✔
468
  if (tDeserializeSViewHbRsp(value, valueLen, &hbRsp) != 0) {
1,057✔
469
    taosArrayDestroyEx(hbRsp.pViewRsp, hbFreeSViewMetaInRsp);
×
470
    terrno = TSDB_CODE_INVALID_MSG;
×
471
    return -1;
×
472
  }
473

474
  int32_t numOfMeta = taosArrayGetSize(hbRsp.pViewRsp);
1,057✔
475
  for (int32_t i = 0; i < numOfMeta; ++i) {
1,057✔
476
    SViewMetaRsp *rsp = taosArrayGetP(hbRsp.pViewRsp, i);
×
477
    if (NULL == rsp) {
×
478
      code = terrno;
×
479
      goto _return;
×
480
    }
481
    if (rsp->numOfCols < 0) {
×
482
      tscDebug("hb to remove view, db:%s, view:%s", rsp->dbFName, rsp->name);
×
483
      code = catalogRemoveViewMeta(pCatalog, rsp->dbFName, rsp->dbId, rsp->name, rsp->viewId);
×
484
      tFreeSViewMetaRsp(rsp);
×
485
      taosMemoryFreeClear(rsp);
×
486
    } else {
487
      tscDebug("hb to update view, db:%s, view:%s", rsp->dbFName, rsp->name);
×
488
      code = catalogUpdateViewMeta(pCatalog, rsp);
×
489
    }
490
    TSC_ERR_JRET(code);
×
491
  }
492

493
_return:
1,057✔
494
  taosArrayDestroy(hbRsp.pViewRsp);
1,057✔
495
  return code;
1,057✔
496
}
497

498
static int32_t hbprocessTSMARsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
43,810✔
499
  int32_t code = 0;
43,810✔
500

501
  STSMAHbRsp hbRsp = {0};
43,810✔
502
  if (tDeserializeTSMAHbRsp(value, valueLen, &hbRsp)) {
43,810✔
503
    terrno = TSDB_CODE_INVALID_MSG;
×
504
    return -1;
×
505
  }
506

507
  int32_t numOfTsma = taosArrayGetSize(hbRsp.pTsmas);
43,810✔
508
  for (int32_t i = 0; i < numOfTsma; ++i) {
43,810✔
509
    STableTSMAInfo *pTsmaInfo = taosArrayGetP(hbRsp.pTsmas, i);
×
510

511
    if (!pTsmaInfo->pFuncs) {
×
512
      tscDebug("hb to remove tsma:%s.%s", pTsmaInfo->dbFName, pTsmaInfo->name);
×
513
      code = catalogRemoveTSMA(pCatalog, pTsmaInfo);
×
514
      tFreeAndClearTableTSMAInfo(pTsmaInfo);
×
515
    } else {
516
      tscDebug("hb to update tsma:%s.%s", pTsmaInfo->dbFName, pTsmaInfo->name);
×
517
      code = catalogUpdateTSMA(pCatalog, &pTsmaInfo);
×
518
      tFreeAndClearTableTSMAInfo(pTsmaInfo);
×
519
    }
520
    TSC_ERR_JRET(code);
×
521
  }
522

523
_return:
43,810✔
524
  taosArrayDestroy(hbRsp.pTsmas);
43,810✔
525
  return code;
43,810✔
526
}
527

528
// Dispatches every key/value item of a heartbeat response to its handler.
//
// Items are independent: an invalid or failing item is logged and skipped, and
// processing continues with the next one. An item whose payload is empty
// (valueLen <= 0 or NULL value) is never passed to a handler.
static void hbProcessQueryRspKvs(int32_t kvNum, SArray *pKvs, struct SCatalog *pCatalog, SAppHbMgr *pAppHbMgr) {
  for (int32_t i = 0; i < kvNum; ++i) {
    SKv *kv = taosArrayGet(pKvs, i);
    if (NULL == kv) {
      tscError("invalid hb kv, idx:%d", i);
      continue;
    }
    switch (kv->key) {
      case HEARTBEAT_KEY_USER_AUTHINFO: {
        if (kv->valueLen <= 0 || NULL == kv->value) {
          tscError("invalid hb user auth info, len:%d, value:%p", kv->valueLen, kv->value);
          break;
        }
        if (TSDB_CODE_SUCCESS != hbProcessUserAuthInfoRsp(kv->value, kv->valueLen, pCatalog, pAppHbMgr)) {
          tscError("process user auth info response faild, len:%d, value:%p", kv->valueLen, kv->value);
        }
        break;
      }
      case HEARTBEAT_KEY_DBINFO: {
        if (kv->valueLen <= 0 || NULL == kv->value) {
          tscError("invalid hb db info, len:%d, value:%p", kv->valueLen, kv->value);
          break;
        }
        if (TSDB_CODE_SUCCESS != hbProcessDBInfoRsp(kv->value, kv->valueLen, pCatalog)) {
          tscError("process db info response faild, len:%d, value:%p", kv->valueLen, kv->value);
        }
        break;
      }
      case HEARTBEAT_KEY_STBINFO: {
        if (kv->valueLen <= 0 || NULL == kv->value) {
          tscError("invalid hb stb info, len:%d, value:%p", kv->valueLen, kv->value);
          break;
        }
        if (TSDB_CODE_SUCCESS != hbProcessStbInfoRsp(kv->value, kv->valueLen, pCatalog)) {
          tscError("process stb info response faild, len:%d, value:%p", kv->valueLen, kv->value);
        }
        break;
      }
#ifdef TD_ENTERPRISE
      case HEARTBEAT_KEY_DYN_VIEW: {
        if (kv->valueLen <= 0 || NULL == kv->value) {
          tscError("invalid dyn view info, len:%d, value:%p", kv->valueLen, kv->value);
          break;
        }
        if (TSDB_CODE_SUCCESS != hbProcessDynViewRsp(kv->value, kv->valueLen, pCatalog)) {
          tscError("Process dyn view response failed, len:%d, value:%p", kv->valueLen, kv->value);
        }
        break;
      }
      case HEARTBEAT_KEY_VIEWINFO: {
        if (kv->valueLen <= 0 || NULL == kv->value) {
          tscError("invalid view info, len:%d, value:%p", kv->valueLen, kv->value);
          break;
        }
        if (TSDB_CODE_SUCCESS != hbProcessViewInfoRsp(kv->value, kv->valueLen, pCatalog)) {
          tscError("Process view info response failed, len:%d, value:%p", kv->valueLen, kv->value);
        }
        break;
      }
#endif
      case HEARTBEAT_KEY_TSMA: {
        if (kv->valueLen <= 0 || !kv->value) {
          tscError("Invalid tsma info, len:%d, value:%p", kv->valueLen, kv->value);
          // Skip the handler: previously an empty payload was still handed to
          // hbprocessTSMARsp() because this break was missing.
          break;
        }
        if (TSDB_CODE_SUCCESS != hbprocessTSMARsp(kv->value, kv->valueLen, pCatalog)) {
          tscError("Process tsma info response failed, len:%d, value:%p", kv->valueLen, kv->value);
        }
        break;
      }
      default:
        tscError("invalid hb key type:%d", kv->key);
        break;
    }
  }
}
21,404,690✔
608

609
static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
27,964,179✔
610
  SClientHbReq *pReq = taosHashAcquire(pAppHbMgr->activeInfo, &pRsp->connKey, sizeof(SClientHbKey));
27,964,179✔
611
  if (NULL == pReq) {
27,964,179✔
612
    tscWarn("pReq to get activeInfo, may be dropped, refId:%" PRIx64 ", type:%d", pRsp->connKey.tscRid,
9,905✔
613
            pRsp->connKey.connType);
614
    return TSDB_CODE_SUCCESS;
9,905✔
615
  }
616

617
  if (pRsp->query) {
27,954,274✔
618
    STscObj *pTscObj = (STscObj *)acquireTscObj(pRsp->connKey.tscRid);
27,954,274✔
619
    if (NULL == pTscObj) {
27,954,274✔
620
      tscDebug("tscObj rid %" PRIx64 " not exist", pRsp->connKey.tscRid);
2,582✔
621
    } else {
622
      if (pRsp->query->totalDnodes > 1) {
27,951,692✔
623
        SEpSet  originEpset = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
7,095,883✔
624
        if (!isEpsetEqual(&originEpset, &pRsp->query->epSet)) {
7,095,883✔
625
          SEpSet *pOrig = &originEpset;
28,456✔
626
          SEp    *pOrigEp = &pOrig->eps[pOrig->inUse];
28,456✔
627
          SEp    *pNewEp = &pRsp->query->epSet.eps[pRsp->query->epSet.inUse];
28,456✔
628
          tscDebug("mnode epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d in hb", pOrig->inUse, pOrig->numOfEps,
28,456✔
629
                   pOrigEp->fqdn, pOrigEp->port, pRsp->query->epSet.inUse, pRsp->query->epSet.numOfEps, pNewEp->fqdn,
630
                   pNewEp->port);
631

632
          updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &pRsp->query->epSet);
28,456✔
633
        }
634
      }
635

636
      pTscObj->pAppInfo->totalDnodes = pRsp->query->totalDnodes;
27,951,692✔
637
      pTscObj->pAppInfo->onlineDnodes = pRsp->query->onlineDnodes;
27,951,692✔
638
      pTscObj->connId = pRsp->query->connId;
27,951,692✔
639
      tscTrace("connId:%u, hb rsp, dnodes %d/%d", pTscObj->connId, pTscObj->pAppInfo->onlineDnodes,
27,951,692✔
640
               pTscObj->pAppInfo->totalDnodes);
641

642
      if (pRsp->query->killRid) {
27,951,692✔
643
        tscDebug("QID:0x%" PRIx64 ", need to be killed now", pRsp->query->killRid);
4✔
644
        SRequestObj *pRequest = acquireRequest(pRsp->query->killRid);
4✔
645
        if (NULL == pRequest) {
4✔
646
          tscDebug("QID:0x%" PRIx64 ", not exist to kill", pRsp->query->killRid);
×
647
        } else {
648
          taos_stop_query((TAOS_RES *)pRequest);
4✔
649
          (void)releaseRequest(pRsp->query->killRid);
4✔
650
        }
651
      }
652

653
      if (pRsp->query->killConnection) {
27,951,692✔
654
        taos_close_internal(pTscObj);
×
655
      }
656

657
      if (pRsp->query->pQnodeList) {
27,951,692✔
658
        if (TSDB_CODE_SUCCESS != updateQnodeList(pTscObj->pAppInfo, pRsp->query->pQnodeList)) {
207,960✔
659
          tscWarn("update qnode list failed");
×
660
        }
661
      }
662

663
      releaseTscObj(pRsp->connKey.tscRid);
27,951,692✔
664
    }
665
  }
666

667
  int32_t kvNum = pRsp->info ? taosArrayGetSize(pRsp->info) : 0;
27,954,274✔
668

669
  tscDebug("hb got %d rsp kv", kvNum);
27,954,274✔
670

671
  if (kvNum > 0) {
27,954,274✔
672
    struct SCatalog *pCatalog = NULL;
21,404,690✔
673
    int32_t          code = catalogGetHandle(pReq->clusterId, &pCatalog);
21,404,690✔
674
    if (code != TSDB_CODE_SUCCESS) {
21,404,690✔
675
      tscWarn("catalogGetHandle failed, clusterId:0x%" PRIx64 ", error:%s", pReq->clusterId, tstrerror(code));
×
676
    } else {
677
      hbProcessQueryRspKvs(kvNum, pRsp->info, pCatalog, pAppHbMgr);
21,404,690✔
678
    }
679
  }
680

681
  taosHashRelease(pAppHbMgr->activeInfo, pReq);
27,954,274✔
682

683
  return TSDB_CODE_SUCCESS;
27,954,274✔
684
}
685

686
/*
 * Completion callback for one asynchronous heartbeat batch request.
 *
 * param  points to an int32_t index into clientHbMgr.appHbMgrs
 * pMsg   response buffer; pMsg->pData and pMsg->pEpSet are freed here on every path
 * code   status of the request as reported to the callback
 *
 * Returns `code` unchanged when the heartbeat manager is not initialized; otherwise
 * the deserialization error, TSDB_CODE_TIME_UNSYNCED when the server clock differs
 * by more than tsTimestampDeltaLimit seconds, TSDB_CODE_OUT_OF_RANGE when no app
 * heartbeat manager exists at the index, or the first non-zero result of a
 * per-connection response handler.
 */
static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) {
  static int32_t    emptyRspNum = 0;
  SClientHbBatchRsp pRsp = {0};
  SAppHbMgr        *pAppHbMgr = NULL;
  SAppInstInfo     *pInst = NULL;
  int32_t           idx = 0;
  int32_t           rspNum = 0;

  if (0 == atomic_load_8(&clientHbMgr.inited)) {
    goto _return;
  }

  idx = *(int32_t *)param;

  if (TSDB_CODE_SUCCESS == code) {
    code = tDeserializeSClientHbBatchRsp(pMsg->pData, pMsg->len, &pRsp);
    if (TSDB_CODE_SUCCESS != code) {
      tscError("deserialize hb rsp failed");
    } else {
      // Only a successfully decoded response carries a meaningful server timestamp;
      // checking it after a failed decode would replace the real error with
      // TSDB_CODE_TIME_UNSYNCED.
      int32_t now = taosGetTimestampSec();
      int32_t delta = abs(now - pRsp.svrTimestamp);
      if (delta > tsTimestampDeltaLimit) {
        code = TSDB_CODE_TIME_UNSYNCED;
        tscError("time diff:%ds is too big", delta);
      }
    }
  }

  rspNum = taosArrayGetSize(pRsp.rsps);

  (void)taosThreadMutexLock(&clientHbMgr.lock);

  pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, idx);
  if (pAppHbMgr == NULL) {
    tscError("appHbMgr not exist, idx:%d", idx);
    code = TSDB_CODE_OUT_OF_RANGE;
    goto _unlock;
  }

  pInst = pAppHbMgr->pAppInstInfo;

  if (code != 0) {
    // Heartbeat failed: report zero online dnodes, or -1 when the total is still unknown (0).
    pInst->onlineDnodes = pInst->totalDnodes ? 0 : -1;
    tscDebug("hb rsp error %s, update server status %d/%d", tstrerror(code), pInst->onlineDnodes, pInst->totalDnodes);
    goto _unlock;
  }

  // Mirror the server-side configuration carried by the heartbeat response.
  pInst->serverCfg.monitorParas = pRsp.monitorParas;
  pInst->serverCfg.enableAuditDelete = pRsp.enableAuditDelete;
  pInst->serverCfg.enableAuditSelect = pRsp.enableAuditSelect;
  pInst->serverCfg.enableAuditInsert = pRsp.enableAuditInsert;
  pInst->serverCfg.auditLevel = pRsp.auditLevel;
  pInst->serverCfg.enableStrongPass = pRsp.enableStrongPass;
  tsEnableStrongPassword = pInst->serverCfg.enableStrongPass;
  tscDebug("monitor paras from hb, clusterId:0x%" PRIx64 ", threshold:%d scope:%d", pInst->clusterId,
           pRsp.monitorParas.tsSlowLogThreshold, pRsp.monitorParas.tsSlowLogScope);

  if (rspNum) {
    tscDebug("hb got %d rsp, %d empty rsp received before", rspNum,
             atomic_val_compare_exchange_32(&emptyRspNum, emptyRspNum, 0));
  } else {
    (void)atomic_add_fetch_32(&emptyRspNum, 1);
  }

  // Dispatch each per-connection response to the handler registered for its connection type.
  for (int32_t i = 0; i < rspNum; ++i) {
    SClientHbRsp *rsp = taosArrayGet(pRsp.rsps, i);
    code = (*clientHbMgr.rspHandle[rsp->connKey.connType])(pAppHbMgr, rsp);
    if (code) {
      break;
    }
  }

_unlock:
  (void)taosThreadMutexUnlock(&clientHbMgr.lock);
  tFreeClientHbBatchRsp(&pRsp);

_return:
  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);
  return code;
}
767

768
/*
 * Append one SQueryDesc to hbBasic->queryDesc for every request of pObj that
 * has not been killed and has a query job.
 *
 * hbBasic  heartbeat request body; hbBasic->queryDesc must already be initialized
 * pObj     connection object whose pRequests hash is walked
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_FAILED when the local FQDN cannot be
 * obtained, or terrno when an array allocation/push fails. Descriptors pushed
 * before a failure stay in hbBasic->queryDesc; the caller releases them.
 */
int32_t hbBuildQueryDesc(SQueryHbReqBasic *hbBasic, STscObj *pObj) {
  int64_t    now = taosGetTimestampUs();
  SQueryDesc desc = {0};
  int32_t    code = 0;
  void      *pIter = NULL;

  for (pIter = taosHashIterate(pObj->pRequests, NULL); pIter != NULL;
       pIter = taosHashIterate(pObj->pRequests, pIter)) {
    int64_t     *rid = pIter;
    SRequestObj *pRequest = acquireRequest(*rid);
    if (NULL == pRequest) {
      continue;
    }

    if (pRequest->killed || 0 == pRequest->body.queryJob) {
      (void)releaseRequest(*rid);
      continue;
    }

    tstrncpy(desc.sql, pRequest->sqlstr, sizeof(desc.sql));
    desc.stime = pRequest->metric.start / 1000;
    desc.queryId = pRequest->requestId;
    desc.useconds = now - pRequest->metric.start;
    desc.reqRid = pRequest->self;
    desc.stableQuery = pRequest->stableQuery;
    desc.isSubQuery = pRequest->isSubReq;
    code = taosGetFqdn(desc.fqdn);
    if (TSDB_CODE_SUCCESS != code) {
      (void)releaseRequest(*rid);
      tscError("get fqdn failed");
      code = TSDB_CODE_FAILED;
      goto _abort;
    }
    desc.subPlanNum = pRequest->body.subplanNum;

    if (desc.subPlanNum) {
      desc.subDesc = taosArrayInit(desc.subPlanNum, sizeof(SQuerySubDesc));
      if (NULL == desc.subDesc) {
        code = terrno;  // capture before any cleanup call can overwrite it
        (void)releaseRequest(*rid);
        goto _abort;
      }

      code = schedulerGetTasksStatus(pRequest->body.queryJob, desc.subDesc);
      if (code) {
        // Task status is best effort: report the query without sub-plan details.
        taosArrayDestroy(desc.subDesc);
        desc.subDesc = NULL;
        code = TSDB_CODE_SUCCESS;
      }
      desc.subPlanNum = taosArrayGetSize(desc.subDesc);
    } else {
      desc.subDesc = NULL;
    }

    (void)releaseRequest(*rid);
    if (NULL == taosArrayPush(hbBasic->queryDesc, &desc)) {
      code = terrno;  // capture before any cleanup call can overwrite it
      taosArrayDestroy(desc.subDesc);
      goto _abort;
    }
  }

  return code;

_abort:
  // The iteration is abandoned before reaching the end, so the iterator must be
  // cancelled explicitly to release what it holds on the current hash node.
  taosHashCancelIterate(pObj->pRequests, pIter);
  return code;
}
832

833
/*
 * Build the query section (SQueryHbReqBasic) of one connection's heartbeat request.
 *
 * connKey  identifies the connection; connKey->tscRid is used to acquire the STscObj
 * req      on success req->query receives the newly allocated SQueryHbReqBasic
 *
 * Returns TSDB_CODE_SUCCESS, terrno when the connection object cannot be acquired
 * or an allocation fails, or the error reported by hbBuildQueryDesc. On failure
 * req->query is left untouched and everything allocated here is released.
 */
int32_t hbGetQueryBasicInfo(SClientHbKey *connKey, SClientHbReq *req) {
  int32_t           code = TSDB_CODE_SUCCESS;
  int32_t           numOfQueries = 0;
  SQueryHbReqBasic *hbBasic = NULL;

  STscObj *pTscObj = (STscObj *)acquireTscObj(connKey->tscRid);
  if (NULL == pTscObj) {
    code = terrno;  // read terrno right after the failing call
    tscWarn("tscObj rid 0x%" PRIx64 " not exist", connKey->tscRid);
    return code;
  }

  hbBasic = (SQueryHbReqBasic *)taosMemoryCalloc(1, sizeof(SQueryHbReqBasic));
  if (NULL == hbBasic) {
    code = terrno;
    tscError("calloc %d failed", (int32_t)sizeof(SQueryHbReqBasic));
    goto _return;
  }

  hbBasic->connId = pTscObj->connId;

  numOfQueries = pTscObj->pRequests ? taosHashGetSize(pTscObj->pRequests) : 0;
  if (numOfQueries > 0) {
    hbBasic->queryDesc = taosArrayInit(numOfQueries, sizeof(SQueryDesc));
    if (NULL == hbBasic->queryDesc) {
      code = terrno;
      tscWarn("taosArrayInit %d queryDesc failed", numOfQueries);
      goto _return;
    }

    code = hbBuildQueryDesc(hbBasic, pTscObj);
    if (code) {
      goto _return;
    }
  } else {
    tscDebug("no queries on connection");
  }

  // Hand the finished body to the request; clearing the local pointer keeps the
  // cleanup below from releasing it.
  req->query = hbBasic;
  hbBasic = NULL;

_return:
  releaseTscObj(connKey->tscRid);
  if (hbBasic != NULL) {
    if (hbBasic->queryDesc) {
      taosArrayDestroyEx(hbBasic->queryDesc, tFreeClientHbQueryDesc);
    }
    taosMemoryFree(hbBasic);
  }
  return code;
}
880

881
/*
 * Make sure the heartbeat request asks the server for the auth info of the
 * connection's user by storing that user with version -1 under
 * HEARTBEAT_KEY_USER_AUTHINFO in req->info.
 *
 * connKey  identifies the connection; connKey->tscRid is used to acquire the STscObj
 * param    not used by this function
 * req      heartbeat request; req->info is created here when it does not exist yet
 *
 * Returns 0 on success, terrno when the connection object cannot be acquired or
 * an allocation fails, or TSDB_CODE_APP_ERROR when the hash insert fails without
 * setting terrno.
 */
static int32_t hbGetUserAuthInfo(SClientHbKey *connKey, SHbParam *param, SClientHbReq *req) {
  STscObj *pTscObj = (STscObj *)acquireTscObj(connKey->tscRid);
  if (!pTscObj) {
    tscWarn("tscObj rid 0x%" PRIx64 " not exist", connKey->tscRid);
    return terrno;
  }

  int32_t           code = 0;
  SUserAuthVersion *user = NULL;

  SKv  kv = {.key = HEARTBEAT_KEY_USER_AUTHINFO};
  SKv *pKv = NULL;
  if ((pKv = taosHashGet(req->info, &kv.key, sizeof(kv.key)))) {
    int32_t           userNum = pKv->valueLen / sizeof(SUserAuthVersion);
    SUserAuthVersion *userAuths = (SUserAuthVersion *)pKv->value;
    for (int32_t i = 0; i < userNum; ++i) {
      SUserAuthVersion *pUserAuth = userAuths + i;
      // both key and user exist, update version
      if (strncmp(pUserAuth->user, pTscObj->user, TSDB_USER_LEN) == 0) {
        pUserAuth->version = htonl(-1);  // force get userAuthInfo
        goto _return;
      }
    }
    // key exists, user not exist, append user
    SUserAuthVersion *qUserAuth =
        (SUserAuthVersion *)taosMemoryRealloc(pKv->value, (userNum + 1) * sizeof(SUserAuthVersion));
    if (qUserAuth) {
      tstrncpy((qUserAuth + userNum)->user, pTscObj->user, TSDB_USER_LEN);
      (qUserAuth + userNum)->version = htonl(-1);  // force get userAuthInfo
      pKv->value = qUserAuth;
      pKv->valueLen += sizeof(SUserAuthVersion);
    } else {
      // realloc failed: pKv->value is left as it was
      code = terrno;
    }
    goto _return;
  }

  // key/user not exist, add user
  user = taosMemoryMalloc(sizeof(SUserAuthVersion));
  if (!user) {
    code = terrno;
    goto _return;
  }
  tstrncpy(user->user, pTscObj->user, TSDB_USER_LEN);
  user->version = htonl(-1);  // force get userAuthInfo
  kv.valueLen = sizeof(SUserAuthVersion);
  kv.value = user;

  tscDebug("hb got user auth info, valueLen:%d, user:%s, authVer:%d, tscRid:%" PRIi64, kv.valueLen, user->user,
           pTscObj->authVer, connKey->tscRid);

  if (!req->info) {
    req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
    if (NULL == req->info) {
      code = terrno;
      // `user` was never stored anywhere, so it has to be released here.
      taosMemoryFree(user);
      goto _return;
    }
  }

  if (taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)) != 0) {
    // read terrno before the free below can disturb it
    code = terrno ? terrno : TSDB_CODE_APP_ERROR;
    taosMemoryFree(user);
    goto _return;
  }

_return:
  releaseTscObj(connKey->tscRid);
  if (code) {
    tscError("hb got user auth info failed since %s", tstrerror(code));
  }

  return code;
}
953

954
/*
 * Collect the users reported as expired by the catalog and attach them to the
 * heartbeat request under HEARTBEAT_KEY_USER_AUTHINFO.
 *
 * connKey   not used by this function
 * pCatalog  catalog queried through catalogGetExpiredUsers
 * req       heartbeat request; req->info is created here when it does not exist yet
 *
 * Returns TSDB_CODE_SUCCESS (also when nothing is expired), the catalog error,
 * terrno when the hash cannot be created, or the taosHashPut error. The user
 * array is freed here on every path where it is not stored in req->info.
 */
int32_t hbGetExpiredUserInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) {
  uint32_t          expiredCnt = 0;
  SUserAuthVersion *expired = NULL;

  int32_t rc = catalogGetExpiredUsers(pCatalog, &expired, &expiredCnt);
  if (rc != TSDB_CODE_SUCCESS) {
    return rc;
  }

  if (0 == expiredCnt) {
    taosMemoryFree(expired);
    return TSDB_CODE_SUCCESS;
  }

  // versions are sent in network byte order
  for (SUserAuthVersion *pUser = expired, *pEnd = expired + expiredCnt; pUser != pEnd; ++pUser) {
    pUser->version = htonl(pUser->version);
  }

  SKv kv = {0};
  kv.key = HEARTBEAT_KEY_USER_AUTHINFO;
  kv.valueLen = sizeof(SUserAuthVersion) * expiredCnt;
  kv.value = expired;

  tscDebug("hb got %d expired users, valueLen:%d", expiredCnt, kv.valueLen);

  if (NULL == req->info && NULL == (req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK))) {
    taosMemoryFree(expired);
    return terrno;
  }

  rc = taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv));
  if (rc != TSDB_CODE_SUCCESS) {
    taosMemoryFree(expired);
  }

  return rc;
}
998

999
/*
 * Collect the databases reported as expired by the catalog and attach them to
 * the heartbeat request under HEARTBEAT_KEY_DBINFO.
 *
 * connKey   not used by this function
 * pCatalog  catalog queried through catalogGetExpiredDBs
 * req       heartbeat request; req->info is created here when it does not exist yet
 *
 * Returns TSDB_CODE_SUCCESS (also when nothing is expired), the catalog error,
 * terrno when the hash cannot be created, or the taosHashPut error. The db
 * array is freed here on every path where it is not stored in req->info.
 */
int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) {
  uint32_t      expiredCnt = 0;
  SDbCacheInfo *expired = NULL;

  int32_t rc = catalogGetExpiredDBs(pCatalog, &expired, &expiredCnt);
  if (rc != TSDB_CODE_SUCCESS) {
    return rc;
  }

  if (0 == expiredCnt) {
    taosMemoryFree(expired);
    return TSDB_CODE_SUCCESS;
  }

  for (int32_t i = 0; i < expiredCnt; ++i) {
    SDbCacheInfo *pDb = expired + i;
    // log while the fields are still in host byte order
    tscDebug("the %dth expired db:%s, dbId:%" PRId64
             ", vgVersion:%d, cfgVersion:%d, numOfTable:%d, startTs:%" PRId64,
             i, pDb->dbFName, pDb->dbId, pDb->vgVersion, pDb->cfgVersion, pDb->numOfTable, pDb->stateTs);

    // then convert every numeric field to big-endian for the wire
    pDb->dbId = htobe64(pDb->dbId);
    pDb->vgVersion = htonl(pDb->vgVersion);
    pDb->cfgVersion = htonl(pDb->cfgVersion);
    pDb->numOfTable = htonl(pDb->numOfTable);
    pDb->stateTs = htobe64(pDb->stateTs);
    pDb->tsmaVersion = htonl(pDb->tsmaVersion);
  }

  SKv kv = {0};
  kv.key = HEARTBEAT_KEY_DBINFO;
  kv.valueLen = sizeof(SDbCacheInfo) * expiredCnt;
  kv.value = expired;

  tscDebug("hb got %d expired db, valueLen:%d", expiredCnt, kv.valueLen);

  if (NULL == req->info && NULL == (req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK))) {
    taosMemoryFree(expired);
    return terrno;
  }

  rc = taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv));
  if (rc != TSDB_CODE_SUCCESS) {
    taosMemoryFree(expired);
  }

  return rc;
}
1052

1053
/*
 * Collect the super tables reported as expired by the catalog and attach them
 * to the heartbeat request under HEARTBEAT_KEY_STBINFO.
 *
 * connKey   not used by this function
 * pCatalog  catalog queried through catalogGetExpiredSTables
 * req       heartbeat request; req->info is created here when it does not exist yet
 *
 * Returns TSDB_CODE_SUCCESS (also when nothing is expired), the catalog error,
 * terrno when the hash cannot be created, or the taosHashPut error. The stable
 * array is freed here on every path where it is not stored in req->info.
 */
int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) {
  uint32_t        expiredCnt = 0;
  SSTableVersion *expired = NULL;

  int32_t rc = catalogGetExpiredSTables(pCatalog, &expired, &expiredCnt);
  if (rc != TSDB_CODE_SUCCESS) {
    return rc;
  }

  if (0 == expiredCnt) {
    taosMemoryFree(expired);
    return TSDB_CODE_SUCCESS;
  }

  // ids and versions are sent big-endian
  for (SSTableVersion *pStb = expired, *pEnd = expired + expiredCnt; pStb != pEnd; ++pStb) {
    pStb->suid = htobe64(pStb->suid);
    pStb->sversion = htonl(pStb->sversion);
    pStb->tversion = htonl(pStb->tversion);
    pStb->smaVer = htonl(pStb->smaVer);
  }

  SKv kv = {0};
  kv.key = HEARTBEAT_KEY_STBINFO;
  kv.valueLen = sizeof(SSTableVersion) * expiredCnt;
  kv.value = expired;

  tscDebug("hb got %d expired stb, valueLen:%d", expiredCnt, kv.valueLen);

  if (NULL == req->info && NULL == (req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK))) {
    taosMemoryFree(expired);
    return terrno;
  }

  rc = taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv));
  if (rc != TSDB_CODE_SUCCESS) {
    taosMemoryFree(expired);
  }

  return rc;
}
1100

1101
/*
 * Collect the views reported as expired by the catalog and attach them to the
 * heartbeat request: the dynamic view version under HEARTBEAT_KEY_DYN_VIEW and
 * the view array under HEARTBEAT_KEY_VIEWINFO.
 *
 * connKey   not used by this function
 * pCatalog  catalog queried through catalogGetExpiredViews
 * req       heartbeat request; req->info is created here when it does not exist yet
 *
 * Returns TSDB_CODE_SUCCESS (also when nothing is expired) or the first error
 * encountered. Buffers that were not stored in req->info are freed here.
 */
int32_t hbGetExpiredViewInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) {
  SViewVersion    *views = NULL;
  uint32_t         viewNum = 0;
  int32_t          code = 0;
  SDynViewVersion *pDynViewVer = NULL;

  TSC_ERR_JRET(catalogGetExpiredViews(pCatalog, &views, &viewNum, &pDynViewVer));

  if (viewNum <= 0) {
    taosMemoryFree(views);
    taosMemoryFree(pDynViewVer);
    return TSDB_CODE_SUCCESS;
  }

  // ids and versions are sent big-endian
  for (int32_t i = 0; i < viewNum; ++i) {
    SViewVersion *view = &views[i];
    view->dbId = htobe64(view->dbId);
    view->viewId = htobe64(view->viewId);
    view->version = htonl(view->version);
  }

  tscDebug("hb got %u expired view, valueLen:%lu", viewNum, sizeof(SViewVersion) * viewNum);

  if (NULL == req->info) {
    req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
    if (NULL == req->info) {
      TSC_ERR_JRET(terrno);
    }
  }

  SKv kv = {
      .key = HEARTBEAT_KEY_DYN_VIEW,
      .valueLen = sizeof(SDynViewVersion),
      .value = pDynViewVer,
  };

  TSC_ERR_JRET(taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)));
  // The pointer is now stored in req->info. Drop the local reference so that a
  // failure of the second put does not free memory the request still points to.
  pDynViewVer = NULL;

  kv.key = HEARTBEAT_KEY_VIEWINFO;
  kv.valueLen = sizeof(SViewVersion) * viewNum;
  kv.value = views;

  TSC_ERR_JRET(taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)));
  return TSDB_CODE_SUCCESS;
_return:
  taosMemoryFree(views);
  taosMemoryFree(pDynViewVer);
  return code;
}
1150

1151
/*
 * Collect the TSMAs reported as expired by the catalog and attach them to the
 * heartbeat request under HEARTBEAT_KEY_TSMA.
 *
 * connKey   not used by this function
 * pCatalog  catalog queried through catalogGetExpiredTsmas
 * pReq      heartbeat request; pReq->info is created here when it does not exist yet
 *
 * Returns TSDB_CODE_SUCCESS (also when nothing is expired), the catalog error,
 * terrno when the hash cannot be created, or the taosHashPut error. The TSMA
 * array is freed here on every path where it is not stored in pReq->info.
 */
int32_t hbGetExpiredTSMAInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *pReq) {
  STSMAVersion *expired = NULL;
  uint32_t      expiredCnt = 0;

  int32_t rc = catalogGetExpiredTsmas(pCatalog, &expired, &expiredCnt);
  if (rc || 0 == expiredCnt) {
    // catalog failure (rc != 0) or nothing expired (rc == 0): release and report rc
    taosMemoryFree(expired);
    return rc;
  }

  // ids and versions are sent big-endian
  for (STSMAVersion *pTsma = expired, *pEnd = expired + expiredCnt; pTsma != pEnd; ++pTsma) {
    pTsma->dbId = htobe64(pTsma->dbId);
    pTsma->tsmaId = htobe64(pTsma->tsmaId);
    pTsma->version = htonl(pTsma->version);
  }

  tscDebug("hb got %d expred tsmas, valueLen:%lu", expiredCnt, sizeof(STSMAVersion) * expiredCnt);

  if (!pReq->info && !(pReq->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK))) {
    taosMemoryFree(expired);
    return terrno;
  }

  SKv kv = {0};
  kv.key = HEARTBEAT_KEY_TSMA;
  kv.valueLen = sizeof(STSMAVersion) * expiredCnt;
  kv.value = expired;

  rc = taosHashPut(pReq->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv));
  if (rc != TSDB_CODE_SUCCESS) {
    taosMemoryFree(expired);
  }

  return rc;
}
1192

1193
/*
 * Fill req->app with the application summary for the given cluster.
 *
 * When clientHbMgr.appSummary holds an entry for clusterId it is copied as a
 * whole; otherwise the summary is zeroed and pid, appId and name are filled in
 * from the running process.
 *
 * Returns TSDB_CODE_SUCCESS, or the error propagated from taosGetAppName.
 */
int32_t hbGetAppInfo(int64_t clusterId, SClientHbReq *req) {
  SAppHbReq *pCached = taosHashGet(clientHbMgr.appSummary, &clusterId, sizeof(clusterId));

  if (NULL == pCached) {
    // no gathered summary for this cluster: report an empty one
    (void)memset(&req->app.summary, 0, sizeof(req->app.summary));
    req->app.pid = taosGetPId();
    req->app.appId = clientHbMgr.appId;
    TSC_ERR_RET(taosGetAppName(req->app.name, NULL));
    return TSDB_CODE_SUCCESS;
  }

  (void)memcpy(&req->app, pCached, sizeof(*pCached));
  return TSDB_CODE_SUCCESS;
}
1206

1207
/*
 * Request handler registered for query and TMQ connections: fills one
 * SClientHbReq for the connection identified by connKey.
 *
 * connKey  connection the request is built for
 * param    SHbParam shared by all connections of one gather pass
 * req      request to fill
 *
 * Every connection gets its query info and app info. Only the first connection
 * of a pass (reqCnt == 0) additionally carries the expired user/db/stable/
 * (view)/TSMA info taken from the cluster's catalog. reqCnt is incremented only
 * when the whole request was built successfully.
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered.
 */
int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) {
  SHbParam     *hbParam = (SHbParam *)param;
  const int64_t clusterId = hbParam->clusterId;
  SCatalog     *pCatalog = NULL;

  int32_t code = hbGetQueryBasicInfo(connKey, req);
  if (TSDB_CODE_SUCCESS != code) {
    tscWarn("hbGetQueryBasicInfo failed, clusterId:0x%" PRIx64 ", error:%s", clusterId, tstrerror(code));
    return code;
  }

  if (hbParam->reqCnt != 0) {
    // Not the first connection of this pass: app info only.
    code = hbGetAppInfo(clusterId, req);
    if (TSDB_CODE_SUCCESS != code) {
      tscWarn("hbGetAppInfo failed, clusterId:0x%" PRIx64 ", error:%s", clusterId, tstrerror(code));
      return code;
    }

    ++hbParam->reqCnt;
    return TSDB_CODE_SUCCESS;
  }

  // First connection of this pass: it also carries the catalog's expired info.
  code = catalogGetHandle(clusterId, &pCatalog);
  if (TSDB_CODE_SUCCESS != code) {
    tscWarn("catalogGetHandle failed, clusterId:0x%" PRIx64 ", error:%s", clusterId, tstrerror(code));
    return code;
  }

  code = hbGetAppInfo(clusterId, req);
  if (TSDB_CODE_SUCCESS != code) {
    tscWarn("getAppInfo failed, clusterId:0x%" PRIx64 ", error:%s", clusterId, tstrerror(code));
    return code;
  }

  // Expired users are gathered only while the cluster id is absent from appHbHash;
  // the id is recorded afterwards when that hash exists.
  if (NULL == taosHashGet(clientHbMgr.appHbHash, &hbParam->clusterId, sizeof(hbParam->clusterId))) {
    code = hbGetExpiredUserInfo(connKey, pCatalog, req);
    if (TSDB_CODE_SUCCESS != code) {
      tscWarn("hbGetExpiredUserInfo failed, clusterId:0x%" PRIx64 ", error:%s", clusterId, tstrerror(code));
      return code;
    }

    if (clientHbMgr.appHbHash) {
      code = taosHashPut(clientHbMgr.appHbHash, &hbParam->clusterId, sizeof(uint64_t), NULL, 0);
      if (TSDB_CODE_SUCCESS != code) {
        tscWarn("hbQueryHbReqHandle put clusterId failed, clusterId:0x%" PRIx64 ", error:%s", clusterId,
                tstrerror(code));
        return code;
      }
    }
  }

  // Must run after hbGetExpiredUserInfo.
  if (2 != atomic_load_8(&hbParam->pAppHbMgr->connHbFlag)) {
    code = hbGetUserAuthInfo(connKey, hbParam, req);
    if (TSDB_CODE_SUCCESS != code) {
      tscWarn("hbGetUserAuthInfo failed, clusterId:0x%" PRIx64 ", error:%s", clusterId, tstrerror(code));
      return code;
    }
    atomic_store_8(&hbParam->pAppHbMgr->connHbFlag, 1);
  }

  code = hbGetExpiredDBInfo(connKey, pCatalog, req);
  if (TSDB_CODE_SUCCESS != code) {
    tscWarn("hbGetExpiredDBInfo failed, clusterId:0x%" PRIx64 ", error:%s", clusterId, tstrerror(code));
    return code;
  }

  code = hbGetExpiredStbInfo(connKey, pCatalog, req);
  if (TSDB_CODE_SUCCESS != code) {
    tscWarn("hbGetExpiredStbInfo failed, clusterId:0x%" PRIx64 ", error:%s", clusterId, tstrerror(code));
    return code;
  }

#ifdef TD_ENTERPRISE
  code = hbGetExpiredViewInfo(connKey, pCatalog, req);
  if (TSDB_CODE_SUCCESS != code) {
    tscWarn("hbGetExpiredViewInfo failed, clusterId:0x%" PRIx64 ", error:%s", clusterId, tstrerror(code));
    return code;
  }
#endif

  code = hbGetExpiredTSMAInfo(connKey, pCatalog, req);
  if (TSDB_CODE_SUCCESS != code) {
    tscWarn("hbGetExpiredTSMAInfo failed, clusterId:0x%" PRIx64 ", error:%s", clusterId, tstrerror(code));
    return code;
  }

  ++hbParam->reqCnt;  // success to get catalog info
  return TSDB_CODE_SUCCESS;
}
1293

1294
static FORCE_INLINE void hbMgrInitHandle() {
1295
  // init all handle
1296
  clientHbMgr.reqHandle[CONN_TYPE__QUERY] = hbQueryHbReqHandle;
1,557,038✔
1297
  clientHbMgr.reqHandle[CONN_TYPE__TMQ] = hbQueryHbReqHandle;
1,557,038✔
1298

1299
  clientHbMgr.rspHandle[CONN_TYPE__QUERY] = hbQueryHbRspHandle;
1,557,038✔
1300
  clientHbMgr.rspHandle[CONN_TYPE__TMQ] = hbQueryHbRspHandle;
1,557,038✔
1301
}
1,557,038✔
1302

1303
/*
 * Build the heartbeat batch request for one app heartbeat manager.
 *
 * pAppHbMgr  manager whose activeInfo hash lists the connections to report
 * pBatchReq  out: receives the newly allocated batch request; set to NULL when
 *            the function fails, so the caller never sees a freed pointer
 *
 * Connections whose STscObj cannot be acquired or is marked dropped are skipped.
 * A failing per-connection request handler is logged but does not abort the pass.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when the batch or its request array
 * cannot be allocated.
 */
int32_t hbGatherAllInfo(SAppHbMgr *pAppHbMgr, SClientHbBatchReq **pBatchReq) {
  *pBatchReq = taosMemoryCalloc(1, sizeof(SClientHbBatchReq));
  if (*pBatchReq == NULL) {  // check the allocation itself, not the out-parameter
    return terrno;
  }
  int32_t connKeyCnt = atomic_load_32(&pAppHbMgr->connKeyCnt);
  (*pBatchReq)->reqs = taosArrayInit(connKeyCnt, sizeof(SClientHbReq));
  if (!(*pBatchReq)->reqs) {
    int32_t code = terrno;  // capture before the cleanup below can overwrite it
    tFreeClientHbBatchReq(*pBatchReq);
    // The caller frees *pBatchReq again on error; hand back NULL so that is not
    // a double free (assumes tFreeClientHbBatchReq accepts NULL, as the caller's
    // handling of the allocation-failure path above already requires).
    *pBatchReq = NULL;
    return code;
  }

  int64_t  maxIpWhiteVer = 0;
  void    *pIter = NULL;
  SHbParam param = {0};
  while ((pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter))) {
    SClientHbReq *pOneReq = pIter;
    SClientHbKey *connKey = &pOneReq->connKey;
    STscObj      *pTscObj = (STscObj *)acquireTscObj(connKey->tscRid);

    if (!pTscObj || atomic_load_8(&pTscObj->dropped) == 1) {
      if (pTscObj) releaseTscObj(connKey->tscRid);
      continue;
    }

    // Refresh the per-connection option info stored in the activeInfo entry.
    tstrncpy(pOneReq->userApp, pTscObj->optionInfo.userApp, sizeof(pOneReq->userApp));
    tstrncpy(pOneReq->cInfo, pTscObj->optionInfo.cInfo, sizeof(pOneReq->cInfo));
    pOneReq->userIp = pTscObj->optionInfo.userIp;
    pOneReq->userDualIp = pTscObj->optionInfo.userDualIp;
    tstrncpy(pOneReq->sVer, td_version, TSDB_VERSION_LEN);

    // From here on pOneReq refers to the copy inside the batch array, while
    // connKey still points into the activeInfo entry.
    pOneReq = taosArrayPush((*pBatchReq)->reqs, pOneReq);
    if (NULL == pOneReq) {
      releaseTscObj(connKey->tscRid);
      continue;
    }

    switch (connKey->connType) {
      case CONN_TYPE__QUERY:
      case CONN_TYPE__TMQ: {
        if (param.clusterId == 0) {
          // init
          param.clusterId = pOneReq->clusterId;
          param.pAppHbMgr = pAppHbMgr;
          param.connHbFlag = atomic_load_8(&pAppHbMgr->connHbFlag);
        }
        break;
      }
      default:
        break;
    }
    if (clientHbMgr.reqHandle[connKey->connType]) {
      int32_t code = (*clientHbMgr.reqHandle[connKey->connType])(connKey, &param, pOneReq);
      if (code) {
        tscWarn("hbGatherAllInfo failed since %s, tscRid:%" PRIi64 ", connType:%" PRIi8, tstrerror(code),
                connKey->tscRid, connKey->connType);
      }
    }

    // The batch reports the highest whitelist version seen on any connection.
    int64_t ver = atomic_load_64(&pTscObj->whiteListInfo.ver);
    maxIpWhiteVer = TMAX(maxIpWhiteVer, ver);
    releaseTscObj(connKey->tscRid);
  }
  (*pBatchReq)->ipWhiteListVer = maxIpWhiteVer;

  return TSDB_CODE_SUCCESS;
}
1370

1371
/*
 * Record in clientHbMgr.threadStop that the heartbeat thread stopped
 * unexpectedly. The value stored is 2.
 */
void hbThreadFuncUnexpectedStopped(void) {
  atomic_store_8(&clientHbMgr.threadStop, 2);
}
×
1372

1373
/*
 * Accumulate every counter of `src` into the matching counter of `dst`.
 * `src` is only read.
 */
void hbMergeSummary(SAppClusterSummary *dst, SAppClusterSummary *src) {
#define HB_SUMMARY_ADD(field) (dst->field += src->field)
  HB_SUMMARY_ADD(numOfInsertsReq);
  HB_SUMMARY_ADD(numOfInsertRows);
  HB_SUMMARY_ADD(insertElapsedTime);
  HB_SUMMARY_ADD(insertBytes);
  HB_SUMMARY_ADD(fetchBytes);
  HB_SUMMARY_ADD(queryElapsedTime);
  HB_SUMMARY_ADD(numOfSlowQueries);
  HB_SUMMARY_ADD(totalRequests);
  HB_SUMMARY_ADD(currentRequests);
#undef HB_SUMMARY_ADD
}
3,099,686✔
1384

1385
int32_t hbGatherAppInfo(void) {
23,062,017✔
1386
  SAppHbReq req = {0};
23,062,017✔
1387
  int32_t   code = TSDB_CODE_SUCCESS;
23,062,017✔
1388
  int       sz = taosArrayGetSize(clientHbMgr.appHbMgrs);
23,062,017✔
1389
  if (sz > 0) {
23,062,017✔
1390
    req.pid = taosGetPId();
23,062,017✔
1391
    req.appId = clientHbMgr.appId;
23,062,017✔
1392
    TSC_ERR_RET(taosGetAppName(req.name, NULL));
23,062,017✔
1393
  }
1394

1395
  taosHashClear(clientHbMgr.appSummary);
23,062,017✔
1396

1397
  for (int32_t i = 0; i < sz; ++i) {
49,233,310✔
1398
    SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i);
26,171,293✔
1399
    if (pAppHbMgr == NULL) continue;
26,171,293✔
1400

1401
    int64_t    clusterId = pAppHbMgr->pAppInstInfo->clusterId;
26,171,293✔
1402
    SAppHbReq *pApp = taosHashGet(clientHbMgr.appSummary, &clusterId, sizeof(clusterId));
26,171,293✔
1403
    if (NULL == pApp) {
26,171,293✔
1404
      (void)memcpy(&req.summary, &pAppHbMgr->pAppInstInfo->summary, sizeof(req.summary));
23,071,607✔
1405
      req.startTime = pAppHbMgr->startTime;
23,071,607✔
1406
      TSC_ERR_RET(taosHashPut(clientHbMgr.appSummary, &clusterId, sizeof(clusterId), &req, sizeof(req)));
23,071,607✔
1407
    } else {
1408
      if (pAppHbMgr->startTime < pApp->startTime) {
3,099,686✔
1409
        pApp->startTime = pAppHbMgr->startTime;
×
1410
      }
1411

1412
      hbMergeSummary(&pApp->summary, &pAppHbMgr->pAppInstInfo->summary);
3,099,686✔
1413
    }
1414
  }
1415

1416
  return TSDB_CODE_SUCCESS;
23,062,017✔
1417
}
1418

1419
static void *hbThreadFunc(void *param) {
1,557,038✔
1420
  setThreadName("hb");
1,557,038✔
1421
#ifdef WINDOWS
1422
  if (taosCheckCurrentInDll()) {
1423
    atexit(hbThreadFuncUnexpectedStopped);
1424
  }
1425
#endif
1426
  while (1) {
22,011,124✔
1427
    if (1 == clientHbMgr.threadStop) {
23,568,162✔
1428
      break;
500,974✔
1429
    }
1430

1431
    if (TSDB_CODE_SUCCESS != taosThreadMutexLock(&clientHbMgr.lock)) {
23,067,188✔
1432
      tscError("taosThreadMutexLock failed");
×
1433
      return NULL;
×
1434
    }
1435

1436
    int sz = taosArrayGetSize(clientHbMgr.appHbMgrs);
23,067,188✔
1437
    if (sz > 0) {
23,067,188✔
1438
      if (TSDB_CODE_SUCCESS != hbGatherAppInfo()) {
23,062,017✔
1439
        tscError("hbGatherAppInfo failed");
×
1440
        return NULL;
×
1441
      }
1442
      if (sz > 1 && !clientHbMgr.appHbHash) {
23,062,017✔
1443
        clientHbMgr.appHbHash = taosHashInit(0, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_NO_LOCK);
44,060✔
1444
        if (NULL == clientHbMgr.appHbHash) {
44,060✔
1445
          tscError("taosHashInit failed");
×
1446
          return NULL;
×
1447
        }
1448
      }
1449
      taosHashClear(clientHbMgr.appHbHash);
23,062,017✔
1450
    }
1451

1452
    for (int i = 0; i < sz; i++) {
49,238,481✔
1453
      SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i);
26,171,293✔
1454
      if (pAppHbMgr == NULL) {
26,171,293✔
1455
        continue;
18,914✔
1456
      }
1457

1458
      int32_t connCnt = atomic_load_32(&pAppHbMgr->connKeyCnt);
26,171,293✔
1459
      if (connCnt == 0) {
26,171,293✔
1460
        continue;
2,597,362✔
1461
      }
1462
      SClientHbBatchReq *pReq = NULL;
23,573,931✔
1463
      int32_t            code = hbGatherAllInfo(pAppHbMgr, &pReq);
23,573,931✔
1464
      if (TSDB_CODE_SUCCESS != code || taosArrayGetP(clientHbMgr.appHbMgrs, i) == NULL) {
23,573,931✔
1465
        terrno = code ? code : TSDB_CODE_OUT_OF_RANGE;
×
1466
        tFreeClientHbBatchReq(pReq);
×
1467
        continue;
×
1468
      }
1469
      int tlen = tSerializeSClientHbBatchReq(NULL, 0, pReq);
23,573,931✔
1470
      if (tlen == -1) {
23,573,931✔
1471
        tFreeClientHbBatchReq(pReq);
×
1472
        break;
×
1473
      }
1474
      void *buf = taosMemoryMalloc(tlen);
23,573,931✔
1475
      if (buf == NULL) {
23,573,931✔
1476
        tFreeClientHbBatchReq(pReq);
×
1477
        // hbClearReqInfo(pAppHbMgr);
1478
        break;
×
1479
      }
1480

1481
      if (tSerializeSClientHbBatchReq(buf, tlen, pReq) == -1) {
23,573,931✔
1482
        tFreeClientHbBatchReq(pReq);
×
1483
        taosMemoryFree(buf);
×
1484
        break;
×
1485
      }
1486
      SMsgSendInfo *pInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
23,573,931✔
1487

1488
      if (pInfo == NULL) {
23,573,931✔
1489
        tFreeClientHbBatchReq(pReq);
×
1490
        // hbClearReqInfo(pAppHbMgr);
1491
        taosMemoryFree(buf);
×
1492
        break;
×
1493
      }
1494
      pInfo->fp = hbAsyncCallBack;
23,573,931✔
1495
      pInfo->msgInfo.pData = buf;
23,573,931✔
1496
      pInfo->msgInfo.len = tlen;
23,573,931✔
1497
      pInfo->msgType = TDMT_MND_HEARTBEAT;
23,573,931✔
1498
      pInfo->param = taosMemoryMalloc(sizeof(int32_t));
23,573,931✔
1499
      if (pInfo->param  == NULL) {
23,573,931✔
1500
        tFreeClientHbBatchReq(pReq);
×
1501
        // hbClearReqInfo(pAppHbMgr);
1502
        taosMemoryFree(buf);
×
1503
        taosMemoryFree(pInfo);
×
1504
        break;
×
1505
      }
1506
      *(int32_t *)pInfo->param = i;
23,573,931✔
1507
      pInfo->paramFreeFp = taosAutoMemoryFree;
23,573,931✔
1508
      pInfo->requestId = generateRequestId();
23,573,931✔
1509
      pInfo->requestObjRefId = 0;
23,573,931✔
1510

1511
      SAppInstInfo *pAppInstInfo = pAppHbMgr->pAppInstInfo;
23,573,931✔
1512
      SEpSet        epSet = getEpSet_s(&pAppInstInfo->mgmtEp);
23,573,931✔
1513
      if (TSDB_CODE_SUCCESS != asyncSendMsgToServer(pAppInstInfo->pTransporter, &epSet, NULL, pInfo)) {
23,573,931✔
1514
        tscWarn("failed to async send msg to server");
×
1515
      }
1516
      tFreeClientHbBatchReq(pReq);
23,573,931✔
1517
      // hbClearReqInfo(pAppHbMgr);
1518
      (void)atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1);
23,573,931✔
1519
    }
1520

1521
    if (TSDB_CODE_SUCCESS != taosThreadMutexUnlock(&clientHbMgr.lock)) {
23,067,188✔
1522
      tscError("taosThreadMutexLock failed");
×
1523
      return NULL;
×
1524
    }
1525
    taosMsleep(HEARTBEAT_INTERVAL);
23,067,188✔
1526
  }
1527
  taosHashCleanup(clientHbMgr.appHbHash);
500,974✔
1528
  return NULL;
500,974✔
1529
}
1530

1531
static int32_t hbCreateThread() {
1,557,038✔
1532
  int32_t      code = TSDB_CODE_SUCCESS;
1,557,038✔
1533
  TdThreadAttr thAttr;
1,545,956✔
1534
  TSC_ERR_JRET(taosThreadAttrInit(&thAttr));
1,557,038✔
1535
  TSC_ERR_JRET(taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE));
1,557,038✔
1536
#ifdef TD_COMPACT_OS
1537
  TSC_ERR_JRET(taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL));
1538
#endif
1539

1540
  if (taosThreadCreate(&clientHbMgr.thread, &thAttr, hbThreadFunc, NULL) != 0) {
1,557,038✔
1541
    terrno = TAOS_SYSTEM_ERROR(ERRNO);
×
1542
    TSC_ERR_RET(terrno);
×
1543
  }
1544
  (void)taosThreadAttrDestroy(&thAttr);
1,557,038✔
1545
_return:
1,557,038✔
1546

1547
  if (code) {
1,557,038✔
1548
    terrno = TAOS_SYSTEM_ERROR(ERRNO);
×
1549
    TSC_ERR_RET(terrno);
×
1550
  }
1551

1552
  return code;
1,557,038✔
1553
}
1554

1555
static void hbStopThread() {
1,558,134✔
1556
  if (0 == atomic_load_8(&clientHbMgr.inited)) {
1,558,134✔
1557
    return;
1,096✔
1558
  }
1559
  if (atomic_val_compare_exchange_8(&clientHbMgr.threadStop, 0, 1)) {
1,557,038✔
1560
    tscDebug("hb thread already stopped");
×
1561
    return;
×
1562
  }
1563

1564
  int32_t code = TSDB_CODE_SUCCESS;
1,557,038✔
1565
  // thread quit mode kill or inner exit from self-thread
1566
  if (clientHbMgr.quitByKill) {
1,557,038✔
1567
    code = taosThreadKill(clientHbMgr.thread, 0);
1,165,548✔
1568
    if (TSDB_CODE_SUCCESS != code) {
1,165,548✔
1569
      tscError("taosThreadKill failed since %s", tstrerror(code));
×
1570
    }
1571
  } else {
1572
    code = taosThreadJoin(clientHbMgr.thread, NULL);
391,490✔
1573
    if (TSDB_CODE_SUCCESS != code) {
391,490✔
1574
      tscError("taosThreadJoin failed since %s", tstrerror(code));
×
1575
    }
1576
  }
1577

1578
  tscDebug("hb thread stopped");
1,557,038✔
1579
}
1580

1581
/*
 * Create a heartbeat manager for one application instance and register it in
 * clientHbMgr.appHbMgrs.
 *
 * pAppInstInfo  stored in the new manager as-is (not copied).
 * key           duplicated with taosStrdup(); the copy belongs to the manager.
 * pAppHbMgr     out: the new manager on success, NULL on failure after the
 *               allocation attempt.
 *
 * Calls hbMgrInit() first, so the global heartbeat state is set up on demand.
 * Returns TSDB_CODE_SUCCESS or an error code. On failure everything allocated
 * here (struct, key copy, activeInfo hash) is released.
 */
int32_t appHbMgrInit(SAppInstInfo *pAppInstInfo, char *key, SAppHbMgr **pAppHbMgr) {
  int32_t code = TSDB_CODE_SUCCESS;
  TSC_ERR_RET(hbMgrInit());

  // Zero-filled so the failure path can tell which members were created.
  *pAppHbMgr = taosMemoryCalloc(1, sizeof(SAppHbMgr));
  if (*pAppHbMgr == NULL) {
    TSC_ERR_JRET(terrno);
  }
  // init stat
  (*pAppHbMgr)->startTime = taosGetTimestampMs();
  (*pAppHbMgr)->connKeyCnt = 0;
  (*pAppHbMgr)->connHbFlag = 0;
  (*pAppHbMgr)->reportCnt = 0;
  (*pAppHbMgr)->reportBytes = 0;
  (*pAppHbMgr)->key = taosStrdup(key);
  if ((*pAppHbMgr)->key == NULL) {
    TSC_ERR_JRET(terrno);
  }

  // init app info
  (*pAppHbMgr)->pAppInstInfo = pAppInstInfo;

  // init hash info
  (*pAppHbMgr)->activeInfo = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
  if ((*pAppHbMgr)->activeInfo == NULL) {
    TSC_ERR_JRET(terrno);
  }

  TSC_ERR_JRET(taosThreadMutexLock(&clientHbMgr.lock));
  if (taosArrayPush(clientHbMgr.appHbMgrs, &(*pAppHbMgr)) == NULL) {
    code = terrno;
    (void)taosThreadMutexUnlock(&clientHbMgr.lock);
    goto _return;
  }
  (*pAppHbMgr)->idx = taosArrayGetSize(clientHbMgr.appHbMgrs) - 1;

  // From here on the array references the manager, so it must not be freed:
  // an unlock failure is logged (as elsewhere in this file) instead of
  // unwinding and leaving a dangling pointer in clientHbMgr.appHbMgrs.
  code = taosThreadMutexUnlock(&clientHbMgr.lock);
  if (TSDB_CODE_SUCCESS != code) {
    tscError("failed to unlock clientHbMgr, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
  }

  return TSDB_CODE_SUCCESS;

_return:
  if (*pAppHbMgr != NULL) {
    taosHashCleanup((*pAppHbMgr)->activeInfo);
    taosMemoryFree((*pAppHbMgr)->key);
    taosMemoryFree(*pAppHbMgr);
    *pAppHbMgr = NULL;
  }
  return code;
}
1625

1626
/*
 * Release one heartbeat manager: every SClientHbReq stored in activeInfo is
 * passed to tFreeClientHbReq(), then the hash, the key string and the manager
 * itself are freed. pTarget must not be NULL and is invalid afterwards.
 */
void hbFreeAppHbMgr(SAppHbMgr *pTarget) {
  for (void *pCursor = taosHashIterate(pTarget->activeInfo, NULL); pCursor != NULL;
       pCursor = taosHashIterate(pTarget->activeInfo, pCursor)) {
    SClientHbReq *pReq = pCursor;
    tFreeClientHbReq(pReq);
  }

  taosHashCleanup(pTarget->activeInfo);
  pTarget->activeInfo = NULL;

  taosMemoryFree(pTarget->key);
  taosMemoryFree(pTarget);
}
1,618,965✔
1639

1640
/*
 * Unregister and free the manager *pAppHbMgr.
 *
 * Under clientHbMgr.lock the manager is looked up in clientHbMgr.appHbMgrs.
 * When found it is released with hbFreeAppHbMgr(), *pAppHbMgr is set to NULL
 * and the array slot is overwritten with NULL (the slot is kept, so the
 * indices of the other managers do not change).
 *
 * A NULL pAppHbMgr or NULL *pAppHbMgr is a no-op. Without this check a NULL
 * target would match a slot cleared by an earlier removal and
 * hbFreeAppHbMgr() would be called with NULL.
 *
 * Lock / unlock failures are logged only.
 */
void hbRemoveAppHbMrg(SAppHbMgr **pAppHbMgr) {
  if (pAppHbMgr == NULL || *pAppHbMgr == NULL) {
    return;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  code = taosThreadMutexLock(&clientHbMgr.lock);
  if (TSDB_CODE_SUCCESS != code) {
    tscError("failed to lock clientHbMgr, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
  }
  int32_t mgrSize = taosArrayGetSize(clientHbMgr.appHbMgrs);
  for (int32_t i = 0; i < mgrSize; ++i) {
    SAppHbMgr *pItem = taosArrayGetP(clientHbMgr.appHbMgrs, i);
    if (pItem == *pAppHbMgr) {
      hbFreeAppHbMgr(*pAppHbMgr);
      *pAppHbMgr = NULL;
      // pAppHbMgr now points at a NULL pointer; copying it clears the slot.
      taosArraySet(clientHbMgr.appHbMgrs, i, pAppHbMgr);
      break;
    }
  }
  code = taosThreadMutexUnlock(&clientHbMgr.lock);
  if (TSDB_CODE_SUCCESS != code) {
    tscError("failed to unlock clientHbMgr, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
  }
}
1,618,965✔
1661

1662
void appHbMgrCleanup(void) {
1,557,038✔
1663
  int sz = taosArrayGetSize(clientHbMgr.appHbMgrs);
1,557,038✔
1664
  for (int i = 0; i < sz; i++) {
3,176,003✔
1665
    SAppHbMgr *pTarget = taosArrayGetP(clientHbMgr.appHbMgrs, i);
1,618,965✔
1666
    if (pTarget == NULL) continue;
1,618,965✔
1667
    hbFreeAppHbMgr(pTarget);
1,618,965✔
1668
  }
1669
}
1,557,038✔
1670

1671
int32_t hbMgrInit() {
1,618,965✔
1672
  // init once
1673
  int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 0, 1);
1,618,965✔
1674
  if (old == 1) return 0;
1,618,965✔
1675

1676
  clientHbMgr.appId = tGenIdPI64();
1,557,038✔
1677
  tscInfo("app initialized, appId:0x%" PRIx64, clientHbMgr.appId);
1,557,038✔
1678

1679
  clientHbMgr.appSummary = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
1,557,038✔
1680
  if (NULL == clientHbMgr.appSummary) {
1,557,038✔
1681
    uError("hbMgrInit:taosHashInit error") return terrno;
×
1682
  }
1683
  clientHbMgr.appHbMgrs = taosArrayInit(0, sizeof(void *));
1,557,038✔
1684
  if (NULL == clientHbMgr.appHbMgrs) {
1,557,038✔
1685
    uError("hbMgrInit:taosArrayInit error") return terrno;
×
1686
  }
1687
  TdThreadMutexAttr attr = {0};
1,557,038✔
1688

1689
  int ret = taosThreadMutexAttrInit(&attr);
1,557,038✔
1690
  if (ret != 0) {
1,557,038✔
1691
    uError("hbMgrInit:taosThreadMutexAttrInit error") return ret;
×
1692
  }
1693

1694
  ret = taosThreadMutexAttrSetType(&attr, PTHREAD_MUTEX_RECURSIVE);
1,557,038✔
1695
  if (ret != 0) {
1,557,038✔
1696
    uError("hbMgrInit:taosThreadMutexAttrSetType error") return ret;
×
1697
  }
1698

1699
  ret = taosThreadMutexInit(&clientHbMgr.lock, &attr);
1,557,038✔
1700
  if (ret != 0) {
1,557,038✔
1701
    uError("hbMgrInit:taosThreadMutexInit error") return ret;
×
1702
  }
1703

1704
  ret = taosThreadMutexAttrDestroy(&attr);
1,557,038✔
1705
  if (ret != 0) {
1,557,038✔
1706
    uError("hbMgrInit:taosThreadMutexAttrDestroy error") return ret;
×
1707
  }
1708

1709
  // init handle funcs
1710
  hbMgrInitHandle();
1711

1712
  // init backgroud thread
1713
  ret = hbCreateThread();
1,557,038✔
1714
  if (ret != 0) {
1,557,038✔
1715
    uError("hbMgrInit:hbCreateThread error") return ret;
×
1716
  }
1717

1718
  return 0;
1,557,038✔
1719
}
1720

1721
void hbMgrCleanUp() {
1,558,134✔
1722
  hbStopThread();
1,558,134✔
1723

1724
  // destroy all appHbMgr
1725
  int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 1, 0);
1,558,134✔
1726
  if (old == 0) return;
1,558,134✔
1727

1728
  int32_t code = taosThreadMutexLock(&clientHbMgr.lock);
1,557,038✔
1729
  if (TSDB_CODE_SUCCESS != code) {
1,557,038✔
1730
    tscError("failed to lock clientHbMgr, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
×
1731
  }
1732
  appHbMgrCleanup();
1,557,038✔
1733
  taosArrayDestroy(clientHbMgr.appHbMgrs);
1,557,038✔
1734
  clientHbMgr.appHbMgrs = NULL;
1,557,038✔
1735
  code = taosThreadMutexUnlock(&clientHbMgr.lock);
1,557,038✔
1736
  if (TSDB_CODE_SUCCESS != code) {
1,557,038✔
1737
    tscError("failed to unlock clientHbMgr, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
×
1738
  }
1739
}
1740

1741
/*
 * Add a heartbeat request record for connKey to pAppHbMgr->activeInfo.
 *
 * If the key is already present nothing changes and 0 is returned. Otherwise
 * a zero-initialized SClientHbReq carrying connKey, clusterId and bounded
 * copies of user / tokenName is inserted and connKeyCnt is incremented.
 *
 * Returns 0 on success; leaves through TSC_ERR_RET when taosHashPut() fails.
 */
int32_t hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, const char* user, const char* tokenName, int64_t clusterId) {
  // already registered: keep the existing record
  if (taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)) != NULL) {
    return 0;
  }

  SClientHbReq newReq = {0};
  newReq.clusterId = clusterId;
  newReq.connKey = connKey;
  tstrncpy(newReq.user, user, sizeof(newReq.user));
  tstrncpy(newReq.tokenName, tokenName, sizeof(newReq.tokenName));

  TSC_ERR_RET(taosHashPut(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey), &newReq, sizeof(SClientHbReq)));

  (void)atomic_add_fetch_32(&pAppHbMgr->connKeyCnt, 1);
  return 0;
}
1759

1760
/*
 * Register a connection for heartbeat reporting.
 *
 * Builds the SClientHbKey from tscRefId / connType and forwards to
 * hbRegisterConnImpl() for CONN_TYPE__QUERY and CONN_TYPE__TMQ connections.
 * Any other connection type is ignored and 0 is returned.
 */
int32_t hbRegisterConn(SAppHbMgr *pAppHbMgr, int64_t tscRefId, const char* user, const char* tokenName, int64_t clusterId, int8_t connType) {
  if (connType != CONN_TYPE__QUERY && connType != CONN_TYPE__TMQ) {
    return 0;
  }

  SClientHbKey hbKey = {
      .tscRid = tscRefId,
      .connType = connType,
  };
  return hbRegisterConnImpl(pAppHbMgr, hbKey, user, tokenName, clusterId);
}
1775

1776
/*
 * Remove the heartbeat record of connKey from the manager that pTscObj
 * belongs to (looked up by pTscObj->appHbMgrIdx in clientHbMgr.appHbMgrs).
 *
 * Runs under clientHbMgr.lock. When the record exists its contents are
 * released with tFreeClientHbReq(), the entry is removed from activeInfo and
 * connKeyCnt is decremented. A missing manager or record is not an error.
 * Lock, unlock and removal failures are logged only.
 */
void hbDeregisterConn(STscObj *pTscObj, SClientHbKey connKey) {
  int32_t lockCode = taosThreadMutexLock(&clientHbMgr.lock);
  if (lockCode != TSDB_CODE_SUCCESS) {
    tscError("failed to lock clientHbMgr, code:%s", tstrerror(TAOS_SYSTEM_ERROR(lockCode)));
  }

  SAppHbMgr    *pMgr = taosArrayGetP(clientHbMgr.appHbMgrs, pTscObj->appHbMgrIdx);
  SClientHbReq *pEntry = NULL;
  if (pMgr != NULL) {
    pEntry = taosHashAcquire(pMgr->activeInfo, &connKey, sizeof(SClientHbKey));
  }

  if (pEntry != NULL) {
    tFreeClientHbReq(pEntry);
    int32_t rmCode = taosHashRemove(pMgr->activeInfo, &connKey, sizeof(SClientHbKey));
    if (rmCode != TSDB_CODE_SUCCESS) {
      tscError("hbDeregisterConn taosHashRemove error, code:%s", tstrerror(TAOS_SYSTEM_ERROR(rmCode)));
    }
    // drop the reference taken by taosHashAcquire above
    taosHashRelease(pMgr->activeInfo, pEntry);
    (void)atomic_sub_fetch_32(&pMgr->connKeyCnt, 1);
  }

  int32_t unlockCode = taosThreadMutexUnlock(&clientHbMgr.lock);
  if (unlockCode != TSDB_CODE_SUCCESS) {
    tscError("failed to unlock clientHbMgr, code:%s", tstrerror(TAOS_SYSTEM_ERROR(unlockCode)));
  }
}
2,516,509✔
1799

1800
// set heart beat thread quit mode , if quicByKill 1 then kill thread else quit from inner
1801
void taos_set_hb_quit(int8_t quitByKill) { clientHbMgr.quitByKill = quitByKill; }
1,166,564✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc