• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4907

30 Dec 2025 10:52AM UTC coverage: 65.541% (+0.03%) from 65.514%
#4907

push

travis-ci

web-flow
enh: drop multi-stream (#33962)

60 of 106 new or added lines in 4 files covered. (56.6%)

808 existing lines in 106 files now uncovered.

193920 of 295877 relevant lines covered (65.54%)

118520209.34 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

30.85
/source/client/src/clientMonitor.c
1
#include "clientMonitor.h"
2
#include "cJSON.h"
3
#include "clientInt.h"
4
#include "clientLog.h"
5
#include "query.h"
6
#include "taoserror.h"
7
#include "tarray.h"
8
#include "tmisce.h"
9
#include "tqueue.h"
10
#include "ttime.h"
11
#include "ttimer.h"
12

13
SRWLatch    monitorLock;
14
void*       monitorTimer;
15
SHashObj*   monitorCounterHash;
16
int32_t     monitorFlag = 0;
17
int32_t     quitCnt = 0;
18
tsem2_t     monitorSem;
19
STaosQueue* monitorQueue;
20
SHashObj*   monitorSlowLogHash;
21
char        tmpSlowLogPath[PATH_MAX] = {0};
22
TdThread    monitorThread;
23
extern bool tsEnableAuditDelete;
24

25
static int32_t getSlowLogTmpDir(char* tmpPath, int32_t size) {
1,634,953✔
26
  int ret = tsnprintf(tmpPath, size, "%s/tdengine_slow_log/", tsTempDir);
1,634,953✔
27
  if (ret < 0) {
1,634,953✔
28
    tscError("failed to get tmp path ret:%d", ret);
×
29
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
30
  }
31
  return 0;
1,634,953✔
32
}
33

34
static void processFileInTheEnd(TdFilePtr pFile, char* path) {
×
35
  if (pFile == NULL) {
×
36
    return;
×
37
  }
38
  if (taosFtruncateFile(pFile, 0) != 0) {
×
39
    tscError("failed to truncate file:%s, terrno:%d", path, terrno);
×
40
    return;
×
41
  }
42
  if (taosUnLockFile(pFile) != 0) {
×
43
    tscError("failed to unlock file:%s, terrno:%d", path, terrno);
×
44
    return;
×
45
  }
46
  if (taosCloseFile(&(pFile)) != 0) {
×
47
    tscError("failed to close file:%s, terrno:%d", path, terrno);
×
48
    return;
×
49
  }
50
  if (taosRemoveFile(path) != 0) {
×
51
    tscError("failed to remove file:%s, terrno:%d", path, terrno);
×
52
    return;
×
53
  }
54
}
55

56
static void destroySlowLogClient(void* data) {
×
57
  if (data == NULL) {
×
58
    return;
×
59
  }
60
  SlowLogClient* slowLogClient = *(SlowLogClient**)data;
×
61
  processFileInTheEnd(slowLogClient->pFile, slowLogClient->path);
×
62
  taosMemoryFree(slowLogClient);
×
63
}
64

65
static void destroyMonitorClient(void* data) {
1,632,265✔
66
  if (data == NULL) {
1,632,265✔
67
    return;
×
68
  }
69
  MonitorClient* pMonitor = *(MonitorClient**)data;
1,632,265✔
70
  if (pMonitor == NULL) {
1,632,265✔
71
    return;
×
72
  }
73
  if (!taosTmrStopA(&pMonitor->timer)) {
1,632,265✔
74
    tscError("failed to stop timer, pMonitor:%p", pMonitor);
212,526✔
75
  }
76
  taosHashCleanup(pMonitor->counters);
1,632,265✔
77
  int ret = taos_collector_registry_destroy(pMonitor->registry);
1,632,265✔
78
  if (ret) {
1,632,265✔
79
    tscError("failed to destroy registry, pMonitor:%p ret:%d", pMonitor, ret);
×
80
  }
81
  taosMemoryFree(pMonitor);
1,632,265✔
82
}
83

84
static void monitorFreeSlowLogData(void* paras) {
1,632,457✔
85
  MonitorSlowLogData* pData = (MonitorSlowLogData*)paras;
1,632,457✔
86
  if (pData == NULL) {
1,632,457✔
87
    return;
200✔
88
  }
89
  taosMemoryFreeClear(pData->data);
1,632,257✔
90
  if (pData->type == SLOW_LOG_READ_BEGINNIG) {
1,632,257✔
91
    taosMemoryFree(pData->fileName);
1,632,257✔
92
  }
93
}
94

95
static void monitorFreeSlowLogDataEx(void* paras) {
200✔
96
  monitorFreeSlowLogData(paras);
200✔
97
  taosMemoryFree(paras);
200✔
98
}
200✔
99

100
static SAppInstInfo* getAppInstByClusterId(int64_t clusterId) {
4,673,282✔
101
  void* p = taosHashGet(appInfo.pInstMapByClusterId, &clusterId, LONG_BYTES);
4,673,282✔
102
  if (p == NULL) {
4,673,282✔
103
    tscError("failed to get app inst, clusterId:0x%" PRIx64, clusterId);
×
104
    return NULL;
×
105
  }
106
  return *(SAppInstInfo**)p;
4,673,282✔
107
}
108

109
static int32_t monitorReportAsyncCB(void* param, SDataBuf* pMsg, int32_t code) {
200✔
110
  if (TSDB_CODE_SUCCESS != code) {
200✔
111
    tscError("found error in monitorReport send callback, code:%d, please check the network.", code);
×
112
  }
113
  if (pMsg) {
200✔
114
    taosMemoryFree(pMsg->pData);
200✔
115
    taosMemoryFree(pMsg->pEpSet);
200✔
116
  }
117
  if (param != NULL) {
200✔
118
    MonitorSlowLogData* p = (MonitorSlowLogData*)param;
×
119
    if (code != 0) {
×
120
      tscError("failed to send slow log:%s, clusterId:0x%" PRIx64, p->data, p->clusterId);
×
121
    }
122
    MonitorSlowLogData tmp = {.clusterId = p->clusterId,
×
123
                              .type = p->type,
×
124
                              .fileName = p->fileName,
×
125
                              .pFile = p->pFile,
×
126
                              .offset = p->offset,
×
127
                              .data = NULL};
128
    if (monitorPutData2MonitorQueue(tmp) == 0) {
×
129
      p->fileName = NULL;
×
130
    } else {
131
      if (taosCloseFile(&(p->pFile)) != 0) {
×
132
        tscError("failed to close file:%p", p->pFile);
×
133
      }
134
    }
135
  }
136
  return TSDB_CODE_SUCCESS;
200✔
137
}
138

139
static int32_t sendReport(void* pTransporter, SEpSet* epSet, char* pCont, MONITOR_TYPE type, void* param) {
200✔
140
  SStatisReq sStatisReq;
77✔
141
  sStatisReq.pCont = pCont;
200✔
142
  sStatisReq.contLen = strlen(pCont);
200✔
143
  sStatisReq.type = type;
200✔
144

145
  int tlen = tSerializeSStatisReq(NULL, 0, &sStatisReq);
200✔
146
  if (tlen < 0) {
200✔
147
    goto FAILED;
×
148
  }
149
  void* buf = taosMemoryMalloc(tlen);
200✔
150
  if (buf == NULL) {
200✔
151
    tscError("sendReport failed, out of memory, len:%d", tlen);
×
152
    goto FAILED;
×
153
  }
154
  tlen = tSerializeSStatisReq(buf, tlen, &sStatisReq);
200✔
155
  if (tlen < 0) {
200✔
156
    taosMemoryFree(buf);
×
157
    goto FAILED;
×
158
  }
159

160
  SMsgSendInfo* pInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
200✔
161
  if (pInfo == NULL) {
200✔
162
    tscError("sendReport failed, out of memory send info");
×
163
    taosMemoryFree(buf);
×
164
    goto FAILED;
×
165
  }
166
  pInfo->fp = monitorReportAsyncCB;
200✔
167
  pInfo->msgInfo.pData = buf;
200✔
168
  pInfo->msgInfo.len = tlen;
200✔
169
  pInfo->msgType = TDMT_MND_STATIS;
200✔
170
  pInfo->param = param;
200✔
171
  pInfo->paramFreeFp = monitorFreeSlowLogDataEx;
200✔
172
  pInfo->requestId = tGenIdPI64();
200✔
173
  pInfo->requestObjRefId = 0;
200✔
174

175
  // int64_t transporterId = 0;
176
  return asyncSendMsgToServer(pTransporter, epSet, NULL, pInfo);
200✔
177

178
FAILED:
×
179
  if (taosCloseFile(&(((MonitorSlowLogData*)param)->pFile)) != 0) {
×
180
    tscError("failed to close file:%p", ((MonitorSlowLogData*)param)->pFile);
×
181
  }
182
  monitorFreeSlowLogDataEx(param);
×
183
  return TAOS_GET_TERRNO(TSDB_CODE_TSC_INTERNAL_ERROR);
×
184
}
185

186
static void generateClusterReport(taos_collector_registry_t* registry, void* pTransporter, SEpSet* epSet) {
1,408,760✔
187
  char ts[50] = {0};
1,408,760✔
188
  (void)snprintf(ts, sizeof(ts), "%" PRId64, taosGetTimestamp(TSDB_TIME_PRECISION_MILLI));
1,408,760✔
189
  char* pCont = (char*)taos_collector_registry_bridge_new(registry, ts, "%" PRId64, NULL);
1,408,760✔
190
  if (NULL == pCont) {
1,408,760✔
191
    tscError("generateClusterReport failed, get null content. since %s", tstrerror(terrno));
1,408,560✔
192
    return;
1,408,560✔
193
  }
194

195
  if (strlen(pCont) != 0 && sendReport(pTransporter, epSet, pCont, MONITOR_TYPE_COUNTER, NULL) == 0) {
200✔
196
    int ret = taos_collector_registry_clear_batch(registry);
200✔
197
    if (ret) {
200✔
198
      tscError("failed to clear registry, ret:%d", ret);
×
199
    }
200
  }
201
  taosMemoryFreeClear(pCont);
200✔
202
}
203

204
static void reportSendProcess(void* param, void* tmrId) {
1,421,073✔
205
  taosRLockLatch(&monitorLock);
1,421,073✔
206
  if (atomic_load_32(&monitorFlag) == 1) {
1,421,073✔
207
    taosRUnLockLatch(&monitorLock);
12,313✔
208
    return;
12,313✔
209
  }
210

211
  MonitorClient* pMonitor = (MonitorClient*)param;
1,408,760✔
212
  SAppInstInfo*  pInst = getAppInstByClusterId(pMonitor->clusterId);
1,408,760✔
213
  if (pInst == NULL) {
1,408,760✔
214
    taosRUnLockLatch(&monitorLock);
×
215
    return;
×
216
  }
217

218
  SEpSet ep = getEpSet_s(&pInst->mgmtEp);
1,408,760✔
219
  generateClusterReport(pMonitor->registry, pInst->pTransporter, &ep);
1,408,760✔
220
  bool reset =
221
      taosTmrReset(reportSendProcess, pInst->serverCfg.monitorParas.tsMonitorInterval * 1000, param, monitorTimer, &tmrId);
1,408,760✔
222
  tscDebug("reset timer, pMonitor:%p, %d", pMonitor, reset);
1,408,760✔
223
  taosRUnLockLatch(&monitorLock);
1,408,760✔
224
}
225

226
static void sendAllCounter() {
1,633,574✔
227
  MonitorClient** ppMonitor = NULL;
1,633,574✔
228
  while ((ppMonitor = taosHashIterate(monitorSlowLogHash, ppMonitor))) {
1,633,574✔
229
    MonitorClient* pMonitor = *ppMonitor;
×
230
    if (pMonitor == NULL) {
×
231
      continue;
×
232
    }
233
    SAppInstInfo* pInst = getAppInstByClusterId(pMonitor->clusterId);
×
234
    if (pInst == NULL) {
×
235
      taosHashCancelIterate(monitorSlowLogHash, ppMonitor);
×
236
      break;
×
237
    }
238
    SEpSet ep = getEpSet_s(&pInst->mgmtEp);
×
239
    generateClusterReport(pMonitor->registry, pInst->pTransporter, &ep);
×
240
  }
241
}
1,633,574✔
242

243
void monitorCreateClient(int64_t clusterId) {
3,264,530✔
244
  MonitorClient* pMonitor = NULL;
3,264,530✔
245
  taosWLockLatch(&monitorLock);
3,264,530✔
246
  if (taosHashGet(monitorCounterHash, &clusterId, LONG_BYTES) == NULL) {
3,264,530✔
247
    tscInfo("clusterId:0x%" PRIx64 ", create monitor", clusterId);
1,632,265✔
248
    pMonitor = taosMemoryCalloc(1, sizeof(MonitorClient));
1,632,265✔
249
    if (pMonitor == NULL) {
1,632,265✔
250
      tscError("failed to create monitor client");
×
251
      goto fail;
×
252
    }
253
    pMonitor->clusterId = clusterId;
1,632,265✔
254
    char clusterKey[32] = {0};
1,632,265✔
255
    if (snprintf(clusterKey, sizeof(clusterKey), "%" PRId64, clusterId) < 0) {
1,632,265✔
256
      tscError("failed to create cluster key");
×
257
      goto fail;
×
258
    }
259
    pMonitor->registry = taos_collector_registry_new(clusterKey);
1,632,265✔
260
    if (pMonitor->registry == NULL) {
1,632,265✔
261
      tscError("failed to create registry");
×
262
      goto fail;
×
263
    }
264
    pMonitor->colector = taos_collector_new(clusterKey);
1,632,265✔
265
    if (pMonitor->colector == NULL) {
1,632,265✔
266
      tscError("failed to create collector");
×
267
      goto fail;
×
268
    }
269

270
    int r = taos_collector_registry_register_collector(pMonitor->registry, pMonitor->colector);
1,632,265✔
271
    if (r) {
1,632,265✔
272
      tscError("failed to register collector, ret:%d", r);
×
273
      goto fail;
×
274
    }
275
    pMonitor->counters =
1,642,617✔
276
        (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
1,632,265✔
277
    if (pMonitor->counters == NULL) {
1,632,265✔
278
      tscError("failed to create monitor counters");
×
279
      goto fail;
×
280
    }
281

282
    if (taosHashPut(monitorCounterHash, &clusterId, LONG_BYTES, &pMonitor, POINTER_BYTES) != 0) {
1,632,265✔
283
      tscError("failed to put monitor client to hash");
×
284
      goto fail;
×
285
    }
286

287
    SAppInstInfo* pInst = getAppInstByClusterId(clusterId);
1,632,265✔
288
    if (pInst == NULL) {
1,632,265✔
289
      tscError("failed to get app instance by cluster id");
×
290
      pMonitor = NULL;
×
291
      goto fail;
×
292
    }
293
    pMonitor->timer =
1,642,617✔
294
        taosTmrStart(reportSendProcess, pInst->serverCfg.monitorParas.tsMonitorInterval * 1000, (void*)pMonitor, monitorTimer);
1,632,265✔
295
    if (pMonitor->timer == NULL) {
1,632,265✔
296
      tscError("failed to start timer");
×
297
      goto fail;
×
298
    }
299
    tscInfo("clusterId:0x%" PRIx64 ", create monitor finished, monitor:%p", clusterId, pMonitor);
1,632,265✔
300
  }
301
  taosWUnLockLatch(&monitorLock);
3,264,530✔
302

303
  return;
3,264,530✔
304

305
fail:
×
306
  destroyMonitorClient(&pMonitor);
×
307
  taosWUnLockLatch(&monitorLock);
×
308
}
309

310
void monitorCreateClientCounter(int64_t clusterId, const char* name, const char* help, size_t label_key_count,
3,264,530✔
311
                                const char** label_keys) {
312
  taosWLockLatch(&monitorLock);
3,264,530✔
313
  MonitorClient** ppMonitor = (MonitorClient**)taosHashGet(monitorCounterHash, &clusterId, LONG_BYTES);
3,264,530✔
314
  if (ppMonitor == NULL || *ppMonitor == NULL) {
3,264,530✔
315
    tscError("failed to get monitor client");
×
316
    goto end;
×
317
  }
318
  taos_counter_t* newCounter = taos_counter_new(name, help, label_key_count, label_keys);
3,264,530✔
319
  if (newCounter == NULL) return;
3,264,530✔
320
  MonitorClient* pMonitor = *ppMonitor;
3,264,530✔
321
  if (taos_collector_add_metric(pMonitor->colector, newCounter) != 0) {
3,264,530✔
UNCOV
322
    tscError("failed to add metric to collector");
×
UNCOV
323
    int r = taos_counter_destroy(newCounter);
×
UNCOV
324
    if (r) {
×
325
      tscError("failed to destroy counter, code:%d", r);
×
326
    }
UNCOV
327
    goto end;
×
328
  }
329
  if (taosHashPut(pMonitor->counters, name, strlen(name), &newCounter, POINTER_BYTES) != 0) {
3,264,530✔
330
    tscError("failed to put counter to monitor");
×
331
    int r = taos_counter_destroy(newCounter);
×
332
    if (r) {
×
333
      tscError("failed to destroy counter, code:%d", r);
×
334
    }
335
    goto end;
×
336
  }
337
  tscInfo("clusterId:0x%" PRIx64 ", monitor:%p, create counter:%s %p", pMonitor->clusterId, pMonitor, name,
3,264,530✔
338
          newCounter);
339

340
end:
3,243,826✔
341
  taosWUnLockLatch(&monitorLock);
3,264,530✔
342
}
343

344
void monitorCounterInc(int64_t clusterId, const char* counterName, const char** label_values) {
120,577✔
345
  taosWLockLatch(&monitorLock);
120,577✔
346
  if (atomic_load_32(&monitorFlag) == 1) {
120,577✔
347
    taosWUnLockLatch(&monitorLock);
8✔
348
    return;
8✔
349
  }
350

351
  MonitorClient** ppMonitor = (MonitorClient**)taosHashGet(monitorCounterHash, &clusterId, LONG_BYTES);
120,569✔
352
  if (ppMonitor == NULL || *ppMonitor == NULL) {
120,569✔
353
    tscError("clusterId:0x%" PRIx64 ", monitor not found", clusterId);
×
354
    goto end;
×
355
  }
356

357
  MonitorClient*   pMonitor = *ppMonitor;
120,569✔
358
  taos_counter_t** ppCounter = (taos_counter_t**)taosHashGet(pMonitor->counters, counterName, strlen(counterName));
120,569✔
359
  if (ppCounter == NULL || *ppCounter == NULL) {
120,569✔
360
    tscError("clusterId:0x%" PRIx64 ", monitor:%p counter:%s not found", clusterId, pMonitor, counterName);
×
361
    goto end;
×
362
  }
363
  if (taos_counter_inc(*ppCounter, label_values) != 0) {
120,569✔
364
    tscError("clusterId:0x%" PRIx64 ", monitor:%p counter:%s inc failed", clusterId, pMonitor, counterName);
×
365
    goto end;
×
366
  }
367
  tscTrace("clusterId:0x%" PRIx64 ", monitor:%p, counter:%s inc", pMonitor->clusterId, pMonitor, counterName);
120,569✔
368

369
end:
120,569✔
370
  taosWUnLockLatch(&monitorLock);
120,569✔
371
}
372

373
const char* monitorResultStr(SQL_RESULT_CODE code) {
120,577✔
374
  static const char* result_state[] = {"Success", "Failed", "Cancel"};
375
  return result_state[code];
120,577✔
376
}
377

378
static void monitorWriteSlowLog2File(MonitorSlowLogData* slowLogData, char* tmpPath) {
×
379
  TdFilePtr pFile = NULL;
×
380
  void*     tmp = taosHashGet(monitorSlowLogHash, &slowLogData->clusterId, LONG_BYTES);
×
381
  if (tmp == NULL) {
×
382
    char path[PATH_MAX] = {0};
×
383
    char clusterId[32] = {0};
×
384
    if (snprintf(clusterId, sizeof(clusterId), "%" PRIx64, slowLogData->clusterId) < 0) {
×
385
      tscError("failed to generate clusterId:0x%" PRIx64, slowLogData->clusterId);
×
386
      return;
×
387
    }
388
    taosGetTmpfilePath(tmpPath, clusterId, path);
×
389
    tscInfo("monitor create slow log file:%s", path);
×
390
    pFile = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_READ | TD_FILE_TRUNC);
×
391
    if (pFile == NULL) {
×
392
      tscError("failed to open file:%s since %d", path, terrno);
×
393
      return;
×
394
    }
395

396
    SlowLogClient* pClient = taosMemoryCalloc(1, sizeof(SlowLogClient));
×
397
    if (pClient == NULL) {
×
398
      tscError("failed to allocate memory for slow log client");
×
399
      int32_t ret = taosCloseFile(&pFile);
×
400
      if (ret != 0) {
×
401
        tscError("failed to close file:%p ret:%d", pFile, ret);
×
402
      }
403
      return;
×
404
    }
405
    pClient->lastCheckTime = taosGetMonoTimestampMs();
×
406
    tstrncpy(pClient->path, path, PATH_MAX);
×
407
    pClient->offset = 0;
×
408
    pClient->pFile = pFile;
×
409
    if (taosHashPut(monitorSlowLogHash, &slowLogData->clusterId, LONG_BYTES, &pClient, POINTER_BYTES) != 0) {
×
410
      tscError("failed to put clusterId:0x%" PRIx64 " to hash table", slowLogData->clusterId);
×
411
      int32_t ret = taosCloseFile(&pFile);
×
412
      if (ret != 0) {
×
413
        tscError("failed to close file:%p ret:%d", pFile, ret);
×
414
      }
415
      taosMemoryFree(pClient);
×
416
      return;
×
417
    }
418

419
    if (taosLockFile(pFile) < 0) {
×
420
      tscError("failed to lock file:%p since %s", pFile, terrstr());
×
421
      return;
×
422
    }
423
  } else {
424
    pFile = (*(SlowLogClient**)tmp)->pFile;
×
425
  }
426

427
  if (taosLSeekFile(pFile, 0, SEEK_END) < 0) {
×
428
    tscError("failed to seek file:%p code:%d", pFile, terrno);
×
429
    return;
×
430
  }
431
  if (taosWriteFile(pFile, slowLogData->data, strlen(slowLogData->data) + 1) < 0) {
×
432
    tscError("failed to write len to file:%p since %s", pFile, terrstr());
×
433
  }
434
  tscDebug("monitor write slow log to file:%p, clusterId:0x%" PRIx64, pFile, slowLogData->clusterId);
×
435
}
436

437
static char* readFile(TdFilePtr pFile, int64_t* offset, int64_t size) {
×
438
  tscDebug("monitor readFile slow begin pFile:%p, offset:%" PRId64 ", size:%" PRId64, pFile, *offset, size);
×
439
  if (taosLSeekFile(pFile, *offset, SEEK_SET) < 0) {
×
440
    tscError("failed to seek file:%p code:%d", pFile, terrno);
×
441
    return NULL;
×
442
  }
443

444
  if ((size <= *offset)) {
×
445
    tscError("invalid size:%" PRId64 ", offset:%" PRId64, size, *offset);
×
446
    terrno = TSDB_CODE_TSC_INTERNAL_ERROR;
×
447
    return NULL;
×
448
  }
449
  char*   pCont = NULL;
×
450
  int64_t totalSize = 0;
×
451
  if (size - *offset >= SLOW_LOG_SEND_SIZE_MAX) {
×
452
    totalSize = 4 + SLOW_LOG_SEND_SIZE_MAX;
×
453
  } else {
454
    totalSize = 4 + (size - *offset);
×
455
  }
456

457
  pCont = taosMemoryCalloc(1, totalSize);  // 4 reserved for []
×
458
  if (pCont == NULL) {
×
459
    tscError("failed to allocate memory for slow log, size:%" PRId64, totalSize);
×
460
    return NULL;
×
461
  }
462
  char* buf = pCont;
×
463
  (void)strncat(buf++, "[", totalSize - 1);
×
464
  int64_t readSize = taosReadFile(pFile, buf, totalSize - 4); // 4 reserved for []
×
465
  if (readSize <= 0) {
×
466
    if (readSize < 0) {
×
467
      tscError("failed to read len from file:%p since %s", pFile, terrstr());
×
468
    }
469
    taosMemoryFree(pCont);
×
470
    return NULL;
×
471
  }
472

473
  totalSize = 0;
×
474
  while (1) {
×
475
    size_t len = strlen(buf);
×
476
    totalSize += (len + 1);
×
477
    if (totalSize > readSize || len == 0) {
×
478
      *(buf - 1) = ']';
×
479
      *buf = '\0';
×
480
      break;
×
481
    }
482
    buf[len] = ',';  // replace '\0' with ','
×
483
    buf += (len + 1);
×
484
    *offset += (len + 1);
×
485
  }
486

487
  tscDebug("monitor readFile slow log end, data:%s, offset:%" PRId64, pCont, *offset);
×
488
  return pCont;
×
489
}
490

491
static int64_t getFileSize(char* path) {
×
492
  int64_t fileSize = 0;
×
493
  if (taosStatFile(path, &fileSize, NULL, NULL) < 0) {
×
494
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
495
  }
496

497
  return fileSize;
×
498
}
499

500
static int32_t sendSlowLog(int64_t clusterId, char* data, TdFilePtr pFile, int64_t offset, SLOW_LOG_QUEUE_TYPE type,
×
501
                           char* fileName, void* pTransporter, SEpSet* epSet) {
502
  if (data == NULL) {
×
503
    if (taosCloseFile(&pFile) != 0) {
×
504
      tscError("failed to close file:%p", pFile);
×
505
    }
506
    taosMemoryFree(fileName);
×
507
    return TSDB_CODE_INVALID_PARA;
×
508
  }
509
  MonitorSlowLogData* pParam = taosMemoryMalloc(sizeof(MonitorSlowLogData));
×
510
  if (pParam == NULL) {
×
511
    if (taosCloseFile(&pFile) != 0) {
×
512
      tscError("failed to close file:%p", pFile);
×
513
    }
514
    taosMemoryFree(data);
×
515
    taosMemoryFree(fileName);
×
516
    return terrno;
×
517
  }
518
  pParam->data = data;
×
519
  pParam->offset = offset;
×
520
  pParam->clusterId = clusterId;
×
521
  pParam->type = type;
×
522
  pParam->pFile = pFile;
×
523
  pParam->fileName = fileName;
×
524
  return sendReport(pTransporter, epSet, data, MONITOR_TYPE_SLOW_LOG, pParam);
×
525
}
526

527
static int32_t monitorReadSend(int64_t clusterId, TdFilePtr pFile, int64_t* offset, int64_t size,
×
528
                               SLOW_LOG_QUEUE_TYPE type, char* fileName) {
529
  SAppInstInfo* pInst = getAppInstByClusterId(clusterId);
×
530
  if (pInst == NULL) {
×
531
    tscError("failed to get app instance by clusterId:0x%" PRIx64, clusterId);
×
532
    if (taosCloseFile(&pFile) != 0) {
×
533
      tscError("failed to close file:%p", pFile);
×
534
    }
535
    taosMemoryFree(fileName);
×
536
    return terrno;
×
537
  }
538
  SEpSet ep = getEpSet_s(&pInst->mgmtEp);
×
539
  char*  data = readFile(pFile, offset, size);
×
540
  if (data == NULL) return terrno;
×
541
  return sendSlowLog(clusterId, data, (type == SLOW_LOG_READ_BEGINNIG ? pFile : NULL), *offset, type, fileName,
×
542
                     pInst->pTransporter, &ep);
543
}
544

545
static void monitorSendSlowLogAtBeginning(int64_t clusterId, char** fileName, TdFilePtr pFile, int64_t offset) {
×
546
  if (fileName == NULL) {
×
547
    return;
×
548
  }
549
  int64_t size = getFileSize(*fileName);
×
550
  if (size <= offset) {
×
551
    processFileInTheEnd(pFile, *fileName);
×
552
    tscDebug("monitor delete file:%s", *fileName);
×
553
  } else {
554
    int32_t code = monitorReadSend(clusterId, pFile, &offset, size, SLOW_LOG_READ_BEGINNIG, *fileName);
×
555
    if (code == 0) {
×
556
      tscDebug("monitor send slow log succ, clusterId:0x%" PRIx64, clusterId);
×
557
    } else {
558
      tscError("monitor send slow log failed, clusterId:0x%" PRIx64 ", ret:%d", clusterId,
×
559
               code);
560
    }
561
    *fileName = NULL;
×
562
  }
563
}
564

565
static void monitorSendSlowLogAtRunning(int64_t clusterId) {
×
566
  void* tmp = taosHashGet(monitorSlowLogHash, &clusterId, LONG_BYTES);
×
567
  if (tmp == NULL) {
×
568
    tscError("failed to get slow log client by clusterId:0x%" PRIx64, clusterId);
×
569
    return;
×
570
  }
571
  SlowLogClient* pClient = (*(SlowLogClient**)tmp);
×
572
  if (pClient == NULL) {
×
573
    tscError("failed to get slow log client by clusterId:0x%" PRIx64, clusterId);
×
574
    return;
×
575
  }
576
  int64_t size = getFileSize(pClient->path);
×
577
  if (size <= pClient->offset) {
×
578
    if (taosFtruncateFile(pClient->pFile, 0) < 0) {
×
579
      tscError("failed to truncate file:%p code:%d", pClient->pFile, terrno);
×
580
    }
581
    tscDebug("monitor truncate file to 0 file:%p", pClient->pFile);
×
582
    pClient->offset = 0;
×
583
  } else {
584
    int32_t code = monitorReadSend(clusterId, pClient->pFile, &pClient->offset, size, SLOW_LOG_READ_RUNNING, NULL);
×
585
    tscDebug("monitor send slow log clusterId:0x%" PRIx64 ", ret:%d", clusterId, code);
×
586
  }
587
}
588

589
static bool monitorSendSlowLogAtQuit(int64_t clusterId) {
×
590
  void* tmp = taosHashGet(monitorSlowLogHash, &clusterId, LONG_BYTES);
×
591
  if (tmp == NULL) {
×
592
    return true;
×
593
  }
594
  SlowLogClient* pClient = (*(SlowLogClient**)tmp);
×
595
  if (pClient == NULL) {
×
596
    return true;
×
597
  }
598
  int64_t size = getFileSize(pClient->path);
×
599
  if (size <= pClient->offset) {
×
600
    processFileInTheEnd(pClient->pFile, pClient->path);
×
601
    pClient->pFile = NULL;
×
602
    tscInfo("monitor remove file:%s", pClient->path);
×
603
    if ((--quitCnt) == 0) {
×
604
      return true;
×
605
    }
606
  } else {
607
    int32_t code = monitorReadSend(clusterId, pClient->pFile, &pClient->offset, size, SLOW_LOG_READ_QUIT, NULL);
×
608
    tscDebug("monitor send slow log clusterId:0x%" PRIx64 ", ret:%d", clusterId, code);
×
609
  }
610
  return false;
×
611
}
612
static void monitorSendAllSlowLogAtQuit() {
1,633,559✔
613
  void* pIter = NULL;
1,633,559✔
614
  while ((pIter = taosHashIterate(monitorSlowLogHash, pIter))) {
1,633,559✔
615
    SlowLogClient* pClient = (*(SlowLogClient**)pIter);
×
616
    if (pClient == NULL) {
×
617
      continue;
×
618
    }
619
    int64_t size = getFileSize(pClient->path);
×
620
    if (size <= pClient->offset) {
×
621
      processFileInTheEnd(pClient->pFile, pClient->path);
×
622
      pClient->pFile = NULL;
×
623
    } else if (pClient->offset == 0) {
×
624
      int64_t* clusterId = (int64_t*)taosHashGetKey(pIter, NULL);
×
625
      int32_t  code = monitorReadSend(*clusterId, pClient->pFile, &pClient->offset, size, SLOW_LOG_READ_QUIT, NULL);
×
626
      tscDebug("monitor send slow log clusterId:0x%" PRIx64 ", ret:%d", *clusterId, code);
×
627
      if (code == 0) {
×
628
        quitCnt++;
×
629
      }
630
    }
631
  }
632
}
1,633,559✔
633

634
static void processFileRemoved(SlowLogClient* pClient) {
×
635
  if (taosUnLockFile(pClient->pFile) != 0) {
×
636
    tscError("failed to unlock file:%s since %d", pClient->path, terrno);
×
637
    return;
×
638
  }
639
  int32_t ret = taosCloseFile(&(pClient->pFile));
×
640
  if (ret != 0) {
×
641
    tscError("failed to close file:%p ret:%d", pClient->pFile, ret);
×
642
    return;
×
643
  }
644

645
  TdFilePtr pFile =
646
      taosOpenFile(pClient->path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_READ | TD_FILE_TRUNC);
×
647
  if (pFile == NULL) {
×
648
    tscError("failed to open file:%s since %d", pClient->path, terrno);
×
649
  } else {
650
    pClient->pFile = pFile;
×
651
  }
652
}
653

654
static void monitorSendAllSlowLog() {
494,542,111✔
655
  int64_t t = taosGetMonoTimestampMs();
494,542,111✔
656
  void*   pIter = NULL;
494,542,111✔
657
  while ((pIter = taosHashIterate(monitorSlowLogHash, pIter))) {
494,542,111✔
658
    int64_t*       clusterId = (int64_t*)taosHashGetKey(pIter, NULL);
×
659
    SAppInstInfo*  pInst = getAppInstByClusterId(*clusterId);
×
660
    SlowLogClient* pClient = (*(SlowLogClient**)pIter);
×
661
    if (pClient == NULL || pInst == NULL) {
×
662
      taosHashCancelIterate(monitorSlowLogHash, pIter);
×
663
      return;
×
664
    }
665
    if (t - pClient->lastCheckTime > pInst->serverCfg.monitorParas.tsMonitorInterval * 1000) {
×
666
      pClient->lastCheckTime = t;
×
667
    } else {
668
      continue;
×
669
    }
670

671
    if (pClient->offset == 0) {
×
672
      int64_t size = getFileSize(pClient->path);
×
673
      if (size <= 0) {
×
674
        if (size < 0) {
×
675
          tscError("monitor failed to get file size:%s, err:%d", pClient->path, ERRNO);
×
676
          if (ERRNO == ENOENT) {
×
677
            processFileRemoved(pClient);
×
678
          }
679
        }
680
        continue;
×
681
      }
682
      int32_t code = monitorReadSend(*clusterId, pClient->pFile, &pClient->offset, size, SLOW_LOG_READ_RUNNING, NULL);
×
683
      tscDebug("monitor send slow log clusterId:0x%" PRIx64 ", ret:%d", *clusterId, code);
×
684
    }
685
  }
686
}
687

688
static void monitorSendAllSlowLogFromTempDir(int64_t clusterId) {
1,632,257✔
689
  SAppInstInfo* pInst = getAppInstByClusterId((int64_t)clusterId);
1,632,257✔
690

691
  if (pInst == NULL || !pInst->serverCfg.monitorParas.tsEnableMonitor) {
1,632,257✔
692
    tscInfo("monitor is disabled, skip send slow log");
1,630,863✔
693
    return;
1,630,863✔
694
  }
695
  char namePrefix[PATH_MAX] = {0};
1,394✔
696
  if (snprintf(namePrefix, sizeof(namePrefix), "%s%" PRIx64, TD_TMP_FILE_PREFIX, clusterId) < 0) {
1,394✔
697
    tscError("failed to generate slow log file name prefix");
×
698
    return;
×
699
  }
700

701
  char tmpPath[PATH_MAX] = {0};
1,394✔
702
  if (getSlowLogTmpDir(tmpPath, sizeof(tmpPath)) < 0) {
1,394✔
703
    return;
×
704
  }
705

706
  TdDirPtr pDir = taosOpenDir(tmpPath);
1,394✔
707
  if (pDir == NULL) {
1,394✔
708
    return;
×
709
  }
710

711
  TdDirEntryPtr de = NULL;
1,394✔
712
  while ((de = taosReadDir(pDir)) != NULL) {
4,182✔
713
    if (taosDirEntryIsDir(de)) {
2,788✔
714
      continue;
2,788✔
715
    }
716

717
    char* name = taosGetDirEntryName(de);
×
718
    if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0 || strstr(name, namePrefix) == NULL) {
×
719
      tscInfo("skip file:%s, for cluster id:%" PRIx64, name, clusterId);
×
720
      continue;
×
721
    }
722

723
    char filename[PATH_MAX] = {0};
×
724
    (void)snprintf(filename, sizeof(filename), "%s%s", tmpPath, name);
×
725
    TdFilePtr pFile = taosOpenFile(filename, TD_FILE_READ | TD_FILE_WRITE);
×
726
    if (pFile == NULL) {
×
727
      tscError("failed to open file:%s since %s", filename, terrstr());
×
728
      continue;
×
729
    }
730
    if (taosLockFile(pFile) < 0) {
×
731
      tscInfo("failed to lock file:%s since %s, maybe used by other process", filename, terrstr());
×
732
      int32_t ret = taosCloseFile(&pFile);
×
733
      if (ret != 0) {
×
734
        tscError("failed to close file:%p ret:%d", pFile, ret);
×
735
      }
736
      continue;
×
737
    }
738
    char* tmp = taosStrdup(filename);
×
739
    if (tmp == NULL) {
×
740
      tscError("failed to dup string:%s since %s", filename, terrstr());
×
741
      if (taosUnLockFile(pFile) != 0) {
×
742
        tscError("failed to unlock file:%s, terrno:%d", filename, terrno);
×
743
      }
744
      if (taosCloseFile(&(pFile)) != 0) {
×
745
        tscError("failed to close file:%s, terrno:%d", filename, terrno);
×
746
      }
747
      continue;
×
748
    }
749
    monitorSendSlowLogAtBeginning(clusterId, &tmp, pFile, 0);
×
750
    taosMemoryFree(tmp);
×
751
  }
752

753
  int32_t ret = taosCloseDir(&pDir);
1,394✔
754
  if (ret != 0) {
1,394✔
755
    tscError("failed to close dir, ret:%d", ret);
×
756
  }
757
}
758

759
static void* monitorThreadFunc(void* param) {
1,633,559✔
760
  setThreadName("client-monitor-slowlog");
1,633,559✔
761
  tscInfo("monitor update thread started");
1,633,559✔
762
  int64_t quitTime = 0;
1,633,559✔
763
  while (1) {
494,542,111✔
764
    if (atomic_load_32(&monitorFlag) == 1) {
496,175,670✔
765
      if (quitCnt == 0) {
1,633,559✔
766
        monitorSendAllSlowLogAtQuit();
1,633,559✔
767
        if (quitCnt == 0) {
1,633,559✔
768
          tscInfo("monitorThreadFunc quit since no slow log to send");
1,633,559✔
769
          break;
1,633,559✔
770
        }
771
        quitTime = taosGetMonoTimestampMs();
×
772
      }
773
      if (taosGetMonoTimestampMs() - quitTime > 500) {  // quit at most 500ms
×
774
        tscInfo("monitorThreadFunc quit since timeout");
×
775
        break;
×
776
      }
777
    }
778

779
    MonitorSlowLogData* slowLogData = NULL;
494,542,111✔
780
    taosReadQitem(monitorQueue, (void**)&slowLogData);
494,542,111✔
781
    if (slowLogData != NULL) {
494,542,111✔
782
      if (slowLogData->type == SLOW_LOG_READ_BEGINNIG && quitCnt == 0) {
1,632,257✔
783
        if (slowLogData->pFile != NULL) {
1,632,257✔
784
          monitorSendSlowLogAtBeginning(slowLogData->clusterId, &(slowLogData->fileName), slowLogData->pFile,
×
785
                                        slowLogData->offset);
×
786
        } else {
787
          monitorSendAllSlowLogFromTempDir(slowLogData->clusterId);
1,632,257✔
788
        }
789
      } else if (slowLogData->type == SLOW_LOG_WRITE) {
×
790
        monitorWriteSlowLog2File(slowLogData, tmpSlowLogPath);
×
791
      } else if (slowLogData->type == SLOW_LOG_READ_RUNNING) {
×
792
        monitorSendSlowLogAtRunning(slowLogData->clusterId);
×
793
      } else if (slowLogData->type == SLOW_LOG_READ_QUIT) {
×
794
        if (monitorSendSlowLogAtQuit(slowLogData->clusterId)) {
×
795
          tscInfo("monitorThreadFunc quit since all slow log sended");
×
796
          monitorFreeSlowLogData(slowLogData);
×
797
          taosFreeQitem(slowLogData);
×
798
          break;
×
799
        }
800
      }
801
      monitorFreeSlowLogData(slowLogData);
1,632,257✔
802
      taosFreeQitem(slowLogData);
1,632,257✔
803
    }
804

805
    if (quitCnt == 0) {
494,542,111✔
806
      monitorSendAllSlowLog();
494,542,111✔
807
    }
808
    (void)tsem2_timewait(&monitorSem, 100);
494,542,111✔
809
  }
810
  return NULL;
1,633,559✔
811
}
812

813
static int32_t tscMonitortInit() {
1,633,559✔
814
  TdThreadAttr thAttr;
1,622,906✔
815
  if (taosThreadAttrInit(&thAttr) != 0) {
1,633,559✔
816
    tscError("failed to init thread attr since %s", strerror(ERRNO));
×
817
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
818
  }
819
  if (taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE) != 0) {
1,633,559✔
820
    tscError("failed to set thread attr since %s", strerror(ERRNO));
×
821
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
822
  }
823

824
  if (taosThreadCreate(&monitorThread, &thAttr, monitorThreadFunc, NULL) != 0) {
1,633,559✔
825
    tscError("failed to create monitor thread since %s", strerror(ERRNO));
×
826
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
827
  }
828

829
  (void)taosThreadAttrDestroy(&thAttr);
1,633,559✔
830
  return 0;
1,633,559✔
831
}
832

833
static void tscMonitorStop() {
1,633,574✔
834
  if (taosCheckPthreadValid(monitorThread)) {
1,633,574✔
835
    (void)taosThreadJoin(monitorThread, NULL);
1,633,559✔
836
    taosThreadClear(&monitorThread);
1,633,559✔
837
  }
838
}
1,633,574✔
839

840
int32_t monitorInit() {
1,633,559✔
841
  int32_t code = 0;
1,633,559✔
842

843
  tscInfo("monitor init");
1,633,559✔
844
  monitorCounterHash =
1,633,559✔
845
      (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
1,633,559✔
846
  if (monitorCounterHash == NULL) {
1,633,559✔
847
    tscError("failed to create monitorCounterHash");
×
848
    return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
849
  }
850
  taosHashSetFreeFp(monitorCounterHash, destroyMonitorClient);
1,633,559✔
851

852
  monitorSlowLogHash =
1,633,559✔
853
      (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
1,633,559✔
854
  if (monitorSlowLogHash == NULL) {
1,633,559✔
855
    tscError("failed to create monitorSlowLogHash");
×
856
    return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
857
  }
858
  taosHashSetFreeFp(monitorSlowLogHash, destroySlowLogClient);
1,633,559✔
859

860
  monitorTimer = taosTmrInit(0, 0, 0, "MONITOR");
1,633,559✔
861
  if (monitorTimer == NULL) {
1,633,559✔
862
    tscError("failed to create monitor timer");
×
863
    return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
864
  }
865

866
  code = getSlowLogTmpDir(tmpSlowLogPath, sizeof(tmpSlowLogPath));
1,633,559✔
867
  if (code != 0) {
1,633,559✔
868
    return code;
×
869
  }
870

871
  code = taosMulModeMkDir(tmpSlowLogPath, 0777, true);
1,633,559✔
872
  if (code != 0) {
1,633,559✔
873
    tscError("failed to create dir:%s since %s", tmpSlowLogPath, terrstr());
×
874
    return code;
×
875
  }
876

877
  if (tsem2_init(&monitorSem, 0, 0) != 0) {
1,633,559✔
878
    tscError("sem init error since %s", terrstr());
×
879
    return TAOS_SYSTEM_ERROR(ERRNO);
×
880
  }
881

882
  code = taosOpenQueue(&monitorQueue);
1,633,559✔
883
  if (code) {
1,633,559✔
884
    tscError("open queue error since %s", terrstr());
×
885
    return TAOS_GET_TERRNO(code);
×
886
  }
887

888
  taosInitRWLatch(&monitorLock);
1,633,559✔
889
  return tscMonitortInit();
1,633,559✔
890
}
891

892
void monitorClose() {
1,633,574✔
893
  tscInfo("monitor close");
1,633,574✔
894
  taosWLockLatch(&monitorLock);
1,633,574✔
895
  atomic_store_32(&monitorFlag, 1);
1,633,574✔
896
  tscMonitorStop();
1,633,574✔
897
  sendAllCounter();
1,633,574✔
898
  taosHashCleanup(monitorCounterHash);
1,633,574✔
899
  taosHashCleanup(monitorSlowLogHash);
1,633,574✔
900
  taosTmrCleanUp(monitorTimer);
1,633,574✔
901
  taosCloseQueue(monitorQueue);
1,633,574✔
902
  if (tsem2_destroy(&monitorSem) != 0) {
1,633,574✔
903
    tscError("failed to destroy semaphore");
×
904
  }
905
  taosWUnLockLatch(&monitorLock);
1,633,574✔
906
}
1,633,574✔
907

908
int32_t monitorPutData2MonitorQueue(MonitorSlowLogData data) {
1,632,265✔
909
  int32_t             code = 0;
1,632,265✔
910
  MonitorSlowLogData* slowLogData = NULL;
1,632,265✔
911

912
  if (atomic_load_32(&monitorFlag) == 1) {
1,632,265✔
913
    tscError("monitor slow log thread is exiting");
×
914
    return -1;
×
915
  }
916

917
  code = taosAllocateQitem(sizeof(MonitorSlowLogData), DEF_QITEM, 0, (void**)&slowLogData);
1,632,265✔
918
  if (code) {
1,632,265✔
919
    tscError("monitor failed to allocate slow log data");
×
920
    return code;
×
921
  }
922
  *slowLogData = data;
1,632,265✔
923
  tscDebug("monitor write slow log to queue, clusterId:0x%" PRIx64 " type:%s, data:%s", slowLogData->clusterId,
1,632,265✔
924
           queueTypeStr[slowLogData->type], slowLogData->data);
925
  if (taosWriteQitem(monitorQueue, slowLogData) == 0) {
1,632,265✔
926
    if (tsem2_post(&monitorSem) != 0) {
1,632,265✔
927
      tscError("failed to post semaphore");
×
928
    }
929
  } else {
930
    if (taosCloseFile(&(slowLogData->pFile)) != 0) {
×
931
      tscError("failed to close file:%p", slowLogData->pFile);
×
932
    }
933
    monitorFreeSlowLogData(slowLogData);
×
934
    taosFreeQitem(slowLogData);
×
935
  }
936
  return 0;
1,632,265✔
937
}
938

939
int32_t reportCB(void* param, SDataBuf* pMsg, int32_t code) {
×
940
  taosMemoryFree(pMsg->pData);
×
941
  taosMemoryFree(pMsg->pEpSet);
×
942
  tscDebug("[del report]delete reportCB code:%d", code);
×
943
  return 0;
×
944
}
945

946
int32_t senAuditInfo(STscObj* pTscObj, void* pReq, int32_t len, uint64_t requestId) {
×
947
  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
×
948
  if (sendInfo == NULL) {
×
949
    tscError("[del report] failed to allocate memory for sendInfo");
×
950
    return terrno;
×
951
  }
952

953
  sendInfo->msgInfo = (SDataBuf){.pData = pReq, .len = len, .handle = NULL};
×
954

955
  sendInfo->requestId = requestId;
×
956
  sendInfo->requestObjRefId = 0;
×
957
  sendInfo->param = NULL;
×
958
  sendInfo->fp = reportCB;
×
959
  sendInfo->msgType = TDMT_MND_AUDIT;
×
960

961
  SEpSet epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
×
962

963
  int32_t code = asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo);
×
964
  if (code != 0) {
×
965
    tscError("[del report] failed to send msg to server, code:%d", code);
×
966
    taosMemoryFree(sendInfo);
×
967
    return code;
×
968
  }
969
  return TSDB_CODE_SUCCESS;
×
970
}
971

972
static int32_t setDeleteStmtAuditReqTableInfo(SDeleteStmt* pStmt, SAuditReq* pReq) {
×
973
  if (nodeType(pStmt->pFromTable) != QUERY_NODE_REAL_TABLE) {
×
974
    tscDebug("[report] invalid from table node type:%s", nodesNodeName(pStmt->pFromTable->type));
×
975
    return TSDB_CODE_TSC_INVALID_OPERATION;
×
976
  }
977
  SRealTableNode* pTableNode = (SRealTableNode*)pStmt->pFromTable;
×
978
  TAOS_UNUSED(tsnprintf(pReq->table, TSDB_TABLE_NAME_LEN, "%s", pTableNode->table.tableName));
×
979
  TAOS_UNUSED(tsnprintf(pReq->db, TSDB_DB_FNAME_LEN, "%s", pTableNode->table.dbName));
×
980
  return TSDB_CODE_SUCCESS;
×
981
}
982

983
static int32_t setModifyStmtAuditReqTableInfo(SVnodeModifyOpStmt* pStmt, SAuditReq* pReq) {
×
984
  if (pStmt->insertType != TSDB_QUERY_TYPE_INSERT && pStmt->insertType != TSDB_QUERY_TYPE_FILE_INSERT &&
×
985
      pStmt->insertType != TSDB_QUERY_TYPE_STMT_INSERT) {
×
986
    tscDebug("[report] invalid from table node type:%s", nodesNodeName(pStmt->sqlNodeType));
×
987
    return TSDB_CODE_TSC_INVALID_OPERATION;
×
988
  }
989

990
  TAOS_UNUSED(tsnprintf(pReq->table, TSDB_TABLE_NAME_LEN, "%s", pStmt->targetTableName.tname));
×
991
  TAOS_UNUSED(tsnprintf(pReq->db, TSDB_DB_FNAME_LEN, "%s", pStmt->targetTableName.dbname));
×
992
  return TSDB_CODE_SUCCESS;
×
993
}
994

995
typedef struct SAuditTableListInfo {
996
  SArray* dbList;
997
  SArray* tableList;
998
} SAuditTableListInfo;
999

1000
static int32_t initAuditTableListInfo(SAuditTableListInfo* pInfo) {
×
1001
  if (pInfo->dbList) return TSDB_CODE_SUCCESS;
×
1002

1003
  pInfo->dbList = taosArrayInit(4, TSDB_DB_FNAME_LEN);
×
1004
  if (pInfo->dbList == NULL) {
×
1005
    tscError("[report] failed to create db list array");
×
1006
    return terrno;
×
1007
  }
1008

1009
  pInfo->tableList = taosArrayInit(4, TSDB_TABLE_NAME_LEN);
×
1010
  if (pInfo->tableList == NULL) {
×
1011
    tscError("[report] failed to create table list array");
×
1012
    taosArrayDestroy(pInfo->dbList);
×
1013
    return terrno;
×
1014
  }
1015
  return TSDB_CODE_SUCCESS;
×
1016
}
1017

1018
static void destroyAuditTableListInfo(SAuditTableListInfo* pInfo) {
×
1019
  if (pInfo->dbList) {
×
1020
    taosArrayDestroy(pInfo->dbList);
×
1021
    pInfo->dbList = NULL;
×
1022
  }
1023
  if (pInfo->tableList) {
×
1024
    taosArrayDestroy(pInfo->tableList);
×
1025
    pInfo->tableList = NULL;
×
1026
  }
1027
}
×
1028

1029
static void copyTableInfoToAuditReq(SAuditTableListInfo* pTbListInfo, SAuditReq* pReq) {
×
1030
  if (pTbListInfo->dbList->size > 0) {
×
1031
    char* dbName = (char*)taosArrayGet(pTbListInfo->dbList, 0);
×
1032
    TAOS_UNUSED(tsnprintf(pReq->db, TSDB_DB_FNAME_LEN, "%s", dbName));
×
1033
  }
1034
  if (pTbListInfo->tableList->size > 0) {
×
1035
    char* tableName = (char*)taosArrayGet(pTbListInfo->tableList, 0);
×
1036
    TAOS_UNUSED(tsnprintf(pReq->table, TSDB_TABLE_NAME_LEN, "%s", tableName));
×
1037
  }
1038
}
×
1039

1040
static int32_t doSetSelectStmtAuditReqTableInfo(SNode* pFromTable, SAuditReq* pReq, SAuditTableListInfo* pTbListInfo);
1041
static int32_t doSetOperatorTableInfo(SSetOperator* pSetOperator, SAuditTableListInfo* pTbListInfo) {
×
1042
  int32_t code = TSDB_CODE_SUCCESS;
×
1043
  if (pSetOperator->pLeft) {
×
1044
    SSelectStmt* pLeftSelect = (SSelectStmt*)pSetOperator->pLeft;
×
1045
    if (nodeType(pSetOperator->pLeft) == QUERY_NODE_SELECT_STMT) {
×
1046
      code = doSetSelectStmtAuditReqTableInfo(pLeftSelect->pFromTable, NULL, pTbListInfo);
×
1047
    } else if (nodeType(pSetOperator->pLeft) == QUERY_NODE_SET_OPERATOR) {
×
1048
      SSetOperator* pLeftSetOperator = (SSetOperator*)pSetOperator->pLeft;
×
1049
      code = doSetOperatorTableInfo(pLeftSetOperator, pTbListInfo);
×
1050
      TAOS_RETURN(code);
×
1051
    }
1052
  }
1053
  if (pSetOperator->pRight) {
×
1054
    SSelectStmt* pRightSelect = (SSelectStmt*)pSetOperator->pRight;
×
1055
    if (nodeType(pSetOperator->pRight) == QUERY_NODE_SELECT_STMT) {
×
1056
      code = doSetSelectStmtAuditReqTableInfo(pRightSelect->pFromTable, NULL, pTbListInfo);
×
1057
      TAOS_RETURN(code);
×
1058
    } else if (nodeType(pSetOperator->pRight) == QUERY_NODE_SET_OPERATOR) {
×
1059
      SSetOperator* pRightSetOperator = (SSetOperator*)pSetOperator->pRight;
×
1060
      code = doSetOperatorTableInfo(pRightSetOperator, pTbListInfo);
×
1061
      TAOS_RETURN(code);
×
1062
    }
1063
  }
1064
  return code;
×
1065
}
1066

1067
static int32_t doSetSelectStmtAuditReqTableInfo(SNode* pFromTable, SAuditReq* pReq, SAuditTableListInfo* pTbListInfo) {
×
1068
  int32_t     code = TSDB_CODE_SUCCESS;
×
1069
  int32_t     lino = 0;
×
1070
  STableNode* pTable = NULL;
×
1071

1072
  if (!pFromTable) return TSDB_CODE_TSC_INVALID_OPERATION;
×
1073

1074
  if (nodeType(pFromTable) == QUERY_NODE_REAL_TABLE) {
×
1075
    SRealTableNode* pTableNode = (SRealTableNode*)pFromTable;
×
1076
    pTable = &pTableNode->table;
×
1077

1078
  } else if (nodeType(pFromTable) == QUERY_NODE_TEMP_TABLE && ((STempTableNode*)pFromTable)->pSubquery) {
×
1079
    if (nodeType(((STempTableNode*)pFromTable)->pSubquery) == QUERY_NODE_SELECT_STMT) {
×
1080
      SSelectStmt* pSubquery = (SSelectStmt*)((STempTableNode*)pFromTable)->pSubquery;
×
1081
      return doSetSelectStmtAuditReqTableInfo(pSubquery->pFromTable, pReq, pTbListInfo);
×
1082

1083
    } else if (nodeType(((STempTableNode*)pFromTable)->pSubquery) == QUERY_NODE_SET_OPERATOR) {
×
1084
      code = initAuditTableListInfo(pTbListInfo);
×
1085
      TAOS_CHECK_GOTO(code, &lino, _exit);
×
1086

1087
      SSetOperator* pSetOperator = (SSetOperator*)((STempTableNode*)pFromTable)->pSubquery;
×
1088
      code = doSetOperatorTableInfo(pSetOperator, pTbListInfo);
×
1089
      TAOS_CHECK_GOTO(code, &lino, _exit);
×
1090
    }
1091
  } else if (nodeType(pFromTable) == QUERY_NODE_VIRTUAL_TABLE && pFromTable) {
×
1092
    SVirtualTableNode* pVtable = (SVirtualTableNode*)pFromTable;
×
1093
    pTable = &pVtable->table;
×
1094

1095
  } else if (nodeType(pFromTable) == QUERY_NODE_JOIN_TABLE) {
×
1096
    code = initAuditTableListInfo(pTbListInfo);
×
1097
    TAOS_CHECK_GOTO(code, &lino, _exit);
×
1098

1099
    SJoinTableNode* pJoinTable = (SJoinTableNode*)pFromTable;
×
1100
    code = doSetSelectStmtAuditReqTableInfo(pJoinTable->pLeft, NULL, pTbListInfo);
×
1101
    TAOS_CHECK_GOTO(code, &lino, _exit);
×
1102
    code = doSetSelectStmtAuditReqTableInfo(pJoinTable->pRight, NULL, pTbListInfo);
×
1103
    TAOS_CHECK_GOTO(code, &lino, _exit);
×
1104
  }
1105
  if (pTbListInfo->dbList == NULL && pTable && pReq) {
×
1106
    TAOS_UNUSED(tsnprintf(pReq->table, TSDB_TABLE_NAME_LEN, "%s", pTable->tableName));
×
1107
    TAOS_UNUSED(tsnprintf(pReq->db, TSDB_DB_FNAME_LEN, "%s", pTable->dbName));
×
1108
  } else if (pTbListInfo->dbList != NULL && pTable) {
×
1109
    void* tmp = taosArrayPush(pTbListInfo->dbList, pTable->dbName);
×
1110
    TSDB_CHECK_NULL(tmp, code, lino, _exit, terrno);
×
1111
    tmp = taosArrayPush(pTbListInfo->tableList, pTable->tableName);
×
1112
    TSDB_CHECK_NULL(tmp, code, lino, _exit, terrno);
×
1113
  }
1114

1115
_exit:
×
1116
  if (code != TSDB_CODE_SUCCESS) {
×
1117
    tscError("[report] failed to set select stmt audit req table info, code:%d, lino:%d", code, lino);
×
1118
  }
1119
  return code;
×
1120
}
1121

1122
static int32_t setSelectStmtAuditReqTableInfo(SSelectStmt* pStmt, SAuditReq* pReq) {
×
1123
  int32_t             code = TSDB_CODE_SUCCESS;
×
1124
  SAuditTableListInfo tableListInfo = {0};
×
1125

1126
  code = doSetSelectStmtAuditReqTableInfo(pStmt->pFromTable, pReq, &tableListInfo);
×
1127
  if (code == TSDB_CODE_SUCCESS && tableListInfo.dbList) {
×
1128
    copyTableInfoToAuditReq(&tableListInfo, pReq);
×
1129
    destroyAuditTableListInfo(&tableListInfo);
×
1130
  }
1131
  return code;
×
1132
}
1133

1134
static int32_t setOperatorTableInfo(SSetOperator* pSetOperator, SAuditReq* pReq) {
×
1135
  int32_t             code = TSDB_CODE_SUCCESS;
×
1136
  SAuditTableListInfo tableListInfo = {0};
×
1137
  code = initAuditTableListInfo(&tableListInfo);
×
1138
  if (code != TSDB_CODE_SUCCESS) {
×
1139
    return code;
×
1140
  }
1141
  code = doSetOperatorTableInfo(pSetOperator, &tableListInfo);
×
1142
  if (code == TSDB_CODE_SUCCESS) {
×
1143
    copyTableInfoToAuditReq(&tableListInfo, pReq);
×
1144
  }
1145
  destroyAuditTableListInfo(&tableListInfo);
×
1146
  return code;
×
1147
}
1148

1149
static int32_t setAuditReqTableInfo(SRequestObj* pRequest, ENodeType type, SAuditReq* pReq) {
×
1150
  int32_t code = TSDB_CODE_SUCCESS;
×
1151

1152
  if (QUERY_NODE_DELETE_STMT == type) {
×
1153
    SDeleteStmt* pStmt = (SDeleteStmt*)pRequest->pQuery->pRoot;
×
1154
    return setDeleteStmtAuditReqTableInfo(pStmt, pReq);
×
1155
  } else if (QUERY_NODE_VNODE_MODIFY_STMT == type) {
×
1156
    return setModifyStmtAuditReqTableInfo((SVnodeModifyOpStmt*)pRequest->pQuery->pRoot, pReq);
×
1157
  } else if (QUERY_NODE_SELECT_STMT == type) {
×
1158
    return setSelectStmtAuditReqTableInfo((SSelectStmt*)pRequest->pQuery->pRoot, pReq);
×
1159
  } else if (QUERY_NODE_SET_OPERATOR == type) {
×
1160
    return setOperatorTableInfo((SSetOperator*)pRequest->pQuery->pRoot, pReq);
×
1161
  }
1162
  tscError("[report]unsupported report type: %s", nodesNodeName(type));
×
1163
  return code;
×
1164
}
1165

1166
static void setAuditReqAffectedRows(SRequestObj* pRequest, ENodeType type, SAuditReq* pReq) {
×
1167
  if (QUERY_NODE_DELETE_STMT == type || QUERY_NODE_VNODE_MODIFY_STMT == type) {
×
1168
    pReq->affectedRows = pRequest->body.resInfo.numOfRows;
×
1169
  } else if (QUERY_NODE_SELECT_STMT == type || QUERY_NODE_SET_OPERATOR == type) {
×
1170
    pReq->affectedRows = 0;
×
1171
  }
1172
}
×
1173

1174
static void setAuditReqOperation(SRequestObj* pRequest, ENodeType type, SAuditReq* pReq) {
×
1175
  if (QUERY_NODE_DELETE_STMT == type) {
×
1176
    TAOS_UNUSED(tsnprintf(pReq->operation, AUDIT_OPERATION_LEN, "delete"));
×
1177
  } else if (QUERY_NODE_VNODE_MODIFY_STMT == type) {
×
1178
    TAOS_UNUSED(tsnprintf(pReq->operation, AUDIT_OPERATION_LEN, "insert"));
×
1179
  } else if (QUERY_NODE_SELECT_STMT == type) {
×
1180
    TAOS_UNUSED(tsnprintf(pReq->operation, AUDIT_OPERATION_LEN, "select"));
×
1181
  }
1182
}
×
1183

1184
static bool needSendReport(SAppInstServerCFG* pCfg, ENodeType type) {
671,316,154✔
1185
  if (pCfg->auditLevel < AUDIT_LEVEL_DATA) {
671,316,154✔
1186
    return false;
671,312,594✔
1187
  }
1188
  if (type == QUERY_NODE_SELECT_STMT) {
×
1189
    return pCfg->enableAuditSelect != 0;
×
1190
  } else if (type == QUERY_NODE_DELETE_STMT) {
×
1191
    return pCfg->enableAuditDelete != 0;
×
1192
  } else if (type == QUERY_NODE_VNODE_MODIFY_STMT) {
×
1193
    return pCfg->enableAuditInsert != 0;
×
1194
  } else if (type == QUERY_NODE_SET_OPERATOR) {
×
1195
    return pCfg->enableAuditSelect != 0;
×
1196
  }
1197
  tscError("[report] unsupported report type: %s", nodesNodeName(type));
×
1198

1199
  return false;
×
1200
}
1201

1202
static void reportSqlExecResult(SRequestObj* pRequest, ENodeType type) {
681,969,845✔
1203
  int32_t  code = TSDB_CODE_SUCCESS;
681,969,845✔
1204
  STscObj* pTscObj = pRequest->pTscObj;
681,969,845✔
1205

1206
  if (pTscObj == NULL || pTscObj->pAppInfo == NULL) {
681,975,096✔
1207
    tscError("[report][%s] invalid tsc obj", nodesNodeName(type));
2,236✔
1208
    return;
2,978,830✔
1209
  }
1210
  if (pRequest->code != TSDB_CODE_SUCCESS) {
681,974,139✔
1211
    tscDebug("[report][%s] request result code:%d, skip audit", nodesNodeName(type), pRequest->code);
10,651,193✔
1212
    return;
10,651,193✔
1213
  }
1214

1215
  if (!needSendReport(&pTscObj->pAppInfo->serverCfg, type)) {
671,320,479✔
1216
    tscTrace("[report][%s] audit is disabled", nodesNodeName(type));
671,315,207✔
1217
    return;
671,313,054✔
1218
  }
1219
  
1220
  SAuditReq       req;
×
1221
  req.pSql = pRequest->sqlstr;
×
1222
  req.sqlLen = pRequest->sqlLen;
×
1223
  setAuditReqAffectedRows(pRequest, type, &req);
×
1224
  code = setAuditReqTableInfo(pRequest, type, &req);
×
1225
  if (code == TSDB_CODE_TSC_INVALID_OPERATION) {
×
1226
    return;
×
1227
  } else if (code != TSDB_CODE_SUCCESS) {
×
1228
    tscError("[report][%s] failed to set audit req table info, code:%d", nodesNodeName(type), code);
×
1229
    return;
×
1230
  }
1231
  int64_t duration = taosGetTimestampUs() - pRequest->metric.start;
×
1232
  req.duration = duration / 1000000.0;  // convert to seconds
×
1233
  setAuditReqOperation(pRequest, type, &req);
×
1234

1235
  int32_t tlen = tSerializeSAuditReq(NULL, 0, &req);
×
1236
  void*   pReq = taosMemoryCalloc(1, tlen);
×
1237
  if (pReq == NULL) {
×
1238
    tscError("[report][%s] failed to allocate memory for req", nodesNodeName(type));
×
1239
    return;
×
1240
  }
1241

1242
  if (tSerializeSAuditReq(pReq, tlen, &req) < 0) {
×
1243
    tscError("[report][%s] failed to serialize req", nodesNodeName(type));
×
1244
    taosMemoryFree(pReq);
×
1245
    return;
×
1246
  }
1247

1248
  code = senAuditInfo(pRequest->pTscObj, pReq, tlen, pRequest->requestId);
×
1249
  if (code != 0) {
×
1250
    tscError("[report][%s] failed to send audit info, code:%d", nodesNodeName(type), code);
×
1251
    taosMemoryFree(pReq);
×
1252
    return;
×
1253
  }
1254
  tscDebug("[report][%s] data, sql:%s", nodesNodeName(type), req.pSql);
×
1255
}
1256

1257
void clientOperateReport(SRequestObj* pRequest) {
744,821,461✔
1258
  if (pRequest == NULL || pRequest->pQuery == NULL || pRequest->pQuery->pRoot == NULL) {
744,821,461✔
1259
    tscDebug("[report] invalid request");
62,848,770✔
1260
    return;
62,850,157✔
1261
  }
1262
  ENodeType type = nodeType(pRequest->pQuery->pRoot);
681,974,583✔
1263
  if (QUERY_NODE_DELETE_STMT == type || QUERY_NODE_SELECT_STMT == type || QUERY_NODE_VNODE_MODIFY_STMT == type ||
1264
      QUERY_NODE_SET_OPERATOR) {
1265
    reportSqlExecResult(pRequest, type);
681,970,005✔
1266
  }
1267
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc