• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3533

20 Nov 2024 07:11AM UTC coverage: 58.848% (-1.9%) from 60.78%
#3533

push

travis-ci

web-flow
Merge pull request #28823 from taosdata/fix/3.0/TD-32587

fix:[TD-32587]fix stmt segmentation fault

115578 of 252434 branches covered (45.79%)

Branch coverage included in aggregate %.

1 of 4 new or added lines in 1 file covered. (25.0%)

8038 existing lines in 233 files now uncovered.

194926 of 275199 relevant lines covered (70.83%)

1494459.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

58.91
/source/libs/executor/src/dataDeleter.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "dataSinkInt.h"
17
#include "dataSinkMgt.h"
18
#include "executorInt.h"
19
#include "planner.h"
20
#include "tcompression.h"
21
#include "tdatablock.h"
22
#include "tglobal.h"
23
#include "tqueue.h"
24

25
extern SDataSinkStat gDataSinkStat;
26

27
/* One queued output buffer of the deleter sink. */
typedef struct SDataDeleterBuf {
  int32_t useSize;    /* bytes in use: SDataCacheEntry header plus its payload */
  int32_t allocSize;  /* bytes allocated for pData */
  char*   pData;      /* heap buffer, interpreted as an SDataCacheEntry */
} SDataDeleterBuf;
32

33
/* Header placed at the start of every SDataDeleterBuf::pData buffer. */
typedef struct SDataCacheEntry {
  int32_t dataLen;     /* length in bytes of the payload stored in data[] */
  int32_t numOfRows;   /* row count taken from the input block */
  int32_t numOfCols;   /* number of columns in the input block */
  int8_t  compressed;  /* always written as 0 by this sink */
  char    data[];      /* payload; this sink stores a single SDeleterRes here */
} SDataCacheEntry;
40

41
typedef struct SDataDeleterHandle {
42
  SDataSinkHandle     sink;
43
  SDataSinkManager*   pManager;
44
  SDataBlockDescNode* pSchema;
45
  SDataDeleterNode*   pDeleter;
46
  SDeleterParam*      pParam;
47
  STaosQueue*         pDataBlocks;
48
  SDataDeleterBuf     nextOutput;
49
  int32_t             status;
50
  bool                queryEnd;
51
  uint64_t            useconds;
52
  uint64_t            cachedSize;
53
  TdThreadMutex       mutex;
54
} SDataDeleterHandle;
55

56
/**
 * Serialize the result of one delete operation into the buffer owned by pBuf.
 *
 * pBuf->pData is written as an SDataCacheEntry header followed by a single
 * SDeleterRes payload, so it must already be allocated with room for both.
 *
 * @param pHandle  deleter sink; supplies the deleter node and the caller parameters
 * @param pInput   input block; columns 0, 1 and 2 are read as the affected-row count,
 *                 the start key and the end key respectively
 * @param pBuf     destination buffer; useSize is set to header size plus payload size
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when one of the
 *         three columns is missing or the reported start key exceeds the end key
 */
static int32_t toDataCacheEntry(SDataDeleterHandle* pHandle, const SInputData* pInput, SDataDeleterBuf* pBuf) {
  SDataCacheEntry* pEntry = (SDataCacheEntry*)pBuf->pData;
  pEntry->compressed = 0;
  pEntry->numOfRows = pInput->pData->info.rows;
  pEntry->numOfCols = taosArrayGetSize(pInput->pData->pDataBlock);
  pEntry->dataLen = sizeof(SDeleterRes);

  pBuf->useSize = sizeof(SDataCacheEntry);

  SColumnInfoData* pColRes = (SColumnInfoData*)taosArrayGet(pInput->pData->pDataBlock, 0);
  if (NULL == pColRes) {
    QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }
  SColumnInfoData* pColSKey = (SColumnInfoData*)taosArrayGet(pInput->pData->pDataBlock, 1);
  if (NULL == pColSKey) {
    QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }
  SColumnInfoData* pColEKey = (SColumnInfoData*)taosArrayGet(pInput->pData->pDataBlock, 2);
  if (NULL == pColEKey) {
    QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }

  SDeleterRes* pRes = (SDeleterRes*)pEntry->data;
  pRes->suid = pHandle->pParam->suid;
  // The uid list pointer is shared with pParam, not copied.
  pRes->uidList = pHandle->pParam->pUidList;
  TAOS_STRCPY(pRes->tableName, pHandle->pDeleter->tableFName);
  TAOS_STRCPY(pRes->tsColName, pHandle->pDeleter->tsColName);
  pRes->affectedRows = *(int64_t*)pColRes->pData;

  if (pRes->affectedRows) {
    // Rows were affected: take the actual key range from the input block.
    pRes->skey = *(int64_t*)pColSKey->pData;
    pRes->ekey = *(int64_t*)pColEKey->pData;
    if (pRes->skey > pRes->ekey) {
      qError("data delter skey:%" PRId64 " is bigger than ekey:%" PRId64, pRes->skey, pRes->ekey);
      QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
    }
  } else {
    // Nothing was affected: fall back to the time range requested by the plan node.
    pRes->skey = pHandle->pDeleter->deleteTimeRange.skey;
    pRes->ekey = pHandle->pDeleter->deleteTimeRange.ekey;
  }

  qDebug("delete %" PRId64 " rows, from %" PRId64 " to %" PRId64 "", pRes->affectedRows, pRes->skey, pRes->ekey);

  pBuf->useSize += pEntry->dataLen;

  // Account for the payload only once the entry is known to be valid.
  (void)atomic_add_fetch_64(&pHandle->cachedSize, pEntry->dataLen);
  (void)atomic_add_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen);

  return TSDB_CODE_SUCCESS;
}
108

109
/**
 * Allocate the data buffer of a queue item, sized for one SDataCacheEntry header
 * plus one SDeleterRes payload.
 *
 * @param pDeleter  deleter sink whose queue occupancy is checked first
 * @param pInput    not used by this function
 * @param pBuf      receives allocSize and pData
 * @return TSDB_CODE_SUCCESS; TSDB_CODE_OUT_OF_MEMORY when the queue already holds more
 *         items than the configured maximum; terrno when the allocation fails
 */
static int32_t allocBuf(SDataDeleterHandle* pDeleter, const SInputData* pInput, SDataDeleterBuf* pBuf) {
  const uint32_t maxBlocks = pDeleter->pManager->cfg.maxDataBlockNumPerQuery;

  // Refuse new buffers once the queue has grown past its configured limit.
  if (taosQueueItemSize(pDeleter->pDataBlocks) > maxBlocks) {
    qError("SinkNode queue is full, no capacity, max:%d, current:%d, no capacity", maxBlocks,
           taosQueueItemSize(pDeleter->pDataBlocks));
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pBuf->allocSize = sizeof(SDataCacheEntry) + sizeof(SDeleterRes);
  pBuf->pData = taosMemoryMalloc(pBuf->allocSize);
  if (NULL != pBuf->pData) {
    return TSDB_CODE_SUCCESS;
  }

  qError("SinkNode failed to malloc memory, size:%d, code:%d", pBuf->allocSize, TAOS_SYSTEM_ERROR(errno));
  return terrno;
}
127

128
/**
 * Recompute the buffer status from the current queue length and store it in the handle.
 *
 * @param pDeleter  deleter sink
 * @return DS_BUF_EMPTY when the queue is empty, DS_BUF_LOW while it holds fewer items
 *         than the configured maximum, DS_BUF_FULL otherwise
 */
static int32_t updateStatus(SDataDeleterHandle* pDeleter) {
  int32_t newStatus = DS_BUF_FULL;

  (void)taosThreadMutexLock(&pDeleter->mutex);
  int32_t queued = taosQueueItemSize(pDeleter->pDataBlocks);
  if (0 == queued) {
    newStatus = DS_BUF_EMPTY;
  } else if (queued < pDeleter->pManager->cfg.maxDataBlockNumPerQuery) {
    newStatus = DS_BUF_LOW;
  }
  pDeleter->status = newStatus;
  (void)taosThreadMutexUnlock(&pDeleter->mutex);

  return newStatus;
}
139

140
/**
 * Read the status last stored by updateStatus, under the handle mutex.
 *
 * @param pDeleter  deleter sink
 * @return the stored DS_BUF_* value
 */
static int32_t getStatus(SDataDeleterHandle* pDeleter) {
  int32_t snapshot;

  (void)taosThreadMutexLock(&pDeleter->mutex);
  snapshot = pDeleter->status;
  (void)taosThreadMutexUnlock(&pDeleter->mutex);

  return snapshot;
}
147

148
/**
 * Sink "put" callback: turn one input block into a cached delete result and queue it.
 *
 * @param pHandle    sink handle; actually an SDataDeleterHandle
 * @param pInput     input block to convert
 * @param pContinue  on success, set to true only when the buffer status after queuing
 *                   is DS_BUF_LOW
 * @return TSDB_CODE_SUCCESS, or the error code of the step that failed; on failure
 *         nothing is queued and everything allocated here is released
 */
static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) {
  SDataDeleterHandle* pDeleter = (SDataDeleterHandle*)pHandle;
  SDataDeleterBuf*    pBuf = NULL;

  int32_t code = taosAllocateQitem(sizeof(SDataDeleterBuf), DEF_QITEM, 0, (void**)&pBuf);
  if (code) {
    return code;
  }

  code = allocBuf(pDeleter, pInput, pBuf);
  if (code) {
    // allocBuf did not leave a data buffer behind, so only the queue item is released.
    taosFreeQitem(pBuf);
    return code;
  }

  QRY_ERR_JRET(toDataCacheEntry(pDeleter, pInput, pBuf));
  QRY_ERR_JRET(taosWriteQitem(pDeleter->pDataBlocks, pBuf));
  *pContinue = (DS_BUF_LOW == updateStatus(pDeleter) ? true : false);

  return TSDB_CODE_SUCCESS;

_return:

  // The item never reached the queue, so its data buffer must be released here as well;
  // freeing only the queue item would leak the buffer obtained in allocBuf.
  taosMemoryFreeClear(pBuf->pData);
  taosFreeQitem(pBuf);

  return code;
}
175

176
static void endPut(struct SDataSinkHandle* pHandle, uint64_t useconds) {
1,176✔
177
  SDataDeleterHandle* pDeleter = (SDataDeleterHandle*)pHandle;
1,176✔
178
  (void)taosThreadMutexLock(&pDeleter->mutex);
1,176✔
179
  pDeleter->queryEnd = true;
1,176✔
180
  pDeleter->useconds = useconds;
1,176✔
181
  (void)taosThreadMutexUnlock(&pDeleter->mutex);
1,176✔
182
}
1,176✔
183

184
/**
 * Sink "get length" callback: pop the next queued buffer into nextOutput and report
 * the length of its payload.
 *
 * @param pHandle    sink handle; actually an SDataDeleterHandle
 * @param pLen       receives the payload length, or 0 when nothing is available
 * @param pRawLen    receives the same value as pLen (this sink never compresses)
 * @param pQueryEnd  receives the current queryEnd flag
 *
 * NOTE(review): if this is called twice without an intervening getDataBlock, the
 * previously popped nextOutput buffer is overwritten without being freed — confirm
 * callers always pair the two calls.
 */
static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, int64_t* pRawLen, bool* pQueryEnd) {
  SDataDeleterHandle* pDeleter = (SDataDeleterHandle*)pHandle;
  if (taosQueueEmpty(pDeleter->pDataBlocks)) {
    *pQueryEnd = pDeleter->queryEnd;
    *pLen = 0;
    *pRawLen = 0;  // keep both lengths defined for the caller
    return;
  }

  SDataDeleterBuf* pBuf = NULL;
  taosReadQitem(pDeleter->pDataBlocks, (void**)&pBuf);
  if (pBuf != NULL) {
    // Take over the buffer descriptor; the queue item itself is no longer needed.
    TAOS_MEMCPY(&pDeleter->nextOutput, pBuf, sizeof(SDataDeleterBuf));
    taosFreeQitem(pBuf);
  }

  SDataCacheEntry* pEntry = (SDataCacheEntry*)pDeleter->nextOutput.pData;
  if (NULL == pEntry) {
    // Nothing was popped and no buffer is pending: report "no data" instead of
    // dereferencing a NULL entry.
    *pQueryEnd = pDeleter->queryEnd;
    *pLen = 0;
    *pRawLen = 0;
    return;
  }

  *pLen = pEntry->dataLen;
  *pRawLen = pEntry->dataLen;

  *pQueryEnd = pDeleter->queryEnd;
  qDebug("got data len %" PRId64 ", row num %d in sink", *pLen,
         ((SDataCacheEntry*)(pDeleter->nextOutput.pData))->numOfRows);
}
207

208
/**
 * Sink "get data" callback: hand the buffer previously popped by getDataLength to the caller.
 *
 * @param pHandle  sink handle; actually an SDataDeleterHandle
 * @param pOutput  receives the payload (copied into pOutput->pData) and the result metadata
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when no buffer is
 *         pending although the query has not ended
 */
static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
  SDataDeleterHandle* pSelf = (SDataDeleterHandle*)pHandle;
  SDataCacheEntry*    pCached = (SDataCacheEntry*)(pSelf->nextOutput.pData);

  if (NULL == pCached) {
    // No pending buffer is only legal once the producer has finished.
    if (!pSelf->queryEnd) {
      qError("empty res while query not end in data deleter");
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    }

    pOutput->useconds = pSelf->useconds;
    pOutput->precision = pSelf->pSchema->precision;
    pOutput->bufStatus = DS_BUF_EMPTY;
    pOutput->queryEnd = pSelf->queryEnd;
    return TSDB_CODE_SUCCESS;
  }

  int32_t payloadLen = pCached->dataLen;

  TAOS_MEMCPY(pOutput->pData, pCached->data, payloadLen);
  // The copied payload now carries the uid list pointer; clear it here so that
  // destroyDataSinker does not destroy the list as well.
  pSelf->pParam->pUidList = NULL;
  pOutput->numOfRows = pCached->numOfRows;
  pOutput->numOfCols = pCached->numOfCols;
  pOutput->compressed = pCached->compressed;

  (void)atomic_sub_fetch_64(&pSelf->cachedSize, payloadLen);
  (void)atomic_sub_fetch_64(&gDataSinkStat.cachedSize, payloadLen);

  taosMemoryFreeClear(pSelf->nextOutput.pData);  // todo persistent
  pOutput->bufStatus = updateStatus(pSelf);

  (void)taosThreadMutexLock(&pSelf->mutex);
  pOutput->queryEnd = pSelf->queryEnd;
  pOutput->useconds = pSelf->useconds;
  pOutput->precision = pSelf->pSchema->precision;
  (void)taosThreadMutexUnlock(&pSelf->mutex);

  return TSDB_CODE_SUCCESS;
}
242

243
/**
 * Sink "destroy" callback: release everything the handle owns except the handle itself.
 *
 * Frees the pending output buffer, the parameter block with its uid list, every buffer
 * still queued, the queue, the mutex and the sink manager, and removes this sink's
 * cached size from the global statistics.
 *
 * @param pHandle  sink handle; actually an SDataDeleterHandle
 * @return always TSDB_CODE_SUCCESS
 */
static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
  SDataDeleterHandle* pSelf = (SDataDeleterHandle*)pHandle;

  (void)atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pSelf->cachedSize);
  taosMemoryFreeClear(pSelf->nextOutput.pData);
  taosArrayDestroy(pSelf->pParam->pUidList);
  taosMemoryFree(pSelf->pParam);

  // Drain whatever was produced but never fetched.
  while (!taosQueueEmpty(pSelf->pDataBlocks)) {
    SDataDeleterBuf* pItem = NULL;
    taosReadQitem(pSelf->pDataBlocks, (void**)&pItem);
    if (NULL == pItem) {
      continue;
    }

    taosMemoryFreeClear(pItem->pData);
    taosFreeQitem(pItem);
  }

  taosCloseQueue(pSelf->pDataBlocks);
  (void)taosThreadMutexDestroy(&pSelf->mutex);

  taosMemoryFree(pSelf->pManager);

  return TSDB_CODE_SUCCESS;
}
265

266
static int32_t getCacheSize(struct SDataSinkHandle* pHandle, uint64_t* size) {
×
267
  SDataDeleterHandle* pDispatcher = (SDataDeleterHandle*)pHandle;
×
268

269
  *size = atomic_load_64(&pDispatcher->cachedSize);
×
270
  return TSDB_CODE_SUCCESS;
×
271
}
272

273
/**
 * Create a data-deleter sink and return it through pHandle.
 *
 * On every failure path pManager is freed, and pParam (with its uid list) is freed too
 * whenever it is non-NULL; on success both are owned by the new handle and released by
 * destroyDataSinker.
 *
 * @param pManager   sink manager stored in the handle
 * @param pDataSink  sink plan node; used as an SDataDeleterNode and for its input block description
 * @param pHandle    receives the new handle on success; left untouched on failure
 * @param pParam     SDeleterParam for this delete; must not be NULL
 * @return TSDB_CODE_SUCCESS; TSDB_CODE_QRY_INVALID_INPUT when pParam is NULL; terrno when
 *         the handle cannot be allocated; otherwise the error from opening the queue or
 *         initializing the mutex
 */
int32_t createDataDeleter(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle,
                          void* pParam) {
  int32_t code = TSDB_CODE_SUCCESS;
  // Initialized before the first `goto _end` so that the cleanup code never reads an
  // indeterminate pointer.
  SDataDeleterHandle* deleter = NULL;

  if (pParam == NULL) {
    code = TSDB_CODE_QRY_INVALID_INPUT;
    qError("invalid input param in creating data deleter, code%s", tstrerror(code));
    goto _end;
  }

  SDeleterParam* pDeleterParam = (SDeleterParam*)pParam;

  deleter = taosMemoryCalloc(1, sizeof(SDataDeleterHandle));
  if (NULL == deleter) {
    code = terrno;
    // No handle exists to take ownership, so the parameters are released here.
    taosArrayDestroy(pDeleterParam->pUidList);
    taosMemoryFree(pParam);
    goto _end;
  }

  SDataDeleterNode* pDeleterNode = (SDataDeleterNode*)pDataSink;
  deleter->sink.fPut = putDataBlock;
  deleter->sink.fEndPut = endPut;
  deleter->sink.fGetLen = getDataLength;
  deleter->sink.fGetData = getDataBlock;
  deleter->sink.fDestroy = destroyDataSinker;
  deleter->sink.fGetCacheSize = getCacheSize;
  deleter->pManager = pManager;
  deleter->pDeleter = pDeleterNode;
  deleter->pSchema = pDataSink->pInputDataBlockDesc;

  deleter->pParam = pParam;
  deleter->status = DS_BUF_EMPTY;
  deleter->queryEnd = false;
  code = taosOpenQueue(&deleter->pDataBlocks);
  if (code) {
    goto _end;
  }
  code = taosThreadMutexInit(&deleter->mutex, NULL);
  if (code) {
    goto _end;
  }

  *pHandle = deleter;
  return code;

_end:

  if (deleter != NULL) {
    // NOTE(review): when taosOpenQueue or taosThreadMutexInit failed, destroyDataSinker
    // runs with a NULL queue and/or a mutex that was only zero-filled — confirm the
    // queue and mutex helpers tolerate that.
    (void)destroyDataSinker((SDataSinkHandle*)deleter);
    taosMemoryFree(deleter);
  } else {
    taosMemoryFree(pManager);
  }

  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc