• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5010

29 Mar 2026 04:32AM UTC coverage: 72.292% (+0.03%) from 72.26%
#5010

push

travis-ci

web-flow
refactor: do some internal refactor for TDgpt. (#34955)

253774 of 351039 relevant lines covered (72.29%)

133420324.04 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.24
/source/client/src/clientHb.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "catalog.h"
17
#include "clientInt.h"
18
#include "clientLog.h"
19
#include "clientMonitor.h"
20
#include "clientSession.h"
21
#include "scheduler.h"
22
#include "tglobal.h"
23
#include "trpc.h"
24

25
/*
 * Parameter bundle carrying a heartbeat manager pointer, a cluster id, a request
 * count and a connection heartbeat flag.
 *
 * The fields live in an anonymous struct nested in an anonymous union, so they are
 * accessed directly as members of SHbParam (e.g. param.clusterId).
 * NOTE(review): no user of this type is visible in this part of the file; the
 * union presumably leaves room for alternative layouts -- confirm before flattening.
 */
typedef struct {
26
  union {
27
    struct {
28
      SAppHbMgr *pAppHbMgr;
29
      int64_t    clusterId;
30
      int32_t    reqCnt;
31
      int8_t     connHbFlag;
32
    };
33
  };
34
} SHbParam;
35

36
SClientHbMgr clientHbMgr = {0};
37

38
static int32_t hbCreateThread();
39
static void    hbStopThread();
40
static int32_t hbUpdateUserSessMertric(const char *user, SUserSessCfg *pCfg);
41
static int32_t hbUpdateUserAuthInfo(SAppHbMgr *pAppHbMgr, SUserAuthBatchRsp *batchRsp);
42

43
static int32_t hbProcessUserAuthInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog,
20,720,504✔
44
                                        SAppHbMgr *pAppHbMgr) {
45
  int32_t code = TSDB_CODE_SUCCESS;
20,720,504✔
46

47
  SUserAuthBatchRsp batchRsp = {0};
20,720,504✔
48
  if (tDeserializeSUserAuthBatchRsp(value, valueLen, &batchRsp) != 0) {
20,720,504✔
49
    TSC_ERR_JRET(TSDB_CODE_INVALID_MSG);
×
50
  }
51

52
  int32_t numOfBatchs = taosArrayGetSize(batchRsp.pArray);
20,720,504✔
53
  for (int32_t i = 0; i < numOfBatchs; ++i) {
41,855,702✔
54
    SGetUserAuthRsp *rsp = taosArrayGet(batchRsp.pArray, i);
21,135,198✔
55
    if (NULL == rsp) {
21,135,198✔
56
      code = terrno;
×
57
      goto _return;
×
58
    }
59
    tscDebug("hb to update user auth, user:%s, version:%d", rsp->user, rsp->version);
21,135,198✔
60

61
    TSC_ERR_JRET(catalogUpdateUserAuthInfo(pCatalog, rsp));
21,135,198✔
62

63
  }
64

65
  if (numOfBatchs > 0) {
20,720,504✔
66
    TSC_ERR_JRET(hbUpdateUserAuthInfo(pAppHbMgr, &batchRsp));
20,720,504✔
67
  }
68

69
  (void)atomic_val_compare_exchange_8(&pAppHbMgr->connHbFlag, 1, 2);
20,720,504✔
70

71
_return:
20,720,504✔
72
  tFreeSUserAuthBatchRsp(&batchRsp);
20,720,504✔
73
  return code;
20,720,504✔
74
}
75

76
/*
 * Apply the per-user session limits carried in `pCfg` to the session manager.
 *
 * Nothing is done when either argument is NULL or when `pCfg` is byte-wise all
 * zero. Otherwise every non-zero limit is forwarded through sessMgtUpdataLimit();
 * processing stops at the first failure.
 *
 * Returns 0 on success or the error code of the failing update.
 */
static int32_t hbUpdateUserSessMertric(const char *user, SUserSessCfg *pCfg) {
  int32_t code = 0;
  int32_t lino = 0;

  if (NULL == user || NULL == pCfg) {
    return code;
  }

  // An all-zero configuration carries no limits at all.
  SUserSessCfg emptyCfg = {0};
  if (0 == memcmp(pCfg, &emptyCfg, sizeof(SUserSessCfg))) {
    return TSDB_CODE_SUCCESS;
  }

  tscInfo(
      "update session metric for user:%s, sessPerUser:%d, sessConnTime:%d, sessConnIdleTime:%d, sessMaxConcurrency:%d, "
      "sessMaxCallVnodeNum:%d",
      user, pCfg->sessPerUser, pCfg->sessConnTime, pCfg->sessConnIdleTime, pCfg->sessMaxConcurrency,
      pCfg->sessMaxCallVnodeNum);

  // sessMgtUpdataLimit() takes a non-const name, so drop the qualifier once here.
  char *userName = (char *)user;

  if (0 != pCfg->sessPerUser) {
    code = sessMgtUpdataLimit(userName, SESSION_PER_USER, pCfg->sessPerUser);
    TAOS_CHECK_GOTO(code, &lino, _error);
  }

  if (0 != pCfg->sessConnTime) {
    code = sessMgtUpdataLimit(userName, SESSION_CONN_TIME, pCfg->sessConnTime);
    TAOS_CHECK_GOTO(code, &lino, _error);
  }

  if (0 != pCfg->sessConnIdleTime) {
    code = sessMgtUpdataLimit(userName, SESSION_CONN_IDLE_TIME, pCfg->sessConnIdleTime);
    TAOS_CHECK_GOTO(code, &lino, _error);
  }

  if (0 != pCfg->sessMaxConcurrency) {
    code = sessMgtUpdataLimit(userName, SESSION_MAX_CONCURRENCY, pCfg->sessMaxConcurrency);
    TAOS_CHECK_GOTO(code, &lino, _error);
  }

  if (0 != pCfg->sessMaxCallVnodeNum) {
    code = sessMgtUpdataLimit(userName, SESSION_MAX_CALL_VNODE_NUM, pCfg->sessMaxCallVnodeNum);
    TAOS_CHECK_GOTO(code, &lino, _error);
  }

_error:
  return code;
}
121
/*
 * Propagate a batch of user-auth responses to every live connection that belongs
 * to the same cluster as `pAppHbMgr`.
 *
 * For each heartbeat manager of that cluster, the active connections are walked.
 * The auth response is looked up by the user name of the first connection that can
 * be acquired and is then reused for the remaining connections of that manager;
 * if no response matches, the walk over that manager stops.
 * NOTE(review): this presumes all connections of one manager share a user -- confirm.
 *
 * Per connection, in order:
 *   - a dropped user marks the connection dropped and fires the user-dropped callback;
 *   - token state (dropped / disabled / expired / modified) is evaluated and notified;
 *   - authVer, session limits, sysInfo, password version and both whitelist versions
 *     are refreshed, invoking the registered callbacks when a version advances.
 *
 * Always returns 0; failures of individual updates are only logged.
 */
static int32_t hbUpdateUserAuthInfo(SAppHbMgr *pAppHbMgr, SUserAuthBatchRsp *batchRsp) {
  int64_t clusterId = pAppHbMgr->pAppInstInfo->clusterId;
  for (int i = 0; i < TARRAY_SIZE(clientHbMgr.appHbMgrs); ++i) {
    SAppHbMgr *hbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i);
    if (!hbMgr || hbMgr->pAppInstInfo->clusterId != clusterId) {
      continue;
    }

    SClientHbReq    *pReq = NULL;
    SGetUserAuthRsp *pRsp = NULL;
    while ((pReq = taosHashIterate(hbMgr->activeInfo, pReq))) {
      STscObj *pTscObj = (STscObj *)acquireTscObj(pReq->connKey.tscRid);
      if (!pTscObj) {
        continue;
      }

      if (!pRsp) {
        // Resolve the response once, using the first connection we could acquire.
        for (int32_t j = 0; j < TARRAY_SIZE(batchRsp->pArray); ++j) {
          SGetUserAuthRsp *rsp = TARRAY_GET_ELEM(batchRsp->pArray, j);
          if (0 == strncmp(rsp->user, pTscObj->user, TSDB_USER_LEN)) {
            pRsp = rsp;
            break;
          }
        }
        if (!pRsp) {
          // Leaving the iteration early: the iterator must be cancelled explicitly.
          releaseTscObj(pReq->connKey.tscRid);
          taosHashCancelIterate(hbMgr->activeInfo, pReq);
          break;
        }
      }

      if (pRsp->dropped == 1) {
        // Only the caller that flips dropped 0 -> 1 delivers the notification.
        if (atomic_val_compare_exchange_8(&pTscObj->dropped, 0, 1) == 0) {
          SPassInfo *dropInfo = &pTscObj->userDroppedInfo;
          if (dropInfo->fp) {
            (*dropInfo->fp)(dropInfo->param, NULL, TAOS_NOTIFY_USER_DROPPED);
          }
        }
        releaseTscObj(pReq->connKey.tscRid);
        continue;
      }

      // update token status
      if (pTscObj->tokenName[0] != 0) {
        STokenStatus *status = NULL;
        if (pRsp->tokens != NULL) {
          status = taosHashGet(pRsp->tokens, pTscObj->tokenName, strlen(pTscObj->tokenName) + 1);
        }

        STokenEvent event = { 0 };
        tstrncpy(event.tokenName, pTscObj->tokenName, sizeof(event.tokenName));
        if (status == NULL) {
          event.type = TSDB_TOKEN_EVENT_DROPPED;
        } else if (status->enabled == 0) {
          event.type = TSDB_TOKEN_EVENT_DISABLED;
        } else if (status->expireTime > 0 && status->expireTime < taosGetTimestampSec()) {
          event.type = TSDB_TOKEN_EVENT_EXPIRED;
        } else {
          event.type = TSDB_TOKEN_EVENT_MODIFIED;
          event.expireTime = status->expireTime;
        }

        STokenNotifyInfo *tni = &pTscObj->tokenNotifyInfo;
        if (event.type == TSDB_TOKEN_EVENT_MODIFIED) {
          // CAS loop: publish the new expire time unless it is already in place.
          int32_t oldExpireTime;
          do {
            oldExpireTime = atomic_load_32(&pTscObj->tokenExpireTime);
            if (oldExpireTime == status->expireTime) {
              break;
            }
          } while (atomic_val_compare_exchange_32(&pTscObj->tokenExpireTime, oldExpireTime, status->expireTime) != oldExpireTime);

          if (oldExpireTime != status->expireTime && tni->fp) {
            (*tni->fp)(tni->param, &event, TAOS_NOTIFY_TOKEN);
          }
          // Log the value observed before the swap; the field itself already holds the new time.
          tscDebug("update token %s of user %s: expire time from %d to %d, conn:%" PRIi64, pTscObj->tokenName,
                   pRsp->user, oldExpireTime, status->expireTime, pTscObj->id);
        } else {
          // Token is no longer usable: mark the connection dropped and notify once.
          if (atomic_val_compare_exchange_8(&pTscObj->dropped, 0, 1) == 0) {
            if (tni->fp) {
              (*tni->fp)(tni->param, &event, TAOS_NOTIFY_TOKEN);
            }
          }

          releaseTscObj(pReq->connKey.tscRid);
          continue;
        }
      }

      pTscObj->authVer = pRsp->version;
      if (hbUpdateUserSessMertric(pTscObj->user, &pRsp->sessCfg) != 0) {
        tscError("failed to update user session metric, user:%s", pTscObj->user);
      }

      if (pTscObj->sysInfo != pRsp->sysInfo) {
        tscDebug("update sysInfo of user %s from %" PRIi8 " to %" PRIi8 ", conn:%" PRIi64, pRsp->user,
                 pTscObj->sysInfo, pRsp->sysInfo, pTscObj->id);
        pTscObj->sysInfo = pRsp->sysInfo;
      }

      // update password version
      if (pTscObj->passInfo.fp) {
        SPassInfo *passInfo = &pTscObj->passInfo;
        int32_t    oldVer = 0;
        // Versions only move forward; stop as soon as the stored one is not older.
        do {
          oldVer = atomic_load_32(&passInfo->ver);
          if (oldVer >= pRsp->passVer) {
            break;
          }
        } while (atomic_val_compare_exchange_32(&passInfo->ver, oldVer, pRsp->passVer) != oldVer);
        if (oldVer < pRsp->passVer) {
          (*passInfo->fp)(passInfo->param, &pRsp->passVer, TAOS_NOTIFY_PASSVER);
          tscDebug("update passVer of user %s from %d to %d, conn:%" PRIi64, pRsp->user, oldVer,
                   atomic_load_32(&passInfo->ver), pTscObj->id);
        }
      }

      // update ip white list version
      {
        SWhiteListInfo *whiteListInfo = &pTscObj->whiteListInfo;
        int64_t oldVer = 0, newVer = pRsp->whiteListVer;
        do {
          oldVer = atomic_load_64(&whiteListInfo->ver);
          if (oldVer >= newVer) {
            break;
          }
        } while (atomic_val_compare_exchange_64(&whiteListInfo->ver, oldVer, newVer) != oldVer);

        if (oldVer < newVer) {
          if (whiteListInfo->fp) {
            (*whiteListInfo->fp)(whiteListInfo->param, &newVer, TAOS_NOTIFY_WHITELIST_VER);
          }
          tscDebug("update whitelist version of user %s from %" PRId64 " to %" PRId64 ", conn:%" PRIi64, pRsp->user,
                   oldVer, newVer, pTscObj->id);
        }
      }

      // update date time whitelist version
      {
        SWhiteListInfo *whiteListInfo = &pTscObj->dateTimeWhiteListInfo;

        int64_t oldVer = 0, newVer = pRsp->timeWhiteListVer;
        do {
          oldVer = atomic_load_64(&whiteListInfo->ver);
          if (oldVer >= newVer) {
            break;
          }
        } while (atomic_val_compare_exchange_64(&whiteListInfo->ver, oldVer, newVer) != oldVer);

        if (oldVer < newVer) {
          if (whiteListInfo->fp) {
            (*whiteListInfo->fp)(whiteListInfo->param, &newVer, TAOS_NOTIFY_DATETIME_WHITELIST_VER);
          }
          tscDebug("update date time whitelist version of user %s from %" PRId64 " to %" PRId64 ", conn:%" PRIi64, pRsp->user,
                   oldVer, newVer, pTscObj->id);
        }
      }

      releaseTscObj(pReq->connKey.tscRid);
    }
  }
  return 0;
}
287

288
/*
 * Build a freshly allocated SDBVgInfo from a use-db response.
 *
 * Copies the version/hash settings from `rsp` and fills a hash table keyed by
 * vgroup id with a copy of every vgroup entry in rsp->pVgroupInfos.
 *
 * On success *pInfo receives the new object and 0 is returned.
 * If the initial allocation fails, terrno is returned and *pInfo is left untouched.
 * On any later failure the partially built object is released, *pInfo is set to
 * NULL and the error code is returned.
 */
static int32_t hbGenerateVgInfoFromRsp(SDBVgInfo **pInfo, SUseDbRsp *rsp) {
  int32_t    code = 0;
  SDBVgInfo *vgInfo = taosMemoryCalloc(1, sizeof(SDBVgInfo));
  if (NULL == vgInfo) {
    return terrno;
  }

  vgInfo->vgVersion = rsp->vgVersion;
  vgInfo->stateTs = rsp->stateTs;
  vgInfo->flags = rsp->flags;
  vgInfo->hashMethod = rsp->hashMethod;
  vgInfo->hashPrefix = rsp->hashPrefix;
  vgInfo->hashSuffix = rsp->hashSuffix;
  vgInfo->vgHash = taosHashInit(rsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
  if (NULL == vgInfo->vgHash) {
    tscError("hash init[%d] failed", rsp->vgNum);
    code = terrno;
    goto _return;
  }

  for (int32_t j = 0; j < rsp->vgNum; ++j) {
    SVgroupInfo *pVgroup = taosArrayGet(rsp->pVgroupInfos, j);
    if (NULL == pVgroup) {
      // vgNum claims more entries than the array provides; do not dereference NULL.
      tscError("vgroup info missing at idx:%d, vgNum:%d", j, rsp->vgNum);
      code = TSDB_CODE_OUT_OF_RANGE;
      goto _return;
    }
    if (taosHashPut(vgInfo->vgHash, &pVgroup->vgId, sizeof(int32_t), pVgroup, sizeof(SVgroupInfo)) != 0) {
      // Capture terrno before logging so the log call cannot disturb it.
      code = terrno;
      tscError("hash push failed, terrno:%d", code);
      goto _return;
    }
  }

_return:
  if (code) {
    taosHashCleanup(vgInfo->vgHash);
    taosMemoryFreeClear(vgInfo);  // also resets vgInfo to NULL
  }

  *pInfo = vgInfo;
  return code;
}
326

327
static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
7,009,375✔
328
  int32_t code = 0;
7,009,375✔
329

330
  SDbHbBatchRsp batchRsp = {0};
7,009,375✔
331
  if (tDeserializeSDbHbBatchRsp(value, valueLen, &batchRsp) != 0) {
7,009,375✔
332
    terrno = TSDB_CODE_INVALID_MSG;
×
333
    code = terrno;
×
334
    goto _return;
×
335
  }
336

337
  int32_t numOfBatchs = taosArrayGetSize(batchRsp.pArray);
7,009,375✔
338
  for (int32_t i = 0; i < numOfBatchs; ++i) {
8,643,010✔
339
    SDbHbRsp *rsp = taosArrayGet(batchRsp.pArray, i);
1,633,635✔
340
    if (NULL == rsp) {
1,633,635✔
341
      code = terrno;
×
342
      goto _return;
×
343
    }
344
    if (rsp->useDbRsp) {
1,633,635✔
345
      tscDebug("hb use db rsp, db:%s, vgVersion:%d, stateTs:%" PRId64 ", uid:%" PRIx64, rsp->useDbRsp->db,
1,261,571✔
346
               rsp->useDbRsp->vgVersion, rsp->useDbRsp->stateTs, rsp->useDbRsp->uid);
347

348
      if (rsp->useDbRsp->vgVersion < 0) {
1,261,571✔
349
        tscDebug("hb to remove db, db:%s", rsp->useDbRsp->db);
42,450✔
350
        code = catalogRemoveDB(pCatalog, rsp->useDbRsp->db, rsp->useDbRsp->uid);
42,450✔
351
      } else {
352
        SDBVgInfo *vgInfo = NULL;
1,219,121✔
353
        code = hbGenerateVgInfoFromRsp(&vgInfo, rsp->useDbRsp);
1,219,121✔
354
        if (TSDB_CODE_SUCCESS != code) {
1,219,121✔
355
          goto _return;
×
356
        }
357

358
        tscDebug("hb to update db vgInfo, db:%s", rsp->useDbRsp->db);
1,219,121✔
359

360
        TSC_ERR_JRET(catalogUpdateDBVgInfo(pCatalog, rsp->useDbRsp->db, rsp->useDbRsp->uid, vgInfo));
1,219,121✔
361

362
        if (IS_SYS_DBNAME(rsp->useDbRsp->db)) {
1,219,121✔
363
          code = hbGenerateVgInfoFromRsp(&vgInfo, rsp->useDbRsp);
307,446✔
364
          if (TSDB_CODE_SUCCESS != code) {
307,446✔
365
            goto _return;
×
366
          }
367

368
          TSC_ERR_JRET(catalogUpdateDBVgInfo(
307,446✔
369
              pCatalog, (rsp->useDbRsp->db[0] == 'i') ? TSDB_PERFORMANCE_SCHEMA_DB : TSDB_INFORMATION_SCHEMA_DB,
370
              rsp->useDbRsp->uid, vgInfo));
371
        }
372
      }
373
    }
374

375
    if (rsp->cfgRsp) {
1,633,635✔
376
      tscDebug("hb db cfg rsp, db:%s, cfgVersion:%d", rsp->cfgRsp->db, rsp->cfgRsp->cfgVersion);
55,419✔
377
      code = catalogUpdateDbCfg(pCatalog, rsp->cfgRsp->db, rsp->cfgRsp->dbId, rsp->cfgRsp);
55,419✔
378
      rsp->cfgRsp = NULL;
55,419✔
379
    }
380
    if (rsp->pTsmaRsp) {
1,633,635✔
381
      if (rsp->pTsmaRsp->pTsmas) {
847,575✔
382
        for (int32_t i = 0; i < rsp->pTsmaRsp->pTsmas->size; ++i) {
9,100✔
383
          STableTSMAInfo *pTsma = taosArrayGetP(rsp->pTsmaRsp->pTsmas, i);
5,880✔
384
          if (NULL == pTsma) {
5,880✔
385
            TSC_ERR_JRET(TSDB_CODE_OUT_OF_RANGE);
×
386
          }
387
          TSC_ERR_JRET(catalogAsyncUpdateTSMA(pCatalog, &pTsma, rsp->dbTsmaVersion));
5,880✔
388
        }
389
        taosArrayClear(rsp->pTsmaRsp->pTsmas);
3,220✔
390
      } else {
391
        TSC_ERR_JRET(catalogAsyncUpdateDbTsmaVersion(pCatalog, rsp->dbTsmaVersion, rsp->db, rsp->dbId));
844,355✔
392
      }
393
    }
394
  }
395

396
_return:
7,009,375✔
397

398
  tFreeSDbHbBatchRsp(&batchRsp);
7,009,375✔
399
  return code;
7,009,375✔
400
}
401

402
static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
4,664,467✔
403
  int32_t code = TSDB_CODE_SUCCESS;
4,664,467✔
404

405
  SSTbHbRsp hbRsp = {0};
4,664,467✔
406
  if (tDeserializeSSTbHbRsp(value, valueLen, &hbRsp) != 0) {
4,664,467✔
407
    terrno = TSDB_CODE_INVALID_MSG;
×
408
    return -1;
×
409
  }
410

411
  int32_t numOfMeta = taosArrayGetSize(hbRsp.pMetaRsp);
4,664,467✔
412
  for (int32_t i = 0; i < numOfMeta; ++i) {
4,708,337✔
413
    STableMetaRsp *rsp = taosArrayGet(hbRsp.pMetaRsp, i);
43,870✔
414
    if (NULL == rsp) {
43,870✔
415
      code = terrno;
×
416
      goto _return;
×
417
    }
418
    if (rsp->numOfColumns < 0) {
43,870✔
419
      tscDebug("hb to remove stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName);
26,955✔
420
      TSC_ERR_JRET(catalogRemoveStbMeta(pCatalog, rsp->dbFName, rsp->dbId, rsp->stbName, rsp->suid));
26,955✔
421
    } else {
422
      tscDebug("hb to update stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName);
16,915✔
423
      if (rsp->pSchemas[0].colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
16,915✔
424
        tscError("invalid colId[%" PRIi16 "] for the first column in table meta rsp msg", rsp->pSchemas[0].colId);
×
425
        tFreeSSTbHbRsp(&hbRsp);
×
426
        return TSDB_CODE_TSC_INVALID_VALUE;
×
427
      }
428

429
      TSC_ERR_JRET(catalogAsyncUpdateTableMeta(pCatalog, rsp));
16,915✔
430
    }
431
  }
432

433
  int32_t numOfIndex = taosArrayGetSize(hbRsp.pIndexRsp);
4,664,467✔
434
  for (int32_t i = 0; i < numOfIndex; ++i) {
4,664,467✔
435
    STableIndexRsp *rsp = taosArrayGet(hbRsp.pIndexRsp, i);
×
436
    if (NULL == rsp) {
×
437
      code = terrno;
×
438
      goto _return;
×
439
    }
440
    TSC_ERR_JRET(catalogUpdateTableIndex(pCatalog, rsp));
×
441
  }
442

443
_return:
4,664,467✔
444
  taosArrayDestroy(hbRsp.pIndexRsp);
4,664,467✔
445
  hbRsp.pIndexRsp = NULL;
4,664,467✔
446

447
  tFreeSSTbHbRsp(&hbRsp);
4,664,467✔
448
  return code;
4,664,467✔
449
}
450

451
static int32_t hbProcessDynViewRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
5,602✔
452
  return catalogUpdateDynViewVer(pCatalog, (SDynViewVersion *)value);
5,602✔
453
}
454

455
static void hbFreeSViewMetaInRsp(void *p) {
×
456
  if (NULL == p || NULL == *(void **)p) {
×
457
    return;
×
458
  }
459
  SViewMetaRsp *pRsp = *(SViewMetaRsp **)p;
×
460
  tFreeSViewMetaRsp(pRsp);
×
461
  taosMemoryFreeClear(pRsp);
×
462
}
463

464
static int32_t hbProcessViewInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
5,602✔
465
  int32_t code = TSDB_CODE_SUCCESS;
5,602✔
466

467
  SViewHbRsp hbRsp = {0};
5,602✔
468
  if (tDeserializeSViewHbRsp(value, valueLen, &hbRsp) != 0) {
5,602✔
469
    taosArrayDestroyEx(hbRsp.pViewRsp, hbFreeSViewMetaInRsp);
×
470
    terrno = TSDB_CODE_INVALID_MSG;
×
471
    return -1;
×
472
  }
473

474
  int32_t numOfMeta = taosArrayGetSize(hbRsp.pViewRsp);
5,602✔
475
  for (int32_t i = 0; i < numOfMeta; ++i) {
6,378✔
476
    SViewMetaRsp *rsp = taosArrayGetP(hbRsp.pViewRsp, i);
776✔
477
    if (NULL == rsp) {
776✔
478
      code = terrno;
×
479
      goto _return;
×
480
    }
481
    if (rsp->numOfCols < 0) {
776✔
482
      tscDebug("hb to remove view, db:%s, view:%s", rsp->dbFName, rsp->name);
776✔
483
      code = catalogRemoveViewMeta(pCatalog, rsp->dbFName, rsp->dbId, rsp->name, rsp->viewId);
776✔
484
      tFreeSViewMetaRsp(rsp);
776✔
485
      taosMemoryFreeClear(rsp);
776✔
486
    } else {
487
      tscDebug("hb to update view, db:%s, view:%s", rsp->dbFName, rsp->name);
×
488
      code = catalogUpdateViewMeta(pCatalog, rsp);
×
489
    }
490
    TSC_ERR_JRET(code);
776✔
491
  }
492

493
_return:
5,602✔
494
  taosArrayDestroy(hbRsp.pViewRsp);
5,602✔
495
  return code;
5,602✔
496
}
497

498
static int32_t hbprocessTSMARsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
6,020✔
499
  int32_t code = 0;
6,020✔
500

501
  STSMAHbRsp hbRsp = {0};
6,020✔
502
  if (tDeserializeTSMAHbRsp(value, valueLen, &hbRsp)) {
6,020✔
503
    terrno = TSDB_CODE_INVALID_MSG;
×
504
    return -1;
×
505
  }
506

507
  int32_t numOfTsma = taosArrayGetSize(hbRsp.pTsmas);
6,020✔
508
  for (int32_t i = 0; i < numOfTsma; ++i) {
6,020✔
509
    STableTSMAInfo *pTsmaInfo = taosArrayGetP(hbRsp.pTsmas, i);
×
510

511
    if (!pTsmaInfo->pFuncs) {
×
512
      tscDebug("hb to remove tsma:%s.%s", pTsmaInfo->dbFName, pTsmaInfo->name);
×
513
      code = catalogRemoveTSMA(pCatalog, pTsmaInfo);
×
514
      tFreeAndClearTableTSMAInfo(pTsmaInfo);
×
515
    } else {
516
      tscDebug("hb to update tsma:%s.%s", pTsmaInfo->dbFName, pTsmaInfo->name);
×
517
      code = catalogUpdateTSMA(pCatalog, &pTsmaInfo);
×
518
      tFreeAndClearTableTSMAInfo(pTsmaInfo);
×
519
    }
520
    TSC_ERR_JRET(code);
×
521
  }
522

523
_return:
6,020✔
524
  taosArrayDestroy(hbRsp.pTsmas);
6,020✔
525
  return code;
6,020✔
526
}
527

528
/*
 * Dispatch every key/value pair of a heartbeat response to its handler.
 *
 * kvNum     number of entries to read from pKvs
 * pKvs      array of SKv entries
 * pCatalog  catalog that the handlers update
 * pAppHbMgr heartbeat manager the response belongs to (used by the user-auth handler)
 *
 * Each entry is validated (positive length, non-NULL value) before its handler is
 * called; invalid entries and handler failures are logged and skipped so that the
 * remaining entries are still processed.
 */
static void hbProcessQueryRspKvs(int32_t kvNum, SArray *pKvs, struct SCatalog *pCatalog, SAppHbMgr *pAppHbMgr) {
  for (int32_t i = 0; i < kvNum; ++i) {
    SKv *kv = taosArrayGet(pKvs, i);
    if (NULL == kv) {
      tscError("invalid hb kv, idx:%d", i);
      continue;
    }
    switch (kv->key) {
      case HEARTBEAT_KEY_USER_AUTHINFO: {
        if (kv->valueLen <= 0 || NULL == kv->value) {
          tscError("invalid hb user auth info, len:%d, value:%p", kv->valueLen, kv->value);
          break;
        }
        if (TSDB_CODE_SUCCESS != hbProcessUserAuthInfoRsp(kv->value, kv->valueLen, pCatalog, pAppHbMgr)) {
          tscError("process user auth info response faild, len:%d, value:%p", kv->valueLen, kv->value);
        }
        break;
      }
      case HEARTBEAT_KEY_DBINFO: {
        if (kv->valueLen <= 0 || NULL == kv->value) {
          tscError("invalid hb db info, len:%d, value:%p", kv->valueLen, kv->value);
          break;
        }
        if (TSDB_CODE_SUCCESS != hbProcessDBInfoRsp(kv->value, kv->valueLen, pCatalog)) {
          tscError("process db info response faild, len:%d, value:%p", kv->valueLen, kv->value);
        }
        break;
      }
      case HEARTBEAT_KEY_STBINFO: {
        if (kv->valueLen <= 0 || NULL == kv->value) {
          tscError("invalid hb stb info, len:%d, value:%p", kv->valueLen, kv->value);
          break;
        }
        if (TSDB_CODE_SUCCESS != hbProcessStbInfoRsp(kv->value, kv->valueLen, pCatalog)) {
          tscError("process stb info response faild, len:%d, value:%p", kv->valueLen, kv->value);
        }
        break;
      }
#ifdef TD_ENTERPRISE
      case HEARTBEAT_KEY_DYN_VIEW: {
        if (kv->valueLen <= 0 || NULL == kv->value) {
          tscError("invalid dyn view info, len:%d, value:%p", kv->valueLen, kv->value);
          break;
        }
        if (TSDB_CODE_SUCCESS != hbProcessDynViewRsp(kv->value, kv->valueLen, pCatalog)) {
          tscError("Process dyn view response failed, len:%d, value:%p", kv->valueLen, kv->value);
        }
        break;
      }
      case HEARTBEAT_KEY_VIEWINFO: {
        if (kv->valueLen <= 0 || NULL == kv->value) {
          tscError("invalid view info, len:%d, value:%p", kv->valueLen, kv->value);
          break;
        }
        if (TSDB_CODE_SUCCESS != hbProcessViewInfoRsp(kv->value, kv->valueLen, pCatalog)) {
          tscError("Process view info response failed, len:%d, value:%p", kv->valueLen, kv->value);
        }
        break;
      }
#endif
      case HEARTBEAT_KEY_TSMA: {
        if (kv->valueLen <= 0 || !kv->value) {
          tscError("Invalid tsma info, len:%d, value:%p", kv->valueLen, kv->value);
          // Skip the handler, like every other key does for an invalid payload.
          break;
        }
        if (TSDB_CODE_SUCCESS != hbprocessTSMARsp(kv->value, kv->valueLen, pCatalog)) {
          tscError("Process tsma info response failed, len:%d, value:%p", kv->valueLen, kv->value);
        }
        break;
      }
      default:
        tscError("invalid hb key type:%d", kv->key);
        break;
    }
  }
}
20,720,710✔
608

609
static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
27,322,191✔
610
  SClientHbReq *pReq = taosHashAcquire(pAppHbMgr->activeInfo, &pRsp->connKey, sizeof(SClientHbKey));
27,322,191✔
611
  if (NULL == pReq) {
27,322,191✔
612
    tscWarn("pReq to get activeInfo, may be dropped, refId:%" PRIx64 ", type:%d", pRsp->connKey.tscRid,
102,036✔
613
            pRsp->connKey.connType);
614
    return TSDB_CODE_SUCCESS;
102,036✔
615
  }
616

617
  if (pRsp->query) {
27,220,155✔
618
    STscObj *pTscObj = (STscObj *)acquireTscObj(pRsp->connKey.tscRid);
27,220,155✔
619
    if (NULL == pTscObj) {
27,220,155✔
620
      tscDebug("tscObj rid %" PRIx64 " not exist", pRsp->connKey.tscRid);
15,009✔
621
    } else {
622
      if (pRsp->query->totalDnodes > 1) {
27,205,146✔
623
        SEpSet  originEpset = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
6,624,750✔
624
        if (!isEpsetEqual(&originEpset, &pRsp->query->epSet)) {
6,624,750✔
625
          SEpSet *pOrig = &originEpset;
31,189✔
626
          SEp    *pOrigEp = &pOrig->eps[pOrig->inUse];
31,189✔
627
          SEp    *pNewEp = &pRsp->query->epSet.eps[pRsp->query->epSet.inUse];
31,189✔
628
          tscDebug("mnode epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d in hb", pOrig->inUse, pOrig->numOfEps,
31,189✔
629
                   pOrigEp->fqdn, pOrigEp->port, pRsp->query->epSet.inUse, pRsp->query->epSet.numOfEps, pNewEp->fqdn,
630
                   pNewEp->port);
631

632
          updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &pRsp->query->epSet);
31,189✔
633
        }
634
      }
635

636
      pTscObj->pAppInfo->totalDnodes = pRsp->query->totalDnodes;
27,205,146✔
637
      pTscObj->pAppInfo->onlineDnodes = pRsp->query->onlineDnodes;
27,205,146✔
638
      pTscObj->connId = pRsp->query->connId;
27,205,146✔
639
      tscTrace("connId:%u, hb rsp, dnodes %d/%d", pTscObj->connId, pTscObj->pAppInfo->onlineDnodes,
27,205,146✔
640
               pTscObj->pAppInfo->totalDnodes);
641

642
      if (pRsp->query->killRid) {
27,205,146✔
643
        tscDebug("QID:0x%" PRIx64 ", need to be killed now", pRsp->query->killRid);
21✔
644
        SRequestObj *pRequest = acquireRequest(pRsp->query->killRid);
21✔
645
        if (NULL == pRequest) {
21✔
646
          tscDebug("QID:0x%" PRIx64 ", not exist to kill", pRsp->query->killRid);
×
647
        } else {
648
          taos_stop_query((TAOS_RES *)pRequest);
21✔
649
          (void)releaseRequest(pRsp->query->killRid);
21✔
650
        }
651
      }
652

653
      if (pRsp->query->killConnection) {
27,205,146✔
654
        taos_close_internal(pTscObj);
×
655
      }
656

657
      if (pRsp->query->pQnodeList) {
27,205,146✔
658
        if (TSDB_CODE_SUCCESS != updateQnodeList(pTscObj->pAppInfo, pRsp->query->pQnodeList)) {
232,368✔
659
          tscWarn("update qnode list failed");
×
660
        }
661
      }
662

663
      releaseTscObj(pRsp->connKey.tscRid);
27,205,146✔
664
    }
665
  }
666

667
  int32_t kvNum = pRsp->info ? taosArrayGetSize(pRsp->info) : 0;
27,220,155✔
668

669
  tscDebug("hb got %d rsp kv", kvNum);
27,220,155✔
670

671
  if (kvNum > 0) {
27,220,155✔
672
    struct SCatalog *pCatalog = NULL;
20,720,710✔
673
    int32_t          code = catalogGetHandle(pReq->clusterId, &pCatalog);
20,720,710✔
674
    if (code != TSDB_CODE_SUCCESS) {
20,720,710✔
675
      tscWarn("catalogGetHandle failed, clusterId:0x%" PRIx64 ", error:%s", pReq->clusterId, tstrerror(code));
×
676
    } else {
677
      hbProcessQueryRspKvs(kvNum, pRsp->info, pCatalog, pAppHbMgr);
20,720,710✔
678
    }
679
  }
680

681
  taosHashRelease(pAppHbMgr->activeInfo, pReq);
27,220,155✔
682

683
  return TSDB_CODE_SUCCESS;
27,220,155✔
684
}
685

686
/* Choose execution phase and phase start time for desc.
687
 * Rules:
688
 *  - If both timestamps are invalid (<= 0), keep the request's stored values.
689
 *  - If only one timestamp is valid, use the valid one.
690
 *  - If both are valid, prefer the job's phase/time when jobPhaseTime >= request's phaseStartTime;
691
 *    otherwise prefer the request's record.
692
 *
693
 * Note: If the local system clock was moved backwards (time rollback), jobPhaseTime may appear
694
 * earlier than the request's phase start time even when the job record is actually newer.
695
 * This logic avoids reporting a regressed start time in that situation.
696
 */
697
static void hbSetRequestPhase(SRequestObj *pRequest, SQueryDesc *desc) {
15,912,937✔
698
  int32_t jobPhase = QUERY_PHASE_NONE;
15,912,937✔
699
  int64_t jobPhaseTime = 0;
15,912,937✔
700
  int32_t phaseCode = schedulerGetJobPhase(pRequest->body.queryJob, &jobPhase, &jobPhaseTime);
15,912,937✔
701
  if (phaseCode != TSDB_CODE_SUCCESS) {
15,912,937✔
702
    tscWarn("get job phase failed, code:%d", phaseCode);
5,371,172✔
703
    desc->execPhase = CLIENT_GET_REQUEST_PHASE(pRequest);
5,371,172✔
704
    desc->phaseStartTime = CLIENT_GET_REQUEST_PHASE_START_TIME(pRequest);
5,371,172✔
705
    return;
5,371,172✔
706
  }
707

708
  tscDebug("get job phase success, jobPhase:%d, jobPhaseTime:%" PRId64, jobPhase, jobPhaseTime);
10,541,765✔
709
  int64_t phaseStartTime = CLIENT_GET_REQUEST_PHASE_START_TIME(pRequest);
10,541,765✔
710
  int32_t phaseStatus = CLIENT_GET_REQUEST_PHASE(pRequest);
10,541,765✔
711

712
  if (jobPhaseTime <= 0 && phaseStartTime <= 0) {
10,541,765✔
713
    /* No valid time available, keep original behavior */
714
    desc->phaseStartTime = phaseStartTime;
×
715
    desc->execPhase = phaseStatus;
×
716
  } else if (jobPhaseTime <= 0) {
10,541,765✔
717
    desc->phaseStartTime = phaseStartTime;
2,010✔
718
    desc->execPhase = phaseStatus;
2,010✔
719
  } else if (phaseStartTime <= 0) {
10,539,755✔
720
    desc->phaseStartTime = jobPhaseTime;
17,759✔
721
    desc->execPhase = jobPhase;
17,759✔
722
  } else if (jobPhaseTime >= phaseStartTime) {
10,521,996✔
723
    /* Job record is newer (or equal, prefer job) */
724
    desc->phaseStartTime = jobPhaseTime;
8,909,370✔
725
    desc->execPhase = jobPhase;
8,909,370✔
726
  } else {
727
    /* Request record is newer */
728
    desc->phaseStartTime = phaseStartTime;
1,612,626✔
729
    desc->execPhase = phaseStatus;
1,612,626✔
730
  }
731
}
732

733
/*
 * Callback invoked with the response to a heartbeat batch request.
 *
 * param : points to the int32_t index of the target SAppHbMgr in clientHbMgr.appHbMgrs
 * pMsg  : response buffer; pMsg->pData and pMsg->pEpSet are freed here on every path
 * code  : status handed in by the caller
 *
 * Returns the incoming code when nothing else fails, otherwise the first error hit
 * (deserialization error, TSDB_CODE_TIME_UNSYNCED, TSDB_CODE_OUT_OF_RANGE when the
 * manager slot is empty, or the error returned by a response handler).
 */
static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) {
  static int32_t    emptyRspNum = 0;
  SClientHbBatchRsp pRsp = {0};
  SAppHbMgr        *pAppHbMgr = NULL;
  SAppInstInfo     *pInst = NULL;
  int32_t           idx = 0;
  int32_t           rspNum = 0;
  bool              locked = false;

  if (0 == atomic_load_8(&clientHbMgr.inited)) {
    goto _return;
  }

  idx = *(int32_t *)param;
  if (TSDB_CODE_SUCCESS == code) {
    code = tDeserializeSClientHbBatchRsp(pMsg->pData, pMsg->len, &pRsp);
    if (TSDB_CODE_SUCCESS != code) {
      tscError("deserialize hb rsp failed");
    } else {
      /* Only a fully decoded response carries a trustworthy server timestamp. Running the
       * skew check after a failed decode would replace the real error with TIME_UNSYNCED. */
      int32_t now = taosGetTimestampSec();
      int32_t delta = abs(now - pRsp.svrTimestamp);
      if (delta > tsTimestampDeltaLimit) {
        code = TSDB_CODE_TIME_UNSYNCED;
        tscError("time diff:%ds is too big", delta);
      }
    }
  }

  rspNum = taosArrayGetSize(pRsp.rsps);

  (void)taosThreadMutexLock(&clientHbMgr.lock);
  locked = true;

  pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, idx);
  if (pAppHbMgr == NULL) {
    tscError("appHbMgr not exist, idx:%d", idx);
    code = TSDB_CODE_OUT_OF_RANGE;
    goto _return;
  }

  pInst = pAppHbMgr->pAppInstInfo;

  if (code != 0) {
    /* Heartbeat failed: mark no dnode online (or -1 when the total is still 0). */
    pInst->onlineDnodes = pInst->totalDnodes ? 0 : -1;
    tscDebug("hb rsp error %s, update server status %d/%d", tstrerror(code), pInst->onlineDnodes, pInst->totalDnodes);
    goto _return;
  }

  /* Copy the server-side settings carried by the response into the app instance. */
  pInst->serverCfg.monitorParas = pRsp.monitorParas;
  pInst->serverCfg.enableAuditDelete = pRsp.enableAuditDelete;
  pInst->serverCfg.enableAuditSelect = pRsp.enableAuditSelect;
  pInst->serverCfg.enableAuditInsert = pRsp.enableAuditInsert;
  pInst->serverCfg.auditLevel = pRsp.auditLevel;
  pInst->serverCfg.enableStrongPass = pRsp.enableStrongPass;
  tsEnableStrongPassword = pInst->serverCfg.enableStrongPass;
  tscDebug("monitor paras from hb, clusterId:0x%" PRIx64 ", threshold:%d scope:%d", pInst->clusterId,
           pRsp.monitorParas.tsSlowLogThreshold, pRsp.monitorParas.tsSlowLogScope);

  if (rspNum) {
    tscDebug("hb got %d rsp, %d empty rsp received before", rspNum,
             atomic_val_compare_exchange_32(&emptyRspNum, emptyRspNum, 0));
  } else {
    (void)atomic_add_fetch_32(&emptyRspNum, 1);
  }

  /* Dispatch each per-connection response; stop at the first handler error. */
  for (int32_t i = 0; i < rspNum; ++i) {
    SClientHbRsp *rsp = taosArrayGet(pRsp.rsps, i);
    code = (*clientHbMgr.rspHandle[rsp->connKey.connType])(pAppHbMgr, rsp);
    if (code) {
      break;
    }
  }

_return:
  if (locked) {
    (void)taosThreadMutexUnlock(&clientHbMgr.lock);
  }
  /* pRsp is zero-initialized, so freeing it is valid even when nothing was decoded. */
  tFreeClientHbBatchRsp(&pRsp);
  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);
  return code;
}
814

815
/*
 * Append one SQueryDesc to hbBasic->queryDesc for every request of pObj that is not
 * killed and has a query job.
 *
 * hbBasic : destination; hbBasic->queryDesc must already be an initialized array
 * pObj    : connection object whose pRequests hash is walked
 *
 * Returns 0 on success, TSDB_CODE_FAILED when the FQDN cannot be obtained, or the
 * terrno value recorded when an array allocation/push fails. Descriptors pushed before
 * a failure remain in hbBasic->queryDesc; the caller owns them.
 */
int32_t hbBuildQueryDesc(SQueryHbReqBasic *hbBasic, STscObj *pObj) {
  int64_t    now = taosGetTimestampUs();
  SQueryDesc desc = {0};
  int32_t    code = 0;
  void      *pIter = NULL;

  for (pIter = taosHashIterate(pObj->pRequests, NULL); pIter != NULL;
       pIter = taosHashIterate(pObj->pRequests, pIter)) {
    int64_t     *rid = pIter;
    SRequestObj *pRequest = acquireRequest(*rid);
    if (NULL == pRequest) {
      continue;
    }

    if (pRequest->killed || 0 == pRequest->body.queryJob) {
      (void)releaseRequest(*rid);
      continue;
    }

    tstrncpy(desc.sql, pRequest->sqlstr, sizeof(desc.sql));
    desc.stime = pRequest->metric.start / 1000;
    desc.queryId = pRequest->requestId;
    desc.useconds = now - pRequest->metric.start;
    desc.reqRid = pRequest->self;
    desc.stableQuery = pRequest->stableQuery;
    desc.isSubQuery = pRequest->isSubReq;
    code = taosGetFqdn(desc.fqdn);
    if (TSDB_CODE_SUCCESS != code) {
      (void)releaseRequest(*rid);
      tscError("get fqdn failed");
      code = TSDB_CODE_FAILED;
      goto _abort;
    }
    desc.subPlanNum = pRequest->body.subplanNum;

    /* Determine and set the execution phase and its start time for this request. */
    hbSetRequestPhase(pRequest, &desc);

    if (desc.subPlanNum) {
      desc.subDesc = taosArrayInit(desc.subPlanNum, sizeof(SQuerySubDesc));
      if (NULL == desc.subDesc) {
        code = terrno; /* read before any cleanup call can change it */
        (void)releaseRequest(*rid);
        goto _abort;
      }

      code = schedulerGetTasksStatus(pRequest->body.queryJob, desc.subDesc);
      if (code) {
        /* Task status is optional: report the query without sub-task details. */
        taosArrayDestroy(desc.subDesc);
        desc.subDesc = NULL;
        code = TSDB_CODE_SUCCESS;
      }
      desc.subPlanNum = taosArrayGetSize(desc.subDesc);
    } else {
      desc.subDesc = NULL;
    }

    (void)releaseRequest(*rid);
    if (NULL == taosArrayPush(hbBasic->queryDesc, &desc)) {
      code = terrno;
      taosArrayDestroy(desc.subDesc); /* not pushed, so still owned here */
      goto _abort;
    }
  }

  return code;

_abort:
  /* Leaving the loop early: release the iterator's hold on the current hash entry. */
  taosHashCancelIterate(pObj->pRequests, pIter);
  return code;
}
882

883
/*
 * Build the SQueryHbReqBasic for one connection and store it in req->query.
 *
 * connKey : identifies the connection (connKey->tscRid is acquired for the duration)
 * req     : heartbeat request; req->query is set only on success
 *
 * Returns TSDB_CODE_SUCCESS, the terrno value seen when the connection object cannot be
 * acquired or an allocation fails, or the error from hbBuildQueryDesc(). On failure
 * everything allocated here is released and req->query is left untouched.
 */
int32_t hbGetQueryBasicInfo(SClientHbKey *connKey, SClientHbReq *req) {
  STscObj *pTscObj = (STscObj *)acquireTscObj(connKey->tscRid);
  if (NULL == pTscObj) {
    tscWarn("tscObj rid 0x%" PRIx64 " not exist", connKey->tscRid);
    return terrno;
  }

  int32_t           code = TSDB_CODE_SUCCESS;
  int32_t           numOfQueries = 0;
  SQueryHbReqBasic *hbBasic = (SQueryHbReqBasic *)taosMemoryCalloc(1, sizeof(SQueryHbReqBasic));
  if (NULL == hbBasic) {
    code = terrno; /* capture before logging/cleanup can overwrite it */
    tscError("calloc %d failed", (int32_t)sizeof(SQueryHbReqBasic));
    goto _return;
  }

  hbBasic->connId = pTscObj->connId;

  numOfQueries = pTscObj->pRequests ? taosHashGetSize(pTscObj->pRequests) : 0;
  if (numOfQueries <= 0) {
    /* Nothing running: hand over the basic info with no query descriptors. */
    tscDebug("no queries on connection");
    req->query = hbBasic;
    hbBasic = NULL;
    goto _return;
  }

  hbBasic->queryDesc = taosArrayInit(numOfQueries, sizeof(SQueryDesc));
  if (NULL == hbBasic->queryDesc) {
    code = terrno;
    tscWarn("taosArrayInit %d queryDesc failed", numOfQueries);
    goto _return;
  }

  code = hbBuildQueryDesc(hbBasic, pTscObj);
  if (code) {
    goto _return;
  }

  req->query = hbBasic;
  hbBasic = NULL; /* ownership moved to req */

_return:
  if (hbBasic != NULL) {
    /* Failure path: drop whatever was built so far. */
    if (hbBasic->queryDesc) {
      taosArrayDestroyEx(hbBasic->queryDesc, tFreeClientHbQueryDesc);
    }
    taosMemoryFree(hbBasic);
  }
  releaseTscObj(connKey->tscRid);
  return code;
}
930

931
/*
 * Make sure the heartbeat request asks the server for the auth info of this
 * connection's user by storing version -1 (network byte order) under
 * HEARTBEAT_KEY_USER_AUTHINFO in req->info.
 *
 * Three cases:
 *   - key present and user listed   : overwrite that user's version
 *   - key present, user not listed  : grow the value array by one entry
 *   - key absent                    : create a one-entry value and insert the key
 *
 * connKey : identifies the connection (connKey->tscRid is acquired for the duration)
 * param   : unused in this function
 * req     : heartbeat request whose info hash is read/updated (created if missing)
 *
 * Returns 0 on success or the error recorded for the failing allocation/insert.
 */
static int32_t hbGetUserAuthInfo(SClientHbKey *connKey, SHbParam *param, SClientHbReq *req) {
  STscObj *pTscObj = (STscObj *)acquireTscObj(connKey->tscRid);
  if (!pTscObj) {
    tscWarn("tscObj rid 0x%" PRIx64 " not exist", connKey->tscRid);
    return terrno;
  }

  int32_t           code = 0;
  SUserAuthVersion *user = NULL; /* owned here until it is stored in req->info */

  SKv  kv = {.key = HEARTBEAT_KEY_USER_AUTHINFO};
  SKv *pKv = NULL;
  if ((pKv = taosHashGet(req->info, &kv.key, sizeof(kv.key)))) {
    int32_t           userNum = pKv->valueLen / sizeof(SUserAuthVersion);
    SUserAuthVersion *userAuths = (SUserAuthVersion *)pKv->value;
    for (int32_t i = 0; i < userNum; ++i) {
      SUserAuthVersion *pUserAuth = userAuths + i;
      // both key and user exist, update version
      if (strncmp(pUserAuth->user, pTscObj->user, TSDB_USER_LEN) == 0) {
        pUserAuth->version = htonl(-1);  // force get userAuthInfo
        goto _return;
      }
    }
    // key exists, user not exist, append user
    SUserAuthVersion *qUserAuth =
        (SUserAuthVersion *)taosMemoryRealloc(pKv->value, (userNum + 1) * sizeof(SUserAuthVersion));
    if (qUserAuth) {
      tstrncpy((qUserAuth + userNum)->user, pTscObj->user, TSDB_USER_LEN);
      (qUserAuth + userNum)->version = htonl(-1);  // force get userAuthInfo
      pKv->value = qUserAuth;
      pKv->valueLen += sizeof(SUserAuthVersion);
    } else {
      code = terrno; /* pKv->value is left as it was */
    }
    goto _return;
  }

  // key/user not exist, add user
  user = taosMemoryMalloc(sizeof(SUserAuthVersion));
  if (!user) {
    code = terrno;
    goto _return;
  }
  tstrncpy(user->user, pTscObj->user, TSDB_USER_LEN);
  user->version = htonl(-1);  // force get userAuthInfo
  kv.valueLen = sizeof(SUserAuthVersion);
  kv.value = user;

  tscDebug("hb got user auth info, valueLen:%d, user:%s, authVer:%d, tscRid:%" PRIi64, kv.valueLen, user->user,
           pTscObj->authVer, connKey->tscRid);

  if (!req->info) {
    req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
    if (NULL == req->info) {
      code = terrno;
      goto _return; /* user is released below; previously it leaked on this path */
    }
  }

  if (taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)) != 0) {
    code = terrno ? terrno : TSDB_CODE_APP_ERROR;
    goto _return;
  }
  user = NULL; /* now referenced by req->info */

_return:
  if (user != NULL) {
    taosMemoryFree(user);
  }
  releaseTscObj(connKey->tscRid);
  if (code) {
    tscError("hb got user auth info failed since %s", tstrerror(code));
  }

  return code;
}
1003

1004
/*
 * Fetch the expired user-auth versions from the catalog and attach them to the
 * heartbeat request under HEARTBEAT_KEY_USER_AUTHINFO.
 *
 * connKey  : not used by this function
 * pCatalog : catalog queried through catalogGetExpiredUsers()
 * req      : heartbeat request; req->info is created when it does not exist yet
 *
 * Returns TSDB_CODE_SUCCESS (also when nothing is expired) or the error of the failing
 * call. The array returned by the catalog is freed here unless it was stored in req->info.
 */
int32_t hbGetExpiredUserInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) {
  uint32_t          userNum = 0;
  SUserAuthVersion *expired = NULL;

  int32_t rc = catalogGetExpiredUsers(pCatalog, &expired, &userNum);
  if (rc != TSDB_CODE_SUCCESS) {
    return rc;
  }

  if (0 == userNum) {
    taosMemoryFree(expired);
    return TSDB_CODE_SUCCESS;
  }

  /* Versions are converted with htonl() before being placed in the request. */
  for (SUserAuthVersion *it = expired, *end = expired + userNum; it != end; ++it) {
    it->version = htonl(it->version);
  }

  SKv kv = {0};
  kv.key = HEARTBEAT_KEY_USER_AUTHINFO;
  kv.valueLen = sizeof(SUserAuthVersion) * userNum;
  kv.value = expired;

  tscDebug("hb got %d expired users, valueLen:%d", userNum, kv.valueLen);

  if (NULL == req->info) {
    req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
    if (NULL == req->info) {
      taosMemoryFree(expired);
      return terrno;
    }
  }

  rc = taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv));
  if (rc != TSDB_CODE_SUCCESS) {
    /* Not stored, so the array is still ours to free. */
    taosMemoryFree(expired);
  }
  return rc;
}
1048

1049
/*
 * Fetch the expired DB cache entries from the catalog and attach them to the
 * heartbeat request under HEARTBEAT_KEY_DBINFO.
 *
 * connKey  : not used by this function
 * pCatalog : catalog queried through catalogGetExpiredDBs()
 * req      : heartbeat request; req->info is created when it does not exist yet
 *
 * Returns TSDB_CODE_SUCCESS (also when nothing is expired) or the error of the failing
 * call. The array returned by the catalog is freed here unless it was stored in req->info.
 */
int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) {
  uint32_t      dbNum = 0;
  SDbCacheInfo *expired = NULL;

  int32_t rc = catalogGetExpiredDBs(pCatalog, &expired, &dbNum);
  if (rc != TSDB_CODE_SUCCESS) {
    return rc;
  }

  if (0 == dbNum) {
    taosMemoryFree(expired);
    return TSDB_CODE_SUCCESS;
  }

  int32_t i = 0;
  while (i < dbNum) {
    SDbCacheInfo *db = expired + i;
    tscDebug("the %dth expired db:%s, dbId:%" PRId64
             ", vgVersion:%d, cfgVersion:%d, numOfTable:%d, startTs:%" PRId64,
             i, db->dbFName, db->dbId, db->vgVersion, db->cfgVersion, db->numOfTable, db->stateTs);

    /* Logged above in host order; converted with htobe64()/htonl() for the request. */
    db->dbId = htobe64(db->dbId);
    db->vgVersion = htonl(db->vgVersion);
    db->cfgVersion = htonl(db->cfgVersion);
    db->numOfTable = htonl(db->numOfTable);
    db->stateTs = htobe64(db->stateTs);
    db->tsmaVersion = htonl(db->tsmaVersion);
    ++i;
  }

  SKv kv = {0};
  kv.key = HEARTBEAT_KEY_DBINFO;
  kv.valueLen = sizeof(SDbCacheInfo) * dbNum;
  kv.value = expired;

  tscDebug("hb got %d expired db, valueLen:%d", dbNum, kv.valueLen);

  if (NULL == req->info) {
    req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
    if (NULL == req->info) {
      taosMemoryFree(expired);
      return terrno;
    }
  }

  rc = taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv));
  if (rc != TSDB_CODE_SUCCESS) {
    /* Not stored, so the array is still ours to free. */
    taosMemoryFree(expired);
  }
  return rc;
}
1102

1103
/*
 * Fetch the expired super-table versions from the catalog and attach them to the
 * heartbeat request under HEARTBEAT_KEY_STBINFO.
 *
 * connKey  : not used by this function
 * pCatalog : catalog queried through catalogGetExpiredSTables()
 * req      : heartbeat request; req->info is created when it does not exist yet
 *
 * Returns TSDB_CODE_SUCCESS (also when nothing is expired) or the error of the failing
 * call. The array returned by the catalog is freed here unless it was stored in req->info.
 */
int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) {
  uint32_t        stbNum = 0;
  SSTableVersion *expired = NULL;

  int32_t rc = catalogGetExpiredSTables(pCatalog, &expired, &stbNum);
  if (rc != TSDB_CODE_SUCCESS) {
    return rc;
  }

  if (0 == stbNum) {
    taosMemoryFree(expired);
    return TSDB_CODE_SUCCESS;
  }

  /* Every numeric field is converted with htobe64()/htonl() before it is sent. */
  for (SSTableVersion *it = expired, *end = expired + stbNum; it != end; ++it) {
    it->suid = htobe64(it->suid);
    it->sversion = htonl(it->sversion);
    it->tversion = htonl(it->tversion);
    it->smaVer = htonl(it->smaVer);
  }

  SKv kv = {0};
  kv.key = HEARTBEAT_KEY_STBINFO;
  kv.valueLen = sizeof(SSTableVersion) * stbNum;
  kv.value = expired;

  tscDebug("hb got %d expired stb, valueLen:%d", stbNum, kv.valueLen);

  if (NULL == req->info) {
    req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
    if (NULL == req->info) {
      taosMemoryFree(expired);
      return terrno;
    }
  }

  rc = taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv));
  if (rc != TSDB_CODE_SUCCESS) {
    /* Not stored, so the array is still ours to free. */
    taosMemoryFree(expired);
  }
  return rc;
}
1150

1151
/*
 * Fetch the expired view versions (plus the dynamic-view version record) from the
 * catalog and attach them to the heartbeat request under HEARTBEAT_KEY_DYN_VIEW and
 * HEARTBEAT_KEY_VIEWINFO.
 *
 * connKey  : not used by this function
 * pCatalog : catalog queried through catalogGetExpiredViews()
 * req      : heartbeat request; req->info is created when it does not exist yet
 *
 * Returns TSDB_CODE_SUCCESS (also when no view is expired) or the error of the failing
 * call. Buffers that were not stored in req->info are freed before returning.
 */
int32_t hbGetExpiredViewInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) {
  SViewVersion    *views = NULL;
  uint32_t         viewNum = 0;
  int32_t          code = 0;
  SDynViewVersion *pDynViewVer = NULL;
  SKv              kv = {0};

  TSC_ERR_JRET(catalogGetExpiredViews(pCatalog, &views, &viewNum, &pDynViewVer));

  if (viewNum <= 0) {
    taosMemoryFree(views);
    taosMemoryFree(pDynViewVer);
    return TSDB_CODE_SUCCESS;
  }

  /* Ids and versions are converted with htobe64()/htonl() before being sent. */
  for (int32_t i = 0; i < viewNum; ++i) {
    SViewVersion *view = &views[i];
    view->dbId = htobe64(view->dbId);
    view->viewId = htobe64(view->viewId);
    view->version = htonl(view->version);
  }

  tscDebug("hb got %u expired view, valueLen:%lu", viewNum, sizeof(SViewVersion) * viewNum);

  if (NULL == req->info) {
    req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
    if (NULL == req->info) {
      TSC_ERR_JRET(terrno);
    }
  }

  kv.key = HEARTBEAT_KEY_DYN_VIEW;
  kv.valueLen = sizeof(SDynViewVersion);
  kv.value = pDynViewVer;

  TSC_ERR_JRET(taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)));
  /* req->info now references this buffer. Drop our pointer so the error path below
   * cannot free memory that the request still points to. */
  pDynViewVer = NULL;

  kv.key = HEARTBEAT_KEY_VIEWINFO;
  kv.valueLen = sizeof(SViewVersion) * viewNum;
  kv.value = views;

  TSC_ERR_JRET(taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)));
  return TSDB_CODE_SUCCESS;

_return:
  taosMemoryFree(views);
  taosMemoryFree(pDynViewVer);
  return code;
}
1200

1201
/*
 * Fetch the expired TSMA versions from the catalog and attach them to the heartbeat
 * request under HEARTBEAT_KEY_TSMA.
 *
 * connKey  : not used by this function
 * pCatalog : catalog queried through catalogGetExpiredTsmas()
 * pReq     : heartbeat request; pReq->info is created when it does not exist yet
 *
 * Returns TSDB_CODE_SUCCESS (also when nothing is expired) or the error of the failing
 * call. The array returned by the catalog is freed here unless it was stored in pReq->info.
 */
int32_t hbGetExpiredTSMAInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *pReq) {
  STSMAVersion *expired = NULL;
  uint32_t      tsmaNum = 0;

  int32_t rc = catalogGetExpiredTsmas(pCatalog, &expired, &tsmaNum);
  if (rc || 0 == tsmaNum) {
    /* Either the lookup failed (rc is the error) or there is nothing to report (rc is 0). */
    taosMemoryFree(expired);
    return rc;
  }

  /* Ids and versions are converted with htobe64()/htonl() before being sent. */
  for (STSMAVersion *it = expired, *end = expired + tsmaNum; it != end; ++it) {
    it->dbId = htobe64(it->dbId);
    it->tsmaId = htobe64(it->tsmaId);
    it->version = htonl(it->version);
  }

  tscDebug("hb got %d expred tsmas, valueLen:%lu", tsmaNum, sizeof(STSMAVersion) * tsmaNum);

  if (NULL == pReq->info) {
    pReq->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
    if (NULL == pReq->info) {
      taosMemoryFree(expired);
      return terrno;
    }
  }

  SKv kv = {0};
  kv.key = HEARTBEAT_KEY_TSMA;
  kv.valueLen = sizeof(STSMAVersion) * tsmaNum;
  kv.value = expired;

  rc = taosHashPut(pReq->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv));
  if (rc != TSDB_CODE_SUCCESS) {
    /* Not stored, so the array is still ours to free. */
    taosMemoryFree(expired);
  }
  return rc;
}
1242

1243
/*
 * Fill req->app for the given cluster.
 *
 * When clientHbMgr.appSummary has an entry for clusterId, that entry is copied as-is.
 * Otherwise the summary is zeroed and pid, appId and the application name are filled in.
 *
 * Returns TSDB_CODE_SUCCESS, or the error from taosGetAppName().
 */
int32_t hbGetAppInfo(int64_t clusterId, SClientHbReq *req) {
  SAppHbReq *cached = taosHashGet(clientHbMgr.appSummary, &clusterId, sizeof(clusterId));

  if (NULL == cached) {
    /* No gathered summary for this cluster: send an empty one with identity fields only. */
    (void)memset(&req->app.summary, 0, sizeof(req->app.summary));
    req->app.pid = taosGetPId();
    req->app.appId = clientHbMgr.appId;
    TSC_ERR_RET(taosGetAppName(req->app.name, NULL));
    return TSDB_CODE_SUCCESS;
  }

  (void)memcpy(&req->app, cached, sizeof(*cached));
  return TSDB_CODE_SUCCESS;
}
1256

1257
/*
 * Request-side heartbeat handler for one connection.
 *
 * Always collects the connection's query info and the app info. For the first
 * connection handled with a given SHbParam (reqCnt == 0) it additionally collects the
 * expired catalog entries (users, DBs, super tables, views when TD_ENTERPRISE is
 * defined, TSMAs) and, unless the manager's connHbFlag is 2, the user auth request.
 *
 * connKey : connection being processed
 * param   : SHbParam shared across the connections of one batch; reqCnt is incremented
 *           on success
 * req     : heartbeat request being filled
 *
 * Returns TSDB_CODE_SUCCESS or the error of the first step that fails.
 */
int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) {
  SHbParam *hbParam = (SHbParam *)param;
  SCatalog *pCatalog = NULL;

  int32_t code = hbGetQueryBasicInfo(connKey, req);
  if (TSDB_CODE_SUCCESS != code) {
    tscWarn("hbGetQueryBasicInfo failed, clusterId:0x%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code));
    return code;
  }

  if (hbParam->reqCnt != 0) {
    /* Catalog info was already gathered for an earlier connection of this batch. */
    code = hbGetAppInfo(hbParam->clusterId, req);
    if (TSDB_CODE_SUCCESS != code) {
      tscWarn("hbGetAppInfo failed, clusterId:0x%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code));
      return code;
    }

    ++hbParam->reqCnt;  // success to get catalog info
    return TSDB_CODE_SUCCESS;
  }

  code = catalogGetHandle(hbParam->clusterId, &pCatalog);
  if (TSDB_CODE_SUCCESS != code) {
    tscWarn("catalogGetHandle failed, clusterId:0x%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code));
    return code;
  }

  code = hbGetAppInfo(hbParam->clusterId, req);
  if (TSDB_CODE_SUCCESS != code) {
    tscWarn("getAppInfo failed, clusterId:0x%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code));
    return code;
  }

  /* Expired users are gathered once per cluster; appHbHash (when it exists) records
   * the clusters already handled. */
  if (NULL == taosHashGet(clientHbMgr.appHbHash, &hbParam->clusterId, sizeof(hbParam->clusterId))) {
    code = hbGetExpiredUserInfo(connKey, pCatalog, req);
    if (TSDB_CODE_SUCCESS != code) {
      tscWarn("hbGetExpiredUserInfo failed, clusterId:0x%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code));
      return code;
    }
    if (clientHbMgr.appHbHash) {
      code = taosHashPut(clientHbMgr.appHbHash, &hbParam->clusterId, sizeof(uint64_t), NULL, 0);
      if (TSDB_CODE_SUCCESS != code) {
        tscWarn("hbQueryHbReqHandle put clusterId failed, clusterId:0x%" PRIx64 ", error:%s", hbParam->clusterId,
                tstrerror(code));
        return code;
      }
    }
  }

  // invoke after hbGetExpiredUserInfo
  if (atomic_load_8(&hbParam->pAppHbMgr->connHbFlag) != 2) {
    code = hbGetUserAuthInfo(connKey, hbParam, req);
    if (TSDB_CODE_SUCCESS != code) {
      tscWarn("hbGetUserAuthInfo failed, clusterId:0x%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code));
      return code;
    }
    atomic_store_8(&hbParam->pAppHbMgr->connHbFlag, 1);
  }

  code = hbGetExpiredDBInfo(connKey, pCatalog, req);
  if (TSDB_CODE_SUCCESS != code) {
    tscWarn("hbGetExpiredDBInfo failed, clusterId:0x%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code));
    return code;
  }

  code = hbGetExpiredStbInfo(connKey, pCatalog, req);
  if (TSDB_CODE_SUCCESS != code) {
    tscWarn("hbGetExpiredStbInfo failed, clusterId:0x%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code));
    return code;
  }

#ifdef TD_ENTERPRISE
  code = hbGetExpiredViewInfo(connKey, pCatalog, req);
  if (TSDB_CODE_SUCCESS != code) {
    tscWarn("hbGetExpiredViewInfo failed, clusterId:0x%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code));
    return code;
  }
#endif
  code = hbGetExpiredTSMAInfo(connKey, pCatalog, req);
  if (TSDB_CODE_SUCCESS != code) {
    tscWarn("hbGetExpiredTSMAInfo failed, clusterId:0x%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code));
    return code;
  }

  ++hbParam->reqCnt;  // success to get catalog info

  return TSDB_CODE_SUCCESS;
}
1343

1344
static FORCE_INLINE void hbMgrInitHandle() {
1345
  // init all handle
1346
  clientHbMgr.reqHandle[CONN_TYPE__QUERY] = hbQueryHbReqHandle;
1,385,582✔
1347
  clientHbMgr.reqHandle[CONN_TYPE__TMQ] = hbQueryHbReqHandle;
1,385,582✔
1348

1349
  clientHbMgr.rspHandle[CONN_TYPE__QUERY] = hbQueryHbRspHandle;
1,385,582✔
1350
  clientHbMgr.rspHandle[CONN_TYPE__TMQ] = hbQueryHbRspHandle;
1,385,582✔
1351
}
1,385,582✔
1352

1353
/*
 * Build the heartbeat batch request for one app heartbeat manager.
 *
 * Walks pAppHbMgr->activeInfo, skips connections that no longer exist or are marked
 * dropped, copies each remaining per-connection request into the batch and lets the
 * registered request handler fill in its details. The largest whitelist version seen
 * across the connections is stored in the batch.
 *
 * pAppHbMgr : manager whose active connections are gathered
 * pBatchReq : out; receives the new batch on success and is set to NULL when the
 *             batch could not be allocated
 *
 * Returns TSDB_CODE_SUCCESS, or the terrno value recorded for a failed allocation.
 * A handler failure for a single connection is logged and does not fail the batch.
 */
int32_t hbGatherAllInfo(SAppHbMgr *pAppHbMgr, SClientHbBatchReq **pBatchReq) {
  SClientHbBatchReq *pBatch = taosMemoryCalloc(1, sizeof(SClientHbBatchReq));
  *pBatchReq = pBatch;
  if (pBatch == NULL) {
    /* Test the allocation itself; the out-parameter pointer is never NULL here. */
    return terrno;
  }

  int32_t connKeyCnt = atomic_load_32(&pAppHbMgr->connKeyCnt);
  pBatch->reqs = taosArrayInit(connKeyCnt, sizeof(SClientHbReq));
  if (!pBatch->reqs) {
    int32_t code = terrno; /* read before the free below can change it */
    tFreeClientHbBatchReq(pBatch);
    *pBatchReq = NULL; /* callers free *pBatchReq on error; do not hand back a dangling pointer */
    return code;
  }

  int64_t  maxIpWhiteVer = 0;
  void    *pIter = NULL;
  SHbParam param = {0};
  while ((pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter))) {
    SClientHbReq *pOneReq = pIter;
    SClientHbKey *connKey = &pOneReq->connKey;
    STscObj      *pTscObj = (STscObj *)acquireTscObj(connKey->tscRid);

    if (!pTscObj || atomic_load_8(&pTscObj->dropped) == 1) {
      if (pTscObj) releaseTscObj(connKey->tscRid);
      continue;
    }

    tstrncpy(pOneReq->userApp, pTscObj->optionInfo.userApp, sizeof(pOneReq->userApp));
    tstrncpy(pOneReq->cInfo, pTscObj->optionInfo.cInfo, sizeof(pOneReq->cInfo));
    pOneReq->userIp = pTscObj->optionInfo.userIp;
    pOneReq->userDualIp = pTscObj->optionInfo.userDualIp;
    tstrncpy(pOneReq->sVer, td_version, TSDB_VERSION_LEN);

    /* From here on pOneReq is the copy stored in the batch array, so the handler
     * fills the batch entry rather than the entry kept in activeInfo. */
    pOneReq = taosArrayPush(pBatch->reqs, pOneReq);
    if (NULL == pOneReq) {
      releaseTscObj(connKey->tscRid);
      continue;
    }

    switch (connKey->connType) {
      case CONN_TYPE__QUERY:
      case CONN_TYPE__TMQ: {
        if (param.clusterId == 0) {
          // init
          param.clusterId = pOneReq->clusterId;
          param.pAppHbMgr = pAppHbMgr;
          param.connHbFlag = atomic_load_8(&pAppHbMgr->connHbFlag);
        }
        break;
      }
      default:
        break;
    }
    if (clientHbMgr.reqHandle[connKey->connType]) {
      int32_t code = (*clientHbMgr.reqHandle[connKey->connType])(connKey, &param, pOneReq);
      if (code) {
        tscWarn("hbGatherAllInfo failed since %s, tscRid:%" PRIi64 ", connType:%" PRIi8, tstrerror(code),
                connKey->tscRid, connKey->connType);
      }
    }

    int64_t ver = atomic_load_64(&pTscObj->whiteListInfo.ver);
    maxIpWhiteVer = TMAX(maxIpWhiteVer, ver);
    releaseTscObj(connKey->tscRid);
  }
  pBatch->ipWhiteListVer = maxIpWhiteVer;

  return TSDB_CODE_SUCCESS;
}
1420

1421
/*
 * Stores 2 into clientHbMgr.threadStop. hbThreadFunc registers this with atexit() on
 * WINDOWS builds when taosCheckCurrentInDll() is true.
 * NOTE(review): 2 presumably means "stopped unexpectedly" as opposed to the requested
 * stop value 1 — confirm against the code that reads threadStop.
 */
void hbThreadFuncUnexpectedStopped(void) {
  atomic_store_8(&clientHbMgr.threadStop, 2);
}
×
1422

1423
/* Add every counter of src onto the matching counter of dst; src is not modified. */
void hbMergeSummary(SAppClusterSummary *dst, SAppClusterSummary *src) {
#define HB_SUM_FIELD(field) (dst->field += src->field)
  HB_SUM_FIELD(numOfInsertsReq);
  HB_SUM_FIELD(numOfInsertRows);
  HB_SUM_FIELD(insertElapsedTime);
  HB_SUM_FIELD(insertBytes);
  HB_SUM_FIELD(fetchBytes);
  HB_SUM_FIELD(queryElapsedTime);
  HB_SUM_FIELD(numOfSlowQueries);
  HB_SUM_FIELD(totalRequests);
  HB_SUM_FIELD(currentRequests);
#undef HB_SUM_FIELD
}
3,524,814✔
1434

1435
int32_t hbGatherAppInfo(void) {
22,278,748✔
1436
  SAppHbReq req = {0};
22,278,748✔
1437
  int32_t   code = TSDB_CODE_SUCCESS;
22,278,748✔
1438
  int       sz = taosArrayGetSize(clientHbMgr.appHbMgrs);
22,278,748✔
1439
  if (sz > 0) {
22,278,748✔
1440
    req.pid = taosGetPId();
22,278,748✔
1441
    req.appId = clientHbMgr.appId;
22,278,748✔
1442
    TSC_ERR_RET(taosGetAppName(req.name, NULL));
22,278,748✔
1443
  }
1444

1445
  taosHashClear(clientHbMgr.appSummary);
22,278,748✔
1446

1447
  for (int32_t i = 0; i < sz; ++i) {
48,102,204✔
1448
    SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i);
25,823,456✔
1449
    if (pAppHbMgr == NULL) continue;
25,823,456✔
1450

1451
    int64_t    clusterId = pAppHbMgr->pAppInstInfo->clusterId;
25,823,456✔
1452
    SAppHbReq *pApp = taosHashGet(clientHbMgr.appSummary, &clusterId, sizeof(clusterId));
25,823,456✔
1453
    if (NULL == pApp) {
25,823,456✔
1454
      (void)memcpy(&req.summary, &pAppHbMgr->pAppInstInfo->summary, sizeof(req.summary));
22,298,642✔
1455
      req.startTime = pAppHbMgr->startTime;
22,298,642✔
1456
      TSC_ERR_RET(taosHashPut(clientHbMgr.appSummary, &clusterId, sizeof(clusterId), &req, sizeof(req)));
22,298,642✔
1457
    } else {
1458
      if (pAppHbMgr->startTime < pApp->startTime) {
3,524,814✔
1459
        pApp->startTime = pAppHbMgr->startTime;
×
1460
      }
1461

1462
      hbMergeSummary(&pApp->summary, &pAppHbMgr->pAppInstInfo->summary);
3,524,814✔
1463
    }
1464
  }
1465

1466
  return TSDB_CODE_SUCCESS;
22,278,748✔
1467
}
1468

1469
static void *hbThreadFunc(void *param) {
1,385,582✔
1470
  setThreadName("hb");
1,385,582✔
1471
#ifdef WINDOWS
1472
  if (taosCheckCurrentInDll()) {
1473
    atexit(hbThreadFuncUnexpectedStopped);
1474
  }
1475
#endif
1476
  while (1) {
21,371,481✔
1477
    if (1 == clientHbMgr.threadStop) {
22,757,063✔
1478
      break;
476,088✔
1479
    }
1480

1481
    if (TSDB_CODE_SUCCESS != taosThreadMutexLock(&clientHbMgr.lock)) {
22,280,975✔
1482
      tscError("taosThreadMutexLock failed");
×
1483
      return NULL;
×
1484
    }
1485

1486
    int sz = taosArrayGetSize(clientHbMgr.appHbMgrs);
22,280,975✔
1487
    if (sz > 0) {
22,280,975✔
1488
      if (TSDB_CODE_SUCCESS != hbGatherAppInfo()) {
22,278,748✔
1489
        tscError("hbGatherAppInfo failed");
×
1490
        return NULL;
×
1491
      }
1492
      if (sz > 1 && !clientHbMgr.appHbHash) {
22,278,748✔
1493
        clientHbMgr.appHbHash = taosHashInit(0, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_NO_LOCK);
51,754✔
1494
        if (NULL == clientHbMgr.appHbHash) {
51,754✔
1495
          tscError("taosHashInit failed");
×
1496
          return NULL;
×
1497
        }
1498
      }
1499
      taosHashClear(clientHbMgr.appHbHash);
22,278,748✔
1500
    }
1501

1502
    for (int i = 0; i < sz; i++) {
48,104,431✔
1503
      SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i);
25,823,456✔
1504
      if (pAppHbMgr == NULL) {
25,823,456✔
1505
        continue;
30,232✔
1506
      }
1507

1508
      int32_t connCnt = atomic_load_32(&pAppHbMgr->connKeyCnt);
25,823,456✔
1509
      if (connCnt == 0) {
25,823,456✔
1510
        continue;
3,060,819✔
1511
      }
1512
      SClientHbBatchReq *pReq = NULL;
22,762,637✔
1513
      int32_t            code = hbGatherAllInfo(pAppHbMgr, &pReq);
22,762,637✔
1514
      if (TSDB_CODE_SUCCESS != code || taosArrayGetP(clientHbMgr.appHbMgrs, i) == NULL) {
22,762,637✔
1515
        terrno = code ? code : TSDB_CODE_OUT_OF_RANGE;
×
1516
        tFreeClientHbBatchReq(pReq);
×
1517
        continue;
×
1518
      }
1519
      int tlen = tSerializeSClientHbBatchReq(NULL, 0, pReq);
22,762,637✔
1520
      if (tlen == -1) {
22,762,637✔
1521
        tFreeClientHbBatchReq(pReq);
×
1522
        break;
×
1523
      }
1524
      void *buf = taosMemoryMalloc(tlen);
22,762,637✔
1525
      if (buf == NULL) {
22,762,637✔
1526
        tFreeClientHbBatchReq(pReq);
×
1527
        // hbClearReqInfo(pAppHbMgr);
1528
        break;
×
1529
      }
1530

1531
      if (tSerializeSClientHbBatchReq(buf, tlen, pReq) == -1) {
22,762,637✔
1532
        tFreeClientHbBatchReq(pReq);
×
1533
        taosMemoryFree(buf);
×
1534
        break;
×
1535
      }
1536
      SMsgSendInfo *pInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
22,762,637✔
1537

1538
      if (pInfo == NULL) {
22,762,637✔
1539
        tFreeClientHbBatchReq(pReq);
×
1540
        // hbClearReqInfo(pAppHbMgr);
1541
        taosMemoryFree(buf);
×
1542
        break;
×
1543
      }
1544
      pInfo->fp = hbAsyncCallBack;
22,762,637✔
1545
      pInfo->msgInfo.pData = buf;
22,762,637✔
1546
      pInfo->msgInfo.len = tlen;
22,762,637✔
1547
      pInfo->msgType = TDMT_MND_HEARTBEAT;
22,762,637✔
1548
      pInfo->param = taosMemoryMalloc(sizeof(int32_t));
22,762,637✔
1549
      if (pInfo->param  == NULL) {
22,762,637✔
1550
        tFreeClientHbBatchReq(pReq);
×
1551
        // hbClearReqInfo(pAppHbMgr);
1552
        taosMemoryFree(buf);
×
1553
        taosMemoryFree(pInfo);
×
1554
        break;
×
1555
      }
1556
      *(int32_t *)pInfo->param = i;
22,762,637✔
1557
      pInfo->paramFreeFp = taosAutoMemoryFree;
22,762,637✔
1558
      pInfo->requestId = generateRequestId();
22,762,637✔
1559
      pInfo->requestObjRefId = 0;
22,762,637✔
1560

1561
      SAppInstInfo *pAppInstInfo = pAppHbMgr->pAppInstInfo;
22,762,637✔
1562
      SEpSet        epSet = getEpSet_s(&pAppInstInfo->mgmtEp);
22,762,637✔
1563
      if (TSDB_CODE_SUCCESS != asyncSendMsgToServer(pAppInstInfo->pTransporter, &epSet, NULL, pInfo)) {
22,762,637✔
1564
        tscWarn("failed to async send msg to server");
×
1565
      }
1566
      tFreeClientHbBatchReq(pReq);
22,762,637✔
1567
      // hbClearReqInfo(pAppHbMgr);
1568
      (void)atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1);
22,762,637✔
1569
    }
1570

1571
    if (TSDB_CODE_SUCCESS != taosThreadMutexUnlock(&clientHbMgr.lock)) {
22,280,975✔
1572
      tscError("taosThreadMutexLock failed");
×
1573
      return NULL;
×
1574
    }
1575
    taosMsleep(HEARTBEAT_INTERVAL);
22,280,975✔
1576
  }
1577
  taosHashCleanup(clientHbMgr.appHbHash);
476,088✔
1578
  return NULL;
476,088✔
1579
}
1580

1581
static int32_t hbCreateThread() {
1,385,582✔
1582
  int32_t      code = TSDB_CODE_SUCCESS;
1,385,582✔
1583
  TdThreadAttr thAttr;
1,370,081✔
1584
  TSC_ERR_JRET(taosThreadAttrInit(&thAttr));
1,385,582✔
1585
  TSC_ERR_JRET(taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE));
1,385,582✔
1586
#ifdef TD_COMPACT_OS
1587
  TSC_ERR_JRET(taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL));
1588
#endif
1589

1590
  if (taosThreadCreate(&clientHbMgr.thread, &thAttr, hbThreadFunc, NULL) != 0) {
1,385,582✔
1591
    terrno = TAOS_SYSTEM_ERROR(ERRNO);
×
1592
    TSC_ERR_RET(terrno);
×
1593
  }
1594
  (void)taosThreadAttrDestroy(&thAttr);
1,385,582✔
1595
_return:
1,385,582✔
1596

1597
  if (code) {
1,385,582✔
1598
    terrno = TAOS_SYSTEM_ERROR(ERRNO);
×
1599
    TSC_ERR_RET(terrno);
×
1600
  }
1601

1602
  return code;
1,385,582✔
1603
}
1604

1605
static void hbStopThread() {
1,387,484✔
1606
  if (0 == atomic_load_8(&clientHbMgr.inited)) {
1,387,484✔
1607
    return;
1,902✔
1608
  }
1609
  if (atomic_val_compare_exchange_8(&clientHbMgr.threadStop, 0, 1)) {
1,385,582✔
1610
    tscDebug("hb thread already stopped");
×
1611
    return;
×
1612
  }
1613

1614
  int32_t code = TSDB_CODE_SUCCESS;
1,385,582✔
1615
  // thread quit mode kill or inner exit from self-thread
1616
  if (clientHbMgr.quitByKill) {
1,385,582✔
1617
    code = taosThreadKill(clientHbMgr.thread, 0);
928,704✔
1618
    if (TSDB_CODE_SUCCESS != code) {
928,704✔
1619
      tscError("taosThreadKill failed since %s", tstrerror(code));
×
1620
    }
1621
  } else {
1622
    code = taosThreadJoin(clientHbMgr.thread, NULL);
456,878✔
1623
    if (TSDB_CODE_SUCCESS != code) {
456,878✔
1624
      tscError("taosThreadJoin failed since %s", tstrerror(code));
×
1625
    }
1626
  }
1627

1628
  tscDebug("hb thread stopped");
1,385,582✔
1629
}
1630

1631
/*
 * Create a heartbeat manager for one application instance and register it in
 * clientHbMgr.appHbMgrs.
 *
 * pAppInstInfo  application instance the manager reports for (stored, not copied)
 * key           identifying string; duplicated with taosStrdup
 * pAppHbMgr     out: the new manager on success, NULL on failure after allocation
 *
 * Returns TSDB_CODE_SUCCESS or an error code. On failure every partially
 * built member (key, activeInfo, the struct itself) is released and
 * *pAppHbMgr is reset to NULL, so the caller never sees a dangling pointer.
 */
int32_t appHbMgrInit(SAppInstInfo *pAppInstInfo, char *key, SAppHbMgr **pAppHbMgr) {
  int32_t code = TSDB_CODE_SUCCESS;
  TSC_ERR_RET(hbMgrInit());

  // Zero-initialised so the failure path can release members unconditionally.
  *pAppHbMgr = taosMemoryCalloc(1, sizeof(SAppHbMgr));
  if (*pAppHbMgr == NULL) {
    TSC_ERR_JRET(terrno);
  }
  // init stat
  (*pAppHbMgr)->startTime = taosGetTimestampMs();
  (*pAppHbMgr)->connKeyCnt = 0;
  (*pAppHbMgr)->connHbFlag = 0;
  (*pAppHbMgr)->reportCnt = 0;
  (*pAppHbMgr)->reportBytes = 0;
  (*pAppHbMgr)->key = taosStrdup(key);
  if ((*pAppHbMgr)->key == NULL) {
    TSC_ERR_JRET(terrno);
  }

  // init app info
  (*pAppHbMgr)->pAppInstInfo = pAppInstInfo;

  // init hash info
  (*pAppHbMgr)->activeInfo = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);

  if ((*pAppHbMgr)->activeInfo == NULL) {
    TSC_ERR_JRET(terrno);
  }

  TSC_ERR_JRET(taosThreadMutexLock(&clientHbMgr.lock));
  if (taosArrayPush(clientHbMgr.appHbMgrs, &(*pAppHbMgr)) == NULL) {
    code = terrno;
    (void)taosThreadMutexUnlock(&clientHbMgr.lock);
    goto _return;
  }
  (*pAppHbMgr)->idx = taosArrayGetSize(clientHbMgr.appHbMgrs) - 1;

  // From here on the manager is referenced by clientHbMgr.appHbMgrs, so it must
  // not be freed any more: an unlock failure is logged instead of taking the
  // failure path, which would leave a dangling pointer in the array.
  code = taosThreadMutexUnlock(&clientHbMgr.lock);
  if (TSDB_CODE_SUCCESS != code) {
    tscError("failed to unlock clientHbMgr, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
  }

  return TSDB_CODE_SUCCESS;

_return:
  if (*pAppHbMgr != NULL) {
    taosHashCleanup((*pAppHbMgr)->activeInfo);
    taosMemoryFree((*pAppHbMgr)->key);
    taosMemoryFree(*pAppHbMgr);
    *pAppHbMgr = NULL;
  }
  return code;
}
1675

1676
/*
 * Release one heartbeat manager: free every SClientHbReq stored in its
 * activeInfo hash, destroy the hash, then free the key string and the
 * manager itself. pTarget must not be used afterwards.
 */
void hbFreeAppHbMgr(SAppHbMgr *pTarget) {
  void *pCur = NULL;
  // Walk the hash and release the resources owned by each stored request.
  for (pCur = taosHashIterate(pTarget->activeInfo, NULL); pCur != NULL;
       pCur = taosHashIterate(pTarget->activeInfo, pCur)) {
    SClientHbReq *pReq = pCur;
    tFreeClientHbReq(pReq);
  }

  taosHashCleanup(pTarget->activeInfo);
  pTarget->activeInfo = NULL;

  taosMemoryFree(pTarget->key);
  taosMemoryFree(pTarget);
}
1,467,667✔
1689

1690
/*
 * Remove *pAppHbMgr from clientHbMgr.appHbMgrs under clientHbMgr.lock.
 *
 * If the manager is found in the array it is freed, *pAppHbMgr is set to
 * NULL and the array slot is overwritten with that NULL (the slot is kept so
 * the indices of the other managers do not change). If it is not found,
 * nothing is freed. Lock/unlock failures are only logged.
 */
void hbRemoveAppHbMrg(SAppHbMgr **pAppHbMgr) {
  int32_t rc = taosThreadMutexLock(&clientHbMgr.lock);
  if (rc != TSDB_CODE_SUCCESS) {
    tscError("failed to lock clientHbMgr, code:%s", tstrerror(TAOS_SYSTEM_ERROR(rc)));
  }

  // Locate the slot that holds this manager, if any.
  const int32_t total = taosArrayGetSize(clientHbMgr.appHbMgrs);
  int32_t       pos = 0;
  while (pos < total) {
    SAppHbMgr *pSlot = taosArrayGetP(clientHbMgr.appHbMgrs, pos);
    if (pSlot == *pAppHbMgr) {
      break;
    }
    ++pos;
  }

  if (pos < total) {
    hbFreeAppHbMgr(*pAppHbMgr);
    *pAppHbMgr = NULL;
    // Copies the NULL pointer value into the slot.
    taosArraySet(clientHbMgr.appHbMgrs, pos, pAppHbMgr);
  }

  rc = taosThreadMutexUnlock(&clientHbMgr.lock);
  if (rc != TSDB_CODE_SUCCESS) {
    tscError("failed to unlock clientHbMgr, code:%s", tstrerror(TAOS_SYSTEM_ERROR(rc)));
  }
}
1,467,667✔
1711

1712
void appHbMgrCleanup(void) {
1,385,582✔
1713
  int sz = taosArrayGetSize(clientHbMgr.appHbMgrs);
1,385,582✔
1714
  for (int i = 0; i < sz; i++) {
2,853,249✔
1715
    SAppHbMgr *pTarget = taosArrayGetP(clientHbMgr.appHbMgrs, i);
1,467,667✔
1716
    if (pTarget == NULL) continue;
1,467,667✔
1717
    hbFreeAppHbMgr(pTarget);
1,467,667✔
1718
  }
1719
}
1,385,582✔
1720

1721
int32_t hbMgrInit() {
1,467,667✔
1722
  // init once
1723
  int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 0, 1);
1,467,667✔
1724
  if (old == 1) return 0;
1,467,667✔
1725

1726
  clientHbMgr.appId = tGenIdPI64();
1,385,582✔
1727
  tscInfo("app initialized, appId:0x%" PRIx64, clientHbMgr.appId);
1,385,582✔
1728

1729
  clientHbMgr.appSummary = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
1,385,582✔
1730
  if (NULL == clientHbMgr.appSummary) {
1,385,582✔
1731
    uError("hbMgrInit:taosHashInit error") return terrno;
×
1732
  }
1733
  clientHbMgr.appHbMgrs = taosArrayInit(0, sizeof(void *));
1,385,582✔
1734
  if (NULL == clientHbMgr.appHbMgrs) {
1,385,582✔
1735
    uError("hbMgrInit:taosArrayInit error") return terrno;
×
1736
  }
1737
  TdThreadMutexAttr attr = {0};
1,385,582✔
1738

1739
  int ret = taosThreadMutexAttrInit(&attr);
1,385,582✔
1740
  if (ret != 0) {
1,385,582✔
1741
    uError("hbMgrInit:taosThreadMutexAttrInit error") return ret;
×
1742
  }
1743

1744
  ret = taosThreadMutexAttrSetType(&attr, PTHREAD_MUTEX_RECURSIVE);
1,385,582✔
1745
  if (ret != 0) {
1,385,582✔
1746
    uError("hbMgrInit:taosThreadMutexAttrSetType error") return ret;
×
1747
  }
1748

1749
  ret = taosThreadMutexInit(&clientHbMgr.lock, &attr);
1,385,582✔
1750
  if (ret != 0) {
1,385,582✔
1751
    uError("hbMgrInit:taosThreadMutexInit error") return ret;
×
1752
  }
1753

1754
  ret = taosThreadMutexAttrDestroy(&attr);
1,385,582✔
1755
  if (ret != 0) {
1,385,582✔
1756
    uError("hbMgrInit:taosThreadMutexAttrDestroy error") return ret;
×
1757
  }
1758

1759
  // init handle funcs
1760
  hbMgrInitHandle();
1761

1762
  // init backgroud thread
1763
  ret = hbCreateThread();
1,385,582✔
1764
  if (ret != 0) {
1,385,582✔
1765
    uError("hbMgrInit:hbCreateThread error") return ret;
×
1766
  }
1767

1768
  return 0;
1,385,582✔
1769
}
1770

1771
void hbMgrCleanUp() {
1,387,484✔
1772
  hbStopThread();
1,387,484✔
1773

1774
  // destroy all appHbMgr
1775
  int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 1, 0);
1,387,484✔
1776
  if (old == 0) return;
1,387,484✔
1777

1778
  int32_t code = taosThreadMutexLock(&clientHbMgr.lock);
1,385,582✔
1779
  if (TSDB_CODE_SUCCESS != code) {
1,385,582✔
1780
    tscError("failed to lock clientHbMgr, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
×
1781
  }
1782
  appHbMgrCleanup();
1,385,582✔
1783
  taosArrayDestroy(clientHbMgr.appHbMgrs);
1,385,582✔
1784
  clientHbMgr.appHbMgrs = NULL;
1,385,582✔
1785
  code = taosThreadMutexUnlock(&clientHbMgr.lock);
1,385,582✔
1786
  if (TSDB_CODE_SUCCESS != code) {
1,385,582✔
1787
    tscError("failed to unlock clientHbMgr, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
×
1788
  }
1789
}
1790

1791
/*
 * Add a heartbeat entry for connKey to pAppHbMgr->activeInfo.
 *
 * If the key is already present nothing changes and 0 is returned. Otherwise
 * a zeroed SClientHbReq carrying the key, cluster id, user and token name is
 * inserted and the manager's connKeyCnt is incremented.
 *
 * Returns 0 on success or the taosHashPut error (via TSC_ERR_RET).
 */
int32_t hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, const char* user, const char* tokenName, int64_t clusterId) {
  // Already registered: nothing to do.
  if (taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)) != NULL) {
    return 0;
  }

  SClientHbReq newReq = {0};
  newReq.clusterId = clusterId;
  newReq.connKey = connKey;
  tstrncpy(newReq.user, user, sizeof(newReq.user));
  tstrncpy(newReq.tokenName, tokenName, sizeof(newReq.tokenName));

  TSC_ERR_RET(taosHashPut(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey), &newReq, sizeof(SClientHbReq)));

  (void)atomic_add_fetch_32(&pAppHbMgr->connKeyCnt, 1);
  return 0;
}
1809

1810
/*
 * Register a connection with the heartbeat manager.
 *
 * Only CONN_TYPE__QUERY and CONN_TYPE__TMQ connections are registered (through
 * hbRegisterConnImpl); any other connection type is ignored and 0 is returned.
 */
int32_t hbRegisterConn(SAppHbMgr *pAppHbMgr, int64_t tscRefId, const char* user, const char* tokenName, int64_t clusterId, int8_t connType) {
  if (connType != CONN_TYPE__QUERY && connType != CONN_TYPE__TMQ) {
    return 0;
  }

  SClientHbKey connKey = {
      .tscRid = tscRefId,
      .connType = connType,
  };
  return hbRegisterConnImpl(pAppHbMgr, connKey, user, tokenName, clusterId);
}
1825

1826
/*
 * Remove the heartbeat entry of connKey from the manager referenced by
 * pTscObj->appHbMgrIdx, under clientHbMgr.lock.
 *
 * When the entry exists its resources are released, it is removed from the
 * activeInfo hash and the manager's connKeyCnt is decremented. A missing
 * manager or entry is silently ignored; lock, unlock and hash-remove failures
 * are only logged.
 */
void hbDeregisterConn(STscObj *pTscObj, SClientHbKey connKey) {
  int32_t rc = taosThreadMutexLock(&clientHbMgr.lock);
  if (rc != TSDB_CODE_SUCCESS) {
    tscError("failed to lock clientHbMgr, code:%s", tstrerror(TAOS_SYSTEM_ERROR(rc)));
  }

  SAppHbMgr    *pMgr = taosArrayGetP(clientHbMgr.appHbMgrs, pTscObj->appHbMgrIdx);
  SClientHbReq *pHeld = NULL;
  if (pMgr != NULL) {
    pHeld = taosHashAcquire(pMgr->activeInfo, &connKey, sizeof(SClientHbKey));
  }

  if (pHeld != NULL) {
    // Free the request's own resources first, then drop it from the hash and
    // release the reference taken by taosHashAcquire.
    tFreeClientHbReq(pHeld);
    rc = taosHashRemove(pMgr->activeInfo, &connKey, sizeof(SClientHbKey));
    if (rc != TSDB_CODE_SUCCESS) {
      tscError("hbDeregisterConn taosHashRemove error, code:%s", tstrerror(TAOS_SYSTEM_ERROR(rc)));
    }
    taosHashRelease(pMgr->activeInfo, pHeld);
    (void)atomic_sub_fetch_32(&pMgr->connKeyCnt, 1);
  }

  rc = taosThreadMutexUnlock(&clientHbMgr.lock);
  if (rc != TSDB_CODE_SUCCESS) {
    tscError("failed to unlock clientHbMgr, code:%s", tstrerror(TAOS_SYSTEM_ERROR(rc)));
  }
}
102,410,432✔
1849

1850
// set heart beat thread quit mode , if quicByKill 1 then kill thread else quit from inner
1851
void taos_set_hb_quit(int8_t quitByKill) { clientHbMgr.quitByKill = quitByKill; }
929,916✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc