• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5011

03 Apr 2026 03:59PM UTC coverage: 72.3% (+0.008%) from 72.292%
#5011

push

travis-ci

web-flow
merge: from main to 3.0 branch #35067

4053 of 5985 new or added lines in 68 files covered. (67.72%)

732 existing lines in 143 files now uncovered.

257430 of 356056 relevant lines covered (72.3%)

131834103.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.33
/source/client/src/clientHb.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "catalog.h"
17
#include "clientInt.h"
18
#include "clientLog.h"
19
#include "clientMonitor.h"
20
#include "clientSession.h"
21
#include "scheduler.h"
22
#include "tglobal.h"
23
#include "trpc.h"
24

25
typedef struct {
26
  union {
27
    struct {
28
      SAppHbMgr *pAppHbMgr;
29
      int64_t    clusterId;
30
      int32_t    reqCnt;
31
      int8_t     connHbFlag;
32
    };
33
  };
34
} SHbParam;
35

36
SClientHbMgr clientHbMgr = {0};
37

38
static int32_t hbCreateThread();
39
static void    hbStopThread();
40
static int32_t hbUpdateUserSessMertric(const char *user, SUserSessCfg *pCfg);
41
static int32_t hbUpdateUserAuthInfo(SAppHbMgr *pAppHbMgr, SUserAuthBatchRsp *batchRsp);
42

43
static int32_t hbProcessUserAuthInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog,
26,715,827✔
44
                                        SAppHbMgr *pAppHbMgr) {
45
  int32_t code = TSDB_CODE_SUCCESS;
26,715,827✔
46

47
  SUserAuthBatchRsp batchRsp = {0};
26,715,827✔
48
  if (tDeserializeSUserAuthBatchRsp(value, valueLen, &batchRsp) != 0) {
26,715,827✔
49
    TSC_ERR_JRET(TSDB_CODE_INVALID_MSG);
×
50
  }
51

52
  int32_t numOfBatchs = taosArrayGetSize(batchRsp.pArray);
26,715,827✔
53
  for (int32_t i = 0; i < numOfBatchs; ++i) {
53,841,738✔
54
    SGetUserAuthRsp *rsp = taosArrayGet(batchRsp.pArray, i);
27,125,911✔
55
    if (NULL == rsp) {
27,125,911✔
56
      code = terrno;
×
57
      goto _return;
×
58
    }
59
    tscDebug("hb to update user auth, user:%s, version:%d", rsp->user, rsp->version);
27,125,911✔
60

61
    TSC_ERR_JRET(catalogUpdateUserAuthInfo(pCatalog, rsp));
27,125,911✔
62

63
  }
64

65
  if (numOfBatchs > 0) {
26,715,827✔
66
    TSC_ERR_JRET(hbUpdateUserAuthInfo(pAppHbMgr, &batchRsp));
26,715,827✔
67
  }
68

69
  (void)atomic_val_compare_exchange_8(&pAppHbMgr->connHbFlag, 1, 2);
26,715,827✔
70

71
_return:
26,715,827✔
72
  tFreeSUserAuthBatchRsp(&batchRsp);
26,715,827✔
73
  return code;
26,715,827✔
74
}
75

76
/*
 * Push the session limits carried in `pCfg` for `user` to the session manager.
 *
 * Each non-zero field of `pCfg` is forwarded through sessMgtUpdataLimit() with
 * the matching SESSION_* kind; zero fields are skipped. Processing stops at the
 * first update that reports a non-zero code (via TAOS_CHECK_GOTO).
 *
 * Returns 0 when `user` or `pCfg` is NULL, when `pCfg` is all zero bytes, or
 * when every update succeeded; otherwise the code of the failing update.
 */
static int32_t hbUpdateUserSessMertric(const char *user, SUserSessCfg *pCfg) {
  int32_t code = 0;
  int32_t lino = 0;

  if (NULL == user || NULL == pCfg) {
    return code;
  }

  // A config that is byte-wise zero carries no limit at all.
  SUserSessCfg zeroCfg = {0, 0, 0, 0, 0};
  if (0 == memcmp(pCfg, &zeroCfg, sizeof(SUserSessCfg))) {
    return TSDB_CODE_SUCCESS;
  }

  tscDebug(
      "update session metric for user:%s, sessPerUser:%d, sessConnTime:%d, sessConnIdleTime:%d, sessMaxConcurrency:%d, "
      "sessMaxCallVnodeNum:%d",
      user, pCfg->sessPerUser, pCfg->sessConnTime, pCfg->sessConnIdleTime, pCfg->sessMaxConcurrency,
      pCfg->sessMaxCallVnodeNum);

  if (0 != pCfg->sessPerUser) {
    code = sessMgtUpdataLimit((char *)user, SESSION_PER_USER, pCfg->sessPerUser);
    TAOS_CHECK_GOTO(code, &lino, _error);
  }

  if (0 != pCfg->sessConnTime) {
    code = sessMgtUpdataLimit((char *)user, SESSION_CONN_TIME, pCfg->sessConnTime);
    TAOS_CHECK_GOTO(code, &lino, _error);
  }

  if (0 != pCfg->sessConnIdleTime) {
    code = sessMgtUpdataLimit((char *)user, SESSION_CONN_IDLE_TIME, pCfg->sessConnIdleTime);
    TAOS_CHECK_GOTO(code, &lino, _error);
  }

  if (0 != pCfg->sessMaxConcurrency) {
    code = sessMgtUpdataLimit((char *)user, SESSION_MAX_CONCURRENCY, pCfg->sessMaxConcurrency);
    TAOS_CHECK_GOTO(code, &lino, _error);
  }

  if (0 != pCfg->sessMaxCallVnodeNum) {
    code = sessMgtUpdataLimit((char *)user, SESSION_MAX_CALL_VNODE_NUM, pCfg->sessMaxCallVnodeNum);
    TAOS_CHECK_GOTO(code, &lino, _error);
  }

_error:
  return code;
}
121
/*
 * Apply a batch of user-auth responses to the connections of one cluster.
 *
 * Walks every entry of clientHbMgr.appHbMgrs whose cluster id equals that of
 * `pAppHbMgr`, and for each of its active heartbeat requests:
 *   - acquires the connection object (skipping requests whose object is gone);
 *   - resolves, once per manager, the batch entry whose user name matches the
 *     first acquired connection; if none matches, the iteration over that
 *     manager is cancelled;
 *   - if the user was dropped, marks the connection dropped and fires the
 *     user-dropped callback (only on the 0 -> 1 transition);
 *   - for token connections, derives a token event (dropped / disabled /
 *     expired / modified) and notifies; any event other than "modified" marks
 *     the connection dropped and skips the remaining updates;
 *   - otherwise refreshes authVer, session limits, sysInfo, and the password,
 *     IP whitelist and date-time whitelist versions, firing the registered
 *     callbacks when a version moves forward.
 *
 * Always returns 0; a failure to update session limits is only logged.
 */
static int32_t hbUpdateUserAuthInfo(SAppHbMgr *pAppHbMgr, SUserAuthBatchRsp *batchRsp) {
  int64_t clusterId = pAppHbMgr->pAppInstInfo->clusterId;
  for (int i = 0; i < TARRAY_SIZE(clientHbMgr.appHbMgrs); ++i) {
    SAppHbMgr *hbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i);
    if (!hbMgr || hbMgr->pAppInstInfo->clusterId != clusterId) {
      continue;
    }

    SClientHbReq    *pReq = NULL;
    SGetUserAuthRsp *pRsp = NULL;  // resolved from the first acquired connection, then reused
    while ((pReq = taosHashIterate(hbMgr->activeInfo, pReq))) {
      STscObj *pTscObj = (STscObj *)acquireTscObj(pReq->connKey.tscRid);
      if (!pTscObj) {
        continue;
      }

      if (!pRsp) {
        for (int32_t j = 0; j < TARRAY_SIZE(batchRsp->pArray); ++j) {
          SGetUserAuthRsp *rsp = TARRAY_GET_ELEM(batchRsp->pArray, j);
          if (0 == strncmp(rsp->user, pTscObj->user, TSDB_USER_LEN)) {
            pRsp = rsp;
            break;
          }
        }
        if (!pRsp) {
          // No batch entry for this user: stop iterating this manager.
          releaseTscObj(pReq->connKey.tscRid);
          taosHashCancelIterate(hbMgr->activeInfo, pReq);
          break;
        }
      }

      if (pRsp->dropped == 1) {
        // Notify only on the 0 -> 1 transition so the callback fires once.
        if (atomic_val_compare_exchange_8(&pTscObj->dropped, 0, 1) == 0) {
          SPassInfo *dropInfo = &pTscObj->userDroppedInfo;
          if (dropInfo->fp) {
            (*dropInfo->fp)(dropInfo->param, NULL, TAOS_NOTIFY_USER_DROPPED);
          }
        }
        releaseTscObj(pReq->connKey.tscRid);
        continue;
      }

      // update token status
      if (pTscObj->tokenName[0] != 0) {
        STokenStatus *status = NULL;
        if (pRsp->tokens != NULL) {
          status = taosHashGet(pRsp->tokens, pTscObj->tokenName, strlen(pTscObj->tokenName) + 1);
        }

        STokenEvent event = {0};
        tstrncpy(event.tokenName, pTscObj->tokenName, sizeof(event.tokenName));
        if (status == NULL) {
          event.type = TSDB_TOKEN_EVENT_DROPPED;
        } else if (status->enabled == 0) {
          event.type = TSDB_TOKEN_EVENT_DISABLED;
        } else if (status->expireTime > 0 && status->expireTime < taosGetTimestampSec()) {
          event.type = TSDB_TOKEN_EVENT_EXPIRED;
        } else {
          event.type = TSDB_TOKEN_EVENT_MODIFIED;
          event.expireTime = status->expireTime;
        }

        STokenNotifyInfo *tni = &pTscObj->tokenNotifyInfo;
        if (event.type == TSDB_TOKEN_EVENT_MODIFIED) {
          // Publish the new expire time with a CAS loop; stop early when it is unchanged.
          int32_t oldExpireTime;
          do {
            oldExpireTime = atomic_load_32(&pTscObj->tokenExpireTime);
            if (oldExpireTime == status->expireTime) {
              break;
            }
          } while (atomic_val_compare_exchange_32(&pTscObj->tokenExpireTime, oldExpireTime, status->expireTime) !=
                   oldExpireTime);

          if (oldExpireTime != status->expireTime && tni->fp) {
            (*tni->fp)(tni->param, &event, TAOS_NOTIFY_TOKEN);
          }
          // Log the value observed before the update as the "from" value.
          tscDebug("update token %s of user %s: expire time from %d to %d, conn:%" PRIi64, pTscObj->tokenName,
                   pRsp->user, oldExpireTime, status->expireTime, pTscObj->id);
        } else {
          // Token dropped / disabled / expired: mark the connection dropped and notify once.
          if (atomic_val_compare_exchange_8(&pTscObj->dropped, 0, 1) == 0) {
            if (tni->fp) {
              (*tni->fp)(tni->param, &event, TAOS_NOTIFY_TOKEN);
            }
          }

          releaseTscObj(pReq->connKey.tscRid);
          continue;
        }
      }

      pTscObj->authVer = pRsp->version;
      if (hbUpdateUserSessMertric(pTscObj->user, &pRsp->sessCfg) != 0) {
        tscError("failed to update user session metric, user:%s", pTscObj->user);
      }

      if (pTscObj->sysInfo != pRsp->sysInfo) {
        tscDebug("update sysInfo of user %s from %" PRIi8 " to %" PRIi8 ", conn:%" PRIi64, pRsp->user,
                 pTscObj->sysInfo, pRsp->sysInfo, pTscObj->id);
        pTscObj->sysInfo = pRsp->sysInfo;
      }

      // update password version (only tracked when a callback is registered)
      if (pTscObj->passInfo.fp) {
        SPassInfo *passInfo = &pTscObj->passInfo;
        int32_t    oldVer = 0;
        do {
          oldVer = atomic_load_32(&passInfo->ver);
          if (oldVer >= pRsp->passVer) {
            break;
          }
        } while (atomic_val_compare_exchange_32(&passInfo->ver, oldVer, pRsp->passVer) != oldVer);
        if (oldVer < pRsp->passVer) {
          (*passInfo->fp)(passInfo->param, &pRsp->passVer, TAOS_NOTIFY_PASSVER);
          tscDebug("update passVer of user %s from %d to %d, conn:%" PRIi64, pRsp->user, oldVer,
                   atomic_load_32(&passInfo->ver), pTscObj->id);
        }
      }

      // update ip white list version
      {
        SWhiteListInfo *whiteListInfo = &pTscObj->whiteListInfo;
        int64_t         oldVer = 0, newVer = pRsp->whiteListVer;
        do {
          oldVer = atomic_load_64(&whiteListInfo->ver);
          if (oldVer >= newVer) {
            break;
          }
        } while (atomic_val_compare_exchange_64(&whiteListInfo->ver, oldVer, newVer) != oldVer);

        if (oldVer < newVer) {
          if (whiteListInfo->fp) {
            (*whiteListInfo->fp)(whiteListInfo->param, &newVer, TAOS_NOTIFY_WHITELIST_VER);
          }
          tscDebug("update whitelist version of user %s from %" PRId64 " to %" PRId64 ", conn:%" PRIi64, pRsp->user,
                   oldVer, newVer, pTscObj->id);
        }
      }

      // update date time whitelist version
      {
        SWhiteListInfo *whiteListInfo = &pTscObj->dateTimeWhiteListInfo;

        int64_t oldVer = 0, newVer = pRsp->timeWhiteListVer;
        do {
          oldVer = atomic_load_64(&whiteListInfo->ver);
          if (oldVer >= newVer) {
            break;
          }
        } while (atomic_val_compare_exchange_64(&whiteListInfo->ver, oldVer, newVer) != oldVer);

        if (oldVer < newVer) {
          if (whiteListInfo->fp) {
            (*whiteListInfo->fp)(whiteListInfo->param, &newVer, TAOS_NOTIFY_DATETIME_WHITELIST_VER);
          }
          tscDebug("update date time whitelist version of user %s from %" PRId64 " to %" PRId64 ", conn:%" PRIi64, pRsp->user,
                   oldVer, newVer, pTscObj->id);
        }
      }

      releaseTscObj(pReq->connKey.tscRid);
    }
  }
  return 0;
}
287

288
/*
 * Build a freshly allocated SDBVgInfo from a use-db response.
 *
 * Copies the scalar fields of `rsp` and fills a hash keyed by vgroup id with a
 * copy of every entry of rsp->pVgroupInfos.
 *
 * On success returns 0 and stores the new object in *pInfo.
 * On failure returns a non-zero code, releases everything allocated here and
 * stores NULL in *pInfo.
 */
static int32_t hbGenerateVgInfoFromRsp(SDBVgInfo **pInfo, SUseDbRsp *rsp) {
  int32_t    code = 0;
  SDBVgInfo *vgInfo = taosMemoryCalloc(1, sizeof(SDBVgInfo));
  if (NULL == vgInfo) {
    *pInfo = NULL;
    return terrno;
  }

  vgInfo->vgVersion = rsp->vgVersion;
  vgInfo->stateTs = rsp->stateTs;
  vgInfo->flags = rsp->flags;
  vgInfo->hashMethod = rsp->hashMethod;
  vgInfo->hashPrefix = rsp->hashPrefix;
  vgInfo->hashSuffix = rsp->hashSuffix;
  vgInfo->vgHash = taosHashInit(rsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
  if (NULL == vgInfo->vgHash) {
    code = terrno;  // capture before logging so the log call cannot disturb it
    tscError("hash init[%d] failed", rsp->vgNum);
    goto _return;
  }

  for (int32_t j = 0; j < rsp->vgNum; ++j) {
    SVgroupInfo *pVgroup = taosArrayGet(rsp->pVgroupInfos, j);
    if (NULL == pVgroup) {
      // vgNum disagrees with the array content; do not dereference a missing entry.
      code = (0 != terrno) ? terrno : TSDB_CODE_OUT_OF_RANGE;
      tscError("invalid vgroup info in use db rsp, idx:%d, vgNum:%d", j, rsp->vgNum);
      goto _return;
    }
    if (taosHashPut(vgInfo->vgHash, &pVgroup->vgId, sizeof(int32_t), pVgroup, sizeof(SVgroupInfo)) != 0) {
      code = terrno;
      tscError("hash push failed, terrno:%d", code);
      goto _return;
    }
  }

_return:
  if (code) {
    taosHashCleanup(vgInfo->vgHash);
    taosMemoryFreeClear(vgInfo);  // leaves vgInfo NULL, so *pInfo is NULL on failure
  }

  *pInfo = vgInfo;
  return code;
}
326

327
static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
9,063,422✔
328
  int32_t code = 0;
9,063,422✔
329

330
  SDbHbBatchRsp batchRsp = {0};
9,063,422✔
331
  if (tDeserializeSDbHbBatchRsp(value, valueLen, &batchRsp) != 0) {
9,063,422✔
332
    terrno = TSDB_CODE_INVALID_MSG;
×
333
    code = terrno;
×
334
    goto _return;
×
335
  }
336

337
  int32_t numOfBatchs = taosArrayGetSize(batchRsp.pArray);
9,063,422✔
338
  for (int32_t i = 0; i < numOfBatchs; ++i) {
10,890,994✔
339
    SDbHbRsp *rsp = taosArrayGet(batchRsp.pArray, i);
1,827,572✔
340
    if (NULL == rsp) {
1,827,572✔
341
      code = terrno;
×
342
      goto _return;
×
343
    }
344
    if (rsp->useDbRsp) {
1,827,572✔
345
      tscDebug("hb use db rsp, db:%s, vgVersion:%d, stateTs:%" PRId64 ", uid:%" PRIx64, rsp->useDbRsp->db,
1,378,479✔
346
               rsp->useDbRsp->vgVersion, rsp->useDbRsp->stateTs, rsp->useDbRsp->uid);
347

348
      if (rsp->useDbRsp->vgVersion < 0) {
1,378,479✔
349
        tscDebug("hb to remove db, db:%s", rsp->useDbRsp->db);
44,111✔
350
        code = catalogRemoveDB(pCatalog, rsp->useDbRsp->db, rsp->useDbRsp->uid);
44,111✔
351
      } else {
352
        SDBVgInfo *vgInfo = NULL;
1,334,368✔
353
        code = hbGenerateVgInfoFromRsp(&vgInfo, rsp->useDbRsp);
1,334,368✔
354
        if (TSDB_CODE_SUCCESS != code) {
1,334,368✔
355
          goto _return;
×
356
        }
357

358
        tscDebug("hb to update db vgInfo, db:%s", rsp->useDbRsp->db);
1,334,368✔
359

360
        TSC_ERR_JRET(catalogUpdateDBVgInfo(pCatalog, rsp->useDbRsp->db, rsp->useDbRsp->uid, vgInfo));
1,334,368✔
361

362
        if (IS_SYS_DBNAME(rsp->useDbRsp->db)) {
1,334,368✔
363
          code = hbGenerateVgInfoFromRsp(&vgInfo, rsp->useDbRsp);
343,627✔
364
          if (TSDB_CODE_SUCCESS != code) {
343,627✔
365
            goto _return;
×
366
          }
367

368
          TSC_ERR_JRET(catalogUpdateDBVgInfo(
343,627✔
369
              pCatalog, (rsp->useDbRsp->db[0] == 'i') ? TSDB_PERFORMANCE_SCHEMA_DB : TSDB_INFORMATION_SCHEMA_DB,
370
              rsp->useDbRsp->uid, vgInfo));
371
        }
372
      }
373
    }
374

375
    if (rsp->cfgRsp) {
1,827,572✔
376
      tscDebug("hb db cfg rsp, db:%s, cfgVersion:%d", rsp->cfgRsp->db, rsp->cfgRsp->cfgVersion);
108,306✔
377
      code = catalogUpdateDbCfg(pCatalog, rsp->cfgRsp->db, rsp->cfgRsp->dbId, rsp->cfgRsp);
108,306✔
378
      rsp->cfgRsp = NULL;
108,306✔
379
    }
380
    if (rsp->pTsmaRsp) {
1,827,572✔
381
      if (rsp->pTsmaRsp->pTsmas) {
941,624✔
382
        for (int32_t i = 0; i < rsp->pTsmaRsp->pTsmas->size; ++i) {
7,540✔
383
          STableTSMAInfo *pTsma = taosArrayGetP(rsp->pTsmaRsp->pTsmas, i);
4,872✔
384
          if (NULL == pTsma) {
4,872✔
385
            TSC_ERR_JRET(TSDB_CODE_OUT_OF_RANGE);
×
386
          }
387
          TSC_ERR_JRET(catalogAsyncUpdateTSMA(pCatalog, &pTsma, rsp->dbTsmaVersion));
4,872✔
388
        }
389
        taosArrayClear(rsp->pTsmaRsp->pTsmas);
2,668✔
390
      } else {
391
        TSC_ERR_JRET(catalogAsyncUpdateDbTsmaVersion(pCatalog, rsp->dbTsmaVersion, rsp->db, rsp->dbId));
938,956✔
392
      }
393
    }
394
  }
395

396
_return:
9,063,422✔
397

398
  tFreeSDbHbBatchRsp(&batchRsp);
9,063,422✔
399
  return code;
9,063,422✔
400
}
401

402
static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
6,316,979✔
403
  int32_t code = TSDB_CODE_SUCCESS;
6,316,979✔
404

405
  SSTbHbRsp hbRsp = {0};
6,316,979✔
406
  if (tDeserializeSSTbHbRsp(value, valueLen, &hbRsp) != 0) {
6,316,979✔
407
    terrno = TSDB_CODE_INVALID_MSG;
×
408
    return -1;
×
409
  }
410

411
  int32_t numOfMeta = taosArrayGetSize(hbRsp.pMetaRsp);
6,316,979✔
412
  for (int32_t i = 0; i < numOfMeta; ++i) {
6,394,412✔
413
    STableMetaRsp *rsp = taosArrayGet(hbRsp.pMetaRsp, i);
77,433✔
414
    if (NULL == rsp) {
77,433✔
415
      code = terrno;
×
416
      goto _return;
×
417
    }
418
    if (rsp->numOfColumns < 0) {
77,433✔
419
      tscDebug("hb to remove stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName);
39,745✔
420
      TSC_ERR_JRET(catalogRemoveStbMeta(pCatalog, rsp->dbFName, rsp->dbId, rsp->stbName, rsp->suid));
39,745✔
421
    } else {
422
      tscDebug("hb to update stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName);
37,688✔
423
      if (rsp->pSchemas[0].colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
37,688✔
424
        tscError("invalid colId[%" PRIi16 "] for the first column in table meta rsp msg", rsp->pSchemas[0].colId);
×
425
        tFreeSSTbHbRsp(&hbRsp);
×
426
        return TSDB_CODE_TSC_INVALID_VALUE;
×
427
      }
428

429
      TSC_ERR_JRET(catalogAsyncUpdateTableMeta(pCatalog, rsp));
37,688✔
430
    }
431
  }
432

433
  int32_t numOfIndex = taosArrayGetSize(hbRsp.pIndexRsp);
6,316,979✔
434
  for (int32_t i = 0; i < numOfIndex; ++i) {
6,316,979✔
435
    STableIndexRsp *rsp = taosArrayGet(hbRsp.pIndexRsp, i);
×
436
    if (NULL == rsp) {
×
437
      code = terrno;
×
438
      goto _return;
×
439
    }
440
    TSC_ERR_JRET(catalogUpdateTableIndex(pCatalog, rsp));
×
441
  }
442

443
_return:
6,316,979✔
444
  taosArrayDestroy(hbRsp.pIndexRsp);
6,316,979✔
445
  hbRsp.pIndexRsp = NULL;
6,316,979✔
446

447
  tFreeSSTbHbRsp(&hbRsp);
6,316,979✔
448
  return code;
6,316,979✔
449
}
450

451
/*
 * Handle a dynamic-view-version heartbeat payload.
 *
 * `value` is interpreted in place as a SDynViewVersion, so it must be non-NULL
 * and at least sizeof(SDynViewVersion) bytes long; a shorter payload is
 * rejected with TSDB_CODE_INVALID_MSG instead of being read past its end.
 * Otherwise returns the result of catalogUpdateDynViewVer().
 */
static int32_t hbProcessDynViewRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
  if (NULL == value || valueLen < (int32_t)sizeof(SDynViewVersion)) {
    return TSDB_CODE_INVALID_MSG;
  }
  return catalogUpdateDynViewVer(pCatalog, (SDynViewVersion *)value);
}
454

455
static void hbFreeSViewMetaInRsp(void *p) {
×
456
  if (NULL == p || NULL == *(void **)p) {
×
457
    return;
×
458
  }
459
  SViewMetaRsp *pRsp = *(SViewMetaRsp **)p;
×
460
  tFreeSViewMetaRsp(pRsp);
×
461
  taosMemoryFreeClear(pRsp);
×
462
}
463

464
/*
 * Handle a view-info heartbeat payload.
 *
 * Deserializes `value` into a SViewHbRsp. Every entry with numOfCols < 0 is
 * removed from the catalog and released here; every other entry is handed to
 * catalogUpdateViewMeta().
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_MSG when the payload cannot be
 * deserialized, or the code of the first failing step. When processing stops
 * early, the entries that were never reached are released here so they do not
 * leak with the (shallow) array destroy.
 */
static int32_t hbProcessViewInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
  int32_t code = TSDB_CODE_SUCCESS;

  SViewHbRsp hbRsp = {0};
  if (tDeserializeSViewHbRsp(value, valueLen, &hbRsp) != 0) {
    taosArrayDestroyEx(hbRsp.pViewRsp, hbFreeSViewMetaInRsp);
    terrno = TSDB_CODE_INVALID_MSG;
    return TSDB_CODE_INVALID_MSG;  // a real error code, like the sibling handlers return
  }

  int32_t numOfMeta = taosArrayGetSize(hbRsp.pViewRsp);
  int32_t i = 0;  // kept outside the loop: marks the entry at which processing stopped
  for (; i < numOfMeta; ++i) {
    SViewMetaRsp *rsp = taosArrayGetP(hbRsp.pViewRsp, i);
    if (NULL == rsp) {
      code = terrno;
      goto _return;
    }
    if (rsp->numOfCols < 0) {
      tscDebug("hb to remove view, db:%s, view:%s", rsp->dbFName, rsp->name);
      code = catalogRemoveViewMeta(pCatalog, rsp->dbFName, rsp->dbId, rsp->name, rsp->viewId);
      tFreeSViewMetaRsp(rsp);
      taosMemoryFreeClear(rsp);
    } else {
      tscDebug("hb to update view, db:%s, view:%s", rsp->dbFName, rsp->name);
      code = catalogUpdateViewMeta(pCatalog, rsp);
    }
    TSC_ERR_JRET(code);
  }

_return:
  // Entries after the stopping point were never touched; entry `i` itself has
  // already been released or handed to the catalog above. No-op on full success.
  for (int32_t j = i + 1; j < numOfMeta; ++j) {
    SViewMetaRsp *pRest = taosArrayGetP(hbRsp.pViewRsp, j);
    if (NULL != pRest) {
      tFreeSViewMetaRsp(pRest);
      taosMemoryFreeClear(pRest);
    }
  }
  taosArrayDestroy(hbRsp.pViewRsp);
  return code;
}
497

498
/*
 * Handle a tsma heartbeat payload.
 *
 * Deserializes `value` into a STSMAHbRsp. An entry without pFuncs is removed
 * from the catalog, any other entry is passed to catalogUpdateTSMA(); in both
 * cases the entry is released afterwards with tFreeAndClearTableTSMAInfo().
 *
 * Returns 0, TSDB_CODE_INVALID_MSG when the payload cannot be deserialized,
 * TSDB_CODE_OUT_OF_RANGE when the array holds a NULL entry, or the code of the
 * first failing catalog call. When processing stops early, the entries that
 * were never reached are released here so they do not leak with the (shallow)
 * array destroy.
 */
static int32_t hbprocessTSMARsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
  int32_t code = 0;

  STSMAHbRsp hbRsp = {0};
  if (tDeserializeTSMAHbRsp(value, valueLen, &hbRsp)) {
    terrno = TSDB_CODE_INVALID_MSG;
    return TSDB_CODE_INVALID_MSG;  // a real error code, like the sibling handlers return
  }

  int32_t numOfTsma = taosArrayGetSize(hbRsp.pTsmas);
  int32_t i = 0;  // kept outside the loop: marks the entry at which processing stopped
  for (; i < numOfTsma; ++i) {
    STableTSMAInfo *pTsmaInfo = taosArrayGetP(hbRsp.pTsmas, i);
    if (NULL == pTsmaInfo) {
      // Do not dereference a missing entry.
      TSC_ERR_JRET(TSDB_CODE_OUT_OF_RANGE);
    }

    if (!pTsmaInfo->pFuncs) {
      tscDebug("hb to remove tsma:%s.%s", pTsmaInfo->dbFName, pTsmaInfo->name);
      code = catalogRemoveTSMA(pCatalog, pTsmaInfo);
      tFreeAndClearTableTSMAInfo(pTsmaInfo);
    } else {
      tscDebug("hb to update tsma:%s.%s", pTsmaInfo->dbFName, pTsmaInfo->name);
      code = catalogUpdateTSMA(pCatalog, &pTsmaInfo);
      tFreeAndClearTableTSMAInfo(pTsmaInfo);
    }
    TSC_ERR_JRET(code);
  }

_return:
  // Entries after the stopping point were never touched; entry `i` itself has
  // already been released above. No-op on full success.
  for (int32_t j = i + 1; j < numOfTsma; ++j) {
    STableTSMAInfo *pRest = taosArrayGetP(hbRsp.pTsmas, j);
    if (NULL != pRest) {
      tFreeAndClearTableTSMAInfo(pRest);
    }
  }
  taosArrayDestroy(hbRsp.pTsmas);
  return code;
}
527

528
/*
 * Dispatch the key/value entries of one heartbeat response.
 *
 * For each of the first `kvNum` entries of `pKvs`, the payload is routed by its
 * key to the matching handler. An entry with a non-positive length or a NULL
 * value is logged and skipped; a handler failure is logged and processing
 * continues with the next entry. Nothing is returned to the caller.
 */
static void hbProcessQueryRspKvs(int32_t kvNum, SArray *pKvs, struct SCatalog *pCatalog, SAppHbMgr *pAppHbMgr) {
  for (int32_t i = 0; i < kvNum; ++i) {
    SKv *kv = taosArrayGet(pKvs, i);
    if (NULL == kv) {
      tscError("invalid hb kv, idx:%d", i);
      continue;
    }
    switch (kv->key) {
      case HEARTBEAT_KEY_USER_AUTHINFO: {
        if (kv->valueLen <= 0 || NULL == kv->value) {
          tscError("invalid hb user auth info, len:%d, value:%p", kv->valueLen, kv->value);
          break;
        }
        if (TSDB_CODE_SUCCESS != hbProcessUserAuthInfoRsp(kv->value, kv->valueLen, pCatalog, pAppHbMgr)) {
          tscError("process user auth info response faild, len:%d, value:%p", kv->valueLen, kv->value);
        }
        break;
      }
      case HEARTBEAT_KEY_DBINFO: {
        if (kv->valueLen <= 0 || NULL == kv->value) {
          tscError("invalid hb db info, len:%d, value:%p", kv->valueLen, kv->value);
          break;
        }
        if (TSDB_CODE_SUCCESS != hbProcessDBInfoRsp(kv->value, kv->valueLen, pCatalog)) {
          tscError("process db info response faild, len:%d, value:%p", kv->valueLen, kv->value);
        }
        break;
      }
      case HEARTBEAT_KEY_STBINFO: {
        if (kv->valueLen <= 0 || NULL == kv->value) {
          tscError("invalid hb stb info, len:%d, value:%p", kv->valueLen, kv->value);
          break;
        }
        if (TSDB_CODE_SUCCESS != hbProcessStbInfoRsp(kv->value, kv->valueLen, pCatalog)) {
          tscError("process stb info response faild, len:%d, value:%p", kv->valueLen, kv->value);
        }
        break;
      }
#ifdef TD_ENTERPRISE
      case HEARTBEAT_KEY_DYN_VIEW: {
        if (kv->valueLen <= 0 || NULL == kv->value) {
          tscError("invalid dyn view info, len:%d, value:%p", kv->valueLen, kv->value);
          break;
        }
        if (TSDB_CODE_SUCCESS != hbProcessDynViewRsp(kv->value, kv->valueLen, pCatalog)) {
          tscError("Process dyn view response failed, len:%d, value:%p", kv->valueLen, kv->value);
        }
        break;
      }
      case HEARTBEAT_KEY_VIEWINFO: {
        if (kv->valueLen <= 0 || NULL == kv->value) {
          tscError("invalid view info, len:%d, value:%p", kv->valueLen, kv->value);
          break;
        }
        if (TSDB_CODE_SUCCESS != hbProcessViewInfoRsp(kv->value, kv->valueLen, pCatalog)) {
          tscError("Process view info response failed, len:%d, value:%p", kv->valueLen, kv->value);
        }
        break;
      }
#endif
      case HEARTBEAT_KEY_TSMA: {
        if (kv->valueLen <= 0 || !kv->value) {
          tscError("Invalid tsma info, len:%d, value:%p", kv->valueLen, kv->value);
          break;  // skip the handler, as every other case does for an invalid payload
        }
        if (TSDB_CODE_SUCCESS != hbprocessTSMARsp(kv->value, kv->valueLen, pCatalog)) {
          tscError("Process tsma info response failed, len:%d, value:%p", kv->valueLen, kv->value);
        }
        break;
      }
      default:
        tscError("invalid hb key type:%d", kv->key);
        break;
    }
  }
}
26,715,886✔
608

609
/*
 * Apply one per-connection heartbeat response.
 *
 * Looks up the active heartbeat request for pRsp->connKey; when it no longer
 * exists a warning is logged and nothing else happens. Otherwise:
 *   - if the response has a query section and the connection object can still
 *     be acquired: refreshes the mnode epset (only when totalDnodes > 1 and the
 *     epset differs), the dnode counters and the connection id; stops the
 *     query named by killRid; closes the connection when killConnection is
 *     set; updates the qnode list when one is supplied;
 *   - if the response carries key/value entries: fetches the catalog handle of
 *     the request's cluster and dispatches the entries.
 *
 * Always returns TSDB_CODE_SUCCESS; failures are only logged.
 */
static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
  SClientHbReq *pReq = taosHashAcquire(pAppHbMgr->activeInfo, &pRsp->connKey, sizeof(SClientHbKey));
  if (NULL == pReq) {
    tscWarn("pReq to get activeInfo, may be dropped, refId:%" PRIx64 ", type:%d", pRsp->connKey.tscRid,
            pRsp->connKey.connType);
    return TSDB_CODE_SUCCESS;
  }

  if (pRsp->query) {
    STscObj *pTscObj = (STscObj *)acquireTscObj(pRsp->connKey.tscRid);
    if (NULL != pTscObj) {
      if (pRsp->query->totalDnodes > 1) {
        SEpSet originEpset = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
        if (!isEpsetEqual(&originEpset, &pRsp->query->epSet)) {
          SEpSet *pOrig = &originEpset;
          SEp    *pOrigEp = &pOrig->eps[pOrig->inUse];
          SEp    *pNewEp = &pRsp->query->epSet.eps[pRsp->query->epSet.inUse];
          tscDebug("mnode epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d in hb", pOrig->inUse, pOrig->numOfEps,
                   pOrigEp->fqdn, pOrigEp->port, pRsp->query->epSet.inUse, pRsp->query->epSet.numOfEps, pNewEp->fqdn,
                   pNewEp->port);

          updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &pRsp->query->epSet);
        }
      }

      pTscObj->pAppInfo->totalDnodes = pRsp->query->totalDnodes;
      pTscObj->pAppInfo->onlineDnodes = pRsp->query->onlineDnodes;
      pTscObj->connId = pRsp->query->connId;
      tscTrace("connId:%u, hb rsp, dnodes %d/%d", pTscObj->connId, pTscObj->pAppInfo->onlineDnodes,
               pTscObj->pAppInfo->totalDnodes);

      if (pRsp->query->killRid) {
        tscDebug("QID:0x%" PRIx64 ", need to be killed now", pRsp->query->killRid);
        SRequestObj *pKillReq = acquireRequest(pRsp->query->killRid);
        if (NULL != pKillReq) {
          taos_stop_query((TAOS_RES *)pKillReq);
          (void)releaseRequest(pRsp->query->killRid);
        } else {
          tscDebug("QID:0x%" PRIx64 ", not exist to kill", pRsp->query->killRid);
        }
      }

      if (pRsp->query->killConnection) {
        taos_close_internal(pTscObj);
      }

      if (pRsp->query->pQnodeList) {
        if (TSDB_CODE_SUCCESS != updateQnodeList(pTscObj->pAppInfo, pRsp->query->pQnodeList)) {
          tscWarn("update qnode list failed");
        }
      }

      releaseTscObj(pRsp->connKey.tscRid);
    } else {
      tscDebug("tscObj rid %" PRIx64 " not exist", pRsp->connKey.tscRid);
    }
  }

  int32_t kvNum = 0;
  if (pRsp->info) {
    kvNum = taosArrayGetSize(pRsp->info);
  }

  tscDebug("hb got %d rsp kv", kvNum);

  if (kvNum > 0) {
    struct SCatalog *pCatalog = NULL;
    int32_t          code = catalogGetHandle(pReq->clusterId, &pCatalog);
    if (TSDB_CODE_SUCCESS == code) {
      hbProcessQueryRspKvs(kvNum, pRsp->info, pCatalog, pAppHbMgr);
    } else {
      tscWarn("catalogGetHandle failed, clusterId:0x%" PRIx64 ", error:%s", pReq->clusterId, tstrerror(code));
    }
  }

  taosHashRelease(pAppHbMgr->activeInfo, pReq);

  return TSDB_CODE_SUCCESS;
}
685

686
/* Choose execution phase and phase start time for desc.
687
 * Rules:
688
 *  - If both timestamps are invalid (<= 0), keep the request's stored values.
689
 *  - If only one timestamp is valid, use the valid one.
690
 *  - If both are valid, prefer the job's phase/time when jobPhaseTime >= request's phaseStartTime;
691
 *    otherwise prefer the request's record.
692
 *
693
 * Note: If the local system clock was moved backwards (time rollback), jobPhaseTime may appear
694
 * earlier than the request's phase start time even when the job record is actually newer.
695
 * This logic avoids reporting a regressed start time in that situation.
696
 */
697
static void hbSetRequestPhase(SRequestObj *pRequest, SQueryDesc *desc) {
20,877,325✔
698
  int32_t jobPhase = QUERY_PHASE_NONE;
20,877,325✔
699
  int64_t jobPhaseTime = 0;
20,877,325✔
700
  int32_t phaseCode = schedulerGetJobPhase(pRequest->body.queryJob, &jobPhase, &jobPhaseTime);
20,877,325✔
701
  if (phaseCode != TSDB_CODE_SUCCESS) {
20,877,325✔
702
    tscWarn("get job phase failed, code:%d", phaseCode);
5,746,506✔
703
    desc->execPhase = CLIENT_GET_REQUEST_PHASE(pRequest);
5,746,506✔
704
    desc->phaseStartTime = CLIENT_GET_REQUEST_PHASE_START_TIME(pRequest);
5,746,506✔
705
    return;
5,746,506✔
706
  }
707

708
  tscDebug("get job phase success, jobPhase:%d, jobPhaseTime:%" PRId64, jobPhase, jobPhaseTime);
15,130,819✔
709
  int64_t phaseStartTime = CLIENT_GET_REQUEST_PHASE_START_TIME(pRequest);
15,130,819✔
710
  int32_t phaseStatus = CLIENT_GET_REQUEST_PHASE(pRequest);
15,130,819✔
711

712
  if (jobPhaseTime <= 0 && phaseStartTime <= 0) {
15,130,819✔
713
    /* No valid time available, keep original behavior */
714
    desc->phaseStartTime = phaseStartTime;
×
715
    desc->execPhase = phaseStatus;
×
716
  } else if (jobPhaseTime <= 0) {
15,130,819✔
717
    desc->phaseStartTime = phaseStartTime;
9,634✔
718
    desc->execPhase = phaseStatus;
9,634✔
719
  } else if (phaseStartTime <= 0) {
15,121,185✔
720
    desc->phaseStartTime = jobPhaseTime;
59,749✔
721
    desc->execPhase = jobPhase;
59,749✔
722
  } else if (jobPhaseTime >= phaseStartTime) {
15,061,436✔
723
    /* Job record is newer (or equal, prefer job) */
724
    desc->phaseStartTime = jobPhaseTime;
13,195,528✔
725
    desc->execPhase = jobPhase;
13,195,528✔
726
  } else {
727
    /* Request record is newer */
728
    desc->phaseStartTime = phaseStartTime;
1,865,908✔
729
    desc->execPhase = phaseStatus;
1,865,908✔
730
  }
731
}
732

733
/*
 * Callback run when a heartbeat response (or a transport error) arrives.
 *
 * param  points to the int32_t index of the target SAppHbMgr inside clientHbMgr.appHbMgrs
 * pMsg   response buffer; pMsg->pData and pMsg->pEpSet are freed here on every path
 * code   incoming result code; the payload is only deserialized when it is TSDB_CODE_SUCCESS
 *
 * Returns the incoming code unchanged when the heartbeat manager is not initialized, otherwise
 * the first error met while decoding/dispatching, or the incoming code if nothing failed.
 */
static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) {
  if (0 == atomic_load_8(&clientHbMgr.inited)) {
    goto _return;
  }

  static int32_t    emptyRspNum = 0;
  int32_t           idx = *(int32_t *)param;
  SClientHbBatchRsp pRsp = {0};
  if (TSDB_CODE_SUCCESS == code) {
    code = tDeserializeSClientHbBatchRsp(pMsg->pData, pMsg->len, &pRsp);
    if (TSDB_CODE_SUCCESS != code) {
      tscError("deserialize hb rsp failed");
    } else {
      // Only a successfully decoded response carries a meaningful server timestamp. Checking the
      // clock skew after a decode failure would replace the real error with TIME_UNSYNCED.
      int32_t now = taosGetTimestampSec();
      int32_t delta = abs(now - pRsp.svrTimestamp);
      if (delta > tsTimestampDeltaLimit) {
        code = TSDB_CODE_TIME_UNSYNCED;
        tscError("time diff:%ds is too big", delta);
      }
    }
  }

  int32_t rspNum = taosArrayGetSize(pRsp.rsps);

  (void)taosThreadMutexLock(&clientHbMgr.lock);

  SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, idx);
  if (pAppHbMgr == NULL) {
    (void)taosThreadMutexUnlock(&clientHbMgr.lock);
    tscError("appHbMgr not exist, idx:%d", idx);
    code = TSDB_CODE_OUT_OF_RANGE;
    goto _freeRsp;
  }

  SAppInstInfo *pInst = pAppHbMgr->pAppInstInfo;

  if (code != 0) {
    // Any failure marks the cluster as having no online dnodes (-1 when the total is unknown).
    pInst->onlineDnodes = pInst->totalDnodes ? 0 : -1;
    tscDebug("hb rsp error %s, update server status %d/%d", tstrerror(code), pInst->onlineDnodes, pInst->totalDnodes);
    (void)taosThreadMutexUnlock(&clientHbMgr.lock);
    goto _freeRsp;
  }

  // Mirror the server-side configuration carried by the heartbeat response.
  pInst->serverCfg.monitorParas = pRsp.monitorParas;
  pInst->serverCfg.enableAuditDelete = pRsp.enableAuditDelete;
  pInst->serverCfg.enableAuditSelect = pRsp.enableAuditSelect;
  pInst->serverCfg.enableAuditInsert = pRsp.enableAuditInsert;
  pInst->serverCfg.auditLevel = pRsp.auditLevel;
  pInst->serverCfg.enableStrongPass = pRsp.enableStrongPass;
  tsEnableStrongPassword = pInst->serverCfg.enableStrongPass;
  tscDebug("monitor paras from hb, clusterId:0x%" PRIx64 ", threshold:%d scope:%d", pInst->clusterId,
           pRsp.monitorParas.tsSlowLogThreshold, pRsp.monitorParas.tsSlowLogScope);

  if (rspNum) {
    tscDebug("hb got %d rsp, %d empty rsp received before", rspNum,
             atomic_val_compare_exchange_32(&emptyRspNum, emptyRspNum, 0));
  } else {
    (void)atomic_add_fetch_32(&emptyRspNum, 1);
  }

  // Dispatch each per-connection response to the handler registered for its connection type;
  // stop at the first handler error.
  for (int32_t i = 0; i < rspNum; ++i) {
    SClientHbRsp *rsp = taosArrayGet(pRsp.rsps, i);
    code = (*clientHbMgr.rspHandle[rsp->connKey.connType])(pAppHbMgr, rsp);
    if (code) {
      break;
    }
  }

  (void)taosThreadMutexUnlock(&clientHbMgr.lock);

_freeRsp:
  tFreeClientHbBatchRsp(&pRsp);

_return:
  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);
  return code;
}
814

815
/*
 * Append one SQueryDesc to hbBasic->queryDesc for every live request on the connection.
 *
 * Requests that cannot be acquired, are killed, or have no query job are skipped.
 *
 * hbBasic  destination; hbBasic->queryDesc must already be an initialized array
 * pObj     connection whose pObj->pRequests hash is walked
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_FAILED when the local FQDN cannot be obtained, or the
 * terrno captured at the point an array allocation/push failed. Descriptors pushed before a
 * failure stay in hbBasic->queryDesc for the caller to release.
 */
int32_t hbBuildQueryDesc(SQueryHbReqBasic *hbBasic, STscObj *pObj) {
  int64_t    now = taosGetTimestampUs();
  SQueryDesc desc = {0};
  int32_t    code = 0;

  for (void *pIter = taosHashIterate(pObj->pRequests, NULL); pIter != NULL;
       pIter = taosHashIterate(pObj->pRequests, pIter)) {
    int64_t     *rid = pIter;
    SRequestObj *pRequest = acquireRequest(*rid);
    if (NULL == pRequest) {
      continue;
    }

    if (pRequest->killed || 0 == pRequest->body.queryJob) {
      (void)releaseRequest(*rid);
      continue;
    }

    tstrncpy(desc.sql, pRequest->sqlstr, sizeof(desc.sql));
    desc.stime = pRequest->metric.start / 1000;
    desc.queryId = pRequest->requestId;
    desc.useconds = now - pRequest->metric.start;
    desc.reqRid = pRequest->self;
    desc.stableQuery = pRequest->stableQuery;
    desc.isSubQuery = pRequest->isSubReq;
    code = taosGetFqdn(desc.fqdn);
    if (TSDB_CODE_SUCCESS != code) {
      (void)releaseRequest(*rid);
      // Leaving the loop early: the iteration in progress has to be cancelled explicitly.
      taosHashCancelIterate(pObj->pRequests, pIter);
      tscError("get fqdn failed");
      return TSDB_CODE_FAILED;
    }
    desc.subPlanNum = pRequest->body.subplanNum;

    /* Determine and set the execution phase and its start time for this request. */
    hbSetRequestPhase(pRequest, &desc);

    if (desc.subPlanNum) {
      desc.subDesc = taosArrayInit(desc.subPlanNum, sizeof(SQuerySubDesc));
      if (NULL == desc.subDesc) {
        code = terrno;  // capture before the cleanup calls can overwrite it
        (void)releaseRequest(*rid);
        taosHashCancelIterate(pObj->pRequests, pIter);
        return code;
      }

      // Task status is best effort: on failure the descriptor is still reported, without sub tasks.
      code = schedulerGetTasksStatus(pRequest->body.queryJob, desc.subDesc);
      if (code) {
        taosArrayDestroy(desc.subDesc);
        desc.subDesc = NULL;
        code = TSDB_CODE_SUCCESS;
      }
      desc.subPlanNum = taosArrayGetSize(desc.subDesc);
    } else {
      desc.subDesc = NULL;
    }

    (void)releaseRequest(*rid);
    if (NULL == taosArrayPush(hbBasic->queryDesc, &desc)) {
      code = terrno;  // capture before the cleanup calls can overwrite it
      taosArrayDestroy(desc.subDesc);
      taosHashCancelIterate(pObj->pRequests, pIter);
      return code;
    }
  }

  return code;
}
882

883
/*
 * Build the SQueryHbReqBasic for one connection and attach it to req->query.
 *
 * Returns TSDB_CODE_SUCCESS when req->query was set; otherwise terrno or the error reported by
 * hbBuildQueryDesc, in which case nothing is attached to req.
 */
int32_t hbGetQueryBasicInfo(SClientHbKey *connKey, SClientHbReq *req) {
  STscObj *pConn = (STscObj *)acquireTscObj(connKey->tscRid);
  if (pConn == NULL) {
    tscWarn("tscObj rid 0x%" PRIx64 " not exist", connKey->tscRid);
    return terrno;
  }

  SQueryHbReqBasic *pBasic = (SQueryHbReqBasic *)taosMemoryCalloc(1, sizeof(SQueryHbReqBasic));
  if (pBasic == NULL) {
    tscError("calloc %d failed", (int32_t)sizeof(SQueryHbReqBasic));
    releaseTscObj(connKey->tscRid);
    return terrno;
  }
  pBasic->connId = pConn->connId;

  int32_t queryCnt = 0;
  if (pConn->pRequests) {
    queryCnt = taosHashGetSize(pConn->pRequests);
  }

  if (queryCnt > 0) {
    pBasic->queryDesc = taosArrayInit(queryCnt, sizeof(SQueryDesc));
    if (pBasic->queryDesc == NULL) {
      tscWarn("taosArrayInit %d queryDesc failed", queryCnt);
      releaseTscObj(connKey->tscRid);
      taosMemoryFree(pBasic);
      return terrno;
    }

    int32_t code = hbBuildQueryDesc(pBasic, pConn);
    if (code != 0) {
      // Descriptors pushed before the failure are released together with the array.
      releaseTscObj(connKey->tscRid);
      if (pBasic->queryDesc) {
        taosArrayDestroyEx(pBasic->queryDesc, tFreeClientHbQueryDesc);
      }
      taosMemoryFree(pBasic);
      return code;
    }

    req->query = pBasic;
    releaseTscObj(connKey->tscRid);
    return TSDB_CODE_SUCCESS;
  }

  // Idle connection: only the connection id is reported.
  req->query = pBasic;
  releaseTscObj(connKey->tscRid);
  tscDebug("no queries on connection");
  return TSDB_CODE_SUCCESS;
}
930

931
/*
 * Make sure the heartbeat request asks the server for the auth info of the connection's user.
 *
 * The HEARTBEAT_KEY_USER_AUTHINFO entry of req->info holds an array of SUserAuthVersion. The
 * connection's user is given version htonl(-1) ("force get userAuthInfo"): an existing element is
 * updated in place, otherwise the user is appended, otherwise the entry is created.
 *
 * connKey  identifies the connection (tscRid)
 * param    unused here
 * req      heartbeat request whose info hash is updated (created on demand)
 *
 * Returns TSDB_CODE_SUCCESS, or terrno / TSDB_CODE_APP_ERROR on lookup or allocation failure.
 */
static int32_t hbGetUserAuthInfo(SClientHbKey *connKey, SHbParam *param, SClientHbReq *req) {
  STscObj *pTscObj = (STscObj *)acquireTscObj(connKey->tscRid);
  if (!pTscObj) {
    tscWarn("tscObj rid 0x%" PRIx64 " not exist", connKey->tscRid);
    return terrno;
  }

  int32_t code = 0;

  SKv  kv = {.key = HEARTBEAT_KEY_USER_AUTHINFO};
  SKv *pKv = NULL;
  if ((pKv = taosHashGet(req->info, &kv.key, sizeof(kv.key)))) {
    int32_t           userNum = pKv->valueLen / sizeof(SUserAuthVersion);
    SUserAuthVersion *userAuths = (SUserAuthVersion *)pKv->value;
    for (int32_t i = 0; i < userNum; ++i) {
      SUserAuthVersion *pUserAuth = userAuths + i;
      // both key and user exist, update version
      if (strncmp(pUserAuth->user, pTscObj->user, TSDB_USER_LEN) == 0) {
        pUserAuth->version = htonl(-1);  // force get userAuthInfo
        goto _return;
      }
    }
    // key exists, user not exist, append user
    SUserAuthVersion *qUserAuth =
        (SUserAuthVersion *)taosMemoryRealloc(pKv->value, (userNum + 1) * sizeof(SUserAuthVersion));
    if (qUserAuth) {
      tstrncpy((qUserAuth + userNum)->user, pTscObj->user, TSDB_USER_LEN);
      (qUserAuth + userNum)->version = htonl(-1);  // force get userAuthInfo
      pKv->value = qUserAuth;
      pKv->valueLen += sizeof(SUserAuthVersion);
    } else {
      // pKv->value is left untouched, so the existing array stays valid
      code = terrno;
    }
    goto _return;
  }

  // key/user not exist, add user
  SUserAuthVersion *user = taosMemoryMalloc(sizeof(SUserAuthVersion));
  if (!user) {
    code = terrno;
    goto _return;
  }
  tstrncpy(user->user, pTscObj->user, TSDB_USER_LEN);
  user->version = htonl(-1);  // force get userAuthInfo
  kv.valueLen = sizeof(SUserAuthVersion);
  kv.value = user;

  tscDebug("hb got user auth info, valueLen:%d, user:%s, authVer:%d, tscRid:%" PRIi64, kv.valueLen, user->user,
           pTscObj->authVer, connKey->tscRid);

  if (!req->info) {
    req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
    if (NULL == req->info) {
      code = terrno;
      taosMemoryFree(user);  // not stored anywhere yet, so it must be released here
      goto _return;
    }
  }

  if (taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)) != 0) {
    code = terrno ? terrno : TSDB_CODE_APP_ERROR;
    taosMemoryFree(user);
    goto _return;
  }

_return:
  releaseTscObj(connKey->tscRid);
  if (code) {
    tscError("hb got user auth info failed since %s", tstrerror(code));
  }

  return code;
}
1003

1004
/*
 * Fetch the expired user-auth versions from the catalog and store them in req->info under
 * HEARTBEAT_KEY_USER_AUTHINFO, with each version converted via htonl().
 *
 * On success with a non-empty result the array is left referenced by the stored kv and is not
 * freed here; on every other path it is freed. Returns TSDB_CODE_SUCCESS or the failing code.
 */
int32_t hbGetExpiredUserInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) {
  uint32_t          userNum = 0;
  SUserAuthVersion *users = NULL;

  int32_t code = catalogGetExpiredUsers(pCatalog, &users, &userNum);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  if (0 == userNum) {
    taosMemoryFree(users);
    return TSDB_CODE_SUCCESS;
  }

  for (SUserAuthVersion *pUser = users, *pEnd = users + userNum; pUser < pEnd; ++pUser) {
    pUser->version = htonl(pUser->version);
  }

  SKv kv = {0};
  kv.key = HEARTBEAT_KEY_USER_AUTHINFO;
  kv.valueLen = sizeof(SUserAuthVersion) * userNum;
  kv.value = users;

  tscDebug("hb got %d expired users, valueLen:%d", userNum, kv.valueLen);

  // The info hash is created lazily by whichever gatherer needs it first.
  if (req->info == NULL) {
    req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
    if (req->info == NULL) {
      taosMemoryFree(users);
      return terrno;
    }
  }

  code = taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv));
  if (code != TSDB_CODE_SUCCESS) {
    taosMemoryFree(users);
  }
  return code;
}
1048

1049
/*
 * Fetch the expired database cache entries from the catalog and store them in req->info under
 * HEARTBEAT_KEY_DBINFO, with the numeric fields converted via htobe64()/htonl().
 *
 * On success with a non-empty result the array is left referenced by the stored kv and is not
 * freed here; on every other path it is freed. Returns TSDB_CODE_SUCCESS or the failing code.
 */
int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) {
  uint32_t      dbNum = 0;
  SDbCacheInfo *dbs = NULL;

  int32_t code = catalogGetExpiredDBs(pCatalog, &dbs, &dbNum);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  if (0 == dbNum) {
    taosMemoryFree(dbs);
    return TSDB_CODE_SUCCESS;
  }

  int32_t i = 0;
  while (i < dbNum) {
    SDbCacheInfo *db = dbs + i;
    // Log the host-order values before they are byte-swapped below.
    tscDebug("the %dth expired db:%s, dbId:%" PRId64
             ", vgVersion:%d, cfgVersion:%d, numOfTable:%d, startTs:%" PRId64,
             i, db->dbFName, db->dbId, db->vgVersion, db->cfgVersion, db->numOfTable, db->stateTs);

    db->dbId = htobe64(db->dbId);
    db->vgVersion = htonl(db->vgVersion);
    db->cfgVersion = htonl(db->cfgVersion);
    db->numOfTable = htonl(db->numOfTable);
    db->stateTs = htobe64(db->stateTs);
    db->tsmaVersion = htonl(db->tsmaVersion);
    ++i;
  }

  SKv kv = {0};
  kv.key = HEARTBEAT_KEY_DBINFO;
  kv.valueLen = sizeof(SDbCacheInfo) * dbNum;
  kv.value = dbs;

  tscDebug("hb got %d expired db, valueLen:%d", dbNum, kv.valueLen);

  if (req->info == NULL) {
    req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
    if (req->info == NULL) {
      taosMemoryFree(dbs);
      return terrno;
    }
  }

  code = taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv));
  if (code != TSDB_CODE_SUCCESS) {
    taosMemoryFree(dbs);
  }
  return code;
}
1102

1103
/*
 * Fetch the expired super-table versions from the catalog and store them in req->info under
 * HEARTBEAT_KEY_STBINFO, with the numeric fields converted via htobe64()/htonl().
 *
 * On success with a non-empty result the array is left referenced by the stored kv and is not
 * freed here; on every other path it is freed. Returns TSDB_CODE_SUCCESS or the failing code.
 */
int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) {
  uint32_t        stbNum = 0;
  SSTableVersion *stbs = NULL;

  int32_t code = catalogGetExpiredSTables(pCatalog, &stbs, &stbNum);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  if (0 == stbNum) {
    taosMemoryFree(stbs);
    return TSDB_CODE_SUCCESS;
  }

  for (SSTableVersion *pStb = stbs, *pEnd = stbs + stbNum; pStb < pEnd; ++pStb) {
    pStb->suid = htobe64(pStb->suid);
    pStb->sversion = htonl(pStb->sversion);
    pStb->tversion = htonl(pStb->tversion);
    pStb->smaVer = htonl(pStb->smaVer);
  }

  SKv kv = {0};
  kv.key = HEARTBEAT_KEY_STBINFO;
  kv.valueLen = sizeof(SSTableVersion) * stbNum;
  kv.value = stbs;

  tscDebug("hb got %d expired stb, valueLen:%d", stbNum, kv.valueLen);

  if (req->info == NULL) {
    req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
    if (req->info == NULL) {
      taosMemoryFree(stbs);
      return terrno;
    }
  }

  code = taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv));
  if (code != TSDB_CODE_SUCCESS) {
    taosMemoryFree(stbs);
  }
  return code;
}
1150

1151
/*
 * Fetch the expired view versions (plus the dynamic-view version record) from the catalog and
 * store them in req->info under HEARTBEAT_KEY_DYN_VIEW and HEARTBEAT_KEY_VIEWINFO, with the
 * view fields converted via htobe64()/htonl().
 *
 * A buffer that has been stored in req->info is left referenced by that entry and is not freed
 * here; buffers that were never stored are freed before returning.
 *
 * Returns TSDB_CODE_SUCCESS or the failing code.
 */
int32_t hbGetExpiredViewInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) {
  SViewVersion    *views = NULL;
  uint32_t         viewNum = 0;
  int32_t          code = 0;
  SDynViewVersion *pDynViewVer = NULL;

  TSC_ERR_JRET(catalogGetExpiredViews(pCatalog, &views, &viewNum, &pDynViewVer));

  if (viewNum <= 0) {
    taosMemoryFree(views);
    taosMemoryFree(pDynViewVer);
    return TSDB_CODE_SUCCESS;
  }

  for (int32_t i = 0; i < viewNum; ++i) {
    SViewVersion *view = &views[i];
    view->dbId = htobe64(view->dbId);
    view->viewId = htobe64(view->viewId);
    view->version = htonl(view->version);
  }

  tscDebug("hb got %u expired view, valueLen:%lu", viewNum, sizeof(SViewVersion) * viewNum);

  if (NULL == req->info) {
    req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
    if (NULL == req->info) {
      TSC_ERR_JRET(terrno);
    }
  }

  SKv kv = {
      .key = HEARTBEAT_KEY_DYN_VIEW,
      .valueLen = sizeof(SDynViewVersion),
      .value = pDynViewVer,
  };

  TSC_ERR_JRET(taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)));
  // req->info now references this buffer. Drop the local pointer so that a failure of the second
  // put does not free memory the stored entry still points to.
  pDynViewVer = NULL;

  kv.key = HEARTBEAT_KEY_VIEWINFO;
  kv.valueLen = sizeof(SViewVersion) * viewNum;
  kv.value = views;

  TSC_ERR_JRET(taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)));
  return TSDB_CODE_SUCCESS;

_return:
  // Only buffers that were never stored in req->info are released here.
  taosMemoryFree(views);
  taosMemoryFree(pDynViewVer);
  return code;
}
1200

1201
/*
 * Fetch the expired TSMA versions from the catalog and store them in pReq->info under
 * HEARTBEAT_KEY_TSMA, with the numeric fields converted via htobe64()/htonl().
 *
 * On success with a non-empty result the array is left referenced by the stored kv and is not
 * freed here; on every other path it is freed. Returns TSDB_CODE_SUCCESS or the failing code.
 */
int32_t hbGetExpiredTSMAInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *pReq) {
  STSMAVersion *tsmas = NULL;
  uint32_t      tsmaNum = 0;

  int32_t code = catalogGetExpiredTsmas(pCatalog, &tsmas, &tsmaNum);
  if (code != 0) {
    taosMemoryFree(tsmas);
    return code;
  }

  if (0 == tsmaNum) {
    taosMemoryFree(tsmas);
    return TSDB_CODE_SUCCESS;
  }

  for (STSMAVersion *pTsma = tsmas, *pEnd = tsmas + tsmaNum; pTsma < pEnd; ++pTsma) {
    pTsma->dbId = htobe64(pTsma->dbId);
    pTsma->tsmaId = htobe64(pTsma->tsmaId);
    pTsma->version = htonl(pTsma->version);
  }

  tscDebug("hb got %d expred tsmas, valueLen:%lu", tsmaNum, sizeof(STSMAVersion) * tsmaNum);

  if (pReq->info == NULL) {
    pReq->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
    if (pReq->info == NULL) {
      taosMemoryFree(tsmas);
      return terrno;
    }
  }

  SKv kv = {0};
  kv.key = HEARTBEAT_KEY_TSMA;
  kv.valueLen = sizeof(STSMAVersion) * tsmaNum;
  kv.value = tsmas;

  code = taosHashPut(pReq->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv));
  if (code != TSDB_CODE_SUCCESS) {
    taosMemoryFree(tsmas);
  }
  return code;
}
1242

1243
/*
 * Fill req->app for the given cluster.
 *
 * When clientHbMgr.appSummary holds an entry for clusterId it is copied verbatim. Otherwise the
 * summary is zeroed and only pid, appId and the application name are filled in.
 */
int32_t hbGetAppInfo(int64_t clusterId, SClientHbReq *req) {
  SAppHbReq *pCached = taosHashGet(clientHbMgr.appSummary, &clusterId, sizeof(clusterId));
  if (pCached == NULL) {
    (void)memset(&req->app.summary, 0, sizeof(req->app.summary));
    req->app.pid = taosGetPId();
    req->app.appId = clientHbMgr.appId;
    TSC_ERR_RET(taosGetAppName(req->app.name, NULL));
    return TSDB_CODE_SUCCESS;
  }

  (void)memcpy(&req->app, pCached, sizeof(*pCached));
  return TSDB_CODE_SUCCESS;
}
1256

1257
/*
 * Request handler for query/TMQ connections: fills one SClientHbReq.
 *
 * Every request gets the query basics and the app info. Only the first request handled with a
 * given SHbParam (reqCnt == 0) additionally carries the catalog-derived sections: expired users,
 * user auth info, expired DBs, super tables, views (enterprise builds) and TSMAs.
 *
 * param is the SHbParam shared across the requests of one batch; its reqCnt is incremented on
 * success. Returns TSDB_CODE_SUCCESS or the code of the first step that failed.
 */
int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) {
  SHbParam *hbParam = (SHbParam *)param;

  int32_t code = hbGetQueryBasicInfo(connKey, req);
  if (TSDB_CODE_SUCCESS != code) {
    tscWarn("hbGetQueryBasicInfo failed, clusterId:0x%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code));
    return code;
  }

  // Later requests of the batch only need the app info.
  if (hbParam->reqCnt != 0) {
    code = hbGetAppInfo(hbParam->clusterId, req);
    if (TSDB_CODE_SUCCESS != code) {
      tscWarn("hbGetAppInfo failed, clusterId:0x%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code));
      return code;
    }

    ++hbParam->reqCnt;
    return TSDB_CODE_SUCCESS;
  }

  SCatalog *pCatalog = NULL;
  code = catalogGetHandle(hbParam->clusterId, &pCatalog);
  if (TSDB_CODE_SUCCESS != code) {
    tscWarn("catalogGetHandle failed, clusterId:0x%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code));
    return code;
  }

  code = hbGetAppInfo(hbParam->clusterId, req);
  if (TSDB_CODE_SUCCESS != code) {
    tscWarn("getAppInfo failed, clusterId:0x%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code));
    return code;
  }

  // Expired users are gathered only when this cluster id is not yet recorded in appHbHash.
  void *pSeen = taosHashGet(clientHbMgr.appHbHash, &hbParam->clusterId, sizeof(hbParam->clusterId));
  if (NULL == pSeen) {
    code = hbGetExpiredUserInfo(connKey, pCatalog, req);
    if (TSDB_CODE_SUCCESS != code) {
      tscWarn("hbGetExpiredUserInfo failed, clusterId:0x%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code));
      return code;
    }
    if (clientHbMgr.appHbHash) {
      code = taosHashPut(clientHbMgr.appHbHash, &hbParam->clusterId, sizeof(uint64_t), NULL, 0);
      if (TSDB_CODE_SUCCESS != code) {
        tscWarn("hbQueryHbReqHandle put clusterId failed, clusterId:0x%" PRIx64 ", error:%s", hbParam->clusterId,
                tstrerror(code));
        return code;
      }
    }
  }

  // invoke after hbGetExpiredUserInfo
  if (2 != atomic_load_8(&hbParam->pAppHbMgr->connHbFlag)) {
    code = hbGetUserAuthInfo(connKey, hbParam, req);
    if (TSDB_CODE_SUCCESS != code) {
      tscWarn("hbGetUserAuthInfo failed, clusterId:0x%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code));
      return code;
    }
    atomic_store_8(&hbParam->pAppHbMgr->connHbFlag, 1);
  }

  code = hbGetExpiredDBInfo(connKey, pCatalog, req);
  if (TSDB_CODE_SUCCESS != code) {
    tscWarn("hbGetExpiredDBInfo failed, clusterId:0x%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code));
    return code;
  }

  code = hbGetExpiredStbInfo(connKey, pCatalog, req);
  if (TSDB_CODE_SUCCESS != code) {
    tscWarn("hbGetExpiredStbInfo failed, clusterId:0x%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code));
    return code;
  }

#ifdef TD_ENTERPRISE
  code = hbGetExpiredViewInfo(connKey, pCatalog, req);
  if (TSDB_CODE_SUCCESS != code) {
    tscWarn("hbGetExpiredViewInfo failed, clusterId:0x%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code));
    return code;
  }
#endif

  code = hbGetExpiredTSMAInfo(connKey, pCatalog, req);
  if (TSDB_CODE_SUCCESS != code) {
    tscWarn("hbGetExpiredTSMAInfo failed, clusterId:0x%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code));
    return code;
  }

  ++hbParam->reqCnt;  // success to get catalog info
  return TSDB_CODE_SUCCESS;
}
1343

1344
static FORCE_INLINE void hbMgrInitHandle() {
1345
  // init all handle
1346
  clientHbMgr.reqHandle[CONN_TYPE__QUERY] = hbQueryHbReqHandle;
1,447,914✔
1347
  clientHbMgr.reqHandle[CONN_TYPE__TMQ] = hbQueryHbReqHandle;
1,447,914✔
1348

1349
  clientHbMgr.rspHandle[CONN_TYPE__QUERY] = hbQueryHbRspHandle;
1,447,914✔
1350
  clientHbMgr.rspHandle[CONN_TYPE__TMQ] = hbQueryHbRspHandle;
1,447,914✔
1351
}
1,447,914✔
1352

1353
/*
 * Build the batch heartbeat request for one app heartbeat manager.
 *
 * One SClientHbReq is appended per entry of pAppHbMgr->activeInfo whose connection can still be
 * acquired and is not dropped. The registered request handler fills each entry; a handler
 * failure is logged and does not abort the batch.
 *
 * pAppHbMgr  manager whose connections are gathered
 * pBatchReq  out: the newly allocated batch, or NULL when this function fails
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when an allocation fails. On failure nothing is left for
 * the caller to release.
 */
int32_t hbGatherAllInfo(SAppHbMgr *pAppHbMgr, SClientHbBatchReq **pBatchReq) {
  SClientHbBatchReq *pBatch = taosMemoryCalloc(1, sizeof(SClientHbBatchReq));
  *pBatchReq = pBatch;
  if (pBatch == NULL) {  // check the allocation itself, not the out-parameter
    return terrno;
  }
  int32_t connKeyCnt = atomic_load_32(&pAppHbMgr->connKeyCnt);
  pBatch->reqs = taosArrayInit(connKeyCnt, sizeof(SClientHbReq));
  if (!pBatch->reqs) {
    int32_t code = terrno;
    tFreeClientHbBatchReq(pBatch);
    *pBatchReq = NULL;  // the caller frees *pBatchReq on failure; never hand back a freed pointer
    return code;
  }

  int64_t  maxIpWhiteVer = 0;
  void    *pIter = NULL;
  SHbParam param = {0};
  while ((pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter))) {
    SClientHbReq *pOneReq = pIter;
    SClientHbKey *connKey = &pOneReq->connKey;
    STscObj      *pTscObj = (STscObj *)acquireTscObj(connKey->tscRid);

    if (!pTscObj || atomic_load_8(&pTscObj->dropped) == 1) {
      if (pTscObj) releaseTscObj(connKey->tscRid);
      continue;
    }

    tstrncpy(pOneReq->userApp, pTscObj->optionInfo.userApp, sizeof(pOneReq->userApp));
    tstrncpy(pOneReq->cInfo, pTscObj->optionInfo.cInfo, sizeof(pOneReq->cInfo));
    pOneReq->userIp = pTscObj->optionInfo.userIp;
    pOneReq->userDualIp = pTscObj->optionInfo.userDualIp;
    tstrncpy(pOneReq->sVer, td_version, TSDB_VERSION_LEN);

    // From here on pOneReq refers to the copy stored in the batch; connKey still points into the
    // activeInfo entry.
    pOneReq = taosArrayPush(pBatch->reqs, pOneReq);
    if (NULL == pOneReq) {
      releaseTscObj(connKey->tscRid);
      continue;
    }

    switch (connKey->connType) {
      case CONN_TYPE__QUERY:
      case CONN_TYPE__TMQ: {
        if (param.clusterId == 0) {
          // init
          param.clusterId = pOneReq->clusterId;
          param.pAppHbMgr = pAppHbMgr;
          param.connHbFlag = atomic_load_8(&pAppHbMgr->connHbFlag);
        }
        break;
      }
      default:
        break;
    }
    if (clientHbMgr.reqHandle[connKey->connType]) {
      int32_t code = (*clientHbMgr.reqHandle[connKey->connType])(connKey, &param, pOneReq);
      if (code) {
        tscWarn("hbGatherAllInfo failed since %s, tscRid:%" PRIi64 ", connType:%" PRIi8, tstrerror(code),
                connKey->tscRid, connKey->connType);
      }
    }

    // The batch carries the highest IP white-list version seen across its connections.
    int64_t ver = atomic_load_64(&pTscObj->whiteListInfo.ver);
    maxIpWhiteVer = TMAX(maxIpWhiteVer, ver);
    releaseTscObj(connKey->tscRid);
  }
  pBatch->ipWhiteListVer = maxIpWhiteVer;

  return TSDB_CODE_SUCCESS;
}
1420

1421
/* Stores 2 in clientHbMgr.threadStop; hbThreadFunc registers this with atexit() on Windows DLL builds. */
void hbThreadFuncUnexpectedStopped(void) {
  atomic_store_8(&clientHbMgr.threadStop, 2);
}
×
1422

1423
/* Accumulate every counter of src into dst; src is not modified. */
void hbMergeSummary(SAppClusterSummary *dst, SAppClusterSummary *src) {
  // insert statistics
  dst->numOfInsertsReq += src->numOfInsertsReq;
  dst->numOfInsertRows += src->numOfInsertRows;
  dst->insertBytes += src->insertBytes;
  dst->insertElapsedTime += src->insertElapsedTime;

  // query statistics
  dst->fetchBytes += src->fetchBytes;
  dst->numOfSlowQueries += src->numOfSlowQueries;
  dst->queryElapsedTime += src->queryElapsedTime;

  // request counters
  dst->currentRequests += src->currentRequests;
  dst->totalRequests += src->totalRequests;
}
3,954,949✔
1434

1435
int32_t hbGatherAppInfo(void) {
28,345,609✔
1436
  SAppHbReq req = {0};
28,345,609✔
1437
  int32_t   code = TSDB_CODE_SUCCESS;
28,345,609✔
1438
  int       sz = taosArrayGetSize(clientHbMgr.appHbMgrs);
28,345,609✔
1439
  if (sz > 0) {
28,345,609✔
1440
    req.pid = taosGetPId();
28,345,609✔
1441
    req.appId = clientHbMgr.appId;
28,345,609✔
1442
    TSC_ERR_RET(taosGetAppName(req.name, NULL));
28,345,609✔
1443
  }
1444

1445
  taosHashClear(clientHbMgr.appSummary);
28,345,609✔
1446

1447
  for (int32_t i = 0; i < sz; ++i) {
60,665,124✔
1448
    SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i);
32,319,515✔
1449
    if (pAppHbMgr == NULL) continue;
32,319,515✔
1450

1451
    int64_t    clusterId = pAppHbMgr->pAppInstInfo->clusterId;
32,319,515✔
1452
    SAppHbReq *pApp = taosHashGet(clientHbMgr.appSummary, &clusterId, sizeof(clusterId));
32,319,515✔
1453
    if (NULL == pApp) {
32,319,515✔
1454
      (void)memcpy(&req.summary, &pAppHbMgr->pAppInstInfo->summary, sizeof(req.summary));
28,364,566✔
1455
      req.startTime = pAppHbMgr->startTime;
28,364,566✔
1456
      TSC_ERR_RET(taosHashPut(clientHbMgr.appSummary, &clusterId, sizeof(clusterId), &req, sizeof(req)));
28,364,566✔
1457
    } else {
1458
      if (pAppHbMgr->startTime < pApp->startTime) {
3,954,949✔
1459
        pApp->startTime = pAppHbMgr->startTime;
×
1460
      }
1461

1462
      hbMergeSummary(&pApp->summary, &pAppHbMgr->pAppInstInfo->summary);
3,954,949✔
1463
    }
1464
  }
1465

1466
  return TSDB_CODE_SUCCESS;
28,345,609✔
1467
}
1468

1469
static void *hbThreadFunc(void *param) {
1,447,914✔
1470
  setThreadName("hb");
1,447,914✔
1471
#ifdef WINDOWS
1472
  if (taosCheckCurrentInDll()) {
1473
    atexit(hbThreadFuncUnexpectedStopped);
1474
  }
1475
#endif
1476
  while (1) {
27,456,811✔
1477
    if (1 == clientHbMgr.threadStop) {
28,904,725✔
1478
      break;
552,743✔
1479
    }
1480

1481
    if (TSDB_CODE_SUCCESS != taosThreadMutexLock(&clientHbMgr.lock)) {
28,351,982✔
1482
      tscError("taosThreadMutexLock failed");
×
1483
      return NULL;
×
1484
    }
1485

1486
    int sz = taosArrayGetSize(clientHbMgr.appHbMgrs);
28,351,982✔
1487
    if (sz > 0) {
28,351,982✔
1488
      if (TSDB_CODE_SUCCESS != hbGatherAppInfo()) {
28,345,609✔
1489
        tscError("hbGatherAppInfo failed");
×
1490
        return NULL;
×
1491
      }
1492
      if (sz > 1 && !clientHbMgr.appHbHash) {
28,345,609✔
1493
        clientHbMgr.appHbHash = taosHashInit(0, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_NO_LOCK);
50,312✔
1494
        if (NULL == clientHbMgr.appHbHash) {
50,312✔
1495
          tscError("taosHashInit failed");
×
1496
          return NULL;
×
1497
        }
1498
      }
1499
      taosHashClear(clientHbMgr.appHbHash);
28,345,609✔
1500
    }
1501

1502
    for (int i = 0; i < sz; i++) {
60,671,497✔
1503
      SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i);
32,319,515✔
1504
      if (pAppHbMgr == NULL) {
32,319,515✔
1505
        continue;
30,870✔
1506
      }
1507

1508
      int32_t connCnt = atomic_load_32(&pAppHbMgr->connKeyCnt);
32,319,515✔
1509
      if (connCnt == 0) {
32,319,515✔
1510
        continue;
3,100,820✔
1511
      }
1512
      SClientHbBatchReq *pReq = NULL;
29,218,695✔
1513
      int32_t            code = hbGatherAllInfo(pAppHbMgr, &pReq);
29,218,695✔
1514
      if (TSDB_CODE_SUCCESS != code || taosArrayGetP(clientHbMgr.appHbMgrs, i) == NULL) {
29,218,695✔
1515
        terrno = code ? code : TSDB_CODE_OUT_OF_RANGE;
×
1516
        tFreeClientHbBatchReq(pReq);
×
1517
        continue;
×
1518
      }
1519
      int tlen = tSerializeSClientHbBatchReq(NULL, 0, pReq);
29,218,695✔
1520
      if (tlen == -1) {
29,218,695✔
1521
        tFreeClientHbBatchReq(pReq);
×
1522
        break;
×
1523
      }
1524
      void *buf = taosMemoryMalloc(tlen);
29,218,695✔
1525
      if (buf == NULL) {
29,218,695✔
1526
        tFreeClientHbBatchReq(pReq);
×
1527
        // hbClearReqInfo(pAppHbMgr);
1528
        break;
×
1529
      }
1530

1531
      if (tSerializeSClientHbBatchReq(buf, tlen, pReq) == -1) {
29,218,695✔
1532
        tFreeClientHbBatchReq(pReq);
×
1533
        taosMemoryFree(buf);
×
1534
        break;
×
1535
      }
1536
      SMsgSendInfo *pInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
29,218,695✔
1537

1538
      if (pInfo == NULL) {
29,218,695✔
1539
        tFreeClientHbBatchReq(pReq);
×
1540
        // hbClearReqInfo(pAppHbMgr);
1541
        taosMemoryFree(buf);
×
1542
        break;
×
1543
      }
1544
      pInfo->fp = hbAsyncCallBack;
29,218,695✔
1545
      pInfo->msgInfo.pData = buf;
29,218,695✔
1546
      pInfo->msgInfo.len = tlen;
29,218,695✔
1547
      pInfo->msgType = TDMT_MND_HEARTBEAT;
29,218,695✔
1548
      pInfo->param = taosMemoryMalloc(sizeof(int32_t));
29,218,695✔
1549
      if (pInfo->param  == NULL) {
29,218,695✔
1550
        tFreeClientHbBatchReq(pReq);
×
1551
        // hbClearReqInfo(pAppHbMgr);
1552
        taosMemoryFree(buf);
×
1553
        taosMemoryFree(pInfo);
×
1554
        break;
×
1555
      }
1556
      *(int32_t *)pInfo->param = i;
29,218,695✔
1557
      pInfo->paramFreeFp = taosAutoMemoryFree;
29,218,695✔
1558
      pInfo->requestId = generateRequestId();
29,218,695✔
1559
      pInfo->requestObjRefId = 0;
29,218,695✔
1560

1561
      SAppInstInfo *pAppInstInfo = pAppHbMgr->pAppInstInfo;
29,218,695✔
1562
      SEpSet        epSet = getEpSet_s(&pAppInstInfo->mgmtEp);
29,218,695✔
1563
      if (TSDB_CODE_SUCCESS != asyncSendMsgToServer(pAppInstInfo->pTransporter, &epSet, NULL, pInfo)) {
29,218,695✔
1564
        tscWarn("failed to async send msg to server");
×
1565
      }
1566
      tFreeClientHbBatchReq(pReq);
29,218,695✔
1567
      // hbClearReqInfo(pAppHbMgr);
1568
      (void)atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1);
29,218,695✔
1569
    }
1570

1571
    if (TSDB_CODE_SUCCESS != taosThreadMutexUnlock(&clientHbMgr.lock)) {
28,351,982✔
1572
      tscError("taosThreadMutexLock failed");
×
1573
      return NULL;
×
1574
    }
1575
    taosMsleep(HEARTBEAT_INTERVAL);
28,351,982✔
1576
  }
1577
  taosHashCleanup(clientHbMgr.appHbHash);
552,743✔
1578
  return NULL;
552,743✔
1579
}
1580

1581
static int32_t hbCreateThread() {
1,447,914✔
1582
  int32_t      code = TSDB_CODE_SUCCESS;
1,447,914✔
1583
  TdThreadAttr thAttr;
1,433,716✔
1584
  TSC_ERR_JRET(taosThreadAttrInit(&thAttr));
1,447,914✔
1585
  TSC_ERR_JRET(taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE));
1,447,914✔
1586
#ifdef TD_COMPACT_OS
1587
  TSC_ERR_JRET(taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL));
1588
#endif
1589

1590
  if (taosThreadCreate(&clientHbMgr.thread, &thAttr, hbThreadFunc, NULL) != 0) {
1,447,914✔
1591
    terrno = TAOS_SYSTEM_ERROR(ERRNO);
×
1592
    TSC_ERR_RET(terrno);
×
1593
  }
1594
  (void)taosThreadAttrDestroy(&thAttr);
1,447,914✔
1595
_return:
1,447,914✔
1596

1597
  if (code) {
1,447,914✔
1598
    terrno = TAOS_SYSTEM_ERROR(ERRNO);
×
1599
    TSC_ERR_RET(terrno);
×
1600
  }
1601

1602
  return code;
1,447,914✔
1603
}
1604

1605
static void hbStopThread() {
1,449,552✔
1606
  if (0 == atomic_load_8(&clientHbMgr.inited)) {
1,449,552✔
1607
    return;
1,638✔
1608
  }
1609
  if (atomic_val_compare_exchange_8(&clientHbMgr.threadStop, 0, 1)) {
1,447,914✔
1610
    tscDebug("hb thread already stopped");
×
1611
    return;
×
1612
  }
1613

1614
  int32_t code = TSDB_CODE_SUCCESS;
1,447,914✔
1615
  // thread quit mode kill or inner exit from self-thread
1616
  if (clientHbMgr.quitByKill) {
1,447,914✔
1617
    code = taosThreadKill(clientHbMgr.thread, 0);
1,002,437✔
1618
    if (TSDB_CODE_SUCCESS != code) {
1,002,437✔
1619
      tscError("taosThreadKill failed since %s", tstrerror(code));
×
1620
    }
1621
  } else {
1622
    code = taosThreadJoin(clientHbMgr.thread, NULL);
445,477✔
1623
    if (TSDB_CODE_SUCCESS != code) {
445,477✔
1624
      tscError("taosThreadJoin failed since %s", tstrerror(code));
×
1625
    }
1626
  }
1627

1628
  tscDebug("hb thread stopped");
1,447,914✔
1629
}
1630

1631
/*
 * Create a heartbeat manager for one application instance and register it in
 * clientHbMgr.appHbMgrs.
 *
 * pAppInstInfo  instance the manager reports for (stored, not copied)
 * key           string duplicated into the manager
 * pAppHbMgr     out: the new manager; set to NULL when creation fails after
 *               the allocation step
 *
 * Returns TSDB_CODE_SUCCESS, or the error of the failing step. On failure every
 * member allocated so far (key, activeInfo) is released together with the
 * manager itself. Once the manager has been pushed into the global array it is
 * owned by that array and is never freed here.
 */
int32_t appHbMgrInit(SAppInstInfo *pAppInstInfo, char *key, SAppHbMgr **pAppHbMgr) {
  int32_t code = TSDB_CODE_SUCCESS;
  TSC_ERR_RET(hbMgrInit());

  // zero-filled so that the failure path can release members unconditionally
  *pAppHbMgr = taosMemoryCalloc(1, sizeof(SAppHbMgr));
  if (*pAppHbMgr == NULL) {
    TSC_ERR_JRET(terrno);
  }
  // init stat
  (*pAppHbMgr)->startTime = taosGetTimestampMs();
  (*pAppHbMgr)->connKeyCnt = 0;
  (*pAppHbMgr)->connHbFlag = 0;
  (*pAppHbMgr)->reportCnt = 0;
  (*pAppHbMgr)->reportBytes = 0;
  (*pAppHbMgr)->key = taosStrdup(key);
  if ((*pAppHbMgr)->key == NULL) {
    TSC_ERR_JRET(terrno);
  }

  // init app info
  (*pAppHbMgr)->pAppInstInfo = pAppInstInfo;

  // init hash info
  (*pAppHbMgr)->activeInfo = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
  if ((*pAppHbMgr)->activeInfo == NULL) {
    TSC_ERR_JRET(terrno);
  }

  TSC_ERR_JRET(taosThreadMutexLock(&clientHbMgr.lock));
  if (taosArrayPush(clientHbMgr.appHbMgrs, &(*pAppHbMgr)) == NULL) {
    code = terrno;
    (void)taosThreadMutexUnlock(&clientHbMgr.lock);
    goto _return;
  }
  (*pAppHbMgr)->idx = taosArrayGetSize(clientHbMgr.appHbMgrs) - 1;

  // The manager is now referenced by the global array, so it must not be freed
  // any more; an unlock failure is logged like elsewhere in this file.
  code = taosThreadMutexUnlock(&clientHbMgr.lock);
  if (TSDB_CODE_SUCCESS != code) {
    tscError("failed to unlock clientHbMgr, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
  }

  return TSDB_CODE_SUCCESS;

_return:
  if (*pAppHbMgr != NULL) {
    if ((*pAppHbMgr)->activeInfo != NULL) {
      taosHashCleanup((*pAppHbMgr)->activeInfo);
    }
    taosMemoryFree((*pAppHbMgr)->key);
    taosMemoryFree(*pAppHbMgr);
    *pAppHbMgr = NULL;  // never hand a dangling pointer back to the caller
  }
  terrno = code;  // cleanup calls may have touched terrno
  return code;
}
1675

1676
/*
 * Release one heartbeat manager: free every request stored in its activeInfo
 * hash, destroy the hash, then free the key string and the manager itself.
 * pTarget must not be NULL and must not be used afterwards.
 */
void hbFreeAppHbMgr(SAppHbMgr *pTarget) {
  for (void *pEntry = taosHashIterate(pTarget->activeInfo, NULL); pEntry != NULL;
       pEntry = taosHashIterate(pTarget->activeInfo, pEntry)) {
    SClientHbReq *pHbReq = pEntry;
    tFreeClientHbReq(pHbReq);
  }

  taosHashCleanup(pTarget->activeInfo);
  pTarget->activeInfo = NULL;

  taosMemoryFree(pTarget->key);
  taosMemoryFree(pTarget);
}
1,525,229✔
1689

1690
/*
 * Remove *pAppHbMgr from clientHbMgr.appHbMgrs under clientHbMgr.lock.
 *
 * When the manager is found in the array it is freed, *pAppHbMgr is set to
 * NULL and the array slot is overwritten with that NULL (the slot is kept, so
 * the positions of the other managers do not change). When it is not found
 * nothing is modified. Lock/unlock failures are only logged.
 */
void hbRemoveAppHbMrg(SAppHbMgr **pAppHbMgr) {
  int32_t rc = taosThreadMutexLock(&clientHbMgr.lock);
  if (rc != TSDB_CODE_SUCCESS) {
    tscError("failed to lock clientHbMgr, code:%s", tstrerror(TAOS_SYSTEM_ERROR(rc)));
  }

  // locate the slot holding this manager
  int32_t total = taosArrayGetSize(clientHbMgr.appHbMgrs);
  int32_t slot = 0;
  while (slot < total) {
    SAppHbMgr *pCandidate = taosArrayGetP(clientHbMgr.appHbMgrs, slot);
    if (pCandidate == *pAppHbMgr) {
      break;
    }
    ++slot;
  }

  if (slot < total) {
    hbFreeAppHbMgr(*pAppHbMgr);
    *pAppHbMgr = NULL;
    // pAppHbMgr now points at a NULL pointer, which is what gets copied into the slot
    taosArraySet(clientHbMgr.appHbMgrs, slot, pAppHbMgr);
  }

  rc = taosThreadMutexUnlock(&clientHbMgr.lock);
  if (rc != TSDB_CODE_SUCCESS) {
    tscError("failed to unlock clientHbMgr, code:%s", tstrerror(TAOS_SYSTEM_ERROR(rc)));
  }
}
1,525,229✔
1711

1712
void appHbMgrCleanup(void) {
1,447,914✔
1713
  int sz = taosArrayGetSize(clientHbMgr.appHbMgrs);
1,447,914✔
1714
  for (int i = 0; i < sz; i++) {
2,973,143✔
1715
    SAppHbMgr *pTarget = taosArrayGetP(clientHbMgr.appHbMgrs, i);
1,525,229✔
1716
    if (pTarget == NULL) continue;
1,525,229✔
1717
    hbFreeAppHbMgr(pTarget);
1,525,229✔
1718
  }
1719
}
1,447,914✔
1720

1721
int32_t hbMgrInit() {
1,525,229✔
1722
  // init once
1723
  int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 0, 1);
1,525,229✔
1724
  if (old == 1) return 0;
1,525,229✔
1725

1726
  clientHbMgr.appId = tGenIdPI64();
1,447,914✔
1727
  tscInfo("app initialized, appId:0x%" PRIx64, clientHbMgr.appId);
1,447,914✔
1728

1729
  clientHbMgr.appSummary = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
1,447,914✔
1730
  if (NULL == clientHbMgr.appSummary) {
1,447,914✔
1731
    uError("hbMgrInit:taosHashInit error") return terrno;
×
1732
  }
1733
  clientHbMgr.appHbMgrs = taosArrayInit(0, sizeof(void *));
1,447,914✔
1734
  if (NULL == clientHbMgr.appHbMgrs) {
1,447,914✔
1735
    uError("hbMgrInit:taosArrayInit error") return terrno;
×
1736
  }
1737
  TdThreadMutexAttr attr = {0};
1,447,914✔
1738

1739
  int ret = taosThreadMutexAttrInit(&attr);
1,447,914✔
1740
  if (ret != 0) {
1,447,914✔
1741
    uError("hbMgrInit:taosThreadMutexAttrInit error") return ret;
×
1742
  }
1743

1744
  ret = taosThreadMutexAttrSetType(&attr, PTHREAD_MUTEX_RECURSIVE);
1,447,914✔
1745
  if (ret != 0) {
1,447,914✔
1746
    uError("hbMgrInit:taosThreadMutexAttrSetType error") return ret;
×
1747
  }
1748

1749
  ret = taosThreadMutexInit(&clientHbMgr.lock, &attr);
1,447,914✔
1750
  if (ret != 0) {
1,447,914✔
1751
    uError("hbMgrInit:taosThreadMutexInit error") return ret;
×
1752
  }
1753

1754
  ret = taosThreadMutexAttrDestroy(&attr);
1,447,914✔
1755
  if (ret != 0) {
1,447,914✔
1756
    uError("hbMgrInit:taosThreadMutexAttrDestroy error") return ret;
×
1757
  }
1758

1759
  // init handle funcs
1760
  hbMgrInitHandle();
1761

1762
  // init backgroud thread
1763
  ret = hbCreateThread();
1,447,914✔
1764
  if (ret != 0) {
1,447,914✔
1765
    uError("hbMgrInit:hbCreateThread error") return ret;
×
1766
  }
1767

1768
  return 0;
1,447,914✔
1769
}
1770

1771
void hbMgrCleanUp() {
1,449,552✔
1772
  hbStopThread();
1,449,552✔
1773

1774
  // destroy all appHbMgr
1775
  int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 1, 0);
1,449,552✔
1776
  if (old == 0) return;
1,449,552✔
1777

1778
  int32_t code = taosThreadMutexLock(&clientHbMgr.lock);
1,447,914✔
1779
  if (TSDB_CODE_SUCCESS != code) {
1,447,914✔
1780
    tscError("failed to lock clientHbMgr, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
×
1781
  }
1782
  appHbMgrCleanup();
1,447,914✔
1783
  taosArrayDestroy(clientHbMgr.appHbMgrs);
1,447,914✔
1784
  clientHbMgr.appHbMgrs = NULL;
1,447,914✔
1785
  code = taosThreadMutexUnlock(&clientHbMgr.lock);
1,447,914✔
1786
  if (TSDB_CODE_SUCCESS != code) {
1,447,914✔
1787
    tscError("failed to unlock clientHbMgr, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
×
1788
  }
1789
}
1790

1791
/*
 * Register one connection in the manager's activeInfo hash.
 *
 * If connKey is already present the call is a no-op returning 0. Otherwise a
 * zero-initialized SClientHbReq carrying the key, cluster id, user and token
 * name is stored and the manager's connKeyCnt is incremented.
 *
 * Returns 0 on success or the taosHashPut error (through TSC_ERR_RET).
 */
int32_t hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, const char* user, const char* tokenName, int64_t clusterId) {
  // already registered: nothing to do
  if (taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)) != NULL) {
    return 0;
  }

  SClientHbReq newReq = {0};
  newReq.connKey = connKey;
  newReq.clusterId = clusterId;
  tstrncpy(newReq.user, user, sizeof(newReq.user));
  tstrncpy(newReq.tokenName, tokenName, sizeof(newReq.tokenName));

  TSC_ERR_RET(taosHashPut(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey), &newReq, sizeof(SClientHbReq)));

  // the heartbeat thread skips managers whose counter is zero
  (void)atomic_add_fetch_32(&pAppHbMgr->connKeyCnt, 1);
  return 0;
}
1809

1810
/*
 * Register a connection for heartbeat reporting.
 *
 * Only CONN_TYPE__QUERY and CONN_TYPE__TMQ connections are registered (via
 * hbRegisterConnImpl); any other connection type is ignored and 0 is returned.
 */
int32_t hbRegisterConn(SAppHbMgr *pAppHbMgr, int64_t tscRefId, const char* user, const char* tokenName, int64_t clusterId, int8_t connType) {
  SClientHbKey hbKey = {
      .tscRid = tscRefId,
      .connType = connType,
  };

  if (connType == CONN_TYPE__QUERY || connType == CONN_TYPE__TMQ) {
    return hbRegisterConnImpl(pAppHbMgr, hbKey, user, tokenName, clusterId);
  }

  return 0;
}
1825

1826
/*
 * Remove a connection from heartbeat reporting.
 *
 * Under clientHbMgr.lock, the manager at pTscObj->appHbMgrIdx is looked up; if
 * it exists and holds an entry for connKey, that entry's contents are freed,
 * the entry is removed from activeInfo and connKeyCnt is decremented.
 * Lock, unlock and removal failures are only logged.
 */
void hbDeregisterConn(STscObj *pTscObj, SClientHbKey connKey) {
  int32_t rc = taosThreadMutexLock(&clientHbMgr.lock);
  if (rc != TSDB_CODE_SUCCESS) {
    tscError("failed to lock clientHbMgr, code:%s", tstrerror(TAOS_SYSTEM_ERROR(rc)));
  }

  SAppHbMgr    *pMgr = taosArrayGetP(clientHbMgr.appHbMgrs, pTscObj->appHbMgrIdx);
  SClientHbReq *pHbReq = NULL;
  if (pMgr != NULL) {
    pHbReq = taosHashAcquire(pMgr->activeInfo, &connKey, sizeof(SClientHbKey));
  }

  if (pHbReq != NULL) {
    // the entry stays acquired while its contents are freed and it is removed
    tFreeClientHbReq(pHbReq);
    rc = taosHashRemove(pMgr->activeInfo, &connKey, sizeof(SClientHbKey));
    if (rc != TSDB_CODE_SUCCESS) {
      tscError("hbDeregisterConn taosHashRemove error, code:%s", tstrerror(TAOS_SYSTEM_ERROR(rc)));
    }
    taosHashRelease(pMgr->activeInfo, pHbReq);
    (void)atomic_sub_fetch_32(&pMgr->connKeyCnt, 1);
  }

  rc = taosThreadMutexUnlock(&clientHbMgr.lock);
  if (rc != TSDB_CODE_SUCCESS) {
    tscError("failed to unlock clientHbMgr, code:%s", tstrerror(TAOS_SYSTEM_ERROR(rc)));
  }
}
84,563,764✔
1849

1850
// set heart beat thread quit mode , if quicByKill 1 then kill thread else quit from inner
1851
void taos_set_hb_quit(int8_t quitByKill) { clientHbMgr.quitByKill = quitByKill; }
1,003,436✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc