• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4761

28 Sep 2025 10:49AM UTC coverage: 57.837% (-1.0%) from 58.866%
#4761

push

travis-ci

web-flow
merge: set version (#33122)

136913 of 302095 branches covered (45.32%)

Branch coverage included in aggregate %.

207750 of 293830 relevant lines covered (70.7%)

5673932.16 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/libs/executor/src/imputationoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "functionMgt.h"
19
#include "operator.h"
20
#include "osMemPool.h"
21
#include "osMemory.h"
22
#include "querytask.h"
23
#include "tanalytics.h"
24
#include "taoserror.h"
25
#include "tcommon.h"
26
#include "tdatablock.h"
27
#include "tdef.h"
28
#include "thash.h"
29
#include "tjson.h"
30
#include "tmsg.h"
31

32
#ifdef USE_ANALYTICS
33

34
#define FREQ_STR "freq"
35

36
typedef struct {
37
  SArray*      blocks;  // SSDataBlock*
38
  uint64_t     groupId;
39
  int32_t      numOfRows;
40
  int32_t      numOfBlocks;
41
  int64_t      timeout;
42
  int8_t       wncheck;
43
  int32_t      targetSlot;
44
  int32_t      targetType;
45
  int32_t      tsSlot;
46
  int32_t      tsPrecision;
47
  int32_t      numOfCols;
48
  char         freq[64];         // frequency of data
49
  SAnalyticBuf analyBuf;
50
  STimeWindow  win;
51
} SImputationSupp;
52

53
typedef struct {
54
  SOptrBasicInfo  binfo;
55
  SExprSupp       scalarSup;
56
  char            algoName[TSDB_ANALYTIC_ALGO_NAME_LEN];
57
  char            algoUrl[TSDB_ANALYTIC_ALGO_URL_LEN];
58
  char*           options;
59
  SColumn         targetCol;
60
  int32_t         resTsSlot;
61
  int32_t         resTargetSlot;
62
  int32_t         resMarkSlot;
63
  SImputationSupp imputatSup;
64
} SImputationOperatorInfo;
65

66
static void    imputatDestroyOperatorInfo(void* param);
67
static int32_t imputationNext(SOperatorInfo* pOperator, SSDataBlock** ppRes);
68
static int32_t doImputation(SImputationOperatorInfo* pInfo, SExecTaskInfo* pTaskInfo);
69
static int32_t doCacheBlock(SImputationSupp* pSupp, SSDataBlock* pBlock, const char* id);
70
static int32_t doParseInput(SImputationOperatorInfo* pInfo, SImputationSupp* pSupp, SNodeList* pFuncs, const char* id);
71
static int32_t doSetResSlot(SImputationOperatorInfo* pInfo, SImputationSupp* pSupp, SExprSupp* pExprSup);
72
static int32_t doParseOption(SImputationOperatorInfo* pInfo, SImputationSupp* pSupp, const char* id);
73
static int32_t doCreateBuf(SImputationSupp* pSupp, const char* pId);
74
static int32_t estResultRowsAfterImputation(int32_t rows, int64_t skey, int64_t ekey, SImputationSupp* pSupp, const char* id);
75
static void    doInitOptions(SImputationSupp* pSupp);
76

77
/*
 * Build the imputation operator on top of `downstream`.
 *
 * Parses the IMPUTATION function parameters and options from the physical plan, opens the
 * request buffer and allocates the result block. On success *pOptrInfo receives the new
 * operator. On failure the partially built operator (and the downstream) is destroyed,
 * pTaskInfo->code is set and the error code is returned.
 */
int32_t createImputationOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode, SExecTaskInfo* pTaskInfo,
                                     SOperatorInfo** pOptrInfo) {
  QRY_PARAM_CHECK(pOptrInfo);

  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  int32_t                   num = 0;
  int32_t                   numOfExprs = 0;
  SExprInfo*                pExprInfo = NULL;
  const char*               id = GET_TASKID(pTaskInfo);
  SImputationFuncPhysiNode* pImputatNode = (SImputationFuncPhysiNode*)physiNode;
  SExprSupp*                pExprSup = NULL;
  SImputationSupp*          pSupp = NULL;

  SImputationOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SImputationOperatorInfo));
  SOperatorInfo*           pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pOperator == NULL || pInfo == NULL) {
    code = terrno;
    goto _error;
  }

  pSupp = &pInfo->imputatSup;
  pExprSup = &pOperator->exprSupp;

  // output expressions: IMPUTATION itself plus its companion row-ts / mark functions
  code = createExprInfo(pImputatNode->pFuncs, NULL, &pExprInfo, &numOfExprs);
  QUERY_CHECK_CODE(code, lino, _error);

  code = initExprSupp(pExprSup, pExprInfo, numOfExprs, &pTaskInfo->storageAPI.functionStore);
  QUERY_CHECK_CODE(code, lino, _error);

  if (pImputatNode->pExprs != NULL) {
    SExprInfo* pScalarExprInfo = NULL;
    code = createExprInfo(pImputatNode->pExprs, NULL, &pScalarExprInfo, &num);
    QUERY_CHECK_CODE(code, lino, _error);

    code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, num, &pTaskInfo->storageAPI.functionStore);
    QUERY_CHECK_CODE(code, lino, _error);
  }

  code = filterInitFromNode((SNode*)pImputatNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
                            pTaskInfo->pStreamRuntimeInfo);
  QUERY_CHECK_CODE(code, lino, _error);

  // defaults first, so that parsing below only overrides what the query specifies
  doInitOptions(pSupp);

  code = doParseInput(pInfo, pSupp, pImputatNode->pFuncs, id);
  QUERY_CHECK_CODE(code, lino, _error);

  code = doSetResSlot(pInfo, pSupp, pExprSup);
  QUERY_CHECK_CODE(code, lino, _error);

  code = doParseOption(pInfo, pSupp, id);
  QUERY_CHECK_CODE(code, lino, _error);

  // needs numOfCols/targetType, which doParseInput filled in
  code = doCreateBuf(pSupp, id);
  QUERY_CHECK_CODE(code, lino, _error);

  initResultSizeInfo(&pOperator->resultInfo, 4096);

  pInfo->binfo.pRes = createDataBlockFromDescNode(physiNode->pOutputDataBlockDesc);
  QUERY_CHECK_NULL(pInfo->binfo.pRes, code, lino, _error, terrno);

  setOperatorInfo(pOperator, "ImputationOperator", QUERY_NODE_PHYSICAL_PLAN_IMPUTATION_FUNC, false, OP_NOT_OPENED,
                  pInfo, pTaskInfo);
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, imputationNext, NULL, imputatDestroyOperatorInfo,
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);

  code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
  QUERY_CHECK_CODE(code, lino, _error);

  code = appendDownstream(pOperator, &downstream, 1);
  QUERY_CHECK_CODE(code, lino, _error);

  *pOptrInfo = pOperator;

  qDebug("%s imputation env is initialized, option:%s", id, pInfo->options);
  return TSDB_CODE_SUCCESS;

_error:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s %s failed at line %d since %s", id, __func__, lino, tstrerror(code));
  }

  if (pInfo != NULL) imputatDestroyOperatorInfo(pInfo);
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
  pTaskInfo->code = code;
  return code;
}
167

168
static int32_t imputationNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
×
169
  int32_t                  code = TSDB_CODE_SUCCESS;
×
170
  int32_t                  lino = 0;
×
171
  SImputationOperatorInfo* pInfo = pOperator->info;
×
172
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
×
173
  SOptrBasicInfo*          pBInfo = &pInfo->binfo;
×
174
  SImputationSupp*         pSupp = &pInfo->imputatSup;
×
175
  SSDataBlock*             pRes = pInfo->binfo.pRes;
×
176
  int64_t                  st = taosGetTimestampUs();
×
177
  int32_t                  numOfBlocks = taosArrayGetSize(pSupp->blocks);
×
178
  const char*              idstr = GET_TASKID(pTaskInfo);
×
179

180
  blockDataCleanup(pRes);
×
181

182
  while (1) {
×
183
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
×
184
    if (pBlock == NULL) {
×
185
      break;
×
186
    }
187

188
    if (pSupp->groupId == 0 || pSupp->groupId == pBlock->info.id.groupId) {
×
189
      pSupp->groupId = pBlock->info.id.groupId;
×
190
      numOfBlocks++;
×
191
      code = doCacheBlock(pSupp, pBlock, idstr);
×
192

193
      qDebug("group:%" PRId64 ", blocks:%d, rows:%" PRId64 ", total rows:%d", pSupp->groupId, numOfBlocks,
×
194
             pBlock->info.rows, pSupp->numOfRows);
195
      QUERY_CHECK_CODE(code, lino, _end);
×
196
    } else {
197
      qDebug("group:%" PRId64 ", read completed for new group coming, blocks:%d", pSupp->groupId, numOfBlocks);
×
198
      code = doImputation(pInfo, pTaskInfo);
×
199
      QUERY_CHECK_CODE(code, lino, _end);
×
200

201
      pSupp->groupId = pBlock->info.id.groupId;
×
202
      numOfBlocks = 1;
×
203
      qDebug("group:%" PRId64 ", new group, rows:%" PRId64 ", total rows:%d", pSupp->groupId, pBlock->info.rows,
×
204
             pSupp->numOfRows);
205
      code = doCacheBlock(pSupp, pBlock, idstr);
×
206
      QUERY_CHECK_CODE(code, lino, _end);
×
207
    }
208

209
    if (pRes->info.rows > 0) {
×
210
      (*ppRes) = pRes;
×
211
      qDebug("group:%" PRId64 ", return to upstream, blocks:%d", pRes->info.id.groupId, numOfBlocks);
×
212
      return code;
×
213
    }
214
  }
215

216
  if (numOfBlocks > 0) {
×
217
    qDebug("group:%" PRId64 ", read finish, blocks:%d", pInfo->imputatSup.groupId, numOfBlocks);
×
218
    code = doImputation(pInfo, pTaskInfo);
×
219
  }
220

221
  int64_t cost = taosGetTimestampUs() - st;
×
222
  qDebug("%s all groups finished, cost:%" PRId64 "us", idstr, cost);
×
223

224
_end:
×
225
  if (code != TSDB_CODE_SUCCESS) {
×
226
    qError("%s %s failed at line %d since %s", idstr, __func__, lino, tstrerror(code));
×
227
    pTaskInfo->code = code;
×
228
    T_LONG_JMP(pTaskInfo->env, code);
×
229
  }
230

231
  (*ppRes) = (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes;
×
232
  return code;
×
233
}
234

235
static void imputatDestroyOperatorInfo(void* param) {
×
236
  SImputationOperatorInfo* pInfo = (SImputationOperatorInfo*)param;
×
237
  if (pInfo == NULL) return;
×
238

239
  cleanupBasicInfo(&pInfo->binfo);
×
240
  cleanupExprSupp(&pInfo->scalarSup);
×
241

242
  for (int32_t i = 0; i < taosArrayGetSize(pInfo->imputatSup.blocks); ++i) {
×
243
    SSDataBlock* pBlock = taosArrayGetP(pInfo->imputatSup.blocks, i);
×
244
    blockDataDestroy(pBlock);
×
245
  }
246

247
  taosArrayDestroy(pInfo->imputatSup.blocks);
×
248
  taosMemoryFreeClear(param);
×
249
}
250

251
/*
 * Append the timestamp and target value of every row of pBlock to the request buffer of
 * the current group, and widen the group's time window accordingly.
 *
 * Returns:
 *   - TSDB_CODE_ANA_ANODE_TOO_MANY_ROWS when the group would exceed ANALY_IMPUTATION_INPUT_MAX_ROWS;
 *   - TSDB_CODE_ANA_INTERNAL_ERROR when the timestamp or target column is missing from a non-empty block;
 *   - otherwise the result of the buffer writes.
 *
 * NOTE(review): NULL values are forwarded as whatever bytes the column holds (no colDataIsNull
 * check) — confirm whether they should be skipped or flagged.
 */
static int32_t doCacheBlock(SImputationSupp* pSupp, SSDataBlock* pBlock, const char* id) {
  int32_t       code = TSDB_CODE_SUCCESS;
  SAnalyticBuf* pBuf = &pSupp->analyBuf;
  int64_t       total = (int64_t)pSupp->numOfRows + pBlock->info.rows;

  // include the incoming block so that the limit cannot be overshot by up to one block
  if (total > ANALY_IMPUTATION_INPUT_MAX_ROWS) {
    qError("%s too many rows for imputation, maximum allowed:%d, input:%" PRId64, id, ANALY_IMPUTATION_INPUT_MAX_ROWS,
           total);
    return TSDB_CODE_ANA_ANODE_TOO_MANY_ROWS;
  }

  pSupp->numOfBlocks++;
  qDebug("%s block:%d, %p rows:%" PRId64, id, pSupp->numOfBlocks, pBlock, pBlock->info.rows);

  if (pBlock->info.rows <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  // the column lookups do not depend on the row, so resolve them once per block
  SColumnInfoData* pValCol = taosArrayGet(pBlock->pDataBlock, pSupp->targetSlot);
  SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pSupp->tsSlot);
  if (pTsCol == NULL || pValCol == NULL) {
    // fail loudly instead of silently dropping the block's rows from the request
    qError("%s ts slot:%d or target slot:%d not found in input block", id, pSupp->tsSlot, pSupp->targetSlot);
    return TSDB_CODE_ANA_INTERNAL_ERROR;
  }

  int16_t valType = pValCol->info.type;

  for (int32_t j = 0; j < pBlock->info.rows; ++j) {
    int32_t index = 0;
    int64_t ts = ((TSKEY*)pTsCol->pData)[j];
    char*   val = colDataGetData(pValCol, j);

    pSupp->win.skey = MIN(pSupp->win.skey, ts);
    pSupp->win.ekey = MAX(pSupp->win.ekey, ts);

    pSupp->numOfRows++;

    // write the primary time stamp column data
    code = taosAnalyBufWriteColData(pBuf, index++, TSDB_DATA_TYPE_TIMESTAMP, &ts);
    if (TSDB_CODE_SUCCESS != code) {
      qError("%s failed to write ts in buf, code:%s", id, tstrerror(code));
      return code;
    }

    // write the main column for imputation
    code = taosAnalyBufWriteColData(pBuf, index++, valType, val);
    if (TSDB_CODE_SUCCESS != code) {
      qError("%s failed to write val in buf, code:%s", id, tstrerror(code));
      return code;
    }
  }

  return TSDB_CODE_SUCCESS;
}
300

301
/*
 * Finish the request buffer of the current group and close it.
 *
 * First validates that the group has at least ANALY_IMPUTATION_INPUT_MIN_ROWS rows and that the
 * estimated number of generated rows is acceptable, so that no buffer work is done for a request
 * that will be rejected. Then terminates every column and the data section, writes the "option",
 * "algo", "freq", "prec" and wncheck options, and closes the buffer.
 */
static int32_t finishBuildRequest(SImputationOperatorInfo* pInfo, SImputationSupp* pSupp, const char* id) {
  SAnalyticBuf* pBuf = &pSupp->analyBuf;
  int32_t       code = 0;

  if (pSupp->numOfRows < ANALY_IMPUTATION_INPUT_MIN_ROWS) {
    qError("%s history rows for imputation not enough, min required:%d, current:%d", id,
           ANALY_IMPUTATION_INPUT_MIN_ROWS, pSupp->numOfRows);
    return TSDB_CODE_ANA_ANODE_NOT_ENOUGH_ROWS;
  }

  code = estResultRowsAfterImputation(pSupp->numOfRows, pSupp->win.skey, pSupp->win.ekey, pSupp, id);
  if (code != 0) return code;

  for (int32_t i = 0; i < pBuf->numOfCols; ++i) {
    code = taosAnalyBufWriteColEnd(pBuf, i);
    if (code != 0) return code;
  }

  code = taosAnalyBufWriteDataEnd(pBuf);
  if (code != 0) return code;

  code = taosAnalyBufWriteOptStr(pBuf, "option", pInfo->options);
  if (code != 0) return code;

  code = taosAnalyBufWriteOptStr(pBuf, "algo", pInfo->algoName);
  if (code != 0) return code;

  code = taosAnalyBufWriteOptStr(pBuf, "freq", pSupp->freq);
  if (code != 0) return code;

  const char* prec = TSDB_TIME_PRECISION_MILLI_STR;
  if (pSupp->tsPrecision == TSDB_TIME_PRECISION_MICRO) prec = TSDB_TIME_PRECISION_MICRO_STR;
  if (pSupp->tsPrecision == TSDB_TIME_PRECISION_NANO) prec = TSDB_TIME_PRECISION_NANO_STR;
  if (pSupp->tsPrecision == TSDB_TIME_PRECISION_SECONDS) prec = "s";

  code = taosAnalyBufWriteOptStr(pBuf, "prec", prec);
  if (code != 0) return code;

  code = taosAnalyBufWriteOptInt(pBuf, ALGO_OPT_WNCHECK_NAME, pSupp->wncheck);
  if (code != 0) return code;

  return taosAnalyBufClose(pBuf);
}
347

348
/*
 * Send the finished request of the current group to the anode and append the response to pBlock.
 *
 * The response is expected to carry "rows" plus the arrays "target", "ts" and "mask". Values are
 * appended after the rows pBlock already holds:
 *   - "target" always goes to the resTargetSlot column;
 *   - "ts" goes to resTsSlot and "mask" goes to resMarkSlot, when those output columns exist.
 *
 * The result block is grown to fit the response before anything is written.
 *
 * Returns TSDB_CODE_ANA_ANODE_RETURN_ERROR when the anode reports a failure (rows < 0), and
 * TSDB_CODE_INVALID_JSON_FORMAT when the response is malformed. Returns
 * TSDB_CODE_ANA_INTERNAL_ERROR when the result column type is not DOUBLE/FLOAT/INT.
 */
static int32_t doImputationImpl(SImputationOperatorInfo* pInfo, SImputationSupp* pSupp, SSDataBlock* pBlock,
                                const char* pId) {
  SAnalyticBuf* pBuf = &pSupp->analyBuf;
  int32_t       startRow = pBlock->info.rows;
  int32_t       rows = 0;
  int32_t       code = 0;

  SColumnInfoData* pResTargetCol = taosArrayGet(pBlock->pDataBlock, pInfo->resTargetSlot);
  if (NULL == pResTargetCol) {
    return terrno;
  }

  // reject unsupported result types before paying for the remote call; previously such rows
  // were counted but never written
  int32_t targetType = pResTargetCol->info.type;
  if (targetType != TSDB_DATA_TYPE_DOUBLE && targetType != TSDB_DATA_TYPE_FLOAT && targetType != TSDB_DATA_TYPE_INT) {
    qError("%s unsupported imputation result type:%d", pId, targetType);
    return TSDB_CODE_ANA_INTERNAL_ERROR;
  }

  SJson* pJson = taosAnalySendReqRetJson(pInfo->algoUrl, ANALYTICS_HTTP_TYPE_POST, pBuf, pSupp->timeout, pId);
  if (pJson == NULL) {
    return terrno;
  }

  tjsonGetInt32ValueFromDouble(pJson, "rows", rows, code);
  if (rows < 0 && code == 0) {
    char pMsg[1024] = {0};
    code = tjsonGetStringValue(pJson, "msg", pMsg);
    if (code != 0) {
      qError("%s failed to get msg from rsp, unknown error", pId);
    } else {
      qError("%s failed to exec imputation, msg:%s", pId, pMsg);
    }

    tjsonDelete(pJson);
    return TSDB_CODE_ANA_ANODE_RETURN_ERROR;
  }

  if (code != 0) {
    goto _OVER;
  }

  SJson* pTarget = tjsonGetObjectItem(pJson, "target");
  SJson* pTsList = tjsonGetObjectItem(pJson, "ts");
  SJson* pMask = tjsonGetObjectItem(pJson, "mask");
  if (pTarget == NULL || pTsList == NULL || pMask == NULL) goto _OVER;

  if (tjsonGetArraySize(pTarget) != rows) goto _OVER;

  SColumnInfoData* pResTsCol = NULL;
  if (pInfo->resTsSlot != -1) {
    pResTsCol = taosArrayGet(pBlock->pDataBlock, pInfo->resTsSlot);
  }

  SColumnInfoData* pResMaskCol = NULL;
  if (pInfo->resMarkSlot != -1) {
    pResMaskCol = taosArrayGet(pBlock->pDataBlock, pInfo->resMarkSlot);
  }

  // every array that is indexed below must have exactly `rows` entries
  if (pResTsCol != NULL && tjsonGetArraySize(pTsList) != rows) goto _OVER;
  if (pResMaskCol != NULL && tjsonGetArraySize(pMask) != rows) goto _OVER;

  // the anode may return far more rows than the initial capacity of the result block
  code = blockDataEnsureCapacity(pBlock, startRow + rows);
  if (code != 0) goto _OVER;

  for (int32_t i = 0; i < rows; ++i) {
    int32_t row = startRow + i;

    if (pResTsCol != NULL) {
      int64_t ts = 0;
      tjsonGetObjectValueBigInt(tjsonGetArrayItem(pTsList, i), &ts);
      colDataSetInt64(pResTsCol, row, &ts);
    }

    double val = 0;
    tjsonGetObjectValueDouble(tjsonGetArrayItem(pTarget, i), &val);
    if (targetType == TSDB_DATA_TYPE_DOUBLE) {
      colDataSetDouble(pResTargetCol, row, &val);
    } else if (targetType == TSDB_DATA_TYPE_FLOAT) {
      float v = (float)val;
      colDataSetFloat(pResTargetCol, row, &v);
    } else {  // TSDB_DATA_TYPE_INT, checked above
      int32_t v = (int32_t)val;
      colDataSetInt32(pResTargetCol, row, &v);
    }

    if (pResMaskCol != NULL) {
      int64_t mark = 0;
      tjsonGetObjectValueBigInt(tjsonGetArrayItem(pMask, i), &mark);
      int32_t v = (int32_t)mark;
      colDataSetInt32(pResMaskCol, row, &v);
    }
  }

  pBlock->info.rows += rows;

  tjsonDelete(pJson);
  return TSDB_CODE_SUCCESS;

_OVER:
  tjsonDelete(pJson);
  if (code == 0) {
    code = TSDB_CODE_INVALID_JSON_FORMAT;
  }

  qError("%s failed to perform imputation finalize since %s", pId, tstrerror(code));
  return code;
}
466

467
/*
 * Impute the group accumulated in pInfo->imputatSup and append the result to pInfo->binfo.pRes.
 *
 * When the group has rows, the request is finished, sent, and the request buffer is destroyed
 * afterwards, whether or not the request succeeded. When the group has no rows, nothing is sent
 * and the buffer is left open.
 *
 * In both cases the per-group counters and time window are reset, so the next group starts
 * from zero. Returns the first error encountered.
 */
static int32_t doImputation(SImputationOperatorInfo* pInfo, SExecTaskInfo* pTaskInfo) {
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  const char*      id = GET_TASKID(pTaskInfo);
  SSDataBlock*     pRes = pInfo->binfo.pRes;
  SImputationSupp* pSupp = &pInfo->imputatSup;

  if (pSupp->numOfRows <= 0) goto _OVER;

  qDebug("%s group:%" PRId64 ", do imputation, rows:%d", id, pSupp->groupId, pSupp->numOfRows);
  pRes->info.id.groupId = pSupp->groupId;

  code = finishBuildRequest(pInfo, pSupp, id);
  QUERY_CHECK_CODE(code, lino, _end);

  // doImputationImpl grows pRes to fit the response
  code = doImputationImpl(pInfo, pSupp, pRes, id);
  QUERY_CHECK_CODE(code, lino, _end);

  uInfo("%s block:%d, imputation finalize", id, pSupp->numOfBlocks);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s %s failed at line %d since %s", id, __func__, lino, tstrerror(code));
  }

  // the request buffer is single-use once it has been closed and sent
  taosAnalyBufDestroy(&pSupp->analyBuf);

_OVER:
  // reset the per-group accumulators so the next group's limits and window start fresh
  if (pSupp->blocks != NULL) {
    taosArrayClear(pSupp->blocks);
  }
  pSupp->numOfBlocks = 0;
  pSupp->numOfRows = 0;
  pSupp->win.skey = INT64_MAX;
  pSupp->win.ekey = INT64_MIN;
  return code;
}
509

510
/*
 * Extract the input description of the IMPUTATION function from pFuncs.
 *
 * Parameter layouts handled:
 *   - (column, ts): the timestamp is the second parameter and the default option string
 *     "algo=moment" is used;
 *   - any other count: the first primary-timestamp column and the first value node (the
 *     option string) found in the parameter list are used.
 *
 * In both layouts the first parameter must be the column to impute.
 *
 * Fills pSupp->targetSlot/targetType/tsSlot/tsPrecision/numOfCols and pInfo->options (heap
 * allocated). Returns TSDB_CODE_ANA_INTERNAL_ERROR when the target is not a column, or when the
 * timestamp column or the option string is missing or could not be copied.
 */
static int32_t doParseInput(SImputationOperatorInfo* pInfo, SImputationSupp* pSupp, SNodeList* pFuncs, const char* id) {
  int32_t code = 0;
  SNode*  pNode = NULL;

  FOREACH(pNode, pFuncs) {
    if ((nodeType(pNode) != QUERY_NODE_TARGET) || (nodeType(((STargetNode*)pNode)->pExpr) != QUERY_NODE_FUNCTION)) {
      continue;
    }

    SFunctionNode* pFunc = (SFunctionNode*)((STargetNode*)pNode)->pExpr;
    if (pFunc->funcType != FUNCTION_TYPE_IMPUTATION) {
      continue;
    }

    int32_t numOfParam = LIST_LENGTH(pFunc->pParameterList);

    // the request always carries two columns: timestamp and value
    pSupp->numOfCols = 2;

    SNode* pFirst = nodesListGetNode(pFunc->pParameterList, 0);
    if (pFirst == NULL || nodeType(pFirst) != QUERY_NODE_COLUMN) {
      qError("%s the imputation target must be a column, failed to do imputation", id);
      return TSDB_CODE_ANA_INTERNAL_ERROR;
    }

    SColumnNode* pTargetNode = (SColumnNode*)pFirst;
    pSupp->targetSlot = pTargetNode->slotId;
    pSupp->targetType = pTargetNode->node.resType.type;

    if (numOfParam == 2) {
      // column, ts
      SNode* pSecond = nodesListGetNode(pFunc->pParameterList, 1);
      if (pSecond == NULL || nodeType(pSecond) != QUERY_NODE_COLUMN) {
        qError("%s timestamp column is missing, failed to do imputation", id);
        return TSDB_CODE_ANA_INTERNAL_ERROR;
      }

      SColumnNode* pTsNode = (SColumnNode*)pSecond;
      pSupp->tsSlot = pTsNode->slotId;
      pSupp->tsPrecision = pTsNode->node.resType.precision;

      // let's set the moment as the default imputation algorithm
      taosMemoryFreeClear(pInfo->options);
      pInfo->options = taosStrdup("algo=moment");
    } else {
      // column, options, ts
      bool assignTs = false;
      bool assignOpt = false;

      for (int32_t i = 0; i < numOfParam; ++i) {
        SNode* pParam = nodesListGetNode(pFunc->pParameterList, i);
        if (nodeType(pParam) == QUERY_NODE_COLUMN) {
          SColumnNode* pColNode = (SColumnNode*)pParam;
          if (pColNode->isPrimTs && (!assignTs)) {
            pSupp->tsSlot = pColNode->slotId;
            pSupp->tsPrecision = pColNode->node.resType.precision;
            assignTs = true;
          }
        } else if (nodeType(pParam) == QUERY_NODE_VALUE && (!assignOpt)) {
          SValueNode* pOptNode = (SValueNode*)pParam;
          taosMemoryFreeClear(pInfo->options);
          pInfo->options = taosStrdup(pOptNode->literal);
          assignOpt = true;
        }
      }

      if (!assignOpt) {
        qError("%s option is missing, failed to do imputation", id);
        code = TSDB_CODE_ANA_INTERNAL_ERROR;
      }

      // without a timestamp column every input block would be unreadable later
      if (!assignTs) {
        qError("%s primary timestamp column is missing, failed to do imputation", id);
        code = TSDB_CODE_ANA_INTERNAL_ERROR;
      }
    }
  }

  if (pInfo->options == NULL) {
    qError("%s option is missing or clone option string failed, failed to do imputation", id);
    code = TSDB_CODE_ANA_INTERNAL_ERROR;
  }

  return code;
}
588

589
/*
 * Locate the output slots of the IMPUTATION value, row timestamp and mark columns among the
 * operator's output expressions. A slot stays -1 when the query does not produce that column.
 * Always returns 0.
 */
static int32_t doSetResSlot(SImputationOperatorInfo* pInfo, SImputationSupp* pSupp, SExprSupp* pExprSup) {
  pInfo->resTsSlot = -1;
  pInfo->resTargetSlot = -1;
  pInfo->resMarkSlot = -1;

  for (int32_t k = 0; k < pExprSup->numOfExprs; ++k) {
    SExprInfo* pOut = pExprSup->pExprInfo + k;
    int32_t    slot = pOut->base.resSchema.slotId;

    switch (pOut->pExpr->_function.functionType) {
      case FUNCTION_TYPE_IMPUTATION:
        pInfo->resTargetSlot = slot;
        break;
      case FUNCTION_TYPE_IMPUTATION_ROWTS:
        pInfo->resTsSlot = slot;
        break;
      case FUNCTION_TYPE_IMPUTATION_MARK:
        pInfo->resMarkSlot = slot;
        break;
      default:
        break;
    }
  }

  return 0;
}
609

610
/*
 * Reset pSupp to its defaults before the query parameters and options are parsed:
 *   - no rows;
 *   - default wncheck;
 *   - one-second data frequency;
 *   - all slots, types and precision unset (-1);
 *   - an empty (inverted) time window.
 */
static void doInitOptions(SImputationSupp* pSupp) {
  pSupp->numOfRows = 0;
  pSupp->wncheck = ANALY_DEFAULT_WNCHECK;

  // d(day), h(hour), m(minute), s(second), ms(millisecond), us(microsecond).
  // Lower-case "s" is one of the values parseFreq accepts from the user and the one
  // estResultRowsAfterImputation recognises; upper-case "S" matched neither.
  pSupp->freq[0] = 's';
  pSupp->freq[1] = '\0';

  pSupp->tsSlot = -1;
  pSupp->targetSlot = -1;
  pSupp->targetType = -1;
  pSupp->tsPrecision = -1;

  // inverted window so that the first MIN/MAX update in doCacheBlock sets both ends
  pSupp->win.skey = INT64_MAX;
  pSupp->win.ekey = INT64_MIN;
}
×
625

626
/*
 * Read the optional "freq" entry of the parsed option map into pSupp->freq.
 *
 * Accepted values are d, h, m, s, ms and us. When the option is absent, pSupp->freq keeps its
 * current (default) value. Returns TSDB_CODE_INVALID_PARA for any other value, including an
 * empty one, or the allocation error when the value cannot be copied.
 */
int32_t parseFreq(SImputationSupp* pSupp, SHashObj* pHashMap, const char* id) {
  static const char* const validFreqs[] = {"d", "h", "m", "s", "ms", "us"};

  char* pFreq = taosHashGet(pHashMap, FREQ_STR, strlen(FREQ_STR));
  if (pFreq == NULL) {
    qDebug("%s not specify data freq, default: %s", id, pSupp->freq);
    return TSDB_CODE_SUCCESS;
  }

  int32_t len = taosHashGetValueSize(pFreq);

  // the hash value is a length-delimited buffer; work on a NUL-terminated copy of the new value
  char* p = taosStrndupi(pFreq, len);
  if (p == NULL) {
    qError("%s failed to clone the freq param, code:%s", id, tstrerror(terrno));
    return terrno;
  }

  int32_t code = TSDB_CODE_INVALID_PARA;
  for (size_t i = 0; i < sizeof(validFreqs) / sizeof(validFreqs[0]); ++i) {
    if (strcmp(p, validFreqs[i]) == 0) {
      code = TSDB_CODE_SUCCESS;
      break;
    }
  }

  if (code == TSDB_CODE_SUCCESS) {
    // every accepted value is far shorter than pSupp->freq
    tstrncpy(pSupp->freq, p, sizeof(pSupp->freq));
    qDebug("%s data freq:%s", id, pSupp->freq);
  } else {
    qError("%s invalid freq parameter: %s", id, p);
  }

  taosMemoryFreeClear(p);
  return code;
}
668

669
int32_t estResultRowsAfterImputation(int32_t rows, int64_t skey, int64_t ekey, SImputationSupp* pSupp, const char* id) {
×
670
  int64_t range = ekey - skey;
×
671
  double  factor = 0;
×
672
  if (pSupp->tsPrecision == TSDB_TIME_PRECISION_MILLI) {
×
673
    if (strcmp(pSupp->freq, "h") == 0) {
×
674
      factor = 0.001 * 1/3600;
×
675
    } else if (strcmp(pSupp->freq, "m") == 0) {
×
676
      factor = 0.001 * 1/60;
×
677
    } else if (strcmp(pSupp->freq, "s") == 0) {
×
678
      factor = 0.001;
×
679
    } else if (strcmp(pSupp->freq, "ms") == 0) {
×
680
      factor = 1;
×
681
    } else if (strcmp(pSupp->freq, "us") == 0) {
×
682
      factor *= 1000;
×
683
    } else if (strcmp(pSupp->freq, "ns") == 0) {
×
684
      factor *= 1000000;
×
685
    }
686

687
    int64_t num = range * factor - rows;
×
688
    if (num > ANALY_MAX_IMPUT_ROWS) {
×
689
      qError("%s too many rows to imputation, est:%"PRId64, id, num);
×
690
      return TSDB_CODE_INVALID_PARA;
×
691
    }
692
  } else if (pSupp->tsPrecision == TSDB_TIME_PRECISION_MICRO) {
×
693
    if (strcmp(pSupp->freq, "h") == 0) {
×
694
      factor = 0.000001 * 1/3600;
×
695
    } else if (strcmp(pSupp->freq, "m") == 0) {
×
696
      factor = 0.000001 * 1/60;
×
697
    } else if (strcmp(pSupp->freq, "s") == 0) {
×
698
      factor = 0.000001;
×
699
    } else if (strcmp(pSupp->freq, "ms") == 0) {
×
700
      factor = 1000;
×
701
    } else if (strcmp(pSupp->freq, "us") == 0) {
×
702
      factor *= 1;
×
703
    } else if (strcmp(pSupp->freq, "ns") == 0) {
×
704
      factor *= 1000;
×
705
    }
706

707
    int64_t num = range * factor - rows;
×
708
    if (num > ANALY_MAX_IMPUT_ROWS) {
×
709
      qError("%s too many rows to imputation, est:%"PRId64, id, num);
×
710
      return TSDB_CODE_INVALID_PARA;
×
711
    }
712
  } else if (pSupp->tsPrecision == TSDB_TIME_PRECISION_NANO) {
×
713
    if (strcmp(pSupp->freq, "h") == 0) {
×
714
      factor = 0.000000001 * 1/3600;
×
715
    } else if (strcmp(pSupp->freq, "m") == 0) {
×
716
      factor = 0.000000001 * 1/60;
×
717
    } else if (strcmp(pSupp->freq, "s") == 0) {
×
718
      factor = 0.000000001;
×
719
    } else if (strcmp(pSupp->freq, "ms") == 0) {
×
720
      factor = 1000000;
×
721
    } else if (strcmp(pSupp->freq, "us") == 0) {
×
722
      factor *= 1000;
×
723
    } else if (strcmp(pSupp->freq, "ns") == 0) {
×
724
      factor *= 1;
×
725
    }
726

727
    int64_t num = range * factor - rows;
×
728
    if (num > ANALY_MAX_IMPUT_ROWS) {
×
729
      qError("%s too many rows to imputation, est:%"PRId64, id, num);
×
730
      return TSDB_CODE_INVALID_PARA;
×
731
    }
732
  }
733

734
  return TSDB_CODE_SUCCESS;
×
735
}
736

737
/*
 * Parse pInfo->options into an option map, then fill from it:
 *   - the algorithm name and anode URL;
 *   - the timeout, wncheck and data frequency of pSupp.
 *
 * The option map is released before returning. Returns the first error encountered.
 */
int32_t doParseOption(SImputationOperatorInfo* pInfo, SImputationSupp* pSupp, const char* id) {
  SHashObj* pOptMap = NULL;

  int32_t code = taosAnalyGetOpts(pInfo->options, &pOptMap);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  code = taosAnalysisParseAlgo(pInfo->options, pInfo->algoName, pInfo->algoUrl, ANALY_ALGO_TYPE_IMPUTATION,
                               tListLen(pInfo->algoUrl), pOptMap, id);
  if (code == TSDB_CODE_SUCCESS) {
    // extract the timeout parameter
    pSupp->timeout = taosAnalysisParseTimout(pOptMap, id);
    pSupp->wncheck = taosAnalysisParseWncheck(pOptMap, id);

    // extract data freq
    code = parseFreq(pSupp, pOptMap, id);
  }

  taosHashCleanup(pOptMap);
  return code;
}
762

763
/*
 * Open the JSON-column request buffer for one group in a temp file under tsTempDir.
 *
 * Writes the column metadata ("ts" as TIMESTAMP, "val" with the target type), starts the data
 * section and every column. On any failure the buffer is closed and destroyed, and the error is
 * returned.
 */
static int32_t doCreateBuf(SImputationSupp* pSupp, const char* pId) {
  SAnalyticBuf* pBuf = &pSupp->analyBuf;
  int64_t       now = taosGetTimestampNs();

  pBuf->bufType = ANALYTICS_BUF_TYPE_JSON_COL;
  snprintf(pBuf->fileName, sizeof(pBuf->fileName), "%s/tdengine-imput-%p-%" PRId64, tsTempDir, pSupp, now);

  int32_t code = tsosAnalyBufOpen(pBuf, pSupp->numOfCols, pId);
  if (code == 0) {
    code = taosAnalyBufWriteColMeta(pBuf, 0, TSDB_DATA_TYPE_TIMESTAMP, "ts");
  }
  if (code == 0) {
    code = taosAnalyBufWriteColMeta(pBuf, 1, pSupp->targetType, "val");
  }
  if (code == 0) {
    code = taosAnalyBufWriteDataBegin(pBuf);
  }
  for (int32_t col = 0; code == 0 && col < pSupp->numOfCols; ++col) {
    code = taosAnalyBufWriteColBegin(pBuf, col);
  }

  if (code != 0) {
    (void)taosAnalyBufClose(pBuf);
    taosAnalyBufDestroy(pBuf);
  }
  return code;
}
795

796
#else
797

798
/* Analytics support is compiled out: the imputation operator can never be created. */
int32_t createImputationOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode, SExecTaskInfo* pTaskInfo,
                                     SOperatorInfo** pOptrInfo) {
  (void)downstream;
  (void)physiNode;
  (void)pTaskInfo;
  (void)pOptrInfo;
  return TSDB_CODE_OPS_NOT_SUPPORT;
}

/* Nothing is ever allocated without analytics support, so there is nothing to release. */
void imputatDestroyOperatorInfo(void* param) { (void)param; }
803

804
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc