• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3798

31 Mar 2025 10:39AM UTC coverage: 9.424% (-20.9%) from 30.372%
#3798

push

travis-ci

happyguoxy
test:add test cases

21549 of 307601 branches covered (7.01%)

Branch coverage included in aggregate %.

36084 of 303967 relevant lines covered (11.87%)

58620.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/common/src/tanalytics.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "tanalytics.h"
18
#include "ttypes.h"
19
#include "tutil.h"
20

21
#ifdef USE_ANALYTICS
22
#include <curl/curl.h>
23
#define ANALYTICS_ALOG_SPLIT_CHAR ","
24

25
typedef struct {
26
  int64_t       ver;
27
  SHashObj     *hash;  // algoname:algotype -> SAnalyticsUrl
28
  TdThreadMutex lock;
29
} SAlgoMgmt;
30

31
// Accumulator for an HTTP response body, filled chunk by chunk by taosCurlWriteData.
typedef struct {
  // Heap buffer holding the bytes received so far, kept NUL-terminated; NULL until the first chunk arrives.
  char *data;
  // Number of payload bytes in data, not counting the terminating NUL.
  int64_t dataLen;
} SCurlResp;
35

36
static SAlgoMgmt tsAlgos = {0};
37
static int32_t   taosAnalyBufGetCont(SAnalyticBuf *pBuf, char **ppCont, int64_t *pContLen);
38

39
// Returns the display name of an algorithm type as a string literal.
// Any value other than the two handled types yields "unknown".
const char *taosAnalysisAlgoType(EAnalAlgoType type) {
  if (type == ANALY_ALGO_TYPE_ANOMALY_DETECT) {
    return "anomaly-detection";
  }

  if (type == ANALY_ALGO_TYPE_FORECAST) {
    return "forecast";
  }

  return "unknown";
}
49

50
// Returns the URL path fragment of an algorithm type as a string literal.
// Note that anomaly detection is spelled "anomaly-detect" here, unlike the
// display name returned by taosAnalysisAlgoType. Unhandled values yield "unknown".
const char *taosAnalyAlgoUrlStr(EAnalAlgoType type) {
  if (type == ANALY_ALGO_TYPE_ANOMALY_DETECT) {
    return "anomaly-detect";
  }

  if (type == ANALY_ALGO_TYPE_FORECAST) {
    return "forecast";
  }

  return "unknown";
}
60

61
EAnalAlgoType taosAnalyAlgoInt(const char *name) {
×
62
  for (EAnalAlgoType i = 0; i < ANALY_ALGO_TYPE_END; ++i) {
×
63
    if (strcasecmp(name, taosAnalysisAlgoType(i)) == 0) {
×
64
      return i;
×
65
    }
66
  }
67

68
  return ANALY_ALGO_TYPE_END;
×
69
}
70

71
int32_t taosAnalyticsInit() {
×
72
  if (curl_global_init(CURL_GLOBAL_ALL) != 0) {
×
73
    uError("failed to init curl");
×
74
    return -1;
×
75
  }
76

77
  tsAlgos.ver = 0;
×
78
  if (taosThreadMutexInit(&tsAlgos.lock, NULL) != 0) {
×
79
    uError("failed to init algo mutex");
×
80
    return -1;
×
81
  }
82

83
  tsAlgos.hash = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
×
84
  if (tsAlgos.hash == NULL) {
×
85
    uError("failed to init algo hash");
×
86
    return -1;
×
87
  }
88

89
  uInfo("analysis env is initialized");
×
90
  return 0;
×
91
}
92

93
// Frees the url string owned by every SAnalyticsUrl entry of the hash, then
// destroys the hash itself.
static void taosAnalyFreeHash(SHashObj *hash) {
  for (void *pEntry = taosHashIterate(hash, NULL); pEntry != NULL; pEntry = taosHashIterate(hash, pEntry)) {
    taosMemoryFree(((SAnalyticsUrl *)pEntry)->url);
  }

  taosHashCleanup(hash);
}
×
102

103
void taosAnalyticsCleanup() {
×
104
  curl_global_cleanup();
×
105
  if (taosThreadMutexDestroy(&tsAlgos.lock) != 0) {
×
106
    uError("failed to destroy anal lock");
×
107
  }
108
  taosAnalyFreeHash(tsAlgos.hash);
×
109
  tsAlgos.hash = NULL;
×
110
  uInfo("analysis env is cleaned up");
×
111
}
×
112

113
// Installs pHash as the current algorithm table when newVer is newer than the
// installed version. Ownership of pHash always passes to this function: the
// table that ends up unused (the previous one on success, pHash itself when
// the version is not newer or the lock cannot be taken) is freed before
// returning.
//
// The version comparison is done while holding tsAlgos.lock so that two
// concurrent updates cannot both pass the check and install out of order.
void taosAnalyUpdate(int64_t newVer, SHashObj *pHash) {
  SHashObj *pStale = pHash;  // freed below unless it gets installed

  if (taosThreadMutexLock(&tsAlgos.lock) == 0) {
    if (newVer > tsAlgos.ver) {
      pStale = tsAlgos.hash;
      tsAlgos.ver = newVer;
      tsAlgos.hash = pHash;
    }

    if (taosThreadMutexUnlock(&tsAlgos.lock) != 0) {
      uError("failed to unlock hash");
    }
  } else {
    // Without the lock the table must not be touched; pHash is released below
    // instead of being leaked.
    uError("failed to lock hash");
  }

  // Free outside the critical section to keep the lock hold time short.
  taosAnalyFreeHash(pStale);
}
×
128

129
bool taosAnalyGetOptStr(const char *option, const char *optName, char *optValue, int32_t optMaxLen) {
×
130
  char  buf[TSDB_ANALYTIC_ALGO_OPTION_LEN] = {0};
×
131
  char *pStart = NULL;
×
132
  char *pEnd = NULL;
×
133

134
  pStart = strstr(option, optName);
×
135
  if (pStart == NULL) {
×
136
    return false;
×
137
  }
138

139
  pEnd = strstr(pStart, ANALYTICS_ALOG_SPLIT_CHAR);
×
140
  if (optMaxLen > 0) {
×
141
    if (pEnd > pStart) {
×
142
      int32_t len = (int32_t)(pEnd - pStart);
×
143
      len = MIN(len + 1, TSDB_ANALYTIC_ALGO_OPTION_LEN);
×
144
      tstrncpy(buf, pStart, len);
×
145
    } else {
146
      int32_t len = MIN(tListLen(buf), strlen(pStart) + 1);
×
147
      tstrncpy(buf, pStart, len);
×
148
    }
149

150
    char *pRight = strstr(buf, "=");
×
151
    if (pRight == NULL) {
×
152
      return false;
×
153
    } else {
154
      pRight += 1;
×
155
    }
156

157
    int32_t unused = strtrim(pRight);
×
158

159
    int32_t vLen = MIN(optMaxLen, strlen(pRight) + 1);
×
160
    tstrncpy(optValue, pRight, vLen);
×
161
  }
162

163
  return true;
×
164
}
165

166
bool taosAnalyGetOptInt(const char *option, const char *optName, int64_t *optValue) {
×
167
  char    buf[TSDB_ANALYTIC_ALGO_OPTION_LEN] = {0};
×
168
  int32_t bufLen = tsnprintf(buf, sizeof(buf), "%s=", optName);
×
169

170
  char *pos1 = strstr(option, buf);
×
171
  char *pos2 = strstr(option, ANALYTICS_ALOG_SPLIT_CHAR);
×
172
  if (pos1 != NULL) {
×
173
    *optValue = taosStr2Int64(pos1 + bufLen, NULL, 10);
×
174
    return true;
×
175
  } else {
176
    return false;
×
177
  }
178
}
179

180
// Looks up the service URL registered for (type, algoName) and copies it into
// url (at most urlLen bytes).
//
// The lookup key is "<type>:<algoName>" converted to lower case. On a hit the
// function returns 0; on a miss url is set to the empty string, terrno is set
// to TSDB_CODE_ANA_ALGO_NOT_FOUND and that code is returned. If releasing the
// registry lock fails, TSDB_CODE_OUT_OF_MEMORY is returned instead.
//
// NOTE(review): when the lock cannot be taken the function returns 0 without
// writing url - callers presumably should not trust url in that case; confirm.
int32_t taosAnalyGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url, int32_t urlLen) {
  char    key[TSDB_ANALYTIC_ALGO_KEY_LEN] = {0};
  int32_t keyLen = tsnprintf(key, sizeof(key) - 1, "%d:%s", type, algoName) + 1;

  char *lowered = strntolower(key, key, keyLen);
  (void)lowered;

  if (taosThreadMutexLock(&tsAlgos.lock) != 0) {
    return 0;
  }

  int32_t        code = 0;
  SAnalyticsUrl *pUrl = taosHashAcquire(tsAlgos.hash, key, keyLen);
  if (pUrl == NULL) {
    url[0] = 0;
    terrno = TSDB_CODE_ANA_ALGO_NOT_FOUND;
    code = terrno;
    uError("algo:%s, type:%s, url not found", algoName, taosAnalysisAlgoType(type));
  } else {
    tstrncpy(url, pUrl->url, urlLen);
    uDebug("algo:%s, type:%s, url:%s", algoName, taosAnalysisAlgoType(type), url);
  }

  if (taosThreadMutexUnlock(&tsAlgos.lock) != 0) {
    uError("failed to unlock hash");
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  return code;
}
207

208
// Returns the version of the currently installed algorithm table.
// The field is read without taking tsAlgos.lock.
int64_t taosAnalyGetVersion() {
  int64_t currentVer = tsAlgos.ver;
  return currentVer;
}
×
209

210
static size_t taosCurlWriteData(char *pCont, size_t contLen, size_t nmemb, void *userdata) {
×
211
  SCurlResp *pRsp = userdata;
×
212
  if (contLen == 0 || nmemb == 0 || pCont == NULL) {
×
213
    pRsp->dataLen = 0;
×
214
    pRsp->data = NULL;
×
215
    uError("curl response is received, len:%" PRId64, pRsp->dataLen);
×
216
    return 0;
×
217
  }
218

219
  int64_t newDataSize = (int64_t)contLen * nmemb;
×
220
  int64_t size = pRsp->dataLen + newDataSize;
×
221

222
  if (pRsp->data == NULL) {
×
223
    pRsp->data = taosMemoryMalloc(size + 1);
×
224
    if (pRsp->data == NULL) {
×
225
      uError("failed to prepare recv buffer for post rsp, len:%d, code:%s", (int32_t)size + 1, tstrerror(terrno));
×
226
      return 0;  // return the recv length, if failed, return 0
×
227
    }
228
  } else {
229
    char *p = taosMemoryRealloc(pRsp->data, size + 1);
×
230
    if (p == NULL) {
×
231
      uError("failed to prepare recv buffer for post rsp, len:%d, code:%s", (int32_t)size + 1, tstrerror(terrno));
×
232
      return 0;  // return the recv length, if failed, return 0
×
233
    }
234

235
    pRsp->data = p;
×
236
  }
237

238
  if (pRsp->data != NULL) {
×
239
    (void)memcpy(pRsp->data + pRsp->dataLen, pCont, newDataSize);
×
240

241
    pRsp->dataLen = size;
×
242
    pRsp->data[size] = 0;
×
243

244
    uDebugL("curl response is received, len:%" PRId64 ", content:%s", size, pRsp->data);
×
245
    return newDataSize;
×
246
  } else {
247
    pRsp->dataLen = 0;
×
248
    uError("failed to malloc curl response");
×
249
    return 0;
×
250
  }
251
}
252

253
// Performs an HTTP GET on url with a 100 ms timeout, collecting the response
// body into pRsp through taosCurlWriteData.
//
// Returns 0 (CURLE_OK) on success, -1 when no curl handle could be created,
// and otherwise the CURLcode of the first step that failed - including a
// failed curl_easy_setopt, so a request that was never sent is not reported
// as a success. pRsp->data stays owned by the caller.
static int32_t taosCurlGetRequest(const char *url, SCurlResp *pRsp) {
  CURLcode code = CURLE_OK;

  CURL *curl = curl_easy_init();
  if (curl == NULL) {
    uError("failed to create curl handle");
    return -1;
  }

  if ((code = curl_easy_setopt(curl, CURLOPT_URL, url)) != CURLE_OK) goto _OVER;
  if ((code = curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, taosCurlWriteData)) != CURLE_OK) goto _OVER;
  if ((code = curl_easy_setopt(curl, CURLOPT_WRITEDATA, pRsp)) != CURLE_OK) goto _OVER;
  // Numeric options are read as long by libcurl; passing a plain int through
  // the variadic interface is not portable.
  if ((code = curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, 100L)) != CURLE_OK) goto _OVER;

  uDebug("curl get request will sent, url:%s", url);
  code = curl_easy_perform(curl);

_OVER:
  if (code != CURLE_OK) {
    uError("failed to perform curl action, code:%d", code);
  }
  curl_easy_cleanup(curl);
  return code;
}
278

279
// Performs an HTTP POST of buf (bufLen bytes, sent as JSON) to url, collecting
// the response body into pRsp through taosCurlWriteData.
//
// timeout is the total request timeout in milliseconds.
//
// Returns 0 (CURLE_OK) on success, -1 when no curl handle could be created,
// and otherwise the CURLcode of the first step that failed - including header
// allocation and curl_easy_setopt failures, so a request that was never sent
// is not reported as a success. pRsp->data stays owned by the caller.
static int32_t taosCurlPostRequest(const char *url, SCurlResp *pRsp, const char *buf, int32_t bufLen, int32_t timeout) {
  struct curl_slist *headers = NULL;
  CURLcode           code = CURLE_OK;

  CURL *curl = curl_easy_init();
  if (curl == NULL) {
    uError("failed to create curl handle");
    return -1;
  }

  headers = curl_slist_append(headers, "Content-Type:application/json;charset=UTF-8");
  if (headers == NULL) {
    // curl_slist_append returns NULL when it cannot allocate the node.
    code = CURLE_OUT_OF_MEMORY;
    goto _OVER;
  }

  if ((code = curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers)) != CURLE_OK) goto _OVER;
  if ((code = curl_easy_setopt(curl, CURLOPT_URL, url)) != CURLE_OK) goto _OVER;
  if ((code = curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, taosCurlWriteData)) != CURLE_OK) goto _OVER;
  if ((code = curl_easy_setopt(curl, CURLOPT_WRITEDATA, pRsp)) != CURLE_OK) goto _OVER;
  // Numeric options are read as long by libcurl; convert explicitly instead of
  // passing int32_t through the variadic interface.
  if ((code = curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, (long)timeout)) != CURLE_OK) goto _OVER;
  if ((code = curl_easy_setopt(curl, CURLOPT_POST, 1L)) != CURLE_OK) goto _OVER;
  // The size is set before the data pointer so libcurl does not strlen() buf.
  if ((code = curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, (long)bufLen)) != CURLE_OK) goto _OVER;
  if ((code = curl_easy_setopt(curl, CURLOPT_POSTFIELDS, buf)) != CURLE_OK) goto _OVER;

  uDebugL("curl post request will sent, url:%s len:%d content:%s", url, bufLen, buf);
  code = curl_easy_perform(curl);

_OVER:
  if (code != CURLE_OK) {
    uError("failed to perform curl action, code:%d", code);
  }
  curl_slist_free_all(headers);  // accepts NULL
  curl_easy_cleanup(curl);
  return code;
}
313

314
// Sends a request to url and returns the response parsed as JSON.
//
// For ANALYTICS_HTTP_TYPE_GET a plain GET is issued; for any other type the
// content of pBuf is loaded and POSTed with the given timeout. Returns the
// parsed document, or NULL with terrno set when loading the content fails
// (that error code), the request fails (TSDB_CODE_ANA_URL_CANT_ACCESS), the
// response is empty (TSDB_CODE_ANA_URL_RSP_IS_NULL) or cannot be parsed
// (TSDB_CODE_ANA_ANODE_RETURN_ERROR when it starts with '<', otherwise
// TSDB_CODE_INVALID_JSON_FORMAT). The response buffer and the POST content are
// released before returning.
SJson *taosAnalySendReqRetJson(const char *url, EAnalyHttpType type, SAnalyticBuf *pBuf, int64_t timeout) {
  SJson    *pJson = NULL;
  char     *pCont = NULL;
  int64_t   contentLen;
  SCurlResp curlRsp = {0};

  if (type != ANALYTICS_HTTP_TYPE_GET) {
    int32_t code = taosAnalyBufGetCont(pBuf, &pCont, &contentLen);
    if (code != 0) {
      terrno = code;
      goto _OVER;
    }

    if (taosCurlPostRequest(url, &curlRsp, pCont, contentLen, timeout) != 0) {
      terrno = TSDB_CODE_ANA_URL_CANT_ACCESS;
      goto _OVER;
    }
  } else {
    if (taosCurlGetRequest(url, &curlRsp) != 0) {
      terrno = TSDB_CODE_ANA_URL_CANT_ACCESS;
      goto _OVER;
    }
  }

  if (curlRsp.data == NULL || curlRsp.dataLen == 0) {
    terrno = TSDB_CODE_ANA_URL_RSP_IS_NULL;
    goto _OVER;
  }

  pJson = tjsonParse(curlRsp.data);
  if (pJson == NULL) {
    // A leading '<' is reported as an error returned by the analysis node
    // rather than as malformed JSON.
    if (curlRsp.data[0] == '<') {
      terrno = TSDB_CODE_ANA_ANODE_RETURN_ERROR;
    } else {
      terrno = TSDB_CODE_INVALID_JSON_FORMAT;
    }
  }

_OVER:
  if (curlRsp.data != NULL) taosMemoryFreeClear(curlRsp.data);
  if (pCont != NULL) taosMemoryFree(pCont);
  return pJson;
}
358

359
static int32_t taosAnalyJsonBufGetCont(const char *fileName, char **ppCont, int64_t *pContLen) {
×
360
  int32_t   code = 0;
×
361
  int64_t   contLen;
362
  char     *pCont = NULL;
×
363
  TdFilePtr pFile = NULL;
×
364

365
  pFile = taosOpenFile(fileName, TD_FILE_READ);
×
366
  if (pFile == NULL) {
×
367
    code = terrno;
×
368
    goto _OVER;
×
369
  }
370

371
  code = taosFStatFile(pFile, &contLen, NULL);
×
372
  if (code != 0) goto _OVER;
×
373

374
  pCont = taosMemoryMalloc(contLen + 1);
×
375
  if (pCont == NULL) {
×
376
    code = TSDB_CODE_OUT_OF_MEMORY;
×
377
    goto _OVER;
×
378
  }
379

380
  if (taosReadFile(pFile, pCont, contLen) != contLen) {
×
381
    code = terrno;
×
382
    goto _OVER;
×
383
  }
384

385
  pCont[contLen] = '\0';
×
386

387
_OVER:
×
388
  if (code == 0) {
×
389
    *ppCont = pCont;
×
390
    *pContLen = contLen;
×
391
  } else {
392
    if (pCont != NULL) taosMemoryFree(pCont);
×
393
  }
394
  if (pFile != NULL) taosCloseFile(&pFile);
×
395
  return code;
×
396
}
397

398
// Appends one JSON member `"optName": optVal,` followed by a newline to the
// buffer file. Returns 0 on success, terrno when the write is short.
// The line is formatted into a 64-byte scratch buffer.
static int32_t taosAnalyJsonBufWriteOptInt(SAnalyticBuf *pBuf, const char *optName, int64_t optVal) {
  char    line[64] = {0};
  int32_t lineLen = tsnprintf(line, sizeof(line), "\"%s\": %" PRId64 ",\n", optName, optVal);

  if (taosWriteFile(pBuf->filePtr, line, lineLen) == lineLen) {
    return 0;
  }
  return terrno;
}
406

407
// Appends one JSON member `"optName": "optVal",` followed by a newline to the
// buffer file. Returns 0 on success, terrno when the write is short.
// The line is formatted into a 128-byte scratch buffer; optVal is written
// verbatim, without JSON escaping.
static int32_t taosAnalyJsonBufWriteOptStr(SAnalyticBuf *pBuf, const char *optName, const char *optVal) {
  char    line[128] = {0};
  int32_t lineLen = tsnprintf(line, sizeof(line), "\"%s\": \"%s\",\n", optName, optVal);

  if (taosWriteFile(pBuf->filePtr, line, lineLen) == lineLen) {
    return 0;
  }
  return terrno;
}
415

416
// Appends one JSON member `"optName": optVal,` (formatted with %f) followed by
// a newline to the buffer file. Returns 0 on success, terrno when the write is
// short. The line is formatted into a 128-byte scratch buffer.
static int32_t taosAnalyJsonBufWriteOptFloat(SAnalyticBuf *pBuf, const char *optName, float optVal) {
  char    line[128] = {0};
  int32_t lineLen = tsnprintf(line, sizeof(line), "\"%s\": %f,\n", optName, optVal);

  if (taosWriteFile(pBuf->filePtr, line, lineLen) == lineLen) {
    return 0;
  }
  return terrno;
}
424

425
// Writes bufLen bytes of buf to the main buffer file. A bufLen <= 0 means
// "buf is a C string": its strlen is used instead. Returns 0 on success,
// terrno when the write is short.
static int32_t taosAnalyJsonBufWriteStr(SAnalyticBuf *pBuf, const char *buf, int32_t bufLen) {
  int32_t writeLen = bufLen;
  if (writeLen <= 0) {
    writeLen = strlen(buf);
  }

  if (taosWriteFile(pBuf->filePtr, buf, writeLen) == writeLen) {
    return 0;
  }
  return terrno;
}
434

435
// Writes the opening brace of the JSON document to the main buffer file.
static int32_t taosAnalyJsonBufWriteStart(SAnalyticBuf *pBuf) {
  int32_t code = taosAnalyJsonBufWriteStr(pBuf, "{\n", 0);
  return code;
}
×
436

437
// Opens the JSON buffer for writing: creates/truncates the main file,
// allocates numOfCols zeroed column descriptors and, for every buffer type
// other than ANALYTICS_BUF_TYPE_JSON, also creates one file per column named
// "<fileName>-c<index>". Finally writes the opening brace of the document.
//
// Returns 0 on success, terrno when a file cannot be opened and
// TSDB_CODE_OUT_OF_MEMORY when the column array cannot be allocated. Files
// opened before a failure are left open in pBuf.
static int32_t tsosAnalyJsonBufOpen(SAnalyticBuf *pBuf, int32_t numOfCols) {
  pBuf->filePtr = taosOpenFile(pBuf->fileName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
  if (pBuf->filePtr == NULL) {
    return terrno;
  }

  pBuf->pCols = taosMemoryCalloc(numOfCols, sizeof(SAnalyticsColBuf));
  if (pBuf->pCols == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  pBuf->numOfCols = numOfCols;

  if (pBuf->bufType != ANALYTICS_BUF_TYPE_JSON) {
    int32_t colIdx = 0;
    while (colIdx < numOfCols) {
      SAnalyticsColBuf *pCol = &pBuf->pCols[colIdx];
      snprintf(pCol->fileName, sizeof(pCol->fileName), "%s-c%d", pBuf->fileName, colIdx);
      pCol->filePtr =
          taosOpenFile(pCol->fileName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
      if (pCol->filePtr == NULL) {
        return terrno;
      }
      ++colIdx;
    }
  }

  return taosAnalyJsonBufWriteStart(pBuf);
}
463

464
// Writes the schema entry `["colName", "<type name>", <type bytes>]` of one
// column to the main buffer file. The first column (index 0) is preceded by
// the `"schema": [` opener and the last column (index numOfCols - 1) is
// followed by the closing `],`; entries in between end with a comma.
// Type name and size come from tDataTypes[colType].
// Returns 0 on success, terrno when any write fails.
static int32_t taosAnalyJsonBufWriteColMeta(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName) {
  char line[128] = {0};
  bool isFirst = (0 == colIndex);
  bool isLast = (pBuf->numOfCols - 1 == colIndex);

  if (isFirst && taosAnalyJsonBufWriteStr(pBuf, "\"schema\": [\n", 0) != 0) {
    return terrno;
  }

  const char *separator = isLast ? "" : ",";
  int32_t     lineLen = tsnprintf(line, sizeof(line), "  [\"%s\", \"%s\", %d]%s\n", colName, tDataTypes[colType].name,
                                  tDataTypes[colType].bytes, separator);
  if (taosWriteFile(pBuf->filePtr, line, lineLen) != lineLen) {
    return terrno;
  }

  if (isLast && taosAnalyJsonBufWriteStr(pBuf, "],\n", 0) != 0) {
    return terrno;
  }

  return 0;
}
489

490
// Writes the `"data": [` opener of the data section to the main buffer file.
static int32_t taosAnalyJsonBufWriteDataBegin(SAnalyticBuf *pBuf) {
  int32_t code = taosAnalyJsonBufWriteStr(pBuf, "\"data\": [\n", 0);
  return code;
}
493

494
// Writes bufLen bytes of buf (or strlen(buf) when bufLen <= 0) to the file
// that holds column data: the main file for ANALYTICS_BUF_TYPE_JSON, the
// per-column file of colIndex for any other buffer type.
// Returns 0 on success, terrno when the write is short.
static int32_t taosAnalyJsonBufWriteStrUseCol(SAnalyticBuf *pBuf, const char *buf, int32_t bufLen, int32_t colIndex) {
  if (bufLen <= 0) {
    bufLen = strlen(buf);
  }

  int32_t ret = 0;
  if (pBuf->bufType != ANALYTICS_BUF_TYPE_JSON) {
    ret = taosWriteFile(pBuf->pCols[colIndex].filePtr, buf, bufLen);
  } else {
    ret = taosWriteFile(pBuf->filePtr, buf, bufLen);
  }

  return (ret == bufLen) ? 0 : terrno;
}
513

514
// Writes the `[` that opens the value array of column colIndex.
static int32_t taosAnalyJsonBufWriteColBegin(SAnalyticBuf *pBuf, int32_t colIndex) {
  int32_t code = taosAnalyJsonBufWriteStrUseCol(pBuf, "[\n", 0, colIndex);
  return code;
}
517

518
// Writes the `]` that closes the value array of column colIndex. Every column
// except the last one (index numOfCols - 1) gets a trailing comma.
static int32_t taosAnalyJsonBufWriteColEnd(SAnalyticBuf *pBuf, int32_t colIndex) {
  bool        isLast = (colIndex == pBuf->numOfCols - 1);
  const char *closer = isLast ? "\n]\n" : "\n],\n";

  return taosAnalyJsonBufWriteStrUseCol(pBuf, closer, 0, colIndex);
}
526

527
// Appends one value of column colIndex to its data array.
//
// colValue points at a value of the TSDB type colType; it is rendered as a
// JSON number (BOOL as 1 when the byte equals 1, otherwise 0; FLOAT and DOUBLE
// with %f). Any other type renders as an empty string. Every value after the
// first of a column is preceded by ",\n". The column's row counter is
// incremented regardless of whether the write succeeds.
// Returns the result of taosAnalyJsonBufWriteStrUseCol.
static int32_t taosAnalyJsonBufWriteColData(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue) {
  char              text[64];
  int32_t           textLen = 0;
  SAnalyticsColBuf *pCol = &pBuf->pCols[colIndex];

  if (pCol->numOfRows != 0) {
    // Separator between this value and the previous one.
    text[0] = ',';
    text[1] = '\n';
    text[2] = 0;
    textLen = 2;
  }

  char  *pValue = text + textLen;
  size_t room = sizeof(text) - textLen;

  switch (colType) {
    case TSDB_DATA_TYPE_BOOL:
      textLen += tsnprintf(pValue, room, "%d", (*((int8_t *)colValue) == 1) ? 1 : 0);
      break;
    case TSDB_DATA_TYPE_TINYINT:
      textLen += tsnprintf(pValue, room, "%d", *(int8_t *)colValue);
      break;
    case TSDB_DATA_TYPE_UTINYINT:
      textLen += tsnprintf(pValue, room, "%u", *(uint8_t *)colValue);
      break;
    case TSDB_DATA_TYPE_SMALLINT:
      textLen += tsnprintf(pValue, room, "%d", *(int16_t *)colValue);
      break;
    case TSDB_DATA_TYPE_USMALLINT:
      textLen += tsnprintf(pValue, room, "%u", *(uint16_t *)colValue);
      break;
    case TSDB_DATA_TYPE_INT:
      textLen += tsnprintf(pValue, room, "%d", *(int32_t *)colValue);
      break;
    case TSDB_DATA_TYPE_UINT:
      textLen += tsnprintf(pValue, room, "%u", *(uint32_t *)colValue);
      break;
    case TSDB_DATA_TYPE_BIGINT:
    case TSDB_DATA_TYPE_TIMESTAMP:
      textLen += tsnprintf(pValue, room, "%" PRId64, *(int64_t *)colValue);
      break;
    case TSDB_DATA_TYPE_UBIGINT:
      textLen += tsnprintf(pValue, room, "%" PRIu64, *(uint64_t *)colValue);
      break;
    case TSDB_DATA_TYPE_FLOAT:
      textLen += tsnprintf(pValue, room, "%f", GET_FLOAT_VAL(colValue));
      break;
    case TSDB_DATA_TYPE_DOUBLE:
      textLen += tsnprintf(pValue, room, "%f", GET_DOUBLE_VAL(colValue));
      break;
    default:
      // Unsupported type: emit nothing beyond the separator, if any.
      *pValue = '\0';
  }

  pCol->numOfRows++;
  return taosAnalyJsonBufWriteStrUseCol(pBuf, text, textLen, colIndex);
}
580

581
// Finishes the data section of the JSON document.
//
// For ANALYTICS_BUF_TYPE_JSON_COL every per-column file is synced and closed,
// read back in full and appended to the main buffer file, in column order.
// Then the closing `],` of the data section is written.
//
// Returns 0 on success or the code of the first step that failed. The
// temporary copy of a column file is released on every path, including when
// appending it to the main file fails.
static int32_t taosAnalyJsonBufWriteDataEnd(SAnalyticBuf *pBuf) {
  int32_t code = 0;
  char   *pCont = NULL;
  int64_t contLen = 0;

  if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) {
    for (int32_t i = 0; i < pBuf->numOfCols; ++i) {
      SAnalyticsColBuf *pCol = &pBuf->pCols[i];

      code = taosFsyncFile(pCol->filePtr);
      if (code != 0) return code;

      code = taosCloseFile(&pCol->filePtr);
      if (code != 0) return code;

      // On failure nothing is handed out, so there is nothing to free yet.
      code = taosAnalyJsonBufGetCont(pCol->fileName, &pCont, &contLen);
      if (code != 0) return code;

      code = taosAnalyJsonBufWriteStr(pBuf, pCont, contLen);
      // Release the copy before looking at the result so a failed write does
      // not leak it.
      taosMemoryFreeClear(pCont);
      contLen = 0;
      if (code != 0) return code;
    }
  }

  return taosAnalyJsonBufWriteStr(pBuf, "],\n", 0);
}
609

610
// Writes the tail of the JSON document: the "rows" member, taken from the row
// counter of column 0, then the "protocol" member and the closing brace.
// Returns 0 on success or the code of the failing write.
static int32_t taosAnalyJsonBufWriteEnd(SAnalyticBuf *pBuf) {
  int32_t code = taosAnalyJsonBufWriteOptInt(pBuf, "rows", pBuf->pCols[0].numOfRows);
  if (code == 0) {
    code = taosAnalyJsonBufWriteStr(pBuf, "\"protocol\": 1.0\n}", 0);
  }
  return code;
}
616

617
// Completes and closes a JSON buffer: writes the document tail, then syncs and
// closes the main file and, for ANALYTICS_BUF_TYPE_JSON_COL, every per-column
// file that is still open. Returns 0 on success or the code of the first step
// that failed; files not reached because of a failure stay open in pBuf.
int32_t taosAnalJsonBufClose(SAnalyticBuf *pBuf) {
  int32_t code = taosAnalyJsonBufWriteEnd(pBuf);
  if (code != 0) return code;

  if (pBuf->filePtr != NULL) {
    if ((code = taosFsyncFile(pBuf->filePtr)) != 0) return code;
    if ((code = taosCloseFile(&pBuf->filePtr)) != 0) return code;
  }

  if (pBuf->bufType != ANALYTICS_BUF_TYPE_JSON_COL) {
    return 0;
  }

  for (int32_t colIdx = 0; colIdx < pBuf->numOfCols; ++colIdx) {
    SAnalyticsColBuf *pCol = &pBuf->pCols[colIdx];
    if (pCol->filePtr == NULL) {
      continue;  // already closed
    }

    if ((code = taosFsyncFile(pCol->filePtr)) != 0) return code;
    if ((code = taosCloseFile(&pCol->filePtr)) != 0) return code;
  }

  return 0;
}
642

643
// Releases the resources held by an analytic buffer.
//
// Closes the main file if it is still open and clears its name (the file
// itself is not removed here). For ANALYTICS_BUF_TYPE_JSON_COL each per-column
// file is closed if needed and removed from disk, logging an error when the
// removal fails. Finally the column array is freed and numOfCols reset to 0.
void taosAnalyBufDestroy(SAnalyticBuf *pBuf) {
  if (pBuf->fileName[0] != 0) {
    if (pBuf->filePtr != NULL) (void)taosCloseFile(&pBuf->filePtr);
    pBuf->fileName[0] = 0;
  }

  if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) {
    int32_t colIdx = 0;
    while (colIdx < pBuf->numOfCols) {
      SAnalyticsColBuf *pCol = &pBuf->pCols[colIdx];
      ++colIdx;

      if (pCol->fileName[0] == 0) {
        continue;  // no file was created for this column
      }

      if (pCol->filePtr != NULL) (void)taosCloseFile(&pCol->filePtr);
      if (taosRemoveFile(pCol->fileName) != 0) {
        uError("failed to remove file %s", pCol->fileName);
      }
      pCol->fileName[0] = 0;
    }
  }

  taosMemoryFreeClear(pBuf->pCols);
  pBuf->numOfCols = 0;
}
×
666

667
// Opens an analytic buffer with room for numOfCols columns.
// Only the JSON and JSON_COL buffer types are supported; any other type yields
// TSDB_CODE_ANA_BUF_INVALID_TYPE.
int32_t tsosAnalyBufOpen(SAnalyticBuf *pBuf, int32_t numOfCols) {
  if (pBuf->bufType != ANALYTICS_BUF_TYPE_JSON && pBuf->bufType != ANALYTICS_BUF_TYPE_JSON_COL) {
    return TSDB_CODE_ANA_BUF_INVALID_TYPE;
  }

  return tsosAnalyJsonBufOpen(pBuf, numOfCols);
}
674

675
// Writes a string option to the buffer.
// Only the JSON and JSON_COL buffer types are supported; any other type yields
// TSDB_CODE_ANA_BUF_INVALID_TYPE.
int32_t taosAnalyBufWriteOptStr(SAnalyticBuf *pBuf, const char *optName, const char *optVal) {
  if (pBuf->bufType != ANALYTICS_BUF_TYPE_JSON && pBuf->bufType != ANALYTICS_BUF_TYPE_JSON_COL) {
    return TSDB_CODE_ANA_BUF_INVALID_TYPE;
  }

  return taosAnalyJsonBufWriteOptStr(pBuf, optName, optVal);
}
682

683
// Writes an integer option to the buffer.
// Only the JSON and JSON_COL buffer types are supported; any other type yields
// TSDB_CODE_ANA_BUF_INVALID_TYPE.
int32_t taosAnalyBufWriteOptInt(SAnalyticBuf *pBuf, const char *optName, int64_t optVal) {
  if (pBuf->bufType != ANALYTICS_BUF_TYPE_JSON && pBuf->bufType != ANALYTICS_BUF_TYPE_JSON_COL) {
    return TSDB_CODE_ANA_BUF_INVALID_TYPE;
  }

  return taosAnalyJsonBufWriteOptInt(pBuf, optName, optVal);
}
690

691
// Writes a floating-point option to the buffer.
// Only the JSON and JSON_COL buffer types are supported; any other type yields
// TSDB_CODE_ANA_BUF_INVALID_TYPE.
int32_t taosAnalyBufWriteOptFloat(SAnalyticBuf *pBuf, const char *optName, float optVal) {
  if (pBuf->bufType != ANALYTICS_BUF_TYPE_JSON && pBuf->bufType != ANALYTICS_BUF_TYPE_JSON_COL) {
    return TSDB_CODE_ANA_BUF_INVALID_TYPE;
  }

  return taosAnalyJsonBufWriteOptFloat(pBuf, optName, optVal);
}
698

699
// Writes the schema entry of column colIndex to the buffer.
// Only the JSON and JSON_COL buffer types are supported; any other type yields
// TSDB_CODE_ANA_BUF_INVALID_TYPE.
int32_t taosAnalyBufWriteColMeta(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName) {
  if (pBuf->bufType != ANALYTICS_BUF_TYPE_JSON && pBuf->bufType != ANALYTICS_BUF_TYPE_JSON_COL) {
    return TSDB_CODE_ANA_BUF_INVALID_TYPE;
  }

  return taosAnalyJsonBufWriteColMeta(pBuf, colIndex, colType, colName);
}
706

707
// Starts the data section of the buffer.
// Only the JSON and JSON_COL buffer types are supported; any other type yields
// TSDB_CODE_ANA_BUF_INVALID_TYPE.
int32_t taosAnalyBufWriteDataBegin(SAnalyticBuf *pBuf) {
  if (pBuf->bufType != ANALYTICS_BUF_TYPE_JSON && pBuf->bufType != ANALYTICS_BUF_TYPE_JSON_COL) {
    return TSDB_CODE_ANA_BUF_INVALID_TYPE;
  }

  return taosAnalyJsonBufWriteDataBegin(pBuf);
}
714

715
// Starts the value array of column colIndex.
// Only the JSON and JSON_COL buffer types are supported; any other type yields
// TSDB_CODE_ANA_BUF_INVALID_TYPE.
int32_t taosAnalyBufWriteColBegin(SAnalyticBuf *pBuf, int32_t colIndex) {
  if (pBuf->bufType != ANALYTICS_BUF_TYPE_JSON && pBuf->bufType != ANALYTICS_BUF_TYPE_JSON_COL) {
    return TSDB_CODE_ANA_BUF_INVALID_TYPE;
  }

  return taosAnalyJsonBufWriteColBegin(pBuf, colIndex);
}
722

723
// Appends one value of type colType to column colIndex.
// Only the JSON and JSON_COL buffer types are supported; any other type yields
// TSDB_CODE_ANA_BUF_INVALID_TYPE.
int32_t taosAnalyBufWriteColData(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue) {
  if (pBuf->bufType != ANALYTICS_BUF_TYPE_JSON && pBuf->bufType != ANALYTICS_BUF_TYPE_JSON_COL) {
    return TSDB_CODE_ANA_BUF_INVALID_TYPE;
  }

  return taosAnalyJsonBufWriteColData(pBuf, colIndex, colType, colValue);
}
730

731
// Ends the value array of column colIndex.
// Only the JSON and JSON_COL buffer types are supported; any other type yields
// TSDB_CODE_ANA_BUF_INVALID_TYPE.
int32_t taosAnalyBufWriteColEnd(SAnalyticBuf *pBuf, int32_t colIndex) {
  if (pBuf->bufType != ANALYTICS_BUF_TYPE_JSON && pBuf->bufType != ANALYTICS_BUF_TYPE_JSON_COL) {
    return TSDB_CODE_ANA_BUF_INVALID_TYPE;
  }

  return taosAnalyJsonBufWriteColEnd(pBuf, colIndex);
}
738

739
// Ends the data section of the buffer.
// Only the JSON and JSON_COL buffer types are supported; any other type yields
// TSDB_CODE_ANA_BUF_INVALID_TYPE.
int32_t taosAnalyBufWriteDataEnd(SAnalyticBuf *pBuf) {
  if (pBuf->bufType != ANALYTICS_BUF_TYPE_JSON && pBuf->bufType != ANALYTICS_BUF_TYPE_JSON_COL) {
    return TSDB_CODE_ANA_BUF_INVALID_TYPE;
  }

  return taosAnalyJsonBufWriteDataEnd(pBuf);
}
746

747
// Completes the document and closes the buffer files.
// Only the JSON and JSON_COL buffer types are supported; any other type yields
// TSDB_CODE_ANA_BUF_INVALID_TYPE.
int32_t taosAnalyBufClose(SAnalyticBuf *pBuf) {
  if (pBuf->bufType != ANALYTICS_BUF_TYPE_JSON && pBuf->bufType != ANALYTICS_BUF_TYPE_JSON_COL) {
    return TSDB_CODE_ANA_BUF_INVALID_TYPE;
  }

  return taosAnalJsonBufClose(pBuf);
}
754

755
// Loads the whole content of the buffer's main file into memory.
// The outputs are reset to NULL / 0 first, so they are well defined even when
// the call fails. Only the JSON and JSON_COL buffer types are supported; any
// other type yields TSDB_CODE_ANA_BUF_INVALID_TYPE.
static int32_t taosAnalyBufGetCont(SAnalyticBuf *pBuf, char **ppCont, int64_t *pContLen) {
  *ppCont = NULL;
  *pContLen = 0;

  if (pBuf->bufType != ANALYTICS_BUF_TYPE_JSON && pBuf->bufType != ANALYTICS_BUF_TYPE_JSON_COL) {
    return TSDB_CODE_ANA_BUF_INVALID_TYPE;
  }

  return taosAnalyJsonBufGetCont(pBuf->fileName, ppCont, pContLen);
}
765

766
#else
767

768
// Stub for builds without USE_ANALYTICS: nothing to initialise, reports success.
int32_t taosAnalyticsInit() {
  return 0;
}
769
// Stub for builds without USE_ANALYTICS: nothing to clean up.
void taosAnalyticsCleanup() {
}
770
// Stub for builds without USE_ANALYTICS: no request is sent, always returns NULL.
SJson *taosAnalySendReqRetJson(const char *url, EAnalyHttpType type, SAnalyticBuf *pBuf, int64_t timeout) {
  return NULL;
}
771

772
// Stub for builds without USE_ANALYTICS: reports success and leaves url untouched.
int32_t taosAnalyGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url, int32_t urlLen) {
  return 0;
}
773
// Stub for builds without USE_ANALYTICS: reports true and leaves optValue untouched.
bool taosAnalyGetOptStr(const char *option, const char *optName, char *optValue, int32_t optMaxLen) {
  return true;
}
774
// Stub for builds without USE_ANALYTICS: reports true and leaves *optValue untouched.
bool taosAnalyGetOptInt(const char *option, const char *optName, int64_t *optValue) {
  return true;
}
775
// Stub for builds without USE_ANALYTICS: there is no algorithm table, the version is always 0.
int64_t taosAnalyGetVersion() {
  return 0;
}
776
// Stub for builds without USE_ANALYTICS: does nothing.
// NOTE(review): unlike the USE_ANALYTICS implementation, pHash is not freed here - confirm callers do not
// rely on this function taking ownership in such builds.
void taosAnalyUpdate(int64_t newVer, SHashObj *pHash) {
}
777

778
// Stub for builds without USE_ANALYTICS: opens nothing, reports success.
int32_t tsosAnalyBufOpen(SAnalyticBuf *pBuf, int32_t numOfCols) {
  return 0;
}
779
// Stub for builds without USE_ANALYTICS: writes nothing, reports success.
int32_t taosAnalyBufWriteOptStr(SAnalyticBuf *pBuf, const char *optName, const char *optVal) {
  return 0;
}
780
// Stub for builds without USE_ANALYTICS: writes nothing, reports success.
int32_t taosAnalyBufWriteOptInt(SAnalyticBuf *pBuf, const char *optName, int64_t optVal) {
  return 0;
}
781
// Stub for builds without USE_ANALYTICS: writes nothing, reports success.
int32_t taosAnalyBufWriteOptFloat(SAnalyticBuf *pBuf, const char *optName, float optVal) {
  return 0;
}
782
// Stub for builds without USE_ANALYTICS: writes nothing, reports success.
int32_t taosAnalyBufWriteColMeta(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName) {
  int32_t code = 0;
  return code;
}
785
// Stub for builds without USE_ANALYTICS: writes nothing, reports success.
int32_t taosAnalyBufWriteDataBegin(SAnalyticBuf *pBuf) {
  return 0;
}
786
// Stub for builds without USE_ANALYTICS: writes nothing, reports success.
int32_t taosAnalyBufWriteColBegin(SAnalyticBuf *pBuf, int32_t colIndex) {
  return 0;
}
787
// Stub for builds without USE_ANALYTICS: writes nothing, reports success.
int32_t taosAnalyBufWriteColData(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue) {
  return 0;
}
788
// Stub for builds without USE_ANALYTICS: writes nothing, reports success.
int32_t taosAnalyBufWriteColEnd(SAnalyticBuf *pBuf, int32_t colIndex) {
  return 0;
}
789
// Stub for builds without USE_ANALYTICS: writes nothing, reports success.
int32_t taosAnalyBufWriteDataEnd(SAnalyticBuf *pBuf) {
  return 0;
}
790
// Stub for builds without USE_ANALYTICS: closes nothing, reports success.
int32_t taosAnalyBufClose(SAnalyticBuf *pBuf) {
  return 0;
}
791
// Stub for builds without USE_ANALYTICS: nothing to release.
void taosAnalyBufDestroy(SAnalyticBuf *pBuf) {
}
792

793
// Stub for builds without USE_ANALYTICS: returns a null pointer for every type
// (the USE_ANALYTICS implementation never returns NULL).
const char *taosAnalysisAlgoType(EAnalAlgoType algoType) {
  return NULL;
}
794
// Stub for builds without USE_ANALYTICS: maps every name to the type with value 0.
// NOTE(review): the USE_ANALYTICS branch names this function taosAnalyAlgoInt
// (with a 'y'); confirm which spelling the header declares.
EAnalAlgoType taosAnalAlgoInt(const char *algoName) {
  return 0;
}
795
// Stub for builds without USE_ANALYTICS: returns a null pointer for every type.
// NOTE(review): the USE_ANALYTICS branch names this function taosAnalyAlgoUrlStr
// (with a 'y'); confirm which spelling the header declares.
const char *taosAnalAlgoUrlStr(EAnalAlgoType algoType) {
  return NULL;
}
796

797
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc