• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3535

23 Nov 2024 02:07AM UTC coverage: 60.85% (+0.03%) from 60.825%
#3535

push

travis-ci

web-flow
Merge pull request #28893 from taosdata/doc/internal

refact: rename taos lib name

120252 of 252737 branches covered (47.58%)

Branch coverage included in aggregate %.

201187 of 275508 relevant lines covered (73.02%)

15886166.19 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

3.74
/source/util/src/tanalytics.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "tanalytics.h"
18
#include "ttypes.h"
19
#include "tutil.h"
20

21
#ifdef USE_ANALYTICS
22
#include <curl/curl.h>
23
#define ANAL_ALGO_SPLIT ","
24

25
// Process-wide registry of known analytics algorithms.
typedef struct {
  int64_t       ver;   // version of the table currently installed (see taosAnalUpdate)
  SHashObj     *hash;  // "<algotype>:<algoname>" (lower-cased, see taosAnalGetAlgoUrl) -> SAnalyticsUrl
  TdThreadMutex lock;  // taken while swapping or looking up `hash`
} SAlgoMgmt;
30

31
// Response body collected by the curl write callback (taosCurlWriteData).
typedef struct {
  char   *data;     // heap buffer, NUL-terminated by the callback; freed by the request's caller
  int64_t dataLen;  // number of bytes stored in `data`, not counting the terminator
} SCurlResp;
35

36
static SAlgoMgmt tsAlgos = {0};
37
static int32_t   taosAnalBufGetCont(SAnalyticBuf *pBuf, char **ppCont, int64_t *pContLen);
38

39
// Returns the display name of an algorithm type; any type that is not
// recognised maps to "unknown". The returned string is a static literal.
const char *taosAnalAlgoStr(EAnalAlgoType type) {
  if (type == ANAL_ALGO_TYPE_ANOMALY_DETECT) {
    return "anomaly-detection";
  }
  if (type == ANAL_ALGO_TYPE_FORECAST) {
    return "forecast";
  }
  return "unknown";
}
49

50
// Returns the URL path segment used for an algorithm type; any type that is
// not recognised maps to "unknown". The returned string is a static literal.
const char *taosAnalAlgoUrlStr(EAnalAlgoType type) {
  if (type == ANAL_ALGO_TYPE_ANOMALY_DETECT) {
    return "anomaly-detect";
  }
  if (type == ANAL_ALGO_TYPE_FORECAST) {
    return "forecast";
  }
  return "unknown";
}
60

61
EAnalAlgoType taosAnalAlgoInt(const char *name) {
×
62
  for (EAnalAlgoType i = 0; i < ANAL_ALGO_TYPE_END; ++i) {
×
63
    if (strcasecmp(name, taosAnalAlgoStr(i)) == 0) {
×
64
      return i;
×
65
    }
66
  }
67

68
  return ANAL_ALGO_TYPE_END;
×
69
}
70

71
int32_t taosAnalyticsInit() {
2,434✔
72
  if (curl_global_init(CURL_GLOBAL_ALL) != 0) {
2,434!
73
    uError("failed to init curl");
×
74
    return -1;
×
75
  }
76

77
  tsAlgos.ver = 0;
2,434✔
78
  if (taosThreadMutexInit(&tsAlgos.lock, NULL) != 0) {
2,434!
79
    uError("failed to init algo mutex");
×
80
    return -1;
×
81
  }
82

83
  tsAlgos.hash = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
2,434✔
84
  if (tsAlgos.hash == NULL) {
2,434!
85
    uError("failed to init algo hash");
×
86
    return -1;
×
87
  }
88

89
  uInfo("analysis env is initialized");
2,434!
90
  return 0;
2,434✔
91
}
92

93
// Releases an algorithm table: frees the `url` string owned by every
// SAnalyticsUrl entry, then destroys the hash itself.
static void taosAnalFreeHash(SHashObj *hash) {
  for (void *pIter = taosHashIterate(hash, NULL); pIter != NULL; pIter = taosHashIterate(hash, pIter)) {
    taosMemoryFree(((SAnalyticsUrl *)pIter)->url);
  }
  taosHashCleanup(hash);
}
2,434✔
102

103
void taosAnalyticsCleanup() {
2,434✔
104
  curl_global_cleanup();
2,434✔
105
  if (taosThreadMutexDestroy(&tsAlgos.lock) != 0) {
2,434!
106
    uError("failed to destroy anal lock");
×
107
  }
108
  taosAnalFreeHash(tsAlgos.hash);
2,434✔
109
  tsAlgos.hash = NULL;
2,434✔
110
  uInfo("analysis env is cleaned up");
2,434!
111
}
2,434✔
112

113
// Installs `pHash` as the current algorithm table when `newVer` is newer than
// the installed version. This function always takes ownership of `pHash`:
// exactly one table (the replaced one, or the rejected candidate) is freed
// before returning.
//
// The version comparison is done while holding the lock so that two
// concurrent updates cannot both pass the check and install out of order.
void taosAnalUpdate(int64_t newVer, SHashObj *pHash) {
  SHashObj *pStale = pHash;  // table to release; the candidate unless it gets installed

  if (taosThreadMutexLock(&tsAlgos.lock) != 0) {
    // Could not synchronise: keep the current table and drop the candidate
    // instead of leaking it.
    uError("failed to lock hash");
    taosAnalFreeHash(pHash);
    return;
  }

  if (newVer > tsAlgos.ver) {
    pStale = tsAlgos.hash;
    tsAlgos.ver = newVer;
    tsAlgos.hash = pHash;
  }

  if (taosThreadMutexUnlock(&tsAlgos.lock) != 0) {
    uError("failed to unlock hash");
  }

  // Free outside the critical section to keep the lock hold time short.
  taosAnalFreeHash(pStale);
}
×
128

129
bool taosAnalGetOptStr(const char *option, const char *optName, char *optValue, int32_t optMaxLen) {
×
130
  char  buf[TSDB_ANALYTIC_ALGO_OPTION_LEN] = {0};
×
131
  char *pStart = NULL;
×
132
  char *pEnd = NULL;
×
133

134
  pStart = strstr(option, optName);
×
135
  if (pStart == NULL) {
×
136
    return false;
×
137
  }
138

139
  pEnd = strstr(pStart, ANAL_ALGO_SPLIT);
×
140
  if (optMaxLen > 0) {
×
141
    if (pEnd > pStart) {
×
142
      int32_t len = (int32_t)(pEnd - pStart);
×
143
      len = MIN(len + 1, TSDB_ANALYTIC_ALGO_OPTION_LEN);
×
144
      tstrncpy(buf, pStart, len);
×
145
    } else {
146
      int32_t len = MIN(tListLen(buf), strlen(pStart) + 1);
×
147
      tstrncpy(buf, pStart, len);
×
148
    }
149

150
    char *pRight = strstr(buf, "=");
×
151
    if (pRight == NULL) {
×
152
      return false;
×
153
    } else {
154
      pRight += 1;
×
155
    }
156

157
    int32_t unused = strtrim(pRight);
×
158

159
    int32_t vLen = MIN(optMaxLen, strlen(pRight) + 1);
×
160
    tstrncpy(optValue, pRight, vLen);
×
161
  }
162

163
  return true;
×
164
}
165

166
bool taosAnalGetOptInt(const char *option, const char *optName, int64_t *optValue) {
×
167
  char    buf[TSDB_ANALYTIC_ALGO_OPTION_LEN] = {0};
×
168
  int32_t bufLen = tsnprintf(buf, sizeof(buf), "%s=", optName);
×
169

170
  char *pos1 = strstr(option, buf);
×
171
  char *pos2 = strstr(option, ANAL_ALGO_SPLIT);
×
172
  if (pos1 != NULL) {
×
173
    *optValue = taosStr2Int64(pos1 + bufLen, NULL, 10);
×
174
    return true;
×
175
  } else {
176
    return false;
×
177
  }
178
}
179

180
// Resolves the service URL registered for (`type`, `algoName`).
//
//   algoName  algorithm name; matched case-insensitively (the key is lower-cased)
//   type      algorithm type; forms the numeric prefix of the lookup key
//   url       output buffer, always written: the URL on success, "" on failure
//   urlLen    capacity of `url`
//
// Returns 0 on success. Returns TSDB_CODE_ANA_ALGO_NOT_FOUND (also stored in
// terrno) when the key is absent or the registry lock cannot be taken, and
// TSDB_CODE_OUT_OF_MEMORY when the lock cannot be released.
int32_t taosAnalGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url, int32_t urlLen) {
  int32_t code = 0;
  char    name[TSDB_ANALYTIC_ALGO_KEY_LEN] = {0};
  // Key layout is "<type>:<algoName>"; the +1 makes the key length include the
  // terminating NUL.
  int32_t nameLen = 1 + tsnprintf(name, sizeof(name) - 1, "%d:%s", type, algoName);

  char *unused = strntolower(name, name, nameLen);

  if (taosThreadMutexLock(&tsAlgos.lock) != 0) {
    // Without the lock the table cannot be read safely. Report a failure
    // rather than returning success with `url` never written.
    url[0] = 0;
    terrno = TSDB_CODE_ANA_ALGO_NOT_FOUND;
    uError("algo:%s, type:%s, failed to lock hash", algoName, taosAnalAlgoStr(type));
    return terrno;
  }

  // NOTE(review): taosHashAcquire looks like it takes a reference on the entry
  // and no matching release is visible here — confirm against the hash API.
  SAnalyticsUrl *pUrl = taosHashAcquire(tsAlgos.hash, name, nameLen);
  if (pUrl != NULL) {
    tstrncpy(url, pUrl->url, urlLen);
    uDebug("algo:%s, type:%s, url:%s", algoName, taosAnalAlgoStr(type), url);
  } else {
    url[0] = 0;
    terrno = TSDB_CODE_ANA_ALGO_NOT_FOUND;
    code = terrno;
    uError("algo:%s, type:%s, url not found", algoName, taosAnalAlgoStr(type));
  }

  if (taosThreadMutexUnlock(&tsAlgos.lock) != 0) {
    uError("failed to unlock hash");
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  return code;
}
207

208
// Returns the version of the algorithm table currently installed.
// The value is read without taking tsAlgos.lock.
int64_t taosAnalGetVersion() {
  return tsAlgos.ver;
}
108,700✔
209

210
static size_t taosCurlWriteData(char *pCont, size_t contLen, size_t nmemb, void *userdata) {
×
211
  SCurlResp *pRsp = userdata;
×
212
  if (contLen == 0 || nmemb == 0 || pCont == NULL) {
×
213
    pRsp->dataLen = 0;
×
214
    pRsp->data = NULL;
×
215
    uError("curl response is received, len:%" PRId64, pRsp->dataLen);
×
216
    return 0;
×
217
  }
218

219
  pRsp->dataLen = (int64_t)contLen * (int64_t)nmemb;
×
220
  pRsp->data = taosMemoryMalloc(pRsp->dataLen + 1);
×
221

222
  if (pRsp->data != NULL) {
×
223
    (void)memcpy(pRsp->data, pCont, pRsp->dataLen);
×
224
    pRsp->data[pRsp->dataLen] = 0;
×
225
    uDebugL("curl response is received, len:%" PRId64 ", content:%s", pRsp->dataLen, pRsp->data);
×
226
    return pRsp->dataLen;
×
227
  } else {
228
    pRsp->dataLen = 0;
×
229
    uError("failed to malloc curl response");
×
230
    return 0;
×
231
  }
232
}
233

234
// Performs an HTTP GET on `url` with a 100 ms total timeout and collects the
// body into `pRsp` through taosCurlWriteData.
//
// Returns 0 on success, -1 when no curl handle could be created, otherwise the
// CURLcode of the first curl call that failed. An option-setting failure is
// reported as an error rather than as success with no request sent.
static int32_t taosCurlGetRequest(const char *url, SCurlResp *pRsp) {
  CURL    *curl = NULL;
  CURLcode code = CURLE_OK;

  curl = curl_easy_init();
  if (curl == NULL) {
    uError("failed to create curl handle");
    return -1;
  }

  if ((code = curl_easy_setopt(curl, CURLOPT_URL, url)) != CURLE_OK) goto _OVER;
  if ((code = curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, taosCurlWriteData)) != CURLE_OK) goto _OVER;
  if ((code = curl_easy_setopt(curl, CURLOPT_WRITEDATA, pRsp)) != CURLE_OK) goto _OVER;
  // Numeric options are read as `long` by libcurl's variadic setopt.
  if ((code = curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, 100L)) != CURLE_OK) goto _OVER;

  uDebug("curl get request will sent, url:%s", url);
  code = curl_easy_perform(curl);

_OVER:
  if (code != CURLE_OK) {
    uError("failed to perform curl action, code:%d", code);
  }
  curl_easy_cleanup(curl);
  return code;
}
259

260
// Performs an HTTP POST of `buf` (`bufLen` bytes, sent as JSON) to `url` with
// a 60 s total timeout and collects the body into `pRsp` through
// taosCurlWriteData.
//
// Returns 0 on success, -1 when no curl handle could be created, otherwise the
// CURLcode of the first curl call that failed. An option-setting failure is
// reported as an error rather than as success with no request sent.
static int32_t taosCurlPostRequest(const char *url, SCurlResp *pRsp, const char *buf, int32_t bufLen) {
  struct curl_slist *headers = NULL;
  CURL              *curl = NULL;
  CURLcode           code = CURLE_OK;

  curl = curl_easy_init();
  if (curl == NULL) {
    uError("failed to create curl handle");
    return -1;
  }

  headers = curl_slist_append(headers, "Content-Type:application/json;charset=UTF-8");
  if (headers == NULL) {
    // Do not silently send the request without its content type.
    code = CURLE_OUT_OF_MEMORY;
    goto _OVER;
  }

  if ((code = curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers)) != CURLE_OK) goto _OVER;
  if ((code = curl_easy_setopt(curl, CURLOPT_URL, url)) != CURLE_OK) goto _OVER;
  if ((code = curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, taosCurlWriteData)) != CURLE_OK) goto _OVER;
  if ((code = curl_easy_setopt(curl, CURLOPT_WRITEDATA, pRsp)) != CURLE_OK) goto _OVER;
  // Numeric options are read as `long` by libcurl's variadic setopt.
  if ((code = curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, 60000L)) != CURLE_OK) goto _OVER;
  if ((code = curl_easy_setopt(curl, CURLOPT_POST, 1L)) != CURLE_OK) goto _OVER;
  if ((code = curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, (long)bufLen)) != CURLE_OK) goto _OVER;
  if ((code = curl_easy_setopt(curl, CURLOPT_POSTFIELDS, buf)) != CURLE_OK) goto _OVER;

  uDebugL("curl post request will sent, url:%s len:%d content:%s", url, bufLen, buf);
  code = curl_easy_perform(curl);

_OVER:
  if (code != CURLE_OK) {
    uError("failed to perform curl action, code:%d", code);
  }
  curl_slist_free_all(headers);  // safe when headers is NULL
  curl_easy_cleanup(curl);
  return code;
}
294

295
// Sends a request to `url` and parses the response body as JSON.
//
// For ANALYTICS_HTTP_TYPE_GET a plain GET is issued; for any other type the
// content of `pBuf` is loaded and POSTed. Returns the parsed JSON, or NULL
// with terrno set when the request fails, the response is empty, or the body
// is not valid JSON (a body starting with '<' is reported as an anode error).
SJson *taosAnalSendReqRetJson(const char *url, EAnalHttpType type, SAnalyticBuf *pBuf) {
  SCurlResp rsp = {0};
  SJson    *pResult = NULL;
  char     *pBody = NULL;
  int64_t   bodyLen = 0;

  if (type != ANALYTICS_HTTP_TYPE_GET) {
    int32_t code = taosAnalBufGetCont(pBuf, &pBody, &bodyLen);
    if (code != 0) {
      terrno = code;
      goto _OVER;
    }
    if (taosCurlPostRequest(url, &rsp, pBody, bodyLen) != 0) {
      terrno = TSDB_CODE_ANA_URL_CANT_ACCESS;
      goto _OVER;
    }
  } else if (taosCurlGetRequest(url, &rsp) != 0) {
    terrno = TSDB_CODE_ANA_URL_CANT_ACCESS;
    goto _OVER;
  }

  if (rsp.dataLen == 0 || rsp.data == NULL) {
    terrno = TSDB_CODE_ANA_URL_RSP_IS_NULL;
    goto _OVER;
  }

  pResult = tjsonParse(rsp.data);
  if (pResult == NULL) {
    if (rsp.data[0] != '<') {
      terrno = TSDB_CODE_INVALID_JSON_FORMAT;
    } else {
      terrno = TSDB_CODE_ANA_ANODE_RETURN_ERROR;
    }
  }

_OVER:
  if (rsp.data != NULL) taosMemoryFreeClear(rsp.data);
  if (pBody != NULL) taosMemoryFree(pBody);
  return pResult;
}
339

340
static int32_t taosAnalJsonBufGetCont(const char *fileName, char **ppCont, int64_t *pContLen) {
×
341
  int32_t   code = 0;
×
342
  int64_t   contLen;
343
  char     *pCont = NULL;
×
344
  TdFilePtr pFile = NULL;
×
345

346
  pFile = taosOpenFile(fileName, TD_FILE_READ);
×
347
  if (pFile == NULL) {
×
348
    code = terrno;
×
349
    goto _OVER;
×
350
  }
351

352
  code = taosFStatFile(pFile, &contLen, NULL);
×
353
  if (code != 0) goto _OVER;
×
354

355
  pCont = taosMemoryMalloc(contLen + 1);
×
356
  if (pCont == NULL) {
×
357
    code = TSDB_CODE_OUT_OF_MEMORY;
×
358
    goto _OVER;
×
359
  }
360

361
  if (taosReadFile(pFile, pCont, contLen) != contLen) {
×
362
    code = terrno;
×
363
    goto _OVER;
×
364
  }
365

366
  pCont[contLen] = '\0';
×
367

368
_OVER:
×
369
  if (code == 0) {
×
370
    *ppCont = pCont;
×
371
    *pContLen = contLen;
×
372
  } else {
373
    if (pCont != NULL) taosMemoryFree(pCont);
×
374
  }
375
  if (pFile != NULL) taosCloseFile(&pFile);
×
376
  return code;
×
377
}
378

379
// Appends the JSON member `"<optName>": <optVal>,` plus a newline to the main
// buffer file. The line is formatted in a 64-byte stack buffer. Returns 0 on
// success, terrno when the write is short.
static int32_t taosAnalJsonBufWriteOptInt(SAnalyticBuf *pBuf, const char *optName, int64_t optVal) {
  char          line[64] = {0};
  const int32_t lineLen = tsnprintf(line, sizeof(line), "\"%s\": %" PRId64 ",\n", optName, optVal);

  return (taosWriteFile(pBuf->filePtr, line, lineLen) == lineLen) ? 0 : terrno;
}
387

388
// Appends the JSON member `"<optName>": "<optVal>",` plus a newline to the main
// buffer file. The line is formatted in a 128-byte stack buffer. Returns 0 on
// success, terrno when the write is short.
static int32_t taosAnalJsonBufWriteOptStr(SAnalyticBuf *pBuf, const char *optName, const char *optVal) {
  char          line[128] = {0};
  const int32_t lineLen = tsnprintf(line, sizeof(line), "\"%s\": \"%s\",\n", optName, optVal);

  return (taosWriteFile(pBuf->filePtr, line, lineLen) == lineLen) ? 0 : terrno;
}
396

397
// Appends the JSON member `"<optName>": <optVal>,` (formatted with %f) plus a
// newline to the main buffer file. Returns 0 on success, terrno when the write
// is short.
static int32_t taosAnalJsonBufWriteOptFloat(SAnalyticBuf *pBuf, const char *optName, float optVal) {
  char          line[128] = {0};
  const int32_t lineLen = tsnprintf(line, sizeof(line), "\"%s\": %f,\n", optName, optVal);

  return (taosWriteFile(pBuf->filePtr, line, lineLen) == lineLen) ? 0 : terrno;
}
405

406
// Writes `bufLen` bytes of `buf` to the main buffer file. A non-positive
// `bufLen` means "use strlen(buf)". Returns 0 on success, terrno when the
// write is short.
static int32_t taosAnalJsonBufWriteStr(SAnalyticBuf *pBuf, const char *buf, int32_t bufLen) {
  const int32_t len = (bufLen > 0) ? bufLen : (int32_t)strlen(buf);

  return (taosWriteFile(pBuf->filePtr, buf, len) == len) ? 0 : terrno;
}
415

416
// Writes the opening brace of the JSON document to the main buffer file.
static int32_t taosAnalJsonBufWriteStart(SAnalyticBuf *pBuf) {
  return taosAnalJsonBufWriteStr(pBuf, "{\n", 0);
}
×
417

418
// Opens the main buffer file (created/truncated, write-through), allocates the
// per-column state for `numOfCols` columns and writes the JSON opening brace.
// For every buffer type other than ANALYTICS_BUF_TYPE_JSON one side file named
// "<fileName>-c<i>" is also opened per column. Returns 0 on success, terrno or
// TSDB_CODE_OUT_OF_MEMORY on failure; nothing is released here on failure.
static int32_t tsosAnalJsonBufOpen(SAnalyticBuf *pBuf, int32_t numOfCols) {
  pBuf->filePtr = taosOpenFile(pBuf->fileName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
  if (NULL == pBuf->filePtr) {
    return terrno;
  }

  pBuf->pCols = taosMemoryCalloc(numOfCols, sizeof(SAnalyticsColBuf));
  if (NULL == pBuf->pCols) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  pBuf->numOfCols = numOfCols;

  if (pBuf->bufType != ANALYTICS_BUF_TYPE_JSON) {
    int32_t col = 0;
    while (col < numOfCols) {
      SAnalyticsColBuf *pColBuf = &pBuf->pCols[col];
      snprintf(pColBuf->fileName, sizeof(pColBuf->fileName), "%s-c%d", pBuf->fileName, col);
      pColBuf->filePtr =
          taosOpenFile(pColBuf->fileName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
      if (NULL == pColBuf->filePtr) {
        return terrno;
      }
      ++col;
    }
  }

  return taosAnalJsonBufWriteStart(pBuf);
}
444

445
// Writes one entry of the "schema" array: ["<colName>", "<type name>", <bytes>].
// The first column (index 0) also emits the `"schema": [` opener, and the last
// column (index numOfCols - 1) omits the trailing comma and emits the closing
// `],`. Returns 0 on success, terrno on a failed write.
static int32_t taosAnalJsonBufWriteColMeta(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName) {
  char       entry[128] = {0};
  const bool isLast = (colIndex == pBuf->numOfCols - 1);

  if (colIndex == 0 && taosAnalJsonBufWriteStr(pBuf, "\"schema\": [\n", 0) != 0) {
    return terrno;
  }

  const char *separator = isLast ? "" : ",";
  int32_t     entryLen = tsnprintf(entry, sizeof(entry), "  [\"%s\", \"%s\", %d]%s\n", colName,
                                   tDataTypes[colType].name, tDataTypes[colType].bytes, separator);
  if (taosWriteFile(pBuf->filePtr, entry, entryLen) != entryLen) {
    return terrno;
  }

  if (isLast && taosAnalJsonBufWriteStr(pBuf, "],\n", 0) != 0) {
    return terrno;
  }

  return 0;
}
470

471
// Writes the opener of the "data" array to the main buffer file.
static int32_t taosAnalJsonBufWriteDataBegin(SAnalyticBuf *pBuf) {
  static const char opener[] = "\"data\": [\n";
  return taosAnalJsonBufWriteStr(pBuf, opener, 0);
}
474

475
// Writes `buf` to the file that holds column data: the main buffer file for
// ANALYTICS_BUF_TYPE_JSON, otherwise the side file of column `colIndex`.
// A non-positive `bufLen` means "use strlen(buf)". Returns 0 on success,
// terrno when the write is short.
static int32_t taosAnalJsonBufWriteStrUseCol(SAnalyticBuf *pBuf, const char *buf, int32_t bufLen, int32_t colIndex) {
  const int32_t len = (bufLen > 0) ? bufLen : (int32_t)strlen(buf);
  const bool    toMainFile = (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON);

  // Only the selected operand is evaluated, so pCols is not touched for the
  // single-file layout.
  if (taosWriteFile(toMainFile ? pBuf->filePtr : pBuf->pCols[colIndex].filePtr, buf, len) != len) {
    return terrno;
  }
  return 0;
}
492

493
// Opens the JSON array that holds the values of column `colIndex`.
static int32_t taosAnalJsonBufWriteColBegin(SAnalyticBuf *pBuf, int32_t colIndex) {
  static const char opener[] = "[\n";
  return taosAnalJsonBufWriteStrUseCol(pBuf, opener, 0, colIndex);
}
496

497
// Closes the JSON array of column `colIndex`; every column except the last
// (index numOfCols - 1) gets a trailing comma.
static int32_t taosAnalJsonBufWriteColEnd(SAnalyticBuf *pBuf, int32_t colIndex) {
  const char *closer = (colIndex == pBuf->numOfCols - 1) ? "\n]\n" : "\n],\n";
  return taosAnalJsonBufWriteStrUseCol(pBuf, closer, 0, colIndex);
}
505

506
// Appends one value of column `colIndex` to the column data, formatted as a
// JSON number according to `colType`. Every value after the first is preceded
// by ",\n". Types not listed in the switch contribute no text of their own.
// The column's row counter is incremented before the write is attempted.
// Returns the result of taosAnalJsonBufWriteStrUseCol.
static int32_t taosAnalJsonBufWriteColData(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue) {
  char    text[64];
  int32_t used = 0;

  // Separator between consecutive values of the same column.
  if (pBuf->pCols[colIndex].numOfRows != 0) {
    text[0] = ',';
    text[1] = '\n';
    text[2] = 0;
    used = 2;
  }

  char        *out = text + used;
  const size_t room = sizeof(text) - used;
  int32_t      written = 0;

  switch (colType) {
    case TSDB_DATA_TYPE_BOOL:
      written = tsnprintf(out, room, "%d", (*((int8_t *)colValue) == 1) ? 1 : 0);
      break;
    case TSDB_DATA_TYPE_TINYINT:
      written = tsnprintf(out, room, "%d", *(int8_t *)colValue);
      break;
    case TSDB_DATA_TYPE_UTINYINT:
      written = tsnprintf(out, room, "%u", *(uint8_t *)colValue);
      break;
    case TSDB_DATA_TYPE_SMALLINT:
      written = tsnprintf(out, room, "%d", *(int16_t *)colValue);
      break;
    case TSDB_DATA_TYPE_USMALLINT:
      written = tsnprintf(out, room, "%u", *(uint16_t *)colValue);
      break;
    case TSDB_DATA_TYPE_INT:
      written = tsnprintf(out, room, "%d", *(int32_t *)colValue);
      break;
    case TSDB_DATA_TYPE_UINT:
      written = tsnprintf(out, room, "%u", *(uint32_t *)colValue);
      break;
    case TSDB_DATA_TYPE_BIGINT:
    case TSDB_DATA_TYPE_TIMESTAMP:
      written = tsnprintf(out, room, "%" PRId64 "", *(int64_t *)colValue);
      break;
    case TSDB_DATA_TYPE_UBIGINT:
      written = tsnprintf(out, room, "%" PRIu64 "", *(uint64_t *)colValue);
      break;
    case TSDB_DATA_TYPE_FLOAT:
      written = tsnprintf(out, room, "%f", GET_FLOAT_VAL(colValue));
      break;
    case TSDB_DATA_TYPE_DOUBLE:
      written = tsnprintf(out, room, "%f", GET_DOUBLE_VAL(colValue));
      break;
    default:
      // No value text: just terminate whatever is in the buffer so far.
      *out = '\0';
  }
  used += written;

  pBuf->pCols[colIndex].numOfRows++;
  return taosAnalJsonBufWriteStrUseCol(pBuf, text, used, colIndex);
}
559

560
// Finishes the "data" array. For ANALYTICS_BUF_TYPE_JSON_COL each column side
// file is synced, closed, read back and appended to the main buffer file, in
// column order; then the closing `],` is written.
//
// Returns 0 on success, or the first error code encountered. The temporary
// copy of a column file is released on every path, including a failed append.
static int32_t taosAnalJsonBufWriteDataEnd(SAnalyticBuf *pBuf) {
  int32_t code = 0;
  char   *pCont = NULL;
  int64_t contLen = 0;

  if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) {
    for (int32_t i = 0; i < pBuf->numOfCols; ++i) {
      SAnalyticsColBuf *pCol = &pBuf->pCols[i];

      code = taosFsyncFile(pCol->filePtr);
      if (code != 0) return code;

      code = taosCloseFile(&pCol->filePtr);
      if (code != 0) return code;

      code = taosAnalJsonBufGetCont(pBuf->pCols[i].fileName, &pCont, &contLen);
      if (code != 0) return code;

      code = taosAnalJsonBufWriteStr(pBuf, pCont, contLen);

      // Release the column copy before looking at the write result so that a
      // failed write does not leak it.
      taosMemoryFreeClear(pCont);
      contLen = 0;

      if (code != 0) return code;
    }
  }

  return taosAnalJsonBufWriteStr(pBuf, "],\n", 0);
}
588

589
// Writes the document trailer: the "rows" member (row count of column 0)
// followed by the "protocol" member and the closing brace.
static int32_t taosAnalJsonBufWriteEnd(SAnalyticBuf *pBuf) {
  const int32_t rowsCode = taosAnalJsonBufWriteOptInt(pBuf, "rows", pBuf->pCols[0].numOfRows);
  if (rowsCode != 0) {
    return rowsCode;
  }
  return taosAnalJsonBufWriteStr(pBuf, "\"protocol\": 1.0\n}", 0);
}
595

596
// Completes the JSON document and flushes it to disk: writes the trailer, then
// syncs and closes the main file and, for ANALYTICS_BUF_TYPE_JSON_COL, every
// column side file that is still open. Returns 0 on success or the first error
// code encountered.
int32_t taosAnalJsonBufClose(SAnalyticBuf *pBuf) {
  int32_t code = taosAnalJsonBufWriteEnd(pBuf);
  if (code != 0) return code;

  if (pBuf->filePtr != NULL) {
    if ((code = taosFsyncFile(pBuf->filePtr)) != 0) return code;
    if ((code = taosCloseFile(&pBuf->filePtr)) != 0) return code;
  }

  if (pBuf->bufType != ANALYTICS_BUF_TYPE_JSON_COL) {
    return 0;
  }

  for (int32_t col = 0; col < pBuf->numOfCols; ++col) {
    if (pBuf->pCols[col].filePtr == NULL) continue;  // already closed

    if ((code = taosFsyncFile(pBuf->pCols[col].filePtr)) != 0) return code;
    if ((code = taosCloseFile(&pBuf->pCols[col].filePtr)) != 0) return code;
  }

  return 0;
}
621

622
// Releases everything held by `pBuf`: closes the main file if still open
// (the file itself is not removed), closes and deletes every column side file
// for ANALYTICS_BUF_TYPE_JSON_COL, and frees the per-column state.
void taosAnalBufDestroy(SAnalyticBuf *pBuf) {
  if (pBuf->fileName[0] != 0) {
    if (pBuf->filePtr != NULL) (void)taosCloseFile(&pBuf->filePtr);
    pBuf->fileName[0] = 0;
  }

  if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) {
    int32_t col = 0;
    while (col < pBuf->numOfCols) {
      SAnalyticsColBuf *pColBuf = &pBuf->pCols[col++];
      if (pColBuf->fileName[0] == 0) continue;  // side file was never created

      if (pColBuf->filePtr != NULL) (void)taosCloseFile(&pColBuf->filePtr);
      if (taosRemoveFile(pColBuf->fileName) != 0) {
        uError("failed to remove file %s", pColBuf->fileName);
      }
      pColBuf->fileName[0] = 0;
    }
  }

  taosMemoryFreeClear(pBuf->pCols);
  pBuf->numOfCols = 0;
}
×
645

646
// Opens `pBuf` for writing with `numOfCols` columns. Only the two JSON buffer
// types are supported; any other type yields TSDB_CODE_ANA_BUF_INVALID_TYPE.
int32_t tsosAnalBufOpen(SAnalyticBuf *pBuf, int32_t numOfCols) {
  if (pBuf->bufType != ANALYTICS_BUF_TYPE_JSON && pBuf->bufType != ANALYTICS_BUF_TYPE_JSON_COL) {
    return TSDB_CODE_ANA_BUF_INVALID_TYPE;
  }
  return tsosAnalJsonBufOpen(pBuf, numOfCols);
}
653

654
// Writes a string-valued option to `pBuf`. Only the two JSON buffer types are
// supported; any other type yields TSDB_CODE_ANA_BUF_INVALID_TYPE.
int32_t taosAnalBufWriteOptStr(SAnalyticBuf *pBuf, const char *optName, const char *optVal) {
  if (pBuf->bufType != ANALYTICS_BUF_TYPE_JSON && pBuf->bufType != ANALYTICS_BUF_TYPE_JSON_COL) {
    return TSDB_CODE_ANA_BUF_INVALID_TYPE;
  }
  return taosAnalJsonBufWriteOptStr(pBuf, optName, optVal);
}
661

662
// Writes an integer-valued option to `pBuf`. Only the two JSON buffer types
// are supported; any other type yields TSDB_CODE_ANA_BUF_INVALID_TYPE.
int32_t taosAnalBufWriteOptInt(SAnalyticBuf *pBuf, const char *optName, int64_t optVal) {
  if (pBuf->bufType != ANALYTICS_BUF_TYPE_JSON && pBuf->bufType != ANALYTICS_BUF_TYPE_JSON_COL) {
    return TSDB_CODE_ANA_BUF_INVALID_TYPE;
  }
  return taosAnalJsonBufWriteOptInt(pBuf, optName, optVal);
}
669

670
// Writes a float-valued option to `pBuf`. Only the two JSON buffer types are
// supported; any other type yields TSDB_CODE_ANA_BUF_INVALID_TYPE.
int32_t taosAnalBufWriteOptFloat(SAnalyticBuf *pBuf, const char *optName, float optVal) {
  if (pBuf->bufType != ANALYTICS_BUF_TYPE_JSON && pBuf->bufType != ANALYTICS_BUF_TYPE_JSON_COL) {
    return TSDB_CODE_ANA_BUF_INVALID_TYPE;
  }
  return taosAnalJsonBufWriteOptFloat(pBuf, optName, optVal);
}
677

678
// Writes the schema entry of column `colIndex` to `pBuf`. Only the two JSON
// buffer types are supported; any other type yields
// TSDB_CODE_ANA_BUF_INVALID_TYPE.
int32_t taosAnalBufWriteColMeta(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName) {
  if (pBuf->bufType != ANALYTICS_BUF_TYPE_JSON && pBuf->bufType != ANALYTICS_BUF_TYPE_JSON_COL) {
    return TSDB_CODE_ANA_BUF_INVALID_TYPE;
  }
  return taosAnalJsonBufWriteColMeta(pBuf, colIndex, colType, colName);
}
685

686
// Starts the data section of `pBuf`. Only the two JSON buffer types are
// supported; any other type yields TSDB_CODE_ANA_BUF_INVALID_TYPE.
int32_t taosAnalBufWriteDataBegin(SAnalyticBuf *pBuf) {
  if (pBuf->bufType != ANALYTICS_BUF_TYPE_JSON && pBuf->bufType != ANALYTICS_BUF_TYPE_JSON_COL) {
    return TSDB_CODE_ANA_BUF_INVALID_TYPE;
  }
  return taosAnalJsonBufWriteDataBegin(pBuf);
}
693

694
// Starts the value list of column `colIndex`. Only the two JSON buffer types
// are supported; any other type yields TSDB_CODE_ANA_BUF_INVALID_TYPE.
int32_t taosAnalBufWriteColBegin(SAnalyticBuf *pBuf, int32_t colIndex) {
  if (pBuf->bufType != ANALYTICS_BUF_TYPE_JSON && pBuf->bufType != ANALYTICS_BUF_TYPE_JSON_COL) {
    return TSDB_CODE_ANA_BUF_INVALID_TYPE;
  }
  return taosAnalJsonBufWriteColBegin(pBuf, colIndex);
}
701

702
// Appends one value to column `colIndex`. Only the two JSON buffer types are
// supported; any other type yields TSDB_CODE_ANA_BUF_INVALID_TYPE.
int32_t taosAnalBufWriteColData(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue) {
  if (pBuf->bufType != ANALYTICS_BUF_TYPE_JSON && pBuf->bufType != ANALYTICS_BUF_TYPE_JSON_COL) {
    return TSDB_CODE_ANA_BUF_INVALID_TYPE;
  }
  return taosAnalJsonBufWriteColData(pBuf, colIndex, colType, colValue);
}
709

710
// Ends the value list of column `colIndex`. Only the two JSON buffer types are
// supported; any other type yields TSDB_CODE_ANA_BUF_INVALID_TYPE.
int32_t taosAnalBufWriteColEnd(SAnalyticBuf *pBuf, int32_t colIndex) {
  if (pBuf->bufType != ANALYTICS_BUF_TYPE_JSON && pBuf->bufType != ANALYTICS_BUF_TYPE_JSON_COL) {
    return TSDB_CODE_ANA_BUF_INVALID_TYPE;
  }
  return taosAnalJsonBufWriteColEnd(pBuf, colIndex);
}
717

718
// Ends the data section of `pBuf`. Only the two JSON buffer types are
// supported; any other type yields TSDB_CODE_ANA_BUF_INVALID_TYPE.
int32_t taosAnalBufWriteDataEnd(SAnalyticBuf *pBuf) {
  if (pBuf->bufType != ANALYTICS_BUF_TYPE_JSON && pBuf->bufType != ANALYTICS_BUF_TYPE_JSON_COL) {
    return TSDB_CODE_ANA_BUF_INVALID_TYPE;
  }
  return taosAnalJsonBufWriteDataEnd(pBuf);
}
725

726
// Finalises and closes `pBuf`. Only the two JSON buffer types are supported;
// any other type yields TSDB_CODE_ANA_BUF_INVALID_TYPE.
int32_t taosAnalBufClose(SAnalyticBuf *pBuf) {
  if (pBuf->bufType != ANALYTICS_BUF_TYPE_JSON && pBuf->bufType != ANALYTICS_BUF_TYPE_JSON_COL) {
    return TSDB_CODE_ANA_BUF_INVALID_TYPE;
  }
  return taosAnalJsonBufClose(pBuf);
}
733

734
// Loads the finished content of `pBuf` from its main file. The outputs are
// reset to NULL / 0 first, so they are well-defined on every return path.
// Only the two JSON buffer types are supported; any other type yields
// TSDB_CODE_ANA_BUF_INVALID_TYPE.
static int32_t taosAnalBufGetCont(SAnalyticBuf *pBuf, char **ppCont, int64_t *pContLen) {
  *pContLen = 0;
  *ppCont = NULL;

  if (pBuf->bufType != ANALYTICS_BUF_TYPE_JSON && pBuf->bufType != ANALYTICS_BUF_TYPE_JSON_COL) {
    return TSDB_CODE_ANA_BUF_INVALID_TYPE;
  }
  return taosAnalJsonBufGetCont(pBuf->fileName, ppCont, pContLen);
}
744

745
#else
746

747
int32_t taosAnalyticsInit() { return 0; }
748
void    taosAnalyticsCleanup() {}
749
SJson  *taosAnalSendReqRetJson(const char *url, EAnalHttpType type, SAnalyticBuf *pBuf) { return NULL; }
750

751
int32_t taosAnalGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url, int32_t urlLen) { return 0; }
752
bool    taosAnalGetOptStr(const char *option, const char *optName, char *optValue, int32_t optMaxLen) { return true; }
753
bool    taosAnalGetOptInt(const char *option, const char *optName, int64_t *optValue) { return true; }
754
int64_t taosAnalGetVersion() { return 0; }
755
void    taosAnalUpdate(int64_t newVer, SHashObj *pHash) {}
756

757
int32_t tsosAnalBufOpen(SAnalyticBuf *pBuf, int32_t numOfCols) { return 0; }
758
int32_t taosAnalBufWriteOptStr(SAnalyticBuf *pBuf, const char *optName, const char *optVal) { return 0; }
759
int32_t taosAnalBufWriteOptInt(SAnalyticBuf *pBuf, const char *optName, int64_t optVal) { return 0; }
760
int32_t taosAnalBufWriteOptFloat(SAnalyticBuf *pBuf, const char *optName, float optVal) { return 0; }
761
int32_t taosAnalBufWriteColMeta(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName) { return 0; }
762
int32_t taosAnalBufWriteDataBegin(SAnalyticBuf *pBuf) { return 0; }
763
int32_t taosAnalBufWriteColBegin(SAnalyticBuf *pBuf, int32_t colIndex) { return 0; }
764
int32_t taosAnalBufWriteColData(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue) { return 0; }
765
int32_t taosAnalBufWriteColEnd(SAnalyticBuf *pBuf, int32_t colIndex) { return 0; }
766
int32_t taosAnalBufWriteDataEnd(SAnalyticBuf *pBuf) { return 0; }
767
int32_t taosAnalBufClose(SAnalyticBuf *pBuf) { return 0; }
768
void    taosAnalBufDestroy(SAnalyticBuf *pBuf) {}
769

770
const char   *taosAnalAlgoStr(EAnalAlgoType algoType) { return 0; }
771
EAnalAlgoType taosAnalAlgoInt(const char *algoName) { return 0; }
772
const char   *taosAnalAlgoUrlStr(EAnalAlgoType algoType) { return 0; }
773

774
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc