• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4875

09 Dec 2025 01:22AM UTC coverage: 64.472% (-0.2%) from 64.623%
#4875

push

travis-ci

guanshengliang
fix: temporarily disable memory leak detection for UDF tests (#33856)

162014 of 251293 relevant lines covered (64.47%)

104318075.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.2
/source/dnode/mnode/impl/src/mndProfile.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndProfile.h"
18
#include "audit.h"
19
#include "crypt.h"
20
#include "mndDb.h"
21
#include "mndDnode.h"
22
#include "mndMnode.h"
23
#include "mndPrivilege.h"
24
#include "mndQnode.h"
25
#include "mndShow.h"
26
#include "mndSma.h"
27
#include "mndStb.h"
28
#include "mndUser.h"
29
#include "mndView.h"
30
#include "tglobal.h"
31
#include "tversion.h"
32

33
typedef struct {
34
  uint32_t id;
35
  int8_t   connType;
36
  char     user[TSDB_USER_LEN];
37
  char     app[TSDB_APP_NAME_LEN];  // app name that invokes taosc
38
  int64_t  appStartTimeMs;          // app start time
39
  int32_t  pid;                     // pid of app that invokes taosc
40
  int8_t   killed;
41
  int64_t  loginTimeMs;
42
  int64_t  lastAccessTimeMs;
43
  uint64_t killId;
44
  int32_t  numOfQueries;
45
  SRWLatch queryLock;
46
  SArray  *pQueries;  // SArray<SQueryDesc>
47
  char     userApp[TSDB_APP_NAME_LEN];
48
  uint32_t userIp;
49
  SIpAddr  userDualIp;
50
  SIpAddr  addr;
51
  char     sVer[TSDB_VERSION_LEN];
52
  char     cInfo[CONNECTOR_INFO_LEN];
53
} SConnObj;
54

55
typedef struct {
56
  int64_t            appId;
57
  SIpAddr            cliAddr;
58
  int32_t            pid;
59
  char               name[TSDB_APP_NAME_LEN];
60
  int64_t            startTime;
61
  SAppClusterSummary summary;
62
  int64_t            lastAccessTimeMs;
63
} SAppObj;
64

65
typedef struct {
66
  int32_t totalDnodes;
67
  int32_t onlineDnodes;
68
  SEpSet  epSet;
69
  SArray *pQnodeList;
70
  int64_t ipWhiteListVer;
71
} SConnPreparedObj;
72

73
#define CACHE_OBJ_KEEP_TIME 3  // s
74

75
static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType, SIpAddr *ip, int32_t pid,
76
                               const char *app, int64_t startTime, const char *sVer);
77
static void      mndFreeConn(SConnObj *pConn);
78
static SConnObj *mndAcquireConn(SMnode *pMnode, uint32_t connId);
79
static void      mndReleaseConn(SMnode *pMnode, SConnObj *pConn, bool extendLifespan);
80
static void     *mndGetNextConn(SMnode *pMnode, SCacheIter *pIter);
81
static void      mndCancelGetNextConn(SMnode *pMnode, void *pIter);
82
static int32_t   mndProcessHeartBeatReq(SRpcMsg *pReq);
83
static int32_t   mndProcessConnectReq(SRpcMsg *pReq);
84
static int32_t   mndProcessKillQueryReq(SRpcMsg *pReq);
85
static int32_t   mndProcessKillConnReq(SRpcMsg *pReq);
86
static int32_t   mndRetrieveConns(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
87
static int32_t   mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
88
static void      mndCancelGetNextQuery(SMnode *pMnode, void *pIter);
89
static void      mndFreeApp(SAppObj *pApp);
90
static int32_t   mndRetrieveApps(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
91
static void      mndCancelGetNextApp(SMnode *pMnode, void *pIter);
92
static int32_t   mndProcessSvrVerReq(SRpcMsg *pReq);
93

94
/**
 * Initialise the profile manager of an mnode.
 *
 * Creates the "conn" and "app" caches and registers the message handlers and
 * the SHOW retrieve / free-iterator handlers implemented in this file.
 *
 * @param pMnode  mnode whose profileMgmt is initialised.
 * @return 0 on success, TSDB_CODE_OUT_OF_MEMORY when a cache cannot be created.
 *         On failure no cache is left allocated in pMnode->profileMgmt.
 */
int32_t mndInitProfile(SMnode *pMnode) {
  int32_t       code = 0;
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

  // in ms
  int32_t checkTime = CACHE_OBJ_KEEP_TIME * 1000;
  pMgmt->connCache = taosCacheInit(TSDB_DATA_TYPE_UINT, checkTime, false, (__cache_free_fn_t)mndFreeConn, "conn");
  if (pMgmt->connCache == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to alloc profile cache since %s", terrstr());
    TAOS_RETURN(code);
  }

  pMgmt->appCache = taosCacheInit(TSDB_DATA_TYPE_BIGINT, checkTime, true, (__cache_free_fn_t)mndFreeApp, "app");
  if (pMgmt->appCache == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to alloc profile cache since %s", terrstr());
    // Do not leave the already created conn cache behind; mndCleanupProfile()
    // skips NULL pointers, so resetting it here is safe either way.
    taosCacheCleanup(pMgmt->connCache);
    pMgmt->connCache = NULL;
    TAOS_RETURN(code);
  }

  mndSetMsgHandle(pMnode, TDMT_MND_HEARTBEAT, mndProcessHeartBeatReq);
  mndSetMsgHandle(pMnode, TDMT_MND_CONNECT, mndProcessConnectReq);
  mndSetMsgHandle(pMnode, TDMT_MND_KILL_QUERY, mndProcessKillQueryReq);
  mndSetMsgHandle(pMnode, TDMT_MND_KILL_CONN, mndProcessKillConnReq);
  mndSetMsgHandle(pMnode, TDMT_MND_SERVER_VERSION, mndProcessSvrVerReq);

  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndRetrieveConns);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndCancelGetNextConn);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndRetrieveQueries);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndCancelGetNextQuery);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_APPS, mndRetrieveApps);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_APPS, mndCancelGetNextApp);

  TAOS_RETURN(code);
}
129

130
/**
 * Release the conn and app caches of the profile manager.
 * Each cache pointer is reset to NULL afterwards, so calling this twice is harmless.
 */
void mndCleanupProfile(SMnode *pMnode) {
  SProfileMgmt *pProfile = &pMnode->profileMgmt;

  if (NULL != pProfile->connCache) {
    taosCacheCleanup(pProfile->connCache);
    pProfile->connCache = NULL;
  }

  if (NULL != pProfile->appCache) {
    taosCacheCleanup(pProfile->appCache);
    pProfile->appCache = NULL;
  }
}
503,463✔
142

143
/**
 * Write the user IP of a connection into a var-data buffer.
 *
 * The numeric userIp is written first when it is neither 0 nor INADDR_NONE;
 * the string form of userDualIp then overwrites it when that address is set
 * and does not start with "0.0.0.0". When neither applies, dst is left untouched.
 *
 * @param pConn  connection to read from.
 * @param dst    var-data buffer; NOTE(review): its capacity is not checked here.
 */
static void getUserIpFromConnObj(SConnObj *pConn, char *dst) {
  static char *none = "0.0.0.0";

  bool hasNumericIp = (pConn->userIp != 0) && (pConn->userIp != INADDR_NONE);
  if (hasNumericIp) {
    taosInetNtoa(varDataVal(dst), pConn->userIp);
    varDataLen(dst) = strlen(varDataVal(dst));
  }

  bool hasDualIp = (pConn->userDualIp.ipv4[0] != 0) && (strncmp(pConn->userDualIp.ipv4, none, strlen(none)) != 0);
  if (hasDualIp) {
    char   *text = IP_ADDR_STR(&pConn->userDualIp);
    int32_t textLen = strlen(text);
    memcpy(varDataVal(dst), text, textLen);
    varDataLen(dst) = textLen;
  }
}
158
/**
 * Copy the user application name, connector info and numeric user IP into a
 * connection object. Does nothing when connObj is NULL.
 */
static void setUserInfo2Conn(SConnObj *connObj, char *userApp, uint32_t userIp, char *cInfo) {
  if (NULL != connObj) {
    tstrncpy(connObj->userApp, userApp, sizeof(connObj->userApp));
    tstrncpy(connObj->cInfo, cInfo, sizeof(connObj->cInfo));
    connObj->userIp = userIp;
  }
}
166
/**
 * Convert the given IP range with tIpUintToStr() and store the result in the
 * connection's userDualIp. A conversion failure is only logged.
 * Does nothing when connObj is NULL.
 */
static void setUserInfoIpToConn(SConnObj *connObj, SIpRange *pRange) {
  if (NULL == connObj) {
    return;
  }

  int32_t rc = tIpUintToStr(pRange, &connObj->userDualIp);
  if (0 != rc) {
    mError("conn:%u, failed to set user ip to conn since %s", connObj->id, tstrerror(rc));
  }
}
178
/**
 * Create a connection object and put it into the conn cache.
 *
 * The connection id is generated by mndGenerateUid() from the concatenation of
 * user, client ip string, port, pid and app name.
 *
 * @param pMnode     mnode owning the conn cache.
 * @param user       user name, copied into the object.
 * @param connType   connection type.
 * @param pAddr      client address; copied into the object.
 * @param pid        process id of the client application.
 * @param app        client application name, copied into the object.
 * @param startTime  application start time; 0 means "use the current time".
 * @param sVer       client version string, copied into the object.
 * @return the cached object (to be handed back with mndReleaseConn()), or NULL
 *         with terrno set to TSDB_CODE_OUT_OF_MEMORY when the cache put fails.
 */
static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType, SIpAddr *pAddr, int32_t pid,
                               const char *app, int64_t startTime, const char *sVer) {
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

  char     connStr[255] = {0};
  char    *ip = IP_ADDR_STR(pAddr);
  uint16_t port = pAddr->port;

  // ip is a string: it must be formatted with %s. Using %d here passed a
  // pointer where an int was expected, which is undefined behaviour and made
  // the id depend on a memory address instead of the client address.
  int32_t  len = tsnprintf(connStr, sizeof(connStr), "%s%s%d%d%s", user, ip, port, pid, app);
  uint32_t connId = mndGenerateUid(connStr, len);
  if (startTime == 0) startTime = taosGetTimestampMs();

  SConnObj connObj = {
      .id = connId,
      .connType = connType,
      .appStartTimeMs = startTime,
      .pid = pid,
      .addr = *pAddr,
      .killed = 0,
      .loginTimeMs = taosGetTimestampMs(),
      .lastAccessTimeMs = 0,
      .killId = 0,
      .numOfQueries = 0,
      .pQueries = NULL,
  };

  connObj.lastAccessTimeMs = connObj.loginTimeMs;
  tstrncpy(connObj.user, user, TSDB_USER_LEN);
  tstrncpy(connObj.app, app, TSDB_APP_NAME_LEN);
  tstrncpy(connObj.sVer, sVer, TSDB_VERSION_LEN);

  SConnObj *pConn =
      taosCachePut(pMgmt->connCache, &connId, sizeof(uint32_t), &connObj, sizeof(connObj), CACHE_OBJ_KEEP_TIME * 1000);
  if (pConn == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    // arguments follow the order of the format: id, reason, user
    mError("conn:%u, failed to put into cache since %s, user:%s", connId, terrstr(), user);
    return NULL;
  } else {
    mTrace("conn:%u, is created, data:%p user:%s", pConn->id, pConn, user);
    return pConn;
  }
}
220

221
/**
 * Free callback of the conn cache: destroys the connection's query list
 * while holding the query write latch.
 */
static void mndFreeConn(SConnObj *pConn) {
  SRWLatch *pLatch = &pConn->queryLock;

  taosWLockLatch(pLatch);
  taosArrayDestroyEx(pConn->pQueries, tFreeClientHbQueryDesc);
  taosWUnLockLatch(pLatch);

  mTrace("conn:%u, is destroyed, data:%p", pConn->id, pConn);
}
3,285,618✔
228

229
/**
 * Look up a connection in the conn cache by id.
 *
 * On a hit the connection's lastAccessTimeMs is refreshed.
 *
 * @return the cached connection, or NULL when the id is not in the cache.
 */
static SConnObj *mndAcquireConn(SMnode *pMnode, uint32_t connId) {
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;
  SConnObj     *pFound = taosCacheAcquireByKey(pMgmt->connCache, &connId, sizeof(connId));

  if (pFound != NULL) {
    pFound->lastAccessTimeMs = taosGetTimestampMs();
    mTrace("conn:%u, acquired from cache, data:%p", pFound->id, pFound);
    return pFound;
  }

  mDebug("conn:%u, already destroyed", connId);
  return NULL;
}
242

243
/**
 * Hand a connection back to the conn cache.
 *
 * @param pConn           connection to release; NULL is ignored.
 * @param extendLifespan  when true, taosCacheTryExtendLifeSpan() is called
 *                        before the release.
 */
static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn, bool extendLifespan) {
  if (NULL == pConn) {
    return;
  }
  mTrace("conn:%u, released from cache, data:%p", pConn->id, pConn);

  SProfileMgmt *pMgmt = &pMnode->profileMgmt;
  if (extendLifespan) {
    taosCacheTryExtendLifeSpan(pMgmt->connCache, (void **)&pConn);
  }
  taosCacheRelease(pMgmt->connCache, (void **)&pConn, false);
}
251

252
/**
 * Advance a conn cache iterator.
 *
 * @return the data of the next cached entry, or NULL when the iteration is
 *         finished. In the latter case the iterator has been destroyed and
 *         must not be used again.
 */
void *mndGetNextConn(SMnode *pMnode, SCacheIter *pIter) {
  if (!taosCacheIterNext(pIter)) {
    taosCacheDestroyIter(pIter);
    return NULL;
  }

  size_t    dataLen = 0;
  SConnObj *pConn = taosCacheIterGetData(pIter, &dataLen);
  return pConn;
}
264

265
/** Destroy a conn cache iterator that was not run to completion; NULL is ignored. */
static void mndCancelGetNextConn(SMnode *pMnode, void *pIter) {
  if (NULL == pIter) {
    return;
  }
  taosCacheDestroyIter(pIter);
}
×
270

271

272

273
// TODO: if there are many connections, this function may be slow
274
static int32_t mndCountUserConns(SMnode *pMnode, const char *user) {
78,464✔
275
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;
78,464✔
276
  SCacheIter   *pIter = taosCacheCreateIter(pMgmt->connCache);
78,464✔
277
  if (pIter == NULL) {
78,464✔
278
    mError("failed to create conn cache iterator");
×
279
    return -1;
×
280
  }
281

282
  int32_t    count = 0;
78,464✔
283
  SConnObj  *pConn = NULL;
78,464✔
284
  while ((pConn = mndGetNextConn(pMnode, pIter)) != NULL) {
908,108✔
285
    if (strncmp(pConn->user, user, TSDB_USER_LEN) == 0) {
829,644✔
286
      count++;
158,048✔
287
    }
288
    mndReleaseConn(pMnode, pConn, true);
829,644✔
289
  }
290

291
  return count;
78,464✔
292
}
293

294

295

296
/**
 * Compare two byte buffers without leaving the loop on the first mismatch.
 *
 * All `len` bytes are always visited; the per-byte differences are OR-ed into
 * a volatile accumulator.
 *
 * @param a    first buffer, at least len bytes.
 * @param b    second buffer, at least len bytes.
 * @param len  number of bytes to compare.
 * @return true when the first len bytes are identical (always true for len == 0).
 */
static bool constTimeEq(const char *a, const char *b, size_t len) {
  volatile uint8_t diff = 0;
  const char      *pa = a;
  const char      *pb = b;

  while (len-- > 0) {
    diff |= *pa++ ^ *pb++;
  }

  return 0 == diff;
}
303

304

305

306
/**
 * Handle a client connect (login) request.
 *
 * Steps, in order: deserialize the request, check client/server version
 * compatibility, check the connect privilege, acquire the user, enforce the
 * password lifetime, date/time white list, inactive-account, failed-login
 * lock-out and per-user session limits, verify the password, optionally
 * resolve and authorize the requested database, create the connection object,
 * build the serialized connect response and write an audit record.
 *
 * The login info of the user (failed login count, last login times) is
 * updated once the password has been checked, whether or not it matched.
 *
 * @param pReq  request message; on success pReq->info.rsp / rspLen receive the
 *              serialized SConnectRsp.
 * @return 0 on success, otherwise an error code.
 */
static int32_t mndProcessConnectReq(SRpcMsg *pReq) {
  SMnode         *pMnode = pReq->info.node;
  SUserObj       *pUser = NULL;
  SDbObj         *pDb = NULL;
  SConnObj       *pConn = NULL;
  int32_t         code = 0;
  SConnectReq     connReq = {0};
  const STraceId *trace = &pReq->info.traceId;  // NOTE(review): presumably consumed by the mG* log macros - confirm

  char    *ip = IP_ADDR_STR(&pReq->info.conn.cliAddr);
  uint16_t port = pReq->info.conn.cliAddr.port;

  if ((code = tDeserializeSConnectReq(pReq->pCont, pReq->contLen, &connReq)) != 0) {
    goto _OVER;
  }

  if ((code = taosCheckVersionCompatibleFromStr(connReq.sVer, td_version, 3)) != 0) {
    mGError("version not compatible. client version: %s, server version: %s", connReq.sVer, td_version);
    goto _OVER;
  }

  if ((code = mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CONNECT)) != 0) {
    mGError("user:%s, failed to login from %s since %s", pReq->info.conn.user, IP_ADDR_STR(&pReq->info.conn.cliAddr),
            tstrerror(code));
    goto _OVER;
  }

  code = mndAcquireUser(pMnode, pReq->info.conn.user, &pUser);
  if (pUser == NULL) {
    // Never report success without a user object: fall back to the same
    // "NULL return value" convention used elsewhere in this file.
    if (code == 0) {
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
      if (terrno != 0) code = terrno;
    }
    mGError("user:%s, failed to login from %s while acquire user since %s", pReq->info.conn.user, ip, tstrerror(code));
    goto _OVER;
  }

  int64_t now = taosGetTimestampSec();
  if (pUser->passwordLifeTime > 0 && pUser->passwordGraceTime >= 0) {
    int64_t lifeTime = now - pUser->passwords[0].setTime;
    int64_t maxLifeTime = pUser->passwordLifeTime + pUser->passwordGraceTime;
    if (lifeTime >= maxLifeTime) {
      mGError("user:%s, failed to login from %s since password expired", pReq->info.conn.user, ip);
      code = TSDB_CODE_MND_USER_PASSWORD_EXPIRED;
      goto _OVER;
    }
  }

  if (!isTimeInDateTimeWhiteList(pUser->pTimeWhiteList, now)) {
    mGError("user:%s, failed to login from %s since not in date white list", pReq->info.conn.user, ip);
    code = TSDB_CODE_MND_USER_DISABLED;
    goto _OVER;
  }

  SLoginInfo loginInfo = {0};
  mndGetUserLoginInfo(pReq->info.conn.user, &loginInfo);
  if (pUser->inactiveAccountTime >= 0 && (now - loginInfo.lastLoginTime >= pUser->inactiveAccountTime)) {
    mGError("user:%s, failed to login from %s since inactive account", pReq->info.conn.user, ip);
    code = TSDB_CODE_MND_USER_DISABLED;
    goto _OVER;
  }

  // logical AND: both operands are conditions, not bit masks
  if (pUser->failedLoginAttempts >= 0 && loginInfo.failedLoginCount >= pUser->failedLoginAttempts) {
    if (now - loginInfo.lastFailedLoginTime < pUser->passwordLockTime) {
      mGError("user:%s, failed to login from %s since too many login failures", pReq->info.conn.user, ip);
      code = TSDB_CODE_MND_USER_DISABLED;
      goto _OVER;
    }
  }

  if (pUser->sessionPerUser >= 0) {
    // NOTE(review): mndCountUserConns() returns -1 when it cannot iterate the
    // cache, in which case the limit is not enforced for this login.
    int32_t currentSessions = mndCountUserConns(pMnode, pReq->info.conn.user);
    if (currentSessions >= pUser->sessionPerUser) {
      mGError("user:%s, failed to login from %s since exceed max connections:%d", pReq->info.conn.user, ip,
              pUser->sessionPerUser);
      code = TSDB_CODE_MND_TOO_MANY_CONNECTIONS;
      goto _OVER;
    }
  }

  // Work on a private, NUL-terminated copy of the submitted password.
  char tmpPass[TSDB_PASSWORD_LEN] = {0};
  (void)memcpy(tmpPass, connReq.passwd, TSDB_PASSWORD_LEN);
  tmpPass[TSDB_PASSWORD_LEN - 1] = 0;

  if (pUser->passEncryptAlgorithm != 0) {
    if (pUser->passEncryptAlgorithm != tsiEncryptPassAlgorithm) {
      code = TSDB_CODE_DNODE_INVALID_ENCRYPTKEY;
      goto _OVER;
    }
    TAOS_CHECK_GOTO(mndEncryptPass(tmpPass, pUser->salt, NULL), NULL, _OVER);
  }

  if (tsMndSkipGrant) {
    loginInfo.lastLoginTime = now;
  } else if (constTimeEq(tmpPass, pUser->passwords[0].pass, sizeof(tmpPass) - 1)) {
    loginInfo.failedLoginCount = 0;
    loginInfo.lastLoginTime = now;
  } else {
    // The submitted password is deliberately not logged: it is credential
    // material, and connReq.passwd is not guaranteed to be NUL-terminated.
    mGError("user:%s, failed to login from %s since pass not match", pReq->info.conn.user, ip);
    if (pUser->failedLoginAttempts >= 0) {
      if (loginInfo.failedLoginCount >= pUser->failedLoginAttempts) {
        // if we can get here, it means the lock time has passed, so reset the counter
        loginInfo.failedLoginCount = 0;
      }
      loginInfo.failedLoginCount++;
      loginInfo.lastFailedLoginTime = now;
    }
    code = TSDB_CODE_MND_AUTH_FAILURE;
  }

  mndSetUserLoginInfo(pReq->info.conn.user, &loginInfo);
  if (code != 0) {
    goto _OVER;
  }

  if (connReq.db[0]) {
    char db[TSDB_DB_FNAME_LEN] = {0};
    (void)snprintf(db, TSDB_DB_FNAME_LEN, "%d%s%s", pUser->acctId, TS_PATH_DELIMITER, connReq.db);
    pDb = mndAcquireDb(pMnode, db);
    if (pDb == NULL) {
      // the two schema databases are accepted even though they cannot be acquired
      if (0 != strcmp(connReq.db, TSDB_INFORMATION_SCHEMA_DB) &&
          (0 != strcmp(connReq.db, TSDB_PERFORMANCE_SCHEMA_DB))) {
        code = TSDB_CODE_MND_DB_NOT_EXIST;
        mGError("user:%s, failed to login from %s while use db:%s since %s", pReq->info.conn.user, ip, connReq.db,
                tstrerror(code));
        goto _OVER;
      }
    }

    TAOS_CHECK_GOTO(mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_READ_OR_WRITE_DB, pDb), NULL, _OVER);
  }

  pConn = mndCreateConn(pMnode, pReq->info.conn.user, connReq.connType, &pReq->info.conn.cliAddr, connReq.pid,
                        connReq.app, connReq.startTime, connReq.sVer);
  if (pConn == NULL) {
    code = terrno;
    mGError("user:%s, failed to login from %s while create connection since %s", pReq->info.conn.user, ip,
            tstrerror(code));
    goto _OVER;
  }

  SConnectRsp connectRsp = {0};
  connectRsp.acctId = pUser->acctId;
  connectRsp.superUser = pUser->superUser;
  connectRsp.sysInfo = pUser->sysInfo;
  connectRsp.clusterId = pMnode->clusterId;
  connectRsp.connId = pConn->id;
  connectRsp.connType = connReq.connType;
  connectRsp.mustChangePass = mndMustChangePassword(pUser) ? 1 : 0;
  connectRsp.dnodeNum = mndGetDnodeSize(pMnode);
  connectRsp.svrTimestamp = taosGetTimestampSec();
  connectRsp.passVer = pUser->passVersion;
  connectRsp.authVer = pUser->authVersion;
  connectRsp.monitorParas.tsEnableMonitor = tsEnableMonitor;
  connectRsp.monitorParas.tsMonitorInterval = tsMonitorInterval;
  connectRsp.monitorParas.tsSlowLogScope = tsSlowLogScope;
  connectRsp.monitorParas.tsSlowLogMaxLen = tsSlowLogMaxLen;
  connectRsp.monitorParas.tsSlowLogThreshold = tsSlowLogThreshold;
  connectRsp.enableAuditDelete = tsEnableAuditDelete;
  tstrncpy(connectRsp.monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb, TSDB_DB_NAME_LEN);
  connectRsp.whiteListVer = pUser->ipWhiteListVer;
  connectRsp.timeWhiteListVer = pUser->timeWhiteListVer;

  tstrncpy(connectRsp.sVer, td_version, sizeof(connectRsp.sVer));
  (void)snprintf(connectRsp.sDetailVer, sizeof(connectRsp.sDetailVer), "ver:%s\nbuild:%s\ngitinfo:%s", td_version,
                 td_buildinfo, td_gitinfo);
  mndGetMnodeEpSet(pMnode, &connectRsp.epSet);

  // first pass computes the encoded size, second pass fills the buffer
  int32_t contLen = tSerializeSConnectRsp(NULL, 0, &connectRsp);
  if (contLen < 0) {
    TAOS_CHECK_GOTO(contLen, NULL, _OVER);
  }
  void *pRsp = rpcMallocCont(contLen);
  if (pRsp == NULL) {
    TAOS_CHECK_GOTO(terrno, NULL, _OVER);
  }

  contLen = tSerializeSConnectRsp(pRsp, contLen, &connectRsp);
  if (contLen < 0) {
    rpcFreeCont(pRsp);
    TAOS_CHECK_GOTO(contLen, NULL, _OVER);
  }

  pReq->info.rspLen = contLen;
  pReq->info.rsp = pRsp;

  mGDebug("user:%s, login from %s:%d, conn:%u, app:%s", pReq->info.conn.user, ip, port, pConn->id, connReq.app);

  code = 0;

  char    detail[1000] = {0};
  int32_t nBytes = snprintf(detail, sizeof(detail), "app:%s", connReq.app);
  if ((uint32_t)nBytes < sizeof(detail)) {
    auditRecord(pReq, pMnode->clusterId, "login", "", "", detail, strlen(detail));
  } else {
    mError("failed to audit logic since %s", tstrerror(TSDB_CODE_OUT_OF_RANGE));
  }

_OVER:

  // all three release helpers are called unconditionally, including with NULL
  mndReleaseUser(pMnode, pUser);
  mndReleaseDb(pMnode, pDb);
  mndReleaseConn(pMnode, pConn, true);

  TAOS_RETURN(code);
}
507

508
/**
 * Replace the query list of a connection with the one carried by a heartbeat.
 *
 * The previous list is destroyed and the array pointer is moved out of pBasic
 * (pBasic->queryDesc is set to NULL), all under the query write latch.
 *
 * @return always TSDB_CODE_SUCCESS.
 */
static int32_t mndSaveQueryList(SConnObj *pConn, SQueryHbReqBasic *pBasic) {
  taosWLockLatch(&pConn->queryLock);

  taosArrayDestroyEx(pConn->pQueries, tFreeClientHbQueryDesc);

  // move the array from the request into the connection
  pConn->pQueries = pBasic->queryDesc;
  if (pBasic->queryDesc) {
    pConn->numOfQueries = taosArrayGetSize(pBasic->queryDesc);
  } else {
    pConn->numOfQueries = 0;
  }
  pBasic->queryDesc = NULL;

  mDebug("queries updated in conn %u, num:%d", pConn->id, pConn->numOfQueries);

  taosWUnLockLatch(&pConn->queryLock);

  return TSDB_CODE_SUCCESS;
}
523

524
/**
 * Create an application record from a heartbeat and put it into the app cache.
 *
 * @param pMnode  mnode owning the app cache.
 * @param pAddr   client address; copied into the record.
 * @param pReq    application part of the heartbeat request.
 * @return the cached record (to be handed back with mndReleaseApp()), or NULL
 *         with terrno set to TSDB_CODE_OUT_OF_MEMORY when the cache put fails.
 */
static SAppObj *mndCreateApp(SMnode *pMnode, SIpAddr *pAddr, SAppHbReq *pReq) {
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

  // Zero the whole object first: it is copied byte-wise into the cache, so
  // padding and the unused tail of `name` must not carry stack garbage.
  SAppObj app = {0};
  app.appId = pReq->appId;
  app.cliAddr = *pAddr;
  app.pid = pReq->pid;
  tstrncpy(app.name, pReq->name, sizeof(app.name));
  app.startTime = pReq->startTime;
  (void)memcpy(&app.summary, &pReq->summary, sizeof(pReq->summary));
  app.lastAccessTimeMs = taosGetTimestampMs();

  SAppObj *pApp =
      taosCachePut(pMgmt->appCache, &pReq->appId, sizeof(pReq->appId), &app, sizeof(app), CACHE_OBJ_KEEP_TIME * 1000);
  if (pApp == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to app %" PRIx64 " into cache since %s", pReq->appId, terrstr());
    return NULL;
  }

  mTrace("app %" PRIx64 " is put into cache", pReq->appId);
  return pApp;
}
547

548
/** Free callback of the app cache: the record owns nothing, so this only traces. */
static void mndFreeApp(SAppObj *pApp) {
  mTrace("app %" PRIx64 " is destroyed", pApp->appId);
}
862,851✔
549

550
/**
 * Look up an application record in the app cache by id.
 *
 * terrno is reset to 0 on entry. On a hit the record's lastAccessTimeMs is
 * refreshed.
 *
 * @return the cached record, or NULL when the id is not in the cache.
 */
static SAppObj *mndAcquireApp(SMnode *pMnode, int64_t appId) {
  terrno = 0;
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

  SAppObj *pFound = taosCacheAcquireByKey(pMgmt->appCache, &appId, sizeof(appId));
  if (NULL != pFound) {
    pFound->lastAccessTimeMs = (uint64_t)taosGetTimestampMs();
    mTrace("app %" PRIx64 " acquired from cache", appId);
    return pFound;
  }

  mDebug("app %" PRIx64 " not in cache", appId);
  return NULL;
}
565

566
/** Hand an application record back to the app cache; NULL is ignored. */
static void mndReleaseApp(SMnode *pMnode, SAppObj *pApp) {
  if (NULL == pApp) {
    return;
  }
  mTrace("release app %" PRIx64 " to cache", pApp->appId);

  SProfileMgmt *pProfile = &pMnode->profileMgmt;
  taosCacheRelease(pProfile->appCache, (void **)&pApp, false);
}
573

574
/**
 * Advance an app cache iterator.
 *
 * @return the next cached record, or NULL when the iteration is finished. In
 *         the latter case the iterator has been destroyed and must not be
 *         used again.
 */
SAppObj *mndGetNextApp(SMnode *pMnode, SCacheIter *pIter) {
  if (!taosCacheIterNext(pIter)) {
    taosCacheDestroyIter(pIter);
    return NULL;
  }

  size_t   dataLen = 0;
  SAppObj *pApp = taosCacheIterGetData(pIter, &dataLen);
  return pApp;
}
586

587
/** Destroy an app cache iterator that was not run to completion; NULL is ignored. */
static void mndCancelGetNextApp(SMnode *pMnode, void *pIter) {
  if (NULL == pIter) {
    return;
  }
  taosCacheDestroyIter(pIter);
}
×
592

593
/** Placeholder: no heartbeat response is built here; always returns NULL. */
static SClientHbRsp *mndMqHbBuildRsp(SMnode *pMnode, SClientHbReq *pReq) {
  SClientHbRsp *pRsp = NULL;
  return pRsp;
}
597

598
/**
 * Refresh the cached application record described by a heartbeat.
 *
 * An existing record gets its summary overwritten; an unknown application is
 * added to the app cache from the heartbeat and the client address.
 *
 * @return TSDB_CODE_SUCCESS, or terrno (TSDB_CODE_MND_RETURN_VALUE_NULL when
 *         terrno is 0) if a new record cannot be created.
 */
static int32_t mndUpdateAppInfo(SMnode *pMnode, SClientHbReq *pHbReq, SRpcConnInfo *connInfo) {
  SAppHbReq *pAppReq = &pHbReq->app;
  SAppObj   *pApp = mndAcquireApp(pMnode, pAppReq->appId);

  if (pApp != NULL) {
    // known application: only the summary changes
    (void)memcpy(&pApp->summary, &pAppReq->summary, sizeof(pAppReq->summary));
    mndReleaseApp(pMnode, pApp);
    return TSDB_CODE_SUCCESS;
  }

  pApp = mndCreateApp(pMnode, &connInfo->cliAddr, pAppReq);
  if (pApp == NULL) {
    mError("failed to create new app %" PRIx64 " since %s", pAppReq->appId, terrstr());
    int32_t code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  mDebug("a new app %" PRIx64 " is created", pAppReq->appId);
  mndReleaseApp(pMnode, pApp);
  return TSDB_CODE_SUCCESS;
}
622

623
/**
 * Add the number of currently online dnodes to *num.
 *
 * Every dnode in the sdb is checked with mndIsDnodeOnline() against a single
 * timestamp taken at entry. *num is incremented, not reset, so the caller
 * chooses the starting value.
 *
 * @return always TSDB_CODE_SUCCESS.
 */
static int32_t mndGetOnlineDnodeNum(SMnode *pMnode, int32_t *num) {
  SSdb      *pSdb = pMnode->pSdb;
  SDnodeObj *pDnode = NULL;
  int64_t    nowMs = taosGetTimestampMs();

  void *pIter = sdbFetch(pSdb, SDB_DNODE, NULL, (void **)&pDnode);
  while (pIter != NULL) {
    if (mndIsDnodeOnline(pDnode, nowMs)) {
      (*num)++;
    }
    sdbRelease(pSdb, pDnode);

    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
  }

  return TSDB_CODE_SUCCESS;
}
643

644
static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHbReq *pHbReq,
20,838,238✔
645
                                        SClientHbBatchRsp *pBatchRsp, SConnPreparedObj *pObj) {
646
  int32_t       code = 0;
20,838,238✔
647
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;
20,838,238✔
648
  SClientHbRsp  hbRsp = {.connKey = pHbReq->connKey, .status = 0, .info = NULL, .query = NULL};
20,839,025✔
649
  SRpcConnInfo  connInfo = pMsg->info.conn;
20,838,820✔
650

651
  if (0 != pHbReq->app.appId) {
20,836,401✔
652
    TAOS_CHECK_RETURN(mndUpdateAppInfo(pMnode, pHbReq, &connInfo));
18,952,044✔
653
  }
654

655
  if (pHbReq->query) {
20,833,812✔
656
    SQueryHbReqBasic *pBasic = pHbReq->query;
18,948,527✔
657

658
    SConnObj *pConn = mndAcquireConn(pMnode, pBasic->connId);
18,950,353✔
659
    if (pConn == NULL) {
18,947,147✔
660
      pConn = mndCreateConn(pMnode, connInfo.user, CONN_TYPE__QUERY, &connInfo.cliAddr, pHbReq->app.pid,
584,081✔
661
                            pHbReq->app.name, 0, pHbReq->sVer);
584,081✔
662
      if (pConn == NULL) {
584,081✔
663
        mError("user:%s, conn:%u is freed and failed to create new since %s", connInfo.user, pBasic->connId, terrstr());
×
664
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
665
        if (terrno != 0) code = terrno;
×
666
        TAOS_RETURN(code);
×
667
      } else {
668
        mDebug("user:%s, conn:%u is freed, will create a new conn:%u", connInfo.user, pBasic->connId, pConn->id);
584,081✔
669
      }
670
    }
671

672
    setUserInfo2Conn(pConn, pHbReq->userApp, pHbReq->userIp, pHbReq->cInfo);
18,947,147✔
673
    setUserInfoIpToConn(pConn, &pHbReq->userDualIp);
18,947,795✔
674

675
    SQueryHbRspBasic *rspBasic = taosMemoryCalloc(1, sizeof(SQueryHbRspBasic));
18,948,504✔
676
    if (rspBasic == NULL) {
18,949,430✔
677
      mndReleaseConn(pMnode, pConn, true);
×
678
      code = terrno;
×
679
      mError("user:%s, conn:%u failed to process hb while since %s", pConn->user, pBasic->connId, terrstr());
×
680
      TAOS_RETURN(code);
×
681
    }
682

683
    TAOS_CHECK_RETURN(mndSaveQueryList(pConn, pBasic));
18,949,430✔
684
    if (pConn->killed != 0) {
18,953,127✔
685
      rspBasic->killConnection = 1;
×
686
    }
687

688
    if (pConn->killId != 0) {
18,953,127✔
689
      rspBasic->killRid = pConn->killId;
×
690
      pConn->killId = 0;
×
691
    }
692

693
    rspBasic->connId = pConn->id;
18,953,369✔
694
    rspBasic->connId = pConn->id;
18,953,127✔
695
    rspBasic->totalDnodes = pObj->totalDnodes;
18,953,174✔
696
    rspBasic->onlineDnodes = pObj->onlineDnodes;
18,952,811✔
697
    rspBasic->epSet = pObj->epSet;
18,952,932✔
698
    rspBasic->pQnodeList = taosArrayDup(pObj->pQnodeList, NULL);
18,953,248✔
699

700
    mndReleaseConn(pMnode, pConn, true);
18,953,248✔
701

702
    hbRsp.query = rspBasic;
18,953,490✔
703
  } else {
704
    mDebug("no query info in hb msg");
1,886,062✔
705
  }
706

707
  int32_t kvNum = taosHashGetSize(pHbReq->info);
20,839,552✔
708
  if (NULL == pHbReq->info || kvNum <= 0) {
20,839,067✔
709
    if (taosArrayPush(pBatchRsp->rsps, &hbRsp) == NULL) {
14,390,218✔
710
      mError("failed to put rsp into array, but continue at this heartbeat");
×
711
    }
712
    return TSDB_CODE_SUCCESS;
7,195,403✔
713
  }
714

715
  hbRsp.info = taosArrayInit(kvNum, sizeof(SKv));
13,644,252✔
716
  if (NULL == hbRsp.info) {
13,644,252✔
717
    mError("taosArrayInit %d rsp kv failed", kvNum);
×
718
    code = terrno;
×
719
    tFreeClientHbRsp(&hbRsp);
720
    TAOS_RETURN(code);
×
721
  }
722

723
#ifdef TD_ENTERPRISE
724
  bool             needCheck = true;
13,644,252✔
725
  int32_t          key = HEARTBEAT_KEY_DYN_VIEW;
13,644,252✔
726
  SDynViewVersion *pDynViewVer = NULL;
13,644,252✔
727
  SKv             *pKv = taosHashGet(pHbReq->info, &key, sizeof(key));
13,644,252✔
728
  if (NULL != pKv) {
13,644,252✔
729
    pDynViewVer = pKv->value;
3,230✔
730
    mTrace("recv view dyn ver, bootTs:%" PRId64 ", ver:%" PRIu64, pDynViewVer->svrBootTs, pDynViewVer->dynViewVer);
3,230✔
731

732
    SDynViewVersion *pRspVer = NULL;
3,230✔
733
    if (0 != (code = mndValidateDynViewVersion(pMnode, pDynViewVer, &needCheck, &pRspVer))) {
3,230✔
734
      TAOS_RETURN(code);
×
735
    }
736

737
    if (needCheck) {
3,230✔
738
      SKv kv1 = {.key = HEARTBEAT_KEY_DYN_VIEW, .valueLen = sizeof(*pDynViewVer), .value = pRspVer};
2,320✔
739
      if (taosArrayPush(hbRsp.info, &kv1) == NULL) {
4,640✔
740
        if (terrno != 0) code = terrno;
×
741
        TAOS_RETURN(code);
×
742
      };
743
      mTrace("need to check view ver, lastest bootTs:%" PRId64 ", ver:%" PRIu64, pRspVer->svrBootTs,
2,320✔
744
             pRspVer->dynViewVer);
745
    }
746
  }
747
#endif
748

749
  void *pIter = taosHashIterate(pHbReq->info, NULL);
13,644,252✔
750
  while (pIter != NULL) {
34,045,130✔
751
    SKv *kv = pIter;
20,400,878✔
752

753
    switch (kv->key) {
20,400,878✔
754
      case HEARTBEAT_KEY_USER_AUTHINFO: {
13,616,707✔
755
        void   *rspMsg = NULL;
13,616,707✔
756
        int32_t rspLen = 0;
13,616,707✔
757
        (void)mndValidateUserAuthInfo(pMnode, kv->value, kv->valueLen / sizeof(SUserAuthVersion), &rspMsg, &rspLen,
13,616,707✔
758
                                      pObj->ipWhiteListVer);
759
        if (rspMsg && rspLen > 0) {
13,615,734✔
760
          SKv kv1 = {.key = HEARTBEAT_KEY_USER_AUTHINFO, .valueLen = rspLen, .value = rspMsg};
769,577✔
761
          if (taosArrayPush(hbRsp.info, &kv1) == NULL) {
1,539,758✔
762
            mError("failed to put kv into array, but continue at this heartbeat");
×
763
          }
764
        }
765
        break;
13,616,036✔
766
      }
767
      case HEARTBEAT_KEY_DBINFO: {
4,065,378✔
768
        void   *rspMsg = NULL;
4,065,378✔
769
        int32_t rspLen = 0;
4,065,378✔
770
        (void)mndValidateDbInfo(pMnode, kv->value, kv->valueLen / sizeof(SDbCacheInfo), &rspMsg, &rspLen);
4,065,378✔
771
        if (rspMsg && rspLen > 0) {
4,065,378✔
772
          SKv kv1 = {.key = HEARTBEAT_KEY_DBINFO, .valueLen = rspLen, .value = rspMsg};
4,065,378✔
773
          if (taosArrayPush(hbRsp.info, &kv1) == NULL) {
8,130,756✔
774
            mError("failed to put kv into array, but continue at this heartbeat");
×
775
          }
776
        }
777
        break;
4,065,378✔
778
      }
779
      case HEARTBEAT_KEY_STBINFO: {
2,712,333✔
780
        void   *rspMsg = NULL;
2,712,333✔
781
        int32_t rspLen = 0;
2,712,333✔
782
        (void)mndValidateStbInfo(pMnode, kv->value, kv->valueLen / sizeof(SSTableVersion), &rspMsg, &rspLen);
2,712,333✔
783
        if (rspMsg && rspLen > 0) {
2,712,333✔
784
          SKv kv1 = {.key = HEARTBEAT_KEY_STBINFO, .valueLen = rspLen, .value = rspMsg};
2,712,333✔
785
          if (taosArrayPush(hbRsp.info, &kv1) == NULL) {
5,424,666✔
786
            mError("failed to put kv into array, but continue at this heartbeat");
×
787
          }
788
        }
789
        break;
2,712,333✔
790
      }
791
#ifdef TD_ENTERPRISE
792
      case HEARTBEAT_KEY_DYN_VIEW: {
3,230✔
793
        break;
3,230✔
794
      }
795
      case HEARTBEAT_KEY_VIEWINFO: {
3,230✔
796
        if (!needCheck) {
3,230✔
797
          break;
910✔
798
        }
799

800
        void   *rspMsg = NULL;
2,320✔
801
        int32_t rspLen = 0;
2,320✔
802
        (void)mndValidateViewInfo(pMnode, kv->value, kv->valueLen / sizeof(SViewVersion), &rspMsg, &rspLen);
2,320✔
803
        if (rspMsg && rspLen > 0) {
2,320✔
804
          SKv kv1 = {.key = HEARTBEAT_KEY_VIEWINFO, .valueLen = rspLen, .value = rspMsg};
2,320✔
805
          if (taosArrayPush(hbRsp.info, &kv1) == NULL) {
4,640✔
806
            mError("failed to put kv into array, but continue at this heartbeat");
×
807
          }
808
        }
809
        break;
2,320✔
810
      }
811
#endif
812
      case HEARTBEAT_KEY_TSMA: {
×
813
        void   *rspMsg = NULL;
×
814
        int32_t rspLen = 0;
×
815
        (void)mndValidateTSMAInfo(pMnode, kv->value, kv->valueLen / sizeof(STSMAVersion), &rspMsg, &rspLen);
×
816
        if (rspMsg && rspLen > 0) {
×
817
          SKv kv = {.key = HEARTBEAT_KEY_TSMA, .valueLen = rspLen, .value = rspMsg};
×
818
          if (taosArrayPush(hbRsp.info, &kv) == NULL) {
×
819
            mError("failed to put kv into array, but continue at this heartbeat");
×
820
          }
821
        }
822
        break;
×
823
      }
824
      default:
×
825
        mError("invalid kv key:%d", kv->key);
×
826
        hbRsp.status = TSDB_CODE_APP_ERROR;
×
827
        break;
×
828
    }
829

830
    pIter = taosHashIterate(pHbReq->info, pIter);
20,399,508✔
831
  }
832

833
  if (taosArrayPush(pBatchRsp->rsps, &hbRsp) == NULL) {
27,288,504✔
834
    if (terrno != 0) code = terrno;
×
835
  }
836
  TAOS_RETURN(code);
13,644,252✔
837
}
838

839
/**
 * Handle a batched client heartbeat request.
 *
 * Deserializes the batch from pReq->pCont, builds one response per contained
 * heartbeat (query connections through mndProcessQueryHeartBeat, TMQ
 * connections through mndMqHbBuildRsp), then serializes the batch response
 * into a buffer obtained from rpcMallocCont and stores it in
 * pReq->info.rsp / pReq->info.rspLen.
 *
 * @param pReq the incoming RPC message; on success its info.rsp/info.rspLen
 *             are set to the serialized response.
 * @return 0 on success, TSDB_CODE_INVALID_MSG if the request cannot be
 *         deserialized, otherwise the error code of the failing step.
 */
static int32_t mndProcessHeartBeatReq(SRpcMsg *pReq) {
  int32_t code = 0;
  int32_t lino = 0;
  SMnode *pMnode = pReq->info.node;

  SClientHbBatchReq batchReq = {0};
  if (tDeserializeSClientHbBatchReq(pReq->pCont, pReq->contLen, &batchReq) != 0) {
    taosArrayDestroyEx(batchReq.reqs, tFreeClientHbReq);
    code = TSDB_CODE_INVALID_MSG;
    TAOS_RETURN(code);
  }

  SConnPreparedObj obj = {0};
  obj.totalDnodes = mndGetDnodeSize(pMnode);
  obj.ipWhiteListVer = batchReq.ipWhiteListVer;

  // The decoded request array must be released on these early failures too,
  // otherwise it leaks.
  code = mndGetOnlineDnodeNum(pMnode, &obj.onlineDnodes);
  if (code != TSDB_CODE_SUCCESS) {
    taosArrayDestroyEx(batchReq.reqs, tFreeClientHbReq);
    TAOS_RETURN(code);
  }
  mndGetMnodeEpSet(pMnode, &obj.epSet);
  code = mndCreateQnodeList(pMnode, &obj.pQnodeList, -1);
  if (code != TSDB_CODE_SUCCESS) {
    taosArrayDestroyEx(batchReq.reqs, tFreeClientHbReq);
    TAOS_RETURN(code);
  }

  // From here on every exit goes through _exit, which owns the cleanup of
  // batchReq.reqs, batchRsp and obj.pQnodeList.
  SClientHbBatchRsp batchRsp = {0};
  batchRsp.svrTimestamp = taosGetTimestampSec();
  batchRsp.rsps = taosArrayInit(0, sizeof(SClientHbRsp));
  if (batchRsp.rsps == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }
  batchRsp.monitorParas.tsEnableMonitor = tsEnableMonitor;
  batchRsp.monitorParas.tsMonitorInterval = tsMonitorInterval;
  batchRsp.monitorParas.tsSlowLogThreshold = tsSlowLogThreshold;
  tstrncpy(batchRsp.monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb, TSDB_DB_NAME_LEN);
  batchRsp.monitorParas.tsSlowLogMaxLen = tsSlowLogMaxLen;
  batchRsp.monitorParas.tsSlowLogScope = tsSlowLogScope;
  batchRsp.enableAuditDelete = tsEnableAuditDelete;
  batchRsp.enableStrongPass = tsEnableStrongPassword;

  int32_t sz = taosArrayGetSize(batchReq.reqs);
  for (int i = 0; i < sz; i++) {
    SClientHbReq *pHbReq = taosArrayGet(batchReq.reqs, i);
    if (pHbReq->connKey.connType == CONN_TYPE__QUERY) {
      TAOS_CHECK_EXIT(mndProcessQueryHeartBeat(pMnode, pReq, pHbReq, &batchRsp, &obj));
    } else if (pHbReq->connKey.connType == CONN_TYPE__TMQ) {
      SClientHbRsp *pRsp = mndMqHbBuildRsp(pMnode, pHbReq);
      if (pRsp != NULL) {
        // The response struct is copied by value into the array, so only the
        // container returned by mndMqHbBuildRsp is freed here.
        if (taosArrayPush(batchRsp.rsps, pRsp) == NULL) {
          mError("failed to put kv into array, but continue at this heartbeat");
        }
        taosMemoryFree(pRsp);
      }
    }
  }

  // First pass with a NULL buffer only computes the encoded length.
  int32_t tlen = tSerializeSClientHbBatchRsp(NULL, 0, &batchRsp);
  if (tlen < 0) {
    TAOS_CHECK_EXIT(tlen);
  }
  void *buf = rpcMallocCont(tlen);
  if (!buf) {
    TAOS_CHECK_EXIT(terrno);
  }
  tlen = tSerializeSClientHbBatchRsp(buf, tlen, &batchRsp);
  if (tlen < 0) {
    rpcFreeCont(buf);
    TAOS_CHECK_EXIT(tlen);
  }
  pReq->info.rspLen = tlen;
  pReq->info.rsp = buf;

_exit:
  // Released here (not before serialization) so that error jumps out of the
  // loop above no longer leak the decoded request array.
  taosArrayDestroyEx(batchReq.reqs, tFreeClientHbReq);
  tFreeClientHbBatchRsp(&batchRsp);

  taosArrayDestroy(obj.pQnodeList);

  TAOS_RETURN(code);
}
912

913
/**
 * Handle a KILL QUERY request.
 *
 * The request carries a query id string of the form "<connId-hex>:<queryId-hex>".
 * After the privilege check, the connection is looked up in the connection
 * cache and its killId is set to the parsed query id.
 *
 * @param pReq the incoming RPC message.
 * @return 0 on success; TSDB_CODE_MND_INVALID_QUERY_ID when the id string has
 *         no ':' separator; TSDB_CODE_MND_INVALID_CONN_ID when the connection
 *         is not in the cache; otherwise the deserialization or privilege
 *         check error.
 */
static int32_t mndProcessKillQueryReq(SRpcMsg *pReq) {
  int32_t       code = 0;
  SMnode       *pMnode = pReq->info.node;
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;
  SKillQueryReq killReq = {0};

  TAOS_CHECK_RETURN(tDeserializeSKillQueryReq(pReq->pCont, pReq->contLen, &killReq));

  mInfo("kill query msg is received, queryId:%s", killReq.queryStrId);
  TAOS_CHECK_RETURN(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_KILL_QUERY));

  char *pSep = strchr(killReq.queryStrId, ':');
  if (pSep == NULL) {
    mError("invalid QID:%s", killReq.queryStrId);
    code = TSDB_CODE_MND_INVALID_QUERY_ID;
    TAOS_RETURN(code);
  }

  // Split the string in place at the separator, then parse both hex halves.
  *pSep = 0;
  int32_t  connId = taosStr2Int32(killReq.queryStrId, NULL, 16);
  uint64_t queryId = taosStr2UInt64(pSep + 1, NULL, 16);

  SConnObj *pConn = taosCacheAcquireByKey(pMgmt->connCache, &connId, sizeof(int32_t));
  if (pConn == NULL) {
    mError("connId:%x, failed to kill queryId:%" PRIx64 ", conn not exist", connId, queryId);
    code = TSDB_CODE_MND_INVALID_CONN_ID;
    TAOS_RETURN(code);
  }

  mInfo("connId:%x, queryId:%" PRIx64 " is killed by user:%s", connId, queryId, pReq->info.conn.user);
  pConn->killId = queryId;
  taosCacheRelease(pMgmt->connCache, (void **)&pConn, false);
  TAOS_RETURN(code);
}
948

949
/**
 * Handle a KILL CONNECTION request.
 *
 * After the privilege check, the connection identified by the request's
 * connId is looked up in the connection cache and flagged as killed.
 *
 * @param pReq the incoming RPC message.
 * @return 0 on success; TSDB_CODE_MND_INVALID_CONN_ID when the connection is
 *         not in the cache; otherwise the deserialization or privilege check
 *         error.
 */
static int32_t mndProcessKillConnReq(SRpcMsg *pReq) {
  int32_t       code = 0;
  SMnode       *pMnode = pReq->info.node;
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;
  SKillConnReq  killReq = {0};

  TAOS_CHECK_RETURN(tDeserializeSKillConnReq(pReq->pCont, pReq->contLen, &killReq));
  TAOS_CHECK_RETURN(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_KILL_CONN));

  SConnObj *pTarget = taosCacheAcquireByKey(pMgmt->connCache, &killReq.connId, sizeof(uint32_t));
  if (pTarget == NULL) {
    mError("connId:%u, failed to kill connection, conn not exist", killReq.connId);
    code = TSDB_CODE_MND_INVALID_CONN_ID;
    TAOS_RETURN(code);
  }

  mInfo("connId:%u, is killed by user:%s", killReq.connId, pReq->info.conn.user);
  pTarget->killed = 1;
  taosCacheRelease(pMgmt->connCache, (void **)&pTarget, false);
  TAOS_RETURN(code);
}
971

972
/**
 * Handle a server-version request.
 *
 * Copies td_version into a SServerVerRsp, serializes it into a buffer
 * obtained from rpcMallocCont and stores the result in
 * pReq->info.rsp / pReq->info.rspLen.
 *
 * @param pReq the incoming RPC message; receives the serialized response.
 * @return 0 on success, otherwise the serialization or allocation error.
 */
static int32_t mndProcessSvrVerReq(SRpcMsg *pReq) {
  int32_t       code = 0;
  int32_t       lino = 0;
  SServerVerRsp verRsp = {0};

  tstrncpy(verRsp.ver, td_version, sizeof(verRsp.ver));

  // A NULL buffer makes the serializer report the required length only.
  int32_t encodedLen = tSerializeSServerVerRsp(NULL, 0, &verRsp);
  if (encodedLen < 0) {
    TAOS_CHECK_EXIT(encodedLen);
  }

  void *pBuf = rpcMallocCont(encodedLen);
  if (pBuf == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }

  encodedLen = tSerializeSServerVerRsp(pBuf, encodedLen, &verRsp);
  if (encodedLen < 0) {
    rpcFreeCont(pBuf);
    TAOS_CHECK_EXIT(encodedLen);
  }

  pReq->info.rspLen = encodedLen;
  pReq->info.rsp = pBuf;

_exit:

  TAOS_RETURN(code);
}
999

1000
/**
 * Fill a result block with one row per cached connection.
 *
 * Walks the connection cache through pShow->pIter (created on first use) and
 * writes, per connection, the columns: id, user, app, pid, endpoint, login
 * time, last access time, user app, user ip, version, connector info.
 * Connections whose last access is older than CACHE_OBJ_KEEP_TIME seconds are
 * skipped.
 *
 * @param pReq   the incoming RPC message (provides the mnode).
 * @param pShow  show context; holds the cache iterator and the running row count.
 * @param pBlock the data block the rows are written into.
 * @param rows   maximum number of rows to write in this call.
 * @return the number of rows written; terrno if the iterator cannot be
 *         created; or the error code of a failed column write.
 */
static int32_t mndRetrieveConns(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode   *pMnode = pReq->info.node;
  int32_t   numOfRows = 0;
  int32_t   cols = 0;
  int32_t   code = 0;
  SConnObj *pConn = NULL;

  if (pShow->pIter == NULL) {
    SProfileMgmt *pMgmt = &pMnode->profileMgmt;
    pShow->pIter = taosCacheCreateIter(pMgmt->connCache);
    if (!pShow->pIter) return terrno;
  }

  while (numOfRows < rows) {
    pConn = mndGetNextConn(pMnode, pShow->pIter);
    if (pConn == NULL) {
      // No more connections: forget the iterator so the next call starts over.
      pShow->pIter = NULL;
      break;
    }

    // Skip connections that have been idle longer than the cache keep time.
    if ((taosGetTimestampMs() - pConn->lastAccessTimeMs) > ((int64_t)CACHE_OBJ_KEEP_TIME * 1000)) {
      continue;
    }

    cols = 0;

    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    code = colDataSetVal(pColInfo, numOfRows, (const char *)&pConn->id, false);
    if (code != 0) {
      mError("failed to set conn id:%u since %s", pConn->id, tstrerror(code));
      return code;
    }

    char user[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
    STR_TO_VARSTR(user, pConn->user);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    code = colDataSetVal(pColInfo, numOfRows, (const char *)user, false);
    if (code != 0) {
      mError("failed to set user since %s", tstrerror(code));
      return code;
    }

    char app[TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE];
    STR_TO_VARSTR(app, pConn->app);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    code = colDataSetVal(pColInfo, numOfRows, (const char *)app, false);
    if (code != 0) {
      mError("failed to set app since %s", tstrerror(code));
      return code;
    }

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    code = colDataSetVal(pColInfo, numOfRows, (const char *)&pConn->pid, false);
    if (code != 0) {
      // Previously logged as "conn id", which misreported the failing column.
      mError("failed to set pid since %s", tstrerror(code));
      return code;
    }

    char addr[IP_RESERVE_CAP] = {0};
    char endpoint[TD_IP_LEN + 6 + VARSTR_HEADER_SIZE] = {0};
    if (tsnprintf(addr, sizeof(addr), "%s:%d", IP_ADDR_STR(&pConn->addr), pConn->addr.port) >= sizeof(addr)) {
      code = TSDB_CODE_OUT_OF_RANGE;
      mError("failed to set endpoint since %s", tstrerror(code));
      return code;
    }

    STR_TO_VARSTR(endpoint, addr);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    code = colDataSetVal(pColInfo, numOfRows, (const char *)endpoint, false);
    if (code != 0) {
      mError("failed to set endpoint since %s", tstrerror(code));
      return code;
    }

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    code = colDataSetVal(pColInfo, numOfRows, (const char *)&pConn->loginTimeMs, false);
    if (code != 0) {
      mError("failed to set login time since %s", tstrerror(code));
      return code;
    }

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    code = colDataSetVal(pColInfo, numOfRows, (const char *)&pConn->lastAccessTimeMs, false);
    if (code != 0) {
      mError("failed to set last access time since %s", tstrerror(code));
      return code;
    }

    char userApp[TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE];
    STR_TO_VARSTR(userApp, pConn->userApp);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    code = colDataSetVal(pColInfo, numOfRows, (const char *)userApp, false);
    if (code != 0) {
      mError("failed to set user app since %s", tstrerror(code));
      return code;
    }

    char userIp[TD_IP_LEN + 6 + VARSTR_HEADER_SIZE] = {0};
    getUserIpFromConnObj(pConn, userIp);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    code = colDataSetVal(pColInfo, numOfRows, (const char *)userIp, false);
    if (code != 0) {
      mError("failed to set user ip since %s", tstrerror(code));
      return code;
    }

    char ver[TSDB_VERSION_LEN + VARSTR_HEADER_SIZE];
    STR_TO_VARSTR(ver, pConn->sVer);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    code = colDataSetVal(pColInfo, numOfRows, (const char *)ver, false);
    if (code != 0) {
      mError("failed to set ver since %s", tstrerror(code));
      return code;
    }

    char cInfo[CONNECTOR_INFO_LEN + VARSTR_HEADER_SIZE];
    STR_TO_VARSTR(cInfo, pConn->cInfo);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    code = colDataSetVal(pColInfo, numOfRows, (const char *)cInfo, false);
    if (code != 0) {
      mError("failed to set connector info since %s", tstrerror(code));
      return code;
    }
    numOfRows++;
  }

  pShow->numOfRows += numOfRows;
  return numOfRows;
}
1132

1133
/**
1134
 * @param pConn the conn queries pack from
1135
 * @param[out] pBlock the block data packed into
1136
 * @param offset skip [offset] queries in pConn
1137
 * @param rowsToPack at most rows to pack
1138
 * @return rows packed
1139
 */
1140
static int32_t packQueriesIntoBlock(SShowObj *pShow, SConnObj *pConn, SSDataBlock *pBlock, uint32_t offset,
43,325✔
1141
                                    uint32_t rowsToPack) {
1142
  int32_t cols = 0;
43,325✔
1143
  int32_t code = 0;
43,325✔
1144
  taosRLockLatch(&pConn->queryLock);
43,325✔
1145
  int32_t numOfQueries = taosArrayGetSize(pConn->pQueries);
43,325✔
1146
  if (NULL == pConn->pQueries || numOfQueries <= offset) {
43,325✔
1147
    taosRUnLockLatch(&pConn->queryLock);
40,091✔
1148
    return 0;
40,091✔
1149
  }
1150

1151
  int32_t i = offset;
3,234✔
1152
  for (; i < numOfQueries && (i - offset) < rowsToPack; ++i) {
6,468✔
1153
    int32_t     curRowIndex = pBlock->info.rows;
3,234✔
1154
    SQueryDesc *pQuery = taosArrayGet(pConn->pQueries, i);
3,234✔
1155
    cols = 0;
3,234✔
1156

1157
    char queryId[26 + VARSTR_HEADER_SIZE] = {0};
3,234✔
1158
    (void)tsnprintf(&queryId[VARSTR_HEADER_SIZE], sizeof(queryId) - VARSTR_HEADER_SIZE, "%x:%" PRIx64, pConn->id,
3,234✔
1159
                    pQuery->reqRid);
1160
    varDataLen(queryId) = strlen(&queryId[VARSTR_HEADER_SIZE]);
3,234✔
1161
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
3,234✔
1162
    code = colDataSetVal(pColInfo, curRowIndex, (const char *)queryId, false);
3,234✔
1163
    if (code != 0) {
3,234✔
1164
      mError("failed to set query id:%s since %s", queryId, tstrerror(code));
×
1165
      taosRUnLockLatch(&pConn->queryLock);
×
1166
      return code;
×
1167
    }
1168

1169
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
3,234✔
1170
    code = colDataSetVal(pColInfo, curRowIndex, (const char *)&pQuery->queryId, false);
3,234✔
1171
    if (code != 0) {
3,234✔
1172
      mError("failed to set query id:%" PRIx64 " since %s", pQuery->queryId, tstrerror(code));
×
1173
      taosRUnLockLatch(&pConn->queryLock);
×
1174
      return code;
×
1175
    }
1176

1177
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
3,234✔
1178
    code = colDataSetVal(pColInfo, curRowIndex, (const char *)&pConn->id, false);
3,234✔
1179
    if (code != 0) {
3,234✔
1180
      mError("failed to set conn id:%u since %s", pConn->id, tstrerror(code));
×
1181
      taosRUnLockLatch(&pConn->queryLock);
×
1182
      return code;
×
1183
    }
1184

1185
    char app[TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE];
3,234✔
1186
    STR_TO_VARSTR(app, pConn->app);
3,234✔
1187
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
3,234✔
1188
    code = colDataSetVal(pColInfo, curRowIndex, (const char *)app, false);
3,234✔
1189
    if (code != 0) {
3,234✔
1190
      mError("failed to set app since %s", tstrerror(code));
×
1191
      taosRUnLockLatch(&pConn->queryLock);
×
1192
      return code;
×
1193
    }
1194

1195
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
3,234✔
1196
    code = colDataSetVal(pColInfo, curRowIndex, (const char *)&pConn->pid, false);
3,234✔
1197
    if (code != 0) {
3,234✔
1198
      mError("failed to set conn id:%u since %s", pConn->id, tstrerror(code));
×
1199
      taosRUnLockLatch(&pConn->queryLock);
×
1200
      return code;
×
1201
    }
1202

1203
    char user[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
3,234✔
1204
    STR_TO_VARSTR(user, pConn->user);
3,234✔
1205
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
3,234✔
1206
    code = colDataSetVal(pColInfo, curRowIndex, (const char *)user, false);
3,234✔
1207
    if (code != 0) {
3,234✔
1208
      mError("failed to set user since %s", tstrerror(code));
×
1209
      taosRUnLockLatch(&pConn->queryLock);
×
1210
      return code;
×
1211
    }
1212

1213
    char endpoint[TD_IP_LEN + 6 + VARSTR_HEADER_SIZE] = {0};
3,234✔
1214
    char buf[IP_RESERVE_CAP] = {0};
3,234✔
1215
    (void)tsnprintf(buf, sizeof(buf), "%s:%d", IP_ADDR_STR(&pConn->addr), pConn->addr.port);
3,234✔
1216
    STR_TO_VARSTR(endpoint, buf);
3,234✔
1217
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
3,234✔
1218
    code = colDataSetVal(pColInfo, curRowIndex, (const char *)endpoint, false);
3,234✔
1219
    if (code != 0) {
3,234✔
1220
      mError("failed to set endpoint since %s", tstrerror(code));
×
1221
      taosRUnLockLatch(&pConn->queryLock);
×
1222
      return code;
×
1223
    }
1224

1225
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
3,234✔
1226
    code = colDataSetVal(pColInfo, curRowIndex, (const char *)&pQuery->stime, false);
3,234✔
1227
    if (code != 0) {
3,234✔
1228
      mError("failed to set start time since %s", tstrerror(code));
×
1229
      taosRUnLockLatch(&pConn->queryLock);
×
1230
      return code;
×
1231
    }
1232

1233
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
3,234✔
1234
    code = colDataSetVal(pColInfo, curRowIndex, (const char *)&pQuery->useconds, false);
3,234✔
1235
    if (code != 0) {
3,234✔
1236
      mError("failed to set useconds since %s", tstrerror(code));
×
1237
      taosRUnLockLatch(&pConn->queryLock);
×
1238
      return code;
×
1239
    }
1240

1241
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
3,234✔
1242
    code = colDataSetVal(pColInfo, curRowIndex, (const char *)&pQuery->stableQuery, false);
3,234✔
1243
    if (code != 0) {
3,234✔
1244
      mError("failed to set stable query since %s", tstrerror(code));
×
1245
      taosRUnLockLatch(&pConn->queryLock);
×
1246
      return code;
×
1247
    }
1248

1249
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
3,234✔
1250
    code = colDataSetVal(pColInfo, curRowIndex, (const char *)&pQuery->isSubQuery, false);
3,234✔
1251
    if (code != 0) {
3,234✔
1252
      mError("failed to set sub query since %s", tstrerror(code));
×
1253
      taosRUnLockLatch(&pConn->queryLock);
×
1254
      return code;
×
1255
    }
1256

1257
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
3,234✔
1258
    code = colDataSetVal(pColInfo, curRowIndex, (const char *)&pQuery->subPlanNum, false);
3,234✔
1259
    if (code != 0) {
3,234✔
1260
      mError("failed to set sub plan num since %s", tstrerror(code));
×
1261
      taosRUnLockLatch(&pConn->queryLock);
×
1262
      return code;
×
1263
    }
1264

1265
    char    subStatus[TSDB_SHOW_SUBQUERY_LEN + VARSTR_HEADER_SIZE] = {0};
3,234✔
1266
    int64_t reserve = 64;
3,234✔
1267
    int32_t strSize = sizeof(subStatus);
3,234✔
1268
    int32_t offset = VARSTR_HEADER_SIZE;
3,234✔
1269
    for (int32_t i = 0; i < pQuery->subPlanNum && offset + reserve < strSize; ++i) {
6,468✔
1270
      if (i) {
3,234✔
1271
        offset += tsnprintf(subStatus + offset, sizeof(subStatus) - offset, ",");
×
1272
      }
1273
      if (offset + reserve < strSize) {
3,234✔
1274
        SQuerySubDesc *pDesc = taosArrayGet(pQuery->subDesc, i);
3,234✔
1275
        offset +=
3,234✔
1276
            tsnprintf(subStatus + offset, sizeof(subStatus) - offset, "%" PRIu64 ":%s", pDesc->tid, pDesc->status);
3,234✔
1277
      } else {
1278
        break;
×
1279
      }
1280
    }
1281
    varDataLen(subStatus) = strlen(&subStatus[VARSTR_HEADER_SIZE]);
3,234✔
1282
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
3,234✔
1283
    code = colDataSetVal(pColInfo, curRowIndex, subStatus, (varDataLen(subStatus) == 0) ? true : false);
3,234✔
1284
    if (code != 0) {
3,234✔
1285
      mError("failed to set sub status since %s", tstrerror(code));
×
1286
      taosRUnLockLatch(&pConn->queryLock);
×
1287
      return code;
×
1288
    }
1289

1290
    char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
3,234✔
1291
    STR_TO_VARSTR(sql, pQuery->sql);
3,234✔
1292
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
3,234✔
1293
    code = colDataSetVal(pColInfo, curRowIndex, (const char *)sql, false);
3,234✔
1294
    if (code != 0) {
3,234✔
1295
      mError("failed to set sql since %s", tstrerror(code));
×
1296
      taosRUnLockLatch(&pConn->queryLock);
×
1297
      return code;
×
1298
    }
1299

1300
    char userApp[TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE];
3,234✔
1301
    STR_TO_VARSTR(userApp, pConn->userApp);
3,234✔
1302
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
3,234✔
1303
    code = colDataSetVal(pColInfo, curRowIndex, (const char *)userApp, false);
3,234✔
1304
    if (code != 0) {
3,234✔
1305
      mError("failed to set user app since %s", tstrerror(code));
×
1306
      taosRUnLockLatch(&pConn->queryLock);
×
1307
      return code;
×
1308
    }
1309

1310
    char userIp[TD_IP_LEN + 6 + VARSTR_HEADER_SIZE] = {0};
3,234✔
1311
    getUserIpFromConnObj(pConn, userIp);
3,234✔
1312

1313
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
3,234✔
1314
    code = colDataSetVal(pColInfo, curRowIndex, (const char *)userIp, false);
3,234✔
1315
    if (code != 0) {
3,234✔
1316
      mError("failed to set user ip since %s", tstrerror(code));
×
1317
      taosRUnLockLatch(&pConn->queryLock);
×
1318
      return code;
×
1319
    }
1320

1321
    pBlock->info.rows++;
3,234✔
1322
  }
1323

1324
  taosRUnLockLatch(&pConn->queryLock);
3,234✔
1325
  return i - offset;
3,234✔
1326
}
1327

1328
/**
 * Fill a result block with the running queries of all cached connections.
 *
 * Walks the connection cache through pShow->pIter (created on first use).
 * pShow->curIterPackedRows records how many queries of the iterator's current
 * connection were already packed by a previous call, so a connection whose
 * queries did not fit into one block is resumed from that offset.
 *
 * @param pReq   the incoming RPC message (provides the mnode).
 * @param pShow  show context; holds the iterator, resume offset and row count.
 * @param pBlock the data block the rows are written into.
 * @param rows   maximum number of rows to write in this call.
 * @return the number of rows written; terrno if the iterator cannot be
 *         created; or the negative error code reported by
 *         packQueriesIntoBlock.
 */
static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode   *pMnode = pReq->info.node;
  int32_t   numOfRows = 0;
  SConnObj *pConn = NULL;

  if (pShow->pIter == NULL) {
    SProfileMgmt *pMgmt = &pMnode->profileMgmt;
    pShow->pIter = taosCacheCreateIter(pMgmt->connCache);
    if (!pShow->pIter) return terrno;
  }

  // means fetched some data last time for this conn
  if (pShow->curIterPackedRows > 0) {
    size_t len = 0;
    pConn = taosCacheIterGetData(pShow->pIter, &len);
    if (pConn && (taosArrayGetSize(pConn->pQueries) > pShow->curIterPackedRows)) {
      int32_t packedRows = packQueriesIntoBlock(pShow, pConn, pBlock, pShow->curIterPackedRows, rows);
      // A negative value is an error code, not a row count: propagate it
      // instead of folding it into the counters.
      // NOTE(review): assumes error codes are negative — confirm against taoserror.h.
      if (packedRows < 0) {
        return packedRows;
      }
      numOfRows = packedRows;
      pShow->curIterPackedRows += packedRows;
    }
  }

  while (numOfRows < rows) {
    pConn = mndGetNextConn(pMnode, pShow->pIter);
    if (pConn == NULL) {
      // No more connections: forget the iterator so the next call starts over.
      pShow->pIter = NULL;
      break;
    }

    int32_t packedRows = packQueriesIntoBlock(pShow, pConn, pBlock, 0, rows - numOfRows);
    if (packedRows < 0) {
      return packedRows;
    }
    pShow->curIterPackedRows = packedRows;
    numOfRows += packedRows;
  }
  pShow->numOfRows += numOfRows;
  return numOfRows;
}
1364

1365
/**
 * Fill a result block with one row per cached application.
 *
 * Walks the application cache through pShow->pIter (created on first use) and
 * writes, per application, the columns: app id, client ip, pid, name, start
 * time, the request/insert/fetch/query counters from pApp->summary, and the
 * last access time.
 *
 * @param pReq   the incoming RPC message (provides the mnode).
 * @param pShow  show context; holds the cache iterator and the running row count.
 * @param pBlock the data block the rows are written into.
 * @param rows   maximum number of rows to write in this call.
 * @return the number of rows written; terrno if the iterator cannot be
 *         created; or the error code of a failed column write.
 */
static int32_t mndRetrieveApps(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode  *pMnode = pReq->info.node;
  SSdb    *pSdb = pMnode->pSdb;
  int32_t  filled = 0;
  int32_t  colIdx = 0;
  SAppObj *pApp = NULL;
  int32_t  code = 0;

  if (pShow->pIter == NULL) {
    SProfileMgmt *pMgmt = &pMnode->profileMgmt;
    pShow->pIter = taosCacheCreateIter(pMgmt->appCache);
    if (!pShow->pIter) return terrno;
  }

  // One iteration per application; `filled` advances only after a full row.
  for (; filled < rows; filled++) {
    pApp = mndGetNextApp(pMnode, pShow->pIter);
    if (pApp == NULL) {
      pShow->pIter = NULL;
      break;
    }

    colIdx = 0;

    // app id
    SColumnInfoData *pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    code = colDataSetVal(pCol, filled, (const char *)&pApp->appId, false);
    if (code != 0) {
      mError("failed to set app id since %s", tstrerror(code));
      return code;
    }

    // client ip, rendered as text and wrapped into a varstr
    char ip[TD_IP_LEN + VARSTR_HEADER_SIZE] = {0};
    char buf[IP_RESERVE_CAP] = {0};
    snprintf(buf, sizeof(buf), "%s", IP_ADDR_STR(&pApp->cliAddr));
    STR_TO_VARSTR(ip, buf);

    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    code = colDataSetVal(pCol, filled, (const char *)ip, false);
    if (code != 0) {
      mError("failed to set ip since %s", tstrerror(code));
      return code;
    }

    // pid
    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    code = colDataSetVal(pCol, filled, (const char *)&pApp->pid, false);
    if (code != 0) {
      mError("failed to set pid since %s", tstrerror(code));
      return code;
    }

    // application name as a varstr
    char name[TSDB_APP_NAME_LEN + 6 + VARSTR_HEADER_SIZE] = {0};
    (void)tsnprintf(&name[VARSTR_HEADER_SIZE], sizeof(name) - VARSTR_HEADER_SIZE, "%s", pApp->name);
    varDataLen(name) = strlen(&name[VARSTR_HEADER_SIZE]);
    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    code = colDataSetVal(pCol, filled, (const char *)name, false);
    if (code != 0) {
      mError("failed to set app name since %s", tstrerror(code));
      return code;
    }

    // start time
    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    code = colDataSetVal(pCol, filled, (const char *)&pApp->startTime, false);
    if (code != 0) {
      mError("failed to set start time since %s", tstrerror(code));
      return code;
    }

    // summary counters, in column order
    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    code = colDataSetVal(pCol, filled, (const char *)&pApp->summary.numOfInsertsReq, false);
    if (code != 0) {
      mError("failed to set insert req since %s", tstrerror(code));
      return code;
    }

    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    code = colDataSetVal(pCol, filled, (const char *)&pApp->summary.numOfInsertRows, false);
    if (code != 0) {
      mError("failed to set insert rows since %s", tstrerror(code));
      return code;
    }

    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    code = colDataSetVal(pCol, filled, (const char *)&pApp->summary.insertElapsedTime, false);
    if (code != 0) {
      mError("failed to set insert elapsed time since %s", tstrerror(code));
      return code;
    }

    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    code = colDataSetVal(pCol, filled, (const char *)&pApp->summary.insertBytes, false);
    if (code != 0) {
      mError("failed to set insert bytes since %s", tstrerror(code));
      return code;
    }

    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    code = colDataSetVal(pCol, filled, (const char *)&pApp->summary.fetchBytes, false);
    if (code != 0) {
      mError("failed to set fetch bytes since %s", tstrerror(code));
      return code;
    }

    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    code = colDataSetVal(pCol, filled, (const char *)&pApp->summary.queryElapsedTime, false);
    if (code != 0) {
      mError("failed to set query elapsed time since %s", tstrerror(code));
      return code;
    }

    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    code = colDataSetVal(pCol, filled, (const char *)&pApp->summary.numOfSlowQueries, false);
    if (code != 0) {
      mError("failed to set slow queries since %s", tstrerror(code));
      return code;
    }

    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    code = colDataSetVal(pCol, filled, (const char *)&pApp->summary.totalRequests, false);
    if (code != 0) {
      mError("failed to set total requests since %s", tstrerror(code));
      return code;
    }

    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    code = colDataSetVal(pCol, filled, (const char *)&pApp->summary.currentRequests, false);
    if (code != 0) {
      mError("failed to set current requests since %s", tstrerror(code));
      return code;
    }

    // last access time
    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    code = colDataSetVal(pCol, filled, (const char *)&pApp->lastAccessTimeMs, false);
    if (code != 0) {
      mError("failed to set last access time since %s", tstrerror(code));
      return code;
    }
  }

  pShow->numOfRows += filled;
  return filled;
}
1507

1508
/**
 * Cancel an in-progress iteration by destroying its cache iterator.
 *
 * @param pMnode the mnode (not used by this function).
 * @param pIter  the cache iterator to destroy; NULL is ignored.
 */
static void mndCancelGetNextQuery(SMnode *pMnode, void *pIter) {
  if (pIter == NULL) {
    return;
  }
  taosCacheDestroyIter(pIter);
}
×
1513

1514
/**
 * Report how many objects the connection cache currently holds.
 *
 * @param pMnode the mnode whose profile manager owns the connection cache.
 * @return the value reported by taosCacheGetNumOfObj for that cache.
 */
int32_t mndGetNumOfConnections(SMnode *pMnode) {
  return taosCacheGetNumOfObj(pMnode->profileMgmt.connCache);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc