• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3660

15 Mar 2025 09:06AM UTC coverage: 62.039% (-1.3%) from 63.314%
#3660

push

travis-ci

web-flow
feat(stream): support stream processing for virtual tables (#30144)

* enh: add client processing

* enh: add mnode vtables processing

* enh: add mnode vtable processing

* enh: add normal child vtable support

* fix: compile issues

* fix: compile issues

* fix: create stream issues

* fix: multi stream scan issue

* fix: remove debug info

* fix: agg task and task level issues

* fix: correct task output type

* fix: split vtablescan from agg

* fix: memory leak issues

* fix: add limitations

* Update 09-error-code.md

* Update 09-error-code.md

* fix: remove useless case

* feat(stream): extract original table data in source scan task

Implemented functionality in the source task to extract data
corresponding to the virtual table from the original table using WAL.
The extracted data is then sent to the downstream merge task for further
processing.

* feat(stream): multi-way merge using loser tree in virtual merge task

Implemented multi-way merge in the merge task using a loser tree to
combine data from multiple original tables into a single virtual table.
The merged virtual table data is then pushed downstream for further
processing.  Introduced memory limit handling during the merge process
with configurable behavior when the memory limit is reached.

* fix(test): remove useless cases

---------

Co-authored-by: dapan1121 <wpan@taosdata.com>
Co-authored-by: Pan Wei <72057773+dapan1121@users.noreply.github.com>

154078 of 317582 branches covered (48.52%)

Branch coverage included in aggregate %.

313 of 2391 new or added lines in 34 files covered. (13.09%)

26134 existing lines in 205 files now uncovered.

240261 of 318051 relevant lines covered (75.54%)

16655189.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

51.76
/source/client/src/clientMonitor.c
1
#include "clientMonitor.h"
2
#include "cJSON.h"
3
#include "clientInt.h"
4
#include "clientLog.h"
5
#include "tmisce.h"
6
#include "tqueue.h"
7
#include "ttime.h"
8
#include "ttimer.h"
9

10
SRWLatch    monitorLock;
11
void*       monitorTimer;
12
SHashObj*   monitorCounterHash;
13
int32_t     monitorFlag = 0;
14
int32_t     quitCnt = 0;
15
tsem2_t     monitorSem;
16
STaosQueue* monitorQueue;
17
SHashObj*   monitorSlowLogHash;
18
char        tmpSlowLogPath[PATH_MAX] = {0};
19
TdThread    monitorThread;
20
extern bool tsEnableAuditDelete;
21

22
static int32_t getSlowLogTmpDir(char* tmpPath, int32_t size) {
17,109✔
23
  int ret = tsnprintf(tmpPath, size, "%s/tdengine_slow_log/", tsTempDir);
17,109✔
24
  if (ret < 0) {
17,109!
25
    tscError("failed to get tmp path ret:%d", ret);
×
26
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
27
  }
28
  return 0;
17,109✔
29
}
30

31
/**
 * Final cleanup of a slow-log file: truncate it, unlock it, close it and
 * remove it from disk.
 *
 * pFile is received by value, so the caller's copy of the handle is not
 * updated here. A failed truncate or unlock therefore no longer aborts the
 * cleanup: the handle is still closed so that it is not leaked. The file is
 * removed from disk only when truncate, unlock and close all succeeded, which
 * matches the previous behaviour for the success path.
 *
 * @param pFile open file handle, may be NULL (nothing is done in that case)
 * @param path  path of the file, used for logging and removal
 */
static void processFileInTheEnd(TdFilePtr pFile, char* path) {
  if (pFile == NULL) {
    return;
  }

  bool allOk = true;
  if (taosFtruncateFile(pFile, 0) != 0) {
    tscError("failed to truncate file:%s, terrno:%d", path, terrno);
    allOk = false;
  }
  if (taosUnLockFile(pFile) != 0) {
    tscError("failed to unlock file:%s, terrno:%d", path, terrno);
    allOk = false;
  }
  // Always release the handle, even if an earlier step failed.
  if (taosCloseFile(&(pFile)) != 0) {
    tscError("failed to close file:%s, terrno:%d", path, terrno);
    return;
  }
  if (!allOk) {
    return;
  }
  if (taosRemoveFile(path) != 0) {
    tscError("failed to remove file:%s, terrno:%d", path, terrno);
    return;
  }
}
52

53
static void destroySlowLogClient(void* data) {
13✔
54
  if (data == NULL) {
13!
55
    return;
×
56
  }
57
  SlowLogClient* slowLogClient = *(SlowLogClient**)data;
13✔
58
  processFileInTheEnd(slowLogClient->pFile, slowLogClient->path);
13✔
59
  taosMemoryFree(slowLogClient);
13!
60
}
61

62
static void destroyMonitorClient(void* data) {
15,666✔
63
  if (data == NULL) {
15,666!
64
    return;
×
65
  }
66
  MonitorClient* pMonitor = *(MonitorClient**)data;
15,666✔
67
  if (pMonitor == NULL) {
15,666!
68
    return;
×
69
  }
70
  if (!taosTmrStopA(&pMonitor->timer)) {
15,666✔
71
    tscError("failed to stop timer, pMonitor:%p", pMonitor);
531!
72
  }
73
  taosHashCleanup(pMonitor->counters);
15,666✔
74
  int ret = taos_collector_registry_destroy(pMonitor->registry);
15,666✔
75
  if (ret) {
15,666!
76
    tscError("failed to destroy registry, pMonitor:%p ret:%d", pMonitor, ret);
×
77
  }
78
  taosMemoryFree(pMonitor);
15,666!
79
}
80

81
static void monitorFreeSlowLogData(void* paras) {
16,553✔
82
  MonitorSlowLogData* pData = (MonitorSlowLogData*)paras;
16,553✔
83
  if (pData == NULL) {
16,553✔
84
    return;
393✔
85
  }
86
  taosMemoryFreeClear(pData->data);
16,160!
87
  if (pData->type == SLOW_LOG_READ_BEGINNIG) {
16,160✔
88
    taosMemoryFree(pData->fileName);
16,059!
89
  }
90
}
91

92
/**
 * Free a heap-allocated MonitorSlowLogData: first its buffers (via
 * monitorFreeSlowLogData), then the structure itself.
 *
 * @param paras MonitorSlowLogData* allocated on the heap, may be NULL
 */
static void monitorFreeSlowLogDataEx(void* paras) {
  void* pRecord = paras;
  monitorFreeSlowLogData(pRecord);  // inner buffers
  taosMemoryFree(pRecord);          // the record itself
}
898✔
96

97
static SAppInstInfo* getAppInstByClusterId(int64_t clusterId) {
47,801✔
98
  void* p = taosHashGet(appInfo.pInstMapByClusterId, &clusterId, LONG_BYTES);
47,801✔
99
  if (p == NULL) {
47,800✔
100
    tscError("failed to get app inst, clusterId:0x%" PRIx64, clusterId);
13!
101
    return NULL;
13✔
102
  }
103
  return *(SAppInstInfo**)p;
47,787✔
104
}
105

106
/**
 * Completion callback installed by sendReport for monitor messages.
 *
 * Frees the response buffers in pMsg. When param is non-NULL it is treated as
 * the MonitorSlowLogData that was sent: a follow-up record (same cluster,
 * type, file name, file and offset, but no data) is pushed onto the monitor
 * queue. If queuing succeeds, fileName is detached from the sent record
 * (presumably so that it is not freed together with param -- confirm against
 * the paramFreeFp contract); otherwise the file handle is closed.
 *
 * @param param MonitorSlowLogData* passed to sendReport, or NULL
 * @param pMsg  response message, may be NULL
 * @param code  result code of the send
 * @return always TSDB_CODE_SUCCESS
 */
static int32_t monitorReportAsyncCB(void* param, SDataBuf* pMsg, int32_t code) {
  if (code != TSDB_CODE_SUCCESS) {
    tscError("found error in monitorReport send callback, code:%d, please check the network.", code);
  }
  if (pMsg != NULL) {
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
  }
  if (param == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  MonitorSlowLogData* pSent = (MonitorSlowLogData*)param;
  if (code != 0) {
    tscError("failed to send slow log:%s, clusterId:0x%" PRIx64, pSent->data, pSent->clusterId);
  }

  MonitorSlowLogData next = {0};
  next.clusterId = pSent->clusterId;
  next.type = pSent->type;
  next.fileName = pSent->fileName;
  next.pFile = pSent->pFile;
  next.offset = pSent->offset;
  next.data = NULL;

  if (monitorPutData2MonitorQueue(next) != 0) {
    if (taosCloseFile(&(pSent->pFile)) != 0) {
      tscError("failed to close file:%p", pSent->pFile);
    }
  } else {
    pSent->fileName = NULL;
  }
  return TSDB_CODE_SUCCESS;
}
135

136
/**
 * Serialize a monitor report and send it asynchronously to the server.
 *
 * @param pTransporter transport handle passed to asyncSendMsgToServer
 * @param epSet        endpoint set of the target
 * @param pCont        NUL-terminated report content; it is serialized into a
 *                     separate buffer here
 * @param type         kind of report
 * @param param        optional MonitorSlowLogData* handed to the completion
 *                     callback (monitorReportAsyncCB); may be NULL, as it is
 *                     for counter reports
 * @return result of asyncSendMsgToServer, or an error code when the request
 *         could not be built. On that failure path param (when non-NULL) has
 *         its file closed and is freed with monitorFreeSlowLogDataEx.
 */
static int32_t sendReport(void* pTransporter, SEpSet* epSet, char* pCont, MONITOR_TYPE type, void* param) {
  SStatisReq sStatisReq;
  sStatisReq.pCont = pCont;
  sStatisReq.contLen = strlen(pCont);
  sStatisReq.type = type;

  // First pass computes the serialized length only.
  int tlen = tSerializeSStatisReq(NULL, 0, &sStatisReq);
  if (tlen < 0) {
    goto FAILED;
  }
  void* buf = taosMemoryMalloc(tlen);
  if (buf == NULL) {
    tscError("sendReport failed, out of memory, len:%d", tlen);
    goto FAILED;
  }
  tlen = tSerializeSStatisReq(buf, tlen, &sStatisReq);
  if (tlen < 0) {
    taosMemoryFree(buf);
    goto FAILED;
  }

  SMsgSendInfo* pInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (pInfo == NULL) {
    tscError("sendReport failed, out of memory send info");
    taosMemoryFree(buf);
    goto FAILED;
  }
  pInfo->fp = monitorReportAsyncCB;
  pInfo->msgInfo.pData = buf;
  pInfo->msgInfo.len = tlen;
  pInfo->msgType = TDMT_MND_STATIS;
  pInfo->param = param;
  pInfo->paramFreeFp = monitorFreeSlowLogDataEx;
  pInfo->requestId = tGenIdPI64();
  pInfo->requestObjRefId = 0;

  return asyncSendMsgToServer(pTransporter, epSet, NULL, pInfo);

FAILED:
  // param is NULL for counter reports; only touch it when a slow-log record
  // was actually supplied.
  if (param != NULL) {
    MonitorSlowLogData* pSlowLog = (MonitorSlowLogData*)param;
    if (taosCloseFile(&(pSlowLog->pFile)) != 0) {
      tscError("failed to close file:%p", pSlowLog->pFile);
    }
    monitorFreeSlowLogDataEx(param);
  }
  return TAOS_GET_TERRNO(TSDB_CODE_TSC_INTERNAL_ERROR);
}
182

183
/**
 * Build the counter report for one collector registry and send it.
 *
 * The report text is produced by taos_collector_registry_bridge_new with the
 * current millisecond timestamp. A non-empty report is sent with
 * MONITOR_TYPE_COUNTER; if sending returns 0 the registry batch is cleared.
 * The report text is freed before returning.
 *
 * @param registry     collector registry to report
 * @param pTransporter transport handle used for sending
 * @param epSet        endpoint set of the target
 */
static void generateClusterReport(taos_collector_registry_t* registry, void* pTransporter, SEpSet* epSet) {
  char tsBuf[50] = {0};
  (void)snprintf(tsBuf, sizeof(tsBuf), "%" PRId64, taosGetTimestamp(TSDB_TIME_PRECISION_MILLI));

  char* pReport = (char*)taos_collector_registry_bridge_new(registry, tsBuf, "%" PRId64, NULL);
  if (pReport == NULL) {
    tscError("generateClusterReport failed, get null content.");
    return;
  }

  // Empty reports are not sent; the registry is only cleared after a
  // successful send.
  if (pReport[0] != '\0' && sendReport(pTransporter, epSet, pReport, MONITOR_TYPE_COUNTER, NULL) == 0) {
    int code = taos_collector_registry_clear_batch(registry);
    if (code != 0) {
      tscError("failed to clear registry, ret:%d", code);
    }
  }
  taosMemoryFreeClear(pReport);
}
200

201
static void reportSendProcess(void* param, void* tmrId) {
1,573✔
202
  taosRLockLatch(&monitorLock);
1,573✔
203
  if (atomic_load_32(&monitorFlag) == 1) {
1,573✔
204
    taosRUnLockLatch(&monitorLock);
8✔
205
    return;
8✔
206
  }
207

208
  MonitorClient* pMonitor = (MonitorClient*)param;
1,565✔
209
  SAppInstInfo*  pInst = getAppInstByClusterId(pMonitor->clusterId);
1,565✔
210
  if (pInst == NULL) {
1,565!
211
    taosRUnLockLatch(&monitorLock);
×
212
    return;
×
213
  }
214

215
  SEpSet ep = getEpSet_s(&pInst->mgmtEp);
1,565✔
216
  generateClusterReport(pMonitor->registry, pInst->pTransporter, &ep);
1,565✔
217
  bool reset =
218
      taosTmrReset(reportSendProcess, pInst->serverCfg.monitorParas.tsMonitorInterval * 1000, param, monitorTimer, &tmrId);
1,565✔
219
  tscDebug("reset timer, pMonitor:%p, %d", pMonitor, reset);
1,565✔
220
  taosRUnLockLatch(&monitorLock);
1,565✔
221
}
222

223
static void sendAllCounter() {
15,719✔
224
  MonitorClient** ppMonitor = NULL;
15,719✔
225
  while ((ppMonitor = taosHashIterate(monitorSlowLogHash, ppMonitor))) {
15,719✔
226
    MonitorClient* pMonitor = *ppMonitor;
13✔
227
    if (pMonitor == NULL) {
13!
228
      continue;
×
229
    }
230
    SAppInstInfo* pInst = getAppInstByClusterId(pMonitor->clusterId);
13✔
231
    if (pInst == NULL) {
13!
232
      taosHashCancelIterate(monitorCounterHash, ppMonitor);
13✔
233
      break;
13✔
234
    }
235
    SEpSet ep = getEpSet_s(&pInst->mgmtEp);
×
236
    generateClusterReport(pMonitor->registry, pInst->pTransporter, &ep);
×
237
  }
238
}
15,719✔
239

240
void monitorCreateClient(int64_t clusterId) {
31,332✔
241
  MonitorClient* pMonitor = NULL;
31,332✔
242
  taosWLockLatch(&monitorLock);
31,332✔
243
  if (taosHashGet(monitorCounterHash, &clusterId, LONG_BYTES) == NULL) {
31,332✔
244
    tscInfo("clusterId:0x%" PRIx64 ", create monitor", clusterId);
15,666!
245
    pMonitor = taosMemoryCalloc(1, sizeof(MonitorClient));
15,666!
246
    if (pMonitor == NULL) {
15,666!
247
      tscError("failed to create monitor client");
×
248
      goto fail;
×
249
    }
250
    pMonitor->clusterId = clusterId;
15,666✔
251
    char clusterKey[32] = {0};
15,666✔
252
    if (snprintf(clusterKey, sizeof(clusterKey), "%" PRId64, clusterId) < 0) {
15,666!
253
      tscError("failed to create cluster key");
×
254
      goto fail;
×
255
    }
256
    pMonitor->registry = taos_collector_registry_new(clusterKey);
15,666✔
257
    if (pMonitor->registry == NULL) {
15,666!
258
      tscError("failed to create registry");
×
259
      goto fail;
×
260
    }
261
    pMonitor->colector = taos_collector_new(clusterKey);
15,666✔
262
    if (pMonitor->colector == NULL) {
15,666!
263
      tscError("failed to create collector");
×
264
      goto fail;
×
265
    }
266

267
    int r = taos_collector_registry_register_collector(pMonitor->registry, pMonitor->colector);
15,666✔
268
    if (r) {
15,666!
269
      tscError("failed to register collector, ret:%d", r);
×
270
      goto fail;
×
271
    }
272
    pMonitor->counters =
31,332✔
273
        (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
15,666✔
274
    if (pMonitor->counters == NULL) {
15,666!
275
      tscError("failed to create monitor counters");
×
276
      goto fail;
×
277
    }
278

279
    if (taosHashPut(monitorCounterHash, &clusterId, LONG_BYTES, &pMonitor, POINTER_BYTES) != 0) {
15,666!
280
      tscError("failed to put monitor client to hash");
×
281
      goto fail;
×
282
    }
283

284
    SAppInstInfo* pInst = getAppInstByClusterId(clusterId);
15,666✔
285
    if (pInst == NULL) {
15,666!
286
      tscError("failed to get app instance by cluster id");
×
287
      pMonitor = NULL;
×
288
      goto fail;
×
289
    }
290
    pMonitor->timer =
31,332✔
291
        taosTmrStart(reportSendProcess, pInst->serverCfg.monitorParas.tsMonitorInterval * 1000, (void*)pMonitor, monitorTimer);
15,666✔
292
    if (pMonitor->timer == NULL) {
15,666!
293
      tscError("failed to start timer");
×
294
      goto fail;
×
295
    }
296
    tscInfo("clusterId:0x%" PRIx64 ", create monitor finished, monitor:%p", clusterId, pMonitor);
15,666!
297
  }
298
  taosWUnLockLatch(&monitorLock);
31,332✔
299

300
  return;
31,332✔
301

302
fail:
×
303
  destroyMonitorClient(&pMonitor);
×
304
  taosWUnLockLatch(&monitorLock);
×
305
}
306

307
void monitorCreateClientCounter(int64_t clusterId, const char* name, const char* help, size_t label_key_count,
31,332✔
308
                                const char** label_keys) {
309
  taosWLockLatch(&monitorLock);
31,332✔
310
  MonitorClient** ppMonitor = (MonitorClient**)taosHashGet(monitorCounterHash, &clusterId, LONG_BYTES);
31,332✔
311
  if (ppMonitor == NULL || *ppMonitor == NULL) {
31,332!
312
    tscError("failed to get monitor client");
×
313
    goto end;
×
314
  }
315
  taos_counter_t* newCounter = taos_counter_new(name, help, label_key_count, label_keys);
31,332✔
316
  if (newCounter == NULL) return;
31,332!
317
  MonitorClient* pMonitor = *ppMonitor;
31,332✔
318
  if (taos_collector_add_metric(pMonitor->colector, newCounter) != 0) {
31,332!
319
    tscError("failed to add metric to collector");
×
320
    int r = taos_counter_destroy(newCounter);
×
321
    if (r) {
×
322
      tscError("failed to destroy counter, code:%d", r);
×
323
    }
324
    goto end;
×
325
  }
326
  if (taosHashPut(pMonitor->counters, name, strlen(name), &newCounter, POINTER_BYTES) != 0) {
31,332!
327
    tscError("failed to put counter to monitor");
×
328
    int r = taos_counter_destroy(newCounter);
×
329
    if (r) {
×
330
      tscError("failed to destroy counter, code:%d", r);
×
331
    }
332
    goto end;
×
333
  }
334
  tscInfo("clusterId:0x%" PRIx64 ", monitor:%p, create counter:%s %p", pMonitor->clusterId, pMonitor, name,
31,332!
335
          newCounter);
336

337
end:
×
338
  taosWUnLockLatch(&monitorLock);
31,332✔
339
}
340

341
void monitorCounterInc(int64_t clusterId, const char* counterName, const char** label_values) {
8,527,342✔
342
  taosWLockLatch(&monitorLock);
8,527,342✔
343
  if (atomic_load_32(&monitorFlag) == 1) {
8,549,414!
344
    taosWUnLockLatch(&monitorLock);
×
345
    return;
×
346
  }
347

348
  MonitorClient** ppMonitor = (MonitorClient**)taosHashGet(monitorCounterHash, &clusterId, LONG_BYTES);
8,549,414✔
349
  if (ppMonitor == NULL || *ppMonitor == NULL) {
8,549,414!
350
    tscError("clusterId:0x%" PRIx64 ", monitor not found", clusterId);
×
351
    goto end;
×
352
  }
353

354
  MonitorClient*   pMonitor = *ppMonitor;
8,549,414✔
355
  taos_counter_t** ppCounter = (taos_counter_t**)taosHashGet(pMonitor->counters, counterName, strlen(counterName));
8,549,414✔
356
  if (ppCounter == NULL || *ppCounter == NULL) {
8,549,414!
357
    tscError("clusterId:0x%" PRIx64 ", monitor:%p counter:%s not found", clusterId, pMonitor, counterName);
×
358
    goto end;
×
359
  }
360
  if (taos_counter_inc(*ppCounter, label_values) != 0) {
8,549,414!
361
    tscError("clusterId:0x%" PRIx64 ", monitor:%p counter:%s inc failed", clusterId, pMonitor, counterName);
×
362
    goto end;
×
363
  }
364
  tscDebug("clusterId:0x%" PRIx64 ", monitor:%p, counter:%s inc", pMonitor->clusterId, pMonitor, counterName);
8,549,414✔
365

366
end:
8,084,862✔
367
  taosWUnLockLatch(&monitorLock);
8,549,414✔
368
}
369

370
/**
 * Map a SQL result code to its display string.
 *
 * @param code result code used as index into {"Success", "Failed", "Cancel"}
 * @return the matching string, or "Unknown" when code is outside the table
 *         (previously an out-of-range code read past the end of the array).
 */
const char* monitorResultStr(SQL_RESULT_CODE code) {
  static const char* result_state[] = {"Success", "Failed", "Cancel"};
  // The cast to size_t also rejects negative values.
  if ((size_t)code >= sizeof(result_state) / sizeof(result_state[0])) {
    return "Unknown";
  }
  return result_state[code];
}
374

375
/**
 * Append one slow-log record (including its terminating NUL) to the
 * per-cluster slow-log file.
 *
 * When the cluster has no SlowLogClient in monitorSlowLogHash yet, a new
 * temp file is created under tmpPath, a client is allocated and stored in
 * the hash, and the file is locked.
 *
 * @param slowLogData record to write; data must be a NUL-terminated string
 * @param tmpPath     directory passed to taosGetTmpfilePath for new files
 */
static void monitorWriteSlowLog2File(MonitorSlowLogData* slowLogData, char* tmpPath) {
  TdFilePtr pLogFile = NULL;
  void*     pEntry = taosHashGet(monitorSlowLogHash, &slowLogData->clusterId, LONG_BYTES);

  if (pEntry != NULL) {
    pLogFile = (*(SlowLogClient**)pEntry)->pFile;
  } else {
    char filePath[PATH_MAX] = {0};
    char clusterHex[32] = {0};
    if (snprintf(clusterHex, sizeof(clusterHex), "%" PRIx64, slowLogData->clusterId) < 0) {
      tscError("failed to generate clusterId:0x%" PRIx64, slowLogData->clusterId);
      return;
    }
    taosGetTmpfilePath(tmpPath, clusterHex, filePath);
    tscInfo("monitor create slow log file:%s", filePath);

    pLogFile = taosOpenFile(filePath, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_READ | TD_FILE_TRUNC);
    if (pLogFile == NULL) {
      tscError("failed to open file:%s since %d", filePath, terrno);
      return;
    }

    SlowLogClient* pNewClient = taosMemoryCalloc(1, sizeof(SlowLogClient));
    if (pNewClient == NULL) {
      tscError("failed to allocate memory for slow log client");
      int32_t closeCode = taosCloseFile(&pLogFile);
      if (closeCode != 0) {
        tscError("failed to close file:%p ret:%d", pLogFile, closeCode);
      }
      return;
    }
    pNewClient->lastCheckTime = taosGetMonoTimestampMs();
    tstrncpy(pNewClient->path, filePath, PATH_MAX);
    pNewClient->offset = 0;
    pNewClient->pFile = pLogFile;

    if (taosHashPut(monitorSlowLogHash, &slowLogData->clusterId, LONG_BYTES, &pNewClient, POINTER_BYTES) != 0) {
      tscError("failed to put clusterId:0x%" PRIx64 " to hash table", slowLogData->clusterId);
      int32_t closeCode = taosCloseFile(&pLogFile);
      if (closeCode != 0) {
        tscError("failed to close file:%p ret:%d", pLogFile, closeCode);
      }
      taosMemoryFree(pNewClient);
      return;
    }

    if (taosLockFile(pLogFile) < 0) {
      tscError("failed to lock file:%p since %s", pLogFile, terrstr());
      return;
    }
  }

  // Records are always appended at the current end of the file.
  if (taosLSeekFile(pLogFile, 0, SEEK_END) < 0) {
    tscError("failed to seek file:%p code:%d", pLogFile, terrno);
    return;
  }
  // strlen + 1: the NUL terminator is written too and separates records.
  if (taosWriteFile(pLogFile, slowLogData->data, strlen(slowLogData->data) + 1) < 0) {
    tscError("failed to write len to file:%p since %s", pLogFile, terrstr());
  }
  tscDebug("monitor write slow log to file:%p, clusterId:0x%" PRIx64, pLogFile, slowLogData->clusterId);
}
433

434
/**
 * Read NUL-separated slow-log records from pFile starting at *offset and
 * return them joined as "[rec1,rec2,...]".
 *
 * At most SLOW_LOG_SEND_SIZE_MAX bytes are read. Only records that lie
 * completely inside the bytes read are included; *offset is advanced past
 * each included record.
 *
 * @param pFile  file to read from
 * @param offset in/out: position of the first unread byte
 * @param size   current file size; must be greater than *offset
 * @return heap buffer with the joined records (caller frees), or NULL on
 *         seek/allocation/read failure or when size <= *offset.
 */
static char* readFile(TdFilePtr pFile, int64_t* offset, int64_t size) {
  tscDebug("monitor readFile slow begin pFile:%p, offset:%" PRId64 ", size:%" PRId64, pFile, *offset, size);
  if (taosLSeekFile(pFile, *offset, SEEK_SET) < 0) {
    tscError("failed to seek file:%p code:%d", pFile, terrno);
    return NULL;
  }

  if (size <= *offset) {
    tscError("invalid size:%" PRId64 ", offset:%" PRId64, size, *offset);
    terrno = TSDB_CODE_TSC_INTERNAL_ERROR;
    return NULL;
  }

  // 4 extra bytes are reserved for the surrounding brackets and terminator.
  int64_t allocSize =
      (size - *offset >= SLOW_LOG_SEND_SIZE_MAX) ? 4 + SLOW_LOG_SEND_SIZE_MAX : 4 + (size - *offset);

  char* pResult = taosMemoryCalloc(1, allocSize);  // 4 reserved for []
  if (pResult == NULL) {
    tscError("failed to allocate memory for slow log, size:%" PRId64, allocSize);
    return NULL;
  }

  char* cursor = pResult;
  (void)strncat(cursor, "[", allocSize - 1);
  cursor++;

  int64_t readSize = taosReadFile(pFile, cursor, allocSize - 4);  // 4 reserved for []
  if (readSize <= 0) {
    if (readSize < 0) {
      tscError("failed to read len from file:%p since %s", pFile, terrstr());
    }
    taosMemoryFree(pResult);
    return NULL;
  }

  int64_t consumed = 0;
  for (;;) {
    size_t recLen = strlen(cursor);
    consumed += (recLen + 1);
    if (consumed > readSize || recLen == 0) {
      // Close the list over the previous separator and cut off any partial
      // record.
      *(cursor - 1) = ']';
      *cursor = '\0';
      break;
    }
    cursor[recLen] = ',';  // replace '\0' with ','
    cursor += (recLen + 1);
    *offset += (recLen + 1);
  }

  tscDebug("monitor readFile slow log end, data:%s, offset:%" PRId64, pResult, *offset);
  return pResult;
}
487

488
static int64_t getFileSize(char* path) {
554✔
489
  int64_t fileSize = 0;
554✔
490
  if (taosStatFile(path, &fileSize, NULL, NULL) < 0) {
554!
491
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
492
  }
493

494
  return fileSize;
554✔
495
}
496

497
/**
 * Wrap slow-log data in a heap-allocated MonitorSlowLogData and send it with
 * MONITOR_TYPE_SLOW_LOG.
 *
 * When data is NULL or the wrapper cannot be allocated, pFile is closed and
 * fileName (and data) are freed here.
 *
 * @param clusterId    cluster the data belongs to
 * @param data         heap buffer with the report text
 * @param pFile        file handle stored in the wrapper, may be NULL
 * @param offset       file offset stored in the wrapper
 * @param type         queue type stored in the wrapper
 * @param fileName     heap string stored in the wrapper, may be NULL
 * @param pTransporter transport handle used for sending
 * @param epSet        endpoint set of the target
 * @return TSDB_CODE_INVALID_PARA for NULL data, terrno on allocation
 *         failure, otherwise the result of sendReport.
 */
static int32_t sendSlowLog(int64_t clusterId, char* data, TdFilePtr pFile, int64_t offset, SLOW_LOG_QUEUE_TYPE type,
                           char* fileName, void* pTransporter, SEpSet* epSet) {
  if (data == NULL) {
    if (taosCloseFile(&pFile) != 0) {
      tscError("failed to close file:%p", pFile);
    }
    taosMemoryFree(fileName);
    return TSDB_CODE_INVALID_PARA;
  }

  MonitorSlowLogData* pRecord = taosMemoryMalloc(sizeof(MonitorSlowLogData));
  if (pRecord == NULL) {
    if (taosCloseFile(&pFile) != 0) {
      tscError("failed to close file:%p", pFile);
    }
    taosMemoryFree(data);
    taosMemoryFree(fileName);
    return terrno;
  }

  pRecord->clusterId = clusterId;
  pRecord->type = type;
  pRecord->fileName = fileName;
  pRecord->pFile = pFile;
  pRecord->offset = offset;
  pRecord->data = data;
  return sendReport(pTransporter, epSet, data, MONITOR_TYPE_SLOW_LOG, pRecord);
}
523

524
static int32_t monitorReadSend(int64_t clusterId, TdFilePtr pFile, int64_t* offset, int64_t size,
506✔
525
                               SLOW_LOG_QUEUE_TYPE type, char* fileName) {
526
  SAppInstInfo* pInst = getAppInstByClusterId(clusterId);
506✔
527
  if (pInst == NULL) {
506!
528
    tscError("failed to get app instance by clusterId:0x%" PRIx64, clusterId);
×
529
    if (taosCloseFile(&pFile) != 0) {
×
530
      tscError("failed to close file:%p", pFile);
×
531
    }
532
    taosMemoryFree(fileName);
×
533
    return terrno;
×
534
  }
535
  SEpSet ep = getEpSet_s(&pInst->mgmtEp);
506✔
536
  char*  data = readFile(pFile, offset, size);
506✔
537
  if (data == NULL) return terrno;
506!
538
  return sendSlowLog(clusterId, data, (type == SLOW_LOG_READ_BEGINNIG ? pFile : NULL), *offset, type, fileName,
506✔
539
                     pInst->pTransporter, &ep);
540
}
541

542
/**
 * Process a leftover slow-log file found at start-up.
 *
 * If the file has no data beyond offset, it is finalized with
 * processFileInTheEnd. Otherwise its content is read and sent through
 * monitorReadSend, and *fileName is set to NULL afterwards so the caller no
 * longer refers to it.
 *
 * @param clusterId cluster the file belongs to
 * @param fileName  address of the heap-allocated file path; NULL is ignored
 * @param pFile     open handle of the file
 * @param offset    position from which data is still unsent
 */
static void monitorSendSlowLogAtBeginning(int64_t clusterId, char** fileName, TdFilePtr pFile, int64_t offset) {
  if (fileName == NULL) {
    return;
  }

  int64_t fileSize = getFileSize(*fileName);
  if (fileSize > offset) {
    int32_t rc = monitorReadSend(clusterId, pFile, &offset, fileSize, SLOW_LOG_READ_BEGINNIG, *fileName);
    if (rc != 0) {
      tscError("monitor send slow log failed, clusterId:0x%" PRIx64 ", ret:%d", clusterId,
               rc);
    } else {
      tscDebug("monitor send slow log succ, clusterId:0x%" PRIx64, clusterId);
    }
    *fileName = NULL;
    return;
  }

  processFileInTheEnd(pFile, *fileName);
  tscDebug("monitor delete file:%s", *fileName);
}
561

562
static void monitorSendSlowLogAtRunning(int64_t clusterId) {
26✔
563
  void* tmp = taosHashGet(monitorSlowLogHash, &clusterId, LONG_BYTES);
26✔
564
  if (tmp == NULL) {
26!
565
    tscError("failed to get slow log client by clusterId:0x%" PRIx64, clusterId);
×
566
    return;
×
567
  }
568
  SlowLogClient* pClient = (*(SlowLogClient**)tmp);
26✔
569
  if (pClient == NULL) {
26!
570
    tscError("failed to get slow log client by clusterId:0x%" PRIx64, clusterId);
×
571
    return;
×
572
  }
573
  int64_t size = getFileSize(pClient->path);
26✔
574
  if (size <= pClient->offset) {
26!
575
    if (taosFtruncateFile(pClient->pFile, 0) < 0) {
26!
576
      tscError("failed to truncate file:%p code:%d", pClient->pFile, terrno);
×
577
    }
578
    tscDebug("monitor truncate file to 0 file:%p", pClient->pFile);
26!
579
    pClient->offset = 0;
26✔
580
  } else {
UNCOV
581
    int32_t code = monitorReadSend(clusterId, pClient->pFile, &pClient->offset, size, SLOW_LOG_READ_RUNNING, NULL);
×
UNCOV
582
    tscDebug("monitor send slow log clusterId:0x%" PRIx64 ", ret:%d", clusterId, code);
×
583
  }
584
}
585

586
static bool monitorSendSlowLogAtQuit(int64_t clusterId) {
×
587
  void* tmp = taosHashGet(monitorSlowLogHash, &clusterId, LONG_BYTES);
×
588
  if (tmp == NULL) {
×
589
    return true;
×
590
  }
591
  SlowLogClient* pClient = (*(SlowLogClient**)tmp);
×
592
  if (pClient == NULL) {
×
593
    return true;
×
594
  }
595
  int64_t size = getFileSize(pClient->path);
×
596
  if (size <= pClient->offset) {
×
597
    processFileInTheEnd(pClient->pFile, pClient->path);
×
598
    pClient->pFile = NULL;
×
599
    tscInfo("monitor remove file:%s", pClient->path);
×
600
    if ((--quitCnt) == 0) {
×
601
      return true;
×
602
    }
603
  } else {
604
    int32_t code = monitorReadSend(clusterId, pClient->pFile, &pClient->offset, size, SLOW_LOG_READ_QUIT, NULL);
×
605
    tscDebug("monitor send slow log clusterId:0x%" PRIx64 ", ret:%d", clusterId, code);
×
606
  }
607
  return false;
×
608
}
609
static void monitorSendAllSlowLogAtQuit() {
15,718✔
610
  void* pIter = NULL;
15,718✔
611
  while ((pIter = taosHashIterate(monitorSlowLogHash, pIter))) {
15,731✔
612
    SlowLogClient* pClient = (*(SlowLogClient**)pIter);
13✔
613
    if (pClient == NULL) {
13!
614
      continue;
×
615
    }
616
    int64_t size = getFileSize(pClient->path);
13✔
617
    if (size <= pClient->offset) {
13✔
618
      processFileInTheEnd(pClient->pFile, pClient->path);
6✔
619
      pClient->pFile = NULL;
6✔
620
    } else if (pClient->offset == 0) {
7!
621
      int64_t* clusterId = (int64_t*)taosHashGetKey(pIter, NULL);
7✔
622
      int32_t  code = monitorReadSend(*clusterId, pClient->pFile, &pClient->offset, size, SLOW_LOG_READ_QUIT, NULL);
7✔
623
      tscDebug("monitor send slow log clusterId:0x%" PRIx64 ", ret:%d", *clusterId, code);
7!
624
      if (code == 0) {
7!
625
        quitCnt++;
7✔
626
      }
627
    }
628
  }
629
}
15,718✔
630

631
/**
 * Re-create a slow-log file: unlock and close the client's current handle,
 * then open the same path again (create/truncate) and store the new handle
 * in the client.
 *
 * NOTE(review): the re-opened file is not locked here, unlike a file created
 * in monitorWriteSlowLog2File -- confirm whether that is intended.
 *
 * @param pClient slow-log client whose file is re-created
 */
static void processFileRemoved(SlowLogClient* pClient) {
  if (taosUnLockFile(pClient->pFile) != 0) {
    tscError("failed to unlock file:%s since %d", pClient->path, terrno);
    return;
  }

  int32_t closeCode = taosCloseFile(&(pClient->pFile));
  if (closeCode != 0) {
    tscError("failed to close file:%p ret:%d", pClient->pFile, closeCode);
    return;
  }

  TdFilePtr pReopened =
      taosOpenFile(pClient->path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_READ | TD_FILE_TRUNC);
  if (pReopened != NULL) {
    pClient->pFile = pReopened;
    return;
  }
  tscError("failed to open file:%s since %d", pClient->path, terrno);
}
650

651
static void monitorSendAllSlowLog() {
731,097✔
652
  int64_t t = taosGetMonoTimestampMs();
731,097✔
653
  void*   pIter = NULL;
731,097✔
654
  while ((pIter = taosHashIterate(monitorSlowLogHash, pIter))) {
745,560✔
655
    int64_t*       clusterId = (int64_t*)taosHashGetKey(pIter, NULL);
14,463✔
656
    SAppInstInfo*  pInst = getAppInstByClusterId(*clusterId);
14,463✔
657
    SlowLogClient* pClient = (*(SlowLogClient**)pIter);
14,463✔
658
    if (pClient == NULL || pInst == NULL) {
14,463!
659
      taosHashCancelIterate(monitorSlowLogHash, pIter);
×
660
      return;
×
661
    }
662
    if (t - pClient->lastCheckTime > pInst->serverCfg.monitorParas.tsMonitorInterval * 1000) {
14,463✔
663
      pClient->lastCheckTime = t;
41✔
664
    } else {
665
      continue;
14,422✔
666
    }
667

668
    if (pClient->offset == 0) {
41!
669
      int64_t size = getFileSize(pClient->path);
41✔
670
      if (size <= 0) {
41✔
671
        if (size < 0) {
14!
672
          tscError("monitor failed to get file size:%s, err:%d", pClient->path, ERRNO);
×
673
          if (ERRNO == ENOENT) {
×
674
            processFileRemoved(pClient);
×
675
          }
676
        }
677
        continue;
14✔
678
      }
679
      int32_t code = monitorReadSend(*clusterId, pClient->pFile, &pClient->offset, size, SLOW_LOG_READ_RUNNING, NULL);
27✔
680
      tscDebug("monitor send slow log clusterId:0x%" PRIx64 ", ret:%d", *clusterId, code);
27!
681
    }
682
  }
683
}
684

685
static void monitorSendAllSlowLogFromTempDir(int64_t clusterId) {
15,588✔
686
  SAppInstInfo* pInst = getAppInstByClusterId((int64_t)clusterId);
15,588✔
687

688
  if (pInst == NULL || !pInst->serverCfg.monitorParas.tsEnableMonitor) {
15,588!
689
    tscInfo("monitor is disabled, skip send slow log");
14,197!
690
    return;
14,197✔
691
  }
692
  char namePrefix[PATH_MAX] = {0};
1,391✔
693
  if (snprintf(namePrefix, sizeof(namePrefix), "%s%" PRIx64, TD_TMP_FILE_PREFIX, clusterId) < 0) {
1,391!
694
    tscError("failed to generate slow log file name prefix");
×
695
    return;
×
696
  }
697

698
  char tmpPath[PATH_MAX] = {0};
1,391✔
699
  if (getSlowLogTmpDir(tmpPath, sizeof(tmpPath)) < 0) {
1,391!
700
    return;
×
701
  }
702

703
  TdDirPtr pDir = taosOpenDir(tmpPath);
1,391✔
704
  if (pDir == NULL) {
1,391!
705
    return;
×
706
  }
707

708
  TdDirEntryPtr de = NULL;
1,391✔
709
  while ((de = taosReadDir(pDir)) != NULL) {
1,400,456✔
710
    if (taosDirEntryIsDir(de)) {
1,399,065✔
711
      continue;
1,398,591✔
712
    }
713

714
    char* name = taosGetDirEntryName(de);
1,396,283✔
715
    if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0 || strstr(name, namePrefix) == NULL) {
1,396,283!
716
      tscInfo("skip file:%s, for cluster id:%" PRIx64, name, clusterId);
1,387,962!
717
      continue;
1,387,962✔
718
    }
719

720
    char filename[PATH_MAX] = {0};
8,321✔
721
    (void)snprintf(filename, sizeof(filename), "%s%s", tmpPath, name);
8,321✔
722
    TdFilePtr pFile = taosOpenFile(filename, TD_FILE_READ | TD_FILE_WRITE);
8,321✔
723
    if (pFile == NULL) {
8,321!
724
      tscError("failed to open file:%s since %s", filename, terrstr());
×
725
      continue;
×
726
    }
727
    if (taosLockFile(pFile) < 0) {
8,321✔
728
      tscInfo("failed to lock file:%s since %s, maybe used by other process", filename, terrstr());
7,847!
729
      int32_t ret = taosCloseFile(&pFile);
7,847✔
730
      if (ret != 0) {
7,847!
731
        tscError("failed to close file:%p ret:%d", pFile, ret);
×
732
      }
733
      continue;
7,847✔
734
    }
735
    char* tmp = taosStrdup(filename);
474!
736
    if (tmp == NULL) {
474!
737
      tscError("failed to dup string:%s since %s", filename, terrstr());
×
738
      if (taosUnLockFile(pFile) != 0) {
×
739
        tscError("failed to unlock file:%s, terrno:%d", filename, terrno);
×
740
      }
741
      if (taosCloseFile(&(pFile)) != 0) {
×
742
        tscError("failed to close file:%s, terrno:%d", filename, terrno);
×
743
      }
744
      continue;
×
745
    }
746
    monitorSendSlowLogAtBeginning(clusterId, &tmp, pFile, 0);
474✔
747
    taosMemoryFree(tmp);
474!
748
  }
749

750
  int32_t ret = taosCloseDir(&pDir);
1,391✔
751
  if (ret != 0) {
1,391!
752
    tscError("failed to close dir, ret:%d", ret);
×
753
  }
754
}
755

756
static void* monitorThreadFunc(void* param) {
15,718✔
757
  setThreadName("client-monitor-slowlog");
15,718✔
758
  tscInfo("monitor update thread started");
15,718!
759
  int64_t quitTime = 0;
15,718✔
760
  while (1) {
731,135✔
761
    if (atomic_load_32(&monitorFlag) == 1) {
746,853✔
762
      if (quitCnt == 0) {
15,756✔
763
        monitorSendAllSlowLogAtQuit();
15,718✔
764
        if (quitCnt == 0) {
15,718✔
765
          tscInfo("monitorThreadFunc quit since no slow log to send");
15,711!
766
          break;
15,711✔
767
        }
768
        quitTime = taosGetMonoTimestampMs();
7✔
769
      }
770
      if (taosGetMonoTimestampMs() - quitTime > 500) {  // quit at most 500ms
45✔
771
        tscInfo("monitorThreadFunc quit since timeout");
7!
772
        break;
7✔
773
      }
774
    }
775

776
    MonitorSlowLogData* slowLogData = NULL;
731,135✔
777
    taosReadQitem(monitorQueue, (void**)&slowLogData);
731,135✔
778
    if (slowLogData != NULL) {
731,135✔
779
      if (slowLogData->type == SLOW_LOG_READ_BEGINNIG && quitCnt == 0) {
15,655!
780
        if (slowLogData->pFile != NULL) {
15,588!
UNCOV
781
          monitorSendSlowLogAtBeginning(slowLogData->clusterId, &(slowLogData->fileName), slowLogData->pFile,
×
UNCOV
782
                                        slowLogData->offset);
×
783
        } else {
784
          monitorSendAllSlowLogFromTempDir(slowLogData->clusterId);
15,588✔
785
        }
786
      } else if (slowLogData->type == SLOW_LOG_WRITE) {
67✔
787
        monitorWriteSlowLog2File(slowLogData, tmpSlowLogPath);
41✔
788
      } else if (slowLogData->type == SLOW_LOG_READ_RUNNING) {
26!
789
        monitorSendSlowLogAtRunning(slowLogData->clusterId);
26✔
790
      } else if (slowLogData->type == SLOW_LOG_READ_QUIT) {
×
791
        if (monitorSendSlowLogAtQuit(slowLogData->clusterId)) {
×
792
          tscInfo("monitorThreadFunc quit since all slow log sended");
×
793
          monitorFreeSlowLogData(slowLogData);
×
794
          taosFreeQitem(slowLogData);
×
795
          break;
×
796
        }
797
      }
798
      monitorFreeSlowLogData(slowLogData);
15,655✔
799
      taosFreeQitem(slowLogData);
15,655✔
800
    }
801

802
    if (quitCnt == 0) {
731,135✔
803
      monitorSendAllSlowLog();
731,097✔
804
    }
805
    (void)tsem2_timewait(&monitorSem, 100);
731,135✔
806
  }
807
  return NULL;
15,718✔
808
}
809

810
static int32_t tscMonitortInit() {
15,718✔
811
  TdThreadAttr thAttr;
812
  if (taosThreadAttrInit(&thAttr) != 0) {
15,718!
813
    tscError("failed to init thread attr since %s", strerror(ERRNO));
×
814
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
815
  }
816
  if (taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE) != 0) {
15,718!
817
    tscError("failed to set thread attr since %s", strerror(ERRNO));
×
818
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
819
  }
820

821
  if (taosThreadCreate(&monitorThread, &thAttr, monitorThreadFunc, NULL) != 0) {
15,718!
822
    tscError("failed to create monitor thread since %s", strerror(ERRNO));
×
823
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
824
  }
825

826
  (void)taosThreadAttrDestroy(&thAttr);
15,718✔
827
  return 0;
15,718✔
828
}
829

830
static void tscMonitorStop() {
15,719✔
831
  if (taosCheckPthreadValid(monitorThread)) {
15,719✔
832
    (void)taosThreadJoin(monitorThread, NULL);
15,718✔
833
    taosThreadClear(&monitorThread);
15,718✔
834
  }
835
}
15,719✔
836

837
int32_t monitorInit() {
15,718✔
838
  int32_t code = 0;
15,718✔
839

840
  tscInfo("monitor init");
15,718!
841
  monitorCounterHash =
15,718✔
842
      (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
15,718✔
843
  if (monitorCounterHash == NULL) {
15,718!
844
    tscError("failed to create monitorCounterHash");
×
845
    return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
846
  }
847
  taosHashSetFreeFp(monitorCounterHash, destroyMonitorClient);
15,718✔
848

849
  monitorSlowLogHash =
15,718✔
850
      (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
15,718✔
851
  if (monitorSlowLogHash == NULL) {
15,718!
852
    tscError("failed to create monitorSlowLogHash");
×
853
    return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
854
  }
855
  taosHashSetFreeFp(monitorSlowLogHash, destroySlowLogClient);
15,718✔
856

857
  monitorTimer = taosTmrInit(0, 0, 0, "MONITOR");
15,718✔
858
  if (monitorTimer == NULL) {
15,718!
859
    tscError("failed to create monitor timer");
×
860
    return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
861
  }
862

863
  code = getSlowLogTmpDir(tmpSlowLogPath, sizeof(tmpSlowLogPath));
15,718✔
864
  if (code != 0) {
15,718!
865
    return code;
×
866
  }
867

868
  code = taosMulModeMkDir(tmpSlowLogPath, 0777, true);
15,718✔
869
  if (code != 0) {
15,718!
870
    tscError("failed to create dir:%s since %s", tmpSlowLogPath, terrstr());
×
871
    return code;
×
872
  }
873

874
  if (tsem2_init(&monitorSem, 0, 0) != 0) {
15,718!
875
    tscError("sem init error since %s", terrstr());
×
876
    return TAOS_SYSTEM_ERROR(ERRNO);
×
877
  }
878

879
  code = taosOpenQueue(&monitorQueue);
15,718✔
880
  if (code) {
15,718!
881
    tscError("open queue error since %s", terrstr());
×
882
    return TAOS_GET_TERRNO(code);
×
883
  }
884

885
  taosInitRWLatch(&monitorLock);
15,718✔
886
  return tscMonitortInit();
15,718✔
887
}
888

889
void monitorClose() {
15,719✔
890
  tscInfo("monitor close");
15,719!
891
  taosWLockLatch(&monitorLock);
15,719✔
892
  atomic_store_32(&monitorFlag, 1);
15,719✔
893
  tscMonitorStop();
15,719✔
894
  sendAllCounter();
15,719✔
895
  taosHashCleanup(monitorCounterHash);
15,719✔
896
  taosHashCleanup(monitorSlowLogHash);
15,719✔
897
  taosTmrCleanUp(monitorTimer);
15,719✔
898
  taosCloseQueue(monitorQueue);
15,719✔
899
  if (tsem2_destroy(&monitorSem) != 0) {
15,719!
900
    tscError("failed to destroy semaphore");
×
901
  }
902
  taosWUnLockLatch(&monitorLock);
15,719✔
903
}
15,719✔
904

905
/*
 * Copy `data` into a freshly allocated queue item and hand it to the monitor
 * thread through monitorQueue, posting monitorSem to wake it up.
 *
 * Returns -1 when monitorFlag is already 1 (module shutting down), the
 * allocator's error code when the queue item cannot be allocated, and 0
 * otherwise.
 *
 * Note that 0 is also returned when writing to the queue fails: in that
 * case the item's file is closed and the item is freed here before
 * returning.
 */
int32_t monitorPutData2MonitorQueue(MonitorSlowLogData data) {
  if (atomic_load_32(&monitorFlag) == 1) {
    tscError("monitor slow log thread is exiting");
    return -1;
  }

  MonitorSlowLogData* pItem = NULL;
  int32_t             code = taosAllocateQitem(sizeof(MonitorSlowLogData), DEF_QITEM, 0, (void**)&pItem);
  if (code != 0) {
    tscError("monitor failed to allocate slow log data");
    return code;
  }

  *pItem = data;
  tscDebug("monitor write slow log to queue, clusterId:0x%" PRIx64 " type:%s, data:%s", pItem->clusterId,
           queueTypeStr[pItem->type], pItem->data);

  if (taosWriteQitem(monitorQueue, pItem) != 0) {
    // Enqueue failed: release everything the item carries, right here.
    if (taosCloseFile(&(pItem->pFile)) != 0) {
      tscError("failed to close file:%p", pItem->pFile);
    }
    monitorFreeSlowLogData(pItem);
    taosFreeQitem(pItem);
    return 0;
  }

  if (tsem2_post(&monitorSem) != 0) {
    tscError("failed to post semaphore");
  }
  return 0;
}
935

936
/*
 * Callback installed by senAuditInfo as the `fp` of its send info.
 *
 * Frees the message payload and endpoint set carried by pMsg and logs the
 * result code. `param` is unused. Always returns 0.
 */
int32_t reportCB(void* param, SDataBuf* pMsg, int32_t code) {
  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);

  tscDebug("[del report]delete reportCB code:%d", code);
  return 0;
}
942

943
/*
 * Wrap a serialized audit request in an SMsgSendInfo and send it
 * asynchronously to the mnode as a TDMT_MND_AUDIT message; reportCB is
 * installed as the response callback.
 *
 * pTscObj    connection whose app info supplies the endpoint set and transporter
 * pReq       serialized request buffer
 * len        size of pReq in bytes
 * requestId  id recorded in the send info
 *
 * Returns TSDB_CODE_SUCCESS on success, terrno when the send info cannot be
 * allocated, or the error code from asyncSendMsgToServer. On a send failure
 * only the send info is freed here; pReq is not.
 */
int32_t senAuditInfo(STscObj* pTscObj, void* pReq, int32_t len, uint64_t requestId) {
  SMsgSendInfo* pSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (NULL == pSendInfo) {
    tscError("[del report] failed to allocate memory for sendInfo");
    return terrno;
  }

  pSendInfo->msgInfo = (SDataBuf){.pData = pReq, .len = len, .handle = NULL};
  pSendInfo->requestId = requestId;
  pSendInfo->requestObjRefId = 0;
  pSendInfo->param = NULL;
  pSendInfo->fp = reportCB;
  pSendInfo->msgType = TDMT_MND_AUDIT;

  SEpSet  mgmtEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
  int32_t code = asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &mgmtEpSet, NULL, pSendInfo);
  if (code == 0) {
    return TSDB_CODE_SUCCESS;
  }

  tscError("[del report] failed to send msg to server, code:%d", code);
  taosMemoryFree(pSendInfo);
  return code;
}
968

969
/*
 * Report a successfully executed DELETE statement to the server as an audit
 * record.
 *
 * Nothing is sent when the connection object or its app info is missing,
 * when serverCfg.enableAuditDelete is 0, when the request finished with an
 * error code, or when the statement's FROM node is not a real table.
 *
 * The caller (clientOperateReport) has already checked that
 * pRequest->pQuery->pRoot is non-NULL and is a delete statement.
 */
static void reportDeleteSql(SRequestObj* pRequest) {
  SDeleteStmt* pStmt = (SDeleteStmt*)pRequest->pQuery->pRoot;
  STscObj*     pTscObj = pRequest->pTscObj;

  if (pTscObj == NULL || pTscObj->pAppInfo == NULL) {
    tscError("[del report] invalid tsc obj");
    return;
  }

  if (pTscObj->pAppInfo->serverCfg.enableAuditDelete == 0) {
    tscDebug("[del report] audit delete is disabled");
    return;
  }

  if (pRequest->code != TSDB_CODE_SUCCESS) {
    tscDebug("[del report] delete request result code:%d", pRequest->code);
    return;
  }

  if (nodeType(pStmt->pFromTable) != QUERY_NODE_REAL_TABLE) {
    tscError("[del report] invalid from table node type:%d", nodeType(pStmt->pFromTable));
    return;
  }

  SRealTableNode* pTable = (SRealTableNode*)pStmt->pFromTable;

  // Zero-initialise so the serializer never reads indeterminate bytes from
  // any field that is not explicitly filled in below.
  SAuditReq req = {0};
  req.pSql = pRequest->sqlstr;
  req.sqlLen = pRequest->sqlLen;
  TAOS_UNUSED(tsnprintf(req.table, TSDB_TABLE_NAME_LEN, "%s", pTable->table.tableName));
  TAOS_UNUSED(tsnprintf(req.db, TSDB_DB_FNAME_LEN, "%s", pTable->table.dbName));
  TAOS_UNUSED(tsnprintf(req.operation, AUDIT_OPERATION_LEN, "delete"));

  // First pass with a NULL buffer only computes the encoded size. A failure
  // here must not be used as an allocation size.
  int32_t tlen = tSerializeSAuditReq(NULL, 0, &req);
  if (tlen <= 0) {
    tscError("[del report] failed to get serialized size of req, tlen:%d", tlen);
    return;
  }

  void* pReq = taosMemoryCalloc(1, tlen);
  if (pReq == NULL) {
    tscError("[del report] failed to allocate memory for req");
    return;
  }

  if (tSerializeSAuditReq(pReq, tlen, &req) < 0) {
    tscError("[del report] failed to serialize req");
    taosMemoryFree(pReq);
    return;
  }

  int32_t code = senAuditInfo(pRequest->pTscObj, pReq, tlen, pRequest->requestId);
  if (code != 0) {
    // senAuditInfo frees only its own send info on failure, so the buffer
    // is still ours to release.
    tscError("[del report] failed to send audit info, code:%d", code);
    taosMemoryFree(pReq);
    return;
  }
  tscDebug("[del report] delete data, sql:%s", req.pSql);
}
1021

1022
/*
 * Entry point for client-side operation reporting.
 *
 * When the request carries a parsed query whose root node is a DELETE
 * statement, it is forwarded to reportDeleteSql. A request that is NULL, or
 * has no query or no root node, is logged as invalid and ignored.
 */
void clientOperateReport(SRequestObj* pRequest) {
  if (pRequest != NULL && pRequest->pQuery != NULL && pRequest->pQuery->pRoot != NULL) {
    if (nodeType(pRequest->pQuery->pRoot) == QUERY_NODE_DELETE_STMT) {
      reportDeleteSql(pRequest);
    }
    return;
  }

  tscError("[del report] invalid request");
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc