• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5006

29 Mar 2026 04:32AM UTC coverage: 72.274% (+0.1%) from 72.152%
#5006

push

travis-ci

web-flow
refactor: do some internal refactor for TDgpt. (#34955)

253711 of 351039 relevant lines covered (72.27%)

131490495.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.24
/source/client/src/clientHb.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "catalog.h"
17
#include "clientInt.h"
18
#include "clientLog.h"
19
#include "clientMonitor.h"
20
#include "clientSession.h"
21
#include "scheduler.h"
22
#include "tglobal.h"
23
#include "trpc.h"
24

25
typedef struct {
26
  union {
27
    struct {
28
      SAppHbMgr *pAppHbMgr;
29
      int64_t    clusterId;
30
      int32_t    reqCnt;
31
      int8_t     connHbFlag;
32
    };
33
  };
34
} SHbParam;
35

36
SClientHbMgr clientHbMgr = {0};
37

38
static int32_t hbCreateThread();
39
static void    hbStopThread();
40
static int32_t hbUpdateUserSessMertric(const char *user, SUserSessCfg *pCfg);
41
static int32_t hbUpdateUserAuthInfo(SAppHbMgr *pAppHbMgr, SUserAuthBatchRsp *batchRsp);
42

43
static int32_t hbProcessUserAuthInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog,
26,059,335✔
44
                                        SAppHbMgr *pAppHbMgr) {
45
  int32_t code = TSDB_CODE_SUCCESS;
26,059,335✔
46

47
  SUserAuthBatchRsp batchRsp = {0};
26,059,335✔
48
  if (tDeserializeSUserAuthBatchRsp(value, valueLen, &batchRsp) != 0) {
26,059,335✔
49
    TSC_ERR_JRET(TSDB_CODE_INVALID_MSG);
×
50
  }
51

52
  int32_t numOfBatchs = taosArrayGetSize(batchRsp.pArray);
26,059,335✔
53
  for (int32_t i = 0; i < numOfBatchs; ++i) {
52,564,229✔
54
    SGetUserAuthRsp *rsp = taosArrayGet(batchRsp.pArray, i);
26,504,894✔
55
    if (NULL == rsp) {
26,504,894✔
56
      code = terrno;
×
57
      goto _return;
×
58
    }
59
    tscDebug("hb to update user auth, user:%s, version:%d", rsp->user, rsp->version);
26,504,894✔
60

61
    TSC_ERR_JRET(catalogUpdateUserAuthInfo(pCatalog, rsp));
26,504,894✔
62

63
  }
64

65
  if (numOfBatchs > 0) {
26,059,335✔
66
    TSC_ERR_JRET(hbUpdateUserAuthInfo(pAppHbMgr, &batchRsp));
26,059,335✔
67
  }
68

69
  (void)atomic_val_compare_exchange_8(&pAppHbMgr->connHbFlag, 1, 2);
26,059,335✔
70

71
_return:
26,059,335✔
72
  tFreeSUserAuthBatchRsp(&batchRsp);
26,059,335✔
73
  return code;
26,059,335✔
74
}
75

76
/*
 * Apply the per-user session limits contained in pCfg.
 *
 * Nothing is done when user or pCfg is NULL, or when pCfg is byte-wise all zero.
 * Otherwise every non-zero limit is forwarded to sessMgtUpdataLimit(); the first
 * failure stops processing and its code is returned.
 *
 * @param user  user name the limits apply to
 * @param pCfg  session configuration received for that user
 * @return 0 on success (or when there was nothing to do), otherwise the failing code
 */
static int32_t hbUpdateUserSessMertric(const char *user, SUserSessCfg *pCfg) {
  int32_t code = 0;
  int32_t lino = 0;

  if (NULL == user || NULL == pCfg) {
    return code;
  }

  // An all-zero configuration carries no limit at all.
  SUserSessCfg emptyCfg = {0, 0, 0, 0, 0};
  if (0 == memcmp(pCfg, &emptyCfg, sizeof(SUserSessCfg))) {
    return TSDB_CODE_SUCCESS;
  }

  tscInfo(
      "update session metric for user:%s, sessPerUser:%d, sessConnTime:%d, sessConnIdleTime:%d, sessMaxConcurrency:%d, "
      "sessMaxCallVnodeNum:%d",
      user, pCfg->sessPerUser, pCfg->sessConnTime, pCfg->sessConnIdleTime, pCfg->sessMaxConcurrency,
      pCfg->sessMaxCallVnodeNum);

  // sessMgtUpdataLimit() takes a non-const name; the cast mirrors the callee's signature.
  char *userName = (char *)user;

  if (0 != pCfg->sessPerUser) {
    code = sessMgtUpdataLimit(userName, SESSION_PER_USER, pCfg->sessPerUser);
    TAOS_CHECK_GOTO(code, &lino, _error);
  }

  if (0 != pCfg->sessConnTime) {
    code = sessMgtUpdataLimit(userName, SESSION_CONN_TIME, pCfg->sessConnTime);
    TAOS_CHECK_GOTO(code, &lino, _error);
  }

  if (0 != pCfg->sessConnIdleTime) {
    code = sessMgtUpdataLimit(userName, SESSION_CONN_IDLE_TIME, pCfg->sessConnIdleTime);
    TAOS_CHECK_GOTO(code, &lino, _error);
  }

  if (0 != pCfg->sessMaxConcurrency) {
    code = sessMgtUpdataLimit(userName, SESSION_MAX_CONCURRENCY, pCfg->sessMaxConcurrency);
    TAOS_CHECK_GOTO(code, &lino, _error);
  }

  if (0 != pCfg->sessMaxCallVnodeNum) {
    code = sessMgtUpdataLimit(userName, SESSION_MAX_CALL_VNODE_NUM, pCfg->sessMaxCallVnodeNum);
    TAOS_CHECK_GOTO(code, &lino, _error);
  }

_error:
  return code;
}
121
/*
 * Propagate a user-auth batch response to the live connections of every app heartbeat
 * manager that belongs to the same cluster as pAppHbMgr.
 *
 * For each matching manager its active connections are iterated. The auth entry (pRsp)
 * is looked up once per manager, using the user name of the first connection that can
 * still be acquired, and is reused for the remaining connections of that manager. If no
 * entry of the batch matches, iteration over that manager is cancelled.
 *
 * Per connection:
 *   - user dropped: the connection is flagged dropped (first time only) and the
 *     user-dropped callback, if any, is fired; nothing else is updated;
 *   - connection authenticated by token: a token event is derived (dropped / disabled /
 *     expired / modified). For "modified" the cached expire time is updated and the
 *     callback fired when it changed; for the other events the connection is flagged
 *     dropped, the callback fired, and nothing else is updated;
 *   - finally authVer, session limits, sysInfo, password version and both whitelist
 *     versions are refreshed; version callbacks fire only when a version moves forward.
 *
 * @param pAppHbMgr  manager that received the response (selects the cluster)
 * @param batchRsp   deserialized batch of per-user auth responses
 * @return always 0; a failed session-limit update is only logged
 */
static int32_t hbUpdateUserAuthInfo(SAppHbMgr *pAppHbMgr, SUserAuthBatchRsp *batchRsp) {
  int64_t clusterId = pAppHbMgr->pAppInstInfo->clusterId;
  for (int i = 0; i < TARRAY_SIZE(clientHbMgr.appHbMgrs); ++i) {
    SAppHbMgr *hbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i);
    if (!hbMgr || hbMgr->pAppInstInfo->clusterId != clusterId) {
      continue;
    }

    SClientHbReq    *pReq = NULL;
    SGetUserAuthRsp *pRsp = NULL;
    while ((pReq = taosHashIterate(hbMgr->activeInfo, pReq))) {
      STscObj *pTscObj = (STscObj *)acquireTscObj(pReq->connKey.tscRid);
      if (!pTscObj) {
        continue;
      }

      // Resolve the auth entry once per manager, keyed by this connection's user.
      if (!pRsp) {
        for (int32_t j = 0; j < TARRAY_SIZE(batchRsp->pArray); ++j) {
          SGetUserAuthRsp *rsp = TARRAY_GET_ELEM(batchRsp->pArray, j);
          if (0 == strncmp(rsp->user, pTscObj->user, TSDB_USER_LEN)) {
            pRsp = rsp;
            break;
          }
        }
        if (!pRsp) {
          // The batch has nothing for this user: stop walking this manager.
          releaseTscObj(pReq->connKey.tscRid);
          taosHashCancelIterate(hbMgr->activeInfo, pReq);
          break;
        }
      }

      if (pRsp->dropped == 1) {
        // The CAS guarantees the callback fires only for the 0 -> 1 transition.
        if (atomic_val_compare_exchange_8(&pTscObj->dropped, 0, 1) == 0) {
          SPassInfo *dropInfo = &pTscObj->userDroppedInfo;
          if (dropInfo->fp) {
            (*dropInfo->fp)(dropInfo->param, NULL, TAOS_NOTIFY_USER_DROPPED);
          }
        }
        releaseTscObj(pReq->connKey.tscRid);
        continue;
      }

      // update token status
      if (pTscObj->tokenName[0] != 0) {
        STokenStatus *status = NULL;
        if (pRsp->tokens != NULL) {
          // The lookup key length includes the terminating NUL.
          status = taosHashGet(pRsp->tokens, pTscObj->tokenName, strlen(pTscObj->tokenName) + 1);
        }

        STokenEvent event = {0};
        tstrncpy(event.tokenName, pTscObj->tokenName, sizeof(event.tokenName));
        if (status == NULL) {
          event.type = TSDB_TOKEN_EVENT_DROPPED;
        } else if (status->enabled == 0) {
          event.type = TSDB_TOKEN_EVENT_DISABLED;
        } else if (status->expireTime > 0 && status->expireTime < taosGetTimestampSec()) {
          event.type = TSDB_TOKEN_EVENT_EXPIRED;
        } else {
          event.type = TSDB_TOKEN_EVENT_MODIFIED;
          event.expireTime = status->expireTime;
        }

        STokenNotifyInfo *tni = &pTscObj->tokenNotifyInfo;
        if (event.type == TSDB_TOKEN_EVENT_MODIFIED) {
          int32_t oldExpireTime;
          do {
            oldExpireTime = atomic_load_32(&pTscObj->tokenExpireTime);
            if (oldExpireTime == status->expireTime) {
              break;
            }
          } while (atomic_val_compare_exchange_32(&pTscObj->tokenExpireTime, oldExpireTime, status->expireTime) !=
                   oldExpireTime);

          if (oldExpireTime != status->expireTime && tni->fp) {
            (*tni->fp)(tni->param, &event, TAOS_NOTIFY_TOKEN);
          }
          // Log the value observed before the exchange; the field itself already holds the new one.
          tscDebug("update token %s of user %s: expire time from %d to %d, conn:%" PRIi64, pTscObj->tokenName,
                   pRsp->user, oldExpireTime, status->expireTime, pTscObj->id);
        } else {
          // Token is gone, disabled or expired: flag the connection once and notify.
          if (atomic_val_compare_exchange_8(&pTscObj->dropped, 0, 1) == 0) {
            if (tni->fp) {
              (*tni->fp)(tni->param, &event, TAOS_NOTIFY_TOKEN);
            }
          }

          releaseTscObj(pReq->connKey.tscRid);
          continue;
        }
      }

      pTscObj->authVer = pRsp->version;
      if (hbUpdateUserSessMertric(pTscObj->user, &pRsp->sessCfg) != 0) {
        tscError("failed to update user session metric, user:%s", pTscObj->user);
      }

      if (pTscObj->sysInfo != pRsp->sysInfo) {
        tscDebug("update sysInfo of user %s from %" PRIi8 " to %" PRIi8 ", conn:%" PRIi64, pRsp->user,
                 pTscObj->sysInfo, pRsp->sysInfo, pTscObj->id);
        pTscObj->sysInfo = pRsp->sysInfo;
      }

      // update password version
      if (pTscObj->passInfo.fp) {
        SPassInfo *passInfo = &pTscObj->passInfo;
        int32_t    oldVer = 0;
        // Only ever move the stored version forward.
        do {
          oldVer = atomic_load_32(&passInfo->ver);
          if (oldVer >= pRsp->passVer) {
            break;
          }
        } while (atomic_val_compare_exchange_32(&passInfo->ver, oldVer, pRsp->passVer) != oldVer);
        if (oldVer < pRsp->passVer) {
          (*passInfo->fp)(passInfo->param, &pRsp->passVer, TAOS_NOTIFY_PASSVER);
          tscDebug("update passVer of user %s from %d to %d, conn:%" PRIi64, pRsp->user, oldVer,
                   atomic_load_32(&passInfo->ver), pTscObj->id);
        }
      }

      // update ip white list version
      {
        SWhiteListInfo *whiteListInfo = &pTscObj->whiteListInfo;
        int64_t         oldVer = 0, newVer = pRsp->whiteListVer;
        do {
          oldVer = atomic_load_64(&whiteListInfo->ver);
          if (oldVer >= newVer) {
            break;
          }
        } while (atomic_val_compare_exchange_64(&whiteListInfo->ver, oldVer, newVer) != oldVer);

        if (oldVer < newVer) {
          if (whiteListInfo->fp) {
            (*whiteListInfo->fp)(whiteListInfo->param, &newVer, TAOS_NOTIFY_WHITELIST_VER);
          }
          tscDebug("update whitelist version of user %s from %" PRId64 " to %" PRId64 ", conn:%" PRIi64, pRsp->user,
                   oldVer, newVer, pTscObj->id);
        }
      }

      // update date time whitelist version
      {
        SWhiteListInfo *whiteListInfo = &pTscObj->dateTimeWhiteListInfo;

        int64_t oldVer = 0, newVer = pRsp->timeWhiteListVer;
        do {
          oldVer = atomic_load_64(&whiteListInfo->ver);
          if (oldVer >= newVer) {
            break;
          }
        } while (atomic_val_compare_exchange_64(&whiteListInfo->ver, oldVer, newVer) != oldVer);

        if (oldVer < newVer) {
          if (whiteListInfo->fp) {
            (*whiteListInfo->fp)(whiteListInfo->param, &newVer, TAOS_NOTIFY_DATETIME_WHITELIST_VER);
          }
          tscDebug("update date time whitelist version of user %s from %" PRId64 " to %" PRId64 ", conn:%" PRIi64, pRsp->user,
                   oldVer, newVer, pTscObj->id);
        }
      }

      releaseTscObj(pReq->connKey.tscRid);
    }
  }
  return 0;
}
287

288
/*
 * Build a freshly allocated SDBVgInfo from a use-db response.
 *
 * Scalar fields are copied from rsp and a vgId -> SVgroupInfo hash is filled from
 * rsp->pVgroupInfos (rsp->vgNum entries are expected).
 *
 * @param pInfo  out: the new SDBVgInfo on success, NULL on any failure
 * @param rsp    use-db response to convert
 * @return 0 on success; terrno when allocation or hash operations fail;
 *         TSDB_CODE_OUT_OF_RANGE when rsp->vgNum exceeds the entries actually present
 */
static int32_t hbGenerateVgInfoFromRsp(SDBVgInfo **pInfo, SUseDbRsp *rsp) {
  int32_t    code = 0;
  SDBVgInfo *vgInfo = taosMemoryCalloc(1, sizeof(SDBVgInfo));
  if (NULL == vgInfo) {
    // Keep the out-parameter well defined on every failure path.
    *pInfo = NULL;
    return terrno;
  }

  vgInfo->vgVersion = rsp->vgVersion;
  vgInfo->stateTs = rsp->stateTs;
  vgInfo->flags = rsp->flags;
  vgInfo->hashMethod = rsp->hashMethod;
  vgInfo->hashPrefix = rsp->hashPrefix;
  vgInfo->hashSuffix = rsp->hashSuffix;
  vgInfo->vgHash = taosHashInit(rsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
  if (NULL == vgInfo->vgHash) {
    tscError("hash init[%d] failed", rsp->vgNum);
    code = terrno;
    goto _return;
  }

  for (int32_t j = 0; j < rsp->vgNum; ++j) {
    SVgroupInfo *pVgroup = taosArrayGet(rsp->pVgroupInfos, j);
    if (NULL == pVgroup) {
      // vgNum comes from the message; never trust it to match the array length.
      tscError("vgroup info missing in use db rsp, idx:%d, vgNum:%d", j, rsp->vgNum);
      code = TSDB_CODE_OUT_OF_RANGE;
      goto _return;
    }
    if (taosHashPut(vgInfo->vgHash, &pVgroup->vgId, sizeof(int32_t), pVgroup, sizeof(SVgroupInfo)) != 0) {
      tscError("hash push failed, terrno:%d", terrno);
      code = terrno;
      goto _return;
    }
  }

_return:
  if (code) {
    // Drop the partially built result; taosMemoryFreeClear leaves vgInfo NULL.
    taosHashCleanup(vgInfo->vgHash);
    taosMemoryFreeClear(vgInfo);
  }

  *pInfo = vgInfo;
  return code;
}
326

327
static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
8,457,229✔
328
  int32_t code = 0;
8,457,229✔
329

330
  SDbHbBatchRsp batchRsp = {0};
8,457,229✔
331
  if (tDeserializeSDbHbBatchRsp(value, valueLen, &batchRsp) != 0) {
8,457,229✔
332
    terrno = TSDB_CODE_INVALID_MSG;
×
333
    code = terrno;
×
334
    goto _return;
×
335
  }
336

337
  int32_t numOfBatchs = taosArrayGetSize(batchRsp.pArray);
8,457,229✔
338
  for (int32_t i = 0; i < numOfBatchs; ++i) {
10,264,914✔
339
    SDbHbRsp *rsp = taosArrayGet(batchRsp.pArray, i);
1,807,685✔
340
    if (NULL == rsp) {
1,807,685✔
341
      code = terrno;
×
342
      goto _return;
×
343
    }
344
    if (rsp->useDbRsp) {
1,807,685✔
345
      tscDebug("hb use db rsp, db:%s, vgVersion:%d, stateTs:%" PRId64 ", uid:%" PRIx64, rsp->useDbRsp->db,
1,326,601✔
346
               rsp->useDbRsp->vgVersion, rsp->useDbRsp->stateTs, rsp->useDbRsp->uid);
347

348
      if (rsp->useDbRsp->vgVersion < 0) {
1,326,601✔
349
        tscDebug("hb to remove db, db:%s", rsp->useDbRsp->db);
40,314✔
350
        code = catalogRemoveDB(pCatalog, rsp->useDbRsp->db, rsp->useDbRsp->uid);
40,314✔
351
      } else {
352
        SDBVgInfo *vgInfo = NULL;
1,286,287✔
353
        code = hbGenerateVgInfoFromRsp(&vgInfo, rsp->useDbRsp);
1,286,287✔
354
        if (TSDB_CODE_SUCCESS != code) {
1,286,287✔
355
          goto _return;
×
356
        }
357

358
        tscDebug("hb to update db vgInfo, db:%s", rsp->useDbRsp->db);
1,286,287✔
359

360
        TSC_ERR_JRET(catalogUpdateDBVgInfo(pCatalog, rsp->useDbRsp->db, rsp->useDbRsp->uid, vgInfo));
1,286,287✔
361

362
        if (IS_SYS_DBNAME(rsp->useDbRsp->db)) {
1,286,287✔
363
          code = hbGenerateVgInfoFromRsp(&vgInfo, rsp->useDbRsp);
317,515✔
364
          if (TSDB_CODE_SUCCESS != code) {
317,515✔
365
            goto _return;
×
366
          }
367

368
          TSC_ERR_JRET(catalogUpdateDBVgInfo(
317,515✔
369
              pCatalog, (rsp->useDbRsp->db[0] == 'i') ? TSDB_PERFORMANCE_SCHEMA_DB : TSDB_INFORMATION_SCHEMA_DB,
370
              rsp->useDbRsp->uid, vgInfo));
371
        }
372
      }
373
    }
374

375
    if (rsp->cfgRsp) {
1,807,685✔
376
      tscDebug("hb db cfg rsp, db:%s, cfgVersion:%d", rsp->cfgRsp->db, rsp->cfgRsp->cfgVersion);
57,401✔
377
      code = catalogUpdateDbCfg(pCatalog, rsp->cfgRsp->db, rsp->cfgRsp->dbId, rsp->cfgRsp);
57,401✔
378
      rsp->cfgRsp = NULL;
57,401✔
379
    }
380
    if (rsp->pTsmaRsp) {
1,807,685✔
381
      if (rsp->pTsmaRsp->pTsmas) {
971,926✔
382
        for (int32_t i = 0; i < rsp->pTsmaRsp->pTsmas->size; ++i) {
9,490✔
383
          STableTSMAInfo *pTsma = taosArrayGetP(rsp->pTsmaRsp->pTsmas, i);
6,132✔
384
          if (NULL == pTsma) {
6,132✔
385
            TSC_ERR_JRET(TSDB_CODE_OUT_OF_RANGE);
×
386
          }
387
          TSC_ERR_JRET(catalogAsyncUpdateTSMA(pCatalog, &pTsma, rsp->dbTsmaVersion));
6,132✔
388
        }
389
        taosArrayClear(rsp->pTsmaRsp->pTsmas);
3,358✔
390
      } else {
391
        TSC_ERR_JRET(catalogAsyncUpdateDbTsmaVersion(pCatalog, rsp->dbTsmaVersion, rsp->db, rsp->dbId));
968,568✔
392
      }
393
    }
394
  }
395

396
_return:
8,457,229✔
397

398
  tFreeSDbHbBatchRsp(&batchRsp);
8,457,229✔
399
  return code;
8,457,229✔
400
}
401

402
static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
5,750,506✔
403
  int32_t code = TSDB_CODE_SUCCESS;
5,750,506✔
404

405
  SSTbHbRsp hbRsp = {0};
5,750,506✔
406
  if (tDeserializeSSTbHbRsp(value, valueLen, &hbRsp) != 0) {
5,750,506✔
407
    terrno = TSDB_CODE_INVALID_MSG;
×
408
    return -1;
×
409
  }
410

411
  int32_t numOfMeta = taosArrayGetSize(hbRsp.pMetaRsp);
5,750,506✔
412
  for (int32_t i = 0; i < numOfMeta; ++i) {
5,801,567✔
413
    STableMetaRsp *rsp = taosArrayGet(hbRsp.pMetaRsp, i);
51,061✔
414
    if (NULL == rsp) {
51,061✔
415
      code = terrno;
×
416
      goto _return;
×
417
    }
418
    if (rsp->numOfColumns < 0) {
51,061✔
419
      tscDebug("hb to remove stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName);
26,480✔
420
      TSC_ERR_JRET(catalogRemoveStbMeta(pCatalog, rsp->dbFName, rsp->dbId, rsp->stbName, rsp->suid));
26,480✔
421
    } else {
422
      tscDebug("hb to update stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName);
24,581✔
423
      if (rsp->pSchemas[0].colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
24,581✔
424
        tscError("invalid colId[%" PRIi16 "] for the first column in table meta rsp msg", rsp->pSchemas[0].colId);
×
425
        tFreeSSTbHbRsp(&hbRsp);
×
426
        return TSDB_CODE_TSC_INVALID_VALUE;
×
427
      }
428

429
      TSC_ERR_JRET(catalogAsyncUpdateTableMeta(pCatalog, rsp));
24,581✔
430
    }
431
  }
432

433
  int32_t numOfIndex = taosArrayGetSize(hbRsp.pIndexRsp);
5,750,506✔
434
  for (int32_t i = 0; i < numOfIndex; ++i) {
5,750,506✔
435
    STableIndexRsp *rsp = taosArrayGet(hbRsp.pIndexRsp, i);
×
436
    if (NULL == rsp) {
×
437
      code = terrno;
×
438
      goto _return;
×
439
    }
440
    TSC_ERR_JRET(catalogUpdateTableIndex(pCatalog, rsp));
×
441
  }
442

443
_return:
5,750,506✔
444
  taosArrayDestroy(hbRsp.pIndexRsp);
5,750,506✔
445
  hbRsp.pIndexRsp = NULL;
5,750,506✔
446

447
  tFreeSSTbHbRsp(&hbRsp);
5,750,506✔
448
  return code;
5,750,506✔
449
}
450

451
static int32_t hbProcessDynViewRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
7,279✔
452
  return catalogUpdateDynViewVer(pCatalog, (SDynViewVersion *)value);
7,279✔
453
}
454

455
static void hbFreeSViewMetaInRsp(void *p) {
×
456
  if (NULL == p || NULL == *(void **)p) {
×
457
    return;
×
458
  }
459
  SViewMetaRsp *pRsp = *(SViewMetaRsp **)p;
×
460
  tFreeSViewMetaRsp(pRsp);
×
461
  taosMemoryFreeClear(pRsp);
×
462
}
463

464
static int32_t hbProcessViewInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
7,279✔
465
  int32_t code = TSDB_CODE_SUCCESS;
7,279✔
466

467
  SViewHbRsp hbRsp = {0};
7,279✔
468
  if (tDeserializeSViewHbRsp(value, valueLen, &hbRsp) != 0) {
7,279✔
469
    taosArrayDestroyEx(hbRsp.pViewRsp, hbFreeSViewMetaInRsp);
×
470
    terrno = TSDB_CODE_INVALID_MSG;
×
471
    return -1;
×
472
  }
473

474
  int32_t numOfMeta = taosArrayGetSize(hbRsp.pViewRsp);
7,279✔
475
  for (int32_t i = 0; i < numOfMeta; ++i) {
7,663✔
476
    SViewMetaRsp *rsp = taosArrayGetP(hbRsp.pViewRsp, i);
384✔
477
    if (NULL == rsp) {
384✔
478
      code = terrno;
×
479
      goto _return;
×
480
    }
481
    if (rsp->numOfCols < 0) {
384✔
482
      tscDebug("hb to remove view, db:%s, view:%s", rsp->dbFName, rsp->name);
384✔
483
      code = catalogRemoveViewMeta(pCatalog, rsp->dbFName, rsp->dbId, rsp->name, rsp->viewId);
384✔
484
      tFreeSViewMetaRsp(rsp);
384✔
485
      taosMemoryFreeClear(rsp);
384✔
486
    } else {
487
      tscDebug("hb to update view, db:%s, view:%s", rsp->dbFName, rsp->name);
×
488
      code = catalogUpdateViewMeta(pCatalog, rsp);
×
489
    }
490
    TSC_ERR_JRET(code);
384✔
491
  }
492

493
_return:
7,279✔
494
  taosArrayDestroy(hbRsp.pViewRsp);
7,279✔
495
  return code;
7,279✔
496
}
497

498
static int32_t hbprocessTSMARsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
9,344✔
499
  int32_t code = 0;
9,344✔
500

501
  STSMAHbRsp hbRsp = {0};
9,344✔
502
  if (tDeserializeTSMAHbRsp(value, valueLen, &hbRsp)) {
9,344✔
503
    terrno = TSDB_CODE_INVALID_MSG;
×
504
    return -1;
×
505
  }
506

507
  int32_t numOfTsma = taosArrayGetSize(hbRsp.pTsmas);
9,344✔
508
  for (int32_t i = 0; i < numOfTsma; ++i) {
9,344✔
509
    STableTSMAInfo *pTsmaInfo = taosArrayGetP(hbRsp.pTsmas, i);
×
510

511
    if (!pTsmaInfo->pFuncs) {
×
512
      tscDebug("hb to remove tsma:%s.%s", pTsmaInfo->dbFName, pTsmaInfo->name);
×
513
      code = catalogRemoveTSMA(pCatalog, pTsmaInfo);
×
514
      tFreeAndClearTableTSMAInfo(pTsmaInfo);
×
515
    } else {
516
      tscDebug("hb to update tsma:%s.%s", pTsmaInfo->dbFName, pTsmaInfo->name);
×
517
      code = catalogUpdateTSMA(pCatalog, &pTsmaInfo);
×
518
      tFreeAndClearTableTSMAInfo(pTsmaInfo);
×
519
    }
520
    TSC_ERR_JRET(code);
×
521
  }
522

523
_return:
9,344✔
524
  taosArrayDestroy(hbRsp.pTsmas);
9,344✔
525
  return code;
9,344✔
526
}
527

528
/*
 * Dispatch the key/value entries of one heartbeat response to their handlers.
 *
 * Every entry is validated (positive length, non-NULL value) before its handler runs;
 * an invalid entry or a failing handler is logged and the remaining entries are still
 * processed. The view-related keys are only compiled in when TD_ENTERPRISE is defined.
 *
 * @param kvNum      number of entries in pKvs
 * @param pKvs       array of SKv
 * @param pCatalog   catalog the handlers update
 * @param pAppHbMgr  heartbeat manager the response belongs to
 */
static void hbProcessQueryRspKvs(int32_t kvNum, SArray *pKvs, struct SCatalog *pCatalog, SAppHbMgr *pAppHbMgr) {
  for (int32_t i = 0; i < kvNum; ++i) {
    SKv *kv = taosArrayGet(pKvs, i);
    if (NULL == kv) {
      tscError("invalid hb kv, idx:%d", i);
      continue;
    }
    switch (kv->key) {
      case HEARTBEAT_KEY_USER_AUTHINFO: {
        if (kv->valueLen <= 0 || NULL == kv->value) {
          tscError("invalid hb user auth info, len:%d, value:%p", kv->valueLen, kv->value);
          break;
        }
        if (TSDB_CODE_SUCCESS != hbProcessUserAuthInfoRsp(kv->value, kv->valueLen, pCatalog, pAppHbMgr)) {
          tscError("process user auth info response faild, len:%d, value:%p", kv->valueLen, kv->value);
        }
        break;
      }
      case HEARTBEAT_KEY_DBINFO: {
        if (kv->valueLen <= 0 || NULL == kv->value) {
          tscError("invalid hb db info, len:%d, value:%p", kv->valueLen, kv->value);
          break;
        }
        if (TSDB_CODE_SUCCESS != hbProcessDBInfoRsp(kv->value, kv->valueLen, pCatalog)) {
          tscError("process db info response faild, len:%d, value:%p", kv->valueLen, kv->value);
        }
        break;
      }
      case HEARTBEAT_KEY_STBINFO: {
        if (kv->valueLen <= 0 || NULL == kv->value) {
          tscError("invalid hb stb info, len:%d, value:%p", kv->valueLen, kv->value);
          break;
        }
        if (TSDB_CODE_SUCCESS != hbProcessStbInfoRsp(kv->value, kv->valueLen, pCatalog)) {
          tscError("process stb info response faild, len:%d, value:%p", kv->valueLen, kv->value);
        }
        break;
      }
#ifdef TD_ENTERPRISE
      case HEARTBEAT_KEY_DYN_VIEW: {
        if (kv->valueLen <= 0 || NULL == kv->value) {
          tscError("invalid dyn view info, len:%d, value:%p", kv->valueLen, kv->value);
          break;
        }
        if (TSDB_CODE_SUCCESS != hbProcessDynViewRsp(kv->value, kv->valueLen, pCatalog)) {
          tscError("Process dyn view response failed, len:%d, value:%p", kv->valueLen, kv->value);
        }
        break;
      }
      case HEARTBEAT_KEY_VIEWINFO: {
        if (kv->valueLen <= 0 || NULL == kv->value) {
          tscError("invalid view info, len:%d, value:%p", kv->valueLen, kv->value);
          break;
        }
        if (TSDB_CODE_SUCCESS != hbProcessViewInfoRsp(kv->value, kv->valueLen, pCatalog)) {
          tscError("Process view info response failed, len:%d, value:%p", kv->valueLen, kv->value);
        }
        break;
      }
#endif
      case HEARTBEAT_KEY_TSMA: {
        if (kv->valueLen <= 0 || !kv->value) {
          tscError("Invalid tsma info, len:%d, value:%p", kv->valueLen, kv->value);
          // Like every other key: never hand an invalid payload to the handler.
          break;
        }
        if (TSDB_CODE_SUCCESS != hbprocessTSMARsp(kv->value, kv->valueLen, pCatalog)) {
          tscError("Process tsma info response failed, len:%d, value:%p", kv->valueLen, kv->value);
        }
        break;
      }
      default:
        tscError("invalid hb key type:%d", kv->key);
        break;
    }
  }
}
26,059,488✔
608

609
/*
 * Apply one per-connection heartbeat response.
 *
 * The connection must still be registered in pAppHbMgr->activeInfo; otherwise the
 * response is ignored. When the response has a query part and the connection object can
 * be acquired, the mnode ep set (only if more than one dnode is reported and the set
 * differs), dnode counters and connId are refreshed, a pending kill-query or
 * kill-connection request is executed and the qnode list is updated. Afterwards any
 * key/value entries are dispatched through hbProcessQueryRspKvs().
 *
 * @return always TSDB_CODE_SUCCESS; individual failures are only logged
 */
static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
  SClientHbReq *pReq = taosHashAcquire(pAppHbMgr->activeInfo, &pRsp->connKey, sizeof(SClientHbKey));
  if (NULL == pReq) {
    tscWarn("pReq to get activeInfo, may be dropped, refId:%" PRIx64 ", type:%d", pRsp->connKey.tscRid,
            pRsp->connKey.connType);
    return TSDB_CODE_SUCCESS;
  }

  if (pRsp->query) {
    STscObj *pTscObj = (STscObj *)acquireTscObj(pRsp->connKey.tscRid);
    if (NULL != pTscObj) {
      if (pRsp->query->totalDnodes > 1) {
        SEpSet curEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
        if (!isEpsetEqual(&curEpSet, &pRsp->query->epSet)) {
          SEp *pCurEp = &curEpSet.eps[curEpSet.inUse];
          SEp *pNewEp = &pRsp->query->epSet.eps[pRsp->query->epSet.inUse];
          tscDebug("mnode epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d in hb", curEpSet.inUse, curEpSet.numOfEps,
                   pCurEp->fqdn, pCurEp->port, pRsp->query->epSet.inUse, pRsp->query->epSet.numOfEps, pNewEp->fqdn,
                   pNewEp->port);

          updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &pRsp->query->epSet);
        }
      }

      pTscObj->pAppInfo->totalDnodes = pRsp->query->totalDnodes;
      pTscObj->pAppInfo->onlineDnodes = pRsp->query->onlineDnodes;
      pTscObj->connId = pRsp->query->connId;
      tscTrace("connId:%u, hb rsp, dnodes %d/%d", pTscObj->connId, pTscObj->pAppInfo->onlineDnodes,
               pTscObj->pAppInfo->totalDnodes);

      if (pRsp->query->killRid) {
        tscDebug("QID:0x%" PRIx64 ", need to be killed now", pRsp->query->killRid);
        SRequestObj *pKillReq = acquireRequest(pRsp->query->killRid);
        if (NULL != pKillReq) {
          taos_stop_query((TAOS_RES *)pKillReq);
          (void)releaseRequest(pRsp->query->killRid);
        } else {
          tscDebug("QID:0x%" PRIx64 ", not exist to kill", pRsp->query->killRid);
        }
      }

      if (pRsp->query->killConnection) {
        taos_close_internal(pTscObj);
      }

      if (pRsp->query->pQnodeList) {
        if (TSDB_CODE_SUCCESS != updateQnodeList(pTscObj->pAppInfo, pRsp->query->pQnodeList)) {
          tscWarn("update qnode list failed");
        }
      }

      releaseTscObj(pRsp->connKey.tscRid);
    } else {
      tscDebug("tscObj rid %" PRIx64 " not exist", pRsp->connKey.tscRid);
    }
  }

  int32_t kvNum = 0;
  if (pRsp->info) {
    kvNum = taosArrayGetSize(pRsp->info);
  }

  tscDebug("hb got %d rsp kv", kvNum);

  if (kvNum > 0) {
    struct SCatalog *pCatalog = NULL;
    int32_t          code = catalogGetHandle(pReq->clusterId, &pCatalog);
    if (TSDB_CODE_SUCCESS == code) {
      hbProcessQueryRspKvs(kvNum, pRsp->info, pCatalog, pAppHbMgr);
    } else {
      tscWarn("catalogGetHandle failed, clusterId:0x%" PRIx64 ", error:%s", pReq->clusterId, tstrerror(code));
    }
  }

  taosHashRelease(pAppHbMgr->activeInfo, pReq);

  return TSDB_CODE_SUCCESS;
}
685

686
/* Choose execution phase and phase start time for desc.
687
 * Rules:
688
 *  - If both timestamps are invalid (<= 0), keep the request's stored values.
689
 *  - If only one timestamp is valid, use the valid one.
690
 *  - If both are valid, prefer the job's phase/time when jobPhaseTime >= request's phaseStartTime;
691
 *    otherwise prefer the request's record.
692
 *
693
 * Note: If the local system clock was moved backwards (time rollback), jobPhaseTime may appear
694
 * earlier than the request's phase start time even when the job record is actually newer.
695
 * This logic avoids reporting a regressed start time in that situation.
696
 */
697
static void hbSetRequestPhase(SRequestObj *pRequest, SQueryDesc *desc) {
20,703,460✔
698
  int32_t jobPhase = QUERY_PHASE_NONE;
20,703,460✔
699
  int64_t jobPhaseTime = 0;
20,703,460✔
700
  int32_t phaseCode = schedulerGetJobPhase(pRequest->body.queryJob, &jobPhase, &jobPhaseTime);
20,703,460✔
701
  if (phaseCode != TSDB_CODE_SUCCESS) {
20,703,460✔
702
    tscWarn("get job phase failed, code:%d", phaseCode);
6,014,418✔
703
    desc->execPhase = CLIENT_GET_REQUEST_PHASE(pRequest);
6,014,418✔
704
    desc->phaseStartTime = CLIENT_GET_REQUEST_PHASE_START_TIME(pRequest);
6,014,418✔
705
    return;
6,014,418✔
706
  }
707

708
  tscDebug("get job phase success, jobPhase:%d, jobPhaseTime:%" PRId64, jobPhase, jobPhaseTime);
14,689,042✔
709
  int64_t phaseStartTime = CLIENT_GET_REQUEST_PHASE_START_TIME(pRequest);
14,689,042✔
710
  int32_t phaseStatus = CLIENT_GET_REQUEST_PHASE(pRequest);
14,689,042✔
711

712
  if (jobPhaseTime <= 0 && phaseStartTime <= 0) {
14,689,042✔
713
    /* No valid time available, keep original behavior */
714
    desc->phaseStartTime = phaseStartTime;
×
715
    desc->execPhase = phaseStatus;
×
716
  } else if (jobPhaseTime <= 0) {
14,689,042✔
717
    desc->phaseStartTime = phaseStartTime;
4,462✔
718
    desc->execPhase = phaseStatus;
4,462✔
719
  } else if (phaseStartTime <= 0) {
14,684,580✔
720
    desc->phaseStartTime = jobPhaseTime;
32,702✔
721
    desc->execPhase = jobPhase;
32,702✔
722
  } else if (jobPhaseTime >= phaseStartTime) {
14,651,878✔
723
    /* Job record is newer (or equal, prefer job) */
724
    desc->phaseStartTime = jobPhaseTime;
12,479,296✔
725
    desc->execPhase = jobPhase;
12,479,296✔
726
  } else {
727
    /* Request record is newer */
728
    desc->phaseStartTime = phaseStartTime;
2,172,582✔
729
    desc->execPhase = phaseStatus;
2,172,582✔
730
  }
731
}
732

733
/*
 * Completion callback for one heartbeat batch request.
 *
 * param : points to an int32_t index into clientHbMgr.appHbMgrs.
 * pMsg  : response buffer; pMsg->pData and pMsg->pEpSet are freed here on every path.
 * code  : transport result; on success the payload is deserialized and dispatched.
 *
 * Returns the first error encountered (transport, deserialize, clock skew,
 * missing app hb manager, or a response handler failure), otherwise 0.
 */
static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) {
  if (0 == atomic_load_8(&clientHbMgr.inited)) {
    goto _return;
  }

  static int32_t    emptyRspNum = 0;
  int32_t           idx = *(int32_t *)param;
  SClientHbBatchRsp pRsp = {0};
  if (TSDB_CODE_SUCCESS == code) {
    code = tDeserializeSClientHbBatchRsp(pMsg->pData, pMsg->len, &pRsp);
    if (TSDB_CODE_SUCCESS != code) {
      tscError("deserialize hb rsp failed");
    } else {
      /*
       * Only compare clocks when the payload was decoded. Checking the
       * timestamp of a response that failed to decode would replace the real
       * error with TSDB_CODE_TIME_UNSYNCED.
       */
      int32_t now = taosGetTimestampSec();
      int32_t delta = abs(now - pRsp.svrTimestamp);
      if (delta > tsTimestampDeltaLimit) {
        code = TSDB_CODE_TIME_UNSYNCED;
        tscError("time diff:%ds is too big", delta);
      }
    }
  }

  int32_t rspNum = taosArrayGetSize(pRsp.rsps);

  (void)taosThreadMutexLock(&clientHbMgr.lock);

  SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, idx);
  if (pAppHbMgr == NULL) {
    (void)taosThreadMutexUnlock(&clientHbMgr.lock);
    tscError("appHbMgr not exist, idx:%d", idx);
    code = TSDB_CODE_OUT_OF_RANGE;
    goto _cleanup;
  }

  SAppInstInfo *pInst = pAppHbMgr->pAppInstInfo;

  if (code != 0) {
    /* Heartbeat failed: report no online dnodes (or -1 when the total is zero). */
    pInst->onlineDnodes = pInst->totalDnodes ? 0 : -1;
    tscDebug("hb rsp error %s, update server status %d/%d", tstrerror(code), pInst->onlineDnodes, pInst->totalDnodes);
    (void)taosThreadMutexUnlock(&clientHbMgr.lock);
    goto _cleanup;
  }

  /* Mirror the server-side settings carried by the heartbeat response. */
  pInst->serverCfg.monitorParas = pRsp.monitorParas;
  pInst->serverCfg.enableAuditDelete = pRsp.enableAuditDelete;
  pInst->serverCfg.enableAuditSelect = pRsp.enableAuditSelect;
  pInst->serverCfg.enableAuditInsert = pRsp.enableAuditInsert;
  pInst->serverCfg.auditLevel = pRsp.auditLevel;
  pInst->serverCfg.enableStrongPass = pRsp.enableStrongPass;
  tsEnableStrongPassword = pInst->serverCfg.enableStrongPass;
  tscDebug("monitor paras from hb, clusterId:0x%" PRIx64 ", threshold:%d scope:%d", pInst->clusterId,
           pRsp.monitorParas.tsSlowLogThreshold, pRsp.monitorParas.tsSlowLogScope);

  if (rspNum) {
    tscDebug("hb got %d rsp, %d empty rsp received before", rspNum,
             atomic_val_compare_exchange_32(&emptyRspNum, emptyRspNum, 0));
  } else {
    (void)atomic_add_fetch_32(&emptyRspNum, 1);
  }

  /* Dispatch each per-connection response; stop at the first handler error. */
  for (int32_t i = 0; i < rspNum; ++i) {
    SClientHbRsp *rsp = taosArrayGet(pRsp.rsps, i);
    code = (*clientHbMgr.rspHandle[rsp->connKey.connType])(pAppHbMgr, rsp);
    if (code) {
      break;
    }
  }

  (void)taosThreadMutexUnlock(&clientHbMgr.lock);

_cleanup:
  tFreeClientHbBatchRsp(&pRsp);

_return:
  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);
  return code;
}
814

815
/*
 * Append one SQueryDesc to hbBasic->queryDesc for every live request of pObj
 * that is not killed and has a scheduler job.
 *
 * Returns 0 on success. On failure the descriptors already pushed stay in
 * hbBasic->queryDesc; the caller is responsible for destroying that array.
 * Every early exit cancels the in-flight hash iterator so the iteration state
 * held on pObj->pRequests is released.
 */
int32_t hbBuildQueryDesc(SQueryHbReqBasic *hbBasic, STscObj *pObj) {
  int64_t    now = taosGetTimestampUs();
  SQueryDesc desc = {0};
  int32_t    code = 0;

  void *pIter = taosHashIterate(pObj->pRequests, NULL);
  while (pIter != NULL) {
    int64_t     *rid = pIter;
    SRequestObj *pRequest = acquireRequest(*rid);
    if (NULL == pRequest) {
      pIter = taosHashIterate(pObj->pRequests, pIter);
      continue;
    }

    if (pRequest->killed || 0 == pRequest->body.queryJob) {
      (void)releaseRequest(*rid);
      pIter = taosHashIterate(pObj->pRequests, pIter);
      continue;
    }

    tstrncpy(desc.sql, pRequest->sqlstr, sizeof(desc.sql));
    desc.stime = pRequest->metric.start / 1000;
    desc.queryId = pRequest->requestId;
    desc.useconds = now - pRequest->metric.start;
    desc.reqRid = pRequest->self;
    desc.stableQuery = pRequest->stableQuery;
    desc.isSubQuery = pRequest->isSubReq;
    code = taosGetFqdn(desc.fqdn);
    if (TSDB_CODE_SUCCESS != code) {
      (void)releaseRequest(*rid);
      tscError("get fqdn failed");
      taosHashCancelIterate(pObj->pRequests, pIter);
      return TSDB_CODE_FAILED;
    }
    desc.subPlanNum = pRequest->body.subplanNum;

    /* Determine and set the execution phase and its start time for this request. */
    hbSetRequestPhase(pRequest, &desc);

    if (desc.subPlanNum) {
      desc.subDesc = taosArrayInit(desc.subPlanNum, sizeof(SQuerySubDesc));
      if (NULL == desc.subDesc) {
        code = terrno; /* capture before any cleanup call can overwrite it */
        (void)releaseRequest(*rid);
        taosHashCancelIterate(pObj->pRequests, pIter);
        return code;
      }

      code = schedulerGetTasksStatus(pRequest->body.queryJob, desc.subDesc);
      if (code) {
        /* Task status is best effort: report the query without sub-plan details. */
        taosArrayDestroy(desc.subDesc);
        desc.subDesc = NULL;
        code = TSDB_CODE_SUCCESS;
      }
      desc.subPlanNum = taosArrayGetSize(desc.subDesc);
    } else {
      desc.subDesc = NULL;
    }

    (void)releaseRequest(*rid);
    if (NULL == taosArrayPush(hbBasic->queryDesc, &desc)) {
      code = terrno; /* capture before any cleanup call can overwrite it */
      taosArrayDestroy(desc.subDesc);
      taosHashCancelIterate(pObj->pRequests, pIter);
      return code;
    }

    pIter = taosHashIterate(pObj->pRequests, pIter);
  }

  return code;
}
882

883
/*
 * Build the SQueryHbReqBasic for the connection identified by connKey and
 * attach it to req->query.
 *
 * The connection object is acquired for the duration of the call and released
 * on every path. When the connection has no tracked requests, an entry with
 * only connId set is attached. Returns 0 on success, otherwise terrno or the
 * error from hbBuildQueryDesc; on failure nothing is attached to req.
 */
int32_t hbGetQueryBasicInfo(SClientHbKey *connKey, SClientHbReq *req) {
  STscObj *pConn = (STscObj *)acquireTscObj(connKey->tscRid);
  if (pConn == NULL) {
    tscWarn("tscObj rid 0x%" PRIx64 " not exist", connKey->tscRid);
    return terrno;
  }

  SQueryHbReqBasic *pBasic = (SQueryHbReqBasic *)taosMemoryCalloc(1, sizeof(SQueryHbReqBasic));
  if (pBasic == NULL) {
    tscError("calloc %d failed", (int32_t)sizeof(SQueryHbReqBasic));
    releaseTscObj(connKey->tscRid);
    return terrno;
  }

  pBasic->connId = pConn->connId;

  int32_t queryCnt = 0;
  if (pConn->pRequests) {
    queryCnt = taosHashGetSize(pConn->pRequests);
  }

  if (queryCnt <= 0) {
    /* Nothing running on this connection: report the bare connection entry. */
    req->query = pBasic;
    releaseTscObj(connKey->tscRid);
    tscDebug("no queries on connection");
    return TSDB_CODE_SUCCESS;
  }

  pBasic->queryDesc = taosArrayInit(queryCnt, sizeof(SQueryDesc));
  if (pBasic->queryDesc == NULL) {
    tscWarn("taosArrayInit %d queryDesc failed", queryCnt);
    releaseTscObj(connKey->tscRid);
    taosMemoryFree(pBasic);
    return terrno;
  }

  int32_t code = hbBuildQueryDesc(pBasic, pConn);
  if (0 == code) {
    req->query = pBasic;
    releaseTscObj(connKey->tscRid);
    return TSDB_CODE_SUCCESS;
  }

  /* Building failed part-way: drop whatever descriptors were collected. */
  releaseTscObj(connKey->tscRid);
  if (pBasic->queryDesc) {
    taosArrayDestroyEx(pBasic->queryDesc, tFreeClientHbQueryDesc);
  }
  taosMemoryFree(pBasic);
  return code;
}
930

931
/*
 * Make sure the heartbeat request asks the server for the auth info of the
 * connection's user, by storing version -1 (network byte order) under
 * HEARTBEAT_KEY_USER_AUTHINFO in req->info.
 *
 * Three cases:
 *   - key present and user listed  : overwrite that user's version with -1;
 *   - key present, user not listed : grow the value array by one entry;
 *   - key absent                   : allocate a one-entry value and insert it
 *                                    (creating req->info if needed).
 *
 * Returns 0 on success, otherwise terrno / TSDB_CODE_APP_ERROR.
 */
static int32_t hbGetUserAuthInfo(SClientHbKey *connKey, SHbParam *param, SClientHbReq *req) {
  STscObj *pTscObj = (STscObj *)acquireTscObj(connKey->tscRid);
  if (!pTscObj) {
    tscWarn("tscObj rid 0x%" PRIx64 " not exist", connKey->tscRid);
    return terrno;
  }

  int32_t code = 0;

  SKv  kv = {.key = HEARTBEAT_KEY_USER_AUTHINFO};
  SKv *pKv = NULL;
  if ((pKv = taosHashGet(req->info, &kv.key, sizeof(kv.key)))) {
    int32_t           userNum = pKv->valueLen / sizeof(SUserAuthVersion);
    SUserAuthVersion *userAuths = (SUserAuthVersion *)pKv->value;
    for (int32_t i = 0; i < userNum; ++i) {
      SUserAuthVersion *pUserAuth = userAuths + i;
      // both key and user exist, update version
      if (strncmp(pUserAuth->user, pTscObj->user, TSDB_USER_LEN) == 0) {
        pUserAuth->version = htonl(-1);  // force get userAuthInfo
        goto _return;
      }
    }
    // key exists, user not exist, append user
    SUserAuthVersion *qUserAuth =
        (SUserAuthVersion *)taosMemoryRealloc(pKv->value, (userNum + 1) * sizeof(SUserAuthVersion));
    if (qUserAuth) {
      tstrncpy((qUserAuth + userNum)->user, pTscObj->user, TSDB_USER_LEN);
      (qUserAuth + userNum)->version = htonl(-1);  // force get userAuthInfo
      pKv->value = qUserAuth;
      pKv->valueLen += sizeof(SUserAuthVersion);
    } else {
      // realloc failed: pKv->value is untouched and still owned by the hash entry
      code = terrno;
    }
    goto _return;
  }

  // key/user not exist, add user
  SUserAuthVersion *user = taosMemoryMalloc(sizeof(SUserAuthVersion));
  if (!user) {
    code = terrno;
    goto _return;
  }
  tstrncpy(user->user, pTscObj->user, TSDB_USER_LEN);
  user->version = htonl(-1);  // force get userAuthInfo
  kv.valueLen = sizeof(SUserAuthVersion);
  kv.value = user;

  tscDebug("hb got user auth info, valueLen:%d, user:%s, authVer:%d, tscRid:%" PRIi64, kv.valueLen, user->user,
           pTscObj->authVer, connKey->tscRid);

  if (!req->info) {
    req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
    if (NULL == req->info) {
      code = terrno;
      // `user` was never stored anywhere, so it must be released here
      taosMemoryFree(user);
      goto _return;
    }
  }

  if (taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)) != 0) {
    // read terrno before the free so the reported error is the one from the put
    code = terrno ? terrno : TSDB_CODE_APP_ERROR;
    taosMemoryFree(user);
    goto _return;
  }

_return:
  releaseTscObj(connKey->tscRid);
  if (code) {
    tscError("hb got user auth info failed since %s", tstrerror(code));
  }

  return code;
}
1003

1004
/*
 * Fetch the expired user auth versions from the catalog and attach them to
 * req->info under HEARTBEAT_KEY_USER_AUTHINFO, with each version converted to
 * network byte order.
 *
 * On success the array is referenced by the stored SKv; on any failure after
 * the catalog call it is freed here. Returns 0 when nothing is expired.
 */
int32_t hbGetExpiredUserInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) {
  SUserAuthVersion *pExpired = NULL;
  uint32_t          expiredCnt = 0;

  int32_t code = catalogGetExpiredUsers(pCatalog, &pExpired, &expiredCnt);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  if (0 == expiredCnt) {
    taosMemoryFree(pExpired);
    return TSDB_CODE_SUCCESS;
  }

  for (SUserAuthVersion *pCur = pExpired, *pEnd = pExpired + expiredCnt; pCur < pEnd; ++pCur) {
    pCur->version = htonl(pCur->version);
  }

  SKv kv = {0};
  kv.key = HEARTBEAT_KEY_USER_AUTHINFO;
  kv.valueLen = sizeof(SUserAuthVersion) * expiredCnt;
  kv.value = pExpired;

  tscDebug("hb got %d expired users, valueLen:%d", expiredCnt, kv.valueLen);

  if (NULL == req->info) {
    req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
    if (NULL == req->info) {
      taosMemoryFree(pExpired);
      return terrno;
    }
  }

  code = taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv));
  if (code != TSDB_CODE_SUCCESS) {
    /* The entry was not stored, so the array is still ours to release. */
    taosMemoryFree(pExpired);
    return code;
  }

  return TSDB_CODE_SUCCESS;
}
1048

1049
/*
 * Fetch the expired database cache entries from the catalog and attach them to
 * req->info under HEARTBEAT_KEY_DBINFO, with the numeric fields converted to
 * big-endian / network byte order.
 *
 * On success the array is referenced by the stored SKv; on any failure after
 * the catalog call it is freed here. Returns 0 when nothing is expired.
 */
int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) {
  SDbCacheInfo *pExpired = NULL;
  uint32_t      expiredCnt = 0;

  int32_t code = catalogGetExpiredDBs(pCatalog, &pExpired, &expiredCnt);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  if (0 == expiredCnt) {
    taosMemoryFree(pExpired);
    return TSDB_CODE_SUCCESS;
  }

  for (int32_t i = 0; i < expiredCnt; ++i) {
    SDbCacheInfo *pDb = pExpired + i;

    /* Log while the fields are still in host order. */
    tscDebug("the %dth expired db:%s, dbId:%" PRId64
             ", vgVersion:%d, cfgVersion:%d, numOfTable:%d, startTs:%" PRId64,
             i, pDb->dbFName, pDb->dbId, pDb->vgVersion, pDb->cfgVersion, pDb->numOfTable, pDb->stateTs);

    pDb->dbId = htobe64(pDb->dbId);
    pDb->vgVersion = htonl(pDb->vgVersion);
    pDb->cfgVersion = htonl(pDb->cfgVersion);
    pDb->numOfTable = htonl(pDb->numOfTable);
    pDb->stateTs = htobe64(pDb->stateTs);
    pDb->tsmaVersion = htonl(pDb->tsmaVersion);
  }

  SKv kv = {0};
  kv.key = HEARTBEAT_KEY_DBINFO;
  kv.valueLen = sizeof(SDbCacheInfo) * expiredCnt;
  kv.value = pExpired;

  tscDebug("hb got %d expired db, valueLen:%d", expiredCnt, kv.valueLen);

  if (NULL == req->info) {
    req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
    if (NULL == req->info) {
      taosMemoryFree(pExpired);
      return terrno;
    }
  }

  code = taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv));
  if (code != TSDB_CODE_SUCCESS) {
    /* The entry was not stored, so the array is still ours to release. */
    taosMemoryFree(pExpired);
    return code;
  }

  return TSDB_CODE_SUCCESS;
}
1102

1103
/*
 * Fetch the expired super-table versions from the catalog and attach them to
 * req->info under HEARTBEAT_KEY_STBINFO, with the numeric fields converted to
 * big-endian / network byte order.
 *
 * On success the array is referenced by the stored SKv; on any failure after
 * the catalog call it is freed here. Returns 0 when nothing is expired.
 */
int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) {
  SSTableVersion *pExpired = NULL;
  uint32_t        expiredCnt = 0;

  int32_t code = catalogGetExpiredSTables(pCatalog, &pExpired, &expiredCnt);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  if (0 == expiredCnt) {
    taosMemoryFree(pExpired);
    return TSDB_CODE_SUCCESS;
  }

  for (SSTableVersion *pStb = pExpired, *pEnd = pExpired + expiredCnt; pStb < pEnd; ++pStb) {
    pStb->suid = htobe64(pStb->suid);
    pStb->sversion = htonl(pStb->sversion);
    pStb->tversion = htonl(pStb->tversion);
    pStb->smaVer = htonl(pStb->smaVer);
  }

  SKv kv = {0};
  kv.key = HEARTBEAT_KEY_STBINFO;
  kv.valueLen = sizeof(SSTableVersion) * expiredCnt;
  kv.value = pExpired;

  tscDebug("hb got %d expired stb, valueLen:%d", expiredCnt, kv.valueLen);

  if (NULL == req->info) {
    req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
    if (NULL == req->info) {
      taosMemoryFree(pExpired);
      return terrno;
    }
  }

  code = taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv));
  if (code != TSDB_CODE_SUCCESS) {
    /* The entry was not stored, so the array is still ours to release. */
    taosMemoryFree(pExpired);
    return code;
  }

  return TSDB_CODE_SUCCESS;
}
1150

1151
/*
 * Fetch the expired view versions (and the dynamic-view version record) from
 * the catalog and attach them to req->info under HEARTBEAT_KEY_DYN_VIEW and
 * HEARTBEAT_KEY_VIEWINFO. View fields are converted to big-endian / network
 * byte order first.
 *
 * Ownership: a buffer belongs to this function until the taosHashPut that
 * stores it succeeds; after that the SKv held in req->info references it and
 * it must not be freed here. Returns 0 on success or when nothing is expired.
 */
int32_t hbGetExpiredViewInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) {
  SViewVersion    *views = NULL;
  uint32_t         viewNum = 0;
  int32_t          code = 0;
  SDynViewVersion *pDynViewVer = NULL;

  TSC_ERR_JRET(catalogGetExpiredViews(pCatalog, &views, &viewNum, &pDynViewVer));

  if (viewNum <= 0) {
    taosMemoryFree(views);
    taosMemoryFree(pDynViewVer);
    return TSDB_CODE_SUCCESS;
  }

  for (int32_t i = 0; i < viewNum; ++i) {
    SViewVersion *view = &views[i];
    view->dbId = htobe64(view->dbId);
    view->viewId = htobe64(view->viewId);
    view->version = htonl(view->version);
  }

  tscDebug("hb got %u expired view, valueLen:%lu", viewNum, sizeof(SViewVersion) * viewNum);

  if (NULL == req->info) {
    req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
    if (NULL == req->info) {
      TSC_ERR_JRET(terrno);
    }
  }

  SKv kv = {
      .key = HEARTBEAT_KEY_DYN_VIEW,
      .valueLen = sizeof(SDynViewVersion),
      .value = pDynViewVer,
  };

  TSC_ERR_JRET(taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)));
  /*
   * The entry in req->info now references this buffer. Drop the local pointer
   * so that a failure of the second put below does not free memory the hash
   * entry still points to.
   */
  pDynViewVer = NULL;

  kv.key = HEARTBEAT_KEY_VIEWINFO;
  kv.valueLen = sizeof(SViewVersion) * viewNum;
  kv.value = views;

  TSC_ERR_JRET(taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)));
  return TSDB_CODE_SUCCESS;
_return:
  /* Only buffers that were never stored in req->info are released here. */
  taosMemoryFree(views);
  taosMemoryFree(pDynViewVer);
  return code;
}
1200

1201
/*
 * Fetch the expired TSMA versions from the catalog and attach them to
 * pReq->info under HEARTBEAT_KEY_TSMA, with the numeric fields converted to
 * big-endian / network byte order.
 *
 * On success the array is referenced by the stored SKv; on every failure it is
 * freed here. Returns 0 when nothing is expired.
 */
int32_t hbGetExpiredTSMAInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *pReq) {
  STSMAVersion *pExpired = NULL;
  uint32_t      expiredCnt = 0;

  int32_t code = catalogGetExpiredTsmas(pCatalog, &pExpired, &expiredCnt);
  if (code) {
    taosMemoryFree(pExpired);
    return code;
  }

  if (0 == expiredCnt) {
    taosMemoryFree(pExpired);
    return TSDB_CODE_SUCCESS;
  }

  for (STSMAVersion *pTsma = pExpired, *pEnd = pExpired + expiredCnt; pTsma < pEnd; ++pTsma) {
    pTsma->dbId = htobe64(pTsma->dbId);
    pTsma->tsmaId = htobe64(pTsma->tsmaId);
    pTsma->version = htonl(pTsma->version);
  }

  tscDebug("hb got %d expred tsmas, valueLen:%lu", expiredCnt, sizeof(STSMAVersion) * expiredCnt);

  if (!pReq->info) {
    pReq->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
    if (!pReq->info) {
      taosMemoryFree(pExpired);
      return terrno;
    }
  }

  SKv kv = {0};
  kv.key = HEARTBEAT_KEY_TSMA;
  kv.valueLen = sizeof(STSMAVersion) * expiredCnt;
  kv.value = pExpired;

  code = taosHashPut(pReq->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv));
  if (code != TSDB_CODE_SUCCESS) {
    /* The entry was not stored, so the array is still ours to release. */
    taosMemoryFree(pExpired);
    return code;
  }
  return TSDB_CODE_SUCCESS;
}
1242

1243
/*
 * Fill req->app for the given cluster.
 *
 * If clientHbMgr.appSummary already holds an entry for clusterId, it is copied
 * verbatim. Otherwise the summary is zeroed and pid, appId and the application
 * name are filled in directly. Returns 0 on success, or the error from
 * taosGetAppName.
 */
int32_t hbGetAppInfo(int64_t clusterId, SClientHbReq *req) {
  SAppHbReq *pCached = taosHashGet(clientHbMgr.appSummary, &clusterId, sizeof(clusterId));
  if (pCached != NULL) {
    (void)memcpy(&req->app, pCached, sizeof(*pCached));
    return TSDB_CODE_SUCCESS;
  }

  /* No gathered summary for this cluster: report an empty one. */
  (void)memset(&req->app.summary, 0, sizeof(req->app.summary));
  req->app.pid = taosGetPId();
  req->app.appId = clientHbMgr.appId;
  TSC_ERR_RET(taosGetAppName(req->app.name, NULL));

  return TSDB_CODE_SUCCESS;
}
1256

1257
/*
 * Request handler registered for query and TMQ connections: fills `req` for
 * one connection.
 *
 * Every connection gets its query descriptors and the app info. Only the first
 * connection handled with a given SHbParam (reqCnt == 0) additionally carries
 * the catalog-derived sections: expired users, user auth info, expired DBs,
 * super tables, views (enterprise builds) and TSMAs. reqCnt is incremented
 * only when the whole handler succeeds.
 *
 * `param` must point to an SHbParam. Returns 0 on success or the first error.
 */
int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) {
  SHbParam *pHb = (SHbParam *)param;
  SCatalog *pCtg = NULL;

  int32_t code = hbGetQueryBasicInfo(connKey, req);
  if (TSDB_CODE_SUCCESS != code) {
    tscWarn("hbGetQueryBasicInfo failed, clusterId:0x%" PRIx64 ", error:%s", pHb->clusterId, tstrerror(code));
    return code;
  }

  if (pHb->reqCnt != 0) {
    /* Not the first connection with this param: app info is all that is added. */
    code = hbGetAppInfo(pHb->clusterId, req);
    if (TSDB_CODE_SUCCESS != code) {
      tscWarn("hbGetAppInfo failed, clusterId:0x%" PRIx64 ", error:%s", pHb->clusterId, tstrerror(code));
      return code;
    }

    ++pHb->reqCnt;
    return TSDB_CODE_SUCCESS;
  }

  code = catalogGetHandle(pHb->clusterId, &pCtg);
  if (TSDB_CODE_SUCCESS != code) {
    tscWarn("catalogGetHandle failed, clusterId:0x%" PRIx64 ", error:%s", pHb->clusterId, tstrerror(code));
    return code;
  }

  code = hbGetAppInfo(pHb->clusterId, req);
  if (TSDB_CODE_SUCCESS != code) {
    tscWarn("getAppInfo failed, clusterId:0x%" PRIx64 ", error:%s", pHb->clusterId, tstrerror(code));
    return code;
  }

  /* Expired users are gathered only if this cluster id is not yet recorded in appHbHash. */
  if (NULL == taosHashGet(clientHbMgr.appHbHash, &pHb->clusterId, sizeof(pHb->clusterId))) {
    code = hbGetExpiredUserInfo(connKey, pCtg, req);
    if (TSDB_CODE_SUCCESS != code) {
      tscWarn("hbGetExpiredUserInfo failed, clusterId:0x%" PRIx64 ", error:%s", pHb->clusterId, tstrerror(code));
      return code;
    }
    if (clientHbMgr.appHbHash) {
      code = taosHashPut(clientHbMgr.appHbHash, &pHb->clusterId, sizeof(uint64_t), NULL, 0);
      if (TSDB_CODE_SUCCESS != code) {
        tscWarn("hbQueryHbReqHandle put clusterId failed, clusterId:0x%" PRIx64 ", error:%s", pHb->clusterId,
                tstrerror(code));
        return code;
      }
    }
  }

  // invoke after hbGetExpiredUserInfo
  if (2 != atomic_load_8(&pHb->pAppHbMgr->connHbFlag)) {
    code = hbGetUserAuthInfo(connKey, pHb, req);
    if (TSDB_CODE_SUCCESS != code) {
      tscWarn("hbGetUserAuthInfo failed, clusterId:0x%" PRIx64 ", error:%s", pHb->clusterId, tstrerror(code));
      return code;
    }
    atomic_store_8(&pHb->pAppHbMgr->connHbFlag, 1);
  }

  code = hbGetExpiredDBInfo(connKey, pCtg, req);
  if (TSDB_CODE_SUCCESS != code) {
    tscWarn("hbGetExpiredDBInfo failed, clusterId:0x%" PRIx64 ", error:%s", pHb->clusterId, tstrerror(code));
    return code;
  }

  code = hbGetExpiredStbInfo(connKey, pCtg, req);
  if (TSDB_CODE_SUCCESS != code) {
    tscWarn("hbGetExpiredStbInfo failed, clusterId:0x%" PRIx64 ", error:%s", pHb->clusterId, tstrerror(code));
    return code;
  }

#ifdef TD_ENTERPRISE
  code = hbGetExpiredViewInfo(connKey, pCtg, req);
  if (TSDB_CODE_SUCCESS != code) {
    tscWarn("hbGetExpiredViewInfo failed, clusterId:0x%" PRIx64 ", error:%s", pHb->clusterId, tstrerror(code));
    return code;
  }
#endif

  code = hbGetExpiredTSMAInfo(connKey, pCtg, req);
  if (TSDB_CODE_SUCCESS != code) {
    tscWarn("hbGetExpiredTSMAInfo failed, clusterId:0x%" PRIx64 ", error:%s", pHb->clusterId, tstrerror(code));
    return code;
  }

  ++pHb->reqCnt;  // success to get catalog info

  return TSDB_CODE_SUCCESS;
}
1343

1344
static FORCE_INLINE void hbMgrInitHandle() {
1345
  // init all handle
1346
  clientHbMgr.reqHandle[CONN_TYPE__QUERY] = hbQueryHbReqHandle;
1,401,665✔
1347
  clientHbMgr.reqHandle[CONN_TYPE__TMQ] = hbQueryHbReqHandle;
1,401,665✔
1348

1349
  clientHbMgr.rspHandle[CONN_TYPE__QUERY] = hbQueryHbRspHandle;
1,401,665✔
1350
  clientHbMgr.rspHandle[CONN_TYPE__TMQ] = hbQueryHbRspHandle;
1,401,665✔
1351
}
1,401,665✔
1352

1353
/*
 * Build one heartbeat batch request for pAppHbMgr.
 *
 * Allocates *pBatchReq and appends one SClientHbReq per active connection
 * whose STscObj can be acquired and is not dropped, then lets the handler
 * registered for the connection type fill in the details. A handler failure
 * is only logged; the request stays in the batch. ipWhiteListVer is set to the
 * largest white-list version seen across the connections.
 *
 * On failure *pBatchReq is left NULL, so a caller that frees it after an
 * error does not free it a second time.
 * NOTE(review): this assumes tFreeClientHbBatchReq accepts NULL - confirm.
 *
 * Returns 0 on success, otherwise terrno from the failed allocation.
 */
int32_t hbGatherAllInfo(SAppHbMgr *pAppHbMgr, SClientHbBatchReq **pBatchReq) {
  *pBatchReq = taosMemoryCalloc(1, sizeof(SClientHbBatchReq));
  // check the allocation itself, not the address of the out-parameter
  if (*pBatchReq == NULL) {
    return terrno;
  }
  int32_t connKeyCnt = atomic_load_32(&pAppHbMgr->connKeyCnt);
  (*pBatchReq)->reqs = taosArrayInit(connKeyCnt, sizeof(SClientHbReq));
  if (!(*pBatchReq)->reqs) {
    int32_t code = terrno;  // capture before the free can overwrite it
    tFreeClientHbBatchReq(*pBatchReq);
    *pBatchReq = NULL;  // do not hand a freed pointer back to the caller
    return code;
  }

  int64_t  maxIpWhiteVer = 0;
  void    *pIter = NULL;
  SHbParam param = {0};
  while ((pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter))) {
    SClientHbReq *pOneReq = pIter;
    SClientHbKey *connKey = &pOneReq->connKey;
    STscObj      *pTscObj = (STscObj *)acquireTscObj(connKey->tscRid);

    // skip connections that are gone or already marked as dropped
    if (!pTscObj || atomic_load_8(&pTscObj->dropped) == 1) {
      if (pTscObj) releaseTscObj(connKey->tscRid);
      continue;
    }

    tstrncpy(pOneReq->userApp, pTscObj->optionInfo.userApp, sizeof(pOneReq->userApp));
    tstrncpy(pOneReq->cInfo, pTscObj->optionInfo.cInfo, sizeof(pOneReq->cInfo));
    pOneReq->userIp = pTscObj->optionInfo.userIp;
    pOneReq->userDualIp = pTscObj->optionInfo.userDualIp;
    tstrncpy(pOneReq->sVer, td_version, TSDB_VERSION_LEN);

    // from here on pOneReq is the copy stored in the batch array
    pOneReq = taosArrayPush((*pBatchReq)->reqs, pOneReq);
    if (NULL == pOneReq) {
      releaseTscObj(connKey->tscRid);
      continue;
    }

    switch (connKey->connType) {
      case CONN_TYPE__QUERY:
      case CONN_TYPE__TMQ: {
        if (param.clusterId == 0) {
          // init
          param.clusterId = pOneReq->clusterId;
          param.pAppHbMgr = pAppHbMgr;
          param.connHbFlag = atomic_load_8(&pAppHbMgr->connHbFlag);
        }
        break;
      }
      default:
        break;
    }
    if (clientHbMgr.reqHandle[connKey->connType]) {
      int32_t code = (*clientHbMgr.reqHandle[connKey->connType])(connKey, &param, pOneReq);
      if (code) {
        tscWarn("hbGatherAllInfo failed since %s, tscRid:%" PRIi64 ", connType:%" PRIi8, tstrerror(code),
                connKey->tscRid, connKey->connType);
      }
    }

    int64_t ver = atomic_load_64(&pTscObj->whiteListInfo.ver);
    maxIpWhiteVer = TMAX(maxIpWhiteVer, ver);
    releaseTscObj(connKey->tscRid);
  }
  (*pBatchReq)->ipWhiteListVer = maxIpWhiteVer;

  return TSDB_CODE_SUCCESS;
}
1420

1421
/*
 * Record in clientHbMgr.threadStop (value 2) that the heartbeat thread stopped
 * without being asked to.
 */
void hbThreadFuncUnexpectedStopped(void) {
  atomic_store_8(&clientHbMgr.threadStop, 2);
}
×
1422

1423
/*
 * Accumulate every counter of `src` into `dst` (field-wise addition).
 */
void hbMergeSummary(SAppClusterSummary *dst, SAppClusterSummary *src) {
  /* insert statistics */
  dst->numOfInsertsReq += src->numOfInsertsReq;
  dst->numOfInsertRows += src->numOfInsertRows;
  dst->insertElapsedTime += src->insertElapsedTime;
  dst->insertBytes += src->insertBytes;

  /* query statistics */
  dst->fetchBytes += src->fetchBytes;
  dst->queryElapsedTime += src->queryElapsedTime;
  dst->numOfSlowQueries += src->numOfSlowQueries;

  /* request counters */
  dst->totalRequests += src->totalRequests;
  dst->currentRequests += src->currentRequests;
}
4,349,853✔
1434

1435
int32_t hbGatherAppInfo(void) {
27,578,362✔
1436
  SAppHbReq req = {0};
27,578,362✔
1437
  int32_t   code = TSDB_CODE_SUCCESS;
27,578,362✔
1438
  int       sz = taosArrayGetSize(clientHbMgr.appHbMgrs);
27,578,362✔
1439
  if (sz > 0) {
27,578,362✔
1440
    req.pid = taosGetPId();
27,578,362✔
1441
    req.appId = clientHbMgr.appId;
27,578,362✔
1442
    TSC_ERR_RET(taosGetAppName(req.name, NULL));
27,578,362✔
1443
  }
1444

1445
  taosHashClear(clientHbMgr.appSummary);
27,578,362✔
1446

1447
  for (int32_t i = 0; i < sz; ++i) {
59,528,845✔
1448
    SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i);
31,950,483✔
1449
    if (pAppHbMgr == NULL) continue;
31,950,483✔
1450

1451
    int64_t    clusterId = pAppHbMgr->pAppInstInfo->clusterId;
31,950,483✔
1452
    SAppHbReq *pApp = taosHashGet(clientHbMgr.appSummary, &clusterId, sizeof(clusterId));
31,950,483✔
1453
    if (NULL == pApp) {
31,950,483✔
1454
      (void)memcpy(&req.summary, &pAppHbMgr->pAppInstInfo->summary, sizeof(req.summary));
27,600,630✔
1455
      req.startTime = pAppHbMgr->startTime;
27,600,630✔
1456
      TSC_ERR_RET(taosHashPut(clientHbMgr.appSummary, &clusterId, sizeof(clusterId), &req, sizeof(req)));
27,600,630✔
1457
    } else {
1458
      if (pAppHbMgr->startTime < pApp->startTime) {
4,349,853✔
1459
        pApp->startTime = pAppHbMgr->startTime;
×
1460
      }
1461

1462
      hbMergeSummary(&pApp->summary, &pAppHbMgr->pAppInstInfo->summary);
4,349,853✔
1463
    }
1464
  }
1465

1466
  return TSDB_CODE_SUCCESS;
27,578,362✔
1467
}
1468

1469
static void *hbThreadFunc(void *param) {
1,401,665✔
1470
  setThreadName("hb");
1,401,665✔
1471
#ifdef WINDOWS
1472
  if (taosCheckCurrentInDll()) {
1473
    atexit(hbThreadFuncUnexpectedStopped);
1474
  }
1475
#endif
1476
  while (1) {
26,755,221✔
1477
    if (1 == clientHbMgr.threadStop) {
28,156,886✔
1478
      break;
574,255✔
1479
    }
1480

1481
    if (TSDB_CODE_SUCCESS != taosThreadMutexLock(&clientHbMgr.lock)) {
27,582,631✔
1482
      tscError("taosThreadMutexLock failed");
×
1483
      return NULL;
×
1484
    }
1485

1486
    int sz = taosArrayGetSize(clientHbMgr.appHbMgrs);
27,582,631✔
1487
    if (sz > 0) {
27,582,631✔
1488
      if (TSDB_CODE_SUCCESS != hbGatherAppInfo()) {
27,578,362✔
1489
        tscError("hbGatherAppInfo failed");
×
1490
        return NULL;
×
1491
      }
1492
      if (sz > 1 && !clientHbMgr.appHbHash) {
27,578,362✔
1493
        clientHbMgr.appHbHash = taosHashInit(0, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_NO_LOCK);
52,588✔
1494
        if (NULL == clientHbMgr.appHbHash) {
52,588✔
1495
          tscError("taosHashInit failed");
×
1496
          return NULL;
×
1497
        }
1498
      }
1499
      taosHashClear(clientHbMgr.appHbHash);
27,578,362✔
1500
    }
1501

1502
    for (int i = 0; i < sz; i++) {
59,533,114✔
1503
      SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i);
31,950,483✔
1504
      if (pAppHbMgr == NULL) {
31,950,483✔
1505
        continue;
30,981✔
1506
      }
1507

1508
      int32_t connCnt = atomic_load_32(&pAppHbMgr->connKeyCnt);
31,950,483✔
1509
      if (connCnt == 0) {
31,950,483✔
1510
        continue;
3,255,260✔
1511
      }
1512
      SClientHbBatchReq *pReq = NULL;
28,695,223✔
1513
      int32_t            code = hbGatherAllInfo(pAppHbMgr, &pReq);
28,695,223✔
1514
      if (TSDB_CODE_SUCCESS != code || taosArrayGetP(clientHbMgr.appHbMgrs, i) == NULL) {
28,695,223✔
1515
        terrno = code ? code : TSDB_CODE_OUT_OF_RANGE;
×
1516
        tFreeClientHbBatchReq(pReq);
×
1517
        continue;
×
1518
      }
1519
      int tlen = tSerializeSClientHbBatchReq(NULL, 0, pReq);
28,695,223✔
1520
      if (tlen == -1) {
28,695,223✔
1521
        tFreeClientHbBatchReq(pReq);
×
1522
        break;
×
1523
      }
1524
      void *buf = taosMemoryMalloc(tlen);
28,695,223✔
1525
      if (buf == NULL) {
28,695,223✔
1526
        tFreeClientHbBatchReq(pReq);
×
1527
        // hbClearReqInfo(pAppHbMgr);
1528
        break;
×
1529
      }
1530

1531
      if (tSerializeSClientHbBatchReq(buf, tlen, pReq) == -1) {
28,695,223✔
1532
        tFreeClientHbBatchReq(pReq);
×
1533
        taosMemoryFree(buf);
×
1534
        break;
×
1535
      }
1536
      SMsgSendInfo *pInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
28,695,223✔
1537

1538
      if (pInfo == NULL) {
28,695,223✔
1539
        tFreeClientHbBatchReq(pReq);
×
1540
        // hbClearReqInfo(pAppHbMgr);
1541
        taosMemoryFree(buf);
×
1542
        break;
×
1543
      }
1544
      pInfo->fp = hbAsyncCallBack;
28,695,223✔
1545
      pInfo->msgInfo.pData = buf;
28,695,223✔
1546
      pInfo->msgInfo.len = tlen;
28,695,223✔
1547
      pInfo->msgType = TDMT_MND_HEARTBEAT;
28,695,223✔
1548
      pInfo->param = taosMemoryMalloc(sizeof(int32_t));
28,695,223✔
1549
      if (pInfo->param  == NULL) {
28,695,223✔
1550
        tFreeClientHbBatchReq(pReq);
×
1551
        // hbClearReqInfo(pAppHbMgr);
1552
        taosMemoryFree(buf);
×
1553
        taosMemoryFree(pInfo);
×
1554
        break;
×
1555
      }
1556
      *(int32_t *)pInfo->param = i;
28,695,223✔
1557
      pInfo->paramFreeFp = taosAutoMemoryFree;
28,695,223✔
1558
      pInfo->requestId = generateRequestId();
28,695,223✔
1559
      pInfo->requestObjRefId = 0;
28,695,223✔
1560

1561
      SAppInstInfo *pAppInstInfo = pAppHbMgr->pAppInstInfo;
28,695,223✔
1562
      SEpSet        epSet = getEpSet_s(&pAppInstInfo->mgmtEp);
28,695,223✔
1563
      if (TSDB_CODE_SUCCESS != asyncSendMsgToServer(pAppInstInfo->pTransporter, &epSet, NULL, pInfo)) {
28,695,223✔
1564
        tscWarn("failed to async send msg to server");
×
1565
      }
1566
      tFreeClientHbBatchReq(pReq);
28,695,223✔
1567
      // hbClearReqInfo(pAppHbMgr);
1568
      (void)atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1);
28,695,223✔
1569
    }
1570

1571
    if (TSDB_CODE_SUCCESS != taosThreadMutexUnlock(&clientHbMgr.lock)) {
27,582,631✔
1572
      tscError("taosThreadMutexLock failed");
×
1573
      return NULL;
×
1574
    }
1575
    taosMsleep(HEARTBEAT_INTERVAL);
27,582,631✔
1576
  }
1577
  taosHashCleanup(clientHbMgr.appHbHash);
574,255✔
1578
  return NULL;
574,255✔
1579
}
1580

1581
static int32_t hbCreateThread() {
1,401,665✔
1582
  int32_t      code = TSDB_CODE_SUCCESS;
1,401,665✔
1583
  TdThreadAttr thAttr;
1,386,478✔
1584
  TSC_ERR_JRET(taosThreadAttrInit(&thAttr));
1,401,665✔
1585
  TSC_ERR_JRET(taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE));
1,401,665✔
1586
#ifdef TD_COMPACT_OS
1587
  TSC_ERR_JRET(taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL));
1588
#endif
1589

1590
  if (taosThreadCreate(&clientHbMgr.thread, &thAttr, hbThreadFunc, NULL) != 0) {
1,401,665✔
1591
    terrno = TAOS_SYSTEM_ERROR(ERRNO);
×
1592
    TSC_ERR_RET(terrno);
×
1593
  }
1594
  (void)taosThreadAttrDestroy(&thAttr);
1,401,665✔
1595
_return:
1,401,665✔
1596

1597
  if (code) {
1,401,665✔
1598
    terrno = TAOS_SYSTEM_ERROR(ERRNO);
×
1599
    TSC_ERR_RET(terrno);
×
1600
  }
1601

1602
  return code;
1,401,665✔
1603
}
1604

1605
static void hbStopThread() {
1,403,385✔
1606
  if (0 == atomic_load_8(&clientHbMgr.inited)) {
1,403,385✔
1607
    return;
1,720✔
1608
  }
1609
  if (atomic_val_compare_exchange_8(&clientHbMgr.threadStop, 0, 1)) {
1,401,665✔
1610
    tscDebug("hb thread already stopped");
×
1611
    return;
×
1612
  }
1613

1614
  int32_t code = TSDB_CODE_SUCCESS;
1,401,665✔
1615
  // thread quit mode kill or inner exit from self-thread
1616
  if (clientHbMgr.quitByKill) {
1,401,665✔
1617
    code = taosThreadKill(clientHbMgr.thread, 0);
954,226✔
1618
    if (TSDB_CODE_SUCCESS != code) {
954,226✔
1619
      tscError("taosThreadKill failed since %s", tstrerror(code));
×
1620
    }
1621
  } else {
1622
    code = taosThreadJoin(clientHbMgr.thread, NULL);
447,439✔
1623
    if (TSDB_CODE_SUCCESS != code) {
447,439✔
1624
      tscError("taosThreadJoin failed since %s", tstrerror(code));
×
1625
    }
1626
  }
1627

1628
  tscDebug("hb thread stopped");
1,401,665✔
1629
}
1630

1631
/**
 * Create an app heartbeat manager and register it in clientHbMgr.appHbMgrs.
 *
 * Makes sure the global heartbeat manager is initialized (hbMgrInit), allocates
 * the SAppHbMgr, duplicates `key`, creates the activeInfo hash and appends the
 * new manager to the global array under clientHbMgr.lock. The position in the
 * array is stored in `idx`.
 *
 * @param pAppInstInfo app instance the manager reports for (stored, not copied).
 * @param key          string duplicated into the manager with taosStrdup.
 * @param pAppHbMgr    out: the new manager on success. If a failure happens
 *                     after allocation was attempted, everything created so far
 *                     is released and *pAppHbMgr is set to NULL.
 * @return TSDB_CODE_SUCCESS or the error code of the failing step.
 */
int32_t appHbMgrInit(SAppInstInfo *pAppInstInfo, char *key, SAppHbMgr **pAppHbMgr) {
  int32_t code = TSDB_CODE_SUCCESS;
  TSC_ERR_RET(hbMgrInit());

  // Zero-filled so the failure path can release exactly the members that were created.
  *pAppHbMgr = taosMemoryCalloc(1, sizeof(SAppHbMgr));
  if (*pAppHbMgr == NULL) {
    TSC_ERR_JRET(terrno);
  }
  // init stat
  (*pAppHbMgr)->startTime = taosGetTimestampMs();
  (*pAppHbMgr)->connKeyCnt = 0;
  (*pAppHbMgr)->connHbFlag = 0;
  (*pAppHbMgr)->reportCnt = 0;
  (*pAppHbMgr)->reportBytes = 0;
  (*pAppHbMgr)->key = taosStrdup(key);
  if ((*pAppHbMgr)->key == NULL) {
    TSC_ERR_JRET(terrno);
  }

  // init app info
  (*pAppHbMgr)->pAppInstInfo = pAppInstInfo;

  // init hash info
  (*pAppHbMgr)->activeInfo = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
  if ((*pAppHbMgr)->activeInfo == NULL) {
    TSC_ERR_JRET(terrno);
  }

  TSC_ERR_JRET(taosThreadMutexLock(&clientHbMgr.lock));
  if (taosArrayPush(clientHbMgr.appHbMgrs, &(*pAppHbMgr)) == NULL) {
    code = terrno;
    (void)taosThreadMutexUnlock(&clientHbMgr.lock);
    goto _return;
  }
  (*pAppHbMgr)->idx = taosArrayGetSize(clientHbMgr.appHbMgrs) - 1;

  // From here on the global array references the manager, so it must not be
  // freed any more; an unlock failure is only logged.
  code = taosThreadMutexUnlock(&clientHbMgr.lock);
  if (TSDB_CODE_SUCCESS != code) {
    tscError("failed to unlock clientHbMgr, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
  }

  return TSDB_CODE_SUCCESS;

_return:
  if (*pAppHbMgr != NULL) {
    if ((*pAppHbMgr)->activeInfo != NULL) {
      taosHashCleanup((*pAppHbMgr)->activeInfo);
    }
    taosMemoryFree((*pAppHbMgr)->key);
    taosMemoryFree(*pAppHbMgr);
    // Never hand a freed pointer back to the caller.
    *pAppHbMgr = NULL;
  }
  return code;
}
1675

1676
/**
 * Release one app heartbeat manager: every request stored in its activeInfo
 * hash is passed to tFreeClientHbReq, then the hash, the key string and the
 * manager object itself are freed.
 *
 * @param pTarget manager to destroy; it is dereferenced unconditionally and is
 *                invalid after the call.
 */
void hbFreeAppHbMgr(SAppHbMgr *pTarget) {
  for (void *pCursor = taosHashIterate(pTarget->activeInfo, NULL); pCursor != NULL;
       pCursor = taosHashIterate(pTarget->activeInfo, pCursor)) {
    SClientHbReq *pReq = pCursor;
    tFreeClientHbReq(pReq);
  }

  taosHashCleanup(pTarget->activeInfo);
  pTarget->activeInfo = NULL;

  taosMemoryFree(pTarget->key);
  taosMemoryFree(pTarget);
}
1,482,679✔
1689

1690
/**
 * Unregister and destroy an app heartbeat manager.
 *
 * Under clientHbMgr.lock, looks for *pAppHbMgr in clientHbMgr.appHbMgrs. When
 * found, the manager is freed, *pAppHbMgr is set to NULL and the array slot is
 * overwritten with that NULL (the slot is kept, not removed). Lock/unlock
 * failures are logged and otherwise ignored.
 *
 * @param pAppHbMgr in/out: address of the manager pointer to remove.
 */
void hbRemoveAppHbMrg(SAppHbMgr **pAppHbMgr) {
  int32_t code = taosThreadMutexLock(&clientHbMgr.lock);
  if (code != TSDB_CODE_SUCCESS) {
    tscError("failed to lock clientHbMgr, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
  }

  // Find the first slot holding this manager.
  int32_t total = taosArrayGetSize(clientHbMgr.appHbMgrs);
  int32_t pos = 0;
  while (pos < total) {
    SAppHbMgr *pSlot = taosArrayGetP(clientHbMgr.appHbMgrs, pos);
    if (pSlot == *pAppHbMgr) {
      break;
    }
    ++pos;
  }

  if (pos < total) {
    hbFreeAppHbMgr(*pAppHbMgr);
    *pAppHbMgr = NULL;
    // pAppHbMgr now points at NULL, so this clears the slot.
    taosArraySet(clientHbMgr.appHbMgrs, pos, pAppHbMgr);
  }

  code = taosThreadMutexUnlock(&clientHbMgr.lock);
  if (code != TSDB_CODE_SUCCESS) {
    tscError("failed to unlock clientHbMgr, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
  }
}
1,482,679✔
1711

1712
void appHbMgrCleanup(void) {
1,401,665✔
1713
  int sz = taosArrayGetSize(clientHbMgr.appHbMgrs);
1,401,665✔
1714
  for (int i = 0; i < sz; i++) {
2,884,344✔
1715
    SAppHbMgr *pTarget = taosArrayGetP(clientHbMgr.appHbMgrs, i);
1,482,679✔
1716
    if (pTarget == NULL) continue;
1,482,679✔
1717
    hbFreeAppHbMgr(pTarget);
1,482,679✔
1718
  }
1719
}
1,401,665✔
1720

1721
int32_t hbMgrInit() {
1,482,679✔
1722
  // init once
1723
  int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 0, 1);
1,482,679✔
1724
  if (old == 1) return 0;
1,482,679✔
1725

1726
  clientHbMgr.appId = tGenIdPI64();
1,401,665✔
1727
  tscInfo("app initialized, appId:0x%" PRIx64, clientHbMgr.appId);
1,401,665✔
1728

1729
  clientHbMgr.appSummary = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
1,401,665✔
1730
  if (NULL == clientHbMgr.appSummary) {
1,401,665✔
1731
    uError("hbMgrInit:taosHashInit error") return terrno;
×
1732
  }
1733
  clientHbMgr.appHbMgrs = taosArrayInit(0, sizeof(void *));
1,401,665✔
1734
  if (NULL == clientHbMgr.appHbMgrs) {
1,401,665✔
1735
    uError("hbMgrInit:taosArrayInit error") return terrno;
×
1736
  }
1737
  TdThreadMutexAttr attr = {0};
1,401,665✔
1738

1739
  int ret = taosThreadMutexAttrInit(&attr);
1,401,665✔
1740
  if (ret != 0) {
1,401,665✔
1741
    uError("hbMgrInit:taosThreadMutexAttrInit error") return ret;
×
1742
  }
1743

1744
  ret = taosThreadMutexAttrSetType(&attr, PTHREAD_MUTEX_RECURSIVE);
1,401,665✔
1745
  if (ret != 0) {
1,401,665✔
1746
    uError("hbMgrInit:taosThreadMutexAttrSetType error") return ret;
×
1747
  }
1748

1749
  ret = taosThreadMutexInit(&clientHbMgr.lock, &attr);
1,401,665✔
1750
  if (ret != 0) {
1,401,665✔
1751
    uError("hbMgrInit:taosThreadMutexInit error") return ret;
×
1752
  }
1753

1754
  ret = taosThreadMutexAttrDestroy(&attr);
1,401,665✔
1755
  if (ret != 0) {
1,401,665✔
1756
    uError("hbMgrInit:taosThreadMutexAttrDestroy error") return ret;
×
1757
  }
1758

1759
  // init handle funcs
1760
  hbMgrInitHandle();
1761

1762
  // init backgroud thread
1763
  ret = hbCreateThread();
1,401,665✔
1764
  if (ret != 0) {
1,401,665✔
1765
    uError("hbMgrInit:hbCreateThread error") return ret;
×
1766
  }
1767

1768
  return 0;
1,401,665✔
1769
}
1770

1771
void hbMgrCleanUp() {
1,403,385✔
1772
  hbStopThread();
1,403,385✔
1773

1774
  // destroy all appHbMgr
1775
  int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 1, 0);
1,403,385✔
1776
  if (old == 0) return;
1,403,385✔
1777

1778
  int32_t code = taosThreadMutexLock(&clientHbMgr.lock);
1,401,665✔
1779
  if (TSDB_CODE_SUCCESS != code) {
1,401,665✔
1780
    tscError("failed to lock clientHbMgr, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
×
1781
  }
1782
  appHbMgrCleanup();
1,401,665✔
1783
  taosArrayDestroy(clientHbMgr.appHbMgrs);
1,401,665✔
1784
  clientHbMgr.appHbMgrs = NULL;
1,401,665✔
1785
  code = taosThreadMutexUnlock(&clientHbMgr.lock);
1,401,665✔
1786
  if (TSDB_CODE_SUCCESS != code) {
1,401,665✔
1787
    tscError("failed to unlock clientHbMgr, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
×
1788
  }
1789
}
1790

1791
/**
 * Add a heartbeat request entry for `connKey` to the manager's activeInfo hash.
 *
 * If an entry for the key already exists nothing is changed. Otherwise a
 * zero-initialized SClientHbReq carrying the key, cluster id, user and token
 * name is stored and the manager's connKeyCnt is incremented.
 *
 * @param pAppHbMgr manager that owns the connection.
 * @param connKey   key identifying the connection.
 * @param user      user name, copied (bounded) into the entry.
 * @param tokenName token name, copied (bounded) into the entry.
 * @param clusterId cluster id stored in the entry.
 * @return 0 on success or when already registered; the taosHashPut error otherwise.
 */
int32_t hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, const char* user, const char* tokenName, int64_t clusterId) {
  // already registered: nothing to do
  if (taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)) != NULL) {
    return 0;
  }

  SClientHbReq newReq = {0};
  tstrncpy(newReq.user, user, sizeof(newReq.user));
  tstrncpy(newReq.tokenName, tokenName, sizeof(newReq.tokenName));
  newReq.clusterId = clusterId;
  newReq.connKey = connKey;

  TSC_ERR_RET(taosHashPut(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey), &newReq, sizeof(SClientHbReq)));

  (void)atomic_add_fetch_32(&pAppHbMgr->connKeyCnt, 1);
  return 0;
}
1809

1810
/**
 * Register a connection for heartbeat reporting.
 *
 * Builds the SClientHbKey from `tscRefId` and `connType` and forwards to
 * hbRegisterConnImpl for CONN_TYPE__QUERY and CONN_TYPE__TMQ connections.
 * Any other connection type is ignored.
 *
 * @return result of hbRegisterConnImpl, or 0 for ignored connection types.
 */
int32_t hbRegisterConn(SAppHbMgr *pAppHbMgr, int64_t tscRefId, const char* user, const char* tokenName, int64_t clusterId, int8_t connType) {
  SClientHbKey connKey = {
      .tscRid = tscRefId,
      .connType = connType,
  };

  if (connType == CONN_TYPE__QUERY || connType == CONN_TYPE__TMQ) {
    return hbRegisterConnImpl(pAppHbMgr, connKey, user, tokenName, clusterId);
  }

  return 0;
}
1825

1826
/**
 * Remove a connection's heartbeat entry.
 *
 * Under clientHbMgr.lock, looks up the manager at pTscObj->appHbMgrIdx. If the
 * manager exists and holds an entry for `connKey`, the entry's contents are
 * freed with tFreeClientHbReq, the entry is removed from activeInfo, the
 * acquired reference is released and connKeyCnt is decremented. Lock, unlock
 * and remove failures are logged and otherwise ignored.
 *
 * @param pTscObj connection object; only appHbMgrIdx is read here.
 * @param connKey key of the entry to remove.
 */
void hbDeregisterConn(STscObj *pTscObj, SClientHbKey connKey) {
  int32_t code = taosThreadMutexLock(&clientHbMgr.lock);
  if (code != TSDB_CODE_SUCCESS) {
    tscError("failed to lock clientHbMgr, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
  }

  SAppHbMgr    *pMgr = taosArrayGetP(clientHbMgr.appHbMgrs, pTscObj->appHbMgrIdx);
  SClientHbReq *pHeld = NULL;
  if (pMgr != NULL) {
    pHeld = taosHashAcquire(pMgr->activeInfo, &connKey, sizeof(SClientHbKey));
  }

  if (pHeld != NULL) {
    tFreeClientHbReq(pHeld);
    code = taosHashRemove(pMgr->activeInfo, &connKey, sizeof(SClientHbKey));
    if (code != TSDB_CODE_SUCCESS) {
      tscError("hbDeregisterConn taosHashRemove error, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
    }
    taosHashRelease(pMgr->activeInfo, pHeld);
    (void)atomic_sub_fetch_32(&pMgr->connKeyCnt, 1);
  }

  code = taosThreadMutexUnlock(&clientHbMgr.lock);
  if (code != TSDB_CODE_SUCCESS) {
    tscError("failed to unlock clientHbMgr, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
  }
}
92,363,840✔
1849

1850
// set heart beat thread quit mode , if quicByKill 1 then kill thread else quit from inner
1851
void taos_set_hb_quit(int8_t quitByKill) { clientHbMgr.quitByKill = quitByKill; }
955,286✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc