• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5052

13 May 2026 12:00PM UTC coverage: 73.338% (-0.02%) from 73.358%
#5052

push

travis-ci

web-flow
feat: taosdump support stream backup/restore (#35326)

139 of 170 new or added lines in 3 files covered. (81.76%)

761 existing lines in 163 files now uncovered.

281469 of 383795 relevant lines covered (73.34%)

134502812.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.46
/source/client/src/clientHb.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "catalog.h"
17
#include "clientInt.h"
18
#include "clientLog.h"
19
#include "clientMonitor.h"
20
#include "clientSession.h"
21
#include "scheduler.h"
22
#include "tglobal.h"
23
#include "trpc.h"
24

25
typedef struct {
26
  union {
27
    struct {
28
      SAppHbMgr *pAppHbMgr;
29
      int64_t    clusterId;
30
      int32_t    reqCnt;
31
      int8_t     connHbFlag;
32
    };
33
  };
34
} SHbParam;
35

36
SClientHbMgr clientHbMgr = {0};
37

38
static int32_t hbCreateThread();
39
static void    hbStopThread();
40
static int32_t hbUpdateUserSessMertric(const char *user, SUserSessCfg *pCfg);
41
static int32_t hbUpdateUserAuthInfo(SAppHbMgr *pAppHbMgr, SUserAuthBatchRsp *batchRsp);
42

43
static int32_t hbProcessUserAuthInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog,
30,251,619✔
44
                                        SAppHbMgr *pAppHbMgr) {
45
  int32_t code = TSDB_CODE_SUCCESS;
30,251,619✔
46

47
  SUserAuthBatchRsp batchRsp = {0};
30,251,619✔
48
  if (tDeserializeSUserAuthBatchRsp(value, valueLen, &batchRsp) != 0) {
30,251,619✔
49
    TSC_ERR_JRET(TSDB_CODE_INVALID_MSG);
×
50
  }
51

52
  int32_t numOfBatchs = taosArrayGetSize(batchRsp.pArray);
30,251,619✔
53
  for (int32_t i = 0; i < numOfBatchs; ++i) {
61,064,460✔
54
    SGetUserAuthRsp *rsp = taosArrayGet(batchRsp.pArray, i);
30,812,841✔
55
    if (NULL == rsp) {
30,812,841✔
56
      code = terrno;
×
57
      goto _return;
×
58
    }
59
    tscDebug("hb to update user auth, user:%s, version:%d", rsp->user, rsp->version);
30,812,841✔
60

61
    TSC_ERR_JRET(catalogUpdateUserAuthInfo(pCatalog, rsp));
30,812,841✔
62

63
  }
64

65
  if (numOfBatchs > 0) {
30,251,619✔
66
    TSC_ERR_JRET(hbUpdateUserAuthInfo(pAppHbMgr, &batchRsp));
30,251,619✔
67
  }
68

69
  (void)atomic_val_compare_exchange_8(&pAppHbMgr->connHbFlag, 1, 2);
30,251,619✔
70

71
_return:
30,251,619✔
72
  tFreeSUserAuthBatchRsp(&batchRsp);
30,251,619✔
73
  return code;
30,251,619✔
74
}
75

76
static int32_t hbUpdateUserSessMertric(const char *user, SUserSessCfg *pCfg) {
42,510,535✔
77
  int32_t code = 0;
42,510,535✔
78
  int32_t lino = 0;
42,510,535✔
79
  if (user == NULL || pCfg == NULL) {
42,510,535✔
80
    return code;
×
81
  }
82

83
  SUserSessCfg cfg = {0, 0, 0, 0, 0};
42,510,535✔
84

85
  if (memcmp(pCfg, &cfg, sizeof(SUserSessCfg)) == 0) {
42,510,535✔
86
    return TSDB_CODE_SUCCESS;
×
87
  }
88
  tscDebug(
42,510,535✔
89
      "update session metric for user:%s, sessPerUser:%d, sessConnTime:%d, sessConnIdleTime:%d, sessMaxConcurrency:%d, "
90
      "sessMaxCallVnodeNum:%d",
91
      user, pCfg->sessPerUser, pCfg->sessConnTime, pCfg->sessConnIdleTime, pCfg->sessMaxConcurrency,
92
      pCfg->sessMaxCallVnodeNum);
93

94
  if (pCfg->sessPerUser != 0) {
42,510,535✔
95
    code = sessMgtUpdataLimit((char *)user, SESSION_PER_USER, pCfg->sessPerUser);
42,510,535✔
96
    TAOS_CHECK_GOTO(code, &lino, _error);
42,510,535✔
97
  }
98

99
  if (pCfg->sessConnTime != 0) {
42,510,535✔
100
    code = sessMgtUpdataLimit((char *)user, SESSION_CONN_TIME, pCfg->sessConnTime);
42,510,535✔
101
    TAOS_CHECK_GOTO(code, &lino, _error);
42,510,535✔
102
  }
103

104
  if (pCfg->sessConnIdleTime != 0) {
42,510,535✔
105
    code = sessMgtUpdataLimit((char *)user, SESSION_CONN_IDLE_TIME, pCfg->sessConnIdleTime);
42,510,535✔
106
    TAOS_CHECK_GOTO(code, &lino, _error);
42,510,535✔
107
  }
108

109
  if (pCfg->sessMaxConcurrency != 0) {
42,510,535✔
110
    code = sessMgtUpdataLimit((char *)user, SESSION_MAX_CONCURRENCY, pCfg->sessMaxConcurrency);
42,510,535✔
111
    TAOS_CHECK_GOTO(code, &lino, _error);
42,510,535✔
112
  }
113

114
  if (pCfg->sessMaxCallVnodeNum != 0) {
42,510,535✔
115
    code = sessMgtUpdataLimit((char *)user, SESSION_MAX_CALL_VNODE_NUM, pCfg->sessMaxCallVnodeNum);
42,510,535✔
116
    TAOS_CHECK_GOTO(code, &lino, _error);
42,510,535✔
117
  }
118
_error:
42,510,535✔
119
  return code;
42,510,535✔
120
}
121
static int32_t hbUpdateUserAuthInfo(SAppHbMgr *pAppHbMgr, SUserAuthBatchRsp *batchRsp) {
30,251,619✔
122
  int32_t code = 0;
30,251,619✔
123
  int64_t clusterId = pAppHbMgr->pAppInstInfo->clusterId;
30,251,619✔
124
  for (int i = 0; i < TARRAY_SIZE(clientHbMgr.appHbMgrs); ++i) {
66,326,488✔
125
    SAppHbMgr *hbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i);
36,074,869✔
126
    if (!hbMgr || hbMgr->pAppInstInfo->clusterId != clusterId) {
36,074,869✔
127
      continue;
102,170✔
128
    }
129

130
    SClientHbReq    *pReq = NULL;
35,972,699✔
131
    SGetUserAuthRsp *pRsp = NULL;
35,972,699✔
132
    while ((pReq = taosHashIterate(hbMgr->activeInfo, pReq))) {
79,944,832✔
133
      STscObj *pTscObj = (STscObj *)acquireTscObj(pReq->connKey.tscRid);
44,061,483✔
134
      if (!pTscObj) {
44,061,483✔
135
        continue;
1,458,647✔
136
      }
137

138
      if (!pRsp) {
42,602,836✔
139
        for (int32_t j = 0; j < TARRAY_SIZE(batchRsp->pArray); ++j) {
33,514,121✔
140
          SGetUserAuthRsp *rsp = TARRAY_GET_ELEM(batchRsp->pArray, j);
33,424,771✔
141
          if (0 == strncmp(rsp->user, pTscObj->user, TSDB_USER_LEN)) {
33,424,771✔
142
            pRsp = rsp;
32,877,078✔
143
            break;
32,877,078✔
144
          }
145
        }
146
        if (!pRsp) {
32,966,428✔
147
          releaseTscObj(pReq->connKey.tscRid);
89,350✔
148
          taosHashCancelIterate(hbMgr->activeInfo, pReq);
89,350✔
149
          break;
89,350✔
150
        }
151
      }
152

153
      if (pRsp->dropped == 1) {
42,513,486✔
154
        if (atomic_val_compare_exchange_8(&pTscObj->dropped, 0, 1) == 0) {
2,398✔
155
          if (pTscObj->userDroppedInfo.fp) {
1,800✔
156
            SPassInfo *dropInfo = &pTscObj->userDroppedInfo;
1,000✔
157
            if (dropInfo->fp) {
1,000✔
158
              (*dropInfo->fp)(dropInfo->param, NULL, TAOS_NOTIFY_USER_DROPPED);
1,000✔
159
            }
160
          }
161
        }
162
        releaseTscObj(pReq->connKey.tscRid);
2,398✔
163
        continue;
2,398✔
164
      }
165

166
      // update token status
167
      if (pTscObj->tokenName[0] != 0) {
42,511,088✔
168
        STokenStatus *status = NULL;
553✔
169
        if (pRsp->tokens != NULL) {
553✔
170
          status = taosHashGet(pRsp->tokens, pTscObj->tokenName, strlen(pTscObj->tokenName) + 1);
×
171
        }
172

173
        STokenEvent event = { 0 };
553✔
174
        tstrncpy(event.tokenName, pTscObj->tokenName, sizeof(event.tokenName));
553✔
175
        if (status == NULL) {
553✔
176
          event.type = TSDB_TOKEN_EVENT_DROPPED;
553✔
177
        } else if (status->enabled == 0) {
×
178
          event.type = TSDB_TOKEN_EVENT_DISABLED;
×
179
        } else if (status->expireTime > 0 && status->expireTime < taosGetTimestampSec()) {
×
180
          event.type = TSDB_TOKEN_EVENT_EXPIRED;
×
181
        } else {
182
          event.type = TSDB_TOKEN_EVENT_MODIFIED;
×
183
          event.expireTime = status->expireTime;
×
184
        }
185

186
        STokenNotifyInfo *tni = &pTscObj->tokenNotifyInfo;
553✔
187
        if (event.type == TSDB_TOKEN_EVENT_MODIFIED) {
553✔
188
          int32_t oldExpireTime;
189
          do {
190
            oldExpireTime = atomic_load_32(&pTscObj->tokenExpireTime);
×
191
            if (oldExpireTime == status->expireTime) {
×
192
              break;
×
193
            }
194
          } while (atomic_val_compare_exchange_32(&pTscObj->tokenExpireTime, oldExpireTime, status->expireTime) != oldExpireTime);
×
195

196
          if (oldExpireTime != status->expireTime && tni->fp) {
×
197
            (*tni->fp)(tni->param, &event, TAOS_NOTIFY_TOKEN);
×
198
          }
199
          tscDebug("update token %s of user %s: expire time from %d to %d, conn:%" PRIi64, pTscObj->tokenName,
×
200
                   pRsp->user, pTscObj->tokenExpireTime, status->expireTime, pTscObj->id);
201
        } else {
202
          if (atomic_val_compare_exchange_8(&pTscObj->dropped, 0, 1) == 0) {
553✔
203
            if (tni->fp) {
553✔
204
              (*tni->fp)(tni->param, &event, TAOS_NOTIFY_TOKEN);
×
205
            }
206
          }
207

208
          releaseTscObj(pReq->connKey.tscRid);
553✔
209
          continue;
553✔
210
        }
211
      }
212

213
      pTscObj->authVer = pRsp->version;
42,510,535✔
214
      if (hbUpdateUserSessMertric(pTscObj->user, &pRsp->sessCfg) != 0) {
42,510,535✔
215
        tscError("failed to update user session metric, user:%s", pTscObj->user);
×
216
      }
217

218
      if (pTscObj->sysInfo != pRsp->sysInfo) {
42,510,535✔
219
        tscDebug("update sysInfo of user %s from %" PRIi8 " to %" PRIi8 ", conn:%" PRIi64, pRsp->user,
2,201✔
220
                 pTscObj->sysInfo, pRsp->sysInfo, pTscObj->id);
221
        pTscObj->sysInfo = pRsp->sysInfo;
2,201✔
222
      }
223

224
      if (pTscObj->minSecLevel != pRsp->minSecLevel) {
42,510,535✔
225
        pTscObj->minSecLevel = pRsp->minSecLevel;
×
226
      }
227
      if (pTscObj->maxSecLevel != pRsp->maxSecLevel) {
42,510,535✔
228
        pTscObj->maxSecLevel = pRsp->maxSecLevel;
×
229
      }
230
      if (pTscObj->enable != (uint8_t)pRsp->enable) {
42,510,535✔
231
        pTscObj->enable = (uint8_t)pRsp->enable;
380✔
232
      }
233

234
      // update password version
235
      if (pTscObj->passInfo.fp) {
42,510,535✔
236
        SPassInfo *passInfo = &pTscObj->passInfo;
800✔
237
        int32_t    oldVer = 0;
800✔
238
        do {
239
          oldVer = atomic_load_32(&passInfo->ver);
800✔
240
          if (oldVer >= pRsp->passVer) {
800✔
241
            break;
400✔
242
          }
243
        } while (atomic_val_compare_exchange_32(&passInfo->ver, oldVer, pRsp->passVer) != oldVer);
400✔
244
        if (oldVer < pRsp->passVer) {
800✔
245
          (*passInfo->fp)(passInfo->param, &pRsp->passVer, TAOS_NOTIFY_PASSVER);
400✔
246
          tscDebug("update passVer of user %s from %d to %d, conn:%" PRIi64, pRsp->user, oldVer,
400✔
247
                   atomic_load_32(&passInfo->ver), pTscObj->id);
248
        }
249
      }
250

251
      // update ip white list version
252
      {
253
        SWhiteListInfo *whiteListInfo = &pTscObj->whiteListInfo;
42,510,535✔
254
        int64_t oldVer = 0, newVer = pRsp->whiteListVer;
42,510,535✔
255
        do {
256
          oldVer = atomic_load_64(&whiteListInfo->ver);
42,510,535✔
257
          if (oldVer >= newVer) {
42,510,535✔
258
            break;
42,499,691✔
259
          }
260
        } while (atomic_val_compare_exchange_64(&whiteListInfo->ver, oldVer, newVer) != oldVer);
10,844✔
261

262
        if (oldVer < newVer) {
42,510,535✔
263
          if (whiteListInfo->fp) {
10,844✔
264
            (*whiteListInfo->fp)(whiteListInfo->param, &newVer, TAOS_NOTIFY_WHITELIST_VER);
×
265
          }
266
          tscDebug("update whitelist version of user %s from %" PRId64 " to %" PRId64 ", conn:%" PRIi64, pRsp->user,
10,844✔
267
                   oldVer, newVer, pTscObj->id);
268
        }
269
      }
270

271
      // update date time whitelist version
272
      {
273
        SWhiteListInfo *whiteListInfo = &pTscObj->dateTimeWhiteListInfo;
42,510,535✔
274

275
        int64_t oldVer = 0, newVer = pRsp->timeWhiteListVer;
42,510,535✔
276
        do {
277
          oldVer = atomic_load_64(&whiteListInfo->ver);
42,510,535✔
278
          if (oldVer >= newVer) {
42,510,535✔
279
            break;
42,510,535✔
280
          }
281
        } while (atomic_val_compare_exchange_64(&whiteListInfo->ver, oldVer, newVer) != oldVer);
×
282

283
        if (oldVer < newVer) {
42,510,535✔
284
          if (whiteListInfo->fp) {
×
285
            (*whiteListInfo->fp)(whiteListInfo->param, &newVer, TAOS_NOTIFY_DATETIME_WHITELIST_VER);
×
286
          }
287
          tscDebug("update date time whitelist version of user %s from %" PRId64 " to %" PRId64 ", conn:%" PRIi64, pRsp->user,
×
288
                   oldVer, newVer, pTscObj->id);
289
        }
290
      }
291
      
292
      releaseTscObj(pReq->connKey.tscRid);
42,510,535✔
293
    }
294
  }
295
  return 0;
30,251,619✔
296
}
297

298
static int32_t hbGenerateVgInfoFromRsp(SDBVgInfo **pInfo, SUseDbRsp *rsp) {
1,885,943✔
299
  int32_t    code = 0;
1,885,943✔
300
  SDBVgInfo *vgInfo = taosMemoryCalloc(1, sizeof(SDBVgInfo));
1,885,943✔
301
  if (NULL == vgInfo) {
1,885,943✔
302
    return terrno;
×
303
  }
304

305
  vgInfo->vgVersion = rsp->vgVersion;
1,885,943✔
306
  vgInfo->stateTs = rsp->stateTs;
1,885,943✔
307
  vgInfo->flags = rsp->flags;
1,885,943✔
308
  vgInfo->hashMethod = rsp->hashMethod;
1,885,943✔
309
  vgInfo->hashPrefix = rsp->hashPrefix;
1,885,943✔
310
  vgInfo->hashSuffix = rsp->hashSuffix;
1,885,943✔
311
  vgInfo->vgHash = taosHashInit(rsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
1,885,943✔
312
  if (NULL == vgInfo->vgHash) {
1,885,943✔
313
    tscError("hash init[%d] failed", rsp->vgNum);
×
314
    code = terrno;
×
315
    goto _return;
×
316
  }
317

318
  for (int32_t j = 0; j < rsp->vgNum; ++j) {
6,632,363✔
319
    SVgroupInfo *pInfo = taosArrayGet(rsp->pVgroupInfos, j);
4,746,420✔
320
    if (taosHashPut(vgInfo->vgHash, &pInfo->vgId, sizeof(int32_t), pInfo, sizeof(SVgroupInfo)) != 0) {
4,746,420✔
321
      tscError("hash push failed, terrno:%d", terrno);
×
322
      code = terrno;
×
323
      goto _return;
×
324
    }
325
  }
326

327
_return:
1,885,943✔
328
  if (code) {
1,885,943✔
329
    taosHashCleanup(vgInfo->vgHash);
×
330
    taosMemoryFreeClear(vgInfo);
×
331
  }
332

333
  *pInfo = vgInfo;
1,885,943✔
334
  return code;
1,885,943✔
335
}
336

337
static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
10,151,704✔
338
  int32_t code = 0;
10,151,704✔
339

340
  SDbHbBatchRsp batchRsp = {0};
10,151,704✔
341
  if (tDeserializeSDbHbBatchRsp(value, valueLen, &batchRsp) != 0) {
10,151,704✔
342
    terrno = TSDB_CODE_INVALID_MSG;
×
343
    code = terrno;
×
344
    goto _return;
×
345
  }
346

347
  int32_t numOfBatchs = taosArrayGetSize(batchRsp.pArray);
10,151,704✔
348
  for (int32_t i = 0; i < numOfBatchs; ++i) {
12,358,396✔
349
    SDbHbRsp *rsp = taosArrayGet(batchRsp.pArray, i);
2,206,692✔
350
    if (NULL == rsp) {
2,206,692✔
351
      code = terrno;
×
352
      goto _return;
×
353
    }
354
    if (rsp->useDbRsp) {
2,206,692✔
355
      tscDebug("hb use db rsp, db:%s, vgVersion:%d, stateTs:%" PRId64 ", uid:%" PRIx64, rsp->useDbRsp->db,
1,565,556✔
356
               rsp->useDbRsp->vgVersion, rsp->useDbRsp->stateTs, rsp->useDbRsp->uid);
357

358
      if (rsp->useDbRsp->vgVersion < 0) {
1,565,556✔
359
        tscDebug("hb to remove db, db:%s", rsp->useDbRsp->db);
68,648✔
360
        code = catalogRemoveDB(pCatalog, rsp->useDbRsp->db, rsp->useDbRsp->uid);
68,648✔
361
      } else {
362
        SDBVgInfo *vgInfo = NULL;
1,496,908✔
363
        code = hbGenerateVgInfoFromRsp(&vgInfo, rsp->useDbRsp);
1,496,908✔
364
        if (TSDB_CODE_SUCCESS != code) {
1,496,908✔
365
          goto _return;
×
366
        }
367

368
        tscDebug("hb to update db vgInfo, db:%s", rsp->useDbRsp->db);
1,496,908✔
369

370
        TSC_ERR_JRET(catalogUpdateDBVgInfo(pCatalog, rsp->useDbRsp->db, rsp->useDbRsp->uid, vgInfo));
1,496,908✔
371

372
        if (IS_SYS_DBNAME(rsp->useDbRsp->db)) {
1,496,908✔
373
          code = hbGenerateVgInfoFromRsp(&vgInfo, rsp->useDbRsp);
389,035✔
374
          if (TSDB_CODE_SUCCESS != code) {
389,035✔
375
            goto _return;
×
376
          }
377

378
          TSC_ERR_JRET(catalogUpdateDBVgInfo(
389,035✔
379
              pCatalog, (rsp->useDbRsp->db[0] == 'i') ? TSDB_PERFORMANCE_SCHEMA_DB : TSDB_INFORMATION_SCHEMA_DB,
380
              rsp->useDbRsp->uid, vgInfo));
381
        }
382
      }
383
    }
384

385
    if (rsp->cfgRsp) {
2,206,692✔
386
      tscDebug("hb db cfg rsp, db:%s, cfgVersion:%d", rsp->cfgRsp->db, rsp->cfgRsp->cfgVersion);
102,627✔
387
      code = catalogUpdateDbCfg(pCatalog, rsp->cfgRsp->db, rsp->cfgRsp->dbId, rsp->cfgRsp);
102,627✔
388
      rsp->cfgRsp = NULL;
102,627✔
389
    }
390
    if (rsp->pTsmaRsp) {
2,206,692✔
391
      if (rsp->pTsmaRsp->pTsmas) {
1,198,356✔
392
        for (int32_t i = 0; i < rsp->pTsmaRsp->pTsmas->size; ++i) {
10,010✔
393
          STableTSMAInfo *pTsma = taosArrayGetP(rsp->pTsmaRsp->pTsmas, i);
6,468✔
394
          if (NULL == pTsma) {
6,468✔
395
            TSC_ERR_JRET(TSDB_CODE_OUT_OF_RANGE);
×
396
          }
397
          TSC_ERR_JRET(catalogAsyncUpdateTSMA(pCatalog, &pTsma, rsp->dbTsmaVersion));
6,468✔
398
        }
399
        taosArrayClear(rsp->pTsmaRsp->pTsmas);
3,542✔
400
      } else {
401
        TSC_ERR_JRET(catalogAsyncUpdateDbTsmaVersion(pCatalog, rsp->dbTsmaVersion, rsp->db, rsp->dbId));
1,194,814✔
402
      }
403
    }
404
  }
405

406
_return:
10,151,704✔
407

408
  tFreeSDbHbBatchRsp(&batchRsp);
10,151,704✔
409
  return code;
10,151,704✔
410
}
411

412
static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
7,280,941✔
413
  int32_t code = TSDB_CODE_SUCCESS;
7,280,941✔
414

415
  SSTbHbRsp hbRsp = {0};
7,280,941✔
416
  if (tDeserializeSSTbHbRsp(value, valueLen, &hbRsp) != 0) {
7,280,941✔
417
    terrno = TSDB_CODE_INVALID_MSG;
×
418
    return -1;
×
419
  }
420

421
  int32_t numOfMeta = taosArrayGetSize(hbRsp.pMetaRsp);
7,280,941✔
422
  for (int32_t i = 0; i < numOfMeta; ++i) {
7,369,921✔
423
    STableMetaRsp *rsp = taosArrayGet(hbRsp.pMetaRsp, i);
88,980✔
424
    if (NULL == rsp) {
88,980✔
425
      code = terrno;
×
426
      goto _return;
×
427
    }
428
    if (rsp->numOfColumns < 0) {
88,980✔
429
      tscDebug("hb to remove stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName);
61,838✔
430
      TSC_ERR_JRET(catalogRemoveStbMeta(pCatalog, rsp->dbFName, rsp->dbId, rsp->stbName, rsp->suid));
61,838✔
431
    } else {
432
      tscDebug("hb to update stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName);
27,142✔
433
      if (rsp->pSchemas[0].colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
27,142✔
434
        tscError("invalid colId[%" PRIi16 "] for the first column in table meta rsp msg", rsp->pSchemas[0].colId);
×
435
        tFreeSSTbHbRsp(&hbRsp);
×
436
        return TSDB_CODE_TSC_INVALID_VALUE;
×
437
      }
438

439
      TSC_ERR_JRET(catalogAsyncUpdateTableMeta(pCatalog, rsp));
27,142✔
440
    }
441
  }
442

443
  int32_t numOfIndex = taosArrayGetSize(hbRsp.pIndexRsp);
7,280,941✔
444
  for (int32_t i = 0; i < numOfIndex; ++i) {
7,280,941✔
445
    STableIndexRsp *rsp = taosArrayGet(hbRsp.pIndexRsp, i);
×
446
    if (NULL == rsp) {
×
447
      code = terrno;
×
448
      goto _return;
×
449
    }
450
    TSC_ERR_JRET(catalogUpdateTableIndex(pCatalog, rsp));
×
451
  }
452

453
_return:
7,280,941✔
454
  taosArrayDestroy(hbRsp.pIndexRsp);
7,280,941✔
455
  hbRsp.pIndexRsp = NULL;
7,280,941✔
456

457
  tFreeSSTbHbRsp(&hbRsp);
7,280,941✔
458
  return code;
7,280,941✔
459
}
460

461
static int32_t hbProcessDynViewRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
7,013✔
462
  return catalogUpdateDynViewVer(pCatalog, (SDynViewVersion *)value);
7,013✔
463
}
464

465
static void hbFreeSViewMetaInRsp(void *p) {
×
466
  if (NULL == p || NULL == *(void **)p) {
×
467
    return;
×
468
  }
469
  SViewMetaRsp *pRsp = *(SViewMetaRsp **)p;
×
470
  tFreeSViewMetaRsp(pRsp);
×
471
  taosMemoryFreeClear(pRsp);
×
472
}
473

474
static int32_t hbProcessViewInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
7,013✔
475
  int32_t code = TSDB_CODE_SUCCESS;
7,013✔
476

477
  SViewHbRsp hbRsp = {0};
7,013✔
478
  if (tDeserializeSViewHbRsp(value, valueLen, &hbRsp) != 0) {
7,013✔
479
    taosArrayDestroyEx(hbRsp.pViewRsp, hbFreeSViewMetaInRsp);
×
480
    terrno = TSDB_CODE_INVALID_MSG;
×
481
    return -1;
×
482
  }
483

484
  int32_t numOfMeta = taosArrayGetSize(hbRsp.pViewRsp);
7,013✔
485
  for (int32_t i = 0; i < numOfMeta; ++i) {
7,013✔
UNCOV
486
    SViewMetaRsp *rsp = taosArrayGetP(hbRsp.pViewRsp, i);
×
UNCOV
487
    if (NULL == rsp) {
×
488
      code = terrno;
×
489
      goto _return;
×
490
    }
UNCOV
491
    if (rsp->numOfCols < 0) {
×
UNCOV
492
      tscDebug("hb to remove view, db:%s, view:%s", rsp->dbFName, rsp->name);
×
UNCOV
493
      code = catalogRemoveViewMeta(pCatalog, rsp->dbFName, rsp->dbId, rsp->name, rsp->viewId);
×
UNCOV
494
      tFreeSViewMetaRsp(rsp);
×
UNCOV
495
      taosMemoryFreeClear(rsp);
×
496
    } else {
497
      tscDebug("hb to update view, db:%s, view:%s", rsp->dbFName, rsp->name);
×
498
      code = catalogUpdateViewMeta(pCatalog, rsp);
×
499
    }
UNCOV
500
    TSC_ERR_JRET(code);
×
501
  }
502

503
_return:
7,013✔
504
  taosArrayDestroy(hbRsp.pViewRsp);
7,013✔
505
  return code;
7,013✔
506
}
507

508
static int32_t hbprocessTSMARsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
7,546✔
509
  int32_t code = 0;
7,546✔
510

511
  STSMAHbRsp hbRsp = {0};
7,546✔
512
  if (tDeserializeTSMAHbRsp(value, valueLen, &hbRsp)) {
7,546✔
513
    terrno = TSDB_CODE_INVALID_MSG;
×
514
    return -1;
×
515
  }
516

517
  int32_t numOfTsma = taosArrayGetSize(hbRsp.pTsmas);
7,546✔
518
  for (int32_t i = 0; i < numOfTsma; ++i) {
7,546✔
519
    STableTSMAInfo *pTsmaInfo = taosArrayGetP(hbRsp.pTsmas, i);
×
520

521
    if (!pTsmaInfo->pFuncs) {
×
522
      tscDebug("hb to remove tsma:%s.%s", pTsmaInfo->dbFName, pTsmaInfo->name);
×
523
      code = catalogRemoveTSMA(pCatalog, pTsmaInfo);
×
524
      tFreeAndClearTableTSMAInfo(pTsmaInfo);
×
525
    } else {
526
      tscDebug("hb to update tsma:%s.%s", pTsmaInfo->dbFName, pTsmaInfo->name);
×
527
      code = catalogUpdateTSMA(pCatalog, &pTsmaInfo);
×
528
      tFreeAndClearTableTSMAInfo(pTsmaInfo);
×
529
    }
530
    TSC_ERR_JRET(code);
×
531
  }
532

533
_return:
7,546✔
534
  taosArrayDestroy(hbRsp.pTsmas);
7,546✔
535
  return code;
7,546✔
536
}
537

538
static void hbProcessQueryRspKvs(int32_t kvNum, SArray *pKvs, struct SCatalog *pCatalog, SAppHbMgr *pAppHbMgr) {
30,251,799✔
539
  for (int32_t i = 0; i < kvNum; ++i) {
77,957,635✔
540
    SKv *kv = taosArrayGet(pKvs, i);
47,705,836✔
541
    if (NULL == kv) {
47,705,836✔
542
      tscError("invalid hb kv, idx:%d", i);
×
543
      continue;
×
544
    }
545
    switch (kv->key) {
47,705,836✔
546
      case HEARTBEAT_KEY_USER_AUTHINFO: {
30,251,619✔
547
        if (kv->valueLen <= 0 || NULL == kv->value) {
30,251,619✔
548
          tscError("invalid hb user auth info, len:%d, value:%p", kv->valueLen, kv->value);
×
549
          break;
×
550
        }
551
        if (TSDB_CODE_SUCCESS != hbProcessUserAuthInfoRsp(kv->value, kv->valueLen, pCatalog, pAppHbMgr)) {
30,251,619✔
552
          tscError("process user auth info response faild, len:%d, value:%p", kv->valueLen, kv->value);
×
553
          break;
×
554
        }
555
        break;
30,251,619✔
556
      }
557
      case HEARTBEAT_KEY_DBINFO: {
10,151,704✔
558
        if (kv->valueLen <= 0 || NULL == kv->value) {
10,151,704✔
559
          tscError("invalid hb db info, len:%d, value:%p", kv->valueLen, kv->value);
×
560
          break;
×
561
        }
562
        if (TSDB_CODE_SUCCESS != hbProcessDBInfoRsp(kv->value, kv->valueLen, pCatalog)) {
10,151,704✔
563
          tscError("process db info response faild, len:%d, value:%p", kv->valueLen, kv->value);
×
564
          break;
×
565
        }
566
        break;
10,151,704✔
567
      }
568
      case HEARTBEAT_KEY_STBINFO: {
7,280,941✔
569
        if (kv->valueLen <= 0 || NULL == kv->value) {
7,280,941✔
570
          tscError("invalid hb stb info, len:%d, value:%p", kv->valueLen, kv->value);
×
571
          break;
×
572
        }
573
        if (TSDB_CODE_SUCCESS != hbProcessStbInfoRsp(kv->value, kv->valueLen, pCatalog)) {
7,280,941✔
574
          tscError("process stb info response faild, len:%d, value:%p", kv->valueLen, kv->value);
×
575
          break;
×
576
        }
577
        break;
7,280,941✔
578
      }
579
#ifdef TD_ENTERPRISE
580
      case HEARTBEAT_KEY_DYN_VIEW: {
7,013✔
581
        if (kv->valueLen <= 0 || NULL == kv->value) {
7,013✔
582
          tscError("invalid dyn view info, len:%d, value:%p", kv->valueLen, kv->value);
×
583
          break;
×
584
        }
585
        if (TSDB_CODE_SUCCESS != hbProcessDynViewRsp(kv->value, kv->valueLen, pCatalog)) {
7,013✔
586
          tscError("Process dyn view response failed, len:%d, value:%p", kv->valueLen, kv->value);
×
587
          break;
×
588
        }
589
        break;
7,013✔
590
      }
591
      case HEARTBEAT_KEY_VIEWINFO: {
7,013✔
592
        if (kv->valueLen <= 0 || NULL == kv->value) {
7,013✔
593
          tscError("invalid view info, len:%d, value:%p", kv->valueLen, kv->value);
×
594
          break;
×
595
        }
596
        if (TSDB_CODE_SUCCESS != hbProcessViewInfoRsp(kv->value, kv->valueLen, pCatalog)) {
7,013✔
597
          tscError("Process view info response failed, len:%d, value:%p", kv->valueLen, kv->value);
×
598
          break;
×
599
        }
600
        break;
7,013✔
601
      }
602
#endif
603
      case HEARTBEAT_KEY_TSMA: {
7,546✔
604
        if (kv->valueLen <= 0 || !kv->value) {
7,546✔
605
          tscError("Invalid tsma info, len:%d, value:%p", kv->valueLen, kv->value);
×
606
        }
607
        if (TSDB_CODE_SUCCESS != hbprocessTSMARsp(kv->value, kv->valueLen, pCatalog)) {
7,546✔
608
          tscError("Process tsma info response failed, len:%d, value:%p", kv->valueLen, kv->value);
×
609
        }
610
        break;
7,546✔
611
      }
612
      default:
×
613
        tscError("invalid hb key type:%d", kv->key);
×
614
        break;
×
615
    }
616
  }
617
}
30,251,799✔
618

619
static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
40,694,038✔
620
  SClientHbReq *pReq = taosHashAcquire(pAppHbMgr->activeInfo, &pRsp->connKey, sizeof(SClientHbKey));
40,694,038✔
621
  if (NULL == pReq) {
40,694,038✔
622
    tscWarn("pReq to get activeInfo, may be dropped, refId:%" PRIx64 ", type:%d", pRsp->connKey.tscRid,
105,831✔
623
            pRsp->connKey.connType);
624
    return TSDB_CODE_SUCCESS;
105,831✔
625
  }
626

627
  if (pRsp->query) {
40,588,207✔
628
    STscObj *pTscObj = (STscObj *)acquireTscObj(pRsp->connKey.tscRid);
40,588,207✔
629
    if (NULL == pTscObj) {
40,588,207✔
630
      tscDebug("tscObj rid %" PRIx64 " not exist", pRsp->connKey.tscRid);
13,367✔
631
    } else {
632
      if (pRsp->query->totalDnodes > 1) {
40,574,840✔
633
        SEpSet  originEpset = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
9,914,853✔
634
        if (!isEpsetEqual(&originEpset, &pRsp->query->epSet)) {
9,914,853✔
635
          SEpSet *pOrig = &originEpset;
34,857✔
636
          SEp    *pOrigEp = &pOrig->eps[pOrig->inUse];
34,857✔
637
          SEp    *pNewEp = &pRsp->query->epSet.eps[pRsp->query->epSet.inUse];
34,857✔
638
          tscDebug("mnode epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d in hb", pOrig->inUse, pOrig->numOfEps,
34,857✔
639
                   pOrigEp->fqdn, pOrigEp->port, pRsp->query->epSet.inUse, pRsp->query->epSet.numOfEps, pNewEp->fqdn,
640
                   pNewEp->port);
641

642
          updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &pRsp->query->epSet);
34,857✔
643
        }
644
      }
645

646
      pTscObj->pAppInfo->totalDnodes = pRsp->query->totalDnodes;
40,574,840✔
647
      pTscObj->pAppInfo->onlineDnodes = pRsp->query->onlineDnodes;
40,574,840✔
648
      pTscObj->connId = pRsp->query->connId;
40,574,840✔
649
      tscTrace("connId:%u, hb rsp, dnodes %d/%d", pTscObj->connId, pTscObj->pAppInfo->onlineDnodes,
40,574,840✔
650
               pTscObj->pAppInfo->totalDnodes);
651

652
      if (pRsp->query->killRid) {
40,574,840✔
653
        tscDebug("QID:0x%" PRIx64 ", need to be killed now", pRsp->query->killRid);
1,404✔
654
        SRequestObj *pRequest = acquireRequest(pRsp->query->killRid);
1,404✔
655
        if (NULL == pRequest) {
1,404✔
656
          tscDebug("QID:0x%" PRIx64 ", not exist to kill", pRsp->query->killRid);
×
657
        } else {
658
          taos_stop_query((TAOS_RES *)pRequest);
1,404✔
659
          (void)releaseRequest(pRsp->query->killRid);
1,404✔
660
        }
661
      }
662

663
      if (pRsp->query->killConnection) {
40,574,840✔
664
        taos_close_internal(pTscObj);
×
665
      }
666

667
      if (pRsp->query->pQnodeList) {
40,574,840✔
668
        if (TSDB_CODE_SUCCESS != updateQnodeList(pTscObj->pAppInfo, pRsp->query->pQnodeList)) {
381,706✔
669
          tscWarn("update qnode list failed");
×
670
        }
671
      }
672

673
      releaseTscObj(pRsp->connKey.tscRid);
40,574,840✔
674
    }
675
  }
676

677
  int32_t kvNum = pRsp->info ? taosArrayGetSize(pRsp->info) : 0;
40,588,207✔
678

679
  tscDebug("hb got %d rsp kv", kvNum);
40,588,207✔
680

681
  if (kvNum > 0) {
40,588,207✔
682
    struct SCatalog *pCatalog = NULL;
30,251,799✔
683
    int32_t          code = catalogGetHandle(pReq->clusterId, &pCatalog);
30,251,799✔
684
    if (code != TSDB_CODE_SUCCESS) {
30,251,799✔
685
      tscWarn("catalogGetHandle failed, clusterId:0x%" PRIx64 ", error:%s", pReq->clusterId, tstrerror(code));
×
686
    } else {
687
      hbProcessQueryRspKvs(kvNum, pRsp->info, pCatalog, pAppHbMgr);
30,251,799✔
688
    }
689
  }
690

691
  taosHashRelease(pAppHbMgr->activeInfo, pReq);
40,588,207✔
692

693
  return TSDB_CODE_SUCCESS;
40,588,207✔
694
}
695

696
/* Choose execution phase and phase start time for desc.
697
 * Rules:
698
 *  - If both timestamps are invalid (<= 0), keep the request's stored values.
699
 *  - If only one timestamp is valid, use the valid one.
700
 *  - If both are valid, prefer the job's phase/time when jobPhaseTime >= request's phaseStartTime;
701
 *    otherwise prefer the request's record.
702
 *
703
 * Note: If the local system clock was moved backwards (time rollback), jobPhaseTime may appear
704
 * earlier than the request's phase start time even when the job record is actually newer.
705
 * This logic avoids reporting a regressed start time in that situation.
706
 */
707
static void hbSetRequestPhase(SRequestObj *pRequest, SQueryDesc *desc) {
24,649,390✔
708
  int32_t jobPhase = QUERY_PHASE_NONE;
24,649,390✔
709
  int64_t jobPhaseTime = 0;
24,649,390✔
710
  int32_t phaseCode = schedulerGetJobPhase(pRequest->body.queryJob, &jobPhase, &jobPhaseTime);
24,649,390✔
711
  if (phaseCode != TSDB_CODE_SUCCESS) {
24,649,390✔
712
    tscWarn("get job phase failed, code:%d", phaseCode);
6,586,306✔
713
    desc->execPhase = CLIENT_GET_REQUEST_PHASE(pRequest);
6,586,306✔
714
    desc->phaseStartTime = CLIENT_GET_REQUEST_PHASE_START_TIME(pRequest);
6,586,306✔
715
    return;
6,586,306✔
716
  }
717

718
  tscDebug("get job phase success, jobPhase:%d, jobPhaseTime:%" PRId64, jobPhase, jobPhaseTime);
18,063,084✔
719
  int64_t phaseStartTime = CLIENT_GET_REQUEST_PHASE_START_TIME(pRequest);
18,063,084✔
720
  int32_t phaseStatus = CLIENT_GET_REQUEST_PHASE(pRequest);
18,063,084✔
721

722
  if (jobPhaseTime <= 0 && phaseStartTime <= 0) {
18,063,084✔
723
    /* No valid time available, keep original behavior */
724
    desc->phaseStartTime = phaseStartTime;
×
725
    desc->execPhase = phaseStatus;
×
726
  } else if (jobPhaseTime <= 0) {
18,063,084✔
727
    desc->phaseStartTime = phaseStartTime;
2,285✔
728
    desc->execPhase = phaseStatus;
2,285✔
729
  } else if (phaseStartTime <= 0) {
18,060,799✔
730
    desc->phaseStartTime = jobPhaseTime;
57,282✔
731
    desc->execPhase = jobPhase;
57,282✔
732
  } else if (jobPhaseTime >= phaseStartTime) {
18,003,517✔
733
    /* Job record is newer (or equal, prefer job) */
734
    desc->phaseStartTime = jobPhaseTime;
15,521,357✔
735
    desc->execPhase = jobPhase;
15,521,357✔
736
  } else {
737
    /* Request record is newer */
738
    desc->phaseStartTime = phaseStartTime;
2,482,160✔
739
    desc->execPhase = phaseStatus;
2,482,160✔
740
  }
741
}
742

743
static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) {
33,358,654✔
744
  if (0 == atomic_load_8(&clientHbMgr.inited)) {
33,358,654✔
745
    goto _return;
1,537✔
746
  }
747

748
  static int32_t    emptyRspNum = 0;
749
  int32_t           idx = *(int32_t *)param;
33,357,117✔
750
  SClientHbBatchRsp pRsp = {0};
33,357,117✔
751
  if (TSDB_CODE_SUCCESS == code) {
33,357,117✔
752
    code = tDeserializeSClientHbBatchRsp(pMsg->pData, pMsg->len, &pRsp);
33,112,868✔
753
    if (TSDB_CODE_SUCCESS != code) {
33,112,176✔
754
      tscError("deserialize hb rsp failed");
70✔
755
    }
756
    int32_t now = taosGetTimestampSec();
33,112,176✔
757
    int32_t delta = abs(now - pRsp.svrTimestamp);
33,109,356✔
758
    if (delta > tsTimestampDeltaLimit) {
33,109,356✔
759
      code = TSDB_CODE_TIME_UNSYNCED;
70✔
760
      tscError("time diff:%ds is too big", delta);
70✔
761
    }
762
  }
763

764
  int32_t rspNum = taosArrayGetSize(pRsp.rsps);
33,353,605✔
765

766
  (void)taosThreadMutexLock(&clientHbMgr.lock);
33,356,538✔
767

768
  SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, idx);
33,357,117✔
769
  if (pAppHbMgr == NULL) {
33,357,117✔
770
    (void)taosThreadMutexUnlock(&clientHbMgr.lock);
×
771
    tscError("appHbMgr not exist, idx:%d", idx);
×
772
    taosMemoryFree(pMsg->pData);
×
773
    taosMemoryFree(pMsg->pEpSet);
×
774
    tFreeClientHbBatchRsp(&pRsp);
775
    return TSDB_CODE_OUT_OF_RANGE;
×
776
  }
777

778
  SAppInstInfo *pInst = pAppHbMgr->pAppInstInfo;
33,357,117✔
779

780
  if (code != 0) {
33,357,117✔
781
    pInst->onlineDnodes = pInst->totalDnodes ? 0 : -1;
244,319✔
782
    tscDebug("hb rsp error %s, update server status %d/%d", tstrerror(code), pInst->onlineDnodes, pInst->totalDnodes);
244,319✔
783
    (void)taosThreadMutexUnlock(&clientHbMgr.lock);
244,319✔
784
    taosMemoryFree(pMsg->pData);
244,319✔
785
    taosMemoryFree(pMsg->pEpSet);
244,319✔
786
    tFreeClientHbBatchRsp(&pRsp);
787
    return code;
244,319✔
788
  }
789

790
  pInst->serverCfg.monitorParas = pRsp.monitorParas;
33,112,798✔
791
  pInst->serverCfg.enableAuditDelete = pRsp.enableAuditDelete;
33,112,798✔
792
  pInst->serverCfg.enableAuditSelect = pRsp.enableAuditSelect;
33,112,798✔
793
  pInst->serverCfg.enableAuditInsert = pRsp.enableAuditInsert;
33,112,798✔
794
  pInst->serverCfg.auditLevel = pRsp.auditLevel;
33,112,798✔
795
  pInst->serverCfg.enableStrongPass = pRsp.enableStrongPass;
33,112,798✔
796
  pInst->serverCfg.sodInitial = pRsp.sodInitial;
33,112,798✔
797
  pInst->serverCfg.macActive = pRsp.macActive;
33,112,798✔
798
  tsEnableStrongPassword = pInst->serverCfg.enableStrongPass;
33,112,798✔
799

800
  tscDebug("monitor paras from hb, clusterId:0x%" PRIx64 ", threshold:%d scope:%d", pInst->clusterId,
33,112,798✔
801
           pRsp.monitorParas.tsSlowLogThreshold, pRsp.monitorParas.tsSlowLogScope);
802

803
  if (rspNum) {
33,112,798✔
804
    tscDebug("hb got %d rsp, %d empty rsp received before", rspNum,
32,415,894✔
805
             atomic_val_compare_exchange_32(&emptyRspNum, emptyRspNum, 0));
806
  } else {
807
    (void)atomic_add_fetch_32(&emptyRspNum, 1);
696,904✔
808
  }
809

810
  for (int32_t i = 0; i < rspNum; ++i) {
73,806,836✔
811
    SClientHbRsp *rsp = taosArrayGet(pRsp.rsps, i);
40,694,038✔
812
    code = (*clientHbMgr.rspHandle[rsp->connKey.connType])(pAppHbMgr, rsp);
40,694,038✔
813
    if (code) {
40,694,038✔
814
      break;
×
815
    }
816
  }
817

818
  (void)taosThreadMutexUnlock(&clientHbMgr.lock);
33,112,798✔
819

820
  tFreeClientHbBatchRsp(&pRsp);
821

822
_return:
33,114,335✔
823
  taosMemoryFree(pMsg->pData);
33,114,335✔
824
  taosMemoryFree(pMsg->pEpSet);
33,114,335✔
825
  return code;
33,114,335✔
826
}
827

828
int32_t hbBuildQueryDesc(SQueryHbReqBasic *hbBasic, STscObj *pObj) {
37,017,219✔
829
  int64_t    now = taosGetTimestampUs();
37,017,219✔
830
  SQueryDesc desc = {0};
37,017,219✔
831
  int32_t    code = 0;
37,017,219✔
832

833
  void *pIter = taosHashIterate(pObj->pRequests, NULL);
37,017,219✔
834
  while (pIter != NULL) {
74,793,569✔
835
    int64_t     *rid = pIter;
37,776,350✔
836
    SRequestObj *pRequest = acquireRequest(*rid);
37,776,350✔
837
    if (NULL == pRequest) {
37,776,350✔
838
      pIter = taosHashIterate(pObj->pRequests, pIter);
29,271✔
839
      continue;
29,271✔
840
    }
841

842
    if (pRequest->killed || 0 == pRequest->body.queryJob) {
37,747,079✔
843
      (void)releaseRequest(*rid);
13,097,689✔
844
      pIter = taosHashIterate(pObj->pRequests, pIter);
13,097,689✔
845
      continue;
13,097,689✔
846
    }
847

848
    tstrncpy(desc.sql, pRequest->sqlstr, sizeof(desc.sql));
24,649,390✔
849
    desc.stime = pRequest->metric.start / 1000;
24,649,390✔
850
    desc.queryId = pRequest->requestId;
24,649,390✔
851
    desc.useconds = now - pRequest->metric.start;
24,649,390✔
852
    desc.reqRid = pRequest->self;
24,649,390✔
853
    desc.stableQuery = pRequest->stableQuery;
24,649,390✔
854
    desc.isSubQuery = pRequest->isSubReq;
24,649,390✔
855
    code = taosGetFqdn(desc.fqdn);
24,649,390✔
856
    if (TSDB_CODE_SUCCESS != code) {
24,649,390✔
857
      (void)releaseRequest(*rid);
×
858
      tscError("get fqdn failed");
×
859
      return TSDB_CODE_FAILED;
×
860
    }
861
    desc.subPlanNum = pRequest->body.subplanNum;
24,649,390✔
862

863
    /* Determine and set the execution phase and its start time for this request. */
864
    hbSetRequestPhase(pRequest, &desc);
24,649,390✔
865

866
    if (desc.subPlanNum) {
24,649,390✔
867
      desc.subDesc = taosArrayInit(desc.subPlanNum, sizeof(SQuerySubDesc));
24,649,390✔
868
      if (NULL == desc.subDesc) {
24,649,390✔
869
        (void)releaseRequest(*rid);
×
870
        return terrno;
×
871
      }
872

873
      code = schedulerGetTasksStatus(pRequest->body.queryJob, desc.subDesc);
24,649,390✔
874
      if (code) {
24,649,390✔
875
        taosArrayDestroy(desc.subDesc);
6,659,315✔
876
        desc.subDesc = NULL;
6,659,315✔
877
        code = TSDB_CODE_SUCCESS;
6,659,315✔
878
      }
879
      desc.subPlanNum = taosArrayGetSize(desc.subDesc);
24,649,390✔
880
    } else {
881
      desc.subDesc = NULL;
×
882
    }
883

884
    (void)releaseRequest(*rid);
24,649,390✔
885
    if (NULL == taosArrayPush(hbBasic->queryDesc, &desc)) {
49,298,780✔
886
      taosArrayDestroy(desc.subDesc);
×
887
      return terrno;
×
888
    }
889

890
    pIter = taosHashIterate(pObj->pRequests, pIter);
24,649,390✔
891
  }
892

893
  return code;
37,017,219✔
894
}
895

896
int32_t hbGetQueryBasicInfo(SClientHbKey *connKey, SClientHbReq *req) {
41,163,008✔
897
  STscObj *pTscObj = (STscObj *)acquireTscObj(connKey->tscRid);
41,163,008✔
898
  if (NULL == pTscObj) {
41,163,008✔
899
    tscWarn("tscObj rid 0x%" PRIx64 " not exist", connKey->tscRid);
×
900
    return terrno;
×
901
  }
902

903
  SQueryHbReqBasic *hbBasic = (SQueryHbReqBasic *)taosMemoryCalloc(1, sizeof(SQueryHbReqBasic));
41,163,008✔
904
  if (NULL == hbBasic) {
41,163,008✔
905
    tscError("calloc %d failed", (int32_t)sizeof(SQueryHbReqBasic));
×
906
    releaseTscObj(connKey->tscRid);
×
907
    return terrno;
×
908
  }
909

910
  hbBasic->connId = pTscObj->connId;
41,163,008✔
911

912
  int32_t numOfQueries = pTscObj->pRequests ? taosHashGetSize(pTscObj->pRequests) : 0;
41,163,008✔
913
  if (numOfQueries <= 0) {
41,163,008✔
914
    req->query = hbBasic;
4,145,789✔
915
    releaseTscObj(connKey->tscRid);
4,145,789✔
916
    tscDebug("no queries on connection");
4,145,789✔
917
    return TSDB_CODE_SUCCESS;
4,145,789✔
918
  }
919

920
  hbBasic->queryDesc = taosArrayInit(numOfQueries, sizeof(SQueryDesc));
37,017,219✔
921
  if (NULL == hbBasic->queryDesc) {
37,017,219✔
922
    tscWarn("taosArrayInit %d queryDesc failed", numOfQueries);
×
923
    releaseTscObj(connKey->tscRid);
×
924
    taosMemoryFree(hbBasic);
×
925
    return terrno;
×
926
  }
927

928
  int32_t code = hbBuildQueryDesc(hbBasic, pTscObj);
37,017,219✔
929
  if (code) {
37,017,219✔
930
    releaseTscObj(connKey->tscRid);
×
931
    if (hbBasic->queryDesc) {
×
932
      taosArrayDestroyEx(hbBasic->queryDesc, tFreeClientHbQueryDesc);
×
933
    }
934
    taosMemoryFree(hbBasic);
×
935
    return code;
×
936
  }
937

938
  req->query = hbBasic;
37,017,219✔
939
  releaseTscObj(connKey->tscRid);
37,017,219✔
940

941
  return TSDB_CODE_SUCCESS;
37,017,219✔
942
}
943

944
static int32_t hbGetUserAuthInfo(SClientHbKey *connKey, SHbParam *param, SClientHbReq *req) {
1,120,106✔
945
  STscObj *pTscObj = (STscObj *)acquireTscObj(connKey->tscRid);
1,120,106✔
946
  if (!pTscObj) {
1,120,106✔
947
    tscWarn("tscObj rid 0x%" PRIx64 " not exist", connKey->tscRid);
904✔
948
    return terrno;
904✔
949
  }
950

951
  int32_t code = 0;
1,119,202✔
952

953
  SKv  kv = {.key = HEARTBEAT_KEY_USER_AUTHINFO};
1,119,202✔
954
  SKv *pKv = NULL;
1,119,202✔
955
  if ((pKv = taosHashGet(req->info, &kv.key, sizeof(kv.key)))) {
1,119,202✔
956
    int32_t           userNum = pKv->valueLen / sizeof(SUserAuthVersion);
625,734✔
957
    SUserAuthVersion *userAuths = (SUserAuthVersion *)pKv->value;
625,734✔
958
    for (int32_t i = 0; i < userNum; ++i) {
640,970✔
959
      SUserAuthVersion *pUserAuth = userAuths + i;
640,970✔
960
      // both key and user exist, update version
961
      if (strncmp(pUserAuth->user, pTscObj->user, TSDB_USER_LEN) == 0) {
640,970✔
962
        pUserAuth->version = htonl(-1);  // force get userAuthInfo
625,734✔
963
        goto _return;
625,734✔
964
      }
965
    }
966
    // key exists, user not exist, append user
UNCOV
967
    SUserAuthVersion *qUserAuth =
×
UNCOV
968
        (SUserAuthVersion *)taosMemoryRealloc(pKv->value, (userNum + 1) * sizeof(SUserAuthVersion));
×
UNCOV
969
    if (qUserAuth) {
×
UNCOV
970
      tstrncpy((qUserAuth + userNum)->user, pTscObj->user, TSDB_USER_LEN);
×
UNCOV
971
      (qUserAuth + userNum)->version = htonl(-1);  // force get userAuthInfo
×
UNCOV
972
      pKv->value = qUserAuth;
×
UNCOV
973
      pKv->valueLen += sizeof(SUserAuthVersion);
×
974
    } else {
975
      code = terrno;
×
976
    }
UNCOV
977
    goto _return;
×
978
  }
979

980
  // key/user not exist, add user
981
  SUserAuthVersion *user = taosMemoryMalloc(sizeof(SUserAuthVersion));
493,468✔
982
  if (!user) {
493,468✔
983
    code = terrno;
×
984
    goto _return;
×
985
  }
986
  tstrncpy(user->user, pTscObj->user, TSDB_USER_LEN);
493,468✔
987
  user->version = htonl(-1);  // force get userAuthInfo
493,468✔
988
  kv.valueLen = sizeof(SUserAuthVersion);
493,468✔
989
  kv.value = user;
493,468✔
990

991
  tscDebug("hb got user auth info, valueLen:%d, user:%s, authVer:%d, tscRid:%" PRIi64, kv.valueLen, user->user,
493,468✔
992
           pTscObj->authVer, connKey->tscRid);
993

994
  if (!req->info) {
493,468✔
995
    req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
493,468✔
996
    if (NULL == req->info) {
493,468✔
997
      code = terrno;
×
998
      goto _return;
×
999
    }
1000
  }
1001

1002
  if (taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)) != 0) {
493,468✔
1003
    taosMemoryFree(user);
×
1004
    code = terrno ? terrno : TSDB_CODE_APP_ERROR;
×
1005
    goto _return;
×
1006
  }
1007

1008
_return:
1,099,812✔
1009
  releaseTscObj(connKey->tscRid);
1,119,202✔
1010
  if (code) {
1,119,202✔
1011
    tscError("hb got user auth info failed since %s", tstrerror(code));
×
1012
  }
1013

1014
  return code;
1,119,202✔
1015
}
1016

1017
int32_t hbGetExpiredUserInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) {
30,270,726✔
1018
  SUserAuthVersion *users = NULL;
30,270,726✔
1019
  uint32_t          userNum = 0;
30,270,726✔
1020
  int32_t           code = 0;
30,270,726✔
1021

1022
  code = catalogGetExpiredUsers(pCatalog, &users, &userNum);
30,270,726✔
1023
  if (TSDB_CODE_SUCCESS != code) {
30,270,726✔
1024
    return code;
×
1025
  }
1026

1027
  if (userNum <= 0) {
30,270,726✔
1028
    taosMemoryFree(users);
214,798✔
1029
    return TSDB_CODE_SUCCESS;
214,798✔
1030
  }
1031

1032
  for (int32_t i = 0; i < userNum; ++i) {
60,674,450✔
1033
    SUserAuthVersion *user = &users[i];
30,618,522✔
1034
    user->version = htonl(user->version);
30,618,522✔
1035
  }
1036

1037
  SKv kv = {
30,055,928✔
1038
      .key = HEARTBEAT_KEY_USER_AUTHINFO,
1039
      .valueLen = sizeof(SUserAuthVersion) * userNum,
30,055,928✔
1040
      .value = users,
1041
  };
1042

1043
  tscDebug("hb got %d expired users, valueLen:%d", userNum, kv.valueLen);
30,055,928✔
1044

1045
  if (NULL == req->info) {
30,055,928✔
1046
    req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
30,055,928✔
1047
    if (NULL == req->info) {
30,055,928✔
1048
      taosMemoryFree(users);
×
1049
      return terrno;
×
1050
    }
1051
  }
1052

1053
  code = taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv));
30,055,928✔
1054
  if (TSDB_CODE_SUCCESS != code) {
30,055,928✔
1055
    taosMemoryFree(users);
×
1056
    return code;
×
1057
  }
1058

1059
  return TSDB_CODE_SUCCESS;
30,055,928✔
1060
}
1061

1062
int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) {
32,699,932✔
1063
  SDbCacheInfo *dbs = NULL;
32,699,932✔
1064
  uint32_t      dbNum = 0;
32,699,932✔
1065
  int32_t       code = 0;
32,699,932✔
1066

1067
  code = catalogGetExpiredDBs(pCatalog, &dbs, &dbNum);
32,699,932✔
1068
  if (TSDB_CODE_SUCCESS != code) {
32,699,932✔
1069
    return code;
×
1070
  }
1071

1072
  if (dbNum <= 0) {
32,699,932✔
1073
    taosMemoryFree(dbs);
22,480,101✔
1074
    return TSDB_CODE_SUCCESS;
22,480,101✔
1075
  }
1076

1077
  for (int32_t i = 0; i < dbNum; ++i) {
23,281,207✔
1078
    SDbCacheInfo *db = &dbs[i];
13,061,376✔
1079
    tscDebug("the %dth expired db:%s, dbId:%" PRId64
13,061,376✔
1080
             ", vgVersion:%d, cfgVersion:%d, numOfTable:%d, startTs:%" PRId64,
1081
             i, db->dbFName, db->dbId, db->vgVersion, db->cfgVersion, db->numOfTable, db->stateTs);
1082

1083
    db->dbId = htobe64(db->dbId);
13,061,376✔
1084
    db->vgVersion = htonl(db->vgVersion);
13,061,376✔
1085
    db->cfgVersion = htonl(db->cfgVersion);
13,061,376✔
1086
    db->numOfTable = htonl(db->numOfTable);
13,061,376✔
1087
    db->stateTs = htobe64(db->stateTs);
13,061,376✔
1088
    db->tsmaVersion = htonl(db->tsmaVersion);
13,061,376✔
1089
  }
1090

1091
  SKv kv = {
10,219,831✔
1092
      .key = HEARTBEAT_KEY_DBINFO,
1093
      .valueLen = sizeof(SDbCacheInfo) * dbNum,
10,219,831✔
1094
      .value = dbs,
1095
  };
1096

1097
  tscDebug("hb got %d expired db, valueLen:%d", dbNum, kv.valueLen);
10,219,831✔
1098

1099
  if (NULL == req->info) {
10,219,831✔
1100
    req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
148✔
1101
    if (NULL == req->info) {
148✔
1102
      taosMemoryFree(dbs);
×
1103
      return terrno;
×
1104
    }
1105
  }
1106

1107
  code = taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv));
10,219,831✔
1108
  if (TSDB_CODE_SUCCESS != code) {
10,219,831✔
1109
    taosMemoryFree(dbs);
×
1110
    return code;
×
1111
  }
1112

1113
  return TSDB_CODE_SUCCESS;
10,219,831✔
1114
}
1115

1116
int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) {
32,699,932✔
1117
  SSTableVersion *stbs = NULL;
32,699,932✔
1118
  uint32_t        stbNum = 0;
32,699,932✔
1119
  int32_t         code = 0;
32,699,932✔
1120

1121
  code = catalogGetExpiredSTables(pCatalog, &stbs, &stbNum);
32,699,932✔
1122
  if (TSDB_CODE_SUCCESS != code) {
32,699,932✔
1123
    return code;
×
1124
  }
1125

1126
  if (stbNum <= 0) {
32,699,932✔
1127
    taosMemoryFree(stbs);
25,390,020✔
1128
    return TSDB_CODE_SUCCESS;
25,390,020✔
1129
  }
1130

1131
  for (int32_t i = 0; i < stbNum; ++i) {
17,326,688✔
1132
    SSTableVersion *stb = &stbs[i];
10,016,776✔
1133
    stb->suid = htobe64(stb->suid);
10,016,776✔
1134
    stb->sversion = htonl(stb->sversion);
10,016,776✔
1135
    stb->tversion = htonl(stb->tversion);
10,016,776✔
1136
    stb->smaVer = htonl(stb->smaVer);
10,016,776✔
1137
  }
1138

1139
  SKv kv = {
7,309,912✔
1140
      .key = HEARTBEAT_KEY_STBINFO,
1141
      .valueLen = sizeof(SSTableVersion) * stbNum,
7,309,912✔
1142
      .value = stbs,
1143
  };
1144

1145
  tscDebug("hb got %d expired stb, valueLen:%d", stbNum, kv.valueLen);
7,309,912✔
1146

1147
  if (NULL == req->info) {
7,309,912✔
1148
    req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
32✔
1149
    if (NULL == req->info) {
32✔
1150
      taosMemoryFree(stbs);
×
1151
      return terrno;
×
1152
    }
1153
  }
1154

1155
  code = taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv));
7,309,912✔
1156
  if (TSDB_CODE_SUCCESS != code) {
7,309,912✔
1157
    taosMemoryFree(stbs);
×
1158
    return code;
×
1159
  }
1160

1161
  return TSDB_CODE_SUCCESS;
7,309,912✔
1162
}
1163

1164
int32_t hbGetExpiredViewInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) {
32,699,932✔
1165
  SViewVersion    *views = NULL;
32,699,932✔
1166
  uint32_t         viewNum = 0;
32,699,932✔
1167
  int32_t          code = 0;
32,699,932✔
1168
  SDynViewVersion *pDynViewVer = NULL;
32,699,932✔
1169

1170
  TSC_ERR_JRET(catalogGetExpiredViews(pCatalog, &views, &viewNum, &pDynViewVer));
32,699,932✔
1171

1172
  if (viewNum <= 0) {
32,699,932✔
1173
    taosMemoryFree(views);
32,687,924✔
1174
    taosMemoryFree(pDynViewVer);
32,687,924✔
1175
    return TSDB_CODE_SUCCESS;
32,687,924✔
1176
  }
1177

1178
  for (int32_t i = 0; i < viewNum; ++i) {
24,534✔
1179
    SViewVersion *view = &views[i];
12,526✔
1180
    view->dbId = htobe64(view->dbId);
12,526✔
1181
    view->viewId = htobe64(view->viewId);
12,526✔
1182
    view->version = htonl(view->version);
12,526✔
1183
  }
1184

1185
  tscDebug("hb got %u expired view, valueLen:%lu", viewNum, sizeof(SViewVersion) * viewNum);
12,008✔
1186

1187
  if (NULL == req->info) {
12,008✔
1188
    req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
×
1189
    if (NULL == req->info) {
×
1190
      TSC_ERR_JRET(terrno);
×
1191
    }
1192
  }
1193

1194
  SKv kv = {
12,008✔
1195
      .key = HEARTBEAT_KEY_DYN_VIEW,
1196
      .valueLen = sizeof(SDynViewVersion),
1197
      .value = pDynViewVer,
1198
  };
1199

1200
  TSC_ERR_JRET(taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)));
12,008✔
1201

1202
  kv.key = HEARTBEAT_KEY_VIEWINFO;
12,008✔
1203
  kv.valueLen = sizeof(SViewVersion) * viewNum;
12,008✔
1204
  kv.value = views;
12,008✔
1205

1206
  TSC_ERR_JRET(taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)));
12,008✔
1207
  return TSDB_CODE_SUCCESS;
12,008✔
1208
_return:
×
1209
  taosMemoryFree(views);
×
1210
  taosMemoryFree(pDynViewVer);
×
1211
  return code;
×
1212
}
1213

1214
int32_t hbGetExpiredTSMAInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *pReq) {
32,699,932✔
1215
  int32_t       code = 0;
32,699,932✔
1216
  uint32_t      tsmaNum = 0;
32,699,932✔
1217
  STSMAVersion *tsmas = NULL;
32,699,932✔
1218

1219
  code = catalogGetExpiredTsmas(pCatalog, &tsmas, &tsmaNum);
32,699,932✔
1220
  if (code) {
32,699,932✔
1221
    taosMemoryFree(tsmas);
×
1222
    return code;
×
1223
  }
1224

1225
  if (tsmaNum <= 0) {
32,699,932✔
1226
    taosMemoryFree(tsmas);
32,692,386✔
1227
    return TSDB_CODE_SUCCESS;
32,692,386✔
1228
  }
1229

1230
  for (int32_t i = 0; i < tsmaNum; ++i) {
17,864✔
1231
    STSMAVersion *tsma = &tsmas[i];
10,318✔
1232
    tsma->dbId = htobe64(tsma->dbId);
10,318✔
1233
    tsma->tsmaId = htobe64(tsma->tsmaId);
10,318✔
1234
    tsma->version = htonl(tsma->version);
10,318✔
1235
  }
1236

1237
  tscDebug("hb got %d expred tsmas, valueLen:%lu", tsmaNum, sizeof(STSMAVersion) * tsmaNum);
7,546✔
1238

1239
  if (!pReq->info) {
7,546✔
1240
    pReq->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
×
1241
    if (!pReq->info) {
×
1242
      taosMemoryFree(tsmas);
×
1243
      return terrno;
×
1244
    }
1245
  }
1246

1247
  SKv kv = {.key = HEARTBEAT_KEY_TSMA, .valueLen = sizeof(STSMAVersion) * tsmaNum, .value = tsmas};
7,546✔
1248
  code = taosHashPut(pReq->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv));
7,546✔
1249
  if (TSDB_CODE_SUCCESS != code) {
7,546✔
1250
    taosMemoryFree(tsmas);
×
1251
    return code;
×
1252
  }
1253
  return TSDB_CODE_SUCCESS;
7,546✔
1254
}
1255

1256
int32_t hbGetAppInfo(int64_t clusterId, SClientHbReq *req) {
41,163,008✔
1257
  SAppHbReq *pApp = taosHashGet(clientHbMgr.appSummary, &clusterId, sizeof(clusterId));
41,163,008✔
1258
  if (NULL != pApp) {
41,163,008✔
1259
    (void)memcpy(&req->app, pApp, sizeof(*pApp));
41,163,008✔
1260
  } else {
1261
    (void)memset(&req->app.summary, 0, sizeof(req->app.summary));
×
1262
    req->app.pid = taosGetPId();
×
1263
    req->app.appId = clientHbMgr.appId;
×
1264
    TSC_ERR_RET(taosGetAppName(req->app.name, NULL));
×
1265
  }
1266

1267
  return TSDB_CODE_SUCCESS;
41,163,008✔
1268
}
1269

1270
int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) {
41,163,008✔
1271
  int32_t   code = 0;
41,163,008✔
1272
  SHbParam *hbParam = (SHbParam *)param;
41,163,008✔
1273
  SCatalog *pCatalog = NULL;
41,163,008✔
1274

1275
  code = hbGetQueryBasicInfo(connKey, req);
41,163,008✔
1276
  if (code != TSDB_CODE_SUCCESS) {
41,163,008✔
1277
    tscWarn("hbGetQueryBasicInfo failed, clusterId:0x%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code));
×
1278
    return code;
×
1279
  }
1280

1281
  if (hbParam->reqCnt == 0) {
41,163,008✔
1282
    code = catalogGetHandle(hbParam->clusterId, &pCatalog);
32,700,836✔
1283
    if (code != TSDB_CODE_SUCCESS) {
32,700,836✔
1284
      tscWarn("catalogGetHandle failed, clusterId:0x%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code));
×
1285
      return code;
×
1286
    }
1287

1288
    code = hbGetAppInfo(hbParam->clusterId, req);
32,700,836✔
1289
    if (TSDB_CODE_SUCCESS != code) {
32,700,836✔
1290
      tscWarn("getAppInfo failed, clusterId:0x%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code));
×
1291
      return code;
×
1292
    }
1293

1294
    if (!taosHashGet(clientHbMgr.appHbHash, &hbParam->clusterId, sizeof(hbParam->clusterId))) {
32,700,836✔
1295
      code = hbGetExpiredUserInfo(connKey, pCatalog, req);
30,270,726✔
1296
      if (TSDB_CODE_SUCCESS != code) {
30,270,726✔
1297
        tscWarn("hbGetExpiredUserInfo failed, clusterId:0x%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code));
×
1298
        return code;
×
1299
      }
1300
      if (clientHbMgr.appHbHash) {
30,270,726✔
1301
        code = taosHashPut(clientHbMgr.appHbHash, &hbParam->clusterId, sizeof(uint64_t), NULL, 0);
3,983,241✔
1302
        if (TSDB_CODE_SUCCESS != code) {
3,983,241✔
1303
          tscWarn("hbQueryHbReqHandle put clusterId failed, clusterId:0x%" PRIx64 ", error:%s", hbParam->clusterId,
×
1304
                  tstrerror(code));
1305
          return code;
×
1306
        }
1307
      }
1308
    }
1309

1310
    // invoke after hbGetExpiredUserInfo
1311
    if (2 != atomic_load_8(&hbParam->pAppHbMgr->connHbFlag)) {
32,700,836✔
1312
      code = hbGetUserAuthInfo(connKey, hbParam, req);
1,120,106✔
1313
      if (TSDB_CODE_SUCCESS != code) {
1,120,106✔
1314
        tscWarn("hbGetUserAuthInfo failed, clusterId:0x%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code));
904✔
1315
        return code;
904✔
1316
      }
1317
      atomic_store_8(&hbParam->pAppHbMgr->connHbFlag, 1);
1,119,202✔
1318
    }
1319

1320
    code = hbGetExpiredDBInfo(connKey, pCatalog, req);
32,699,932✔
1321
    if (TSDB_CODE_SUCCESS != code) {
32,699,932✔
1322
      tscWarn("hbGetExpiredDBInfo failed, clusterId:0x%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code));
×
1323
      return code;
×
1324
    }
1325

1326
    code = hbGetExpiredStbInfo(connKey, pCatalog, req);
32,699,932✔
1327
    if (TSDB_CODE_SUCCESS != code) {
32,699,932✔
1328
      tscWarn("hbGetExpiredStbInfo failed, clusterId:0x%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code));
×
1329
      return code;
×
1330
    }
1331

1332
#ifdef TD_ENTERPRISE
1333
    code = hbGetExpiredViewInfo(connKey, pCatalog, req);
32,699,932✔
1334
    if (TSDB_CODE_SUCCESS != code) {
32,699,932✔
1335
      tscWarn("hbGetExpiredViewInfo failed, clusterId:0x%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code));
×
1336
      return code;
×
1337
    }
1338
#endif
1339
    code = hbGetExpiredTSMAInfo(connKey, pCatalog, req);
32,699,932✔
1340
    if (TSDB_CODE_SUCCESS != code) {
32,699,932✔
1341
      tscWarn("hbGetExpiredTSMAInfo failed, clusterId:0x%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code));
×
1342
      return code;
×
1343
    }
1344
  } else {
1345
    code = hbGetAppInfo(hbParam->clusterId, req);
8,462,172✔
1346
    if (TSDB_CODE_SUCCESS != code) {
8,462,172✔
1347
      tscWarn("hbGetAppInfo failed, clusterId:0x%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code));
×
1348
      return code;
×
1349
    }
1350
  }
1351

1352
  ++hbParam->reqCnt;  // success to get catalog info
41,162,104✔
1353

1354
  return TSDB_CODE_SUCCESS;
41,162,104✔
1355
}
1356

1357
static FORCE_INLINE void hbMgrInitHandle() {
1358
  // init all handle
1359
  clientHbMgr.reqHandle[CONN_TYPE__QUERY] = hbQueryHbReqHandle;
1,606,411✔
1360
  clientHbMgr.reqHandle[CONN_TYPE__TMQ] = hbQueryHbReqHandle;
1,606,411✔
1361

1362
  clientHbMgr.rspHandle[CONN_TYPE__QUERY] = hbQueryHbRspHandle;
1,606,411✔
1363
  clientHbMgr.rspHandle[CONN_TYPE__TMQ] = hbQueryHbRspHandle;
1,606,411✔
1364
}
1,606,411✔
1365

1366
int32_t hbGatherAllInfo(SAppHbMgr *pAppHbMgr, SClientHbBatchReq **pBatchReq) {
34,145,297✔
1367
  *pBatchReq = taosMemoryCalloc(1, sizeof(SClientHbBatchReq));
34,145,297✔
1368
  if (pBatchReq == NULL) {
34,145,297✔
1369
    return terrno;
×
1370
  }
1371
  int32_t connKeyCnt = atomic_load_32(&pAppHbMgr->connKeyCnt);
34,145,297✔
1372
  (*pBatchReq)->reqs = taosArrayInit(connKeyCnt, sizeof(SClientHbReq));
34,145,297✔
1373
  if (!(*pBatchReq)->reqs) {
34,145,297✔
1374
    tFreeClientHbBatchReq(*pBatchReq);
×
1375
    return terrno;
×
1376
  }
1377

1378
  int64_t  maxIpWhiteVer = 0;
34,145,297✔
1379
  void    *pIter = NULL;
34,145,297✔
1380
  SHbParam param = {0};
34,145,297✔
1381
  while ((pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter))) {
78,230,959✔
1382
    SClientHbReq *pOneReq = pIter;
44,085,662✔
1383
    SClientHbKey *connKey = &pOneReq->connKey;
44,085,662✔
1384
    STscObj      *pTscObj = (STscObj *)acquireTscObj(connKey->tscRid);
44,085,662✔
1385

1386
    if (!pTscObj || atomic_load_8(&pTscObj->dropped) == 1) {
44,085,662✔
1387
      if (pTscObj) releaseTscObj(connKey->tscRid);
2,922,654✔
1388
      continue;
2,922,654✔
1389
    }
1390

1391
    tstrncpy(pOneReq->userApp, pTscObj->optionInfo.userApp, sizeof(pOneReq->userApp));
41,163,008✔
1392
    tstrncpy(pOneReq->cInfo, pTscObj->optionInfo.cInfo, sizeof(pOneReq->cInfo));
41,163,008✔
1393
    pOneReq->userIp = pTscObj->optionInfo.userIp;
41,163,008✔
1394
    pOneReq->userDualIp = pTscObj->optionInfo.userDualIp;
41,163,008✔
1395
    tstrncpy(pOneReq->sVer, td_version, TSDB_VERSION_LEN);
41,163,008✔
1396

1397
    pOneReq = taosArrayPush((*pBatchReq)->reqs, pOneReq);
41,163,008✔
1398
    if (NULL == pOneReq) {
41,163,008✔
1399
      releaseTscObj(connKey->tscRid);
×
1400
      continue;
×
1401
    }
1402

1403
    switch (connKey->connType) {
41,163,008✔
1404
      case CONN_TYPE__QUERY:
41,163,008✔
1405
      case CONN_TYPE__TMQ: {
1406
        if (param.clusterId == 0) {
41,163,008✔
1407
          // init
1408
          param.clusterId = pOneReq->clusterId;
32,700,761✔
1409
          param.pAppHbMgr = pAppHbMgr;
32,700,761✔
1410
          param.connHbFlag = atomic_load_8(&pAppHbMgr->connHbFlag);
32,700,761✔
1411
        }
1412
        break;
41,163,008✔
1413
      }
1414
      default:
×
1415
        break;
×
1416
    }
1417
    if (clientHbMgr.reqHandle[connKey->connType]) {
41,163,008✔
1418
      int32_t code = (*clientHbMgr.reqHandle[connKey->connType])(connKey, &param, pOneReq);
41,163,008✔
1419
      if (code) {
41,163,008✔
1420
        tscWarn("hbGatherAllInfo failed since %s, tscRid:%" PRIi64 ", connType:%" PRIi8, tstrerror(code),
904✔
1421
                connKey->tscRid, connKey->connType);
1422
      }
1423
    }
1424

1425
    int64_t ver = atomic_load_64(&pTscObj->whiteListInfo.ver);
41,163,008✔
1426
    maxIpWhiteVer = TMAX(maxIpWhiteVer, ver);
41,163,008✔
1427
    releaseTscObj(connKey->tscRid);
41,163,008✔
1428
  }
1429
  (*pBatchReq)->ipWhiteListVer = maxIpWhiteVer;
34,145,297✔
1430

1431
  return TSDB_CODE_SUCCESS;
34,145,297✔
1432
}
1433

1434
void hbThreadFuncUnexpectedStopped(void) { atomic_store_8(&clientHbMgr.threadStop, 2); }
×
1435

1436
void hbMergeSummary(SAppClusterSummary *dst, SAppClusterSummary *src) {
5,315,794✔
1437
  dst->numOfInsertsReq += src->numOfInsertsReq;
5,315,794✔
1438
  dst->numOfInsertRows += src->numOfInsertRows;
5,315,794✔
1439
  dst->insertElapsedTime += src->insertElapsedTime;
5,315,794✔
1440
  dst->insertBytes += src->insertBytes;
5,315,794✔
1441
  dst->fetchBytes += src->fetchBytes;
5,315,794✔
1442
  dst->queryElapsedTime += src->queryElapsedTime;
5,315,794✔
1443
  dst->numOfSlowQueries += src->numOfSlowQueries;
5,315,794✔
1444
  dst->totalRequests += src->totalRequests;
5,315,794✔
1445
  dst->currentRequests += src->currentRequests;
5,315,794✔
1446
}
5,315,794✔
1447

1448
int32_t hbGatherAppInfo(void) {
32,816,196✔
1449
  SAppHbReq req = {0};
32,816,196✔
1450
  int32_t   code = TSDB_CODE_SUCCESS;
32,816,196✔
1451
  int       sz = taosArrayGetSize(clientHbMgr.appHbMgrs);
32,816,196✔
1452
  if (sz > 0) {
32,816,196✔
1453
    req.pid = taosGetPId();
32,816,196✔
1454
    req.appId = clientHbMgr.appId;
32,816,196✔
1455
    TSC_ERR_RET(taosGetAppName(req.name, NULL));
32,816,196✔
1456
  }
1457

1458
  taosHashClear(clientHbMgr.appSummary);
32,816,196✔
1459

1460
  for (int32_t i = 0; i < sz; ++i) {
70,978,117✔
1461
    SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i);
38,161,921✔
1462
    if (pAppHbMgr == NULL) continue;
38,161,921✔
1463

1464
    int64_t    clusterId = pAppHbMgr->pAppInstInfo->clusterId;
38,161,921✔
1465
    SAppHbReq *pApp = taosHashGet(clientHbMgr.appSummary, &clusterId, sizeof(clusterId));
38,161,921✔
1466
    if (NULL == pApp) {
38,161,921✔
1467
      (void)memcpy(&req.summary, &pAppHbMgr->pAppInstInfo->summary, sizeof(req.summary));
32,846,127✔
1468
      req.startTime = pAppHbMgr->startTime;
32,846,127✔
1469
      TSC_ERR_RET(taosHashPut(clientHbMgr.appSummary, &clusterId, sizeof(clusterId), &req, sizeof(req)));
32,846,127✔
1470
    } else {
1471
      if (pAppHbMgr->startTime < pApp->startTime) {
5,315,794✔
1472
        pApp->startTime = pAppHbMgr->startTime;
×
1473
      }
1474

1475
      hbMergeSummary(&pApp->summary, &pAppHbMgr->pAppInstInfo->summary);
5,315,794✔
1476
    }
1477
  }
1478

1479
  return TSDB_CODE_SUCCESS;
32,816,196✔
1480
}
1481

1482
static void *hbThreadFunc(void *param) {
1,606,411✔
1483
  setThreadName("hb");
1,606,411✔
1484
#ifdef WINDOWS
1485
  if (taosCheckCurrentInDll()) {
1486
    atexit(hbThreadFuncUnexpectedStopped);
1487
  }
1488
#endif
1489
  while (1) {
31,876,156✔
1490
    if (1 == clientHbMgr.threadStop) {
33,482,567✔
1491
      break;
661,023✔
1492
    }
1493

1494
    if (TSDB_CODE_SUCCESS != taosThreadMutexLock(&clientHbMgr.lock)) {
32,821,544✔
1495
      tscError("taosThreadMutexLock failed");
×
1496
      return NULL;
×
1497
    }
1498

1499
    int sz = taosArrayGetSize(clientHbMgr.appHbMgrs);
32,821,544✔
1500
    if (sz > 0) {
32,821,544✔
1501
      if (TSDB_CODE_SUCCESS != hbGatherAppInfo()) {
32,816,196✔
1502
        tscError("hbGatherAppInfo failed");
×
1503
        return NULL;
×
1504
      }
1505
      if (sz > 1 && !clientHbMgr.appHbHash) {
32,816,196✔
1506
        clientHbMgr.appHbHash = taosHashInit(0, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_NO_LOCK);
63,340✔
1507
        if (NULL == clientHbMgr.appHbHash) {
63,340✔
1508
          tscError("taosHashInit failed");
×
1509
          return NULL;
×
1510
        }
1511
      }
1512
      taosHashClear(clientHbMgr.appHbHash);
32,816,196✔
1513
    }
1514

1515
    for (int i = 0; i < sz; i++) {
70,983,465✔
1516
      SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i);
38,161,921✔
1517
      if (pAppHbMgr == NULL) {
38,161,921✔
1518
        continue;
54,650✔
1519
      }
1520

1521
      int32_t connCnt = atomic_load_32(&pAppHbMgr->connKeyCnt);
38,161,921✔
1522
      if (connCnt == 0) {
38,161,921✔
1523
        continue;
4,016,624✔
1524
      }
1525
      SClientHbBatchReq *pReq = NULL;
34,145,297✔
1526
      int32_t            code = hbGatherAllInfo(pAppHbMgr, &pReq);
34,145,297✔
1527
      if (TSDB_CODE_SUCCESS != code || taosArrayGetP(clientHbMgr.appHbMgrs, i) == NULL) {
34,145,297✔
1528
        terrno = code ? code : TSDB_CODE_OUT_OF_RANGE;
×
1529
        tFreeClientHbBatchReq(pReq);
×
1530
        continue;
×
1531
      }
1532
      int tlen = tSerializeSClientHbBatchReq(NULL, 0, pReq);
34,145,297✔
1533
      if (tlen == -1) {
34,145,297✔
1534
        tFreeClientHbBatchReq(pReq);
×
1535
        break;
×
1536
      }
1537
      void *buf = taosMemoryMalloc(tlen);
34,145,297✔
1538
      if (buf == NULL) {
34,145,297✔
1539
        tFreeClientHbBatchReq(pReq);
×
1540
        // hbClearReqInfo(pAppHbMgr);
1541
        break;
×
1542
      }
1543

1544
      if (tSerializeSClientHbBatchReq(buf, tlen, pReq) == -1) {
34,145,297✔
1545
        tFreeClientHbBatchReq(pReq);
×
1546
        taosMemoryFree(buf);
×
1547
        break;
×
1548
      }
1549
      SMsgSendInfo *pInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
34,145,297✔
1550

1551
      if (pInfo == NULL) {
34,145,297✔
1552
        tFreeClientHbBatchReq(pReq);
×
1553
        // hbClearReqInfo(pAppHbMgr);
1554
        taosMemoryFree(buf);
×
1555
        break;
×
1556
      }
1557
      pInfo->fp = hbAsyncCallBack;
34,145,297✔
1558
      pInfo->msgInfo.pData = buf;
34,145,297✔
1559
      pInfo->msgInfo.len = tlen;
34,145,297✔
1560
      pInfo->msgType = TDMT_MND_HEARTBEAT;
34,145,297✔
1561
      pInfo->param = taosMemoryMalloc(sizeof(int32_t));
34,145,297✔
1562
      if (pInfo->param  == NULL) {
34,145,297✔
1563
        tFreeClientHbBatchReq(pReq);
×
1564
        // hbClearReqInfo(pAppHbMgr);
1565
        taosMemoryFree(buf);
×
1566
        taosMemoryFree(pInfo);
×
1567
        break;
×
1568
      }
1569
      *(int32_t *)pInfo->param = i;
34,145,297✔
1570
      pInfo->paramFreeFp = taosAutoMemoryFree;
34,145,297✔
1571
      pInfo->requestId = generateRequestId();
34,145,297✔
1572
      pInfo->requestObjRefId = 0;
34,145,297✔
1573

1574
      SAppInstInfo *pAppInstInfo = pAppHbMgr->pAppInstInfo;
34,145,297✔
1575
      SEpSet        epSet = getEpSet_s(&pAppInstInfo->mgmtEp);
34,145,297✔
1576
      if (TSDB_CODE_SUCCESS != asyncSendMsgToServer(pAppInstInfo->pTransporter, &epSet, NULL, pInfo)) {
34,145,297✔
1577
        tscWarn("failed to async send msg to server");
755,619✔
1578
      }
1579
      tFreeClientHbBatchReq(pReq);
34,145,297✔
1580
      // hbClearReqInfo(pAppHbMgr);
1581
      (void)atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1);
34,145,297✔
1582
    }
1583

1584
    if (TSDB_CODE_SUCCESS != taosThreadMutexUnlock(&clientHbMgr.lock)) {
32,821,544✔
1585
      tscError("taosThreadMutexLock failed");
×
1586
      return NULL;
×
1587
    }
1588
    taosMsleep(HEARTBEAT_INTERVAL);
32,821,544✔
1589
  }
1590
  taosHashCleanup(clientHbMgr.appHbHash);
661,023✔
1591
  return NULL;
661,023✔
1592
}
1593

1594
static int32_t hbCreateThread() {
1,606,411✔
1595
  int32_t      code = TSDB_CODE_SUCCESS;
1,606,411✔
1596
  TdThreadAttr thAttr;
1,591,640✔
1597
  TSC_ERR_JRET(taosThreadAttrInit(&thAttr));
1,606,411✔
1598
  TSC_ERR_JRET(taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE));
1,606,411✔
1599
#ifdef TD_COMPACT_OS
1600
  TSC_ERR_JRET(taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL));
1601
#endif
1602

1603
  if (taosThreadCreate(&clientHbMgr.thread, &thAttr, hbThreadFunc, NULL) != 0) {
1,606,411✔
1604
    terrno = TAOS_SYSTEM_ERROR(ERRNO);
×
1605
    TSC_ERR_RET(terrno);
×
1606
  }
1607
  (void)taosThreadAttrDestroy(&thAttr);
1,606,411✔
1608
_return:
1,606,411✔
1609

1610
  if (code) {
1,606,411✔
1611
    terrno = TAOS_SYSTEM_ERROR(ERRNO);
×
1612
    TSC_ERR_RET(terrno);
×
1613
  }
1614

1615
  return code;
1,606,411✔
1616
}
1617

1618
static void hbStopThread() {
1,607,729✔
1619
  if (0 == atomic_load_8(&clientHbMgr.inited)) {
1,607,729✔
1620
    return;
2,171✔
1621
  }
1622
  if (atomic_val_compare_exchange_8(&clientHbMgr.threadStop, 0, 1)) {
1,605,558✔
1623
    tscDebug("hb thread already stopped");
×
1624
    return;
×
1625
  }
1626

1627
  int32_t code = TSDB_CODE_SUCCESS;
1,605,558✔
1628
  // thread quit mode kill or inner exit from self-thread
1629
  if (clientHbMgr.quitByKill) {
1,605,558✔
1630
    code = taosThreadKill(clientHbMgr.thread, 0);
1,087,934✔
1631
    if (TSDB_CODE_SUCCESS != code) {
1,087,934✔
1632
      tscError("taosThreadKill failed since %s", tstrerror(code));
×
1633
    }
1634
  } else {
1635
    code = taosThreadJoin(clientHbMgr.thread, NULL);
517,624✔
1636
    if (TSDB_CODE_SUCCESS != code) {
517,624✔
1637
      tscError("taosThreadJoin failed since %s", tstrerror(code));
×
1638
    }
1639
  }
1640

1641
  tscDebug("hb thread stopped");
1,605,558✔
1642
}
1643

1644
int32_t appHbMgrInit(SAppInstInfo *pAppInstInfo, char *key, SAppHbMgr **pAppHbMgr) {
1,707,035✔
1645
  int32_t code = TSDB_CODE_SUCCESS;
1,707,035✔
1646
  TSC_ERR_RET(hbMgrInit());
1,707,035✔
1647
  *pAppHbMgr = taosMemoryMalloc(sizeof(SAppHbMgr));
1,707,035✔
1648
  if (*pAppHbMgr == NULL) {
1,707,035✔
1649
    TSC_ERR_JRET(terrno);
×
1650
  }
1651
  // init stat
1652
  (*pAppHbMgr)->startTime = taosGetTimestampMs();
3,395,160✔
1653
  (*pAppHbMgr)->connKeyCnt = 0;
1,707,035✔
1654
  (*pAppHbMgr)->connHbFlag = 0;
1,707,035✔
1655
  (*pAppHbMgr)->reportCnt = 0;
1,707,035✔
1656
  (*pAppHbMgr)->reportBytes = 0;
1,707,035✔
1657
  (*pAppHbMgr)->key = taosStrdup(key);
1,707,035✔
1658
  if ((*pAppHbMgr)->key == NULL) {
1,707,035✔
1659
    TSC_ERR_JRET(terrno);
×
1660
  }
1661

1662
  // init app info
1663
  (*pAppHbMgr)->pAppInstInfo = pAppInstInfo;
1,707,035✔
1664

1665
  // init hash info
1666
  (*pAppHbMgr)->activeInfo = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
1,707,035✔
1667

1668
  if ((*pAppHbMgr)->activeInfo == NULL) {
1,707,035✔
1669
    TSC_ERR_JRET(terrno);
×
1670
  }
1671

1672
  // taosHashSetFreeFp(pAppHbMgr->activeInfo, tFreeClientHbReq);
1673

1674
  TSC_ERR_JRET(taosThreadMutexLock(&clientHbMgr.lock));
1,707,035✔
1675
  if (taosArrayPush(clientHbMgr.appHbMgrs, &(*pAppHbMgr)) == NULL) {
3,414,070✔
1676
    code = terrno;
×
1677
    (void)taosThreadMutexUnlock(&clientHbMgr.lock);
×
1678
    goto _return;
×
1679
  }
1680
  (*pAppHbMgr)->idx = taosArrayGetSize(clientHbMgr.appHbMgrs) - 1;
1,707,035✔
1681
  TSC_ERR_JRET(taosThreadMutexUnlock(&clientHbMgr.lock));
1,707,035✔
1682

1683
  return TSDB_CODE_SUCCESS;
1,707,035✔
1684
_return:
×
1685
  taosMemoryFree(*pAppHbMgr);
×
1686
  return code;
×
1687
}
1688

1689
void hbFreeAppHbMgr(SAppHbMgr *pTarget) {
1,706,182✔
1690
  void *pIter = taosHashIterate(pTarget->activeInfo, NULL);
1,706,182✔
1691
  while (pIter != NULL) {
1,912,055✔
1692
    SClientHbReq *pOneReq = pIter;
205,873✔
1693
    tFreeClientHbReq(pOneReq);
1694
    pIter = taosHashIterate(pTarget->activeInfo, pIter);
205,873✔
1695
  }
1696
  taosHashCleanup(pTarget->activeInfo);
1,706,182✔
1697
  pTarget->activeInfo = NULL;
1,706,182✔
1698

1699
  taosMemoryFree(pTarget->key);
1,706,182✔
1700
  taosMemoryFree(pTarget);
1,706,182✔
1701
}
1,706,182✔
1702

1703
void hbRemoveAppHbMrg(SAppHbMgr **pAppHbMgr) {
1,706,182✔
1704
  int32_t code = TSDB_CODE_SUCCESS;
1,706,182✔
1705
  code = taosThreadMutexLock(&clientHbMgr.lock);
1,706,182✔
1706
  if (TSDB_CODE_SUCCESS != code) {
1,706,182✔
1707
    tscError("failed to lock clientHbMgr, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
×
1708
  }
1709
  int32_t mgrSize = taosArrayGetSize(clientHbMgr.appHbMgrs);
1,706,182✔
1710
  for (int32_t i = 0; i < mgrSize; ++i) {
1,706,182✔
1711
    SAppHbMgr *pItem = taosArrayGetP(clientHbMgr.appHbMgrs, i);
×
1712
    if (pItem == *pAppHbMgr) {
×
1713
      hbFreeAppHbMgr(*pAppHbMgr);
×
1714
      *pAppHbMgr = NULL;
×
1715
      taosArraySet(clientHbMgr.appHbMgrs, i, pAppHbMgr);
×
1716
      break;
×
1717
    }
1718
  }
1719
  code = taosThreadMutexUnlock(&clientHbMgr.lock);
1,706,182✔
1720
  if (TSDB_CODE_SUCCESS != code) {
1,706,182✔
1721
    tscError("failed to unlock clientHbMgr, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
×
1722
  }
1723
}
1,706,182✔
1724

1725
void appHbMgrCleanup(void) {
1,605,558✔
1726
  int sz = taosArrayGetSize(clientHbMgr.appHbMgrs);
1,605,558✔
1727
  for (int i = 0; i < sz; i++) {
3,311,740✔
1728
    SAppHbMgr *pTarget = taosArrayGetP(clientHbMgr.appHbMgrs, i);
1,706,182✔
1729
    if (pTarget == NULL) continue;
1,706,182✔
1730
    hbFreeAppHbMgr(pTarget);
1,706,182✔
1731
  }
1732
}
1,605,558✔
1733

1734
int32_t hbMgrInit() {
1,707,035✔
1735
  // init once
1736
  int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 0, 1);
1,707,035✔
1737
  if (old == 1) return 0;
1,707,035✔
1738

1739
  clientHbMgr.appId = tGenIdPI64();
1,606,411✔
1740
  tscInfo("app initialized, appId:0x%" PRIx64, clientHbMgr.appId);
1,606,411✔
1741

1742
  clientHbMgr.appSummary = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
1,606,411✔
1743
  if (NULL == clientHbMgr.appSummary) {
1,606,411✔
1744
    uError("hbMgrInit:taosHashInit error") return terrno;
×
1745
  }
1746
  clientHbMgr.appHbMgrs = taosArrayInit(0, sizeof(void *));
1,606,411✔
1747
  if (NULL == clientHbMgr.appHbMgrs) {
1,606,411✔
1748
    uError("hbMgrInit:taosArrayInit error") return terrno;
×
1749
  }
1750
  TdThreadMutexAttr attr = {0};
1,606,411✔
1751

1752
  int ret = taosThreadMutexAttrInit(&attr);
1,606,411✔
1753
  if (ret != 0) {
1,606,411✔
1754
    uError("hbMgrInit:taosThreadMutexAttrInit error") return ret;
×
1755
  }
1756

1757
  ret = taosThreadMutexAttrSetType(&attr, PTHREAD_MUTEX_RECURSIVE);
1,606,411✔
1758
  if (ret != 0) {
1,606,411✔
1759
    uError("hbMgrInit:taosThreadMutexAttrSetType error") return ret;
×
1760
  }
1761

1762
  ret = taosThreadMutexInit(&clientHbMgr.lock, &attr);
1,606,411✔
1763
  if (ret != 0) {
1,606,411✔
1764
    uError("hbMgrInit:taosThreadMutexInit error") return ret;
×
1765
  }
1766

1767
  ret = taosThreadMutexAttrDestroy(&attr);
1,606,411✔
1768
  if (ret != 0) {
1,606,411✔
1769
    uError("hbMgrInit:taosThreadMutexAttrDestroy error") return ret;
×
1770
  }
1771

1772
  // init handle funcs
1773
  hbMgrInitHandle();
1774

1775
  // init backgroud thread
1776
  ret = hbCreateThread();
1,606,411✔
1777
  if (ret != 0) {
1,606,411✔
1778
    uError("hbMgrInit:hbCreateThread error") return ret;
×
1779
  }
1780

1781
  return 0;
1,606,411✔
1782
}
1783

1784
void hbMgrCleanUp() {
1,607,729✔
1785
  hbStopThread();
1,607,729✔
1786

1787
  // destroy all appHbMgr
1788
  int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 1, 0);
1,607,729✔
1789
  if (old == 0) return;
1,607,729✔
1790

1791
  int32_t code = taosThreadMutexLock(&clientHbMgr.lock);
1,605,558✔
1792
  if (TSDB_CODE_SUCCESS != code) {
1,605,558✔
1793
    tscError("failed to lock clientHbMgr, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
×
1794
  }
1795
  appHbMgrCleanup();
1,605,558✔
1796
  taosArrayDestroy(clientHbMgr.appHbMgrs);
1,605,558✔
1797
  clientHbMgr.appHbMgrs = NULL;
1,605,558✔
1798
  code = taosThreadMutexUnlock(&clientHbMgr.lock);
1,605,558✔
1799
  if (TSDB_CODE_SUCCESS != code) {
1,605,558✔
1800
    tscError("failed to unlock clientHbMgr, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
×
1801
  }
1802
}
1803

1804
int32_t hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, const char* user, const char* tokenName, int64_t clusterId) {
87,922,167✔
1805
  // init hash in activeinfo
1806
  void *data = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
87,922,167✔
1807
  if (data != NULL) {
87,922,167✔
1808
    return 0;
×
1809
  }
1810
  SClientHbReq hbReq = {0};
87,922,167✔
1811
  hbReq.connKey = connKey;
87,922,167✔
1812
  hbReq.clusterId = clusterId;
87,922,167✔
1813
  tstrncpy(hbReq.user, user, sizeof(hbReq.user));
87,922,167✔
1814
  tstrncpy(hbReq.tokenName, tokenName, sizeof(hbReq.tokenName));
87,922,167✔
1815
  // hbReq.info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
1816

1817
  TSC_ERR_RET(taosHashPut(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey), &hbReq, sizeof(SClientHbReq)));
87,922,167✔
1818

1819
  (void)atomic_add_fetch_32(&pAppHbMgr->connKeyCnt, 1);
87,922,167✔
1820
  return 0;
87,922,167✔
1821
}
1822

1823
int32_t hbRegisterConn(SAppHbMgr *pAppHbMgr, int64_t tscRefId, const char* user, const char* tokenName, int64_t clusterId, int8_t connType) {
87,922,167✔
1824
  SClientHbKey connKey = {
87,922,167✔
1825
      .tscRid = tscRefId,
1826
      .connType = connType,
1827
  };
1828

1829
  switch (connType) {
87,922,167✔
1830
    case CONN_TYPE__QUERY:
87,922,167✔
1831
    case CONN_TYPE__TMQ: {
1832
      return hbRegisterConnImpl(pAppHbMgr, connKey, user, tokenName, clusterId);
87,922,167✔
1833
    }
1834
    default:
×
1835
      return 0;
×
1836
  }
1837
}
1838

1839
void hbDeregisterConn(STscObj *pTscObj, SClientHbKey connKey) {
87,724,507✔
1840
  int32_t code = taosThreadMutexLock(&clientHbMgr.lock);
87,724,507✔
1841
  if (TSDB_CODE_SUCCESS != code) {
87,727,586✔
1842
    tscError("failed to lock clientHbMgr, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
×
1843
  }
1844
  SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, pTscObj->appHbMgrIdx);
87,727,586✔
1845
  if (pAppHbMgr) {
87,727,586✔
1846
    SClientHbReq *pReq = taosHashAcquire(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
87,727,586✔
1847
    if (pReq) {
87,727,586✔
1848
      tFreeClientHbReq(pReq);
1849
      code = taosHashRemove(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
87,708,617✔
1850
      if (TSDB_CODE_SUCCESS != code) {
87,708,617✔
1851
        tscError("hbDeregisterConn taosHashRemove error, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
×
1852
      }
1853
      taosHashRelease(pAppHbMgr->activeInfo, pReq);
87,708,617✔
1854
      (void)atomic_sub_fetch_32(&pAppHbMgr->connKeyCnt, 1);
87,708,617✔
1855
    }
1856
  }
1857
  code = taosThreadMutexUnlock(&clientHbMgr.lock);
87,727,586✔
1858
  if (TSDB_CODE_SUCCESS != code) {
87,727,586✔
1859
    tscError("failed to unlock clientHbMgr, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
×
1860
  }
1861
}
87,727,586✔
1862

1863
// set heart beat thread quit mode , if quicByKill 1 then kill thread else quit from inner
1864
void taos_set_hb_quit(int8_t quitByKill) { clientHbMgr.quitByKill = quitByKill; }
1,089,446✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc