• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3533

20 Nov 2024 07:11AM UTC coverage: 58.848% (-1.9%) from 60.78%
#3533

push

travis-ci

web-flow
Merge pull request #28823 from taosdata/fix/3.0/TD-32587

fix:[TD-32587]fix stmt segmentation fault

115578 of 252434 branches covered (45.79%)

Branch coverage included in aggregate %.

1 of 4 new or added lines in 1 file covered. (25.0%)

8038 existing lines in 233 files now uncovered.

194926 of 275199 relevant lines covered (70.83%)

1494459.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

46.92
/source/client/src/clientMonitor.c
1
#include "clientMonitor.h"
2
#include "cJSON.h"
3
#include "clientInt.h"
4
#include "clientLog.h"
5
#include "tmisce.h"
6
#include "tqueue.h"
7
#include "ttime.h"
8
#include "ttimer.h"
9

10
SRWLatch    monitorLock;
11
void*       monitorTimer;
12
SHashObj*   monitorCounterHash;
13
int32_t     monitorFlag = 0;
14
int32_t     quitCnt = 0;
15
tsem2_t     monitorSem;
16
STaosQueue* monitorQueue;
17
SHashObj*   monitorSlowLogHash;
18
char        tmpSlowLogPath[PATH_MAX] = {0};
19
TdThread    monitorThread;
20
extern bool tsEnableAuditDelete;
21

22
static int32_t getSlowLogTmpDir(char* tmpPath, int32_t size) {
2,067✔
23
  int ret = tsnprintf(tmpPath, size, "%s/tdengine_slow_log/", tsTempDir);
2,067✔
24
  if (ret < 0) {
2,067!
25
    tscError("failed to get tmp path ret:%d", ret);
×
26
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
27
  }
28
  return 0;
2,067✔
29
}
30

31
static void processFileInTheEnd(TdFilePtr pFile, char* path) {
1✔
32
  if (pFile == NULL) {
1!
33
    return;
×
34
  }
35
  if (taosFtruncateFile(pFile, 0) != 0) {
1!
36
    tscError("failed to truncate file:%s, terrno:%d", path, terrno);
×
37
    return;
×
38
  }
39
  if (taosUnLockFile(pFile) != 0) {
1!
40
    tscError("failed to unlock file:%s, terrno:%d", path, terrno);
×
41
    return;
×
42
  }
43
  if (taosCloseFile(&(pFile)) != 0) {
1!
44
    tscError("failed to close file:%s, terrno:%d", path, terrno);
×
45
    return;
×
46
  }
47
  if (taosRemoveFile(path) != 0) {
1!
48
    tscError("failed to remove file:%s, terrno:%d", path, terrno);
×
49
    return;
×
50
  }
51
}
52

53
static void destroySlowLogClient(void* data) {
1✔
54
  if (data == NULL) {
1!
55
    return;
×
56
  }
57
  SlowLogClient* slowLogClient = *(SlowLogClient**)data;
1✔
58
  processFileInTheEnd(slowLogClient->pFile, slowLogClient->path);
1✔
59
  taosMemoryFree(slowLogClient);
1✔
60
}
61

62
static void destroyMonitorClient(void* data) {
1,582✔
63
  if (data == NULL) {
1,582!
64
    return;
×
65
  }
66
  MonitorClient* pMonitor = *(MonitorClient**)data;
1,582✔
67
  if (pMonitor == NULL) {
1,582!
68
    return;
×
69
  }
70
  if (!taosTmrStopA(&pMonitor->timer)) {
1,582✔
71
    tscError("failed to stop timer, pMonitor:%p", pMonitor);
354!
72
  }
73
  taosHashCleanup(pMonitor->counters);
1,582✔
74
  int ret = taos_collector_registry_destroy(pMonitor->registry);
1,582✔
75
  if (ret) {
1,582!
76
    tscError("failed to destroy registry, pMonitor:%p ret:%d", pMonitor, ret);
×
77
  }
78
  taosMemoryFree(pMonitor);
1,582✔
79
}
80

81
static void monitorFreeSlowLogData(void* paras) {
1,734✔
82
  MonitorSlowLogData* pData = (MonitorSlowLogData*)paras;
1,734✔
83
  if (pData == NULL) {
1,734✔
84
    return;
151✔
85
  }
86
  taosMemoryFreeClear(pData->data);
1,583✔
87
  if (pData->type == SLOW_LOG_READ_BEGINNIG) {
1,583✔
88
    taosMemoryFree(pData->fileName);
1,582✔
89
  }
90
}
91

92
static void monitorFreeSlowLogDataEx(void* paras) {
151✔
93
  monitorFreeSlowLogData(paras);
151✔
94
  taosMemoryFree(paras);
151✔
95
}
151✔
96

97
static SAppInstInfo* getAppInstByClusterId(int64_t clusterId) {
4,351✔
98
  void* p = taosHashGet(appInfo.pInstMapByClusterId, &clusterId, LONG_BYTES);
4,351✔
99
  if (p == NULL) {
4,350✔
100
    tscError("failed to get app inst, clusterId:%" PRIx64, clusterId);
1!
101
    return NULL;
1✔
102
  }
103
  return *(SAppInstInfo**)p;
4,349✔
104
}
105

106
static int32_t monitorReportAsyncCB(void* param, SDataBuf* pMsg, int32_t code) {
151✔
107
  if (TSDB_CODE_SUCCESS != code) {
151✔
108
    tscError("found error in monitorReport send callback, code:%d, please check the network.", code);
13!
109
  }
110
  if (pMsg) {
151!
111
    taosMemoryFree(pMsg->pData);
151✔
112
    taosMemoryFree(pMsg->pEpSet);
151✔
113
  }
114
  if (param != NULL) {
151!
UNCOV
115
    MonitorSlowLogData* p = (MonitorSlowLogData*)param;
×
UNCOV
116
    if (code != 0) {
×
UNCOV
117
      tscError("failed to send slow log:%s, clusterId:%" PRIx64, p->data, p->clusterId);
×
118
    }
UNCOV
119
    MonitorSlowLogData tmp = {.clusterId = p->clusterId,
×
UNCOV
120
                              .type = p->type,
×
UNCOV
121
                              .fileName = p->fileName,
×
UNCOV
122
                              .pFile = p->pFile,
×
UNCOV
123
                              .offset = p->offset,
×
124
                              .data = NULL};
UNCOV
125
    if (monitorPutData2MonitorQueue(tmp) == 0) {
×
UNCOV
126
      p->fileName = NULL;
×
127
    } else {
UNCOV
128
      if (taosCloseFile(&(p->pFile)) != 0) {
×
129
        tscError("failed to close file:%p", p->pFile);
×
130
      }
131
    }
132
  }
133
  return TSDB_CODE_SUCCESS;
151✔
134
}
135

136
static int32_t sendReport(void* pTransporter, SEpSet* epSet, char* pCont, MONITOR_TYPE type, void* param) {
168✔
137
  SStatisReq sStatisReq;
138
  sStatisReq.pCont = pCont;
168✔
139
  sStatisReq.contLen = strlen(pCont);
168✔
140
  sStatisReq.type = type;
168✔
141

142
  int tlen = tSerializeSStatisReq(NULL, 0, &sStatisReq);
168✔
143
  if (tlen < 0) {
168!
144
    goto FAILED;
×
145
  }
146
  void* buf = taosMemoryMalloc(tlen);
168✔
147
  if (buf == NULL) {
168!
148
    tscError("sendReport failed, out of memory, len:%d", tlen);
×
149
    goto FAILED;
×
150
  }
151
  tlen = tSerializeSStatisReq(buf, tlen, &sStatisReq);
168✔
152
  if (tlen < 0) {
168!
153
    taosMemoryFree(buf);
×
154
    goto FAILED;
×
155
  }
156

157
  SMsgSendInfo* pInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
168✔
158
  if (pInfo == NULL) {
168!
159
    tscError("sendReport failed, out of memory send info");
×
160
    taosMemoryFree(buf);
×
161
    goto FAILED;
×
162
  }
163
  pInfo->fp = monitorReportAsyncCB;
168✔
164
  pInfo->msgInfo.pData = buf;
168✔
165
  pInfo->msgInfo.len = tlen;
168✔
166
  pInfo->msgType = TDMT_MND_STATIS;
168✔
167
  pInfo->param = param;
168✔
168
  pInfo->paramFreeFp = monitorFreeSlowLogDataEx;
168✔
169
  pInfo->requestId = tGenIdPI64();
168✔
170
  pInfo->requestObjRefId = 0;
168✔
171

172
  // int64_t transporterId = 0;
173
  return asyncSendMsgToServer(pTransporter, epSet, NULL, pInfo);
168✔
174

175
FAILED:
×
176
  if (taosCloseFile(&(((MonitorSlowLogData*)param)->pFile)) != 0) {
×
177
    tscError("failed to close file:%p", ((MonitorSlowLogData*)param)->pFile);
×
178
  }
179
  monitorFreeSlowLogDataEx(param);
×
180
  return TAOS_GET_TERRNO(TSDB_CODE_TSC_INTERNAL_ERROR);
×
181
}
182

183
static void generateClusterReport(taos_collector_registry_t* registry, void* pTransporter, SEpSet* epSet) {
1,129✔
184
  char ts[50] = {0};
1,129!
185
  (void)snprintf(ts, sizeof(ts), "%" PRId64, taosGetTimestamp(TSDB_TIME_PRECISION_MILLI));
1,129✔
186
  char* pCont = (char*)taos_collector_registry_bridge_new(registry, ts, "%" PRId64, NULL);
1,129✔
187
  if (NULL == pCont) {
1,129✔
188
    tscError("generateClusterReport failed, get null content.");
962!
189
    return;
962✔
190
  }
191

192
  if (strlen(pCont) != 0 && sendReport(pTransporter, epSet, pCont, MONITOR_TYPE_COUNTER, NULL) == 0) {
167!
193
    int ret = taos_collector_registry_clear_batch(registry);
167✔
194
    if (ret) {
167!
195
      tscError("failed to clear registry, ret:%d", ret);
×
196
    }
197
  }
198
  taosMemoryFreeClear(pCont);
167!
199
}
200

201
static void reportSendProcess(void* param, void* tmrId) {
1,141✔
202
  taosRLockLatch(&monitorLock);
1,141✔
203
  if (atomic_load_32(&monitorFlag) == 1) {
1,141✔
204
    taosRUnLockLatch(&monitorLock);
12✔
205
    return;
12✔
206
  }
207

208
  MonitorClient* pMonitor = (MonitorClient*)param;
1,129✔
209
  SAppInstInfo*  pInst = getAppInstByClusterId(pMonitor->clusterId);
1,129✔
210
  if (pInst == NULL) {
1,129!
211
    taosRUnLockLatch(&monitorLock);
×
212
    return;
×
213
  }
214

215
  SEpSet ep = getEpSet_s(&pInst->mgmtEp);
1,129✔
216
  generateClusterReport(pMonitor->registry, pInst->pTransporter, &ep);
1,129✔
217
  bool reset =
218
      taosTmrReset(reportSendProcess, pInst->serverCfg.monitorParas.tsMonitorInterval * 1000, param, monitorTimer, &tmrId);
1,129✔
219
  tscDebug("reset timer, pMonitor:%p, %d", pMonitor, reset);
1,129✔
220
  taosRUnLockLatch(&monitorLock);
1,129✔
221
}
222

223
static void sendAllCounter() {
1,602✔
224
  MonitorClient** ppMonitor = NULL;
1,602✔
225
  while ((ppMonitor = taosHashIterate(monitorSlowLogHash, ppMonitor))) {
1,602✔
226
    MonitorClient* pMonitor = *ppMonitor;
1✔
227
    if (pMonitor == NULL) {
1!
228
      continue;
×
229
    }
230
    SAppInstInfo* pInst = getAppInstByClusterId(pMonitor->clusterId);
1✔
231
    if (pInst == NULL) {
1!
232
      taosHashCancelIterate(monitorCounterHash, ppMonitor);
1✔
233
      break;
1✔
234
    }
235
    SEpSet ep = getEpSet_s(&pInst->mgmtEp);
×
236
    generateClusterReport(pMonitor->registry, pInst->pTransporter, &ep);
×
237
  }
238
}
1,602✔
239

240
void monitorCreateClient(int64_t clusterId) {
3,164✔
241
  MonitorClient* pMonitor = NULL;
3,164✔
242
  taosWLockLatch(&monitorLock);
3,164✔
243
  if (taosHashGet(monitorCounterHash, &clusterId, LONG_BYTES) == NULL) {
3,164✔
244
    tscInfo("[monitor] monitorCreateClient for %" PRIx64, clusterId);
1,582!
245
    pMonitor = taosMemoryCalloc(1, sizeof(MonitorClient));
1,582✔
246
    if (pMonitor == NULL) {
1,582!
247
      tscError("failed to create monitor client");
×
248
      goto fail;
×
249
    }
250
    pMonitor->clusterId = clusterId;
1,582✔
251
    char clusterKey[32] = {0};
1,582✔
252
    if (snprintf(clusterKey, sizeof(clusterKey), "%" PRId64, clusterId) < 0) {
1,582!
253
      tscError("failed to create cluster key");
×
254
      goto fail;
×
255
    }
256
    pMonitor->registry = taos_collector_registry_new(clusterKey);
1,582✔
257
    if (pMonitor->registry == NULL) {
1,582!
258
      tscError("failed to create registry");
×
259
      goto fail;
×
260
    }
261
    pMonitor->colector = taos_collector_new(clusterKey);
1,582✔
262
    if (pMonitor->colector == NULL) {
1,582!
263
      tscError("failed to create collector");
×
264
      goto fail;
×
265
    }
266

267
    int r = taos_collector_registry_register_collector(pMonitor->registry, pMonitor->colector);
1,582✔
268
    if (r) {
1,582!
269
      tscError("failed to register collector, ret:%d", r);
×
270
      goto fail;
×
271
    }
272
    pMonitor->counters =
3,164✔
273
        (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
1,582✔
274
    if (pMonitor->counters == NULL) {
1,582!
275
      tscError("failed to create monitor counters");
×
276
      goto fail;
×
277
    }
278

279
    if (taosHashPut(monitorCounterHash, &clusterId, LONG_BYTES, &pMonitor, POINTER_BYTES) != 0) {
1,582!
280
      tscError("failed to put monitor client to hash");
×
281
      goto fail;
×
282
    }
283

284
    SAppInstInfo* pInst = getAppInstByClusterId(clusterId);
1,582✔
285
    if (pInst == NULL) {
1,582!
286
      tscError("failed to get app instance by cluster id");
×
287
      pMonitor = NULL;
×
288
      goto fail;
×
289
    }
290
    pMonitor->timer =
3,164✔
291
        taosTmrStart(reportSendProcess, pInst->serverCfg.monitorParas.tsMonitorInterval * 1000, (void*)pMonitor, monitorTimer);
1,582✔
292
    if (pMonitor->timer == NULL) {
1,582!
293
      tscError("failed to start timer");
×
294
      goto fail;
×
295
    }
296
    tscInfo("[monitor] monitorCreateClient for %" PRIx64 "finished %p.", clusterId, pMonitor);
1,582!
297
  }
298
  taosWUnLockLatch(&monitorLock);
3,164✔
299

300
  return;
3,164✔
301

302
fail:
×
303
  destroyMonitorClient(&pMonitor);
×
304
  taosWUnLockLatch(&monitorLock);
×
305
}
306

307
void monitorCreateClientCounter(int64_t clusterId, const char* name, const char* help, size_t label_key_count,
3,164✔
308
                                const char** label_keys) {
309
  taosWLockLatch(&monitorLock);
3,164✔
310
  MonitorClient** ppMonitor = (MonitorClient**)taosHashGet(monitorCounterHash, &clusterId, LONG_BYTES);
3,164✔
311
  if (ppMonitor == NULL || *ppMonitor == NULL) {
3,164!
312
    tscError("failed to get monitor client");
×
313
    goto end;
×
314
  }
315
  taos_counter_t* newCounter = taos_counter_new(name, help, label_key_count, label_keys);
3,164✔
316
  if (newCounter == NULL) return;
3,164!
317
  MonitorClient* pMonitor = *ppMonitor;
3,164✔
318
  if (taos_collector_add_metric(pMonitor->colector, newCounter) != 0) {
3,164!
UNCOV
319
    tscError("failed to add metric to collector");
×
UNCOV
320
    int r = taos_counter_destroy(newCounter);
×
UNCOV
321
    if (r) {
×
322
      tscError("failed to destroy counter, code: %d", r);
×
323
    }
UNCOV
324
    goto end;
×
325
  }
326
  if (taosHashPut(pMonitor->counters, name, strlen(name), &newCounter, POINTER_BYTES) != 0) {
3,164!
327
    tscError("failed to put counter to monitor");
×
328
    int r = taos_counter_destroy(newCounter);
×
329
    if (r) {
×
330
      tscError("failed to destroy counter, code: %d", r);
×
331
    }
332
    goto end;
×
333
  }
334
  tscInfo("[monitor] monitorCreateClientCounter %" PRIx64 "(%p):%s : %p.", pMonitor->clusterId, pMonitor, name,
3,164!
335
          newCounter);
336

337
end:
×
338
  taosWUnLockLatch(&monitorLock);
3,164✔
339
}
340

341
void monitorCounterInc(int64_t clusterId, const char* counterName, const char** label_values) {
307,361✔
342
  taosWLockLatch(&monitorLock);
307,361✔
343
  if (atomic_load_32(&monitorFlag) == 1) {
307,361!
344
    taosWUnLockLatch(&monitorLock);
×
345
    return;
×
346
  }
347

348
  MonitorClient** ppMonitor = (MonitorClient**)taosHashGet(monitorCounterHash, &clusterId, LONG_BYTES);
307,361✔
349
  if (ppMonitor == NULL || *ppMonitor == NULL) {
307,361!
350
    tscError("monitorCounterInc not found pMonitor %" PRId64, clusterId);
×
351
    goto end;
×
352
  }
353

354
  MonitorClient*   pMonitor = *ppMonitor;
307,361✔
355
  taos_counter_t** ppCounter = (taos_counter_t**)taosHashGet(pMonitor->counters, counterName, strlen(counterName));
307,361✔
356
  if (ppCounter == NULL || *ppCounter == NULL) {
307,361!
357
    tscError("monitorCounterInc not found pCounter %" PRIx64 ":%s.", clusterId, counterName);
×
358
    goto end;
×
359
  }
360
  if (taos_counter_inc(*ppCounter, label_values) != 0) {
307,361!
361
    tscError("monitorCounterInc failed to inc %" PRIx64 ":%s.", clusterId, counterName);
×
362
    goto end;
×
363
  }
364
  tscDebug("[monitor] monitorCounterInc %" PRIx64 "(%p):%s", pMonitor->clusterId, pMonitor, counterName);
307,361✔
365

366
end:
1,196✔
367
  taosWUnLockLatch(&monitorLock);
307,361✔
368
}
369

370
const char* monitorResultStr(SQL_RESULT_CODE code) {
307,361✔
371
  static const char* result_state[] = {"Success", "Failed", "Cancel"};
372
  return result_state[code];
307,361✔
373
}
374

375
static void monitorWriteSlowLog2File(MonitorSlowLogData* slowLogData, char* tmpPath) {
1✔
376
  TdFilePtr pFile = NULL;
1✔
377
  void*     tmp = taosHashGet(monitorSlowLogHash, &slowLogData->clusterId, LONG_BYTES);
1✔
378
  if (tmp == NULL) {
1!
379
    char path[PATH_MAX] = {0};
1✔
380
    char clusterId[32] = {0};
1✔
381
    if (snprintf(clusterId, sizeof(clusterId), "%" PRIx64, slowLogData->clusterId) < 0) {
1!
382
      tscError("failed to generate clusterId:%" PRIx64, slowLogData->clusterId);
×
383
      return;
×
384
    }
385
    taosGetTmpfilePath(tmpPath, clusterId, path);
1✔
386
    tscInfo("[monitor] create slow log file:%s", path);
1!
387
    pFile = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_READ | TD_FILE_TRUNC);
1✔
388
    if (pFile == NULL) {
1!
389
      tscError("failed to open file:%s since %d", path, terrno);
×
390
      return;
×
391
    }
392

393
    SlowLogClient* pClient = taosMemoryCalloc(1, sizeof(SlowLogClient));
1✔
394
    if (pClient == NULL) {
1!
395
      tscError("failed to allocate memory for slow log client");
×
396
      int32_t ret = taosCloseFile(&pFile);
×
397
      if (ret != 0) {
×
398
        tscError("failed to close file:%p ret:%d", pFile, ret);
×
399
      }
400
      return;
×
401
    }
402
    pClient->lastCheckTime = taosGetMonoTimestampMs();
1✔
403
    tstrncpy(pClient->path, path, PATH_MAX);
1✔
404
    pClient->offset = 0;
1✔
405
    pClient->pFile = pFile;
1✔
406
    if (taosHashPut(monitorSlowLogHash, &slowLogData->clusterId, LONG_BYTES, &pClient, POINTER_BYTES) != 0) {
1!
407
      tscError("failed to put clusterId:%" PRId64 " to hash table", slowLogData->clusterId);
×
408
      int32_t ret = taosCloseFile(&pFile);
×
409
      if (ret != 0) {
×
410
        tscError("failed to close file:%p ret:%d", pFile, ret);
×
411
      }
412
      taosMemoryFree(pClient);
×
413
      return;
×
414
    }
415

416
    if (taosLockFile(pFile) < 0) {
1!
417
      tscError("failed to lock file:%p since %s", pFile, terrstr());
×
418
      return;
×
419
    }
420
  } else {
UNCOV
421
    pFile = (*(SlowLogClient**)tmp)->pFile;
×
422
  }
423

424
  if (taosLSeekFile(pFile, 0, SEEK_END) < 0) {
1!
425
    tscError("failed to seek file:%p code: %d", pFile, terrno);
×
426
    return;
×
427
  }
428
  if (taosWriteFile(pFile, slowLogData->data, strlen(slowLogData->data) + 1) < 0) {
1!
429
    tscError("failed to write len to file:%p since %s", pFile, terrstr());
×
430
  }
431
  tscDebug("[monitor] write slow log to file:%p, clusterId:%" PRIx64, pFile, slowLogData->clusterId);
1!
432
}
433

434
static char* readFile(TdFilePtr pFile, int64_t* offset, int64_t size) {
1✔
435
  tscDebug("[monitor] readFile slow begin pFile:%p, offset:%" PRId64 ", size:%" PRId64, pFile, *offset, size);
1!
436
  if (taosLSeekFile(pFile, *offset, SEEK_SET) < 0) {
1!
437
    tscError("failed to seek file:%p code: %d", pFile, terrno);
×
438
    return NULL;
×
439
  }
440

441
  if ((size <= *offset)) {
1!
442
    tscError("invalid size:%" PRId64 ", offset:%" PRId64, size, *offset);
×
443
    terrno = TSDB_CODE_TSC_INTERNAL_ERROR;
×
444
    return NULL;
×
445
  }
446
  char*   pCont = NULL;
1✔
447
  int64_t totalSize = 0;
1✔
448
  if (size - *offset >= SLOW_LOG_SEND_SIZE_MAX) {
1!
449
    totalSize = 4 + SLOW_LOG_SEND_SIZE_MAX;
×
450
  } else {
451
    totalSize = 4 + (size - *offset);
1✔
452
  }
453

454
  pCont = taosMemoryCalloc(1, totalSize);  // 4 reserved for []
1✔
455
  if (pCont == NULL) {
1!
456
    tscError("failed to allocate memory for slow log, size:%" PRId64, totalSize);
×
457
    return NULL;
×
458
  }
459
  char* buf = pCont;
1✔
460
  (void)strncat(buf++, "[", totalSize - 1);
1✔
461
  int64_t readSize = taosReadFile(pFile, buf, totalSize - 4); // 4 reserved for []
1✔
462
  if (readSize <= 0) {
1!
463
    if (readSize < 0) {
×
464
      tscError("failed to read len from file:%p since %s", pFile, terrstr());
×
465
    }
466
    taosMemoryFree(pCont);
×
467
    return NULL;
×
468
  }
469

470
  totalSize = 0;
1✔
471
  while (1) {
1✔
472
    size_t len = strlen(buf);
2✔
473
    totalSize += (len + 1);
2✔
474
    if (totalSize > readSize || len == 0) {
2!
475
      *(buf - 1) = ']';
1✔
476
      *buf = '\0';
1✔
477
      break;
1✔
478
    }
479
    buf[len] = ',';  // replace '\0' with ','
1✔
480
    buf += (len + 1);
1✔
481
    *offset += (len + 1);
1✔
482
  }
483

484
  tscDebug("[monitor] readFile slow log end, data:%s, offset:%" PRId64, pCont, *offset);
1!
485
  return pCont;
1✔
486
}
487

488
static int64_t getFileSize(char* path) {
1✔
489
  int64_t fileSize = 0;
1✔
490
  if (taosStatFile(path, &fileSize, NULL, NULL) < 0) {
1!
491
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
492
  }
493

494
  return fileSize;
1✔
495
}
496

497
static int32_t sendSlowLog(int64_t clusterId, char* data, TdFilePtr pFile, int64_t offset, SLOW_LOG_QUEUE_TYPE type,
1✔
498
                           char* fileName, void* pTransporter, SEpSet* epSet) {
499
  if (data == NULL) {
1!
500
    if (taosCloseFile(&pFile) != 0) {
×
501
      tscError("failed to close file:%p", pFile);
×
502
    }
503
    taosMemoryFree(fileName);
×
504
    return TSDB_CODE_INVALID_PARA;
×
505
  }
506
  MonitorSlowLogData* pParam = taosMemoryMalloc(sizeof(MonitorSlowLogData));
1✔
507
  if (pParam == NULL) {
1!
508
    if (taosCloseFile(&pFile) != 0) {
×
509
      tscError("failed to close file:%p", pFile);
×
510
    }
511
    taosMemoryFree(data);
×
512
    taosMemoryFree(fileName);
×
513
    return terrno;
×
514
  }
515
  pParam->data = data;
1✔
516
  pParam->offset = offset;
1✔
517
  pParam->clusterId = clusterId;
1✔
518
  pParam->type = type;
1✔
519
  pParam->pFile = pFile;
1✔
520
  pParam->fileName = fileName;
1✔
521
  return sendReport(pTransporter, epSet, data, MONITOR_TYPE_SLOW_LOG, pParam);
1✔
522
}
523

524
static int32_t monitorReadSend(int64_t clusterId, TdFilePtr pFile, int64_t* offset, int64_t size,
1✔
525
                               SLOW_LOG_QUEUE_TYPE type, char* fileName) {
526
  SAppInstInfo* pInst = getAppInstByClusterId(clusterId);
1✔
527
  if (pInst == NULL) {
1!
528
    tscError("failed to get app instance by clusterId:%" PRId64, clusterId);
×
529
    if (taosCloseFile(&pFile) != 0) {
×
530
      tscError("failed to close file:%p", pFile);
×
531
    }
532
    taosMemoryFree(fileName);
×
533
    return terrno;
×
534
  }
535
  SEpSet ep = getEpSet_s(&pInst->mgmtEp);
1✔
536
  char*  data = readFile(pFile, offset, size);
1✔
537
  if (data == NULL) return terrno;
1!
538
  return sendSlowLog(clusterId, data, (type == SLOW_LOG_READ_BEGINNIG ? pFile : NULL), *offset, type, fileName,
1!
539
                     pInst->pTransporter, &ep);
540
}
541

UNCOV
542
static void monitorSendSlowLogAtBeginning(int64_t clusterId, char** fileName, TdFilePtr pFile, int64_t offset) {
×
UNCOV
543
  if (fileName == NULL) {
×
544
    return;
×
545
  }
UNCOV
546
  int64_t size = getFileSize(*fileName);
×
UNCOV
547
  if (size <= offset) {
×
UNCOV
548
    processFileInTheEnd(pFile, *fileName);
×
UNCOV
549
    tscDebug("[monitor] monitorSendSlowLogAtBeginning delete file:%s", *fileName);
×
550
  } else {
UNCOV
551
    int32_t code = monitorReadSend(clusterId, pFile, &offset, size, SLOW_LOG_READ_BEGINNIG, *fileName);
×
UNCOV
552
    if (code == 0) {
×
UNCOV
553
      tscDebug("[monitor] monitorSendSlowLogAtBeginning send slow log succ, clusterId:%" PRId64, clusterId);
×
554
    } else {
555
      tscError("[monitor] monitorSendSlowLogAtBeginning send slow log failed, clusterId:%" PRId64 ",ret:%d", clusterId,
×
556
               code);
557
    }
UNCOV
558
    *fileName = NULL;
×
559
  }
560
}
561

UNCOV
562
static void monitorSendSlowLogAtRunning(int64_t clusterId) {
×
UNCOV
563
  void* tmp = taosHashGet(monitorSlowLogHash, &clusterId, LONG_BYTES);
×
UNCOV
564
  if (tmp == NULL) {
×
565
    tscError("failed to get slow log client by clusterId:%" PRId64, clusterId);
×
566
    return;
×
567
  }
UNCOV
568
  SlowLogClient* pClient = (*(SlowLogClient**)tmp);
×
UNCOV
569
  if (pClient == NULL) {
×
570
    tscError("failed to get slow log client by clusterId:%" PRId64, clusterId);
×
571
    return;
×
572
  }
UNCOV
573
  int64_t size = getFileSize(pClient->path);
×
UNCOV
574
  if (size <= pClient->offset) {
×
575
    if (taosFtruncateFile(pClient->pFile, 0) < 0) {
×
576
      tscError("failed to truncate file:%p code: %d", pClient->pFile, terrno);
×
577
    }
578
    tscDebug("[monitor] monitorSendSlowLogAtRunning truncate file to 0 file:%p", pClient->pFile);
×
579
    pClient->offset = 0;
×
580
  } else {
UNCOV
581
    int32_t code = monitorReadSend(clusterId, pClient->pFile, &pClient->offset, size, SLOW_LOG_READ_RUNNING, NULL);
×
UNCOV
582
    tscDebug("[monitor] monitorSendSlowLogAtRunning send slow log clusterId:%" PRId64 ",ret:%d", clusterId, code);
×
583
  }
584
}
585

586
static bool monitorSendSlowLogAtQuit(int64_t clusterId) {
×
587
  void* tmp = taosHashGet(monitorSlowLogHash, &clusterId, LONG_BYTES);
×
588
  if (tmp == NULL) {
×
589
    return true;
×
590
  }
591
  SlowLogClient* pClient = (*(SlowLogClient**)tmp);
×
592
  if (pClient == NULL) {
×
593
    return true;
×
594
  }
595
  int64_t size = getFileSize(pClient->path);
×
596
  if (size <= pClient->offset) {
×
597
    processFileInTheEnd(pClient->pFile, pClient->path);
×
598
    pClient->pFile = NULL;
×
599
    tscInfo("[monitor] monitorSendSlowLogAtQuit remove file:%s", pClient->path);
×
600
    if ((--quitCnt) == 0) {
×
601
      return true;
×
602
    }
603
  } else {
604
    int32_t code = monitorReadSend(clusterId, pClient->pFile, &pClient->offset, size, SLOW_LOG_READ_QUIT, NULL);
×
605
    tscDebug("[monitor] monitorSendSlowLogAtQuit send slow log clusterId:%" PRId64 ",ret:%d", clusterId, code);
×
606
  }
607
  return false;
×
608
}
609
static void monitorSendAllSlowLogAtQuit() {
1,602✔
610
  void* pIter = NULL;
1,602✔
611
  while ((pIter = taosHashIterate(monitorSlowLogHash, pIter))) {
1,603✔
612
    SlowLogClient* pClient = (*(SlowLogClient**)pIter);
1✔
613
    if (pClient == NULL) {
1!
614
      continue;
×
615
    }
616
    int64_t size = getFileSize(pClient->path);
1✔
617
    if (size <= pClient->offset) {
1!
618
      processFileInTheEnd(pClient->pFile, pClient->path);
×
619
      pClient->pFile = NULL;
×
620
    } else if (pClient->offset == 0) {
1!
621
      int64_t* clusterId = (int64_t*)taosHashGetKey(pIter, NULL);
1✔
622
      int32_t  code = monitorReadSend(*clusterId, pClient->pFile, &pClient->offset, size, SLOW_LOG_READ_QUIT, NULL);
1✔
623
      tscDebug("[monitor] monitorSendAllSlowLogAtQuit send slow log clusterId:%" PRId64 ",ret:%d", *clusterId, code);
1!
624
      if (code == 0) {
1!
625
        quitCnt++;
1✔
626
      }
627
    }
628
  }
629
}
1,602✔
630

631
static void processFileRemoved(SlowLogClient* pClient) {
×
632
  if (taosUnLockFile(pClient->pFile) != 0) {
×
633
    tscError("failed to unlock file:%s since %d", pClient->path, terrno);
×
634
    return;
×
635
  }
636
  int32_t ret = taosCloseFile(&(pClient->pFile));
×
637
  if (ret != 0) {
×
638
    tscError("failed to close file:%p ret:%d", pClient->pFile, ret);
×
639
    return;
×
640
  }
641

642
  TdFilePtr pFile =
643
      taosOpenFile(pClient->path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_READ | TD_FILE_TRUNC);
×
644
  if (pFile == NULL) {
×
645
    tscError("failed to open file:%s since %d", pClient->path, terrno);
×
646
  } else {
647
    pClient->pFile = pFile;
×
648
  }
649
}
650

651
static void monitorSendAllSlowLog() {
512,426✔
652
  int64_t t = taosGetMonoTimestampMs();
512,426✔
653
  void*   pIter = NULL;
512,426✔
654
  while ((pIter = taosHashIterate(monitorSlowLogHash, pIter))) {
512,482✔
655
    int64_t*       clusterId = (int64_t*)taosHashGetKey(pIter, NULL);
56✔
656
    SAppInstInfo*  pInst = getAppInstByClusterId(*clusterId);
56✔
657
    SlowLogClient* pClient = (*(SlowLogClient**)pIter);
56✔
658
    if (pClient == NULL || pInst == NULL) {
56!
659
      taosHashCancelIterate(monitorSlowLogHash, pIter);
×
660
      return;
×
661
    }
662
    if (t - pClient->lastCheckTime > pInst->serverCfg.monitorParas.tsMonitorInterval * 1000) {
56!
UNCOV
663
      pClient->lastCheckTime = t;
×
664
    } else {
665
      continue;
56✔
666
    }
667

UNCOV
668
    if (pClient->offset == 0) {
×
UNCOV
669
      int64_t size = getFileSize(pClient->path);
×
UNCOV
670
      if (size <= 0) {
×
671
        if (size < 0) {
×
672
          tscError("[monitor] monitorSendAllSlowLog failed to get file size:%s, err:%d", pClient->path, errno);
×
673
          if (errno == ENOENT) {
×
674
            processFileRemoved(pClient);
×
675
          }
676
        }
677
        continue;
×
678
      }
UNCOV
679
      int32_t code = monitorReadSend(*clusterId, pClient->pFile, &pClient->offset, size, SLOW_LOG_READ_RUNNING, NULL);
×
UNCOV
680
      tscDebug("[monitor] monitorSendAllSlowLog send slow log clusterId:%" PRId64 ",ret:%d", *clusterId, code);
×
681
    }
682
  }
683
}
684

685
static void monitorSendAllSlowLogFromTempDir(int64_t clusterId) {
1,582✔
686
  SAppInstInfo* pInst = getAppInstByClusterId((int64_t)clusterId);
1,582✔
687

688
  if (pInst == NULL || !pInst->serverCfg.monitorParas.tsEnableMonitor) {
1,582!
689
    tscInfo("[monitor] monitor is disabled, skip send slow log");
1,117!
690
    return;
1,117✔
691
  }
692
  char namePrefix[PATH_MAX] = {0};
465✔
693
  if (snprintf(namePrefix, sizeof(namePrefix), "%s%" PRIx64, TD_TMP_FILE_PREFIX, clusterId) < 0) {
465!
694
    tscError("failed to generate slow log file name prefix");
×
695
    return;
×
696
  }
697

698
  char tmpPath[PATH_MAX] = {0};
465✔
699
  if (getSlowLogTmpDir(tmpPath, sizeof(tmpPath)) < 0) {
465!
700
    return;
×
701
  }
702

703
  TdDirPtr pDir = taosOpenDir(tmpPath);
465✔
704
  if (pDir == NULL) {
465!
705
    return;
×
706
  }
707

708
  TdDirEntryPtr de = NULL;
465✔
709
  while ((de = taosReadDir(pDir)) != NULL) {
25,683✔
710
    if (taosDirEntryIsDir(de)) {
25,218✔
711
      continue;
25,218✔
712
    }
713

714
    char* name = taosGetDirEntryName(de);
24,288✔
715
    if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0 || strstr(name, namePrefix) == NULL) {
24,288!
716
      tscInfo("skip file:%s, for cluster id:%" PRIx64, name, clusterId);
24,285!
717
      continue;
24,285✔
718
    }
719

720
    char filename[PATH_MAX] = {0};
3✔
721
    (void)snprintf(filename, sizeof(filename), "%s%s", tmpPath, name);
3✔
722
    TdFilePtr pFile = taosOpenFile(filename, TD_FILE_READ | TD_FILE_WRITE);
3✔
723
    if (pFile == NULL) {
3!
724
      tscError("failed to open file:%s since %s", filename, terrstr());
×
725
      continue;
×
726
    }
727
    if (taosLockFile(pFile) < 0) {
3!
728
      tscInfo("failed to lock file:%s since %s, maybe used by other process", filename, terrstr());
3!
729
      int32_t ret = taosCloseFile(&pFile);
3✔
730
      if (ret != 0) {
3!
731
        tscError("failed to close file:%p ret:%d", pFile, ret);
×
732
      }
733
      continue;
3✔
734
    }
UNCOV
735
    char* tmp = taosStrdup(filename);
×
UNCOV
736
    if (tmp == NULL) {
×
737
      tscError("failed to dup string:%s since %s", filename, terrstr());
×
738
      if (taosUnLockFile(pFile) != 0) {
×
739
        tscError("failed to unlock file:%s, terrno:%d", filename, terrno);
×
740
      }
741
      if (taosCloseFile(&(pFile)) != 0) {
×
742
        tscError("failed to close file:%s, terrno:%d", filename, terrno);
×
743
      }
744
      continue;
×
745
    }
UNCOV
746
    monitorSendSlowLogAtBeginning(clusterId, &tmp, pFile, 0);
×
UNCOV
747
    taosMemoryFree(tmp);
×
748
  }
749

750
  int32_t ret = taosCloseDir(&pDir);
465✔
751
  if (ret != 0) {
465!
752
    tscError("failed to close dir, ret:%d", ret);
×
753
  }
754
}
755

756
static void* monitorThreadFunc(void* param) {
1,602✔
757
  setThreadName("client-monitor-slowlog");
1,602✔
758
  tscDebug("monitorThreadFunc start");
1,602✔
759
  int64_t quitTime = 0;
1,602✔
760
  while (1) {
512,432✔
761
    if (atomic_load_32(&monitorFlag) == 1) {
514,034✔
762
      if (quitCnt == 0) {
1,608✔
763
        monitorSendAllSlowLogAtQuit();
1,602✔
764
        if (quitCnt == 0) {
1,602✔
765
          tscInfo("monitorThreadFunc quit since no slow log to send");
1,601!
766
          break;
1,601✔
767
        }
768
        quitTime = taosGetMonoTimestampMs();
1✔
769
      }
770
      if (taosGetMonoTimestampMs() - quitTime > 500) {  // quit at most 500ms
7✔
771
        tscInfo("monitorThreadFunc quit since timeout");
1!
772
        break;
1✔
773
      }
774
    }
775

776
    MonitorSlowLogData* slowLogData = NULL;
512,432✔
777
    taosReadQitem(monitorQueue, (void**)&slowLogData);
512,432✔
778
    if (slowLogData != NULL) {
512,432✔
779
      if (slowLogData->type == SLOW_LOG_READ_BEGINNIG && quitCnt == 0) {
1,583!
780
        if (slowLogData->pFile != NULL) {
1,582!
UNCOV
781
          monitorSendSlowLogAtBeginning(slowLogData->clusterId, &(slowLogData->fileName), slowLogData->pFile,
×
UNCOV
782
                                        slowLogData->offset);
×
783
        } else {
784
          monitorSendAllSlowLogFromTempDir(slowLogData->clusterId);
1,582✔
785
        }
786
      } else if (slowLogData->type == SLOW_LOG_WRITE) {
1!
787
        monitorWriteSlowLog2File(slowLogData, tmpSlowLogPath);
1✔
UNCOV
788
      } else if (slowLogData->type == SLOW_LOG_READ_RUNNING) {
×
UNCOV
789
        monitorSendSlowLogAtRunning(slowLogData->clusterId);
×
790
      } else if (slowLogData->type == SLOW_LOG_READ_QUIT) {
×
791
        if (monitorSendSlowLogAtQuit(slowLogData->clusterId)) {
×
792
          tscInfo("monitorThreadFunc quit since all slow log sended");
×
793
          monitorFreeSlowLogData(slowLogData);
×
794
          taosFreeQitem(slowLogData);
×
795
          break;
×
796
        }
797
      }
798
      monitorFreeSlowLogData(slowLogData);
1,583✔
799
      taosFreeQitem(slowLogData);
1,583✔
800
    }
801

802
    if (quitCnt == 0) {
512,432✔
803
      monitorSendAllSlowLog();
512,426✔
804
    }
805
    (void)tsem2_timewait(&monitorSem, 100);
512,432✔
806
  }
807
  return NULL;
1,602✔
808
}
809

810
static int32_t tscMonitortInit() {
1,602✔
811
  TdThreadAttr thAttr;
812
  if (taosThreadAttrInit(&thAttr) != 0) {
1,602!
813
    tscError("failed to init thread attr since %s", strerror(errno));
×
814
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
815
  }
816
  if (taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE) != 0) {
1,602!
817
    tscError("failed to set thread attr since %s", strerror(errno));
×
818
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
819
  }
820

821
  if (taosThreadCreate(&monitorThread, &thAttr, monitorThreadFunc, NULL) != 0) {
1,602!
822
    tscError("failed to create monitor thread since %s", strerror(errno));
×
823
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
824
  }
825

826
  (void)taosThreadAttrDestroy(&thAttr);
1,602✔
827
  return 0;
1,602✔
828
}
829

830
static void tscMonitorStop() {
1,602✔
831
  if (taosCheckPthreadValid(monitorThread)) {
1,602!
832
    (void)taosThreadJoin(monitorThread, NULL);
1,602✔
833
    taosThreadClear(&monitorThread);
1,602✔
834
  }
835
}
1,602✔
836

837
int32_t monitorInit() {
1,602✔
838
  int32_t code = 0;
1,602✔
839

840
  tscInfo("[monitor] tscMonitor init");
1,602!
841
  monitorCounterHash =
1,602✔
842
      (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
1,602✔
843
  if (monitorCounterHash == NULL) {
1,602!
844
    tscError("failed to create monitorCounterHash");
×
845
    return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
846
  }
847
  taosHashSetFreeFp(monitorCounterHash, destroyMonitorClient);
1,602✔
848

849
  monitorSlowLogHash =
1,602✔
850
      (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
1,602✔
851
  if (monitorSlowLogHash == NULL) {
1,602!
852
    tscError("failed to create monitorSlowLogHash");
×
853
    return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
854
  }
855
  taosHashSetFreeFp(monitorSlowLogHash, destroySlowLogClient);
1,602✔
856

857
  monitorTimer = taosTmrInit(0, 0, 0, "MONITOR");
1,602✔
858
  if (monitorTimer == NULL) {
1,602!
859
    tscError("failed to create monitor timer");
×
860
    return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
861
  }
862

863
  code = getSlowLogTmpDir(tmpSlowLogPath, sizeof(tmpSlowLogPath));
1,602✔
864
  if (code != 0) {
1,602!
865
    return code;
×
866
  }
867

868
  code = taosMulModeMkDir(tmpSlowLogPath, 0777, true);
1,602✔
869
  if (code != 0) {
1,602!
870
    tscError("failed to create dir:%s since %s", tmpSlowLogPath, terrstr());
×
871
    return code;
×
872
  }
873

874
  if (tsem2_init(&monitorSem, 0, 0) != 0) {
1,602!
875
    tscError("sem init error since %s", terrstr());
×
876
    return TAOS_SYSTEM_ERROR(errno);
×
877
  }
878

879
  code = taosOpenQueue(&monitorQueue);
1,602✔
880
  if (code) {
1,602!
881
    tscError("open queue error since %s", terrstr());
×
882
    return TAOS_GET_TERRNO(code);
×
883
  }
884

885
  taosInitRWLatch(&monitorLock);
1,602✔
886
  return tscMonitortInit();
1,602✔
887
}
888

889
void monitorClose() {
1,602✔
890
  tscInfo("[monitor] tscMonitor close");
1,602!
891
  taosWLockLatch(&monitorLock);
1,602✔
892
  atomic_store_32(&monitorFlag, 1);
1,602✔
893
  tscMonitorStop();
1,602✔
894
  sendAllCounter();
1,602✔
895
  taosHashCleanup(monitorCounterHash);
1,602✔
896
  taosHashCleanup(monitorSlowLogHash);
1,602✔
897
  taosTmrCleanUp(monitorTimer);
1,602✔
898
  taosCloseQueue(monitorQueue);
1,602✔
899
  if (tsem2_destroy(&monitorSem) != 0) {
1,602!
900
    tscError("failed to destroy semaphore");
×
901
  }
902
  taosWUnLockLatch(&monitorLock);
1,602✔
903
}
1,602✔
904

905
int32_t monitorPutData2MonitorQueue(MonitorSlowLogData data) {
1,583✔
906
  int32_t             code = 0;
1,583✔
907
  MonitorSlowLogData* slowLogData = NULL;
1,583✔
908

909
  if (atomic_load_32(&monitorFlag) == 1) {
1,583!
UNCOV
910
    tscError("[monitor] slow log thread is exiting");
×
UNCOV
911
    return -1;
×
912
  }
913

914
  code = taosAllocateQitem(sizeof(MonitorSlowLogData), DEF_QITEM, 0, (void**)&slowLogData);
1,583✔
915
  if (code) {
1,583!
916
    tscError("[monitor] failed to allocate slow log data");
×
917
    return code;
×
918
  }
919
  *slowLogData = data;
1,583✔
920
  tscDebug("[monitor] write slow log to queue, clusterId:%" PRIx64 " type:%s, data:%s", slowLogData->clusterId,
1,583✔
921
           queueTypeStr[slowLogData->type], slowLogData->data);
922
  if (taosWriteQitem(monitorQueue, slowLogData) == 0) {
1,583!
923
    if (tsem2_post(&monitorSem) != 0) {
1,583!
924
      tscError("failed to post semaphore");
×
925
    }
926
  } else {
927
    if (taosCloseFile(&(slowLogData->pFile)) != 0) {
×
928
      tscError("failed to close file:%p", slowLogData->pFile);
×
929
    }
930
    monitorFreeSlowLogData(slowLogData);
×
931
    taosFreeQitem(slowLogData);
×
932
  }
933
  return 0;
1,583✔
934
}
935

936
int32_t reportCB(void* param, SDataBuf* pMsg, int32_t code) {
1,666✔
937
  taosMemoryFree(pMsg->pData);
1,666✔
938
  taosMemoryFree(pMsg->pEpSet);
1,666✔
939
  tscDebug("[del report]delete reportCB code:%d", code);
1,666✔
940
  return 0;
1,666✔
941
}
942

943
int32_t senAuditInfo(STscObj* pTscObj, void* pReq, int32_t len, uint64_t requestId) {
1,666✔
944
  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
1,666✔
945
  if (sendInfo == NULL) {
1,666!
946
    tscError("[del report]failed to allocate memory for sendInfo");
×
947
    return terrno;
×
948
  }
949

950
  sendInfo->msgInfo = (SDataBuf){.pData = pReq, .len = len, .handle = NULL};
1,666✔
951

952
  sendInfo->requestId = requestId;
1,666✔
953
  sendInfo->requestObjRefId = 0;
1,666✔
954
  sendInfo->param = NULL;
1,666✔
955
  sendInfo->fp = reportCB;
1,666✔
956
  sendInfo->msgType = TDMT_MND_AUDIT;
1,666✔
957

958
  SEpSet epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
1,666✔
959

960
  int32_t code = asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo);
1,666✔
961
  if (code != 0) {
1,666!
962
    tscError("[del report]failed to send msg to server, code:%d", code);
×
963
    taosMemoryFree(sendInfo);
×
964
    return code;
×
965
  }
966
  return TSDB_CODE_SUCCESS;
1,666✔
967
}
968

969
static void reportDeleteSql(SRequestObj* pRequest) {
1,666✔
970
  SDeleteStmt* pStmt = (SDeleteStmt*)pRequest->pQuery->pRoot;
1,666✔
971
  STscObj*     pTscObj = pRequest->pTscObj;
1,666✔
972

973
  if (pTscObj == NULL || pTscObj->pAppInfo == NULL) {
1,666!
974
    tscError("[del report]invalid tsc obj");
×
975
    return;
×
976
  }
977

978
  if(pTscObj->pAppInfo->serverCfg.enableAuditDelete == 0) {
1,666!
979
    tscDebug("[del report]audit delete is disabled");
×
980
    return;
×
981
  }
982

983
  if (pRequest->code != TSDB_CODE_SUCCESS) {
1,666!
984
    tscDebug("[del report]delete request result code:%d", pRequest->code);
×
985
    return;
×
986
  }
987

988
  if (nodeType(pStmt->pFromTable) != QUERY_NODE_REAL_TABLE) {
1,666!
989
    tscError("[del report]invalid from table node type:%d", nodeType(pStmt->pFromTable));
×
990
    return;
×
991
  }
992

993
  SRealTableNode* pTable = (SRealTableNode*)pStmt->pFromTable;
1,666✔
994
  SAuditReq       req;
995
  req.pSql = pRequest->sqlstr;
1,666✔
996
  req.sqlLen = pRequest->sqlLen;
1,666✔
997
  TAOS_UNUSED(tsnprintf(req.table, TSDB_TABLE_NAME_LEN, "%s", pTable->table.tableName));
1,666✔
998
  TAOS_UNUSED(tsnprintf(req.db, TSDB_DB_FNAME_LEN, "%s", pTable->table.dbName));
1,666✔
999
  TAOS_UNUSED(tsnprintf(req.operation, AUDIT_OPERATION_LEN, "delete"));
1,666✔
1000
  int32_t tlen = tSerializeSAuditReq(NULL, 0, &req);
1,666✔
1001
  void*   pReq = taosMemoryCalloc(1, tlen);
1,666✔
1002
  if (pReq == NULL) {
1,666!
1003
    tscError("[del report]failed to allocate memory for req");
×
1004
    return;
×
1005
  }
1006

1007
  if (tSerializeSAuditReq(pReq, tlen, &req) < 0) {
1,666!
1008
    tscError("[del report]failed to serialize req");
×
1009
    taosMemoryFree(pReq);
×
1010
    return;
×
1011
  }
1012

1013
  int32_t code = senAuditInfo(pRequest->pTscObj, pReq, tlen, pRequest->requestId);
1,666✔
1014
  if (code != 0) {
1,666!
1015
    tscError("[del report]failed to send audit info, code:%d", code);
×
1016
    taosMemoryFree(pReq);
×
1017
    return;
×
1018
  }
1019
  tscDebug("[del report]delete data, sql:%s", req.pSql);
1,666✔
1020
}
1021

1022
void clientOperateReport(SRequestObj* pRequest) {
1,359,140✔
1023
  if (pRequest == NULL || pRequest->pQuery == NULL || pRequest->pQuery->pRoot == NULL) {
1,359,140!
1024
    tscError("[del report]invalid request");
80,366!
1025
    return;
80,368✔
1026
  }
1027

1028
  if (QUERY_NODE_DELETE_STMT == nodeType(pRequest->pQuery->pRoot)) {
1,278,774✔
1029
    reportDeleteSql(pRequest);
1,666✔
1030
  }
1031
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc