• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4922

09 Jan 2026 08:13AM UTC coverage: 65.161% (-0.4%) from 65.541%
#4922

push

travis-ci

web-flow
merge: from main to 3.0 branch #34232

33 of 56 new or added lines in 8 files covered. (58.93%)

2171 existing lines in 120 files now uncovered.

197632 of 303297 relevant lines covered (65.16%)

117870313.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

31.33
/source/client/src/clientMonitor.c
1
#include "clientMonitor.h"
2
#include "cJSON.h"
3
#include "clientInt.h"
4
#include "clientLog.h"
5
#include "query.h"
6
#include "taoserror.h"
7
#include "tarray.h"
8
#include "tmisce.h"
9
#include "tqueue.h"
10
#include "ttime.h"
11
#include "ttimer.h"
12

13
// Latch taken (read or write) around accesses to monitorCounterHash and inside the report timer callback.
SRWLatch    monitorLock;
// Timer handle passed to taosTmrStart/taosTmrReset when scheduling reportSendProcess.
void*       monitorTimer;
// Maps clusterId (int64_t) -> MonitorClient*.
SHashObj*   monitorCounterHash;
// When 1, reportSendProcess and monitorCounterInc return early and the monitor thread runs its quit handling.
int32_t     monitorFlag = 0;
// Number of quit-time slow-log sends still outstanding; the monitor thread exits once it is 0 after the quit flush.
int32_t     quitCnt = 0;
// NOTE(review): not referenced in the visible part of this file — presumably signals the monitor thread; confirm.
tsem2_t     monitorSem;
// Queue of MonitorSlowLogData items consumed by monitorThreadFunc.
STaosQueue* monitorQueue;
// Maps clusterId (int64_t) -> SlowLogClient*.
SHashObj*   monitorSlowLogHash;
// Directory handed to monitorWriteSlowLog2File for creating temporary slow-log files.
char        tmpSlowLogPath[PATH_MAX] = {0};
// NOTE(review): presumably the handle of the thread running monitorThreadFunc — creation is not visible here; confirm.
TdThread    monitorThread;
extern bool tsEnableAuditDelete;
24

25
static int32_t getSlowLogTmpDir(char* tmpPath, int32_t size) {
1,514,200✔
26
  int ret = tsnprintf(tmpPath, size, "%s/tdengine_slow_log/", tsTempDir);
1,514,200✔
27
  if (ret < 0) {
1,514,200✔
28
    tscError("failed to get tmp path ret:%d", ret);
×
29
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
30
  }
31
  return 0;
1,514,200✔
32
}
33

34
// Finishes with a slow-log temp file: truncate it, unlock it, close it and finally remove it from disk.
//
// pFile: open (and locked) file handle; NULL is a no-op.
// path:  path of the same file, used for logging and for removal.
//
// The handle is always unlocked and closed, even when an earlier step fails, because every caller
// discards its pointer after this call; returning early would leak the descriptor and keep the
// lock held. The file is only removed from disk when all previous steps succeeded.
static void processFileInTheEnd(TdFilePtr pFile, char* path) {
  if (pFile == NULL) {
    return;
  }

  bool ok = true;
  if (taosFtruncateFile(pFile, 0) != 0) {
    tscError("failed to truncate file:%s, terrno:%d", path, terrno);
    ok = false;
  }
  if (taosUnLockFile(pFile) != 0) {
    tscError("failed to unlock file:%s, terrno:%d", path, terrno);
    ok = false;
  }
  if (taosCloseFile(&(pFile)) != 0) {
    tscError("failed to close file:%s, terrno:%d", path, terrno);
    ok = false;
  }
  if (!ok) {
    // keep the file on disk so its content is not lost after a partial failure
    return;
  }
  if (taosRemoveFile(path) != 0) {
    tscError("failed to remove file:%s, terrno:%d", path, terrno);
  }
}
55

56
static void destroySlowLogClient(void* data) {
×
57
  if (data == NULL) {
×
58
    return;
×
59
  }
60
  SlowLogClient* slowLogClient = *(SlowLogClient**)data;
×
61
  processFileInTheEnd(slowLogClient->pFile, slowLogClient->path);
×
62
  taosMemoryFree(slowLogClient);
×
63
}
64

65
static void destroyMonitorClient(void* data) {
1,511,947✔
66
  if (data == NULL) {
1,511,947✔
67
    return;
×
68
  }
69
  MonitorClient* pMonitor = *(MonitorClient**)data;
1,511,947✔
70
  if (pMonitor == NULL) {
1,511,947✔
71
    return;
×
72
  }
73
  if (!taosTmrStopA(&pMonitor->timer)) {
1,511,947✔
74
    tscError("failed to stop timer, pMonitor:%p", pMonitor);
158,758✔
75
  }
76
  taosHashCleanup(pMonitor->counters);
1,511,947✔
77
  int ret = taos_collector_registry_destroy(pMonitor->registry);
1,511,947✔
78
  if (ret) {
1,511,947✔
79
    tscError("failed to destroy registry, pMonitor:%p ret:%d", pMonitor, ret);
×
80
  }
81
  taosMemoryFree(pMonitor);
1,511,947✔
82
}
83

84
static void monitorFreeSlowLogData(void* paras) {
1,511,795✔
85
  MonitorSlowLogData* pData = (MonitorSlowLogData*)paras;
1,511,795✔
86
  if (pData == NULL) {
1,511,795✔
87
    return;
99✔
88
  }
89
  taosMemoryFreeClear(pData->data);
1,511,696✔
90
  if (pData->type == SLOW_LOG_READ_BEGINNIG) {
1,511,696✔
91
    taosMemoryFree(pData->fileName);
1,511,696✔
92
  }
93
}
94

95
// Releases a heap-allocated MonitorSlowLogData: first the buffers it references
// (via monitorFreeSlowLogData), then the struct itself. sendReport installs this
// function as the paramFreeFp of the outgoing message.
static void monitorFreeSlowLogDataEx(void* paras) {
  monitorFreeSlowLogData(paras);
  taosMemoryFree(paras);
}
99✔
99

100
static SAppInstInfo* getAppInstByClusterId(int64_t clusterId) {
3,762,382✔
101
  void* p = taosHashGet(appInfo.pInstMapByClusterId, &clusterId, LONG_BYTES);
3,762,382✔
102
  if (p == NULL) {
3,762,382✔
103
    tscError("failed to get app inst, clusterId:0x%" PRIx64, clusterId);
×
104
    return NULL;
×
105
  }
106
  return *(SAppInstInfo**)p;
3,762,382✔
107
}
108

109
// Completion callback for messages sent by sendReport.
//
// param: NULL for counter reports, otherwise the MonitorSlowLogData attached to a slow-log send.
// pMsg:  response buffer; its pData and pEpSet are freed here.
// code:  send result; a non-zero value is only logged.
//
// For slow-log sends a follow-up item (same cluster/type/file/offset, no data) is pushed to the
// monitor queue. When queuing succeeds the file name is handed over to the new item, otherwise
// the file handle is closed here. Always returns TSDB_CODE_SUCCESS.
static int32_t monitorReportAsyncCB(void* param, SDataBuf* pMsg, int32_t code) {
  if (code != TSDB_CODE_SUCCESS) {
    tscError("found error in monitorReport send callback, code:%d, please check the network.", code);
  }

  if (pMsg != NULL) {
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
  }

  if (param == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  MonitorSlowLogData* sent = (MonitorSlowLogData*)param;
  if (code != 0) {
    tscError("failed to send slow log:%s, clusterId:0x%" PRIx64, sent->data, sent->clusterId);
  }

  MonitorSlowLogData followUp = {.data = NULL,
                                 .offset = sent->offset,
                                 .pFile = sent->pFile,
                                 .fileName = sent->fileName,
                                 .type = sent->type,
                                 .clusterId = sent->clusterId};
  if (monitorPutData2MonitorQueue(followUp) != 0) {
    if (taosCloseFile(&(sent->pFile)) != 0) {
      tscError("failed to close file:%p", sent->pFile);
    }
  } else {
    // the queued item now owns the file name
    sent->fileName = NULL;
  }

  return TSDB_CODE_SUCCESS;
}
138

139
// Serializes pCont into an SStatisReq and sends it asynchronously as TDMT_MND_STATIS.
//
// pTransporter/epSet: transport handle and endpoint set passed to asyncSendMsgToServer.
// pCont: NUL-terminated report content; this function does not take ownership of it directly.
// type:  monitor report type stored in the request.
// param: NULL for counter reports, or a heap-allocated MonitorSlowLogData for slow-log reports.
//        It is attached to the message (released through monitorFreeSlowLogDataEx) and, when
//        the request cannot be built, its file is closed and it is freed here.
//
// Returns the result of asyncSendMsgToServer, or an error code when building the request fails.
static int32_t sendReport(void* pTransporter, SEpSet* epSet, char* pCont, MONITOR_TYPE type, void* param) {
  void*      buf = NULL;
  SStatisReq sStatisReq = {0};  // zero-init so no indeterminate field reaches the serializer
  sStatisReq.pCont = pCont;
  sStatisReq.contLen = strlen(pCont);
  sStatisReq.type = type;

  int tlen = tSerializeSStatisReq(NULL, 0, &sStatisReq);
  if (tlen < 0) {
    goto FAILED;
  }
  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) {
    tscError("sendReport failed, out of memory, len:%d", tlen);
    goto FAILED;
  }
  tlen = tSerializeSStatisReq(buf, tlen, &sStatisReq);
  if (tlen < 0) {
    goto FAILED;
  }

  SMsgSendInfo* pInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (pInfo == NULL) {
    tscError("sendReport failed, out of memory send info");
    goto FAILED;
  }
  pInfo->fp = monitorReportAsyncCB;
  pInfo->msgInfo.pData = buf;  // ownership of buf moves to the message
  pInfo->msgInfo.len = tlen;
  pInfo->msgType = TDMT_MND_STATIS;
  pInfo->param = param;
  pInfo->paramFreeFp = monitorFreeSlowLogDataEx;
  pInfo->requestId = tGenIdPI64();
  pInfo->requestObjRefId = 0;

  return asyncSendMsgToServer(pTransporter, epSet, NULL, pInfo);

FAILED:
  taosMemoryFree(buf);
  // param is NULL for counter reports; only slow-log reports carry a file to close
  if (param != NULL) {
    MonitorSlowLogData* pData = (MonitorSlowLogData*)param;
    if (taosCloseFile(&(pData->pFile)) != 0) {
      tscError("failed to close file:%p", pData->pFile);
    }
    monitorFreeSlowLogDataEx(param);
  }
  return TAOS_GET_TERRNO(TSDB_CODE_TSC_INTERNAL_ERROR);
}
185

186
// Renders the metrics of `registry` (stamped with the current millisecond timestamp) and,
// when the rendered content is non-empty, sends it as a MONITOR_TYPE_COUNTER report.
// The registry batch is cleared only after the send call returned 0.
static void generateClusterReport(taos_collector_registry_t* registry, void* pTransporter, SEpSet* epSet) {
  char stamp[50] = {0};
  (void)snprintf(stamp, sizeof(stamp), "%" PRId64, taosGetTimestamp(TSDB_TIME_PRECISION_MILLI));

  char* report = (char*)taos_collector_registry_bridge_new(registry, stamp, "%" PRId64, NULL);
  if (report == NULL) {
    tscError("generateClusterReport failed, get null content. since %s", tstrerror(terrno));
    return;
  }

  const bool hasContent = (strlen(report) != 0);
  if (hasContent && sendReport(pTransporter, epSet, report, MONITOR_TYPE_COUNTER, NULL) == 0) {
    const int rc = taos_collector_registry_clear_batch(registry);
    if (rc != 0) {
      tscError("failed to clear registry, ret:%d", rc);
    }
  }
  taosMemoryFreeClear(report);
}
203

204
static void reportSendProcess(void* param, void* tmrId) {
745,481✔
205
  taosRLockLatch(&monitorLock);
745,481✔
206
  if (atomic_load_32(&monitorFlag) == 1) {
745,481✔
207
    taosRUnLockLatch(&monitorLock);
6,742✔
208
    return;
6,742✔
209
  }
210

211
  MonitorClient* pMonitor = (MonitorClient*)param;
738,739✔
212
  SAppInstInfo*  pInst = getAppInstByClusterId(pMonitor->clusterId);
738,739✔
213
  if (pInst == NULL) {
738,739✔
214
    taosRUnLockLatch(&monitorLock);
×
215
    return;
×
216
  }
217

218
  SEpSet ep = getEpSet_s(&pInst->mgmtEp);
738,739✔
219
  generateClusterReport(pMonitor->registry, pInst->pTransporter, &ep);
738,739✔
220
  bool reset = taosTmrReset(reportSendProcess, pInst->serverCfg.monitorParas.tsMonitorInterval * 1000, param,
738,739✔
221
                            monitorTimer, &tmrId);
222
  tscDebug("reset timer, pMonitor:%p, %d", pMonitor, reset);
738,739✔
223
  taosRUnLockLatch(&monitorLock);
738,739✔
224
}
225

226
static void sendAllCounter() {
1,513,109✔
227
  MonitorClient** ppMonitor = NULL;
1,513,109✔
228
  while ((ppMonitor = taosHashIterate(monitorSlowLogHash, ppMonitor))) {
1,513,109✔
229
    MonitorClient* pMonitor = *ppMonitor;
×
230
    if (pMonitor == NULL) {
×
231
      continue;
×
232
    }
233
    SAppInstInfo* pInst = getAppInstByClusterId(pMonitor->clusterId);
×
234
    if (pInst == NULL) {
×
235
      taosHashCancelIterate(monitorSlowLogHash, ppMonitor);
×
236
      break;
×
237
    }
238
    SEpSet ep = getEpSet_s(&pInst->mgmtEp);
×
239
    generateClusterReport(pMonitor->registry, pInst->pTransporter, &ep);
×
240
  }
241
}
1,513,109✔
242

243
void monitorCreateClient(int64_t clusterId) {
3,024,409✔
244
  MonitorClient* pMonitor = NULL;
3,024,409✔
245
  taosWLockLatch(&monitorLock);
3,024,512✔
246
  if (taosHashGet(monitorCounterHash, &clusterId, LONG_BYTES) == NULL) {
3,024,512✔
247
    tscInfo("clusterId:0x%" PRIx64 ", create monitor", clusterId);
1,511,947✔
248
    pMonitor = taosMemoryCalloc(1, sizeof(MonitorClient));
1,511,947✔
249
    if (pMonitor == NULL) {
1,511,947✔
250
      tscError("failed to create monitor client");
×
251
      goto fail;
×
252
    }
253
    pMonitor->clusterId = clusterId;
1,511,947✔
254
    char clusterKey[32] = {0};
1,511,947✔
255
    if (snprintf(clusterKey, sizeof(clusterKey), "%" PRId64, clusterId) < 0) {
1,511,947✔
256
      tscError("failed to create cluster key");
×
257
      goto fail;
×
258
    }
259
    pMonitor->registry = taos_collector_registry_new(clusterKey);
1,511,947✔
260
    if (pMonitor->registry == NULL) {
1,511,947✔
261
      tscError("failed to create registry");
×
262
      goto fail;
×
263
    }
264
    pMonitor->colector = taos_collector_new(clusterKey);
1,511,947✔
265
    if (pMonitor->colector == NULL) {
1,511,947✔
266
      tscError("failed to create collector");
×
267
      goto fail;
×
268
    }
269

270
    int r = taos_collector_registry_register_collector(pMonitor->registry, pMonitor->colector);
1,511,947✔
271
    if (r) {
1,511,947✔
272
      tscError("failed to register collector, ret:%d", r);
×
273
      goto fail;
×
274
    }
275
    pMonitor->counters =
1,520,545✔
276
        (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
1,511,947✔
277
    if (pMonitor->counters == NULL) {
1,511,947✔
278
      tscError("failed to create monitor counters");
×
279
      goto fail;
×
280
    }
281

282
    if (taosHashPut(monitorCounterHash, &clusterId, LONG_BYTES, &pMonitor, POINTER_BYTES) != 0) {
1,511,947✔
283
      tscError("failed to put monitor client to hash");
×
284
      goto fail;
×
285
    }
286

287
    SAppInstInfo* pInst = getAppInstByClusterId(clusterId);
1,511,947✔
288
    if (pInst == NULL) {
1,511,947✔
289
      tscError("failed to get app instance by cluster id");
×
290
      pMonitor = NULL;
×
291
      goto fail;
×
292
    }
293
    pMonitor->timer = taosTmrStart(reportSendProcess, pInst->serverCfg.monitorParas.tsMonitorInterval * 1000,
1,511,947✔
294
                                   (void*)pMonitor, monitorTimer);
295
    if (pMonitor->timer == NULL) {
1,511,947✔
296
      tscError("failed to start timer");
×
297
      goto fail;
×
298
    }
299
    tscInfo("clusterId:0x%" PRIx64 ", create monitor finished, monitor:%p", clusterId, pMonitor);
1,511,947✔
300
  }
301
  taosWUnLockLatch(&monitorLock);
3,024,512✔
302

303
  return;
3,024,512✔
304

305
fail:
×
306
  destroyMonitorClient(&pMonitor);
×
307
  taosWUnLockLatch(&monitorLock);
×
308
}
309

310
void monitorCreateClientCounter(int64_t clusterId, const char* name, const char* help, size_t label_key_count,
3,024,512✔
311
                                const char** label_keys) {
312
  taosWLockLatch(&monitorLock);
3,024,512✔
313
  MonitorClient** ppMonitor = (MonitorClient**)taosHashGet(monitorCounterHash, &clusterId, LONG_BYTES);
3,024,512✔
314
  if (ppMonitor == NULL || *ppMonitor == NULL) {
3,024,512✔
315
    tscError("failed to get monitor client");
×
316
    goto end;
×
317
  }
318
  taos_counter_t* newCounter = taos_counter_new(name, help, label_key_count, label_keys);
3,024,512✔
319
  if (newCounter == NULL) return;
3,024,512✔
320
  MonitorClient* pMonitor = *ppMonitor;
3,024,512✔
321
  if (taos_collector_add_metric(pMonitor->colector, newCounter) != 0) {
3,024,512✔
322
    tscError("failed to add metric to collector");
618✔
323
    int r = taos_counter_destroy(newCounter);
618✔
324
    if (r) {
618✔
325
      tscError("failed to destroy counter, code:%d", r);
×
326
    }
327
    goto end;
618✔
328
  }
329
  if (taosHashPut(pMonitor->counters, name, strlen(name), &newCounter, POINTER_BYTES) != 0) {
3,023,894✔
330
    tscError("failed to put counter to monitor");
×
331
    int r = taos_counter_destroy(newCounter);
×
332
    if (r) {
×
333
      tscError("failed to destroy counter, code:%d", r);
×
334
    }
335
    goto end;
×
336
  }
337
  tscInfo("clusterId:0x%" PRIx64 ", monitor:%p, create counter:%s %p", pMonitor->clusterId, pMonitor, name, newCounter);
3,023,894✔
338

339
end:
3,007,316✔
340
  taosWUnLockLatch(&monitorLock);
3,024,512✔
341
}
342

343
void monitorCounterInc(int64_t clusterId, const char* counterName, const char** label_values) {
137,633✔
344
  taosWLockLatch(&monitorLock);
137,633✔
345
  if (atomic_load_32(&monitorFlag) == 1) {
137,633✔
346
    taosWUnLockLatch(&monitorLock);
6✔
347
    return;
6✔
348
  }
349

350
  MonitorClient** ppMonitor = (MonitorClient**)taosHashGet(monitorCounterHash, &clusterId, LONG_BYTES);
137,627✔
351
  if (ppMonitor == NULL || *ppMonitor == NULL) {
137,627✔
352
    tscError("clusterId:0x%" PRIx64 ", monitor not found", clusterId);
×
353
    goto end;
×
354
  }
355

356
  MonitorClient*   pMonitor = *ppMonitor;
137,627✔
357
  taos_counter_t** ppCounter = (taos_counter_t**)taosHashGet(pMonitor->counters, counterName, strlen(counterName));
137,627✔
358
  if (ppCounter == NULL || *ppCounter == NULL) {
137,627✔
359
    tscError("clusterId:0x%" PRIx64 ", monitor:%p counter:%s not found", clusterId, pMonitor, counterName);
×
360
    goto end;
×
361
  }
362
  if (taos_counter_inc(*ppCounter, label_values) != 0) {
137,627✔
363
    tscError("clusterId:0x%" PRIx64 ", monitor:%p counter:%s inc failed", clusterId, pMonitor, counterName);
×
364
    goto end;
×
365
  }
366
  tscTrace("clusterId:0x%" PRIx64 ", monitor:%p, counter:%s inc", pMonitor->clusterId, pMonitor, counterName);
137,627✔
367

368
end:
137,627✔
369
  taosWUnLockLatch(&monitorLock);
137,627✔
370
}
371

372
// Maps a SQL result code to its label text: index 0 -> "Success", 1 -> "Failed", 2 -> "Cancel".
// Any value outside the table yields "Unknown" instead of reading past the end of the array.
const char* monitorResultStr(SQL_RESULT_CODE code) {
  static const char* result_state[] = {"Success", "Failed", "Cancel"};
  // the cast to size_t also rejects negative values
  if ((size_t)code >= sizeof(result_state) / sizeof(result_state[0])) {
    return "Unknown";
  }
  return result_state[code];
}
376

377
// Appends one slow-log record (slowLogData->data including its terminating NUL) to the
// cluster's temp file.
//
// On the first record of a cluster a temp file is created under tmpPath, wrapped in a
// SlowLogClient, registered in monitorSlowLogHash and locked; later records reuse the
// registered file. Failures are logged and the record is dropped.
static void monitorWriteSlowLog2File(MonitorSlowLogData* slowLogData, char* tmpPath) {
  TdFilePtr logFile = NULL;
  void*     slot = taosHashGet(monitorSlowLogHash, &slowLogData->clusterId, LONG_BYTES);

  if (slot != NULL) {
    logFile = (*(SlowLogClient**)slot)->pFile;
  } else {
    char filePath[PATH_MAX] = {0};
    char idText[32] = {0};
    if (snprintf(idText, sizeof(idText), "%" PRIx64, slowLogData->clusterId) < 0) {
      tscError("failed to generate clusterId:0x%" PRIx64, slowLogData->clusterId);
      return;
    }
    taosGetTmpfilePath(tmpPath, idText, filePath);
    tscInfo("monitor create slow log file:%s", filePath);

    logFile = taosOpenFile(filePath, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_READ | TD_FILE_TRUNC);
    if (logFile == NULL) {
      tscError("failed to open file:%s since %d", filePath, terrno);
      return;
    }

    SlowLogClient* newClient = taosMemoryCalloc(1, sizeof(SlowLogClient));
    if (newClient == NULL) {
      tscError("failed to allocate memory for slow log client");
      int32_t closeRet = taosCloseFile(&logFile);
      if (closeRet != 0) {
        tscError("failed to close file:%p ret:%d", logFile, closeRet);
      }
      return;
    }
    newClient->lastCheckTime = taosGetMonoTimestampMs();
    tstrncpy(newClient->path, filePath, PATH_MAX);
    newClient->offset = 0;
    newClient->pFile = logFile;

    if (taosHashPut(monitorSlowLogHash, &slowLogData->clusterId, LONG_BYTES, &newClient, POINTER_BYTES) != 0) {
      tscError("failed to put clusterId:0x%" PRIx64 " to hash table", slowLogData->clusterId);
      int32_t closeRet = taosCloseFile(&logFile);
      if (closeRet != 0) {
        tscError("failed to close file:%p ret:%d", logFile, closeRet);
      }
      taosMemoryFree(newClient);
      return;
    }

    if (taosLockFile(logFile) < 0) {
      tscError("failed to lock file:%p since %s", logFile, terrstr());
      return;
    }
  }

  // records are always appended at the current end of the file
  if (taosLSeekFile(logFile, 0, SEEK_END) < 0) {
    tscError("failed to seek file:%p code:%d", logFile, terrno);
    return;
  }
  if (taosWriteFile(logFile, slowLogData->data, strlen(slowLogData->data) + 1) < 0) {
    tscError("failed to write len to file:%p since %s", logFile, terrstr());
  }
  tscDebug("monitor write slow log to file:%p, clusterId:0x%" PRIx64, logFile, slowLogData->clusterId);
}
435

436
// Reads a batch of slow-log records from pFile starting at *offset and rewrites them in place
// into one string of the form "[rec1,rec2,...]".
//
// pFile:  file to read; it is first positioned at *offset.
// offset: in/out read position; advanced past every record copied into the result.
// size:   current file size; must be greater than *offset.
//
// Records in the file are separated by NUL bytes (monitorWriteSlowLog2File writes each record
// with its terminator). At most SLOW_LOG_SEND_SIZE_MAX bytes are read per call.
//
// Returns a heap buffer the caller must free, or NULL on seek/argument/allocation/read failure
// or when nothing could be read (terrno is set explicitly only for the invalid-size case).
static char* readFile(TdFilePtr pFile, int64_t* offset, int64_t size) {
  tscDebug("monitor readFile slow begin pFile:%p, offset:%" PRId64 ", size:%" PRId64, pFile, *offset, size);
  if (taosLSeekFile(pFile, *offset, SEEK_SET) < 0) {
    tscError("failed to seek file:%p code:%d", pFile, terrno);
    return NULL;
  }

  if ((size <= *offset)) {
    tscError("invalid size:%" PRId64 ", offset:%" PRId64, size, *offset);
    terrno = TSDB_CODE_TSC_INTERNAL_ERROR;
    return NULL;
  }
  char*   pCont = NULL;
  int64_t totalSize = 0;
  // payload is capped at SLOW_LOG_SEND_SIZE_MAX bytes; 4 extra bytes leave room for the brackets
  if (size - *offset >= SLOW_LOG_SEND_SIZE_MAX) {
    totalSize = 4 + SLOW_LOG_SEND_SIZE_MAX;
  } else {
    totalSize = 4 + (size - *offset);
  }

  // zero-filled, so the bytes after the data that was read act as terminators for strlen below
  pCont = taosMemoryCalloc(1, totalSize);  // 4 reserved for []
  if (pCont == NULL) {
    tscError("failed to allocate memory for slow log, size:%" PRId64, totalSize);
    return NULL;
  }
  char* buf = pCont;
  // write the opening '[' at pCont[0]; buf then points at the first payload byte
  (void)strncat(buf++, "[", totalSize - 1);
  int64_t readSize = taosReadFile(pFile, buf, totalSize - 4);  // 4 reserved for []
  if (readSize <= 0) {
    if (readSize < 0) {
      tscError("failed to read len from file:%p since %s", pFile, terrstr());
    }
    taosMemoryFree(pCont);
    return NULL;
  }

  // from here on totalSize counts the bytes (records plus their terminators) consumed so far
  totalSize = 0;
  while (1) {
    size_t len = strlen(buf);
    if (len == SLOW_LOG_SEND_SIZE_MAX) {  // one item is too long
      // skip the rest of the file and close the array right here, dropping this item
      *offset = size;
      *buf = ']';
      *(buf + 1) = '\0';
      break;
    }

    totalSize += (len + 1);
    if (totalSize > readSize) {
      // the current item was not read completely: overwrite the preceding separator with ']'
      // and leave *offset at the start of this item
      *(buf - 1) = ']';
      *buf = '\0';
      break;
    }

    if (len == 0) {             // one item is empty
      if (*(buf - 1) == '[') {  // data is "\0"
        // no data read
        *buf = ']';
        *(buf + 1) = '\0';
      } else {  // data is "ass\0\0"
        *(buf - 1) = ']';
        *buf = '\0';
      }
      *offset += 1;
      break;
    }
    buf[len] = ',';  // replace '\0' with ','
    buf += (len + 1);
    *offset += (len + 1);
  }

  tscDebug("monitor readFile slow log end, data:%s, offset:%" PRId64, pCont, *offset);
  return pCont;
}
509

510
static int64_t getFileSize(char* path) {
×
511
  int64_t fileSize = 0;
×
512
  if (taosStatFile(path, &fileSize, NULL, NULL) < 0) {
×
513
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
514
  }
515

516
  return fileSize;
×
517
}
518

519
// Wraps a slow-log batch in a heap-allocated MonitorSlowLogData and sends it as a
// MONITOR_TYPE_SLOW_LOG report.
//
// data:     batch content; ownership moves to the request. NULL yields TSDB_CODE_INVALID_PARA.
// pFile:    file handle stored in the request; closed here if the request cannot be created.
// fileName: file name stored in the request; freed here if the request cannot be created.
//
// Returns the result of sendReport, TSDB_CODE_INVALID_PARA for NULL data, or terrno when the
// request allocation fails.
static int32_t sendSlowLog(int64_t clusterId, char* data, TdFilePtr pFile, int64_t offset, SLOW_LOG_QUEUE_TYPE type,
                           char* fileName, void* pTransporter, SEpSet* epSet) {
  if (data != NULL) {
    MonitorSlowLogData* request = taosMemoryMalloc(sizeof(MonitorSlowLogData));
    if (request != NULL) {
      request->clusterId = clusterId;
      request->type = type;
      request->fileName = fileName;
      request->pFile = pFile;
      request->offset = offset;
      request->data = data;
      return sendReport(pTransporter, epSet, data, MONITOR_TYPE_SLOW_LOG, request);
    }
  }

  // nothing was sent: release everything this call was handed
  if (taosCloseFile(&pFile) != 0) {
    tscError("failed to close file:%p", pFile);
  }
  if (data == NULL) {
    taosMemoryFree(fileName);
    return TSDB_CODE_INVALID_PARA;
  }
  taosMemoryFree(data);
  taosMemoryFree(fileName);
  return terrno;
}
545

546
static int32_t monitorReadSend(int64_t clusterId, TdFilePtr pFile, int64_t* offset, int64_t size,
×
547
                               SLOW_LOG_QUEUE_TYPE type, char* fileName) {
548
  SAppInstInfo* pInst = getAppInstByClusterId(clusterId);
×
549
  if (pInst == NULL) {
×
550
    tscError("failed to get app instance by clusterId:0x%" PRIx64, clusterId);
×
551
    if (taosCloseFile(&pFile) != 0) {
×
552
      tscError("failed to close file:%p", pFile);
×
553
    }
554
    taosMemoryFree(fileName);
×
555
    return terrno;
×
556
  }
557
  SEpSet ep = getEpSet_s(&pInst->mgmtEp);
×
558
  char*  data = readFile(pFile, offset, size);
×
559
  if (data == NULL) return terrno;
×
560
  return sendSlowLog(clusterId, data, (type == SLOW_LOG_READ_BEGINNIG ? pFile : NULL), *offset, type, fileName,
×
561
                     pInst->pTransporter, &ep);
562
}
563

564
// Handles a leftover slow-log file found at startup.
//
// fileName: pointer to the heap-allocated file name; NULL is a no-op. When a send is
//           attempted *fileName is set to NULL afterwards (the name was handed to
//           monitorReadSend), so the caller must not free it.
//
// When the file holds nothing beyond `offset` it is finalized and removed via
// processFileInTheEnd; otherwise the next batch is read and sent.
static void monitorSendSlowLogAtBeginning(int64_t clusterId, char** fileName, TdFilePtr pFile, int64_t offset) {
  if (fileName == NULL) {
    return;
  }

  const int64_t fileSize = getFileSize(*fileName);
  if (fileSize <= offset) {
    processFileInTheEnd(pFile, *fileName);
    tscDebug("monitor delete file:%s", *fileName);
    return;
  }

  int32_t rc = monitorReadSend(clusterId, pFile, &offset, fileSize, SLOW_LOG_READ_BEGINNIG, *fileName);
  if (rc != 0) {
    tscError("monitor send slow log failed, clusterId:0x%" PRIx64 ", ret:%d", clusterId, rc);
  } else {
    tscDebug("monitor send slow log succ, clusterId:0x%" PRIx64, clusterId);
  }
  *fileName = NULL;
}
582

583
static void monitorSendSlowLogAtRunning(int64_t clusterId) {
×
584
  void* tmp = taosHashGet(monitorSlowLogHash, &clusterId, LONG_BYTES);
×
585
  if (tmp == NULL) {
×
586
    tscError("failed to get slow log client by clusterId:0x%" PRIx64, clusterId);
×
587
    return;
×
588
  }
589
  SlowLogClient* pClient = (*(SlowLogClient**)tmp);
×
590
  if (pClient == NULL) {
×
591
    tscError("failed to get slow log client by clusterId:0x%" PRIx64, clusterId);
×
592
    return;
×
593
  }
594
  int64_t size = getFileSize(pClient->path);
×
595
  if (size <= pClient->offset) {
×
596
    if (taosFtruncateFile(pClient->pFile, 0) < 0) {
×
597
      tscError("failed to truncate file:%p code:%d", pClient->pFile, terrno);
×
598
    }
599
    tscDebug("monitor truncate file to 0 file:%p", pClient->pFile);
×
600
    pClient->offset = 0;
×
601
  } else {
602
    int32_t code = monitorReadSend(clusterId, pClient->pFile, &pClient->offset, size, SLOW_LOG_READ_RUNNING, NULL);
×
603
    tscDebug("monitor send slow log clusterId:0x%" PRIx64 ", ret:%d", clusterId, code);
×
604
  }
605
}
606

607
static bool monitorSendSlowLogAtQuit(int64_t clusterId) {
×
608
  void* tmp = taosHashGet(monitorSlowLogHash, &clusterId, LONG_BYTES);
×
609
  if (tmp == NULL) {
×
610
    return true;
×
611
  }
612
  SlowLogClient* pClient = (*(SlowLogClient**)tmp);
×
613
  if (pClient == NULL) {
×
614
    return true;
×
615
  }
616
  int64_t size = getFileSize(pClient->path);
×
617
  if (size <= pClient->offset) {
×
618
    processFileInTheEnd(pClient->pFile, pClient->path);
×
619
    pClient->pFile = NULL;
×
620
    tscInfo("monitor remove file:%s", pClient->path);
×
621
    if ((--quitCnt) == 0) {
×
622
      return true;
×
623
    }
624
  } else {
625
    int32_t code = monitorReadSend(clusterId, pClient->pFile, &pClient->offset, size, SLOW_LOG_READ_QUIT, NULL);
×
626
    tscDebug("monitor send slow log clusterId:0x%" PRIx64 ", ret:%d", clusterId, code);
×
627
  }
628
  return false;
×
629
}
630
static void monitorSendAllSlowLogAtQuit() {
1,513,068✔
631
  void* pIter = NULL;
1,513,068✔
632
  while ((pIter = taosHashIterate(monitorSlowLogHash, pIter))) {
1,513,068✔
633
    SlowLogClient* pClient = (*(SlowLogClient**)pIter);
×
634
    if (pClient == NULL) {
×
635
      continue;
×
636
    }
637
    int64_t size = getFileSize(pClient->path);
×
638
    if (size <= pClient->offset) {
×
639
      processFileInTheEnd(pClient->pFile, pClient->path);
×
640
      pClient->pFile = NULL;
×
641
    } else if (pClient->offset == 0) {
×
642
      int64_t* clusterId = (int64_t*)taosHashGetKey(pIter, NULL);
×
643
      int32_t  code = monitorReadSend(*clusterId, pClient->pFile, &pClient->offset, size, SLOW_LOG_READ_QUIT, NULL);
×
644
      tscDebug("monitor send slow log clusterId:0x%" PRIx64 ", ret:%d", *clusterId, code);
×
645
      if (code == 0) {
×
646
        quitCnt++;
×
647
      }
648
    }
649
  }
650
}
1,513,068✔
651

652
// Recovers from the slow-log file having disappeared from disk: unlocks and closes the old
// handle, then re-creates the file at pClient->path. pClient->pFile is replaced only when
// the re-open succeeds; an unlock or close failure aborts the recovery.
static void processFileRemoved(SlowLogClient* pClient) {
  if (taosUnLockFile(pClient->pFile) != 0) {
    tscError("failed to unlock file:%s since %d", pClient->path, terrno);
    return;
  }

  const int32_t closeRet = taosCloseFile(&(pClient->pFile));
  if (closeRet != 0) {
    tscError("failed to close file:%p ret:%d", pClient->pFile, closeRet);
    return;
  }

  TdFilePtr reopened =
      taosOpenFile(pClient->path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_READ | TD_FILE_TRUNC);
  if (reopened != NULL) {
    pClient->pFile = reopened;
    return;
  }
  tscError("failed to open file:%s since %d", pClient->path, terrno);
}
671

672
static void monitorSendAllSlowLog() {
278,358,758✔
673
  int64_t t = taosGetMonoTimestampMs();
278,358,758✔
674
  void*   pIter = NULL;
278,358,758✔
675
  while ((pIter = taosHashIterate(monitorSlowLogHash, pIter))) {
278,358,758✔
676
    int64_t*       clusterId = (int64_t*)taosHashGetKey(pIter, NULL);
×
677
    SAppInstInfo*  pInst = getAppInstByClusterId(*clusterId);
×
678
    SlowLogClient* pClient = (*(SlowLogClient**)pIter);
×
679
    if (pClient == NULL || pInst == NULL) {
×
680
      taosHashCancelIterate(monitorSlowLogHash, pIter);
×
681
      return;
×
682
    }
683
    if (t - pClient->lastCheckTime > pInst->serverCfg.monitorParas.tsMonitorInterval * 1000) {
×
684
      pClient->lastCheckTime = t;
×
685
    } else {
686
      continue;
×
687
    }
688

689
    if (pClient->offset == 0) {
×
690
      int64_t size = getFileSize(pClient->path);
×
691
      if (size <= 0) {
×
692
        if (size < 0) {
×
693
          tscError("monitor failed to get file size:%s, err:%d", pClient->path, ERRNO);
×
694
          if (ERRNO == ENOENT) {
×
695
            processFileRemoved(pClient);
×
696
          }
697
        }
698
        continue;
×
699
      }
700
      int32_t code = monitorReadSend(*clusterId, pClient->pFile, &pClient->offset, size, SLOW_LOG_READ_RUNNING, NULL);
×
701
      tscDebug("monitor send slow log clusterId:0x%" PRIx64 ", ret:%d", *clusterId, code);
×
702
    }
703
  }
704
}
705

706
static void monitorSendAllSlowLogFromTempDir(int64_t clusterId) {
1,511,696✔
707
  SAppInstInfo* pInst = getAppInstByClusterId((int64_t)clusterId);
1,511,696✔
708

709
  if (pInst == NULL || !pInst->serverCfg.monitorParas.tsEnableMonitor) {
1,511,696✔
710
    tscInfo("monitor is disabled, skip send slow log");
1,510,564✔
711
    return;
1,510,564✔
712
  }
713
  char namePrefix[PATH_MAX] = {0};
1,132✔
714
  if (snprintf(namePrefix, sizeof(namePrefix), "%s%" PRIx64, TD_TMP_FILE_PREFIX, clusterId) < 0) {
1,132✔
715
    tscError("failed to generate slow log file name prefix");
×
716
    return;
×
717
  }
718

719
  char tmpPath[PATH_MAX] = {0};
1,132✔
720
  if (getSlowLogTmpDir(tmpPath, sizeof(tmpPath)) < 0) {
1,132✔
721
    return;
×
722
  }
723

724
  TdDirPtr pDir = taosOpenDir(tmpPath);
1,132✔
725
  if (pDir == NULL) {
1,132✔
726
    return;
×
727
  }
728

729
  TdDirEntryPtr de = NULL;
1,132✔
730
  while ((de = taosReadDir(pDir)) != NULL) {
3,396✔
731
    if (taosDirEntryIsDir(de)) {
2,264✔
732
      continue;
2,264✔
733
    }
734

735
    char* name = taosGetDirEntryName(de);
×
736
    if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0 || strstr(name, namePrefix) == NULL) {
×
737
      tscInfo("skip file:%s, for cluster id:%" PRIx64, name, clusterId);
×
738
      continue;
×
739
    }
740

741
    char filename[PATH_MAX] = {0};
×
742
    (void)snprintf(filename, sizeof(filename), "%s%s", tmpPath, name);
×
743
    TdFilePtr pFile = taosOpenFile(filename, TD_FILE_READ | TD_FILE_WRITE);
×
744
    if (pFile == NULL) {
×
745
      tscError("failed to open file:%s since %s", filename, terrstr());
×
746
      continue;
×
747
    }
748
    if (taosLockFile(pFile) < 0) {
×
749
      tscInfo("failed to lock file:%s since %s, maybe used by other process", filename, terrstr());
×
750
      int32_t ret = taosCloseFile(&pFile);
×
751
      if (ret != 0) {
×
752
        tscError("failed to close file:%p ret:%d", pFile, ret);
×
753
      }
754
      continue;
×
755
    }
756
    char* tmp = taosStrdup(filename);
×
757
    if (tmp == NULL) {
×
758
      tscError("failed to dup string:%s since %s", filename, terrstr());
×
759
      if (taosUnLockFile(pFile) != 0) {
×
760
        tscError("failed to unlock file:%s, terrno:%d", filename, terrno);
×
761
      }
762
      if (taosCloseFile(&(pFile)) != 0) {
×
763
        tscError("failed to close file:%s, terrno:%d", filename, terrno);
×
764
      }
765
      continue;
×
766
    }
767
    monitorSendSlowLogAtBeginning(clusterId, &tmp, pFile, 0);
×
768
    taosMemoryFree(tmp);
×
769
  }
770

771
  int32_t ret = taosCloseDir(&pDir);
1,132✔
772
  if (ret != 0) {
1,132✔
773
    tscError("failed to close dir, ret:%d", ret);
×
774
  }
775
}
776

777
static void* monitorThreadFunc(void* param) {
1,513,068✔
778
  setThreadName("client-monitor-slowlog");
1,513,068✔
779
  tscInfo("monitor update thread started");
1,513,068✔
780
  int64_t quitTime = 0;
1,513,068✔
781
  while (1) {
278,358,758✔
782
    if (atomic_load_32(&monitorFlag) == 1) {
279,871,826✔
783
      if (quitCnt == 0) {
1,513,068✔
784
        monitorSendAllSlowLogAtQuit();
1,513,068✔
785
        if (quitCnt == 0) {
1,513,068✔
786
          tscInfo("monitorThreadFunc quit since no slow log to send");
1,513,068✔
787
          break;
1,513,068✔
788
        }
789
        quitTime = taosGetMonoTimestampMs();
×
790
      }
791
      if (taosGetMonoTimestampMs() - quitTime > 500) {  // quit at most 500ms
×
792
        tscInfo("monitorThreadFunc quit since timeout");
×
793
        break;
×
794
      }
795
    }
796

797
    MonitorSlowLogData* slowLogData = NULL;
278,358,758✔
798
    taosReadQitem(monitorQueue, (void**)&slowLogData);
278,358,758✔
799
    if (slowLogData != NULL) {
278,358,758✔
800
      if (slowLogData->type == SLOW_LOG_READ_BEGINNIG && quitCnt == 0) {
1,511,696✔
801
        if (slowLogData->pFile != NULL) {
1,511,696✔
802
          monitorSendSlowLogAtBeginning(slowLogData->clusterId, &(slowLogData->fileName), slowLogData->pFile,
×
803
                                        slowLogData->offset);
×
804
        } else {
805
          monitorSendAllSlowLogFromTempDir(slowLogData->clusterId);
1,511,696✔
806
        }
807
      } else if (slowLogData->type == SLOW_LOG_WRITE) {
×
808
        monitorWriteSlowLog2File(slowLogData, tmpSlowLogPath);
×
809
      } else if (slowLogData->type == SLOW_LOG_READ_RUNNING) {
×
810
        monitorSendSlowLogAtRunning(slowLogData->clusterId);
×
811
      } else if (slowLogData->type == SLOW_LOG_READ_QUIT) {
×
812
        if (monitorSendSlowLogAtQuit(slowLogData->clusterId)) {
×
813
          tscInfo("monitorThreadFunc quit since all slow log sended");
×
814
          monitorFreeSlowLogData(slowLogData);
×
815
          taosFreeQitem(slowLogData);
×
816
          break;
×
817
        }
818
      }
819
      monitorFreeSlowLogData(slowLogData);
1,511,696✔
820
      taosFreeQitem(slowLogData);
1,511,696✔
821
    }
822

823
    if (quitCnt == 0) {
278,358,758✔
824
      monitorSendAllSlowLog();
278,358,758✔
825
    }
826
    (void)tsem2_timewait(&monitorSem, 100);
278,358,758✔
827
  }
828
  return NULL;
1,513,068✔
829
}
830

831
static int32_t tscMonitortInit() {
1,513,068✔
832
  TdThreadAttr thAttr;
1,504,308✔
833
  if (taosThreadAttrInit(&thAttr) != 0) {
1,513,068✔
834
    tscError("failed to init thread attr since %s", strerror(ERRNO));
×
835
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
836
  }
837
  if (taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE) != 0) {
1,513,068✔
838
    tscError("failed to set thread attr since %s", strerror(ERRNO));
×
839
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
840
  }
841

842
  if (taosThreadCreate(&monitorThread, &thAttr, monitorThreadFunc, NULL) != 0) {
1,513,068✔
843
    tscError("failed to create monitor thread since %s", strerror(ERRNO));
×
844
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
845
  }
846

847
  (void)taosThreadAttrDestroy(&thAttr);
1,513,068✔
848
  return 0;
1,513,068✔
849
}
850

851
static void tscMonitorStop() {
1,513,109✔
852
  if (taosCheckPthreadValid(monitorThread)) {
1,513,109✔
853
    (void)taosThreadJoin(monitorThread, NULL);
1,513,068✔
854
    taosThreadClear(&monitorThread);
1,513,068✔
855
  }
856
}
1,513,109✔
857

858
int32_t monitorInit() {
1,513,068✔
859
  int32_t code = 0;
1,513,068✔
860

861
  tscInfo("monitor init");
1,513,068✔
862
  monitorCounterHash =
1,513,068✔
863
      (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
1,513,068✔
864
  if (monitorCounterHash == NULL) {
1,513,068✔
865
    tscError("failed to create monitorCounterHash");
×
866
    return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
867
  }
868
  taosHashSetFreeFp(monitorCounterHash, destroyMonitorClient);
1,513,068✔
869

870
  monitorSlowLogHash =
1,513,068✔
871
      (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
1,513,068✔
872
  if (monitorSlowLogHash == NULL) {
1,513,068✔
873
    tscError("failed to create monitorSlowLogHash");
×
874
    return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
875
  }
876
  taosHashSetFreeFp(monitorSlowLogHash, destroySlowLogClient);
1,513,068✔
877

878
  monitorTimer = taosTmrInit(0, 0, 0, "MONITOR");
1,513,068✔
879
  if (monitorTimer == NULL) {
1,513,068✔
880
    tscError("failed to create monitor timer");
×
881
    return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
882
  }
883

884
  code = getSlowLogTmpDir(tmpSlowLogPath, sizeof(tmpSlowLogPath));
1,513,068✔
885
  if (code != 0) {
1,513,068✔
886
    return code;
×
887
  }
888

889
  code = taosMulModeMkDir(tmpSlowLogPath, 0777, true);
1,513,068✔
890
  if (code != 0) {
1,513,068✔
891
    tscError("failed to create dir:%s since %s", tmpSlowLogPath, terrstr());
×
892
    return code;
×
893
  }
894

895
  if (tsem2_init(&monitorSem, 0, 0) != 0) {
1,513,068✔
896
    tscError("sem init error since %s", terrstr());
×
897
    return TAOS_SYSTEM_ERROR(ERRNO);
×
898
  }
899

900
  code = taosOpenQueue(&monitorQueue);
1,513,068✔
901
  if (code) {
1,513,068✔
902
    tscError("open queue error since %s", terrstr());
×
903
    return TAOS_GET_TERRNO(code);
×
904
  }
905

906
  taosInitRWLatch(&monitorLock);
1,513,068✔
907
  return tscMonitortInit();
1,513,068✔
908
}
909

910
void monitorClose() {
1,513,109✔
911
  tscInfo("monitor close");
1,513,109✔
912
  taosWLockLatch(&monitorLock);
1,513,109✔
913
  atomic_store_32(&monitorFlag, 1);
1,513,109✔
914
  tscMonitorStop();
1,513,109✔
915
  sendAllCounter();
1,513,109✔
916
  taosHashCleanup(monitorCounterHash);
1,513,109✔
917
  taosHashCleanup(monitorSlowLogHash);
1,513,109✔
918
  taosTmrCleanUp(monitorTimer);
1,513,109✔
919
  taosCloseQueue(monitorQueue);
1,513,109✔
920
  if (tsem2_destroy(&monitorSem) != 0) {
1,513,109✔
921
    tscError("failed to destroy semaphore");
×
922
  }
923
  taosWUnLockLatch(&monitorLock);
1,513,109✔
924
}
1,513,109✔
925

926
// Copies `data` into a freshly allocated queue item and appends it to
// monitorQueue, then posts monitorSem.
//
// Returns -1 when monitorFlag is already set, the allocator's error code when
// the queue item cannot be allocated, and 0 otherwise. A failed enqueue also
// returns 0: in that case the item's file is closed and the item is freed
// here.
int32_t monitorPutData2MonitorQueue(MonitorSlowLogData data) {
  if (atomic_load_32(&monitorFlag) == 1) {
    tscError("monitor slow log thread is exiting");
    return -1;
  }

  MonitorSlowLogData* item = NULL;
  int32_t             code = taosAllocateQitem(sizeof(MonitorSlowLogData), DEF_QITEM, 0, (void**)&item);
  if (code) {
    tscError("monitor failed to allocate slow log data");
    return code;
  }

  *item = data;
  tscDebug("monitor write slow log to queue, clusterId:0x%" PRIx64 " type:%s, data:%s", item->clusterId,
           queueTypeStr[item->type], item->data);

  if (taosWriteQitem(monitorQueue, item) == 0) {
    if (tsem2_post(&monitorSem) != 0) {
      tscError("failed to post semaphore");
    }
    return 0;
  }

  // Enqueue failed: release everything the item holds.
  if (taosCloseFile(&(item->pFile)) != 0) {
    tscError("failed to close file:%p", item->pFile);
  }
  monitorFreeSlowLogData(item);
  taosFreeQitem(item);
  return 0;
}
956

957
// Response callback installed by senAuditInfo(). Frees the payload and
// endpoint set carried by pMsg, logs the result code, and returns 0.
// param is not used. A NULL pMsg is tolerated and simply skips the frees.
int32_t reportCB(void* param, SDataBuf* pMsg, int32_t code) {
  if (pMsg != NULL) {
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
  }
  tscDebug("[del report]delete reportCB code:%d", code);
  return 0;
}
963

964
// Wraps an already-serialised audit request (pReq, len bytes) in an
// SMsgSendInfo of type TDMT_MND_AUDIT and sends it asynchronously to the mgmt
// endpoint set of pTscObj's app instance, with reportCB as the callback.
//
// Returns TSDB_CODE_SUCCESS on success, terrno when the send-info allocation
// fails, or the code returned by asyncSendMsgToServer(). pReq is not freed
// here on failure.
// NOTE(review): on a failed send this frees the send info itself; confirm that
// asyncSendMsgToServer() does not also release it on failure.
int32_t senAuditInfo(STscObj* pTscObj, void* pReq, int32_t len, uint64_t requestId) {
  SMsgSendInfo* pSend = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (NULL == pSend) {
    tscError("[del report] failed to allocate memory for sendInfo");
    return terrno;
  }

  pSend->msgInfo = (SDataBuf){.pData = pReq, .len = len, .handle = NULL};
  pSend->requestId = requestId;
  pSend->requestObjRefId = 0;
  pSend->param = NULL;
  pSend->fp = reportCB;
  pSend->msgType = TDMT_MND_AUDIT;

  SEpSet  epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
  int32_t code = asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, NULL, pSend);
  if (code == 0) {
    return TSDB_CODE_SUCCESS;
  }

  tscError("[del report] failed to send msg to server, code:%d", code);
  taosMemoryFree(pSend);
  return code;
}
989

990
// Copies the table and db name of a DELETE statement's target into pReq.
// Returns TSDB_CODE_TSC_INVALID_OPERATION when the FROM node is not a real
// table, TSDB_CODE_SUCCESS otherwise.
static int32_t setDeleteStmtAuditReqTableInfo(SDeleteStmt* pStmt, SAuditReq* pReq) {
  if (QUERY_NODE_REAL_TABLE == nodeType(pStmt->pFromTable)) {
    STableNode* pTable = &((SRealTableNode*)pStmt->pFromTable)->table;
    TAOS_UNUSED(tsnprintf(pReq->table, TSDB_TABLE_NAME_LEN, "%s", pTable->tableName));
    TAOS_UNUSED(tsnprintf(pReq->db, TSDB_DB_FNAME_LEN, "%s", pTable->dbName));
    return TSDB_CODE_SUCCESS;
  }

  tscDebug("[report] invalid from table node type:%s", nodesNodeName(pStmt->pFromTable->type));
  return TSDB_CODE_TSC_INVALID_OPERATION;
}
1000

1001
// Copies the target table and db name of an insert-style modify statement
// into pReq. Only the plain, file and stmt insert types are accepted; any
// other insertType yields TSDB_CODE_TSC_INVALID_OPERATION.
static int32_t setModifyStmtAuditReqTableInfo(SVnodeModifyOpStmt* pStmt, SAuditReq* pReq) {
  bool isInsert = pStmt->insertType == TSDB_QUERY_TYPE_INSERT || pStmt->insertType == TSDB_QUERY_TYPE_FILE_INSERT ||
                  pStmt->insertType == TSDB_QUERY_TYPE_STMT_INSERT;
  if (!isInsert) {
    tscDebug("[report] invalid from table node type:%s", nodesNodeName(pStmt->sqlNodeType));
    return TSDB_CODE_TSC_INVALID_OPERATION;
  }

  TAOS_UNUSED(tsnprintf(pReq->table, TSDB_TABLE_NAME_LEN, "%s", pStmt->targetTableName.tname));
  TAOS_UNUSED(tsnprintf(pReq->db, TSDB_DB_FNAME_LEN, "%s", pStmt->targetTableName.dbname));
  return TSDB_CODE_SUCCESS;
}
1012

1013
// Pair of lists filled while walking a query's FROM clause when more than one
// table can be involved (joins and set operators). Both pointers stay NULL
// until initAuditTableListInfo() creates them.
typedef struct SAuditTableListInfo {
1014
  SArray* dbList;  // db names pushed by doSetSelectStmtAuditReqTableInfo()
1015
  SArray* tableList;  // table names, pushed right after the matching dbList entry
1016
} SAuditTableListInfo;
1017

1018
// Creates the db and table arrays of pInfo. Does nothing and returns
// TSDB_CODE_SUCCESS when dbList already exists.
//
// On failure returns the terrno captured right after the failed allocation
// and leaves both pointers NULL, so the caller never sees a freed dbList.
static int32_t initAuditTableListInfo(SAuditTableListInfo* pInfo) {
  if (pInfo->dbList) return TSDB_CODE_SUCCESS;

  pInfo->dbList = taosArrayInit(4, TSDB_DB_FNAME_LEN);
  if (pInfo->dbList == NULL) {
    int32_t code = terrno;  // read before logging can disturb it
    tscError("[report] failed to create db list array");
    return code;
  }

  pInfo->tableList = taosArrayInit(4, TSDB_TABLE_NAME_LEN);
  if (pInfo->tableList == NULL) {
    int32_t code = terrno;  // read before logging/cleanup can disturb it
    tscError("[report] failed to create table list array");
    taosArrayDestroy(pInfo->dbList);
    pInfo->dbList = NULL;  // do not leave a dangling pointer behind
    return code;
  }
  return TSDB_CODE_SUCCESS;
}
1035

1036
// Destroys whichever of the two arrays exist and resets their pointers to
// NULL.
static void destroyAuditTableListInfo(SAuditTableListInfo* pInfo) {
  SArray** lists[] = {&pInfo->dbList, &pInfo->tableList};

  for (int32_t i = 0; i < 2; ++i) {
    if (*lists[i] == NULL) {
      continue;
    }
    taosArrayDestroy(*lists[i]);
    *lists[i] = NULL;
  }
}
×
1046

1047
// Copies the first collected db name and the first collected table name into
// pReq. A list with no entries leaves the matching pReq field untouched.
// Both lists are dereferenced without a NULL check.
static void copyTableInfoToAuditReq(SAuditTableListInfo* pTbListInfo, SAuditReq* pReq) {
  SArray* pDbs = pTbListInfo->dbList;
  if (pDbs->size > 0) {
    char* firstDb = (char*)taosArrayGet(pDbs, 0);
    TAOS_UNUSED(tsnprintf(pReq->db, TSDB_DB_FNAME_LEN, "%s", firstDb));
  }

  SArray* pTables = pTbListInfo->tableList;
  if (pTables->size > 0) {
    char* firstTable = (char*)taosArrayGet(pTables, 0);
    TAOS_UNUSED(tsnprintf(pReq->table, TSDB_TABLE_NAME_LEN, "%s", firstTable));
  }
}
×
1057

1058
static int32_t doSetSelectStmtAuditReqTableInfo(SNode* pFromTable, SAuditReq* pReq, SAuditTableListInfo* pTbListInfo);
1059
static int32_t doSetOperatorTableInfo(SSetOperator* pSetOperator, SAuditTableListInfo* pTbListInfo) {
×
1060
  int32_t code = TSDB_CODE_SUCCESS;
×
1061
  if (pSetOperator->pLeft) {
×
1062
    SSelectStmt* pLeftSelect = (SSelectStmt*)pSetOperator->pLeft;
×
1063
    if (nodeType(pSetOperator->pLeft) == QUERY_NODE_SELECT_STMT) {
×
1064
      code = doSetSelectStmtAuditReqTableInfo(pLeftSelect->pFromTable, NULL, pTbListInfo);
×
1065
    } else if (nodeType(pSetOperator->pLeft) == QUERY_NODE_SET_OPERATOR) {
×
1066
      SSetOperator* pLeftSetOperator = (SSetOperator*)pSetOperator->pLeft;
×
1067
      code = doSetOperatorTableInfo(pLeftSetOperator, pTbListInfo);
×
1068
      TAOS_RETURN(code);
×
1069
    }
1070
  }
1071
  if (pSetOperator->pRight) {
×
1072
    SSelectStmt* pRightSelect = (SSelectStmt*)pSetOperator->pRight;
×
1073
    if (nodeType(pSetOperator->pRight) == QUERY_NODE_SELECT_STMT) {
×
1074
      code = doSetSelectStmtAuditReqTableInfo(pRightSelect->pFromTable, NULL, pTbListInfo);
×
1075
      TAOS_RETURN(code);
×
1076
    } else if (nodeType(pSetOperator->pRight) == QUERY_NODE_SET_OPERATOR) {
×
1077
      SSetOperator* pRightSetOperator = (SSetOperator*)pSetOperator->pRight;
×
1078
      code = doSetOperatorTableInfo(pRightSetOperator, pTbListInfo);
×
1079
      TAOS_RETURN(code);
×
1080
    }
1081
  }
1082
  return code;
×
1083
}
1084

1085
// Resolves the table(s) behind one FROM-clause node.
//
//   - real / virtual table: the node's own table entry is used
//   - temp table over a SELECT: recurses into that SELECT's FROM clause
//   - temp table over a set operator: creates the lists, then collects the
//     operator's tables into them
//   - join: creates the lists, then recurses into the left and right sides
//
// A resolved table is written straight into pReq while no list exists (and
// pReq is non-NULL); once pTbListInfo->dbList exists it is appended to the
// lists instead. Returns TSDB_CODE_TSC_INVALID_OPERATION for a NULL
// pFromTable, otherwise TSDB_CODE_SUCCESS or the first error met.
static int32_t doSetSelectStmtAuditReqTableInfo(SNode* pFromTable, SAuditReq* pReq, SAuditTableListInfo* pTbListInfo) {
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  STableNode* pTable = NULL;

  if (pFromTable == NULL) {
    return TSDB_CODE_TSC_INVALID_OPERATION;
  }

  switch (nodeType(pFromTable)) {
    case QUERY_NODE_REAL_TABLE:
      pTable = &((SRealTableNode*)pFromTable)->table;
      break;

    case QUERY_NODE_VIRTUAL_TABLE:
      pTable = &((SVirtualTableNode*)pFromTable)->table;
      break;

    case QUERY_NODE_TEMP_TABLE: {
      STempTableNode* pTemp = (STempTableNode*)pFromTable;
      if (!pTemp->pSubquery) {
        break;
      }
      if (nodeType(pTemp->pSubquery) == QUERY_NODE_SELECT_STMT) {
        SSelectStmt* pInner = (SSelectStmt*)pTemp->pSubquery;
        return doSetSelectStmtAuditReqTableInfo(pInner->pFromTable, pReq, pTbListInfo);
      }
      if (nodeType(pTemp->pSubquery) == QUERY_NODE_SET_OPERATOR) {
        code = initAuditTableListInfo(pTbListInfo);
        TAOS_CHECK_GOTO(code, &lino, _exit);

        code = doSetOperatorTableInfo((SSetOperator*)pTemp->pSubquery, pTbListInfo);
        TAOS_CHECK_GOTO(code, &lino, _exit);
      }
      break;
    }

    case QUERY_NODE_JOIN_TABLE: {
      code = initAuditTableListInfo(pTbListInfo);
      TAOS_CHECK_GOTO(code, &lino, _exit);

      // Both sides are walked with a NULL request so they only feed the lists.
      SJoinTableNode* pJoin = (SJoinTableNode*)pFromTable;
      code = doSetSelectStmtAuditReqTableInfo(pJoin->pLeft, NULL, pTbListInfo);
      TAOS_CHECK_GOTO(code, &lino, _exit);
      code = doSetSelectStmtAuditReqTableInfo(pJoin->pRight, NULL, pTbListInfo);
      TAOS_CHECK_GOTO(code, &lino, _exit);
      break;
    }

    default:
      break;
  }

  if (pTable != NULL) {
    if (pTbListInfo->dbList == NULL) {
      if (pReq != NULL) {
        TAOS_UNUSED(tsnprintf(pReq->table, TSDB_TABLE_NAME_LEN, "%s", pTable->tableName));
        TAOS_UNUSED(tsnprintf(pReq->db, TSDB_DB_FNAME_LEN, "%s", pTable->dbName));
      }
    } else {
      void* pushed = taosArrayPush(pTbListInfo->dbList, pTable->dbName);
      TSDB_CHECK_NULL(pushed, code, lino, _exit, terrno);
      pushed = taosArrayPush(pTbListInfo->tableList, pTable->tableName);
      TSDB_CHECK_NULL(pushed, code, lino, _exit, terrno);
    }
  }

_exit:
  if (code != TSDB_CODE_SUCCESS) {
    tscError("[report] failed to set select stmt audit req table info, code:%d, lino:%d", code, lino);
  }
  return code;
}
1139

1140
// Fills pReq's db/table from a SELECT statement's FROM clause.
//
// A single table is written to pReq directly by the walk. When the walk had
// to create the lists (join or set-operator subquery), the first collected
// entry is copied into pReq on success. The lists are released on every path,
// including failures of the walk.
static int32_t setSelectStmtAuditReqTableInfo(SSelectStmt* pStmt, SAuditReq* pReq) {
  int32_t             code = TSDB_CODE_SUCCESS;
  SAuditTableListInfo tableListInfo = {0};

  code = doSetSelectStmtAuditReqTableInfo(pStmt->pFromTable, pReq, &tableListInfo);
  if (code == TSDB_CODE_SUCCESS && tableListInfo.dbList) {
    copyTableInfoToAuditReq(&tableListInfo, pReq);
  }

  // initAuditTableListInfo() creates tableList last, so a non-NULL tableList
  // means both arrays are live and owned here. Keying the cleanup on it avoids
  // touching a dbList that a failed init has already destroyed.
  if (tableListInfo.tableList != NULL) {
    destroyAuditTableListInfo(&tableListInfo);
  }
  return code;
}
1151

1152
// Fills pReq's db/table for a top-level set operator: collects the operand
// tables into temporary lists, copies the first entry into pReq on success,
// and always destroys the lists once they have been created.
static int32_t setOperatorTableInfo(SSetOperator* pSetOperator, SAuditReq* pReq) {
  SAuditTableListInfo lists = {0};

  int32_t code = initAuditTableListInfo(&lists);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  code = doSetOperatorTableInfo(pSetOperator, &lists);
  if (TSDB_CODE_SUCCESS == code) {
    copyTableInfoToAuditReq(&lists, pReq);
  }

  destroyAuditTableListInfo(&lists);
  return code;
}
1166

1167
// Dispatches on the statement type to the helper that fills pReq's db/table
// from the request's root query node. An unsupported type is logged and
// reported as TSDB_CODE_SUCCESS with pReq left untouched.
static int32_t setAuditReqTableInfo(SRequestObj* pRequest, ENodeType type, SAuditReq* pReq) {
  switch (type) {
    case QUERY_NODE_DELETE_STMT:
      return setDeleteStmtAuditReqTableInfo((SDeleteStmt*)pRequest->pQuery->pRoot, pReq);
    case QUERY_NODE_VNODE_MODIFY_STMT:
      return setModifyStmtAuditReqTableInfo((SVnodeModifyOpStmt*)pRequest->pQuery->pRoot, pReq);
    case QUERY_NODE_SELECT_STMT:
      return setSelectStmtAuditReqTableInfo((SSelectStmt*)pRequest->pQuery->pRoot, pReq);
    case QUERY_NODE_SET_OPERATOR:
      return setOperatorTableInfo((SSetOperator*)pRequest->pQuery->pRoot, pReq);
    default:
      break;
  }

  tscError("[report]unsupported report type: %s", nodesNodeName(type));
  return TSDB_CODE_SUCCESS;
}
1183

1184
// Sets pReq->affectedRows: the request's result row count for delete and
// modify statements, 0 for selects and set operators. Any other type leaves
// the field untouched.
static void setAuditReqAffectedRows(SRequestObj* pRequest, ENodeType type, SAuditReq* pReq) {
  switch (type) {
    case QUERY_NODE_DELETE_STMT:
    case QUERY_NODE_VNODE_MODIFY_STMT:
      pReq->affectedRows = pRequest->body.resInfo.numOfRows;
      break;
    case QUERY_NODE_SELECT_STMT:
    case QUERY_NODE_SET_OPERATOR:
      pReq->affectedRows = 0;
      break;
    default:
      break;
  }
}
×
1191

1192
// Writes the operation name for the statement type into pReq->operation:
// "delete", "insert" or "select". Set operators are reported as "select",
// matching needSendReport(), which gates them on the select audit switch.
// Any other type leaves the field untouched. pRequest is not used.
static void setAuditReqOperation(SRequestObj* pRequest, ENodeType type, SAuditReq* pReq) {
  if (QUERY_NODE_DELETE_STMT == type) {
    TAOS_UNUSED(tsnprintf(pReq->operation, AUDIT_OPERATION_LEN, "delete"));
  } else if (QUERY_NODE_VNODE_MODIFY_STMT == type) {
    TAOS_UNUSED(tsnprintf(pReq->operation, AUDIT_OPERATION_LEN, "insert"));
  } else if (QUERY_NODE_SELECT_STMT == type || QUERY_NODE_SET_OPERATOR == type) {
    TAOS_UNUSED(tsnprintf(pReq->operation, AUDIT_OPERATION_LEN, "select"));
  }
}
×
1201

1202
// Decides whether a statement of the given type should be audited under the
// server configuration pCfg. Nothing is reported while auditLevel is below
// AUDIT_LEVEL_DATA; otherwise the per-operation switch decides, with set
// operators sharing the select switch. An unsupported type is logged and
// answered with false.
static bool needSendReport(SAppInstServerCFG* pCfg, ENodeType type) {
  if (pCfg->auditLevel < AUDIT_LEVEL_DATA) {
    return false;
  }

  switch (type) {
    case QUERY_NODE_SELECT_STMT:
    case QUERY_NODE_SET_OPERATOR:
      return pCfg->enableAuditSelect != 0;
    case QUERY_NODE_DELETE_STMT:
      return pCfg->enableAuditDelete != 0;
    case QUERY_NODE_VNODE_MODIFY_STMT:
      return pCfg->enableAuditInsert != 0;
    default:
      break;
  }

  tscError("[report] unsupported report type: %s", nodesNodeName(type));
  return false;
}
1219

1220
// Builds an audit request for a finished statement and sends it to the
// server.
//
// Nothing is sent when the request has no tsc object / app info, when the
// request failed, when auditing is disabled for this statement type, or when
// the table info cannot be determined. The request struct starts zeroed so
// that fields no helper fills in are serialised as zeros rather than as
// leftover stack contents.
static void reportSqlExecResult(SRequestObj* pRequest, ENodeType type) {
  int32_t  code = TSDB_CODE_SUCCESS;
  STscObj* pTscObj = pRequest->pTscObj;

  if (pTscObj == NULL || pTscObj->pAppInfo == NULL) {
    tscError("[report][%s] invalid tsc obj", nodesNodeName(type));
    return;
  }
  if (pRequest->code != TSDB_CODE_SUCCESS) {
    tscDebug("[report][%s] request result code:%d, skip audit", nodesNodeName(type), pRequest->code);
    return;
  }

  if (!needSendReport(&pTscObj->pAppInfo->serverCfg, type)) {
    tscTrace("[report][%s] audit is disabled", nodesNodeName(type));
    return;
  }

  SAuditReq req = {0};
  req.pSql = pRequest->sqlstr;
  req.sqlLen = pRequest->sqlLen;
  setAuditReqAffectedRows(pRequest, type, &req);
  code = setAuditReqTableInfo(pRequest, type, &req);
  if (code == TSDB_CODE_TSC_INVALID_OPERATION) {
    // Statement shape that is deliberately not audited; skip quietly.
    return;
  } else if (code != TSDB_CODE_SUCCESS) {
    tscError("[report][%s] failed to set audit req table info, code:%d", nodesNodeName(type), code);
    return;
  }
  int64_t duration = taosGetTimestampUs() - pRequest->metric.start;
  req.duration = duration / 1000000.0;  // convert to seconds
  setAuditReqOperation(pRequest, type, &req);

  // First pass only computes the encoded size; a negative value is an error
  // and must not reach the allocator.
  int32_t tlen = tSerializeSAuditReq(NULL, 0, &req);
  if (tlen < 0) {
    tscError("[report][%s] failed to serialize req", nodesNodeName(type));
    return;
  }
  void* pReq = taosMemoryCalloc(1, tlen);
  if (pReq == NULL) {
    tscError("[report][%s] failed to allocate memory for req", nodesNodeName(type));
    return;
  }

  if (tSerializeSAuditReq(pReq, tlen, &req) < 0) {
    tscError("[report][%s] failed to serialize req", nodesNodeName(type));
    taosMemoryFree(pReq);
    return;
  }

  code = senAuditInfo(pRequest->pTscObj, pReq, tlen, pRequest->requestId);
  if (code != 0) {
    tscError("[report][%s] failed to send audit info, code:%d", nodesNodeName(type), code);
    taosMemoryFree(pReq);
    return;
  }
  tscDebug("[report][%s] data, sql:%s", nodesNodeName(type), req.pSql);
}
1274

1275
// Entry point for audit reporting of a finished request. Requests without a
// parsed query root are ignored. Only delete, select, insert (vnode modify)
// and set-operator statements are passed on to reportSqlExecResult().
void clientOperateReport(SRequestObj* pRequest) {
  if (pRequest == NULL || pRequest->pQuery == NULL || pRequest->pQuery->pRoot == NULL) {
    tscDebug("[report] invalid request");
    return;
  }
  ENodeType type = nodeType(pRequest->pQuery->pRoot);
  // Every operand is compared against `type`; a bare enumerator here would
  // make the whole condition true for any statement.
  if (QUERY_NODE_DELETE_STMT == type || QUERY_NODE_SELECT_STMT == type || QUERY_NODE_VNODE_MODIFY_STMT == type ||
      QUERY_NODE_SET_OPERATOR == type) {
    reportSqlExecResult(pRequest, type);
  }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc