• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / taos-tools / 5041216934

pending completion
5041216934

push

github

GitHub
Merge pull request #657 from taosdata/fix/TD-24314

6 of 6 new or added lines in 1 file covered. (100.0%)

10915 of 13598 relevant lines covered (80.27%)

226908.51 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.36
/src/benchInsert.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the MIT license as published by the Free Software
6
 * Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 */
12

13
#include <bench.h>
14
#include <benchData.h>
15
#include <benchInsertMix.h>
16

17
#define FREE_PIDS_INFOS_RETURN_MINUS_1()            \
18
    do {                                            \
19
        tmfree(pids);                               \
20
        tmfree(infos);                              \
21
        return -1;                                  \
22
    } while (0)
23

24
#define FREE_RESOURCE()                             \
25
    do {                                            \
26
        if (pThreadInfo->conn)                      \
27
            closeBenchConn(pThreadInfo->conn);      \
28
        benchArrayDestroy(pThreadInfo->delayList);  \
29
        tmfree(pids);                               \
30
        tmfree(infos);                              \
31
    } while (0)                                     \
32

33
static int getSuperTableFromServerRest(
3✔
34
    SDataBase* database, SSuperTable* stbInfo, char *command) {
35

36
    return -1;
3✔
37
    // TODO(me): finish full implementation
38
#if 0
39
    int sockfd = createSockFd();
40
    if (sockfd < 0) {
41
        return -1;
42
    }
43

44
    int code = postProceSql(command,
45
                         database->dbName,
46
                         database->precision,
47
                         REST_IFACE,
48
                         0,
49
                         g_arguments->port,
50
                         false,
51
                         sockfd,
52
                         NULL);
53

54
    destroySockFd(sockfd);
55
#endif   // 0
56
}
57

58
static int getSuperTableFromServerTaosc(
131✔
59
    SDataBase* database, SSuperTable* stbInfo, char *command) {
60
#ifdef WEBSOCKET
61
    if (g_arguments->websocket) {
131✔
62
        return -1;
×
63
    }
64
#endif
65
    TAOS_RES *   res;
66
    TAOS_ROW     row = NULL;
131✔
67
    SBenchConn* conn = initBenchConn();
131✔
68
    if (NULL == conn) {
131✔
69
        return -1;
×
70
    }
71

72
    res = taos_query(conn->taos, command);
131✔
73
    int32_t code = taos_errno(res);
131✔
74
    if (code != 0) {
131✔
75
        printWarnCmdCodeStr(command, code, res);
129✔
76
        infoPrint("stable %s does not exist, will create one\n",
129✔
77
                  stbInfo->stbName);
78
        closeBenchConn(conn);
129✔
79
        return -1;
129✔
80
    }
81
    infoPrint("find stable<%s>, will get meta data from server\n",
2✔
82
              stbInfo->stbName);
83
    benchArrayClear(stbInfo->tags);
2✔
84
    benchArrayClear(stbInfo->cols);
2✔
85
    int count = 0;
2✔
86
    while ((row = taos_fetch_row(res)) != NULL) {
11✔
87
        if (count == 0) {
9✔
88
            count++;
2✔
89
            continue;
2✔
90
        }
91
        int32_t *lengths = taos_fetch_lengths(res);
7✔
92
        if (lengths == NULL) {
7✔
93
            errorPrint("%s", "failed to execute taos_fetch_length\n");
×
94
            taos_free_result(res);
×
95
            closeBenchConn(conn);
×
96
            return -1;
×
97
        }
98
        if (strncasecmp((char *)row[TSDB_DESCRIBE_METRIC_NOTE_INDEX], "tag",
7✔
99
                        strlen("tag")) == 0) {
100
            Field* tag = benchCalloc(1, sizeof(Field), true);
3✔
101
            benchArrayPush(stbInfo->tags, tag);
3✔
102
            tag = benchArrayGet(stbInfo->tags, stbInfo->tags->size - 1);
3✔
103
            tag->type = convertStringToDatatype(
6✔
104
                    (char *)row[TSDB_DESCRIBE_METRIC_TYPE_INDEX],
3✔
105
                    lengths[TSDB_DESCRIBE_METRIC_TYPE_INDEX]);
3✔
106
            tag->length = *((int *)row[TSDB_DESCRIBE_METRIC_LENGTH_INDEX]);
3✔
107
            tag->min = convertDatatypeToDefaultMin(tag->type);
3✔
108
            tag->max = convertDatatypeToDefaultMax(tag->type);
3✔
109
            tstrncpy(tag->name,
3✔
110
                     (char *)row[TSDB_DESCRIBE_METRIC_FIELD_INDEX],
111
                     lengths[TSDB_DESCRIBE_METRIC_FIELD_INDEX] + 1);
112
        } else {
113
            Field * col = benchCalloc(1, sizeof(Field), true);
4✔
114
            benchArrayPush(stbInfo->cols, col);
4✔
115
            col = benchArrayGet(stbInfo->cols, stbInfo->cols->size - 1);
4✔
116
            col->type = convertStringToDatatype(
8✔
117
                    (char *)row[TSDB_DESCRIBE_METRIC_TYPE_INDEX],
4✔
118
                    lengths[TSDB_DESCRIBE_METRIC_TYPE_INDEX]);
4✔
119
            col->length = *((int *)row[TSDB_DESCRIBE_METRIC_LENGTH_INDEX]);
4✔
120
            col->min = convertDatatypeToDefaultMin(col->type);
4✔
121
            col->max = convertDatatypeToDefaultMax(col->type);
4✔
122
            tstrncpy(col->name,
4✔
123
                     (char *)row[TSDB_DESCRIBE_METRIC_FIELD_INDEX],
124
                     lengths[TSDB_DESCRIBE_METRIC_FIELD_INDEX] + 1);
125
        }
126
    }
127
    taos_free_result(res);
2✔
128
    closeBenchConn(conn);
2✔
129
    return 0;
2✔
130
}
131

132
static int getSuperTableFromServer(SDataBase* database, SSuperTable* stbInfo) {
134✔
133
    int ret = 0;
134✔
134

135
    char command[SHORT_1K_SQL_BUFF_LEN] = "\0";
134✔
136
    snprintf(command, SHORT_1K_SQL_BUFF_LEN,
134✔
137
             "DESCRIBE `%s`.`%s`", database->dbName,
138
             stbInfo->stbName);
139

140
    if (REST_IFACE == stbInfo->iface) {
134✔
141
        ret = getSuperTableFromServerRest(database, stbInfo, command);
3✔
142
    } else {
143
        ret = getSuperTableFromServerTaosc(database, stbInfo, command);
131✔
144
    }
145

146
    return ret;
134✔
147
}
148

149
static int queryDbExec(SDataBase *database,
124✔
150
                       SSuperTable *stbInfo, char *command) {
151
    int ret = 0;
124✔
152
    if (REST_IFACE == stbInfo->iface) {
124✔
153
        if (0 != convertServAddr(stbInfo->iface, false, 1)) {
1✔
154
            errorPrint("%s", "Failed to convert server address\n");
×
155
            return -1;
×
156
        }
157
        int sockfd = createSockFd();
1✔
158
        if (sockfd < 0) {
1✔
159
            ret = -1;
×
160
        } else {
161
            ret = queryDbExecRest(command,
1✔
162
                              database->dbName,
163
                              database->precision,
164
                              stbInfo->iface,
1✔
165
                              stbInfo->lineProtocol,
1✔
166
                              stbInfo->tcpTransfer,
1✔
167
                              sockfd);
168
            destroySockFd(sockfd);
1✔
169
        }
170
    } else {
171
        SBenchConn* conn = initBenchConn();
123✔
172
        if (NULL == conn) {
123✔
173
            ret = -1;
×
174
        } else {
175
            ret = queryDbExecCall(conn, command);
123✔
176
            int32_t trying = g_arguments->keep_trying;
123✔
177
            while (ret && trying) {
123✔
178
                infoPrint("will sleep %"PRIu32" milliseconds then re-create "
×
179
                          "supertable %s\n",
180
                          g_arguments->trying_interval, stbInfo->stbName);
181
                toolsMsleep(g_arguments->trying_interval);
×
182
                ret = queryDbExecCall(conn, command);
×
183
                if (trying != -1) {
×
184
                    trying--;
×
185
                }
186
            }
187
            if (0 != ret) {
123✔
188
                errorPrint("create supertable %s failed!\n\n",
1✔
189
                       stbInfo->stbName);
190
                ret = -1;
1✔
191
            }
192
            closeBenchConn(conn);
123✔
193
        }
194
    }
195

196
    return ret;
124✔
197
}
198

199
#ifdef WEBSOCKET
200
static void dropSuperTable(SDataBase* database, SSuperTable* stbInfo) {
×
201
    if (g_arguments->supplementInsert) {
×
202
        return;
×
203
    }
204

205
    char command[SHORT_1K_SQL_BUFF_LEN] = "\0";
×
206
    snprintf(command, sizeof(command),
×
207
        g_arguments->escape_character
×
208
            ? "DROP TABLE `%s`.`%s`"
209
            : "DROP TABLE %s.%s",
210
             database->dbName,
211
             stbInfo->stbName);
212

213
    infoPrint("drop stable: <%s>\n", command);
×
214
    queryDbExec(database, stbInfo, command);
×
215

216
    return;
×
217
}
218
#endif  // WEBSOCKET
219

220
static int createSuperTable(SDataBase* database, SSuperTable* stbInfo) {
132✔
221
    if (g_arguments->supplementInsert) {
132✔
222
        return 0;
×
223
    }
224

225
    uint32_t col_buffer_len = (TSDB_COL_NAME_LEN + 15) * stbInfo->cols->size;
132✔
226
    char         *colsBuf = benchCalloc(1, col_buffer_len, false);
132✔
227
    char*         command = benchCalloc(1, TSDB_MAX_ALLOWED_SQL_LEN, false);
132✔
228
    int          len = 0;
132✔
229

230
    for (int colIndex = 0; colIndex < stbInfo->cols->size; colIndex++) {
926✔
231
        Field * col = benchArrayGet(stbInfo->cols, colIndex);
794✔
232
        int n;
233
        if (col->type == TSDB_DATA_TYPE_BINARY ||
794✔
234
                col->type == TSDB_DATA_TYPE_NCHAR) {
614✔
235
            n = snprintf(colsBuf + len, col_buffer_len - len,
406✔
236
                    ",%s %s(%d)", col->name,
203✔
237
                    convertDatatypeToString(col->type), col->length);
203✔
238
        } else {
239
            n = snprintf(colsBuf + len, col_buffer_len - len,
1,182✔
240
                    ",%s %s", col->name,
591✔
241
                    convertDatatypeToString(col->type));
591✔
242
        }
243
        if (n < 0 || n >= col_buffer_len - len) {
794✔
244
            errorPrint("%s() LN%d, snprintf overflow on %d\n",
×
245
                       __func__, __LINE__, colIndex);
246
            break;
×
247
        } else {
248
            len += n;
794✔
249
        }
250
    }
251

252
    // save for creating child table
253
    stbInfo->colsOfCreateChildTable =
132✔
254
        (char *)benchCalloc(len + TIMESTAMP_BUFF_LEN, 1, true);
132✔
255

256
    snprintf(stbInfo->colsOfCreateChildTable, len + TIMESTAMP_BUFF_LEN,
132✔
257
             "(ts timestamp%s)", colsBuf);
258

259
    if (stbInfo->tags->size == 0) {
132✔
260
        free(colsBuf);
8✔
261
        free(command);
8✔
262
        return 0;
8✔
263
    }
264

265
    uint32_t tag_buffer_len = (TSDB_COL_NAME_LEN + 15) * stbInfo->tags->size;
124✔
266
    char *tagsBuf = benchCalloc(1, tag_buffer_len, false);
124✔
267
    int  tagIndex;
268
    len = 0;
124✔
269

270
    int n;
271
    n = snprintf(tagsBuf + len, tag_buffer_len - len, "(");
124✔
272
    if (n < 0 || n >= tag_buffer_len - len) {
124✔
273
        errorPrint("%s() LN%d snprintf overflow\n",
×
274
                       __func__, __LINE__);
275
        free(colsBuf);
×
276
        free(command);
×
277
        tmfree(tagsBuf);
×
278
        return -1;
×
279
    } else {
280
        len += n;
124✔
281
    }
282
    for (tagIndex = 0; tagIndex < stbInfo->tags->size; tagIndex++) {
699✔
283
        Field *tag = benchArrayGet(stbInfo->tags, tagIndex);
578✔
284
        if (tag->type == TSDB_DATA_TYPE_BINARY ||
578✔
285
                tag->type == TSDB_DATA_TYPE_NCHAR) {
377✔
286
            n = snprintf(tagsBuf + len, tag_buffer_len - len,
436✔
287
                    "%s %s(%d),", tag->name,
218✔
288
                    convertDatatypeToString(tag->type), tag->length);
218✔
289
        } else if (tag->type == TSDB_DATA_TYPE_JSON) {
360✔
290
            n = snprintf(tagsBuf + len, tag_buffer_len - len,
3✔
291
                    "%s json", tag->name);
3✔
292
            if (n < 0 || n >= tag_buffer_len - len) {
3✔
293
                errorPrint("%s() LN%d snprintf overflow on %d\n",
×
294
                       __func__, __LINE__, tagIndex);
295
                break;
×
296
            } else {
297
                len += n;
3✔
298
            }
299
            goto skip;
3✔
300
        } else {
301
            n = snprintf(tagsBuf + len, tag_buffer_len - len,
714✔
302
                    "%s %s,", tag->name,
357✔
303
                    convertDatatypeToString(tag->type));
357✔
304
        }
305

306
        if (n < 0 || n >= tag_buffer_len - len) {
575✔
307
            errorPrint("%s() LN%d snprintf overflow on %d\n",
×
308
                       __func__, __LINE__, tagIndex);
309
            break;
×
310
        } else {
311
            len += n;
575✔
312
        }
313
    }
314
    len -= 1;
121✔
315
skip:
124✔
316
    snprintf(tagsBuf + len, tag_buffer_len - len, ")");
124✔
317

318
    int length = snprintf(
124✔
319
        command, TSDB_MAX_ALLOWED_SQL_LEN,
320
        g_arguments->escape_character
124✔
321
            ? "CREATE TABLE `%s`.`%s` (ts TIMESTAMP%s) TAGS %s"
322
            : "CREATE TABLE %s.%s (ts TIMESTAMP%s) TAGS %s",
323
        database->dbName, stbInfo->stbName, colsBuf, tagsBuf);
324
    tmfree(colsBuf);
124✔
325
    tmfree(tagsBuf);
124✔
326
    if (stbInfo->comment != NULL) {
124✔
327
        length += snprintf(command + length, TSDB_MAX_ALLOWED_SQL_LEN - length,
×
328
                           " COMMENT '%s'", stbInfo->comment);
329
    }
330
    if (stbInfo->delay >= 0) {
124✔
331
        length += snprintf(command + length,
×
332
                           TSDB_MAX_ALLOWED_SQL_LEN - length, " DELAY %d",
×
333
                           stbInfo->delay);
334
    }
335
    if (stbInfo->file_factor >= 0) {
124✔
336
        length +=
×
337
            snprintf(command + length,
×
338
                     TSDB_MAX_ALLOWED_SQL_LEN - length, " FILE_FACTOR %f",
×
339
                     (float)stbInfo->file_factor / 100);
×
340
    }
341
    if (stbInfo->rollup != NULL) {
124✔
342
        length += snprintf(command + length,
×
343
                           TSDB_MAX_ALLOWED_SQL_LEN - length,
×
344
                           " ROLLUP(%s)", stbInfo->rollup);
345
    }
346

347
    if (stbInfo->max_delay != NULL) {
124✔
348
        length += snprintf(command + length,
×
349
                           TSDB_MAX_ALLOWED_SQL_LEN - length,
×
350
                " MAX_DELAY %s", stbInfo->max_delay);
351
    }
352

353
    if (stbInfo->watermark != NULL) {
124✔
354
        length += snprintf(command + length,
×
355
                           TSDB_MAX_ALLOWED_SQL_LEN - length,
×
356
                " WATERMARK %s", stbInfo->watermark);
357
    }
358

359
    if (stbInfo->ttl != 0) {
124✔
360
        length += snprintf(command + length,
7✔
361
                           TSDB_MAX_ALLOWED_SQL_LEN - length,
7✔
362
                " TTL %d", stbInfo->ttl);
363
    }
364

365
    bool first_sma = true;
124✔
366
    for (int i = 0; i < stbInfo->cols->size; i++) {
894✔
367
        Field * col = benchArrayGet(stbInfo->cols, i);
770✔
368
        if (col->sma) {
770✔
369
            if (first_sma) {
×
370
                n = snprintf(command + length,
×
371
                                   TSDB_MAX_ALLOWED_SQL_LEN - length,
×
372
                        " SMA(%s", col->name);
×
373
                first_sma = false;
×
374
            } else {
375
                n = snprintf(command + length,
×
376
                                   TSDB_MAX_ALLOWED_SQL_LEN - length,
×
377
                        ",%s", col->name);
×
378
            }
379

380
            if (n < 0 || n > TSDB_MAX_ALLOWED_SQL_LEN - length) {
×
381
                errorPrint("%s() LN%d snprintf overflow on %d iteral\n",
×
382
                           __func__, __LINE__, i);
383
                break;
×
384
            } else {
385
                length += n;
×
386
            }
387
        }
388
    }
389
    if (!first_sma) {
124✔
390
        snprintf(command + length, TSDB_MAX_ALLOWED_SQL_LEN - length, ")");
×
391
    }
392
    infoPrint("create stable: <%s>\n", command);
124✔
393

394
    int ret = queryDbExec(database, stbInfo, command);
124✔
395
    free(command);
124✔
396
    return ret;
124✔
397
}
398

399
#ifdef TD_VER_COMPATIBLE_3_0_0_0
400
int32_t getVgroupsOfDb(SBenchConn *conn, SDataBase *database) {
56✔
401
    int     vgroups = 0;
56✔
402
    char    cmd[SHORT_1K_SQL_BUFF_LEN] = "\0";
56✔
403

404
    snprintf(cmd, SHORT_1K_SQL_BUFF_LEN,
56✔
405
            g_arguments->escape_character
56✔
406
            ? "USE `%s`"
407
            : "USE %s",
408
            database->dbName);
409

410
    int32_t   code;
411
    TAOS_RES *res = NULL;
56✔
412

413
    res = taos_query(conn->taos, cmd);
56✔
414
    code = taos_errno(res);
56✔
415
    if (code) {
56✔
416
        printErrCmdCodeStr(cmd, code, res);
×
417
        return -1;
×
418
    }
419
    taos_free_result(res);
56✔
420

421
    snprintf(cmd, SHORT_1K_SQL_BUFF_LEN, "SHOW VGROUPS");
56✔
422
    res = taos_query(conn->taos, cmd);
56✔
423
    code = taos_errno(res);
56✔
424
    if (code) {
56✔
425
        printErrCmdCodeStr(cmd, code, res);
×
426
        return -1;
×
427
    }
428

429
    TAOS_ROW row = NULL;
56✔
430
    while ((row = taos_fetch_row(res)) != NULL) {
168✔
431
        vgroups++;
112✔
432
    }
433
    debugPrint("%s() LN%d, vgroups: %d\n", __func__, __LINE__, vgroups);
56✔
434
    taos_free_result(res);
56✔
435

436
    database->vgroups = vgroups;
56✔
437
    database->vgArray = benchArrayInit(vgroups, sizeof(SVGroup));
56✔
438
    for (int32_t v = 0; (v < vgroups
56✔
439
            && !g_arguments->terminate); v++) {
168✔
440
        SVGroup *vg = benchCalloc(1, sizeof(SVGroup), true);
112✔
441
        benchArrayPush(database->vgArray, vg);
112✔
442
    }
443

444
    res = taos_query(conn->taos, cmd);
56✔
445
    code = taos_errno(res);
56✔
446
    if (code) {
56✔
447
        printErrCmdCodeStr(cmd, code, res);
×
448
        return -1;
×
449
    }
450

451
    int32_t vgItem = 0;
56✔
452
    while (((row = taos_fetch_row(res)) != NULL)
168✔
453
            && !g_arguments->terminate) {
168✔
454
        SVGroup *vg = benchArrayGet(database->vgArray, vgItem);
112✔
455
        vg->vgId = *(int32_t*)row[0];
112✔
456
        vgItem++;
112✔
457
    }
458
    taos_free_result(res);
56✔
459

460
    return vgroups;
56✔
461
}
462
#endif  // TD_VER_COMPATIBLE_3_0_0_0
463

464
int geneDbCreateCmd(SDataBase *database, char *command, int remainVnodes) {
143✔
465
    int dataLen = 0;
143✔
466
    int n;
467
#ifdef TD_VER_COMPATIBLE_3_0_0_0
468
    if (g_arguments->nthreads_auto || (-1 != g_arguments->inputted_vgroups)) {
143✔
469
        n = snprintf(command + dataLen, SHORT_1K_SQL_BUFF_LEN - dataLen,
114✔
470
                    g_arguments->escape_character
58✔
471
                        ? "CREATE DATABASE IF NOT EXISTS `%s` VGROUPS %d"
472
                        : "CREATE DATABASE IF NOT EXISTS %s VGROUPS %d",
473
                            database->dbName,
474
                            (-1 != g_arguments->inputted_vgroups)?
58✔
475
                            g_arguments->inputted_vgroups:
2✔
476
                            min(remainVnodes, toolsGetNumberOfCores()));
56✔
477
    } else {
478
        n = snprintf(command + dataLen, SHORT_1K_SQL_BUFF_LEN - dataLen,
85✔
479
                    g_arguments->escape_character
85✔
480
                        ? "CREATE DATABASE IF NOT EXISTS `%s`"
481
                        : "CREATE DATABASE IF NOT EXISTS %s",
482
                            database->dbName);
483
    }
484
#else
485
    n = snprintf(command + dataLen, SHORT_1K_SQL_BUFF_LEN - dataLen,
486
                    g_arguments->escape_character
487
                        ? "CREATE DATABASE IF NOT EXISTS `%s`"
488
                        : "CREATE DATABASE IF NOT EXISTS %s", database->dbName);
489
#endif  // TD_VER_COMPATIBLE_3_0_0_0
490
    if (n < 0 || n >= SHORT_1K_SQL_BUFF_LEN - dataLen) {
143✔
491
        errorPrint("%s() LN%d snprintf overflow\n",
×
492
                           __func__, __LINE__);
493
        return -1;
×
494
    } else {
495
        dataLen += n;
143✔
496
    }
497

498
    if (database->cfgs) {
143✔
499
        for (int i = 0; i < database->cfgs->size; i++) {
183✔
500
            SDbCfg* cfg = benchArrayGet(database->cfgs, i);
40✔
501
            if (cfg->valuestring) {
40✔
502
                n = snprintf(command + dataLen,
×
503
                                        TSDB_MAX_ALLOWED_SQL_LEN - dataLen,
×
504
                            " %s %s", cfg->name, cfg->valuestring);
505
            } else {
506
                n = snprintf(command + dataLen,
40✔
507
                                        TSDB_MAX_ALLOWED_SQL_LEN - dataLen,
40✔
508
                            " %s %d", cfg->name, cfg->valueint);
509
            }
510
            if (n < 0 || n >= TSDB_MAX_ALLOWED_SQL_LEN - dataLen) {
40✔
511
                errorPrint("%s() LN%d snprintf overflow on %d\n",
×
512
                           __func__, __LINE__, i);
513
                break;
×
514
            } else {
515
                dataLen += n;
40✔
516
            }
517
        }
518
    }
519

520
    switch (database->precision) {
143✔
521
        case TSDB_TIME_PRECISION_MILLI:
141✔
522
            snprintf(command + dataLen, TSDB_MAX_ALLOWED_SQL_LEN - dataLen,
141✔
523
                                " PRECISION \'ms\';");
524
            break;
141✔
525
        case TSDB_TIME_PRECISION_MICRO:
×
526
            snprintf(command + dataLen, TSDB_MAX_ALLOWED_SQL_LEN - dataLen,
×
527
                                " PRECISION \'us\';");
528
            break;
×
529
        case TSDB_TIME_PRECISION_NANO:
2✔
530
            snprintf(command + dataLen, TSDB_MAX_ALLOWED_SQL_LEN - dataLen,
2✔
531
                                " PRECISION \'ns\';");
532
            break;
2✔
533
    }
534

535
    return dataLen;
143✔
536
}
537

538
int createDatabaseRest(SDataBase* database) {
3✔
539
    int32_t code = 0;
3✔
540
    char       command[SHORT_1K_SQL_BUFF_LEN] = "\0";
3✔
541

542
    int sockfd = createSockFd();
3✔
543
    if (sockfd < 0) {
3✔
544
        return -1;
×
545
    }
546

547
    snprintf(command, SHORT_1K_SQL_BUFF_LEN,
3✔
548
            g_arguments->escape_character
3✔
549
                ? "DROP DATABASE IF EXISTS `%s`;"
550
                : "DROP DATABASE IF EXISTS %s;",
551
             database->dbName);
552
    code = postProceSql(command,
3✔
553
                        database->dbName,
554
                        database->precision,
555
                        REST_IFACE,
556
                        0,
557
                        g_arguments->port,
3✔
558
                        false,
559
                        sockfd,
560
                        NULL);
561
    if (code != 0) {
3✔
562
        errorPrint("Failed to drop database %s\n", database->dbName);
×
563
    } else {
564
        int remainVnodes = INT_MAX;
3✔
565
        geneDbCreateCmd(database, command, remainVnodes);
3✔
566
        code = postProceSql(command,
3✔
567
                            database->dbName,
568
                            database->precision,
569
                            REST_IFACE,
570
                            0,
571
                            g_arguments->port,
3✔
572
                            false,
573
                            sockfd,
574
                            NULL);
575
        int32_t trying = g_arguments->keep_trying;
3✔
576
        while (code && trying) {
3✔
577
            infoPrint("will sleep %"PRIu32" milliseconds then "
×
578
                  "re-create database %s\n",
579
                  g_arguments->trying_interval, database->dbName);
580
            toolsMsleep(g_arguments->trying_interval);
×
581
            code = postProceSql(command,
×
582
                            database->dbName,
583
                            database->precision,
584
                            REST_IFACE,
585
                            0,
586
                            g_arguments->port,
×
587
                            false,
588
                            sockfd,
589
                            NULL);
590
            if (trying != -1) {
×
591
                trying--;
×
592
            }
593
        }
594
    }
595
    destroySockFd(sockfd);
3✔
596
    return code;
3✔
597
}
598

599
int32_t getRemainVnodes(SBenchConn *conn) {
56✔
600
    int remainVnodes = 0;
56✔
601
    char command[SHORT_1K_SQL_BUFF_LEN] = "SHOW DNODES";
56✔
602

603
    TAOS_RES *res = taos_query(conn->taos, command);
56✔
604
    int32_t   code = taos_errno(res);
56✔
605
    if (code) {
56✔
606
        printErrCmdCodeStr(command, code, res);
×
607
        closeBenchConn(conn);
×
608
        return -1;
×
609
    }
610
    TAOS_ROW row = NULL;
56✔
611
    while ((row = taos_fetch_row(res)) != NULL) {
112✔
612
        remainVnodes += (*(int16_t*)(row[3]) - *(int16_t*)(row[2]));
56✔
613
    }
614
    debugPrint("%s() LN%d, remainVnodes: %d\n",
56✔
615
               __func__, __LINE__, remainVnodes);
616
    taos_free_result(res);
56✔
617
    return remainVnodes;
56✔
618
}
619

620
int createDatabaseTaosc(SDataBase* database) {
143✔
621
    char command[SHORT_1K_SQL_BUFF_LEN] = "\0";
143✔
622
    SBenchConn* conn = initBenchConn();
143✔
623
    if (NULL == conn) {
143✔
624
        return -1;
3✔
625
    }
626
    if (g_arguments->taosc_version == 3) {
140✔
627
        for (int i = 0; i < g_arguments->streams->size; i++) {
142✔
628
            SSTREAM* stream = benchArrayGet(g_arguments->streams, i);
2✔
629
            if (stream->drop) {
2✔
630
                snprintf(command, SHORT_1K_SQL_BUFF_LEN,
2✔
631
                         "DROP STREAM IF EXISTS %s;",
632
                        stream->stream_name);
2✔
633
                if (queryDbExecCall(conn, command)) {
2✔
634
                    closeBenchConn(conn);
×
635
                    return -1;
×
636
                }
637
                infoPrint("%s\n", command);
2✔
638
                memset(command, 0, SHORT_1K_SQL_BUFF_LEN);
2✔
639
            }
640
        }
641
    }
642

643
    snprintf(command, SHORT_1K_SQL_BUFF_LEN,
140✔
644
            g_arguments->escape_character
140✔
645
                ? "DROP DATABASE IF EXISTS `%s`;":
646
            "DROP DATABASE IF EXISTS %s;",
647
             database->dbName);
648
    if (0 != queryDbExecCall(conn, command)) {
140✔
649
#ifdef WEBSOCKET
650
        if (g_arguments->websocket) {
×
651
            warnPrint("%s", "TDengine cloud normal users have no privilege "
×
652
                      "to drop database! DROP DATABASE failure is ignored!\n");
653
        } else {
654
#endif
655
            closeBenchConn(conn);
×
656
            return -1;
×
657
#ifdef WEBSOCKET
658
        }
659
#endif
660
    }
661

662
    int remainVnodes = INT_MAX;
140✔
663
#ifdef TD_VER_COMPATIBLE_3_0_0_0
664
    if (g_arguments->nthreads_auto) {
140✔
665
        remainVnodes = getRemainVnodes(conn);
56✔
666
        if (0 >= remainVnodes) {
56✔
667
            errorPrint("Remain vnodes %d, failed to create database\n",
×
668
                       remainVnodes);
669
            return -1;
×
670
        }
671
    }
672
#endif
673
    geneDbCreateCmd(database, command, remainVnodes);
140✔
674

675
    int32_t code = queryDbExecCall(conn, command);
140✔
676
    int32_t trying = g_arguments->keep_trying;
140✔
677
    while (code && trying) {
140✔
678
        infoPrint("will sleep %"PRIu32" milliseconds then "
×
679
                  "re-create database %s\n",
680
                  g_arguments->trying_interval, database->dbName);
681
        toolsMsleep(g_arguments->trying_interval);
×
682
        code = queryDbExecCall(conn, command);
×
683
        if (trying != -1) {
×
684
            trying--;
×
685
        }
686
    }
687

688
    if (code) {
140✔
689
#ifdef WEBSOCKET
690
        if (g_arguments->websocket) {
4✔
691
            warnPrint("%s", "TDengine cloud normal users have no privilege "
×
692
                      "to create database! CREATE DATABASE "
693
                      "failure is ignored!\n");
694
        } else {
695
#endif
696

697
            closeBenchConn(conn);
4✔
698
            errorPrint("\ncreate database %s failed!\n\n",
4✔
699
               database->dbName);
700
            return -1;
4✔
701
#ifdef WEBSOCKET
702
        }
703
#endif
704
    }
705
    infoPrint("command to create database: <%s>\n", command);
136✔
706

707
#ifdef TD_VER_COMPATIBLE_3_0_0_0
708
    if (database->superTbls) {
136✔
709
        if (g_arguments->nthreads_auto) {
135✔
710
            int32_t vgroups = getVgroupsOfDb(conn, database);
56✔
711
            if (vgroups <=0) {
56✔
712
                closeBenchConn(conn);
×
713
                errorPrint("Database %s's vgroups is %d\n",
×
714
                           database->dbName, vgroups);
715
                return -1;
×
716
            }
717
        }
718
    }
719
#endif  // TD_VER_COMPATIBLE_3_0_0_0
720

721
    closeBenchConn(conn);
136✔
722
    return 0;
136✔
723
}
724

725
int createDatabase(SDataBase* database) {
146✔
726
    int ret = 0;
146✔
727
    if (REST_IFACE == g_arguments->iface) {
146✔
728
        ret = createDatabaseRest(database);
3✔
729
    } else {
730
        ret = createDatabaseTaosc(database);
143✔
731
    }
732
#if 0
733
#ifdef LINUX
734
    infoPrint("%s() LN%d, ret: %d\n", __func__, __LINE__, ret);
735
    sleep(10);
736
    infoPrint("%s() LN%d, ret: %d\n", __func__, __LINE__, ret);
737
#elif defined(DARWIN)
738
    sleep(2);
739
#else
740
    Sleep(2);
741
#endif
742
#endif
743

744
    return ret;
146✔
745
}
746

747
static int generateChildTblName(int len, char *buffer, SDataBase *database,
20,904✔
748
                                SSuperTable *stbInfo, uint64_t i,
749
                                char *ttl) {
750
    if (0 == len) {
20,904✔
751
        memset(buffer, 0, TSDB_MAX_ALLOWED_SQL_LEN);
20,583✔
752
        len += snprintf(buffer + len,
20,583✔
753
                        TSDB_MAX_ALLOWED_SQL_LEN - len, "CREATE TABLE ");
20,583✔
754
    }
755

756
    len += snprintf(
20,904✔
757
            buffer + len, TSDB_MAX_ALLOWED_SQL_LEN - len,
20,904✔
758
            g_arguments->escape_character
20,904✔
759
            ? "`%s`.`%s%" PRIu64 "` USING `%s`.`%s` TAGS (%s) %s "
760
            : "%s.%s%" PRIu64 " USING %s.%s TAGS (%s) %s ",
761
            database->dbName, stbInfo->childTblPrefix, i, database->dbName,
762
            stbInfo->stbName,
763
            stbInfo->tagDataBuf + i * stbInfo->lenOfTags, ttl);
20,904✔
764

765
    return len;
20,904✔
766
}
767

768
static int getBatchOfTblCreating(threadInfo *pThreadInfo,
20,904✔
769
                                         SSuperTable *stbInfo) {
770
    BArray *batchArray = stbInfo->batchTblCreatingNumbersArray;
20,904✔
771
    if (batchArray) {
20,904✔
772
        int *batch = benchArrayGet(
20✔
773
                batchArray, pThreadInfo->posOfTblCreatingBatch);
20✔
774
        pThreadInfo->posOfTblCreatingBatch++;
20✔
775
        if (pThreadInfo->posOfTblCreatingBatch == batchArray->size) {
20✔
776
            pThreadInfo->posOfTblCreatingBatch = 0;
4✔
777
        }
778
        return *batch;
20✔
779
    }
780
    return 0;
20,884✔
781
}
782

783
static int getIntervalOfTblCreating(threadInfo *pThreadInfo,
20,245✔
784
                                         SSuperTable *stbInfo) {
785
    BArray *intervalArray = stbInfo->batchTblCreatingIntervalsArray;
20,245✔
786
    if (intervalArray) {
20,245✔
787
        int *interval = benchArrayGet(
8✔
788
                intervalArray, pThreadInfo->posOfTblCreatingInterval);
8✔
789
        pThreadInfo->posOfTblCreatingInterval++;
8✔
790
        if (pThreadInfo->posOfTblCreatingInterval == intervalArray->size) {
8✔
791
            pThreadInfo->posOfTblCreatingInterval = 0;
2✔
792
        }
793
        return *interval;
8✔
794
    }
795
    return 0;
20,237✔
796
}
797

798
static void *createTable(void *sarg) {
525✔
799
    if (g_arguments->supplementInsert) {
525✔
800
        return NULL;
1✔
801
    }
802

803
    threadInfo * pThreadInfo = (threadInfo *)sarg;
524✔
804
    SDataBase *  database = pThreadInfo->dbInfo;
524✔
805
    SSuperTable *stbInfo = pThreadInfo->stbInfo;
524✔
806
#ifdef LINUX
807
    prctl(PR_SET_NAME, "createTable");
524✔
808
#endif
809
    uint64_t lastPrintTime = toolsGetTimestampMs();
524✔
810
    pThreadInfo->buffer = benchCalloc(1, TSDB_MAX_ALLOWED_SQL_LEN, false);
524✔
811
    int len = 0;
524✔
812
    int batchNum = 0;
524✔
813
    infoPrint(
524✔
814
              "thread[%d] start creating table from %" PRIu64 " to %" PRIu64
815
              "\n",
816
              pThreadInfo->threadID, pThreadInfo->start_table_from,
817
              pThreadInfo->end_table_to);
818

819
    char ttl[SMALL_BUFF_LEN] = "";
524✔
820
    if (stbInfo->ttl != 0) {
524✔
821
        snprintf(ttl, SMALL_BUFF_LEN, "TTL %d", stbInfo->ttl);
20✔
822
    }
823

824
    int smallBatchCount = 0;
524✔
825
    for (uint64_t i = pThreadInfo->start_table_from + stbInfo->childTblFrom;
524✔
826
            (i <= (pThreadInfo->end_table_to + stbInfo->childTblFrom)
21,469✔
827
             && !g_arguments->terminate); i++) {
21,469✔
828
        if (g_arguments->terminate) {
20,944✔
829
            goto create_table_end;
×
830
        }
831
        if (!stbInfo->use_metric || stbInfo->tags->size == 0) {
20,944✔
832
            if (stbInfo->childTblCount == 1) {
40✔
833
                snprintf(pThreadInfo->buffer, TSDB_MAX_ALLOWED_SQL_LEN,
3✔
834
                         g_arguments->escape_character
3✔
835
                         ? "CREATE TABLE `%s`.`%s` %s;"
836
                         : "CREATE TABLE %s.%s %s;",
837
                         database->dbName, stbInfo->stbName,
838
                         stbInfo->colsOfCreateChildTable);
839
            } else {
840
                snprintf(pThreadInfo->buffer, TSDB_MAX_ALLOWED_SQL_LEN,
37✔
841
                         g_arguments->escape_character
37✔
842
                         ? "CREATE TABLE `%s`.`%s` %s;"
843
                         : "CREATE TABLE %s.%s %s;",
844
                         database->dbName,
845
                         stbInfo->childTblArray[i]->name,
37✔
846
                         stbInfo->colsOfCreateChildTable);
847
            }
848
            batchNum++;
40✔
849
        } else {
850
            if (0 == len) {
20,904✔
851
                batchNum = 0;
20,583✔
852
            }
853
            len = generateChildTblName(len, pThreadInfo->buffer,
20,904✔
854
                                       database, stbInfo, i, ttl);
855

856
            batchNum++;
20,904✔
857
            smallBatchCount++;
20,904✔
858

859
            int smallBatch = getBatchOfTblCreating(pThreadInfo, stbInfo);
20,904✔
860
            if ((!smallBatch || (smallBatchCount == smallBatch))
20,903✔
861
                    && (batchNum < stbInfo->batchTblCreatingNum)
20,895✔
862
                    && ((TSDB_MAX_ALLOWED_SQL_LEN - len) >=
701✔
863
                        (stbInfo->lenOfTags + EXTRA_SQL_LEN))) {
701✔
864
                continue;
701✔
865
            } else {
866
                smallBatchCount = 0;
20,202✔
867
            }
868
        }
869

870
        len = 0;
20,242✔
871

872
        int ret = 0;
20,242✔
873
        debugPrint("thread[%d] creating table: %s\n", pThreadInfo->threadID,
20,242✔
874
                   pThreadInfo->buffer);
875
        if (REST_IFACE == stbInfo->iface) {
20,241✔
876
            ret = queryDbExecRest(pThreadInfo->buffer,
12✔
877
                                  database->dbName,
878
                                  database->precision,
879
                                  stbInfo->iface,
12✔
880
                                  stbInfo->lineProtocol,
12✔
881
                                  stbInfo->tcpTransfer,
12✔
882
                                  pThreadInfo->sockfd);
883
        } else {
884
            ret = queryDbExecCall(pThreadInfo->conn, pThreadInfo->buffer);
20,229✔
885
            int32_t trying = g_arguments->keep_trying;
20,233✔
886
            while (ret && trying) {
20,233✔
887
                infoPrint("will sleep %"PRIu32" milliseconds then re-create "
×
888
                          "table %s\n",
889
                          g_arguments->trying_interval, pThreadInfo->buffer);
890
                toolsMsleep(g_arguments->trying_interval);
×
891
                ret = queryDbExecCall(pThreadInfo->conn, pThreadInfo->buffer);
×
892
                if (trying != -1) {
×
893
                    trying--;
×
894
                }
895
            }
896
        }
897

898
        if (0 != ret) {
20,245✔
899
            g_fail = true;
×
900
            goto create_table_end;
×
901
        }
902
        uint64_t intervalOfTblCreating = getIntervalOfTblCreating(pThreadInfo,
20,245✔
903
                                                                  stbInfo);
904
        if (intervalOfTblCreating) {
20,245✔
905
            debugPrint("will sleep %"PRIu64" milliseconds "
8✔
906
                       "for table creating interval\n", intervalOfTblCreating);
907
            toolsMsleep(intervalOfTblCreating);
8✔
908
        }
909

910
        pThreadInfo->tables_created += batchNum;
20,245✔
911
        batchNum = 0;
20,245✔
912
        uint64_t currentPrintTime = toolsGetTimestampMs();
20,245✔
913
        if (currentPrintTime - lastPrintTime > PRINT_STAT_INTERVAL) {
20,244✔
914
            infoPrint(
×
915
                       "thread[%d] already created %" PRId64 " tables\n",
916
                       pThreadInfo->threadID, pThreadInfo->tables_created);
917
            lastPrintTime = currentPrintTime;
×
918
        }
919
    }
920

921
    if (0 != len) {
525✔
922
        int ret = 0;
380✔
923
        debugPrint("thread[%d] creating table: %s\n", pThreadInfo->threadID,
380✔
924
                   pThreadInfo->buffer);
925
        if (REST_IFACE == stbInfo->iface) {
380✔
926
            ret = queryDbExecRest(pThreadInfo->buffer,
8✔
927
                                  database->dbName,
928
                                  database->precision,
929
                                  stbInfo->iface,
8✔
930
                                  stbInfo->lineProtocol,
8✔
931
                                  stbInfo->tcpTransfer,
8✔
932
                                  pThreadInfo->sockfd);
933
        } else {
934
            ret = queryDbExecCall(pThreadInfo->conn, pThreadInfo->buffer);
372✔
935
        }
936
        if (0 != ret) {
380✔
937
            g_fail = true;
×
938
            goto create_table_end;
×
939
        }
940
        pThreadInfo->tables_created += batchNum;
380✔
941
        debugPrint("thread[%d] already created %" PRId64 " tables\n",
380✔
942
                   pThreadInfo->threadID, pThreadInfo->tables_created);
943
    }
944
create_table_end:
523✔
945
    tmfree(pThreadInfo->buffer);
525✔
946
    pThreadInfo->buffer = NULL;
524✔
947
    return NULL;
524✔
948
}
949

950
static int startMultiThreadCreateChildTable(
126✔
951
        SDataBase* database, SSuperTable* stbInfo) {
952
    int code = -1;
126✔
953
    int          threads = g_arguments->table_threads;
126✔
954
    int64_t      ntables;
955
    if (stbInfo->childTblTo > 0) {
126✔
956
        ntables = stbInfo->childTblTo - stbInfo->childTblFrom;
1✔
957
    } else {
958
        ntables = stbInfo->childTblCount;
125✔
959
    }
960
    pthread_t   *pids = benchCalloc(1, threads * sizeof(pthread_t), false);
126✔
961
    threadInfo  *infos = benchCalloc(1, threads * sizeof(threadInfo), false);
126✔
962
    uint64_t     tableFrom = 0;
126✔
963
    if (threads < 1) {
126✔
964
        threads = 1;
×
965
    }
966

967
    int64_t a = ntables / threads;
126✔
968
    if (a < 1) {
126✔
969
        threads = (int)ntables;
71✔
970
        a = 1;
71✔
971
    }
972

973
    if (ntables == 0) {
126✔
974
        errorPrint("failed to create child table, childTblCount: %"PRId64"\n",
×
975
                ntables);
976
        goto over;
×
977
    }
978
    int64_t b = ntables % threads;
126✔
979

980
    for (uint32_t i = 0; (i < threads && !g_arguments->terminate); i++) {
651✔
981
        threadInfo *pThreadInfo = infos + i;
525✔
982
        pThreadInfo->threadID = i;
525✔
983
        pThreadInfo->stbInfo = stbInfo;
525✔
984
        pThreadInfo->dbInfo = database;
525✔
985
        if (REST_IFACE == stbInfo->iface) {
525✔
986
            int sockfd = createSockFd();
17✔
987
            if (sockfd < 0) {
17✔
988
                FREE_PIDS_INFOS_RETURN_MINUS_1();
×
989
            }
990
            pThreadInfo->sockfd = sockfd;
17✔
991
        } else {
992
            pThreadInfo->conn = initBenchConn();
508✔
993
            if (NULL == pThreadInfo->conn) {
508✔
994
                goto over;
×
995
            }
996
        }
997
        pThreadInfo->start_table_from = tableFrom;
525✔
998
        pThreadInfo->ntables = i < b ? a + 1 : a;
525✔
999
        pThreadInfo->end_table_to = i < b ? tableFrom + a : tableFrom + a - 1;
525✔
1000
        tableFrom = pThreadInfo->end_table_to + 1;
525✔
1001
        pThreadInfo->tables_created = 0;
525✔
1002
        pthread_create(pids + i, NULL, createTable, pThreadInfo);
525✔
1003
    }
1004

1005
    for (int i = 0; (i < threads && pids[i] !=0); i++) {
651✔
1006
        pthread_join(pids[i], NULL);
525✔
1007
    }
1008

1009
    if (g_arguments->terminate)  toolsMsleep(100);
126✔
1010

1011
    for (int i = 0; i < threads; i++) {
651✔
1012
        threadInfo *pThreadInfo = infos + i;
525✔
1013
        g_arguments->actualChildTables += pThreadInfo->tables_created;
525✔
1014

1015
        if ((REST_IFACE != stbInfo->iface) && pThreadInfo->conn) {
525✔
1016
            closeBenchConn(pThreadInfo->conn);
508✔
1017
        }
1018
    }
1019

1020
    if (g_fail) {
126✔
1021
        goto over;
×
1022
    }
1023
    code = 0;
126✔
1024
over:
126✔
1025
    free(pids);
126✔
1026
    free(infos);
126✔
1027
    return code;
126✔
1028
}
1029

1030
static int createChildTables() {
146✔
1031
    int32_t    code;
1032
    infoPrint("start creating %" PRId64 " table(s) with %d thread(s)\n",
146✔
1033
              g_arguments->totalChildTables, g_arguments->table_threads);
1034
    if (g_arguments->fpOfInsertResult) {
146✔
1035
        infoPrintToFile(g_arguments->fpOfInsertResult,
146✔
1036
                  "start creating %" PRId64 " table(s) with %d thread(s)\n",
1037
                  g_arguments->totalChildTables, g_arguments->table_threads);
1038
    }
1039
    double start = (double)toolsGetTimestampMs();
146✔
1040

1041
    for (int i = 0; (i < g_arguments->databases->size
146✔
1042
            && !g_arguments->terminate); i++) {
293✔
1043
        SDataBase * database = benchArrayGet(g_arguments->databases, i);
147✔
1044
        if (database->superTbls) {
147✔
1045
            for (int j = 0; (j < database->superTbls->size
146✔
1046
                    && !g_arguments->terminate); j++) {
360✔
1047
                SSuperTable * stbInfo = benchArrayGet(database->superTbls, j);
214✔
1048
                if (stbInfo->autoTblCreating || stbInfo->iface == SML_IFACE
214✔
1049
                        || stbInfo->iface == SML_REST_IFACE) {
149✔
1050
                    g_arguments->autoCreatedChildTables +=
79✔
1051
                            stbInfo->childTblCount;
79✔
1052
                    continue;
79✔
1053
                }
1054
                if (stbInfo->childTblExists) {
135✔
1055
                    g_arguments->existedChildTables +=
9✔
1056
                            stbInfo->childTblCount;
9✔
1057
                    continue;
9✔
1058
                }
1059
                debugPrint("colsOfCreateChildTable: %s\n",
126✔
1060
                        stbInfo->colsOfCreateChildTable);
1061

1062
                code = startMultiThreadCreateChildTable(database, stbInfo);
126✔
1063
                if (code && !g_arguments->terminate) {
126✔
1064
                    return code;
×
1065
                }
1066
            }
1067
        }
1068
    }
1069

1070
    double end = (double)toolsGetTimestampMs();
146✔
1071
    succPrint(
146✔
1072
            "Spent %.4f seconds to create %" PRId64
1073
            " table(s) with %d thread(s), already exist %" PRId64
1074
            " table(s), actual %" PRId64 " table(s) pre created, %" PRId64
1075
            " table(s) will be auto created\n",
1076
            (end - start) / 1000.0, g_arguments->totalChildTables,
1077
            g_arguments->table_threads, g_arguments->existedChildTables,
1078
            g_arguments->actualChildTables,
1079
            g_arguments->autoCreatedChildTables);
1080
    return 0;
146✔
1081
}
1082

1083
static void freeChildTable(SChildTable *childTbl, int colsSize) {
41,684✔
1084
    if (childTbl->useOwnSample) {
41,684✔
1085
        if (childTbl->childCols) {
10,150✔
1086
            for (int col = 0; col < colsSize; col++) {
12✔
1087
                ChildField *childCol =
1088
                    benchArrayGet(childTbl->childCols, col);
8✔
1089
                if (childCol) {
8✔
1090
                    tmfree(childCol->stmtData.data);
8✔
1091
                    tmfree(childCol->stmtData.is_null);
8✔
1092
                }
1093
            }
1094
            benchArrayDestroy(childTbl->childCols);
4✔
1095
        }
1096
        tmfree(childTbl->sampleDataBuf);
10,150✔
1097
    }
1098
    tmfree(childTbl);
41,684✔
1099
}
41,684✔
1100

1101
void postFreeResource() {
172✔
1102
    if (!g_arguments->terminate) {
172✔
1103
        tmfclose(g_arguments->fpOfInsertResult);
171✔
1104
    }
1105

1106
    for (int i = 0; i < g_arguments->databases->size; i++) {
345✔
1107
        SDataBase * database = benchArrayGet(g_arguments->databases, i);
173✔
1108
        if (database->cfgs) {
173✔
1109
            for (int c = 0; c < database->cfgs->size; c++) {
215✔
1110
                SDbCfg *cfg = benchArrayGet(database->cfgs, c);
42✔
1111
                if ((NULL == root) && (0 == strcmp(cfg->name, "replica"))) {
42✔
1112
                    tmfree(cfg->name);
1✔
1113
                    cfg->name = NULL;
1✔
1114
                }
1115
            }
1116
            benchArrayDestroy(database->cfgs);
173✔
1117
        }
1118
        if (database->superTbls) {
173✔
1119
            for (uint64_t j = 0; j < database->superTbls->size; j++) {
415✔
1120
                SSuperTable * stbInfo = benchArrayGet(database->superTbls, j);
243✔
1121
                tmfree(stbInfo->colsOfCreateChildTable);
243✔
1122
                stbInfo->colsOfCreateChildTable = NULL;
243✔
1123
                tmfree(stbInfo->sampleDataBuf);
243✔
1124
                stbInfo->sampleDataBuf = NULL;
243✔
1125
                tmfree(stbInfo->tagDataBuf);
243✔
1126
                stbInfo->tagDataBuf = NULL;
243✔
1127
                tmfree(stbInfo->partialColNameBuf);
243✔
1128
                stbInfo->partialColNameBuf = NULL;
243✔
1129
                benchArrayDestroy(stbInfo->batchTblCreatingNumbersArray);
243✔
1130
                benchArrayDestroy(stbInfo->batchTblCreatingIntervalsArray);
243✔
1131
                for (int k = 0; k < stbInfo->tags->size; k++) {
1,559✔
1132
                    Field * tag = benchArrayGet(stbInfo->tags, k);
1,316✔
1133
                    tmfree(tag->stmtData.data);
1,316✔
1134
                    tag->stmtData.data = NULL;
1,316✔
1135
                }
1136
                benchArrayDestroy(stbInfo->tags);
243✔
1137

1138
                for (int k = 0; k < stbInfo->cols->size; k++) {
1,405✔
1139
                    Field * col = benchArrayGet(stbInfo->cols, k);
1,162✔
1140
                    tmfree(col->stmtData.data);
1,162✔
1141
                    col->stmtData.data = NULL;
1,162✔
1142
                    tmfree(col->stmtData.is_null);
1,162✔
1143
                    col->stmtData.is_null = NULL;
1,162✔
1144
                }
1145
                if (g_arguments->test_mode == INSERT_TEST) {
243✔
1146
                    if (stbInfo->childTblArray) {
225✔
1147
                        for (int64_t child = 0; child < stbInfo->childTblCount;
41,898✔
1148
                                child++) {
41,684✔
1149
                            SChildTable *childTbl =
41,684✔
1150
                                stbInfo->childTblArray[child];
41,684✔
1151
                            if (childTbl) {
41,684✔
1152
                                freeChildTable(childTbl, stbInfo->cols->size);
41,684✔
1153
                            }
1154
                        }
1155
                    }
1156
                }
1157
                benchArrayDestroy(stbInfo->cols);
243✔
1158
                tmfree(stbInfo->childTblArray);
243✔
1159
                stbInfo->childTblArray = NULL;
243✔
1160
                benchArrayDestroy(stbInfo->tsmas);
243✔
1161
#ifdef TD_VER_COMPATIBLE_3_0_0_0
1162
                if ((0 == stbInfo->interlaceRows)
243✔
1163
                        && (g_arguments->nthreads_auto)) {
219✔
1164
                    for (int32_t v = 0; v < database->vgroups; v++) {
167✔
1165
                        SVGroup *vg = benchArrayGet(database->vgArray, v);
110✔
1166
                        tmfree(vg->childTblArray);
110✔
1167
                        vg->childTblArray = NULL;
110✔
1168
                    }
1169
                }
1170
#endif  // TD_VER_COMPATIBLE_3_0_0_0
1171
            }
1172
#ifdef TD_VER_COMPATIBLE_3_0_0_0
1173
            if (database->vgArray)
172✔
1174
                benchArrayDestroy(database->vgArray);
56✔
1175
#endif  // TD_VER_COMPATIBLE_3_0_0_0
1176
            benchArrayDestroy(database->superTbls);
172✔
1177
        }
1178
    }
1179
    benchArrayDestroy(g_arguments->databases);
172✔
1180
    benchArrayDestroy(g_arguments->streams);
172✔
1181
    tools_cJSON_Delete(root);
172✔
1182
}
172✔
1183

1184
int32_t execInsert(threadInfo *pThreadInfo, uint32_t k) {
56,587✔
1185
    SDataBase *  database = pThreadInfo->dbInfo;
56,587✔
1186
    SSuperTable *stbInfo = pThreadInfo->stbInfo;
56,587✔
1187
    TAOS_RES *   res = NULL;
56,587✔
1188
    int32_t      code = 0;
56,587✔
1189
    uint16_t     iface = stbInfo->iface;
56,587✔
1190

1191
    int32_t trying = (stbInfo->keep_trying)?
113,174✔
1192
        stbInfo->keep_trying:g_arguments->keep_trying;
56,587✔
1193
    int32_t trying_interval = stbInfo->trying_interval?
113,174✔
1194
        stbInfo->trying_interval:g_arguments->trying_interval;
56,587✔
1195
    int protocol = stbInfo->lineProtocol;
56,587✔
1196

1197
    switch (iface) {
56,587✔
1198
        case TAOSC_IFACE:
55,368✔
1199
            debugPrint("buffer: %s\n", pThreadInfo->buffer);
55,368✔
1200
            code = queryDbExecCall(pThreadInfo->conn, pThreadInfo->buffer);
55,368✔
1201
            while (code && trying && !g_arguments->terminate) {
55,367✔
1202
                infoPrint("will sleep %"PRIu32" milliseconds then re-insert\n",
×
1203
                          trying_interval);
1204
                toolsMsleep(trying_interval);
×
1205
                code = queryDbExecCall(pThreadInfo->conn, pThreadInfo->buffer);
×
1206
                if (trying != -1) {
×
1207
                    trying--;
×
1208
                }
1209
            }
1210
            break;
55,367✔
1211

1212
        case REST_IFACE:
28✔
1213
            debugPrint("buffer: %s\n", pThreadInfo->buffer);
28✔
1214
            code = postProceSql(pThreadInfo->buffer,
28✔
1215
                                database->dbName,
1216
                                database->precision,
1217
                                stbInfo->iface,
28✔
1218
                                stbInfo->lineProtocol,
28✔
1219
                                g_arguments->port,
28✔
1220
                                stbInfo->tcpTransfer,
28✔
1221
                                pThreadInfo->sockfd,
1222
                                pThreadInfo->filePath);
28✔
1223
            while (code && trying && !g_arguments->terminate) {
28✔
1224
                infoPrint("will sleep %"PRIu32" milliseconds then re-insert\n",
×
1225
                          trying_interval);
1226
                toolsMsleep(trying_interval);
×
1227
                code = postProceSql(pThreadInfo->buffer,
×
1228
                                    database->dbName,
1229
                                    database->precision,
1230
                                    stbInfo->iface,
×
1231
                                    stbInfo->lineProtocol,
×
1232
                                    g_arguments->port,
×
1233
                                    stbInfo->tcpTransfer,
×
1234
                                    pThreadInfo->sockfd,
1235
                                    pThreadInfo->filePath);
×
1236
                if (trying != -1) {
×
1237
                    trying--;
×
1238
                }
1239
            }
1240
            break;
28✔
1241

1242
        case STMT_IFACE:
255✔
1243
            code = taos_stmt_execute(pThreadInfo->conn->stmt);
255✔
1244
            if (code) {
256✔
1245
                errorPrint(
×
1246
                           "failed to execute insert statement. reason: %s\n",
1247
                           taos_stmt_errstr(pThreadInfo->conn->stmt));
1248
                code = -1;
×
1249
            }
1250
            break;
256✔
1251

1252
        case SML_IFACE:
774✔
1253
            res = taos_schemaless_insert(
1,218✔
1254
                pThreadInfo->conn->taos, pThreadInfo->lines,
774✔
1255
                (TSDB_SML_JSON_PROTOCOL == protocol
1256
                    || SML_JSON_TAOS_FORMAT == protocol)
444✔
1257
                    ? 0 : k,
1258
                (SML_JSON_TAOS_FORMAT == protocol)
1259
                    ? TSDB_SML_JSON_PROTOCOL : protocol,
1260
                (TSDB_SML_LINE_PROTOCOL == protocol)
1261
                    ? database->sml_precision
1262
                    : TSDB_SML_TIMESTAMP_NOT_CONFIGURED);
1263
            code = taos_errno(res);
774✔
1264
            trying = stbInfo->keep_trying;
773✔
1265
            while (code && trying && !g_arguments->terminate) {
773✔
1266
                taos_free_result(res);
×
1267
                infoPrint("will sleep %"PRIu32" milliseconds then re-insert\n",
×
1268
                          trying_interval);
1269
                toolsMsleep(trying_interval);
×
1270
                res = taos_schemaless_insert(
×
1271
                        pThreadInfo->conn->taos, pThreadInfo->lines,
×
1272
                        (TSDB_SML_JSON_PROTOCOL == protocol
1273
                            || SML_JSON_TAOS_FORMAT == protocol)
×
1274
                            ? 0 : k,
1275
                        (SML_JSON_TAOS_FORMAT == protocol)
1276
                            ? TSDB_SML_JSON_PROTOCOL : protocol,
1277
                        (TSDB_SML_LINE_PROTOCOL == protocol)
1278
                            ? database->sml_precision
1279
                            : TSDB_SML_TIMESTAMP_NOT_CONFIGURED);
1280
                code = taos_errno(res);
×
1281
                if (trying != -1) {
×
1282
                    trying--;
×
1283
                }
1284
            }
1285

1286
            if (code != TSDB_CODE_SUCCESS && !g_arguments->terminate) {
773✔
1287
                debugPrint("Failed to execute "
×
1288
                           "schemaless insert content: %s\n\n",
1289
                        pThreadInfo->lines?(pThreadInfo->lines[0]?
1290
                            pThreadInfo->lines[0]:""):"");
1291
                errorPrint(
×
1292
                    "failed to execute schemaless insert. "
1293
                        "code: 0x%08x reason: %s\n\n",
1294
                        code, taos_errstr(res));
1295
            }
1296
            taos_free_result(res);
773✔
1297
            break;
774✔
1298

1299
        case SML_REST_IFACE: {
161✔
1300
            if (TSDB_SML_JSON_PROTOCOL == protocol
161✔
1301
                    || SML_JSON_TAOS_FORMAT == protocol) {
116✔
1302
                code = postProceSql(pThreadInfo->lines[0], database->dbName,
46✔
1303
                                    database->precision, stbInfo->iface,
46✔
1304
                                    protocol, g_arguments->port,
46✔
1305
                                    stbInfo->tcpTransfer,
46✔
1306
                                    pThreadInfo->sockfd, pThreadInfo->filePath);
46✔
1307
            } else {
1308
                int len = 0;
115✔
1309
                for (int i = 0; i < k; i++) {
1,078✔
1310
                    if (strlen(pThreadInfo->lines[i]) != 0) {
963✔
1311
                        int n;
1312
                        if (TSDB_SML_TELNET_PROTOCOL == protocol
963✔
1313
                                && stbInfo->tcpTransfer) {
641✔
1314
                            n = snprintf(pThreadInfo->buffer + len,
320✔
1315
                                            TSDB_MAX_ALLOWED_SQL_LEN - len,
320✔
1316
                                           "put %s\n", pThreadInfo->lines[i]);
320✔
1317
                        } else {
1318
                            n = snprintf(pThreadInfo->buffer + len,
643✔
1319
                                            TSDB_MAX_ALLOWED_SQL_LEN - len,
643✔
1320
                                            "%s\n",
1321
                                           pThreadInfo->lines[i]);
643✔
1322
                        }
1323
                        if (n < 0 || n >= TSDB_MAX_ALLOWED_SQL_LEN - len) {
963✔
1324
                            errorPrint("%s() LN%d snprintf overflow on %d\n",
×
1325
                                __func__, __LINE__, i);
1326
                            break;
×
1327
                        } else {
1328
                            len += n;
963✔
1329
                        }
1330
                    } else {
1331
                        break;
×
1332
                    }
1333
                }
1334
                if (g_arguments->terminate) {
115✔
1335
                    break;
×
1336
                }
1337
                code = postProceSql(pThreadInfo->buffer, database->dbName,
115✔
1338
                        database->precision,
1339
                        stbInfo->iface, protocol,
115✔
1340
                        g_arguments->port,
115✔
1341
                        stbInfo->tcpTransfer,
115✔
1342
                        pThreadInfo->sockfd, pThreadInfo->filePath);
115✔
1343
            }
1344
            break;
161✔
1345
        }
1346
    }
1347
    return code;
56,587✔
1348
}
1349

1350
static int smartContinueIfFail(threadInfo *pThreadInfo,
1✔
1351
                               SChildTable *childTbl,
1352
                               int64_t i,
1353
                               char *ttl) {
1354
    SDataBase *  database = pThreadInfo->dbInfo;
1✔
1355
    SSuperTable *stbInfo = pThreadInfo->stbInfo;
1✔
1356
    char *buffer =
1357
        benchCalloc(1, TSDB_MAX_ALLOWED_SQL_LEN, false);
1✔
1358
    snprintf(
1✔
1359
            buffer, TSDB_MAX_ALLOWED_SQL_LEN,
1360
            g_arguments->escape_character ?
1✔
1361
                "CREATE TABLE `%s`.`%s` USING `%s`.`%s` TAGS (%s) %s "
1362
                : "CREATE TABLE %s.%s USING %s.%s TAGS (%s) %s ",
1363
            database->dbName, childTbl->name, database->dbName,
1✔
1364
            stbInfo->stbName,
1365
            stbInfo->tagDataBuf + i * stbInfo->lenOfTags, ttl);
1✔
1366
    debugPrint("creating table: %s\n", buffer);
1✔
1367
    int ret;
1368
    if (REST_IFACE == stbInfo->iface) {
1✔
1369
        ret = queryDbExecRest(buffer,
×
1370
                              database->dbName,
1371
                              database->precision,
1372
                              stbInfo->iface,
×
1373
                              stbInfo->lineProtocol,
×
1374
                              stbInfo->tcpTransfer,
×
1375
                              pThreadInfo->sockfd);
1376
    } else {
1377
        ret = queryDbExecCall(pThreadInfo->conn, buffer);
1✔
1378
        int32_t trying = g_arguments->keep_trying;
1✔
1379
        while (ret && trying) {
1✔
1380
            infoPrint("will sleep %"PRIu32" milliseconds then "
×
1381
                      "re-create table %s\n",
1382
                      g_arguments->trying_interval, buffer);
1383
            toolsMsleep(g_arguments->trying_interval);
×
1384
            ret = queryDbExecCall(pThreadInfo->conn, buffer);
×
1385
            if (trying != -1) {
×
1386
                trying--;
×
1387
            }
1388
        }
1389
    }
1390
    tmfree(buffer);
1✔
1391

1392
    return ret;
1✔
1393
}
1394

1395
static void cleanupAndPrint(threadInfo *pThreadInfo, char *mode) {
533✔
1396
    if (pThreadInfo) {
533✔
1397
        if (pThreadInfo->json_array) {
533✔
1398
            tools_cJSON_Delete(pThreadInfo->json_array);
126✔
1399
            pThreadInfo->json_array = NULL;
126✔
1400
        }
1401
        if (0 == pThreadInfo->totalDelay) {
533✔
1402
            pThreadInfo->totalDelay = 1;
5✔
1403
        }
1404
        succPrint(
533✔
1405
            "thread[%d] %s mode, completed total inserted rows: %" PRIu64
1406
            ", %.2f records/second\n",
1407
            pThreadInfo->threadID,
1408
            mode,
1409
            pThreadInfo->totalInsertRows,
1410
            (double)(pThreadInfo->totalInsertRows /
1411
            ((double)pThreadInfo->totalDelay / 1E6)));
1412
    }
1413
}
533✔
1414

1415
static int64_t getDisorderTs(SSuperTable *stbInfo, int *disorderRange) {
100,643,796✔
1416
    int64_t disorderTs = 0;
100,643,796✔
1417
    int64_t startTimestamp = stbInfo->startTimestamp;
100,643,796✔
1418
    if (stbInfo->disorderRatio > 0) {
100,643,796✔
1419
        int rand_num = taosRandom() % 100;
320✔
1420
        if (rand_num < stbInfo->disorderRatio) {
320✔
1421
            (*disorderRange)--;
154✔
1422
            if (0 == *disorderRange) {
154✔
1423
                *disorderRange = stbInfo->disorderRange;
×
1424
            }
1425
            disorderTs = startTimestamp - *disorderRange;
154✔
1426
            debugPrint("rand_num: %d, < disorderRatio: %d, "
154✔
1427
                       "disorderTs: %"PRId64"\n",
1428
                       rand_num, stbInfo->disorderRatio,
1429
                       disorderTs);
1430
        }
1431
    }
1432
    return disorderTs;
100,424,363✔
1433
}
1434

1435
static void *syncWriteInterlace(void *sarg) {
47✔
1436
    threadInfo * pThreadInfo = (threadInfo *)sarg;
47✔
1437
    SDataBase *  database = pThreadInfo->dbInfo;
47✔
1438
    SSuperTable *stbInfo = pThreadInfo->stbInfo;
47✔
1439
    infoPrint(
47✔
1440
              "thread[%d] start interlace inserting into table from "
1441
              "%" PRIu64 " to %" PRIu64 "\n",
1442
              pThreadInfo->threadID, pThreadInfo->start_table_from,
1443
              pThreadInfo->end_table_to);
1444

1445
    int64_t insertRows = stbInfo->insertRows;
47✔
1446
    int32_t interlaceRows = stbInfo->interlaceRows;
47✔
1447
    int64_t pos = 0;
47✔
1448
    uint32_t batchPerTblTimes = g_arguments->reqPerReq / interlaceRows;
47✔
1449
    uint64_t   lastPrintTime = toolsGetTimestampMs();
47✔
1450
    uint64_t   lastTotalInsertRows = 0;
47✔
1451
    int64_t   startTs = toolsGetTimestampUs();
47✔
1452
    int64_t   endTs;
1453
    uint64_t   tableSeq = pThreadInfo->start_table_from;
47✔
1454
    int disorderRange = stbInfo->disorderRange;
47✔
1455

1456
    while (insertRows > 0) {
323✔
1457
        int64_t tmp_total_insert_rows = 0;
277✔
1458
        uint32_t generated = 0;
277✔
1459
        if (insertRows <= interlaceRows) {
277✔
1460
            interlaceRows = insertRows;
65✔
1461
        }
1462
        for (int i = 0; i < batchPerTblTimes; i++) {
529✔
1463
            if (g_arguments->terminate) {
465✔
1464
                goto free_of_interlace;
×
1465
            }
1466
            int64_t timestamp = pThreadInfo->start_time;
465✔
1467
            SChildTable *childTbl = stbInfo->childTblArray[tableSeq];
465✔
1468
            char *  tableName =
465✔
1469
                stbInfo->childTblArray[tableSeq]->name;
465✔
1470
            char *sampleDataBuf = childTbl->useOwnSample?
930✔
1471
                                        childTbl->sampleDataBuf:
465✔
1472
                                        stbInfo->sampleDataBuf;
1473
            char ttl[SMALL_BUFF_LEN] = "";
465✔
1474
            if (stbInfo->ttl != 0) {
465✔
1475
                snprintf(ttl, SMALL_BUFF_LEN, "TTL %d", stbInfo->ttl);
32✔
1476
            }
1477
            switch (stbInfo->iface) {
465✔
1478
                case REST_IFACE:
67✔
1479
                case TAOSC_IFACE: {
1480
                    char escapedTbName[TSDB_TABLE_NAME_LEN+2] = "\0";
67✔
1481
                    if (g_arguments->escape_character) {
67✔
1482
                        snprintf(escapedTbName, TSDB_TABLE_NAME_LEN+2, "`%s`",
32✔
1483
                                tableName);
1484
                    } else {
1485
                        snprintf(escapedTbName, TSDB_TABLE_NAME_LEN+2, "%s",
35✔
1486
                                tableName);
1487
                    }
1488
                    if (i == 0) {
67✔
1489
                        ds_add_str(&pThreadInfo->buffer, STR_INSERT_INTO);
43✔
1490
                    }
1491
                    if (stbInfo->partialColNum == stbInfo->cols->size) {
67✔
1492
                        if (stbInfo->autoTblCreating) {
35✔
1493
                            ds_add_strs(&pThreadInfo->buffer, 8,
×
1494
                                    escapedTbName,
1495
                                    " USING `",
1496
                                    stbInfo->stbName,
1497
                                    "` TAGS (",
1498
                                    stbInfo->tagDataBuf
×
1499
                                        + stbInfo->lenOfTags * tableSeq,
×
1500
                                    ") ", ttl, " VALUES ");
1501
                        } else {
1502
                            ds_add_strs(&pThreadInfo->buffer, 2,
35✔
1503
                                    escapedTbName, " VALUES ");
1504
                        }
1505
                    } else {
1506
                        if (stbInfo->autoTblCreating) {
32✔
1507
                            ds_add_strs(&pThreadInfo->buffer, 10,
32✔
1508
                                        escapedTbName,
1509
                                        " (",
1510
                                        stbInfo->partialColNameBuf,
1511
                                        ") USING `",
1512
                                        stbInfo->stbName,
1513
                                        "` TAGS (",
1514
                                        stbInfo->tagDataBuf
32✔
1515
                                        + stbInfo->lenOfTags * tableSeq,
32✔
1516
                                        ") ", ttl, " VALUES ");
1517
                        } else {
1518
                            ds_add_strs(&pThreadInfo->buffer, 4,
×
1519
                                        escapedTbName,
1520
                                        "(",
1521
                                        stbInfo->partialColNameBuf,
1522
                                        ") VALUES ");
1523
                        }
1524
                    }
1525

1526
                    for (int64_t j = 0; j < interlaceRows; j++) {
268✔
1527
                        int64_t disorderTs = getDisorderTs(stbInfo,
201✔
1528
                                &disorderRange);
1529
                        char time_string[BIGINT_BUFF_LEN];
1530
                        snprintf(time_string, BIGINT_BUFF_LEN, "%"PRId64"",
201✔
1531
                                disorderTs?disorderTs:timestamp);
1532
                        ds_add_strs(&pThreadInfo->buffer, 5,
201✔
1533
                                    "(",
1534
                                    time_string,
1535
                                    ",",
1536
                                    sampleDataBuf + pos * stbInfo->lenOfCols,
201✔
1537
                                    ") ");
1538
                        if (ds_len(pThreadInfo->buffer)
201✔
1539
                                > stbInfo->max_sql_len) {
201✔
1540
                            errorPrint("sql buffer length (%"PRIu64") "
×
1541
                                    "is larger than max sql length "
1542
                                    "(%"PRId64")\n",
1543
                                    ds_len(pThreadInfo->buffer),
1544
                                    stbInfo->max_sql_len);
1545
                            goto free_of_interlace;
×
1546
                        }
1547
                        generated++;
201✔
1548
                        pos++;
201✔
1549
                        if (pos >= g_arguments->prepared_rand) {
201✔
1550
                            pos = 0;
1✔
1551
                        }
1552
                        timestamp += stbInfo->timestamp_step;
201✔
1553
                    }
1554
                    break;
67✔
1555
                }
1556
                case STMT_IFACE: {
40✔
1557
                    char escapedTbName[TSDB_TABLE_NAME_LEN+2] = "\0";
40✔
1558
                    if (g_arguments->escape_character) {
40✔
1559
                        snprintf(escapedTbName, TSDB_TABLE_NAME_LEN+2,
×
1560
                                "`%s`", tableName);
1561
                    } else {
1562
                        snprintf(escapedTbName, TSDB_TABLE_NAME_LEN, "%s",
40✔
1563
                                tableName);
1564
                    }
1565
                    if (taos_stmt_set_tbname(pThreadInfo->conn->stmt,
40✔
1566
                                             escapedTbName)) {
1567
                        errorPrint(
×
1568
                            "taos_stmt_set_tbname(%s) failed, reason: %s\n",
1569
                            tableName,
1570
                                taos_stmt_errstr(pThreadInfo->conn->stmt));
1571
                        g_fail = true;
×
1572
                        goto free_of_interlace;
×
1573
                    }
1574
                    generated =
1575
                        bindParamBatch(pThreadInfo, interlaceRows,
40✔
1576
                                       timestamp, childTbl);
1577
                    break;
40✔
1578
                }
1579
                case SML_REST_IFACE:
358✔
1580
                case SML_IFACE: {
1581
                    int protocol = stbInfo->lineProtocol;
358✔
1582
                    for (int64_t j = 0; j < interlaceRows; j++) {
1,674✔
1583
                        int64_t disorderTs = getDisorderTs(stbInfo,
1,316✔
1584
                                &disorderRange);
1585
                        if (TSDB_SML_JSON_PROTOCOL == protocol) {
1,316✔
1586
                            tools_cJSON *tag = tools_cJSON_Duplicate(
480✔
1587
                                tools_cJSON_GetArrayItem(
480✔
1588
                                    pThreadInfo->sml_json_tags,
480✔
1589
                                    (int)tableSeq -
480✔
1590
                                        pThreadInfo->start_table_from),
480✔
1591
                                    true);
1592
                            generateSmlJsonCols(
480✔
1593
                                pThreadInfo->json_array, tag, stbInfo,
1594
                                database->sml_precision,
480✔
1595
                                    disorderTs?disorderTs:timestamp);
1596
                        } else if (SML_JSON_TAOS_FORMAT == protocol) {
836✔
1597
                            tools_cJSON *tag = tools_cJSON_Duplicate(
×
1598
                                tools_cJSON_GetArrayItem(
×
1599
                                    pThreadInfo->sml_json_tags,
×
1600
                                    (int)tableSeq -
×
1601
                                        pThreadInfo->start_table_from),
×
1602
                                    true);
1603
                            generateSmlTaosJsonCols(
×
1604
                                pThreadInfo->json_array, tag, stbInfo,
1605
                                database->sml_precision,
×
1606
                                disorderTs?disorderTs:timestamp);
1607
                        } else if (TSDB_SML_LINE_PROTOCOL == protocol) {
836✔
1608
                            snprintf(
356✔
1609
                                pThreadInfo->lines[generated],
356✔
1610
                                stbInfo->lenOfCols + stbInfo->lenOfTags,
356✔
1611
                                "%s %s %" PRId64 "",
1612
                                pThreadInfo
1613
                                    ->sml_tags[(int)tableSeq -
356✔
1614
                                               pThreadInfo->start_table_from],
356✔
1615
                                    sampleDataBuf + pos * stbInfo->lenOfCols,
356✔
1616
                                disorderTs?disorderTs:timestamp);
1617
                        } else {
1618
                            snprintf(
480✔
1619
                                pThreadInfo->lines[generated],
480✔
1620
                                stbInfo->lenOfCols + stbInfo->lenOfTags,
480✔
1621
                                "%s %" PRId64 " %s %s", stbInfo->stbName,
1622
                                disorderTs?disorderTs:timestamp,
1623
                                    sampleDataBuf + pos * stbInfo->lenOfCols,
480✔
1624
                                pThreadInfo
1625
                                    ->sml_tags[(int)tableSeq -
480✔
1626
                                               pThreadInfo->start_table_from]);
480✔
1627
                        }
1628
                        generated++;
1,316✔
1629
                        timestamp += stbInfo->timestamp_step;
1,316✔
1630
                    }
1631
                    if (TSDB_SML_JSON_PROTOCOL == protocol
358✔
1632
                            || SML_JSON_TAOS_FORMAT == protocol) {
190✔
1633
                        pThreadInfo->lines[0] =
168✔
1634
                            tools_cJSON_PrintUnformatted(
168✔
1635
                                pThreadInfo->json_array);
168✔
1636
                    }
1637
                    break;
358✔
1638
                }
1639
            }
1640
            tableSeq++;
465✔
1641
            tmp_total_insert_rows += interlaceRows;
465✔
1642
            if (tableSeq > pThreadInfo->end_table_to) {
465✔
1643
                tableSeq = pThreadInfo->start_table_from;
213✔
1644
                pThreadInfo->start_time +=
213✔
1645
                    interlaceRows * stbInfo->timestamp_step;
213✔
1646
                if (!stbInfo->non_stop) {
213✔
1647
                    insertRows -= interlaceRows;
213✔
1648
                }
1649
                if (stbInfo->insert_interval > 0) {
213✔
1650
                    debugPrint("%s() LN%d, insert_interval: %"PRIu64"\n",
35✔
1651
                          __func__, __LINE__, stbInfo->insert_interval);
1652
                    perfPrint("sleep %" PRIu64 " ms\n",
35✔
1653
                                     stbInfo->insert_interval);
1654
                    toolsMsleep((int32_t)stbInfo->insert_interval);
35✔
1655
                }
1656
                break;
213✔
1657
            }
1658
        }
1659

1660
        startTs = toolsGetTimestampUs();
277✔
1661
        if (execInsert(pThreadInfo, generated)) {
277✔
1662
            g_fail = true;
×
1663
            goto free_of_interlace;
×
1664
        }
1665
        endTs = toolsGetTimestampUs();
277✔
1666

1667
        pThreadInfo->totalInsertRows += tmp_total_insert_rows;
277✔
1668

1669
        if (g_arguments->terminate) {
277✔
1670
            goto free_of_interlace;
1✔
1671
        }
1672

1673
        int protocol = stbInfo->lineProtocol;
276✔
1674
        switch (stbInfo->iface) {
276✔
1675
            case TAOSC_IFACE:
42✔
1676
            case REST_IFACE:
1677
                debugPrint("pThreadInfo->buffer: %s\n",
42✔
1678
                           pThreadInfo->buffer);
1679
                free_ds(&pThreadInfo->buffer);
42✔
1680
                pThreadInfo->buffer = new_ds(0);
42✔
1681
                break;
42✔
1682
            case SML_REST_IFACE:
92✔
1683
                memset(pThreadInfo->buffer, 0,
92✔
1684
                       g_arguments->reqPerReq * (pThreadInfo->max_sql_len + 1));
92✔
1685
            case SML_IFACE:
202✔
1686
                if (TSDB_SML_JSON_PROTOCOL == protocol
202✔
1687
                        || SML_JSON_TAOS_FORMAT == protocol) {
118✔
1688
                    debugPrint("pThreadInfo->lines[0]: %s\n",
84✔
1689
                               pThreadInfo->lines[0]);
1690
                    if (pThreadInfo->json_array && !g_arguments->terminate) {
84✔
1691
                        tools_cJSON_Delete(pThreadInfo->json_array);
84✔
1692
                        pThreadInfo->json_array = NULL;
84✔
1693
                    }
1694
                    pThreadInfo->json_array = tools_cJSON_CreateArray();
84✔
1695
                    if (pThreadInfo->lines && pThreadInfo->lines[0]) {
84✔
1696
                        tmfree(pThreadInfo->lines[0]);
84✔
1697
                        pThreadInfo->lines[0] = NULL;
84✔
1698
                    }
1699
                } else {
1700
                    for (int j = 0; j < generated; j++) {
954✔
1701
                        if (pThreadInfo && pThreadInfo->lines
836✔
1702
                                && !g_arguments->terminate) {
836✔
1703
                            debugPrint("pThreadInfo->lines[%d]: %s\n", j,
836✔
1704
                                       pThreadInfo->lines[j]);
1705
                            memset(pThreadInfo->lines[j], 0,
836✔
1706
                                   pThreadInfo->max_sql_len);
1707
                        }
1708
                    }
1709
                }
1710
                break;
202✔
1711
            case STMT_IFACE:
32✔
1712
                break;
32✔
1713
        }
1714

1715
        int64_t delay = endTs - startTs;
276✔
1716
        if (delay <=0) {
276✔
1717
            debugPrint("thread[%d]: startTS: %"PRId64", endTS: %"PRId64"\n",
×
1718
                       pThreadInfo->threadID, startTs, endTs);
1719
        } else {
1720
            perfPrint("insert execution time is %10.2f ms\n",
276✔
1721
                      delay / 1E6);
1722

1723
            int64_t * pdelay = benchCalloc(1, sizeof(int64_t), false);
276✔
1724
            *pdelay = delay;
276✔
1725
            if (benchArrayPush(pThreadInfo->delayList, pdelay) == NULL) {
276✔
1726
                tmfree(pdelay);
×
1727
            }
1728
            pThreadInfo->totalDelay += delay;
276✔
1729
        }
1730

1731
        int64_t currentPrintTime = toolsGetTimestampMs();
276✔
1732
        if (currentPrintTime - lastPrintTime > 30 * 1000) {
276✔
1733
            infoPrint(
×
1734
                    "thread[%d] has currently inserted rows: %" PRIu64
1735
                    ", peroid insert rate: %.3f rows/s \n",
1736
                    pThreadInfo->threadID, pThreadInfo->totalInsertRows, 
1737
                    (double)(pThreadInfo->totalInsertRows - lastTotalInsertRows) * 1000.0/(currentPrintTime - lastPrintTime));
1738
            lastPrintTime = currentPrintTime;
×
1739
            lastTotalInsertRows = pThreadInfo->totalInsertRows;
×
1740
        }
1741
    }
1742
free_of_interlace:
46✔
1743
    cleanupAndPrint(pThreadInfo, "interlace");
47✔
1744
    return NULL;
47✔
1745
}
1746

1747
static int32_t prepareProgressDataStmt(
224✔
1748
        threadInfo *pThreadInfo,
1749
        SChildTable *childTbl,
1750
        int64_t *timestamp, uint64_t i, char *ttl) {
1751
    SSuperTable *stbInfo = pThreadInfo->stbInfo;
224✔
1752
    char escapedTbName[TSDB_TABLE_NAME_LEN + 2] = "\0";
224✔
1753
    if (g_arguments->escape_character) {
224✔
1754
        snprintf(escapedTbName, TSDB_TABLE_NAME_LEN + 2,
32✔
1755
                 "`%s`", childTbl->name);
32✔
1756
    } else {
1757
        snprintf(escapedTbName, TSDB_TABLE_NAME_LEN, "%s",
192✔
1758
                 childTbl->name);
192✔
1759
    }
1760
    if (taos_stmt_set_tbname(pThreadInfo->conn->stmt,
224✔
1761
                             escapedTbName)) {
1762
        errorPrint(
×
1763
                "taos_stmt_set_tbname(%s) failed,"
1764
                "reason: %s\n", escapedTbName,
1765
                taos_stmt_errstr(pThreadInfo->conn->stmt));
1766
        return -1;
×
1767
    }
1768
    int32_t generated = bindParamBatch(
448✔
1769
            pThreadInfo,
1770
            (g_arguments->reqPerReq > (stbInfo->insertRows - i))
224✔
1771
                ? (stbInfo->insertRows - i)
1772
                : g_arguments->reqPerReq,
224✔
1773
            *timestamp, childTbl);
1774
    *timestamp += generated * stbInfo->timestamp_step;
224✔
1775
    return generated;
224✔
1776
}
1777

1778
static void makeTimestampDisorder(
×
1779
        int64_t *timestamp, SSuperTable *stbInfo) {
1780
    int64_t startTimestamp = stbInfo->startTimestamp;
×
1781
    int disorderRange = stbInfo->disorderRange;
×
1782
    int rand_num = taosRandom() % 100;
×
1783
    if (rand_num < stbInfo->disorderRatio) {
×
1784
        disorderRange--;
×
1785
        if (0 == disorderRange) {
×
1786
            disorderRange = stbInfo->disorderRange;
×
1787
        }
1788
        *timestamp = startTimestamp - disorderRange;
×
1789
        debugPrint("rand_num: %d, < disorderRatio: %d"
×
1790
                   ", ts: %"PRId64"\n",
1791
                   rand_num,
1792
                   stbInfo->disorderRatio,
1793
                   *timestamp);
1794
    }
1795
}
×
1796

1797
static int32_t prepareProgressDataSmlJsonText(
291✔
1798
    threadInfo *pThreadInfo,
1799
    uint64_t tableSeq,
1800
    int64_t *timestamp, uint64_t i, char *ttl) {
1801
    // prepareProgressDataSmlJsonText
1802
    SSuperTable *stbInfo = pThreadInfo->stbInfo;
291✔
1803
    int32_t generated = 0;
291✔
1804

1805
    int len = 0;
291✔
1806

1807
    char *line = pThreadInfo->lines[0];
291✔
1808
    uint32_t line_buf_len = pThreadInfo->line_buf_len;
291✔
1809

1810
    strncat(line + len, "[", 2);
291✔
1811
    len += 1;
291✔
1812

1813
    int32_t pos = 0;
291✔
1814
    for (int j = 0; (j < g_arguments->reqPerReq)
291✔
1815
            && !g_arguments->terminate; j++) {
3,036✔
1816
        strncat(line + len, "{", 2);
2,892✔
1817
        len += 1;
2,892✔
1818
        int n;
1819
        n = snprintf(line + len, line_buf_len - len,
2,892✔
1820
                 "\"timestamp\":%"PRId64",", *timestamp);
1821
        if (n < 0 || n >= line_buf_len - len) {
2,892✔
1822
            errorPrint("%s() LN%d snprintf overflow on %d\n",
×
1823
                       __func__, __LINE__, j);
1824
            return -1;
×
1825
        } else {
1826
            len += n;
2,892✔
1827
        }
1828

1829
        n = snprintf(line + len, line_buf_len - len, "%s",
2,892✔
1830
                        pThreadInfo->sml_json_value_array[tableSeq]);
2,892✔
1831
        if (n < 0 || n >= line_buf_len - len) {
2,892✔
1832
            errorPrint("%s() LN%d snprintf overflow on %d\n",
×
1833
                       __func__, __LINE__, j);
1834
            return -1;
×
1835
        } else {
1836
            len += n;
2,892✔
1837
        }
1838
        n = snprintf(line + len, line_buf_len - len, "\"tags\":%s,",
2,892✔
1839
                       pThreadInfo->sml_tags_json_array[tableSeq]);
2,892✔
1840
        if (n < 0 || n >= line_buf_len - len) {
2,892✔
1841
            errorPrint("%s() LN%d snprintf overflow on %d\n",
×
1842
                       __func__, __LINE__, j);
1843
            return -1;
×
1844
        } else {
1845
            len += n;
2,892✔
1846
        }
1847
        n = snprintf(line + len, line_buf_len - len,
2,892✔
1848
                       "\"metric\":\"%s\"}", stbInfo->stbName);
1849
        if (n < 0 || n >= line_buf_len - len) {
2,892✔
1850
            errorPrint("%s() LN%d snprintf overflow on %d\n",
×
1851
                       __func__, __LINE__, j);
1852
            return -1;
×
1853
        } else {
1854
            len += n;
2,892✔
1855
        }
1856

1857
        pos++;
2,892✔
1858
        if (pos >= g_arguments->prepared_rand) {
2,892✔
1859
            pos = 0;
289✔
1860
        }
1861
        *timestamp += stbInfo->timestamp_step;
2,892✔
1862
        if (stbInfo->disorderRatio > 0) {
2,892✔
1863
            makeTimestampDisorder(timestamp, stbInfo);
×
1864
        }
1865
        generated++;
2,892✔
1866
        if (i + generated >= stbInfo->insertRows) {
2,892✔
1867
            break;
147✔
1868
        }
1869
        if ((j+1) < g_arguments->reqPerReq) {
2,745✔
1870
            strncat(line + len, ",", 2);
2,601✔
1871
            len += 1;
2,601✔
1872
        }
1873
    }
1874
    strncat(line + len, "]", 2);
291✔
1875

1876
    debugPrint("%s() LN%d, lines[0]: %s\n",
291✔
1877
               __func__, __LINE__, pThreadInfo->lines[0]);
1878
    return generated;
291✔
1879
}
1880

1881
static int32_t prepareProgressDataSmlJson(
147✔
1882
    threadInfo *pThreadInfo,
1883
    uint64_t tableSeq,
1884
    int64_t *timestamp, uint64_t i, char *ttl) {
1885
    // prepareProgressDataSmlJson
1886
    SDataBase *  database = pThreadInfo->dbInfo;
147✔
1887
    SSuperTable *stbInfo = pThreadInfo->stbInfo;
147✔
1888
    int32_t generated = 0;
147✔
1889

1890
    int32_t pos = 0;
147✔
1891
    int protocol = stbInfo->lineProtocol;
147✔
1892
    for (int j = 0; (j < g_arguments->reqPerReq)
147✔
1893
            && !g_arguments->terminate; j++) {
1,524✔
1894
        tools_cJSON *tag = tools_cJSON_Duplicate(
1,452✔
1895
                tools_cJSON_GetArrayItem(
1,452✔
1896
                    pThreadInfo->sml_json_tags,
1,452✔
1897
                    (int)tableSeq -
1,452✔
1898
                    pThreadInfo->start_table_from),
1,452✔
1899
                true);
1900
        debugPrintJsonNoTime(tag);
1,452✔
1901
        if (TSDB_SML_JSON_PROTOCOL == protocol) {
1,452✔
1902
            generateSmlJsonCols(
×
1903
                    pThreadInfo->json_array, tag, stbInfo,
1904
                    database->sml_precision, *timestamp);
×
1905
        } else {
1906
            generateSmlTaosJsonCols(
1,452✔
1907
                    pThreadInfo->json_array, tag, stbInfo,
1908
                    database->sml_precision, *timestamp);
1,452✔
1909
        }
1910
        pos++;
1,452✔
1911
        if (pos >= g_arguments->prepared_rand) {
1,452✔
1912
            pos = 0;
145✔
1913
        }
1914
        *timestamp += stbInfo->timestamp_step;
1,452✔
1915
        if (stbInfo->disorderRatio > 0) {
1,452✔
1916
            makeTimestampDisorder(timestamp, stbInfo);
×
1917
        }
1918
        generated++;
1,452✔
1919
        if (i + generated >= stbInfo->insertRows) {
1,452✔
1920
            break;
75✔
1921
        }
1922
    }
1923

1924
    tmfree(pThreadInfo->lines[0]);
147✔
1925
    pThreadInfo->lines[0] = NULL;
147✔
1926
    pThreadInfo->lines[0] =
294✔
1927
            tools_cJSON_PrintUnformatted(
147✔
1928
                pThreadInfo->json_array);
147✔
1929
    debugPrint("pThreadInfo->lines[0]: %s\n",
147✔
1930
                   pThreadInfo->lines[0]);
1931

1932
    return generated;
147✔
1933
}
1934

1935
static int32_t prepareProgressDataSmlLineOrTelnet(
295✔
1936
    threadInfo *pThreadInfo, uint64_t tableSeq, char *sampleDataBuf,
1937
    int64_t *timestamp, uint64_t i, char *ttl, int protocol) {
1938
    // prepareProgressDataSmlLine
1939
    SSuperTable *stbInfo = pThreadInfo->stbInfo;
295✔
1940
    int32_t generated = 0;
295✔
1941

1942
    int32_t pos = 0;
295✔
1943
    for (int j = 0; (j < g_arguments->reqPerReq)
295✔
1944
            && !g_arguments->terminate; j++) {
3,038✔
1945
        if (TSDB_SML_LINE_PROTOCOL == protocol) {
2,893✔
1946
            snprintf(
484✔
1947
                    pThreadInfo->lines[j],
484✔
1948
                    stbInfo->lenOfCols + stbInfo->lenOfTags,
484✔
1949
                    "%s %s %" PRId64 "",
1950
                    pThreadInfo->sml_tags[tableSeq
484✔
1951
                    - pThreadInfo->start_table_from],
484✔
1952
                    sampleDataBuf + pos * stbInfo->lenOfCols,
484✔
1953
                    *timestamp);
1954
        } else {
1955
            snprintf(
2,409✔
1956
                    pThreadInfo->lines[j],
2,409✔
1957
                    stbInfo->lenOfCols + stbInfo->lenOfTags,
2,409✔
1958
                    "%s %" PRId64 " %s %s", stbInfo->stbName,
1959
                    *timestamp,
1960
                    sampleDataBuf
1961
                    + pos * stbInfo->lenOfCols,
2,409✔
1962
                    pThreadInfo->sml_tags[tableSeq
2,409✔
1963
                    -pThreadInfo->start_table_from]);
2,409✔
1964
        }
1965
        pos++;
2,893✔
1966
        if (pos >= g_arguments->prepared_rand) {
2,893✔
1967
            pos = 0;
273✔
1968
        }
1969
        *timestamp += stbInfo->timestamp_step;
2,893✔
1970
        if (stbInfo->disorderRatio > 0) {
2,893✔
1971
            makeTimestampDisorder(timestamp, stbInfo);
×
1972
        }
1973
        generated++;
2,894✔
1974
        if (i + generated >= stbInfo->insertRows) {
2,894✔
1975
            break;
151✔
1976
        }
1977
    }
1978
    return generated;
296✔
1979
}
1980

1981
static int32_t prepareProgressDataSml(
733✔
1982
    threadInfo *pThreadInfo,
1983
    SChildTable *childTbl,
1984
    uint64_t tableSeq,
1985
    int64_t *timestamp, uint64_t i, char *ttl) {
1986
    // prepareProgressDataSml
1987
    SSuperTable *stbInfo = pThreadInfo->stbInfo;
733✔
1988

1989
    char *sampleDataBuf;
1990
    if (childTbl->useOwnSample) {
733✔
1991
        sampleDataBuf = childTbl->sampleDataBuf;
×
1992
    } else {
1993
        sampleDataBuf = stbInfo->sampleDataBuf;
733✔
1994
    }
1995
    int protocol = stbInfo->lineProtocol;
733✔
1996
    int32_t generated = -1;
733✔
1997
    switch (protocol) {
733✔
1998
        case TSDB_SML_LINE_PROTOCOL:
295✔
1999
        case TSDB_SML_TELNET_PROTOCOL:
2000
            generated = prepareProgressDataSmlLineOrTelnet(
295✔
2001
                    pThreadInfo,
2002
                    tableSeq,
2003
                    sampleDataBuf,
2004
                    timestamp, i, ttl, protocol);
2005
            break;
295✔
2006
        case TSDB_SML_JSON_PROTOCOL:
291✔
2007
            generated = prepareProgressDataSmlJsonText(
291✔
2008
                    pThreadInfo,
2009
                    tableSeq - pThreadInfo->start_table_from,
291✔
2010
                timestamp, i, ttl);
2011
            break;
291✔
2012
        case SML_JSON_TAOS_FORMAT:
147✔
2013
            generated = prepareProgressDataSmlJson(
147✔
2014
                    pThreadInfo,
2015
                    tableSeq,
2016
                    timestamp, i, ttl);
2017
            break;
147✔
2018
        default:
×
2019
            errorPrint("%s() LN%d: unknown protcolor: %d\n",
×
2020
                       __func__, __LINE__, protocol);
2021
            break;
×
2022
    }
2023

2024
    return generated;
733✔
2025
}
2026

2027
static int32_t prepareProgressDataSql(
54,780✔
2028
    threadInfo *pThreadInfo,
2029
    SChildTable *childTbl, uint64_t tableSeq,
2030
    char *sampleDataBuf,
2031
    int64_t *timestamp, uint64_t i, char *ttl,
2032
    int32_t *pos, uint64_t *len) {
2033
    // prepareProgressDataSql
2034
    int32_t generated = 0;
54,780✔
2035
    SDataBase *database = pThreadInfo->dbInfo;
54,780✔
2036
    SSuperTable *stbInfo = pThreadInfo->stbInfo;
54,780✔
2037
    char *  pstr = pThreadInfo->buffer;
54,780✔
2038
    int disorderRange = stbInfo->disorderRange;
54,780✔
2039

2040
    if (stbInfo->partialColNum == stbInfo->cols->size) {
54,780✔
2041
        if (stbInfo->autoTblCreating) {
54,706✔
2042
            *len =
60✔
2043
                snprintf(pstr, TSDB_MAX_ALLOWED_SQL_LEN,
60✔
2044
                        g_arguments->escape_character
60✔
2045
                        ? "%s `%s`.`%s` USING `%s`.`%s` TAGS (%s) %s VALUES "
2046
                        : "%s %s.%s USING %s.%s TAGS (%s) %s VALUES ",
2047
                         STR_INSERT_INTO, database->dbName,
2048
                         childTbl->name, database->dbName,
60✔
2049
                         stbInfo->stbName,
2050
                         stbInfo->tagDataBuf +
60✔
2051
                         stbInfo->lenOfTags * tableSeq, ttl);
60✔
2052
        } else {
2053
            *len = snprintf(pstr, TSDB_MAX_ALLOWED_SQL_LEN,
54,646✔
2054
                    g_arguments->escape_character
54,646✔
2055
                           ? "%s `%s`.`%s` VALUES "
2056
                           : "%s %s.%s VALUES ",
2057
                           STR_INSERT_INTO,
2058
                           database->dbName, childTbl->name);
54,646✔
2059
        }
2060
    } else {
2061
        if (stbInfo->autoTblCreating) {
74✔
2062
            *len = snprintf(
16✔
2063
                    pstr, TSDB_MAX_ALLOWED_SQL_LEN,
2064
                    g_arguments->escape_character
16✔
2065
                    ? "%s `%s`.`%s` (%s) USING `%s`.`%s` TAGS (%s) %s VALUES "
2066
                    : "%s %s.%s (%s) USING %s.%s TAGS (%s) %s VALUES ",
2067
                    STR_INSERT_INTO, database->dbName,
2068
                    childTbl->name,
16✔
2069
                    stbInfo->partialColNameBuf,
2070
                    database->dbName, stbInfo->stbName,
2071
                    stbInfo->tagDataBuf +
16✔
2072
                    stbInfo->lenOfTags * tableSeq, ttl);
16✔
2073
        } else {
2074
            *len = snprintf(pstr, TSDB_MAX_ALLOWED_SQL_LEN,
58✔
2075
                    g_arguments->escape_character
58✔
2076
                    ? "%s `%s`.`%s` (%s) VALUES "
2077
                    : "%s %s.%s (%s) VALUES ",
2078
                    STR_INSERT_INTO, database->dbName,
2079
                    childTbl->name,
58✔
2080
                    stbInfo->partialColNameBuf);
2081
        }
2082
    }
2083

2084
    char *ownSampleDataBuf;
2085
    if (childTbl->useOwnSample) {
54,780✔
2086
        debugPrint("%s is using own sample data\n",
10,140✔
2087
                  childTbl->name);
2088
        ownSampleDataBuf = childTbl->sampleDataBuf;
10,141✔
2089
    } else {
2090
        ownSampleDataBuf = stbInfo->sampleDataBuf;
44,640✔
2091
    }
2092
    for (int j = 0; j < g_arguments->reqPerReq; j++) {
100,400,579✔
2093
        if (stbInfo->useSampleTs
101,009,356✔
2094
                && (!stbInfo->random_data_source)) {
104✔
2095
            *len +=
104✔
2096
                snprintf(pstr + *len,
104✔
2097
                         TSDB_MAX_ALLOWED_SQL_LEN - *len, "(%s)",
104✔
2098
                         sampleDataBuf +
2099
                         *pos * stbInfo->lenOfCols);
104✔
2100
        } else {
2101
            int64_t disorderTs = getDisorderTs(stbInfo, &disorderRange);
101,009,252✔
2102
            *len += snprintf(pstr + *len,
100,380,505✔
2103
                            TSDB_MAX_ALLOWED_SQL_LEN - *len,
100,380,505✔
2104
                            "(%" PRId64 ",%s)",
2105
                            disorderTs?disorderTs:*timestamp,
2106
                            ownSampleDataBuf +
2107
                            *pos * stbInfo->lenOfCols);
100,380,505✔
2108
        }
2109
        *pos += 1;
100,380,609✔
2110
        if (*pos >= g_arguments->prepared_rand) {
100,380,609✔
2111
            *pos = 0;
1,750,278✔
2112
        }
2113
        *timestamp += stbInfo->timestamp_step;
100,380,609✔
2114
        generated++;
100,380,609✔
2115
        if (*len > (TSDB_MAX_ALLOWED_SQL_LEN
100,380,609✔
2116
            - stbInfo->lenOfCols)) {
100,380,609✔
2117
            break;
23,783✔
2118
        }
2119
        if (i + generated >= stbInfo->insertRows) {
100,356,826✔
2120
            break;
11,028✔
2121
        }
2122
    }
2123

2124
    return generated;
×
2125
}
2126

2127
void *syncWriteProgressive(void *sarg) {
492✔
2128
    threadInfo * pThreadInfo = (threadInfo *)sarg;
492✔
2129
    SDataBase *  database = pThreadInfo->dbInfo;
492✔
2130
    SSuperTable *stbInfo = pThreadInfo->stbInfo;
492✔
2131

2132
    // special deal flow for TAOSC_IFACE
2133
    if (insertDataMix(pThreadInfo, database, stbInfo)) {
492✔
2134
        // request be dealt by this function , so return
2135
        return NULL;
6✔
2136
    }
2137

2138
#ifdef TD_VER_COMPATIBLE_3_0_0_0
2139
    if (g_arguments->nthreads_auto) {
486✔
2140
        if (0 == pThreadInfo->vg->tbCountPerVgId) {
61✔
2141
            return NULL;
×
2142
        }
2143
    } else {
2144
        infoPrint(
425✔
2145
            "thread[%d] start progressive inserting into table from "
2146
            "%" PRIu64 " to %" PRIu64 "\n",
2147
            pThreadInfo->threadID, pThreadInfo->start_table_from,
2148
            pThreadInfo->end_table_to + 1);
2149
    }
2150
#else
2151
    infoPrint(
2152
            "thread[%d] start progressive inserting into table from "
2153
            "%" PRIu64 " to %" PRIu64 "\n",
2154
            pThreadInfo->threadID, pThreadInfo->start_table_from,
2155
            pThreadInfo->end_table_to + 1);
2156
#endif
2157
    uint64_t  lastPrintTime = toolsGetTimestampMs();
486✔
2158
    uint64_t  lastTotalInsertRows = 0;
486✔
2159
    int64_t   startTs = toolsGetTimestampUs();
486✔
2160
    int64_t   endTs;
2161

2162
    for (uint64_t tableSeq = pThreadInfo->start_table_from;
486✔
2163
            tableSeq <= pThreadInfo->end_table_to; tableSeq++) {
12,082✔
2164
        char *sampleDataBuf;
2165
        SChildTable *childTbl;
2166
#ifdef TD_VER_COMPATIBLE_3_0_0_0
2167
        if (g_arguments->nthreads_auto) {
11,603✔
2168
            childTbl = pThreadInfo->vg->childTblArray[tableSeq];
10,344✔
2169
        } else {
2170
            childTbl = stbInfo->childTblArray[
1,259✔
2171
                stbInfo->childTblExists?
1,259✔
2172
                tableSeq:
2173
                stbInfo->childTblFrom + tableSeq];
1,142✔
2174
        }
2175
#else
2176
        childTbl = stbInfo->childTblArray[
2177
                stbInfo->childTblExists?
2178
                tableSeq:
2179
                stbInfo->childTblFrom + tableSeq];
2180
#endif
2181
        if (childTbl->useOwnSample) {
11,603✔
2182
            sampleDataBuf = childTbl->sampleDataBuf;
10,137✔
2183
        } else {
2184
            sampleDataBuf = stbInfo->sampleDataBuf;
1,466✔
2185
        }
2186

2187
        int64_t  timestamp = pThreadInfo->start_time;
11,603✔
2188
        uint64_t len = 0;
11,603✔
2189
        int32_t pos = 0;
11,603✔
2190
        if (stbInfo->iface == STMT_IFACE && stbInfo->autoTblCreating) {
11,603✔
2191
            taos_stmt_close(pThreadInfo->conn->stmt);
16✔
2192
            pThreadInfo->conn->stmt = taos_stmt_init(pThreadInfo->conn->taos);
16✔
2193
            if (NULL == pThreadInfo->conn->stmt) {
16✔
2194
                errorPrint("taos_stmt_init() failed, reason: %s\n",
×
2195
                        taos_errstr(NULL));
2196
                g_fail = true;
×
2197
                goto free_of_progressive;
7✔
2198
            }
2199

2200
            if (prepareStmt(stbInfo, pThreadInfo->conn->stmt, tableSeq)) {
16✔
2201
                g_fail = true;
×
2202
                goto free_of_progressive;
×
2203
            }
2204
        }
2205

2206
        char ttl[SMALL_BUFF_LEN] = "";
11,603✔
2207
        if (stbInfo->ttl != 0) {
11,603✔
2208
            snprintf(ttl, SMALL_BUFF_LEN, "TTL %d", stbInfo->ttl);
26✔
2209
        }
2210
        for (uint64_t i = 0; i < stbInfo->insertRows;) {
55,741✔
2211
            if (g_arguments->terminate) {
55,741✔
2212
                goto free_of_progressive;
×
2213
            }
2214
            int32_t generated = 0;
55,741✔
2215
            switch (stbInfo->iface) {
55,741✔
2216
                case TAOSC_IFACE:
54,784✔
2217
                case REST_IFACE:
2218
                    generated = prepareProgressDataSql(
54,784✔
2219
                            pThreadInfo,
2220
                            childTbl,
2221
                            tableSeq,
2222
                            sampleDataBuf,
2223
                            &timestamp, i, ttl, &pos, &len);
2224
                    break;
54,784✔
2225
                case STMT_IFACE: {
224✔
2226
                    generated = prepareProgressDataStmt(
224✔
2227
                            pThreadInfo,
2228
                            childTbl, &timestamp, i, ttl);
2229
                    break;
224✔
2230
                }
2231
                case SML_REST_IFACE:
733✔
2232
                case SML_IFACE:
2233
                    generated = prepareProgressDataSml(
733✔
2234
                            pThreadInfo,
2235
                            childTbl,
2236
                            tableSeq, &timestamp, i, ttl);
2237
                    break;
733✔
2238
                default:
×
2239
                    break;
×
2240
            }
2241
            if (generated < 0) {
55,741✔
2242
                g_fail = true;
×
2243
                goto free_of_progressive;
×
2244
            }
2245
            if (!stbInfo->non_stop) {
55,741✔
2246
                i += generated;
55,741✔
2247
            }
2248
            // only measure insert
2249
            startTs = toolsGetTimestampUs();
55,741✔
2250
            int code = execInsert(pThreadInfo, generated);
55,741✔
2251
            if (code) {
55,740✔
2252
                if (NO_IF_FAILED == stbInfo->continueIfFail) {
9✔
2253
                    warnPrint("The super table parameter "
7✔
2254
                              "continueIfFail: %d, STOP insertion!\n",
2255
                              stbInfo->continueIfFail);
2256
                    g_fail = true;
7✔
2257
                    goto free_of_progressive;
7✔
2258
                } else if (YES_IF_FAILED == stbInfo->continueIfFail) {
2✔
2259
                    infoPrint("The super table parameter "
1✔
2260
                              "continueIfFail: %d, "
2261
                              "will continue to insert ..\n",
2262
                              stbInfo->continueIfFail);
2263
                } else if (SMART_IF_FAILED == stbInfo->continueIfFail) {
1✔
2264
                    warnPrint("The super table parameter "
1✔
2265
                              "continueIfFail: %d, will create table "
2266
                              "then insert ..\n",
2267
                              stbInfo->continueIfFail);
2268
                    int ret = smartContinueIfFail(
1✔
2269
                            pThreadInfo,
2270
                            childTbl, i, ttl);
2271
                    if (0 != ret) {
1✔
2272
                        g_fail = true;
×
2273
                        goto free_of_progressive;
×
2274
                    }
2275

2276
                    code = execInsert(pThreadInfo, generated);
1✔
2277
                    if (code) {
1✔
2278
                        g_fail = true;
×
2279
                        goto free_of_progressive;
×
2280
                    }
2281
                } else {
2282
                    warnPrint("Unknown super table parameter "
×
2283
                              "continueIfFail: %d\n",
2284
                              stbInfo->continueIfFail);
2285
                    g_fail = true;
×
2286
                    goto free_of_progressive;
×
2287
                }
2288
            }
2289
            endTs = toolsGetTimestampUs()+1;
55,733✔
2290

2291
            if (stbInfo->insert_interval > 0) {
55,729✔
2292
                debugPrint("%s() LN%d, insert_interval: %"PRIu64"\n",
30✔
2293
                          __func__, __LINE__, stbInfo->insert_interval);
2294
                perfPrint("sleep %" PRIu64 " ms\n",
30✔
2295
                              stbInfo->insert_interval);
2296
                toolsMsleep((int32_t)stbInfo->insert_interval);
30✔
2297
            }
2298

2299
            pThreadInfo->totalInsertRows += generated;
55,730✔
2300

2301
            if (g_arguments->terminate) {
55,730✔
2302
                goto free_of_progressive;
×
2303
            }
2304
            int protocol = stbInfo->lineProtocol;
55,730✔
2305
            switch (stbInfo->iface) {
55,730✔
2306
                case REST_IFACE:
54,776✔
2307
                case TAOSC_IFACE:
2308
                    memset(pThreadInfo->buffer, 0, pThreadInfo->max_sql_len);
54,776✔
2309
                    break;
54,776✔
2310
                case SML_REST_IFACE:
69✔
2311
                    memset(pThreadInfo->buffer, 0,
69✔
2312
                           g_arguments->reqPerReq *
69✔
2313
                               (pThreadInfo->max_sql_len + 1));
69✔
2314
                case SML_IFACE:
733✔
2315
                    if (TSDB_SML_JSON_PROTOCOL == protocol) {
733✔
2316
                        memset(pThreadInfo->lines[0], 0,
291✔
2317
                           pThreadInfo->line_buf_len);
291✔
2318
                    } else if (SML_JSON_TAOS_FORMAT == protocol) {
442✔
2319
                        if (pThreadInfo->lines && pThreadInfo->lines[0]) {
147✔
2320
                            tmfree(pThreadInfo->lines[0]);
147✔
2321
                            pThreadInfo->lines[0] = NULL;
147✔
2322
                        }
2323
                        if (pThreadInfo->json_array) {
147✔
2324
                            tools_cJSON_Delete(pThreadInfo->json_array);
147✔
2325
                            pThreadInfo->json_array = NULL;
147✔
2326
                        }
2327
                        pThreadInfo->json_array = tools_cJSON_CreateArray();
147✔
2328
                    } else {
2329
                        for (int j = 0; j < generated; j++) {
3,191✔
2330
                            debugPrint("pThreadInfo->lines[%d]: %s\n",
2,896✔
2331
                                       j, pThreadInfo->lines[j]);
2332
                            memset(pThreadInfo->lines[j], 0,
2,896✔
2333
                                   pThreadInfo->max_sql_len);
2334
                        }
2335
                    }
2336
                    break;
733✔
2337
                case STMT_IFACE:
224✔
2338
                    break;
224✔
2339
            }
2340

2341
            int64_t delay = endTs - startTs;
55,730✔
2342
            if (delay <= 0) {
55,730✔
2343
                debugPrint("thread[%d]: startTs: %"PRId64", endTs: %"PRId64"\n",
×
2344
                        pThreadInfo->threadID, startTs, endTs);
2345
            } else {
2346
                perfPrint("insert execution time is %.6f s\n",
55,730✔
2347
                              delay / 1E6);
2348

2349
                int64_t * pDelay = benchCalloc(1, sizeof(int64_t), false);
55,730✔
2350
                *pDelay = delay;
55,735✔
2351
                if (benchArrayPush(pThreadInfo->delayList, pDelay) == NULL) {
55,735✔
2352
                    tmfree(pDelay);
×
2353
                }
2354
                pThreadInfo->totalDelay += delay;
55,722✔
2355
            }
2356

2357
            int64_t currentPrintTime = toolsGetTimestampMs();
55,722✔
2358
            if (currentPrintTime - lastPrintTime > 30 * 1000) {
55,723✔
2359
                infoPrint(
28✔
2360
                        "thread[%d] has currently inserted rows: "
2361
                        "%" PRId64 ", peroid insert rate: %.3f rows/s \n",
2362
                        pThreadInfo->threadID, pThreadInfo->totalInsertRows,
2363
                        (double)(pThreadInfo->totalInsertRows - lastTotalInsertRows) * 1000.0/(currentPrintTime - lastPrintTime));
2364
                lastPrintTime = currentPrintTime;
39✔
2365
                lastTotalInsertRows = pThreadInfo->totalInsertRows;
39✔
2366
            }
2367
            if (i >= stbInfo->insertRows) {
55,734✔
2368
                break;
11,596✔
2369
            }
2370
        }  // insertRows
2371
    }      // tableSeq
2372
free_of_progressive:
479✔
2373
    cleanupAndPrint(pThreadInfo, "progressive");
486✔
2374
    return NULL;
486✔
2375
}
2376

2377
static int initStmtDataValue(SSuperTable *stbInfo, SChildTable *childTbl) {
52✔
2378
    int32_t columnCount = stbInfo->cols->size;
52✔
2379

2380
    char *sampleDataBuf;
2381
    if (childTbl) {
52✔
2382
        sampleDataBuf = childTbl->sampleDataBuf;
10✔
2383
    } else {
2384
        sampleDataBuf = stbInfo->sampleDataBuf;
42✔
2385
    }
2386
    int64_t lenOfOneRow = stbInfo->lenOfCols;
52✔
2387

2388
    if (stbInfo->useSampleTs) {
52✔
2389
        columnCount += 1;  // for skipping first column
13✔
2390
    }
2391
    for (int i=0; i < g_arguments->prepared_rand; i++) {
201,215✔
2392
        int cursor = 0;
201,163✔
2393

2394
        for (int c = 0; c < columnCount; c++) {
3,113,022✔
2395
            char *restStr = sampleDataBuf
2,911,859✔
2396
                + lenOfOneRow * i + cursor;
2,911,859✔
2397
            int lengthOfRest = strlen(restStr);
2,911,859✔
2398

2399
            int index = 0;
2,911,859✔
2400
            for (index = 0; index < lengthOfRest; index++) {
63,249,443✔
2401
                if (restStr[index] == ',') {
63,048,166✔
2402
                    break;
2,710,582✔
2403
                }
2404
            }
2405

2406
            cursor += index + 1;  // skip ',' too
2,911,859✔
2407
            if ((0 == c) && stbInfo->useSampleTs) {
2,911,859✔
2408
                continue;
130✔
2409
            }
2410

2411
            char *tmpStr = calloc(1, index + 1);
2,911,729✔
2412
            if (NULL == tmpStr) {
2,911,729✔
2413
                errorPrint("%s() LN%d, Failed to allocate %d bind buffer\n",
×
2414
                        __func__, __LINE__, index + 1);
2415
                return -1;
×
2416
            }
2417
            Field *col = benchArrayGet(stbInfo->cols,
2,911,729✔
2418
                    (stbInfo->useSampleTs?c-1:c));
2,911,729✔
2419
            char dataType = col->type;
2,911,729✔
2420

2421
            StmtData *stmtData;
2422
            if (childTbl) {
2,911,729✔
2423
                ChildField *childCol =
2424
                    benchArrayGet(childTbl->childCols,
422✔
2425
                                  (stbInfo->useSampleTs?c-1:c));
422✔
2426
                stmtData = &childCol->stmtData;
422✔
2427
            } else {
2428
                stmtData = &col->stmtData;
2,911,307✔
2429
            }
2430

2431
            strncpy(tmpStr, restStr, index);
2,911,729✔
2432

2433
            if (0 == strcmp(tmpStr, "NULL")) {
2,911,729✔
2434
                *(stmtData->is_null + i) = true;
28✔
2435
            } else {
2436
                switch (dataType) {
2,911,701✔
2437
                    case TSDB_DATA_TYPE_INT:
211,985✔
2438
                    case TSDB_DATA_TYPE_UINT:
2439
                        *((int32_t*)stmtData->data + i) = atoi(tmpStr);
211,985✔
2440
                        break;
211,985✔
2441
                    case TSDB_DATA_TYPE_FLOAT:
111,196✔
2442
                        *((float*)stmtData->data +i) = (float)atof(tmpStr);
111,196✔
2443
                        break;
111,196✔
2444
                    case TSDB_DATA_TYPE_DOUBLE:
1,310,850✔
2445
                        *((double*)stmtData->data + i) = atof(tmpStr);
1,310,850✔
2446
                        break;
1,310,850✔
2447
                    case TSDB_DATA_TYPE_TINYINT:
21,700✔
2448
                    case TSDB_DATA_TYPE_UTINYINT:
2449
                        *((int8_t*)stmtData->data + i) = (int8_t)atoi(tmpStr);
21,700✔
2450
                        break;
21,700✔
2451
                    case TSDB_DATA_TYPE_SMALLINT:
21,700✔
2452
                    case TSDB_DATA_TYPE_USMALLINT:
2453
                        *((int16_t*)stmtData->data + i) = (int16_t)atoi(tmpStr);
21,700✔
2454
                        break;
21,700✔
2455
                    case TSDB_DATA_TYPE_BIGINT:
21,700✔
2456
                    case TSDB_DATA_TYPE_UBIGINT:
2457
                        *((int64_t*)stmtData->data + i) = (int64_t)atol(tmpStr);
21,700✔
2458
                        break;
21,700✔
2459
                    case TSDB_DATA_TYPE_BOOL:
10,850✔
2460
                        *((int8_t*)stmtData->data + i) = (int8_t)atoi(tmpStr);
10,850✔
2461
                        break;
10,850✔
2462
                    case TSDB_DATA_TYPE_TIMESTAMP:
10,040✔
2463
                        *((int64_t*)stmtData->data + i) = (int64_t)atol(tmpStr);
10,040✔
2464
                        break;
10,040✔
2465
                    case TSDB_DATA_TYPE_BINARY:
1,191,680✔
2466
                    case TSDB_DATA_TYPE_NCHAR:
2467
                        {
2468
                            size_t tmpLen = strlen(tmpStr);
1,191,680✔
2469
                            debugPrint("%s() LN%d, index: %d, "
1,191,680✔
2470
                                    "tmpStr len: %"PRIu64", col->length: %d\n",
2471
                                    __func__, __LINE__,
2472
                                    i, (uint64_t)tmpLen, col->length);
2473
                            if (tmpLen-2 > col->length) {
1,191,680✔
2474
                                errorPrint("data length %"PRIu64" "
×
2475
                                        "is larger than column length %d\n",
2476
                                        (uint64_t)tmpLen, col->length);
2477
                            }
2478
                            if (tmpLen > 2) {
1,191,680✔
2479
                                strncpy((char *)stmtData->data
1,191,680✔
2480
                                            + i * col->length,
1,191,680✔
2481
                                        tmpStr+1,
1,191,680✔
2482
                                        min(col->length, tmpLen - 2));
1,191,680✔
2483
                            } else {
2484
                                strncpy((char *)stmtData->data
×
2485
                                            + i*col->length,
×
2486
                                        "", 1);
2487
                            }
2488
                        }
2489
                        break;
1,191,680✔
2490
                    default:
×
2491
                        break;
×
2492
                }
2493
            }
2494
            free(tmpStr);
2,911,729✔
2495
        }
2496
    }
2497
    return 0;
52✔
2498
}
2499

2500
static void initStmtData(char dataType, void **data, uint32_t length) {
490✔
2501
    char *tmpP = NULL;
490✔
2502

2503
    switch (dataType) {
490✔
2504
        case TSDB_DATA_TYPE_INT:
66✔
2505
        case TSDB_DATA_TYPE_UINT:
2506
            tmpP = calloc(1, sizeof(int) * g_arguments->prepared_rand);
66✔
2507
            assert(tmpP);
66✔
2508
            tmfree(*data);
66✔
2509
            *data = (void*)tmpP;
66✔
2510
            break;
66✔
2511

2512
        case TSDB_DATA_TYPE_TINYINT:
28✔
2513
        case TSDB_DATA_TYPE_UTINYINT:
2514
            tmpP = calloc(1, sizeof(int8_t) * g_arguments->prepared_rand);
28✔
2515
            assert(tmpP);
28✔
2516
            tmfree(*data);
28✔
2517
            *data = (void*)tmpP;
28✔
2518
            break;
28✔
2519

2520
        case TSDB_DATA_TYPE_SMALLINT:
28✔
2521
        case TSDB_DATA_TYPE_USMALLINT:
2522
            tmpP = calloc(1, sizeof(int16_t) * g_arguments->prepared_rand);
28✔
2523
            assert(tmpP);
28✔
2524
            tmfree(*data);
28✔
2525
            *data = (void*)tmpP;
28✔
2526
            break;
28✔
2527

2528
        case TSDB_DATA_TYPE_BIGINT:
28✔
2529
        case TSDB_DATA_TYPE_UBIGINT:
2530
            tmpP = calloc(1, sizeof(int64_t) * g_arguments->prepared_rand);
28✔
2531
            assert(tmpP);
28✔
2532
            tmfree(*data);
28✔
2533
            *data = (void*)tmpP;
28✔
2534
            break;
28✔
2535

2536
        case TSDB_DATA_TYPE_BOOL:
14✔
2537
            tmpP = calloc(1, sizeof(int8_t) * g_arguments->prepared_rand);
14✔
2538
            assert(tmpP);
14✔
2539
            tmfree(*data);
14✔
2540
            *data = (void*)tmpP;
14✔
2541
            break;
14✔
2542

2543
        case TSDB_DATA_TYPE_FLOAT:
34✔
2544
            tmpP = calloc(1, sizeof(float) * g_arguments->prepared_rand);
34✔
2545
            assert(tmpP);
34✔
2546
            tmfree(*data);
34✔
2547
            *data = (void*)tmpP;
34✔
2548
            break;
34✔
2549

2550
        case TSDB_DATA_TYPE_DOUBLE:
144✔
2551
            tmpP = calloc(1, sizeof(double) * g_arguments->prepared_rand);
144✔
2552
            assert(tmpP);
144✔
2553
            tmfree(*data);
144✔
2554
            *data = (void*)tmpP;
144✔
2555
            break;
144✔
2556

2557
        case TSDB_DATA_TYPE_BINARY:
143✔
2558
        case TSDB_DATA_TYPE_NCHAR:
2559
            tmpP = calloc(1, g_arguments->prepared_rand * length);
143✔
2560
            assert(tmpP);
143✔
2561
            tmfree(*data);
143✔
2562
            *data = (void*)tmpP;
143✔
2563
            break;
143✔
2564

2565
        case TSDB_DATA_TYPE_TIMESTAMP:
5✔
2566
            tmpP = calloc(1, sizeof(int64_t) * g_arguments->prepared_rand);
5✔
2567
            assert(tmpP);
5✔
2568
            tmfree(*data);
5✔
2569
            *data = (void*)tmpP;
5✔
2570
            break;
5✔
2571

2572
        default:
×
2573
            errorPrint("Unknown data type: %s\n",
×
2574
                       convertDatatypeToString(dataType));
2575
            exit(EXIT_FAILURE);
×
2576
    }
2577
}
490✔
2578

2579
static int parseBufferToStmtBatchChildTbl(SSuperTable *stbInfo,
10✔
2580
                                          SChildTable* childTbl) {
2581
    int32_t columnCount = stbInfo->cols->size;
10✔
2582

2583
    for (int c = 0; c < columnCount; c++) {
24✔
2584
        Field *col = benchArrayGet(stbInfo->cols, c);
14✔
2585
        ChildField *childCol = benchArrayGet(childTbl->childCols, c);
14✔
2586
        char dataType = col->type;
14✔
2587

2588
        char *is_null = benchCalloc(
14✔
2589
                1, sizeof(char) *g_arguments->prepared_rand, false);
14✔
2590

2591
        tmfree(childCol->stmtData.is_null);
14✔
2592
        childCol->stmtData.is_null = is_null;
14✔
2593

2594
        initStmtData(dataType, &(childCol->stmtData.data), col->length);
14✔
2595
    }
2596

2597
    return initStmtDataValue(stbInfo, childTbl);
10✔
2598
}
2599

2600
static int parseBufferToStmtBatch(SSuperTable* stbInfo) {
42✔
2601
    int32_t columnCount = stbInfo->cols->size;
42✔
2602

2603
    for (int c = 0; c < columnCount; c++) {
518✔
2604
        Field *col = benchArrayGet(stbInfo->cols, c);
476✔
2605
        char dataType = col->type;
476✔
2606

2607
        char *is_null = benchCalloc(
476✔
2608
                1, sizeof(char) *g_arguments->prepared_rand, false);
476✔
2609
        tmfree(col->stmtData.is_null);
476✔
2610
        col->stmtData.is_null = is_null;
476✔
2611

2612
        initStmtData(dataType, &(col->stmtData.data), col->length);
476✔
2613
    }
2614

2615
    return initStmtDataValue(stbInfo, NULL);
42✔
2616
}
2617

2618
static int64_t fillChildTblNameByCount(SSuperTable *stbInfo) {
203✔
2619
    for (int64_t i = 0; i < stbInfo->childTblCount; i++) {
41,793✔
2620
        snprintf(stbInfo->childTblArray[i]->name,
41,590✔
2621
                 TSDB_TABLE_NAME_LEN,
2622
                 "%s%" PRIu64 "",
2623
                 stbInfo->childTblPrefix, i);
2624
        debugPrint("%s(): %s\n", __func__,
41,590✔
2625
                  stbInfo->childTblArray[i]->name);
2626
    }
2627

2628
    return stbInfo->childTblCount;
203✔
2629
}
2630

2631
static int64_t fillChildTblNameByFromTo(SDataBase *database,
3✔
2632
        SSuperTable* stbInfo) {
2633
    for (int64_t i = stbInfo->childTblFrom; i < stbInfo->childTblTo; i++) {
12✔
2634
        snprintf(stbInfo->childTblArray[i-stbInfo->childTblFrom]->name,
9✔
2635
                TSDB_TABLE_NAME_LEN,
2636
                "%s%" PRIu64 "",
2637
                stbInfo->childTblPrefix, i);
2638
    }
2639

2640
    return (stbInfo->childTblTo-stbInfo->childTblFrom);
3✔
2641
}
2642

2643
static int64_t fillChildTblNameByLimitOffset(SDataBase *database,
5✔
2644
        SSuperTable* stbInfo) {
2645
    SBenchConn* conn = initBenchConn();
5✔
2646
    if (NULL == conn) {
5✔
2647
        return -1;
×
2648
    }
2649
    char cmd[SHORT_1K_SQL_BUFF_LEN] = "\0";
5✔
2650
    if (g_arguments->taosc_version == 3) {
5✔
2651
        snprintf(cmd, SHORT_1K_SQL_BUFF_LEN,
5✔
2652
                 "SELECT DISTINCT(TBNAME) FROM %s.`%s` LIMIT %" PRId64
2653
                 " OFFSET %" PRIu64 "",
2654
                 database->dbName, stbInfo->stbName, stbInfo->childTblLimit,
2655
                 stbInfo->childTblOffset);
2656
    } else {
2657
        snprintf(cmd, SHORT_1K_SQL_BUFF_LEN,
×
2658
                 "SELECT TBNAME FROM %s.`%s` LIMIT %" PRId64
2659
                 " OFFSET %" PRIu64 "",
2660
                 database->dbName, stbInfo->stbName, stbInfo->childTblLimit,
2661
                 stbInfo->childTblOffset);
2662
    }
2663
    debugPrint("cmd: %s\n", cmd);
5✔
2664
    TAOS_RES *res = taos_query(conn->taos, cmd);
5✔
2665
    int32_t   code = taos_errno(res);
5✔
2666
    int64_t   count = 0;
5✔
2667
    if (code) {
5✔
2668
        printErrCmdCodeStr(cmd, code, res);
3✔
2669
        closeBenchConn(conn);
3✔
2670
        return -1;
3✔
2671
    }
2672
    TAOS_ROW row = NULL;
2✔
2673
    while ((row = taos_fetch_row(res)) != NULL) {
6✔
2674
        int *lengths = taos_fetch_lengths(res);
4✔
2675
        strncpy(stbInfo->childTblArray[count]->name, row[0], lengths[0]);
4✔
2676
        stbInfo->childTblArray[count]->name[lengths[0] + 1] = '\0';
4✔
2677
        debugPrint("stbInfo->childTblArray[%" PRId64 "]->name: %s\n",
4✔
2678
                   count, stbInfo->childTblArray[count]->name);
2679
        count++;
4✔
2680
    }
2681
    taos_free_result(res);
2✔
2682
    closeBenchConn(conn);
2✔
2683
    return count;
2✔
2684
}
2685

2686
static void preProcessArgument(SSuperTable *stbInfo) {
206✔
2687
    if (stbInfo->interlaceRows > g_arguments->reqPerReq) {
206✔
2688
        infoPrint(
6✔
2689
            "interlaceRows(%d) is larger than record per request(%u), which "
2690
            "will be set to %u\n",
2691
            stbInfo->interlaceRows, g_arguments->reqPerReq,
2692
            g_arguments->reqPerReq);
2693
        stbInfo->interlaceRows = g_arguments->reqPerReq;
6✔
2694
    }
2695

2696
    if (stbInfo->interlaceRows > stbInfo->insertRows) {
206✔
2697
        infoPrint(
1✔
2698
                "interlaceRows larger than insertRows %d > %" PRId64 "\n",
2699
                stbInfo->interlaceRows, stbInfo->insertRows);
2700
        infoPrint("%s", "interlaceRows will be set to 0\n");
1✔
2701
        stbInfo->interlaceRows = 0;
1✔
2702
    }
2703

2704
    if (stbInfo->interlaceRows == 0
206✔
2705
            && g_arguments->reqPerReq > stbInfo->insertRows) {
182✔
2706
        infoPrint("record per request (%u) is larger than "
89✔
2707
                "insert rows (%"PRIu64")"
2708
                " in progressive mode, which will be set to %"PRIu64"\n",
2709
                g_arguments->reqPerReq, stbInfo->insertRows,
2710
                stbInfo->insertRows);
2711
        g_arguments->reqPerReq = stbInfo->insertRows;
89✔
2712
    }
2713

2714
    if (stbInfo->interlaceRows > 0 && stbInfo->iface == STMT_IFACE
206✔
2715
            && stbInfo->autoTblCreating) {
6✔
2716
        infoPrint("%s",
1✔
2717
                "not support autocreate table with interlace row in stmt "
2718
                "insertion, will change to progressive mode\n");
2719
        stbInfo->interlaceRows = 0;
1✔
2720
    }
2721
}
206✔
2722

2723
static int printTotalDelay(SDataBase *database,
206✔
2724
                           int64_t totalDelay,
2725
                           BArray *total_delay_list,
2726
                            int threads,
2727
                            int64_t totalInsertRows,
2728
                            int64_t start, int64_t end) {
2729
    succPrint("Spent %.6f seconds to insert rows: %" PRIu64
206✔
2730
              " with %d thread(s) into %s %.2f records/second\n",
2731
              (end - start)/1E6, totalInsertRows, threads,
2732
              database->dbName,
2733
              (double)(totalInsertRows / ((end - start)/1E6)));
2734
    if (!total_delay_list->size) {
206✔
2735
        return -1;
1✔
2736
    }
2737

2738
    succPrint("insert delay, "
205✔
2739
              "min: %.4fms, "
2740
              "avg: %.4fms, "
2741
              "p90: %.4fms, "
2742
              "p95: %.4fms, "
2743
              "p99: %.4fms, "
2744
              "max: %.4fms\n",
2745
              *(int64_t *)(benchArrayGet(total_delay_list, 0))/1E3,
2746
              (double)totalDelay/total_delay_list->size/1E3,
2747
              *(int64_t *)(benchArrayGet(total_delay_list,
2748
                                         (int32_t)(total_delay_list->size
2749
                                         * 0.9)))/1E3,
2750
              *(int64_t *)(benchArrayGet(total_delay_list,
2751
                                         (int32_t)(total_delay_list->size
2752
                                         * 0.95)))/1E3,
2753
              *(int64_t *)(benchArrayGet(total_delay_list,
2754
                                         (int32_t)(total_delay_list->size
2755
                                         * 0.99)))/1E3,
2756
              *(int64_t *)(benchArrayGet(total_delay_list,
2757
                                         (int32_t)(total_delay_list->size
2758
                                         - 1)))/1E3);
2759
    return 0;
205✔
2760
}
2761

2762
static int64_t fillChildTblNameImp(SDataBase *database, SSuperTable *stbInfo) {
9✔
2763
    int64_t ntables;
2764
    if (stbInfo->childTblLimit) {
9✔
2765
        ntables = fillChildTblNameByLimitOffset(database, stbInfo);
5✔
2766
    } else if (stbInfo->childTblFrom || stbInfo->childTblTo) {
4✔
2767
        ntables = fillChildTblNameByFromTo(database, stbInfo);
3✔
2768
    } else {
2769
        ntables = fillChildTblNameByCount(stbInfo);
1✔
2770
    }
2771
    return ntables;
9✔
2772
}
2773

2774
static int64_t fillChildTblName(SDataBase *database, SSuperTable *stbInfo) {
214✔
2775
    int64_t ntables = stbInfo->childTblCount;
214✔
2776
    stbInfo->childTblArray = benchCalloc(stbInfo->childTblCount,
214✔
2777
            sizeof(SChildTable*), true);
2778
    for (int64_t child = 0; child < stbInfo->childTblCount; child++) {
41,898✔
2779
        stbInfo->childTblArray[child] =
41,684✔
2780
            benchCalloc(1, sizeof(SChildTable), true);
41,684✔
2781
    }
2782

2783
    if (stbInfo->childTblCount == 1 && stbInfo->tags->size == 0) {
214✔
2784
        // Normal table
2785
        snprintf(stbInfo->childTblArray[0]->name, TSDB_TABLE_NAME_LEN,
3✔
2786
                    "%s", stbInfo->stbName);
2787
    } else if ((stbInfo->iface != SML_IFACE
211✔
2788
                && stbInfo->iface != SML_REST_IFACE)
153✔
2789
            && stbInfo->childTblExists) {
139✔
2790
        ntables = fillChildTblNameImp(database, stbInfo);
9✔
2791
    } else {
2792
        ntables = fillChildTblNameByCount(stbInfo);
202✔
2793
    }
2794

2795
    return ntables;
214✔
2796
}
2797

2798
static int startMultiThreadInsertData(SDataBase* database,
208✔
2799
        SSuperTable* stbInfo) {
2800
    if ((stbInfo->iface == SML_IFACE || stbInfo->iface == SML_REST_IFACE)
208✔
2801
            && !stbInfo->use_metric) {
72✔
2802
        errorPrint("%s", "schemaless cannot work without stable\n");
2✔
2803
        return -1;
2✔
2804
    }
2805

2806
    preProcessArgument(stbInfo);
206✔
2807

2808
    int64_t ntables;
2809
    if (stbInfo->childTblTo > 0) {
206✔
2810
        ntables = stbInfo->childTblTo - stbInfo->childTblFrom;
4✔
2811
    } else if (stbInfo->childTblLimit) {
202✔
2812
        ntables = stbInfo->childTblLimit;
18✔
2813
    } else {
2814
        ntables = stbInfo->childTblCount;
184✔
2815
    }
2816
    if (ntables == 0) {
206✔
2817
        return 0;
×
2818
    }
2819

2820
    uint64_t tableFrom = 0;
206✔
2821
    int32_t threads = g_arguments->nthreads;
206✔
2822
    int64_t a = 0, b = 0;
206✔
2823

2824
#ifdef TD_VER_COMPATIBLE_3_0_0_0
2825
    if ((0 == stbInfo->interlaceRows)
206✔
2826
            && (g_arguments->nthreads_auto)) {
236✔
2827
        SBenchConn* conn = initBenchConn();
53✔
2828
        if (NULL == conn) {
53✔
2829
            return -1;
×
2830
        }
2831

2832
        for (int64_t i = 0; i < stbInfo->childTblCount; i++) {
20,245✔
2833
            int vgId;
2834
            int ret = taos_get_table_vgId(
20,192✔
2835
                    conn->taos, database->dbName,
20,192✔
2836
                    stbInfo->childTblArray[i]->name, &vgId);
20,192✔
2837
            if (ret < 0) {
20,192✔
2838
                errorPrint("Failed to get %s db's %s table's vgId\n",
×
2839
                           database->dbName,
2840
                           stbInfo->childTblArray[i]->name);
2841
                closeBenchConn(conn);
×
2842
                return -1;
×
2843
            }
2844
            debugPrint("Db %s\'s table\'s %s vgId is: %d\n",
20,192✔
2845
                       database->dbName,
2846
                       stbInfo->childTblArray[i]->name, vgId);
2847
            for (int32_t v = 0; v < database->vgroups; v++) {
60,576✔
2848
                SVGroup *vg = benchArrayGet(database->vgArray, v);
40,384✔
2849
                if (vgId == vg->vgId) {
40,384✔
2850
                    vg->tbCountPerVgId++;
20,192✔
2851
                }
2852
            }
2853
        }
2854

2855
        threads = 0;
53✔
2856
        for (int v = 0; v < database->vgroups; v++) {
159✔
2857
            SVGroup *vg = benchArrayGet(database->vgArray, v);
106✔
2858
            infoPrint("Total %"PRId64" tables on bb %s's vgroup %d (id: %d)\n",
106✔
2859
                      vg->tbCountPerVgId, database->dbName, v, vg->vgId);
2860
            if (vg->tbCountPerVgId) {
106✔
2861
                threads++;
61✔
2862
            } else {
2863
                continue;
45✔
2864
            }
2865
            vg->childTblArray = benchCalloc(
61✔
2866
                    vg->tbCountPerVgId, sizeof(SChildTable*), true);
2867
            vg->tbOffset = 0;
61✔
2868
        }
2869
        for (int64_t i = 0; i < stbInfo->childTblCount; i++) {
20,245✔
2870
            int vgId;
2871
            int ret = taos_get_table_vgId(
20,192✔
2872
                    conn->taos, database->dbName,
20,192✔
2873
                    stbInfo->childTblArray[i]->name, &vgId);
20,192✔
2874
            if (ret < 0) {
20,192✔
2875
                errorPrint("Failed to get %s db's %s table's vgId\n",
×
2876
                           database->dbName,
2877
                           stbInfo->childTblArray[i]->name);
2878

2879
                closeBenchConn(conn);
×
2880
                return -1;
×
2881
            }
2882
            debugPrint("Db %s\'s table\'s %s vgId is: %d\n",
20,192✔
2883
                       database->dbName,
2884
                       stbInfo->childTblArray[i]->name, vgId);
2885
            for (int32_t v = 0; v < database->vgroups; v++) {
60,576✔
2886
                SVGroup *vg = benchArrayGet(database->vgArray, v);
40,384✔
2887
                if (vgId == vg->vgId) {
40,384✔
2888
                    vg->childTblArray[vg->tbOffset] =
20,192✔
2889
                           stbInfo->childTblArray[i];
20,192✔
2890
                    vg->tbOffset++;
20,192✔
2891
                }
2892
            }
2893
        }
2894
        closeBenchConn(conn);
53✔
2895
    } else {
2896
        a = ntables / threads;
153✔
2897
        if (a < 1) {
153✔
2898
            threads = (int32_t)ntables;
26✔
2899
            a = 1;
26✔
2900
        }
2901
        b = 0;
153✔
2902
        if (threads != 0) {
153✔
2903
            b = ntables % threads;
153✔
2904
        }
2905
    }
2906

2907
    int32_t vgFrom = 0;
206✔
2908
#else
2909
    a = ntables / threads;
2910
    if (a < 1) {
2911
        threads = (int32_t)ntables;
2912
        a = 1;
2913
    }
2914
    b = 0;
2915
    if (threads != 0) {
2916
        b = ntables % threads;
2917
    }
2918
#endif   // TD_VER_COMPATIBLE_3_0_0_0
2919
    pthread_t   *pids = benchCalloc(1, threads * sizeof(pthread_t), true);
206✔
2920
    threadInfo  *infos = benchCalloc(1, threads * sizeof(threadInfo), true);
206✔
2921

2922
    for (int32_t i = 0; i < threads; i++) {
745✔
2923
        threadInfo *pThreadInfo = infos + i;
539✔
2924
        pThreadInfo->threadID = i;
539✔
2925
        pThreadInfo->dbInfo = database;
539✔
2926
        pThreadInfo->stbInfo = stbInfo;
539✔
2927
        pThreadInfo->start_time = stbInfo->startTimestamp;
539✔
2928
        pThreadInfo->totalInsertRows = 0;
539✔
2929
        pThreadInfo->samplePos = 0;
539✔
2930
#ifdef TD_VER_COMPATIBLE_3_0_0_0
2931
        if ((0 == stbInfo->interlaceRows)
539✔
2932
                && (g_arguments->nthreads_auto)) {
492✔
2933
            int32_t j;
2934
            for (j = vgFrom; i < database->vgroups; j++) {
63✔
2935
                SVGroup *vg = benchArrayGet(database->vgArray, j);
63✔
2936
                if (0 == vg->tbCountPerVgId) {
63✔
2937
                    continue;
2✔
2938
                }
2939
                pThreadInfo->vg = vg;
61✔
2940
                pThreadInfo->start_table_from = 0;
61✔
2941
                pThreadInfo->ntables = vg->tbCountPerVgId;
61✔
2942
                pThreadInfo->end_table_to = vg->tbCountPerVgId-1;
61✔
2943
                break;
61✔
2944
            }
2945
            vgFrom = j + 1;
61✔
2946
        } else {
2947
            pThreadInfo->start_table_from = tableFrom;
478✔
2948
            pThreadInfo->ntables = i < b ? a + 1 : a;
478✔
2949
            pThreadInfo->end_table_to = (i < b)?(tableFrom+a):(tableFrom+a-1);
478✔
2950
            tableFrom = pThreadInfo->end_table_to + 1;
478✔
2951
        }
2952
#else
2953
        pThreadInfo->start_table_from = tableFrom;
2954
        pThreadInfo->ntables = i < b ? a + 1 : a;
2955
        pThreadInfo->end_table_to = (i < b)?(tableFrom+a):(tableFrom+a-1);
2956
        tableFrom = pThreadInfo->end_table_to + 1;
2957
#endif  // TD_VER_COMPATIBLE_3_0_0_0
2958
        pThreadInfo->delayList = benchArrayInit(1, sizeof(int64_t));
539✔
2959
        switch (stbInfo->iface) {
539✔
2960
            case REST_IFACE: {
13✔
2961
                if (stbInfo->interlaceRows > 0) {
13✔
2962
                    pThreadInfo->buffer = new_ds(0);
×
2963
                } else {
2964
                    pThreadInfo->buffer =
13✔
2965
                        benchCalloc(1, TSDB_MAX_ALLOWED_SQL_LEN, true);
13✔
2966
                }
2967
                int sockfd = createSockFd();
13✔
2968
                if (sockfd < 0) {
13✔
2969
                    FREE_PIDS_INFOS_RETURN_MINUS_1();
×
2970
                }
2971
                pThreadInfo->sockfd = sockfd;
13✔
2972
                break;
13✔
2973
            }
2974
            case STMT_IFACE: {
42✔
2975
                pThreadInfo->conn = initBenchConn();
42✔
2976
                if (NULL == pThreadInfo->conn) {
42✔
2977
                    FREE_PIDS_INFOS_RETURN_MINUS_1();
×
2978
                }
2979
                pThreadInfo->conn->stmt =
84✔
2980
                    taos_stmt_init(pThreadInfo->conn->taos);
42✔
2981
                if (NULL == pThreadInfo->conn->stmt) {
42✔
2982
                    errorPrint("taos_stmt_init() failed, reason: %s\n",
×
2983
                               taos_errstr(NULL));
2984
                    FREE_RESOURCE();
×
2985
                    return -1;
×
2986
                }
2987
                if (taos_select_db(pThreadInfo->conn->taos, database->dbName)) {
42✔
2988
                    errorPrint("taos select database(%s) failed\n",
×
2989
                            database->dbName);
2990
                    FREE_RESOURCE();
×
2991
                    return -1;
×
2992
                }
2993
                if (!stbInfo->autoTblCreating) {
42✔
2994
                    if (prepareStmt(stbInfo, pThreadInfo->conn->stmt, 0)) {
34✔
2995
                        FREE_RESOURCE();
×
2996
                        return -1;
×
2997
                    }
2998
                }
2999

3000
                pThreadInfo->bind_ts = benchCalloc(1, sizeof(int64_t), true);
42✔
3001
                pThreadInfo->bind_ts_array =
42✔
3002
                        benchCalloc(1, sizeof(int64_t)*g_arguments->reqPerReq,
42✔
3003
                                    true);
3004
                pThreadInfo->bindParams = benchCalloc(
84✔
3005
                        1, sizeof(TAOS_MULTI_BIND)*(stbInfo->cols->size + 1),
42✔
3006
                        true);
3007
                pThreadInfo->is_null = benchCalloc(1, g_arguments->reqPerReq,
42✔
3008
                                                   true);
3009
                parseBufferToStmtBatch(stbInfo);
42✔
3010
                for (int64_t child = 0;
42✔
3011
                        child < stbInfo->childTblCount; child++) {
752✔
3012
                    SChildTable *childTbl = stbInfo->childTblArray[child];
710✔
3013
                    if (childTbl->useOwnSample) {
710✔
3014
                        parseBufferToStmtBatchChildTbl(stbInfo, childTbl);
10✔
3015
                    }
3016
                }
3017

3018
                break;
42✔
3019
            }
3020
            case SML_REST_IFACE: {
37✔
3021
                int sockfd = createSockFd();
37✔
3022
                if (sockfd < 0) {
37✔
3023
                    free(pids);
×
3024
                    free(infos);
×
3025
                    return -1;
×
3026
                }
3027
                pThreadInfo->sockfd = sockfd;
37✔
3028
            }
3029
            case SML_IFACE: {
229✔
3030
                pThreadInfo->conn = initBenchConn();
229✔
3031
                if (pThreadInfo->conn == NULL) {
229✔
3032
                    errorPrint("%s() init connection failed\n", __func__);
×
3033
                    FREE_RESOURCE();
×
3034
                    return -1;
×
3035
                }
3036
                if (taos_select_db(pThreadInfo->conn->taos, database->dbName)) {
229✔
3037
                    errorPrint("taos select database(%s) failed\n",
×
3038
                               database->dbName);
3039
                    FREE_RESOURCE();
×
3040
                    return -1;
×
3041
                }
3042
                pThreadInfo->max_sql_len =
229✔
3043
                    stbInfo->lenOfCols + stbInfo->lenOfTags;
229✔
3044
                if (stbInfo->iface == SML_REST_IFACE) {
229✔
3045
                    pThreadInfo->buffer =
37✔
3046
                            benchCalloc(1, g_arguments->reqPerReq *
37✔
3047
                                      (1 + pThreadInfo->max_sql_len), true);
37✔
3048
                }
3049
                int protocol = stbInfo->lineProtocol;
229✔
3050
                if (TSDB_SML_JSON_PROTOCOL != protocol
229✔
3051
                        && SML_JSON_TAOS_FORMAT != protocol) {
142✔
3052
                    pThreadInfo->sml_tags =
103✔
3053
                        (char **)benchCalloc(pThreadInfo->ntables,
103✔
3054
                                             sizeof(char *), true);
3055
                    for (int t = 0; t < pThreadInfo->ntables; t++) {
306✔
3056
                        pThreadInfo->sml_tags[t] =
203✔
3057
                                benchCalloc(1, stbInfo->lenOfTags, true);
203✔
3058
                    }
3059

3060
                    for (int t = 0; t < pThreadInfo->ntables; t++) {
306✔
3061
                        if (generateRandData(
203✔
3062
                                    stbInfo, pThreadInfo->sml_tags[t],
203✔
3063
                                    stbInfo->lenOfTags,
203✔
3064
                                    stbInfo->lenOfCols + stbInfo->lenOfTags,
203✔
3065
                                    stbInfo->tags, 1, true, NULL)) {
3066
                            return -1;
×
3067
                        }
3068
                        debugPrint("pThreadInfo->sml_tags[%d]: %s\n", t,
203✔
3069
                                   pThreadInfo->sml_tags[t]);
3070
                    }
3071
                    pThreadInfo->lines =
103✔
3072
                            benchCalloc(g_arguments->reqPerReq,
103✔
3073
                                        sizeof(char *), true);
3074

3075
                    for (int j = 0; (j < g_arguments->reqPerReq
206✔
3076
                            && !g_arguments->terminate); j++) {
61,041✔
3077
                        pThreadInfo->lines[j] =
60,938✔
3078
                                benchCalloc(1, pThreadInfo->max_sql_len, true);
60,938✔
3079
                    }
3080
                } else {
3081
                    pThreadInfo->json_array = tools_cJSON_CreateArray();
126✔
3082
                    pThreadInfo->sml_json_tags = tools_cJSON_CreateArray();
126✔
3083
                    pThreadInfo->sml_tags_json_array = (char **)benchCalloc(
126✔
3084
                            pThreadInfo->ntables, sizeof(char *), true);
3085
                    for (int t = 0; t < pThreadInfo->ntables; t++) {
372✔
3086
                        if (stbInfo->lineProtocol == TSDB_SML_JSON_PROTOCOL) {
246✔
3087
                            generateSmlJsonTags(
171✔
3088
                                pThreadInfo->sml_json_tags,
3089
                                    pThreadInfo->sml_tags_json_array,
3090
                                    stbInfo,
3091
                                pThreadInfo->start_table_from, t);
3092
                        } else {
3093
                            generateSmlTaosJsonTags(
75✔
3094
                                pThreadInfo->sml_json_tags, stbInfo,
3095
                                pThreadInfo->start_table_from, t);
3096
                        }
3097
                    }
3098
                    pThreadInfo->lines = (char **)benchCalloc(
126✔
3099
                            1, sizeof(char *), true);
3100
                    if ((0 == stbInfo->interlaceRows)
126✔
3101
                        && (TSDB_SML_JSON_PROTOCOL == protocol)) {
114✔
3102
                        pThreadInfo->line_buf_len =
75✔
3103
                            g_arguments->reqPerReq *
150✔
3104
                            accumulateRowLen(pThreadInfo->stbInfo->tags,
75✔
3105
                                             pThreadInfo->stbInfo->iface);
75✔
3106
                        debugPrint("%s() LN%d, line_buf_len=%d\n",
75✔
3107
                               __func__, __LINE__, pThreadInfo->line_buf_len);
3108
                        pThreadInfo->lines[0] = benchCalloc(
150✔
3109
                            1, pThreadInfo->line_buf_len, true);
75✔
3110
                        pThreadInfo->sml_json_value_array =
75✔
3111
                            (char **)benchCalloc(
75✔
3112
                                pThreadInfo->ntables, sizeof(char *), true);
3113
                        for (int t = 0; t < pThreadInfo->ntables; t++) {
222✔
3114
                            generateSmlJsonValues(
147✔
3115
                                pThreadInfo->sml_json_value_array, stbInfo, t);
3116
                        }
3117
                    }
3118
                }
3119
                break;
229✔
3120
            }
3121
            case TAOSC_IFACE: {
255✔
3122
                pThreadInfo->conn = initBenchConn();
255✔
3123
                if (pThreadInfo->conn == NULL) {
255✔
3124
                    errorPrint("%s() failed to connect\n", __func__);
×
3125
                    FREE_RESOURCE();
×
3126
                    return -1;
×
3127
                }
3128
                char* command = benchCalloc(1, SHORT_1K_SQL_BUFF_LEN, false);
255✔
3129
                snprintf(command, SHORT_1K_SQL_BUFF_LEN,
255✔
3130
                        g_arguments->escape_character
255✔
3131
                        ? "USE `%s`"
3132
                        : "USE %s",
3133
                        database->dbName);
3134
                if (queryDbExecCall(pThreadInfo->conn, command)) {
255✔
3135
                    errorPrint("taos select database(%s) failed\n",
×
3136
                               database->dbName);
3137
                    FREE_RESOURCE();
×
3138
                    tmfree(command);
×
3139
                    return -1;
×
3140
                }
3141
                tmfree(command);
255✔
3142
                command = NULL;
255✔
3143

3144
                if (stbInfo->interlaceRows > 0) {
255✔
3145
                    pThreadInfo->buffer = new_ds(0);
6✔
3146
                } else {
3147
                    pThreadInfo->buffer =
249✔
3148
                        benchCalloc(1, TSDB_MAX_ALLOWED_SQL_LEN, true);
249✔
3149
                    if (g_arguments->check_sql) {
249✔
3150
                        pThreadInfo->csql =
4✔
3151
                            benchCalloc(1, TSDB_MAX_ALLOWED_SQL_LEN, true);
4✔
3152
                        memset(pThreadInfo->csql, 0, TSDB_MAX_ALLOWED_SQL_LEN);
4✔
3153
                    }
3154
                }
3155

3156
                break;
255✔
3157
            }
3158
            default:
×
3159
                break;
×
3160
        }
3161
    }
3162

3163
    infoPrint("Estimate memory usage: %.2fMB\n",
206✔
3164
              (double)g_memoryUsage / 1048576);
3165
    prompt(0);
206✔
3166

3167
    // create threads
3168
    for (int i = 0; (i < threads && !g_arguments->terminate); i++) {
745✔
3169
        threadInfo *pThreadInfo = infos + i;
539✔
3170
        if (stbInfo->interlaceRows > 0) {
539✔
3171
            pthread_create(pids + i, NULL,
47✔
3172
                           syncWriteInterlace, pThreadInfo);
3173
        } else {
3174
            pthread_create(pids + i, NULL,
492✔
3175
                           syncWriteProgressive, pThreadInfo);
3176
        }
3177
    }
3178

3179
    int64_t start = toolsGetTimestampUs();
206✔
3180

3181
    // wait threads
3182
    for (int i = 0; (i < threads); i++) {
745✔
3183
        if(pids[i] != 0) {
539✔
3184
            infoPrint(" pthread_join %d ...\n", i);
539✔
3185
            pthread_join(pids[i], NULL);
539✔
3186
        } else {
3187
            infoPrint(" pthread_join %d is null , not wait.\n", i);
×
3188
        }
3189
    }
3190

3191
    int64_t end = toolsGetTimestampUs()+1;
206✔
3192

3193
    if (g_arguments->terminate)  toolsMsleep(100);
206✔
3194

3195
    BArray *  total_delay_list = benchArrayInit(1, sizeof(int64_t));
206✔
3196
    int64_t   totalDelay = 0;
206✔
3197
    uint64_t  totalInsertRows = 0;
206✔
3198

3199
    // free threads resource
3200
    for (int i = 0; i < threads; i++) {
745✔
3201
        threadInfo *pThreadInfo = infos + i;
539✔
3202
        // free check sql
3203
        if (pThreadInfo->csql) {
539✔
3204
            tmfree(pThreadInfo->csql);
4✔
3205
            pThreadInfo->csql = NULL;
4✔
3206
        }
3207

3208
        int protocol = stbInfo->lineProtocol;
539✔
3209
        switch (stbInfo->iface) {
539✔
3210
            case REST_IFACE:
13✔
3211
                if (g_arguments->terminate)
13✔
3212
                    toolsMsleep(100);
×
3213
                destroySockFd(pThreadInfo->sockfd);
13✔
3214
                if (stbInfo->interlaceRows > 0) {
13✔
3215
                    free_ds(&pThreadInfo->buffer);
×
3216
                } else {
3217
                    tmfree(pThreadInfo->buffer);
13✔
3218
                    pThreadInfo->buffer = NULL;
13✔
3219
                }
3220
                break;
13✔
3221
            case SML_REST_IFACE:
37✔
3222
                if (g_arguments->terminate)
37✔
3223
                    toolsMsleep(100);
×
3224
                tmfree(pThreadInfo->buffer);
37✔
3225
                // on-purpose no break here
3226
            case SML_IFACE:
229✔
3227
                if (TSDB_SML_JSON_PROTOCOL != protocol
229✔
3228
                        && SML_JSON_TAOS_FORMAT != protocol) {
142✔
3229
                    for (int t = 0; t < pThreadInfo->ntables; t++) {
306✔
3230
                        tmfree(pThreadInfo->sml_tags[t]);
203✔
3231
                    }
3232
                    for (int j = 0; j < g_arguments->reqPerReq; j++) {
61,041✔
3233
                        tmfree(pThreadInfo->lines[j]);
60,938✔
3234
                    }
3235
                    tmfree(pThreadInfo->sml_tags);
103✔
3236
                    pThreadInfo->sml_tags = NULL;
103✔
3237
                } else {
3238
                    for (int t = 0; t < pThreadInfo->ntables; t++) {
372✔
3239
                        tmfree(pThreadInfo->sml_tags_json_array[t]);
246✔
3240
                    }
3241
                    tmfree(pThreadInfo->sml_tags_json_array);
126✔
3242
                    pThreadInfo->sml_tags_json_array = NULL;
126✔
3243
                    if (pThreadInfo->sml_json_tags) {
126✔
3244
                        tools_cJSON_Delete(pThreadInfo->sml_json_tags);
126✔
3245
                        pThreadInfo->sml_json_tags = NULL;
126✔
3246
                    }
3247
                    if (pThreadInfo->json_array) {
126✔
3248
                        tools_cJSON_Delete(pThreadInfo->json_array);
×
3249
                        pThreadInfo->json_array = NULL;
×
3250
                    }
3251
                }
3252
                if (pThreadInfo->lines) {
229✔
3253
                    if ((0 == stbInfo->interlaceRows)
229✔
3254
                            && (TSDB_SML_JSON_PROTOCOL == protocol)) {
193✔
3255
                        tmfree(pThreadInfo->lines[0]);
75✔
3256
                        for (int t = 0; t < pThreadInfo->ntables; t++) {
222✔
3257
                            tmfree(pThreadInfo->sml_json_value_array[t]);
147✔
3258
                        }
3259
                        tmfree(pThreadInfo->sml_json_value_array);
75✔
3260
                    }
3261
                    tmfree(pThreadInfo->lines);
229✔
3262
                    pThreadInfo->lines = NULL;
229✔
3263
                }
3264
                break;
229✔
3265

3266
            case STMT_IFACE:
42✔
3267
                taos_stmt_close(pThreadInfo->conn->stmt);
42✔
3268
                tmfree(pThreadInfo->bind_ts);
42✔
3269
                tmfree(pThreadInfo->bind_ts_array);
42✔
3270
                tmfree(pThreadInfo->bindParams);
42✔
3271
                tmfree(pThreadInfo->is_null);
42✔
3272
                break;
42✔
3273

3274
            case TAOSC_IFACE:
255✔
3275
                if (stbInfo->interlaceRows > 0) {
255✔
3276
                    free_ds(&pThreadInfo->buffer);
6✔
3277
                } else {
3278
                    tmfree(pThreadInfo->buffer);
249✔
3279
                    pThreadInfo->buffer = NULL;
249✔
3280
                }
3281
                break;
255✔
3282

3283
            default:
×
3284
                break;
×
3285
        }
3286
        totalInsertRows += pThreadInfo->totalInsertRows;
539✔
3287
        totalDelay += pThreadInfo->totalDelay;
539✔
3288
        benchArrayAddBatch(total_delay_list, pThreadInfo->delayList->pData,
539✔
3289
                pThreadInfo->delayList->size);
539✔
3290
        tmfree(pThreadInfo->delayList);
539✔
3291
        pThreadInfo->delayList = NULL;
539✔
3292
        //  free conn
3293
        if (pThreadInfo->conn) {
539✔
3294
            closeBenchConn(pThreadInfo->conn);
526✔
3295
            pThreadInfo->conn = NULL;
526✔
3296
        }
3297
    }
3298

3299
    // calculate result
3300
    qsort(total_delay_list->pData, total_delay_list->size,
206✔
3301
            total_delay_list->elemSize, compare);
3302

3303
    if (g_arguments->terminate)  toolsMsleep(100);
206✔
3304

3305
    free(pids);
206✔
3306
    free(infos);
206✔
3307

3308
    int ret = printTotalDelay(database, totalDelay,
206✔
3309
                              total_delay_list, threads,
3310
                    totalInsertRows, start, end);
3311
    benchArrayDestroy(total_delay_list);
206✔
3312
    if (g_fail || ret) {
206✔
3313
        return -1;
3✔
3314
    }
3315
    return 0;
203✔
3316
}
3317

3318
static int getStbInsertedRows(char* dbName, char* stbName, TAOS* taos) {
×
3319
    int rows = 0;
×
3320
    char command[SHORT_1K_SQL_BUFF_LEN];
3321
    snprintf(command, SHORT_1K_SQL_BUFF_LEN, "SELECT COUNT(*) FROM %s.%s",
×
3322
             dbName, stbName);
3323
    TAOS_RES* res = taos_query(taos, command);
×
3324
    int code = taos_errno(res);
×
3325
    if (code != 0) {
×
3326
        printErrCmdCodeStr(command, code, res);
×
3327
        return -1;
×
3328
    }
3329
    TAOS_ROW row = taos_fetch_row(res);
×
3330
    if (row == NULL) {
×
3331
        rows = 0;
×
3332
    } else {
3333
        rows = (int)*(int64_t*)row[0];
×
3334
    }
3335
    taos_free_result(res);
×
3336
    return rows;
×
3337
}
3338

3339
static void create_tsma(TSMA* tsma, SBenchConn* conn, char* stbName) {
×
3340
    char command[SHORT_1K_SQL_BUFF_LEN];
3341
    int len = snprintf(command, SHORT_1K_SQL_BUFF_LEN,
×
3342
                       "CREATE sma INDEX %s ON %s function(%s) "
3343
                       "INTERVAL (%s) SLIDING (%s)",
3344
                       tsma->name, stbName, tsma->func,
3345
                       tsma->interval, tsma->sliding);
3346
    if (tsma->custom) {
×
3347
        snprintf(command + len, SHORT_1K_SQL_BUFF_LEN - len,
×
3348
                 " %s", tsma->custom);
3349
    }
3350
    int code = queryDbExecCall(conn, command);
×
3351
    if (code == 0) {
×
3352
        infoPrint("successfully create tsma with command <%s>\n", command);
×
3353
    }
3354
}
×
3355

3356
static void* create_tsmas(void* args) {
×
3357
    tsmaThreadInfo* pThreadInfo = (tsmaThreadInfo*) args;
×
3358
    int inserted_rows = 0;
×
3359
    SBenchConn* conn = initBenchConn();
×
3360
    if (NULL == conn) {
×
3361
        return NULL;
×
3362
    }
3363
    int finished = 0;
×
3364
    if (taos_select_db(conn->taos, pThreadInfo->dbName)) {
×
3365
        errorPrint("failed to use database (%s)\n", pThreadInfo->dbName);
×
3366
        closeBenchConn(conn);
×
3367
        return NULL;
×
3368
    }
3369
    while (finished < pThreadInfo->tsmas->size && inserted_rows >= 0) {
×
3370
        inserted_rows = (int)getStbInsertedRows(
×
3371
                pThreadInfo->dbName, pThreadInfo->stbName, conn->taos);
3372
        for (int i = 0; i < pThreadInfo->tsmas->size; i++) {
×
3373
            TSMA* tsma = benchArrayGet(pThreadInfo->tsmas, i);
×
3374
            if (!tsma->done &&  inserted_rows >= tsma->start_when_inserted) {
×
3375
                create_tsma(tsma, conn, pThreadInfo->stbName);
×
3376
                tsma->done = true;
×
3377
                finished++;
×
3378
                break;
×
3379
            }
3380
        }
3381
        toolsMsleep(10);
×
3382
    }
3383
    benchArrayDestroy(pThreadInfo->tsmas);
×
3384
    closeBenchConn(conn);
×
3385
    return NULL;
×
3386
}
3387

3388
static int32_t createStream(SSTREAM* stream) {
2✔
3389
    int32_t code = -1;
2✔
3390
    char * command = benchCalloc(1, TSDB_MAX_ALLOWED_SQL_LEN, false);
2✔
3391
    snprintf(command, TSDB_MAX_ALLOWED_SQL_LEN, "DROP STREAM IF EXISTS %s",
2✔
3392
             stream->stream_name);
2✔
3393
    infoPrint("%s\n", command);
2✔
3394
    SBenchConn* conn = initBenchConn();
2✔
3395
    if (NULL == conn) {
2✔
3396
        goto END_STREAM;
×
3397
    }
3398

3399
    code = queryDbExecCall(conn, command);
2✔
3400
    int32_t trying = g_arguments->keep_trying;
2✔
3401
    while (code && trying) {
2✔
3402
        infoPrint("will sleep %"PRIu32" milliseconds then re-drop stream %s\n",
×
3403
                          g_arguments->trying_interval, stream->stream_name);
3404
        toolsMsleep(g_arguments->trying_interval);
×
3405
        code = queryDbExecCall(conn, command);
×
3406
        if (trying != -1) {
×
3407
            trying--;
×
3408
        }
3409
    }
3410

3411
    if (code) {
2✔
3412
        closeBenchConn(conn);
×
3413
        goto END_STREAM;
×
3414
    }
3415

3416
    memset(command, 0, TSDB_MAX_ALLOWED_SQL_LEN);
2✔
3417
    int pos = snprintf(command, TSDB_MAX_ALLOWED_SQL_LEN,
2✔
3418
            "CREATE STREAM IF NOT EXISTS %s ", stream->stream_name);
2✔
3419
    if (stream->trigger_mode[0] != '\0') {
2✔
3420
        pos += snprintf(command + pos, TSDB_MAX_ALLOWED_SQL_LEN - pos,
1✔
3421
                "TRIGGER %s ", stream->trigger_mode);
1✔
3422
    }
3423
    if (stream->watermark[0] != '\0') {
2✔
3424
        pos += snprintf(command + pos, TSDB_MAX_ALLOWED_SQL_LEN - pos,
1✔
3425
                "WATERMARK %s ", stream->watermark);
1✔
3426
    }
3427
    if (stream->ignore_update[0] != '\0') {
2✔
3428
        pos += snprintf(command + pos, TSDB_MAX_ALLOWED_SQL_LEN - pos,
1✔
3429
                "IGNORE UPDATE %s ", stream->ignore_update);
1✔
3430
    }
3431
    if (stream->ignore_expired[0] != '\0') {
2✔
3432
        pos += snprintf(command + pos, TSDB_MAX_ALLOWED_SQL_LEN - pos,
1✔
3433
                "IGNORE EXPIRED %s ", stream->ignore_expired);
1✔
3434
    }
3435
    if (stream->fill_history[0] != '\0') {
2✔
3436
        pos += snprintf(command + pos, TSDB_MAX_ALLOWED_SQL_LEN - pos,
1✔
3437
                "FILL_HISTORY %s ", stream->fill_history);
1✔
3438
    }
3439
    pos += snprintf(command + pos, TSDB_MAX_ALLOWED_SQL_LEN - pos,
2✔
3440
            "INTO %s ", stream->stream_stb);
2✔
3441
    if (stream->stream_stb_field[0] != '\0') {
2✔
3442
        pos += snprintf(command + pos, TSDB_MAX_ALLOWED_SQL_LEN - pos,
1✔
3443
                "%s ", stream->stream_stb_field);
1✔
3444
    }
3445
    if (stream->stream_tag_field[0] != '\0') {
2✔
3446
        pos += snprintf(command + pos, TSDB_MAX_ALLOWED_SQL_LEN - pos,
1✔
3447
                "TAGS%s ", stream->stream_tag_field);
1✔
3448
    }
3449
    if (stream->subtable[0] != '\0') {
2✔
3450
        pos += snprintf(command + pos, TSDB_MAX_ALLOWED_SQL_LEN - pos,
1✔
3451
                "SUBTABLE%s ", stream->subtable);
1✔
3452
    }
3453
    snprintf(command + pos, TSDB_MAX_ALLOWED_SQL_LEN - pos,
2✔
3454
            "as %s", stream->source_sql);
2✔
3455
    infoPrint("%s\n", command);
2✔
3456

3457
    code = queryDbExecCall(conn, command);
2✔
3458
    trying = g_arguments->keep_trying;
2✔
3459
    while (code && trying) {
2✔
3460
        infoPrint("will sleep %"PRIu32" milliseconds "
×
3461
                  "then re-create stream %s\n",
3462
                  g_arguments->trying_interval, stream->stream_name);
3463
        toolsMsleep(g_arguments->trying_interval);
×
3464
        code = queryDbExecCall(conn, command);
×
3465
        if (trying != -1) {
×
3466
            trying--;
×
3467
        }
3468
    }
3469

3470
    closeBenchConn(conn);
2✔
3471
END_STREAM:
2✔
3472
    tmfree(command);
2✔
3473
    return code;
2✔
3474
}
3475

3476
int insertTestProcess() {
154✔
3477
    prompt(0);
154✔
3478

3479
    encodeAuthBase64();
154✔
3480
    for (int i = 0; i < g_arguments->databases->size; i++) {
302✔
3481
        if (REST_IFACE == g_arguments->iface) {
155✔
3482
            if (0 != convertServAddr(g_arguments->iface,
3✔
3483
                                     false,
3484
                                     1)) {
3485
                return -1;
×
3486
            }
3487
        }
3488
        SDataBase * database = benchArrayGet(g_arguments->databases, i);
155✔
3489

3490
        if (database->drop && !(g_arguments->supplementInsert)) {
155✔
3491
            if (database->superTbls) {
146✔
3492
                SSuperTable * stbInfo = benchArrayGet(database->superTbls, 0);
145✔
3493
                if (stbInfo && (REST_IFACE == stbInfo->iface)) {
145✔
3494
                    if (0 != convertServAddr(stbInfo->iface,
3✔
3495
                                             stbInfo->tcpTransfer,
3✔
3496
                                             stbInfo->lineProtocol)) {
3✔
3497
                        return -1;
×
3498
                    }
3499
                }
3500
            }
3501
            if (createDatabase(database)) {
146✔
3502
                errorPrint("failed to create database (%s)\n",
7✔
3503
                        database->dbName);
3504
                return -1;
7✔
3505
            }
3506
            succPrint("created database (%s)\n", database->dbName);
139✔
3507
        }
3508
    }
3509
    for (int i = 0; i < g_arguments->databases->size; i++) {
294✔
3510
        SDataBase * database = benchArrayGet(g_arguments->databases, i);
148✔
3511
        if (database->superTbls) {
148✔
3512
            for (int j = 0; j < database->superTbls->size; j++) {
361✔
3513
                SSuperTable * stbInfo = benchArrayGet(database->superTbls, j);
215✔
3514
                if (stbInfo->iface != SML_IFACE
215✔
3515
                        && stbInfo->iface != SML_REST_IFACE
157✔
3516
                        && !stbInfo->childTblExists) {
143✔
3517
#ifdef WEBSOCKET
3518
                    if (g_arguments->websocket) {
134✔
3519
                        dropSuperTable(database, stbInfo);
×
3520
                    }
3521
#endif
3522
                    if (getSuperTableFromServer(database, stbInfo) != 0) {
134✔
3523
                        if (createSuperTable(database, stbInfo)) {
132✔
3524
                            return -1;
1✔
3525
                        }
3526
                    }
3527
                }
3528
                fillChildTblName(database, stbInfo);
214✔
3529
                if (0 != prepareSampleData(database, stbInfo)) {
214✔
3530
                    return -1;
×
3531
                }
3532
            }
3533
        }
3534
    }
3535

3536
    if (g_arguments->taosc_version == 3) {
146✔
3537
        for (int i = 0; i < g_arguments->databases->size; i++) {
293✔
3538
            SDataBase* database = benchArrayGet(g_arguments->databases, i);
147✔
3539
            if (database->superTbls) {
147✔
3540
                for (int j = 0; (j < database->superTbls->size
146✔
3541
                        && !g_arguments->terminate); j++) {
360✔
3542
                    SSuperTable* stbInfo =
3543
                        benchArrayGet(database->superTbls, j);
214✔
3544
                    if (stbInfo->tsmas == NULL) {
214✔
3545
                        continue;
78✔
3546
                    }
3547
                    if (stbInfo->tsmas->size > 0) {
136✔
3548
                        tsmaThreadInfo* pThreadInfo =
3549
                            benchCalloc(1, sizeof(tsmaThreadInfo), true);
×
3550
                        pthread_t tsmas_pid = {0};
×
3551
                        pThreadInfo->dbName = database->dbName;
×
3552
                        pThreadInfo->stbName = stbInfo->stbName;
×
3553
                        pThreadInfo->tsmas = stbInfo->tsmas;
×
3554
                        pthread_create(&tsmas_pid, NULL,
×
3555
                                       create_tsmas, pThreadInfo);
3556
                    }
3557
                }
3558
            }
3559
        }
3560
    }
3561

3562
    if (createChildTables()) return -1;
146✔
3563

3564
    if (g_arguments->taosc_version == 3) {
146✔
3565
        for (int j = 0; j < g_arguments->streams->size; j++) {
148✔
3566
            SSTREAM * stream = benchArrayGet(g_arguments->streams, j);
2✔
3567
            if (stream->drop) {
2✔
3568
                if (createStream(stream)) {
2✔
3569
                    return -1;
×
3570
                }
3571
            }
3572
        }
3573
    }
3574

3575
    // create sub threads for inserting data
3576
    for (int i = 0; i < g_arguments->databases->size; i++) {
288✔
3577
        SDataBase * database = benchArrayGet(g_arguments->databases, i);
147✔
3578
        if (database->superTbls) {
147✔
3579
            for (uint64_t j = 0; j < database->superTbls->size; j++) {
353✔
3580
                SSuperTable * stbInfo = benchArrayGet(database->superTbls, j);
212✔
3581
                if (stbInfo->insertRows == 0) {
212✔
3582
                    continue;
4✔
3583
                }
3584
                prompt(stbInfo->non_stop);
208✔
3585
                if (startMultiThreadInsertData(database, stbInfo)) {
208✔
3586
                    return -1;
5✔
3587
                }
3588
            }
3589
        }
3590
    }
3591
    return 0;
141✔
3592
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc