• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4933

20 Jan 2026 10:44AM UTC coverage: 66.671% (+0.03%) from 66.646%
#4933

push

travis-ci

web-flow
merge: from main to 3.0 #34340

73 of 178 new or added lines in 9 files covered. (41.01%)

1199 existing lines in 124 files now uncovered.

203121 of 304663 relevant lines covered (66.67%)

132228377.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

32.45
/source/client/src/clientMonitor.c
1
#include "clientMonitor.h"
2
#include "cJSON.h"
3
#include "clientInt.h"
4
#include "clientLog.h"
5
#include "osEnv.h"
6
#include "query.h"
7
#include "taos.h"
8
#include "taoserror.h"
9
#include "tarray.h"
10
#include "tglobal.h"
11
#include "thash.h"
12
#include "tmisce.h"
13
#include "tqueue.h"
14
#include "ttime.h"
15
#include "ttimer.h"
16

17
// Latch guarding the per-cluster monitor state: read-locked by the timer callback,
// write-locked by the create/counter functions in this file.
SRWLatch    monitorLock;
18
// NOTE(review): not referenced in the visible part of this file; presumably guards monitorQueue - confirm.
SRWLatch    monitorQueueLock;
19
// Timer controller handle passed to taosTmrStart/taosTmrReset for the report timers.
void*       monitorTimer;
20
// clusterId (int64_t) -> MonitorClient* map.
SHashObj*   monitorCounterHash;
21
// Set to 1 when the monitor is quitting; checked by the timer callback, counter inc and worker thread.
int32_t     monitorFlag = 0;
22
// Monotonic ms timestamp compared by the worker thread to bound the quit phase to 1000ms.
int64_t     quitTime = 0;
23
// Number of slow-log clients that finished draining in SLOW_LOG_READ_QUIT mode.
int32_t     quitCnt = 0;
24
// Semaphore the worker thread waits on (100 timeout) between queue polls.
tsem2_t     monitorSem;
25
// Queue of MonitorSlowLogData items consumed by the worker thread.
STaosQueue* monitorQueue;
26
// clusterId (int64_t) -> SlowLogClient* map for the slow-log file of the running process.
SHashObj*   monitorSlowLogHash;
27
// file path -> SlowLogClient* map for leftover slow-log files found in the temp dir.
SHashObj*   monitorSlowLogHashPath;
28
// Directory handed to monitorWriteSlowLog2File when creating a new slow-log file.
char        tmpSlowLogPath[PATH_MAX] = {0};
29
// NOTE(review): presumably the handle of the thread running monitorThreadFunc - confirm.
TdThread    monitorThread;
30
extern bool tsEnableAuditDelete;
31

32
static int32_t getSlowLogTmpDir(char* tmpPath, int32_t size) {
1,239,978✔
33
  if (strlen(tsTempDir) == 0) {
1,239,978✔
34
    return TSDB_CODE_INVALID_PARA;
×
35
  }
36
  bool hasDelim = (tsTempDir[strlen(tsTempDir) - 1] == TD_DIRSEP_CHAR) ? true : false;
1,239,978✔
37
  int ret = tsnprintf(tmpPath, size, "%s%stdengine_slow_log%s", tsTempDir, hasDelim ? "" : TD_DIRSEP, TD_DIRSEP);
1,239,978✔
38
  if (ret < 0) {
1,239,978✔
39
    tscError("failed to get tmp path ret:%d", ret);
×
40
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
41
  }
42
  return 0;
1,239,978✔
43
}
44

UNCOV
45
static void destroySlowLogClient(void* data) {
×
46
  if (data == NULL) {
×
47
    return;
×
48
  }
UNCOV
49
  SlowLogClient* slowLogClient = *(SlowLogClient**)data;
×
50
  if (taosCloseFile(&slowLogClient->pFile) != 0) {
×
51
    tscError("%s monitor failed to close file:%s, terrno:%d", __func__, slowLogClient->path, terrno);
×
52
  }
UNCOV
53
  slowLogClient->pFile = NULL;
×
54

UNCOV
55
  taosMemoryFree(slowLogClient);
×
56
}
57

58
static void destroyMonitorClient(void* data) {
1,234,879✔
59
  if (data == NULL) {
1,234,879✔
60
    return;
×
61
  }
62
  MonitorClient* pMonitor = *(MonitorClient**)data;
1,234,879✔
63
  if (pMonitor == NULL) {
1,234,879✔
UNCOV
64
    return;
×
65
  }
66
  if (!taosTmrStopA(&pMonitor->timer)) {
1,234,879✔
67
    tscError("failed to stop timer, pMonitor:%p", pMonitor);
191,333✔
68
  }
69
  taosHashCleanup(pMonitor->counters);
1,234,879✔
70
  int ret = taos_collector_registry_destroy(pMonitor->registry);
1,234,879✔
71
  if (ret) {
1,234,879✔
UNCOV
72
    tscError("failed to destroy registry, pMonitor:%p ret:%d", pMonitor, ret);
×
73
  }
74
  taosMemoryFree(pMonitor);
1,234,879✔
75
}
76

77
static void monitorFreeSlowLogData(void* paras) {
1,235,064✔
78
  MonitorSlowLogData* pData = (MonitorSlowLogData*)paras;
1,235,064✔
79
  if (pData == NULL) {
1,235,064✔
80
    return;
926✔
81
  }
82
  taosMemoryFreeClear(pData->data);
1,234,138✔
83
}
84

85
static void monitorFreeSlowLogDataEx(void* paras) {
926✔
86
  monitorFreeSlowLogData(paras);
926✔
87
  taosMemoryFree(paras);
926✔
88
}
926✔
89

90
static SAppInstInfo* getAppInstByClusterId(int64_t clusterId) {
4,674,889✔
91
  void* p = taosHashGet(appInfo.pInstMapByClusterId, &clusterId, LONG_BYTES);
4,674,889✔
92
  if (p == NULL) {
4,674,900✔
UNCOV
93
    tscError("failed to get app inst, clusterId:0x%" PRIx64, clusterId);
×
UNCOV
94
    return NULL;
×
95
  }
96
  return *(SAppInstInfo**)p;
4,674,900✔
97
}
98

99
/*
 * Completion callback for a report sent by sendReport.
 *
 * param: NULL for counter reports, or the MonitorSlowLogData attached to a slow-log report.
 * pMsg:  response buffer; its pData and pEpSet are freed here.
 * code:  send result; non-zero values are only logged.
 *
 * For slow-log reports a copy of the descriptor (with data cleared) is pushed back onto
 * the monitor queue. Always returns TSDB_CODE_SUCCESS.
 */
static int32_t monitorReportAsyncCB(void* param, SDataBuf* pMsg, int32_t code) {
  if (TSDB_CODE_SUCCESS != code) {
    tscError("found error in monitorReport send callback, code:%d, please check the network.", code);
  }

  if (pMsg != NULL) {
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
  }

  if (param == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  MonitorSlowLogData* sent = (MonitorSlowLogData*)param;
  if (code != 0) {
    tscError("failed to send slow log:%s, clusterId:0x%" PRIx64, sent->data, sent->clusterId);
  }

  // queue a follow-up item carrying the same metadata but no payload
  MonitorSlowLogData followUp = *sent;
  followUp.data = NULL;
  (void)monitorPutData2MonitorQueue(followUp);

  return TSDB_CODE_SUCCESS;
}
118

119
/*
 * Serialize pCont into an SStatisReq and send it asynchronously as TDMT_MND_STATIS.
 *
 * pTransporter/epSet: transport handle and endpoint set passed to asyncSendMsgToServer.
 * pCont: NUL-terminated report content; not freed on the success path.
 * type:  report type stored in the request.
 * param: optional MonitorSlowLogData* handed to the callback; on any failure before the
 *        send it is released with monitorFreeSlowLogDataEx (which also frees param->data).
 *
 * Returns the result of asyncSendMsgToServer, or terrno captured at the failing step.
 */
static int32_t sendReport(void* pTransporter, SEpSet* epSet, char* pCont, MONITOR_TYPE type, void* param) {
  int32_t       code = 0;
  void*         buf = NULL;
  SMsgSendInfo* pInfo = NULL;
  // zero-initialize so fields not set below are never serialized as stack garbage
  SStatisReq    sStatisReq = {0};
  sStatisReq.pCont = pCont;
  sStatisReq.contLen = strlen(pCont);
  sStatisReq.type = type;

  // first pass with a NULL buffer only computes the encoded length
  int tlen = tSerializeSStatisReq(NULL, 0, &sStatisReq);
  if (tlen < 0) {
    code = terrno;
    goto FAILED;
  }
  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) {
    tscError("sendReport failed, out of memory, len:%d", tlen);
    code = terrno;
    goto FAILED;
  }
  tlen = tSerializeSStatisReq(buf, tlen, &sStatisReq);
  if (tlen < 0) {
    code = terrno;
    goto FAILED;
  }

  pInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (pInfo == NULL) {
    tscError("sendReport failed, out of memory send info");
    code = terrno;
    goto FAILED;
  }
  pInfo->fp = monitorReportAsyncCB;
  pInfo->msgInfo.pData = buf;
  pInfo->msgInfo.len = tlen;
  pInfo->msgType = TDMT_MND_STATIS;
  pInfo->param = param;
  pInfo->paramFreeFp = monitorFreeSlowLogDataEx;
  pInfo->requestId = tGenIdPI64();
  pInfo->requestObjRefId = 0;

  return asyncSendMsgToServer(pTransporter, epSet, NULL, pInfo);

FAILED:
  taosMemoryFree(buf);
  monitorFreeSlowLogDataEx(param);
  return code;
}
167

168
/*
 * Render the registry's metrics to a string and send them as a counter report.
 * The registry batch is cleared only when non-empty content was sent successfully.
 */
static void generateClusterReport(taos_collector_registry_t* registry, void* pTransporter, SEpSet* epSet) {
  char stamp[50] = {0};
  (void)snprintf(stamp, sizeof(stamp), "%" PRId64, taosGetTimestamp(TSDB_TIME_PRECISION_MILLI));

  char* report = (char*)taos_collector_registry_bridge_new(registry, stamp, "%" PRId64, NULL);
  if (report == NULL) {
    tscError("generateClusterReport failed, get null content. since %s", tstrerror(terrno));
    return;
  }

  // sendReport is only attempted for non-empty content
  bool delivered = (strlen(report) != 0) && (sendReport(pTransporter, epSet, report, MONITOR_TYPE_COUNTER, NULL) == 0);
  if (delivered) {
    int rc = taos_collector_registry_clear_batch(registry);
    if (rc != 0) {
      tscError("failed to clear registry, ret:%d", rc);
    }
  }

  taosMemoryFreeClear(report);
}
185

186
static void reportSendProcess(void* param, void* tmrId) {
981,551✔
187
  taosRLockLatch(&monitorLock);
981,551✔
188
  if (atomic_load_32(&monitorFlag) == 1) {
981,551✔
189
    taosRUnLockLatch(&monitorLock);
10,547✔
190
    return;
10,547✔
191
  }
192

193
  MonitorClient* pMonitor = (MonitorClient*)param;
971,004✔
194
  SAppInstInfo*  pInst = getAppInstByClusterId(pMonitor->clusterId);
971,004✔
195
  if (pInst == NULL) {
971,004✔
UNCOV
196
    taosRUnLockLatch(&monitorLock);
×
UNCOV
197
    return;
×
198
  }
199

200
  SEpSet ep = getEpSet_s(&pInst->mgmtEp);
971,004✔
201
  generateClusterReport(pMonitor->registry, pInst->pTransporter, &ep);
971,004✔
202
  bool reset = taosTmrReset(reportSendProcess, pInst->serverCfg.monitorParas.tsMonitorInterval * 1000, param,
971,004✔
203
                            monitorTimer, &tmrId);
204
  tscDebug("reset timer, pMonitor:%p, %d", pMonitor, reset);
971,004✔
205
  taosRUnLockLatch(&monitorLock);
971,004✔
206
}
207

208
static void sendAllCounter() {
1,238,691✔
209
  MonitorClient** ppMonitor = NULL;
1,238,691✔
210
  while ((ppMonitor = taosHashIterate(monitorCounterHash, ppMonitor))) {
2,473,570✔
211
    MonitorClient* pMonitor = *ppMonitor;
1,234,879✔
212
    if (pMonitor == NULL) {
1,234,879✔
UNCOV
213
      continue;
×
214
    }
215
    SAppInstInfo* pInst = getAppInstByClusterId(pMonitor->clusterId);
1,234,879✔
216
    if (pInst == NULL) {
1,234,879✔
UNCOV
217
      taosHashCancelIterate(monitorCounterHash, ppMonitor);
×
UNCOV
218
      break;
×
219
    }
220
    SEpSet ep = getEpSet_s(&pInst->mgmtEp);
1,234,879✔
221
    generateClusterReport(pMonitor->registry, pInst->pTransporter, &ep);
1,234,879✔
222
  }
223
}
1,238,691✔
224

225
void monitorCreateClient(int64_t clusterId) {
2,469,758✔
226
  MonitorClient* pMonitor = NULL;
2,469,758✔
227
  taosWLockLatch(&monitorLock);
2,469,758✔
228
  if (taosHashGet(monitorCounterHash, &clusterId, LONG_BYTES) == NULL) {
2,469,758✔
229
    tscInfo("clusterId:0x%" PRIx64 ", create monitor", clusterId);
1,234,879✔
230
    pMonitor = taosMemoryCalloc(1, sizeof(MonitorClient));
1,234,879✔
231
    if (pMonitor == NULL) {
1,234,879✔
UNCOV
232
      tscError("failed to create monitor client");
×
UNCOV
233
      goto fail;
×
234
    }
235
    pMonitor->clusterId = clusterId;
1,234,879✔
236
    char clusterKey[32] = {0};
1,234,879✔
237
    if (snprintf(clusterKey, sizeof(clusterKey), "%" PRId64, clusterId) < 0) {
1,234,879✔
UNCOV
238
      tscError("failed to create cluster key");
×
UNCOV
239
      goto fail;
×
240
    }
241
    pMonitor->registry = taos_collector_registry_new(clusterKey);
1,234,879✔
242
    if (pMonitor->registry == NULL) {
1,234,879✔
UNCOV
243
      tscError("failed to create registry");
×
UNCOV
244
      goto fail;
×
245
    }
246
    pMonitor->colector = taos_collector_new(clusterKey);
1,234,879✔
247
    if (pMonitor->colector == NULL) {
1,234,879✔
UNCOV
248
      tscError("failed to create collector");
×
UNCOV
249
      goto fail;
×
250
    }
251

252
    int r = taos_collector_registry_register_collector(pMonitor->registry, pMonitor->colector);
1,234,879✔
253
    if (r) {
1,234,879✔
UNCOV
254
      tscError("failed to register collector, ret:%d", r);
×
UNCOV
255
      goto fail;
×
256
    }
257
    pMonitor->counters =
1,243,775✔
258
        (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
1,234,879✔
259
    if (pMonitor->counters == NULL) {
1,234,879✔
UNCOV
260
      tscError("failed to create monitor counters");
×
261
      goto fail;
×
262
    }
263

264
    if (taosHashPut(monitorCounterHash, &clusterId, LONG_BYTES, &pMonitor, POINTER_BYTES) != 0) {
1,234,879✔
UNCOV
265
      tscError("failed to put monitor client to hash");
×
266
      goto fail;
×
267
    }
268

269
    SAppInstInfo* pInst = getAppInstByClusterId(clusterId);
1,234,879✔
270
    if (pInst == NULL) {
1,234,879✔
UNCOV
271
      tscError("failed to get app instance by cluster id");
×
272
      pMonitor = NULL;
×
273
      goto fail;
×
274
    }
275
    pMonitor->timer = taosTmrStart(reportSendProcess, pInst->serverCfg.monitorParas.tsMonitorInterval * 1000,
1,234,879✔
276
                                   (void*)pMonitor, monitorTimer);
277
    if (pMonitor->timer == NULL) {
1,234,879✔
278
      tscError("failed to start timer");
×
279
      goto fail;
×
280
    }
281
    tscInfo("clusterId:0x%" PRIx64 ", create monitor finished, monitor:%p", clusterId, pMonitor);
1,234,879✔
282
  }
283
  taosWUnLockLatch(&monitorLock);
2,469,758✔
284

285
  return;
2,469,758✔
286

UNCOV
287
fail:
×
UNCOV
288
  destroyMonitorClient(&pMonitor);
×
289
  taosWUnLockLatch(&monitorLock);
×
290
}
291

292
void monitorCreateClientCounter(int64_t clusterId, const char* name, const char* help, size_t label_key_count,
2,469,758✔
293
                                const char** label_keys) {
294
  taosWLockLatch(&monitorLock);
2,469,758✔
295
  MonitorClient** ppMonitor = (MonitorClient**)taosHashGet(monitorCounterHash, &clusterId, LONG_BYTES);
2,469,758✔
296
  if (ppMonitor == NULL || *ppMonitor == NULL) {
2,469,758✔
297
    tscError("failed to get monitor client");
×
UNCOV
298
    goto end;
×
299
  }
300
  taos_counter_t* newCounter = taos_counter_new(name, help, label_key_count, label_keys);
2,469,758✔
301
  if (newCounter == NULL) return;
2,469,758✔
302
  MonitorClient* pMonitor = *ppMonitor;
2,469,758✔
303
  if (taos_collector_add_metric(pMonitor->colector, newCounter) != 0) {
2,469,758✔
UNCOV
304
    tscError("failed to add metric to collector");
×
305
    int r = taos_counter_destroy(newCounter);
×
306
    if (r) {
×
307
      tscError("failed to destroy counter, code:%d", r);
×
308
    }
UNCOV
309
    goto end;
×
310
  }
311
  if (taosHashPut(pMonitor->counters, name, strlen(name), &newCounter, POINTER_BYTES) != 0) {
2,469,758✔
UNCOV
312
    tscError("failed to put counter to monitor");
×
UNCOV
313
    int r = taos_counter_destroy(newCounter);
×
UNCOV
314
    if (r) {
×
315
      tscError("failed to destroy counter, code:%d", r);
×
316
    }
UNCOV
317
    goto end;
×
318
  }
319
  tscInfo("clusterId:0x%" PRIx64 ", monitor:%p, create counter:%s %p", pMonitor->clusterId, pMonitor, name, newCounter);
2,469,758✔
320

321
end:
2,451,966✔
322
  taosWUnLockLatch(&monitorLock);
2,469,758✔
323
}
324

325
void monitorCounterInc(int64_t clusterId, const char* counterName, const char** label_values) {
150,777✔
326
  taosWLockLatch(&monitorLock);
150,777✔
327
  if (atomic_load_32(&monitorFlag) == 1) {
150,777✔
328
    taosWUnLockLatch(&monitorLock);
7✔
329
    return;
7✔
330
  }
331

332
  MonitorClient** ppMonitor = (MonitorClient**)taosHashGet(monitorCounterHash, &clusterId, LONG_BYTES);
150,770✔
333
  if (ppMonitor == NULL || *ppMonitor == NULL) {
150,770✔
UNCOV
334
    tscError("clusterId:0x%" PRIx64 ", monitor not found", clusterId);
×
335
    goto end;
×
336
  }
337

338
  MonitorClient*   pMonitor = *ppMonitor;
150,770✔
339
  taos_counter_t** ppCounter = (taos_counter_t**)taosHashGet(pMonitor->counters, counterName, strlen(counterName));
150,770✔
340
  if (ppCounter == NULL || *ppCounter == NULL) {
150,770✔
UNCOV
341
    tscError("clusterId:0x%" PRIx64 ", monitor:%p counter:%s not found", clusterId, pMonitor, counterName);
×
UNCOV
342
    goto end;
×
343
  }
344
  if (taos_counter_inc(*ppCounter, label_values) != 0) {
150,770✔
UNCOV
345
    tscError("clusterId:0x%" PRIx64 ", monitor:%p counter:%s inc failed", clusterId, pMonitor, counterName);
×
UNCOV
346
    goto end;
×
347
  }
348
  tscTrace("clusterId:0x%" PRIx64 ", monitor:%p, counter:%s inc", pMonitor->clusterId, pMonitor, counterName);
150,770✔
349

350
end:
150,770✔
351
  taosWUnLockLatch(&monitorLock);
150,770✔
352
}
353

354
/*
 * Map an SQL result code to its label string.
 *
 * Codes 0..2 map to "Success", "Failed" and "Cancel". Any other value (including a
 * negative one, which becomes a large index after the unsigned cast) returns "Unknown"
 * instead of reading past the table.
 */
const char* monitorResultStr(SQL_RESULT_CODE code) {
  static const char* result_state[] = {"Success", "Failed", "Cancel"};
  size_t             idx = (size_t)code;
  if (idx >= sizeof(result_state) / sizeof(result_state[0])) {
    return "Unknown";
  }
  return result_state[idx];
}
358

359
/*
 * Append one slow-log record (including its terminating NUL) to the per-cluster file.
 *
 * slowLogData: record to write; clusterId selects the file, data is the payload.
 * tmpPath:     directory in which a new slow-log file is created when none is cached.
 *
 * On first use for a cluster the file is created, wrapped in a SlowLogClient, stored in
 * monitorSlowLogHash and locked. Later calls reuse the cached client. All failures are
 * logged and make the function return without writing.
 */
static void monitorWriteSlowLog2File(MonitorSlowLogData* slowLogData, char* tmpPath) {
  TdFilePtr      pFile = NULL;
  SlowLogClient* pClient = NULL;
  void*          tmp = taosHashGet(monitorSlowLogHash, &slowLogData->clusterId, LONG_BYTES);
  if (tmp == NULL) {
    char path[PATH_MAX] = {0};
    char clusterId[32] = {0};
    // file name suffix: "<clusterId hex>-<ns timestamp>"
    if (snprintf(clusterId, sizeof(clusterId), "%" PRIx64"-%"PRId64, slowLogData->clusterId, taosGetTimestampNs()) < 0) {
      tscError("failed to generate clusterId:0x%" PRIx64, slowLogData->clusterId);
      return;
    }
    taosGetTmpfilePath(tmpPath, clusterId, path);
    tscInfo("monitor create slow log file:%s", path);
    pFile = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_READ | TD_FILE_TRUNC);
    if (pFile == NULL) {
      tscError("failed to open file:%s since %d", path, terrno);
      return;
    }

    pClient = taosMemoryCalloc(1, sizeof(SlowLogClient));
    if (pClient == NULL) {
      tscError("failed to allocate memory for slow log client");
      int32_t ret = taosCloseFile(&pFile);
      if (ret != 0) {
        tscError("failed to close file:%p ret:%d", pFile, ret);
      }
      return;
    }
    tstrncpy(pClient->path, path, PATH_MAX);
    pClient->sendOffset = 0;
    pClient->pFile = pFile;
    pClient->clusterId = slowLogData->clusterId;
    pClient->type = SLOW_LOG_READ_RUNNING;
    if (taosHashPut(monitorSlowLogHash, &pClient->clusterId, LONG_BYTES, &pClient, POINTER_BYTES) != 0) {
      tscError("failed to put clusterId:0x%" PRIx64 " to hash table", pClient->clusterId);
      int32_t ret = taosCloseFile(&pFile);
      if (ret != 0) {
        tscError("failed to close file:%p ret:%d", pFile, ret);
      }
      taosMemoryFree(pClient);
      return;
    }

    if (taosLockFile(pFile) < 0) {
      tscError("failed to lock file:%p since %s", pFile, terrstr());
      return;
    }
  } else {
    pClient = *(SlowLogClient**)tmp;
    if (pClient == NULL) {
      // same guard as the other monitorSlowLogHash readers in this file
      tscError("failed to get slow log client by clusterId:0x%" PRIx64, slowLogData->clusterId);
      return;
    }
    pFile = pClient->pFile;
  }

  if (taosLSeekFile(pFile, 0, SEEK_END) < 0) {
    tscError("failed to seek file:%p code:%d", pFile, terrno);
    return;
  }
  // the trailing NUL is written too; readFile uses it as the record separator
  if (taosWriteFile(pFile, slowLogData->data, strlen(slowLogData->data) + 1) < 0) {
    tscError("failed to write len to file:%p since %s", pFile, terrstr());
    return;  // do not report a successful write below
  }
  tscDebug("monitor write slow log to file:%s, clusterId:0x%" PRIx64 ", data:%s", pClient->path, pClient->clusterId, slowLogData->data);
}
420

UNCOV
421
/*
 * Read the next batch of slow-log records from pClient's file and return them as a
 * heap-allocated string of the form "[rec1,rec2,...]".
 *
 * Records are stored as NUL-terminated strings. Reading starts at pClient->sendOffset
 * and covers at most SLOW_LOG_SEND_SIZE_MAX bytes. pClient->sendOffset is advanced past
 * every complete record placed in the result.
 *
 * Returns NULL when seeking fails, when size <= sendOffset (terrno is set to
 * TSDB_CODE_TSC_INTERNAL_ERROR), when allocation fails, or when nothing could be read.
 * The caller owns the returned buffer.
 */
static char* readFile(SlowLogClient* pClient) {
  tscDebug("monitor readFile slow begin file:%s, offset:%" PRId64 ", size:%" PRId64, pClient->path,
           pClient->sendOffset, pClient->size);
  if (taosLSeekFile(pClient->pFile, pClient->sendOffset, SEEK_SET) < 0) {
    // path is a string; the original logged it with %p
    tscError("failed to seek file:%s code:%d", pClient->path, terrno);
    return NULL;
  }

  if ((pClient->size <= pClient->sendOffset)) {
    tscError("invalid size:%" PRId64 ", offset:%" PRId64, pClient->size, pClient->sendOffset);
    terrno = TSDB_CODE_TSC_INTERNAL_ERROR;
    return NULL;
  }
  char*   pCont = NULL;
  int64_t totalSize = 0;
  if (pClient->size - pClient->sendOffset >= SLOW_LOG_SEND_SIZE_MAX) {
    totalSize = 4 + SLOW_LOG_SEND_SIZE_MAX;
  } else {
    totalSize = 4 + (pClient->size - pClient->sendOffset);
  }

  pCont = taosMemoryCalloc(1, totalSize);  // 4 reserved for []
  if (pCont == NULL) {
    tscError("failed to allocate memory for slow log, size:%" PRId64, totalSize);
    return NULL;
  }
  char* buf = pCont;
  // the buffer is zero-filled, so storing '[' directly equals appending it to the empty string
  *buf++ = '[';
  int64_t readSize = taosReadFile(pClient->pFile, buf, totalSize - 4);  // 4 reserved for []
  if (readSize <= 0) {
    if (readSize < 0) {
      tscError("failed to read len from file:%s since %s", pClient->path, terrstr());
    }
    taosMemoryFree(pCont);
    return NULL;
  }

  // from here totalSize counts the bytes of the read data consumed so far
  totalSize = 0;
  while (1) {
    size_t len = strlen(buf);
    if (len == SLOW_LOG_SEND_SIZE_MAX) {  // one item is too long
      // skip the rest of the file and emit an empty list
      pClient->sendOffset = pClient->size;
      *buf = ']';
      *(buf + 1) = '\0';
      break;
    }

    totalSize += (len + 1);
    if (totalSize > readSize) {
      // current item runs past the data read: close the list over the preceding byte
      *(buf - 1) = ']';
      *buf = '\0';
      break;
    }

    if (len == 0) {             // one item is empty
      if (*(buf - 1) == '[') {  // data is "\0"
        // no data read
        *buf = ']';
        *(buf + 1) = '\0';
      } else {  // data is "ass\0\0"
        *(buf - 1) = ']';
        *buf = '\0';
      }
      pClient->sendOffset += 1;
      break;
    }
    buf[len] = ',';  // replace '\0' with ','
    buf += (len + 1);
    pClient->sendOffset += (len + 1);
  }

  tscDebugL("monitor readFile slow log end, data:%s, offset:%" PRId64, pCont, pClient->sendOffset);
  return pCont;
}
495

496
/*
 * Recreate a slow-log file that no longer exists on disk: close the stale handle,
 * reopen pClient->path with create/truncate flags and reset the cached size to 0.
 * When reopening fails pClient->pFile stays NULL.
 */
static void processFileRemoved(SlowLogClient* pClient) {
  int32_t closeRc = taosCloseFile(&(pClient->pFile));
  if (closeRc != 0) {
    tscError("%s failed to close file:%s ret:%d", __func__, pClient->path, closeRc);
  }
  pClient->pFile = NULL;

  const int openFlags = TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_READ | TD_FILE_TRUNC;
  TdFilePtr reopened = taosOpenFile(pClient->path, openFlags);
  if (reopened != NULL) {
    pClient->pFile = reopened;
  } else {
    tscError("%s failed to open file:%s since %d", __func__, pClient->path, terrno);
  }

  pClient->size = 0;
}
×
512

513
/*
 * Refresh pClient->size from the file system.
 *
 * When the stat fails with ENOENT the file is recreated via processFileRemoved and 0 is
 * returned. Otherwise the taosStatFile result is returned unchanged.
 */
static int32_t processFile(SlowLogClient* pClient) {
  int32_t statRc = taosStatFile(pClient->path, &pClient->size, NULL, NULL);
  if (statRc >= 0 || ERRNO != ENOENT) {
    return statRc;
  }

  processFileRemoved(pClient);
  tscError("%s monitor rebuild file:%s because of file not exist", __func__, pClient->path);
  return 0;
}
523

UNCOV
524
static int32_t sendSlowLog(int64_t clusterId, char* data, SLOW_LOG_QUEUE_TYPE type, char* fileName, void* pTransporter,
×
525
                           SEpSet* epSet) {
526
  if (data == NULL) {
×
UNCOV
527
    return TSDB_CODE_INVALID_PARA;
×
528
  }
529
  MonitorSlowLogData* pParam = taosMemoryCalloc(1, sizeof(MonitorSlowLogData));
×
530
  if (pParam == NULL) {
×
531
    taosMemoryFree(data);
×
UNCOV
532
    return terrno;
×
533
  }
534
  pParam->data = data;
×
535
  pParam->type = type;
×
UNCOV
536
  pParam->fileName = fileName;
×
537
  pParam->clusterId = clusterId;
×
538
  return sendReport(pTransporter, epSet, data, MONITOR_TYPE_SLOW_LOG, pParam);
×
539
}
540

541
/*
 * Read the next batch from pClient's file and send it to the cluster's mgmt endpoints.
 *
 * Returns terrno when the app instance is missing or nothing could be read, otherwise
 * the sendSlowLog result. On a send failure sendOffset is reset to 0.
 */
static int32_t monitorReadSend(SlowLogClient* pClient) {
  SAppInstInfo* inst = getAppInstByClusterId(pClient->clusterId);
  if (inst == NULL) {
    tscError("%s failed to get app instance by clusterId:0x%" PRIx64, __func__, pClient->clusterId);
    return terrno;
  }

  SEpSet mgmtEps = getEpSet_s(&inst->mgmtEp);
  char*  batch = readFile(pClient);
  if (batch == NULL) {
    return terrno;
  }

  int32_t sendRc = sendSlowLog(pClient->clusterId, batch, pClient->type, pClient->path, inst->pTransporter, &mgmtEps);
  if (sendRc == 0) {
    return sendRc;
  }

  pClient->sendOffset = 0;  // set offset = 0 to send data at beginning if send data failed
  tscError("monitor send slow log failed, clusterId:0x%" PRIx64 ", ret:%d", pClient->clusterId, sendRc);
  return sendRc;
}
557

558
/*
 * Advance the send state machine of one slow-log client.
 *
 * If unsent data remains (size > sendOffset) the next batch is read and sent, unless the
 * client is already closed. Otherwise the file is fully sent: a RUNNING client's
 * non-empty file is truncated to 0, a BEGINNIG/QUIT client's file is closed and removed,
 * sendOffset is reset, and a QUIT client is marked closed and counted in quitCnt.
 */
static void sendOneClient(SlowLogClient* pClient) {
  int32_t code = processFile(pClient);
  if (code < 0) {
    tscError("failed to get file size for file:%s, code:%d, errno:%d", pClient->path, code, errno);
    return;
  }

  if (pClient->size > pClient->sendOffset) {
    // still data to send
    if (pClient->closed) {
      tscWarn("%s client is closed, skip send slow log, file:%s", __func__, pClient->path);
      return;
    }
    code = monitorReadSend(pClient);
    tscDebug("%s monitor send slow log, file:%s code:%d", __func__, pClient->path, code);
    return;
  }

  // everything in the file has been sent
  bool running = (pClient->type == SLOW_LOG_READ_RUNNING);
  if (running && pClient->size > 0) {
    if (taosFtruncateFile(pClient->pFile, 0) < 0) {
      tscError("failed to truncate file:%s code:%d", pClient->path, terrno);
    }
    tscDebug("monitor truncate file to 0 file:%s", pClient->path);
  } else if (pClient->type == SLOW_LOG_READ_BEGINNIG || pClient->type == SLOW_LOG_READ_QUIT) {
    if (taosCloseFile(&pClient->pFile) != 0) {
      tscError("failed to close file:%s ret:%d", pClient->path, terrno);
    }
    pClient->pFile = NULL;
    if (taosRemoveFile(pClient->path) != 0) {
      tscError("failed to remove file:%s, terrno:%d", pClient->path, terrno);
    }
    tscInfo("monitor remove file:%s when send data out at beginning", pClient->path);
  }

  pClient->sendOffset = 0;
  if (pClient->type == SLOW_LOG_READ_QUIT) {
    pClient->closed = true;
    quitCnt++;
  }
}
595

596
static void monitorSendSlowLogAtRunningCb(int64_t clusterId) {
×
597
  int32_t code = 0;
×
UNCOV
598
  void*   tmp = taosHashGet(monitorSlowLogHash, &clusterId, LONG_BYTES);
×
599
  if (tmp == NULL) {
×
600
    tscError("failed to get slow log client by clusterId:0x%" PRIx64, clusterId);
×
UNCOV
601
    return;
×
602
  }
603
  SlowLogClient* pClient = (*(SlowLogClient**)tmp);
×
UNCOV
604
  if (pClient == NULL) {
×
UNCOV
605
    tscError("failed to get slow log client by clusterId:0x%" PRIx64, clusterId);
×
UNCOV
606
    return;
×
607
  }
608
  sendOneClient(pClient);
×
609
}
610

UNCOV
611
static void monitorSendSlowLogAtBeginningCb(const char* fileName) {
×
612
  int32_t code = 0;
×
613
  void*   tmp = taosHashGet(monitorSlowLogHashPath, fileName, strlen(fileName));
×
614
  if (tmp == NULL) {
×
UNCOV
615
    tscError("failed to get slow log client by fileName:%s", fileName);
×
616
    return;
×
617
  }
618
  SlowLogClient* pClient = (*(SlowLogClient**)tmp);
×
619
  if (pClient == NULL) {
×
620
    tscError("failed to get slow log client by fileName:%s", fileName);
×
621
    return;
×
622
  }
UNCOV
623
  sendOneClient(pClient);
×
624
}
625

626
static void monitorSendAllSlowLog() {
357,613,987✔
627
  int32_t code = 0;
357,613,987✔
628
  int64_t t = taosGetMonoTimestampMs();
357,613,987✔
629
  void*   pIter = NULL;
357,613,987✔
630
  while ((pIter = taosHashIterate(monitorSlowLogHash, pIter))) {
357,613,987✔
UNCOV
631
    int64_t*       clusterId = (int64_t*)taosHashGetKey(pIter, NULL);
×
UNCOV
632
    SAppInstInfo*  pInst = getAppInstByClusterId(*clusterId);
×
633
    SlowLogClient* pClient = (*(SlowLogClient**)pIter);
×
634
    if (pClient == NULL || pInst == NULL) {
×
635
      taosHashCancelIterate(monitorSlowLogHash, pIter);
×
UNCOV
636
      return;
×
637
    }
638
    if (t - pClient->lastSendTime > pInst->serverCfg.monitorParas.tsMonitorInterval * 1000 ||
×
639
        atomic_load_32(&monitorFlag) == 1) {
×
640
      pClient->lastSendTime = t;
×
641
    } else {
642
      continue;
×
643
    }
644

645
    if (atomic_load_32(&monitorFlag) == 1) {  // change type to quit
×
646
      pClient->type = SLOW_LOG_READ_QUIT;
×
647
    }
UNCOV
648
    tscDebug("monitor send slow log for clusterId:0x%" PRIx64 ", file:%s, type:%d", *clusterId, pClient->path, pClient->type);
×
UNCOV
649
    if (pClient->sendOffset > 0) {  // already in sending process
×
UNCOV
650
      continue;
×
651
    }
652
    sendOneClient(pClient);
×
653
  }
654
}
655

656
static void monitorSendAllSlowLogFromTempDir(int64_t clusterId) {
1,234,138✔
657
  SAppInstInfo* pInst = getAppInstByClusterId((int64_t)clusterId);
1,234,138✔
658

659
  if (pInst == NULL || !pInst->serverCfg.monitorParas.tsEnableMonitor) {
1,234,138✔
660
    tscInfo("monitor is disabled, skip send slow log");
1,232,808✔
661
    return;
1,232,808✔
662
  }
663
  char namePrefix[PATH_MAX] = {0};
1,330✔
664
  if (snprintf(namePrefix, sizeof(namePrefix), "%s%" PRIx64, TD_TMP_FILE_PREFIX, clusterId) < 0) {
1,330✔
665
    tscError("failed to generate slow log file name prefix");
×
666
    return;
×
667
  }
668

669
  char tmpPath[PATH_MAX] = {0};
1,330✔
670
  if (getSlowLogTmpDir(tmpPath, sizeof(tmpPath)) < 0) {
1,330✔
UNCOV
671
    return;
×
672
  }
673

674
  TdDirPtr pDir = taosOpenDir(tmpPath);
1,330✔
675
  if (pDir == NULL) {
1,330✔
676
    return;
×
677
  }
678

679
  TdDirEntryPtr de = NULL;
1,330✔
680
  while ((de = taosReadDir(pDir)) != NULL) {
3,990✔
681
    if (taosDirEntryIsDir(de)) {
2,660✔
682
      continue;
2,660✔
683
    }
684

UNCOV
685
    char* name = taosGetDirEntryName(de);
×
686
    if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0 || strstr(name, namePrefix) == NULL) {
×
UNCOV
687
      tscInfo("skip file:%s, for cluster id:%" PRIx64, name, clusterId);
×
UNCOV
688
      continue;
×
689
    }
690

691
    char filename[PATH_MAX] = {0};
×
692
    (void)snprintf(filename, sizeof(filename), "%s%s", tmpPath, name);
×
693
    int64_t fileSize = 0;
×
694
    if (taosStatFile(filename, &fileSize, NULL, NULL) < 0) {
×
695
      tscError("failed to get file:%s status, since %s", filename, terrstr());
×
UNCOV
696
      continue;
×
697
    }
698
    if (fileSize == 0) {
×
UNCOV
699
      if (taosRemoveFile(filename) != 0) {
×
700
        tscError("failed to remove file:%s, terrno:%d", filename, terrno);
×
701
      }
UNCOV
702
      continue;
×
703
    }
704

UNCOV
705
    tscInfo("%s monitor opening file:%s at beginning", __func__, filename);
×
UNCOV
706
    TdFilePtr pFile = taosOpenFile(filename, TD_FILE_READ | TD_FILE_WRITE);
×
UNCOV
707
    if (pFile == NULL) {
×
UNCOV
708
      tscError("failed to open file:%s since %s", filename, terrstr());
×
UNCOV
709
      continue;
×
710
    }
UNCOV
711
    if (taosLockFile(pFile) < 0) {
×
UNCOV
712
      tscInfo("failed to lock file:%s since %s, maybe used by other process", filename, terrstr());
×
UNCOV
713
      int32_t ret = taosCloseFile(&pFile);
×
UNCOV
714
      if (ret != 0) {
×
715
        tscError("failed to close file:%p ret:%d", pFile, ret);
×
716
      }
UNCOV
717
      continue;
×
718
    }
719

UNCOV
720
    SlowLogClient* pClient = taosMemoryCalloc(1, sizeof(SlowLogClient));
×
721
    if (pClient == NULL) {
×
UNCOV
722
      tscError("failed to allocate memory for slow log client");
×
UNCOV
723
      int32_t ret = taosCloseFile(&pFile);
×
UNCOV
724
      if (ret != 0) {
×
UNCOV
725
        tscError("failed to close file:%p ret:%d", pFile, ret);
×
726
      }
UNCOV
727
      return;
×
728
    }
UNCOV
729
    tstrncpy(pClient->path, filename, PATH_MAX);
×
UNCOV
730
    pClient->sendOffset = 0;
×
UNCOV
731
    pClient->pFile = pFile;
×
UNCOV
732
    pClient->clusterId = clusterId;
×
UNCOV
733
    pClient->type = SLOW_LOG_READ_BEGINNIG;
×
UNCOV
734
    if (taosHashPut(monitorSlowLogHashPath, filename, strlen(filename), &pClient, POINTER_BYTES) != 0) {
×
735
      tscError("failed to put clusterId:0x%" PRIx64 " to hash table", pClient->clusterId);
×
736
      int32_t ret = taosCloseFile(&pFile);
×
737
      if (ret != 0) {
×
738
        tscError("failed to close file:%s ret:%d", filename, ret);
×
739
      }
UNCOV
740
      taosMemoryFree(pClient);
×
741
      return;
×
742
    }
743
    sendOneClient(pClient);
×
744
  }
745

746
  int32_t ret = taosCloseDir(&pDir);
1,330✔
747
  if (ret != 0) {
1,330✔
748
    tscError("failed to close dir, ret:%d", ret);
×
749
  }
750
}
751

752
static void* monitorThreadFunc(void* param) {
1,238,648✔
753
  setThreadName("client-monitor-slowlog");
1,238,648✔
754
  tscInfo("monitor update thread started");
1,238,648✔
755
  while (1) {
357,613,987✔
756
    if (atomic_load_32(&monitorFlag) == 1) {
358,852,635✔
757
      if (taosGetMonoTimestampMs() - quitTime > 1000 ||
1,238,648✔
758
          quitCnt == taosHashGetSize(monitorSlowLogHash)) {  // quit at most 1000ms or no data need to send
1,238,648✔
759
        tscInfo("monitorThreadFunc quit since timeout or quitcnt:%d", quitCnt);
1,238,648✔
760
        break;
1,238,648✔
761
      }
762
    }
763

764
    (void)tsem2_timewait(&monitorSem, 100);
357,613,987✔
765
    monitorSendAllSlowLog();
357,613,987✔
766

767
    MonitorSlowLogData* slowLogData = NULL;
357,613,987✔
768
    taosReadQitem(monitorQueue, (void**)&slowLogData);
357,613,987✔
769
    if (slowLogData == NULL) {
357,613,987✔
770
      continue;
356,379,849✔
771
    }
772

773
    if (slowLogData->type == SLOW_LOG_READ_ALL) {
1,234,138✔
774
      monitorSendAllSlowLogFromTempDir(slowLogData->clusterId);
1,234,138✔
UNCOV
775
    } else if (slowLogData->type == SLOW_LOG_READ_BEGINNIG) {
×
UNCOV
776
      monitorSendSlowLogAtBeginningCb(slowLogData->fileName);
×
UNCOV
777
    } else if (slowLogData->type == SLOW_LOG_WRITE) {
×
UNCOV
778
      monitorWriteSlowLog2File(slowLogData, tmpSlowLogPath);
×
UNCOV
779
    } else if (slowLogData->type == SLOW_LOG_READ_RUNNING || slowLogData->type == SLOW_LOG_READ_QUIT) {
×
UNCOV
780
      monitorSendSlowLogAtRunningCb(slowLogData->clusterId);
×
781
    }
782
    monitorFreeSlowLogData(slowLogData);
1,234,138✔
783
    taosFreeQitem(slowLogData);
1,234,138✔
784
  }
785

786
  return NULL;
1,238,648✔
787
}
788

789
static int32_t tscMonitortInit() {
1,238,648✔
790
  TdThreadAttr thAttr;
1,229,584✔
791
  if (taosThreadAttrInit(&thAttr) != 0) {
1,238,648✔
792
    tscError("failed to init thread attr since %s", strerror(ERRNO));
×
793
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
794
  }
795
  if (taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE) != 0) {
1,238,648✔
UNCOV
796
    tscError("failed to set thread attr since %s", strerror(ERRNO));
×
UNCOV
797
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
798
  }
799

800
  if (taosThreadCreate(&monitorThread, &thAttr, monitorThreadFunc, NULL) != 0) {
1,238,648✔
UNCOV
801
    tscError("failed to create monitor thread since %s", strerror(ERRNO));
×
802
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
803
  }
804

805
  (void)taosThreadAttrDestroy(&thAttr);
1,238,648✔
806
  return 0;
1,238,648✔
807
}
808

809
static void tscMonitorStop() {
1,238,691✔
810
  if (taosCheckPthreadValid(monitorThread)) {
1,238,691✔
811
    (void)taosThreadJoin(monitorThread, NULL);
1,238,648✔
812
    taosThreadClear(&monitorThread);
1,238,648✔
813
  }
814
}
1,238,691✔
815

816
int32_t monitorInit() {
1,238,648✔
817
  int32_t code = 0;
1,238,648✔
818

819
  tscInfo("monitor init");
1,238,648✔
820
  monitorCounterHash =
1,238,648✔
821
      (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
1,238,648✔
822
  if (monitorCounterHash == NULL) {
1,238,648✔
UNCOV
823
    tscError("failed to create monitorCounterHash");
×
UNCOV
824
    return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
825
  }
826
  taosHashSetFreeFp(monitorCounterHash, destroyMonitorClient);
1,238,648✔
827

828
  monitorSlowLogHashPath =
1,238,648✔
829
      (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
1,238,648✔
830
  if (monitorSlowLogHashPath == NULL) {
1,238,648✔
UNCOV
831
    tscError("failed to create monitorSlowLogHashPath");
×
UNCOV
832
    return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
833
  }
834
  taosHashSetFreeFp(monitorSlowLogHashPath, destroySlowLogClient);
1,238,648✔
835

836
  monitorSlowLogHash =
1,238,648✔
837
      (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
1,238,648✔
838
  if (monitorSlowLogHash == NULL) {
1,238,648✔
839
    tscError("failed to create monitorSlowLogHash");
×
UNCOV
840
    return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
841
  }
842
  taosHashSetFreeFp(monitorSlowLogHash, destroySlowLogClient);
1,238,648✔
843

844
  monitorTimer = taosTmrInit(0, 0, 0, "MONITOR");
1,238,648✔
845
  if (monitorTimer == NULL) {
1,238,648✔
UNCOV
846
    tscError("failed to create monitor timer");
×
UNCOV
847
    return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
848
  }
849

850
  code = getSlowLogTmpDir(tmpSlowLogPath, sizeof(tmpSlowLogPath));
1,238,648✔
851
  if (code != 0) {
1,238,648✔
UNCOV
852
    return code;
×
853
  }
854

855
  code = taosMulModeMkDir(tmpSlowLogPath, 0777, true);
1,238,648✔
856
  if (code != 0) {
1,238,648✔
UNCOV
857
    tscError("failed to create dir:%s since %s", tmpSlowLogPath, terrstr());
×
UNCOV
858
    return code;
×
859
  }
860

861
  if (tsem2_init(&monitorSem, 0, 0) != 0) {
1,238,648✔
UNCOV
862
    tscError("sem init error since %s", terrstr());
×
UNCOV
863
    return TAOS_SYSTEM_ERROR(ERRNO);
×
864
  }
865

866
  code = taosOpenQueue(&monitorQueue);
1,238,648✔
867
  if (code) {
1,238,648✔
UNCOV
868
    tscError("open queue error since %s", terrstr());
×
UNCOV
869
    return TAOS_GET_TERRNO(code);
×
870
  }
871

872
  taosInitRWLatch(&monitorLock);
1,238,648✔
873
  taosInitRWLatch(&monitorQueueLock);
1,238,648✔
874
  return tscMonitortInit();
1,238,648✔
875
}
876

877
void monitorClose() {
1,238,691✔
878
  tscInfo("monitor close");
1,238,691✔
879
  taosWLockLatch(&monitorLock);
1,238,691✔
880
  atomic_store_32(&monitorFlag, 1);
1,238,691✔
881
  atomic_store_64(&quitTime, taosGetMonoTimestampMs());
1,238,691✔
882
  tscMonitorStop();
1,238,691✔
883
  sendAllCounter();
1,238,691✔
884
  taosHashCleanup(monitorCounterHash);
1,238,691✔
885
  taosHashCleanup(monitorSlowLogHash);
1,238,691✔
886
  taosTmrCleanUp(monitorTimer);
1,238,691✔
887
  taosWUnLockLatch(&monitorLock);
1,238,691✔
888

889
  taosWLockLatch(&monitorQueueLock);
1,238,691✔
890
  taosCloseQueue(monitorQueue);
1,238,691✔
891
  monitorQueue = NULL;
1,238,691✔
892
  if (tsem2_destroy(&monitorSem) != 0) {
1,238,691✔
UNCOV
893
    tscError("failed to destroy semaphore");
×
894
  }
895
  taosWUnLockLatch(&monitorQueueLock);
1,238,691✔
896
}
1,238,691✔
897

898
// Copies `data` into a newly allocated queue item, appends it to monitorQueue and wakes the
// monitor thread through monitorSem.
// Returns the allocation error when the item cannot be allocated, 0 when the queue has already
// been closed (the item is dropped), otherwise the result of taosWriteQitem(). The queue item
// is released here whenever it was not handed over to the queue.
int32_t monitorPutData2MonitorQueue(MonitorSlowLogData data) {
  int32_t             code = 0;
  int32_t             postRet = 0;
  MonitorSlowLogData* slowLogData = NULL;

  code = taosAllocateQitem(sizeof(MonitorSlowLogData), DEF_QITEM, 0, (void**)&slowLogData);
  if (code) {
    tscError("monitor failed to allocate slow log data");
    return code;
  }
  *slowLogData = data;
  tscDebug("monitor write slow log to queue, clusterId:0x%" PRIx64 " type:%s, data:%s", slowLogData->clusterId,
           queueTypeStr[slowLogData->type], slowLogData->data == NULL ? "null" : slowLogData->data);

  taosWLockLatch(&monitorQueueLock);
  if (monitorQueue == NULL) {
    tscError("monitor queue is null");
    taosWUnLockLatch(&monitorQueueLock);
    taosFreeQitem(slowLogData);
    return 0;
  }
  code = taosWriteQitem(monitorQueue, slowLogData);
  if (code == 0) {
    // Post while the latch is still held: monitorClose() destroys monitorSem under this same
    // latch, so posting after the unlock could touch a semaphore that was already destroyed.
    postRet = tsem2_post(&monitorSem);
  }
  taosWUnLockLatch(&monitorQueueLock);

  if (code != 0) {
    // the queue did not take ownership of the item
    taosFreeQitem(slowLogData);
  } else if (postRet != 0) {
    tscError("failed to post semaphore");
  }
  return code;
}
929

UNCOV
930
// Response callback installed by senAuditInfo() for audit messages.
// Releases the payload and endpoint set carried by `pMsg` (if any), logs `code` and returns 0.
// `param` is not used.
int32_t reportCB(void* param, SDataBuf* pMsg, int32_t code) {
  if (pMsg != NULL) {  // tolerate a callback invoked without a message buffer
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
  }
  tscDebug("[del report]delete reportCB code:%d", code);
  return 0;
}
936

937
// Wraps the serialized audit request `pReq` (`len` bytes) in an SMsgSendInfo of type
// TDMT_MND_AUDIT and sends it asynchronously to the management endpoint of `pTscObj`;
// reportCB is installed as the response callback.
// Returns TSDB_CODE_SUCCESS when the message was handed to the transport, terrno when the send
// descriptor cannot be allocated, or the error code of asyncSendMsgToServer().
int32_t senAuditInfo(STscObj* pTscObj, void* pReq, int32_t len, uint64_t requestId) {
  SMsgSendInfo* pSend = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (NULL == pSend) {
    tscError("[del report] failed to allocate memory for sendInfo");
    return terrno;
  }

  pSend->msgInfo = (SDataBuf){.pData = pReq, .len = len, .handle = NULL};
  pSend->msgType = TDMT_MND_AUDIT;
  pSend->fp = reportCB;
  pSend->param = NULL;
  pSend->requestId = requestId;
  pSend->requestObjRefId = 0;

  SEpSet  mgmtEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
  int32_t code = asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &mgmtEpSet, NULL, pSend);
  if (0 == code) {
    return TSDB_CODE_SUCCESS;
  }

  tscError("[del report] failed to send msg to server, code:%d", code);
  // NOTE(review): confirm asyncSendMsgToServer() does not already release the send info on failure
  taosMemoryFree(pSend);
  return code;
}
962

UNCOV
963
// Copies the table and database name of a DELETE statement's target into `pReq`.
// Returns TSDB_CODE_TSC_INVALID_OPERATION when the FROM node is not a real table,
// TSDB_CODE_SUCCESS otherwise.
static int32_t setDeleteStmtAuditReqTableInfo(SDeleteStmt* pStmt, SAuditReq* pReq) {
  if (QUERY_NODE_REAL_TABLE != nodeType(pStmt->pFromTable)) {
    tscDebug("[report] invalid from table node type:%s", nodesNodeName(pStmt->pFromTable->type));
    return TSDB_CODE_TSC_INVALID_OPERATION;
  }

  STableNode* pTarget = &((SRealTableNode*)pStmt->pFromTable)->table;
  TAOS_UNUSED(tsnprintf(pReq->table, TSDB_TABLE_NAME_LEN, "%s", pTarget->tableName));
  TAOS_UNUSED(tsnprintf(pReq->db, TSDB_DB_FNAME_LEN, "%s", pTarget->dbName));
  return TSDB_CODE_SUCCESS;
}
973

974
// Copies the target table and database name of an insert-type modify statement into `pReq`.
// Only TSDB_QUERY_TYPE_INSERT, TSDB_QUERY_TYPE_FILE_INSERT and TSDB_QUERY_TYPE_STMT_INSERT are
// accepted; any other insertType yields TSDB_CODE_TSC_INVALID_OPERATION.
static int32_t setModifyStmtAuditReqTableInfo(SVnodeModifyOpStmt* pStmt, SAuditReq* pReq) {
  bool isAuditedInsert = (pStmt->insertType == TSDB_QUERY_TYPE_INSERT) ||
                         (pStmt->insertType == TSDB_QUERY_TYPE_FILE_INSERT) ||
                         (pStmt->insertType == TSDB_QUERY_TYPE_STMT_INSERT);
  if (!isAuditedInsert) {
    tscDebug("[report] invalid from table node type:%s", nodesNodeName(pStmt->sqlNodeType));
    return TSDB_CODE_TSC_INVALID_OPERATION;
  }

  TAOS_UNUSED(tsnprintf(pReq->table, TSDB_TABLE_NAME_LEN, "%s", pStmt->targetTableName.tname));
  TAOS_UNUSED(tsnprintf(pReq->db, TSDB_DB_FNAME_LEN, "%s", pStmt->targetTableName.dbname));
  return TSDB_CODE_SUCCESS;
}
985

986
typedef struct SAuditTableListInfo {
987
  SArray* dbList;
988
  SArray* tableList;
989
} SAuditTableListInfo;
990

991
// Lazily creates the two name arrays of `pInfo`; a no-op when dbList already exists.
// Returns TSDB_CODE_SUCCESS, or the terrno reported by the failed array allocation. On failure
// both pointers are left NULL, so the structure can still be passed to
// destroyAuditTableListInfo() or to this function again.
static int32_t initAuditTableListInfo(SAuditTableListInfo* pInfo) {
  if (pInfo->dbList) return TSDB_CODE_SUCCESS;

  pInfo->dbList = taosArrayInit(4, TSDB_DB_FNAME_LEN);
  if (pInfo->dbList == NULL) {
    int32_t code = terrno;  // capture before logging can touch it
    tscError("[report] failed to create db list array");
    return code;
  }

  pInfo->tableList = taosArrayInit(4, TSDB_TABLE_NAME_LEN);
  if (pInfo->tableList == NULL) {
    int32_t code = terrno;  // capture before the cleanup below can overwrite it
    tscError("[report] failed to create table list array");
    taosArrayDestroy(pInfo->dbList);
    pInfo->dbList = NULL;  // do not leave a dangling pointer that looks initialised
    return code;
  }
  return TSDB_CODE_SUCCESS;
}
1008

1009
// Releases whichever of the two name arrays exist and resets both pointers to NULL.
static void destroyAuditTableListInfo(SAuditTableListInfo* pInfo) {
  if (pInfo->dbList != NULL) {
    taosArrayDestroy(pInfo->dbList);
  }
  pInfo->dbList = NULL;

  if (pInfo->tableList != NULL) {
    taosArrayDestroy(pInfo->tableList);
  }
  pInfo->tableList = NULL;
}
×
1019

UNCOV
1020
// Copies the first collected db name and the first collected table name (when present) into
// `pReq`. Both arrays of `pTbListInfo` must be non-NULL; later entries are not used.
static void copyTableInfoToAuditReq(SAuditTableListInfo* pTbListInfo, SAuditReq* pReq) {
  SArray* pDbNames = pTbListInfo->dbList;
  if (pDbNames->size > 0) {
    char* firstDb = (char*)taosArrayGet(pDbNames, 0);
    TAOS_UNUSED(tsnprintf(pReq->db, TSDB_DB_FNAME_LEN, "%s", firstDb));
  }

  SArray* pTableNames = pTbListInfo->tableList;
  if (pTableNames->size > 0) {
    char* firstTable = (char*)taosArrayGet(pTableNames, 0);
    TAOS_UNUSED(tsnprintf(pReq->table, TSDB_TABLE_NAME_LEN, "%s", firstTable));
  }
}
×
1030

1031
static int32_t doSetSelectStmtAuditReqTableInfo(SNode* pFromTable, SAuditReq* pReq, SAuditTableListInfo* pTbListInfo);
UNCOV
1032
static int32_t doSetOperatorTableInfo(SSetOperator* pSetOperator, SAuditTableListInfo* pTbListInfo) {
×
1033
  int32_t code = TSDB_CODE_SUCCESS;
×
UNCOV
1034
  if (pSetOperator->pLeft) {
×
UNCOV
1035
    SSelectStmt* pLeftSelect = (SSelectStmt*)pSetOperator->pLeft;
×
1036
    if (nodeType(pSetOperator->pLeft) == QUERY_NODE_SELECT_STMT) {
×
1037
      code = doSetSelectStmtAuditReqTableInfo(pLeftSelect->pFromTable, NULL, pTbListInfo);
×
1038
    } else if (nodeType(pSetOperator->pLeft) == QUERY_NODE_SET_OPERATOR) {
×
1039
      SSetOperator* pLeftSetOperator = (SSetOperator*)pSetOperator->pLeft;
×
UNCOV
1040
      code = doSetOperatorTableInfo(pLeftSetOperator, pTbListInfo);
×
1041
      TAOS_RETURN(code);
×
1042
    }
1043
  }
UNCOV
1044
  if (pSetOperator->pRight) {
×
1045
    SSelectStmt* pRightSelect = (SSelectStmt*)pSetOperator->pRight;
×
UNCOV
1046
    if (nodeType(pSetOperator->pRight) == QUERY_NODE_SELECT_STMT) {
×
1047
      code = doSetSelectStmtAuditReqTableInfo(pRightSelect->pFromTable, NULL, pTbListInfo);
×
1048
      TAOS_RETURN(code);
×
1049
    } else if (nodeType(pSetOperator->pRight) == QUERY_NODE_SET_OPERATOR) {
×
1050
      SSetOperator* pRightSetOperator = (SSetOperator*)pSetOperator->pRight;
×
UNCOV
1051
      code = doSetOperatorTableInfo(pRightSetOperator, pTbListInfo);
×
1052
      TAOS_RETURN(code);
×
1053
    }
1054
  }
UNCOV
1055
  return code;
×
1056
}
1057

UNCOV
1058
// Resolves the table(s) referenced by a FROM node.
//  - real / virtual table: the node's own table is used;
//  - temp table over a SELECT: recurses into that SELECT's FROM node;
//  - temp table over a set operator, or a join: initialises `pTbListInfo` and collects the
//    tables of the operands into it.
// A resolved table is written straight into `pReq` while no list exists (and `pReq` is
// non-NULL); once the list exists it is appended to the list instead.
// Returns TSDB_CODE_TSC_INVALID_OPERATION for a NULL FROM node, otherwise TSDB_CODE_SUCCESS or
// the first error encountered.
static int32_t doSetSelectStmtAuditReqTableInfo(SNode* pFromTable, SAuditReq* pReq, SAuditTableListInfo* pTbListInfo) {
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  STableNode* pTable = NULL;

  if (NULL == pFromTable) return TSDB_CODE_TSC_INVALID_OPERATION;

  switch (nodeType(pFromTable)) {
    case QUERY_NODE_REAL_TABLE:
      pTable = &((SRealTableNode*)pFromTable)->table;
      break;

    case QUERY_NODE_VIRTUAL_TABLE:
      pTable = &((SVirtualTableNode*)pFromTable)->table;
      break;

    case QUERY_NODE_TEMP_TABLE: {
      SNode* pSubquery = ((STempTableNode*)pFromTable)->pSubquery;
      if (NULL == pSubquery) {
        break;
      }
      if (nodeType(pSubquery) == QUERY_NODE_SELECT_STMT) {
        // plain sub-select: its FROM node decides, `pReq` is passed through unchanged
        return doSetSelectStmtAuditReqTableInfo(((SSelectStmt*)pSubquery)->pFromTable, pReq, pTbListInfo);
      }
      if (nodeType(pSubquery) == QUERY_NODE_SET_OPERATOR) {
        code = initAuditTableListInfo(pTbListInfo);
        TAOS_CHECK_GOTO(code, &lino, _exit);

        code = doSetOperatorTableInfo((SSetOperator*)pSubquery, pTbListInfo);
        TAOS_CHECK_GOTO(code, &lino, _exit);
      }
      break;
    }

    case QUERY_NODE_JOIN_TABLE: {
      code = initAuditTableListInfo(pTbListInfo);
      TAOS_CHECK_GOTO(code, &lino, _exit);

      SJoinTableNode* pJoin = (SJoinTableNode*)pFromTable;
      code = doSetSelectStmtAuditReqTableInfo(pJoin->pLeft, NULL, pTbListInfo);
      TAOS_CHECK_GOTO(code, &lino, _exit);
      code = doSetSelectStmtAuditReqTableInfo(pJoin->pRight, NULL, pTbListInfo);
      TAOS_CHECK_GOTO(code, &lino, _exit);
      break;
    }

    default:
      break;
  }

  if (pTable != NULL) {
    if (pTbListInfo->dbList == NULL) {
      // single-table case: fill the request directly when the caller supplied one
      if (pReq != NULL) {
        TAOS_UNUSED(tsnprintf(pReq->table, TSDB_TABLE_NAME_LEN, "%s", pTable->tableName));
        TAOS_UNUSED(tsnprintf(pReq->db, TSDB_DB_FNAME_LEN, "%s", pTable->dbName));
      }
    } else {
      void* tmp = taosArrayPush(pTbListInfo->dbList, pTable->dbName);
      TSDB_CHECK_NULL(tmp, code, lino, _exit, terrno);
      tmp = taosArrayPush(pTbListInfo->tableList, pTable->tableName);
      TSDB_CHECK_NULL(tmp, code, lino, _exit, terrno);
    }
  }

_exit:
  if (code != TSDB_CODE_SUCCESS) {
    tscError("[report] failed to set select stmt audit req table info, code:%d, lino:%d", code, lino);
  }
  return code;
}
1112

1113
// Fills the db/table fields of `pReq` from the FROM clause of a SELECT statement.
// When the walk had to build table lists, their first entries are copied into `pReq` on
// success, and the lists are released whether or not the walk succeeded.
// Returns the result of doSetSelectStmtAuditReqTableInfo().
static int32_t setSelectStmtAuditReqTableInfo(SSelectStmt* pStmt, SAuditReq* pReq) {
  int32_t             code = TSDB_CODE_SUCCESS;
  SAuditTableListInfo tableListInfo = {0};

  code = doSetSelectStmtAuditReqTableInfo(pStmt->pFromTable, pReq, &tableListInfo);

  // tableList is created last, so a non-NULL tableList means both arrays are valid. Checking it
  // (and not only dbList) keeps a half-initialised structure from being copied or released.
  if (tableListInfo.dbList != NULL && tableListInfo.tableList != NULL) {
    if (code == TSDB_CODE_SUCCESS) {
      copyTableInfoToAuditReq(&tableListInfo, pReq);
    }
    // release the lists on failure as well; they used to leak when the walk failed midway
    destroyAuditTableListInfo(&tableListInfo);
  }
  return code;
}
1124

1125
// Fills the db/table fields of `pReq` for a statement whose root is a set operator: collects
// the referenced tables into a temporary list and copies the first entries into `pReq`.
// Returns the list-initialisation error, or the result of doSetOperatorTableInfo().
static int32_t setOperatorTableInfo(SSetOperator* pSetOperator, SAuditReq* pReq) {
  SAuditTableListInfo tableListInfo = {0};

  int32_t code = initAuditTableListInfo(&tableListInfo);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  code = doSetOperatorTableInfo(pSetOperator, &tableListInfo);
  if (TSDB_CODE_SUCCESS == code) {
    copyTableInfoToAuditReq(&tableListInfo, pReq);
  }

  destroyAuditTableListInfo(&tableListInfo);
  return code;
}
1139

1140
// Dispatches on the root node type of the request's query and fills the db/table fields of
// `pReq` with the matching helper. For any other type an error is logged and
// TSDB_CODE_SUCCESS is returned without touching `pReq`.
static int32_t setAuditReqTableInfo(SRequestObj* pRequest, ENodeType type, SAuditReq* pReq) {
  switch (type) {
    case QUERY_NODE_DELETE_STMT:
      return setDeleteStmtAuditReqTableInfo((SDeleteStmt*)pRequest->pQuery->pRoot, pReq);
    case QUERY_NODE_VNODE_MODIFY_STMT:
      return setModifyStmtAuditReqTableInfo((SVnodeModifyOpStmt*)pRequest->pQuery->pRoot, pReq);
    case QUERY_NODE_SELECT_STMT:
      return setSelectStmtAuditReqTableInfo((SSelectStmt*)pRequest->pQuery->pRoot, pReq);
    case QUERY_NODE_SET_OPERATOR:
      return setOperatorTableInfo((SSetOperator*)pRequest->pQuery->pRoot, pReq);
    default:
      break;
  }

  tscError("[report]unsupported report type: %s", nodesNodeName(type));
  return TSDB_CODE_SUCCESS;
}
1156

1157
// Sets pReq->affectedRows: the request's result row count for delete / modify statements,
// 0 for select statements and set operators. Other types leave the field untouched.
static void setAuditReqAffectedRows(SRequestObj* pRequest, ENodeType type, SAuditReq* pReq) {
  switch (type) {
    case QUERY_NODE_DELETE_STMT:
    case QUERY_NODE_VNODE_MODIFY_STMT:
      pReq->affectedRows = pRequest->body.resInfo.numOfRows;
      break;
    case QUERY_NODE_SELECT_STMT:
    case QUERY_NODE_SET_OPERATOR:
      pReq->affectedRows = 0;
      break;
    default:
      break;
  }
}
×
1164

UNCOV
1165
// Writes the operation name for the audited statement into pReq->operation:
// "delete", "insert" or "select". Set operators are reported as "select", matching
// needSendReport(), which gates them with the select audit switch. Other types leave the field
// untouched. `pRequest` is not used.
static void setAuditReqOperation(SRequestObj* pRequest, ENodeType type, SAuditReq* pReq) {
  if (QUERY_NODE_DELETE_STMT == type) {
    TAOS_UNUSED(tsnprintf(pReq->operation, AUDIT_OPERATION_LEN, "delete"));
  } else if (QUERY_NODE_VNODE_MODIFY_STMT == type) {
    TAOS_UNUSED(tsnprintf(pReq->operation, AUDIT_OPERATION_LEN, "insert"));
  } else if (QUERY_NODE_SELECT_STMT == type || QUERY_NODE_SET_OPERATOR == type) {
    // set operators used to fall through here, leaving `operation` unwritten
    TAOS_UNUSED(tsnprintf(pReq->operation, AUDIT_OPERATION_LEN, "select"));
  }
}
×
1174

1175
// Decides whether a statement of the given type must be audited under the server config.
// Nothing is audited while auditLevel is below AUDIT_LEVEL_DATA. Above that, select statements
// and set operators follow enableAuditSelect, deletes follow enableAuditDelete and modify
// statements follow enableAuditInsert. Any other type is logged as unsupported and not audited.
static bool needSendReport(SAppInstServerCFG* pCfg, ENodeType type) {
  if (pCfg->auditLevel < AUDIT_LEVEL_DATA) {
    return false;
  }

  switch (type) {
    case QUERY_NODE_SELECT_STMT:
    case QUERY_NODE_SET_OPERATOR:
      return pCfg->enableAuditSelect != 0;
    case QUERY_NODE_DELETE_STMT:
      return pCfg->enableAuditDelete != 0;
    case QUERY_NODE_VNODE_MODIFY_STMT:
      return pCfg->enableAuditInsert != 0;
    default:
      break;
  }

  tscError("[report] unsupported report type: %s", nodesNodeName(type));
  return false;
}
1192

1193
// Builds an audit request for a finished statement and sends it to the server.
// Nothing is sent when the connection object is missing, the request failed, auditing is
// disabled for this statement type, or the table information cannot be determined.
// The serialized buffer is released here when it could not be handed to senAuditInfo().
static void reportSqlExecResult(SRequestObj* pRequest, ENodeType type) {
  int32_t  code = TSDB_CODE_SUCCESS;
  STscObj* pTscObj = pRequest->pTscObj;

  if (pTscObj == NULL || pTscObj->pAppInfo == NULL) {
    tscError("[report][%s] invalid tsc obj", nodesNodeName(type));
    return;
  }
  if (pRequest->code != TSDB_CODE_SUCCESS) {
    tscDebug("[report][%s] request result code:%d, skip audit", nodesNodeName(type), pRequest->code);
    return;
  }

  if (!needSendReport(&pTscObj->pAppInfo->serverCfg, type)) {
    tscTrace("[report][%s] audit is disabled", nodesNodeName(type));
    return;
  }

  // Zero-initialise: the setters below do not write every field on every path (e.g. db/table
  // when no table is resolved), and the whole struct is serialized afterwards.
  SAuditReq req = {0};
  req.pSql = pRequest->sqlstr;
  req.sqlLen = pRequest->sqlLen;
  setAuditReqAffectedRows(pRequest, type, &req);
  code = setAuditReqTableInfo(pRequest, type, &req);
  if (code == TSDB_CODE_TSC_INVALID_OPERATION) {
    // statement shape is not audited; skip silently
    return;
  } else if (code != TSDB_CODE_SUCCESS) {
    tscError("[report][%s] failed to set audit req table info, code:%d", nodesNodeName(type), code);
    return;
  }
  int64_t duration = taosGetTimestampUs() - pRequest->metric.start;
  req.duration = duration / 1000000.0;  // convert to seconds
  setAuditReqOperation(pRequest, type, &req);

  // first call only computes the required buffer size
  int32_t tlen = tSerializeSAuditReq(NULL, 0, &req);
  if (tlen <= 0) {
    // never feed a failed (negative) or empty size into the allocator
    tscError("[report][%s] failed to get serialized size of req, ret:%d", nodesNodeName(type), tlen);
    return;
  }
  void* pReq = taosMemoryCalloc(1, tlen);
  if (pReq == NULL) {
    tscError("[report][%s] failed to allocate memory for req", nodesNodeName(type));
    return;
  }

  if (tSerializeSAuditReq(pReq, tlen, &req) < 0) {
    tscError("[report][%s] failed to serialize req", nodesNodeName(type));
    taosMemoryFree(pReq);
    return;
  }

  code = senAuditInfo(pRequest->pTscObj, pReq, tlen, pRequest->requestId);
  if (code != 0) {
    tscError("[report][%s] failed to send audit info, code:%d", nodesNodeName(type), code);
    // NOTE(review): assumes the buffer is still owned by this function after a failed send - confirm
    taosMemoryFree(pReq);
    return;
  }
  tscDebug("[report][%s] data, sql:%s", nodesNodeName(type), req.pSql);
}
1247

1248
// Entry point for audit reporting of a finished request.
// Requests without a parsed query root are ignored; only delete, select, modify (insert)
// statements and set operators are forwarded to reportSqlExecResult().
void clientOperateReport(SRequestObj* pRequest) {
  if (pRequest == NULL || pRequest->pQuery == NULL || pRequest->pQuery->pRoot == NULL) {
    tscDebug("[report] invalid request");
    return;
  }
  ENodeType type = nodeType(pRequest->pQuery->pRoot);
  // The last operand used to be the bare constant QUERY_NODE_SET_OPERATOR instead of a
  // comparison with `type`, so the type filter did not actually filter.
  if (QUERY_NODE_DELETE_STMT == type || QUERY_NODE_SELECT_STMT == type || QUERY_NODE_VNODE_MODIFY_STMT == type ||
      QUERY_NODE_SET_OPERATOR == type) {
    reportSqlExecResult(pRequest, type);
  }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc