• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4844

09 Nov 2025 03:44PM UTC coverage: 63.058% (-0.5%) from 63.514%
#4844

push

travis-ci

web-flow
test: minor changes (#33510)

117164 of 185804 relevant lines covered (63.06%)

115657269.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/libs/executor/src/imputationoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "functionMgt.h"
19
#include "operator.h"
20
#include "osMemPool.h"
21
#include "osMemory.h"
22
#include "querytask.h"
23
#include "tanalytics.h"
24
#include "taoserror.h"
25
#include "tcommon.h"
26
#include "tdatablock.h"
27
#include "tdef.h"
28
#include "thash.h"
29
#include "tjson.h"
30
#include "tmsg.h"
31
#include "types.h"
32

33
#ifdef USE_ANALYTICS
34

35
#define FREQ_STR     "freq"
36
#define RADIUS_STR   "radius"
37
#define LAGSTART_STR "lag_start"
38
#define LAGEND_STR   "lag_end"
39

40
typedef struct {
41
  int64_t      timeout;
42
  int8_t       wncheck;
43
  SAnalyticBuf analyBuf;
44
  int32_t      numOfRows;
45
  int32_t      numOfBlocks;
46
  int32_t      numOfCols;
47
  STimeWindow  win;
48
  uint64_t     groupId;
49
  SArray      *pBlocks;    // SSDataBlock*
50
} SBaseSupp;
51

52
typedef struct {
53
  SBaseSupp    base;
54
  int32_t      targetSlot;
55
  int32_t      targetType;
56
  int32_t      tsSlot;
57
  int32_t      tsPrecision;
58
  int32_t      resTsSlot;
59
  int32_t      resMarkSlot;
60
  char         freq[64];         // frequency of data
61
} SImputationSupp;
62

63
typedef struct {
64
  int32_t   radius;
65
  int32_t   lagStart;
66
  int32_t   lagEnd;
67
  SBaseSupp base;
68

69
  int32_t targetSlot1;
70
  int32_t targetSlot2;
71

72
  int32_t targetSlot1Type;
73
  int32_t targetSlot2Type;
74
} SCorrelationSupp;
75

76
typedef struct {
77
  SOptrBasicInfo   binfo;
78
  SExprSupp        scalarSup;
79
  char             algoName[TSDB_ANALYTIC_ALGO_NAME_LEN];
80
  char             algoUrl[TSDB_ANALYTIC_ALGO_URL_LEN];
81
  char*            options;
82
  SColumn          targetCol;
83
  int32_t          resTargetSlot;
84
  int32_t          analysisType;
85
  SImputationSupp  imputatSup;
86
  SCorrelationSupp corrSupp;
87
} SAnalysisOperatorInfo;
88

89
static void    analysisDestroyOperatorInfo(void* param);
90
static int32_t imputationNext(SOperatorInfo* pOperator, SSDataBlock** ppRes);
91
static int32_t doAnalysis(SAnalysisOperatorInfo* pInfo, SExecTaskInfo* pTaskInfo);
92
static int32_t doCacheBlock(SAnalysisOperatorInfo* pInfo, SSDataBlock* pBlock, const char* id);
93
static int32_t doParseInputForImputation(SAnalysisOperatorInfo* pInfo, SImputationSupp* pSupp, SNodeList* pFuncs, const char* id);
94
static int32_t doParseInputForDtw(SAnalysisOperatorInfo* pInfo, SCorrelationSupp* pSupp, SNodeList* pFuncs, const char* id);
95
static int32_t doParseInputForTlcc(SAnalysisOperatorInfo* pInfo, SCorrelationSupp* pSupp, SNodeList* pFuncs, const char* id);
96
static int32_t doSetResSlot(SAnalysisOperatorInfo* pInfo, SExprSupp* pExprSup);
97
static int32_t doParseOption(SAnalysisOperatorInfo* pInfo, const char* id);
98
static int32_t doCreateBuf(SAnalysisOperatorInfo* pInfo, const char* pId);
99
static int32_t estResultRowsAfterImputation(int32_t rows, int64_t skey, int64_t ekey, int32_t prec, const char* pFreq, const char* id);
100
static void    doInitImputOptions(SImputationSupp* pSupp);
101
static void    doInitDtwOptions(SCorrelationSupp* pSupp);
102
static int32_t parseFreq(SImputationSupp* pSupp, SHashObj* pHashMap, const char* id);
103
static void    parseRadius(SCorrelationSupp* pSupp, SHashObj* pHashMap, const char* id);
104

105
int32_t createGenericAnalysisOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode, SExecTaskInfo* pTaskInfo,
×
106
                                          SOperatorInfo** pOptrInfo) {
107
  QRY_PARAM_CHECK(pOptrInfo);
×
108

109
  int32_t                    code = TSDB_CODE_SUCCESS;
×
110
  int32_t                    lino = 0;
×
111
  size_t                     keyBufSize = 0;
×
112
  int32_t                    num = 0;
×
113
  SExprInfo*                 pExprInfo = NULL;
×
114
  int32_t                    numOfExprs = 0;
×
115
  const char*                id = GET_TASKID(pTaskInfo);
×
116
  SGenericAnalysisPhysiNode* pAnalysisNode = (SGenericAnalysisPhysiNode*)physiNode;
×
117
  SExprSupp*                 pExprSup = NULL;
×
118

119
  SAnalysisOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAnalysisOperatorInfo));
×
120
  SOperatorInfo*           pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
×
121
  if (pOperator == NULL || pInfo == NULL) {
×
122
    code = terrno;
×
123
    goto _error;
×
124
  }
125

126
  pAnalysisNode = (SGenericAnalysisPhysiNode*)physiNode;
×
127
  pExprSup = &pOperator->exprSupp;
×
128

129
  code = createExprInfo(pAnalysisNode->pFuncs, NULL, &pExprInfo, &numOfExprs);
×
130
  QUERY_CHECK_CODE(code, lino, _error);
×
131

132
  code = initExprSupp(pExprSup, pExprInfo, numOfExprs, &pTaskInfo->storageAPI.functionStore);
×
133
  QUERY_CHECK_CODE(code, lino, _error);
×
134

135
  if (pAnalysisNode->pExprs != NULL) {
×
136
    SExprInfo* pScalarExprInfo = NULL;
×
137
    code = createExprInfo(pAnalysisNode->pExprs, NULL, &pScalarExprInfo, &num);
×
138
    QUERY_CHECK_CODE(code, lino, _error);
×
139

140
    code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, num, &pTaskInfo->storageAPI.functionStore);
×
141
    QUERY_CHECK_CODE(code, lino, _error);
×
142
  }
143

144
  code = filterInitFromNode((SNode*)pAnalysisNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0, pTaskInfo->pStreamRuntimeInfo);
×
145
  QUERY_CHECK_CODE(code, lino, _error);
×
146

147
  for (int32_t i = 0; i < numOfExprs; ++i) {
×
148
    int32_t type = pExprInfo[i].pExpr->_function.functionType;
×
149
    if (type == FUNCTION_TYPE_IMPUTATION || type == FUNCTION_TYPE_DTW || type == FUNCTION_TYPE_DTW_PATH ||
×
150
        type == FUNCTION_TYPE_TLCC) {
151
      pInfo->analysisType = type;
×
152
      break;
×
153
    }
154
  }
155

156
  if (pInfo->analysisType == FUNCTION_TYPE_IMPUTATION) {
×
157
    doInitImputOptions(&pInfo->imputatSup);
×
158
    code = doParseInputForImputation(pInfo, &pInfo->imputatSup, pAnalysisNode->pFuncs, id);
×
159
    QUERY_CHECK_CODE(code, lino, _error);
×
160
  } else if (pInfo->analysisType == FUNCTION_TYPE_DTW || pInfo->analysisType == FUNCTION_TYPE_DTW_PATH) {
×
161
    doInitDtwOptions(&pInfo->corrSupp);
×
162
    code = doParseInputForDtw(pInfo, &pInfo->corrSupp, pAnalysisNode->pFuncs, id);
×
163
    QUERY_CHECK_CODE(code, lino, _error);
×
164
  } else if (pInfo->analysisType == FUNCTION_TYPE_TLCC) {
×
165
    doInitDtwOptions(&pInfo->corrSupp);
×
166
    code = doParseInputForTlcc(pInfo, &pInfo->corrSupp, pAnalysisNode->pFuncs, id);
×
167
    QUERY_CHECK_CODE(code, lino, _error);
×
168
  } else {
169
    code = TSDB_CODE_INVALID_PARA;
×
170
    goto _error;
×
171
  }
172

173
  code = doParseOption(pInfo, id);
×
174
  QUERY_CHECK_CODE(code, lino, _error);
×
175

176
  code = doSetResSlot(pInfo, pExprSup);
×
177
  QUERY_CHECK_CODE(code, lino, _error);
×
178

179
  code = doCreateBuf(pInfo, id);
×
180
  QUERY_CHECK_CODE(code, lino, _error);
×
181

182
  initResultSizeInfo(&pOperator->resultInfo, 4096);
×
183

184
  pInfo->binfo.pRes = createDataBlockFromDescNode(physiNode->pOutputDataBlockDesc);
×
185
  QUERY_CHECK_NULL(pInfo->binfo.pRes, code, lino, _error, terrno);
×
186

187
  setOperatorInfo(pOperator, "GenericAnalyOptr", QUERY_NODE_PHYSICAL_PLAN_ANALYSIS_FUNC, false, OP_NOT_OPENED,
×
188
                  pInfo, pTaskInfo);
189
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, imputationNext, NULL, analysisDestroyOperatorInfo,
×
190
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
191

192
  code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
×
193
  QUERY_CHECK_CODE(code, lino, _error);
×
194

195
  code = appendDownstream(pOperator, &downstream, 1);
×
196
  QUERY_CHECK_CODE(code, lino, _error);
×
197

198
  *pOptrInfo = pOperator;
×
199

200
  qDebug("%s forecast env is initialized, option:%s", id, pInfo->options);
×
201
  return TSDB_CODE_SUCCESS;
×
202

203
_error:
×
204
  if (code != TSDB_CODE_SUCCESS) {
×
205
    qError("%s %s failed at line %d since %s", id, __func__, lino, tstrerror(code));
×
206
  }
207

208
  if (pInfo != NULL) analysisDestroyOperatorInfo(pInfo);
×
209
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
210
  pTaskInfo->code = code;
×
211
  return code;
×
212
}
213

214
static int32_t getBaseSupp(SAnalysisOperatorInfo* pInfo, SBaseSupp** ppSupp, const char* pId) {
×
215
  int32_t code = 0;
×
216
  *ppSupp = NULL;
×
217

218
  switch (pInfo->analysisType) {
×
219
    case FUNCTION_TYPE_IMPUTATION:
×
220
      *ppSupp = &pInfo->imputatSup.base;
×
221
      break;
×
222
    case FUNCTION_TYPE_DTW:
×
223
    case FUNCTION_TYPE_DTW_PATH:
224
    case FUNCTION_TYPE_TLCC:
225
      *ppSupp = &pInfo->corrSupp.base;
×
226
      break;
×
227
    default:
×
228
      // Handle error: unknown analysis type
229
      code = TSDB_CODE_INVALID_PARA;
×
230
      qError("%s unknown analysis type: %d", pId, pInfo->analysisType);
×
231
  }
232

233
  return code;
×
234
}
235

236
static int32_t imputationNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
×
237
  int32_t                code = TSDB_CODE_SUCCESS;
×
238
  int32_t                lino = 0;
×
239
  SAnalysisOperatorInfo* pInfo = pOperator->info;
×
240
  SExecTaskInfo*         pTaskInfo = pOperator->pTaskInfo;
×
241
  SOptrBasicInfo*        pBInfo = &pInfo->binfo;
×
242
  SSDataBlock*           pRes = pInfo->binfo.pRes;
×
243
  int64_t                st = taosGetTimestampUs();
×
244
  const char*            idstr = GET_TASKID(pTaskInfo);
×
245
  SBaseSupp*             pSupp = NULL;
×
246

247
  code = getBaseSupp(pInfo, &pSupp, idstr);
×
248
  QUERY_CHECK_CODE(code, lino, _end);
×
249

250
  int32_t numOfBlocks = taosArrayGetSize(pSupp->pBlocks);
×
251
  blockDataCleanup(pRes);
×
252

253
  while (1) {
×
254
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
×
255
    if (pBlock == NULL) {
×
256
      break;
×
257
    }
258

259
    if (pSupp->groupId == 0 || pSupp->groupId == pBlock->info.id.groupId) {
×
260
      pSupp->groupId = pBlock->info.id.groupId;
×
261
      numOfBlocks++;
×
262
      code = doCacheBlock(pInfo, pBlock, idstr);
×
263

264
      qDebug("group:%" PRId64 ", blocks:%d, rows:%" PRId64 ", total rows:%d", pSupp->groupId, numOfBlocks,
×
265
             pBlock->info.rows, pSupp->numOfRows);
266
      QUERY_CHECK_CODE(code, lino, _end);
×
267
    } else {
268
      qDebug("group:%" PRId64 ", read completed for new group coming, blocks:%d", pSupp->groupId, numOfBlocks);
×
269
      code = doAnalysis(pInfo, pTaskInfo);
×
270
      QUERY_CHECK_CODE(code, lino, _end);
×
271

272
      pSupp->groupId = pBlock->info.id.groupId;
×
273
      numOfBlocks = 1;
×
274
      qDebug("group:%" PRId64 ", new group, rows:%" PRId64 ", total rows:%d", pSupp->groupId, pBlock->info.rows,
×
275
             pSupp->numOfRows);
276
      code = doCacheBlock(pInfo, pBlock, idstr);
×
277
      QUERY_CHECK_CODE(code, lino, _end);
×
278
    }
279

280
    if (pRes->info.rows > 0) {
×
281
      (*ppRes) = pRes;
×
282
      qDebug("group:%" PRId64 ", return to upstream, blocks:%d", pRes->info.id.groupId, numOfBlocks);
×
283
      return code;
×
284
    }
285
  }
286

287
  if (numOfBlocks > 0) {
×
288
    qDebug("group:%" PRId64 ", read finish, blocks:%d", pSupp->groupId, numOfBlocks);
×
289
    code = doAnalysis(pInfo, pTaskInfo);
×
290
  }
291

292
  int64_t cost = taosGetTimestampUs() - st;
×
293
  qDebug("%s all groups finished, cost:%" PRId64 "us", idstr, cost);
×
294

295
_end:
×
296
  if (code != TSDB_CODE_SUCCESS) {
×
297
    qError("%s %s failed at line %d since %s", idstr, __func__, lino, tstrerror(code));
×
298
    pTaskInfo->code = code;
×
299
    T_LONG_JMP(pTaskInfo->env, code);
×
300
  }
301

302
  (*ppRes) = (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes;
×
303
  return code;
×
304
}
305

306
static void analysisDestroyOperatorInfo(void* param) {
×
307
  SAnalysisOperatorInfo* pInfo = (SAnalysisOperatorInfo*)param;
×
308
  if (pInfo == NULL) return;
×
309

310
  cleanupBasicInfo(&pInfo->binfo);
×
311
  cleanupExprSupp(&pInfo->scalarSup);
×
312

313
  SArray* pBlocks = NULL;
×
314
  if (pInfo->analysisType == FUNCTION_TYPE_IMPUTATION) {
×
315
    pBlocks = pInfo->imputatSup.base.pBlocks;
×
316
  } else {
317
    pBlocks = pInfo->corrSupp.base.pBlocks;
×
318
  }
319

320
  for (int32_t i = 0; i < taosArrayGetSize(pBlocks); ++i) {
×
321
    SSDataBlock* pBlock = taosArrayGetP(pBlocks, i);
×
322
    blockDataDestroy(pBlock);
×
323
  }
324

325
  taosArrayDestroy(pBlocks);
×
326
  taosMemoryFreeClear(param);
×
327
}
328

329
static int32_t doCacheBlockForImputation(SImputationSupp* pSupp, const char* id, SSDataBlock* pBlock) {
×
330
  SAnalyticBuf* pBuf = &pSupp->base.analyBuf;
×
331
  int32_t code = 0;
×
332

333
  if (pSupp->base.numOfRows > ANALY_IMPUTATION_INPUT_MAX_ROWS) {
×
334
    qError("%s too many rows for imputation, maximum allowed:%d, input:%d", id, ANALY_IMPUTATION_INPUT_MAX_ROWS,
×
335
           pSupp->base.numOfRows);
336
    return TSDB_CODE_ANA_ANODE_TOO_MANY_ROWS;
×
337
  }
338

339
  pSupp->base.numOfBlocks++;
×
340
  qDebug("%s block:%d, %p rows:%" PRId64, id, pSupp->base.numOfBlocks, pBlock, pBlock->info.rows);
×
341

342
  for (int32_t j = 0; j < pBlock->info.rows; ++j) {
×
343
    SColumnInfoData* pValCol = taosArrayGet(pBlock->pDataBlock, pSupp->targetSlot);
×
344
    SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pSupp->tsSlot);
×
345
    if (pTsCol == NULL || pValCol == NULL) {
×
346
      break;
347
    }
348

349
    int32_t index = 0;
×
350

351
    int64_t ts = ((TSKEY*)pTsCol->pData)[j];
×
352
    char*   val = colDataGetData(pValCol, j);
×
353
    int16_t valType = pValCol->info.type;
×
354

355
    pSupp->base.win.skey = MIN(pSupp->base.win.skey, ts);
×
356
    pSupp->base.win.ekey = MAX(pSupp->base.win.ekey, ts);
×
357

358
    pSupp->base.numOfRows++;
×
359

360
    // write the primary time stamp column data
361
    code = taosAnalyBufWriteColData(pBuf, index++, TSDB_DATA_TYPE_TIMESTAMP, &ts);
×
362
    if (TSDB_CODE_SUCCESS != code) {
×
363
      qError("%s failed to write ts in buf, code:%s", id, tstrerror(code));
×
364
      return code;
×
365
    }
366

367
    // write the main column for imputation
368
    code = taosAnalyBufWriteColData(pBuf, index++, valType, val);
×
369
    if (TSDB_CODE_SUCCESS != code) {
×
370
      qError("%s failed to write val in buf, code:%s", id, tstrerror(code));
×
371
      return code;
×
372
    }
373
  }
374

375
  return code;
×
376
}
377

378
int32_t doCacheBlockForDtw(SCorrelationSupp* pSupp, const char* id, SSDataBlock* pBlock) {
×
379
  SAnalyticBuf* pBuf = &pSupp->base.analyBuf;
×
380
  int32_t code = 0;
×
381

382
  pSupp->base.numOfBlocks++;
×
383
  qDebug("%s block:%d, %p rows:%" PRId64, id, pSupp->base.numOfBlocks, pBlock, pBlock->info.rows);
×
384

385
  for (int32_t j = 0; j < pBlock->info.rows; ++j) {
×
386
    SColumnInfoData* pCol1 = taosArrayGet(pBlock->pDataBlock, pSupp->targetSlot1);
×
387
    SColumnInfoData* pCol2 = taosArrayGet(pBlock->pDataBlock, pSupp->targetSlot2);
×
388
    if (pCol1 == NULL || pCol2 == NULL) {
×
389
      break;
390
    }
391

392
    int32_t index = 0;
×
393

394
    char*   val1 = colDataGetData(pCol1, j);
×
395
    char*   val2 = colDataGetData(pCol2, j);
×
396

397
    pSupp->base.numOfRows++;
×
398

399
    // write the primary time stamp column data
400
    code = taosAnalyBufWriteColData(pBuf, index++, pCol1->info.type, val1);
×
401
    if (TSDB_CODE_SUCCESS != code) {
×
402
      qError("%s failed to write col1 in buf, code:%s", id, tstrerror(code));
×
403
      return code;
×
404
    }
405

406
    // write the main column for imputation
407
    code = taosAnalyBufWriteColData(pBuf, index++, pCol2->info.type, val2);
×
408
    if (TSDB_CODE_SUCCESS != code) {
×
409
      qError("%s failed to write col2 in buf, code:%s", id, tstrerror(code));
×
410
      return code;
×
411
    }
412
  }
413

414
  return code;
×
415

416
}
417

418
static int32_t doCacheBlock(SAnalysisOperatorInfo* pInfo, SSDataBlock* pBlock, const char* id) {
×
419
  int32_t       code = TSDB_CODE_SUCCESS;
×
420
  int32_t       lino = 0;
×
421

422
  if (pInfo->analysisType == FUNCTION_TYPE_IMPUTATION) {
×
423
    code = doCacheBlockForImputation(&pInfo->imputatSup, id, pBlock);
×
424
  } else if (pInfo->analysisType == FUNCTION_TYPE_DTW || pInfo->analysisType == FUNCTION_TYPE_DTW_PATH || pInfo->analysisType == FUNCTION_TYPE_TLCC) {
×
425
    code = doCacheBlockForDtw(&pInfo->corrSupp, id, pBlock);
×
426
  }
427

428
  return code;
×
429
}
430

431
static int32_t finishBuildRequest(SAnalysisOperatorInfo* pInfo, SBaseSupp* pSupp, const char* id) {
×
432
  SAnalyticBuf* pBuf = &pSupp->analyBuf;
×
433
  int32_t       code = 0;
×
434

435
  // let's check existed rows for imputation
436
  if (pInfo->analysisType == FUNCTION_TYPE_IMPUTATION) {
×
437
    if (pSupp->numOfRows < ANALY_IMPUTATION_INPUT_MIN_ROWS) {
×
438
      qError("%s input rows for imputation aren't enough, min required:%d, current:%d", id, ANALY_IMPUTATION_INPUT_MIN_ROWS,
×
439
             pSupp->numOfRows);
440
      return TSDB_CODE_ANA_ANODE_NOT_ENOUGH_ROWS;
×
441
    }
442

443
    code = estResultRowsAfterImputation(pSupp->numOfRows, pSupp->win.skey, pSupp->win.ekey, 
×
444
      pInfo->imputatSup.tsPrecision, pInfo->imputatSup.freq, id);
×
445
    if (code != 0) {
×
446
      return code;
×
447
    }
448
  }
449

450
  for (int32_t i = 0; i < pBuf->numOfCols; ++i) {
×
451
    code = taosAnalyBufWriteColEnd(pBuf, i);
×
452
    if (code != 0) return code;
×
453
  }
454

455
  code = taosAnalyBufWriteDataEnd(pBuf);
×
456
  if (code != 0) return code;
×
457

458
  code = taosAnalyBufWriteOptStr(pBuf, "option", pInfo->options);
×
459
  if (code != 0) return code;
×
460

461
  code = taosAnalyBufWriteOptStr(pBuf, "algo", pInfo->algoName);
×
462
  if (code != 0) return code;
×
463

464
  if (pInfo->analysisType == FUNCTION_TYPE_IMPUTATION) {
×
465
    const char* prec = TSDB_TIME_PRECISION_MILLI_STR;
×
466
    int32_t p = pInfo->imputatSup.tsPrecision;
×
467

468
    switch (p) {
×
469
      case TSDB_TIME_PRECISION_MICRO:
×
470
        prec = TSDB_TIME_PRECISION_MICRO_STR;
×
471
        break;
×
472
      case TSDB_TIME_PRECISION_NANO:
×
473
        prec = TSDB_TIME_PRECISION_NANO_STR;
×
474
        break;
×
475
      case TSDB_TIME_PRECISION_SECONDS:
×
476
        prec = "s";
×
477
        break;
×
478
      default:
×
479
        prec = TSDB_TIME_PRECISION_MILLI_STR;
×
480
    }
481

482
    code = taosAnalyBufWriteOptStr(pBuf, "freq", pInfo->imputatSup.freq);
×
483
    if (code != 0) return code;
×
484

485
    code = taosAnalyBufWriteOptStr(pBuf, "prec", prec);
×
486
    if (code != 0) return code;
×
487
  }
488

489
  code = taosAnalyBufWriteOptInt(pBuf, ALGO_OPT_WNCHECK_NAME, pSupp->wncheck);
×
490
  if (code != 0) return code;
×
491

492
  code = taosAnalyBufClose(pBuf);
×
493
  return code;
×
494
}
495

496
static int32_t buildDtwPathResult(SJson* pathJson, SColumnInfoData* pResTargetCol, const char* pId, char* buf,
×
497
                                  int32_t bufSize) {
498
  int32_t pair = tjsonGetArraySize(pathJson);
×
499
  if (pair != 2) {
×
500
    qError("%s invalid path data, should be array of pair", pId);
×
501
    return TSDB_CODE_ANA_ANODE_RETURN_ERROR;
×
502
  }
503

504
  SJson* first = tjsonGetArrayItem(pathJson, 0);
×
505
  SJson* second = tjsonGetArrayItem(pathJson, 1);
×
506

507
  int64_t t1, t2;
×
508
  tjsonGetObjectValueBigInt(first, &t1);
×
509
  tjsonGetObjectValueBigInt(second, &t2);
×
510

511
  int32_t n = snprintf(varDataVal(buf), bufSize - VARSTR_HEADER_SIZE, "(%" PRId64 ", %" PRId64 ")", t1, t2);
×
512
  if (n > 0) {
×
513
    varDataSetLen(buf, n);
×
514
    return TSDB_CODE_SUCCESS;
×
515
  } else {
516
    return TSDB_CODE_ANA_ANODE_RETURN_ERROR;
×
517
  }
518
}
519

520
static int32_t doAnalysisImpl(SAnalysisOperatorInfo* pInfo, SBaseSupp* pSupp, SSDataBlock* pBlock, const char* pId) {
×
521
  SAnalyticBuf* pBuf = &pSupp->analyBuf;
×
522
  int32_t       resCurRow = pBlock->info.rows;
×
523
  int64_t       tmpI64 = 0;
×
524
  double        tmpDouble = 0;
×
525
  int32_t       code = 0;
×
526

527
  SColumnInfoData* pResTargetCol = taosArrayGet(pBlock->pDataBlock, pInfo->resTargetSlot);
×
528
  if (NULL == pResTargetCol) {
×
529
    return terrno;
×
530
  }
531

532
  SJson* pJson = taosAnalySendReqRetJson(pInfo->algoUrl, ANALYTICS_HTTP_TYPE_POST, pBuf, pSupp->timeout, pId);
×
533
  if (pJson == NULL) {
×
534
    return terrno;
×
535
  }
536

537
  int32_t rows = 0;
×
538
  tjsonGetInt32ValueFromDouble(pJson, "rows", rows, code);
×
539
  if (rows < 0 && code == 0) {
×
540
    char pMsg[1024] = {0};
×
541
    code = tjsonGetStringValue(pJson, "msg", pMsg);
×
542
    if (code != 0) {
×
543
      qError("%s failed to get msg from rsp, unknown error", pId);
×
544
    } else {
545
      qError("%s failed to exec analysis, msg:%s", pId, pMsg);
×
546
    }
547

548
    tjsonDelete(pJson);
×
549
    return TSDB_CODE_ANA_ANODE_RETURN_ERROR;
×
550
  }
551

552
  if (code < 0) {
×
553
    goto _OVER;
×
554
  }
555

556
  if (pInfo->analysisType == FUNCTION_TYPE_IMPUTATION) {
×
557
    SJson* pTarget = tjsonGetObjectItem(pJson, "target");
×
558
    if (pTarget == NULL) goto _OVER;
×
559

560
    SJson* pTsList = tjsonGetObjectItem(pJson, "ts");
×
561
    if (pTsList == NULL) goto _OVER;
×
562

563
    SJson* pMask = tjsonGetObjectItem(pJson, "mask");
×
564
    if (pMask == NULL) goto _OVER;
×
565

566
    int32_t listLen = tjsonGetArraySize(pTarget);
×
567
    if (listLen != rows) {
×
568
      goto _OVER;
×
569
    }
570

571
    if (pInfo->imputatSup.resTsSlot != -1) {
×
572
      SColumnInfoData* pResTsCol = taosArrayGet(pBlock->pDataBlock, pInfo->imputatSup.resTsSlot);
×
573
      if (pResTsCol != NULL) {
×
574
        for (int32_t i = 0; i < rows; ++i) {
×
575
          SJson* tsJson = tjsonGetArrayItem(pTsList, i);
×
576
          tjsonGetObjectValueBigInt(tsJson, &tmpI64);
×
577
          colDataSetInt64(pResTsCol, resCurRow, &tmpI64);
×
578
          resCurRow++;
×
579
        }
580
      }
581
    }
582

583
    resCurRow = pBlock->info.rows;
×
584
    if (pResTargetCol->info.type == TSDB_DATA_TYPE_DOUBLE) {
×
585
      for (int32_t i = 0; i < rows; ++i) {
×
586
        SJson* targetJson = tjsonGetArrayItem(pTarget, i);
×
587
        tjsonGetObjectValueDouble(targetJson, &tmpDouble);
×
588
        colDataSetDouble(pResTargetCol, resCurRow, &tmpDouble);
×
589
        resCurRow++;
×
590
      }
591
    } else if (pResTargetCol->info.type == TSDB_DATA_TYPE_INT) {
×
592
      for (int32_t i = 0; i < rows; ++i) {
×
593
        SJson* targetJson = tjsonGetArrayItem(pTarget, i);
×
594
        tjsonGetObjectValueDouble(targetJson, &tmpDouble);
×
595
        int32_t t = tmpDouble;
×
596
        colDataSetInt32(pResTargetCol, resCurRow, &t);
×
597
        resCurRow++;
×
598
      }
599
    } else if (pResTargetCol->info.type == TSDB_DATA_TYPE_FLOAT) {
×
600
      for (int32_t i = 0; i < rows; ++i) {
×
601
        SJson* targetJson = tjsonGetArrayItem(pTarget, i);
×
602
        tjsonGetObjectValueDouble(targetJson, &tmpDouble);
×
603
        float t = tmpDouble;
×
604
        colDataSetFloat(pResTargetCol, resCurRow, &t);
×
605
        resCurRow++;
×
606
      }
607
    }
608

609
    if (pInfo->imputatSup.resMarkSlot != -1) {
×
610
      SColumnInfoData* pResMarkCol = taosArrayGet(pBlock->pDataBlock, pInfo->imputatSup.resMarkSlot);
×
611
      if (pResMarkCol != NULL) {
×
612
        resCurRow = pBlock->info.rows;
×
613
        for (int32_t i = 0; i < rows; ++i) {
×
614
          SJson* markJson = tjsonGetArrayItem(pMask, i);
×
615
          tjsonGetObjectValueBigInt(markJson, &tmpI64);
×
616
          int32_t v = tmpI64;
×
617
          colDataSetInt32(pResMarkCol, resCurRow, &v);
×
618
          resCurRow++;
×
619
        }
620
      }
621
    }
622
  } else if (pInfo->analysisType == FUNCTION_TYPE_DTW) {
×
623
    // dtw result example:
624
    // {'option': 'algo=dtw,radius=1', 'rows': 4, 'distance': 1.6, 'path': [(0, 0), (1, 0), (2, 0), (3, 1), (3, 2), (3, 3)]}
625

626
    SJson* pTarget = tjsonGetObjectItem(pJson, "distance");
×
627
    if (pTarget == NULL) goto _OVER;
×
628

629
    tjsonGetObjectValueDouble(pTarget, &tmpDouble);
×
630
    colDataSetDouble(pResTargetCol, resCurRow, &tmpDouble);
×
631
    rows = 1;
×
632
  } else if (pInfo->analysisType == FUNCTION_TYPE_DTW_PATH) {
×
633
    // dtw path results are the same as above
634
    SJson* pPath = tjsonGetObjectItem(pJson, "path");
×
635
    if (pPath == NULL) goto _OVER;
×
636

637
    int32_t listLen = tjsonGetArraySize(pPath);
×
638
    if (listLen != rows) {
×
639
      goto _OVER;
×
640
    }
641

642
    for (int32_t i = 0; i < rows; ++i) {
×
643
      SJson* pathJson = tjsonGetArrayItem(pPath, i);
×
644

645
      char buf[128 + VARSTR_HEADER_SIZE] = {0};
×
646
      code = buildDtwPathResult(pathJson, pResTargetCol, pId, buf, sizeof(buf));
×
647
      if (code != 0) {
×
648
        qError("%s failed to build path result, code:%s", pId, tstrerror(code));
×
649
        goto _OVER;
×
650
      }
651

652
      code = colDataSetVal(pResTargetCol, resCurRow, buf, false);
×
653
      if (code != 0) {
×
654
        qError("%s failed to set path result to column, code:%s", pId, tstrerror(code));
×
655
        goto _OVER;
×
656
      }
657

658
      resCurRow++;
×
659
    }
660
  } else if (pInfo->analysisType == FUNCTION_TYPE_TLCC) {
×
661
    // tlcc result example:
662
    // {'option': 'algo=tlcc,lag_start=-1,lag_end=1', 'rows': 3, 'lags': [-1, 0, 1], 'ccf_vals': [-0.24, 0.9, -0.2]}
663
    SJson* pLags = tjsonGetObjectItem(pJson, "lags");
×
664
    if (pLags == NULL) goto _OVER;
×
665

666
    SJson* pCcfVals = tjsonGetObjectItem(pJson, "ccf_vals");
×
667
    if (pCcfVals == NULL) goto _OVER;
×
668

669
    for (int32_t i = 0; i < rows; ++i) {
×
670
      SJson* ccfValJson = tjsonGetArrayItem(pCcfVals, i);
×
671
      tjsonGetObjectValueDouble(ccfValJson, &tmpDouble);
×
672

673
      int64_t index = 0;
×
674
      SJson*  pLastIndexJson = tjsonGetArrayItem(pLags, i);
×
675
      tjsonGetObjectValueBigInt(pLastIndexJson, &index);
×
676

677
      char    buf[128 + VARSTR_HEADER_SIZE] = {0};
×
678
      int32_t n = snprintf(varDataVal(buf), tListLen(buf) - VARSTR_HEADER_SIZE, "(%" PRId64 ", %.4f)", index, tmpDouble);
×
679
      if (n > 0) {
×
680
        varDataSetLen(buf, n);
×
681
      } else {
682
        return TSDB_CODE_ANA_ANODE_RETURN_ERROR;
×
683
      }
684

685
      code = colDataSetVal(pResTargetCol, resCurRow, buf, false);
×
686
      resCurRow++;
×
687
    }
688

689
    tjsonGetObjectValueDouble(pLags, &tmpDouble);
×
690
    colDataSetDouble(pResTargetCol, resCurRow, &tmpDouble);
×
691
  }
692

693
  pBlock->info.rows += rows;
×
694

695
  if (pJson != NULL) tjsonDelete(pJson);
×
696
  return 0;
×
697

698
_OVER:
×
699
  tjsonDelete(pJson);
×
700
  if (code == 0) {
×
701
    code = TSDB_CODE_INVALID_JSON_FORMAT;
×
702
  }
703

704
  qError("%s failed to perform analysis finalize since %s", pId, tstrerror(code));
×
705
  return code;
×
706
}
707

708
static int32_t doAnalysis(SAnalysisOperatorInfo* pInfo, SExecTaskInfo* pTaskInfo) {
×
709
  int32_t                  code = TSDB_CODE_SUCCESS;
×
710
  int32_t                  lino = 0;
×
711
  char*                    id = GET_TASKID(pTaskInfo);
×
712
  SSDataBlock*             pRes = pInfo->binfo.pRes;
×
713

714
  SBaseSupp* pSupp = (pInfo->analysisType==FUNCTION_TYPE_IMPUTATION)? &pInfo->imputatSup.base:&pInfo->corrSupp.base;
×
715

716
  if (pSupp->numOfRows <= 0) {
×
717
    taosArrayClear(pSupp->pBlocks);
×
718
    pSupp->numOfRows = 0;
×
719
    return code;
×
720
  }
721

722
  qDebug("%s group:%" PRId64 ", do analysis, rows:%d", id, pSupp->groupId, pSupp->numOfRows);
×
723
  pRes->info.id.groupId = pSupp->groupId;
×
724

725
  code = finishBuildRequest(pInfo, pSupp, id);
×
726
  QUERY_CHECK_CODE(code, lino, _end);
×
727

728
  //   if (pBlock->info.rows < pBlock->info.capacity) {
729
  //   return TSDB_CODE_SUCCESS;
730
  // }
731

732
  // code = blockDataEnsureCapacity(pRes, newRowsNum);
733
  // if (code != TSDB_CODE_SUCCESS) {
734
  //   qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
735
  //   return code;
736
  // }
737

738
  // QUERY_CHECK_CODE(code, lino, _end);
739

740
  code = doAnalysisImpl(pInfo, pSupp, pRes, id);
×
741
  QUERY_CHECK_CODE(code, lino, _end);
×
742

743
  uInfo("%s block:%d, analysis finalize", id, pSupp->numOfBlocks);
×
744

745
_end:
×
746
  pSupp->numOfBlocks = 0;
×
747
  taosAnalyBufDestroy(&pSupp->analyBuf);
×
748
  return code;
×
749
}
750

751
static int32_t doParseInputForImputation(SAnalysisOperatorInfo* pInfo, SImputationSupp* pSupp, SNodeList* pFuncs, const char* id) {
×
752
  int32_t code = 0;
×
753
  SNode*  pNode = NULL;
×
754

755
  FOREACH(pNode, pFuncs) {
×
756
    if ((nodeType(pNode) == QUERY_NODE_TARGET) && (nodeType(((STargetNode*)pNode)->pExpr) == QUERY_NODE_FUNCTION)) {
×
757
      SFunctionNode* pFunc = (SFunctionNode*)((STargetNode*)pNode)->pExpr;
×
758
      int32_t        numOfParam = LIST_LENGTH(pFunc->pParameterList);
×
759

760
      if (pFunc->funcType == FUNCTION_TYPE_IMPUTATION) {
×
761
        // code = validInputParams(pFunc, id);
762
        // if (code) {
763
          // return code;
764
        // }
765

766
        pSupp->base.numOfCols = 2;
×
767

768
        if (numOfParam == 2) {
×
769
          // column, ts
770
          SColumnNode* pTargetNode = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 0);
×
771
          SColumnNode* pTsNode = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 1);
×
772

773
          pSupp->tsSlot = pTsNode->slotId;
×
774
          pSupp->tsPrecision = pTsNode->node.resType.precision;
×
775
          pSupp->targetSlot = pTargetNode->slotId;
×
776
          pSupp->targetType = pTargetNode->node.resType.type;
×
777

778
          // let's set the moment as the default imputation algorithm
779
          pInfo->options = taosStrdup("algo=moment");
×
780
        } else {
781
          // column, options, ts
782
          SColumnNode* pTargetNode = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 0);
×
783
          if (nodeType(pTargetNode) != QUERY_NODE_COLUMN) {
×
784
            // return error
785
          }
786

787
          bool assignTs = false;
×
788
          bool assignOpt = false;
×
789

790
          pSupp->targetSlot = pTargetNode->slotId;
×
791
          pSupp->targetType = pTargetNode->node.resType.type;
×
792

793
          for (int32_t i = 0; i < pFunc->pParameterList->length; ++i) {
×
794
            SNode* pNode = nodesListGetNode(pFunc->pParameterList, i);
×
795
            if (nodeType(pNode) == QUERY_NODE_COLUMN) {
×
796
              SColumnNode* pColNode = (SColumnNode*)pNode;
×
797
              if (pColNode->isPrimTs && (!assignTs)) {
×
798
                pSupp->tsSlot = pColNode->slotId;
×
799
                pSupp->tsPrecision = pColNode->node.resType.precision;
×
800
                assignTs = true;
×
801
                continue;
×
802
              }
803
            } else if (nodeType(pNode) == QUERY_NODE_VALUE) {
×
804
              if (!assignOpt) {
×
805
                SValueNode* pOptNode = (SValueNode*)pNode;
×
806
                pInfo->options = taosStrdup(pOptNode->literal);
×
807
                assignOpt = true;
×
808
                continue;
×
809
              }
810
            }
811
          }
812

813
          if (!assignOpt) {
×
814
            qError("%s option is missing, failed to do imputation", id);
×
815
            code = TSDB_CODE_ANA_INTERNAL_ERROR;
×
816
          }
817
        }
818
      }
819
    }
820
  }
821

822
  if (pInfo->options == NULL) {
×
823
    qError("%s option is missing or clone option string failed, failed to do imputation", id);
×
824
    code = TSDB_CODE_ANA_INTERNAL_ERROR;
×
825
  }
826

827
  return code;
×
828
}
829

830
int32_t doParseInputForDtw(SAnalysisOperatorInfo* pInfo, SCorrelationSupp* pSupp, SNodeList* pFuncs, const char* id) {
×
831
  int32_t code = 0;
×
832
  SNode*  pNode = NULL;
×
833

834
  FOREACH(pNode, pFuncs) {
×
835
    if ((nodeType(pNode) == QUERY_NODE_TARGET) && (nodeType(((STargetNode*)pNode)->pExpr) == QUERY_NODE_FUNCTION)) {
×
836
      SFunctionNode* pFunc = (SFunctionNode*)((STargetNode*)pNode)->pExpr;
×
837
      int32_t        numOfParam = LIST_LENGTH(pFunc->pParameterList);
×
838

839
      if (pFunc->funcType == FUNCTION_TYPE_DTW || pFunc->funcType == FUNCTION_TYPE_DTW_PATH) {
×
840
        // code = validInputParams(pFunc, id);
841
        // if (code) {
842
          // return code;
843
        // }
844

845
        pSupp->base.numOfCols = 2;
×
846

847
        if (numOfParam == 2) {
×
848
          // column1, column2
849
          SColumnNode* pTargetNode1 = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 0);
×
850
          SColumnNode* pTargetNode2 = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 1);
×
851

852
          pSupp->targetSlot1 = pTargetNode1->slotId;
×
853
          pSupp->targetSlot1Type = pTargetNode1->node.resType.type;
×
854

855
          pSupp->targetSlot2 = pTargetNode2->slotId;
×
856
          pSupp->targetSlot2Type = pTargetNode2->node.resType.type;
×
857

858
          // let's set the default radius to be 1
859
          pInfo->options = taosStrdup("algo=dtw,radius=1");
×
860
        } else if (numOfParam == 3) {
×
861
          // column, options, ts
862

863
          SColumnNode* pTargetNode1 = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 0);
×
864
          SColumnNode* pTargetNode2 = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 1);
×
865

866
          pSupp->targetSlot1 = pTargetNode1->slotId;
×
867
          pSupp->targetSlot1Type = pTargetNode1->node.resType.type;
×
868

869
          pSupp->targetSlot2 = pTargetNode2->slotId;
×
870
          pSupp->targetSlot2Type = pTargetNode2->node.resType.type;
×
871

872
          SValueNode* pOptNode = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 2);
×
873

874
          int32_t bufLen = strlen(pOptNode->literal) + 30;
×
875
          pInfo->options = taosMemoryMalloc(bufLen);
×
876
          if (pInfo->options == NULL) {
×
877
            code = terrno;
×
878
            qError("%s failed to prepare option buffer, code:%s", id, tstrerror(code));
×
879
            return code;
×
880
          }
881

882
          int32_t ret = snprintf(pInfo->options, bufLen, "%s,%s", pOptNode->literal, "algo=dtw");
×
883
          if (ret < 0 || ret >= bufLen) {
×
884
            code = TSDB_CODE_OUT_OF_MEMORY;
×
885
            qError("%s failed to clone options string, code:%s", id, tstrerror(code));
×
886
            return code;
×
887
          }
888
        } else {
889
          qError("%s too many parameters in dtw function", id);
×
890
          code = TSDB_CODE_INVALID_PARA;
×
891
          return code;
×
892
        }
893
      }
894
    }
895
  }
896

897
  if (pInfo->options == NULL) {
×
898
    qError("%s option is missing or clone option string failed, failed to do correlation analysis", id);
×
899
    code = TSDB_CODE_ANA_INTERNAL_ERROR;
×
900
  }
901

902
  return code;
×
903
}
904

905
static int32_t doParseInputForTlcc(SAnalysisOperatorInfo* pInfo, SCorrelationSupp* pSupp, SNodeList* pFuncs, const char* id) {
×
906
  int32_t code = 0;
×
907
  SNode*  pNode = NULL;
×
908

909
  FOREACH(pNode, pFuncs) {
×
910
    if ((nodeType(pNode) == QUERY_NODE_TARGET) && (nodeType(((STargetNode*)pNode)->pExpr) == QUERY_NODE_FUNCTION)) {
×
911
      SFunctionNode* pFunc = (SFunctionNode*)((STargetNode*)pNode)->pExpr;
×
912
      int32_t        numOfParam = LIST_LENGTH(pFunc->pParameterList);
×
913

914
      if (pFunc->funcType == FUNCTION_TYPE_TLCC) {
×
915
        // code = validInputParams(pFunc, id);
916
        // if (code) {
917
          // return code;
918
        // }
919

920
        pSupp->base.numOfCols = 2;
×
921

922
        if (numOfParam == 2) {
×
923
          // column1, column2
924
          SColumnNode* pTargetNode1 = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 0);
×
925
          SColumnNode* pTargetNode2 = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 1);
×
926

927
          pSupp->targetSlot1 = pTargetNode1->slotId;
×
928
          pSupp->targetSlot1Type = pTargetNode1->node.resType.type;
×
929

930
          pSupp->targetSlot2 = pTargetNode2->slotId;
×
931
          pSupp->targetSlot2Type = pTargetNode2->node.resType.type;
×
932

933
          // let's set the default radius to be 1
934
          pInfo->options = taosStrdup("algo=tlcc,lag_start=-1,lag_end=1");
×
935
        } else if (numOfParam == 3) {
×
936
          // column, options, ts
937
          // SColumnNode* pTargetNode = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 0);
938
          // if (nodeType(pTargetNode) != QUERY_NODE_COLUMN) {
939
          // return error
940
          // }
941

942
          SColumnNode* pTargetNode1 = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 0);
×
943
          SColumnNode* pTargetNode2 = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 1);
×
944

945
          pSupp->targetSlot1 = pTargetNode1->slotId;
×
946
          pSupp->targetSlot1Type = pTargetNode1->node.resType.type;
×
947

948
          pSupp->targetSlot2 = pTargetNode2->slotId;
×
949
          pSupp->targetSlot2Type = pTargetNode2->node.resType.type;
×
950

951
          SValueNode* pOptNode = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 2);
×
952

953
          int32_t bufLen = strlen(pOptNode->literal) + 30;
×
954
          pInfo->options = taosMemoryMalloc(bufLen);
×
955
          if (pInfo->options == NULL) {
×
956
            code = terrno;
×
957
            qError("%s failed to prepare option buffer, code:%s", id, tstrerror(code));
×
958
            return code;
×
959
          }
960

961
          int32_t ret = snprintf(pInfo->options, bufLen, "%s,%s", pOptNode->literal, "algo=tlcc");
×
962
          if (ret < 0 || ret >= bufLen) {
×
963
            code = TSDB_CODE_OUT_OF_MEMORY;
×
964
            qError("%s failed to clone options string, code:%s", id, tstrerror(code));
×
965
            return code;
×
966
          }
967
        } else {
968
          qError("%s too many parameters in tlcc function", id);
×
969
          code = TSDB_CODE_INVALID_PARA;
×
970
          return code;
×
971
        }
972
      }
973
    }
974
  }
975

976
  if (pInfo->options == NULL) {
×
977
    qError("%s option is missing or clone option string failed, failed to do correlation analysis", id);
×
978
    code = TSDB_CODE_ANA_INTERNAL_ERROR;
×
979
  }
980

981
  return code;
×
982
}
983

984
static int32_t doSetResSlot(SAnalysisOperatorInfo* pInfo, SExprSupp* pExprSup) {
×
985
  pInfo->imputatSup.resTsSlot = -1;
×
986
  pInfo->resTargetSlot = -1;
×
987
  pInfo->imputatSup.resMarkSlot = -1;
×
988

989
  for (int32_t j = 0; j < pExprSup->numOfExprs; ++j) {
×
990
    SExprInfo* pExprInfo = &pExprSup->pExprInfo[j];
×
991
    int32_t    dstSlot = pExprInfo->base.resSchema.slotId;
×
992
    int32_t    functionType = pExprInfo->pExpr->_function.functionType;
×
993
    if (functionType == FUNCTION_TYPE_IMPUTATION || functionType == FUNCTION_TYPE_DTW ||
×
994
        functionType == FUNCTION_TYPE_DTW_PATH || functionType == FUNCTION_TYPE_TLCC) {
×
995
      pInfo->resTargetSlot = dstSlot;
×
996
    } else if (functionType == FUNCTION_TYPE_IMPUTATION_ROWTS) {
×
997
      pInfo->imputatSup.resTsSlot = dstSlot;
×
998
    } else if (functionType == FUNCTION_TYPE_IMPUTATION_MARK) {
×
999
      pInfo->imputatSup.resMarkSlot = dstSlot;
×
1000
    }
1001
  }
1002

1003
  return 0;
×
1004
}
1005

1006
void doInitBaseOptions(SBaseSupp* pSupp) {
×
1007
  pSupp->numOfCols = 0;
×
1008
  pSupp->numOfRows = 0;
×
1009
  pSupp->numOfBlocks = 0;
×
1010
  pSupp->wncheck = ANALY_DEFAULT_WNCHECK;
×
1011
  pSupp->timeout = ANALY_DEFAULT_TIMEOUT;
×
1012
  pSupp->groupId = 0;
×
1013

1014
  pSupp->win.skey = INT64_MAX;
×
1015
  pSupp->win.ekey = INT64_MIN;
×
1016
}
×
1017

1018
void doInitImputOptions(SImputationSupp* pSupp) {
×
1019
  doInitBaseOptions(&pSupp->base);
×
1020

1021
  pSupp->freq[0] = 'S';  // d(day), h(hour), m(minute),s(second), ms(millisecond), us(microsecond)
×
1022
  pSupp->freq[1] = '\0';
×
1023

1024
  pSupp->tsSlot = -1;
×
1025
  pSupp->targetSlot = -1;
×
1026
  pSupp->targetType = -1;
×
1027
  pSupp->tsPrecision = -1;
×
1028
}
×
1029

1030
void doInitDtwOptions(SCorrelationSupp* pSupp) {
×
1031
  doInitBaseOptions(&pSupp->base);
×
1032
  pSupp->radius = 1;
×
1033

1034
  pSupp->lagStart = -1;
×
1035
  pSupp->lagEnd = 1;
×
1036

1037
  pSupp->targetSlot1 = -1;
×
1038
  pSupp->targetSlot1Type = TSDB_DATA_TYPE_INT;
×
1039
  pSupp->targetSlot2 = -1;
×
1040
  pSupp->targetSlot2Type = TSDB_DATA_TYPE_INT;
×
1041
}
×
1042

1043
int32_t parseFreq(SImputationSupp* pSupp, SHashObj* pHashMap, const char* id) {
×
1044
  int32_t     code = 0;
×
1045
  char*       p = NULL;
×
1046
  int32_t     len = 0;
×
1047

1048
  char* pFreq = taosHashGet(pHashMap, FREQ_STR, strlen(FREQ_STR));
×
1049
  if (pFreq != NULL) {
×
1050
    len = taosHashGetValueSize(pFreq);
×
1051
    p = taosStrndupi(pSupp->freq, len);
×
1052
    if (p == NULL) {
×
1053
      qError("%s failed to clone the freq param, code:%s", id, strerror(terrno));
×
1054
      return terrno;
×
1055
    }
1056

1057
    if (len >= tListLen(pSupp->freq)) {
×
1058
      qError("%s invalid freq parameter: %s", id, p);
×
1059
      code = TSDB_CODE_INVALID_PARA;
×
1060
    } else {
1061
      if ((len == 1) && (strncmp(pFreq, "d", 1) != 0) && (strncmp(pFreq, "h", 1) != 0) &&
×
1062
          (strncmp(pFreq, "m", 1) != 0) && (strncmp(pFreq, "s", 1) != 0)) {
×
1063
        code = TSDB_CODE_INVALID_PARA;
×
1064
        qError("%s invalid freq parameter: %s", id, p);
×
1065
      } else if ((len == 2) && (strncmp(pFreq, "ms", 2) != 0) && (strncmp(pFreq, "us", 2) != 0)) {
×
1066
        code = TSDB_CODE_INVALID_PARA;
×
1067
        qError("%s invalid freq parameter: %s", id, p);
×
1068
      } else if (len > 2) {
×
1069
        code = TSDB_CODE_INVALID_PARA;
×
1070
        qError("%s invalid freq parameter: %s", id, p);
×
1071
      }
1072
    }
1073

1074
    if (code == TSDB_CODE_SUCCESS) {
×
1075
      tstrncpy(pSupp->freq, pFreq, len + 1);
×
1076
      qDebug("%s data freq:%s", id, pSupp->freq);
×
1077
    }
1078
  } else {
1079
    qDebug("%s not specify data freq, default: %s", id, pSupp->freq);
×
1080
  }
1081
  
1082
  taosMemoryFreeClear(p);
×
1083
  return code;
×
1084
}
1085

1086
void parseRadius(SCorrelationSupp* pSupp, SHashObj* pHashMap, const char* id) {
×
1087
  char* pRadius = taosHashGet(pHashMap, RADIUS_STR, strlen(RADIUS_STR));
×
1088
  if (pRadius != NULL) {
×
1089
    pSupp->radius = *(int32_t*)pRadius;
×
1090
    qDebug("%s dtw search radius:%d", id, pSupp->radius);
×
1091
  } else {
1092
    qDebug("%s not specify search radius, default: %d", id, pSupp->radius);
×
1093
  }
1094
}
×
1095

1096
void parseLag(SCorrelationSupp* pSupp, SHashObj* pHashMap, const char* id) {
×
1097
  char* pLagStart = taosHashGet(pHashMap, LAGSTART_STR, strlen(LAGSTART_STR));
×
1098
  if (pLagStart != NULL) {
×
1099
    pSupp->lagStart = *(int32_t*)pLagStart;
×
1100
    qDebug("%s tlcc lag start:%d", id, pSupp->lagStart);
×
1101
  } else {
1102
    qDebug("%s not specify tlcc lag start, default: %d", id, pSupp->lagStart);
×
1103
  }
1104

1105
  char* pLagEnd = taosHashGet(pHashMap, LAGEND_STR, strlen(LAGEND_STR));
×
1106
  if (pLagEnd != NULL) {
×
1107
    pSupp->lagEnd = *(int32_t*)pLagEnd;
×
1108
    qDebug("%s tlcc lag end:%d", id, pSupp->lagEnd);
×
1109
  } else {
1110
    qDebug("%s not specify tlcc lag end, default: %d", id, pSupp->lagEnd);
×
1111
  }
1112
}
×
1113

1114
int32_t estResultRowsAfterImputation(int32_t rows, int64_t skey, int64_t ekey, int32_t prec, const char* pFreq, const char* id) {
×
1115
  int64_t range = ekey - skey;
×
1116
  double  factor = 0;
×
1117
  if (prec == TSDB_TIME_PRECISION_MILLI) {
×
1118
    if (strcmp(pFreq, "h") == 0) {
×
1119
      factor = 0.001 * 1/3600;
×
1120
    } else if (strcmp(pFreq, "m") == 0) {
×
1121
      factor = 0.001 * 1/60;
×
1122
    } else if (strcmp(pFreq, "s") == 0) {
×
1123
      factor = 0.001;
×
1124
    } else if (strcmp(pFreq, "ms") == 0) {
×
1125
      factor = 1;
×
1126
    } else if (strcmp(pFreq, "us") == 0) {
×
1127
      factor *= 1000;
×
1128
    } else if (strcmp(pFreq, "ns") == 0) {
×
1129
      factor *= 1000000;
×
1130
    }
1131

1132
    int64_t num = range * factor - rows;
×
1133
    if (num > ANALY_MAX_IMPUT_ROWS) {
×
1134
      qError("%s too many rows to imputation, est:%"PRId64, id, num);
×
1135
      return TSDB_CODE_INVALID_PARA;
×
1136
    }
1137
  } else if (prec == TSDB_TIME_PRECISION_MICRO) {
×
1138
    if (strcmp(pFreq, "h") == 0) {
×
1139
      factor = 0.000001 * 1/3600;
×
1140
    } else if (strcmp(pFreq, "m") == 0) {
×
1141
      factor = 0.000001 * 1/60;
×
1142
    } else if (strcmp(pFreq, "s") == 0) {
×
1143
      factor = 0.000001;
×
1144
    } else if (strcmp(pFreq, "ms") == 0) {
×
1145
      factor = 1000;
×
1146
    } else if (strcmp(pFreq, "us") == 0) {
×
1147
      factor *= 1;
×
1148
    } else if (strcmp(pFreq, "ns") == 0) {
×
1149
      factor *= 1000;
×
1150
    }
1151

1152
    int64_t num = range * factor - rows;
×
1153
    if (num > ANALY_MAX_IMPUT_ROWS) {
×
1154
      qError("%s too many rows to imputation, est:%"PRId64, id, num);
×
1155
      return TSDB_CODE_INVALID_PARA;
×
1156
    }
1157
  } else if (prec == TSDB_TIME_PRECISION_NANO) {
×
1158
    if (strcmp(pFreq, "h") == 0) {
×
1159
      factor = 0.000000001 * 1/3600;
×
1160
    } else if (strcmp(pFreq, "m") == 0) {
×
1161
      factor = 0.000000001 * 1/60;
×
1162
    } else if (strcmp(pFreq, "s") == 0) {
×
1163
      factor = 0.000000001;
×
1164
    } else if (strcmp(pFreq, "ms") == 0) {
×
1165
      factor = 1000000;
×
1166
    } else if (strcmp(pFreq, "us") == 0) {
×
1167
      factor *= 1000;
×
1168
    } else if (strcmp(pFreq, "ns") == 0) {
×
1169
      factor *= 1;
×
1170
    }
1171

1172
    int64_t num = range * factor - rows;
×
1173
    if (num > ANALY_MAX_IMPUT_ROWS) {
×
1174
      qError("%s too many rows to imputation, est:%"PRId64, id, num);
×
1175
      return TSDB_CODE_INVALID_PARA;
×
1176
    }
1177
  }
1178

1179
  return TSDB_CODE_SUCCESS;
×
1180
}
1181

1182
int32_t doParseOption(SAnalysisOperatorInfo* pInfo, const char* id) {
×
1183
  int32_t   code = 0;
×
1184
  int32_t   lino = 0;
×
1185
  SHashObj* pHashMap = NULL;
×
1186

1187
  code = taosAnalyGetOpts(pInfo->options, &pHashMap);
×
1188
  if (code != TSDB_CODE_SUCCESS) {
×
1189
    return code;
×
1190
  }
1191

1192
  int32_t type = 0;
×
1193
  if (pInfo->analysisType == FUNCTION_TYPE_IMPUTATION) {
×
1194
    type = ANALY_ALGO_TYPE_IMPUTATION;
×
1195
  } else if (pInfo->analysisType == FUNCTION_TYPE_DTW || pInfo->analysisType == FUNCTION_TYPE_DTW_PATH ||
×
1196
             pInfo->analysisType == FUNCTION_TYPE_TLCC) {
×
1197
    type = ANALY_ALGO_TYPE_CORREL;
×
1198
  }
1199

1200
  code = taosAnalysisParseAlgo(pInfo->options, pInfo->algoName, pInfo->algoUrl, type, tListLen(pInfo->algoUrl),
×
1201
                               pHashMap, id);
1202
  TSDB_CHECK_CODE(code, lino, _end);
×
1203

1204
  // extract the timeout parameter
1205
  SBaseSupp* pSupp =
×
1206
      (pInfo->analysisType == FUNCTION_TYPE_IMPUTATION) ? &pInfo->imputatSup.base : &pInfo->corrSupp.base;
×
1207
  pSupp->timeout = taosAnalysisParseTimout(pHashMap, id);
×
1208
  pSupp->wncheck = taosAnalysisParseWncheck(pHashMap, id);
×
1209

1210
  if (pInfo->analysisType == FUNCTION_TYPE_IMPUTATION) {
×
1211
    // extract data freq
1212
    code = parseFreq(&pInfo->imputatSup, pHashMap, id);
×
1213
  } else if (pInfo->analysisType == FUNCTION_TYPE_DTW || pInfo->analysisType == FUNCTION_TYPE_DTW_PATH) {
×
1214
    parseRadius(&pInfo->corrSupp, pHashMap, id);
×
1215
  } else if (pInfo->analysisType == FUNCTION_TYPE_TLCC) {
×
1216
    parseLag(&pInfo->corrSupp, pHashMap, id);
×
1217
  }
1218

1219
_end:
×
1220
  taosHashCleanup(pHashMap);
×
1221
  return code;
×
1222
}
1223

1224
static int32_t doCreateBuf(SAnalysisOperatorInfo* pInfo, const char* pId) {
×
1225
  SBaseSupp* pSupp =
×
1226
      (pInfo->analysisType == FUNCTION_TYPE_IMPUTATION) ? &pInfo->imputatSup.base : &pInfo->corrSupp.base;
×
1227

1228
  SAnalyticBuf* pBuf = &pSupp->analyBuf;
×
1229
  int64_t       ts = taosGetTimestampNs();
×
1230
  int32_t       index = 0;
×
1231

1232
  pBuf->bufType = ANALYTICS_BUF_TYPE_JSON_COL;
×
1233
  snprintf(pBuf->fileName, sizeof(pBuf->fileName), "%s/tdengine-analysis-%p-%" PRId64, tsTempDir, pInfo, ts);
×
1234

1235
  int32_t code = tsosAnalyBufOpen(pBuf, pSupp->numOfCols, pId);
×
1236
  if (code != 0) goto _OVER;
×
1237

1238
  if (pInfo->analysisType == FUNCTION_TYPE_IMPUTATION) {
×
1239
    code = taosAnalyBufWriteColMeta(pBuf, index++, TSDB_DATA_TYPE_TIMESTAMP, "ts");
×
1240
    if (code != 0) goto _OVER;
×
1241

1242
    code = taosAnalyBufWriteColMeta(pBuf, index++, pInfo->imputatSup.targetType, "val");
×
1243
    if (code != 0) goto _OVER;
×
1244
  } else if (pInfo->analysisType == FUNCTION_TYPE_DTW || pInfo->analysisType == FUNCTION_TYPE_DTW_PATH || pInfo->analysisType == FUNCTION_TYPE_TLCC) {
×
1245
    code = taosAnalyBufWriteColMeta(pBuf, index++, pInfo->corrSupp.targetSlot1Type, "val");
×
1246
    if (code != 0) goto _OVER;
×
1247

1248
    code = taosAnalyBufWriteColMeta(pBuf, index++, pInfo->corrSupp.targetSlot2Type, "val1");
×
1249
    if (code != 0) goto _OVER;
×
1250
  }
1251

1252
  code = taosAnalyBufWriteDataBegin(pBuf);
×
1253
  if (code != 0) goto _OVER;
×
1254

1255
  for (int32_t i = 0; i < pSupp->numOfCols; ++i) {
×
1256
    code = taosAnalyBufWriteColBegin(pBuf, i);
×
1257
    if (code != 0) goto _OVER;
×
1258
  }
1259

1260
_OVER:
×
1261
  if (code != 0) {
×
1262
    (void)taosAnalyBufClose(pBuf);
×
1263
    taosAnalyBufDestroy(pBuf);
×
1264
  }
1265
  return code;
×
1266
}
1267

1268
#else
1269

1270
int32_t createGenericAnalysisOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode, SExecTaskInfo* pTaskInfo,
1271
                                     SOperatorInfo** pOptrInfo) {
1272
  return TSDB_CODE_OPS_NOT_SUPPORT;
1273
}
1274
void analysisDestroyOperatorInfo(void* param) {}
1275

1276
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc