• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4938

23 Jan 2026 09:40AM UTC coverage: 66.8% (+0.006%) from 66.794%
#4938

push

travis-ci

web-flow
fix: case failure caused by the modification of the error description (#34391)

204187 of 305671 relevant lines covered (66.8%)

124015580.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

44.42
/source/client/src/clientMonitor.c
1
#include "clientMonitor.h"
2
#include "cJSON.h"
3
#include "clientInt.h"
4
#include "clientLog.h"
5
#include "osEnv.h"
6
#include "query.h"
7
#include "taos.h"
8
#include "taoserror.h"
9
#include "tarray.h"
10
#include "tglobal.h"
11
#include "thash.h"
12
#include "tmisce.h"
13
#include "tqueue.h"
14
#include "ttime.h"
15
#include "ttimer.h"
16

17
/*
 * File-scope state of the client monitor (usage as seen in this file):
 *  - monitorLock:            latch held around accesses to monitorCounterHash (read side in the
 *                            timer callback, write side when creating clients/counters and incrementing).
 *  - monitorTimer:           timer controller handle passed to taosTmrStart/taosTmrReset.
 *  - monitorCounterHash:     clusterId -> MonitorClient*.
 *  - monitorFlag:            compared against 1 to detect that the monitor is quitting.
 *  - quitTime / quitCnt:     consulted by the monitor thread to decide when it may exit.
 *  - monitorSem:             the monitor thread waits on it with a 100 timeout between passes.
 *  - monitorQueue:           queue of MonitorSlowLogData items consumed by the monitor thread.
 *  - monitorSlowLogHash:     clusterId -> SlowLogClient* (slow-log file of the running process).
 *  - monitorSlowLogHashPath: file path -> SlowLogClient* (left-over files found in the temp dir).
 *  - tmpSlowLogPath:         directory handed to monitorWriteSlowLog2File for new slow-log files.
 *  - monitorThread:          the thread running monitorThreadFunc.
 * NOTE(review): monitorQueueLock and tsEnableAuditDelete are not referenced in the part of the
 * file visible here.
 */
SRWLatch    monitorLock;
18
SRWLatch    monitorQueueLock;
19
void*       monitorTimer;
20
SHashObj*   monitorCounterHash;
21
int32_t     monitorFlag = 0;
22
int64_t     quitTime = 0;
23
int32_t     quitCnt = 0;
24
tsem2_t     monitorSem;
25
STaosQueue* monitorQueue;
26
SHashObj*   monitorSlowLogHash;
27
SHashObj*   monitorSlowLogHashPath;
28
char        tmpSlowLogPath[PATH_MAX] = {0};
29
TdThread    monitorThread;
30
extern bool tsEnableAuditDelete;
31

32
static int32_t getSlowLogTmpDir(char* tmpPath, int32_t size) {
1,205,005✔
33
  if (strlen(tsTempDir) == 0) {
1,205,005✔
34
    return TSDB_CODE_INVALID_PARA;
×
35
  }
36
  bool hasDelim = (tsTempDir[strlen(tsTempDir) - 1] == TD_DIRSEP_CHAR) ? true : false;
1,205,005✔
37
  int ret = tsnprintf(tmpPath, size, "%s%stdengine_slow_log%s", tsTempDir, hasDelim ? "" : TD_DIRSEP, TD_DIRSEP);
1,205,005✔
38
  if (ret < 0) {
1,205,005✔
39
    tscError("failed to get tmp path ret:%d", ret);
×
40
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
41
  }
42
  return 0;
1,205,005✔
43
}
44

45
static void destroySlowLogClient(void* data) {
×
46
  if (data == NULL) {
×
47
    return;
×
48
  }
49
  SlowLogClient* slowLogClient = *(SlowLogClient**)data;
×
50
  if (taosCloseFile(&slowLogClient->pFile) != 0) {
×
51
    tscError("%s monitor failed to close file:%s, terrno:%d", __func__, slowLogClient->path, terrno);
×
52
  }
53
  slowLogClient->pFile = NULL;
×
54

55
  taosMemoryFree(slowLogClient);
×
56
}
57

58
static void destroyMonitorClient(void* data) {
1,200,342✔
59
  if (data == NULL) {
1,200,342✔
60
    return;
×
61
  }
62
  MonitorClient* pMonitor = *(MonitorClient**)data;
1,200,342✔
63
  if (pMonitor == NULL) {
1,200,342✔
64
    return;
×
65
  }
66
  if (!taosTmrStopA(&pMonitor->timer)) {
1,200,342✔
67
    tscError("failed to stop timer, pMonitor:%p", pMonitor);
185,441✔
68
  }
69
  taosHashCleanup(pMonitor->counters);
1,200,342✔
70
  int ret = taos_collector_registry_destroy(pMonitor->registry);
1,200,342✔
71
  if (ret) {
1,200,342✔
72
    tscError("failed to destroy registry, pMonitor:%p ret:%d", pMonitor, ret);
×
73
  }
74
  taosMemoryFree(pMonitor);
1,200,342✔
75
}
76

77
static void monitorFreeSlowLogData(void* paras) {
1,201,665✔
78
  MonitorSlowLogData* pData = (MonitorSlowLogData*)paras;
1,201,665✔
79
  if (pData == NULL) {
1,201,665✔
80
    return;
796✔
81
  }
82
  taosMemoryFreeClear(pData->data);
1,200,869✔
83
}
84

85
static void monitorFreeSlowLogDataEx(void* paras) {
796✔
86
  monitorFreeSlowLogData(paras);
796✔
87
  taosMemoryFree(paras);
796✔
88
}
796✔
89

90
static SAppInstInfo* getAppInstByClusterId(int64_t clusterId) {
4,426,040✔
91
  void* p = taosHashGet(appInfo.pInstMapByClusterId, &clusterId, LONG_BYTES);
4,426,040✔
92
  if (p == NULL) {
4,426,040✔
93
    tscError("failed to get app inst, clusterId:0x%" PRIx64, clusterId);
×
94
    return NULL;
×
95
  }
96
  return *(SAppInstInfo**)p;
4,426,040✔
97
}
98

99
// Completion callback for a report sent by sendReport.
//   param: NULL, or the MonitorSlowLogData* that accompanied a slow-log report
//   pMsg:  response buffer; its pData and pEpSet are freed here
//   code:  send result, non-zero on failure
// When param is set, a copy of it with data cleared is pushed onto the monitor queue.
// Always returns TSDB_CODE_SUCCESS.
static int32_t monitorReportAsyncCB(void* param, SDataBuf* pMsg, int32_t code) {
  if (code != TSDB_CODE_SUCCESS) {
    tscError("found error in monitorReport send callback, code:%d, please check the network.", code);
  }

  if (pMsg != NULL) {
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
  }

  if (param == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  MonitorSlowLogData* pSent = (MonitorSlowLogData*)param;
  if (code != 0) {
    tscError("failed to send slow log:%s, clusterId:0x%" PRIx64, pSent->data, pSent->clusterId);
  }

  // Queue a follow-up item carrying the same metadata but no payload.
  MonitorSlowLogData followUp = *pSent;
  followUp.data = NULL;
  (void)monitorPutData2MonitorQueue(followUp);

  return TSDB_CODE_SUCCESS;
}
118

119
// Serialize pCont into an SStatisReq and send it asynchronously as TDMT_MND_STATIS.
//   pTransporter: transport handle passed to asyncSendMsgToServer
//   epSet:        target endpoint set
//   pCont:        NUL-terminated report text; not freed here
//   type:         report type stored in the request
//   param:        optional MonitorSlowLogData* handed to the callback; on a local failure
//                 (before the message is handed to the transport) it is freed here
// Returns the result of asyncSendMsgToServer, or terrno when serialization/allocation fails.
static int32_t sendReport(void* pTransporter, SEpSet* epSet, char* pCont, MONITOR_TYPE type, void* param) {
  int32_t    code = 0;
  void*      buf = NULL;
  // Zero-initialize so that any field not assigned below is never serialized as garbage.
  SStatisReq sStatisReq = {0};
  sStatisReq.pCont = pCont;
  sStatisReq.contLen = strlen(pCont);
  sStatisReq.type = type;

  // First pass with a NULL buffer only computes the encoded length.
  int tlen = tSerializeSStatisReq(NULL, 0, &sStatisReq);
  if (tlen < 0) {
    code = terrno;
    goto FAILED;
  }
  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) {
    tscError("sendReport failed, out of memory, len:%d", tlen);
    code = terrno;
    goto FAILED;
  }
  tlen = tSerializeSStatisReq(buf, tlen, &sStatisReq);
  if (tlen < 0) {
    code = terrno;
    goto FAILED;
  }

  SMsgSendInfo* pInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (pInfo == NULL) {
    tscError("sendReport failed, out of memory send info");
    code = terrno;
    goto FAILED;
  }
  pInfo->fp = monitorReportAsyncCB;
  pInfo->msgInfo.pData = buf;
  pInfo->msgInfo.len = tlen;
  pInfo->msgType = TDMT_MND_STATIS;
  pInfo->param = param;
  pInfo->paramFreeFp = monitorFreeSlowLogDataEx;
  pInfo->requestId = tGenIdPI64();
  pInfo->requestObjRefId = 0;

  return asyncSendMsgToServer(pTransporter, epSet, NULL, pInfo);

FAILED:
  taosMemoryFree(buf);
  monitorFreeSlowLogDataEx(param);
  return code;
}
167

168
// Render the registry's metrics into a report and send it as a counter report.
// The registry batch is cleared only when a non-empty report was sent successfully.
//   registry:     metrics registry to render
//   pTransporter: transport handle used for sending
//   epSet:        target endpoint set
static void generateClusterReport(taos_collector_registry_t* registry, void* pTransporter, SEpSet* epSet) {
  char tsText[50] = {0};
  (void)snprintf(tsText, sizeof(tsText), "%" PRId64, taosGetTimestamp(TSDB_TIME_PRECISION_MILLI));

  char* report = (char*)taos_collector_registry_bridge_new(registry, tsText, "%" PRId64, NULL);
  if (report == NULL) {
    tscError("generateClusterReport failed, get null content. since %s", tstrerror(terrno));
    return;
  }

  // An empty report is never sent; the send is attempted only for non-empty content.
  bool delivered = (strlen(report) != 0) && (sendReport(pTransporter, epSet, report, MONITOR_TYPE_COUNTER, NULL) == 0);
  if (delivered) {
    int rc = taos_collector_registry_clear_batch(registry);
    if (rc != 0) {
      tscError("failed to clear registry, ret:%d", rc);
    }
  }

  taosMemoryFreeClear(report);
}
185

186
static void reportSendProcess(void* param, void* tmrId) {
832,023✔
187
  taosRLockLatch(&monitorLock);
832,023✔
188
  if (atomic_load_32(&monitorFlag) == 1) {
832,023✔
189
    taosRUnLockLatch(&monitorLock);
7,536✔
190
    return;
7,536✔
191
  }
192

193
  MonitorClient* pMonitor = (MonitorClient*)param;
824,487✔
194
  SAppInstInfo*  pInst = getAppInstByClusterId(pMonitor->clusterId);
824,487✔
195
  if (pInst == NULL) {
824,487✔
196
    taosRUnLockLatch(&monitorLock);
×
197
    return;
×
198
  }
199

200
  SEpSet ep = getEpSet_s(&pInst->mgmtEp);
824,487✔
201
  generateClusterReport(pMonitor->registry, pInst->pTransporter, &ep);
824,487✔
202
  bool reset = taosTmrReset(reportSendProcess, pInst->serverCfg.monitorParas.tsMonitorInterval * 1000, param,
824,487✔
203
                            monitorTimer, &tmrId);
204
  tscDebug("reset timer, pMonitor:%p, %d", pMonitor, reset);
824,487✔
205
  taosRUnLockLatch(&monitorLock);
824,487✔
206
}
207

208
static void sendAllCounter() {
1,203,862✔
209
  MonitorClient** ppMonitor = NULL;
1,203,862✔
210
  while ((ppMonitor = taosHashIterate(monitorCounterHash, ppMonitor))) {
2,404,204✔
211
    MonitorClient* pMonitor = *ppMonitor;
1,200,342✔
212
    if (pMonitor == NULL) {
1,200,342✔
213
      continue;
×
214
    }
215
    SAppInstInfo* pInst = getAppInstByClusterId(pMonitor->clusterId);
1,200,342✔
216
    if (pInst == NULL) {
1,200,342✔
217
      taosHashCancelIterate(monitorCounterHash, ppMonitor);
×
218
      break;
×
219
    }
220
    SEpSet ep = getEpSet_s(&pInst->mgmtEp);
1,200,342✔
221
    generateClusterReport(pMonitor->registry, pInst->pTransporter, &ep);
1,200,342✔
222
  }
223
}
1,203,862✔
224

225
void monitorCreateClient(int64_t clusterId) {
2,401,738✔
226
  MonitorClient* pMonitor = NULL;
2,401,738✔
227
  taosWLockLatch(&monitorLock);
2,401,738✔
228
  if (taosHashGet(monitorCounterHash, &clusterId, LONG_BYTES) == NULL) {
2,401,738✔
229
    tscInfo("clusterId:0x%" PRIx64 ", create monitor", clusterId);
1,200,342✔
230
    pMonitor = taosMemoryCalloc(1, sizeof(MonitorClient));
1,200,342✔
231
    if (pMonitor == NULL) {
1,200,342✔
232
      tscError("failed to create monitor client");
×
233
      goto fail;
×
234
    }
235
    pMonitor->clusterId = clusterId;
1,200,342✔
236
    char clusterKey[32] = {0};
1,200,342✔
237
    if (snprintf(clusterKey, sizeof(clusterKey), "%" PRId64, clusterId) < 0) {
1,200,342✔
238
      tscError("failed to create cluster key");
×
239
      goto fail;
×
240
    }
241
    pMonitor->registry = taos_collector_registry_new(clusterKey);
1,200,342✔
242
    if (pMonitor->registry == NULL) {
1,200,342✔
243
      tscError("failed to create registry");
×
244
      goto fail;
×
245
    }
246
    pMonitor->colector = taos_collector_new(clusterKey);
1,200,342✔
247
    if (pMonitor->colector == NULL) {
1,200,342✔
248
      tscError("failed to create collector");
×
249
      goto fail;
×
250
    }
251

252
    int r = taos_collector_registry_register_collector(pMonitor->registry, pMonitor->colector);
1,200,342✔
253
    if (r) {
1,200,342✔
254
      tscError("failed to register collector, ret:%d", r);
×
255
      goto fail;
×
256
    }
257
    pMonitor->counters =
1,211,145✔
258
        (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
1,200,342✔
259
    if (pMonitor->counters == NULL) {
1,200,342✔
260
      tscError("failed to create monitor counters");
×
261
      goto fail;
×
262
    }
263

264
    if (taosHashPut(monitorCounterHash, &clusterId, LONG_BYTES, &pMonitor, POINTER_BYTES) != 0) {
1,200,342✔
265
      tscError("failed to put monitor client to hash");
×
266
      goto fail;
×
267
    }
268

269
    SAppInstInfo* pInst = getAppInstByClusterId(clusterId);
1,200,342✔
270
    if (pInst == NULL) {
1,200,342✔
271
      tscError("failed to get app instance by cluster id");
×
272
      pMonitor = NULL;
×
273
      goto fail;
×
274
    }
275
    pMonitor->timer = taosTmrStart(reportSendProcess, pInst->serverCfg.monitorParas.tsMonitorInterval * 1000,
1,200,342✔
276
                                   (void*)pMonitor, monitorTimer);
277
    if (pMonitor->timer == NULL) {
1,200,342✔
278
      tscError("failed to start timer");
×
279
      goto fail;
×
280
    }
281
    tscInfo("clusterId:0x%" PRIx64 ", create monitor finished, monitor:%p", clusterId, pMonitor);
1,200,342✔
282
  }
283
  taosWUnLockLatch(&monitorLock);
2,401,738✔
284

285
  return;
2,401,738✔
286

287
fail:
×
288
  destroyMonitorClient(&pMonitor);
×
289
  taosWUnLockLatch(&monitorLock);
×
290
}
291

292
void monitorCreateClientCounter(int64_t clusterId, const char* name, const char* help, size_t label_key_count,
2,401,738✔
293
                                const char** label_keys) {
294
  taosWLockLatch(&monitorLock);
2,401,738✔
295
  MonitorClient** ppMonitor = (MonitorClient**)taosHashGet(monitorCounterHash, &clusterId, LONG_BYTES);
2,401,738✔
296
  if (ppMonitor == NULL || *ppMonitor == NULL) {
2,401,738✔
297
    tscError("failed to get monitor client");
×
298
    goto end;
×
299
  }
300
  taos_counter_t* newCounter = taos_counter_new(name, help, label_key_count, label_keys);
2,401,738✔
301
  if (newCounter == NULL) return;
2,401,738✔
302
  MonitorClient* pMonitor = *ppMonitor;
2,401,738✔
303
  if (taos_collector_add_metric(pMonitor->colector, newCounter) != 0) {
2,401,738✔
304
    tscError("failed to add metric to collector");
1,054✔
305
    int r = taos_counter_destroy(newCounter);
1,054✔
306
    if (r) {
1,054✔
307
      tscError("failed to destroy counter, code:%d", r);
×
308
    }
309
    goto end;
1,054✔
310
  }
311
  if (taosHashPut(pMonitor->counters, name, strlen(name), &newCounter, POINTER_BYTES) != 0) {
2,400,684✔
312
    tscError("failed to put counter to monitor");
×
313
    int r = taos_counter_destroy(newCounter);
×
314
    if (r) {
×
315
      tscError("failed to destroy counter, code:%d", r);
×
316
    }
317
    goto end;
×
318
  }
319
  tscInfo("clusterId:0x%" PRIx64 ", monitor:%p, create counter:%s %p", pMonitor->clusterId, pMonitor, name, newCounter);
2,400,684✔
320

321
end:
2,380,132✔
322
  taosWUnLockLatch(&monitorLock);
2,401,738✔
323
}
324

325
void monitorCounterInc(int64_t clusterId, const char* counterName, const char** label_values) {
136,427✔
326
  taosWLockLatch(&monitorLock);
136,427✔
327
  if (atomic_load_32(&monitorFlag) == 1) {
136,427✔
328
    taosWUnLockLatch(&monitorLock);
×
329
    return;
×
330
  }
331

332
  MonitorClient** ppMonitor = (MonitorClient**)taosHashGet(monitorCounterHash, &clusterId, LONG_BYTES);
136,427✔
333
  if (ppMonitor == NULL || *ppMonitor == NULL) {
136,427✔
334
    tscError("clusterId:0x%" PRIx64 ", monitor not found", clusterId);
×
335
    goto end;
×
336
  }
337

338
  MonitorClient*   pMonitor = *ppMonitor;
136,427✔
339
  taos_counter_t** ppCounter = (taos_counter_t**)taosHashGet(pMonitor->counters, counterName, strlen(counterName));
136,427✔
340
  if (ppCounter == NULL || *ppCounter == NULL) {
136,427✔
341
    tscError("clusterId:0x%" PRIx64 ", monitor:%p counter:%s not found", clusterId, pMonitor, counterName);
×
342
    goto end;
×
343
  }
344
  if (taos_counter_inc(*ppCounter, label_values) != 0) {
136,427✔
345
    tscError("clusterId:0x%" PRIx64 ", monitor:%p counter:%s inc failed", clusterId, pMonitor, counterName);
×
346
    goto end;
×
347
  }
348
  tscTrace("clusterId:0x%" PRIx64 ", monitor:%p, counter:%s inc", pMonitor->clusterId, pMonitor, counterName);
136,427✔
349

350
end:
136,427✔
351
  taosWUnLockLatch(&monitorLock);
136,427✔
352
}
353

354
// Map a SQL result code to its display string.
// Codes 0, 1 and 2 map to "Success", "Failed" and "Cancel"; any value outside the table
// yields "Unknown" instead of indexing past the end of the array.
const char* monitorResultStr(SQL_RESULT_CODE code) {
  static const char* result_state[] = {"Success", "Failed", "Cancel"};
  const size_t       count = sizeof(result_state) / sizeof(result_state[0]);

  if ((int64_t)code < 0 || (size_t)code >= count) {
    return "Unknown";
  }
  return result_state[code];
}
358

359
// Append one slow-log record (including its terminating NUL) to the cluster's slow-log file.
//   slowLogData: record to write; clusterId selects the file, data is the NUL-terminated text
//   tmpPath:     directory in which a new slow-log file is created
// On first use for a cluster, a new file is created and locked and a SlowLogClient for it
// is registered in monitorSlowLogHash.
static void monitorWriteSlowLog2File(MonitorSlowLogData* slowLogData, char* tmpPath) {
  TdFilePtr      fp = NULL;
  SlowLogClient* pEntry = NULL;
  void*          pSlot = taosHashGet(monitorSlowLogHash, &slowLogData->clusterId, LONG_BYTES);

  if (pSlot != NULL) {
    // Known cluster: reuse the registered client and its open file.
    pEntry = *(SlowLogClient**)pSlot;
    fp = pEntry->pFile;
  } else {
    char filePath[PATH_MAX] = {0};
    char fileTag[32] = {0};
    int  n = snprintf(fileTag, sizeof(fileTag), "%" PRIx64 "-%" PRId64, slowLogData->clusterId, taosGetTimestampNs());
    if (n < 0) {
      tscError("failed to generate clusterId:0x%" PRIx64, slowLogData->clusterId);
      return;
    }

    taosGetTmpfilePath(tmpPath, fileTag, filePath);
    tscInfo("monitor create slow log file:%s", filePath);
    fp = taosOpenFile(filePath, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_READ | TD_FILE_TRUNC);
    if (fp == NULL) {
      tscError("failed to open file:%s since %d", filePath, terrno);
      return;
    }

    pEntry = taosMemoryCalloc(1, sizeof(SlowLogClient));
    if (pEntry == NULL) {
      tscError("failed to allocate memory for slow log client");
      int32_t rc = taosCloseFile(&fp);
      if (rc != 0) {
        tscError("failed to close file:%p ret:%d", fp, rc);
      }
      return;
    }

    tstrncpy(pEntry->path, filePath, PATH_MAX);
    pEntry->clusterId = slowLogData->clusterId;
    pEntry->pFile = fp;
    pEntry->sendOffset = 0;
    pEntry->type = SLOW_LOG_READ_RUNNING;

    if (taosHashPut(monitorSlowLogHash, &pEntry->clusterId, LONG_BYTES, &pEntry, POINTER_BYTES) != 0) {
      tscError("failed to put clusterId:0x%" PRIx64 " to hash table", pEntry->clusterId);
      int32_t rc = taosCloseFile(&fp);
      if (rc != 0) {
        tscError("failed to close file:%p ret:%d", fp, rc);
      }
      taosMemoryFree(pEntry);
      return;
    }

    if (taosLockFile(fp) < 0) {
      tscError("failed to lock file:%p since %s", fp, terrstr());
      return;
    }
  }

  // Records are always appended at the end of the file.
  if (taosLSeekFile(fp, 0, SEEK_END) < 0) {
    tscError("failed to seek file:%p code:%d", fp, terrno);
    return;
  }

  // strlen + 1: the NUL terminator is written too and separates the records.
  if (taosWriteFile(fp, slowLogData->data, strlen(slowLogData->data) + 1) < 0) {
    tscError("failed to write len to file:%p since %s", fp, terrstr());
  }
  tscDebug("monitor write slow log to file:%s, clusterId:0x%" PRIx64 ", data:%s", pEntry->path, pEntry->clusterId, slowLogData->data);
}
420

421
// Read the next batch of slow-log records from pClient's file, starting at sendOffset, and
// return them as one heap-allocated string of the form "[rec1,rec2,...]".
//
// The file holds NUL-terminated records back to back (see the strlen + 1 write in
// monitorWriteSlowLog2File). At most SLOW_LOG_SEND_SIZE_MAX bytes are read per call; each
// complete record's NUL is replaced by ',' and the last separator by ']'. sendOffset is
// advanced past every complete record that was consumed.
//
// Returns NULL on seek/read/allocation failure, when nothing could be read, or when
// size <= sendOffset (terrno is set to TSDB_CODE_TSC_INTERNAL_ERROR in that last case).
// The caller owns the returned buffer.
static char* readFile(SlowLogClient* pClient) {
  tscDebug("monitor readFile slow begin file:%s, offset:%" PRId64 ", size:%" PRId64, pClient->path,
           pClient->sendOffset, pClient->size);
  if (taosLSeekFile(pClient->pFile, pClient->sendOffset, SEEK_SET) < 0) {
    // path is a string, so it is printed with %s (it was %p, which logged an address).
    tscError("failed to seek file:%s code:%d", pClient->path, terrno);
    return NULL;
  }

  if ((pClient->size <= pClient->sendOffset)) {
    tscError("invalid size:%" PRId64 ", offset:%" PRId64, pClient->size, pClient->sendOffset);
    terrno = TSDB_CODE_TSC_INTERNAL_ERROR;
    return NULL;
  }
  char*   pCont = NULL;
  int64_t totalSize = 0;
  if (pClient->size - pClient->sendOffset >= SLOW_LOG_SEND_SIZE_MAX) {
    totalSize = 4 + SLOW_LOG_SEND_SIZE_MAX;
  } else {
    totalSize = 4 + (pClient->size - pClient->sendOffset);
  }

  // Zero-filled, so the bytes after the data that was read always terminate the strlen below.
  pCont = taosMemoryCalloc(1, totalSize);  // 4 reserved for []
  if (pCont == NULL) {
    tscError("failed to allocate memory for slow log, size:%" PRId64, totalSize);
    return NULL;
  }
  char* buf = pCont;
  (void)strncat(buf++, "[", totalSize - 1);
  int64_t readSize = taosReadFile(pClient->pFile, buf, totalSize - 4);  // 4 reserved for []
  if (readSize <= 0) {
    if (readSize < 0) {
      tscError("failed to read len from file:%s since %s", pClient->path, terrstr());
    }
    taosMemoryFree(pCont);
    return NULL;
  }

  // From here on totalSize counts the bytes of the read window consumed so far.
  totalSize = 0;
  while (1) {
    size_t len = strlen(buf);
    if (len == SLOW_LOG_SEND_SIZE_MAX) {  // one item is too long
      // The record fills the entire read window: drop the rest of the file and close the
      // list at the current position.
      pClient->sendOffset = pClient->size;
      *buf = ']';
      *(buf + 1) = '\0';
      break;
    }

    totalSize += (len + 1);
    if (totalSize > readSize) {
      // The current record was cut off by the end of the window: leave it for the next call
      // and turn the preceding separator into the closing bracket.
      *(buf - 1) = ']';
      *buf = '\0';
      break;
    }

    if (len == 0) {             // one item is empty
      if (*(buf - 1) == '[') {  // data is "\0"
        // no data read
        *buf = ']';
        *(buf + 1) = '\0';
      } else {  // data is "ass\0\0"
        *(buf - 1) = ']';
        *buf = '\0';
      }
      pClient->sendOffset += 1;
      break;
    }
    buf[len] = ',';  // replace '\0' with ','
    buf += (len + 1);
    pClient->sendOffset += (len + 1);
  }

  tscDebugL("monitor readFile slow log end, data:%s, offset:%" PRId64, pCont, pClient->sendOffset);
  return pCont;
}
495

496
// Re-create a slow-log file: close the current handle, open the path again with
// create/truncate flags and reset the recorded size to 0.
// When re-opening fails, pClient->pFile is left NULL.
static void processFileRemoved(SlowLogClient* pClient) {
  int32_t rc = taosCloseFile(&(pClient->pFile));
  if (rc != 0) {
    tscError("%s failed to close file:%s ret:%d", __func__, pClient->path, rc);
  }

  pClient->pFile =
      taosOpenFile(pClient->path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_READ | TD_FILE_TRUNC);
  if (pClient->pFile == NULL) {
    tscError("%s failed to open file:%s since %d", __func__, pClient->path, terrno);
  }

  pClient->size = 0;
}
×
512

513
// Refresh pClient->size from the file system.
// When the stat fails because the file no longer exists, the file is re-created and 0 is
// returned; otherwise the result of taosStatFile is returned unchanged.
static int32_t processFile(SlowLogClient* pClient) {
  int32_t rc = taosStatFile(pClient->path, &pClient->size, NULL, NULL);
  if (rc >= 0 || ERRNO != ENOENT) {
    return rc;
  }

  processFileRemoved(pClient);
  tscError("%s monitor rebuild file:%s because of file not exist", __func__, pClient->path);
  return 0;
}
523

524
static int32_t sendSlowLog(int64_t clusterId, char* data, SLOW_LOG_QUEUE_TYPE type, char* fileName, void* pTransporter,
×
525
                           SEpSet* epSet) {
526
  if (data == NULL) {
×
527
    return TSDB_CODE_INVALID_PARA;
×
528
  }
529
  MonitorSlowLogData* pParam = taosMemoryCalloc(1, sizeof(MonitorSlowLogData));
×
530
  if (pParam == NULL) {
×
531
    taosMemoryFree(data);
×
532
    return terrno;
×
533
  }
534
  pParam->data = data;
×
535
  pParam->type = type;
×
536
  pParam->fileName = fileName;
×
537
  pParam->clusterId = clusterId;
×
538
  return sendReport(pTransporter, epSet, data, MONITOR_TYPE_SLOW_LOG, pParam);
×
539
}
540

541
// Read the next batch from pClient's file and send it to the cluster's management endpoint.
// On a send failure sendOffset is reset to 0 so sending restarts from the file's beginning.
// Returns terrno when the app instance or the batch is unavailable, otherwise the send result.
static int32_t monitorReadSend(SlowLogClient* pClient) {
  SAppInstInfo* pApp = getAppInstByClusterId(pClient->clusterId);
  if (pApp == NULL) {
    tscError("%s failed to get app instance by clusterId:0x%" PRIx64, __func__, pClient->clusterId);
    return terrno;
  }

  SEpSet mgmtEp = getEpSet_s(&pApp->mgmtEp);
  char*  batch = readFile(pClient);
  if (batch == NULL) {
    return terrno;
  }

  int32_t rc = sendSlowLog(pClient->clusterId, batch, pClient->type, pClient->path, pApp->pTransporter, &mgmtEp);
  if (rc == 0) {
    return rc;
  }

  pClient->sendOffset = 0;  // set offset = 0 to send data at beginning if send data failed
  tscError("monitor send slow log failed, clusterId:0x%" PRIx64 ", ret:%d", pClient->clusterId, rc);
  return rc;
}
557

558
// Advance the send state machine of one slow-log client.
//
// If unsent data remains (size > sendOffset), the next batch is read and sent. Otherwise the
// file has been fully sent:
//   - SLOW_LOG_READ_RUNNING with a non-empty file: truncate the file to 0;
//   - SLOW_LOG_READ_BEGINNIG / SLOW_LOG_READ_QUIT: close and remove the file;
//   - SLOW_LOG_READ_QUIT additionally marks the client closed and bumps quitCnt.
//
// A client that is already closed is skipped up front. Without that check a finished QUIT
// client would be handled again on each pass of the monitor thread: its removed file would be
// re-created and removed again, and quitCnt would be incremented more than once per client,
// breaking the quitCnt == hash-size exit test in monitorThreadFunc.
static void sendOneClient(SlowLogClient* pClient) {
  if (pClient->closed) {
    tscWarn("%s client is closed, skip send slow log, file:%s", __func__, pClient->path);
    return;
  }

  int32_t code = processFile(pClient);
  if (code < 0) {
    tscError("failed to get file size for file:%s, code:%d, errno:%d", pClient->path, code, errno);
    return;
  }

  if (pClient->size <= pClient->sendOffset) {
    if (pClient->type == SLOW_LOG_READ_RUNNING && pClient->size > 0) {
      if (taosFtruncateFile(pClient->pFile, 0) < 0) {
        tscError("failed to truncate file:%s code:%d", pClient->path, terrno);
      }
      tscDebug("monitor truncate file to 0 file:%s", pClient->path);
    } else if (pClient->type == SLOW_LOG_READ_BEGINNIG || pClient->type == SLOW_LOG_READ_QUIT) {
      if (taosCloseFile(&pClient->pFile) != 0) {
        tscError("failed to close file:%s ret:%d", pClient->path, terrno);
      }
      pClient->pFile = NULL;
      if (taosRemoveFile(pClient->path) != 0) {
        tscError("failed to remove file:%s, terrno:%d", pClient->path, terrno);
      }
      tscInfo("monitor remove file:%s when send data out at beginning", pClient->path);
    }

    pClient->sendOffset = 0;
    if (pClient->type == SLOW_LOG_READ_QUIT) {
      // Reached at most once per client thanks to the closed check at the top.
      pClient->closed = true;
      quitCnt++;
    }
    return;
  }

  code = monitorReadSend(pClient);
  tscDebug("%s monitor send slow log, file:%s code:%d", __func__, pClient->path, code);
}
595

596
static void monitorSendSlowLogAtRunningCb(int64_t clusterId) {
×
597
  int32_t code = 0;
×
598
  void*   tmp = taosHashGet(monitorSlowLogHash, &clusterId, LONG_BYTES);
×
599
  if (tmp == NULL) {
×
600
    tscError("failed to get slow log client by clusterId:0x%" PRIx64, clusterId);
×
601
    return;
×
602
  }
603
  SlowLogClient* pClient = (*(SlowLogClient**)tmp);
×
604
  if (pClient == NULL) {
×
605
    tscError("failed to get slow log client by clusterId:0x%" PRIx64, clusterId);
×
606
    return;
×
607
  }
608
  sendOneClient(pClient);
×
609
}
610

611
static void monitorSendSlowLogAtBeginningCb(const char* fileName) {
×
612
  int32_t code = 0;
×
613
  void*   tmp = taosHashGet(monitorSlowLogHashPath, fileName, strlen(fileName));
×
614
  if (tmp == NULL) {
×
615
    tscError("failed to get slow log client by fileName:%s", fileName);
×
616
    return;
×
617
  }
618
  SlowLogClient* pClient = (*(SlowLogClient**)tmp);
×
619
  if (pClient == NULL) {
×
620
    tscError("failed to get slow log client by fileName:%s", fileName);
×
621
    return;
×
622
  }
623
  sendOneClient(pClient);
×
624
}
625

626
static void monitorSendAllSlowLog() {
314,259,634✔
627
  int32_t code = 0;
314,259,634✔
628
  int64_t t = taosGetMonoTimestampMs();
314,259,634✔
629
  void*   pIter = NULL;
314,259,634✔
630
  while ((pIter = taosHashIterate(monitorSlowLogHash, pIter))) {
314,259,634✔
631
    int64_t*       clusterId = (int64_t*)taosHashGetKey(pIter, NULL);
×
632
    SAppInstInfo*  pInst = getAppInstByClusterId(*clusterId);
×
633
    SlowLogClient* pClient = (*(SlowLogClient**)pIter);
×
634
    if (pClient == NULL || pInst == NULL) {
×
635
      taosHashCancelIterate(monitorSlowLogHash, pIter);
×
636
      return;
×
637
    }
638
    if (t - pClient->lastSendTime > pInst->serverCfg.monitorParas.tsMonitorInterval * 1000 ||
×
639
        atomic_load_32(&monitorFlag) == 1) {
×
640
      pClient->lastSendTime = t;
×
641
    } else {
642
      continue;
×
643
    }
644

645
    if (atomic_load_32(&monitorFlag) == 1) {  // change type to quit
×
646
      pClient->type = SLOW_LOG_READ_QUIT;
×
647
    }
648
    tscDebug("monitor send slow log for clusterId:0x%" PRIx64 ", file:%s, type:%d", *clusterId, pClient->path, pClient->type);
×
649
    if (pClient->sendOffset > 0) {  // already in sending process
×
650
      continue;
×
651
    }
652
    sendOneClient(pClient);
×
653
  }
654
}
655

656
static void monitorSendAllSlowLogFromTempDir(int64_t clusterId) {
1,200,869✔
657
  SAppInstInfo* pInst = getAppInstByClusterId((int64_t)clusterId);
1,200,869✔
658

659
  if (pInst == NULL || !pInst->serverCfg.monitorParas.tsEnableMonitor) {
1,200,869✔
660
    tscInfo("monitor is disabled, skip send slow log");
1,199,688✔
661
    return;
1,199,688✔
662
  }
663
  char namePrefix[PATH_MAX] = {0};
1,181✔
664
  if (snprintf(namePrefix, sizeof(namePrefix), "%s%" PRIx64, TD_TMP_FILE_PREFIX, clusterId) < 0) {
1,181✔
665
    tscError("failed to generate slow log file name prefix");
×
666
    return;
×
667
  }
668

669
  char tmpPath[PATH_MAX] = {0};
1,181✔
670
  if (getSlowLogTmpDir(tmpPath, sizeof(tmpPath)) < 0) {
1,181✔
671
    return;
×
672
  }
673

674
  TdDirPtr pDir = taosOpenDir(tmpPath);
1,181✔
675
  if (pDir == NULL) {
1,181✔
676
    return;
×
677
  }
678

679
  TdDirEntryPtr de = NULL;
1,181✔
680
  while ((de = taosReadDir(pDir)) != NULL) {
3,543✔
681
    if (taosDirEntryIsDir(de)) {
2,362✔
682
      continue;
2,362✔
683
    }
684

685
    char* name = taosGetDirEntryName(de);
×
686
    if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0 || strstr(name, namePrefix) == NULL) {
×
687
      tscInfo("skip file:%s, for cluster id:%" PRIx64, name, clusterId);
×
688
      continue;
×
689
    }
690

691
    char filename[PATH_MAX] = {0};
×
692
    (void)snprintf(filename, sizeof(filename), "%s%s", tmpPath, name);
×
693
    int64_t fileSize = 0;
×
694
    if (taosStatFile(filename, &fileSize, NULL, NULL) < 0) {
×
695
      tscError("failed to get file:%s status, since %s", filename, terrstr());
×
696
      continue;
×
697
    }
698
    if (fileSize == 0) {
×
699
      if (taosRemoveFile(filename) != 0) {
×
700
        tscError("failed to remove file:%s, terrno:%d", filename, terrno);
×
701
      }
702
      continue;
×
703
    }
704

705
    tscInfo("%s monitor opening file:%s at beginning", __func__, filename);
×
706
    TdFilePtr pFile = taosOpenFile(filename, TD_FILE_READ | TD_FILE_WRITE);
×
707
    if (pFile == NULL) {
×
708
      tscError("failed to open file:%s since %s", filename, terrstr());
×
709
      continue;
×
710
    }
711
    if (taosLockFile(pFile) < 0) {
×
712
      tscInfo("failed to lock file:%s since %s, maybe used by other process", filename, terrstr());
×
713
      int32_t ret = taosCloseFile(&pFile);
×
714
      if (ret != 0) {
×
715
        tscError("failed to close file:%p ret:%d", pFile, ret);
×
716
      }
717
      continue;
×
718
    }
719

720
    SlowLogClient* pClient = taosMemoryCalloc(1, sizeof(SlowLogClient));
×
721
    if (pClient == NULL) {
×
722
      tscError("failed to allocate memory for slow log client");
×
723
      int32_t ret = taosCloseFile(&pFile);
×
724
      if (ret != 0) {
×
725
        tscError("failed to close file:%p ret:%d", pFile, ret);
×
726
      }
727
      return;
×
728
    }
729
    tstrncpy(pClient->path, filename, PATH_MAX);
×
730
    pClient->sendOffset = 0;
×
731
    pClient->pFile = pFile;
×
732
    pClient->clusterId = clusterId;
×
733
    pClient->type = SLOW_LOG_READ_BEGINNIG;
×
734
    if (taosHashPut(monitorSlowLogHashPath, filename, strlen(filename), &pClient, POINTER_BYTES) != 0) {
×
735
      tscError("failed to put clusterId:0x%" PRIx64 " to hash table", pClient->clusterId);
×
736
      int32_t ret = taosCloseFile(&pFile);
×
737
      if (ret != 0) {
×
738
        tscError("failed to close file:%s ret:%d", filename, ret);
×
739
      }
740
      taosMemoryFree(pClient);
×
741
      return;
×
742
    }
743
    sendOneClient(pClient);
×
744
  }
745

746
  int32_t ret = taosCloseDir(&pDir);
1,181✔
747
  if (ret != 0) {
1,181✔
748
    tscError("failed to close dir, ret:%d", ret);
×
749
  }
750
}
751

752
static void* monitorThreadFunc(void* param) {
1,203,824✔
753
  setThreadName("client-monitor-slowlog");
1,203,824✔
754
  tscInfo("monitor update thread started");
1,203,824✔
755
  while (1) {
314,259,634✔
756
    if (atomic_load_32(&monitorFlag) == 1) {
315,463,458✔
757
      if (taosGetMonoTimestampMs() - quitTime > 1000 ||
1,203,824✔
758
          quitCnt == taosHashGetSize(monitorSlowLogHash)) {  // quit at most 1000ms or no data need to send
1,203,824✔
759
        tscInfo("monitorThreadFunc quit since timeout or quitcnt:%d", quitCnt);
1,203,824✔
760
        break;
1,203,824✔
761
      }
762
    }
763

764
    (void)tsem2_timewait(&monitorSem, 100);
314,259,634✔
765
    monitorSendAllSlowLog();
314,259,634✔
766

767
    MonitorSlowLogData* slowLogData = NULL;
314,259,634✔
768
    taosReadQitem(monitorQueue, (void**)&slowLogData);
314,259,634✔
769
    if (slowLogData == NULL) {
314,259,634✔
770
      continue;
313,058,765✔
771
    }
772

773
    if (slowLogData->type == SLOW_LOG_READ_ALL) {
1,200,869✔
774
      monitorSendAllSlowLogFromTempDir(slowLogData->clusterId);
1,200,869✔
775
    } else if (slowLogData->type == SLOW_LOG_READ_BEGINNIG) {
×
776
      monitorSendSlowLogAtBeginningCb(slowLogData->fileName);
×
777
    } else if (slowLogData->type == SLOW_LOG_WRITE) {
×
778
      monitorWriteSlowLog2File(slowLogData, tmpSlowLogPath);
×
779
    } else if (slowLogData->type == SLOW_LOG_READ_RUNNING || slowLogData->type == SLOW_LOG_READ_QUIT) {
×
780
      monitorSendSlowLogAtRunningCb(slowLogData->clusterId);
×
781
    }
782
    monitorFreeSlowLogData(slowLogData);
1,200,869✔
783
    taosFreeQitem(slowLogData);
1,200,869✔
784
  }
785

786
  return NULL;
1,203,824✔
787
}
788

789
static int32_t tscMonitortInit() {
1,203,824✔
790
  TdThreadAttr thAttr;
1,192,859✔
791
  if (taosThreadAttrInit(&thAttr) != 0) {
1,203,824✔
792
    tscError("failed to init thread attr since %s", strerror(ERRNO));
×
793
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
794
  }
795
  if (taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE) != 0) {
1,203,824✔
796
    tscError("failed to set thread attr since %s", strerror(ERRNO));
×
797
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
798
  }
799

800
  if (taosThreadCreate(&monitorThread, &thAttr, monitorThreadFunc, NULL) != 0) {
1,203,824✔
801
    tscError("failed to create monitor thread since %s", strerror(ERRNO));
×
802
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
803
  }
804

805
  (void)taosThreadAttrDestroy(&thAttr);
1,203,824✔
806
  return 0;
1,203,824✔
807
}
808

809
static void tscMonitorStop() {
1,203,862✔
810
  if (taosCheckPthreadValid(monitorThread)) {
1,203,862✔
811
    (void)taosThreadJoin(monitorThread, NULL);
1,203,824✔
812
    taosThreadClear(&monitorThread);
1,203,824✔
813
  }
814
}
1,203,862✔
815

816
int32_t monitorInit() {
1,203,824✔
817
  int32_t code = 0;
1,203,824✔
818

819
  tscInfo("monitor init");
1,203,824✔
820
  monitorCounterHash =
1,203,824✔
821
      (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
1,203,824✔
822
  if (monitorCounterHash == NULL) {
1,203,824✔
823
    tscError("failed to create monitorCounterHash");
×
824
    return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
825
  }
826
  taosHashSetFreeFp(monitorCounterHash, destroyMonitorClient);
1,203,824✔
827

828
  monitorSlowLogHashPath =
1,203,824✔
829
      (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
1,203,824✔
830
  if (monitorSlowLogHashPath == NULL) {
1,203,824✔
831
    tscError("failed to create monitorSlowLogHashPath");
×
832
    return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
833
  }
834
  taosHashSetFreeFp(monitorSlowLogHashPath, destroySlowLogClient);
1,203,824✔
835

836
  monitorSlowLogHash =
1,203,824✔
837
      (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
1,203,824✔
838
  if (monitorSlowLogHash == NULL) {
1,203,824✔
839
    tscError("failed to create monitorSlowLogHash");
×
840
    return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
841
  }
842
  taosHashSetFreeFp(monitorSlowLogHash, destroySlowLogClient);
1,203,824✔
843

844
  monitorTimer = taosTmrInit(0, 0, 0, "MONITOR");
1,203,824✔
845
  if (monitorTimer == NULL) {
1,203,824✔
846
    tscError("failed to create monitor timer");
×
847
    return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
848
  }
849

850
  code = getSlowLogTmpDir(tmpSlowLogPath, sizeof(tmpSlowLogPath));
1,203,824✔
851
  if (code != 0) {
1,203,824✔
852
    return code;
×
853
  }
854

855
  code = taosMulModeMkDir(tmpSlowLogPath, 0777, true);
1,203,824✔
856
  if (code != 0) {
1,203,824✔
857
    tscError("failed to create dir:%s since %s", tmpSlowLogPath, terrstr());
×
858
    return code;
×
859
  }
860

861
  if (tsem2_init(&monitorSem, 0, 0) != 0) {
1,203,824✔
862
    tscError("sem init error since %s", terrstr());
×
863
    return TAOS_SYSTEM_ERROR(ERRNO);
×
864
  }
865

866
  code = taosOpenQueue(&monitorQueue);
1,203,824✔
867
  if (code) {
1,203,824✔
868
    tscError("open queue error since %s", terrstr());
×
869
    return TAOS_GET_TERRNO(code);
×
870
  }
871

872
  taosInitRWLatch(&monitorLock);
1,203,824✔
873
  taosInitRWLatch(&monitorQueueLock);
1,203,824✔
874
  return tscMonitortInit();
1,203,824✔
875
}
876

877
void monitorClose() {
1,203,862✔
878
  tscInfo("monitor close");
1,203,862✔
879
  taosWLockLatch(&monitorLock);
1,203,862✔
880
  atomic_store_32(&monitorFlag, 1);
1,203,862✔
881
  atomic_store_64(&quitTime, taosGetMonoTimestampMs());
1,203,862✔
882
  tscMonitorStop();
1,203,862✔
883
  sendAllCounter();
1,203,862✔
884
  taosHashCleanup(monitorCounterHash);
1,203,862✔
885
  taosHashCleanup(monitorSlowLogHash);
1,203,862✔
886
  taosTmrCleanUp(monitorTimer);
1,203,862✔
887
  taosWUnLockLatch(&monitorLock);
1,203,862✔
888

889
  taosWLockLatch(&monitorQueueLock);
1,203,862✔
890
  taosCloseQueue(monitorQueue);
1,203,862✔
891
  monitorQueue = NULL;
1,203,862✔
892
  if (tsem2_destroy(&monitorSem) != 0) {
1,203,862✔
893
    tscError("failed to destroy semaphore");
×
894
  }
895
  taosWUnLockLatch(&monitorQueueLock);
1,203,862✔
896
}
1,203,862✔
897

898
// Copies `data` into a freshly allocated queue item, appends it to monitorQueue and
// wakes the worker thread through monitorSem.
// Returns the allocation/queue error code on failure. Returns 0 on success and also
// when the queue has already been closed (monitorQueue == NULL), in which case the
// item is dropped.
// NOTE(review): when the item is dropped only the queue item is released here; whether
// buffers referenced by `data` need freeing by the caller is not visible in this function.
int32_t monitorPutData2MonitorQueue(MonitorSlowLogData data) {
  int32_t             code = 0;
  MonitorSlowLogData* slowLogData = NULL;

  code = taosAllocateQitem(sizeof(MonitorSlowLogData), DEF_QITEM, 0, (void**)&slowLogData);
  if (code) {
    tscError("monitor failed to allocate slow log data");
    return code;
  }
  *slowLogData = data;
  tscDebug("monitor write slow log to queue, clusterId:0x%" PRIx64 " type:%s, data:%s", slowLogData->clusterId,
           queueTypeStr[slowLogData->type], slowLogData->data == NULL ? "null" : slowLogData->data);

  taosWLockLatch(&monitorQueueLock);
  if (monitorQueue == NULL) {
    tscError("monitor queue is null");
    taosWUnLockLatch(&monitorQueueLock);
    taosFreeQitem(slowLogData);
    return 0;
  }
  code = taosWriteQitem(monitorQueue, slowLogData);
  // monitorClose() destroys monitorSem while holding monitorQueueLock, so the post must
  // happen before the latch is released; posting afterwards could touch a destroyed
  // semaphore.
  if (code == 0 && tsem2_post(&monitorSem) != 0) {
    tscError("failed to post semaphore");
  }
  taosWUnLockLatch(&monitorQueueLock);

  if (code != 0) {
    taosFreeQitem(slowLogData);  // the queue did not take the item
  }
  return code;
}
929

930
// Completion callback installed by senAuditInfo() for audit messages.
// Releases the buffers attached to the response (pMsg->pData and pMsg->pEpSet) and
// logs the result code. `param` is ignored. Always returns 0.
int32_t reportCB(void* param, SDataBuf* pMsg, int32_t code) {
  (void)param;

  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);

  tscDebug("[del report]delete reportCB code:%d", code);
  return 0;
}
936

937
// Sends an already serialized audit request (pReq, len bytes) to the mnode as a
// TDMT_MND_AUDIT message; the reply is handled by reportCB().
// Returns TSDB_CODE_SUCCESS when the message was handed to the transport, terrno when
// the send descriptor cannot be allocated, or the code from asyncSendMsgToServer().
// On a send failure only the descriptor is freed here; pReq stays with the caller.
// NOTE(review): verify that asyncSendMsgToServer() does not itself release the
// descriptor / payload on failure, otherwise this path frees them twice.
int32_t senAuditInfo(STscObj* pTscObj, void* pReq, int32_t len, uint64_t requestId) {
  SMsgSendInfo* pSend = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (NULL == pSend) {
    tscError("[del report] failed to allocate memory for sendInfo");
    return terrno;
  }

  pSend->msgInfo = (SDataBuf){.pData = pReq, .len = len, .handle = NULL};
  pSend->msgType = TDMT_MND_AUDIT;
  pSend->fp = reportCB;
  pSend->param = NULL;
  pSend->requestId = requestId;
  pSend->requestObjRefId = 0;

  SEpSet  mgmtEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
  int32_t code = asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &mgmtEpSet, NULL, pSend);
  if (0 == code) {
    return TSDB_CODE_SUCCESS;
  }

  tscError("[del report] failed to send msg to server, code:%d", code);
  taosMemoryFree(pSend);
  return code;
}
962

963
// Fills pReq->table and pReq->db from the target table of a DELETE statement.
// Returns TSDB_CODE_TSC_INVALID_OPERATION when the FROM node is not a real table,
// TSDB_CODE_SUCCESS otherwise.
static int32_t setDeleteStmtAuditReqTableInfo(SDeleteStmt* pStmt, SAuditReq* pReq) {
  SNode* pFrom = pStmt->pFromTable;

  if (QUERY_NODE_REAL_TABLE == nodeType(pFrom)) {
    STableNode* pTable = &((SRealTableNode*)pFrom)->table;
    TAOS_UNUSED(tsnprintf(pReq->table, TSDB_TABLE_NAME_LEN, "%s", pTable->tableName));
    TAOS_UNUSED(tsnprintf(pReq->db, TSDB_DB_FNAME_LEN, "%s", pTable->dbName));
    return TSDB_CODE_SUCCESS;
  }

  tscDebug("[report] invalid from table node type:%s", nodesNodeName(pFrom->type));
  return TSDB_CODE_TSC_INVALID_OPERATION;
}
973

974
// Fills pReq->table and pReq->db from the target table of a vnode-modify (insert)
// statement. Only the insert types INSERT, FILE_INSERT and STMT_INSERT are accepted;
// any other insertType yields TSDB_CODE_TSC_INVALID_OPERATION.
static int32_t setModifyStmtAuditReqTableInfo(SVnodeModifyOpStmt* pStmt, SAuditReq* pReq) {
  bool isAuditedInsert = (pStmt->insertType == TSDB_QUERY_TYPE_INSERT) ||
                         (pStmt->insertType == TSDB_QUERY_TYPE_FILE_INSERT) ||
                         (pStmt->insertType == TSDB_QUERY_TYPE_STMT_INSERT);
  if (!isAuditedInsert) {
    tscDebug("[report] invalid from table node type:%s", nodesNodeName(pStmt->sqlNodeType));
    return TSDB_CODE_TSC_INVALID_OPERATION;
  }

  TAOS_UNUSED(tsnprintf(pReq->table, TSDB_TABLE_NAME_LEN, "%s", pStmt->targetTableName.tname));
  TAOS_UNUSED(tsnprintf(pReq->db, TSDB_DB_FNAME_LEN, "%s", pStmt->targetTableName.dbname));
  return TSDB_CODE_SUCCESS;
}
985

986
typedef struct SAuditTableListInfo {
987
  SArray* dbList;
988
  SArray* tableList;
989
} SAuditTableListInfo;
990

991
// Lazily creates both arrays of `pInfo`. A non-NULL dbList means the structure is
// already initialised and the call is a no-op.
// Returns TSDB_CODE_SUCCESS, or the terrno set by the failing taosArrayInit().
// On failure both pointers are left NULL, so the structure can be safely destroyed
// or re-initialised afterwards.
static int32_t initAuditTableListInfo(SAuditTableListInfo* pInfo) {
  if (pInfo->dbList) return TSDB_CODE_SUCCESS;

  pInfo->dbList = taosArrayInit(4, TSDB_DB_FNAME_LEN);
  if (pInfo->dbList == NULL) {
    int32_t code = terrno;  // capture before any further call can overwrite it
    tscError("[report] failed to create db list array");
    return code;
  }

  pInfo->tableList = taosArrayInit(4, TSDB_TABLE_NAME_LEN);
  if (pInfo->tableList == NULL) {
    int32_t code = terrno;  // capture before the cleanup call below
    tscError("[report] failed to create table list array");
    taosArrayDestroy(pInfo->dbList);
    pInfo->dbList = NULL;  // do not leave a dangling pointer behind
    return code;
  }
  return TSDB_CODE_SUCCESS;
}
1008

1009
// Releases whichever of the two arrays of `pInfo` exist and resets the pointers to
// NULL, dbList first and tableList second.
static void destroyAuditTableListInfo(SAuditTableListInfo* pInfo) {
  SArray** slots[] = {&pInfo->dbList, &pInfo->tableList};

  for (int32_t i = 0; i < 2; ++i) {
    if (*slots[i] == NULL) {
      continue;
    }
    taosArrayDestroy(*slots[i]);
    *slots[i] = NULL;
  }
}
×
1019

1020
// Copies the FIRST collected database name and the FIRST collected table name into
// pReq->db / pReq->table. A list that is empty leaves the corresponding field untouched.
// Both arrays of pTbListInfo must be non-NULL.
static void copyTableInfoToAuditReq(SAuditTableListInfo* pTbListInfo, SAuditReq* pReq) {
  SArray* pDbNames = pTbListInfo->dbList;
  SArray* pTableNames = pTbListInfo->tableList;

  if (pDbNames->size > 0) {
    TAOS_UNUSED(tsnprintf(pReq->db, TSDB_DB_FNAME_LEN, "%s", (char*)taosArrayGet(pDbNames, 0)));
  }

  if (pTableNames->size > 0) {
    TAOS_UNUSED(tsnprintf(pReq->table, TSDB_TABLE_NAME_LEN, "%s", (char*)taosArrayGet(pTableNames, 0)));
  }
}
×
1030

1031
static int32_t doSetSelectStmtAuditReqTableInfo(SNode* pFromTable, SAuditReq* pReq, SAuditTableListInfo* pTbListInfo);
1032
// Walks the operands of a set operator and appends the tables they reference to
// pTbListInfo (SELECT operands go through doSetSelectStmtAuditReqTableInfo(), nested
// set operators recurse).
// Returns the code of the last operand that was processed.
// NOTE(review): the two sides are handled asymmetrically, which looks unintended:
//   - a left SELECT operand's result is overwritten by the right operand's result;
//   - a left nested set operator returns immediately, so the right operand is never
//     visited in that case.
// This is preserved as-is, because changing it alters which statements get audited.
static int32_t doSetOperatorTableInfo(SSetOperator* pSetOperator, SAuditTableListInfo* pTbListInfo) {
  int32_t code = TSDB_CODE_SUCCESS;

  if (pSetOperator->pLeft) {
    ENodeType leftType = nodeType(pSetOperator->pLeft);
    if (QUERY_NODE_SELECT_STMT == leftType) {
      SSelectStmt* pSelect = (SSelectStmt*)pSetOperator->pLeft;
      code = doSetSelectStmtAuditReqTableInfo(pSelect->pFromTable, NULL, pTbListInfo);
    } else if (QUERY_NODE_SET_OPERATOR == leftType) {
      code = doSetOperatorTableInfo((SSetOperator*)pSetOperator->pLeft, pTbListInfo);
      TAOS_RETURN(code);
    }
  }

  if (pSetOperator->pRight) {
    ENodeType rightType = nodeType(pSetOperator->pRight);
    if (QUERY_NODE_SELECT_STMT == rightType) {
      SSelectStmt* pSelect = (SSelectStmt*)pSetOperator->pRight;
      code = doSetSelectStmtAuditReqTableInfo(pSelect->pFromTable, NULL, pTbListInfo);
      TAOS_RETURN(code);
    } else if (QUERY_NODE_SET_OPERATOR == rightType) {
      code = doSetOperatorTableInfo((SSetOperator*)pSetOperator->pRight, pTbListInfo);
      TAOS_RETURN(code);
    }
  }

  return code;
}
1057

1058
// Resolves the table(s) behind a FROM node and records them for auditing.
//   - real / virtual table: the table itself is recorded;
//   - temp table (subquery): a SELECT subquery is followed recursively, a set-operator
//     subquery switches to list mode and is walked by doSetOperatorTableInfo();
//   - join: switches to list mode and records both sides.
// "Recording" means: when pTbListInfo has been initialised (list mode) the db and table
// names are appended to its arrays; otherwise, if pReq is non-NULL, they are written
// straight into pReq->table / pReq->db.
// Returns TSDB_CODE_TSC_INVALID_OPERATION for a NULL pFromTable, the first error from a
// helper otherwise, TSDB_CODE_SUCCESS when nothing failed (including node types that
// are not handled at all).
static int32_t doSetSelectStmtAuditReqTableInfo(SNode* pFromTable, SAuditReq* pReq, SAuditTableListInfo* pTbListInfo) {
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  STableNode* pTable = NULL;

  if (NULL == pFromTable) return TSDB_CODE_TSC_INVALID_OPERATION;

  switch (nodeType(pFromTable)) {
    case QUERY_NODE_REAL_TABLE:
      pTable = &((SRealTableNode*)pFromTable)->table;
      break;

    case QUERY_NODE_VIRTUAL_TABLE:
      pTable = &((SVirtualTableNode*)pFromTable)->table;
      break;

    case QUERY_NODE_TEMP_TABLE: {
      SNode* pSubquery = ((STempTableNode*)pFromTable)->pSubquery;
      if (NULL == pSubquery) {
        break;
      }
      if (QUERY_NODE_SELECT_STMT == nodeType(pSubquery)) {
        // plain sub-select: its FROM clause decides, keep the caller's pReq
        return doSetSelectStmtAuditReqTableInfo(((SSelectStmt*)pSubquery)->pFromTable, pReq, pTbListInfo);
      }
      if (QUERY_NODE_SET_OPERATOR == nodeType(pSubquery)) {
        code = initAuditTableListInfo(pTbListInfo);
        TAOS_CHECK_GOTO(code, &lino, _exit);

        code = doSetOperatorTableInfo((SSetOperator*)pSubquery, pTbListInfo);
        TAOS_CHECK_GOTO(code, &lino, _exit);
      }
      break;
    }

    case QUERY_NODE_JOIN_TABLE: {
      SJoinTableNode* pJoin = (SJoinTableNode*)pFromTable;

      code = initAuditTableListInfo(pTbListInfo);
      TAOS_CHECK_GOTO(code, &lino, _exit);

      code = doSetSelectStmtAuditReqTableInfo(pJoin->pLeft, NULL, pTbListInfo);
      TAOS_CHECK_GOTO(code, &lino, _exit);
      code = doSetSelectStmtAuditReqTableInfo(pJoin->pRight, NULL, pTbListInfo);
      TAOS_CHECK_GOTO(code, &lino, _exit);
      break;
    }

    default:
      break;
  }

  if (pTable != NULL) {
    if (pTbListInfo->dbList != NULL) {
      // list mode: remember this table for the caller to pick from later
      void* tmp = taosArrayPush(pTbListInfo->dbList, pTable->dbName);
      TSDB_CHECK_NULL(tmp, code, lino, _exit, terrno);
      tmp = taosArrayPush(pTbListInfo->tableList, pTable->tableName);
      TSDB_CHECK_NULL(tmp, code, lino, _exit, terrno);
    } else if (pReq != NULL) {
      TAOS_UNUSED(tsnprintf(pReq->table, TSDB_TABLE_NAME_LEN, "%s", pTable->tableName));
      TAOS_UNUSED(tsnprintf(pReq->db, TSDB_DB_FNAME_LEN, "%s", pTable->dbName));
    }
  }

_exit:
  if (code != TSDB_CODE_SUCCESS) {
    tscError("[report] failed to set select stmt audit req table info, code:%d, lino:%d", code, lino);
  }
  return code;
}
1112

1113
// Fills pReq->table / pReq->db for a SELECT statement.
// A plain FROM table is written directly by doSetSelectStmtAuditReqTableInfo(); for
// joins and set-operator subqueries the referenced tables are collected in a temporary
// list and the first entry is copied into pReq.
// Returns the code from doSetSelectStmtAuditReqTableInfo().
static int32_t setSelectStmtAuditReqTableInfo(SSelectStmt* pStmt, SAuditReq* pReq) {
  SAuditTableListInfo tableListInfo = {0};

  int32_t code = doSetSelectStmtAuditReqTableInfo(pStmt->pFromTable, pReq, &tableListInfo);

  // Release the temporary lists on failure as well; previously they leaked whenever
  // the walk failed after the lists had been created.
  // Both pointers are required: if initAuditTableListInfo() failed half-way, tableList
  // is NULL and dbList has already been released there, so it must not be freed again.
  if (tableListInfo.dbList != NULL && tableListInfo.tableList != NULL) {
    if (code == TSDB_CODE_SUCCESS) {
      copyTableInfoToAuditReq(&tableListInfo, pReq);
    }
    destroyAuditTableListInfo(&tableListInfo);
  }
  return code;
}
1124

1125
// Fills pReq->table / pReq->db for a top-level set-operator statement: collects the
// referenced tables into a temporary list, copies the first entry into pReq on success
// and always releases the list.
// Returns the first error from list creation or from the operand walk.
static int32_t setOperatorTableInfo(SSetOperator* pSetOperator, SAuditReq* pReq) {
  SAuditTableListInfo tableListInfo = {0};

  int32_t code = initAuditTableListInfo(&tableListInfo);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  code = doSetOperatorTableInfo(pSetOperator, &tableListInfo);
  if (TSDB_CODE_SUCCESS == code) {
    copyTableInfoToAuditReq(&tableListInfo, pReq);
  }

  destroyAuditTableListInfo(&tableListInfo);
  return code;
}
1139

1140
// Dispatches on the root statement type to the helper that fills pReq->table / pReq->db.
// Returns the helper's result. For any other statement type an error is logged and
// TSDB_CODE_SUCCESS is returned with pReq left untouched.
static int32_t setAuditReqTableInfo(SRequestObj* pRequest, ENodeType type, SAuditReq* pReq) {
  switch (type) {
    case QUERY_NODE_DELETE_STMT:
      return setDeleteStmtAuditReqTableInfo((SDeleteStmt*)pRequest->pQuery->pRoot, pReq);
    case QUERY_NODE_VNODE_MODIFY_STMT:
      return setModifyStmtAuditReqTableInfo((SVnodeModifyOpStmt*)pRequest->pQuery->pRoot, pReq);
    case QUERY_NODE_SELECT_STMT:
      return setSelectStmtAuditReqTableInfo((SSelectStmt*)pRequest->pQuery->pRoot, pReq);
    case QUERY_NODE_SET_OPERATOR:
      return setOperatorTableInfo((SSetOperator*)pRequest->pQuery->pRoot, pReq);
    default:
      break;
  }

  tscError("[report]unsupported report type: %s", nodesNodeName(type));
  return TSDB_CODE_SUCCESS;
}
1156

1157
// Sets pReq->affectedRows: the result row count for DELETE / vnode-modify statements,
// 0 for SELECT and set-operator statements. Other types leave the field untouched.
static void setAuditReqAffectedRows(SRequestObj* pRequest, ENodeType type, SAuditReq* pReq) {
  switch (type) {
    case QUERY_NODE_DELETE_STMT:
    case QUERY_NODE_VNODE_MODIFY_STMT:
      pReq->affectedRows = pRequest->body.resInfo.numOfRows;
      break;
    case QUERY_NODE_SELECT_STMT:
    case QUERY_NODE_SET_OPERATOR:
      pReq->affectedRows = 0;
      break;
    default:
      break;
  }
}
360✔
1164

1165
// Writes the audit operation name for the statement type into pReq->operation:
// "delete", "insert" or "select". Set-operator statements are reported as "select",
// matching needSendReport() (which gates them with enableAuditSelect) and
// setAuditReqAffectedRows() (which groups them with SELECT). Previously they were not
// handled here, so the field was never written for them.
// Any other type yields an empty string. `pRequest` is not used.
static void setAuditReqOperation(SRequestObj* pRequest, ENodeType type, SAuditReq* pReq) {
  const char* opName = "";

  switch (type) {
    case QUERY_NODE_DELETE_STMT:
      opName = "delete";
      break;
    case QUERY_NODE_VNODE_MODIFY_STMT:
      opName = "insert";
      break;
    case QUERY_NODE_SELECT_STMT:
    case QUERY_NODE_SET_OPERATOR:
      opName = "select";
      break;
    default:
      break;
  }

  TAOS_UNUSED(tsnprintf(pReq->operation, AUDIT_OPERATION_LEN, "%s", opName));
}
300✔
1174

1175
// Decides whether a statement of the given type must be audited under the server
// configuration `pCfg`.
// Nothing is audited while auditLevel is below AUDIT_LEVEL_DATA. Above that, SELECT and
// set-operator statements follow enableAuditSelect, DELETE follows enableAuditDelete
// and vnode-modify follows enableAuditInsert. Any other type logs an error and is not
// audited.
static bool needSendReport(SAppInstServerCFG* pCfg, ENodeType type) {
  if (pCfg->auditLevel < AUDIT_LEVEL_DATA) {
    return false;
  }

  switch (type) {
    case QUERY_NODE_SELECT_STMT:
    case QUERY_NODE_SET_OPERATOR:
      return pCfg->enableAuditSelect != 0;
    case QUERY_NODE_DELETE_STMT:
      return pCfg->enableAuditDelete != 0;
    case QUERY_NODE_VNODE_MODIFY_STMT:
      return pCfg->enableAuditInsert != 0;
    default:
      break;
  }

  tscError("[report] unsupported report type: %s", nodesNodeName(type));
  return false;
}
1192

1193
// Builds an audit record for a finished request and sends it to the server.
// Nothing is sent when the connection objects are missing, the request failed,
// auditing is disabled for this statement type (needSendReport()), or the statement
// has no auditable table (TSDB_CODE_TSC_INVALID_OPERATION from setAuditReqTableInfo()).
// The record carries the SQL text, affected rows, db/table, duration in seconds and
// the operation name. All failures are logged and otherwise ignored.
static void reportSqlExecResult(SRequestObj* pRequest, ENodeType type) {
  int32_t  code = TSDB_CODE_SUCCESS;
  STscObj* pTscObj = pRequest->pTscObj;

  if (pTscObj == NULL || pTscObj->pAppInfo == NULL) {
    tscError("[report][%s] invalid tsc obj", nodesNodeName(type));
    return;
  }
  if (pRequest->code != TSDB_CODE_SUCCESS) {
    tscDebug("[report][%s] request result code:%d, skip audit", nodesNodeName(type), pRequest->code);
    return;
  }

  if (!needSendReport(&pTscObj->pAppInfo->serverCfg, type)) {
    tscTrace("[report][%s] audit is disabled", nodesNodeName(type));
    return;
  }

  // Zero-initialise so that any field a helper does not fill is serialized as
  // empty/zero instead of stack garbage.
  SAuditReq req = {0};
  req.pSql = pRequest->sqlstr;
  req.sqlLen = pRequest->sqlLen;
  setAuditReqAffectedRows(pRequest, type, &req);
  code = setAuditReqTableInfo(pRequest, type, &req);
  if (code == TSDB_CODE_TSC_INVALID_OPERATION) {
    return;  // statement has no auditable table; silently skipped
  } else if (code != TSDB_CODE_SUCCESS) {
    tscError("[report][%s] failed to set audit req table info, code:%d", nodesNodeName(type), code);
    return;
  }
  int64_t duration = taosGetTimestampUs() - pRequest->metric.start;
  req.duration = duration / 1000000.0;  // convert to seconds
  setAuditReqOperation(pRequest, type, &req);

  // First pass computes the encoded size; reject an error/empty size before allocating.
  int32_t tlen = tSerializeSAuditReq(NULL, 0, &req);
  if (tlen <= 0) {
    tscError("[report][%s] failed to get serialized req size, ret:%d", nodesNodeName(type), tlen);
    return;
  }
  void* pReq = taosMemoryCalloc(1, tlen);
  if (pReq == NULL) {
    tscError("[report][%s] failed to allocate memory for req", nodesNodeName(type));
    return;
  }

  if (tSerializeSAuditReq(pReq, tlen, &req) < 0) {
    tscError("[report][%s] failed to serialize req", nodesNodeName(type));
    taosMemoryFree(pReq);
    return;
  }

  // On success the buffer is handed to the send path and must not be freed here.
  code = senAuditInfo(pRequest->pTscObj, pReq, tlen, pRequest->requestId);
  if (code != 0) {
    tscError("[report][%s] failed to send audit info, code:%d", nodesNodeName(type), code);
    taosMemoryFree(pReq);
    return;
  }
  tscDebug("[report][%s] data, sql:%s", nodesNodeName(type), req.pSql);
}
1247

1248
// Public entry point: reports a finished request to the audit facility when its root
// statement is a DELETE, SELECT, vnode-modify (insert) or set-operator statement.
// Requests without a parsed query / root node are ignored (debug-logged only).
void clientOperateReport(SRequestObj* pRequest) {
  if (pRequest == NULL || pRequest->pQuery == NULL || pRequest->pQuery->pRoot == NULL) {
    tscDebug("[report] invalid request");
    return;
  }
  ENodeType type = nodeType(pRequest->pQuery->pRoot);
  // The last operand used to be the bare constant QUERY_NODE_SET_OPERATOR, which made
  // the whole condition true for every statement type (assuming the enumerator is
  // non-zero) and pushed unsupported statements into the report path.
  if (QUERY_NODE_DELETE_STMT == type || QUERY_NODE_SELECT_STMT == type || QUERY_NODE_VNODE_MODIFY_STMT == type ||
      QUERY_NODE_SET_OPERATOR == type) {
    reportSqlExecResult(pRequest, type);
  }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc