• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3548

04 Dec 2024 01:03PM UTC coverage: 59.846% (-0.8%) from 60.691%
#3548

push

travis-ci

web-flow
Merge pull request #29033 from taosdata/fix/calculate-vnode-memory-used

fix/calculate-vnode-memory-used

118484 of 254183 branches covered (46.61%)

Branch coverage included in aggregate %.

199691 of 277471 relevant lines covered (71.97%)

18794141.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.73
/source/libs/executor/src/dataDeleter.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "dataSinkInt.h"
17
#include "dataSinkMgt.h"
18
#include "executorInt.h"
19
#include "planner.h"
20
#include "tcompression.h"
21
#include "tdatablock.h"
22
#include "tglobal.h"
23
#include "tqueue.h"
24

25
extern SDataSinkStat gDataSinkStat;
26

27
typedef struct SDataDeleterBuf {
28
  int32_t useSize;
29
  int32_t allocSize;
30
  char*   pData;
31
} SDataDeleterBuf;
32

33
typedef struct SDataCacheEntry {
34
  int32_t dataLen;
35
  int32_t numOfRows;
36
  int32_t numOfCols;
37
  int8_t  compressed;
38
  char    data[];
39
} SDataCacheEntry;
40

41
typedef struct SDataDeleterHandle {
42
  SDataSinkHandle     sink;
43
  SDataSinkManager*   pManager;
44
  SDataBlockDescNode* pSchema;
45
  SDataDeleterNode*   pDeleter;
46
  SDeleterParam*      pParam;
47
  STaosQueue*         pDataBlocks;
48
  SDataDeleterBuf     nextOutput;
49
  int32_t             status;
50
  bool                queryEnd;
51
  uint64_t            useconds;
52
  uint64_t            cachedSize;
53
  TdThreadMutex       mutex;
54
} SDataDeleterHandle;
55

56
static int32_t toDataCacheEntry(SDataDeleterHandle* pHandle, const SInputData* pInput, SDataDeleterBuf* pBuf) {
58,883✔
57
  int32_t numOfCols = LIST_LENGTH(pHandle->pSchema->pSlots);
58,883!
58

59
  SDataCacheEntry* pEntry = (SDataCacheEntry*)pBuf->pData;
58,883✔
60
  pEntry->compressed = 0;
58,883✔
61
  pEntry->numOfRows = pInput->pData->info.rows;
58,883✔
62
  pEntry->numOfCols = taosArrayGetSize(pInput->pData->pDataBlock);
58,883✔
63
  pEntry->dataLen = sizeof(SDeleterRes);
58,883✔
64

65
  pBuf->useSize = sizeof(SDataCacheEntry);
58,883✔
66

67
  SColumnInfoData* pColRes = (SColumnInfoData*)taosArrayGet(pInput->pData->pDataBlock, 0);
58,883✔
68
  if (NULL == pColRes) {
58,883!
69
    QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
70
  }
71
  SColumnInfoData* pColSKey = (SColumnInfoData*)taosArrayGet(pInput->pData->pDataBlock, 1);
58,883✔
72
  if (NULL == pColSKey) {
58,883!
73
    QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
74
  }
75
  SColumnInfoData* pColEKey = (SColumnInfoData*)taosArrayGet(pInput->pData->pDataBlock, 2);
58,883✔
76
  if (NULL == pColEKey) {
58,883!
77
    QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
78
  }
79

80
  SDeleterRes* pRes = (SDeleterRes*)pEntry->data;
58,883✔
81
  pRes->suid = pHandle->pParam->suid;
58,883✔
82
  pRes->uidList = pHandle->pParam->pUidList;
58,883✔
83
  TAOS_STRCPY(pRes->tableName, pHandle->pDeleter->tableFName);
58,883✔
84
  TAOS_STRCPY(pRes->tsColName, pHandle->pDeleter->tsColName);
58,883✔
85
  pRes->affectedRows = *(int64_t*)pColRes->pData;
58,883✔
86

87
  if (pRes->affectedRows) {
58,883✔
88
    pRes->skey = *(int64_t*)pColSKey->pData;
54,344✔
89
    pRes->ekey = *(int64_t*)pColEKey->pData;
54,344✔
90
    if (pRes->skey > pRes->ekey) {
54,344!
91
      qError("data delter skey:%" PRId64 " is bigger than ekey:%" PRId64, pRes->skey, pRes->ekey);
×
92
      QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
93
    }
94
  } else {
95
    pRes->skey = pHandle->pDeleter->deleteTimeRange.skey;
4,539✔
96
    pRes->ekey = pHandle->pDeleter->deleteTimeRange.ekey;
4,539✔
97
  }
98

99
  qDebug("delete %" PRId64 " rows, from %" PRId64 " to %" PRId64 "", pRes->affectedRows, pRes->skey, pRes->ekey);
58,883✔
100

101
  pBuf->useSize += pEntry->dataLen;
58,883✔
102

103
  (void)atomic_add_fetch_64(&pHandle->cachedSize, pEntry->dataLen);
58,883✔
104
  (void)atomic_add_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen);
58,883✔
105

106
  return TSDB_CODE_SUCCESS;
58,883✔
107
}
108

109
static int32_t allocBuf(SDataDeleterHandle* pDeleter, const SInputData* pInput, SDataDeleterBuf* pBuf) {
58,882✔
110
  uint32_t capacity = pDeleter->pManager->cfg.maxDataBlockNumPerQuery;
58,882✔
111
  if (taosQueueItemSize(pDeleter->pDataBlocks) > capacity) {
58,882!
112
    qError("SinkNode queue is full, no capacity, max:%d, current:%d, no capacity", capacity,
×
113
           taosQueueItemSize(pDeleter->pDataBlocks));
114
    return TSDB_CODE_OUT_OF_MEMORY;
×
115
  }
116

117
  pBuf->allocSize = sizeof(SDataCacheEntry) + sizeof(SDeleterRes);
58,882✔
118

119
  pBuf->pData = taosMemoryMalloc(pBuf->allocSize);
58,882✔
120
  if (pBuf->pData == NULL) {
58,883!
121
    qError("SinkNode failed to malloc memory, size:%d, code:%d", pBuf->allocSize, TAOS_SYSTEM_ERROR(errno));
×
122
    return terrno;
×
123
  }
124

125
  return TSDB_CODE_SUCCESS;
58,883✔
126
}
127

128
static int32_t updateStatus(SDataDeleterHandle* pDeleter) {
117,763✔
129
  (void)taosThreadMutexLock(&pDeleter->mutex);
117,763✔
130
  int32_t blockNums = taosQueueItemSize(pDeleter->pDataBlocks);
117,766✔
131
  int32_t status =
117,766✔
132
      (0 == blockNums ? DS_BUF_EMPTY
133
                      : (blockNums < pDeleter->pManager->cfg.maxDataBlockNumPerQuery ? DS_BUF_LOW : DS_BUF_FULL));
117,766!
134
  pDeleter->status = status;
117,766✔
135
  (void)taosThreadMutexUnlock(&pDeleter->mutex);
117,766✔
136
  
137
  return status;
117,766✔
138
}
139

140
static int32_t getStatus(SDataDeleterHandle* pDeleter) {
×
141
  (void)taosThreadMutexLock(&pDeleter->mutex);
×
142
  int32_t status = pDeleter->status;
×
143
  (void)taosThreadMutexUnlock(&pDeleter->mutex);
×
144
  
145
  return status;
×
146
}
147

148
static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) {
58,882✔
149
  SDataDeleterHandle* pDeleter = (SDataDeleterHandle*)pHandle;
58,882✔
150
  SDataDeleterBuf*    pBuf = NULL;
58,882✔
151

152
  int32_t code = taosAllocateQitem(sizeof(SDataDeleterBuf), DEF_QITEM, 0, (void**)&pBuf);
58,882✔
153
  if (code) {
58,883!
154
    return code;
×
155
  }
156

157
  code = allocBuf(pDeleter, pInput, pBuf);
58,883✔
158
  if (code) {
58,883!
159
    taosFreeQitem(pBuf);
×
160
    return code;
×
161
  }
162

163
  QRY_ERR_JRET(toDataCacheEntry(pDeleter, pInput, pBuf));
58,883!
164
  QRY_ERR_JRET(taosWriteQitem(pDeleter->pDataBlocks, pBuf));
58,883!
165
  *pContinue = (DS_BUF_LOW == updateStatus(pDeleter) ? true : false);
58,883✔
166

167
  return TSDB_CODE_SUCCESS;
58,882✔
168

169
_return:
×
170

171
  taosFreeQitem(pBuf);
×
172
  
173
  return code;
×
174
}
175

176
static void endPut(struct SDataSinkHandle* pHandle, uint64_t useconds) {
58,890✔
177
  SDataDeleterHandle* pDeleter = (SDataDeleterHandle*)pHandle;
58,890✔
178
  (void)taosThreadMutexLock(&pDeleter->mutex);
58,890✔
179
  pDeleter->queryEnd = true;
58,891✔
180
  pDeleter->useconds = useconds;
58,891✔
181
  (void)taosThreadMutexUnlock(&pDeleter->mutex);
58,891✔
182
}
58,891✔
183

184
static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, int64_t* pRawLen, bool* pQueryEnd) {
58,890✔
185
  SDataDeleterHandle* pDeleter = (SDataDeleterHandle*)pHandle;
58,890✔
186
  if (taosQueueEmpty(pDeleter->pDataBlocks)) {
58,890✔
187
    *pQueryEnd = pDeleter->queryEnd;
8✔
188
    *pLen = 0;
8✔
189
    return;
8✔
190
  }
191

192
  SDataDeleterBuf* pBuf = NULL;
58,883✔
193
  taosReadQitem(pDeleter->pDataBlocks, (void**)&pBuf);
58,883✔
194
  if (pBuf != NULL) {
58,883!
195
    TAOS_MEMCPY(&pDeleter->nextOutput, pBuf, sizeof(SDataDeleterBuf));
58,883✔
196
    taosFreeQitem(pBuf);
58,883✔
197
  }
198

199
  SDataCacheEntry* pEntry = (SDataCacheEntry*)pDeleter->nextOutput.pData;
58,883✔
200
  *pLen = pEntry->dataLen;
58,883✔
201
  *pRawLen = pEntry->dataLen;
58,883✔
202

203
  *pQueryEnd = pDeleter->queryEnd;
58,883✔
204
  qDebug("got data len %" PRId64 ", row num %d in sink", *pLen,
58,883✔
205
         ((SDataCacheEntry*)(pDeleter->nextOutput.pData))->numOfRows);
206
}
207

208
static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
58,881✔
209
  SDataDeleterHandle* pDeleter = (SDataDeleterHandle*)pHandle;
58,881✔
210
  if (NULL == pDeleter->nextOutput.pData) {
58,881!
211
    if (!pDeleter->queryEnd) {
×
212
      qError("empty res while query not end in data deleter");
×
213
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
214
    }
215
    pOutput->useconds = pDeleter->useconds;
×
216
    pOutput->precision = pDeleter->pSchema->precision;
×
217
    pOutput->bufStatus = DS_BUF_EMPTY;
×
218
    pOutput->queryEnd = pDeleter->queryEnd;
×
219
    return TSDB_CODE_SUCCESS;
×
220
  }
221

222
  SDataCacheEntry* pEntry = (SDataCacheEntry*)(pDeleter->nextOutput.pData);
58,881✔
223
  TAOS_MEMCPY(pOutput->pData, pEntry->data, pEntry->dataLen);
58,881✔
224
  pDeleter->pParam->pUidList = NULL;
58,881✔
225
  pOutput->numOfRows = pEntry->numOfRows;
58,881✔
226
  pOutput->numOfCols = pEntry->numOfCols;
58,881✔
227
  pOutput->compressed = pEntry->compressed;
58,881✔
228

229
  (void)atomic_sub_fetch_64(&pDeleter->cachedSize, pEntry->dataLen);
58,881✔
230
  (void)atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen);
58,883✔
231

232
  taosMemoryFreeClear(pDeleter->nextOutput.pData);  // todo persistent
58,882✔
233
  pOutput->bufStatus = updateStatus(pDeleter);
58,882✔
234
  (void)taosThreadMutexLock(&pDeleter->mutex);
58,883✔
235
  pOutput->queryEnd = pDeleter->queryEnd;
58,883✔
236
  pOutput->useconds = pDeleter->useconds;
58,883✔
237
  pOutput->precision = pDeleter->pSchema->precision;
58,883✔
238
  (void)taosThreadMutexUnlock(&pDeleter->mutex);
58,883✔
239

240
  return TSDB_CODE_SUCCESS;
58,883✔
241
}
242

243
static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
58,898✔
244
  SDataDeleterHandle* pDeleter = (SDataDeleterHandle*)pHandle;
58,898✔
245
  (void)atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pDeleter->cachedSize);
58,898✔
246
  taosMemoryFreeClear(pDeleter->nextOutput.pData);
58,899!
247
  taosArrayDestroy(pDeleter->pParam->pUidList);
58,899✔
248
  taosMemoryFree(pDeleter->pParam);
58,899✔
249
  while (!taosQueueEmpty(pDeleter->pDataBlocks)) {
58,898!
250
    SDataDeleterBuf* pBuf = NULL;
×
251
    taosReadQitem(pDeleter->pDataBlocks, (void**)&pBuf);
×
252

253
    if (pBuf != NULL) {
×
254
      taosMemoryFreeClear(pBuf->pData);
×
255
      taosFreeQitem(pBuf);
×
256
    }
257
  }
258
  taosCloseQueue(pDeleter->pDataBlocks);
58,897✔
259
  (void)taosThreadMutexDestroy(&pDeleter->mutex);
58,898✔
260

261
  taosMemoryFree(pDeleter->pManager);
58,898✔
262

263
  return TSDB_CODE_SUCCESS;
58,898✔
264
}
265

266
static int32_t getCacheSize(struct SDataSinkHandle* pHandle, uint64_t* size) {
×
267
  SDataDeleterHandle* pDispatcher = (SDataDeleterHandle*)pHandle;
×
268

269
  *size = atomic_load_64(&pDispatcher->cachedSize);
×
270
  return TSDB_CODE_SUCCESS;
×
271
}
272

273
int32_t createDataDeleter(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle,
58,891✔
274
                          void* pParam) {
275
  int32_t code = TSDB_CODE_SUCCESS;
58,891✔
276
  if (pParam == NULL) {
58,891!
277
    code = TSDB_CODE_QRY_INVALID_INPUT;
×
278
    qError("invalid input param in creating data deleter, code%s", tstrerror(code));
×
279
    goto _end;
×
280
  }
281

282
  SDeleterParam* pDeleterParam = (SDeleterParam*)pParam;
58,891✔
283

284
  SDataDeleterHandle* deleter = taosMemoryCalloc(1, sizeof(SDataDeleterHandle));
58,891✔
285
  if (NULL == deleter) {
58,895!
286
    code = terrno;
×
287
    taosArrayDestroy(pDeleterParam->pUidList);
×
288
    taosMemoryFree(pParam);
×
289
    goto _end;
×
290
  }
291

292
  SDataDeleterNode* pDeleterNode = (SDataDeleterNode*)pDataSink;
58,895✔
293
  deleter->sink.fPut = putDataBlock;
58,895✔
294
  deleter->sink.fEndPut = endPut;
58,895✔
295
  deleter->sink.fGetLen = getDataLength;
58,895✔
296
  deleter->sink.fGetData = getDataBlock;
58,895✔
297
  deleter->sink.fDestroy = destroyDataSinker;
58,895✔
298
  deleter->sink.fGetCacheSize = getCacheSize;
58,895✔
299
  deleter->pManager = pManager;
58,895✔
300
  deleter->pDeleter = pDeleterNode;
58,895✔
301
  deleter->pSchema = pDataSink->pInputDataBlockDesc;
58,895✔
302

303
  deleter->pParam = pParam;
58,895✔
304
  deleter->status = DS_BUF_EMPTY;
58,895✔
305
  deleter->queryEnd = false;
58,895✔
306
  code = taosOpenQueue(&deleter->pDataBlocks);
58,895✔
307
  if (code) {
58,892!
308
    goto _end;
×
309
  }
310
  code = taosThreadMutexInit(&deleter->mutex, NULL);
58,892✔
311
  if (code) {
58,896!
312
    goto _end;
×
313
  }
314

315
  *pHandle = deleter;
58,896✔
316
  return code;
58,896✔
317

318
_end:
×
319

320
  if (deleter != NULL) {
×
321
    (void)destroyDataSinker((SDataSinkHandle*)deleter);
×
322
    taosMemoryFree(deleter);
×
323
  } else {
324
    taosMemoryFree(pManager);
×
325
  }
326
  
327
  return code;
×
328
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc