• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4700

29 Aug 2025 06:36AM UTC coverage: 58.335% (+0.008%) from 58.327%
#4700

push

travis-ci

web-flow
fix(gpt): fix race-condition in preparing tmp files (#32800)

133694 of 291873 branches covered (45.81%)

Branch coverage included in aggregate %.

5 of 34 new or added lines in 6 files covered. (14.71%)

242 existing lines in 22 files now uncovered.

201983 of 283561 relevant lines covered (71.23%)

29993718.15 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/libs/executor/src/forecastoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "functionMgt.h"
19
#include "operator.h"
20
#include "querytask.h"
21
#include "tanalytics.h"
22
#include "taoserror.h"
23
#include "tcommon.h"
24
#include "tdatablock.h"
25
#include "tmsg.h"
26

27
#ifdef USE_ANALYTICS
28

29
#define ALGO_OPT_RETCONF_NAME      "return_conf"
30
#define ALGO_OPT_FORECASTROWS_NAME "rows"
31
#define ALGO_OPT_CONF_NAME         "conf"
32
#define ALGO_OPT_START_NAME        "start"
33
#define ALGO_OPT_EVERY_NAME        "every"
34

35
typedef struct {
36
  char*           pName;
37
  SColumnInfoData data;
38
  SColumn         col;
39
  int32_t         numOfRows;
40
} SColFutureData;
41

42
typedef struct {
43
  char         algoName[TSDB_ANALYTIC_ALGO_NAME_LEN];
44
  char         algoUrl[TSDB_ANALYTIC_ALGO_URL_LEN];
45
  char*        pOptions;
46
  int64_t      maxTs;
47
  int64_t      minTs;
48
  int64_t      numOfRows;
49
  uint64_t     groupId;
50
  int64_t      forecastRows;
51
  int64_t      cachedRows;
52
  int32_t      numOfBlocks;
53
  int64_t      timeout;
54
  int16_t      resTsSlot;
55
  int16_t      resValSlot;
56
  int16_t      resLowSlot;
57
  int16_t      resHighSlot;
58
  int16_t      inputTsSlot;
59
  int16_t      targetValSlot;
60
  int8_t       targetValType;
61
  int8_t       inputPrecision;
62
  int8_t       wncheck;
63
  double       conf;
64
  int64_t      startTs;
65
  int64_t      every;
66
  int8_t       setStart;
67
  int8_t       setEvery;
68
  SArray*      pCovariateSlotList;   // covariate slot list
69
  SArray*      pDynamicRealList;     // dynamic real data list
70
  int32_t      numOfInputCols;
71
  SAnalyticBuf analyBuf;
72
} SForecastSupp;
73

74
typedef struct SForecastOperatorInfo {
75
  SSDataBlock*  pRes;
76
  SExprSupp     scalarSup;  // scalar calculation
77
  SForecastSupp forecastSupp;
78
} SForecastOperatorInfo;
79

80
static void destroyForecastInfo(void* param);
81
static int32_t forecastParseOpt(SForecastSupp* pSupp, const char* id);
82

83
/**
 * Make sure pBlock can hold newRowsNum rows in addition to the rows it
 * already contains.
 *
 * @param pBlock      block that is about to receive more rows
 * @param newRowsNum  number of rows that will be appended
 * @return TSDB_CODE_SUCCESS, or the error reported by blockDataEnsureCapacity()
 */
static FORCE_INLINE int32_t forecastEnsureBlockCapacity(SSDataBlock* pBlock, int32_t newRowsNum) {
  int64_t required = (int64_t)pBlock->info.rows + newRowsNum;
  if (required <= (int64_t)pBlock->info.capacity) {
    return TSDB_CODE_SUCCESS;
  }

  // Ask for the total (existing + new) row count. Requesting only newRowsNum
  // would leave an already full block unchanged.
  // NOTE(review): assumes blockDataEnsureCapacity() takes the total number of
  // rows the block must be able to hold -- confirm against tdatablock.h.
  int32_t code = blockDataEnsureCapacity(pBlock, (uint32_t)required);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    return code;
  }

  return TSDB_CODE_SUCCESS;
}
96

97
/**
 * Append every row of pBlock to the analytic buffer and update the
 * min/max timestamp and row statistics kept in pSupp.
 *
 * For each row the columns are written in this order: primary timestamp,
 * target value, the dynamic real columns, then the covariate columns.
 *
 * @param pSupp   forecast state owning the analytic buffer
 * @param pBlock  input block to cache
 * @param id      task id used as log prefix
 * @return TSDB_CODE_SUCCESS on success; TSDB_CODE_ANA_ANODE_TOO_MANY_ROWS when
 *         the cached row limit is exceeded; an error code when a required
 *         column cannot be located or a buffer write fails
 */
static int32_t forecastCacheBlock(SForecastSupp* pSupp, SSDataBlock* pBlock, const char* id) {
  int32_t       code = TSDB_CODE_SUCCESS;
  SAnalyticBuf* pBuf = &pSupp->analyBuf;

  if (pSupp->cachedRows > ANALY_FORECAST_MAX_ROWS) {
    code = TSDB_CODE_ANA_ANODE_TOO_MANY_ROWS;
    qError("%s rows:%" PRId64 " for forecast cache, error happens, code:%s, upper limit:%d", id, pSupp->cachedRows,
           tstrerror(code), ANALY_FORECAST_MAX_ROWS);
    return code;
  }

  pSupp->numOfBlocks++;
  qDebug("%s block:%d, %p rows:%" PRId64, id, pSupp->numOfBlocks, pBlock, pBlock->info.rows);

  if (pBlock->info.rows <= 0) {
    return TSDB_CODE_SUCCESS;  // nothing to cache, no column lookup needed
  }

  // The timestamp and target columns are the same for every row: resolve them once.
  SColumnInfoData* pValCol = taosArrayGet(pBlock->pDataBlock, pSupp->targetValSlot);
  SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pSupp->inputTsSlot);
  if (pTsCol == NULL || pValCol == NULL) {
    // Dropping the rows silently would make the forecast run on incomplete data.
    code = (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_INVALID_PARA;
    qError("%s failed to locate ts/target column in input block, code:%s", id, tstrerror(code));
    return code;
  }

  const int16_t valType = pValCol->info.type;
  const int32_t numOfDynamic = (int32_t)taosArrayGetSize(pSupp->pDynamicRealList);
  const int32_t numOfCovariate = (int32_t)taosArrayGetSize(pSupp->pCovariateSlotList);

  for (int32_t j = 0; j < pBlock->info.rows; ++j) {
    int32_t index = 0;
    int64_t ts = ((TSKEY*)pTsCol->pData)[j];
    char*   val = colDataGetData(pValCol, j);

    pSupp->minTs = MIN(pSupp->minTs, ts);
    pSupp->maxTs = MAX(pSupp->maxTs, ts);
    pSupp->numOfRows++;

    // write the primary time stamp column data
    code = taosAnalyBufWriteColData(pBuf, index++, TSDB_DATA_TYPE_TIMESTAMP, &ts);
    if (TSDB_CODE_SUCCESS != code) {
      qError("%s failed to write ts in buf, code:%s", id, tstrerror(code));
      return code;
    }

    // write the main column for forecasting
    code = taosAnalyBufWriteColData(pBuf, index++, valType, val);
    if (TSDB_CODE_SUCCESS != code) {
      qError("%s failed to write val in buf, code:%s", id, tstrerror(code));
      return code;
    }

    // let's handle the dynamic_real_columns
    for (int32_t i = 0; i < numOfDynamic; ++i) {
      SColFutureData*  pCol = taosArrayGet(pSupp->pDynamicRealList, i);
      SColumnInfoData* pColData = (pCol != NULL) ? taosArrayGet(pBlock->pDataBlock, pCol->col.slotId) : NULL;
      if (pColData == NULL) {
        code = (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_INVALID_PARA;
        qError("%s failed to locate dynamic real column in input block, code:%s", id, tstrerror(code));
        return code;
      }

      char* pVal = colDataGetData(pColData, j);
      code = taosAnalyBufWriteColData(pBuf, index++, pCol->col.type, pVal);
      if (TSDB_CODE_SUCCESS != code) {
        qError("%s failed to write val in buf, code:%s", id, tstrerror(code));
        return code;
      }
    }

    // now handle the past_dynamic_real (covariate) columns
    for (int32_t i = 0; i < numOfCovariate; ++i) {
      SColumn*         pCol = taosArrayGet(pSupp->pCovariateSlotList, i);
      SColumnInfoData* pColData = (pCol != NULL) ? taosArrayGet(pBlock->pDataBlock, pCol->slotId) : NULL;
      if (pColData == NULL) {
        code = (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_INVALID_PARA;
        qError("%s failed to locate covariate column in input block, code:%s", id, tstrerror(code));
        return code;
      }

      char* pVal = colDataGetData(pColData, j);
      code = taosAnalyBufWriteColData(pBuf, index++, pCol->type, pVal);
      if (TSDB_CODE_SUCCESS != code) {
        qError("%s failed to write val in buf, code:%s", id, tstrerror(code));
        return code;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}
169

170
/**
 * Finish the analytic buffer: append the future values of the dynamic real
 * columns, terminate every column and the data section, write the request
 * options and close the buffer.
 *
 * @param pSupp  forecast state owning the analytic buffer
 * @param id     task id used as log prefix
 * @return 0 on success; TSDB_CODE_ANA_ANODE_NOT_ENOUGH_ROWS when too few rows
 *         were cached; otherwise the error of the failing buffer write
 */
static int32_t forecastCloseBuf(SForecastSupp* pSupp, const char* id) {
  SAnalyticBuf* pBuf = &pSupp->analyBuf;
  int32_t       code = 0;

  for (int32_t i = 0; i < pSupp->numOfInputCols; ++i) {
    // columns 0 and 1 are the primary timestamp and the forecast column;
    // the dynamic real columns follow and get their future values appended here
    if ((i >= 2) && ((i - 2) < taosArrayGetSize(pSupp->pDynamicRealList))) {
      SColFutureData* pCol = taosArrayGet(pSupp->pDynamicRealList, i - 2);
      int16_t         valType = pCol->col.type;
      for (int32_t j = 0; j < pCol->numOfRows; ++j) {
        char* pVal = colDataGetData(&pCol->data, j);
        code = taosAnalyBufWriteColData(pBuf, i, valType, pVal);
        if (code != 0) {
          return code;
        }
      }
    }

    code = taosAnalyBufWriteColEnd(pBuf, i);
    if (code != 0) return code;
  }

  code = taosAnalyBufWriteDataEnd(pBuf);
  if (code != 0) return code;

  code = taosAnalyBufWriteOptStr(pBuf, "option", pSupp->pOptions);
  if (code != 0) return code;

  code = taosAnalyBufWriteOptStr(pBuf, "algo", pSupp->algoName);
  if (code != 0) return code;

  const char* prec = TSDB_TIME_PRECISION_MILLI_STR;
  if (pSupp->inputPrecision == TSDB_TIME_PRECISION_MICRO) prec = TSDB_TIME_PRECISION_MICRO_STR;
  if (pSupp->inputPrecision == TSDB_TIME_PRECISION_NANO) prec = TSDB_TIME_PRECISION_NANO_STR;
  code = taosAnalyBufWriteOptStr(pBuf, "prec", prec);
  if (code != 0) return code;

  code = taosAnalyBufWriteOptInt(pBuf, ALGO_OPT_WNCHECK_NAME, pSupp->wncheck);
  if (code != 0) return code;

  // confidence bounds are requested only when a low or high output column exists
  bool noConf = (pSupp->resHighSlot == -1 && pSupp->resLowSlot == -1);
  code = taosAnalyBufWriteOptInt(pBuf, ALGO_OPT_RETCONF_NAME, !noConf);
  if (code != 0) return code;

  if (pSupp->cachedRows < ANALY_FORECAST_MIN_ROWS) {
    // report the value that was actually checked (cachedRows, not forecastRows)
    qError("%s history rows for forecasting not enough, min required:%d, current:%" PRId64, id, ANALY_FORECAST_MIN_ROWS,
           pSupp->cachedRows);
    return TSDB_CODE_ANA_ANODE_NOT_ENOUGH_ROWS;
  }

  code = taosAnalyBufWriteOptInt(pBuf, "forecast_rows", pSupp->forecastRows);
  if (code != 0) return code;

  code = taosAnalyBufWriteOptFloat(pBuf, ALGO_OPT_CONF_NAME, pSupp->conf);
  if (code != 0) return code;

  // Without an explicit interval, derive it from the average spacing of the
  // cached rows; this needs at least two rows to avoid dividing by zero.
  int64_t every = pSupp->every;
  if (pSupp->setEvery == 0) {
    if (pSupp->numOfRows < 2) {
      qError("%s not enough rows to derive the forecast interval, rows:%" PRId64, id, pSupp->numOfRows);
      return TSDB_CODE_ANA_ANODE_NOT_ENOUGH_ROWS;
    }
    every = (pSupp->maxTs - pSupp->minTs) / (pSupp->numOfRows - 1);
  }
  code = taosAnalyBufWriteOptInt(pBuf, ALGO_OPT_EVERY_NAME, every);
  if (code != 0) return code;

  // default start: one interval after the last cached timestamp
  int64_t start = (pSupp->setStart != 0) ? pSupp->startTs : pSupp->maxTs + every;
  code = taosAnalyBufWriteOptInt(pBuf, ALGO_OPT_START_NAME, start);
  if (code != 0) return code;

  if (taosArrayGetSize(pSupp->pCovariateSlotList) + taosArrayGetSize(pSupp->pDynamicRealList) > 0) {
    code = taosAnalyBufWriteOptStr(pBuf, "type", "covariate");
    if (code != 0) return code;
  }

  code = taosAnalyBufClose(pBuf);
  return code;
}
246

247
static int32_t forecastAnalysis(SForecastSupp* pSupp, SSDataBlock* pBlock, const char* pId) {
×
248
  SAnalyticBuf* pBuf = &pSupp->analyBuf;
×
249
  int32_t       resCurRow = pBlock->info.rows;
×
250
  int8_t        tmpI8 = 0;
×
251
  int16_t       tmpI16 = 0;
×
252
  int32_t       tmpI32 = 0;
×
253
  int64_t       tmpI64 = 0;
×
254
  double        tmpDouble = 0;
×
255
  int32_t       code = 0;
×
256

257
  SColumnInfoData* pResValCol = taosArrayGet(pBlock->pDataBlock, pSupp->resValSlot);
×
258
  if (NULL == pResValCol) {
×
259
    return terrno;
×
260
  }
261

262
  SColumnInfoData* pResTsCol = ((pSupp->resTsSlot != -1) ? taosArrayGet(pBlock->pDataBlock, pSupp->resTsSlot) : NULL);
×
263
  SColumnInfoData* pResLowCol =
×
264
      ((pSupp->resLowSlot != -1) ? taosArrayGet(pBlock->pDataBlock, pSupp->resLowSlot) : NULL);
×
265
  SColumnInfoData* pResHighCol =
×
266
      (pSupp->resHighSlot != -1 ? taosArrayGet(pBlock->pDataBlock, pSupp->resHighSlot) : NULL);
×
267

NEW
268
  SJson* pJson = taosAnalySendReqRetJson(pSupp->algoUrl, ANALYTICS_HTTP_TYPE_POST, pBuf, pSupp->timeout, pId);
×
269
  if (pJson == NULL) {
×
270
    return terrno;
×
271
  }
272

273
  int32_t rows = 0;
×
274
  tjsonGetInt32ValueFromDouble(pJson, "rows", rows, code);
×
275
  if (rows < 0 && code == 0) {
×
276
    code = TSDB_CODE_ANA_ANODE_RETURN_ERROR;
×
277

278
    char    pMsg[1024] = {0};
×
279
    int32_t ret = tjsonGetStringValue(pJson, "msg", pMsg);
×
280
    
281
    if (ret == 0) {
×
282
      qError("%s failed to exec forecast, msg:%s", pId, pMsg);
×
283
      void* p = strstr(pMsg, "white noise");
×
284
      if (p != NULL) {
×
285
        code = TSDB_CODE_ANA_WN_DATA;
×
286
      } else {
287
        p = strstr(pMsg, "[Errno 111] Connection refused");
×
288
        if (p != NULL) {  // the specified forecast model not loaded yet
×
289
          code = TSDB_CODE_ANA_ALGO_NOT_LOAD;
×
290
        }
291
      }
292
    } else {
293
      qError("%s failed to extract msg from server, unknown error", pId);
×
294
    }
295

296
    tjsonDelete(pJson);
×
297
    return code;
×
298
  }
299

300
  // invalid json format
301
  if (code != 0) {
×
302
    goto _OVER;
×
303
  }
304

305
  SJson* res = tjsonGetObjectItem(pJson, "res");
×
306
  if (res == NULL) goto _OVER;
×
307
  int32_t ressize = tjsonGetArraySize(res);
×
308
  bool    returnConf = (pSupp->resHighSlot != -1 || pSupp->resLowSlot != -1);
×
309

310
  if ((returnConf && (ressize != 4)) || ((!returnConf) && (ressize != 2))) {
×
311
    goto _OVER;
×
312
  }
313

314
  if (pResTsCol != NULL) {
×
315
    resCurRow = pBlock->info.rows;
×
316
    SJson* tsJsonArray = tjsonGetArrayItem(res, 0);
×
317
    if (tsJsonArray == NULL) goto _OVER;
×
318
    int32_t tsSize = tjsonGetArraySize(tsJsonArray);
×
319
    if (tsSize != rows) goto _OVER;
×
320
    for (int32_t i = 0; i < tsSize; ++i) {
×
321
      SJson* tsJson = tjsonGetArrayItem(tsJsonArray, i);
×
322
      tjsonGetObjectValueBigInt(tsJson, &tmpI64);
×
323
      colDataSetInt64(pResTsCol, resCurRow, &tmpI64);
×
324
      resCurRow++;
×
325
    }
326
  }
327

328
  if (pResLowCol != NULL) {
×
329
    resCurRow = pBlock->info.rows;
×
330
    SJson* lowJsonArray = tjsonGetArrayItem(res, 2);
×
331
    if (lowJsonArray == NULL) goto _OVER;
×
332
    int32_t lowSize = tjsonGetArraySize(lowJsonArray);
×
333
    if (lowSize != rows) goto _OVER;
×
334
    for (int32_t i = 0; i < lowSize; ++i) {
×
335
      SJson* lowJson = tjsonGetArrayItem(lowJsonArray, i);
×
336
      tjsonGetObjectValueDouble(lowJson, &tmpDouble);
×
337
      colDataSetDouble(pResLowCol, resCurRow, &tmpDouble);
×
338
      resCurRow++;
×
339
    }
340
  }
341

342
  if (pResHighCol != NULL) {
×
343
    resCurRow = pBlock->info.rows;
×
344
    SJson* highJsonArray = tjsonGetArrayItem(res, 3);
×
345
    if (highJsonArray == NULL) goto _OVER;
×
346
    int32_t highSize = tjsonGetArraySize(highJsonArray);
×
347
    if (highSize != rows) goto _OVER;
×
348
    for (int32_t i = 0; i < highSize; ++i) {
×
349
      SJson* highJson = tjsonGetArrayItem(highJsonArray, i);
×
350
      tjsonGetObjectValueDouble(highJson, &tmpDouble);
×
351
      colDataSetDouble(pResHighCol, resCurRow, &tmpDouble);
×
352
      resCurRow++;
×
353
    }
354
  }
355

356
  resCurRow = pBlock->info.rows;
×
357
  SJson* valJsonArray = tjsonGetArrayItem(res, 1);
×
358
  if (valJsonArray == NULL) goto _OVER;
×
359
  int32_t valSize = tjsonGetArraySize(valJsonArray);
×
360
  if (valSize != rows) goto _OVER;
×
361
  for (int32_t i = 0; i < valSize; ++i) {
×
362
    SJson* valJson = tjsonGetArrayItem(valJsonArray, i);
×
363
    tjsonGetObjectValueDouble(valJson, &tmpDouble);
×
364

365
    colDataSetDouble(pResValCol, resCurRow, &tmpDouble);
×
366
    resCurRow++;
×
367
  }
368

369
  pBlock->info.rows += rows;
×
370

371
  if (pJson != NULL) tjsonDelete(pJson);
×
372
  return 0;
×
373

374
_OVER:
×
375
  tjsonDelete(pJson);
×
376
  if (code == 0) {
×
377
    code = TSDB_CODE_INVALID_JSON_FORMAT;
×
378
  }
379

380
  qError("%s failed to perform forecast finalize since %s", pId, tstrerror(code));
×
381
  return code;
×
382
}
383

384
/**
 * Finish the current group: close the analytic buffer, run the forecast and
 * append the result rows to pResBlock.
 *
 * Whether or not the forecast succeeds, the per-group bookkeeping is reset and
 * the analytic buffer is destroyed before returning.
 *
 * @param pSupp      forecast state of the operator
 * @param pResBlock  block receiving the forecast rows
 * @param pId        task id used as log prefix
 * @return TSDB_CODE_SUCCESS or the first error encountered
 */
static int32_t forecastAggregateBlocks(SForecastSupp* pSupp, SSDataBlock* pResBlock, const char* pId) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  code = forecastCloseBuf(pSupp, pId);
  QUERY_CHECK_CODE(code, lino, _end);

  code = forecastEnsureBlockCapacity(pResBlock, 1);
  QUERY_CHECK_CODE(code, lino, _end);

  code = forecastAnalysis(pSupp, pResBlock, pId);
  QUERY_CHECK_CODE(code, lino, _end);

  uInfo("%s block:%d, forecast finalize", pId, pSupp->numOfBlocks);

_end:
  pSupp->numOfBlocks = 0;

  // The timestamp range and row count describe one group only. Start the next
  // group from the same initial values as initForecastOpt(), otherwise its
  // derived interval and start time would mix in rows of this group.
  pSupp->numOfRows = 0;
  pSupp->minTs = INT64_MAX;
  pSupp->maxTs = 0;

  taosAnalyBufDestroy(&pSupp->analyBuf);
  return code;
}
405

406
/**
 * Operator "next" callback: pull blocks from the downstream operator, cache
 * them per group and emit the forecast of a group once that group is complete.
 *
 * @param pOperator  the forecast operator
 * @param ppRes      receives the result block, or NULL when no rows were produced
 * @return TSDB_CODE_SUCCESS; failures do not return but long-jump through the task environment
 */
static int32_t forecastNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t                code = TSDB_CODE_SUCCESS;
  int32_t                lino = 0;
  SExecTaskInfo*         pTaskInfo = pOperator->pTaskInfo;
  SForecastOperatorInfo* pInfo = pOperator->info;
  SSDataBlock*           pOut = pInfo->pRes;
  SForecastSupp*         pSupp = &pInfo->forecastSupp;
  SExprSupp*             pScalar = &pInfo->scalarSup;
  int64_t                beginUs = taosGetTimestampUs();
  int32_t                blockCnt = pSupp->numOfBlocks;
  const char*            pId = GET_TASKID(pOperator->pTaskInfo);

  blockDataCleanup(pOut);

  for (;;) {
    SSDataBlock* pIn = getNextBlockFromDownstream(pOperator, 0);
    if (pIn == NULL) {
      break;
    }

    // evaluate the scalar expressions in place before caching the block
    if (pScalar->pExprInfo != NULL) {
      code = projectApplyFunctions(pScalar->pExprInfo, pIn, pIn, pScalar->pCtx, pScalar->numOfExprs, NULL,
                                   GET_STM_RTINFO(pOperator->pTaskInfo));
      if (code != TSDB_CODE_SUCCESS) {
        T_LONG_JMP(pTaskInfo->env, code);
      }
    }

    bool newGroup = !(pSupp->groupId == 0 || pSupp->groupId == pIn->info.id.groupId);
    if (newGroup) {
      // the previous group is complete: forecast it, then start caching the new one
      qDebug("%s group:%" PRId64 ", read finish for new group coming, blocks:%d", pId, pSupp->groupId, blockCnt);
      code = forecastAggregateBlocks(pSupp, pOut, pId);
      QUERY_CHECK_CODE(code, lino, _end);

      pSupp->groupId = pIn->info.id.groupId;
      blockCnt = 1;
      pSupp->cachedRows = pIn->info.rows;
      qDebug("%s group:%" PRId64 ", new group, rows:%" PRId64 ", total rows:%" PRId64, pId, pSupp->groupId,
             pIn->info.rows, pSupp->cachedRows);
      code = forecastCacheBlock(pSupp, pIn, pId);
      QUERY_CHECK_CODE(code, lino, _end);
    } else {
      pSupp->groupId = pIn->info.id.groupId;
      blockCnt += 1;
      pSupp->cachedRows += pIn->info.rows;
      qDebug("%s group:%" PRId64 ", blocks:%d, rows:%" PRId64 ", total rows:%" PRId64, pId, pSupp->groupId, blockCnt,
             pIn->info.rows, pSupp->cachedRows);
      code = forecastCacheBlock(pSupp, pIn, pId);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    // a finished group produced rows: hand them upstream right away
    if (pOut->info.rows > 0) {
      (*ppRes) = pOut;
      qDebug("%s group:%" PRId64 ", return to upstream, blocks:%d", pId, pOut->info.id.groupId, blockCnt);
      return code;
    }
  }

  // downstream is exhausted: forecast whatever is still cached
  if (blockCnt > 0) {
    qDebug("%s group:%" PRId64 ", read finish, blocks:%d", pId, pSupp->groupId, blockCnt);
    code = forecastAggregateBlocks(pSupp, pOut, pId);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  int64_t cost = taosGetTimestampUs() - beginUs;
  qDebug("%s all groups finished, cost:%" PRId64 "us", pId, cost);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s %s failed at line %d since %s", pId, __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }

  if (pOut->info.rows == 0) {
    (*ppRes) = NULL;
  } else {
    (*ppRes) = pOut;
  }
  return code;
}
482

483
/**
 * Record which output slot receives each forecast result column.
 * Slots of result kinds that do not appear in the expression list stay at -1.
 *
 * @param pSupp     forecast state whose res*Slot fields are filled in
 * @param pExprSup  output expressions of the operator
 * @return always 0
 */
static int32_t forecastParseOutput(SForecastSupp* pSupp, SExprSupp* pExprSup) {
  pSupp->resTsSlot = -1;
  pSupp->resValSlot = -1;
  pSupp->resLowSlot = -1;
  pSupp->resHighSlot = -1;

  for (int32_t idx = 0; idx < pExprSup->numOfExprs; ++idx) {
    SExprInfo* pExpr = &pExprSup->pExprInfo[idx];
    int32_t    slot = pExpr->base.resSchema.slotId;

    switch (pExpr->pExpr->_function.functionType) {
      case FUNCTION_TYPE_FORECAST:
        pSupp->resValSlot = slot;
        break;
      case FUNCTION_TYPE_FORECAST_ROWTS:
        pSupp->resTsSlot = slot;
        break;
      case FUNCTION_TYPE_FORECAST_LOW:
        pSupp->resLowSlot = slot;
        break;
      case FUNCTION_TYPE_FORECAST_HIGH:
        pSupp->resHighSlot = slot;
        break;
      default:
        // other functions do not bind a forecast output slot
        break;
    }
  }

  return 0;
}
506

507
/**
 * Check the parameter list of a forecast function call.
 *
 * At least two parameters are required and none may be NULL. With exactly two
 * parameters both must be columns; with three or more, the first and the last
 * parameter must be columns.
 *
 * @param pFunc  the forecast function node
 * @param id     task id used as log prefix
 * @return 0 when the list is acceptable, TSDB_CODE_PLAN_INTERNAL_ERROR otherwise
 */
static int32_t validInputParams(SFunctionNode* pFunc, const char* id) {
  int32_t code = 0;
  int32_t num = LIST_LENGTH(pFunc->pParameterList);

  if (num <= 1) {
    qError("%s invalid number of parameters:%d", id, num);
    code = TSDB_CODE_PLAN_INTERNAL_ERROR;
    goto _end;
  }

  for (int32_t i = 0; i < num; ++i) {
    if (nodesListGetNode(pFunc->pParameterList, i) == NULL) {
      code = TSDB_CODE_PLAN_INTERNAL_ERROR;
      qError("%s %d-th parameter in forecast function is NULL, code:%s", id, i, tstrerror(code));
      goto _end;
    }
  }

  if (num == 2) {
    // column_name, timestamp_column_name
    SNode* pFirst = nodesListGetNode(pFunc->pParameterList, 0);
    SNode* pSecond = nodesListGetNode(pFunc->pParameterList, 1);

    bool bothColumns = (nodeType(pFirst) == QUERY_NODE_COLUMN) && (nodeType(pSecond) == QUERY_NODE_COLUMN);
    if (!bothColumns) {
      qError("%s invalid column type, column 1:%d, column 2:%d", id, nodeType(pFirst), nodeType(pSecond));
      code = TSDB_CODE_PLAN_INTERNAL_ERROR;
      goto _end;
    }
  } else {
    // num >= 3, accepted layouts:
    //   column_name_#1, column_name_#2...., analytics_options, timestamp_column_name, primary_key_column if exists
    //   column_name_#1, timestamp_column_name, primary_key_column if exists
    //   column_name_#1, analytics_options, timestamp_column_name
    SNode* pFirst = nodesListGetNode(pFunc->pParameterList, 0);
    if (nodeType(pFirst) != QUERY_NODE_COLUMN) {
      code = TSDB_CODE_PLAN_INTERNAL_ERROR;
      qError("%s first parameter is not valid column in forecast function", id);
      goto _end;
    }

    SNode* pLast = nodesListGetNode(pFunc->pParameterList, num - 1);
    if (nodeType(pLast) != QUERY_NODE_COLUMN) {
      qError("%s last parameter is not valid column, actual:%d", id, nodeType(pLast));
      code = TSDB_CODE_PLAN_INTERNAL_ERROR;
    }
  }

_end:
  if (code) {
    qError("%s valid the parameters failed, code:%s", id, tstrerror(code));
  }
  return code;
}
559

560
/**
 * Tell whether a column with the given slot id is already recorded in the
 * covariate slot list.
 *
 * @param pSupp   forecast state holding the covariate list
 * @param slotId  slot id to look for
 * @return true when a covariate column with that slot id exists
 */
static bool existInList(SForecastSupp* pSupp, int32_t slotId) {
  bool    found = false;
  int32_t idx = 0;

  while (!found && idx < taosArrayGetSize(pSupp->pCovariateSlotList)) {
    SColumn* pEntry = taosArrayGet(pSupp->pCovariateSlotList, idx);
    found = (pEntry->slotId == slotId);
    idx += 1;
  }

  return found;
}
571

572
/**
 * Extract the input bindings of the forecast function from the target list:
 * the timestamp slot and precision, the target column slot and type, the
 * option string and the covariate columns.
 *
 * With two parameters the call is (column, ts) and the default option
 * "algo=holtwinters" is used. With more parameters, the first primary
 * timestamp column and the first value node found are taken as ts and option
 * string, and the columns following the target (up to the first non-column or
 * primary timestamp parameter) are recorded as covariates.
 *
 * @param pSupp   forecast state to fill in
 * @param pFuncs  target node list of the operator
 * @param id      task id used as log prefix
 * @return 0 on success; the validation error of validInputParams(); or terrno
 *         when an allocation fails
 */
static int32_t forecastParseInput(SForecastSupp* pSupp, SNodeList* pFuncs, const char* id) {
  int32_t code = 0;
  SNode*  pNode = NULL;

  pSupp->inputTsSlot = -1;
  pSupp->targetValSlot = -1;
  pSupp->targetValType = -1;
  pSupp->inputPrecision = -1;

  FOREACH(pNode, pFuncs) {
    if ((nodeType(pNode) == QUERY_NODE_TARGET) && (nodeType(((STargetNode*)pNode)->pExpr) == QUERY_NODE_FUNCTION)) {
      SFunctionNode* pFunc = (SFunctionNode*)((STargetNode*)pNode)->pExpr;
      int32_t        numOfParam = LIST_LENGTH(pFunc->pParameterList);

      if (pFunc->funcType == FUNCTION_TYPE_FORECAST) {
        code = validInputParams(pFunc, id);
        if (code) {
          return code;
        }

        // the primary timestamp and the target column are always sent
        pSupp->numOfInputCols = 2;

        if (numOfParam == 2) {
          // column, ts
          SColumnNode* pTarget = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 0);
          SColumnNode* pTsNode = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 1);

          pSupp->inputTsSlot = pTsNode->slotId;
          pSupp->inputPrecision = pTsNode->node.resType.precision;
          pSupp->targetValSlot = pTarget->slotId;
          pSupp->targetValType = pTarget->node.resType.type;

          // let's add the holtwinters as the default forecast algorithm
          pSupp->pOptions = taosStrdup("algo=holtwinters");
          if (pSupp->pOptions == NULL) {
            qError("%s failed to dup forecast option, code:%s", id, tstrerror(terrno));
            return terrno;
          }
        } else {
          SColumnNode* pTarget = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 0);
          bool         assignTs = false;
          bool         assignOpt = false;

          pSupp->targetValSlot = pTarget->slotId;
          pSupp->targetValType = pTarget->node.resType.type;

          // set the primary ts column and option info
          for (int32_t i = 0; i < numOfParam; ++i) {
            SNode* pParam = nodesListGetNode(pFunc->pParameterList, i);
            if (nodeType(pParam) == QUERY_NODE_COLUMN) {
              SColumnNode* pColNode = (SColumnNode*)pParam;
              if (pColNode->isPrimTs && (!assignTs)) {
                pSupp->inputTsSlot = pColNode->slotId;
                pSupp->inputPrecision = pColNode->node.resType.precision;
                assignTs = true;
              }
            } else if (nodeType(pParam) == QUERY_NODE_VALUE) {
              if (!assignOpt) {
                SValueNode* pOptNode = (SValueNode*)pParam;
                pSupp->pOptions = taosStrdup(pOptNode->literal);
                if (pSupp->pOptions == NULL) {
                  qError("%s failed to dup forecast option, code:%s", id, tstrerror(terrno));
                  return terrno;
                }
                assignOpt = true;
              }
            }
          }

          if (!assignOpt) {
            // set the default forecast option
            pSupp->pOptions = taosStrdup("algo=holtwinters");
            if (pSupp->pOptions == NULL) {
              qError("%s failed to dup forecast option, code:%s", id, tstrerror(terrno));
              return terrno;
            }
          }

          pSupp->pCovariateSlotList = taosArrayInit(4, sizeof(SColumn));
          pSupp->pDynamicRealList = taosArrayInit(4, sizeof(SColFutureData));
          if (pSupp->pCovariateSlotList == NULL || pSupp->pDynamicRealList == NULL) {
            qError("%s failed to init covariate lists, code:%s", id, tstrerror(terrno));
            return terrno;
          }

          // the first is the target column
          for (int32_t i = 1; i < numOfParam; ++i) {
            SColumnNode* p = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, i);
            // covariates end at the first non-column parameter or at the primary timestamp
            if ((nodeType(p) != QUERY_NODE_COLUMN) || p->isPrimTs) {
              break;
            }

            if (p->slotId == pSupp->targetValSlot) {
              continue;  // duplicate the target column, ignore it
            }

            if (existInList(pSupp, p->slotId)) {
              continue;  // duplicate column, ignore it
            }

            SColumn col = {.slotId = p->slotId,
                           .colType = p->colType,
                           .type = p->node.resType.type,
                           .bytes = p->node.resType.bytes};

            tstrncpy(col.name, p->colName, tListLen(col.name));
            void* pRet = taosArrayPush(pSupp->pCovariateSlotList, &col);
            if (pRet == NULL) {
              // stop here: continuing would silently drop a covariate column
              code = terrno;
              qError("failed to record the covariate slot index, since %s", tstrerror(code));
              return code;
            }
          }

          pSupp->numOfInputCols += taosArrayGetSize(pSupp->pCovariateSlotList);
        }
      }
    }
  }

  return code;
}
688

689
/**
 * Reset the row statistics and load the default forecast options into pSupp.
 *
 * @param pSupp  forecast state to initialise
 */
static void initForecastOpt(SForecastSupp* pSupp) {
  // statistics of the cached rows
  pSupp->numOfRows = 0;
  pSupp->minTs = INT64_MAX;
  pSupp->maxTs = 0;

  // defaults used when the option string does not override them
  pSupp->forecastRows = ANALY_FORECAST_DEFAULT_ROWS;
  pSupp->conf = ANALY_FORECAST_DEFAULT_CONF;
  pSupp->wncheck = ANALY_FORECAST_DEFAULT_WNCHECK;

  // neither the start timestamp nor the interval has been given explicitly yet
  pSupp->setStart = 0;
  pSupp->setEvery = 0;
}
×
699

700
/**
 * Reject algorithms that cannot be combined with covariate columns.
 *
 * @param pSupp  forecast state holding the algorithm name and covariate list
 * @return TSDB_CODE_ANA_NOT_SUPPORT_FORECAST when covariates are present and
 *         the algorithm is one of the listed names (compared with
 *         taosStrcasecmp); TSDB_CODE_SUCCESS otherwise
 */
static int32_t filterNotSupportForecast(SForecastSupp* pSupp) {
  static const char* const noCovariateAlgos[] = {"holtwinters", "arima", "timemoe-fc"};

  if (taosArrayGetSize(pSupp->pCovariateSlotList) <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  for (size_t k = 0; k < sizeof(noCovariateAlgos) / sizeof(noCovariateAlgos[0]); ++k) {
    if (taosStrcasecmp(pSupp->algoName, noCovariateAlgos[k]) == 0) {
      return TSDB_CODE_ANA_NOT_SUPPORT_FORECAST;
    }
  }

  return TSDB_CODE_SUCCESS;
}
713

714

715
static int32_t forecastParseOpt(SForecastSupp* pSupp, const char* id) {
×
716
  int32_t   code = 0;
×
717
  int32_t   lino = 0;
×
718
  SHashObj* pHashMap = NULL;
×
719

720
  initForecastOpt(pSupp);
×
721

722
  code = taosAnalyGetOpts(pSupp->pOptions, &pHashMap);
×
723
  if (code != TSDB_CODE_SUCCESS) {
×
724
    return code;
×
725
  }
726

727
  if (taosHashGetSize(pHashMap) == 0) {
×
728
    code = TSDB_CODE_INVALID_PARA;
×
729
    qError("%s no valid options for forecast, failed to exec", id);
×
730
    return code;
×
731
  }
732

733
  if (taosHashGetSize(pHashMap) == 0) {
×
734
    code = TSDB_CODE_INVALID_PARA;
×
735
    qError("%s no valid options for forecast, failed to exec", id);
×
736
    return code;
×
737
  }
738

739
  code = taosAnalysisParseAlgo(pSupp->pOptions, pSupp->algoName, pSupp->algoUrl, ANALY_ALGO_TYPE_FORECAST,
×
740
                               tListLen(pSupp->algoUrl), pHashMap, id);
741
  TSDB_CHECK_CODE(code, lino, _end);
×
742

743
  code = filterNotSupportForecast(pSupp);
×
744
  if (code) {
×
745
    qError("%s not support forecast model, %s", id, pSupp->algoName);
×
746
    TSDB_CHECK_CODE(code, lino, _end);
×
747
  }
748

749
  // extract the timeout parameter
750
  pSupp->timeout = taosAnalysisParseTimout(pHashMap, id);
×
751
  pSupp->wncheck = taosAnalysisParseWncheck(pHashMap, id);
×
752

753
  // extract the forecast rows
754
  char* pRows = taosHashGet(pHashMap, ALGO_OPT_FORECASTROWS_NAME, strlen(ALGO_OPT_FORECASTROWS_NAME));
×
755
  if (pRows != NULL) {
×
756
    int64_t v = 0;
×
757
    code = toInteger(pRows, taosHashGetValueSize(pRows), 10, &v);
×
758

759
    pSupp->forecastRows = v;
×
760
    qDebug("%s forecast rows:%"PRId64, id, pSupp->forecastRows);
×
761
  } else {
762
    qDebug("%s forecast rows not found:%s, use default:%" PRId64, id, pSupp->pOptions, pSupp->forecastRows);
×
763
  }
764

765
  if (pSupp->forecastRows > ANALY_FORECAST_RES_MAX_ROWS) {
×
766
    qError("%s required too many forecast rows, max allowed:%d, required:%" PRId64, id, ANALY_FORECAST_RES_MAX_ROWS,
×
767
           pSupp->forecastRows);
768
    code = TSDB_CODE_ANA_ANODE_TOO_MANY_ROWS;
×
769
    goto _end;
×
770
  }
771

772
  if (pSupp->forecastRows <= 0) {
×
773
    qError("%s output rows should be greater than 0, input:%" PRId64, id, pSupp->forecastRows);
×
774
    code = TSDB_CODE_INVALID_PARA;
×
775
    goto _end;
×
776
  }
777

778
  // extract the confidence interval value
779
  char* pConf = taosHashGet(pHashMap, ALGO_OPT_CONF_NAME, strlen(ALGO_OPT_CONF_NAME));
×
780
  if (pConf != NULL) {
×
781
    char*  endPtr = NULL;
×
782
    double v = taosStr2Double(pConf, &endPtr);
×
783
    pSupp->conf = v;
×
784

785
    if (v <= 0 || v > 1.0) {
×
786
      pSupp->conf = ANALY_FORECAST_DEFAULT_CONF;
×
787
      qWarn("%s valid conf range is (0, 1], user specified:%.2f out of range, set the default:%.2f", id, v,
×
788
             pSupp->conf);
789
    } else {
790
      qDebug("%s forecast conf:%.2f", id, pSupp->conf);
×
791
    }
792
  } else {
793
    qDebug("%s forecast conf not found:%s, use default:%.2f", id, pSupp->pOptions, pSupp->conf);
×
794
  }
795

796
  // extract the start timestamp
797
  char* pStart = taosHashGet(pHashMap, ALGO_OPT_START_NAME, strlen(ALGO_OPT_START_NAME));
×
798
  if (pStart != NULL) {
×
799
    int64_t v = 0;
×
800
    code = toInteger(pStart, taosHashGetValueSize(pStart), 10, &v);
×
801
    pSupp->startTs = v;
×
802
    pSupp->setStart = 1;
×
803
    qDebug("%s forecast set start ts:%"PRId64, id, pSupp->startTs);
×
804
  }
805

806
  // extract the time step
807
  char* pEvery = taosHashGet(pHashMap, ALGO_OPT_EVERY_NAME, strlen(ALGO_OPT_EVERY_NAME));
×
808
  if (pEvery != NULL) {
×
809
    int64_t v = 0;
×
810
    code = toInteger(pEvery, taosHashGetValueSize(pEvery), 10, &v);
×
811
    pSupp->every = v;
×
812
    pSupp->setEvery = 1;
×
813
    qDebug("%s forecast set every ts:%"PRId64, id, pSupp->every);
×
814
  }
815

816
  if (pSupp->setEvery && pSupp->every <= 0) {
×
817
    qError("%s period should be greater than 0, user specified:%"PRId64, id, pSupp->every);
×
818
    code = TSDB_CODE_INVALID_PARA;
×
819
    goto _end;
×
820
  }
821

822
  // extract the dynamic real feature for covariate forecasting
823
  void*       pIter = NULL;
×
824
  size_t      keyLen = 0;
×
825
  const char* p = "dynamic_real_";
×
826

827
  while ((pIter = taosHashIterate(pHashMap, pIter))) {
×
828
    const char* pVal = pIter;
×
829
    char*       pKey = taosHashGetKey((void*)pVal, &keyLen);
×
830
    int32_t     idx = 0;
×
831
    char        nameBuf[512] = {0};
×
832

833
    if (strncmp(pKey, p, strlen(p)) == 0) {
×
834

835
      if (strncmp(&pKey[keyLen - 4], "_col", 4) == 0) {
×
836
        continue;
×
837
      }
838

839
      int32_t ret = sscanf(pKey, "dynamic_real_%d", &idx);
×
840
      if (ret == 0) {
×
841
        continue;
×
842
      }
843

844
      memcpy(nameBuf, pKey, keyLen);
×
845
      strncpy(&nameBuf[keyLen], "_col", strlen("_col"));
×
846

847
      void* pCol = taosHashGet(pHashMap, nameBuf, strlen(nameBuf));
×
848
      if (pCol == NULL) {
×
849
        char* pTmp = taosStrndupi(pKey, keyLen);
×
850
        qError("%s dynamic real column related:%s column name:%s not specified", id, pTmp, nameBuf);
×
851
        
852
        taosMemoryFree(pTmp);
×
853
        code = TSDB_CODE_INVALID_PARA;
×
854
        goto _end;
×
855
      } else {
856
        // build dynamic_real_feature
857
        SColFutureData d = {.pName = taosStrndupi(pCol, taosHashGetValueSize(pCol))};
×
858
        if (d.pName == NULL) {
×
859
          qError("%s failed to clone the future dynamic real column name:%s", id, (char*) pCol);
×
860
          code = terrno;
×
861
          goto _end;
×
862
        }
863

864
        int32_t index = -1;
×
865
        for (int32_t i = 0; i < taosArrayGetSize(pSupp->pCovariateSlotList); ++i) {
×
866
          SColumn* pColx = taosArrayGet(pSupp->pCovariateSlotList, i);
×
867
          if (strcmp(pColx->name, d.pName) == 0) {
×
868
            index = i;
×
869
            break;
×
870
          }
871
        }
872

873
        if (index == -1) {
×
874
          qError("%s not found the required future dynamic real column:%s", id, d.pName);
×
875
          code = TSDB_CODE_INVALID_PARA;
×
876
          taosMemoryFree(d.pName);
×
877
          goto _end;
×
878
        }
879

880
        SColumn* pColx = taosArrayGet(pSupp->pCovariateSlotList, index);
×
881
        d.data.info.slotId = pColx->slotId;
×
882
        d.data.info.type = pColx->type;
×
883
        d.data.info.bytes = pColx->bytes;
×
884

885
        int32_t len = taosHashGetValueSize((void*)pVal);
×
886
        char*   buf = taosStrndupi(pVal, len);
×
887
        int32_t unused = strdequote((char*)buf);
×
888

889
        int32_t num = 0;
×
890
        char**  pList = strsplit(buf, " ", &num);
×
891
        if (num != pSupp->forecastRows) {
×
892
          qError("%s the rows:%d of future dynamic real column data is not equalled to the forecasting rows:%" PRId64,
×
893
                 id, num, pSupp->forecastRows);
894
          code = TSDB_CODE_INVALID_PARA;
×
895

896
          taosMemoryFree(d.pName);
×
897
          taosMemoryFree(pList);
×
898
          taosMemoryFree(buf);
×
899
          goto _end;
×
900
        }
901

902
        d.numOfRows = num;
×
903

904
        code = colInfoDataEnsureCapacity(&d.data, num, true);
×
905
        if (code != 0) {
×
906
          qError("%s failed to prepare buffer, code:%s", id, tstrerror(code));
×
907
          goto _end;
×
908
        }
909

910
        for (int32_t j = 0; j < num; ++j) {
×
911
          char* ps = NULL;
×
912
          if (j == 0) {
×
913
            ps = strstr(pList[j], "[") + 1;
×
914
          } else {
915
            ps = pList[j];
×
916
          }
917

918
          code = 0;
×
919

920
          switch(pColx->type) {
×
921
            case TSDB_DATA_TYPE_TINYINT: {
×
922
              int8_t t1 = taosStr2Int8(ps, NULL, 10);
×
923
              code = colDataSetVal(&d.data, j, (const char*)&t1, false);
×
924
              break;
×
925
            }
926
            case TSDB_DATA_TYPE_SMALLINT: {
×
927
              int16_t t1 = taosStr2Int16(ps, NULL, 10);
×
928
              code = colDataSetVal(&d.data, j, (const char*)&t1, false);
×
929
              break;
×
930
            }
931
            case TSDB_DATA_TYPE_INT: {
×
932
              int32_t t1 = taosStr2Int32(ps, NULL, 10);
×
933
              code = colDataSetVal(&d.data, j, (const char*)&t1, false);
×
934
              break;
×
935
            }
936
            case TSDB_DATA_TYPE_BIGINT: {
×
937
              int64_t t1 = taosStr2Int64(ps, NULL, 10);
×
938
              code = colDataSetVal(&d.data, j, (const char*)&t1, false);
×
939
              break;
×
940
            }
941
            case TSDB_DATA_TYPE_FLOAT: {
×
942
              float t1 = taosStr2Float(ps, NULL);
×
943
              code = colDataSetVal(&d.data, j, (const char*)&t1, false);
×
944
              break;
×
945
            }
946
            case TSDB_DATA_TYPE_DOUBLE: {
×
947
              double t1 = taosStr2Double(ps, NULL);
×
948
              code = colDataSetVal(&d.data, j, (const char*)&t1, false);
×
949
              break;
×
950
            }
951
            case TSDB_DATA_TYPE_UTINYINT: {
×
952
              uint8_t t1 = taosStr2UInt8(ps, NULL, 10);
×
953
              code = colDataSetVal(&d.data, j, (const char*)&t1, false);
×
954
              break;
×
955
            }
956
            case TSDB_DATA_TYPE_USMALLINT: {
×
957
              uint16_t t1 = taosStr2UInt16(ps, NULL, 10);
×
958
              code = colDataSetVal(&d.data, j, (const char*)&t1, false);
×
959
              break;
×
960
            }
961
            case TSDB_DATA_TYPE_UINT: {
×
962
              uint32_t t1 = taosStr2UInt32(ps, NULL, 10);
×
963
              code = colDataSetVal(&d.data, j, (const char*)&t1, false);
×
964
              break;
×
965
            }
966
            case TSDB_DATA_TYPE_UBIGINT: {
×
967
              uint64_t t1 = taosStr2UInt64(ps, NULL, 10);
×
968
              code = colDataSetVal(&d.data, j, (const char*)&t1, false);
×
969
              break;
×
970
            }
971
          }
972

973
          if (code != 0) {
×
974
            break;
×
975
          }
976
        }
977

978
        taosMemoryFree(pList);
×
979
        taosMemoryFree(buf);
×
980
        if (code != 0) {
×
981
          goto _end;
×
982
        }
983

984
        void* noret = taosArrayPush(pSupp->pDynamicRealList, &d);
×
985
        if (noret == NULL) {
×
986
          qError("%s failed to add column info in dynamic real column info", id);
×
987
          code = terrno;
×
988
          goto _end;
×
989
        }
990
      }
991
    }
992
  }
993

994
_end:
×
995
  taosHashCleanup(pHashMap);
×
996
  return code;
×
997
}
998

NEW
999
/*
 * Open the analytic buffer of the forecast operator and write its column layout.
 *
 * The buffer file name is built from the temp directory, the address of pSupp and a
 * nanosecond timestamp. Column meta is written in this order: "ts", "val", then one
 * "dynamic_real_<n>" entry per future covariate column that matches a covariate slot,
 * then one "past_dynamic_real_<n>" entry per remaining covariate slot.
 *
 * Side effect: every covariate slot matched by a future column is copied into that
 * future column's `col` field and removed from pSupp->pCovariateSlotList.
 *
 * Returns 0 on success; on failure the buffer is closed and destroyed before the
 * error code is returned.
 */
static int32_t forecastCreateBuf(SForecastSupp* pSupp, const char* pId) {
  SAnalyticBuf* pAnalyBuf = &pSupp->analyBuf;
  int64_t       nowNs = taosGetTimestampNs();
  int32_t       colIdx = 0;
  int32_t       code = 0;

  pAnalyBuf->bufType = ANALYTICS_BUF_TYPE_JSON_COL;
  snprintf(pAnalyBuf->fileName, sizeof(pAnalyBuf->fileName), "%s/tdengine-forecast-%p-%" PRId64, tsTempDir, pSupp, nowNs);

  // timestamp column + target column + all covariate columns
  int32_t totalCols = taosArrayGetSize(pSupp->pCovariateSlotList) + 2;

  code = tsosAnalyBufOpen(pAnalyBuf, totalCols, pId);
  if (code != 0) {
    goto _OVER;
  }

  code = taosAnalyBufWriteColMeta(pAnalyBuf, colIdx++, TSDB_DATA_TYPE_TIMESTAMP, "ts");
  if (code != 0) {
    goto _OVER;
  }

  code = taosAnalyBufWriteColMeta(pAnalyBuf, colIdx++, pSupp->targetValType, "val");
  if (code != 0) {
    goto _OVER;
  }

  int32_t numOfFuture = taosArrayGetSize(pSupp->pDynamicRealList);
  int32_t numOfPast = taosArrayGetSize(pSupp->pCovariateSlotList);

  if (numOfPast >= numOfFuture) {
    for (int32_t i = 0; i < numOfFuture; ++i) {
      SColFutureData* pFuture = taosArrayGet(pSupp->pDynamicRealList, i);

      // look for the covariate slot that has the same column name
      int32_t numOfSlots = taosArrayGetSize(pSupp->pCovariateSlotList);
      int32_t matched = -1;
      for (int32_t k = 0; k < numOfSlots; ++k) {
        SColumn* pCandidate = taosArrayGet(pSupp->pCovariateSlotList, k);
        if (strcmp(pCandidate->name, pFuture->pName) == 0) {
          matched = k;
          break;
        }
      }

      if (matched < 0) {
        continue;
      }

      SColumn* pSlot = taosArrayGet(pSupp->pCovariateSlotList, matched);
      char     colName[128] = {0};
      (void) tsnprintf(colName, tListLen(colName), "dynamic_real_%d", i + 1);

      code = taosAnalyBufWriteColMeta(pAnalyBuf, colIdx++, pSlot->type, colName);
      if (code != 0) {
        goto _OVER;
      }

      // keep a copy of the column description, then take the slot out of the covariate list
      memcpy(&pFuture->col, pSlot, sizeof(SColumn));
      taosArrayRemove(pSupp->pCovariateSlotList, matched);
    }

    // whatever is left in the covariate list is written as past-only columns
    numOfPast = taosArrayGetSize(pSupp->pCovariateSlotList);
    for (int32_t j = 0; j < numOfPast; ++j) {
      SColumn* pSlot = taosArrayGet(pSupp->pCovariateSlotList, j);
      char     colName[128] = {0};
      (void)tsnprintf(colName, tListLen(colName), "past_dynamic_real_%d", j + 1);

      code = taosAnalyBufWriteColMeta(pAnalyBuf, colIdx++, pSlot->type, colName);
      if (code) {
        goto _OVER;
      }
    }
  }

  code = taosAnalyBufWriteDataBegin(pAnalyBuf);
  if (code != 0) {
    goto _OVER;
  }

  for (int32_t c = 0; c < pSupp->numOfInputCols; ++c) {
    code = taosAnalyBufWriteColBegin(pAnalyBuf, c);
    if (code != 0) {
      goto _OVER;
    }
  }

_OVER:
  if (code != 0) {
    (void)taosAnalyBufClose(pAnalyBuf);
    taosAnalyBufDestroy(pAnalyBuf);
  }

  return code;
}
1071

1072
/*
 * Reset callback of the forecast operator: release the per-run state and rebuild it
 * from the physical plan node, so that the operator can be executed again.
 *
 * The result block is cleaned, the covariate slot list and the analytic buffer are
 * destroyed, both expression supports are cleaned up and re-created, and the input,
 * output and option parsing plus the buffer creation are run again.
 *
 * Returns 0 on success, otherwise the code of the step that failed (also logged).
 */
static int32_t resetForecastOperState(SOperatorInfo* pOper) {
  int32_t                 code = 0;
  int32_t                 lino = 0;
  SExecTaskInfo*          pTaskInfo = pOper->pTaskInfo;
  const char*             pId = pOper->pTaskInfo->id.str;
  SForecastOperatorInfo*  pInfo = pOper->info;
  SForecastSupp*          pSupp = &pInfo->forecastSupp;
  SForecastFuncPhysiNode* pPhyNode = (SForecastFuncPhysiNode*)pOper->pPhyNode;
  SExprInfo*              pFuncExprs = NULL;
  int32_t                 numOfFuncExprs = 0;

  pOper->status = OP_NOT_OPENED;

  // drop the state that belongs to the previous execution
  blockDataCleanup(pInfo->pRes);

  taosArrayDestroy(pSupp->pCovariateSlotList);
  pSupp->pCovariateSlotList = NULL;

  taosAnalyBufDestroy(&pSupp->analyBuf);

  cleanupExprSupp(&pOper->exprSupp);
  cleanupExprSupp(&pInfo->scalarSup);

  // rebuild the expression support and the filter from the plan node
  TAOS_CHECK_EXIT(createExprInfo(pPhyNode->pFuncs, NULL, &pFuncExprs, &numOfFuncExprs));

  TAOS_CHECK_EXIT(initExprSupp(&pOper->exprSupp, pFuncExprs, numOfFuncExprs, &pTaskInfo->storageAPI.functionStore));

  TAOS_CHECK_EXIT(filterInitFromNode((SNode*)pPhyNode->node.pConditions, &pOper->exprSupp.pFilterInfo, 0,
                                     pTaskInfo->pStreamRuntimeInfo));

  // parse the forecast definition again and prepare a new analytic buffer
  TAOS_CHECK_EXIT(forecastParseInput(pSupp, pPhyNode->pFuncs, pId));

  TAOS_CHECK_EXIT(forecastParseOutput(pSupp, &pOper->exprSupp));

  TAOS_CHECK_EXIT(forecastParseOpt(pSupp, pId));

  TAOS_CHECK_EXIT(forecastCreateBuf(pSupp, pId));

  if (pPhyNode->pExprs != NULL) {
    SExprInfo* pScalarExprs = NULL;
    int32_t    numOfScalarExprs = 0;

    TAOS_CHECK_EXIT(createExprInfo(pPhyNode->pExprs, NULL, &pScalarExprs, &numOfScalarExprs));
    TAOS_CHECK_EXIT(initExprSupp(&pInfo->scalarSup, pScalarExprs, numOfScalarExprs, &pTaskInfo->storageAPI.functionStore));
  }

  initResultSizeInfo(&pOper->resultInfo, 4096);

_exit:
  if (code) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  return code;
}
1125

1126

1127

1128

UNCOV
1129
/*
 * Create the forecast operator for the given physical plan node.
 *
 * downstream  the single downstream operator that feeds this operator
 * pPhyNode    physical plan node, used as a SForecastFuncPhysiNode
 * pTaskInfo   execution task; its `code` field is set when the creation fails
 * pOptrInfo   output, receives the new operator on success
 *
 * Returns TSDB_CODE_SUCCESS on success. On failure the partially built operator info,
 * the operator and the downstream operator are destroyed and the error code is returned.
 */
int32_t createForecastOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
                                   SOperatorInfo** pOptrInfo) {
  QRY_PARAM_CHECK(pOptrInfo);

  int32_t code = 0;
  int32_t lino = 0;

  // must be set before the first "goto _error": the error path logs with pId
  const char* pId = pTaskInfo->id.str;

  SForecastOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SForecastOperatorInfo));
  SOperatorInfo*         pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pOperator == NULL || pInfo == NULL) {
    code = terrno;
    goto _error;
  }

  pOperator->pPhyNode = pPhyNode;

  SForecastSupp*          pSupp = &pInfo->forecastSupp;
  SForecastFuncPhysiNode* pForecastPhyNode = (SForecastFuncPhysiNode*)pPhyNode;
  SExprSupp*              pExprSup = &pOperator->exprSupp;
  int32_t                 numOfExprs = 0;
  SExprInfo*              pExprInfo = NULL;

  // expression support for the forecast functions
  code = createExprInfo(pForecastPhyNode->pFuncs, NULL, &pExprInfo, &numOfExprs);
  QUERY_CHECK_CODE(code, lino, _error);

  code = initExprSupp(pExprSup, pExprInfo, numOfExprs, &pTaskInfo->storageAPI.functionStore);
  QUERY_CHECK_CODE(code, lino, _error);

  // optional scalar expressions
  if (pForecastPhyNode->pExprs != NULL) {
    int32_t    num = 0;
    SExprInfo* pScalarExprInfo = NULL;
    code = createExprInfo(pForecastPhyNode->pExprs, NULL, &pScalarExprInfo, &num);
    QUERY_CHECK_CODE(code, lino, _error);

    code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, num, &pTaskInfo->storageAPI.functionStore);
    QUERY_CHECK_CODE(code, lino, _error);
  }

  code = filterInitFromNode((SNode*)pForecastPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
                            pTaskInfo->pStreamRuntimeInfo);
  QUERY_CHECK_CODE(code, lino, _error);

  // parse the forecast definition: input columns, output columns, options
  code = forecastParseInput(pSupp, pForecastPhyNode->pFuncs, pId);
  QUERY_CHECK_CODE(code, lino, _error);

  code = forecastParseOutput(pSupp, pExprSup);
  QUERY_CHECK_CODE(code, lino, _error);

  code = forecastParseOpt(pSupp, pId);
  QUERY_CHECK_CODE(code, lino, _error);

  code = forecastCreateBuf(pSupp, pId);
  QUERY_CHECK_CODE(code, lino, _error);

  initResultSizeInfo(&pOperator->resultInfo, 4096);

  pInfo->pRes = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
  QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno);

  setOperatorInfo(pOperator, "ForecastOperator", QUERY_NODE_PHYSICAL_PLAN_FORECAST_FUNC, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, forecastNext, NULL, destroyForecastInfo, optrDefaultBufFn,
                                         NULL, optrDefaultGetNextExtFn, NULL);

  setOperatorResetStateFn(pOperator, resetForecastOperState);

  code = blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
  QUERY_CHECK_CODE(code, lino, _error);

  code = appendDownstream(pOperator, &downstream, 1);
  QUERY_CHECK_CODE(code, lino, _error);

  *pOptrInfo = pOperator;

  qDebug("%s forecast env is initialized, option:%s", pId, pSupp->pOptions);
  return TSDB_CODE_SUCCESS;

_error:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s %s failed at line %d since %s", pId, __func__, lino, tstrerror(code));
  }
  if (pInfo != NULL) destroyForecastInfo(pInfo);
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
  pTaskInfo->code = code;
  return code;
}
1215

1216
static void destroyColFutureData(void* p) {
×
1217
  SColFutureData* pData = p;
×
1218
  taosMemoryFree(pData->pName);
×
1219
  colDataDestroy(&pData->data);
×
UNCOV
1220
}
×
1221

1222
static void destroyForecastInfo(void* param) {
×
UNCOV
1223
  SForecastOperatorInfo* pInfo = (SForecastOperatorInfo*)param;
×
1224

1225
  blockDataDestroy(pInfo->pRes);
×
UNCOV
1226
  pInfo->pRes = NULL;
×
1227

1228
  taosArrayDestroy(pInfo->forecastSupp.pCovariateSlotList);
×
UNCOV
1229
  pInfo->forecastSupp.pCovariateSlotList = NULL;
×
1230

1231
  taosArrayDestroyEx(pInfo->forecastSupp.pDynamicRealList, destroyColFutureData);
×
UNCOV
1232
  pInfo->forecastSupp.pDynamicRealList = NULL;
×
1233

UNCOV
1234
  taosMemoryFree(pInfo->forecastSupp.pOptions);
×
1235

1236
  cleanupExprSupp(&pInfo->scalarSup);
×
1237
  taosAnalyBufDestroy(&pInfo->forecastSupp.analyBuf);
×
1238
  taosMemoryFreeClear(param);
×
UNCOV
1239
}
×
1240

1241
#else
1242

1243
/*
 * Variant compiled when USE_ANALYTICS is not defined: no operator is created,
 * the arguments are not used and TSDB_CODE_OPS_NOT_SUPPORT is always returned.
 */
int32_t createForecastOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
                                   SOperatorInfo** pOptrInfo) {
  (void)downstream;
  (void)pPhyNode;
  (void)pTaskInfo;
  (void)pOptrInfo;

  return TSDB_CODE_OPS_NOT_SUPPORT;
}
1247

1248
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc