• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4473

08 Jul 2025 09:38AM UTC coverage: 62.922% (+0.7%) from 62.22%
#4473

push

travis-ci

web-flow
Merge pull request #31712 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch

158525 of 321496 branches covered (49.31%)

Branch coverage included in aggregate %.

56 of 60 new or added lines in 13 files covered. (93.33%)

1333 existing lines in 67 files now uncovered.

245526 of 320647 relevant lines covered (76.57%)

17689640.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.17
/tools/taos-tools/src/benchInsert.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the MIT license as published by the Free Software
6
 * Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 */
12

13
#include <bench.h>
14
#include "benchLog.h"
15
#include "wrapDb.h"
16
#include <benchData.h>
17
#include <benchInsertMix.h>
18

19
static int32_t stmt2BindAndSubmit(
20
        threadInfo *pThreadInfo,
21
        SChildTable *childTbl,
22
        int64_t *timestamp, uint64_t i, char *ttl, int32_t *pkCur, int32_t *pkCnt, int64_t *delay1,
23
        int64_t *delay3, int64_t* startTs, int64_t* endTs, int32_t w);
24
TAOS_STMT2* initStmt2(TAOS* taos, bool single);        
25

26
/*
 * Error exit for functions that have `pids` and `infos` in scope:
 * releases both with tmfree() and returns -1 from the enclosing function.
 */
#define FREE_PIDS_INFOS_RETURN_MINUS_1() \
    do {                                 \
        tmfree(pids);                    \
        tmfree(infos);                   \
        return -1;                       \
    } while (0)
32

33
/*
 * Releases the per-thread resources used while inserting.
 * Expects `pThreadInfo`, `pids` and `infos` to be in scope at the call site:
 * closes the thread's connection (if any), destroys its delay list and
 * frees both arrays.
 *
 * The last line deliberately carries no trailing backslash, so the macro
 * ends at `while (0)` regardless of what follows it in the file.
 */
#define FREE_RESOURCE()                             \
    do {                                            \
        if (pThreadInfo->conn) {                    \
            closeBenchConn(pThreadInfo->conn);      \
        }                                           \
        benchArrayDestroy(pThreadInfo->delayList);  \
        tmfree(pids);                               \
        tmfree(infos);                              \
    } while (0)
41

42
// REST
43
static int getSuperTableFromServerRest(
1✔
44
    SDataBase* database, SSuperTable* stbInfo, char *command) {
45

46
    // TODO(zero): it will create super table based on this error code.
47
    return TSDB_CODE_NOT_FOUND;
1✔
48
    // TODO(me): finish full implementation
49
#if 0
50
    int sockfd = createSockFd();
51
    if (sockfd < 0) {
52
        return -1;
53
    }
54

55
    int code = postProcessSql(command,
56
                         database->dbName,
57
                         database->precision,
58
                         REST_IFACE,
59
                         0,
60
                         g_arguments->port,
61
                         false,
62
                         sockfd,
63
                         NULL);
64

65
    destroySockFd(sockfd);
66
#endif   // 0
67
}
68

69
static int getSuperTableFromServerTaosc(
236✔
70
        SDataBase *database, SSuperTable *stbInfo, char *command) {
71
    TAOS_RES *res;
72
    TAOS_ROW row = NULL;
236✔
73
    SBenchConn *conn = initBenchConn();
236✔
74
    if (NULL == conn) {
236!
75
        return TSDB_CODE_FAILED;
×
76
    }
77

78
    res = taos_query(conn->taos, command);
236✔
79
    int32_t code = taos_errno(res);
236✔
80
    if (code != 0) {
236✔
81
        infoPrint("stable %s does not exist, will create one\n",
229✔
82
                  stbInfo->stbName);
83
        closeBenchConn(conn);
229✔
84
        return TSDB_CODE_NOT_FOUND;
229✔
85
    }
86
    infoPrint("find stable<%s>, will get meta data from server\n",
7✔
87
              stbInfo->stbName);
88

89
    // Check if the the existing super table matches exactly with the definitions in the JSON file.
90
    // If a hash table were used, the time complexity would be only O(n).
91
    // But taosBenchmark does not incorporate a hash table, the time complexity of the loop traversal is O(n^2).
92
    bool isTitleRow = true;
7✔
93
    uint32_t tag_count = 0;
7✔
94
    uint32_t col_count = 0;
7✔
95

96
    int fieldsNum = taos_num_fields(res);
7✔
97
    TAOS_FIELD_E* fields = taos_fetch_fields_e(res);
7✔
98

99
    if (fieldsNum < TSDB_MAX_DESCRIBE_METRIC || !fields) {
7!
100
        errorPrint("%s", "failed to fetch fields\n");
×
101
        taos_free_result(res);
×
102
        closeBenchConn(conn);
×
103
        return TSDB_CODE_FAILED;
×
104
    }
105

106
    while ((row = taos_fetch_row(res)) != NULL) {
81✔
107
        if (isTitleRow) {
74✔
108
            isTitleRow = false;
7✔
109
            continue;
7✔
110
        }
111
        int32_t *lengths = taos_fetch_lengths(res);
67✔
112
        if (lengths == NULL) {
67!
113
            errorPrint("%s", "failed to execute taos_fetch_length\n");
×
114
            taos_free_result(res);
×
115
            closeBenchConn(conn);
×
116
            return TSDB_CODE_FAILED;
×
117
        }
118
        if (strncasecmp((char *) row[TSDB_DESCRIBE_METRIC_NOTE_INDEX], "tag",
67✔
119
                        strlen("tag")) == 0) {
120
            if (stbInfo->tags == NULL || stbInfo->tags->size == 0 || tag_count >= stbInfo->tags->size) {
9!
121
                errorPrint("%s", "existing stable tag mismatched with that defined in JSON\n");
×
122
                taos_free_result(res);
×
123
                closeBenchConn(conn);
×
124
                return TSDB_CODE_FAILED;
×
125
            }
126
            uint8_t tagType = convertStringToDatatype(
18✔
127
                    (char *) row[TSDB_DESCRIBE_METRIC_TYPE_INDEX],
9✔
128
                    lengths[TSDB_DESCRIBE_METRIC_TYPE_INDEX], &(fields[TSDB_DESCRIBE_METRIC_TYPE_INDEX].precision));
9✔
129
            char *tagName = (char *) row[TSDB_DESCRIBE_METRIC_FIELD_INDEX];
9✔
130
            if (!searchBArray(stbInfo->tags, tagName,
9!
131
                              lengths[TSDB_DESCRIBE_METRIC_FIELD_INDEX], tagType)) {
132
                errorPrint("existing stable tag:%s not found in JSON tags.\n", tagName);
×
133
                taos_free_result(res);
×
134
                closeBenchConn(conn);
×
135
                return TSDB_CODE_FAILED;
×
136
            }
137
            tag_count += 1;
9✔
138
        } else {
139
            if (stbInfo->cols == NULL || stbInfo->cols->size == 0 || col_count >= stbInfo->cols->size) {
58!
140
                errorPrint("%s", "existing stable column mismatched with that defined in JSON\n");
×
141
                taos_free_result(res);
×
142
                closeBenchConn(conn);
×
143
                return TSDB_CODE_FAILED;
×
144
            }
145
            uint8_t colType = convertStringToDatatype(
116✔
146
                    (char *) row[TSDB_DESCRIBE_METRIC_TYPE_INDEX],
58✔
147
                    lengths[TSDB_DESCRIBE_METRIC_TYPE_INDEX], &(fields[TSDB_DESCRIBE_METRIC_TYPE_INDEX].precision));
58✔
148
            char * colName = (char *) row[TSDB_DESCRIBE_METRIC_FIELD_INDEX];
58✔
149
            if (!searchBArray(stbInfo->cols, colName,
58!
150
                              lengths[TSDB_DESCRIBE_METRIC_FIELD_INDEX], colType)) {
151
                errorPrint("existing stable col:%s not found in JSON cols.\n", colName);
×
152
                taos_free_result(res);
×
153
                closeBenchConn(conn);
×
154
                return TSDB_CODE_FAILED;
×
155
            }
156
            col_count += 1;
58✔
157
        }
158
    }  // end while
159
    taos_free_result(res);
7✔
160
    closeBenchConn(conn);
7✔
161
    if (tag_count != stbInfo->tags->size) {
7!
162
        errorPrint("%s", "existing stable tag mismatched with that defined in JSON\n");
×
163
        return TSDB_CODE_FAILED;
×
164
    }
165

166
    if (col_count != stbInfo->cols->size) {
7!
167
        errorPrint("%s", "existing stable column mismatched with that defined in JSON\n");
×
168
        return TSDB_CODE_FAILED;
×
169
    }
170

171
    return TSDB_CODE_SUCCESS;
7✔
172
}
173

174

175
/*
 * Builds "DESCRIBE `<db>`.`<stable>`" and hands it to the lookup routine that
 * matches stbInfo->iface (REST or native). Returns that routine's result.
 */
static int getSuperTableFromServer(SDataBase* database, SSuperTable* stbInfo) {
    char describeSql[SHORT_1K_SQL_BUFF_LEN] = "\0";

    snprintf(describeSql, SHORT_1K_SQL_BUFF_LEN,
             "DESCRIBE `%s`.`%s`", database->dbName,
             stbInfo->stbName);

    if (REST_IFACE == stbInfo->iface) {
        return getSuperTableFromServerRest(database, stbInfo, describeSql);
    }
    return getSuperTableFromServerTaosc(database, stbInfo, describeSql);
}
190

191
static int queryDbExec(SDataBase *database,
296✔
192
                       SSuperTable *stbInfo, char *command) {
193
    int ret = 0;
296✔
194
    if (isRest(stbInfo->iface)) {
296✔
195
        if (0 != convertServAddr(stbInfo->iface, false, 1)) {
1!
196
            errorPrint("%s", "Failed to convert server address\n");
×
197
            return -1;
×
198
        }
199
        int sockfd = createSockFd();
1✔
200
        if (sockfd < 0) {
1!
201
            ret = -1;
×
202
        } else {
203
            ret = queryDbExecRest(command,
1✔
204
                              database->dbName,
205
                              database->precision,
206
                              stbInfo->iface,
1✔
207
                              stbInfo->lineProtocol,
1✔
208
                              stbInfo->tcpTransfer,
1✔
209
                              sockfd);
210
            destroySockFd(sockfd);
1✔
211
        }
212
    } else {
213
        SBenchConn* conn = initBenchConn();
295✔
214
        if (NULL == conn) {
295!
215
            ret = -1;
×
216
        } else {
217
            ret = queryDbExecCall(conn, command);
295✔
218
            int32_t trying = g_arguments->keep_trying;
295✔
219
            while (ret && trying) {
295!
220
                infoPrint("will sleep %"PRIu32" milliseconds then re-execute command: %s\n",
×
221
                          g_arguments->trying_interval, command);
222
                toolsMsleep(g_arguments->trying_interval);
×
223
                ret = queryDbExecCall(conn, command);
×
224
                if (trying != -1) {
×
225
                    trying--;
×
226
                }
227
            }
228
            if (0 != ret) {
295!
229
                ret = -1;
×
230
            }
231
            closeBenchConn(conn);
295✔
232
        }
233
    }
234

235
    return ret;
296✔
236
}
237

238
/*
 * Writes the non-empty compression attributes of `col` into `buf` as
 * "encode '<e>' compress '<c>' level '<l>' " (each part only when the
 * corresponding field is a non-empty string) and returns the number of
 * characters written; 0 means nothing was written.
 *
 * NOTE(review): `buf` has no size parameter and is filled with sprintf(), so
 * the caller must provide enough room for all three parts.
 */
int getCompressStr(Field* col, char* buf) {
    int written = 0;

    if (col->encode[0] != '\0') {
        written += sprintf(buf + written, "encode \'%s\' ", col->encode);
    }
    if (col->compress[0] != '\0') {
        written += sprintf(buf + written, "compress \'%s\' ", col->compress);
    }
    if (col->level[0] != '\0') {
        written += sprintf(buf + written, "level \'%s\' ", col->level);
    }

    return written;
}
252

253
/* True when an snprintf() result `n` failed or did not fit in `room` bytes. */
static bool snprintfOverflowed(int n, size_t room) {
    return n < 0 || (size_t)n >= room;
}

/*
 * Appends one formatted clause to `command` inside createSuperTable().
 * Uses that function's locals `command`, `length` and `n`, and jumps to its
 * `sql_overflow` label when the clause does not fit, so `length` never moves
 * past the end of the buffer.
 */
#define APPEND_STB_SQL(...)                                              \
    do {                                                                 \
        size_t room_ = (size_t)(TSDB_MAX_ALLOWED_SQL_LEN - length);      \
        n = snprintf(command + length, room_, __VA_ARGS__);              \
        if (snprintfOverflowed(n, room_)) {                              \
            goto sql_overflow;                                           \
        }                                                                \
        length += n;                                                     \
    } while (0)

/*
 * Builds and executes the CREATE TABLE statement for a super table.
 *
 * Steps:
 *   1. Returns 0 immediately when g_arguments->supplementInsert is set.
 *   2. Renders the column list and stores "(ts timestamp<cols>)" in
 *      stbInfo->colsOfCreateChildTable (kept by stbInfo, not freed here).
 *   3. Returns 0 without issuing SQL when no tags are defined.
 *   4. Renders the tag list, appends the optional COMMENT / DELAY /
 *      FILE_FACTOR / ROLLUP / MAX_DELAY / WATERMARK / SMA clauses and runs
 *      the statement through queryDbExec().
 *
 * Returns 0 on success (or when nothing had to be done), otherwise -1 or the
 * non-zero result of queryDbExec(). All temporary buffers are released on
 * every path.
 */
static int createSuperTable(SDataBase* database, SSuperTable* stbInfo) {
    if (g_arguments->supplementInsert) {
        return 0;
    }

    int   ret = -1;
    int   len = 0;      // bytes used in colsBuf, later in tagsBuf
    int   length = 0;   // bytes used in command
    int   n = 0;
    char *tagsBuf = NULL;

    // +1 keeps colsBuf a valid empty string even when no column is defined
    uint32_t col_buffer_len =
        (TSDB_COL_NAME_LEN + 15 + COMP_NAME_LEN*3) * stbInfo->cols->size + 1;
    char *colsBuf = benchCalloc(1, col_buffer_len, false);
    char *command = benchCalloc(1, TSDB_MAX_ALLOWED_SQL_LEN, false);

    for (int colIndex = 0; colIndex < stbInfo->cols->size; colIndex++) {
        Field *col = benchArrayGet(stbInfo->cols, colIndex);

        if (col->type == TSDB_DATA_TYPE_GEOMETRY && col->length < 21) {
            errorPrint("%s() LN%d, geometry filed len must be greater than 21 on %d\n", __func__, __LINE__,
                       colIndex);
            goto out;
        }

        size_t room = (size_t)col_buffer_len - (size_t)len;
        if (col->type == TSDB_DATA_TYPE_BINARY ||
            col->type == TSDB_DATA_TYPE_NCHAR ||
            col->type == TSDB_DATA_TYPE_VARBINARY ||
            col->type == TSDB_DATA_TYPE_GEOMETRY) {
            n = snprintf(colsBuf + len, room,
                    ",%s %s(%d)", col->name,
                    convertDatatypeToString(col->type), col->length);
        } else if (col->type == TSDB_DATA_TYPE_DECIMAL
                || col->type == TSDB_DATA_TYPE_DECIMAL64) {
            n = snprintf(colsBuf + len, room,
                    ",%s %s(%d,%d)", col->name,
                    convertDatatypeToString(col->type), col->precision, col->scale);
        } else {
            n = snprintf(colsBuf + len, room,
                    ",%s %s", col->name,
                    convertDatatypeToString(col->type));
        }
        bool overflow = snprintfOverflowed(n, room);

        // primary key: only the first column can carry it
        if (!overflow && stbInfo->primary_key && colIndex == 0) {
            len += n;
            room -= (size_t)n;
            n = snprintf(colsBuf + len, room, " %s", PRIMARY_KEY);
            overflow = snprintfOverflowed(n, room);
        }

        // compress key
        char keys[COMP_NAME_LEN*3] = "";
        if (!overflow && getCompressStr(col, keys) > 0) {
            len += n;
            room -= (size_t)n;
            n = snprintf(colsBuf + len, room, " %s", keys);
            overflow = snprintfOverflowed(n, room);
        }

        // len is only advanced by pieces that fully fit, so `room` can never
        // wrap around
        if (overflow) {
            errorPrint("%s() LN%d, snprintf overflow on %d\n",
                       __func__, __LINE__, colIndex);
            break;
        }
        len += n;
    }

    // save for creating child table
    stbInfo->colsOfCreateChildTable =
        (char *)benchCalloc(len + TIMESTAMP_BUFF_LEN, 1, true);

    snprintf(stbInfo->colsOfCreateChildTable, len + TIMESTAMP_BUFF_LEN,
             "(ts timestamp%s)", colsBuf);

    if (stbInfo->tags->size == 0) {
        ret = 0;
        goto out;
    }

    uint32_t tag_buffer_len = (TSDB_COL_NAME_LEN + 15) * stbInfo->tags->size;
    tagsBuf = benchCalloc(1, tag_buffer_len, false);
    len = 0;

    n = snprintf(tagsBuf, tag_buffer_len, "(");
    if (snprintfOverflowed(n, tag_buffer_len)) {
        errorPrint("%s() LN%d snprintf overflow\n",
                       __func__, __LINE__);
        goto out;
    }
    len += n;

    // A JSON tag is written without a trailing comma and ends the list.
    bool endedWithJson = false;
    for (int tagIndex = 0; tagIndex < stbInfo->tags->size; tagIndex++) {
        Field *tag = benchArrayGet(stbInfo->tags, tagIndex);
        size_t room = (size_t)tag_buffer_len - (size_t)len;
        bool   isJson = (tag->type == TSDB_DATA_TYPE_JSON);

        if (tag->type == TSDB_DATA_TYPE_BINARY ||
            tag->type == TSDB_DATA_TYPE_NCHAR ||
            tag->type == TSDB_DATA_TYPE_VARBINARY ||
            tag->type == TSDB_DATA_TYPE_GEOMETRY) {
            if (tag->type == TSDB_DATA_TYPE_GEOMETRY && tag->length < 21) {
                errorPrint("%s() LN%d, geometry filed len must be greater than 21 on %d\n", __func__, __LINE__,
                           tagIndex);
                goto out;
            }
            n = snprintf(tagsBuf + len, room,
                    "%s %s(%d),", tag->name,
                    convertDatatypeToString(tag->type), tag->length);
        } else if (isJson) {
            n = snprintf(tagsBuf + len, room,
                    "%s json", tag->name);
        } else {
            // decimal tags get no special (precision,scale) handling here
            n = snprintf(tagsBuf + len, room,
                    "%s %s,", tag->name,
                    convertDatatypeToString(tag->type));
        }

        if (snprintfOverflowed(n, room)) {
            errorPrint("%s() LN%d snprintf overflow on %d\n",
                       __func__, __LINE__, tagIndex);
            break;
        }
        len += n;

        if (isJson) {
            endedWithJson = true;
            break;
        }
    }
    if (!endedWithJson) {
        len -= 1;   // drop the trailing comma of the last tag
    }
    snprintf(tagsBuf + len, tag_buffer_len - len, ")");

    if (g_arguments->escape_character) {
        APPEND_STB_SQL("CREATE TABLE IF NOT EXISTS `%s`.`%s` (ts TIMESTAMP%s) TAGS %s",
                       database->dbName, stbInfo->stbName, colsBuf, tagsBuf);
    } else {
        APPEND_STB_SQL("CREATE TABLE IF NOT EXISTS %s.%s (ts TIMESTAMP%s) TAGS %s",
                       database->dbName, stbInfo->stbName, colsBuf, tagsBuf);
    }

    if (stbInfo->comment != NULL) {
        APPEND_STB_SQL(" COMMENT '%s'", stbInfo->comment);
    }
    if (stbInfo->delay >= 0) {
        APPEND_STB_SQL(" DELAY %d", stbInfo->delay);
    }
    if (stbInfo->file_factor >= 0) {
        APPEND_STB_SQL(" FILE_FACTOR %f", (float)stbInfo->file_factor / 100);
    }
    if (stbInfo->rollup != NULL) {
        APPEND_STB_SQL(" ROLLUP(%s)", stbInfo->rollup);
    }
    if (stbInfo->max_delay != NULL) {
        APPEND_STB_SQL(" MAX_DELAY %s", stbInfo->max_delay);
    }
    if (stbInfo->watermark != NULL) {
        APPEND_STB_SQL(" WATERMARK %s", stbInfo->watermark);
    }

    // not support ttl in super table

    bool first_sma = true;
    for (int i = 0; i < stbInfo->cols->size; i++) {
        Field *col = benchArrayGet(stbInfo->cols, i);
        if (!col->sma) {
            continue;
        }
        if (first_sma) {
            APPEND_STB_SQL(" SMA(%s", col->name);
            first_sma = false;
        } else {
            APPEND_STB_SQL(",%s", col->name);
        }
    }
    if (!first_sma) {
        APPEND_STB_SQL(")");
    }
    infoPrint("create stable: <%s>\n", command);

    ret = queryDbExec(database, stbInfo, command);
    goto out;

sql_overflow:
    // a truncated CREATE statement is never sent to the server
    errorPrint("%s() LN%d snprintf overflow\n",
                   __func__, __LINE__);

out:
    free(colsBuf);
    free(tagsBuf);
    free(command);
    return ret;
}

#undef APPEND_STB_SQL
469

470

471
/*
 * Queries "SHOW <db>.VGROUPS" twice over `conn`: the first pass counts the
 * rows, the second stores each row's first column as SVGroup.vgId.
 *
 * On success database->vgroups holds the count, database->vgArray holds one
 * SVGroup per counted row (fewer if g_arguments->terminate was raised while
 * filling it) and the count is returned. Returns -1 when either query fails.
 */
int32_t getVgroupsNative(SBenchConn *conn, SDataBase *database) {
    int     vgroups = 0;
    char    cmd[SHORT_1K_SQL_BUFF_LEN] = "\0";
    snprintf(cmd, SHORT_1K_SQL_BUFF_LEN,
            g_arguments->escape_character
            ? "SHOW `%s`.VGROUPS"
            : "SHOW %s.VGROUPS",
            database->dbName);

    // first pass: count the vgroups
    TAOS_RES* res = taos_query(conn->taos, cmd);
    int code = taos_errno(res);
    if (code) {
        printErrCmdCodeStr(cmd, code, res);
        return -1;
    }

    TAOS_ROW row = NULL;
    while ((row = taos_fetch_row(res)) != NULL) {
        vgroups++;
    }
    debugPrint("%s() LN%d, vgroups: %d\n", __func__, __LINE__, vgroups);
    taos_free_result(res);

    database->vgroups = vgroups;
    database->vgArray = benchArrayInit(vgroups, sizeof(SVGroup));
    for (int32_t v = 0; (v < vgroups
            && !g_arguments->terminate); v++) {
        SVGroup *vg = benchCalloc(1, sizeof(SVGroup), true);
        benchArrayPush(database->vgArray, vg);
    }

    // second pass: record the id of every vgroup
    res = taos_query(conn->taos, cmd);
    code = taos_errno(res);
    if (code) {
        printErrCmdCodeStr(cmd, code, res);
        return -1;
    }

    // Never index past the entries pushed above: the second query may return
    // more rows than the first, and the push loop may have stopped early.
    int32_t vgItem = 0;
    while ((size_t)vgItem < (size_t)database->vgArray->size
            && !g_arguments->terminate
            && (row = taos_fetch_row(res)) != NULL) {
        SVGroup *vg = benchArrayGet(database->vgArray, vgItem);
        vg->vgId = *(int32_t*)row[0];
        vgItem++;
    }
    taos_free_result(res);

    return vgroups;
}
520

521
/*
 * Renders the CREATE DATABASE statement for `database` into `command`.
 *
 * `command` must provide at least SHORT_1K_SQL_BUFF_LEN bytes; every write is
 * bounded by that size, which is what the callers in this file allocate.
 * `remainVnodes` is currently unused.
 *
 * The statement consists of the database name, every configured option
 * except "vgroups", a VGROUPS clause (the command-line value
 * g_arguments->inputted_vgroups wins over the configured one) and the
 * PRECISION clause.
 *
 * Returns the length written before the PRECISION clause, or -1 when the
 * database name or the VGROUPS clause does not fit.
 */
int geneDbCreateCmd(SDataBase *database, char *command, int remainVnodes) {
    (void)remainVnodes;

    int dataLen = 0;
    int n;

    // create database
    n = snprintf(command + dataLen, SHORT_1K_SQL_BUFF_LEN - dataLen,
                g_arguments->escape_character
                    ? "CREATE DATABASE IF NOT EXISTS `%s`"
                    : "CREATE DATABASE IF NOT EXISTS %s",
                    database->dbName);

    if (n < 0 || n >= SHORT_1K_SQL_BUFF_LEN - dataLen) {
        errorPrint("%s() LN%d snprintf overflow\n",
                           __func__, __LINE__);
        return -1;
    } else {
        dataLen += n;
    }

    int vgroups = g_arguments->inputted_vgroups;

    // append config items
    if (database->cfgs) {
        for (int i = 0; i < database->cfgs->size; i++) {
            SDbCfg* cfg = benchArrayGet(database->cfgs, i);

            // check vgroups
            if (trimCaseCmp(cfg->name, "vgroups") == 0) {
                if (vgroups > 0) {
                    // inputted vgroups by commandline
                    infoPrint("ignore config set vgroups %d\n", cfg->valueint);
                } else {
                    vgroups = cfg->valueint;
                }
                continue;
            }

            if (cfg->valuestring) {
                n = snprintf(command + dataLen,
                             SHORT_1K_SQL_BUFF_LEN - dataLen,
                             " %s %s", cfg->name, cfg->valuestring);
            } else {
                n = snprintf(command + dataLen,
                             SHORT_1K_SQL_BUFF_LEN - dataLen,
                             " %s %d", cfg->name, cfg->valueint);
            }
            if (n < 0 || n >= SHORT_1K_SQL_BUFF_LEN - dataLen) {
                errorPrint("%s() LN%d snprintf overflow on %d\n",
                           __func__, __LINE__, i);
                break;
            } else {
                dataLen += n;
            }
        }
    }

    // vgroups comes from the command line or, failing that, from the config
    if (vgroups > 0) {
        n = snprintf(command + dataLen, SHORT_1K_SQL_BUFF_LEN - dataLen,
                     " VGROUPS %d", vgroups);
        if (n < 0 || n >= SHORT_1K_SQL_BUFF_LEN - dataLen) {
            errorPrint("%s() LN%d snprintf overflow\n",
                               __func__, __LINE__);
            return -1;
        }
        dataLen += n;
    }

    // the PRECISION clause is not added to the returned length
    switch (database->precision) {
        case TSDB_TIME_PRECISION_MILLI:
            snprintf(command + dataLen, SHORT_1K_SQL_BUFF_LEN - dataLen,
                                " PRECISION \'ms\';");
            break;
        case TSDB_TIME_PRECISION_MICRO:
            snprintf(command + dataLen, SHORT_1K_SQL_BUFF_LEN - dataLen,
                                " PRECISION \'us\';");
            break;
        case TSDB_TIME_PRECISION_NANO:
            snprintf(command + dataLen, SHORT_1K_SQL_BUFF_LEN - dataLen,
                                " PRECISION \'ns\';");
            break;
        default:
            // unknown precision: no PRECISION clause is appended
            break;
    }

    return dataLen;
}
599

600
/*
 * Drops and re-creates `database` through the REST interface.
 *
 * A failed DROP is only logged. The CREATE statement comes from
 * geneDbCreateCmd() and is re-sent according to g_arguments->keep_trying
 * (-1 means the retry counter is never decremented), sleeping
 * g_arguments->trying_interval milliseconds between attempts.
 *
 * Returns -1 when no socket could be opened, otherwise the result of the
 * last CREATE attempt.
 */
int createDatabaseRest(SDataBase* database) {
    char sql[SHORT_1K_SQL_BUFF_LEN] = "\0";

    int sockfd = createSockFd();
    if (sockfd < 0) {
        return -1;
    }

    // drop exist database
    snprintf(sql, SHORT_1K_SQL_BUFF_LEN,
            g_arguments->escape_character
                ? "DROP DATABASE IF EXISTS `%s`;"
                : "DROP DATABASE IF EXISTS %s;",
             database->dbName);
    int32_t code = postProcessSql(sql, database->dbName, database->precision,
                                  REST_IFACE, 0, g_arguments->port, false,
                                  sockfd, NULL);
    if (code != 0) {
        errorPrint("Failed to drop database %s\n", database->dbName);
    }

    // create database
    geneDbCreateCmd(database, sql, INT_MAX);

    int32_t retries = g_arguments->keep_trying;
    for (;;) {
        code = postProcessSql(sql, database->dbName, database->precision,
                              REST_IFACE, 0, g_arguments->port, false,
                              sockfd, NULL);
        if (!code || !retries) {
            break;
        }
        infoPrint("will sleep %"PRIu32" milliseconds then "
                "re-create database %s\n",
                g_arguments->trying_interval, database->dbName);
        toolsMsleep(g_arguments->trying_interval);
        if (retries != -1) {
            retries--;
        }
    }

    destroySockFd(sockfd);
    return code;
}
663

664
/*
 * Runs "SHOW DNODES" over `conn` and sums, over all returned rows,
 * (column 3 - column 2), both read as int16_t.
 * NOTE(review): presumably column 2 is the vnodes in use and column 3 the
 * supported vnodes - confirm against the SHOW DNODES result layout.
 *
 * Returns that sum, or -1 when the query fails. On query failure this
 * function also closes `conn`, so the caller must not use or close it again.
 */
int32_t getRemainVnodes(SBenchConn *conn) {
    char command[SHORT_1K_SQL_BUFF_LEN] = "SHOW DNODES";

    TAOS_RES *res = taos_query(conn->taos, command);
    int32_t   code = taos_errno(res);
    if (code) {
        printErrCmdCodeStr(command, code, res);
        closeBenchConn(conn);
        return -1;
    }

    int total = 0;
    for (TAOS_ROW row = taos_fetch_row(res); row != NULL;
         row = taos_fetch_row(res)) {
        int16_t upper = *(int16_t*)(row[3]);
        int16_t lower = *(int16_t*)(row[2]);
        total += (upper - lower);
    }
    debugPrint("%s() LN%d, remainVnodes: %d\n",
               __func__, __LINE__, total);
    taos_free_result(res);
    return total;
}
684

685
/*
 * Drops and re-creates `database` over a native connection.
 *
 * Steps: drop every stream flagged with `drop`, drop the old database,
 * (with g_arguments->bind_vgroup) make sure vnodes remain, create the
 * database with the statement from geneDbCreateCmd() - retrying according to
 * g_arguments->keep_trying / trying_interval - and, with bind_vgroup, load
 * the vgroup list through getVgroupsNative().
 *
 * Returns 0 on success and -1 on any failure. The connection opened here is
 * closed on every path.
 */
int createDatabaseTaosc(SDataBase* database) {
    char command[SHORT_1K_SQL_BUFF_LEN] = "\0";
    // conn
    SBenchConn* conn = initBenchConn();
    if (NULL == conn) {
        return -1;
    }

    // drop stream in old database
    for (int i = 0; i < g_arguments->streams->size; i++) {
        SSTREAM* stream = benchArrayGet(g_arguments->streams, i);
        if (stream->drop) {
            snprintf(command, SHORT_1K_SQL_BUFF_LEN,
                        "DROP STREAM IF EXISTS %s;",
                    stream->stream_name);
            if (queryDbExecCall(conn, command)) {
                closeBenchConn(conn);
                return -1;
            }
            infoPrint("%s\n", command);
            memset(command, 0, SHORT_1K_SQL_BUFF_LEN);
        }
    }

    // drop old database
    snprintf(command, SHORT_1K_SQL_BUFF_LEN,
            g_arguments->escape_character
                ? "DROP DATABASE IF EXISTS `%s`;":
            "DROP DATABASE IF EXISTS %s;",
             database->dbName);
    if (0 != queryDbExecCall(conn, command)) {
        if (g_arguments->dsn) {
            // websocket
            warnPrint("%s", "TDengine cloud normal users have no privilege "
                      "to drop database! DROP DATABASE failure is ignored!\n");
        }
        closeBenchConn(conn);
        return -1;
    }

    // get remain vgroups
    int remainVnodes = INT_MAX;
    if (g_arguments->bind_vgroup) {
        remainVnodes = getRemainVnodes(conn);
        if (0 >= remainVnodes) {
            errorPrint("Remain vnodes %d, failed to create database\n",
                       remainVnodes);
            // getRemainVnodes() has already closed conn when it reports a
            // query failure (-1); for any other value the connection is
            // still open and has to be closed here.
            if (remainVnodes != -1) {
                closeBenchConn(conn);
            }
            return -1;
        }
    }

    // generate and execute create database sql
    geneDbCreateCmd(database, command, remainVnodes);
    int32_t code = queryDbExecCall(conn, command);
    int32_t trying = g_arguments->keep_trying;
    while (code && trying) {
        infoPrint("will sleep %"PRIu32" milliseconds then "
                  "re-create database %s\n",
                  g_arguments->trying_interval, database->dbName);
        toolsMsleep(g_arguments->trying_interval);
        code = queryDbExecCall(conn, command);
        // -1 means "retry forever": the counter is left untouched
        if (trying != -1) {
            trying--;
        }
    }

    if (code) {
        if (g_arguments->dsn) {
            warnPrint("%s", "TDengine cloud normal users have no privilege "
                      "to create database! CREATE DATABASE "
                      "failure is ignored!\n");
        }

        closeBenchConn(conn);
        errorPrint("\ncreate database %s failed!\n\n", database->dbName);
        return -1;
    }
    infoPrint("command to create database: <%s>\n", command);

    // malloc and get vgroup
    if (g_arguments->bind_vgroup) {
        int32_t vgroups = getVgroupsNative(conn, database);
        if (vgroups <= 0) {
            closeBenchConn(conn);
            errorPrint("Database %s's vgroups is %d\n",
                        database->dbName, vgroups);
            return -1;
        }
    }

    closeBenchConn(conn);
    return 0;
}
780

781
/*
 * Create `database` through the interface selected in g_arguments->iface.
 *
 * REST interfaces are handled by createDatabaseRest(); everything else goes
 * through createDatabaseTaosc(). The chosen implementation's return value is
 * passed back unchanged.
 */
int createDatabase(SDataBase* database) {
    if (isRest(g_arguments->iface)) {
        return createDatabaseRest(database);
    }
    return createDatabaseTaosc(database);
}
790

791
static int generateChildTblName(int len, char *buffer, SDataBase *database,
72,855✔
792
                                SSuperTable *stbInfo, uint64_t tableSeq, char* tagData, int i,
793
                                char *ttl) {
794
    if (0 == len) {
72,855✔
795
        memset(buffer, 0, TSDB_MAX_ALLOWED_SQL_LEN);
55,482✔
796
        len += snprintf(buffer + len,
55,482✔
797
                        TSDB_MAX_ALLOWED_SQL_LEN - len, "CREATE TABLE");
55,482✔
798
    }
799

800
    len += snprintf(
72,855✔
801
            buffer + len, TSDB_MAX_ALLOWED_SQL_LEN - len,
72,855✔
802
            g_arguments->escape_character
72,855✔
803
            ? " IF NOT EXISTS `%s`.`%s%" PRIu64 "` USING `%s`.`%s` TAGS (%s) %s "
804
            : " IF NOT EXISTS %s.%s%" PRIu64 " USING %s.%s TAGS (%s) %s ",
805
            database->dbName, stbInfo->childTblPrefix, tableSeq, database->dbName,
806
            stbInfo->stbName,
807
            tagData + i * stbInfo->lenOfTags, ttl);
72,855✔
808

809
    return len;
72,855✔
810
}
811

812
static int getBatchOfTblCreating(threadInfo *pThreadInfo,
72,867✔
813
                                         SSuperTable *stbInfo) {
814
    BArray *batchArray = stbInfo->batchTblCreatingNumbersArray;
72,867✔
815
    if (batchArray) {
72,867✔
816
        int *batch = benchArrayGet(
20✔
817
                batchArray, pThreadInfo->posOfTblCreatingBatch);
20✔
818
        pThreadInfo->posOfTblCreatingBatch++;
20✔
819
        if (pThreadInfo->posOfTblCreatingBatch == batchArray->size) {
20✔
820
            pThreadInfo->posOfTblCreatingBatch = 0;
4✔
821
        }
822
        return *batch;
20✔
823
    }
824
    return 0;
72,847✔
825
}
826

827
static int getIntervalOfTblCreating(threadInfo *pThreadInfo,
54,671✔
828
                                         SSuperTable *stbInfo) {
829
    BArray *intervalArray = stbInfo->batchTblCreatingIntervalsArray;
54,671✔
830
    if (intervalArray) {
54,671✔
831
        int *interval = benchArrayGet(
8✔
832
                intervalArray, pThreadInfo->posOfTblCreatingInterval);
8✔
833
        pThreadInfo->posOfTblCreatingInterval++;
8✔
834
        if (pThreadInfo->posOfTblCreatingInterval == intervalArray->size) {
8✔
835
            pThreadInfo->posOfTblCreatingInterval = 0;
2✔
836
        }
837
        return *interval;
8✔
838
    }
839
    return 0;
54,663✔
840
}
841

842
// table create thread
843
static void *createTable(void *sarg) {
1,255✔
844
    if (g_arguments->supplementInsert) {
1,255✔
845
        return NULL;
1✔
846
    }
847

848
    threadInfo  *pThreadInfo    = (threadInfo *)sarg;
1,254✔
849
    SDataBase   *database       = pThreadInfo->dbInfo;
1,254✔
850
    SSuperTable *stbInfo        = pThreadInfo->stbInfo;
1,254✔
851
    uint64_t    lastTotalCreate = 0;
1,254✔
852
    uint64_t    lastPrintTime   = toolsGetTimestampMs();
1,254✔
853
    int32_t     len             = 0;
1,254✔
854
    int32_t     batchNum        = 0;
1,254✔
855
    char ttl[SMALL_BUFF_LEN]    = "";
1,254✔
856

857
#ifdef LINUX
858
    prctl(PR_SET_NAME, "createTable");
1,254✔
859
#endif
860
    pThreadInfo->buffer = benchCalloc(1, TSDB_MAX_ALLOWED_SQL_LEN, false);
1,254✔
861
    infoPrint(
1,254✔
862
              "thread[%d] start creating table from %" PRIu64 " to %" PRIu64
863
              "\n",
864
              pThreadInfo->threadID, pThreadInfo->start_table_from,
865
              pThreadInfo->end_table_to);
866
    if (stbInfo->ttl != 0) {
1,254✔
867
        snprintf(ttl, SMALL_BUFF_LEN, "TTL %d", stbInfo->ttl);
20✔
868
    }
869

870
    // tag read from csv
871
    FILE *csvFile = openTagCsv(stbInfo, pThreadInfo->start_table_from);
1,254✔
872
    // malloc
873
    char* tagData = benchCalloc(TAG_BATCH_COUNT, stbInfo->lenOfTags, false);
1,254✔
874
    int         w = 0; // record tagData
1,251✔
875

876
    int smallBatchCount = 0;
1,251✔
877
    int index=  pThreadInfo->start_table_from;
1,251✔
878
    for (uint64_t i = pThreadInfo->start_table_from;
1,251✔
879
                  i <= pThreadInfo->end_table_to && !g_arguments->terminate;
74,110✔
880
                  i++) {
72,859✔
881
        if (g_arguments->terminate) {
72,856!
882
            goto create_table_end;
×
883
        }
884
        if (!stbInfo->use_metric || stbInfo->tags->size == 0) {
72,856!
885
            if (stbInfo->childTblCount == 1) {
×
886
                snprintf(pThreadInfo->buffer, TSDB_MAX_ALLOWED_SQL_LEN,
4✔
887
                         g_arguments->escape_character
4✔
888
                         ? "CREATE TABLE IF NOT EXISTS `%s`.`%s` %s;"
889
                         : "CREATE TABLE IF NOT EXISTS %s.%s %s;",
890
                         database->dbName, stbInfo->stbName,
891
                         stbInfo->colsOfCreateChildTable);
892
            } else {
893
                snprintf(pThreadInfo->buffer, TSDB_MAX_ALLOWED_SQL_LEN,
×
894
                         g_arguments->escape_character
×
895
                         ? "CREATE TABLE IF NOT EXISTS `%s`.`%s` %s;"
896
                         : "CREATE TABLE IF NOT EXISTS %s.%s %s;",
897
                         database->dbName,
898
                         stbInfo->childTblArray[i]->name,
×
899
                         stbInfo->colsOfCreateChildTable);
900
            }
901
            batchNum++;
×
902
        } else {
903
            if (0 == len) {
72,870✔
904
                batchNum = 0;
55,500✔
905
            }
906
            // generator
907
            if (w == 0) {
72,870✔
908
                if(!generateTagData(stbInfo, tagData, TAG_BATCH_COUNT, csvFile, NULL, index)) {
1,823!
909
                    goto create_table_end;
×
910
                }
911
            }
912

913
            len = generateChildTblName(len, pThreadInfo->buffer,
72,870✔
914
                                       database, stbInfo, i, tagData, w, ttl);
915
            // move next
916
            if (++w >= TAG_BATCH_COUNT) {
72,869✔
917
                // reset for gen again
918
                w = 0;
579✔
919
                index += TAG_BATCH_COUNT;
579✔
920
            }                           
921

922
            batchNum++;
72,869✔
923
            smallBatchCount++;
72,869✔
924

925
            int smallBatch = getBatchOfTblCreating(pThreadInfo, stbInfo);
72,869✔
926
            if ((!smallBatch || (smallBatchCount == smallBatch))
72,877✔
927
                    && (batchNum < stbInfo->batchTblCreatingNum)
72,869✔
928
                    && ((TSDB_MAX_ALLOWED_SQL_LEN - len) >=
18,206✔
929
                        (stbInfo->lenOfTags + EXTRA_SQL_LEN))) {
18,206!
930
                continue;
18,207✔
931
            } else {
932
                smallBatchCount = 0;
54,670✔
933
            }
934
        }
935

936
        len = 0;
54,656✔
937

938
        int ret = 0;
54,656✔
939
        debugPrint("thread[%d] creating table: %s\n", pThreadInfo->threadID,
54,656✔
940
                   pThreadInfo->buffer);
941
        // REST
942
        if (REST_IFACE == stbInfo->iface) {
54,643!
943
            ret = queryDbExecRest(pThreadInfo->buffer,
×
944
                                  database->dbName,
945
                                  database->precision,
946
                                  stbInfo->iface,
×
947
                                  stbInfo->lineProtocol,
×
948
                                  stbInfo->tcpTransfer,
×
949
                                  pThreadInfo->sockfd);
950
        } else {
951
            ret = queryDbExecCall(pThreadInfo->conn, pThreadInfo->buffer);
54,643✔
952
            int32_t trying = g_arguments->keep_trying;
54,665✔
953
            while (ret && trying) {
54,665!
954
                infoPrint("will sleep %"PRIu32" milliseconds then re-create "
×
955
                          "table %s\n",
956
                          g_arguments->trying_interval, pThreadInfo->buffer);
957
                toolsMsleep(g_arguments->trying_interval);
×
958
                ret = queryDbExecCall(pThreadInfo->conn, pThreadInfo->buffer);
×
959
                if (trying != -1) {
×
960
                    trying--;
×
961
                }
962
            }
963
        }
964

965
        if (0 != ret) {
54,689!
966
            g_fail = true;
×
967
            goto create_table_end;
×
968
        }
969
        uint64_t intervalOfTblCreating = getIntervalOfTblCreating(pThreadInfo,
54,689✔
970
                                                                  stbInfo);
971
        if (intervalOfTblCreating) {
54,683✔
972
            debugPrint("will sleep %"PRIu64" milliseconds "
8!
973
                       "for table creating interval\n", intervalOfTblCreating);
974
            toolsMsleep(intervalOfTblCreating);
8✔
975
        }
976

977
        pThreadInfo->tables_created += batchNum;
54,683✔
978
        batchNum = 0;
54,683✔
979
        uint64_t currentPrintTime = toolsGetTimestampMs();
54,683✔
980
        if (currentPrintTime - lastPrintTime > PRINT_STAT_INTERVAL) {
54,666!
981
            float speed = (pThreadInfo->tables_created - lastTotalCreate) * 1000 / (currentPrintTime - lastPrintTime);
×
982
            infoPrint("thread[%d] already created %" PRId64 " tables, peroid speed: %.0f tables/s\n",
×
983
                       pThreadInfo->threadID, pThreadInfo->tables_created, speed);
984
            lastPrintTime   = currentPrintTime;
×
985
            lastTotalCreate = pThreadInfo->tables_created;
×
986
        }
987
    }
988

989
    if (0 != len) {
1,254✔
990
        int ret = 0;
815✔
991
        debugPrint("thread[%d] creating table: %s\n", pThreadInfo->threadID,
815✔
992
                   pThreadInfo->buffer);
993
        // REST
994
        if (REST_IFACE == stbInfo->iface) {
815✔
995
            ret = queryDbExecRest(pThreadInfo->buffer,
8✔
996
                                  database->dbName,
997
                                  database->precision,
998
                                  stbInfo->iface,
8✔
999
                                  stbInfo->lineProtocol,
8✔
1000
                                  stbInfo->tcpTransfer,
8✔
1001
                                  pThreadInfo->sockfd);
1002
        } else {
1003
            ret = queryDbExecCall(pThreadInfo->conn, pThreadInfo->buffer);
807✔
1004
        }
1005
        if (0 != ret) {
815✔
1006
            g_fail = true;
2✔
1007
            goto create_table_end;
2✔
1008
        }
1009
        pThreadInfo->tables_created += batchNum;
813✔
1010
        debugPrint("thread[%d] already created %" PRId64 " tables\n",
813✔
1011
                   pThreadInfo->threadID, pThreadInfo->tables_created);
1012
    }
1013
create_table_end:
1,250✔
1014
    // free
1015
    tmfree(tagData);
1,254✔
1016
    tmfree(pThreadInfo->buffer);
1,254✔
1017
    pThreadInfo->buffer = NULL;
1,254✔
1018
    if(csvFile) {
1,254✔
1019
        fclose(csvFile);
16✔
1020
    }
1021
    return NULL;
1,254✔
1022
}
1023

1024
/*
 * Create the child tables of `stbInfo` with up to g_arguments->table_threads
 * worker threads running createTable().
 *
 * The table range starts at stbInfo->childTblFrom and is split as evenly as
 * possible: every thread gets ntables / threads tables and the first
 * ntables % threads threads get one more. The thread count is reduced to the
 * number of tables when there are fewer tables than threads.
 *
 * Each worker gets its own socket (REST interface) or connection (other
 * interfaces). If preparing or starting a worker fails, no further workers
 * are started; the ones already running are joined before their resources
 * are released, so they never touch freed memory.
 *
 * Returns 0 on success (also when there is nothing to create), -1 when a
 * worker could not be set up, the table range is invalid, or g_fail is set.
 */
static int startMultiThreadCreateChildTable(SDataBase* database, SSuperTable* stbInfo) {
    int32_t threads = g_arguments->table_threads;
    int64_t ntables;
    if (stbInfo->childTblTo > 0) {
        ntables = stbInfo->childTblTo - stbInfo->childTblFrom;
    } else if(stbInfo->childTblFrom > 0) {
        ntables = stbInfo->childTblCount - stbInfo->childTblFrom;
    } else {
        ntables = stbInfo->childTblCount;
    }

    if (ntables == 0) {
        infoPrint("child table is zero, no need create. childTblCount: %"PRId64"\n", ntables);
        return 0;
    }
    if (ntables < 0) {
        // a negative count would otherwise become a negative thread count
        errorPrint("invalid child table range, table count: %"PRId64"\n", ntables);
        return -1;
    }

    // settle the final thread count BEFORE sizing the per-thread arrays
    if (threads < 1) {
        threads = 1;
    }
    int64_t div = ntables / threads;
    if (div < 1) {
        threads = (int)ntables;
        div = 1;
    }
    int64_t mod = ntables % threads;

    pthread_t   *pids  = benchCalloc(threads, sizeof(pthread_t), false);
    threadInfo  *infos = benchCalloc(threads, sizeof(threadInfo), false);
    uint64_t     tableFrom  = stbInfo->childTblFrom;
    int          threadCnt  = 0;      // workers actually started
    bool         initFailed = false;

    for (int32_t i = 0; (i < threads && !g_arguments->terminate); i++) {
        threadInfo *pThreadInfo = infos + i;
        pThreadInfo->threadID = i;
        pThreadInfo->stbInfo = stbInfo;
        pThreadInfo->dbInfo = database;
        if (REST_IFACE == stbInfo->iface) {
            int sockfd = createSockFd();
            if (sockfd < 0) {
                initFailed = true;
                break;
            }
            pThreadInfo->sockfd = sockfd;
        } else {
            pThreadInfo->conn = initBenchConn();
            if (NULL == pThreadInfo->conn) {
                initFailed = true;
                break;
            }
        }
        pThreadInfo->start_table_from = tableFrom;
        pThreadInfo->ntables          = i < mod ? div + 1 : div;
        pThreadInfo->end_table_to     = i < mod ? tableFrom + div : tableFrom + div - 1;
        tableFrom = pThreadInfo->end_table_to + 1;
        pThreadInfo->tables_created = 0;
        debugPrint("div table by thread. i=%d from=%"PRId64" to=%"PRId64" ntable=%"PRId64"\n", i, pThreadInfo->start_table_from,
                                        pThreadInfo->end_table_to, pThreadInfo->ntables);
        if (pthread_create(pids + i, NULL, createTable, pThreadInfo) != 0) {
            errorPrint("failed to start table creating thread %d\n", i);
            // this slot is not covered by the cleanup loop below
            if ((REST_IFACE != stbInfo->iface) && pThreadInfo->conn) {
                closeBenchConn(pThreadInfo->conn);
                pThreadInfo->conn = NULL;
            }
            initFailed = true;
            break;
        }
        threadCnt ++;
    }

    // always wait for every started worker before releasing anything
    for (int i = 0; i < threadCnt; i++) {
        pthread_join(pids[i], NULL);
    }

    if (g_arguments->terminate)  toolsMsleep(100);

    for (int i = 0; i < threadCnt; i++) {
        threadInfo *pThreadInfo = infos + i;
        g_arguments->actualChildTables += pThreadInfo->tables_created;

        if ((REST_IFACE != stbInfo->iface) && pThreadInfo->conn) {
            closeBenchConn(pThreadInfo->conn);
        }
    }

    int32_t code = (initFailed || g_fail) ? -1 : 0;
    free(pids);
    free(infos);
    return code;
}
1107

1108
static int createChildTables() {
269✔
1109
    int32_t    code;
1110
    infoPrint("start creating %" PRId64 " table(s) with %d thread(s)\n",
269✔
1111
              g_arguments->totalChildTables, g_arguments->table_threads);
1112
    if (g_arguments->fpOfInsertResult) {
269!
1113
        infoPrintToFile(
269✔
1114
                  "start creating %" PRId64 " table(s) with %d thread(s)\n",
1115
                  g_arguments->totalChildTables, g_arguments->table_threads);
1116
    }
1117
    int64_t start = (double)toolsGetTimestampMs();
269✔
1118

1119
    for (int i = 0; (i < g_arguments->databases->size
269✔
1120
            && !g_arguments->terminate); i++) {
558!
1121
        SDataBase * database = benchArrayGet(g_arguments->databases, i);
290✔
1122
        if (database->superTbls) {
290!
1123
            for (int j = 0; (j < database->superTbls->size
290✔
1124
                    && !g_arguments->terminate); j++) {
646!
1125
                SSuperTable * stbInfo = benchArrayGet(database->superTbls, j);
357✔
1126
                if (stbInfo->autoTblCreating || stbInfo->iface == SML_IFACE
357✔
1127
                    || stbInfo->iface == SML_REST_IFACE) {
288✔
1128
                    g_arguments->autoCreatedChildTables +=
83✔
1129
                            stbInfo->childTblCount;
83✔
1130
                    continue;
83✔
1131
                }
1132
                if (stbInfo->childTblExists) {
274✔
1133
                    g_arguments->existedChildTables +=
51✔
1134
                            stbInfo->childTblCount;
51✔
1135
                    continue;
51✔
1136
                }
1137
                debugPrint("colsOfCreateChildTable: %s\n",
223✔
1138
                        stbInfo->colsOfCreateChildTable);
1139

1140
                code = startMultiThreadCreateChildTable(database, stbInfo);
223✔
1141
                if (code && !g_arguments->terminate) {
223!
1142
                    return code;
1✔
1143
                }
1144
            }
1145
        }
1146
    }
1147

1148
    int64_t end = toolsGetTimestampMs();
268✔
1149
    if(end == start) {
268✔
1150
        end += 1;
84✔
1151
    }
1152
    succPrint(
268!
1153
            "Spent %.4f seconds to create %" PRId64
1154
            " table(s) with %d thread(s) speed: %.0f tables/s, already exist %" PRId64
1155
            " table(s), actual %" PRId64 " table(s) pre created, %" PRId64
1156
            " table(s) will be auto created\n",
1157
            (float)(end - start) / 1000.0,
1158
            g_arguments->totalChildTables,
1159
            g_arguments->table_threads,
1160
            g_arguments->actualChildTables * 1000 / (float)(end - start),
1161
            g_arguments->existedChildTables,
1162
            g_arguments->actualChildTables,
1163
            g_arguments->autoCreatedChildTables);
1164
    return 0;
268✔
1165
}
1166

1167
/*
 * Release one child table descriptor.
 *
 * When the table carries its own sample data (useOwnSample), the stmtData
 * buffers of its first `colsSize` child columns, the childCols array and the
 * sample buffer are released first. The descriptor itself is always freed;
 * childTbl->name is not touched here.
 */
static void freeChildTable(SChildTable *childTbl, int colsSize) {
    if (childTbl->useOwnSample && childTbl->childCols) {
        for (int idx = 0; idx < colsSize; idx++) {
            ChildField *field = benchArrayGet(childTbl->childCols, idx);
            if (!field) {
                continue;
            }
            tmfree(field->stmtData.data);
            field->stmtData.data = NULL;
            tmfree(field->stmtData.is_null);
            field->stmtData.is_null = NULL;
            tmfree(field->stmtData.lengths);
            field->stmtData.lengths = NULL;
        }
        benchArrayDestroy(childTbl->childCols);
    }
    if (childTbl->useOwnSample) {
        tmfree(childTbl->sampleDataBuf);
    }
    tmfree(childTbl);
}
154,785✔
1188

1189
void postFreeResource() {
321✔
1190
    infoPrint("%s\n", "free resource and exit ...");
321✔
1191
    if (!g_arguments->terminate) {
321✔
1192
        tmfclose(g_arguments->fpOfInsertResult);
318✔
1193
    }
1194

1195
    for (int i = 0; i < g_arguments->databases->size; i++) {
663✔
1196
        SDataBase * database = benchArrayGet(g_arguments->databases, i);
342✔
1197
        if (database->cfgs) {
342!
1198
            for (int c = 0; c < database->cfgs->size; c++) {
821✔
1199
                SDbCfg *cfg = benchArrayGet(database->cfgs, c);
479✔
1200
                if (cfg->valuestring && cfg->free) {
479✔
1201
                    tmfree(cfg->valuestring);
1✔
1202
                    cfg->valuestring = NULL;
1✔
1203
                }
1204
            }
1205
            benchArrayDestroy(database->cfgs);
342✔
1206
        }
1207
        if (database->superTbls) {
342!
1208
            for (uint64_t j = 0; j < database->superTbls->size; j++) {
754✔
1209
                SSuperTable * stbInfo = benchArrayGet(database->superTbls, j);
412✔
1210
                tmfree(stbInfo->colsOfCreateChildTable);
412✔
1211
                stbInfo->colsOfCreateChildTable = NULL;
412✔
1212
                tmfree(stbInfo->sampleDataBuf);
412✔
1213
                stbInfo->sampleDataBuf = NULL;
412✔
1214
                tmfree(stbInfo->partialColNameBuf);
412✔
1215
                stbInfo->partialColNameBuf = NULL;
412✔
1216
                benchArrayDestroy(stbInfo->batchTblCreatingNumbersArray);
412✔
1217
                benchArrayDestroy(stbInfo->batchTblCreatingIntervalsArray);
412✔
1218
                for (int k = 0; k < stbInfo->tags->size; k++) {
2,196✔
1219
                    Field * tag = benchArrayGet(stbInfo->tags, k);
1,784✔
1220
                    tmfree(tag->stmtData.data);
1,784✔
1221
                    tag->stmtData.data = NULL;
1,784✔
1222
                    tmfree(tag->stmtData.is_null);
1,784✔
1223
                    tag->stmtData.is_null = NULL;
1,784✔
1224
                    tmfree(tag->stmtData.lengths);
1,784✔
1225
                    tag->stmtData.lengths = NULL;
1,784✔
1226
                }
1227
                benchArrayDestroy(stbInfo->tags);
412✔
1228

1229
                for (int k = 0; k < stbInfo->cols->size; k++) {
50,169✔
1230
                    Field * col = benchArrayGet(stbInfo->cols, k);
49,757✔
1231
                    tmfree(col->stmtData.data);
49,757✔
1232
                    col->stmtData.data = NULL;
49,757✔
1233
                    tmfree(col->stmtData.is_null);
49,757✔
1234
                    col->stmtData.is_null = NULL;
49,757✔
1235
                    tmfree(col->stmtData.lengths);
49,757✔
1236
                    col->stmtData.lengths = NULL;
49,757✔
1237
                }
1238
                if (g_arguments->test_mode == INSERT_TEST) {
412✔
1239
                    if (stbInfo->childTblArray) {
364✔
1240
                        for (int64_t child = 0; child < stbInfo->childTblCount;
155,142✔
1241
                                child++) {
154,785✔
1242
                            SChildTable *childTbl = stbInfo->childTblArray[child];
154,785✔
1243
                            if (childTbl) {
154,785!
1244
                                tmfree(childTbl->name);
154,785✔
1245
                                freeChildTable(childTbl, stbInfo->cols->size);
154,785✔
1246
                            }
1247
                        }
1248
                    }
1249
                }
1250
                benchArrayDestroy(stbInfo->cols);
412✔
1251
                tmfree(stbInfo->childTblArray);
412✔
1252
                stbInfo->childTblArray = NULL;
412✔
1253
                benchArrayDestroy(stbInfo->tsmas);
412✔
1254

1255
                // free sqls
1256
                if(stbInfo->sqls) {
412✔
1257
                    char **sqls = stbInfo->sqls;
16✔
1258
                    while (*sqls) {
80✔
1259
                        free(*sqls);
64✔
1260
                        sqls++;
64✔
1261
                    }
1262
                    tmfree(stbInfo->sqls);
16✔
1263
                }
1264

1265
                // thread_bind
1266
                if (database->vgArray) {
412✔
1267
                    for (int32_t v = 0; v < database->vgroups; v++) {
18✔
1268
                        SVGroup *vg = benchArrayGet(database->vgArray, v);
14✔
1269
                        tmfree(vg->childTblArray);
14✔
1270
                        vg->childTblArray = NULL;
14✔
1271
                    }
1272
                    benchArrayDestroy(database->vgArray);
4✔
1273
                    database->vgArray = NULL;
4✔
1274
                }
1275
            }
1276
            benchArrayDestroy(database->superTbls);
342✔
1277
        }
1278
    }
1279
    benchArrayDestroy(g_arguments->databases);
321✔
1280
    benchArrayDestroy(g_arguments->streams);
321✔
1281
    tools_cJSON_Delete(root);
321✔
1282
}
321✔
1283

1284
/*
 * Send the data prepared by the calling insert thread through the interface
 * configured for its super table (stbInfo->iface).
 *
 * pThreadInfo - thread context; provides the SQL text (buffer), the
 *               schemaless lines (lines), the connection/statement handles
 *               and the REST socket.
 * k           - number of entries in pThreadInfo->lines (schemaless only).
 * delay3      - optional; when non-NULL the time spent in
 *               taos_stmt_add_batch() (microseconds) is added to it.
 *
 * For the TAOSC, REST and SML interfaces a failed call is retried: the retry
 * count is stbInfo->keep_trying, or g_arguments->keep_trying when that is 0
 * (-1 never counts down), with trying_interval milliseconds of sleep between
 * attempts, until g_arguments->terminate is set.
 *
 * Returns 0 on success, non-zero on failure (-1 for the STMT/STMT2
 * interfaces). An interface not listed in the switch returns 0.
 */
int32_t execInsert(threadInfo *pThreadInfo, uint32_t k, int64_t *delay3) {
    SDataBase *  database = pThreadInfo->dbInfo;
    SSuperTable *stbInfo = pThreadInfo->stbInfo;
    TAOS_RES *   res = NULL;
    int32_t      code = 0;
    uint16_t     iface = stbInfo->iface;
    int64_t      start = 0;
    int32_t      affectRows = 0;

    // super table settings win; fall back to the global ones when unset
    int32_t trying = (stbInfo->keep_trying)?
        stbInfo->keep_trying:g_arguments->keep_trying;
    int32_t trying_interval = stbInfo->trying_interval?
        stbInfo->trying_interval:g_arguments->trying_interval;
    int protocol = stbInfo->lineProtocol;

    switch (iface) {
        case TAOSC_IFACE:
            debugPrint("buffer: %s\n", pThreadInfo->buffer);
            code = queryDbExecCall(pThreadInfo->conn, pThreadInfo->buffer);
            while (code && trying && !g_arguments->terminate) {
                infoPrint("will sleep %"PRIu32" milliseconds then re-insert\n",
                          trying_interval);
                toolsMsleep(trying_interval);
                code = queryDbExecCall(pThreadInfo->conn, pThreadInfo->buffer);
                if (trying != -1) {
                    trying--;
                }
            }
            break;
        // REST
        case REST_IFACE:
            debugPrint("buffer: %s\n", pThreadInfo->buffer);
            code = postProcessSql(pThreadInfo->buffer,
                                database->dbName,
                                database->precision,
                                stbInfo->iface,
                                stbInfo->lineProtocol,
                                g_arguments->port,
                                stbInfo->tcpTransfer,
                                pThreadInfo->sockfd,
                                pThreadInfo->filePath);
            while (code && trying && !g_arguments->terminate) {
                infoPrint("will sleep %"PRIu32" milliseconds then re-insert\n",
                          trying_interval);
                toolsMsleep(trying_interval);
                code = postProcessSql(pThreadInfo->buffer,
                                    database->dbName,
                                    database->precision,
                                    stbInfo->iface,
                                    stbInfo->lineProtocol,
                                    g_arguments->port,
                                    stbInfo->tcpTransfer,
                                    pThreadInfo->sockfd,
                                    pThreadInfo->filePath);
                if (trying != -1) {
                    trying--;
                }
            }
            break;

        case STMT_IFACE:
            // add batch (skipped when tables are created automatically)
            if(!stbInfo->autoTblCreating) {
                start = toolsGetTimestampUs();
                if (taos_stmt_add_batch(pThreadInfo->conn->stmt) != 0) {
                    errorPrint("taos_stmt_add_batch() failed! reason: %s\n",
                            taos_stmt_errstr(pThreadInfo->conn->stmt));
                    return -1;
                }
                if(delay3) {
                    *delay3 += toolsGetTimestampUs() - start;
                }
            }

            // execute
            code = taos_stmt_execute(pThreadInfo->conn->stmt);
            if (code) {
                errorPrint(
                           "failed to execute insert statement. reason: %s\n",
                           taos_stmt_errstr(pThreadInfo->conn->stmt));
                code = -1;
            }
            break;

        case STMT2_IFACE:
            // execute
            code = taos_stmt2_exec(pThreadInfo->conn->stmt2, &affectRows);
            if (code) {
                errorPrint( "failed to call taos_stmt2_exec(). reason: %s\n", taos_stmt2_error(pThreadInfo->conn->stmt2));
                code = -1;
            } else {
                // only report success when the call actually succeeded
                debugPrint( "succ call taos_stmt2_exec() affectRows:%d\n", affectRows);
            }
            break;

        case SML_IFACE:
            res = taos_schemaless_insert(
                pThreadInfo->conn->taos, pThreadInfo->lines,
                (TSDB_SML_JSON_PROTOCOL == protocol
                    || SML_JSON_TAOS_FORMAT == protocol)
                    ? 0 : k,
                (SML_JSON_TAOS_FORMAT == protocol)
                    ? TSDB_SML_JSON_PROTOCOL : protocol,
                (TSDB_SML_LINE_PROTOCOL == protocol)
                    ? database->sml_precision
                    : TSDB_SML_TIMESTAMP_NOT_CONFIGURED);
            code = taos_errno(res);
            // `trying` keeps the value resolved above, so the global
            // keep_trying applies here as it does for the other interfaces
            while (code && trying && !g_arguments->terminate) {
                taos_free_result(res);
                infoPrint("will sleep %"PRIu32" milliseconds then re-insert\n",
                          trying_interval);
                toolsMsleep(trying_interval);
                res = taos_schemaless_insert(
                        pThreadInfo->conn->taos, pThreadInfo->lines,
                        (TSDB_SML_JSON_PROTOCOL == protocol
                            || SML_JSON_TAOS_FORMAT == protocol)
                            ? 0 : k,
                        (SML_JSON_TAOS_FORMAT == protocol)
                            ? TSDB_SML_JSON_PROTOCOL : protocol,
                        (TSDB_SML_LINE_PROTOCOL == protocol)
                            ? database->sml_precision
                            : TSDB_SML_TIMESTAMP_NOT_CONFIGURED);
                code = taos_errno(res);
                if (trying != -1) {
                    trying--;
                }
            }

            if (code != TSDB_CODE_SUCCESS && !g_arguments->terminate) {
                debugPrint("Failed to execute "
                           "schemaless insert content: %s\n\n",
                        pThreadInfo->lines?(pThreadInfo->lines[0]?
                            pThreadInfo->lines[0]:""):"");
                errorPrint(
                    "failed to execute schemaless insert. "
                        "code: 0x%08x reason: %s\n\n",
                        code, taos_errstr(res));
            }
            taos_free_result(res);
            break;
        case SML_REST_IFACE: {
            if (TSDB_SML_JSON_PROTOCOL == protocol
                    || SML_JSON_TAOS_FORMAT == protocol) {
                code = postProcessSql(pThreadInfo->lines[0], database->dbName,
                                    database->precision, stbInfo->iface,
                                    protocol, g_arguments->port,
                                    stbInfo->tcpTransfer,
                                    pThreadInfo->sockfd, pThreadInfo->filePath);
            } else {
                // concatenate the non-empty lines into buffer, one per row
                int len = 0;
                for (uint32_t i = 0; i < k; i++) {
                    if (strlen(pThreadInfo->lines[i]) != 0) {
                        int n;
                        if (TSDB_SML_TELNET_PROTOCOL == protocol
                                && stbInfo->tcpTransfer) {
                            n = snprintf(pThreadInfo->buffer + len,
                                            TSDB_MAX_ALLOWED_SQL_LEN - len,
                                           "put %s\n", pThreadInfo->lines[i]);
                        } else {
                            n = snprintf(pThreadInfo->buffer + len,
                                            TSDB_MAX_ALLOWED_SQL_LEN - len,
                                            "%s\n",
                                           pThreadInfo->lines[i]);
                        }
                        if (n < 0 || n >= TSDB_MAX_ALLOWED_SQL_LEN - len) {
                            errorPrint("%s() LN%d snprintf overflow on %d\n",
                                __func__, __LINE__, (int)i);
                            // drop the partially written line so only
                            // complete lines are posted
                            pThreadInfo->buffer[len] = '\0';
                            break;
                        } else {
                            len += n;
                        }
                    } else {
                        break;
                    }
                }
                if (g_arguments->terminate) {
                    break;
                }
                code = postProcessSql(pThreadInfo->buffer, database->dbName,
                        database->precision,
                        stbInfo->iface, protocol,
                        g_arguments->port,
                        stbInfo->tcpTransfer,
                        pThreadInfo->sockfd, pThreadInfo->filePath);
            }
            break;
        }
        default:
            // other interfaces are not handled here; code stays 0
            break;
    }
    return code;
}
1474

1475
static int smartContinueIfFail(threadInfo *pThreadInfo,
×
1476
                               SChildTable *childTbl,
1477
                               char *tagData,
1478
                               int64_t i,
1479
                               char *ttl) {
1480
    SDataBase *  database = pThreadInfo->dbInfo;
×
1481
    SSuperTable *stbInfo = pThreadInfo->stbInfo;
×
1482
    char *buffer =
1483
        benchCalloc(1, TSDB_MAX_ALLOWED_SQL_LEN, false);
×
1484
    snprintf(
×
1485
            buffer, TSDB_MAX_ALLOWED_SQL_LEN,
1486
            g_arguments->escape_character ?
×
1487
                "CREATE TABLE IF NOT EXISTS `%s`.`%s` USING `%s`.`%s` TAGS (%s) %s "
1488
                : "CREATE TABLE IF NOT EXISTS %s.%s USING %s.%s TAGS (%s) %s ",
1489
            database->dbName, childTbl->name, database->dbName,
1490
            stbInfo->stbName,
1491
            tagData + i * stbInfo->lenOfTags, ttl);
×
1492
    debugPrint("creating table: %s\n", buffer);
×
1493

1494
    int ret;
1495
    if (REST_IFACE == stbInfo->iface) {
×
1496
        ret = queryDbExecRest(buffer,
×
1497
                              database->dbName,
1498
                              database->precision,
1499
                              stbInfo->iface,
×
1500
                              stbInfo->lineProtocol,
×
1501
                              stbInfo->tcpTransfer,
×
1502
                              pThreadInfo->sockfd);
1503
    } else {
1504
        ret = queryDbExecCall(pThreadInfo->conn, buffer);
×
1505
        int32_t trying = g_arguments->keep_trying;
×
1506
        while (ret && trying) {
×
1507
            infoPrint("will sleep %"PRIu32" milliseconds then "
×
1508
                      "re-create table %s\n",
1509
                      g_arguments->trying_interval, buffer);
1510
            toolsMsleep(g_arguments->trying_interval);
×
1511
            ret = queryDbExecCall(pThreadInfo->conn, buffer);
×
1512
            if (trying != -1) {
×
1513
                trying--;
×
1514
            }
1515
        }
1516
    }
1517

1518
    tmfree(buffer);
×
1519

1520
    return ret;
×
1521
}
1522

1523
// Release the thread's JSON array (if any) and print the per-thread summary.
//
// pThreadInfo : worker context; a NULL pointer makes this a no-op.
// mode        : label printed in the summary line.
//
// A totalDelay of 0 is replaced by 1 (and stored back) so the rate
// computation never divides by zero.
static void cleanupAndPrint(threadInfo *pThreadInfo, char *mode) {
    if (!pThreadInfo) {
        return;
    }

    if (pThreadInfo->json_array) {
        tools_cJSON_Delete(pThreadInfo->json_array);
        pThreadInfo->json_array = NULL;
    }

    if (0 == pThreadInfo->totalDelay) {
        pThreadInfo->totalDelay = 1;
    }

    // totalDelay is divided by 1E6 to obtain the rate per second
    double elapsedSeconds = (double)pThreadInfo->totalDelay / 1E6;
    double rowsPerSecond  =
        (double)(pThreadInfo->totalInsertRows / elapsedSeconds);

    succPrint(
        "thread[%d] %s mode, completed total inserted rows: %" PRIu64
        ", %.2f records/second\n",
        pThreadInfo->threadID,
        mode,
        pThreadInfo->totalInsertRows,
        rowsPerSecond);
}
1,870✔
1542

1543
// Decide whether the next row gets an out-of-order timestamp.
//
// stbInfo       : super table settings (disorderRatio, disorderRange,
//                 startTimestamp).
// disorderRange : caller-owned countdown; decremented on every disordered
//                 row and reloaded from stbInfo->disorderRange when it
//                 reaches exactly 0.
//
// Returns 0 when the row is not disordered; callers in this file treat 0 as
// "use the table's own timestamp". Otherwise returns
// startTimestamp - *disorderRange.
static int64_t getDisorderTs(SSuperTable *stbInfo, int *disorderRange) {
    if (!(stbInfo->disorderRatio > 0)) {
        return 0;
    }

    // draw 0..99 and compare against the configured percentage
    int roll = taosRandom() % 100;
    if (!(roll < stbInfo->disorderRatio)) {
        return 0;
    }

    *disorderRange -= 1;
    if (*disorderRange == 0) {
        *disorderRange = stbInfo->disorderRange;
    }

    int64_t ts = stbInfo->startTimestamp - *disorderRange;
    debugPrint("rand_num: %d, < disorderRatio: %d, "
               "disorderTs: %"PRId64"\n",
               roll, stbInfo->disorderRatio,
               ts);
    return ts;
}
1562

1563
// Pre-load child table meta for this thread's table range.
//
// Does nothing unless g_arguments->pre_load_tb_meta is set and the thread has
// a connection. Table names are batched as ",<db>.<table>" entries into a
// 100 KiB buffer and handed to taos_load_table_info() whenever the buffer is
// nearly full or the last table of the range has been appended. The elapsed
// time is added to pThreadInfo->totalDelay.
//
// Pre-loading is best effort: failures are logged and the function carries on.
void loadChildTableInfo(threadInfo* pThreadInfo) {
    SSuperTable *stbInfo = pThreadInfo->stbInfo;
    if(!g_arguments->pre_load_tb_meta) {
        return ;
    }
    if(pThreadInfo->conn == NULL) {
        return ;
    }

    char *db    = pThreadInfo->dbInfo->dbName;
    int64_t cnt = pThreadInfo->end_table_to - pThreadInfo->start_table_from;

    // 100k
    int   bufLen = 100 * 1024;
    char *buf    = benchCalloc(1, bufLen, false);
    int   pos    = 0;
    infoPrint("start load child tables(%"PRId64") info...\n", cnt);
    int64_t start = toolsGetTimestampUs();
    // NOTE(review): syncWriteInterlace wraps only when
    // tableSeq > end_table_to, which suggests end_table_to is inclusive; if
    // so, this loop never pre-loads the last table of the range - confirm.
    for(int64_t i = pThreadInfo->start_table_from; i < pThreadInfo->end_table_to; i++) {
        SChildTable *childTbl = stbInfo->childTblArray[i];

        // bounded append: never write past the end of buf
        int n = snprintf(buf + pos, bufLen - pos, ",%s.%s", db, childTbl->name);
        if (n < 0 || n >= bufLen - pos) {
            errorPrint("%s() LN%d snprintf overflow on table %s\n",
                       __func__, __LINE__, childTbl->name);
            // drop the partially written entry
            buf[pos] = '\0';
        } else {
            pos += n;
        }

        if(pos >= bufLen - 256 || i + 1 == pThreadInfo->end_table_to) {
            if (pos > 0) {
                // the client API takes the TAOS handle, not the bench wrapper
                int32_t code = taos_load_table_info(pThreadInfo->conn->taos, buf);
                if (code != 0) {
                    errorPrint("taos_load_table_info() failed, code: 0x%08x\n",
                               code);
                }
            }
            pos = 0;
        }
    }
    int64_t delay = toolsGetTimestampUs() - start;
    infoPrint("end load child tables info. delay=%.2fs\n", delay/1E6);
    pThreadInfo->totalDelay += delay;

    tmfree(buf);
}
1596

1597
// create conn again
1598
int32_t reCreateConn(threadInfo * pThreadInfo) {
×
1599
    // single
1600
    bool single = true;
×
1601
    if (pThreadInfo->dbInfo->superTbls->size > 1) {
×
1602
        single = false;
×
1603
    }
1604

1605
    //
1606
    // retry stmt2 init 
1607
    //
1608

1609
    // stmt2 close
1610
    if (pThreadInfo->conn->stmt2) {
×
1611
        taos_stmt2_close(pThreadInfo->conn->stmt2);
×
1612
        pThreadInfo->conn->stmt2 = NULL;
×
1613
    }
1614

1615
    // retry stmt2 init , maybe success
1616
    pThreadInfo->conn->stmt2 = initStmt2(pThreadInfo->conn->taos, single);
×
1617
    if (pThreadInfo->conn->stmt2) {
×
1618
        succPrint("%s", "reCreateConn first taos_stmt2_init() success and return.\n");
×
1619
        return 0;
×
1620
    }
1621

1622
    //
1623
    // close old
1624
    //
1625
    closeBenchConn(pThreadInfo->conn);
×
1626
    pThreadInfo->conn = NULL;
×
1627

1628
    //
1629
    // create new
1630
    //
1631

1632
    // conn
1633
    pThreadInfo->conn = initBenchConn();
×
1634
    if (pThreadInfo->conn == NULL) {
×
1635
        errorPrint("%s", "reCreateConn initBenchConn failed.");
×
1636
        return -1;
×
1637
    }
1638
    // stmt2
1639
    pThreadInfo->conn->stmt2 = initStmt2(pThreadInfo->conn->taos, single);
×
1640
    if (NULL == pThreadInfo->conn->stmt2) {
×
1641
        errorPrint("reCreateConn taos_stmt2_init() failed, reason: %s\n", taos_errstr(NULL));
×
1642
        return -1;
×
1643
    } 
1644
        
1645
    succPrint("%s", "reCreateConn second taos_stmt2_init() success.\n");
×
1646
    // select db 
1647
    if (taos_select_db(pThreadInfo->conn->taos, pThreadInfo->dbInfo->dbName)) {
×
1648
        errorPrint("second taos select database(%s) failed\n", pThreadInfo->dbInfo->dbName);
×
1649
        return -1;
×
1650
    }
1651

1652
    return 0;
×
1653
}
1654

1655
// reinit
1656
int32_t reConnectStmt2(threadInfo * pThreadInfo, int32_t w) {
×
1657
    // re-create connection
1658
    int32_t code = reCreateConn(pThreadInfo);
×
1659
    if (code != 0) {
×
1660
        return code;
×
1661
    }
1662

1663
    // prepare
1664
    code = prepareStmt2(pThreadInfo->conn->stmt2, pThreadInfo->stbInfo, NULL, w, pThreadInfo->dbInfo->dbName);
×
1665
    if (code != 0) {
×
1666
        return code;
×
1667
    }
1668

1669
    return code;
×
1670
}
1671

1672
// One bind-and-execute attempt for a stmt2 batch.
//
// bindv     : bind vector passed to taos_stmt2_bind_param() with index -1.
// delay1    : accumulates the time spent in the bind call (microseconds).
// delay3    : forwarded to execInsert().
// startTs / endTs : set to the timestamps taken right before and right after
//             execInsert().
// generated : number of rows in the batch; passed by value to execInsert().
//
// Returns 0 on success. If the bind fails its error code is returned and
// none of the output parameters are written.
int32_t submitStmt2Impl(threadInfo * pThreadInfo, TAOS_STMT2_BINDV *bindv, int64_t *delay1, int64_t *delay3,
                    int64_t* startTs, int64_t* endTs, uint32_t* generated) {
    // bind phase
    int64_t bindBegin = toolsGetTimestampUs();
    int32_t rc = taos_stmt2_bind_param(pThreadInfo->conn->stmt2, bindv, -1);
    if (0 != rc) {
        errorPrint("taos_stmt2_bind_param failed, reason: %s\n", taos_stmt2_error(pThreadInfo->conn->stmt2));
        return rc;
    }
    debugPrint("interlace taos_stmt2_bind_param() ok.  bindv->count=%d \n", bindv->count);
    *delay1 += toolsGetTimestampUs() - bindBegin;

    // execute phase, bracketed by the caller-visible timestamps
    *startTs = toolsGetTimestampUs();
    rc = execInsert(pThreadInfo, *generated, delay3);
    *endTs = toolsGetTimestampUs();

    return rc;
}
1690

1691
// Submit a stmt2 batch, retrying with a re-created handle on failure.
//
// Attempts: 1 by default. When stbInfo->continueIfFail == YES_IF_FAILED the
// budget is stbInfo->keep_trying if that is > 1, otherwise 3. Between
// attempts the thread sleeps stbInfo->trying_interval ms and calls
// reConnectStmt2(); if reconnecting fails, the next round skips the submit
// but still consumes one attempt.
//
// bindv, delay1, delay3, startTs, endTs, generated are forwarded to
// submitStmt2Impl(); w is forwarded to reConnectStmt2().
//
// Returns 0 on success, -1 once the attempt budget is exhausted.
int32_t submitStmt2(threadInfo * pThreadInfo, TAOS_STMT2_BINDV *bindv, int64_t *delay1, int64_t *delay3,
                    int64_t* startTs, int64_t* endTs, uint32_t* generated, int32_t w) {
    // calc loop
    int32_t loop = 1;
    SSuperTable* stbInfo = pThreadInfo->stbInfo;
    if(stbInfo->continueIfFail == YES_IF_FAILED) {
        if(stbInfo->keep_trying > 1) {
            loop = stbInfo->keep_trying;
        } else {
            loop = 3; // default
        }
    }

    // submit stmt2
    int32_t i = 0;
    bool connected = true;
    while (1) {
        int32_t code = -1;
        if(connected) {
            // handle is usable, do the submit
            code = submitStmt2Impl(pThreadInfo, bindv, delay1, delay3, startTs, endTs, generated);
        }

        // check code
        if ( code == 0) {
            // success
            break;
        } else {
            // failed, try again if attempts remain
            if (--loop == 0) {
                // failed finally
                char tip[64] = "";
                if (i > 0) {
                    snprintf(tip, sizeof(tip), " after retry %d", i);
                }
                errorPrint("finally faild execute submitStmt2()%s\n", tip);
                return -1;
            }

            // wait a moment before trying
            toolsMsleep(stbInfo->trying_interval);
            // reinit
            // keep the increment out of the logging macro's argument list so
            // the counter does not depend on how the macro expands its args
            infoPrint("stmt2 start retry submit i=%d  after sleep %d ms...\n", i, stbInfo->trying_interval);
            i++;
            code = reConnectStmt2(pThreadInfo, w);
            if (code != 0) {
                // failed, try again on the next round
                errorPrint("faild reConnectStmt2 and retry again for next i=%d \n", i);
                connected = false;
            } else {
                // succ
                connected = true;
            }
        }
    }

    // success
    return 0;
}
1749

1750
static void *syncWriteInterlace(void *sarg) {
120✔
1751
    threadInfo * pThreadInfo = (threadInfo *)sarg;
120✔
1752
    SDataBase *  database = pThreadInfo->dbInfo;
120✔
1753
    SSuperTable *stbInfo = pThreadInfo->stbInfo;
120✔
1754
    infoPrint(
120✔
1755
              "thread[%d] start interlace inserting into table from "
1756
              "%" PRIu64 " to %" PRIu64 "\n",
1757
              pThreadInfo->threadID, pThreadInfo->start_table_from,
1758
              pThreadInfo->end_table_to);
1759

1760
    int64_t insertRows = stbInfo->insertRows;
121✔
1761
    int32_t interlaceRows = stbInfo->interlaceRows;
121✔
1762
    uint32_t nBatchTable  = g_arguments->reqPerReq / interlaceRows;
121✔
1763
    uint64_t   lastPrintTime = toolsGetTimestampMs();
121✔
1764
    uint64_t   lastTotalInsertRows = 0;
121✔
1765
    int64_t   startTs = toolsGetTimestampUs();
121✔
1766
    int64_t   endTs;
1767
    uint64_t   tableSeq = pThreadInfo->start_table_from;
121✔
1768
    int disorderRange = stbInfo->disorderRange;
121✔
1769
    int32_t i = 0;
121✔
1770

1771
    loadChildTableInfo(pThreadInfo);
121✔
1772
    // check if filling back mode
1773
    bool fillBack = false;
121✔
1774
    if(stbInfo->useNow && stbInfo->startFillbackTime) {
121!
1775
        fillBack = true;
×
1776
        pThreadInfo->start_time = stbInfo->startFillbackTime;
×
1777
        infoPrint("start time change to startFillbackTime = %"PRId64" \n", pThreadInfo->start_time);
×
1778
    }
1779

1780
    FILE* csvFile = NULL;
121✔
1781
    char* tagData = NULL;
121✔
1782
    int   w       = 0; // record tags position, if w > TAG_BATCH_COUNT , need recreate new tag values
121✔
1783
    if (stbInfo->autoTblCreating) {
121✔
1784
        csvFile = openTagCsv(stbInfo, pThreadInfo->start_table_from);
21✔
1785
        tagData = benchCalloc(TAG_BATCH_COUNT, stbInfo->lenOfTags, false);
21✔
1786
    }
1787
    int64_t delay1 = 0;
120✔
1788
    int64_t delay2 = 0;
120✔
1789
    int64_t delay3 = 0;
120✔
1790
    bool    firstInsertTb = true;
120✔
1791

1792
    TAOS_STMT2_BINDV *bindv = NULL;
120✔
1793

1794
    // create bindv
1795
    if(stbInfo->iface == STMT2_IFACE) {
120✔
1796
        int32_t tagCnt = stbInfo->autoTblCreating ? stbInfo->tags->size : 0;
6!
1797
        if (csvFile) {
6!
1798
            tagCnt = 0;
×
1799
        }
1800
        //int32_t tagCnt = stbInfo->tags->size;
1801
        bindv = createBindV(nBatchTable,  tagCnt, stbInfo->cols->size + 1);
6✔
1802
    }
1803

1804
    bool oldInitStmt = stbInfo->autoTblCreating;
120✔
1805
    // not auto create table call once
1806
    if(stbInfo->iface == STMT_IFACE && !oldInitStmt) {
120!
1807
        debugPrint("call prepareStmt for stable:%s\n", stbInfo->stbName);
20!
1808
        if (prepareStmt(pThreadInfo->conn->stmt, stbInfo, tagData, w, database->dbName)) {
20!
1809
            g_fail = true;
×
1810
            goto free_of_interlace;
×
1811
        }
1812
    }
1813
    else if (stbInfo->iface == STMT2_IFACE) {
100✔
1814
        // only prepare once
1815
        if (prepareStmt2(pThreadInfo->conn->stmt2, stbInfo, NULL, w, database->dbName)) {
6!
1816
            g_fail = true;
×
1817
            goto free_of_interlace;
×
1818
        }
1819
    }    
1820
    int64_t index = tableSeq;
119✔
1821
    while (insertRows > 0) {
27,171✔
1822
        int64_t tmp_total_insert_rows = 0;
27,045✔
1823
        uint32_t generated = 0;
27,045✔
1824
        if (insertRows <= interlaceRows) {
27,045✔
1825
            interlaceRows = insertRows;
129✔
1826
        }
1827

1828
        // loop each table
1829
        for (i = 0; i < nBatchTable; i++) {
67,033✔
1830
            if (g_arguments->terminate) {
67,026!
1831
                goto free_of_interlace;
×
1832
            }
1833
            int64_t pos = pThreadInfo->pos;
67,026✔
1834
            
1835
            // get childTable
1836
            SChildTable *childTbl;
1837
            if (g_arguments->bind_vgroup) {
67,026✔
1838
                childTbl = pThreadInfo->vg->childTblArray[tableSeq];
36,787✔
1839
            } else {
1840
                childTbl = stbInfo->childTblArray[tableSeq];
30,239✔
1841
            }
1842

1843
            char *  tableName   = childTbl->name;
67,026✔
1844
            char *sampleDataBuf = childTbl->useOwnSample?
134,052✔
1845
                                        childTbl->sampleDataBuf:
67,026!
1846
                                        stbInfo->sampleDataBuf;
1847
            // init ts
1848
            if(childTbl->ts == 0) {
67,026✔
1849
               childTbl->ts = pThreadInfo->start_time;
322✔
1850
            }
1851
            char ttl[SMALL_BUFF_LEN] = "";
67,026✔
1852
            if (stbInfo->ttl != 0) {
67,026✔
1853
                snprintf(ttl, SMALL_BUFF_LEN, "TTL %d", stbInfo->ttl);
32✔
1854
            }
1855
            switch (stbInfo->iface) {
67,026!
1856
                case REST_IFACE:
30,543✔
1857
                case TAOSC_IFACE: {
1858
                    char escapedTbName[TSDB_TABLE_NAME_LEN+2] = "\0";
30,543✔
1859
                    if (g_arguments->escape_character) {
30,543✔
1860
                        snprintf(escapedTbName, TSDB_TABLE_NAME_LEN+2, "`%s`",
7,342✔
1861
                                tableName);
1862
                    } else {
1863
                        snprintf(escapedTbName, TSDB_TABLE_NAME_LEN+2, "%s",
23,201✔
1864
                                tableName);
1865
                    }
1866
                    if (i == 0) {
30,543✔
1867
                        ds_add_str(&pThreadInfo->buffer, STR_INSERT_INTO);
24,039✔
1868
                    }
1869

1870
                    // generator
1871
                    if (stbInfo->autoTblCreating && w == 0) {
30,540✔
1872
                        if(!generateTagData(stbInfo, tagData, TAG_BATCH_COUNT, csvFile, NULL, index)) {
41!
1873
                            goto free_of_interlace;
×
1874
                        }
1875
                    }
1876

1877
                    // create child table
1878
                    if (stbInfo->partialColNum == stbInfo->cols->size) {
30,540✔
1879
                        if (stbInfo->autoTblCreating) {
30,508✔
1880
                            ds_add_strs(&pThreadInfo->buffer, 8,
3,998✔
1881
                                    escapedTbName,
1882
                                    " USING `",
1883
                                    stbInfo->stbName,
1884
                                    "` TAGS (",
1885
                                    tagData + stbInfo->lenOfTags * w,
3,998✔
1886
                                    ") ", ttl, " VALUES ");
1887
                        } else {
1888
                            ds_add_strs(&pThreadInfo->buffer, 2,
26,510✔
1889
                                    escapedTbName, " VALUES ");
1890
                        }
1891
                    } else {
1892
                        if (stbInfo->autoTblCreating) {
32!
1893
                            ds_add_strs(&pThreadInfo->buffer, 10,
32✔
1894
                                        escapedTbName,
1895
                                        " (",
1896
                                        stbInfo->partialColNameBuf,
1897
                                        ") USING `",
1898
                                        stbInfo->stbName,
1899
                                        "` TAGS (",
1900
                                        tagData + stbInfo->lenOfTags * w,
32✔
1901
                                        ") ", ttl, " VALUES ");
1902
                        } else {
1903
                            ds_add_strs(&pThreadInfo->buffer, 4,
×
1904
                                        escapedTbName,
1905
                                        "(",
1906
                                        stbInfo->partialColNameBuf,
1907
                                        ") VALUES ");
1908
                        }
1909
                    }
1910

1911
                    // move next
1912
                    if (stbInfo->autoTblCreating && ++w >= TAG_BATCH_COUNT) {
30,530✔
1913
                        // reset for gen again
1914
                        w = 0;
40✔
1915
                        index += TAG_BATCH_COUNT;
40✔
1916
                    }  
1917

1918
                    // write child data with interlaceRows
1919
                    for (int64_t j = 0; j < interlaceRows; j++) {
291,473✔
1920
                        int64_t disorderTs = getDisorderTs(stbInfo,
260,923✔
1921
                                &disorderRange);
1922

1923
                        // change fillBack mode with condition
1924
                        if(fillBack) {
261,021!
1925
                            int64_t tsnow = toolsGetTimestamp(database->precision);
×
1926
                            if(childTbl->ts >= tsnow){
×
1927
                                fillBack = false;
×
1928
                                infoPrint("fillBack mode set end. because timestamp(%"PRId64") >= now(%"PRId64")\n", childTbl->ts, tsnow);
×
1929
                            }
1930
                        }
1931

1932
                        // timestamp         
1933
                        char time_string[BIGINT_BUFF_LEN];
1934
                        if(stbInfo->useNow && stbInfo->interlaceRows == 1 && !fillBack) {
261,023!
1935
                            int64_t now = toolsGetTimestamp(database->precision);
3✔
1936
                            snprintf(time_string, BIGINT_BUFF_LEN, "%"PRId64"", now);
3✔
1937
                        } else {
1938
                            snprintf(time_string, BIGINT_BUFF_LEN, "%"PRId64"",
261,020✔
1939
                                    disorderTs?disorderTs:childTbl->ts);
1940
                        }
1941

1942
                        // combine rows timestamp | other cols = sampleDataBuf[pos]
1943
                        if(stbInfo->useSampleTs) {
261,023!
1944
                            ds_add_strs(&pThreadInfo->buffer, 3, "(", 
×
1945
                                        sampleDataBuf + pos * stbInfo->lenOfCols, ") ");
×
1946
                        } else {
1947
                            ds_add_strs(&pThreadInfo->buffer, 5, "(", time_string, ",",
261,023✔
1948
                                        sampleDataBuf + pos * stbInfo->lenOfCols, ") ");
261,023✔
1949
                        }
1950
                        // check buffer enough
1951
                        if (ds_len(pThreadInfo->buffer)
261,006✔
1952
                                > stbInfo->max_sql_len) {
260,963!
1953
                            errorPrint("sql buffer length (%"PRIu64") "
×
1954
                                    "is larger than max sql length "
1955
                                    "(%"PRId64")\n",
1956
                                    ds_len(pThreadInfo->buffer),
1957
                                    stbInfo->max_sql_len);
1958
                            goto free_of_interlace;
×
1959
                        }
1960

1961
                        // move next
1962
                        generated++;
260,963✔
1963
                        pos++;
260,963✔
1964
                        if (pos >= g_arguments->prepared_rand) {
260,963✔
1965
                            pos = 0;
50✔
1966
                        }
1967
                        if(stbInfo->primary_key)
260,963!
1968
                            debugPrint("add child=%s %"PRId64" pk cur=%d cnt=%d \n", childTbl->name, childTbl->ts, childTbl->pkCur, childTbl->pkCnt);
×
1969

1970
                        // primary key
1971
                        if (!stbInfo->primary_key || needChangeTs(stbInfo, &childTbl->pkCur, &childTbl->pkCnt)) {
260,963!
1972
                            childTbl->ts += stbInfo->timestamp_step;
260,949✔
1973
                            if(stbInfo->primary_key)
260,949!
1974
                                debugPrint("changedTs child=%s %"PRId64" pk cur=%d cnt=%d \n", childTbl->name, childTbl->ts, childTbl->pkCur, childTbl->pkCnt);
×
1975
                        }
1976
                        
1977
                    }
1978
                    break;
30,550✔
1979
                }
1980
                case STMT_IFACE: {
35,704✔
1981
                    char escapedTbName[TSDB_TABLE_NAME_LEN+2] = "\0";
35,704✔
1982
                    if (g_arguments->escape_character) {
35,704✔
1983
                        snprintf(escapedTbName, TSDB_TABLE_NAME_LEN+2,
3,395✔
1984
                                "`%s`", tableName);
1985
                    } else {
1986
                        snprintf(escapedTbName, TSDB_TABLE_NAME_LEN, "%s",
32,309✔
1987
                                tableName);
1988
                    }
1989

1990
                    // generator
1991
                    if (stbInfo->autoTblCreating && w == 0) {
35,704!
1992
                        if(!generateTagData(stbInfo, tagData, TAG_BATCH_COUNT, csvFile, NULL, index)) {
×
1993
                            goto free_of_interlace;
×
1994
                        }
1995
                    }
1996
                    
1997
                    // old must call prepareStmt for each table
1998
                    if (oldInitStmt) {
35,704!
1999
                        debugPrint("call prepareStmt for stable:%s\n", stbInfo->stbName);
×
2000
                        if (prepareStmt(pThreadInfo->conn->stmt, stbInfo, tagData, w, database->dbName)) {
×
2001
                            g_fail = true;
×
2002
                            goto free_of_interlace;
×
2003
                        }
2004
                    }
2005
      
2006
                    int64_t start = toolsGetTimestampUs();
35,704✔
2007
                    if (taos_stmt_set_tbname(pThreadInfo->conn->stmt,
35,823!
2008
                                             escapedTbName)) {
2009
                        errorPrint(
×
2010
                            "taos_stmt_set_tbname(%s) failed, reason: %s\n",
2011
                            tableName,
2012
                                taos_stmt_errstr(pThreadInfo->conn->stmt));
2013
                        g_fail = true;
×
2014
                        goto free_of_interlace;
×
2015
                    }
2016
                    delay1 += toolsGetTimestampUs() - start;
35,628✔
2017

2018
                    int32_t n = 0;
35,649✔
2019
                    generated += bindParamBatch(pThreadInfo, interlaceRows,
35,649✔
2020
                                       childTbl->ts, pos, childTbl, &childTbl->pkCur, &childTbl->pkCnt, &n, &delay2, &delay3);
2021
                    
2022
                    // move next
2023
                    pos += interlaceRows;
35,687✔
2024
                    if (pos + interlaceRows + 1 >= g_arguments->prepared_rand) {
35,687✔
2025
                        pos = 0;
34✔
2026
                    }
2027
                    childTbl->ts += stbInfo->timestamp_step * n;
35,687✔
2028

2029
                    // move next
2030
                    if (stbInfo->autoTblCreating) {
35,687!
2031
                        w += 1;
×
2032
                        if (w >= TAG_BATCH_COUNT) {
×
2033
                            // reset for gen again
2034
                            w = 0;
×
2035
                            index += TAG_BATCH_COUNT;
×
2036
                        }
2037
                    }
2038

2039
                    break;
35,687✔
2040
                }
2041
                case STMT2_IFACE: {
510✔
2042
                    // tbnames
2043
                    bindv->tbnames[i] = childTbl->name;
510✔
2044

2045
                    // tags
2046
                    if (stbInfo->autoTblCreating && firstInsertTb) {
510!
2047
                        // create
2048
                        if (w == 0) {
×
2049
                            // recreate sample tags
2050
                            if(!generateTagData(stbInfo, tagData, TAG_BATCH_COUNT, csvFile, pThreadInfo->tagsStmt, index)) {
×
2051
                                goto free_of_interlace;
×
2052
                            }
2053
                        }
2054

2055
                        if (csvFile) {
×
2056
                            if (prepareStmt2(pThreadInfo->conn->stmt2, stbInfo, tagData, w, database->dbName)) {
×
2057
                                g_fail = true;
×
2058
                                goto free_of_interlace;
×
2059
                            }
2060
                        }
2061
                    
2062
                        bindVTags(bindv, i, w, pThreadInfo->tagsStmt);
×
2063
                    }
2064

2065
                    // cols
2066
                    int32_t n = 0;
510✔
2067
                    generated += bindVColsInterlace(bindv, i, pThreadInfo, interlaceRows, childTbl->ts, pos, 
510✔
2068
                                                    childTbl, &childTbl->pkCur, &childTbl->pkCnt, &n);                    
2069
                    // move next
2070
                    pos += interlaceRows;
511✔
2071
                    if (pos + interlaceRows + 1 >= g_arguments->prepared_rand) {
511!
2072
                        pos = 0;
×
2073
                    }
2074
                    childTbl->ts += stbInfo->timestamp_step * n;
511✔
2075
                    if (stbInfo->autoTblCreating) {
511!
2076
                        w += 1;
×
2077
                        if (w >= TAG_BATCH_COUNT) {
×
2078
                            // reset for gen again
2079
                            w = 0;
×
2080
                            index += TAG_BATCH_COUNT;
×
2081
                        }
2082
                    }
2083

2084
                    break;
511✔
2085
                }
2086
                case SML_REST_IFACE:
327✔
2087
                case SML_IFACE: {
2088
                    int protocol = stbInfo->lineProtocol;
327✔
2089
                    for (int64_t j = 0; j < interlaceRows; j++) {
1,598✔
2090
                        int64_t disorderTs = getDisorderTs(stbInfo,
1,270✔
2091
                                &disorderRange);
2092
                        if (TSDB_SML_JSON_PROTOCOL == protocol) {
1,271✔
2093
                            tools_cJSON *tag = tools_cJSON_Duplicate(
479✔
2094
                                tools_cJSON_GetArrayItem(
479✔
2095
                                    pThreadInfo->sml_json_tags,
479✔
2096
                                    (int)tableSeq -
479✔
2097
                                        pThreadInfo->start_table_from),
479✔
2098
                                    true);
2099
                            generateSmlJsonCols(
479✔
2100
                                pThreadInfo->json_array, tag, stbInfo,
2101
                                database->sml_precision,
479✔
2102
                                    disorderTs?disorderTs:childTbl->ts);
2103
                        } else if (SML_JSON_TAOS_FORMAT == protocol) {
792!
2104
                            tools_cJSON *tag = tools_cJSON_Duplicate(
×
2105
                                tools_cJSON_GetArrayItem(
×
2106
                                    pThreadInfo->sml_json_tags,
×
2107
                                    (int)tableSeq -
×
2108
                                        pThreadInfo->start_table_from),
×
2109
                                    true);
2110
                            generateSmlTaosJsonCols(
×
2111
                                pThreadInfo->json_array, tag, stbInfo,
2112
                                database->sml_precision,
×
2113
                                disorderTs?disorderTs:childTbl->ts);
2114
                        } else if (TSDB_SML_LINE_PROTOCOL == protocol) {
792✔
2115
                            snprintf(
320!
2116
                                pThreadInfo->lines[generated],
320✔
2117
                                stbInfo->lenOfCols + stbInfo->lenOfTags,
320✔
2118
                                "%s %s %" PRId64,
2119
                                pThreadInfo
2120
                                    ->sml_tags[(int)tableSeq -
320✔
2121
                                               pThreadInfo->start_table_from],
320✔
2122
                                    sampleDataBuf + pos * stbInfo->lenOfCols,
320✔
2123
                                disorderTs?disorderTs:childTbl->ts);
2124
                        } else {
2125
                            snprintf(
472✔
2126
                                pThreadInfo->lines[generated],
472✔
2127
                                stbInfo->lenOfCols + stbInfo->lenOfTags,
472✔
2128
                                "%s %" PRId64 " %s %s", stbInfo->stbName,
2129
                                disorderTs?disorderTs:childTbl->ts,
2130
                                    sampleDataBuf + pos * stbInfo->lenOfCols,
472✔
2131
                                pThreadInfo
2132
                                    ->sml_tags[(int)tableSeq -
472✔
2133
                                               pThreadInfo->start_table_from]);
472✔
2134
                        }
2135
                        generated++;
1,271✔
2136
                        // primary key
2137
                        if (!stbInfo->primary_key || needChangeTs(stbInfo, &childTbl->pkCur, &childTbl->pkCnt)) {
1,271!
2138
                            childTbl->ts += stbInfo->timestamp_step;
1,272✔
2139
                        }
2140
                    }
2141
                    if (TSDB_SML_JSON_PROTOCOL == protocol
328✔
2142
                            || SML_JSON_TAOS_FORMAT == protocol) {
159!
2143
                        pThreadInfo->lines[0] =
168✔
2144
                            tools_cJSON_PrintUnformatted(
168✔
2145
                                pThreadInfo->json_array);
168✔
2146
                    }
2147
                    break;
328✔
2148
                }
2149
            }
2150

2151
            // move to next table in one batch
2152
            tableSeq++;
67,018✔
2153
            tmp_total_insert_rows += interlaceRows;
67,018✔
2154
            if (tableSeq > pThreadInfo->end_table_to) {
67,018✔
2155
                // first insert tables loop is end
2156
                firstInsertTb = false;
27,030✔
2157
                // one tables loop timestamp and pos add 
2158
                tableSeq = pThreadInfo->start_table_from;
27,030✔
2159
                // save    
2160
                pThreadInfo->pos = pos;    
27,030✔
2161
                if (!stbInfo->non_stop) {
27,030!
2162
                    insertRows -= interlaceRows;
27,036✔
2163
                }
2164

2165
                // if fillBack mode , can't sleep
2166
                if (stbInfo->insert_interval > 0 && !fillBack) {
27,030!
2167
                    debugPrint("%s() LN%d, insert_interval: %"PRIu64"\n",
3,289!
2168
                          __func__, __LINE__, stbInfo->insert_interval);
2169
                    perfPrint("sleep %" PRIu64 " ms\n",
3,289!
2170
                                     stbInfo->insert_interval);
2171
                    toolsMsleep((int32_t)stbInfo->insert_interval);
3,289✔
2172
                }
2173

2174
                i++;
27,022✔
2175
                // rectify bind count
2176
                if (bindv && bindv->count != i) {
27,022!
2177
                    bindv->count = i;
×
2178
                }                
2179
                break;
27,022✔
2180
            }
2181
        }
2182

2183
        // exec
2184
        if(stbInfo->iface == STMT2_IFACE) {
27,029✔
2185
            // exec stmt2
2186
            if(g_arguments->debug_print)
212!
2187
                showBindV(bindv, stbInfo->tags, stbInfo->cols);
×
2188
            // bind & exec stmt2
2189
            if (submitStmt2(pThreadInfo, bindv, &delay1, &delay3, &startTs, &endTs, &generated, w) != 0) {
212!
2190
                g_fail = true;
×
2191
                goto free_of_interlace;
×
2192
            }
2193
        } else {
2194
            // exec other
2195
            startTs = toolsGetTimestampUs();
26,817✔
2196
            if (execInsert(pThreadInfo, generated, &delay3)) {
26,851!
2197
                g_fail = true;
×
2198
                goto free_of_interlace;
×
2199
            }
2200
            endTs = toolsGetTimestampUs();
26,842✔
2201
        }
2202

2203
        debugPrint("execInsert tableIndex=%d left insert rows=%"PRId64" generated=%d\n", i, insertRows, generated);
27,068!
2204
                
2205
        // reset count
2206
        if(bindv) {
27,057✔
2207
            bindv->count = 0;
211✔
2208
        }            
2209

2210
        pThreadInfo->totalInsertRows += tmp_total_insert_rows;
27,057✔
2211

2212
        if (g_arguments->terminate) {
27,057!
2213
            goto free_of_interlace;
×
2214
        }
2215

2216
        int protocol = stbInfo->lineProtocol;
27,057✔
2217
        switch (stbInfo->iface) {
27,057✔
2218
            case TAOSC_IFACE:
24,031✔
2219
            case REST_IFACE:
2220
                debugPrint("pThreadInfo->buffer: %s\n",
24,031!
2221
                           pThreadInfo->buffer);
2222
                free_ds(&pThreadInfo->buffer);
24,031✔
2223
                pThreadInfo->buffer = new_ds(0);
24,037✔
2224
                break;
24,027✔
2225
            case SML_REST_IFACE:
90✔
2226
                memset(pThreadInfo->buffer, 0,
90✔
2227
                       g_arguments->reqPerReq * (pThreadInfo->max_sql_len + 1));
90✔
2228
            case SML_IFACE:
177✔
2229
                if (TSDB_SML_JSON_PROTOCOL == protocol
177✔
2230
                        || SML_JSON_TAOS_FORMAT == protocol) {
96!
2231
                    debugPrint("pThreadInfo->lines[0]: %s\n",
81!
2232
                               pThreadInfo->lines[0]);
2233
                    if (pThreadInfo->json_array && !g_arguments->terminate) {
81!
2234
                        tools_cJSON_Delete(pThreadInfo->json_array);
84✔
2235
                        pThreadInfo->json_array = NULL;
84✔
2236
                    }
2237
                    pThreadInfo->json_array = tools_cJSON_CreateArray();
81✔
2238
                    if (pThreadInfo->lines && pThreadInfo->lines[0]) {
84!
2239
                        tmfree(pThreadInfo->lines[0]);
84✔
2240
                        pThreadInfo->lines[0] = NULL;
84✔
2241
                    }
2242
                } else {
2243
                    for (int j = 0; j < generated; j++) {
886✔
2244
                        if (pThreadInfo && pThreadInfo->lines
794!
2245
                                && !g_arguments->terminate) {
791!
2246
                            debugPrint("pThreadInfo->lines[%d]: %s\n", j,
792!
2247
                                       pThreadInfo->lines[j]);
2248
                            memset(pThreadInfo->lines[j], 0,
788✔
2249
                                   pThreadInfo->max_sql_len);
2250
                        }
2251
                    }
2252
                }
2253
                break;
176✔
2254
            case STMT_IFACE:
2,635✔
2255
                break;
2,635✔
2256
        }
2257

2258
        int64_t delay4 = endTs - startTs;
27,052✔
2259
        int64_t delay = delay1 + delay2 + delay3 + delay4;
27,052✔
2260
        if (delay <=0) {
27,052!
2261
            debugPrint("thread[%d]: startTS: %"PRId64", endTS: %"PRId64"\n",
×
2262
                       pThreadInfo->threadID, startTs, endTs);
2263
        } else {
2264
            perfPrint("insert execution time is %10.2f ms\n",
27,052!
2265
                      delay / 1E6);
2266

2267
            int64_t * pdelay = benchCalloc(1, sizeof(int64_t), false);
27,052✔
2268
            *pdelay = delay;
27,049✔
2269
            if (benchArrayPush(pThreadInfo->delayList, pdelay) == NULL) {
27,049!
2270
                tmfree(pdelay);
×
2271
            }
2272
            pThreadInfo->totalDelay += delay;
27,077✔
2273
            pThreadInfo->totalDelay1 += delay1;
27,077✔
2274
            pThreadInfo->totalDelay2 += delay2;
27,077✔
2275
            pThreadInfo->totalDelay3 += delay3;
27,077✔
2276
        }
2277
        delay1 = delay2 = delay3 = 0;
27,077✔
2278

2279
        int64_t currentPrintTime = toolsGetTimestampMs();
27,077✔
2280
        if (currentPrintTime - lastPrintTime > 30 * 1000) {
27,073!
2281
            infoPrint(
×
2282
                    "thread[%d] has currently inserted rows: %" PRIu64
2283
                    ", peroid insert rate: %.3f rows/s \n",
2284
                    pThreadInfo->threadID, pThreadInfo->totalInsertRows,
2285
                    (double)(pThreadInfo->totalInsertRows - lastTotalInsertRows) * 1000.0/(currentPrintTime - lastPrintTime));
2286
            lastPrintTime = currentPrintTime;
×
2287
                lastTotalInsertRows = pThreadInfo->totalInsertRows;
×
2288
        }
2289
    }
2290

2291
free_of_interlace:
126✔
2292
    cleanupAndPrint(pThreadInfo, "interlace");
126✔
2293
    if(csvFile) {
121!
2294
        fclose(csvFile);
×
2295
    }
2296
    tmfree(tagData);
121✔
2297
    freeBindV(bindv);
121✔
2298
    return NULL;
121✔
2299
}
2300

2301
static int32_t prepareProgressDataStmt(
52,159✔
2302
        threadInfo *pThreadInfo,
2303
        SChildTable *childTbl,
2304
        int64_t *timestamp, uint64_t i, char *ttl, int32_t *pkCur, int32_t *pkCnt, int64_t *delay1, int64_t *delay2, int64_t *delay3) {
2305
    SSuperTable *stbInfo = pThreadInfo->stbInfo;
52,159✔
2306
    char escapedTbName[TSDB_TABLE_NAME_LEN + 2] = "\0";
52,159✔
2307
    if (g_arguments->escape_character) {
52,159✔
2308
        snprintf(escapedTbName, TSDB_TABLE_NAME_LEN + 2,
39,999✔
2309
                 "`%s`", childTbl->name);
2310
    } else {
2311
        snprintf(escapedTbName, TSDB_TABLE_NAME_LEN, "%s",
12,160✔
2312
                 childTbl->name);
2313
    }
2314
    int64_t start = toolsGetTimestampUs();
52,159✔
2315
    if (taos_stmt_set_tbname(pThreadInfo->conn->stmt,
52,204!
2316
                             escapedTbName)) {
2317
        errorPrint(
×
2318
                "taos_stmt_set_tbname(%s) failed,"
2319
                "reason: %s\n", escapedTbName,
2320
                taos_stmt_errstr(pThreadInfo->conn->stmt));
2321
        return -1;
×
2322
    }
2323
    *delay1 = toolsGetTimestampUs() - start;
52,169✔
2324
    int32_t n = 0;
52,164✔
2325
    int64_t pos = i % g_arguments->prepared_rand;
52,164✔
2326
    if (g_arguments->prepared_rand - pos < g_arguments->reqPerReq) {
52,164!
2327
        // remain prepare data less than batch, reset pos to zero
2328
        pos = 0;
×
2329
    }
2330
    int32_t generated = bindParamBatch(
104,342✔
2331
            pThreadInfo,
2332
            (g_arguments->reqPerReq > (stbInfo->insertRows - i))
52,164✔
2333
                ? (stbInfo->insertRows - i)
2334
                : g_arguments->reqPerReq,
52,164✔
2335
            *timestamp, pos, childTbl, pkCur, pkCnt, &n, delay2, delay3);
2336
    *timestamp += n * stbInfo->timestamp_step;
52,178✔
2337
    return generated;
52,178✔
2338
}
2339

2340
// With a probability derived from stbInfo->disorderRatio (compared against a
// random value in [0, 100)), overwrite *timestamp with a value that lies
// before stbInfo->startTimestamp. Otherwise *timestamp is left untouched.
static void makeTimestampDisorder(
        int64_t *timestamp, SSuperTable *stbInfo) {
    int64_t baseTs = stbInfo->startTimestamp;
    int     backOff = stbInfo->disorderRange;
    int     rand_num = taosRandom() % 100;

    if (rand_num >= stbInfo->disorderRatio) {
        // this row keeps its regular timestamp
        return;
    }

    // Step back by (disorderRange - 1); fall back to the full range when that
    // would be zero so the timestamp always moves.
    backOff -= 1;
    if (backOff == 0) {
        backOff = stbInfo->disorderRange;
    }

    *timestamp = baseTs - backOff;
    debugPrint("rand_num: %d, < disorderRatio: %d"
               ", ts: %"PRId64"\n",
               rand_num,
               stbInfo->disorderRatio,
               *timestamp);
}
160✔
2358

2359
static int32_t prepareProgressDataSmlJsonText(
290✔
2360
    threadInfo *pThreadInfo,
2361
    uint64_t tableSeq,
2362
    int64_t *timestamp, uint64_t i, char *ttl, int32_t *pkCur, int32_t *pkCnt) {
2363
    // prepareProgressDataSmlJsonText
2364
    SSuperTable *stbInfo = pThreadInfo->stbInfo;
290✔
2365
    int32_t generated = 0;
290✔
2366

2367
    int len = 0;
290✔
2368

2369
    char *line = pThreadInfo->lines[0];
290✔
2370
    uint32_t line_buf_len = pThreadInfo->line_buf_len;
290✔
2371

2372
    strncat(line + len, "[", 2);
290✔
2373
    len += 1;
290✔
2374

2375
    int32_t pos = 0;
290✔
2376
    for (int j = 0; (j < g_arguments->reqPerReq)
290✔
2377
            && !g_arguments->terminate; j++) {
3,029!
2378
        strncat(line + len, "{", 2);
2,889✔
2379
        len += 1;
2,889✔
2380
        int n;
2381
        n = snprintf(line + len, line_buf_len - len,
2,889✔
2382
                 "\"timestamp\":%"PRId64",", *timestamp);
2383
        if (n < 0 || n >= line_buf_len - len) {
2,889!
2384
            errorPrint("%s() LN%d snprintf overflow on %d\n",
1!
2385
                       __func__, __LINE__, j);
2386
            return -1;
×
2387
        } else {
2388
            len += n;
2,888✔
2389
        }
2390

2391
        n = snprintf(line + len, line_buf_len - len, "%s",
2,888✔
2392
                        pThreadInfo->sml_json_value_array[tableSeq]);
2,888✔
2393
        if (n < 0 || n >= line_buf_len - len) {
2,888!
2394
            errorPrint("%s() LN%d snprintf overflow on %d\n",
1!
2395
                       __func__, __LINE__, j);
2396
            return -1;
×
2397
        } else {
2398
            len += n;
2,887✔
2399
        }
2400
        n = snprintf(line + len, line_buf_len - len, "\"tags\":%s,",
2,887✔
2401
                       pThreadInfo->sml_tags_json_array[tableSeq]);
2,887✔
2402
        if (n < 0 || n >= line_buf_len - len) {
2,887!
2403
            errorPrint("%s() LN%d snprintf overflow on %d\n",
×
2404
                       __func__, __LINE__, j);
2405
            return -1;
×
2406
        } else {
2407
            len += n;
2,887✔
2408
        }
2409
        n = snprintf(line + len, line_buf_len - len,
2,887✔
2410
                       "\"metric\":\"%s\"}", stbInfo->stbName);
2411
        if (n < 0 || n >= line_buf_len - len) {
2,887!
2412
            errorPrint("%s() LN%d snprintf overflow on %d\n",
1!
2413
                       __func__, __LINE__, j);
2414
            return -1;
×
2415
        } else {
2416
            len += n;
2,886✔
2417
        }
2418

2419
        pos++;
2,886✔
2420
        if (pos >= g_arguments->prepared_rand) {
2,886✔
2421
            pos = 0;
289✔
2422
        }
2423

2424
        // primay key repeat ts count
2425
        if (!stbInfo->primary_key || needChangeTs(stbInfo, pkCur, pkCnt)) {
2,886!
2426
            *timestamp += stbInfo->timestamp_step;
2,885✔
2427
        }
2428

2429
        if (stbInfo->disorderRatio > 0) {
2,886!
2430
            makeTimestampDisorder(timestamp, stbInfo);
×
2431
        }
2432
        generated++;
2,886✔
2433
        if (i + generated >= stbInfo->insertRows) {
2,886✔
2434
            break;
147✔
2435
        }
2436
        if ((j+1) < g_arguments->reqPerReq) {
2,739✔
2437
            strncat(line + len, ",", 2);
2,596✔
2438
            len += 1;
2,596✔
2439
        }
2440
    }
2441
    strncat(line + len, "]", 2);
287✔
2442

2443
    debugPrint("%s() LN%d, lines[0]: %s\n",
287!
2444
               __func__, __LINE__, pThreadInfo->lines[0]);
2445
    return generated;
291✔
2446
}
2447

2448
static int32_t prepareProgressDataSmlJson(
147✔
2449
    threadInfo *pThreadInfo,
2450
    uint64_t tableSeq,
2451
    int64_t *timestamp, uint64_t i, char *ttl, int32_t *pkCur, int32_t *pkCnt) {
2452
    // prepareProgressDataSmlJson
2453
    SDataBase *  database = pThreadInfo->dbInfo;
147✔
2454
    SSuperTable *stbInfo = pThreadInfo->stbInfo;
147✔
2455
    int32_t generated = 0;
147✔
2456

2457
    int32_t pos = 0;
147✔
2458
    for (int j = 0; (j < g_arguments->reqPerReq)
147✔
2459
            && !g_arguments->terminate; j++) {
1,521!
2460
        tools_cJSON *tag = tools_cJSON_Duplicate(
1,450✔
2461
                tools_cJSON_GetArrayItem(
1,450✔
2462
                    pThreadInfo->sml_json_tags,
1,450✔
2463
                    (int)tableSeq -
1,450✔
2464
                    pThreadInfo->start_table_from),
1,450✔
2465
                true);
2466
        debugPrintJsonNoTime(tag);
1,451!
2467
        generateSmlTaosJsonCols(
1,451✔
2468
                pThreadInfo->json_array, tag, stbInfo,
2469
                database->sml_precision, *timestamp);
1,451✔
2470
        pos++;
1,449✔
2471
        if (pos >= g_arguments->prepared_rand) {
1,449✔
2472
            pos = 0;
145✔
2473
        }
2474

2475
        // primay key repeat ts count
2476
        if (!stbInfo->primary_key || needChangeTs(stbInfo, pkCur, pkCnt)) {
1,449!
2477
            *timestamp += stbInfo->timestamp_step;
1,449✔
2478
        }
2479

2480
        if (stbInfo->disorderRatio > 0) {
1,449!
2481
            makeTimestampDisorder(timestamp, stbInfo);
×
2482
        }
2483
        generated++;
1,449✔
2484
        if (i + generated >= stbInfo->insertRows) {
1,449✔
2485
            break;
75✔
2486
        }
2487
    }
2488

2489
    tmfree(pThreadInfo->lines[0]);
146✔
2490
    pThreadInfo->lines[0] = NULL;
147✔
2491
    pThreadInfo->lines[0] =
294✔
2492
            tools_cJSON_PrintUnformatted(
147✔
2493
                pThreadInfo->json_array);
147✔
2494
    debugPrint("pThreadInfo->lines[0]: %s\n",
147!
2495
                   pThreadInfo->lines[0]);
2496

2497
    return generated;
147✔
2498
}
2499

2500
static int32_t prepareProgressDataSmlLineOrTelnet(
305✔
2501
    threadInfo *pThreadInfo, uint64_t tableSeq, char *sampleDataBuf,
2502
    int64_t *timestamp, uint64_t i, char *ttl, int protocol, int32_t *pkCur, int32_t *pkCnt) {
2503
    // prepareProgressDataSmlLine
2504
    SSuperTable *stbInfo = pThreadInfo->stbInfo;
305✔
2505
    int32_t generated = 0;
305✔
2506

2507
    int32_t pos = 0;
305✔
2508
    for (int j = 0; (j < g_arguments->reqPerReq)
305✔
2509
            && !g_arguments->terminate; j++) {
66,302✔
2510
        // table index
2511
        int ti = tableSeq - pThreadInfo->start_table_from;
58,113✔
2512
        if (TSDB_SML_LINE_PROTOCOL == protocol) {
58,113✔
2513
            snprintf(
55,708✔
2514
                    pThreadInfo->lines[j],
55,708✔
2515
                    stbInfo->lenOfCols + stbInfo->lenOfTags,
55,708✔
2516
                    "%s %s %" PRId64,
2517
                    pThreadInfo->sml_tags[ti],
55,708✔
2518
                    sampleDataBuf + pos * stbInfo->lenOfCols,
55,708✔
2519
                    *timestamp);
2520
        } else {
2521
            snprintf(
2,405✔
2522
                    pThreadInfo->lines[j],
2,405✔
2523
                    stbInfo->lenOfCols + stbInfo->lenOfTags,
2,405✔
2524
                    "%s %" PRId64 " %s %s", stbInfo->stbName,
2525
                    *timestamp,
2526
                    sampleDataBuf
2527
                    + pos * stbInfo->lenOfCols,
2,405✔
2528
                    pThreadInfo->sml_tags[ti]);
2,405✔
2529
        }
2530
        //infoPrint("sml prepare j=%d stb=%s sml_tags=%s \n", j, stbInfo->stbName, pThreadInfo->sml_tags[ti]);
2531
        pos++;
58,113✔
2532
        if (pos >= g_arguments->prepared_rand) {
58,113✔
2533
            pos = 0;
273✔
2534
        }
2535
        // primay key repeat ts count
2536
        if (!stbInfo->primary_key || needChangeTs(stbInfo, pkCur, pkCnt)) {
58,113!
2537
            *timestamp += stbInfo->timestamp_step;
65,673✔
2538
        }
2539
        
2540
        if (stbInfo->disorderRatio > 0) {
58,113✔
2541
            makeTimestampDisorder(timestamp, stbInfo);
160✔
2542
        }
2543
        generated++;
66,158✔
2544
        if (i + generated >= stbInfo->insertRows) {
66,158✔
2545
            break;
161✔
2546
        }
2547
    }
2548
    return generated;
8,350✔
2549
}
2550

2551
static int32_t prepareProgressDataSml(
739✔
2552
    threadInfo *pThreadInfo,
2553
    SChildTable *childTbl,
2554
    uint64_t tableSeq,
2555
    int64_t *timestamp, uint64_t i, char *ttl, int32_t *pkCur, int32_t *pkCnt) {
2556
    // prepareProgressDataSml
2557
    SSuperTable *stbInfo = pThreadInfo->stbInfo;
739✔
2558

2559
    char *sampleDataBuf;
2560
    if (childTbl->useOwnSample) {
739!
2561
        sampleDataBuf = childTbl->sampleDataBuf;
×
2562
    } else {
2563
        sampleDataBuf = stbInfo->sampleDataBuf;
739✔
2564
    }
2565
    int protocol = stbInfo->lineProtocol;
739✔
2566
    int32_t generated = -1;
739✔
2567
    switch (protocol) {
739!
2568
        case TSDB_SML_LINE_PROTOCOL:
305✔
2569
        case TSDB_SML_TELNET_PROTOCOL:
2570
            generated = prepareProgressDataSmlLineOrTelnet(
305✔
2571
                    pThreadInfo,
2572
                    tableSeq,
2573
                    sampleDataBuf,
2574
                    timestamp, i, ttl, protocol, pkCur, pkCnt);
2575
            break;
305✔
2576
        case TSDB_SML_JSON_PROTOCOL:
290✔
2577
            generated = prepareProgressDataSmlJsonText(
290✔
2578
                    pThreadInfo,
2579
                    tableSeq - pThreadInfo->start_table_from,
290✔
2580
                timestamp, i, ttl, pkCur, pkCnt);
2581
            break;
291✔
2582
        case SML_JSON_TAOS_FORMAT:
147✔
2583
            generated = prepareProgressDataSmlJson(
147✔
2584
                    pThreadInfo,
2585
                    tableSeq,
2586
                    timestamp, i, ttl, pkCur, pkCnt);
2587
            break;
147✔
2588
        default:
×
2589
            errorPrint("%s() LN%d: unknown protcolor: %d\n",
×
2590
                       __func__, __LINE__, protocol);
2591
            break;
×
2592
    }
2593

2594
    return generated;
743✔
2595
}
2596

2597
// if return true, timestmap must add timestap_step, else timestamp no need changed
2598
bool needChangeTs(SSuperTable * stbInfo, int32_t *pkCur, int32_t *pkCnt) {
202,936,063✔
2599
    // check need generate cnt
2600
    if(*pkCnt == 0) {
202,936,063!
2601
        if (stbInfo->repeat_ts_min >= stbInfo->repeat_ts_max) {
202,992,766!
2602
            // fixed count value is max
2603
            if (stbInfo->repeat_ts_max == 0){
203,010,310!
2604
                return true;
203,011,388✔
2605
            }
2606

2607
            *pkCnt = stbInfo->repeat_ts_max;
×
2608
        } else {
2609
            // random range
2610
            *pkCnt = RD(stbInfo->repeat_ts_max + 1);
×
2611
            if(*pkCnt < stbInfo->repeat_ts_min) {
×
2612
                *pkCnt = (*pkCnt + stbInfo->repeat_ts_min) % stbInfo->repeat_ts_max;
×
2613
            }
2614
        }
2615
    }
2616

2617
    // compare with current value
2618
    *pkCur = *pkCur + 1;
×
2619
    if(*pkCur >= *pkCnt) {
×
2620
        // reset zero
2621
        *pkCur = 0;
990✔
2622
        *pkCnt = 0;
990✔
2623
        return true;
990✔
2624
    } else {
2625
        // add one
2626
        return false;
×
2627
    }
2628
}
2629

2630
static int32_t prepareProgressDataSql(
7,961,752✔
2631
                    threadInfo *pThreadInfo,
2632
                    SChildTable *childTbl, 
2633
                    char* tagData,
2634
                    uint64_t tableSeq,
2635
                    char *sampleDataBuf,
2636
                    int64_t *timestamp, uint64_t i, char *ttl,
2637
                    int32_t *pos, uint64_t *len, int32_t* pkCur, int32_t* pkCnt) {
2638
    // prepareProgressDataSql
2639
    int32_t generated = 0;
7,961,752✔
2640
    SDataBase *database = pThreadInfo->dbInfo;
7,961,752✔
2641
    SSuperTable *stbInfo = pThreadInfo->stbInfo;
7,961,752✔
2642
    char *  pstr = pThreadInfo->buffer;
7,961,752✔
2643
    int disorderRange = stbInfo->disorderRange;
7,961,752✔
2644

2645
    if (stbInfo->partialColNum == stbInfo->cols->size) {
7,961,752!
2646
        if (stbInfo->autoTblCreating) {
7,967,231✔
2647
            *len =
60✔
2648
                snprintf(pstr, TSDB_MAX_ALLOWED_SQL_LEN,
60✔
2649
                        g_arguments->escape_character
60✔
2650
                        ? "%s `%s`.`%s` USING `%s`.`%s` TAGS (%s) %s VALUES "
2651
                        : "%s %s.%s USING %s.%s TAGS (%s) %s VALUES ",
2652
                         STR_INSERT_INTO, database->dbName,
2653
                         childTbl->name, database->dbName,
2654
                         stbInfo->stbName,
2655
                         tagData +
2656
                         stbInfo->lenOfTags * tableSeq, ttl);
60!
2657
        } else {
2658
            *len = snprintf(pstr, TSDB_MAX_ALLOWED_SQL_LEN,
7,967,171✔
2659
                    g_arguments->escape_character
7,967,171✔
2660
                           ? "%s `%s`.`%s` VALUES "
2661
                           : "%s %s.%s VALUES ",
2662
                           STR_INSERT_INTO,
2663
                           database->dbName, childTbl->name);
2664
        }
2665
    } else {
2666
        if (stbInfo->autoTblCreating) {
×
2667
            *len = snprintf(
16✔
2668
                    pstr, TSDB_MAX_ALLOWED_SQL_LEN,
2669
                    g_arguments->escape_character
16✔
2670
                    ? "%s `%s`.`%s` (%s) USING `%s`.`%s` TAGS (%s) %s VALUES "
2671
                    : "%s %s.%s (%s) USING %s.%s TAGS (%s) %s VALUES ",
2672
                    STR_INSERT_INTO, database->dbName,
2673
                    childTbl->name,
2674
                    stbInfo->partialColNameBuf,
2675
                    database->dbName, stbInfo->stbName,
2676
                    tagData +
2677
                    stbInfo->lenOfTags * tableSeq, ttl);
16!
2678
        } else {
2679
            *len = snprintf(pstr, TSDB_MAX_ALLOWED_SQL_LEN,
×
2680
                    g_arguments->escape_character
×
2681
                    ? "%s `%s`.`%s` (%s) VALUES "
2682
                    : "%s %s.%s (%s) VALUES ",
2683
                    STR_INSERT_INTO, database->dbName,
2684
                    childTbl->name,
2685
                    stbInfo->partialColNameBuf);
2686
        }
2687
    }
2688

2689
    char *ownSampleDataBuf;
2690
    if (childTbl->useOwnSample) {
7,961,752✔
2691
        debugPrint("%s is using own sample data\n",
6!
2692
                  childTbl->name);
2693
        ownSampleDataBuf = childTbl->sampleDataBuf;
6✔
2694
    } else {
2695
        ownSampleDataBuf = stbInfo->sampleDataBuf;
7,961,746✔
2696
    }
2697
    for (int j = 0; j < g_arguments->reqPerReq; j++) {
576,326,870✔
2698
        if (stbInfo->useSampleTs
568,920,875✔
2699
                && (!stbInfo->random_data_source)) {
103!
2700
            *len +=
103✔
2701
                snprintf(pstr + *len,
103✔
2702
                         TSDB_MAX_ALLOWED_SQL_LEN - *len, "(%s)",
103✔
2703
                         sampleDataBuf +
2704
                         *pos * stbInfo->lenOfCols);
103✔
2705
        } else {
2706
            int64_t disorderTs = getDisorderTs(stbInfo, &disorderRange);
568,920,772✔
2707
            *len += snprintf(pstr + *len,
568,475,497✔
2708
                            TSDB_MAX_ALLOWED_SQL_LEN - *len,
568,475,497✔
2709
                            "(%" PRId64 ",%s)",
2710
                            disorderTs?disorderTs:*timestamp,
2711
                            ownSampleDataBuf +
2712
                            *pos * stbInfo->lenOfCols);
568,475,497!
2713
        }
2714
        *pos += 1;
568,475,600✔
2715
        if (*pos >= g_arguments->prepared_rand) {
568,475,600✔
2716
            *pos = 0;
91,344✔
2717
        }
2718
        // primary key
2719
        if(!stbInfo->primary_key || needChangeTs(stbInfo, pkCur, pkCnt)) {
568,475,600!
2720
            *timestamp += stbInfo->timestamp_step;
568,841,741✔
2721
        }
2722
   
2723
        generated++;
568,417,232✔
2724
        if (*len > (TSDB_MAX_ALLOWED_SQL_LEN
568,417,232✔
2725
            - stbInfo->lenOfCols)) {
568,417,232✔
2726
            break;
632✔
2727
        }
2728
        if (i + generated >= stbInfo->insertRows) {
568,416,600✔
2729
            break;
51,482✔
2730
        }
2731
    }
2732

2733
    return generated;
7,458,109✔
2734
}
2735

2736
void *syncWriteProgressive(void *sarg) {
2,446✔
2737
    threadInfo * pThreadInfo = (threadInfo *)sarg;
2,446✔
2738
    SDataBase *  database = pThreadInfo->dbInfo;
2,446✔
2739
    SSuperTable *stbInfo = pThreadInfo->stbInfo;
2,446✔
2740

2741
    loadChildTableInfo(pThreadInfo);
2,446✔
2742

2743
    // special deal flow for TAOSC_IFACE
2744
    if (insertDataMix(pThreadInfo, database, stbInfo)) {
2,453✔
2745
        // request be dealt by this function , so return
2746
        return NULL;
704✔
2747
    }
2748

2749
    infoPrint(
1,744✔
2750
        "thread[%d] start progressive inserting into table from "
2751
        "%" PRIu64 " to %" PRIu64 "\n",
2752
        pThreadInfo->threadID, pThreadInfo->start_table_from,
2753
        pThreadInfo->end_table_to + 1);
2754

2755
    uint64_t  lastPrintTime = toolsGetTimestampMs();
1,750✔
2756
    uint64_t  lastTotalInsertRows = 0;
1,750✔
2757
    int64_t   startTs = toolsGetTimestampUs();
1,750✔
2758
    int64_t   endTs;
2759

2760
    FILE* csvFile = NULL;
1,749✔
2761
    char* tagData = NULL;
1,749✔
2762
    bool  stmt    = (stbInfo->iface == STMT_IFACE || stbInfo->iface == STMT2_IFACE) && stbInfo->autoTblCreating;
1,749✔
2763
    bool  smart   = SMART_IF_FAILED == stbInfo->continueIfFail;
1,749✔
2764
    bool  acreate = (stbInfo->iface == TAOSC_IFACE || stbInfo->iface == REST_IFACE) && stbInfo->autoTblCreating;
1,749✔
2765
    int   w       = 0;
1,749✔
2766
    if (stmt || smart || acreate) {
1,749✔
2767
        csvFile = openTagCsv(stbInfo, pThreadInfo->start_table_from);
42✔
2768
        tagData = benchCalloc(TAG_BATCH_COUNT, stbInfo->lenOfTags, false);
43✔
2769
    }
2770

2771
    bool oldInitStmt = stbInfo->autoTblCreating;
1,750✔
2772
    // stmt.  not auto table create call on stmt
2773
    if (stbInfo->iface == STMT_IFACE && !oldInitStmt) {
1,750✔
2774
        if (prepareStmt(pThreadInfo->conn->stmt, stbInfo, tagData, w, database->dbName)) {
63!
2775
            g_fail = true;
×
2776
            goto free_of_progressive;
×
2777
        }
2778
    }
2779
    else if (stbInfo->iface == STMT2_IFACE && !stbInfo->autoTblCreating) {
1,687✔
2780
        if (prepareStmt2(pThreadInfo->conn->stmt2, stbInfo, tagData, w, database->dbName)) {
16!
2781
            g_fail = true;
×
2782
            goto free_of_progressive;
×
2783
        }
2784
    }
2785
    
2786
    //
2787
    // loop write each child table
2788
    //
2789
    int16_t index = pThreadInfo->start_table_from;
1,748✔
2790
    for (uint64_t tableSeq = pThreadInfo->start_table_from;
1,748✔
2791
            tableSeq <= pThreadInfo->end_table_to; tableSeq++) {
85,378✔
2792
        char *sampleDataBuf;
2793
        SChildTable *childTbl;
2794

2795
        if (g_arguments->bind_vgroup) {
84,036!
2796
            childTbl = pThreadInfo->vg->childTblArray[tableSeq];
×
2797
        } else {
2798
            childTbl = stbInfo->childTblArray[tableSeq];
84,036✔
2799
        }
2800
        debugPrint("tableSeq=%"PRId64" childTbl->name=%s\n", tableSeq, childTbl->name);
84,036!
2801

2802
        if (childTbl->useOwnSample) {
84,070✔
2803
            sampleDataBuf = childTbl->sampleDataBuf;
11✔
2804
        } else {
2805
            sampleDataBuf = stbInfo->sampleDataBuf;
84,059✔
2806
        }
2807

2808
        int64_t  timestamp = pThreadInfo->start_time;
84,070✔
2809
        uint64_t len = 0;
84,070✔
2810
        int32_t pos = 0;
84,070✔
2811
        int32_t pkCur = 0; // record generate same timestamp current count
84,070✔
2812
        int32_t pkCnt = 0; // record generate same timestamp count
84,070✔
2813
        int64_t delay1 = 0;
84,070✔
2814
        int64_t delay2 = 0;
84,070✔
2815
        int64_t delay3 = 0;
84,070✔
2816

2817
        if(stmt || smart || acreate) {
84,070!
2818
            // generator
2819
            if (w == 0) {
20,083✔
2820
                if(!generateTagData(stbInfo, tagData, TAG_BATCH_COUNT, csvFile, NULL, index)) {
223!
2821
                    g_fail = true;
×
2822
                    goto free_of_progressive;
420✔
2823
                }
2824
            }
2825
        }
2826

2827
        // old init stmt must call for each table
2828
        if (stbInfo->iface == STMT_IFACE && oldInitStmt) {
84,070✔
2829
            if (prepareStmt(pThreadInfo->conn->stmt, stbInfo, tagData, w, database->dbName)) {
20,013!
2830
                g_fail = true;
×
2831
                goto free_of_progressive;
×
2832
            }
2833
        }
2834
        else if (stbInfo->iface == STMT2_IFACE && stbInfo->autoTblCreating) {
64,057✔
2835
            if (prepareStmt2(pThreadInfo->conn->stmt2, stbInfo, tagData, w, database->dbName)) {
4!
2836
                g_fail = true;
×
2837
                goto free_of_progressive;
×
2838
            }
2839
        }
2840
        
2841
        if(stmt || smart || acreate) {
84,050!
2842
            // move next
2843
            if (++w >= TAG_BATCH_COUNT) {
20,063✔
2844
                // reset for gen again
2845
                w = 0;
200✔
2846
                index += TAG_BATCH_COUNT;
200✔
2847
            } 
2848
        }
2849

2850
        char ttl[SMALL_BUFF_LEN] = "";
84,050✔
2851
        if (stbInfo->ttl != 0) {
84,050✔
2852
            snprintf(ttl, SMALL_BUFF_LEN, "TTL %d", stbInfo->ttl);
26✔
2853
        }
2854
        for (uint64_t i = 0; i < stbInfo->insertRows;) {
8,013,451✔
2855
            if (g_arguments->terminate) {
8,013,445!
2856
                goto free_of_progressive;
×
2857
            }
2858
            int32_t generated = 0;
8,013,445✔
2859
            switch (stbInfo->iface) {
8,013,445!
2860
                case TAOSC_IFACE:
7,963,442✔
2861
                case REST_IFACE:                
2862
                    generated = prepareProgressDataSql(
7,963,442✔
2863
                            pThreadInfo,
2864
                            childTbl,
2865
                            tagData,
2866
                            w,
2867
                            sampleDataBuf,
2868
                            &timestamp, i, ttl, &pos, &len, &pkCur, &pkCnt);
2869
                    break;        
7,958,455✔
2870
                case STMT_IFACE: {
52,186✔
2871
                    generated = prepareProgressDataStmt(
52,186✔
2872
                            pThreadInfo,
2873
                            childTbl, &timestamp, i, ttl, &pkCur, &pkCnt, &delay1, &delay2, &delay3);
2874
                    break;
52,179✔
2875
                }
2876
                case STMT2_IFACE: {
116✔
2877
                    generated = stmt2BindAndSubmit(
116✔
2878
                            pThreadInfo,
2879
                            childTbl, &timestamp, i, ttl, &pkCur, &pkCnt, &delay1,
2880
                            &delay3, &startTs, &endTs, w);
2881
                    break;
117✔
2882
                }
2883
                case SML_REST_IFACE:
742✔
2884
                case SML_IFACE:
2885
                    generated = prepareProgressDataSml(
742✔
2886
                            pThreadInfo,
2887
                            childTbl,
2888
                            tableSeq, &timestamp, i, ttl, &pkCur, &pkCnt);
2889
                    break;
742✔
2890
                default:
×
2891
                    break;
×
2892
            }
2893
            if (generated < 0) {
8,008,452!
2894
                g_fail = true;
×
2895
                goto free_of_progressive;
×
2896
            }
2897
            if (!stbInfo->non_stop) {
8,008,452!
2898
                i += generated;
8,014,681✔
2899
            }
2900

2901
            // stmt2 execInsert already execute on stmt2BindAndSubmit
2902
            if (stbInfo->iface != STMT2_IFACE) {
8,008,452!
2903
                // no stmt2 exec
2904
                startTs = toolsGetTimestampUs();
8,014,692✔
2905
                int code = execInsert(pThreadInfo, generated, &delay3);
8,019,244✔
2906
                if (code) {
8,000,636✔
2907
                    if (NO_IF_FAILED == stbInfo->continueIfFail) {
472✔
2908
                        warnPrint("The super table parameter "
404!
2909
                                "continueIfFail: %d, STOP insertion!\n",
2910
                                stbInfo->continueIfFail);
2911
                        g_fail = true;
404✔
2912
                        goto free_of_progressive;
404✔
2913
                    } else if (YES_IF_FAILED == stbInfo->continueIfFail) {
68!
2914
                        infoPrint("The super table parameter "
68✔
2915
                                "continueIfFail: %d, "
2916
                                "will continue to insert ..\n",
2917
                                stbInfo->continueIfFail);
2918
                    } else if (smart) {
×
2919
                        warnPrint("The super table parameter "
×
2920
                                "continueIfFail: %d, will create table "
2921
                                "then insert ..\n",
2922
                                stbInfo->continueIfFail);
2923

2924
                        // generator
2925
                        if (w == 0) {
×
2926
                            if(!generateTagData(stbInfo, tagData, TAG_BATCH_COUNT, csvFile, NULL, index)) {
×
2927
                                g_fail = true;
×
2928
                                goto free_of_progressive;
×
2929
                            }
2930
                        }
2931

2932
                        code = smartContinueIfFail(
×
2933
                                pThreadInfo,
2934
                                childTbl, tagData, w, ttl);
2935
                        if (0 != code) {
×
2936
                            g_fail = true;
×
2937
                            goto free_of_progressive;
×
2938
                        }
2939

2940
                        // move next
2941
                        if (++w >= TAG_BATCH_COUNT) {
×
2942
                            // reset for gen again
2943
                            w = 0;
×
2944
                            index += TAG_BATCH_COUNT;
×
2945
                        }
2946

2947
                        code = execInsert(pThreadInfo, generated, &delay3);
×
2948
                        if (code) {
×
2949
                            g_fail = true;
×
2950
                            goto free_of_progressive;
×
2951
                        }
2952
                    } else {
2953
                        warnPrint("Unknown super table parameter "
×
2954
                                "continueIfFail: %d\n",
2955
                                stbInfo->continueIfFail);
2956
                        g_fail = true;
×
2957
                        goto free_of_progressive;
×
2958
                    }
2959
                }
2960
                endTs = toolsGetTimestampUs() + 1;
8,000,232✔
2961
            }
2962

2963
            if (stbInfo->insert_interval > 0) {
7,996,288✔
2964
                debugPrint("%s() LN%d, insert_interval: %"PRIu64"\n",
30!
2965
                          __func__, __LINE__, stbInfo->insert_interval);
2966
                perfPrint("sleep %" PRIu64 " ms\n",
30!
2967
                              stbInfo->insert_interval);
2968
                toolsMsleep((int32_t)stbInfo->insert_interval);
30✔
2969
            }
2970

2971
            // flush
2972
            if (database->flush) {
8,006,923✔
2973
                char sql[260] = "";
3✔
2974
                sprintf(sql, "flush database %s", database->dbName);
3✔
2975
                int32_t code = executeSql(pThreadInfo->conn->taos,sql);
3✔
2976
                if (code != 0) {
3!
2977
                  perfPrint(" %s failed. error code = 0x%x\n", sql, code);
×
2978
                } else {
2979
                   perfPrint(" %s ok.\n", sql);
3!
2980
                }
2981
            }
2982

2983
            pThreadInfo->totalInsertRows += generated;
8,006,923✔
2984

2985
            if (g_arguments->terminate) {
8,006,923✔
2986
                goto free_of_progressive;
16✔
2987
            }
2988
            int protocol = stbInfo->lineProtocol;
8,006,907✔
2989
            switch (stbInfo->iface) {
8,006,907!
2990
                case REST_IFACE:
7,960,586✔
2991
                case TAOSC_IFACE:
2992
                    memset(pThreadInfo->buffer, 0, pThreadInfo->max_sql_len);
7,960,586✔
2993
                    break;
7,960,586✔
2994
                case SML_REST_IFACE:
69✔
2995
                    memset(pThreadInfo->buffer, 0,
69✔
2996
                           g_arguments->reqPerReq *
69✔
2997
                               (pThreadInfo->max_sql_len + 1));                    
69✔
2998
                case SML_IFACE:
743✔
2999
                    if (TSDB_SML_JSON_PROTOCOL == protocol) {
743✔
3000
                        memset(pThreadInfo->lines[0], 0,
291✔
3001
                           pThreadInfo->line_buf_len);
291✔
3002
                    } else if (SML_JSON_TAOS_FORMAT == protocol) {
452✔
3003
                        if (pThreadInfo->lines && pThreadInfo->lines[0]) {
147!
3004
                            tmfree(pThreadInfo->lines[0]);
147✔
3005
                            pThreadInfo->lines[0] = NULL;
147✔
3006
                        }
3007
                        if (pThreadInfo->json_array) {
147!
3008
                            tools_cJSON_Delete(pThreadInfo->json_array);
147✔
3009
                            pThreadInfo->json_array = NULL;
147✔
3010
                        }
3011
                        pThreadInfo->json_array = tools_cJSON_CreateArray();
147✔
3012
                    } else {
3013
                        for (int j = 0; j < generated; j++) {
93,632✔
3014
                            debugPrint("pThreadInfo->lines[%d]: %s\n",
93,549!
3015
                                       j, pThreadInfo->lines[j]);
3016
                            memset(pThreadInfo->lines[j], 0,
93,327✔
3017
                                   pThreadInfo->max_sql_len);
3018
                        }
3019
                    }
3020
                    break;
521✔
3021
                case STMT_IFACE:
52,159✔
3022
                    break;
52,159✔
3023
            }
3024

3025
            int64_t delay4 = endTs - startTs;
8,006,685✔
3026
            int64_t delay = delay1 + delay2 + delay3 + delay4;
8,006,685✔
3027
            if (delay <= 0) {
8,006,685!
3028
                debugPrint("thread[%d]: startTs: %"PRId64", endTs: %"PRId64"\n",
×
3029
                        pThreadInfo->threadID, startTs, endTs);
3030
            } else {
3031
                perfPrint("insert execution time is %.6f s\n",
8,006,685!
3032
                              delay / 1E6);
3033

3034
                int64_t * pDelay = benchCalloc(1, sizeof(int64_t), false);
8,006,685✔
3035
                *pDelay = delay;
8,018,485✔
3036
                if (benchArrayPush(pThreadInfo->delayList, pDelay) == NULL) {
8,018,485!
3037
                    tmfree(pDelay);
×
3038
                }
3039
                pThreadInfo->totalDelay += delay;
8,019,530✔
3040
                pThreadInfo->totalDelay1 += delay1;
8,019,530✔
3041
                pThreadInfo->totalDelay2 += delay2;
8,019,530✔
3042
                pThreadInfo->totalDelay3 += delay3;
8,019,530✔
3043
            }
3044
            delay1 = delay2 = delay3 = 0;
8,019,530✔
3045

3046
            int64_t currentPrintTime = toolsGetTimestampMs();
8,019,530✔
3047
            if (currentPrintTime - lastPrintTime > 30 * 1000) {
8,013,982✔
3048
                infoPrint(
1,233✔
3049
                        "thread[%d] has currently inserted rows: "
3050
                        "%" PRId64 ", peroid insert rate: %.3f rows/s \n",
3051
                        pThreadInfo->threadID, pThreadInfo->totalInsertRows,
3052
                        (double)(pThreadInfo->totalInsertRows - lastTotalInsertRows) * 1000.0/(currentPrintTime - lastPrintTime));
3053
                lastPrintTime = currentPrintTime;
276✔
3054
                lastTotalInsertRows = pThreadInfo->totalInsertRows;
276✔
3055
            }
3056
            if (i >= stbInfo->insertRows) {
8,013,025✔
3057
                break;
83,624✔
3058
            }
3059
        }  // insertRows
3060
    }      // tableSeq
3061
free_of_progressive:
1,342✔
3062
    cleanupAndPrint(pThreadInfo, "progressive");
1,762✔
3063
    if(csvFile) {
1,750✔
3064
        fclose(csvFile);
1✔
3065
    }
3066
    tmfree(tagData);
1,750✔
3067
    return NULL;
1,750✔
3068
}
3069

3070
uint64_t strToTimestamp(char * tsStr) {
10,040✔
3071
    uint64_t ts = 0;
10,040✔
3072
    // remove double quota mark
3073
    if (tsStr[0] == '\"' || tsStr[0] == '\'') {
10,040!
3074
        tsStr += 1;
×
3075
        int32_t last = strlen(tsStr) - 1;
×
3076
        if (tsStr[last] == '\"' || tsStr[0] == '\'') {
×
3077
            tsStr[last] = 0;
×
3078
        }
3079
    }
3080

3081
    if (toolsParseTime(tsStr, (int64_t*)&ts, strlen(tsStr), TSDB_TIME_PRECISION_MILLI, 0)) {
10,040!
3082
        // not timestamp str format, maybe int64 format
3083
        ts = (int64_t)atol(tsStr);
10,040✔
3084
    }
3085

3086
    return ts;
10,040✔
3087
}
3088

3089
/*
 * Parse the comma separated sample rows into the per-column stmt bind arrays.
 *
 * The sample buffer of childTbl (or of stbInfo when childTbl is NULL) is read
 * as g_arguments->prepared_rand rows of stbInfo->lenOfCols bytes each. Every
 * field of a row is converted according to the column type and stored at
 * slot i of the column's StmtData (data / is_null / lengths). When
 * stbInfo->useSampleTs is set, the first field of each row is the timestamp
 * and goes to bind_ts_array[i] instead of a column.
 *
 * Returns 0 on success, -1 when a temporary buffer cannot be allocated.
 * DECIMAL columns are not supported and terminate the program.
 */
static int initStmtDataValue(SSuperTable *stbInfo, SChildTable *childTbl, uint64_t *bind_ts_array) {
    int32_t columnCount = stbInfo->cols->size;

    char *sampleDataBuf;
    if (childTbl) {
        sampleDataBuf = childTbl->sampleDataBuf;
    } else {
        sampleDataBuf = stbInfo->sampleDataBuf;
    }
    int64_t lenOfOneRow = stbInfo->lenOfCols;

    if (stbInfo->useSampleTs) {
        columnCount += 1;  // for skipping first column
    }
    for (int i=0; i < g_arguments->prepared_rand; i++) {
        int cursor = 0;

        for (int c = 0; c < columnCount; c++) {
            char *restStr = sampleDataBuf
                + lenOfOneRow * i + cursor;

            // the field ends at the next ',' or at the end of the row text
            const char *comma = strchr(restStr, ',');
            int index = (int)(comma ? (size_t)(comma - restStr) : strlen(restStr));

            cursor += index + 1;  // skip ',' too

            char *tmpStr = calloc(1, index + 1);
            if (NULL == tmpStr) {
                errorPrint("%s() LN%d, Failed to allocate %d bind buffer\n",
                        __func__, __LINE__, index + 1);
                return -1;
            }

            // buffer is zero filled and one byte larger, so it stays terminated
            memcpy(tmpStr, restStr, index);
            if ((0 == c) && stbInfo->useSampleTs) {
                // first field is the row timestamp, not a column value
                bind_ts_array[i] = strToTimestamp(tmpStr);
                free(tmpStr);
                continue;
            }

            Field *col = benchArrayGet(stbInfo->cols,
                    (stbInfo->useSampleTs?c-1:c));
            char dataType = col->type;

            StmtData *stmtData;
            if (childTbl) {
                ChildField *childCol =
                    benchArrayGet(childTbl->childCols,
                                  (stbInfo->useSampleTs?c-1:c));
                stmtData = &childCol->stmtData;
            } else {
                stmtData = &col->stmtData;
            }

            // set value
            stmtData->is_null[i] = 0;
            stmtData->lengths[i] = col->length;

            if (0 == strcmp(tmpStr, "NULL")) {
                *(stmtData->is_null + i) = true;
            } else {
                switch (dataType) {
                    case TSDB_DATA_TYPE_INT:
                        *((int32_t*)stmtData->data + i) = atoi(tmpStr);
                        break;
                    case TSDB_DATA_TYPE_UINT:
                        // atoi cannot represent values above INT_MAX
                        *((uint32_t*)stmtData->data + i) =
                            (uint32_t)strtoul(tmpStr, NULL, 10);
                        break;
                    case TSDB_DATA_TYPE_FLOAT:
                        *((float*)stmtData->data +i) = (float)atof(tmpStr);
                        break;
                    case TSDB_DATA_TYPE_DOUBLE:
                        *((double*)stmtData->data + i) = atof(tmpStr);
                        break;
                    case TSDB_DATA_TYPE_TINYINT:
                    case TSDB_DATA_TYPE_UTINYINT:
                        *((int8_t*)stmtData->data + i) = (int8_t)atoi(tmpStr);
                        break;
                    case TSDB_DATA_TYPE_SMALLINT:
                    case TSDB_DATA_TYPE_USMALLINT:
                        *((int16_t*)stmtData->data + i) = (int16_t)atoi(tmpStr);
                        break;
                    case TSDB_DATA_TYPE_BIGINT:
                        // strtoll: atol is only 32 bits wide where long is 32 bits
                        *((int64_t*)stmtData->data + i) =
                            (int64_t)strtoll(tmpStr, NULL, 10);
                        break;
                    case TSDB_DATA_TYPE_UBIGINT:
                        // full unsigned range, not clamped at INT64_MAX
                        *((uint64_t*)stmtData->data + i) =
                            (uint64_t)strtoull(tmpStr, NULL, 10);
                        break;
                    case TSDB_DATA_TYPE_BOOL:
                        *((int8_t*)stmtData->data + i) = (int8_t)atoi(tmpStr);
                        break;
                    case TSDB_DATA_TYPE_TIMESTAMP:
                        *((int64_t*)stmtData->data + i) =
                            (int64_t)strtoll(tmpStr, NULL, 10);
                        break;
                    case TSDB_DATA_TYPE_BINARY:
                    case TSDB_DATA_TYPE_NCHAR:
                    case TSDB_DATA_TYPE_VARBINARY:
                    case TSDB_DATA_TYPE_GEOMETRY:
                        {
                            // the value is expected to carry one quote char on
                            // each side, hence the "- 2" / "+ 1" below
                            size_t tmpLen = strlen(tmpStr);
                            debugPrint("%s() LN%d, index: %d, "
                                    "tmpStr len: %"PRIu64", col->length: %d\n",
                                    __func__, __LINE__,
                                    i, (uint64_t)tmpLen, col->length);
                            // guard the subtraction: tmpLen is unsigned and
                            // would wrap around for strings shorter than 2
                            if (tmpLen > 2 && tmpLen - 2 > col->length) {
                                errorPrint("data length %"PRIu64" "
                                        "is larger than column length %d\n",
                                        (uint64_t)tmpLen, col->length);
                            }
                            if (tmpLen > 2) {
                                strncpy((char *)stmtData->data
                                            + i * col->length,
                                        tmpStr+1,
                                        min(col->length, tmpLen - 2));
                            } else {
                                strncpy((char *)stmtData->data
                                            + i*col->length,
                                        "", 1);
                            }
                        }
                        break;
                    case TSDB_DATA_TYPE_DECIMAL:
                    case TSDB_DATA_TYPE_DECIMAL64:
                        errorPrint("Not implemented data type in func initStmtDataValue: %s\n",
                                convertDatatypeToString(dataType));
                        exit(EXIT_FAILURE);
                    default:
                        break;
                }
            }
            free(tmpStr);
        }
    }
    return 0;
}
3226

3227
static void initStmtData(char dataType, void **data, uint32_t length) {
951✔
3228
    char *tmpP = NULL;
951✔
3229

3230
    switch (dataType) {
951!
3231
        case TSDB_DATA_TYPE_INT:
42✔
3232
        case TSDB_DATA_TYPE_UINT:
3233
            tmpP = calloc(1, sizeof(int) * g_arguments->prepared_rand);
42✔
3234
            assert(tmpP);
42!
3235
            tmfree(*data);
42✔
3236
            *data = (void*)tmpP;
42✔
3237
            break;
42✔
3238

3239
        case TSDB_DATA_TYPE_TINYINT:
18✔
3240
        case TSDB_DATA_TYPE_UTINYINT:
3241
            tmpP = calloc(1, sizeof(int8_t) * g_arguments->prepared_rand);
18✔
3242
            assert(tmpP);
18!
3243
            tmfree(*data);
18✔
3244
            *data = (void*)tmpP;
18✔
3245
            break;
18✔
3246

3247
        case TSDB_DATA_TYPE_SMALLINT:
717✔
3248
        case TSDB_DATA_TYPE_USMALLINT:
3249
            tmpP = calloc(1, sizeof(int16_t) * g_arguments->prepared_rand);
717✔
3250
            assert(tmpP);
717!
3251
            tmfree(*data);
717✔
3252
            *data = (void*)tmpP;
717✔
3253
            break;
717✔
3254

3255
        case TSDB_DATA_TYPE_BIGINT:
14✔
3256
        case TSDB_DATA_TYPE_UBIGINT:
3257
            tmpP = calloc(1, sizeof(int64_t) * g_arguments->prepared_rand);
14✔
3258
            assert(tmpP);
14!
3259
            tmfree(*data);
14✔
3260
            *data = (void*)tmpP;
14✔
3261
            break;
14✔
3262

3263
        case TSDB_DATA_TYPE_BOOL:
7✔
3264
            tmpP = calloc(1, sizeof(int8_t) * g_arguments->prepared_rand);
7✔
3265
            assert(tmpP);
7!
3266
            tmfree(*data);
7✔
3267
            *data = (void*)tmpP;
7✔
3268
            break;
7✔
3269

3270
        case TSDB_DATA_TYPE_FLOAT:
39✔
3271
            tmpP = calloc(1, sizeof(float) * g_arguments->prepared_rand);
39✔
3272
            assert(tmpP);
39!
3273
            tmfree(*data);
39✔
3274
            *data = (void*)tmpP;
39✔
3275
            break;
39✔
3276

3277
        case TSDB_DATA_TYPE_DOUBLE:
54✔
3278
            tmpP = calloc(1, sizeof(double) * g_arguments->prepared_rand);
54✔
3279
            assert(tmpP);
54!
3280
            tmfree(*data);
54✔
3281
            *data = (void*)tmpP;
54✔
3282
            break;
54✔
3283

3284
        case TSDB_DATA_TYPE_BINARY:
56✔
3285
        case TSDB_DATA_TYPE_NCHAR:
3286
        case TSDB_DATA_TYPE_VARBINARY:
3287
        case TSDB_DATA_TYPE_GEOMETRY:
3288
            tmpP = calloc(1, g_arguments->prepared_rand * length);
56✔
3289
            assert(tmpP);
56!
3290
            tmfree(*data);
56✔
3291
            *data = (void*)tmpP;
56✔
3292
            break;
56✔
3293

3294
        case TSDB_DATA_TYPE_TIMESTAMP:
4✔
3295
            tmpP = calloc(1, sizeof(int64_t) * g_arguments->prepared_rand);
4✔
3296
            assert(tmpP);
4!
3297
            tmfree(*data);
4✔
3298
            *data = (void*)tmpP;
4✔
3299
            break;
4✔
3300

3301
        case TSDB_DATA_TYPE_DECIMAL:
×
3302
        case TSDB_DATA_TYPE_DECIMAL64:
3303
            errorPrint("Not implemented data type in func initStmtData: %s\n",
×
3304
                       convertDatatypeToString(dataType));
3305
            exit(EXIT_FAILURE);
×
3306

3307
        default:
×
3308
            errorPrint("Unknown data type on initStmtData: %s\n",
×
3309
                       convertDatatypeToString(dataType));
3310
            exit(EXIT_FAILURE);
×
3311
    }
3312
}
951✔
3313

3314
static int parseBufferToStmtBatchChildTbl(SSuperTable *stbInfo,
8✔
3315
                                          SChildTable* childTbl, uint64_t *bind_ts_array) {
3316
    int32_t columnCount = stbInfo->cols->size;
8✔
3317

3318
    for (int c = 0; c < columnCount; c++) {
28✔
3319
        Field *col = benchArrayGet(stbInfo->cols, c);
20✔
3320
        ChildField *childCol = benchArrayGet(childTbl->childCols, c);
20✔
3321
        char dataType = col->type;
20✔
3322

3323
        // malloc memory
3324
        tmfree(childCol->stmtData.is_null);
20✔
3325
        tmfree(childCol->stmtData.lengths);
20✔
3326
        childCol->stmtData.is_null = benchCalloc(sizeof(char),     g_arguments->prepared_rand, true);
20✔
3327
        childCol->stmtData.lengths = benchCalloc(sizeof(int32_t),  g_arguments->prepared_rand, true);
20✔
3328

3329
        initStmtData(dataType, &(childCol->stmtData.data), col->length);
20✔
3330
    }
3331

3332
    return initStmtDataValue(stbInfo, childTbl, bind_ts_array);
8✔
3333
}
3334

3335
/*
 * Prepare the stmt bind arrays of the super table columns.
 *
 * Each column gets new is_null / lengths arrays with
 * g_arguments->prepared_rand entries and a new value array from
 * initStmtData(); then the super table sample rows are parsed into them.
 * Returns whatever initStmtDataValue() returns.
 */
static int parseBufferToStmtBatch(SSuperTable* stbInfo, uint64_t *bind_ts_array) {
    const int32_t total = stbInfo->cols->size;
    int colIdx = 0;

    while (colIdx < total) {
        Field    *field = benchArrayGet(stbInfo->cols, colIdx);
        StmtData *bind  = &field->stmtData;

        // re-allocate so the element count matches g_arguments->prepared_rand
        tmfree(bind->is_null);
        bind->is_null = benchCalloc(sizeof(char), g_arguments->prepared_rand, false);

        tmfree(bind->lengths);
        bind->lengths = benchCalloc(sizeof(int32_t), g_arguments->prepared_rand, false);

        initStmtData(field->type, &(bind->data), field->length);

        ++colIdx;
    }

    return initStmtDataValue(stbInfo, NULL, bind_ts_array);
}
3352

3353
/*
 * Name the child tables "<childTblPrefix><n>" for n = 0 .. childTblCount-1.
 *
 * Each name is a heap copy (strdup) stored in childTblArray[n]->name.
 * Returns stbInfo->childTblCount.
 */
static int64_t fillChildTblNameByCount(SSuperTable *stbInfo) {
    int64_t seq = 0;

    while (seq < stbInfo->childTblCount) {
        char nameBuf[TSDB_TABLE_NAME_LEN]={0};
        SChildTable *tbl = stbInfo->childTblArray[seq];

        snprintf(nameBuf, TSDB_TABLE_NAME_LEN, "%s%" PRIu64,
                 stbInfo->childTblPrefix, seq);
        tbl->name = strdup(nameBuf);
        debugPrint("%s(): %s\n", __func__, tbl->name);

        seq++;
    }

    return stbInfo->childTblCount;
}
3367

3368
static int64_t fillChildTblNameByFromTo(SDataBase *database,
3✔
3369
        SSuperTable* stbInfo) {
3370
    for (int64_t i = stbInfo->childTblFrom; i <= stbInfo->childTblTo; i++) {
13✔
3371
        char childName[TSDB_TABLE_NAME_LEN]={0};
10✔
3372
        snprintf(childName,
10✔
3373
                TSDB_TABLE_NAME_LEN,
3374
                "%s%" PRIu64,
3375
                stbInfo->childTblPrefix, i);
3376
        stbInfo->childTblArray[i]->name = strdup(childName);
10✔
3377
    }
3378

3379
    return (stbInfo->childTblTo-stbInfo->childTblFrom);
3✔
3380
}
3381

3382
static int64_t fillChildTblNameByLimitOffset(SDataBase *database,
4✔
3383
        SSuperTable* stbInfo) {
3384
    SBenchConn* conn = initBenchConn();
4✔
3385
    if (NULL == conn) {
4!
3386
        return -1;
×
3387
    }
3388
    char cmd[SHORT_1K_SQL_BUFF_LEN] = "\0";
4✔
3389
    if (g_arguments->taosc_version == 3) {
4!
3390
        snprintf(cmd, SHORT_1K_SQL_BUFF_LEN,
4✔
3391
                 "SELECT DISTINCT(TBNAME) FROM %s.`%s` LIMIT %" PRId64
3392
                 " OFFSET %" PRIu64,
3393
                 database->dbName, stbInfo->stbName, stbInfo->childTblLimit,
3394
                 stbInfo->childTblOffset);
3395
    } else {
3396
        snprintf(cmd, SHORT_1K_SQL_BUFF_LEN,
×
3397
                 "SELECT TBNAME FROM %s.`%s` LIMIT %" PRId64
3398
                 " OFFSET %" PRIu64,
3399
                 database->dbName, stbInfo->stbName, stbInfo->childTblLimit,
3400
                 stbInfo->childTblOffset);
3401
    }
3402
    debugPrint("cmd: %s\n", cmd);
4!
3403
    TAOS_RES *res = taos_query(conn->taos, cmd);
4✔
3404
    int32_t   code = taos_errno(res);
4✔
3405
    int64_t   count = 0;
4✔
3406
    if (code) {
4✔
3407
        printErrCmdCodeStr(cmd, code, res);
3✔
3408
        closeBenchConn(conn);
3✔
3409
        return -1;
3✔
3410
    }
3411
    TAOS_ROW row = NULL;
1✔
3412
    while ((row = taos_fetch_row(res)) != NULL) {
3✔
3413
        int *lengths = taos_fetch_lengths(res);
2✔
3414
        char * childName = benchCalloc(1, lengths[0] + 1, true);
2✔
3415
        strncpy(childName, row[0], lengths[0]);
2✔
3416
        childName[lengths[0]] = '\0';
2✔
3417
        stbInfo->childTblArray[count]->name = childName;
2✔
3418
        debugPrint("stbInfo->childTblArray[%" PRId64 "]->name: %s\n",
2!
3419
                   count, stbInfo->childTblArray[count]->name);
3420
        count++;
2✔
3421
    }
3422
    taos_free_result(res);
1✔
3423
    closeBenchConn(conn);
1✔
3424
    return count;
1✔
3425
}
3426

3427
/*
 * Reconcile a super table's batching options with the global request size.
 *
 *  - interlaceRows is capped at g_arguments->reqPerReq;
 *  - interlaceRows larger than insertRows is reset to 0;
 *  - with interlaceRows == 0, g_arguments->reqPerReq is lowered to insertRows
 *    when it exceeds it;
 *  - STMT interface + interlace + auto table creation is rejected and the
 *    process exits.
 */
static void preProcessArgument(SSuperTable *stbInfo) {
    // one interlace batch cannot exceed the rows carried by one request
    if (g_arguments->reqPerReq < stbInfo->interlaceRows) {
        infoPrint(
            "interlaceRows(%d) is larger than record per request(%u), which "
            "will be set to %u\n",
            stbInfo->interlaceRows, g_arguments->reqPerReq,
            g_arguments->reqPerReq);
        stbInfo->interlaceRows = g_arguments->reqPerReq;
    }

    // an interlace batch bigger than the whole table disables interlacing
    if (stbInfo->insertRows < stbInfo->interlaceRows) {
        infoPrint(
                "interlaceRows larger than insertRows %d > %" PRId64 "\n",
                stbInfo->interlaceRows, stbInfo->insertRows);
        infoPrint("%s", "interlaceRows will be set to 0\n");
        stbInfo->interlaceRows = 0;
    }

    // interlaceRows == 0: a request never needs more rows than one table gets
    if (0 == stbInfo->interlaceRows) {
        if (stbInfo->insertRows < g_arguments->reqPerReq) {
            infoPrint("record per request (%u) is larger than "
                    "insert rows (%"PRIu64")"
                    " in progressive mode, which will be set to %"PRIu64"\n",
                    g_arguments->reqPerReq, stbInfo->insertRows,
                    stbInfo->insertRows);
            g_arguments->reqPerReq = stbInfo->insertRows;
        }
    }

    // unsupported combination: abort the program
    bool interlaced = stbInfo->interlaceRows > 0;
    bool viaStmt    = stbInfo->iface == STMT_IFACE;
    if (interlaced && viaStmt && stbInfo->autoTblCreating) {
        errorPrint("%s","stmt not support autocreate table with interlace row , quit programe!\n");
        exit(-1);
    }
}
334✔
3461

3462
static int printTotalDelay(SDataBase *database,
324✔
3463
                           int64_t totalDelay,
3464
                           int64_t totalDelay1,
3465
                           int64_t totalDelay2,
3466
                           int64_t totalDelay3,
3467
                           BArray *total_delay_list,
3468
                            int threads,
3469
                            int64_t totalInsertRows,
3470
                            int64_t spend) {
3471
    // zero check
3472
    if (total_delay_list->size == 0 || spend == 0 || threads == 0) {
324!
3473
        return -1;
11✔
3474
    }
3475

3476
    char subDelay[128] = "";
313✔
3477
    if(totalDelay1 + totalDelay2 + totalDelay3 > 0) {
313✔
3478
        sprintf(subDelay, " stmt delay1=%.2fs delay2=%.2fs delay3=%.2fs",
36✔
3479
                totalDelay1/threads/1E6,
36✔
3480
                totalDelay2/threads/1E6,
36✔
3481
                totalDelay3/threads/1E6);
36✔
3482
    }
3483

3484
    double time_cost = spend / 1E6;
313✔
3485
    double real_time_cost = totalDelay/threads/1E6;
313✔
3486
    double records_per_second = (double)(totalInsertRows / (spend/1E6));
313✔
3487
    double real_records_per_second = (double)(totalInsertRows / (totalDelay/threads/1E6));
313✔
3488

3489
    succPrint("Spent %.6f (real %.6f) seconds to insert rows: %" PRIu64
313!
3490
              " with %d thread(s) into %s %.2f (real %.2f) records/second%s\n",
3491
              time_cost, real_time_cost, totalInsertRows, threads,
3492
              database->dbName, records_per_second,
3493
              real_records_per_second, subDelay);
3494

3495
    if (!total_delay_list->size) {
313!
3496
        return -1;
×
3497
    }
3498
    
3499
    double minDelay = *(int64_t *)(benchArrayGet(total_delay_list, 0))/1E3;
313✔
3500
    double avgDelay = (double)totalDelay/total_delay_list->size/1E3;
313✔
3501
    double p90 = *(int64_t *)(benchArrayGet(total_delay_list,
626✔
3502
                                         (int32_t)(total_delay_list->size
313✔
3503
                                         * 0.9)))/1E3;
313✔
3504
    double p95 = *(int64_t *)(benchArrayGet(total_delay_list,
626✔
3505
                                         (int32_t)(total_delay_list->size
313✔
3506
                                         * 0.95)))/1E3;
313✔
3507
    double p99 = *(int64_t *)(benchArrayGet(total_delay_list,
626✔
3508
                                         (int32_t)(total_delay_list->size
313✔
3509
                                         * 0.99)))/1E3;
313✔
3510
    double maxDelay = *(int64_t *)(benchArrayGet(total_delay_list,
626✔
3511
                                         (int32_t)(total_delay_list->size
313✔
3512
                                         - 1)))/1E3;                                     
313✔
3513

3514
    succPrint("insert delay, "
313!
3515
              "min: %.4fms, "
3516
              "avg: %.4fms, "
3517
              "p90: %.4fms, "
3518
              "p95: %.4fms, "
3519
              "p99: %.4fms, "
3520
              "max: %.4fms\n",
3521
            minDelay, avgDelay, p90, p95, p99, maxDelay);
3522
    
3523
    if (g_arguments->output_json_file) {
313✔
3524
        tools_cJSON *root = tools_cJSON_CreateObject();
1✔
3525
        if (root) {
1!
3526
            tools_cJSON_AddStringToObject(root, "db_name", database->dbName);
1✔
3527
            tools_cJSON_AddNumberToObject(root, "inserted_rows", totalInsertRows);
1✔
3528
            tools_cJSON_AddNumberToObject(root, "threads", threads);
1✔
3529
            tools_cJSON_AddNumberToObject(root, "time_cost", time_cost);
1✔
3530
            tools_cJSON_AddNumberToObject(root, "real_time_cost", real_time_cost);
1✔
3531
            tools_cJSON_AddNumberToObject(root, "records_per_second",  records_per_second);
1✔
3532
            tools_cJSON_AddNumberToObject(root, "real_records_per_second", real_records_per_second);
1✔
3533
            
3534
            tools_cJSON_AddNumberToObject(root, "avg", avgDelay);
1✔
3535
            tools_cJSON_AddNumberToObject(root, "min", minDelay);
1✔
3536
            tools_cJSON_AddNumberToObject(root, "max", maxDelay);
1✔
3537
            tools_cJSON_AddNumberToObject(root, "p90", p90);
1✔
3538
            tools_cJSON_AddNumberToObject(root, "p95", p95);
1✔
3539
            tools_cJSON_AddNumberToObject(root, "p99", p99);
1✔
3540
            
3541
            char *jsonStr = tools_cJSON_PrintUnformatted(root);
1✔
3542
            if (jsonStr) {
1!
3543
                FILE *fp = fopen(g_arguments->output_json_file, "w");
1✔
3544
                if (fp) {
1!
3545
                    fprintf(fp, "%s\n", jsonStr);
1✔
3546
                    fclose(fp);
1✔
3547
                } else {
3548
                    errorPrint("Failed to open output JSON file, file name %s\n",
×
3549
                            g_arguments->output_json_file);
3550
                }
3551
                free(jsonStr);
1✔
3552
            }
3553
            tools_cJSON_Delete(root);
1✔
3554
        }
3555
    }        
3556
    return 0;
313✔
3557
}
3558

3559
/*
 * Pick the strategy used to fill child table names:
 *   - childTblLimit set           -> fillChildTblNameByLimitOffset
 *   - childTblFrom/childTblTo set -> fillChildTblNameByFromTo
 *   - otherwise                   -> fillChildTblNameByCount
 * Returns whatever the selected helper returns.
 */
static int64_t fillChildTblNameImp(SDataBase *database, SSuperTable *stbInfo) {
    if (stbInfo->childTblLimit) {
        return fillChildTblNameByLimitOffset(database, stbInfo);
    }

    if (stbInfo->childTblFrom || stbInfo->childTblTo) {
        return fillChildTblNameByFromTo(database, stbInfo);
    }

    return fillChildTblNameByCount(stbInfo);
}
3570

3571
/*
 * Allocate stbInfo->childTblArray (childTblCount entries, each a zeroed
 * SChildTable) and fill in the table names.
 *
 *  - exactly one child table and no tags: treated as a normal table, its
 *    name is the super table name (truncated to TSDB_TABLE_NAME_LEN - 1);
 *  - non-SML interface with existing child tables: fillChildTblNameImp;
 *  - everything else: fillChildTblNameByCount.
 *
 * Returns childTblCount for the normal-table case, otherwise the value
 * returned by the selected helper.
 */
static int64_t fillChildTblName(SDataBase *database, SSuperTable *stbInfo) {
    int64_t total = stbInfo->childTblCount;

    stbInfo->childTblArray = benchCalloc(stbInfo->childTblCount,
            sizeof(SChildTable*), true);
    for (int64_t idx = 0; idx < stbInfo->childTblCount; idx++) {
        stbInfo->childTblArray[idx] =
            benchCalloc(1, sizeof(SChildTable), true);
    }

    // Normal table
    if (stbInfo->childTblCount == 1 && stbInfo->tags->size == 0) {
        char tbName[TSDB_TABLE_NAME_LEN]={0};
        snprintf(tbName, TSDB_TABLE_NAME_LEN,
                    "%s", stbInfo->stbName);
        stbInfo->childTblArray[0]->name = strdup(tbName);
        return total;
    }

    bool viaSml = (stbInfo->iface == SML_IFACE
                || stbInfo->iface == SML_REST_IFACE);
    if (!viaSml && stbInfo->childTblExists) {
        return fillChildTblNameImp(database, stbInfo);
    }

    return fillChildTblNameByCount(stbInfo);
}
3596

3597
// last ts fill to filllBackTime
3598
static bool fillSTableLastTs(SDataBase *database, SSuperTable *stbInfo) {
×
3599
    SBenchConn* conn = initBenchConn();
×
3600
    if (NULL == conn) {
×
3601
        return false;
×
3602
    }
3603
    char cmd[SHORT_1K_SQL_BUFF_LEN] = "\0";
×
3604
    snprintf(cmd, SHORT_1K_SQL_BUFF_LEN, "select last(ts) from %s.`%s`", database->dbName, stbInfo->stbName);
×
3605

3606
    infoPrint("fillBackTime: %s\n", cmd);
×
3607
    TAOS_RES *res = taos_query(conn->taos, cmd);
×
3608
    int32_t   code = taos_errno(res);
×
3609
    if (code) {
×
3610
        printErrCmdCodeStr(cmd, code, res);
×
3611
        closeBenchConn(conn);
×
3612
        return false;
×
3613
    }
3614

3615
    TAOS_ROW row = taos_fetch_row(res);
×
3616
    if(row == NULL) {
×
3617
        taos_free_result(res);
×
3618
        closeBenchConn(conn);
×
3619
        return false;
×
3620
    }
3621
    
3622
    char lastTs[128];
3623
    memset(lastTs, 0, sizeof(lastTs));
×
3624

3625
    stbInfo->startFillbackTime = *(int64_t*)row[0];
×
3626
    toolsFormatTimestamp(lastTs, stbInfo->startFillbackTime, database->precision);
×
3627
    infoPrint("fillBackTime: get ok %s.%s last ts=%s \n", database->dbName, stbInfo->stbName, lastTs);
×
3628
    
3629
    taos_free_result(res);
×
3630
    closeBenchConn(conn);
×
3631

3632
    return true;
×
3633
}
3634

3635
// calcNow expression fill to timestamp_start
3636
static bool calcExprFromServer(SDataBase *database, SSuperTable *stbInfo) {
6✔
3637
    SBenchConn* conn = initBenchConn();
6✔
3638
    if (NULL == conn) {
6!
3639
        return false;
×
3640
    }
3641
    char cmd[SHORT_1K_SQL_BUFF_LEN] = "\0";
6✔
3642
    snprintf(cmd, SHORT_1K_SQL_BUFF_LEN, "select %s", stbInfo->calcNow);
6✔
3643

3644
    infoPrint("calcExprFromServer: %s\n", cmd);
6✔
3645
    TAOS_RES *res = taos_query(conn->taos, cmd);
6✔
3646
    int32_t   code = taos_errno(res);
6✔
3647
    if (code) {
6!
3648
        printErrCmdCodeStr(cmd, code, res);
×
3649
        closeBenchConn(conn);
×
3650
        return false;
×
3651
    }
3652

3653
    TAOS_ROW row = taos_fetch_row(res);
6✔
3654
    if(row == NULL) {
6!
3655
        taos_free_result(res);
×
3656
        closeBenchConn(conn);
×
3657
        return false;
×
3658
    }
3659
    
3660
    char ts[128];
3661
    memset(ts, 0, sizeof(ts));
6✔
3662

3663
    stbInfo->startTimestamp = *(int64_t*)row[0];
6✔
3664
    toolsFormatTimestamp(ts, stbInfo->startTimestamp, database->precision);
6✔
3665
    infoPrint("calcExprFromServer: get ok.  %s = %s \n", stbInfo->calcNow, ts);
6✔
3666
    
3667
    taos_free_result(res);
6✔
3668
    closeBenchConn(conn);
6✔
3669

3670
    return true;
6✔
3671
}
3672

3673
/*
 * Number of tables to work on for a super table:
 *   - childTblTo > 0                      -> childTblTo - childTblFrom
 *   - childTblLimit > 0 and tables exist  -> childTblLimit
 *   - otherwise                           -> childTblCount
 * The database argument is not read.
 */
int64_t obtainTableCount(SDataBase* database, SSuperTable* stbInfo) {
    if (stbInfo->childTblTo > 0) {
        int64_t span = stbInfo->childTblTo - stbInfo->childTblFrom;
        return span;
    }

    if (stbInfo->childTblLimit > 0 && stbInfo->childTblExists) {
        int64_t limited = stbInfo->childTblLimit;
        return limited;
    }

    int64_t all = stbInfo->childTblCount;
    return all;
}
3686

3687
// assign table to thread with vgroups, return assign thread count
3688
int32_t assignTableToThread(SDataBase* database, SSuperTable* stbInfo) {
4✔
3689
    int32_t threads = 0;
4✔
3690

3691
    // calc table count per vgroup
3692
    for (int64_t i = 0; i < stbInfo->childTblCount; i++) {
151✔
3693
        int32_t vgIdx = calcGroupIndex(database->dbName, stbInfo->childTblArray[i]->name, database->vgroups);
147✔
3694
        if (vgIdx == -1) {
147!
3695
            continue;
×
3696
        }
3697
        SVGroup *vg = benchArrayGet(database->vgArray, vgIdx);
147✔
3698
        vg->tbCountPerVgId ++;
147✔
3699
    }
3700

3701
    // malloc vg->childTblArray memory with table count
3702
    for (int v = 0; v < database->vgroups; v++) {
18✔
3703
        SVGroup *vg = benchArrayGet(database->vgArray, v);
14✔
3704
        infoPrint("Local hash calc %"PRId64" tables on %s's vgroup %d (id: %d)\n",
14✔
3705
                    vg->tbCountPerVgId, database->dbName, v, vg->vgId);
3706
        if (vg->tbCountPerVgId) {
14!
3707
            threads++;
14✔
3708
        } else {
3709
            continue;
×
3710
        }
3711
        vg->childTblArray = benchCalloc(vg->tbCountPerVgId, sizeof(SChildTable*), true);
14✔
3712
        vg->tbOffset      = 0;
14✔
3713
    }
3714
    
3715
    // set vg->childTblArray data
3716
    for (int64_t i = 0; i < stbInfo->childTblCount; i++) {
151✔
3717
        int32_t vgIdx = calcGroupIndex(database->dbName, stbInfo->childTblArray[i]->name, database->vgroups);
147✔
3718
        if (vgIdx == -1) {
147!
3719
            continue;
×
3720
        }
3721
        SVGroup *vg = benchArrayGet(database->vgArray, vgIdx);
147✔
3722
        debugPrint("calc table hash to vgroup %s.%s vgIdx=%d\n",
147!
3723
                    database->dbName,
3724
                    stbInfo->childTblArray[i]->name, vgIdx);
3725
        vg->childTblArray[vg->tbOffset] = stbInfo->childTblArray[i];
147✔
3726
        vg->tbOffset++;
147✔
3727
    }
3728
    return threads;
4✔
3729
}
3730

3731
// init stmt
3732
TAOS_STMT* initStmt(TAOS* taos, bool single) {
111✔
3733
    if (!single) {
111!
3734
        infoPrint("initStmt call taos_stmt_init single=%d\n", single);
×
3735
        return taos_stmt_init(taos);
×
3736
    }
3737

3738
    TAOS_STMT_OPTIONS op;
3739
    memset(&op, 0, sizeof(op));
111✔
3740
    op.singleStbInsert      = single;
111✔
3741
    op.singleTableBindOnce  = single;
111✔
3742
    infoPrint("initStmt call taos_stmt_init_with_options single=%d\n", single);
111✔
3743
    return taos_stmt_init_with_options(taos, &op);
111✔
3744
}
3745

3746
// init stmt2
3747
TAOS_STMT2* initStmt2(TAOS* taos, bool single) {
23✔
3748
    TAOS_STMT2_OPTION op2;
3749
    memset(&op2, 0, sizeof(op2));
23✔
3750
    op2.singleStbInsert      = single;
23✔
3751
    op2.singleTableBindOnce  = single;
23✔
3752
    
3753
    TAOS_STMT2* stmt2 = taos_stmt2_init(taos, &op2);
23✔
3754
    if (stmt2) 
23!
3755
        succPrint("succ  taos_stmt2_init single=%d\n", single);
23!
3756
    else
3757
        errorPrint("failed taos_stmt2_init single=%d\n", single);
×
3758
    return stmt2;
23✔
3759
}
3760

3761
// init insert thread
3762
void initTsArray(uint64_t *bind_ts_array, SSuperTable* stbInfo) {
36✔
3763
    parseBufferToStmtBatch(stbInfo, bind_ts_array);
36✔
3764
    for (int64_t child = 0; child < stbInfo->childTblCount; child++) {
51,411✔
3765
        SChildTable *childTbl = stbInfo->childTblArray[child];
51,375✔
3766
        if (childTbl->useOwnSample) {
51,375✔
3767
            parseBufferToStmtBatchChildTbl(stbInfo, childTbl, bind_ts_array);
8✔
3768
        }
3769
    }
3770
    
3771
}
36✔
3772

3773
void *genInsertTheadInfo(void* arg) {
2,973✔
3774

3775
    if (g_arguments->terminate || g_fail) {
2,973!
3776
        return NULL;
×
3777
    }
3778

3779
    threadInfo* pThreadInfo = (threadInfo*)arg;
2,974✔
3780
    SSuperTable *stbInfo = pThreadInfo->stbInfo;
2,974✔
3781
    pThreadInfo->delayList = benchArrayInit(1, sizeof(int64_t));
2,974✔
3782
    switch (stbInfo->iface) {
2,974!
3783
        // rest
3784
        case REST_IFACE: {
4✔
3785
            if (stbInfo->interlaceRows > 0) {
4!
3786
                pThreadInfo->buffer = new_ds(0);
×
3787
            } else {
3788
                pThreadInfo->buffer = benchCalloc(1, TSDB_MAX_ALLOWED_SQL_LEN, true);
4✔
3789
            }
3790
            int sockfd = createSockFd();
4✔
3791
            if (sockfd < 0) {
4!
3792
                g_fail = true;
×
3793
                goto END;
×
3794
            }
3795
            pThreadInfo->sockfd = sockfd;
4✔
3796
            break;
4✔
3797
        }            
3798
        // stmt & stmt2 init
3799
        case STMT_IFACE: 
134✔
3800
        case STMT2_IFACE: {
3801
            pThreadInfo->conn = initBenchConn();
134✔
3802
            if (NULL == pThreadInfo->conn) {
134!
3803
                goto END;
×
3804
            }
3805
            // single always true for benchmark
3806
            bool single = true;
134✔
3807
            if (stbInfo->iface == STMT2_IFACE) {
134✔
3808
                // stmt2 init
3809
                if (pThreadInfo->conn->stmt2)
23!
3810
                    taos_stmt2_close(pThreadInfo->conn->stmt2);
×
3811
                pThreadInfo->conn->stmt2 = initStmt2(pThreadInfo->conn->taos, single);
23✔
3812
                if (NULL == pThreadInfo->conn->stmt2) {
23!
3813
                    errorPrint("taos_stmt2_init() failed, reason: %s\n", taos_errstr(NULL));
×
3814
                    g_fail = true;
×
3815
                    goto END;                    
×
3816
                }
3817
            } else {
3818
                // stmt init
3819
                if (pThreadInfo->conn->stmt)
111!
3820
                    taos_stmt_close(pThreadInfo->conn->stmt);
×
3821
                pThreadInfo->conn->stmt = initStmt(pThreadInfo->conn->taos, single);
111✔
3822
                if (NULL == pThreadInfo->conn->stmt) {
111!
3823
                    errorPrint("taos_stmt_init() failed, reason: %s\n", taos_errstr(NULL));
×
3824
                    g_fail = true;
×
3825
                    goto END;                    
×
3826
                }
3827
            }
3828

3829
            // select db
3830
            if (taos_select_db(pThreadInfo->conn->taos, pThreadInfo->dbInfo->dbName)) {
134!
3831
                errorPrint("taos select database(%s) failed\n", pThreadInfo->dbInfo->dbName);
×
3832
                g_fail = true;
×
3833
                goto END;
×
3834
            }
3835

3836
            // malloc bind
3837
            int32_t unit = stbInfo->iface == STMT2_IFACE ? sizeof(TAOS_STMT2_BIND) : sizeof(TAOS_MULTI_BIND);
134✔
3838
            pThreadInfo->bind_ts       = benchCalloc(1, sizeof(int64_t), true);
134✔
3839
            
3840
            pThreadInfo->bindParams    = benchCalloc(1, unit * (stbInfo->cols->size + 1), true);
134✔
3841
            // have ts columns, so size + 1
3842
            pThreadInfo->lengths       = benchCalloc(stbInfo->cols->size + 1, sizeof(int32_t*), true);
133✔
3843
            for(int32_t c = 0; c <= stbInfo->cols->size; c++) {
3,608✔
3844
                pThreadInfo->lengths[c] = benchCalloc(g_arguments->reqPerReq, sizeof(int32_t), true);
3,474✔
3845
            }
3846
            // tags data
3847
            pThreadInfo->tagsStmt = copyBArray(stbInfo->tags);
134✔
3848
            for(int32_t n = 0; n < pThreadInfo->tagsStmt->size; n ++ ) {
668✔
3849
                Field *field = benchArrayGet(pThreadInfo->tagsStmt, n);
534✔
3850
                memset(&field->stmtData, 0, sizeof(StmtData));
534✔
3851
            }
3852
        
3853
            break;
134✔
3854
        }
3855
        // sml rest
3856
        case SML_REST_IFACE: {
37✔
3857
            int sockfd = createSockFd();
37✔
3858
            if (sockfd < 0) {
37!
3859
                g_fail = true;
×
3860
                goto END;
×
3861
            }
3862
            pThreadInfo->sockfd = sockfd;
37✔
3863
        }            
3864
        // sml
3865
        case SML_IFACE: {
233✔
3866
            if (stbInfo->iface == SML_IFACE) {
233✔
3867
                pThreadInfo->conn = initBenchConn();
196✔
3868
                if (pThreadInfo->conn == NULL) {
195!
3869
                    errorPrint("%s() init connection failed\n", __func__);
×
3870
                    g_fail = true;
×
3871
                    goto END;
×
3872
                }
3873
                if (taos_select_db(pThreadInfo->conn->taos, pThreadInfo->dbInfo->dbName)) {
195!
3874
                    errorPrint("taos select database(%s) failed\n", pThreadInfo->dbInfo->dbName);
×
3875
                    g_fail = true;
×
3876
                    goto END;
×
3877
                }
3878
            }
3879
            pThreadInfo->max_sql_len = stbInfo->lenOfCols + stbInfo->lenOfTags;
233✔
3880
            if (stbInfo->iface == SML_REST_IFACE) {
233✔
3881
                pThreadInfo->buffer = benchCalloc(1, g_arguments->reqPerReq * (1 + pThreadInfo->max_sql_len), true);
37✔
3882
            }                
3883
            int protocol = stbInfo->lineProtocol;
233✔
3884
            if (TSDB_SML_JSON_PROTOCOL != protocol && SML_JSON_TAOS_FORMAT != protocol) {
981✔
3885
                pThreadInfo->sml_tags = (char **)benchCalloc(pThreadInfo->ntables, sizeof(char *), true);
107✔
3886
                for (int t = 0; t < pThreadInfo->ntables; t++) {
308✔
3887
                    pThreadInfo->sml_tags[t] = benchCalloc(1, stbInfo->lenOfTags, true);
201✔
3888
                }
3889
                int64_t index = pThreadInfo->start_table_from;
107✔
3890
                for (int t = 0; t < pThreadInfo->ntables; t++) {
308✔
3891
                    if (generateRandData(
201!
3892
                                stbInfo, pThreadInfo->sml_tags[t],
201✔
3893
                                stbInfo->lenOfTags,
201✔
3894
                                stbInfo->lenOfCols + stbInfo->lenOfTags,
201✔
3895
                                stbInfo->tags, 1, true, NULL, index++)) {
3896
                        g_fail = true;            
×
3897
                        goto END;
×
3898
                    }
3899
                    debugPrint("pThreadInfo->sml_tags[%d]: %s\n", t,
201!
3900
                               pThreadInfo->sml_tags[t]);
3901
                }
3902
                pThreadInfo->lines = benchCalloc(g_arguments->reqPerReq, sizeof(char *), true);
107✔
3903
                for (int j = 0; (j < g_arguments->reqPerReq && !g_arguments->terminate); j++) {
77,201!
3904
                    pThreadInfo->lines[j] = benchCalloc(1, pThreadInfo->max_sql_len, true);
76,453✔
3905
                }
3906
            } else {
3907
                pThreadInfo->json_array          = tools_cJSON_CreateArray();
126✔
3908
                pThreadInfo->sml_json_tags       = tools_cJSON_CreateArray();
126✔
3909
                pThreadInfo->sml_tags_json_array = (char **)benchCalloc( pThreadInfo->ntables, sizeof(char *), true);
126✔
3910
                for (int t = 0; t < pThreadInfo->ntables; t++) {
372✔
3911
                    if (stbInfo->lineProtocol == TSDB_SML_JSON_PROTOCOL) {
246✔
3912
                        generateSmlJsonTags(
171✔
3913
                            pThreadInfo->sml_json_tags,
3914
                                pThreadInfo->sml_tags_json_array,
3915
                                stbInfo,
3916
                            pThreadInfo->start_table_from, t);
3917
                    } else {
3918
                        generateSmlTaosJsonTags(
75✔
3919
                            pThreadInfo->sml_json_tags, stbInfo,
3920
                            pThreadInfo->start_table_from, t);
3921
                    }
3922
                }
3923
                pThreadInfo->lines = (char **)benchCalloc(1, sizeof(char *), true);
126✔
3924
                if (0 == stbInfo->interlaceRows && TSDB_SML_JSON_PROTOCOL == protocol) {
126✔
3925
                    pThreadInfo->line_buf_len = g_arguments->reqPerReq * accumulateRowLen(pThreadInfo->stbInfo->tags, pThreadInfo->stbInfo->iface);
75✔
3926
                    debugPrint("%s() LN%d, line_buf_len=%d\n", __func__, __LINE__, pThreadInfo->line_buf_len);
75!
3927
                    pThreadInfo->lines[0]             = benchCalloc(1, pThreadInfo->line_buf_len, true);
75✔
3928
                    pThreadInfo->sml_json_value_array = (char **)benchCalloc(pThreadInfo->ntables, sizeof(char *), true);
75✔
UNCOV
3929
                    for (int t = 0; t < pThreadInfo->ntables; t++) {
×
3930
                        generateSmlJsonValues(pThreadInfo->sml_json_value_array, stbInfo, t);
147✔
3931
                    }
3932
                }
3933
            }
3934
            break;
233✔
3935
        }
3936
        // taos
3937
        case TAOSC_IFACE: {
2,603✔
3938
            pThreadInfo->conn = initBenchConn();
2,603✔
3939
            if (pThreadInfo->conn == NULL) {
2,597!
3940
                errorPrint("%s() failed to connect\n", __func__);
×
3941
                g_fail = true;
×
3942
                goto END;
×
3943
            }
3944
            char* command = benchCalloc(1, SHORT_1K_SQL_BUFF_LEN, false);
2,597✔
3945
            snprintf(command, SHORT_1K_SQL_BUFF_LEN,
2,599✔
3946
                    g_arguments->escape_character ? "USE `%s`" : "USE %s",
2,599✔
3947
                    pThreadInfo->dbInfo->dbName);
2,599✔
3948
            if (queryDbExecCall(pThreadInfo->conn, command)) {
2,599✔
3949
                errorPrint("taos select database(%s) failed\n", pThreadInfo->dbInfo->dbName);
400!
3950
                g_fail = true;
400✔
3951
                goto END;
400✔
3952
            }
3953
            tmfree(command);
2,193✔
3954
            command = NULL;
2,192✔
3955

3956
            if (stbInfo->interlaceRows > 0) {
2,192✔
3957
                pThreadInfo->buffer = new_ds(0);
63✔
3958
            } else {
3959
                pThreadInfo->buffer = benchCalloc(1, TSDB_MAX_ALLOWED_SQL_LEN, true);
2,129✔
3960
                if (g_arguments->check_sql) {
2,140✔
3961
                    pThreadInfo->csql = benchCalloc(1, TSDB_MAX_ALLOWED_SQL_LEN, true);
4✔
3962
                    memset(pThreadInfo->csql, 0, TSDB_MAX_ALLOWED_SQL_LEN);
4✔
3963
                }
3964
            }
3965

3966
            break;
2,203✔
3967
        }
3968
        default:
×
3969
            break;
×
3970
    }
3971

3972
END:
2,974✔
3973
    return NULL;
2,974✔
3974
}
3975
    
3976

3977
// init insert thread
3978
int32_t initInsertThread(SDataBase* database, SSuperTable* stbInfo, int32_t nthreads, threadInfo *infos, int64_t div, int64_t mod) {
334✔
3979
    int32_t  ret     = -1;
334✔
3980
    uint64_t tbNext  = stbInfo->childTblFrom;
334✔
3981
    int32_t  vgNext  = 0;
334✔
3982
    pthread_t   *pids  = benchCalloc(1, nthreads * sizeof(pthread_t),  true);
334✔
3983
    int threadCnt = 0;
334✔
3984
    uint64_t * bind_ts_array = NULL;
334✔
3985
    if (STMT2_IFACE == stbInfo->iface || STMT_IFACE == stbInfo->iface) {
334✔
3986
        bind_ts_array = benchCalloc(1, sizeof(int64_t)*g_arguments->prepared_rand, true);
36✔
3987
        initTsArray(bind_ts_array, stbInfo);
36✔
3988
    }
3989
    
3990

3991
    for (int32_t i = 0; i < nthreads && !g_arguments->terminate; i++) {
3,309!
3992
        // set table
3993
        threadInfo *pThreadInfo = infos + i;
2,975✔
3994
        pThreadInfo->threadID   = i;
2,975✔
3995
        pThreadInfo->dbInfo     = database;
2,975✔
3996
        pThreadInfo->stbInfo    = stbInfo;
2,975✔
3997
        pThreadInfo->start_time = stbInfo->startTimestamp;
2,975✔
3998
        pThreadInfo->pos        = 0;
2,975✔
3999
        pThreadInfo->samplePos  = 0;
2,975✔
4000
        pThreadInfo->totalInsertRows = 0;
2,975✔
4001
        if (STMT2_IFACE == stbInfo->iface || STMT_IFACE == stbInfo->iface) {
2,975✔
4002
            pThreadInfo->bind_ts_array = benchCalloc(1, sizeof(int64_t)*g_arguments->prepared_rand, true);
134✔
4003
            memcpy(pThreadInfo->bind_ts_array, bind_ts_array, sizeof(int64_t)*g_arguments->prepared_rand);
134✔
4004
            
4005
        }
4006
        if (g_arguments->bind_vgroup) {
2,975✔
4007
            for (int32_t j = vgNext; j < database->vgroups; j++) {
14!
4008
                SVGroup *vg = benchArrayGet(database->vgArray, j);
14✔
4009
                if (0 == vg->tbCountPerVgId) {
14!
4010
                    continue;
×
4011
                }
4012
                pThreadInfo->vg               = vg;
14✔
4013
                pThreadInfo->ntables          = vg->tbCountPerVgId;
14✔
4014
                pThreadInfo->start_table_from = 0;
14✔
4015
                pThreadInfo->end_table_to     = vg->tbCountPerVgId - 1;
14✔
4016
                vgNext                        = j + 1;
14✔
4017
                break;
14✔
4018
            }            
4019
        } else {
4020
            pThreadInfo->start_table_from = tbNext;
2,961✔
4021
            pThreadInfo->ntables          = i < mod ? div + 1 : div;
2,961✔
4022
            pThreadInfo->end_table_to     = i < mod ? tbNext + div : tbNext + div - 1;
2,961✔
4023
            tbNext                        = pThreadInfo->end_table_to + 1;
2,961✔
4024
        }
4025

4026
        // init conn
4027
        pthread_create(pids + i, NULL, genInsertTheadInfo,   pThreadInfo);
2,975✔
4028
        threadCnt ++;
2,975✔
4029
    }
4030
    
4031
    // wait threads
4032
    for (int i = 0; i < threadCnt; i++) {
3,309✔
4033
        infoPrint("init pthread_join %d ...\n", i);
2,975✔
4034
        pthread_join(pids[i], NULL);
2,975✔
4035
    }
4036

4037
    if (bind_ts_array) {
334✔
4038
        tmfree(bind_ts_array);
36✔
4039
    }
4040
    
4041
    tmfree(pids);
334✔
4042
    if (g_fail) {
334✔
4043
       return -1;
10✔
4044
    }
4045
    return 0;
324✔
4046
}
4047

4048
#ifdef LINUX
4049
#define EMPTY_SLOT -1
4050
// run with limit thread
4051
/*
 * Run the nthreads insert workers with at most limitThread of them alive at
 * any time.
 *
 * A table of limitThread slots holds the index (into infos/pids) of the
 * worker occupying each slot, or EMPTY_SLOT. The slots are polled in a loop:
 * a finished worker frees its slot, a free slot is given to the next worker
 * not yet started, and the loop ends once every slot is empty or termination
 * is requested.
 *
 * Returns 0, or -1 when a worker thread could not be created (g_fail is set
 * and no further workers are started).
 */
int32_t runInsertLimitThread(SDataBase* database, SSuperTable* stbInfo, int32_t nthreads, int32_t limitThread, threadInfo *infos, pthread_t *pids) {
    infoPrint("run with bind vgroups limit thread. limit threads=%d nthread=%d\n", limitThread, nthreads);

    int32_t ret = 0;
    // slots save threadInfo array index
    int32_t* slot = benchCalloc(limitThread, sizeof(int32_t), false);
    int32_t  t    = 0; // thread index
    for (int32_t i = 0; i < limitThread; i++) {
        slot[i] = EMPTY_SLOT;
    }

    while (!g_arguments->terminate) {
        int32_t emptySlot = 0;
        for (int32_t i = 0; i < limitThread; i++) {
            int32_t idx = slot[i];
            // check slot thread end
            if (idx != EMPTY_SLOT) {
                if (pthread_tryjoin_np(pids[idx], NULL) == EBUSY ) {
                    // thread is running
                    toolsMsleep(2000);
                } else {
                    // thread is end , set slot empty
                    infoPrint("slot[%d] finished tidx=%d. completed thread count=%d\n", i, slot[i], t);
                    slot[i] = EMPTY_SLOT;
                }
            }

            if (slot[i] == EMPTY_SLOT && t < nthreads) {
                // slot is empty , set new thread to running
                threadInfo *pThreadInfo = infos + t;
                int rc;
                if (stbInfo->interlaceRows > 0) {
                    rc = pthread_create(pids + t, NULL, syncWriteInterlace,   pThreadInfo);
                } else {
                    rc = pthread_create(pids + t, NULL, syncWriteProgressive, pThreadInfo);
                }

                if (rc != 0) {
                    // pids[t] is not a valid thread id: keep the slot empty
                    // so it is never polled, and stop starting new workers
                    errorPrint("failed to create insert thread %d, error code: %d\n", t, rc);
                    g_fail = true;
                    ret    = -1;
                    t      = nthreads;
                } else {
                    // save current and move next
                    slot[i] = t;
                    t++;
                    infoPrint("slot[%d] start new thread tidx=%d. \n", i, slot[i]);
                }
            }

            // check slot empty
            if (slot[i] == EMPTY_SLOT) {
                emptySlot++;
            }
        }

        // check all thread end
        if (emptySlot == limitThread) {
            debugPrint("all threads(%d) is run finished.\n", nthreads);
            break;
        } else {
            debugPrint("current thread index=%d all thread=%d\n", t, nthreads);
        }
    }

    tmfree(slot);

    return ret;
}
4111
#endif
4112

4113
// run
4114
// Start one insert worker per entry of infos and wait until all of them finish.
//
//   database  not referenced in this function
//   stbInfo   interlaceRows > 0 selects syncWriteInterlace as thread entry,
//             otherwise syncWriteProgressive is used
//   nthreads  number of entries in infos / pids
//   infos     infos[i] is passed as the argument of thread i
//   pids      receives the id of every created thread
//
// Thread creation stops early when g_arguments->terminate is set or when
// pthread_create fails; only threads that were really created are joined.
// Returns 0 on success, -1 if a thread could not be created.
int32_t runInsertThread(SDataBase* database, SSuperTable* stbInfo, int32_t nthreads, threadInfo *infos, pthread_t *pids) {
    infoPrint("run insert thread. real nthread=%d\n", nthreads);

    int32_t code = 0;

    // create threads
    int threadCnt = 0;
    for (int i = 0; i < nthreads && !g_arguments->terminate; i++) {
        threadInfo *pThreadInfo = infos + i;
        int err;
        if (stbInfo->interlaceRows > 0) {
            err = pthread_create(pids + i, NULL, syncWriteInterlace,   pThreadInfo);
        } else {
            err = pthread_create(pids + i, NULL, syncWriteProgressive, pThreadInfo);
        }
        if (err != 0) {
            // pids[i] holds no valid thread id, so it must never be joined;
            // stop here so that pids[0 .. threadCnt-1] stay the joinable ones
            errorPrint("failed to create insert thread %d, error code: %d\n", i, err);
            code = -1;
            break;
        }
        threadCnt ++;
    }

    // wait threads
    for (int i = 0; i < threadCnt; i++) {
        infoPrint("pthread_join %d ...\n", i);
        pthread_join(pids[i], NULL);
    }

    return code;
}
4136

4137

4138
// exit and free resource
4139
// Release the per-thread resources of a finished insert run, merge the
// per-thread statistics and print the summary.
//
//   database  passed through to printTotalDelay
//   stbInfo   super table that was written; its iface selects which
//             per-thread buffers have to be released
//   nthreads  number of entries in infos
//   infos     per-thread state; freed here, must not be used afterwards
//   pids      thread id array; freed here, must not be used afterwards
//   spend     elapsed time of the run, passed through to printTotalDelay
//
// Returns -1 when g_fail is set or printTotalDelay reports an error, else 0.
int32_t exitInsertThread(SDataBase* database, SSuperTable* stbInfo, int32_t nthreads, threadInfo *infos, pthread_t *pids, int64_t spend) {

    if (g_arguments->terminate)  toolsMsleep(100);

    BArray *  total_delay_list = benchArrayInit(1, sizeof(int64_t));
    int64_t   totalDelay = 0;
    int64_t   totalDelay1 = 0;
    int64_t   totalDelay2 = 0;
    int64_t   totalDelay3 = 0;
    uint64_t  totalInsertRows = 0;

    // free threads resource
    for (int i = 0; i < nthreads; i++) {
        threadInfo *pThreadInfo = infos + i;
        // free check sql
        if (pThreadInfo->csql) {
            tmfree(pThreadInfo->csql);
            pThreadInfo->csql = NULL;
        }

        // close conn
        int protocol = stbInfo->lineProtocol;
        switch (stbInfo->iface) {
            case REST_IFACE:
                if (g_arguments->terminate)
                    toolsMsleep(100);
                destroySockFd(pThreadInfo->sockfd);
                if (stbInfo->interlaceRows > 0) {
                    free_ds(&pThreadInfo->buffer);
                } else {
                    tmfree(pThreadInfo->buffer);
                    pThreadInfo->buffer = NULL;
                }
                break;
            case SML_REST_IFACE:
                if (g_arguments->terminate)
                    toolsMsleep(100);
                tmfree(pThreadInfo->buffer);
                pThreadInfo->buffer = NULL;
                /* fallthrough: the schemaless buffers below are shared with SML_IFACE */
            case SML_IFACE:
                if (TSDB_SML_JSON_PROTOCOL != protocol
                        && SML_JSON_TAOS_FORMAT != protocol) {
                    // line/telnet protocol: one tag string per table, one line per request row
                    if (pThreadInfo->sml_tags) {
                        for (int t = 0; t < pThreadInfo->ntables; t++) {
                            tmfree(pThreadInfo->sml_tags[t]);
                        }
                    }
                    if (pThreadInfo->lines) {
                        for (int j = 0; j < g_arguments->reqPerReq; j++) {
                            tmfree(pThreadInfo->lines[j]);
                        }
                    }
                    tmfree(pThreadInfo->sml_tags);
                    pThreadInfo->sml_tags = NULL;
                } else {
                    // json protocols
                    if (pThreadInfo->sml_tags_json_array) {
                        for (int t = 0; t < pThreadInfo->ntables; t++) {
                            tmfree(pThreadInfo->sml_tags_json_array[t]);
                        }
                    }
                    tmfree(pThreadInfo->sml_tags_json_array);
                    pThreadInfo->sml_tags_json_array = NULL;
                    if (pThreadInfo->sml_json_tags) {
                        tools_cJSON_Delete(pThreadInfo->sml_json_tags);
                        pThreadInfo->sml_json_tags = NULL;
                    }
                    if (pThreadInfo->json_array) {
                        tools_cJSON_Delete(pThreadInfo->json_array);
                        pThreadInfo->json_array = NULL;
                    }
                }
                if (pThreadInfo->lines) {
                    if ((0 == stbInfo->interlaceRows)
                            && (TSDB_SML_JSON_PROTOCOL == protocol)) {
                        tmfree(pThreadInfo->lines[0]);
                        if (pThreadInfo->sml_json_value_array) {
                            for (int t = 0; t < pThreadInfo->ntables; t++) {
                                tmfree(pThreadInfo->sml_json_value_array[t]);
                            }
                        }
                        tmfree(pThreadInfo->sml_json_value_array);
                        pThreadInfo->sml_json_value_array = NULL;
                    }
                    tmfree(pThreadInfo->lines);
                    pThreadInfo->lines = NULL;
                }
                break;

            case STMT_IFACE:
                // close stmt; conn is checked because it may be NULL (see "free conn" below)
                if (pThreadInfo->conn && pThreadInfo->conn->stmt) {
                    taos_stmt_close(pThreadInfo->conn->stmt);
                    pThreadInfo->conn->stmt = NULL;
                }
                /* fallthrough: the bind buffers below are shared with STMT2_IFACE */
            case STMT2_IFACE: {
                // close stmt2
                if (pThreadInfo->conn && pThreadInfo->conn->stmt2) {
                    taos_stmt2_close(pThreadInfo->conn->stmt2);
                    pThreadInfo->conn->stmt2 = NULL;
                }

                tmfree(pThreadInfo->bind_ts);
                tmfree(pThreadInfo->bind_ts_array);
                tmfree(pThreadInfo->bindParams);

                // free tagsStmt
                BArray *tags = pThreadInfo->tagsStmt;
                if(tags) {
                    // free child
                    for (int k = 0; k < tags->size; k++) {
                        Field * tag = benchArrayGet(tags, k);
                        tmfree(tag->stmtData.data);
                        tag->stmtData.data = NULL;
                        tmfree(tag->stmtData.is_null);
                        tag->stmtData.is_null = NULL;
                        tmfree(tag->stmtData.lengths);
                        tag->stmtData.lengths = NULL;
                    }
                    // free parent
                    benchArrayDestroy(tags);
                    pThreadInfo->tagsStmt = NULL;
                }

                // free lengths
                if(pThreadInfo->lengths) {
                    // NOTE(review): "<=" frees cols->size + 1 entries; presumably the
                    // extra slot belongs to the timestamp column - confirm against
                    // the allocation site.
                    for(int c = 0; c <= stbInfo->cols->size; c++) {
                        tmfree(pThreadInfo->lengths[c]);
                    }
                    tmfree(pThreadInfo->lengths);
                    pThreadInfo->lengths = NULL;
                }
                break;
            }

            case TAOSC_IFACE:
                if (stbInfo->interlaceRows > 0) {
                    free_ds(&pThreadInfo->buffer);
                } else {
                    tmfree(pThreadInfo->buffer);
                    pThreadInfo->buffer = NULL;
                }
                break;

            default:
                break;
        }

        // merge statistics of this thread into the totals
        totalInsertRows += pThreadInfo->totalInsertRows;
        totalDelay += pThreadInfo->totalDelay;
        totalDelay1 += pThreadInfo->totalDelay1;
        totalDelay2 += pThreadInfo->totalDelay2;
        totalDelay3 += pThreadInfo->totalDelay3;
        if (pThreadInfo->delayList) {
            benchArrayAddBatch(total_delay_list, pThreadInfo->delayList->pData,
                    pThreadInfo->delayList->size, true);
            tmfree(pThreadInfo->delayList);
            pThreadInfo->delayList = NULL;
        }
        //  free conn
        if (pThreadInfo->conn) {
            closeBenchConn(pThreadInfo->conn);
            pThreadInfo->conn = NULL;
        }
    }

    // calculate result
    qsort(total_delay_list->pData, total_delay_list->size,
            total_delay_list->elemSize, compare);

    if (g_arguments->terminate)  toolsMsleep(100);

    tmfree(pids);
    tmfree(infos);

    // print result
    int ret = printTotalDelay(database, totalDelay, totalDelay1, totalDelay2, totalDelay3,
                              total_delay_list, nthreads, totalInsertRows, spend);
    benchArrayDestroy(total_delay_list);
    if (g_fail || ret != 0) {
        return -1;
    }
    return 0;
}
4309

4310
// Insert data into one super table with multiple threads:
// validate arguments, split the tables between threads, initialise the
// per-thread state, run the workers and finally release everything.
//
//   database  database that owns stbInfo
//   stbInfo   super table to insert into
//
// Returns 0 on success and a non-zero value on failure. A failure reported by
// the run phase is kept even when the cleanup phase itself succeeds.
static int startMultiThreadInsertData(SDataBase* database, SSuperTable* stbInfo) {
    if ((stbInfo->iface == SML_IFACE || stbInfo->iface == SML_REST_IFACE)
            && !stbInfo->use_metric) {
        errorPrint("%s", "schemaless cannot work without stable\n");
        return -1;
    }

    // check argument valid
    preProcessArgument(stbInfo);

    // ntable (a negative count is rejected as well, it would otherwise end up
    // as a negative thread count and allocation size below)
    int64_t ntables = obtainTableCount(database, stbInfo);
    if (ntables <= 0) {
        errorPrint("insert table count is zero. %s.%s\n", database->dbName, stbInfo->stbName);
        return -1;
    }

    // assign table to thread
    int32_t  nthreads  = g_arguments->nthreads;
    int64_t  div       = 0;  // ntable / nthread  division
    int64_t  mod       = 0;  // ntable % nthread
    int64_t  spend     = 0;

    if (g_arguments->bind_vgroup) {
        nthreads = assignTableToThread(database, stbInfo);
        if(nthreads <= 0) {
            errorPrint("bind vgroup assign theads count is zero. %s.%s\n", database->dbName, stbInfo->stbName);
            return -1;
        }
    } else {
        if(nthreads <= 0) {
            errorPrint("argument thread_count can not be zero. %s.%s\n", database->dbName, stbInfo->stbName);
            return -1;
        }
        div = ntables / nthreads;
        if (div < 1) {
            // fewer tables than threads: one thread per table
            nthreads = (int32_t)ntables;
            div = 1;
        }
        mod = ntables % nthreads;
    }

    // init each thread information
    pthread_t   *pids  = benchCalloc(1, nthreads * sizeof(pthread_t),  true);
    threadInfo  *infos = benchCalloc(1, nthreads * sizeof(threadInfo), true);

    // init
    int32_t ret = initInsertThread(database, stbInfo, nthreads, infos, div, mod);
    if( ret != 0) {
        errorPrint("init insert thread failed. %s.%s\n", database->dbName, stbInfo->stbName);
        tmfree(pids);
        tmfree(infos);
        return ret;
    }

    infoPrint("Estimate memory usage: %.2fMB\n", (double)g_memoryUsage / 1048576);
    prompt(0);

    // run
    int64_t start = toolsGetTimestampUs();
    if(g_arguments->bind_vgroup && g_arguments->nthreads < nthreads ) {
        // need many batch execute all threads
#ifdef LINUX
        ret = runInsertLimitThread(database, stbInfo, nthreads, g_arguments->nthreads, infos, pids);
#else
        ret = runInsertThread(database, stbInfo, nthreads, infos, pids);
#endif
    } else {
        // only one batch execute all threads
        ret = runInsertThread(database, stbInfo, nthreads, infos, pids);
    }

    int64_t end = toolsGetTimestampUs();
    if(end == start) {
        // keep the elapsed time non-zero
        spend = 1;
    } else {
        spend = end - start;
    }

    // exit: always executed, it releases pids, infos and the per-thread resources
    int32_t exitRet = exitInsertThread(database, stbInfo, nthreads, infos, pids, spend);

    // do not let a successful cleanup hide a failed run
    if (ret != 0) {
        return ret;
    }
    return exitRet;
}
4395

4396
// Query how many rows the super table dbName.stbName currently holds.
//
//   dbName   database name used in the SELECT COUNT(*) statement
//   stbName  super table name used in the SELECT COUNT(*) statement
//   taos     connection the query is executed on
//
// Returns the row count, 0 when the result set is empty, -1 when the query
// failed. Counts that do not fit into the int return type are clamped to
// INT32_MAX instead of being truncated (truncation could yield a negative
// value, which callers treat as an error).
static int getStbInsertedRows(char* dbName, char* stbName, TAOS* taos) {
    char command[SHORT_1K_SQL_BUFF_LEN];
    snprintf(command, SHORT_1K_SQL_BUFF_LEN, "SELECT COUNT(*) FROM %s.%s",
             dbName, stbName);

    TAOS_RES* res = taos_query(taos, command);
    int code = taos_errno(res);
    if (code != 0) {
        // NOTE(review): res is handed over to printErrCmdCodeStr and not freed
        // here; presumably that helper releases it - confirm.
        printErrCmdCodeStr(command, code, res);
        return -1;
    }

    int rows = 0;
    TAOS_ROW row = taos_fetch_row(res);
    if (row != NULL && row[0] != NULL) {
        int64_t count = *(int64_t*)row[0];
        rows = (count > INT32_MAX) ? INT32_MAX : (int)count;
    }
    taos_free_result(res);
    return rows;
}
4416

4417
// Build and execute the "CREATE sma INDEX" statement for one tsma definition.
//
//   tsma     definition providing name, func, interval, sliding and the
//            optional custom suffix
//   conn     connection the statement is executed on
//   stbName  super table the index is created on
//
// Nothing is executed when the statement does not fit into the command
// buffer; an error is printed instead.
static void create_tsma(TSMA* tsma, SBenchConn* conn, char* stbName) {
    char command[SHORT_1K_SQL_BUFF_LEN];
    int len = snprintf(command, SHORT_1K_SQL_BUFF_LEN,
                       "CREATE sma INDEX %s ON %s function(%s) "
                       "INTERVAL (%s) SLIDING (%s)",
                       tsma->name, stbName, tsma->func,
                       tsma->interval, tsma->sliding);
    // snprintf returns the length the full text would need; using it as an
    // offset without this check would point past the end of command
    if (len < 0 || len >= (int)SHORT_1K_SQL_BUFF_LEN) {
        errorPrint("create tsma command is too long, tsma: %s stb: %s\n",
                   tsma->name, stbName);
        return;
    }

    if (tsma->custom) {
        int left  = (int)SHORT_1K_SQL_BUFF_LEN - len;
        int extra = snprintf(command + len, left, " %s", tsma->custom);
        if (extra < 0 || extra >= left) {
            // a truncated statement must not be sent to the server
            errorPrint("create tsma command is too long, tsma: %s stb: %s\n",
                       tsma->name, stbName);
            return;
        }
    }

    int code = queryDbExecCall(conn, command);
    if (code == 0) {
        infoPrint("successfully create tsma with command <%s>\n", command);
    }
}
×
4433

4434
static void* create_tsmas(void* args) {
×
4435
    tsmaThreadInfo* pThreadInfo = (tsmaThreadInfo*) args;
×
4436
    int inserted_rows = 0;
×
4437
    SBenchConn* conn = initBenchConn();
×
4438
    if (NULL == conn) {
×
4439
        return NULL;
×
4440
    }
4441
    int finished = 0;
×
4442
    if (taos_select_db(conn->taos, pThreadInfo->dbName)) {
×
4443
        errorPrint("failed to use database (%s)\n", pThreadInfo->dbName);
×
4444
        closeBenchConn(conn);
×
4445
        return NULL;
×
4446
    }
4447
    while (finished < pThreadInfo->tsmas->size && inserted_rows >= 0) {
×
4448
        inserted_rows = (int)getStbInsertedRows(
×
4449
                pThreadInfo->dbName, pThreadInfo->stbName, conn->taos);
4450
        for (int i = 0; i < pThreadInfo->tsmas->size; i++) {
×
4451
            TSMA* tsma = benchArrayGet(pThreadInfo->tsmas, i);
×
4452
            if (!tsma->done &&  inserted_rows >= tsma->start_when_inserted) {
×
4453
                create_tsma(tsma, conn, pThreadInfo->stbName);
×
4454
                tsma->done = true;
×
4455
                finished++;
×
4456
                break;
×
4457
            }
4458
        }
4459
        toolsMsleep(10);
×
4460
    }
4461
    benchArrayDestroy(pThreadInfo->tsmas);
×
4462
    closeBenchConn(conn);
×
4463
    return NULL;
×
4464
}
4465

4466
// Append "<keyword><value><tail>" to sql at offset *pos without ever writing
// past cap bytes. Returns 0 and advances *pos on success, -1 when the text
// does not fit (the buffer then holds a truncated, unusable statement).
static int32_t appendStreamClause(char *sql, int32_t cap, int32_t *pos,
                                  const char *keyword, const char *value,
                                  const char *tail) {
    if (*pos < 0 || *pos >= cap) {
        return -1;
    }
    int32_t left = cap - *pos;
    int n = snprintf(sql + *pos, (size_t)left, "%s%s%s", keyword, value, tail);
    if (n < 0 || n >= left) {
        return -1;
    }
    *pos += n;
    return 0;
}

// Drop the stream if it exists and create it again from the settings in
// stream. Both statements are retried according to g_arguments->keep_trying
// (-1 means retry without limit) with g_arguments->trying_interval
// milliseconds between attempts.
//
// Returns 0 on success, a non-zero code when the connection could not be
// opened, a statement failed, or the CREATE statement does not fit into the
// command buffer.
static int32_t createStream(SSTREAM* stream) {
    int32_t code = -1;
    const int32_t cap = (int32_t)TSDB_MAX_ALLOWED_SQL_LEN;
    char * command = benchCalloc(1, TSDB_MAX_ALLOWED_SQL_LEN, false);
    snprintf(command, TSDB_MAX_ALLOWED_SQL_LEN, "DROP STREAM IF EXISTS %s",
             stream->stream_name);
    infoPrint("%s\n", command);
    SBenchConn* conn = initBenchConn();
    if (NULL == conn) {
        goto END_STREAM;
    }

    code = queryDbExecCall(conn, command);
    int32_t trying = g_arguments->keep_trying;
    while (code && trying) {
        infoPrint("will sleep %"PRIu32" milliseconds then re-drop stream %s\n",
                          g_arguments->trying_interval, stream->stream_name);
        toolsMsleep(g_arguments->trying_interval);
        code = queryDbExecCall(conn, command);
        if (trying != -1) {
            trying--;
        }
    }

    if (code) {
        closeBenchConn(conn);
        goto END_STREAM;
    }

    // build the CREATE statement; every append is bounds checked, optional
    // clauses are only added when the corresponding setting is not empty
    memset(command, 0, TSDB_MAX_ALLOWED_SQL_LEN);
    int32_t pos = 0;
    bool fit = (0 == appendStreamClause(command, cap, &pos,
                "CREATE STREAM IF NOT EXISTS ", stream->stream_name, " "));
    if (fit && stream->trigger_mode[0] != '\0') {
        fit = (0 == appendStreamClause(command, cap, &pos,
                "TRIGGER ", stream->trigger_mode, " "));
    }
    if (fit && stream->watermark[0] != '\0') {
        fit = (0 == appendStreamClause(command, cap, &pos,
                "WATERMARK ", stream->watermark, " "));
    }
    if (fit && stream->ignore_update[0] != '\0') {
        fit = (0 == appendStreamClause(command, cap, &pos,
                "IGNORE UPDATE ", stream->ignore_update, " "));
    }
    if (fit && stream->ignore_expired[0] != '\0') {
        fit = (0 == appendStreamClause(command, cap, &pos,
                "IGNORE EXPIRED ", stream->ignore_expired, " "));
    }
    if (fit && stream->fill_history[0] != '\0') {
        fit = (0 == appendStreamClause(command, cap, &pos,
                "FILL_HISTORY ", stream->fill_history, " "));
    }
    if (fit) {
        fit = (0 == appendStreamClause(command, cap, &pos,
                "INTO ", stream->stream_stb, " "));
    }
    if (fit && stream->stream_stb_field[0] != '\0') {
        fit = (0 == appendStreamClause(command, cap, &pos,
                "", stream->stream_stb_field, " "));
    }
    if (fit && stream->stream_tag_field[0] != '\0') {
        fit = (0 == appendStreamClause(command, cap, &pos,
                "TAGS", stream->stream_tag_field, " "));
    }
    if (fit && stream->subtable[0] != '\0') {
        fit = (0 == appendStreamClause(command, cap, &pos,
                "SUBTABLE", stream->subtable, " "));
    }
    if (fit) {
        fit = (0 == appendStreamClause(command, cap, &pos,
                "as ", stream->source_sql, ""));
    }
    if (!fit) {
        // never send a truncated statement to the server
        errorPrint("create stream command is too long, stream: %s\n",
                   stream->stream_name);
        closeBenchConn(conn);
        code = -1;
        goto END_STREAM;
    }
    infoPrint("%s\n", command);

    code = queryDbExecCall(conn, command);
    trying = g_arguments->keep_trying;
    while (code && trying) {
        infoPrint("will sleep %"PRIu32" milliseconds "
                  "then re-create stream %s\n",
                  g_arguments->trying_interval, stream->stream_name);
        toolsMsleep(g_arguments->trying_interval);
        code = queryDbExecCall(conn, command);
        if (trying != -1) {
            trying--;
        }
    }

    closeBenchConn(conn);
END_STREAM:
    tmfree(command);
    return code;
}
4553

4554
void changeGlobalIface() {
276✔
4555
    if (g_arguments->databases->size == 1) {
276✔
4556
            SDataBase *db = benchArrayGet(g_arguments->databases, 0);
255✔
4557
            if (db && db->superTbls->size == 1) {
255!
4558
                SSuperTable *stb = benchArrayGet(db->superTbls, 0);
222✔
4559
                if (stb) {
222!
4560
                    if(g_arguments->iface != stb->iface) {
222✔
4561
                        infoPrint("only 1 db 1 super table, g_arguments->iface(%d) replace with stb->iface(%d) \n", g_arguments->iface, stb->iface);
21✔
4562
                        g_arguments->iface = stb->iface;
21✔
4563
                    }
4564
                }
4565
            }
4566
    }
4567
}
276✔
4568

4569
int insertTestProcess() {
276✔
4570
    prompt(0);
276✔
4571

4572
    encodeAuthBase64();
276✔
4573
    // if only one stable, global iface same with stable->iface
4574
    changeGlobalIface();
276✔
4575

4576
    // move from loop to here
4577
    if (isRest(g_arguments->iface)) {
276✔
4578
        if (0 != convertServAddr(g_arguments->iface,
8!
4579
                                 false,
4580
                                 1)) {
4581
            return -1;
×
4582
        }
4583
    }    
4584

4585
    //loop create database 
4586
    for (int i = 0; i < g_arguments->databases->size; i++) {
566✔
4587
        SDataBase * database = benchArrayGet(g_arguments->databases, i);
297✔
4588

4589
        if (database->drop && !(g_arguments->supplementInsert)) {
297✔
4590
            if (database->superTbls && database->superTbls->size > 0) {
211!
4591
                SSuperTable * stbInfo = benchArrayGet(database->superTbls, 0);
208✔
4592
                if (stbInfo && isRest(stbInfo->iface)) {
208!
4593
                    if (0 != convertServAddr(stbInfo->iface,
12!
4594
                                             stbInfo->tcpTransfer,
12✔
4595
                                             stbInfo->lineProtocol)) {
12✔
4596
                        return -1;
×
4597
                    }
4598
                }
4599
            }
4600

4601
            if (createDatabase(database)) {
211✔
4602
                errorPrint("failed to create database (%s)\n",
7!
4603
                        database->dbName);
4604
                return -1;
7✔
4605
            }
4606
            succPrint("created database (%s)\n", database->dbName);
204!
4607
        } else if(g_arguments->bind_vgroup) {
86✔
4608
            // database already exist, get vgroups from server
4609
            SBenchConn* conn = initBenchConn();
2✔
4610
            if (conn) {
2!
4611
                int32_t vgroups = getVgroupsNative(conn, database);
2✔
4612
                if (vgroups <=0) {
2!
4613
                    closeBenchConn(conn);
×
4614
                    errorPrint("Database %s's vgroups is zero , db exist case.\n", database->dbName);
×
4615
                    return -1;
×
4616
                }
4617
                closeBenchConn(conn);
2✔
4618
                succPrint("Database (%s) get vgroups num is %d from server.\n", database->dbName, vgroups);
2!
4619
            }
4620
        }
4621
    }
4622

4623
    // create super table && fill child tables && prepareSampleData
4624
    for (int i = 0; i < g_arguments->databases->size; i++) {
559✔
4625
        SDataBase * database = benchArrayGet(g_arguments->databases, i);
290✔
4626
        if (database->superTbls) {
290!
4627
            for (int j = 0; j < database->superTbls->size; j++) {
647✔
4628
                SSuperTable * stbInfo = benchArrayGet(database->superTbls, j);
357✔
4629
                if (stbInfo->iface != SML_IFACE
357✔
4630
                        && stbInfo->iface != SML_REST_IFACE
302✔
4631
                        && !stbInfo->childTblExists) {
288✔
4632
                    int code = getSuperTableFromServer(database, stbInfo);
237✔
4633
                    if (code == TSDB_CODE_FAILED) {
237!
4634
                        return -1;
×
4635
                    }
4636
                    
4637
                    // with create table if not exists, so if exist, can not report failed
4638
                    if (createSuperTable(database, stbInfo)) {
237!
4639
                        return -1;
×
4640
                    }
4641
                    
4642
                }
4643
                // fill last ts from super table
4644
                if(stbInfo->autoFillback && stbInfo->childTblExists) {
357!
4645
                    fillSTableLastTs(database, stbInfo);
×
4646
                }
4647

4648
                // calc now 
4649
                if(stbInfo->calcNow) {
357✔
4650
                    calcExprFromServer(database, stbInfo);
6✔
4651
                }
4652

4653
                // check fill child table count valid
4654
                if(fillChildTblName(database, stbInfo) <= 0) {
357✔
4655
                    infoPrint(" warning fill childs table count is zero, db:%s stb: %s \n", database->dbName, stbInfo->stbName);
12✔
4656
                }
4657
                if (0 != prepareSampleData(database, stbInfo)) {
357!
4658
                    return -1;
×
4659
                }
4660

4661
                // early malloc buffer for auto create table
4662
                if((stbInfo->iface == STMT_IFACE || stbInfo->iface == STMT2_IFACE) && stbInfo->autoTblCreating) {
357✔
4663
                    prepareTagsStmt(stbInfo);
5✔
4664
                }
4665

4666
                // execute sqls
4667
                if (stbInfo->sqls) {
357✔
4668
                    char **sqls = stbInfo->sqls;
16✔
4669
                    while (*sqls) {
80✔
4670
                        queryDbExec(database, stbInfo, *sqls);
64✔
4671
                        sqls++;
64✔
4672
                    } 
4673
                }
4674
            }
4675
        }
4676
    }
4677

4678
    // tsma
4679
    if (g_arguments->taosc_version == 3) {
269!
4680
        for (int i = 0; i < g_arguments->databases->size; i++) {
559✔
4681
            SDataBase* database = benchArrayGet(g_arguments->databases, i);
290✔
4682
            if (database->superTbls) {
290!
4683
                for (int j = 0; (j < database->superTbls->size
290✔
4684
                        && !g_arguments->terminate); j++) {
647!
4685
                    SSuperTable* stbInfo =
4686
                        benchArrayGet(database->superTbls, j);
357✔
4687
                    if (stbInfo->tsmas == NULL) {
357✔
4688
                        continue;
84✔
4689
                    }
4690
                    if (stbInfo->tsmas->size > 0) {
273!
4691
                        tsmaThreadInfo* pThreadInfo =
4692
                            benchCalloc(1, sizeof(tsmaThreadInfo), true);
×
4693
                        pthread_t tsmas_pid = {0};
×
4694
                        pThreadInfo->dbName = database->dbName;
×
4695
                        pThreadInfo->stbName = stbInfo->stbName;
×
4696
                        pThreadInfo->tsmas = stbInfo->tsmas;
×
4697
                        pthread_create(&tsmas_pid, NULL,
×
4698
                                       create_tsmas, pThreadInfo);
4699
                    }
4700
                }
4701
            }
4702
        }
4703
    }
4704

4705
    if (createChildTables()) return -1;
269✔
4706

4707
    if (g_arguments->taosc_version == 3) {
268!
4708
        for (int j = 0; j < g_arguments->streams->size; j++) {
287✔
4709
            SSTREAM * stream = benchArrayGet(g_arguments->streams, j);
21✔
4710
            if (stream->drop) {
21✔
4711
                if (createStream(stream)) {
7✔
4712
                    return -1;
2✔
4713
                }
4714
            }
4715
        }
4716
    }
4717

4718
    // create sub threads for inserting data
4719
    for (int i = 0; i < g_arguments->databases->size; i++) {
528✔
4720
        SDataBase * database = benchArrayGet(g_arguments->databases, i);
285✔
4721
        if (database->superTbls) {
285!
4722
            for (uint64_t j = 0; j < database->superTbls->size; j++) {
614✔
4723
                SSuperTable * stbInfo = benchArrayGet(database->superTbls, j);
352✔
4724
                if (stbInfo->insertRows == 0) {
352✔
4725
                    continue;
16✔
4726
                }
4727
                prompt(stbInfo->non_stop);
336✔
4728
                if (startMultiThreadInsertData(database, stbInfo)) {
336✔
4729
                    return -1;
23✔
4730
                }
4731
            }
4732
        }
4733
    }
4734
    return 0;
243✔
4735
}
4736

4737
//
4738
//     ------- STMT 2 -----------
4739
//
4740

4741
static int32_t stmt2BindAndSubmit(
117✔
4742
        threadInfo *pThreadInfo,
4743
        SChildTable *childTbl,
4744
        int64_t *timestamp, uint64_t i, char *ttl, int32_t *pkCur, int32_t *pkCnt, int64_t *delay1,
4745
        int64_t *delay3, int64_t* startTs, int64_t* endTs, int32_t w) {
4746
    
4747
    // create bindV
4748
    int32_t count            = 1;
117✔
4749
    TAOS_STMT2_BINDV * bindv = createBindV(count, 0, 0);
117✔
4750
    TAOS_STMT2 *stmt2        = pThreadInfo->conn->stmt2;
116✔
4751
    SSuperTable *stbInfo     = pThreadInfo->stbInfo;
116✔
4752

4753
    //
4754
    // bind
4755
    //
4756

4757
    // count
4758
    bindv->count = 1;
116✔
4759
    // tbnames
4760
    bindv->tbnames[0] = childTbl->name;
116✔
4761
    // tags
4762
    //bindv->tags[0] = NULL; // Progrssive mode tag put on prepare sql, no need put here
4763
   
4764
    // bind_cols
4765
    uint32_t batch = (g_arguments->reqPerReq > stbInfo->insertRows - i) ? (stbInfo->insertRows - i) : g_arguments->reqPerReq;
116✔
4766
    int32_t n = 0;
116✔
4767
    int64_t pos = i % g_arguments->prepared_rand;
116✔
4768

4769
    // adjust batch about pos
4770
    if(g_arguments->prepared_rand - pos < batch ) {
116!
4771
        debugPrint("prepared_rand(%" PRId64 ") is not a multiple of num_of_records_per(%d), the batch size can be modify. before=%d after=%d\n", 
×
4772
                    (int64_t)g_arguments->prepared_rand, (int32_t)g_arguments->reqPerReq, (int32_t)batch, (int32_t)(g_arguments->prepared_rand - pos));
4773
        batch = g_arguments->prepared_rand - pos;
×
4774
    } 
4775

4776
    if (batch == 0) {
116!
4777
        infoPrint("batch size is zero. pos = %"PRId64"\n", pos);
×
4778
        return 0;
×
4779
    }
4780

4781
    uint32_t generated = bindVColsProgressive(bindv, 0, pThreadInfo, batch, *timestamp, pos, childTbl, pkCur, pkCnt, &n);
116✔
4782
    if(generated == 0) {
116!
4783
        errorPrint( "get cols data bind information failed. table: %s\n", childTbl->name);
×
4784
        freeBindV(bindv);
×
4785
        return -1;
×
4786
    }
4787
    *timestamp += n * stbInfo->timestamp_step;
116✔
4788

4789
    if (g_arguments->debug_print) {
116!
4790
        showBindV(bindv, stbInfo->tags, stbInfo->cols);
×
4791
    }
4792

4793
    // bind and submit
4794
    int32_t code = submitStmt2(pThreadInfo, bindv, delay1, delay3, startTs, endTs, &generated, w);
116✔
4795
    // free
4796
    freeBindV(bindv);
117✔
4797

4798
    if(code != 0) {
117!
4799
        errorPrint( "failed submitStmt2() progressive mode, table: %s . engine error: %s\n", childTbl->name, taos_stmt2_error(stmt2));
×
4800
        return code;
×
4801
    } else {
4802
        debugPrint("succ submitStmt2 progressive mode. table=%s batch=%d pos=%" PRId64 " ts=%" PRId64 " generated=%d\n",
117!
4803
                childTbl->name, batch, pos, *timestamp, generated);
4804
        return generated;
117✔
4805
    }
4806
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc