• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / taos-tools / 12592211147

03 Jan 2025 03:18AM UTC coverage: 75.271% (-0.06%) from 75.33%
12592211147

Pull #834

github

web-flow
Merge 9a6503594 into 2e83b2dae
Pull Request #834: enh: taosBenchmark remove stmt2 retry function

12096 of 16070 relevant lines covered (75.27%)

335669.8 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.98
/src/benchUtil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the MIT license as published by the Free Software
6
 * Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 */
12

13
#include <bench.h>
14
#include "benchLog.h"
15

16
char resEncodingChunk[] = "Encoding: chunked";
17
char succMessage[] = "succ";
18
char resHttp[] = "HTTP/1.1 ";
19
char resHttpOk[] = "HTTP/1.1 200 OK";
20
char influxHttpOk[] = "HTTP/1.1 204";
21
char opentsdbHttpOk[] = "HTTP/1.1 400";
22

23
FORCE_INLINE void* benchCalloc(size_t nmemb, size_t size, bool record) {
3,863,953✔
24
    void* ret = calloc(nmemb, size);
3,878,062✔
25
    if (NULL == ret) {
3,872,907✔
26
        errorPrint("%s", "failed to allocate memory\n");
×
27
        exit(EXIT_FAILURE);
×
28
    }
29
    if (record) {
3,878,062✔
30
        g_memoryUsage += nmemb * size;
155,275✔
31
    }
32
    return ret;
3,878,062✔
33
}
34

35
// Close fp when it is non-NULL; a NULL stream is ignored.
// fp is received by value, so the caller's own FILE pointer cannot be reset
// from here (the previous "fp = NULL" only touched the local copy and had no
// effect). Callers that keep the variable must clear it themselves.
FORCE_INLINE void tmfclose(FILE *fp) {
    if (NULL != fp) {
        fclose(fp);
    }
}
6,249✔
41

42
// Release a heap block previously obtained from the C allocator.
// Passing NULL is allowed and does nothing.
FORCE_INLINE void tmfree(void *buf) {
    // free(NULL) is defined as a no-op, so no explicit NULL test is required.
    free(buf);
}
55,688✔
47

48
FORCE_INLINE bool isRest(int32_t iface) { 
723✔
49
    return REST_IFACE == iface || SML_REST_IFACE == iface;
723✔
50
}
51

52
// Report msg through errorPrint and terminate the process with EXIT_FAILURE.
// This function does not return.
void ERROR_EXIT(const char *msg) {
    errorPrint("%s", msg);
    exit(EXIT_FAILURE);
}
56

57
#ifdef WINDOWS
58
HANDLE g_stdoutHandle;
59
DWORD  g_consoleMode;
60

61
void setupForAnsiEscape(void) {
62
    DWORD mode = 0;
63
    g_stdoutHandle = GetStdHandle(STD_OUTPUT_HANDLE);
64

65
    if (g_stdoutHandle == INVALID_HANDLE_VALUE) {
66
        exit(GetLastError());
67
    }
68

69
    if (!GetConsoleMode(g_stdoutHandle, &mode)) {
70
        exit(GetLastError());
71
    }
72

73
    g_consoleMode = mode;
74

75
    // Enable ANSI escape codes
76
    mode |= ENABLE_VIRTUAL_TERMINAL_PROCESSING;
77

78
    if (!SetConsoleMode(g_stdoutHandle, mode)) {
79
        exit(GetLastError());
80
    }
81
}
82

83
void resetAfterAnsiEscape(void) {
84
    // Reset colors
85
    printf("\x1b[0m");
86

87
    // Reset console mode
88
    if (!SetConsoleMode(g_stdoutHandle, g_consoleMode)) {
89
        exit(GetLastError());
90
    }
91
}
92

93
// Windows: return a random unsigned int obtained from rand_s().
// rand_s() reports failure through a non-zero return value and then leaves
// the output unspecified; in that case fall back to rand() so that an
// uninitialized value is never returned.
unsigned int taosRandom() {
    unsigned int number = 0;
    if (rand_s(&number) != 0) {
        number = (unsigned int)rand();
    }
    return number;
}
99
#else  // Not windows
100
// Non-Windows build: this variant has an empty body and does nothing.
void setupForAnsiEscape(void) {}
×
101

102
// Non-Windows build: write the ANSI "reset attributes" sequence to stdout.
void resetAfterAnsiEscape(void) {
    // Reset colors
    fputs("\x1b[0m", stdout);
}
×
106

107
// Non-Windows build: return the next rand() value converted to unsigned int.
FORCE_INLINE unsigned int taosRandom() {
    const int raw = rand();
    return (unsigned int)raw;
}
249,991,922✔
108
#endif
109

110
// Exchange the pointers stored at names[i] and names[j].
// Both entries are passed to a "%s" debug format before the exchange, so they
// are expected to be valid C strings.
void swapItem(char** names, int32_t i, int32_t j ) {
    debugPrint("swap item i=%d (%s) j=%d (%s)\n", i, names[i], j, names[j]);
    char *held = names[j];
    names[j] = names[i];
    names[i] = held;
}
40✔
116

117
// Query the distinct child-table names of super table dbName.stbName and
// store each one, wrapped in back-quotes, as a newly allocated string in
// childTblNameOfSuperTbl[0..count-1]. At most childTblCountOfSuperTbl names
// are requested (SQL "limit") and stored.
// When four or more names were fetched, the first half of the array is
// shuffled against the second half.
// Returns 0 on success, -1 when the query fails or a row carries an empty or
// oversized name. Names stored before a failure are left in the array.
int getAllChildNameOfSuperTable(TAOS *taos, char *dbName, char *stbName,
        char ** childTblNameOfSuperTbl,
        int64_t childTblCountOfSuperTbl) {
    char cmd[SHORT_1K_SQL_BUFF_LEN] = "\0";
    snprintf(cmd, SHORT_1K_SQL_BUFF_LEN,
             "select distinct tbname from %s.`%s` limit %" PRId64 "",
            dbName, stbName, childTblCountOfSuperTbl);
    TAOS_RES *res = taos_query(taos, cmd);
    int32_t   code = taos_errno(res);
    int64_t   count = 0;
    if (code) {
        // NOTE(review): res is not freed on this path; presumably
        // printErrCmdCodeStr releases it - confirm against its definition.
        printErrCmdCodeStr(cmd, code, res);
        return -1;
    }
    TAOS_ROW row = NULL;
    while ((row = taos_fetch_row(res)) != NULL) {
        // Never write past the number of slots the caller asked for.
        if (count >= childTblCountOfSuperTbl) {
            break;
        }
        int32_t *lengths = taos_fetch_lengths(res);
        if (row[0] == NULL || lengths == NULL || lengths[0] <= 0
                || ((char *)row[0])[0] == '\0') {
            errorPrint("No.%" PRId64 " table return empty name\n",
                    count);
            taos_free_result(res);
            return -1;
        }
        // The destination holds TSDB_TABLE_NAME_LEN + 3 bytes: the name, two
        // back-quotes and the terminator.
        if (lengths[0] > TSDB_TABLE_NAME_LEN) {
            errorPrint("No.%" PRId64 " table name is too long: %d\n",
                    count, lengths[0]);
            taos_free_result(res);
            return -1;
        }
        char *quoted = benchCalloc(1, TSDB_TABLE_NAME_LEN + 3, true);
        quoted[0] = '`';
        // Copy by the fetched length instead of relying on a terminator in
        // the row data.
        memcpy(quoted + 1, row[0], (size_t)lengths[0]);
        quoted[lengths[0] + 1] = '`';
        quoted[lengths[0] + 2] = '\0';
        childTblNameOfSuperTbl[count] = quoted;
        debugPrint("childTblNameOfSuperTbl[%" PRId64 "]: %s\n", count,
                childTblNameOfSuperTbl[count]);
        count++;
    }
    taos_free_result(res);

    // random swap order
    if (count < 4) {
        return 0;
    }

    int32_t swapCnt = count/2;
    for(int32_t i = 0; i < swapCnt; i++ ) {
        int32_t j = swapCnt + RD(swapCnt);
        swapItem(childTblNameOfSuperTbl, i, j);
    }
    return 0;
}
163

164
int convertHostToServAddr(char *host, uint16_t port,
271✔
165
        struct sockaddr_in *serv_addr) {
166
    if (!host) {
271✔
167
        errorPrint("%s", "convertHostToServAddr host is null.");
×
168
        return -1;
×
169
    }
170
    debugPrint("convertHostToServAddr(host: %s, port: %d)\n", host,
271✔
171
            port);
172
#ifdef WINDOWS
173
    WSADATA wsaData;
174
    int ret = WSAStartup(MAKEWORD(2, 2), &wsaData);
175
    if (ret) {
176
        return ret;
177
    }
178
#endif
179
    struct hostent *server = gethostbyname(host);
271✔
180
    if ((server == NULL) || (server->h_addr == NULL)) {
271✔
181
        errorPrint("%s", "no such host");
×
182
        return -1;
×
183
    }
184
    memset(serv_addr, 0, sizeof(struct sockaddr_in));
271✔
185
    serv_addr->sin_family = AF_INET;
271✔
186
    serv_addr->sin_port = htons(port);
271✔
187

188
#ifdef WINDOWS
189
    struct addrinfo  hints = {0};
190
    hints.ai_family = AF_INET;
191
    hints.ai_socktype = SOCK_STREAM;
192

193
    struct addrinfo *pai = NULL;
194

195
    if (!getaddrinfo(server->h_name, NULL, &hints, &pai)) {
196
        serv_addr->sin_addr.s_addr =
197
               ((struct sockaddr_in *) pai->ai_addr)->sin_addr.s_addr;
198
        freeaddrinfo(pai);
199
    }
200
    WSACleanup();
201
#else
202
    serv_addr->sin_addr.s_addr = inet_addr(host);
271✔
203
    memcpy(&(serv_addr->sin_addr.s_addr), server->h_addr, server->h_length);
271✔
204
#endif
205
    return 0;
271✔
206
}
207

208
void prompt(bool nonStopMode) {
618✔
209
    if (!g_arguments->answer_yes) {
618✔
210
        g_arguments->in_prompt = true;
7✔
211
        if (nonStopMode) {
7✔
212
            printf(
×
213
                    "\n\n         Current is the Non-Stop insertion mode. "
214
                    "benchmark will continuously "
215
                    "insert data unless you press "
216
                    "Ctrl-C to end it.\n\n         "
217
                    "press enter key to continue and "
218
                    "Ctrl-C to "
219
                    "stop\n\n");
220
            (void)getchar();
×
221
        } else {
222
            printf(
7✔
223
                    "\n\n         Press enter key to continue or Ctrl-C to "
224
                    "stop\n\n");
225
            (void)getchar();
7✔
226
        }
227
        g_arguments->in_prompt = false;
7✔
228
    }
229
}
618✔
230

231
// Append the NUL-terminated text in resultBuf to the file at filePath
// (opened in "at" mode). When the file cannot be opened an error is logged
// and nothing is written.
static void appendResultBufToFile(char *resultBuf, char * filePath) {
    FILE* out = fopen(filePath, "at");
    if (out != NULL) {
        fprintf(out, "%s", resultBuf);
        tmfclose(out);
        return;
    }
    errorPrint(
            "failed to open result file: %s, result will not save "
            "to file\n", filePath);
}
242

243
void replaceChildTblName(char *inSql, char *outSql, int tblIndex) {
6,074✔
244
    char sourceString[32] = "xxxx";
6,074✔
245
    char *pos = strstr(inSql, sourceString);
6,074✔
246
    if (0 == pos) return;
6,074✔
247

248
    char subTblName[TSDB_TABLE_NAME_LEN];
249
    snprintf(subTblName, TSDB_TABLE_NAME_LEN,
6,074✔
250
            "%s.%s", g_queryInfo.dbName,
251
            g_queryInfo.superQueryInfo.childTblName[tblIndex]);
6,074✔
252

253
    tstrncpy(outSql, inSql, pos - inSql + 1);
6,074✔
254
    snprintf(outSql + strlen(outSql), TSDB_MAX_ALLOWED_SQL_LEN -1,
6,074✔
255
             "%s%s", subTblName, pos + strlen(sourceString));
6,074✔
256
}
257

258
int64_t toolsGetTimestamp(int32_t precision) {
72✔
259
    if (precision == TSDB_TIME_PRECISION_MICRO) {
72✔
260
        return toolsGetTimestampUs();
×
261
    } else if (precision == TSDB_TIME_PRECISION_NANO) {
72✔
262
        return toolsGetTimestampNs();
×
263
    } else {
264
        return toolsGetTimestampMs();
72✔
265
    }
266
}
267

268
// Test whether s matches the POSIX regular expression reg compiled with
// cflags. Returns 1 on a match and 0 when there is no match.
// A pattern that fails to compile, or any regexec() error other than
// REG_NOMATCH, terminates the process.
int regexMatch(const char *s, const char *reg, int cflags) {
    regex_t compiled;

    /* Compile regular expression */
    if (0 != regcomp(&compiled, reg, cflags)) {
        ERROR_EXIT("Failed to regex compile\n");
    }

    /* Execute regular expression */
    const int status = regexec(&compiled, s, 0, NULL, 0);
    if (status != 0 && status != REG_NOMATCH) {
        char errText[100] = {0};
        regerror(status, &compiled, errText, sizeof(errText));
        regfree(&compiled);
        printf("Regex match failed: %s\n", errText);
        exit(EXIT_FAILURE);
    }

    regfree(&compiled);
    return (status == 0) ? 1 : 0;
}
292

293

294

295

296
SBenchConn* initBenchConnImpl() {
1,705✔
297
    SBenchConn* conn = benchCalloc(1, sizeof(SBenchConn), true);
1,705✔
298
#ifdef WEBSOCKET
299
    if (g_arguments->websocket) {
1,705✔
300
        conn->taos_ws = ws_connect(g_arguments->dsn);
32✔
301
        char maskedDsn[256] = "\0";
32✔
302
        memcpy(maskedDsn, g_arguments->dsn, 20);
32✔
303
        memcpy(maskedDsn+20, "...", 3);
32✔
304
        memcpy(maskedDsn+23,
32✔
305
               g_arguments->dsn + strlen(g_arguments->dsn)-10, 10);
32✔
306
        if (conn->taos_ws == NULL) {
32✔
307
            errorPrint("failed to connect %s, reason: %s\n",
4✔
308
                    maskedDsn, ws_errstr(NULL));
309
            tmfree(conn);
310
            return NULL;
4✔
311
        }
312

313
        succPrint("%s conneced\n", maskedDsn);
28✔
314
    } else {
315
#endif
316
        conn->taos = taos_connect(g_arguments->host,
3,346✔
317
                g_arguments->user, g_arguments->password,
1,673✔
318
                NULL, g_arguments->port);
1,673✔
319
        if (conn->taos == NULL) {
1,673✔
320
            errorPrint("failed to connect native %s:%d, "
12✔
321
                       "code: 0x%08x, reason: %s\n",
322
                    g_arguments->host, g_arguments->port,
323
                    taos_errno(NULL), taos_errstr(NULL));
324
            tmfree(conn);
325
            return NULL;
12✔
326
        }
327

328
        conn->ctaos = taos_connect(g_arguments->host,
1,661✔
329
                                   g_arguments->user,
1,661✔
330
                                   g_arguments->password,
1,661✔
331
                                   NULL, g_arguments->port);
1,661✔
332
#ifdef WEBSOCKET
333
    }
334
#endif
335
    return conn;
1,689✔
336
}
337

338
SBenchConn* initBenchConn() {
1,693✔
339

340
    SBenchConn* conn = NULL;
1,693✔
341
    int32_t keep_trying = 0;
1,693✔
342
    while(1) {
343
        conn = initBenchConnImpl();
1,705✔
344
        if(conn || ++keep_trying > g_arguments->keep_trying  || g_arguments->terminate) {
1,705✔
345
            break;
346
        }
347

348
        infoPrint("sleep %dms and try to connect... %d  \n", g_arguments->trying_interval, keep_trying);
12✔
349
        if(g_arguments->trying_interval > 0) {
12✔
350
            toolsMsleep(g_arguments->trying_interval);
×
351
        }        
352
    } 
353

354
    return conn;
1,693✔
355
}
356

357
// Close the connection(s) held by conn and free the SBenchConn itself.
// A NULL conn is ignored. In websocket mode only conn->taos_ws is closed;
// otherwise conn->taos and conn->ctaos are each closed when non-NULL.
void closeBenchConn(SBenchConn* conn) {
    if (!conn) {
        return;
    }
#ifdef WEBSOCKET
    if (g_arguments->websocket) {
        ws_close(conn->taos_ws);
        tmfree(conn);
        return;
    }
#endif
    if (conn->taos) {
        taos_close(conn->taos);
        conn->taos = NULL;
    }
    if (conn->ctaos) {
        taos_close(conn->ctaos);
        conn->ctaos = NULL;
    }
    tmfree(conn);
}
378

379
int32_t queryDbExecRest(char *command, char* dbName, int precision,
21✔
380
                    int iface, int protocol, bool tcp, int sockfd) {
381
    int32_t code = postProceSql(command,
21✔
382
                         dbName,
383
                         precision,
384
                         iface,
385
                         protocol,
386
                         g_arguments->port,
21✔
387
                         tcp,
388
                         sockfd,
389
                         NULL);
390
    return code;
21✔
391
}
392

393
// Execute command on conn and return the resulting error code (0 = success).
// Websocket mode: runs ws_query_timeout() with g_arguments->timeout, logs a
// failure, and always frees the result.
// Native mode: runs taos_query(); the result is freed here on success and is
// handed to printErrCmdCodeStr on failure.
// NOTE(review): on the native failure path the result is not freed in this
// function; presumably printErrCmdCodeStr releases it - confirm.
int32_t queryDbExecCall(SBenchConn *conn, char *command) {
#ifdef WEBSOCKET
    if (g_arguments->websocket) {
        WS_RES* wsRes = ws_query_timeout(conn->taos_ws,
                                         command, g_arguments->timeout);
        int32_t wsCode = ws_errno(wsRes);
        if (wsCode != 0) {
            errorPrint("Failed to execute <%s>, code: 0x%08x, reason: %s\n",
                       command, wsCode, ws_errstr(wsRes));
        }
        ws_free_result(wsRes);
        return wsCode;
    }
#endif
    TAOS_RES *nativeRes = taos_query(conn->taos, command);
    int32_t nativeCode = taos_errno(nativeRes);
    if (nativeCode == 0) {
        taos_free_result(nativeRes);
    } else {
        printErrCmdCodeStr(command, nativeCode, nativeRes);
    }
    return nativeCode;
}
419

420
void encodeAuthBase64() {
186✔
421
    char        userpass_buf[INPUT_BUF_LEN];
422
    static char base64[] = {
423
        'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M',
424
        'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z',
425
        'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm',
426
        'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z',
427
        '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '+', '/'};
428
    snprintf(userpass_buf, INPUT_BUF_LEN, "%s:%s", g_arguments->user,
186✔
429
            g_arguments->password);
186✔
430

431
    int mod_table[] = {0, 2, 1};
186✔
432

433
    size_t userpass_buf_len = strlen(userpass_buf);
186✔
434
    size_t encoded_len = 4 * ((userpass_buf_len + 2) / 3);
186✔
435

436
    memset(g_arguments->base64_buf, 0, INPUT_BUF_LEN);
186✔
437
    for (int n = 0, m = 0; n < userpass_buf_len;) {
1,121✔
438
        uint32_t oct_a =
935✔
439
            n < userpass_buf_len ? (unsigned char)userpass_buf[n++] : 0;
935✔
440
        uint32_t oct_b =
935✔
441
            n < userpass_buf_len ? (unsigned char)userpass_buf[n++] : 0;
935✔
442
        uint32_t oct_c =
935✔
443
            n < userpass_buf_len ? (unsigned char)userpass_buf[n++] : 0;
935✔
444
        uint32_t triple = (oct_a << 0x10) + (oct_b << 0x08) + oct_c;
935✔
445

446
        g_arguments->base64_buf[m++] = base64[(triple >> 3 * 6) & 0x3f];
935✔
447
        g_arguments->base64_buf[m++] = base64[(triple >> 2 * 6) & 0x3f];
935✔
448
        g_arguments->base64_buf[m++] = base64[(triple >> 1 * 6) & 0x3f];
935✔
449
        g_arguments->base64_buf[m++] = base64[(triple >> 0 * 6) & 0x3f];
935✔
450
    }
451

452
    for (int l = 0; l < mod_table[userpass_buf_len % 3]; l++)
557✔
453
        g_arguments->base64_buf[encoded_len - 1 - l] = '=';
371✔
454
}
186✔
455

456
int postProceSqlImpl(char *sqlstr, char* dbName, int precision, int iface,
369✔
457
                     int protocol, uint16_t rest_port, bool tcp, int sockfd,
458
                     char* filePath,
459
                     char *responseBuf, int64_t response_length) {
460
    int32_t      code = -1;
369✔
461
    char *       req_fmt =
369✔
462
        "POST %s HTTP/1.1\r\nHost: %s:%d\r\nAccept: */*\r\nAuthorization: "
463
        "Basic %s\r\nContent-Length: %d\r\nContent-Type: "
464
        "application/x-www-form-urlencoded\r\n\r\n%s";
465
    char url[URL_BUFF_LEN] = {0};
369✔
466
    if (iface == REST_IFACE) {
369✔
467
        snprintf(url, URL_BUFF_LEN, "/rest/sql/%s", dbName);
209✔
468
    } else if (iface == SML_REST_IFACE
160✔
469
            && protocol == TSDB_SML_LINE_PROTOCOL) {
161✔
470
        snprintf(url, URL_BUFF_LEN,
50✔
471
                 "/influxdb/v1/write?db=%s&precision=%s", dbName,
472
                precision == TSDB_TIME_PRECISION_MILLI
473
                ? "ms"
474
                : precision == TSDB_TIME_PRECISION_NANO
475
                ? "ns"
476
                : "u");
×
477
    } else if (iface == SML_REST_IFACE
110✔
478
            && protocol == TSDB_SML_TELNET_PROTOCOL) {
111✔
479
        snprintf(url, URL_BUFF_LEN, "/opentsdb/v1/put/telnet/%s", dbName);
65✔
480
    } else if (iface == SML_REST_IFACE
45✔
481
            && (protocol == TSDB_SML_JSON_PROTOCOL
46✔
482
                || protocol == SML_JSON_TAOS_FORMAT)) {
1✔
483
        snprintf(url, URL_BUFF_LEN, "/opentsdb/v1/put/json/%s", dbName);
46✔
484
    }
485

486
    int      bytes, sent, received, req_str_len, resp_len;
487
    char *   request_buf = NULL;
369✔
488
    int req_buf_len = (int)strlen(sqlstr) + REQ_EXTRA_BUF_LEN;
369✔
489

490
    if (g_arguments->terminate) {
369✔
491
        goto free_of_postImpl;
×
492
    }
493
    request_buf = benchCalloc(1, req_buf_len, false);
369✔
494

495
    int r;
496
    if (protocol == TSDB_SML_TELNET_PROTOCOL && tcp) {
369✔
497
        r = snprintf(request_buf, req_buf_len, "%s", sqlstr);
32✔
498
    } else {
499
        r = snprintf(request_buf, req_buf_len, req_fmt, url, g_arguments->host,
337✔
500
                rest_port, g_arguments->base64_buf, strlen(sqlstr),
337✔
501
                sqlstr);
502
    }
503
    if (r >= req_buf_len) {
369✔
504
        free(request_buf);
×
505
        ERROR_EXIT("too long request");
×
506
    }
507

508
    req_str_len = (int)strlen(request_buf);
370✔
509
    debugPrint("request buffer: %s\n", request_buf);
370✔
510
    sent = 0;
370✔
511
    do {
512
        bytes = send(sockfd, request_buf + sent,
739✔
513
                req_str_len - sent, 0);
370✔
514
        if (bytes < 0) {
369✔
515
            errorPrint("%s", "writing no message to socket\n");
×
516
            goto free_of_postImpl;
×
517
        }
518
        if (bytes == 0) break;
369✔
519
        sent += bytes;
369✔
520
    } while ((sent < req_str_len) && !g_arguments->terminate);
369✔
521

522
    if (protocol == TSDB_SML_TELNET_PROTOCOL
369✔
523
            && iface == SML_REST_IFACE && tcp) {
65✔
524
        code = 0;
32✔
525
        goto free_of_postImpl;
32✔
526
    }
527

528
    resp_len = response_length - 1;
337✔
529
    received = 0;
337✔
530

531
    bool chunked = false;
337✔
532

533
    if (g_arguments->terminate) {
337✔
534
        goto free_of_postImpl;
×
535
    }
536
    do {
537
        bytes = recv(sockfd, responseBuf + received,
949✔
538
                resp_len - received, 0);
473✔
539
        responseBuf[resp_len] = 0;
476✔
540
        debugPrint("response buffer: %s\n", responseBuf);
476✔
541
        if (NULL != strstr(responseBuf, resEncodingChunk)) {
476✔
542
            chunked = true;
242✔
543
        }
544
        int64_t index = strlen(responseBuf) - 1;
476✔
545
        while (responseBuf[index] == '\n' || responseBuf[index] == '\r') {
2,002✔
546
            index--;
1,526✔
547
        }
548
        debugPrint("index: %" PRId64 "\n", index);
476✔
549
        if (chunked && responseBuf[index] == '0') {
474✔
550
            code = 0;
53✔
551
            break;
53✔
552
        }
553
        if (!chunked && responseBuf[index] == '}') {
421✔
554
            code = 0;
×
555
            break;
×
556
        }
557

558
        if (bytes <= 0) {
421✔
559
            errorPrint("%s", "reading no response from socket\n");
×
560
            goto free_of_postImpl;
×
561
        }
562

563
        received += bytes;
421✔
564

565
        if (g_arguments->test_mode == INSERT_TEST) {
421✔
566
            if (strlen(responseBuf)) {
388✔
567
                if (((NULL != strstr(responseBuf, resEncodingChunk)) &&
388✔
568
                            (NULL != strstr(responseBuf, resHttp))) ||
154✔
569
                        ((NULL != strstr(responseBuf, resHttpOk)) ||
234✔
570
                         (NULL != strstr(responseBuf, influxHttpOk)) ||
234✔
571
                         (NULL != strstr(responseBuf, opentsdbHttpOk)))) {
105✔
572
                    break;
573
                }
574
            }
575
        }
576
    } while ((received < resp_len) && !g_arguments->terminate);
138✔
577

578
    if (received == resp_len) {
338✔
579
        errorPrint("%s", "storing complete response from socket\n");
×
580
        goto free_of_postImpl;
×
581
    }
582

583
    if (NULL == strstr(responseBuf, resHttpOk) &&
338✔
584
            NULL == strstr(responseBuf, influxHttpOk) &&
129✔
585
            NULL == strstr(responseBuf, succMessage) &&
×
586
            NULL == strstr(responseBuf, opentsdbHttpOk)) {
×
587
        errorPrint("Response:\n%s\n", responseBuf);
×
588
        goto free_of_postImpl;
×
589
    }
590

591
    code = 0;
338✔
592
free_of_postImpl:
370✔
593
    if (filePath && strlen(filePath) > 0 && !g_arguments->terminate) {
370✔
594
        appendResultBufToFile(responseBuf, filePath);
36✔
595
    }
596
    tmfree(request_buf);
597
    return code;
370✔
598
}
599

600
static int getServerVersionRestImpl(int16_t rest_port, int sockfd) {
18✔
601
    int server_ver = -1;
18✔
602
    char       command[SHORT_1K_SQL_BUFF_LEN] = "\0";
18✔
603
    snprintf(command, SHORT_1K_SQL_BUFF_LEN, "SELECT SERVER_VERSION()");
18✔
604
    char *responseBuf = benchCalloc(1, RESP_BUF_LEN, false);
18✔
605
    int code = postProceSqlImpl(command,
18✔
606
                                NULL,
607
                                0,
608
                                REST_IFACE,
609
                                0,
610
                                rest_port,
611
                                false,
612
                                sockfd,
613
                                NULL, responseBuf, RESP_BUF_LEN);
614
    if (code != 0) {
18✔
615
        errorPrint("Failed to execute command: %s\n", command);
×
616
        goto free_of_getversion;
×
617
    }
618
    debugPrint("response buffer: %s\n", responseBuf);
18✔
619
    if (NULL != strstr(responseBuf, resHttpOk)) {
18✔
620
        char* start = strstr(responseBuf, "{");
18✔
621
        if (start == NULL) {
18✔
622
            errorPrint("Invalid response format: %s\n", responseBuf);
×
623
            goto free_of_getversion;
×
624
        }
625
        tools_cJSON* resObj = tools_cJSON_Parse(start);
18✔
626
        if (resObj == NULL) {
18✔
627
            errorPrint("Cannot parse response into json: %s\n", start);
×
628
        }
629
        tools_cJSON* dataObj = tools_cJSON_GetObjectItem(resObj, "data");
18✔
630
        if (!tools_cJSON_IsArray(dataObj)) {
18✔
631
            char* pstr = tools_cJSON_Print(resObj);
×
632
            errorPrint("Invalid or miss 'data' key in json: %s\n", pstr ? pstr : "null");
×
633
            tmfree(pstr);
634
            tools_cJSON_Delete(resObj);
×
635
            goto free_of_getversion;
×
636
        }
637
        tools_cJSON *versionObj = tools_cJSON_GetArrayItem(dataObj, 0);
18✔
638
        tools_cJSON *versionStrObj = tools_cJSON_GetArrayItem(versionObj, 0);
18✔
639
        server_ver = atoi(versionStrObj->valuestring);
18✔
640
        char* pstr = tools_cJSON_Print(versionStrObj);        
18✔
641
        debugPrint("versionStrObj: %s, version: %s, server_ver: %d\n",
18✔
642
                   pstr ? pstr : "null",
643
                   versionStrObj->valuestring, server_ver);
644
        tmfree(pstr);
645
        tools_cJSON_Delete(resObj);
18✔
646
    }
647
free_of_getversion:
×
648
    free(responseBuf);
18✔
649
    return server_ver;
18✔
650
}
651

652
// Create a socket with createSockFd(), query the server version through it,
// and destroy the socket again. Returns the value produced by
// getServerVersionRestImpl(), or -1 when no socket could be created.
int getServerVersionRest(int16_t rest_port) {
    const int fd = createSockFd();
    if (fd < 0) {
        return -1;
    }

    const int version = getServerVersionRestImpl(rest_port, fd);
    destroySockFd(fd);
    return version;
}
663

664
static int getCodeFromResp(char *responseBuf) {
191✔
665
    int code = -1;
191✔
666
    char* start = strstr(responseBuf, "{");
191✔
667
    if (start == NULL) {
191✔
668
        errorPrint("Invalid response format: %s\n", responseBuf);
×
669
        return -1;
×
670
    }
671
    tools_cJSON* resObj = tools_cJSON_Parse(start);
191✔
672
    if (resObj == NULL) {
190✔
673
        errorPrint("Cannot parse response into json: %s\n", start);
×
674
        return -1;
×
675
    }
676
    tools_cJSON* codeObj = tools_cJSON_GetObjectItem(resObj, "code");
190✔
677
    if (!tools_cJSON_IsNumber(codeObj)) {
189✔
678
        char* pstr = tools_cJSON_Print(resObj);
×
679
        errorPrint("Invalid or miss 'code' key in json: %s\n", pstr ? pstr : "null");
×
680
        tmfree(pstr);
681
        tools_cJSON_Delete(resObj);
×
682
        return -1;
×
683
    }
684

685
    code = codeObj->valueint;
189✔
686

687
    if (codeObj->valueint != 0) {
189✔
688
        tools_cJSON* desc = tools_cJSON_GetObjectItem(resObj, "desc");
×
689
        if (!tools_cJSON_IsString(desc)) {
×
690
            char* pstr = tools_cJSON_Print(resObj);
×
691
            errorPrint("Invalid or miss 'desc' key in json: %s\n", pstr ? pstr : "null");
×
692
            tmfree(pstr);
693
            return -1;
×
694
        }
695
        errorPrint("response, code: %d, reason: %s\n",
×
696
                   (int)codeObj->valueint, desc->valuestring);
697
    }
698

699
    tools_cJSON_Delete(resObj);
189✔
700
    return code;
191✔
701
}
702

703
int postProceSql(char *sqlstr, char* dbName, int precision, int iface,
351✔
704
                 int protocol, uint16_t rest_port,
705
                 bool tcp, int sockfd, char* filePath) {
706
    uint64_t response_length;
707
    if (g_arguments->test_mode == INSERT_TEST) {
351✔
708
        response_length = RESP_BUF_LEN;
316✔
709
    } else {
710
        response_length = g_queryInfo.response_buffer;
35✔
711
    }
712

713
    char *responseBuf = benchCalloc(1, response_length, false);
351✔
714
    int code = postProceSqlImpl(sqlstr, dbName, precision, iface, protocol,
351✔
715
                                rest_port,
716
                                tcp, sockfd, filePath, responseBuf,
717
                                response_length);
718
    // compatibility 2.6
719
    if (-1 == g_arguments->rest_server_ver_major) {
352✔
720
        // confirm version is 2.x according to "succ"
721
        if (NULL != strstr(responseBuf, succMessage) && iface == REST_IFACE) {
36✔
722
            g_arguments->rest_server_ver_major = 2;
×
723
        }
724
    }
725

726
    if (NULL != strstr(responseBuf, resHttpOk) && iface == REST_IFACE) {
352✔
727
        // if taosd is not starting , rest_server_ver_major can't be got by 'select server_version()' , so is -1
728
        if (-1 == g_arguments->rest_server_ver_major || 3 <= g_arguments->rest_server_ver_major) {
190✔
729
            code = getCodeFromResp(responseBuf);
190✔
730
        } else {
731
            code = 0;
×
732
        }
733
        goto free_of_post;
191✔
734
    }
735

736
    if (2 == g_arguments->rest_server_ver_major) {
162✔
737
        if (NULL != strstr(responseBuf, succMessage) && iface == REST_IFACE) {
×
738
            code = getCodeFromResp(responseBuf);
×
739
        } else {
740
            code = 0;
×
741
        }
742
        goto free_of_post;
×
743
    }
744

745
    if (NULL != strstr(responseBuf, influxHttpOk) &&
162✔
746
            protocol == TSDB_SML_LINE_PROTOCOL && iface == SML_REST_IFACE) {
50✔
747
        code = 0;
50✔
748
        goto free_of_post;
50✔
749
    }
750

751
    if (NULL != strstr(responseBuf, opentsdbHttpOk)
112✔
752
            && (protocol == TSDB_SML_TELNET_PROTOCOL
×
753
            || protocol == TSDB_SML_JSON_PROTOCOL
×
754
            || protocol == SML_JSON_TAOS_FORMAT)
×
755
            && iface == SML_REST_IFACE) {
×
756
        code = 0;
×
757
        goto free_of_post;
×
758
    }
759

760
    if (g_arguments->test_mode == INSERT_TEST) {
112✔
761
        debugPrint("Response: \n%s\n", responseBuf);
111✔
762
        char* start = strstr(responseBuf, "{");
111✔
763
        if ((start == NULL)
111✔
764
                && (TSDB_SML_TELNET_PROTOCOL != protocol)
111✔
765
                && (TSDB_SML_JSON_PROTOCOL != protocol)
46✔
766
                && (SML_JSON_TAOS_FORMAT != protocol)
1✔
767
                ) {
768
            errorPrint("Invalid response format: %s\n", responseBuf);
×
769
            goto free_of_post;
×
770
        }
771
        tools_cJSON* resObj = tools_cJSON_Parse(start);
111✔
772
        if ((resObj == NULL)
111✔
773
                && (TSDB_SML_TELNET_PROTOCOL != protocol)
111✔
774
                && (TSDB_SML_JSON_PROTOCOL != protocol)
46✔
775
                && (SML_JSON_TAOS_FORMAT != protocol)
1✔
776
                ) {
777
            errorPrint("Cannot parse response into json: %s\n", start);
×
778
        }
779
        tools_cJSON* codeObj = tools_cJSON_GetObjectItem(resObj, "code");
111✔
780
        if ((!tools_cJSON_IsNumber(codeObj))
111✔
781
                && (TSDB_SML_TELNET_PROTOCOL != protocol)
111✔
782
                && (TSDB_SML_JSON_PROTOCOL != protocol)
46✔
783
                && (SML_JSON_TAOS_FORMAT != protocol)
1✔
784
                ) {
785
            char* pstr = tools_cJSON_Print(resObj);
×
786
            errorPrint("Invalid or miss 'code' key in json: %s\n", pstr ? pstr : "null");
×
787
            tmfree(pstr);
788
            tools_cJSON_Delete(resObj);
×
789
            goto free_of_post;
×
790
        }
791

792
        if ((SML_REST_IFACE == iface) && codeObj
111✔
793
                && (200 == codeObj->valueint)) {
×
794
            code = 0;
×
795
            tools_cJSON_Delete(resObj);
×
796
            goto free_of_post;
×
797
        }
798

799
        if ((iface == SML_REST_IFACE)
111✔
800
                && (protocol == TSDB_SML_LINE_PROTOCOL)
111✔
801
                && codeObj
×
802
                && (codeObj->valueint != 0) && (codeObj->valueint != 200)) {
×
803
            tools_cJSON* desc = tools_cJSON_GetObjectItem(resObj, "desc");
×
804
            if (!tools_cJSON_IsString(desc)) {
×
805
                char* pstr = tools_cJSON_Print(resObj);
×
806
                errorPrint("Invalid or miss 'desc' key in json: %s\n", pstr ? pstr : "null");
×
807
                tmfree(pstr);
808
            } else {
809
                errorPrint("insert mode response, code: %d, reason: %s\n",
×
810
                       (int)codeObj->valueint, desc->valuestring);
811
            }
812
        } else {
813
            code = 0;
111✔
814
        }
815
        tools_cJSON_Delete(resObj);
111✔
816
    }
817
free_of_post:
1✔
818
    free(responseBuf);
352✔
819
    return code;
352✔
820
}
821

822
// fetch result to file or nothing
823
// Drain every row of a query result and return how many rows were fetched.
// When pThreadInfo->filePath is a non-empty string, each row is formatted
// with taos_print_row() and staged in a FETCH_BUFFER_SIZE buffer that is
// handed to appendResultBufToFile(); otherwise rows are only counted.
int64_t fetchResult(TAOS_RES *res, threadInfo *pThreadInfo) {
    bool        saveToFile = strlen(pThreadInfo->filePath) > 0;
    int         fieldCnt   = 0;
    TAOS_FIELD *fieldDefs  = 0;
    char       *outBuf     = NULL;
    int64_t     used       = 0;
    int64_t     rowCnt     = 0;

    if (saveToFile) {
        fieldCnt  = taos_field_count(res);
        fieldDefs = taos_fetch_fields(res);
        outBuf    = (char *)benchCalloc(1, FETCH_BUFFER_SIZE, true);
    }

    // without a target file the loop only pulls rows from the result set
    for (TAOS_ROW row = taos_fetch_row(res); row; row = taos_fetch_row(res)) {
        rowCnt++;
        if (!saveToFile) {
            continue;
        }

        // staging buffer is nearly full: flush it and start over
        if (used >= (FETCH_BUFFER_SIZE - HEAD_BUFF_LEN * 2)) {
            appendResultBufToFile(outBuf, pThreadInfo->filePath);
            used = 0;
            memset(outBuf, 0, FETCH_BUFFER_SIZE);
        }

        // format one row followed by a newline
        char line[HEAD_BUFF_LEN] = {0};
        int  lineLen = taos_print_row(line, row, fieldDefs, fieldCnt);
        lineLen += snprintf(line + lineLen, HEAD_BUFF_LEN - lineLen, "\n");
        memcpy(outBuf + used, line, lineLen);
        used += lineLen;
    }

    // flush whatever is still staged
    if (saveToFile) {
        appendResultBufToFile(outBuf, pThreadInfo->filePath);
        free(outBuf);
    }
    return rowCnt;
}
868

869
char *convertDatatypeToString(int type) {
2,385✔
870
    switch (type) {
2,385✔
871
        case TSDB_DATA_TYPE_BINARY:
413✔
872
            return "binary";
413✔
873
        case TSDB_DATA_TYPE_NCHAR:
41✔
874
            return "nchar";
41✔
875
        case TSDB_DATA_TYPE_TIMESTAMP:
40✔
876
            return "timestamp";
40✔
877
        case TSDB_DATA_TYPE_TINYINT:
349✔
878
            return "tinyint";
349✔
879
        case TSDB_DATA_TYPE_UTINYINT:
42✔
880
            return "tinyint unsigned";
42✔
881
        case TSDB_DATA_TYPE_SMALLINT:
275✔
882
            return "smallint";
275✔
883
        case TSDB_DATA_TYPE_USMALLINT:
42✔
884
            return "smallint unsigned";
42✔
885
        case TSDB_DATA_TYPE_INT:
449✔
886
            return "int";
449✔
887
        case TSDB_DATA_TYPE_UINT:
53✔
888
            return "int unsigned";
53✔
889
        case TSDB_DATA_TYPE_BIGINT:
275✔
890
            return "bigint";
275✔
891
        case TSDB_DATA_TYPE_UBIGINT:
42✔
892
            return "bigint unsigned";
42✔
893
        case TSDB_DATA_TYPE_BOOL:
45✔
894
            return "bool";
45✔
895
        case TSDB_DATA_TYPE_FLOAT:
152✔
896
            return "float";
152✔
897
        case TSDB_DATA_TYPE_DOUBLE:
168✔
898
            return "double";
168✔
899
        case TSDB_DATA_TYPE_JSON:
×
900
            return "json";    
×
901
        case TSDB_DATA_TYPE_VARBINARY:
×
902
            return "varbinary";
×
903
        case TSDB_DATA_TYPE_GEOMETRY:
×
904
            return "geometry";
×
905
        default:
×
906
            break;
×
907
    }
908
    return "unknown type";
×
909
}
910

911
int convertTypeToLength(uint8_t type) {
1,411✔
912
    int ret = 0;
1,411✔
913
    switch (type) {
1,411✔
914
        case TSDB_DATA_TYPE_TIMESTAMP:
239✔
915
        case TSDB_DATA_TYPE_UBIGINT:
916
        case TSDB_DATA_TYPE_BIGINT:
917
            ret = sizeof(int64_t);
239✔
918
            break;
239✔
919
        case TSDB_DATA_TYPE_BOOL:
367✔
920
        case TSDB_DATA_TYPE_TINYINT:
921
        case TSDB_DATA_TYPE_UTINYINT:
922
            ret = sizeof(int8_t);
367✔
923
            break;
367✔
924
        case TSDB_DATA_TYPE_SMALLINT:
194✔
925
        case TSDB_DATA_TYPE_USMALLINT:
926
            ret = sizeof(int16_t);
194✔
927
            break;
194✔
928
        case TSDB_DATA_TYPE_INT:
296✔
929
        case TSDB_DATA_TYPE_UINT:
930
            ret = sizeof(int32_t);
296✔
931
            break;
296✔
932
        case TSDB_DATA_TYPE_FLOAT:
144✔
933
            ret = sizeof(float);
144✔
934
            break;
144✔
935
        case TSDB_DATA_TYPE_DOUBLE:
142✔
936
            ret = sizeof(double);
142✔
937
            break;
142✔
938
        case TSDB_DATA_TYPE_JSON:
1✔
939
            ret = JSON_FIXED_LENGTH;
1✔
940
            break;
1✔
941
        default:
28✔
942
            break;
28✔
943
    }
944
    return ret;
1,411✔
945
}
946

947
int64_t convertDatatypeToDefaultMin(uint8_t type) {
1,596✔
948
    int64_t ret = 0;
1,596✔
949
    switch (type) {
1,596✔
950
        case TSDB_DATA_TYPE_BOOL:
110✔
951
        case TSDB_DATA_TYPE_GEOMETRY:
952
            ret = 0;
110✔
953
            break;
110✔
954
        case TSDB_DATA_TYPE_TINYINT:
148✔
955
            ret = -127;
148✔
956
            break;
148✔
957
        case TSDB_DATA_TYPE_SMALLINT:
102✔
958
            ret = -32767;
102✔
959
            break;
102✔
960
        case TSDB_DATA_TYPE_INT:
550✔
961
        case TSDB_DATA_TYPE_BIGINT:
962
        case TSDB_DATA_TYPE_FLOAT:
963
        case TSDB_DATA_TYPE_DOUBLE:
964
            ret = -1 * (RAND_MAX >> 1);
550✔
965
            break;
550✔
966
        default:
686✔
967
            break;
686✔
968
    }
969
    return ret;
1,596✔
970
}
971

972
int64_t convertDatatypeToDefaultMax(uint8_t type) {
1,603✔
973
    int64_t ret = 0;
1,603✔
974
    switch (type) {
1,603✔
975
        case TSDB_DATA_TYPE_BOOL:
110✔
976
            ret = 1;
110✔
977
            break;
110✔
978
        case TSDB_DATA_TYPE_TINYINT:
148✔
979
            ret = 128;
148✔
980
            break;
148✔
981
        case TSDB_DATA_TYPE_UTINYINT:
71✔
982
            ret = 254;
71✔
983
            break;
71✔
984
        case TSDB_DATA_TYPE_SMALLINT:
102✔
985
        case TSDB_DATA_TYPE_GEOMETRY:
986
            ret = 32767;
102✔
987
            break;
102✔
988
        case TSDB_DATA_TYPE_USMALLINT:
70✔
989
            ret = 65534;
70✔
990
            break;
70✔
991
        case TSDB_DATA_TYPE_INT:
557✔
992
        case TSDB_DATA_TYPE_BIGINT:
993
        case TSDB_DATA_TYPE_FLOAT:
994
        case TSDB_DATA_TYPE_DOUBLE:
995
            ret = RAND_MAX >> 1;
557✔
996
            break;
557✔
997
        case TSDB_DATA_TYPE_UINT:
188✔
998
        case TSDB_DATA_TYPE_UBIGINT:
999
        case TSDB_DATA_TYPE_TIMESTAMP:
1000
            ret = RAND_MAX;
188✔
1001
            break;
188✔
1002
        default:
357✔
1003
            break;
357✔
1004
    }
1005
    return ret;
1,603✔
1006
}
1007

1008
// compare str with length
1009
// Case-insensitive string comparison.
// length == 0 compares the whole strings; any other value compares at most
// `length` leading characters. Returns 0 when the compared parts are equal.
int32_t strCompareN(char *str1, char *str2, int length) {
    return (length != 0) ? strncasecmp(str1, str2, length)
                         : strcasecmp(str1, str2);
}
1016

1017
int convertStringToDatatype(char *type, int length) {
1,737✔
1018
    // compare with length
1019
    if (0 == strCompareN(type, "binary", length)) {
1,737✔
1020
        return TSDB_DATA_TYPE_BINARY;
208✔
1021
    } else if (0 == strCompareN(type, "nchar", length)) {
1,529✔
1022
        return TSDB_DATA_TYPE_NCHAR;
110✔
1023
    } else if (0 == strCompareN(type, "timestamp", length)) {
1,419✔
1024
        return TSDB_DATA_TYPE_TIMESTAMP;
45✔
1025
    } else if (0 == strCompareN(type, "bool", length)) {
1,374✔
1026
        return TSDB_DATA_TYPE_BOOL;
116✔
1027
    } else if (0 == strCompareN(type, "tinyint", length)) {
1,258✔
1028
        return TSDB_DATA_TYPE_TINYINT;
170✔
1029
    } else if (0 == strCompareN(type, "utinyint", length)) {
1,088✔
1030
        return TSDB_DATA_TYPE_UTINYINT;
81✔
1031
    } else if (0 == strCompareN(type, "smallint", length)) {
1,007✔
1032
        return TSDB_DATA_TYPE_SMALLINT;
113✔
1033
    } else if (0 == strCompareN(type, "usmallint", length)) {
894✔
1034
        return TSDB_DATA_TYPE_USMALLINT;
81✔
1035
    } else if (0 == strCompareN(type, "int", length)) {
813✔
1036
        return TSDB_DATA_TYPE_INT;
218✔
1037
    } else if (0 == strCompareN(type, "uint", length)) {
595✔
1038
        return TSDB_DATA_TYPE_UINT;
84✔
1039
    } else if (0 == strCompareN(type, "bigint", length)) {
511✔
1040
        return TSDB_DATA_TYPE_BIGINT;
113✔
1041
    } else if (0 == strCompareN(type, "ubigint", length)) {
398✔
1042
        return TSDB_DATA_TYPE_UBIGINT;
81✔
1043
    } else if (0 == strCompareN(type, "float", length)) {
317✔
1044
        return TSDB_DATA_TYPE_FLOAT;
146✔
1045
    } else if (0 == strCompareN(type, "double", length)) {
171✔
1046
        return TSDB_DATA_TYPE_DOUBLE;
142✔
1047
    } else if (0 == strCompareN(type, "json", length)) {
29✔
1048
        return TSDB_DATA_TYPE_JSON;
2✔
1049
    } else if (0 == strCompareN(type, "varchar", length)) {
27✔
1050
        return TSDB_DATA_TYPE_BINARY;
26✔
1051
    } else if (0 == strCompareN(type, "varbinary", length)) {
1✔
1052
        return TSDB_DATA_TYPE_VARBINARY;
×
1053
    } else if (0 == strCompareN(type, "geometry", length)) {
1✔
1054
        return TSDB_DATA_TYPE_GEOMETRY;
×
1055
    } else {
1056
        errorPrint("unknown data type: %s\n", type);
1✔
1057
        exit(EXIT_FAILURE);
1✔
1058
    }
1059
}
1060

1061

1062
// Three-way comparator over int64_t values, with the (a, b) pointer
// signature expected by qsort()-style callers.
// Returns a negative value, zero, or a positive value when *a is less than,
// equal to, or greater than *b.
// The values are compared rather than subtracted: a 64-bit difference does
// not fit in the int return type and could report the wrong sign.
int compare(const void *a, const void *b) {
    const int64_t lhs = *(const int64_t *)a;
    const int64_t rhs = *(const int64_t *)b;
    return (lhs > rhs) - (lhs < rhs);
}
1065

1066
//
1067
// --------------------  BArray operator -------------------
1068
//
1069

1070
BArray* benchArrayInit(size_t size, size_t elemSize) {
2,390✔
1071
    assert(elemSize > 0);
2,390✔
1072

1073
    if (size < BARRAY_MIN_SIZE) {
2,390✔
1074
        size = BARRAY_MIN_SIZE;
2,374✔
1075
    }
1076

1077
    BArray* pArray = (BArray *)benchCalloc(1, sizeof(BArray), true);
2,390✔
1078

1079
    pArray->size = 0;
2,390✔
1080
    pArray->pData = benchCalloc(size, elemSize, true);
2,390✔
1081

1082
    pArray->capacity = size;
2,390✔
1083
    pArray->elemSize = elemSize;
2,390✔
1084
    return pArray;
2,390✔
1085
}
1086

1087
// Grow the array storage so that it can hold at least newCap elements.
// Capacity is doubled until it is large enough.
// Returns 0 on success (including when no growth is needed) and -1 when
// realloc() fails, in which case the array is left untouched.
static int32_t benchArrayEnsureCap(BArray* pArray, size_t newCap) {
    if (newCap <= pArray->capacity) {
        return 0;
    }

    size_t grown = (pArray->capacity << 1u);
    for (; newCap > grown; grown = (grown << 1u)) {
        // keep doubling
    }

    void* resized = realloc(pArray->pData, grown * pArray->elemSize);
    if (NULL == resized) {
        return -1;
    }

    pArray->capacity = grown;
    pArray->pData    = resized;
    return 0;
}
1103

1104
// Append `elems` elements copied from pData to the end of the array.
// Returns the address of the first copied element inside the array, or NULL
// when pData is NULL or the storage could not be grown.
// When `free` is true, pData is released with tmfree() after the copy.
void* benchArrayAddBatch(BArray* pArray, void* pData, int32_t elems, bool free) {
    if (NULL == pData
            || 0 != benchArrayEnsureCap(pArray, pArray->size + elems)) {
        return NULL;
    }

    void* slot = BARRAY_GET_ELEM(pArray, pArray->size);
    memcpy(slot, pData, pArray->elemSize * elems);
    pArray->size += elems;

    if (free) {
        tmfree(pData); // TODO remove this
    }
    return slot;
}
1121

1122
// Append a single element copied from pData, then release pData.
// Thin wrapper over benchArrayAddBatch(); returns its result.
FORCE_INLINE void* benchArrayPush(BArray* pArray, void* pData) {
    const int32_t oneElem     = 1;
    const bool    releaseData = true;
    return benchArrayAddBatch(pArray, pData, oneElem, releaseData);
}
1125

1126
// Release the element storage and the BArray itself.
// Accepts NULL. Always returns NULL so that callers can reset their pointer
// with the return value.
void* benchArrayDestroy(BArray* pArray) {
    if (NULL == pArray) {
        return NULL;
    }
    tmfree(pArray->pData);
    tmfree(pArray);
    return NULL;
}
1133

1134
// Forget all stored elements by resetting the element count.
// Storage and capacity are kept. NULL is ignored.
void benchArrayClear(BArray* pArray) {
    if (pArray != NULL) {
        pArray->size = 0;
    }
}
1138

1139
void* benchArrayGet(const BArray* pArray, size_t index) {
35,000,385✔
1140
    if (index >= pArray->size) {
35,000,385✔
1141
        errorPrint("benchArrayGet index(%zu) greater than BArray size(%zu)\n",
×
1142
                   index, pArray->size);
1143
        exit(EXIT_FAILURE);
×
1144
    }
1145
    return BARRAY_GET_ELEM(pArray, index);
35,000,385✔
1146
}
1147

1148
// Look up a Field by name (case-insensitive, exact length name_len) in an
// array of Field elements.
// Returns true only when the first field with that name also has type
// field_type. A NULL array, a NULL name, no match, or a type mismatch all
// yield false.
bool searchBArray(BArray *pArray, const char *field_name, int32_t name_len, uint8_t field_type) {
    if (NULL == pArray || NULL == field_name) {
        return false;
    }

    int idx = 0;
    while (idx < pArray->size) {
        Field *cur = benchArrayGet(pArray, idx);
        idx++;

        if (strlen(cur->name) != name_len) {
            continue;
        }
        if (strncasecmp(cur->name, field_name, name_len) != 0) {
            continue;
        }
        // the first name match decides the result
        return cur->type == field_type;
    }
    return false;
}
1163

1164
//
1165
// malloc a new and copy data from array
1166
// return value must call benchArrayDestroy to free
1167
//
1168
// Build a new BArray with the same element size and a copy of all the
// elements of pArray. The source array and its storage are left untouched.
BArray * copyBArray(BArray *pArray) {
    BArray * duplicate = benchArrayInit(pArray->size, pArray->elemSize);
    // `false`: the source storage still belongs to pArray
    benchArrayAddBatch(duplicate, pArray->pData, pArray->size, false);
    return duplicate;
}
1173

1174
//
1175
//  ---------------- others ------------------------
1176
//
1177

1178
#ifdef LINUX
1179
// sem_wait() that transparently restarts when interrupted by a signal.
// Returns 0 once the semaphore has been acquired, or the failing sem_wait()
// result for any error other than EINTR.
int32_t bsem_wait(sem_t* sem) {
    for (;;) {
        int rc = sem_wait(sem);
        if (rc == 0 || errno != EINTR) {
            return rc;
        }
    }
}
1186

1187
// Install `sigfp` as the handler of signal `signum` through sigaction(),
// using the three-argument (SA_SIGINFO) handler form together with
// SA_RESTART. The sigaction() result is not checked.
void benchSetSignal(int32_t signum, ToolsSignalHandler sigfp) {
    struct sigaction action;
    memset(&action, 0, sizeof(struct sigaction));

    action.sa_sigaction = (void (*)(int, siginfo_t *, void *)) sigfp;
    action.sa_flags     = SA_SIGINFO | SA_RESTART;

    sigaction(signum, &action, NULL);
}
189✔
1194
#endif
1195

1196
int convertServAddr(int iface, bool tcp, int protocol) {
268✔
1197
    if (tcp
268✔
1198
            && iface == SML_REST_IFACE
3✔
1199
            && protocol == TSDB_SML_TELNET_PROTOCOL) {
3✔
1200
        // telnet_tcp_port        
1201
        if (convertHostToServAddr(g_arguments->host,
3✔
1202
                    g_arguments->telnet_tcp_port,
3✔
1203
                    &(g_arguments->serv_addr))) {
3✔
1204
            errorPrint("%s\n", "convert host to server address");
×
1205
            return -1;
×
1206
        }
1207
        infoPrint("convertServAddr host=%s telnet_tcp_port:%d to serv_addr=%p iface=%d \n", 
3✔
1208
                g_arguments->host, g_arguments->telnet_tcp_port, &g_arguments->serv_addr, iface);
1209
    } else {
1210
        int port = g_arguments->port_inputted ? g_arguments->port:DEFAULT_REST_PORT;
265✔
1211
        if (convertHostToServAddr(g_arguments->host,
265✔
1212
                                    port,
1213
                    &(g_arguments->serv_addr))) {
265✔
1214
            errorPrint("%s\n", "convert host to server address");
×
1215
            return -1;
×
1216
        }
1217
        infoPrint("convertServAddr host=%s port:%d to serv_addr=%p iface=%d \n", 
265✔
1218
                g_arguments->host, port, &g_arguments->serv_addr, iface);
1219
    }
1220
    return 0;
268✔
1221
}
1222

1223
// Log a socket error as "<msg>: <number>".
// On Windows the number is WSAGetLastError(); elsewhere it is the `result`
// value passed in by the caller.
static void errorPrintSocketMsg(char *msg, int result) {
#ifdef WINDOWS
    (void)result;
    const int detail = WSAGetLastError();
#else
    const int detail = result;
#endif
    errorPrint("%s: %d\n", msg, detail);
}
×
1230

1231
int createSockFd() {
114✔
1232
#ifdef WINDOWS
1233
    WSADATA wsaData;
1234
    WSAStartup(MAKEWORD(2, 2), &wsaData);
1235
    SOCKET sockfd;
1236
#else
1237
    int sockfd;
1238
#endif
1239
    sockfd = socket(AF_INET, SOCK_STREAM, 0);
114✔
1240
    if (sockfd < 0) {
114✔
1241
        errorPrintSocketMsg("Could not create socket : ", sockfd);
×
1242
        return -1;
×
1243
    }
1244

1245
    int retConn = connect(
114✔
1246
            sockfd, (struct sockaddr *)&(g_arguments->serv_addr),
114✔
1247
            sizeof(struct sockaddr));
1248
    infoPrint("createSockFd call connect serv_addr=%p retConn=%d\n", &g_arguments->serv_addr, retConn);
114✔
1249
    if (retConn < 0) {
114✔
1250
        errorPrint("%s\n", "failed to connect");
×
1251
#ifdef WINDOWS
1252
        closesocket(sockfd);
1253
        WSACleanup();
1254
#else
1255
        close(sockfd);
×
1256
#endif
1257
        return -1;
×
1258
    }
1259
    return sockfd;
114✔
1260
}
1261

1262
// Close a socket descriptor. On Windows this also calls WSACleanup().
static void closeSockFd(int sockfd) {
#ifndef WINDOWS
    close(sockfd);
#else
    closesocket(sockfd);
    WSACleanup();
#endif
}
47✔
1270

1271
void destroySockFd(int sockfd) {
47✔
1272
    // check valid
1273
    if (sockfd < 0) {
47✔
1274
        return;
×
1275
    }
1276

1277
    // shutdown the connection since no more data will be sent
1278
    int result;
1279
    result = shutdown(sockfd, SHUT_WR);
47✔
1280
    if (SOCKET_ERROR == result) {
47✔
1281
        errorPrintSocketMsg("Socket shutdown failed with error: ", result);
×
1282
        closeSockFd(sockfd);
×
1283
        return;
×
1284
    }
1285
    // Receive until the peer closes the connection
1286
    do {
1287
        int recvbuflen = LARGE_BUFF_LEN;
82✔
1288
        char recvbuf[LARGE_BUFF_LEN];
1289
        result = recv(sockfd, recvbuf, recvbuflen, 0);
82✔
1290
        if ( result > 0 ) {
82✔
1291
            debugPrint("Socket bytes received: %d\n", result);
35✔
1292
        } else if (result == 0) {
47✔
1293
            infoPrint("Connection closed with result %d\n", result);
47✔
1294
        } else {
1295
            errorPrintSocketMsg("Socket recv failed with error: ", result);
×
1296
        }
1297
    } while (result > 0);
82✔
1298

1299
    closeSockFd(sockfd);
47✔
1300
}
1301

1302
// Log a failed command together with its error code and the reason
// reported by taos_errstr(), then release `res` with taos_free_result().
// Commands longer than 500 characters are shortened to their first 500
// characters followed by "..."; shorter commands are logged unchanged.
FORCE_INLINE void printErrCmdCodeStr(char *cmd, int32_t code, TAOS_RES *res) {
    char  buff[512];
    char *msg = cmd;
    // The limit is the number of characters kept, not sizeof(msg) (a pointer
    // size): copying 500 bytes out of a shorter command reads past its end.
    if (strlen(cmd) > 500) {
        memcpy(buff, cmd, 500);
        memcpy(buff + 500, "...", sizeof("..."));  // includes the terminator
        msg = buff;
    }
    errorPrint("failed to run error code: 0x%08x, reason: %s command %s\n",
               code, taos_errstr(res), msg);
    taos_free_result(res);
}
56✔
1315

1316
// Store the amount of physical memory, in KB, into *totalKB.
// Returns 0 on success and -1 when the platform query fails.
// On _TD_DARWIN_64 builds the value is always reported as 0.
int32_t benchGetTotalMemory(int64_t *totalKB) {
#ifdef WINDOWS
  MEMORYSTATUSEX memsStat;
  memsStat.dwLength = sizeof(memsStat);
  if (!GlobalMemoryStatusEx(&memsStat)) {
    return -1;
  }

  *totalKB = memsStat.ullTotalPhys / 1024;
  return 0;
#elif defined(_TD_DARWIN_64)
  *totalKB = 0;
  return 0;
#else
  long pageSize  = sysconf(_SC_PAGESIZE);
  long physPages = sysconf(_SC_PHYS_PAGES);
  // sysconf() reports failure with -1; do not turn that into a bogus size
  if (pageSize < 0 || physPages < 0) {
    return -1;
  }

  int64_t tsPageSizeKB = pageSize / 1024;
  *totalKB = (int64_t)physPages * tsPageSizeKB;
  return 0;
#endif
}
1335

1336
// generate question mark string, used by insert into ... values(?,?,?...)
1337
// return value must call tmfree to free memory
1338
// Build the placeholder list "?,?,...,?" containing QCnt question marks.
// QCnt <= 0 yields an empty string.
// The buffer comes from benchCalloc() and must be released with tmfree().
char* genQMark( int32_t QCnt) {
    const size_t marks = (QCnt > 0) ? (size_t)QCnt : 0;

    // Keep the original 4 bytes per mark and add one byte, so that even an
    // empty list has room for its terminator (a zero-sized request has none).
    char *buf = benchCalloc(1, marks * 4 + 1, false);

    // write through a cursor instead of rescanning the buffer with strcat()
    char *cursor = buf;
    for (size_t i = 0; i < marks; i++) {
        if (i > 0) {
            *cursor++ = ',';
        }
        *cursor++ = '?';
    }
    *cursor = '\0';
    return buf;
}
1348

1349
//
1350
//  STMT2  
1351
//
1352

1353
// create
1354
TAOS_STMT2_BINDV* createBindV(int32_t capacity, int32_t tagCnt, int32_t colCnt) {
386✔
1355
    // calc total size
1356
    int32_t tableSize = sizeof(char *) + sizeof(TAOS_STMT2_BIND *) + sizeof(TAOS_STMT2_BIND *) + 
386✔
1357
                        sizeof(TAOS_STMT2_BIND) * tagCnt + sizeof(TAOS_STMT2_BIND) * colCnt;
386✔
1358
    int32_t size = sizeof(TAOS_STMT2_BINDV) + tableSize * capacity;
386✔
1359
    TAOS_STMT2_BINDV *bindv = benchCalloc(1, size, false);
386✔
1360
    resetBindV(bindv, capacity, tagCnt, colCnt);
386✔
1361

1362
    return bindv;
386✔
1363
}
1364

1365
// reset tags and cols pointers
1366
// Point the arrays of `bindv` into the memory that directly follows the
// TAOS_STMT2_BINDV header. Regions are assigned in this order: table name
// pointers, tag pointers (only when tagCnt is non-zero, otherwise tags is
// NULL), column pointers, tag bind bodies, column bind bodies.
// Only pointers are written; bind bodies are not touched.
void resetBindV(TAOS_STMT2_BINDV *bindv, int32_t capacity, int32_t tagCnt, int32_t colCnt) {
    unsigned char *cursor = (unsigned char *)bindv + sizeof(TAOS_STMT2_BINDV);

    // table names come first
    bindv->tbnames = (char **)cursor;

    // tag pointer array, present only when there are tags
    if (tagCnt != 0) {
        cursor += sizeof(char *) * capacity;
        bindv->tags = (TAOS_STMT2_BIND **)cursor;
    } else {
        bindv->tags = NULL;
    }

    // column pointer array
    cursor += sizeof(TAOS_STMT2_BIND *) * capacity;
    bindv->bind_cols = (TAOS_STMT2_BIND **)cursor;
    cursor += sizeof(TAOS_STMT2_BIND *) * capacity;

    // tag bodies: tagCnt binds per table
    if (tagCnt > 0) {
        for (int32_t t = 0; t < capacity; t++) {
            bindv->tags[t] = (TAOS_STMT2_BIND *)cursor;
            cursor += sizeof(TAOS_STMT2_BIND) * tagCnt;
        }
    }

    // column bodies: colCnt binds per table
    for (int32_t t = 0; t < capacity; t++) {
        bindv->bind_cols[t] = (TAOS_STMT2_BIND *)cursor;
        cursor += sizeof(TAOS_STMT2_BIND) * colCnt;
    }
}
386✔
1397

1398
// clear bindv
1399
// Reset the first bindv->count per-table tag and column pointers to NULL
// and set the table count to 0. A NULL bindv is ignored.
void clearBindV(TAOS_STMT2_BINDV *bindv) {
    if (bindv == NULL) {
        return;
    }

    for (int32_t i = 0; i < bindv->count; i++) {
        // tags is NULL when the bindv was laid out without tags
        if (bindv->tags != NULL) {
            bindv->tags[i] = NULL;
        }
        if (bindv->bind_cols != NULL) {
            bindv->bind_cols[i] = NULL;
        }
    }
    bindv->count = 0;
}
1408

1409
// free
1410
// Release a bindv with tmfree().
void freeBindV(TAOS_STMT2_BINDV *bindv) {
    void *block = bindv;
    tmfree(block);
}
454✔
1413

1414
//
1415
//   debug show 
1416
//
1417

1418
// Debug helper: print each of the bind->num values held in bind->buffer.
// Only timestamp, float and int buffers are printed; for any other buffer
// type nothing is printed.
void showBind(TAOS_STMT2_BIND* bind) {
    char *cursor = bind->buffer;

    for (int32_t row = 0; row < bind->num; row++) {
        if (TSDB_DATA_TYPE_TIMESTAMP == bind->buffer_type) {
            debugPrint("   n=%d value=%" PRId64 "\n", row, *(int64_t *)cursor);
            cursor += sizeof(int64_t);
        } else if (TSDB_DATA_TYPE_FLOAT == bind->buffer_type) {
            debugPrint("   n=%d value=%f\n", row, *(float *)cursor);
            cursor += sizeof(float);
        } else if (TSDB_DATA_TYPE_INT == bind->buffer_type) {
            debugPrint("   n=%d value=%d\n", row, *(int32_t *)cursor);
            cursor += sizeof(int32_t);
        }
    }
}
×
1442

1443
// Debug helper: print type and buffer address of each of the `cnt` binds of
// one table, followed by the bind values. When `binds` is NULL a
// "<label> is NULL" line is printed for every index instead.
void showTableBinds(char* label, TAOS_STMT2_BIND* binds, int32_t cnt) {
    int32_t idx = 0;
    while (idx < cnt) {
        if (binds != NULL) {
            debugPrint("  %d %s type=%d buffer=%p \n", idx, label, binds[idx].buffer_type, binds[idx].buffer);
            showBind(&binds[idx]);
        } else {
            debugPrint("  %d %s is NULL \n", idx, label);
        }
        idx++;
    }
}
×
1453

1454
// show bindv
1455
// Debug helper: print a summary of `bindv`, then for each of its tables the
// table name, its tag binds (tags->size per table) and its column binds
// (cols->size + 1 per table).
void showBindV(TAOS_STMT2_BINDV *bindv, BArray *tags, BArray *cols) {
    // table count and the base addresses of the three arrays
    debugPrint("show bindv table count=%d names=%p tags=%p bind_cols=%p\n", 
                bindv->count, bindv->tbnames, bindv->tags, bindv->bind_cols);

    int32_t tb = 0;
    while (tb < bindv->count) {
        debugPrint(" show bindv table index=%d name=%s \n", tb, bindv->tbnames[tb]);
        if (bindv->tags) {
            showTableBinds("tag",    bindv->tags[tb],      tags->size);
        }
        if (bindv->bind_cols) {
            showTableBinds("column", bindv->bind_cols[tb], cols->size + 1);
        }
        tb++;
    }
}
×
1468

1469
// engine util/src/thashutil.c
1470
uint32_t MurmurHash3_32(const char *key, uint32_t len);
1471
// get group index about dbname.tbname
1472
// Map "1.<dbName>.<tbName>" onto one of groupCnt groups by hashing the key
// with MurmurHash3_32() and splitting the 32-bit hash range into groupCnt
// slices of equal width.
// Returns the group index in [0, groupCnt), or -1 when a name is NULL or
// groupCnt is not positive.
int32_t calcGroupIndex(char* dbName, char* tbName, int32_t groupCnt) {
    // check valid; a non-positive groupCnt would divide by zero below
    if (dbName == NULL || tbName == NULL || groupCnt <= 0) {
        return -1;
    }

    char key[1024];
    snprintf(key, sizeof(key), "1.%s.%s", dbName, tbName);
    uint32_t hash = MurmurHash3_32(key, (uint32_t)strlen(key));

    // slice i covers [i * step, (i + 1) * step); the few hash values above
    // the last slice are folded into the last group
    uint32_t step = UINT32_MAX / (uint32_t)groupCnt;
    uint32_t idx  = hash / step;
    if (idx >= (uint32_t)groupCnt) {
        return groupCnt - 1;
    }
    return (int32_t)idx;
}
1489

1490
// windows no export MurmurHash3_32 function from engine
1491
#ifdef WINDOWS
1492
// define
1493
// rotate a 32-bit value left by r bits (0 < r < 32)
#define ROTL32(x, r) ((x) << (r) | (x) >> (32u - (r)))
// final avalanche mix of the 32-bit hash state
#define FMIX32(h)      \
  do {                 \
    (h) ^= (h) >> 16;  \
    (h) *= 0x85ebca6b; \
    (h) ^= (h) >> 13;  \
    (h) *= 0xc2b2ae35; \
    (h) ^= (h) >> 16;  \
  } while (0)

// impl MurmurHash3_32
// Hash `len` bytes starting at `key` into a 32-bit value using the fixed
// seed 0x12345678.
uint32_t MurmurHash3_32(const char *key, uint32_t len) {
  const uint8_t *data = (const uint8_t *)key;
  const uint32_t nblocks = len >> 2u;

  uint32_t h1 = 0x12345678;

  const uint32_t c1 = 0xcc9e2d51;
  const uint32_t c2 = 0x1b873593;

  // body: consume the input four bytes at a time, front to back
  for (uint32_t i = 0; i < nblocks; i++) {
    // memcpy keeps the native-endian value of a direct 32-bit load without
    // requiring `key` to be 4-byte aligned
    uint32_t block;
    memcpy(&block, data + (size_t)i * 4, sizeof(block));

    block *= c1;
    block = ROTL32(block, 15u);
    block *= c2;

    h1 ^= block;
    h1 = ROTL32(h1, 13u);
    h1 = h1 * 5 + 0xe6546b64;
  }

  // tail: the remaining 0..3 bytes
  const uint8_t *tail = (data + (size_t)nblocks * 4);

  uint32_t k1 = 0;

  switch (len & 3u) {
    case 3:
      k1 ^= tail[2] << 16;
      /* fallthrough */
    case 2:
      k1 ^= tail[1] << 8;
      /* fallthrough */
    case 1:
      k1 ^= tail[0];
      k1 *= c1;
      k1 = ROTL32(k1, 15u);
      k1 *= c2;
      h1 ^= k1;
      break;
    default:
      break;
  }

  // finalization
  h1 ^= len;

  FMIX32(h1);

  return h1;
}
1550
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc