• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3660

15 Mar 2025 09:06AM UTC coverage: 62.039% (-1.3%) from 63.314%
#3660

push

travis-ci

web-flow
feat(stream): support stream processing for virtual tables (#30144)

* enh: add client processing

* enh: add mnode vtables processing

* enh: add mnode vtable processing

* enh: add normal child vtable support

* fix: compile issues

* fix: compile issues

* fix: create stream issues

* fix: multi stream scan issue

* fix: remove debug info

* fix: agg task and task level issues

* fix: correct task output type

* fix: split vtablescan from agg

* fix: memory leak issues

* fix: add limitations

* Update 09-error-code.md

* Update 09-error-code.md

* fix: remove useless case

* feat(stream): extract original table data in source scan task

Implemented functionality in the source task to extract data
corresponding to the virtual table from the original table using WAL.
The extracted data is then sent to the downstream merge task for further
processing.

* feat(stream): multi-way merge using loser tree in virtual merge task

Implemented multi-way merge in the merge task using a loser tree to
combine data from multiple original tables into a single virtual table.
The merged virtual table data is then pushed downstream for further
processing.  Introduced memory limit handling during the merge process
with configurable behavior when the memory limit is reached.

* fix(test): remove useless cases

---------

Co-authored-by: dapan1121 <wpan@taosdata.com>
Co-authored-by: Pan Wei <72057773+dapan1121@users.noreply.github.com>

154078 of 317582 branches covered (48.52%)

Branch coverage included in aggregate %.

313 of 2391 new or added lines in 34 files covered. (13.09%)

26134 existing lines in 205 files now uncovered.

240261 of 318051 relevant lines covered (75.54%)

16655189.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.58
/source/dnode/mnode/impl/src/mndProfile.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "audit.h"
18
#include "mndDb.h"
19
#include "mndDnode.h"
20
#include "mndMnode.h"
21
#include "mndPrivilege.h"
22
#include "mndProfile.h"
23
#include "mndQnode.h"
24
#include "mndShow.h"
25
#include "mndSma.h"
26
#include "mndStb.h"
27
#include "mndUser.h"
28
#include "mndView.h"
29
#include "tglobal.h"
30
#include "tversion.h"
31

32
// One client connection tracked by the mnode. Instances are stored by value in the profile
// manager's connCache (see mndCreateConn) and handed out via mndAcquireConn/mndReleaseConn.
typedef struct {
33
  uint32_t id;  // connection id; generated in mndCreateConn and used as the cache key
34
  int8_t   connType;
35
  char     user[TSDB_USER_LEN];
36
  char     app[TSDB_APP_NAME_LEN];  // app name that invokes taosc
37
  int64_t  appStartTimeMs;          // app start time
38
  int32_t  pid;                     // pid of app that invokes taosc
39
  uint32_t ip;    // client ip as passed to mndCreateConn
40
  uint16_t port;  // client port as passed to mndCreateConn
41
  int8_t   killed;  // when non-zero, the heartbeat reply sets killConnection
42
  int64_t  loginTimeMs;  // timestamp taken when the object is created
43
  int64_t  lastAccessTimeMs;  // refreshed on every successful mndAcquireConn
44
  uint64_t killId;  // when non-zero, forwarded as killRid in the heartbeat reply and then reset to 0
45
  int32_t  numOfQueries;  // size of pQueries, updated by mndSaveQueryList
46
  SRWLatch queryLock;  // write-locked while pQueries is replaced or destroyed
47
  SArray  *pQueries;  // SArray<SQueryDesc>
48
  char     userApp[TSDB_APP_NAME_LEN];  // copied from the heartbeat request by setUserInfo2Conn
49
  uint32_t userIp;                      // copied from the heartbeat request by setUserInfo2Conn
50
} SConnObj;
51

52
// One client application tracked by the mnode. Instances are stored by value in the profile
// manager's appCache, keyed by appId (see mndCreateApp).
typedef struct {
53
  int64_t            appId;  // cache key, taken from the app heartbeat request
54
  uint32_t           ip;     // client ip passed to mndCreateApp
55
  int32_t            pid;
56
  char               name[TSDB_APP_NAME_LEN];
57
  int64_t            startTime;
58
  SAppClusterSummary summary;  // overwritten with the request's summary on each heartbeat (mndUpdateAppInfo)
59
  int64_t            lastAccessTimeMs;  // refreshed on every successful mndAcquireApp
60
} SAppObj;
61

62
// Cluster-wide values gathered once per heartbeat batch by mndProcessHeartBeatReq and then
// read by mndProcessQueryHeartBeat when it fills each per-connection response.
typedef struct {
63
  int32_t totalDnodes;   // result of mndGetDnodeSize
64
  int32_t onlineDnodes;  // counted by mndGetOnlineDnodeNum
65
  SEpSet  epSet;         // filled by mndGetMnodeEpSet
66
  SArray *pQnodeList;    // filled by mndCreateQnodeList; duplicated into every query response
67
  int64_t ipWhiteListVer;  // taken from the batch request's ipWhiteList field
68
} SConnPreparedObj;
69

70
// Keep time for cached conn/app objects, in seconds; callers multiply by 1000 before passing it
// to taosCacheInit/taosCachePut.
#define CACHE_OBJ_KEEP_TIME 3  // s
71

72
// Connection cache helpers.
static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType, uint32_t ip, uint16_t port,
73
                               int32_t pid, const char *app, int64_t startTime);
74
static void      mndFreeConn(SConnObj *pConn);
75
static SConnObj *mndAcquireConn(SMnode *pMnode, uint32_t connId);
76
static void      mndReleaseConn(SMnode *pMnode, SConnObj *pConn, bool extendLifespan);
77
static void     *mndGetNextConn(SMnode *pMnode, SCacheIter *pIter);
78
static void      mndCancelGetNextConn(SMnode *pMnode, void *pIter);
79
// Message handlers registered in mndInitProfile.
static int32_t   mndProcessHeartBeatReq(SRpcMsg *pReq);
80
static int32_t   mndProcessConnectReq(SRpcMsg *pReq);
81
static int32_t   mndProcessKillQueryReq(SRpcMsg *pReq);
82
static int32_t   mndProcessKillConnReq(SRpcMsg *pReq);
83
// SHOW retrieve / free-iterator callbacks registered in mndInitProfile.
static int32_t   mndRetrieveConns(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
84
static int32_t   mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
85
static void      mndCancelGetNextQuery(SMnode *pMnode, void *pIter);
86
static void      mndFreeApp(SAppObj *pApp);
87
static int32_t   mndRetrieveApps(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
88
static void      mndCancelGetNextApp(SMnode *pMnode, void *pIter);
89
static int32_t   mndProcessSvrVerReq(SRpcMsg *pReq);
90

91
// Initialise the profile manager of an mnode: create the connection cache and the app cache,
// then register the message handlers and the SHOW retrieve/free-iterator callbacks.
// Returns 0 on success, or TSDB_CODE_OUT_OF_MEMORY when either cache cannot be created.
int32_t mndInitProfile(SMnode *pMnode) {
  SProfileMgmt *pProfile = &pMnode->profileMgmt;
  int32_t       code = 0;
  int32_t       keepMs = CACHE_OBJ_KEEP_TIME * 1000;  // CACHE_OBJ_KEEP_TIME is in seconds

  pProfile->connCache = taosCacheInit(TSDB_DATA_TYPE_UINT, keepMs, false, (__cache_free_fn_t)mndFreeConn, "conn");
  if (NULL == pProfile->connCache) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to alloc profile cache since %s", terrstr());
    TAOS_RETURN(code);
  }

  pProfile->appCache = taosCacheInit(TSDB_DATA_TYPE_BIGINT, keepMs, true, (__cache_free_fn_t)mndFreeApp, "app");
  if (NULL == pProfile->appCache) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to alloc profile cache since %s", terrstr());
    TAOS_RETURN(code);
  }

  // request handlers
  mndSetMsgHandle(pMnode, TDMT_MND_HEARTBEAT, mndProcessHeartBeatReq);
  mndSetMsgHandle(pMnode, TDMT_MND_CONNECT, mndProcessConnectReq);
  mndSetMsgHandle(pMnode, TDMT_MND_KILL_QUERY, mndProcessKillQueryReq);
  mndSetMsgHandle(pMnode, TDMT_MND_KILL_CONN, mndProcessKillConnReq);
  mndSetMsgHandle(pMnode, TDMT_MND_SERVER_VERSION, mndProcessSvrVerReq);

  // SHOW table callbacks: connections, queries, apps
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndRetrieveConns);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndCancelGetNextConn);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndRetrieveQueries);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndCancelGetNextQuery);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_APPS, mndRetrieveApps);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_APPS, mndCancelGetNextApp);

  TAOS_RETURN(code);
}
126

127
// Tear down both profile caches (if they exist) and clear the pointers so that a second call
// is a no-op.
void mndCleanupProfile(SMnode *pMnode) {
  SProfileMgmt *pProfile = &pMnode->profileMgmt;

  if (NULL != pProfile->connCache) {
    taosCacheCleanup(pProfile->connCache);
    pProfile->connCache = NULL;
  }

  if (NULL != pProfile->appCache) {
    taosCacheCleanup(pProfile->appCache);
    pProfile->appCache = NULL;
  }
}
1,747✔
139

140
// Copy the user-level app name and ip reported by a heartbeat into the connection object.
// A NULL connObj is ignored.
static void setUserInfo2Conn(SConnObj *connObj, char *userApp, uint32_t userIp) {
  if (connObj != NULL) {
    tstrncpy(connObj->userApp, userApp, sizeof(connObj->userApp));
    connObj->userIp = userIp;
  }
}
147
// Create a connection object and put it into the connection cache.
//
// The connection id is generated by mndGenerateUid from the concatenation of user, ip, port,
// pid and app. When startTime is 0 the current timestamp is used as the app start time.
//
// Returns the cached object (to be handed back with mndReleaseConn), or NULL with terrno set
// to TSDB_CODE_OUT_OF_MEMORY when the cache insert fails.
static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType, uint32_t ip, uint16_t port,
                               int32_t pid, const char *app, int64_t startTime) {
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

  // NOTE: the format string is kept as-is so that generated connection ids do not change.
  char     connStr[255] = {0};
  int32_t  len = tsnprintf(connStr, sizeof(connStr), "%s%d%d%d%s", user, ip, port, pid, app);
  uint32_t connId = mndGenerateUid(connStr, len);
  if (startTime == 0) startTime = taosGetTimestampMs();

  SConnObj connObj = {
      .id = connId,
      .connType = connType,
      .appStartTimeMs = startTime,
      .pid = pid,
      .ip = ip,
      .port = port,
      .killed = 0,
      .loginTimeMs = taosGetTimestampMs(),
      .lastAccessTimeMs = 0,
      .killId = 0,
      .numOfQueries = 0,
      .pQueries = NULL,
  };

  connObj.lastAccessTimeMs = connObj.loginTimeMs;
  tstrncpy(connObj.user, user, TSDB_USER_LEN);
  tstrncpy(connObj.app, app, TSDB_APP_NAME_LEN);

  SConnObj *pConn =
      taosCachePut(pMgmt->connCache, &connId, sizeof(uint32_t), &connObj, sizeof(connObj), CACHE_OBJ_KEEP_TIME * 1000);
  if (pConn == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    // Argument order must follow the format string: error text first, then the user name.
    mError("conn:%u, failed to put into cache since %s, user:%s", connId, terrstr(), user);
    return NULL;
  } else {
    mTrace("conn:%u, is created, data:%p user:%s", pConn->id, pConn, user);
    return pConn;
  }
}
186

187
// Cache free callback for connection objects (registered in mndInitProfile): destroys the
// query descriptor array while holding the query write latch.
static void mndFreeConn(SConnObj *pConn) {
  SRWLatch *pLatch = &pConn->queryLock;

  taosWLockLatch(pLatch);
  taosArrayDestroyEx(pConn->pQueries, tFreeClientHbQueryDesc);
  taosWUnLockLatch(pLatch);

  mTrace("conn:%u, is destroyed, data:%p", pConn->id, pConn);
}
99,075✔
194

195
// Look up a connection by id in the connection cache and refresh its last-access time.
// Returns NULL when the id is not (or no longer) in the cache.
static SConnObj *mndAcquireConn(SMnode *pMnode, uint32_t connId) {
  SProfileMgmt *pProfile = &pMnode->profileMgmt;
  SConnObj     *pFound = taosCacheAcquireByKey(pProfile->connCache, &connId, sizeof(connId));

  if (pFound != NULL) {
    pFound->lastAccessTimeMs = taosGetTimestampMs();
    mTrace("conn:%u, acquired from cache, data:%p", pFound->id, pFound);
    return pFound;
  }

  mDebug("conn:%u, already destroyed", connId);
  return NULL;
}
208

209
// Hand a connection object back to the connection cache. When extendLifespan is true the
// cache is first asked to extend the entry's life span. A NULL pConn is ignored.
static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn, bool extendLifespan) {
  if (pConn != NULL) {
    mTrace("conn:%u, released from cache, data:%p", pConn->id, pConn);

    SProfileMgmt *pProfile = &pMnode->profileMgmt;
    if (extendLifespan) {
      taosCacheTryExtendLifeSpan(pProfile->connCache, (void **)&pConn);
    }
    taosCacheRelease(pProfile->connCache, (void **)&pConn, false);
  }
}
217

218
// Advance a connection-cache iterator. Returns the next cached object, or NULL after
// destroying the iterator when the iteration is exhausted.
void *mndGetNextConn(SMnode *pMnode, SCacheIter *pIter) {
  if (!taosCacheIterNext(pIter)) {
    taosCacheDestroyIter(pIter);
    return NULL;
  }

  size_t    dataLen = 0;
  SConnObj *pNext = taosCacheIterGetData(pIter, &dataLen);
  return pNext;
}
230

231
// SHOW free-iterator callback for the connections table: destroys a pending cache iterator.
static void mndCancelGetNextConn(SMnode *pMnode, void *pIter) {
  if (pIter == NULL) return;
  taosCacheDestroyIter(pIter);
}
×
236

237
// Handle TDMT_MND_CONNECT: authenticate a client and register a new connection.
//
// Steps: deserialize the request, check client/server version compatibility, check the
// connect privilege, acquire the user and compare the password, optionally validate the
// requested database, create the connection object, then build and serialize the SConnectRsp
// into pReq->info.rsp. A "login" audit record is written on success.
//
// Returns 0 on success or an error code; the user, db and connection references are released
// on every path through _OVER.
static int32_t mndProcessConnectReq(SRpcMsg *pReq) {
  SMnode         *pMnode = pReq->info.node;
  SUserObj       *pUser = NULL;
  SDbObj         *pDb = NULL;
  SConnObj       *pConn = NULL;
  int32_t         code = 0;
  SConnectReq     connReq = {0};
  char            ip[TD_IP_LEN] = {0};
  const STraceId *trace = &pReq->info.traceId;

  if ((code = tDeserializeSConnectReq(pReq->pCont, pReq->contLen, &connReq)) != 0) {
    goto _OVER;
  }

  if ((code = taosCheckVersionCompatibleFromStr(connReq.sVer, td_version, 3)) != 0) {
    mGError("version not compatible. client version: %s, server version: %s", connReq.sVer, td_version);
    goto _OVER;
  }

  taosInetNtoa(ip, pReq->info.conn.clientIp);
  if ((code = mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CONNECT)) != 0) {
    mGError("user:%s, failed to login from %s since %s", pReq->info.conn.user, ip, tstrerror(code));
    goto _OVER;
  }

  code = mndAcquireUser(pMnode, pReq->info.conn.user, &pUser);
  if (pUser == NULL) {
    mGError("user:%s, failed to login from %s while acquire user since %s", pReq->info.conn.user, ip, tstrerror(code));
    goto _OVER;
  }

  if (strncmp(connReq.passwd, pUser->pass, TSDB_PASSWORD_LEN - 1) != 0 && !tsMndSkipGrant) {
    // Do not write the supplied credential to the log.
    mGError("user:%s, failed to login from %s since invalid pass", pReq->info.conn.user, ip);
    code = TSDB_CODE_MND_AUTH_FAILURE;
    goto _OVER;
  }

  if (connReq.db[0]) {
    char db[TSDB_DB_FNAME_LEN] = {0};
    (void)snprintf(db, TSDB_DB_FNAME_LEN, "%d%s%s", pUser->acctId, TS_PATH_DELIMITER, connReq.db);
    pDb = mndAcquireDb(pMnode, db);
    if (pDb == NULL) {
      // The two system schemas are accepted even though mndAcquireDb returned NULL for them.
      if (0 != strcmp(connReq.db, TSDB_INFORMATION_SCHEMA_DB) &&
          (0 != strcmp(connReq.db, TSDB_PERFORMANCE_SCHEMA_DB))) {
        code = TSDB_CODE_MND_DB_NOT_EXIST;
        mGError("user:%s, failed to login from %s while use db:%s since %s", pReq->info.conn.user, ip, connReq.db,
                tstrerror(code));
        goto _OVER;
      }
    }

    TAOS_CHECK_GOTO(mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_READ_OR_WRITE_DB, pDb), NULL, _OVER);
  }

  pConn = mndCreateConn(pMnode, pReq->info.conn.user, connReq.connType, pReq->info.conn.clientIp,
                        pReq->info.conn.clientPort, connReq.pid, connReq.app, connReq.startTime);
  if (pConn == NULL) {
    code = terrno;
    mGError("user:%s, failed to login from %s while create connection since %s", pReq->info.conn.user, ip,
            tstrerror(code));
    goto _OVER;
  }

  SConnectRsp connectRsp = {0};
  connectRsp.acctId = pUser->acctId;
  connectRsp.superUser = pUser->superUser;
  connectRsp.sysInfo = pUser->sysInfo;
  connectRsp.clusterId = pMnode->clusterId;
  connectRsp.connId = pConn->id;
  connectRsp.connType = connReq.connType;
  connectRsp.dnodeNum = mndGetDnodeSize(pMnode);
  connectRsp.svrTimestamp = taosGetTimestampSec();
  connectRsp.passVer = pUser->passVersion;
  connectRsp.authVer = pUser->authVersion;
  connectRsp.monitorParas.tsEnableMonitor = tsEnableMonitor;
  connectRsp.monitorParas.tsMonitorInterval = tsMonitorInterval;
  connectRsp.monitorParas.tsSlowLogScope = tsSlowLogScope;
  connectRsp.monitorParas.tsSlowLogMaxLen = tsSlowLogMaxLen;
  connectRsp.monitorParas.tsSlowLogThreshold = tsSlowLogThreshold;
  connectRsp.enableAuditDelete = tsEnableAuditDelete;
  tstrncpy(connectRsp.monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb, TSDB_DB_NAME_LEN);
  connectRsp.whiteListVer = pUser->ipWhiteListVer;

  tstrncpy(connectRsp.sVer, td_version, sizeof(connectRsp.sVer));
  (void)snprintf(connectRsp.sDetailVer, sizeof(connectRsp.sDetailVer), "ver:%s\nbuild:%s\ngitinfo:%s", td_version,
                 td_buildinfo, td_gitinfo);
  mndGetMnodeEpSet(pMnode, &connectRsp.epSet);

  // First pass with a NULL buffer to obtain the encoded length, second pass to encode.
  int32_t contLen = tSerializeSConnectRsp(NULL, 0, &connectRsp);
  if (contLen < 0) {
    TAOS_CHECK_GOTO(contLen, NULL, _OVER);
  }
  void *pRsp = rpcMallocCont(contLen);
  if (pRsp == NULL) {
    TAOS_CHECK_GOTO(terrno, NULL, _OVER);
  }

  contLen = tSerializeSConnectRsp(pRsp, contLen, &connectRsp);
  if (contLen < 0) {
    rpcFreeCont(pRsp);
    TAOS_CHECK_GOTO(contLen, NULL, _OVER);
  }

  pReq->info.rspLen = contLen;
  pReq->info.rsp = pRsp;

  mGDebug("user:%s, login from %s:%d, conn:%u, app:%s", pReq->info.conn.user, ip, pConn->port, pConn->id, connReq.app);

  code = 0;

  char    detail[1000] = {0};
  int32_t nBytes = snprintf(detail, sizeof(detail), "app:%s", connReq.app);
  // The unsigned cast makes a negative snprintf result fail the bound check as well.
  if ((uint32_t)nBytes < sizeof(detail)) {
    auditRecord(pReq, pMnode->clusterId, "login", "", "", detail, strlen(detail));
  } else {
    mError("failed to audit login since %s", tstrerror(TSDB_CODE_OUT_OF_RANGE));
  }

_OVER:

  mndReleaseUser(pMnode, pUser);
  mndReleaseDb(pMnode, pDb);
  mndReleaseConn(pMnode, pConn, true);

  TAOS_RETURN(code);
}
363

364
// Replace the connection's query descriptor list with the one carried by the heartbeat.
// Ownership of pBasic->queryDesc moves to the connection (the request field is set to NULL);
// the previous list is destroyed. Runs under the connection's query write latch.
// Always returns TSDB_CODE_SUCCESS.
static int32_t mndSaveQueryList(SConnObj *pConn, SQueryHbReqBasic *pBasic) {
  taosWLockLatch(&pConn->queryLock);

  taosArrayDestroyEx(pConn->pQueries, tFreeClientHbQueryDesc);

  pConn->pQueries = pBasic->queryDesc;
  if (pBasic->queryDesc) {
    pConn->numOfQueries = taosArrayGetSize(pBasic->queryDesc);
  } else {
    pConn->numOfQueries = 0;
  }
  pBasic->queryDesc = NULL;

  mDebug("queries updated in conn %u, num:%d", pConn->id, pConn->numOfQueries);

  taosWUnLockLatch(&pConn->queryLock);

  return TSDB_CODE_SUCCESS;
}
379

380
// Create an app object from an app heartbeat request and put it into the app cache, keyed by
// the request's appId.
//
// Returns the cached object (to be handed back with mndReleaseApp), or NULL with terrno set
// to TSDB_CODE_OUT_OF_MEMORY when the cache insert fails.
static SAppObj *mndCreateApp(SMnode *pMnode, uint32_t clientIp, SAppHbReq *pReq) {
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

  // Zero-initialise first: the whole struct is byte-copied into the cache, so padding and the
  // unused tail of name[] must not carry indeterminate stack contents.
  SAppObj app = {0};
  app.appId = pReq->appId;
  app.ip = clientIp;
  app.pid = pReq->pid;
  tstrncpy(app.name, pReq->name, sizeof(app.name));
  app.startTime = pReq->startTime;
  (void)memcpy(&app.summary, &pReq->summary, sizeof(pReq->summary));
  app.lastAccessTimeMs = taosGetTimestampMs();

  SAppObj *pApp =
      taosCachePut(pMgmt->appCache, &pReq->appId, sizeof(pReq->appId), &app, sizeof(app), CACHE_OBJ_KEEP_TIME * 1000);
  if (pApp == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to put app %" PRIx64 " into cache since %s", pReq->appId, terrstr());
    return NULL;
  }

  mTrace("app %" PRIx64 " is put into cache", pReq->appId);
  return pApp;
}
403

404
// Cache free callback for app objects (registered in mndInitProfile); it only traces the event.
static void mndFreeApp(SAppObj *pApp) {
  mTrace("app %" PRIx64 " is destroyed", pApp->appId);
}
2,179✔
405

406
// Look up an app by id in the app cache and refresh its last-access time.
// terrno is reset to 0 on entry. Returns NULL when the app is not cached.
static SAppObj *mndAcquireApp(SMnode *pMnode, int64_t appId) {
  terrno = 0;
  SProfileMgmt *pProfile = &pMnode->profileMgmt;

  SAppObj *pFound = taosCacheAcquireByKey(pProfile->appCache, &appId, sizeof(appId));
  if (pFound != NULL) {
    pFound->lastAccessTimeMs = (uint64_t)taosGetTimestampMs();

    mTrace("app %" PRIx64 " acquired from cache", appId);
    return pFound;
  }

  mDebug("app %" PRIx64 " not in cache", appId);
  return NULL;
}
421

422
// Hand an app object back to the app cache. A NULL pApp is ignored.
static void mndReleaseApp(SMnode *pMnode, SAppObj *pApp) {
  if (pApp != NULL) {
    mTrace("release app %" PRIx64 " to cache", pApp->appId);

    SProfileMgmt *pProfile = &pMnode->profileMgmt;
    taosCacheRelease(pProfile->appCache, (void **)&pApp, false);
  }
}
429

430
// Advance an app-cache iterator. Returns the next cached app, or NULL after destroying the
// iterator when the iteration is exhausted.
SAppObj *mndGetNextApp(SMnode *pMnode, SCacheIter *pIter) {
  if (!taosCacheIterNext(pIter)) {
    taosCacheDestroyIter(pIter);
    return NULL;
  }

  size_t   dataLen = 0;
  SAppObj *pNext = taosCacheIterGetData(pIter, &dataLen);
  return pNext;
}
442

443
// SHOW free-iterator callback for the apps table: destroys a pending cache iterator.
static void mndCancelGetNextApp(SMnode *pMnode, void *pIter) {
  if (pIter == NULL) return;
  taosCacheDestroyIter(pIter);
}
×
448

449
// Placeholder: builds no response for the given heartbeat request and always returns NULL.
static SClientHbRsp *mndMqHbBuildRsp(SMnode *pMnode, SClientHbReq *pReq) {
  SClientHbRsp *pRsp = NULL;
  return pRsp;
}
453

454
// Record the app section of a client heartbeat in the app cache.
// If the app is already cached its summary is overwritten with the one in the request;
// otherwise a new cache entry is created from the request and the client ip.
// Returns TSDB_CODE_SUCCESS, or terrno / TSDB_CODE_MND_RETURN_VALUE_NULL when creation fails.
static int32_t mndUpdateAppInfo(SMnode *pMnode, SClientHbReq *pHbReq, SRpcConnInfo *connInfo) {
  int32_t    code = 0;
  SAppHbReq *pAppReq = &pHbReq->app;
  SAppObj   *pApp = mndAcquireApp(pMnode, pAppReq->appId);

  if (pApp != NULL) {
    // known app: refresh its summary
    (void)memcpy(&pApp->summary, &pAppReq->summary, sizeof(pAppReq->summary));
    mndReleaseApp(pMnode, pApp);
    return TSDB_CODE_SUCCESS;
  }

  pApp = mndCreateApp(pMnode, connInfo->clientIp, pAppReq);
  if (pApp == NULL) {
    mError("failed to create new app %" PRIx64 " since %s", pAppReq->appId, terrstr());
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  mDebug("a new app %" PRIx64 " is created", pAppReq->appId);
  mndReleaseApp(pMnode, pApp);
  return TSDB_CODE_SUCCESS;
}
478

479
// Walk every SDB_DNODE entry and add one to *num for each dnode that mndIsDnodeOnline reports
// as online at the current time. *num is incremented, not reset, so the caller supplies the
// starting value. Always returns TSDB_CODE_SUCCESS.
static int32_t mndGetOnlineDnodeNum(SMnode *pMnode, int32_t *num) {
  SSdb      *pSdb = pMnode->pSdb;
  SDnodeObj *pDnode = NULL;
  int64_t    nowMs = taosGetTimestampMs();
  void      *pCursor = NULL;

  for (;;) {
    pCursor = sdbFetch(pSdb, SDB_DNODE, pCursor, (void **)&pDnode);
    if (pCursor == NULL) {
      break;
    }

    if (mndIsDnodeOnline(pDnode, nowMs)) {
      (*num)++;
    }

    sdbRelease(pSdb, pDnode);
  }

  return TSDB_CODE_SUCCESS;
}
499

500
static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHbReq *pHbReq,
563,723✔
501
                                        SClientHbBatchRsp *pBatchRsp, SConnPreparedObj *pObj) {
502
  int32_t       code = 0;
563,723✔
503
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;
563,723✔
504
  SClientHbRsp  hbRsp = {.connKey = pHbReq->connKey, .status = 0, .info = NULL, .query = NULL};
563,723✔
505
  SRpcConnInfo  connInfo = pMsg->info.conn;
563,723✔
506

507
  if (0 != pHbReq->app.appId) {
563,723✔
508
    TAOS_CHECK_RETURN(mndUpdateAppInfo(pMnode, pHbReq, &connInfo));
557,775!
509
  }
510

511
  if (pHbReq->query) {
563,782✔
512
    SQueryHbReqBasic *pBasic = pHbReq->query;
557,774✔
513

514
    SConnObj *pConn = mndAcquireConn(pMnode, pBasic->connId);
557,774✔
515
    if (pConn == NULL) {
557,704✔
516
      pConn = mndCreateConn(pMnode, connInfo.user, CONN_TYPE__QUERY, connInfo.clientIp, connInfo.clientPort,
1,587✔
517
                            pHbReq->app.pid, pHbReq->app.name, 0);
1,587✔
518
      if (pConn == NULL) {
1,587!
519
        mError("user:%s, conn:%u is freed and failed to create new since %s", connInfo.user, pBasic->connId, terrstr());
×
520
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
521
        if (terrno != 0) code = terrno;
×
522
        TAOS_RETURN(code);
×
523
      } else {
524
        mDebug("user:%s, conn:%u is freed, will create a new conn:%u", connInfo.user, pBasic->connId, pConn->id);
1,587✔
525
      }
526
    }
527

528
    setUserInfo2Conn(pConn, pHbReq->userApp, pHbReq->userIp);
557,704✔
529
    SQueryHbRspBasic *rspBasic = taosMemoryCalloc(1, sizeof(SQueryHbRspBasic));
557,572!
530
    if (rspBasic == NULL) {
557,542!
531
      mndReleaseConn(pMnode, pConn, true);
×
532
      code = terrno;
×
533
      mError("user:%s, conn:%u failed to process hb while since %s", pConn->user, pBasic->connId, terrstr());
×
534
      TAOS_RETURN(code);
×
535
    }
536

537
    TAOS_CHECK_RETURN(mndSaveQueryList(pConn, pBasic));
557,542!
538
    if (pConn->killed != 0) {
557,788!
539
      rspBasic->killConnection = 1;
×
540
    }
541

542
    if (pConn->killId != 0) {
557,788!
543
      rspBasic->killRid = pConn->killId;
×
544
      pConn->killId = 0;
×
545
    }
546

547
    rspBasic->connId = pConn->id;
557,788✔
548
    rspBasic->connId = pConn->id;
557,788✔
549
    rspBasic->totalDnodes = pObj->totalDnodes;
557,788✔
550
    rspBasic->onlineDnodes = pObj->onlineDnodes;
557,788✔
551
    rspBasic->epSet = pObj->epSet;
557,788✔
552
    rspBasic->pQnodeList = taosArrayDup(pObj->pQnodeList, NULL);
557,788✔
553

554
    mndReleaseConn(pMnode, pConn, true);
557,819✔
555

556
    hbRsp.query = rspBasic;
557,848✔
557
  } else {
558
    mDebug("no query info in hb msg");
6,008✔
559
  }
560

561
  int32_t kvNum = taosHashGetSize(pHbReq->info);
563,856✔
562
  if (NULL == pHbReq->info || kvNum <= 0) {
563,751!
563
    if (taosArrayPush(pBatchRsp->rsps, &hbRsp) == NULL) {
615,039!
564
      mError("failed to put rsp into array, but continue at this heartbeat");
×
565
    }
566
    return TSDB_CODE_SUCCESS;
307,573✔
567
  }
568

569
  hbRsp.info = taosArrayInit(kvNum, sizeof(SKv));
256,285✔
570
  if (NULL == hbRsp.info) {
256,290!
571
    mError("taosArrayInit %d rsp kv failed", kvNum);
×
572
    code = terrno;
×
573
    tFreeClientHbRsp(&hbRsp);
574
    TAOS_RETURN(code);
×
575
  }
576

577
#ifdef TD_ENTERPRISE
578
  bool             needCheck = true;
256,290✔
579
  int32_t          key = HEARTBEAT_KEY_DYN_VIEW;
256,290✔
580
  SDynViewVersion *pDynViewVer = NULL;
256,290✔
581
  SKv             *pKv = taosHashGet(pHbReq->info, &key, sizeof(key));
256,290✔
582
  if (NULL != pKv) {
256,276✔
583
    pDynViewVer = pKv->value;
2✔
584
    mTrace("recv view dyn ver, bootTs:%" PRId64 ", ver:%" PRIu64, pDynViewVer->svrBootTs, pDynViewVer->dynViewVer);
2!
585

586
    SDynViewVersion *pRspVer = NULL;
2✔
587
    if (0 != (code = mndValidateDynViewVersion(pMnode, pDynViewVer, &needCheck, &pRspVer))) {
2!
588
      TAOS_RETURN(code);
×
589
    }
590

591
    if (needCheck) {
2!
592
      SKv kv1 = {.key = HEARTBEAT_KEY_DYN_VIEW, .valueLen = sizeof(*pDynViewVer), .value = pRspVer};
2✔
593
      if (taosArrayPush(hbRsp.info, &kv1) == NULL) {
4!
594
        if (terrno != 0) code = terrno;
×
595
        TAOS_RETURN(code);
×
596
      };
597
      mTrace("need to check view ver, lastest bootTs:%" PRId64 ", ver:%" PRIu64, pRspVer->svrBootTs,
2!
598
             pRspVer->dynViewVer);
599
    }
600
  }
601
#endif
602

603
  void *pIter = taosHashIterate(pHbReq->info, NULL);
256,276✔
604
  while (pIter != NULL) {
637,388✔
605
    SKv *kv = pIter;
381,094✔
606

607
    switch (kv->key) {
381,094!
608
      case HEARTBEAT_KEY_USER_AUTHINFO: {
256,261✔
609
        void   *rspMsg = NULL;
256,261✔
610
        int32_t rspLen = 0;
256,261✔
611
        (void)mndValidateUserAuthInfo(pMnode, kv->value, kv->valueLen / sizeof(SUserAuthVersion), &rspMsg, &rspLen,
256,261✔
612
                                      pObj->ipWhiteListVer);
613
        if (rspMsg && rspLen > 0) {
256,267!
614
          SKv kv1 = {.key = HEARTBEAT_KEY_USER_AUTHINFO, .valueLen = rspLen, .value = rspMsg};
17,208✔
615
          if (taosArrayPush(hbRsp.info, &kv1) == NULL) {
34,416!
616
            mError("failed to put kv into array, but continue at this heartbeat");
×
617
          }
618
        }
619
        break;
256,267✔
620
      }
621
      case HEARTBEAT_KEY_DBINFO: {
48,644✔
622
        void   *rspMsg = NULL;
48,644✔
623
        int32_t rspLen = 0;
48,644✔
624
        (void)mndValidateDbInfo(pMnode, kv->value, kv->valueLen / sizeof(SDbCacheInfo), &rspMsg, &rspLen);
48,644✔
625
        if (rspMsg && rspLen > 0) {
48,644!
626
          SKv kv1 = {.key = HEARTBEAT_KEY_DBINFO, .valueLen = rspLen, .value = rspMsg};
48,644✔
627
          if (taosArrayPush(hbRsp.info, &kv1) == NULL) {
97,288!
628
            mError("failed to put kv into array, but continue at this heartbeat");
×
629
          }
630
        }
631
        break;
48,644✔
632
      }
633
      case HEARTBEAT_KEY_STBINFO: {
75,776✔
634
        void   *rspMsg = NULL;
75,776✔
635
        int32_t rspLen = 0;
75,776✔
636
        (void)mndValidateStbInfo(pMnode, kv->value, kv->valueLen / sizeof(SSTableVersion), &rspMsg, &rspLen);
75,776✔
637
        if (rspMsg && rspLen > 0) {
75,776!
638
          SKv kv1 = {.key = HEARTBEAT_KEY_STBINFO, .valueLen = rspLen, .value = rspMsg};
75,776✔
639
          if (taosArrayPush(hbRsp.info, &kv1) == NULL) {
151,552!
640
            mError("failed to put kv into array, but continue at this heartbeat");
×
641
          }
642
        }
643
        break;
75,776✔
644
      }
645
#ifdef TD_ENTERPRISE
646
      case HEARTBEAT_KEY_DYN_VIEW: {
2✔
647
        break;
2✔
648
      }
649
      case HEARTBEAT_KEY_VIEWINFO: {
2✔
650
        if (!needCheck) {
2!
651
          break;
×
652
        }
653

654
        void   *rspMsg = NULL;
2✔
655
        int32_t rspLen = 0;
2✔
656
        (void)mndValidateViewInfo(pMnode, kv->value, kv->valueLen / sizeof(SViewVersion), &rspMsg, &rspLen);
2✔
657
        if (rspMsg && rspLen > 0) {
2!
658
          SKv kv1 = {.key = HEARTBEAT_KEY_VIEWINFO, .valueLen = rspLen, .value = rspMsg};
2✔
659
          if (taosArrayPush(hbRsp.info, &kv1) == NULL) {
4!
660
            mError("failed to put kv into array, but continue at this heartbeat");
×
661
          }
662
        }
663
        break;
2✔
664
      }
665
#endif
666
      case HEARTBEAT_KEY_TSMA: {
409✔
667
        void   *rspMsg = NULL;
409✔
668
        int32_t rspLen = 0;
409✔
669
        (void)mndValidateTSMAInfo(pMnode, kv->value, kv->valueLen / sizeof(STSMAVersion), &rspMsg, &rspLen);
409✔
670
        if (rspMsg && rspLen > 0) {
409!
671
          SKv kv = {.key = HEARTBEAT_KEY_TSMA, .valueLen = rspLen, .value = rspMsg};
409✔
672
          if (taosArrayPush(hbRsp.info, &kv) == NULL) {
818!
673
            mError("failed to put kv into array, but continue at this heartbeat");
×
674
          }
675
        }
676
        break;
409✔
677
      }
678
      default:
×
679
        mError("invalid kv key:%d", kv->key);
×
680
        hbRsp.status = TSDB_CODE_APP_ERROR;
×
681
        break;
×
682
    }
683

684
    pIter = taosHashIterate(pHbReq->info, pIter);
381,100✔
685
  }
686

687
  if (taosArrayPush(pBatchRsp->rsps, &hbRsp) == NULL) {
512,587!
688
    if (terrno != 0) code = terrno;
×
689
  }
690
  TAOS_RETURN(code);
256,293✔
691
}
692

693
/**
 * Handle a batched client heartbeat: deserialize the batch, build one response
 * per contained heartbeat (query or TMQ connection), serialize the batch
 * response and attach it to pReq->info.rsp / pReq->info.rspLen.
 *
 * @param pReq incoming RPC message; pReq->info.node is the mnode handle.
 * @return 0 on success, TSDB_CODE_INVALID_MSG when the payload cannot be
 *         deserialized, otherwise the error code of the failing step.
 */
static int32_t mndProcessHeartBeatReq(SRpcMsg *pReq) {
  int32_t code = 0;
  int32_t lino = 0;  // NOTE(review): not read here; presumably written by TAOS_CHECK_EXIT - confirm
  SMnode *pMnode = pReq->info.node;

  SClientHbBatchReq batchReq = {0};
  if (tDeserializeSClientHbBatchReq(pReq->pCont, pReq->contLen, &batchReq) != 0) {
    taosArrayDestroyEx(batchReq.reqs, tFreeClientHbReq);
    code = TSDB_CODE_INVALID_MSG;
    TAOS_RETURN(code);
  }

  // Cluster-wide facts shared by every heartbeat response in this batch.
  SConnPreparedObj obj = {0};
  obj.totalDnodes = mndGetDnodeSize(pMnode);
  obj.ipWhiteListVer = batchReq.ipWhiteList;
  code = mndGetOnlineDnodeNum(pMnode, &obj.onlineDnodes);
  if (code != 0) {
    // The request array is already allocated here; release it before bailing out.
    taosArrayDestroyEx(batchReq.reqs, tFreeClientHbReq);
    TAOS_RETURN(code);
  }
  mndGetMnodeEpSet(pMnode, &obj.epSet);
  code = mndCreateQnodeList(pMnode, &obj.pQnodeList, -1);
  if (code != 0) {
    taosArrayDestroyEx(batchReq.reqs, tFreeClientHbReq);
    TAOS_RETURN(code);
  }

  SClientHbBatchRsp batchRsp = {0};
  batchRsp.svrTimestamp = taosGetTimestampSec();
  batchRsp.rsps = taosArrayInit(0, sizeof(SClientHbRsp));
  if (batchRsp.rsps == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }
  batchRsp.monitorParas.tsEnableMonitor = tsEnableMonitor;
  batchRsp.monitorParas.tsMonitorInterval = tsMonitorInterval;
  batchRsp.monitorParas.tsSlowLogThreshold = tsSlowLogThreshold;
  tstrncpy(batchRsp.monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb, TSDB_DB_NAME_LEN);
  batchRsp.monitorParas.tsSlowLogMaxLen = tsSlowLogMaxLen;
  batchRsp.monitorParas.tsSlowLogScope = tsSlowLogScope;
  batchRsp.enableAuditDelete = tsEnableAuditDelete;
  batchRsp.enableStrongPass = tsEnableStrongPassword;

  int32_t sz = taosArrayGetSize(batchReq.reqs);
  for (int i = 0; i < sz; i++) {
    SClientHbReq *pHbReq = taosArrayGet(batchReq.reqs, i);
    if (pHbReq->connKey.connType == CONN_TYPE__QUERY) {
      TAOS_CHECK_EXIT(mndProcessQueryHeartBeat(pMnode, pReq, pHbReq, &batchRsp, &obj));
    } else if (pHbReq->connKey.connType == CONN_TYPE__TMQ) {
      SClientHbRsp *pRsp = mndMqHbBuildRsp(pMnode, pHbReq);
      if (pRsp != NULL) {
        // The array stores a copy of *pRsp, so the heap object is freed either way.
        if (taosArrayPush(batchRsp.rsps, pRsp) == NULL) {
          mError("failed to put kv into array, but continue at this heartbeat");
        }
        taosMemoryFree(pRsp);
      }
    }
  }

  // First pass with a NULL buffer only computes the encoded length.
  int32_t tlen = tSerializeSClientHbBatchRsp(NULL, 0, &batchRsp);
  if (tlen < 0) {
    TAOS_CHECK_EXIT(tlen);
  }
  void *buf = rpcMallocCont(tlen);
  if (!buf) {
    TAOS_CHECK_EXIT(terrno);
  }
  tlen = tSerializeSClientHbBatchRsp(buf, tlen, &batchRsp);
  if (tlen < 0) {
    rpcFreeCont(buf);
    TAOS_CHECK_EXIT(tlen);
  }
  pReq->info.rspLen = tlen;
  pReq->info.rsp = buf;

_exit:
  // Single release point: previously the request array was only destroyed on
  // the success path, so every jump to _exit leaked it.
  taosArrayDestroyEx(batchReq.reqs, tFreeClientHbReq);
  tFreeClientHbBatchRsp(&batchRsp);

  taosArrayDestroy(obj.pQnodeList);

  TAOS_RETURN(code);
}
766

767
/**
 * Handle a "kill query" request. The query string id has the form
 * "<connId-hex>:<queryId-hex>"; the target connection is looked up in the
 * connection cache and its killId is set to the parsed query id.
 *
 * @param pReq incoming RPC message carrying a serialized SKillQueryReq.
 * @return 0 on success, TSDB_CODE_MND_INVALID_QUERY_ID when the id has no ':',
 *         TSDB_CODE_MND_INVALID_CONN_ID when the connection is not cached, or
 *         the error from deserialization / the privilege check.
 */
static int32_t mndProcessKillQueryReq(SRpcMsg *pReq) {
  SMnode       *pMnode = pReq->info.node;
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;
  int32_t       code = 0;

  SKillQueryReq killReq = {0};
  TAOS_CHECK_RETURN(tDeserializeSKillQueryReq(pReq->pCont, pReq->contLen, &killReq));

  mInfo("kill query msg is received, queryId:%s", killReq.queryStrId);
  TAOS_CHECK_RETURN(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_KILL_QUERY));

  char *pSep = strchr(killReq.queryStrId, ':');
  if (pSep == NULL) {
    mError("invalid QID:%s", killReq.queryStrId);
    code = TSDB_CODE_MND_INVALID_QUERY_ID;
    TAOS_RETURN(code);
  }

  // Split in place: the part before ':' is the connection id, the rest the query id.
  *pSep = 0;
  int32_t  connId = taosStr2Int32(killReq.queryStrId, NULL, 16);
  uint64_t queryId = taosStr2UInt64(pSep + 1, NULL, 16);

  SConnObj *pConn = taosCacheAcquireByKey(pMgmt->connCache, &connId, sizeof(int32_t));
  if (pConn == NULL) {
    mError("connId:%x, failed to kill queryId:%" PRIx64 ", conn not exist", connId, queryId);
    code = TSDB_CODE_MND_INVALID_CONN_ID;
    TAOS_RETURN(code);
  }

  mInfo("connId:%x, queryId:%" PRIx64 " is killed by user:%s", connId, queryId, pReq->info.conn.user);
  pConn->killId = queryId;
  taosCacheRelease(pMgmt->connCache, (void **)&pConn, false);
  TAOS_RETURN(code);
}
802

803
/**
 * Handle a "kill connection" request by marking the cached connection object
 * as killed.
 *
 * @param pReq incoming RPC message carrying a serialized SKillConnReq.
 * @return 0 on success, TSDB_CODE_MND_INVALID_CONN_ID when the connection is
 *         not in the cache, or the error from deserialization / the privilege
 *         check.
 */
static int32_t mndProcessKillConnReq(SRpcMsg *pReq) {
  SMnode       *pMnode = pReq->info.node;
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;
  int32_t       code = 0;

  SKillConnReq killReq = {0};
  TAOS_CHECK_RETURN(tDeserializeSKillConnReq(pReq->pCont, pReq->contLen, &killReq));

  TAOS_CHECK_RETURN(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_KILL_CONN));

  SConnObj *pConn = taosCacheAcquireByKey(pMgmt->connCache, &killReq.connId, sizeof(uint32_t));
  if (pConn == NULL) {
    mError("connId:%u, failed to kill connection, conn not exist", killReq.connId);
    code = TSDB_CODE_MND_INVALID_CONN_ID;
    TAOS_RETURN(code);
  }

  mInfo("connId:%u, is killed by user:%s", killReq.connId, pReq->info.conn.user);
  pConn->killed = 1;
  taosCacheRelease(pMgmt->connCache, (void **)&pConn, false);
  TAOS_RETURN(code);
}
825

826
/**
 * Answer a server-version request: copy td_version into an SServerVerRsp,
 * serialize it into an RPC buffer and attach that buffer to pReq->info.
 *
 * @param pReq incoming RPC message; receives the response buffer and length.
 * @return 0 on success, otherwise the failing step's error code.
 */
static int32_t mndProcessSvrVerReq(SRpcMsg *pReq) {
  int32_t       lino = 0;
  int32_t       code = 0;
  SServerVerRsp verRsp = {0};
  tstrncpy(verRsp.ver, td_version, sizeof(verRsp.ver));

  // Sizing pass: a NULL buffer yields the encoded length only.
  int32_t encLen = tSerializeSServerVerRsp(NULL, 0, &verRsp);
  if (encLen < 0) {
    TAOS_CHECK_EXIT(encLen);
  }

  void *pCont = rpcMallocCont(encLen);
  if (NULL == pCont) {
    TAOS_CHECK_EXIT(terrno);
  }

  encLen = tSerializeSServerVerRsp(pCont, encLen, &verRsp);
  if (encLen < 0) {
    rpcFreeCont(pCont);
    TAOS_CHECK_EXIT(encLen);
  }

  pReq->info.rsp = pCont;
  pReq->info.rspLen = encLen;

_exit:

  TAOS_RETURN(code);
}
853

854
/**
 * Show-retrieve callback that writes one row per cached connection into
 * pBlock. Connections whose last access is older than CACHE_OBJ_KEEP_TIME
 * seconds are skipped.
 *
 * Columns, in order: conn id, user, app, pid, endpoint ("ip:port"),
 * login time, last access time, user app, user ip.
 *
 * @param pReq   RPC message; pReq->info.node is the mnode handle.
 * @param pShow  show object; pShow->pIter keeps the cache iterator between
 *               calls and pShow->numOfRows accumulates the rows produced.
 * @param pBlock output data block.
 * @param rows   maximum number of rows to write in this call.
 * @return number of rows written, or terrno / the colDataSetVal error code on
 *         failure.
 */
static int32_t mndRetrieveConns(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode   *pMnode = pReq->info.node;
  int32_t   numOfRows = 0;
  int32_t   cols = 0;
  int32_t   code = 0;
  SConnObj *pConn = NULL;

  if (pShow->pIter == NULL) {
    SProfileMgmt *pMgmt = &pMnode->profileMgmt;
    pShow->pIter = taosCacheCreateIter(pMgmt->connCache);
    if (!pShow->pIter) return terrno;
  }

  while (numOfRows < rows) {
    pConn = mndGetNextConn(pMnode, pShow->pIter);
    if (pConn == NULL) {
      pShow->pIter = NULL;
      break;
    }

    // Skip connections that have been idle longer than the cache keep time.
    if ((taosGetTimestampMs() - pConn->lastAccessTimeMs) > ((int64_t)CACHE_OBJ_KEEP_TIME * 1000)) {
      continue;
    }

    cols = 0;

    // conn id
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    code = colDataSetVal(pColInfo, numOfRows, (const char *)&pConn->id, false);
    if (code != 0) {
      mError("failed to set conn id:%u since %s", pConn->id, tstrerror(code));
      return code;
    }

    // user
    char user[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
    STR_TO_VARSTR(user, pConn->user);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    code = colDataSetVal(pColInfo, numOfRows, (const char *)user, false);
    if (code != 0) {
      mError("failed to set user since %s", tstrerror(code));
      return code;
    }

    // app
    char app[TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE];
    STR_TO_VARSTR(app, pConn->app);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    code = colDataSetVal(pColInfo, numOfRows, (const char *)app, false);
    if (code != 0) {
      mError("failed to set app since %s", tstrerror(code));
      return code;
    }

    // pid (the log used to claim the conn id column failed here)
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    code = colDataSetVal(pColInfo, numOfRows, (const char *)&pConn->pid, false);
    if (code != 0) {
      mError("failed to set pid since %s", tstrerror(code));
      return code;
    }

    // endpoint: dotted ip followed by ":port", stored as a varstr
    char endpoint[TD_IP_LEN + 6 + VARSTR_HEADER_SIZE] = {0};
    taosInetNtoa(varDataVal(endpoint), pConn->ip);
    (void)tsnprintf(varDataVal(endpoint) + strlen(varDataVal(endpoint)),
                    sizeof(endpoint) - VARSTR_HEADER_SIZE - strlen(varDataVal(endpoint)), ":%d", pConn->port);
    varDataLen(endpoint) = strlen(varDataVal(endpoint));
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    code = colDataSetVal(pColInfo, numOfRows, (const char *)endpoint, false);
    if (code != 0) {
      mError("failed to set endpoint since %s", tstrerror(code));
      return code;
    }

    // login time
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    code = colDataSetVal(pColInfo, numOfRows, (const char *)&pConn->loginTimeMs, false);
    if (code != 0) {
      mError("failed to set login time since %s", tstrerror(code));
      return code;
    }

    // last access time
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    code = colDataSetVal(pColInfo, numOfRows, (const char *)&pConn->lastAccessTimeMs, false);
    if (code != 0) {
      mError("failed to set last access time since %s", tstrerror(code));
      return code;
    }

    // user app
    char userApp[TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE];
    STR_TO_VARSTR(userApp, pConn->userApp);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    code = colDataSetVal(pColInfo, numOfRows, (const char *)userApp, false);
    if (code != 0) {
      mError("failed to set user app since %s", tstrerror(code));
      return code;
    }

    // user ip: left as an empty varstr when unset (0) or INADDR_NONE
    char userIp[TD_IP_LEN + 6 + VARSTR_HEADER_SIZE] = {0};
    if (pConn->userIp != 0 && pConn->userIp != INADDR_NONE) {
      taosInetNtoa(varDataVal(userIp), pConn->userIp);
      varDataLen(userIp) = strlen(varDataVal(userIp));
    }
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    code = colDataSetVal(pColInfo, numOfRows, (const char *)userIp, false);
    if (code != 0) {
      mError("failed to set user ip since %s", tstrerror(code));
      return code;
    }

    numOfRows++;
  }

  pShow->numOfRows += numOfRows;
  return numOfRows;
}
966

967
/**
968
 * @param pConn the conn queries pack from
969
 * @param[out] pBlock the block data packed into
970
 * @param offset skip [offset] queries in pConn
971
 * @param rowsToPack at most rows to pack
972
 * @return rows packed
973
 */
974
static int32_t packQueriesIntoBlock(SShowObj *pShow, SConnObj *pConn, SSDataBlock *pBlock, uint32_t offset,
629,446✔
975
                                    uint32_t rowsToPack) {
976
  int32_t cols = 0;
629,446✔
977
  int32_t code = 0;
629,446✔
978
  taosRLockLatch(&pConn->queryLock);
629,446✔
979
  int32_t numOfQueries = taosArrayGetSize(pConn->pQueries);
629,467✔
980
  if (NULL == pConn->pQueries || numOfQueries <= offset) {
629,379!
981
    taosRUnLockLatch(&pConn->queryLock);
137,301✔
982
    return 0;
137,259✔
983
  }
984

985
  int32_t i = offset;
492,078✔
986
  for (; i < numOfQueries && (i - offset) < rowsToPack; ++i) {
1,045,198!
987
    int32_t     curRowIndex = pBlock->info.rows;
554,273✔
988
    SQueryDesc *pQuery = taosArrayGet(pConn->pQueries, i);
554,273✔
989
    cols = 0;
553,957✔
990

991
    char queryId[26 + VARSTR_HEADER_SIZE] = {0};
553,957✔
992
    (void)tsnprintf(&queryId[VARSTR_HEADER_SIZE], sizeof(queryId) - VARSTR_HEADER_SIZE, "%x:%" PRIx64, pConn->id,
553,957✔
993
              pQuery->reqRid);
994
    varDataLen(queryId) = strlen(&queryId[VARSTR_HEADER_SIZE]);
554,315✔
995
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
554,315✔
996
    code = colDataSetVal(pColInfo, curRowIndex, (const char *)queryId, false);
553,831✔
997
    if (code != 0) {
553,445!
998
      mError("failed to set query id:%s since %s", queryId, tstrerror(code));
×
999
      taosRUnLockLatch(&pConn->queryLock);
×
1000
      return code;
×
1001
    }
1002

1003
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
553,445✔
1004
    code = colDataSetVal(pColInfo, curRowIndex, (const char *)&pQuery->queryId, false);
553,210✔
1005
    if (code != 0) {
553,117!
1006
      mError("failed to set query id:%" PRIx64 " since %s", pQuery->queryId, tstrerror(code));
×
1007
      taosRUnLockLatch(&pConn->queryLock);
×
1008
      return code;
×
1009
    }
1010

1011
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
553,117✔
1012
    code = colDataSetVal(pColInfo, curRowIndex, (const char *)&pConn->id, false);
552,848✔
1013
    if (code != 0) {
553,119!
1014
      mError("failed to set conn id:%u since %s", pConn->id, tstrerror(code));
×
1015
      taosRUnLockLatch(&pConn->queryLock);
×
1016
      return code;
×
1017
    }
1018

1019
    char app[TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE];
1020
    STR_TO_VARSTR(app, pConn->app);
553,119✔
1021
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
553,119✔
1022
    code = colDataSetVal(pColInfo, curRowIndex, (const char *)app, false);
552,931✔
1023
    if (code != 0) {
553,205!
1024
      mError("failed to set app since %s", tstrerror(code));
×
1025
      taosRUnLockLatch(&pConn->queryLock);
×
1026
      return code;
×
1027
    }
1028

1029
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
553,205✔
1030
    code = colDataSetVal(pColInfo, curRowIndex, (const char *)&pConn->pid, false);
552,967✔
1031
    if (code != 0) {
552,994!
1032
      mError("failed to set conn id:%u since %s", pConn->id, tstrerror(code));
×
1033
      taosRUnLockLatch(&pConn->queryLock);
×
1034
      return code;
×
1035
    }
1036

1037
    char user[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
552,994✔
1038
    STR_TO_VARSTR(user, pConn->user);
552,994✔
1039
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
552,994✔
1040
    code = colDataSetVal(pColInfo, curRowIndex, (const char *)user, false);
552,975✔
1041
    if (code != 0) {
553,393!
1042
      mError("failed to set user since %s", tstrerror(code));
×
1043
      taosRUnLockLatch(&pConn->queryLock);
×
1044
      return code;
×
1045
    }
1046

1047
    char endpoint[TD_IP_LEN + 6 + VARSTR_HEADER_SIZE] = {0};
553,393✔
1048
    taosInetNtoa(varDataVal(endpoint), pConn->ip);
553,393✔
1049
    (void)tsnprintf(varDataVal(endpoint) + strlen(varDataVal(endpoint)),
554,346✔
1050
              sizeof(endpoint) - VARSTR_HEADER_SIZE - strlen(varDataVal(endpoint)), ":%d", pConn->port);
554,346✔
1051
    varDataLen(endpoint) = strlen(&endpoint[VARSTR_HEADER_SIZE]);
554,342✔
1052
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
554,342✔
1053
    code = colDataSetVal(pColInfo, curRowIndex, (const char *)endpoint, false);
553,809✔
1054
    if (code != 0) {
553,210!
1055
      mError("failed to set endpoint since %s", tstrerror(code));
×
1056
      taosRUnLockLatch(&pConn->queryLock);
×
1057
      return code;
×
1058
    }
1059

1060
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
553,210✔
1061
    code = colDataSetVal(pColInfo, curRowIndex, (const char *)&pQuery->stime, false);
552,937✔
1062
    if (code != 0) {
552,856!
1063
      mError("failed to set start time since %s", tstrerror(code));
×
1064
      taosRUnLockLatch(&pConn->queryLock);
×
1065
      return code;
×
1066
    }
1067

1068
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
552,856✔
1069
    code = colDataSetVal(pColInfo, curRowIndex, (const char *)&pQuery->useconds, false);
552,582✔
1070
    if (code != 0) {
552,727!
1071
      mError("failed to set useconds since %s", tstrerror(code));
×
1072
      taosRUnLockLatch(&pConn->queryLock);
×
1073
      return code;
×
1074
    }
1075

1076
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
552,727✔
1077
    code = colDataSetVal(pColInfo, curRowIndex, (const char *)&pQuery->stableQuery, false);
552,495✔
1078
    if (code != 0) {
552,785!
1079
      mError("failed to set stable query since %s", tstrerror(code));
×
1080
      taosRUnLockLatch(&pConn->queryLock);
×
1081
      return code;
×
1082
    }
1083

1084
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
552,785✔
1085
    code = colDataSetVal(pColInfo, curRowIndex, (const char *)&pQuery->isSubQuery, false);
552,548✔
1086
    if (code != 0) {
552,790!
1087
      mError("failed to set sub query since %s", tstrerror(code));
×
1088
      taosRUnLockLatch(&pConn->queryLock);
×
1089
      return code;
×
1090
    }
1091

1092
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
552,790✔
1093
    code = colDataSetVal(pColInfo, curRowIndex, (const char *)&pQuery->subPlanNum, false);
552,506✔
1094
    if (code != 0) {
554,055!
1095
      mError("failed to set sub plan num since %s", tstrerror(code));
×
1096
      taosRUnLockLatch(&pConn->queryLock);
×
1097
      return code;
×
1098
    }
1099

1100
    char    subStatus[TSDB_SHOW_SUBQUERY_LEN + VARSTR_HEADER_SIZE] = {0};
554,055✔
1101
    int64_t reserve = 64;
554,055✔
1102
    int32_t strSize = sizeof(subStatus);
554,055✔
1103
    int32_t offset = VARSTR_HEADER_SIZE;
554,055✔
1104
    for (int32_t i = 0; i < pQuery->subPlanNum && offset + reserve < strSize; ++i) {
1,880,147!
1105
      if (i) {
1,326,162✔
1106
        offset += tsnprintf(subStatus + offset, sizeof(subStatus) - offset, ",");
772,613✔
1107
      }
1108
      if (offset + reserve < strSize) {
1,326,256!
1109
        SQuerySubDesc *pDesc = taosArrayGet(pQuery->subDesc, i);
1,326,256✔
1110
        offset +=
1,326,092✔
1111
            tsnprintf(subStatus + offset, sizeof(subStatus) - offset, "%" PRIu64 ":%s", pDesc->tid, pDesc->status);
1,323,821✔
1112
      } else {
1113
        break;
×
1114
      }
1115
    }
1116
    varDataLen(subStatus) = strlen(&subStatus[VARSTR_HEADER_SIZE]);
553,985✔
1117
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
553,985✔
1118
    code = colDataSetVal(pColInfo, curRowIndex, subStatus, (varDataLen(subStatus) == 0) ? true : false);
553,542✔
1119
    if (code != 0) {
553,064!
1120
      mError("failed to set sub status since %s", tstrerror(code));
×
1121
      taosRUnLockLatch(&pConn->queryLock);
×
1122
      return code;
×
1123
    }
1124

1125
    char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
553,064✔
1126
    STR_TO_VARSTR(sql, pQuery->sql);
553,064✔
1127
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
553,064✔
1128
    code = colDataSetVal(pColInfo, curRowIndex, (const char *)sql, false);
553,402✔
1129
    if (code != 0) {
552,464!
1130
      mError("failed to set sql since %s", tstrerror(code));
×
1131
      taosRUnLockLatch(&pConn->queryLock);
×
1132
      return code;
×
1133
    }
1134

1135
    char userApp[TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE];
1136
    STR_TO_VARSTR(userApp, pConn->userApp);
552,464✔
1137
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
552,464✔
1138
    code = colDataSetVal(pColInfo, curRowIndex, (const char *)userApp, false);
552,436✔
1139
    if (code != 0) {
552,935!
1140
      mError("failed to set user app since %s", tstrerror(code));
×
1141
      taosRUnLockLatch(&pConn->queryLock);
×
1142
      return code;
×
1143
    }
1144

1145
    char userIp[TD_IP_LEN + 6 + VARSTR_HEADER_SIZE] = {0};
552,935✔
1146
    if (pConn->userIp != 0 && pConn->userIp != INADDR_NONE) {
552,935!
1147
      taosInetNtoa(varDataVal(userIp), pConn->userIp);
×
1148
      varDataLen(userIp) = strlen(varDataVal(userIp));
×
1149
    }
1150
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
552,935✔
1151
    code = colDataSetVal(pColInfo, curRowIndex, (const char *)userIp, false);
552,842✔
1152
    if (code != 0) {
553,120!
1153
      mError("failed to set user ip since %s", tstrerror(code));
×
1154
      taosRUnLockLatch(&pConn->queryLock);
×
1155
      return code;
×
1156
    }
1157

1158
    pBlock->info.rows++;
553,120✔
1159
  }
1160

1161
  taosRUnLockLatch(&pConn->queryLock);
490,925✔
1162
  return i - offset;
492,146✔
1163
}
1164

1165
/**
 * Show-retrieve callback that fills pBlock with the running queries of every
 * cached connection. When a connection has more queries than fit in one call,
 * pShow->curIterPackedRows records how many of its queries were already
 * emitted so the next call resumes on the same connection.
 *
 * @param pReq   RPC message; pReq->info.node is the mnode handle.
 * @param pShow  show object holding the cache iterator and resume offset.
 * @param pBlock output data block.
 * @param rows   maximum number of rows to write in this call.
 * @return number of rows written, terrno when the iterator cannot be created,
 *         or the negative result of packQueriesIntoBlock.
 */
static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode   *pMnode = pReq->info.node;
  int32_t   numOfRows = 0;
  SConnObj *pConn = NULL;

  if (pShow->pIter == NULL) {
    SProfileMgmt *pMgmt = &pMnode->profileMgmt;
    pShow->pIter = taosCacheCreateIter(pMgmt->connCache);
    if (!pShow->pIter) return terrno;
  }

  // means fetched some data last time for this conn
  if (pShow->curIterPackedRows > 0) {
    size_t len = 0;
    pConn = taosCacheIterGetData(pShow->pIter, &len);
    if (pConn && (taosArrayGetSize(pConn->pQueries) > pShow->curIterPackedRows)) {
      int32_t packedRows = packQueriesIntoBlock(pShow, pConn, pBlock, pShow->curIterPackedRows, rows);
      // packQueriesIntoBlock returns a colDataSetVal error code instead of a row
      // count on failure; a negative value must not be added to the counters.
      // NOTE(review): assumes error codes are negative - confirm.
      if (packedRows < 0) {
        return packedRows;
      }
      numOfRows = packedRows;
      pShow->curIterPackedRows += packedRows;
    }
  }

  while (numOfRows < rows) {
    pConn = mndGetNextConn(pMnode, pShow->pIter);
    if (pConn == NULL) {
      pShow->pIter = NULL;
      break;
    }

    int32_t packedRows = packQueriesIntoBlock(pShow, pConn, pBlock, 0, rows - numOfRows);
    if (packedRows < 0) {
      return packedRows;
    }
    pShow->curIterPackedRows = packedRows;
    numOfRows += packedRows;
  }
  pShow->numOfRows += numOfRows;
  return numOfRows;
}
1201

1202
/**
 * Show-retrieve callback that writes one row per cached application object
 * into pBlock.
 *
 * Columns, in order: app id, ip, pid, name, start time, insert requests,
 * insert rows, insert elapsed time, insert bytes, fetch bytes, query elapsed
 * time, slow queries, total requests, current requests, last access time.
 *
 * @param pReq   RPC message; pReq->info.node is the mnode handle.
 * @param pShow  show object; pShow->pIter keeps the cache iterator between
 *               calls and pShow->numOfRows accumulates the rows produced.
 * @param pBlock output data block.
 * @param rows   maximum number of rows to write in this call.
 * @return number of rows written, or terrno / the colDataSetVal error code on
 *         failure.
 */
static int32_t mndRetrieveApps(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode *pMnode = pReq->info.node;
  int32_t code = 0;
  int32_t nRows = 0;

  // Open the cache iterator when no scan is in progress.
  if (NULL == pShow->pIter) {
    pShow->pIter = taosCacheCreateIter(pMnode->profileMgmt.appCache);
    if (NULL == pShow->pIter) return terrno;
  }

  for (; nRows < rows; ++nRows) {
    SAppObj *pApp = mndGetNextApp(pMnode, pShow->pIter);
    if (NULL == pApp) {
      pShow->pIter = NULL;
      break;
    }

    int32_t          colIdx = 0;
    SColumnInfoData *pCol = NULL;

    // app id
    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    code = colDataSetVal(pCol, nRows, (const char *)&pApp->appId, false);
    if (code != 0) {
      mError("failed to set app id since %s", tstrerror(code));
      return code;
    }

    // ip, rendered as text into a varstr
    char ip[TD_IP_LEN + VARSTR_HEADER_SIZE] = {0};
    taosInetNtoa(varDataVal(ip), pApp->ip);
    varDataLen(ip) = strlen(varDataVal(ip));
    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    code = colDataSetVal(pCol, nRows, (const char *)ip, false);
    if (code != 0) {
      mError("failed to set ip since %s", tstrerror(code));
      return code;
    }

    // pid
    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    code = colDataSetVal(pCol, nRows, (const char *)&pApp->pid, false);
    if (code != 0) {
      mError("failed to set pid since %s", tstrerror(code));
      return code;
    }

    // application name
    char name[TSDB_APP_NAME_LEN + 6 + VARSTR_HEADER_SIZE] = {0};
    (void)tsnprintf(&name[VARSTR_HEADER_SIZE], sizeof(name) - VARSTR_HEADER_SIZE, "%s", pApp->name);
    varDataLen(name) = strlen(&name[VARSTR_HEADER_SIZE]);
    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    code = colDataSetVal(pCol, nRows, (const char *)name, false);
    if (code != 0) {
      mError("failed to set app name since %s", tstrerror(code));
      return code;
    }

    // start time
    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    code = colDataSetVal(pCol, nRows, (const char *)&pApp->startTime, false);
    if (code != 0) {
      mError("failed to set start time since %s", tstrerror(code));
      return code;
    }

    // summary counters
    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    code = colDataSetVal(pCol, nRows, (const char *)&pApp->summary.numOfInsertsReq, false);
    if (code != 0) {
      mError("failed to set insert req since %s", tstrerror(code));
      return code;
    }

    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    code = colDataSetVal(pCol, nRows, (const char *)&pApp->summary.numOfInsertRows, false);
    if (code != 0) {
      mError("failed to set insert rows since %s", tstrerror(code));
      return code;
    }

    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    code = colDataSetVal(pCol, nRows, (const char *)&pApp->summary.insertElapsedTime, false);
    if (code != 0) {
      mError("failed to set insert elapsed time since %s", tstrerror(code));
      return code;
    }

    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    code = colDataSetVal(pCol, nRows, (const char *)&pApp->summary.insertBytes, false);
    if (code != 0) {
      mError("failed to set insert bytes since %s", tstrerror(code));
      return code;
    }

    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    code = colDataSetVal(pCol, nRows, (const char *)&pApp->summary.fetchBytes, false);
    if (code != 0) {
      mError("failed to set fetch bytes since %s", tstrerror(code));
      return code;
    }

    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    code = colDataSetVal(pCol, nRows, (const char *)&pApp->summary.queryElapsedTime, false);
    if (code != 0) {
      mError("failed to set query elapsed time since %s", tstrerror(code));
      return code;
    }

    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    code = colDataSetVal(pCol, nRows, (const char *)&pApp->summary.numOfSlowQueries, false);
    if (code != 0) {
      mError("failed to set slow queries since %s", tstrerror(code));
      return code;
    }

    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    code = colDataSetVal(pCol, nRows, (const char *)&pApp->summary.totalRequests, false);
    if (code != 0) {
      mError("failed to set total requests since %s", tstrerror(code));
      return code;
    }

    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    code = colDataSetVal(pCol, nRows, (const char *)&pApp->summary.currentRequests, false);
    if (code != 0) {
      mError("failed to set current requests since %s", tstrerror(code));
      return code;
    }

    // last access time
    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    code = colDataSetVal(pCol, nRows, (const char *)&pApp->lastAccessTimeMs, false);
    if (code != 0) {
      mError("failed to set last access time since %s", tstrerror(code));
      return code;
    }
  }

  pShow->numOfRows += nRows;
  return nRows;
}
1342

1343
/**
 * Cancel an in-progress show scan by destroying its cache iterator.
 *
 * @param pMnode mnode handle (not used by this function).
 * @param pIter  cache iterator to destroy; NULL is ignored.
 */
static void mndCancelGetNextQuery(SMnode *pMnode, void *pIter) {
  if (NULL == pIter) {
    return;
  }
  taosCacheDestroyIter(pIter);
}
×
1348

1349
/**
 * Report how many objects the connection cache currently holds.
 *
 * @param pMnode mnode handle whose profile manager owns the connection cache.
 * @return the value reported by taosCacheGetNumOfObj for the connection cache.
 */
int32_t mndGetNumOfConnections(SMnode *pMnode) {
  return taosCacheGetNumOfObj(pMnode->profileMgmt.connCache);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc